まずはbuild.sbt
にこんな感じの依存関係を追加します。Akka Persistenceの永続化部分はプラグイン形式になっており、永続化するストレージを選択できるようになっているのですが、Akkaのサンプルに従ってLevelDBを使うことにします。
libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-actor" % "2.4.2", "com.typesafe.akka" %% "akka-persistence" % "2.4.2", "org.iq80.leveldb" % "leveldb" % "0.7", "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8" )
src/main/resources
にapplication.conf
というファイル名で以下のような設定ファイルを作ります。LevelDBのJavaポートを使うように設定していますが、パフォーマンス的に問題があるためプロダクションでは推奨されていないようです。
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb" akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" akka.persistence.journal.leveldb.dir = "target/example/journal" akka.persistence.snapshot-store.local.dir = "target/example/snapshots" # DO NOT USE THIS IN PRODUCTION !!! # See also https://github.com/typesafehub/activator/issues/287 akka.persistence.journal.leveldb.native = false
アクターの実装は以下のような感じになります。通常のアクターとは大分様子が異なります。
import akka.persistence._ class SampleActor extends PersistentActor { override def persistenceId: String = "sample" var counter = 0 override def receiveRecover: Receive = { // 状態を復元 case SnapshotOffer(_, snapshot: Int) => counter = snapshot } override def receiveCommand: Receive = { // 状態を保存 case "snap" => saveSnapshot(counter) // 状態を変更 case "incr" => counter = counter + 1 // 状態を表示 case "print" => println(counter) } }
receiveCommand
メソッドで特定のメッセージを受け取った際にsaveSnapshot
で状態を保存しておき、receiveRecover
メソッドでSnapshotOffer
メッセージを受け取ったら状態を復元するようにします。
動作を確認してみます。
val system = ActorSystem("sample") val actor = system.actorOf(Props[SampleActor], "sample-actor") actor ! "print" actor ! "incr" actor ! "incr" actor ! "snap"
最初の"print"メッセージでは初期値の0が表示されます。その後2回"incr"メッセージで加算した後に"snap"メッセージで状態を保存しているので、このプログラムを次に起動すると最初の"print"メッセージでは2が表示されます。
このように保存・復元処理を自分で実装しなくてはならないので少々面倒ではありますが、Akka Persistenceを使うことでステートフルなアクターの状態を永続化しておくことができます。
次回はAkka Persistenceによるメッセージの永続化を試してみたいと思います。