
[Spark] Applying a User-Defined Function in Spark & Using When

by ISLA! 2023. 10. 25.

👉  Writing and applying a user-defined function (UDF) in Spark
      (similar to defining and applying a function in plain Python, but with some differences)

👉  Using when, which resembles SQL's CASE WHEN, to get the same result as the function

👉 Applying a SQL CASE WHEN expression directly


1. Write a Python function

  • First, write a regular Python function.
  • Define its input and return value explicitly, and also decide how a None input should be handled.
# Write a regular Python function first (it is wrapped as a UDF in step 2)
def get_category(age):
    cat = ''
    
    # Return 'NA' when age is None (Spark passes a NULL value to a Python UDF as None)
    if age is None:
        return 'NA'
    
    if age <= 5: cat = 'Baby'
    elif age <= 12: cat = 'Child'
    elif age <= 18: cat = 'Teenager'
    elif age <= 25: cat = 'Student'
    elif age <= 35: cat = 'Young Adult'
    elif age <= 60: cat = 'Adult'
    else: cat = 'Elderly'
    
    return cat
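Because a UDF only wraps this plain function, its logic can be checked in ordinary Python before Spark is involved. The sketch below repeats `get_category` (with the `is None` null check) and spot-checks None plus ages on and just past each threshold; the sample values are illustrative, not taken from the Titanic data.

```python
def get_category(age):
    # Same buckets as above; Spark hands a NULL value to a Python UDF as None
    if age is None:
        return 'NA'
    if age <= 5: return 'Baby'
    elif age <= 12: return 'Child'
    elif age <= 18: return 'Teenager'
    elif age <= 25: return 'Student'
    elif age <= 35: return 'Young Adult'
    elif age <= 60: return 'Adult'
    else: return 'Elderly'

# Hypothetical sample ages: None plus values on and just past the thresholds
samples = [None, 5, 5.5, 12, 18, 25, 35, 60, 60.5]
results = {age: get_category(age) for age in samples}
for age, category in results.items():
    print(age, '->', category)
```

Each threshold is inclusive (`<=`), so 5 is still 'Baby' while 5.5 already falls into 'Child'.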

 

 

2. Convert the Python function into a PySpark UDF

udf_object = udf(lambda x: function_name(x), return type of that function)

The return type is given as a Spark SQL data type, e.g. StringType() for a function that returns strings.
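from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

# Wrap get_category as a UDF whose result column has type string.
# udf(get_category, StringType()) works the same way without the lambda.
udf_get_category = udf(lambda x: get_category(x), StringType())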

 

3. Apply the PySpark UDF

Apply the UDF object created in step 2 by calling it on the target column, in the form udf_object(column).

# Add an Age_Category column computed row by row from the Age column
titanic_sdf_filled_01 = titanic_sdf_filled.withColumn("Age_Category", udf_get_category(col("Age")))
titanic_sdf_filled_01.show()
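Conceptually, `withColumn` with a Python UDF calls the wrapped function once per row and stores the result in a new column, with NULL arriving as `None`. The pure-Python sketch below imitates that on a few hypothetical rows (plain dicts standing in for Spark rows). It is only a mental model: Spark actually runs the function in separate Python worker processes.

```python
def get_category(age):
    # Same buckets as get_category above
    if age is None:
        return 'NA'
    if age <= 5: return 'Baby'
    elif age <= 12: return 'Child'
    elif age <= 18: return 'Teenager'
    elif age <= 25: return 'Student'
    elif age <= 35: return 'Young Adult'
    elif age <= 60: return 'Adult'
    else: return 'Elderly'

# Hypothetical rows; Age None plays the role of a NULL in the DataFrame
rows = [
    {"Name": "A", "Age": 4.0},
    {"Name": "B", "Age": 30.0},
    {"Name": "C", "Age": None},
]

# Like withColumn("Age_Category", udf_get_category(col("Age"))):
# every existing column is kept and one new column is added to each row
with_category = [dict(row, Age_Category=get_category(row["Age"])) for row in rows]
for row in with_category:
    print(row)
```

As with a Spark DataFrame, the input rows are left unchanged and a new collection with the extra column is produced.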

👉 Using when, which resembles SQL's CASE WHEN

The function above, which returns a different value for each case, can also be implemented with when.

- Note 1: start with when(condition, value), then chain each further case onto the resulting object with .when(condition, value).

- Note 2: the structure resembles SQL, but the chain ends with otherwise instead of else.

from pyspark.sql import functions as F

# Conditions are checked top to bottom and the first match wins
titanic_sdf_filled_02 = titanic_sdf_filled.withColumn('Age_category', F.when(F.col('Age').isNull(), 'NA')
                                                                      .when(F.col('Age') <= 5, 'Baby')
                                                                      .when(F.col('Age') <= 12, 'Child')
                                                                      .when(F.col('Age') <= 18, 'Teenager')
                                                                      .when(F.col('Age') <= 25, 'Student')
                                                                      .when(F.col('Age') <= 35, 'Young Adult')
                                                                      .when(F.col('Age') <= 60, 'Adult')
                                                                      .otherwise('Elderly'))

titanic_sdf_filled_02.limit(10).show()
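A when chain behaves like SQL CASE WHEN: conditions are tried in order and the first one that is true supplies the value. For a NULL Age every `<=` comparison is NULL (neither true nor false), so that row skips those branches and is caught by the `isNull()` branch wherever it sits. Without an `isNull()` branch it would fall through to `otherwise('Elderly')`. The pure-Python sketch below models this first-match rule with `None` standing for SQL's unknown result. `le` and `case_when` are hypothetical helpers, not PySpark functions.

```python
def le(value, limit):
    # SQL-style comparison: NULL <= x is NULL (unknown), modeled here as None
    if value is None:
        return None
    return value <= limit

def case_when(branches, otherwise):
    # branches: list of (condition_result, label) pairs checked in order;
    # only a result that is exactly True matches, so None falls through like False
    for condition, label in branches:
        if condition is True:
            return label
    return otherwise

def age_category(age):
    return case_when(
        [
            (le(age, 5), 'Baby'),
            (le(age, 12), 'Child'),
            (le(age, 18), 'Teenager'),
            (le(age, 25), 'Student'),
            (le(age, 35), 'Young Adult'),
            (le(age, 60), 'Adult'),
            (age is None, 'NA'),  # checked last here, yet still reached for None
        ],
        'Elderly',
    )

print(age_category(3), age_category(12), age_category(70), age_category(None))
# Dropping the null branch sends a NULL age to the otherwise value
print(case_when([(le(None, 5), 'Baby')], 'Elderly'))
```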

 

👉 Applying a SQL CASE WHEN expression directly

As shown below, you can also call expr with a SQL expression written inside double quotes and apply CASE WHEN that way.

from pyspark.sql.functions import expr

# expr() parses a SQL expression string; a CASE expression must close with END
titanic_sdf_filled_03 = titanic_sdf_filled.withColumn('Age_category', expr("CASE WHEN Age IS NULL THEN 'NA' " +
                                               " WHEN Age <= 5 THEN 'Baby' " +
                                               " WHEN Age <= 12 THEN 'Child' " +
                                               " WHEN Age <= 18 THEN 'Teenager' " +
                                               " WHEN Age <= 25 THEN 'Student' " +
                                               " WHEN Age <= 35 THEN 'Young Adult' " +
                                               " WHEN Age <= 60 THEN 'Adult' " +
                                               " ELSE 'Elderly' END"))
titanic_sdf_filled_03.limit(10).show()
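Writing the CASE string by hand is where slips such as a missing `END` or a wrong comparison operator creep in. One option, sketched below with a hypothetical `build_age_case_sql` helper, is to generate the string from a single list of thresholds so the UDF, when and expr versions cannot drift apart. The output is just a string, meant to be passed to `expr(...)`.

```python
# Hypothetical single source of truth for the age buckets (upper bound, label)
AGE_BUCKETS = [(5, 'Baby'), (12, 'Child'), (18, 'Teenager'),
               (25, 'Student'), (35, 'Young Adult'), (60, 'Adult')]

def build_age_case_sql(column='Age', buckets=AGE_BUCKETS,
                       null_label='NA', else_label='Elderly'):
    # NULL branch first, then one inclusive upper bound per bucket.
    # Labels are inserted verbatim, so they must not contain single quotes.
    parts = [f"CASE WHEN {column} IS NULL THEN '{null_label}'"]
    for limit, label in buckets:
        parts.append(f"WHEN {column} <= {limit} THEN '{label}'")
    parts.append(f"ELSE '{else_label}' END")
    return ' '.join(parts)

case_sql = build_age_case_sql()
print(case_sql)
```

In Spark this string would then be used as, for example, `titanic_sdf_filled.withColumn('Age_category', expr(case_sql))`.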