TL;DR(最初に要点)

  • 目的:重複なしで正しい集計・特徴量を作る(信頼できるデータ基盤)。
  • コアCheckpointingMode.EXACTLY_ONCEトランザクション対応シンク(Kafka等)。
  • 運用の肝状態TTLで肥大化を防ぐ/チェックポイントの安定化/復旧時の重複防止
  • アンチパターン:シンクが冪等でもトランザクション非対応だと、障害復旧で二重書き込みが起きがち。

なぜ大事?(ML/分析の観点)

  • 特徴量の二重計算 → 学習・推論が劣化。
  • 集計の二重カウント → KPIが信用できない。
  • 一度壊れると後続全体が汚染 → 追跡・修復コストが爆増。

Exactly-once は「再実行があっても結果が一意である」ことを担保し、信頼できる特徴量・指標を作ります。


仕組みの全体像(Source → Operator → Sink)

  1. チェックポイント:演算子の状態・ソースのオフセットをスナップショット。
  2. 再処理時:最新の正常なスナップショットから復旧し、同じ入力境界から決定的に再計算
  3. シンクのコミット:2相コミットで「このチェックポイント分だけ」原子に公開。

ポイント:“演算は重ねてOK、公開は一度だけ” の設計にする。


最小設定スニペット(Java)

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 60 秒ごとにチェックポイント(Exactly-once)
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

// 失敗時の堅牢性
env.getCheckpointConfig()
   .setMinPauseBetweenCheckpoints(30_000)         // チェックポイント間の最小間隔
   .setCheckpointTimeout(120_000)                 // タイムアウト
   .setTolerableCheckpointFailureNumber(3)        // 許容失敗回数
   .setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// Source/Transform/State …

状態(State)管理の実装要点

代表的な状態タイプ

  • ValueState:単一値(累積カウンタ、最新特徴量など)
  • ListState:順序リスト(最近N件など)
  • MapState<K,V>:キー→値(特徴量辞書)

TTL(Time-to-Live)の付与

状態が増え続けると 復旧が遅くなるストレージ圧迫。TTLで古いキーを自動清掃。

StateTtlConfig ttl = StateTtlConfig
    .newBuilder(Time.days(7))                     // 7日で期限切れ
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
    .build();

ValueStateDescriptor<Long> desc = new ValueStateDescriptor<>("cnt", Long.class);
desc.enableTimeToLive(ttl);

Tips:期限切れ状態の物理削除は遅延して行われる(GC的)。メモリ・バックエンドと間隔を監視。


シンク設計:冪等 vs. トランザクション

種別特徴復旧時の二重防止
トランザクション対応Kafka Transactional(KafkaSink Exactly-once)、2PCシンク2相コミット(pre-commit→checkpoint→commit)強い(チェックポイントと連動)
冪等APIPUT / UPSERT、主キー重複無視同一キーの重複を無視入力キー設計が重要
単純AppendS3/HDFSへファイル追記追記時に重複し得る弱い:ファイル原子性や出力パターンの工夫が必要

Kafka(Exactly-once)例(Flink 1.15+ の KafkaSink

KafkaSink<String> sink = KafkaSink.<String>builder()
    .setBootstrapServers("kafka:9092")
    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
        .setTopic("features")
        .setValueSerializationSchema(new SimpleStringSchema())
        .build())
    .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // ← 重要
    .setTransactionalIdPrefix("flink-features-")        // Job固有で安定
    .build();

stream.sinkTo(sink);

注意:TransactionalId は ジョブ再デプロイで変えない(並列度×タスクで一意)。


運用チェックリスト(5分で点検)


典型トラブル → 原因と対策

  1. 復旧後に二重書き込み
    • 原因:シンクがトランザクション非対応 or キー設計が無い
    • 対策:KafkaのExactly-onceに切替/出力に自然キーを導入してUPSERT
  2. チェックポイントが頻繁にタイムアウト
    • 原因:状態巨大化、I/Oレイテンシ、ネットワーク輻輳
    • 対策setMinPauseBetweenCheckpoints で間隔をとる、インクリメンタルスナップショット、TTL強化
  3. 復旧に時間がかかる
    • 原因:状態サイズが大きい
    • 対策:TTL、キー粒度の見直し、局所集計→集約の段階化、サイド出力で不要データ分離
  4. トランザクション衝突(Kafka)
    • 原因transactional.id の衝突/並列度変更時の使い回し
    • 対策並列度×サブタスクで一意に生成、ローリングアップデート時の整合を確認

観測とメトリクス(最低限)

  • Checkpoint:成功率、alignmentDuration、保存サイズ
  • State:総サイズ、TTL清掃レート、遅延
  • Backpressure:Operator単位の比率
  • Sink:コミット時間、トランザクション失敗率

SLO例:p99 レイテンシ < 2s、Checkpoint成功率 ≥ 99.5%、復旧 ≤ 5分


テスト戦略(重複ゼロを確信する)

  1. 障害注入テスト:チェックポイント直後に TaskManager を kill → 復旧後に件数・キー重複ゼロを検証。
  2. 終端一貫性テスト:同じ入力を2回流しても出力が完全一致。
  3. 負荷テスト:チェックポイント間隔を短縮(例 10s)しながら安定性を確認。

よくある設計Q&A

Q. そもそも冪等APIだけで十分?
A. 可能。ただし入力キーの厳密さが命。バッチ・ストリームの混在や順序入替が起きると穴が出やすい。KafkaのExactly-onceは設計負荷を減らせる。

Q. TTLはいくつにすべき?
A. 集計ウィンドウ+遅延許容(例:7日ウィンドウ+遅延2日→TTL≥9日)。余裕を持たせて、実測で縮める。

Q. 状態を軽く保つコツ?
A. キーを局所化(前処理でバケット化)、重い状態は外部ストアへ、滑らかな集約(分位点スケッチ等)を採用。


運用Runbook(インシデント時)

  1. 症状確認:チェックポイント失敗/遅延、Backpressure急増、シンクコミット遅延。
  2. 即応
    • 間隔を一時的に延長(例 60s→120s)
    • 並列度を+N(シンク側も追随)
  3. 原因切分
    • ネットワーク/ストレージ遅延 → I/Oメトリクス
    • 状態肥大 → Stateサイズ、キー分布
  4. 復旧
    • 外部化Checkpointから-s <path>で再起動
    • 直前のトランザクションが未コミットなら自動アボートされる(Kafka)

参考パターン:TwoPhaseCommitSinkFunction(概念)

自作シンクでExactly-onceをやる場合の骨格(簡略化)

public class My2PCSink<T> extends TwoPhaseCommitSinkFunction<T, TxnHandle, Void> {
  protected TxnHandle beginTransaction() { /* start txn */ }
  protected void invoke(TxnHandle txn, T value, Context ctx) { /* buffer */ }
  protected void preCommit(TxnHandle txn) { /* flush to temp */ }
  protected void commit(TxnHandle txn) { /* atomically publish */ }
  protected void abort(TxnHandle txn) { /* rollback */ }
}

まとめ

  • Exactly-once = 入力の再実行は許すが、公開は一意
  • チェックポイント × トランザクションシンク × 状態TTL の三点セットが実運用の安定化の鍵。
  • メトリクスで常時観測し、“速く壊して速く直す” 手順を用意しておく。

付録:設定プリセット(出発点)

# Flink
execution.checkpointing.interval=60000
execution.checkpointing.mode=EXACTLY_ONCE
execution.checkpointing.timeout=120000
execution.checkpointing.max-concurrent-checkpoints=1
execution.checkpointing.min-pause=30000

# Kafka(例)
transaction.timeout.ms=900000
max.in.flight.requests.per.connection=1
acks=all
retries=2147483647

# State Backend(バージョンに応じて適切に)
state.backend.latency-track.enabled=true
state.checkpoints.dir=s3://your-bucket/flink/checkpoints
state.savepoints.dir=s3://your-bucket/flink/savepoints

投稿者 kojiro777

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です