티스토리 뷰
kafka-topics.sh 은 토픽 관리와 관련된 파일이다. (토픽 생성, 토픽 설정 변경 등)
토픽 : 카프카에서 데이터를 구분하는 가장 기본 개념 (RDBMS의 테이블과 유사한 기능)
토픽 생성 방법
- 컨슈머, 프로듀서가 브로커에게 생성되지 않은 토픽에 대해 데이터를 요청할 때
- 명시적으로 토픽을 생성
(유지보수를 효과적으로 하기 위해 명시적으로 토픽을 생성하는 것을 추천)
kafka-topics.sh과 kafka-configs.sh을 이용해 토픽 생성, 토픽 정보 조회, 토픽 설정 변경을 진행한다.
현재 간단히 테스트를 위해 모든 환경은 localhost를 대상으로만 동작하고, 브로커는 1개로 지정하였다.
./config/server.properties
- broker.id=0
- listeners=PLAINTEXT://127.0.0.1:9092
- advertised.listeners=PLAINTEXT://127.0.0.1:9092
1. kafka 브로커, zookeeper 실행
브로커와 zookeeper 실행은 아래 sh 파일로 진행한다.
zookeeper 실행
./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
브로커 실행
./bin/kafka-server-start.sh -daemon ./config/server.properties
2. 토픽 생성
--create 옵션으로 토픽을 생성한다.
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic hello.kafka
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic hello.kafka.
(충돌 방지를 위해서 토픽 이름에는 . , _ 를 동시에 사용하지는 말라고 한다.)
파티션 개수, 파티션 복제 개수, 토픽 추가 설정도 가능하다.
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1 --config retention.ms=172800000 --topic hello.kafka.2
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic hello.kafka.2.
--partitions : 파티션 개수를 지정. 최소 개수는 1.
옵션 설정하지 않은 경우, config/server.properties에 있는 num.partitions 값으로 생성
--replication-factor : 토픽의 파티션을 복제할 복제 개수. 1은 복제하지 않는다는 의미. 2이면 1개의 복제본을 생성.
최소 설정은 1이고, 최대 설정은 클러스터 내의 브로커 개수.
옵션 사용하지 않은 경우, config/server.properties에 있는 default.replication.factor 값으로 생성
--config : 토픽에 대한 추가 설정
retention.ms : 토픽의 데이터 유지 기간. 172800000ms = 2일이므로, 2일이 지난 토픽의 데이터는 삭제됨.
3. 토픽 조회
--list로 생성된 토픽을 조회한다.
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
hello.kafka
hello.kafka.2
--describe 옵션으로 각 토픽에 대해 상세 조회를 할 수 있다.
(파티션 개수, 파티션이 위치한 브로커, 토픽 설정 등)
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic hello.kafka.2
Topic: hello.kafka.2 PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824,retention.ms=172800000
Topic: hello.kafka.2 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: hello.kafka.2 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: hello.kafka.2 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
현재 브로커가 1개이므로, 각 파티션의 리더가 모두 0으로 지정되어 있다. (0번 브로커에 리더 파티션이 있음을 의미)
실 운영 시, 특정 브로커에 리더 파티션이 몰린 경우, 네트워크 오버헤드 발생 가능하므로, 리더 파티션을 여러 브로커에 분산하는 것이 중요하다.
4. 토픽 옵션 변경
kafka-topics.sh, kafka-configs.sh을 이용해 토픽 옵션 수정이 가능하다.
파티션 개수 변경
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic hello.kafka --alter --partitions 4
변경된 내용 확인
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic hello.kafka --describe
Topic: hello.kafka PartitionCount: 4 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: hello.kafka Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: hello.kafka Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: hello.kafka Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: hello.kafka Partition: 3 Leader: 0 Replicas: 0 Isr: 0
앞서 1개의 파티션으로 생성한 hello.kafka 토픽이 4개의 파티션으로 변경되었다.
(파티션 개수를 늘릴 수는 있지만, 줄일 수는 없다. 반드시 늘릴 상황일 때만 사용을 권고한다.)
(파티션 개수를 2로 줄였을 때 아래와 같은 에러 발생)
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic hello.kafka --alter --partitions 2
Error while executing topic command : org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 4 partitions, which is higher than the requested 2.
[2025-01-01 16:58:16,147] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 4 partitions, which is higher than the requested 2.
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at kafka.admin.TopicCommand$AdminClientTopicService.alterTopic(TopicCommand.scala:270)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:64)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 4 partitions, which is higher than the requested 2.
(kafka.admin.TopicCommand$)
토픽 설정 변경 (retention.ms 값 변경)
./bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name hello.kafka --alter --add-config retention.ms=86400000
변경된 내용 확인
./bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name hello.kafka --describe
Dynamic configs for topic hello.kafka are:
retention.ms=86400000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=86400000}
별도 지정하지 않았던 retention.ms 값이 86400000ms로 변경되었다.
(--add-config 옵션 사용 시, 이미 존재하는 설정값은 변경되고, 존재하지 않는 설정값은 신규 추가된다.)
'메모 > kafka' 카테고리의 다른 글
producer api (0) | 2025.01.04 |
---|---|
kafka 기본 개념 (0) | 2025.01.04 |
kafka-console-producer.sh / kafka-console-consumer.sh / kafka-consumer-groups.sh / kafka-delete-records.sh (0) | 2025.01.01 |
kafka 설치 및 확인 (1) | 2025.01.01 |
kafka 공부하기 (0) | 2024.12.30 |