Hive UDAF

집계 함수는 정규 UDF보다 만들기 힘들다. 수 많은 태스크에 걸쳐 있는 청크에서 그 값을 모두 모아야 하기 때문이다. 따라서 부분 집계를 다시 최종 결과로 결함할 수 있는 기능을 반드시 구현해야 한다.

package com.hadoopbook.hive;

import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.io.IntWritable;

public class Maximum extends UDAF {
  public static class MaximumIntUDAFEvaluator implements UDAFEvaluator {
    private IntWritable result;

    public void init(){
      result = null;
    } 

    public boolean iterate(IntWritable value) {
      if(value == null) {
        return true;
      }
      if(result == null) {
        result = new IntWritable(value.get());
      }else{
        result.set(Math.max(result.get(), value.get()));
      }
      return true;
    }

    public IntWritable terminatePartial() {
      return result;
    }
  
    public boolean merge(IntWritable other) {
      return iterate(other);
    }

    public IntWritable terminate() {
      return result;
    }
  }
}

클래스 구조는 UDF와 조금 다르다. UDAF는 org.apache.hadoop.hive.ql.exec.UDAF의 서브 클래스를 정의해야 하고 org.apache.hadoop.hive.ql.exec.UDAFEvaluator를 구현한 하나 이상의 중첩된 정적 클래스를 반드시 포함해야 한다. 이 예제에는 MaximumIntUDAFEvaluator라는 중첩 클래스가 하나만 있다. 하지만 long 값이나 float 값 등의 집합에서 최댓값을 구하려면 재정의된 UDAF를 제공해야 하므로 MaximumLongUDAFEvaluator, MaximumFloatUDAFEvaluator 와 같은 평가자를 추가할 필요가 있다.


  • init() : 평가자를 초기화하고 내부 상태를 재설정한다. MaximumIntUDAFEvaluator에서는 최종 결과를 유지할 IntWritable 객체를 null로 설정한다. 아직 어떤 값도 집계되지 않았다는 의미로 null을 사용했는데, 이는 공집합에 대한 최댓값을 NULL로 설정하는 실질적인 효과가 있다.

  • iterate() : 집계할 값이 나올 때마다 호출된다. 평가자는 집계 결과와 함께 내부 상태를 갱신해야 한다. iterate()의 인자는 그 함수가 호출된 하이브 함수의 인자와 대응된다. 이 예제에서는 인자가 하나뿐이다. 먼저 인자의 값이 null인지 확인하여 null이면 무시한다. 그렇지 않고 값이 있을 때 처음으로 값을 처리하는 것이라면 result 인스턴스 변수에 그 값을 설정하고, 그 전에 하나 이상의 값이 처리되었으면 현재 결과와 그 값을 비교하여 더 큰 값을 설정한다. 입력값이 유효하면 true를 반화한다.

  • terminatePartial() : 하이브가 부분 집계 결과를 얻을 대 호출된다. 이 메서드는 집계 상태를 요약한 객체를 반환해야 한다. 이 예제에는 최댓값이나 처리된 값이 없으면 null을 반환하는 IntWritable이 하나 있다.

  • merge() : 하이브가 여러 개의 부분 집계를 결합할 때 호출된다. 이 메서드는 terminatePartial() 메서드의 반환 자료형과 대응하는 자료형을 가진 단일 객체를 가진다. 이 예제에서는 값을 집계하는 방식과 부분 집계를 처리하느 ㄴ방식이 같이 깨문에 단순히 iterate() 메서드로 위임했다. 이러한 방식은 일반적인 사례는 아니며(나중에 일반적인 예제를 다루겠다). 이 메서드는 평가자의 상태와 부분 집계의 상태를 결합하기 위한 로직을 구현해야 한다.

  • terminate() : 집계의 최종 결과를 얻을 때 호출된다. 평가자는 그 상태를 하나의 값으로 반환해야 한다.

CREATE TEMPORARY FUNCTION maximum AS 'com.hadoopbook.hive.Maximum';
SELECT maximum(temperature) FROM records;