doc/source/ray-overview/examples/e2e-xgboost/notebooks/02-Validation.ipynb
This tutorial executes a batch inference workload that connects the following heterogeneous workloads:
Note that this tutorial fetches the pre-trained model artifacts from the Distributed training of an XGBoost model tutorial.
The preceding figure illustrates how Ray Data can concurrently process different chunks of data at various stages of the pipeline. This parallel execution maximizes resource utilization and throughput.
Note that this diagram is a simplification for various reasons:
- Traditional batch execution (non-streaming, as in Spark without pipelining or SageMaker Batch Transform):
- Streaming execution with Ray Data:
Note: Ray Data isn't a real-time stream processing engine like Flink or Kafka Streams. Instead, it's batch processing with streaming execution, which is especially useful for iterative ML workloads, ETL pipelines, and preprocessing before training or inference. Ray typically has a 2-17x throughput improvement over solutions like Spark and SageMaker Batch Transform.
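To make the distinction concrete, here's a minimal plain-Python sketch (not Ray Data code; the stage functions are invented for illustration) contrasting stage-at-a-time batch execution with pipelined streaming execution over batches:

```python
# Toy illustration (plain Python, not Ray Data): batch vs. streaming execution.

def preprocess(batch):
    return [x * 2 for x in batch]

def infer(batch):
    return [x + 1 for x in batch]

batches = [[1, 2], [3, 4], [5, 6]]

# Batch-style: each stage finishes over ALL batches before the next stage
# starts, so every intermediate result is held in memory at once.
stage1 = [preprocess(b) for b in batches]
batch_results = [infer(b) for b in stage1]

# Streaming-style: each batch flows through both stages before the next batch
# starts, so only one batch's intermediates are alive at a time (and, in Ray
# Data, different stages can run concurrently on different batches).
def stream(batches):
    for b in batches:
        yield infer(preprocess(b))

stream_results = list(stream(batches))

assert batch_results == stream_results  # Same answers, different execution order.
```

The answers are identical; what changes is memory footprint and the opportunity for stages to overlap.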
%load_ext autoreload
%autoreload all
# Enable importing from dist_xgboost package.
import os
import sys
sys.path.append(os.path.abspath(".."))
# Enable Ray Train v2.
os.environ["RAY_TRAIN_V2_ENABLED"] = "1"
# Now it's safe to import from ray.train.
import ray
import dist_xgboost
# Initialize Ray with the dist_xgboost package.
ray.init(runtime_env={"py_modules": [dist_xgboost]})
# Configure Ray Data logging.
ray.data.DataContext.get_current().enable_progress_bars = False
ray.data.DataContext.get_current().print_on_execution_start = False
The previous tutorial, Distributed Training with XGBoost, trained an XGBoost model and stored it in the MLflow artifact storage. In this step, use it to make predictions on the hold-out test set.
Load the test dataset using the same procedure as before:
from ray.data import Dataset
def prepare_data() -> tuple[Dataset, Dataset, Dataset]:
    """Load and split the dataset into train, validation, and test sets."""
    dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")
    seed = 42
    # 70% for training, 30% held out.
    train_dataset, rest = dataset.train_test_split(test_size=0.3, shuffle=True, seed=seed)
    # Split the remainder evenly: 15% for validation, 15% for testing.
    valid_dataset, test_dataset = rest.train_test_split(test_size=0.5, shuffle=True, seed=seed)
    return train_dataset, valid_dataset, test_dataset
_, _, test_dataset = prepare_data()
# Use `take()` to trigger execution because Ray Data uses lazy evaluation mode.
test_dataset.take(1)
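Ray Data is lazy: transformations are only recorded until an action such as `take()` requests results. A minimal plain-Python analogy (not the Ray Data API; the class and method names are invented for illustration) of lazy execution with a `materialize()`-style checkpoint:

```python
# Plain-Python analogy (not Ray Data): a lazy pipeline whose intermediate
# results can be "materialized" so later operations don't recompute earlier
# stages.

class LazyDataset:
    def __init__(self, source, ops=()):
        self.source = source
        self.ops = list(ops)

    def map(self, fn):
        # Lazily record the operation; nothing executes yet.
        return LazyDataset(self.source, self.ops + [fn])

    def take(self, n):
        # Execute the recorded operations only when results are requested.
        out = self.source
        for fn in self.ops:
            out = [fn(x) for x in out]
        return out[:n]

    def materialize(self):
        # Execute now and checkpoint: later ops start from the stored result.
        return LazyDataset(self.take(len(self.source)), ops=())

ds = LazyDataset([1, 2, 3]).map(lambda x: x * 10)
checkpointed = ds.materialize()  # Runs the *10 step once, stores [10, 20, 30].
result = checkpointed.map(lambda x: x + 1).take(2)
assert result == [11, 21]
```

In the real API, `Dataset.materialize()` additionally stores the result in Ray's shared-memory object store across the cluster.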
materialize() during development: The materialize() method executes and stores the dataset in Ray's shared memory object store. This behavior creates a checkpoint so future operations can start from this point instead of rerunning all operations from scratch.

Next, transform the input data the same way you did during training. Fetch the preprocessor from the artifact registry:
import pickle
from dist_xgboost.constants import preprocessor_fname
from dist_xgboost.data import get_best_model_from_registry
best_run, best_artifacts_dir = get_best_model_from_registry()
with open(os.path.join(best_artifacts_dir, preprocessor_fname), "rb") as f:
    preprocessor = pickle.load(f)
Now define the transformation step in the Ray Data pipeline. Instead of processing each item individually with .map(), use Ray Data's map_batches method to process entire batches at once, which is much more efficient:
def transform_with_preprocessor(batch_df, preprocessor):
    # The preprocessor doesn't include the `target` column,
    # so remove it temporarily, then add it back.
    target = batch_df.pop("target")
    transformed_features = preprocessor.transform_batch(batch_df)
    transformed_features["target"] = target
    return transformed_features
# Apply the transformation to each batch.
test_dataset = test_dataset.map_batches(
    transform_with_preprocessor,
    fn_kwargs={"preprocessor": preprocessor},
    batch_format="pandas",
    batch_size=1000,
)
test_dataset.show(1)
Now that you've defined the preprocessing pipeline, you're ready to run batch inference. Load the model from the artifact registry:
from ray.train import Checkpoint
from ray.train.xgboost import RayTrainReportCallback
checkpoint = Checkpoint.from_directory(best_artifacts_dir)
model = RayTrainReportCallback.get_model(checkpoint)
Next, run the inference step. To avoid repeatedly loading the model for each batch, define a reusable class that can use the same XGBoost model for different batches:
import pandas as pd
import xgboost
from dist_xgboost.data import load_model_and_preprocessor
class Validator:
    def __init__(self):
        # Load the model once per worker, not once per batch.
        _, self.model = load_model_and_preprocessor()

    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        # Remove the target column for inference.
        target = batch.pop("target")
        dmatrix = xgboost.DMatrix(batch)
        predictions = self.model.predict(dmatrix)
        results = pd.DataFrame({"prediction": predictions, "target": target})
        return results
Now parallelize inference across replicas of the model by processing data in batches:
test_predictions = test_dataset.map_batches(
    Validator,
    concurrency=4,  # Number of model replicas.
    batch_format="pandas",
)
test_predictions.show(1)
Now that you have predictions, evaluate the model's accuracy, precision, recall, and F1-score. Calculate the number of true positives, true negatives, false positives, and false negatives across the test subset:
from sklearn.metrics import confusion_matrix
def confusion_matrix_batch(batch, threshold=0.5):
    # Apply a threshold to get binary predictions.
    batch["prediction"] = (batch["prediction"] > threshold).astype(int)
    result = {}
    cm = confusion_matrix(batch["target"], batch["prediction"], labels=[0, 1])
    result["TN"] = cm[0, 0]
    result["FP"] = cm[0, 1]
    result["FN"] = cm[1, 0]
    result["TP"] = cm[1, 1]
    return pd.DataFrame(result, index=[0])
test_results = test_predictions.map_batches(confusion_matrix_batch, batch_format="pandas", batch_size=1000)
Finally, aggregate the confusion matrix results from all batches to get the global counts. This step materializes the dataset and executes all previously declared lazy transformations:
# Sum all confusion matrix values across batches.
cm_sums = test_results.sum(["TN", "FP", "FN", "TP"])
# Extract confusion matrix components.
tn = cm_sums["sum(TN)"]
fp = cm_sums["sum(FP)"]
fn = cm_sums["sum(FN)"]
tp = cm_sums["sum(TP)"]
# Calculate metrics.
accuracy = (tp + tn) / (tp + tn + fp + fn)
precision = tp / (tp + fp) if (tp + fp) > 0 else 0
recall = tp / (tp + fn) if (tp + fn) > 0 else 0
f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
metrics = {"precision": precision, "recall": recall, "f1": f1, "accuracy": accuracy}
print("Validation results:")
for key, value in metrics.items():
    print(f"{key}: {value:.4f}")
The following is the expected output:
Validation results:
precision: 0.9574
recall: 1.0000
f1: 0.9783
accuracy: 0.9767
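As a sanity check, the metric formulas used above agree with scikit-learn's metric functions. The labels and predictions below are illustrative toy values, not drawn from the test set:

```python
import math

from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

# Toy labels and thresholded predictions (illustrative only).
y_true = [1, 0, 1, 1, 0, 1, 0, 0]
y_pred = [1, 0, 1, 0, 0, 1, 1, 0]

# Count the confusion-matrix cells by hand.
tp = sum(t == 1 and p == 1 for t, p in zip(y_true, y_pred))
tn = sum(t == 0 and p == 0 for t, p in zip(y_true, y_pred))
fp = sum(t == 0 and p == 1 for t, p in zip(y_true, y_pred))
fn = sum(t == 1 and p == 0 for t, p in zip(y_true, y_pred))

# The same formulas used in the metrics cell above.
accuracy = (tp + tn) / (tp + tn + fp + fn)
precision = tp / (tp + fp)
recall = tp / (tp + fn)
f1 = 2 * precision * recall / (precision + recall)

assert math.isclose(accuracy, accuracy_score(y_true, y_pred))
assert math.isclose(precision, precision_score(y_true, y_pred))
assert math.isclose(recall, recall_score(y_true, y_pred))
assert math.isclose(f1, f1_score(y_true, y_pred))
```

The distributed version in this tutorial differs only in that the confusion-matrix counts are computed per batch and then summed across the dataset.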
Ray Data provides built-in observability features to help you monitor and debug data processing pipelines:
You can wrap the batch inference workload as a production-grade Anyscale Job. See the Anyscale Jobs API reference:
# Production batch job.
anyscale job submit --name=validate-xgboost-breast-cancer-model \
  --containerfile="${WORKING_DIR}/containerfile" \
  --working-dir="${WORKING_DIR}" \
  --exclude="" \
  --max-retries=0 \
  -- python dist_xgboost/infer.py
Note that for this command to succeed, you must first configure MLflow to store its artifacts in storage that's readable across clusters. Anyscale offers several storage options that work out of the box, such as a default storage bucket, as well as automatically mounted network storage shared at the cluster, user, and cloud levels. You can also set up your own network mounts or storage buckets.
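For example, on Anyscale you could point MLflow at a shared network mount so that a separate job cluster can read the same tracking data and artifacts. This is an illustrative sketch, not the tutorial's actual configuration, and the subdirectory name is hypothetical:

```shell
# Illustrative sketch: store MLflow tracking data and artifacts on storage
# readable across clusters. /mnt/shared_storage is Anyscale's cloud-level
# shared mount; the "mlflow" subdirectory is a hypothetical choice.
export MLFLOW_TRACKING_URI="file:///mnt/shared_storage/mlflow"
```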
In this tutorial, you:
The same code can efficiently run on terabyte-scale datasets without modifications using Ray Data's distributed processing capabilities.
The next tutorial shows how to serve this XGBoost model for online inference using Ray Serve.