Akka Streams用のElasticsearchコネクタを作ってみた

github.com

元々はAlpakkaにプルリクしていたのですが、取り込んでもらうのに時間がかかりそう or 取り込んでもらえるかわからない感じなので自分で使う用に別リポジトリで単独のライブラリとして公開しました。Maven Centralにもpublishしてあります。

Elasticsearchからスクロールスキャンでデータの読み込みを行うSourceと、バルクでデータの投入を行うSink(Flowもあります)を提供しています。使い方は以下のような感じです。ElasticsearchのRestClientをimplicit valで定義しておく必要があります。

implicit val client = RestClient.builder(
  new HttpHost("localhost", 9201)).build()

val f1 = ElasticsearchSource(
  "source",
  "book",
  """{"match_all": {}}""",
  ElasticsearchSourceSettings(5)
)
.map { message: OutgoingMessage[JsObject] =>
  IncomingMessage(Some(message.id), message.source)
}
.runWith(
  ElasticsearchSink(
    "sink1",
    "book",
    ElasticsearchSinkSettings(5)
  )
)

SourceはOutgoingMessage[JsObject]で出力し、SinkはIncomingMessage[JsObject]でデータを受け取ります。JSONライブラリとしてはspray-jsonを使っています(Java向けのAPIも提供しており、そちらではJacksonを使っています)。

spray-jsonのFormatを定義することで、JsObjectではなくケースクラスなどでデータを扱うこともできます。

case class Book(title: String)
implicit val format = jsonFormat1(Book)

val f1 = ElasticsearchSource
  .typed[Book](
    "source",
    "book",
    """{"match_all": {}}""",
    ElasticsearchSourceSettings(5)
  )
  .map { message: OutgoingMessage[Book] =>
    IncomingMessage(Some(message.id), message.source)
  }
  .runWith(
    ElasticsearchSink.typed[Book](
      "sink2",
      "book",
      ElasticsearchSinkSettings(5)
    )
  )

Sourceはスクロールスキャンで読み込んだドキュメントをバッファリングしており下流からpullされた分だけpushし、バッファが空になるとElasticsearchから次のページの分を取得するという動きになっています。Sinkもバッファを持っており、バッファに溜まった分をElasticsearchにバルクリクエストでインデキシングするという感じになっています。Sink側の挙動に関しては改善の余地があるかなと思っています。