docs/reference/offline-stores/ray.md
⚠️ Contrib Plugin:
The Ray offline store is a contributed plugin. It may not be as stable or fully supported as core offline stores. Use with caution in production and report issues to the Feast community.
The Ray offline store is a data I/O implementation that leverages Ray for reading and writing data from various sources. It focuses on efficient data access operations, while complex feature computation is handled by the Ray Compute Engine.
The easiest way to get started with the Ray offline store is to use the built-in Ray template:
```bash
feast init -t ray my_ray_project
cd my_ray_project/feature_repo
```
The template provides a complete working example with sample datasets and demonstrates both Ray offline store data I/O operations and Ray compute engine distributed processing.
The Ray offline store provides:

- num_gpus config (all modes, including KubeRay)

| Method | Supported |
|---|---|
| get_historical_features | Yes |
| pull_latest_from_table_or_query | Yes |
| pull_all_from_table_or_query | Yes |
| offline_write_batch | Yes |
| write_logged_features | Yes |

| RetrievalJob Feature | Supported |
|---|---|
| export to dataframe | Yes |
| export to arrow table | Yes |
| persist results in offline store | Yes |
| local execution of ODFVs | Yes |
| preview query plan | Yes |
| read partitioned data | Yes |
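For example, the "read partitioned data" capability means a file source path can point at a directory of Parquet partitions rather than a single file. A minimal sketch, assuming a hypothetical `data/driver_stats/` directory partitioned by date:

```python
from feast import FileSource

# Hypothetical layout: data/driver_stats/date=2024-01-01/*.parquet, ...
# The Ray offline store reads the whole partitioned directory as one dataset.
partitioned_source = FileSource(
    path="data/driver_stats",          # directory of Parquet partitions
    timestamp_field="event_timestamp",
)
```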
By default, Ray will use all available system resources (CPU and memory). This can cause issues in test environments or when experimenting locally, potentially leading to system crashes or unresponsiveness.
For testing and local experimentation, we strongly recommend setting resource limits in your feature_store.yaml (see the Resource Management and Testing section below). This limits Ray to safe resource levels for testing and development.
The Ray offline store follows Feast's architectural separation: the offline store handles data reading and writing, while feature computation is delegated to the compute engine.
For complex feature processing, historical feature retrieval, and distributed joins, use the Ray Compute Engine.
The Ray offline store is configured in your feature_store.yaml file. It supports three execution modes: local, remote (via `ray_address`), and KubeRay.

For simple data I/O operations without distributed processing, use local mode:
```yaml
project: my_project
registry: data/registry.db
provider: local
offline_store:
  type: ray
  storage_path: data/ray_storage  # Optional: Path for storing datasets
```
Connect to an existing Ray cluster:
```yaml
offline_store:
  type: ray
  storage_path: s3://my-bucket/feast-data
  ray_address: "ray://my-cluster.example.com:10001"
```
Connect to Ray clusters on Kubernetes using CodeFlare SDK:
```yaml
offline_store:
  type: ray
  storage_path: s3://my-bucket/feast-data
  use_kuberay: true
  kuberay_conf:
    cluster_name: "feast-ray-cluster"
    namespace: "feast-system"
    auth_token: "${RAY_AUTH_TOKEN}"
    auth_server: "https://api.openshift.com:6443"
    skip_tls: false
  enable_ray_logging: false
```
Environment Variables (alternative to config file):
```bash
export FEAST_RAY_USE_KUBERAY=true
export FEAST_RAY_CLUSTER_NAME=feast-ray-cluster
export FEAST_RAY_AUTH_TOKEN=your-token
export FEAST_RAY_AUTH_SERVER=https://api.openshift.com:6443
export FEAST_RAY_NAMESPACE=feast-system
```
For distributed feature processing with advanced capabilities:
```yaml
project: my_project
registry: data/registry.db
provider: local

# Ray offline store for data I/O operations
offline_store:
  type: ray
  storage_path: s3://my-bucket/feast-data  # Optional: Path for storing datasets
  ray_address: localhost:10001             # Optional: Ray cluster address

# Ray compute engine for distributed feature processing
batch_engine:
  type: ray.engine

  # Resource configuration
  max_workers: 8                    # Maximum number of Ray workers
  max_parallelism_multiplier: 2     # Parallelism as a multiple of CPU cores

  # Performance optimization
  enable_optimization: true         # Enable performance optimizations
  broadcast_join_threshold_mb: 100  # Broadcast join threshold (MB)
  target_partition_size_mb: 64      # Target partition size (MB)

  # Distributed join configuration
  window_size_for_joins: "1H"       # Time window for distributed joins
  enable_distributed_joins: true    # Enable distributed joins

  # Ray cluster configuration (optional)
  ray_address: localhost:10001              # Ray cluster address
  staging_location: s3://my-bucket/staging  # Remote staging location
```
For local development and testing:
```yaml
project: my_local_project
registry: data/registry.db
provider: local
offline_store:
  type: ray
  storage_path: ./data/ray_storage

  # Conservative settings for local development
  broadcast_join_threshold_mb: 25
  max_parallelism_multiplier: 1
  target_partition_size_mb: 16
  enable_ray_logging: false

  # Memory constraints to prevent OOM in test/development environments
  ray_conf:
    num_cpus: 1
    object_store_memory: 104857600  # 100MB
    _memory: 524288000              # 500MB
batch_engine:
  type: ray.engine
  max_workers: 2
  enable_optimization: false
```
For production deployments with a distributed Ray cluster:
```yaml
project: my_production_project
registry: s3://my-bucket/registry.db
provider: local
offline_store:
  type: ray
  storage_path: s3://my-production-bucket/feast-data
  ray_address: "ray://production-head-node:10001"
batch_engine:
  type: ray.engine
  max_workers: 32
  max_parallelism_multiplier: 4
  enable_optimization: true
  broadcast_join_threshold_mb: 50
  target_partition_size_mb: 128
  window_size_for_joins: "30min"
  ray_address: "ray://production-head-node:10001"
  staging_location: s3://my-production-bucket/staging
```
| Option | Type | Default | Description |
|---|---|---|---|
| `type` | string | Required | Must be `feast.infra.offline_stores.contrib.ray_offline_store.ray.RayOfflineStore` or `ray` |
| `storage_path` | string | None | Path for storing temporary files and datasets |
| `ray_address` | string | None | Ray cluster address (triggers REMOTE mode, e.g., `"ray://host:10001"`) |
| `use_kuberay` | boolean | None | Enable KubeRay mode (overrides `ray_address`) |
| `kuberay_conf` | dict | None | KubeRay configuration dict with keys: `cluster_name` (required), `namespace` (default: `"default"`), `auth_token`, `auth_server`, `skip_tls` (default: `false`) |
| `enable_ray_logging` | boolean | false | Enable Ray progress bars and verbose logging |
| `ray_conf` | dict | None | Ray initialization parameters for resource management (e.g., memory and CPU limits) |
| `broadcast_join_threshold_mb` | int | 100 | Size threshold for broadcast joins (MB) |
| `enable_distributed_joins` | boolean | true | Enable distributed joins for large datasets |
| `max_parallelism_multiplier` | int | 2 | Parallelism as a multiple of CPU cores |
| `target_partition_size_mb` | int | 64 | Target partition size (MB) |
| `window_size_for_joins` | string | "1H" | Time window for distributed joins |
| `num_gpus` | float | None | GPUs per worker task; supported in all modes. See Worker Resource Scheduling. |
| `gpu_batch_format` | string | "pandas" | Batch format for `map_batches` when `num_gpus` is set (`"numpy"` or `"pyarrow"` for GPU-native libraries) |
| `worker_task_options` | dict | None | Arbitrary Ray `.options()` kwargs (`num_cpus`, `memory`, `accelerator_type`, `resources`, `runtime_env`, …). See Worker Resource Scheduling for the full reference. |
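As an illustration of `worker_task_options`, the sketch below passes extra Ray `.options()` kwargs through to worker tasks; the specific values (CPU count, memory, runtime environment packages) are placeholders to adapt to your cluster:

```yaml
offline_store:
  type: ray
  storage_path: s3://my-bucket/feast-data
  worker_task_options:
    num_cpus: 2                # CPUs reserved per worker task (placeholder)
    memory: 2147483648         # 2 GB per worker task (placeholder)
    runtime_env:
      pip: ["pyarrow"]         # example runtime environment
```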
The Ray offline store automatically detects the execution mode using the following precedence:

1. Environment variables (FEAST_RAY_USE_KUBERAY, FEAST_RAY_CLUSTER_NAME, etc.) enable KubeRay mode.
2. `use_kuberay` / `kuberay_conf` in feature_store.yaml enable KubeRay mode.
3. `ray_address` connects to an existing Ray cluster (remote mode).
4. If none of the above are set, the store runs in local mode.
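For example, if both KubeRay and a cluster address are configured, the KubeRay settings take precedence (placeholder values shown):

```yaml
offline_store:
  type: ray
  ray_address: "ray://legacy-cluster:10001"  # ignored once KubeRay is enabled
  use_kuberay: true
  kuberay_conf:
    cluster_name: "feast-ray-cluster"
    namespace: "feast-system"
```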
For Ray compute engine configuration options, see the Ray Compute Engine documentation.
By default, Ray will use all available system resources (CPU and memory). This can cause issues in test environments or when experimenting locally, potentially leading to system crashes or unresponsiveness.
For custom resource control, configure limits in your feature_store.yaml:
```yaml
offline_store:
  type: ray
  storage_path: ./data/ray_storage

  # Resource optimization settings
  broadcast_join_threshold_mb: 25  # Smaller datasets for broadcast joins
  max_parallelism_multiplier: 1    # Reduced parallelism
  target_partition_size_mb: 16     # Smaller partition sizes
  enable_ray_logging: false        # Disable verbose logging

  # Memory constraints to prevent OOM in test environments
  ray_conf:
    num_cpus: 1
    object_store_memory: 104857600  # 100MB
    _memory: 524288000              # 500MB
```
For production workloads on a remote cluster:

```yaml
offline_store:
  type: ray
  storage_path: s3://my-bucket/feast-data
  ray_address: "ray://production-cluster:10001"

  # Optimized for production workloads
  broadcast_join_threshold_mb: 100
  max_parallelism_multiplier: 2
  target_partition_size_mb: 64
  enable_ray_logging: true
```
| Setting | Default | Description | Testing Recommendation |
|---|---|---|---|
| `broadcast_join_threshold_mb` | 100 | Size threshold for broadcast joins (MB) | 25 |
| `max_parallelism_multiplier` | 2 | Parallelism as a multiple of CPU cores | 1 |
| `target_partition_size_mb` | 64 | Target partition size (MB) | 16 |
| `enable_ray_logging` | false | Enable Ray progress bars and logging | false |
Testing configuration:

```yaml
# feature_store.yaml
offline_store:
  type: ray
  broadcast_join_threshold_mb: 25
  max_parallelism_multiplier: 1
  target_partition_size_mb: 16
```
Cluster configuration:

```yaml
# feature_store.yaml
offline_store:
  type: ray
  ray_address: "ray://cluster-head:10001"
  broadcast_join_threshold_mb: 200
  max_parallelism_multiplier: 4
```
```python
from datetime import timedelta

from feast import Entity, FeatureStore, FeatureView, Field, FileSource
from feast.types import Float32, Int64

# Define the entity and feature view
driver = Entity(name="driver_id", join_keys=["driver_id"])

driver_stats = FeatureView(
    name="driver_stats",
    entities=[driver],
    ttl=timedelta(days=1),
    source=FileSource(
        path="data/driver_stats.parquet",
        timestamp_field="event_timestamp",
    ),
    schema=[
        Field(name="driver_id", dtype=Int64),
        Field(name="avg_daily_trips", dtype=Float32),
    ],
)

# Initialize the feature store
store = FeatureStore(repo_path=".")

# The Ray offline store handles data I/O operations.
# For complex feature computation, use the Ray Compute Engine.
```
The Ray offline store provides direct access to underlying data:
```python
from datetime import datetime, timedelta

from feast.infra.offline_stores.contrib.ray_offline_store.ray import RayOfflineStore

# Pull latest data from a table
job = RayOfflineStore.pull_latest_from_table_or_query(
    config=store.config,
    data_source=driver_stats.source,
    join_key_columns=["driver_id"],
    feature_name_columns=["avg_daily_trips"],
    timestamp_field="event_timestamp",
    created_timestamp_column=None,
    start_date=datetime.now() - timedelta(days=7),
    end_date=datetime.now(),
)

# Convert to pandas DataFrame
df = job.to_df()
print(f"Retrieved {len(df)} rows")

# Convert to Arrow Table
arrow_table = job.to_arrow()

# Get Ray dataset directly
ray_dataset = job.to_ray_dataset()
```
The Ray offline store supports batch writing for materialization:
```python
from datetime import datetime

import pyarrow as pa

# Create sample data
data = pa.table({
    "driver_id": [1, 2, 3, 4, 5],
    "avg_daily_trips": [10.5, 15.2, 8.7, 12.3, 9.8],
    "event_timestamp": [datetime.now()] * 5,
})

# Write batch data
RayOfflineStore.offline_write_batch(
    config=store.config,
    feature_view=driver_stats,
    table=data,
    progress=lambda x: print(f"Wrote {x} rows"),
)
```
The Ray offline store supports persisting datasets for later analysis:
```python
from feast.infra.offline_stores.file_source import SavedDatasetFileStorage

# Create storage destination
storage = SavedDatasetFileStorage(path="data/training_dataset.parquet")

# Persist the dataset
job.persist(storage, allow_overwrite=False)

# Create a saved dataset in the registry
saved_dataset = store.create_saved_dataset(
    from_=job,
    name="driver_training_dataset",
    storage=storage,
    tags={"purpose": "data_access", "version": "v1"},
)
print(f"Saved dataset created: {saved_dataset.name}")
```
The Ray offline store supports various remote storage backends:
```python
# S3 storage
s3_storage = SavedDatasetFileStorage(path="s3://my-bucket/datasets/driver_features.parquet")
job.persist(s3_storage, allow_overwrite=True)

# Google Cloud Storage
gcs_storage = SavedDatasetFileStorage(path="gs://my-project-bucket/datasets/driver_features.parquet")
job.persist(gcs_storage, allow_overwrite=True)

# HDFS
hdfs_storage = SavedDatasetFileStorage(path="hdfs://namenode:8020/datasets/driver_features.parquet")
job.persist(hdfs_storage, allow_overwrite=True)

# Azure Blob Storage / Azure Data Lake Storage Gen2
# Setting AZURE_STORAGE_ANON=False makes it use DefaultAzureCredential
az_storage = SavedDatasetFileStorage(path="abfss://container@stc_account.dfs.core.windows.net/datasets/driver_features.parquet")
job.persist(az_storage, allow_overwrite=True)
```
To use Ray in cluster mode for distributed data access:
```bash
# On the head node
ray start --head --port=10001
```

Then point feature_store.yaml at the cluster:

```yaml
offline_store:
  type: ray
  ray_address: localhost:10001
  storage_path: s3://my-bucket/features
```

```bash
# On worker nodes
ray start --address='head-node-ip:10001'
```
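Before running Feast against the cluster, it can help to confirm the client can reach it with plain Ray; a quick sketch, assuming the cluster from the example above is reachable from the client machine:

```python
import ray

# Connect to the running cluster via the Ray client address
ray.init(address="ray://localhost:10001")

# Print the resources the cluster reports (CPUs, memory, object store, ...)
print(ray.cluster_resources())

ray.shutdown()
```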
To use Feast with Ray clusters on Kubernetes via CodeFlare SDK:
Prerequisites:

```bash
pip install codeflare-sdk
```

Configuration:
```yaml
offline_store:
  type: ray
  use_kuberay: true
  storage_path: s3://my-bucket/feast-data
  kuberay_conf:
    cluster_name: "feast-ray-cluster"
    namespace: "feast-system"
    auth_token: "${RAY_AUTH_TOKEN}"
    auth_server: "https://api.openshift.com:6443"
    skip_tls: false
  enable_ray_logging: false
```
Alternatively, configure the same settings via environment variables:

```bash
export FEAST_RAY_USE_KUBERAY=true
export FEAST_RAY_CLUSTER_NAME=feast-ray-cluster
export FEAST_RAY_AUTH_TOKEN=your-k8s-token
export FEAST_RAY_AUTH_SERVER=https://api.openshift.com:6443
export FEAST_RAY_NAMESPACE=feast-system
export FEAST_RAY_SKIP_TLS=false

# Then use standard Feast code
python your_feast_script.py
```
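A minimal your_feast_script.py might look like the following (the feature reference and entity rows are hypothetical; the Ray offline store picks up the KubeRay settings from the FEAST_RAY_* environment variables):

```python
from datetime import datetime

import pandas as pd

from feast import FeatureStore

# Standard Feast code; KubeRay connection details come from the environment.
store = FeatureStore(repo_path=".")

# Hypothetical entity rows to join features onto
entity_df = pd.DataFrame({
    "driver_id": [1, 2],
    "event_timestamp": [datetime.now(), datetime.now()],
})

training_df = store.get_historical_features(
    entity_df=entity_df,
    features=["driver_stats:avg_daily_trips"],  # hypothetical feature reference
).to_df()
print(training_df.head())
```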
The Ray offline store supports GPU scheduling via the num_gpus and gpu_batch_format config options. This works across all execution modes (local, remote, and KubeRay).
For full configuration details, examples, and KubeRay GPU setup, see the Ray Compute Engine GPU Support section.
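A minimal sketch of the relevant options (placeholder values; see the compute engine GPU documentation for tuning guidance):

```yaml
offline_store:
  type: ray
  storage_path: s3://my-bucket/feast-data
  num_gpus: 1                # GPUs requested per worker task (placeholder)
  gpu_batch_format: "numpy"  # hand GPU-native batches to map_batches
```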
The Ray offline store validates data sources to ensure compatibility:
```python
from feast.infra.offline_stores.contrib.ray_offline_store.ray import RayOfflineStore

# Validate a data source
try:
    RayOfflineStore.validate_data_source(store.config, driver_stats.source)
    print("Data source is valid")
except Exception as e:
    print(f"Data source validation failed: {e}")
```
RaySource is the recommended data source for the
Ray offline store. It is a pure-metadata descriptor that tells Feast how to
load a Ray Dataset from any source Ray Data supports — Parquet, CSV, JSON,
HuggingFace datasets, MongoDB, binary files, images, TFRecords, WebDataset,
SQL, and more.
```python
from feast.infra.offline_stores.contrib.ray_offline_store.ray_source import RaySource

# Load directly from the HuggingFace Hub
cheque_source = RaySource(
    name="cheque_images_hf",
    reader_type="huggingface",
    reader_options={
        "dataset_name": "cheques_sample_data",
        "split": "train",
    },
    timestamp_field="event_timestamp",
)
```
See the RaySource reference for a full list of
reader_type values and configuration options.
Note: `FileSource` (Parquet) remains supported for backward compatibility, but `RaySource(reader_type="parquet")` is preferred for new projects.
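As a sketch, the Parquet file source from earlier expressed as a RaySource; the reader_options keys are assumed to mirror ray.data.read_parquet (here `paths`), so check the RaySource reference for the exact option names:

```python
from feast.infra.offline_stores.contrib.ray_offline_store.ray_source import RaySource

# Parquet data described as a RaySource instead of a FileSource.
# The reader_options keys are assumed to follow ray.data.read_parquet.
driver_stats_source = RaySource(
    name="driver_stats_parquet",
    reader_type="parquet",
    reader_options={"paths": "data/driver_stats.parquet"},
    timestamp_field="event_timestamp",
)
```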
The Ray offline store has one known limitation:
`online_write_batch` not implemented: The `OfflineStore.online_write_batch()` interface is not supported by the Ray offline store. This does not affect materialization: `feast materialize` writes to the online store correctly via the Ray Compute Engine. The restriction only applies to callers that invoke `online_write_batch` on the offline store object directly, which is an uncommon pattern outside of custom tooling.

For complex feature processing operations, use the Ray offline store in combination with the Ray Compute Engine. See the Ray Offline Store + Compute Engine configuration example in the Configuration section above for a complete setup.
For more advanced troubleshooting, refer to the Ray documentation.
Basic Ray Offline Store (local development):
```yaml
offline_store:
  type: ray
  storage_path: ./data/ray_storage

  # Conservative settings for local development
  broadcast_join_threshold_mb: 25
  max_parallelism_multiplier: 1
  target_partition_size_mb: 16
  enable_ray_logging: false
```
Ray Offline Store + Compute Engine (distributed processing):
```yaml
offline_store:
  type: ray
  storage_path: s3://my-bucket/feast-data
batch_engine:
  type: ray.engine
  max_workers: 8
  enable_optimization: true
  broadcast_join_threshold_mb: 100
```
Typical usage pattern:

```python
from datetime import datetime

import pyarrow as pa

from feast import FeatureStore
from feast.infra.offline_stores.contrib.ray_offline_store.ray import RayOfflineStore

# Initialize the feature store
store = FeatureStore(repo_path=".")

# Get historical features (uses the compute engine if configured)
features = store.get_historical_features(
    entity_df=entity_df,  # your entity DataFrame
    features=["fv:feature"],
)

# Direct data access (uses the offline store)
job = RayOfflineStore.pull_latest_from_table_or_query(...)
df = job.to_df()

# Offline write batch (materialization)
# Create sample data for materialization
data = pa.table({
    "driver_id": [1, 2, 3, 4, 5],
    "avg_daily_trips": [10.5, 15.2, 8.7, 12.3, 9.8],
    "event_timestamp": [datetime.now()] * 5,
})

# Write batch to offline store
RayOfflineStore.offline_write_batch(
    config=store.config,
    feature_view=driver_stats,
    table=data,
    progress=lambda rows: print(f"Processed {rows} rows"),
)
```
For complete examples, see the Configuration section above.