
Setting up Kafka CDC

Kafka CDC Guide

Setting up Kafka CDC (local environment)

1. Starting the containers

The docker-compose file below runs SQL Server, a three-broker Kafka cluster (KRaft mode), Kafka Connect, and Kafka UI as containers; a command for bringing the stack up follows the file.

networks:
  kafka_network:

volumes:
  Kafka00:
    driver: local
  Kafka01:
    driver: local
  Kafka02:
    driver: local

services:
  sql-server-db:
    image: mcr.microsoft.com/mssql/server:2022-latest
    ports:
      - "1433:1433"
    environment:
      ACCEPT_EULA: "Y"
      SA_PASSWORD: "admin12#$"
      MSSQL_AGENT_ENABLED: "true"
      MSSQL_COLLATION: "LATIN1_GENERAL_100_CI_AS_SC_UTF8"
    cap_add:
      - SYS_PTRACE

  Kafka00Service:
    image: bitnami/kafka:3.5.1-debian-11-r44
    restart: unless-stopped
    container_name: Kafka00Container

    ports:
      - '10000:9094'
    environment:
      - KAFKA_CFG_BROKER_ID=0
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_KRAFT_CLUSTER_ID=HsDBs9l6UUmQq7Y5E6bNlw
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@Kafka00Service:9093,1@Kafka01Service:9093,2@Kafka02Service:9093
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://Kafka00Service:9092,EXTERNAL://127.0.0.1:10000
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    networks:
      - kafka_network
    volumes:
      - "Kafka00:/bitnami/kafka"
  ##Kafka 01
  Kafka01Service:
    image: bitnami/kafka:3.5.1-debian-11-r44
    restart: always
    container_name: Kafka01Container
    ports:
      - '10001:9094'
    environment:
      - KAFKA_CFG_BROKER_ID=1
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_KRAFT_CLUSTER_ID=HsDBs9l6UUmQq7Y5E6bNlw
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@Kafka00Service:9093,1@Kafka01Service:9093,2@Kafka02Service:9093
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://Kafka01Service:9092,EXTERNAL://127.0.0.1:10001
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    networks:
      - kafka_network
    volumes:
      - "Kafka01:/bitnami/kafka"
  ##Kafka 02
  Kafka02Service:
    image: bitnami/kafka:3.5.1-debian-11-r44
    restart: always
    container_name: Kafka02Container
    ports:
      - '10002:9094'
    environment:
      - KAFKA_CFG_BROKER_ID=2
      - KAFKA_CFG_NODE_ID=2
      - KAFKA_KRAFT_CLUSTER_ID=HsDBs9l6UUmQq7Y5E6bNlw
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@Kafka00Service:9093,1@Kafka01Service:9093,2@Kafka02Service:9093
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://Kafka02Service:9092,EXTERNAL://127.0.0.1:10002
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
      - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR=2
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    networks:
      - kafka_network
    volumes:
      - "Kafka02:/bitnami/kafka"

  KafkaConnect:
    image: confluentinc/cp-kafka-connect:7.4.3
    container_name: KafkaConnect
    depends_on:
      - Kafka00Service
      - Kafka01Service
      - Kafka02Service
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: Kafka00Service:9092,Kafka01Service:9092,Kafka02Service:9092
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars
    volumes:
      - ./data:/data
    command:
      - bash
      - -c
      - |
        echo "Installing Connector"
        confluent-hub install --no-prompt debezium/debezium-connector-sqlserver:2.2.1
        confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.7.0
        confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:11.1.3
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        #
        sleep infinity
    networks:
      - kafka_network

  KafkaWebUiService:
    image: provectuslabs/kafka-ui:latest
    restart: always
    container_name: KafkaWebUiContainer
    ports:
      - '8080:8080'
    environment:
      - KAFKA_CLUSTERS_0_NAME=Local-Kraft-Cluster
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=Kafka00Service:9092,Kafka01Service:9092,Kafka02Service:9092
      - DYNAMIC_CONFIG_ENABLED=true
      - KAFKA_CLUSTERS_0_AUDIT_TOPICAUDITENABLED=true
      - KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED=true
      #- KAFKA_CLUSTERS_0_METRICS_PORT=9999
    depends_on:
      - Kafka00Service
      - Kafka01Service
      - Kafka02Service
    networks:
      - kafka_network
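
Once the file is saved (the name docker-compose.yml is assumed here), the stack can be started and the Connect worker watched until the Debezium plugins finish installing; Kafka UI is then reachable at localhost:8080 and the Connect REST API at localhost:8083.

# start all services in the background (assumes the file is named docker-compose.yml)
docker compose up -d

# follow the Kafka Connect logs until the connector plugins are installed and the worker is up
docker compose logs -f KafkaConnect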

2. Connecting to MSSQL and creating the database, schema, and table

  • Create the test database
  • Create the Members schema inside the test database (SQL for these first two steps is sketched right after this list)
  • Create the member table inside the Members schema with the DDL below
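Connect with any SQL client to localhost:1433 as SA, using the password from the compose file. The first two bullets are not covered by the table DDL, so here is a minimal sketch of them:

-- create the database used in this guide
CREATE DATABASE test;
GO

USE test;
GO

-- CREATE SCHEMA must run in its own batch, hence the GO separators
CREATE SCHEMA Members;
GO

The member table itself is then created with: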
create table Members.member(
    member_id bigint primary key,
    nickname nvarchar(50)
)

3. Enabling CDC

3.1 Enabling CDC on the table

-- Use the test database
USE test;

-- Enable CDC for the test database
EXEC sys.sp_cdc_enable_db;

-- Configure change tracking for the test database
-- Change tracking identifies which rows in a table have changed
-- Change-tracking data is retained for 3 days and cleaned up automatically
ALTER DATABASE test SET CHANGE_TRACKING = ON (CHANGE_RETENTION = 3 DAYS, AUTO_CLEANUP = ON);

-- Enable CDC for the member table in the Members schema
EXEC sys.sp_cdc_enable_table
      @source_schema = 'Members',
      @source_name = 'member',
      @role_name = 'sa';
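
Whether CDC actually took effect can be checked against the catalog views (a quick, optional sanity check):

-- 1 means CDC is enabled on the database
SELECT name, is_cdc_enabled FROM sys.databases WHERE name = 'test';

-- 1 means the table is tracked by CDC
SELECT s.name AS schema_name, t.name AS table_name, t.is_tracked_by_cdc
FROM sys.tables t
JOIN sys.schemas s ON s.schema_id = t.schema_id
WHERE t.name = 'member';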

3.2 Registering the connector through the Kafka Connect API

Send a POST request to the Kafka Connect address with the following JSON body. The path is /connectors; a curl example is shown after the body.

[POST] localhost:8083/connectors

{
    "name": "mssql-cdc-members-connector",
    "config": {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
        "tasks.max": "1",
        "database.encrypt": false,
        "database.hostname": "host.docker.internal",
        "database.port": "1433",
        "database.user": "SA",
        "database.password": "admin12#$",
        "database.names": "test",
        "schema.include.list": "Members",
        "table.include.list" : "Members.member",
        "database.history.kafka.bootstrap.servers":"Kafka00Service:9092,Kafka01Service:9092,Kafka02Service:9092",
        "db.timezone": "Asia/Seoul",
        "topic.prefix": "cdc",
        "schema.history.internal.kafka.bootstrap.servers":"Kafka00Service:9092,Kafka01Service:9092,Kafka02Service:9092",
        "schema.history.internal.kafka.topic":"schema-history-cdc"
    }
}
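
Assuming the JSON above is saved as connector.json, registration and a status check look like this with curl (the connector name in the status URL is the name field from the body):

# register the Debezium SQL Server connector
curl -X POST -H "Content-Type: application/json" \
     --data @connector.json \
     http://localhost:8083/connectors

# verify the connector and its task are RUNNING
curl http://localhost:8083/connectors/mssql-cdc-members-connector/status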

4. Changing data after CDC is enabled (insert, update, delete)

Only inserts are performed in this guide.

insert into Members.member(member_id, nickname) values (1, 'testNickname1');
insert into Members.member(member_id, nickname) values (2, 'testNickname2');
insert into Members.member(member_id, nickname) values (3, 'testNickname3');
insert into Members.member(member_id, nickname) values (4, 'testNickname4');
insert into Members.member(member_id, nickname) values (5, 'testNickname5');
insert into Members.member(member_id, nickname) values (6, 'testNickname6');
insert into Members.member(member_id, nickname) values (7, 'testNickname7');
insert into Members.member(member_id, nickname) values (8, 'testNickname8');
insert into Members.member(member_id, nickname) values (9, 'testNickname9');
insert into Members.member(member_id, nickname) values (10, 'testNickname10');
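
Before opening the UI, the events can also be tailed from inside one of the broker containers. The topic name below assumes Debezium's default naming of <topic.prefix>.<database>.<schema>.<table>, i.e. cdc.test.Members.member:

# consume the CDC topic from the first broker container (Bitnami image paths)
docker exec -it Kafka00Container \
  /opt/bitnami/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic cdc.test.Members.member \
  --from-beginning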

5. Checking the change-capture topic in Kafka UI

"payload": {
  "before": null,
  "after": {
    "member_id": 1,
    "nickname": "testNickname1"
  },
}

The before and after fields above show how a row was changed (created, updated, or deleted); the full Debezium event also carries an op field (c, u, or d) with the same information.

Insert

"payload": {
  "before": null,
  "after": {
    "member_id": 1,
    "nickname": "testNickname1"
  },
}

Update

"payload": {
  "before": {
    "member_id": 1,
    "nickname": "testNickname1"
  },
  "after": {
    "member_id": 1,
    "nickname": "testModifiedNickname1"
},

Delete

"payload": {
  "before": {
    "member_id": 2,
    "nickname": "testNickname2"
  },
  "after": null,
}