一言まとめ
遅れて来るデータが前提。**状態(=途中結果のメモ)チェックポイント(=復元スナップショット)**で守り、トランザクション/UPSERTで“同じイベントは1回だけ”にする。


全体像(まずは絵で)

[Front] --event_id--> [Kafka] --> [Flink(状態+CP+Watermark)] --> [Sink: Delta/Iceberg/DB(UPSERT)]
            ^                                |
            |                                v
        read_committed                  Exactly-once書き込み
  • event_id: イベント固有のID。重複排除のカギ。
  • 状態: 途中結果のメモ(例: 窓ごとのカウント)。
  • CP: チェックポイント。落ちても続きから再開。
  • Watermark: 「どこまで遅延を待つか」の線引き。

これは何(What)

Flinkで状態管理 + Exactly-onceを成立させる、実務向けの最小パターン。

なぜ重要(Why)

  • 再実行や障害でも二重計上/欠損を防ぐ。
  • 売上KPIやML特徴量の信頼性を保つ。
  • 「数分後に数字が変わる理由」を運用として説明できる。

いつ使う(When)

  • サブ秒〜数秒の低レイテンシがほしい。
  • イベント時刻に厳密(遅延データを正しく集計)。
  • 大きな状態を持続して扱いたい(ユーザー単位の連続検知など)。

How:5ステップで導入

  1. IDを付ける: フロントでevent_idを採番→Kafka→Flink→シンクまで持ち回す。
  2. 入口を整える: Kafkaはisolation.level=read_committedで確定済みのみ読む。
  3. Flinkの土台: EXACTLY_ONCE+定期チェックポイント(例: 60秒)。状態はRocksDBを選べる。
  4. 遅延に備える: Watermarkで許容遅延(例: 10分)を決め、ウィンドウ確定を安定化。
  5. 出口を堅牢に: Delta/Iceberg/Kafkaのトランザクション or DBのUPSERTで冪等化。

設定スニペット(まずはここから)

# Checkpointを1分おきに、Exactly-onceで
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.interval: 60s
# 状態が大きいならRocksDBに逃がす
state.backend: rocksdb
# CPが詰まるなら(混雑対策)
execution.checkpointing.unaligned: true

Java最小例(概念)

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

KafkaSource<String> source = KafkaSource.<String>builder()
  .setBootstrapServers("broker:9092")
  .setTopics("events")
  .setGroupId("flink-app")
  .setProperty("isolation.level", "read_committed")
  .setValueOnlyDeserializer(new SimpleStringSchema())
  .build();

WatermarkStrategy<MyEvent> wms = WatermarkStrategy
  .<MyEvent>forBoundedOutOfOrderness(Duration.ofMinutes(10))
  .withTimestampAssigner((e, ts) -> e.ts());

DataStream<MyEvent> s = env.fromSource(source, wms, "kafka");

DataStream<String> out = s
  .keyBy(MyEvent::userId)
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .aggregate(new CountAgg()) // 集計結果を文字列化したと仮定
  ;

KafkaSink<String> sink = KafkaSink.<String>builder()
  .setBootstrapServers("broker:9092")
  .setRecordSerializer(KafkaRecordSerializationSchema.builder()
      .setTopic("agg-out")
      .setValueSerializationSchema(new SimpleStringSchema())
      .build())
  .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
  .build();

out.sinkTo(sink);
env.execute("flink-exactly-once-demo");

ポイント: Watermarkで遅延を許容、DeliveryGuarantee.EXACTLY_ONCEで出力を保護。

UPSERT例(DB/Delta)

-- PostgreSQL
INSERT INTO metrics(event_id, window_start, user_id, cnt)
VALUES($1,$2,$3,$4)
ON CONFLICT (event_id) DO UPDATE SET
  window_start = EXCLUDED.window_start,
  user_id      = EXCLUDED.user_id,
  cnt          = EXCLUDED.cnt;
-- Delta Lake
MERGE INTO metrics t
USING updates s
ON t.event_id = s.event_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

監視(見るべきメトリクス)

  • CP系: checkpoint_durationfailed_checkpoints(増えたら要調査)。
  • 負荷系: backPressuredTimeMsPerSecond、処理RPS。
  • 状態系: stateサイズ(急増=TTL/分散の見直し)。
  • Kafka: consumer lag(溜まりすぎは入口が詰まり気味)。

よくある落とし穴(と回避)

  • CP間隔 ≪ transaction.timeout.ms: 途中で取引が切れ二重書き→タイムアウトをCP間隔×3程度に。
  • チェックポイント置き場がローカル: 再起動で消えて復旧失敗→S3/HDFSへ。
  • event_idを付け忘れ: UPSERTできず重複→フロントで必ず採番し持ち回す。
  • ホットキー偏り: 一部パーティションだけ遅い→サルティング(例: userId#(hash%8))。

ランブック(30秒で切り分け)

  1. 症状の型: 重複 / 欠損 / 遅延悪化 のどれか?
  2. 入口を見る: lag、read_committed、パーティション偏り。
  3. 状態を見る: CP失敗・state急増・TTL設定。
  4. 出口を見る: UPSERTキー、トランザクション失敗率。
  5. 応急: Watermark緩和/再試行、バッファ拡大。
  6. 恒久: サルティング追加、CP/トランザクションの時間調整、ユニーク制約。

FAQミニ

Q. Exactly-onceは“完全保証”ですか?
A. Flink側での再処理制御+取引型シンクで実務的に達成します。非対応シンクではUPSERTで代替。

Q. Watermarkは何分が良い?
A. 現場の遅延P95/P99を見て決定。大きすぎると状態肥大、短すぎると欠損。

Q. RocksDBは必須?
A. 大きな状態や長期保持があるなら有利。小規模ならメモリでもOK。


まとめ

  • 待つラインを決める(Watermark)。
  • 安全に覚える(状態+CP)。
  • 同じIDは上書き(UPSERT/トランザクション)。 この3点が揃えば、Flinkで**“1回だけ”の集計に一歩も二歩も近づきます。次回はテスト戦略(事故テストの型)**を扱います。

投稿者 kojiro777

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です