티스토리 뷰
kafka streams : 토픽의 데이터를 stateful, stateless로 실시간 변환하여 다른 토픽에 적재하는 라이브러리
- java 기반 streams 애플리케이션은 kafka 클러스터와 완벽하게 호환되며 (kafka에서 공식 제공하기 때문) 스트림 처리에 필요한 기능 (신규 토픽 생성, 상태 저장, 데이터 조인 등) 을 제공
- kafka 클러스터 운영 시, 실시간 스트림 처리를 해야 하는 경우, streams 애플리케이션으로 개발하는 것이 1순위
streams 애플리케이션은 내부적으로 스레드를 1개 이상 생성하며, 스레드는 1개 이상의 태스크를 가짐
태스크(task) : streams 애플리케이션 실행 시 생기는 데이터 처리 최소 단위
- streams는 병렬 처리를 위해 파티션과 streams 스레드 / 프로세스 개수를 늘림으로써 처리량을 늘릴 수 있다.
- 실 운영 환경에서는 fault tolerance하기 위해 2개 이상의 서버로 구성된 streams 애플리케이션을 운영하는 것이 좋다.
토폴로지 (topology) : kafka streams의 노드의 구성 관계
프로세서 (processor) : 토폴로지를 이루는 노드
- 소스 프로세서 : 데이터 처리를 위해 최초로 선언하는 노드. 하나 이상의 토픽에서 데이터를 가져오는 역할.
- 스트림 프로세서 : 다른 프로세서가 반환한 데이터를 처리하는 역할. 변환, 분기처리 등 로직을 수행.
- 싱크 프로세서 : 데이터를 특정 카프카 토픽으로 저장하는 역할. streams로 처리된 데이터의 최종 종착지.
스트림 (stream) : 노드와 노드의 연결. 레코드와 동일 의미.
streams DSL에서 제공하지 않는 기능은 프로세서 API를 이용해 구현이 가능하다.
streams DSL로 구현하는 예시
- 메시지 값을 기반으로 토픽 분기 처리
- 지난 10분 간 들어온 데이터의 개수 집계
- 토픽과 다른 토픽의 결합으로 새로운 데이터 생성
프로세서 API로 구현하는 예시
- 메시지 값 종류에 따라 토픽을 가변적으로 전송
- 일정 시간 간격으로 데이터 처리
(참고 도서 : https://product.kyobobook.co.kr/detail/S000001804837)
streams DSL
streams DSL에는 레코드의 흐름을 추상화한 3가지 개념이 존재 (컨슈머, 프로듀서, 프로세서 API에는 사용되지 않는 개념)
- KStream, KTable, GlobalKTable
KStream
- 레코드의 흐름을 표현한 것
- 메시지 키와 값으로 구성
- KStream으로 데이터 조회 시, 토픽에 존재하는 모든 레코드가 출력
- 컨슈머로 토픽을 구독하는 것과 유사
KTable
- 메시지 키를 기준으로 묶어서 사용
- 유니크한 메시지 키를 기준으로, 가장 최신 레코드를 사용하므로, 데이터 조회 시, 메시지 키를 기준으로 가장 최신에 추가된 레코드의 데이터가 출력
- KTable로 선언된 토픽은 1개의 파티션이 1개의 태스크에 할당
GlobalKTable
- 메시지 키를 기준으로 묵어서 사용
- GlobalKTable로 선언된 토픽은 모든 파티션이 각 태스크에 할당
- GlobalKTable 사용 예시 : KStream, KTable join
KStream, KTable join 과정
KStream, KTable을 join하려면 반드시 코파티셔닝(co-partitioning) 되어 있어야 함
코파티셔닝 : 조인을 하는 2개의 토픽의 파티션 개수, 파티셔닝 전략을 동일하게 맞추는 작업
- 파티션 개수가 동일, 파티셔닝 전략이 같은 경우 : 동일한 메시지 키를 가진 데이터가 동일 태스크에 들어가는 것을 보장함
- 각 태스크는 KStream의 레코드와 KTable의 메시지 키가 동일한 경우, 조인을 수행할 수 있음
- 하지만, 토픽들이 코파티셔닝되어 있음을 보장할 수 없음 (코파티셔닝 되지 않은 2개의 토픽을 조인하면, TopologyException 에러 발생)
따라서, 코파티셔닝되지 않은 KStream, KTable 을 조인하려면, 리파티셔닝(re-partitioning) 이 필요
리파티셔닝 : 새로운 토픽에 새로운 메시지 키를 가지도록 재배열하는 과정
- 리파티셔닝 진행 후, KStream, KTable 로 사용하는 토픽이 코파티셔닝되도록 할 수 있음
- 하지만, 리파티셔닝 진행 시, 데이터 중복 생성 및 파티션 재배열을 위한 리소스가 필요함
코파티셔닝되지 않은 KStream과 KTable을 조인하고 싶은 경우, GlobalKTable을 사용할 수 있음
- KTable을 GlobalKTable로 선언하여 사용하면 됨
- GlobalKTable로 정의된 토픽은 모든 태스크에 동일하게 공유되기 때문
- 하지만, 각 태스크마다 GlobalKTable로 정의된 데이터를 저장하여 사용하기 때문에, 로컬 스토리지 사용량 및 네트워크, 브로커 부하가 발생함
- 따라서, 적은 데이터 양일 경우만, GlobalKTable을 이용해 조인하는 것이 좋음
- 많은 데이터 양인 경우, KStream, KTable에 대해 리파티셔닝을 직접 진행하는 것이 좋음
streams DSL 주요 옵션
필수 옵션
- bootstrap.servers : 클러스터에 속한 브로커의 <호스트>:<포트>
- application.id : streams 애플리케이션을 구분하기 위한 고유 id. 서로 다른 애플리케이션 간에는 서로 다른 id를 가져야 함
선택 옵션
- default.key.serde : 레코드 메시지 키를 직렬화, 역직렬화하는 클래스를 지정. (default : 바이트 직렬화 : Serdes.ByteArray().getClass().getName() )
- default.value.serde : 레코드 메시지 값을 직렬화, 역직렬화하는 클래스를 지정. (default : 바이트 직렬화 : Serdes.ByteArray().getClass().getName() )
- num.stream.threads : stream 프로세싱 실행 시 실행되는 스레드 수. (default : 1)
- state.dir : rocksDB 위치할 디렉토리 지정. (default : /tmp/kafka-streams) (실 운영 시, 해당 디렉토리 외의 별도 관리 디렉토리 사용 필요.)(tmp는 OS 설정에 따라 셧다운 시, 내부 내용 삭제될 수도 있음)
(rocksDB : 페이스북에서 만든 key-value DB, streams에서 기본으로 로컬 저장소로 사용한다.)
기타 옵션 : https://kafka.apache.org/documentation/#streamsconfigs
'메모 > kafka' 카테고리의 다른 글
Processor API (0) | 2025.01.12 |
---|---|
Streams DSL (0) | 2025.01.12 |
admin api (0) | 2025.01.06 |
producer api (0) | 2025.01.04 |
kafka 기본 개념 (0) | 2025.01.04 |