티스토리 뷰

메모/kafka

kafka source connector, sink connector

4567은 소수 2025. 1. 18. 21:13

Source Connector : 소스 애플리케이션, 소스 파일로부터 데이터를 가져와 토픽으로 넣는 역할 

 

오픈소스로 제공되는 것도 많지만, 라이센스 문제, 커스텀 기능 등 구현이 필요한 경우도 많음 

Kafka 커넥트 라이브러리의 SourceConnector, SourceTask 클래스 이용해 구현을 하면 된다.

커넥터 빌드 후, jar 파일을 플러그인으로 추가하여 사용하면 된다. 

 

source connector 만들 때 필요한 라이브러리 : org.apache.kafka.connect-api 

 

SourceConnector class : 태스크 실행 전, 커넥터 설정 파일 초기화 및 어떤 태스크 클래스를 사용할 것인지 정의 

SourceTask class : 소스 애플리케이션, 파일로부터 데이터 가져와 토픽으로 데이터를 보내는 역할을 수행 

자체적인 오프셋을 설정하여 읽어오는 파일의 어느 부분까지 읽었는지 저장할 수 있음 

 

SourceConnector 구현을 위해서는 SourceConnector class 상속받아 아래 메서드를 구현해야 함

- version : 커넥터 버전 지정 

- start : 사용자가 json, config 파일 형태로 입력한 설정값을 초기화 (ex. JDBC 커넥션 URL 설정 및 검증) 

- taskClass : 커넥터가 사용할 태스크 클래스 지정 

- taskConfigs : 태스크 2개 이상인 경우, 태스크마다 다른 옵션 설정을 위해 사용 

- config : 커넥터가 사용할 설정값에 대한 정보를 받음. ConfigDef class를 통해 설정 이름, 기본값, 중요도, 설명 등을 정의할 수 있음

- stop : 커넥터 종료 시 필요한 로직 작성

 

SourceTask 구현을 위해서는 SourceTask class 상속받아 아래 메서드를 구현해야 함

- version : 태스크 버전 지정. SourceConnector.version() 과 동일한 버전으로 관리하는 게 좋음

- start : 태스크 시작 시 필요한 로직 작성. 태스크에 필요한 리소스를 start에서 초기화하면 좋음. (ex. JDBC 커넥션 생성)

- poll : 소스 애플리케이션, 파일로부터 데이터 읽어오는 로직 작성. 데이터를 읽어오면 토픽으로 보낼 데이터를 SourceRecord로 정의하여 List<SourceRecord>로 리턴하면 데이터가 토픽으로 전송됨. 

- stop : 태스크 종료 시 필요한 로직 작성. (ex. JDBC 커넥션 종료) 

 


test.txt의 파일을 읽어오는 source connector를 아래와 같이 구현한다. 

 

FileSourceConnector.java : SourceConnector class 상속받은 source connector, 실제 커넥터 기능을 수행할 애플리케이션

FileSourceConnectorConfig.java : AbstractConfig class 상속받아 config 관련된 내용을 작성. FileSourceConnector class에서 커넥터 관련 설정을 위해 사용 

FileSourceTask.java : SourceTask class 상속받은 실제 태스크 수행 클래스

 

FileSourceConnector.java

package org.example;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// SourceConnector 상속받아 커스텀된 FileSourceConnector 작성 
// 파일 하나를 읽어서 토픽에 전송하는 커넥터
// FileSourceConnector는 실제 커넥터에서 사용할 커넥트 이름 
public class FileSourceConnector extends SourceConnector {
    
    private final Logger logger = LoggerFactory.getLogger(FileSourceConnector.class);
    // 커넥터 설정값을 관리하는 용도 
    private Map<String, String> configProperties;

    @Override
    public String version() {
        // 커넥터 버전 지정 
        // task 버전과 일치하면서 관리하는게 좋음 
        return "1.0";
    }

    @Override 
    public void start(Map<String, String> props) {
        // 커넥터 생성 시 필요한 설정값 등록 
        // 설정은 FileSourceConnectorConfig 호출하여 등록 
        this.configProperties = props;
        try {
            new FileSourceConnectorConfig(props);
        } catch (ConfigException e) {
            throw new ConnectException(e.getMessage(), e);
        }
    }

    @Override 
    public Class<? extends Task> taskClass() {
        // 사용할 task 이름 등록 
        return FileSourceTask.class;
    }

    @Override 
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // task가 2개 이상인 경우, 각 태스크 별 설정값을 적용하기 위함 
        // 여기서는 동일하게 configProperties로 설정 
        List<Map<String, String>> taskConfigs = new ArrayList<>();
        Map<String, String> taskProps = new HashMap<>();
        
        taskProps.putAll(configProperties);
        for(int i = 0; i < maxTasks; i++) {
            taskConfigs.add(taskProps);
        }

        return taskConfigs;
    }

    @Override 
    public ConfigDef config() {
        // 커넥터에서 사용할 설정값 지정 
        return FileSourceConnectorConfig.CONFIG;
    }

    @Override 
    public void stop() {
        // 커넥터 종료 시 필요한 로직 
        // ex. jdbc 세션 종료 
    }
}

 

FileSourceConnectorConfig.java

package org.example;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

import java.util.Map;

// 파일을 읽어서 토픽에 메시지 보낼 커넥터를 위한 설정 
// FileSourceConnectorConfig 라는 이름으로 실제 커넥터에서 설정을 하면 됨
public class FileSourceConnectorConfig extends AbstractConfig {
    
    // 읽을 파일 경로 및 이름을 지정
    // 다른 class에서는 file 이라는 값으로 해당 파일을 읽음 
    public static final String DIR_FILE_NAME = "file";
    private static final String DIR_FILE_NAME_DEFAULT_VALUE = "/Users/hobin/test.txt";
    private static final String DIR_FILE_NAME_DOC = "읽을 파일 경로 및 이름";

    // 메시지를 보낼 토픽을 지정 
    // 다른 class에서는 topic 이라는 값으로 해당 토픽에 메시지를 보냄
    public static final String TOPIC_NAME = "topic";
    private static final String TOPIC_DEFAULT_VALUE = "custom-connector-test";
    private static final String TOPIC_DOC = "보낼 토픽";

    // ConfigDef를 이용해서 파일, 토픽을 지정한다. 
    public static ConfigDef CONFIG = new ConfigDef().define(
            DIR_FILE_NAME, Type.STRING, DIR_FILE_NAME_DEFAULT_VALUE, Importance.HIGH, DIR_FILE_NAME_DOC
        ).define(
            TOPIC_NAME, Type.STRING, TOPIC_DEFAULT_VALUE, Importance.HIGH, TOPIC_DOC
        );

    // AbstractConfig 이용해서 작성한 config를 적용한다.  
    public FileSourceConnectorConfig(Map<String, String> props) {
        super(CONFIG, props);
    }
}

 

FileSourceTask.java

package org.example;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// 실제 로직을 수행하는 task
public class FileSourceTask extends SourceTask {
    private Logger logger = LoggerFactory.getLogger(FileSourceTask.class);

    // 읽는 파일 이름, 읽은 지점을 오프셋 스토리지에 저장하기 위함
    public final String FILENAME_FIELD = "filename";
    public final String POSITION_FIELD = "position";

    // filename을 키, 실제 파일 이름을 value로 사용한다. 
    private Map<String, String> fileNamePartition;
    // 커넥터가 읽은 지점인 오프셋 저장하기 위함 
    private Map<String, Object> offset;
    private String topic;
    private String file;
    private long position = -1;


    @Override
    public String version() {
        // task 버전을 명시, connector 버전과 동일하게 관리하면 좋음 
        return "1.0";
    }

    @Override
    public void start(Map<String, String> props) {
        try {
            // task에 대한 초기 설정 
            // connector config에서 설정한 값을 가져온다. 
            FileSourceConnectorConfig config = new FileSourceConnectorConfig(props);
            topic = config.getString(FileSourceConnectorConfig.TOPIC_NAME);
            file = config.getString(FileSourceConnectorConfig.DIR_FILE_NAME);
            // 읽을 파일 및 오프셋을 설정한다. 
            fileNamePartition = Collections.singletonMap(FILENAME_FIELD, file);
            offset = context.offsetStorageReader().offset(fileNamePartition);


            if (offset != null) {
                // 현재까지 처리된 오프셋 정보를 가져온다. 
                Object lastReadFileOffset = offset.get(POSITION_FIELD);
                if (lastReadFileOffset != null) {
                    position = (Long) lastReadFileOffset;
                }
            } else {
                // 처리된 게 없으면 처음부터 진행 
                position = 0;
            }

        } catch (Exception e) {
            throw new ConnectException(e.getMessage(), e);
        }
    }

    @Override
    public List<SourceRecord> poll() {
        // 파일을 지속적으로 읽기 위한 실질적인 task 
        List<SourceRecord> results = new ArrayList<>();
        try {
            // 1초 단위로 파일 체크 
            Thread.sleep(1000);

            // position line 이후부터 읽기 
            List<String> lines = getLines(position);

            if (lines.size() > 0) {
                // 처리할 거 있을 시, 
                // SourceRecord 타입으로 읽은 내용 처리 
                lines.forEach(line -> {
                    Map<String, Long> sourceOffset = Collections.singletonMap(POSITION_FIELD, ++position);
                    SourceRecord sourceRecord = new SourceRecord(fileNamePartition, sourceOffset, topic, Schema.STRING_SCHEMA, line);
                    results.add(sourceRecord);
                });
            }
            // 읽은 내용 polling
            return results;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            throw new ConnectException(e.getMessage(), e);
        }
    }

    private List<String> getLines(long readLine) throws Exception {
        // 파일에서 해당 라인 이후부터 읽음 
        BufferedReader reader = Files.newBufferedReader(Paths.get(file));
        return reader.lines().skip(readLine).collect(Collectors.toList());
    }

    @Override
    public void stop() {
        // task 종료할 때 처리할 로직 
    }
}

 

카프카 커넥터를 플러그인으로 적용하기 위해서는 관련 dependencies가 모두 플러그인 디렉토리에 jar 형식으로 있어야 한다. 

 

하지만, 책에 나온데로 gradle 설정을 했지만, 버전 차이 때문인지, 아직 gradle에 익숙하지 않은건지, 빌드가 제대로 되지 않아, shadowJar이라는 오픈소스를 사용해 한 번에 jar 만드는 것으로 진행했다. 

 

build.gradle

/*
 * This file was generated by the Gradle 'init' task.
 *
 * This generated file contains a sample Java application project to get you started.
 * For more details on building Java & JVM projects, please refer to https://docs.gradle.org/8.12/userguide/building_java_projects.html in the Gradle documentation.
 */

plugins {
    // Apply the application plugin to add support for building a CLI application in Java.
    id 'application'
    id 'com.github.johnrengelman.shadow' version '8.1.1'
    id 'java'
}

repositories {
    // Use Maven Central for resolving dependencies.
    mavenCentral()
}

dependencies {
    implementation 'org.apache.kafka:connect-api:2.5.1'
    implementation 'org.slf4j:slf4j-simple:1.7.30'
}

// Apply a specific Java toolchain to ease working on different environments.
java {
    toolchain {
        languageVersion = JavaLanguageVersion.of(17)
    }
}

application {
    // Define the main class for the application.
    mainClass = 'org.example.FileSourceConnector'
}

shadowJar {
    archiveBaseName.set('kafka-file-source-connector')
    archiveVersion.set('0.0.1')
    archiveClassifier.set('all')
    mergeServiceFiles()
}

 

shadowJar 이용해 빌드 : gradlew clean shadowJar

 

빌드 후, app/build/libs/ 에 jar이 생성된 것을 알 수 있고, jar tf로 모든 디펜던시가 포함되어 있음을 확인할 수 있다. 

( ex. jar tf kafka-file-source-connector-0.0.1-all.jar )


source connector를 설정하기 위해서는 아래와 같이 connect와 connector properties 파일을 설정해주어야 한다. 

 

간단히 standalone 모드로 테스트를 위해 connect-standalone.properties 파일 중 아래 내용을 수정한다. 

...
plugin.path=/Users/hobin/plugins/kafka-file-source-connector

 

해당 properties 파일을 이용해 커넥트 실행 시, 플러그인 위치를 추가해주는 것이다. 각 플러그인마다 하위 디펜던시 설정 등이 있을 수 있으며, 커넥터가 플러그인 적용 시 디렉토리 단위로 파일을 검색하므로, 각 플러그인은 별도 디렉토리로 관리하는 것이 좋다. 

 

그리고 커스텀으로 만든 커넥터를 지정하기 위해 기본으로 있는 connect-file-source.properties를 참고해 my-source-connector.properties 파일을 추가로 만들고, 아래와 같이 설정하였다. 

 

my-source-connector.properties

name=my-source-connector
connector.class=org.example.FileSourceConnector
tasks.max=1
topic=custom-connector-test

 

name : 커넥터 이름 지정

connector.class : 실행될 커넥터 class. 앞서 커스텀하게 생성한 커넥터 클래스의 이름을 지정하면 된다. 

tasks.max : 생성할 최대 태스크 수 

topic : 데이터를 넘겨줄 토픽 이름. 생성한 커넥터에 지정한 토픽 이름으로 지정하면 된다. 

 

기타 옵션은 공식 docs에서 확인할 수 있다.

https://kafka.apache.org/documentation/#connectconfigs

 

Apache Kafka

Apache Kafka: A Distributed Streaming Platform.

kafka.apache.org

 

ㅁ 주의 사항

처음에는 ~를 기준으로 path 설정을 했는데 커넥터가 아무런 데이터를 읽어오지 않길래 모두 절대 경로로 바꾸니 해결되었다. 상대 경로 설정 시, 실행되는 디렉토리 기준으로 상대 경로를 인식하는 것은 알고 있었지만, ~도 인식하지 못하는 것은 몰랐다. path 설정 시에는 안전하게 절대 경로로 처리를 해주는 것이 좋다. 

 

standalone 모드로 위 설정을 이용해 커넥터 실행을 하면 다음 결과를 얻을 수 있다. 

./bin/connect-standalone.sh ./config/connect-test/connect-standalone.properties ./config/connect-test/my-source-connector.properties

 

실행 후, test.txt에 아무 말이나 입력하고, consumer로 확인 시 다음과 같이 토픽에 잘 저장됨을 확인할 수 있다. 

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic custom-connector-test
312
1231231
2321321
312
312

 


 

Sink Connector : 토픽의 데이터를 타겟 애플리케이션, 파일로 저장하는 역할 

SinkConnector, SinkTask class를 이용해 직접 구현이 가능하다. 

Source Connector와 마찬가지로 빌드 후 jar 파일을 플러그인으로 추가하여 사용할 수 있다. 

 

SinkConnector class : 태스크 실행 전, 사용자로부터 입력받은 설정값을 초기화하고, 어떤 태스크 클래스를 사용할 것인지 정의 

SinkTask : 실제 데이터 처리하는 로직을 담당, 커넥트에서 컨슈머 역할을 하고, 데이터를 저장하는 코드를 가짐 

 

SinkConnector 구현을 위해서는 SinkConnector class 상속 후 아래 메서드를 구현하면 된다. 

- version : sink connector 버전을 리턴

- start : 사용자가 json, config 파일 형태로 입혁한 설정값을 초기화 

- taskClass : 커넥터가 사용할 태스크 클래스를 지정

- taskConfigs : 태스크가 2개 이상인 경우, 태스크 별 옵션을 설정할 때 사용

- config : 커넥터가 사용할 설정값에 대한 정보를 처리. ConfigDef 클래스를 통해 설정의 이름, 기본값, 중요도, 설명 등을 정의

- stop : 커넥터가 종료될 때 필요한 로직을 작성 

 

SinkTask 구현을 위해서는 SinkTask class 상속 후 아래 메서드를 구현하면 된다. 

- version : 태스크의 버전을 지정, 커넥터의 version과 동일한 버전으로 관리하는 게 좋음. 

- start : 태스크 시작 시 필요한 로직을 작성. 데이터 처리에 필요한 리소스를 초기화하면 좋음. (ex. DB와의 커넥션 설정)

- put : 애플리케이션, 파일 등에 저장할 데이터를 토픽에서 주기적으로 가져오는 메서드. SinkRecord 타입을 여러 개 묶어 파라미터로 사용. SinkRecord는 토픽의 하나의 레코드를 의미하며, 토픽, 파티션, 타임스탬프 정보 등을 담고 있음.

- flush : put 메서드를 통해 가져온 데이터를 일정 주기로 싱크 애플리케이션, 싱크 파일에 저장할 때 사용하는 로직. put 메서드로 데이터를 토픽에서 가져와 DB에 insert하고, flush 메서드로 DB에 commit하는 역할

- stop : 태스크가 종료될 때 필요한 로직을 작성. 태스크에서 사용한 리소스를 종료하는데에 사용. 


위에서 작성한 Source Connector에서 test.txt를 custom-connector-test 토픽에 저장을 해두었다. 따라서 custom-connector-test 토픽에서 test2.txt로 데이터를 가져오도록 Sink Connector를 구현한다. 

 

SinkConnector와 SourceConnector의 내용은 대부분 유사하게 구성이 되며, 코드는 아래와 같다. 

 

FileSinkConnectorConfig.java

package org.example;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import java.util.Map;

// 토픽의 데이터를 읽어서 파일에 저장하는 커넥터를 위한 설정 
// FileSinkConnectorConfig 라는 이름으로 커넥터에서 설정을 하면 됨
public class FileSinkConnectorConfig extends AbstractConfig {

    // 저장할 파일 경로 및 이름 지정 
    public static final String DIR_FILE_NAME = "file";
    private static final String DIR_FILE_NAME_DEFAULT_VALUE = "/Users/hobin/test2.txt";
    private static final String DIR_FILE_NAME_DOC = "저장할 디렉토리와 파일 이름";

    // 커넥터에서 사용할 옵션을 지정
    // Source Connector와는 다르게, properties를 통해 지정한 토픽에서 데이터를 가져오므로 
    // 별도 토픽을 config에 지정하지 않아도 됨. 
    public static ConfigDef CONFIG = new ConfigDef().define(DIR_FILE_NAME,
                                                    Type.STRING,
                                                    DIR_FILE_NAME_DEFAULT_VALUE,
                                                    Importance.HIGH,
                                                    DIR_FILE_NAME_DOC);

    public FileSinkConnectorConfig(Map<String, String> props) {
        super(CONFIG, props);
    }
}

 

FileSinkConnector.java

package org.example;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkConnector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// 토픽에서 데이터를 읽어 파일에 저장하는 커넥터 
// FileSinkConnector는 실제 커넥트에서 사용할 커넥터 이름 
public class FileSinkConnector extends SinkConnector {

    // 커넥터 설정값을 관리
    private Map<String, String> configProperties;

    @Override
    public String version() {
        // 커넥터 버전 지정
        return "1.0";
    }

    @Override
    public void start(Map<String, String> props) {
        // 커넥터 생성 시 필요한 설정 초기화 
        // FileSinkConnectorConfig 내용으로 등록 
        this.configProperties = props;
        try {
            new FileSinkConnectorConfig(props);
        } catch (ConfigException e) {
            throw new ConnectException(e.getMessage(), e);
        }
    }

    @Override
    public Class<? extends Task> taskClass() {
        // 사용할 태스크 이름 등록 
        return FileSinkTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // 태스크 2개 이상 시 각 태스크 별 설정값 지정
        // 여기서는 동일하게 커넥터 설정값으로 지정
        List<Map<String, String>> taskConfigs = new ArrayList<>();
        Map<String, String> taskProps = new HashMap<>();
        taskProps.putAll(configProperties);
        for (int i = 0; i < maxTasks; i++) {
            taskConfigs.add(taskProps);
        }
        return taskConfigs;
    }

    @Override
    public ConfigDef config() {
        // 커넥터에서 사용할 설정값 지정
        return FileSinkConnectorConfig.CONFIG;
    }

    @Override
    public void stop() {
        // 커넥터 종료 시 필요한 로직 작성 
    }
}

 

FileSinkTask.java

package org.example;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;

// 실제 sink 관련 로직 수행할 태스크 
public class FileSinkTask extends SinkTask {
    // 커넥터 설정, 저장할 파일, 파일에 쓰기 위한 writer 스트림
    private FileSinkConnectorConfig config;
    private File file;
    private FileWriter fileWriter;

    @Override
    public String version() {
        // 버전 지정 
        return "1.0";
    }

    @Override
    public void start(Map<String, String> props) {
        // 태스크 초기 설정 
        // FileSinkConnectorConfig에서 설정한 값으로 지정
        try {
            config = new FileSinkConnectorConfig(props);
            file = new File(config.getString(config.DIR_FILE_NAME));
            fileWriter = new FileWriter(file, true);
        } catch (Exception e) {
            throw new ConnectException(e.getMessage(), e);
        }

    }

    @Override
    public void put(Collection<SinkRecord> records) {
        // 토픽에서 데이터 가져오는 로직 
        // SinkRecord : 토픽의 하나의 레코드에 대한 정보를 의미 
        try {
            for (SinkRecord record : records) {
                fileWriter.write(record.value().toString() + "\n");
            }
        } catch (IOException e) {
            throw new ConnectException(e.getMessage(), e);
        }
    }

    @Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // put으로 가져온 데이터를 실질적으로 반영하는 로직 (commit 역할)
        try {
            fileWriter.flush();
        } catch (IOException e) {
            throw new ConnectException(e.getMessage(), e);
        }
    }

    @Override
    public void stop() {
        // 태스크 종료 시 수행할 로직 
        // 여기서는 writer를 종료
        try {
            fileWriter.close();
        } catch (IOException e) {
            throw new ConnectException(e.getMessage(), e);
        }
    }
}

 

build.gradle

/*
 * This file was generated by the Gradle 'init' task.
 *
 * This generated file contains a sample Java application project to get you started.
 * For more details on building Java & JVM projects, please refer to https://docs.gradle.org/8.12/userguide/building_java_projects.html in the Gradle documentation.
 */

plugins {
    // Apply the application plugin to add support for building a CLI application in Java.
    id 'application'
    id 'com.github.johnrengelman.shadow' version '8.1.1'
    id 'java'
}

repositories {
    // Use Maven Central for resolving dependencies.
    mavenCentral()
}

dependencies {
    implementation 'org.apache.kafka:connect-api:2.5.1'
    implementation 'org.slf4j:slf4j-simple:1.7.30'
}

// Apply a specific Java toolchain to ease working on different environments.
java {
    toolchain {
        languageVersion = JavaLanguageVersion.of(17)
    }
}

application {
    // Define the main class for the application.
    mainClass = 'org.example.FileSinkConnector'
}

shadowJar {
    archiveBaseName.set('kafka-file-sink-connector')
    archiveVersion.set('0.0.1')
    archiveClassifier.set('all')
    mergeServiceFiles()
}

 

위 소스를 마찬가지로 shadowJar로 빌드 후, ~/plugins/kafka-file-sink-connector 디렉토리로 jar 파일을 카피하였다. 

 

그리고 저장될 파일인 ~/test2.txt 파일을 미리 생성하였다. 

 

Source Connector와 마찬가지로 standalone 모드로 간단히 테스트를 위해 아래와 같이 플러그인 경로를 추가하였으며, 기본 제공되는 connect-file-sink.properties를 이용해 my-sink-connector.properties를 생성하였다. 

 

connect-standalone.properties

...
plugin.path=/Users/hobin/plugins/kafka-file-source-connector,/Users/hobin/plugins/kafka-file-sink-connector

 

my-sink-connector.properties

name=my-sink-connector
connector.class=org.example.FileSinkConnector
tasks.max=1
file=/Users/hobin/test2.txt
topics=custom-connector-test

 

생성한 properties 파일을 바탕으로 custom-connector-test 토픽에 저장된 내용을 test2.txt에 저장시키면 다음과 같다. 

./bin/connect-standalone.sh ./config/connect-test/connect-standalone.properties ./config/connect-test/my-sink-connector.properties

 

cat test2.txt
1231231
2321321
312
312
312

 

파티션 별로 저장된 레코드를 가져오므로, 기존에 입력된 순서와는 다르지만, 전체 내용을 잘 가져왔음을 알 수 있다. 

 

'메모 > kafka' 카테고리의 다른 글

Kafka Connect 개념  (0) 2025.01.12
Processor API  (0) 2025.01.12
Streams DSL  (0) 2025.01.12
kafka streams  (0) 2025.01.07
admin api  (0) 2025.01.06
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
TAG
more
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함