OrderBy( )
- 정렬 컬럼은 문자열, 또는 컬럼 형태로 입력
- 정렬 컬럼이 여러개라면 개별 컬럼을 넣거나, list로 넣어도 됨
- 정렬 컬럼별로 서로 다른 정렬 옵션을 적용할 때는 ascending=[True, False]와 같은 형태로 입력
- 아래 예시를 보려면 display() 안에 넣어주기!
# Pclass와 Name 컬럼으로 내림차순 정렬
titanic_pdf.sort_values(by=['Pclass', 'Name'], ascending=False)
# Pclass는 오름차순, Name은 내림차순 정렬
titanic_pdf.sort_values(by=['Pclass', 'Name'], ascending=[True, False])
from pyspark.sql.functions import col
# orderBy에 컬럼명을 col('컬럼명') 컬럼형태로 오름 차순 정렬
titanic_sdf.orderBy(col('Name'), ascending=True).show()
# orderBy에 여러개의 컬럼명을 컬럼형태로 지정하고 내림 차순 정렬
titanic_sdf.orderBy(col("Pclass"), col("Name"), ascending=False).show()
- 정렬 컬럼별로 서로 다른 정렬 옵션을 적용할 때는 ascending=[True, False]와 같은 형태로 입력
- 각 컬럼에 대한 asc(), desc() 지정도 가능함
# orderBy에 여러개의 컬럼명을 컬럼형태로 지정하고 서로 다른 방식으로 정렬
titanic_sdf.orderBy(col('Pclass'), col('Name'), ascending=[True, False]).show()
# 개별 컬럼별로 asc(), desc()를 적용.
titanic_sdf.orderBy(col('Pclass').asc(), col('Name').desc()).show()
- orderby와 동일한 메소드로 sort()도 사용 가능
titanic_sdf.sort(col('Pclass').asc(), col('Name').desc()).show()
aggregation
- spark DataFrame에 count() 를 적용하면 DataFrame의 Record 건수 반환
- 다른 aggregation 함수는 이런식으로 적용하면 오류가 난다. >> 어떤 컬럼에 함수를 적용할 지 명시해야 한다!
print('count 결과:', titanic_sdf.count())
- pyspark.sql.functions에서 max, min, sum을 불러와서 사용할 수 있다.
- select 절에 불러와서 max를 계산했으므로, 데이터프레임으로 반환된다.
titanic_sdf_max = titanic_sdf.select(max('Age'))
print(titanic_sdf_max.show())
groupBy + aggregation
- pandas 데이터프레임의 방식 : Pclass 로 그룹화하여, 컬럼별로 다른 집계함수를 적용하는 법
# 따로 딕셔너리로 집계함수 지정
agg_format = {'Age':'max', 'SibSp':'sum', 'Fare':'mean'}
# 그룹바이 + 집계
titanic_pdf.groupby(by='Pclass').agg(agg_format)
- spark 데이터프레임의 방식
titanic_sdf.groupBy('Pclass').count().show()
- 위의 결과도 스파크 데이터프레임이므로, count 컬럼 기준으로 order by 할 수 있음
titanic_sdf.groupBy('Pclass').count().orderBy('count', ascending=False).show()
- groupby 후, 특정 컬럼에 Max() 적용해보기
- 이때 주의할 점은 Max() 안에 컬럼형 인자가 아닌, 오직 문자열 컬럼만 가능하다.(예시 참고)
# 옳은 방식
titanic_sdf.groupBy('Pclass').max('Age').show()
- 여러 컬럼으로 groupby 하기
titanic_sdf.groupBy('Pclass', 'Sex').max('Age').show()
titanic_sdf.groupBy(['Pclass', 'Sex']).max('Age').show()
▶︎ 추천하는 방식 : .agg()
여러개의 aggregation 함수를 적용할 경우 : agg()메소드 내에서 개별 aggregation 함수 명시
from pyspark.sql.functions import max, avg, sum, min
titanic_sdf.groupBy('Pclass').agg(max('Age'), min('Age'), sum('Age'), avg('Age')).show()
(데이터프레임이 아닌) 컬럼 각각에 alias로 그룹바이 결과 컬럼명을 변경할 수 있음
titanic_sdf.groupBy('Pclass').agg(
max(col('Age')).alias('max_age'), min('Age').alias('min_age'), \
sum('Age').alias('sum_age'), avg('Age').alias('avg_age') \
).show()
# 여기서 결과값에 대한 필터 적용시(SQL의 having 같은 것) >> .filter()
titanic_sdf.groupBy('Pclass').agg(max(col('Age')).alias('max_age'), min('Age').alias('min_age') , \
sum('Age').alias('sum_age'), avg('Age').alias('avg_age') \
).filter(col('max_age') > 70).show()
728x90
'Python > spark(python)' 카테고리의 다른 글
[Spark] 레코드와 컬럼 삭제 / 결측치 확인 및 처리 (0) | 2023.10.25 |
---|---|
[Spark] 컬럼 생성/업데이트를 위한 withColumn() , substring(), split() (0) | 2023.10.24 |
[Spark] select() 와 filter() 메서드 (1) | 2023.10.24 |
[Spark] databrick으로 데이터 로드하고, 기술통계량 확인하기 (1) | 2023.10.23 |
[Spark] Databricks로 시작하기 (0) | 2023.10.23 |