본문 바로가기
Python/spark(python)

[Spark] spark DataFrame의 orderBy( )와 aggregation

by ISLA! 2023. 10. 24.

OrderBy( )

  • 정렬 컬럼은 문자열, 또는 컬럼 형태로 입력
  • 정렬 컬럼이 여러개라면 개별 컬럼을 넣거나, list로 넣어도 됨
  • 정렬 컬럼별로 서로 다른 정렬 옵션을 적용할 때는 ascending=[True, False]와 같은 형태로 입력
  • 아래 예시를 보려면 display() 안에 넣어주기!
# Pclass와 Name 컬럼으로 내림차순 정렬
titanic_pdf.sort_values(by=['Pclass', 'Name'], ascending=False)

# Pclass는 오름차순, Name은 내림차순 정렬
titanic_pdf.sort_values(by=['Pclass', 'Name'], ascending=[True, False])
from pyspark.sql.functions import col 

# orderBy에 컬럼명을 col('컬럼명') 컬럼형태로 오름 차순 정렬
titanic_sdf.orderBy(col('Name'), ascending=True).show()

# orderBy에 여러개의 컬럼명을 컬럼형태로 지정하고 내림 차순 정렬
titanic_sdf.orderBy(col("Pclass"), col("Name"), ascending=False).show()

 

  • 정렬 컬럼별로 서로 다른 정렬 옵션을 적용할 때는 ascending=[True, False]와 같은 형태로 입력
  • 각 컬럼에 대한 asc(), desc() 지정도 가능함
# orderBy에 여러개의 컬럼명을 컬럼형태로 지정하고 서로 다른 방식으로 정렬
titanic_sdf.orderBy(col('Pclass'), col('Name'), ascending=[True, False]).show()

# 개별 컬럼별로 asc(), desc()를 적용. 
titanic_sdf.orderBy(col('Pclass').asc(), col('Name').desc()).show()

 

  • orderby와 동일한 메소드로 sort()도 사용 가능
titanic_sdf.sort(col('Pclass').asc(), col('Name').desc()).show()

aggregation 

  • spark DataFrame에 count() 를 적용하면 DataFrame의 Record 건수 반환
  • 다른 aggregation 함수는 이런식으로 적용하면 오류가 난다. >> 어떤 컬럼에 함수를 적용할 지 명시해야 한다!
print('count 결과:', titanic_sdf.count())

 

  • pyspark.sql.functions에서 max, min, sum을 불러와서 사용할 수 있다.
  • select 절에 불러와서 max를 계산했으므로, 데이터프레임으로 반환된다.
titanic_sdf_max = titanic_sdf.select(max('Age'))
print(titanic_sdf_max.show())

 

 

groupBy + aggregation

  • pandas 데이터프레임의 방식 : Pclass 로 그룹화하여, 컬럼별로 다른 집계함수를 적용하는 법
# 따로 딕셔너리로 집계함수 지정
agg_format = {'Age':'max', 'SibSp':'sum', 'Fare':'mean'}

# 그룹바이 + 집계
titanic_pdf.groupby(by='Pclass').agg(agg_format)

 

  • spark 데이터프레임의 방식
titanic_sdf.groupBy('Pclass').count().show()

 

  • 위의 결과도 스파크 데이터프레임이므로, count 컬럼 기준으로 order by 할 수 있음
titanic_sdf.groupBy('Pclass').count().orderBy('count', ascending=False).show()

 

  • groupby 후, 특정 컬럼에 Max() 적용해보기 
  • 이때 주의할 점은 Max() 안에 컬럼형 인자가 아닌, 오직 문자열 컬럼만 가능하다.(예시 참고)
# 옳은 방식
titanic_sdf.groupBy('Pclass').max('Age').show()

 

  • 여러 컬럼으로 groupby 하기
titanic_sdf.groupBy('Pclass', 'Sex').max('Age').show()
titanic_sdf.groupBy(['Pclass', 'Sex']).max('Age').show()

 

▶︎ 추천하는 방식 : .agg()

 여러개의 aggregation 함수를 적용할 경우 : agg()메소드 내에서 개별 aggregation 함수 명시 

from pyspark.sql.functions import max, avg, sum, min

titanic_sdf.groupBy('Pclass').agg(max('Age'), min('Age'), sum('Age'), avg('Age')).show()

 

(데이터프레임이 아닌) 컬럼 각각에 alias로 그룹바이 결과 컬럼명을 변경할 수 있음

titanic_sdf.groupBy('Pclass').agg(
    max(col('Age')).alias('max_age'), min('Age').alias('min_age'), \
    sum('Age').alias('sum_age'), avg('Age').alias('avg_age') \
    ).show()
    
# 여기서 결과값에 대한 필터 적용시(SQL의 having 같은 것) >> .filter()
titanic_sdf.groupBy('Pclass').agg(max(col('Age')).alias('max_age'), min('Age').alias('min_age') , \
                             sum('Age').alias('sum_age'), avg('Age').alias('avg_age') \
                             ).filter(col('max_age') > 70).show()

 

728x90