一言まとめ
遅れて届くイベント(遅延データ)を前提に設計し、ウォーターマーク+安定したチェックポイント+**UPSERT(同じIDなら上書き)**で、結果を“1回だけ”に保つ。
まずイメージ(30秒)
ユーザーの操作ログが遅れて届くことは普通です(電波・端末・リトライ)。そのまま集計すると、重複や欠損で数字が歪みます。
だからこそ、**「どこまで待つか」**を決め(ウォーターマーク)、**途中結果(状態)**を安全に持ち、同じイベントは上書きにします。
[Front] --event_id--> [Kafka] --> [Spark(状態+ウォーターマーク+CP)] --> [Sink(DB/Delta=UPSERT)]
- event_id=イベント固有のID(重複排除のカギ)
- 状態(state)=途中結果のメモ
- CP(チェックポイント)=状態のスナップショット
何をする?(What)
遅延データがあっても、最終的な集計結果が1回分だけになるよう、Spark Structured Streamingを設定します。
なぜ大事?(Why)
- 売上・KPI・ML特徴量が正しい値に近づく。
- 障害後の再実行でも二重登録を防げる。
- 現場の“体感ズレ”を減らせる(数分後に数字が変わる理由を説明できる)。
どうやる?(How:5ステップ)
- イベントIDを必ず付ける:
event_id
をフロントで採番し、下流で持ち回す。 - ウォーターマークを決める:現場の遅延P99を見て、例「10分」。
- チェックポイントを安定化:
checkpointLocation
はS3/HDFSなど“消えない場所”に。 - 出力はUPSERTにする:同じ
event_id
なら上書き。Delta/IcebergやDBのON CONFLICT
を使う。 - 監視を置く:
state storeサイズ
、processedRowsPerSecond
、エラー率、遅延件数。
コピペで試す(最小スニペット)
val q = events
.withWatermark("ts", "10 minutes") // 遅延の線引き
.groupBy(window($"ts", "5 minutes"), $"user_id")
.count()
.writeStream
.option("checkpointLocation", "s3://YOUR-BUCKET/chk") // 安定ストレージ
.format("delta") // 取れるなら取引型シンク
.outputMode("update")
.start("s3://YOUR-BUCKET/delta-out")
DeltaのUPSERT(MERGE)例
MERGE INTO metrics t
USING updates s
ON t.event_id = s.event_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
PostgreSQLのUPSERT例
INSERT INTO metrics(event_id, window_start, user_id, cnt)
VALUES($1,$2,$3,$4)
ON CONFLICT (event_id) DO UPDATE
SET window_start = EXCLUDED.window_start,
user_id = EXCLUDED.user_id,
cnt = EXCLUDED.cnt;
よくある落とし穴(と回避策)
- ウォーターマークが短すぎ:遅延分が捨てられる → 現場のデータでP95/P99を測って決める。
- ウォーターマークが長すぎ:状態が肥大してレイテンシ悪化 → 期間を縮める or バッチに後送。
- checkpointLocationがローカル:再起動で消えて再集計→S3/HDFSへ。
- **
event_id
**欠落:UPSERTできず二重登録→フロントで必ず採番。 - DB側にユニーク制約なし:アプリだけで冪等化は不十分→
UNIQUE(event_id)
を付ける。
観測ポイント(運用の見える化)
- Spark:
inputRowsPerSecond
、processedRowsPerSecond
、stateOperatorsNumRowsTotal
- ストレージ:
checkpoint
ディレクトリのサイズ推移 - DB/Delta:書き込み失敗率、ロック待ち、スループット
1分点検(出す前に)
- ウォーターマーク=遅延P99前後で設定した?
checkpointLocation
は消えない場所(S3/HDFS)?- シンクはUPSERT/トランザクション対応?
event_id
を全経路で持ち回してる?- ダッシュボードでstateサイズとRPSが見える?
次の一歩(そのまま使える運用テンプレ)
- 週次レビュー:遅延分布(P50/P95/P99)を更新し、ウォーターマークを見直す。
- 事故テスト:チェックポイント中に意図的にタスクKill→重複ゼロ/欠損ゼロを確認。
- コスト監視:state storeとcheckpointのストレージ増分をダッシュボード化。
- ランブック更新:障害対応の手順と連絡先を記事末のリンクに集約。
まとめ
遅延は“異常”ではなく日常です。
待つライン(ウォーターマーク)を決める → 安全に覚える(状態+CP) → 同じIDは上書く(UPSERT)。
これだけで、数字がブレない“信頼できるストリーム集計”にグッと近づきます。
付録:FAQミニ
Q. ウォーターマークはどれくらいが目安?
A. まずは遅延のP99±少し。大きすぎて状態が膨らむなら段階的に短縮。
Q. UPSERTが使えないシンクは?
A. アプリ側でevent_id
の重複判定を実装するか、トランザクション対応のシンク(Delta/Iceberg/Kafka)へ切り替え。
Q. 数分後に数字が変わるのはなぜ?
A. 遅延データを許容時間内で取り込んでいるため。運用上の“仕様”として説明できるようにする。