LagomでPublish/Subscribeを試してみる

ちょっと間が空いてしまいましたがLagomを試してみるシリーズ第11回です。

今回はPub/Subモデルによるメッセージ送信を試してみようと思いますが、その前にLagomの1.0.0-M2がリリースされ、APIが少し変わっているようです。中でも影響が大きいのはServiceCallはこれまでId、Request、Responseという3つの型パラメータを持っていたのですが、RequestとResponseの2つになり、Idはサービスのメソッドの引数で受け取るようになっていることでしょうか。

github.com

そんなわけで今回からサンプルコードは1.0.0-M2ベースで書いていきたいと思います。

まず、Pub/Subを行うプロジェクトの依存関係にlagomJavadslPubSubを追加します。

lazy val helloworldImpl = project("helloworld-impl")
  .enablePlugins(LagomJava)
  .settings(
    version := "1.0-SNAPSHOT",
    libraryDependencies ++= Seq(
      lagomJavadslPersistence,
      lagomJavadslPubSub, // この行を追加
      lagomJavadslTestKit
    )
  )
  .settings(lagomForkedTestSettings: _*)
  .dependsOn(helloworldApi)

サービスの定義はこんな感じです。registerHelloはトピックにメッセージをpublishし、helloStreamはそれを受信してストリームに流します。

public interface HelloService extends Service {

  ServiceCall<HelloRequest, NotUsed> registerHello(String id);

  ServiceCall<NotUsed, Source<HelloRequest, ?>> helloStream(String id);
  
  @Override
  default Descriptor descriptor() {
    // @formatter:off
    return named("helloservice").with(
      pathCall("/device/:id/hello", this::registerHello),
      pathCall("/device/:id/hello/stream", this::helloStream)
    ).withAutoAcl(true);
    // @formatter:on
  }
}

サービスの実装は以下のようになります。PubSubRegistryをDIして操作します。今までのサンプルコードと違い、idをメソッドの引数として受け取っている点に注意してください。

public class HelloServiceImpl implements HelloService {

  private final PubSubRegistry pubSub;

  @Inject
  public HelloServiceImpl(PubSubRegistry pubSub){
    this.pubSub = pubSub;
  }

  @Override
  public ServiceCall<HelloRequest, NotUsed> registerHello(String id) {
    return request -> {
      final PubSubRef<HelloRequest> topic =
          pubSub.refFor(TopicId.of(HelloRequest.class, id));
      topic.publish(request);
      return CompletableFuture.completedFuture(NotUsed.getInstance());
    };
  }

  @Override
  public ServiceCall<NotUsed, Source<HelloRequest, ?>> helloStream(String id) {
    return request -> {
      final PubSubRef<HelloRequest> topic =
          pubSub.refFor(TopicId.of(HelloRequest.class, id));
      return CompletableFuture.completedFuture(topic.subscriber());
    };
  }

}

ここではトピックの取得時に識別子としてURLのパスに埋め込まれたidを使用しているため、/device/1/helloに送信したメッセージは/device/1/hello/streamで読み取ることができます。このような分離が不要であれば識別子には空文字列や固定の文字列を渡しておけばいいようです。

では動作確認をしてみましょう。まずはターミナルからwscatでストリーム(WebSocket)を監視しておきます。

$ wscat -c ws://localhost:9000/device/1/hello/stream
connected (press CTRL+C to quit)
> 

別のターミナルからメッセージを送信してみます。

$ curl -XPOST http://localhost:9000/device/1/hello -d '{
  "area": "Japan",
  "name": "Naoki"
}'

するとwscatを実行しているほうのターミナルに送信したメッセージが表示されるはずです。

…とまあこんな感じでPub/Subモデルによるメッセージ送信ができます。Lagomのドキュメントによるとストリームをpublishの入力にしたり、イベントソーシングのエンティティからメッセージを送信することもできるようです。また、以下のような点が制限として挙げられています。

  • Pub/Subは同一のサービス内でないとできない
  • メッセージはロストする可能性がある
  • 複数ノードの場合、subscriberの登録が他のノードに反映されるまで数秒かかる場合がある

将来的にはサービスをまたいだPub/Subやat-least-onceな到達保証をする仕組みを提供する予定とのことです。ロストやsubscribeの遅延はともかく、同一サービス内でないと使えないという制約によって現状では使いどころがかなり限られそうな気がしますが、サービスをまたいで使えるようになると1対多のサービス間連携に使えそうです。