Akka Streamsで簡単にリトライするFlowを作る

akka-http-contribというリポジトリRetryというユーティリティがあり、リトライ処理が簡単に書けるようだったので試しに見てみました。

github.com

テストケースによると使い方はこんな感じです。

def flow[T] = Flow.fromFunction[(Int, T), (Try[Int], T)] {
  case (i, j) if i % 2 == 0 
    => (Failure(new Exception(“cooked failure”)), j)
  case (i, j)
    => (Success(i + 1), j)
}
val (source, sink) = TestSource.probe[Int]
  .map(i => (i, i))
  .via(Retry(flow[Int]) { s =>
    if (s < 42) Some((s + 1, s + 1)) // recover
    else None // give up
  })
  .toMat(TestSink.probe)(Keep.both)
  .run()

Retry.apply()は以下の2つのパラメータを取ります。

  • リトライさせたいFlow。このFlowTrySuccessまたはFailure)を返すように実装しておく必要があります。
  • 失敗した時のリカバリ用関数。この関数はOptionSomeまたはNone)を返すように実装しておく必要があります。

FlowFailureを返した場合、リカバリ用の関数が呼ばれるという仕組みです。リカバリ用の関数がSomeを返した場合はその値でリトライされ、Noneを返した場合はリトライをギブアップします。

簡単なものですが、処理に失敗する可能性のあるフローをシンプルにリトライ可能にしたい場合には便利なユーティリティだと思います。akka-stream-contribにはこの他にも便利なユーティリティがあるようなのでAkka Streamsを使っているのであれば一度チェックしてみるとよいと思います。