# Table Formats
Table formats are metadata and transaction layers built on top of data storage formats (like Parquet). They provide advanced capabilities for managing large-scale data lakes, including ACID transactions, time travel, schema evolution, and efficient data management.
Feast supports modern table formats to enable data lakehouse architectures with your feature store.
## Apache Iceberg

Apache Iceberg is an open table format designed for huge analytic datasets. It provides ACID transactions, snapshot isolation, time travel, hidden partitioning, and full schema evolution.
Basic usage:

```python
from feast.table_format import IcebergFormat

iceberg_format = IcebergFormat(
    catalog="my_catalog",
    namespace="my_database"
)
```
| Parameter | Type | Description |
|---|---|---|
| `catalog` | `str` (optional) | Iceberg catalog name |
| `namespace` | `str` (optional) | Namespace/schema within the catalog |
| `properties` | `dict` (optional) | Additional Iceberg configuration properties |
Advanced configuration:

```python
iceberg_format = IcebergFormat(
    catalog="spark_catalog",
    namespace="production",
    properties={
        # Snapshot selection (use one or the other, not both)
        "snapshot-id": "123456789",
        "as-of-timestamp": "1609459200000",  # Unix timestamp in ms
        # Performance tuning
        "read.split.target-size": "134217728",  # 128 MB splits
        "read.parquet.vectorization.enabled": "true",
        # Advanced configuration
        "io-impl": "org.apache.iceberg.hadoop.HadoopFileIO",
        "warehouse": "s3://my-bucket/warehouse"
    }
)
```
Time travel:

```python
# Read from a specific snapshot
iceberg_format = IcebergFormat(
    catalog="spark_catalog",
    namespace="lakehouse"
)
iceberg_format.set_property("snapshot-id", "7896524153287651133")

# Or read as of a timestamp
iceberg_format.set_property("as-of-timestamp", "1609459200000")
```
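As a sanity check on the values above: `as-of-timestamp` is a Unix epoch time in milliseconds, so `1609459200000` resolves to 2021-01-01 00:00:00 UTC. A quick stdlib check, illustrative only and independent of Feast:

```python
from datetime import datetime, timezone

# "as-of-timestamp" values are Unix epoch timestamps in milliseconds
ts_ms = 1609459200000
dt = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
print(dt.isoformat())  # 2021-01-01T00:00:00+00:00
```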
## Delta Lake

Delta Lake is an open-source storage layer that brings ACID transactions to Apache Spark and big data workloads. It provides ACID transactions, scalable metadata handling, time travel, and unified streaming and batch data processing.
Basic usage:

```python
from feast.table_format import DeltaFormat

delta_format = DeltaFormat()
```
| Parameter | Type | Description |
|---|---|---|
| `checkpoint_location` | `str` (optional) | Location for Delta transaction log checkpoints |
| `properties` | `dict` (optional) | Additional Delta configuration properties |
Advanced configuration:

```python
delta_format = DeltaFormat(
    checkpoint_location="s3://my-bucket/checkpoints",
    properties={
        # Time travel (use one or the other, not both)
        "versionAsOf": "5",
        "timestampAsOf": "2024-01-01 00:00:00",
        # Performance optimization
        "delta.autoOptimize.optimizeWrite": "true",
        "delta.autoOptimize.autoCompact": "true",
        # Data skipping
        "delta.dataSkippingNumIndexedCols": "32",
        # Z-ordering
        "delta.autoOptimize.zOrderCols": "event_timestamp"
    }
)
```
Time travel:

```python
# Read from a specific version
delta_format = DeltaFormat()
delta_format.set_property("versionAsOf", "10")

# Or read as of a timestamp
delta_format = DeltaFormat()
delta_format.set_property("timestampAsOf", "2024-01-15 12:00:00")
```
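`timestampAsOf` takes a plain timestamp string in `yyyy-MM-dd HH:mm:ss` layout (Delta also accepts date-only strings like `2024-01-15`). A quick stdlib check of the format used above, illustrative only:

```python
from datetime import datetime

# timestampAsOf strings follow the "yyyy-MM-dd HH:mm:ss" layout
ts = datetime.strptime("2024-01-15 12:00:00", "%Y-%m-%d %H:%M:%S")
print(ts.year, ts.hour)  # 2024 12
```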
## Apache Hudi

Apache Hudi (Hadoop Upserts Deletes and Incrementals) is a data lake storage framework for simplifying incremental data processing. It provides record-level upserts and deletes, incremental queries, and ACID guarantees on data lake storage.
Basic usage:

```python
from feast.table_format import HudiFormat

hudi_format = HudiFormat(
    table_type="COPY_ON_WRITE",
    record_key="user_id",
    precombine_field="updated_at"
)
```
| Parameter | Type | Description |
|---|---|---|
| `table_type` | `str` (optional) | `COPY_ON_WRITE` or `MERGE_ON_READ` |
| `record_key` | `str` (optional) | Field(s) that uniquely identify a record |
| `precombine_field` | `str` (optional) | Field used to determine the latest version of a record |
| `properties` | `dict` (optional) | Additional Hudi configuration properties |
**COPY_ON_WRITE (COW)**: updates rewrite the affected data files, favoring read performance.

```python
hudi_format = HudiFormat(
    table_type="COPY_ON_WRITE",
    record_key="id",
    precombine_field="timestamp"
)
```
**MERGE_ON_READ (MOR)**: updates are appended to delta log files and merged at query time (or by compaction), favoring write performance.

```python
hudi_format = HudiFormat(
    table_type="MERGE_ON_READ",
    record_key="id",
    precombine_field="timestamp"
)
```
Advanced configuration:

```python
hudi_format = HudiFormat(
    table_type="COPY_ON_WRITE",
    record_key="user_id",
    precombine_field="updated_at",
    properties={
        # Query type
        "hoodie.datasource.query.type": "snapshot",  # or "incremental"
        # Incremental queries
        "hoodie.datasource.read.begin.instanttime": "20240101000000",
        "hoodie.datasource.read.end.instanttime": "20240102000000",
        # Indexing
        "hoodie.index.type": "BLOOM",
        # Compaction (for MOR tables)
        "hoodie.compact.inline": "true",
        "hoodie.compact.inline.max.delta.commits": "5",
        # Clustering
        "hoodie.clustering.inline": "true"
    }
)
```
Incremental queries:

```python
# Process only new/changed data
hudi_format = HudiFormat(
    table_type="COPY_ON_WRITE",
    record_key="id",
    precombine_field="timestamp",
    properties={
        "hoodie.datasource.query.type": "incremental",
        "hoodie.datasource.read.begin.instanttime": "20240101000000",
        "hoodie.datasource.read.end.instanttime": "20240102000000"
    }
)
```
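Hudi commit instants like `20240101000000` are compact `yyyyMMddHHmmss` timestamps. A small stdlib sketch (not part of Feast) that checks an incremental window is well-formed:

```python
from datetime import datetime

def parse_instant(instant: str) -> datetime:
    # Hudi instant times use the compact yyyyMMddHHmmss layout
    return datetime.strptime(instant, "%Y%m%d%H%M%S")

begin = parse_instant("20240101000000")
end = parse_instant("20240102000000")
assert begin < end, "incremental window must move forward in time"
print((end - begin).days)  # 1
```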
## File Formats vs. Table Formats

It's important to understand the distinction between the two:
| Aspect | File Format | Table Format |
|---|---|---|
| What it is | Physical encoding of data | Metadata and transaction layer |
| Examples | Parquet, Avro, ORC, CSV | Iceberg, Delta Lake, Hudi |
| Handles | Data serialization | ACID, versioning, schema evolution |
| Layer | Storage layer | Metadata layer |
For example:

```python
# Table format (metadata layer) built on top of file format (storage layer)
from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import SparkSource
from feast.table_format import IcebergFormat

iceberg = IcebergFormat(catalog="my_catalog", namespace="db")

source = SparkSource(
    name="features",
    path="catalog.db.table",
    file_format="parquet",   # Underlying storage format
    table_format=iceberg,    # Table metadata format
    timestamp_field="event_timestamp"
)
```
## Choosing a Table Format

| Use Case | Recommended Format | Why |
|---|---|---|
| Large-scale analytics with frequent schema changes | Iceberg | Best schema evolution, hidden partitioning, mature ecosystem |
| Streaming + batch workloads | Delta Lake | Unified architecture, strong integration with Spark, good docs |
| CDC and upsert-heavy workloads | Hudi | Efficient record-level updates, incremental queries |
| Read-heavy analytics | Iceberg or Delta | Excellent query performance |
| Write-heavy transactional | Hudi (MOR) | Optimized for fast writes |
| Multi-engine support | Iceberg | Widest engine support (Spark, Flink, Trino, etc.) |
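The matrix above can be mirrored as a tiny lookup helper. This is purely illustrative (the workload keys are invented for this sketch; none of it is a Feast API):

```python
# Illustrative decision helper mirroring the recommendation table; not part of Feast.
RECOMMENDATIONS = {
    "schema-evolution-heavy": "Iceberg",
    "streaming-plus-batch": "Delta Lake",
    "cdc-upserts": "Hudi",
    "read-heavy-analytics": "Iceberg or Delta",
    "write-heavy-transactional": "Hudi (MOR)",
    "multi-engine": "Iceberg",
}

def recommend(workload: str) -> str:
    # Iceberg is a reasonable default given its wide engine support
    return RECOMMENDATIONS.get(workload, "Iceberg")

print(recommend("cdc-upserts"))  # Hudi
```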
## Best Practices

Partitioning:

```python
# Iceberg - hidden partitioning
iceberg_format.set_property("partition-spec", "days(event_timestamp)")

# Delta - partitioning is declared explicitly on the data source
# Hudi - configure via properties
hudi_format.set_property("hoodie.datasource.write.partitionpath.field", "date")
```
Compaction and optimization:

```python
# Delta auto-optimize
delta_format.set_property("delta.autoOptimize.optimizeWrite", "true")
delta_format.set_property("delta.autoOptimize.autoCompact", "true")

# Hudi inline compaction
hudi_format.set_property("hoodie.compact.inline", "true")
```
Snapshot and version cleanup (regularly clean up old snapshots and versions):

- **Iceberg**: use the `expire_snapshots()` procedure
- **Delta**: use the `VACUUM` command
- **Hudi**: configure retention policies

Schema evolution:

- Always test schema changes in non-production first
- Ensure backward compatibility
- Use proper migration procedures
## Supported Offline Stores

Table formats are currently supported with the Spark offline store (`SparkSource`). Support for additional offline stores is planned.