examples/05_operationalize/lightgbm_criteo_o16n.ipynb
Copyright (c) Recommenders contributors.
Licensed under the MIT License.
This notebook provides an example of how a business can use machine learning to automate content-based personalization for its customers by using a recommendation system. Azure Databricks is used to train a model that predicts the probability that a user will engage with an item. In turn, this estimate can be used to rank items based on the content that a user is most likely to consume.
This notebook creates a scalable real-time scoring service for Spark-based models such as the Content Based Personalization model trained in the MMLSpark-LightGBM-Criteo notebook.
The following components are used in this architecture:
Azure Blob Storage is a storage service optimized for storing massive amounts of unstructured data. In this case, the input data is stored here.
Azure Databricks is a managed Apache Spark cluster where model training and evaluation are performed.
Azure Machine Learning service is used in this scenario to register the machine learning model.
Azure Container Registry is used to package the scoring script as a container image which is used to serve the model in production.
Azure Kubernetes Service is used to deploy the trained models to web or app services.
In order to execute this notebook the following items are assumed:
dbfs:/aml_config/config.json
In this example, a "scoring service" is a function that is executed by a Docker container. It takes a POST request with a JSON-formatted payload and produces a score based on a previously estimated model. In our case, we will use the model we estimated earlier, which predicts the probability of a user-item interaction based on a set of numeric and categorical features. Because that model was trained using PySpark, we will create a Spark session on a single instance (within the Docker container), which will use MML Spark Serving to execute the model on the received input data and return the probability of interaction. We will use Azure Machine Learning to create and run the Docker container.
In order to create a scoring service, we will do the following steps:
The next few cells initialize the environment and variables: we import relevant libraries and set variables.
import os
import json
import shutil
import requests
from recommenders.datasets.criteo import get_spark_schema, load_spark_df
from recommenders.utils.k8s_utils import qps_to_replicas, replicas_to_qps, nodes_to_replicas
from azureml.core import Workspace
from azureml.core import VERSION as azureml_version
from azureml.core.model import Model
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.webservice import Webservice, AksWebservice
from azureml.core.image import ContainerImage
from azureml.core.compute import AksCompute, ComputeTarget
from math import floor
# Check core SDK version number
print("Azure ML SDK version: {}".format(azureml_version))
MODEL_NAME = 'lightgbm_criteo.mml' # this name must exactly match the name used to save the pipeline model in the estimation notebook
MODEL_DESCRIPTION = 'LightGBM Criteo Model'
# Set up AzureML assets (names must use lowercase letters, digits, and hyphens, without spaces, and be between 3 and 32 characters)
# Azure ML Webservice
SERVICE_NAME = 'lightgbm-criteo'
# Azure ML Container Image
CONTAINER_NAME = SERVICE_NAME
CONTAINER_RUN_TIME = 'spark-PY'
# Azure Kubernetes Service (AKS)
AKS_NAME = 'predict-aks'
# Names of other files that are used below
CONDA_FILE = "deploy_conda.yaml"
DRIVER_FILE = "mmlspark_serving.py"
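The naming rule in the comment above can be checked before any Azure resources are created. The following is a minimal sketch, not part of the Azure ML SDK: the `is_valid_asset_name` helper and its regular expression are assumptions drawn from that comment and from the hyphenated names used here, so the service's actual validation may be stricter.

```python
import re

# Hypothetical helper, not part of the Azure ML SDK.
# Assumed rule: 3 to 32 characters drawn from lowercase letters, digits, and hyphens.
_ASSET_NAME_PATTERN = re.compile(r"[a-z0-9-]{3,32}")


def is_valid_asset_name(name):
    """Return True if the name satisfies the assumed naming rule."""
    return _ASSET_NAME_PATTERN.fullmatch(name) is not None


for candidate in ["lightgbm-criteo", "predict-aks", "Bad Name"]:
    print(candidate, is_valid_asset_name(candidate))
```

Running a check like this early avoids waiting for a deployment call to fail on a malformed name.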
The workspace configuration can be retrieved from the portal and uploaded to Databricks.
ws = Workspace.from_config('/dbfs/aml_config/config.json')
In order to create the docker container, the first thing we will do is to prepare the model we estimated in a prior step so that the docker container we are creating will be able to access it. We do this by registering the model to the workspace (see the Azure ML documentation for additional details).
The model has been stored as a directory on dbfs, and before we register it, we do a few additional steps to facilitate the process.
Spark Serving requires the schema of the raw input data. Therefore, we get the schema and store it as an additional file in the model directory.
raw_schema = get_spark_schema()
with open(os.path.join('/dbfs', MODEL_NAME, 'schema.json'), 'w') as f:
    f.write(raw_schema.json())
While you can access files on DBFS with local file APIs, it is safer to explicitly copy saved models to and from dbfs, because the local file APIs can only access files smaller than 2 GB (see details here).
model_local = os.path.join(os.getcwd(), MODEL_NAME)
dbutils.fs.cp('dbfs:/' + MODEL_NAME, 'file:' + model_local, recurse=True)
Now we are ready to register the model in the Azure Machine Learning Workspace.
# First the model directory is compressed to minimize data transfer
zip_file = shutil.make_archive(base_name=MODEL_NAME, format='zip', root_dir=model_local)
# Register the model
model = Model.register(
    model_path=zip_file,  # this points to a local file
    model_name=MODEL_NAME,  # this is the name the model is registered as
    description=MODEL_DESCRIPTION,
    workspace=ws,
)
print(model.name, model.description, model.version)
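The compress-then-register step relies on the archive being unpacked again inside the container. As a rough illustration of that round trip, the sketch below archives a stand-in directory and extracts it again. The directory name `example_model.mml` and the placeholder `schema.json` contents are made up for the example; no Azure or Spark calls are involved.

```python
import os
import shutil
import tempfile
from zipfile import ZipFile

# Work in a throwaway directory so nothing touches real model files.
work_dir = tempfile.mkdtemp()

# Stand-in for the saved pipeline model directory (contents are placeholders).
model_dir = os.path.join(work_dir, "example_model.mml")
os.makedirs(model_dir)
with open(os.path.join(model_dir, "schema.json"), "w") as f:
    f.write("{}")

# make_archive appends ".zip" to base_name and returns the archive path.
zip_path = shutil.make_archive(
    base_name=os.path.join(work_dir, "example_model.mml"),
    format="zip",
    root_dir=model_dir,
)

# Extract the archive into a separate "model" directory.
extract_dir = os.path.join(work_dir, "model")
with ZipFile(zip_path, "r") as archive:
    archive.extractall(extract_dir)

extracted_files = sorted(os.listdir(extract_dir))
print(os.path.basename(zip_path), extracted_files)
```

Because `root_dir` points at the model directory itself, the files land at the top level of the archive, so extracting into `model` yields `model/schema.json` rather than a nested folder.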
Next, we need to create the driver script that will be executed when the service is called. The functions that need to be defined for scoring are init() and run(). The init() function is run when the service is created, and the run() function is run each time the service is called.
In our example, we use the init() function to load all the libraries, initialize the spark session, start the spark streaming service and load the model pipeline. We use the run() method to route the input to the spark streaming service to generate predictions (in this case the probability of an interaction) then return the output.
driver_file = '''
import os
import json
from time import sleep
from uuid import uuid4
from zipfile import ZipFile
from azureml.core.model import Model
from pyspark.ml import PipelineModel
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
import requests
def init():
    """One time initialization of pyspark and model server"""
    spark = SparkSession.builder.appName("Model Server").getOrCreate()
    import mmlspark  # this is needed to load mmlspark libraries
    # extract and load model
    model_path = Model.get_model_path('{model_name}')
    with ZipFile(model_path, 'r') as f:
        f.extractall('model')
    model = PipelineModel.load('model')
    # load data schema saved with model
    with open(os.path.join('model', 'schema.json'), 'r') as f:
        schema = StructType.fromJson(json.load(f))
    input_df = (
        spark.readStream.continuousServer()
        .address("localhost", 8089, "predict")
        .load()
        .parseRequest(schema)
    )
    output_df = (
        model.transform(input_df)
        .makeReply("probability")
    )
    checkpoint = os.path.join('/tmp', 'checkpoints', uuid4().hex)
    server = (
        output_df.writeStream.continuousServer()
        .trigger(continuous="30 seconds")
        .replyTo("predict")
        .queryName("prediction")
        .option("checkpointLocation", checkpoint)
        .start()
    )
    # let the server finish starting
    sleep(1)
def run(input_json):
    try:
        response = requests.post(data=input_json, url='http://localhost:8089/predict')
        result = response.json()['probability']['values'][1]
    except Exception as e:
        result = str(e)
    return json.dumps({{"result": result}})
'''.format(model_name=MODEL_NAME)
# check syntax
exec(driver_file)
with open(DRIVER_FILE, "w") as f:
    f.write(driver_file)
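The `run()` function in the driver script picks one number out of the serving response and falls back to the error text if anything goes wrong. The sketch below mirrors that logic on a made-up response body so it can be tried without Spark. The `extract_probability` helper and the example body are hypothetical; the response shape is only inferred from the indexing used in `run()`.

```python
import json


def extract_probability(response_body):
    """Mirror the indexing used in run(): take element 1 of probability.values."""
    try:
        result = json.loads(response_body)["probability"]["values"][1]
    except Exception as e:
        # Same fallback as run(): report the error text instead of raising.
        result = str(e)
    return json.dumps({"result": result})


# Made-up response body, for illustration only.
example_body = json.dumps({"probability": {"values": [0.8, 0.2]}})
print(extract_probability(example_body))
print(extract_probability("not json"))
```

One consequence of this design is that callers always receive a `result` field, so they need to check whether it holds a number or an error string.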
Next, we define the dependencies that are required by the driver script.
# azureml-sdk is required to load the registered model
conda_file = CondaDependencies.create(pip_packages=['azureml-sdk', 'requests']).serialize_to_string()
with open(CONDA_FILE, "w") as f:
    f.write(conda_file)
We use the ContainerImage class to first configure the image with the defined driver and dependencies, then to create the image for use later.
Building the image allows it to be downloaded and debugged locally using Docker (see the troubleshooting instructions).
image_config = ContainerImage.image_configuration(
    execution_script=DRIVER_FILE,
    runtime=CONTAINER_RUN_TIME,
    conda_file=CONDA_FILE,
    tags={"runtime": CONTAINER_RUN_TIME, "model": MODEL_NAME},
)
image = ContainerImage.create(
    name=CONTAINER_NAME,
    models=[model],
    image_config=image_config,
    workspace=ws,
)
image.wait_for_creation(show_output=True)
Once we have created an image, we configure an Azure Kubernetes Service (AKS) cluster and deploy the image as an AKS Webservice.
NOTE: We can create a service directly from the registered model and image_configuration with the Webservice.deploy_from_model() function.
We create the image here explicitly and use deploy_from_image() for three reasons:
When we are setting up a production service, we should start by estimating the load we would like to support. In order to estimate that, we need to estimate how long a single call is likely to take. In this example, we have done some local tests, and we have estimated that a single query may take approximately 100 ms to process.
Based on a few additional assumptions, we can estimate how many replicas are required to support a targeted number of queries per second (qps).
Note: This estimate should be used as a ballpark figure to get started, and we can verify performance with subsequent load testing to home in on better estimates. See this documentation for more details.
We have written some helper functions to support this type of calculation, and we will use them to estimate the number of replicas required to support loads of 25, 50, 100, 200, and 350 queries per second, using 100 ms as our estimate of the time to complete a single query.
all_target_qps = [25, 50, 100, 200, 350]
query_processing_time = 0.1  # in seconds
replica_estimates = {t: qps_to_replicas(t, query_processing_time) for t in all_target_qps}
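To make the reasoning behind a load-based estimate concrete, here is a minimal sketch of one way such a calculation could work. It is not the `qps_to_replicas` implementation: the `estimate_replicas` function, the one-query-at-a-time assumption, and the 70% target utilization are all assumptions chosen for illustration. With these values the sketch happens to give 15 replicas for 100 queries per second, matching the figure used in this notebook.

```python
from math import ceil


def estimate_replicas(target_qps, processing_time, target_utilization=0.7):
    """Hypothetical stand-in for a load-based replica estimate.

    Assumes each replica handles one query at a time and should run at
    no more than target_utilization of its capacity.
    """
    concurrent_queries = target_qps * processing_time / target_utilization
    return ceil(concurrent_queries)


for qps in [25, 50, 100, 200]:
    print(qps, estimate_replicas(qps, 0.1))
```

Rounding up keeps the estimate conservative: a fractional replica cannot be deployed, and rounding down would push utilization above the target.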
Based on the size of our customer base and other considerations (e.g. upcoming announcements that may boost traffic), we decide on the maximum load we want to support. In this example, we want to support 100 queries per second, which indicates that we should use the corresponding number of replicas (15 based on the estimates above).
Once we have the number of replicas, we then need to make sure we have enough resources (Cores and Memory) within our Azure Kubernetes Service to support that number of replicas. In order to estimate that number, we need to know how many cores are going to be assigned to each replica. This number can be fractional, because there are many use-cases where there are multiple replicas per core. You can see additional details here. When we create the Webservice below, we will allocate 0.3 cpu_cores and 0.5 GB of memory to each replica. To support 15 replicas, we need 15*0.3 cores and 15*0.5 GB of memory.
cpu_cores_per_replica = 0.3
print('{} cores required'.format(replica_estimates[100]*cpu_cores_per_replica))
print('{} GB of memory required'.format(replica_estimates[100]*0.5))
Now that we have an estimate of the number of cores and amount of memory we need, we will configure and create the AKS cluster. By default, AksCompute.provisioning_configuration() will create a configuration that has 3 agents with vm_size='Standard_D3_v2'. Each Standard_D3_v2 virtual machine has 4 cores and 14 GB of memory, so the defaults result in a cluster with a combined 12 cores and 42 GB of memory, which are both sufficient to meet our estimated load requirements.
Note: In this particular case, even though our load requirements are just 4.5 cores, we should not go below 12 cores in the AKS cluster. 12 cores is the minimum number of cores in AKS required for web services. See documentation for details. We can use the agent_count and vm_size parameters to increase the number of cores above 12 if our load requirements demand it, but we should not use them to go below.
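The sizing argument above is simple arithmetic, and it can be written out as a quick check. The sketch below uses only the figures quoted in the text (3 nodes, 4 cores and 14 GB per node, 15 replicas at 0.3 cores and 0.5 GB each); the variable names are illustrative, and per-node overhead is deliberately ignored here.

```python
# Default cluster shape described above: 3 nodes of Standard_D3_v2.
n_nodes = 3
cores_per_node = 4
memory_gb_per_node = 14

cluster_cores = n_nodes * cores_per_node
cluster_memory_gb = n_nodes * memory_gb_per_node

# Load-based requirement: 15 replicas at 0.3 cores and 0.5 GB each.
n_replicas = 15
required_cores = n_replicas * 0.3
required_memory_gb = n_replicas * 0.5

# The cluster is large enough if both resources are covered.
fits = required_cores <= cluster_cores and required_memory_gb <= cluster_memory_gb
print(cluster_cores, cluster_memory_gb, fits)
```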
# Create AKS compute first
# Use the default configuration (can also provide parameters to customize)
prov_config = AksCompute.provisioning_configuration()
# Create the cluster
aks_target = ComputeTarget.create(
    workspace=ws,
    name=AKS_NAME,
    provisioning_configuration=prov_config,
)
aks_target.wait_for_completion(show_output=True)
print(aks_target.provisioning_state)
print(aks_target.provisioning_errors)
Because our estimated load requirements are less than the minimums set by Azure Machine Learning, we should consider an alternate approach to estimating the number of replicas to use for the web service. If this is the only service that will run on the AKS cluster, then we are potentially wasting resources by not leveraging all of the compute resources. Initially, we used the expected load to estimate the number of replicas that should be used. Instead of that approach, we can also use the number of cores in our cluster to estimate the maximum number of replicas that could be supported.
In order to estimate the maximum number of replicas, we need to consider that there is some overhead on each node for the base Kubernetes operations as well as the node's operating system and core functionality. We assume 10% overhead in this case, but you can find more details here.
Note we are using cores in this example, but we could also leverage memory requirements instead.
max_replicas_12_cores = nodes_to_replicas(
    n_cores_per_node=4, n_nodes=3, cpu_cores_per_replica=cpu_cores_per_replica
)
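As a rough illustration of a capacity-based estimate, the sketch below applies a flat 10% overhead and computes the replica ceiling from both cores and memory. It is not the `nodes_to_replicas` implementation, which may reserve overhead differently and so return a different number. The `max_replicas` function is hypothetical, and treating 0.5 GB as 512 MiB and 14 GB as 14 * 1024 MiB is an assumption. Integer units (millicores and MiB) are used to avoid floating-point rounding at the boundary.

```python
def max_replicas(n_nodes, per_node, per_replica, overhead_pct=10):
    """Hypothetical capacity estimate using integer units (millicores or MiB)."""
    usable = n_nodes * per_node * (100 - overhead_pct) // 100
    return usable // per_replica


# CPU: 3 nodes x 4000 millicores, 300 millicores (0.3 cores) per replica.
by_cpu = max_replicas(n_nodes=3, per_node=4000, per_replica=300)
# Memory: 3 nodes x 14 * 1024 MiB, 512 MiB per replica.
by_memory = max_replicas(n_nodes=3, per_node=14 * 1024, per_replica=512)

# The tighter of the two limits determines how many replicas fit.
print(by_cpu, by_memory, min(by_cpu, by_memory))
```

Under these assumptions cores are the binding constraint, which is consistent with the text basing its estimate on cores rather than memory.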
Once we have the number of replicas our cluster will support, we can then estimate the queries per second we believe the AKS cluster could support.
replicas_to_qps(max_replicas_12_cores, query_processing_time)
Next, we will configure and create the webservice. In this configuration, we set cpu_cores=cpu_cores_per_replica for each replica (the default is cpu_cores=0.1). We are adjusting this value based on experience and prior testing with this service.
If no arguments are passed to AksWebservice.deploy_configuration(), it uses autoscale_enabled=True with autoscale_min_replicas=1 and autoscale_max_replicas=10. The max value does not meet our minimum requirements to support 100 queries per second, so we need to adjust it. We can adjust this value to either our estimate based on load (15) or our estimate based on the number that can be supported by the AKS cluster (36). In this example, we will set it to the value based on load to allow the AKS cluster to be used for other tasks or services.
webservice_config = AksWebservice.deploy_configuration(
    cpu_cores=cpu_cores_per_replica,
    autoscale_enabled=True,
    autoscale_max_replicas=replica_estimates[100],
)
# Deploy service using created image
aks_service = Webservice.deploy_from_image(
    workspace=ws,
    name=SERVICE_NAME,
    deployment_config=webservice_config,
    image=image,
    deployment_target=aks_target,
)
aks_service.wait_for_deployment(show_output=True)
Next, we can use a row from the sample data to test the service.
The service expects JSON as its payload, so we take the sample data, convert to a dictionary, then submit to the service endpoint.
# View the URI
url = aks_service.scoring_uri
print('AKS URI: {}'.format(url))
# Setup authentication using one of the keys from aks_service
headers = dict(Authorization='Bearer {}'.format(aks_service.get_keys()[0]))
# Grab some sample data
df = load_spark_df(size='sample', spark=spark, dbutils=dbutils)
data = df.head().asDict()
print(data)
# Send a request to the AKS cluster
response = requests.post(url=url, json=data, headers=headers)
print(response.json())
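The request above combines a JSON body built from one row with a bearer-token header. The sketch below shows those two pieces on their own, without calling any service. The field names in `row` are placeholders rather than the full Criteo schema, and `api_key` is a dummy value standing in for a real service key.

```python
import json

# Hypothetical row: these field names and values are placeholders.
row = {"int00": 1, "int01": 5, "cat00": "68fd1e64", "cat01": "80e26c9b"}

# Dummy key; in the notebook this comes from aks_service.get_keys().
api_key = "<service-key>"

# Passing json=... to requests.post serializes the dict like this and
# sets the Content-Type header to application/json automatically.
headers = {"Authorization": "Bearer {}".format(api_key)}
body = json.dumps(row)

print(headers["Authorization"])
print(body)
```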
When you are done, you can delete the service to minimize costs. You can always redeploy from the image using the same command above.
# Uncomment the following line to delete the web service
# aks_service.delete()
aks_service.state