티스토리 뷰
기본 설정
gradle에서 아래와 같이 dependencies 지정함으로써 kafka client 라이브러리 사용 가능
/*
* This file was generated by the Gradle 'init' task.
*
* This generated file contains a sample Java application project to get you started.
* For more details on building Java & JVM projects, please refer to https://docs.gradle.org/8.12/userguide/building_java_projects.html in the Gradle documentation.
*/
plugins {
// Apply the application plugin to add support for building a CLI application in Java.
id 'application'
}
repositories {
// Use Maven Central for resolving dependencies.
mavenCentral()
}
dependencies {
// This dependency is used by the application.
implementation libs.guava
// Libraries
implementation 'org.apache.kafka:kafka-clients:2.5.1'
implementation 'org.slf4j:slf4j-simple:1.7.30'
}
// Apply a specific Java toolchain to ease working on different environments.
java {
toolchain {
languageVersion = JavaLanguageVersion.of(17)
}
}
application {
mainClass = 'org.example.SimpleProducer'
}
(현재 공부용으로 사용 중인 버전이 2.5.1, slf4j 는 로깅용)
(gradle 프로젝트 시작 : gradle init -> groovy , java 등 적합한 옵션 선택하여 시작)
(gradle 빌드 : gradlew build / 빌드 내용 실행 : gradlew run / 빌드 내용 안 먹을 때 : gradlew clean)
(제일 깔끔하게 새로 빌드하고 실행 : gradlew clean build run)
producer api로 레코드 전송 시 옵션 사항
- 필수 옵션
-- bootstrap.server : 브로커의 호스트 이름:포트
-- key.serializer : 메시지 키 직렬화하는 클래스 지정
-- value.serializer : 메시지 값 직렬화하는 클래스 지정
- 선택 옵션 (지정 안 할 시, default 값으로 전송)
-- acks : 브로커에 정상 저장되었는 지 여부.
1 (default) : 리더 파티션에 데이터 저장 시, 성공으로 판단
0 : 프로듀서가 전송한 즉시 성공으로 판단
-1 / all : 토픽의 min.insync.replicas 개수에 해당하는 리더 파티션, 팔로워 파티션에 데이터 저장 시 성공으로 판단
-- buffer.memory : 브로커로 전송할 데이터를 배치로 모으기 위한 버퍼 메모리양 (default : 32mb)
-- retries : 브로커로부터 에러 받은 뒤, 재전송 시도하는 횟수 (default : 2147483647)
-- batch.size : 배치로 전송할 레코드의 최대 용량 (default : 16kb)
-- linger.ms : 배치 전송하기 전까지 기다리는 최소 시간 (default : 0ms)
-- partitioner.class : 파티션에 적용하는 파티셔너 클래스 지정 (default : org.apache.kafka.clients.producer.internals.DefaultPartitioner)
-- enable.idempotence : 멱등성 프로듀서로 동작할 지 여부 (default : false)
-- transactional.id : 레코드를 트랜잭션 단위로 묶을 지 여부. 설정 시, 트랜잭션 프로듀서로 동작 (default : null)
간단한 producer api 적용
(시작 전, 해당 토픽 생성되어 있는 지 확인 필요)
(아래 예제들은 3개의 파티션을 대상으로 실행함)
package org.example;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
public class SimpleProducer {
// kafka logging 용
private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
// topic 이름
private final static String TOPIC_NAME = "test";
// broker 서버
private final static String BOOTSTAP_SERVERS = "127.0.0.1:9092";
public static void main(String[] args) {
// producer 정보 입력 (key, value)
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTAP_SERVERS);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// producer 인스턴스 생성
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
// 레코드에 기록될 키
String messageKey = "test_key";
// 레코드에 기록될 메시지
String messageValue = "test_message";
// 토픽에 넣을 레코드 정보 (키 없으면 null)
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageKey, messageValue);
// send 호출 시, 실제 producer가 broker에 전송하는 것이 아닌,
// 전송할 레코드를 배치 처리 (배치 조건 만족 시, 비동기로 전송)
// 단, default로 대기 시간 0 (linger.ms = 0) 이므로 즉시 전송
producer.send(record);
// 로깅에 레코드 출력
logger.info("{}", record);
// flush 호출 시, 버퍼에 남아 있는 레코드 배치를 broker에게 전송
producer.flush();
// producer 인스턴스 종료
producer.close();
}
}
실행 순서
- 토픽, 브로커 서버를 지정한다
- Properties에 producer config 설정 값을 넣는다
- KafkaProducer 인스턴스 만든다
- 레코드 정보를 담은 ProducerRecord 인스턴스 만든다
- send() 로 브로커에 전송할 레코드를 producer 버퍼에 담는다 (linger.ms = 0가 기본값이라 위 코드 상으로는 즉시 전송)
- flush() 로 브로커에 레코드를 배치 전송한다 (2.5 버전 이후로는 UniformStickyPartitioner로 기본 전송) (전송 시 압축하여 전송도 가능)
- producer 인스턴스를 닫는다 (close)
실행 결과 (key가 null 인 경우 : ProducerRecord 인스턴스 생성 시, key 위치 parameter에 value 넣으면 없는 것으로 인식)
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --property print.key=true
null test_message
test_key test_message
Custom Partitioner 적용
레코드의 메시지 키가 동일하다면 동일한 파티션에 동일한 키를 갖는 레코드가 기록됨
하지만, 파티션이 늘어나는 경우, 보장할 수 없음
이와 같이, 파티션 고정 등 특수하게 파티션 선정 및 기피 등을 커스텀해야 하는 경우, Partitioner interface 상속받아 Custom Partitioner 생성 후, 적용할 수 있음
CustomPartitioner.java
package org.example;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
import java.util.List;
import java.util.Map;
// Partitioner interface 상속받은 CustomPartitioner
// 특정 키 갖는 레코드를 특정 파티션에 넣기 위함
// 파티션 개수 늘어나면, 키가 동일하더라도 늘어나기 전 후 동일 파티션에 레코드 들어갈 지 모르기 때문
// 필수 구현 메서드 : partition, configure, close
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
// key 값 없는 경우, 에러 발생
if(keyBytes == null) {
throw new InvalidRecordException("Need message key!!!");
}
// key == "Zero" 인 경우 0번 파티션으로 고정
if(((String)key).equals("Zero")) {
return 0;
}
// 토픽에 대한 파티션 리스트
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
// 파티션 전체 개수
int numPartitions = partitions.size();
// 키 해시 (murmur2) 값에 따라 파티션 할당
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
@Override
public void configure(Map<String, ?> configs) {}
@Override
public void close() {}
}
SimpleProducer.java
package org.example;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
public class SimpleProducer {
// kafka logging 용
private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
// topic 이름
private final static String TOPIC_NAME = "test";
// broker 서버
private final static String BOOTSTAP_SERVERS = "127.0.0.1:9092";
public static void main(String[] args) {
// producer 정보 입력 (key, value)
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTAP_SERVERS);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);
// producer 인스턴스 생성
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
// 레코드에 기록될 키
String messageKey = "Zero";
// 레코드에 기록될 메시지
String messageValue = "0000000";
// 토픽에 넣을 레코드 정보 (키 없으면 null)
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageKey, messageValue);
producer.send(record);
logger.info("{}", record);
producer.flush();
producer.close();
}
}
Properties 설정 시, ProducerConfig.PARTITIONER_CLASS_CONFIG 값을 커스텀한 CustomPartitioner 로 지정함으로써, 기본적인 해시기반 파티셔너가 아닌, 메시지 키가 "Zero" 이면 0번에 무조건 넣도록 강제하였다.
실행 결과
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --property print.key=true --partition 0
Zero 0000000
^CProcessed a total of 1 messages
KafkaProducer의 send()는 기본적으로 비동기로 전송되며, 옵션 (batch.size, linger.ms 등) 만족 시, 버퍼에 저장된 배치가 브로커에 전송된다. 하지만, 레코드 간의 순서가 보장되어야 하는 메시지 등은 비동기 처리 시, 순서가 꼬일 수 있기 때문에 동기식 처리가 필요하다.
send().get() 이용 시, 브로커에 저장된 내용을 동기식으로 처리할 수 있다.
즉, 배치로 브로커에 전송된 모든 메시지에 대하여 브로커로부터 응답이 모두 올 때까지 다음 배치는 브로커에 전송되지 않는다.
그리고 Callback interface를 상속받아 비동기 로깅을 커스텀하여 구성할 수도 있다.
아래는 send().get()으로 레코드의 메타데이터 (RecordMetadata class) 를 동일하게 처리하기 위해 작성된 callback class이다.
ProducerCallback.java
package org.example;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// Callback interface 상속 받아
// callback 처리하여 비동기에 대한 로깅 진행
public class ProducerCallback implements Callback {
private final static Logger logger = LoggerFactory.getLogger(ProducerCallback.class);
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e != null) {
logger.error(e.getMessage(), e);
} else {
logger.info(recordMetadata.toString());
}
}
}
SimpleProducer.java
package org.example;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
public class SimpleProducer {
// kafka logging 용
private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
// topic 이름
private final static String TOPIC_NAME = "test";
// broker 서버
private final static String BOOTSTAP_SERVERS = "127.0.0.1:9092";
public static void main(String[] args) {
// producer 정보 입력 (key, value)
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTAP_SERVERS);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// producer 인스턴스 생성
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
long before = System.currentTimeMillis();
// broker의 record 수신 여부 동기식 체크
for(int i = 0; i < 100; i++) {
String key = "syncKey" + Integer.toString(i);
String value = "syncValue" + Integer.toString(i);
try {
ProducerRecord<String, String> record = new ProducerRecord<String,String>(TOPIC_NAME, key, value);
// send().get() : 브로커에 메시지 전송을 동기식으로 처리
RecordMetadata metadata = producer.send(record).get();
logger.info(metadata.toString());
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
// 남은 거 브로커에 전송
producer.flush();
long after = System.currentTimeMillis();
long diff = after - before;
System.out.println("동기식 처리 시간 : " + diff);
long before2 = System.currentTimeMillis();
// broker의 record 수신 여부 비동기 체크
for(int i = 0; i < 100; i++) {
String key = "asyncKey" + Integer.toString(i);
String value = "asyncKey" + Integer.toString(i);
try {
ProducerRecord<String, String> record = new ProducerRecord<String,String>(TOPIC_NAME, key, value);
// callback 상속받은 ProducerCallback() 을 인자로 사용함으로써
// send() 결과 로깅을 비동기로 처리
producer.send(record, new ProducerCallback());
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
// 남은 거 브로커에 전송
producer.flush();
long after2 = System.currentTimeMillis();
long diff2 = after2 - before2;
System.out.println("비동기 처리 시간 : " + diff2);
producer.close();
}
}
동일하게 100번 수행을 했지만, 브로커 응답 로깅을 동기 / 비동기로 처리했을 때 시간 차이는 다음과 같다.
...
[main] INFO org.example.SimpleProducer - test-2@86
[main] INFO org.example.SimpleProducer - test-1@207
동기식 처리 시간 : 671
...
[kafka-producer-network-thread | producer-1] INFO org.example.ProducerCallback - test-0@248
[kafka-producer-network-thread | producer-1] INFO org.example.ProducerCallback - test-2@117
비동기 처리 시간 : 21
...
약 30배 정도 압도적으로 차이가 발생함을 확인할 수 있다.
'메모 > kafka' 카테고리의 다른 글
kafka streams (0) | 2025.01.07 |
---|---|
admin api (0) | 2025.01.06 |
kafka 기본 개념 (0) | 2025.01.04 |
kafka-console-producer.sh / kafka-console-consumer.sh / kafka-consumer-groups.sh / kafka-delete-records.sh (0) | 2025.01.01 |
kafka-topics.sh (3) | 2025.01.01 |