Scalable online XGBoost inference with Ray Serve


This tutorial launches an online service that:

  • deploys trained XGBoost model artifacts to generate predictions
  • autoscales based on real-time incoming traffic

It also covers observability and debugging for the service.

Note that this notebook requires that you run the Distributed training of an XGBoost model tutorial to generate the pre-trained model artifacts that this tutorial fetches.

Ray Serve is a highly scalable and flexible model serving library for building online inference APIs. You can:

  • Wrap models and business logic as separate Serve deployments and connect them together (pipeline, ensemble, etc.)
  • Avoid one large service that's network- and compute-bound and an inefficient use of resources
  • Utilize fractional and heterogeneous resources, which isn't possible with SageMaker, Vertex, KServe, etc., and scale horizontally with num_replicas
  • Autoscale up and down based on traffic
  • Integrate with FastAPI and HTTP
  • Set up a gRPC service to build distributed systems and microservices
  • Enable dynamic batching based on batch size, time, etc.
  • Access a suite of utilities for serving LLMs that are inference-engine agnostic and have batteries-included support for LLM-specific features such as multi-LoRA
python
%load_ext autoreload
%autoreload all
python
# Enable loading of the dist_xgboost module.
import os
import sys

sys.path.append(os.path.abspath(".."))
python
# Enable Ray Train v2.
os.environ["RAY_TRAIN_V2_ENABLED"] = "1"
# Now it's safe to import from ray.train.
python
import ray
import dist_xgboost

# Initialize Ray with the dist_xgboost package.
ray.init(runtime_env={"py_modules": [dist_xgboost]})

Loading the model

Next, load the pre-trained preprocessor and XGBoost model from the MLflow registry as demonstrated in the validation notebook.
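A minimal sketch of that step, reusing the load_model_and_preprocessor helper from the dist_xgboost package (the same helper the deployment below imports):

python
from dist_xgboost.data import load_model_and_preprocessor

# Fetch the fitted preprocessor and the trained XGBoost booster that the
# training tutorial registered in MLflow.
preprocessor, model = load_model_and_preprocessor()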

Creating a Ray Serve deployment

Next, define the Ray Serve endpoint. Use a reusable class to avoid reloading the model and preprocessor for each request. The deployment supports both Pythonic and HTTP requests.

python
import pandas as pd
import xgboost
from ray import serve
from starlette.requests import Request

from dist_xgboost.data import load_model_and_preprocessor


@serve.deployment(num_replicas=2, max_ongoing_requests=25, ray_actor_options={"num_cpus": 2})
class XGBoostModel:
    def __init__(self):
        self.preprocessor, self.model = load_model_and_preprocessor()

    @serve.batch(max_batch_size=16, batch_wait_timeout_s=0.1)
    async def predict_batch(self, input_data: list[dict]) -> list[float]:
        print(f"Batch size: {len(input_data)}")
        # Convert list of dictionaries to DataFrame.
        input_df = pd.DataFrame(input_data)
        # Preprocess the input.
        preprocessed_batch = self.preprocessor.transform_batch(input_df)
        # Create DMatrix for prediction.
        dmatrix = xgboost.DMatrix(preprocessed_batch)
        # Get predictions.
        predictions = self.model.predict(dmatrix)
        return predictions.tolist()

    async def __call__(self, request: Request):
        # Parse the request body as JSON.
        input_data = await request.json()
        return await self.predict_batch(input_data)
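The deployment above pins num_replicas=2 for simplicity. To autoscale on live traffic instead, you can replace the fixed replica count with an autoscaling_config. The following is a sketch with illustrative, untuned values; note that older Ray Serve releases name the load target target_num_ongoing_requests_per_replica instead of target_ongoing_requests:

python
# A sketch: let Ray Serve scale the replica count with traffic instead of
# pinning it. These values are illustrative, not tuned recommendations.
autoscaling_xgboost_model = XGBoostModel.options(
    num_replicas=None,  # unset the fixed count so the autoscaler owns it
    autoscaling_config={
        "min_replicas": 1,
        "max_replicas": 4,
        "target_ongoing_requests": 16,  # per-replica load the autoscaler targets
    },
).bind()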
<div class="alert alert-block alert-info"> <b>🧱 Model composition</b>

Ray Serve makes it easy to compose multiple deployments containing ML models or business logic into a single application. You can independently configure and scale each deployment, even with fractional resources.
</div>
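For illustration, here's a minimal composition sketch with hypothetical Featurizer and Ingress deployments (not part of this tutorial): the ingress deployment calls the downstream deployment through its handle.

python
from ray import serve
from ray.serve.handle import DeploymentHandle


@serve.deployment
class Featurizer:
    def run(self, value: float) -> float:
        # Placeholder business logic.
        return value * 2.0


@serve.deployment
class Ingress:
    def __init__(self, featurizer: DeploymentHandle):
        self.featurizer = featurizer

    async def __call__(self, value: float) -> float:
        # Forward the request to the downstream deployment.
        return await self.featurizer.run.remote(value)


# Compose the two deployments into a single application.
composed_app = Ingress.bind(Featurizer.bind())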

First, ensure that you don't have any existing deployments running by calling serve.shutdown():

python
if "default" in serve.status().applications and serve.status().applications["default"].status == "RUNNING":
    print("Shutting down existing serve application")
    serve.shutdown()

Now that you've defined the deployment, you can create a ray.serve.Application using the .bind() method:

python
# Define the app.
xgboost_model = XGBoostModel.bind()

Preparing test data

Prepare some example data to test the deployment. Use a sample from the hold-out set:

python
sample_input = {
    "mean radius": 14.9,
    "mean texture": 22.53,
    "mean perimeter": 102.1,
    "mean area": 685.0,
    "mean smoothness": 0.09947,
    "mean compactness": 0.2225,
    "mean concavity": 0.2733,
    "mean concave points": 0.09711,
    "mean symmetry": 0.2041,
    "mean fractal dimension": 0.06898,
    "radius error": 0.253,
    "texture error": 0.8749,
    "perimeter error": 3.466,
    "area error": 24.19,
    "smoothness error": 0.006965,
    "compactness error": 0.06213,
    "concavity error": 0.07926,
    "concave points error": 0.02234,
    "symmetry error": 0.01499,
    "fractal dimension error": 0.005784,
    "worst radius": 16.35,
    "worst texture": 27.57,
    "worst perimeter": 125.4,
    "worst area": 832.7,
    "worst smoothness": 0.1419,
    "worst compactness": 0.709,
    "worst concavity": 0.9019,
    "worst concave points": 0.2475,
    "worst symmetry": 0.2866,
    "worst fractal dimension": 0.1155,
}
sample_target = 0  # Ground truth label

Running the service

There are two ways to run a Ray Serve service:

  1. CLI: use the serve run command, like serve run tutorial:xgboost_model.
  2. Pythonic API: use ray.serve's serve.run command, like serve.run(xgboost_model).

This example uses the Pythonic API:

python
from ray.serve.handle import DeploymentHandle

handle: DeploymentHandle = serve.run(xgboost_model, name="xgboost-breast-cancer-classifier")

You should see some logs indicating that the service is running locally:

bash
INFO 2025-04-09 14:06:55,760 serve 31684 -- Started Serve in namespace "serve".
INFO 2025-04-09 14:06:57,875 serve 31684 -- Application 'default' is ready at http://127.0.0.1:8000/.

You can also check whether it's running using serve.status():

python
serve.status().applications["xgboost-breast-cancer-classifier"].status == "RUNNING"
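If the application isn't RUNNING, the same status object exposes per-deployment details that are useful for debugging. A small sketch (attribute names per recent Ray Serve versions, so treat them as an assumption if your version differs):

python
# Inspect per-deployment status, such as replica states and error messages.
app_status = serve.status().applications["xgboost-breast-cancer-classifier"]
for name, deployment in app_status.deployments.items():
    print(name, deployment.status, deployment.message)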

Querying the service

Using HTTP

The most common way to query services is with an HTTP request. This request invokes the __call__ method defined earlier:

python
import requests

url = "http://127.0.0.1:8000/"

prediction = requests.post(url, json=sample_input).json()

print(f"Prediction: {prediction:.4f}")
print(f"Ground truth: {sample_target}")

This approach works for processing an individual query, but isn't appropriate if you have many queries. Because requests.post is a blocking call, if you run it in a for loop you never benefit from Ray Serve's dynamic batching.
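For illustration, the blocking pattern to avoid looks like this:

python
# Anti-pattern sketch: each blocking call finishes before the next one
# starts, so the server rarely sees more than one request at a time and
# batches stay near size 1.
predictions = [requests.post(url, json=sample_input).json() for _ in range(100)]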

Instead, you want to fire many requests concurrently using asynchronous requests and let Ray Serve buffer and batch process them. You can use this approach with aiohttp:

python
import asyncio

import aiohttp


async def fetch(session, url, data):
    async with session.post(url, json=data) as response:
        return await response.json()


async def fetch_all(inputs: list):
    # Use `inputs` rather than `requests` to avoid shadowing the requests module.
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url, input_item) for input_item in inputs]
        return await asyncio.gather(*tasks)
python
sample_input_list = [sample_input] * 100

# Notebook is already running an asyncio event loop in background, so use `await`.
# In other cases, you would use `asyncio.run(fetch_all(sample_input_list))`.
responses = await fetch_all(sample_input_list)
print(f"Finished processing {len(responses)} queries. Example result: {responses[0]}")

Using Python

For a more direct Pythonic way to query the model, you can use the deployment handle:

python
response = await handle.predict_batch.remote(sample_input)
print(response)
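Handle calls go through the same @serve.batch path, so firing many of them concurrently also exercises dynamic batching. A small sketch, reusing asyncio from earlier:

python
# Send many concurrent handle requests; Ray Serve buffers them so the
# @serve.batch method sees them in batches rather than one at a time.
handle_responses = await asyncio.gather(
    *[handle.predict_batch.remote(sample_input) for _ in range(100)]
)
print(f"Processed {len(handle_responses)} handle requests")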

This approach is useful if you need to interact with the service from a different process in the same Ray Cluster. If you need to regenerate the serve handle, you can use serve.get_deployment_handle:

python
handle = serve.get_deployment_handle("XGBoostModel", "xgboost-breast-cancer-classifier")

<div class="alert alert-block alert-info"> <b>🔎 Observability for services</b>

The Ray dashboard automatically captures observability for Ray Serve applications in the Serve view. You can view your service's deployments and their replicas, as well as time-series metrics about the service's health.
</div>

python
# Shut down the service.
serve.shutdown()
<div class="alert alert-block alert-info"> <b>Anyscale Services</b>

Anyscale Services offers a fault-tolerant, scalable, and optimized way to serve Ray Serve applications. See the API reference for more details. You can:

  • roll out and update services with canary deployments and zero-downtime upgrades.
  • monitor services through a dedicated service page, unified log viewer, tracing, alerting, etc.
  • scale a service with num_replicas=auto and utilize replica compaction to consolidate fractionally utilized nodes.
  • get head node fault tolerance. OSS Ray recovers from failed workers and replicas but not head node crashes.
  • serve multiple applications in a single service.

RayTurbo Serve on Anyscale has more capabilities on top of Ray Serve:

  • fast autoscaling and model loading to get services up and running even faster, with 5x improvements even for LLMs
  • 54% higher QPS and up to 3x streaming tokens per second for high-traffic serving use cases, with no proxy bottlenecks
  • replica compaction into fewer nodes where possible to reduce resource fragmentation and improve hardware utilization
  • zero-downtime incremental rollouts so the service is never interrupted
  • different environments for each service in a multi-serve application
  • multi availability-zone aware scheduling of Ray Serve replicas for higher redundancy to availability-zone failures
</div>

Note:

  • This example uses a containerfile to define dependencies, but you could easily use a pre-built image as well.
  • You can specify the compute as a compute config or inline in a Service config file.
  • When you don't specify compute and you launch from a workspace, the default is the compute configuration of the workspace.
bash
# Production online service.
anyscale service deploy dist_xgboost.serve:xgboost_model --name=xgboost-breast_cancer_all_features \
  --containerfile="${WORKING_DIR}/containerfile" \
  --working-dir="${WORKING_DIR}" \
  --exclude=""

Note that for this command to succeed, you need to configure MLflow to store the artifacts in storage that's readable across clusters. Anyscale offers a variety of storage options that work out of the box, such as a default storage bucket, as well as automatically mounted network storage shared at the cluster, user, and cloud levels. You could also set up your own network mounts or storage buckets.

Running this command starts a service in production. In the process, Anyscale creates and saves a container image so that it can start this service quickly in the future. The link to the endpoint and the bearer token appear in the logs. After the service is running remotely, you need the bearer token to query it. Here's how to modify the preceding requests code to use this token:

python
# Service specific config. Replace with your own values from the preceding logs.
base_url = "https://xgboost-breast-cancer-all-features-jgz99.cld-kvedzwag2qa8i5bj.s.anyscaleuserdata.com"
token = "tXhmYYY7qMbrb1ToO9_J3n5_kD7ym7Nirs8djtip7P0"

# Requests config.
path = "/"
full_url = f"{base_url}{path}"
headers = {"Authorization": f"Bearer {token}"}

prediction = requests.post(full_url, json=sample_input, headers=headers).json()

Don't forget to stop the service once it's no longer needed:

bash
anyscale service terminate --name xgboost-breast_cancer_all_features
<div class="alert alert-block alert-info"> <b>CI/CD</b>

While Anyscale Jobs and Services are useful atomic concepts that help you productionize workloads, they're also convenient nodes in a larger ML DAG or CI/CD workflow. You can chain Jobs together, store results, and then serve the application with those artifacts. From there, you can trigger updates to the service and retrigger the Jobs based on events, time, etc. While you can use the Anyscale CLI to integrate with any orchestration platform, Anyscale supports some purpose-built integrations like Airflow and Prefect.
</div>