티스토리 뷰

메모/kafka

producer api

4567은 소수 2025. 1. 4. 02:25

기본 설정 

gradle에서 아래와 같이 dependencies 지정함으로써 kafka client 라이브러리 사용 가능 

/*
 * This file was generated by the Gradle 'init' task.
 *
 * This generated file contains a sample Java application project to get you started.
 * For more details on building Java & JVM projects, please refer to https://docs.gradle.org/8.12/userguide/building_java_projects.html in the Gradle documentation.
 */

plugins {
    // Apply the application plugin to add support for building a CLI application in Java.
    id 'application'
}

repositories {
    // Use Maven Central for resolving dependencies.
    mavenCentral()
}

dependencies {
    // This dependency is used by the application.
    implementation libs.guava

    // Libraries
    implementation 'org.apache.kafka:kafka-clients:2.5.1'
    implementation 'org.slf4j:slf4j-simple:1.7.30'
}

// Apply a specific Java toolchain to ease working on different environments.
java {
    toolchain {
        languageVersion = JavaLanguageVersion.of(17)
    }
}

application {
    mainClass = 'org.example.SimpleProducer'
}

 

(현재 공부용으로 사용 중인 버전이 2.5.1, slf4j 는 로깅용) 

(gradle 프로젝트 시작 : gradle init -> groovy , java 등 적합한 옵션 선택하여 시작) 

(gradle 빌드 : gradlew build / 빌드 내용 실행 : gradlew run / 빌드 내용 안 먹을 때 : gradlew clean) 

(제일 깔끔하게 새로 빌드하고 실행 : gradlew clean build run) 

 

 

producer api로 레코드 전송 시 옵션 사항 

- 필수 옵션

-- bootstrap.server : 브로커의 호스트 이름:포트 

-- key.serializer : 메시지 키 직렬화하는 클래스 지정 

-- value.serializer : 메시지 값 직렬화하는 클래스 지정 

 

- 선택 옵션 (지정 안 할 시, default 값으로 전송) 

-- acks : 브로커에 정상 저장되었는 지 여부. 

 1 (default) : 리더 파티션에 데이터 저장 시, 성공으로 판단 

 0 : 프로듀서가 전송한 즉시 성공으로 판단 

 -1 / all : 토픽의 min.insync.replicas 개수에 해당하는 리더 파티션, 팔로워 파티션에 데이터 저장 시 성공으로 판단 

 

-- buffer.memory : 브로커로 전송할 데이터를 배치로 모으기 위한 버퍼 메모리양 (default : 32mb) 

-- retries : 브로커로부터 에러 받은 뒤, 재전송 시도하는 횟수 (default : 2147483647) 

-- batch.size : 배치로 전송할 레코드의 최대 용량 (default : 16kb) 

-- linger.ms : 배치 전송하기 전까지 기다리는 최소 시간 (default : 0ms) 

-- partitioner.class : 파티션에 적용하는 파티셔너 클래스 지정 (default : org.apache.kafka.clients.producer.internals.DefaultPartitioner) 

-- enable.idempotence : 멱등성 프로듀서로 동작할 지 여부 (default : false)

-- transactional.id : 레코드를 트랜잭션 단위로 묶을 지 여부. 설정 시, 트랜잭션 프로듀서로 동작 (default : null) 


 

간단한 producer api 적용 

(시작 전, 해당 토픽 생성되어 있는 지 확인 필요) 

(아래 예제들은 3개의 파티션을 대상으로 실행함) 

package org.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class SimpleProducer {
    // kafka logging 용
    private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
    // topic 이름 
    private final static String TOPIC_NAME = "test";
    // broker 서버 
    private final static String BOOTSTAP_SERVERS = "127.0.0.1:9092";

    public static void main(String[] args) {
        // producer 정보 입력 (key, value)
        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTAP_SERVERS);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // producer 인스턴스 생성 
        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        // 레코드에 기록될 키 
        String messageKey = "test_key";
        // 레코드에 기록될 메시지
        String messageValue = "test_message";
        // 토픽에 넣을 레코드 정보 (키 없으면 null)
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageKey, messageValue);
        
        // send 호출 시, 실제 producer가 broker에 전송하는 것이 아닌, 
        // 전송할 레코드를 배치 처리 (배치 조건 만족 시, 비동기로 전송)
        // 단, default로 대기 시간 0 (linger.ms = 0) 이므로 즉시 전송 
        producer.send(record);
        // 로깅에 레코드 출력
        logger.info("{}", record);

        // flush 호출 시, 버퍼에 남아 있는 레코드 배치를 broker에게 전송
        producer.flush();
        // producer 인스턴스 종료 
        producer.close();
    }
}

 

실행 순서 

- 토픽, 브로커 서버를 지정한다 

- Properties에 producer config 설정 값을 넣는다 

- KafkaProducer 인스턴스 만든다 

- 레코드 정보를 담은 ProducerRecord 인스턴스 만든다 

- send() 로 브로커에 전송할 레코드를 producer 버퍼에 담는다 (linger.ms = 0가 기본값이라 위 코드 상으로는 즉시 전송)

- flush() 로 브로커에 레코드를 배치 전송한다 (2.5 버전 이후로는 UniformStickyPartitioner로 기본 전송) (전송 시 압축하여 전송도 가능) 

- producer 인스턴스를 닫는다 (close) 

 

실행 결과 (key가 null 인 경우 : ProducerRecord 인스턴스 생성 시, key 위치 parameter에 value 넣으면 없는 것으로 인식) 

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --property print.key=true
null	test_message
test_key	test_message

 


 

Custom Partitioner 적용 

 

레코드의 메시지 키가 동일하다면 동일한 파티션에 동일한 키를 갖는 레코드가 기록됨

하지만, 파티션이 늘어나는 경우, 보장할 수 없음 

이와 같이, 파티션 고정 등 특수하게 파티션 선정 및 기피 등을 커스텀해야 하는 경우, Partitioner interface 상속받아 Custom Partitioner 생성 후, 적용할 수 있음

 

CustomPartitioner.java

package org.example;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;

// Partitioner interface 상속받은 CustomPartitioner 
// 특정 키 갖는 레코드를 특정 파티션에 넣기 위함 
// 파티션 개수 늘어나면, 키가 동일하더라도 늘어나기 전 후 동일 파티션에 레코드 들어갈 지 모르기 때문 

// 필수 구현 메서드 : partition, configure, close 

public class CustomPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, 
                        Object value, byte[] valueBytes, Cluster cluster) {
        
        // key 값 없는 경우, 에러 발생 
        if(keyBytes == null) {
            throw new InvalidRecordException("Need message key!!!");
        }

        // key == "Zero" 인 경우 0번 파티션으로 고정 
        if(((String)key).equals("Zero")) {
            return 0;
        }

        // 토픽에 대한 파티션 리스트 
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        // 파티션 전체 개수 
        int numPartitions = partitions.size();
        // 키 해시 (murmur2) 값에 따라 파티션 할당 
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override 
    public void close() {}
}

 

SimpleProducer.java

package org.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class SimpleProducer {
    // kafka logging 용
    private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
    // topic 이름 
    private final static String TOPIC_NAME = "test";
    // broker 서버 
    private final static String BOOTSTAP_SERVERS = "127.0.0.1:9092";

    public static void main(String[] args) {
        // producer 정보 입력 (key, value)
        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTAP_SERVERS);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);

        // producer 인스턴스 생성 
        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        // 레코드에 기록될 키 
        String messageKey = "Zero";
        // 레코드에 기록될 메시지
        String messageValue = "0000000";
        // 토픽에 넣을 레코드 정보 (키 없으면 null)
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageKey, messageValue);
        
        producer.send(record);
        logger.info("{}", record);

        producer.flush();
        producer.close();
    }
}

 

Properties 설정 시, ProducerConfig.PARTITIONER_CLASS_CONFIG 값을 커스텀한 CustomPartitioner 로 지정함으로써, 기본적인 해시기반 파티셔너가 아닌, 메시지 키가 "Zero" 이면 0번에 무조건 넣도록 강제하였다. 

 

실행 결과 

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --property print.key=true --partition 0
Zero	0000000
^CProcessed a total of 1 messages

 


 

KafkaProducer의 send()는 기본적으로 비동기로 전송되며, 옵션 (batch.size, linger.ms 등) 만족 시, 버퍼에 저장된 배치가 브로커에 전송된다. 하지만, 레코드 간의 순서가 보장되어야 하는 메시지 등은 비동기 처리 시, 순서가 꼬일 수 있기 때문에 동기식 처리가 필요하다. 

 

send().get() 이용 시, 브로커에 저장된 내용을 동기식으로 처리할 수 있다. 

즉, 배치로 브로커에 전송된 모든 메시지에 대하여 브로커로부터 응답이 모두 올 때까지 다음 배치는 브로커에 전송되지 않는다. 

 

그리고 Callback interface를 상속받아 비동기 로깅을 커스텀하여 구성할 수도 있다. 

아래는 send().get()으로 레코드의 메타데이터 (RecordMetadata class) 를 동일하게 처리하기 위해 작성된 callback class이다. 

 

ProducerCallback.java

package org.example;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Callback interface 상속 받아 
// callback 처리하여 비동기에 대한 로깅 진행 

public class ProducerCallback implements Callback {
    private final static Logger logger = LoggerFactory.getLogger(ProducerCallback.class);

    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if(e != null) {
            logger.error(e.getMessage(), e);
        } else {
            logger.info(recordMetadata.toString());
        }
    }
}

 

SimpleProducer.java

package org.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class SimpleProducer {
    // kafka logging 용
    private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
    // topic 이름 
    private final static String TOPIC_NAME = "test";
    // broker 서버 
    private final static String BOOTSTAP_SERVERS = "127.0.0.1:9092";

    public static void main(String[] args) {
        // producer 정보 입력 (key, value)
        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTAP_SERVERS);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // producer 인스턴스 생성 
        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        long before = System.currentTimeMillis();

        // broker의 record 수신 여부 동기식 체크
        for(int i = 0; i < 100; i++) {
            String key = "syncKey" + Integer.toString(i);
            String value = "syncValue" + Integer.toString(i);

            try {
                ProducerRecord<String, String> record = new ProducerRecord<String,String>(TOPIC_NAME, key, value);
                // send().get() : 브로커에 메시지 전송을 동기식으로 처리 
                RecordMetadata metadata = producer.send(record).get();
                logger.info(metadata.toString());
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }

        // 남은 거 브로커에 전송 
        producer.flush();

        long after = System.currentTimeMillis();
        long diff = after - before;
        System.out.println("동기식 처리 시간 : " + diff);

        long before2 = System.currentTimeMillis();

        // broker의 record 수신 여부 비동기 체크 
        for(int i = 0; i < 100; i++) {
            String key = "asyncKey" + Integer.toString(i);
            String value = "asyncKey" + Integer.toString(i);

            try {
                ProducerRecord<String, String> record = new ProducerRecord<String,String>(TOPIC_NAME, key, value);
                // callback 상속받은 ProducerCallback() 을 인자로 사용함으로써 
                // send() 결과 로깅을 비동기로 처리
                producer.send(record, new ProducerCallback());
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }

        // 남은 거 브로커에 전송 
        producer.flush();

        long after2 = System.currentTimeMillis();
        long diff2 = after2 - before2;
        System.out.println("비동기 처리 시간 : " + diff2);

        producer.close();
    }
}

 

동일하게 100번 수행을 했지만, 브로커 응답 로깅을 동기 / 비동기로 처리했을 때 시간 차이는 다음과 같다. 

 

...
[main] INFO org.example.SimpleProducer - test-2@86
[main] INFO org.example.SimpleProducer - test-1@207
동기식 처리 시간 : 671
...
[kafka-producer-network-thread | producer-1] INFO org.example.ProducerCallback - test-0@248
[kafka-producer-network-thread | producer-1] INFO org.example.ProducerCallback - test-2@117
비동기 처리 시간 : 21
...

 

약 30배 정도 압도적으로 차이가 발생함을 확인할 수 있다. 

'메모 > kafka' 카테고리의 다른 글

kafka streams  (0) 2025.01.07
admin api  (0) 2025.01.06
kafka 기본 개념  (0) 2025.01.04
kafka-console-producer.sh / kafka-console-consumer.sh / kafka-consumer-groups.sh / kafka-delete-records.sh  (0) 2025.01.01
kafka-topics.sh  (3) 2025.01.01
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
TAG
more
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함