Lagomの永続化APIで実現するCQRS

Lagomを試してみるシリーズ第10回です。今回はPersistenceの続きでRead-sideについて見ていきたいと思います。

前回も書いたようにLagomのPersistence APIは一般的なDBアクセスのためのものではなく、イベントソーシング+CQRSを実現することを目的としています。イベントソーシングの実装については前回見た通りですが、CQRSはコマンド(更新系)とクエリ(参照系)の責務を分離しようという考え方であり、Lagomでクエリをどう実装するかという話が残っています。

Lagomの永続化APIでの参照系の分離戦略は「イベントハンドラに非同期にイベントを通知するのであとは自分で頑張って参照用のデータストアに保存してね」というものです。その上で参照用のデータストアを自由に検索すればよいということになります。

では実装を見ていきましょう。まずはAggregateEventTagのシングルトンインスタンスを定義します。

public class HelloEventTag {
  public static final AggregateEventTag<HelloEvent> INSTANCE = AggregateEventTag.of(HelloEvent.class);
}

イベントはAggregateEventインターフェースを実装するようにし、aggregateTag()メソッドで先ほど定義したAggregateEventTagインスタンスを返すようにします。*1

public interface HelloEvent extends Jsonable, AggregateEvent<HelloEvent> {
 
  @Override
  default public AggregateEventTag<HelloEvent> aggregateTag() {
    return HelloEventTag.INSTANCE;
  }
 
  @SuppressWarnings("serial")
  @Immutable
  @JsonDeserialize
  public final class GreetingMessageChanged implements HelloEvent {
    public final String message;
 
    @JsonCreator
    public GreetingMessageChanged(String message) {
      this.message = Preconditions.checkNotNull(message, "message");
    }
    ...
  }
}

ここまで準備したらイベントを参照用のデータストアに永続化するためのイベントプロセッサを実装します。

public class HelloEventProcessor extends CassandraReadSideProcessor<HelloEvent> {
 
  @Override
  public AggregateEventTag<HelloEvent> aggregateTag() {
    return HelloEventTag.INSTANCE;
  }
 
  @Override
  public CompletionStage<Optional<UUID>> prepare(CassandraSession session) {
    return prepareWriteMessage(session).thenCompose(a -> noOffset());
  }
 
  @Override
  public EventHandlers defineEventHandlers(EventHandlersBuilder builder) {
    builder.setEventHandler(HelloEvent.GreetingMessageChanged.class, this::processGreetingMessageChanged);
    return builder.build();
  }
 
  private PreparedStatement writeTitle = null; // initialized in prepare
 
  private void setWriteMessage(PreparedStatement writeTitle) {
    this.writeTitle = writeTitle;
  }
 
  private CompletionStage<NotUsed> prepareWriteMessage(CassandraSession session) {
    return session.prepare("INSERT INTO helloworld.hello (id, message) VALUES (1, ?)")
      .thenApply(ps -> {
        setWriteMessage(ps);
        return NotUsed.getInstance();
      });
  }
 
  private CompletionStage<List<BoundStatement>> processGreetingMessageChanged(
                          HelloEvent.GreetingMessageChanged event, UUID offset) {
    BoundStatement bindWriteMessage = writeTitle.bind();
    bindWriteMessage.setString("message", event.message);
    return completedStatements(Arrays.asList(bindWriteMessage));
  }
}

LagomのデフォルトではCassandraを使用するのでCassandraReadSideProcessorを継承し、メソッドを以下のように実装します。

  • aggregateTag(): 処理するイベントのタグを返すようにします。ここでは冒頭で定義したAggregateEventTagのシングルトンインスタンスを返します。
  • prepare(): 参照用データストアに永続化するための準備をします。ここではメッセージを保存したあとにnoOffset()を返すようにしていますが、これはすべてのイベントを処理するということを意味します。起動時などにすべてのイベントがリプレイされるのを防ぐためにはオフセットを記録しておき、それを返すように実装する必要があります。
  • defineEventHandlers(): イベントを受け取って参照用のデータストアの更新処理を行うイベントハンドラを返します。

最後にサービスの実装クラスでこのイベントプロセッサをCassandraReadSideに登録します。

public class HelloServiceImpl implements HelloService {
 
  private final PersistentEntityRegistry persistentEntityRegistry;
  private final CassandraSession cassandraSession;
  @Inject
  public HelloServiceImpl(PersistentEntityRegistry persistentEntityRegistry,
                          CassandraSession cassandraSession,
                          CassandraReadSide cassandraReadSide) {
    this.persistentEntityRegistry = persistentEntityRegistry;
    this.cassandraSession = cassandraSession;
    persistentEntityRegistry.register(HelloWorld.class);
    cassandraReadSide.register(HelloEventProcessor.class);
  }
  ...

GreetingMessageChangedイベントが発生すると非同期にイベントハンドラが呼び出され、参照用テーブルのデータが更新されます。*2

サービスではDIしたCassandraSessionを使って参照用データを検索することができます。

こんな感じで参照用データの同期処理を自前で行う必要があるため記述量が多く面倒ではあるのですが、イベントを保存するデータストア(Cassandra)以外のDBに書き込むこともできるので自由度は高そうです。Javaには他にもイベントソーシング+CQRSを実現するためのフレームワークがあるようなのでそれらがどのような仕組みを提供しているのかも気になるところです。

*1:このタグはイベントをグルーピングしたりするためのもののようです。ここでは特に複数のイベントのグルーピングは行わないのでイベントと同じクラスでタグを定義しています。

*2:CassandraのCQLではINSERTとUPDATEの区別がないのでレコードが存在しなければ作成され、すでに存在すれば更新になります。また、参照用テーブルは事前に作成しておく必要があります。