SSTエンジニアブログ

SSTのエンジニアによるWebセキュリティの技術を中心としたエンジニアブログです。

ScutumAPIリリース記念 第二弾 詳細な防御ログをAWSで自動取得してみた。

はじめに

こんにちは、研究開発の宇田川です。

前回のブログでは、防御ログ(リスト)をAWSを使って自動取得する方法を公開させていただきました。

techblog.securesky-tech.com

第二弾は、AWSサービスを利用して詳細な防御ログの自動取得する方法を公開させていただきます。

ちなみにもっと早く公開する予定でしたが、書き始めてから構成を変更すること3回。そして、結構な長文になってしまいました…
斜め読みでもいいので目を通していただければ幸いです。

防御ログ(詳細)とは

防御ログ(詳細)について説明する前に防御ログ(リスト)で取得できる情報をおさらいします。
防御ログ(リスト)で取得できる要素を表にすると下記の通りとなります。

要素名 説明
log_id 防御ログID
ip 送信元のIPアドレス
block ブロックの有無
category 検知した攻撃分類、複数の攻撃分類を検知する場合あり
uri リクエストURI
ts Scutumが通信を検知・ブロックした日時

検知又はブロックした攻撃の集計する場合はこれで足りると思いますが、リクエストどこに攻撃が含まれているかを分析したい場合は、リクエストのメソッド、クエリストリング、ヘッダー、ボディ等の情報が含まれていないため、防御ログ(リスト)では不十分です。また、レスポンスの内容により検知又はブロックされる場合もありますので、その時のレスポンスの内容についても取得することができません。
このような問題に対応するため、防御ログ(詳細)では、Scutumで検知又はブロックされたHTTPリクエストの防御ログ(リスト)の情報に加え、リクエストのメソッド、クエリストリング、ヘッダー、ボディ、レスポンス内容が取得することができ、より深い分析をすることができます。

早速、防御ログ(詳細)の取得の方法を見ていきましょう。

防御ログ(詳細)のURLは

https://api.scutum.jp/api/v1/alert_detail

です。

必要な情報を設定していきます。
必須なのが、Scutumログ取得対象FQDNを指定するhostパラメータとScutum管理サイトにログインする際に使用するユーザIDをしているidパラメータです。この2つは防御ログ(リスト)と同じですね。
防御ログ(詳細)の取得にはもう1つ必要になります。それは、防御ログ1つ1つに割り振られる固有の文字列の防御ログIDです。防御ログIDは、防御ログ(リスト)の要素のため、防御ログ(リスト)を取得した後に防御ログIDを使用して防御ログ(詳細)を取得することとなります。

Scutumログ取得対象FQDNの設定がhost=www.example.jp、Scutum管理サイトにログインする際に使用するユーザIDがid=ABC1234、防御ログIDがlog_id=1234567891234_567_89123とした場合、設定例は下記の通りになります。

https://api.scutum.jp/api/v1/alert_detail?host=www.example.jp&id=ABC1234&log_id=1234567891234_567_89123

忘れずにScutumのAPIキーをリクエストのX-Scutum-API-Keyヘッダーに追加します。APIキーのみURLパラメータでは無いので注意しましょう。
上記設定例を使用して、curlコマンドを書くと下記のようになります。

curl -H 'X-Scutum-API-Key: xxxxf5d161b33b6xxxxaa8f8ccfdfdfxxxx' "https://api.scutum.jp/api/v1/alert_detail?host=www.example.jp&id=ABC1234&log_id=1234567891234_567_89123"

APIへのリクエストが成功すると、json形式のレスポンスが返ります。

{
    "log_id": "1234567891234_567_89123",
    "request": "UE9TVCAvdGVzdCBIVFRQLzEuMQ0KSG9zdDogd3d3LmV4YW1wbGUuanANClVzZXItQWdlbnQ6IGN1cmwvNy41OC4wDQpBY2NlcHQ6ICovKg0KQ29udGVudC1MZW5ndGg6IDM0DQpDb250ZW50LVR5cGU6IGFwcGxpY2F0aW9uL3gtd3d3LWZvcm0tdXJsZW5jb2RlZA0KWC1Gb3J3YXJkZWQtRm9yOiB4LngueC54DQpYLUZvcndhcmRlZC1Gb3IyOiB4LngueC54DQpYLUNsaWVudC1Qb3J0OiA2Mzc2Ng0KDQo8c2NyaXB0PmFsZXJ0KCJTU1RURVNUIik7PC9zY3JpcHQ+DQo=",
    "ip": "x.x.x.x",
    "block": true,
    "category": [
        "クロスサイトスクリプティング攻撃"
    ],
    "uri": "/test",
    "ts": "2020-00-00T00:00:00+09:00"
}

各要素の説明は下記の通りです。

要素名 説明
log_id 防御ログID
ip 送信元のIPアドレス
block ブロックの有無
category 検知した攻撃分類、複数の攻撃分類を検知する場合あり
uri リクエストURI
ts Scutumが通信を検知・ブロックした日時
request Base64エンコードされたHTTPリクエストデータ
(Scutumシステムに記録がある場合のみ)
response Base64エンコードされたHTTPレスポンスのデータ
(Scutumシステムに記録がある場合のみ)

log_id、ip、block、category、uri、tsは防御ログ(リスト)と共通です。
赤く色のついたrequestとresponseのみ防御ログ(詳細)で取得可能となっています。

では、requestの内容をbase64コマンドで見てみます。

$ echo 'UE9TVCAvdGVzdCBIVFRQLzEuMQ0KSG9zdDogd3d3LmV4YW1wbGUuanANClVzZXItQWdlbnQ6IGN1cmwvNy41OC4wDQpBY2NlcHQ6ICovKg0KQ29udGVudC1MZW5ndGg6IDM0DQpDb250ZW50LVR5cGU6IGFwcGxpY2F0aW9uL3gtd3d3LWZvcm0tdXJsZW5jb2RlZA0KWC1Gb3J3YXJkZWQtRm9yOiB4LngueC54DQpYLUZvcndhcmRlZC1Gb3IyOiB4LngueC54DQpYLUNsaWVudC1Qb3J0OiA2Mzc2Ng0KDQo8c2NyaXB0PmFsZXJ0KCJTU1RURVNUIik7PC9zY3JpcHQ+DQo=' | base64 -d

下記が出力内容例となります。
ソケット通信でHTTPリクエストをする際の記述方法がそのまま取得できます。

POST /test HTTP/1.1
Host: www.example.jp
User-Agent: curl/7.58.0
Accept: */*
Content-Length: 34
Content-Type: application/x-www-form-urlencoded
X-Forwarded-For: x.x.x.x
X-Forwarded-For2: x.x.x.x
X-Client-Port: 63766

<script>alert("SSTTEST");</script>

ここまで取れれば、それぞれの要素を切り出して、好きなように分析できます。

取得の上での課題

APIアクセス制限

前述の通り、防御ログ(詳細)する際には、防御ログIDが必要となります。防御ログIDは、防御ログ(リスト)の要素のため、防御ログ(詳細)を取得するには防御ログ(リスト)を取得した後に防御ログIDを使用して防御ログ(詳細)を取得することとなります。
ここで問題になるのがScutumAPIはAPIキー毎5分間に25回以内という制限です。防御ログ(リスト)は1回のリクエストで最大1000件のログを取得することができますが、防御ログ(詳細)は1回のリクエストで1件のログしか取得することができません。
制限を超えるとAPIサーバからは429エラーが返されますので、ログを取得する側、される側の負荷を考え、無駄な取得のリクエスト実行をしないように制御する必要があります。

複数のFQDN対応

前回の防御ログ(リスト)の取得では1FQDNのみのログの取得を想定していました。
ただ、ありがたいことにScutumは1社で複数のサイトの契約をしていただいているお客様が多くいらっしゃいます。
前回の構成だとFQDN毎に前回の構成を構築、実行する必要があり、管理が大変になるため、1つの構成で複数のFQDNの防御ログ(詳細)を取得できるように対応する必要があります。

AWS自動取得構成

上記の課題を踏まえ、構築したAWS自動取得構成を見ていきます。

f:id:woodykedner:20201207153006p:plain

まず、防御ログ(詳細)の取得の流れは下記の通りとなります。

  1. CloudWatch Eventsにより1時間1回、0分に防御ログ(リスト)取得Lambdaを実行
  2. API認証情報管理DynamoDBのテーブルをスキャンし、「ScutumユーザID、FQDN、APIキー」のリストを取得
  3. 「ScutumユーザID、FQDN、APIキー」から前の1時間の防御ログ(リスト)を取得
  4. 防御ログ情報をFQDN毎のSQSのキューに格納
  5. CloudWatch Eventsにより防御ログ(詳細)取得Lambdaを5分毎に実行
    (防御ログ(リスト)取得と被って制限に引っかからないように0,5,55分は実行しようないようにしています。)
  6. FQDN毎のキューから防御ログ情報を各25件取得
  7. 防御ログIDから詳細ログ取得
  8. 圧縮した詳細ログを防御ログ(詳細)保存用S3バケットに格納

また、防御ログ(詳細)の取得の際、にFQDN毎のSQSのキューを必要としますが、これを複数FQDN毎作成、または削除するのは手間のため、API認証情報管理DynamoDBのテーブルへの「ScutumユーザID、FQDN、APIキー」の追加、削除をトリガーにして、自動で作成、削除するようにLambdaを用意しました。流れは下記の通りとなります。

FQDN毎SQSキュー自動操作(作成、削除)の流れ

  1. API認証情報管理DynamoDBのテーブルに「ScutumユーザID、FQDN、APIキー」を登録
  2. DynamoDBストリームによりアイテムの追加、削除がFQDN毎防御ログ(詳細)取得順番待ちSQSの操作Lambbaに通知される。
  3. それを受けて、FQDN毎防御ログ(詳細)取得順番待ちSQSを操作Lambbaが実行
  4. アイテムの追加、削除によりFQDN毎防御ログ(詳細)取得順番待ちSQSが作成、削除される。

リージョンは東京リージョンを使用しましたが、使用しているAWSサービスが使用可能であれば、どのリージョンでも稼働可能です。

では、1つ1つのAWSサービスの設定を見ていきましょう。

防御ログ(詳細)保存用S3

前回同様、Scutumの防御ログ保存用S3バケットを作成します。
特別な設定はしていないので、ドキュメントに沿って、S3バケットを作成してみてください。

docs.aws.amazon.com

API認証情報管理DynamoDB

DynamoDBのテーブルを作成します。
DynamoDBの画面から、「テーブルを作成」をクリックし、
テーブル名は適宜設定して、プライマリキーには、scutumIdを指定します。
テーブル設定は「デフォルト設定の使用」のままで大丈夫です。
これでテーブルを作成します。
テーブル名は、防御ログ(リスト)Lambdaの環境変数で設定するので控えておいてください。

docs.aws.amazon.com

作成したら、DynamoDBストリーム設定します。
DynamoDBの一覧画面より、上記で作成したテーブルを選択します。
「概要」タブを選択し、「DynamoDB ストリームの管理」をクリックします。
表示タイプに「キーのみ - 変更された項目のキー属性のみ。」を選択し、「有効化」をクリックします。

テーブルにScutumのユーザID、FQDN、APIキーの情報を登録します。
DynamoDBの一覧画面より、上記で作成したテーブルを選択します。
「項目」タブを選択し、「項目の作成」をクリックします。
ItemにscutumIdが入っている状態で表示されているかと思います。
scutumIdの隣にある「+」ボタンをクリックし、Apend、Stringと選択し、黄色く選択されているFIELDにhostNameと入力します。次にhostNameの隣にある「+」ボタンをクリックし、、Apend、Stringと選択し、黄色く選択されているFIELDにscutumApiKeyと入力します。

f:id:woodykedner:20201202164159p:plain

  • scutumIdのVALUEにScutumのユーザID
  • hostNameのVALUEにFQDN
  • scutumApiKeyのVALUEにAPIキー を登録します。

複数のFQDNの情報がある場合は、上記の方法でユーザID、FQDN、APIキーの情報を登録してください。

登録すると、このようになります。
私は2つのFQDNでテストを行っているため、2つ情報が登録されています。 f:id:woodykedner:20201202163007p:plain

FQDN毎防御ログ(詳細)取得順番待ちSQSを追加、削除するLambba

このLambdaでやっていることは、

  1. DynamoDBストリーム設定から、アイテム追加又はイベントがこのLambdaに通知される。
  2. 追加の場合はそのFQDN用のSQSキューを作成する。
    削除の場合は、そのFQDN用のSQSキューを削除する。

です。

Lambda設定

設定として、ランタイムはpython3.8、メモリのサイズは128MB、タイムアウトは300秒としました。

環境変数

このLambdaでは環境変数の設定はありません。

IAM ロール(ポリシー)

IAMロールに指定するポリシーは、AWSLambdaBasicExecutionRoleに、DynamoDBストリームレコードへのアクセスを許可するために必要な権限と SQSでキューの追加、削除に必要な権限を付与しています。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "*"
        },
        {
            "Action": [
                "dynamodb:DescribeStream",
                "dynamodb:GetRecords",
                "dynamodb:GetShardIterator",
                "dynamodb:ListStreams"
            ],
            "Resource": "arn:aws:dynamodb:${region}:${account_id}:table/${dynamodb_scutum_api_table}/stream/*",
            "Effect": "Allow"
        },
        {
            "Effect": "Allow",
            "Action": [
                "sqs:CreateQueue",
                "sqs:DeleteQueue",
                "sqs:ListQueues",
                "sqs:GetQueueUrl"
            ],
            "Resource": "*"
        }      
    ]
}

そのまま使わず、${region}、${account_id}、${dynamodb_scutum_api_table}は適宜設定を変更してください。

docs.aws.amazon.com

ソースコード

ソースコードはこちらになります。

import json
import os
import sys
import logging
import boto3
import traceback

logger = logging.getLogger()
logger.setLevel(logging.INFO)

sqs = boto3.resource('sqs')

def lambda_handler(event, context):


    for record in event["Records"]:

        queue_name = "scutum-detail-log-queue-" + record['dynamodb']['Keys']['scutumId']['S'].lower() 
        
        if record["eventName"] == "INSERT":
            try:
                # キューの名前を指定してインスタンスを取得
                queue = sqs.get_queue_by_name(QueueName=queue_name)
            except:
                # 指定したキューがない場合はexceptionが返るので、キューを作成
                queue = sqs.create_queue(QueueName=queue_name)
                logger.info("created queue:" + queue_name)

        elif record["eventName"] == "REMOVE":
            try:
                # キューの名前を指定してインスタンスを取得
                queue = sqs.get_queue_by_name(QueueName=queue_name)
            except:
                raise Exception(traceback.format_exc())

            try:
                queue.delete()
                logger.info("deleted queue:" + queue_name)
            except:
                raise Exception(traceback.format_exc())

    return {"result": "OK"}

if __name__ == "__main__":
    lambda_handler({},{})

トリガーの設定

DynamoDBのテーブルにアイテムが追加、削除されたら、このLambdaが実行されるようにトリガーの設定をします。
「トリガーの追加」をクリック。
「トリガーの選択」から、DynamoDBを選択します。
DynamoDB テーブルには、上記で作成したAPI認証情報管理DynamoDBのテーブル名を入力します。その他の設定はそのままで「追加」をクリックします。

FQDN毎防御ログ(詳細)取得順番待ちSQS

上記の、FQDN毎防御ログ(詳細)取得順番待ちSQSを追加、削除するLambbaでDynamoDBのテーブルにアイテムを追加、削除により勝手にFQDN毎キューが作成されるため、ここでは特に作成する必要はありません。

防御ログ(リスト)取得Lambdaの定期実行用CloudWatch Events

ここも前回と一緒ですね。
作成したLambda関数を定期実行するために、CloudWatch Eventsを作成します。 AWSマネージメントコンソールで作成したLambda関数を開きトリガーを追加から設定します。
トリガーをEventBridge (CloudWatch Events)に設定し、ルールを新規ルールの作成に設定、 ルール名を適宜設定し、ルールタイプをスケジュール式にして、スケジュール式には下記の内容を記述します。

cron(0 * * * ? *)

防御ログ(リスト)取得Lambda

このLambdaでやっていることは、

  1. ScutumのAPIへアクセスに必要なFQDN、 ユーザID、APIキーのレコードのリストをDynamoDBから取得
  2. APIから防御ログ(リスト)を取得
  3. 「FQDN、 ユーザID、APIキー、防御ログID」メッセージとしてSQSに作成したキューに入れる。

です。

Lambda設定

設定として、ランタイムはpython3.8、メモリのサイズは128MB、タイムアウトは300秒としました。

環境変数

環境変数には、上記で設定したAPI認証情報管理DynamoDBと防御ログ(詳細)取得順番待ちSQSの名前を設定します。

環境変数名
DYNAMODB_SCUTUM_API_TABLE API認証情報管理DynamoDBテーブルの名前

IAM ロール(ポリシー)

IAMロールに指定するポリシーは、AWSLambdaBasicExecutionRoleに、API認証情報管理DynamoDBへのスキャン権限と 防御ログ(詳細)取得順番待ちSQSへの書き込みに必要な権限を付与しています。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "sqs:SendMessage",
                "sqs:GetQueueAttributes",
                "sqs:GetQueueUrl"
            ],
            "Resource": "arn:aws:sqs:${region}:${account_id}:scutum-detail-log-queue-*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "dynamodb:Scan"
            ],
            "Resource": "arn:aws:dynamodb:${region}:${account_id}:table/${dynamodb_scutum_api_table}"
        }
    ]
}

そのまま使わず、${region}、${account_id}、${dynamodb_scutum_api_table}は適宜設定を変更してください。

ソースコード

ソースコードはこちらになります。

import urllib.request
import json
import base64
import os
import sys
import logging
from datetime import datetime, timedelta, timezone
import boto3
import traceback

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# DYNAMODB_SCUTUM_API_TABLE
if os.environ['DYNAMODB_SCUTUM_API_TABLE']:
    DYNAMODB_SCUTUM_API_TABLE = os.environ['DYNAMODB_SCUTUM_API_TABLE']
    logger.info("Setting Environment variables - DYNAMODB_SCUTUM_API_TABLE :" + DYNAMODB_SCUTUM_API_TABLE )
else:
    logger.error("No Setting Environment variables - DYNAMODB_SCUTUM_API_TABLE ")
    sys.exit(1)  

# dynamodb client
dynamodb = boto3.client('dynamodb')

# sqs resource 
sqs = boto3.resource('sqs')

def get_scutum_block_logs(host, id,scutumApiKey,maker=None):

    # タイムゾーンの生成
    JST = timezone(timedelta(hours=+9), 'JST')

    now = datetime.now(JST)
    now_rounded_down = now.replace(minute=0, second=0, microsecond=0)

    one_hour_ago = now - timedelta(hours=1)
    one_hour_ago_rounded_down = one_hour_ago.replace(minute=0, second=0, microsecond=0)

    url = 'https://api.scutum.jp/api/v1'
    list_url = url + '/alert'
    list_params = {
        'host': host,
        'id' : id,
        'time_order':'asc',
        'from' : one_hour_ago_rounded_down.isoformat(),
        'to'  : now_rounded_down.isoformat()
    }

    if maker != None:
        list_params['maker'] = maker

    common_headers = {
        'X-Scutum-API-Key': scutumApiKey,
        'Connection': 'close'
    }

    list_res_json = {}
    list_req = urllib.request.Request(url='{}?{}'.format(list_url, urllib.parse.urlencode(list_params)),headers=common_headers)
    try:
        with urllib.request.urlopen(list_req) as list_res:
            list_res_json = json.load(list_res)
    except urllib.error.HTTPError as err:
        if err.code == 429:
            logger.error(err)
        else:
            logger.error(err)                        
    except:
        raise Exception(traceback.format_exc())

    return list_res_json

def lambda_handler(event, context):

    try:
        response_table_scan = dynamodb.scan(
            TableName=DYNAMODB_SCUTUM_API_TABLE
        )
    except:
        raise Exception(traceback.format_exc())

    if len(response_table_scan['Items']) > 0 :
        for item in response_table_scan['Items']:

            all_scutum_block_log = []
            maker = None

            while True:
                scutum_block_logs ={}
                scutum_block_logs = get_scutum_block_logs(item['hostName']['S'],item['scutumId']['S'],item['scutumApiKey']['S'],maker)
                if 'data' in scutum_block_logs:
                    all_scutum_block_log += scutum_block_logs['data']

                if 'next_marker' in scutum_block_logs:
                    maker = scutum_block_logs['next_marker']
                else:
                    break

            logger.info( item['hostName']['S'] +  " all_scutum_block_log: " + str(len(all_scutum_block_log)))

            if len(all_scutum_block_log) > 0:

                queue_name = "scutum-detail-log-queue-" + item['scutumId']['S'].lower() 
                try:
                    queue = sqs.get_queue_by_name(QueueName=queue_name)
                except:
                    raise Exception(traceback.format_exc())

                msg_list = []
                count = 1

                for scutum_block_log in all_scutum_block_log:
                    messagebody = {
                        'hostName' : item['hostName']['S'],
                        'scutumId': item['scutumId']['S'],
                        'scutumApiKey' : item['scutumApiKey']['S'],
                        'logId' : scutum_block_log['log_id']
                    }
                    message = {
                        'Id' : '{}'.format(count),
                        'MessageBody' : json.dumps(messagebody)
                    }
                    msg_list.append(message)

                    if count == 10:
                        try:
                            queue.send_messages(Entries=msg_list)
                        except:
                            raise Exception(traceback.format_exc())

                        count = 1
                        msg_list = []
                    else:
                        count = count + 1

                if len(msg_list) > 0:
                    try:
                        queue.send_messages(Entries=msg_list)
                    except:
                        raise Exception(traceback.format_exc())
                
    return {
        'statusCode': 200
    }

if __name__ == "__main__":
    lambda_handler({},{})

防御ログ(詳細)取得Lambdaの定期実行用CloudWatch Events

防御ログ(リスト)Lambdaと作成方法は同じです。ただ、こちらは5分毎に実行させます。
0,5,55分に実行しないのは、防御ログ(リスト)Lambdaと実行がかぶり25回/5分の制限に引っかからないようにするためです。

cron(10,15,20,25,30,35,40,45,50 * * * ? *)

防御ログ(詳細)取得Lambda

このLambdaでやっていることは

  1. SQSから最大25件のメッセージを取得
  2. メッセージ含まれる防御ログIDからAPIにアクセスし防御ログ(詳細)を取得
  3. gzipで圧縮して
  4. S3に保存
  5. SQSから取得したメッセージを削除

です。

S3に保存する際のパスは下記の通りです。

S3バケット/FQDN名/年/月/日/時/防御ログID.log.gz

保存されるログはjson形式で保存されて、下記の要素が含まれます。

要素名 説明
log_id 防御ログID
ip 送信元のIPアドレス
block ブロックの有無
category 検知した攻撃分類、複数の攻撃分類を検知する場合あり
uri リクエストURI
ts Scutumが通信を検知・ブロックした日時
httpMethod メソッド
Host Hostヘッダー(FQDN名)
User-Agent ユーザエージェントヘッダー
... ...
body Bodyのデータ
response Base64エンコードされたHTTPレスポンスのデータ

ヘッダーは、リクエストに含まれるすべてのヘッダーが出力されます。
response は、誤検知の場合、個人情報、機密情報が含まれている場合もあり、Base64エンコードされたそのままの状態としています。

Lambda設定

設定として、ランタイムはpython3.8、メモリのサイズは128MB、タイムアウトは300秒としました。

環境変数

環境変数には、上記で設定した防御ログ(詳細)取得順番待ちSQSと防御ログ(詳細)保存用S3のバケットの名前を設定します。

環境変数名
LOG_BUCKET 防御ログ(詳細)保存用S3のバケットの名前

IAM ロール(ポリシー)

IAMロールに指定するポリシーは、AWSLambdaBasicExecutionRoleに、上記で作成した防御ログ(詳細)取得順番待ちSQSキューへのメッセージを受け取る&削除するために必要な権限(sqs:DeleteMessage,sqs:ReceiveMessage,sqs:GetQueueAttributes,sqs:GetQueueUrl)と 防御ログ(詳細)保存用S3バケットへファイルを書き込むPutObject権限を付与しています。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "sqs:ListQueues"
            ],
            "Resource": "arn:aws:sqs:${region}:${account_id}:*"
        },        
        {
            "Effect": "Allow",
            "Action": [             
                "sqs:DeleteMessage",
                "sqs:ReceiveMessage",
                "sqs:GetQueueAttributes",
                "sqs:GetQueueUrl"
            ],
            "Resource": "arn:aws:sqs:${region}:${account_id}:scutum-detail-log-queue-*"
        },

        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject"
            ],
            "Resource": "arn:aws:s3:::${log_bucket}*"
        }       
    ]
}

そのまま使わず、${region}、${account_id}、${log_bucket}は適宜設定を変更してください。

ソースコード

ソースコードはこちらになります。

import logging
import boto3
import json
import traceback
import os
import sys
import base64
import urllib.request
from datetime import datetime, timedelta, timezone
import time
import gzip
import shutil

logger = logging.getLogger()
logger.setLevel(logging.INFO)
 
# LOG_BUCKET
if os.environ['LOG_BUCKET']:
    LOG_BUCKET = os.environ['LOG_BUCKET']
    logger.info("Setting Environment variables - LOG_BUCKET :" +  LOG_BUCKET )
else:
    logger.error("No Setting Environment variables - LOG_BUCKET ")
    sys.exit(1)   

# sqs resource 
sqs = boto3.resource('sqs')

# dynamodb client
dynamodb = boto3.client('dynamodb')

# s3 resource
s3 = boto3.resource('s3')

def get_scutum_detail_logs(host,id,scutumApiKey,log_id):

    url = 'https://api.scutum.jp/api/v1'
    detail_url = url + '/alert_detail'
    detail_params = {
        'host': host,
        'id' : id,
        'log_id' : log_id
    }
    
    common_headers = {
        'X-Scutum-API-Key': scutumApiKey,
        'Connection': 'close'
    }

    detail_res_json = {}
    detail_req = urllib.request.Request(url='{}?{}'.format(detail_url, urllib.parse.urlencode(detail_params)),headers=common_headers)
    try:
        with urllib.request.urlopen(detail_req) as detail_res:
            detail_res_json = json.load(detail_res)
    except urllib.error.HTTPError as err:
        if err.code == 429:
            logger.error(err)
            sys.exit(1)
        else:
            logger.error(err)

    except:
        raise Exception(traceback.format_exc())

    return detail_res_json

def lambda_handler(event, context):

    try:
        queue_iterator = sqs.queues.filter(QueueNamePrefix="scutum-detail-log-queue-")
    except:
        raise Exception(traceback.format_exc())

    for queue in queue_iterator:
        msg_list = []
        
        # sqsからメッセージを取得。最大25メッセージ取得。ScutumAPIは25回/5分という制限があるため。
        for i in range(5):
            msg_list += queue.receive_messages(MaxNumberOfMessages=5)

        for message in msg_list:

            message_body = json.loads(message.body)

            scutum_detail_log = get_scutum_detail_logs( message_body['hostName'], message_body['scutumId'], message_body['scutumApiKey'], message_body['logId'])

            if 'request' in scutum_detail_log:

                body_flag = False

                reqeust_source = base64.b64decode(scutum_detail_log['request']).decode(encoding='utf-8', errors='replace')
                for index,line in enumerate(reqeust_source.splitlines()):

                    # httpMethodの内容を抽出
                    if index == 0:
                        request_split = line.split(' ')
                        scutum_detail_log['httpMethod']= request_split[0]

                    if index != 0:
                        # httpリクエストヘッダーの内容を抽出
                        if ':' in line:
                            requestid_split = line.split(':',1)
                            scutum_detail_log[requestid_split[0].strip()]=requestid_split[1].strip()

                        # 空行以下がBodyの情報になるのでフラグを設定
                        #if line in ['\n', '\r\n']:
                        if len(line.strip()) == 0 :
                            body_flag = True
                            scutum_detail_log['body'] = ""

                        # Bodyの内容を抽出  
                        if body_flag:
                            scutum_detail_log['body'] += line
                    
                scutum_detail_log.pop('request')

            # 一時ファイル名を「防御ログID.log」に設定
            temp_file_name = message_body['logId'] + ".log"

            # 一時的にLambdaの/tmp以下に保存
            with open('/tmp/' +  temp_file_name, 'w') as f:
                    f.write(json.dumps(scutum_detail_log))

            # S3に保存するファイル名を設定
            s3_store_file_name = temp_file_name + ".gz"

            # /tmp以下に一時的保存したログをgzipで圧縮
            with open('/tmp/' + temp_file_name, 'rb') as f_in:
                with gzip.open('/tmp/' + s3_store_file_name , 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)

            # 防御ログのtsデータから日時情報を取得
            datetime_iso = datetime.fromisoformat(scutum_detail_log['ts'])

            # 圧縮したログをS3に保存
            s3_store_path = message_body['hostName'] + "/" + str(datetime_iso.year) + "/" + str(datetime_iso.month).zfill(2) + "/" + str(datetime_iso.day).zfill(2) + "/" + str(datetime_iso.hour) + "/"
            try:
                obj = s3.Object(LOG_BUCKET, s3_store_path + s3_store_file_name)
                obj.upload_file( '/tmp/' + s3_store_file_name )
            except:
                raise Exception(traceback.format_exc())

            logger.info("Logs(" +  message_body['logId'] + ") were stored in S3.") 

            #キューから保存した防御ログのメッセージを削除する。
            try:
                queue.delete_messages(
                    Entries=[
                        {
                            'Id': "1",
                            'ReceiptHandle': message.receipt_handle
                        },
                    ]
                )
            except:
                raise Exception(traceback.format_exc())

            logger.info("Logs(" +  message_body['logId'] + ") were removed in sqs.") 

    return {
        'statusCode': 200
    }

if __name__ == "__main__":
    lambda_handler({},{})

ログ取得テスト

それではScutumが設定してあるWEBサイトに対してcurlコマンドでPOSTリクエストのボディにXSSのペイロード入れて送信してみます。

curl -X POST -d '<script>alert("TEST");</script>' "https://www.example.jp/"

Scutumでブロックを確認。

時が変わるまで待ってLambdaが実行されるのを確認します。 LambdaのログはCloudWatch Logsに出力されるので、 CloudWatch Logsでエラーが出ていないか確認します。

実行確認後、S3に移動し、ログファイルが出力されているか確認します。

f:id:woodykedner:20201202152255p:plain

ファイルをダウンロード。

無事取得できました。内容は下記の通りです。

{
  "log_id": "0000888574723_240_00000",
  "ip": "X.X.X.X",
  "block": true,
  "category": [
    "クロスサイトスクリプティング攻撃"
  ],
  "uri": "/",
  "ts": "2020-12-02T00:00:00+09:00",
  "httpMethod": "POST",
  "Host": "www.example.jp",
  "User-Agent": "curl/7.58.0",
  "X-Amz-Cf-Id": "xxxxg2SCY837b0_teksu-Lvd7sxSherAVV_VWRSYWhxxxx_dqEZW_A==",
  "Connection": "Keep-Alive",
  "Content-Length": "31",
  "Via": "1.1 xxxxxxxxxxxxxxxxxxxcxxxxxxxxxxxx.cloudfront.net (CloudFront)",
  "X-Forwarded-For": "X.X.X.X",
  "Accept": "*/*",
  "Content-Type": "application/x-www-form-urlencoded",
  "CloudFront-Forwarded-Proto": "https",
  "X-Client-Host": "Y.Y.Y.Y",
  "X-Forwarded-For-Orig": "X.X.X.X",
  "X-Forwarded-For2": "X.X.X.X",
  "X-Client-Port": "41890",
  "body": "<script>alert(\"TEST\");</script>"
}

"log_id"から"ts"が防御ログ(詳細)の内容で、それ以下はrequestの要素内にあったbase64エンコードした情報をデコードして、1つ1つの要素に分かれています。
"httpMethod"がメソッド、"Host"から"X-Client-Port"までがリクエストヘッダーの内容、"body"がPOSTのボディの内容となります。このテストでは、responseは無かったので取得はされませんでした。

S3にログが貯まっていきますので、AWSであれば Redshift や Athena を使うことにより、攻撃分析ができると思います。

おわりに

今回はAWSを用いてScutumAPIを使用して防御ログ(詳細)を自動取得についてご紹介しました。
「じゃ、ログを取得した後どうするの?」という話になりますが、別のログとの結合と分析の内容で年内にもう1つブログを書く予定です。
ご期待ください。