Spark 사용
목차
1. 데이터 읽기
SparkSession의 DataFrameReader 클래스를 사용하여 데이터 읽음.
스키마 추론 기능
flightData2015 = spark\
.read\
.option('inferSchema', 'true')\
.option('header', 'true')\
.csv('./data/flight-data/csv/2015-summary.csv') # 데이터 준비
DataFrame에서 CSV 파일을 읽어 로컬 배열이나 리스트 형태로 변환하는 과정
head 명령과 같은 결과
flightData2015.take(3)
# 결과
#[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
# Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
# Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]
sort 메소드는 DataFrame 을 변경하지 않는다. 트랜스포메이션으로 sort 메소드를 사용하여 이전의 DataFrame을 변환해 새로운 DataFrame 을 생성해 반환한다.
DataFrame을 이용한 데이터 읽기, 정렬, 수집
특정 DataFrame 객체에 explain 메소드를 호출하면 DataFrame의 계보(lineage)나 스파크의 쿼리 실행 계획을 확인할 수 있음.
flightData2015.sort('count').explain()
# 결과
# == Physical Plan ==
# AdaptiveSparkPlan isFinalPlan=false
# +- Sort [count#28 ASC NULLS FIRST], true, 0
# +- Exchange rangepartitioning(count#28 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [id=#71]
# +- FileScan csv [DEST_COUNTRY_NAME#26,ORIGIN_COUNTRY_NAME#27,count#28] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/och/Desktop/spark/pyspark/data/flight-data/csv/2015-summar..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>
sort, exchange, filescan
트랜스포메이션 실행 계획을 세우기 위한 액션 호출
spark.conf.set('spark.sql.shuffle.partitions', '5')
flightData2015.sort('count').take(2)
#[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
# Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]
셔플 수행 시 기본적으로 200개의 셔플 파티션 생성
Transformation 의 논리적 실행 계획은 DataFrame의 계보를 정의
2. SQL DataFrame
사용자가 SQL이나 DataFrame(R, Python, Scala, Java) 으로 비즈니스 로직을 표현하면 스파크에서 실제 코드를 실행하기 전에 그 로직을 기본 실행 계획(explain 메소드를 호출해 실행 계획을 확인할 수 있음) 으로 컴파일 함. 스파크 SQL을 사용하면 모든 DataFram을 테이블이나 뷰(임시 테이블)로 등록한 후 SQL 쿼리를 사용할 수 있음.
createOrReplaceTempView 메소드를 호출하면 모든 DataFrame을 테이블이 나 뷰로 만들 수 있음
- 테이블 생성
flightData2015.sort('count').createOrReplaceTempView('flight_data_2015')
- 실행 계획
sqlWay = spark.sql('''
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
''')
dataFrameWay = flightData2015\
.groupBy('DEST_COUNTRY_NAME')\
.count()
- 확인
sqlWay.explain()
# 결과
# == Physical Plan ==
# AdaptiveSparkPlan isFinalPlan=false
# +- HashAggregate(keys=[DEST_COUNTRY_NAME#26], functions=[count(1)])
# +- Exchange hashpartitioning(DEST_COUNTRY_NAME#26, 5), ENSURE_REQUIREMENTS, [id=#93]
# +- HashAggregate(keys=[DEST_COUNTRY_NAME#26], functions=[partial_count(1)])
# +- FileScan csv [DEST_COUNTRY_NAME#26] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/och/Desktop/spark/pyspark/data/flight-data/csv/2015-summar..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>
dataFrameWay.explain()
# 결과
# == Physical Plan ==
# AdaptiveSparkPlan isFinalPlan=false
# +- HashAggregate(keys=[DEST_COUNTRY_NAME#26], functions=[count(1)])
# +- Exchange hashpartitioning(DEST_COUNTRY_NAME#26, 5), ENSURE_REQUIREMENTS, [id=#106]
# +- HashAggregate(keys=[DEST_COUNTRY_NAME#26], functions=[partial_count(1)])
# +- FileScan csv [DEST_COUNTRY_NAME#26] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/och/Desktop/spark/pyspark/data/flight-data/csv/2015-summar..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>
- max 함수
from pyspark.sql.functions import max
flightData2015.select((max('count'))).take(1)
- 1 SQL 예제
maxSql = spark.sql('''
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
''')
maxSql.explain()
# 결과
# == Physical Plan ==
# AdaptiveSparkPlan isFinalPlan=false
# +- TakeOrderedAndProject(limit=5, orderBy=[destination_total#61L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#26,destination_total#61L])
# +- HashAggregate(keys=[DEST_COUNTRY_NAME#26], functions=[sum(count#28)])
# +- Exchange hashpartitioning(DEST_COUNTRY_NAME#26, 5), ENSURE_REQUIREMENTS, [id=#231]
# +- HashAggregate(keys=[DEST_COUNTRY_NAME#26], functions=[partial_sum(count#28)])
# +- FileScan csv [DEST_COUNTRY_NAME#26,count#28] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/och/Desktop/spark/pyspark/data/flight-data/csv/2015-summar..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>
- 2 pyspark 예제
flightData2015\
.groupBy('DEST_COUNTRY_NAME')\
.sum('count')\
.withColumnRenamed('sum(count)','destination_total')\
.sort(desc('destination_total'))\
.limit(5)\
.explain()
# 결과
# == Physical Plan ==
# AdaptiveSparkPlan isFinalPlan=false
# +- TakeOrderedAndProject(limit=5, orderBy=[destination_total#116L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#26,destination_total#116L])
# +- HashAggregate(keys=[DEST_COUNTRY_NAME#26], functions=[sum(count#28)])
# +- Exchange hashpartitioning(DEST_COUNTRY_NAME#26, 5), ENSURE_REQUIREMENTS, [id=#214]
# +- HashAggregate(keys=[DEST_COUNTRY_NAME#26], functions=[partial_sum(count#28)])
# +- FileScan csv [DEST_COUNTRY_NAME#26,count#28] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/Users/och/Desktop/spark/pyspark/data/flight-data/csv/2015-summar..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>
DataFrame의 변환 흐름
1. 데이터를 읽음
2. 데이터를 그룹화. GroupBy 메소드가 호출 되면 RelationalGroupedDataset 반환, 키 혹은 키셋을 기준으로 그룹을 만들고 각 키에 대한 집계 수행
3. 집계 유형을 지정하기 위해 컬럼 표현식이나 컬럼명을 인수로 사용하는 sum 메소드를 사용. 별도의 DataFrame 을 생성. 아무런 연산이 이루어지지 않는 트랜스포메이션.
4. withColumnRenamed 메소드에 원본 컬럼명과 신규 컬럼명을 인수로 지정. 연산은 일어나지 않음.
5. desc Column 객체를 반환
6. limit
7. explain