Hadoop 압축


파일 압축은 파일 저장 공간을 줄이고, 네트워크 또는 디스크로부터 데이터 전송을 고속화할 수 있는 두 가지 커다란 이점이 있다. 이러한 두 가지 이점은 대용량 데이터를 처리할 대 매우 중요하기 때문에 하둡에서 압축이 사용되는 방법을 주의 깊게 살펴봐야 한다.

다양한 특성을 가지고 있는 여러 종류의 압축 포맷, 도구, 알고리즘이 존재한다. 하둡에서 사용할 수 있는 일반적인 알고리즘이다.


압축 포맷 도구 알고리즘 파일 확장명 분할 가능
DEFLATE(1) N/A DEFLATE .deflate No
gzip gzip DEFLATE .gz No
bzip2 bzip2 bzip2 .bz2 Yes
LZO lzop LZO .lzo No(2)
LZ4 N/A LZ4 .lz4 No
Snappy N/A Snappy .snappy No

(1) DEFLATE는 표준 구현이 zlib인 압축 알 고리즘이다. gzip파일 포맷은 널리 이용되고 있지만 DEFLATE 포맷의 파일을 생성해주는 범용 명령행 도구는 없다. 주목할 점은 gzip 파일 표맷은 부가적인 헤더(header)와 푸터(footer)를 포함한 DEFLATE 파일 포맷이라는 것이다. .deflate 파일 확장명은 하둡의 관례다.
(2) 하지만 LZO 파일의 경우 전처리 과정에서 색인을 생성했다면 분할할 수 있다.

모든 압축 알고리즘은 압축과 해제가 빨라질수록 공간이 늘어나는 희생을 감수해야 하기 때문에 공간과 시간은 트레이드오프 관계에 있다. 보통 9개의 옵션(-1은 스피드 최적화, -9는 공간 최적화를 의미)을 제공함으로써 어느 정도 이러한 트레이드오프에 대한 제어를 가능하게 한다. 예를 들어 다음 명령은 가장 빠른 압축 메서드를 사용해서 file.gz 라는 압축 파일을 생성한다.

gzip -1 file

각 도구는 각기 다른 압축 특성이 있다. gzip은 일반적인 목적의 압축도구고, 공간/시간 트레이드오프의 중앙에 위치한다. bzip2는 gzip보다 더 효율적으로 압축하지만 대신 더 느리다. bzip2의 압축 해제 속도는 압축 속도보다 더 빠르지만 여전히 다른 포맷에 비해 더 느리다. 한편 LZO, LZ4, Snappy는 모두 속도에 최적화되었고 gzip보다 어느정도 빠르지만 압축 효율은 떨어진다. Snappy와 LZ5는 압축 해제 속도 측면에서 LZO보다 매우 빠르다.

분할 가능은 압축 포맷이 분할을 지원하는지 여부를 알려준다. 예를 들어 스트림의 특정 지점까지 탐색한 후 이후의 일부 지점으로부터 읽기를 시작할 수 있는지 알려준다. 분할 가능한 압축 포맷은 특히 맵리듀스에 적합하다.



코덱


코덱은 압축-해제 알고리즘을 구현한 것이다. 하둡에서 코덱은 CompressionCodec 인터페이스로 구현된다. 예를 들어 GzipCodec은 gzip을 위한 압축과 해제 알고리즘을 담고 있다.

</tbody> </table> LZO 라이브러리는 GPL 라이선스고 아파치 배포판에 포함되지 않을 수 있으므로 하둡 코덱은 http://code.google.com/p/hadoop-gpl-compression/http://github.com/kevinweil/hadoop-lzo 에서 별도로 내려받아야 한다. LzopCodec은 lzop 도구와 호환되고 핵심적인 LZO 포맷에 부가적인 헤더가 포함된 것으로 일반적으로 사용자가 원하는 형태다. 또한 순수한 LZO 포맷을 위한 LzoCodec이 있는데, .lzo_deflate 파일 확장명(DEFLATE로 유축해봤을 때 헤더 없는 gzip 포맷이다)을 사용한다.

# CompressionCodec 을 통한 압축 및 해제 스트림
CompressionCodec은 데이터를 쉽게 압축하거나 해제해주는 두 개의 메서드를 제공한다. 출력 스트림에 쓸 데이터를 압축하려면 createOutputStream(OutputStreamout out) 메서드를 사용하는데, 이 메서드는 압축되지 않은 데이터를 압축된 형태로 내부 스트림에 쓰는 CompressionOutputStream을 생성한다 반대로 입력 스트림으로부터 읽어 들인 데이터를 압축 해제하려면 createInputStream(InputStream in) 메서드를 호출하는데, 미 메서드는 기존 스트림으로부터 비압축 데이터를 읽을 수 있는 CompressioninputStream을 반환한다. CompressionOutputStream과 CompressionInputStream은 기존 압축기 또는 압축 해제기를 재설정할 수 잇는 기능을 제공하는 것을 제외하면 각가 java.util.zip.DeflaterOutputStream, java.util.zip.DeflaterInputStream과 비슷하다. SequenceFile과 같이 데이터 스트림의 구간을 별개의 블록으로 압축하는 애플리케이션에서 사용되는 중요한 메서드다. ```java public class StreamCompressor { public static void main(String[] args) throw Exception { String codecClassname = args[0]; Class codeClass = Class.forName(codecClassname); Configuration conf = new Configuration(); CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf); CompressionOutputStream out = codec.createOutputStream(System.out); IOUtils.copyBytes(System.in, out, 4096, false); out.finish(); } } ``` 애플리케이션의 첫 번째 명령행 인자는 CompressionCodec 패키지의 정규화된 전체 이름이다. 그 코덱의 새로운 인스턴스를 생성하기 위해 RevflectionUtils를 사용하고 System.out 의 압축 래퍼를 얻는다. 그리고 입력을 출력으로 복사하기 위해 IOUtils의 copyBytes() 유틸리티 메서드를 호출하면 CompressionOutputStream으로 압축한다. 마지막으로 CompressionOutputStream의 finish()를 호출하고, 압축 도구에 압축된 스트림 쓰기 중단을 요청하지만 스트림 자체를 닫지는 않는다. 이를 다음과 같이 명령행에서 시험해볼 수 있는데, 이 예제는 GzipCodec으로 StreamCompressor를 사용해서 'Text' 문자열을 압축하고 gunzip을 사용하여 표준 입력으로부터 압축 해제 한다. ```bash echo "Text" | hadoop StreamCompressor org.apache.hadoop.io.copmress.GzipCodec | gunzip - ```

# CompressionCodecFactory를 사용하여 CompressionCodec 유추하기
압축된 파일을 읽을 때 일반적으로 해당 파일 확장명을 보면 사용된 코덱을 유축할 수 있다 예를 들어 .gz 로 끝나는 파일은 GzipCodec으로 읽을 수 있다. CompressionCodecFactory의 getCodec() 메소드는 지정된 파일에 대한 Path 객체를 인자로 받아 파일 확장명에 맞는 CompressionCodec을 찾아준다. ```java public class FileDecompressor { public static void main(String[] args) throws Exception { String uri = args[0]; Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(URI.create(uri), conf); Path inputPath = new Path(uri); CompressionCodecFactory factory = new CompressionCodecFactory(conf); CompressionCodec codec = factory.getCodec(inputPath); if(codec == null){ System.err.println("No codec found for " + uri); System.exit(1); } String outputUri = CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension()); InputStream in = null; OutputStream out = null; try{ in = codec.createInputStream(fs.open(inputPath)); out = fs.create(new Path(outputUri)); IOUtils.copyBytes(in, out, conf); } finally { IOUtils.closeStream(in); IOUtils.closeStream(out); } } } ``` 일단 코덱을 찾았으면 출력 파일의 이름을 생성하기 위해 CompressionCodecFactory의 removeSuffix() 정적 메서드로 파일 접미사를 제거한다. 따라서 다음처럼 프로그램을 호출하면 file.gz로 명명된 파일은 file로 압축 해제된다. ``` hadoop FileDecompressor file.gz ``` CompressionCodecFacgtory는 LZO 제외 코덱과 io.compression.codecs 속성에 나열된 코덱을 불러온다. 기본적으로 io.compression.codecs 속성은 비어 있다. 따라서 새롭게 등록하는 커스텀 코덱(외부 로딩이 필요한 LZO 코덱과 같은)을 얻고자 할 때만 환경 속성을 변경할 필요가 있다. 각 코덱은 기본 파일 확장명을 알고 있으므로 CompressionCodecFactory가 주어진 확장과 일치하는 코덱을 찾기 위해 등록된 코덱 전체를 검색한다.
압축 포맷 하둡 압축 코덱
DEFLATE org.apache.hadoop.io.compress.DefaultCodec
Gzip org.apache.hadoop.io.compress.GzipCodec
bzip2 org.apache.hadoop.io.compress.Bzip2Codec
LZO org.apache.hadoop.io.compress.LzopCodec
LZ4 org.apache.hadoop.io.compress.Lz4Codec
Snappy org.apache.hadoop.io.compress.SnappyCodec
속성명 타입 기본값 설명
io.compression.codecs 콤마로 구분된 클래스 이름 압축 및 해제를 위해 추가하고자 하는 CompressionCodec 클래스 목록


# 원시 라이브러리
성능 관점에서 압축과 해제를 위해 원시 라이브러리를 사용하는 것이 바람직하다. 예를 들어 원시 gzip 라이브러리를 사용하여 테스트해본 결과 압축 해제 성능은 최대 50%, 압축 성능은 거의 최대 10% 정도 더 좋아졌다(내장된 자바 구현과 비교하여). 각 압축 포맷에 대한 자바와 원시 구현제의 사용 가능 여부를 보여준다. 모든 포맷은 원시 구현체를 가지고 있지만 모든 포맷이 자바 구현체를 가지고 있는 것은 아니다(ex. LZO).
압축 포맷 자바 구현체 원시 구현체
DEFLATE Yes Yes
gzip Yes Yes
bzip2 Yes Yes
LZO No Yes
LZ4 No Yes
Snappy No Yes

하둡은 lib/native 디렉터리에 미리 빌드된 64비트 리눅스용 원시 압축 라이브러리인 libhadoop.so 를 제공한다. 다른 플랫폼은 소스 최상단의 BUILDING.txt 문서의 지시에 따라 직접 라이브러리를 컴파일해야 한다. 원시 라이브러리는 java.library.path 라는 자바 시스템 속성에 따라 선택된다. etc/hadoop 디렉터리에 있는 hadoop 스크립트가 이 속성을 직접 설정해주지만, 이 스크립트를 사용하지 않는다면 애플리케이션에서 설정할 수 도 있다. 기본적으로 하둡은 자신이 수행되는 플랫폼에 맞는 원시 라이브러리를 먼저 찾고, 있으면 자동으로 해당 라이브러리를 로드한다. 이것은 원시 라이브러리를 사용하기 위해 어떠한 환경 설정도 변경할 필요가 없음을 의미한다. 그러나 압축 관련 문제를 디버깅하는 것과 같은 상황에서는 원시 라이브러리의 사용을 비활성화하고 싶을 수도 있다. 이를 위해 io.native.lib.available 속성을 false로 설정하면 되는데, 이때 내장되어 있는 자바와 동등한 코덱 라이브러리가 사용된다. 코덱 풀. 만일 원시 라이브러리를 사용하고 애플리케이션에서 상당히 많은 압축 도는 해제 작업을 수행해야 한다면 압축기와 해제기를 재사용해서 객체 생성 비용을 절감할 수 있는 CodecPool의 사용을 고려하는 것이 좋다. 코드에서 API를 사용하는 예제를 볼 수 있다. 다만 단일 압축기(Compressor)만 생성하기 때문에 사실 코덱 풀을 사용할 필요는 없다. ```java public class PooledStreamCompressor { public static void main(String[] args) throws Exception { String codecClassname = args[0]; Class codecClass = Class.forName(codecClassname); Configuration conf = new Configuration(); CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf); Compressor compressor = null; try { compressor = CodecPool.getCompressor(codec); CompressionOutputStream out = code.createOutputStream(System.out, compressor); IOUtils.copyBytes(System.in, out, 4096, false); out.finish(); } finally { CodecPool.returnCompressor(compressor); } } } ``` 주어진 CompressionCodec으로 풀에서 Compressor 인스턴스를 얻고, 그 코덱을 재정의한 createOutputStream() 메서드에서 이 인스턴스를 사용한다. Finally 블록을 사용하여 스트림 간에 바이트를 복사하는 과정에서 IOException이 발생해도 압축기 인스턴스가 풀로 반환된도록 보장한다. # 압축과 입력 스플릿 맵리듀스로 처리되는 데이터를 어떻게 압축할지 고민하는 시점에 압축 포맷이 분할을 지원하는지 여부를 알고 있는 것은 중요하다. HDFS에 1GB 크기로 저장된 비압축 파일을 고려했을 때 128MB 크기의 HDFS 블록 8개가 저장되어 있을 때 이 파일을 입력으로 사용하는 맵리듀스 잡은 개별적인 맵 태스크에서 독립적으로 처리되는 8개의 입력 스플릿을 생성할 것이다. 파일이 압축된 크기가 1GB인 단일 gzip 압축 파일이라고 가정했을 때 이전처럼 HDFS는 그 파일을 8개의 블록으로 저장할 것이다. 그러나 gzip 스트림이 특정 위치에서 읽기를 지원하지 않기 때문에 각 블록별로 스플릿을 생성할 수 없다. 그러므로 맵 태스크가 각 블록 스플릿을 개별적으로 읽는 것은 불가능하다. gzip 포맷은 압축된 데이터를 저장하기 위해 DEFLATE를 사용하고, DEFLATE는 데이터를 일련의 압축된 블록으로 저장한다. 리더가 다음 블록의 시작으로 이동하려면 스트림과 동기화되어 그 스트림의 특정 지점에 있을 수 있는 어떤 방법을 지원해야 하는데, DEFLATE 압축 방식은 각 블록의 시작점을 구별할 수 없다는 문제가 있다. 이러한 이유로 gzip은 분할을 지원하지 않는다. 이 때 맵리듀스는 입력이 gzip 압축(파일 확장명을 통해)이고, gzip은 분할을 지원하지 않는다는 것을 인식하기 때문에 파일을 분할하려 하지 않으면서 최적의 방식으로 작동할 것이다. 이것은 제대로 동작하긴 하지만 지역성 비용이 발생한다. 즉, 단일 맵이 8개의 HDFS 블록을 모두 처리해야 하는데, 블록 대부분은 맵의 로컬에 있지 않을 것이다. 또한 소수의 맵으로 잡이 덜 세분화되기 때문에 결국 시간이 더 많이 걸릴 것이다. 이 상황에서 압축 파일이 LZO라면 내부의 압축 포맷은 스트림과 동기화되는 방법을 리더에 제공하지 않기 때문에 같은 문제가 생길 것이다. 하지만 하둡 LZO 라이브러리에 포함된 색인 도구를 사용해서 LZO 파일을 전처리할 수 있다. 색인 도구는 스플릿 지점의 색인을 구축하고 적당한 맵리듀스 입력 포맷으로 사용되어 파일을 효율적으로 분할할 것이다. 반면 bzip2 파일은 블록 사이에서 동기화 표시자(synchronization marker)를 제공(파이의 48비트 근사치)하므로 분할을 지원한다. > 압축 포맷 선택 >
1. 압축과 분할 모두를 지원하는 시퀀스 파일, 에이브로, ORCFile, 파케이 같은 컨테이너 파일 포맷을 사용하면 보통 LZO, LZ4, Snappy와 같은 빠른 압축 형식을 사용한다. >
2. 상당히 느리긴 하지만 bzip2 같이 분할을 지원하는 압축 포맷을 사용하거나 분할을 지원하기 위해 색인 될 수 있는 LZO 같은 포맷을 사용한다. >
3. 애플리케이션에서 파일을 청크로 분할하고, 지원되는 모든 압축 포맷(분할에 관계없이)을 사용하여 각 청크를 개별적으로 압축하라. 이 경우 압축된 청크가 거의 HDFS 블록 하나의 크기가 되도록 청크 크기를 선택해야 한다. >
4. 파일을 압축하지 말고 그냥 저장하는 방법도 있다. 파일의 크기가 매우 크면 전체 파일에 대한 분할을 지원하지 않는 압축 포맷은 권장하지 않는다. 그이유는 지역성을 보장하지 않으면 상당히 비효율적인 맬리듀스 애플리케이션이 되기 때문이다.

# 맵리듀스에서 압축 사용하기
맵리듀스 잡의 출력을 압축하려면 잡 환경 설정에서 mapreduce.output.fileoutputformat.compress 속성을 true로 설정하고, 사용할 압축 코덱의 클래스 이름을 mapreduce.output.fileoutputformat.compress.codec 속성에 지정한다. FileOutputFormat의 정적 평의 메소드로 그 속성을 설정할 수 있다. > 압축된 출력을 생성하는 최고 기온 잡을 수행하는 어플리케이션 ```java public class MaxTemperatureWithCompression { public static void main(String[] args) throws Exception { if (args.length != 2){ System.err.println("Usage: MaxTemperatureWithCompression " + ""); System.exit(-1); } Job job = new Job(); job.setJarByClass(MaxTemperature.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileOutputFormat.setCompressOutput(job, true); FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); job.setMapperClass(MaxTemperatureMapper.class); job.setCombinerClass(MaxTemperatureReducer.class); job.setReducerClass(MaxTemperatureReducer.class); System.exit(job.waitForCompletion(true) ? 0 : 1); } } ``` 출력과 동일한 압축 포맷을 사용했지만 꼭 같을 필요는 없다. ```bash hadoop MaxTemperatureWithCompression input/sample.txt.gz output ```