Kafka + Debezium 串流 DB - 實戰步驟

Kafka + Debezium 串流 DB – 原理介紹 說明整體的概念,這邊就來分享一下實戰步驟!

由於流程複雜,這邊先簡述一下整理過程,接下來解說會依循以下步驟:

  1. 設定 PostgreSQL,開啟 replication
  2. MSK (AWS 託管的 kafka)參數設定
  3. 預先建立 debezium 本人所需的 topic
  4. 起 debezium
  5. debezium 連結 PostgreSQL
  6. 到 kafdrop 檢查結果
  7. 實用指令大全
  8. Troubleshooting

讓 PostgreSQL 支援 wal2json 並開啟 replication

首先我們先在 db 建立一個有 replication permission 的 role

建立 Replication Permission Role

CREATE ROLE debezium WITH LOGIN PASSWORD '<密碼>';
GRANT rds_replication to debezium;

GRANT CONNECT ON DATABASE <目標 db> TO debezium;

# 切到目標 db (要用該 db 擁有者的帳號,或是用最高權限帳號) 再跑以下指令,讓該帳號可以讀取目標 db

GRANT USAGE ON SCHEMA public TO debezium;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;
ALTER DEFAULT PRIVILEGES IN SCHEMA public
GRANT SELECT ON TABLES TO debezium;

接著要開啟 db 的 replication 功能。這邊分兩條路解釋,一條是自建 PostgreSQL,通常會在 local 測試用,又分成直接安裝 PostgreSQL 以及使用 Docker 起的。另外一條是使用 AWS RDS,RDS 的設定就會簡單許多。

自建的 PostgreSQL

Step 1:安裝 wal2json 套件

  1. 如果是直接安裝 PostgreSQL的機器 -> 到該機器的 linux shell 下以下指令。
  2. 如果是 Docker 起的 PostgreSQL -> 到 container 裡的 shell 下以下指令。
    (指令出處可參考 這裏
apt install postgresql-server-dev-9.5 build-essential git

git clone https://github.com/eulerto/wal2json -b master --single-branch \
&& cd wal2json \
&& git checkout d2b7fef021c46e0d429f2c1768de361069e58696 \
&& make && make install \
&& cd .. \
&& rm -rf wal2json

Step 2:給予 replication 權限

修改 pg_hba.conf 檔案,依照對應帳號加入權限

  1. 如果是直接安裝 PostgreSQL 的機器(路徑版號需更換) -> /etc/postgresql/9.4/main/pg_hba.conf
  2. 如果是 Docker 起的 PostgreSQL -> /var/lib/postgresql/data/pg_hba.conf
host all all all trust
host replication <role> all trust 
# 對指定 role 開啟 replication 權限

Step 3:開啟 wal replication slot

修改 postgresql.conf 加入以下 wal 相關設定。這邊 slot 是給四個,可依需求調整數量。

  1. 如果是直接安裝 PostgreSQL 的機器(路徑版號需更換) -> /etc/postgresql/9.4/main/postgresql.conf 
  2. 如果是 Docker 起的 PostgreSQL -> /var/lib/postgresql/data/postgresql.conf 

需注意的是,wal_level 一定要一起設定!不然資料庫會無法啟動!

listen_addresses = '*'
   
max_wal_senders = 4
wal_level = logical
max_replication_slots = 4
shared_preload_libraries = 'wal2json'

之後需重啟資料庫,wal 相關設定才會生效(資料庫重啟會造成 app 應用斷線,如果是在 production 要特別注意 down time)。可到 DB shell 用這個指令確認,如果看到以下回應就算是成功。

postgres=> SHOW wal_level;

 wal_level 
-----------
 logical
(1 row)

AWS RDS 託管的 PostgreSQL

Step 1:修改 Parameters Group 中的設定

在 RDS 頁面的側邊欄點選 「Parameters groups」,再點選右上角橘色的按鈕「Create parameter group」,建立一組新的設定組

Kafka + Debezium 串流 DB - 實戰步驟

依照資料庫版本選擇對應的 Parameter group family,下面名稱可自取

Kafka + Debezium 串流 DB - 實戰步驟

點擊剛剛建立的 parameter group,在上面搜尋 rds.logical_replication,把它改成 1 即可

rds.logical_replication=1
Kafka + Debezium 串流 DB - 實戰步驟
打勾點擊 Edit parameters 後就可以修改

到目標 RDS 的 Modify 頁面,套用剛剛建立的 parameter group 並設定讓他立刻生效

Kafka + Debezium 串流 DB - 實戰步驟

Reboot RDS instance 後,到 db shell 用這個指令確認是否成功開啟

postgres=> SHOW wal_level;

 wal_level 
-----------
 logical
(1 row)

假如不是顯示 logical,先確認 Parameter group 是否有套用到目標 RDS,以及 RDS 在套用後是否有重啟。

MSK (AWS Kafka) 參數設定

架設 MSK (AWS 託管的 Kafka) 這邊就不再闡述,我們直接看參數要如何調整。

首先切換到 MSK 管理頁面,點選 Cluster configurations -> Create cluster configuration

Kafka + Debezium 串流 DB - 實戰步驟

在下面編輯器調整設定值

Kafka + Debezium 串流 DB - 實戰步驟

主要有以下兩個設定值要修改或加入

# 如果沒有設定 auto create topic 到時候串起來會在 debezium log 看到 Error while fetching metadata with correlation id 錯誤,設定為 true 後,如果發布沒有宣告過的 topic,就會自動建立
auto.create.topics.enable=true
 
# 這一行依需求調整,如果資料表中有很大的欄位,比如說 json,這數值就要調大到符合最大資料的大小,不然會在 debezium log 看到以下錯誤
# The message is <xxxx> bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration
message.max.bytes=104857600 

接著切換到目標 kafka cluster 管理頁面,在 configuration 區塊點選 Edit,選擇剛剛建立好的 configure 並套用即可

Kafka + Debezium 串流 DB - 實戰步驟
找到 configuration 區塊,點擊右上角的 Edit
Kafka + Debezium 串流 DB - 實戰步驟
選擇你要的 config

預先建立 debezium 本人所需的 topic

經過實測,雖然有設定 kafka auto create topic,但如果在起 debezium 之前沒有預先宣告 debezium 本身所需的基本 topic,後續就會發生 NOT_ENOUGH_REPLICAS error。所以我們這邊預先宣告給他。

建立 kafka topic 需要使用 kafka shell 介面,這邊我們使用 wurstmeister/kafka 這個 image 來起一次性的 container

docker run --rm -it wurstmeister/kafka:latest bash

進入 shell 後輸入以下指令,注意的是,這邊的 zookeeper 位置只是示意,請自行更換成自己的位置。
這邊我們為 debezium 建立三個 topic,分別是 db-connect-config、db-connect-offset、connect-status。

kafka-topics.sh --zookeeper z-1.kafka.xxxxxx.c3.kafka.ap-northeast-1.amazonaws.com:2181 --create --topic db-connect-config --partitions 1 --replication-factor 3

kafka-topics.sh --zookeeper z-1.kafka.xxxxxx.c3.kafka.ap-northeast-1.amazonaws.com:2181 --create --topic db-connect-offset --partitions 50 --replication-factor 3

kafka-topics.sh --zookeeper z-1.kafka.xxxxxx.c3.kafka.ap-northeast-1.amazonaws.com:2181 --create --topic connect-status --partitions 5 --replication-factor 3

cofig 和 offset 的 replication 和 partition 的數值是參考 debezium 官方文件這一段的描述

CONFIG_STORAGE_TOPIC

This environment variable is required when running the Kafka Connect service. Set this to the name of the Kafka topic where the Kafka Connect services in the group store connector configurations. The topic must have a single partition and be highly replicated (e.g., 3x or more).

OFFSET_STORAGE_TOPIC

This environment variable is required when running the Kafka Connect service. Set this to the name of the Kafka topic where the Kafka Connect services in the group store connector offsets. The topic must have a large number of partitions (e.g., 25 or 50), be highly replicated (e.g., 3x or more) and should be configured for compaction.

簡單來說,config storage topic (我們命名為 db-connect-config),只能有 1 個 partition,但要有高的 replication,建立 3x 以上。這個 topic 是 debezium 用來儲存 connect 的設定資料。
offset storage topic (我們命名為 db-connect-offset),必須要有高的 partition,如 25 或 50 個。且也要有高的 replication,3x 以上。這個 topic 是 debezium 用來儲存 offset。

起 debezium

這邊我們用以下的 docker-compose 起

version: '2.3'
services:
  # kafdrop 是ㄧ個 web 介面,可以監看所有的 topic 設定與內容,可選用
  kafdrop:
    image: obsidiandynamics/kafdrop:3.23.0
    container_name: kafdrop
    restart: always
    ports:
      - 9000:9000
    environment:     
      # 請改成自己的 kafka server 位置                                                                                                                          
      KAFKA_BROKERCONNECT: b-1.kafka.xxxxxx.c3.kafka.ap-northeast-1.amazonaws.com:9092
      JVM_OPTS: "-Xms128M -Xmx256M"
      SERVER_SERVLET_CONTEXTPATH: "/"
      SERVER_PORT: 9000
  
  # 這個就是我們今天的主角之一 debezium
  connect:
    image: debezium/connect
    container_name: connect
    restart: always
    ports:
      - 8083:8083
    environment:
      GROUP_ID: debezium
      # 給定我們剛剛手動建立的兩個 topic 名稱
      CONFIG_STORAGE_TOPIC: db-connect-config
      OFFSET_STORAGE_TOPIC: db-connect-offset
      # 換成自己的 kafka servers
      BOOTSTRAP_SERVERS: b-1.kafka.xxxxxx.c3.kafka.ap-northeast-1.amazonaws.com:9092,b-2.kafka.xxxxxx.c3.kafka.ap-northeast-1.amazonaws.com:9092

      # 這邊參數是這次我依照遇到的問題修改後的,可選
      OFFSET_FLUSH_TIMEOUT_MS: 60000  # default 5000
      OFFSET_FLUSH_INTERVAL_MS: 15000  # default 60000
      MAX_BATCH_SIZE: 32768  # default 2048                                                                                                    
      MAX_QUEUE_SIZE: 131072  # default 8192
      KAFKA_PRODUCER_MAX_REQUEST_SIZE: 10485760
      CONNECT_PRODUCER_MAX_REQUEST_SIZE: 104857600
      HEAP_OPTS: '-Xms4g -Xmx4g'  # default '-Xms1g -Xmx1g'

debezium 連結 PostgreSQL

請先找出剛剛起 debezium 的機器位置,接下來會直接使用 debezium 的 restful api 來設定 connector

// PUT ,可以使用 postman 咖方便
// hostname 換成機器位置
// connector name 換成你為這個 connector 取的名子,好辨識用
// http://<hostname>:8083/connectors/<connector_name>/config

{
     "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
     "tasks.max": "1",
     // 換成你的 db 位置
     "database.hostname": "<your_db>.xxxxxx.ap-southeast-1.rds.amazonaws.com",
     "database.port": "5432",
     // 換成我們一開始為 debezium 建立擁有 replication 權限的帳號密碼
     "database.user": "<user_name>",
     "database.password": "<password>",
     // 換成你要串出來的目標 db 名稱
     "database.dbname" : "<your_db>",
     // 自己取一個能辨識的來源名稱,之後會變成 topic 的前綴
     "database.server.name": "<server_name>",
     // 換成你要串出來的目標 db 名稱
     "database.whitelist": "<your_db>",
     // 換成要串出來的表名稱,可以有很多個,用逗號分隔
     "table.whitelist": "public.<table_name>,public.<table_name>,public.<table_name>",
     // 換成自己的 kafka servers
     "database.history.kafka.bootstrap.servers": "b-2.kafka.xxxxxx.c3.kafka.ap-northeast-1.amazonaws.com:9092,b-1.kafka.xxxxxx.c3.kafka.ap-northeast-1.amazonaws.com:9092,b-3.kafka.xxxxxx.c3.kafka.ap-northeast-1.amazonaws.com:9092",
     // 換成你要串出來的目標 db 名稱
     "database.history.kafka.topic": "schema-changes.<your_db>",
     // 如果是 RDS,用此,否則用 wal2json
     "plugin.name": "wal2json_rds_streaming",
     // 這邊給定 never 有特殊涵義,後面會解釋
     "snapshot.mode": "never",
     // 如果欄位中有 decimal,必須宣告在此,串出來的資料才會正確
     "decimal.handling.mode": "double",
     "database.tcpKeepAlive": true
  }

到 kafdrop 檢查結果

此時隨便在有串出來的資料表中 insert 一筆資料,切到剛剛一起建立的 kafdrop 看看有沒有新的 topic 出現,順便檢查資料內容,就能確定是否成功囉!

Kafka + Debezium 串流 DB - 實戰步驟
首頁上方可以看到目前 kafka nodes 的狀況
Kafka + Debezium 串流 DB - 實戰步驟
首頁下方會列出目前的 topics
Kafka + Debezium 串流 DB - 實戰步驟
點 topic 可以進入詳細頁面,了解目前 topic 的狀況

實用指令大全

kafka 相關 shell 指令

# 建立 topic
kafka-topics.sh --zookeeper z-1.dev-kafka.xxxx.c3.kafka.ap-southeast-1.amazonaws.com:2181 --create --topic test --partitions 1 --replication-factor 1

# 刪除 topic
kafka-topics.sh --zookeeper z-1.dev-kafka.xxxx.c3.kafka.ap-southeast-1.amazonaws.com:2181 --delete --topic test

# list topic
kafka-topics.sh --zookeeper z-1.dev-kafka.xxxx.c3.kafka.ap-southeast-1.amazonaws.com:2181 --list

# describe topic
kafka-topics.sh --zookeeper z-1.dev-kafka.xxxx.c3.kafka.ap-southeast-1.amazonaws.com:2181 --describe --topic db-connect-offset

# send message
echo "test 1" | kafka-console-producer.sh --broker-list b-2.dev-kafka.xxxx.c3.kafka.ap-southeast-1.amazonaws.com:9092 --topic test

# 調整已建立的 topic partition
kafka-topics.sh --zookeeper z-1.dev-kafka.xxxx.c3.kafka.ap-southeast-1.amazonaws.com:2181 --alter --topic 'connect-status' --partitions 2

Kafka re-assign replication

先確認目前狀態

kafka-topics.sh --zookeeper z-1.kafka.xxxx.c3.kafka.ap-northeast-1.amazonaws.com:2181 --describe --topic connect-status

Topic: connect-status   PartitionCount: 5       ReplicationFactor: 1    Configs: cleanup.policy=compact
        Topic: connect-status   Partition: 0    Leader: 1       Replicas: 1     Isr: 1
        Topic: connect-status   Partition: 1    Leader: 2       Replicas: 2     Isr: 2
        Topic: connect-status   Partition: 2    Leader: 1       Replicas: 1     Isr: 1
        Topic: connect-status   Partition: 3    Leader: 2       Replicas: 2     Isr: 2
        Topic: connect-status   Partition: 4    Leader: 1       Replicas: 1     Isr: 1

準備 replication 設定檔 increase-replication-factor.json

{"version":1,
  "partitions":[
     {"topic":"connect-status","partition":0,"replicas":[1,2]},
     {"topic":"connect-status","partition":1,"replicas":[1,2]},
     {"topic":"connect-status","partition":2,"replicas":[1,2]},
     {"topic":"connect-status","partition":3,"replicas":[1,2]},
     {"topic":"connect-status","partition":4,"replicas":[1,2]}
]}

載入

kafka-reassign-partitions.sh --zookeeper z-1.kafka.xxxx.c3.kafka.ap-northeast-1.amazonaws.com:2181 --reassignment-json-file increase-replication-factor.json --execute

debezium 常用 restful api

修改 connector conifg,若不存在則建立
PUT http://<hostname>:8083/connectors/<connector_name>/config

刪除 connect
DELETE http://<hostname>:8083/connectors/<connector_name>

查看已創立的 connector
GET http://<hostname>:8083/connectors/

查看對應的 connector 配置訊息
GET http://<hostname>:8083/connectors/<connector_name>

查看對應的 connector 狀態
GET http://<hostname>:8083/connectors/<connector_name>/status

Troubleshooting

串接 debezium 的過程中遇到各種稀奇古怪的問題,一次在這邊集合起來,希望對你有幫助

Failed to flush, timed out while waiting for producer to flush outstanding 509 messages

debezium offset timeout 參數調大,batch, queue 調大,記憶體調大。在 debezium 的 docker compose 加入以下環境變數

OFFSET_FLUSH_TIMEOUT_MS: 60000  # default 5000
OFFSET_FLUSH_INTERVAL_MS: 15000  # default 60000
MAX_BATCH_SIZE: 32768  # default 2048
MAX_QUEUE_SIZE: 131072  # default 8192
HEAP_OPTS: '-Xms2g -Xmx2g'  # default '-Xms1g -Xmx1g'

解法參考:
https://stackoverflow.com/questions/49868753/debezium-flush-timeout-and-outofmemoryerror-errors-with-mysql
參數定義:
https://debezium.io/documentation/reference/development/engine.html

NOT_ENOUGH_REPLICAS

如果一開始沒有預先為 debezium 設定他本人所需的 topic,就會在 log 看到這個問題,請看 預先建立 debezium 本人所需的 topic

解法參考:
https://medium.com/searce/realtime-cdc-from-mysql-using-aws-msk-with-debezium-28da5a4ca873

資料太多,第一次插上 connector 跑很久

PUT connect config 的參數裡面加

"snapshot.mode": "never"

請看 debezium 連結 PostgreSQL。這問題主要是原始資料表資料非常多。一開始 debezium 預設的模式是 initial,他會把目前表中所有的資料都倒入 kafka,如果表很大,就會跑到天荒地老。所以我們這邊改成 never,設定完 connector 只有該表有變動,才會開始串新的資料到 kafka。如果舊有資料不 care,就可使用 never。

參數定義:
https://debezium.io/documentation/reference/1.1/connectors/postgresql.html#snapshots

No partition metadata for topic

目前研究沒招,只能直接刪掉 kafka 所有的 topic。照理說正常使用不會遇到這個問題,當時遇到的時候可能是不斷嘗試各種設定導致的副作用吧!

解法參考:
https://blog.csdn.net/weixin_34376562/article/details/94612083

db 欄位資料如果是 decimal ,kafka 傳來的資料會錯誤

PUT connector config 的參數裡面加

"decimal.handling.mode": "double"

解法參考:
https://blog.csdn.net/u012551524/article/details/83546765

Database connection failed when writing to copy

PUT connector config 的參數裡面加

"database.tcpKeepAlive": true

The message is 1886584 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

在 debezium docker-compose 加以下環境變數

AFKA_PRODUCER_MAX_REQUEST_SIZE: 10485760       
CONNECT_PRODUCER_MAX_REQUEST_SIZE: 10485760

在 MSK config 加這個

message.max.bytes=104857600

如果是用 docker-compose 起的 kafka,則加這個環境變數

KAFKA_MESSAGE_MAX_BYTES: 104857600

解法參考:
https://stackoverflow.com/questions/46647337/kafka-confluent-producerconfig-change-default-max-request-size-using-docker-ima

out of memory

如果遇到此錯誤

2018-03-27 11:37:46,307 ERROR  Postgres|dbz|records-stream-producer  unexpected exception while streaming logical changes   [io.debezium.connector.postgresql.RecordsStreamProducer]

org.postgresql.util.PSQLException: ERROR: out of memory

  Detail: Cannot enlarge string buffer containing 1073741746 bytes by 1013 more bytes.

  Where: slot "debezium", output plugin "wal2json", in the change callback, associated LSN 8/5A721C

如果是使用 RDS,PUT connector config 的參數修改為以下

"plugin.name": "wal2json_rds_streaming"

如果發生 LSN 太小的問題

如果看到以下 log

2020-11-17 03:17:17,668 INFO Postgres|xxxx|postgres-connector-task Streaming requested from LSN 12049144886512 but received LSN 12046475487160 that is same or smaller so skipping the message [io.debezium.connector.postgresql.connection.AbstractMessageDecoder]

先透過 debezium restful api 刪掉 connector,然後進 db shell 刪掉 slot,再建立 connector

# 檢查有什麼 slot
SELECT * FROM pg_replication_slots ;
 slot_name |  plugin  | slot_type |  datoid  | database | active | active_pid | xmin | catalog_xmin | restart_lsn  
-----------+----------+-----------+----------+----------+--------+------------+------+--------------+--------------
 debezium  | wal2json | logical   | 57326135 | xxxx       | t      |      11640 |      |    275315499 | AFC/EAE14CC0
 
# 刪除掉 slot
select pg_drop_replication_slot('debezium');

這問題目前看來是 debezium 內部 bug 導致,後來升級到最新版就不會再發生了

replication slot already exists

如果在同一台 DB instance 裡開啟一個以上的 debezium connector,就必須對每個 connector 給定 slot name(預設名稱為 debezium)

PUT connect config 的參數裡面加

"slot.name" : "<your slot name>"

Couldn’t obtain encoding for database

如果打 PUT or POST connect config 得到以下回覆訊息,表示 db 可能有防火牆擋住,或是 db 帳密錯誤,或者使用的 db 帳號對該 database 沒有權限導致。

{
    "error_code": 500,
    "message": "Couldn't obtain encoding for database xxxx"
}

結語

設定真的是非常的複雜啊!但設定完後就可以享受鬆耦合帶來的好處!希望大家都能夠順利架設!

相關文章

Kafka + Debezium 串流 DB – 原理介紹

封面圖片備註

單純只是搜尋 streaming 發現這張圖蠻妙的,就放上來了XD
攝影師:Photography Maghradze PH,連結:Pexels

Written by J
雖然大學唸的是生物,但持著興趣與熱情自學,畢業後轉戰硬體工程師,與宅宅工程師們一起過著沒日沒夜的生活,做著台灣最薄的 intel 筆電,要與 macbook air 比拼。 離開後,憑著一股傻勁與朋友創業,再度轉戰軟體工程師,一手扛起前後端、雙平台 app 開發,過程中雖跌跌撞撞,卻也累計不少經驗。 可惜不是那 1% 的成功人士,於是加入其他成功人士的新創公司,專職開發後端。沒想到卻在採前人坑的過程中,拓寬了眼界,得到了深層的領悟。