Akka Streamsで巨大なXMLをストリーム処理する

Akka Streams用のコネクタを提供するAlpakkaにはXMLサポートも含まれており、XMLの読み込み・書き込みを行うためのFlowやSinkを利用することができます。

しかし読み込みはXMLのパースイベントをストリームするだけなので、実際には下流の処理で状態を管理したり、イベントをスタックするなどしてXMLの構造を判別する必要があります。複雑な構造のXMLの場合、これはかなりしんどいことになります。というわけでパスを指定するとその要素をorg.w3c.dom.Elementとして扱うことができるFlowを作ってみました。

github.com

このプルリクエストはすでにマージされていますので、Alpakkaの次のリリースから利用できるようになるかと思います。

使い方は以下のような感じでXmlParsing.parserと組み合わせて使います。ここではAkka Streams標準のFileIOを使ってファイルから読み込んでいますが、ByteStringを出力するコネクタであればなんでも使うことができます。

val graph = FileIO.fromPath(Paths.get("dataset.xml"))
  .via(XmlParsing.parser)
  .via(XmlParsing.subtree("doc" :: "elem" :: "item" :: Nil))
  .map { element =>
    ...
  }
  .to(Sink.ignore)

パースイベントのストリームと比べるとメモリ消費量は大きくなると思いますが、まあどっちみちXMLから情報を抽出するのであれば同じような処理を自分でやらないといけないと思いますし、DOM Elementなら他のXMLライブラリとも組み合わせて使いやすいかなと。

最近は巨大なデータを扱う場合はXMLではなく、より効率的なフォーマットを使うことが多いと思いますが、諸事情によりデータソースがXMLというケースもままあるのではないでしょうか。Akka StreamsとAlpakkaを使えば繰り返し構造を持つXMLを別のデータ形式に変換したり、データベースに格納するみたいな処理をシュッと書くことができるんじゃないかなと思います。