examples/rbac-local/client.ipynb
import yaml
def update_username(username, path='client/feature_store.yaml'):
    """Set ``auth.username`` in the client Feast configuration file.

    Args:
        username: The user to authenticate as (e.g. ``reader``, ``writer``,
            ``batch_admin`` or ``admin``).
        path: The feature-store YAML configuration to update; defaults to the
            client configuration used by this notebook.
    """
    with open(path, 'r') as file:
        # An empty file parses to None; normalize to an empty mapping.
        config = yaml.safe_load(file) or {}
    # Guard against a config with no 'auth' section yet, which would
    # otherwise raise a KeyError on the nested assignment.
    config.setdefault('auth', {})['username'] = username
    with open(path, 'w') as file:
        yaml.safe_dump(config, file, default_flow_style=False)
Use one of `reader`, `writer`, `batch_admin` or `admin` (the password is fixed) as the current username.
# Switch the client identity; valid users are reader, writer, batch_admin and admin.
username = 'reader'
update_username(username)
# Show the resulting client configuration (IPython shell magic).
!cat client/feature_store.yaml
The following is needed so that the notebook output includes the messages logged by the Feast application.
import logging
import sys
from io import StringIO
# Emit INFO-level records with a timestamp prefix to the notebook output.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
# Root logger; Feast's module loggers propagate to it.
logger = logging.getLogger()
Initialize the Feast store from the client configuration
from feast.feature_store import FeatureStore
# Build the Feast store from the client repo configuration (client/feature_store.yaml).
store = FeatureStore(repo_path="client")
Verify the authorization config and run some GET APIs on the registry.
# Show which auth configuration the client is using.
print(f"Authorization config is: {store.config.auth}")
# Registry reads below are subject to permission enforcement on the server side.
for e in store.list_entities():
    print(f"Entity found {e.name}")
for fv in store.list_all_feature_views():
    print(f"FeatureView found {fv.name} of type {type(fv).__name__}")
for fs in store.list_feature_services():
    print(f"FeatureService found {fs.name} of type {type(fs).__name__}")
# List the configured permissions via the Feast CLI (IPython shell magic).
!feast -c client permissions list
The following test functions were copied from the test_workflow.py template but we added try blocks to print only
the relevant error messages, since we expect to receive errors from the permission enforcement modules.
import subprocess
from datetime import datetime
import pandas as pd
from feast import FeatureStore
from feast.data_source import PushMode
def fetch_historical_features_entity_df(store: FeatureStore, for_batch_scoring: bool):
    """Retrieve historical (offline) features for a small, hand-built entity frame.

    See https://docs.feast.dev/getting-started/concepts/feature-retrieval for how
    to retrieve features for all entities in the offline store instead.
    Errors are caught and printed, since permission enforcement is expected to
    reject some users.
    """
    event_times = [
        datetime(2021, 4, 12, 10, 59, 42),
        datetime(2021, 4, 12, 8, 12, 10),
        datetime(2021, 4, 12, 16, 40, 26),
    ]
    records = {
        # entity's join key -> entity values
        "driver_id": [1001, 1002, 1003],
        # "event_timestamp" (reserved key) -> timestamps
        "event_timestamp": event_times,
        # (optional) label name -> label values. Feast does not process these
        "label_driver_reported_satisfaction": [1, 5, 3],
        # values we're using for an on-demand transformation
        "val_to_add": [1, 2, 3],
        "val_to_add_2": [10, 20, 30],
    }
    entity_df = pd.DataFrame.from_dict(records)
    if for_batch_scoring:
        # For batch scoring, we want the latest timestamps
        entity_df["event_timestamp"] = pd.to_datetime("now", utc=True)
    requested_features = [
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate",
        "driver_hourly_stats:avg_daily_trips",
        "transformed_conv_rate:conv_rate_plus_val1",
        "transformed_conv_rate:conv_rate_plus_val2",
    ]
    try:
        training_df = store.get_historical_features(
            entity_df=entity_df,
            features=requested_features,
        ).to_df()
        print(training_df.head())
    except Exception as e:
        print(f"Failed to run `store.get_historical_features`: {e}")
def fetch_online_features(store, source: str = ""):
    """Fetch online features for two driver entities and print them.

    Args:
        store: A Feast ``FeatureStore`` (or compatible) instance.
        source: ``""`` to request individual feature references,
            ``"feature_service"`` to resolve them via the driver_activity_v1
            feature service, or ``"push"`` to use driver_activity_v3 (backed
            by a feature view with a push source).

    Errors are caught and printed, since permission enforcement is expected
    to reject some users.
    """
    entity_rows = [
        # {join_key: entity_value}
        {
            "driver_id": 1001,
            "val_to_add": 1000,
            "val_to_add_2": 2000,
        },
        {
            "driver_id": 1002,
            "val_to_add": 1001,
            "val_to_add_2": 2002,
        },
    ]
    if source == "feature_service":
        try:
            features_to_fetch = store.get_feature_service("driver_activity_v1")
        except Exception as e:
            print(f"Failed to run `store.get_feature_service`: {e}")
            # Bail out: without a resolved feature service, the online lookup
            # below would only raise a misleading NameError on the unbound
            # features_to_fetch variable.
            return
    elif source == "push":
        try:
            features_to_fetch = store.get_feature_service("driver_activity_v3")
        except Exception as e:
            print(f"Failed to run `store.get_feature_service`: {e}")
            return
    else:
        features_to_fetch = [
            "driver_hourly_stats:acc_rate",
            "transformed_conv_rate:conv_rate_plus_val1",
            "transformed_conv_rate:conv_rate_plus_val2",
        ]
    try:
        returned_features = store.get_online_features(
            features=features_to_fetch,
            entity_rows=entity_rows,
        ).to_dict()
        # Sort keys for a stable, readable output order.
        for key, value in sorted(returned_features.items()):
            print(key, " : ", value)
    except Exception as e:
        print(f"Failed to run `store.get_online_features`: {e}")
# Re-create the store handle so this cell is self-contained.
store = FeatureStore(repo_path="client")
print("\n--- Historical features for training ---")
fetch_historical_features_entity_df(store, for_batch_scoring=False)
print("\n--- Historical features for batch scoring ---")
fetch_historical_features_entity_df(store, for_batch_scoring=True)
print("\n--- Load features into online store ---")
try:
    # Materialization writes to the online store; expected to fail for
    # read-only users under permission enforcement.
    store.materialize_incremental(end_date=datetime.now())
except Exception as e:
    print(f"Failed to run `store.materialize_incremental`: {e}")
print("\n--- Online features ---")
fetch_online_features(store)
print("\n--- Online features retrieved (instead) through a feature service---")
fetch_online_features(store, source="feature_service")
print(
    "\n--- Online features retrieved (using feature service v3, which uses a feature view with a push source---"
)
fetch_online_features(store, source="push")
print("\n--- Simulate a stream event ingestion of the hourly stats df ---")
# One synthetic hourly-stats row for driver 1001, stamped "now".
event_df = pd.DataFrame.from_dict(
    {
        "driver_id": [1001],
        "event_timestamp": [
            datetime.now(),
        ],
        "created": [
            datetime.now(),
        ],
        "conv_rate": [1.0],
        "acc_rate": [1.0],
        "avg_daily_trips": [1000],
    }
)
print(event_df)
try:
    # Push to both online and offline stores; requires write permission.
    store.push("driver_stats_push_source", event_df, to=PushMode.ONLINE_AND_OFFLINE)
except Exception as e:
    print(f"Failed to run `store.push`: {e}")
print("\n--- Online features again with updated values from a stream push---")
fetch_online_features(store, source="push")
Note: If you see the following error, it is likely due to issue #4392 (Remote registry client does not map application errors):
Feature view driver_hourly_stats_fresh does not exist in project rbac