티스토리 뷰

메모/kafka

admin api

4567은 소수 2025. 1. 6. 00:04

쉘 스크립트를 이용해 kafka 클러스터 정보를 확인할 수도 있지만, 매번 확인하는 것은 번거로운 작업이며, 클러스터 옵션 자동화를 위해서는 별도 코드 작성이 필요하다. 

kafka 클라이언트 라이브러리에서는 클러스터 내부 옵션 설정, 조회 등을 위한 admin api를 제공한다. 

 

AdminClient 사용 예시 

- 컨슈머를 멀티 스레드 구성하려고 할 때, 토픽의 파티션 개수를 확인 후, 멀티 스레드로 구성

- AdminClient 클래스로 구현된 대시보드를 통해 ACL이 적용된 클러스터 리소스 접근 권한 규칙을 추가 

- 특정 토픽의 데이터 양이 늘어났을 때 AdminClient로 토픽의 파티션 개수를 늘림

 

AdminClient는 kafka 클러스터 정보만 입력하면 된다. 

 

AdminClient의 주요 메서드는 아래와 같다. 

- describeCluster : 브로커 정보 조회 

- listTopics : 토픽 리스트 조회 

- listConsumerGroups : 컨슈머 그룹 조회 

- createTopics : 신규 토픽 생성 

- createPartitions : 파티션 개수 변경 

- createAcls : 접근제어 규칙 생성 

 

AdminClient는 버전에 따라서 구성이 바뀌기 때문에 클러스터와 클라이언트 버전을 맞춰 사용하는 것이 좋다. 

 


 

AdminClient로 브로커 정보, 토픽 정보 조회 

 

package org.example;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.ConfigResource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class SimpleAdmin {
    private final static Logger logger = LoggerFactory.getLogger(SimpleAdmin.class);
    private final static String BOOTSTAP_SERVERS = "localhost:9092";

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        
        // consumer에 필요한 config 내용 추가 
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTAP_SERVERS);

        // admin client 생성 
        AdminClient admin = AdminClient.create(configs);
        
        // describeCluster() : 브로커 정보 조회 
        logger.info("==== GET BROKER INFORMATION ====");
        for(Node node : admin.describeCluster().nodes().get()) {
            logger.info("node : {}", node);
            
            ConfigResource cr = new ConfigResource(ConfigResource.Type.BROKER, node.idString());
            DescribeConfigsResult describeConfigs = admin.describeConfigs(Collections.singleton(cr));
            describeConfigs.all().get().forEach((broker, config) -> {
                config.entries().forEach(configEntry -> {
                    logger.info(configEntry.name() + " = " + configEntry.value());
                });
            });
        }

        // listTopics : 토픽 정보 조회
        logger.info("==== TOPIC LIST ====");
        for (TopicListing topicListing : admin.listTopics().listings().get()) {
            logger.info("{}", topicListing.toString());
        }

        // describeTopics : 토픽 상세 조회
        logger.info("==== 'test' TOPIC ====");
        Map<String, TopicDescription> topicInformation = admin.describeTopics(Collections.singletonList("test")).all().get();
        logger.info("{}", topicInformation);

        // listConsumerGroups : 컨슈머 그룹 조회 
        logger.info("== CONSUMER GROUPS ====");
        ListConsumerGroupsResult listConsumerGroups = admin.listConsumerGroups();
        listConsumerGroups.all().get().forEach(v -> {
            logger.info("{}", v);
        });

        admin.close();
    }
}

 

아래와 같이 정상적으로 정보를 조회할 수 있다. 

[main] INFO org.example.SimpleAdmin - ==== GET BROKER INFORMATION ====
[main] INFO org.example.SimpleAdmin - node : 127.0.0.1:9092 (id: 0 rack: null)
[main] INFO org.example.SimpleAdmin - log.cleaner.min.compaction.lag.ms = 0
[main] INFO org.example.SimpleAdmin - offsets.topic.num.partitions = 50
[main] INFO org.example.SimpleAdmin - log.flush.interval.messages = 9223372036854775807
...
[main] INFO org.example.SimpleAdmin - ==== TOPIC LIST ====
[main] INFO org.example.SimpleAdmin - (name=0, internal=false)
[main] INFO org.example.SimpleAdmin - (name=test, internal=false)
[main] INFO org.example.SimpleAdmin - (name=__consumer_offset, internal=false)
[main] INFO org.example.SimpleAdmin - ==== 'test' TOPIC ====
[main] INFO org.example.SimpleAdmin - {test=(name=test, internal=false, partitions=(partition=0, leader=127.0.0.1:9092 (id: 0 rack: null), replicas=127.0.0.1:9092 (id: 0 rack: null), isr=127.0.0.1:9092 (id: 0 rack: null)),(partition=1, leader=127.0.0.1:9092 (id: 0 rack: null), replicas=127.0.0.1:9092 (id: 0 rack: null), isr=127.0.0.1:9092 (id: 0 rack: null)),(partition=2, leader=127.0.0.1:9092 (id: 0 rack: null), replicas=127.0.0.1:9092 (id: 0 rack: null), isr=127.0.0.1:9092 (id: 0 rack: null)), authorizedOperations=null)}
[main] INFO org.example.SimpleAdmin - == CONSUMER GROUPS ====
[main] INFO org.example.SimpleAdmin - (groupId='test-group', isSimpleConsumerGroup=false)

'메모 > kafka' 카테고리의 다른 글

kafka streams  (0) 2025.01.07
producer api  (0) 2025.01.04
kafka 기본 개념  (0) 2025.01.04
kafka-console-producer.sh / kafka-console-consumer.sh / kafka-consumer-groups.sh / kafka-delete-records.sh  (0) 2025.01.01
kafka-topics.sh  (3) 2025.01.01
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
TAG
more
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함