tutorials/delta_lake/1-local-image-batch-inference.ipynb
In this tutorial, we showcase how to perform ML model batch inference on data in a Delta Lake table.
ML Model Batch Inference
When we have a trained machine learning model, the next step is often to apply this model to a large amount of data. This involves efficiently loading the model into memory (potentially GPU memory) and then running data through the model to produce outputs.
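Before diving into Daft, the general pattern can be sketched in plain Python. The model loader and the "model" below are stand-ins invented for illustration, not part of this tutorial's code:

```python
# A minimal sketch of the batch-inference pattern, independent of any ML
# framework: load the model once, then stream fixed-size batches through it.

def load_model():
    # Stand-in for an expensive model load (e.g. reading weights into GPU memory).
    return lambda batch: [x * 2 for x in batch]

def batch_inference(data, batch_size=4):
    model = load_model()  # loaded once, reused for every batch
    outputs = []
    for i in range(0, len(data), batch_size):
        outputs.extend(model(data[i : i + batch_size]))
    return outputs

print(batch_inference([1, 2, 3, 4, 5]))  # -> [2, 4, 6, 8, 10]
```

Dataframe libraries like Daft take care of the batching and parallelism for us, but the same load-once, run-many structure applies.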
To run this tutorial, you will need AWS credentials correctly provisioned on your machine, as the data is hosted in a requester-pays bucket in AWS S3.
Let's get started!
CI = False
# Skip this notebook execution in CI because it hits non-public buckets
if CI:
    import sys

    sys.exit()
First, let's provision credentials to Daft! We can do so using the `boto3` library and creating a Daft {class}`IOConfig <daft.io.IOConfig>` object like so:
import boto3
import daft
session = boto3.session.Session()
creds = session.get_credentials()
io_config = daft.io.IOConfig(
    s3=daft.io.S3Config(
        access_key=creds.secret_key,
        key_id=creds.access_key,
        session_token=creds.token,
        region_name="us-west-2",
    )
)
Now we're ready to read data from our DeltaLake table!
We've hosted a 10,000-row sample of the ImageNet validation set for you to try this out.
Simply pass the `IOConfig` we created earlier to the `daft.read_deltalake` call to ensure that Daft can access the data.
df = daft.read_deltalake("s3://daft-public-datasets/imagenet/val-10k-sample-deltalake/", io_config=io_config)
df
For this demo, we're running this on our local machine, so we will limit the data to the first 100 rows.
df = df.limit(100)
df = df.select("folder", "filename", "object")
df.collect()
Let's now construct URLs for the images, and start downloading/decoding them into images in our dataframe!
df = df.with_column(
    "image_url", "s3://daft-public-datasets/imagenet/val-10k-sample-deltalake/images/" + df["filename"] + ".jpeg"
)
df = df.with_column("image", df["image_url"].download().decode_image())
We also want to do a little preprocessing to get our images all into the same size. We can do this with the {meth}`.resize <daft.expressions.expressions.Expression.resize>` method!
df = df.with_column("image_resized_small", df["image"].resize(32, 32))
df = df.with_column("image_resized_large", df["image"].resize(256, 256))
df.show(4)
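To build some intuition for what a fixed-size resize does, here is a nearest-neighbor sketch on a tiny 2x2 "image" of pixel values. This is purely illustrative; Daft's actual resize presumably uses proper image interpolation:

```python
# Nearest-neighbor resize: each output pixel copies the closest input pixel.
def resize_nearest(img, out_h, out_w):
    in_h, in_w = len(img), len(img[0])
    return [
        [img[r * in_h // out_h][c * in_w // out_w] for c in range(out_w)]
        for r in range(out_h)
    ]

img = [[1, 2], [3, 4]]
print(resize_nearest(img, 4, 4))
# -> [[1, 1, 2, 2], [1, 1, 2, 2], [3, 3, 4, 4], [3, 3, 4, 4]]
```

Note that forcing every image to a square (32x32 or 256x256) distorts the aspect ratio of non-square images, which is a common and usually acceptable simplification for classification models.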
Great! We now have our images nicely preprocessed, and are ready to run batch inference on them.
Let's run a simple ResNet image classifier on each image's "high-resolution" and "low-resolution" variant, to see how sensitive our model is to the resolution of the image!
First off, we define a "Stateful UDF" that initializes our model once in its `__init__` method, then reuses the same model across multiple invocations on different partitions of data.
import numpy as np
import torch
from torchvision.models import ResNet50_Weights, resnet50
import daft
@daft.udf(return_dtype=daft.DataType.string())
class ClassifyImage:
    def __init__(self):
        weights = ResNet50_Weights.DEFAULT
        self.model = resnet50(weights=weights)
        self.model.eval()
        self.preprocess = weights.transforms()
        self.category_map = weights.meta["categories"]

    def __call__(self, images: daft.Series, shape: list[int]):
        if len(images) == 0:
            return []

        # Convert the Daft Series into a list of Numpy arrays
        data = images.cast(daft.DataType.tensor(daft.DataType.uint8(), tuple(shape))).to_pylist()

        # Convert the numpy arrays into a torch tensor (NHWC -> NCHW)
        images_array = torch.tensor(np.array(data)).permute((0, 3, 1, 2))

        # Run the model, and map results back to a human-readable string
        batch = self.preprocess(images_array)
        prediction = self.model(batch).softmax(1)  # softmax over the class dimension
        class_ids = prediction.argmax(1)
        return [self.category_map[class_id] for class_id in class_ids]
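To make the softmax-then-argmax step at the end of the UDF concrete, here is a tiny pure-Python illustration with made-up logits for two images and three classes. Softmax is applied per image across the class dimension, and argmax picks the most probable class index:

```python
import math

def softmax(row):
    # Turn one image's raw logits into probabilities that sum to 1.
    exps = [math.exp(x) for x in row]
    total = sum(exps)
    return [e / total for e in exps]

# Two images, three classes: one row of logits per image (made-up values).
logits = [[2.0, 1.0, 0.5], [0.1, 3.0, 0.2]]
probs = [softmax(row) for row in logits]

# argmax over each row gives the predicted class index per image.
class_ids = [max(range(len(p)), key=p.__getitem__) for p in probs]
print(class_ids)  # -> [0, 1]
```

In the UDF, these class indices are then mapped through `weights.meta["categories"]` to human-readable labels.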
To run our model on the dataframe, simply call the ClassifyImage UDF we defined earlier on the columns!
NOTE: If we wanted to ensure that our UDF runs on a GPU, we can specify:
ClassifyImageWithGPU = ClassifyImage.with_resource_requests(num_gpus=1)
df = df.with_column("predictions_lowres", ClassifyImage(df["image_resized_small"], [32, 32, 3]))
df = df.with_column("predictions_highres", ClassifyImage(df["image_resized_large"], [256, 256, 3]))
df.show(4)
Pretty cool! Looks like decreasing the resolution of the image too much does have a strong effect on the model's predictions, as expected.
We can go ahead and show just the rows that exhibit this behavior. We will also need to filter out rows where the image does not have 3 channels, because those would break our code.
Note that the following cell will now take a much longer time to run as we need to run the model on all the rows instead of just the first 4!
# Filter out images where the number of channels != 3
df = df.where(df["image"].apply(lambda img: img.shape[2] == 3, return_dtype=daft.DataType.bool()))
# Show only rows where the predictions on the low-res/high-res images don't match
df = df.where(df["predictions_lowres"] != df["predictions_highres"])
df.show(4)
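If we wanted to quantify how often the two resolutions disagree rather than just eyeballing rows, the comparison boils down to counting mismatches between two prediction columns. Here is a plain-Python sketch with hypothetical ImageNet-style labels standing in for the dataframe columns:

```python
# Made-up predictions standing in for the "predictions_lowres" and
# "predictions_highres" columns of our dataframe.
lowres = ["tabby", "goldfish", "tench", "hammer"]
highres = ["tabby", "barracouta", "tench", "screwdriver"]

mismatches = sum(lo != hi for lo, hi in zip(lowres, highres))
rate = mismatches / len(lowres)
print(f"{mismatches} of {len(lowres)} predictions differ ({rate:.0%})")
# -> 2 of 4 predictions differ (50%)
```

In Daft, the analogous measurement would be counting the rows remaining after the `predictions_lowres != predictions_highres` filter (e.g. via something like `df.count_rows()`; treat that method name as an assumption to check against the Daft docs) and dividing by the total row count.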