読者です 読者をやめる 読者になる 読者になる

AWS Kinesisを使ったアプリケーションをローカルで開発する

仕事でKinesisにデータを投入し、そこからSpark Streamingでデータの加工を行うというようなことをやっているのですが、Kinesis部分だけはAWS環境を使わないとテストできないのは不便なのでなんとかしたいなぁと思っていたところ、以下のようなものを見つけました。

github.com

Node.jsとLevelDBで実装されたAWS Kinesisの実装とのこと。Node.jsが利用可能な環境であれば簡単に利用することができます。まずはnpmでインストール。

$ npm install -g kinesalite

kinesaliteコマンドで起動すると4567ポートで立ち上がります。

$ kinesalite
Listening at http://:::4567

ポート番号などはオプションで指定することができます。

$ kinesalite --help

Usage: kinesalite [--port <port>] [--path <path>] [--ssl] [options]

A Kinesis http server, optionally backed by LevelDB

Options:
--help                 Display this help message and exit
--port <port>          The port to listen on (default: 4567)
--path <path>          The path to use for the LevelDB store (in-memory by default)
--ssl                  Enable SSL for the web server (default: false)
--createStreamMs <ms>  Amount of time streams stay in CREATING state (default: 500)
--deleteStreamMs <ms>  Amount of time streams stay in DELETING state (default: 500)
--updateStreamMs <ms>  Amount of time streams stay in UPDATING state (default: 500)
--shardLimit <limit>   Shard limit for error reporting (default: 10)

Report bugs at github.com/mhart/kinesalite/issues

試しにaws-kinesis-scalaを使ってScalaプログラムからレコードを登録してみます。

val client = AmazonKinesisClient(
  new BasicCredentialsProvider("000000000000", "000000000000"))
client.setEndpoint("http://localhost:4567")

// シャード数1のストリームを作成
client.createStream("test-stream", 1)

// レコードの登録
client.putRecord(PutRecordRequest(
  streamName = "test-stream",
  partitionKey = "1234",
  data = "5678".getBytes("UTF-8")
))

こんな感じでCredentialとエンドポイントを変更するだけでkinesaliteにアクセスすることができます。コードは長くなるのでここでは書きませんが、もちろん受信もちゃんと行うこともできました。これならローカルでAWS Kinesisを使ったアプリケーションを動作させることができそうです。

…と思ったのも束の間、SparkのKinesisサポートのAPIではKCLがチェックポイントを書き込むDynamoDBのエンドポイントを指定することができないようです。あと一歩なのに無念だ…。

とりあえずデータを入れる側だけでもローカルで動かせそうなのでよしとします。SparkのKinesisサポートのインターフェースを修正すればDynamoDBの接続先も変更できそうなのでやってみようかな。