Netflixのオープンソースソフトウェア

ここのところHystrixについて調べていたのですが、Netflixは他にもGitHub上で様々なOSSを公開しています。

github.com

Javaのものが中心ですがPythonやGo、Cで書かれているものもあります。ライブラリ的なものからミドルウェアや運用ツールまで多岐に渡っており、NetflixAWSを利用しているということもありAWS上での利用に特化したものもあります。また各プロダクトのドキュメントもしっかりしており、以下のような専用のサイトも立ち上げられており、社内で開発したものを積極的にOSS化するという方針が伺えます。

netflix.github.io

HystrixやEurekaなどを筆頭に有名なものも多いのですが、なにぶん数が多くどのようなものがあるのかを把握するのも割と一苦労な感じなのですが、Netflixでは自社のOSSを紹介するMeetupが継続的に開催されているらしく、そのスライドを集めてみました。

www.slideshare.net

www.slideshare.net

www.slideshare.net

www.slideshare.net

www.slideshare.net

www.slideshare.net

www.slideshare.net

2014年のスライドですが、初期の頃からあるプロダクトについては以下が参考になります。

www.slideshare.net

日本語では以下の記事でまとめられています。

d.hatena.ne.jp

wazanova.jp

HystrixをScala / Playアプリケーションから使ってみる

前回はHystrixの簡単な紹介を書きました。

takezoe.hatenablog.com

HystrixはJavaライブラリなのでもちろんScalaからも使うことができるのですが、そのままだと若干Scalaからは使いにくい部分もあります。今回はScala(主にPlay Framework)でHystrixを使う場合について書いてみたいと思います。

Playにモニタリング用のエンドポイントを追加する

HystrixをScalaで使うには普通にHystrixCommandをScalaで実装してそれを呼び出せばよいのですが、問題はダッシュボードからのモニタリング用のエンドポイントが標準ではサーブレットコンテナ用のものしか用意されていないということです。

探してみたところ、PlayにHystrixを組み込むサンプルを作っている方がいました。

github.com

上記のリポジトリにあるHystrixSupportというコントローラがモニタリング用のエンドポイントの実装になります。ただ、このサンプルはPlayのバージョンが古く(2.3 系)、最新のPlay 2.5ではそのままでは動かなかったり警告が出たりします。そこでこのHystrixSupportをPlay 2.5用に修正してみました。

package controllers

import play.api.mvc._
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import com.netflix.config.scala.DynamicIntProperty
import com.netflix.hystrix.contrib.metrics.eventstream.HystrixMetricsPoller
import play.api.Logger
import play.api.libs.iteratee.Enumerator
import java.util.concurrent.TimeUnit
import scala.annotation.tailrec
import scala.concurrent.duration.FiniteDuration
import akka.actor.ActorSystem
import java.io.OutputStream
import javax.inject.Inject
import akka.stream.scaladsl.Source
import play.api.libs.streams.Streams
import scala.util.Try

class HystrixSupport @Inject()(system: ActorSystem) extends Controller {
  import play.api.libs.concurrent.Execution.Implicits.defaultContext

  def stream(delayOpt: Option[Int]) = Action {

    val numberConnections = concurrentConnections.incrementAndGet()
    val maxConnections = maxConcurrentConnections.get

    Some(numberConnections).
      filter(_ <= maxConnections).
      map(_ => delayOpt.getOrElse(500)).
      fold(unavailable(maxConnections)) { delay =>
        val source = Source.fromPublisher(Streams.enumeratorToPublisher(streamRequest(delay)))
        Ok.chunked(source).withHeaders(
          "Content-Type" -> "text/event-stream;charset=UTF-8",
          "Cache-Control" -> "no-cache, no-store, max-age=0, must-revalidate",
          "Pragma" -> "no-cache"
        )
      }
  }

  private[this] def unavailable(max: Int) = {
    concurrentConnections.decrementAndGet()
    ServiceUnavailable(s"MaxConcurrentConnections reached: $maxConcurrentConnections")
  }

  private[this] final val concurrentConnections: AtomicInteger = new AtomicInteger(0)
  private[this] final val maxConcurrentConnections: DynamicIntProperty =
    new DynamicIntProperty("hystrix.stream.maxConcurrentConnections", 5)

  private[this] def streamRequest(delay: Int): Enumerator[Array[Byte]] = {
    val listener = new MetricJsonListener(1000)
    val poller = new HystrixMetricsPoller(listener, delay)
    poller.start()
    Logger.info("Starting poller")

    val delayDuration = FiniteDuration(delay, TimeUnit.MILLISECONDS)
    //val system = Akka.system

    val streamer = (out: OutputStream) => produceStream(poller, listener, delayDuration, system, out)
    val closer = () => {
      Logger.info("Closing poller")
      poller.shutdown()
      concurrentConnections.decrementAndGet()
    }

    val enum = Enumerator.outputStream(streamer)
    enum.onDoneEnumerating(closer())
  }

  private[this] def produceStream(poller: HystrixMetricsPoller, listener: MetricJsonListener, delay: FiniteDuration, system: ActorSystem, out: OutputStream): Unit = {
    val strings = produce(poller, listener)
    if (strings.isEmpty) {
      out.flush()
      out.close()
    }
    else {
      strings.foreach(s => out.write(s"$s\n\n".getBytes("UTF-8")))
      out.flush()
      Try(system.scheduler.scheduleOnce(delay)(produceStream(poller, listener, delay, system, out)))
    }
  }

  private[this] def produce(poller: HystrixMetricsPoller, listener: MetricJsonListener): Vector[String] = {
    if (!poller.isRunning) Vector()
    else {
      val jsonMessages = listener.getJsonMetrics

      if (jsonMessages.isEmpty) Vector("ping: ")
      else jsonMessages.map(j => s"data: $j")
    }
  }

  private class MetricJsonListener(capacity: Int) extends HystrixMetricsPoller.MetricsAsJsonPollerListener {

    private[this] final val metrics = new AtomicReference[Vector[String]](Vector())

    @tailrec
    private[this] final def set(oldValue: Vector[String], newValue: Vector[String]): Boolean = {
      metrics.compareAndSet(oldValue, newValue) || set(oldValue, newValue)
    }

    private[this] final def getAndSet(newValue: Vector[String]): Vector[String] = {
      val oldValue = metrics.get
      set(oldValue, newValue)
      oldValue
    }

    def handleJsonMetric(json: String): Unit = {
      val oldMetrics = metrics.get()
      if (oldMetrics.size >= capacity) throw new IllegalStateException("Queue full")

      val newMetrics = oldMetrics :+ json
      set(oldMetrics, newMetrics)
    }

    def getJsonMetrics: Vector[String] = getAndSet(Vector())
  }
}

Playアプリケーションに組み込むにはまずbuild.sbtに以下の依存関係を追加します。

libraryDependencies ++= Seq(
  "com.netflix.archaius"  % "archaius-scala"                  % "0.7.4",
  "com.netflix.hystrix"   % "hystrix-core"                    % "1.5.3",
  "com.netflix.hystrix"   % "hystrix-metrics-event-stream"    % "1.5.3"
)

それからroutesに以下のルーティングを追加します。

GET    /hystrix.stream    controllers.HystrixSupport.stream(delay: Option[Int])

このエンドポイントをダッシュボードでモニタリングすればOKです。

HystrixのコマンドをFutureに変換する

上記のPlayでのサンプルの中にHystrixのコマンドをScalaのFutureに変換するためのimplicit classがあるのですが、これを少し改変してみました。

package util

import scala.concurrent.{Future, Promise}
import com.netflix.hystrix.HystrixObservable

object Futures {

  private class ForPromiseObserver[T](p: Promise[T]) extends rx.Observer[T] {
    def onNext(t: T): Unit = p.trySuccess(t)
    def onError(e: Throwable): Unit = p.tryFailure(e)
    def onCompleted(): Unit = ()
  }

  implicit final class HystrixCommandWithScalaFuture[T](val cmd: HystrixObservable[T]) extends AnyVal {
    def future: Future[T] = {
      val promise = Promise[T]()
      val observer = new ForPromiseObserver(promise)

      cmd.observe().subscribe(observer)

      promise.future
    }
  }
}

以下のような感じでfutureメソッドでコマンドをFutureに変換できます。

import utils.Futures._

class Application() extends Controller {
  def index = Action.async {
    new CommandHelloWorld("World").future.map(Ok(_))
  }
}

FutureをHystrixのコマンドに変換する

同期的な処理をScalaFutureに合成したい場合は前述のimplicit classでよいのですが、もともとFutureを返すScalaライブラリにHystrixを適用したい場合はもう少し工夫が必要になります。以下のようなアダプタを作ってみました。

package utils

import com.netflix.hystrix.{HystrixCommandGroupKey, HystrixObservableCommand}
import rx.Observable
import rx.lang.scala.subjects.ReplaySubject
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

abstract class HystrixFutureCommand[T](groupKey: HystrixCommandGroupKey)(implicit ec: ExecutionContext) extends HystrixObservableCommand[T](groupKey) {

  override def construct(): Observable[T] = {
    val channel = ReplaySubject[T]()

    run().onComplete {
      case Success(v) => {
        channel.onNext(v)
        channel.onCompleted()
      }
      case Failure(t) => {
        channel.onError(t)
        channel.onCompleted()
      }
    }

    channel.asJavaSubject
  }

  def run(): Future[T]

}

こんな感じで使います。

class FutureHelloCommand extends HystrixFutureCommand[String](HystrixCommandGroupKey.Factory.asKey("HelloWorldAsync")){

  def run(): Future[String] ={
    Future {
      "Hello World!"
    }
  }

  override protected def resumeWithFallback(): Observable[String] = {
    Observable.from(Array("resume!"))
  }
}

エラー処理を行う場合、Future#recover()でフォールバック処理を行ってしまうとHystrixでエラーのハンドリングができないので、コマンド側のフォールバック機構を使用する必要があるという点に注意が必要です。

まとめ

HystrixはJava用のライブラリですが、簡単なアダプタを用意することでScalaでもさほど違和感なく使用することができます。

ただ、使用するフレームワークによってはモニタリング用のエンドポイントを自前で実装しないといけないのでちょっと大変かもしれません。また、ScalaFutureを使っている場合、スレッドプールの管理やエラーハンドリングなどHystrixの機能と被る部分があるため効率が悪かったり方針の整理が必要になりそうです。ScalaFutureとHystrixをシームレスに統合できる方法を考えられるともっと使いやすくなりそうです。

なお、今回Play 2.5用に修正したり新たに追加したものについては整理して冒頭で紹介したPlay + Hystrixのサンプルプロジェクトにプルリクエストしておきました。

github.com

マイクロサービスにレジリエンスをもたらすHystrixを試してみる

github.com

HystrixはNetflixが開発しているオープンソースJavaライブラリで、主として分散システムにおけるサービス間のやり取りをラップして以下のような機能を提供します。

例えば外部サービスに障害があり呼び出しがエラーになる場合や、処理に時間がかかった場合にフォールバック値を返すことで呼び出し元の処理を継続できたり、外部サービスの呼び出しでエラーが多発する場合は一時的に呼び出しをショートカットして呼び出し先の回復を待ったりといった制御を行うことができます。

マイクロサービスではあるサービスの障害や遅延が他のサービスに波及する危険性がありますが。このような仕組みをサービス間の通信に導入することでサービス全体のレジリエンスの向上が期待できます。

今回はHystrixの基本的な使い方とダッシュボードに表示するところまでをやってみたので簡単に紹介したいと思います。

基本的な使い方

pom.xmlに以下の依存関係を追加します。

<dependency>
  <groupId>com.netflix.hystrix</groupId>
  <artifactId>hystrix-core</artifactId>
  <version>1.5.3</version>
</dependency>

まずはHystrixのドキュメントにある簡単な例を見てみましょう。以下のように実行したい処理をHystrixCommandのサブクラスとして実装します。

public class CommandHelloWorld extends HystrixCommand<String> {

  private final String name;

  public CommandHelloWorld(String name) {
    super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
    this.name = name;
  }

  @Override
  protected String run() {
    return "Hello " + name + "!";
  }
}

ここではサンプルということで単に文字列を返す処理を行っていますが、実際は他のサービスとの通信をこのクラスで行うことになります。

コマンドは以下のようにいくつかの呼び出し方があります。

// ブロックして結果を取得
String s = new CommandHelloWorld("Bob").execute();

// Futureを取得
Future<String> f = new CommandHelloWorld("Bob").queue();

// コールバックで結果を取得
Observable<String> o = new CommandHelloWorld("Bob").observe();
o.subscribe((s) -> {
  System.out.println("onNext: " + s);
});

エラー時にも値を返すようにする

getFallback()メソッドを実装しておくとコマンドの処理で例外が発生した場合、代わりにそのメソッドの戻り値を返すことができます。

public class CommandHelloWorld extends HystrixCommand<String> {

  private final String name;

  public CommandHelloWorld(String name) {
    super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
    this.name = name;
  }

  @Override
  protected String run() {
    throw new RuntimeException();
  }

  @Override
  protected String getFallback() {
    return "dummy message";
  }
}

処理に時間がかかる場合はタイムアウトさせる

以下のようにコマンド生成時にコンストラクタの引数としてタイムアウト値を指定することができます。

public CommandHelloWorld(String name) {
  super(Setter
    .withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
    .andCommandPropertiesDefaults(
      HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(1000)
    ));
  this.name = name;
}

コマンドの処理が指定時間以内に完了しない場合はgetFallback()メソッドの戻り値が返されます。getFallback()メソッドが実装されていない場合はHystrixRuntimeExceptionがスローされます。

値をキャッシュする

getCacheKey()メソッドでキャッシュキーを返すようにしておくと同じキーを返す呼び出しに対してはキャッシュした値を返すようにできます。

public class CommandHelloWorld extends HystrixCommand<String> {

  private final String name;

  public CommandHelloWorld(String name) {
    super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
    this.name = name;
  }

  @Override
  protected String run() {
    return "Hello " + name + "!";
  }

  @Override
  protected String getCacheKey() {
    return this.name;
  }
}

ただし、キャッシュを使用する場合はHystrixRequestContextを生成しておく必要があります。サーブレットフィルタでコンテキストの初期化とシャットダウンを行うことが想定されているようです。

HystrixRequestContext context = HystrixRequestContext.initializeContext();

ドキュメントを見てもキャッシュに関する設定項目はキャッシュを使うかどうかだけでキャッシュの有効期限やエントリ数などを細かく制御することはできないのでしょうか?

ダッシュボードでモニタリングする

アプリケーションにメトリクスを取得するためのエンドポイントを組み込むことでダッシュボードに表示することもできます。hystrix-metrics-event-streamモジュールでサーブレットコンテナ用のエンドポイント実装が提供されているのでこれを使ってみます。

まずは前述のコマンドを呼び出すWebアプリケーションを作成しておきます。直接サーブレットに実装してもよいですし、サーブレットコンテナ上で稼働するものであればフレームワークを使ったものでも構いません。そのうえでpom.xmlに以下の依存関係を追加します。

<dependency>
  <groupId>com.netflix.hystrix</groupId>
  <artifactId>hystrix-metrics-event-stream</artifactId>
  <version>1.5.3</version>
</dependency>

次に作成したアプリケーションにHytrixのダッシュボードからメトリックを取得するためのエンドポイントを追加するためにweb.xmlに以下の設定を追加します。

<servlet>
  <description></description>
  <display-name>HystrixMetricsStreamServlet</display-name>
  <servlet-name>HystrixMetricsStreamServlet</servlet-name>
  <servlet-class>com.netflix.hystrix.contrib.metrics.eventstream.HystrixMetricsStreamServlet</servlet-class>
</servlet>

<servlet-mapping>
  <servlet-name>HystrixMetricsStreamServlet</servlet-name>
  <url-pattern>/hystrix.stream</url-pattern>
</servlet-mapping>

Hystrixのダッシュボードを起動します。

$ git clone https://github.com/Netflix/Hystrix.git
$ cd Hystrix/hystrix-dashboard
$ ../gradlew jettyRun
> Building > :hystrix-dashboard:jettyRun > Running at http://localhost:7979/hystrix-dashboard

f:id:takezoe:20160616165301p:plain

モニタリング対象のWebアプリケーションのエンドポイントを追加してモニタリングを開始すると以下のようにダッシュボードに状態が表示されます。

f:id:takezoe:20160616165312p:plain

まとめ

Hystrixはこの他にも様々な機能があります。ドキュメントがかなりしっかりしているので一通り読んでみるといいと思います。

通信以外の処理をコマンド化してもよいと思いますし、導入も比較的容易なのでマイクロサービスアーキテクチャを採用しない場合でもモニタリングしたい部分や外部サービスを呼び出している部分などにピンポイントで入れておくといった使い方でも便利そうです。

なお、HystrixはJava用のライブラリなのでScalaではそのままでは使いにくかったり、ダッシュボードに情報を提供するためのエンドポイントもサーブレットコンテナ用の実装しか用意されていなかたりします。次回はHystrixをScalaで使う方法について書いてみたいと思います。

ScalaCheckでケースクラスを生成するジェネレータの作り方

先日社内勉強会でJavaでのプロパティベーステストライブラリとしてjunit-quickcheckが取り上げられていました。

参加者から「単一のプロパティではなく複数のプロパティを持ったクラスのインスタンスを生成したい場合はどうすればよいか?」という質問があり、junit-quickcheckではそのクラス用のジェネレータを書く必要があるということだったのですが、Scala用のプロパティベーステストライブラリであるScalaCheckではどうなっているのか試してみました。

まず基本的な型からです。ScalaCheckでも基本的な型のジェネレータは標準で提供されているので以下のような感じでテストを書くことができます。

import org.scalacheck.Properties
import org.scalacheck.Prop.forAll

object IntSpecification extends Properties("Int") {
  property("add") = forAll { (a: Int, b: Int) =>
    a + b == b + a
  }
}

独自のジェネレータも簡単に書くことができます。

val smallInt = Gen.choose(0, 100)

property("add") = forAll(smallInt, smallInt) { (a: Int, b: Int) =>
  a + b >= a
}

さて、ここからが本題なのですが、ScalaCheckではジェネレータを組み合わせてケースクラスを生成するジェネレータを定義することができます。ジェネレータが書きやすかったり組み合わせがしやすかったりするのはScalaCheckの嬉しいところです。

val userGen = for {
  name <- arbitrary[String]
  age  <- Gen.choose(0, 100)
} yield User(name, age)

property("add") = forAll(userGen, userGen) { (a: User, b: User) =>
  a.age + b.age >= a.age
}

scalacheck-shapelessを使うとケースクラス用のジェネレータをマクロで生成することができます。

github.com

implicitly[Arbitrary[User]]

property("add") = forAll { (a: User, b: User) =>
  a.age + b.age == b.age + a.age
}

ただ、実際にプロパティベーステストを導入する場合要件に応じたカスタムジェネレータを作ることになると思うのであまり使う機会はないかも…。きちんとドメインモデルを設計してプロパティの型を定義している場合は便利かもしれません。

Happy Hacking Keyboard Professional2を買ってみた

f:id:takezoe:20160611091108j:plain

ずっとThinkPad USBキーボードを使っていたのですが、職場の若者たち(オッサンもですが…)が皆HHKBやRealforceなどの高級キーボードを使っていて羨ましくなってきたので買ってみました。*1

US配列のキーボードは初めてなのですが、配列は使っていればすぐ慣れそうかな。ただ、一部のキー配列やコンビネーションが特殊で他のキーボードと頭の切り替えが大変そうかなと感じました。また、確かにキータッチはよいのですが、パンタグラフのもの比べるとやはり手首や二の腕に負担がかかっている実感があります。

ThinkPadキーボードから乗り換える際の一番の障害はやはりなんといってもトラックポイントです。今までポインティングデバイスThinkPadキーボードについているトラックポイントロジクールの充電式Bluetoothトラックパッドを併用していました。*2

LOGICOOL 充電式トラックパッド T651

LOGICOOL 充電式トラックパッド T651

HHKB Pro2に切り替えたことでポインティングデバイストラックパッドだけになってしまったのですが、このトラックパッドの精度があまりよくないのでテキストのコピー&ペーストなどの細かい操作が随分やりにくくなってしまいました。Apple純正のトラックパッドを使うなりマウスを使うなりすればいいのかもしれませんが、この際なので今までトラックポイントに頼っていた操作もなるべくキーボードだけで操作できるよう練習してみようと思います。

しかしそう考えるとキーボードショートカット等を覚える労力を仕組みで解決しているトラックポイントは改めて偉大だなと感じました。自分はやはりTEX Yodaを買うべきだったのかもしれません。実は中国のレノボの公式ショップではThinkPadブランドでTEX Yodaの販売していたことがあるそうです。是非とも正式に取り扱って欲しいところです。

watchmono.com

*1:木曜日の夜にAmazonでポチったのですが翌日の午後には配送されたようです。すごいですね。一体どうなってるんだろう…。

*2:基本的にはトラックポイントを使っていたのですが、ブラウザでネットを見ている時のスクロールなどにトラックパッドを使っていました。

Apache DrillでLTSVを検索するためのプラグインを作ってみた

DrillはJSONCSV、TSVなどはデフォルトでサポートしているのですが残念ながらLTSVはサポートしていません。もちろんLTSVはシンプルなのでJSONなどへの変換も容易なのですが、S3にバックアップしたLTSV形式のログを直接Drillから検索できると便利かなと思って作ってみました。

github.com

使い方ですが、リリースページからダウンロードしたdrill-ltsv-plugin-バージョン.jarDRILL_HOME/jars/3rdpartyに配置し、ストレージプラグインに以下のような感じでフォーマットの設定を追加します。

  "formats": {
    "ltsv": {
      "type": "ltsv",
      "extensions": [
        "ltsv"
      ]
    },
    ...
  }

するとDrillから*.ltsvファイルに対してSQLで検索できるようになります。

f:id:takezoe:20160607115753p:plain

動作は最新のDrill 1.6.0で確認しています。

さほど大量のログに対して実行したわけではないのでどの程度の性能が出るのかよくわかりませんが、とりあえずS3に放り込んでおいたログをアドホックに検索できるのでシビアな用途でなければまあまあ便利に使えるのではないかと思います。

Apache DrillでS3にアクセスする設定をREST APIで行ってみる

Apache Drillを取り上げているブログ記事などを見るとストレージの設定はWebコンソールから行う方法が紹介されていることが多いのですが、環境構築の自動化などを考えると設定などはコマンドラインで済ませたいところです。DrillはREST APIも備えており、これを使用して設定を行うことができるようです。

たとえばS3用にアクセスする設定を行うならDrillを起動した状態でこんな感じのリクエストを投げます。

$ curl -H 'Content-Type: application/json' -XPOST http://localhost:8047/storage/s3.json -d'{
  "name": "s3",
  "config": {
    "type": "file",
    "enabled": true,
    "connection": "s3a://my-backet-name",
    "config": null,
    "workspaces": {
      "root": {
        "location": "/",
        "writable": false,
        "defaultInputFormat": null
      },
      "tmp": {
        "location": "/tmp",
        "writable": true,
        "defaultInputFormat": null
      }
    },
    "formats": {
      "psv": {
        "type": "text",
        "extensions": [
          "tbl"
        ],
        "delimiter": "|"
      },
      "csv": {
        "type": "text",
        "extensions": [
          "csv"
        ],
        "delimiter": ","
      },
      "tsv": {
        "type": "text",
        "extensions": [
          "tsv"
        ],
        "delimiter": "\t"
      },
      "parquet": {
        "type": "parquet"
      },
      "json": {
        "type": "json",
        "extensions": [
          "json"
        ]
      },
      "avro": {
        "type": "avro"
      },
      "sequencefile": {
        "type": "sequencefile",
        "extensions": [
          "seq"
        ]
      },
      "csvh": {
        "type": "text",
        "extensions": [
          "csvh"
        ],
        "extractHeader": true,
        "delimiter": ","
      }
    }
  }
}'

なお、アクセスキーの設定はDrillのインストールディレクトリ配下にあるconf/core-site.xmlで別途行う必要があります。また、IAMロールでアクセス権を設定する場合は以下のようにconf/core-site.xmlの該当箇所をコメントアウトする必要があります。

<configuration>
<!--
    <property>
        <name>fs.s3a.access.key</name>
        <value>ENTER_YOUR_ACCESSKEY</value>
    </property>

    <property>
        <name>fs.s3a.secret.key</name>
        <value>ENTER_YOUR_SECRETKEY</value>
    </property>
-->
</configuration>

SQLを実行してみます。

0: jdbc:drill:zk=local> select count(*) from s3.`request_log/*`;
+---------+
| EXPR$0  |
+---------+
| 26      |
+---------+
1 row selected (4.487 seconds)

とりあえずS3に置いておいたファイルを雑に検索できるのは便利ですね。LTSV用のストレージプラグインが欲しいなぁ…。