AlpakkaにはElasticsearch用のコネクタも含まれていて、これは元々自分がコントリビュートしたものなのですが、これまではFlow/Sinkによるバルク書き込みはindex
とupsert
にしか対応していませんでした。また、1つのストリームでindex
とupsert
を混ぜることもできなかったのですが、最近わけあってupdate
とdelete
も含め1つのストリームで処理したい事情があったのでできるようにしてみました。
このプルリクはすでにマージされているのでAlpakkaの次のリリース(Alpakka 0.21)から利用可能になると思います。
使い方は以下のような感じで、index
、update
、upsert
、delete
それぞれの操作に応じたメッセージを生成するだけです。すべてのメッセージをIncomingIndexMessage
もしくはIncomingUpsertMessage
にすればこれまでと同じ動作になります。
val requests = List[IncomingMessage[Book, NotUsed]]( IncomingIndexMessage(id = "00001", source = Book("Book 1")), IncomingUpsertMessage(id = "00002", source = Book("Book 2")), IncomingUpsertMessage(id = "00003", source = Book("Book 3")), IncomingUpdateMessage(id = "00004", source = Book("Book 4")), IncomingDeleteMessage(id = "00002") ) val f = Source(requests) .via(ElasticsearchFlow.create[Book]("books", "_doc")) .runWith(Sink.seq)
これでただ単に流れてきたデータを突っ込むだけでなく、更新や削除もできるようになるので、Akka Streams + Elasticsearchの用途がもう少し広がるのではないかと思います。