Lagomを試してみるシリーズ第10回です。今回はPersistenceの続きでRead-sideについて見ていきたいと思います。
前回も書いたようにLagomのPersistence APIは一般的なDBアクセスのためのものではなく、イベントソーシング+CQRSを実現することを目的としています。イベントソーシングの実装については前回見た通りですが、CQRSはコマンド(更新系)とクエリ(参照系)の責務を分離しようという考え方であり、Lagomでクエリをどう実装するかという話が残っています。
Lagomの永続化APIでの参照系の分離戦略は「イベントハンドラに非同期にイベントを通知するのであとは自分で頑張って参照用のデータストアに保存してね」というものです。その上で参照用のデータストアを自由に検索すればよいということになります。
では実装を見ていきましょう。まずはAggregateEventTag
のシングルトンインスタンスを定義します。
public class HelloEventTag { public static final AggregateEventTag<HelloEvent> INSTANCE = AggregateEventTag.of(HelloEvent.class); }
イベントはAggregateEvent
インターフェースを実装するようにし、aggregateTag()
メソッドで先ほど定義したAggregateEventTag
のインスタンスを返すようにします。*1
public interface HelloEvent extends Jsonable, AggregateEvent<HelloEvent> { @Override default public AggregateEventTag<HelloEvent> aggregateTag() { return HelloEventTag.INSTANCE; } @SuppressWarnings("serial") @Immutable @JsonDeserialize public final class GreetingMessageChanged implements HelloEvent { public final String message; @JsonCreator public GreetingMessageChanged(String message) { this.message = Preconditions.checkNotNull(message, "message"); } ... } }
ここまで準備したらイベントを参照用のデータストアに永続化するためのイベントプロセッサを実装します。
public class HelloEventProcessor extends CassandraReadSideProcessor<HelloEvent> { @Override public AggregateEventTag<HelloEvent> aggregateTag() { return HelloEventTag.INSTANCE; } @Override public CompletionStage<Optional<UUID>> prepare(CassandraSession session) { return prepareWriteMessage(session).thenCompose(a -> noOffset()); } @Override public EventHandlers defineEventHandlers(EventHandlersBuilder builder) { builder.setEventHandler(HelloEvent.GreetingMessageChanged.class, this::processGreetingMessageChanged); return builder.build(); } private PreparedStatement writeTitle = null; // initialized in prepare private void setWriteMessage(PreparedStatement writeTitle) { this.writeTitle = writeTitle; } private CompletionStage<NotUsed> prepareWriteMessage(CassandraSession session) { return session.prepare("INSERT INTO helloworld.hello (id, message) VALUES (1, ?)") .thenApply(ps -> { setWriteMessage(ps); return NotUsed.getInstance(); }); } private CompletionStage<List<BoundStatement>> processGreetingMessageChanged( HelloEvent.GreetingMessageChanged event, UUID offset) { BoundStatement bindWriteMessage = writeTitle.bind(); bindWriteMessage.setString("message", event.message); return completedStatements(Arrays.asList(bindWriteMessage)); } }
LagomのデフォルトではCassandraを使用するのでCassandraReadSideProcessor
を継承し、メソッドを以下のように実装します。
aggregateTag()
: 処理するイベントのタグを返すようにします。ここでは冒頭で定義したAggregateEventTag
のシングルトンインスタンスを返します。prepare()
: 参照用データストアに永続化するための準備をします。ここではメッセージを保存したあとにnoOffset()
を返すようにしていますが、これはすべてのイベントを処理するということを意味します。起動時などにすべてのイベントがリプレイされるのを防ぐためにはオフセットを記録しておき、それを返すように実装する必要があります。defineEventHandlers()
: イベントを受け取って参照用のデータストアの更新処理を行うイベントハンドラを返します。
最後にサービスの実装クラスでこのイベントプロセッサをCassandraReadSide
に登録します。
public class HelloServiceImpl implements HelloService { private final PersistentEntityRegistry persistentEntityRegistry; private final CassandraSession cassandraSession; @Inject public HelloServiceImpl(PersistentEntityRegistry persistentEntityRegistry, CassandraSession cassandraSession, CassandraReadSide cassandraReadSide) { this.persistentEntityRegistry = persistentEntityRegistry; this.cassandraSession = cassandraSession; persistentEntityRegistry.register(HelloWorld.class); cassandraReadSide.register(HelloEventProcessor.class); } ...
GreetingMessageChanged
イベントが発生すると非同期にイベントハンドラが呼び出され、参照用テーブルのデータが更新されます。*2
サービスではDIしたCassandraSession
を使って参照用データを検索することができます。
こんな感じで参照用データの同期処理を自前で行う必要があるため記述量が多く面倒ではあるのですが、イベントを保存するデータストア(Cassandra)以外のDBに書き込むこともできるので自由度は高そうです。Javaには他にもイベントソーシング+CQRSを実現するためのフレームワークがあるようなのでそれらがどのような仕組みを提供しているのかも気になるところです。