Akka StreamsのElasticsearchコネクタでupdateやdeleteもできるようにした

AlpakkaにはElasticsearch用のコネクタも含まれていて、これは元々自分がコントリビュートしたものなのですが、これまではFlow/Sinkによるバルク書き込みはindexupsertにしか対応していませんでした。また、1つのストリームでindexupsertを混ぜることもできなかったのですが。最近わけあってupdatedeleteも含め1つのストリームで処理したい事情があったのでできるようにしてみました。

github.com

このプルリクはすでにマージされているのでAlpakkaの次のリリース(Alpakka 0.21)から利用可能になると思います。

使い方は以下のような感じで、indexupdateupsertdeleteそれぞれの操作に応じたメッセージを生成するだけです。すべてのメッセージをIncomingIndexMessageもしくはIncomingUpsertMessageにすればこれまでと同じ動作になります。

val requests = List[IncomingMessage[Book, NotUsed]](
  IncomingIndexMessage(id = "00001", source = Book("Book 1")),
  IncomingUpsertMessage(id = "00002", source = Book("Book 2")),
  IncomingUpsertMessage(id = "00003", source = Book("Book 3")),
  IncomingUpdateMessage(id = "00004", source = Book("Book 4")),
  IncomingDeleteMessage(id = "00002")
)

val f = Source(requests)
  .via(ElasticsearchFlow.create[Book]("books", "_doc"))
  .runWith(Sink.seq)

これでただ単に流れてきたデータを突っ込むだけでなく、更新や削除もできるようになるので、Akka Streams + Elasticsearchの用途がもう少し広がるのではないかと思います。