examples/rapids/mlflow_project/notebooks/rapids_mlflow.ipynb
import time
from cuml.ensemble import RandomForestClassifier
from cuml.metrics.accuracy import accuracy_score
from cuml.preprocessing.model_selection import train_test_split
import mlflow
import mlflow.sklearn
from mlflow.models import infer_signature
#!python -c "from sklearn.datasets import load_iris; d = load_iris(as_frame=True); d.frame.to_csv('iris.csv', index=False)"
def load_data(fpath):
    """
    Load a CSV on the GPU and split it into train/test partitions.

    Args:
        fpath: Path to the data to be ingested.

    Returns:
        Tuple ``(X_train, X_test, y_train, y_test)`` from cuML's
        ``train_test_split`` with a 20% hold-out set; all pieces are
        cuDF objects.
    """
    # Imported lazily so the module can be inspected on CPU-only machines.
    import cudf

    frame = cudf.read_csv(fpath)
    features = frame.drop(["target"], axis=1)
    labels = frame["target"].astype("int32")
    return train_test_split(features, labels, test_size=0.2)
def train(fpath, max_depth, max_features, n_estimators):
    """
    Fit a cuML random forest on the data at *fpath* and log to MLflow.

    Args:
        fpath: Path or URL for the training data used with the model.
        max_depth: int, maximum tree depth.
        max_features: float, fraction of features to use in classification.
        n_estimators: int, number of trees to create.

    Returns:
        Tuple of (trained model, MLflow signature inferred from the
        training data).
    """
    X_train, X_test, y_train, y_test = load_data(fpath)

    model = RandomForestClassifier(
        max_depth=max_depth,
        max_features=max_features,
        n_estimators=n_estimators,
    )
    model.fit(X_train, y_train)

    # Hold-out accuracy, logged as the run's single metric below.
    accuracy = accuracy_score(y_test, model.predict(X_test))

    # Params are stringified explicitly; MLflow stores them as strings anyway.
    mlflow.log_params(
        {
            "max_depth": str(max_depth),
            "max_features": str(max_features),
            "n_estimators": str(n_estimators),
        }
    )
    mlflow.log_metrics({"accuracy": accuracy})

    # infer_signature needs host (pandas) data, hence the to_pandas() calls.
    return model, infer_signature(X_train.to_pandas(), y_train.to_pandas())
# --- Training configuration ---------------------------------------------
conda_env = "conda.yaml"  # conda spec logged alongside the model
fpath = "iris.csv"
max_depth = 10
max_features = 0.75
n_estimators = 500
# NOTE(review): name is a holdover from the airline demo; the data is iris.
artifact_path = "Airline-Demo"
experiment_name = "RAPIDS-Notebook"

# Track runs in a local SQLite-backed store.
mlflow.set_tracking_uri(uri="sqlite:////tmp/mlflow-db.sqlite")
mlflow.set_experiment(experiment_name)

with mlflow.start_run(run_name="(Notebook) RAPIDS-MLflow"):
    model, signature = train(fpath, max_depth, max_features, n_estimators)
    mlflow.sklearn.log_model(
        model,
        signature=signature,
        name=artifact_path,
        registered_model_name="rapids-mlflow-notebook",
        conda_env=conda_env,  # was a duplicated "conda.yaml" literal
    )
    # Recorded for the serving instructions below (mlflow models serve -m ...).
    artifact_uri = mlflow.get_artifact_uri(artifact_path=artifact_path)

print(artifact_uri)
Note: The serving thread will continue to run after cell execution. Select the cell and click 'interrupt the kernel' to stop it.
In a terminal, run `mlflow models serve -m [artifact_uri] -p [port]`; you should see output similar to the following:
2020/07/27 13:59:49 INFO mlflow.models.cli: Selected backend for flavor 'python_function'
2020/07/27 13:59:49 INFO mlflow.pyfunc.backend: === Running command 'source /anaconda3/bin/../etc/profile.d/conda.sh && conda activate mlflow-3335621df6011b1847d2555b195418d4496e5ffd 1>&2 && gunicorn --timeout=60 -b 127.0.0.1:5000 -w 1 ${GUNICORN_CMD_ARGS} -- mlflow.pyfunc.scoring_server.wsgi:app'
[2020-07-27 13:59:50 -0600] [23779] [INFO] Starting gunicorn 20.0.4
[2020-07-27 13:59:50 -0600] [23779] [INFO] Listening at: http://127.0.0.1:5000 (23779)
[2020-07-27 13:59:50 -0600] [23779] [INFO] Using worker: sync
[2020-07-27 13:59:50 -0600] [23788] [INFO] Booting worker with pid: 23788
import json

import requests

host = "localhost"
port = "55755"

headers = {
    "Content-Type": "application/json",
}

# The payload schema must match the data the model was trained on. The model
# above is fit on iris.csv (see load_data: every column except "target"), so
# the request carries the four iris feature columns — the 13 airline columns
# from the original demo would be rejected by the scoring server as a schema
# mismatch.
data = {
    "columns": [
        "sepal length (cm)",
        "sepal width (cm)",
        "petal length (cm)",
        "petal width (cm)",
    ],
    # One sample row (a typical setosa measurement).
    "data": [[5.1, 3.5, 1.4, 0.2]],
}
## Pause to let server start
time.sleep(5)

# Retry until the scoring server answers; each failed attempt backs off 20s.
# NOTE(review): the "ON-Time"/"LATE" labels are a holdover from the airline
# demo — with an iris-trained model the prediction is a class index; confirm
# the intended labeling.
while True:
    try:
        resp = requests.post(
            url=f"http://{host}:{port}/invocations",
            data=json.dumps({"dataframe_split": data}),
            headers=headers,
        )
        print("Classification: %s" % ("ON-Time" if resp.text == "[0.0]" else "LATE"))
        break
    except Exception as e:
        # Best-effort loop: the server may still be starting up.
        print(f"Caught exception attempting to call model endpoint: {e}", end="")
        print("Sleeping")
        time.sleep(20)