AWS Kinesisからの読み込みをAkka Streamsで行う

Akka Streams用の様々なコネクタを提供するAlpakkaですが、もちろんAWS Kinesis用のコネクタも含まれています。

しかし、書き込むためのFlow/Sinkは普通に使える感じのものなのですが、読み込むためのSourceはライセンスの問題からKCLを使ったコネクタを取り込むことができず、AWS SDKを使ったものしか含まれていません。このSourceは以下のような感じでシャード毎の設定を明示的に指定する必要があったり、チェックポイントの記録なども自前で行う必要があるなどそのまま使うにはちょっと厳しさがあります。

val mergeSettings = List(
  ShardSettings("myStreamName",
                "shard-id-1",
                ShardIteratorType.AT_SEQUENCE_NUMBER,
                startingSequenceNumber = Some("sequence"),
                refreshInterval = 1.second,
                limit = 500),
  ShardSettings("myStreamName",
                "shard-id-2",
                ShardIteratorType.AT_TIMESTAMP,
                atTimestamp = Some(new Date()),
                refreshInterval = 1.second,
                limit = 500)
)

val mergedSource: Source[Record, NotUsed] = 
  KinesisSource.basicMerge(mergeSettings, amazonKinesisAsync)

AlpakkaのドキュメントではKCLを使ったサードパーティのSourceの実装が2つ紹介されているのですが、以下のものを試してみました。

github.com

使い方はこんな感じです。InitialPositionInStream.LATESTを指定しているので未処理のレコードのみ取得されます。読み込んだレコードはmarkProcessed()メソッドで処理済みとしてマークしておくと、処理したところまでの分がチェックポイントとしてDynamoDBに書き込まれます。

val consumerConfig = new KinesisClientLibConfiguration(
    "test-app",
    "test-stream",
    new DefaultAWSCredentialsProviderChain(),
    "kinesis-worker"
  )
  .withRegionName("us-east-1")
  .withCallProcessRecordsEvenForEmptyRecordList(true)
  .withInitialPositionInStream(InitialPositionInStream.LATEST)

KinesisSource(consumerConfig).map { record =>
  try {
    ...
  } finally {
    record. markProcessed() // 処理済みとしてマーク
  }
}.runWith(Sink.ignore)

このコネクタは内部でメモリ上で未処理のレコードを管理しており、フラグを立て忘れたレコードがあると以降チェックポイントが書き込まれず、最終的にはOutOfMemoryになってしまうのでmarkProcessed()メソッドは必ず呼び出されるように実装しておく必要があります。

チェックポイントはデフォルトでは60秒間隔で書き込まれているようですが、このあたりの挙動はapplication.confで設定を行うことで変更できます。

com.contxt.kinesis.consumer.shard-checkpoint-config {
  checkpoint-period = 60 seconds
  checkpoint-after-processing-nr-of-records = 10000
  max-wait-for-completion-on-stream-shutdown = 10 seconds
}

ストリームを終了する際の挙動は少し悩ましいところで、ActorSystemのterminationに反応して未書き出しのチェックポイントを書き出すという終了処理を行なっているようです。そのためKillSwitchでストリームを終了させただけだと終了時のチェックポイントの書き出しが行われませんでした。

チェックポイントが最後まで書き込まれずに終了してしまうと次回起動時に前回処理済みのレコードが再度取得されてしまいます。ただ、チェックポイントは非同期で書き込んでいる以上、異常終了などでそのような状態になってしまうことは考えられます。チェックポイントの書き出し間隔をある程度短くしておき、受信側は同じメッセージを二重に受信しても冪等になるよう作っておくのがよいかもしれません。

動作を確認しつつソースコードにも目を通してみたのですが、GitHubのスターこそ少ないもののそれなりにしっかり作られたコネクタだと思います。Alpakkaのドキュメントでは以下のコネクタも紹介されています。こちらはチェックポイントの書き込みを自動で行うためのFlowが付属しているようです。機会があればこちらも試してみたいと思います。

github.com