TL;DR
- Beam は複数ランナー(Flink / Dataflow / Spark…)で“同じパイプライン”を動かせるが、**二重処理(重複出力)**を避けるには「状態管理 + チェックポイント + 冪等なシンク」の3点セットが必須。
@StateId
でキー付き状態を持ち、@TimerId
で「いつ」「どの条件で」状態をフラッシュするかを制御。- 出力先は withExactlyOnce() 対応や MERGE / UPSERT / 冪等キーを使えるシンクを選ぶ(BigQuery, Pub/Sub, Spanner, Bigtable など)。
なぜ大事?
- 同じパイプラインを違うランナー(Flink と Dataflow など)で実行しても、再実行・リトライ・フェイルオーバ時に出力がブレず、二重計上や欠損を起こしにくい設計にできるから。
- ストリーミングは遅延到着・重複到着・順序入れ替わりが常態。状態とタイマーで「いつ集計確定するか」を決め、チェックポイントで復旧し、冪等シンクで最終重複を抑える。
用語ミニ辞典
- 状態(State): キーごとに保持する途中結果(例:ユーザー別カウント)。
- タイマー(Timer): 一定時刻/イベントでコールバックし、状態を確定/フラッシュ。
- チェックポイント: オペレーターの処理位置と状態の保存点。復旧の基盤。
- 冪等(idempotent): 同じ入力を何度適用しても結果が変わらない性質。
- Exactly‑Once: 「出力の重複がない」≒ 実質 Effectively‑Once(実務では冪等シンクの助けで達成)。
最小イメージ(Java)
ポイント:
@StateId
で途中結果、@TimerId
で締め時刻、onTimer
でフラッシュ。
public class CountDoFn extends DoFn<KV<String, Long>, KV<String, Long>> {
@StateId("count") private final StateSpec<ValueState<Long>> countState = StateSpecs.value();
@TimerId("flush") private final TimerSpec flushTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
@ProcessElement
public void process(@Element KV<String, Long> e,
@Timestamp Instant ts,
@StateId("count") ValueState<Long> count,
@TimerId("flush") Timer timer,
OutputReceiver<KV<String, Long>> out) {
long acc = Optional.ofNullable(count.read()).orElse(0L) + e.getValue();
count.write(acc);
// ウィンドウ終了時刻 + 余裕 でフラッシュを予約
timer.set(ts.plus(Duration.standardMinutes(1)));
}
@OnTimer("flush")
public void onFlush(@StateId("count") ValueState<Long> count,
OutputReceiver<KV<String, Long>> out) {
Long acc = count.read();
if (acc != null) {
// ここで冪等キー付きで出力(下セクション参照)
// out.output(KV.of(key, acc));
count.clear();
}
}
}
原文にあった概念行:
@StateId("count") private final StateSpec<ValueState<Long>> countState = StateSpecs.value();
Python(概念)
class CountDoFn(beam.DoFn):
STATE = userstate.BagStateSpec('values', beam.coders.VarIntCoder())
TIMER = userstate.TimerSpec('flush', userstate.TimeDomain.WATERMARK)
def process(self, element, timestamp=beam.DoFn.TimestampParam,
state=beam.DoFn.StateParam(STATE),
timer=beam.DoFn.TimerParam(TIMER)):
state.add(element[1])
timer.set(timestamp.to_utc_datetime() + datetime.timedelta(minutes=1))
@on_timer('flush')
def flush(self, state=beam.DoFn.StateParam(STATE)):
total = sum(state.read())
state.clear()
# 冪等キーを付けて出力
シンク選定:Exactly‑Once / 冪等の実務パターン
BigQuery
- 推奨: 一時テーブルにステージ → ターゲットに MERGE(UPSERT)。
- 冪等キー(例:
event_id
,window_end + key
)で同一行を一意化。
MERGE dataset.table T
USING dataset.stage S
ON T.id = S.id
WHEN MATCHED THEN UPDATE SET total = S.total, updated_at = CURRENT_TIMESTAMP()
WHEN NOT MATCHED THEN INSERT (id, total, updated_at) VALUES (S.id, S.total, CURRENT_TIMESTAMP());
Pub/Sub
- メッセージ重複は起こりうる前提。属性に冪等キー(
dedup_key
)を付与し、 ダウンストリームで重複破棄(キャッシュ or テーブルで最近キーを記録)。 - Ordering Key を使うとキー内順序保証(ただし可用性とトレードオフ)。
Bigtable / Spanner / RDBMS
- 主キー UPSERT(
INSERT ... ON CONFLICT DO UPDATE
など)で冪等化。
ウィンドウ / 水位線(Watermark)とタイマー
- イベント時刻ウィンドウ + 許容遅延 を設定し、水位線が越えたら確定。
- タイマーは
EVENT_TIME
で「ウィンドウ終了 + 余裕」に設定し、遅延分を吸収。
[イベント時刻] ──┬────────┬────────┬────
窓A 窓B
[Watermark] ───────────▶(越えたら確定)
チェックポイント(保存点)は絶対オフにしない
- 落とし穴: 状態付きパイプラインで チェックポイント無効にすると、 障害復旧時に 入力リプレイ → 再集計 → 二重出力 の温床に。
- Flink/Dataflow では 定期スナップショットを有効化し、再開時に状態/進捗を復元。
ランナー別の実務メモ
Flink Runner
checkpointingInterval
を短すぎ/長すぎにしない(レイテンシとコストのバランス)。- Exactly‑Once シンク(例: Flink の Two‑Phase Commit)と相性良。
- Savepoint での安全なバージョンアップ/スケール変更を習慣に。
Dataflow Runner
- ストリーミング・エンジンでの状態バックアップと自動復旧が強力。
- Dataflow Shuffle / Streaming Engine 使用時の課金とレイテンシの見積もりを。
運用チェックリスト(配備前の5分点検)
- 冪等キーは決めた?(
event_id
orkey + window_end
など) - シンクの UPSERT/MERGE ルールは動作確認済み?
- チェックポイントは有効? 間隔は根拠ある値?
- 許容遅延・遅延到着のテストをした?(過去イベント再投入)
- 再実行(リプレイ)しても二重にならないことを E2E で検証した?
- DLQ(死行きキュー)と再処理手順は用意した?
- スキーマ互換(後方互換)は崩していない?
よくある落とし穴と対策
- (落とし穴)状態クリア忘れ → (対策)
onTimer
で確定出力後にstate.clear()
。 - 遅延イベント無視 → 許容遅延と「再出力の冪等性」をセットで設計。
- キー粒度が大きすぎ → ホットキー化。キー設計とスケーリング(シャーディング)を見直す。
- ウィンドウ確定が早すぎ → 遅延到着を吸えず、後から補正で二重。
- シンク単体で重複防止なし → アプリ側で dedup テーブル/キャッシュ を持つ。
テスト戦略(最小セット)
- 重複注入テスト: 同一イベントを N 回投入 → 出力1回であること。
- 遅延注入テスト: ウィンドウ閉鎖後に遅延イベント投入 → 期待通り補正/無視されること。
- リプレイテスト: チェックポイント復元 → 既出力分が再出力されないこと。
- E2Eテスト: 入口からシンクまで通しで、UPSERT/MERGE が正しく機能。
サンプル設計テンプレ(チェック用)
- 冪等キー:
id = hash(user_id || window_end || version)
- ウィンドウ: 5分 tumbling、許容遅延 10分
- タイマー:
window_end + 1min
- シンク: BigQuery(ステージ→MERGE)
- DLQ: Pub/Sub トピック
errors.my-pipeline
- メトリクス:
duplicate_drop_count
,late_event_count
,watermark_lag_ms
監視の最重要KPI
- Watermark Lag(水位線の遅れ)
- Late Data Rate(遅延到着比率)
- Dedup Drop Rate(重複除去率)
- Checkpoint Duration / Alignment Time(Flink)
- Backlog/Latency(Dataflow UI)
まとめ
- Beam の 状態 + タイマー + 冪等シンク は、ランナーを跨いだ Effectively‑Once の柱。
- チェックポイント無効化は厳禁。復旧/リプレイ時の二重出力を招く。
- 最後は E2Eで冪等性を証明するテストまでやってはじめて安心運用。
付録:実装の覚え書き
- State/Timer は**キー化後の **
ParDo
でのみ利用可。 - Dataflow/Flink いずれも状態サイズに注意(TTLや定期クリーンアップ)。
- BigQuery への直接
INSERT
連打は重複を招きやすい。ステージ→MERGE を基本に。 - Pub/Sub は 少量の重複が仕様。属性で冪等キー付与 + 下流で破棄の二段構え。