TL;DR (Too Long; Didn’t Read / 要点だけ知りたい人向けまとめ)
「重複なし・取りこぼしなし(Exactly-once)」は、**入口(読む)/真ん中(処理・状態)/出口(書く)**の3点セットで成立する。
キー設計・チェックポイント・アップサート(またはトランザクション)をそろえ、event_idを端から端まで持ち回す。
まずイメージ:ECサイトの「出荷通知」を数える
- フロント(ブラウザ/アプリ)がevent_id付きで「出荷通知」を送る。
- Kafkaに貯める。
- Flink または Spark Structured Streamingで5分集計。
- Delta/IcebergやRDSに書く。
- 障害が起きても、通知1件は集計1回だけにしたい。
[Front] --event_id--> [Kafka] --> [Flink/Spark(状態+CP)] --> [Sink(DB/Delta)]
- 状態(state)= 途中結果のメモ。窗口集計や重複排除がここに載る。
- チェックポイント(CP)= 状態のスナップショット。再起動しても続きから再開できる。
Exactly-onceは「3つの歯車」をそろえる
- 入口(Source)
- Kafkaならコミット済みだけ読む:
isolation.level=read_committed
。 - メッセージ順序・再試行の前提を明確化。
- Kafkaならコミット済みだけ読む:
- 真ん中(Processor + State)
- Flinkは
EXACTLY_ONCE
+定期CP+RocksDB state。 - SparkはcheckpointLocationを信頼できるストレージに固定(S3/HDFS等)。
- Flinkは
- 出口(Sink)
- トランザクション対応のシンク(Kafka/Delta/Iceberg)か、idempotent upsert(同じ
event_id
なら上書き)。 - これが欠けると「2回書いた」事故が起きる。
- トランザクション対応のシンク(Kafka/Delta/Iceberg)か、idempotent upsert(同じ
最小レシピ:Flink
目的:落ちても“なかったこと”にせず、続きから正しく再開。
# Flink (conf)
execution.checkpointing.interval: 60s # 60秒ごとにCP
execution.checkpointing.mode: EXACTLY_ONCE
state.backend: rocksdb # 大きな状態でも安定
# 混雑時のCP詰まりがひどければ:
execution.checkpointing.unaligned: true
コードのポイント(概念)
keyBy(...).window(...).aggregate(...)
の前に重複排除を入れるなら、event_id
でBloomFilterやMapState
を使う。- Kafkaシンクはトランザクション有効に。
transaction.timeout.ms
はCP間隔×数倍にする。
最小レシピ:Spark Structured Streaming
目的:再実行されても二重登録しない。
val q =
spark.readStream
.format("kafka")
.option("kafka.isolation.level", "read_committed")
.load()
.selectExpr("CAST(value AS STRING) as json")
.select(from_json($"json", schema).as("e"))
.select($"e.event_id", $"e.user_id", $"e.ts")
.withWatermark("ts", "10 minutes") // 遅延データの線引き
.groupBy(window($"ts", "5 minutes"), $"user_id")
.count()
.writeStream
.format("delta") // 取れるならトランザクション系
.option("checkpointLocation", "s3://.../chk") // 安定ストレージ
.outputMode("update")
.start("s3://.../delta-out")
外部DBへ直接書く場合は、foreachBatch
****でUPSERTを実装(event_id
を主キーに相当させる)。Delta/IcebergならMERGE INTO
が楽。
状態設計のコツ(“速く・痩せて・偏らせない”)
- キー偏り対策:ホットキーにはサルティング(例:
userId + "#" + (hash%8)
)。 - TTL:要らなくなった状態はState TTLで自動削除。メモリ・RocksDBの肥大化を防ぐ。
- 遅延データ:ウォーターマークで線を引く。範囲外はサイド出力(Flink)や別テーブル(Spark)に逃がす。
- イベント順序:前提を文書化(「同一ユーザは原則時系列」「たまの逆順は5分まで許容」など)。
監視:壊れる前に気づくメトリクス
- Flink:
checkpoint_duration
、failed_checkpoints
、backPressuredTimeMsPerSecond
、stateサイズ。 - Spark:
numInputRows
、stateOperatorsNumRowsTotal
、inputRowsPerSecond
、processedRowsPerSecond
。 - Kafka:
consumer_lag
。 - しきい値:CP時間が間隔の50%超で要注意、80%超で要対応など、基準を先に決める。
ランブック(何かあったときはこの順序)
- 症状確認:重複?欠損?遅延?(どれかで対応が変わる)
- 入口を見る:Kafkaのコミット設定・ラグ。
- 状態を見る:CPが詰まっていないか、stateサイズが暴れていないか。
- 出口を見る:UPSERTのキー不備(
event_id
欠落)やトランザクション失敗。 - 一時対応:ウォーターマーク緩和/バッファ拡張/リトライ待ち。
- 恒久対応:キー分散・TTL調整・CP間隔と
transaction.timeout.ms
の見直し。
“事故テスト”で本当にExactly-onceか確かめる
- やること:CP進行中に意図的にタスクKill → 自動再開後、重複がゼロかを**
count(distinct event_id) == count(*)
**で検証。 - ケース:遅延データ多発/ホットキー集中/シンク一時停止/ネットワーク揺らぎ。
- 合格ライン:再実行しても合計件数・集計値が不変。
よくある落とし穴(と回避策)
- CP間隔≪トランザクションタイムアウト:途中で切れて二重書き → タイムアウトを大きめに。
- event_idをつけ忘れ:UPSERTできず重複 → フロントで必ず採番し、全経路で持ち回す。
- キー偏り:一部パーティションだけ遅い → サルティング+上流での軽い分散。
- チェックポイント置き場が脆い:ローカルDiskや不安定S3パス → 信頼できる場所に固定。
仕上げのチェックリスト(配備前の5分点検)
目的
- 本番投入後に重複・欠損・二重書き込みが起きないようにする
- 「想定外の遅延」「状態肥大化」「CP詰まり」など運用トラブルを未然に防ぐ
- 関係者が「Go/No-Go」を即判断できる状態にする
具体的にやること
- データ契約の確認
event_id
が必ず発行されているか- スキーマや時刻フォーマットが揃っているか
- ソース確認
- Kafka設定(
read_committed
)やパーティション設計が正しいか - 保持期間設定(retention)と遅延処理ポリシーが一致しているか
- Kafka設定(
- 処理系(Flink / Spark)の健全性
- Flinkなら
EXACTLY_ONCE
とチェックポイント間隔を確認 - Sparkなら
checkpointLocation
が安定したストレージに置かれているか
- Flinkなら
- 状態管理とチェックポイント
- State TTLが妥当か(無駄に肥大化しないか)
- チェックポイントにかかる時間が許容範囲内か
- シンク(出力先)
- トランザクション or UPSERTが効いているか
- DBにユニークキーやPKが設定されているか
- 可観測性
- メトリクス/アラートが設定済みか
- ダッシュボードで状態がすぐ見えるか
- リカバリテスト
- タスクKill後にExactly-onceが守られるか(重複なし)
- ロールバック手順が用意されているか
- セキュリティ
- 権限が最小化されているか
- 通信・保存の暗号化が有効か
- キャパシティ / コスト
- 負荷試験の結果が目標を満たしているか
- 状態やCPストレージ容量に余裕があるか
- Go/No-Go判定
- end-to-endで
count(distinct event_id) == count(*)
を満たすか - 再起動しても出力が変わらないか
- 緊急連絡先やランブックが共有済みか