examples/monitoring/monitoring-quickstart.ipynb
This notebook walks you through Feast's data quality monitoring end-to-end:
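Prerequisites: a running PostgreSQL instance and `feast[postgres]` installed. Connection settings are read from the `FEAST_PG_*` environment variables below and fall back to local defaults.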
# Install Feast with its PostgreSQL extras (this notebook uses a PostgreSQL offline store).
!uv pip install -q 'feast[postgres]'
Create a minimal `feature_store.yaml` that uses PostgreSQL for both the SQL registry and the offline store, plus a local SQLite online store.
import os
import tempfile
from urllib.parse import quote_plus

# Scratch feature repo for this demo. mkdtemp() already creates the directory,
# so no separate makedirs() call is needed.
REPO_DIR = tempfile.mkdtemp(prefix="feast_monitoring_demo_")
print(f"Working directory: {REPO_DIR}")

# Adjust these to match your PostgreSQL instance
PG_HOST = os.environ.get("FEAST_PG_HOST", "localhost")
PG_PORT = os.environ.get("FEAST_PG_PORT", "5432")
PG_DB = os.environ.get("FEAST_PG_DB", "feast")
PG_USER = os.environ.get("FEAST_PG_USER", "feast")
PG_PASS = os.environ.get("FEAST_PG_PASS", "feast")
PG_SSLMODE = os.environ.get("FEAST_PG_SSLMODE", "disable")

# The registry is configured with a connection URL, so the credentials must be
# percent-encoded: a password containing "@", ":" or "/" would otherwise be
# mis-parsed. The offline store takes discrete fields and needs no encoding.
_pg_url_userinfo = f"{quote_plus(PG_USER)}:{quote_plus(PG_PASS)}"

# NOTE: YAML nesting is significant; keys under `registry`, `offline_store`
# and `online_store` must stay indented.
feature_store_yaml = f"""
project: monitoring_demo
registry:
    registry_type: sql
    path: postgresql://{_pg_url_userinfo}@{PG_HOST}:{PG_PORT}/{PG_DB}?sslmode={PG_SSLMODE}
provider: local
offline_store:
    type: postgres
    host: {PG_HOST}
    port: {PG_PORT}
    database: {PG_DB}
    user: {PG_USER}
    password: {PG_PASS}
    sslmode: {PG_SSLMODE}
online_store:
    type: sqlite
    path: {REPO_DIR}/online_store.db
entity_key_serialization_version: 3
"""

with open(os.path.join(REPO_DIR, "feature_store.yaml"), "w", encoding="utf-8") as f:
    f.write(feature_store_yaml)
print("feature_store.yaml written.")
from datetime import datetime, timedelta

import numpy as np
import pandas as pd

N_ROWS = 5000
N_DAYS = 60
base_date = datetime(2025, 1, 1)

# Seed the global NumPy RNG so the sample data is reproducible.
np.random.seed(42)

# NOTE: every draw below uses the global RNG, so the order of these calls
# determines the generated values. Keep it stable.
day_offsets = np.random.randint(0, N_DAYS, N_ROWS)
timestamps = [base_date + timedelta(days=int(offset)) for offset in day_offsets]
driver_ids = np.random.randint(1000, 2000, N_ROWS)
conv_rates = np.clip(np.random.normal(0.5, 0.2, N_ROWS), 0, 1)
acc_rates = np.clip(np.random.normal(0.7, 0.15, N_ROWS), 0, 1)
daily_trips = np.random.poisson(20, N_ROWS).astype("int32")
vehicle_types = np.random.choice(["sedan", "suv", "truck", "van", "compact"], N_ROWS)

# `created` simply mirrors the event timestamp for every row.
df = pd.DataFrame(
    {
        "driver_id": driver_ids,
        "event_timestamp": timestamps,
        "conv_rate": conv_rates,
        "acc_rate": acc_rates,
        "avg_daily_trips": daily_trips,
        "vehicle_type": vehicle_types,
        "created": timestamps,
    }
)
print(f"Sample data: {len(df)} rows, {N_DAYS} days")
df.head()
!uv pip install -q 'psycopg2'
# Load sample data into PostgreSQL'
from sqlalchemy import create_engine
engine = create_engine(f"postgresql://{PG_USER}:{PG_PASS}@{PG_HOST}:{PG_PORT}/{PG_DB}")
df.to_sql("driver_stats_source", engine, if_exists="replace", index=False)
print("Loaded sample data into PostgreSQL table 'driver_stats_source'.")
# Write feature definitions
# This source text becomes definitions.py inside REPO_DIR: one entity, a
# PostgreSQL-backed source, a feature view over it and a feature service.
definitions = '''
from datetime import timedelta
from feast import Entity, FeatureView, FeatureService, Field
from feast.types import Float32, Int32, String
from feast.infra.offline_stores.contrib.postgres_offline_store.postgres_source import (
    PostgreSQLSource,
)
driver = Entity(name="driver", join_keys=["driver_id"])
driver_stats_source = PostgreSQLSource(
    name="driver_stats_source",
    query="SELECT * FROM driver_stats_source",
    timestamp_field="event_timestamp",
    created_timestamp_column="created",
)
driver_stats_fv = FeatureView(
    name="driver_stats",
    entities=[driver],
    ttl=timedelta(days=365),
    schema=[
        Field(name="conv_rate", dtype=Float32),
        Field(name="acc_rate", dtype=Float32),
        Field(name="avg_daily_trips", dtype=Int32),
        Field(name="vehicle_type", dtype=String),
    ],
    source=driver_stats_source,
)
driver_service = FeatureService(
    name="driver_service",
    features=[driver_stats_fv],
)
'''
definitions_path = os.path.join(REPO_DIR, "definitions.py")
with open(definitions_path, "w") as definitions_file:
    definitions_file.write(definitions)
print("Feature definitions written.")
Calling `store.apply()` (the Python SDK equivalent of `feast apply`) registers the feature definitions and automatically queues baseline metric computation.
import sys

from feast import FeatureStore

# Put the scratch repo first on the import path so the generated
# definitions.py can be imported by module name.
sys.path.insert(0, REPO_DIR)
from definitions import driver, driver_service, driver_stats_fv, driver_stats_source

store = FeatureStore(repo_path=REPO_DIR)
feast_objects = [driver, driver_stats_source, driver_stats_fv, driver_service]
store.apply(feast_objects)
print("Features registered. Baseline computation queued.")
Auto-compute detects the latest event timestamp and generates metrics for all 5 granularities: daily, weekly, biweekly, monthly, and quarterly.
from feast.monitoring.monitoring_service import MonitoringService

# Monitoring service bound to the feature store created above; the cells
# below all go through this object.
monitoring = MonitoringService(store)
result = monitoring.auto_compute(project="monitoring_demo")

computed_count = result.get('computed_features', 'N/A')
granularity_list = result.get('granularities', [])
print(f"Computed metrics for {computed_count} features")
print(f"Granularities: {granularity_list}")
Compute weekly metrics for a specific window.
from datetime import date

# Weekly metrics for the window 2025-01-01 .. 2025-01-07.
weekly_request = {
    "project": "monitoring_demo",
    "feature_view_name": "driver_stats",
    "start_date": date(2025, 1, 1),
    "end_date": date(2025, 1, 7),
    "granularity": "weekly",
}
result = monitoring.compute_metrics(**weekly_request)
print(result)
Use set_baseline=True to mark the computed metrics as the reference distribution.
# Daily metrics over 2025-01-01 .. 2025-02-28, flagged as the baseline
# (reference distribution) via set_baseline=True.
result = monitoring.compute_metrics(
    project="monitoring_demo",
    feature_view_name="driver_stats",
    granularity="daily",
    start_date=date(2025, 1, 1),
    end_date=date(2025, 2, 28),
    set_baseline=True,
)
print("Baseline set.")
# Stored daily batch metrics for the numeric feature conv_rate.
metrics = monitoring.get_feature_metrics(
    project="monitoring_demo",
    feature_view_name="driver_stats",
    feature_name="conv_rate",
    data_source_type="batch",
    granularity="daily",
)
# Show the first three records returned.
for record in metrics[:3]:
    summary_line = (
        f"Date: {record['metric_date']} Mean: {record['mean']:.4f} "
        f"Null rate: {record['null_rate']:.4f} Rows: {record['row_count']}"
    )
    print(summary_line)
Categorical features (like vehicle_type) produce value-count histograms instead of numeric statistics.
cat_metrics = monitoring.get_feature_metrics(
    project="monitoring_demo",
    feature_view_name="driver_stats",
    feature_name="vehicle_type",
    data_source_type="batch",
    granularity="daily",
)
# Categorical records carry a value-count histogram instead of numeric stats.
for record in cat_metrics[:3]:
    print(f"Date: {record['metric_date']} Type: {record['feature_type']} "
          f"Rows: {record['row_count']} Null rate: {record['null_rate']:.4f}")
    if not record.get("histogram"):
        continue
    value_hist = record["histogram"]
    print(f" Unique values: {value_hist['unique_count']} Other count: {value_hist['other_count']}")
    for bucket in value_hist["values"]:
        print(f" {bucket['value']}: {bucket['count']}")
try:
    import matplotlib.pyplot as plt

    # NOTE(review): treats cat_metrics[0] as the record to plot (presumably the
    # most recent); confirm the ordering returned by get_feature_metrics().
    first_cat = cat_metrics[0] if cat_metrics else None
    cat_hist = first_cat.get("histogram") if first_cat else None
    if not cat_hist:
        print("No categorical histogram data available.")
    else:
        bar_labels = [item["value"] for item in cat_hist["values"]]
        bar_counts = [item["count"] for item in cat_hist["values"]]
        # Values outside the stored top-N are folded into a single "(other)" bar.
        if cat_hist["other_count"] > 0:
            bar_labels.append("(other)")
            bar_counts.append(cat_hist["other_count"])
        fig, ax = plt.subplots(figsize=(8, 4))
        ax.barh(bar_labels, bar_counts, color="steelblue", edgecolor="white")
        ax.set_title(f"vehicle_type distribution — {first_cat['metric_date']}")
        ax.set_xlabel("Count")
        plt.tight_layout()
        plt.show()  # pragma: allowlist secret
except ImportError:
    print("Install matplotlib to visualize: pip install matplotlib")
# Feature-view-level batch rollups for driver_stats.
view_metrics = monitoring.get_feature_view_metrics(
    project="monitoring_demo",
    feature_view_name="driver_stats",
    data_source_type="batch",
)
for view_record in view_metrics[:3]:
    print(
        f"Date: {view_record['metric_date']} Total rows: {view_record['total_row_count']} "
        f"Features w/ nulls: {view_record['features_with_nulls']} "
        f"Max null rate: {view_record.get('max_null_rate', 'N/A')}"
    )
# Feature-service-level batch rollups for driver_service.
svc_metrics = monitoring.get_feature_service_metrics(
    project="monitoring_demo",
    feature_service_name="driver_service",
    data_source_type="batch",
)
for svc_record in svc_metrics[:3]:
    print(
        f"Date: {svc_record['metric_date']} Total features: {svc_record['total_features']} "
        f"Avg null rate: {svc_record.get('avg_null_rate', 'N/A')}"
    )
baseline = monitoring.get_baseline(
    project="monitoring_demo",
    feature_view_name="driver_stats",
    feature_name="conv_rate",
    data_source_type="batch",
)
if not baseline:
    print("No baseline found.")
else:
    reference = baseline[0]
    # Print the stored reference statistics in a fixed order.
    for stat in ("mean", "stddev", "null_rate"):
        print(f"Baseline {stat}: {reference[stat]:.4f}")
Use the histogram stored in the metrics to plot a distribution.
!uv pip install -q 'matplotlib'
try:
import matplotlib.pyplot as plt
# Get the latest daily metric for conv_rate
latest = metrics[0] if metrics else None
if latest and latest.get("histogram"):
hist = latest["histogram"]
bins = hist["bins"]
counts = hist["counts"]
fig, ax = plt.subplots(figsize=(10, 4))
ax.bar(
[f"{bins[i]:.2f}" for i in range(len(counts))],
counts,
color="steelblue",
edgecolor="white",
)
ax.set_title(f"conv_rate distribution — {latest['metric_date']}")
ax.set_xlabel("Bin")
ax.set_ylabel("Count")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show() # pragma: allowlist secret
else:
print("No histogram data available.")
except ImportError:
print("Install matplotlib to visualize: pip install matplotlib")
Plot how a metric (e.g., mean) evolves over time.
# Daily conv_rate metrics across the whole sample period.
timeseries = monitoring.get_timeseries(
    project="monitoring_demo",
    feature_view_name="driver_stats",
    feature_name="conv_rate",
    data_source_type="batch",
    granularity="daily",
    start_date=date(2025, 1, 1),
    end_date=date(2025, 3, 1),
)
if timeseries:
    # dates / means / null_rates are also consumed by the plotting cell below.
    dates = [t["metric_date"] for t in timeseries]
    means = [t["mean"] for t in timeseries]
    null_rates = [t["null_rate"] for t in timeseries]
    print(f"{len(timeseries)} data points from {dates[0]} to {dates[-1]}")
    preview = timeseries[:5]
    for t in preview:
        print(f" {t['metric_date']}: mean={t['mean']:.4f}, null_rate={t['null_rate']:.4f}")
    # Only show the continuation marker when some points were actually omitted.
    if len(timeseries) > len(preview):
        print(" ...")
else:
    print("No time-series data.")
try:
    import matplotlib.pyplot as plt

    if timeseries:
        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 6), sharex=True)
        # Top panel: daily mean; bottom panel: daily null rate (shared x axis).
        panels = (
            (ax1, means, "steelblue", "Mean"),
            (ax2, null_rates, "coral", "Null Rate"),
        )
        for axis, series, colour, ylabel in panels:
            axis.plot(dates, series, marker=".", color=colour)
            axis.set_ylabel(ylabel)
            axis.grid(True, alpha=0.3)
        ax1.set_title("conv_rate — Daily Trend")
        ax2.set_xlabel("Date")
        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.show()  # pragma: allowlist secret
except ImportError:
    print("Install matplotlib to visualize: pip install matplotlib")
Compute metrics for an arbitrary date range without storing them. Useful for ad-hoc investigation.
# Ad-hoc metrics for 2025-01-10 .. 2025-01-20; this cell only prints them.
transient_result = monitoring.compute_transient(
    project="monitoring_demo",
    feature_view_name="driver_stats",
    feature_names=["conv_rate", "avg_daily_trips", "vehicle_type"],
    start_date=date(2025, 1, 10),
    end_date=date(2025, 1, 20),
)
for fm in transient_result.get("metrics", []):
    print(f"{fm['feature_name']} ({fm['feature_type']}):")
    print(f" rows={fm['row_count']} nulls={fm['null_count']} null_rate={fm['null_rate']:.4f}")
    kind = fm["feature_type"]
    if kind == "numeric":
        print(f" mean={fm['mean']:.4f} stddev={fm['stddev']:.4f}")
        print(f" p50={fm['p50']:.4f} p95={fm['p95']:.4f} p99={fm['p99']:.4f}")
    elif kind == "categorical" and fm.get("histogram"):
        value_hist = fm["histogram"]
        print(f" unique_values={value_hist['unique_count']}")
        for bucket in value_hist["values"]:
            print(f" {bucket['value']}: {bucket['count']}")
    print()
Once the Feast registry server is running, all monitoring endpoints are available via HTTP.
# Start the server
# NOTE(review): the HTTP examples below call http://localhost:6572/api/v1;
# confirm this command exposes the REST API on that port (see
# `feast serve_registry --help`) before running them.
feast serve_registry
# This cell is for reference — run it when the registry server is up.
import requests
BASE_URL = "http://localhost:6572/api/v1"
# Auto-compute all metrics
resp = requests.post(f"{BASE_URL}/monitoring/auto_compute", json={
"project": "monitoring_demo",
})
print(resp.json())
# Read per-feature metrics
resp = requests.get(f"{BASE_URL}/monitoring/metrics/features", params={
"project": "monitoring_demo",
"feature_view_name": "driver_stats",
"feature_name": "conv_rate",
"granularity": "daily",
"data_source_type": "batch",
})
print(resp.json())
# Read baseline
resp = requests.get(f"{BASE_URL}/monitoring/metrics/baseline", params={
"project": "monitoring_demo",
"feature_view_name": "driver_stats",
"feature_name": "conv_rate",
"data_source_type": "batch",
})
print(resp.json())
If your feature service has logging enabled, you can compute metrics from actual production traffic.
# Example feature service definition with logging
#
# from feast import FeatureService, LoggingConfig
# from feast.infra.offline_stores.contrib.postgres_offline_store.postgres_source import (
#     PostgreSQLLoggingDestination,
# )
#
# driver_service = FeatureService(
#     name="driver_service",
#     features=[driver_stats_fv],
#     logging_config=LoggingConfig(
#         destination=PostgreSQLLoggingDestination(table_name="feast_driver_logs"),
#         sample_rate=1.0,
#     ),
# )

# Show how log-based metrics are triggered from each interface.
for hint_line in (
    "See the code cell above for the logging config pattern.",
    "Once applied, log metrics can be computed with:",
    " CLI: feast monitor run --source-type log",
    " API: POST /monitoring/compute/log",
    " SDK: monitoring.compute_log_metrics(project, feature_service_name)",
):
    print(hint_line)
# Log-metric workflow, left commented out because it needs a feature service
# with logging configured (see the logging config example above).
# Uncomment when you have a feature service with logging enabled
#
# result = monitoring.compute_log_metrics(
#     project="monitoring_demo",
#     feature_service_name="driver_service",
#     granularity="daily",
# )
# print(result)
# Or auto-compute all log metrics
# result = monitoring.auto_compute_log_metrics(project="monitoring_demo")
# print(result)
# Compare batch vs. log metrics for the same feature
# (the only difference between the two queries is `data_source_type`).
#
# batch = monitoring.get_feature_metrics(
#     project="monitoring_demo",
#     feature_view_name="driver_stats",
#     feature_name="conv_rate",
#     data_source_type="batch",
#     granularity="daily",
# )
#
# log = monitoring.get_feature_metrics(
#     project="monitoring_demo",
#     feature_view_name="driver_stats",
#     feature_name="conv_rate",
#     data_source_type="log",
#     granularity="daily",
# )
#
# print("Batch metrics:")
# for m in batch[:3]:
#     print(f"  {m['metric_date']}: mean={m['mean']:.4f}")
#
# print("\nLog metrics:")
# for m in log[:3]:
#     print(f"  {m['metric_date']}: mean={m['mean']:.4f}")
print("Uncomment the cell above once log metrics have been computed.")
# Compute all batch + log metrics daily at 2 AM
# The `cd` runs feast from inside the feature repo; output is appended to a log file.
# NOTE(review): cron usually runs with a minimal PATH; use the absolute path to
# the `feast` executable if it is not found.
0 2 * * * cd /path/to/feast/repo && feast monitor run --source-type all >> /var/log/feast-monitor.log 2>&1
from airflow.operators.bash import BashOperator

# Runs the monitoring CLI from inside the feature repo.
# NOTE(review): create this inside your DAG definition so the task is attached to it.
monitor_task = BashOperator(
    cwd="/path/to/feast/repo",
    bash_command="feast monitor run --source-type all",
    task_id="feast_monitor",
)
apiVersion: batch/v1
kind: CronJob
metadata:
  name: feast-monitor
spec:
  schedule: "0 2 * * *"
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: feast-monitor
              image: feast-image:latest
              # Run from the mounted feature repo, like the cron (`cd ...`) and
              # Airflow (`cwd=`) examples; otherwise `feast` starts in the
              # image's default directory, which has no feature_store.yaml.
              workingDir: /feast/repo
              command: ["feast", "monitor", "run", "--source-type", "all"]
              volumeMounts:
                - name: feast-repo
                  mountPath: /feast/repo
          restartPolicy: OnFailure
          volumes:
            - name: feast-repo
              configMap:
                name: feast-repo-config
| Capability | CLI | REST API | SDK |
|---|---|---|---|
| Auto-compute (all granularities) | feast monitor run | POST /monitoring/auto_compute | monitoring.auto_compute_metrics() |
| Targeted compute | feast monitor run --feature-view X --granularity daily | POST /monitoring/compute | monitoring.compute_metrics() |
| Set baseline | feast monitor run --set-baseline | POST /monitoring/compute (with set_baseline: true) | monitoring.compute_metrics(set_baseline=True) |
| Log metrics | feast monitor run --source-type log | POST /monitoring/compute/log | monitoring.compute_log_metrics() |
| On-demand exploration | — | POST /monitoring/compute/transient | monitoring.compute_transient() |
| Read metrics | — | GET /monitoring/metrics/* | monitoring.get_feature_metrics() etc. |
| Read baseline | — | GET /monitoring/metrics/baseline | monitoring.get_baseline() |
| Time-series | — | GET /monitoring/metrics/timeseries | monitoring.get_timeseries() |