티스토리 뷰
Streams DSL을 이용해 간단하게 스트림 프로세싱을 처리한다.
stream_log 토픽의 데이터를 stream_log_copy 토픽으로 전달을 해본다.
특정 토픽을 KStream 객체로 가져오려면 stream() 메서드 (source 프로세서)를 사용하고, KStream 객체의 데이터를 특정 토픽에 저장하려면 to() 메서드 (sync 프로세서)를 사용한다. 그리고 실제 streams application 실행은 start() 메서드를 사용한다.
우선 stream_log 토픽을 생성하고, 임의로 데이터를 몇 개 넣는다.
streams application을 실행하기 전, source 프로세서에서 사용하는 토픽은 미리 생성을 해야 한다.
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic stream_log --partitions 3
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic stream_log --property parse.key=true
>k1 v1
>k2 v2
>k3 v3
>k1 v11
>k2 v22
>k3 v33
SimpleStreamApplication.java
package org.example;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;
public class SimpleStreamApplication {
private static String APPLICATION_NAME = "streams-application";
private static String BOOTSTRAP_SERVER = "localhost:9092";
private static String STREAM_LOG = "stream_log";
private static String STREAM_LOG_COPY = "stream_log_copy";
public static void main(String[] args) {
Properties props = new Properties();
// streams application id 등록
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
// streams application과 연동할 클러스터 서버
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
// streams application에서 사용할 메시지 key, value 직렬화 종류
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// streams application에 사용할 topology 정의
StreamsBuilder builder = new StreamsBuilder();
// stream() 메서드로 KStream 객체 생성
KStream<String, String> stream = builder.stream(STREAM_LOG);
// to() 메서드로 메시지 보낼 토픽 지정
stream.to(STREAM_LOG_COPY);
// streams application 인스턴스 생성
KafkaStreams streams = new KafkaStreams(builder.build(), props);
// streams application 실행
streams.start();
}
}
streams application은 application id 값을 기준으로 병렬처리하기 때문에, 별도 애플리케이션 실행을 위해서는 다른 id를 지정해야 한다.
위 코드를 실행 후, 아래와 같이 topic list를 확인하면, 기존에 생성하지 않았던 stream_log_copy 토픽이 생성되어 있다.
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
__consumer_offsets
stream_log
stream_log_copy
test
stream_log_copy의 데이터를 조회하면 아래와 같이 기존 stream_log 토픽의 데이터가 정상적으로 옮겨졌음을 알 수 있다.
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic stream_log_copy --from-beginning
v2
v3
v1
v22
v33
v11
그리고 아래와 같이 추가적인 메시지를 생성하면, 생성하는데로 streams application에 의해 stream_log_copy 토픽으로 메시지를 전송한다.
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic stream_log --property parse.key=true
>123 123
>123 1234
>123 12345
>123 123456
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic stream_log_copy --from-beginning
v2
v3
v1
v22
v33
v11
123
1234
12345
123456
streams application을 종료하고 동일하게 stream_log 토픽에 메시지를 생성하면 아래와 같이 stream_log_copy에는 반영되지 않음을 알 수 있다. 따라서 KStream을 이용한 streams application이 정상 동작함을 알 수 있다.
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic stream_log
>qwe
>qwe
>qwe
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic stream_log_copy --from-beginning
v2
v3
v1
v22
v33
v11
123
1234
12345
123456
filter() 메서드를 이용해 특정 조건을 만족하는 메시지만 stream 처리를 할 수 있다.
filter() 메서드는 스트림 프로세서를 담당하게 되고, 파라미터로 함수형 인터페이스 Predicate를 받으면 된다.
package org.example;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;
public class SimpleStreamApplication {
private static String APPLICATION_NAME = "streams-filter-application";
private static String BOOTSTRAP_SERVER = "localhost:9092";
private static String STREAM_LOG = "stream_log";
private static String STREAM_LOG_COPY = "stream_log_filter";
public static void main(String[] args) {
Properties props = new Properties();
// streams application id 등록
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
// streams application과 연동할 클러스터 서버
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
// streams application에서 사용할 메시지 key, value 직렬화 종류
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// streams application에 사용할 topology 정의
StreamsBuilder builder = new StreamsBuilder();
// stream() 메서드로 KStream 객체 생성 (소스 프로세서)
KStream<String, String> stream = builder.stream(STREAM_LOG);
// 메시지 value 길이 5 이상인 것만 필터링 (스트림 프로세서)
KStream<String, String> filteredStream = stream.filter((k, v) -> {
return v.length() >= 5;
});
// to() 메서드로 메시지 보낼 토픽 지정 (싱크 프로세서)
// filteredStream = stream.filter(...).to(...) 로 간소화 가능
filteredStream.to(STREAM_LOG_COPY);
// streams application 인스턴스 생성
KafkaStreams streamApplication = new KafkaStreams(builder.build(), props);
// streams application 실행
streamApplication.start();
}
}
stream_log 토픽의 레코드 중 value 길이가 5 이상인 것만 필터링하여 stream_log_filter 토픽에 저장한다.
위 코드 실행 후, 아래와 같이 stream_log_filter 토픽을 확인하면, 기존의 메시지 중 value 길이 5이상인 메시지만 저장되어 있다.
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic stream_log_filter --from-beginning
12345
123456
그리고 아래와 같이 추가 메시지를 보내면 길이 5 이상인 메시지만 KStream으로 정상적으로 처리됨을 알 수 있다.
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic stream_log
>jkdfsnjkfnjkfdjknfgjkgnfjkfgjndk
>gfjdkgfjdkgjnfkdnjdfgjkgdfjnkgfdnjk
>...
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic stream_log_filter --from-beginning
12345
123456
jkdfsnjkfnjkfdjknfgjkgnfjkfgjndk
gfjdkgfjdkgjnfkdnjdfgjkgdfjnkgfdnjk
KStream, KTable join 하기
KStream과 KTable은 메시지의 key를 기준으로 메시지를 join한다. join을 위해 별도 db에 저장 후, 처리하는 방식이 아닌, 스트림을 통해 실시간으로 join된 데이터를 생성한다. 이를 통해 이벤트 기반 스트리밍 데이터 파이프라인을 구축한다.
join 시 중요한 것은 KStream과 KTable이 코파티셔닝 되어 있어야 한다. 즉, 동일한 직렬화 방식, 동일 파티션 개수, 동일 파티셔닝 전략 (키 해싱 방법)을 취해야 한다.
KTable은 table() 메서드로 객체를 생성할 수 있다.
주의할 점은 KTable은 동일 키 중, 가장 최신 레코드의 value를 기준으로 join을 한다.
address를 저장하고 있는 토픽을 KTable로 사용하고, item을 저장하고 있는 토픽을 KStream으로 사용하자.
(address는 잘 안 변하고, item은 자주 변하는 항목이므로)
package org.example;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import java.util.Properties;
public class SimpleStreamApplication {
private static String APPLICATION_NAME = "kstream-ktable-join-application";
private static String BOOTSTRAP_SERVER = "localhost:9092";
private static String KTABLE_ADDRESS = "address";
private static String KSTREAM_ITEM = "item";
private static String JOIN_STREAM = "join_stream";
public static void main(String[] args) {
Properties props = new Properties();
// streams application id 등록
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
// streams application과 연동할 클러스터 서버
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
// streams application에서 사용할 메시지 key, value 직렬화 종류
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// streams application에 사용할 topology 정의
StreamsBuilder builder = new StreamsBuilder();
// stream() 메서드로 KStream 객체 생성
// table() 메서드로 KTable 객체 생성
KStream<String, String> kstream = builder.stream(KSTREAM_ITEM);
KTable<String, String> ktable = builder.table(KTABLE_ADDRESS);
// join() 메서드로 join할 테이블 및 join된 메시지 지정 (key 기준 join)
KStream<String, String> joinedStream = kstream.join(ktable, (kstream_value, ktable_value) -> kstream_value + " send to " + ktable_value);
// to() 메서드로 메시지 보낼 토픽 지정
joinedStream.to(JOIN_STREAM);
// streams application 인스턴스 생성
KafkaStreams streams = new KafkaStreams(builder.build(), props);
// streams application 실행
streams.start();
}
}
그리고 KStream 객체는 불변성을 가지기 때문에 to() 메서드를 바로 사용하는 것이 아니라면, join()에 대한 추가 객체를 만들어 처리해야 한다.
(이것 때문에 왜 안 되지 싶었다.....)
먼저 address, item, join_stream 토픽을 코파티셔닝하게 생성하자. 파티션 개수는 3개로 지정하고, 나머지는 동일한 기본 옵션을 적용한다.
./bin/kafka-topics --create --bootstrap-server localhost:9092 --topic address --partitions 3
./bin/kafka-topics --create --bootstrap-server localhost:9092 --topic item --partitions 3
./bin/kafka-topics --create --bootstrap-server localhost:9092 --topic join_stream --partitions 3
아래와 같이 메시지를 생성하자.
address topic (KTable)
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic address --property "parse.key=true" --property "key.separator=:"
>name1:address1
>name1:address1-1
>name2:address2
>name2:address2-1
>name3:address3
item topic (KStream)
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic item --property "parse.key=true" --property "key.separator=:"
>name1:item1
>name1:item1-1
>name2:item2
애플리케이션 실행 후, 아래와 같이 join_stream 토픽을 확인한다.
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic join_stream --property "parse.key=true" --property
"key.separator=:" --from-beginning
item1 send to address1-1
item1-1 send to address1-1
item2 send to address2-1
name1, name2가 공통으로 가진 key이므로, name3에 대한 join은 발생하지 않는다. 그리고, KTable의 동일 key 중 가장 최신 레코드의 value인 address1-1, address2-1과 KStream의 모든 value가 join되었다.
KStream, GlobalKTable join하기
코파티셔닝되어 있지 않은 토픽을 join하는 경우,
- KTable 대신 GlobalKTable을 이용해 join
- KTable 대상 토픽을 리파티셔닝 (파티션 개수 증가 등)
위 2가지 방식을 사용할 수 있다.
GlobalKTable과의 join 테스트를 위해 파티션 2개인 global_address 토픽을 생성한다.
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic global_address --partitions 2
application code
package org.example;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;
public class SimpleStreamApplication {
private static String APPLICATION_NAME = "kstream-global-ktable-join-application";
private static String BOOTSTRAP_SERVER = "localhost:9092";
private static String GLOBAL_KTABLE_ADDRESS = "global_address";
private static String KSTREAM_ITEM = "item";
private static String JOIN_STREAM = "join_stream";
public static void main(String[] args) {
Properties props = new Properties();
// streams application id 등록
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
// streams application과 연동할 클러스터 서버
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
// streams application에서 사용할 메시지 key, value 직렬화 종류
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// streams application에 사용할 topology 정의
StreamsBuilder builder = new StreamsBuilder();
// stream() 메서드로 KStream 객체 생성
// table() 메서드로 KTable 객체 생성
KStream<String, String> kstream = builder.stream(KSTREAM_ITEM);
GlobalKTable<String, String> globalKtable = builder.globalTable(GLOBAL_KTABLE_ADDRESS);
// join() 메서드로 join할 테이블 및 join된 메시지 지정
// (key, value) -> key : KStream의 key, value 중 무엇을 join 할 key로 사용할 지 결정
// value 선택 시, GlobalKTable의 메시지 중 key가 KStream 메시지의 value와 일치하는 값을 대상으로 join
// 그리고 join된 key는 해당 value
KStream<String, String> joinedStream = kstream.join(globalKtable,
(key, value) -> key,
(kstreamValue, globalKtableValue) -> kstreamValue + " send to " + globalKtableValue
);
// to() 메서드로 메시지 보낼 토픽 지정
joinedStream.to(JOIN_STREAM);
// streams application 인스턴스 생성
KafkaStreams streams = new KafkaStreams(builder.build(), props);
// streams application 실행
streams.start();
}
}
GlobalKTable과 join하는 경우, KStream 메시지의 key, value 중 하나를 join할 key로 사용할 수 있다.
여기서는 key가 일치하는 경우로 선택을 하였다.
다만, GlobalKTable은 모든 레코드를 대상으로 일치성을 판단하기 때문에 join할 레코드가 적은 경우에만 사용하는 것이 권장된다.
아래와 같이 새로운 address, item을 생성한다.
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic global_address --property "parse.key=true" --property "key.separator=:"
>new_name1:new_address1
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic item --property "parse.key=true" --property "key.separator=:"
>new_name1:new_item1
join_stream에서 아래와 같이 결과를 확인할 수 있다.
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic join_stream --from-beginning
...
new_item1 send to new_address1
코파티셔닝되어 있지 않지만, 정상적으로 join이 됨을 알 수 있다.
'메모 > kafka' 카테고리의 다른 글
Kafka Connect 개념 (0) | 2025.01.12 |
---|---|
Processor API (0) | 2025.01.12 |
kafka streams (0) | 2025.01.07 |
admin api (0) | 2025.01.06 |
producer api (0) | 2025.01.04 |