
Train and Deploy a model using Feast


Copyright (c) Microsoft Corporation. Licensed under the MIT license.

In this notebook we show how to:

  1. Access a feature store
  2. Discover features in the feature store
  3. Train a model using the offline store (via the Feast get_historical_features() function)
  4. Use the Feast materialize() function to push features from the offline store to an online store (Redis)
  5. Deploy the model to an Azure ML endpoint, where features are consumed from the online store via get_online_features()

Connect to Feature store

Below you create a Feast repository config, which points at the registry.db file and supplies the credentials for the offline and online stores. The credentials are retrieved from Azure Key Vault.

python
import os
from feast import FeatureStore
from azureml.core import Workspace

# access key vault to get secrets
ws = Workspace.from_config()
kv = ws.get_default_keyvault()
os.environ['REGISTRY_PATH']=kv.get_secret("FEAST-REGISTRY-PATH")
os.environ['SQL_CONN']=kv.get_secret("FEAST-OFFLINE-STORE-CONN")
os.environ['REDIS_CONN']=kv.get_secret("FEAST-ONLINE-STORE-CONN")

# connect to feature store
fs = FeatureStore("./feature_repo")

List the feature views

Below lists the registered feature views.

python
fs.list_feature_views()
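list_feature_views() returns FeatureView objects, and the `view:feature` reference strings passed to the retrieval calls below follow a fixed `view_name:feature_name` pattern. A small hypothetical helper (not part of the notebook) that builds such references:

```python
def feature_refs(view_name, feature_names):
    # Build "view:feature" reference strings in the form expected by
    # get_historical_features() / get_online_features().
    return [f"{view_name}:{name}" for name in feature_names]

# For example, for the driver_stats view used in this tutorial:
refs = feature_refs("driver_stats", ["conv_rate", "acc_rate", "avg_daily_trips"])
```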

Load features into a pandas dataframe

Below you load the features from the feature store into a pandas data frame.

python
sql_job = fs.get_historical_features(
    entity_df="SELECT * FROM orders",
    features=[
        "driver_stats:conv_rate",
        "driver_stats:acc_rate",
        "driver_stats:avg_daily_trips",
        "customer_profile:current_balance",
        "customer_profile:avg_passenger_count",
        "customer_profile:lifetime_trip_count",
    ],
)

training_df = sql_job.to_df()
training_df.head()

Train a model and capture metrics with MLFlow

python
import mlflow
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from azureml.core import Workspace

# connect to your workspace
ws = Workspace.from_config()

# create experiment and start logging to a new run in the experiment
experiment_name = "order_model"

# set up MLflow to track the metrics
mlflow.set_tracking_uri(ws.get_mlflow_tracking_uri())
mlflow.set_experiment(experiment_name)
mlflow.sklearn.autolog()

training_df = training_df.dropna()
X = training_df[['conv_rate', 'acc_rate', 'avg_daily_trips',
        'current_balance', 'avg_passenger_count', 'lifetime_trip_count']]
y = training_df['order_is_success']

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.33, random_state=42)
clf = RandomForestClassifier(n_estimators=10)

# train the model
with mlflow.start_run() as run:
    clf.fit(X_train, y_train)

Prepare for deployment

Register the model

python
# register the model
model_uri = "runs:/{}/model".format(run.info.run_id)
model = mlflow.register_model(model_uri, "order_model")

Materialize data into the online store (Redis) with materialize()

python
from datetime import datetime, timedelta

end_date = datetime.now()
start_date = end_date - timedelta(days=365)
fs.materialize(start_date=start_date, end_date=end_date)
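materialize() loads a fixed time window; for recurring jobs Feast also provides fs.materialize_incremental(end_date=...), which resumes from the last materialized timestamp. The window arithmetic itself is plain datetime math (a fixed end date is used here so the example is reproducible):

```python
from datetime import datetime, timedelta

# Same one-year windowing as the materialize() call above.
end_date = datetime(2024, 1, 31)
start_date = end_date - timedelta(days=365)
window_days = (end_date - start_date).days
```

With a live store you would then call fs.materialize(start_date=start_date, end_date=end_date).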

Set up deployment configuration

Note: You will need to set up a service principal (SP) and grant it the Storage Blob Data Contributor role on the storage account that holds the Feast registry file, so that the scoring environment can authenticate to that storage.

az ad sp create-for-rbac -n $sp_name --role "Storage Blob Data Contributor" \
    --scopes /subscriptions/$sub_id/resourceGroups/$rg_name

Once you have set up the SP, populate the AZURE_CLIENT_ID, AZURE_TENANT_ID, AZURE_CLIENT_SECRET environment variables below.

python
from azureml.core.environment import Environment
from azureml.core.webservice import AciWebservice
from azureml.core import Workspace

ws = Workspace.from_config()
keyvault = ws.get_default_keyvault()

# create deployment config i.e. compute resources
aciconfig = AciWebservice.deploy_configuration(
    cpu_cores=1,
    memory_gb=1,
    description="orders service using feast",
)

# create the environment from the inference dockerfile
env = Environment("feast-env")
env.docker.base_image = None
env.docker.base_dockerfile = "./inference.dockerfile"
env.python.user_managed_dependencies = True
env.inferencing_stack_version = 'latest'
env.python.interpreter_path = "/azureml-envs/feast/bin/python"

# again ensure that the scoring environment has access to the registry file
env.environment_variables = {
    "FEAST_SQL_CONN": fs.config.offline_store.connection_string,
    "FEAST_REDIS_CONN": fs.config.online_store.connection_string,
    "FEAST_REGISTRY_BLOB": fs.config.registry.path,
    "AZURE_CLIENT_ID": "PROVIDE YOUR SERVICE PRINCIPAL CLIENT ID HERE",
    "AZURE_TENANT_ID": "PROVIDE YOUR SERVICE PRINCIPAL TENANT ID HERE",
    "AZURE_CLIENT_SECRET": "PROVIDE YOUR SERVICE PRINCIPAL CLIENT SECRET HERE"
}
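If any of these variables is missing, the scoring container will fail once Feast tries to connect. A small hypothetical startup check (names taken from the block above; the helper itself is not part of the notebook) that score.py could run:

```python
import os

REQUIRED_VARS = [
    "FEAST_SQL_CONN", "FEAST_REDIS_CONN", "FEAST_REGISTRY_BLOB",
    "AZURE_CLIENT_ID", "AZURE_TENANT_ID", "AZURE_CLIENT_SECRET",
]

def missing_vars(environ=os.environ):
    # Return the required settings that are absent from the environment.
    return [name for name in REQUIRED_VARS if name not in environ]

# With an empty environment, everything is reported missing:
missing = missing_vars(environ={})
```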

Deploy model

Next, you deploy the model to Azure Container Instances (ACI). Note that this can take around 10 minutes.

python
import uuid
from azureml.core.model import InferenceConfig
from azureml.core.environment import Environment
from azureml.core.model import Model

# get the registered model
model = Model(ws, "order_model")

# create an inference config i.e. the scoring script and environment
inference_config = InferenceConfig(
    entry_script="./src/score.py", 
    environment=env, 
    source_directory="src"
)

# deploy the service
service_name = "orders-service" + str(uuid.uuid4())[:4]
service = Model.deploy(
    workspace=ws,
    name=service_name,
    models=[model],
    inference_config=inference_config,
    deployment_config=aciconfig,
)

service.wait_for_deployment(show_output=True)

Test service

Below you test the service. The first scoring call takes a while because the Feast registry file is downloaded from blob storage; subsequent calls are faster because Feast caches the registry locally.

python
import json

input_payload = json.dumps({"driver":50521, "customer_id":20265})

service.run(input_data=input_payload)
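Inside the entry script, the payload is parsed back into entity ids; a sketch of that step (field names match the payload above, variable names are illustrative):

```python
import json

raw = json.dumps({"driver": 50521, "customer_id": 20265})  # same payload as above
payload = json.loads(raw)
driver_id = payload["driver"]
customer_id = payload["customer_id"]
```

In score.py these ids would typically be passed as entity_rows to fs.get_online_features() to look up the features materialized earlier.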

Clean up service

python
service.delete()