読者です 読者をやめる 読者になる 読者になる

Lagomの永続化APIで実現するイベントソーシング

Lagomを試してみるシリーズ第9回です。ドキュメントの順番でいくとユニットテストの章なんですが、ひとまず飛ばして永続化周りについて見ていきたいと思います。

Lagomの永続化APIはいわゆるORMのようにRDBMSなどのストレージへのアクセスを単純に抽象化したものではなく、イベントソーシングとCQRSに基づいたデータアクセスを実装するためのものになっています。今回はCQRSはひとまず置いておいてイベントソーシングを実現する部分だけを見ていきます。

イベントソーシングというのはエンティティに対するCRUDで状態変化を表現するのではなく、イベントログを積層しそのイベントをリプレイすることで最新の状態を復元するというものです。このような概念をストレージレベルで実現したものとしてClojureで実装されたDatomicというデータベースが有名です。Lagomはこれをフレームワークでサポートしています。

ちょっと複雑なのでactivatorで生成できるLagomのサンプルプロジェクトのソースを見ていきましょう。まずはイベントソースのエンティティであるHelloWorldクラスです。

public class HelloWorld extends PersistentEntity<HelloCommand, HelloEvent, WorldState> {

  @Override
  public Behavior initialBehavior(Optional<WorldState> snapshotState) {
    BehaviorBuilder b = newBehaviorBuilder(
        snapshotState.orElse(new WorldState("Hello", LocalDateTime.now().toString())));

    // UseGreetingMessageコマンドに対するコマンドハンドラ
    b.setCommandHandler(UseGreetingMessage.class, (cmd, ctx) ->
    // GreetingMessageChangedイベントを永続化
    ctx.thenPersist(new GreetingMessageChanged(cmd.message),
        // 永続化に成功したらレスポンスを返す
        evt -> ctx.reply(Done.getInstance())));

    // GreetingMessageChangedに対するイベントハンドラ
    b.setEventHandler(GreetingMessageChanged.class,
        // 現在のメッセージを変更
        evt -> new WorldState(evt.message, LocalDateTime.now().toString()));

    // Helloコマンドに対するコマンドハンドラ
    b.setReadOnlyCommandHandler(Hello.class,
        // 現在のメッセージを返す
        (cmd, ctx) -> ctx.reply(state().message + ", " + cmd.name + "!"));

    return b.build();
  }

}

Lagomでは以下の要素でイベントソーシングの仕組みを実現しています。

  • コマンド: このエンティティが受け付けるメッセージ。コマンドハンドラによって処理します。エンティティの型パラメータではそのエンティティが受け付けるコマンドの親スーパークラスもしくはインターフェースを指定します。
  • イベント: コマンドによってイベントの永続化が成功すると発生します。イベントハンドラによって処理します。主にイベントの永続化をフックして状態変化をメモリ上のステートオブジェクトに反映するのに使用します。エンティティの型パラメータではそのエンティティが処理するイベントの親スーパークラスもしくはインターフェースを指定します。
  • ステート: 現在の状態を保持するオブジェクトです。エンティティの型パラメータではクラスを指定します。

このサンプルで永続化対象になっているHelloEventは以下のようになっています。実際のコードはごちゃごちゃしていますが、要はmessageというプロパティを持つ単純なJavaBeanです。

public interface HelloEvent extends Jsonable {

  @SuppressWarnings("serial")
  @Immutable
  @JsonDeserialize
  public final class GreetingMessageChanged implements HelloEvent {
    public final String message;

    @JsonCreator
    public GreetingMessageChanged(String message) {
      this.message = Preconditions.checkNotNull(message, "message");
    }
    ...
}

現在の状態を表すWordStateクラスも同じような感じです。

@SuppressWarnings("serial")
@Immutable
@JsonDeserialize
public final class WorldState implements CompressedJsonable {

  public final String message;
  public final String timestamp;

  @JsonCreator
  public WorldState(String message, String timestamp) {
    this.message = Preconditions.checkNotNull(message, "message");
    this.timestamp = Preconditions.checkNotNull(timestamp, "timestamp");
  }
  ...
}

最後にコマンドです。HelloCommandインターフェース内に、その実装クラスとしてUseGreetingMessageコマンドとHelloコマンドが定義されています。*1

public interface HelloCommand extends Jsonable {

  @SuppressWarnings("serial")
  @Immutable
  @JsonDeserialize
  public final class UseGreetingMessage implements HelloCommand, CompressedJsonable, PersistentEntity.ReplyType<Done> {
    public final String message;

    @JsonCreator
    public UseGreetingMessage(String message) {
      this.message = Preconditions.checkNotNull(message, "message");
    }
    ...
  }

  @SuppressWarnings("serial")
  @Immutable
  @JsonDeserialize
  public final class Hello implements HelloCommand, PersistentEntity.ReplyType<String> {
    public final String name;
    public final Optional<String> organization;

    @JsonCreator
    public Hello(String name, Optional<String> organization) {
      this.name = Preconditions.checkNotNull(name, "name");
      this.organization = Preconditions.checkNotNull(organization, "organization");
    }
    ...
  }

}

実際にこのエンティティを使用するサービスは以下のようにPersistentEntityRegistryをDIし、そこから取得したPersistentEntityRefに対してメッセージを送信します。このあたりはAkkaのアクターで処理されているようです。

public class HelloServiceImpl implements HelloService {

  private final PersistentEntityRegistry persistentEntityRegistry;

  @Inject
  public HelloServiceImpl(PersistentEntityRegistry persistentEntityRegistry) {
    this.persistentEntityRegistry = persistentEntityRegistry;
    persistentEntityRegistry.register(HelloWorld.class);
  }

  @Override
  public ServiceCall<String, NotUsed, String> hello() {
    return (id, request) -> {
      // Look up the hello world entity for the given ID.
      PersistentEntityRef<HelloCommand> ref = persistentEntityRegistry.refFor(HelloWorld.class, id);
      // Ask the entity the Hello command.
      return ref.ask(new Hello(id, Optional.empty()));
    };
  }

  @Override
  public ServiceCall<String, GreetingMessage, Done> useGreeting() {
    return (id, request) -> {
      // Look up the hello world entity for the given ID.
      PersistentEntityRef<HelloCommand> ref = persistentEntityRegistry.refFor(HelloWorld.class, id);
      // Tell the entity to use the greeting message specified.
      return ref.ask(new UseGreetingMessage(request.message));
    };

  }

}

なお、LagomはCassandraを内蔵しており、デフォルトでは永続化したイベントはCassandraに保存されているようです。

また、サービスを再起動する場合などエンティティを初期化する際に状態を復元するために毎回全イベントをリプレイするのは時間がかかるので、ある時点の状態を自動的にスナップショットとして保存しておいてくれる機能があり*2、エンティティのinitialBehaviorを以下のように実装しておくことでスナップショットから状態を復元できるようです。

@Override
public Behavior initialBehavior(Optional<BlogState> snapshotState) {
  if (snapshotState.isPresent() && !snapshotState.get().isEmpty()) {
    return becomePostAdded(snapshotState.get());
  } else {
    BehaviorBuilder b = newBehaviorBuilder(BlogState.EMPTY);

    // TODO define command and event handlers

    return b.build();
  }
}

この例はメモリ上に持っている状態を更新するという簡単なものなので参照もコマンドで処理することができましたが、より実用的な用途、たとえばエンティティをまたいだクエリを行いたいという場合にはこの仕組みだけでは対応することができません。そこで書き込みと読み込みを分離するというCQRSの考え方が登場します。これについては次回詳しく見ていきたいと思います。

*1:ここまでのソースコードを見るとイベント、コマンド、ステートクラス共にequals、hashCode、toStringメソッドが手動で実装されているのですが、これらはきちんと実装する必要があるんでしょうか?面倒なのでImmutablesで済ませてしまいたいところです。

*2:イベントが設定した回数ストアされるごとにスナップショットを取ってくれるようです。