티스토리 뷰
Processor API 는 Streams DSL보다는 구현할 내용이 많지만 Streams DSL에서 제공되지 않는 상세 로직 작성 시 필요하다.
아래는 메시지 value 중 길이가 5 이상인 것만 필터링하여 다른 토픽에 저장하는 애플리케이션 예시이다.
FilterProcessor.java
package org.example;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.Processor;
// stream processor 생성하기 위해 Processor or Transformer 인터페이스 적용 필요
public class FilterProcessor implements Processor<String, String> {
// ProcessorContext : 프로세서에 대한 정보를 처리
// stream 처리 중인 토폴로지 토픽 정보, 애플리케이션 id 조회, 프로세싱에 필요한 메서드 (schedule, forward, commit 등) 사용
private ProcessorContext context;
// stream processor 생성자
@Override
public void init(ProcessorContext context) {
this.context = context;
}
// 실질적인 프로세싱 로직
// forward() : 로직 처리 후, 다음 토폴로지로 넘어가도록 함
// 로직 처리 완료 후, commit() 으로 데이터 처리를 선언
@Override
public void process(String key, String value) {
if(value.length() >= 5) {
context.forward(key, value);
}
context.commit();
}
// 컨텍스트 종료 전 호출되는 메서드
// 프로세싱을 위해 사용했던 리소스 해제 등을 수행
@Override
public void close() {
}
}
스트림 프로세서 class를 우선 정의해야 한다. 스트림 프로세서 생성을 위해서는 Processor나 Transformer 인터페이스를 적용하면 된다.
여기서는 Processor 인터페이스를 사용해 토픽의 메시지 중 value 길이가 5이상인 것만 다른 토픽에 저장하도록 한다.
주의할 점은 컨텍스트에서 처리된 내용을 커밋해야 데이터 저장 및 부모 노드에 처리 완료 커밋이 반영이 된다.
SimpleKafkaProcessor.java
package org.example;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import java.util.Properties;
public class SimpleKafkaProcessor {
private static String APPLICATION_NAME = "processor-application";
private static String BOOTSTRAP_SERVER = "localhost:9092";
private static String STREAM_LOG = "stream_log";
private static String STREAM_LOG_FILTER = "stream_log_filter";
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
Topology topology = new Topology(); // processor api 사용한 토폴로지 구성
topology.addSource("Source", STREAM_LOG) // 소스 프로세서 가져옴. param[0] : 소스 프로세서 이름, param[1] : 대상 토픽
.addProcessor("Process",
() -> new FilterProcessor(),
"Source") // 스트림 프로세서 사용. param[0] : 스트림 프로세서 이름, param[1] : 사용자 정의 프로세서 인스턴스, param[2] : 토폴로지 상 부모 노드
.addSink("Sink", STREAM_LOG_FILTER, "Process"); // 데이터 저장. param[0] : 싱크 프로세서 이름, param[1] : 저장할 토픽 이름, param[2] : 토폴로지 상 부모 노드
KafkaStreams stream = new KafkaStreams(topology, props);
stream.start();
}
}
Processor API 사용을 위해서는 토폴로지를 직접 구성해야 한다.
- addSource : 소스 프로세서를 지정한다. 여기서는 stream_log 토픽을 소스로 지정한다.
첫 번째 파라미터인 "Source"는 소스 프로세서로 사용할 프로세서 이름을 지정
두 번째 파라미터는 실제 소스를 담당할 토픽을 지정
- addProcessor : 스트림 프로세서를 지정한다. 여기서는 필터링을 위해 선언한 FilterProcessor class를 스트림 프로세서로 사용한다.
첫 번째 파라미터는 스트림 프로세서 이름 지정,
두 번째 파라미터는 스트림 프로세서로 사용할 사용자 정의 프로세서 인스턴스,
세 번째 파라미터는 스트림 처리에 사용될 토폴로지 상 부모 노드의 이름 (여기서는 소스 프로세서) 이 들어간다.
- addSink : 싱크 프로세서를 지정한다. 여기서는 필터링된 내용을 stream_log_filter 토픽에 저장하는 역할을 한다.
첫 번째 파라미터는 싱크 프로세서 이름 지정
두 번째 파라미터는 싱크될 토픽 지정
세 번째 파라미터는 토폴로지 상 부모 노드를 지정한다.
아래와 같이 메시지를 생성하면 다음 결과를 얻는다.
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic stream_log
>123
>1234
>12345
>123456
>1234567
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic stream_log_filter --from-beginning
123456
12345
1234567
Processor API로 정상적으로 필터링되어 저장됨을 알 수 있다.
'메모 > kafka' 카테고리의 다른 글
kafka source connector, sink connector (0) | 2025.01.18 |
---|---|
Kafka Connect 개념 (0) | 2025.01.12 |
Streams DSL (0) | 2025.01.12 |
kafka streams (0) | 2025.01.07 |
admin api (0) | 2025.01.06 |