데이터 흐름



파일 읽기 상세


클라이언트가 HDFS, Namenode, Datanode 와 어떻게 상호작용하는지 데이터 흐름 관점에서 이해하기 위해 파일읽기 이벤트의 기본 순서



  1. 클라이언트는 HDFS가 DistributedFileSystem 인스턴스인 FileSystem 객체의 open() 메소드를 호출하여 원하는 파일을 연다.


  1. DistributedFileSystem 은 파일의 첫 번째 블록 위치를 파악하기 위해 RPC를 사용하여 네임노드를 호출한다. 네임노드는 블록별로 해당 블록의 복제본을 가진 데이터 노드의 주소를 반환한다. 이 때 클러스터의 네트워크 위상에 따라 클라이언트와 가까운 순으로 데이터노드(예를 들면 맵리듀스 태스크)고 해당 블록의 복제본을 가지고 있으면 클라이언트는 로컬 데이터 노드에서 데이터를 읽는다.


  1. DistributedFileSystem은 클라이언트가 데이터를 읽을 수 있도록 FSDataInputStream을 반환한다. FSDataInputSTream은 데이터노드와 네임노드의 I/O를 관리하는 DFSinputStream을 래핑한다. 클라이언트는 스트림을 읽기 위해 read() 메서드를 호출한다.


  1. 파일의 첫 번째 블록의 데이터노드 주소를 저장하고 있는 DFSInputStream은 가장 가까운(첫번째) 데이터노드와 연결한다. 해당 스트림에 대해 read() 메소드를 반복적으로 호출하면 데이터노드에서 클라이언트로 모든 데이터가 전송된다.


  1. 블록의 끝에 도달하면 DFSInputStream은 데이터노드의 연결을 닫고 다음 블록의 데이터노드를 찾는다. 클라이언트의 관점에서 이러한 과정은 투명하게 전개되며 클라이언트는 단지 연속적인 스트림을 읽는 것 처럼 느낀다.


  1. 클라이언트는 스트림을 통해 블록을 순서대로 하나씩 읽는다. 이때 DFSInputStream은 블록마다 데이터노드와 새로운 연결을 맺는다. 클라이언트는 다음 블록의 데이터노드 위치를 얻기 위해 네임노드를 호출한다. 모든 블록에 대한 읽기가 끝나면 클라이언트는 FSDataInputStream의 Close 메소드를 호출한다.


데이터를 읽는 중에 데이터노드와의 통신에 장애가 발생하면 DFSInputStream은 해당 블록을 저장하고 있는 다른 데이터노드와 연결을 시도한다. 또한 이후 블록에 대한 불필요한 재시도를 방지하기 위해 장애가 발생한 데이터노드를 기억해둔다. DFSInputStream은 다른 데이터노드에 있는 블록의 복제본을 읽으려 시도한다. 몰론 손상된 블록에 대한 정보는 네임노드에 보고된다. 물론 손상된 블록에 대한 정보는 네임노드에 보고된다.


설계의 핵심으로 클라이언트는 데이터를 얻기 위해 데이터노드에 직접적으로 접촉하고, 네임노드는 각 블록에 적합한 데이터노드를 안내해준다는 것이다. 데이터 트래픽은 클러스터에 있는 모든 데이터노드에 고르게 분산되므로 HDFS는 동시에 실행되는 클라이언트의 수를 크게 늘릴 수 있다. 한 편 네임노드는 효율적인 서비스를 위해 메타데이터를 메모리에 저장하고 단순히 블록의 위치 정보 요청만 처리하며, 데이터를 저장하거나 전송하는 역할은 맡지 않으므로 클라이언트가 많아져도 병목현상은 거의 발생하지 않는다.

네트워크 토폴로지와 Hadoop


대용량 데이터 처리 맥락에서 노드 간 데이터 전송의 제약 요소는 전송률이다. 대역폭은 한정 자원이다. 그래서 노드 간의 네트워크 대역폭을 거리 측정에 이용한다.

노드 간의 대역폭을 실제 측정하는 것은 어렵다. 클러스터는 고정적이어야 하고 클러스터의 노드 쌍의 수는 전체 노드 수의 제곱에 이르기 때문이다. 대신 하둡은 단순한 방식을 취하는데, 네트워크 전체를 하나의 트리로 표현하고 두 노드의 거리는 가장 가까운 공통 조상과의 거리를 합산하여 계산한다. 트리의 단계는 고정되어 있지 않지만, 일반적으로 데이터 센터, 랙, 토드 순으로 구성된다. 가용 대역폭은 다은에 나오는 시나리오 순으로 점차 줄어든다고 가정한다.

  • 동일 노드
  • 동일 랙의 다른 노드
  • 동일 데이터 센터에 있는 다른 랙의 노드
  • 다른 데이터 센터에 있는 노드

예를 들어 d1 데이터 센터의 r1 랙에 있는 n1 노드를 상상해보자. 이 노드는 /d1/r1/n1으로 표시될 수 있으며, 앞서 언급한 네 개의 시나리오로 그 거리를 계산해본다(거리 측정함수).


  • distance(/d1/r1/n1, /d1/r1/n1) = 0 (동일 노드)
  • distance(/d1/r1/n1, /d1/r1/n2) = 2 (동일 랙 다른 노드)
  • distance(/d1/r1/n1, /d1/r2/n3) = 0 (동일 데이터 센터에 있는 다른 랙의 노드)
  • distance(/d1/r1/n1, /d2/r3/n4) = 0 (다른 데이터 센터에 있는 노드)


다수의 데이터 센터에서 하둡을 운영하는 것은 권장하지 않는다.


파일 쓰기 상세

HDFS의 일관성(coherency model)을 명확하게 설명하므로 데이터의 흐름을 이해하는데 도움이 된다.



  1. 클라이언트는 DistributedFileSystem의 create()를 호출하여 파일을 생성한다.


  1. DistributedFileSystem은 파일시스템의 네임스페이스에 새로운 파일을 생성하기 위해 네임노드에 RPC 요청을 보낸다. 이 때 블록에 대한 정보는 보내지 않는다. 네임노드는 요청한 파일과 동일한 파일이 이미 존재하는지, 클라이언트가 파일을 생성할 권한을 가지고 있는지 등 다양한 검사를 수행한다. 검사를 통과하면 네임노드는 새로운 파일의 레코드를 만들고, 그렇지 않으면 파일 생성은 실패하고 클라이언트의 IOException이 발생한다. DistributedFileSystem은 데이터를 쓸 수 있도록 클라이언트에 FSDataOutputStream을 반환한다. 읽을 때와 마찬가지로 FSDataOutputStream은 데이터노드와 네임노드의 통신을 처리하는 DFSOutputStream으로 래핑된다.


  1. 클라이언트가 데이터를 쓸 때 DFSOutputStream은 데이터를 패킷으로 분리하고, 데이터 큐(data queue)라 불리는 내부 큐로 패킷을 보낸다. DataStreamer는 데이터 큐에 있는 패킷을 처리한다. 먼저 네임노드에 복제본을 저장할 데이터노드의 목록을 요청한다. 데이터노드 목록에 포함된 노드는 파이프라인을 형성하는데, 복제 수준이 3이면 세 개의 노드가 파이프라인에 속하게 된다.


  1. Datastreamer는 파이프라인의 첫 번째 데이터노드로 패킷을 전송한다. 첫 번째 데이터노드는 각 패킷을 저장하고 그것을 파이프라인의 두 번째 데이터노드로 보낸다. 이어서 두 번째 데이터노드는 받은 패킷을 저장하고 파이프라인의 세 번째(마지막) 데이터노드로 전달한다.


  1. DFSOutputStream은 데이터노드의 승인 여부를 기다리는 ack 큐라 불리는 내부 패킷 큐를 유지한다. ack 큐에 있는 패킷은 파이프라인의 모든 데이터노드로부터 ack 응답을 받아야 제거된다. 데이터를 쓰는 중에 데이터노드에 장애가 발생하면 다음과 같은 장애 복구 작업이 시작된다. 먼저 파이프라인이 닫히고 ack 큐에 있는 모든 패킷은 데이터 큐 앞쪽에 다시 추가된다. 이렇게 한면 다운스트림(downstream) 노드가 실패해도 패킷이 하나도 유실되지 않는다. 정상 데이터노드는 네임노드로부터 새로운 ID를 다시 받는다. 장애가 발생한 데이터노드가 나중에 다시 복구되면 불완전한 블록은 삭제된다. 장애 데이터노드는 파이프라인에서 제거되고, 정상인 나머지 두 데이터노드로 새로운 파이프라인을 구성한다. 블록의 남은 데이터는 파이프라인의 정상 데이터노드로 전송된다. 네임노드는 해당 블록이 불완전 복제라는 것을 인식하고 있으므로 나중에 다른 노드에 복제본이 생성되도록 조치한다. 이어서 후속 블록을 정상적으로 처리한다. 블록을 쓰는 중에 두 개 이상의 데이터노드에서 장애가 발생할 수 잇지만 그 가능성은 희박하다. dfs.namenode.replication.min(기본값은 1)에 설정된 복제소에만 블록이 저장되면 쓰기 작업은 성공한 것으로 간주된다. 그리고 목표 복제 계수(dfs.replication, 기본값은 3)에 도달할 때까지 클러스터에서 복제가 비동기적으로 수행된다.


  1. 데이터 쓰기를 완료할 때 클라이언트는 스트림에 close() 메소드를 호출한다. 이 메소드는 데이터노드 파이프라인에 남아 있는 모든 패킷을 플러시하고 승인이 나기를 기다린다.


  1. 모든 패킷이 완전히 전송되면 네임노드에 ‘파일 완료’ 신호를 보낸다. 네임노드는 DataStreamer를 통해 블록 할당 요청을 받았기 때문에 파일의 블록이 어떻게 구성되어 있는지 이미 알고 있으며, 최소한의 블록 복제가 완료되기를 기다렸다가 최종적으로 성공 신호를 반환한다.

복제본 배치

신뢰성과 쓰기 대역폭과 읽기 대역폭은 서로 트레이드오프 관계다. 예를 들어 쓰기 대역폭을 줄이기 위해 단일 노드에 모든 복제본을 배치하면(단일 노드에서 복제 파이프라인이 실행된) 실제로 중복성은 제공되지 않는다. 따라서 해당 노드에 장애가 발생하면 그 볼록의 데이터는 유실된다. 또한 외부 랙에서 일기를 수행할 때 읽기 대역폭이 증가하는 단점이 있다. 극단적인 사례지만, 서로 다른 데이터 센터에 복제본을 배치하면 중복성은 최대화시킬 수 있지만 대역폭 비용은 커진다. 오늘날 대부분의 하둡 클러스터가 운영되는 방식인 단일 데이터 센터 환경에서도 다양한 배치 전략이 나올 수 있다.


하둡의 전략은 첫 번째 복제본을 클라이언트와 같은 노드에 배치하는 것이다. 그런데 클라이언트가 클러스터 외부에 있으며(데이터노드가 아니면) 무작위로 노드를 선택한다. 물론 파일이 너무 많거나 바쁜 노드는 제외한다. 두 번째 복제본은 첫 번째 노드와 다른 랙(무작위)의 노드에 배치된다. 세 번째 복제본은 두 번째 노드와 같은 랙의 다른 노드에 배치된다. 그 이상의 복제본은 클러스터에서 무작위로 선택하여 배치한다. 물론 같은 랙에 너무 많은 복제본이 배치되지 않도록 설계 되어 있다. 네트워크 토폴로지를 고려하여 복제본의 위치를 선정한 후 파이프라인이 형성된다. 복제 계수가 3이면 아래와 같이 파이프라인이 형성된다.


하둡의 기본 전략은 신뢰성(블록을 두 랙에 저장), 쓰기 대역폭(쓰기는 하나의 네트워크 스위치만 통과), 읽기 성능(두 랙에서 가까운 랙을 선택), 그리고 클러스터 전반에 걸친 블록의 분산(클라이언트는 로컬 랙에 하나의 블록만 저장) 사이의 균형을 전체적으로 잘 맞추고 있다.


일관성 모델

파일에 대한 읽기와 쓰기의 데이터 가시성은 파일시스템의 일관성 모델로 설명할 수 있다. HDFS는 성능을 위해 일부 POSIX 요구 사항을 포기했다. 따라서 일부 연산은 우리가 기대했던 것과 다를 수 있다.


일단 파일을 생성하면 파일시스템의 네임스페이스에서 그 파일을 볼 수 있다.

Path p = new Path("p");
fs.create(p);
assertThat(fs.exists(p), is(true));


하지만 스트림이 플러시된 상황에서 파일에 저장된 내용을 바로 볼 수 잇는 것은 아니다. 따라서 파일의 길이는 0으로 나타낼 수 있다.

Path p =new Path("p");
OutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.flush();
assertThat(fs.getFileStatus(p).getLen(), is(0L));


일단 데이터가 한 블록 이상 기록되면 새로운 리더는 첫 번째 블록의 내용을 볼 수 있다. 이러한 규칙은 후속 블록에도 똑같이 적용된다. 즉, 쓰기 작업이 진행 중인 블록의 내용을 다른 리더가 볼 수 없다.


HDFS는 FSDataOutputStream의 hflush() 메소드를 통해 데이터노드에 있는 모든 버퍼의 내용이 플러시되도록 강제하는 방법을 제공한다. hflush()가 성공했다는 것은 파이프라인에 있는 모든 데이터노드가 파일의 특정 부분까지 데이터 쓰기를 완료했다는 것을 의미하며, 이제부터 새로운 리더는 파일의 내용을 볼 수 있다.

Path p = new Path("p");
FSDataOutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.hflush();
assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));


hflush()는 데이터노드가 디스크에 데이터를 썼다는 것을 보장하지 않는다는 점을 주의해야 한다. hflush()를 호출하면 데이터노드의 메모리에 데이터를 쓰기 때문에 데이터 센터의 전원이 중단되면 데이터 유실 가능성이 있다. 강한 신뢰성을 보장받기 위해서는 hflush() 대신 hsync() 메소드를 사용해야 한다.

hsync()의 작업 방식은 버퍼에 저장된 데이터를 파일 기술자가 커밋하는 POSIX의 fsync() 시스템 호출과 유사하다. 예를 들어 자바 API로 로컬 파일을 쓸 때 스트림을 flush()하고 동기화(sync() 호출)를 하고 나면 저장된 내용을 바로 확인할 수 있다.

FileOutputStream out = new FileOutputStream(localFile);
out.write("content".getBytes("UTF-8"));
out.flush(); // 운영 체제에 플러시
out.getFD().sync(); // 디스크에 동기화
assertThat(localFile.length(), is(((long) "content".length())));

HDFS의 파일을 닫는 것은 암묵적으로 hflush()를 수행하는 것이다.

Path p = new Path("p");
OutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.close();
assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));


애플리케이션 설계를 위한 결론

일관성 모델은 애플리케이션을 설계하는 방식에 영향을 준다. hflush() 나 hsync()를 호출하지 않은 상황에서 클라이언트나 시스템에 장애가 발생하면 데이터 블록을 잃어버릴 수 있다. 이를 용납하는 애플리케이션은 거의 없기 때문에 어느 정도 레코드나 바이트를 쓴 후 적절한 시점에 반드시 hflush()를 호출해야 한다. hflush() 연산을 사용하더라도 HDFS에 큰 부하가 발생하지 않도록 설계되어지만 약간의 오버헤드(hsync() 는 더 크다)는 있다. 데이터의 강건성과 처리량은 서로 트레이드 오프 관계에 있다. 적절한 균형을 잡는 것은 애플리케이션에 따라 다른다. 따라서 hflush()(또는 hsync())의 빈도수를 계속 다르게 하여 수행시간을 측정한 후 최종적으로 적절한 값을 선택하면 된다.