doc/source/ray-overview/examples/e2e-xgboost/notebooks/01-Distributed_Training.ipynb
This tutorial executes a distributed training workload that connects the following steps with heterogeneous compute requirements:
Note: This tutorial doesn't include tuning of the model. See Ray Tune for experiment execution and hyperparameter tuning.
To install the dependencies, run the following:
pip install -r requirements.txt
Import the necessary modules:
%load_ext autoreload
%autoreload all
# Enable importing from dist_xgboost module.
import os
import sys
sys.path.append(os.path.abspath(".."))
# Enable Ray Train v2. This is the default in an upcoming release.
os.environ["RAY_TRAIN_V2_ENABLED"] = "1"
# Now it's safe to import from ray.train
import ray
from dist_xgboost.constants import storage_path, preprocessor_path
# Make Ray data less verbose.
ray.data.DataContext.get_current().enable_progress_bars = False
ray.data.DataContext.get_current().print_on_execution_start = False
This example uses the Breast Cancer Wisconsin (Diagnostic) dataset, which contains features computed from digitized images of breast mass cell nuclei.
Split the data into:
from ray.data import Dataset
def prepare_data() -> tuple[Dataset, Dataset, Dataset]:
"""Load and split the dataset into train, validation, and test sets."""
# Load the dataset from S3.
dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")
seed = 42
# Split 70% for training.
train_dataset, rest = dataset.train_test_split(test_size=0.3, shuffle=True, seed=seed)
# Split the remaining 30% into 15% validation and 15% testing.
valid_dataset, test_dataset = rest.train_test_split(test_size=0.5, shuffle=True, seed=seed)
return train_dataset, valid_dataset, test_dataset
# Load and split the dataset.
train_dataset, valid_dataset, _test_dataset = prepare_data()
train_dataset.take(1)
Look at the output to see that the dataset contains features characterizing cell nuclei in breast mass, such as radius, texture, perimeter, area, smoothness, compactness, concavity, symmetry, and more.
Notice that the features have different magnitudes and ranges. While tree-based models like XGBoost aren't as sensitive to these differences, feature scaling can still improve numerical stability in some cases.
Ray Data has built-in preprocessors that simplify common feature preprocessing tasks, especially for tabular data. You can integrate these preprocessors with Ray Datasets to preprocess data in a fault-tolerant and distributed way.
This example uses Ray's built-in StandardScaler to zero-center and normalize the features:
from ray.data.preprocessors import StandardScaler
def train_preprocessor(train_dataset: ray.data.Dataset) -> StandardScaler:
# Pick some dataset columns to scale.
columns_to_scale = [c for c in train_dataset.columns() if c != "target"]
# Initialize the preprocessor.
preprocessor = StandardScaler(columns=columns_to_scale)
# Train the preprocessor on the training set.
preprocessor.fit(train_dataset)
return preprocessor
preprocessor = train_preprocessor(train_dataset)
Now that you've fit the preprocessor, save it to a file. You'll register this artifact in MLflow later so you can reuse it in downstream pipelines.
import pickle
with open(preprocessor_path, "wb") as f:
pickle.dump(preprocessor, f)
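Downstream steps restore this artifact with a plain pickle round-trip. A minimal, self-contained sketch (using a stand-in dict rather than the fitted StandardScaler, which round-trips the same way):

```python
import os
import pickle
import tempfile

# Stand-in artifact: the real object is the fitted StandardScaler, but
# pickle serializes and restores any picklable Python object the same way.
artifact = {"columns": ["mean radius", "mean texture"], "stats": {"mean radius": (14.1, 3.5)}}

pkl_path = os.path.join(tempfile.mkdtemp(), "preprocessor.pkl")
with open(pkl_path, "wb") as f:
    pickle.dump(artifact, f)

# Downstream pipelines can restore the exact same object.
with open(pkl_path, "rb") as f:
    restored = pickle.load(f)

assert restored == artifact
```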
Next, transform the datasets using the fitted preprocessor. Note that the transform() operation is lazy. Ray Data won't apply it to the data until the Ray Train workers require the data:
train_dataset = preprocessor.transform(train_dataset)
valid_dataset = preprocessor.transform(valid_dataset)
train_dataset.take(1)
Use take() to see that Ray Data zero-centered and rescaled the values to be roughly between -1 and 1.
Data processing note:
For more advanced data loading and preprocessing techniques, see the comprehensive guide. Ray Data also supports performant joins, filters, aggregations, and other operations for more structured data processing, if required.
Checkpointing is a powerful feature that enables you to resume training from the last checkpoint in case of interruptions. Checkpointing is particularly useful for long-running training sessions.
XGBoostTrainer implements checkpointing out of the box. Configure CheckpointConfig to set the checkpointing frequency.
from ray.train import CheckpointConfig, Result, RunConfig, ScalingConfig
# Configure checkpointing to save progress during training.
run_config = RunConfig(
checkpoint_config=CheckpointConfig(
# Checkpoint every 10 iterations.
checkpoint_frequency=10,
# Only keep the latest checkpoint.
num_to_keep=1,
),
## For multi-node clusters, configure storage that's accessible
## across all worker nodes with `storage_path="s3://..."`.
storage_path=storage_path,
)
Note: Once you enable checkpointing, you can follow this guide to enable fault tolerance.
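A hedged sketch of what that combination could look like: add a FailureConfig alongside the CheckpointConfig so Ray Train restarts failed workers and resumes from the latest checkpoint. The variable name and max_failures value here are illustrative, not part of this tutorial's run:

```python
from ray.train import CheckpointConfig, FailureConfig, RunConfig

# Sketch: pair checkpointing with automatic worker recovery. With
# max_failures=3, Ray Train retries after worker failures up to three
# times, resuming from the latest checkpoint.
ft_run_config = RunConfig(
    checkpoint_config=CheckpointConfig(checkpoint_frequency=10, num_to_keep=1),
    failure_config=FailureConfig(max_failures=3),
    storage_path=storage_path,  # shared storage, as configured above
)
```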
Pass training parameters as a dictionary, similar to the original xgboost.train() function:
import xgboost
from ray.train.xgboost import RayTrainReportCallback, XGBoostTrainer
NUM_WORKERS = 4
USE_GPU = True
def train_fn_per_worker(config: dict):
"""Training function that runs on each worker.
This function:
1. Gets the dataset shard for this worker
2. Converts to pandas for XGBoost
3. Separates features and labels
4. Creates DMatrix objects
5. Trains the model using distributed communication
"""
# Get this worker's dataset shard.
train_ds, val_ds = (
ray.train.get_dataset_shard("train"),
ray.train.get_dataset_shard("validation"),
)
# Materialize the data and convert to pandas.
train_ds = train_ds.materialize().to_pandas()
val_ds = val_ds.materialize().to_pandas()
# Separate the labels from the features.
train_X, train_y = train_ds.drop("target", axis=1), train_ds["target"]
eval_X, eval_y = val_ds.drop("target", axis=1), val_ds["target"]
# Convert the data into DMatrix format for XGBoost.
dtrain = xgboost.DMatrix(train_X, label=train_y)
deval = xgboost.DMatrix(eval_X, label=eval_y)
# Do distributed data-parallel training.
# Ray Train sets up the necessary coordinator processes and
# environment variables for workers to communicate with each other.
_booster = xgboost.train(
config["xgboost_params"],
dtrain=dtrain,
evals=[(dtrain, "train"), (deval, "validation")],
num_boost_round=10,
# Handles metric logging and checkpointing.
callbacks=[RayTrainReportCallback()],
)
# Parameters for the XGBoost model.
model_config = {
"xgboost_params": {
"objective": "binary:logistic",
"eval_metric": ["logloss", "error"],
}
}
trainer = XGBoostTrainer(
train_fn_per_worker,
train_loop_config=model_config,
# Register the data subsets.
datasets={"train": train_dataset, "validation": valid_dataset},
# See "Scaling strategies" for more details.
scaling_config=ScalingConfig(
# Number of workers for data parallelism.
num_workers=NUM_WORKERS,
# Set to True to use GPU acceleration.
use_gpu=USE_GPU,
),
run_config=run_config,
)
Ray Train benefits:
- Multi-node orchestration: Automatically handles multi-node, multi-GPU setup without manual SSH or hostfile configurations
- Built-in fault tolerance: Supports automatic retry of failed workers and can continue from the last checkpoint
- Flexible training strategies: Supports various parallelism strategies beyond just data parallel training
- Heterogeneous cluster support: Define per-worker resource requirements and run on mixed hardware
Ray Train integrates with popular frameworks like PyTorch, TensorFlow, XGBoost, and more. For enterprise needs, RayTurbo Train offers additional features like elastic training, advanced monitoring, and performance optimization.
Next, train the model:
result: Result = trainer.fit()
result
At the beginning of the training job, Ray started requesting GPU nodes to satisfy the training job's requirement of four GPU workers.
Ray Train returns a ray.train.Result object, which contains important properties such as metrics, checkpoint information, and error details:
metrics = result.metrics
metrics
The expected output is similar to the following:
OrderedDict([('train-logloss', 0.05463397157248817),
('train-error', 0.00506329113924051),
('validation-logloss', 0.06741214815308066),
('validation-error', 0.01176470588235294)])
Note that Ray Train logs metrics based on the values you configured in eval_metric and evals.
You can also reconstruct the trained model from the checkpoint directory:
booster = RayTrainReportCallback.get_model(result.checkpoint)
booster
Now that you've trained the model, save it to a model registry for future use. As this is a distributed training workload, the model registry storage needs to be accessible from all workers in the cluster. This storage can be S3, NFS, or another network-attached solution. Anyscale simplifies this process by automatically creating and mounting shared storage options on every cluster node, ensuring that model artifacts are readable and writable across the distributed environment.
The MLflow tracking server stores experiment metadata and model artifacts in the shared storage location, making them available for future model serving, evaluation, or retraining workflows. Ray also integrates with other experiment trackers.
import shutil
from tempfile import TemporaryDirectory
import mlflow
from dist_xgboost.constants import (
experiment_name,
model_fname,
model_registry,
preprocessor_fname,
)
def clean_up_old_runs():
# Clean up old MLflow runs.
if os.path.isdir(model_registry):
    shutil.rmtree(model_registry)
# mlflow.delete_experiment(experiment_name)
os.makedirs(model_registry, exist_ok=True)
def log_run_to_mlflow(model_config, result, preprocessor_path):
# Create a model registry in user storage.
mlflow.set_tracking_uri(f"file:{model_registry}")
# Create a new experiment and log metrics and artifacts.
mlflow.set_experiment(experiment_name)
with mlflow.start_run(description="xgboost breast cancer classifier on all features"):
mlflow.log_params(model_config)
mlflow.log_metrics(result.metrics)
# Selectively log just the preprocessor and model weights.
with TemporaryDirectory() as tmp_dir:
shutil.copy(
os.path.join(result.checkpoint.path, model_fname),
os.path.join(tmp_dir, model_fname),
)
shutil.copy(
preprocessor_path,
os.path.join(tmp_dir, preprocessor_fname),
)
mlflow.log_artifacts(tmp_dir)
clean_up_old_runs()
log_run_to_mlflow(model_config, result, preprocessor_path)
Start the MLflow server to view the experiments:
mlflow server -h 0.0.0.0 -p 8080 --backend-store-uri {model_registry}
To view the dashboard, go to the Overview tab > Open Ports > 8080.
You can also view the Ray Dashboard and Train workload dashboards:
You can retrieve the best model from the registry:
from dist_xgboost.data import get_best_model_from_registry
best_model, artifacts_dir = get_best_model_from_registry()
artifacts_dir
You can wrap the training workload as a production-grade Anyscale Job. See the API ref for more details.
from dist_xgboost.constants import root_dir
os.environ["WORKING_DIR"] = root_dir
Then submit the job using the anyscale CLI command:
%%bash
# Production batch job -- note that this is a bash cell
anyscale job submit --name=train-xgboost-breast-cancer-model \
--containerfile="${WORKING_DIR}/containerfile" \
--working-dir="${WORKING_DIR}" \
--exclude="" \
--max-retries=0 \
--wait \
-- cd notebooks && jupyter nbconvert --to script 01-Distributed_Training.ipynb && ipython 01-Distributed_Training.py
- The containerfile defines the dependencies, but you can also use a pre-built image
- You can specify compute requirements as a compute config or inline in a job config
- When launched from a workspace without specifying compute, it defaults to the compute configuration of the workspace
One of the key advantages of Ray Train is its ability to effortlessly scale training workloads. By adjusting the ScalingConfig, you can optimize resource utilization and reduce training time.
Multi-node CPU example: 4 nodes with 8 CPUs each
scaling_config = ScalingConfig(
num_workers=4,
resources_per_worker={"CPU": 8},
)
Single-node multi-GPU example: 1 node with 8 CPUs and 4 GPUs
scaling_config = ScalingConfig(
num_workers=4,
use_gpu=True,
)
Multi-node multi-GPU example: 4 nodes with 8 CPUs and 4 GPUs each
scaling_config = ScalingConfig(
num_workers=16,
use_gpu=True,
)
Important: For multi-node clusters, you must specify a shared storage location, such as cloud storage or NFS, in the run_config. Using a local path raises an error during checkpointing.
trainer = XGBoostTrainer(
    ...,
    run_config=ray.train.RunConfig(storage_path="s3://..."),
)
The optimal number of workers depends on the workload and cluster setup:
To use GPUs for training:
- Set use_gpu=True in the ScalingConfig
- Set tree_method="gpu_hist" in the XGBoost parameters

trainer = XGBoostTrainer(
scaling_config=ScalingConfig(
# Number of workers to use for data parallelism.
num_workers=2,
# Whether to use GPU acceleration.
use_gpu=True,
),
params={
# XGBoost specific params.
"tree_method": "gpu_hist", # GPU-specific parameter
"eval_metric": ["logloss", "error"],
},
...
)
For more advanced topics, see: