Lambda+SQSを使ってDynamoDBの特定のデータを別アカウントのDynamoDBへコピー(移行)させてみる
どうも!大阪オフィスの西村祐二です。
開発環境で使っていたデータを本番環境など別環境でも使いたい場面はよくあると思います。
私もあるAWSアカウント上にあるDynamoDBの特定のデータを別のアカウントのDynamoDBへコピーしたい場面に出くわしましたので、Lambda+SQSを使ってDynamoDBの負荷をあげずにコピーする仕組みを作ってみました。
※今回紹介する方法以外にも色んな方法があります。こんな方法もあるんだな程度で考えてもらえれば幸いです。
構成図
この構成の良さそうな点
個人的に思うこの構成の良さそうな点をあげてみます。
- アカウントを跨ぐときの設定がSQSで許可するだけで簡単
- ある程度速度調整ができる(今回は負荷をあげないようにゆっくりコピーするようにしているが速くもできる)
- コスト削減できるかも?
ただ、以下を考慮しないといけないです。
- 複数回DynamoDBに書き込まれても大丈夫なように考慮する
- Lambdaが起動しまくることを考慮する
では、さっそくやっていきましょう。
主な作業
- コピー先のDynamoDBテーブル作成
- SQSの設定
- SQSをトリガーとしたLambdaの作成
- コピー元のDynamoDBからデータを取得し、コピー先アカウントのSQSへputするスクリプト作成
コピー元アカウントのDynamoDBテーブル(アカウントA)
テストデータとして、ユーザ情報をもつテーブルがあるとします。
その中の特定のデータだけ別のアカウントへコピーしたいというシナリオです。
プライマリキーは「userid」としています。
コピー先のDynamoDBテーブル(アカウントB)
コピー元と同じテーブルを作成しておきます。
プライマリキーは「userid」として作成しています。
SQSの設定(アカウントB)
別のAWSアカウント(アカウントA)からのアクセスを受け付けるSQSキューを作成します。
▼「test」というキューを作成します。他の値はデフォルトのままです。
▼「アクセス許可の追加」をクリックし、別アカウントからキューにアクセスできるようにします。
▼12桁のAWSアカウントを入力し、許可するアクションを設定します。今回はすべてアクションを許可しています。
▼プリンシパルの値がrootで設定されているので、適切なIAMユーザに変更しておくのがいいかと思います。
Lambdaの作成(アカウントB)
SQSをトリガーとしたコピー先のDynamoDBテーブルにputするLambdaを作成します。
まず、LambdaのポリシーにDynamoDBとSQSを実行する権限を付与しておいてください。
ちなみに、LambdaからSQSを実行するためのマネージドポリシーがあります。「AWSLambdaSQSQueueExecutionRole」
Lambdaのタイムアウト時間はトリガーに設定したSQSのキューの「デフォルトの可視性タイムアウト」の値以上に設定する必要があります。今回は30秒に設定しているので、Lambdaのタイムアウト時間を30秒に設定します。
- DynamoDBへの書き込み速度を調整します。
- 同時実行数を「1」にしておく
- そうしないと複数のLambdaが並列に実行されて、DynamoDBの負荷があがる
- また、同時実行の制限に引っかかり、他のLambdaに影響与える可能性がある
- トリガーとなるSQSのバッチサイズを「1」にしておく
- Lambdaが複数のデータを取得しないように制限
- DynamoDBの負荷をあげないように1アイテムずつputさせるため
- 同時実行数を「1」にしておく
DynamoDBへputするプログラムをpythonで作成します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 | import boto3 import os import ast # dynamo DYNAMO = boto3.resource( 'dynamodb' ) DYNAMO_CLIENT = DYNAMO.meta.client DYNAMO_TABLE_NAME = os.getenv( 'TABLE_NAME' , 'test' ) TABLE = DYNAMO.Table(DYNAMO_TABLE_NAME) def put_dynamo(data): """コピー先のDynamoDBテーブルへputする.""" try : response = TABLE.put_item( Item = { 'userid' : data[ 'userid' ], 'firstname' : data[ 'firstname' ], 'lastname' : data[ 'lastname' ], 'createdAt' : data[ 'createdAt' ], 'updatedAt' : data[ 'updatedAt' ] } ) return response except Exception as error: raise error def conv_dict( str ): """Str -> Dict.""" dict = ast.literal_eval( str ) return dict def lambda_handler(event, context): """main.""" try : for i in range ( len (event[ 'Records' ])): print (event[ 'Records' ][i - 1 ][ 'body' ]) getdata = event[ 'Records' ][i - 1 ][ 'body' ] put_dynamo(conv_dict(getdata)) except Exception as error: raise error |
コピー元のDynamoDBからデータを取得し、SQSに送付するスクリプト作成
今回は定期的にコピーを行う予定がなかったので、ローカルでの実行を想定しています。
ディレクトリ構成は下記のようになっています。
別アカウントにコピーしたいデータのキーを行ごとにテキストファイル(get_key_list.txt)に書き出しておきます。
1 2 3 | ├── data_put_sqs.py └── list └── get_key_list.txt |
プログラムはpythonで作成します。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 | import boto3 import os import json import decimal from time import sleep from boto3.dynamodb.conditions import Key, Attr from botocore.exceptions import ClientError # dynamo DYNAMO = boto3.resource( 'dynamodb' ) DYNAMO_CLIENT = DYNAMO.meta.client DYNAMO_TABLE_NAME = os.getenv( 'TABLE_NAME' , 'test' ) TABLE = DYNAMO.Table(DYNAMO_TABLE_NAME) # SQS SQS = boto3.resource( 'sqs' ) QUEUE = SQS.Queue( 'url' ) class DecimalEncoder(json.JSONEncoder): """dynamodb get_item return number type for encoder.""" def default( self , obj): """Return encode data.""" if isinstance (obj, decimal.Decimal): if obj % 1 > 0 : return float (obj) else : return int (obj) return super (DecimalEncoder, self ).default(obj) def get_item(key): """DynamoDBからitemを取得する.""" try : response = TABLE.get_item( Key = { 'userid' : str (key) } ) encode = json.dumps( response[ 'Item' ], cls = DecimalEncoder, ensure_ascii = False ) return encode except ClientError as e: print (e.response[ 'Error' ][ 'Message' ]) raise e def put_sqs(item): """sqsのキューにメッセージをputする.""" resp = QUEUE.send_message( MessageBody = item, QueueUrl = URL, DelaySeconds = 0 ) def key_list(filename): """コピー対象のuseridをテキストから取得.""" path = os.getcwd() + f '/list/{filename}' with open (path) as f: # 一行ごとをリストに入れてreturn return f.read().split() def main(): """main.""" try : filelist = [ 'get_key_list.txt' ] for f in filelist: for i in key_list(f): result = get_item(i) put_sqs(result) # 1秒スリープ処理 sleep( 1 ) except Exception as e: raise e if __name__ = = "__main__" : main() |
- 13行目
- コピー元のDynamoDBテーブル名を指定します。
- 19行目
- コピー先のSQSキュー名を指定します。
コピー元の負荷もなるべくあげたくなかったので、1アイテム取得し、SQSにputした後、1秒のスリープ処理を入れています。
動作確認
下記で実行します。
1 | $ python data_put_sqs.py |
- アカウントAのコピー元のDynamoDBの負荷状況をみてみます。
今回3000アイテムほどコピーしましたが、コピー元のDyanamoDBの読み込みキャパシティは低い値で一定した負荷でコピーすることができました。
(へこんでいる部分はテスト実行したところの間になります。)
- アカウントBのコピー先のDynamoDBの負荷状況をみてみます。
こちらも同様にコピー先のDyanamoDBの書き込みキャパシティは一定の低い負荷でコピーすることができました。
(へこんでいる部分はテスト実行したところの間になります。)
さいごに
いかがだったでしょうか。
Lambda+SQSを使ってDynamoDBの特定のデータを負荷をあげずに別アカウントのDynamoDBへコピー(移行)する仕組みを作ってみました。
誰かの参考になれば幸いです。