Spark 구조적 스트리밍
구조적 스트리밍은 스파크 SQL 엔진 기반의 스트림 처리 프레임 워크. 구조적 스트리밍은 핵심이 되는 구조적 처리 엔진 외에도 스트리밍에 특화된 여러 기능을 지원한다. 종합적이고 정확히 한 번 처리 방식뿐만 아니라 체크포인팅과 WAL(Write-ahead log)을 이용한 내고장성을 제공한다.
1. 특징
구조적 스트림잉의 핵심은 스트림 데이터를 데이터가 계속해서 추가되는 테이블처럼 다루는 것 이다. 스트리밍 잡은 계속해서 신규 입력 데이터를 확인 및 처리하는 것이며 필요한 경우 상태 저장소에 있는 일부 상태를 갱신해 결과를 변경한다. 이 API의 핵심은 배치 처리나 스트림 처리와 관련된 쿼리 구문을 변경하지 않아도 된다는 것이다.
간단히 구조적 스트리밍에서는 DataFrame도 스트리밍 방식으로 동작한다.
2. 입력 소스
- 아파치 카프카 0.10 버전
- HDFS나 S3 등 분삭 파일 시스템의 파일(스파크는 디렉터리의 신규 파일을 계속해서 읽음)
- 테스트용 소켓 소스
3. 싱크
입력 소스로 데이터를 얻듯이, 싱크로 스트림의 결과를 저장할 목적지를 명시한다. 싱크와 실행 엔진은 데이터 처리의 진행 상황을 신뢰도 있고 정확하게 추적하는 역할을 한다.
- 아파치 카프카 0.10
- 거의 모든 파일 포맷
- 출력 레코드에 임의 연산을 실행하는 foreach 싱크
- 테스트용 콘솔 싱크
- 디버깅용 메모리 싱크
4. 출력 모드
- 신규 정보만 추가하려는 경우
- 바뀐 정보로 기존 로우를 갱신하려는 경우, 예를 들어 특정 웹페이지의 클릭 수를 갱신하려는 경우
- 매번 전체 결과를 덮어쓰려는 경우, 예를 들어 모든 페이지의 전체 클릭 수를 매번 파일로 기록하려는 경우
해당 사항을 반영할 수 있도록 아래와 같이 스파크는 출력 모드를 지원
- append : 싱크에 신규 레코드만 추가
- update : 변경 대상 레코드 자체를 갱신
- complete : 전체 출력 내용 재작성하기
중요한 것은 특정 쿼리와 싱크는 일부 출력 모드만 지원한다. 예를 들어 스트림에 map 연산만 수행하는 잡이 있을 때 신규 데이터가 계속 유임되면 출력 데이터의 크기는 무한정 커진다. 이런 처리에는 매번 전체 데이터를 신규 파일로 저장하는 complete 모드는 적합하지 않다.
5. 트리거
트리거는 데이터 출력 시점을 정의한다. 구조적 스트리밍에서 언제 신규 데이터를 확인하고 결과를 갱신할지 정의한다. 구조적 스트리밍은 기본적으로 마지막 입력 데이터를 처리한 직후에 신규 입력 데이터를 조회해 최단 시간 내에 새로운 처리 결과를 만들어낸다. 하지만 이런 동작 방식 때문에 파일 싱크를 사용하는 경우 작은 크기의 파일이 여러 개 생길 수 있다. 따라서 스파크는 처리 시간 기반의 트리거도 지원한다.
6. 이벤트 시간 처리
구조적 스트리밍은 이벤트 시간 기준의 처리도 지원한다. 무작위로 도착한 레코드 내부에 기록된 타임스탬프를 기준으로 한다.
- 이벤트 시간 데이터
스파크는 데이터가 유입된 시간이 아니라 데이터 생성 시간을 기준으로 처리한다. 따라서 데이터가 늦게 업로드되거나 네트워크 지연으로 데이터 순서가 뒤섞인 채 시스템으로 들어와도 처리할 수 있다. 구조적 스트리밍에서는 아주 간단하게 이벤트 시간 처리를 표현 한다. 시스템은 입력 데이터를 테이블로 인식하므로 이벤트 시간은 테이블에 있는 하나의 컬럼일 뿐이다. 따라서 표준 SQL 연산자를 이용해 그룹화, 집계 그리고 윈도우 처리를 할 수 있다. 그리고 구조적 스트리밍이 내부적으로 이벤트 시간 필드를 인식하면 쿼리 실행 최적화나 타임 윈도우에서 상태 정보의 제거 시적ㅁ을 결정하는 등 특별한 작업을 수행할 수 있다. 이런 작업은 워터마크
를 사용한다.
- 워터마크
워터마크는 시간 제한을 설정할 수 있는 스트리밍 시스템의 기능이다. 늦게 들어온 이벤트를 어디까지 처리할 지 시간을 제한할 수 있따. 한 가지 예로 모바일 장비의 로그를 처리하는 애플리케이션에서 업로드 지연 현상 때문에 30분 전 데이터까지 처리해야 하는 경우가 있을 때 구조적 스트리밍을 포함해 이벤트 시간 처리를 지원하는 여러 시스템은 과거 데이터의 보관 주기를 제한하기 위해 워터마크를 사용한다.
7. Streaming DataFrame
스트리밍 DataFrame은 정적 DataFrame과 흡사하다. 기본적으로 정적 구조적 API의 모든 트랜스포메이션은 스트리밍 DataFrame에서도 사용할 수 있다. 하지만 구조적 스트리밍에서 스키마 추론 기능을 사용하고 싶은 경우 명시적으로 설정해야 한다. 스키마 추론 기능을 사용하려면 spark.sql.streaming.schemaInference 설정을 true로 설정해야 한다.
- 데이터 수집
streaming = spark \
.readStream \
.schema(dataSchema) \
.option("maxFilesPerTrigger", 1) \
.json("path")
# maxFilesPerTrigger 는 폴더 내의 전체 파일을 얼마나 빨리 읽을지 결정한다. 이 값을 낮게 잡으면 트리거당 하나의 파일을 읽게 만들어 스트림의 흐름을 인위적으로 제한할 수 있다.
- 데이터 트랜스포메이션 지정
activityCounts = streaming.groupBy("gt").count()
# 저성능 머신일 경우 셔플 파티션을 아래와 같이 줄일 수 있다.
spark.conf.set("spark.sql.shuffle.partitions", 5)
- 액션 정의
activityQuery = activityCounts.writeStream.queryName("activity_counts") \
.format("memory") \ # 메모리 싱크 사용
.outputMode("complete") \
.start()
# 실행 중인 스트리밍 쿼리를 제어하려면 쿼리 객체를 사용해야 한다.
# 그리고 쿼리 종료 시까지 대기할 수 있도록 아래와 같이 반드시 지정
# 쿼리 실행 중에 드라이버 프로세스가 종료되는 상황을 막을 수 있다.
activityQuery.awaitTermination()
- 확인
from time import sleep
for x in range(5):
spark.sql("SELECT * FROM activity_counts").show()
sleep(1)