티스토리 뷰

메모/kafka

Processor API

4567은 소수 2025. 1. 12. 17:14

Processor API 는 Streams DSL보다는 구현할 내용이 많지만 Streams DSL에서 제공되지 않는 상세 로직 작성 시 필요하다. 

 

아래는 메시지 value 중 길이가 5 이상인 것만 필터링하여 다른 토픽에 저장하는 애플리케이션 예시이다. 

 

FilterProcessor.java

package org.example;

import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.Processor;

// stream processor 생성하기 위해 Processor or Transformer 인터페이스 적용 필요
public class FilterProcessor implements Processor<String, String> {
    // ProcessorContext : 프로세서에 대한 정보를 처리 
    // stream 처리 중인 토폴로지 토픽 정보, 애플리케이션 id 조회, 프로세싱에 필요한 메서드 (schedule, forward, commit 등) 사용 
    private ProcessorContext context;

    // stream processor 생성자 
    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    // 실질적인 프로세싱 로직 
    // forward() : 로직 처리 후, 다음 토폴로지로 넘어가도록 함
    // 로직 처리 완료 후, commit() 으로 데이터 처리를 선언
    @Override
    public void process(String key, String value) {
        if(value.length() >= 5) {
            context.forward(key, value);
        }
        context.commit();
    }

    // 컨텍스트 종료 전 호출되는 메서드 
    // 프로세싱을 위해 사용했던 리소스 해제 등을 수행 
    @Override 
    public void close() {

    }
}

 

스트림 프로세서 class를 우선 정의해야 한다. 스트림 프로세서 생성을 위해서는 Processor나 Transformer 인터페이스를 적용하면 된다. 

 

여기서는 Processor 인터페이스를 사용해 토픽의 메시지 중 value 길이가 5이상인 것만 다른 토픽에 저장하도록 한다. 

 

주의할 점은 컨텍스트에서 처리된 내용을 커밋해야 데이터 저장 및 부모 노드에 처리 완료 커밋이 반영이 된다.

 

SimpleKafkaProcessor.java

package org.example;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

import java.util.Properties;

public class SimpleKafkaProcessor {

    private static String APPLICATION_NAME = "processor-application";
    private static String BOOTSTRAP_SERVER = "localhost:9092";
    private static String STREAM_LOG = "stream_log";
    private static String STREAM_LOG_FILTER = "stream_log_filter";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        Topology topology = new Topology(); // processor api 사용한 토폴로지 구성
        topology.addSource("Source", STREAM_LOG) // 소스 프로세서 가져옴. param[0] : 소스 프로세서 이름, param[1] : 대상 토픽
                .addProcessor("Process", 
                            () -> new FilterProcessor(),
                            "Source") // 스트림 프로세서 사용. param[0] : 스트림 프로세서 이름, param[1] : 사용자 정의 프로세서 인스턴스, param[2] : 토폴로지 상 부모 노드 
                .addSink("Sink", STREAM_LOG_FILTER, "Process"); // 데이터 저장. param[0] : 싱크 프로세서 이름, param[1] : 저장할 토픽 이름, param[2] : 토폴로지 상 부모 노드

        KafkaStreams stream = new KafkaStreams(topology, props);
        stream.start();
    }
}

 

Processor API 사용을 위해서는 토폴로지를 직접 구성해야 한다. 

- addSource : 소스 프로세서를 지정한다. 여기서는 stream_log 토픽을 소스로 지정한다.

첫 번째 파라미터인 "Source"는 소스 프로세서로 사용할 프로세서 이름을 지정

두 번째 파라미터는 실제 소스를 담당할 토픽을 지정

- addProcessor : 스트림 프로세서를 지정한다. 여기서는 필터링을 위해 선언한 FilterProcessor class를 스트림 프로세서로 사용한다. 

첫 번째 파라미터는 스트림 프로세서 이름 지정,

두 번째 파라미터는 스트림 프로세서로 사용할 사용자 정의 프로세서 인스턴스,

세 번째 파라미터는 스트림 처리에 사용될 토폴로지 상 부모 노드의 이름 (여기서는 소스 프로세서) 이 들어간다.

- addSink : 싱크 프로세서를 지정한다. 여기서는 필터링된 내용을 stream_log_filter 토픽에 저장하는 역할을 한다. 

 첫 번째 파라미터는 싱크 프로세서 이름 지정

두 번째 파라미터는 싱크될 토픽 지정

세 번째 파라미터는 토폴로지 상 부모 노드를 지정한다. 

 

아래와 같이 메시지를 생성하면 다음 결과를 얻는다. 

./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic stream_log
>123
>1234
>12345
>123456
>1234567
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic stream_log_filter --from-beginning
123456
12345
1234567

 

Processor API로 정상적으로 필터링되어 저장됨을 알 수 있다.

 

'메모 > kafka' 카테고리의 다른 글

kafka source connector, sink connector  (0) 2025.01.18
Kafka Connect 개념  (0) 2025.01.12
Streams DSL  (0) 2025.01.12
kafka streams  (0) 2025.01.07
admin api  (0) 2025.01.06
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
TAG
more
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함