対象:Kinesis / Kafka(+Pulsar / Redpanda / Flink / Kinesis Data Analytics)
ゴール:実運用でつまずきやすいポイントを避け、最短距離で“安定稼働”へ


TL;DR(最短まとめ)

  • 順序保証の単位を先に決める(例:device_id)。その単位をパーティションキーにする。
  • スループット↑=パーティション/シャード↑。ただし順序保証の粒度は粗くなる。
  • 低レイテンシ狙いは「小さなバッチ+短い待機」。高スループットは「大きなバッチ+待機長め」。
  • 監視は Kafka: Consumer Lag / Kinesis: IteratorAge と p95エンドツーエンド遅延が軸。
  • バックプレッシャーは必ず起こる前提。無限バッファ禁止、明示的な制御・ドロップ・リトライを設計に。

なぜパーティション設計が“最初の一手”なのか

  • イベント順序を守りたい単位(ユーザー、デバイス、セッション…)を決めてからキー設計をする。
  • Kafka:同じキー → 同じパーティション → 順序保持
  • Kinesis:同じPartitionKey → 同じシャード → 順序保持

ホットキー対策

  • 偏りが出たら user_id#0..3 のようにソルト付与で分散
  • 順序不要イベントはランダム化(ラウンドロビン)で広げる

レイテンシ vs スループット:つまみの回し方

Kafka(Producer)

  • 低レイテンシ寄せ:linger.ms=1–5, batch.size小さめ, compression=lz4/zstd
  • 高スループット寄せ:linger.ms=10–50, batch.size=128–512KB, compression=zstd
  • 可用性:acks=all + min.insync.replicas>=2(レイテンシやや増)

Kinesis

  • 低レイテンシ寄せ:KPLの RecordMaxBufferedTime=5–20ms、EFO(Enhanced Fan-Out)
  • 高スループット寄せ:Aggregation有効、MaxConnections増、シャード増(目安:1シャード=1MB/s or 1000 rec/s)

Flink / KDA(処理側)

  • ウィンドウを短く=新鮮だがコスト↑
  • チェックポイント短め=堅牢だが遅延↑

Djangoとつなぐ:最小の実装パターン

  • 同期APIでブロックしない
  • あふれたら429で守る
  • 壊れたデータはDBに入れない
# views.py(受信→非同期送信、溢れたら429)
from django.http import JsonResponse
from .producer import send_event, QueueFullError

def ingest_view(request):
    event = {
        "event_time": request.POST.get("event_time"),
        "user_id": request.user.id,
        "payload": request.POST.get("payload"),
    }
    try:
        send_event(topic="events", key=str(event["user_id"]), value=event)
        return JsonResponse({"ok": True})
    except QueueFullError:
        return JsonResponse({"error": "too busy"}, status=429)

スキーマ管理(壊れたJSONをDBに入れない)

  • **Schema Registry(Confluent / AWS Glue)**で後方互換を基本に
  • フィールド追加はOK(nullable/デフォルト前提)。削除・型変更・リネームはNG扱い
  • CI/CDで互換性チェックを自動化。不互換スキーマはリリース不可に

バックプレッシャー:処理が追いつかない時の正解

  • 入力制御:キュー長の上限・429返却・低優先度イベントは別ストリームへ
  • 処理能力↑:Kafkaはパーティション増+コンシューマ増、Kinesisはシャード増/EFO、FlinkはParallelism↑
  • バッチ化:小さなバッチで効率UP(遅延は増える、要SLA調整)

重複と再処理(Exactly-Onceの現実対応)

  • Kafka:enable.idempotence=true、必要ならトランザクション(遅延とコスト↑)
  • Kinesis:At-Least-Once前提。冪等性キー(event_id 等)で二重処理を排除
  • Django:Redis/DBで処理済みフラグ
# 冪等処理(Redis)
import redis
r = redis.Redis()

def process_event(event):
    eid = event["event_id"]
    if not r.setnx(f"processed:{eid}", 1):
        return  # 二重処理回避
    handle_event(event)

監視・アラート:見るべきはこの3つ

  • 遅延の山
    • Kafka: consumer_group_lag
    • Kinesis: GetRecords.IteratorAgeMilliseconds
  • 体感遅延
    • E2E遅延(イベント発生→処理完了のp95/p99)
  • 基盤の健康
    • Kafka: UnderReplicatedPartitions
    • Kinesis: *ThroughputExceeded(R/W)

アラート例

  • Lag > 10,000 かつ 5分継続
  • IteratorAge > 30秒 かつ 3分継続
  • p95 E2E遅延がSLA超過(例:>500ms/対話系, >5s/準リアルタイム)

よくある落とし穴と回避策

落とし穴どうなる回避策
ユーザーIDで固定キー(順序不要なのに)ホットキーで詰まるランダム化 or ソルト付与
なんでもリアルタイムコスト爆増&運用疲弊ユースケースごとにSLAを分ける
JSONなら自由でOKある日から落ちる/解析不能Schema Registryで後方互換を強制
無限バッファ障害時にメモリ爆死429/バックオフ/フォールバック
重複しない前提請求/通知が二重冪等性キー+重複排除ストア

デプロイ前の「5分点検」チェックリスト

  • 順序保証の単位が明文化され、キーに反映されている
  • 低/高レイテンシ設定(linger/batch or KPLパラメータ)がSLAに合致
  • Schema Registryの互換性チェックがCI/CDに組み込まれている
  • Lag/IteratorAge + p95 E2E遅延のダッシュボードとアラートがある
  • バックプレッシャー時の挙動(429/退避/再送)がテスト済み
  • 重複対策(冪等性キー+ストア)が用意されている

まとめ

  • 設計の出発点は 順序保証の単位
  • レイテンシ/スループットのつまみは バッチサイズと待機時間
  • 監視は Lag/IteratorAge + p95遅延、運用は スキーマ互換とバックプレッシャー対応が肝。

付録:初期パラメータの“当たり値”

  • Kafka(低遅延スタート):linger.ms=5, batch.size=65536, compression=zstd, acks=all
  • Kafka(高スループット):linger.ms=20, batch.size=262144
  • Kinesis(低遅延):RecordMaxBufferedTime=5–20ms, EFO
  • Kinesis(高スループット):Aggregation有効+シャード増

投稿者 kojiro777

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です