昨年11月、米ラスベガスで開催されたカンファレンス「AWS re:Invent 2013」で、ストリーミングデータをリアルタイムに処理するサービスである「Amazon Kinesis」が発表されました。ストリーミングデータをリアルタイムに処理...と、なにやら難しそうな感じがしますが、API、SDKが公開されているので簡単に試すことができます。今回はRuby向けAWS SDKを利用し、Amazon Kinesisを操作してみます。
Amazon Kinesisとは
Amazon Kinesisは、Amazon Web Servicesが提供するストリーミングデータをリアルタイムに処理する為のサービスです。近年ビッグデータの処理や分析が注目されてきましたが、バッチ処理が主流でした。Amazon Kinesisを利用すると、リアルタイムにストリーミングデータの処理や分析が可能です。また、Amazon S3やAmazon DynamoDB、Amazon Redshiftなどのサービスとの連携が容易な点も特徴として挙げられます。
詳細は以下のページをご参照ください。
本記事では、Amazon KinesisをRuby向けAmazon SDKを利用して操作する方法を紹介します。
Kinesis Management Consoleから ストリームを作成する
まずはWeb画面からストリームを作成してみます。Kinesisストリームは、データのキャプチャ・格納・転送を行います。 AWS Managemant Consoleのサービスの一覧からKinesisを選択します。
次にCreate Streamを押し、ストリームを作成していきます。Number of Shardsという設定項目がありますが、これはどれだけ分割して処理を行うかを指します。まずはサンプルということで、シャード数は1に設定しました。
Createボタンを押すとストリームが作成されます。ストリーム名とシャード数の入力と、数回ボタンを押すだけでストリームが作れてしまいました。
ストリームは以下の表の通り、1シャードにつき読み込みは5トランザクション/秒 (2MB/秒 まで)、書き込みは1000トランザクション/秒(1MB/秒 まで)処理することができます。
利用料金は、1 シャード 0.015 USD/1h、0.028 USD/100万putです。※記事執筆時点
詳細は以下をご参照ください。
RubyからKinesisを操作する
ここからRubyを使ってKinesisを操作する方法を紹介していきます。
Ruby向けAWS SDKをインストール
まずは、RubyからKinesisを操作する為にAWS SDKのGemをインストールします。
$ gem install aws-sdk
動作確認した環境は以下の通りです。
- Amazon Linux AMI(3.2.39-6.88.amzn1.x86_64)
- Ruby 2.0.0p247
- aws-sdk 1.37.0
AWS SDKを利用する際にAWSのアクセスキーと秘密鍵を設定する必要がありますので、各自取得してください。
また、サンプルとして掲載するコードでは、キー情報を環境変数から取得するようにしてあります。動作を確認する際は、環境変数にAWS_ACCESS_KEY_ID
とAWS_SECRET_ACCESS_KEY
を追加して実行してください。
インストールしたAWS SDKのAPI仕様については、以下のドキュメントをご参照ください。
ストリームにデータを投入
まずは、先ほど作成したストリームにデータを投入してみます。put.rbを実行すると、標準入力で指定したストリームに対して、データとして現在時刻を1秒毎に追加していきます。パーティションキーは、ストリームにデータを投入する際にどのシャードに割り当てるかを決めるキーです。今回は、ECサイトのカテゴリを想定した値をランダムに設定しました。プログラムを実行すると、Ctrl-c などで強制終了するまで値を追加し続けるので、適当なタイミングで止めてください。
- 利用した
AWS::Kinesis::Client
のインスタンスメソッド
require 'aws-sdk' print "Enter target stream name: " stream_name = gets.chomp begin client = AWS::Kinesis.new( access_key_id: ENV['AWS_ACCESS_KEY_ID'], secret_access_key: ENV['AWS_SECRET_ACCESS_KEY']).client categories = %w{Foods Books Toys Electronics Sports Clothing Shoes Music Movies Games} loop do now = Time.now.to_s partition_key = categories.sample response = client.put_record( stream_name: stream_name, data: now, partition_key: partition_key) puts "Data : #{now}, Partition Key : #{partition_key}, Shard Id : #{response.shard_id}, Sequence Number : #{response.sequence_number}" sleep(1) end rescue => e puts "Error: #{e.message}" abort end
ストリームに対して、連続して現在時刻のデータが追加されていくのが確認できます。シャードは1つしか作成していないので、パーティションキーが異なっていても同一シャードに追加されていきます。
$ ruby put.rb Enter target stream name: example Data : 2014-03-27 08:41:13 +0000, Partition key : Electronics, Shard Id : shardId-000000000000, Sequence Number : 49537934509153387391966018680468331689319184156313255937 Data : 2014-03-27 08:41:15 +0000, Partition key : Games, Shard Id : shardId-000000000000, Sequence Number : 49537934509153387391966021152193174398088752583170588673 Data : 2014-03-27 08:41:17 +0000, Partition key : Games, Shard Id : shardId-000000000000, Sequence Number : 49537934509153387391966022368340774319723773245008117761 Data : 2014-03-27 08:41:18 +0000, Partition key : Books, Shard Id : shardId-000000000000, Sequence Number : 49537934509153387391966023755053025178259024291748118529 Data : 2014-03-27 08:41:19 +0000, Partition key : Books, Shard Id : shardId-000000000000, Sequence Number : 49537934509153387391966026288268916696677167016357920769 Data : 2014-03-27 08:41:21 +0000, Partition key : Games, Shard Id : shardId-000000000000, Sequence Number : 49537934509153387391966027745649131213584694134764994561 Data : 2014-03-27 08:41:22 +0000, Partition key : Shoes, Shard Id : shardId-000000000000, Sequence Number : 49537934509153387391966028747790009190991264029989666817 . . .
ストリームからデータを取得
次に、先ほど投入したデータを取得してみます。get.rbを実行すると、標準入力で指定したストリームの全てのシャードから並列でデータを取得し、表示します。(今回は1シャードなのでシングルスレッドで実行されます)
- 利用した
AWS::Kinesis::Client
のインスタンスメソッド
require 'aws-sdk' require 'parallel' print "Enter target stream name: " stream_name = gets.chomp if stream_name.empty? puts "target stream name is required" abort end begin client = AWS::Kinesis.new( access_key_id: ENV['AWS_ACCESS_KEY_ID'], secret_access_key: ENV['AWS_SECRET_ACCESS_KEY']).client shards = client.describe_stream(stream_name: stream_name).stream_description.shards shard_ids = shards.map(&:shard_id) Parallel.each(shard_ids, in_threads: shard_ids.count) do |shard_id| shard_iterator_info = client.get_shard_iterator( stream_name: stream_name, shard_id: shard_id, shard_iterator_type: 'TRIM_HORIZON') shard_iterator = shard_iterator_info.shard_iterator loop do records_info = client.get_records( shard_iterator: shard_iterator, limit: 100) records_info.records.each do |record| puts "Data : #{record.data}, Partition Key : #{record.partition_key}, Shard Id : #{shard_id}, Sequence Number : #{record.sequence_number}" end shard_iterator = records_info.next_shard_iterator sleep(1) end end rescue => e puts "Error: #{e.message}" abort end
先ほど投入したデータが表示されました。
$ ruby get.rb Enter target stream name: example Data : 2014-03-27 08:41:13 +0000, Partition Key : Electronics, Shard Id : shardId-000000000000, Sequence Number : 49537934509153387391966018680468331689319184156313255937 Data : 2014-03-27 08:41:15 +0000, Partition Key : Games, Shard Id : shardId-000000000000, Sequence Number : 49537934509153387391966021152193174398088752583170588673 Data : 2014-03-27 08:41:17 +0000, Partition Key : Games, Shard Id : shardId-000000000000, Sequence Number : 49537934509153387391966022368340774319723773245008117761 Data : 2014-03-27 08:41:18 +0000, Partition Key : Books, Shard Id : shardId-000000000000, Sequence Number : 49537934509153387391966023755053025178259024291748118529 Data : 2014-03-27 08:41:19 +0000, Partition Key : Books, Shard Id : shardId-000000000000, Sequence Number : 49537934509153387391966026288268916696677167016357920769 Data : 2014-03-27 08:41:21 +0000, Partition Key : Games, Shard Id : shardId-000000000000, Sequence Number : 49537934509153387391966027745649131213584694134764994561 Data : 2014-03-27 08:41:22 +0000, Partition Key : Shoes, Shard Id : shardId-000000000000, Sequence Number : 49537934509153387391966028747790009190991264029989666817 Data : 2014-03-27 08:41:23 +0000, Partition Key : Toys, Shard Id : shardId-000000000000, Sequence Number : 49537934509153387391966031258972151233076898861739409409 Data : 2014-03-27 08:41:25 +0000, Partition Key : Music, Shard Id : shardId-000000000000, Sequence Number : 49537934509153387391966032711112311991858415213920387073 . . .
ストリームの作成
次にストリームをKinesis Management Consoleからではなく、プログラムを実行することで作成してみます。create.rbを実行すると、標準入力からストリーム名とシャード数を受け取り、ストリームを作成します。
- 利用した
AWS::Kinesis::Client
のインスタンスメソッド
require 'aws-sdk' print "Enter new stream name: " stream_name = gets.chomp print "Enter number of shards: " shard_count = gets.chomp.to_i begin client = AWS::Kinesis.new( access_key_id: ENV['AWS_ACCESS_KEY_ID'], secret_access_key: ENV['AWS_SECRET_ACCESS_KEY']).client client.create_stream( stream_name: stream_name, shard_count: shard_count) stream_status = client.describe_stream(stream_name: stream_name).stream_description.stream_status if 'CREATING' == stream_status puts "#{stream_name} is creating..." else puts "Error: Failed to create a stream" end rescue => e puts "Error: #{e.message}" abort end
example2という名前、シャード数は2でストリームを作成してみました。
$ ruby create.rb Enter new stream name: example2 Enter number of shards: 2 example2 is creating...
Kinesis Management Consoleで新しいストリームが作成されていることが確認できます。ストリームのステータスは以下の4種類が有り、ストリームの作成直後はCREATING、作成が完了するとACTIVEにステータスが遷移します。
- CREATING
- ACTIVE
- UPDATING
- DELETING
指定した通り、シャード数2で作成されたことが確認出来ます。
ストリームの一覧を表示
WebからではなくCLIからストリームの一覧を確認できるように、ストリームの一覧を表示するプログラムを作成してみます。表示するデータは、ストリーム名、開いているシャード数、閉じているシャード数、ステータスです。
- 利用した
AWS::Kinesis::Client
のインスタンスメソッド
require 'aws-sdk' begin client = AWS::Kinesis.new( access_key_id: ENV['AWS_ACCESS_KEY_ID'], secret_access_key: ENV['AWS_SECRET_ACCESS_KEY']).client stream_names = client.list_streams.stream_names if stream_names.empty? puts "You have no stream" abort else stream_names.each do |name| stream_description = client.describe_stream(stream_name: name).stream_description shard_count = stream_description.shards.count closed_shard_count = stream_description.shards.find_all{|s| s.sequence_number_range.key?(:ending_sequence_number)}.count status = stream_description.stream_status puts "Stream Name : #{name}, Open Shards : #{shard_count - closed_shard_count}, Closed Shards : #{closed_shard_count}, Status : #{status}" end end rescue => e puts "Error: #{e.message}" abort end
最初にKinesis Management Consoleから作成したストリームと、プログラムを実行して作成したストリームが表示されました。
$ ruby stream_lists.rb Stream Name : example, Open Shards : 1, Closed Shards : 0, Status : ACTIVE Stream Name : example2, Open Shards : 2, Closed Shards : 0, Status : ACTIVE
ストリームの削除
作成したストリームをプログラムから削除してみます。delete.rbを実行すると、標準入力で削除する対象のストリーム名を聞かれ、そこで指定したストリームを削除します。
- 利用した
AWS::Kinesis::Client
のインスタンスメソッド
require 'aws-sdk' print "Enter stream name to be deleted: " stream_name = gets.chomp print "Are you sure to delete it? [Y/N]: " until gets.chomp == 'Y' puts "Exiting on user command" abort end begin client = AWS::Kinesis.new( access_key_id: ENV['AWS_ACCESS_KEY_ID'], secret_access_key: ENV['AWS_SECRET_ACCESS_KEY']).client client.delete_stream(stream_name: stream_name) stream_status = client.describe_stream(stream_name: stream_name).stream_description.stream_status if 'DELETING' == stream_status puts "#{stream_name} is deleting..." else puts "Error: Failed to delete a stream" end rescue => e puts "Error: #{e.message}" abort end
exampleという名前のストリームを削除しました。
$ ruby delete.rb Enter stream name to be deleted: example Are you sure to delete it? [Y/N]: Y example is deleting...
しばらく待ってから一覧を表示してみます。
$ ruby stream_lists.rb Stream Name : example2, Open Shards : 2, Closed Shards : 0, Status : ACTIVE
exampleが削除され、example2だけ残りました。
複数シャードへの分散を確認する
複数のシャードを持つストリームを作成し、データが分散されるか確認してみます。今回はシャードを3つで作成します。
$ ruby create.rb Enter new stream name: example3 Enter number of shards: 3 example3 is creating...
次にデータを投入します。
$ ruby put.rb Enter target stream name: example3 Data : 2014-03-28 07:02:28 +0000, Partition key : Books, Shard Id : shardId-000000000001, Sequence Number : 49537963222187958072590960692563717737348864947930529809 Data : 2014-03-28 07:02:30 +0000, Partition key : Shoes, Shard Id : shardId-000000000001, Sequence Number : 49537963222187958072590961901947896970744783197272277009 Data : 2014-03-28 07:02:31 +0000, Partition key : Games, Shard Id : shardId-000000000000, Sequence Number : 49537963222165657327392433705698316417920875087729262593 Data : 2014-03-28 07:02:33 +0000, Partition key : Music, Shard Id : shardId-000000000000, Sequence Number : 49537963222165657327392435069727858322551272391820967937 Data : 2014-03-28 07:02:34 +0000, Partition key : Sports, Shard Id : shardId-000000000001, Sequence Number : 49537963222187958072590968167052073956416553280310607889 . . .
別々のシャードにデータが分散されています。また、パーティションキーが同一であれば、同一のシャードで処理していることが確認できました。
$ ruby get.rb Enter target stream name: example3 Data : 2014-03-28 07:02:31 +0000, Partition Key : Games, Shard Id : shardId-000000000000, Sequence Number : 49537963222165657327392433705698316417920875087729262593 Data : 2014-03-28 07:02:33 +0000, Partition Key : Music, Shard Id : shardId-000000000000, Sequence Number : 49537963222165657327392435069727858322551272391820967937 Data : 2014-03-28 07:02:28 +0000, Partition Key : Books, Shard Id : shardId-000000000001, Sequence Number : 49537963222187958072590960692563717737348864947930529809 Data : 2014-03-28 07:02:30 +0000, Partition Key : Shoes, Shard Id : shardId-000000000001, Sequence Number : 49537963222187958072590961901947896970744783197272277009 Data : 2014-03-28 07:02:34 +0000, Partition Key : Sports, Shard Id : shardId-000000000001, Sequence Number : 49537963222187958072590968167052073956416553280310607889 Data : 2014-03-28 07:02:36 +0000, Partition Key : Shoes, Shard Id : shardId-000000000001, Sequence Number : 49537963222187958072590970930195096956296320928312197137 Data : 2014-03-28 07:02:37 +0000, Partition Key : Music, Shard Id : shardId-000000000000, Sequence Number : 49537963222165657327392441327128852205386267870619500545 . . .
シャードの結合
ストリームの複数のシャードを結合するコードを作成しました。トラフィックが減少した際など、シャード数を減らすのに利用します。
- 利用した
AWS::Kinesis::Client
のインスタンスメソッド
require 'aws-sdk' print "Enter target stream name: " stream_name = gets.chomp begin client = AWS::Kinesis.new( access_key_id: ENV['AWS_ACCESS_KEY_ID'], secret_access_key: ENV['AWS_SECRET_ACCESS_KEY']).client open_shard_ids = client.describe_stream(stream_name: stream_name).stream_description.shards.find_all{|s| not s.sequence_number_range.key?(:ending_sequence_number)}.map(&:shard_id) if open_shard_ids.count < 2 puts "Error: require two or more open shards" abort end puts "Shard List is..." open_shard_ids.each {|id| puts "Shard Id : #{id}"} print "Enter shard id: " shard_id = gets.chomp print "Enter adjacent shard id: " adjacent_shard_id = gets.chomp client.merge_shards( stream_name: stream_name, shard_to_merge: shard_id, adjacent_shard_to_merge: adjacent_shard_id) stream_status = client.describe_stream(stream_name: stream_name).stream_description.stream_status if 'UPDATING' == stream_status puts "#{stream_name} is merging..." else puts "Error: Failed to merge stream" end rescue => e puts "Error: #{e.message}" abort end
シャードが2以上あるストリームでないと結合できないので、example2で行います。シャードを結合する前に、データを投入しておきます。
$ ruby put.rb Enter target stream name: example2 Data : 2014-03-28 07:22:29 +0000, Partition key : Shoes, Shard Id : shardId-000000000001, Sequence Number : 49537962751553031402802212298119032072372410155425333265 Data : 2014-03-28 07:22:31 +0000, Partition key : Foods, Shard Id : shardId-000000000001, Sequence Number : 49537962751553031402802213723769919920783757920734019601 Data : 2014-03-28 07:22:32 +0000, Partition key : Books, Shard Id : shardId-000000000000, Sequence Number : 49537962751530730657603684211962134769420069484336513025 Data : 2014-03-28 07:22:33 +0000, Partition key : Toys, Shard Id : shardId-000000000000, Sequence Number : 49537962751530730657603686744436901623540520213213609985 Data : 2014-03-28 07:22:35 +0000, Partition key : Shoes, Shard Id : shardId-000000000001, Sequence Number : 49537962751553031402802218687441781114772061310385389585
次にシャードを結合します。merge.rbを実行し、対象のストリーム、結合するシャードを指定すると、シャードが結合されます。
$ ruby merge.rb Enter target stream name: example2 Shard List is... shardId-000000000000 shardId-000000000001 Enter shard id: shardId-000000000000 Enter adjacent shard id: shardId-000000000001 example2 is merging...
シャードの結合、分割中はUPDATINGというステータスになるようです。
$ ruby stream_lists.rb Stream Name : example2, Open Shards : 2, Closed Shards : 0, Status : UPDATING Stream Name : example3, Open Shards : 3, Closed Shards : 0, Status : ACTIVE
しばらく待ってから再度ストリームのリストを表示すると、開いているシャードが1に、閉じているシャードが2に変化しています。
$ ruby stream_lists.rb Stream Name : example2, Open Shards : 1, Closed Shards : 2, Status : ACTIVE Stream Name : example3, Open Shards : 3, Closed Shards : 0, Status : ACTIVE
データを取得してみると、閉じたシャードからでもデータが取れています。
$ ruby get.rb Enter target stream name: example2 Data : 2014-03-28 07:22:32 +0000, Partition Key : Books, Shard Id : shardId-000000000000, Sequence Number : 49537962751530730657603684211962134769420069484336513025 Data : 2014-03-28 07:22:33 +0000, Partition Key : Toys, Shard Id : shardId-000000000000, Sequence Number : 49537962751530730657603686744436901623540520213213609985 Data : 2014-03-28 07:22:29 +0000, Partition Key : Shoes, Shard Id : shardId-000000000001, Sequence Number : 49537962751553031402802212298119032072372410155425333265 Data : 2014-03-28 07:22:31 +0000, Partition Key : Foods, Shard Id : shardId-000000000001, Sequence Number : 49537962751553031402802213723769919920783757920734019601 Data : 2014-03-28 07:22:35 +0000, Partition Key : Shoes, Shard Id : shardId-000000000001, Sequence Number : 49537962751553031402802218687441781114772061310385389585 Data : 2014-03-28 07:22:36 +0000, Partition Key : Clothing, Shard Id : shardId-000000000001, Sequence Number : 49537962751553031402802220064378010250345105119535693841
これは以下に書かれている通り、24時間はデータの参照が可能だからです。また、追加のデータ投入は受け付けないようです。
Closed shards no longer accept data from producers. Data in these shards is still available to consumers, but only for 24 hours. Resharding operations result in closed shards.
本当に閉じたシャードにデータを追加できないか試してみます。
$ ruby put.rb Enter target stream name: example2 Data : 2014-03-28 07:40:31 +0000, Partition key : Clothing, Shard Id : shardId-000000000002, Sequence Number : 495 37963958670068254067683947805569091762202451772440609 Data : 2014-03-28 07:40:33 +0000, Partition key : Books, Shard Id : shardId-000000000002, Sequence Number : 495379 63958670068254067685307460452340364744109768835105 Data : 2014-03-28 07:40:35 +0000, Partition key : Shoes, Shard Id : shardId-000000000002, Sequence Number : 495379 63958670068254067687659056849714751193781410725921 Data : 2014-03-28 07:40:36 +0000, Partition key : Books, Shard Id : shardId-000000000002, Sequence Number : 495379 63958670068254067690329753186742648516464523345953 . . .
確かに、shardId-000000000002にしかデータが追加されません。
シャードの分割
シャードを分割するコードを作成しました。シャードの分割は、結合とは逆にトラフィックが増加した際などに行います。
- 利用した
AWS::Kinesis::Client
のインスタンスメソッド
require 'aws-sdk' print "Enter target stream name: " stream_name = gets.chomp begin client = AWS::Kinesis.new( access_key_id: ENV['AWS_ACCESS_KEY_ID'], secret_access_key: ENV['AWS_SECRET_ACCESS_KEY']).client puts "Shard List is..." shards = client.describe_stream(stream_name: stream_name).stream_description.shards shards.find_all{|s| not s.sequence_number_range.key?(:ending_sequence_number)}.each do |s| puts "Shard Id : #{s.shard_id}, Starting Hash Key : #{s.hash_key_range.starting_hash_key}, Ending Hash Key : #{s.hash_key_range.ending_hash_key}, Middle Of Hash Key : #{s.hash_key_range.ending_hash_key.to_i / 2}" end print "Enter shard id: " shard_id = gets.chomp print "Enter new starting hash key: " new_starting_hash_key = gets.chomp client.split_shard( stream_name: stream_name, shard_to_split: shard_id, new_starting_hash_key: new_starting_hash_key) stream_status = client.describe_stream(stream_name: stream_name).stream_description.stream_status if 'UPDATING' == stream_status puts "#{stream_name} is spliting..." else puts "Error: Failed to split stream" end rescue => e puts "Error: #{e.message}" abort end
先ほど結合したexample2のシャードを、split.rbを実行して分割してみます。分割位置は対象のシャードにマッピングされているハッシュ値の範囲で指定します。
$ ruby split.rb Enter target stream name: example2 Shard List is... Shard Id : shardId-000000000002, Starting Hash Key : 0, Ending Hash Key : 340282366920938463463374607431768211455, Middle Of Hash Key : 170141183460469231731687303715884105727 Enter shard id: shardId-000000000002 Enter new starting hash key: 170141183460469231731687303715884105727 example2 is spliting...
分割中。分割前は開いているシャードは1つです。
$ ruby stream_lists.rb Stream Name : example2, Open Shards : 1, Closed Shards : 2, Status : UPDATING Stream Name : example3, Open Shards : 3, Closed Shards : 0, Status : ACTIVE
分割が完了し、開いているシャードが2つに増えました。
$ ruby stream_lists.rb Stream Name : example2, Open Shards : 2, Closed Shards : 3, Status : ACTIVE Stream Name : example3, Open Shards : 3, Closed Shards : 0, Status : ACTIVE
再度split.rbを途中まで実行し、ハッシュ値を確認してみます。
$ ruby split.rb Enter target stream name: example2 Shard List is... Shard Id : shardId-000000000003, Starting Hash Key : 0, Ending Hash Key : 170141183460469231731687303715884105726, Middle Of Hash Key : 85070591730234615865843651857942052863 Shard Id : shardId-000000000004, Starting Hash Key : 170141183460469231731687303715884105727, Ending Hash Key : 340282366920938463463374607431768211455, Middle Of Hash Key : 170141183460469231731687303715884105727 Enter shard id:
指定したハッシュ値からシャードが分割されていることが確認できます。
まとめ
今回はAmazon KinesisをRubyのAWS SDKを利用して操作しました。実際になにかを作る際はDynamoDBにデータを保存してランキング集計したり、Stormと連携してなにかしらの処理を行ったり、Redshiftにデータを送り込んで分析したりするのが便利そうです。今回は基本的な使い方のみの紹介となってしまったので、今後もう少し応用的な使い方を紹介していけたらと思います。