# Batch Feature View
`BatchFeatureView` is a flexible abstraction in Feast that allows users to define features derived from batch data sources or even other `FeatureView`s, enabling composable and reusable feature pipelines. It is an extension of the `FeatureView` class, with support for user-defined transformations, aggregations, and recursive chaining of feature logic.
Key capabilities:

* Derived views: define a `BatchFeatureView` on top of one or more other `FeatureView`s.
* Transformations: apply a user-defined transformation (`feature_transformation` or `udf`) to the raw data source; a view can also be used to deal with multiple data sources.
* Aggregations: compute windowed aggregations (e.g., `sum`, `avg`) over event-timestamped data.
* Sinks: use `sink_source` to define where derived feature data should be persisted.

```python
class BatchFeatureView(FeatureView):
    def __init__(
        self,
        *,
        name: str,
        source: Optional[Union[DataSource, FeatureView, List[FeatureView]]] = None,
        sink_source: Optional[DataSource] = None,
        schema: Optional[List[Field]] = None,
        entities: Optional[List[Entity]] = None,
        aggregations: Optional[List[Aggregation]] = None,
        udf: Optional[Callable[[DataFrame], DataFrame]] = None,
        udf_string: Optional[str] = None,
        ttl: Optional[timedelta] = timedelta(days=0),
        online: bool = True,
        offline: bool = False,
        description: str = "",
        tags: Optional[Dict[str, str]] = None,
        owner: str = "",
    ): ...
```
Here is an example of defining a `BatchFeatureView` with a daily aggregation over a Parquet file source:

```python
from datetime import timedelta

from feast import BatchFeatureView, Entity, Field, FileSource
from feast.aggregation import Aggregation
from feast.types import Float32, Int32

driver = Entity(name="driver_id", join_keys=["driver_id"])

source = FileSource(
    path="s3://bucket/path/data.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created",
)

driver_fv = BatchFeatureView(
    name="driver_hourly_stats",
    entities=[driver],
    schema=[
        Field(name="driver_id", dtype=Int32),
        Field(name="conv_rate", dtype=Float32),
    ],
    aggregations=[
        # Rolling 1-day sum of conv_rate; `name` overrides the default
        # `{function}_{column}` output column name.
        Aggregation(
            column="conv_rate",
            function="sum",
            time_window=timedelta(days=1),
            name="total_conv_rate_1d",
        ),
    ],
    source=source,
)
```
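Once defined, the view can be registered and materialized. Below is a minimal sketch, assuming a feature repository (with a `feature_store.yaml`) in the current directory whose configured compute engine supports aggregations; the date range is illustrative:

```python
from datetime import datetime

from feast import FeatureStore

store = FeatureStore(repo_path=".")  # assumes feature_store.yaml is here
store.apply([driver, driver_fv])

# Computes the windowed aggregation and loads results into the online store.
store.materialize(
    start_date=datetime(2024, 1, 1),
    end_date=datetime(2024, 1, 2),
)
```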
You can also build feature views on top of other feature views by passing an existing view (or a list of views) as the `source`. Let's take a look at an example.
```python
from pyspark.sql import DataFrame

from feast import BatchFeatureView, Field, FileSource
from feast.data_format import ParquetFormat
from feast.types import Float32, Int32


def transform(df: DataFrame) -> DataFrame:
    # Derive the new view's conv_rate from the upstream view's column.
    return df.withColumn("conv_rate", df["conv_rate"] * 2)


daily_driver_stats = BatchFeatureView(
    name="daily_driver_stats",
    entities=[driver],
    schema=[
        Field(name="driver_id", dtype=Int32),
        Field(name="conv_rate", dtype=Float32),
    ],
    udf=transform,
    source=driver_fv,  # chained on top of the view defined above
    sink_source=FileSource(  # required: where to persist the derived view
        name="daily_driver_stats_sink",
        path="s3://bucket/daily_stats/",
        file_format=ParquetFormat(),
        timestamp_field="event_timestamp",
        created_timestamp_column="created",
    ),
)
```
Feast automatically resolves the DAG of `BatchFeatureView` dependencies during:

* `materialize()`: recursively resolves and executes the feature view graph.
* `get_historical_features()`: builds the execution plan for retrieving point-in-time correct features.
* `apply()`: registers the feature view DAG structure to the registry.

Each transformation and aggregation is turned into a DAG node (e.g., `SparkTransformationNode`, `SparkAggregationNode`) executed by the compute engine (e.g., `SparkComputeEngine`).
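For example, point-in-time correct features can be retrieved from the derived view through `get_historical_features()`. A minimal sketch, assuming the views above are registered; the entity rows are hypothetical:

```python
from datetime import datetime

import pandas as pd
from feast import FeatureStore

store = FeatureStore(repo_path=".")

# Hypothetical entity rows: one driver at one event timestamp.
entity_df = pd.DataFrame(
    {
        "driver_id": [1001],
        "event_timestamp": [datetime(2024, 1, 1, 12, 0)],
    }
)

training_df = store.get_historical_features(
    entity_df=entity_df,
    features=["daily_driver_stats:conv_rate"],
).to_df()
```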
Materialization behavior depends on how the view is sourced:

* When a `BatchFeatureView` is backed by a base source (`FileSource`, `BigQuerySource`, `SparkSource`, etc.), its `batch_source` is used directly.
* When it is derived from other views, `sink_source` must be provided to define the materialization target data source; during materialization, `SparkWriteNode` uses the `sink_source` as the batch sink.

See:
* `test_spark_dag_materialize_recursive_view()`: validates chaining of two feature views and the resulting output.
* `test_spark_compute_engine_materialize()`: validates transformation and writing of features into the offline and online stores.

A few additional notes:

* `sink_source` is required when chaining views (i.e., when `source` is another `FeatureView` or a list of them).
* `source` is optional; if omitted (`None`), the feature view has no associated batch data source.
* Field mappings (`field_mapping` on the `sink_source` or `batch_source`) are applied if they exist.
* Aggregation output columns are named `{function}_{column}` by default (e.g., `sum_conv_rate`). Use the `name` parameter to override this (e.g., `name="total_conv_rate_1d"`), as in the sketch below.
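For instance, a minimal sketch contrasting the default output column name with an explicit override (column and window are illustrative):

```python
from datetime import timedelta

from feast.aggregation import Aggregation

# Default output column name: "sum_conv_rate" ({function}_{column}).
default_named = Aggregation(
    column="conv_rate",
    function="sum",
    time_window=timedelta(days=1),
)

# Explicit output column name via `name`.
explicit_named = Aggregation(
    column="conv_rate",
    function="sum",
    time_window=timedelta(days=1),
    name="total_conv_rate_1d",
)
```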