Aggregating ELB and CloudFront Access Logs Serverlessly
Introduction
This is Suzuki from the AWS team.
I recently had an opportunity to use Firehose to aggregate the access logs that ELB (ALB/CLB) and CloudFront write to S3, so that they can be reviewed and analyzed more efficiently.
This post introduces the setup as one example of serverless aggregation into S3 using Lambda.
Architecture Diagram
Processing Overview
1: S3
- The S3 bucket that stores the ELB logs.
- It has a bucket policy that allows the ELB service to register access logs (PutObject), and versioning enabled so the bucket can also serve as an S3 replication destination.
- To avoid changing the log output settings of the existing ELBs, I added a replication rule to the S3 bucket that already receives the access logs and specified this bucket as the replication destination.
2: SNS
- Amazon SNS is used because a single S3 event may need to trigger more than one Lambda process, and because it makes the event payload easy to capture for debugging (see the helper below).
- Delivery from Amazon SNS to Lambda is free of charge.
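When S3 publishes to SNS and SNS invokes Lambda, the original S3 event arrives as a JSON string inside the SNS message. The helper below, condensed from the parse_s3_event function used in the templates at the end of this post, pulls the bucket and key out of that wrapper:

import json
import urllib.parse

def parse_s3_event(event):
    # The SNS message body is the S3 event notification, serialized as JSON.
    s3_event = json.loads(event['Records'][0]['Sns']['Message'])
    record = s3_event['Records'][0]['s3']
    return {
        'bucket_name': record['bucket']['name'],
        # Object keys arrive URL-encoded, so decode them before use.
        'key': urllib.parse.unquote_plus(record['object']['key'], encoding='utf-8'),
    }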
3: Lambda
- The Lambda function code is implemented so that it fits within the 4,096-character limit for inline code in a CloudFormation template.
- The Lambda timeout is extended to the maximum (300 seconds), and the allocated memory is set to 256 MB, twice the default.
- With this implementation of the Firehose-ingestion Lambda function, roughly 300,000 to 400,000 records per log file is the practical processing limit.
- ELB (ALB) writes its access logs to S3 every 5 minutes. If you need to process access logs from an ELB handling more than 100,000 requests per minute, consider preparing a Lambda function that splits the files as a pre-processing step (a rough sketch follows this list) or using AWS Glue.
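The pre-splitting Lambda mentioned above is not part of the stack built in this post; the following is only a rough sketch of the idea. The function signature, destination prefix, and chunk size are assumptions for illustration, not values used elsewhere in this article.

import gzip
import boto3

s3 = boto3.client('s3')
CHUNK_LINES = 100000  # assumed chunk size; tune to what the ingestion Lambda can handle

def split_log(bucket, key, dest_bucket, dest_prefix='split/'):
    # Break one large gzipped ALB log file into smaller gzipped chunks
    # before the Firehose-ingestion Lambda processes them.
    body = s3.get_object(Bucket=bucket, Key=key)['Body'].read()
    lines = gzip.decompress(body).decode('utf-8').splitlines()
    for n, start in enumerate(range(0, len(lines), CHUNK_LINES)):
        chunk = '\n'.join(lines[start:start + CHUNK_LINES]) + '\n'
        s3.put_object(
            Bucket=dest_bucket,
            Key='{}{}.part{:04d}.gz'.format(dest_prefix, key.split('/')[-1], n),
            Body=gzip.compress(chunk.encode('utf-8')))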
Duration by Lambda memory allocation
- I prepared ELB (ALB) access log files of 60,000 and 300,000 records and measured the duration of the Firehose-ingestion Lambda function at several memory allocations.
Memory Size | 60,000 records | 300,000 records |
---|---|---|
256 MB | 81 s | (timeout) |
512 MB | 53 s | 269 s |
3008 MB | 45 s | 216 s |
Lambda processing details
- Reads the S3 bucket and key to process from the SNS message.
- Records whose expected timestamp column does not match the date format, or whose column count is wrong, are discarded as error records.
- The URL in the "request" column is parsed with the "urlparse" module, and frequently used values such as the host and path are extracted up front.
- Each record is JSON-encoded with the "json" module and sent to Firehose in batches (a condensed sketch follows this list).
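As a condensed sketch of that flow (the full implementation is the Step3 Lambda function in the CloudFormation templates at the end of this post; error-record filtering and most of the ALB field mapping are omitted here):

import json
import os
import urllib.parse
import boto3

firehose = boto3.client('firehose')

def to_record(line):
    # Parse one ALB log line into a dict (see parse_alb_log in the template for
    # the full mapping); host and path are pre-extracted from the URL with urlparse.
    fields = line.split('"')
    z = {'timestamp': fields[0].split(' ')[1], 'request': fields[1]}
    parsed = urllib.parse.urlparse(z['request'].split(' ')[1])
    z['request_uri_host'] = parsed.hostname
    z['request_uri_path'] = parsed.path
    return {'Data': json.dumps(z) + '\n'}  # one JSON object per line

def send_to_firehose(lines):
    records = [to_record(line) for line in lines]
    # PutRecordBatch accepts up to 500 records per call; the template sends 100 at a time.
    for i in range(0, len(records), 100):
        firehose.put_record_batch(
            DeliveryStreamName=os.environ['firehose_stream_name'],
            Records=records[i:i + 100])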
4: Firehose
- In this setup the destination is S3 only, but Firehose can also deliver to Amazon Elasticsearch Service or Redshift.
- The Firehose buffer interval is set to 300 seconds to match the ELB (ALB) log output interval.
- Anticipating that Firehose post-processing might later be implemented as a Lambda function, the buffer size is capped at 50 MB so the output files stay at a predictable size.
5: S3
- Used as the Firehose output destination.
- An S3 event notification is configured on the Firehose output prefix, and the post-processing Lambda function is connected via SNS.
6: SNS
- Used to connect the post-processing Lambda function.
7: Lambda
- Implements post-processing for the log files that Firehose aggregates and writes to S3.
- The date portion ("yyyy/mm/dd/HH24") of the S3 keys that Firehose produces is rewritten into Hive format ("dt=yyyy-mm-dd-HH24/"), and the objects are copied to the S3 bucket used by Athena (see the sketch below).
- With Hive-format keys, Athena can limit its scans to a specific period, which helps keep AWS costs down.
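The key conversion itself is small; functionally it comes down to the following, equivalent to the get_key_with_partition function in the Step7 Lambda of the templates below:

def get_key_with_partition(key):
    # e.g. 'firehose/alb_logs/2018/05/16/12/file.gz'
    #   -> 'firehose/alb_logs/dt=2018-05-16-12/file.gz'
    parts = key.split('/')
    prefix = '/'.join(parts[:-5])
    year, month, day, hour, filename = parts[-5:]
    return '{}/dt={}-{}-{}-{}/{}'.format(prefix, year, month, day, hour, filename)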
Using Athena
Creating tables
- Because the access logs have already been converted to JSON, only a table definition is needed to query them.
- The "dt" value, which the Lambda function derives from the Firehose output timestamp, is specified as the partition key.
For ALB
CREATE EXTERNAL TABLE IF NOT EXISTS alb_logs (
  type string,
  timestamp string,
  elb string,
  client_port string,
  target_port string,
  request_processing_time float,
  target_processing_time float,
  response_processing_time float,
  elb_status_code string,
  target_status_code string,
  received_bytes float,
  sent_bytes float,
  request string,
  user_agent string,
  ssl_cipher string,
  ssl_protocol string,
  target_group_arn string,
  trace_id string,
  domain_name string,
  chosen_cert_arn string,
  matched_rule_priority string,
  client string,
  target string,
  request_method string,
  request_uri string,
  request_http_version string,
  request_uri_scheme string,
  request_uri_host string,
  request_uri_path string
)
PARTITIONED BY (dt string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://<Athena S3 bucket name>/firehose/alb_logs/'
;
For CLB
CREATE EXTERNAL TABLE IF NOT EXISTS clb_logs (
  timestamp string,
  elb string,
  client string,
  client_port string,
  backend string,
  backend_port string,
  request_processing_time float,
  backend_processing_time float,
  response_processing_time float,
  elb_status_code string,
  backend_status_code string,
  received_bytes float,
  sent_bytes float,
  request string,
  user_agent string,
  ssl_cipher string,
  ssl_protocol string,
  request_method string,
  request_uri string,
  request_http_version string,
  request_uri_scheme string,
  request_uri_host string,
  request_uri_path string
)
PARTITIONED BY (dt string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://<Athena S3 bucket name>/firehose/clb_logs/'
;
For CloudFront
CREATE EXTERNAL TABLE IF NOT EXISTS cloudfront_logs (
  timestamp string,
  date string,
  time string,
  x_edge_location string,
  sc_bytes float,
  c_ip string,
  cs_method string,
  cs_host string,
  cs_uri_stem string,
  sc_status string,
  cs_referer string,
  cs_user_agent string,
  cs_uri_query string,
  cs_cookie string,
  x_edge_result_type string,
  x_edge_request_id string,
  x_host_header string,
  cs_protocol string,
  cs_bytes float,
  time_taken float,
  x_forwarded_for string,
  ssl_protocol string,
  ssl_cipher string,
  x_edge_response_result_type string,
  fle_status string,
  fle_encrypted_fields string
)
PARTITIONED BY (dt string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://<Athena S3 bucket name>/firehose/cloudfront_logs/'
;
Updating partition information
- Run "MSCK REPAIR TABLE" right after creating a table, and whenever newly added partitions need to be included in analysis.
MSCK REPAIR TABLE alb_logs
MSCK REPAIR TABLE clb_logs
MSCK REPAIR TABLE cloudfront_logs
Analysis
- Access logs can now be analyzed with SQL from the Athena console or other clients.
- Queries that do not touch columns whose names differ between load balancer types, such as "target" and "backend", can be reused as-is across the tables.
Extracting error responses
- Extract log records where the ELB returned an HTTP 5xx status code.
SELECT *
FROM alb_logs
WHERE dt < '2018-05-16-15'
  and dt >= '2018-05-14-15'
  and elb_status_code like '5%'
order by timestamp limit 10;
- The 5XX responses for the specified period can be confirmed.
Extracting clients suspected of causing high load
- Extract the client IP addresses (client) and User-Agents with the longest total EC2 response time (target_processing_time).
SELECT sum(target_processing_time) as total_processing_time
  , count(1) as request_count
  , SUBSTR(timestamp,1,13) as date_time
  , client
  , user_agent
FROM alb_logs
WHERE dt < '2018-05-16-15'
  and dt >= '2018-05-13-15'
  and request_uri_host = 'dev.classmethod.jp'
  and user_agent like '%bot%'
GROUP BY user_agent, client, SUBSTR(timestamp,1,13)
order by sum(target_processing_time) desc
limit 10
- For the specified period, this confirmed that clients identifying themselves as bots in the User-Agent were not generating high load.
AWS Cost
- For ELB access logs of 5 million requests per day, processed with Firehose and Lambda in the AWS Tokyo region, the cost works out to 0.202 USD per day.
- In most environments this should be usable without putting noticeable pressure on the AWS bill.
Breakdown
Firehose
- 0.108 USD
- Ingested data is priced at 0.036 USD per GB.
- 0.108 USD = 3 GB × 0.036 USD
Lambda
- Calculated assuming no free tier.
- 0.0193364 USD (a re-check of this arithmetic follows the breakdown below)
- Firehose-ingestion execution time
- 0.0188032 USD = 4,520,000 ms × (0.000000208 / 100) USD per ms (128 MB price) × 2 (memory increased from 128 MB to 256 MB)
- Athena-copy execution time
- 0.000314 USD = 151,000 ms × (0.000000208 / 100) USD per ms (128 MB price)
- Firehose-ingestion requests
- 0.0001714 USD = 857 invocations × (0.20 USD / 1,000,000 requests)
- Athena-copy requests
- 0.0000478 USD = 239 invocations × (0.20 USD / 1,000,000 requests)
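For reference, the Lambda arithmetic above can be re-checked as follows (the prices are the Tokyo-region figures assumed in this post: 0.000000208 USD per 100 ms at 128 MB, and 0.20 USD per million requests):

price_per_ms_128mb = 0.000000208 / 100  # USD per 1 ms at a 128 MB allocation

firehose_duration = 4520000 * price_per_ms_128mb * 2   # doubled for the 256 MB allocation
athena_copy_duration = 151000 * price_per_ms_128mb     # 128 MB allocation
firehose_requests = 857 * 0.20 / 1000000
athena_copy_requests = 239 * 0.20 / 1000000

total = firehose_duration + athena_copy_duration + firehose_requests + athena_copy_requests
print(total)  # roughly 0.0193 USD per day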
S3
- Firehose outputs roughly 300 MB of GZIP-compressed logs per day; the storage cost assumes this is kept in S3 for one month.
- API charges for S3 reads, PUTs, GETs, and so on are excluded.
- 0.075 USD
- 0.3 GB (one day of logs) × 0.025 USD/GB (storage per GB-month)
Summary
I have introduced one way to process ELB (ALB) access logs serverlessly and at low cost.
The approach should be even more effective for ELB (CLB) access logs, which are written to S3 uncompressed, and for CloudFront access logs, where limiting queries to a time range by S3 key is difficult and the header lines in the log files can interfere with Athena processing.
This post covered the steps up to querying the Firehose-aggregated access logs with Athena.
- Simple time-window processing based on the Firehose buffer interval
- Stream processing of the access logs by placing Kinesis Analytics after Firehose
- Optimization with AWS Glue for long-term use of the access logs
I hope to cover applications like these in future posts.
CloudFormation Templates
For ALB
AWSTemplateFormatVersion: '2010-09-09'
Description: Convert ELB(alb) access log to JSON via Firehose (20180515)
Parameters:
S3Expiredate:
Description: Number of days to keep S3 file
Type: String
Default: 10
CWlogsExpiredate:
Description: Number of days to keep Cloudwatch logs
Type: String
Default: 3
S3AthenaExpiredate:
Description: Number of days to keep S3 file (Athena)
Type: String
Default: 45
Resources:
Step1S3Bucket:
Type: AWS::S3::Bucket
DeletionPolicy: Delete
Properties:
LifecycleConfiguration:
Rules:
- Id: AutoDelete
Status: Enabled
ExpirationInDays: !Ref 'S3Expiredate'
NotificationConfiguration:
TopicConfigurations:
- Event: s3:ObjectCreated:*
Filter:
S3Key:
Rules:
- Name: suffix
Value: .gz
Topic: !Ref 'Step2SnsTopic'
VersioningConfiguration:
Status: Enabled
Step1S3BucketPolicy:
Type: AWS::S3::BucketPolicy
Properties:
Bucket: !Ref 'Step1S3Bucket'
PolicyDocument:
Id: ElblogsBucketPolicy
Statement:
- Sid: AddPerm
Effect: Allow
Principal:
AWS:
- arn:aws:iam::582318560864:root
- arn:aws:iam::127311923021:root
- arn:aws:iam::033677994240:root
- arn:aws:iam::027434742980:root
- arn:aws:iam::797873946194:root
- arn:aws:iam::985666609251:root
- arn:aws:iam::054676820928:root
- arn:aws:iam::156460612806:root
- arn:aws:iam::652711504416:root
- arn:aws:iam::156460612806:root
- arn:aws:iam::009996457667:root
- arn:aws:iam::600734575887:root
- arn:aws:iam::383597477331:root
- arn:aws:iam::114774131450:root
- arn:aws:iam::797873946194:root
- arn:aws:iam::783225319266:root
- arn:aws:iam::718504428378:root
- arn:aws:iam::507241528517:root
Action:
- s3:PutObject
Resource:
- !Sub 'arn:aws:s3:::${Step1S3Bucket}/*'
Step2SnsTopicPolicy:
Type: AWS::SNS::TopicPolicy
Properties:
PolicyDocument:
Version: '2012-10-17'
Id: MyTopicPolicy
Statement:
- Sid: allow-publish-s3
Effect: Allow
Principal:
Service:
- s3.amazonaws.com
Action:
- sns:Publish
Resource: !Ref 'Step2SnsTopic'
Topics:
- !Ref 'Step2SnsTopic'
Step2SnsTopic:
Type: AWS::SNS::Topic
Properties:
DisplayName: !Sub 's3-logs-ObjectCreated'
Subscription:
- Endpoint: !GetAtt 'Step3LambdaFunction.Arn'
Protocol: lambda
Step3LambdaLambdaPermission:
Type: AWS::Lambda::Permission
Properties:
FunctionName: !Ref 'Step3LambdaFunction'
Action: lambda:InvokeFunction
Principal: sns.amazonaws.com
SourceArn: !Ref 'Step2SnsTopic'
Step3LogGroupLambda:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub '/aws/lambda/${Step3LambdaFunction}'
RetentionInDays: !Ref 'CWlogsExpiredate'
Step3LambdaRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- sts:AssumeRole
Path: /
Policies:
- PolicyName: root
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- logs:CreateLogGroup
- logs:CreateLogStream
- logs:PutLogEvents
Resource: '*'
- Effect: Allow
Action:
- sns:Publish
Resource: '*'
- Effect: Allow
Action:
- s3:GetObject
Resource: !Sub 'arn:aws:s3:::${AWS::StackName}-*'
- Effect: Allow
Action:
- firehose:PutRecordBatch
Resource: !GetAtt 'Step4deliverystream.Arn'
- Effect: Allow
Action:
- sqs:SendMessage
Resource: !GetAtt [Step3SqsDeadLetterQueue, Arn]
Step3SqsDeadLetterQueue:
Type: AWS::SQS::Queue
Properties:
MessageRetentionPeriod: 1209600
Step3LambdaFunction:
Type: AWS::Lambda::Function
Properties:
Handler: index.lambda_handler
Role: !GetAtt 'Step3LambdaRole.Arn'
DeadLetterConfig:
TargetArn: !GetAtt [Step3SqsDeadLetterQueue, Arn]
Code:
ZipFile: !Sub |
import boto3
import json
import os
import urllib.parse
import gzip
from datetime import datetime
import base64
import re
s3 = boto3.client('s3')
firehose = boto3.client('firehose')
def lambda_handler(event, context):
a = parse_s3_event(event)
bucket_name = a['bucket_name']
key = a['key']
# Process ALB log (gzip)
if (re.match('.*.gz$', key)):
response =s3.get_object(Bucket=bucket_name, Key=key)
body = gzip.decompress(response['Body'].read()).decode('utf-8').splitlines()
if len(body) > 0:
process_log(body)
def parse_s3_event(event):
a = json.loads(event['Records'][0]['Sns']['Message'])
z = {}
z['bucket_name'] = a['Records'][0]['s3']['bucket']['name']
z['key'] = urllib.parse.unquote_plus(a['Records'][0]['s3']['object']['key'], encoding='utf-8')
return z
def process_log(data):
i = 0
c = []
for a in data:
b = parse_log(a)
if b is not None:
c.append({'Data': b})
i = i + 1
if i == 100:
PutRecordBatchFirehose(c)
i = 0
c = []
if len(c) > 0:
PutRecordBatchFirehose(c)
def parse_log(line):
z = {}
a = line.split('"')
b = a[0].split(' ')
# ALB Log
if len(b) == 13:
if b[2].split('/')[0] == 'app':
if (re.match('[0-9]...-[0-9].-[0-9].T[0-9].:[0-9].:[0-9].\.[[0-9]*Z' , b[1])):
z = parse_alb_log(a)
#Column check (number)
if len(z) > 20:
#print(z)
return json.dumps(z) + "\n"
def parse_alb_log(a):
z = {}
b = a[0].split(' ')
# ALB Log
z["type"] = b[0]
z["timestamp"] = b[1]
z["elb"] = b[2]
if len(b[3].split(':')) > 1:
z["client"] = b[3].split(':')[0]
z["client_port"] = b[3].split(':')[1]
if len(b[4].split(':')) > 1:
z["target"] = b[4].split(':')[0]
z["target_port"] = b[4].split(':')[1]
z["request_processing_time"] = float(b[5])
z["target_processing_time"] = float(b[6])
z["response_processing_time"] = float(b[7])
z["elb_status_code"] = b[8]
z["target_status_code"] = b[9]
z["received_bytes"] = float(b[10])
z["sent_bytes"] = float(b[11])
z["request"] = a[1]
z["user_agent"] = a[3]
c = a[4].split(' ')
if len(c) == 5:
z["ssl_cipher"] = c[1]
z["ssl_protocol"] = c[2]
z["target_group_arn"] = c[3]
z["trace_id"] = a[5]
z["domain_name"] = a[7]
z["chosen_cert_arn"] = a[9]
z["matched_rule_priority"] = a[10]
if len(a) > 10:
d = a[10].split(' ')
if len(d) > 1:
z["matched_rule_priority"] = d[1]
if len(z["request"].split(' ')) > 2:
z["request_method"] = z["request"].split(' ')[0]
z["request_uri"] = z["request"].split(' ')[1]
z["request_http_version"] = z["request"].split(' ')[2]
if z["request_method"] != '-' :
e = urllib.parse.urlparse(z["request_uri"])
z["request_uri_scheme"] = e.scheme
z["request_uri_user"] = e.username
z["request_uri_host"] = e.hostname
z["request_uri_port"] = e.port
z["request_uri_path"] = e.path
z["request_uri_query"] = e.query
z["request_uri_fragment"] = e.fragment
else:
z["request_uri_scheme"] = z["request_uri"].split(':')[0]
z["request_uri_user"] = ''
z["request_uri_host"] = z["request_uri"].split('/')[2].split(':')[0]
z["request_uri_port"] = z["request_uri"].split(':')[2].split('-')[0]
z["request_uri_path"] = ''
return z
def PutRecordBatchFirehose(data):
firehose_stream_name = os.environ['firehose_stream_name']
r = firehose.put_record_batch(
DeliveryStreamName = firehose_stream_name,
Records = data
)
#print(str(data))
#print(str(r["ResponseMetadata"]["HTTPHeaders"]))
Runtime: python3.6
MemorySize: 256
Timeout: 300
Description: alb accesslog S3 to firehose
Environment:
Variables:
firehose_stream_name: !Ref 'Step4deliverystream'
Tags:
- Key: CloudformationArn
Value: !Ref 'AWS::StackId'
Step4deliverystream:
Type: AWS::KinesisFirehose::DeliveryStream
Properties:
ExtendedS3DestinationConfiguration:
BucketARN: !Sub 'arn:aws:s3:::${Step5S3Bucket}'
BufferingHints:
IntervalInSeconds: '300'
SizeInMBs: '50'
CompressionFormat: GZIP
Prefix: firehose/alb_logs/
RoleARN: !GetAtt 'Step4deliveryRole.Arn'
ProcessingConfiguration:
Enabled: 'false'
Step4deliveryRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Sid: ''
Effect: Allow
Principal:
Service: firehose.amazonaws.com
Action: sts:AssumeRole
Condition:
StringEquals:
sts:ExternalId: !Ref 'AWS::AccountId'
Step4deliveryPolicy:
Type: AWS::IAM::Policy
Properties:
PolicyName: firehose_delivery_policy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- s3:AbortMultipartUpload
- s3:GetBucketLocation
- s3:GetObject
- s3:ListBucket
- s3:ListBucketMultipartUploads
- s3:PutObject
Resource:
- !Sub 'arn:aws:s3:::${Step5S3Bucket}'
- !Sub 'arn:aws:s3:::${Step5S3Bucket}*'
Roles:
- !Ref 'Step4deliveryRole'
Step4LogGroupFirehose:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub '/aws/firehose/${Step4deliverystream}'
RetentionInDays: !Ref 'CWlogsExpiredate'
Step5S3Bucket:
Type: AWS::S3::Bucket
DeletionPolicy: Delete
Properties:
LifecycleConfiguration:
Rules:
- Id: AutoDelete
Status: Enabled
ExpirationInDays: !Ref 'S3Expiredate'
NotificationConfiguration:
TopicConfigurations:
- Event: s3:ObjectCreated:*
Filter:
S3Key:
Rules:
- Name: suffix
Value: .gz
- Name: prefix
Value: firehose/alb_logs/
Topic: !Ref 'Step6SnsTopic'
VersioningConfiguration:
Status: Enabled
Step6SnsTopicPolicy:
Type: AWS::SNS::TopicPolicy
Properties:
PolicyDocument:
Version: '2012-10-17'
Id: MyTopicPolicy
Statement:
- Sid: allow-publish-s3
Effect: Allow
Principal:
Service:
- s3.amazonaws.com
Action:
- sns:Publish
Resource: !Ref 'Step6SnsTopic'
Topics:
- !Ref 'Step6SnsTopic'
Step6SnsTopic:
Type: AWS::SNS::Topic
Properties:
DisplayName: s3-trigger-firehose-output
Subscription:
- Endpoint: !GetAtt 'Step7LambdaFunction.Arn'
Protocol: lambda
Step7LambdaPermission:
Type: AWS::Lambda::Permission
Properties:
FunctionName: !Ref 'Step7LambdaFunction'
Action: lambda:InvokeFunction
Principal: sns.amazonaws.com
SourceArn: !Ref 'Step6SnsTopic'
Step7LogGroupLambda:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub '/aws/lambda/${Step7LambdaFunction}'
RetentionInDays: 14
Step7LambdaRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- sts:AssumeRole
Path: /
Policies:
- PolicyName: root
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- logs:CreateLogGroup
- logs:CreateLogStream
- logs:PutLogEvents
Resource: '*'
- Effect: Allow
Action:
- sns:Publish
Resource: '*'
- Effect: Allow
Action:
- s3:GetObject
Resource: !Sub 'arn:aws:s3:::${AWS::StackName}-*'
- Effect: Allow
Action:
- s3:PutObject
Resource: !Sub 'arn:aws:s3:::${AWS::StackName}-*'
- Effect: Allow
Action:
- sqs:SendMessage
Resource: !GetAtt [Step7SqsDeadLetterQueue, Arn]
Step7SqsDeadLetterQueue:
Type: AWS::SQS::Queue
Properties:
MessageRetentionPeriod: 1209600
Step7LambdaFunction:
Type: AWS::Lambda::Function
Properties:
Handler: index.lambda_handler
DeadLetterConfig:
TargetArn: !GetAtt [Step7SqsDeadLetterQueue, Arn]
Role: !GetAtt 'Step7LambdaRole.Arn'
Code:
ZipFile: !Sub |
import boto3
import json
import os
import urllib.parse
def lambda_handler(event, context):
z = parse_s3_event(event)
bucket_name = z['bucket_name']
key = z['key']
new_key = get_key_with_partition(key)
new_bucket = os.environ['s3_bucket']
s3 = boto3.client('s3')
r = s3.copy_object(Bucket=new_bucket, Key=new_key, CopySource={'Bucket': bucket_name, 'Key': key})
def parse_s3_event(event):
a = json.loads(event['Records'][0]['Sns']['Message'])
z = {}
z['bucket_name'] = a['Records'][0]['s3']['bucket']['name']
z['key'] = urllib.parse.unquote_plus(a['Records'][0]['s3']['object']['key'], encoding='utf-8')
return z
def get_key_with_partition(key):
a = key.split('/')
a.reverse()
z = {}
z['filename'] = a[0]
z['hour'] = a[1]
z['day'] = a[2]
z['month'] = a[3]
z['year'] = a[4]
z['prefix'] = get_key_prefix(key)
f = z['prefix'] + '/' + 'dt=' + z['year'] + '-' + z['month'] + '-' + z['day'] + '-' + z['hour'] + '/' + z['filename']
return f
def get_key_prefix(key):
a = key.split('/')
b = len(a) - 5
d = []
for c in a[:b]:
d.append(c)
e = '/'.join(d)
return e
Runtime: python3.6
MemorySize: 128
Timeout: 300
Description: Copy the S3 file output by Firehose for Athena (with partition)
Environment:
Variables:
CfnStackName: !Sub '${AWS::StackName}'
s3_bucket: !Ref 'Step8S3Bucket'
Tags:
- Key: CloudformationArn
Value: !Ref 'AWS::StackId'
Step8S3Bucket:
Type: AWS::S3::Bucket
DeletionPolicy: Delete
Properties:
LifecycleConfiguration:
Rules:
- Id: AutoDelete
Status: Enabled
ExpirationInDays: !Ref 'S3AthenaExpiredate'
Outputs:
S3BucketSource:
Value: !Ref 'Step1S3Bucket'
S3BucketJson:
Value: !Ref 'Step5S3Bucket'
S3BucketAthena:
Value: !Ref 'Step8S3Bucket'
For CLB
AWSTemplateFormatVersion: '2010-09-09'
Description: Convert ELB(clb) access log to JSON via Firehose (20180517)
Parameters:
S3Expiredate:
Description: Number of days to keep S3 file (S3 TTL)
Type: String
Default: 10
CWlogsExpiredate:
Description: Number of days to keep Cloudwatch logs (logs TTL)
Type: String
Default: 3
S3AthenaExpiredate:
Description: Number of days to keep S3 file (Athena)
Type: String
Default: 45
Resources:
Step1S3Bucket:
Type: AWS::S3::Bucket
DeletionPolicy: Delete
Properties:
LifecycleConfiguration:
Rules:
- Id: AutoDelete
Status: Enabled
ExpirationInDays: !Ref 'S3Expiredate'
NotificationConfiguration:
TopicConfigurations:
- Event: s3:ObjectCreated:*
Filter:
S3Key:
Rules:
- Name: suffix
Value: log
Topic: !Ref 'Step2SnsTopic'
VersioningConfiguration:
Status: Enabled
Step1S3BucketPolicy:
Type: AWS::S3::BucketPolicy
Properties:
Bucket: !Ref 'Step1S3Bucket'
PolicyDocument:
Id: ElblogsBucketPolicy
Statement:
- Sid: AddPerm
Effect: Allow
Principal:
AWS:
- arn:aws:iam::582318560864:root
- arn:aws:iam::127311923021:root
- arn:aws:iam::033677994240:root
- arn:aws:iam::027434742980:root
- arn:aws:iam::797873946194:root
- arn:aws:iam::985666609251:root
- arn:aws:iam::054676820928:root
- arn:aws:iam::156460612806:root
- arn:aws:iam::652711504416:root
- arn:aws:iam::156460612806:root
- arn:aws:iam::009996457667:root
- arn:aws:iam::600734575887:root
- arn:aws:iam::383597477331:root
- arn:aws:iam::114774131450:root
- arn:aws:iam::797873946194:root
- arn:aws:iam::783225319266:root
- arn:aws:iam::718504428378:root
- arn:aws:iam::507241528517:root
Action:
- s3:PutObject
Resource:
- !Sub 'arn:aws:s3:::${Step1S3Bucket}/*'
Step2SnsTopicPolicy:
Type: AWS::SNS::TopicPolicy
Properties:
PolicyDocument:
Version: '2012-10-17'
Id: MyTopicPolicy
Statement:
- Sid: allow-publish-s3
Effect: Allow
Principal:
Service:
- s3.amazonaws.com
Action:
- sns:Publish
Resource: !Ref 'Step2SnsTopic'
Topics:
- !Ref 'Step2SnsTopic'
Step2SnsTopic:
Type: AWS::SNS::Topic
Properties:
DisplayName: !Sub 's3-logs-ObjectCreated'
Subscription:
- Endpoint: !GetAtt 'Step3LambdaFunction.Arn'
Protocol: lambda
Step3LambdaLambdaPermission:
Type: AWS::Lambda::Permission
Properties:
FunctionName: !Ref 'Step3LambdaFunction'
Action: lambda:InvokeFunction
Principal: sns.amazonaws.com
SourceArn: !Ref 'Step2SnsTopic'
Step3LogGroupLambda:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub '/aws/lambda/${Step3LambdaFunction}'
RetentionInDays: !Ref 'CWlogsExpiredate'
Step3LambdaRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- sts:AssumeRole
Path: /
Policies:
- PolicyName: root
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- logs:CreateLogGroup
- logs:CreateLogStream
- logs:PutLogEvents
Resource: '*'
- Effect: Allow
Action:
- sns:Publish
Resource: '*'
- Effect: Allow
Action:
- s3:GetObject
Resource: !Sub 'arn:aws:s3:::${AWS::StackName}-*'
- Effect: Allow
Action:
- firehose:PutRecordBatch
Resource: !GetAtt 'Step4deliverystream.Arn'
- Effect: Allow
Action:
- sqs:SendMessage
Resource: !GetAtt [Step3SqsDeadLetterQueue, Arn]
Step3LambdaFunction:
Type: AWS::Lambda::Function
Properties:
Handler: index.lambda_handler
Role: !GetAtt 'Step3LambdaRole.Arn'
DeadLetterConfig:
TargetArn: !GetAtt [Step3SqsDeadLetterQueue, Arn]
Code:
ZipFile: !Sub |
import boto3
import json
import os
import urllib.parse
import gzip
from datetime import datetime
import base64
import re
s3 = boto3.client('s3')
firehose = boto3.client('firehose')
def lambda_handler(event, context):
a = parse_s3_event(event)
bucket_name = a['bucket_name']
key = a['key']
# Process CLB log (.log)
if (re.match('.*.log$', key)):
response =s3.get_object(Bucket=bucket_name, Key=key)
body = response['Body'].read().decode('utf-8').splitlines()
if len(body) > 0:
process_log(body)
def parse_s3_event(event):
a = json.loads(event['Records'][0]['Sns']['Message'])
z = {}
z['bucket_name'] = a['Records'][0]['s3']['bucket']['name']
z['key'] = urllib.parse.unquote_plus(a['Records'][0]['s3']['object']['key'], encoding='utf-8')
return z
def process_log(data):
i = 0
c = []
for a in data:
b = parse_log(a)
if b is not None:
c.append({'Data': b})
i = i + 1
if i == 100:
PutRecordBatchFirehose(c)
i = 0
c = []
if len(c) > 0:
PutRecordBatchFirehose(c)
def parse_log(line):
z = {}
a = line.split('"')
b = a[0].split(' ')
# CLB Log
if len(b) == 12:
if (re.match('[0-9]...-[0-9].-[0-9].T[0-9].:[0-9].:[0-9].\.[[0-9]*Z' , b[0])):
z = parse_clb_log(a)
#Column check (number)
if len(z) > 20:
#print(z)
return json.dumps(z) + "\n"
def parse_clb_log(a):
z = {}
b = a[0].split(' ')
# CLB Log
z["timestamp"] = b[0]
z["elb"] = b[1]
if len(b[2].split(':')) > 1:
z["client"] = b[2].split(':')[0]
z["client_port"] = b[2].split(':')[1]
if len(b[3].split(':')) > 1:
z["backend"] = b[3].split(':')[0]
z["backend_port"] = b[3].split(':')[1]
z["request_processing_time"] = float(b[4])
z["backend_processing_time"] = float(b[5])
z["response_processing_time"] = float(b[6])
z["elb_status_code"] = b[7]
z["backend_status_code"] = b[8]
z["received_bytes"] = float(b[9])
z["sent_bytes"] = float(b[10])
z["request"] = a[1]
z["user_agent"] = a[3]
c = a[4].split(' ')
              if len(c) > 2:
z["ssl_cipher"] = c[1]
z["ssl_protocol"] = c[2]
if len(z["request"].split(' ')) > 2:
z["request_method"] = z["request"].split(' ')[0]
z["request_uri"] = z["request"].split(' ')[1]
z["request_http_version"] = z["request"].split(' ')[2]
if z["request_method"] != '-' :
e = urllib.parse.urlparse(z["request_uri"])
z["request_uri_scheme"] = e.scheme
z["request_uri_user"] = e.username
z["request_uri_host"] = e.hostname
z["request_uri_port"] = e.port
z["request_uri_path"] = e.path
z["request_uri_query"] = e.query
z["request_uri_fragment"] = e.fragment
return z
def PutRecordBatchFirehose(data):
firehose_stream_name = os.environ['firehose_stream_name']
r = firehose.put_record_batch(
DeliveryStreamName = firehose_stream_name,
Records = data
)
#print(str(data))
#print(str(r["ResponseMetadata"]["HTTPHeaders"]))
Runtime: python3.6
MemorySize: 256
Timeout: 300
Description: clb accesslog S3 to firehose
Environment:
Variables:
firehose_stream_name: !Ref 'Step4deliverystream'
Tags:
- Key: CloudformationArn
Value: !Ref 'AWS::StackId'
Step3SqsDeadLetterQueue:
Type: AWS::SQS::Queue
Properties:
MessageRetentionPeriod: 1209600
Step4deliverystream:
Type: AWS::KinesisFirehose::DeliveryStream
Properties:
ExtendedS3DestinationConfiguration:
BucketARN: !Sub 'arn:aws:s3:::${Step5S3Bucket}'
BufferingHints:
IntervalInSeconds: '300'
SizeInMBs: '50'
CompressionFormat: GZIP
Prefix: firehose/clb_logs/
RoleARN: !GetAtt 'Step4deliveryRole.Arn'
ProcessingConfiguration:
Enabled: 'false'
Step4deliveryRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Sid: ''
Effect: Allow
Principal:
Service: firehose.amazonaws.com
Action: sts:AssumeRole
Condition:
StringEquals:
sts:ExternalId: !Ref 'AWS::AccountId'
Step4deliveryPolicy:
Type: AWS::IAM::Policy
Properties:
PolicyName: firehose_delivery_policy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- s3:AbortMultipartUpload
- s3:GetBucketLocation
- s3:GetObject
- s3:ListBucket
- s3:ListBucketMultipartUploads
- s3:PutObject
Resource:
- !Sub 'arn:aws:s3:::${Step5S3Bucket}'
- !Sub 'arn:aws:s3:::${Step5S3Bucket}*'
Roles:
- !Ref 'Step4deliveryRole'
Step4LogGroupFirehose:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub '/aws/firehose/${Step4deliverystream}'
RetentionInDays: !Ref 'CWlogsExpiredate'
Step5S3Bucket:
Type: AWS::S3::Bucket
DeletionPolicy: Delete
Properties:
LifecycleConfiguration:
Rules:
- Id: AutoDelete
Status: Enabled
ExpirationInDays: !Ref 'S3Expiredate'
NotificationConfiguration:
TopicConfigurations:
- Event: s3:ObjectCreated:*
Filter:
S3Key:
Rules:
- Name: suffix
Value: .gz
              - Name: prefix
                Value: firehose/clb_logs/
Topic: !Ref 'Step6SnsTopic'
VersioningConfiguration:
Status: Enabled
Step6SnsTopicPolicy:
Type: AWS::SNS::TopicPolicy
Properties:
PolicyDocument:
Version: '2012-10-17'
Id: MyTopicPolicy
Statement:
- Sid: allow-publish-s3
Effect: Allow
Principal:
Service:
- s3.amazonaws.com
Action:
- sns:Publish
Resource: !Ref 'Step6SnsTopic'
Topics:
- !Ref 'Step6SnsTopic'
Step6SnsTopic:
Type: AWS::SNS::Topic
Properties:
DisplayName: s3-trigger-firehose-output
Subscription:
- Endpoint: !GetAtt 'Step7LambdaFunction.Arn'
Protocol: lambda
Step7LambdaPermission:
Type: AWS::Lambda::Permission
Properties:
FunctionName: !Ref 'Step7LambdaFunction'
Action: lambda:InvokeFunction
Principal: sns.amazonaws.com
SourceArn: !Ref 'Step6SnsTopic'
Step7LogGroupLambda:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub '/aws/lambda/${Step7LambdaFunction}'
RetentionInDays: 14
Step7LambdaRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- sts:AssumeRole
Path: /
Policies:
- PolicyName: root
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- logs:CreateLogGroup
- logs:CreateLogStream
- logs:PutLogEvents
Resource: '*'
- Effect: Allow
Action:
- sns:Publish
Resource: '*'
- Effect: Allow
Action:
- s3:GetObject
Resource: !Sub 'arn:aws:s3:::${AWS::StackName}-*'
- Effect: Allow
Action:
- s3:PutObject
Resource: !Sub 'arn:aws:s3:::${AWS::StackName}-*'
- Effect: Allow
Action:
- sqs:SendMessage
Resource: !GetAtt [Step7SqsDeadLetterQueue, Arn]
Step7SqsDeadLetterQueue:
Type: AWS::SQS::Queue
Properties:
MessageRetentionPeriod: 1209600
Step7LambdaFunction:
Type: AWS::Lambda::Function
Properties:
Handler: index.lambda_handler
DeadLetterConfig:
TargetArn: !GetAtt [Step7SqsDeadLetterQueue, Arn]
Role: !GetAtt 'Step7LambdaRole.Arn'
Code:
ZipFile: !Sub |
import boto3
import json
import os
import urllib.parse
def lambda_handler(event, context):
z = parse_s3_event(event)
bucket_name = z['bucket_name']
key = z['key']
new_key = get_key_with_partition(key)
new_bucket = os.environ['s3_bucket']
s3 = boto3.client('s3')
r = s3.copy_object(Bucket=new_bucket, Key=new_key, CopySource={'Bucket': bucket_name, 'Key': key})
def parse_s3_event(event):
a = json.loads(event['Records'][0]['Sns']['Message'])
z = {}
z['bucket_name'] = a['Records'][0]['s3']['bucket']['name']
z['key'] = urllib.parse.unquote_plus(a['Records'][0]['s3']['object']['key'], encoding='utf-8')
return z
def get_key_with_partition(key):
a = key.split('/')
a.reverse()
z = {}
z['filename'] = a[0]
z['hour'] = a[1]
z['day'] = a[2]
z['month'] = a[3]
z['year'] = a[4]
z['prefix'] = get_key_prefix(key)
f = z['prefix'] + '/' + 'dt=' + z['year'] + '-' + z['month'] + '-' + z['day'] + '-' + z['hour'] + '/' + z['filename']
return f
def get_key_prefix(key):
a = key.split('/')
b = len(a) - 5
d = []
for c in a[:b]:
d.append(c)
e = '/'.join(d)
return e
Runtime: python3.6
MemorySize: 128
Timeout: 300
Description: Copy the S3 file output by Firehose for Athena (with partition)
Environment:
Variables:
CfnStackName: !Sub '${AWS::StackName}'
s3_bucket: !Ref 'Step8S3Bucket'
Tags:
- Key: CloudformationArn
Value: !Ref 'AWS::StackId'
Step8S3Bucket:
Type: AWS::S3::Bucket
DeletionPolicy: Delete
Properties:
LifecycleConfiguration:
Rules:
- Id: AutoDelete
Status: Enabled
ExpirationInDays: !Ref 'S3AthenaExpiredate'
Outputs:
S3BucketSource:
Value: !Ref 'Step1S3Bucket'
S3BucketJson:
Value: !Ref 'Step5S3Bucket'
S3BucketAthena:
Value: !Ref 'Step8S3Bucket'
For CloudFront
AWSTemplateFormatVersion: '2010-09-09'
Description: Convert CloudFront access log to JSON via Firehose (20180514)
Parameters:
S3Expiredate:
Description: Number of days to keep S3 file (S3 TTL)
Type: String
Default: 10
CWlogsExpiredate:
Description: Number of days to keep Cloudwatch logs (logs TTL)
Type: String
Default: 3
S3AthenaExpiredate:
Description: Number of days to keep S3 file (Athena)
Type: String
Default: 45
Resources:
Step1S3Bucket:
Type: AWS::S3::Bucket
DeletionPolicy: Delete
Properties:
LifecycleConfiguration:
Rules:
- Id: AutoDelete
Status: Enabled
ExpirationInDays: !Ref 'S3Expiredate'
NotificationConfiguration:
TopicConfigurations:
- Event: s3:ObjectCreated:*
Filter:
S3Key:
Rules:
- Name: suffix
Value: gz
Topic: !Ref 'Step2SnsTopic'
VersioningConfiguration:
Status: Enabled
Step2SnsTopicPolicy:
Type: AWS::SNS::TopicPolicy
Properties:
PolicyDocument:
Version: '2012-10-17'
Id: MyTopicPolicy
Statement:
- Sid: allow-publish-s3
Effect: Allow
Principal:
Service:
- s3.amazonaws.com
Action:
- sns:Publish
Resource: !Ref 'Step2SnsTopic'
Topics:
- !Ref 'Step2SnsTopic'
Step2SnsTopic:
Type: AWS::SNS::Topic
Properties:
DisplayName: !Sub 's3-logs-ObjectCreated'
Subscription:
- Endpoint: !GetAtt 'Step3LambdaFunction.Arn'
Protocol: lambda
Step3LambdaLambdaPermission:
Type: AWS::Lambda::Permission
Properties:
FunctionName: !Ref 'Step3LambdaFunction'
Action: lambda:InvokeFunction
Principal: sns.amazonaws.com
SourceArn: !Ref 'Step2SnsTopic'
Step3LogGroupLambda:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub '/aws/lambda/${Step3LambdaFunction}'
RetentionInDays: !Ref 'CWlogsExpiredate'
Step3LambdaRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- sts:AssumeRole
Path: /
Policies:
- PolicyName: root
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- logs:CreateLogGroup
- logs:CreateLogStream
- logs:PutLogEvents
Resource: '*'
- Effect: Allow
Action:
- sns:Publish
Resource: '*'
- Effect: Allow
Action:
- s3:GetObject
Resource: !Sub 'arn:aws:s3:::${AWS::StackName}-*'
- Effect: Allow
Action:
- firehose:PutRecordBatch
Resource: !GetAtt 'Step4deliverystream.Arn'
- Effect: Allow
Action:
- sqs:SendMessage
Resource: !GetAtt [Step3SqsDeadLetterQueue, Arn]
Step3LambdaFunction:
Type: AWS::Lambda::Function
Properties:
Handler: index.lambda_handler
Role: !GetAtt 'Step3LambdaRole.Arn'
DeadLetterConfig:
TargetArn: !GetAtt [Step3SqsDeadLetterQueue, Arn]
Code:
ZipFile: !Sub |
import boto3
import json
import os
import urllib.parse
import gzip
from datetime import datetime
import base64
import re
s3 = boto3.client('s3')
firehose = boto3.client('firehose')
def lambda_handler(event, context):
a = parse_s3_event(event)
bucket_name = a['bucket_name']
key = a['key']
# Process CloudFront log (.gz)
if (re.match('.*.gz$', key)):
response =s3.get_object(Bucket=bucket_name, Key=key)
body = gzip.decompress(response['Body'].read()).decode('utf-8').splitlines()
if len(body) > 0:
process_log(body)
def parse_s3_event(event):
a = json.loads(event['Records'][0]['Sns']['Message'])
z = {}
z['bucket_name'] = a['Records'][0]['s3']['bucket']['name']
z['key'] = urllib.parse.unquote_plus(a['Records'][0]['s3']['object']['key'], encoding='utf-8')
return z
def process_log(data):
i = 0
c = []
for a in data:
b = parse_log(a)
if b is not None:
c.append({'Data': b})
i = i + 1
if i == 100:
PutRecordBatchFirehose(c)
i = 0
c = []
if len(c) > 0:
PutRecordBatchFirehose(c)
def parse_log(line):
z = {}
a = line.split('\t')
# cloudfront Log
if len(a) > 25:
if (re.match('[0-9]...-[0-9].-[0-9].' , a[0])):
if (re.match('[0-9].:[0-9].:[0-9].' , a[1])):
z = parse_cloudfront_log(a)
#Column check (number)
if len(z) > 25:
#print(z)
return json.dumps(z) + "\n"
def parse_cloudfront_log(b):
z = {}
z["date"] = b[0]
z["time"] = b[1]
z["x_edge_location"] = b[2]
z["sc_bytes"] = float(b[3])
z["c_ip"] = b[4]
z["cs_method"] = b[5]
z["cs_host"] = b[6]
z["cs_uri_stem"] = b[7]
z["sc_status"] = b[8]
z["cs_referer"] = b[9]
z["cs_user_agent"] = b[10]
z["cs_uri_query"] = b[11]
z["cs_cookie"] = b[12]
z["x_edge_result_type"] = b[13]
z["x_edge_request_id"] = b[14]
z["x_host_header"] = b[15]
z["cs_protocol"] = b[16]
z["cs_bytes"] = float(b[17])
z["time_taken"] = float(b[18])
z["x_forwarded_for"] = b[19]
z["ssl_protocol"] = b[20]
z["ssl_cipher"] = b[21]
z["x_edge_response_result_type"] = b[22]
z["cs_protocol_version"] = b[23]
              z["fle_status"] = b[24]
              z["fle_encrypted_fields"] = b[25]
z["timestamp"] = z["date"] + 'T' + z["time"] + '.000000Z'
return z
def PutRecordBatchFirehose(data):
firehose_stream_name = os.environ['firehose_stream_name']
r = firehose.put_record_batch(
DeliveryStreamName = firehose_stream_name,
Records = data
)
#print(str(data))
#print(str(r["ResponseMetadata"]["HTTPHeaders"]))
Runtime: python3.6
MemorySize: 256
Timeout: 300
Description: CloudFront accesslog S3 to firehose
Environment:
Variables:
firehose_stream_name: !Ref 'Step4deliverystream'
Tags:
- Key: CloudformationArn
Value: !Ref 'AWS::StackId'
Step3SqsDeadLetterQueue:
Type: AWS::SQS::Queue
Properties:
MessageRetentionPeriod: 1209600
Step4deliverystream:
Type: AWS::KinesisFirehose::DeliveryStream
Properties:
ExtendedS3DestinationConfiguration:
BucketARN: !Sub 'arn:aws:s3:::${Step5S3Bucket}'
BufferingHints:
IntervalInSeconds: '300'
SizeInMBs: '50'
CompressionFormat: GZIP
Prefix: firehose/cloudfront_logs/
RoleARN: !GetAtt 'Step4deliveryRole.Arn'
ProcessingConfiguration:
Enabled: 'false'
Step4deliveryRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Sid: ''
Effect: Allow
Principal:
Service: firehose.amazonaws.com
Action: sts:AssumeRole
Condition:
StringEquals:
sts:ExternalId: !Ref 'AWS::AccountId'
Step4deliveryPolicy:
Type: AWS::IAM::Policy
Properties:
PolicyName: firehose_delivery_policy
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- s3:AbortMultipartUpload
- s3:GetBucketLocation
- s3:GetObject
- s3:ListBucket
- s3:ListBucketMultipartUploads
- s3:PutObject
Resource:
- !Sub 'arn:aws:s3:::${Step5S3Bucket}'
- !Sub 'arn:aws:s3:::${Step5S3Bucket}*'
Roles:
- !Ref 'Step4deliveryRole'
Step4LogGroupFirehose:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub '/aws/firehose/${Step4deliverystream}'
RetentionInDays: !Ref 'CWlogsExpiredate'
Step5S3Bucket:
Type: AWS::S3::Bucket
DeletionPolicy: Delete
Properties:
LifecycleConfiguration:
Rules:
- Id: AutoDelete
Status: Enabled
ExpirationInDays: !Ref 'S3Expiredate'
NotificationConfiguration:
TopicConfigurations:
- Event: s3:ObjectCreated:*
Filter:
S3Key:
Rules:
- Name: suffix
Value: .gz
- Name: prefix
Value: firehose/cloudfront_logs/
Topic: !Ref 'Step6SnsTopic'
VersioningConfiguration:
Status: Enabled
Step6SnsTopicPolicy:
Type: AWS::SNS::TopicPolicy
Properties:
PolicyDocument:
Version: '2012-10-17'
Id: MyTopicPolicy
Statement:
- Sid: allow-publish-s3
Effect: Allow
Principal:
Service:
- s3.amazonaws.com
Action:
- sns:Publish
Resource: !Ref 'Step6SnsTopic'
Topics:
- !Ref 'Step6SnsTopic'
Step6SnsTopic:
Type: AWS::SNS::Topic
Properties:
DisplayName: s3-trigger-firehose-output
Subscription:
- Endpoint: !GetAtt 'Step7LambdaFunction.Arn'
Protocol: lambda
Step7LambdaPermission:
Type: AWS::Lambda::Permission
Properties:
FunctionName: !Ref 'Step7LambdaFunction'
Action: lambda:InvokeFunction
Principal: sns.amazonaws.com
SourceArn: !Ref 'Step6SnsTopic'
Step7LogGroupLambda:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: !Sub '/aws/lambda/${Step7LambdaFunction}'
RetentionInDays: 14
Step7LambdaRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- sts:AssumeRole
Path: /
Policies:
- PolicyName: root
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- logs:CreateLogGroup
- logs:CreateLogStream
- logs:PutLogEvents
Resource: '*'
- Effect: Allow
Action:
- sns:Publish
Resource: '*'
- Effect: Allow
Action:
- s3:GetObject
Resource: !Sub 'arn:aws:s3:::${AWS::StackName}-*'
- Effect: Allow
Action:
- s3:PutObject
Resource: !Sub 'arn:aws:s3:::${AWS::StackName}-*'
- Effect: Allow
Action:
- sqs:SendMessage
Resource: !GetAtt [Step7SqsDeadLetterQueue, Arn]
Step7SqsDeadLetterQueue:
Type: AWS::SQS::Queue
Properties:
MessageRetentionPeriod: 1209600
Step7LambdaFunction:
Type: AWS::Lambda::Function
Properties:
Handler: index.lambda_handler
DeadLetterConfig:
TargetArn: !GetAtt [Step7SqsDeadLetterQueue, Arn]
Role: !GetAtt 'Step7LambdaRole.Arn'
Code:
ZipFile: !Sub |
import boto3
import json
import os
import urllib.parse
def lambda_handler(event, context):
z = parse_s3_event(event)
bucket_name = z['bucket_name']
key = z['key']
new_key = get_key_with_partition(key)
new_bucket = os.environ['s3_bucket']
s3 = boto3.client('s3')
r = s3.copy_object(Bucket=new_bucket, Key=new_key, CopySource={'Bucket': bucket_name, 'Key': key})
def parse_s3_event(event):
a = json.loads(event['Records'][0]['Sns']['Message'])
z = {}
z['bucket_name'] = a['Records'][0]['s3']['bucket']['name']
z['key'] = urllib.parse.unquote_plus(a['Records'][0]['s3']['object']['key'], encoding='utf-8')
return z
def get_key_with_partition(key):
a = key.split('/')
a.reverse()
z = {}
z['filename'] = a[0]
z['hour'] = a[1]
z['day'] = a[2]
z['month'] = a[3]
z['year'] = a[4]
z['prefix'] = get_key_prefix(key)
f = z['prefix'] + '/' + 'dt=' + z['year'] + '-' + z['month'] + '-' + z['day'] + '-' + z['hour'] + '/' + z['filename']
return f
def get_key_prefix(key):
a = key.split('/')
b = len(a) - 5
d = []
for c in a[:b]:
d.append(c)
e = '/'.join(d)
return e
Runtime: python3.6
MemorySize: 128
Timeout: 300
Description: Copy the S3 file output by Firehose for Athena (with partition)
Environment:
Variables:
CfnStackName: !Sub '${AWS::StackName}'
s3_bucket: !Ref 'Step8S3Bucket'
Tags:
- Key: CloudformationArn
Value: !Ref 'AWS::StackId'
Step8S3Bucket:
Type: AWS::S3::Bucket
DeletionPolicy: Delete
Properties:
LifecycleConfiguration:
Rules:
- Id: AutoDelete
Status: Enabled
ExpirationInDays: !Ref 'S3AthenaExpiredate'
Outputs:
S3BucketSource:
Value: !Ref 'Step1S3Bucket'
S3BucketJson:
Value: !Ref 'Step5S3Bucket'
S3BucketAthena:
Value: !Ref 'Step8S3Bucket'