dip Engineer Blog

Engineer Blog
ディップ株式会社のエンジニアによる技術ブログです。
弊社はバイトル・はたらこねっとなど様々なサービスを運営しています。

CloudWatchLogsからLambda経由でログメッセージを通知する

AWSを利用していると、アプリケーションのログをCloudWatch Logsに出力させることがあると思います。 本記事ではCloudWatch Logsに出力されたログの文字列を検知してAWS Lambda(以下、Lambda)を起動するシステムを構築していきます。

Lambdaの作成

CloudWatch Logsのサブスクリプションを作成する前に、起動先のLambdaを作成しておきます。 Lambda構築の説明は省略しますが、コードは以下のようにslack通知するようにしています。

require 'slack/incoming/webhooks'
require 'json'

def slack_notification(message)
  slack = Slack::Incoming::Webhooks.new ENV['SLACK_INCOMING_URL'], channel: '#general'
  slack.post message
end

def handler(event:, context:)
  slack_notification('エラーだよ')
  { event: JSON.generate(event), context: JSON.generate(context.inspect) }
end

CloudWatch Logs subscriptionsの設定

CloudWatch LogsからLambdaを起動させる方法の一つとして、サブスクリプションがあります。

サブスクリプションを使用して CloudWatch Logs からのログイベントのリアルタイムフィードにアクセスし、カスタム処理、分析、他のシステムへのロードを行うために、 Amazon Kinesis ストリーム、Amazon Kinesis Data Firehose ストリーム、AWS Lambda などの他のサービスに配信することができます。 ログイベントが宛先サービスに送信されると、Base64 でエンコードされ、gzip 形式で圧縮されます。

設定には、ロググループ設定のサブスクリプションフィルターのタブから作成できます。 また、サブスクリプションフィルターは1つのロググループにつき2つまでしか作成できません。 Lambdaサブスクリプションフィルターを選択する事でLambdaへの起動をするフィルターを作成できます。

実際に作成するにはまず起動先のLambda関数を選択します。 ログの形式は、今回はカスタムフィルターを利用したいのでその他にします。 サブスクリプションフィルターのパターンに、検知したい文字列を入力し、名前をつけます。

パターンをテストの項目で先ほど指定して文字列の検知が合っているかを確認できます。 すでにいくつかログが出力されている場合、そのログを利用してパターンのテストができます。

f:id:hayaosato:20201202153552p:plain

今回は ERROR という文字列を検知するようにしました。

ストリーミングを開始を設定する事でLambdaへのトリガが作成され、起動するようになります。

実際に飛ばしてみよう

作成したサブスクリプションが起動するか、実際にCloudWatch Logsで検知するであろう文字列(今回はERROR)を実際に出力させて Lambdaが起動するか確かめて見ます。

CloudWatch Logsのロググループにログを出力させるのは以下のコマンドで出来ます。

aws logs put-log-events --log-group-name ロググループ名 --log-stream-name ログストリーム名 --log-events timestamp=$(node -e 'console.log(Date.now())'),message='出力したいメッセージ'

今回は ロググループ: /sample-logs ログストリーム: stream01 出力するメッセージ: ERROR: hogehoge として実際に動かしてみます。

aws logs put-log-events --log-group-name /sample-logs --log-stream-name stream01  --log-events timestamp=$(node -e 'console.log(Date.now())'),message='ERROR: hogehoge'

ちなみに、成功すると

{
    "nextSequenceToken": "xxxxxxxxxxxxxxxx"
}

とトークンが発行されるので、2回目以降は

aws logs put-log-events --log-group-name /sample-logs --log-stream-name stream01  --log-events timestamp=$(node -e 'console.log(Date.now())'),message='ERROR: hogehoge'  --sequence-token='xxxxxxxxxxxxxx'

とトークンを指定する必要があります。

以上のように、ログを出力する事でLambdaが起動します。 今回はSlack通知を飛ばすLambdaなので、

f:id:hayaosato:20201202153513p:plain

出来ました。

CloudWatchからのログをデコード

しかし、実際の運用でこの仕組みを使おうとするとCloudWatchからのメッセージは

{
  "awslogs": {
    "data": "H4sIAAAAAAAAAHWPwQqCQBCGX0Xm7EFtK+smZBEUgXoLCdMhFtKV3akI8d0bLYmibvPPN3wz00CJxmQnTO41whwWQRIctmEcB6sQbFC3CjW3XW8kxpOpP+OC22d1Wml1qZkQGtoMsScxaczKN3plG8zlaHIta5KqWsozoTYw3/djzwhpLwivWFGHGpAFe7DL68JlBUk+l7KSN7tCOEJ4M3/qOI49vMHj+zCKdlFqLaU2ZHV2a4Ct/an0/ivdX8oYc1UVX860fQDQiMdxRQEAAA=="
  }
}

のようにエンコードされ、gzipで圧縮されています。 Lambdaをログをデコードおよび圧縮解除し、Slack通知するファンクションにしようと思います。 コードは以下のようになります。

require 'slack/incoming/webhooks'
require 'json'
require 'zlib'
require 'stringio'
require 'base64'


def slack_notification(message)
  slack = Slack::Incoming::Webhooks.new ENV['SLACK_INCOMING_URL'], channel: '#general'
  slack.post message
end

def parse_message(data)
  gzip_data = Base64.strict_decode64(data)
  gz = Zlib::GzipReader.new(StringIO.new(gzip_data))
  JSON.parse(gz.read).to_s
end

def main(event:, context:)
  slack_message = parse_message(event['awslogs']['data'])
  slack_notification(slack_message)
  { event: JSON.generate(event), context: JSON.generate(context.inspect) }
end

Base64デコードをしてGzipの圧縮解除で読み込んだデータをjsonにパースしています。 slackにメッセージを送信するために無理やり文字列に変換してますが、実際に運用する際には各自整形してあげてください。

このようにログの中身も確認できるようになりました。

f:id:hayaosato:20201202153640p:plain

Terraform化

最後に、この仕組みをTerraform化してみようと思います。

まずはLambdaのTerraformです。IAM Roleは適宜変更をお願いします。

resource "aws_lambda_function" "default" {
  filename      = "your_script.zip"
  function_name = "your_function_name"
  role          = aws_iam_role.default.arn
  handler       = "your.handler"
  timeout       = 900

  runtime = "ruby2.7"
}

resource "aws_iam_role" "default" {
  name               = "your_role_name"
  assume_role_policy = <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents"
      ],
      "Resource": "arn:aws:logs:*:*:*",
      "Effect": "Allow"
    }
  ]
}
EOF
}

続いて、CloudWatch LogsとLambdaの連携をするTerraformです。

resource "aws_cloudwatch_log_group" "default" {
  name = "/your/log/group/name"
}

resource "aws_cloudwatch_log_subscription_filter" "default" {
  name            = "your subscription filter"
  log_group_name  = aws_cloudwatch_log_group.default.name
  filter_pattern  = "ERROR"
  destination_arn = aws_lambda_function.default.arn
}

resource "aws_lambda_permission" "from_cloudwatch_logs" {
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.default.function_name
  principal     = "logs.ap-northeast-1.amazonaws.com"
  source_arn    = aws_cloudwatch_log_group.default.arn
}

このように、ロググループのサブスクリプションフィルターはlambdaと連携させてあげる必要があります。

参考

著者

dippeople.dip-net.jp (写真左)