doc/source/ray-overview/examples/e2e-xgboost/notebooks/03-Serving.ipynb
This tutorial launches an online service that deploys the trained XGBoost model as a scalable endpoint for online inference.
Note that this notebook requires that you run the Distributed training of an XGBoost model tutorial to generate the pre-trained model artifacts that this tutorial fetches.
Ray Serve is a highly scalable and flexible model serving library for building online inference APIs.
%load_ext autoreload
%autoreload all
# Enable loading of the dist_xgboost module.
import os
import sys
sys.path.append(os.path.abspath(".."))
# Enable Ray Train v2.
os.environ["RAY_TRAIN_V2_ENABLED"] = "1"
# Now it's safe to import from ray.train.
import ray
import dist_xgboost
# Initialize Ray with the dist_xgboost package.
ray.init(runtime_env={"py_modules": [dist_xgboost]})
Next, load the pre-trained preprocessor and XGBoost model from the MLflow registry as demonstrated in the validation notebook.
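As a rough mental model, here is a hypothetical sketch of what a loader like dist_xgboost.data.load_model_and_preprocessor (used later in this notebook) might look like. The experiment name and artifact path are illustrative only, and the imports sit inside the function so the sketch stays importable without an MLflow registry available:

```python
def load_model_and_preprocessor_sketch():
    """Hypothetical sketch of loading artifacts from the MLflow registry.

    The experiment name and artifact path below are illustrative; the real
    helper in dist_xgboost may differ.
    """
    import pickle

    import mlflow  # imported lazily so this sketch stays importable

    # Find the most recent run of the training experiment (illustrative name).
    run = mlflow.search_runs(
        experiment_names=["breast_cancer_all_features"],
        order_by=["start_time DESC"],
        max_results=1,
    ).iloc[0]

    # Download the fitted preprocessor artifact logged during training.
    local_path = mlflow.artifacts.download_artifacts(
        run_id=run.run_id, artifact_path="preprocessor.pkl"
    )
    with open(local_path, "rb") as f:
        preprocessor = pickle.load(f)

    # Load the XGBoost model logged during training.
    model = mlflow.xgboost.load_model(f"runs:/{run.run_id}/model")
    return preprocessor, model
```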
Next, define the Ray Serve deployment. Using a reusable class avoids reloading the model and preprocessor for each request. The deployment supports both Pythonic handle calls and HTTP requests.
import pandas as pd
import xgboost
from ray import serve
from starlette.requests import Request
from dist_xgboost.data import load_model_and_preprocessor
@serve.deployment(num_replicas=2, max_ongoing_requests=25, ray_actor_options={"num_cpus": 2})
class XGBoostModel:
    def __init__(self):
        self.preprocessor, self.model = load_model_and_preprocessor()

    @serve.batch(max_batch_size=16, batch_wait_timeout_s=0.1)
    async def predict_batch(self, input_data: list[dict]) -> list[float]:
        print(f"Batch size: {len(input_data)}")
        # Convert list of dictionaries to DataFrame.
        input_df = pd.DataFrame(input_data)
        # Preprocess the input.
        preprocessed_batch = self.preprocessor.transform_batch(input_df)
        # Create DMatrix for prediction.
        dmatrix = xgboost.DMatrix(preprocessed_batch)
        # Get predictions.
        predictions = self.model.predict(dmatrix)
        return predictions.tolist()

    async def __call__(self, request: Request):
        # Parse the request body as JSON.
        input_data = await request.json()
        return await self.predict_batch(input_data)
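The @serve.batch decorator turns many concurrent single-item calls into one call over a list. As a rough mental model, dynamic batching behaves like the toy stdlib sketch below (an illustration only, not Ray Serve's implementation): items that arrive within a wait window, up to a maximum batch size, are grouped into a single call, and each caller gets back its own result.

```python
import asyncio


class MiniBatcher:
    """Toy sketch of dynamic batching, loosely mimicking @serve.batch."""

    def __init__(self, batch_fn, max_batch_size=16, wait_timeout_s=0.1):
        self.batch_fn = batch_fn
        self.max_batch_size = max_batch_size
        self.wait_timeout_s = wait_timeout_s
        self.queue = []  # list of (item, future) pairs
        self.flush_task = None  # pending timer task, if any

    async def submit(self, item):
        # Each caller submits one item and awaits its individual result.
        fut = asyncio.get_running_loop().create_future()
        self.queue.append((item, fut))
        if len(self.queue) >= self.max_batch_size:
            self._flush()  # batch is full: process immediately
        elif self.flush_task is None:
            self.flush_task = asyncio.create_task(self._flush_later())
        return await fut

    async def _flush_later(self):
        # Timeout path: process whatever arrived within the wait window.
        await asyncio.sleep(self.wait_timeout_s)
        self.flush_task = None
        self._flush()

    def _flush(self):
        if self.flush_task is not None:
            self.flush_task.cancel()
            self.flush_task = None
        batch, self.queue = self.queue, []
        items = [item for item, _ in batch]
        results = self.batch_fn(items)  # one call for the whole batch
        for (_, fut), result in zip(batch, results):
            fut.set_result(result)


async def main():
    batcher = MiniBatcher(lambda xs: [x * 2 for x in xs], max_batch_size=4)
    # Ten concurrent single-item calls become batched calls of <= 4 items.
    return await asyncio.gather(*(batcher.submit(i) for i in range(10)))


results = asyncio.run(main())
```

In a notebook that already runs an event loop, use `await main()` instead of `asyncio.run(main())`.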
Ray Serve makes it easy to do model composition: you can combine multiple deployments containing ML models or business logic into a single application, scale each deployment independently (even with fractional resources), and configure each one separately.
First, ensure that no existing Serve application is running, shutting it down with serve.shutdown() if needed:
if "default" in serve.status().applications and serve.status().applications["default"].status == "RUNNING":
print("Shutting down existing serve application")
serve.shutdown()
Now that you've defined the deployment, you can create a ray.serve.Application using the .bind() method:
# Define the app.
xgboost_model = XGBoostModel.bind()
Prepare some example data to test the deployment. Use a sample from the hold-out set:
sample_input = {
"mean radius": 14.9,
"mean texture": 22.53,
"mean perimeter": 102.1,
"mean area": 685.0,
"mean smoothness": 0.09947,
"mean compactness": 0.2225,
"mean concavity": 0.2733,
"mean concave points": 0.09711,
"mean symmetry": 0.2041,
"mean fractal dimension": 0.06898,
"radius error": 0.253,
"texture error": 0.8749,
"perimeter error": 3.466,
"area error": 24.19,
"smoothness error": 0.006965,
"compactness error": 0.06213,
"concavity error": 0.07926,
"concave points error": 0.02234,
"symmetry error": 0.01499,
"fractal dimension error": 0.005784,
"worst radius": 16.35,
"worst texture": 27.57,
"worst perimeter": 125.4,
"worst area": 832.7,
"worst smoothness": 0.1419,
"worst compactness": 0.709,
"worst concavity": 0.9019,
"worst concave points": 0.2475,
"worst symmetry": 0.2866,
"worst fractal dimension": 0.1155,
}
sample_target = 0 # Ground truth label
There are two ways to run a Ray Serve service:
1. The serve run CLI command, for example serve run tutorial:xgboost_model.
2. The serve.run Python API, for example serve.run(xgboost_model).
This example uses the Pythonic API:
from ray.serve.handle import DeploymentHandle
handle: DeploymentHandle = serve.run(xgboost_model, name="xgboost-breast-cancer-classifier")
You should see some logs indicating that the service is running locally:
INFO 2025-04-09 14:06:55,760 serve 31684 -- Started Serve in namespace "serve".
INFO 2025-04-09 14:06:57,875 serve 31684 -- Application 'default' is ready at http://127.0.0.1:8000/.
You can also check whether it's running using serve.status():
serve.status().applications["xgboost-breast-cancer-classifier"].status == "RUNNING"
The most common way to query services is with an HTTP request. This request invokes the __call__ method defined earlier:
import requests
url = "http://127.0.0.1:8000/"
prediction = requests.post(url, json=sample_input).json()
print(f"Prediction: {prediction:.4f}")
print(f"Ground truth: {sample_target}")
This approach works for processing an individual query, but isn't appropriate if you have many queries. Because requests.post is a blocking call, if you run it in a for loop you never benefit from Ray Serve's dynamic batching.
Instead, you want to fire many requests concurrently using asynchronous requests and let Ray Serve buffer and batch process them. You can use this approach with aiohttp:
import asyncio
import aiohttp
async def fetch(session, url, data):
    async with session.post(url, json=data) as response:
        return await response.json()

async def fetch_all(requests: list):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url, input_item) for input_item in requests]
        responses = await asyncio.gather(*tasks)
        return responses
sample_input_list = [sample_input] * 100
# The notebook already runs an asyncio event loop in the background, so use `await`.
# In other contexts, use `asyncio.run(fetch_all(sample_input_list))` instead.
responses = await fetch_all(sample_input_list)
print(f"Finished processing {len(responses)} queries. Example result: {responses[0]}")
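If you prefer synchronous code, a thread pool also overlaps the waiting and keeps the service busy enough for batching. The sketch below is self-contained: blocking_query is a hypothetical stand-in for requests.post, with time.sleep simulating network latency.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_query(payload):
    # Stand-in for `requests.post(url, json=payload).json()`;
    # sleeping simulates network latency.
    time.sleep(0.2)
    return payload["id"]


payloads = [{"id": i} for i in range(20)]

start = time.perf_counter()
# 20 worker threads fire the blocking calls concurrently.
with ThreadPoolExecutor(max_workers=20) as pool:
    results = list(pool.map(blocking_query, payloads))
elapsed = time.perf_counter() - start

# The 20 queries overlap, so total time is close to one query's latency,
# not 20 * 0.2 s; the service sees them concurrently and can batch them.
print(f"{len(results)} results in {elapsed:.2f}s")
```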
For a more direct Pythonic way to query the model, you can use the deployment handle:
response = await handle.predict_batch.remote(sample_input)
print(response)
This approach is useful if you need to interact with the service from a different process in the same Ray Cluster. If you need to regenerate the serve handle, you can use serve.get_deployment_handle:
handle = serve.get_deployment_handle("XGBoostModel", "xgboost-breast-cancer-classifier")
The Ray dashboard automatically captures observability for Ray Serve applications in the Serve view. You can inspect the service's deployments and their replicas, and view time-series metrics about the service's health.
# Shutdown service.
serve.shutdown()
Anyscale Services offers a fault-tolerant, scalable, and optimized way to serve Ray Serve applications. See the API ref for more details. You can set num_replicas=auto and utilize replica compaction to consolidate fractionally utilized nodes. RayTurbo Serve on Anyscale adds more capabilities on top of Ray Serve.
Note: this example uses a containerfile to define dependencies, but you could easily use a pre-built image as well.
# Production online service.
anyscale service deploy dist_xgboost.serve:xgboost_model --name=xgboost-breast_cancer_all_features \
--containerfile="${WORKING_DIR}/containerfile" \
--working-dir="${WORKING_DIR}" \
--exclude=""
Note that for this command to succeed, you need to configure MLflow to store the artifacts in storage that's readable across clusters. Anyscale offers a variety of storage options that work out of the box, such as a default storage bucket, as well as automatically mounted network storage shared at the cluster, user, and cloud levels. You could also set up your own network mounts or storage buckets.
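For example (illustrative only; the exact URI depends on your setup), you could point MLflow at a storage path that Anyscale mounts across clusters:

```shell
# Illustrative only: point MLflow at storage that's readable across clusters.
# /mnt/shared_storage is Anyscale's network storage shared across clusters in
# the same cloud; an object-store URI like s3://your-bucket/mlflow also works.
export MLFLOW_TRACKING_URI="file:///mnt/shared_storage/mlflow"
```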
Running this command starts a service in production. In the process, Anyscale creates and saves a container image to enable fast starting this service in the future. The link to the endpoint and the bearer token appears in the logs. After the service is running remotely, you need to use the bearer token to query it. Here's how you would modify the preceding requests code to use this token:
# Service specific config. Replace with your own values from the preceding logs.
base_url = "https://xgboost-breast-cancer-all-features-jgz99.cld-kvedzwag2qa8i5bj.s.anyscaleuserdata.com"
token = "tXhmYYY7qMbrb1ToO9_J3n5_kD7ym7Nirs8djtip7P0"
# Requests config.
path = "/"
full_url = f"{base_url}{path}"
headers = {"Authorization": f"Bearer {token}"}
prediction = requests.post(full_url, json=sample_input, headers=headers).json()
print(f"Prediction: {prediction:.4f}")
Don't forget to stop the service once it's no longer needed:
anyscale service terminate --name xgboost-breast_cancer_all_features
While Anyscale Jobs and Services are useful atomic concepts that help you productionize workloads, they're also convenient for nodes in a larger ML DAG or CI/CD workflow. You can chain Jobs together, store results, and then serve the application with those artifacts. From there, you can trigger updates to the service and retrigger the Jobs based on events, time, etc. While you can use the Anyscale CLI to integrate with any orchestration platform, Anyscale does support some purpose-built integrations like Airflow and Prefect.