TL;DR
- DynamoDB LocalはDynamoDB Streamsをサポートしている.
- 本物のDynamoDB Streamsのように,一定期間以上古いレコードについてはトリミングされる (具体的なトリミング期間については未調査).
- ドキュメントが少ない (見つけられなかった……) のに加え,ライセンス的に逆コンパイルは不可能なのでどういう仕組みで実現されているかがわからない……
本文
DynamoDB Localというローカル環境で動くDynamoDBがあり *1,これはDynamoDBの動作検証やテストに利用するためのミドルウェアなんですが (もちろん本番環境で使うものではない),これがDynamoDB Streamsをサポートしていることはあまり知られていません.僕も一昨日知りました.
Yes, the latest version of DynamoDB Local supports DynamoDB Streams on the same port configured for the DynamoDB service (by default 8000).
https://forums.aws.amazon.com/thread.jspa?threadID=231696
つまりこれはDynamoDB Streamsを使っているようなソフトウェアのテストや動作の検証をDynamoDB Localを使って行えるということです.
以下はgoの場合のサンプルコードですが,内容はAWS上で動いているDynamoDBでDynamoDB Streamsを利用する場合と大差はありません.
dynamoSession, err := session.NewSession(&aws.Config{
Region: aws.String("ap-northeast-1"),
Endpoint: aws.String("http://localhost:8000"),
})
if err != nil {
panic (err)
}
dynamo := dynamodb.New(dynamoSession)
dynamoStream := dynamodbstreams.New(session)
DynamoDB Localに向けたDynamoDB及びDynamoDB Streamsのクライアントを作ります.
_, err = dynamo.CreateTable(&dynamodb.CreateTableInput{
AttributeDefinitions: []*dynamodb.AttributeDefinition{
{
AttributeName: aws.String("ID"),
AttributeType: aws.String("S"),
},
{
AttributeName: aws.String("Name"),
AttributeType: aws.String("S"),
},
},
KeySchema: []*dynamodb.KeySchemaElement{
{
AttributeName: aws.String("ID"),
KeyType: aws.String("HASH"),
},
{
AttributeName: aws.String("Name"),
KeyType: aws.String("RANGE"),
},
},
ProvisionedThroughput: &dynamodb.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(1),
WriteCapacityUnits: aws.Int64(1),
},
TableName: aws.String("User"),
StreamSpecification: &dynamodb.StreamSpecification{
StreamEnabled: aws.Bool(true),
StreamViewType: aws.String("NEW_AND_OLD_IMAGES"),
},
})
StreamSpecificationを用いてStreamsを有効にしたテーブルを作成します.
_, err = dynamo.PutItem(&dynamodb.PutItemInput{
TableName: aws.String("User"),
Item: map[string]*dynamodb.AttributeValue{
"ID": {
S: aws.String("ID-1"),
},
"Name": {
S: aws.String("moznion"),
},
},
ReturnConsumedCapacity: aws.String("none"),
})
作成したテーブルに対してitemをputしてみます.これでUser tableにレコードが入っているはず.
table, _ := dynamo.DescribeTable(&dynamodb.DescribeTableInput{
TableName: aws.String("User"),
})
streamArn := table.Table.LatestStreamArn
stream, _ := dynamoStream.DescribeStream(&dynamodbstreams.DescribeStreamInput{
StreamArn: streamArn,
})
shards := stream.StreamDescription.Shards
DescribeTableでtable情報を引いて,DynamoDB StreamsのstreamArnを取得します.そのstreamArnを利用してStreamのshardsを引っ張ってきます.
Shard: for _, shard := range shards { out, err := dynamoStream.GetShardIterator(&dynamodbstreams.GetShardIteratorInput{ StreamArn: streamArn, ShardId: shard.ShardId, ShardIteratorType: aws.String(dynamodbstreams.ShardIteratorTypeTrimHorizon), }) if err != nil { logrus.Error(err) continue } nextItr := out.ShardIterator for nextItr != nil { record, err := dynamoStream.GetRecords(&dynamodbstreams.GetRecordsInput{ ShardIterator: nextItr, }) if err != nil { continue Shard } records := record.Records // Do something nextItr = record.NextShardIterator } }
shardsを用いて各shardのShardIteratorを取得し,そのshard iteratorを使うことでstreamからレコードを取ってくることが可能となります.後は煮るなり焼くなりできるでしょう.
なおこのコードはあくまでサンプルなので,実際にはshards周りの処理は状況に応じて書き換える必要があるでしょう.
所感
軽く動作検証をした感想ですが,テストやDynamoDB Streamsの動作検証用途であれば十分利用できると思います.「一定期間以上古いレコードはトリミングされる」という挙動もエミュレートしていてすごい (その期間については未検証).
一方で冒頭にも書きましたが,DynamoDB LocalのDynamoSB Streamsはドキュメントが極端に少なく (というかDynamoDB Local自体の情報が少ない気がする……),また具体的にどのような仕組みで動いているのかがわからないという部分がネックと言えばネックとなりえると思います.が,まあDynamoDB Local自体がSQLiteの超テクノロジーで動いているわけだし,まあ……という気持ちではいます.
それにしてもDynamoDB LocalにStreams対応入っているの本当にすごい……
とにかく,DynamoDB LocalはDynamoDB Streamsに対応しているという話でした.