ねらい:複数のモデル(例:画像分類のResNet+テキスト分類のBERT)を1つのBentoML Serviceに束ね、運用・配信・監視をシンプルにする。


TL;DR(最短まとめ)

  • モデルをバラバラに立てると、エンドポイント乱立・デプロイ管理地獄になりがち。
  • **BentoMLは1つのServiceに複数Runner(=モデル)**を登録できる。統合APIで推論受付が可能。
  • GPUは“共有資源”。重いモデルを無計画に同居させるとOOM(メモリ不足)。→ Runner単位でGPU/CPU割り当て・同時実行数・バッチングをコントロール。

こんなときに効く

  • マイクロサービスを乱立させたくない(1エンドポイントでまとめたい)。
  • APIゲートウェイや認証、レート制限、ログ/メトリクスをひとまとめに運用したい。
  • 軽量モデルと重量モデルを用途別に同居させ、CPU/GPUを使い分けたい。

アーキテクチャざっくり図

[ Client ]
    |
    v
[ Bento Service ]  --- Runners -->  [ ResNet50 (GPU) ]
        |                             [ BERT (CPU/GPU) ]
        +-- Metrics/Logs
        +-- Auth/Rate Limit

最小サンプル(service.py)

ポイント:Runnerを先に作成 → Serviceに登録 → API内でRunnerを呼ぶ。

# service.py
import bentoml
from bentoml.io import JSON

# 事前に `bentoml models list` で存在確認しておくこと
resnet_model = bentoml.pytorch.get("resnet50:latest")
bert_model = bentoml.transformers.get("bert:latest")

# 1) Runnerを作成(資源は後述の設定で制御)
resnet_runner = resnet_model.to_runner()
bert_runner = bert_model.to_runner()

# 2) Serviceに複数Runnerをまとめる
svc = bentoml.Service(
    name="multi_model_service",
    runners=[resnet_runner, bert_runner],
)

# 3) 統合エンドポイント
@svc.api(input=JSON(), output=JSON())
def classify(payload: dict):
    # 例:payload = {"image": "...", "text": "..."}
    image = payload.get("image")
    text = payload.get("text")

    # Runner.run(...) は同期呼び出し。非同期は async_run(...) を利用可能
    res = {}
    if image is not None:
        res["resnet"] = resnet_runner.run({"image": image})
    if text is not None:
        res["bert"] = bert_runner.run({"text": text})
    return res

非同期版のヒントasync def にして await resnet_runner.async_run(...) とすれば、I/O待ちや複数Runner並列呼び出しの効率が向上します。


リソース・同時実行・バッチングの制御(bentofile.yaml

Runner単位でGPU/CPUやワーカー数、バッチングを設定できます(例)。

# bentofile.yaml(一例)
service: "service:svc"
labels:
  project: multi-model
  owner: ml-team

# DockerやPython依存は省略(必要に応じて追加)

runners:
  - name: resnet_runner
    resources:
      gpu: 1           # GPU 1枚を割り当て(MIGやfractional GPUは環境に依存)
      cpu: "2"
      memory: "2Gi"
    traffic:
      max_concurrency: 2   # 同時実行を抑えてOOM予防
      max_batch_size: 8     # バッチング
      max_latency: 50ms     # バッチ待機の最大遅延

  - name: bert_runner
    resources:
      gpu: 0           # CPU運用に切り替え(軽量/低頻度用途なら有効)
      cpu: "4"
      memory: "4Gi"
    traffic:
      max_concurrency: 4
      max_batch_size: 4
      max_latency: 100ms

コツ:重いモデルは max_concurrency を低めに、軽量モデルは高めに。GPUメモリはRunner起動時に確保されがちなので、同居数は少なめから始めて計測で調整。


ビルド&起動&疎通

# 1) Bentoをビルド
bentoml build

# 2) コンテナ化して起動(例)
bentoml containerize multi_model_service:latest

docker run --gpus all -p 3000:3000 multi_model_service:latest

# 3) 疎通確認
curl -X POST http://localhost:3000/classify \
  -H 'Content-Type: application/json' \
  -d '{"image": "<BASE64_IMAGE>", "text": "今日はいい天気"}'

よくある落とし穴 → 先回り回避策

  1. 「全部GPU」で即Out Of Memory
  • 先にモデルサイズと推論時のVRAM使用量を把握。
  • 重いモデルは単独GPU or 別サービスに隔離。軽量モデルはCPU運用に寄せる。
  1. 同時実行の上げすぎ
  • p95/p99レイテンシとエラー率を監視。スループットだけ見ない。
  • Runnerごとに max_concurrency を分ける。
  1. バッチングの副作用(待ち時間増)
  • max_latency を短めにして“待ちすぎ”を防ぐ。
  • リアルタイム用途は小さなバッチ+短い待機
  1. ロード順の想定外
  • 起動時ログで各Runnerの初期化順を確認。重い初期化(モデルロード・トークナイザDL等)はウォームアップで先行実行。
  1. モデル更新でダウンタイム
  • 新旧BentoのBlue/Green切替やCanaryで段階移行。
  • モデルバージョンをレスポンスに含めて可観測性を上げる。

メトリクス設計(見るべき数字)

  • エンドツーエンド遅延:p95/p99。
  • Runner別:キュー長、同時実行数、バッチサイズ、エラー率。
  • GPU/CPU:利用率・メモリ使用量(nvidia-smi + エクスポート)。
  • スループット:RPS(用途別に分解)。

判断基準の例

  • p95がSLO(例:200ms)を超えたら、まず同時実行→バッチ設定を見直す。
  • 常時GPU 90%超&OOMなし → 微増で同時実行を上げる。
  • OOMが出る → モデル分離orCPU化を最優先。

パターン別レシピ

  • パターンA:軽量×2 + 重量×1
    • 重量モデルをGPU、軽量はCPU。入口は共通API。
    • 軽量Runnerは max_concurrency 高め、重量Runnerは低め。
  • パターンB:全モデルGPUだが小さめ
    • 全RunnerにGPUを割り当て。ただし同時実行をかなり絞る
    • 将来の増加に備え、モデルごと別サービス化も検討。
  • パターンC:トラフィック偏在(ホットキー)
    • キュー長/待機増を観測。軽量モデルで先に返す“ファストパス”を用意。
    • 重いモデルは非同期化 or 別キューへ退避。


参考キーワード

  • BentoML multi-model service, BentoML runners, GPU OOM, batching, blue/green, canary, p95/p99 latency.

付録:非同期並列の例(応答を速く)

# 非同期で並列実行したいケースの例
from bentoml.io import JSON

@svc.api(input=JSON(), output=JSON())
async def classify_async(payload: dict):
    image = payload.get("image")
    text = payload.get("text")

    tasks = []
    if image is not None:
        tasks.append(resnet_runner.async_run({"image": image}))
    if text is not None:
        tasks.append(bert_runner.async_run({"text": text}))

    results = await asyncio.gather(*tasks)
    out = {}
    i = 0
    if image is not None:
        out["resnet"] = results[i]; i += 1
    if text is not None:
        out["bert"] = results[i]
    return out

注意:I/O待ちやCPU処理が混在する場合、非同期化で体感遅延が大きく改善することが多い。

投稿者 kojiro777

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です