ぺい

渋谷系アドテクエンジニアの落書き

Amazon Kinesis Data Firehose 使い方

f:id:tikasan0804:20180624115811p:plain

Amazon Kinesis Data Firehose(長いのでFirehoseとします)

aws.amazon.com

どういうものかというのは、公式の素晴らしい説明文で大体わかります。

ストリーミングデータをデータストアや分析ツールに確実にロードする最も簡単な方法です。ストリーミングデータをキャプチャして変換し、Amazon S3Amazon Redshift、Amazon Elasticsearch Service、Splunk にロードして、現在お使いの既存のビジネスインテリジェンスツールやダッシュボードで、ほぼリアルタイムに分析することができます。

使ったユースケース

弊社のプロダクト(広告配信システム系)でまあまあなリクエストを受けているLambdaのログを見れる状態にしたいという場面で使いました。

Lambdaが各々でS3にログをPUTするぜみたいなことをしていた時に、ファイル数多すぎて、BQに入れれないし、Athenaで見るにしてもログ多すぎて検索終わるのを待つと、日が暮れる(正確にはエラーになった)となっていました。
そこに、Kinesis Firehose使って複数のLambdaが吐くログを固めてS3にPUTすることで、少し待てばAthenaでログを見れるようになりました。(ファイル数が減ってファイルIOが減ったから)

また、S3にPUTする時に、時間ごとにフォルダを自動で作ってくれるので、パーティションを切りたい時も同じルールを適用出来るのはいいなーと思いました。

Stream作成

Step 1: Name and source

Delivery stream name
ストリーム名。今回の設定値は「firehose」

Source

  • Direct PUT or other sources
    レコードを配信ストリームに直接送信する場合、またはAWS IoT、CloudWatchログ、CloudWatchイベントからレコードを送信する場合は、このオプションを選択します。
  • Kinesis stream
    Kinesis streamの流し先として、Firehoseを選べます。今回のユースケースでは使いませんでしたが、これも便利そう。

今回の設定値は、Lambdaから直接流す形なので、「Direct PUT or other sources」。

Step 2: Process records

Transform source records with AWS Lambda
Lambda 関数を呼び出して、受信した送信元データを変換してから送信先に配信できます。Kinesis Data Firehose のデータ変換は、配信ストリームの作成時に有効にすることができます。 とあるように、流れてくるデータを何か手を加えたい時は、これを使うと良さそう。 Amazon Kinesis Data Firehose のデータ変換 - Amazon Kinesis Firehose

今回の設定値は、「Disabled」。

Convert record format
データのフォーマットをJSONから何かへみたいな感じの変換が出来る。

以下の2つが用意されてる。また、AWS Glueとも連携が出来るようです。(別記事で調査する予定) - Apache Parquet - Apache ORC

今回の設定値は、「Disabled」。

Step 3: Choose destination

Select destination
保存先を設定する。保存先として以下の4つがある。

今回の設定値は、「Amazon S3」。

S3 destination

  • S3 bucket
    保存先のS3を選ぶ。
  • Prefix
    FirehoseがPUTする時のフォルダ名と認識すればおk。

今回は、Create newして、「put-by-firehose」というバケットを作成。また、Prefixは「firehose/」とした。こうすることで、put-by-firehose/firehose/firehoseがよしなにという形にしてくれる。

Step 4: Configure settings

S3 buffer conditions
受信レコードをS3バケットに転送する前にバッファリングしてくれる。その時の設定をする。条件のいずれかが満たされると、PUTされる。 - Buffer size
1-128 MBの間でバッファするサイズを決めれる
- Buffer interval
60-900秒でPUTする間隔を設定出来る

今回は、どちらも最小値の「1MB」と「60秒」とする。もし、リアルタイム制が要求されない場合は、もっと高い数値を設定することになる。

S3 compression and encryption

  • S3 compression
    どういう圧縮形式でS3にPUTするかというもの。
  • S3 encryption S3上のデータを暗号化するか

今回は、「GZIP」で圧縮し、暗号化については「Disabled」にした。

Error logging
エラーをCloudWatchログを出力する。

今回は使わないので、「Disabled」とした。

IAM role

Create newして、いい感じにしてくれます。デフォで作成される権限は以下のような感じ。まあ、公式によるものだから、オーバーな権限はないし最低限になっている。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Action": [
        "glue:GetTableVersions"
      ],
      "Resource": "*"
    },
    {
      "Sid": "",
      "Effect": "Allow",
      "Action": [
        "s3:AbortMultipartUpload",
        "s3:GetBucketLocation",
        "s3:GetObject",
        "s3:ListBucket",
        "s3:ListBucketMultipartUploads",
        "s3:PutObject"
      ],
      "Resource": [
        "arn:aws:s3:::put-by-firehose",
        "arn:aws:s3:::put-by-firehose/*",
        "arn:aws:s3:::%FIREHOSE_BUCKET_NAME%",
        "arn:aws:s3:::%FIREHOSE_BUCKET_NAME%/*"
      ]
    },
    {
      "Sid": "",
      "Effect": "Allow",
      "Action": [
        "lambda:InvokeFunction",
        "lambda:GetFunctionConfiguration"
      ],
      "Resource": "arn:aws:lambda:ap-northeast-1:851669633371:function:%FIREHOSE_DEFAULT_FUNCTION%:%FIREHOSE_DEFAULT_VERSION%"
    },
    {
      "Sid": "",
      "Effect": "Allow",
      "Action": [
        "logs:PutLogEvents"
      ],
      "Resource": [
        "arn:aws:logs:ap-northeast-1:hoge:log-group:/aws/kinesisfirehose/:log-stream:*"
      ]
    },
    {
      "Sid": "",
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords"
      ],
      "Resource": "arn:aws:kinesis:ap-northeast-1:hoge:stream/%FIREHOSE_STREAM_NAME%"
    },
    {
      "Effect": "Allow",
      "Action": [
        "kms:Decrypt"
      ],
      "Resource": [
        "arn:aws:kms:region:accountid:key/%SSE_KEY_ARN%"
      ],
      "Condition": {
        "StringEquals": {
          "kms:ViaService": "kinesis.%REGION_NAME%.amazonaws.com"
        },
        "StringLike": {
          "kms:EncryptionContext:aws:kinesis:arn": "arn:aws:kinesis:%REGION_NAME%:hoge:stream/%FIREHOSE_STREAM_NAME%"
        }
      }
    }
  ]
}

Step 5: Review

f:id:tikasan0804:20180624141537p:plain

StatusがActiveなったら使える状態になります。
f:id:tikasan0804:20180624141928p:plain

Lambda -> Firehose

ロール作成

今回は、LambdaからFirehoseへPUTしたいので、AWSLambdaMicroserviceExecutionRoleAWSLambdaBasicExecutionRoleのロール以外に、インラインで以下のような雑なロールを付加しました。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "firehose:PutRecord",
                "firehose:PutRecordBatch"
            ],
            "Resource": "arn:aws:firehose:*:*:deliverystream/*"
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "firehose:DescribeDeliveryStream",
                "firehose:ListDeliveryStreams"
            ],
            "Resource": "*"
        }
    ]
}

関数作成

import boto3


def lambda_handler(event, context):
    firehose = boto3.client('firehose')
    firehose.put_record(
        DeliveryStreamName='firehose',
        Record={
            'Data': b'{"user_id": "a", "event": "AAA"}\n'
        }
    )
    firehose.put_record(
        DeliveryStreamName='firehose',
        Record={
            'Data': b'{"user_id": "b", "event": "BBB"}\n'
        }
    )
    firehose.put_record(
        DeliveryStreamName='firehose',
        Record={
            'Data': b'{"user_id": "c", "event": "CCC"}\n'
        }
    )

今回はLambdaから叩いていますが、もちろんローカルからの実行でも動きます。

PUTしてみる

Lambdaを実行すると、以下のようにサクッと出来上がりました!便利! PUTするだけで、ある程度固めてくれるのは相当ありがたい。

put-by-firehose/firehose/2018/06/24/05/firehose-1-2018-06-24-05-47-40-34054ccb-1fa3-47d6-b05f-749b51d032a5.gz

f:id:tikasan0804:20180624145709p:plain

集めたデータどうする

記事の冒頭でも書きましたが、Athenaでいい感じに見れようにしたりも出来るんですが、体力が尽きたので、次の記事で紹介することにします。