Adding Debezium to Kafka Connect

Kai
Apr 23, 2019 · 23 min read


If nothing goes wrong, this will be my first technical post. I have actually been using Debezium for several months without ever properly understanding it, and I finally have time to work through how to run Debezium the proper way, through Kafka Connect.

What is Kafka Connect?

Kafka Connect is a data-integration framework built on top of Kafka that streams data between external systems and Kafka. Typically, source connectors pull data from a source system into the corresponding Kafka topics, and other systems then consume that data from those topics.

What is Debezium?

The official site sums it up in one line: "Stream changes from your databases." Debezium captures every change (or event) in a database and publishes it to a data stream; the streaming system used here is Kafka, and Debezium plays the role of the Connector.

Two ways to run Debezium

1. EmbeddedEngine

The link above describes how to use the EmbeddedEngine, but since this post is meant as my own set of notes, here is a demonstration of running Debezium in embedded mode.

Update 2020/04/07: this approach has since been deprecated upstream; see the information on the linked page.

First, add these two dependencies to your project:

<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-embedded</artifactId>
  <version>${version.debezium}</version>
</dependency>
<dependency>
  <groupId>io.debezium</groupId>
  <artifactId>debezium-connector-mysql</artifactId>
  <version>${version.debezium}</version>
</dependency>

Note that if you use a different database, you need the matching connector dependency. At the time of writing, the supported databases are MySQL, MongoDB, PostgreSQL, Oracle, and SQL Server.

Next, run the EmbeddedEngine as a Runnable in your Java code:

// Define the configuration for the EmbeddedEngine and the MySqlConnector
Configuration config = Configuration.create()
        /* begin engine properties */
        .with("connector.class",
                "io.debezium.connector.mysql.MySqlConnector")
        .with("offset.storage",
                "org.apache.kafka.connect.storage.FileOffsetBackingStore")
        .with("offset.storage.file.filename",
                "/debezium/offset.dat")
        .with("offset.flush.interval.ms", 60000)
        /* begin connector properties */
        .with("name", "my-sql-connector")
        .with("database.hostname", "localhost")
        .with("database.port", 3306)
        .with("database.user", "mysqluser")
        .with("database.password", "mysqlpw")
        .with("server.id", 85744)
        .with("database.server.name", "my-app-connector")
        .with("database.history",
                "io.debezium.relational.history.FileDatabaseHistory")
        .with("database.history.file.filename",
                "/debezium/dbhistory.dat")
        .build();
// Build an EmbeddedEngine instance from the configuration
EmbeddedEngine engine = EmbeddedEngine.create()
        .using(config)
        .notifying(this::checkSourceRecord)
        .build();
// Run it asynchronously
Thread thread = new Thread(engine);
thread.start();
// For now, just sleep for an hour (3,600,000 ms) to keep the engine running
Thread.sleep(3600000);
// This stops the EmbeddedEngine; call it from whatever event should shut it down
engine.stop();

Next, write a checkSourceRecord function to handle the intercepted data changes:

/**
 * Inspect every captured event record
 */
private void checkSourceRecord(SourceRecord record) {
    // Skip events that carry no value
    if (record.value() == null) {
        if (logger.isWarnEnabled()) {
            logger.warn("skip event:" + record.toString());
        }
        return;
    }
    // Extract only the Kafka value that Debezium carries internally
    Struct value = (Struct) record.value();
    try {
        // Turn the event Struct into a key-value structure
        HashMap<String, Object> valueMap = parseStruct(value);
        // ...then into a JSON string
        String jsonValue = Utility.toJsonStr(valueMap);
        // Log the record to the console
        if (logger.isInfoEnabled()) {
            logger.info(jsonValue);
        }
        // Send it to the Kafka topic
        producer.send(new ProducerRecord<String, String>(kafkaTopic, jsonValue));
    } catch (Exception e) {
        if (logger.isErrorEnabled()) {
            logger.error("checkSourceRecord exception", e);
        }
    }
}
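
The snippet above assumes a producer and a kafkaTopic field already exist (Utility.toJsonStr is my own small helper that serializes a map to JSON). A minimal sketch of wiring up such a producer might look like this; the broker address and topic name are placeholders:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

// Plain string-to-string producer; adjust the broker list to your cluster
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
String kafkaTopic = "debezium-events"; // hypothetical topic name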

The structure captured this way is of type org.apache.kafka.connect.data.Struct, so let's write a function that parses it into a key-value form that is easier to read as JSON:

/**
 * Convert a Struct into a key-value structure
 */
private HashMap<String, Object> parseStruct(Struct struct) {
    HashMap<String, Object> structMap = new HashMap<>();
    Schema schema = struct.schema();
    for (Field field : schema.fields()) {
        String fieldName = field.name();
        Object object = struct.get(field);
        // Recurse into nested Structs (e.g. before/after/source)
        if (field.schema().type().equals(Schema.Type.STRUCT)) {
            Struct subStruct = struct.getStruct(fieldName);
            if (subStruct != null) {
                object = parseStruct(subStruct);
            }
        }
        structMap.put(fieldName, object);
    }
    return structMap;
}

That completes running Debezium via the EmbeddedEngine; inside checkSourceRecord you can do whatever you like with the captured events. If you pair it with Spring Boot, you can wire it up as sketched below:

Here the raw Thread can be replaced with a ThreadPoolTaskExecutor, though that is getting a bit off topic...
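
A minimal sketch, assuming a Spring Boot application; the class name is hypothetical, checkSourceRecord is the handler shown earlier, and the configuration is abbreviated:

import javax.annotation.PreDestroy;

import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import io.debezium.config.Configuration;
import io.debezium.embedded.EmbeddedEngine;

@Component
public class DebeziumRunner implements ApplicationRunner {

    private EmbeddedEngine engine;
    private ThreadPoolTaskExecutor executor;

    @Override
    public void run(ApplicationArguments args) {
        Configuration config = Configuration.create()
                // ... same engine and connector properties as in the snippet above
                .with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
                .with("name", "my-sql-connector")
                .build();
        engine = EmbeddedEngine.create()
                .using(config)
                .notifying(this::checkSourceRecord)
                .build();
        // One dedicated worker thread is enough for a single engine instance
        executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.initialize();
        executor.execute(engine);
    }

    @PreDestroy
    public void shutdown() {
        // Stop the engine, then the executor, when the Spring context shuts down
        if (engine != null) {
            engine.stop();
        }
        if (executor != null) {
            executor.shutdown();
        }
    }

    private void checkSourceRecord(SourceRecord record) {
        // handle captured events as shown earlier
    }
}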

2. Kafka Connect

Now we come to the heart of this post; this is what I really set out to write down, so let's get started!

First, start Kafka Connect on a Kafka installation you already have; if you haven't installed Kafka yet, find an installation guide first. Once installed, you will need a couple of scripts that ship with Kafka:

${KAFKA_HOME}/bin/connect-distributed.sh
${KAFKA_HOME}/bin/connect-standalone.sh

A quick note: Kafka Connect has two run modes, standalone and distributed. Put simply, the difference is scalability: standalone runs a single worker that keeps its offsets in a local file, while distributed runs a group of workers that coordinate and store their state through Kafka topics. I'll go straight to distributed mode here; for comparison, the standalone launch is sketched below.
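
For reference, standalone mode takes the worker config plus one or more connector property files directly on the command line (my-connector.properties here is a hypothetical name):

${KAFKA_HOME}/bin/connect-standalone.sh \
    ${KAFKA_HOME}/config/connect-standalone.properties \
    my-connector.properties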

We can create a configuration file for Kafka Connect, kafka-connect.properties, with the following content:

bootstrap.servers=192.168.56.21:9092,192.168.56.22:9092,192.168.56.23:9092
group.id=KafkaConnectGroup1
offset.storage.topic=center-db-offset
config.storage.topic=center-db-config
status.storage.topic=center-db-status
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

A side note: if Kafka is installed correctly, you should already have these two config files, one per mode, so you can also copy one of them and modify it:

${KAFKA_HOME}/config/connect-distributed.properties
${KAFKA_HOME}/config/connect-standalone.properties

Next, start Kafka Connect with connect-distributed.sh:

${KAFKA_HOME}/bin/connect-distributed.sh ${KAFKA_HOME}/config/kafka-connect.properties

Good, at this point Kafka Connect has been started successfully. If you want the process to run in the background, you can write a small script that launches it with nohup, like this:

KAFKA_HOME="your Kafka directory";
KAFKA_CONNECT="$KAFKA_HOME/bin/connect-distributed.sh";
CONFIG="$KAFKA_HOME/config/kafka-connect.properties";
nohup $KAFKA_CONNECT $CONFIG >/dev/null 2>&1 &

To stop it, find the process first and then kill it:

...@vm:~# ps -ef | grep kafka-connect.properties | grep -v grep
root 11817 1 10 02:23 pts/1 00:00:18 java -Xms256(...truncated)
...@vm:~# kill 11817

The next step is to add the Debezium connector through the API that Kafka Connect provides. First download the Debezium Connector Plugin Archive, choosing the package that matches your database; here I'll again use the MySQL package as the example:

This download link always serves the "latest" package. Note that only Debezium 0.9.x and later ships a connector compatible with MySQL 8. The downloaded archive looks like this:

debezium-connector-mysql-0.9.4.Final-plugin.tar.gz

Extract the archive under ${KAFKA_HOME}/plugin/...; the contents will look like this:

..@vm:${KAFKA_HOME}/plugin/debezium-connector-mysql/0.9.4.Final$ ll
total 6824
drwxr-xr-x 2 root root 4096 Apr 17 09:10 ./
drwxr-xr-x 3 root root 4096 Apr 17 10:26 ../
-rwxr-xr-x 1 root root 334662 Apr 17 09:10 antlr4-runtime-4.7.jar*
-rwxr-xr-x 1 root root 107200 Apr 17 09:10 CHANGELOG.md*
-rwxr-xr-x 1 root root 15719 Apr 17 09:10 CONTRIBUTE.md*
-rwxr-xr-x 1 root root 1281 Apr 17 09:10 COPYRIGHT.txt*
-rwxr-xr-x 1 root root 263934 Apr 17 09:10 debezium-connector-mysql-0.9.4.Final.jar*
-rwxr-xr-x 1 root root 671324 Apr 17 09:10 debezium-core-0.9.4.Final.jar*
-rwxr-xr-x 1 root root 2631200 Apr 17 09:10 debezium-ddl-parser-0.9.4.Final.jar*
-rwxr-xr-x 1 root root 11357 Apr 17 09:10 LICENSE.txt*
-rwxr-xr-x 1 root root 171908 Apr 17 09:10 mysql-binlog-connector-java-0.19.1.jar*
-rwxr-xr-x 1 root root 2132635 Apr 17 09:10 mysql-connector-java-8.0.13.jar*
-rwxr-xr-x 1 root root 596642 Apr 17 09:10 protobuf-java-2.6.1.jar*
-rwxr-xr-x 1 root root 13070 Apr 17 09:10 README.md*

Collect the plugin jars under ${KAFKA_HOME}/plugin, add the plugin path to the kafka-connect.properties configuration, and restart Kafka Connect. (Kafka does not expand variables such as ${KAFKA_HOME} inside properties files, so write plugin.path as the actual absolute path.)

bootstrap.servers=192.168.56.21:9092,192.168.56.22:9092,192.168.56.23:9092
group.id=KafkaConnectGroup1
offset.storage.topic=offset-storage
config.storage.topic=config-storage
status.storage.topic=status-storage
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
plugin.path=${KAFKA_HOME}/plugin
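
After the restart, one quick way to confirm that the worker picked up the plugin is the GET /connector-plugins endpoint, which lists every connector class the worker can see:

curl http://localhost:8083/connector-plugins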

Finally, use the API that Kafka Connect provides to register Debezium; by default the Kafka Connect REST interface listens on http://localhost:8083:

(POST) localhost:8083/connectors
{
  "name": "dbz",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "192.168.56.21",
    "database.port": "3306",
    "database.user": "user",
    "database.password": "pwd",
    "database.server.id": "6666",
    "database.server.name": "ThisIsTopicOfKafka",
    "database.history.kafka.topic": "center-db-history",
    "database.history.kafka.bootstrap.servers": "192.168.56.21:9092,192.168.56.22:9092,192.168.56.23:9092",
    "database.history.skip.unparseable.ddl": "true",
    "database.serverTimezone": "UTC",
    "include.schema.changes": "false",
    "poll.interval.ms": "10"
  }
}

Calling localhost:8083/connectors with GET lists the connectors Kafka Connect is currently running, so remember to use POST with a Content-Type of application/json, as sketched below. With that, the setup is complete.
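
For example, with curl, assuming the JSON body above is saved to a file named dbz-mysql.json (a name I've made up here):

# Register the connector
curl -X POST -H "Content-Type: application/json" \
    --data @dbz-mysql.json \
    http://localhost:8083/connectors

# List the running connectors, or check the status of this one
curl http://localhost:8083/connectors
curl http://localhost:8083/connectors/dbz/status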

Let's test it:

Insert some data into your source database and watch what happens in Kafka!

First create a database and a table:

CREATE DATABASE `testing`;
USE testing;
CREATE TABLE `counter` (
  `id` INT(11) NOT NULL,
  `num` INT(11) NOT NULL,
  `str1` VARCHAR(50) NULL DEFAULT NULL,
  `num2` DECIMAL(10,2) NOT NULL DEFAULT '0.00',
  PRIMARY KEY (`id`)
);

Insert one row:

INSERT INTO `testing`.`counter` (`id`, `num`, `str1`, `num2`) VALUES ('3', '66', 'ok', '1.23');

Now look at the Kafka topics. The database.server.name parameter in the config posted to the Kafka Connect API controls the topic prefix: Debezium names its topics serverName.databaseName.tableName, so the counter table of the testing database maps to the topic ThisIsTopicOfKafka.testing.counter:

[Screenshot: the topics as seen in a Kafka UI tool]
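
If you don't have a UI tool handy, the topics can also be listed from the command line (assuming a Kafka version whose kafka-topics.sh accepts --bootstrap-server; older versions use --zookeeper instead):

${KAFKA_HOME}/bin/kafka-topics.sh --list \
    --bootstrap-server 192.168.56.21:9092 | grep ThisIsTopicOfKafka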

And the row we just inserted is emitted like this:

[Screenshot: the message as seen in a Kafka UI tool]

Key:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "id"
      }
    ],
    "optional": false,
    "name": "ThisIsTopicOfKafka.testing.counter.Key"
  },
  "payload": {
    "id": 3
  }
}

Message:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "num"
          },
          {
            "type": "string",
            "optional": true,
            "field": "str1"
          },
          {
            "type": "bytes",
            "optional": false,
            "name": "org.apache.kafka.connect.data.Decimal",
            "version": 1,
            "parameters": {
              "scale": "2",
              "connect.decimal.precision": "10"
            },
            "default": "AA==",
            "field": "num2"
          }
        ],
        "optional": true,
        "name": "ThisIsTopicOfKafka.testing.counter.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "num"
          },
          {
            "type": "string",
            "optional": true,
            "field": "str1"
          },
          {
            "type": "bytes",
            "optional": false,
            "name": "org.apache.kafka.connect.data.Decimal",
            "version": 1,
            "parameters": {
              "scale": "2",
              "connect.decimal.precision": "10"
            },
            "default": "AA==",
            "field": "num2"
          }
        ],
        "optional": true,
        "name": "ThisIsTopicOfKafka.testing.counter.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": true,
            "field": "version"
          },
          {
            "type": "string",
            "optional": true,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "server_id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_sec"
          },
          {
            "type": "string",
            "optional": true,
            "field": "gtid"
          },
          {
            "type": "string",
            "optional": false,
            "field": "file"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "pos"
          },
          {
            "type": "int32",
            "optional": false,
            "field": "row"
          },
          {
            "type": "boolean",
            "optional": true,
            "default": false,
            "field": "snapshot"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "thread"
          },
          {
            "type": "string",
            "optional": true,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "table"
          },
          {
            "type": "string",
            "optional": true,
            "field": "query"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.mysql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "ThisIsTopicOfKafka.testing.counter.Envelope"
  },
  "payload": {
    "before": null,
    "after": {
      "id": 3,
      "num": 66,
      "str1": "ok",
      "num2": "ew=="
    },
    "source": {
      "version": "0.9.4.Final",
      "connector": "mysql",
      "name": "ThisIsTopicOfKafka",
      "server_id": 6666,
      "ts_sec": 1556024484,
      "gtid": "d3f99648-028c-11e9-a80e-023681421869:2306177",
      "file": "mysql-bin.000121",
      "pos": 418,
      "row": 0,
      "snapshot": false,
      "thread": 3209,
      "db": "testing",
      "table": "counter",
      "query": null
    },
    "op": "c",
    "ts_ms": 1556024484407
  }
}

Here we usually care most about the before and after objects inside payload. One thing to watch out for:

The record structure you get through Kafka Connect differs from the EmbeddedEngine structure

If you're curious, compare the JSON produced by the two approaches.
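
One detail worth noting in the payload above: DECIMAL columns are emitted as org.apache.kafka.connect.data.Decimal, i.e. the unscaled value as big-endian bytes, base64-encoded in JSON. That's why num2 shows up as "ew==" rather than 1.23. A small sketch of decoding it by hand:

import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Base64;

// "ew==" decodes to the single byte 0x7B == 123; applying the scale of 2
// from the schema ("scale": "2") yields the original value
byte[] bytes = Base64.getDecoder().decode("ew==");
BigDecimal num2 = new BigDecimal(new BigInteger(bytes), 2);
System.out.println(num2); // prints 1.23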

Final words:

It took me about two or three hours to work through Kafka Connect from scratch, only to discover that plugging in Debezium is actually quite easy. But the documentation is all in English, and Debezium says very little about the Kafka Connect side of the configuration, so you have to understand Kafka Connect yourself before you can make it work. The EmbeddedEngine still has its advantages, for instance hooking into breakpoints or shipping the data to a different message queue system. There seems to be very little material on Debezium in Chinese, so I hope this post helps; if anything in it is wrong, corrections and feedback are welcome.
