티스토리 뷰
Spring Kafka : 카프카를 스프링 프레임워크에서 효과적으로 사용할 수 있도록 만든 라이브러리
(ex. 컨슈머 멀티 스레드 환경 운영 시, concurrency 옵션 추가 등)
spring kafka와 kafka client 버전은 아래와 같이 호환됨
spring kafka 2.A.x <-> kafka client 2.A.y
최신 버전은 아래 페이지 참고 (kafka 3.x)
https://spring.io/projects/spring-kafka
Spring for Apache Kafka
The Spring for Apache Kafka (spring-kafka) project applies core Spring concepts to the development of Kafka-based messaging solutions. It provides a "template" as a high-level abstraction for sending messages. It also provides support for Message-driven PO
spring.io
spring kafka 사용을 위해 build.gradle에 아래와 같이 dependencies 지정하였다.
dependencies {
implementation 'org.springframework.kafka:spring-kafka:2.5.10.RELEASE'
implementation 'org.springframework.boot:spring-boot-starter:2.5.0'
}
spring kafka는 admin, consumer, producer, streams 기능을 제공한다.
spring kafka producer : kafka template이라는 클래스를 사용하여 데이터를 전송할 수 있다.
kafka template : ProducerFactory 클래스를 통해 생성할 수 있다.
- spring kafka에서 제공하는 기본 템플릿을 사용하거나
- 직접 ProducerFactory를 이용해 템플릿을 생성하거나
기본 템플릿을 적용한 spring kafka application : application.yaml에 프로듀서 옵션을 넣고 사용할 수 있다.
application.yaml에 설정한 프로듀서 옵션값은 애플리케이션이 실행될 때, 자동으로 오버라이딩된다.
application.yaml에 설정할 수 있는 프로듀서 옵션값
- spring.kafka.producer.akcs
- spring.kafka.producer.batch-size
- spring.kafka.producer.bootstrap-servers
- spring.kafka.producer.buffer-memory
- spring.kafka.producer.client-id
- spring.kafka.producer.compression-type
- spring.kafka.producer.key-serializer
- spring.kafka.producer.properties.*
- spring.kafka.producer.retries
- spring.kafka.producer.transaction-id-prefix
- spring.kafka.producer.value-serializer
kafka client 사용 시, ./bin/kafka-topics.sh --bootstrap-server <broker ip:port> 와 같이 기본 옵션값들이 필요하지만, spring kafka에서는 아래 kafka client 필수 옵션에 대해 기본값을 매핑하여 실행하므로 필수 옵션값이 없다.
- boostrap-servers : localhost:9092
- key-serializer, value-serializer : StringSerializer
application.yaml은 resources/application.yaml에 아래와 같이 적용할 수 있다.
(bootstrap server : localhost:909, acks: all 명시한 예시)
spring:
kafka:
producer:
bootstrap-servers: localhost:9092
acks: all
간단히 spring-test 토픽에 test0 ~ test9까지의 메시지를 전송하는 프로듀서 애플리케이션은 아래와 같다.
package org.example;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.core.KafkaTemplate;
@SpringBootApplication
public class SpringKafkaProducer implements CommandLineRunner {
// test용 토픽
private static String TOPIC_NAME = "spring-test";
// spring에서 제공하는 KafkaTemplate 사용
@Autowired
private KafkaTemplate<Integer, String> template;
public static void main(String[] args) {
// spring 실행
SpringApplication application = new SpringApplication(SpringKafkaProducer.class);
application.run(args);
}
// CommandLineRunner의 run() override
@Override
public void run(String... args) {
for(int i = 0; i < 10; i++) {
template.send(TOPIC_NAME, "test" + i);
}
System.exit(0);
}
}
빌드 후 실행 시 아래와 같이 정상적으로 메시지가 전송됨을 알 수 있다.
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic spring-test --from-beginning
test0
test1
test2
test3
test4
test5
test6
test7
test8
test9
커스텀 템플릿을 이용한 spring kafka producer
ProducerFactory를 이용해 아래와 같이 직접 옵션값을 설정한 템플릿을 spring bean 등록 후 사용할 수도 있다.
단, 직접 생성 시, bootstrap-servers, key-serializer, value-serializer는 필수 옵션값으로 반드시 추가해야 한다.
KafkaTemplateConfig.java
package org.example;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.*;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaTemplateConfiguation {
// spring bean 등록
@Bean
public KafkaTemplate<String, String> customKafkaTemplate() {
// kafka producer 설정
// 필수 옵션 : bootstrap-servers, key / value serializer
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.ACKS_CONFIG, "all");
// ProducerFactory 초기화 및 생성
ProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(props);
return new KafkaTemplate<>(pf);
}
}
SpringKafkaCustomProducer.java
package org.example;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.core.KafkaProducerException;
import org.springframework.kafka.core.KafkaSendCallback;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
@SpringBootApplication
public class SpringKafkaCustomProducer implements CommandLineRunner {
// test용 토픽
private static String TOPIC_NAME = "spring-custom-test";
// bean 등록된 템플릿 선언
@Autowired
private KafkaTemplate<String, String> customKafkaTemplate;
public static void main(String[] args) {
SpringApplication application = new SpringApplication(SpringKafkaCustomProducer.class);
application.run(args);
}
@Override
public void run(String... args) {
// ListenableFuture : send() 결과값에 대한 처리를 위해 사용
// callback 호출해 정상 적재 여부 비동기 확인 가능
ListenableFuture<SendResult<String, String>> future = customKafkaTemplate.send(TOPIC_NAME, "custom-test");
// onSuccess : 브로커 정상 적재 시 onSuccess 호출
// onFailure : 브로커 적재 실패 시 onFailure 호출
future.addCallback(new KafkaSendCallback<String, String>() {
@Override
public void onSuccess(SendResult<String, String> result) {
System.out.println("Send Success");
System.out.println(result);
}
@Override
public void onFailure(KafkaProducerException exc) {
System.out.println("Send fail");
System.out.println(exc);
}
});
// 비동기로 적재 여부 확인되기 때문에
// 확인을 위해 flush로 강제로 적재 실행
customKafkaTemplate.flush();
}
}
ListenableFuture 클래스는 브로커에 send 내용이 정상 적재되었는지 확인을 위해 사용한다. callback을 호출하여 정상 적재 시 onSuccess 메서드를, 실패 시 onFailure 메서드를 호출한다.
실행 시 아래와 같이 Success 메시지를 spring application에서 확인할 수 있고, 콘솔으로 메시지가 정상 적재됨을 알 수 있다.
(알게된 사실 : CommandLineRunner를 이용해 application 생성 시, run 실행 후, application은 종료된다. 왜 spring이 계속 정상 종료되는 지 모르겠어서 찾아보니 일반적인 spring application처럼 계속 운영하기 위해서는 ApplicationRunner로 application을 생성해야 한다.)
spring console
...
Send Success
SendResult [producerRecord=ProducerRecord(topic=spring-custom-test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=custom-test, timestamp=null), recordMetadata=spring-custom-test-0@2]
...
kafka client console
(안 되고 있는 줄 알아서 3번 실행했다.)
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic spring-custom-test --from-beginning
custom-test
custom-test
custom-test
spring kafka consumer : spring kafka에서는 consumer를 2개의 타입과 7개의 커밋으로 구분하여 사용한다.
타입 : 레코드 리스너 (MessageListener), 배치 리스너 (BatchMessageListener)
리스너 종류에 따라 한 번 호출하는 메서드에서 처리하는 레코드의 개수가 달라진다.
레코드 리스너 : 1개의 레코드씩 처리
배치 리스너 : 클라이언트 라이브러리의 poll의 ConsumerRecords처럼 여러 개 레코드를 처리
spring kafka의 기본 리스너 타입 : 레코드 리스너
이외에도 아래의 파생 형태가 있음
AcknowledgingMessageListener
ConsumerAwareMessageListener
AcknowledgingConsumerAwareMessageListener
BatchAcknowledgingMessageListener
BatchConsumerAwareMessageListener
BatchAcknowledgingConsumerAwareMessageListener
수동 커밋 이용 시, Acknowledging 붙은 리스너를 사용, kafka consumer 인스턴스를 직접 컨트롤할 때는 ConsumerAware 붙은 리스너를 사용한다.
리스너 종류 및 설명
- 타입 : RECORD
- 리스너
- MessageListener : 레코드 인스턴스 단위로 프로세싱. 오토 커밋 또는 컨슈머 컨테이너의 AckMode를 사용하는 경우
- AcknowledgingMessageListener : 레코드 인스턴스 단위로 프로세싱. 수동 커밋을 사용하는 경우
- ConsumerAwareMessageListener : 레코드 인스턴스 단위로 프로세싱. 컨슈머 객체를 활용하고 싶은 경우
- AcknowledgingConsumerAwareMessageListener : 레코드 인스턴스 단위로 프로세싱. 수동 커밋을 사용하고, 컨슈머 객체를 활용하고 싶은 경우
- 타입 : BATCH
- 리스너
- BatchMessageListener : 레코드 인스턴스 단위로 프로세싱. 오토 커밋 또는 컨슈머 컨테이너의 AckMode를 사용하는 경우
- BatchAcknowledgingMessageListener : 레코드 인스턴스 단위로 프로세싱. 수동 커밋을 사용하는 경우
- BatchConsumerAwareMessageListener : 레코드 인스턴스 단위로 프로세싱. 컨슈머 객체를 활용하고 싶은 경우
- BatchAcknowledgingConsumerAwareMessageListener : 레코드 인스턴스 단위로 프로세싱. 수동 커밋을 사용하고, 컨슈머 객체를 활용하고 싶은 경우
spring kafka의 커밋 == AckMode
기본값 = BATCH, 컨슈머 enable.auto.commit = false 지정
spring kafka의 커밋 종류
- RECORD : 레코드 단위로 프로세싱 이후 커밋
- BATCH : poll() 메서드로 호출된 레코드가 모두 처리된 이후 커밋. spring kafka 컨슈머의 AckMode 기본값
- TIME : 특정 시간 이후 커밋. 해당 옵션 사용 시, 시간 간격 선언하는 AckTime 옵션 설정 필요
- COUNT : 특정 개수만큼 레코드 처리된 이후 커밋. 해당 옵션 사용하는 경우, 레코드 개수 선언하는 AckCount 옵션 설정 필요
- COUNT_TIME : TIME, COUNT 옵션 중 조건 하나라도 맞을 시 커밋
- MANUAL : Acknowledgement.acknowledge() 메서드 호출되면 다음 poll() 때 커밋. 매번 acknowledge() 호출 시 BATCH와 동일 동작. 해당 옵션 사용 시, AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 리스너로 사용해야 함
- MANUAL_IMMEDIATE : Acknowledgement.acknowledge() 메서드 호출 즉시 커밋. 해당 옵션 사용 시, AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 리스너로 사용해야 함
리스너 생성 방식
- 기본 리스너 컨테이너 사용
- 컨테이너 팩토리를 사용해 직접 리스너 생성
기본적인 리스너 생성 방식 : application.yaml 에 아래와 같이 지정한다.
(레코드 리스너 생성)
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
listener:
type: RECORD
레코드 리스너를 이용한 기본 컨슈머 예시
- @KafkaListener 어노테이션에 포함된 파라미터에 따라 메서드에 필요한 파라미터 종류가 달라진다.
레코드 리스너 생성을 위해 application.yaml 파일을 아래와 같이 작성한다.
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
listener:
type: RECORD
spring kafka consumer 예제 (레코드 리스너)
package org.example;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
@SpringBootApplication
public class SpringKafkaConsumer {
public static Logger logger = LoggerFactory.getLogger(SpringKafkaConsumer.class);
private static final String TOPIC = "spring-consumer-test";
public static void main(String[] args) {
SpringApplication application = new SpringApplication(SpringKafkaConsumer.class);
application.run(args);
}
// @KafkaListener 어노테이션 파라미터에 따라 메서드에 필요한 파라미터 종류 달라짐
// 기본 리스너 형태
// 토픽, 그룹 id 지정하여 해당 토픽에서 레코드 가져옴
@KafkaListener(topics = TOPIC, groupId = "test-group-0")
public void recordListener(ConsumerRecord<String, String> record) {
logger.info(record.toString());
}
// 메시지 값을 파라미터로 사용하는 리스너
// 레코드 생성 시 String Serializer 사용했기 때문에 String으로 받음
@KafkaListener(topics = TOPIC, groupId = "test-group-1")
public void singleTopicListener(String messageValue) {
logger.info(messageValue);
}
// properties 옵션으로 해당 컨슈머에게 개별 옵션 부여
@KafkaListener(topics = TOPIC, groupId = "test-group-2", properties = {
"max.poll.interval.ms:60000",
"auto.offset.reset:earliest"
})
public void singleTopicWithPropertiesListener(String messageValue) {
logger.info(messageValue);
}
// concurrency : 컨슈머 멀티 스레드 실행
// 3개 스레드로 병렬로 컨슈머가 레코드 가져옴
@KafkaListener(topics = TOPIC, groupId = "test-group-3", concurrency = "3")
public void concurrentTopicListener(String messageValue) {
logger.info(messageValue);
}
// 특정 토픽의 특정 파티션 값만 가져옴 (partitions)
// 특정 오프셋부터 메시지 가져오도록도 설정 가능 (partitionOffsets)
@KafkaListener(topicPartitions = {
@TopicPartition(topic = TOPIC, partitions = {"0", "1"}),
@TopicPartition(topic = TOPIC, partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "3"))
},
groupId = "test-group-4"
)
public void listenSpecificPartition(ConsumerRecord<String, String> record) {
logger.info(record.toString());
}
}
spring-consumer-test 토픽을 파티션 3개로 생성 후, 무작위로 메시지 생성을 진행한 뒤, 아래와 같이 정상적으로 컨슈머 그룹에서 메시지를 가져감을 확인할 수 있다.
각 그룹 별 확인 결과 (test-group-0)
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group-0 --describe
Consumer group 'test-group-0' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test-group-0 spring-consumer-test 2 5 5 0 - - -
test-group-0 spring-consumer-test 1 2 2 0 - - -
test-group-0 spring-consumer-test 0 2 2 0 - - -
배치 리스너 예시
배치 리스너 생성을 위해 application.yaml 파일을 아래와 같이 작성한다.
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
listener:
type: BATCH
spring kafka consumer 예제 (배치 리스너)
배치 리스너는 배치 처리를 위해 파라미터로 List, ConsumerRecords를 받는다.
package org.example;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import java.util.List;
@SpringBootApplication
public class SpringKafkaBatchConsumer {
public static Logger logger = LoggerFactory.getLogger(SpringKafkaBatchConsumer.class);
private static final String TOPIC = "spring-consumer-test";
public static void main(String[] args) {
SpringApplication application = new SpringApplication(SpringKafkaBatchConsumer.class);
application.run(args);
}
// @KafkaListener 어노테이션 파라미터에 따라 메서드에 필요한 파라미터 종류 달라짐
// 기본 배치 리스너 형태
// 토픽, 그룹 id 지정하여 해당 토픽에서 레코드 가져옴
// ConsumerRecords를 파라미터로 받음 (배치 처리)
@KafkaListener(topics = TOPIC, groupId = "test-group-00")
public void batchListener(ConsumerRecords<String, String> records) {
records.forEach(record -> logger.info(record.toString()));
}
// 메시지 값을 파라미터로 사용하는 리스너
// 레코드 생성 시 String Serializer 사용했기 때문에 String으로 받음
// 배치 처리로 토픽의 메시지를 여러 개 가져옴
@KafkaListener(topics = TOPIC, groupId = "test-group-01")
public void batchTopicListener(List<String> list) {
list.forEach(recordValue -> logger.info(recordValue));
}
// concurrency : 컨슈머 멀티 스레드 실행
// 3개 스레드로 병렬로 컨슈머가 레코드 가져옴
@KafkaListener(topics = TOPIC, groupId = "test-group-02", concurrency = "3")
public void batchConcurrentTopicListener(ConsumerRecords<String, String> records) {
records.forEach(record -> logger.info(record.toString()));
}
}
실행 결과 (test-group-00)
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group-00 --describe
Consumer group 'test-group-00' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test-group-00 spring-consumer-test 2 5 5 0 - - -
test-group-00 spring-consumer-test 1 2 2 0 - - -
test-group-00 spring-consumer-test 0 2 2 0 - - -
배치 컨슈머 리스너 (BatchConsumerAwareMessageListener) : 컨슈머 인스턴스를 파라미터로 사용 (컨슈머 직접 접근)
배치 커밋 리스너 (BatchAcknowledgeingMessageListener) : AckMode 사용을 위해 Acknowledgement 인스턴스를 파라미터로 사용. 단, AckMode를 MANUAL / MANUAL_IMMEDIATE로 지정 필요.
컨슈머 인스턴스 사용 시, 동기 커밋, 비동기 커밋 사용 가능하므로
동기 / 비동기 커밋, 컨슈머 인스턴스 메서드 활용 시, 배치 컨슈머 리스너를 사용하고,
컨슈머 컨테이너에서 관리하는 AckMode를 사용하여 커밋하려면 배치 커밋 리스너를 사용하면 된다.
배치 커밋 컨슈머 리스너 (BatchAcknowledgingConsumerAwareMessageListener) : AckMoe와 컨슈머 인스턴스 모두 사용하는 경우. 단, AckMode는 MANUAL_IMMEDIATE를 사용해야 한다.
배치 커밋 리스너 사용 예시
application.yaml
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
listener:
type: BATCH
ack-mode: MANUAL_IMMEDIATE
배치 컨슈머 리스너, 배치 커밋 리스너 사용 예시
package org.example;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
@SpringBootApplication
public class SpringKafkaCommitConsumer {
public static Logger logger = LoggerFactory.getLogger(SpringKafkaCommitConsumer.class);
private static final String TOPIC = "spring-consumer-test";
public static void main(String[] args) {
SpringApplication application = new SpringApplication(SpringKafkaCommitConsumer.class);
application.run(args);
}
// @KafkaListener 어노테이션 파라미터에 따라 메서드에 필요한 파라미터 종류 달라짐
// 배치 커밋 리스너
// 지정한 Ack Mode 값이 수동 관련 (MANUAL / MANUAL_IMMEDIATE) 인 경우, 수동으로 커밋 명시해야 함
// 또한 파라미터로 Acknowledgement 인스턴스를 받아야 한다.
@KafkaListener(topics = TOPIC, groupId = "test-group-000")
public void commitListener(ConsumerRecords<String, String> records, Acknowledgment ack) {
records.forEach(record -> logger.info(record.toString()));
// 커밋 명시
ack.acknowledge();
}
// 배치 컨슈머 리스너
// 동기 / 비동기 커밋 등 컨슈머 인스턴스를 직접 이용하기 위해서는 Consumer 를 파라미터로 받으면 된다.
@KafkaListener(topics = TOPIC, groupId = "test-group-001")
public void consumerCommitListener(ConsumerRecords<String, String> records, Consumer<String, String> consumer) {
records.forEach(record -> logger.info(record.toString()));
// 비동기로 커밋 진행
consumer.commitAsync();
}
}
컨슈머 결과 확인 (test-group-000)
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group-000 --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test-group-000 spring-consumer-test 0 2 2 0 consumer-test-group-000-1-0d25d63a-7a01-4472-acce-35ae4617070e /127.0.0.1 consumer-test-group-000-1
test-group-000 spring-consumer-test 1 2 2 0 consumer-test-group-000-1-0d25d63a-7a01-4472-acce-35ae4617070e /127.0.0.1 consumer-test-group-000-1
test-group-000 spring-consumer-test 2 5 5 0 consumer-test-group-000-1-0d25d63a-7a01-4472-acce-35ae4617070e /127.0.0.1 consumer-test-group-000-1
커스텀 리스너 컨테이너 : 서로 다른 설정을 가진 2개 이상의 리스너 구현 및 리밸런스 리스너 구현 시 커스텀 리스너 컨테이너를 사용해야 한다.
카프카 리스너 컨테이너 팩토리를 bean으로 등록하고, KafkaListener 어노테이션에서 커스텀 리스너 컨테이너 팩토리를 등록하면 사용할 수 있다.
listener config
package org.example;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.*;
import org.apache.kafka.common.TopicPartition;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class ListenerContainerConfiguration {
// spring bean 생성
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> customContainerFactory() {
// 리스너 설정값 지정
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// kafka consumer factory 생성
DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props);
// 리스너 컨테이너 생성 위해 선언
// 2개 이상의 컨슈머 리스너 사용 시, concurrency 값 스레드에 맞게 설정. 1로 설정 시 1개 스레드로 실행
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
// 리밸런스 리스너 선언 위해 setConsumerRebalanceListener 메서드 호출
factory.getContainerProperties().setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
@Override
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// 커밋 되기 전 리밸런스 발생했을 때 로직 작성
}
@Override
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// 커밋 일어난 후 리밸런스 발생했을 때 로직 작성
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 컨슈머가 새로운 파티션을 할당받을 때 로직 작성
}
@Override
public void onPartitionsLost(Collection<TopicPartition> partitions) {
// 컨슈머가 파티션 잃을 때 (컨슈머 종료, 네트워크 에러 등) 로직 작성
}
});
// 레코드 리스너 사용 명시를 위해 false 선언. 배치 리스너 사용 시 true 사용
factory.setBatchListener(false);
// AckMode 값 설정
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
// 컨슈머 설정값을 갖고 있는 팩토리 인스턴스를 ConcurrentKafkaListenerContainerFactory 팩토리에 등록
factory.setConsumerFactory(cf);
return factory;
}
}
커스텀 리스너를 적용한 컨슈머
package org.example;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
@SpringBootApplication
public class SpringConsumerApplication {
public static Logger logger = LoggerFactory.getLogger(SpringConsumerApplication.class);
public static void main(String[] args) {
SpringApplication application = new SpringApplication(SpringConsumerApplication.class);
application.run(args);
}
// ListenerContainerConfiguration.java에서 bean 등록한 객체 containerFactory로 선언하여
// 커스텀 컨슈머 리스너를 적용
@KafkaListener(topics = "spring-consumer-test",
groupId = "test-group-002",
containerFactory = "customContainerFactory")
public void customListener(String data) {
logger.info(data);
}
}
컨슈머 그룹 확인 (test-group-002)
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test-group-002 --describe
Consumer group 'test-group-002' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test-group-002 spring-consumer-test 2 5 5 0 - - -
test-group-002 spring-consumer-test 1 2 2 0 - - -
test-group-002 spring-consumer-test 0 2 2 0 - - -
모두 정상적으로 컨슈머가 동작함을 확인할 수 있다.
회고
책을 통해 공부한 기본적인 내용은 이 정도로 마무리를 할 것이다. 카프카를 처음 접하면 개념적으로 어렵다는 말을 익히 들었지만, 생각보다 필요한 개념, 주의해야 할 사항이 많아 꽤 시간이 걸린 듯 하다. 책의 나머지 부분은 업무하면서 틈틈히 볼 계획이고, 업무 또는 플젝을 하면서 메모할 사항이 생기면 그 때마다 정리할 계획이다.
(ex. AWS MSK를 업무에 적용한다면 MSK 사용 방법 정리, 그라파나로 모니터링 연동하면 연동하는 방법 정리 등)
(사실 지금 업무 상에 카프카를 꼭 적용해야 하는가에 대한 의문도 있다. 약간 굳이라는 느낌? 업무 범위가 더 커지면 필요한 건 당연하지만 당장에 쓸 일이 크게 있을 거 같진 않다.)
추가적으로 업무에 본격적으로 카프카를 도입한다면 "카프카 핵심 가이드" 책도 실무에 필요한 내용들이 많다고 하여 참고할 예정이다.
'메모 > kafka' 카테고리의 다른 글
kafka source connector, sink connector (0) | 2025.01.18 |
---|---|
Kafka Connect 개념 (0) | 2025.01.12 |
Processor API (0) | 2025.01.12 |
Streams DSL (0) | 2025.01.12 |
kafka streams (0) | 2025.01.07 |