SSTエンジニアブログ

SSTのエンジニアによるWebセキュリティの技術を中心としたエンジニアブログです。

ScutumAPIリリース記念!防御ログをAWSで自動取得してみた。

はじめに

こんにちは、研究開発の宇田川です。

2020年11月2日にScutumのログを取得できるAPIがリリースされました!

www.scutum.jp

ScutumのログをSIEMへの取り込みや分析で使用したいと考えていた方々には待望のリリースなのではないでしょうか。
私自身も楽しみにしていた機能追加であり、リリース前にScutumAPIを試用する機会をいただきました。
今回のブログではその時に試したAWSサービスを利用した自動取得する方法を共有させていただきます。

目次

Scutum APIについて

まず、ScutumAPIについて、簡単に説明します。

ScutumAPIの事前準備

ScutumAPIにアクセスするためには、APIキーが必要になります。
APIキーはScutumの管理画面で発行できます。
Scutumの管理サイトにログインします。左メニューに新しくAPIの設定が追加されていますね。そちらをクリックし、APIキーの管理のページに移動します。 ページにありますAPIキーの発行をクリックすると、APIキーが発行されます。 f:id:woodykedner:20201111143353p:plain

その他、Scutumの管理サイトにログインする際に使用するユーザID、Scutumのログの取得対象のFQDNが必要になります。
こちらも合わせて用意してください。

ScutumAPIで取得できるログ

それでは、ScutumAPIで取得できるログを見ていきます。
ScutumAPIでは、2種類のログが取得できます。

  • 防御ログ(リスト)
  • 防御ログ(詳細)

本稿では防御ログ(リスト)のみ対象に取得してみます。※防御ログ(詳細)はまた別のブログで紹介します!!

防御ログ(リスト)のURLは

https://api.scutum.jp/api/v1/alert

です。

必要な情報をURLパラメータに設定していきます。
必須なのが、Scutumログ取得対象FQDNを指定するhostパラメータとScutum管理サイトにログインする際に使用するユーザIDをしているidパラメータです。
Scutumログ取得対象FQDNの設定がhost=www.example.jp、Scutum管理サイトにログインする際に使用するユーザIDがid=ABC1234の場合、設定例は下記の通りになります。

https://api.scutum.jp/api/v1/alert?host=www.example.jp&id=ABC1234

では、取得したScutumのAPIキーはどこに設定するか。これは注意点でもあります。ScutumのAPIキーは、リクエストのX-Scutum-API-Keyヘッダーに追加します。

上記設定例を使用して、curlコマンドを書くと下記のようになります。

curl -H 'X-Scutum-API-Key: xxxxf5d161b33b6xxxxaa8f8ccfdfdfxxxx' "https://api.scutum.jp/api/v1/alert?host=www.example.jp&id=ABC1234"

他の設定と混同して、APIキーをURLパラメータ一に設定しないように注意しましょう。

その他、URLパラメータ一への設定には下記の設定もあります。

URLパラメータ 説明
time_order 取得する防御ログの並び順を設定
from 基点を含む、指定の日時以降の防御ログを取得
to 基点を含まず、指定の日時より前の防御ログを取得
ip 送信元IPアドレスが完全一致する防御ログデータを取得。
単一のIPアドレスのみ指定可能
marker ページネーション用マーカー

例えば、2020/11/12の防御ログを取得したい時は下記のようなcurlコマンドとなります。予約文字はURLエンコードしています。

curl -H 'X-Scutum-API-Key: xxxxf5d161b33b6xxxxaa8f8ccfdfdfxxxx' "https://api.scutum.jp/api/v1/alert?host=www.example.jp&id=ABC1234&from=2020-11-12T00%3A00%3A00%2B09%3A00&to=2020-11-13T00%3A00%3A00%2B09%3A00"

APIへのリクエストが成功すると、json形式のレスポンスが返ります。

{
   "data" : [
     {
       "log_id" : "1568081543650_595_59102",
       "ip" : "192.168.1.2",
       "block" : true,
       "category" : [
          "SQLインジェクション"
       ],
       "uri" : "/test.jsp",
       "ts" : "2020-11-12T11:12:24+09:00"
    }
  ],
  "count" : 1,
  "truncated" : false
  "next_marker" : "..."    #← truncatedtrueの場合のみ、表示
}

各要素の説明は下記の通りです。

要素名 説明
data 防御ログのリスト
count 防御ログの数
truncated 防御ログが1000件を超えているかどうか
next_marker truncatedがtrueの場合出力され、1000件以降の防御ログ取得する際に使用

1回のAPIリクエストで取得できる防御ログは1000件という制限があり、truncated は、防御ログが1000件を超えているかどうかを表しています。truncatedがtrueの場合、next_markerという要素が追加され、別のAPIへのリクエストでURLパラメータにmarker=<next_markerの値>を指定することで1000件以後の防御ログを取得することが可能となります。

dataにフォーカスし要素の内容は下記の通りとなります。実際に分析に使用するのはここの値となりますね。

要素名 説明
log_id 防御ログID
ip 送信元のIPアドレス
block ブロックの有無
category 検知した攻撃分類、複数の攻撃分類を検知する場合あり
uri リクエストURI
ts Scutumが通信を検知・ブロックした日時

先ほどの2020/11/12の防御ログが1000件を超えていて、1000件以上の防御ログを取得したい時curlコマンドにmarkerをつけるこんな感じです。

curl -H 'X-Scutum-API-Key: xxxxf5d161b33b6xxxxaa8f8ccfdfdfxxxx' "https://api.scutum.jp/api/v1/alert?host=www.example.jp&id=ABC1234&from=2020-11-12T00%3A00%3A00%2B09%3A00&to=2020-11-13T00%3A00%3A00%2B09%3A00&marker=1234567891234_567_89123"


ざっと大まかな仕様は書きましたが、細かい仕様もありますのでAPIのドキュメントも合わせて読んでください。 support.scutum.jp

AWS自動取得構成

では、AWS自動取得構成を見ていきます。
単純に1時間1回、前の1時間の防御ログ(リスト)を取得してS3に保存します。
リージョンはバージニア北部リージョンを使用しましたが、東京リージョンでも普通に構築可能です。

f:id:woodykedner:20201111103751p:plain

S3

Scutumの防御ログ保存用S3バケットを作成します。

特別な設定はしていないので、ドキュメントに沿って、S3バケットを作成してみてください。 docs.aws.amazon.com

Lambda

設定として、ランタイムはpython3.8、メモリのサイズは128MB、タイムアウトは300秒としました。

IAMロールに指定するポリシーは、AWSLambdaBasicExecutionRoleに上記で作成したS3バケットへのPutObject権限を付与しています。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject"
            ],
            "Resource": "arn:aws:s3:::${scutum_log_bucket}/*"
        }       

    ]
}

Lambdaの環境変数にはユーザID、FQDN、APIキー、防御ログ保存用のS3バケット名を入力します。 それぞれのキーは下記の通りです。環境に合わせて値を設定してください。

キー 説明
HOST_NAME 防御ログ取得のFQDN名
SCUTUM_ID ScutumのユーザID
SCUTUM_API_KEY ScutumのAPIキー
LOG_BUCKET 防御ログ保存用のS3バケット名

今回テストなのでそのままデータを登録してしまいましたが、これらの環境変数に登録するデータは機密情報に当たります。本番環境で使用するのであれば、そのまま環境変数に登録せず、KMSを使用して暗号化した方が良いかなと思います。

ソースコードはこちら。

import urllib.request
import json
import os
import sys
import logging
from datetime import datetime, timedelta, timezone
import boto3
import traceback

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# SCUTUM_ID
if os.environ['SCUTUM_ID']:
    SCUTUM_ID = os.environ['SCUTUM_ID']
    logger.info("Setting Environment variables - SCUTUM_ID :" +  SCUTUM_ID )
else:
    logger.error("No Setting Environment variables - SCUTUM_ID ")
    sys.exit(1)

# SCUTUM_API_KEY
if os.environ['SCUTUM_API_KEY']:
    SCUTUM_API_KEY = os.environ['SCUTUM_API_KEY']
    logger.info("Setting Environment variables - SCUTUM_API_KEY :" +  SCUTUM_API_KEY )
else:
    logger.error("No Setting Environment variables - SCUTUM_API_KEY ")
    sys.exit(1)    

# HOST_NAME
if os.environ['HOST_NAME']:
    HOST_NAME = os.environ['HOST_NAME']
    logger.info("Setting Environment variables - HOST_NAME :" +  HOST_NAME )
else:
    logger.error("No Setting Environment variables - HOST_NAME ")
    sys.exit(1)   

# LOG_BUCKET
if os.environ['LOG_BUCKET']:
    LOG_BUCKET = os.environ['LOG_BUCKET']
    logger.info("Setting Environment variables - LOG_BUCKET :" +  LOG_BUCKET )
else:
    logger.error("No Setting Environment variables - LOG_BUCKET ")
    sys.exit(1)   

# s3 resource
s3 = boto3.resource('s3')

def get_scutum_block_logs(host, id,scutumApiKey,from_date=None,to_date=None,maker=None):

    url = 'https://api.scutum.jp/api/v1'
    list_url = url + '/alert'
    list_params = {
        'host': host,
        'id' : id,
        'time_order':'asc'
    }
    
    if from_date is not None:
        list_params['from'] = from_date.isoformat()

    if to_date is not None:
        list_params['to'] =  to_date.isoformat()
    
    if maker != None:
        list_params['maker'] = maker

    common_headers = {
        'X-Scutum-API-Key': scutumApiKey,
        'Connection': 'close'
    }

    list_res_json = {}
    list_req = urllib.request.Request(url='{}?{}'.format(list_url, urllib.parse.urlencode(list_params)),headers=common_headers)
    try:
        with urllib.request.urlopen(list_req) as list_res:
            list_res_json = json.load(list_res)
    except urllib.error.HTTPError as err:
            logger.error(err)                 

    return list_res_json

def lambda_handler(event, context):

    # タイムゾーンの生成
    JST = timezone(timedelta(hours=+9), 'JST')

    # 現在の時間取得
    now = datetime.now(JST)
    now_rounded_down = now.replace(minute=0, second=0, microsecond=0)

    # 1時間前の時間取得
    one_hour_ago = now - timedelta(hours=1)
    one_hour_ago_rounded_down = one_hour_ago.replace(minute=0, second=0, microsecond=0)

    maker = None
    all_scutum_block_log = []
    while True:
        scutum_block_logs = get_scutum_block_logs(HOST_NAME,SCUTUM_ID,SCUTUM_API_KEY,one_hour_ago_rounded_down,now_rounded_down,maker)
        if 'data' in scutum_block_logs:
            for data in scutum_block_logs['data']:
                all_scutum_block_log.append(json.dumps(data))
        if 'next_marker' in scutum_block_logs:
            maker = scutum_block_logs['next_marker']
        else:
            break


    if len(all_scutum_block_log) > 0:
        FILE_NAME = str(one_hour_ago.year) + "/" + str(one_hour_ago.month) + "/" + str(one_hour_ago.day) + "/" + str(one_hour_ago.hour) + "/" + str(one_hour_ago.timestamp()) + "_scutum_block.log"
        body = '\n'.join(all_scutum_block_log)

        obj = s3.Object(LOG_BUCKET,FILE_NAME)            
        obj.put( Body=body )

    return {
        'statusCode': 200
    }

if __name__ == "__main__":
    lambda_handler({},{})

ログはjson形式のレスポンス S3にログを保存する際、取得した年/月/日/時間でフォルダを作成し階層分けしています。
こうすることにより特定の日時のログを探しやすいですし、GlueのクローラーでAthena用のテーブルを作成する際にパーテーションを切ってくれるので、Athenaで検索時に特定の日付配下のデータだけスキャンすることができパフォーマンスの向上、スキャンするデータも減るのでコストカットも見込めます。
また、1回で取得できる防御ログ数は1000となっており、それ以上取得する場合は、レスポンスボディに含まれる next_marker の値をリクエストパラメータ marker に指定してAPIを呼び出す必要があるので、指定した時間内のログは全て取得できるように対応してあります。
注意点はScutumAPIは5分間に25回以内という制限があります。テストのため、ここまでは対応しませんでしたが、攻撃が多く来るサイトについては制限について考慮が必要になります。

CloudWatch Events

作成したLambda関数を定期実行するために、CloudWatch Eventsを作成します。 AWSマネージメントコンソールで作成したLambda関数を開きトリガーを追加から設定します。
トリガーをEventBridge (CloudWatch Events)に設定し、ルールを新規ルールの作成に設定、 ルール名を適宜設定し、ルールタイプをスケジュール式にして、スケジュール式には下記の内容を記述します。

cron(0 * * * ? *)

トリガーの有効化にチェックが入っていることを確認し、追加を押せば設定終了です。

ログ取得テスト

それではブラウザからXSSのペイロード入りのリクエストを送信。

https://www.example.jp/?test='<script>alert("TEST");</script>'

Scutumでブロックを確認。

f:id:woodykedner:20201113111442p:plain:w500

※上記画面はイメージです。


時が変わるまで待ってLambdaが実行されるのを確認します。
LambdaのログはCloudWatch Logsに出力されるので、 CloudWatch Logsでエラーが出ていないか確認します。

f:id:woodykedner:20201112153013p:plain

実行確認後、S3に移動し、ログファイルが出力されているか確認します。

f:id:woodykedner:20201112151826p:plain

ファイルをダウンロード。

categoryに"クロスサイトスクリプティング攻撃"が表示されているので無事取得できました。

    {
        "log_id": "0000640391638_849_50000",
        "ip": "X.X.X.X",
        "block": true,
        "category": [
            "クロスサイトスクリプティング攻撃"
        ],
        "uri": "/",
    "ts": "2020-11-12T00:00:00+09:00"
    }

おわりに

今回はAWSを用いてScutumAPIを使用して防御ログ(リスト)を自動取得してみました。
詳細にユーザエージェントやcookieなどのヘッダー、POST Bodyなどもっとリクエストの情報が見たい場合は防御ログ(詳細)を取得することで実現が可能です。こちらはまた近いうちに共有します。

乞うご期待!

記事を書いたメンバー
sakamoto

宇田川(id:woodykedner

仕事内容 : WAF設計構築、クラウド技術調査
好きなAWSサービス : AWS Lambda、Amazon Athena、Amazon QuickSight
趣味 : マラソン、サッカー観戦