Databricks社がOSS化したSpark用ストレージレイヤ「Delta Lake」について

先日開催されたSpark + AI Summit 2019にあわせてDatabricks社からSpark用のストレージレイヤ「Delta Lake」のOSS化が発表されました。

databricks.com

GitHubリポジトリはこちら。

github.com

Delta LakeはSparkのライブラリとして実装されており、分散ストレージ上で以下のような機能を提供します。

実際に動かしてみる

Delta Lakeの動作にはSpark 2.4.2以降が必要です。ローカルファイルシステムでも動作するのでspark-shellで動きを確認してみました。

$ bin/spark-shell --packages io.delta:delta-core_2.12:0.1.0

まずはテーブルへの書き込みと読み込みをやってみます。

scala> val df1 = Seq((9, "Lacazette", "France"), (14, "Aubameyang", "Gabon"), (17, "Iwobi", "Nigeria"), (23, "Welbeck", "England")).toDF("number", "name", "country")
df: org.apache.spark.sql.DataFrame = [number: int, name: string ... 1 more field]

scala> df1.write.format("delta").save("/tmp/arsenal")

scala> val df2 = spark.read.format("delta").load("/tmp/arsenal")
df: org.apache.spark.sql.DataFrame = [number: int, name: string ... 1 more field]

scala> df2.show()
+------+----------+-------+
|number|      name|country|
+------+----------+-------+
|    14|Aubameyang|  Gabon|
|     9| Lacazette| France|
|    23|   Welbeck|England|
|    17|     Iwobi|Nigeria|
+------+----------+-------+

追記してみます。

scala> val df3 = Seq((10, "Ozil", "Germany"), (11, "Torreira", "Uruguay"), (15, "Maitland-Niles", "England"), (34, "Xhaka", "Swizerland")).toDF("number", "name", "country")
df3: org.apache.spark.sql.DataFrame = [number: int, name: string ... 1 more field]

scala> df3.write.format("delta").mode("append").save("/tmp/arsenal")

scala> val df4 = spark.read.format("delta").load("/tmp/arsenal")
df4: org.apache.spark.sql.DataFrame = [number: int, name: string ... 1 more field]

scala> df4.show()
+------+--------------+----------+
|number|          name|   country|
+------+--------------+----------+
|    15|Maitland-Niles|   England|
|    34|         Xhaka|Swizerland|
|    14|    Aubameyang|     Gabon|
|    11|      Torreira|   Uruguay|
|     9|     Lacazette|    France|
|    23|       Welbeck|   England|
|    17|         Iwobi|   Nigeria|
|    10|          Ozil|   Germany|
+------+--------------+----------+

タイムトラベルで最初のバージョンのテーブルを参照してみます。ここではversionAsOfというオプションでバージョンを指定していますが、timestampAsOfというオプションで日付を指定することもできるようです。

scala> val df5 = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/arsenal")
df5: org.apache.spark.sql.DataFrame = [number: int, name: string ... 1 more field]

scala> df5.show()
+------+----------+-------+
|number|      name|country|
+------+----------+-------+
|    14|Aubameyang|  Gabon|
|     9| Lacazette| France|
|    23|   Welbeck|England|
|    17|     Iwobi|Nigeria|
+------+----------+-------+

スキーマバリデーションを試してみます。数値のフィールドに文字列を入れようとするとこんな感じでエラーになります。

scala> val df6 = Seq(("8", "Ramsey", "Wales")).toDF("number", "name", "country")
df6: org.apache.spark.sql.DataFrame = [number: string, name: string ... 1 more field]

scala> df6.write.format("delta").mode("append").save("/tmp/arsenal")
org.apache.spark.sql.AnalysisException: Failed to merge fields 'number' and 'number'. Failed to merge incompatible data types IntegerType and StringType;;
  at org.apache.spark.sql.delta.schema.SchemaUtils$.$anonfun$mergeSchemas$1(SchemaUtils.scala:526)
  ...

また、存在しないカラムを追加しようとすると以下のようなエラーになります。エラーメッセージにあるようにmergeSchemaオプションを付けるとスキーマを変更できます。

scala> val df6 = Seq((8, "Ramsey", "Wales", "Juventus")).toDF("number", "name", "country", "next_club")
df6: org.apache.spark.sql.DataFrame = [number: int, name: string ... 2 more fields]

scala> df6.write.format("delta").mode("append").save("/tmp/arsenal")
org.apache.spark.sql.AnalysisException: A schema mismatch detected when writing to the Delta table.
To enable schema migration, please set:
'.option("mergeSchema", "true")'.

Table schema:
root
-- number: integer (nullable = true)
-- name: string (nullable = true)
-- country: string (nullable = true)


Data schema:
root
-- number: integer (nullable = true)
-- name: string (nullable = true)
-- country: string (nullable = true)
-- next_club: string (nullable = true)


If Table ACLs are enabled, these options will be ignored. Please use the ALTER TABLE
command for changing the schema.
...

ファイルレイアウトや内部動作について

ファイルはテーブル毎に以下のようなレイアウトで書き込まれます。

f:id:takezoe:20190503131318p:plain

データはコミット毎に追加分がParquetで書き込まれ、_delta_logディレクトリに差分情報がJSON形式で保存されます。読み込む際は差分ログをリプレイして必要なParquetファイルを読み込みます。この仕組みによってACIDトランザクションJSONファイルの書き込みによってコミット)とタイムトラベル(JSONの差分ログをリプレイすることで任意の時点のデータを復元)が可能になっています。なお、トランザクションには楽観的排他制御が使用されており、別トランザクションで先に書き込みが行われた場合はトランザクションが失敗するようになっています。

このように差分を積み重ねていく方式のため、一度作成したParquetファイルを更新する必要がなく高速に書き込みを行うことができます。また、スキーマなどのメタデータJSONの差分ログ内に書き込まれるので、中央集約型のメタデータストアがボトルネックになることはないという設計のようです。

ただ、これだけだと履歴が増えるとParquetファイルも増えるので当然読み込みが遅くなります。そのため、_delta_logディレクトリ内にチェックポイント毎にスナップショットのParquetファイルが作成され、スナップショットが存在する場合は実際には直近のスナップショットに対してそれ以降の差分を適用するという動作になります。スナップショットはデフォルトでは10コミット毎に作成されますが、これはテーブルの性質によって調整する必要があると思います。

まとめ

Delta LakeはSparkに特化していることもあり、意外とコンパクトな実装になっています。スナップショットの作成などもSparkの機能が活用されており、実装としては思っていたよりもシンプルなものでした。今後レコードの更新・削除やデータのバリデーションなどもサポート予定のようです。

スナップショットをどんどん作っていくのでストレージの消費が大きくなりそうなのと、Spark(の特定バージョン)への依存性といったあたりが懸念材料でしょうか。ファイルレイアウトや仕組み自体はSparkに特化したものではないので他の分散コンピューティングエンジン向けのドライバを書くこともできそうです。

まだざっとコードを読みつつ手元で軽く動かしてみた程度なので見落としている部分もあるかもしれませんが、ひとまずこんなところです。ソースを読み込みつつもう少し研究してみたいと思います。