TL;DR (Too Long; Didn’t Read / 要点だけ知りたい人向けまとめ)
「重複なし・取りこぼしなし(Exactly-once)」は、**入口(読む)/真ん中(処理・状態)/出口(書く)**の3点セットで成立する。
キー設計・チェックポイント・アップサート(またはトランザクション)をそろえ、event_idを端から端まで持ち回す。


まずイメージ:ECサイトの「出荷通知」を数える

  • フロント(ブラウザ/アプリ)がevent_id付きで「出荷通知」を送る。
  • Kafkaに貯める。
  • Flink または Spark Structured Streamingで5分集計。
  • Delta/IcebergやRDSに書く。
  • 障害が起きても、通知1件は集計1回だけにしたい。
[Front] --event_id--> [Kafka] --> [Flink/Spark(状態+CP)] --> [Sink(DB/Delta)]
  • 状態(state)= 途中結果のメモ。窗口集計や重複排除がここに載る。
  • チェックポイント(CP)= 状態のスナップショット。再起動しても続きから再開できる。

Exactly-onceは「3つの歯車」をそろえる

  1. 入口(Source)
    • Kafkaならコミット済みだけ読むisolation.level=read_committed
    • メッセージ順序・再試行の前提を明確化。
  2. 真ん中(Processor + State)
    • FlinkはEXACTLY_ONCE定期CPRocksDB state
    • SparkはcheckpointLocationを信頼できるストレージに固定(S3/HDFS等)。
  3. 出口(Sink)
    • トランザクション対応のシンク(Kafka/Delta/Iceberg)か、idempotent upsert(同じevent_idなら上書き)。
    • これが欠けると「2回書いた」事故が起きる。

最小レシピ:Flink

目的:落ちても“なかったこと”にせず、続きから正しく再開。

# Flink (conf)
execution.checkpointing.interval: 60s           # 60秒ごとにCP
execution.checkpointing.mode: EXACTLY_ONCE
state.backend: rocksdb                          # 大きな状態でも安定
# 混雑時のCP詰まりがひどければ:
execution.checkpointing.unaligned: true

コードのポイント(概念)

  • keyBy(...).window(...).aggregate(...)の前に重複排除を入れるなら、event_idBloomFilterMapStateを使う。
  • Kafkaシンクはトランザクション有効に。transaction.timeout.msCP間隔×数倍にする。

最小レシピ:Spark Structured Streaming

目的:再実行されても二重登録しない

val q =
  spark.readStream
    .format("kafka")
    .option("kafka.isolation.level", "read_committed")
    .load()
    .selectExpr("CAST(value AS STRING) as json")
    .select(from_json($"json", schema).as("e"))
    .select($"e.event_id", $"e.user_id", $"e.ts")
    .withWatermark("ts", "10 minutes")                 // 遅延データの線引き
    .groupBy(window($"ts", "5 minutes"), $"user_id")
    .count()
    .writeStream
    .format("delta")                                    // 取れるならトランザクション系
    .option("checkpointLocation", "s3://.../chk")      // 安定ストレージ
    .outputMode("update")
    .start("s3://.../delta-out")

外部DBへ直接書く場合は、foreachBatch****でUPSERTを実装(event_idを主キーに相当させる)。Delta/IcebergならMERGE INTOが楽。


状態設計のコツ(“速く・痩せて・偏らせない”)

  • キー偏り対策:ホットキーにはサルティング(例:userId + "#" + (hash%8)
  • TTL:要らなくなった状態はState TTLで自動削除。メモリ・RocksDBの肥大化を防ぐ。
  • 遅延データ:ウォーターマークで線を引く。範囲外はサイド出力(Flink)や別テーブル(Spark)に逃がす。
  • イベント順序:前提を文書化(「同一ユーザは原則時系列」「たまの逆順は5分まで許容」など)。

監視:壊れる前に気づくメトリクス

  • Flink:checkpoint_durationfailed_checkpointsbackPressuredTimeMsPerSecondstateサイズ
  • Spark:numInputRowsstateOperatorsNumRowsTotalinputRowsPerSecondprocessedRowsPerSecond
  • Kafka:consumer_lag
  • しきい値:CP時間が間隔の50%超で要注意、80%超で要対応など、基準を先に決める

ランブック(何かあったときはこの順序)

  1. 症状確認:重複?欠損?遅延?(どれかで対応が変わる)
  2. 入口を見る:Kafkaのコミット設定・ラグ。
  3. 状態を見る:CPが詰まっていないか、stateサイズが暴れていないか。
  4. 出口を見る:UPSERTのキー不備(event_id欠落)やトランザクション失敗。
  5. 一時対応:ウォーターマーク緩和/バッファ拡張/リトライ待ち。
  6. 恒久対応:キー分散・TTL調整・CP間隔とtransaction.timeout.msの見直し。

“事故テスト”で本当にExactly-onceか確かめる

  • やること:CP進行中に意図的にタスクKill → 自動再開後、重複がゼロかを**count(distinct event_id) == count(*)**で検証。
  • ケース:遅延データ多発/ホットキー集中/シンク一時停止/ネットワーク揺らぎ。
  • 合格ライン:再実行しても合計件数・集計値が不変

よくある落とし穴(と回避策)

  • CP間隔≪トランザクションタイムアウト:途中で切れて二重書き → タイムアウトを大きめに。
  • event_idをつけ忘れ:UPSERTできず重複 → フロントで必ず採番し、全経路で持ち回す。
  • キー偏り:一部パーティションだけ遅い → サルティング上流での軽い分散
  • チェックポイント置き場が脆い:ローカルDiskや不安定S3パス → 信頼できる場所に固定。

仕上げのチェックリスト(配備前の5分点検)

目的

  • 本番投入後に重複・欠損・二重書き込みが起きないようにする
  • 「想定外の遅延」「状態肥大化」「CP詰まり」など運用トラブルを未然に防ぐ
  • 関係者が「Go/No-Go」を即判断できる状態にする

具体的にやること

  1. データ契約の確認
    • event_idが必ず発行されているか
    • スキーマや時刻フォーマットが揃っているか
  2. ソース確認
    • Kafka設定(read_committed)やパーティション設計が正しいか
    • 保持期間設定(retention)と遅延処理ポリシーが一致しているか
  3. 処理系(Flink / Spark)の健全性
    • FlinkならEXACTLY_ONCEとチェックポイント間隔を確認
    • SparkならcheckpointLocationが安定したストレージに置かれているか
  4. 状態管理とチェックポイント
    • State TTLが妥当か(無駄に肥大化しないか)
    • チェックポイントにかかる時間が許容範囲内か
  5. シンク(出力先)
    • トランザクション or UPSERTが効いているか
    • DBにユニークキーやPKが設定されているか
  6. 可観測性
    • メトリクス/アラートが設定済みか
    • ダッシュボードで状態がすぐ見えるか
  7. リカバリテスト
    • タスクKill後にExactly-onceが守られるか(重複なし)
    • ロールバック手順が用意されているか
  8. セキュリティ
    • 権限が最小化されているか
    • 通信・保存の暗号化が有効か
  9. キャパシティ / コスト
    • 負荷試験の結果が目標を満たしているか
    • 状態やCPストレージ容量に余裕があるか
  10. Go/No-Go判定
  • end-to-endで count(distinct event_id) == count(*) を満たすか
  • 再起動しても出力が変わらないか
  • 緊急連絡先やランブックが共有済みか

投稿者 kojiro777

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です