티스토리 뷰
kafka-console-producer.sh / kafka-console-consumer.sh / kafka-consumer-groups.sh / kafka-delete-records.sh
4567은 소수 2025. 1. 1. 19:25kafka-console-producer.sh로 생성된 토픽에 데이터를 넣을 수 있다.
레코드 : 토픽에 넣는 데이터, key, value로 구성되며, key 설정하지 않을 시 null로 key를 설정한다.
1. 키 없이 토픽에 레코드 기록
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic hello.kafka
>hello
>kafka
>123
>
>^C
ctrl C로 종료하기 전까지 "hello", "kafka", "123", " " 4개의 레코드를 기록했다.
레코드에 기록되는 값은 UTF-8 인코딩 기준으로 byte 변환 후, ByteArraySerializer로 직렬화된다. 따라서 string 이외의 타입 (정의된 객체 등)은 전송할 수 없으며, 다른 타입으로 직렬화하여 브로커에게 전송하기 위해서는 프로듀서 애플리케이션을 직접 개발해야 한다.
토픽에 저장된 레코드는 아래와 같이 kafka-console-consumer.sh을 이용해 확인 가능하다.
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello.kafka --from-beginning
kafka
123
hello
--from-beginning 옵션으로 토픽에 저장된 모든 레코드를 읽는다. 해당 옵션이 없을 시, consumer로 전송되지 않은 새로운 레코드만 출력한다.
2. 키 있이 토픽에 레코드 기록
./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic hello.kafka --property "parse.key=true" --property "key.separator=:"
>k1:v1
>k2:v2
>k1:v3
>k4:v4
>^C
parse.key=true 옵션으로 메시지 키를 추가할 수 있다.
key.separator= 옵션으로 key, value 구분자를 지정할 수 있다. 별도 옵션 없을 시, \t를 기준으로 key, value를 구분한다.
key가 null인 경우, 프로듀서가 파티션으로 메시지 전송 시, 레코드 배치 단위로 라운드 로빈으로 전송한다.
key가 존재하는 경우, key 해시값을 기준으로 파티션 중 하나에 전송된다. 따라서 같은 key는 같은 파티션으로 전송된다.
다른 파티션으로 전송하기 위해서는 커스텀 파티션을 이용해야 한다.
(파티션 개수가 늘어나는 경우, 동일한 key를 갖더라도, 동일 파티션에 레코드가 들어간다는 보장이 없다. 이를 보장하기 위해서도 커스텀 파티션을 이용해야 한다.)
각 파티션 별로 기록된 내용 확인 시 아래와 같다.
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello.kafka --partition 0 --from-beginning --property print.key=true --property key.separator=:
null:hello
k4:v4
^CProcessed a total of 2 messages
--partition : 파티션 번호를 의미한다. 없는 경우, 전체 파티션에 기록된 레코드를 출력한다.
(1번 파티션 확인 시, 아래와 같이 k1 key 값이 같이 있는 것을 알 수 있다. 그리고 key가 null인 경우, 라운드 로빈에 의해 1개씩 들어가 있다.)
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello.kafka --partition 1 --from-beginning --property print.key=true --property key.separator=:
null:123
k1:v1
k2:v2
k1:v3
^CProcessed a total of 4 messages
kafka-console-consumer.sh 에서 --group 옵션을 통해 컨슈머 그룹을 생성할 수 있다.
컨슈머 그룹은 1개 이상의 컨슈머로 구성되며, 가져간 메시지에 대해 커밋을 브로커에 저장한다.
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic hello.kafka --property print.key=true --property key.separator=: --group hello-group --from-beginning
null:kafka
null:
null:123
k1:v1
k2:v2
k1:v3
null:hello
k4:v4
^CProcessed a total of 8 messages
kafka-console-producer.sh로 생성한 메시지 순서와 kafka-console-consumer.sh로 조회한 메시지의 순서는 다르다.
토픽의 데이터를 가져가게 되면, 토픽의 모든 파티션으로부터 동일한 중요도로 데이터를 가져가기 때문이다.
(토픽에 넣은 데이터의 순서를 보장하고 싶다면, 파티션 1개로 구성된 토픽을 만들면 된다.)
컨슈머 그룹과 관련된 정보는 kafka-consumer-groups.sh로 확인할 수 있다.
컨슈머 그룹 조회
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
hello-group
컨슈머 그룹 별 상세 조회
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group hello-group --describe
Consumer group 'hello-group' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
hello-group hello.kafka 3 1 1 0 - - -
hello-group hello.kafka 2 1 1 0 - - -
hello-group hello.kafka 1 4 4 0 - - -
hello-group hello.kafka 0 2 2 0 - - -
- GROUP : 컨슈머 그룹 이름
- TOPIC, PARTITION : 해당 컨슈머 그룹이 마지막으로 브로커에게 커밋한 토픽, 파티션
- CURRENT-OFFSET : 해당 컨슈머 그룹이 가져간 토픽의 파티션의 가장 최신 오프셋 값 (현재 offset이 1인 경우, 해당 파티션의 1번째 레코드부터 읽으면 됨을 의미)
(ex. 3번 파티션의 경우, 레코드가 1개 있었으므로, offset 1은 가장 마지막 레코드 (0번째) 그 다음을 의미)
- LOG-END-OFFSET : 해당 파티션에 저장된 가장 최신 offset을 의미
(ex. LOG-END-OFFSET이 3인 경우, 0,1,2번째 레코드가 있음을 의미)
- LAG : 컨슈머 그룹이 토픽의 파티션에 있는 데이터를 가져가는데 얼마나 지연이 발생하는 지를 나타내는 지표.
컨슈머 그룹이 커밋한 offset과 해당 파티션의 가장 최신 offset의 차이를 값으로 가짐 (LOG-END-OFFSET - CURRENT-OFFSET)
(LAG값이 증가하고 있다 == 프로듀서의 속도에 비해 컨슈머의 처리 속도가 느리다)
- CONSUMER-ID : 컨슈머의 토픽 및 파티션 할당을 카프카 내부적으로 구분하기 위한 id.
client id에 uuid를 추가하여 자동할당되며, 유니크한 값을 가짐
- HOST : 컨슈머가 동작하는 host 명
(인가된 컨슈머에게 메시지가 전송 중인지 확인 가능)
- CLIENT-ID : 컨슈머에 할당된 id. 사용자가 지정하지 않으면 자동 생성됨.
kafka-verifiable로 시작하는 스크립트로 별도 코드 없이 프로듀서, 컨슈머 네트워크 동작을 테스트할 수 있다.
verify-test라는 토픽에 3개의 파티션을 만들어 주었다. (별도 생성하지 않고, kafka-verifiable-producer.sh 등으로 없는 토픽 이름 지정 시, default 값으로 해당 토픽 생성)
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic verify-test --partitions 3
kafka-verifiable-producer.sh로 20개의 메시지를 발생시켰다.
./bin/kafka-verifiable-producer.sh --bootstrap-server localhost:9092 --max-messages 20 --topic verify-test
{"timestamp":1735724468050,"name":"startup_complete"}
{"timestamp":1735724468325,"name":"producer_send_success","key":null,"value":"0","offset":0,"topic":"verify-test","partition":2}
{"timestamp":1735724468330,"name":"producer_send_success","key":null,"value":"1","offset":1,"topic":"verify-test","partition":2}
{"timestamp":1735724468330,"name":"producer_send_success","key":null,"value":"2","offset":2,"topic":"verify-test","partition":2}
{"timestamp":1735724468330,"name":"producer_send_success","key":null,"value":"3","offset":3,"topic":"verify-test","partition":2}
{"timestamp":1735724468331,"name":"producer_send_success","key":null,"value":"4","offset":4,"topic":"verify-test","partition":2}
{"timestamp":1735724468331,"name":"producer_send_success","key":null,"value":"5","offset":5,"topic":"verify-test","partition":2}
{"timestamp":1735724468331,"name":"producer_send_success","key":null,"value":"6","offset":6,"topic":"verify-test","partition":2}
{"timestamp":1735724468331,"name":"producer_send_success","key":null,"value":"7","offset":7,"topic":"verify-test","partition":2}
{"timestamp":1735724468332,"name":"producer_send_success","key":null,"value":"8","offset":8,"topic":"verify-test","partition":2}
{"timestamp":1735724468332,"name":"producer_send_success","key":null,"value":"9","offset":9,"topic":"verify-test","partition":2}
{"timestamp":1735724468332,"name":"producer_send_success","key":null,"value":"10","offset":10,"topic":"verify-test","partition":2}
{"timestamp":1735724468332,"name":"producer_send_success","key":null,"value":"11","offset":11,"topic":"verify-test","partition":2}
{"timestamp":1735724468333,"name":"producer_send_success","key":null,"value":"12","offset":12,"topic":"verify-test","partition":2}
{"timestamp":1735724468333,"name":"producer_send_success","key":null,"value":"13","offset":13,"topic":"verify-test","partition":2}
{"timestamp":1735724468333,"name":"producer_send_success","key":null,"value":"14","offset":14,"topic":"verify-test","partition":2}
{"timestamp":1735724468334,"name":"producer_send_success","key":null,"value":"15","offset":15,"topic":"verify-test","partition":2}
{"timestamp":1735724468337,"name":"producer_send_success","key":null,"value":"16","offset":16,"topic":"verify-test","partition":2}
{"timestamp":1735724468338,"name":"producer_send_success","key":null,"value":"17","offset":17,"topic":"verify-test","partition":2}
{"timestamp":1735724468338,"name":"producer_send_success","key":null,"value":"18","offset":18,"topic":"verify-test","partition":2}
{"timestamp":1735724468338,"name":"producer_send_success","key":null,"value":"19","offset":19,"topic":"verify-test","partition":2}
{"timestamp":1735724468353,"name":"shutdown_complete"}
{"timestamp":1735724468355,"name":"tool_data","sent":20,"acked":20,"target_throughput":-1,"avg_throughput":65.14657980456026}
모두 2번 파티션에 메시지가 기록되었으며, offset도 적절하게 할당되었다.
(--max-messages 값을 -1로 설정하면 kafka-verifiable-producer.sh이 종료될때까지 메시지를 생성한다.)
kafka-verifiable-consumer.sh로 해당 토픽에 기록된 메시지를 아래와 같이 가져온다.
./bin/kafka-verifiable-consumer.sh --bootstrap-server localhost:9092 --topic verify-test --group-id verify-test-group
{"timestamp":1735724899838,"name":"startup_complete"}
{"timestamp":1735724900142,"name":"partitions_assigned","partitions":[{"topic":"verify-test","partition":0},{"topic":"verify-test","partition":1},{"topic":"verify-test","partition":2}]}
{"timestamp":1735724900213,"name":"records_consumed","count":20,"partitions":[{"topic":"verify-test","partition":2,"count":20,"minOffset":0,"maxOffset":19}]}
{"timestamp":1735724900224,"name":"offsets_committed","offsets":[{"topic":"verify-test","partition":2,"offset":20}],"success":true}
--group-id로 컨슈머 그룹을 지정한다. (없었으므로 자동 생성)
총 20개의 메시지를 2번 파티션으로부터 가져옴에 성공했음을 알 수 있다.
kafka-consumer-groups.sh로 확인하면 아래와 같이 2번 파티션에서 20번 offset을 최신으로 업데이트했음을 알 수 있다.
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group verify-test-group --describe
Consumer group 'verify-test-group' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
verify-test-group verify-test 2 20 20 0 - - -
kafka-delete-records.sh 을 이용해 파티션의 레코드를 삭제할 수 있다.
앞서 verify-test의 2번 파티션에 20개의 레코드가 기록되어 있으므로 이 중 10개를 삭제하려면 다음과 같다.
vim delete-records.json
{
"partitions":[{
"topic":"verify-test",
"partition":2,
"offset":10
}],
"version":1
}
- topic : 삭제할 파티션이 존재하는 토픽 이름
- partition : 몇 번 파티션인지 지정
- offset : 해당 파티션의 offset 값 미만의 레코드를 삭제 (10인 경우, 0 ~ 9 offset에 해당하는 레코드를 삭제함을 의미)
kafka-delete-records.sh을 이용해 아래와 같이 작성된 json파일을 기반하여 레코드 삭제를 진행한다.
./bin/kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file delete-records.json
Executing records delete operation
Records delete operation completed:
partition: verify-test-2 low_watermark: 10
주의 사항
- 특정 offset의 레코드만 삭제하는 것은 불가능하다. 0번 offset부터 삭제가 되므로 주의해야 한다.
삭제 확인
kafka-console-consumer.sh로 verify-test 토픽의 2번 파티션에 대해 레코드를 조회한다.
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic verify-test --from-beginning --partition 2
10
11
12
13
14
15
16
17
18
19
^CProcessed a total of 10 messages
이 때, 삭제가 되더라도, offset 자체는 유지가 되기 때문에 offset 계산 시, 별도로 신경 쓸 것은 없다.
삭제 후 아래와 같이 0번 offset부터 레코드를 확인하면, 0번 offset에는 아무것도 없기 때문에 출력을 할 수 없다.
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic verify-test --partition 2 --offset 0
^CProcessed a total of 0 messages
10번 offset부터 출력 시, 정상적으로 출력된다.
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic verify-test --partition 2 --offset 10
10
11
12
13
14
15
16
17
18
19
^CProcessed a total of 10 messages
'메모 > kafka' 카테고리의 다른 글
producer api (0) | 2025.01.04 |
---|---|
kafka 기본 개념 (0) | 2025.01.04 |
kafka-topics.sh (3) | 2025.01.01 |
kafka 설치 및 확인 (1) | 2025.01.01 |
kafka 공부하기 (0) | 2024.12.30 |