TL;DR

  • 詰まるのは前提。吸い込む量・処理する量・吐き出す量に明確な上限を設ける。
  • 入口で 計測 → 制限(レート/同時数) → シェディング(優先度/429) → 退避(DLQ/リトライ) の順に守る。
  • 監視の主語はエンドツーエンド遅延 (p95/p99) と Lag/IteratorAge。閾値は“時間”で決める。

設計の基本ライン

  • 計測点をそろえるevent_time(発生時刻)と ingest_time(受信時刻)を必ず記録。
  • 入口に速度標識:レートリミット・同時実行上限・キュー長上限を最初に決める。
  • 落ち先(DLQ)を先に作る:落とす先がないと“無限待ち”が発生する。
  • 再処理は冪等を前提event_id で二重実行を吸収。

プラットフォーム別の“詰まり対策”

Kafka / Redpanda

Producer

  • linger.msbatch.size は混雑時に上げると効率↑

Consumer

  • フェッチ量制御:max.partition.fetch.bytes, fetch.max.bytes
  • 取り出し速度:max.poll.records
  • 一時停止:consumer.pause()/resume() でホットキーを冷ます
  • 再均衡:partition.assignment.strategy=cooperative-sticky
# Python Consumer(ホットパーティションを一時停止)
msg = consumer.poll(timeout=1.0)
if is_hot_partition(msg.partition()):
    consumer.pause([msg.topic(), msg.partition()])
    time.sleep(0.2)
    consumer.resume([msg.topic(), msg.partition()])

Kinesis(標準/Enhanced Fan-Out)

  • 主指標:GetRecords.IteratorAgeMilliseconds
  • 調整つまみ:MaxRecords, IdleTimeBetweenReadsInMillis
  • 書き込み:KPL の RecordMaxBufferedTime と Aggregation を切替
  • シャード計画:IteratorAge が上昇し続けたらシャード増

Flink / Kinesis Data Analytics

  • 可視化:BackPressure比率、Busy Time、Watermark遅延
  • 緩和:bufferTimeout 調整、rebalance/rescale、operator chaining見直し
  • 遅延イベントはサイド出力へ

Pulsar(参考)

  • Key_Shared サブスクリプションでホットキー分散
  • Message Redelivery を短くして DLQ に早めに落とす

Django つなぎ込み(入口で“止める勇気”)

  • 非同期送信キューを挟み、満杯なら 429 で返す
  • 優先度キューを2本用意(高:同期フィードバック用、低:集計用)
# views.py(入口で保護)
def ingest(request):
    ev = {
        "event_id": uuid4().hex,
        "event_time": request.POST["t"],  # 必須
        "user_id": request.user.id,
        "payload": request.POST["p"],
    }
    if not high_priority_queue.offer(ev, timeout_ms=5):  # 入り口で5msだけ待つ
        return JsonResponse({"error":"too busy"}, status=429)
    return JsonResponse({"ok": True})

リトライ & DLQ 戦略(最小セット)

  • 即時リトライは最大3回(指数バックオフ+ジッタ)
  • 失敗イベントは DLQ(Kafkaトピック/Kinesis別ストリーム/S3)
  • DLQは 件数・最古時刻・原因トップ3 をダッシュボードで監視
  • 再処理ジョブは冪等&速度制限付きで夜間に流す

モニタリングの“3点セット”

  • E2E遅延 (p95/p99) = now() – event_time
  • バッファ/キュー長 = アプリ側キュー・Kafka Lag / Kinesis IteratorAge
  • 落ち率 = エラー率・DLQ流入率・再処理成功率

推奨アラート例(目安)

  • E2E p95 > 2s(対話系)or > 10s(準リアルタイム)を3分連続
  • Kafka consumer_group_lag > 10k を5分継続
  • Kinesis IteratorAge > 30s を3分継続
  • DLQ流入 > 0.5% を5分継続

ありがちアンチパターン → 置き換え方

アンチパターン何が起きる置き換え方
無限バッファ障害時に落ち切らず遅延爆発上限+拒否(429)+DLQ
入口で全部受ける下流が詰まると連鎖停止レート/同時数制限で手前で調整
リトライ無限ストームで二次障害回数制限+指数バックオフ
冪等なし再処理二重課金・二重通知event_id+処理済みストア
指標がLagだけ体感遅延が見えないE2E遅延 (p95/p99) を主語に

デプロイ前チェック(60秒版)

  • event_time を必ず持つ/p95 E2E遅延が可視化済み
  • 入り口にレート/同時数/キュー長の上限がある
  • 429や優先度キューでシェディングできる
  • DLQと再処理の経路が動作確認済み
  • Kafka: pause/resume/Kinesis: EFO or MaxRecords調整の手順がある

投稿者 kojiro777

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です