# Ray Data Source (contrib)


⚠️ **Contrib Plugin:** `RaySource` is a contributed plugin shipped alongside the Ray offline store. It may not be as stable or fully supported as core data sources.

`RaySource` is a pure-metadata descriptor that tells Feast how to load a Ray Dataset from any source that Ray Data supports natively: Parquet, CSV, JSON, HuggingFace Datasets, MongoDB, binary files, images, TFRecords, and more.

It is the recommended data source when using the Ray offline store, and it replaces `FileSource` for all non-Parquet and non-file-based data.


## When to use `RaySource` vs `FileSource`

| Scenario | Recommended source |
| --- | --- |
| Parquet files on disk / S3 / GCS (existing setup) | `FileSource` (backward compatible) |
| Parquet via Ray reader (pipelines, remote auth) | `RaySource(reader_type="parquet")` |
| CSV, JSON, text, images via Ray | `RaySource` |
| HuggingFace `datasets` library | `RaySource(reader_type="huggingface")` |
| MongoDB, SQL, TFRecords, WebDataset | `RaySource` |

## Installation

`RaySource` is bundled with the Ray offline store contrib package:

```bash
pip install 'feast[ray]'
```

## Supported `reader_type` values

| `reader_type` | Underlying Ray API | Notes |
| --- | --- | --- |
| `parquet` | `ray.data.read_parquet` | S3, GCS, HDFS, local |
| `csv` | `ray.data.read_csv` | |
| `json` | `ray.data.read_json` | |
| `text` | `ray.data.read_text` | |
| `images` | `ray.data.read_images` | |
| `binary_files` | `ray.data.read_binary_files` | |
| `tfrecords` | `ray.data.read_tfrecords` | |
| `webdataset` | `ray.data.read_webdataset` | |
| `huggingface` | `ray.data.from_huggingface` | Wraps `datasets.load_dataset` |
| `mongo` | `ray.data.read_mongo` | |
| `sql` | `ray.data.read_sql` | Pass `connection_url` in `reader_options` |
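Conceptually, each `reader_type` maps to one Ray Data entry point, with `reader_options` forwarded as keyword arguments and `path` supplied only for file-based readers. The following is an illustrative sketch of that dispatch; the names and structure here are assumptions for clarity, not Feast's internal code:

```python
# Illustrative sketch only: how a reader_type string plus reader_options
# could be dispatched to a Ray Data reader. The dispatch table mirrors
# the documented mapping above; Feast's real implementation may differ.

# reader_type -> (Ray Data function name, whether a `path` is expected)
RAY_READERS = {
    "parquet": ("ray.data.read_parquet", True),
    "csv": ("ray.data.read_csv", True),
    "json": ("ray.data.read_json", True),
    "text": ("ray.data.read_text", True),
    "images": ("ray.data.read_images", True),
    "binary_files": ("ray.data.read_binary_files", True),
    "tfrecords": ("ray.data.read_tfrecords", True),
    "webdataset": ("ray.data.read_webdataset", True),
    "huggingface": ("ray.data.from_huggingface", False),
    "mongo": ("ray.data.read_mongo", False),
    "sql": ("ray.data.read_sql", False),
}

def plan_read(reader_type, path=None, reader_options=None):
    """Return the Ray Data function name and kwargs for this source."""
    if reader_type not in RAY_READERS:
        raise ValueError(f"Unsupported reader_type: {reader_type!r}")
    func_name, needs_path = RAY_READERS[reader_type]
    kwargs = dict(reader_options or {})
    if needs_path:
        if path is None:
            raise ValueError(f"reader_type={reader_type!r} requires a path")
        kwargs["paths"] = path  # file-based Ray readers take a paths argument
    return func_name, kwargs

# Example: a CSV source under /data/sensors/
func, kwargs = plan_read("csv", path="/data/sensors/", reader_options={"parallelism": 8})
```

This also explains why non-file readers such as `huggingface`, `mongo`, and `sql` take no `path`: everything they need travels in `reader_options`.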

## Configuration

### Parameters

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| `name` | `str` | Yes | Unique name for this data source |
| `reader_type` | `str` | Yes | One of the supported reader types above |
| `path` | `str` | No | File or directory path (required for file-based readers) |
| `reader_options` | `dict` | No | Extra keyword arguments forwarded to the Ray reader |
| `timestamp_field` | `str` | No | Column containing event timestamps |
| `created_timestamp_column` | `str` | No | Column containing row creation timestamps |
| `tags` | `dict` | No | Arbitrary key-value metadata |
| `description` | `str` | No | Human-readable description |
| `owner` | `str` | No | Owning team or contact |
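As a mental model, the parameter contract above can be sketched as a small dataclass. This is an illustrative stand-in only, not the real `RaySource` class (which lives in `feast.infra.offline_stores.contrib.ray_offline_store.ray_source`):

```python
# Illustrative stand-in for the RaySource parameter contract described
# above. It only encodes which fields are required and which are
# optional metadata; it is NOT Feast's class.
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class RaySourceSpec:
    name: str                                    # required: unique source name
    reader_type: str                             # required: a supported reader type
    path: Optional[str] = None                   # file-based readers only
    reader_options: dict = field(default_factory=dict)
    timestamp_field: Optional[str] = None
    created_timestamp_column: Optional[str] = None
    tags: dict = field(default_factory=dict)
    description: str = ""
    owner: str = ""

# Only name and reader_type are mandatory; everything else defaults.
spec = RaySourceSpec(name="driver_stats", reader_type="parquet", path="s3://bucket/driver/")
```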

## Usage examples

### Parquet on S3

```python
from feast.infra.offline_stores.contrib.ray_offline_store.ray_source import RaySource

driver_stats = RaySource(
    name="driver_stats_parquet",
    reader_type="parquet",
    path="s3://my-bucket/driver_stats/",
    timestamp_field="event_timestamp",
)
```

### CSV

```python
sensor_readings = RaySource(
    name="sensor_readings_csv",
    reader_type="csv",
    path="/data/sensors/",
    timestamp_field="ts",
)
```

### HuggingFace dataset

Load a dataset from the HuggingFace Hub directly into Feast.

```python
from feast.infra.offline_stores.contrib.ray_offline_store.ray_source import RaySource

cheque_images = RaySource(
    name="cheque_images_hf",
    reader_type="huggingface",
    reader_options={
        "dataset_name": "cheques_sample_data",
        "split": "train",
    },
    timestamp_field="event_timestamp",
)
```

### MongoDB

```python
transaction_log = RaySource(
    name="transactions_mongo",
    reader_type="mongo",
    reader_options={
        "uri": "mongodb://localhost:27017",
        "database": "featuredb",
        "collection": "transactions",
    },
    timestamp_field="created_at",
)
```

### SQL (via connection URL)

```python
user_features = RaySource(
    name="user_features_sql",
    reader_type="sql",
    reader_options={
        "connection_url": "postgresql+psycopg2://user:password@host:5432/db",  # pragma: allowlist secret
        "query": "SELECT * FROM user_features",
    },
    timestamp_field="event_timestamp",
)
```
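Because the connection URL must be a plain string, credentials containing special characters (such as `@` or `/`) need URL-encoding before being embedded. A small standard-library helper can do this; the function name here is illustrative:

```python
# Build a SQLAlchemy-style connection URL with a URL-encoded password.
# Helper name is illustrative; only the Python standard library is used.
from urllib.parse import quote_plus

def build_connection_url(driver, user, password, host, port, database):
    # quote_plus escapes characters like '@' and '/' that would otherwise
    # break URL parsing when they appear inside a password.
    return f"{driver}://{user}:{quote_plus(password)}@{host}:{port}/{database}"

url = build_connection_url(
    "postgresql+psycopg2", "user", "p@ss/word", "host", 5432, "db"
)
# The encoded password keeps the URL parseable:
# postgresql+psycopg2://user:p%40ss%2Fword@host:5432/db
```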

## Using `RaySource` in a `BatchFeatureView`

```python
from datetime import timedelta
from feast import BatchFeatureView, Entity, Field
from feast.types import Float32, Int64, String
from feast.infra.offline_stores.contrib.ray_offline_store.ray_source import RaySource

cheque = Entity(name="cheque_id", description="Unique cheque identifier")

cheque_source = RaySource(
    name="cheque_images_hf",
    reader_type="huggingface",
    reader_options={
        "dataset_name": "cheques_sample_data",
        "split": "train",
    },
    timestamp_field="event_timestamp",
)

cheque_ocr_fv = BatchFeatureView(
    name="cheque_ocr_features",
    entities=[cheque],
    ttl=timedelta(days=365),
    schema=[
        Field(name="cheque_id", dtype=Int64),
        Field(name="payee_name", dtype=String),
        Field(name="amount", dtype=String),
        Field(name="bank_name", dtype=String),
        Field(name="raw_text", dtype=String),
    ],
    source=cheque_source,
)
```

## Retrieving data as a Ray Dataset

Once the feature view is materialised, you can retrieve the offline features directly as a Ray Dataset using the first-class `to_ray_dataset()` method:

```python
from feast import FeatureStore

store = FeatureStore(".")

# to_ray_dataset() is a first-class method on every RetrievalJob,
# so it can be chained directly on get_historical_features().
# entity_df is a DataFrame of entity keys and event timestamps
# prepared elsewhere.
ds = store.get_historical_features(
    features=["cheque_ocr_features:payee_name", "cheque_ocr_features:amount"],
    entity_df=entity_df,
).to_ray_dataset()

# Use the dataset downstream in Ray or ML pipelines
ds.show(3)
```

## Proto serialisation

`RaySource` is fully serialisable to Feast's protobuf registry format. The `reader_type`, `path`, and `reader_options` dict are all persisted and can be round-tripped via `to_proto()` / `from_proto()`.
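The guarantee is simply that serialising and then deserialising a source preserves all of its fields. A minimal stand-in for that round-trip contract, using a plain dict in place of Feast's actual protobuf classes:

```python
# Minimal stand-in for RaySource's to_proto()/from_proto() round-trip
# contract, using a plain dict instead of the real protobuf message.
# FakeRaySource is an illustrative name, not a Feast class.
from dataclasses import dataclass, field, asdict

@dataclass
class FakeRaySource:
    name: str
    reader_type: str
    path: str = ""
    reader_options: dict = field(default_factory=dict)

    def to_dict(self):                 # stands in for to_proto()
        return asdict(self)

    @classmethod
    def from_dict(cls, d):             # stands in for from_proto()
        return cls(**d)

src = FakeRaySource(
    name="sensor_readings_csv",
    reader_type="csv",
    path="/data/sensors/",
    reader_options={"parallelism": 4},
)
# Round-tripping through the serialised form preserves every field.
assert FakeRaySource.from_dict(src.to_dict()) == src
```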


## Limitations

- The Ray offline store (and therefore `RaySource`) requires `feast[ray]`.
- `reader_type="sql"` requires a serialisable `connection_url`; raw `sqlalchemy.engine.Engine` objects cannot be pickled across Ray workers.
- Streaming sources (Kafka, Kinesis) are not supported via `RaySource`; use the dedicated Kafka or Kinesis data sources.