TL;DR
- Djangoは「認証・発券担当」:短命(5分)のJWTで安全に接続キーを配る。
- WebRTC DataChannelは“秒間多数の軽量通知”用:数ms〜数十msの低遅延で小さな更新を押し出す。
- gRPC Server‑Streamingは“重いAI結果を小分け返却”:推論途中でも断片を順次返すことで体感を速く。
- 要注意:TURN未設定(NAT越え失敗)/EnvoyのHTTP/1.1(ストリーム切断)/DataChannelで巨大データ送信(ブラウザ固まる)。
全体アーキテクチャ(役割分担)
flowchart LR
subgraph Client[Browser]
UI[UI]
DC[WebRTC DataChannel]
WS[Signaling (WS/HTTP)]
end
subgraph Django[Django API/Auth]
JWT[短命JWT発行]
SIG[Signaling仲介]
end
subgraph Edge[Envoy/Ingress]
ENVOY[Envoy Proxy\nHTTP/2 gRPC]
TURN[TURN/STUN]
end
subgraph ML[ML Inference]
GRPC[gRPC Server\n(推論/ストリーミング)]
end
UI-->WS
WS<-->SIG
SIG-->JWT
DC<-->TURN
UI<-->DC
UI<-->ENVOY
ENVOY<-->GRPC
- Signaling(要:WSまたはHTTP + 短命JWT)でピア情報を交換し、WebRTC接続を確立。
- 低遅延の小さい更新(ステータス/数値/カーソル等)は DataChannel でプッシュ。
- 重量級の推論結果(JSON/バイナリ)は gRPC Server‑Streaming で、Envoy 経由のHTTP/2ストリームとして分割返却。
- TURN はNAT/企業ネット越えの“最後の砦”。プロダクションでは必須前提。
よくあるつまずき → 即解決
症状 | 原因 | 速攻対処 |
---|---|---|
接続はできるが一部の利用者で常に失敗 | TURN未設定/443しか開かない企業ネット | coturnを立て、TLS 5349(または443)で TURN を提供。ICEサーバーをクライアントへ配布 |
gRPC の長時間ストリームが途中で切れる | EnvoyがHTTP/1.1で上流接続/idle_timeout | Clusterの http2_protocol_options を有効化、stream_idle_timeout を十分長く |
DataChannel送信でブラウザが固まる | 巨大ペイロードをそのまま送信/背圧未対応 | chunk化+dc.bufferedAmount による背圧制御、巨大データは gRPC に回す |
P2P接続がリレー経路ばかりで遅い | STUN/TURN優先順やネット環境 | ターゲット層のネット環境実測。TURNは冗長構成+地理的に近いリージョン配置 |
Django:短命JWTを発行(5分)
認証はDjangoが担当。短命トークンを配って“使い捨て”を徹底。
# views.py
from datetime import datetime, timedelta, timezone
import jwt
from django.http import JsonResponse
from django.contrib.auth.decorators import login_required
SECRET = "<rotate-me>" # 実運用はKeyRotation/Vault
ISSUER = "your-service"
AUD = "realtime-clients"
@login_required
def issue_token(request):
now = datetime.now(timezone.utc)
payload = {
"sub": str(request.user.id),
"iss": ISSUER,
"aud": AUD,
"jti": str(now.timestamp()),
"exp": int((now + timedelta(minutes=5)).timestamp()),
"scope": ["webrtc", "grpc"]
}
token = jwt.encode(payload, SECRET, algorithm="HS256")
return JsonResponse({"token": token})
ポイント
- 5分など短命+
iss
/aud
/jti
で再利用・誤用を抑制。 - 秘密鍵は定期ローテーション。KMS/Vault/Parameter Store 管理推奨。
- フロントは必要直前に取得し、シグナリング/ヘッダ/クエリで提示。
フロント:WebRTC DataChannel(軽量通知)
// signalingでSDP/ICE交換後…
const pc = new RTCPeerConnection({
iceServers: window.ICE_SERVERS, // Djangoが配る(下記参照)
});
const dc = pc.createDataChannel("updates", {
ordered: true,
maxRetransmits: 0, // 超低遅延用(必要に応じ調整)
});
dc.onmessage = (e) => renderUpdate(JSON.parse(e.data));
// 背圧:バッファ量を監視して送信を制御
const CHUNK = 8 * 1024; // 8KBずつ
const LOW_WATERMARK = 64 * 1024;
dc.bufferedAmountLowThreshold = LOW_WATERMARK;
function sendLarge(obj: any) {
const enc = new TextEncoder();
const data = enc.encode(JSON.stringify(obj));
for (let i = 0; i < data.length; i += CHUNK) {
const slice = data.slice(i, Math.min(i + CHUNK, data.length));
if (dc.bufferedAmount > LOW_WATERMARK) {
return new Promise<void>((resolve) => {
dc.addEventListener("bufferedamountlow", () => {
dc.send(slice);
resolve();
}, { once: true });
});
}
dc.send(slice);
}
}
送信の心得
- “頻繁で小さな”通知はDataChannelへ。
- 画像/動画/数MB級は DataChannelに乗せない(gRPC HTTP/2経由へ)。
bufferedAmount
を見て詰まり対策。
gRPC:Server‑Streamingで“途中経過を即返す”
proto 例
syntax = "proto3";
package inference;
service Inference {
rpc StreamPredict(PredictRequest) returns (stream PredictChunk);
}
message PredictRequest { string task_id = 1; bytes payload = 2; }
message PredictChunk { int32 index = 1; bytes partial = 2; bool done = 3; }
Python サーバ(抜粋)
import inference_pb2 as pb
import inference_pb2_grpc as rpc
class InferenceServicer(rpc.InferenceServicer):
def StreamPredict(self, request, context):
for i, part in enumerate(run_heavy_inference(request.payload)):
yield pb.PredictChunk(index=i, partial=part, done=False)
yield pb.PredictChunk(index=-1, partial=b"", done=True)
Python クライアント(ブラウザ→ゲートウェイ→gRPC の例)
import grpc, inference_pb2 as pb, inference_pb2_grpc as rpc
channel = grpc.insecure_channel("ml:50051")
stub = rpc.InferenceStub(channel)
for chunk in stub.StreamPredict(pb.PredictRequest(task_id="abc", payload=b"...")):
handle_partial(chunk)
フロントはFetch/WSでゲートウェイ(Django/Envoy)に接続し、ストリームを読みながら描画。
Envoy:HTTP/2でgRPCを正しく通す
static_resources:
listeners:
- name: listener_0
address:
socket_address: { address: 0.0.0.0, port_value: 8080 }
filter_chains:
- filters:
- name: envoy.filters.network.http_connection_manager
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
codec_type: AUTO
stat_prefix: ingress_http
route_config:
name: local_route
virtual_hosts:
- name: backend
domains: ["*"]
routes:
- match: { prefix: "/inference.Inference/" }
route:
cluster: grpc_cluster
timeout: 0s # ストリーム無期限
http_filters:
- name: envoy.filters.http.router
clusters:
- name: grpc_cluster
type: logical_dns
connect_timeout: 2s
lb_policy: ROUND_ROBIN
load_assignment:
cluster_name: grpc_cluster
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address: { address: ml, port_value: 50051 }
http2_protocol_options: {}
upstream_connection_options:
tcp_keepalive: { keepalive_time: 60 }
typed_extension_protocol_options:
envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
"@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
explicit_http_config:
http2_protocol_options:
max_concurrent_streams: 1000
common_http_protocol_options:
idle_timeout: 0s # ストリーム維持
チェック
http2_protocol_options: {}
をCluster側で必ず指定。timeout: 0s
/idle_timeout: 0s
で長時間ストリーム継続。- KeepAlive/
max_concurrent_streams
を適切化。
TURN(coturn):NAT越えの“最後の砦”
最低限の設定例(/etc/turnserver.conf)
listening-port=3478
tls-listening-port=5349
fingerprint
lt-cred-mech
realm=your-realm.example
use-auth-secret
static-auth-secret=<SHARED_SECRET> # Djangoと共有
user-quota=12
total-quota=1200
no-tlsv1
no-tlsv1_1
cert=/etc/letsencrypt/live/your.domain/fullchain.pem
pkey=/etc/letsencrypt/live/your.domain/privkey.pem
DjangoでICEサーバ情報をRESTで配布(TURN REST認証)
import base64, hmac, hashlib, time
from django.http import JsonResponse
TURN_SECRET = "<shared-with-coturn>"
TURN_URI = "turns:your.domain:5349?transport=tcp"
@login_required
def ice_servers(request):
# 5分トークン
expiry = int(time.time()) + 300
username = str(expiry)
digest = hmac.new(TURN_SECRET.encode(), username.encode(), hashlib.sha1).digest()
credential = base64.b64encode(digest).decode()
return JsonResponse({
"iceServers": [
{"urls": ["stun:stun.l.google.com:19302"]},
{"urls": [TURN_URI], "username": username, "credential": credential}
]
})
ブラウザ側
const { iceServers } = await fetch("/api/ice-servers", { credentials: "include" }).then(r => r.json());
const pc = new RTCPeerConnection({ iceServers });
運用Tips:TLS(5349/443)で提供、リージョンをユーザーに近づける。冗長化&ヘルスチェック。
運用メトリクス(まずはこれ)
- RTT(DataChannel ping)… 1〜2秒ごとに往復遅延を測り、p95/p99を見る。
- 接続再試行回数… ICE再試行/候補切替が多いなら TURN経路やネット制限を点検。
- Envoy gRPC エラー…
rx_reset
/messaging_error
/upstream_rq_timeout
をダッシュボード化。 - ストリーム継続時間… 長寿命のgRPCストリームが自然終了か強制終了かを分離集計。
- DataChannel bufferedAmount… 詰まりの早期検知(しきい値アラート)。
失敗しない実装の順番
- gRPC単体でServer‑Streamingをローカル完結で検証
- Envoyを噛ませてHTTP/2設定の地ならし
- DjangoでJWT/ICE配布のRESTを用意
- WebRTC Signaling(WS/HTTP)をDjango経由で整備
- DataChannelの背圧制御・再接続ポリシーを実装
- 本番ネットでTURNを有効化し広帯域/制限ネット双方で検証
トラブルシュート早見表
- 症状:ストリームが数分で切れる
見る場所:Envoyidle_timeout
/ 負荷分散の接続再利用
対処:idle_timeout: 0s
、上流HTTP/2固定、KeepAlive設定 - 症状:一部ネットワークで常に接続失敗
見る場所:ICE候補(Host/Reflexive/Relay)
対処:TURN(TLS)必須化、443/TCPフォールバック - 症状:ブラウザが固まる
見る場所:DataChannel payload/bufferedAmount
対処:chunk + 背圧、巨大データはgRPCへ
セキュリティ設計
- JWTは短命 + scope最小化、
aud/iss/jti
で検証強化。 - 秘密鍵はローテーション(kid付与 → 複数鍵受け入れ → 切替)。
- TURNの
static-auth-secret
は厳重管理、期限付き資格で配布。 - WebRTCはE2E暗号(SRTP/DTLS)、gRPCもTLS終端で二重化。
どのデータをどこで送る?(迷ったらこれ)
- DataChannel:カーソル位置、スコア、軽量なステータス、テキスト通知(\n< 10KB/回 で高頻度)。
- gRPC:推論JSON、画像断片、バイナリ結果、ログのまとまり(数十KB〜MB級)。
付録:簡易Signaling(Django Channels例・疑似)
# consumers.py (概念例)
from channels.generic.websocket import AsyncJsonWebsocketConsumer
from jwt import decode, InvalidTokenError
class SignalingConsumer(AsyncJsonWebsocketConsumer):
async def connect(self):
token = self.scope["query_string"].decode().split("token=")[-1]
try:
user = decode(token, SECRET, audience=AUD, algorithms=["HS256"]) # 検証
self.user_id = user["sub"]
except InvalidTokenError:
return await self.close()
await self.accept()
async def receive_json(self, content):
# SDP/ICEの交換を仲介(部屋IDや相手へ転送)
...
まとめ
「Djangoでセキュアに入場券を配る」→「WebRTCで超低遅延の軽通知」→「gRPCで重い結果を小分け返却」。この役割分担を守ると、リアルタイム性と安定性の“いいとこ取り”ができる。まずはローカルでストリーム → Envoy → TURN の順で固めよう。