본문 바로가기
Python/spark(python)

[Spark] 레코드와 컬럼 삭제 / 결측치 확인 및 처리

by ISLA! 2023. 10. 25.

단일 칼럼 삭제

  • drop() 메서드 인자로 단일 칼럼명 문자열, 또는 칼럼명 칼럼형을 입력 
  • drop 할 컬럼이 원본 데이터프레임에 존재하지 않아도 오류가 발생하지 않음! 유의!
# 단일 컬럼 삭제
titanic_sdf_copied = titanic_sdf_copied.drop('Name')
titanic_sdf_copied = titanic_sdf_copied.drop(col('Sex'))

 

여러 개 칼럼 삭제

  • pandas와 달리, list가 아니라 단일 칼럼명들을 각각 인자로 넣어줘야 한다.
  • 이때, 컬럼형 인자가 아닌 문자형 칼럼명으로 인자 넣어주기 >> 칼럼형 인자 나열은 오류남
titanic_sdf_copied.drop('Age', 'SibSp').limit(10).show()

 

  • 또는 아래와 같이 삭제할 컬럼을 리스트에 저장 후, unpacking 하여 입력
drop_columns = ['Age', 'SibSp']
titanic_sdf_copied.drop(*drop_columns).limit(10).show()

 

특정 조건의 Row 삭제

  • drop() 메소드 사용이 불가능하므로, filter()로 특정 조건에 해당하지 않는 로우를 걸러내는 방식을 사용
    • 아래 예시는 C를 지우기 위해, C가 아닌 것을 필터링한 결과를 반환하게 하는 것이다.
titanic_sdf_removed_Embarked_C = titanic_sdf.filter(col('Embarked') != 'C')

Null 삭제

  • spark 데이터프레임에서 dropna() 메서드는 레코드에 하나라도 Null 또는 nan이 있으면 삭제한 결과를 반환
  • 아래와 같이 두가지 방법이 있다.
titanic_sdf_dropna_02 = titanic_sdf.na.drop()
titanic_sdf_dropna_01 = titanic_sdf.dropna()

 

  • 특정 컬럼에 Null값이 있는 경우만 삭제하는 경우 : subset = ['칼럼명'] 
titanic_sdf_dropna_03 = titanic_sdf.na.drop(subset=["Age", "Embarked"])

pandas와 spark 의 null/nan/none 비교

  • pandas는 csv와 같은 파일에서 로드 시 특정 칼럼에 데이터가 없을 경우에 문자열 칼럼일 경우 None으로 숫자형 칼럼일 경우 NaN으로 할당. 단 NaN으로 할당 시에는 int형 칼럼이라도 float형으로 변경됨.
  • spark에서는 어떤 경우라도 null로 변경됨(SQL과 동일) >> Null 만 기억하고 고려하자!
▶︎ pandas DataFrame의 isnull()과 isna()는 서로 동일한 메소드임. isnull(), isna() 모두 None과 NaN을 모두 찾음.
▶︎ spark DataFrame isNull()은 null만 찾아줌, isnan()은 NaN만 찾음.
    또한 isNull()은 컬럼 조건에 붙어서 filter()메소드와 함께 사용되며, isnan()은 pyspark.sql.functions의 함수로 정의됨.

 

  • 아래와 같은 방법으로 isnull()로 필터링 가능
titanic_sdf.filter(col('Age').isNull()).show(10)
titanic_sdf.filter('Age is Null').show(10)

 

 

  • nan 값은 아래 코드로 확인하고, null로 바꿔준 다음 >> isnull() 을 사용하는 것을 추천
    • 일반적으로, isnan()은 거의 없음  >> 따라서 먼저 nan이 존재하는지 확인하는 것
# nan >> none 으로 바꾸면 null 이 됨
sdf.replace(float('nan'), None)
# 참고: 아래 코드로 Null, NaN 모두 확인함
display(titanic_sdf.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in titanic_sdf.columns]))

Null이 있는 컬럼명과 Null 건수 찾기

display(titanic_sdf.select([count(when(isnan(c), c)).alias(c) for c in titanic_sdf.columns]))


결측치 처리

  • 일괄적인 값으로 처리하는 경우
# 숫자값 결측치 일괄 처리
titanic_sdf.fillna(value=999, subset=['Age']).show(10) # titanic_pdf['Age'].fillna(999, inplace=False)

# 문자값 결측치 일괄 처리
titanic_sdf.fillna(value='NA', subset=['Cabin']).show(10) # titanic_pdf['Cabin'].fillna('NA', inplace=False)

 

  • 경우에 따라 다른 값으로 처리하는 경우
    • 예 : Age 컬럼의 평균 확인하는 코드 >> 결과는 스파크 데이터프레임(단일 값이 아님)
    • 따라서 해당 데이터프레임의 첫번째 값을 가져오는 first() 함수를 써야한다.
import pyspark.sql.functions as F
from pyspark.sql.functions import avg, col

avg_age = titanic_sdf.select(F.avg(F.col('Age')))
print(avg_age.show())

  • first()[0] 로 단일 값을 가져온다.
  • 유사한 기능을 하는 것이 .head()인데, 이는 데이터프레임의 맨 위부터 n개까지의 row를 리스트로 반환한다.
    아무값도 안넣으면 1건만 단일 값을 가져온다. >> first()[0] 와 동일한 결과가 나옴
# DataFrame의 단일 Row에서 맨 첫번째 개별 value를 가져와야함
avg_age_value = avg_age.first()[0]
print(avg_age_value, type(avg_age_value))

#결과 ---29.69911764705882 <class 'float'>

# 해당 값으로 결측치 채우기
titanic_sdf.fillna(value=avg_age_value, subset=['Age']).show()

 

▶︎ 종합

다음과 같이 Spark DataFrame의 fillna()에 인자로 Dict를 입력하여 여러개의 컬럼들에 대해서 결측치 값을 입력하는 것 추천

import pyspark.sql.functions as F

avg_age = titanic_sdf.select(F.avg(F.col('Age')))
avg_age_row = avg_age.head()
avg_age_value = avg_age.head()[0]

titanic_sdf_filled = titanic_sdf.fillna({'Age': avg_age_value, 
                                         'Cabin': 'C000',
                                         'Embarked': 'S'
})
728x90