一言まとめ
processing.guarantee=exactly_once_v2安定した状態管理(state=途中結果のメモ)、そして 内部・出力トピックの冗長化 をそろえると、同じイベントを1回だけ安全に処理できます。


全体像(まずは絵で)

[Front] --event_id--> [Kafka topics]
                         |
                         v
                 [Kafka Streams]
                 (state store + Txn)
                         |
                         v
                 [Sink: Kafka/DB/Delta]
  • event_id: イベント固有ID(重複排除のカギ)
  • state store: RocksDB等に置かれる途中結果のメモ
  • Txn(トランザクション): 読み書きを“まとめて確定”する仕組み

これは何?(What)

Kafka StreamsでExactly-once v2を有効にし、状態と書き込みを二重にならないよう扱う設計。

なぜ大事?(Why)

  • 再実行や障害時に二重計上や欠損を防げる。
  • 売上/KPI/学習データの信頼性が上がる。
  • 運用で“数字がブレる”原因を減らせる。

どうやる?(How:5ステップ)

  1. 有効化processing.guarantee=exactly_once_v2 を設定。application.id は固定。
  2. 内部トピックを堅牢にreplication.factor を十分に(例: 本番は3)。
  3. 状態設計:キー偏りを避け、ステートストアのTTL/保持期間を決める。
  4. 出力の冪等性:下流がDBならUPSERT(同じIDは上書き)、Kafkaなら出力側もトランザクション。
  5. 監視:トランザクション失敗、レイテンシ、Lagをダッシュボード化。

設定スニペット(そのままコピペ)

# Exactly-once v2 を有効化
processing.guarantee=exactly_once_v2
application.id=orders-agg

# 冗長化(内部トピック)
default.replication.factor=3
min.insync.replicas=2

# 運用の安定化(例)
commit.interval.ms=1000
num.stream.threads=2

ポイント: application.id を変えると別アプリ扱いになり再初期化→再処理の原因に。


DSL最小例(概念)

StreamsBuilder b = new StreamsBuilder();
KStream<String, Event> events = b.stream("events");

KTable<Windowed<String>, Long> agg = events
  .groupByKey()
  .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
  .count();

agg.toStream().map((k,v) -> new KeyValue<>(k.key(), v))
   .to("agg-out");

Properties p = new Properties();
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "orders-agg");
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
p.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
// 内部トピックの冗長化はブローカー側設定 or AdminClientで作成

KafkaStreams app = new KafkaStreams(b.build(), p);
app.start();

ポイント: v2はプロデューサ/コンシューマのTx制御が簡素化され、運用が扱いやすい。


状態(state)の設計ヒント

  • キー偏り: 片寄るIDにはサルティング(例: userId#(hash%8))。
  • TTL/保持: 窓の幅+遅延許容(グレース)を基準に、古い状態は自動的に掃除。
  • 容量監視: RocksDBディレクトリのサイズ推移を可視化。

よくある落とし穴(と回避)

  • application.id変更: 再初期化→再処理→重複の温床。固定する。
  • レプリカ不足: ブローカー障害でTxが不安定→replication.factorを確保。
  • DB側にユニーク制約なし: UPSERTしても衝突→UNIQUE(event_id) を付ける。
  • キー偏り: 一部パーティションが遅い→サルティング+並列度調整。

1分点検(出す前に)


観測ポイント(運用の見える化)

  • トランザクション失敗率(transaction-abortログ/メトリクス)
  • record-lag、処理RPS、レイテンシ
  • ステートストア容量、再起動後のリカバリ時間

FAQミニ

Q. v1とv2の違いは?
A. v2は設計が簡潔で、Tx周りの安定性/回復性が改善。今からはv2を選ぶのが無難。

Q. 100%のExactly-onceを保証?
A. 実務ではブローカー/シンク/アプリの組み合わせ次第。非対応シンクはUPSERTで代替。

Q. 遅延データはどう扱う?
A. 窓の保持期間とグレース(許容遅延)を調整。遅延を多く許すほど状態は大きくなる。


まとめ

  • v2を有効化し、ID/状態/Txの三点をそろえる。
  • 内部トピックの冗長化で障害に強く。
  • UPSERT/ユニーク制約で“最後の砦”。
    この3点で、Kafka StreamsのExactly-onceは実務で使えるレベルに安定します。

投稿者 kojiro777

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です