恐竜本舗

エンジニアをしている恐竜の徒然日記です。

アクターモデルの位置透過性を Kotlin/Pekko で試してみる

はじめに

年始にアクターモデルについての理解を深めたく、Kotlin を用いて Pekko でアクターモデルの位置透過性に触れてみたので、その内容をメモしていきます。

Kotlin で pekko-cluster を用いて試した実際のサンプルはこちらです。

github.com

まず、そもそもアクターモデルとは何なのでしょうか?

世の中の要求変化から見ていきます。

世の中の要求の変遷

CPU 危機

90 年代ごろまで、アプリケーションはシングルスレッドで動くのが一般的でした。

そのため、世の中としては高速なCPUが市場に出回るのを待っていたような時代です。(筆者としては生まれて間もないのであまりこの頃のことはよく分かってません)

しかし、2005 年頃に CPU のクロック速度の向上は限界に達しているという論文が提唱されました。

同時期(2005 年)に、クラスター化されたマルチプロセッサーサーバ上でアプリケーションを動かす企業が出てきました。マルチコアの登場です。

しかし、マルチコアは単一コアが低速になる傾向があるため、アプリケーションの並行度が低いとパフォーマンスが低下するという問題が出てきました。 では、スレッドプログラミングにすればよいのでは?となると、下記のような問題が発生すると言われてます。

  • デッドロックのリスク
    • 複数のスレッドが互いのリソースの開放を待ち続けるリスク。ロックの順序管理などのしんどさが辛い
  • データの競合状態(race condition) のリスク
    • 複数スレッドが共有メモリ空間にアクセスするため、この問題が生まれやすく、リソースアクセス制御の難易度が高い
  • エラーハンドリングとデバッグの複雑性
    • 複数スレッドで並行するので、これらも同じく辛い

C10K 問題

CPU 危機の傍ら、世の中のクライアント数は膨大に増えていきました。

アクセスするクライアント数が1万を超えると、サーバーのスレッド数が増えて、サーバメモリのリソース不足が発生してしまう問題です。

処理能力に余力があっても、サーバ台数を増やさないといけないという課題が出てきました。

これに対して出てきた解決策として、ノンブロッキングI/O、イベントループで対応したのがNode.js (2009年〜)になります。

Akka(Pekko) が提供するアクターモデルもまた、このような問題を解決するためのものとなっています。

アクターモデルとは?

Node.js はシングルスレッドででノンブロッキングI/O とイベントループを行うことでこれらの問題を解決しましたが、アクターモデルでは、下記のような仕組みでこうした問題を解決しています。

  • マルチスレッド + アクター
    • Akkaはマルチスレッド環境で動作し、多数のアクターを並行して実行します
  • アクター
    • アクターは独立した計算の単位であり、それぞれが自身の状態とメールボックスを持ちます
  • メッセージパッシング
    • アクター間の通信は、非同期的なメッセージパッシングによって行われます

マルチコアを有効活用でき、アクターそれぞれは独立しているため、障害時などの回復性に強いと言われています。

Akka/Pekko とは

Akka(Pekko) は、Lightbend社によって提供されたアクタープログラミングモデルとランタイム、それらに必要な補助ツールを提供する、開発ツールキットです。

Akka/Pekko の違い

Akka は Akka 2.7 からライセンスが Business Source License(BSL) に変わり、本番環境での稼働にはLightbend社の有償ラインセンスが必要になりました。

Pekkoは、 Akka 2.6 からフォークされたオープンソースプロジェクトになります。

ライセンスの話については、こちらの記事がとても参考になりました。

creators-note.chatwork.com

Akka/Pekko の位置透過性について

アクターは、状態と振る舞いをカプセル化するオブジェクトであり、受信者のメールボックスにメッセージを置くことでメッセージを交換し、排他的に通信します。 ある意味では、アクターはオブジェクト指向プログラミングの中で最も厳格な形式ですが、人間にとってはより理解しやすいものです。

akka-ja-2411-translated.netlify.app

Akka/Pekko は各アクター内に状態と振る舞いを持っていて、アクター同士で排他的にメッセージのやり取りをします。

図で示すと下記のような形です。

アクターは

  • メッセージ駆動で動作する
  • 親アクターから子アクターにメッセージを渡すことが可能
  • 障害時は子アクターからのエスカレーションを見ていくことで耐障害性を担保している

というのがざっくりした概観といったところでしょうか?

そして、ようやく本題のAkka/Pekko の位置透過性についてです。

上図で示したこれらのアクターは、どこのサーバで実行されるのかをアプリケーション自体はほとんど意識せずに構築することができるそうです。

Akka/Pekkoは、送受信したメッセージを

  • 有効なスレッドでローカルで処理する
  • 別のサーバに投げてリモートで処理する

というのをあとから設定することができ、設計時にあまり意識せずに実装することが可能です。

これには、pekko-cluster というものを用います。

イメージ図としては下記のような感じで、アプリケーション自体はサーバの状態が左なのか、右なのかといった関心を保つ必要がなく、シンプルな設定だけでサーバ間での分散処理を可能としています。

テキストだけだとあまりイメージがつかないので、実際にやってみます。

やってみる

シンプルにテキストメッセージの送受信をするアプリケーションをKotlinで書き、pekko-cluster を用いて別サーバへのメッセージ送信をやってみます。

Java のスレッドを用いて、並列処理を試す

まずは、Java のスレッドを用いてシンプルに非同期処理を作成します。

下記の記事が非常に参考になりました。

tech-lab.sios.jp

ひとまず作成したJavaスレッドをベースとした並行処理は、ほとんど上記の写経なのでここでは割愛します。

実装は下記の commit 時点を参考ください。

github.com

比較対象として、Koroutine を使った例も作ってみました。

github.com

Pekko に変えて、複数コンテナで起動し、別NodeでReceiveする

では、この処理をPekko に変えて、複数コンテナでのメッセージ送受信を Pekko Cluster を用いて実現してみます。

Pekko Cluster は、 複数のアクターシステムをクラスタとしてまとめ、あたかも単一のアクターシステム上で動作しているかのように振る舞えるようにするためのもの 、になります。

pekko.apache.org

まずは、アプリケーションを複数コンテナで立ち上がるように、Dockerを書き換えます。

Dockerfile

FROM gradle:8.12.0-jdk17 AS builder

WORKDIR /workspace
COPY apps/build.gradle.kts apps/settings.gradle.kts ./
COPY apps/src ./src

RUN gradle clean build --no-daemon

FROM openjdk:17-slim AS base
WORKDIR /app

# ビルド成果物をコピー
COPY --from=builder /workspace/build/libs/*-all.jar /app/app.jar

FROM base AS node1
EXPOSE 2551
CMD ["java", "-jar", "app.jar", "2551"]

FROM base AS node2
EXPOSE 2552
CMD ["java", "-jar", "app.jar", "2552"]

FROM base AS node3
EXPOSE 2553
CMD ["java", "-jar", "app.jar", "2553"]

compose.yaml

name: kotlin-actor-sample

services:
  seed:
    container_name: pekko-cluster-seed
    build:
      context: ..
      dockerfile: docker/Dockerfile
      target: node1
    ports:
      - "2551:2551"
    environment:
      CLUSTER_PORT: 2551
      CLUSTER_IP: seed
  c1:
    container_name: pekko-cluster-c1
    build:
      context: ..
      dockerfile: docker/Dockerfile
      target: node2
    ports:
      - "2552:2552"
    environment:
      CLUSTER_PORT: 2552
      CLUSTER_IP: c1
  c2:
    container_name: pekko-cluster-c2
    build:
      context: ..
      dockerfile: docker/Dockerfile
      target: node3
    ports:
      - "2553:2553"
    environment:
      CLUSTER_PORT: 2553
      CLUSTER_IP: c2

Javaスレッドで試した方では、src をボリュームマウントしてたのですが、同一コードを複数コンテナで見ようとするとファイルハッシュが一緒で競合エラーとなってしまったので、一旦脳死で それぞれ .jar にビルドして実行することにしました。 (並行処理を試す際の開発環境でのいい方法がきっとあるはず・・・)

そして、Application.kt を下記のように変えます。

Application.kt

package com.github.daitasu

import org.apache.pekko.actor.typed.ActorSystem
import org.apache.pekko.cluster.typed.Cluster
import org.apache.pekko.cluster.typed.Join
import com.typesafe.config.ConfigFactory

fun main() {
    val port = System.getenv("CLUSTER_PORT")?.toIntOrNull() ?: 2551
    val hostname = System.getenv("CLUSTER_IP") ?: "127.0.0.1"

    // Pekkoのプロパティを設定
    System.setProperty("pekko.remote.artery.canonical.hostname", hostname)
    System.setProperty("pekko.remote.artery.canonical.port", port.toString())

    // application.conf を読み込む
    val config = ConfigFactory.load()

    // アクターシステムの起動
    val system = ActorSystem.create(
        GreetingActor.create(),
        "MyActorSystem",
        config
    )

    // クラスタにノードをJOIN する
    Cluster.get(system).manager().tell(Join.create(Cluster.get(system).selfMember().address()))

    if (port != 2551) {
        Thread.sleep(5000)
        val message = GreetingMessage("Hello World")
        system.tell(message)
    }

    println("Started ${Cluster.get(system).selfMember().address()}")
}

GreetingActor.kt

package com.github.daitasu

import org.apache.pekko.actor.typed.Behavior
import org.apache.pekko.actor.typed.javadsl.Behaviors
import org.apache.pekko.cluster.typed.Cluster

data class GreetingMessage(val message: String)

object GreetingActor {
  fun create(): Behavior<GreetingMessage> =
    Behaviors.receive<GreetingMessage> { context, message ->
      val member = Cluster.get(context.system).selfMember()
      println("[Address: ${member.address()}] Status: ${member.status()} Message: ${message.message}")

      Behaviors.same<GreetingMessage>()
  }
}

分かりやすさのため、2ファイルに分けていますが、 Application.kt では下記を行っています。

  • アクターシステムのためのいくつかの設定の追加
  • Cluster に各コンテナで動いているnode をJOIN する
  • seed node でなければ、Messageを送信する

GreetingActor.kt では、単に他のアクターから受け取ったメッセージを自身のアクターの状態などと一緒に println しています。

上記はアプリケーションの挙動ですが、 val config = ConfigFactory.load() で読み込んでいる部分が重要で、ここでpekko-cluster の設定をしています。

src/main/resources/application.conf

pekko {
  loglevel = debug
  version = "1.1.2"
  actor {
    provider = "cluster"
    debug {
      receive = off
      lifecycle = off
    }
  }
  remote {
    log-remote-lifecycle-events = on
    artery {
      canonical {
        hostname = ${clustering.ip}
        port = ${clustering.port}
      }
    }
  }
  cluster {
    seed-nodes = [
       "pekko://"${clustering.cluster.name}"@"${clustering.seed-ip}":"${clustering.seed-port}
    ]
  }
}

clustering {
 ip = seed
 ip = ${?CLUSTER_IP}
 port = 2551
 port = ${?CLUSTER_PORT}
 seed-ip = seed
 seed-port = 2551
 cluster.name = MyActorSystem
}

この設定に追加していくだけで、seed-node の指定と、同一Cluster上に remote-node を設定することができます。

では実際に、 docker compose up してみましょう。

log をかいつまんでみます。

$docker compose up

pekko-cluster-c1    | [MyActorSystem-pekko.actor.default-dispatcher-5] INFO org.apache.pekko.cluster.Cluster -  pekkoMemberChanged Cluster Node [pekko://MyActorSystem@c1:2552] - Node [pekko://MyActorSystem@c1:2552] is JOINING itself (with roles [dc-default], version [0.0.0]) and forming new cluster
pekko-cluster-c1    | [MyActorSystem-pekko.actor.default-dispatcher-5] INFO org.apache.pekko.cluster.Cluster - Cluster Node [pekko://MyActorSystem@c1:2552] - is the new leader among reachable nodes (more leaders may exist)
pekko-cluster-c1    | [MyActorSystem-pekko.actor.default-dispatcher-5] INFO org.apache.pekko.cluster.Cluster -  pekkoMemberChanged Cluster Node [pekko://MyActorSystem@c1:2552] - Leader is moving node [pekko://MyActorSystem@c1:2552] to [Up]
pekko-cluster-c2    | [MyActorSystem-pekko.actor.default-dispatcher-3] INFO org.apache.pekko.cluster.Cluster -  pekkoMemberChanged Cluster Node [pekko://MyActorSystem@c2:2553] - Leader is moving node [pekko://MyActorSystem@c2:2553] to [Up]
pekko-cluster-seed  | [MyActorSystem-pekko.actor.default-dispatcher-16] INFO org.apache.pekko.cluster.Cluster - Cluster Node [pekko://MyActorSystem@seed:2551] - Received InitJoin message from [Actor[pekko://MyActorSystem@c1:2552/system/cluster/core/daemon/joinSeedNodeProcess-1#1074045501]] to [pekko://MyActorSystem@seed:2551]
pekko-cluster-seed  | [MyActorSystem-pekko.actor.default-dispatcher-16] INFO org.apache.pekko.cluster.Cluster - Cluster Node [pekko://MyActorSystem@seed:2551] - Sending InitJoinAck message from node [pekko://MyActorSystem@seed:2551] to [Actor[pekko://MyActorSystem@c1:2552/system/cluster/core/daemon/joinSeedNodeProcess-1#1074045501]] (version [1.1.2])
pekko-cluster-seed  | [MyActorSystem-pekko.actor.default-dispatcher-16] INFO org.apache.pekko.cluster.Cluster - Cluster Node [pekko://MyActorSystem@seed:2551] - Received InitJoin message from [Actor[pekko://MyActorSystem@c2:2553/system/cluster/core/daemon/joinSeedNodeProcess-1#1181724472]] to [pekko://MyActorSystem@seed:2551]
pekko-cluster-seed  | [MyActorSystem-pekko.actor.default-dispatcher-16] INFO org.apache.pekko.cluster.Cluster - Cluster Node [pekko://MyActorSystem@seed:2551] - Sending InitJoinAck message from node [pekko://MyActorSystem@seed:2551] to [Actor[pekko://MyActorSystem@c2:2553/system/cluster/core/daemon/joinSeedNodeProcess-1#1181724472]] (version [1.1.2])
pekko-cluster-c2    | Started pekko://MyActorSystem@c2:2553
pekko-cluster-c2    | [Address: pekko://MyActorSystem@c2:2553] Status: Up Message: Hello World
pekko-cluster-c1    | Started pekko://MyActorSystem@c1:2552
pekko-cluster-c1    | [Address: pekko://MyActorSystem@c1:2552] Status: Up Message: Hello World

無事に c1/c2 node から seed node にmessage が送られました。

その他にも、それぞれのnode がClusterのMemberになっているような log などが見えます。

アプリケーションコードの中で分散処理のための実装やリモートサーバを意識した実装などは要らず、 application.conf に設定した内容だけでほとんど実現することができました。

おわりに

この記事では、異なるサーバ間(Docker コンテナ間)をpekko-cluster を通して同一クラスタ上に置いて、メッセージの送受信をする例を作ってみました。

実際にはメッセージの種類で送るアクターを変えたりといったことはしないので、あまり実践的なイメージは湧きませんが、ここではサーバ間をシンプルにまたいでメッセージが送られていることが体感できれば良いかなと考えてます。

普段Node.js やフロントエンドをよく触っていて、JVM系言語自体が初心者なので結構苦戦しましたが、Node.js と違ったアプローチでの解決方法や、位置透過性の面白さなどが学べて良かったです。

アクターモデルは状態と振る舞いを内部にカプセル化しますが、状態の永続化は別途 Pekko Persistenceというものが必要です。

pekko.apache.org

この辺りまで理解しないと、アクターモデルの旨味は分からなそうなので、もう少し色々と調べて触っていきたいなと思います。

Reference

今回の理解、実装にあたり、下記の記事を参考にさせていただきました。