# SARplus

Pronounced "surplus", as it's simply better, if not the best!
Simple Algorithm for Recommendation (SAR) is a neighborhood-based algorithm for personalized recommendations based on user transaction history. SAR recommends items that are most similar to the ones the user already has an affinity for. Two items are similar if the users that interacted with one item are also likely to have interacted with the other. A user has an affinity to an item if they have interacted with it in the past.

SARplus is an efficient implementation of this algorithm for Spark.
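To make the similarity notion concrete, here is a minimal pure-Python sketch of the `jaccard` similarity type used later in this README. This is only an illustration of the idea, not the pysarplus implementation: two items are compared by the overlap of the user sets that interacted with them.

```python
def jaccard_similarity(users_a, users_b):
    """Jaccard similarity of two items' user sets:
    |intersection| / |union|."""
    users_a, users_b = set(users_a), set(users_b)
    union = users_a | users_b
    if not union:
        return 0.0
    return len(users_a & users_b) / len(union)

# Item A was consumed by users {1, 2, 3}, item B by users {1, 4}:
# one shared user out of four distinct users.
print(jaccard_similarity({1, 2, 3}, {1, 4}))  # 0.25
```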
## Benchmarks

| # Users | # Items | # Ratings | Runtime | Environment | Dataset |
|---|---|---|---|---|---|
| 2.5mio | 35k | 100mio | 1.3h | Databricks, 8 workers, Azure Standard DS3 v2 (4 core machines) | |
There are a couple of key optimizations; see [DEVELOPMENT.md](DEVELOPMENT.md) for implementation details.

## Usage

Two packages should be installed:

* `pysarplus` (Python, from PyPI)
* `sarplus` (Scala, from Maven Central)
```python
from pysarplus import SARPlus

# spark dataframe with user/item/rating (and optional timestamp) tuples
train_df = spark.createDataFrame(
    [(1, 1, 1), (1, 2, 1), (2, 1, 1), (3, 1, 1), (3, 3, 1)],
    ["user_id", "item_id", "rating"],
)

# spark dataframe with user/item/rating tuples; only the users are used
# when generating recommendations
test_df = spark.createDataFrame(
    [(1, 1, 1), (3, 3, 1)],
    ["user_id", "item_id", "rating"],
)

# To use C++ based fast prediction, a local cache directory needs to be
# specified.
# * On a local machine, `cache_path` can be any valid directory. For example,
#
#   ```python
#   model = SARPlus(
#       spark,
#       col_user="user_id",
#       col_item="item_id",
#       col_rating="rating",
#       col_timestamp="timestamp",
#       similarity_type="jaccard",
#       cache_path="cache",
#   )
#   ```
#
# * On Databricks, `cache_path` needs to be mounted on DBFS. For example,
#
#   ```python
#   model = SARPlus(
#       spark,
#       col_user="user_id",
#       col_item="item_id",
#       col_rating="rating",
#       col_timestamp="timestamp",
#       similarity_type="jaccard",
#       cache_path="dbfs:/mnt/sarpluscache/cache",
#   )
#   ```
#
# * On Azure Synapse, `cache_path` needs to be mounted on the Spark pool's
#   driver node. For example,
#
#   ```python
#   model = SARPlus(
#       spark,
#       col_user="user_id",
#       col_item="item_id",
#       col_rating="rating",
#       col_timestamp="timestamp",
#       similarity_type="jaccard",
#       cache_path=f"synfs:/{job_id}/mnt/sarpluscache/cache",
#   )
#   ```
#
#   where `job_id` can be obtained by
#
#   ```python
#   from notebookutils import mssparkutils
#   job_id = mssparkutils.env.getJobId()
#   ```
model = SARPlus(
    spark,
    col_user="user_id",
    col_item="item_id",
    col_rating="rating",
    col_timestamp="timestamp",
    similarity_type="jaccard",
)
model.fit(train_df)

# To use C++ based fast prediction, the `use_cache` parameter of
# `SARPlus.recommend_k_items()` also needs to be set to `True`.
#
#   ```python
#   model.recommend_k_items(test_df, top_k=3, use_cache=True).show()
#   ```
model.recommend_k_items(test_df, top_k=3, remove_seen=False).show()
```
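For intuition about what `fit` and `recommend_k_items` compute, here is a small pure-Python sketch of SAR's scoring on the same toy interactions as `train_df` above, assuming unit ratings and jaccard similarity. This is only an illustration of the algorithm, not the pysarplus implementation.

```python
from collections import defaultdict

# Toy (user, item) interactions, matching the README's train_df.
interactions = [(1, 1), (1, 2), (2, 1), (3, 1), (3, 3)]

# Invert to the set of users per item.
item_users = defaultdict(set)
for user, item in interactions:
    item_users[item].add(user)

def jaccard(i, j):
    """Similarity of items i and j via their user sets."""
    a, b = item_users[i], item_users[j]
    return len(a & b) / len(a | b)

def score(user, item):
    """Sum of similarities between `item` and the user's history
    (affinities are all 1 here, since every rating is 1)."""
    history = {i for u, i in interactions if u == user}
    return sum(jaccard(j, item) for j in history)

def recommend_k(user, k=3):
    """Rank all items by score for `user`; seen items are kept,
    as with remove_seen=False."""
    ranked = sorted(item_users, key=lambda i: score(user, i), reverse=True)
    return [(i, score(user, i)) for i in ranked[:k]]

# User 1 interacted with items 1 and 2; item 3 only scores
# through its similarity to item 1 (1/3).
print(recommend_k(1))
```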
## Jupyter Notebook

Insert this cell prior to the code above.
```python
import os

SARPLUS_MVN_COORDINATE = "com.microsoft.sarplus:sarplus_2.12:0.6.6"
SUBMIT_ARGS = f"--packages {SARPLUS_MVN_COORDINATE} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("sample")
    .master("local[*]")
    .config("spark.driver.memory", "1g")
    .config("spark.sql.shuffle.partitions", "1")
    .config("spark.sql.crossJoin.enabled", "true")
    .config("spark.sql.sources.default", "parquet")
    .config("spark.sql.legacy.createHiveTableByDefault", "true")
    .config("spark.ui.enabled", "false")
    .getOrCreate()
)
```
## PySpark Shell

```bash
SARPLUS_MVN_COORDINATE="com.microsoft.sarplus:sarplus_2.12:0.6.6"

# Install pysarplus
pip install pysarplus

# Specify the sarplus maven coordinate and configure the Spark environment
pyspark --packages "${SARPLUS_MVN_COORDINATE}" \
    --conf spark.sql.crossJoin.enabled=true \
    --conf spark.sql.sources.default=parquet \
    --conf spark.sql.legacy.createHiveTableByDefault=true
```
## Databricks

1. In **Library Source**, select **Maven** and enter the coordinate
   `com.microsoft.sarplus:sarplus_2.12:0.6.6`
   (or `com.microsoft.sarplus:sarplus-spark-3-2-plus_2.12:0.6.6` if
   you're on Spark 3.2+), then click **Create**.
2. In **Library Source**, select **PyPI**, enter `pysarplus==0.6.6`,
   and click **Create**.

This will install the C++, Python and Scala code on your cluster. See
Libraries for details on how to install libraries on Azure Databricks.
Then configure the cluster's Spark properties:

1. Navigate to your Databricks **Compute**.
2. Navigate to your cluster's **Configuration** -> **Advanced options** ->
   **Spark**.
3. Put the following configurations into **Spark config**:

   ```
   spark.sql.crossJoin.enabled true
   spark.sql.sources.default parquet
   spark.sql.legacy.createHiveTableByDefault true
   ```

These will set the crossJoin property to enable calculation of the similarity matrix, and set the default sources to parquet.
They can also be configured by putting the following Python code in a notebook cell:

```python
spark.conf.set("spark.sql.crossJoin.enabled", "true")
spark.conf.set("spark.sql.sources.default", "parquet")
spark.conf.set("spark.sql.legacy.createHiveTableByDefault", "true")
```
To use C++ based fast prediction in `pysarplus.SARPlus.recommend_k_items()`, a local cache directory needs to be specified as the `cache_path` parameter of `pysarplus.SARPlus()` to store intermediate files during the calculation, so you'll also have to mount shared storage.

For example, you can create a storage account (e.g. `sarplusstorage`) and a container (e.g. `sarpluscache`) in the storage account, copy the access key of the storage account, and then run the following code to mount the storage:
```python
dbutils.fs.mount(
    source="wasbs://<container>@<storage-account>.blob.core.windows.net",
    mount_point="/mnt/<container>",
    extra_configs={
        "fs.azure.account.key.<storage-account>.blob.core.windows.net":
            "<access-key>"
    },
)
```
where `<storage-account>`, `<container>` and `<access-key>` should be replaced with the actual values, such as `sarplusstorage`, `sarpluscache` and the access key of the storage account. Then pass `cache_path="dbfs:/mnt/<container>/cache"` to `pysarplus.SARPlus()`, where `cache` is the cache directory's name.
To disable logging messages:

```python
import logging

logging.getLogger("py4j").setLevel(logging.ERROR)
```
## Azure Synapse

1. Download the pysarplus WHL file from pysarplus@PyPI.
2. Download the sarplus JAR file from sarplus@MavenCentralRepository
   (or sarplus-spark-3-2-plus@MavenCentralRepository if run on Spark 3.2+).
3. Navigate to your Azure Synapse workspace -> **Manage** ->
   **Workspace packages**.
4. Upload the pysarplus WHL file and the sarplus JAR file as workspace
   packages.
5. Navigate to your Azure Synapse workspace -> **Manage** ->
   **Apache Spark pools**.
6. Find the Spark pool on which to install the packages -> **...** ->
   **Packages** -> **Workspace packages** -> **+ Select from workspace
   packages**, and select the pysarplus WHL file and the sarplus JAR file
   uploaded in the previous step.
7. **Apply**.

`pysarplus` can also be installed via `requirements.txt`. See Manage libraries for Apache Spark in Azure Synapse Analytics for details on how to manage libraries in Azure Synapse.
To use C++ based fast prediction in `pysarplus.SARPlus.recommend_k_items()`, a local cache directory needs to be specified as the `cache_path` parameter of `pysarplus.SARPlus()` to store intermediate files during the calculation, so you'll also have to mount shared storage.

For example, you can run the following code to mount the file system (container) of the default/primary storage account:
```python
from notebookutils import mssparkutils

mssparkutils.fs.mount(
    "abfss://<container>@<storage-account>.dfs.core.windows.net",
    "/mnt/<container>",
    {"linkedService": "<storage-linked-service>"},
)

job_id = mssparkutils.env.getJobId()
```
Then pass `cache_path=f"synfs:/{job_id}/mnt/<container>/cache"` to `pysarplus.SARPlus()`, where `cache` is the cache directory's name. **NOTE**: `job_id` should be prepended to the local path.
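As a tiny illustration of that note, the `cache_path` string is just the mount point prefixed with `synfs:/` and the job id. The `job_id` and container values below are made up for illustration; on Synapse, `job_id` comes from `mssparkutils.env.getJobId()`.

```python
# Hypothetical values for illustration only.
job_id = "12"              # on Synapse: mssparkutils.env.getJobId()
container = "sarpluscache"

cache_path = f"synfs:/{job_id}/mnt/{container}/cache"
print(cache_path)  # synfs:/12/mnt/sarpluscache/cache
```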
See How to use file mount/unmount API in Synapse for more details.
See DEVELOPMENT.md for implementation details and development information.