恐竜本舗

エンジニアをしている恐竜の徒然日記です。

Kafka.js + Debezium を使って、Change Data Capture(CDC)でDBの変更検知を行い、別DBに書き込む(前編)

はじめに

プロダクト開発がスケールしていくと、事業ドメインが枝分かれしていき、1つに集約されていたデータベースを複数のデータベースへと切り分けていきたい要求が生まれたりします。

そうした場面でDBの移行をする手段はいくつかあります。

  1. 同一データベース内に新規テーブルを作成し、既存テーブルと並行して利用
  2. 旧DBと新DBに分けて、異なるアプリケーションが書き込みを行う
  3. Kafkaを使ったメッセージキューでの処理
  4. Change Data Capture (CDC) を利用してDBの変更を検知し、新DBに書き込む

こうしたパターンの中で、今回はCDCを用いたDB検知での新DBへの書き込みを試してみます。

Kafka とは?

Apache Kafkaは、分散型ストリーム処理プラットフォームです。

複数台のサーバーで大量のデータを処理する分散メッセージングシステムで、送られてきたメッセージ(データ)を受け取り、受け取ったメッセージを別のシステムやデバイスに送るために使われます。

複数存在するシステムやデバイスをつなぐための重要な役割を果たします。

Kafka のユースケース

いくつか、Kafkaのユースケースをあげてみます。

  • リアルタイムデータストリーム処理
    • ex) イベントデータのリアルタイム処理(クリックストリーム、IoTセンサーデータの収集と解析など)
  • データの統合
    • ex) 複数のデータソースからのデータを集約し、中央のデータベースやデータウェアハウスに送信
    • 今回やりたいことはこちらです
  • ロギングと監視
    • ex) サーバーログ、アプリケーションのログ、監視データの集約
  • イベント駆動アーキテクチャ
    • ex) マイクロサービスに置ける、サービス感の非同期通信のためのメッセージング基盤

Kafka の基本用語

まず最初に、Kafka の基本となる、データ中継のためのシステム論理構成について基本的な単語を整理します。

Kafkaシステム概要図

  • Broker
    • データを受診/配信するサービス
  • Message
    • Kafka内で扱うデータの最小単位
    • Message にはKeyとValueをもたせることができ、Message送信時のパーティショニングで利用される
  • Producer
    • データの送信元となり、BrokerへMessageを送信するアプリケーション
  • Consumer
    • Broker からMessage取得を行うアプリケーション
  • Topic
    • Message を種別(トピック)ごとに管理するためのストレージ。Broker上に配置され管理される
    • Producer やConsumerは特定のTopicを指定してMessageの送受信を行うことで、単一のKafkaクラスタで多種類のMessageの中継を実現する

Kafkaの詳細については、下記の書籍をご参照ください。

amzn.asia

Kafka.js を試す

01 基本の挙動

今回はクライアントとして、Kafka.js を用いて試してみます。

まずは特段細かな設定をせずに、 Dockerで Kafkaブローカーを立ち上げ、producer と consumer を作成してみます。

compose.yaml

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.7.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.7.0
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    ports:
      - "9092:9092"

producer.ts

import { Kafka } from "kafkajs";

const kafka = new Kafka({
  clientId: "daitasu-producer",
  brokers: ["localhost:9092"],
});

const producer = kafka.producer();

const runProducer = async () => {
  await producer.connect();
  console.log("Producer connected");

  // メッセージを送信
  await producer.send({
    topic: "test-topic",
    messages: [{ value: "こんにちは!" }, { value: "さようなら!" }],
  });

  console.log("Messages sent");
  await producer.disconnect();
};

runProducer().catch(console.error);

consumer.ts

import { Kafka } from "kafkajs";

const kafka = new Kafka({
  clientId: "daitasu-consumer",
  brokers: ["localhost:9092"],
});

const consumer = kafka.consumer({ groupId: "test-group" });

const runConsumer = async () => {
  await consumer.connect();
  console.log("Consumer connected");

  // トピックの購読を開始
  await consumer.subscribe({ topic: "test-topic", fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        value: message.value.toString(),
      });
    },
  });
};

runConsumer().catch(console.error);

内容はとてもシンプルで、 new Kafka() で Broker のURLを指定します。

Brokerは複数台設置することができるので、配列として渡すようになっています。

今回は、Docker で立てた localhost を向き先にしていますが、Prod 環境では Amazon MSK(Managed Streaming for Apache Kafka)などを向き先にします。

実際に実行してみましょう!

package.json

{
  ...
  "scripts": {
    "start:producer": "tsx producer.ts",
    "start:consumer": "tsx consumer.ts"
  },
  "dependencies": {
    "kafkajs": "catalog:",
    "tsx": "catalog:"
  }
}

TypeScript は tsx で動かすので、上記のように依存パッケージの取得とscripts を作っておきます。

> tsx producer.ts

Producer connected
Messages sent

producer を実行し、上記のようになれば成功です。 送られたMessagesは、 Broker で受信し、Topic に永続化されます。

次に、consumer.ts から取り出します。

> tsx consumer.ts

Consumer connected
{"level":"INFO","timestamp":"2024-08-17T06:45:16.476Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"test-group"}
{"level":"INFO","timestamp":"2024-08-17T06:45:43.660Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"test-group","memberId":"daitasu-consumer-534d2799-4dc5-46a8-b1d3-2c0fcb1d812e","leaderId":"daitasu-consumer-534d2799-4dc5-46a8-b1d3-2c0fcb1d812e","isLeader":true,"memberAssignment":{"test-topic":[0]},"groupProtocol":"RoundRobinAssigner","duration":27184}
{ partition: 0, offset: '18', value: 'こんにちは!' }
{ partition: 0, offset: '19', value: 'さようなら!' }

無事に取得することができました。

02 Producer の設定

Producer の細かい設定は、 kafka.producer() で設定を付与できます。

const producer = kafka.producer({
  allowAutoTopicCreation: true,
  transactionTimeout: 30000,
  maxInFlightRequests: 1,
  idempotent: true,
});

一部の例をあげます。

  • allowAutoTopicCreation
    • メッセージを送信する際に、指定されたトピックが存在しない場合は自動的にトピックを作成
    • (開発環境で都度Topicを作成するのが手間なときは true に、本番では false、といった感じでしょうか?)
  • maxInFlightRequests
    • 同時に処理できる未解決のリクエストの最大数を指定する
    • 1 にしておくと順序の保証が担保されますが、スループットは落ちる
  • idempotent
    • 冪等性を確保します。同じメッセージが複数回送信されても、Kafkaはそれを1回だけ処理する

Producer のバッチ送信は、下記の2つで送信の制御が可能です。

  • batch.size: 一度に送信するメッセージの最大バイトサイズを指定します。
  • linger.ms: メッセージが送信されるまでの最大待機時間をミリ秒で指定します。

しかし、調べたところ Kafka.js だとこの設定がまだできないようで、試す場合は他のクライアント( Kafka Java)などを用いる必要がありそうでした。

また、acks については、各messeges のsend 時に設定します。

  await producer.send({
    topic: "test-topic",
    messages: [{ value: "こんにちは!" }, { value: "さようなら!" }],
    acks: -1,
  });

all が -1 になります。

kafka.js.org

03 Offset の設定

KafkaのConsumerでエラーが発生した場合、Offset を手動でコミットすることで、エラー発生時に巻き戻して再試行することができます。

試しに、特定のエラーメッセージのときだけ確率で失敗するようにして、エラー時に再試行するようにします。

consumer.ts

import { Kafka } from "kafkajs";

const kafka = new Kafka({
  clientId: "daitasu-consumer",
  brokers: ["localhost:9092"],
});

const consumer = kafka.consumer({ groupId: "test-group" });

const runConsumer = async () => {
  await consumer.connect();
  console.log("Consumer connected");

  // トピックの購読を開始
  await consumer.subscribe({ topic: "test-topic", fromBeginning: true });

  await consumer.run({
    autoCommit: false,
    eachMessage: async ({ topic, partition, message }) => {
      try {
        // 特定のメッセージ内容でエラーを発生させる
        if (message.value.toString() === "エラーになるメッセージ") {
          if (Math.random() < 0.7) {
            throw new Error("Intentional error for testing");
          }
        }

        // メッセージの処理
        console.log({
          partition,
          offset: message.offset,
          value: message.value.toString(),
        });

        // メッセージ処理が成功した場合にのみ offset をcommit
        await consumer.commitOffsets([
          { topic, partition, offset: (Number(message.offset) + 1).toString() },
        ]);
        console.log(`Offset committed: ${message.offset}`);
      } catch (error) {
        // エラーが発生した場合、offset は commit しない
        console.error(`Error processing message: ${error}`);

        // エラーが発生した場合、特定のオフセットで再試行する
        await consumer.seek({ topic, partition, offset: message.offset });
        console.log(`Seeking to offset ${message.offset} for retry`);
      }
    },
  });
};

runConsumer().catch(console.error);

特定の offset からの再試行は、 consumer.seek() を用います。

producer で送るメッセージを修正します。

  await producer.send({
    topic: "test-topic",
    messages: [
      { value: "正常なメッセージ1" },
      { value: "エラーになるメッセージ" },
      { value: "正常なメッセージ2" },
    ],
  });

この状態で、再度実行すると、下記のような動きになります。

> tsx consumer.ts

Consumer connected
{"level":"INFO","timestamp":"2024-08-17T07:56:06.531Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"test-group"}
{"level":"INFO","timestamp":"2024-08-17T07:56:34.689Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"test-group","memberId":"daitasu-consumer-1100f629-c06b-4f73-b035-fe5c51527016","leaderId":"daitasu-consumer-1100f629-c06b-4f73-b035-fe5c51527016","isLeader":true,"memberAssignment":{"test-topic":[0]},"groupProtocol":"RoundRobinAssigner","duration":28157}
{ partition: 0, offset: '37', value: '正常なメッセージ1' }
Offset committed: 37
Error processing message: Error: Intentional error for testing
Seeking to offset 38 for retry
Error processing message: Error: Intentional error for testing
Seeking to offset 38 for retry
{ partition: 0, offset: '38', value: 'エラーになるメッセージ' }
Offset committed: 38
{ partition: 0, offset: '39', value: '正常なメッセージ2' }
Offset committed: 39

エラーになるメッセージのoffset で再試行が発生し、そこから継続しています。

ちなみに、現在のoffset にいるかは、 kafka-consumer-groups というCLIコマンドで見ることができます。

試しにDockerコンテナに入ってみてみましょう。

$ docker exec -it {container-id} bash
[appuser@xxx ~]$ kafka-consumer-groups --bootstrap-server localhost:9092 --list # consumer group の一覧を表示
test-group
[appuser@xxx ~]$ kafka-consumer-groups --bootstrap-server localhost:9092 --group test-group --describe test-group # 特定のconsumer group の詳細表示
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
test-group      test-topic      0          40              40              0               daitasu-consumer-xxx /192.xxx.xx.x  daitasu-consumer

このようにして、現在のoffset を確認することが可能です。

kafka.js.org

おわりに

今回はKafka.js を用いて、Kafkaの基本的な仕組みの理解と、Messageの送信の流れを追いかけてみました。

この記事で扱ったコードは、下記に置いてます。

github.com

内容が長くなるので前後編に分けます!

後編では、実際にChange Data Capture(CDC) の仕組みを用いて、PostgreSQL のDB変更検知 → 別DB書き込み 、という流れをやってみようと思います!

daitasu.hatenablog.jp