Lagomを試してみるシリーズ第6回です。今回はAkka Streamsを使ったストリーミングを試してみました。
まずは簡単な例としてレスポンスをストリームで返すサービスを定義してみます。Responseの型にAkka StreamsのSource
を指定している以外は通常のサービスと変わりません。
ServiceCall<Integer, String, Source<String, ?>> tick(); @Override default Descriptor descriptor() { return named("helloservice").with( pathCall("/tick/:interval", tick()) ).withAutoAcl(true); }
実装は以下のような感じになります。このサービスはリクエストされたメッセージをinterval
で指定された間隔で繰り返し返却するというものです。
@Override public ServiceCall<Integer, String, Source<String, ?>> tick() { return (intervalMs, tickMessage) -> { FiniteDuration interval = FiniteDuration.create(intervalMs, TimeUnit.MILLISECONDS); return completedFuture(Source.tick(interval, interval, tickMessage)); }; }
ストリームを使用したサービスはクライアントからはWebSocketで呼び出すことができるようなのでwscatを使ってテストしてみます。wscatはNode.jsで実装されたWebSocketクライアントで、コマンドラインでWebSocketの動作確認を行うことができます。
実行結果は以下のようになります。
$ wscat -c ws://localhost:9000/tick/1000 connected (press CTRL+C to quit) > hello < hello < hello < hello < hello
以下のようにRequestもSource
にすることができます。
public ServiceCall<NotUsed, Source<String, ?>, Source<String, ?>> sayHello() { return (id, names) -> completedFuture(names.map(name -> "Hello " + name)); }
動作は以下のようになります。
$ wscat -c ws://localhost:9000/sayHello connected (press CTRL+C to quit) > say < Hello say > hello < Hello hello
WebSocketを使用したサービスが手軽に実装できるのはよいですね。