Back to Mlflow

Train and Publish Locally With RAPIDS and MLflow

examples/rapids/mlflow_project/notebooks/rapids_mlflow.ipynb

3.12.0 · 4.8 KB
Original Source

Train and Publish Locally With RAPIDS and MLflow

python
import time

from cuml.ensemble import RandomForestClassifier
from cuml.metrics.accuracy import accuracy_score
from cuml.preprocessing.model_selection import train_test_split

import mlflow
import mlflow.sklearn
from mlflow.models import infer_signature

Generate the sample Iris dataset (uncomment and run the command below to write `iris.csv`)

python
#!python -c "from sklearn.datasets import load_iris; d = load_iris(as_frame=True); d.frame.to_csv('iris.csv', index=False)"

Define data loader, using cuDF

python
def load_data(fpath):
    """
    Read a CSV file with cuDF and split it into training and test sets.

    Args:
        fpath: Path to the CSV data to be ingested. The file must contain a
            "target" column, which is used as the label.

    Returns:
        The result of ``train_test_split`` with ``test_size=0.2``, which the
        caller in this notebook unpacks as (X_train, X_test, y_train, y_test).
    """
    import cudf

    frame = cudf.read_csv(fpath)

    # Labels are cast to int32; every remaining column is treated as a feature.
    labels = frame["target"].astype("int32")
    features = frame.drop(["target"], axis=1)

    return train_test_split(features, labels, test_size=0.2)

Define our training routine.

python
def train(fpath, max_depth, max_features, n_estimators):
    """
    Fit a random forest classifier on the data at ``fpath`` and log the
    hyperparameters and test accuracy to the active MLflow run.

    Args:
        fpath: Path or URL for the training data used with the model.
        max_depth: int Max tree depth
        max_features: float percentage of features to use in classification
        n_estimators: int number of trees to create

    Returns:
        Tuple of (fitted model, model signature inferred from the training
        features and training labels).
    """
    X_train, X_test, y_train, y_test = load_data(fpath)

    classifier = RandomForestClassifier(
        n_estimators=n_estimators, max_depth=max_depth, max_features=max_features
    )
    classifier.fit(X_train, y_train)

    # Score the fitted model on the held-out split.
    accuracy = accuracy_score(y_test, classifier.predict(X_test))

    # Hyperparameters are logged as strings.
    mlflow.log_params(
        {
            "max_depth": str(max_depth),
            "max_features": str(max_features),
            "n_estimators": str(n_estimators),
        }
    )
    mlflow.log_metrics({"accuracy": accuracy})

    # infer_signature is given pandas objects, so convert the cuDF data first.
    signature = infer_signature(X_train.to_pandas(), y_train.to_pandas())
    return classifier, signature

Run a single MLflow training run, then log and register the trained model with the tracking server.

python
# Environment file recorded with the logged model, and the training data path.
conda_env = "conda.yaml"
fpath = "iris.csv"

# Random forest hyperparameters forwarded to train().
max_depth = 10
max_features = 0.75
n_estimators = 500

artifact_path = "Airline-Demo"
artifact_uri = None
experiment_name = "RAPIDS-Notebook"
experiment_id = None

# Track runs in a local SQLite database; the four slashes make /tmp/... an absolute path.
mlflow.set_tracking_uri(uri="sqlite:////tmp/mlflow-db.sqlite")
mlflow.set_experiment(experiment_name)

with mlflow.start_run(run_name="(Notebook) RAPIDS-MLflow"):
    model, signature = train(fpath, max_depth, max_features, n_estimators)

    mlflow.sklearn.log_model(
        model,
        signature=signature,
        name=artifact_path,
        registered_model_name="rapids-mlflow-notebook",
        # Use the variable defined above rather than repeating the literal, so
        # changing `conda_env` actually changes the environment that is logged.
        conda_env=conda_env,
    )

    # NOTE(review): this is the run's artifact location joined with
    # `artifact_path`. Confirm that the installed MLflow version stores the
    # logged model there; if it does not, serve from the `model_uri` of the
    # object returned by log_model instead.
    artifact_uri = mlflow.get_artifact_uri(artifact_path=artifact_path)
print(artifact_uri)

Begin serving our trained model using MLflow

Note: The serving thread will continue to run after cell execution. Select the cell and click 'interrupt the kernel' to stop it.

In a terminal, run `mlflow models serve -m [artifact_uri] -p [port]`, using the same port as the `port` variable in the request cell below. You should see output similar to the following:

shell
2020/07/27 13:59:49 INFO mlflow.models.cli: Selected backend for flavor 'python_function'
2020/07/27 13:59:49 INFO mlflow.pyfunc.backend: === Running command 'source /anaconda3/bin/../etc/profile.d/conda.sh && conda activate mlflow-3335621df6011b1847d2555b195418d4496e5ffd 1>&2 && gunicorn --timeout=60 -b 127.0.0.1:5000 -w 1 ${GUNICORN_CMD_ARGS} -- mlflow.pyfunc.scoring_server.wsgi:app'
[2020-07-27 13:59:50 -0600] [23779] [INFO] Starting gunicorn 20.0.4
[2020-07-27 13:59:50 -0600] [23779] [INFO] Listening at: http://127.0.0.1:5000 (23779)
[2020-07-27 13:59:50 -0600] [23779] [INFO] Using worker: sync
[2020-07-27 13:59:50 -0600] [23788] [INFO] Booting worker with pid: 23788

Make requests against the deployed model

python
import json

import requests

host = "localhost"
# Must match the port passed to `mlflow models serve -p`.
port = "55755"

headers = {
    "Content-Type": "application/json",
}

# One sample in pandas "split" orientation. The columns must match the
# features the model was trained on, i.e. every column of iris.csv except
# "target". Assumes iris.csv was produced from scikit-learn's load_iris frame,
# as in the data-generation cell of this notebook.
data = {
    "columns": [
        "sepal length (cm)",
        "sepal width (cm)",
        "petal length (cm)",
        "petal width (cm)",
    ],
    "data": [[5.1, 3.5, 1.4, 0.2]],
}

# Retry budget while waiting for the server to come up. Bounded so a server
# that never starts ends in an error instead of an endless loop.
max_attempts = 10
retry_delay_s = 20
request_timeout_s = 30

## Pause to let server start
time.sleep(5)

for attempt in range(1, max_attempts + 1):
    try:
        resp = requests.post(
            url=f"http://{host}:{port}/invocations",
            data=json.dumps({"dataframe_split": data}),
            headers=headers,
            timeout=request_timeout_s,
        )
    except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as e:
        # Only "server not reachable yet" is worth retrying; anything else is
        # a real error and is allowed to propagate.
        print(f"Caught exception attempting to call model endpoint: {e}")
        print(f"Sleeping (attempt {attempt} of {max_attempts})")
        time.sleep(retry_delay_s)
        continue

    # Fail loudly on an HTTP error (e.g. a schema mismatch) instead of
    # treating the error body as a prediction.
    resp.raise_for_status()

    # The scoring server replies with a JSON object whose "predictions" entry
    # holds one predicted class per input row.
    predictions = resp.json()["predictions"]
    print(f"Classification: {predictions}")
    break
else:
    raise RuntimeError(
        f"Model endpoint http://{host}:{port}/invocations was not reachable "
        f"after {max_attempts} attempts"
    )