Amazon Kinesis Data Firehose 使い方
Amazon Kinesis Data Firehose(長いのでFirehoseとします)
どういうものかというのは、公式の素晴らしい説明文で大体わかります。
ストリーミングデータをデータストアや分析ツールに確実にロードする最も簡単な方法です。ストリーミングデータをキャプチャして変換し、Amazon S3、Amazon Redshift、Amazon Elasticsearch Service、Splunk にロードして、現在お使いの既存のビジネスインテリジェンスツールやダッシュボードで、ほぼリアルタイムに分析することができます。
使ったユースケース
弊社のプロダクト(広告配信システム系)でまあまあなリクエストを受けているLambdaのログを見れる状態にしたいという場面で使いました。
Lambdaが各々でS3にログをPUTするぜみたいなことをしていた時に、ファイル数多すぎて、BQに入れれないし、Athenaで見るにしてもログ多すぎて検索終わるのを待つと、日が暮れる(正確にはエラーになった)となっていました。
そこに、Kinesis Firehose使って複数のLambdaが吐くログを固めてS3にPUTすることで、少し待てばAthenaでログを見れるようになりました。(ファイル数が減ってファイルIOが減ったから)
また、S3にPUTする時に、時間ごとにフォルダを自動で作ってくれるので、パーティションを切りたい時も同じルールを適用出来るのはいいなーと思いました。
Stream作成
Step 1: Name and source
Delivery stream name
ストリーム名。今回の設定値は「firehose」
Source
- Direct PUT or other sources
レコードを配信ストリームに直接送信する場合、またはAWS IoT、CloudWatchログ、CloudWatchイベントからレコードを送信する場合は、このオプションを選択します。 - Kinesis stream
Kinesis streamの流し先として、Firehoseを選べます。今回のユースケースでは使いませんでしたが、これも便利そう。
今回の設定値は、Lambdaから直接流す形なので、「Direct PUT or other sources」。
Step 2: Process records
Transform source records with AWS Lambda
Lambda 関数を呼び出して、受信した送信元データを変換してから送信先に配信できます。Kinesis Data Firehose のデータ変換は、配信ストリームの作成時に有効にすることができます。 とあるように、流れてくるデータを何か手を加えたい時は、これを使うと良さそう。
Amazon Kinesis Data Firehose のデータ変換 - Amazon Kinesis Firehose
今回の設定値は、「Disabled」。
Convert record format
データのフォーマットをJSONから何かへみたいな感じの変換が出来る。
以下の2つが用意されてる。また、AWS Glueとも連携が出来るようです。(別記事で調査する予定) - Apache Parquet - Apache ORC
今回の設定値は、「Disabled」。
Step 3: Choose destination
Select destination
保存先を設定する。保存先として以下の4つがある。
今回の設定値は、「Amazon S3」。
S3 destination
- S3 bucket
保存先のS3を選ぶ。 - Prefix
FirehoseがPUTする時のフォルダ名と認識すればおk。
今回は、Create newして、「put-by-firehose」というバケットを作成。また、Prefixは「firehose/」とした。こうすることで、put-by-firehose/firehose/firehoseがよしなにという形にしてくれる。
Step 4: Configure settings
S3 buffer conditions
受信レコードをS3バケットに転送する前にバッファリングしてくれる。その時の設定をする。条件のいずれかが満たされると、PUTされる。
- Buffer size
1-128 MBの間でバッファするサイズを決めれる
- Buffer interval
60-900秒でPUTする間隔を設定出来る
今回は、どちらも最小値の「1MB」と「60秒」とする。もし、リアルタイム制が要求されない場合は、もっと高い数値を設定することになる。
S3 compression and encryption
- S3 compression
どういう圧縮形式でS3にPUTするかというもの。 - S3 encryption S3上のデータを暗号化するか
今回は、「GZIP」で圧縮し、暗号化については「Disabled」にした。
Error logging
エラーをCloudWatchログを出力する。
今回は使わないので、「Disabled」とした。
IAM role
Create newして、いい感じにしてくれます。デフォで作成される権限は以下のような感じ。まあ、公式によるものだから、オーバーな権限はないし最低限になっている。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "", "Effect": "Allow", "Action": [ "glue:GetTableVersions" ], "Resource": "*" }, { "Sid": "", "Effect": "Allow", "Action": [ "s3:AbortMultipartUpload", "s3:GetBucketLocation", "s3:GetObject", "s3:ListBucket", "s3:ListBucketMultipartUploads", "s3:PutObject" ], "Resource": [ "arn:aws:s3:::put-by-firehose", "arn:aws:s3:::put-by-firehose/*", "arn:aws:s3:::%FIREHOSE_BUCKET_NAME%", "arn:aws:s3:::%FIREHOSE_BUCKET_NAME%/*" ] }, { "Sid": "", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": "arn:aws:lambda:ap-northeast-1:851669633371:function:%FIREHOSE_DEFAULT_FUNCTION%:%FIREHOSE_DEFAULT_VERSION%" }, { "Sid": "", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:ap-northeast-1:hoge:log-group:/aws/kinesisfirehose/:log-stream:*" ] }, { "Sid": "", "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:GetShardIterator", "kinesis:GetRecords" ], "Resource": "arn:aws:kinesis:ap-northeast-1:hoge:stream/%FIREHOSE_STREAM_NAME%" }, { "Effect": "Allow", "Action": [ "kms:Decrypt" ], "Resource": [ "arn:aws:kms:region:accountid:key/%SSE_KEY_ARN%" ], "Condition": { "StringEquals": { "kms:ViaService": "kinesis.%REGION_NAME%.amazonaws.com" }, "StringLike": { "kms:EncryptionContext:aws:kinesis:arn": "arn:aws:kinesis:%REGION_NAME%:hoge:stream/%FIREHOSE_STREAM_NAME%" } } } ] }
Step 5: Review
StatusがActiveなったら使える状態になります。
Lambda -> Firehose
ロール作成
今回は、LambdaからFirehoseへPUTしたいので、AWSLambdaMicroserviceExecutionRole
やAWSLambdaBasicExecutionRole
のロール以外に、インラインで以下のような雑なロールを付加しました。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "VisualEditor0", "Effect": "Allow", "Action": [ "firehose:PutRecord", "firehose:PutRecordBatch" ], "Resource": "arn:aws:firehose:*:*:deliverystream/*" }, { "Sid": "VisualEditor1", "Effect": "Allow", "Action": [ "firehose:DescribeDeliveryStream", "firehose:ListDeliveryStreams" ], "Resource": "*" } ] }
関数作成
import boto3 def lambda_handler(event, context): firehose = boto3.client('firehose') firehose.put_record( DeliveryStreamName='firehose', Record={ 'Data': b'{"user_id": "a", "event": "AAA"}\n' } ) firehose.put_record( DeliveryStreamName='firehose', Record={ 'Data': b'{"user_id": "b", "event": "BBB"}\n' } ) firehose.put_record( DeliveryStreamName='firehose', Record={ 'Data': b'{"user_id": "c", "event": "CCC"}\n' } )
今回はLambdaから叩いていますが、もちろんローカルからの実行でも動きます。
PUTしてみる
Lambdaを実行すると、以下のようにサクッと出来上がりました!便利! PUTするだけで、ある程度固めてくれるのは相当ありがたい。
put-by-firehose/firehose/2018/06/24/05/firehose-1-2018-06-24-05-47-40-34054ccb-1fa3-47d6-b05f-749b51d032a5.gz
集めたデータどうする
記事の冒頭でも書きましたが、Athenaでいい感じに見れようにしたりも出来るんですが、体力が尽きたので、次の記事で紹介することにします。