티스토리 뷰
카프카 커넥트 : 데이터 파이프라인 생성 시 반복 작업을 줄이고, 효율적인 전송을 위한 애플리케이션
파이프라인 구성 시, 매번 프로듀서, 컨슈머 애플리케이션을 개발하고 배포하는 것은 번거롭기 때문에 커넥트를 사용함
커넥터 (Connector) : 특정 작업 형태를 템플릿으로 만들어 둔 것. 커넥트는 커넥터를 실행함으로써 반복 작업을 줄임.
소스 커넥터 (Source Connector) : 일종의 프로듀서 역할. 특정 파일에서 데이터를 계속 읽어와 전송하거나, db에서 특정 스키마를 통해 지속적으로 읽어옴.
싱크 커넥터 (Sink Connector) : 일종의 컨슈머 역할. 특정 토픽에서 데이터를 가져와, 파일, db 등에 지속적으로 저장을 함.
미러메이커2 커넥터 (클러스터 간 토픽 미러링. SPoF 방지.), 파일 싱크, 소스 커넥터 등을 기본으로 제공함.
MySQL, S3 등 db, 스토리지에 대한 커넥터를 플러그인 형태로 추가할 수 있음 (jar 파일 추가)
공개된 커넥터는 아래 페이지에서 확인 가능 (단, 라이센스 확인 후 상용 사용 필요)
사용자가 커넥트에 커넥터 생성 명령을 내리면, 커넥트는 내부에 커넥터와 태스크를 생성하며, 커넥터는 태스크들을 관리함.
태스크 : 실질적인 데이터 처리 기능 수행
컨버터 : 데이터 처리 전 스키마 변경을 도와줌. JsonConverter, StringConverter, ByteArrayConverter 지원하며, 커스텀 컨버터 작성 가능
트랜스폼 (Transform) : 데이터 처리 시, 각 메시지 단위로 데이터를 간단히 변환하기 위한 용도로 사용
(ex. JSON 데이터에 트랜스폼을 사용하면 특정 키 삭제, 추가 등 가능)
(기본 제공 트랜스폼 : Cast, Drop, ExtractField 등)
커넥트 실행 방법
단일 모드 커넥트 (standalone mode) : 단일 애플리케이션으로 실행되며, 커넥터를 정의하는 파일을 작성하고, 해당 파일을 참조하는 단일 모드 커넥트를 실행함으로써 파이프라인을 생성한다.
1개의 프로세스만 실행되므로, SPoF가 발생할 수 있다. 개발환경이나 중요도 낮은 파이프라인 운영 시에만 사용한다.
분산 모드 커넥트 (distributed mode) : 2대 이상의 서버에서 클러스터로 운영한다. 고가용성을 유지할 수 있으며, 스케일 아웃을 통해 무중단 서비스를 실행할 수 있다. 그리고 이를 통해 처리량도 늘릴 수 있다.
실 운영 환경이라면 분산 모드 커넥트를 사용해야 한다.
Rest API를 통해 커넥트와 관련된 정보를 조회 및 수정할 수 있다.
- Method | path | description
- GET | / | 실행 중인 커넥트 정보 확인
- GET | /connectors | 실행 중인 커넥터 이름 확인
- POST | /connectors | 새로운 커넥터 생성 요청
- GET | /connectors/{connector name} | 실행 중인 커넥터 정보 확인
- GET | /connectors/{connector name}/config | 실행 중인 커넥터 설정값 확인
- PUT | /connectors/{connector name}/config | 실행 중인 커넥터 설정값 변경 요청
- GET | /connectors/{connector name}/status | 실행 중인 커넥터 상태 확인
- POST | /connectors/{connector name}/restart | 실행 중인 커넥터 재시작 요청
- PUT | /connectors/{connector name}/pause | 커넥터 일시 중지 요청
- PUT | /connectors/{connector name}/resume | 일시 중지된 커넥터 실행 요청
- DELETE | /connectors/{connector name}/ | 실행 중인 커넥터 종료
- GET | /connectors/{connector name}/tasks | 실행 중인 커넥터의 태스크 정보 확인
- GET | /connectors/{connector name}/tasks/{task id}/status | 실행 중인 커넥터의 태스크 상태 확인
- POST | /connectors/{connector name}/tasks/{task id}/restart | 실행 중인 커넥터의 태스크 재시작 요청
- GET | /connectors/{connector name}/topics | 커넥터 별 연동된 토픽 정보 확인
- GET | /connector-plugins/ | 커넥트에 존재하는 커넥터 플러그인 확인
- PUT | /connector-plugins/{plugin name}/config/validate | 커넥터 생성 시 설정값 유효 여부 확인
단일 모드 커넥트
connect-standalone.properties : 단일 모드 커넥트를 참조하는 설정 파일
connect-standalone.properties 옵션 내용
- bootstrap.servers : 커넥트와 연동할 카프카 클러스터 호스트:포트 입력, 2개 이상 브로커로 이루어진 클러스터와 연동 시, 콤마로 구분하여 브로커 정보 입력
- key.converter, value.converter : 카프카에 데이터 저장 / 카프카에서 데이터 가져올 때 사용할 직렬화, 역직렬화 타입 지정
- key.converter.schemas.enable, value.converter.schemas.enable : 토픽에 메시지 전송 시, 메시지의 schema를 같이 보낼 지 (json이면 각 필드 별 타입 명시 등) 지정하는 옵션
- offset.storage.file.filename : 오프셋 정보를 저장할 파일 지정. 소스 커넥터, 싱크 커넥터의 데이터 처리 시점을 저장하기 위해 사용한다. 소스 커넥터 종료 후 재시작 시, 오프셋 정보를 통해 마지막으로 읽은 파일 위치부터 다시 읽어 토픽으로 데이터를 저장하고, 싱크 커넥터 종료 후 재시작 시, 오프셋 정보를 통해 마지막으로 읽은 레코드 오프셋부터 다시 읽어 토픽으로부터 데이터를 가져온다.
- offset.flush.interval.ms : 태스크가 처리 완료한 오프셋을 커밋하는 주기를 설정한다.
- plugin.path : 플러그인 형태의 커넥터 디렉토리 (jar 파일이 있는 디렉토리) 를 입력한다. 여러 개인 경우, 콤마로 구분하여 입력한다. converter, transform도 추가 가능하다.
단일 모드 커넥트는 커넥트 설정파일 (connect-standalone.properties) 와 함께 커넥터 설정파일도 정의하여 실행해야 한다.
예시를 위해 connect-file-source.properties (기본으로 제공되는 파일 소스 커넥터) 를 사용하자.
connect-file-source.properties 옵션 내용
- name : 커넥터 이름을 지정한다. 커넥터 이름은 커넥트 내에서 유일해야 한다.
- connector.class : 사용할 커넥터 클래스를 지정한다.
- tasks.max : 커넥터로 실행할 태스크 개수를 지정한다. 태스크 개수를 늘려 병렬처리가 가능하다.
- file : 읽을 파일을 지정한다.
- topic : 읽은 데이터를 저장할 토픽을 지정한다.
테스트를 위해 ./config/connect-test/ 디렉토리에 connect-standalone.properties, connect-file-source.properties 를 카피하고,
connect-standalone.properties에서
key.converter = = org.apache.kafka.connect.storage.StringConverter
value.converter = org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable = false
value.converter.schemas.enable = false
connect-file-source.properties에서
file=/Users/hobin/test.txt
topic=connect-test
와 같이 설정하였으며, 나머지는 기본 옵션으로 진행하였다.
단일 모드 커넥트 실행 시, 파라미터로 커넥트 설정파일과 커넥터 설정파일을 순서대로 넣으면 된다.
./bin/connect-standalone.sh ./config/connect-test/connect-standalone.properties ./config/connect-test/connect-file-source.properties
실행 시 주기적으로 test.txt 파일을 읽는 로그가 발생하며, 이후 test.txt에 아무 말이나 넣으면 connect-test에 다음과 같이 메시지가 잘 저장됨을 확인할 수 있다.
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
123
123
123
1234
분산 모드 커넥트
분산 모드 커넥트는 2개 이상의 프로세스가 1개의 그룹으로 묶여서 클러스터로 운영된다.
분산 모드 커넥트를 운영하기 위해서는 connect-distributed.properties 를 설정하면 되고, 서로 다른 서버에서 ./bin/connect-distributed.sh 실행 시, 동일한 카프카 클러스터 브로커 설정, 그룹 id를 지정한 connect-distributed.properties 지정 시, 자동으로 동일 그룹으로 인식한다.
connect-distributed.properties 옵션
(offset, config, status 옵션 값은 커넥트 그룹 내의 커넥트는 같은 값으로 동기화가 된다.)
(서로 다른 옵션으로 동일 커넥트 그룹에 포함할 시, rest api로 추가한 경우, 모든 커넥트에 rest api로 설정한 내용이 적용되며, properties 파일로 직접 그룹에 포함시킬 시, 이미 동기화된 내용으로 추가가 된다.)
- bootstrap.servers : 커넥트와 연동할 카프카 클러스터 호스트:포트
- group.id : 커넥트 프로세스를 묶을 그룹 이름을 지정. 동일 값으로 설정된 커넥트를 같은 그룹으로 자동 인식한다.
- key.converter, value.converter : 카프카에 데이터 저장 / 카프카에서 데이터 불러올 때 직렬화, 역직렬화 방식 지정
- key.converter.schemas.enable, value.converter.schemas.enable : 메시지에 스키마 포함 여부 지정
- offset.storage.topic : 오프셋 값을 저장할 토픽
- offset.storage.replication.factor : 오프셋 값을 저장할 토픽의 복제본 개수. 실 운영 시 3 이상의 값으로 지정. 지정한 값만큼 클러스터 내의 브로커에 해당 토픽이 복제됨
- offset.storage.partitions : 오프셋 값을 저장할 토픽의 파티션 개수
- config.storage.topic : 커넥트 설정값을 저장할 토픽
- config.storage.replication.factor : 커넥트 설정값을 저장할 토픽의 복제본 개수
- config.storage.partitions : 커넥트 설정값을 저장할 토픽의 파티션 개수
- status.storage.topic : 커넥트 클러스터의 상태 정보 (커넥터, 태스크 모니터링) 를 저장할 토픽
- status.storage.replication.factor : 커넥트 클러스터 상태 정보를 저장할 토픽의 복제본 개수
- status.storage.partitions : 커넥트 클러스터 상태 정보를 저장할 토픽의 파티션 개수
- offset.flush.interval.ms : 태스크가 처리 완료한 오프셋을 커밋하는 주기
- rest.host.name : 로컬에서 rest api를 요청할 호스트 지정
- rest.port : 로컬에서 rest api를 요청할 포트 지정
- rest.advertised.host.name : 외부 클라이언트가 rest api 요청할 호스트 지정
- rest.advertised.port : 외부 클라이언트가 rest api 요청할 포트 지정
- plugin.path : 플러그인 커넥터 디렉토리 입력
현재 로컬에서 카프카를 운영 중이므로 분산 모드이지만, 하나의 프로세스만 실행하여 rest api를 테스트 해본다.
분산 모드에서는 rest api로 커넥트 관련 실행이 가능하므로 커넥트 설정파일만으로 실행을 한다.
./bin/connect-distributed.sh ./config/connect-test/connect-distributed.properties
커넥트 rest api 기본 포트는 8083이며 아래와 같이 연결된 카프카 클러스터 정보를 알 수 있다.
curl -X GET http://localhost:8083/
{"version":"2.5.1","commit":"0efa8fb0f4c73d92","kafka_cluster_id":"QxRC2qw6QtaJtGZ5Pa_zag"}
kafka 2.5.1 버전에서 사용 가능한 기본 플러그인 정보는 다음과 같다.
curl -X GET http://localhost:8083/connector-plugins
[{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.5.1"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.5.1"},{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}]
단일 모드와 동일하게 FileStreamSourceConnector를 실행하면 다음과 같다.
curl -X POST -H "Content-Type:application/json" \
--data '{ \
"name":"local-file-source", \
"config":{ \
"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector", \
"file":"/Users/hobin/test.txt", \
"tasks.max":"1", \
"topic":"distributed-connect-test" \
} \
}' \
http://localhost:8083/connectors
동일하게 /Users/hobin/test.txt 파일을 읽었고, 이번 설정은 JsonConverter, schema enable 설정 (기본 옵션들) 로 진행되었다.
지정한 distributed-connect-test 토픽을 열면 다음 내용을 볼 수 있다.
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic distributed-connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"123"}
{"schema":{"type":"string","optional":false},"payload":"123"}
{"schema":{"type":"string","optional":false},"payload":"123"}
{"schema":{"type":"string","optional":false},"payload":"1234"}
json 포맷으로 스키마까지 정상적으로 토픽에 저장하는 FileStreamSourceConnector 커넥터 (name = local-file-source)가 정상적으로 생성되었음을 알 수 있다.
생성한 local-file-source 커넥터의 상태를 확인하면 다음과 같다.
curl -X GET http://localhost:8083/connectors/local-file-source/status
{"name":"local-file-source","connector":{"state":"RUNNING","worker_id":"127.0.0.1:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"127.0.0.1:8083"}],"type":"source"}
커넥터는 계속해서 토픽에 데이터 저장/불러오기를 시행하므로, 리소스 낭비를 줄이기 위해서는 커넥터를 종료하는 것이 좋다.
(커넥트는 애플리케이션, 커넥터는 태스크를 통해 실질적인 데이터 처리를 수행하므로, 커넥터 해제하여 리소스 낭비를 줄이는 것이 좋음)
(종료하여도 동일 커넥트 그룹의 설정값(config, offset, status)은 카프카 클러스터 내부의 스토리지에 저장되므로, 새로운 커넥터 추가 시, 이전 오프셋부터 처리를 한다. 그리고 모든 커넥트 해재 후, 다시 커넥트 연결하여도 config, offset, status 값을 바탕으로 커넥트 그룹에서 처리된 내용에 이어서 데이터를 처리한다.)
curl -X DELETE http://localhost:8083/connectors/local-file-source
정상적으로 커넥터가 해제되었는 지 확인한다.
curl -X GET http://localhost:8083/connectors
[]
local-file-source 커넥터를 다시 조회하면 404 에러가 발생한다.
curl -X GET http://localhost:8083/connectors/local-file-source/status
{"error_code":404,"message":"No status found for connector local-file-source"}
'메모 > kafka' 카테고리의 다른 글
kafka source connector, sink connector (0) | 2025.01.18 |
---|---|
Processor API (0) | 2025.01.12 |
Streams DSL (0) | 2025.01.12 |
kafka streams (0) | 2025.01.07 |
admin api (0) | 2025.01.06 |