前回はHystrixの簡単な紹介を書きました。
takezoe.hatenablog.com
HystrixはJavaライブラリなのでもちろんScalaからも使うことができるのですが、そのままだと若干Scalaからは使いにくい部分もあります。今回はScala(主にPlay Framework)でHystrixを使う場合について書いてみたいと思います。
Playにモニタリング用のエンドポイントを追加する
HystrixをScalaで使うには普通にHystrixCommandをScalaで実装してそれを呼び出せばよいのですが、問題はダッシュボードからのモニタリング用のエンドポイントが標準ではサーブレットコンテナ用のものしか用意されていないということです。
探してみたところ、PlayにHystrixを組み込むサンプルを作っている方がいました。
github.com
上記のリポジトリにあるHystrixSupportというコントローラがモニタリング用のエンドポイントの実装になります。ただ、このサンプルはPlayのバージョンが古く(2.3
系)、最新のPlay 2.5ではそのままでは動かなかったり警告が出たりします。そこでこのHystrixSupportをPlay 2.5用に修正してみました。
package controllers
import play.api.mvc._
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import com.netflix.config.scala.DynamicIntProperty
import com.netflix.hystrix.contrib.metrics.eventstream.HystrixMetricsPoller
import play.api.Logger
import play.api.libs.iteratee.Enumerator
import java.util.concurrent.TimeUnit
import scala.annotation.tailrec
import scala.concurrent.duration.FiniteDuration
import akka.actor.ActorSystem
import java.io.OutputStream
import javax.inject.Inject
import akka.stream.scaladsl.Source
import play.api.libs.streams.Streams
import scala.util.Try
class HystrixSupport @Inject()(system: ActorSystem) extends Controller {
import play.api.libs.concurrent.Execution.Implicits.defaultContext
def stream(delayOpt: Option[Int]) = Action {
val numberConnections = concurrentConnections.incrementAndGet()
val maxConnections = maxConcurrentConnections.get
Some(numberConnections).
filter(_ <= maxConnections).
map(_ => delayOpt.getOrElse(500)).
fold(unavailable(maxConnections)) { delay =>
val source = Source.fromPublisher(Streams.enumeratorToPublisher(streamRequest(delay)))
Ok.chunked(source).withHeaders(
"Content-Type" -> "text/event-stream;charset=UTF-8",
"Cache-Control" -> "no-cache, no-store, max-age=0, must-revalidate",
"Pragma" -> "no-cache"
)
}
}
private[this] def unavailable(max: Int) = {
concurrentConnections.decrementAndGet()
ServiceUnavailable(s"MaxConcurrentConnections reached: $maxConcurrentConnections")
}
private[this] final val concurrentConnections: AtomicInteger = new AtomicInteger(0)
private[this] final val maxConcurrentConnections: DynamicIntProperty =
new DynamicIntProperty("hystrix.stream.maxConcurrentConnections", 5)
private[this] def streamRequest(delay: Int): Enumerator[Array[Byte]] = {
val listener = new MetricJsonListener(1000)
val poller = new HystrixMetricsPoller(listener, delay)
poller.start()
Logger.info("Starting poller")
val delayDuration = FiniteDuration(delay, TimeUnit.MILLISECONDS)
val streamer = (out: OutputStream) => produceStream(poller, listener, delayDuration, system, out)
val closer = () => {
Logger.info("Closing poller")
poller.shutdown()
concurrentConnections.decrementAndGet()
}
val enum = Enumerator.outputStream(streamer)
enum.onDoneEnumerating(closer())
}
private[this] def produceStream(poller: HystrixMetricsPoller, listener: MetricJsonListener, delay: FiniteDuration, system: ActorSystem, out: OutputStream): Unit = {
val strings = produce(poller, listener)
if (strings.isEmpty) {
out.flush()
out.close()
}
else {
strings.foreach(s => out.write(s"$s\n\n".getBytes("UTF-8")))
out.flush()
Try(system.scheduler.scheduleOnce(delay)(produceStream(poller, listener, delay, system, out)))
}
}
private[this] def produce(poller: HystrixMetricsPoller, listener: MetricJsonListener): Vector[String] = {
if (!poller.isRunning) Vector()
else {
val jsonMessages = listener.getJsonMetrics
if (jsonMessages.isEmpty) Vector("ping: ")
else jsonMessages.map(j => s"data: $j")
}
}
private class MetricJsonListener(capacity: Int) extends HystrixMetricsPoller.MetricsAsJsonPollerListener {
private[this] final val metrics = new AtomicReference[Vector[String]](Vector())
@tailrec
private[this] final def set(oldValue: Vector[String], newValue: Vector[String]): Boolean = {
metrics.compareAndSet(oldValue, newValue) || set(oldValue, newValue)
}
private[this] final def getAndSet(newValue: Vector[String]): Vector[String] = {
val oldValue = metrics.get
set(oldValue, newValue)
oldValue
}
def handleJsonMetric(json: String): Unit = {
val oldMetrics = metrics.get()
if (oldMetrics.size >= capacity) throw new IllegalStateException("Queue full")
val newMetrics = oldMetrics :+ json
set(oldMetrics, newMetrics)
}
def getJsonMetrics: Vector[String] = getAndSet(Vector())
}
}
Playアプリケーションに組み込むにはまずbuild.sbt
に以下の依存関係を追加します。
libraryDependencies ++= Seq(
"com.netflix.archaius" % "archaius-scala" % "0.7.4",
"com.netflix.hystrix" % "hystrix-core" % "1.5.3",
"com.netflix.hystrix" % "hystrix-metrics-event-stream" % "1.5.3"
)
それからroutes
に以下のルーティングを追加します。
GET /hystrix.stream controllers.HystrixSupport.stream(delay: Option[Int])
このエンドポイントをダッシュボードでモニタリングすればOKです。
HystrixのコマンドをFutureに変換する
上記のPlayでのサンプルの中にHystrixのコマンドをScalaのFutureに変換するためのimplicit classがあるのですが、これを少し改変してみました。
package util
import scala.concurrent.{Future, Promise}
import com.netflix.hystrix.HystrixObservable
object Futures {
private class ForPromiseObserver[T](p: Promise[T]) extends rx.Observer[T] {
def onNext(t: T): Unit = p.trySuccess(t)
def onError(e: Throwable): Unit = p.tryFailure(e)
def onCompleted(): Unit = ()
}
implicit final class HystrixCommandWithScalaFuture[T](val cmd: HystrixObservable[T]) extends AnyVal {
def future: Future[T] = {
val promise = Promise[T]()
val observer = new ForPromiseObserver(promise)
cmd.observe().subscribe(observer)
promise.future
}
}
}
以下のような感じでfuture
メソッドでコマンドをFuture
に変換できます。
import utils.Futures._
class Application() extends Controller {
def index = Action.async {
new CommandHelloWorld("World").future.map(Ok(_))
}
}
FutureをHystrixのコマンドに変換する
同期的な処理をScalaのFuture
に合成したい場合は前述のimplicit classでよいのですが、もともとFuture
を返すScalaライブラリにHystrixを適用したい場合はもう少し工夫が必要になります。以下のようなアダプタを作ってみました。
package utils
import com.netflix.hystrix.{HystrixCommandGroupKey, HystrixObservableCommand}
import rx.Observable
import rx.lang.scala.subjects.ReplaySubject
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
abstract class HystrixFutureCommand[T](groupKey: HystrixCommandGroupKey)(implicit ec: ExecutionContext) extends HystrixObservableCommand[T](groupKey) {
override def construct(): Observable[T] = {
val channel = ReplaySubject[T]()
run().onComplete {
case Success(v) => {
channel.onNext(v)
channel.onCompleted()
}
case Failure(t) => {
channel.onError(t)
channel.onCompleted()
}
}
channel.asJavaSubject
}
def run(): Future[T]
}
こんな感じで使います。
class FutureHelloCommand extends HystrixFutureCommand[String](HystrixCommandGroupKey.Factory.asKey("HelloWorldAsync")){
def run(): Future[String] ={
Future {
"Hello World!"
}
}
override protected def resumeWithFallback(): Observable[String] = {
Observable.from(Array("resume!"))
}
}
エラー処理を行う場合、Future#recover()
でフォールバック処理を行ってしまうとHystrixでエラーのハンドリングができないので、コマンド側のフォールバック機構を使用する必要があるという点に注意が必要です。
まとめ
HystrixはJava用のライブラリですが、簡単なアダプタを用意することでScalaでもさほど違和感なく使用することができます。
ただ、使用するフレームワークによってはモニタリング用のエンドポイントを自前で実装しないといけないのでちょっと大変かもしれません。また、ScalaのFuture
を使っている場合、スレッドプールの管理やエラーハンドリングなどHystrixの機能と被る部分があるため効率が悪かったり方針の整理が必要になりそうです。ScalaのFuture
とHystrixをシームレスに統合できる方法を考えられるともっと使いやすくなりそうです。
なお、今回Play 2.5用に修正したり新たに追加したものについては整理して冒頭で紹介したPlay + Hystrixのサンプルプロジェクトにプルリクエストしておきました。
github.com