恐竜本舗

エンジニアをしている恐竜の徒然日記です。

Kafka.js + Debezium を使って、Change Data Capture(CDC)でDBの変更検知を行い、別DBに書き込む(後編)

はじめに

前回の続きです。

daitasu.hatenablog.jp

Kafka.jsを用いて、Kafkaの producer と consumer の仕組みを理解しました。

これをもとに、CDCツールである Debezium を用いて、PostgreSQLのデータ変更を検知して、別DBに書き込む様な流れを作ってみます。

debezium.io

やってみる

基盤となるコンテナの立ち上げ

まずは、前回同様に、 Kafka と zookeeper のコンテナ、そして、2つのPostgreSQLと Debezium のコンテナを作成します。

kafka は、コンテナ内部から見る kafka:9092 と、consumer.ts からアクセスするための localhost:29092 を準備しておきます。

compose.yaml

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.7.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.7.0
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    ports:
      - "9092:9092"
      - "29092:29092"

  postgres1:
    image: postgres:16
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: password
      POSTGRES_DB: originalDB
    ports:
      - "5432:5432"
    volumes:
      - ./postgres1_data:/var/lib/postgresql/data

  postgres2:
    image: postgres:16
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: password
      POSTGRES_DB: newDB
    ports:
      - "5433:5432"
    volumes:
      - ./postgres2_data:/var/lib/postgresql/data

  debezium:
    image: debezium/connect:2.7
    ports:
      - "8083:8083"
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
    depends_on:
      - kafka
      - postgres1

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: ""
    depends_on:
      - kafka

コンテナを立ち上げ、先にoriginalDB と newDB のそれぞれに、仮のテーブルを作っておきます。

CREATE TABLE Users (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100),
    email VARCHAR(100) UNIQUE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE

Debezuim コンテナにコネクタを作成する

次に、Debezuim コンテナに対して、コネクタを作成します。 connector-config.json という名前で下記を作成します。

{
  "name": "postgres-users-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres1",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "password",
    "database.dbname": "originalDB",
    "database.server.name": "dbserver1",
    "table.include.list": "public.users",
    "plugin.name": "pgoutput",
    "topic.prefix": "dbserver1"
  }
}

下記のコマンドで、コネクタを作成できます。

curl -X POST -H "Content-Type: application/json" --data @connector-config.json http://localhost:8083/connectors

もしコネクタを消して、再度作成したい場合は、下記のコマンドを入力して再度やり直してください。

curl -X DELETE http://localhost:8083/connectors/postgres-users-connector

kafka-console-consumer を用いて試す

consumer.ts で別DBへの書き込みを行う前に、Kafkaコンテナに組み込まれている kafka-console-consumer を用いてconsole 出力で正しく動くかを確認します。

まず、Users テーブルにデータを作成し、Topicが正しく追加されているかを見てみます。

DB への追加

$ psql "postgresql://postgres:password@localhost:5432/originalDB"
psql (14.12 (Homebrew), server 16.3 (Debian 16.3-1.pgdg120+1))
WARNING: psql major version 14, server major version 16.
         Some psql features might not work.
Type "help" for help.

originalDB=# INSERT INTO Users (name, email) VALUES ('daitasu', 'daitasu@example.com');
INSERT 0 1

topic の確認

[appuser@xxx ~]$ kafka-topics --list --bootstrap-server localhost:9092
__consumer_offsets
dbserver1.public.users
my_connect_configs
my_connect_offsets
my_connect_statuses

dbserver1.public.users というtopic が確認できます。 この状態で、 Kafkaコンテナに組み込まれているkafka-console-consumer を実行して、message の購読を開始してみます。

$ docker compose exec kafka bash
[appuser@xxx ~]$ kafka-console-consumer   --bootstrap-server kafka:9092   --topic dbserver1.public.users  --from-beginning
{"schema":{"type":"struct","fields":[{ ...

message がすでに溜まっていれば、上記のように取得することができます。 特に、中身の payload.beforepayload.after でDBの変更前後を追うことができます。

  "payload": {
    "before": null,
    "after": {
      "id": 18,
      "name": "daitasu",
      "email": "daitasu@example.com",
      "created_at": 1723994488034439
    }
}

⚠ 注意点として、PostgreSQLwal_levellogical でないと、 debezuim の接続が切れてしまいます。

事前に、 postgresql.conf の中身を wal_level = logical に書き換えておきましょう。 (これに気づかず、結構苦戦しました)

Kafka.js で新DBに書き込みを行う

さて、CDCからの message の購読までうまく行ったので、これを Kafka.js を用いて、実際に新DBに書き込むまでを行います。

consumer.ts

import { Kafka } from "kafkajs";
import { Client } from "pg";

const kafka = new Kafka({
  clientId: "cdc-client",
  brokers: ["localhost:29092"],
});

const consumer = kafka.consumer({
  groupId: "cdc-group",
});

const pgClient = new Client({
  user: "postgres",
  host: "localhost",
  database: "newDB",
  password: "password",
  port: 5433,
});

const run = async () => {
  await pgClient.connect();

  await consumer.connect();
  await consumer.subscribe({
    topic: "dbserver1.public.users",
    fromBeginning: true,
  });

  await consumer.run({
    eachMessage: async ({ message }) => {
      const change = JSON.parse(message.value.toString());

      // CDC メッセージに基づいて、新DBに変更を反映させる処理
      if (change.payload && change.payload.after) {
        const { id, name, email, created_at } = change.payload.after;

        const validCreatedAt = isNaN(new Date(created_at).getTime())
          ? new Date()
          : new Date(created_at);

        const query = `
          INSERT INTO Users (id, name, email, created_at) 
          VALUES ($1, $2, $3, $4)
          ON CONFLICT (id) DO UPDATE 
          SET name = EXCLUDED.name, email = EXCLUDED.email, created_at = EXCLUDED.created_at;
        `;
        await pgClient.query(query, [id, name, email, validCreatedAt]);
      }
    },
  });
};

run().catch(console.error);

const { id, name, email, created_at } = change.payload.after で、さきほどのDBの変更後のデータを取得することができます。 このデータをもとに、新DBに書き込みを行う、といった形です。

さて、では新DBの中身を見に行ってみましょう!

psql "postgresql://postgres:password@localhost:5433/newDB"
psql (14.12 (Homebrew), server 16.3 (Debian 16.3-1.pgdg120+1))
WARNING: psql major version 14, server major version 16.
         Some psql features might not work.
Type "help" for help.

newDB=# select * from Users where name = 'daitasu';
 id |  name   |        email        |        created_at
----+---------+---------------------+--------------------------
 18 | daitasu | daitasu@example.com | 56601-03-19 06:47:14.439

実際に、DBへの書き込みがされました!

まとめ

今回、Debezium + Kafka.js を用いて、ローカル環境でのCDCによる別DBへの書き込み処理を試しました。

普段のアプリケーション開発の中でCDCを使う機会がなく、理解になかなか苦戦しましたが、この手法を用いればDB移行などの幅が広がりそうです。

今後は、Amazon MSK などを用いて、より実際の動きに近い形で調査等続けていこうと思います!

今回のコードは、下記リポジトリに置いています。 前回の記事に引き続きで作成しており、本記事については、 packages/04_devezium_cdc/ 配下に配置しています。

github.com

Reference

今回の記事作成にあたり、下記を参考にさせて頂きました。