Home Kafka Connect를 활용한 CDC 파이프라인 구축 (Source, Sink Pipeline)
Post
Cancel

Kafka Connect를 활용한 CDC 파이프라인 구축 (Source, Sink Pipeline)

실습 템플릿

위 레포지토리가 구축하고자하는 환경은 각기 다른 온프레미스 서버 인스턴스에 카프카를 KRaft모드로 띄우고 CDC를 구축하는 것이다.

따라서 각 노드에는 Controller, Broker, Connect 총 3개의 프로세스가 동작하는 환경을 구축한다.


Step 1. MSSQL 접속 및 데이터베이스, 스키마, 테이블 생성

  1. SOURCE, EXTERNAL 데이터베이스를 생성
  2. SOURCE, SOURCE 데이터베이스 내에 dbo 스키마를 사용
  3. SOURCE, SOURCE 데이터베이스 내에 dbo 스키마 내에 MEMBER_BASE 테이블 생성
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
-- SOURCE 라는 데이터베이스를 사용
USE [EXTERNAL];

create table dbo.MEMBER_BASE(
    member_id bigint primary key,
    nickname nvarchar(50)
)

USE SOURCE;

create table dbo.MEMBER_BASE(
    member_id bigint primary key,
    nickname nvarchar(50)
)

EXEC sys.sp_cdc_enable_db;

ALTER DATABASE SOURCE SET CHANGE_TRACKING = ON(CHANGE_RETENTION = 3 DAYS, AUTO_CLEANUP = ON)

EXEC sys.sp_cdc_enable_table
      @source_schema = 'dbo',
      @source_name = 'MEMBER_BASE',
      @role_name = 'sa';

Step 2. Source, Sink Connector 등록

HTTP 요청으로 http://localhost:8083/connectors 혹은 http://localhost:8084/connectors 혹은 http://localhost:8085/connectors 으로 DB에 관한 커넥트를 등록한다. 이 때 메서드는 POST 요청으로 보내야하며, Requset Body는 아래와 같다.

1. 소스 커넥터 등록 [POST] http://localhost:8083/connectors

Confluent 공식문서 - JDBC Source Configure

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
{
    // 기본값: 없음 (필수 설정)
    // 다른 옵션: 사용자가 지정한 어떤 문자열이든 가능함
    "name": "mssql-cdc-member-source-connector",
    "config": {
        // 기본값: 없음 (필수 설정)
        // 다른 옵션: 다른 source connector 클래스 (예: MySqlConnector, PostgresConnector 등)
        // 중요도: high
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",

        // 기본값: 1
        // 다른 옵션: 1보다 큰 정수 (병렬 처리를 위해 증가 가능)
        // 중요도: low
        "tasks.max": "1",

        // 기본값: false
        // 다른 옵션: true (SSL/TLS를 사용한 암호화된 연결)
        // 중요도: medium
        "database.encrypt": false,

        // 기본값: 없음 (필수 설정)
        // 다른 옵션: SQL Server가 실행 중인 다른 호스트 이름이나 IP 주소
        // 중요도: high
        "database.hostname": "host.docker.internal",

        // 기본값: 1433 (SQL Server의 기본 포트)
        // 다른 옵션: SQL Server가 사용하는 다른 포트 번호
        // 중요도: high
        "database.port": "1433",

        // 기본값: 없음 (필수 설정)
        // 다른 옵션: 데이터베이스에 접근 권한이 있는 다른 사용자 이름
        // 중요도: high
        "database.user": "SA",

        // 기본값: 없음 (필수 설정)
        // 다른 옵션: 해당 사용자의 올바른 비밀번호
        // 중요도: high
        "database.password": "admin123$%",

        // 기본값: 없음 (필수 설정)
        // 다른 옵션: 다른 데이터베이스 이름 또는 여러 데이터베이스 (쉼표로 구분)
        // 중요도: high
        "database.names": "SOURCE",

        // 기본값: 없음
        // 다른 옵션: 다른 스키마 이름 또는 여러 스키마 (쉼표로 구분)
        // 중요도: medium
        "schema.include.list": "dbo",

        // 기본값: 없음
        // 다른 옵션: 다른 테이블 이름 또는 여러 테이블 (쉼표로 구분)
        // 중요도: medium
        "table.include.list": "dbo.MEMBER_BASE",

        // 기본값: 없음 (필수 설정)
        // 다른 옵션: Kafka 브로커의 다른 주소와 포트
        // 중요도: high
        "database.history.kafka.bootstrap.servers": "broker1:19091,broker2:29092,broker3:39093",

        // 기본값: UTC
        // 다른 옵션: 다른 유효한 타임존 ID
        // 중요도: medium
        "db.timezone": "Asia/Seoul",

        // 기본값: 서버 이름
        // 다른 옵션: 사용자가 지정한 다른 접두사
        // 중요도: high
        "topic.prefix": "member-source",

        // 기본값: 없음
        // 다른 옵션: 다른 transform 이름 또는 여러 transform (쉼표로 구분)
        // 중요도: low
        "transforms": "route",

        // 기본값: 없음
        // 다른 옵션: 다른 Kafka Connect transform 클래스
        // 중요도: low
        "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",

        // 기본값: 없음
        // 다른 옵션: 다른 정규식 패턴
        // 중요도: low
        "transforms.route.regex": "member-source.SOURCE.dbo.MEMBER_BASE",

        // 기본값: 없음
        // 다른 옵션: 다른 대체 문자열
        // 중요도: low
        "transforms.route.replacement": "source",

        // 기본값: 없음 (필수 설정)
        // 다른 옵션: Kafka 브로커의 다른 주소와 포트
        // 중요도: high
        "schema.history.internal.kafka.bootstrap.servers": "broker1:19091,broker2:29092,broker3:39093",

        // 기본값: ${database.server.name}.schema-changes.inventory
        // 다른 옵션: 사용자가 지정한 다른 토픽 이름
        // 중요도: medium
        "schema.history.internal.kafka.topic": "schema-history-cdc"

        // -----------------고려해볼만한 옵션------------------- //

        // 기본값: initial (중요도: high)
        // 다른 옵션: latest, timestamp
        // 설명: 커넥터가 시작될 때 데이터를 읽기 시작할 위치를 지정
        "snapshot.mode": "initial",

        // 기본값: 8192 (중요도: medium)
        // 다른 옵션: 1024에서 1048576 사이의 정수
        // 설명: 데이터베이스 로그에서 한 번에 읽어올 최대 이벤트 수
        "max.queue.size": "16384",

        // 기본값: 0 (중요도: medium)
        // 다른 옵션: 0보다 큰 정수 (밀리초 단위)
        // 설명: 폴링 간격. 0은 지속적인 폴링을 의미
        "poll.interval.ms": "1000",

        // 기본값: 10 (중요도: medium)
        // 다른 옵션: 1에서 100 사이의 정수
        // 설명: 커넥터가 데이터베이스에 연결을 시도할 최대 횟수
        "connect.max.retries": "10",

        // 기본값: 10000 (중요도: medium)
        // 다른 옵션: 1000에서 60000 사이의 정수 (밀리초 단위)
        // 설명: 연결 재시도 사이의 대기 시간
        "connect.backoff.max.ms": "10000",

        // 기본값: false (중요도: medium)
        // 다른 옵션: true
        // 설명: 데이터베이스 테이블의 컬럼 이름을 소문자로 변환할지 여부
        "column.lowercase": "false",

        // 기본값: 8192 (중요도: medium)
        // 다른 옵션: 1024에서 1048576 사이의 정수
        // 설명: 트랜잭션 로그에서 한 번에 처리할 최대 배치 크기
        "max.batch.size": "8192",

        // 기본값: 0 (중요도: medium)
        // 다른 옵션: 0 이상의 정수 (밀리초 단위)
        // 설명: 각 배치 처리 사이의 지연 시간. 0은 지연 없음을 의미
        "poll.interval.ms": "1000",

        // 기본값: false (중요도: high)
        // 다른 옵션: true
        // 설명: 데이터베이스의 시스템 테이블에서 변경 사항을 캡처할지 여부
        "include.schema.changes": "true",

        // 기본값: 0 (중요도: medium)
        // 다른 옵션: 0 이상의 정수
        // 설명: 스키마 변경 이벤트의 최대 크기 (바이트 단위). 0은 제한 없음을 의미
        "max.queue.size.in.bytes": "104857600",

        // 기본값: adaptive_time_microseconds (중요도: medium)
        // 다른 옵션: adaptive, adaptive_time, commit_time
        // 설명: 변경 이벤트의 순서를 결정하는 방법
        "event.processing.failure.handling.mode": "warn",

        // 기본값: fail (중요도: high)
        // 다른 옵션: warn, skip, stop
        // 설명: 변경 이벤트 처리 중 오류 발생 시 처리 방법
        "event.processing.failure.handling.mode": "warn"
    }
}

2. 싱크 커넥터 등록 [POST] http://localhost:18083/connectors

Confluent 공식문서 - JDBC Sink Configure

Debezium 공식문서 - SQL Server Connectors

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
{
    // 기본값: 없음 (필수 설정)
    // 다른 옵션: 사용자가 지정한 어떤 문자열이든 가능함
    "name": "mssql-cdc-member-sink-connector",
    "config": {
        // 기본값: 없음 (필수 설정)
        // 다른 옵션: 다른 sink connector 클래스 (예: ElasticsearchSinkConnector, S3SinkConnector 등)
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",

        // 기본값: 없음 (필수 설정, 중요도: high)
        // 다른 옵션: 다른 데이터베이스 URL (예: MySQL, PostgreSQL 등의 JDBC URL)
        "connection.url": "jdbc:sqlserver://host.docker.internal:1433;databaseName=EXTERNAL",

        // 기본값: null (필수 설정, 중요도: high)
        // 다른 옵션: 데이터베이스에 접근 권한이 있는 다른 사용자 이름
        "connection.user": "sa",

        // 기본값: null (필수 설정, 중요도: high)
        // 다른 옵션: 해당 사용자의 올바른 비밀번호
        "connection.password": "admin123$%",

        // 기본값: false (중요도: medium)
        // 다른 옵션: true (테이블이 존재하지 않을 경우 자동으로 생성)
        "auto.create": "false",

        // 기본값: false (중요도: medium)
        // 다른 옵션: true (스키마 변경 시 테이블 구조를 자동으로 변경)
        "auto.evolve": "false",

        // 기본값: false (중요도: medium)
        // 다른 옵션: true (현재 설정된 값, 삭제 작업 활성화)
        // 주의: pk.mode가 record_key로 설정되어야 함
        "delete.enabled": "true",

        // 기본값: 없음 (필수 설정)
        // 다른 옵션: 1보다 큰 정수 (병렬 처리를 위해 증가 가능)
        "tasks.max": "1",

        // 기본값: 없음 (필수 설정)
        // 다른 옵션: 다른 토픽 이름 또는 여러 토픽 (쉼표로 구분)
        "topics": "member-source.SOURCE.dbo.MEMBER_BASE",

        // 기본값: ${topic} (중요도: medium)
        // 다른 옵션: 다른 테이블 이름 형식 (예: "${topic}" 사용 가능)
        "table.name.format": "EXTERNAL.dbo.MEMBER_BASE",

        // 기본값: none (중요도: high)
        // 다른 옵션: kafka, record_key, record_value (현재 설정)
        "pk.mode": "record_value",

        // 기본값: insert (중요도: high)
        // 다른 옵션: upsert (현재 설정), update
        "insert.mode": "upsert",

        // 기본값: 없음
        // 다른 옵션: 다른 transform 이름 또는 여러 transform (쉼표로 구분)
        "transforms": "unwrap",

        // 기본값: 없음
        // 다른 옵션: 다른 Kafka Connect transform 클래스
        "transforms.unwrap.type": "org.apache.kafka.connect.transforms.ExtractField$Value",

        // 기본값: 없음
        // 다른 옵션: 추출하고자 하는 다른 필드 이름
        "transforms.unwrap.field": "after",

        // 소스 데이터의 CREATED_AT과 UPDATED_AT 값이 Unix timestamp 형식(밀리초)일 경우 (ex) 1644421787000 = 2022-02-09 15:16:27 UTC)
        // SQL Server의 datetime 타입은 이러한 큰 숫자를 직접 받아들일 수 없어 삽입 전에 별도로 변환해줘야한다.
        "transforms.convertInsDate.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.convertInsDate.target.type": "Timestamp",
        "transforms.convertInsDate.field": "CREATED_AT",
        "transforms.convertInsDate.format": "yyyy-MM-dd HH:mm:ss.SSS",
        "transforms.convertUptDate.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.convertUptDate.target.type": "Timestamp",
        "transforms.convertUptDate.field": "UPDATED_AT",
        "transforms.convertUptDate.format": "yyyy-MM-dd HH:mm:ss.SSS",

        // 기본값: initial (중요도: high)
        // 다른 옵션: schema_only, initial, never
        // 설명: 커넥터 시작 시 스냅샷 모드를 지정. 'schema_only'는 스키마만 캡처하고 데이터는 캡처하지 않음
        "snapshot.mode": "schema_only",

        // 기본값: repeatable_read (중요도: medium)
        // 다른 옵션: read_committed, read_uncommitted, exclusive
        // 설명: 스냅샷 생성 시 사용할 트랜잭션 격리 수준. 스냅샷이 생성되는 동안 다른 트랜잭션에 의한 데이터 변경을 볼 수 없어 스냅샷 전체에 걸쳐 일관된 데이터 뷰를 보장한다.
        "snapshot.isolation.mode": "repeatable_read",

        // -----------------고려해볼만한 옵션------------------- //

        // 기본값: 3000 (중요도: medium)
        // 다른 옵션: 1에서 수만까지의 정수
        // 설명: 단일 배치 작업에서 처리할 최대 레코드 수
        "batch.size": "3000",

        // 기본값: 0 (중요도: medium)
        // 다른 옵션: 0보다 큰 정수
        // 설명: 작업 실패 시 재시도할 최대 횟수
        "max.retries": "3",

        // 기본값: 0 (중요도: medium)
        // 다른 옵션: 0보다 큰 정수 (밀리초 단위)
        // 설명: 재시도 사이의 대기 시간
        "retry.backoff.ms": "1000",

        // 기본값: 300000 (5분, 중요도: medium)
        // 다른 옵션: 0보다 큰 정수 (밀리초 단위)
        // 설명: 커넥터가 새 메시지를 폴링하는 최대 간격
        "max.poll.interval.ms": "300000",

        // 기본값: 500 (중요도: medium)
        // 다른 옵션: 1에서 수천까지의 정수
        // 설명: 단일 폴링 작업에서 가져올 최대 레코드 수
        "max.poll.records": "500"
    }
}

Step 3. 테스트

이제 DB의 데이터를 삭제하고 생성하고 변경하는 등 쓰기요청을 수행한 후에 http:://localhost:8080 카프카 UI로 들어가서 CDC내용에 관한 토픽 메세지를 확인한다.

1
2
3
4
5
6
7
8
9
10
11
12
USE SOURCE;

insert into dbo.MEMBER_BASE(member_id, nickname) values (1, 'testNickname1');
insert into dbo.MEMBER_BASE(member_id, nickname) values (2, 'testNickname2');
insert into dbo.MEMBER_BASE(member_id, nickname) values (3, 'testNickname3');
insert into dbo.MEMBER_BASE(member_id, nickname) values (4, 'testNickname4');
insert into dbo.MEMBER_BASE(member_id, nickname) values (5, 'testNickname5');
insert into dbo.MEMBER_BASE(member_id, nickname) values (6, 'testNickname6');
insert into dbo.MEMBER_BASE(member_id, nickname) values (7, 'testNickname7');
insert into dbo.MEMBER_BASE(member_id, nickname) values (8, 'testNickname8');
insert into dbo.MEMBER_BASE(member_id, nickname) values (9, 'testNickname9');
insert into dbo.MEMBER_BASE(member_id, nickname) values (10, 'testNickname10');

1
2
3
4
5
6
7
"payload": {
  "before": null,
  "after": {
    "member_id": 1,
    "nickname": "testNickname1"
  },
}

위 항목으로 데이터가 어떻게 변경되었는지(생성, 변경, 삭제)를 파악할 수 있다.

생성

1
2
3
4
5
6
7
"payload": {
  "before": null,
  "after": {
    "member_id": 1,
    "nickname": "testNickname1"
  },
}

수정

1
2
3
4
5
6
7
8
9
"payload": {
  "before": {
    "member_id": 1,
    "nickname": "testNickname1"
  },
  "after": {
    "member_id": 1,
    "nickname": "testModifiedNickname1"
},

삭제

1
2
3
4
5
6
7
"payload": {
    "before": {
        "member_id": 2,
        "nickname": "testNickname2"
    },
    "after": null,
}

이론으로

로그 기반 CDC 5가지 장점

Debezium을 사용하는 Source CDC는 로그 기반으로 동작하게된다.

즉, DBMS의 binlog를 활용하여 변경된 데이터를 감지하게 되는 것인데 이 방식의 구체적인 내용과 장점을 살펴본다.

Binary Log

MySQL 공식문서 - BinaryLog

Binary Log는 데이터베이스의 변경 사항을 기록하는 기능이다. 주로 소스 서버의 변경 사항을 레플리카 서버로 전송하는 데이터복제나 백업 이후의 변경 사항을 재실행하여 최신 상태로 복구할 때 활용된다.

바이너리 로그는 예상치 못한 중단이 일어날 때를 대비하여 완전히 완료된 트랜잭션만 기록하고 다시 읽는 방식으로 동작한다.

즉, 트랜잭션이 끝나지 않은 내용은 이후 BinaryLog에 기록되지 않아 DB에 적용되지 않는다.

참고로, Debezium을 MySQL 사용하기 위해 Binary Log를 활용한다는 내용이었지만 MSSQL, PostgreSQL등 각자만의 Binary Log의 역할을 하는 개념을 도입시키거나 DBMS단위에서 자체적인 CDC를 제공한다.


Global Transaction ID - GTID

MySQL 공식문서 - Replication

MySQL에서는 Binary Log방식이 아닌 Transaction ID를 통해 복제를 수행하는 GTID방식도 존재한다.

This post is licensed under CC BY 4.0 by the author.