一言まとめ
遅れて来るデータが前提。**状態(=途中結果のメモ)をチェックポイント(=復元スナップショット)**で守り、トランザクション/UPSERTで“同じイベントは1回だけ”にする。
全体像(まずは絵で)
[Front] --event_id--> [Kafka] --> [Flink(状態+CP+Watermark)] --> [Sink: Delta/Iceberg/DB(UPSERT)]
^ |
| v
read_committed Exactly-once書き込み
- event_id: イベント固有のID。重複排除のカギ。
- 状態: 途中結果のメモ(例: 窓ごとのカウント)。
- CP: チェックポイント。落ちても続きから再開。
- Watermark: 「どこまで遅延を待つか」の線引き。
これは何(What)
Flinkで状態管理 + Exactly-onceを成立させる、実務向けの最小パターン。
なぜ重要(Why)
- 再実行や障害でも二重計上/欠損を防ぐ。
- 売上KPIやML特徴量の信頼性を保つ。
- 「数分後に数字が変わる理由」を運用として説明できる。
いつ使う(When)
- サブ秒〜数秒の低レイテンシがほしい。
- イベント時刻に厳密(遅延データを正しく集計)。
- 大きな状態を持続して扱いたい(ユーザー単位の連続検知など)。
How:5ステップで導入
- IDを付ける: フロントで
event_id
を採番→Kafka→Flink→シンクまで持ち回す。 - 入口を整える: Kafkaは
isolation.level=read_committed
で確定済みのみ読む。 - Flinkの土台:
EXACTLY_ONCE
+定期チェックポイント(例: 60秒)。状態はRocksDB
を選べる。 - 遅延に備える: Watermarkで許容遅延(例: 10分)を決め、ウィンドウ確定を安定化。
- 出口を堅牢に: Delta/Iceberg/Kafkaのトランザクション or DBのUPSERTで冪等化。
設定スニペット(まずはここから)
# Checkpointを1分おきに、Exactly-onceで
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.interval: 60s
# 状態が大きいならRocksDBに逃がす
state.backend: rocksdb
# CPが詰まるなら(混雑対策)
execution.checkpointing.unaligned: true
Java最小例(概念)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("broker:9092")
.setTopics("events")
.setGroupId("flink-app")
.setProperty("isolation.level", "read_committed")
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
WatermarkStrategy<MyEvent> wms = WatermarkStrategy
.<MyEvent>forBoundedOutOfOrderness(Duration.ofMinutes(10))
.withTimestampAssigner((e, ts) -> e.ts());
DataStream<MyEvent> s = env.fromSource(source, wms, "kafka");
DataStream<String> out = s
.keyBy(MyEvent::userId)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(new CountAgg()) // 集計結果を文字列化したと仮定
;
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers("broker:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("agg-out")
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.build();
out.sinkTo(sink);
env.execute("flink-exactly-once-demo");
ポイント: Watermarkで遅延を許容、DeliveryGuarantee.EXACTLY_ONCEで出力を保護。
UPSERT例(DB/Delta)
-- PostgreSQL
INSERT INTO metrics(event_id, window_start, user_id, cnt)
VALUES($1,$2,$3,$4)
ON CONFLICT (event_id) DO UPDATE SET
window_start = EXCLUDED.window_start,
user_id = EXCLUDED.user_id,
cnt = EXCLUDED.cnt;
-- Delta Lake
MERGE INTO metrics t
USING updates s
ON t.event_id = s.event_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
監視(見るべきメトリクス)
- CP系:
checkpoint_duration
、failed_checkpoints
(増えたら要調査)。 - 負荷系:
backPressuredTimeMsPerSecond
、処理RPS。 - 状態系: stateサイズ(急増=TTL/分散の見直し)。
- Kafka: consumer lag(溜まりすぎは入口が詰まり気味)。
よくある落とし穴(と回避)
- CP間隔 ≪ transaction.timeout.ms: 途中で取引が切れ二重書き→タイムアウトをCP間隔×3程度に。
- チェックポイント置き場がローカル: 再起動で消えて復旧失敗→S3/HDFSへ。
event_id
を付け忘れ: UPSERTできず重複→フロントで必ず採番し持ち回す。- ホットキー偏り: 一部パーティションだけ遅い→サルティング(例:
userId#(hash%8)
)。
ランブック(30秒で切り分け)
- 症状の型: 重複 / 欠損 / 遅延悪化 のどれか?
- 入口を見る: lag、
read_committed
、パーティション偏り。 - 状態を見る: CP失敗・state急増・TTL設定。
- 出口を見る: UPSERTキー、トランザクション失敗率。
- 応急: Watermark緩和/再試行、バッファ拡大。
- 恒久: サルティング追加、CP/トランザクションの時間調整、ユニーク制約。
FAQミニ
Q. Exactly-onceは“完全保証”ですか?
A. Flink側での再処理制御+取引型シンクで実務的に達成します。非対応シンクではUPSERTで代替。
Q. Watermarkは何分が良い?
A. 現場の遅延P95/P99を見て決定。大きすぎると状態肥大、短すぎると欠損。
Q. RocksDBは必須?
A. 大きな状態や長期保持があるなら有利。小規模ならメモリでもOK。
まとめ
- 待つラインを決める(Watermark)。
- 安全に覚える(状態+CP)。
- 同じIDは上書き(UPSERT/トランザクション)。 この3点が揃えば、Flinkで**“1回だけ”の集計に一歩も二歩も近づきます。次回はテスト戦略(事故テストの型)**を扱います。