TL;DR(最短まとめ)
- ストリーミング基盤では「スキーマ=データの取り決め(約束ごと)」として扱う。
- 後方互換性を守るのが最優先。追加はOK、削除や型変更・名前変更はNG。
- Schema Registry / Glue Schema Registryで管理し、互換性チェックを自動化する。
- 大きな変更が必要なら新トピックを作って段階移行。
よくある失敗例
- 「JSONは自由に書ける」問題
→ 気軽にフィールドを変えたら翌日バッチが全滅。 - Excelでスキーマ管理
→ 実装とズレて誰も信用しなくなる。 - 小変更と思ったら破壊的変更
→ 後方互換切れでコンシューマが落ちる。
Kafka / Redpandaの場合
- Confluent Schema Registryを使う(Avro / Protobuf / JSON Schema対応)。
- 互換性モードは BACKWARD が基本(新しいスキーマでも古いデータを読める)。
- CI/CDに互換性チェックを組み込み、NGならリリースをストップ。
# schema.avsc を topic1 に追加できるか確認
curl -X POST http://schemaregistry:8081/compatibility/subjects/topic1-value/versions/latest \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"schema": "'"$(< schema.avsc)"'"}'
Kinesisの場合
- AWS Glue Schema Registryを利用。
- Producerは
put_record
時にスキーマIDを付与。Consumerは自動デシリアライズ。 - 互換性モードは BACKWARD_ALL を推奨。
Flink / Kinesis Data Analyticsの場合
- Schema Registryと連携し、テーブル定義を厳密化。
- フィールド追加は nullable で吸収。削除やリネームは新テーブル/新ジョブを起動。
Django統合Tips
- DBに“生JSON”を直に保存しない。
- 受信イベントはSchema Registryで検証してからDBへ。
- 検証失敗時はDLQ(Dead Letter Queue)へ退避。
from schema_registry.client import SchemaRegistryClient, schema
client = SchemaRegistryClient({"url": "http://schemaregistry:8081"})
schema_id, parsed = client.get_latest_schema("events-value")
def validate_and_save(event):
if not parsed.validate(event):
save_to_dlq(event) # NGデータはDLQへ
return
save_to_db(event) # OKならDB保存
運用チェックリスト ✅
- スキーマは必ずRegistryに集約している
- 互換性モードは BACKWARD/BACKWARD_ALL に固定
- CI/CDで自動チェックを走らせている
- Django側で検証&DLQ退避の仕組みあり
- 破壊的変更は必ず新トピックを用意して段階移行
まとめ
ストリーミング基盤は**「スキーマの契約社会」。安心して変更できるのは追加だけ**。削除やリネームは別レーンで。
RegistryとCI/CDを組み合わせ、「壊れない変更しか入らない世界」を作ることが安定運用の第一歩。