仕事でKinesisにデータを投入し、そこからSpark Streamingでデータの加工を行うというようなことをやっているのですが、Kinesis部分だけはAWS環境を使わないとテストできないのは不便なのでなんとかしたいなぁと思っていたところ、以下のようなものを見つけました。
Node.jsとLevelDBで実装されたAWS Kinesisの実装とのこと。Node.jsが利用可能な環境であれば簡単に利用することができます。まずはnpm
でインストール。
$ npm install -g kinesalite
kinesalite
コマンドで起動すると4567ポートで立ち上がります。
$ kinesalite
Listening at http://:::4567
ポート番号などはオプションで指定することができます。
$ kinesalite --help Usage: kinesalite [--port <port>] [--path <path>] [--ssl] [options] A Kinesis http server, optionally backed by LevelDB Options: --help Display this help message and exit --port <port> The port to listen on (default: 4567) --path <path> The path to use for the LevelDB store (in-memory by default) --ssl Enable SSL for the web server (default: false) --createStreamMs <ms> Amount of time streams stay in CREATING state (default: 500) --deleteStreamMs <ms> Amount of time streams stay in DELETING state (default: 500) --updateStreamMs <ms> Amount of time streams stay in UPDATING state (default: 500) --shardLimit <limit> Shard limit for error reporting (default: 10) Report bugs at github.com/mhart/kinesalite/issues
試しにaws-kinesis-scalaを使ってScalaプログラムからレコードを登録してみます。
val client = AmazonKinesisClient( new BasicCredentialsProvider("000000000000", "000000000000")) client.setEndpoint("http://localhost:4567") // シャード数1のストリームを作成 client.createStream("test-stream", 1) // レコードの登録 client.putRecord(PutRecordRequest( streamName = "test-stream", partitionKey = "1234", data = "5678".getBytes("UTF-8") ))
こんな感じでCredentialとエンドポイントを変更するだけでkinesaliteにアクセスすることができます。コードは長くなるのでここでは書きませんが、もちろん受信もちゃんと行うこともできました。これならローカルでAWS Kinesisを使ったアプリケーションを動作させることができそうです。
…と思ったのも束の間、SparkのKinesisサポートのAPIではKCLがチェックポイントを書き込むDynamoDBのエンドポイントを指定することができないようです。あと一歩なのに無念だ…。
とりあえずデータを入れる側だけでもローカルで動かせそうなのでよしとします。SparkのKinesisサポートのインターフェースを修正すればDynamoDBの接続先も変更できそうなのでやってみようかな。