読者です 読者をやめる 読者になる 読者になる

LagomでAkka Streamsを使ってWebSocketによるサービスを実装してみる

Lagomを試してみるシリーズ第6回です。今回はAkka Streamsを使ったストリーミングを試してみました。

まずは簡単な例としてレスポンスをストリームで返すサービスを定義してみます。Responseの型にAkka StreamsのSourceを指定している以外は通常のサービスと変わりません。

ServiceCall<Integer, String, Source<String, ?>> tick();

@Override
default Descriptor descriptor() {
  return named("helloservice").with(
    pathCall("/tick/:interval", tick())
  ).withAutoAcl(true);
}

実装は以下のような感じになります。このサービスはリクエストされたメッセージをintervalで指定された間隔で繰り返し返却するというものです。

@Override
public ServiceCall<Integer, String, Source<String, ?>> tick() {
  return (intervalMs, tickMessage) -> {
    FiniteDuration interval = FiniteDuration.create(intervalMs, TimeUnit.MILLISECONDS);
    return completedFuture(Source.tick(interval, interval, tickMessage));
  };
}

ストリームを使用したサービスはクライアントからはWebSocketで呼び出すことができるようなのでwscatを使ってテストしてみます。wscatはNode.jsで実装されたWebSocketクライアントで、コマンドラインでWebSocketの動作確認を行うことができます。

www.npmjs.com

実行結果は以下のようになります。

$ wscat -c ws://localhost:9000/tick/1000
connected (press CTRL+C to quit)
> hello
  < hello
  < hello
  < hello
  < hello

以下のようにRequestもSourceにすることができます。

public ServiceCall<NotUsed, Source<String, ?>, Source<String, ?>> sayHello() {
  return (id, names) -> completedFuture(names.map(name -> "Hello " + name));
}

動作は以下のようになります。

$ wscat -c ws://localhost:9000/sayHello
connected (press CTRL+C to quit)
> say
  < Hello say
> hello
  < Hello hello

WebSocketを使用したサービスが手軽に実装できるのはよいですね。