
こんにちは。SRE部の塩崎です。七味唐辛子の粉末を7種類に分類するという趣味を発展させて、おっとっとを新口動物と旧口動物に分類するという趣味を最近発明しました。
BigQueryは非常にパワフルなData WareHouse(DWH) SaaSであり、大容量のデータを一瞬で分析できます。しかし、課金額がスキャンしたデータ量に比例するという特徴があるため、意図せずに大量のデータをスキャンしてしまい大金を溶かしてしまうことを懸念する人もいます。
qiita.com
そのため、課金額が大きすぎるクエリを発見した際にSlackへ通知する仕組みを作りました。GCP Organization内の全プロジェクトで実行されたBigQueryの監査ログをリアルタイムにチェックすることによってこの仕組みは実現されています。本記事では作成したシステムを紹介します。
なお、本記事は以下のQiita記事に着想を得たものであり、@irotorisさんにはこの場を借りてお礼申し上げます。
qiita.com
最初は元記事で公開されているソースコードをそのまま使用することを考えていたのですが、ライセンス表記がなかったため独自で実装し直し、我々のユースケースにおいて不足する機能を付け足しました。
最初にBigQueryのコスト削減に有効な機能の紹介と、それらだけでは不十分であり、監査ログのリアルタイムスキャン機能を作成した理由について説明します。
まず、BigQueryのコストを抑える方法として、プロジェクトレベル・ユーザーレベルにQuotaを割り当てる機能があります。
cloud.google.com
この機能を使えば、特定のユーザーが高額なクエリを実行しすぎた際に、それ以上のクエリの実行を停止できます。
しかし、ユーザー毎にQuotaの上限を変えたい場合は、そのユーザーが属するプロジェクトを変える必要があります。プロジェクトを適切に分けてある場合はこの機能の導入がしやすいですが、そうでない場合は事故を防ぐために相当大きめのQuotaとして設定する必要が出てきてしまいます。特に落ちてはいけない大事なバッチが実行されるプロジェクトとアドホックなクエリが実行されるプロジェクトの分離は必須です。そうしない場合はアドホックなクエリによってQuotaが全部使い果たされてしまい、大事なバッチがusageQuotaExceededエラーになってしまいます。
我々の環境ではこれらの分離がまだまだ不十分であったため、この機能の導入は一旦見送りました。分離が十分にできた時点で導入を再度検討することを考えております。
次に紹介するのがReservations機能です。
cloud.google.com
この機能を使うと、スキャンしたデータ量によらず毎月固定の金額が課金されます。どれだけの計算量が必要なのかというのをSlotという単位であらかじめコミットします。このSlotはCPUを仮想化した概念で1Slotが1CPUに相当する量です。
cloud.google.com
このReservations機能を使うことで、「お値段定額クエリ実行し放題プラン」になると勘違いしやすいですが、実際は異なります。
実際は「お値段定額『Slotの枠内で』クエリ実行し放題プラン」です。購入したSlot数を超えたパフォーマンスが出ることはないため、Slotを大量に消費するクエリが同時に実行された場合は、クエリの実行時間が伸びるという結果になります。Slotはプロジェクト毎でしか割り当てできないので、大事な集計バッチとアドホッククエリのプロジェクトを変えないと、集計バッチのSLAを担保できません。
我々の環境ではプロジェクトごとにReservations機能の有効・無効を切り替え、コストを最適化することを試みています。定常的にクエリが実行されているプロジェクトではこの機能を有効化しコストの予測可能性を高めます。
一方でクエリの実行頻度が低いプロジェクトやクエリの実行頻度が一定でないプロジェクトでは従来の料金プランと最低60秒の枠でSlotを購入できるFlex Slotsを利用しようとしています。
いずれのプロジェクトに対しても、スキャン量が多すぎるクエリや、Slot使用量が多すぎるクエリを発見しアラートを上げる仕組みが必要です。
そのため、BigQueryの課金ログをリアルタイムにチェックし、スキャン量やSlot使用量などが多すぎるクエリを通知する仕組みを作成しました。
今回構築したシステムのインフラ構成図を以下に示します。

GCPのマネージドサービスを活用した、イベントドリブンかつサーバレスな構成です。
BigQueryの監査ログはデフォルトではONになっているため、Cloud Loggingに送られています。Organization内の全プロジェクトから送られてくる監査ログをAggregated sinks機能で集約し、Cloud Pub/Subの1つのTopicに集めます。
その後、Cloud Pub/SubのPush SubscriptionでCloud Runを起動します。HTTPリクエストのBodyの中に監査ログが含まれているため、Cloud Run上に実装したアプリケーションでクエリがリソースを使いすぎていないかどうかをチェックし、Slackに通知します。
ここからは各サービスの連携方法をTerraformのtfファイルを交えながら説明していきます。
一番最初にCloud Pub/SubのTopicを作成します。これについては特に詳しい説明は不要かと思います。
resource "google_pubsub_topic" "bq-police" {
name = "bq-police"
}
次に先程作成したTopicに対して、Cloud LoggingからpublishするためのLog Sinkを作成します。Organization内の全てのログを出力するために、Aggregated sinks機能を使っています。このSinkを作成するためにはOrganizationの logging.configWriter
を付与したアカウントで terraform apply
をする必要があります。
cloud.google.com
org_id
パラメーターは、gcloud organizations list
コマンドを実行すると確認できます。filterで指定しているクエリは以下のページを参考にして作成しました。古いタイプの課金ログ(AuditData)と新しいタイプの課金ログ(BigQueryAuditMetadata)の2種類があり、新しい側の利用が推奨されている点に注意が必要です。
cloud.google.com
resource "google_logging_organization_sink" "bq-police-org" {
name = "bq-police-org"
destination = "pubsub.googleapis.com/${google_pubsub_topic.bq-police.id}"
org_id = "123456789012" # 各自の環境で変える
include_children = true
filter = <<-EOT
protoPayload.metadata."@type"="type.googleapis.com/google.cloud.audit.BigQueryAuditMetadata"
protoPayload.metadata.jobChange.job.jobConfig.type = "QUERY"
EOT
}
そして、Cloud Loggingのサービスアカウント(writer_identity)にTopicへのpublish権限を付与します。Log Sinkはそれぞれ固有のサービスアカウントで実行されており、そのサービスアカウントに対する書き込み権限の付与が必要です。
cloud.google.com
resource "google_pubsub_topic_iam_member" "bq-police-org" {
project = google_pubsub_topic.bq-police.project
topic = google_pubsub_topic.bq-police.name
role = "roles/pubsub.publisher"
member = google_logging_organization_sink.bq-police-org.writer_identity
}
次にCloud Pub/SubとCloud Runの連携について説明します。
まずは、Cloud Runのサービスを作成します。アラートを発報するための閾値(THRESHOLD_*
)やアラート対象から除外するユーザー(EXEMPTED_USERS
)などを環境変数で設定できるようにしています。また、Slackに通知するためのWeb hookのURLは後述するBerglasで設定できるようにしています。
resource "google_service_account" "bq-police-cloud-run" {
account_id = "bq-police-cloud-run"
display_name = "BQ Police(Cloud Run)"
}
resource "google_cloud_run_service" "bq-police" {
name = "bq-police"
location = "us-central1"
template {
spec {
containers {
image = "gcr.io/プロジェクトID/bq-police:latest"
resources {
limits = {
cpu = "1000m"
memory = "256Mi"
}
}
env {
name = "TZ"
value = "Asia/Tokyo"
}
env {
name = "THRESHOLD_TOTAL_BILLED_BYTES"
value = "0"
}
env {
name = "THRESHOLD_TOTAL_SLOT_MS"
value = "0"
}
env {
name = "EXEMPTED_USERS"
value = ""
}
env {
name = "SLACK_WEBHOOK_URL"
value = "sm://プロジェクトID/slack_webhook_url"
}
}
container_concurrency = 10
service_account_name = google_service_account.bq-police-cloud-run.email
}
metadata {
annotations = {
"autoscaling.knative.dev/maxScale" = "10"
"client.knative.dev/user-image" = "gcr.io/プロジェクトID/bq-police:latest"
"run.googleapis.com/client-name" = "terraform"
}
}
}
autogenerate_revision_name = true
}
Cloud Pub/SubからCloud Runを呼び出すために、Push Subscriptionを作成します。今回作成したCloud Runサービスは呼び出すための認証が必要なため、Pushする際にHTTPヘッダーへ認証情報を埋め込む設定をします。このSubscription専用のサービスアカウントを作成し、serviceAccountTokenCreatorのロールを与えます。これにより、このサービスアカウントは自身のOIDCトークンを取得できるようになります。push_config
で生成されたOIDCトークンをHTTPヘッダーに埋め込むように設定します。
cloud.google.com
なお、このOIDCトークンを埋め込んだリクエストを生成するという方式はCloud Pub/Sub以外のプロダクトでも活用できます。以下の記事で詳しく解説されています。
medium.com
そして、Cloud Pub/SubのサービスアカウントにCloud Runサービスの run.invoker
ロールを付与することで、この認証済みリクエストに対する認可をします。
resource "google_service_account" "bq-police-pubsub" {
account_id = "bq-police-pubsub"
display_name = "BQ Police(Cloud Pub/Sub)"
}
resource "google_project_iam_member" "bq-police-pubsub" {
member = "serviceAccount:${google_service_account.bq-police-pubsub.email}"
role = "roles/iam.serviceAccountTokenCreator"
}
resource "google_pubsub_subscription" "bq-police" {
name = "bq-police"
topic = google_pubsub_topic.bq-police.name
push_config {
push_endpoint = google_cloud_run_service.bq-police.status[0].url
oidc_token {
service_account_email = google_service_account.bq-police-pubsub.email
}
}
}
resource "google_cloud_run_service_iam_member" "bq-police-pubsub" {
service = google_cloud_run_service.bq-police.name
location = google_cloud_run_service.bq-police.location
role = "roles/run.invoker"
member = "serviceAccount:${google_service_account.bq-police-pubsub.email}"
}
今回はアプリケーションの実行基盤にCloud Runを使っています。そのため、App Engine(Standard Environment)やCloud Functionsと比較して使用できる言語・フレームワークの自由度が高いです。使い慣れているという理由で、Ruby + Sinatraを使って実装してみました。特別なことはしていないので、詳しい説明は省略します。
require 'json'
require 'yaml'
require 'erb'
require 'sinatra'
require 'faraday'
THRESHOLD = {
total_billed_bytes: ENV.fetch('THRESHOLD_TOTAL_BILLED_BYTES', '0').to_i,
total_slot_ms: ENV.fetch('THRESHOLD_TOTAL_SLOT_MS', '0').to_i,
}
EXEMPTED_USERS = ENV.fetch('EXEMPTED_USERS', '').split(',')
WEBHOOK_URL = ENV['SLACK_WEBHOOK_URL']
class AuditLog
attr_reader :project_id, :total_billed_bytes, :total_slot_ms, :principal_email, :start_time, :end_time, :query
def self.from_pubsub_format(data)
self.new(JSON.load(Base64.decode64(data['message']['data'])))
end
def initialize(log)
@project_id = log['resource']['labels']['project_id']
@principal_email = log['protoPayload']['authenticationInfo']['principalEmail']
@total_billed_bytes = log['protoPayload']['metadata']['jobChange']['job']['jobStats']['queryStats']['totalBilledBytes'].to_i
@total_slot_ms = log['protoPayload']['metadata']['jobChange']['job']['jobStats']['totalSlotMs'].to_i
@start_time = Time.parse(log['protoPayload']['metadata']['jobChange']['job']['jobStats']['startTime'])
@end_time = Time.parse(log['protoPayload']['metadata']['jobChange']['job']['jobStats']['endTime'])
@query = log['protoPayload']['metadata']['jobChange']['job']['jobConfig']['queryConfig']['query']
end
def duration
end_time - start_time
end
def total_billed_gb
total_billed_bytes.to_f / 1024 / 1024 / 1024
end
def cost
total_billed_gb.to_f / 1024 * 5
end
def total_slot_s
total_slot_ms.to_f / 1000
end
end
class BqPolice < Sinatra::Base
def alert?(audit_log, threshold, exempted_users)
if exempted_users.include?(audit_log.principal_email)
return false
end
if audit_log.total_billed_bytes >= threshold[:total_billed_bytes] || audit_log.total_slot_ms >= threshold[:total_slot_ms]
true
else
false
end
end
def format_message(audit_log)
YAML.load(ERB.new(File.read('slack_message.yml.erb')).result_with_hash(audit_log: audit_log))
end
def post_to_slack(webhook_url, message)
Faraday.post(webhook_url, JSON.dump(message), 'Content-Type' => 'application/json')
end
post '/' do
audit_log = AuditLog.from_pubsub_format(JSON.load(request.body.read))
if alert?(audit_log, THRESHOLD, EXEMPTED_USERS)
message = format_message(audit_log)
post_to_slack(WEBHOOK_URL, message)
end
status 200
end
end
上記のRubyコードで参照している slack_message.yml.erb
を以下に示します。
username: 'BigQuery Police'
icon_emoji: ':cop:'
channel: '#投稿したいチャンネル'
attachments:
- text: "BigQuery cost alert"
fallback: "BigQuery cost alert"
color: 'danger'
fields:
- title: 'User'
value: <%= audit_log.principal_email %>
short: true
- title: 'Project'
value: <%= audit_log.project_id %>
short: true
- title: 'Billed bytes (GB)'
value: <%= audit_log.total_billed_gb.round(2) %>
short: true
- title: 'Cost (USD)'
value: <%= audit_log.cost.round(2) %>
short: true
- title: 'Start time'
value: <%= audit_log.start_time.getlocal.to_s %>
short: true
- title: 'End time'
value: <%= audit_log.end_time.getlocal.to_s %>
short: true
- title: 'Duration (sec)'
value: <%= audit_log.duration.round(2) %>
short: true
- title: 'Slot time (sec)'
value: <%= audit_log.total_slot_s.round(2) %>
short: true
- title: 'Query'
value: |-
```
<%= audit_log.query.slice(0, 3000).gsub("\n", "\n ") %>
```
short: false
このアプリケーションを動かすためのDockerイメージを作成します。SlackのWeb hook URLはSecret Managerに格納されており、それをBerglas経由で取得しています。そのため、Berglasのバイナリをコンテナ内に入れ、Berglas経由でPumaを起動しています。Pumaの設定ファイルやGemfileは省略します。
FROM ruby:2.7-slim
COPY --from=us-docker.pkg.dev/berglas/berglas/berglas:latest /bin/berglas /bin/berglas
RUN apt-get -qq update && \
apt-get -qq -y install build-essential --fix-missing --no-install-recommends
WORKDIR /usr/src/app
COPY Gemfile Gemfile.lock ./
ENV BUNDLE_FROZEN=true
RUN gem install bundler && bundle config set --local without 'test' && bundle install
COPY . ./
CMD ["/bin/berglas", "exec", "--", "/usr/local/bundle/bin/puma", "-C", "puma.rb"]
Cloud RunがBerglas経由でSecret ManagerからSlack Web hook URLを取得できるように、以下のコマンドを実行しておきます。
berglas create sm://プロジェクト名/slack_webhook_url "SlackのWeb hook URL"
berglas grant sm://プロジェクト名/slack_webhook_url --member serviceAccount:Cloud Runのサービスアカウント
BigQueryの監査ログをリアルタイムにCloud Runで処理することによって、BigQueryで高額なクエリを実行されたときにいち早く気づくことができるようになりました。スキャン量に対するアラート条件だけでなく使用したSlot数に対する条件を指定できるようにし、オンデマンド課金のプロジェクトでもFlat rate課金のプロジェクトでも使用できました。

なお、上図は意図的に閾値を厳しくした時の通知であり、実際にはこのレベルのクエリでアラートを発報させてはいません。
ZOZOテクノロジーズでは多数の社員から使われるデータ基盤のデータガバナンスを高められる人材を募集しています。ご興味のある方は以下のリンクからご応募ください。
hrmos.co