TL;DR(最短まとめ)

  • ストリーミング基盤では「スキーマ=データの取り決め(約束ごと)」として扱う。
  • 後方互換性を守るのが最優先。追加はOK、削除や型変更・名前変更はNG。
  • Schema Registry / Glue Schema Registryで管理し、互換性チェックを自動化する。
  • 大きな変更が必要なら新トピックを作って段階移行

よくある失敗例

  • 「JSONは自由に書ける」問題
    → 気軽にフィールドを変えたら翌日バッチが全滅。
  • Excelでスキーマ管理
    → 実装とズレて誰も信用しなくなる。
  • 小変更と思ったら破壊的変更
    → 後方互換切れでコンシューマが落ちる。

Kafka / Redpandaの場合

  • Confluent Schema Registryを使う(Avro / Protobuf / JSON Schema対応)。
  • 互換性モードは BACKWARD が基本(新しいスキーマでも古いデータを読める)。
  • CI/CDに互換性チェックを組み込み、NGならリリースをストップ。
# schema.avsc を topic1 に追加できるか確認
curl -X POST http://schemaregistry:8081/compatibility/subjects/topic1-value/versions/latest \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema": "'"$(< schema.avsc)"'"}'

Kinesisの場合

  • AWS Glue Schema Registryを利用。
  • Producerは put_record 時にスキーマIDを付与。Consumerは自動デシリアライズ。
  • 互換性モードは BACKWARD_ALL を推奨。

Flink / Kinesis Data Analyticsの場合

  • Schema Registryと連携し、テーブル定義を厳密化。
  • フィールド追加は nullable で吸収。削除やリネームは新テーブル/新ジョブを起動。

Django統合Tips

  • DBに“生JSON”を直に保存しない。
  • 受信イベントはSchema Registryで検証してからDBへ。
  • 検証失敗時はDLQ(Dead Letter Queue)へ退避
from schema_registry.client import SchemaRegistryClient, schema

client = SchemaRegistryClient({"url": "http://schemaregistry:8081"})
schema_id, parsed = client.get_latest_schema("events-value")

def validate_and_save(event):
    if not parsed.validate(event):
        save_to_dlq(event)  # NGデータはDLQへ
        return
    save_to_db(event)  # OKならDB保存

運用チェックリスト ✅

  • スキーマは必ずRegistryに集約している
  • 互換性モードは BACKWARD/BACKWARD_ALL に固定
  • CI/CDで自動チェックを走らせている
  • Django側で検証&DLQ退避の仕組みあり
  • 破壊的変更は必ず新トピックを用意して段階移行

まとめ

ストリーミング基盤は**「スキーマの契約社会」安心して変更できるのは追加だけ**。削除やリネームは別レーンで。
RegistryとCI/CDを組み合わせ、「壊れない変更しか入らない世界」を作ることが安定運用の第一歩。

投稿者 kojiro777

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です