Back to Ray

Distributed training of an XGBoost model

doc/source/ray-overview/examples/e2e-xgboost/notebooks/01-Distributed_Training.ipynb

1.13.117.0 KB
Original Source

Distributed training of an XGBoost model

<div align="left"> <a target="_blank" href="https://console.anyscale.com/"></a>&nbsp; <a href="https://github.com/anyscale/e2e-xgboost" role="button"></a>&nbsp; </div>

This tutorial executes a distributed training workload that connects the following steps with heterogeneous compute requirements:

  • Preprocessing the dataset with Ray Data
  • Distributed training of an XGBoost model with Ray Train
  • Saving model artifacts to a model registry with MLflow

Note: This tutorial doesn't including tuning of the model. See Ray Tune for experiment execution and hyperparameter tuning.

Dependencies

To install the dependencies, run the following:

bash
pip install -r requirements.txt

Setup

Import the necessary modules:

python
%load_ext autoreload
%autoreload all
python
# Enable importing from dist_xgboost module.
import os
import sys

sys.path.append(os.path.abspath(".."))
python
# Enable Ray Train v2. This is the default in an upcoming release.
os.environ["RAY_TRAIN_V2_ENABLED"] = "1"
# Now it's safe to import from ray.train
python
import ray

from dist_xgboost.constants import storage_path, preprocessor_path
python
# Make Ray data less verbose.
ray.data.DataContext.get_current().enable_progress_bars = False
ray.data.DataContext.get_current().print_on_execution_start = False

Dataset preparation

This example uses the Breast Cancer Wisconsin (Diagnostic) dataset, which contains features computed from digitized images of breast mass cell nuclei.

Split the data into:

  • 70% for training
  • 15% for validation
  • 15% for testing
python
from ray.data import Dataset


def prepare_data() -> tuple[Dataset, Dataset, Dataset]:
    """Load and split the dataset into train, validation, and test sets."""
    # Load the dataset from S3.
    dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")
    seed = 42

    # Split 70% for training.
    train_dataset, rest = dataset.train_test_split(test_size=0.3, shuffle=True, seed=seed)
    # Split the remaining 30% into 15% validation and 15% testing.
    valid_dataset, test_dataset = rest.train_test_split(test_size=0.5, shuffle=True, seed=seed)
    return train_dataset, valid_dataset, test_dataset
python
# Load and split the dataset.
train_dataset, valid_dataset, _test_dataset = prepare_data()
train_dataset.take(1)

Look at the output to see that the dataset contains features characterizing cell nuclei in breast mass, such as radius, texture, perimeter, area, smoothness, compactness, concavity, symmetry, and more.

Data preprocessing

Notice that the features have different magnitudes and ranges. While tree-based models like XGBoost aren't as sensitive to these differences, feature scaling can still improve numerical stability in some cases.

Ray Data has built-in preprocessors that simplify common feature preprocessing tasks, especially for tabular data. You can integrate these preprocessors with Ray Datasets, to preprocess data in a fault-tolerant and distributed way.

This example uses Ray's built-in StandardScaler to zero-center and normalize the features:

python
from ray.data.preprocessors import StandardScaler


def train_preprocessor(train_dataset: ray.data.Dataset) -> StandardScaler:
    # Pick some dataset columns to scale.
    columns_to_scale = [c for c in train_dataset.columns() if c != "target"]

    # Initialize the preprocessor.
    preprocessor = StandardScaler(columns=columns_to_scale)
    # Train the preprocessor on the training set.
    preprocessor.fit(train_dataset)

    return preprocessor


preprocessor = train_preprocessor(train_dataset)

Now that you've fit the preprocessor, save it to a file. Register this artifact later in MLflow so you can reuse it in downstream pipelines.

python
import pickle

with open(preprocessor_path, "wb") as f:
    pickle.dump(preprocessor, f)

Next, transform the datasets using the fitted preprocessor. Note that the transform() operation is lazy. Ray Data won't apply it to the data until the Ray Train workers require the data:

python
train_dataset = preprocessor.transform(train_dataset)
valid_dataset = preprocessor.transform(valid_dataset)
train_dataset.take(1)

Using take(), to see that Ray Data zero-centered and rescaled the values to be roughly between -1 and 1.

Data processing note:
For more advanced data loading and preprocessing techniques, see the comprehensive guide. Ray Data also supports performant joins, filters, aggregations, and other operations for more structured data processing, if required.

Model training with XGBoost

Checkpointing configuration

Checkpointing is a powerful feature that enables you to resume training from the last checkpoint in case of interruptions. Checkpointing is particularly useful for long-running training sessions.

XGBoostTrainer implements checkpointing out of the box. Configure CheckpointConfig to set the checkpointing frequency.

python
from ray.train import CheckpointConfig, Result, RunConfig, ScalingConfig

# Configure checkpointing to save progress during training.
run_config = RunConfig(
    checkpoint_config=CheckpointConfig(
        # Checkpoint every 10 iterations.
        checkpoint_frequency=10,
        # Only keep the latest checkpoint.
        num_to_keep=1,
    ),
    ## For multi-node clusters, configure storage that's accessible
    ## across all worker nodes with `storage_path="s3://..."`.
    storage_path=storage_path,
)

Note: Once you enable checkpointing, you can follow this guide to enable fault tolerance.

Training with XGBoost

Pass training parameters as a dictionary, similar to the original xgboost.train() function:

python
import xgboost
from ray.train.xgboost import RayTrainReportCallback, XGBoostTrainer

NUM_WORKERS = 4
USE_GPU = True


def train_fn_per_worker(config: dict):
    """Training function that runs on each worker.

    This function:
    1. Gets the dataset shard for this worker
    2. Converts to pandas for XGBoost
    3. Separates features and labels
    4. Creates DMatrix objects
    5. Trains the model using distributed communication
    """
    # Get this worker's dataset shard.
    train_ds, val_ds = (
        ray.train.get_dataset_shard("train"),
        ray.train.get_dataset_shard("validation"),
    )

    # Materialize the data and convert to pandas.
    train_ds = train_ds.materialize().to_pandas()
    val_ds = val_ds.materialize().to_pandas()

    # Separate the labels from the features.
    train_X, train_y = train_ds.drop("target", axis=1), train_ds["target"]
    eval_X, eval_y = val_ds.drop("target", axis=1), val_ds["target"]

    # Convert the data into DMatrix format for XGBoost.
    dtrain = xgboost.DMatrix(train_X, label=train_y)
    deval = xgboost.DMatrix(eval_X, label=eval_y)

    # Do distributed data-parallel training.
    # Ray Train sets up the necessary coordinator processes and
    # environment variables for workers to communicate with each other.
    _booster = xgboost.train(
        config["xgboost_params"],
        dtrain=dtrain,
        evals=[(dtrain, "train"), (deval, "validation")],
        num_boost_round=10,
        # Handles metric logging and checkpointing.
        callbacks=[RayTrainReportCallback()],
    )


# Parameters for the XGBoost model.
model_config = {
    "xgboost_params": {
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
    }
}

trainer = XGBoostTrainer(
    train_fn_per_worker,
    train_loop_config=model_config,
    # Register the data subsets.
    datasets={"train": train_dataset, "validation": valid_dataset},
    # See "Scaling strategies" for more details.
    scaling_config=ScalingConfig(
        # Number of workers for data parallelism.
        num_workers=NUM_WORKERS,
        # Set to True to use GPU acceleration.
        use_gpu=USE_GPU,
    ),
    run_config=run_config,
)

Ray Train benefits:

  • Multi-node orchestration: Automatically handles multi-node, multi-GPU setup without manual SSH or hostfile configurations
  • Built-in fault tolerance: Supports automatic retry of failed workers and can continue from the last checkpoint
  • Flexible training strategies: Supports various parallelism strategies beyond just data parallel training
  • Heterogeneous cluster support: Define per-worker resource requirements and run on mixed hardware

Ray Train integrates with popular frameworks like PyTorch, TensorFlow, XGBoost, and more. For enterprise needs, RayTurbo Train offers additional features like elastic training, advanced monitoring, and performance optimization.

Next, train the model:

python
result: Result = trainer.fit()
result

At the beginning of the training job, Ray started requesting GPU nodes to satisfy the training job's requirement of five GPU workers.

Ray Train returns a ray.train.Result object, which contains important properties such as metrics, checkpoint information, and error details:

python
metrics = result.metrics
metrics

The expected output are similar to the following:

python
OrderedDict([('train-logloss', 0.05463397157248817),
             ('train-error', 0.00506329113924051),
             ('validation-logloss', 0.06741214815308066),
             ('validation-error', 0.01176470588235294)])

See that the Ray Train logs metrics based on the values you configured in eval_metric and evals.

You can also reconstruct the trained model from the checkpoint directory:

python
booster = RayTrainReportCallback.get_model(result.checkpoint)
booster

Model registry

Now that you've trained the model, save it to a model registry for future use. As this is a distributed training workload, the model registry storage needs to be accessible from all workers in the cluster. This storage can be S3, NFS, or another network-attached solution. Anyscale simplifies this process by automatically creating and mounting shared storage options on every cluster node, ensuring that model artifacts are readable and writable across the distributed environment.

The MLflow tracking server stores experiment metadata and model artifacts in the shared storage location, making them available for future model serving, evaluation, or retraining workflows. Ray also integrates with other experiment trackers.

python
import shutil
from tempfile import TemporaryDirectory

import mlflow

from dist_xgboost.constants import (
    experiment_name,
    model_fname,
    model_registry,
    preprocessor_fname,
)


def clean_up_old_runs():
    # Clean up old MLflow runs.
    os.path.isdir(model_registry) and shutil.rmtree(model_registry)
    # mlflow.delete_experiment(experiment_name)
    os.makedirs(model_registry, exist_ok=True)


def log_run_to_mlflow(model_config, result, preprocessor_path):
    # Create a model registry in user storage.
    mlflow.set_tracking_uri(f"file:{model_registry}")

    # Create a new experiment and log metrics and artifacts.
    mlflow.set_experiment(experiment_name)
    with mlflow.start_run(description="xgboost breast cancer classifier on all features"):
        mlflow.log_params(model_config)
        mlflow.log_metrics(result.metrics)

        # Selectively log just the preprocessor and model weights.
        with TemporaryDirectory() as tmp_dir:
            shutil.copy(
                os.path.join(result.checkpoint.path, model_fname),
                os.path.join(tmp_dir, model_fname),
            )
            shutil.copy(
                preprocessor_path,
                os.path.join(tmp_dir, preprocessor_fname),
            )

            mlflow.log_artifacts(tmp_dir)


clean_up_old_runs()
log_run_to_mlflow(model_config, result, preprocessor_path)

Start the MLflow server to view the experiments:

mlflow server -h 0.0.0.0 -p 8080 --backend-store-uri {model_registry}

To view the dashboard, go to the Overview tab > Open Ports > 8080.

You can also view the Ray Dashboard and Train workload dashboards:

You can retrieve the best model from the registry:

python
from dist_xgboost.data import get_best_model_from_registry

best_model, artifacts_dir = get_best_model_from_registry()
artifacts_dir

Production deployment

You can wrap the training workload as a production-grade Anyscale Job. See the API ref for more details.

python
from dist_xgboost.constants import root_dir

os.environ["WORKING_DIR"] = root_dir

Then submit the job using the anyscale CLI command:

python
%%bash

# Production batch job -- note that this is a bash cell
! anyscale job submit --name=train-xboost-breast-cancer-model \
  --containerfile="${WORKING_DIR}/containerfile" \
  --working-dir="${WORKING_DIR}" \
  --exclude="" \
  --max-retries=0 \
  --wait \
  -- cd notebooks && jupyter nbconvert --to script 01-Distributed_Training.ipynb && ipython 01-Distributed_Training.py
  • The containerfile defines the dependencies, but you can also use a pre-built image
  • You can specify compute requirements as a compute config or inline in a job config
  • When launched from a workspace without specifying compute, it defaults to the compute configuration of the workspace

Scaling strategies

One of the key advantages of Ray Train is its ability to effortlessly scale training workloads. By adjusting the ScalingConfig, you can optimize resource utilization and reduce training time.

Scaling examples

Multi-node CPU example: 4 nodes with 8 CPUs each

python
scaling_config = ScalingConfig(
    num_workers=4,
    resources_per_worker={"CPU": 8},
)

Single-node multi-GPU example: 1 node with 8 CPUs and 4 GPUs

python
scaling_config = ScalingConfig(
    num_workers=4,
    use_gpu=True,
)

Multi-node multi-GPU example: 4 nodes with 8 CPUs and 4 GPUs each

python
scaling_config = ScalingConfig(
    num_workers=16,
    use_gpu=True,
)

Important: For multi-node clusters, you must specify a shared storage location, such as cloud storage or NFS, in the run_config. Using a local path raises an error during checkpointing.

python
trainer = XGBoostTrainer(
    ..., run_config=ray.train.RunConfig(storage_path="s3://...")
)

Worker configuration guidelines

The optimal number of workers depends on the workload and cluster setup:

  • For CPU-only training, generally use one worker per node. XGBoost can leverage multiple CPUs with threading.
  • For multi-GPU training, use one worker per GPU.
  • For heterogeneous clusters, consider the greatest common divisor of CPU counts.

GPU acceleration

To use GPUs for training:

  1. Start one actor per GPU with use_gpu=True
  2. Set GPU-compatible parameters, for example, tree_method="gpu_hist" for XGBoost
  3. Divide CPUs evenly across actors on each machine

Example:

python
trainer = XGBoostTrainer(
    scaling_config=ScalingConfig(
        # Number of workers to use for data parallelism.
        num_workers=2,
        # Whether to use GPU acceleration.
        use_gpu=True,
    ),
    params={
        # XGBoost specific params.
        "tree_method": "gpu_hist",  # GPU-specific parameter
        "eval_metric": ["logloss", "error"],
    },
    ...
)

For more advanced topics, see: