Back to Ray

Model validation using offline batch inference

doc/source/ray-overview/examples/e2e-xgboost/notebooks/02-Validation.ipynb

1.13.111.5 KB
Original Source

Model validation using offline batch inference

<div align="left"> <a target="_blank" href="https://console.anyscale.com/"></a>&nbsp; <a href="https://github.com/anyscale/e2e-xgboost" role="button"></a>&nbsp; </div>

This tutorial executes a batch inference workload that connects the following heterogeneous workloads:

  • Distributed read from cloud storage
  • Distributed preprocessing
  • Parallel batch inference
  • Distributed aggregation of summary metrics

Note that this tutorial fetches the pre-trained model artifacts from the Distributed training of an XGBoost model tutorial.

The preceding figure illustrates how Ray Data can concurrently process different chunks of data at various stages of the pipeline. This parallel execution maximizes resource utilization and throughput.

Note that this diagram is a simplification for various reasons:

  • Backpressure mechanisms may throttle upstream operators to prevent overwhelming downstream stages
  • Dynamic repartitioning often occurs as data moves through the pipeline, changing block counts and sizes
  • Available resources change as the cluster autoscales
  • System failures may disrupt the clean sequential flow shown in the diagram
<div class="alert alert-block alert"> <b> Ray Data streaming execution</b>

āŒ Traditional batch execution, non-streaming like Spark without pipelining and SageMaker Batch Transform:

  • Reads the entire dataset into memory or a persistent intermediate format
  • Only then starts applying transformations like .map, .filter, etc.
  • Higher memory pressure and startup latency

āœ… Streaming execution with Ray Data:

  • Starts processing chunks ("blocks") as they're loaded without needing to wait for entire dataset to load
  • Reduces memory footprint, no out-of-memory errors, and speeds up time to first output
  • Increases resource utilization by reducing idle time
  • Enables online-style inference pipelines with minimal latency

Note: Ray Data isn't a real-time stream processing engine like Flink or Kafka Streams. Instead, it's batch processing with streaming execution, which is especially useful for iterative ML workloads, ETL pipelines, and preprocessing before training or inference. Ray typically has a 2-17x throughput improvement over solutions like Spark and SageMaker Batch Transform.

python
%load_ext autoreload
%autoreload all
python
# Enable importing from dist_xgboost package.
import os
import sys

sys.path.append(os.path.abspath(".."))
python
# Enable Ray Train v2.
os.environ["RAY_TRAIN_V2_ENABLED"] = "1"
# Now it's safe to import from ray.train.
python
import ray
import dist_xgboost

# Initialize Ray with the dist_xgboost package.
ray.init(runtime_env={"py_modules": [dist_xgboost]})

# Configure Ray Data logging.
ray.data.DataContext.get_current().enable_progress_bars = False
ray.data.DataContext.get_current().print_on_execution_start = False

Validating the model using Ray Data

The previous tutorial, Distributed Training with XGBoost, trained an XGBoost model and stored it in the MLflow artifact storage. In this step, use it to make predictions on the hold-out test set.

Data ingestion

Load the test dataset using the same procedure as before:

python
from ray.data import Dataset


def prepare_data() -> tuple[Dataset, Dataset, Dataset]:
    """Load and split the dataset into train, validation, and test sets."""
    dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")
    seed = 42
    train_dataset, rest = dataset.train_test_split(test_size=0.3, shuffle=True, seed=seed)
    # 15% for validation, 15% for testing.
    valid_dataset, test_dataset = rest.train_test_split(test_size=0.5, shuffle=True, seed=seed)
    return train_dataset, valid_dataset, test_dataset


_, _, test_dataset = prepare_data()
# Use `take()` to trigger execution because Ray Data uses lazy evaluation mode.
test_dataset.take(1)
<div class="alert alert-block alert"> <b>šŸ’” Ray Data best practices</b>
  • Use materialize() during development: The materialize() method executes and stores the dataset in Ray's shared memory object store. This behavior creates a checkpoint so future operations can start from this point instead of rerunning all operations from scratch.
  • Choose appropriate shuffling strategies: Ray Data provides various shuffling strategies including local shuffles and per-epoch shuffles. You need to shuffle this dataset because the original data groups items by class.

Next, transform the input data the same way you did during training. Fetch the preprocessor from the artifact registry:

python
import pickle

from dist_xgboost.constants import preprocessor_fname
from dist_xgboost.data import get_best_model_from_registry

best_run, best_artifacts_dir = get_best_model_from_registry()

with open(os.path.join(best_artifacts_dir, preprocessor_fname), "rb") as f:
    preprocessor = pickle.load(f)

Now define the transformation step in the Ray Data pipeline. Instead of processing each item individually with .map(), use Ray Data's map_batches method to process entire batches at once, which is much more efficient:

python
def transform_with_preprocessor(batch_df, preprocessor):
    # The preprocessor doesn't include the `target` column,
    # so remove it temporarily, then add it back.
    target = batch_df.pop("target")
    transformed_features = preprocessor.transform_batch(batch_df)
    transformed_features["target"] = target
    return transformed_features


# Apply the transformation to each batch.
test_dataset = test_dataset.map_batches(
    transform_with_preprocessor,
    fn_kwargs={"preprocessor": preprocessor},
    batch_format="pandas",
    batch_size=1000,
)

test_dataset.show(1)

Load the trained model

Now that you've defined the preprocessing pipeline, you're ready to run batch inference. Load the model from the artifact registry:

python
from ray.train import Checkpoint
from ray.train.xgboost import RayTrainReportCallback

checkpoint = Checkpoint.from_directory(best_artifacts_dir)
model = RayTrainReportCallback.get_model(checkpoint)

Run batch inference

Next, run the inference step. To avoid repeatedly loading the model for each batch, define a reusable class that can use the same XGBoost model for different batches:

python
import pandas as pd
import xgboost

from dist_xgboost.data import load_model_and_preprocessor


class Validator:
    def __init__(self):
        _, self.model = load_model_and_preprocessor()

    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        # Remove the target column for inference.
        target = batch.pop("target")
        dmatrix = xgboost.DMatrix(batch)
        predictions = self.model.predict(dmatrix)

        results = pd.DataFrame({"prediction": predictions, "target": target})
        return results

Now parallelize inference across replicas of the model by processing data in batches:

python
test_predictions = test_dataset.map_batches(
    Validator,
    concurrency=4,  # Number of model replicas.
    batch_format="pandas",
)

test_predictions.show(1)

Calculate evaluation metrics

Now that you have predictions, evaluate the model's accuracy, precision, recall, and F1-score. Calculate the number of true positives, true negatives, false positives, and false negatives across the test subset:

python
from sklearn.metrics import confusion_matrix


def confusion_matrix_batch(batch, threshold=0.5):
    # Apply a threshold to get binary predictions.
    batch["prediction"] = (batch["prediction"] > threshold).astype(int)

    result = {}
    cm = confusion_matrix(batch["target"], batch["prediction"], labels=[0, 1])
    result["TN"] = cm[0, 0]
    result["FP"] = cm[0, 1]
    result["FN"] = cm[1, 0]
    result["TP"] = cm[1, 1]
    return pd.DataFrame(result, index=[0])


test_results = test_predictions.map_batches(confusion_matrix_batch, batch_format="pandas", batch_size=1000)

Finally, aggregate the confusion matrix results from all batches to get the global counts. This step materializes the dataset and executes all previously declared lazy transformations:

python
# Sum all confusion matrix values across batches.
cm_sums = test_results.sum(["TN", "FP", "FN", "TP"])

# Extract confusion matrix components.
tn = cm_sums["sum(TN)"]
fp = cm_sums["sum(FP)"]
fn = cm_sums["sum(FN)"]
tp = cm_sums["sum(TP)"]

# Calculate metrics.
accuracy = (tp + tn) / (tp + tn + fp + fn)
precision = tp / (tp + fp) if (tp + fp) > 0 else 0
recall = tp / (tp + fn) if (tp + fn) > 0 else 0
f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0

metrics = {"precision": precision, "recall": recall, "f1": f1, "accuracy": accuracy}
python
print("Validation results:")
for key, value in metrics.items():
    print(f"{key}: {value:.4f}")

The following is the expected output:

Validation results:
precision: 0.9574
recall: 1.0000
f1: 0.9783
accuracy: 0.9767

Observability

Ray Data provides built-in observability features to help you monitor and debug data processing pipelines:

Production deployment

You can wrap the training workload as a production-grade Anyscale Job. See the API ref:

bash
# Production batch job.
anyscale job submit --name=validate-xboost-breast-cancer-model \
  --containerfile="${WORKING_DIR}/containerfile" \
  --working-dir="${WORKING_DIR}" \
  --exclude="" \
  --max-retries=0 \
  -- python dist_xgboost/infer.py

Note that in order for this command to succeed, first configure MLflow to store the artifacts in storage that's readable across clusters. Anyscale offers a variety of storage options that work out of the box, such as a default storage bucket, as well as automatically mounted network storage shared at the cluster, user, and cloud levels. You could also set up your own network mounts or storage buckets.

Summary

In this tutorial, you:

  1. Loaded a test dataset using distributed reads from cloud storage
  2. Transformed the dataset in a streaming fashion with the same preprocessor used during training
  3. Treated a validation pipeline to:
    • Make predictions on the test data using multiple model replicas
    • Calculate confusion matrix components for each batch
    • Aggregate results across all batches
  4. Computed key performance metrics, like precision, recall, F1-score, and accuracy

The same code can efficiently run on terabyte-scale datasets without modifications using Ray Data's distributed processing capabilities.

The next tutorial shows how to serve this XGBoost model for online inference using Ray Serve.