doc/source/ray-overview/examples/e2e-multimodal-ai-workloads/notebooks/03-Online-Serving.ipynb
This tutorial (code: https://github.com/anyscale/multimodal-ai) launches an online service that deploys the trained model to generate predictions and autoscales based on incoming traffic.
%%bash
pip install -q -r /home/ray/default/requirements.txt
pip install -q -e /home/ray/default/doggos
Note: A kernel restart may be required for all dependencies to become available.
If using uv, then:

1. Toggle off the workspace-managed pip packages (Dependencies tab up top > Toggle off Pip packages). There's no need to run the pip install commands above.
2. Switch the notebook kernel to the project's venv (click base (Python x.yy.zz) in the top right corner of the notebook > Select Another Kernel > Python Environments... > Create Python Environment > Venv > Use Existing) and done! Now all the notebook's cells will use the virtual env.
3. Use uv run instead of python by adding the following before initializing Ray:

import os
os.environ.pop("RAY_RUNTIME_ENV_HOOK", None)
import ray
ray.init(runtime_env={"py_executable": "uv run", "working_dir": "/home/ray/default"})
%load_ext autoreload
%autoreload all
import os
import ray
import sys
sys.path.append(os.path.abspath("../doggos/"))
# If using UV
# os.environ.pop("RAY_RUNTIME_ENV_HOOK", None)
# ray.init(runtime_env={"py_executable": "uv run", "working_dir": "/home/ray/default"})
import os
from fastapi import FastAPI
import mlflow
import requests
from starlette.requests import Request
from urllib.parse import urlparse
from ray import serve
import numpy as np
from PIL import Image
import torch
from transformers import CLIPModel, CLIPProcessor
from doggos.infer import TorchPredictor
from doggos.model import collate_fn
from doggos.utils import url_to_array
First, create a deployment for the trained model that generates a probability distribution for a given image URL. You can specify the compute you want to use with ray_actor_options, and how you want to horizontally scale this specific deployment with num_replicas.
@serve.deployment(
    num_replicas=1,
ray_actor_options={
"num_gpus": 1,
"accelerator_type": "T4",
},
)
class ClassPredictor:
def __init__(self, model_id, artifacts_dir, device="cuda"):
"""Initialize the model."""
        # Embedding model
self.processor = CLIPProcessor.from_pretrained(model_id)
self.model = CLIPModel.from_pretrained(model_id)
self.model.to(device=device)
self.device = device
# Trained classifier
self.predictor = TorchPredictor.from_artifacts_dir(artifacts_dir=artifacts_dir)
self.preprocessor = self.predictor.preprocessor
def get_probabilities(self, url):
image = Image.fromarray(np.uint8(url_to_array(url=url))).convert("RGB")
inputs = self.processor(images=[image], return_tensors="pt", padding=True).to(self.device)
with torch.inference_mode():
embedding = self.model.get_image_features(**inputs).cpu().numpy()
outputs = self.predictor.predict_probabilities(
collate_fn({"embedding": embedding}))
return {"probabilities": outputs["probabilities"][0]}
Ray Serve makes it easy to do model composition, where you can compose multiple deployments containing ML models or business logic into a single application. You can independently scale and configure each of your deployments, even with fractional resources.
# Define app.
api = FastAPI(
title="doggos",
description="classify your dog",
version="0.1",
)
@serve.deployment
@serve.ingress(api)
class Doggos:
def __init__(self, classifier):
self.classifier = classifier
@api.post("/predict/")
async def predict(self, request: Request):
data = await request.json()
probabilities = await self.classifier.get_probabilities.remote(url=data["url"])
return probabilities
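Because each deployment is scaled and resourced independently, even fractional resources work. A tiny illustrative sketch (the class and values are hypothetical, not part of this app):

# Illustrative only: a lightweight deployment requesting fractional CPU.
@serve.deployment(num_replicas=2, ray_actor_options={"num_cpus": 0.25})
class LightweightStep:
    def process(self, text: str) -> str:
        return text.strip().lower()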
# Model registry.
model_registry = "/mnt/cluster_storage/mlflow/doggos"
experiment_name = "doggos"
mlflow.set_tracking_uri(f"file:{model_registry}")
# Get best_run's artifact_dir.
sorted_runs = mlflow.search_runs(
experiment_names=[experiment_name],
order_by=["metrics.val_loss ASC"])
best_run = sorted_runs.iloc[0]
artifacts_dir = urlparse(best_run.artifact_uri).path
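Optionally, sanity-check which run you're about to serve; mlflow.search_runs returns a pandas DataFrame, so metrics appear as columns:

# Confirm the best run before serving it.
print(best_run.run_id, best_run["metrics.val_loss"])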
# Define app.
app = Doggos.bind(
classifier=ClassPredictor.bind(
model_id="openai/clip-vit-base-patch32",
artifacts_dir=artifacts_dir,
device="cuda"
)
)
# Run service locally.
serve.run(app, route_prefix="/")
# Send a request.
url = "https://doggos-dataset.s3.us-west-2.amazonaws.com/samara.png"
data = {"url": url}
response = requests.post("http://127.0.0.1:8000/predict/", json=data)
probabilities = response.json()["probabilities"]
sorted_probabilities = sorted(probabilities.items(), key=lambda x: x[1], reverse=True)
sorted_probabilities[0:3]
Ray Serve is a highly scalable and flexible model serving library for building online inference APIs. Among other things, it lets you compose models into pipelines and scale each deployment independently with num_replicas.

🔥 RayTurbo Serve on Anyscale has more functionality on top of Ray Serve.
The Ray dashboard, and specifically its Serve view, automatically captures observability for Ray Serve applications. You can view service deployments, their replicas, and time-series metrics to monitor the service's health.
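You can also check health programmatically; a minimal sketch using serve.status(), assuming the application runs under the default name:

from ray import serve

# Print the status of every running Serve application.
for name, app in serve.status().applications.items():
    print(name, app.status)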
Anyscale Services (API ref) offers a fault tolerant, scalable, and optimized way to serve Ray Serve applications. For example, you can set num_replicas=auto and utilize replica compaction to consolidate nodes that are fractionally utilized.
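This tutorial doesn't show the contents of configs/service.yaml; here's a hypothetical sketch of what such an Anyscale service config could look like (all names and paths are assumptions):

# configs/service.yaml (hypothetical sketch).
name: doggos-app
working_dir: /home/ray/default
containerfile: /home/ray/default/containerfile  # or use image_uri for a pre-built image
applications:
  - import_path: doggos.serve:app  # assumed module path to the bound Serve app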
Note: This tutorial uses a containerfile to define dependencies, but you could easily use a pre-built image as well.

# Production online service.
anyscale service deploy -f /home/ray/default/configs/service.yaml
(anyscale +1.9s) Restarting existing service 'doggos-app'.
(anyscale +3.2s) Uploading local dir '/home/ray/default' to cloud storage.
(anyscale +5.2s) Including workspace-managed pip dependencies.
(anyscale +5.8s) Service 'doggos-app' deployed (version ID: akz9ul28).
(anyscale +5.8s) View the service in the UI: 'https://console.anyscale.com/services/service2_6hxismeqf1fkd2h7pfmljmncvm'
(anyscale +5.8s) Query the service once it's running using the following curl command (add the path you want to query):
(anyscale +5.8s) curl -H "Authorization: Bearer <BEARER_TOKEN>" https://doggos-app-bxauk.cld-kvedzwag2qa8i5bj.s.anyscaleuserdata.com/
curl -X POST "https://doggos-app-bxauk.cld-kvedzwag2qa8i5bj.s.anyscaleuserdata.com/predict/" \
-H "Authorization: Bearer <BEARER_TOKEN>" \
-H "Content-Type: application/json" \
-d '{"url": "https://doggos-dataset.s3.us-west-2.amazonaws.com/samara.png", "k": 4}'
# Terminate service.
anyscale service terminate --name doggos-app
(anyscale +1.5s) Service service2_6hxismeqf1fkd2h7pfmljmncvm terminate initiated.
(anyscale +1.5s) View the service in the UI at https://console.anyscale.com/services/service2_6hxismeqf1fkd2h7pfmljmncvm
While Anyscale Jobs and Services are useful atomic concepts that help you productionize workloads, they're also useful as nodes in a larger ML DAG or CI/CD workflow. You can chain Jobs together, store results, and then serve your application with those artifacts. From there, you can trigger updates to your service and retrigger the Jobs based on events, time, etc. While you can simply use the Anyscale CLI to integrate with any orchestration platform, Anyscale also supports purpose-built integrations like Airflow and Prefect.
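For example, a CI step might retrain and then roll the service forward; a minimal sketch (the job config path is an assumption, and flag support may vary by Anyscale CLI version):

# Hypothetical CI step: retrain, wait for completion, then redeploy the service.
anyscale job submit -f /home/ray/default/configs/train_job.yaml --wait
anyscale service deploy -f /home/ray/default/configs/service.yaml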
🚨 Note: Reset this notebook using the "🔄 Restart" button located in the notebook's menu bar. This frees up all the variables, utils, etc. used in this notebook.