一言まとめ
processing.guarantee=exactly_once_v2
と 安定した状態管理(state=途中結果のメモ)、そして 内部・出力トピックの冗長化 をそろえると、同じイベントを1回だけ安全に処理できます。
全体像(まずは絵で)
[Front] --event_id--> [Kafka topics]
|
v
[Kafka Streams]
(state store + Txn)
|
v
[Sink: Kafka/DB/Delta]
- event_id: イベント固有ID(重複排除のカギ)
- state store: RocksDB等に置かれる途中結果のメモ
- Txn(トランザクション): 読み書きを“まとめて確定”する仕組み
これは何?(What)
Kafka StreamsでExactly-once v2を有効にし、状態と書き込みを二重にならないよう扱う設計。
なぜ大事?(Why)
- 再実行や障害時に二重計上や欠損を防げる。
- 売上/KPI/学習データの信頼性が上がる。
- 運用で“数字がブレる”原因を減らせる。
どうやる?(How:5ステップ)
- 有効化:
processing.guarantee=exactly_once_v2
を設定。application.id
は固定。 - 内部トピックを堅牢に:
replication.factor
を十分に(例: 本番は3)。 - 状態設計:キー偏りを避け、ステートストアのTTL/保持期間を決める。
- 出力の冪等性:下流がDBならUPSERT(同じIDは上書き)、Kafkaなら出力側もトランザクション。
- 監視:トランザクション失敗、レイテンシ、Lagをダッシュボード化。
設定スニペット(そのままコピペ)
# Exactly-once v2 を有効化
processing.guarantee=exactly_once_v2
application.id=orders-agg
# 冗長化(内部トピック)
default.replication.factor=3
min.insync.replicas=2
# 運用の安定化(例)
commit.interval.ms=1000
num.stream.threads=2
ポイント:
application.id
を変えると別アプリ扱いになり再初期化→再処理の原因に。
DSL最小例(概念)
StreamsBuilder b = new StreamsBuilder();
KStream<String, Event> events = b.stream("events");
KTable<Windowed<String>, Long> agg = events
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.count();
agg.toStream().map((k,v) -> new KeyValue<>(k.key(), v))
.to("agg-out");
Properties p = new Properties();
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "orders-agg");
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
p.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
// 内部トピックの冗長化はブローカー側設定 or AdminClientで作成
KafkaStreams app = new KafkaStreams(b.build(), p);
app.start();
ポイント: v2はプロデューサ/コンシューマのTx制御が簡素化され、運用が扱いやすい。
状態(state)の設計ヒント
- キー偏り: 片寄るIDにはサルティング(例:
userId#(hash%8)
)。 - TTL/保持: 窓の幅+遅延許容(グレース)を基準に、古い状態は自動的に掃除。
- 容量監視: RocksDBディレクトリのサイズ推移を可視化。
よくある落とし穴(と回避)
application.id
変更: 再初期化→再処理→重複の温床。固定する。- レプリカ不足: ブローカー障害でTxが不安定→
replication.factor
を確保。 - DB側にユニーク制約なし: UPSERTしても衝突→
UNIQUE(event_id)
を付ける。 - キー偏り: 一部パーティションが遅い→サルティング+並列度調整。
1分点検(出す前に)
観測ポイント(運用の見える化)
- トランザクション失敗率(
transaction-abort
ログ/メトリクス) record-lag
、処理RPS、レイテンシ- ステートストア容量、再起動後のリカバリ時間
FAQミニ
Q. v1とv2の違いは?
A. v2は設計が簡潔で、Tx周りの安定性/回復性が改善。今からはv2を選ぶのが無難。
Q. 100%のExactly-onceを保証?
A. 実務ではブローカー/シンク/アプリの組み合わせ次第。非対応シンクはUPSERTで代替。
Q. 遅延データはどう扱う?
A. 窓の保持期間とグレース(許容遅延)を調整。遅延を多く許すほど状態は大きくなる。
まとめ
- v2を有効化し、ID/状態/Txの三点をそろえる。
- 内部トピックの冗長化で障害に強く。
- UPSERT/ユニーク制約で“最後の砦”。
この3点で、Kafka StreamsのExactly-onceは実務で使えるレベルに安定します。