AWS WAFのフルログをAthenaで分析できるようにしてみた
こんにちは、臼田です。
皆さん、WAFWAFしてますか?(思いつき
ついにAWS WAFでフルログを取得することができるようになりました!すっごいわくわくしてます。
これまでは頑張ってsampleログを取得してゴニョゴニョしていましたが、これからはKinesis Data Firehoseにつっこんでやりたい放題です!
今回はとりあえずそのままS3に吐き出されたログをAthenaで分析できるようにしてみます。
ログをS3に入れるところまでの方法は上記記事を参考にしてください。
テーブル作成
ログのフォーマットはこちらにあります。
CREATE TABLEは下記のようになります。
CREATE EXTERNAL TABLE IF NOT EXISTS waflogs(`timestamp` int,formatVersion int,webaclId string,terminatingRuleId string,terminatingRuleType string,action string,httpSourceName string,httpSourceId string,ruleGroupList array < struct < ruleGroupId:string, terminatingRule:string, nonTerminatingMatchingRules:array < struct < action:string, ruleId:string > >> >,rateBasedRuleList array < struct < rateBasedRuleId:string, limitKey:string, maxRateAllowed:int> >,nonTerminatingMatchingRules array < struct < action:string, ruleId:string> >,httpRequest struct < clientIp:string, country:string, HTTPVersion:string, headers:array < struct < name:string, value:string > >, uri:string, args:string, httpVersion:string, httpMethod:string, requestId:string>)ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' |
いくつかログのフォーマットについて説明します。
timestamp
sampleログをSDKなどで取得していたらあまり気にすることはなかったのですが、フルログではunixtimeで格納されます。ちなみに、1535785722145のようにミリ秒付きなので、加工に困ります。
terminatingRule
terminatingRuleIdやterminatingRuleTypeは実際に最終的にリクエストが処理されたRuleに関するログです。途中で通過したRuleは2箇所のnonTerminatingMatchingRulesに出てきます。どのルールにも引っかからなければIdはDefault_Actionになります。
action
WAFを利用しているとactionとしてはALLOW, BLOCK, COUNTの3つが出てきますが、上記仕様上最終処理まで行ったあとのログのため、全体としてのactionはCOUNTになりません。nonTerminatingMatchingRulesのactionのみCOUNTとなります。
httpSource
CloudFrontのwebACLか、ALBのwebACLかがわかります。
ruleGroupList
AWS WAFのRuleにはRule・RuleGroup・RateBasedがあります。RuleGroupはマネージドルールやFirewall Managerのルールで、通常WAF画面で設定するものがRuleです。普段は意識することはないですが、API等では別れているので注意が必要です。ログでも別れています。
uri・args
sampleログの時はURLにまとめてQueryStringが格納されていましたが、フルログでは分かれるようになっています。使いやすくなった感じですね。
よく使うクエリ
とりあえず一覧
SELECT from_unixtime(timestamp/1000, 'Asia/Tokyo') AS JST, * FROM waflogs; |
単純に*するとtimestampがunixtimeでしか表示されず見づらいので、先頭にJSTの時間をつけてみました。
コツはミリ秒があるunixtimeなので、1000で割ってあげるのと(ミリ秒切り捨てになるけど見る分にはそこまで気にしない)from_unixtimeでtimestamp型にすると共にJSTに変換します。
timestampによる絞り込み
SELECT from_unixtime(timestamp/1000, 'Asia/Tokyo') AS JST, * FROM waflogs WHERE date(from_unixtime(timestamp/1000, 'Asia/Tokyo')) = date '2018-09-02' ORDER BY timestamp; |
こちらも先ほどと同じで時間を条件にするためにJSTに変換してから比較しています。ORDER BYは特に気にしなくていいのでそのままtimestampを使っています。
BLOCKのログ
SELECT from_unixtime(timestamp/1000, 'Asia/Tokyo') AS JST, * FROM waflogs WHERE action = 'BLOCK'; |
COUNTのログ
SELECT from_unixtime(timestamp/1000, 'Asia/Tokyo') AS JST, * FROM waflogs, UNNEST(nonTerminatingMatchingRules) t(nonTermRule) WHERE nonTermRule.action = 'COUNT'; |
通常RuleのCOUNTを取り出すクエリ。nonTerminatingMatchingRulesにUNNESTを使って頑張っています。COUNTが出るところはruleGroupListの中にもあるけど、こちらのCOUNTもやろうとするとすごく頑張らないといけないので割愛。
日本から来ていてBLOCKされたログ
海外ならいざ知らず、日本から来ていたら誤検知かもしれないので確認したくなります。
SELECT from_unixtime(timestamp/1000, 'Asia/Tokyo') AS JST, * FROM waflogs WHERE httpRequest.country = 'JP' AND action = 'BLOCK'; |
国毎のログ数の集計
JP以外が多いと、気になりますね。明らかに必要なければGeoIPのルールを手前に入れて止めることも検討します。
SELECT httpRequest.country, COUNT(httpRequest.country) AS count FROM waflogs GROUP BY httpRequest.country ORDER BY count DESC; |
まとめ
AWS WAFのフルログをAthenaで分析できるようにしてみました。ついでにいくつか参考クエリを用意してみました。
これまではログの一部しか取得できていませんでしたが、すべてのログが取れることで分析上で正確な情報を出すことができるので、バンバン使っていきたいですね!