Kafka CDC Guide
카프카 CDC 구축하기 (로컬 환경)
1. 컨테이너 띄우기
아래와 같은 docker-compose 파일로 카프카, 카프카 UI, 카프카 Connect를 컨테이너로 띄운다.
# Local Kafka CDC stack: SQL Server (CDC source), a three-node KRaft Kafka
# cluster, a Kafka Connect worker with Debezium connectors, and Kafka UI.
networks:
  kafka_network: {}

volumes:
  Kafka00:
    driver: local
  Kafka01:
    driver: local
  Kafka02:
    driver: local

services:
  # Source database. Published on the host at 1433; it is not attached to
  # kafka_network, so other containers reach it through the host port.
  sql-server-db:
    image: mcr.microsoft.com/mssql/server:2022-latest
    ports:
      - "1433:1433"
    environment:
      ACCEPT_EULA: "Y"
      # "$$" is Compose's escape for a literal "$" (a bare "$" starts variable
      # interpolation). The effective password is still: admin12#$
      SA_PASSWORD: "admin12#$$"
      # SQL Server Agent runs the CDC capture/cleanup jobs.
      MSSQL_AGENT_ENABLED: "true"
      MSSQL_COLLATION: "LATIN1_GENERAL_100_CI_AS_SC_UTF8"
    cap_add:
      - SYS_PTRACE

  # Kafka 00 (combined controller + broker, node id 0)
  Kafka00Service:
    image: bitnami/kafka:3.5.1-debian-11-r44
    # Same restart policy as the other two brokers.
    restart: always
    container_name: Kafka00Container
    ports:
      # Host port 10000 -> EXTERNAL listener (9094) inside the container.
      - "10000:9094"
    environment:
      - KAFKA_CFG_BROKER_ID=0
      - KAFKA_CFG_NODE_ID=0
      # The cluster id and quorum voter list must be identical on every node.
      - KAFKA_KRAFT_CLUSTER_ID=HsDBs9l6UUmQq7Y5E6bNlw
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@Kafka00Service:9093,1@Kafka01Service:9093,2@Kafka02Service:9093
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      # PLAINTEXT: in-network clients, CONTROLLER: KRaft quorum, EXTERNAL: host clients.
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://Kafka00Service:9092,EXTERNAL://127.0.0.1:10000
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    networks:
      - kafka_network
    volumes:
      - "Kafka00:/bitnami/kafka"

  # Kafka 01 (combined controller + broker, node id 1)
  Kafka01Service:
    image: bitnami/kafka:3.5.1-debian-11-r44
    restart: always
    container_name: Kafka01Container
    ports:
      # Host port 10001 -> EXTERNAL listener (9094) inside the container.
      - "10001:9094"
    environment:
      - KAFKA_CFG_BROKER_ID=1
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_KRAFT_CLUSTER_ID=HsDBs9l6UUmQq7Y5E6bNlw
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@Kafka00Service:9093,1@Kafka01Service:9093,2@Kafka02Service:9093
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://Kafka01Service:9092,EXTERNAL://127.0.0.1:10001
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    networks:
      - kafka_network
    volumes:
      - "Kafka01:/bitnami/kafka"

  # Kafka 02 (combined controller + broker, node id 2)
  Kafka02Service:
    image: bitnami/kafka:3.5.1-debian-11-r44
    restart: always
    container_name: Kafka02Container
    ports:
      # Host port 10002 -> EXTERNAL listener (9094) inside the container.
      - "10002:9094"
    environment:
      - KAFKA_CFG_BROKER_ID=2
      - KAFKA_CFG_NODE_ID=2
      - KAFKA_KRAFT_CLUSTER_ID=HsDBs9l6UUmQq7Y5E6bNlw
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@Kafka00Service:9093,1@Kafka01Service:9093,2@Kafka02Service:9093
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://Kafka02Service:9092,EXTERNAL://127.0.0.1:10002
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    networks:
      - kafka_network
    volumes:
      - "Kafka02:/bitnami/kafka"

  # Kafka Connect worker; connector plugins are installed at container start.
  KafkaConnect:
    image: confluentinc/cp-kafka-connect:7.4.3
    container_name: KafkaConnect
    depends_on:
      - Kafka00Service
      - Kafka01Service
      - Kafka02Service
    ports:
      - "8083:8083"
    extra_hosts:
      # Makes host.docker.internal resolvable on Linux engines too, so the
      # worker can reach the SQL Server port published on the host.
      - "host.docker.internal:host-gateway"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: Kafka00Service:9092,Kafka01Service:9092,Kafka02Service:9092
      CONNECT_REST_PORT: "8083"
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      # Must be a name that resolves to this container on kafka_network.
      CONNECT_REST_ADVERTISED_HOST_NAME: "KafkaConnect"
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
    volumes:
      - ./data:/data
    command:
      - bash
      - -c
      - |
        echo "Installing Connector"
        confluent-hub install --no-prompt debezium/debezium-connector-sqlserver:2.2.1
        confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.7.0
        confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:11.1.3
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        #
        sleep infinity
    networks:
      - kafka_network

  # Web UI for browsing the cluster, served on http://localhost:8080.
  KafkaWebUiService:
    image: provectuslabs/kafka-ui:latest
    restart: always
    container_name: KafkaWebUiContainer
    ports:
      - "8080:8080"
    environment:
      - KAFKA_CLUSTERS_0_NAME=Local-Kraft-Cluster
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=Kafka00Service:9092,Kafka01Service:9092,Kafka02Service:9092
      - DYNAMIC_CONFIG_ENABLED=true
      - KAFKA_CLUSTERS_0_AUDIT_TOPICAUDITENABLED=true
      - KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED=true
      # - KAFKA_CLUSTERS_0_METRICS_PORT=9999
    depends_on:
      - Kafka00Service
      - Kafka01Service
      - Kafka02Service
    networks:
      - kafka_network
2. MSSQL 접속 및 데이터베이스, 스키마, 테이블 생성
- test 데이터베이스를 생성
- test 데이터베이스 내에 Members 스키마를 생성
- test 데이터베이스의 Members 스키마 내에 member 테이블 생성
-- Source table whose changes will be captured by CDC.
CREATE TABLE Members.member
(
    member_id BIGINT PRIMARY KEY,
    nickname  NVARCHAR(50)
);
3. CDC적용
3.1 테이블에 CDC적용하기
-- Switch to the "test" database.
USE test;

-- Enable CDC at the database level for "test".
EXEC sys.sp_cdc_enable_db;

-- Turn on Change Tracking for "test": changed rows are retained for 3 days
-- and cleaned up automatically.
ALTER DATABASE test
    SET CHANGE_TRACKING = ON (CHANGE_RETENTION = 3 DAYS, AUTO_CLEANUP = ON);

-- Enable CDC for the Members.member table.
-- NOTE(review): @role_name names the database role that gates access to the
-- captured change data; confirm that 'sa' is the intended value.
EXEC sys.sp_cdc_enable_table
    @source_schema = 'Members',
    @source_name   = 'member',
    @role_name     = 'sa';
3.2 카프카 Connect API로 설정 등록하기
아래와 같은 내용을 Body로 하여 카프카 Connect의 주소로 POST 요청을 보낸다. 경로는 connectors이다.
[POST] localhost:8083/connectors
{
  "name": "mssql-cdc-members-connector",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "tasks.max": "1",
    "database.encrypt": false,
    "database.hostname": "host.docker.internal",
    "database.port": "1433",
    "database.user": "SA",
    "database.password": "admin12#$",
    "database.names": "test",
    "schema.include.list": "Members",
    "table.include.list": "Members.member",
    "database.history.kafka.bootstrap.servers": "Kafka00Service:9092,Kafka01Service:9092,Kafka02Service:9092",
    "db.timezone": "Asia/Seoul",
    "topic.prefix": "cdc",
    "schema.history.internal.kafka.bootstrap.servers": "Kafka00Service:9092,Kafka01Service:9092,Kafka02Service:9092",
    "schema.history.internal.kafka.topic": "schema-history-cdc"
  }
}
4. CDC 적용 후 데이터 변경 시키기 (생성, 삭제, 수정)
이번 가이드에서는 생성만 수행한다.
-- Sample rows; each statement is a separate insert into the CDC-enabled table.
INSERT INTO Members.member (member_id, nickname) VALUES (1, 'testNickname1');
INSERT INTO Members.member (member_id, nickname) VALUES (2, 'testNickname2');
INSERT INTO Members.member (member_id, nickname) VALUES (3, 'testNickname3');
INSERT INTO Members.member (member_id, nickname) VALUES (4, 'testNickname4');
INSERT INTO Members.member (member_id, nickname) VALUES (5, 'testNickname5');
INSERT INTO Members.member (member_id, nickname) VALUES (6, 'testNickname6');
INSERT INTO Members.member (member_id, nickname) VALUES (7, 'testNickname7');
INSERT INTO Members.member (member_id, nickname) VALUES (8, 'testNickname8');
INSERT INTO Members.member (member_id, nickname) VALUES (9, 'testNickname9');
INSERT INTO Members.member (member_id, nickname) VALUES (10, 'testNickname10');
5. 카프카 UI를 통해 변경 감지 토픽 확인
"payload": {
"before": null,
"after": {
"member_id": 1,
"nickname": "testNickname1"
},
}
위 항목으로 데이터가 어떻게 변경되었는지(생성, 변경, 삭제)를 파악할 수 있다.
생성
"payload": {
"before": null,
"after": {
"member_id": 1,
"nickname": "testNickname1"
},
}
수정
"payload": {
"before": {
"member_id": 1,
"nickname": "testNickname1"
},
"after": {
"member_id": 1,
"nickname": "testModifiedNickname1"
},
}
삭제
"payload": {
"before": {
"member_id": 2,
"nickname": "testNickname2"
},
"after": null,
}