티스토리 뷰

메모/kafka

kafka 설치 및 확인

4567은 소수 2025. 1. 1. 15:54

일단 설치를 통해 돌아가는 지 간단히 테스트를 한 뒤, 개념적인 정리를 할 예정이다.

 

현재 보고 있는 책에서는 실습을 aws로 진행하지만, 나는 그냥 로컬에다 설치해서 테스트를 할 것이다. 

(이유 : 인스턴스 새로 만들기 귀찮 + 새로 이사한 집에 아직 인터넷 설치가 안 되서 핫스팟으로 계속 사용 중이라 ip 계속 변경)

(security group에 any로 넣고 테스트해도 되지만, 핫스팟 자체가 끊길 때가 많아 그냥 로컬에 설치하기로 함)

 

kafka install : https://kafka.apache.org/downloads

 

Apache Kafka

Apache Kafka: A Distributed Streaming Platform.

kafka.apache.org

wget으로 아래와 같이 특정 버전을 다운받아도 된다. 

 

wget https://archive.apache.org/dist/kafka/2.5.1/kafka_2.12-2.5.1.tgz

 

책에서 실습으로 2.5.0 버전을 이용 중이길래 2.5.1로 그냥 선정했고, 공식 페이지에서 scalar 2.12 버전을 추천하길래 그냥 2.12로 선정했다. 

(핫스팟이라 그런지 50mb 다운받는데 30분이 걸린다! 그냥 aws로 할 걸....)

 

다운로드 완료 후, 원하는 디렉토리에 압축을 해제한다. 

 

tar xvf kafka_2.12-2.5.1tgz

 

이후 해당 디렉토리에서 아래 작업들로 설정 및 kafka 실습을 진행한다.


 

1. bin/kafka-server-start.sh

 

카프카 브로커를 실행하기 위한 스크립트이다. 

 

환경변수로 설정해 줄 수 있는 것들은 아래와 같다.

KAFKA_LOG4J_OPTS : log4j 관련 properties 파일 경로 지정

KAFKA_HEAP_OPTS : 브로커가 사용할 힙 메모리 

(브로커는 레코드 내용을 페이지 캐시로 시스템 메로리를 사용, 나머지 객체를 힙 메모리에 저장하여 사용)

(운영 시, 힙 메모리를 5gb 이상 설정하지 않는 것이 일반적)

(Xmx : 힙 최대 크기, Xms : 힙 최소 크기)

 

2. config/server.properties 

 

카프카 브로커가 클러스터 운영에 필요한 옵션을 지정하는 properties 파일

 

broker.id : 브로커를 식별하는 고유 id, 기본값은 0, 클러스터 내의 각 브로커는 고유한 id를 가져야 한다. (중복 시 클러스터링에 문제 발생 가능)

 

listeners : 브로커가 수신할 네트워크 요청 대상 프로토콜과 주소를 정의

(format : listener_name://host_name:port)

(ex : PLAINTEXT://0.0.0.0:9092) 

(카프카 기본 포트 9092)

 

advertised.listeners : 클라이언트가 브로커에 연결할 때 사용하는 주소 

(format : listener_name://host_name:port)

(ex : PLAINTEXT://127.0.0.1:9092) 

 

listener.security.protocol.map : listener와 보안 프로토콜 간의 매핑을 위한 설정. SASL_SSL, SASL_PLAINTEXT 등 보안 설정 시 사용

 

num.network.threads : 네트워크 요청을 처리하는 스레드 수를 지정. 기본값은 3

(과한 스레드 사용 시, 스레드 오버헤드 발생 가능)

 

num.io.threads : 브로커 내부에서 I/O 작업을 처리하는 스레드 수를 지정. 기본값은 8

 

socket.send.buffer.bytes : 브로커가 데이터를 전송할 때 사용하는 소켓 송신 버퍼 크기. 기본값은 102400 (100kb)

(버퍼 크기 작게 설정 시, 데이터 전송 시 병목 가능)

(버퍼 너무 크게 설정 시, 메모리 사용량 증가)

(시스템 네트워크 설정 (net.core.wmem_max) 보다 큰 값 설정해도 효과는 없음)

 

socket.receive.buffer.bytes : 브로커가 데이터 수신할 때 사용하는 소켓 수신 버퍼 크기. 기본값은 102400 (100kb)

 

socket.request.max.bytes : 브로커가 클라이언트나 다른 브로커로부터 수신할 수 있는 요청 메시지의 최대 크기. 기본값은 104857600 (100mb)

(프로듀서가 보낸 배치 메시지가 해당 크기를 초과하면 브로커가 해당 메시지를 처리하지 못함)

(토픽 설정의 max.message.bytes 값과 일치하거나 더 크게 설정해야 함)

 

log.dirs : 로그 데이터를 저장할 디렉토리 경로를 지정. 기본값은 /tmp/kafka-logs

(comma로 여러 디렉토리 지정 가능)

(로그 데이터에는 메시지, 메타데이터, 인덱스 파일 등이 포함)

(여러 디렉토리 지정 시, 디렉토리 간 데이터 균등하게 분배하려고 시도하므로, 물리적으로 디스크를 분리하는게 I/O 작업 줄이는데 도움이 됨.)

 

num.partitions : 새로 생성된 토픽의 기본 파티션 수를 설정함. 

(토픽 생성 시 별도 지정하지 않으면 해당 값을 사용)

(파티션 수는 토픽 생성 후 변경 불가)

 

num.recovery.threads.per.data.dir : 로그 디렉토리 당 복구 작업에 사용할 스레드 수를 설정. 기본값은 1

 

offsets.topic.replication.factor : 컨슈머 그룹의 오프셋 정보를 저장하는 내부 토픽(__consumer_offsets)의 복제본 수를 설정. 클러스터 내 브로커 수보다 작아야 하며, 일반적으로 3 이상으로 설정해 장애 발생 시에도 오프셋 정보가 손실되지 않도록 함. 

 

transaction.state.log.replication.factor : 트랜잭션 상태 정보를 저장하는 내부 토픽(__transaction_state)의 복제본 수를 설정. 클러스터 내 브로커 수보다 작아야 하며, 일반적으로 3 이상으로 설정해 장애 발생 시에도 오프셋 정보가 손실되지 않도록 함.

 

transaction.state.log.min.isr : 트랜잭션 상태 정보를 저장하는 내부 토픽의 최소 ISR (In-Sync Replica)수를 설정함. (ISR : 데이터가 동기화된 복제본 개수)

(transaction.state.log.replication.factor 보다 작아야 하며, 일반적으로 2로 설정)

 

log.flush.interval.messages : 특정 메시지 개수마다 로그를 디스크에 강제로 쓰기 (flush). 기본값은 비활성화. 

 

log.flush.interval.ms : 특정 시간 간격(ms)마다 로그를 디스크에 강제로 flush. 기본값은 비활성화.

 

log.retention.hours : 로그 데이터가 유지되는 시간 (hour). 기본값은 168 (7일). 설정 시간 이후, 로그 세그먼트 파일이 삭제됨. 

 

log.retention.bytes : 로그 데이터가 유지되는 최대 크기 (byte). 기본값은 비활성화 (-1). 설정 크기 이후, 로그 세그먼트 파일이 삭제됨. 

 

log.segment.bytes : 로그 세그먼트 파일의 최대 크기 (byte). 기본값은 1073741824 (1gb). 설정 크기 이후, 로그가 새 파일로 분할됨. 

 

log.retention.check.interval.ms : 로그 삭제 조건을 확인하는 주기 (ms). 기본값은 300000 (5분)

 

zookeeper.connect : 카프카 브로커가 연결할 zookeeper 서버 주소. comma로 여러 zookeeper 서버 지정 가능

(format : host:port)

(ex: 127.0.0.1:2181)

(kraft 모드에서는 지원하지 않음)

 

zookeeper.connection.timeout.ms : zookeeper 서버와 연결 대기 시간 (ms). 기본값은 18000 (18초)

(kraft 모드에서는 지원하지 않음)

 

group.initial.rebalance.delay.ms : 새로운 컨슈머 그룹이 처음 시작될 때 대기하는 시간 (ms). 기본값은 3000 (3초)

(컨슈머 그룹 내 파티션 할당을 조정하는 리밸런스를 위함)

 


 

3. zookeeper 실행 

 

아래 커맨드로 zookeeper를 실행할 수 있다. (실제 운영 환경에서는 3대 이상 필요)

 

./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties

 

(데몬 동작 확인 : jps -vm)

(-v : jvm 인자 확인, -m : main 메서드 인자 확인)

 

확인 결과

3108 QuorumPeerMain config/zookeeper.properties -Xmx512M -Xms512M -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=/Users/hobin/TIL/2024/kafka_study/kafka_2.12-2.5.1/bin/../logs/zookeeper-gc.log:time,tags:filecount=10,filesize=102400 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/Users/hobin/TIL/2024/kafka_study/kafka_2.12-2.5.1/bin/../logs -Dlog4j.configuration=file:./bin/../config/log4j.properties
3132 Jps -vm -Dapplication.home=/Library/Java/JavaVirtualMachines/jdk-17.jdk/Contents/Home -Xms8m -Djdk.module.main=jdk.jcmd

 

 

4. 카프카 브로커 실행 

 

아래 커맨드로 브로커를 실행할 수 있다. (실제 운영 환경에서는 3대 이상 필요)

 

./bin/kafka-server-start.sh -daemon ./config/server.properties

 

확인 결과

4561 Jps -vm -Dapplication.home=/Library/Java/JavaVirtualMachines/jdk-17.jdk/Contents/Home -Xms8m -Djdk.module.main=jdk.jcmd
3108 QuorumPeerMain config/zookeeper.properties -Xmx512M -Xms512M -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=/Users/hobin/TIL/2024/kafka_study/kafka_2.12-2.5.1/bin/../logs/zookeeper-gc.log:time,tags:filecount=10,filesize=102400 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/Users/hobin/TIL/2024/kafka_study/kafka_2.12-2.5.1/bin/../logs -Dlog4j.configuration=file:./bin/../config/log4j.properties
4543 Kafka ./config/server.properties -Xmx1G -Xms1G -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:MaxInlineLevel=15 -Djava.awt.headless=true -Xlog:gc*:file=/Users/hobin/TIL/2024/kafka_study/kafka_2.12-2.5.1/bin/../logs/kafkaServer-gc.log:time,tags:filecount=10,filesize=102400 -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/Users/hobin/TIL/2024/kafka_study/kafka_2.12-2.5.1/bin/../logs -Dlog4j.configuration=file:./bin/../config/log4j.properties

 

 

5. 브로커 동작 확인 

 

아래 커맨드로 브로커 동작을 확인할 수 있다. 

실제 운영 시, 외부 프로듀서, 컨슈머 입장에서 브로커가 살아있는 지 확인하는 간단한 방법은 version 확인이고, 카프카에서 version-api를 제공 중이다. 

(카프카 브로커 서버 이외의 환경에서 브로커와 동일한 버전을 사용하여 통신하는 것을 추천한다.)

 

./bin/kafka-broker-api-versions.sh --bootstrap-server <broker ip>:<broker port>

(ex : ./bin/kafka-broker-api-versions.sh --bootstrap-server 127.0.0.1:9092)

 

확인 결과

127.0.0.1:9092 (id: 0 rack: null) -> (
	Produce(0): 0 to 8 [usable: 8],
	Fetch(1): 0 to 11 [usable: 11],
	ListOffsets(2): 0 to 5 [usable: 5],
	Metadata(3): 0 to 9 [usable: 9],
	LeaderAndIsr(4): 0 to 4 [usable: 4],
	StopReplica(5): 0 to 2 [usable: 2],
	UpdateMetadata(6): 0 to 6 [usable: 6],
	ControlledShutdown(7): 0 to 3 [usable: 3],
	OffsetCommit(8): 0 to 8 [usable: 8],
	OffsetFetch(9): 0 to 7 [usable: 7],
	FindCoordinator(10): 0 to 3 [usable: 3],
	JoinGroup(11): 0 to 7 [usable: 7],
	Heartbeat(12): 0 to 4 [usable: 4],
	LeaveGroup(13): 0 to 4 [usable: 4],
	SyncGroup(14): 0 to 5 [usable: 5],
	DescribeGroups(15): 0 to 5 [usable: 5],
	ListGroups(16): 0 to 3 [usable: 3],
	SaslHandshake(17): 0 to 1 [usable: 1],
	ApiVersions(18): 0 to 3 [usable: 3],
	CreateTopics(19): 0 to 5 [usable: 5],
	DeleteTopics(20): 0 to 4 [usable: 4],
	DeleteRecords(21): 0 to 1 [usable: 1],
	InitProducerId(22): 0 to 3 [usable: 3],
	OffsetForLeaderEpoch(23): 0 to 3 [usable: 3],
	AddPartitionsToTxn(24): 0 to 1 [usable: 1],
	AddOffsetsToTxn(25): 0 to 1 [usable: 1],
	EndTxn(26): 0 to 1 [usable: 1],
	WriteTxnMarkers(27): 0 [usable: 0],
	TxnOffsetCommit(28): 0 to 3 [usable: 3],
	DescribeAcls(29): 0 to 2 [usable: 2],
	CreateAcls(30): 0 to 2 [usable: 2],
	DeleteAcls(31): 0 to 2 [usable: 2],
	DescribeConfigs(32): 0 to 2 [usable: 2],
	AlterConfigs(33): 0 to 1 [usable: 1],
	AlterReplicaLogDirs(34): 0 to 1 [usable: 1],
	DescribeLogDirs(35): 0 to 1 [usable: 1],
	SaslAuthenticate(36): 0 to 2 [usable: 2],
	CreatePartitions(37): 0 to 2 [usable: 2],
	CreateDelegationToken(38): 0 to 2 [usable: 2],
	RenewDelegationToken(39): 0 to 2 [usable: 2],
	ExpireDelegationToken(40): 0 to 2 [usable: 2],
	DescribeDelegationToken(41): 0 to 2 [usable: 2],
	DeleteGroups(42): 0 to 2 [usable: 2],
	ElectLeaders(43): 0 to 2 [usable: 2],
	IncrementalAlterConfigs(44): 0 to 1 [usable: 1],
	AlterPartitionReassignments(45): 0 [usable: 0],
	ListPartitionReassignments(46): 0 [usable: 0],
	OffsetDelete(47): 0 [usable: 0]
)

 

'메모 > kafka' 카테고리의 다른 글

producer api  (0) 2025.01.04
kafka 기본 개념  (0) 2025.01.04
kafka-console-producer.sh / kafka-console-consumer.sh / kafka-consumer-groups.sh / kafka-delete-records.sh  (0) 2025.01.01
kafka-topics.sh  (3) 2025.01.01
kafka 공부하기  (0) 2024.12.30
댓글
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
TAG
more
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함