ちょっと間が空いてしまいましたがLagomを試してみるシリーズ第11回です。
今回はPub/Subモデルによるメッセージ送信を試してみようと思いますが、その前にLagomの1.0.0-M2がリリースされ、APIが少し変わっているようです。中でも影響が大きいのはServiceCall
はこれまでId、Request、Responseという3つの型パラメータを持っていたのですが、RequestとResponseの2つになり、Idはサービスのメソッドの引数で受け取るようになっていることでしょうか。
そんなわけで今回からサンプルコードは1.0.0-M2ベースで書いていきたいと思います。
まず、Pub/Subを行うプロジェクトの依存関係にlagomJavadslPubSub
を追加します。
lazy val helloworldImpl = project("helloworld-impl") .enablePlugins(LagomJava) .settings( version := "1.0-SNAPSHOT", libraryDependencies ++= Seq( lagomJavadslPersistence, lagomJavadslPubSub, // この行を追加 lagomJavadslTestKit ) ) .settings(lagomForkedTestSettings: _*) .dependsOn(helloworldApi)
サービスの定義はこんな感じです。registerHello
はトピックにメッセージをpublishし、helloStream
はそれを受信してストリームに流します。
public interface HelloService extends Service { ServiceCall<HelloRequest, NotUsed> registerHello(String id); ServiceCall<NotUsed, Source<HelloRequest, ?>> helloStream(String id); @Override default Descriptor descriptor() { // @formatter:off return named("helloservice").with( pathCall("/device/:id/hello", this::registerHello), pathCall("/device/:id/hello/stream", this::helloStream) ).withAutoAcl(true); // @formatter:on } }
サービスの実装は以下のようになります。PubSubRegistry
をDIして操作します。今までのサンプルコードと違い、idをメソッドの引数として受け取っている点に注意してください。
public class HelloServiceImpl implements HelloService { private final PubSubRegistry pubSub; @Inject public HelloServiceImpl(PubSubRegistry pubSub){ this.pubSub = pubSub; } @Override public ServiceCall<HelloRequest, NotUsed> registerHello(String id) { return request -> { final PubSubRef<HelloRequest> topic = pubSub.refFor(TopicId.of(HelloRequest.class, id)); topic.publish(request); return CompletableFuture.completedFuture(NotUsed.getInstance()); }; } @Override public ServiceCall<NotUsed, Source<HelloRequest, ?>> helloStream(String id) { return request -> { final PubSubRef<HelloRequest> topic = pubSub.refFor(TopicId.of(HelloRequest.class, id)); return CompletableFuture.completedFuture(topic.subscriber()); }; } }
ここではトピックの取得時に識別子としてURLのパスに埋め込まれたidを使用しているため、/device/1/hello
に送信したメッセージは/device/1/hello/stream
で読み取ることができます。このような分離が不要であれば識別子には空文字列や固定の文字列を渡しておけばいいようです。
では動作確認をしてみましょう。まずはターミナルからwscatでストリーム(WebSocket)を監視しておきます。
$ wscat -c ws://localhost:9000/device/1/hello/stream connected (press CTRL+C to quit) >
別のターミナルからメッセージを送信してみます。
$ curl -XPOST http://localhost:9000/device/1/hello -d '{ "area": "Japan", "name": "Naoki" }'
するとwscatを実行しているほうのターミナルに送信したメッセージが表示されるはずです。
…とまあこんな感じでPub/Subモデルによるメッセージ送信ができます。Lagomのドキュメントによるとストリームをpublishの入力にしたり、イベントソーシングのエンティティからメッセージを送信することもできるようです。また、以下のような点が制限として挙げられています。
- Pub/Subは同一のサービス内でないとできない
- メッセージはロストする可能性がある
- 複数ノードの場合、subscriberの登録が他のノードに反映されるまで数秒かかる場合がある
将来的にはサービスをまたいだPub/Subやat-least-onceな到達保証をする仕組みを提供する予定とのことです。ロストやsubscribeの遅延はともかく、同一サービス内でないと使えないという制約によって現状では使いどころがかなり限られそうな気がしますが、サービスをまたいで使えるようになると1対多のサービス間連携に使えそうです。