TL;DR

  • Beam は複数ランナー(Flink / Dataflow / Spark…)で“同じパイプライン”を動かせるが、**二重処理(重複出力)**を避けるには「状態管理 + チェックポイント + 冪等なシンク」の3点セットが必須。
  • @StateId でキー付き状態を持ち、@TimerId で「いつ」「どの条件で」状態をフラッシュするかを制御。
  • 出力先は withExactlyOnce() 対応MERGE / UPSERT / 冪等キーを使えるシンクを選ぶ(BigQuery, Pub/Sub, Spanner, Bigtable など)。

なぜ大事?

  • 同じパイプラインを違うランナー(Flink と Dataflow など)で実行しても、再実行・リトライ・フェイルオーバ時に出力がブレず、二重計上や欠損を起こしにくい設計にできるから。
  • ストリーミングは遅延到着・重複到着・順序入れ替わりが常態。状態とタイマーで「いつ集計確定するか」を決め、チェックポイントで復旧し、冪等シンクで最終重複を抑える。

用語ミニ辞典

  • 状態(State): キーごとに保持する途中結果(例:ユーザー別カウント)。
  • タイマー(Timer): 一定時刻/イベントでコールバックし、状態を確定/フラッシュ。
  • チェックポイント: オペレーターの処理位置と状態の保存点。復旧の基盤。
  • 冪等(idempotent): 同じ入力を何度適用しても結果が変わらない性質。
  • Exactly‑Once: 「出力の重複がない」≒ 実質 Effectively‑Once(実務では冪等シンクの助けで達成)。

最小イメージ(Java)

ポイント: @StateId で途中結果、@TimerId で締め時刻、onTimer でフラッシュ。

public class CountDoFn extends DoFn<KV<String, Long>, KV<String, Long>> {
  @StateId("count") private final StateSpec<ValueState<Long>> countState = StateSpecs.value();
  @TimerId("flush") private final TimerSpec flushTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(@Element KV<String, Long> e,
                      @Timestamp Instant ts,
                      @StateId("count") ValueState<Long> count,
                      @TimerId("flush") Timer timer,
                      OutputReceiver<KV<String, Long>> out) {
    long acc = Optional.ofNullable(count.read()).orElse(0L) + e.getValue();
    count.write(acc);
    // ウィンドウ終了時刻 + 余裕 でフラッシュを予約
    timer.set(ts.plus(Duration.standardMinutes(1)));
  }

  @OnTimer("flush")
  public void onFlush(@StateId("count") ValueState<Long> count,
                      OutputReceiver<KV<String, Long>> out) {
    Long acc = count.read();
    if (acc != null) {
      // ここで冪等キー付きで出力(下セクション参照)
      // out.output(KV.of(key, acc));
      count.clear();
    }
  }
}

原文にあった概念行:

@StateId("count") private final StateSpec<ValueState<Long>> countState = StateSpecs.value();

Python(概念)

class CountDoFn(beam.DoFn):
    STATE = userstate.BagStateSpec('values', beam.coders.VarIntCoder())
    TIMER = userstate.TimerSpec('flush', userstate.TimeDomain.WATERMARK)

    def process(self, element, timestamp=beam.DoFn.TimestampParam,
                state=beam.DoFn.StateParam(STATE),
                timer=beam.DoFn.TimerParam(TIMER)):
        state.add(element[1])
        timer.set(timestamp.to_utc_datetime() + datetime.timedelta(minutes=1))

    @on_timer('flush')
    def flush(self, state=beam.DoFn.StateParam(STATE)):
        total = sum(state.read())
        state.clear()
        # 冪等キーを付けて出力

シンク選定:Exactly‑Once / 冪等の実務パターン

BigQuery

  • 推奨: 一時テーブルにステージ → ターゲットに MERGE(UPSERT)
  • 冪等キー(例: event_id, window_end + key)で同一行を一意化。
MERGE dataset.table T
USING dataset.stage S
ON T.id = S.id
WHEN MATCHED THEN UPDATE SET total = S.total, updated_at = CURRENT_TIMESTAMP()
WHEN NOT MATCHED THEN INSERT (id, total, updated_at) VALUES (S.id, S.total, CURRENT_TIMESTAMP());

Pub/Sub

  • メッセージ重複は起こりうる前提。属性に冪等キーdedup_key)を付与し、 ダウンストリームで重複破棄(キャッシュ or テーブルで最近キーを記録)。
  • Ordering Key を使うとキー内順序保証(ただし可用性とトレードオフ)。

Bigtable / Spanner / RDBMS

  • 主キー UPSERTINSERT ... ON CONFLICT DO UPDATE など)で冪等化。

ウィンドウ / 水位線(Watermark)とタイマー

  • イベント時刻ウィンドウ + 許容遅延 を設定し、水位線が越えたら確定
  • タイマーは EVENT_TIME で「ウィンドウ終了 + 余裕」に設定し、遅延分を吸収
[イベント時刻]  ──┬────────┬────────┬────
                     窓A       窓B
[Watermark]      ───────────▶(越えたら確定)

チェックポイント(保存点)は絶対オフにしない

  • 落とし穴: 状態付きパイプラインで チェックポイント無効にすると、 障害復旧時に 入力リプレイ → 再集計 → 二重出力 の温床に。
  • Flink/Dataflow では 定期スナップショットを有効化し、再開時に状態/進捗を復元

ランナー別の実務メモ

Flink Runner

  • checkpointingInterval を短すぎ/長すぎにしない(レイテンシとコストのバランス)。
  • Exactly‑Once シンク(例: Flink の Two‑Phase Commit)と相性良。
  • Savepoint での安全なバージョンアップ/スケール変更を習慣に。

Dataflow Runner

  • ストリーミング・エンジンでの状態バックアップと自動復旧が強力。
  • Dataflow Shuffle / Streaming Engine 使用時の課金とレイテンシの見積もりを。

運用チェックリスト(配備前の5分点検)

  1. 冪等キーは決めた?(event_id or key + window_end など)
  2. シンクの UPSERT/MERGE ルールは動作確認済み?
  3. チェックポイントは有効? 間隔は根拠ある値?
  4. 許容遅延・遅延到着のテストをした?(過去イベント再投入)
  5. 再実行(リプレイ)しても二重にならないことを E2E で検証した?
  6. DLQ(死行きキュー)と再処理手順は用意した?
  7. スキーマ互換(後方互換)は崩していない?

よくある落とし穴と対策

  • (落とし穴)状態クリア忘れ → (対策)onTimer で確定出力後に state.clear()
  • 遅延イベント無視 → 許容遅延と「再出力の冪等性」をセットで設計。
  • キー粒度が大きすぎ → ホットキー化。キー設計とスケーリング(シャーディング)を見直す。
  • ウィンドウ確定が早すぎ → 遅延到着を吸えず、後から補正で二重。
  • シンク単体で重複防止なし → アプリ側で dedup テーブル/キャッシュ を持つ。

テスト戦略(最小セット)

  • 重複注入テスト: 同一イベントを N 回投入 → 出力1回であること。
  • 遅延注入テスト: ウィンドウ閉鎖後に遅延イベント投入 → 期待通り補正/無視されること。
  • リプレイテスト: チェックポイント復元 → 既出力分が再出力されないこと。
  • E2Eテスト: 入口からシンクまで通しで、UPSERT/MERGE が正しく機能。

サンプル設計テンプレ(チェック用)

  • 冪等キー: id = hash(user_id || window_end || version)
  • ウィンドウ: 5分 tumbling、許容遅延 10分
  • タイマー: window_end + 1min
  • シンク: BigQuery(ステージ→MERGE)
  • DLQ: Pub/Sub トピック errors.my-pipeline
  • メトリクス: duplicate_drop_count, late_event_count, watermark_lag_ms

監視の最重要KPI

  • Watermark Lag(水位線の遅れ)
  • Late Data Rate(遅延到着比率)
  • Dedup Drop Rate(重複除去率)
  • Checkpoint Duration / Alignment Time(Flink)
  • Backlog/Latency(Dataflow UI)

まとめ

  • Beam の 状態 + タイマー + 冪等シンク は、ランナーを跨いだ Effectively‑Once の柱。
  • チェックポイント無効化は厳禁。復旧/リプレイ時の二重出力を招く。
  • 最後は E2Eで冪等性を証明するテストまでやってはじめて安心運用。

付録:実装の覚え書き

  • State/Timer は**キー化後の **ParDo でのみ利用可。
  • Dataflow/Flink いずれも状態サイズに注意(TTLや定期クリーンアップ)。
  • BigQuery への直接 INSERT 連打は重複を招きやすい。ステージ→MERGE を基本に。
  • Pub/Sub は 少量の重複が仕様。属性で冪等キー付与 + 下流で破棄の二段構え。

投稿者 kojiro777

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です