사내에서 데이터 처리를 위해 pandas로 데이터 계산 및 csv 저장 등을 처리하고 있다. pandas가 이미 많이 발전된 라이브러리이지만, 기존 레거시 솔루션을 신규 프로젝트로 컨버팅하는 과정에서 병렬 처리, 비동기 요청에 대해 미지원 등의 이슈가 있어, 핵심적인 계산은 기존과 동일한 상태였다. (물론 컨버팅 과정에서 불필요한 로직 삭제, 불필요한 데이터 제거 등으로 최적화를 꽤 많이 시켰지만 아쉬움이 있는 상태였다.) 최근 kafka를 공부하다가 우연히 polars라는 pandas 대체 라이브러리를 알게 되어 간단히 테스트 결과를 정리하고자 한다. (polars 외에도 dask, modin 등의 pandas 대체 라이브러리 프로젝트가 많은 것도 알게 되었다. 하지만 기존 솔루션의 상황에 가장 적합..
Source Connector : 소스 애플리케이션, 소스 파일로부터 데이터를 가져와 토픽으로 넣는 역할 오픈소스로 제공되는 것도 많지만, 라이센스 문제, 커스텀 기능 등 구현이 필요한 경우도 많음 Kafka 커넥트 라이브러리의 SourceConnector, SourceTask 클래스 이용해 구현을 하면 된다.커넥터 빌드 후, jar 파일을 플러그인으로 추가하여 사용하면 된다. source connector 만들 때 필요한 라이브러리 : org.apache.kafka.connect-api SourceConnector class : 태스크 실행 전, 커넥터 설정 파일 초기화 및 어떤 태스크 클래스를 사용할 것인지 정의 SourceTask class : 소스 애플리케이션, 파일로부터 데이터 가져와 토픽..
카프카 커넥트 : 데이터 파이프라인 생성 시 반복 작업을 줄이고, 효율적인 전송을 위한 애플리케이션 파이프라인 구성 시, 매번 프로듀서, 컨슈머 애플리케이션을 개발하고 배포하는 것은 번거롭기 때문에 커넥트를 사용함 커넥터 (Connector) : 특정 작업 형태를 템플릿으로 만들어 둔 것. 커넥트는 커넥터를 실행함으로써 반복 작업을 줄임. 소스 커넥터 (Source Connector) : 일종의 프로듀서 역할. 특정 파일에서 데이터를 계속 읽어와 전송하거나, db에서 특정 스키마를 통해 지속적으로 읽어옴. 싱크 커넥터 (Sink Connector) : 일종의 컨슈머 역할. 특정 토픽에서 데이터를 가져와, 파일, db 등에 지속적으로 저장을 함. 미러메이커2 커넥터 (클러스터 간 토픽 미러링. S..
Processor API 는 Streams DSL보다는 구현할 내용이 많지만 Streams DSL에서 제공되지 않는 상세 로직 작성 시 필요하다. 아래는 메시지 value 중 길이가 5 이상인 것만 필터링하여 다른 토픽에 저장하는 애플리케이션 예시이다. FilterProcessor.javapackage org.example;import org.apache.kafka.streams.processor.ProcessorContext;import org.apache.kafka.streams.processor.Processor;// stream processor 생성하기 위해 Processor or Transformer 인터페이스 적용 필요public class FilterProcessor implements..
Streams DSL을 이용해 간단하게 스트림 프로세싱을 처리한다. stream_log 토픽의 데이터를 stream_log_copy 토픽으로 전달을 해본다. 특정 토픽을 KStream 객체로 가져오려면 stream() 메서드 (source 프로세서)를 사용하고, KStream 객체의 데이터를 특정 토픽에 저장하려면 to() 메서드 (sync 프로세서)를 사용한다. 그리고 실제 streams application 실행은 start() 메서드를 사용한다. 우선 stream_log 토픽을 생성하고, 임의로 데이터를 몇 개 넣는다. streams application을 실행하기 전, source 프로세서에서 사용하는 토픽은 미리 생성을 해야 한다. ./bin/kafka-topics.sh --create --..