Akka Streamsで巨大なXMLをストリーム処理する

Akka Streams用のコネクタを提供するAlpakkaにはXMLサポートも含まれており、XMLの読み込み・書き込みを行うためのFlowやSinkを利用することができます。

しかし読み込みはXMLのパースイベントをストリームするだけなので、実際には下流の処理で状態を管理したり、イベントをスタックするなどしてXMLの構造を判別する必要があります。複雑な構造のXMLの場合、これはかなりしんどいことになります。というわけでパスを指定するとその要素をorg.w3c.dom.Elementとして扱うことができるFlowを作ってみました。

github.com

このプルリクエストはすでにマージされていますので、Alpakkaの次のリリースから利用できるようになるかと思います。

使い方は以下のような感じでXmlParsing.parserと組み合わせて使います。ここではAkka Streams標準のFileIOを使ってファイルから読み込んでいますが、ByteStringを出力するコネクタであればなんでも使うことができます。

val graph = FileIO.fromPath(Paths.get("dataset.xml"))
  .via(XmlParsing.parser)
  .via(XmlParsing.subtree("doc" :: "elem" :: "item" :: Nil))
  .map { element =>
    ...
  }
  .to(Sink.ignore)

パースイベントのストリームと比べるとメモリ消費量は大きくなると思いますが、まあどっちみちXMLから情報を抽出するのであれば同じような処理を自分でやらないといけないと思いますし、DOM Elementなら他のXMLライブラリとも組み合わせて使いやすいかなと。

最近は巨大なデータを扱う場合はXMLではなく、より効率的なフォーマットを使うことが多いと思いますが、諸事情によりデータソースがXMLというケースもままあるのではないでしょうか。Akka StreamsとAlpakkaを使えば繰り返し構造を持つXMLを別のデータ形式に変換したり、データベースに格納するみたいな処理をシュッと書くことができるんじゃないかなと思います。

AWS Kinesisからの読み込みをAkka Streamsで行う

Akka Streams用の様々なコネクタを提供するAlpakkaですが、もちろんAWS Kinesis用のコネクタも含まれています。

しかし、書き込むためのFlow/Sinkは普通に使える感じのものなのですが、読み込むためのSourceはライセンスの問題からKCLを使ったコネクタを取り込むことができず、AWS SDKを使ったものしか含まれていません。このSourceは以下のような感じでシャード毎の設定を明示的に指定する必要があったり、チェックポイントの記録なども自前で行う必要があるなどそのまま使うにはちょっと厳しさがあります。

val mergeSettings = List(
  ShardSettings("myStreamName",
                "shard-id-1",
                ShardIteratorType.AT_SEQUENCE_NUMBER,
                startingSequenceNumber = Some("sequence"),
                refreshInterval = 1.second,
                limit = 500),
  ShardSettings("myStreamName",
                "shard-id-2",
                ShardIteratorType.AT_TIMESTAMP,
                atTimestamp = Some(new Date()),
                refreshInterval = 1.second,
                limit = 500)
)

val mergedSource: Source[Record, NotUsed] = 
  KinesisSource.basicMerge(mergeSettings, amazonKinesisAsync)

AlpakkaのドキュメントではKCLを使ったサードパーティのSourceの実装が2つ紹介されているのですが、以下のものを試してみました。

github.com

使い方はこんな感じです。InitialPositionInStream.LATESTを指定しているので未処理のレコードのみ取得されます。読み込んだレコードはmarkProcessed()メソッドで処理済みとしてマークしておくと、処理したところまでの分がチェックポイントとしてDynamoDBに書き込まれます。

val consumerConfig = new KinesisClientLibConfiguration(
    "test-app",
    "test-stream",
    new DefaultAWSCredentialsProviderChain(),
    "kinesis-worker"
  )
  .withRegionName("us-east-1")
  .withCallProcessRecordsEvenForEmptyRecordList(true)
  .withInitialPositionInStream(InitialPositionInStream.LATEST)

KinesisSource(consumerConfig).map { record =>
  try {
    ...
  } finally {
    record. markProcessed() // 処理済みとしてマーク
  }
}.runWith(Sink.ignore)

このコネクタは内部でメモリ上で未処理のレコードを管理しており、フラグを立て忘れたレコードがあると以降チェックポイントが書き込まれず、最終的にはOutOfMemoryになってしまうのでmarkProcessed()メソッドは必ず呼び出されるように実装しておく必要があります。

チェックポイントはデフォルトでは60秒間隔で書き込まれているようですが、このあたりの挙動はapplication.confで設定を行うことで変更できます。

com.contxt.kinesis.consumer.shard-checkpoint-config {
  checkpoint-period = 60 seconds
  checkpoint-after-processing-nr-of-records = 10000
  max-wait-for-completion-on-stream-shutdown = 10 seconds
}

ストリームを終了する際の挙動は少し悩ましいところで、ActorSystemのterminationに反応して未書き出しのチェックポイントを書き出すという終了処理を行なっているようです。そのためKillSwitchでストリームを終了させただけだと終了時のチェックポイントの書き出しが行われませんでした。

チェックポイントが最後まで書き込まれずに終了してしまうと次回起動時に前回処理済みのレコードが再度取得されてしまいます。ただ、チェックポイントは非同期で書き込んでいる以上、異常終了などでそのような状態になってしまうことは考えられます。チェックポイントの書き出し間隔をある程度短くしておき、受信側は同じメッセージを二重に受信しても冪等になるよう作っておくのがよいかもしれません。

動作を確認しつつソースコードにも目を通してみたのですが、GitHubのスターこそ少ないもののそれなりにしっかり作られたコネクタだと思います。Alpakkaのドキュメントでは以下のコネクタも紹介されています。こちらはチェックポイントの書き込みを自動で行うためのFlowが付属しているようです。機会があればこちらも試してみたいと思います。

github.com

GitBucket 4.26.0をリリースしました

Scalaで実装されたオープンソースのGitサーバ、GitBucket 4.26.0をリリースしました。

https://github.com/takezoe/gitbucket/releases/tag/4.26.0

セントラルレジストリからのプラグインインストール

先日運用を開始したプラグインレジストリからインターネット経由でプラグインをインストールできるようになりました。

f:id:takezoe:20180627120403p:plain

プラグインレジストリについては以下のエントリを参照していただければと思います。

takezoe.hatenablog.com

ダッシュボードにRepositoriesタブを追加

ダッシュボードにログイン中のユーザが参照可能なリポジトリの一覧を表示するRepositoriesを追加しました。

f:id:takezoe:20180627120534p:plain

これに伴い、ダッシュボードのサイドバーは非ログイン時もログイン時も最近更新されたリポジトリの一覧に固定されました(これまではログイン時は参照可能なリポジトリの一覧が表示されていました)。

Forkダイアログの改善

"Fork"ボタンがサイドバーから画面の右上のボタンに移動しました。このボタンをクリックすることでフォークするアカウントを選択するダイアログが表示されます。

f:id:takezoe:20180627120801p:plain

ダイアログはカードスタイルからリストスタイルに変更されています。長いアカウント名でも省略されなくなったため、目的のアカウントを見つけるのが容易になっています。また、"Show forks"ボタンをクリックするとフォークの一覧を表示することができます。

f:id:takezoe:20180627120933p:plain

クイックプルリクエストのサジェストを抑制

リポジトリビューアにはブランチからワンクリックでプルリクエスト作成画面に遷移できるサジェストが表示されますが、この表示条件を以下のように修正することにより、不要なサジェストの表示を抑制しました。

  • A last committer of the branch is the logged-in user
  • A last commit of the branch is within one hour
  • The branch isn't behind of the default branch

f:id:takezoe:20180627121139p:plain

未完了のタスクリストの表示

プルリクエストでは古いコミットに対するコメントはデフォルトでは折りたたまれて表示されますが、未完了のタスクリストを含むコメントについては折りたたまずに表示するようになりました。

f:id:takezoe:20180627121326p:plain

プラグイン向けに新しい通知フックを追加

通知に関して、以下のフックを追加しました。

  • assigned
  • closedByCommitComment

実際に通知を有効にするにはGitBucketのバージョンアップ後にgitbucket-notification-pluginを最新版に更新する必要があります。

今回のバージョンではこの他にも様々な改善やバグフィックスを行っています。詳細についてはIssueの一覧をご覧ください。

GitBucketのプラグインレジストリの運用を始めました

f:id:takezoe:20180618024108p:plain

GitBucketはプラグイン機構を持っているのですが、Jenkinsのようにインターネット経由でインストールできるようにしたいという考えはGitBucketを作り始めた頃からありました。また、実際にプラグイン機構を導入してみると、本体の修正にあわせてプラグインの修正が必要な場合の検出が難しかったり、ソースコードの修正は不要でもバイナリ互換性の問題で再コンパイルが必要な場合があり、プラグインのメンテナンスコストの点で問題を感じていました。

そこで、インターネット経由でプラグインをインストールするためのセントラルレジストリ兼ビルドファームをここ1ヶ月ほどコツコツ作っていたのですが、ようやく形になってきたので以下で運用を初めてみました。

https://plugins.gitbucket-community.org/

このサイトは以下の機能を持っています。

  • プラグインの開発中バージョン(master)がGitBucket本体のmasterでビルドが通るかチェックする
  • プラグインの最新のリリース版(最新のタグ)がGitBucket本体のmasterでビルドが通るかチェックする
  • プラグインまたはGitBucketに新しいタグが打たれるとビルドしてプラグインのjarファイルをダウンロード可能にする

プラグインは最新のタグのバージョンがGitBucketのリリース毎に自動的にビルドされるという形になります(jarファイル名にGitBucketのバージョンが含まれるようになります)。インストール時はお使いのGitBucketのバージョン用にビルドされたjarファイルを使用することでこれまでのようにプラグインが動作するGitBucketのバージョンを気にする必要はなくなります。

現在はGitBucketのorganizationで開発されているプラグインしか登録されていませんが、将来的にはサードパーティ製のプラグインも登録できるようにする予定です。プラグイン作者の方のメンテナンスコストの低減、ユーザの利便性の向上の両方に繋がるのではないかと思っています。

GitBucket側でこのレジストリからプラグインをインストールできるようにする機能もすでに実装してあるのですが、次のリリースに含めるかどうかはまだ検討しているところです。

github.com

ASUSのリアル店舗にスマートフォンを修理に出してみた

f:id:takezoe:20180617020005j:plain

ASUSのZenfone2というSIMフリー端末を3年ほど使っていたのですが、うっかり落としてディスプレイをバキバキに割ってしまい、バッテリーがだいぶヘタってきていたこともあり新しい端末に変えようと思い、価格重視で同じくASUSのZenfone4 Maxという端末を購入しました。

ローエンドの機種ですが、まあ自分の場合そんなに大した使い方をするわけではないので性能的には特に不満はないです。Zenfone2と比べるとディスプレイが小さく解像度も低いのでPCサイトを見ると字が潰れて読みにくくなったかなというくらいでしょうか。あとはZenfone2とSIMのサイズが違ったので(Zenfone2はMicroSIMだったのですがZenfone4 MaxはNanoSIM)SIM交換の手間がかかりました。BIC SIM(IIJ Mio)なので即時発行の手数料はかかりますがビックカメラの店頭で再発行してもらえるのは便利です。

ところが、使い始めて二週間ほどでいつの間にかメインカメラが動作しなくなってしまいました。購入直後に二段階認証の移行などでQRコードを撮影した記憶はあるので初期不良ではないと思われ、仕方なく修理に出すことにしました。海外メーカーの端末ですし、電話したり郵送したりするのは面倒だなーと思っていたのですが、調べたところどうやら赤坂にリアル店舗があり、修理も受け付けてくれているようです。職場からも近いのでWebサイトから予約して修理に出しに行ってみました。

k-tai.watch.impress.co.jp

店舗はアップルストアのような感じでだいぶお洒落な感じでした。様々なASUS製品やアクセサリの販売もしているのですが、やはりサポート・修理拠点としての意味合いが強いのでしょうか。自分以外にも数名のお客さんがいましたが、さほど待たされることもなく端末をチェックしてもらい、カメラの作動音からおそらくオートフォーカスの故障と思われるのでメインカメラの交換が必要、保証期間内なので無料で修理可能ですが2日かかるとのことでそのまま修理をお願いし、2日後無事受け取ることができました。

いままでPCなどではリアル店舗でのサポートの必要性を感じたことはなかったのですが(PCは修理に出しても職場には作業環境があるし、最悪数週間手元になくてもなんとかなる)、スマートフォン、特に非キャリア端末の場合代替機の貸し出しなどもないので国内で短期間で修理・受取が可能なのは便利だなと感じました。

今回のように安価な端末の場合は諦めて買い直すというのも選択肢の1つですが、ハイエンドの端末だとなかなかそうもいきません。電話サポートだと面倒なことになるケースも多いですが、ASUS Storeの店員さんは対応も丁寧でしたし、スキルもだいぶ高そうな感じでまったくストレスはありませんでした。特に直接店舗を訪問できる環境の方にとってはASUS製品を安心して購入できる材料になるのではないかと思います。

MacでThinkPad Bluetoothキーボードを使う場合のKarabiner Elementsの設定

職場で使っているMacBook Proの外付けキーボードをThinkPad Bluetoothキーボードにしてみました。

モノ自体はUSB接続の有線版と同じですが、Macで使う場合、ファンクションキーの入力にFnキーとのコンビネーションが必要なのを変更できなかったり、トラックポイントのスクロールボタンの挙動がちょっと違う感じがします。US配列ですが、Karabiner Elementsは以下のような設定にしてみました。

f:id:takezoe:20181103122212p:plain

f:id:takezoe:20180604113417p:plain

マウスのbutton3を適当なキーに変更することでChromeなどでトラックポイントスクロールしようとしたときに新しいウィンドウが開いてしまう問題を回避できます(USB版ではセンターボタンを単独でクリックした場合は新しいウィンドウが開いてしまうのですが、スクロールするときは開かなかったのであまり気になりませんでした)。ファンクションキーはどうしようもないみたいなので今のところ諦めています。

それとトラックポイントでのマウスカーソルの移動の感度がやや悪い気がするのですが、Mac側の設定でマウスカーソルの速度を最大にした上でトラックポイントのキャップをロープロファイル(昔のものと比べると少し背が低い)のものから通常の高さのキャップに交換することでいい感じになります。操作性がだいぶよくなるのでおすすめのカスタマイズです。

takezoe.hatenablog.com

やはりケーブルがないのはいいですね。特にキー入力の遅延も感じられませんし、有線版と同じ感覚で使用できています。ファンクションキーを多用する人だと厳しいかもしれませんが、自分はそんなでもないので今のところさほど不便はなさそうです。

GitBucket 4.25.0をリリースしました

Scalaで実装されたオープンソースのGitサーバ、GitBucket 4.25.0をリリースしました。

https://github.com/takezoe/gitbucket/releases/tag/4.25.0

セキュリティの改善

以前のバージョンのGitBucketにはKacper SzurekさんによるGitBucket 4.23.1 Unauthenticated Remote Code Executionという記事で解説されているようにいくつかのセキュリティ上の問題がありました。

リモートコードが実行可能な脆弱性はすでにGitBucket 4.24.0で修正済みでしたが今回のバージョンではセキュリティトークンの強度についても改善されています。Kacperさんの事前の連絡によって素早く修正を行うことができました。ありがとうございました。

また、この他にもパスワードのハッシュ強度などいくつかのセキュリティ上の改善が行われていますので、既存のバージョンをお使いの方はバージョンアップを強くお奨めします。

プロフィールページにメールアドレスを表示可能に

システム設定により、ユーザのプロフィールページにメールアドレスを表示できるようになりました。

f:id:takezoe:20180529111625p:plain

コミットコメントでもタスクリストが有効に

コミットコメントでもタスクリストが利用可能になりました。

f:id:takezoe:20180529111656p:plain

イシューやプルリクエストの変更履歴の強化

イシュー、プルリクエストのタイトルの変更が変更履歴として記録されるようになりました。

f:id:takezoe:20180529111640p:plain

公開キーを参照可能なエンドポイントを追加

/{user}.keys というエンドポイントでユーザの公開キーを参照できるようになりました。

リポジトリのダウンロード機能の改善

リポジトリのダウンロード機能に以下のような改善が加えられました。

  • LFS管理のファイルも含まれるように
  • リポジトリ全体だけでなく、特定のディレクトリをダウンロード可能に
  • ダウンロードファイル名改善(これまでは{branch}.zipでしたが{repository}-{branch}-{directory}.zipのようになりました。リポジトリのルートディレクトリの場合は{repository}-{branch}.zipのようになります)

今回のバージョンではこの他にも様々な改善やバグフィックスを行っています。詳細についてはIssueの一覧をご覧ください。