Kafka Connect example



This is a worked example for writing and registering a Kafka Connect sink connector.



1. Kafka Connect config


package org.dbsink.connect;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

import java.util.Map;

public class DIPSinkDBInsertConnectorConfig extends AbstractConfig {

    private static final String CONFIG_CONNECTION_GROUP = "config group";
    // Key name, documentation text, and display name for the custom sink property
    public static final String CONFIG_NAME_SINK_TOPIC_NAME = "json property name";
    public static final String CONFIG_DOCUMENTATION_SINK_TOPIC_NAME = "description of this property";
    public static final String CONFIG_DISPLAY_SINK_TOPIC_NAME = "";

    public static ConfigDef config() {
        ConfigDef config = new ConfigDef();

        config.define(
                CONFIG_NAME_SINK_TOPIC_NAME,
                Type.STRING,
                ConfigDef.NO_DEFAULT_VALUE,
                Importance.HIGH,
                CONFIG_DOCUMENTATION_SINK_TOPIC_NAME,
                CONFIG_CONNECTION_GROUP,
                1,
                ConfigDef.Width.LONG,
                CONFIG_DISPLAY_SINK_TOPIC_NAME
        );

        return config;
    }

    public DIPSinkDBInsertConnectorConfig(Map<String, String> props) {
        super(config(), props);
    }

}
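
For reference, a minimal usage sketch of the config class above (the value "dip-sink-topic" and the HashMap setup are made-up illustration, not part of the connector itself):


Map<String, String> props = new HashMap<>();
props.put(DIPSinkDBInsertConnectorConfig.CONFIG_NAME_SINK_TOPIC_NAME, "dip-sink-topic");

// The AbstractConfig constructor validates props against config();
// a missing required key (NO_DEFAULT_VALUE) throws ConfigException.
DIPSinkDBInsertConnectorConfig config = new DIPSinkDBInsertConnectorConfig(props);
String sinkTopic = config.getString(DIPSinkDBInsertConnectorConfig.CONFIG_NAME_SINK_TOPIC_NAME);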



2. Kafka Connect connector


package org.dbsink.connect;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkConnector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DIPSinkDBInsertConnector extends SinkConnector {

    private Map<String, String> configProperties;

    @Override
    public String version() {
        return "0.1";
    }

    @Override
    public void start(Map<String, String> props) {
        this.configProperties = props;
        try {
            new DIPSinkDBInsertConnectorConfig(props);
        } catch (ConfigException e) {
            throw new ConnectException(e.getMessage(), e);
        }
    }

    @Override
    public Class<? extends Task> taskClass() {
        return DIPSinkDBInsertTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Every task gets its own copy of the connector configuration.
        List<Map<String, String>> taskConfigs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            taskConfigs.add(new HashMap<>(configProperties));
        }
        return taskConfigs;
    }

    @Override
    public ConfigDef config() {
        return DIPSinkDBInsertConnectorConfig.config();
    }

    @Override
    public void stop() {

    }

}



3. Kafka Connect task


package org.dbsink.connect;

import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

import java.util.*;

/**
 * DIP image sink task class
 * @author chanhae oh
 */
public class DIPSinkDBInsertTask extends SinkTask {

    /**
     * Kafka config properties
     */
    Properties pub_config = new Properties();

    /**
     * Kafka Connect connector config
     */
    public DIPSinkDBInsertConnectorConfig config;

    /**
     * Sink name read from the custom connector property in start()
     */
    private String SINK_TOPIC_NAME;

    @Override
    public String version() {
        return "0.1.98";
    }


    @Override
    public void start(Map<String, String> props) {
        this.config = new DIPSinkDBInsertConnectorConfig(props);
        SINK_TOPIC_NAME = config.getString(DIPSinkDBInsertConnectorConfig.CONFIG_NAME_SINK_TOPIC_NAME);
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        if (records.isEmpty()) {
            return;
        }

        for (SinkRecord record : records) {
            // Extract each record's key and value as strings; the actual DB insert would go here.
            // String.valueOf guards against records with a null key or value.
            String keyData = String.valueOf(record.key());
            String value = String.valueOf(record.value());
        }
    }

    @Override
    public void stop() {
    }
}
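
The put() above only extracts each record's key and value. Below is a minimal sketch of what the actual insert step could look like with plain JDBC; the JDBC URL, credentials, and the table dip_sink_table(record_key, record_value) are assumptions, and it additionally needs java.sql.* and org.apache.kafka.connect.errors.RetriableException imports.


    // Sketch only: connection details and table/column names are assumptions.
    @Override
    public void put(Collection<SinkRecord> records) {
        if (records.isEmpty()) {
            return;
        }
        String sql = "INSERT INTO dip_sink_table (record_key, record_value) VALUES (?, ?)";
        try (Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/dip", "user", "password");
             PreparedStatement ps = conn.prepareStatement(sql)) {
            for (SinkRecord record : records) {
                ps.setString(1, String.valueOf(record.key()));
                ps.setString(2, String.valueOf(record.value()));
                ps.addBatch();
            }
            // Insert the whole batch in one round trip.
            ps.executeBatch();
        } catch (SQLException e) {
            // RetriableException tells Connect to retry this batch instead of killing the task.
            throw new RetriableException(e);
        }
    }


In a real task the connection would normally be opened once in start() and closed in stop() rather than per put() call.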



4. Kafka Connect connector registration JSON


{
        "name" : "sink name",
        "config" : {
                "connector.class" : "fully qualified connector class (Java package + class name)",
                "topics" : "Kafka topic(s) to read from",
                "topic.sink.name" : "custom property defined in the connector config class",
                "key.converter" : "org.apache.kafka.connect.storage.StringConverter",
                "value.converter" : "org.apache.kafka.connect.storage.StringConverter"
        }
}
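
Filled in with the classes above, a concrete request body might look like this (the connector name, topic names, and the topic.sink.name key are placeholders to adapt to the actual config class):


{
        "name" : "dip-db-insert-sink",
        "config" : {
                "connector.class" : "org.dbsink.connect.DIPSinkDBInsertConnector",
                "tasks.max" : "1",
                "topics" : "dip-source-topic",
                "topic.sink.name" : "dip-sink-topic",
                "key.converter" : "org.apache.kafka.connect.storage.StringConverter",
                "value.converter" : "org.apache.kafka.connect.storage.StringConverter"
        }
}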



5. Kafka Connect plugin directory update (worker restart)


netstat -anlp | grep 8083
kill -9 pid
/kafka/bin/connect-distributed.sh -daemon /kafka/conf/connect-distributed.properties
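
The distributed worker only loads connector jars from the directories listed in plugin.path, so the rebuilt jar has to be copied there before restarting. A minimal connect-distributed.properties sketch is shown below; the broker address, topic names, and plugin directory are assumptions to adapt.


# connect-distributed.properties (sketch; values are assumptions)
bootstrap.servers=localhost:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status
# Directory containing the connector jar built from the classes above
plugin.path=/kafka/plugins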



6. Kafka Connect connector registration


curl -X POST -H "Content-Type: application/json" localhost:8083/connectors -d @json
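
For example, assuming the registration JSON from step 4 is saved as dip-sink.json (the file name is arbitrary):


curl -X POST -H "Content-Type: application/json" localhost:8083/connectors -d @dip-sink.json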



7. Kafka Connect status check


curl -X GET localhost:8083/connectors | python -m json.tool
curl -X GET localhost:8083/connectors/<connector name>/status | python -m json.tool