ELBとCloudFrontのアクセスログをサーバレスに集約させてみた

はじめに

AWSチームのすずきです。

ELB(ALB/CLB)とCloudFrontがS3に出力するアクセスログ、 その確認、解析を効率よく行うため、Firehoseを利用して集約する機会がありました。

Lambdaを利用したサーバレスなS3への集約の一例として紹介させて頂きます。

構成図

処理説明

1: S3

  • ELBのログ格納用となるS3バケットです
  • ELBのアクセスログ登録(PutObject)先として利用するためのバケットポリシーと、S3のレプリケーション先可能にするための、バージョニング設定を実施しています。
  • 今回、既存のELBなどのログ出力設定の変更を回避するため、既存のアクセスログが出力されるS3バケットにレプリケーション設定を追加。レプリケーション先としてこのS3バケットを指定して利用しました。

2: SNS

  • 1回のS3イベントから複数のLambda処理を実施する可能性や、イベント情報をデバッグ用に取得しやすくするため、Amazon SNSを利用しました。
  • Amazon SNS、Lambdaへの配信課金は、無料で利用する事が可能です。

3: Lambda

  • Lambda関数のコードは、CloudFormationのテンプレートにインライン記述できる4000文字制限に収まる実装としました。
  • Lambda処理のタイムアウト時間は最大(300秒)まで延長。割り当てメモリは256MB、初期値の2倍に設定しました。
  • 今回のFirehose投入を行うLambda関数の実装では、1ログファイルあたり30〜40万レコードが処理上限の目安となります。
  • ELB(ALB)は5分毎にアクセスログをS3に出力する仕様です。1分あたり10万リクエストを超えるELBのアクセスログを処理する必要が有る場合、前処理としてファイル分割処理を行うLambdaの用意や、AWS Glueなどの利用をご検討ください。

Lamba割当メモリ別の所要時間

  • ELB(ALB)アクセスログ、6万、30万レコードのファイルを用意、 Firehose投入用Lambda関数の割当メモリ別の所要時間(Duration)を測定しました
Memory Size 6万レコード 30万レコード
256 MB 81秒 (タイムアウト)
512 MB 53秒 269秒
3008 MB 45秒 216秒

Lambda処理内容

  • SNSのメッセージから、処理対象となるS3のバケット、キーを取得します。
  • ログの時刻情報として期待するカラムのデータが日付フォーマットに一致しない場合や、カラム数が異常なレコードはエラーレコードとして除外します。
  • 「request」カラムに含まれるURLを「urlparse」モジュールを利用してパースし、解析時に利用頻度の高い「HOST」や「PATH」を事前に抽出します。
  • 「json」モジュールを利用してJSONエンコードを行い、Firehoseに対しバッチ転送を行います。

4: Firehose

  • 今回、出力先はS3のみとしましたが、Amazon Elasticserach Service、Redshiftと連係も可能です。
  • Firehoseのバッファ時間はELB(ALB)のログ出力間隔にあわせ、300秒としました。
  • Firehoseの後処理をLambda関数で実装する可能性を想定し、ファイルサイズは一定(50MB)で抑止する指定を行いました。

5: S3

  • Firehoseの出力先として利用します。
  • Firehoseの出力先を対象としたイベントを設定し、後処理を行うLambda関数をSNS経由で連係させました。

6: SNS

  • 後処理のLambda関数連係に利用します。

7: Lambda

  • Firehoseで集約、S3に出力されたログファイルの後処理を実装しました。
  • FirehoseよりS3に出力されたS3ファイルのキーに含まれる日付情報(「yyyy/mm/dd/HH24」)を、Hiveフォーマット(dt=yyyy-mm-dd-HH24/)に置換し、Athena処理用のS3にコピーします。
  • Hiveフォーマットのキーを付与する事で、Athenaのスキャン対象を特定期間に限定でき、AWS利用費の抑制が可能になります。

Amazon Athenaのパーティションを理解する #reinvent

Athenaの利用

テーブル作成

  • JSONに変換済のアクセスログは、テーブル定義のみで利用する事が可能です。
  • Firehoseの出力日時をLambdaで変換した「dt」を、パーティションとして利用する指定を行います。

ALB用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
CREATE EXTERNAL TABLE IF NOT EXISTS alb_logs
(
type string,
timestamp string,
elb string,
client_port string,
target_port string,
request_processing_time float,
target_processing_time float,
response_processing_time float,
elb_status_code string,
target_status_code string,
received_bytes float,
sent_bytes float,
request string,
user_agent string,
ssl_cipher  string,
ssl_protocol string,
target_group_arn string,
trace_id string,
domain_name string,
chosen_cert_arn string,
matched_rule_priority string,
client string,
target string,
request_method string,
request_uri string,
request_http_version string,
request_uri_scheme string,
request_uri_host string,
request_uri_path string
)
PARTITIONED BY (dt string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://<Athena用S3バケット名>/firehose/alb_logs/'
;

CLB用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
CREATE EXTERNAL TABLE IF NOT EXISTS clb_logs
(
timestamp string,
elb string,
client string,
client_port string,
backend string,
backend_port string,
request_processing_time float,
backend_processing_time float,
response_processing_time float,
elb_status_code string,
backend_status_code string,
received_bytes float,
sent_bytes float,
request string,
user_agent string,
ssl_cipher  string,
ssl_protocol string,
request_method string,
request_uri string,
request_http_version string,
request_uri_scheme string,
request_uri_host string,
request_uri_path string
)
PARTITIONED BY (dt string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://<Athena用S3バケット名>/firehose/clb_logs/'
;

CloudFront用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
CREATE EXTERNAL TABLE IF NOT EXISTS cloudfront_logs
(
timestamp string,
date string,
time string,
x_edge_location string,
sc_bytes float,
c_ip string,
cs_method string,
cs_host string,
cs_uri_stem string,
cs_status string,
cs_referer string,
cs_user_agent string,
cs_uri_query string,
cs_cookie string,
x_edge_result_type string,
x_edge_request_id string,
x_host_header string,
cs_protocol string,
cs_bytes float,
time_taken float,
x_forwarded_for string,
ssl_protocol string,
ssl_cipher string,
x_edge_response_result_type string,
fle_status string,
fle_encrypted_fields string
)
PARTITIONED BY (dt string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://<Athena用S3バケット名>/firehose/cloudfront_logs/'
;

パーティション情報更新

  • テーブル作成直後や新たに追加されたパーティションを解析対象とする場合、「MSCK REPAIR TABLE」を実行します。
1
MSCK REPAIR TABLE alb_logs
1
MSCK REPAIR TABLE clb_logs
1
MSCK REPAIR TABLE cloudfront_logs

解析

  • Athenaのコンソールなどを利用し、SQLによるアクセスログ解析が可能になります。
  • 「target」「backend」など、カラム名が異なる項目を利用しない解析では、SQLの流用可能です。
エラー応答の抽出
  • ELBがHTTP応答500番のステータスコードを戻したログを抽出します
1
2
3
4
5
6
SELECT *
FROM alb_logs
WHERE dt< '2018-05-16-15' and dt>='2018-05-14-15'
and elb_status_code like '5%'
order by timestamp
limit 10;

  • 指定した期間の5XX応答が確認できました。
高負荷発生源の疑いのある接続元の抽出
  • EC2応答時間(target_processing_time)の長い接続元IPアドレス(client)、UserAgentを抽出します
1
2
3
4
5
6
7
8
9
10
11
12
13
SELECT
  sum(target_processing_time) as total_processing_time
, count(1) as request_count
, SUBSTR(timestamp,1,13) as date_time
,  client
, user_agent
FROM alb_logs
WHERE dt< '2018-05-16-15' and dt>='2018-05-13-15'
and request_uri_host = 'dev.classmethod.jp'
and user_agent like '%bot%'
GROUP BY user_agent, client, SUBSTR(timestamp,1,13)
order by sum(target_processing_time) desc
limit 10

  • 指定した期間、UserAgentでBotを名乗る接続では、高負荷は発生していなかった事を確認できました。

AWSコスト

  • 1日500万リクエストのELBアクセスログ、AWS東京リージョンのFirehose、Lambdaで利用した場合、1日あたりの費用は0.202USDの計算になりました。
  • 多くの環境でAWSコストを圧迫する事なく利用できる可能性が高いと思われます。

内訳

Firehose

  • 0.108($)
    • 取り込みデータ、1GBあたりの単価は「0.036 USD」で計算
    • 0.108($) = 3(GB) × 0.036($)

Lambda

  • 無料枠は無い前提で計算しました。
  • 0.0193364 ($)
    • Firehose連係用実行時間
    • 0.0188032($) = 4520000(ミリ秒) × (0.000000208 / 100):(1ミリ秒価格USD) × 2 (Lambda割当メモリ128→256増強分)
    • Athenaコピー用実行時間
    • 0.000314($) = 0.0188($) = 151000(ミリ秒) × (0.000000208 / 100):(1ミリ秒価格USD)
      • Firehose連係用リクエスト
      • 0.0001714($) = 857回 × (0.20($) /  1000000):(1回リクエスト単価)
      • Firehose連係用リクエスト
      • 0.0000478($) = 239回 × (0.20($) /  1000000):(1回リクエスト単価)

S3

  • Firehoseから1日に出力されるログ(GZIP圧縮済)が、1日300MB。これをS3上で1ヶ月保持する想定でストレージ費用を計算しました。
  • S3データの参照、PUT、GETなどのAPI費用は除外しました。
  • 0.075 ($)
    • 0.3GB (ログ1日分) × 0.025USD/GB(1GB保管)

まとめ

ELB(ALB)をのアクセスログをサーバレス、低コストで処理する一例を紹介させて頂きました。

アクセスログが非圧縮な状態でS3に出力されるELB(CLB)のアクセスログや、 S3に保存されるキーによる期間指定が困難、ログファイル中のヘッダ行の存在がAthena処理で問題になる事のあるCloudFrontのアクセスログなどでは、 より高い効果が期待出来るかと思われます。

今回、Firehoseで集約したアクセスログをAthenaで参照するまでを紹介させて頂きましたが、

  • Firehoseのバッファ時間を利用した簡易タイムウィンドウ処理
  • Firehoseの後にKinesisAnalyticsを設置、アクセスログのストリーム処理
  • AWS Glueを利用した、アクセスログの長期利用を想定した最適化

などの応用も改めて紹介させて頂ければと思います。

CloudFormationテンプレート

ALB用

AWSTemplateFormatVersion: '2010-09-09'
Description: Convert ELB(alb) access log to JSON via Firehose (20180515)

Parameters:
  S3Expiredate
:
   
Description: Number of days to keep S3 file
   
Type: String
   
Default: 10
 
CWlogsExpiredate:
   
Description: Number of days to keep Cloudwatch logs
   
Type: String
   
Default: 3
  S3AthenaExpiredate
:
   
Description: Number of days to keep S3 file (Athena)
   
Type: String
   
Default: 45

Resources:
 
Step1S3Bucket:
   
Type: AWS::S3::Bucket
   
DeletionPolicy: Delete
   
Properties:
     
LifecycleConfiguration:
       
Rules:
         
- Id: AutoDelete
           
Status: Enabled
           
ExpirationInDays: !Ref 'S3Expiredate'
     
NotificationConfiguration:
       
TopicConfigurations:
         
- Event: s3:ObjectCreated:*
           
Filter:
              S3Key
:
               
Rules:
                 
- Name: suffix
                   
Value: .gz
           
Topic: !Ref 'Step2SnsTopic'
     
VersioningConfiguration:
       
Status: Enabled

 
Step1S3BucketPolicy:
   
Type: AWS::S3::BucketPolicy
   
Properties:
     
Bucket: !Ref 'Step1S3Bucket'
     
PolicyDocument:
       
Id: ElblogsBucketPolicy
       
Statement:
         
- Sid: AddPerm
           
Effect: Allow
           
Principal:
              AWS
:
               
- arn:aws:iam::582318560864:root
               
- arn:aws:iam::127311923021:root
               
- arn:aws:iam::033677994240:root
               
- arn:aws:iam::027434742980:root
               
- arn:aws:iam::797873946194:root
               
- arn:aws:iam::985666609251:root
               
- arn:aws:iam::054676820928:root
               
- arn:aws:iam::156460612806:root
               
- arn:aws:iam::652711504416:root
               
- arn:aws:iam::156460612806:root
               
- arn:aws:iam::009996457667:root
               
- arn:aws:iam::600734575887:root
               
- arn:aws:iam::383597477331:root
               
- arn:aws:iam::114774131450:root
               
- arn:aws:iam::797873946194:root
               
- arn:aws:iam::783225319266:root
               
- arn:aws:iam::718504428378:root
               
- arn:aws:iam::507241528517:root
           
Action:
             
- s3:PutObject
           
Resource:
             
- !Sub 'arn:aws:s3:::${Step1S3Bucket}/*'

 
Step2SnsTopicPolicy:
   
Type: AWS::SNS::TopicPolicy
   
Properties:
     
PolicyDocument:
       
Version: '2012-10-17'
       
Id: MyTopicPolicy
       
Statement:
         
- Sid: allow-publish-s3
           
Effect: Allow
           
Principal:
             
Service:
               
- s3.amazonaws.com
           
Action:
             
- sns:Publish
           
Resource: !Ref 'Step2SnsTopic'
     
Topics:
       
- !Ref 'Step2SnsTopic'

 
Step2SnsTopic:
   
Type: AWS::SNS::Topic
   
Properties:
     
DisplayName: !Sub 's3-logs-ObjectCreated'
     
Subscription:
       
- Endpoint: !GetAtt 'Step3LambdaFunction.Arn'
         
Protocol: lambda

 
Step3LambdaLambdaPermission:
   
Type: AWS::Lambda::Permission
   
Properties:
     
FunctionName: !Ref 'Step3LambdaFunction'
     
Action: lambda:InvokeFunction
     
Principal: sns.amazonaws.com
     
SourceArn: !Ref 'Step2SnsTopic'

 
Step3LogGroupLambda:
   
Type: AWS::Logs::LogGroup
   
Properties:
     
LogGroupName: !Sub '/aws/lambda/${Step3LambdaFunction}'
     
RetentionInDays: !Ref 'CWlogsExpiredate'

 
Step3LambdaRole:
   
Type: AWS::IAM::Role
   
Properties:
     
AssumeRolePolicyDocument:
       
Version: '2012-10-17'
       
Statement:
         
- Effect: Allow
           
Principal:
             
Service:
               
- lambda.amazonaws.com
           
Action:
             
- sts:AssumeRole
     
Path: /
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: '*'
              - Effect: Allow
                Action:
                  - sns:Publish
                Resource: '*'
              - Effect: Allow
                Action:
                  - s3:GetObject
                Resource: !Sub 'arn:aws:s3:::${AWS::StackName}-*'
              - Effect: Allow
                Action:
                  - firehose:PutRecordBatch
                Resource: !GetAtt 'Step4deliverystream.Arn'
              - Effect: Allow
                Action:
                  - sqs:SendMessage
                Resource: !GetAtt [Step3SqsDeadLetterQueue, Arn]

  Step3SqsDeadLetterQueue:
    Type: AWS::SQS::Queue
    Properties:
      MessageRetentionPeriod: 1209600

  Step3LambdaFunction:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.lambda_handler
      Role: !GetAtt 'Step3LambdaRole.Arn'
      DeadLetterConfig:
        TargetArn: !GetAtt [Step3SqsDeadLetterQueue, Arn]
      Code:
        ZipFile: !Sub |
          import boto3
          import json
          import os
          import urllib.parse
          import gzip
          from datetime import datetime
          import base64
          import re

          s3 = boto3.client('s3')
          firehose = boto3.client('firehose')

          def lambda_handler(event, context):
            a = parse_s3_event(event)
            bucket_name = a['bucket_name']
            key = a['key']

            # Process ALB log (gzip)
            if (re.match('.*.gz$', key)):
              response =s3.get_object(Bucket=bucket_name, Key=key)
              body = gzip.decompress(response['Body'].read()).decode('utf-8').splitlines()
              if len(body) > 0:
                process_log(body)

          def parse_s3_event(event):
            a = json.loads(event['Records'][0]['Sns']['Message'])
            z = {}
            z['bucket_name'] = a['Records'][0]['s3']['bucket']['name']
            z['key'] = urllib.parse.unquote_plus(a['Records'][0]['s3']['object']['key'], encoding='utf-8')
            return z

          def process_log(data):
            i = 0
            c = []
            for a in data:
              b = parse_log(a)
              if b is not None:
                c.append({'Data': b})
                i = i + 1
              if i == 100:
                PutRecordBatchFirehose(c)
                i = 0
                c = []
            if len(c) > 0:
              PutRecordBatchFirehose(c)

          def parse_log(line):
            z = {}
            a = line.split('"')
            b = a[0].split(' ')

            # ALB Log
            if len(b) == 13:
              if b[2].split('/')[0] == '
app':
                if (re.match('
[0-9]...-[0-9].-[0-9].T[0-9].:[0-9].:[0-9].\.[[0-9]*Z' , b[1])):
                  z = parse_alb_log(a)

            #Column check (number)
            if len(z) > 20:
              #print(z)
              return json.dumps(z) + "\n"

          def parse_alb_log(a):
            z = {}
            b = a[0].split('
')

            # ALB Log
            z["type"] = b[0]
            z["timestamp"] = b[1]
            z["elb"] = b[2]
            if len(b[3].split('
:')) > 1:
              z["client"] = b[3].split('
:')[0]
              z["client_port"] = b[3].split('
:')[1]
            if len(b[4].split('
:')) > 1:
              z["target"] = b[4].split('
:')[0]
              z["target_port"] = b[4].split('
:')[1]
 
            z["request_processing_time"] = float(b[5])
            z["target_processing_time"] = float(b[6])
            z["response_processing_time"] = float(b[7])
            z["elb_status_code"] = b[8]
            z["target_status_code"] = b[9]
            z["received_bytes"] = float(b[10])
            z["sent_bytes"] = float(b[11])
            z["request"] = a[1]
            z["user_agent"] = a[3]

            c = a[4].split('
')
            if len(c) == 5:
              z["ssl_cipher"] = c[1]
              z["ssl_protocol"] = c[2]
              z["target_group_arn"] = c[3]
            z["trace_id"] = a[5]
            z["domain_name"] = a[7]
            z["chosen_cert_arn"] = a[9]
            z["matched_rule_priority"] = a[10]

            if len(a) > 10:
              d = a[10].split('
')
              if len(d) > 1:
                z["matched_rule_priority"] = d[1]
            if len(z["request"].split('
')) > 2:
              z["request_method"] = z["request"].split('
')[0]
              z["request_uri"] = z["request"].split('
')[1]
              z["request_http_version"] = z["request"].split('
')[2]
              if z["request_method"] != '
-' :
                e = urllib.parse.urlparse(z["request_uri"])
                z["request_uri_scheme"] = e.scheme
                z["request_uri_user"] = e.username
                z["request_uri_host"] = e.hostname
                z["request_uri_port"] = e.port
                z["request_uri_path"] = e.path
                z["request_uri_query"] = e.query
                z["request_uri_fragment"] = e.fragment
              else:
                z["request_uri_scheme"] = z["request_uri"].split('
:')[0]
                z["request_uri_user"] = ''
                z["request_uri_host"] = z["request_uri"].split('
/')[2].split(':')[0]
                z["request_uri_port"] = z["request_uri"].split('
:')[2].split('-')[0]
                z["request_uri_path"] = ''
            return z

          def PutRecordBatchFirehose(data):
            firehose_stream_name = os.environ['
firehose_stream_name']
            r = firehose.put_record_batch(
              DeliveryStreamName = firehose_stream_name,
              Records = data
            )

            #print(str(data))
            #print(str(r["ResponseMetadata"]["HTTPHeaders"]))

      Runtime: python3.6
      MemorySize: 256
      Timeout: 300
      Description: alb accesslog S3 to firehose
      Environment:
        Variables:
          firehose_stream_name: !Ref '
Step4deliverystream'
      Tags:
        - Key: CloudformationArn
          Value: !Ref '
AWS::StackId'

  Step4deliverystream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      ExtendedS3DestinationConfiguration:
        BucketARN: !Sub '
arn:aws:s3:::${Step5S3Bucket}'
        BufferingHints:
          IntervalInSeconds: '
300'
          SizeInMBs: '
50'
        CompressionFormat: GZIP
        Prefix: firehose/alb_logs/
        RoleARN: !GetAtt '
Step4deliveryRole.Arn'
        ProcessingConfiguration:
          Enabled: '
false'

  Step4deliveryRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '
2012-10-17'
        Statement:
          - Sid: ''
            Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: sts:AssumeRole
            Condition:
              StringEquals:
                sts:ExternalId: !Ref '
AWS::AccountId'

  Step4deliveryPolicy:
    Type: AWS::IAM::Policy
    Properties:
      PolicyName: firehose_delivery_policy
      PolicyDocument:
        Version: '
2012-10-17'
        Statement:
          - Effect: Allow
            Action:
              - s3:AbortMultipartUpload
              - s3:GetBucketLocation
              - s3:GetObject
              - s3:ListBucket
              - s3:ListBucketMultipartUploads
              - s3:PutObject
            Resource:
              - !Sub '
arn:aws:s3:::${Step5S3Bucket}'
              - !Sub '
arn:aws:s3:::${Step5S3Bucket}*'
      Roles:
        - !Ref '
Step4deliveryRole'

  Step4LogGroupFirehose:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub '
/aws/firehose/${Step4deliverystream}'
      RetentionInDays: !Ref '
CWlogsExpiredate'

  Step5S3Bucket:
    Type: AWS::S3::Bucket
    DeletionPolicy: Delete
    Properties:
      LifecycleConfiguration:
        Rules:
          - Id: AutoDelete
            Status: Enabled
            ExpirationInDays: !Ref '
S3Expiredate'
      NotificationConfiguration:
        TopicConfigurations:
          - Event: s3:ObjectCreated:*
            Filter:
              S3Key:
                Rules:
                  - Name: suffix
                    Value: .gz
                  - Name: prefix
                    Value: firehose/alb_logs/
            Topic: !Ref '
Step6SnsTopic'
      VersioningConfiguration:
        Status: Enabled

  Step6SnsTopicPolicy:
    Type: AWS::SNS::TopicPolicy
    Properties:
      PolicyDocument:
        Version: '
2012-10-17'
        Id: MyTopicPolicy
        Statement:
        - Sid: allow-publish-s3
          Effect: Allow
          Principal:
            Service:
            - s3.amazonaws.com
          Action:
          - sns:Publish
          Resource: !Ref '
Step6SnsTopic'
      Topics:
      - !Ref '
Step6SnsTopic'

  Step6SnsTopic:
    Type: AWS::SNS::Topic
    Properties:
      DisplayName: s3-trigger-firehose-output
      Subscription:
      - Endpoint: !GetAtt '
Step7LambdaFunction.Arn'
        Protocol: lambda

  Step7LambdaPermission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref '
Step7LambdaFunction'
      Action: lambda:InvokeFunction
      Principal: sns.amazonaws.com
      SourceArn: !Ref '
Step6SnsTopic'
  Step7LogGroupLambda:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub '
/aws/lambda/${Step7LambdaFunction}'
      RetentionInDays: 14
  Step7LambdaRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '
2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: '
2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: '
*'
              - Effect: Allow
                Action:
                  - sns:Publish
                Resource: '
*'
              - Effect: Allow
                Action:
                  - s3:GetObject
                Resource: !Sub '
arn:aws:s3:::${AWS::StackName}-*'
              - Effect: Allow
                Action:
                  - s3:PutObject
                Resource: !Sub '
arn:aws:s3:::${AWS::StackName}-*'
              - Effect: Allow
                Action:
                  - sqs:SendMessage
                Resource: !GetAtt [Step7SqsDeadLetterQueue, Arn]

  Step7SqsDeadLetterQueue:
    Type: AWS::SQS::Queue
    Properties:
      MessageRetentionPeriod: 1209600

  Step7LambdaFunction:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.lambda_handler
      DeadLetterConfig:
        TargetArn: !GetAtt [Step7SqsDeadLetterQueue, Arn]
      Role: !GetAtt '
Step7LambdaRole.Arn'
      Code:
        ZipFile: !Sub |
          import boto3
          import json
          import os
          import urllib.parse
          def lambda_handler(event, context):
            z = parse_s3_event(event)
            bucket_name = z['
bucket_name']
            key = z['
key']
            new_key = get_key_with_partition(key)
            new_bucket = os.environ['
s3_bucket']
            s3 = boto3.client('
s3')
            r = s3.copy_object(Bucket=new_bucket, Key=new_key, CopySource={'
Bucket': bucket_name, 'Key': key})
          def parse_s3_event(event):
            a = json.loads(event['
Records'][0]['Sns']['Message'])
            z = {}
            z['
bucket_name'] = a['Records'][0]['s3']['bucket']['name']
            z['
key'] = urllib.parse.unquote_plus(a['Records'][0]['s3']['object']['key'], encoding='utf-8')
            return z
          def get_key_with_partition(key):
            a = key.split('
/')
            a.reverse()
            z = {}
            z['
filename'] = a[0]
            z['
hour'] = a[1]
            z['
day'] = a[2]
            z['
month'] = a[3]
            z['
year'] = a[4]
            z['
prefix'] = get_key_prefix(key)
            f = z['
prefix'] + '/' + 'dt=' + z['year'] + '-' + z['month'] + '-' + z['day'] + '-' + z['hour'] + '/' + z['filename']
            return f
          def get_key_prefix(key):
            a = key.split('
/')
            b = len(a) - 5
            d = []
            for c in a[:b]:
              d.append(c)
            e = '
/'.join(d)
            return e
      Runtime: python3.6
      MemorySize: 128
      Timeout: 300
      Description: Copy the S3 file output by Firehose for Athena (with partition)
      Environment:
        Variables:
          CfnStackName: !Sub '
${AWS::StackName}'
          s3_bucket: !Ref '
Step8S3Bucket'
      Tags:
        - Key: CloudformationArn
          Value: !Ref '
AWS::StackId'

  Step8S3Bucket:
    Type: AWS::S3::Bucket
    DeletionPolicy: Delete
    Properties:
      LifecycleConfiguration:
        Rules:
          - Id: AutoDelete
            Status: Enabled
            ExpirationInDays: !Ref '
S3AthenaExpiredate'

Outputs:
  S3BucketSource:
    Value: !Ref '
Step1S3Bucket'
  S3BucketJson:
    Value: !Ref '
Step5S3Bucket'
  S3BucketAthena:
    Value: !Ref '
Step8S3Bucket'

CLB用

AWSTemplateFormatVersion: '2010-09-09'
Description: Convert ELB(clb) access log to JSON via Firehose (20180517)
Parameters:
  S3Expiredate
:
   
Description: Number of days to keep S3 file (S3 TTL)
   
Type: String
   
Default: 10
 
CWlogsExpiredate:
   
Description: Number of days to keep Cloudwatch logs (logs TTL)
   
Type: String
   
Default: 3
  S3AthenaExpiredate
:
   
Description: Number of days to keep S3 file (Athena)
   
Type: String
   
Default: 45

Resources:
 
Step1S3Bucket:
   
Type: AWS::S3::Bucket
   
DeletionPolicy: Delete
   
Properties:
     
LifecycleConfiguration:
       
Rules:
         
- Id: AutoDelete
           
Status: Enabled
           
ExpirationInDays: !Ref 'S3Expiredate'
     
NotificationConfiguration:
       
TopicConfigurations:
         
- Event: s3:ObjectCreated:*
           
Filter:
              S3Key
:
               
Rules:
                 
- Name: suffix
                   
Value: log
           
Topic: !Ref 'Step2SnsTopic'
     
VersioningConfiguration:
       
Status: Enabled

 
Step1S3BucketPolicy:
   
Type: AWS::S3::BucketPolicy
   
Properties:
     
Bucket: !Ref 'Step1S3Bucket'
     
PolicyDocument:
       
Id: ElblogsBucketPolicy
       
Statement:
         
- Sid: AddPerm
           
Effect: Allow
           
Principal:
              AWS
:
               
- arn:aws:iam::582318560864:root
               
- arn:aws:iam::127311923021:root
               
- arn:aws:iam::033677994240:root
               
- arn:aws:iam::027434742980:root
               
- arn:aws:iam::797873946194:root
               
- arn:aws:iam::985666609251:root
               
- arn:aws:iam::054676820928:root
               
- arn:aws:iam::156460612806:root
               
- arn:aws:iam::652711504416:root
               
- arn:aws:iam::156460612806:root
               
- arn:aws:iam::009996457667:root
               
- arn:aws:iam::600734575887:root
               
- arn:aws:iam::383597477331:root
               
- arn:aws:iam::114774131450:root
               
- arn:aws:iam::797873946194:root
               
- arn:aws:iam::783225319266:root
               
- arn:aws:iam::718504428378:root
               
- arn:aws:iam::507241528517:root
           
Action:
             
- s3:PutObject
           
Resource:
             
- !Sub 'arn:aws:s3:::${Step1S3Bucket}/*'

 
Step2SnsTopicPolicy:
   
Type: AWS::SNS::TopicPolicy
   
Properties:
     
PolicyDocument:
       
Version: '2012-10-17'
       
Id: MyTopicPolicy
       
Statement:
         
- Sid: allow-publish-s3
           
Effect: Allow
           
Principal:
             
Service:
               
- s3.amazonaws.com
           
Action:
             
- sns:Publish
           
Resource: !Ref 'Step2SnsTopic'
     
Topics:
       
- !Ref 'Step2SnsTopic'

 
Step2SnsTopic:
   
Type: AWS::SNS::Topic
   
Properties:
     
DisplayName: !Sub 's3-logs-ObjectCreated'
     
Subscription:
       
- Endpoint: !GetAtt 'Step3LambdaFunction.Arn'
         
Protocol: lambda

 
Step3LambdaLambdaPermission:
   
Type: AWS::Lambda::Permission
   
Properties:
     
FunctionName: !Ref 'Step3LambdaFunction'
     
Action: lambda:InvokeFunction
     
Principal: sns.amazonaws.com
     
SourceArn: !Ref 'Step2SnsTopic'

 
Step3LogGroupLambda:
   
Type: AWS::Logs::LogGroup
   
Properties:
     
LogGroupName: !Sub '/aws/lambda/${Step3LambdaFunction}'
     
RetentionInDays: !Ref 'CWlogsExpiredate'

 
Step3LambdaRole:
   
Type: AWS::IAM::Role
   
Properties:
     
AssumeRolePolicyDocument:
       
Version: '2012-10-17'
       
Statement:
         
- Effect: Allow
           
Principal:
             
Service:
               
- lambda.amazonaws.com
           
Action:
             
- sts:AssumeRole
     
Path: /
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: '*'
              - Effect: Allow
                Action:
                  - sns:Publish
                Resource: '*'
              - Effect: Allow
                Action:
                  - s3:GetObject
                Resource: !Sub 'arn:aws:s3:::${AWS::StackName}-*'
              - Effect: Allow
                Action:
                  - firehose:PutRecordBatch
                Resource: !GetAtt 'Step4deliverystream.Arn'
              - Effect: Allow
                Action:
                  - sqs:SendMessage
                Resource: !GetAtt [Step3SqsDeadLetterQueue, Arn]

  Step3LambdaFunction:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.lambda_handler
      Role: !GetAtt 'Step3LambdaRole.Arn'
      DeadLetterConfig:
        TargetArn: !GetAtt [Step3SqsDeadLetterQueue, Arn]
      Code:
        ZipFile: !Sub |
          import boto3
          import json
          import os
          import urllib.parse
          import gzip
          from datetime import datetime
          import base64
          import re

          s3 = boto3.client('s3')
          firehose = boto3.client('firehose')

          def lambda_handler(event, context):
            a = parse_s3_event(event)
            bucket_name = a['bucket_name']
            key = a['key']

            # Process CLB log (.log)
            if (re.match('.*.log$', key)):
              response =s3.get_object(Bucket=bucket_name, Key=key)
              body = response['Body'].read().decode('utf-8').splitlines()
              if len(body) > 0:
                process_log(body)
       
          def parse_s3_event(event):
            a = json.loads(event['Records'][0]['Sns']['Message'])
            z = {}
            z['bucket_name'] = a['Records'][0]['s3']['bucket']['name']
            z['key'] = urllib.parse.unquote_plus(a['Records'][0]['s3']['object']['key'], encoding='utf-8')
            return z

          def process_log(data):
            i = 0
            c = []
            for a in data:
              b = parse_log(a)
              if b is not None:
                c.append({'Data': b})
                i = i + 1
              if i == 100:
                PutRecordBatchFirehose(c)
                i = 0
                c = []
            if len(c) > 0:
              PutRecordBatchFirehose(c)

          def parse_log(line):
            z = {}
            a = line.split('"')
            b = a[0].split(' ')

            # CLB Log
            if len(b) == 12:
              if (re.match('[0-9]...-[0-9].-[0-9].T[0-9].:[0-9].:[0-9].\.[[0-9]*Z' , b[0])):
                z = parse_clb_log(a)

            #Column check (number)
            if len(z) > 20:
              #print(z)
              return json.dumps(z) + "\n"

          def parse_clb_log(a):
            z = {}
            b = a[0].split(' ')
            # CLB Log
            z["timestamp"] = b[0]
            z["elb"] = b[1]
            if len(b[2].split(':')) > 1:
              z["client"] =  b[2].split(':')[0]
              z["client_port"] = b[2].split(':')[1]
            if len(b[3].split(':')) > 1:
              z["backend"] =  b[3].split(':')[0]
              z["backend_port"] = b[3].split(':')[1]
            z["request_processing_time"] = float(b[4])
            z["backend_processing_time"] = float(b[5])
            z["response_processing_time"] = float(b[6])
            z["elb_status_code"] = b[7]
            z["backend_status_code"] = b[8]
            z["received_bytes"] = float(b[9])
            z["sent_bytes"] = float(b[10])
            z["request"] = a[1]
            z["user_agent"] = a[3]
            c = a[4].split(' ')
            if len(c) == 2:
              z["ssl_cipher"] = c[1]
              z["ssl_protocol"] = c[2]
            if len(b[3].split(':')) > 1:
              z["client_port"] = b[3].split(':')[1]
              z["client_port"] = b[3].split(':')[1]
            if len(z["request"].split(' ')) > 2:
              z["request_method"] = z["request"].split(' ')[0]
              z["request_uri"] = z["request"].split(' ')[1]
              z["request_http_version"] = z["request"].split(' ')[2]
              if z["request_method"] != '-' :
                e = urllib.parse.urlparse(z["request_uri"])
                z["request_uri_scheme"] = e.scheme
                z["request_uri_user"] = e.username
                z["request_uri_host"] = e.hostname
                z["request_uri_port"] = e.port
                z["request_uri_path"] = e.path
                z["request_uri_query"] = e.query
                z["request_uri_fragment"] = e.fragment
            return z

          def PutRecordBatchFirehose(data):
            firehose_stream_name = os.environ['firehose_stream_name']
            r = firehose.put_record_batch(
              DeliveryStreamName = firehose_stream_name,
              Records = data
            )

            #print(str(data))
            #print(str(r["ResponseMetadata"]["HTTPHeaders"]))

      Runtime: python3.6
      MemorySize: 256
      Timeout: 300
      Description: clb accesslog S3 to firehose
      Environment:
        Variables:
          firehose_stream_name: !Ref 'Step4deliverystream'
      Tags:
        - Key: CloudformationArn
          Value: !Ref 'AWS::StackId'

  Step3SqsDeadLetterQueue:
    Type: AWS::SQS::Queue
    Properties:
      MessageRetentionPeriod: 1209600

  Step4deliverystream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      ExtendedS3DestinationConfiguration:
        BucketARN: !Sub 'arn:aws:s3:::${Step5S3Bucket}'
        BufferingHints:
          IntervalInSeconds: '300'
          SizeInMBs: '50'
        CompressionFormat: GZIP
        Prefix: firehose/
clb_logs/
       
RoleARN: !GetAtt 'Step4deliveryRole.Arn'
       
ProcessingConfiguration:
         
Enabled: 'false'

 
Step4deliveryRole:
   
Type: AWS::IAM::Role
   
Properties:
     
AssumeRolePolicyDocument:
       
Version: '2012-10-17'
       
Statement:
         
- Sid: ''
           
Effect: Allow
           
Principal:
             
Service: firehose.amazonaws.com
           
Action: sts:AssumeRole
           
Condition:
             
StringEquals:
                sts
:ExternalId: !Ref 'AWS::AccountId'

 
Step4deliveryPolicy:
   
Type: AWS::IAM::Policy
   
Properties:
     
PolicyName: firehose_delivery_policy
     
PolicyDocument:
       
Version: '2012-10-17'
       
Statement:
         
- Effect: Allow
           
Action:
             
- s3:AbortMultipartUpload
             
- s3:GetBucketLocation
             
- s3:GetObject
             
- s3:ListBucket
             
- s3:ListBucketMultipartUploads
             
- s3:PutObject
           
Resource:
             
- !Sub 'arn:aws:s3:::${Step5S3Bucket}'
             
- !Sub 'arn:aws:s3:::${Step5S3Bucket}*'
     
Roles:
       
- !Ref 'Step4deliveryRole'

 
Step4LogGroupFirehose:
   
Type: AWS::Logs::LogGroup
   
Properties:
     
LogGroupName: !Sub '/aws/firehose/${Step4deliverystream}'
     
RetentionInDays: !Ref 'CWlogsExpiredate'

 
Step5S3Bucket:
   
Type: AWS::S3::Bucket
   
DeletionPolicy: Delete
   
Properties:
     
LifecycleConfiguration:
       
Rules:
         
- Id: AutoDelete
           
Status: Enabled
           
ExpirationInDays: !Ref 'S3Expiredate'
     
NotificationConfiguration:
       
TopicConfigurations:
         
- Event: s3:ObjectCreated:*
           
Filter:
              S3Key
:
               
Rules:
                 
- Name: suffix
                   
Value: .gz
                 
- Name: prefix
                   
Value: firehose/alb_logs/
           
Topic: !Ref 'Step6SnsTopic'
     
VersioningConfiguration:
       
Status: Enabled

 
Step6SnsTopicPolicy:
   
Type: AWS::SNS::TopicPolicy
   
Properties:
     
PolicyDocument:
       
Version: '2012-10-17'
       
Id: MyTopicPolicy
       
Statement:
       
- Sid: allow-publish-s3
         
Effect: Allow
         
Principal:
           
Service:
           
- s3.amazonaws.com
         
Action:
         
- sns:Publish
         
Resource: !Ref 'Step6SnsTopic'
     
Topics:
     
- !Ref 'Step6SnsTopic'

 
Step6SnsTopic:
   
Type: AWS::SNS::Topic
   
Properties:
     
DisplayName: s3-trigger-firehose-output
     
Subscription:
     
- Endpoint: !GetAtt 'Step7LambdaFunction.Arn'
       
Protocol: lambda

 
Step7LambdaPermission:
   
Type: AWS::Lambda::Permission
   
Properties:
     
FunctionName: !Ref 'Step7LambdaFunction'
     
Action: lambda:InvokeFunction
     
Principal: sns.amazonaws.com
     
SourceArn: !Ref 'Step6SnsTopic'
 
Step7LogGroupLambda:
   
Type: AWS::Logs::LogGroup
   
Properties:
     
LogGroupName: !Sub '/aws/lambda/${Step7LambdaFunction}'
     
RetentionInDays: 14
 
Step7LambdaRole:
   
Type: AWS::IAM::Role
   
Properties:
     
AssumeRolePolicyDocument:
       
Version: '2012-10-17'
       
Statement:
         
- Effect: Allow
           
Principal:
             
Service:
               
- lambda.amazonaws.com
           
Action:
             
- sts:AssumeRole
     
Path: /
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: '*'
              - Effect: Allow
                Action:
                  - sns:Publish
                Resource: '*'
              - Effect: Allow
                Action:
                  - s3:GetObject
                Resource: !Sub 'arn:aws:s3:::${AWS::StackName}-*'
              - Effect: Allow
                Action:
                  - s3:PutObject
                Resource: !Sub 'arn:aws:s3:::${AWS::StackName}-*'
              - Effect: Allow
                Action:
                  - sqs:SendMessage
                Resource: !GetAtt [Step7SqsDeadLetterQueue, Arn]

  Step7SqsDeadLetterQueue:
    Type: AWS::SQS::Queue
    Properties:
      MessageRetentionPeriod: 1209600

  Step7LambdaFunction:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.lambda_handler
      DeadLetterConfig:
        TargetArn: !GetAtt [Step7SqsDeadLetterQueue, Arn]
      Role: !GetAtt 'Step7LambdaRole.Arn'
      Code:
        ZipFile: !Sub |
          import boto3
          import json
          import os
          import urllib.parse
          def lambda_handler(event, context):
            z = parse_s3_event(event)
            bucket_name = z['bucket_name']
            key = z['key']
            new_key = get_key_with_partition(key)
            new_bucket = os.environ['s3_bucket']
            s3 = boto3.client('s3')
            r = s3.copy_object(Bucket=new_bucket, Key=new_key, CopySource={'Bucket': bucket_name, 'Key': key})
          def parse_s3_event(event):
            a = json.loads(event['Records'][0]['Sns']['Message'])
            z = {}
            z['bucket_name'] = a['Records'][0]['s3']['bucket']['name']
            z['key'] = urllib.parse.unquote_plus(a['Records'][0]['s3']['object']['key'], encoding='utf-8')
            return z
          def get_key_with_partition(key):
            a = key.split('/')
            a.reverse()
            z = {}
            z['
filename'] = a[0]
            z['
hour'] = a[1]
            z['
day'] = a[2]
            z['
month'] = a[3]
            z['
year'] = a[4]
            z['
prefix'] = get_key_prefix(key)
            f = z['
prefix'] + '/' + 'dt=' + z['year'] + '-' + z['month'] + '-' + z['day'] + '-' + z['hour'] + '/' + z['filename']
            return f
          def get_key_prefix(key):
            a = key.split('
/')
            b = len(a) - 5
            d = []
            for c in a[:b]:
              d.append(c)
            e = '
/'.join(d)
            return e
      Runtime: python3.6
      MemorySize: 128
      Timeout: 300
      Description: Copy the S3 file output by Firehose for Athena (with partition)
      Environment:
        Variables:
          CfnStackName: !Sub '
${AWS::StackName}'
          s3_bucket: !Ref '
Step8S3Bucket'
      Tags:
        - Key: CloudformationArn
          Value: !Ref '
AWS::StackId'

  Step8S3Bucket:
    Type: AWS::S3::Bucket
    DeletionPolicy: Delete
    Properties:
      LifecycleConfiguration:
        Rules:
          - Id: AutoDelete
            Status: Enabled
            ExpirationInDays: !Ref '
S3AthenaExpiredate'

Outputs:
  S3BucketSource:
    Value: !Ref '
Step1S3Bucket'
  S3BucketJson:
    Value: !Ref '
Step5S3Bucket'
  S3BucketAthena:
    Value: !Ref '
Step8S3Bucket'

CloudFront用

AWSTemplateFormatVersion: '2010-09-09'
Description: Convert CloudFront access log to JSON via Firehose (20180514)
Parameters:
  S3Expiredate
:
   
Description: Number of days to keep S3 file (S3 TTL)
   
Type: String
   
Default: 10
 
CWlogsExpiredate:
   
Description: Number of days to keep Cloudwatch logs (logs TTL)
   
Type: String
   
Default: 3
  S3AthenaExpiredate
:
   
Description: Number of days to keep S3 file (Athena)
   
Type: String
   
Default: 45

Resources:
 
Step1S3Bucket:
   
Type: AWS::S3::Bucket
   
DeletionPolicy: Delete
   
Properties:
     
LifecycleConfiguration:
       
Rules:
         
- Id: AutoDelete
           
Status: Enabled
           
ExpirationInDays: !Ref 'S3Expiredate'
     
NotificationConfiguration:
       
TopicConfigurations:
         
- Event: s3:ObjectCreated:*
           
Filter:
              S3Key
:
               
Rules:
                 
- Name: suffix
                   
Value: gz
           
Topic: !Ref 'Step2SnsTopic'
     
VersioningConfiguration:
       
Status: Enabled

 
Step2SnsTopicPolicy:
   
Type: AWS::SNS::TopicPolicy
   
Properties:
     
PolicyDocument:
       
Version: '2012-10-17'
       
Id: MyTopicPolicy
       
Statement:
         
- Sid: allow-publish-s3
           
Effect: Allow
           
Principal:
             
Service:
               
- s3.amazonaws.com
           
Action:
             
- sns:Publish
           
Resource: !Ref 'Step2SnsTopic'
     
Topics:
       
- !Ref 'Step2SnsTopic'

 
Step2SnsTopic:
   
Type: AWS::SNS::Topic
   
Properties:
     
DisplayName: !Sub 's3-logs-ObjectCreated'
     
Subscription:
       
- Endpoint: !GetAtt 'Step3LambdaFunction.Arn'
         
Protocol: lambda

 
Step3LambdaLambdaPermission:
   
Type: AWS::Lambda::Permission
   
Properties:
     
FunctionName: !Ref 'Step3LambdaFunction'
     
Action: lambda:InvokeFunction
     
Principal: sns.amazonaws.com
     
SourceArn: !Ref 'Step2SnsTopic'

 
Step3LogGroupLambda:
   
Type: AWS::Logs::LogGroup
   
Properties:
     
LogGroupName: !Sub '/aws/lambda/${Step3LambdaFunction}'
     
RetentionInDays: !Ref 'CWlogsExpiredate'

 
Step3LambdaRole:
   
Type: AWS::IAM::Role
   
Properties:
     
AssumeRolePolicyDocument:
       
Version: '2012-10-17'
       
Statement:
         
- Effect: Allow
           
Principal:
             
Service:
               
- lambda.amazonaws.com
           
Action:
             
- sts:AssumeRole
     
Path: /
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: '*'
              - Effect: Allow
                Action:
                  - sns:Publish
                Resource: '*'
              - Effect: Allow
                Action:
                  - s3:GetObject
                Resource: !Sub 'arn:aws:s3:::${AWS::StackName}-*'
              - Effect: Allow
                Action:
                  - firehose:PutRecordBatch
                Resource: !GetAtt 'Step4deliverystream.Arn'
              - Effect: Allow
                Action:
                  - sqs:SendMessage
                Resource: !GetAtt [Step3SqsDeadLetterQueue, Arn]

  Step3LambdaFunction:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.lambda_handler
      Role: !GetAtt 'Step3LambdaRole.Arn'
      DeadLetterConfig:
        TargetArn: !GetAtt [Step3SqsDeadLetterQueue, Arn]
      Code:
        ZipFile: !Sub |
          import boto3
          import json
          import os
          import urllib.parse
          import gzip
          from datetime import datetime
          import base64
          import re

          s3 = boto3.client('s3')
          firehose = boto3.client('firehose')

          def lambda_handler(event, context):
            a = parse_s3_event(event)
            bucket_name = a['bucket_name']
            key = a['key']

            # Process CloudFront log (.gz)
            if (re.match('.*.gz$', key)):
              response =s3.get_object(Bucket=bucket_name, Key=key)
              body = gzip.decompress(response['Body'].read()).decode('utf-8').splitlines()
              if len(body) > 0:
                process_log(body)
       
          def parse_s3_event(event):
            a = json.loads(event['Records'][0]['Sns']['Message'])
            z = {}
            z['bucket_name'] = a['Records'][0]['s3']['bucket']['name']
            z['key'] = urllib.parse.unquote_plus(a['Records'][0]['s3']['object']['key'], encoding='utf-8')
            return z

          def process_log(data):
            i = 0
            c = []
            for a in data:
              b = parse_log(a)
              if b is not None:
                c.append({'Data': b})
                i = i + 1
              if i == 100:
                PutRecordBatchFirehose(c)
                i = 0
                c = []
            if len(c) > 0:
              PutRecordBatchFirehose(c)

          def parse_log(line):
            z = {}
            a = line.split('\t')

            # cloudfront Log
            if len(a) > 25:
              if (re.match('[0-9]...-[0-9].-[0-9].' , a[0])):
                if (re.match('[0-9].:[0-9].:[0-9].' , a[1])):
                  z = parse_cloudfront_log(a)

            #Column check (number)
            if len(z) > 25:
              #print(z)
              return json.dumps(z) + "\n"

          def parse_cloudfront_log(b):
            z = {}
            z["date"] = b[0]
            z["time"] = b[1]
            z["x_edge_location"] = b[2]
            z["sc_bytes"] = float(b[3])
            z["c_ip"] = b[4]
            z["cs_method"] = b[5]
            z["cs_host"] = b[6]
            z["cs_uri_stem"] = b[7]
            z["sc_status"] = b[8]
            z["cs_referer"] = b[9]
            z["cs_user_agent"] = b[10]
            z["cs_uri_query"] = b[11]
            z["cs_cookie"] = b[12]
            z["x_edge_result_type"] = b[13]
            z["x_edge_request_id"] = b[14]
            z["x_host_header"] = b[15]
            z["cs_protocol"] = b[16]
            z["cs_bytes"] = float(b[17])
            z["time_taken"] = float(b[18])
            z["x_forwarded_for"] = b[19]
            z["ssl_protocol"] = b[20]
            z["ssl_cipher"] = b[21]
            z["x_edge_response_result_type"] = b[22]
            z["cs_protocol_version"] = b[23]
            z["fle_status"] = b[3]
            z["fle_encrypted_fields"] = b[3]

            z["timestamp"] = z["date"] + 'T' + z["time"] + '.000000Z'

            return z

          def PutRecordBatchFirehose(data):
            firehose_stream_name = os.environ['firehose_stream_name']
            r = firehose.put_record_batch(
              DeliveryStreamName = firehose_stream_name,
              Records = data
            )

            #print(str(data))
            #print(str(r["ResponseMetadata"]["HTTPHeaders"]))

      Runtime: python3.6
      MemorySize: 256
      Timeout: 300
      Description: CloudFront accesslog S3 to firehose
      Environment:
        Variables:
          firehose_stream_name: !Ref 'Step4deliverystream'
      Tags:
        - Key: CloudformationArn
          Value: !Ref 'AWS::StackId'

  Step3SqsDeadLetterQueue:
    Type: AWS::SQS::Queue
    Properties:
      MessageRetentionPeriod: 1209600

  Step4deliverystream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      ExtendedS3DestinationConfiguration:
        BucketARN: !Sub 'arn:aws:s3:::${Step5S3Bucket}'
        BufferingHints:
          IntervalInSeconds: '300'
          SizeInMBs: '50'
        CompressionFormat: GZIP
        Prefix: firehose/
cloudfront_logs/
       
RoleARN: !GetAtt 'Step4deliveryRole.Arn'
       
ProcessingConfiguration:
         
Enabled: 'false'

 
Step4deliveryRole:
   
Type: AWS::IAM::Role
   
Properties:
     
AssumeRolePolicyDocument:
       
Version: '2012-10-17'
       
Statement:
         
- Sid: ''
           
Effect: Allow
           
Principal:
             
Service: firehose.amazonaws.com
           
Action: sts:AssumeRole
           
Condition:
             
StringEquals:
                sts
:ExternalId: !Ref 'AWS::AccountId'

 
Step4deliveryPolicy:
   
Type: AWS::IAM::Policy
   
Properties:
     
PolicyName: firehose_delivery_policy
     
PolicyDocument:
       
Version: '2012-10-17'
       
Statement:
         
- Effect: Allow
           
Action:
             
- s3:AbortMultipartUpload
             
- s3:GetBucketLocation
             
- s3:GetObject
             
- s3:ListBucket
             
- s3:ListBucketMultipartUploads
             
- s3:PutObject
           
Resource:
             
- !Sub 'arn:aws:s3:::${Step5S3Bucket}'
             
- !Sub 'arn:aws:s3:::${Step5S3Bucket}*'
     
Roles:
       
- !Ref 'Step4deliveryRole'

 
Step4LogGroupFirehose:
   
Type: AWS::Logs::LogGroup
   
Properties:
     
LogGroupName: !Sub '/aws/firehose/${Step4deliverystream}'
     
RetentionInDays: !Ref 'CWlogsExpiredate'

 
Step5S3Bucket:
   
Type: AWS::S3::Bucket
   
DeletionPolicy: Delete
   
Properties:
     
LifecycleConfiguration:
       
Rules:
         
- Id: AutoDelete
           
Status: Enabled
           
ExpirationInDays: !Ref 'S3Expiredate'
     
NotificationConfiguration:
       
TopicConfigurations:
         
- Event: s3:ObjectCreated:*
           
Filter:
              S3Key
:
               
Rules:
                 
- Name: suffix
                   
Value: .gz
                 
- Name: prefix
                   
Value: firehose/cloudfront_logs/
           
Topic: !Ref 'Step6SnsTopic'
     
VersioningConfiguration:
       
Status: Enabled

 
Step6SnsTopicPolicy:
   
Type: AWS::SNS::TopicPolicy
   
Properties:
     
PolicyDocument:
       
Version: '2012-10-17'
       
Id: MyTopicPolicy
       
Statement:
       
- Sid: allow-publish-s3
         
Effect: Allow
         
Principal:
           
Service:
           
- s3.amazonaws.com
         
Action:
         
- sns:Publish
         
Resource: !Ref 'Step6SnsTopic'
     
Topics:
     
- !Ref 'Step6SnsTopic'

 
Step6SnsTopic:
   
Type: AWS::SNS::Topic
   
Properties:
     
DisplayName: s3-trigger-firehose-output
     
Subscription:
     
- Endpoint: !GetAtt 'Step7LambdaFunction.Arn'
       
Protocol: lambda

 
Step7LambdaPermission:
   
Type: AWS::Lambda::Permission
   
Properties:
     
FunctionName: !Ref 'Step7LambdaFunction'
     
Action: lambda:InvokeFunction
     
Principal: sns.amazonaws.com
     
SourceArn: !Ref 'Step6SnsTopic'
 
Step7LogGroupLambda:
   
Type: AWS::Logs::LogGroup
   
Properties:
     
LogGroupName: !Sub '/aws/lambda/${Step7LambdaFunction}'
     
RetentionInDays: 14
 
Step7LambdaRole:
   
Type: AWS::IAM::Role
   
Properties:
     
AssumeRolePolicyDocument:
       
Version: '2012-10-17'
       
Statement:
         
- Effect: Allow
           
Principal:
             
Service:
               
- lambda.amazonaws.com
           
Action:
             
- sts:AssumeRole
     
Path: /
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: '*'
              - Effect: Allow
                Action:
                  - sns:Publish
                Resource: '*'
              - Effect: Allow
                Action:
                  - s3:GetObject
                Resource: !Sub 'arn:aws:s3:::${AWS::StackName}-*'
              - Effect: Allow
                Action:
                  - s3:PutObject
                Resource: !Sub 'arn:aws:s3:::${AWS::StackName}-*'
              - Effect: Allow
                Action:
                  - sqs:SendMessage
                Resource: !GetAtt [Step7SqsDeadLetterQueue, Arn]

  Step7SqsDeadLetterQueue:
    Type: AWS::SQS::Queue
    Properties:
      MessageRetentionPeriod: 1209600

  Step7LambdaFunction:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.lambda_handler
      DeadLetterConfig:
        TargetArn: !GetAtt [Step7SqsDeadLetterQueue, Arn]
      Role: !GetAtt 'Step7LambdaRole.Arn'
      Code:
        ZipFile: !Sub |
          import boto3
          import json
          import os
          import urllib.parse
          def lambda_handler(event, context):
            z = parse_s3_event(event)
            bucket_name = z['bucket_name']
            key = z['key']
            new_key = get_key_with_partition(key)
            new_bucket = os.environ['s3_bucket']
            s3 = boto3.client('s3')
            r = s3.copy_object(Bucket=new_bucket, Key=new_key, CopySource={'Bucket': bucket_name, 'Key': key})
          def parse_s3_event(event):
            a = json.loads(event['Records'][0]['Sns']['Message'])
            z = {}
            z['bucket_name'] = a['Records'][0]['s3']['bucket']['name']
            z['key'] = urllib.parse.unquote_plus(a['Records'][0]['s3']['object']['key'], encoding='utf-8')
            return z
          def get_key_with_partition(key):
            a = key.split('/')
            a.reverse()
            z = {}
            z['
filename'] = a[0]
            z['
hour'] = a[1]
            z['
day'] = a[2]
            z['
month'] = a[3]
            z['
year'] = a[4]
            z['
prefix'] = get_key_prefix(key)
            f = z['
prefix'] + '/' + 'dt=' + z['year'] + '-' + z['month'] + '-' + z['day'] + '-' + z['hour'] + '/' + z['filename']
            return f
          def get_key_prefix(key):
            a = key.split('
/')
            b = len(a) - 5
            d = []
            for c in a[:b]:
              d.append(c)
            e = '
/'.join(d)
            return e
      Runtime: python3.6
      MemorySize: 128
      Timeout: 300
      Description: Copy the S3 file output by Firehose for Athena (with partition)
      Environment:
        Variables:
          CfnStackName: !Sub '
${AWS::StackName}'
          s3_bucket: !Ref '
Step8S3Bucket'
      Tags:
        - Key: CloudformationArn
          Value: !Ref '
AWS::StackId'

  Step8S3Bucket:
    Type: AWS::S3::Bucket
    DeletionPolicy: Delete
    Properties:
      LifecycleConfiguration:
        Rules:
          - Id: AutoDelete
            Status: Enabled
            ExpirationInDays: !Ref '
S3AthenaExpiredate'

Outputs:
  S3BucketSource:
    Value: !Ref '
Step1S3Bucket'
  S3BucketJson:
    Value: !Ref '
Step5S3Bucket'
  S3BucketAthena:
    Value: !Ref '
Step8S3Bucket'