This tutorial demonstrates how to perform batch inference using the DLinear model and Ray Data. The process involves loading the model checkpoint, preparing the test data, running inference in batches, and evaluating the performance.
Note that this notebook requires the pre-trained model artifacts that the previous "Distributed training of a DLinear time-series model" notebook generates.
The preceding figure illustrates how Ray Data processes different blocks of data concurrently at various stages of the pipeline. This parallel execution maximizes resource utilization and throughput.
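Note that this diagram simplifies the actual execution. The two execution models compare as follows:
Traditional batch execution, non-streaming like Spark without pipelining or SageMaker Batch Transform: each transformation, such as .map or .filter, runs over the entire dataset before the next transformation starts.
Streaming execution with Ray Data: blocks move through all stages of the pipeline concurrently, so downstream stages work on early blocks while upstream stages are still processing later ones.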
Note: Ray Data is a batch processing system with streaming execution, not a real-time stream processing engine like Flink or Kafka Streams. This approach is especially useful for iterative ML workloads, ETL pipelines, and preprocessing before training or inference. Ray typically delivers a 2-17x throughput improvement over solutions like Spark and SageMaker Batch Transform.
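The following single-process sketch makes the difference between the two execution models concrete. It uses Python generators as an analogy for streaming and records the order in which two stages touch each block. The names eager, streaming, map_stage, and filter_stage are hypothetical illustrations, not Ray APIs. The sketch doesn't model Ray Data's scheduler, which runs operators in parallel across a cluster. It only shows the ordering difference.

```python
from typing import Iterator, List, Tuple

Block = List[int]
Log = List[Tuple[str, int]]


def eager(blocks: List[Block], log: Log) -> List[Block]:
    """Non-streaming: each stage runs over every block before the next stage starts."""
    mapped = []
    for i, block in enumerate(blocks):
        log.append(("map", i))
        mapped.append([x * 2 for x in block])
    filtered = []
    for i, block in enumerate(mapped):
        log.append(("filter", i))
        filtered.append([x for x in block if x % 4 == 0])
    return filtered


def streaming(blocks: List[Block], log: Log) -> Iterator[Block]:
    """Streaming: each block moves through every stage as soon as it's produced."""

    def map_stage(src: List[Block]) -> Iterator[Tuple[int, Block]]:
        for i, block in enumerate(src):
            log.append(("map", i))
            yield i, [x * 2 for x in block]

    def filter_stage(src: Iterator[Tuple[int, Block]]) -> Iterator[Block]:
        for i, block in src:
            log.append(("filter", i))
            yield [x for x in block if x % 4 == 0]

    return filter_stage(map_stage(blocks))


blocks = [[1, 2], [3, 4], [5, 6]]
eager_log: Log = []
stream_log: Log = []
eager_out = eager(blocks, eager_log)
stream_out = list(streaming(blocks, stream_log))
print(eager_log)   # every "map" entry comes before any "filter" entry
print(stream_log)  # "map" and "filter" entries alternate block by block
```

In the streaming version, the first fully processed block is ready after one map call instead of three. That shortens the time to first output, and in this sketch only one block is in flight at a time.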
# Enable importing from e2e_timeseries module.
import os
import sys
sys.path.insert(0, os.path.abspath(os.getcwd()))
Start by setting up the environment and imports:
import numpy as np
import ray
import torch
os.environ["RAY_TRAIN_V2_ENABLED"] = "1"
import e2e_timeseries
from e2e_timeseries.data_factory import data_provider
from e2e_timeseries.metrics import metric
from e2e_timeseries.model import DLinear
Initialize Ray with the e2e_timeseries module in the runtime environment so that newly spawned workers can import it.
ray.init(runtime_env={"py_modules": [e2e_timeseries]})
Next, set up the DLinear model configuration and the job configuration:
# Load the best checkpoint path from the metadata file created in the training notebook.
best_checkpoint_metadata_fpath = os.path.join(
    "/mnt/cluster_storage/checkpoints", "best_checkpoint_path.txt"
)
with open(best_checkpoint_metadata_fpath, "r") as f:
    best_checkpoint_path = f.read().strip()
config = {
    "checkpoint_path": best_checkpoint_path,
    "num_data_workers": 1,
    "features": "S",
    "target": "OT",
    "smoke_test": False,
    "seq_len": 96,
    "label_len": 48,
    "pred_len": 96,
    "individual": False,
    "batch_size": 64,
    "num_predictor_replicas": 4,
}
def _process_config(config: dict) -> dict:
    """Helper function to process and update configuration."""
    # Configure encoder input size based on task type.
    if config["features"] == "M" or config["features"] == "MS":
        config["enc_in"] = 7  # ETTh1 has 7 features when multi-dimensional prediction is enabled
    else:
        config["enc_in"] = 1
    # Ensure paths are absolute.
    config["checkpoint_path"] = os.path.abspath(config["checkpoint_path"])
    config["num_gpus_per_worker"] = 1.0
    config["train_only"] = False  # Load test subset
    return config
# Set derived values.
config = _process_config(config)
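The seq_len, label_len, and pred_len settings determine how each sample window is cut from the series. The following minimal sketch assumes that data_provider follows the sliding-window convention common in Informer-style and DLinear forecasting code. Under that convention, x covers seq_len steps, and y overlaps the last label_len steps of x and then extends pred_len steps past it. make_windows is a hypothetical helper, not part of e2e_timeseries, and the sketch uses small sizes instead of the 96/48/96 in the config. If the assumption holds, the last pred_len values of y are the forecast targets. That matches how the Predictor class slices the targets.

```python
from typing import List, Tuple


def make_windows(
    series: List[float], seq_len: int, label_len: int, pred_len: int
) -> List[Tuple[List[float], List[float]]]:
    """Hypothetical helper: slice a univariate series into (x, y) windows.

    Assumes the Informer/DLinear convention: y starts label_len steps before
    x ends and runs pred_len steps past it, so len(y) == label_len + pred_len.
    """
    windows = []
    num_windows = len(series) - seq_len - pred_len + 1
    for start in range(num_windows):
        x_end = start + seq_len
        y_start = x_end - label_len
        y_end = x_end + pred_len
        windows.append((series[start:x_end], series[y_start:y_end]))
    return windows


windows = make_windows(list(range(10)), seq_len=4, label_len=2, pred_len=3)
x0, y0 = windows[0]
print(x0, y0, y0[-3:])  # the last pred_len values of y are the forecast targets
```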
Now, load the test dataset as a Ray Data Dataset. Because Ray Data evaluates datasets lazily, use .show(1) to trigger execution for a single row.
ray.init(ignore_reinit_error=True)
print("Loading test data...")
ds = data_provider(config, flag="test")
ds.show(1)
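The following toy class sketches what lazy evaluation means here. It's hypothetical and is not how Ray Data is implemented. map only records a transformation, and take(1), which plays the role of .show(1), pushes just one row through the recorded steps. The variable names are chosen so they don't overwrite the notebook's ds.

```python
from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional

Row = Dict[str, Any]


class ToyLazyDataset:
    """Hypothetical toy, not Ray Data's API: records transforms, runs them on demand."""

    def __init__(
        self,
        source: Callable[[], Iterable[Row]],
        fns: Optional[List[Callable[[Row], Row]]] = None,
    ) -> None:
        self._source = source
        self._fns = fns or []
        self.rows_computed = 0

    def map(self, fn: Callable[[Row], Row]) -> "ToyLazyDataset":
        # Record the step in a new dataset; nothing executes yet.
        return ToyLazyDataset(self._source, self._fns + [fn])

    def _iter_rows(self) -> Iterator[Row]:
        # Apply every recorded step to one row at a time.
        for row in self._source():
            for fn in self._fns:
                row = fn(row)
            self.rows_computed += 1
            yield row

    def take(self, n: int) -> List[Row]:
        # Pull only the first n rows through the pipeline.
        out: List[Row] = []
        if n <= 0:
            return out
        for row in self._iter_rows():
            out.append(row)
            if len(out) == n:
                break
        return out

    def take_all(self) -> List[Row]:
        # Materialize every row.
        return list(self._iter_rows())


seen: List[int] = []


def toy_source() -> Iterator[Row]:
    for i in range(1000):
        yield {"x": i}


def add_one(row: Row) -> Row:
    seen.append(row["x"])  # Track which rows the transform actually touched.
    return {"x": row["x"] + 1}


toy_ds = ToyLazyDataset(toy_source).map(add_one)
print(seen)  # Building the pipeline ran nothing, so this is empty.
first_rows = toy_ds.take(1)  # Like .show(1): only one row is computed.
print(first_rows, toy_ds.rows_computed)
```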
This cell defines the Predictor class. It loads the trained DLinear model from a checkpoint and processes input batches to produce predictions. The __call__ method performs inference on a given batch of NumPy arrays.
Ray Data's actor-based processing lets each actor load the model weights and move them to the GPU only once, then reuse them across batches.
class Predictor:
    """Actor class for performing inference with the DLinear model."""

    def __init__(self, checkpoint_path: str, config: dict):
        self.config = config
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # Load model from checkpoint.
        self.model = DLinear(config).float()
        checkpoint = torch.load(checkpoint_path, map_location=self.device)
        self.model.load_state_dict(checkpoint["model_state_dict"])
        self.model.to(self.device)
        self.model.eval()

    def __call__(self, batch: dict[str, np.ndarray]) -> dict:
        """Process a batch of data for inference (numpy batch format)."""
        # Convert input batch to tensor.
        batch_x = torch.from_numpy(batch["x"]).float().to(self.device)
        with torch.no_grad():
            outputs = self.model(batch_x)  # Shape (N, pred_len, features_out)
        # Determine feature dimension based on config.
        f_dim = -1 if self.config["features"] == "MS" else 0
        outputs = outputs[:, -self.config["pred_len"] :, f_dim:]
        outputs_np = outputs.cpu().numpy()
        # Extract the target part from the batch.
        batch_y = batch["y"]
        batch_y_target = batch_y[:, -self.config["pred_len"] :]
        return {"predictions": outputs_np, "targets": batch_y_target}


ds = ds.map_batches(
    Predictor,
    fn_constructor_kwargs={"checkpoint_path": config["checkpoint_path"], "config": config},
    batch_size=config["batch_size"],
    concurrency=config["num_predictor_replicas"],
    num_gpus=config["num_gpus_per_worker"],
    batch_format="numpy",
)
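The following single-process sketch shows why the class-based (actor) form of map_batches matters. It uses the config's batch_size of 64 and num_predictor_replicas of 4. FakePredictor and run_actor_pool are hypothetical. The round-robin assignment is also a simplification, since Ray Data hands batches to whichever actor is available. The key point is that the expensive setup in __init__ runs once per replica, while __call__ runs once per batch.

```python
import math
from typing import Dict, List, Tuple


class FakePredictor:
    """Hypothetical stand-in for Predictor that counts model loads and calls."""

    loads = 0  # Shared across instances: how many times the "model" was loaded.

    def __init__(self) -> None:
        FakePredictor.loads += 1  # Expensive setup: runs once per actor.
        self.calls = 0

    def __call__(self, batch: List[float]) -> Dict[str, List[float]]:
        self.calls += 1  # Cheap per-batch work reuses the loaded "model".
        return {"predictions": [2.0 * v for v in batch]}


def run_actor_pool(
    rows: List[float], batch_size: int, num_replicas: int
) -> Tuple[List[FakePredictor], List[float]]:
    """Send fixed-size batches round-robin to a fixed pool of long-lived workers."""
    FakePredictor.loads = 0  # Reset the counter for this run.
    pool = [FakePredictor() for _ in range(num_replicas)]
    outputs: List[float] = []
    for i, start in enumerate(range(0, len(rows), batch_size)):
        worker = pool[i % num_replicas]
        outputs.extend(worker(rows[start : start + batch_size])["predictions"])
    return pool, outputs


rows = [float(i) for i in range(1000)]
pool, preds = run_actor_pool(rows, batch_size=64, num_replicas=4)
num_batches = math.ceil(len(rows) / 64)
print(FakePredictor.loads, sum(p.calls for p in pool), num_batches)
```

With 1,000 rows, the sketch loads the model 4 times but serves 16 batches. If a plain function loaded the weights itself, that setup would repeat for every batch.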
Next, perform minor post-processing to remove singleton dimensions from the results.
def postprocess_items(item: dict) -> dict:
    # Squeeze singleton dimensions for predictions and targets if necessary.
    if item["predictions"].shape[-1] == 1:
        item["predictions"] = item["predictions"].squeeze(-1)
    if item["targets"].shape[-1] == 1:
        item["targets"] = item["targets"].squeeze(-1)
    return item
ds = ds.map(postprocess_items)
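Because ds.map applies postprocess_items to one row at a time, each row's predictions presumably have shape (pred_len, 1) when features is "S". The sketch below uses nested Python lists in place of NumPy arrays to show what squeezing that trailing dimension does. squeeze_last is a hypothetical helper that mirrors the shape check plus squeeze(-1) in postprocess_items, for the 2D case only.

```python
from typing import Any, Dict, List


def squeeze_last(values: List[Any]) -> List[Any]:
    """Drop a trailing length-1 axis from a 2D nested list, like the guarded squeeze(-1)."""
    if values and all(isinstance(v, list) and len(v) == 1 for v in values):
        return [v[0] for v in values]
    return values  # Leave the data unchanged when the last axis isn't length 1.


row = {"predictions": [[0.1], [0.2], [0.3]], "targets": [[0.0], [0.25], [0.5]]}
squeezed: Dict[str, List[Any]] = {key: squeeze_last(val) for key, val in row.items()}
print(squeezed)
```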
Finally, execute all of these lazy steps and materialize them into memory using take_all():
# Trigger the lazy execution of the entire Ray pipeline.
all_results = ds.take_all()
Now that the results are in memory, calculate some validation metrics for the trained DLinear model.
# Concatenate predictions and targets from all rows (take_all() returns one dict per row).
all_predictions = np.concatenate([item["predictions"] for item in all_results], axis=0)
all_targets = np.concatenate([item["targets"] for item in all_results], axis=0)
# Compute evaluation metrics.
mae, mse, rmse, mape, mspe, rse = metric(all_predictions, all_targets)
print("\n--- Test Results ---")
print(f"MSE: {mse:.3f}")
print(f"MAE: {mae:.3f}")
print(f"RMSE: {rmse:.3f}")
print(f"MAPE: {mape:.3f}")
print(f"MSPE: {mspe:.3f}")
print(f"RSE: {rse:.3f}")
print("\nOffline inference finished!")
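The following plain-Python sketch spells out each metric behind the tuple that metric returns. It assumes that e2e_timeseries.metrics uses the definitions common in Informer-style forecasting code: MAE and MSE as element-wise means, RMSE as the square root of MSE, MAPE and MSPE relative to the true values, and RSE as the root of the summed squared error divided by the root of the summed squared deviation of the targets from their mean. metric_sketch is a hypothetical name. It takes flat lists, like the concatenated arrays above.

```python
import math
from typing import List, Tuple


def metric_sketch(pred: List[float], true: List[float]) -> Tuple[float, ...]:
    """Hypothetical re-implementation; returns (mae, mse, rmse, mape, mspe, rse).

    MAPE and MSPE divide by the true values, so they're undefined if any target
    is 0. RSE is undefined if every target has the same value.
    """
    n = len(true)
    errors = [p - t for p, t in zip(pred, true)]
    mae = sum(abs(e) for e in errors) / n
    mse = sum(e * e for e in errors) / n
    rmse = math.sqrt(mse)
    mape = sum(abs(e / t) for e, t in zip(errors, true)) / n
    mspe = sum((e / t) ** 2 for e, t in zip(errors, true)) / n
    mean_true = sum(true) / n
    rse = math.sqrt(sum(e * e for e in errors)) / math.sqrt(
        sum((t - mean_true) ** 2 for t in true)
    )
    return mae, mse, rmse, mape, mspe, rse


scores = metric_sketch([2.0, 4.0], [1.0, 3.0])
print([round(s, 3) for s in scores])
```

Every one of these metrics is a mean or sum over individual elements, and RSE uses the mean over all targets. As a result, flattening all windows into one long array with np.concatenate gives the same values as computing over a 2D array of windows.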