Apache PredictionIOとApache Zeppelinを組み合わせてみる

Apache PredictionIOとApache ZeppelinはどちらもApache Software Foundationのプロジェクトです。PredictionIOはSparkML / MLlibベースの機械学習アプリケーションを開発・運用するためのプラットフォームを提供するもので、ZeppelinはSpark / SparkSQLを使用可能なノートブックを提供するものです。

PredictionIOは学習に使用するイベントデータをRDBMSやHBase、Elasticsearchに保存するのですが、これをZeppelin上でSQLを使って分析することができたら便利ではないかと思いやってみました。本当はPredictionIOが提供しているAPIをそのままライブラリとして使えたらよかったのですが、初期化処理などがPredictionIOのライフサイクルにべったり依存していて厳しそうだったので無理やりラップして簡単に使えるライブラリを作ってみました。

github.com

このライブラリはMavenセントラルにpublishしてあるのでZeppelinのノートブック上で以下のようにロードするだけで使用することができます。

%dep
z.load("com.github.takezoe:predictionio-toolbox_2.11:0.0.1")

以下のようにPIOToolboxクラスを使用してイベントデータをRDDとして取得し、SparkSQLで利用するためにテーブルとして登録します。

import com.github.takezoe.predictionio.toolbox._

// Create toolbox with PIO_HOME
val toolbox = PIOToolbox("/Users/naoki.takezoe/PredictionIO-0.12.1")
val eventsRDD = toolbox.find("MyApp1")(sc)

case class Rating(
  user: String,
  item: String,
  rating: Double
)

val ratingRDD = eventsRDD.map { event => 
  val ratingValue: Double = event.event match {
    case "rate" => event.properties.get[Double]("rating")
    case "buy" => 4.0 // map buy event to rating value of 4
    case _ => throw new Exception(s"Unexpected event ${event} is read.")
  }
  // entityId and targetEntityId is String
  Rating(event.entityId,
    event.targetEntityId.get,
    ratingValue)
}

val df = spark.createDataFrame(ratingRDD)
df.registerTempTable("rating")

するとSQLで検索することが可能になります。

%sql
select rating, count(*) from rating group by rating

f:id:takezoe:20180330140251p:plain

PIOToolboxクラスにはこの他にもRDDをイベントデータとして登録するメソッドなどがあります。利用可能なメソッドについてはソースコードを参照していただければと思います。

ノートブックというとJupyterの利用者が多いのではないかと思いますが、ZeppelinはデフォルトでScala / Spark / SparkSQLを利用することができるので、同じくScala / Sparkベースの機械学習プラットフォームであるPredictionIOとは相性が良さそうです。PredictionIOで機械学習アプリケーションを開発する場合、学習データのインポートや前処理などにZeppelinを使うのはありかもしれないと感じました。