티스토리 뷰
일단 설치를 통해 돌아가는 지 간단히 테스트를 한 뒤, 개념적인 정리를 할 예정이다.
현재 보고 있는 책에서는 실습을 aws로 진행하지만, 나는 그냥 로컬에다 설치해서 테스트를 할 것이다.
(이유 : 인스턴스 새로 만들기 귀찮 + 새로 이사한 집에 아직 인터넷 설치가 안 되서 핫스팟으로 계속 사용 중이라 ip 계속 변경)
(security group에 any로 넣고 테스트해도 되지만, 핫스팟 자체가 끊길 때가 많아 그냥 로컬에 설치하기로 함)
kafka install : https://kafka.apache.org/downloads
wget으로 아래와 같이 특정 버전을 다운받아도 된다.
wget https://archive.apache.org/dist/kafka/2.5.1/kafka_2.12-2.5.1.tgz
책에서 실습으로 2.5.0 버전을 이용 중이길래 2.5.1로 그냥 선정했고, 공식 페이지에서 scalar 2.12 버전을 추천하길래 그냥 2.12로 선정했다.
(핫스팟이라 그런지 50mb 다운받는데 30분이 걸린다! 그냥 aws로 할 걸....)
다운로드 완료 후, 원하는 디렉토리에 압축을 해제한다.
tar xvf kafka_2.12-2.5.1tgz
이후 해당 디렉토리에서 아래 작업들로 설정 및 kafka 실습을 진행한다.
1. bin/kafka-server-start.sh
카프카 브로커를 실행하기 위한 스크립트이다.
환경변수로 설정해 줄 수 있는 것들은 아래와 같다.
KAFKA_LOG4J_OPTS : log4j 관련 properties 파일 경로 지정
KAFKA_HEAP_OPTS : 브로커가 사용할 힙 메모리
(브로커는 레코드 내용을 페이지 캐시로 시스템 메로리를 사용, 나머지 객체를 힙 메모리에 저장하여 사용)
(운영 시, 힙 메모리를 5gb 이상 설정하지 않는 것이 일반적)
(Xmx : 힙 최대 크기, Xms : 힙 최소 크기)
2. config/server.properties
카프카 브로커가 클러스터 운영에 필요한 옵션을 지정하는 properties 파일
broker.id : 브로커를 식별하는 고유 id, 기본값은 0, 클러스터 내의 각 브로커는 고유한 id를 가져야 한다. (중복 시 클러스터링에 문제 발생 가능)
listeners : 브로커가 수신할 네트워크 요청 대상 프로토콜과 주소를 정의
(format : listener_name://host_name:port)
(ex : PLAINTEXT://0.0.0.0:9092)
(카프카 기본 포트 9092)
advertised.listeners : 클라이언트가 브로커에 연결할 때 사용하는 주소
(format : listener_name://host_name:port)
(ex : PLAINTEXT://127.0.0.1:9092)
listener.security.protocol.map : listener와 보안 프로토콜 간의 매핑을 위한 설정. SASL_SSL, SASL_PLAINTEXT 등 보안 설정 시 사용
num.network.threads : 네트워크 요청을 처리하는 스레드 수를 지정. 기본값은 3
(과한 스레드 사용 시, 스레드 오버헤드 발생 가능)
num.io.threads : 브로커 내부에서 I/O 작업을 처리하는 스레드 수를 지정. 기본값은 8
socket.send.buffer.bytes : 브로커가 데이터를 전송할 때 사용하는 소켓 송신 버퍼 크기. 기본값은 102400 (100kb)
(버퍼 크기 작게 설정 시, 데이터 전송 시 병목 가능)
(버퍼 너무 크게 설정 시, 메모리 사용량 증가)
(시스템 네트워크 설정 (net.core.wmem_max) 보다 큰 값 설정해도 효과는 없음)
socket.receive.buffer.bytes : 브로커가 데이터 수신할 때 사용하는 소켓 수신 버퍼 크기. 기본값은 102400 (100kb)
socket.request.max.bytes : 브로커가 클라이언트나 다른 브로커로부터 수신할 수 있는 요청 메시지의 최대 크기. 기본값은 104857600 (100mb)
(프로듀서가 보낸 배치 메시지가 해당 크기를 초과하면 브로커가 해당 메시지를 처리하지 못함)
(토픽 설정의 max.message.bytes 값과 일치하거나 더 크게 설정해야 함)
log.dirs : 로그 데이터를 저장할 디렉토리 경로를 지정. 기본값은 /tmp/kafka-logs
(comma로 여러 디렉토리 지정 가능)
(로그 데이터에는 메시지, 메타데이터, 인덱스 파일 등이 포함)
(여러 디렉토리 지정 시, 디렉토리 간 데이터 균등하게 분배하려고 시도하므로, 물리적으로 디스크를 분리하는게 I/O 작업 줄이는데 도움이 됨.)
num.partitions : 새로 생성된 토픽의 기본 파티션 수를 설정함.
(토픽 생성 시 별도 지정하지 않으면 해당 값을 사용)
(파티션 수는 토픽 생성 후 변경 불가)
num.recovery.threads.per.data.dir : 로그 디렉토리 당 복구 작업에 사용할 스레드 수를 설정. 기본값은 1
offsets.topic.replication.factor : 컨슈머 그룹의 오프셋 정보를 저장하는 내부 토픽(__consumer_offsets)의 복제본 수를 설정. 클러스터 내 브로커 수보다 작아야 하며, 일반적으로 3 이상으로 설정해 장애 발생 시에도 오프셋 정보가 손실되지 않도록 함.
transaction.state.log.replication.factor : 트랜잭션 상태 정보를 저장하는 내부 토픽(__transaction_state)의 복제본 수를 설정. 클러스터 내 브로커 수보다 작아야 하며, 일반적으로 3 이상으로 설정해 장애 발생 시에도 오프셋 정보가 손실되지 않도록 함.
transaction.state.log.min.isr : 트랜잭션 상태 정보를 저장하는 내부 토픽의 최소 ISR (In-Sync Replica)수를 설정함. (ISR : 데이터가 동기화된 복제본 개수)
(transaction.state.log.replication.factor 보다 작아야 하며, 일반적으로 2로 설정)
log.flush.interval.messages : 특정 메시지 개수마다 로그를 디스크에 강제로 쓰기 (flush). 기본값은 비활성화.
log.flush.interval.ms : 특정 시간 간격(ms)마다 로그를 디스크에 강제로 flush. 기본값은 비활성화.
log.retention.hours : 로그 데이터가 유지되는 시간 (hour). 기본값은 168 (7일). 설정 시간 이후, 로그 세그먼트 파일이 삭제됨.
log.retention.bytes : 로그 데이터가 유지되는 최대 크기 (byte). 기본값은 비활성화 (-1). 설정 크기 이후, 로그 세그먼트 파일이 삭제됨.
log.segment.bytes : 로그 세그먼트 파일의 최대 크기 (byte). 기본값은 1073741824 (1gb). 설정 크기 이후, 로그가 새 파일로 분할됨.
log.retention.check.interval.ms : 로그 삭제 조건을 확인하는 주기 (ms). 기본값은 300000 (5분)
zookeeper.connect : 카프카 브로커가 연결할 zookeeper 서버 주소. comma로 여러 zookeeper 서버 지정 가능
(format : host:port)
(ex: 127.0.0.1:2181)
(kraft 모드에서는 지원하지 않음)
zookeeper.connection.timeout.ms : zookeeper 서버와 연결 대기 시간 (ms). 기본값은 18000 (18초)
(kraft 모드에서는 지원하지 않음)
group.initial.rebalance.delay.ms : 새로운 컨슈머 그룹이 처음 시작될 때 대기하는 시간 (ms). 기본값은 3000 (3초)
(컨슈머 그룹 내 파티션 할당을 조정하는 리밸런스를 위함)
3. zookeeper 실행
아래 커맨드로 zookeeper를 실행할 수 있다. (실제 운영 환경에서는 3대 이상 필요)
./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
(데몬 동작 확인 : jps -vm)
(-v : jvm 인자 확인, -m : main 메서드 인자 확인)
확인 결과
3108 QuorumPeerMain config/zookeeper.properties -Xmx512M -Xms512M -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=/Users/hobin/TIL/2024/kafka_study/kafka_2.12-2.5.1/bin/../logs/zookeeper-gc.log:time,tags:filecount=10,filesize=102400 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/Users/hobin/TIL/2024/kafka_study/kafka_2.12-2.5.1/bin/../logs -Dlog4j.configuration=file:./bin/../config/log4j.properties
3132 Jps -vm -Dapplication.home=/Library/Java/JavaVirtualMachines/jdk-17.jdk/Contents/Home -Xms8m -Djdk.module.main=jdk.jcmd
4. 카프카 브로커 실행
아래 커맨드로 브로커를 실행할 수 있다. (실제 운영 환경에서는 3대 이상 필요)
./bin/kafka-server-start.sh -daemon ./config/server.properties
확인 결과
4561 Jps -vm -Dapplication.home=/Library/Java/JavaVirtualMachines/jdk-17.jdk/Contents/Home -Xms8m -Djdk.module.main=jdk.jcmd
3108 QuorumPeerMain config/zookeeper.properties -Xmx512M -Xms512M -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=/Users/hobin/TIL/2024/kafka_study/kafka_2.12-2.5.1/bin/../logs/zookeeper-gc.log:time,tags:filecount=10,filesize=102400 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/Users/hobin/TIL/2024/kafka_study/kafka_2.12-2.5.1/bin/../logs -Dlog4j.configuration=file:./bin/../config/log4j.properties
4543 Kafka ./config/server.properties -Xmx1G -Xms1G -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=/Users/hobin/TIL/2024/kafka_study/kafka_2.12-2.5.1/bin/../logs/kafkaServer-gc.log:time,tags:filecount=10,filesize=102400 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/Users/hobin/TIL/2024/kafka_study/kafka_2.12-2.5.1/bin/../logs -Dlog4j.configuration=file:./bin/../config/log4j.properties
5. 브로커 동작 확인
아래 커맨드로 브로커 동작을 확인할 수 있다.
실제 운영 시, 외부 프로듀서, 컨슈머 입장에서 브로커가 살아있는 지 확인하는 간단한 방법은 version 확인이고, 카프카에서 version-api를 제공 중이다.
(카프카 브로커 서버 이외의 환경에서 브로커와 동일한 버전을 사용하여 통신하는 것을 추천한다.)
./bin/kafka-broker-api-versions.sh --bootstrap-server <broker ip>:<broker port>
(ex : ./bin/kafka-broker-api-versions.sh --bootstrap-server 127.0.0.1:9092)
확인 결과
127.0.0.1:9092 (id: 0 rack: null) -> (
Produce(0): 0 to 8 [usable: 8],
Fetch(1): 0 to 11 [usable: 11],
ListOffsets(2): 0 to 5 [usable: 5],
Metadata(3): 0 to 9 [usable: 9],
LeaderAndIsr(4): 0 to 4 [usable: 4],
StopReplica(5): 0 to 2 [usable: 2],
UpdateMetadata(6): 0 to 6 [usable: 6],
ControlledShutdown(7): 0 to 3 [usable: 3],
OffsetCommit(8): 0 to 8 [usable: 8],
OffsetFetch(9): 0 to 7 [usable: 7],
FindCoordinator(10): 0 to 3 [usable: 3],
JoinGroup(11): 0 to 7 [usable: 7],
Heartbeat(12): 0 to 4 [usable: 4],
LeaveGroup(13): 0 to 4 [usable: 4],
SyncGroup(14): 0 to 5 [usable: 5],
DescribeGroups(15): 0 to 5 [usable: 5],
ListGroups(16): 0 to 3 [usable: 3],
SaslHandshake(17): 0 to 1 [usable: 1],
ApiVersions(18): 0 to 3 [usable: 3],
CreateTopics(19): 0 to 5 [usable: 5],
DeleteTopics(20): 0 to 4 [usable: 4],
DeleteRecords(21): 0 to 1 [usable: 1],
InitProducerId(22): 0 to 3 [usable: 3],
OffsetForLeaderEpoch(23): 0 to 3 [usable: 3],
AddPartitionsToTxn(24): 0 to 1 [usable: 1],
AddOffsetsToTxn(25): 0 to 1 [usable: 1],
EndTxn(26): 0 to 1 [usable: 1],
WriteTxnMarkers(27): 0 [usable: 0],
TxnOffsetCommit(28): 0 to 3 [usable: 3],
DescribeAcls(29): 0 to 2 [usable: 2],
CreateAcls(30): 0 to 2 [usable: 2],
DeleteAcls(31): 0 to 2 [usable: 2],
DescribeConfigs(32): 0 to 2 [usable: 2],
AlterConfigs(33): 0 to 1 [usable: 1],
AlterReplicaLogDirs(34): 0 to 1 [usable: 1],
DescribeLogDirs(35): 0 to 1 [usable: 1],
SaslAuthenticate(36): 0 to 2 [usable: 2],
CreatePartitions(37): 0 to 2 [usable: 2],
CreateDelegationToken(38): 0 to 2 [usable: 2],
RenewDelegationToken(39): 0 to 2 [usable: 2],
ExpireDelegationToken(40): 0 to 2 [usable: 2],
DescribeDelegationToken(41): 0 to 2 [usable: 2],
DeleteGroups(42): 0 to 2 [usable: 2],
ElectLeaders(43): 0 to 2 [usable: 2],
IncrementalAlterConfigs(44): 0 to 1 [usable: 1],
AlterPartitionReassignments(45): 0 [usable: 0],
ListPartitionReassignments(46): 0 [usable: 0],
OffsetDelete(47): 0 [usable: 0]
)
'메모 > kafka' 카테고리의 다른 글
producer api (0) | 2025.01.04 |
---|---|
kafka 기본 개념 (0) | 2025.01.04 |
kafka-console-producer.sh / kafka-console-consumer.sh / kafka-consumer-groups.sh / kafka-delete-records.sh (0) | 2025.01.01 |
kafka-topics.sh (3) | 2025.01.01 |
kafka 공부하기 (0) | 2024.12.30 |