Lagomを試してみるシリーズ第9回です。ドキュメントの順番でいくとユニットテストの章なんですが、ひとまず飛ばして永続化周りについて見ていきたいと思います。
Lagomの永続化APIはいわゆるORMのようにRDBMSなどのストレージへのアクセスを単純に抽象化したものではなく、イベントソーシングとCQRSに基づいたデータアクセスを実装するためのものになっています。今回はCQRSはひとまず置いておいてイベントソーシングを実現する部分だけを見ていきます。
イベントソーシングというのはエンティティに対するCRUDで状態変化を表現するのではなく、イベントログを積層しそのイベントをリプレイすることで最新の状態を復元するというものです。このような概念をストレージレベルで実現したものとしてClojureで実装されたDatomicというデータベースが有名です。Lagomはこれをフレームワークでサポートしています。
ちょっと複雑なのでactivatorで生成できるLagomのサンプルプロジェクトのソースを見ていきましょう。まずはイベントソースのエンティティであるHelloWorld
クラスです。
public class HelloWorld extends PersistentEntity<HelloCommand, HelloEvent, WorldState> { @Override public Behavior initialBehavior(Optional<WorldState> snapshotState) { BehaviorBuilder b = newBehaviorBuilder( snapshotState.orElse(new WorldState("Hello", LocalDateTime.now().toString()))); // UseGreetingMessageコマンドに対するコマンドハンドラ b.setCommandHandler(UseGreetingMessage.class, (cmd, ctx) -> // GreetingMessageChangedイベントを永続化 ctx.thenPersist(new GreetingMessageChanged(cmd.message), // 永続化に成功したらレスポンスを返す evt -> ctx.reply(Done.getInstance()))); // GreetingMessageChangedに対するイベントハンドラ b.setEventHandler(GreetingMessageChanged.class, // 現在のメッセージを変更 evt -> new WorldState(evt.message, LocalDateTime.now().toString())); // Helloコマンドに対するコマンドハンドラ b.setReadOnlyCommandHandler(Hello.class, // 現在のメッセージを返す (cmd, ctx) -> ctx.reply(state().message + ", " + cmd.name + "!")); return b.build(); } }
Lagomでは以下の要素でイベントソーシングの仕組みを実現しています。
- コマンド: このエンティティが受け付けるメッセージ。コマンドハンドラによって処理します。エンティティの型パラメータではそのエンティティが受け付けるコマンドの親スーパークラスもしくはインターフェースを指定します。
- イベント: コマンドによってイベントの永続化が成功すると発生します。イベントハンドラによって処理します。主にイベントの永続化をフックして状態変化をメモリ上のステートオブジェクトに反映するのに使用します。エンティティの型パラメータではそのエンティティが処理するイベントの親スーパークラスもしくはインターフェースを指定します。
- ステート: 現在の状態を保持するオブジェクトです。エンティティの型パラメータではクラスを指定します。
このサンプルで永続化対象になっているHelloEvent
は以下のようになっています。実際のコードはごちゃごちゃしていますが、要はmessage
というプロパティを持つ単純なJavaBeanです。
public interface HelloEvent extends Jsonable { @SuppressWarnings("serial") @Immutable @JsonDeserialize public final class GreetingMessageChanged implements HelloEvent { public final String message; @JsonCreator public GreetingMessageChanged(String message) { this.message = Preconditions.checkNotNull(message, "message"); } ... }
現在の状態を表すWordState
クラスも同じような感じです。
@SuppressWarnings("serial") @Immutable @JsonDeserialize public final class WorldState implements CompressedJsonable { public final String message; public final String timestamp; @JsonCreator public WorldState(String message, String timestamp) { this.message = Preconditions.checkNotNull(message, "message"); this.timestamp = Preconditions.checkNotNull(timestamp, "timestamp"); } ... }
最後にコマンドです。HelloCommand
インターフェース内に、その実装クラスとしてUseGreetingMessage
コマンドとHello
コマンドが定義されています。*1
public interface HelloCommand extends Jsonable { @SuppressWarnings("serial") @Immutable @JsonDeserialize public final class UseGreetingMessage implements HelloCommand, CompressedJsonable, PersistentEntity.ReplyType<Done> { public final String message; @JsonCreator public UseGreetingMessage(String message) { this.message = Preconditions.checkNotNull(message, "message"); } ... } @SuppressWarnings("serial") @Immutable @JsonDeserialize public final class Hello implements HelloCommand, PersistentEntity.ReplyType<String> { public final String name; public final Optional<String> organization; @JsonCreator public Hello(String name, Optional<String> organization) { this.name = Preconditions.checkNotNull(name, "name"); this.organization = Preconditions.checkNotNull(organization, "organization"); } ... } }
実際にこのエンティティを使用するサービスは以下のようにPersistentEntityRegistry
をDIし、そこから取得したPersistentEntityRef
に対してメッセージを送信します。このあたりはAkkaのアクターで処理されているようです。
public class HelloServiceImpl implements HelloService { private final PersistentEntityRegistry persistentEntityRegistry; @Inject public HelloServiceImpl(PersistentEntityRegistry persistentEntityRegistry) { this.persistentEntityRegistry = persistentEntityRegistry; persistentEntityRegistry.register(HelloWorld.class); } @Override public ServiceCall<String, NotUsed, String> hello() { return (id, request) -> { // Look up the hello world entity for the given ID. PersistentEntityRef<HelloCommand> ref = persistentEntityRegistry.refFor(HelloWorld.class, id); // Ask the entity the Hello command. return ref.ask(new Hello(id, Optional.empty())); }; } @Override public ServiceCall<String, GreetingMessage, Done> useGreeting() { return (id, request) -> { // Look up the hello world entity for the given ID. PersistentEntityRef<HelloCommand> ref = persistentEntityRegistry.refFor(HelloWorld.class, id); // Tell the entity to use the greeting message specified. return ref.ask(new UseGreetingMessage(request.message)); }; } }
なお、LagomはCassandraを内蔵しており、デフォルトでは永続化したイベントはCassandraに保存されているようです。
また、サービスを再起動する場合などエンティティを初期化する際に状態を復元するために毎回全イベントをリプレイするのは時間がかかるので、ある時点の状態を自動的にスナップショットとして保存しておいてくれる機能があり*2、エンティティのinitialBehavior
を以下のように実装しておくことでスナップショットから状態を復元できるようです。
@Override public Behavior initialBehavior(Optional<BlogState> snapshotState) { if (snapshotState.isPresent() && !snapshotState.get().isEmpty()) { return becomePostAdded(snapshotState.get()); } else { BehaviorBuilder b = newBehaviorBuilder(BlogState.EMPTY); // TODO define command and event handlers return b.build(); } }
この例はメモリ上に持っている状態を更新するという簡単なものなので参照もコマンドで処理することができましたが、より実用的な用途、たとえばエンティティをまたいだクエリを行いたいという場合にはこの仕組みだけでは対応することができません。そこで書き込みと読み込みを分離するというCQRSの考え方が登場します。これについては次回詳しく見ていきたいと思います。