Spark 개념
스파크 응용 프로그램의 수명은 스파크 드라이버로 시작하고 끝난다. 드라이버는 클라이언트가 스파크에서 응용 프로그램을 제출하는 데 사용하는 프로세스다. 드라이버는 스파크 프로그램의 실행을 계획 및 조정하고, 상태 및 결과(데이터)를 클라이언트에게 반환한다. 또한, 클러스터에 있는 클라이언트나 노드에 물리적으로 상주할 수 있다.
1. SparkSession
스파크 드라이버는 스파크세션을 생성한다. 스파크세션 객체는 스파크 클러스터에 대한 연결을 나타낸다. 스파크세션은 대화식 셸을 포함해 스파크 응용 프로그램의 시작 부분에서 인스턴스화되며, 프로그램 전체에 사용된다.
스파크 2.0 이전에는 스파크 핵심 응용 프로그램에 사용된 스파크콘텍스트(SparkContext), 스파크 SQL 응용 프로그램과 함께 사용된 SQL 콘텍스트(SQLContext)와 하이브콘텍스트(HiveContext), 스파크 스트리밍 응용 프로그램에 사용된 스트리밍콘텍스트(StreamingContext)가 스파크 응용 프로그램의 엔트리 포인트에 포함돼 있었다. 스파크 2.0의 스파크 세션 객체는 이러한 모든 객체를 단일 엔트리 포인트로 결합해 모든 스파크 응용 프로그램에 사용할 수 있다.
스파크세션 객체는 스파크콘텍스트 및 스파크콘트 자식 객체를 통해, 마스터, 응용 프로그램 이름, 실행자의 수 등 사용자가 설정한 모든 런타임 구성 속성을 포함한다.
SparkSession
스파크세션 인스턴스의 객체 이름은 임의로 정해진다. 기본적으로 스파크 대화형 셸에서 스파크세션 인스턴스는 spark라는 이름을 가지는데, 일관성을 유지하기 위해 스파크세션은 항상 spark를 인스턴스화 한다. 그러나 이름은 개발자가 원하는 대로 얼마든지 바꿀 수 있다.
- SparkSession 불러오기
from pyspark.sql import SparkSession
- SparkSession 초기화
spark = SparkSession\
.builder\
.master('spark://sparkmaster:7077')\
.appName('Python Spark SQL basic example')\
.config('spark.submit.deployMode', 'client')\
.getOrCreate()
- 사용
myRange = spark.range(1000).toDF('number')
2. DataFrame
테이블의 데이터를 로우와 컬럼으로 단순하게 표현
컬럼과 컬럼의 타입을 정의한 목록을 스키마라고 부름
스파크는 R과 Python 의 DataFrame을 쉽게 변환 가능(Python pandas df -> spark dataframe)
3. 파티션
스파크는 모든 익스큐터가 병렬로 작업을 수행할 수 있도록 파티션이라 불리는 청크 단위로 데이터 분할
파티션은 클러스터의 물리적 머신에 존재하는 로우의 집합
DataFrame 의 파티션은 실행 중에 데이터가 컴퓨터 클러스터에서 물리적으로 분산되는 방식을 나타냄
파티션이 하나일 경우 익스큐터가 아무리 많아도 병렬성은 1 반대로 파티션이 많아도 익스큐터가 하나일 겅우 병렬성 1
DataFrame을 사용하면 파티션을 수동 혹은 개별적으로 처리할 필요 없음
4. 트랜스포메이션
스파크의 핵심 데이터 구조는 불변성, 한 번 생성하면 변경할 수 없음
DataFrame 을 변경하려면 원하는 변경 방법을 스파크에 전달 필요, 이를 트랜스포메이션이라고 함.
myRange = spark.range(1000).toDF('number')
divisBy2 = myRange.where("number % 2 = 0")
위 코드를 실행해도 결과가 출력되지 않는다. 이는 추상적인 트뢘스포메이션만 지정한 상태이기 때문에 액션을 호출하지 않으면 스파크는 실제 트랜스포메이션을 수행하지 않는다.
-
좁은 의존성 좁은 의존성을 가진 트랜스포메이션은 각 입력 파티션이 하나의 출력 파티션에만 영향을 미침. where 구문은 좁은 의존성을 가짐. 파이프라이닝 을 수행
-
넓은 의존성 넓은 의존성을 가진 트랜스포메이션은 하나의 입력 파티션이 여러 출력 파티션에 영향을 미침. 셔플 로 스파크가 클러스터에서 파티션을 교환
5. 지연연산
스파크가 연산 그래프를 처리하기 직전까지 기다리는 동작 방식을 의미
특정 연산 명령이 내려진 즉시 데이터를 수정하지 않고 원시 데이터에 적용할 트랜스포메이션의 실행 계획을 생성
DataFrame의 조건절 pushdown
복잡한 스파크 job이 원천 데이터에서 하나의 row만 가져오는 필터를 가지고 있다면 필요한 레코드 하나만 읽는 것이 가장 효율적.
6. 액션
실제 연산을 수행하려면 액션 명령 필요
트랜스포메이션으로부터 결과를 계산하도록 지시하는 명령
myRange = spark.range(1000).toDF('number')
divisBy2 = myRange.where("number % 2 = 0")
divisBy2.count()
세 가지 유형의 액션
- 콘솔에서 데이터를 보는 액션
- 각 언어로 된 네이티브 객체에 데이터를 모으는 액션
- 출력 데이터소스에 저장하는 액션
액션을 지정하면 스파크 잡이 시작되고, 스파크 잡은 필터(좁은 트랜스포메이션)를 수행 후 파티션별로 레코드 수를 카운트(넓은 트랜스포메이션) 한다. 그리고 각 언어에 적합한 네이티브 객체에 결과를 모음.
7. Spark UI
드라이버 노드의 4040 포트로 접속 가능
로컬 모드일 경우 http://localhost:4040
스파크 잡의 상태, 환경 설정, 클러스터 상태 등의 정보 확인 가능