TL;DR

  • Djangoは「認証・発券担当」:短命(5分)のJWTで安全に接続キーを配る。
  • WebRTC DataChannelは“秒間多数の軽量通知”用:数ms〜数十msの低遅延で小さな更新を押し出す。
  • gRPC Server‑Streamingは“重いAI結果を小分け返却”:推論途中でも断片を順次返すことで体感を速く。
  • 要注意:TURN未設定(NAT越え失敗)/EnvoyのHTTP/1.1(ストリーム切断)/DataChannelで巨大データ送信(ブラウザ固まる)。

全体アーキテクチャ(役割分担)

flowchart LR
  subgraph Client[Browser]
    UI[UI]
    DC[WebRTC DataChannel]
    WS[Signaling (WS/HTTP)]
  end
  subgraph Django[Django API/Auth]
    JWT[短命JWT発行]
    SIG[Signaling仲介]
  end
  subgraph Edge[Envoy/Ingress]
    ENVOY[Envoy Proxy\nHTTP/2 gRPC]
    TURN[TURN/STUN]
  end
  subgraph ML[ML Inference]
    GRPC[gRPC Server\n(推論/ストリーミング)]
  end

  UI-->WS
  WS<-->SIG
  SIG-->JWT
  DC<-->TURN
  UI<-->DC
  UI<-->ENVOY
  ENVOY<-->GRPC
  • Signaling(要:WSまたはHTTP + 短命JWT)でピア情報を交換し、WebRTC接続を確立。
  • 低遅延の小さい更新(ステータス/数値/カーソル等)は DataChannel でプッシュ。
  • 重量級の推論結果(JSON/バイナリ)は gRPC Server‑Streaming で、Envoy 経由のHTTP/2ストリームとして分割返却。
  • TURN はNAT/企業ネット越えの“最後の砦”。プロダクションでは必須前提。

よくあるつまずき → 即解決

症状原因速攻対処
接続はできるが一部の利用者で常に失敗TURN未設定/443しか開かない企業ネットcoturnを立て、TLS 5349(または443)で TURN を提供。ICEサーバーをクライアントへ配布
gRPC の長時間ストリームが途中で切れるEnvoyがHTTP/1.1で上流接続/idle_timeoutClusterの http2_protocol_options を有効化、stream_idle_timeout を十分長く
DataChannel送信でブラウザが固まる巨大ペイロードをそのまま送信/背圧未対応chunk化dc.bufferedAmount による背圧制御、巨大データは gRPC に回す
P2P接続がリレー経路ばかりで遅いSTUN/TURN優先順やネット環境ターゲット層のネット環境実測。TURNは冗長構成+地理的に近いリージョン配置

Django:短命JWTを発行(5分)

認証はDjangoが担当。短命トークンを配って“使い捨て”を徹底。

# views.py
from datetime import datetime, timedelta, timezone
import jwt
from django.http import JsonResponse
from django.contrib.auth.decorators import login_required

SECRET = "<rotate-me>"  # 実運用はKeyRotation/Vault
ISSUER = "your-service"
AUD = "realtime-clients"

@login_required
def issue_token(request):
    now = datetime.now(timezone.utc)
    payload = {
        "sub": str(request.user.id),
        "iss": ISSUER,
        "aud": AUD,
        "jti": str(now.timestamp()),
        "exp": int((now + timedelta(minutes=5)).timestamp()),
        "scope": ["webrtc", "grpc"]
    }
    token = jwt.encode(payload, SECRET, algorithm="HS256")
    return JsonResponse({"token": token})

ポイント

  • 5分など短命+iss/aud/jti再利用・誤用を抑制。
  • 秘密鍵は定期ローテーション。KMS/Vault/Parameter Store 管理推奨。
  • フロントは必要直前に取得し、シグナリング/ヘッダ/クエリで提示。

フロント:WebRTC DataChannel(軽量通知)

// signalingでSDP/ICE交換後…
const pc = new RTCPeerConnection({
  iceServers: window.ICE_SERVERS, // Djangoが配る(下記参照)
});

const dc = pc.createDataChannel("updates", {
  ordered: true,
  maxRetransmits: 0, // 超低遅延用(必要に応じ調整)
});

dc.onmessage = (e) => renderUpdate(JSON.parse(e.data));

// 背圧:バッファ量を監視して送信を制御
const CHUNK = 8 * 1024; // 8KBずつ
const LOW_WATERMARK = 64 * 1024;

dc.bufferedAmountLowThreshold = LOW_WATERMARK;
function sendLarge(obj: any) {
  const enc = new TextEncoder();
  const data = enc.encode(JSON.stringify(obj));
  for (let i = 0; i < data.length; i += CHUNK) {
    const slice = data.slice(i, Math.min(i + CHUNK, data.length));
    if (dc.bufferedAmount > LOW_WATERMARK) {
      return new Promise<void>((resolve) => {
        dc.addEventListener("bufferedamountlow", () => {
          dc.send(slice);
          resolve();
        }, { once: true });
      });
    }
    dc.send(slice);
  }
}

送信の心得

  • 頻繁で小さな”通知はDataChannelへ。
  • 画像/動画/数MB級は DataChannelに乗せない(gRPC HTTP/2経由へ)。
  • bufferedAmount を見て詰まり対策

gRPC:Server‑Streamingで“途中経過を即返す”

proto 例

syntax = "proto3";
package inference;

service Inference {
  rpc StreamPredict(PredictRequest) returns (stream PredictChunk);
}

message PredictRequest { string task_id = 1; bytes payload = 2; }
message PredictChunk { int32 index = 1; bytes partial = 2; bool done = 3; }

Python サーバ(抜粋)

import inference_pb2 as pb
import inference_pb2_grpc as rpc

class InferenceServicer(rpc.InferenceServicer):
    def StreamPredict(self, request, context):
        for i, part in enumerate(run_heavy_inference(request.payload)):
            yield pb.PredictChunk(index=i, partial=part, done=False)
        yield pb.PredictChunk(index=-1, partial=b"", done=True)

Python クライアント(ブラウザ→ゲートウェイ→gRPC の例)

import grpc, inference_pb2 as pb, inference_pb2_grpc as rpc

channel = grpc.insecure_channel("ml:50051")
stub = rpc.InferenceStub(channel)
for chunk in stub.StreamPredict(pb.PredictRequest(task_id="abc", payload=b"...")):
    handle_partial(chunk)

フロントはFetch/WSでゲートウェイ(Django/Envoy)に接続し、ストリームを読みながら描画


Envoy:HTTP/2でgRPCを正しく通す

static_resources:
  listeners:
    - name: listener_0
      address:
        socket_address: { address: 0.0.0.0, port_value: 8080 }
      filter_chains:
        - filters:
            - name: envoy.filters.network.http_connection_manager
              typed_config:
                "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
                codec_type: AUTO
                stat_prefix: ingress_http
                route_config:
                  name: local_route
                  virtual_hosts:
                    - name: backend
                      domains: ["*"]
                      routes:
                        - match: { prefix: "/inference.Inference/" }
                          route:
                            cluster: grpc_cluster
                            timeout: 0s  # ストリーム無期限
                http_filters:
                  - name: envoy.filters.http.router
  clusters:
    - name: grpc_cluster
      type: logical_dns
      connect_timeout: 2s
      lb_policy: ROUND_ROBIN
      load_assignment:
        cluster_name: grpc_cluster
        endpoints:
          - lb_endpoints:
              - endpoint:
                  address:
                    socket_address: { address: ml, port_value: 50051 }
      http2_protocol_options: {}
      upstream_connection_options:
        tcp_keepalive: { keepalive_time: 60 }
      typed_extension_protocol_options:
        envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
          "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
          explicit_http_config:
            http2_protocol_options:
              max_concurrent_streams: 1000
      common_http_protocol_options:
        idle_timeout: 0s  # ストリーム維持

チェック

  • http2_protocol_options: {}Cluster側で必ず指定。
  • timeout: 0sidle_timeout: 0s長時間ストリーム継続
  • KeepAlive/max_concurrent_streams を適切化。

TURN(coturn):NAT越えの“最後の砦”

最低限の設定例(/etc/turnserver.conf)

listening-port=3478
tls-listening-port=5349
fingerprint
lt-cred-mech
realm=your-realm.example
use-auth-secret
static-auth-secret=<SHARED_SECRET>  # Djangoと共有
user-quota=12
total-quota=1200
no-tlsv1
no-tlsv1_1
cert=/etc/letsencrypt/live/your.domain/fullchain.pem
pkey=/etc/letsencrypt/live/your.domain/privkey.pem

DjangoでICEサーバ情報をRESTで配布(TURN REST認証)

import base64, hmac, hashlib, time
from django.http import JsonResponse

TURN_SECRET = "<shared-with-coturn>"
TURN_URI = "turns:your.domain:5349?transport=tcp"

@login_required
def ice_servers(request):
    # 5分トークン
    expiry = int(time.time()) + 300
    username = str(expiry)
    digest = hmac.new(TURN_SECRET.encode(), username.encode(), hashlib.sha1).digest()
    credential = base64.b64encode(digest).decode()
    return JsonResponse({
      "iceServers": [
        {"urls": ["stun:stun.l.google.com:19302"]},
        {"urls": [TURN_URI], "username": username, "credential": credential}
      ]
    })

ブラウザ側

const { iceServers } = await fetch("/api/ice-servers", { credentials: "include" }).then(r => r.json());
const pc = new RTCPeerConnection({ iceServers });

運用Tips:TLS(5349/443)で提供、リージョンをユーザーに近づける。冗長化&ヘルスチェック。


運用メトリクス(まずはこれ)

  • RTT(DataChannel ping)… 1〜2秒ごとに往復遅延を測り、p95/p99を見る。
  • 接続再試行回数… ICE再試行/候補切替が多いなら TURN経路ネット制限を点検。
  • Envoy gRPC エラーrx_reset / messaging_error / upstream_rq_timeout をダッシュボード化。
  • ストリーム継続時間… 長寿命のgRPCストリームが自然終了強制終了かを分離集計。
  • DataChannel bufferedAmount… 詰まりの早期検知(しきい値アラート)。

失敗しない実装の順番

  1. gRPC単体でServer‑Streamingをローカル完結で検証
  2. Envoyを噛ませてHTTP/2設定の地ならし
  3. DjangoでJWT/ICE配布のRESTを用意
  4. WebRTC Signaling(WS/HTTP)をDjango経由で整備
  5. DataChannelの背圧制御・再接続ポリシーを実装
  6. 本番ネットでTURNを有効化し広帯域/制限ネット双方で検証

トラブルシュート早見表

  • 症状:ストリームが数分で切れる
    見る場所:Envoy idle_timeout / 負荷分散の接続再利用
    対処idle_timeout: 0s、上流HTTP/2固定、KeepAlive設定
  • 症状:一部ネットワークで常に接続失敗
    見る場所:ICE候補(Host/Reflexive/Relay)
    対処:TURN(TLS)必須化、443/TCPフォールバック
  • 症状:ブラウザが固まる
    見る場所:DataChannel payload/bufferedAmount
    対処chunk + 背圧、巨大データはgRPCへ

セキュリティ設計

  • JWTは短命 + scope最小化aud/iss/jti で検証強化。
  • 秘密鍵はローテーション(kid付与 → 複数鍵受け入れ → 切替)。
  • TURNのstatic-auth-secret厳重管理、期限付き資格で配布。
  • WebRTCはE2E暗号(SRTP/DTLS)、gRPCもTLS終端で二重化。


どのデータをどこで送る?(迷ったらこれ)

  • DataChannel:カーソル位置、スコア、軽量なステータス、テキスト通知(\n< 10KB/回 で高頻度)。
  • gRPC:推論JSON、画像断片、バイナリ結果、ログのまとまり(数十KB〜MB級)。

付録:簡易Signaling(Django Channels例・疑似)

# consumers.py (概念例)
from channels.generic.websocket import AsyncJsonWebsocketConsumer
from jwt import decode, InvalidTokenError

class SignalingConsumer(AsyncJsonWebsocketConsumer):
    async def connect(self):
        token = self.scope["query_string"].decode().split("token=")[-1]
        try:
            user = decode(token, SECRET, audience=AUD, algorithms=["HS256"])  # 検証
            self.user_id = user["sub"]
        except InvalidTokenError:
            return await self.close()
        await self.accept()

    async def receive_json(self, content):
        # SDP/ICEの交換を仲介(部屋IDや相手へ転送)
        ...

まとめ

「Djangoでセキュアに入場券を配る」→「WebRTCで超低遅延の軽通知」→「gRPCで重い結果を小分け返却」。この役割分担を守ると、リアルタイム性と安定性の“いいとこ取り”ができる。まずはローカルでストリーム → Envoy → TURN の順で固めよう。

投稿者 kojiro777

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です