
Online serving


This tutorial launches an online service that deploys the trained model to generate predictions and autoscales based on incoming traffic.

python
%%bash
pip install -q -r /home/ray/default/requirements.txt
pip install -q -e /home/ray/default/doggos

Note: A kernel restart may be required for all dependencies to become available.

If using uv, then:

  1. Turn off the runtime dependencies (Dependencies tab up top > Toggle off Pip packages). There's no need to run the pip install commands above.
  2. Change the Python kernel of this notebook to use the venv (click base (Python x.yy.zz) in the top right corner of the notebook > Select Another Kernel > Python Environments... > Create Python Environment > Venv > Use Existing) and done! Now all of the notebook's cells use the virtual environment.
  3. Change the Python executable to uv run instead of python by adding these lines when initializing Ray:
python
import os
os.environ.pop("RAY_RUNTIME_ENV_HOOK", None)
import ray
ray.init(runtime_env={"py_executable": "uv run", "working_dir": "/home/ray/default"})
python
%load_ext autoreload
%autoreload all

python
import os
import ray
import sys
sys.path.append(os.path.abspath("../doggos/"))

python
# If using UV
# os.environ.pop("RAY_RUNTIME_ENV_HOOK", None)
# ray.init(runtime_env={"py_executable": "uv run", "working_dir": "/home/ray/default"})

python
import os
from fastapi import FastAPI
import mlflow
import requests
from starlette.requests import Request
from urllib.parse import urlparse
from ray import serve

python
import numpy as np
from PIL import Image
import torch
from transformers import CLIPModel, CLIPProcessor

python
from doggos.infer import TorchPredictor
from doggos.model import collate_fn
from doggos.utils import url_to_array

Deployments

First, create a deployment for the trained model that generates a probability distribution for a given image URL. You can specify the compute you want to use with ray_actor_options and control how you horizontally scale this specific deployment with num_replicas.

python
@serve.deployment(
    num_replicas=1, 
    ray_actor_options={
        "num_gpus": 1, 
        "accelerator_type": "T4",
    },
)
class ClassPredictor:
    def __init__(self, model_id, artifacts_dir, device="cuda"):
        """Initialize the model."""
        # Embedding model
        self.processor = CLIPProcessor.from_pretrained(model_id)
        self.model = CLIPModel.from_pretrained(model_id)
        self.model.to(device=device)
        self.device = device

        # Trained classifier
        self.predictor = TorchPredictor.from_artifacts_dir(artifacts_dir=artifacts_dir)
        self.preprocessor = self.predictor.preprocessor

    def get_probabilities(self, url):
        image = Image.fromarray(np.uint8(url_to_array(url=url))).convert("RGB")
        inputs = self.processor(images=[image], return_tensors="pt", padding=True).to(self.device)
        with torch.inference_mode():
            embedding = self.model.get_image_features(**inputs).cpu().numpy()
        outputs = self.predictor.predict_probabilities(
            collate_fn({"embedding": embedding}))
        return {"probabilities": outputs["probabilities"][0]}

<div class="alert alert-block alert"> <b>🧱 Model composition</b>

Ray Serve makes it easy to do model composition, where you can compose multiple deployments containing ML models or business logic into a single application. You can independently scale and configure each of your deployments, even with fractional resources.

</div>
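For illustration, here's a minimal composition sketch. The Preprocessor, Classifier, and Pipeline deployments are hypothetical (not part of this tutorial's doggos app), but they show how bound deployments are passed into one another as handles, each with its own replica count and (fractional) resources:

python
from ray import serve


@serve.deployment(num_replicas=2, ray_actor_options={"num_cpus": 0.5})
class Preprocessor:
    def run(self, url: str) -> str:
        # Lightweight, CPU-only business logic (for example, URL cleanup).
        return url.strip()


@serve.deployment(num_replicas=1, ray_actor_options={"num_gpus": 0.25})
class Classifier:
    def predict(self, url: str) -> dict:
        # Placeholder for a GPU-backed model call.
        return {"url": url, "label": "unknown"}


@serve.deployment
class Pipeline:
    def __init__(self, preprocessor, classifier):
        # Handles to the other deployments; each one is scaled and resourced independently.
        self.preprocessor = preprocessor
        self.classifier = classifier

    async def __call__(self, request) -> dict:
        # `request` is the incoming HTTP request when served over HTTP.
        data = await request.json()
        clean_url = await self.preprocessor.run.remote(data["url"])
        return await self.classifier.predict.remote(clean_url)


# Compose the three deployments into a single application.
composed_app = Pipeline.bind(Preprocessor.bind(), Classifier.bind())
# serve.run(composed_app, route_prefix="/pipeline")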

Application

python
# Define app.
api = FastAPI(
    title="doggos", 
    description="classify your dog", 
    version="0.1",
)

python
@serve.deployment
@serve.ingress(api)
class Doggos:
    def __init__(self, classifier):
        self.classifier = classifier
        
    @api.post("/predict/")
    async def predict(self, request: Request):
        data = await request.json()
        probabilities = await self.classifier.get_probabilities.remote(url=data["url"])
        return probabilities

python
# Model registry.
model_registry = "/mnt/cluster_storage/mlflow/doggos"
experiment_name = "doggos"
mlflow.set_tracking_uri(f"file:{model_registry}")

python
# Get best_run's artifact_dir.
sorted_runs = mlflow.search_runs(
    experiment_names=[experiment_name], 
    order_by=["metrics.val_loss ASC"])
best_run = sorted_runs.iloc[0]
artifacts_dir = urlparse(best_run.artifact_uri).path

python
# Define app.
app = Doggos.bind(
    classifier=ClassPredictor.bind(
        model_id="openai/clip-vit-base-patch32",
        artifacts_dir=artifacts_dir,
        device="cuda"
    )
)

python
# Run service locally.
serve.run(app, route_prefix="/")

python
# Send a request.
url = "https://doggos-dataset.s3.us-west-2.amazonaws.com/samara.png"
data = {"url": url}
response = requests.post("http://127.0.0.1:8000/predict/", json=data)
probabilities = response.json()["probabilities"]
sorted_probabilities = sorted(probabilities.items(), key=lambda x: x[1], reverse=True)
sorted_probabilities[0:3]

Ray Serve

Ray Serve is a highly scalable and flexible model serving library for building online inference APIs that allows you to:

  • Wrap models and business logic as separate Serve deployments and connect them together (pipeline, ensemble, etc.).
  • Avoid one large service that's network- and compute-bound and an inefficient use of resources.
  • Utilize fractional heterogeneous resources, which isn't possible with SageMaker, Vertex, KServe, etc., and horizontally scale with num_replicas.
  • Autoscale up and down based on traffic (see the sketch after this list).
  • Integrate with FastAPI and HTTP.
  • Set up a gRPC service to build distributed systems and microservices.
  • Enable dynamic batching based on batch size, time, etc. (also shown in the sketch after this list).
  • Access a suite of utilities for serving LLMs that's inference-engine agnostic and has batteries-included support for LLM-specific features such as multi-LoRA support.
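
The following is a minimal, hypothetical sketch of the autoscaling and dynamic batching features called out above. The BatchedClassifier deployment isn't part of this tutorial, and autoscaling_config field names (such as target_ongoing_requests) may differ slightly across Ray versions:

python
from ray import serve


@serve.deployment(
    # Hypothetical autoscaling bounds: Serve adds or removes replicas to keep
    # roughly this many ongoing requests per replica.
    autoscaling_config={
        "min_replicas": 1,
        "max_replicas": 4,
        "target_ongoing_requests": 2,
    },
)
class BatchedClassifier:
    @serve.batch(max_batch_size=8, batch_wait_timeout_s=0.1)
    async def handle_batch(self, urls):
        # Receives up to 8 requests at once (or whatever arrived within 100 ms),
        # so the model can run one vectorized forward pass instead of eight.
        return [{"url": url, "label": "unknown"} for url in urls]

    async def __call__(self, request):
        data = await request.json()
        # Each caller sends a single URL; serve.batch assembles the batch.
        return await self.handle_batch(data["url"])


batched_app = BatchedClassifier.bind()
# serve.run(batched_app, route_prefix="/batched")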

🔥 RayTurbo Serve on Anyscale has more functionality on top of Ray Serve:

  • Fast autoscaling and model loading to get services up and running even faster, with 5x improvements even for LLMs.
  • 54% higher QPS and up to 3x streaming tokens per second for high-traffic serving use cases with no proxy bottlenecks.
  • Replica compaction into fewer nodes where possible to reduce resource fragmentation and improve hardware utilization.
  • Zero-downtime incremental rollouts so your service is never interrupted.
  • Different environments for each service in a multi-serve application.
  • Multi availability-zone aware scheduling of Ray Serve replicas to provide higher redundancy to availability zone failures.

Observability

The Ray dashboard, and specifically its Serve view, automatically captures observability for Ray Serve applications. You can view the service deployments, their replicas, and time-series metrics to monitor the service's health.
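
You can also check service health programmatically. A minimal sketch using serve.status() (attribute names follow the Serve status schema and may vary slightly across Ray versions):

python
from ray import serve

# Programmatically inspect the running Serve applications and their deployments.
status = serve.status()
for app_name, app in status.applications.items():
    print(app_name, app.status)
    for deployment_name, deployment in app.deployments.items():
        print(" ", deployment_name, deployment.status)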

Production services

Anyscale Services (API ref) offers a fault tolerant, scalable, and optimized way to serve Ray Serve applications. You can:

  • Roll out and update services with canary deployments and zero-downtime upgrades.
  • Monitor services through a dedicated service page, unified log viewer, tracing, alerts, etc.
  • Scale a service with num_replicas=auto and utilize replica compaction to consolidate nodes that are fractionally utilized.
  • Get head node fault tolerance. OSS Ray recovers from failed workers and replicas but not head node crashes.
  • Serve multiple applications in a single service.

Note:

  • This tutorial uses a containerfile to define dependencies, but you could easily use a pre-built image as well.
  • You can specify the compute as a compute config or inline in a Service config file.
  • When you don't specify compute while launching from a workspace, this configuration defaults to the compute configuration of the workspace.
bash
# Production online service.
anyscale service deploy -f /home/ray/default/configs/service.yaml
(anyscale +1.9s) Restarting existing service 'doggos-app'.
(anyscale +3.2s) Uploading local dir '/home/ray/default' to cloud storage.
(anyscale +5.2s) Including workspace-managed pip dependencies.
(anyscale +5.8s) Service 'doggos-app' deployed (version ID: akz9ul28).
(anyscale +5.8s) View the service in the UI: 'https://console.anyscale.com/services/service2_6hxismeqf1fkd2h7pfmljmncvm'
(anyscale +5.8s) Query the service once it's running using the following curl command (add the path you want to query):
(anyscale +5.8s) curl -H "Authorization: Bearer <BEARER_TOKEN>" https://doggos-app-bxauk.cld-kvedzwag2qa8i5bj.s.anyscaleuserdata.com/
sh
curl -X POST "https://doggos-app-bxauk.cld-kvedzwag2qa8i5bj.s.anyscaleuserdata.com/predict/" \
     -H "Authorization: Bearer <BEARER_TOKEN>" \
     -H "Content-Type: application/json" \
     -d '{"url": "https://doggos-dataset.s3.us-west-2.amazonaws.com/samara.png", "k": 4}'
bash
# Terminate service.
anyscale service terminate --name doggos-app
(anyscale +1.5s) Service service2_6hxismeqf1fkd2h7pfmljmncvm terminate initiated.
(anyscale +1.5s) View the service in the UI at https://console.anyscale.com/services/service2_6hxismeqf1fkd2h7pfmljmncvm

CI/CD

While Anyscale Jobs and Services are useful atomic concepts that help you productionize workloads, they're also useful as nodes in a larger ML DAG or CI/CD workflow. You can chain Jobs together, store results, and then serve your application with those artifacts. From there, you can trigger updates to your service and retrigger the Jobs based on events, time, etc. While you can simply use the Anyscale CLI to integrate with any orchestration platform, Anyscale also supports purpose-built integrations like Airflow and Prefect.
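
As an illustrative sketch (not part of this tutorial), a CI/CD step could shell out to the same anyscale service deploy command shown below, once upstream jobs have produced fresh artifacts:

python
import subprocess

# Illustrative CI/CD step: redeploy the service after upstream jobs
# (data processing, training) have written new artifacts.
subprocess.run(
    [
        "anyscale", "service", "deploy",
        "-f", "/home/ray/default/configs/service.yaml",
    ],
    check=True,
)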

🚨 Note: Reset this notebook using the "🔄 Restart" button located in the notebook's menu bar. This frees up all the variables, utils, etc. used in this notebook.