[DATA ANAL] 실무에서 자주 쓰는 Pyspark 명령어 모음

0. INTRO

  • Cloud상에서 데이터 분석을 할 때 가장 많은 시간을 요하는 작업은 두 가지라고 생각한다.
    1. 최종 시각화 이미지를 만들어내기 위해 어떠한 테이블들을 끌어와야 할지 원천 DataBase를 보며 결정할 때
    2. 이행해 온 테이블들을 어떤식으로 조립하여 붙여야 원하는 결과를 얻어낼 수 있을지 생각할 때

    물론 데이터 성공적인 데이터 분석 프로젝트를 위해서는 중간에 많은 과정들과 수고들이 들어가지만 위의 두 가지 작업이 코드를 많이 치지는 않으면서 오랜 기간의 생각과 의논과 판단이 필요한 과정이라고 개인적으로는 생각한다.

  • 특히 2번 작업을 할때는 거의 EMR에 세팅된 Pyspark 환경에서 작업을 하게되는데 파레토의 80:20 법칙같이 메인으로 쓰이는 메소드들을 주로 사용하게 되고 자주 사용하지 않는 메소드들은 아직까지도 손에 잘 익지 않아서 매 번 구글링을 하곤한다.
  • Pyspark로 테이블을 가공하며 종종 쓰이는 명령어들을 정리해보았다. 테이블은 parquet 형식으로 저장되어있으며 불러온 DataFrame의 이름은 ‘df’이며 아래와 같은 형상을 가지고 있다고 가정한다.
col_a col_b col_c
A B C

1. 테이블 읽기, 쓰기, 보기

1) Read

    df = spark.read.parquet('parquet file path')


2) Write

    df.write.mode('overwrite').parquet('save file path')
                - overwrite : 덮어쓰기
                - append : 추가
                - ignore : 파일 있으면 작업 안함

    - 하나의 파일로 저장
    df.coalesce(1).mode('overwrite').parquet('save file path')


3) Show

    - df.show(Row수) : 기본적인 테이블 모습으로 보기
    - df.show(n, vertical=True, Truncate=False) : n개 row에 대해 컬럼을 세로로 피벗해서 보여줌
    - df.first() : 첫번째 Row 만 Row 형식으로 출력
    - df.collect() : 컬럼의 Row들을 list 형식으로 출력
    - df.limit(n) : 컬럼의 첫 n개의 Row들을 list 형식으로 출력

2. 컬럼 타입 확인, 변경, df 생성

from pyspark.sql import SparkSession, Row
from pyspark.sql import types as T
from pyspark.sql import functions as F

1) 타입 확인

    df.printSchema()


2) 타입 변경

    _dtype = {
                'col_a' : T.StringType(),
                'col_b' : T.DoubleType(),
                'col_c' : T.TimestampType()
            }

    for _col in df.columns:
        df = df.withColumn(_col, F.col(_col).cast(_dtype[_col]))


3) DataFrame 생성

    schema = T.StructType(
                [
                    T.StructField("col_A", T.StringType(), True),
                    T.StructField("col_B", T.StringType(), True),
                    T.StructField("col_C", T.StringType(), True)
                ]
            )

    rows = [
            Row(col1=, col2=, col3=),
            Row(col1=, col2=, col3=),
            Row(col1=, col2=, col3=),
            ...
            ]

    df_new = spark.createDataFrame(rows, schema)

3. 컬럼 데이터 Filtering (Filter 관련 세부 내용)

1) Filter

    df.filter(F.col('col_a') == 'aa')
    df.filter((조건 1) & (조건 2) & ...)


2) Drop Duplicates

    df.dropDuplicates([중복제거 컬럼 list])


3) 포함 여부 확인 (isin)

    df.filter(F.col('col_a').isin([....]))  <-->  df.filter(~F.col('col_a').isin([....]))


4) Null값 확인

    df.filter(F.col('col_a').isNull())  <-->  df.filter(F.col('col_a').isNotNull())


5) 비슷한 형태 확인 (like)

    df.filter(F.col('col_a').like('문자 형태'))


6) When 조건문

    > col_a의 데이터가 null이면 공백, 아니면 'O'로 채우는 조건
    df.withColumn('when_a', F.when(F.col('col_a').isNull(), '').otherwise('O')

4. Group By 집계

1) 기본 agg 함수들

    df_groupby = \
        df.groupby('col_a') \
            .agg(
                F.sum(F.col('col_a')), # 총합
                F.countDistinct(F.col('col_a')), # distinct한 개수만 세기
                F.count(F.col('col_a')), # 전체 개수 세기
                F.mean(F.col('col_a')), # 평균값
                F.avg(F.col('col_a')), # 평균값
                F.stddev(F.col('col_a')), # 표준편차
                F.min(F.col('')).alias('min'), # 최솟값
                F.max(F.col('')).alias('max'), # 최댓값
                F.round(F.avg('col_a'), 2), # 반올림
                F.collect_list(F.col('col_b')), # group by 후 특정 컬럼의 값들을 list로 묶어준다.(중복 O)
                F.collect_set(F.col('col_b')) # group by 후 특정 컬럼의 값들을 list로 묶어준다.(중복 X)
            )
    # 각 함수의 뒤에 .alias('col_name') 을 붙여 컬럼 이름도 설정 가능.


2) agg + 조건(when-otherwise)

    df_groupby = \
        df.groupby('col_a') \
            .agg(
                F.count(F.when('조건', '대상컬럼').otherwise('조건 불만족시 대상컬럼'))
            )

5. Order By 정렬

1) 기본

    df.orderBy('col_a')  < -- > df.orderBy('col_a', ascending=False)


2) 어려개

    df.orderBy('col_a', 'col_b')


3) 여러개 + 정렬 순서 반대

    df.orderBy(F.col('col_a'), F.col('col_b').desc())

6. 데이터 Cleansing

1) zfill (채움)

    df.select(
        F.lpad(F.col('col_a'), 5, '0').alias('lpad_col_a'), # 5자리까지 0으로 채운다.
        'col_b',
        'col_c'
    )


2) trim (좌우 공백 제거)

    df = df.withColumn(F.col('col_a'), F.trim(F.col('col_a')))


3) Replace

    df = df.withColumn(F.col('col_a'), F.regexp_replace('col_a', 'Before', 'After'))


4) 컬럼 순서 재정렬

    df = df.select([원하는 컬럼 순서 나열])


5) 컬럼 이름 일괄 변환

    1> df.withColumnRenamed('col1','colA').withColumnRenamed('col2','colB')....
    
    2> col_list = [....]
        df = df.toDF(*col_list)


6) 컬럼 내용 합치기

    1> df.withColumn('concat_ab', F.concat(F.col('col_a'), F.col('col_b')))

    2> df.withColumn('col_join', F.concat_ws("-", *[F.col(x) for x in df.columns if 'day' in x]))
        - 컬럼명에 'day'가 들어있는 컬럼의 데이터들을 '-' 기준으로 합친다.


7) datetime 컬럼으로 변환 및 날짜 차이

    1> df.withColumn(
        'max_date',
         F.to_date(F.max('col_a'), "yyyyMMdd")
         )

    2> df.withColumn(
        'date_diff', 
        F.datediff(
            F.to_date(F.max('proc_ymd'), "yyyyMMdd"),
            F.to_date(F.min('proc_ymd'), "yyyyMMdd")
            )
        )

8) datetime 출력 형식 변환
    
    df.withColumn('col_ymd', F.date_format(F.col('col_a'), 'yyyy-MM-dd'))
    

7. Join

1) 이름이 다른 컬럼들 끼리의 Join

    df_1.join(df_2, df_1.col_a == df_2.col_b, 'left')


2) 이름이 동일한 컬럼들 끼리의 join

    df_1.join(df_2, ['col_a'], 'left')


3) 여러 컬럼들 끼리 join

    df_1.join(df_2, () & () & (), 'left')


4) 이름이 동일한 여러 컬럼들 끼리 join

    df_1.join(df_2, ['userid', 'idx'], 'left')

8. Union

1) Union (테이블들의 컬럼이 반드시 동일)

    df_1.union(df_2)


2) UnionByName (테이블의 컬럼이 동일하지 않아도 됨)

    df_1.unionByName(df_2, allowMissingColumns = True)

9. Window

  • Spark에서 Window 함수는 다른 함수들에 비해 개인적으로는 덜 직관적이어서 이해하고 사용하는데 시간이 조금 필요했다.
  • Window는 기본적으로 DataFrame의 특정 행들을 그룹핑하여 그룹핑된 행들의 집합을 대상으로 특정 작업을 수행한 후 하나의 값을 반환해준다.
  • 특정 컬럼을 대상으로 그룹핑이 선행된 후 함수가 작동하기 때문에 함수들을 사용하기 위해서는 기준을 먼저 설정해주어야 한다.
  • df가 특정 user들이 착용한 장비들의 가격을 나타내는 테이블이라고 가정할 때, user들의 착용 장비 가격에 대해서 순위가 매겨진 컬럼을 새로 생성하려 한다면 아래와 같은 순서에 따른다.
    1. user로 그룹핑되고, 장비 가격으로 정렬(order by)된 window 변수를 하나 생성한다.
    2. window 변수를 기준으로 Window의 row_number() 함수를 사용하여 가격 순위를 매기는 컬럼을 생성한다.
from pyspark.sql import window as W

1) window 변수 생성

    w = \
        W.Window \
            .partitionBy("user") \ # 사용자들을 기준으로 그룹핑
            .orderBy("price") # 그룹 내에서 정렬기준


2) 새로운 컬럼 생성

    df = df.withColumn('price_rank', F.row_number.over(w))
  • window 기준으로 사용할 수 있는 functions 내 함수는 아래와 같다.
    • row_number(), rank(), percent_rank(), dense_rank(), ntile(), cume_dist(), lag(), lead()

10. UDF(User Define Function)

  • 사용자가 함수를 정의하고 그 함수의 기능대로 특정 컬럼의 값들을 가공할 수 있는 아주 파워풀한 기능이다.
1) udf 변수 선언 방식

    def double_string(val):
        val = val * 2
        return val

    udf_a = F.udf(double_string)

    df_udf = df.withColumn('double_col', udf_a(F.col('col_a')))


2) Decorator 사용 방식(변수 선언이 필요 없고 기존 함수명 그대로 사용 가능)

    @udf(T.StringType())
    def double_string(val):
        val = val * 2
        return val

    df_udf = df.withColumn('double_col', double_string(F.col('col_a')))

11. 기타

  • 해당 섹션에 있는 함수들은 자주 쓰이지는 않지만 한 번 이상 들어본 함수들을 정리해보았다.

1. coalesce와 repartition

  • 두 함수 모두 df의 저장시 주로 사용되며 파티션의 개수 조정해주는 함수이다. 차이점에 대한 자세한 설명은 아래와 같다. (참고사이트)
    • 두 메서드 모두 파티션의 크기를 나타내는 정수를 인자로 받아 파티션의 수를 조정한다는 점에서 공통점이 있지만 repartition()이 파티션 수를 늘리거나 줄이는 것을 모두 할 수 있는 반면 coalesce()는 줄이는 것만 가능하다.
    • repartition() 메서드로 파티션 변경 기능을 할 수 있음에도 coalesce() 메서드를 따로 두는 이유는 바로 처리 방식에 따른 성능 차이 때문이다.
    • repartition()은 셔플을 기반으로 동작을 수행하는 데 반해 coalesce()는 강제로 셔플을 수행하라는 옵션을 지정하지 않는 한 셔플을 사용하지 않기 때문이다. 따라서 데이터 필터링 등의 작업으로 데이터 수가 줄어들어 파티션의 수를 줄이고자 할 때는 상대적으로 성능이 좋은 coalesce()를 사용하고, 파티션 수를 늘여야 하는 경우에만 repartition() 메서드를 사용하는 것이 좋다.

       df.repartition(5)
       df2 = df.repartition("state")
       df2 = df.repartition(5, "state")
       df2 = df.repartition("state","department")
      
       df.coalesce(2)
      

2. split, arrays_zip, approx_count_distinct

  1. split(column, pattern, limit)
    • pattern : split할 기준 문자열 패턴 (‘-‘ > 하나의 패턴 / [‘AB’] > A와 B를 기준으로 split)
    • limit : split될 array의 길이 제한을 둘 수 있음 (-1 > 제한 없음)
      df1 = df.withColumn('year', split(F.col('col_a'), '-').getItem(0)) \ # split 후 0번 index 가져옴
      .withColumn('month', split(F.col('col_a'), '-').getItem(1)) \
      .withColumn('day', split(F.col('col_a'), '-').getItem(2))
    
  2. arrays_zip(col_a, col_b, col_c ….)
    • 각 컬럼의 row에 동일한 길이의 list가 들어있을 때 이들을 zip 해주는 함수
      df = spark.createDataFrame([(([1, 2, 3], [2, 3, 4]))], ['vals1', 'vals2'])
      df = df.withColumn(arrays_zip(df.vals1, df.vals2).alias('zipped'))
    
      df.show()
            
      -RECORD 0-------------------------
      vals1 | [1, 2, 3]                
      vals2 | [2, 3, 4]                
      sdf   | [{1, 2}, {2, 3}, {3, 4}]
    
  3. approx_count_distinct(col, rsd)
    • 특정 컬럼의 distinct한 value 개수를 return 해준다.
    • rsd : 허용 표준편차 (default = 0.05)

3. summary, describe

  • 두 메서드 모두 dataframe을 이루는 컬럼들에 대한 산술적인 통계치를 보여준다.(count, mean, stddev, min, max 등)
  • summary가 조금 더 보여주는 정보들이 넓다

       df.describe().show()
       df.describe("num1").show()
    
       df.summary().show()
       df.summary("count", "33%", "50%", "66%").show()
       df.select("num1").summary("count", "33%", "50%", "66%").show()
    

4. sample, take, limit

  1. sample(fraction, seed)
    • 0-1 사이로 사용자가 정한 fraction 비율을 바탕으로 샘플 데이터를 추출해서 보여준다.
  2. take와 limit
    • 두 메서드 모두 DataFrame을 위에서부터 n개만큼 잘라서 보여주지만 차이점을 아래와 같다.
      • take(n) : Arrays of Rows 형태로 return
      • limit(n) : 잘린 DataFrame 형태로 return

5. persist, broadcast

  1. persist -> 참고사이트
  2. broadcast -> 참고사이트

Leave a comment