HystrixをScala / Playアプリケーションから使ってみる

前回はHystrixの簡単な紹介を書きました。

takezoe.hatenablog.com

HystrixはJavaライブラリなのでもちろんScalaからも使うことができるのですが、そのままだと若干Scalaからは使いにくい部分もあります。今回はScala(主にPlay Framework)でHystrixを使う場合について書いてみたいと思います。

Playにモニタリング用のエンドポイントを追加する

HystrixをScalaで使うには普通にHystrixCommandをScalaで実装してそれを呼び出せばよいのですが、問題はダッシュボードからのモニタリング用のエンドポイントが標準ではサーブレットコンテナ用のものしか用意されていないということです。

探してみたところ、PlayにHystrixを組み込むサンプルを作っている方がいました。

github.com

上記のリポジトリにあるHystrixSupportというコントローラがモニタリング用のエンドポイントの実装になります。ただ、このサンプルはPlayのバージョンが古く(2.3 系)、最新のPlay 2.5ではそのままでは動かなかったり警告が出たりします。そこでこのHystrixSupportをPlay 2.5用に修正してみました。

package controllers

import play.api.mvc._
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import com.netflix.config.scala.DynamicIntProperty
import com.netflix.hystrix.contrib.metrics.eventstream.HystrixMetricsPoller
import play.api.Logger
import play.api.libs.iteratee.Enumerator
import java.util.concurrent.TimeUnit
import scala.annotation.tailrec
import scala.concurrent.duration.FiniteDuration
import akka.actor.ActorSystem
import java.io.OutputStream
import javax.inject.Inject
import akka.stream.scaladsl.Source
import play.api.libs.streams.Streams
import scala.util.Try

class HystrixSupport @Inject()(system: ActorSystem) extends Controller {
  import play.api.libs.concurrent.Execution.Implicits.defaultContext

  def stream(delayOpt: Option[Int]) = Action {

    val numberConnections = concurrentConnections.incrementAndGet()
    val maxConnections = maxConcurrentConnections.get

    Some(numberConnections).
      filter(_ <= maxConnections).
      map(_ => delayOpt.getOrElse(500)).
      fold(unavailable(maxConnections)) { delay =>
        val source = Source.fromPublisher(Streams.enumeratorToPublisher(streamRequest(delay)))
        Ok.chunked(source).withHeaders(
          "Content-Type" -> "text/event-stream;charset=UTF-8",
          "Cache-Control" -> "no-cache, no-store, max-age=0, must-revalidate",
          "Pragma" -> "no-cache"
        )
      }
  }

  private[this] def unavailable(max: Int) = {
    concurrentConnections.decrementAndGet()
    ServiceUnavailable(s"MaxConcurrentConnections reached: $maxConcurrentConnections")
  }

  private[this] final val concurrentConnections: AtomicInteger = new AtomicInteger(0)
  private[this] final val maxConcurrentConnections: DynamicIntProperty =
    new DynamicIntProperty("hystrix.stream.maxConcurrentConnections", 5)

  private[this] def streamRequest(delay: Int): Enumerator[Array[Byte]] = {
    val listener = new MetricJsonListener(1000)
    val poller = new HystrixMetricsPoller(listener, delay)
    poller.start()
    Logger.info("Starting poller")

    val delayDuration = FiniteDuration(delay, TimeUnit.MILLISECONDS)
    //val system = Akka.system

    val streamer = (out: OutputStream) => produceStream(poller, listener, delayDuration, system, out)
    val closer = () => {
      Logger.info("Closing poller")
      poller.shutdown()
      concurrentConnections.decrementAndGet()
    }

    val enum = Enumerator.outputStream(streamer)
    enum.onDoneEnumerating(closer())
  }

  private[this] def produceStream(poller: HystrixMetricsPoller, listener: MetricJsonListener, delay: FiniteDuration, system: ActorSystem, out: OutputStream): Unit = {
    val strings = produce(poller, listener)
    if (strings.isEmpty) {
      out.flush()
      out.close()
    }
    else {
      strings.foreach(s => out.write(s"$s\n\n".getBytes("UTF-8")))
      out.flush()
      Try(system.scheduler.scheduleOnce(delay)(produceStream(poller, listener, delay, system, out)))
    }
  }

  private[this] def produce(poller: HystrixMetricsPoller, listener: MetricJsonListener): Vector[String] = {
    if (!poller.isRunning) Vector()
    else {
      val jsonMessages = listener.getJsonMetrics

      if (jsonMessages.isEmpty) Vector("ping: ")
      else jsonMessages.map(j => s"data: $j")
    }
  }

  private class MetricJsonListener(capacity: Int) extends HystrixMetricsPoller.MetricsAsJsonPollerListener {

    private[this] final val metrics = new AtomicReference[Vector[String]](Vector())

    @tailrec
    private[this] final def set(oldValue: Vector[String], newValue: Vector[String]): Boolean = {
      metrics.compareAndSet(oldValue, newValue) || set(oldValue, newValue)
    }

    private[this] final def getAndSet(newValue: Vector[String]): Vector[String] = {
      val oldValue = metrics.get
      set(oldValue, newValue)
      oldValue
    }

    def handleJsonMetric(json: String): Unit = {
      val oldMetrics = metrics.get()
      if (oldMetrics.size >= capacity) throw new IllegalStateException("Queue full")

      val newMetrics = oldMetrics :+ json
      set(oldMetrics, newMetrics)
    }

    def getJsonMetrics: Vector[String] = getAndSet(Vector())
  }
}

Playアプリケーションに組み込むにはまずbuild.sbtに以下の依存関係を追加します。

libraryDependencies ++= Seq(
  "com.netflix.archaius"  % "archaius-scala"                  % "0.7.4",
  "com.netflix.hystrix"   % "hystrix-core"                    % "1.5.3",
  "com.netflix.hystrix"   % "hystrix-metrics-event-stream"    % "1.5.3"
)

それからroutesに以下のルーティングを追加します。

GET    /hystrix.stream    controllers.HystrixSupport.stream(delay: Option[Int])

このエンドポイントをダッシュボードでモニタリングすればOKです。

HystrixのコマンドをFutureに変換する

上記のPlayでのサンプルの中にHystrixのコマンドをScalaのFutureに変換するためのimplicit classがあるのですが、これを少し改変してみました。

package util

import scala.concurrent.{Future, Promise}
import com.netflix.hystrix.HystrixObservable

object Futures {

  private class ForPromiseObserver[T](p: Promise[T]) extends rx.Observer[T] {
    def onNext(t: T): Unit = p.trySuccess(t)
    def onError(e: Throwable): Unit = p.tryFailure(e)
    def onCompleted(): Unit = ()
  }

  implicit final class HystrixCommandWithScalaFuture[T](val cmd: HystrixObservable[T]) extends AnyVal {
    def future: Future[T] = {
      val promise = Promise[T]()
      val observer = new ForPromiseObserver(promise)

      cmd.observe().subscribe(observer)

      promise.future
    }
  }
}

以下のような感じでfutureメソッドでコマンドをFutureに変換できます。

import utils.Futures._

class Application() extends Controller {
  def index = Action.async {
    new CommandHelloWorld("World").future.map(Ok(_))
  }
}

FutureをHystrixのコマンドに変換する

同期的な処理をScalaFutureに合成したい場合は前述のimplicit classでよいのですが、もともとFutureを返すScalaライブラリにHystrixを適用したい場合はもう少し工夫が必要になります。以下のようなアダプタを作ってみました。

package utils

import com.netflix.hystrix.{HystrixCommandGroupKey, HystrixObservableCommand}
import rx.Observable
import rx.lang.scala.subjects.ReplaySubject
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

abstract class HystrixFutureCommand[T](groupKey: HystrixCommandGroupKey)(implicit ec: ExecutionContext) extends HystrixObservableCommand[T](groupKey) {

  override def construct(): Observable[T] = {
    val channel = ReplaySubject[T]()

    run().onComplete {
      case Success(v) => {
        channel.onNext(v)
        channel.onCompleted()
      }
      case Failure(t) => {
        channel.onError(t)
        channel.onCompleted()
      }
    }

    channel.asJavaSubject
  }

  def run(): Future[T]

}

こんな感じで使います。

class FutureHelloCommand extends HystrixFutureCommand[String](HystrixCommandGroupKey.Factory.asKey("HelloWorldAsync")){

  def run(): Future[String] ={
    Future {
      "Hello World!"
    }
  }

  override protected def resumeWithFallback(): Observable[String] = {
    Observable.from(Array("resume!"))
  }
}

エラー処理を行う場合、Future#recover()でフォールバック処理を行ってしまうとHystrixでエラーのハンドリングができないので、コマンド側のフォールバック機構を使用する必要があるという点に注意が必要です。

まとめ

HystrixはJava用のライブラリですが、簡単なアダプタを用意することでScalaでもさほど違和感なく使用することができます。

ただ、使用するフレームワークによってはモニタリング用のエンドポイントを自前で実装しないといけないのでちょっと大変かもしれません。また、ScalaFutureを使っている場合、スレッドプールの管理やエラーハンドリングなどHystrixの機能と被る部分があるため効率が悪かったり方針の整理が必要になりそうです。ScalaFutureとHystrixをシームレスに統合できる方法を考えられるともっと使いやすくなりそうです。

なお、今回Play 2.5用に修正したり新たに追加したものについては整理して冒頭で紹介したPlay + Hystrixのサンプルプロジェクトにプルリクエストしておきました。

github.com