TL;DR(最初に要点)
- 目的:重複なしで正しい集計・特徴量を作る(信頼できるデータ基盤)。
- コア:
CheckpointingMode.EXACTLY_ONCE
+ トランザクション対応シンク(Kafka等)。 - 運用の肝:状態TTLで肥大化を防ぐ/チェックポイントの安定化/復旧時の重複防止。
- アンチパターン:シンクが冪等でもトランザクション非対応だと、障害復旧で二重書き込みが起きがち。
なぜ大事?(ML/分析の観点)
- 特徴量の二重計算 → 学習・推論が劣化。
- 集計の二重カウント → KPIが信用できない。
- 一度壊れると後続全体が汚染 → 追跡・修復コストが爆増。
Exactly-once は「再実行があっても結果が一意である」ことを担保し、信頼できる特徴量・指標を作ります。
仕組みの全体像(Source → Operator → Sink)
- チェックポイント:演算子の状態・ソースのオフセットをスナップショット。
- 再処理時:最新の正常なスナップショットから復旧し、同じ入力境界から決定的に再計算。
- シンクのコミット:2相コミットで「このチェックポイント分だけ」原子に公開。
ポイント:“演算は重ねてOK、公開は一度だけ” の設計にする。
最小設定スニペット(Java)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 60 秒ごとにチェックポイント(Exactly-once)
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
// 失敗時の堅牢性
env.getCheckpointConfig()
.setMinPauseBetweenCheckpoints(30_000) // チェックポイント間の最小間隔
.setCheckpointTimeout(120_000) // タイムアウト
.setTolerableCheckpointFailureNumber(3) // 許容失敗回数
.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// Source/Transform/State …
状態(State)管理の実装要点
代表的な状態タイプ
- ValueState:単一値(累積カウンタ、最新特徴量など)
- ListState:順序リスト(最近N件など)
- MapState<K,V>:キー→値(特徴量辞書)
TTL(Time-to-Live)の付与
状態が増え続けると 復旧が遅くなる/ストレージ圧迫。TTLで古いキーを自動清掃。
StateTtlConfig ttl = StateTtlConfig
.newBuilder(Time.days(7)) // 7日で期限切れ
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
.build();
ValueStateDescriptor<Long> desc = new ValueStateDescriptor<>("cnt", Long.class);
desc.enableTimeToLive(ttl);
Tips:期限切れ状態の物理削除は遅延して行われる(GC的)。メモリ・バックエンドと間隔を監視。
シンク設計:冪等 vs. トランザクション
種別 | 例 | 特徴 | 復旧時の二重防止 |
---|---|---|---|
トランザクション対応 | Kafka Transactional(KafkaSink Exactly-once)、2PCシンク | 2相コミット(pre-commit→checkpoint→commit) | 強い(チェックポイントと連動) |
冪等API | PUT / UPSERT、主キー重複無視 | 同一キーの重複を無視 | 入力キー設計が重要 |
単純Append | S3/HDFSへファイル追記 | 追記時に重複し得る | 弱い:ファイル原子性や出力パターンの工夫が必要 |
Kafka(Exactly-once)例(Flink 1.15+ の KafkaSink
)
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers("kafka:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("features")
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // ← 重要
.setTransactionalIdPrefix("flink-features-") // Job固有で安定
.build();
stream.sinkTo(sink);
注意:TransactionalId は ジョブ再デプロイで変えない(並列度×タスクで一意)。
運用チェックリスト(5分で点検)
典型トラブル → 原因と対策
- 復旧後に二重書き込み
- 原因:シンクがトランザクション非対応 or キー設計が無い
- 対策:KafkaのExactly-onceに切替/出力に自然キーを導入してUPSERT
- チェックポイントが頻繁にタイムアウト
- 原因:状態巨大化、I/Oレイテンシ、ネットワーク輻輳
- 対策:
setMinPauseBetweenCheckpoints
で間隔をとる、インクリメンタルスナップショット、TTL強化
- 復旧に時間がかかる
- 原因:状態サイズが大きい
- 対策:TTL、キー粒度の見直し、局所集計→集約の段階化、サイド出力で不要データ分離
- トランザクション衝突(Kafka)
- 原因:
transactional.id
の衝突/並列度変更時の使い回し - 対策:並列度×サブタスクで一意に生成、ローリングアップデート時の整合を確認
- 原因:
観測とメトリクス(最低限)
- Checkpoint:成功率、
alignmentDuration
、保存サイズ - State:総サイズ、TTL清掃レート、遅延
- Backpressure:Operator単位の比率
- Sink:コミット時間、トランザクション失敗率
SLO例:p99 レイテンシ < 2s、Checkpoint成功率 ≥ 99.5%、復旧 ≤ 5分
テスト戦略(重複ゼロを確信する)
- 障害注入テスト:チェックポイント直後に TaskManager を kill → 復旧後に件数・キー重複ゼロを検証。
- 終端一貫性テスト:同じ入力を2回流しても出力が完全一致。
- 負荷テスト:チェックポイント間隔を短縮(例 10s)しながら安定性を確認。
よくある設計Q&A
Q. そもそも冪等APIだけで十分?
A. 可能。ただし入力キーの厳密さが命。バッチ・ストリームの混在や順序入替が起きると穴が出やすい。KafkaのExactly-onceは設計負荷を減らせる。
Q. TTLはいくつにすべき?
A. 集計ウィンドウ+遅延許容(例:7日ウィンドウ+遅延2日→TTL≥9日)。余裕を持たせて、実測で縮める。
Q. 状態を軽く保つコツ?
A. キーを局所化(前処理でバケット化)、重い状態は外部ストアへ、滑らかな集約(分位点スケッチ等)を採用。
運用Runbook(インシデント時)
- 症状確認:チェックポイント失敗/遅延、Backpressure急増、シンクコミット遅延。
- 即応:
- 間隔を一時的に延長(例 60s→120s)
- 並列度を+N(シンク側も追随)
- 原因切分:
- ネットワーク/ストレージ遅延 → I/Oメトリクス
- 状態肥大 → Stateサイズ、キー分布
- 復旧:
- 外部化Checkpointから
-s <path>
で再起動 - 直前のトランザクションが未コミットなら自動アボートされる(Kafka)
- 外部化Checkpointから
参考パターン:TwoPhaseCommitSinkFunction(概念)
自作シンクでExactly-onceをやる場合の骨格(簡略化)
public class My2PCSink<T> extends TwoPhaseCommitSinkFunction<T, TxnHandle, Void> {
protected TxnHandle beginTransaction() { /* start txn */ }
protected void invoke(TxnHandle txn, T value, Context ctx) { /* buffer */ }
protected void preCommit(TxnHandle txn) { /* flush to temp */ }
protected void commit(TxnHandle txn) { /* atomically publish */ }
protected void abort(TxnHandle txn) { /* rollback */ }
}
まとめ
- Exactly-once = 入力の再実行は許すが、公開は一意。
- チェックポイント × トランザクションシンク × 状態TTL の三点セットが実運用の安定化の鍵。
- メトリクスで常時観測し、“速く壊して速く直す” 手順を用意しておく。
付録:設定プリセット(出発点)
# Flink
execution.checkpointing.interval=60000
execution.checkpointing.mode=EXACTLY_ONCE
execution.checkpointing.timeout=120000
execution.checkpointing.max-concurrent-checkpoints=1
execution.checkpointing.min-pause=30000
# Kafka(例)
transaction.timeout.ms=900000
max.in.flight.requests.per.connection=1
acks=all
retries=2147483647
# State Backend(バージョンに応じて適切に)
state.backend.latency-track.enabled=true
state.checkpoints.dir=s3://your-bucket/flink/checkpoints
state.savepoints.dir=s3://your-bucket/flink/savepoints