対象:Kinesis / Kafka(+Pulsar / Redpanda / Flink / Kinesis Data Analytics)
ゴール:実運用でつまずきやすいポイントを避け、最短距離で“安定稼働”へ
TL;DR(最短まとめ)
- 順序保証の単位を先に決める(例:device_id)。その単位をパーティションキーにする。
- スループット↑=パーティション/シャード↑。ただし順序保証の粒度は粗くなる。
- 低レイテンシ狙いは「小さなバッチ+短い待機」。高スループットは「大きなバッチ+待機長め」。
- 監視は Kafka: Consumer Lag / Kinesis: IteratorAge と p95エンドツーエンド遅延が軸。
- バックプレッシャーは必ず起こる前提。無限バッファ禁止、明示的な制御・ドロップ・リトライを設計に。
なぜパーティション設計が“最初の一手”なのか
- イベント順序を守りたい単位(ユーザー、デバイス、セッション…)を決めてからキー設計をする。
- Kafka:同じキー → 同じパーティション → 順序保持
- Kinesis:同じPartitionKey → 同じシャード → 順序保持
ホットキー対策
- 偏りが出たら
user_id#0..3
のようにソルト付与で分散 - 順序不要イベントはランダム化(ラウンドロビン)で広げる
レイテンシ vs スループット:つまみの回し方
Kafka(Producer)
- 低レイテンシ寄せ:linger.ms=1–5, batch.size小さめ, compression=lz4/zstd
- 高スループット寄せ:linger.ms=10–50, batch.size=128–512KB, compression=zstd
- 可用性:acks=all + min.insync.replicas>=2(レイテンシやや増)
Kinesis
- 低レイテンシ寄せ:KPLの RecordMaxBufferedTime=5–20ms、EFO(Enhanced Fan-Out)
- 高スループット寄せ:Aggregation有効、MaxConnections増、シャード増(目安:1シャード=1MB/s or 1000 rec/s)
Flink / KDA(処理側)
- ウィンドウを短く=新鮮だがコスト↑
- チェックポイント短め=堅牢だが遅延↑
Djangoとつなぐ:最小の実装パターン
- 同期APIでブロックしない
- あふれたら429で守る
- 壊れたデータはDBに入れない
# views.py(受信→非同期送信、溢れたら429)
from django.http import JsonResponse
from .producer import send_event, QueueFullError
def ingest_view(request):
event = {
"event_time": request.POST.get("event_time"),
"user_id": request.user.id,
"payload": request.POST.get("payload"),
}
try:
send_event(topic="events", key=str(event["user_id"]), value=event)
return JsonResponse({"ok": True})
except QueueFullError:
return JsonResponse({"error": "too busy"}, status=429)
スキーマ管理(壊れたJSONをDBに入れない)
- **Schema Registry(Confluent / AWS Glue)**で後方互換を基本に
- フィールド追加はOK(nullable/デフォルト前提)。削除・型変更・リネームはNG扱い
- CI/CDで互換性チェックを自動化。不互換スキーマはリリース不可に
バックプレッシャー:処理が追いつかない時の正解
- 入力制御:キュー長の上限・429返却・低優先度イベントは別ストリームへ
- 処理能力↑:Kafkaはパーティション増+コンシューマ増、Kinesisはシャード増/EFO、FlinkはParallelism↑
- バッチ化:小さなバッチで効率UP(遅延は増える、要SLA調整)
重複と再処理(Exactly-Onceの現実対応)
- Kafka:enable.idempotence=true、必要ならトランザクション(遅延とコスト↑)
- Kinesis:At-Least-Once前提。冪等性キー(event_id 等)で二重処理を排除
- Django:Redis/DBで処理済みフラグ
# 冪等処理(Redis)
import redis
r = redis.Redis()
def process_event(event):
eid = event["event_id"]
if not r.setnx(f"processed:{eid}", 1):
return # 二重処理回避
handle_event(event)
監視・アラート:見るべきはこの3つ
- 遅延の山
- Kafka: consumer_group_lag
- Kinesis: GetRecords.IteratorAgeMilliseconds
- 体感遅延
- E2E遅延(イベント発生→処理完了のp95/p99)
- 基盤の健康
- Kafka: UnderReplicatedPartitions
- Kinesis: *ThroughputExceeded(R/W)
アラート例
- Lag > 10,000 かつ 5分継続
- IteratorAge > 30秒 かつ 3分継続
- p95 E2E遅延がSLA超過(例:>500ms/対話系, >5s/準リアルタイム)
よくある落とし穴と回避策
落とし穴 | どうなる | 回避策 |
---|---|---|
ユーザーIDで固定キー(順序不要なのに) | ホットキーで詰まる | ランダム化 or ソルト付与 |
なんでもリアルタイム | コスト爆増&運用疲弊 | ユースケースごとにSLAを分ける |
JSONなら自由でOK | ある日から落ちる/解析不能 | Schema Registryで後方互換を強制 |
無限バッファ | 障害時にメモリ爆死 | 429/バックオフ/フォールバック |
重複しない前提 | 請求/通知が二重 | 冪等性キー+重複排除ストア |
デプロイ前の「5分点検」チェックリスト
- 順序保証の単位が明文化され、キーに反映されている
- 低/高レイテンシ設定(linger/batch or KPLパラメータ)がSLAに合致
- Schema Registryの互換性チェックがCI/CDに組み込まれている
- Lag/IteratorAge + p95 E2E遅延のダッシュボードとアラートがある
- バックプレッシャー時の挙動(429/退避/再送)がテスト済み
- 重複対策(冪等性キー+ストア)が用意されている
まとめ
- 設計の出発点は 順序保証の単位。
- レイテンシ/スループットのつまみは バッチサイズと待機時間。
- 監視は Lag/IteratorAge + p95遅延、運用は スキーマ互換とバックプレッシャー対応が肝。
付録:初期パラメータの“当たり値”
- Kafka(低遅延スタート):linger.ms=5, batch.size=65536, compression=zstd, acks=all
- Kafka(高スループット):linger.ms=20, batch.size=262144
- Kinesis(低遅延):RecordMaxBufferedTime=5–20ms, EFO
- Kinesis(高スループット):Aggregation有効+シャード増