読者です 読者をやめる 読者になる 読者になる

Akka Persistenceでメッセージの到達保証をしてみる

前回はAkka Persistenceでアクターの状態を保存するということをやってみましたが、今回は「メッセージを永続化することでメッセージがアクターに到達することを保証する」ということをやってみたいと思います。

アクターはこんな感じになります。

class SampleActor extends PersistentActor with AtLeastOnceDelivery {

  override def persistenceId: String = "sample"

  var state: State = ...

  override def receiveRecover: Receive = {
    case SnapshotOffer(_, snapshot: Int) => state = snapshot
    // リストアされたメッセージを処理
    case e: Event  => {
      updateState(e)
    }
  }

  override def receiveCommand: Receive = {
    case "snap"  => saveSnapshot(state)
    // メッセージを永続化してから処理
    case e: Event => persist(e){ x =>
      updateState(e)
    }
  }

  private def updateState(e: Event): Unit = {
    ...
  }

}

ポイントはアクターにAtLeastOnceDeliveryトレイトをミックスインしているのと、persistメソッドでメッセージを永続化してから処理を行っているところです。

ここでは1つのアクター内に閉じていますが、他のアクターに送信するメッセージを送信前に永続化しておくことで、メッセージの到達前にアクターがクラッシュした場合でもリストアされたメッセージが配信されることになります。

なお、処理されたメッセージは、処理後にスナップショットが取られたタイミングでパージされるようです。なのでメッセージが処理済みでもスナップショットが取得される前にアクターがクラッシュしてしまうと再度メッセージが配信されてしまいます。

AtLeastOnceDeliveryという名前の通り、最低1度メッセージが配信されることは保証されますが、重複したメッセージが配信されることもあり得るので、同じメッセージが二重に配信されても問題ないようアクターを実装しておく必要があるということになりますね。