티스토리 뷰

메모/kafka

kafka-topics.sh

4567은 소수 2025. 1. 1. 16:59

kafka-topics.sh 은 토픽 관리와 관련된 파일이다. (토픽 생성, 토픽 설정 변경 등)

 

토픽 : 카프카에서 데이터를 구분하는 가장 기본 개념 (RDBMS의 테이블과 유사한 기능)

 

토픽 생성 방법

- 컨슈머, 프로듀서가 브로커에게 생성되지 않은 토픽에 대해 데이터를 요청할 때

- 명시적으로 토픽을 생성

(유지보수를 효과적으로 하기 위해 명시적으로 토픽을 생성하는 것을 추천)

 

kafka-topics.sh과 kafka-configs.sh을 이용해 토픽 생성, 토픽 정보 조회, 토픽 설정 변경을 진행한다. 


 

현재 간단히 테스트를 위해 모든 환경은 localhost를 대상으로만 동작하고, 브로커는 1개로 지정하였다. 

 

./config/server.properties

- broker.id=0

- listeners=PLAINTEXT://127.0.0.1:9092

- advertised.listeners=PLAINTEXT://127.0.0.1:9092

 

1. kafka 브로커, zookeeper 실행 

 

브로커와 zookeeper 실행은 아래 sh 파일로 진행한다. 

 

zookeeper 실행

./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties

 

브로커 실행

./bin/kafka-server-start.sh -daemon ./config/server.properties

 


 

2. 토픽 생성

 

--create 옵션으로 토픽을 생성한다. 

 

./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic hello.kafka

WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic hello.kafka.

 

(충돌 방지를 위해서 토픽 이름에는 . , _ 를 동시에 사용하지는 말라고 한다.)

 

파티션 개수, 파티션 복제 개수, 토픽 추가 설정도 가능하다. 

 

./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1 --config retention.ms=172800000 --topic hello.kafka.2

WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic hello.kafka.2.

 

--partitions : 파티션 개수를 지정. 최소 개수는 1.

옵션 설정하지 않은 경우, config/server.properties에 있는 num.partitions 값으로 생성

 

--replication-factor : 토픽의 파티션을 복제할 복제 개수. 1은 복제하지 않는다는 의미. 2이면 1개의 복제본을 생성.

최소 설정은 1이고, 최대 설정은 클러스터 내의 브로커 개수. 

옵션 사용하지 않은 경우, config/server.properties에 있는 default.replication.factor 값으로 생성

 

--config : 토픽에 대한 추가 설정 

retention.ms : 토픽의 데이터 유지 기간. 172800000ms = 2일이므로, 2일이 지난 토픽의 데이터는 삭제됨.

 


 

3. 토픽 조회 

 

--list로 생성된 토픽을 조회한다. 

 

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

hello.kafka
hello.kafka.2

 

--describe 옵션으로 각 토픽에 대해 상세 조회를 할 수 있다. 

(파티션 개수, 파티션이 위치한 브로커, 토픽 설정 등)

 

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic hello.kafka.2

Topic: hello.kafka.2	PartitionCount: 3	ReplicationFactor: 1	Configs: segment.bytes=1073741824,retention.ms=172800000
	Topic: hello.kafka.2	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
	Topic: hello.kafka.2	Partition: 1	Leader: 0	Replicas: 0	Isr: 0
	Topic: hello.kafka.2	Partition: 2	Leader: 0	Replicas: 0	Isr: 0

 

현재 브로커가 1개이므로, 각 파티션의 리더가 모두 0으로 지정되어 있다. (0번 브로커에 리더 파티션이 있음을 의미) 

실 운영 시, 특정 브로커에 리더 파티션이 몰린 경우, 네트워크 오버헤드 발생 가능하므로, 리더 파티션을 여러 브로커에 분산하는 것이 중요하다. 

 


 

4. 토픽 옵션 변경

kafka-topics.sh, kafka-configs.sh을 이용해 토픽 옵션 수정이 가능하다. 

 

파티션 개수 변경

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic hello.kafka --alter --partitions 4

 

변경된 내용 확인 

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic hello.kafka --describe

Topic: hello.kafka	PartitionCount: 4	ReplicationFactor: 1	Configs: segment.bytes=1073741824
	Topic: hello.kafka	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
	Topic: hello.kafka	Partition: 1	Leader: 0	Replicas: 0	Isr: 0
	Topic: hello.kafka	Partition: 2	Leader: 0	Replicas: 0	Isr: 0
	Topic: hello.kafka	Partition: 3	Leader: 0	Replicas: 0	Isr: 0

 

앞서 1개의 파티션으로 생성한 hello.kafka 토픽이 4개의 파티션으로 변경되었다. 

(파티션 개수를 늘릴 수는 있지만, 줄일 수는 없다. 반드시 늘릴 상황일 때만 사용을 권고한다.)

(파티션 개수를 2로 줄였을 때 아래와 같은 에러 발생)

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic hello.kafka --alter --partitions 2

Error while executing topic command : org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 4 partitions, which is higher than the requested 2.
[2025-01-01 16:58:16,147] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 4 partitions, which is higher than the requested 2.
	at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
	at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
	at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
	at kafka.admin.TopicCommand$AdminClientTopicService.alterTopic(TopicCommand.scala:270)
	at kafka.admin.TopicCommand$.main(TopicCommand.scala:64)
	at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 4 partitions, which is higher than the requested 2.
 (kafka.admin.TopicCommand$)

 

 

토픽 설정 변경 (retention.ms 값 변경)

./bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name hello.kafka --alter --add-config retention.ms=86400000

 

변경된 내용 확인

./bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name hello.kafka --describe

Dynamic configs for topic hello.kafka are:
  retention.ms=86400000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=86400000}

 

별도 지정하지 않았던 retention.ms 값이 86400000ms로 변경되었다. 

(--add-config 옵션 사용 시, 이미 존재하는 설정값은 변경되고, 존재하지 않는 설정값은 신규 추가된다.)

'메모 > kafka' 카테고리의 다른 글

producer api  (0) 2025.01.04
kafka 기본 개념  (0) 2025.01.04
kafka-console-producer.sh / kafka-console-consumer.sh / kafka-consumer-groups.sh / kafka-delete-records.sh  (0) 2025.01.01
kafka 설치 및 확인  (1) 2025.01.01
kafka 공부하기  (0) 2024.12.30
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
TAG
more
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함