前回はHystrixの簡単な紹介を書きました。
HystrixはJavaライブラリなのでもちろんScalaからも使うことができるのですが、そのままだと若干Scalaからは使いにくい部分もあります。今回はScala(主にPlay Framework)でHystrixを使う場合について書いてみたいと思います。
Playにモニタリング用のエンドポイントを追加する
HystrixをScalaで使うには普通にHystrixCommandをScalaで実装してそれを呼び出せばよいのですが、問題はダッシュボードからのモニタリング用のエンドポイントが標準ではサーブレットコンテナ用のものしか用意されていないということです。
探してみたところ、PlayにHystrixを組み込むサンプルを作っている方がいました。
上記のリポジトリにあるHystrixSupportというコントローラがモニタリング用のエンドポイントの実装になります。ただ、このサンプルはPlayのバージョンが古く(2.3 系)、最新のPlay 2.5ではそのままでは動かなかったり警告が出たりします。そこでこのHystrixSupportをPlay 2.5用に修正してみました。
package controllers import play.api.mvc._ import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} import com.netflix.config.scala.DynamicIntProperty import com.netflix.hystrix.contrib.metrics.eventstream.HystrixMetricsPoller import play.api.Logger import play.api.libs.iteratee.Enumerator import java.util.concurrent.TimeUnit import scala.annotation.tailrec import scala.concurrent.duration.FiniteDuration import akka.actor.ActorSystem import java.io.OutputStream import javax.inject.Inject import akka.stream.scaladsl.Source import play.api.libs.streams.Streams import scala.util.Try class HystrixSupport @Inject()(system: ActorSystem) extends Controller { import play.api.libs.concurrent.Execution.Implicits.defaultContext def stream(delayOpt: Option[Int]) = Action { val numberConnections = concurrentConnections.incrementAndGet() val maxConnections = maxConcurrentConnections.get Some(numberConnections). filter(_ <= maxConnections). map(_ => delayOpt.getOrElse(500)). fold(unavailable(maxConnections)) { delay => val source = Source.fromPublisher(Streams.enumeratorToPublisher(streamRequest(delay))) Ok.chunked(source).withHeaders( "Content-Type" -> "text/event-stream;charset=UTF-8", "Cache-Control" -> "no-cache, no-store, max-age=0, must-revalidate", "Pragma" -> "no-cache" ) } } private[this] def unavailable(max: Int) = { concurrentConnections.decrementAndGet() ServiceUnavailable(s"MaxConcurrentConnections reached: $maxConcurrentConnections") } private[this] final val concurrentConnections: AtomicInteger = new AtomicInteger(0) private[this] final val maxConcurrentConnections: DynamicIntProperty = new DynamicIntProperty("hystrix.stream.maxConcurrentConnections", 5) private[this] def streamRequest(delay: Int): Enumerator[Array[Byte]] = { val listener = new MetricJsonListener(1000) val poller = new HystrixMetricsPoller(listener, delay) poller.start() Logger.info("Starting poller") val delayDuration = FiniteDuration(delay, TimeUnit.MILLISECONDS) //val system = Akka.system val streamer = (out: OutputStream) => produceStream(poller, listener, delayDuration, system, out) val closer = () => { Logger.info("Closing poller") poller.shutdown() concurrentConnections.decrementAndGet() } val enum = Enumerator.outputStream(streamer) enum.onDoneEnumerating(closer()) } private[this] def produceStream(poller: HystrixMetricsPoller, listener: MetricJsonListener, delay: FiniteDuration, system: ActorSystem, out: OutputStream): Unit = { val strings = produce(poller, listener) if (strings.isEmpty) { out.flush() out.close() } else { strings.foreach(s => out.write(s"$s\n\n".getBytes("UTF-8"))) out.flush() Try(system.scheduler.scheduleOnce(delay)(produceStream(poller, listener, delay, system, out))) } } private[this] def produce(poller: HystrixMetricsPoller, listener: MetricJsonListener): Vector[String] = { if (!poller.isRunning) Vector() else { val jsonMessages = listener.getJsonMetrics if (jsonMessages.isEmpty) Vector("ping: ") else jsonMessages.map(j => s"data: $j") } } private class MetricJsonListener(capacity: Int) extends HystrixMetricsPoller.MetricsAsJsonPollerListener { private[this] final val metrics = new AtomicReference[Vector[String]](Vector()) @tailrec private[this] final def set(oldValue: Vector[String], newValue: Vector[String]): Boolean = { metrics.compareAndSet(oldValue, newValue) || set(oldValue, newValue) } private[this] final def getAndSet(newValue: Vector[String]): Vector[String] = { val oldValue = metrics.get set(oldValue, newValue) oldValue } def handleJsonMetric(json: String): Unit = { val oldMetrics = metrics.get() if (oldMetrics.size >= capacity) throw new IllegalStateException("Queue full") val newMetrics = oldMetrics :+ json set(oldMetrics, newMetrics) } def getJsonMetrics: Vector[String] = getAndSet(Vector()) } }
Playアプリケーションに組み込むにはまずbuild.sbt
に以下の依存関係を追加します。
libraryDependencies ++= Seq( "com.netflix.archaius" % "archaius-scala" % "0.7.4", "com.netflix.hystrix" % "hystrix-core" % "1.5.3", "com.netflix.hystrix" % "hystrix-metrics-event-stream" % "1.5.3" )
それからroutes
に以下のルーティングを追加します。
GET /hystrix.stream controllers.HystrixSupport.stream(delay: Option[Int])
このエンドポイントをダッシュボードでモニタリングすればOKです。
HystrixのコマンドをFutureに変換する
上記のPlayでのサンプルの中にHystrixのコマンドをScalaのFutureに変換するためのimplicit classがあるのですが、これを少し改変してみました。
package util import scala.concurrent.{Future, Promise} import com.netflix.hystrix.HystrixObservable object Futures { private class ForPromiseObserver[T](p: Promise[T]) extends rx.Observer[T] { def onNext(t: T): Unit = p.trySuccess(t) def onError(e: Throwable): Unit = p.tryFailure(e) def onCompleted(): Unit = () } implicit final class HystrixCommandWithScalaFuture[T](val cmd: HystrixObservable[T]) extends AnyVal { def future: Future[T] = { val promise = Promise[T]() val observer = new ForPromiseObserver(promise) cmd.observe().subscribe(observer) promise.future } } }
以下のような感じでfuture
メソッドでコマンドをFuture
に変換できます。
import utils.Futures._ class Application() extends Controller { def index = Action.async { new CommandHelloWorld("World").future.map(Ok(_)) } }
FutureをHystrixのコマンドに変換する
同期的な処理をScalaのFuture
に合成したい場合は前述のimplicit classでよいのですが、もともとFuture
を返すScalaライブラリにHystrixを適用したい場合はもう少し工夫が必要になります。以下のようなアダプタを作ってみました。
package utils import com.netflix.hystrix.{HystrixCommandGroupKey, HystrixObservableCommand} import rx.Observable import rx.lang.scala.subjects.ReplaySubject import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} abstract class HystrixFutureCommand[T](groupKey: HystrixCommandGroupKey)(implicit ec: ExecutionContext) extends HystrixObservableCommand[T](groupKey) { override def construct(): Observable[T] = { val channel = ReplaySubject[T]() run().onComplete { case Success(v) => { channel.onNext(v) channel.onCompleted() } case Failure(t) => { channel.onError(t) channel.onCompleted() } } channel.asJavaSubject } def run(): Future[T] }
こんな感じで使います。
class FutureHelloCommand extends HystrixFutureCommand[String](HystrixCommandGroupKey.Factory.asKey("HelloWorldAsync")){ def run(): Future[String] ={ Future { "Hello World!" } } override protected def resumeWithFallback(): Observable[String] = { Observable.from(Array("resume!")) } }
エラー処理を行う場合、Future#recover()
でフォールバック処理を行ってしまうとHystrixでエラーのハンドリングができないので、コマンド側のフォールバック機構を使用する必要があるという点に注意が必要です。
まとめ
HystrixはJava用のライブラリですが、簡単なアダプタを用意することでScalaでもさほど違和感なく使用することができます。
ただ、使用するフレームワークによってはモニタリング用のエンドポイントを自前で実装しないといけないのでちょっと大変かもしれません。また、ScalaのFuture
を使っている場合、スレッドプールの管理やエラーハンドリングなどHystrixの機能と被る部分があるため効率が悪かったり方針の整理が必要になりそうです。ScalaのFuture
とHystrixをシームレスに統合できる方法を考えられるともっと使いやすくなりそうです。
なお、今回Play 2.5用に修正したり新たに追加したものについては整理して冒頭で紹介したPlay + Hystrixのサンプルプロジェクトにプルリクエストしておきました。