examples/05_operationalize/als_movie_o16n.ipynb
<i>Copyright (c) Recommenders contributors.</i>
<i>Licensed under the MIT License.</i>
This reference architecture shows the full lifecycle of building a recommendation system. It walks through the creation of appropriate azure resources, training a recommendation model using a Virtual Machine or Databricks, and deploying it as an API. It uses Azure Cosmos DB, Azure Machine Learning, and Azure Kubernetes Service.
This architecture can be generalized for many recommendation engine scenarios, including recommendations for products, movies, and news.
Scenario: A media organization wants to provide movie or video recommendations to its users. By providing personalized recommendations, the organization meets several business goals, including increased click-through rates, increased engagement on site, and higher user satisfaction.
In this reference, we train and deploy a real-time recommender service API that can provide the top 10 movie recommendations for a given user.
This architecture consists of the following key components:
<sup>1) Here, we are just giving an example of using Azure Databricks. Any platforms listed in SETUP can be used as well.</sup>
To run this notebook on Azure Databricks, you should set up Azure Databricks by following the appropriate sections in the repository SETUP instructions and import this notebook into your Azure Databricks Workspace (see instructions here).
Please note: This notebook REQUIRES that you add the dependencies to support operationalization. See SETUP for details.
import os
import sys
import urllib
from azure.common.client_factory import get_client_from_cli_profile
import azure.mgmt.cosmosdb
import azureml.core
from azureml.core import Workspace
from azureml.core.model import Model
from azureml.core.compute import AksCompute, ComputeTarget
from azureml.core.compute_target import ComputeTargetException
from azureml.core.webservice import Webservice, AksWebservice
from azureml.exceptions import WebserviceException
from azureml.core import Environment
from azureml.core.environment import CondaDependencies
from azureml.core.model import InferenceConfig
from azureml.core.environment import SparkPackage
import pydocumentdb.document_client as document_client
from pyspark.ml.recommendation import ALS
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import FloatType, IntegerType, LongType
from recommenders.datasets import movielens
from recommenders.datasets.cosmos_cli import find_collection, read_collection, read_database, find_database
from recommenders.datasets.download_utils import maybe_download
from recommenders.datasets.spark_splitters import spark_random_split
from recommenders.evaluation.spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation
from recommenders.utils.notebook_utils import is_databricks
from recommenders.utils.timer import Timer
from recommenders.utils.spark_utils import start_or_get_spark
print("Azure SDK version:", azureml.core.VERSION)

# Start spark session if needed.
# Databricks supplies its own Spark session; outside Databricks we start a
# local one, first downloading the Cosmos DB Spark connector uber-jar so the
# precomputed recommendations can later be written to Cosmos DB.
if not is_databricks():
    cosmos_connector = (
        "https://search.maven.org/remotecontent?filepath=com/microsoft/azure/"
        "azure-cosmosdb-spark_2.3.0_2.11/1.3.3/azure-cosmosdb-spark_2.3.0_2.11-1.3.3-uber.jar"
    )
    jar_filepath = maybe_download(url=cosmos_connector, filename="cosmos.jar")
    spark = start_or_get_spark("ALS", memory="10g", jars=[jar_filepath])
    sc = spark.sparkContext
    print(sc)
Modify the Subscription ID to the subscription you would like to deploy to and set the resource name variables.
Add your Azure subscription ID
# Add your subscription ID
subscription_id = ""

# Set your workspace name
workspace_name = "o16n-test"
resource_group = "{}-rg".format(workspace_name)

# Set your region to deploy Azure ML workspace
location = "eastus"

# AzureML service and Azure Kubernetes Service prefix
service_name = "mvl-als"

# Login for Azure CLI so that AzureML can use Azure CLI login credentials
!az login
# Change subscription if needed
!az account set --subscription {subscription_id}
# Check account
!az account show

# CosmosDB
# account_name for CosmosDB cannot have "_" and needs to be less than 31 chars
account_name = "{}-ds-sql".format(workspace_name).replace("_", "-")[:31]
cosmos_database = "recommendations"
cosmos_collection = "user_recommendations_als"

# AzureML resource names
model_name = "{}-reco.mml".format(service_name)
aks_name = "{}-aks".format(service_name)

# top k items to recommend
TOP_K = 10

# Select MovieLens data size: 100k, 1m, 10m, or 20m
MOVIELENS_DATA_SIZE = '100k'

# Column names of the MovieLens ratings DataFrame
userCol = "UserId"
itemCol = "MovieId"
ratingCol = "Rating"

# NOTE(review): these paths are defined but not referenced later in this
# notebook view — presumably placeholders for persisted splits; confirm.
train_data_path = "train"
test_data_path = "test"
This command will check if the AzureML Workspace exists or not, and will create the workspace if it doesn't exist.
# Get or create the AzureML workspace; exist_ok=True makes the call
# idempotent, so re-running the notebook reuses an existing workspace.
ws = Workspace.create(
    name=workspace_name,
    location=location,
    resource_group=resource_group,
    subscription_id=subscription_id,
    exist_ok=True,
)
This step will take some time to create CosmosDB resources.
# explicitly pass subscription_id in case user has multiple subscriptions
client = get_client_from_cli_profile(
    azure.mgmt.cosmosdb.CosmosDB,
    subscription_id=subscription_id
)

# Create (or update) the Cosmos DB account in the chosen region. This is a
# long-running operation: .result() below blocks until provisioning is done.
async_cosmosdb_create = client.database_accounts.create_or_update(
    resource_group,
    account_name,
    {
        'location': location,
        'locations': [{
            'location_name': location
        }]
    }
)
account = async_cosmosdb_create.result()

# Fetch the account's primary master key and build the SQL API endpoint URL.
my_keys = client.database_accounts.list_keys(resource_group, account_name)
master_key = my_keys.primary_master_key
endpoint = "https://" + account_name + ".documents.azure.com:443/"

# DB client
# NOTE: `client` is rebound here from the management-plane client above to
# the data-plane DocumentClient used for database/collection operations.
client = document_client.DocumentClient(endpoint, {'masterKey': master_key})
# Create the recommendations database if it does not exist yet, otherwise
# reuse the existing one.
if not find_database(client, cosmos_database):
    db = client.CreateDatabase({'id': cosmos_database })
    print("Database created")
else:
    db = read_database(client, cosmos_database)
    print("Database found")

# Create collection options
options = dict(offerThroughput=11000)

# Create a collection partitioned by document id (hash partitioning)
collection_definition = {
    'id': cosmos_collection,
    'partitionKey': {'paths': ['/id'],'kind': 'Hash'}
}
if not find_collection(client, cosmos_database, cosmos_collection):
    collection = client.CreateCollection(
        db['_self'],
        collection_definition,
        options
    )
    print("Collection created")
else:
    collection = read_collection(client, cosmos_database, cosmos_collection)
    print("Collection found")

# Connection settings consumed by the Cosmos DB Spark connector when the
# precomputed recommendations are written out (Upsert=True replaces
# existing documents on re-runs).
dbsecrets = dict(
    Endpoint=endpoint,
    Masterkey=master_key,
    Database=cosmos_database,
    Collection=cosmos_collection,
    Upsert=True
)
Next, we train an Alternating Least Squares model on MovieLens dataset.
# Note: The DataFrame-based API for ALS currently only supports integers for user and item ids.
# The DataFrame-based ALS API only supports integer user and item ids, so
# MovieLens is loaded with an explicit schema: integer ids, float ratings.
rating_schema = StructType([
    StructField(userCol, IntegerType()),
    StructField(itemCol, IntegerType()),
    StructField(ratingCol, FloatType()),
])
data = movielens.load_spark_df(spark, size=MOVIELENS_DATA_SIZE, schema=rating_schema)
data.show()
There are several ways of splitting the data: random, chronological, stratified, etc., each of which favors a different real-world evaluation use case. We will split randomly in this example – for more details on which splitter to choose, consult this guide.
# Random 75/25 train/test split (seeded for reproducibility); cache both
# splits since each is reused several times below.
train, test = spark_random_split(data, ratio=0.75, seed=42)
for split_label, split_df in (("train", train), ("test", test)):
    print("N " + split_label, split_df.cache().count())
To predict movie ratings, we use the rating data in the training set as users' explicit feedback. The hyperparameters used to estimate the model are set based on this page.
Under most circumstances, you would explore the hyperparameters and choose an optimal set based on some criteria. For additional details on this process, please see additional information in the deep dives here.
# Train an explicit-feedback ALS model on the ratings. Hyperparameters are
# fixed here (per the benchmark page referenced above) rather than tuned.
als = ALS(
    rank=10,
    maxIter=15,
    implicitPrefs=False,
    alpha=0.1,  # alpha has no effect when implicitPrefs=False
    regParam=0.05,
    coldStartStrategy='drop',  # drop NaN predictions for unseen users/items
    nonnegative=True,
    userCol=userCol,
    itemCol=itemCol,
    ratingCol=ratingCol,
)
model = als.fit(train)
In the movie recommendation use case, recommending movies that have already been rated by a user does not make sense. Therefore, the rated movies are removed from the recommended items.
In order to achieve this, we recommend all movies to all users, and then remove the user-movie pairs that exist in the training dataset.
# Get the cross join of all user-item pairs and score them.
users = train.select(userCol).distinct()
items = train.select(itemCol).distinct()
user_item = users.crossJoin(items)
dfs_pred = model.transform(user_item)
dfs_pred.show()

# Remove seen items: join the scored pairs against the training ratings and
# keep only rows with no training rating (train rating is null), i.e. pairs
# the user has not rated before.
dfs_pred_exclude_train = dfs_pred.alias("pred").join(
    train.alias("train"),
    (dfs_pred[userCol]==train[userCol]) & (dfs_pred[itemCol]==train[itemCol]),
    how='outer'
)
top_all = dfs_pred_exclude_train.filter(dfs_pred_exclude_train["train."+ratingCol].isNull()) \
    .select("pred."+userCol, "pred."+itemCol, "pred.prediction")
top_all.show()
Evaluate model performance using metrics such as Precision@K, Recall@K, [MAP@K](https://en.wikipedia.org/wiki/Evaluation_measures_\(information_retrieval\)) or nDCG@K. For a full guide on what metrics to evaluate your recommender with, consult [this guide](../03_evaluate/evaluation.ipynb).
# Column-name mapping shared by the Spark evaluators below.
cols = dict(
    col_user=userCol,
    col_item=itemCol,
    col_rating=ratingCol,
    col_prediction="prediction",
)
test.show()
# Evaluate Ranking Metrics
# Compare the top-k unseen-item recommendations (top_all) against the held-out
# test ratings.
rank_eval = SparkRankingEvaluation(
    test,
    top_all,
    k=TOP_K,
    **cols
)
print(
    "Model:\tALS",
    "Top K:\t%d" % rank_eval.k,
    "MAP:\t%f" % rank_eval.map_at_k(),
    "NDCG:\t%f" % rank_eval.ndcg_at_k(),
    "Precision@K:\t%f" % rank_eval.precision_at_k(),
    "Recall@K:\t%f" % rank_eval.recall_at_k(), sep='\n'
)
# Evaluate Rating Metrics
# Score the test pairs directly and compare predicted vs. actual ratings.
prediction = model.transform(test)
rating_eval = SparkRatingEvaluation(
    test,
    prediction,
    **cols
)
print(
    "Model:\tALS rating prediction",
    "RMSE:\t%.2f" % rating_eval.rmse(),
    "MAE:\t%f" % rating_eval.mae(),
    "Explained variance:\t%f" % rating_eval.exp_var(),
    "R squared:\t%f" % rating_eval.rsquared(), sep='\n'
)
# Persist the trained ALS model locally, replacing any previous copy at the
# same path; this local copy is what gets registered with AzureML below.
model.write().overwrite().save(model_name)
Once the model is built with desirable performance, it will be operationalized to run as a REST endpoint to be utilized by a real time service. We will utilize Azure Cosmos DB, Azure Machine Learning Service, and Azure Kubernetes Service to operationalize the recommender service.
First, the Top-10 recommendations for each user as predicted by the model are stored as a lookup table in Cosmos DB. At runtime, the service will return the Top-10 recommendations as precomputed and stored in Cosmos DB:
# Precompute each user's top-k recommendations for the Cosmos DB lookup
# table. Use the TOP_K constant defined earlier (previously hard-coded as 10)
# so the stored table stays in sync with the service's advertised top-k.
recs = model.recommendForAllUsers(TOP_K)
# Cast the user id to a string "id" column: Cosmos DB documents are keyed
# and partitioned by /id.
recs_topk = recs.withColumn("id", recs[userCol].cast("string")) \
    .select("id", "recommendations." + itemCol)
recs_topk.show()
# Save data to CosmosDB
# coalesce(1) funnels the (small) top-k table through a single writer task;
# dbsecrets carries the endpoint/key/database/collection for the connector.
(recs_topk.coalesce(1)
    .write
    .format("com.microsoft.azure.cosmosdb.spark")
    .mode('overwrite')
    .options(**dbsecrets)
    .save())
Next, Azure Machine Learning Service is used to create a model scoring image and deploy it to Azure Kubernetes Service as a scalable containerized service. To achieve this, a scoring script should be created. In the script, we make a call to Cosmos DB to lookup the top 10 movies to recommend given an input User ID.
# Scoring-script template for the deployed service: init() connects to
# Cosmos DB once; run() parses the requested user id and reads the
# precomputed recommendations document with that id.
# NOTE(review): inside run(), `query` is built but never executed — the
# lookup uses ReadDocument directly. Left as-is to avoid changing the
# deployed artifact.
score_sparkml = """
import json
import pydocumentdb.document_client as document_client

def init(local=False):
    global client, collection
    try:
        client = document_client.DocumentClient('{endpoint}', dict(masterKey='{key}'))
        collection = client.ReadCollection(collection_link='dbs/{database}/colls/{collection}')
    except Exception as e:
        collection = e

def run(input_json):
    try:
        # Query them in SQL
        id = str(json.loads(json.loads(input_json)[0])['id'])
        query = dict(query='SELECT * FROM c WHERE c.id = "' + id +'"')
        options = dict(partitionKey=str(id))
        document_link = 'dbs/{database}/colls/{collection}/docs/' + id
        result = client.ReadDocument(document_link, options);
    except Exception as e:
        result = str(e)
    return json.dumps(str(result))
""".format(key=dbsecrets['Masterkey'],
           endpoint=dbsecrets['Endpoint'],
           database=dbsecrets['Database'],
           collection=dbsecrets['Collection'])

# test validity of python string
# NOTE: exec of a self-authored template is acceptable here; never exec
# strings derived from untrusted input.
exec(score_sparkml)

with open("score_sparkml.py", "w") as file:
    file.write(score_sparkml)
Register your model:
# Register the locally saved ALS model with the AzureML workspace so it can
# be packaged into the scoring image during deployment.
mymodel = Model.register(
    workspace=ws,
    model_path=model_name,  # local path of the saved model
    model_name=model_name,  # name under which the model is registered
    description="AML trained model",
)
print(mymodel.name, mymodel.description, mymodel.version)
# Build the inferencing environment: an mmlspark base image plus the pip
# and Spark packages the scoring script needs at runtime.
env = Environment(name='sparkmlenv')

# Specify a public image from microsoft/mmlspark as base image
env.docker.base_image="microsoft/mmlspark:0.15"

# pydocumentdb is required by score_sparkml.py to query Cosmos DB.
pip = [
    'azureml-defaults',
    'numpy==1.14.2',
    'scikit-learn==0.19.1',
    'pandas',
    'pydocumentdb'
]

# Add dependencies needed for inferencing
env.python.conda_dependencies = CondaDependencies.create(pip_packages=pip)
env.inferencing_stack_version = "latest"

# Add spark packages
env.spark.precache_packages = True
env.spark.repositories = ["https://mmlspark.azureedge.net/maven"]
env.spark.packages= [
    SparkPackage("com.microsoft.ml.spark", "mmlspark_2.11", "0.15"),
    SparkPackage("com.microsoft.azure", artifact="azure-storage", version="2.0.0"),
    SparkPackage(group="org.apache.hadoop", artifact="hadoop-azure", version="2.7.0")
]
This may take 20 to 30 minutes depending on the cluster size.
# Verify that cluster does not exist already
try:
    aks_target = ComputeTarget(workspace=ws, name=aks_name)
    print("Found existing cluster, use it.")
except ComputeTargetException:
    # Create the cluster using the default configuration (can also provide parameters to customize)
    prov_config = AksCompute.provisioning_configuration()
    aks_target = ComputeTarget.create(
        workspace=ws,
        name=aks_name,
        provisioning_configuration=prov_config
    )
    # Block until the cluster is provisioned (can take 20-30 minutes).
    aks_target.wait_for_completion(show_output = True)
    print(aks_target.provisioning_state)
    # To check any error logs, print(aks_target.provisioning_errors)
# Bundle the scoring script with the Spark environment built above.
inference_config = InferenceConfig(entry_script="score_sparkml.py", environment=env)

# Default AKS web-service configuration, with Application Insights enabled
# for request logging and monitoring.
aks_config = AksWebservice.deploy_configuration(enable_app_insights=True)
# Webservice creation using single command
# Deploy the registered model to the AKS cluster; if a service with this
# name already exists, fall back to retrieving it instead of failing.
try:
    aks_service = Model.deploy(
        workspace=ws,
        models=[mymodel],
        name=service_name,
        inference_config=inference_config,
        deployment_config=aks_config,
        deployment_target=aks_target
    )
    aks_service.wait_for_deployment(show_output=True)
except WebserviceException:
    # Retrieve existing service.
    aks_service = Webservice(ws, name=service_name)
    print("Retrieved existing service")
After the deployment, the service can be called with a user ID – the service will then look up the top 10 recommendations for that user in Cosmos DB and send back the results. The following script demonstrates how to call the recommendation service API and view the result for the given user ID:
import json

# Call the deployed scoring endpoint for user id 496 and time the round trip.
scoring_url = aks_service.scoring_uri
service_key = aks_service.get_keys()[0]

# The service expects a JSON array containing one JSON-encoded string payload.
input_data = '["{\\"id\\":\\"496\\"}"]'.encode()

req = urllib.request.Request(scoring_url, data=input_data)
req.add_header("Authorization","Bearer {}".format(service_key))
req.add_header("Content-Type","application/json")

with Timer() as t:
    with urllib.request.urlopen(req) as result:
        res = result.read()
        resj = json.loads(
            # Cleanup to parse into a json object
            res.decode("utf-8")
            .replace("\\", "")
            .replace('"', "")
            .replace("'", '"')
        )
        print(json.dumps(resj, indent=4))
# t.interval is only set after the Timer context exits.
print("Full run took %.2f seconds" % t.interval)
In the previous cells, we utilized Cosmos DB to cache the recommendation results for realtime serving. Alternatively, we can generate recommendation results on demand by using the model we deployed. The following script loads the registered model and uses it for recommendations:
score_sparkml.py
import json
import os
from pyspark.ml.recommendation import ALSModel

# Note, set `model_name`, `userCol`, and `itemCol` defined earlier.
model_name = "mvl-als-reco.mml"
userCol = "UserId"
itemCol = "MovieId"

def init(local=False):
    """Load the registered ALS model from the AzureML model directory."""
    global model
    # Load ALS model.
    model_path = os.path.join(os.getenv('AZUREML_MODEL_DIR'), model_name)
    model = ALSModel.load(model_path)

def run(input_json):
    """Return the top-k recommendations for the user id in the request."""
    js = json.loads(json.loads(input_json)[0])
    id = str(js['id'])
    k = js.get('k', 10)  # number of recommendations; defaults to 10
    # Use the model to get recommendation.
    # NOTE(review): this scores ALL users on every request and then filters
    # to one id — acceptable for a demo, costly at scale.
    recs = model.recommendForAllUsers(k)
    recs_topk = recs.withColumn('id', recs[userCol].cast("string")).select(
        'id', "recommendations." + itemCol
    )
    result = recs_topk[recs_topk.id==id].collect()[0].asDict()
    return json.dumps(str(result))
Call the AKS model service
# Get a recommendation of 10 movies
input_data = '["{\\"id\\":\\"496\\",\\"k\\":10}"]'.encode()
req = urllib.request.Request(scoring_url, data=input_data)
req.add_header("Authorization","Bearer {}".format(service_key))
req.add_header("Content-Type","application/json")
...