๐ Spark์์ ์ฌ์ฉ์ ์ ์ ํจ์๋ฅผ ์์ฑํ๊ณ ์ ์ฉํ๊ธฐ
(ํ์ด์ฌ์์์ ํจ์ ์ ์ ๋ฐ ์ ์ฉ๊ณผ ๋น์ทํ ๋ฏ ๋ค๋ฅธ ๋ถ๋ถ์ด ์์)
๐ SQL์ case when ๊ณผ ์ ์ฌํ When ์ ์ฌ์ฉํด์ ํจ์์ ์ ์ฌํ ๊ฒฐ๊ณผ๋ฅผ ๋ง๋ค๊ธฐ
๐ SQL์ case when ๋ฌธ์ ๊ทธ๋๋ก ์ ์ฉํด๋ณด๊ธฐ
1. ํ์ด์ฌ ํจ์ ์์ฑํ๊ธฐ
- ๋จผ์ ์ผ๋ฐ python์ฉ ํจ์๋ฅผ ์์ฑํ๋ค.
- ์ด๋, ๋ฐ๋์ ์ ๋ ฅ ๊ฐ๊ณผ ๋ฐํ ๊ฐ์ ์ค์ & ๊ฐ์ด None์ผ ๊ฒฝ์ฐ ์ฒ๋ฆฌ ๋ฐฉ์๋ ์ค์
# ์ผ๋ฐ python์ฉ UDF๋ฅผ ์์ฑ
def get_category(age):
cat = ''
# age ๊ฐ์ด None์ผ ๊ฒฝ์ฐ๋ NA๋ฅผ Return
if age == None:
return 'NA'
if age <= 5: cat = 'Baby'
elif age <= 12: cat = 'Child'
elif age <= 18: cat = 'Teenager'
elif age <= 25: cat = 'Student'
elif age <= 35: cat = 'Young Adult'
elif age <= 60: cat = 'Adult'
else : cat = 'Elderly'
return cat
2. python์ฉ ํจ์๋ฅผ Pyspark์ฉ ํจ์๋ก ๋ณํ
ํจ์ ๊ฐ์ฒด = udf(lambda x : ํจ์๋ช (x), ํด๋น ํจ์์ ๋ฐํํ)
= udf(lambda ์ ๋ ฅ๋ณ์: ์ผ๋ฐ UDF, ํด๋น ์ผ๋ฐ UDF์ ๋ฐํํ)
from pyspark.sql.functions import udf,col
from pyspark.sql.types import StringType
udf_get_category = udf(lambda x:get_category(x), StringType() )
3. pySpark์ฉ ํจ์ ์ ์ฉ
2์์ ๋ง๋ ํจ์ ๊ฐ์ฒด๋ฅผ ์ ์ฉํ๋๋ฐ, ์ด๋ ๊ฐ์ฒด๋ช (ํจ์๋ฅผ ์ ์ฉํ ์ปฌ๋ผ๋ช ) ์ ๋ฐฉ์์ผ๋ก ์ ์ฉ
titanic_sdf_filled_01 = titanic_sdf_filled.withColumn("Age_Category",udf_get_category(col("Age")))
titanic_sdf_filled_01.show()
๐ SQL์ case when ๊ณผ ์ ์ฌํ When ์ ์ฌ์ฉํด๋ณด๊ธฐ
์์์ ์ ์ฉํ case ๋ณ ๋ฐํ ๊ฐ์ด ๋ค๋ฅธ ํจ์๋ฅผ when ์ผ๋ก ๊ตฌํํ ์ ์๋ค.
- ์ ์ํ ์ 1 : when ์ผ๋ก ์ผ์ด์ค๋ฅผ ๋๋ ๋ .when์ผ๋ก ๊ฒฐ๊ณผ๊ฐ์ฒด์ ๋ํด์ ์กฐ๊ฑด์ ์จ์ค์ผ ํจ
- ์ ์ํ ์ 2 : ๊ตฌ์กฐ๋ sql๊ณผ ์ ์ฌํ์ง๋ง, ๋ง์ง๋ง์ else๊ฐ ์๋ otherwise๋ก ๋ง๋ฌด๋ฆฌํ๋ค.
from pyspark.sql.functions import when
titanic_sdf_filled_02 = titanic_sdf_filled.withColumn('Age_category', when(F.col('Age') <= 5, 'Baby')
.when(F.col('Age') <= 12, 'Child')
.when(F.col('Age') <= 18, 'Teenage')
.when(F.col('Age') <= 25, 'Student')
.when(F.col('Age') <= 35, 'Young Adult')
.when(F.col('Age') <= 60, 'Adult')
.when(F.col('Age').isNull(), 'NA')
.otherwise('Elderly'))
titanic_sdf_filled_02.limit(10).show()
๐ SQL์ case when ๋ฌธ์ ๊ทธ๋๋ก ์ ์ฉํด๋ณด๊ธฐ
์๋์ ๊ฐ์ด expr์ ํธ์ถํ์ฌ, SQL ๋ฌธ์ ํฐ ๋ฐ์ดํ ์์ ๋ฃ์ด์, case when ์ ์ ์ฉํด์ค ์๋ ์์!
from pyspark.sql.functions import expr, col
titanic_sdf_filled_03 = titanic_sdf.withColumn('Age_category', expr("CASE WHEN age = 12 THEN 'Child' " +
" WHEN Age <= 18 THEN 'Teenage' " +
" WHEN Age <= 25 THEN 'Student' " +
" WHEN Age <= 35 THEN 'Young Adult' " +
" WHEN Age <= 60 THEN 'Adult' " +
" WHEN Age IS NULL THEN 'NA' " +
" ELSE 'Elderly' "))
titanic_sdf_filled_03.limit(10).show()
'Python > spark(python)' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[Spark] ๋ ์ฝ๋์ ์ปฌ๋ผ ์ญ์ / ๊ฒฐ์ธก์น ํ์ธ ๋ฐ ์ฒ๋ฆฌ (0) | 2023.10.25 |
---|---|
[Spark] ์ปฌ๋ผ ์์ฑ/์ ๋ฐ์ดํธ๋ฅผ ์ํ withColumn() , substring(), split() (0) | 2023.10.24 |
[Spark] spark DataFrame์ orderBy( )์ aggregation (1) | 2023.10.24 |
[Spark] select() ์ filter() ๋ฉ์๋ (1) | 2023.10.24 |
[Spark] databrick์ผ๋ก ๋ฐ์ดํฐ ๋ก๋ํ๊ณ , ๊ธฐ์ ํต๊ณ๋ ํ์ธํ๊ธฐ (1) | 2023.10.23 |