一言まとめ
遅れて届くイベント(遅延データ)を前提に設計し、ウォーターマーク安定したチェックポイント+**UPSERT(同じIDなら上書き)**で、結果を“1回だけ”に保つ。


まずイメージ(30秒)

ユーザーの操作ログが遅れて届くことは普通です(電波・端末・リトライ)。そのまま集計すると、重複欠損で数字が歪みます。
だからこそ、**「どこまで待つか」**を決め(ウォーターマーク)、**途中結果(状態)**を安全に持ち、同じイベントは上書きにします。

[Front] --event_id--> [Kafka] --> [Spark(状態+ウォーターマーク+CP)] --> [Sink(DB/Delta=UPSERT)]
  • event_id=イベント固有のID(重複排除のカギ)
  • 状態(state)=途中結果のメモ
  • CP(チェックポイント)=状態のスナップショット

何をする?(What)

遅延データがあっても、最終的な集計結果が1回分だけになるよう、Spark Structured Streamingを設定します。

なぜ大事?(Why)

  • 売上・KPI・ML特徴量が正しい値に近づく。
  • 障害後の再実行でも二重登録を防げる。
  • 現場の“体感ズレ”を減らせる(数分後に数字が変わる理由を説明できる)。

どうやる?(How:5ステップ)

  1. イベントIDを必ず付けるevent_idをフロントで採番し、下流で持ち回す。
  2. ウォーターマークを決める:現場の遅延P99を見て、例「10分」。
  3. チェックポイントを安定化checkpointLocationはS3/HDFSなど“消えない場所”に。
  4. 出力はUPSERTにする:同じevent_idなら上書き。Delta/IcebergやDBのON CONFLICTを使う。
  5. 監視を置くstate storeサイズprocessedRowsPerSecond、エラー率、遅延件数。

コピペで試す(最小スニペット)

val q = events
  .withWatermark("ts", "10 minutes")              // 遅延の線引き
  .groupBy(window($"ts", "5 minutes"), $"user_id")
  .count()
  .writeStream
  .option("checkpointLocation", "s3://YOUR-BUCKET/chk") // 安定ストレージ
  .format("delta")                                     // 取れるなら取引型シンク
  .outputMode("update")
  .start("s3://YOUR-BUCKET/delta-out")

DeltaのUPSERT(MERGE)例

MERGE INTO metrics t
USING updates s
ON t.event_id = s.event_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

PostgreSQLのUPSERT例

INSERT INTO metrics(event_id, window_start, user_id, cnt)
VALUES($1,$2,$3,$4)
ON CONFLICT (event_id) DO UPDATE
SET window_start = EXCLUDED.window_start,
    user_id      = EXCLUDED.user_id,
    cnt          = EXCLUDED.cnt;

よくある落とし穴(と回避策)

  • ウォーターマークが短すぎ:遅延分が捨てられる → 現場のデータでP95/P99を測って決める。
  • ウォーターマークが長すぎ:状態が肥大してレイテンシ悪化 → 期間を縮める or バッチに後送。
  • checkpointLocationがローカル:再起動で消えて再集計→S3/HDFSへ。
  • **event_id**欠落:UPSERTできず二重登録→フロントで必ず採番。
  • DB側にユニーク制約なし:アプリだけで冪等化は不十分→UNIQUE(event_id)を付ける。

観測ポイント(運用の見える化)

  • Spark:inputRowsPerSecondprocessedRowsPerSecondstateOperatorsNumRowsTotal
  • ストレージ:checkpointディレクトリのサイズ推移
  • DB/Delta:書き込み失敗率、ロック待ち、スループット

1分点検(出す前に)

  • ウォーターマーク=遅延P99前後で設定した?
  • checkpointLocation消えない場所(S3/HDFS)
  • シンクはUPSERT/トランザクション対応
  • event_id全経路で持ち回してる?
  • ダッシュボードでstateサイズRPSが見える?

次の一歩(そのまま使える運用テンプレ)

  • 週次レビュー:遅延分布(P50/P95/P99)を更新し、ウォーターマークを見直す。
  • 事故テスト:チェックポイント中に意図的にタスクKill→重複ゼロ/欠損ゼロを確認。
  • コスト監視:state storeとcheckpointのストレージ増分をダッシュボード化。
  • ランブック更新:障害対応の手順と連絡先を記事末のリンクに集約。

まとめ

遅延は“異常”ではなく日常です。
待つライン(ウォーターマーク)を決める → 安全に覚える(状態+CP) → 同じIDは上書く(UPSERT)
これだけで、数字がブレない“信頼できるストリーム集計”にグッと近づきます。


付録:FAQミニ

Q. ウォーターマークはどれくらいが目安?
A. まずは遅延のP99±少し。大きすぎて状態が膨らむなら段階的に短縮。

Q. UPSERTが使えないシンクは?
A. アプリ側でevent_idの重複判定を実装するか、トランザクション対応のシンク(Delta/Iceberg/Kafka)へ切り替え。

Q. 数分後に数字が変わるのはなぜ?
A. 遅延データを許容時間内で取り込んでいるため。運用上の“仕様”として説明できるようにする。

投稿者 kojiro777

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です