티스토리 뷰

메모/kafka

kafka streams

4567은 소수 2025. 1. 7. 00:25

kafka streams : 토픽의 데이터를 stateful, stateless로 실시간 변환하여 다른 토픽에 적재하는 라이브러리 

- java 기반 streams 애플리케이션은 kafka 클러스터와 완벽하게 호환되며 (kafka에서 공식 제공하기 때문) 스트림 처리에 필요한 기능 (신규 토픽 생성, 상태 저장, 데이터 조인 등) 을 제공

- kafka 클러스터 운영 시, 실시간 스트림 처리를 해야 하는 경우, streams 애플리케이션으로 개발하는 것이 1순위 

 

streams 애플리케이션은 내부적으로 스레드를 1개 이상 생성하며, 스레드는 1개 이상의 태스크를 가짐 

태스크(task) : streams 애플리케이션 실행 시 생기는 데이터 처리 최소 단위

- streams는 병렬 처리를 위해 파티션과 streams 스레드 / 프로세스 개수를 늘림으로써 처리량을 늘릴 수 있다. 

- 실 운영 환경에서는 fault tolerance하기 위해 2개 이상의 서버로 구성된 streams 애플리케이션을 운영하는 것이 좋다. 

 

토폴로지 (topology) : kafka streams의 노드의 구성 관계 

프로세서 (processor) : 토폴로지를 이루는 노드 

- 소스 프로세서 : 데이터 처리를 위해 최초로 선언하는 노드. 하나 이상의 토픽에서 데이터를 가져오는 역할. 

- 스트림 프로세서 : 다른 프로세서가 반환한 데이터를 처리하는 역할. 변환, 분기처리 등 로직을 수행. 

- 싱크 프로세서 : 데이터를 특정 카프카 토픽으로 저장하는 역할. streams로 처리된 데이터의 최종 종착지. 

스트림 (stream) : 노드와 노드의 연결. 레코드와 동일 의미. 

 

streams DSL에서 제공하지 않는 기능은 프로세서 API를 이용해 구현이 가능하다. 

 

streams DSL로 구현하는 예시 

- 메시지 값을 기반으로 토픽 분기 처리 

- 지난 10분 간 들어온 데이터의 개수 집계

- 토픽과 다른 토픽의 결합으로 새로운 데이터 생성

 

프로세서 API로 구현하는 예시 

- 메시지 값 종류에 따라 토픽을 가변적으로 전송

- 일정 시간 간격으로 데이터 처리 

 

(참고 도서 : https://product.kyobobook.co.kr/detail/S000001804837)


streams DSL

 

streams DSL에는 레코드의 흐름을 추상화한 3가지 개념이 존재 (컨슈머, 프로듀서, 프로세서 API에는 사용되지 않는 개념) 

- KStream, KTable, GlobalKTable

 

KStream

- 레코드의 흐름을 표현한 것 

- 메시지 키와 값으로 구성 

- KStream으로 데이터 조회 시, 토픽에 존재하는 모든 레코드가 출력 

- 컨슈머로 토픽을 구독하는 것과 유사 

 

KTable

- 메시지 키를 기준으로 묶어서 사용

- 유니크한 메시지 키를 기준으로, 가장 최신 레코드를 사용하므로, 데이터 조회 시, 메시지 키를 기준으로 가장 최신에 추가된 레코드의 데이터가 출력 

- KTable로 선언된 토픽은 1개의 파티션이 1개의 태스크에 할당

 

GlobalKTable

- 메시지 키를 기준으로 묵어서 사용 

- GlobalKTable로 선언된 토픽은 모든 파티션이 각 태스크에 할당

- GlobalKTable 사용 예시 : KStream, KTable join 


KStream, KTable join 과정 

 

KStream, KTable을 join하려면 반드시 코파티셔닝(co-partitioning) 되어 있어야 함 

 

코파티셔닝 : 조인을 하는 2개의 토픽의 파티션 개수, 파티셔닝 전략을 동일하게 맞추는 작업 

- 파티션 개수가 동일, 파티셔닝 전략이 같은 경우 : 동일한 메시지 키를 가진 데이터가 동일 태스크에 들어가는 것을 보장함

- 각 태스크는 KStream의 레코드와 KTable의 메시지 키가 동일한 경우, 조인을 수행할 수 있음 

- 하지만, 토픽들이 코파티셔닝되어 있음을 보장할 수 없음 (코파티셔닝 되지 않은 2개의 토픽을 조인하면, TopologyException 에러 발생) 

 

따라서, 코파티셔닝되지 않은 KStream, KTable 을 조인하려면, 리파티셔닝(re-partitioning) 이 필요 

리파티셔닝 : 새로운 토픽에 새로운 메시지 키를 가지도록 재배열하는 과정 

- 리파티셔닝 진행 후, KStream, KTable 로 사용하는 토픽이 코파티셔닝되도록 할 수 있음 

- 하지만, 리파티셔닝 진행 시, 데이터 중복 생성 및 파티션 재배열을 위한 리소스가 필요함 

 

코파티셔닝되지 않은 KStream과 KTable을 조인하고 싶은 경우, GlobalKTable을 사용할 수 있음 

- KTable을 GlobalKTable로 선언하여 사용하면 됨

- GlobalKTable로 정의된 토픽은 모든 태스크에 동일하게 공유되기 때문

- 하지만, 각 태스크마다 GlobalKTable로 정의된 데이터를 저장하여 사용하기 때문에, 로컬 스토리지 사용량 및 네트워크, 브로커 부하가 발생함

- 따라서, 적은 데이터 양일 경우만, GlobalKTable을 이용해 조인하는 것이 좋음

- 많은 데이터 양인 경우, KStream, KTable에 대해 리파티셔닝을 직접 진행하는 것이 좋음 


streams DSL 주요 옵션

 

필수 옵션

- bootstrap.servers : 클러스터에 속한 브로커의 <호스트>:<포트>

- application.id : streams 애플리케이션을 구분하기 위한 고유 id. 서로 다른 애플리케이션 간에는 서로 다른 id를 가져야 함

 

선택 옵션

- default.key.serde : 레코드 메시지 키를 직렬화, 역직렬화하는 클래스를 지정. (default : 바이트 직렬화 : Serdes.ByteArray().getClass().getName() )

- default.value.serde : 레코드 메시지 값을 직렬화, 역직렬화하는 클래스를 지정. (default : 바이트 직렬화 : Serdes.ByteArray().getClass().getName() )

- num.stream.threads : stream 프로세싱 실행 시 실행되는 스레드 수. (default : 1) 

- state.dir : rocksDB 위치할 디렉토리 지정. (default : /tmp/kafka-streams) (실 운영 시, 해당 디렉토리 외의 별도 관리 디렉토리 사용 필요.)(tmp는 OS 설정에 따라 셧다운 시, 내부 내용 삭제될 수도 있음)

(rocksDB : 페이스북에서 만든 key-value DB, streams에서 기본으로 로컬 저장소로 사용한다.)

 

기타 옵션 : https://kafka.apache.org/documentation/#streamsconfigs

 

Apache Kafka

Apache Kafka: A Distributed Streaming Platform.

kafka.apache.org


 

 

'메모 > kafka' 카테고리의 다른 글

Processor API  (0) 2025.01.12
Streams DSL  (0) 2025.01.12
admin api  (0) 2025.01.06
producer api  (0) 2025.01.04
kafka 기본 개념  (0) 2025.01.04
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
TAG
more
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함