akka-http-contribというリポジトリにRetry
というユーティリティがあり、リトライ処理が簡単に書けるようだったので試しに見てみました。
テストケースによると使い方はこんな感じです。
def flow[T] = Flow.fromFunction[(Int, T), (Try[Int], T)] { case (i, j) if i % 2 == 0 => (Failure(new Exception(“cooked failure”)), j) case (i, j) => (Success(i + 1), j) } val (source, sink) = TestSource.probe[Int] .map(i => (i, i)) .via(Retry(flow[Int]) { s => if (s < 42) Some((s + 1, s + 1)) // recover else None // give up }) .toMat(TestSink.probe)(Keep.both) .run()
Retry.apply()
は以下の2つのパラメータを取ります。
- リトライさせたい
Flow
。このFlow
はTry
(Success
またはFailure
)を返すように実装しておく必要があります。 - 失敗した時のリカバリ用関数。この関数は
Option
(Some
またはNone
)を返すように実装しておく必要があります。
Flow
がFailure
を返した場合、リカバリ用の関数が呼ばれるという仕組みです。リカバリ用の関数がSome
を返した場合はその値でリトライされ、None
を返した場合はリトライをギブアップします。
簡単なものですが、処理に失敗する可能性のあるフローをシンプルにリトライ可能にしたい場合には便利なユーティリティだと思います。akka-stream-contribにはこの他にも便利なユーティリティがあるようなのでAkka Streamsを使っているのであれば一度チェックしてみるとよいと思います。