본문 바로가기

Python/spark(python)7

[Spark] Spark에서 사용자 정의 함수를 적용 & When 사용 👉 Spark에서 사용자 정의 함수를 작성하고 적용하기 (파이썬에서의 함수 정의 및 적용과 비슷한 듯 다른 부분이 있음) 👉  SQL의 case when 과 유사한 When 을 사용해서 함수와 유사한 결과를 만들기 👉 SQL의 case when 문을 그대로 적용해보기 1. 파이썬 함수 작성하기 먼저 일반 python용 함수를 작성한다. 이때, 반드시 입력 값과 반환 값을 설정 & 값이 None일 경우 처리 방식도 설정 # 일반 python용 UDF를 작성 def get_category(age): cat = '' # age 값이 None일 경우는 NA를 Return if age == None: return 'NA' if age 2023. 10. 25.
[Spark] 레코드와 컬럼 삭제 / 결측치 확인 및 처리 단일 칼럼 삭제 drop() 메서드 인자로 단일 칼럼명 문자열, 또는 칼럼명 칼럼형을 입력 drop 할 컬럼이 원본 데이터프레임에 존재하지 않아도 오류가 발생하지 않음! 유의! # 단일 컬럼 삭제 titanic_sdf_copied = titanic_sdf_copied.drop('Name') titanic_sdf_copied = titanic_sdf_copied.drop(col('Sex')) 여러 개 칼럼 삭제 pandas와 달리, list가 아니라 단일 칼럼명들을 각각 인자로 넣어줘야 한다. 이때, 컬럼형 인자가 아닌 문자형 칼럼명으로 인자 넣어주기 >> 칼럼형 인자 나열은 오류남 titanic_sdf_copied.drop('Age', 'SibSp').limit(10).show() 또는 아래와 같이 삭제.. 2023. 10. 25.
[Spark] 컬럼 생성/업데이트를 위한 withColumn() , substring(), split() spark DataFrame의 신규 컬럼 추가 withColumn() 은 일종의 업데이트/추가 기능이라고 생각하면 된다. 구현할 때 입력값의 형식을 어떻게 해야하는지 유의하여 코드를 작성한다. withColumn('신규/update 되는 컬럼명', '신규/Update 되는 값') >> 신규 컬럼은 문자열로, 기존 컬럼은 반드시 컬럼형으로 기입(col('컬럼명')) (예시) titanic_sdf_copied = titanic_sdf_copied.withColumn('Extra_Fare', col('Fare') * 10) # 기존 컬럼 값 update titanic_sdf_copied = titanic_sdf_copied.withColumn('Fare', col('Fare') + 20) # 컬럼 타입 변경 .. 2023. 10. 24.
[Spark] spark DataFrame의 orderBy( )와 aggregation OrderBy( ) 정렬 컬럼은 문자열, 또는 컬럼 형태로 입력 정렬 컬럼이 여러개라면 개별 컬럼을 넣거나, list로 넣어도 됨 정렬 컬럼별로 서로 다른 정렬 옵션을 적용할 때는 ascending=[True, False]와 같은 형태로 입력 아래 예시를 보려면 display() 안에 넣어주기! # Pclass와 Name 컬럼으로 내림차순 정렬 titanic_pdf.sort_values(by=['Pclass', 'Name'], ascending=False) # Pclass는 오름차순, Name은 내림차순 정렬 titanic_pdf.sort_values(by=['Pclass', 'Name'], ascending=[True, False]) from pyspark.sql.functions import col .. 2023. 10. 24.
[Spark] select() 와 filter() 메서드 데이터 프레임 생성하기 spark.createDataFrame() dict_01 = {'Name': ['Chulmin', 'Wansoo','Myunghyun','Hyunjoo', 'Chulman'], 'Year': [2011, 2016, 2015, 2015, 2011], 'Gender': ['Male', 'Male', 'Male', 'Female', 'Male'] } # 딕셔너리를 DataFrame으로 변환 data_pdf = pd.DataFrame(dict_01) # pandas DataFrame은 spark DataFrame으로 변환 data_sdf = spark.createDataFrame(data_pdf) 특정 컬럼 선택(select)하기 select() 로 하나 이상의 컬럼을 인자로 넣으면 데이.. 2023. 10. 24.
[Spark] databrick으로 데이터 로드하고, 기술통계량 확인하기 데이터 업로드하기 메인화면 > Data > Create Table Notebook에서 데이터 확인하기 위 이미지의 형광펜이 데이터 경로이다 spark.read.csv(경로.csv) 를 입력하여 데이터를 불러온다. 참고로, /FileStore는 DBFS 파일 시스템으로 Spark외에는 접근 불가하다. titanic_sdf = spark.read.csv('/FileStore/tables/titanic_train-1.csv', header=True, inferSchema=True) print('titanic sdf type:', type(titanic_sdf)) # databrick에서 데이터를 formatting 된 형식으로 확인하기 display(titanic_sdf) 데이터 형식이 pyspark의 dat.. 2023. 10. 23.
[Spark] Databricks로 시작하기 Databricks 접속 & 가입 databricks community edition 으로 검색해서 Sign up >> Sign in 해주면 다음과 같은 화면이 나타난다 무료 이용은 2주라, 2주마다 구글 이메일 계정만들어서, 크롬에 로그인 후 접속해주면 계속 이용할 수 있다. 👉 나중에 재접속 할 때는 Databricks Commuity 로 접속하도록 한다! Cluster 생성 cluster_name : 직접 입력 Databricks Runtime Version : Runtime 10.0 ML(Scala 2.12 Spark 3.2.0) 선택 현재 버전에서는 아래를 선택 2시간이 지나면 클러스터가 Terminate 가 된다. 그럴 땐 기존 것을 삭제하고 동일 세팅으로 다시 만들어서 노트북과 연결해준다... 2023. 10. 23.
728x90