spark DataFrame의 신규 컬럼 추가
- withColumn() 은 일종의 업데이트/추가 기능이라고 생각하면 된다.
- 구현할 때 입력값의 형식을 어떻게 해야하는지 유의하여 코드를 작성한다.
withColumn('신규/update 되는 컬럼명', '신규/Update 되는 값')
>> 신규 컬럼은 문자열로, 기존 컬럼은 반드시 컬럼형으로 기입(col('컬럼명'))
(예시)
titanic_sdf_copied = titanic_sdf_copied.withColumn('Extra_Fare', col('Fare') * 10)
# 기존 컬럼 값 update
titanic_sdf_copied = titanic_sdf_copied.withColumn('Fare', col('Fare') + 20)
# 컬럼 타입 변경
- .cast() 함수를 써서 타입 변경 : SQL과 유사함
titanic_sdf_copied = titanic_sdf_copied.withColumn('Fare', col('Fare').cast('Integer'))
▶︎ 한 번에 적용
#한번에 여러 withColumn() 변경을 적용
titanic_sdf_copied = titanic_sdf_copied.withColumn('Extra_Fare', col('Fare') * 10).withColumn('Fare', col('Fare') + 20).withColumn('Fare', col('Fare').cast('Integer'))
- (유의) 단, 상수값으로 업데이트 시에는 반드시 lit()함수를 적용해야함.
from pyspark.sql.functions import lit
# 상수 값으로 update
titanic_sdf_copied = titanic_sdf_copied.withColumn('Extra_Fare', lit(10))
# 상수 값으로 신규 컬럼 생성
titanic_sdf_copied = titanic_sdf_copied.withColumn('New_Name', lit('Test_name'))
display(titanic_sdf_copied.limit(10))
▶︎ 컬럼명 변경
- withColumnRenamed( 기존 컬럼명, 변경 컬럼명 )으로 변경
- 단, 변경하려는 컬럼명이 없어도 오류가 발생하지 않으니 주의하자.
titanic_sdf_copied = titanic_sdf_copied.withColumnRenamed('Gender', 'Gender_Renamed')
▶︎ substring()
from pyspark.sql.functions import col, substring
titanic_sdf_copied = titanic_sdf_copied.withColumn('Gender_01', col('Sex'))
.withColumn('Cabin_First_01', substring('Cabin', 0, 1))
▶︎ split()
# SQL function split()을 이용하여 문자열을 ','로 분리하여 새로운 컬럼명 Name1, Name2 생성.
titanic_sdf_copied = titanic_sdf_copied.withColumn('Name1', split(col('Name'), ',').getItem(0))
titanic_sdf_copied = titanic_sdf_copied.withColumn('Name2', split(col('Name'), ',').getItem(1))
#아래와 같이 한번에 쓸 수 있음
titanic_sdf_copied = titanic_sdf_copied.withColumn('Name1', split(col('Name'), ',').getItem(0)).withColumn('Name2', split(col('Name'), ',').getItem(1))
728x90
'Python > spark(python)' 카테고리의 다른 글
[Spark] Spark에서 사용자 정의 함수를 적용 & When 사용 (0) | 2023.10.25 |
---|---|
[Spark] 레코드와 컬럼 삭제 / 결측치 확인 및 처리 (0) | 2023.10.25 |
[Spark] spark DataFrame의 orderBy( )와 aggregation (1) | 2023.10.24 |
[Spark] select() 와 filter() 메서드 (1) | 2023.10.24 |
[Spark] databrick으로 데이터 로드하고, 기술통계량 확인하기 (1) | 2023.10.23 |