SSTエンジニアブログ

SSTのエンジニアによるWebセキュリティの技術を中心としたエンジニアブログです。

フルログのその先。腑に落ちるAWS WAFログ解析基盤を構築してみた。~ログ保存編~

はじめに

事業開発部と研究開発部に属している宇田川です。

今回の内容は、11/08に開催のSecurity-JAWSで登壇させていただたいた内容の構築方法を紹介させていただきます。 書き進めるうちに長くなってしまったので、今回の「ログ保存編」と「見える化編」に分けさせていただきました。
最終的には、下記のような解析基盤ができます。
f:id:woodykedner:20181024163039j:plain

(左上)国別「action」の比率が表示される世界地図
(右上)時間に関係なく国別の比率が見たいので円グラフ
(左下)分単位での検知したルール別に名前入りで集計されるグラフ
(右下)時間に関係なくルール名別の比率が見たいので円グラフ

構成

構成は下記の通りです。
f:id:woodykedner:20181025131507j:plain

AWS WAFのほかにKinesis Data Firefose、Lambda、S3、Athena、QuickSightを使用しています。
それぞれ北バージニアリージョンで使用しています。
東京リージョンでもすべて使えるAWSサービスなので、同じ手順でできると思います。

AWS WAFと連携するCloudFrontかALBを用意

AWS WAFと連携するCloudFrontかALBを用意してください。
CloudFront、ALBの設定方法は割愛させていただきます。
ALBの場合、Lambdaの環境変数設定時にリージョン名(例 東京 ap-northeast-1、北バージニア us-east-1)が必要になるので、用意しておいてください。

カスタムログ、オリジナルログ保存用のs3バケットを作成

「Amazon S3」の画面で、カスタムログ、オリジナルログ保存用のs3バケットを作成します。

(参考)バケットの作成 - Amazon Simple Storage Service

まずカスタムログ保存用のs3バケットを作成します。

  • バケット名を指定
    注意!
    S3のバケット名は、全リージョンにおいてユニークである必要があります。
    説明上、「aws-waf-logs-custom-s3-jawstest」 としていますが、ユニークなバケット名を適宜設定してください。
  • リージョンを「US East(N.Virginia)」 その他の設定は何も設定せず、そのままとして、作成します。

オリジナルログ保存用のS3バケットも作成します。
オリジナルログ保存用のS3バケット名もユニークなバケット名を適宜設定してください。 説明上、バケット名を「aws-waf-logs-org-s3-jawstest」とします。それ以外はカスタムログ保存用と同じ設定で作成します。

注意!
AWS WAFのフルログはすべてのリクエストのログが保存されます。
よって、設置するサイトにより、ログがかなり貯まります。
その分料金もかかりますので、S3のライフサイクル機能などを使い、適宜、保存量は調整してください。
(参考)オブジェクトのライフサイクル管理 - Amazon Simple Storage Service

Lambda用IAMロールを作成

(参考)AWS Lambda アクセス権限モデル - AWS Lambda

このロールを使用するサービスを選択で[Lambda]を選択します。

下記のポリシーをアタッチしてロールを作成してください。

  • AWSWAFReadOnlyAccess
  • AWSLambdaBasicExecutionRole

ロール名 は「aws-waf-logs-custom-lambda-role」とします。
これでロールを作成してください。

f:id:woodykedner:20181101163915p:plain

LambdaにアップのためのソースZIPを作成。

(参考)Python 用の AWS Lambda デプロイパッケージの構築

進めるために、python2.7の実行環境を整えてください。

下記のページを見るとLambda 実行環境が Amazon Linux AMI バージョン (AMI 名: amzn-ami-hvm-2017.03.1.20170812-x86_64-gp2)を使用しているので、EC2でインスタンスを構築し、その上でソースZIPを作成しました。
(参考)Lambda 実行環境と利用できるライブラリ - AWS Lambda

LambdaはApexやserverless Frameworkなど便利な管理ツールが出ておりますが、私がこのあたり不慣れなので、手間ですが、ソースZIPを作成してLambdaにアップします。
便利な管理ツールを使い慣れている方は適宜そちらでLambdaへのアップを行ってください。
下記は参考までに

それでは手順です。

まず、ディレクトリを作成します。

$ mkdir aws-waf-logs-custom-function

Lambdaのソースを「lambda_function.py」として、上記のディレクトリに保存します。

# coding: UTF-8

from __future__ import print_function

import os
import base64
import json
import boto3
import boto3.session
import logging
import re
import geoip2.database

from botocore.exceptions import ClientError

logger = logging.getLogger()
logger.setLevel(logging.INFO)

REGION = os.getenv('REGION', '')

def lambda_handler(event, context):
    # 出力データ格納用の配列
    output = []

    # 一度取得したルールIDとルール名のセットの格納用
    rules_tmp = {}

    # 一度取得した経度緯度とIPアドレスのセットの格納用
    ip_geos_tmp = {}
    
    # webACL名を初期化
    webaclName = ''

    # ローカルIP検知用の正規表現
    rfc1918 = re.compile('^(10(\.(25[0-5]|2[0-4][0-9]|1[0-9]{1,2}|[0-9]{1,2})){3}|((172\.(1[6-9]|2[0-9]|3[01]))|192\.168)(\.(25[0-5]|2[0-4][0-9]|1[0-9]{1,2}|[0-9]{1,2})){2})$')
    
    # 経度緯度取得用のgeoip2のReaderオブジェクトを生成
    reader = geoip2.database.Reader('./GeoLite2-City.mmdb')

    for record in event['records']:

        # base64でエンコードされているリクエストデータをデコード
        log_data = json.loads(base64.b64decode(record['data']))
        
        # aws wafアクセス用のwafオブジェクトを生成。
        # グローバル領域に設定しても良かったが、リクエストデータ内の'httpSourceName'からALBかCF(CloudFront)か取得できるため、ここで宣言。
        # kinesis Data Firehoseは、ALB用とCloudFront用で分けましょう。混在するとwafオブジェクトが複雑になる...
        if 'waf' not in locals():
            # select client type
            if log_data['httpSourceName'] == 'ALB':
                if REGION != "":
                    session = boto3.session.Session(region_name=REGION)
                    waf = session.client('waf-regional')
                else:
                    logger.error("REGION for ALB is not set.")
            else:
                waf = boto3.client('waf')
        
        # IPアドレスから経度緯度をgeoip2から取得。ローカルIPは除外。
        if not rfc1918.match(log_data['httpRequest']['clientIp']):
            if log_data['httpRequest']['clientIp'] not in ip_geos_tmp:
                response = reader.city(log_data['httpRequest']['clientIp'])
                log_data['latitude'] = response.location.latitude
                log_data['longitude'] = response.location.longitude

                ip_geos_tmp[log_data['httpRequest']['clientIp']] = {'latitude':log_data['latitude'],'longitude':log_data['longitude']}
            else:
                log_data['latitude'] = ip_geos_tmp[log_data['httpRequest']['clientIp']]['latitude']
                log_data['longitude'] = ip_geos_tmp[log_data['httpRequest']['clientIp']]['longitude']

        # webACLに関する情報を取得。ついでにリクエストデータにwebACLも追加。
        if webaclName == '':
            try:
                acl = waf.get_web_acl(WebACLId=log_data['webaclId'])
            except ClientError as e:
                logger.error(
                    "Error get_web_acl: %s", e, exc_info=True)
            webaclName = acl['WebACL']['Name']
            
        log_data['webaclName'] = webaclName

        # ここから、ルール名の取得。    
        if str(log_data['terminatingRuleId']) != 'Default_Action':

            terminatingRuleName = ''
            for rule in acl['WebACL']['Rules']:
                RuleId = log_data['terminatingRuleId']

                # サードパーティのマネージドルール、ルールグループのルールセットで検知したリクエストの場合。
                if log_data['terminatingRuleType'] == 'GROUP':
                    if RuleId not in rules_tmp:
                        try:
                            list_subscribed_rule_groups = waf.list_subscribed_rule_groups()
                        except ClientError as e:
                            logger.error(
                                "Error list_subscribed_rule_groups : %s", e, exc_info=True)
                        SubscribedManagedRules = list_subscribed_rule_groups['RuleGroups']
                        
                        # Get Third Party Managed Rule Name
                        RuleNames = [
                            x['Name'] for x in SubscribedManagedRules if x['RuleGroupId'] == RuleId]
                        terminatingRuleName = RuleNames[0] if len(RuleNames) else ''
                        rules_tmp[RuleId] = terminatingRuleName
                    else:
                        terminatingRuleName = rules_tmp[RuleId]

                # レートベースのルールに検知したリクエストの場合     
                elif log_data['terminatingRuleType'] == 'RATE_BASED':
                    if RuleId not in rules_tmp:
                        try:
                             RuleInfo = waf.get_rate_based_rule(RuleId=RuleId)
                        except ClientError as e:
                             logger.error(
                                 "Error get_rate_based_rule : %s", e, exc_info=True)

                        terminatingRuleName = RuleInfo['Rule']['Name']
                        
                        rules_tmp[RuleId] = RuleInfo['Rule']['Name']
                    else:
                        terminatingRuleName = rules_tmp[RuleId]

                # 一般的なルールに検知したリクエストの場合     
                elif log_data['terminatingRuleType'] == 'REGULAR':
                    if RuleId not in rules_tmp:
                        try:
                            RuleInfo = waf.get_rule(RuleId=RuleId)
                        except ClientError as e:
                             logger.error(
                                 "Error get_rule : %s", e, exc_info=True)

                        terminatingRuleName = RuleInfo['Rule']['Name']
                        
                        rules_tmp[RuleId] = RuleInfo['Rule']['Name']
                        
                    else:
                        terminatingRuleName = rules_tmp[RuleId]
                        
            log_data['terminatingRuleName'] = terminatingRuleName
            
            terminatingRuleName = ''

        else:
            # 'Default_Action'リクエストは、そのまま'terminatingRuleName'に格納。
            log_data['terminatingRuleName'] = 'Default_Action'

        # httpRequest内の階層構造を無くす。
        httpRequestTerms = ['clientIp','country','httpMethod','uri','args','requestId']
        for term in httpRequestTerms:
            log_data[term] = log_data['httpRequest'][term]

        # httpRequestのヘッダー内の情報を取得
        for header in log_data['httpRequest']['headers']:
            log_data[header['name'].lower()] = header['value']

        # データをエンコードしてoutput配列に追加
        output.append(
            {
                'recordId': record['recordId'],
                'result': 'Ok',
                'data': base64.b64encode(json.dumps(log_data)+"\n")
            }
            )
    # geoip2のReader オブジェクトをクローズ
    reader.close()

    return {'records': output}

GeoIP2無料版のデータベースをダウンロードします。 下記、URLにアクセスし、
GeoLite2 « MaxMind Developer Site
GeoLite2 Cityの「MaxMind DB バイナリー、gzip 圧縮ファイル」をダウンロード。

ダウンロードした「GeoLite2-City.mmdb.gz」を解凍して、「GeoLite2-City.mmdb」をlambda_function.pyと同じ階層に置きます。

geoip2のライブラリをインストール。

$ pip install geoip2 -t

zipで固めます。lambda_function.pyがある階層で下記のコマンドを実行してください。

$ zip -r ../aws-waf-logs-custom-function.zip *

フォルダごと固めるのではなく、中身だけ固めるようにしています。
フォルダごと固めるとlambdaがエラーとなり動きません。

Lambdaに設置

AWS マネージメントコンソールのサービスで「Lambda」を選択。

「関数の作成」

「一から作成」にチェックし、
名前は 「aws-waf-logs-custom-function」
ランタイム python2.7 ロール 上記で作成した「aws-waf-logs-custom-lambda-role」 で、「関数の作成」。 一旦、関数を作ります。

f:id:woodykedner:20181101164041p:plain

コードエントリタイプで「.zipファイルをアップロード」を選択し、関数パッケージで上記で作成したソースZIPを選択。

その他の設定も行う。 基本設定は下記を設定。

  • メモリ 128MB
  • タイムアウト 15分
    ※最近MAXが5分から15分になった。余裕を持たせるため、MAXの15分に設定しておく。

AWS WAFをアタッチするのがALBの場合、環境変数にリージョンを設定する。

  • キー REGION
  • 値 リージョン名(例えば 東京 ap-northeast-1、北バージニア us-east-1)

上記を設定したら、「保存」をクリック

f:id:woodykedner:20181101164236p:plain

Kinesis Data Firefoseの設定

AWS マネージメントコンソールのサービスで「Kinesis」を選択。 [配信ストリームを作成] を選択します。

「Delivery stream name」と「source」を設定
!注意!
Delivery stream nameは適当に付けてはいけません。
AWS WAFは「aws-waf-logs-」という名前で始まるKinesis Data Firehoseしか選択できません。
なので、ここではDelivery stream nameを「aws-waf-logs-custom-firefose-stream」とします。
Source は[Direct PUT or other sources]
「Next」で次のページへ。

f:id:woodykedner:20181101164703p:plain

Record transformationを「Enable」にして、
上記で作成したLambdaファンクションを選択します。
Record format conversionは「Disabled」。
「Next」で次のページへ。

f:id:woodykedner:20181101164725p:plain

Destinationは「Amazon S3」
S3 bucketに上記で作成したカスタムログ保存用のS3バケット「aws-waf-logs-custom」、Prefixは空のままにします。
Source record S3 backupを「Enabled」として、
Backup S3 bucketに上記で作成したオリジナルログ保存用のS3バケット「aws-waf-logs-org」、
Prefixは、「source_records/」を削除して、空のままにします。
「Next」で次のページへ。

f:id:woodykedner:20181101164747p:plain

S3への保存関連の設定をします。
S3 buffer conditions

  • Buffer size 5 MB
  • Buffer interval 300 seconds

S3 compression and encryption

  • S3 compression Disabled
    テストということで、圧縮は無しとしています。

  • S3 encryption Disabled

Error logging

  • Error logging Enabled

f:id:woodykedner:20181101164810p:plain

IAM roleで「Create new or choose」をクリックし、IAMロールを作成する別ページを開きます。
IAMロール 「新しいIAMロールの作成」 を選択し、ロール名 「firehose_delivery_role」が入っていますが、AWSWAFのログ用と分かるようにしておきたいので、「aws-waf-logs-firehose-delivery-role」とします。

f:id:woodykedner:20181101164825p:plain

「Next」で次のページへ。

確認画面で、問題無ければ、「Create delivery stream」をクリックします。

Data Firehoseの一覧画面に移動し、作成したData FirehoseのStatusがActiveになるまで待ちます。

(参考)Amazon Kinesis Data Firehose 配信ストリームの作成 - Amazon Kinesis Data Firehose

AWS WAFの設定

webACL、Ruleの設定は割愛させていただきます。

設定していない場合は、AWSよりOWASP TOP 10向けのAWS WAFの設定がcloud formationで提供されているので、そちらを試してみてください。
Use AWS WAF to Mitigate OWASP’s Top 10 Web Application Vulnerabilities

下記のyamlファイルをcloudformationに読ませれば、簡単に設定できます。
https://s3.us-east-2.amazonaws.com/awswaf-owasp/owasp_10_base.yml

途中でパラメータの設定が出てきますが、「Apply to WAF」でCloudFrontにアタッチする場合は「Global」、ALBにアタッチする場合は、AWSのマネージメントコンソールが作成したいリージョンに設定されていることを確認してから、「Regional」に設定してください。 その他の設定は基本デフォルトとしてください。

ログの機能を有効にしてみます。
「Web ACLs」から対象のwebACLを選択します。
「Logging」タブを選択し、「Enable Logging」をクリックします。

f:id:woodykedner:20181101164852p:plain

Amazon Kinesis Data Firehoseで上記で作成した「aws-waf-logs-custom-stream」を選択します。
Redacted fields内は、設定せずにそのままとして、「Create」をクリックし、AWS WAFのログ設定を有効にします。

f:id:woodykedner:20181101164915p:plain

ログ保存テスト

それでは、ログがS3に保存されるか見てみましょう。

curlでSQLインジェクションのアクセスをしてみます。

curl http://cloudfront か ALBのエンドポイント/?username=1'%20or%20'1'%20=%20'1\\&password=1'%20or%20'1'%20=%20'1

大体3~5分ぐらいで、S3のカスタム、オリジナルのそれぞれのバケットにログファイルが保存されます。

ログの内容は下記のようになっています。 json形式のログをAWS Athenaで解析する場合は、1つ1つのデータが改行されている必要があるため、このような形で保存します。

{"httpRequest": {"httpMethod": "GET", "country": "JP", "args": "username=1%20or%201%20=%201%5C%5C&password=1%20or%201%20=%201", "uri": "/", "headers": [{"name": "Host", "value": "X.X.X.X"}, {"name": "User-Agent", "value": "Mozilla/5.0 (Windows NT; Windows NT 10.0; ja-JP) WindowsPowerShell/5.1.17134.228"}], "requestId": "XXXXXAdQ_PYP6N0_gF2A5Q4bmi9OLX1KJTMh0zCClja4AbhpDXXXXX==", "clientIp": "X.X.X.X", "httpVersion": "HTTP/1.1"}, "ruleGroupList": [], "terminatingRuleId": "XXXXXb07-65b0-402d-844f-72f9f59XXXXX", "terminatingRuleName": "udatest-mitigate-sqli", "rateBasedRuleList": [], "webaclName": "udatest-owasp-acl", "user-agent": "Mozilla/5.0 (Windows NT; Windows NT 10.0; ja-JP) WindowsPowerShell/5.1.17134.228", "nonTerminatingMatchingRules": [], "formatVersion": 1, "requestId": "XXXXXAdQ_PYP6N0_gF2A5Q4bmi9OLX1KJTMh0zCClja4AbhpDXXXXX==", "latitude": 35.867, "webaclId": "XXXXXe6d-c215-4339-a355-d8585eeXXXXX", "timestamp": 1541384906137, "args": "username=1%20or%201%20=%201%5C%5C&password=1%20or%201%20=%201", "httpSourceId": "XXXXXAPJSVPQHV", "httpMethod": "GET", "host": "X.X.X.X", "country": "JP", "httpSourceName": "CF", "uri": "/", "longitude": 139.7555, "terminatingRuleType": "REGULAR", "action": "BLOCK", "clientIp": "X.X.X.X"}

見やすくすると、こうなります。

{
    "httpRequest": {
        "httpMethod": "GET",
        "country": "JP",
        "args": "username=1%20or%201%20=%201%5C%5C&password=1%20or%201%20=%201",
        "uri": "/",
        "headers": [
            {
                "name": "Host",
                "value": "X.X.X.X"
            },
            {
                "name": "User-Agent",
                "value": "Mozilla/5.0 (Windows NT; Windows NT 10.0; ja-JP) WindowsPowerShell/5.1.17134.228"
            }
        ],
        "requestId": "XXXXXAdQ_PYP6N0_gF2A5Q4bmi9OLX1KJTMh0zCClja4AbhpDXXXXX==",
        "clientIp": "X.X.X.X",
        "httpVersion": "HTTP/1.1"
    },
    "ruleGroupList": [],
    "terminatingRuleId": "XXXXXb07-65b0-402d-844f-72f9f59XXXXX",
    "terminatingRuleName": "udatest-mitigate-sqli",
    "rateBasedRuleList": [],
    "webaclName": "udatest-owasp-acl",
    "user-agent": "Mozilla/5.0 (Windows NT; Windows NT 10.0; ja-JP) WindowsPowerShell/5.1.17134.228",
    "nonTerminatingMatchingRules": [],
    "formatVersion": 1,
    "requestId": "XXXXXAdQ_PYP6N0_gF2A5Q4bmi9OLX1KJTMh0zCClja4AbhpDXXXXX==",
    "latitude": 35.867,
    "webaclId": "XXXXXe6d-c215-4339-a355-d8585eeXXXXX",
    "timestamp": 1541384906137,
    "args": "username=1%20or%201%20=%201%5C%5C&password=1%20or%201%20=%201",
    "httpSourceId": "XXXXXAPJSVPQHV",
    "httpMethod": "GET",
    "host": "X.X.X.X",
    "country": "JP",
    "httpSourceName": "CF",
    "uri": "/",
    "longitude": 139.7555,
    "terminatingRuleType": "REGULAR",
    "action": "BLOCK",
    "clientIp": "X.X.X.X"
}

httpRequestの内の要素を展開して、第一階層に置いています。httpRequestは削っても良いのですが、統計を取るとき以外に元のリクエスト内容の全体像を見たい時もあるので一応残しています。

今回はここまで。
見える化編に続きます。