はじめに
前回の続きです。
Kafka.jsを用いて、Kafkaの producer と consumer の仕組みを理解しました。
これをもとに、CDCツールである Debezium を用いて、PostgreSQLのデータ変更を検知して、別DBに書き込む様な流れを作ってみます。
やってみる
基盤となるコンテナの立ち上げ
まずは、前回同様に、 Kafka と zookeeper のコンテナ、そして、2つのPostgreSQLと Debezium のコンテナを作成します。
kafka は、コンテナ内部から見る kafka:9092 と、consumer.ts からアクセスするための localhost:29092 を準備しておきます。
compose.yaml
services: zookeeper: image: confluentinc/cp-zookeeper:7.7.0 environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - "2181:2181" kafka: image: confluentinc/cp-kafka:7.7.0 depends_on: - zookeeper environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" ports: - "9092:9092" - "29092:29092" postgres1: image: postgres:16 environment: POSTGRES_USER: postgres POSTGRES_PASSWORD: password POSTGRES_DB: originalDB ports: - "5432:5432" volumes: - ./postgres1_data:/var/lib/postgresql/data postgres2: image: postgres:16 environment: POSTGRES_USER: postgres POSTGRES_PASSWORD: password POSTGRES_DB: newDB ports: - "5433:5432" volumes: - ./postgres2_data:/var/lib/postgresql/data debezium: image: debezium/connect:2.7 ports: - "8083:8083" environment: - BOOTSTRAP_SERVERS=kafka:9092 - GROUP_ID=1 - CONFIG_STORAGE_TOPIC=my_connect_configs - OFFSET_STORAGE_TOPIC=my_connect_offsets - STATUS_STORAGE_TOPIC=my_connect_statuses depends_on: - kafka - postgres1 kafka-ui: image: provectuslabs/kafka-ui:latest ports: - "8080:8080" environment: KAFKA_CLUSTERS_0_NAME: local KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092 KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181 KAFKA_CLUSTERS_0_SCHEMAREGISTRY: "" depends_on: - kafka
コンテナを立ち上げ、先にoriginalDB と newDB のそれぞれに、仮のテーブルを作っておきます。
CREATE TABLE Users ( id SERIAL PRIMARY KEY, name VARCHAR(100), email VARCHAR(100) UNIQUE, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE
Debezuim コンテナにコネクタを作成する
次に、Debezuim コンテナに対して、コネクタを作成します。
connector-config.json という名前で下記を作成します。
{ "name": "postgres-users-connector", "config": { "connector.class": "io.debezium.connector.postgresql.PostgresConnector", "tasks.max": "1", "database.hostname": "postgres1", "database.port": "5432", "database.user": "postgres", "database.password": "password", "database.dbname": "originalDB", "database.server.name": "dbserver1", "table.include.list": "public.users", "plugin.name": "pgoutput", "topic.prefix": "dbserver1" } }
下記のコマンドで、コネクタを作成できます。
curl -X POST -H "Content-Type: application/json" --data @connector-config.json http://localhost:8083/connectors
もしコネクタを消して、再度作成したい場合は、下記のコマンドを入力して再度やり直してください。
curl -X DELETE http://localhost:8083/connectors/postgres-users-connector
kafka-console-consumer を用いて試す
consumer.ts で別DBへの書き込みを行う前に、Kafkaコンテナに組み込まれている kafka-console-consumer を用いてconsole 出力で正しく動くかを確認します。
まず、Users テーブルにデータを作成し、Topicが正しく追加されているかを見てみます。
DB への追加
$ psql "postgresql://postgres:password@localhost:5432/originalDB" psql (14.12 (Homebrew), server 16.3 (Debian 16.3-1.pgdg120+1)) WARNING: psql major version 14, server major version 16. Some psql features might not work. Type "help" for help. originalDB=# INSERT INTO Users (name, email) VALUES ('daitasu', 'daitasu@example.com'); INSERT 0 1
topic の確認
[appuser@xxx ~]$ kafka-topics --list --bootstrap-server localhost:9092 __consumer_offsets dbserver1.public.users my_connect_configs my_connect_offsets my_connect_statuses
dbserver1.public.users というtopic が確認できます。
この状態で、 Kafkaコンテナに組み込まれているkafka-console-consumer を実行して、message の購読を開始してみます。
$ docker compose exec kafka bash [appuser@xxx ~]$ kafka-console-consumer --bootstrap-server kafka:9092 --topic dbserver1.public.users --from-beginning {"schema":{"type":"struct","fields":[{ ...
message がすでに溜まっていれば、上記のように取得することができます。
特に、中身の payload.before 、 payload.after でDBの変更前後を追うことができます。
"payload": { "before": null, "after": { "id": 18, "name": "daitasu", "email": "daitasu@example.com", "created_at": 1723994488034439 } }
⚠ 注意点として、PostgreSQLの wal_level が logical でないと、 debezuim の接続が切れてしまいます。
事前に、 postgresql.conf の中身を wal_level = logical に書き換えておきましょう。
(これに気づかず、結構苦戦しました)
Kafka.js で新DBに書き込みを行う
さて、CDCからの message の購読までうまく行ったので、これを Kafka.js を用いて、実際に新DBに書き込むまでを行います。
consumer.ts
import { Kafka } from "kafkajs"; import { Client } from "pg"; const kafka = new Kafka({ clientId: "cdc-client", brokers: ["localhost:29092"], }); const consumer = kafka.consumer({ groupId: "cdc-group", }); const pgClient = new Client({ user: "postgres", host: "localhost", database: "newDB", password: "password", port: 5433, }); const run = async () => { await pgClient.connect(); await consumer.connect(); await consumer.subscribe({ topic: "dbserver1.public.users", fromBeginning: true, }); await consumer.run({ eachMessage: async ({ message }) => { const change = JSON.parse(message.value.toString()); // CDC メッセージに基づいて、新DBに変更を反映させる処理 if (change.payload && change.payload.after) { const { id, name, email, created_at } = change.payload.after; const validCreatedAt = isNaN(new Date(created_at).getTime()) ? new Date() : new Date(created_at); const query = ` INSERT INTO Users (id, name, email, created_at) VALUES ($1, $2, $3, $4) ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name, email = EXCLUDED.email, created_at = EXCLUDED.created_at; `; await pgClient.query(query, [id, name, email, validCreatedAt]); } }, }); }; run().catch(console.error);
const { id, name, email, created_at } = change.payload.after で、さきほどのDBの変更後のデータを取得することができます。
このデータをもとに、新DBに書き込みを行う、といった形です。
さて、では新DBの中身を見に行ってみましょう!
psql "postgresql://postgres:password@localhost:5433/newDB" psql (14.12 (Homebrew), server 16.3 (Debian 16.3-1.pgdg120+1)) WARNING: psql major version 14, server major version 16. Some psql features might not work. Type "help" for help. newDB=# select * from Users where name = 'daitasu'; id | name | email | created_at ----+---------+---------------------+-------------------------- 18 | daitasu | daitasu@example.com | 56601-03-19 06:47:14.439
実際に、DBへの書き込みがされました!
まとめ
今回、Debezium + Kafka.js を用いて、ローカル環境でのCDCによる別DBへの書き込み処理を試しました。
普段のアプリケーション開発の中でCDCを使う機会がなく、理解になかなか苦戦しましたが、この手法を用いればDB移行などの幅が広がりそうです。
今後は、Amazon MSK などを用いて、より実際の動きに近い形で調査等続けていこうと思います!
今回のコードは、下記リポジトリに置いています。
前回の記事に引き続きで作成しており、本記事については、 packages/04_devezium_cdc/ 配下に配置しています。
Reference
今回の記事作成にあたり、下記を参考にさせて頂きました。