Back to Feast

Test client

examples/rbac-local/client.ipynb

0.63.06.9 KB
Original Source

Test client

python
import yaml
def update_username(username, path='client/feature_store.yaml'):
    """Set ``auth.username`` in a Feast client config file and write it back.

    Args:
        username: Value stored under ``auth.username``.
        path: YAML config file to update; defaults to the notebook's
            ``client/feature_store.yaml``.

    The rest of the config is preserved through a ``yaml.safe_load`` /
    ``yaml.safe_dump`` round trip. Comments in the file are not kept, and keys
    are re-emitted in PyYAML's default sorted order.
    """
    with open(path, 'r', encoding='utf-8') as file:
        # An empty file loads as None; start from an empty mapping instead.
        config = yaml.safe_load(file) or {}
    # Create the auth section when it is absent or empty (`auth:` with no value
    # loads as None) so such configs don't fail with KeyError/TypeError.
    if config.get('auth') is None:
        config['auth'] = {}
    config['auth']['username'] = username
    with open(path, 'w', encoding='utf-8') as file:
        yaml.safe_dump(config, file, default_flow_style=False)

Update test user

Use one of `reader`, `writer`, `batch_admin`, or `admin` as the current username (the password is fixed).

python
# Identity used by the client for the rest of the notebook (password is fixed).
username = 'reader'
update_username(username=username)
python
# Print the client config so the updated auth.username can be checked.
!cat client/feature_store.yaml

Updating logger

The following is needed to show, in the notebook output, the messages logged by the Feast application.

python
import logging
import sys
from io import StringIO
# Configure the root logger so INFO-level records that propagate to it
# (such as Feast's) are shown in the notebook output.
# force=True (Python 3.8+) first removes any handlers already attached to the
# root logger; without it basicConfig silently does nothing when the kernel or
# another library configured logging earlier, and the INFO level is never set.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(message)s',
    force=True,
)
logger = logging.getLogger()

Setup Feast client

Initialize the Feast store from the client configuration

python
from feast.feature_store import FeatureStore
python
# Build the store from the `client` repo directory, which holds the client
# configuration (client/feature_store.yaml).
store = FeatureStore(repo_path='client')

Basic validation

Verify the authorization config and run some GET APIs on the registry.

python
# Show which authorization settings the client was loaded with.
print("Authorization config is:", store.config.auth)
python
# Registry read: list the entities returned for the configured user.
for entity in store.list_entities():
    print("Entity found", entity.name)
python
# Registry read: every feature view, with its concrete class name.
for view in store.list_all_feature_views():
    view_kind = type(view).__name__
    print(f"FeatureView found {view.name} of type {view_kind}")
python
# Registry read: every feature service, with its class name.
for service in store.list_feature_services():
    print("FeatureService found {} of type {}".format(service.name, type(service).__name__))
python
# List the project's permissions with the Feast CLI, pointed at the `client` repo via -c.
!feast -c client permissions list

Validating with test_workflow.py

The following test functions were copied from the `test_workflow.py` template, with `try` blocks added so that only the relevant error messages are printed, since we expect to receive errors from the permission enforcement modules.

python
import subprocess
from datetime import datetime

import pandas as pd

from feast import FeatureStore
from feast.data_source import PushMode

def fetch_historical_features_entity_df(store: FeatureStore, for_batch_scoring: bool):
    """Print the head of a historical-feature retrieval for drivers 1001-1003.

    Args:
        store: Feature store to query.
        for_batch_scoring: When true, every entity row is stamped with the
            current UTC time instead of the fixed April 2021 timestamps.

    Errors raised by the retrieval (for example permission-enforcement
    failures) are printed instead of raised.
    """
    # Feature references requested from the offline store.
    requested_features = [
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate",
        "driver_hourly_stats:avg_daily_trips",
        "transformed_conv_rate:conv_rate_plus_val1",
        "transformed_conv_rate:conv_rate_plus_val2",
    ]
    # Entity rows: "driver_id" is the join key and "event_timestamp" the reserved
    # timestamp column. The label column is not processed by Feast, and the
    # val_to_add* columns feed the on-demand transformation.
    # See https://docs.feast.dev/getting-started/concepts/feature-retrieval for
    # retrieving all entities in the offline store instead.
    rows = {
        "driver_id": [1001, 1002, 1003],
        "event_timestamp": [
            datetime(2021, 4, 12, 10, 59, 42),
            datetime(2021, 4, 12, 8, 12, 10),
            datetime(2021, 4, 12, 16, 40, 26),
        ],
        "label_driver_reported_satisfaction": [1, 5, 3],
        "val_to_add": [1, 2, 3],
        "val_to_add_2": [10, 20, 30],
    }
    entity_df = pd.DataFrame.from_dict(rows)
    if for_batch_scoring:
        # Batch scoring wants the latest values, so stamp every row with "now".
        entity_df["event_timestamp"] = pd.to_datetime("now", utc=True)

    try:
        job = store.get_historical_features(
            entity_df=entity_df, features=requested_features
        )
        training_df = job.to_df()
        print(training_df.head())
    except Exception as e:
        print(f"Failed to run `store.get_historical_features`: {e}")


def fetch_online_features(store, source: str = ""):
    """Fetch online features for drivers 1001 and 1002 and print them sorted by key.

    Args:
        store: Feature store to query.
        source: ``"feature_service"`` fetches through the ``driver_activity_v1``
            feature service and ``"push"`` through ``driver_activity_v3``; any
            other value fetches an explicit list of feature references.

    Errors raised by the store (for example permission-enforcement failures)
    are printed rather than raised. If the feature service cannot be loaded,
    the online lookup is skipped because there is nothing to fetch.
    """
    entity_rows = [
        # {join_key: entity_value}
        {
            "driver_id": 1001,
            "val_to_add": 1000,
            "val_to_add_2": 2000,
        },
        {
            "driver_id": 1002,
            "val_to_add": 1001,
            "val_to_add_2": 2002,
        },
    ]
    # Feature service used for each non-default `source`.
    service_by_source = {
        "feature_service": "driver_activity_v1",
        "push": "driver_activity_v3",
    }
    service_name = service_by_source.get(source)
    if service_name is not None:
        try:
            features_to_fetch = store.get_feature_service(service_name)
        except Exception as e:
            print(f"Failed to run `store.get_feature_service`: {e}")
            # `features_to_fetch` is unset here; continuing would only report a
            # misleading UnboundLocalError as a get_online_features failure.
            return
    else:
        features_to_fetch = [
            "driver_hourly_stats:acc_rate",
            "transformed_conv_rate:conv_rate_plus_val1",
            "transformed_conv_rate:conv_rate_plus_val2",
        ]
    try:
        returned_features = store.get_online_features(
            features=features_to_fetch,
            entity_rows=entity_rows,
        ).to_dict()
        for key, value in sorted(returned_features.items()):
            print(key, " : ", value)
    except Exception as e:
        print(f"Failed to run `store.get_online_features`: {e}")
python
# Fresh store instance for the workflow checks below (same `client` repo as before).
store = FeatureStore(repo_path='client')

Historical features

python
# Run the historical retrieval twice: fixed training timestamps, then "now" for batch scoring.
for title, batch_flag in (
    ("Historical features for training", False),
    ("Historical features for batch scoring", True),
):
    print(f"\n--- {title} ---")
    fetch_historical_features_entity_df(store, for_batch_scoring=batch_flag)

Materialization

python
from datetime import timezone

print("\n--- Load features into online store ---")
try:
    # Feast interprets a naive end_date as UTC, so a local-time datetime.now()
    # would be off by the machine's UTC offset (materializing too little, or up
    # to a time in the future). Pass an aware UTC timestamp instead.
    store.materialize_incremental(end_date=datetime.now(tz=timezone.utc))
except Exception as e:
    print(f"Failed to run `store.materialize_incremental`: {e}")

Online features

python
# Online lookups via explicit feature references, then via feature services v1 and v3.
print("\n--- Online features ---")
fetch_online_features(store)

print("\n--- Online features retrieved (instead) through a feature service ---")
fetch_online_features(store, source="feature_service")

print(
    "\n--- Online features retrieved (using feature service v3, which uses a feature view with a push source) ---"
)
fetch_online_features(store, source="push")

Stream push

python
print("\n--- Simulate a stream event ingestion of the hourly stats df ---")
# One timestamp for both columns so event_timestamp and created agree exactly.
# NOTE(review): this is a naive local time, and Feast treats naive timestamps as
# UTC, so on a non-UTC machine the pushed row is shifted by the UTC offset.
# Consider datetime.now(tz=timezone.utc) once the push source's offline schema
# is confirmed to accept timezone-aware timestamps.
event_time = datetime.now()
event_df = pd.DataFrame.from_dict(
    {
        "driver_id": [1001],
        "event_timestamp": [event_time],
        "created": [event_time],
        "conv_rate": [1.0],
        "acc_rate": [1.0],
        "avg_daily_trips": [1000],
    }
)
print(event_df)
try:
    store.push("driver_stats_push_source", event_df, to=PushMode.ONLINE_AND_OFFLINE)
except Exception as e:
    print(f"Failed to run `store.push`: {e}")

# Read back through the push-backed feature service; this runs even if the push
# failed, so the current state of the online store is shown either way.
print("\n--- Online features again with updated values from a stream push ---")
fetch_online_features(store, source="push")

Note: if you see the following error, it is likely due to issue #4392, "Remote registry client does not map application errors":

Feature view driver_hourly_stats_fresh does not exist in project rbac