在 Kafka + Debezium 串流 DB – 原理介紹 說明整體的概念,這邊就來分享一下實戰步驟!
由於流程複雜,這邊先簡述一下整理過程,接下來解說會依循以下步驟:
- 設定 PostgreSQL,開啟 replication
- MSK (AWS 託管的 kafka)參數設定
- 預先建立 debezium 本人所需的 topic
- 起 debezium
- debezium 連結 PostgreSQL
- 到 kafdrop 檢查結果
- 實用指令大全
- Troubleshooting
目錄
- 1 讓 PostgreSQL 支援 wal2json 並開啟 replication
- 2 MSK (AWS Kafka) 參數設定
- 3 預先建立 debezium 本人所需的 topic
- 4 起 debezium
- 5 debezium 連結 PostgreSQL
- 6 到 kafdrop 檢查結果
- 7 實用指令大全
- 8 Troubleshooting
- 8.1 Failed to flush, timed out while waiting for producer to flush outstanding 509 messages
- 8.2 NOT_ENOUGH_REPLICAS
- 8.3 資料太多,第一次插上 connector 跑很久
- 8.4 No partition metadata for topic
- 8.5 db 欄位資料如果是 decimal ,kafka 傳來的資料會錯誤
- 8.6 Database connection failed when writing to copy
- 8.7 The message is 1886584 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
- 8.8 out of memory
- 8.9 如果發生 LSN 太小的問題
- 8.10 replication slot already exists
- 8.11 Couldn’t obtain encoding for database
- 9 結語
讓 PostgreSQL 支援 wal2json 並開啟 replication
首先我們先在 db 建立一個有 replication permission 的 role
建立 Replication Permission Role
CREATE ROLE debezium WITH LOGIN PASSWORD '<密碼>';
GRANT rds_replication to debezium;
GRANT CONNECT ON DATABASE <目標 db> TO debezium;
# 切到目標 db (要用該 db 擁有者的帳號,或是用最高權限帳號) 再跑以下指令,讓該帳號可以讀取目標 db
GRANT USAGE ON SCHEMA public TO debezium;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;
ALTER DEFAULT PRIVILEGES IN SCHEMA public
GRANT SELECT ON TABLES TO debezium;
接著要開啟 db 的 replication 功能。這邊分兩條路解釋,一條是自建 PostgreSQL,通常會在 local 測試用,又分成直接安裝 PostgreSQL 以及使用 Docker 起的。另外一條是使用 AWS RDS,RDS 的設定就會簡單許多。
自建的 PostgreSQL
Step 1:安裝 wal2json 套件
- 如果是直接安裝 PostgreSQL的機器 -> 到該機器的 linux shell 下以下指令。
- 如果是 Docker 起的 PostgreSQL -> 到 container 裡的 shell 下以下指令。
(指令出處可參考 這裏)
apt install postgresql-server-dev-9.5 build-essential git
git clone https://github.com/eulerto/wal2json -b master --single-branch \
&& cd wal2json \
&& git checkout d2b7fef021c46e0d429f2c1768de361069e58696 \
&& make && make install \
&& cd .. \
&& rm -rf wal2json
Step 2:給予 replication 權限
修改 pg_hba.conf 檔案,依照對應帳號加入權限
- 如果是直接安裝 PostgreSQL 的機器(路徑版號需更換) -> /etc/postgresql/9.4/main/pg_hba.conf
- 如果是 Docker 起的 PostgreSQL -> /var/lib/postgresql/data/pg_hba.conf
host all all all trust
host replication <role> all trust
# 對指定 role 開啟 replication 權限
Step 3:開啟 wal replication slot
修改 postgresql.conf 加入以下 wal 相關設定。這邊 slot 是給四個,可依需求調整數量。
- 如果是直接安裝 PostgreSQL 的機器(路徑版號需更換) -> /etc/postgresql/9.4/main/postgresql.conf
- 如果是 Docker 起的 PostgreSQL -> /var/lib/postgresql/data/postgresql.conf
需注意的是,wal_level 一定要一起設定!不然資料庫會無法啟動!
listen_addresses = '*'
max_wal_senders = 4
wal_level = logical
max_replication_slots = 4
shared_preload_libraries = 'wal2json'
之後需重啟資料庫,wal 相關設定才會生效(資料庫重啟會造成 app 應用斷線,如果是在 production 要特別注意 down time)。可到 DB shell 用這個指令確認,如果看到以下回應就算是成功。
postgres=> SHOW wal_level;
wal_level
-----------
logical
(1 row)
AWS RDS 託管的 PostgreSQL
Step 1:修改 Parameters Group 中的設定
在 RDS 頁面的側邊欄點選 「Parameters groups」,再點選右上角橘色的按鈕「Create parameter group」,建立一組新的設定組
依照資料庫版本選擇對應的 Parameter group family,下面名稱可自取
點擊剛剛建立的 parameter group,在上面搜尋 rds.logical_replication,把它改成 1 即可
rds.logical_replication=1
到目標 RDS 的 Modify 頁面,套用剛剛建立的 parameter group 並設定讓他立刻生效
Reboot RDS instance 後,到 db shell 用這個指令確認是否成功開啟
postgres=> SHOW wal_level;
wal_level
-----------
logical
(1 row)
假如不是顯示 logical,先確認 Parameter group 是否有套用到目標 RDS,以及 RDS 在套用後是否有重啟。
MSK (AWS Kafka) 參數設定
架設 MSK (AWS 託管的 Kafka) 這邊就不再闡述,我們直接看參數要如何調整。
首先切換到 MSK 管理頁面,點選 Cluster configurations -> Create cluster configuration
在下面編輯器調整設定值
主要有以下兩個設定值要修改或加入
# 如果沒有設定 auto create topic 到時候串起來會在 debezium log 看到 Error while fetching metadata with correlation id 錯誤,設定為 true 後,如果發布沒有宣告過的 topic,就會自動建立
auto.create.topics.enable=true
# 這一行依需求調整,如果資料表中有很大的欄位,比如說 json,這數值就要調大到符合最大資料的大小,不然會在 debezium log 看到以下錯誤
# The message is <xxxx> bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration
message.max.bytes=104857600
接著切換到目標 kafka cluster 管理頁面,在 configuration 區塊點選 Edit,選擇剛剛建立好的 configure 並套用即可
預先建立 debezium 本人所需的 topic
經過實測,雖然有設定 kafka auto create topic,但如果在起 debezium 之前沒有預先宣告 debezium 本身所需的基本 topic,後續就會發生 NOT_ENOUGH_REPLICAS error。所以我們這邊預先宣告給他。
建立 kafka topic 需要使用 kafka shell 介面,這邊我們使用 wurstmeister/kafka 這個 image 來起一次性的 container
docker run --rm -it wurstmeister/kafka:latest bash
進入 shell 後輸入以下指令,注意的是,這邊的 zookeeper 位置只是示意,請自行更換成自己的位置。
這邊我們為 debezium 建立三個 topic,分別是 db-connect-config、db-connect-offset、connect-status。
kafka-topics.sh --zookeeper z-1.kafka.xxxxxx.c3.kafka.ap-northeast-1.amazonaws.com:2181 --create --topic db-connect-config --partitions 1 --replication-factor 3
kafka-topics.sh --zookeeper z-1.kafka.xxxxxx.c3.kafka.ap-northeast-1.amazonaws.com:2181 --create --topic db-connect-offset --partitions 50 --replication-factor 3
kafka-topics.sh --zookeeper z-1.kafka.xxxxxx.c3.kafka.ap-northeast-1.amazonaws.com:2181 --create --topic connect-status --partitions 5 --replication-factor 3
cofig 和 offset 的 replication 和 partition 的數值是參考 debezium 官方文件這一段的描述
CONFIG_STORAGE_TOPIC
This environment variable is required when running the Kafka Connect service. Set this to the name of the Kafka topic where the Kafka Connect services in the group store connector configurations. The topic must have a single partition and be highly replicated (e.g., 3x or more).
OFFSET_STORAGE_TOPIC
This environment variable is required when running the Kafka Connect service. Set this to the name of the Kafka topic where the Kafka Connect services in the group store connector offsets. The topic must have a large number of partitions (e.g., 25 or 50), be highly replicated (e.g., 3x or more) and should be configured for compaction.
簡單來說,config storage topic (我們命名為 db-connect-config),只能有 1 個 partition,但要有高的 replication,建立 3x 以上。這個 topic 是 debezium 用來儲存 connect 的設定資料。
offset storage topic (我們命名為 db-connect-offset),必須要有高的 partition,如 25 或 50 個。且也要有高的 replication,3x 以上。這個 topic 是 debezium 用來儲存 offset。
起 debezium
這邊我們用以下的 docker-compose 起
version: '2.3'
services:
# kafdrop 是ㄧ個 web 介面,可以監看所有的 topic 設定與內容,可選用
kafdrop:
image: obsidiandynamics/kafdrop:3.23.0
container_name: kafdrop
restart: always
ports:
- 9000:9000
environment:
# 請改成自己的 kafka server 位置
KAFKA_BROKERCONNECT: b-1.kafka.xxxxxx.c3.kafka.ap-northeast-1.amazonaws.com:9092
JVM_OPTS: "-Xms128M -Xmx256M"
SERVER_SERVLET_CONTEXTPATH: "/"
SERVER_PORT: 9000
# 這個就是我們今天的主角之一 debezium
connect:
image: debezium/connect
container_name: connect
restart: always
ports:
- 8083:8083
environment:
GROUP_ID: debezium
# 給定我們剛剛手動建立的兩個 topic 名稱
CONFIG_STORAGE_TOPIC: db-connect-config
OFFSET_STORAGE_TOPIC: db-connect-offset
# 換成自己的 kafka servers
BOOTSTRAP_SERVERS: b-1.kafka.xxxxxx.c3.kafka.ap-northeast-1.amazonaws.com:9092,b-2.kafka.xxxxxx.c3.kafka.ap-northeast-1.amazonaws.com:9092
# 這邊參數是這次我依照遇到的問題修改後的,可選
OFFSET_FLUSH_TIMEOUT_MS: 60000 # default 5000
OFFSET_FLUSH_INTERVAL_MS: 15000 # default 60000
MAX_BATCH_SIZE: 32768 # default 2048
MAX_QUEUE_SIZE: 131072 # default 8192
KAFKA_PRODUCER_MAX_REQUEST_SIZE: 10485760
CONNECT_PRODUCER_MAX_REQUEST_SIZE: 104857600
HEAP_OPTS: '-Xms4g -Xmx4g' # default '-Xms1g -Xmx1g'
debezium 連結 PostgreSQL
請先找出剛剛起 debezium 的機器位置,接下來會直接使用 debezium 的 restful api 來設定 connector
// PUT ,可以使用 postman 咖方便
// hostname 換成機器位置
// connector name 換成你為這個 connector 取的名子,好辨識用
// http://<hostname>:8083/connectors/<connector_name>/config
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
// 換成你的 db 位置
"database.hostname": "<your_db>.xxxxxx.ap-southeast-1.rds.amazonaws.com",
"database.port": "5432",
// 換成我們一開始為 debezium 建立擁有 replication 權限的帳號密碼
"database.user": "<user_name>",
"database.password": "<password>",
// 換成你要串出來的目標 db 名稱
"database.dbname" : "<your_db>",
// 自己取一個能辨識的來源名稱,之後會變成 topic 的前綴
"database.server.name": "<server_name>",
// 換成你要串出來的目標 db 名稱
"database.whitelist": "<your_db>",
// 換成要串出來的表名稱,可以有很多個,用逗號分隔
"table.whitelist": "public.<table_name>,public.<table_name>,public.<table_name>",
// 換成自己的 kafka servers
"database.history.kafka.bootstrap.servers": "b-2.kafka.xxxxxx.c3.kafka.ap-northeast-1.amazonaws.com:9092,b-1.kafka.xxxxxx.c3.kafka.ap-northeast-1.amazonaws.com:9092,b-3.kafka.xxxxxx.c3.kafka.ap-northeast-1.amazonaws.com:9092",
// 換成你要串出來的目標 db 名稱
"database.history.kafka.topic": "schema-changes.<your_db>",
// 如果是 RDS,用此,否則用 wal2json
"plugin.name": "wal2json_rds_streaming",
// 這邊給定 never 有特殊涵義,後面會解釋
"snapshot.mode": "never",
// 如果欄位中有 decimal,必須宣告在此,串出來的資料才會正確
"decimal.handling.mode": "double",
"database.tcpKeepAlive": true
}
到 kafdrop 檢查結果
此時隨便在有串出來的資料表中 insert 一筆資料,切到剛剛一起建立的 kafdrop 看看有沒有新的 topic 出現,順便檢查資料內容,就能確定是否成功囉!
實用指令大全
kafka 相關 shell 指令
# 建立 topic
kafka-topics.sh --zookeeper z-1.dev-kafka.xxxx.c3.kafka.ap-southeast-1.amazonaws.com:2181 --create --topic test --partitions 1 --replication-factor 1
# 刪除 topic
kafka-topics.sh --zookeeper z-1.dev-kafka.xxxx.c3.kafka.ap-southeast-1.amazonaws.com:2181 --delete --topic test
# list topic
kafka-topics.sh --zookeeper z-1.dev-kafka.xxxx.c3.kafka.ap-southeast-1.amazonaws.com:2181 --list
# describe topic
kafka-topics.sh --zookeeper z-1.dev-kafka.xxxx.c3.kafka.ap-southeast-1.amazonaws.com:2181 --describe --topic db-connect-offset
# send message
echo "test 1" | kafka-console-producer.sh --broker-list b-2.dev-kafka.xxxx.c3.kafka.ap-southeast-1.amazonaws.com:9092 --topic test
# 調整已建立的 topic partition
kafka-topics.sh --zookeeper z-1.dev-kafka.xxxx.c3.kafka.ap-southeast-1.amazonaws.com:2181 --alter --topic 'connect-status' --partitions 2
Kafka re-assign replication
先確認目前狀態
kafka-topics.sh --zookeeper z-1.kafka.xxxx.c3.kafka.ap-northeast-1.amazonaws.com:2181 --describe --topic connect-status
Topic: connect-status PartitionCount: 5 ReplicationFactor: 1 Configs: cleanup.policy=compact
Topic: connect-status Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: connect-status Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: connect-status Partition: 2 Leader: 1 Replicas: 1 Isr: 1
Topic: connect-status Partition: 3 Leader: 2 Replicas: 2 Isr: 2
Topic: connect-status Partition: 4 Leader: 1 Replicas: 1 Isr: 1
準備 replication 設定檔 increase-replication-factor.json
{"version":1,
"partitions":[
{"topic":"connect-status","partition":0,"replicas":[1,2]},
{"topic":"connect-status","partition":1,"replicas":[1,2]},
{"topic":"connect-status","partition":2,"replicas":[1,2]},
{"topic":"connect-status","partition":3,"replicas":[1,2]},
{"topic":"connect-status","partition":4,"replicas":[1,2]}
]}
載入
kafka-reassign-partitions.sh --zookeeper z-1.kafka.xxxx.c3.kafka.ap-northeast-1.amazonaws.com:2181 --reassignment-json-file increase-replication-factor.json --execute
debezium 常用 restful api
修改 connector conifg,若不存在則建立
PUT http://<hostname>:8083/connectors/<connector_name>/config
刪除 connect
DELETE http://<hostname>:8083/connectors/<connector_name>
查看已創立的 connector
GET http://<hostname>:8083/connectors/
查看對應的 connector 配置訊息
GET http://<hostname>:8083/connectors/<connector_name>
查看對應的 connector 狀態
GET http://<hostname>:8083/connectors/<connector_name>/status
Troubleshooting
串接 debezium 的過程中遇到各種稀奇古怪的問題,一次在這邊集合起來,希望對你有幫助
Failed to flush, timed out while waiting for producer to flush outstanding 509 messages
debezium offset timeout 參數調大,batch, queue 調大,記憶體調大。在 debezium 的 docker compose 加入以下環境變數
OFFSET_FLUSH_TIMEOUT_MS: 60000 # default 5000
OFFSET_FLUSH_INTERVAL_MS: 15000 # default 60000
MAX_BATCH_SIZE: 32768 # default 2048
MAX_QUEUE_SIZE: 131072 # default 8192
HEAP_OPTS: '-Xms2g -Xmx2g' # default '-Xms1g -Xmx1g'
解法參考:
https://stackoverflow.com/questions/49868753/debezium-flush-timeout-and-outofmemoryerror-errors-with-mysql
參數定義:
https://debezium.io/documentation/reference/development/engine.html
NOT_ENOUGH_REPLICAS
如果一開始沒有預先為 debezium 設定他本人所需的 topic,就會在 log 看到這個問題,請看 預先建立 debezium 本人所需的 topic
解法參考:
https://medium.com/searce/realtime-cdc-from-mysql-using-aws-msk-with-debezium-28da5a4ca873
資料太多,第一次插上 connector 跑很久
PUT connect config 的參數裡面加
"snapshot.mode": "never"
請看 debezium 連結 PostgreSQL。這問題主要是原始資料表資料非常多。一開始 debezium 預設的模式是 initial,他會把目前表中所有的資料都倒入 kafka,如果表很大,就會跑到天荒地老。所以我們這邊改成 never,設定完 connector 只有該表有變動,才會開始串新的資料到 kafka。如果舊有資料不 care,就可使用 never。
參數定義:
https://debezium.io/documentation/reference/1.1/connectors/postgresql.html#snapshots
No partition metadata for topic
目前研究沒招,只能直接刪掉 kafka 所有的 topic。照理說正常使用不會遇到這個問題,當時遇到的時候可能是不斷嘗試各種設定導致的副作用吧!
解法參考:
https://blog.csdn.net/weixin_34376562/article/details/94612083
db 欄位資料如果是 decimal ,kafka 傳來的資料會錯誤
PUT connector config 的參數裡面加
"decimal.handling.mode": "double"
解法參考:
https://blog.csdn.net/u012551524/article/details/83546765
Database connection failed when writing to copy
PUT connector config 的參數裡面加
"database.tcpKeepAlive": true
The message is 1886584 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
在 debezium docker-compose 加以下環境變數
AFKA_PRODUCER_MAX_REQUEST_SIZE: 10485760
CONNECT_PRODUCER_MAX_REQUEST_SIZE: 10485760
在 MSK config 加這個
message.max.bytes=104857600
如果是用 docker-compose 起的 kafka,則加這個環境變數
KAFKA_MESSAGE_MAX_BYTES: 104857600
out of memory
如果遇到此錯誤
2018-03-27 11:37:46,307 ERROR Postgres|dbz|records-stream-producer unexpected exception while streaming logical changes [io.debezium.connector.postgresql.RecordsStreamProducer]
org.postgresql.util.PSQLException: ERROR: out of memory
Detail: Cannot enlarge string buffer containing 1073741746 bytes by 1013 more bytes.
Where: slot "debezium", output plugin "wal2json", in the change callback, associated LSN 8/5A721C
如果是使用 RDS,PUT connector config 的參數修改為以下
"plugin.name": "wal2json_rds_streaming"
如果發生 LSN 太小的問題
如果看到以下 log
2020-11-17 03:17:17,668 INFO Postgres|xxxx|postgres-connector-task Streaming requested from LSN 12049144886512 but received LSN 12046475487160 that is same or smaller so skipping the message [io.debezium.connector.postgresql.connection.AbstractMessageDecoder]
先透過 debezium restful api 刪掉 connector,然後進 db shell 刪掉 slot,再建立 connector
# 檢查有什麼 slot
SELECT * FROM pg_replication_slots ;
slot_name | plugin | slot_type | datoid | database | active | active_pid | xmin | catalog_xmin | restart_lsn
-----------+----------+-----------+----------+----------+--------+------------+------+--------------+--------------
debezium | wal2json | logical | 57326135 | xxxx | t | 11640 | | 275315499 | AFC/EAE14CC0
# 刪除掉 slot
select pg_drop_replication_slot('debezium');
這問題目前看來是 debezium 內部 bug 導致,後來升級到最新版就不會再發生了
replication slot already exists
如果在同一台 DB instance 裡開啟一個以上的 debezium connector,就必須對每個 connector 給定 slot name(預設名稱為 debezium)
PUT connect config 的參數裡面加
"slot.name" : "<your slot name>"
Couldn’t obtain encoding for database
如果打 PUT or POST connect config 得到以下回覆訊息,表示 db 可能有防火牆擋住,或是 db 帳密錯誤,或者使用的 db 帳號對該 database 沒有權限導致。
{
"error_code": 500,
"message": "Couldn't obtain encoding for database xxxx"
}
結語
設定真的是非常的複雜啊!但設定完後就可以享受鬆耦合帶來的好處!希望大家都能夠順利架設!
相關文章
封面圖片備註
單純只是搜尋 streaming 發現這張圖蠻妙的,就放上來了XD
攝影師:Photography Maghradze PH,連結:Pexels