はじめに
こんにちは。Gunosy Tech Lab Data Reliability & MLOps Group の阿部といいます。 この記事は Gunosy Advent Calendar 2019 14日目の記事です。 昨日の記事は竹中さんの ノリで使っていたGoLandをちゃんと使う - Gunosy Tech Blog でした。
弊社では adjust を利用してイベントトラッキングを行い、プロモーション効果計測などに活用しています。 今回の記事では adjust のサーバーサイドイベントトラッキングという機能を紹介し、adjust にイベントを送信するまでの仕組みをお伝えします。
adjust サーバーサイドイベントトラッキングとは
サーバーサイドイベントトラッキングを利用するとクライアントではなくサーバーサイドから adjust にイベントを送信することができます( 参考リンク )。
クライアントに実装が不要なため、たとえばSQLで特定の条件でデバイスIDを抽出し送信するといったことが可能になります。
やりたいこと
adjustにイベントを送信する仕組みを構築するにあたり、以下のようなことを考えました。
- 計測したいイベントが増えたときに、できるだけ早く対応したい
- イベントの重複排除を柔軟に設定できるようにしたい
しかしながらいくつか仕様上の制約があり、若干頭を抱えてしまいました。
複数イベント同時に送信できない
イベント毎にadjustのエンドポイントを叩く必要があります。たとえば数万件とかあったりすると面倒です。
送信間隔は0.01秒以上時間を空ける必要がある
異なるイベントを異なるバッチで走らせていると不意に 0.01秒 の制約を破ってしまうかもしれません。
イベントの重複排除をする場合はデバイスIDでのみ可能 ( 参考リンク )
例えば MAU など、月次などの単位で重複排除したい場合等々あると思います。その場合どうすればいいでしょうか。一ヶ月ごとに処理をしてもいいかもしれませんが、できればイベントが発生した時点で送ってしまいたいですね。(その上、イベントは28日以内に送らないといけないという制約もあります)
イベントトークン毎に時系列でソートされている必要がある (例:3日に発生したイベントトークンXのイベントが、7日に発生した同じトークンのイベントの後に送られてくる場合、3日に発生したイベントは拒否される。)
一度送ったら修正できないので注意が必要
システム構成
以上のような要件と制約をもとにシステムの構成を考えます。 おおまかに以下のような流れになります。
- Digdag + Embulk で Athena を利用して特定条件のイベントを抽出
- それをDynamoDBに書き込む
- イベントの追加/変更を DynamoDB ストリームに流す
- Lambda でイベントを adjust に送信する
重複排除
DynamoDBを使って重複排除を行っています。 重複が有無は同一のプライマリキーが存在するかどうかで判定することができます。 時系列ソートに関しても、厳密ではないですがワークフローを時系列で走らせれば、想定する用途では問題無い程度にはなります。
Digdag (+Embulk) から Athena にクエリを発行し、トラッキングしたい条件に合致するデバイスIDを取得し、DynamoDBに書き込みます。
ワークフローの詳細こちらです。
プライマリキーは {app_token}|{event_token}|{gps_adid}|{idfa}|{idfv}
のようにします。
DynamoDBのアトリビュートは app_token
等のプライマリキーを構成する要素の他、タイムスタンプやTTL用のものも用意しています。
TTLはだいたいのケースではイベント送信期限の28日間を設定すれば良いと思います。
複数同時送信の防止と送信間隔の調整
0.01秒間隔を空けるという制約は、一つのストリーム (DynamoDB Streams) を一つのバッチ (Lambda) で処理することで実現しています。
DynamoDB の設定で DynamoDB ストリームを有効化し、StreamViewType は NEW_IMAGE に設定します ( Capturing Table Activity with DynamoDB Streams - Amazon DynamoDB )。 こうしておくと重複したイベントが来ても DynamoDB ストリームには流れなくなり、重複排除が可能です。 プライマリキーに、例えば日付などの追加の情報を付与し、重複排除の条件を変えることもできます。 Lambda のイベントソースマッピングで DynamoDB を指定します。 ParallelizationFactor を 1 (default) にしておくと、同時に複数のバッチが走らないように制限できます。 イベント数が多いとタイムアウトの恐れがあるので Lambda の Timeout は長く、バッチサイズは小さめしておくと良いです。 同時に Firehose を使う等して、処理したイベントのログをS3に置いておくと安心です。
実装
Lambda はPythonで動かしており、実装はこんな感じです。
import time | |
import json | |
import requests | |
import logging | |
import os | |
logger = logging.getLogger() | |
logger.setLevel(logging.INFO) | |
def send_event(params, retry_count=0): | |
try: | |
resp = requests.get('http://s2s.adjust.com/event', params=params) | |
resp.raise_for_status() | |
time.sleep(0.01) | |
except Exception as err: | |
if retry_count > 0: | |
status = resp.status_code | |
headers = resp.headers | |
encoding = resp.encoding | |
body = resp.text | |
if ('content-type' in headers) and headers['content-type'] == 'application/json; charset=utf-8': | |
try: | |
j = json.loads(body) | |
if ('error' in j) and isinstance(j['error'], str): | |
# | |
# 実際はここでエラーに応じて処理をしている | |
# | |
pass | |
except Exception as e: | |
logging.error(f"Unexpected errors occur: {e}") | |
logging.error(f"requests to adjust is failed: status_code: {status}, header: {headers}, encoding: {encoding}, body: {body}") | |
time.sleep(0.1) | |
send_event(params, retry_count-1) | |
raise err | |
def lambda_handler(event, context): | |
num_retries = int(os.getenv('NUM_RETRIES', '3')) | |
num_events = 0 | |
num_success = 0 | |
for record in event['Records']: | |
if 'NewImage' not in record['dynamodb']: | |
continue | |
num_events += 1 | |
data = record['dynamodb']['NewImage'] | |
try: | |
params = { | |
'app_token': data['app_token']['S'], | |
'event_token': data['event_token']['S'], | |
's2s': 1, | |
'created_at_unix': data['created_at_unix']['N'], | |
} | |
# 値がなければ - を入れているため | |
if data['idfa']['S'] and data['idfa']['S'] != '-': | |
params['idfa'] = data['idfa']['S'] | |
if data['idfv']['S'] and data['idfv']['S'] != '-': | |
params['idfv'] = data['idfv']['S'] | |
if data['gps_adid']['S'] and data['gps_adid']['S'] != '-': | |
params['gps_adid'] = data['gps_adid']['S'] | |
except Exception: | |
logging.error(f'failed to parse record: {data}') | |
continue | |
try: | |
send_event(params, retry_count=num_retries) | |
num_success += 1 | |
except Exception: | |
logging.error(f'failed to send event {params}') | |
return f'successfully sent {num_success} events / {num_events} events' |
エラー処理は500の内でも無視して良いもの、だめなものなどを発生ベースで洗練させていっています(リトライで直る可能性があればリトライする等)。エラーコードとエラー内容の一部詳細は以下に記載があります。
基本的にはエラーが発生してもコード上ではロギングし、CloudWatchに出力します。 一部のエラーはCloudWatchを確認して再送する必要もあるでしょう。 また、例外が発生したときでも失敗して停止はさせないようにしています。 というのも、途中で失敗して再実行されると一部のイベントだけ重複して送信されてしまう恐れがあるからです。 一部のエラーの時は数回リトライを実行し、うまくいかなければ諦めます。そのため、クリティカルな要件に使うことはできないので注意が必要です。
運用
このような構成にしておくと、比較的簡単にトラッキングイベントの追加ができます。 手順は
- adjust のイベント作成 (ユニークフラグはdisableのままにしておく)
- クエリを書く
- Digdagにワークフロー追加
だけです。 実質クエリを書くだけですね。 adjustのイベントはテスト用やデバッグ用のものを作り、本番投入の前にテスト用に流すということもしています。 実際のクエリはこのように書いています。
with | |
installed_users as ( | |
select distinct | |
os | |
, gps_adid | |
, idfa | |
, idfv | |
from install_log | |
-- ログはUTC時間で収集されているので、JSTに合わせている | |
-- dt は hour 単位 | |
where date(date_parse(dt, '%Y%m%d%H') AT TIME ZONE 'Asia/Tokyo') = date('${install_jst_date}') | |
), | |
active_users as ( | |
select gps_adid | |
, idfa | |
, idfv | |
, session_id | |
, min(dt) as dt | |
from client_log | |
where date(date_parse(dt, '%Y%m%d%H') AT TIME ZONE 'Asia/Tokyo') = date('${launch_jst_date}') | |
and event_type = 'Launch' -- 起動したというイベント | |
group by 1, 2, 3, 4 | |
order by dt | |
) | |
select distinct | |
nullif(coalesce(installed_users.gps_adid, active_users.gps_adid), '-') as gps_adid | |
, nullif(coalesce(installed_users.idfa, active_users.idfa), '-') as idfa | |
, nullif(coalesce(installed_users.idfv, active_users.idfv), '-') as idfv | |
, to_unixtime(date_parse(array_agg(active_users.dt)[${num_sessions}], '%Y%m%d%H')) as created_at_unix | |
, to_unixtime(date_parse(array_agg(active_users.dt)[${num_sessions}], '%Y%m%d%H') + interval '28' day) as time_to_exist | |
, os | |
from installed_users | |
join active_users | |
on installed_users.gps_adid = active_users.gps_adid | |
or installed_users.idfa = active_users.idfa | |
or installed_users.idfv = active_users.idfv | |
group by 1, 2, 3, 6 | |
having count(session_id) >= ${num_sessions} |
この例ではアプリインストールM日後にNセッションあるデバイスを抽出しています。 日付やセッション数をプレースホルダにしておいて、Digdagで埋め込んでいます。
まとめ
この記事では adjust サーバーサイドイベントトラッキングと、イベントを送信する仕組みを紹介しました。 adjustの制約と付き合いつつ、AWS の DynamoDB や Lambda を利用することで簡単にイベント測定できるように工夫したので、どこかで参考になれば嬉しいです。