前回はAkka Persistenceでアクターの状態を保存するということをやってみましたが、今回は「メッセージを永続化することでメッセージがアクターに到達することを保証する」ということをやってみたいと思います。
アクターはこんな感じになります。
class SampleActor extends PersistentActor with AtLeastOnceDelivery { override def persistenceId: String = "sample" var state: State = ... override def receiveRecover: Receive = { case SnapshotOffer(_, snapshot: Int) => state = snapshot // リストアされたメッセージを処理 case e: Event => { updateState(e) } } override def receiveCommand: Receive = { case "snap" => saveSnapshot(state) // メッセージを永続化してから処理 case e: Event => persist(e){ x => updateState(e) } } private def updateState(e: Event): Unit = { ... } }
ポイントはアクターにAtLeastOnceDelivery
トレイトをミックスインしているのと、persist
メソッドでメッセージを永続化してから処理を行っているところです。
ここでは1つのアクター内に閉じていますが、他のアクターに送信するメッセージを送信前に永続化しておくことで、メッセージの到達前にアクターがクラッシュした場合でもリストアされたメッセージが配信されることになります。
なお、処理されたメッセージは、処理後にスナップショットが取られたタイミングでパージされるようです。なのでメッセージが処理済みでもスナップショットが取得される前にアクターがクラッシュしてしまうと再度メッセージが配信されてしまいます。
AtLeastOnceDelivery
という名前の通り、最低1度メッセージが配信されることは保証されますが、重複したメッセージが配信されることもあり得るので、同じメッセージが二重に配信されても問題ないようアクターを実装しておく必要があるということになりますね。