RDD
1. RDD 란?
RDD(Resilient Distributed Dataset : 복원 분산 데이터 집합)는 스파크 프로그래밍에서 사용되는 가장 기본적인 데이터 객체다. RDD 는 스파크 응용 프로그램 내의 데이터 집합으로, 로드 된 초기 데이터 집합, 중간 데이터 집합 및 최종 결과 데이터 집합을 모두 포함한다. 대부분의 스파크 응용 프로그램은 외부 데이터로 RDD로 로드해 연산을 수행한 뒤 새로운 RDD를 생성하는데, 이러한 작업을 변환(transformation)이라고 한다. 이 프로세스는 궁극적으로 원하는 결과물이 출력될 때까지 반복되는데, 이러한 작업의 유형을 액션이라 한다. 응용 프로그램의 결과를 파일 시스템에 기록하는 것 등을 액션의 예로 들 수 있다.
RDD는 분산된 객체 모음으로, 여기서 객체 모음으로 , 여기서 객체는 스파크 프로그램에서 사용되는 데이터를 나타낸다. PySpark 의 RDD는 리스트(lists), 튜플(tuples) 및 딕셔너리(dictionaries) 와 같은 분산 파이썬 객체로 구성된다. RDD 내의 객체는 리스트의 요소와 같이 정수, 부동 소수점 수, 문자열과 같은 기본 데이터 유형은 물론 튜플, 딕셔너리, 리스트와 같은 복합 유형을 포함한 모든 유형이 될 수 있다. 스칼라 및 자바 API 에서의 RDD는 각각 스칼라 또는 자바 객체 모음으로 구성된다.
RDD를 디스크에 지속시키기 위한 옵션이 있지만, RDD는 주로 메모리에 저장되거나 적어도 가능한 한 메모리에 저장되도록 해야 한다. 스파크의 초기 용도는 머신러닝을 지원하는 것이었으므로 스파크 RDD는 제한된 형식의 공유 메모리를 제공한다. 이는 연속적이고 반복적인 연산을 수행할 때, 데이터의 효율적인 재사용을 가능하게 한다.
하둡 맵리듀스 구현의 주요 단점 중 하나는 중간 데이터를 디스크에 지속적으로 저장하고, 런타임 중 노드 간에 데이터를 복사하는 것이었다. 이러한 맵리듀스의 데이터 공유 분산 처리 방식은 복원력과 내결함성을 제공하지만, 지연 시간이 발생했다.이러한 설계의 제한은 스파크 프로젝트의 주요 촉매 중 하나로 작용했다. 데이터 양이 증가함에 따라실시간 데이터 처리 및 통찰력이 필요해지고, 이로 인해 스파크의 RDD에 기반을 둔 인 메모리 처리 프레임워크는 인기를 끌었다.
2. 복원 분산 데이터 집합 개념
- 복원(Resilient) : RDD는 탄력적이다. 스파크에서 작업을 수행하다가 노드가 손실될 경우, 데이터 집합을 재구성할 수 있다. 이것은 스파크가 각 RDD 리니지를 알고 있기 때문에 가능하다.
- 분산(Distributed) : RDD는 분산된다. 이는 파티션을 하나 또는 여러 개로 나누고, 클러스터의 작업자 노드를 통해 RDD의 데이터를 메모리 내 객체의 모음으로 분산하는 것을 의미한다. RDD는 서로 다른 노드(작업자)의 프로세스(실행자) 사이에 데이터를 교환하기 위한 효율적인 공유 메모리 형태를 제공한다.
- 데이터 집합(Dataset) : RDD는 레코드로 구성된 데이터 집합이다. 레코드는 데이터 집합안의 고유하고 식별 가능한 데이터 모음으로 , 관계형 데이터베이스의 테이블에 있는 행이나 파일 내 텍스트의 줄 또는 여러 다른 형식의 필드와 유사한 필드 모음을 말한다. RDD는 각 파티션이 고유한 레코드 세트를 포함하도록 작성되며, 독립적으로 동작할 수 있다. ( = 비공유 접근법)
- 불변성 : RDD가 인스턴스화되고 데이터로 채워진 후에는 업데이트할 수 없다는 것을 의미한다. map, filter 함수와 같은 변환 연산을 통해 기존 RDD를 기반으로 새로운 RDD를 만들 수는 있다.
- 액션 : RDD에서 수행되는 다른 연산이다. 드라이버 프로그램으로 반환된 RDD에서 데이터 형식으로 존재할 수 있는 출력을 만들거나, RDD의 콘텐츠를 파일 시스템(로컬, HDFS, S3 등)에 저장한다. 이 밖에도 RDD 내의 레코드 수를 반환하는 등 많은 다른 액션이 있다.
# 예제
# 로컬 파일 시스템에서 로그 파일 로드
logfilesrdd = sc.textFile("file://var/log/hadoop/hdfs/hadoop-hdfs-*")
# 오류에 대해서만 로그 레코드 필터링
onlyerrorsrdd = logfilesrdd.filter(lambda line: "ERROR" in line)
# onlyerrorsrdd 파일로 저장
onlyerrorsrdd.saveAsTextFile("file://tmp/onlyerrorsrdd")