doc/source/ray-overview/examples/e2e-multimodal-ai-workloads/notebooks/01-Batch-Inference.ipynb
Code repository: https://github.com/anyscale/multimodal-ai

This tutorial executes a batch inference workload that connects heterogeneous workloads: loading images from cloud storage, preprocessing them, generating embeddings on GPUs, and saving the results.
%%bash
pip install -q -r /home/ray/default/requirements.txt
pip install -q -e /home/ray/default/doggos
Note: A kernel restart may be required for all dependencies to become available.
If using uv, then:
1. Turn off pip packages (Dependencies tab up top > Toggle off Pip packages). There's no need to run the pip install commands above.
2. Create a venv (click on base (Python x.yy.zz) in the top right corner of the notebook > Select another Kernel > Python Environments... > Create Python Environment > Venv > Use Existing) and done! Now all the notebook's cells will use the virtual env.
3. Have Ray use uv run instead of python by adding these lines after importing ray:
import os
os.environ.pop("RAY_RUNTIME_ENV_HOOK", None)
import ray
ray.init(runtime_env={"py_executable": "uv run", "working_dir": "/home/ray/default"})
%load_ext autoreload
%autoreload all
import os
import ray
import sys
sys.path.append(os.path.abspath("../doggos/"))
# If using UV
# os.environ.pop("RAY_RUNTIME_ENV_HOOK", None)
# ray.init(runtime_env={"py_executable": "uv run", "working_dir": "/home/ray/default"})
from doggos import utils
Start by reading the data from a public cloud storage bucket.
# Load data.
ds = ray.data.read_images(
    "s3://doggos-dataset/train",
    include_paths=True,
    shuffle="files",
)
ds.take(1)
Ray Data supports a wide range of data sources for both loading and saving, from generic binary files in cloud storage to structured data formats used by modern data platforms. This example reads data from a public S3 bucket prepared with the dataset. This read operation, much like the write operation in a later step, runs in a distributed fashion, so Ray Data processes the data in parallel across the cluster and doesn't need to load it entirely into memory at once, making data loading scalable and memory-efficient.
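For instance, here are a few of Ray Data's many connectors (a hedged sketch; the bucket paths are hypothetical placeholders):
import ray
# Structured, columnar data.
parquet_ds = ray.data.read_parquet("s3://my-bucket/tables/")
# Delimited text files.
csv_ds = ray.data.read_csv("s3://my-bucket/labels.csv")
# Distributed write back out to storage.
parquet_ds.write_parquet("/mnt/cluster_storage/tables_copy")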
A few things to note about this workload:
- Trigger lazy execution: Ray Data uses lazy execution, which decreases execution time and memory utilization, but it means you need an operation like take, count, write, etc., to actually execute the workflow DAG. Here, take triggers the execution.
- Shuffling strategies: because the dataset is ordered by class, shuffle="files" randomly shuffles the ordering of input files before reading. Ray Data also provides an extensive list of shuffling strategies, such as local shuffles and per-epoch shuffles (see the sketch after the note below).
- Materialize during development: use materialize to execute and materialize the dataset into Ray's shared-memory object store. This saves a checkpoint, so future operations on the dataset can start from this point instead of rerunning all operations from scratch. This feature is convenient during development, especially in a stateful environment like Jupyter notebooks, because you can resume from saved checkpoints.
ds = ds.map(...)
ds = ds.materialize()
Note: only use materialize during development and with small datasets, as it loads the entire dataset into memory.
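For example, here's a rough sketch of a few of those shuffling strategies (the batch and buffer sizes are arbitrary choices for illustration):
# Shuffle the input file order at read time (used in this tutorial).
ds = ray.data.read_images("s3://doggos-dataset/train", include_paths=True, shuffle="files")
# Full global shuffle of all rows (thorough but more expensive).
shuffled_ds = ds.random_shuffle()
# Local shuffle within a buffer while iterating, e.g., once per training epoch.
for batch in ds.iter_batches(batch_size=64, local_shuffle_buffer_size=1000):
    break  # just demonstrating the API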
You also want to add the class for each data point. When reading the data with include_paths, Ray Data saves the file path with each data point. The path contains the class label, so add it to each data point's row. Use Ray Data's map function to apply this to each row.
def add_class(row):
    # The class label is the parent directory name in the file path.
    row["class"] = row["path"].rsplit("/", 3)[-2]
    return row
# Add class.
ds = ds.map(add_class)
❌ Traditional batch execution (non-streaming, for example, Spark without pipelining or SageMaker Batch Transform): each stage must finish processing the entire dataset before the next stage can start.
✅ Streaming execution with Ray Data: stages run concurrently, with blocks of data flowing to the next stage as soon as they're ready, so expensive resources like GPUs don't sit idle.
Note: Ray Data isn't a real-time stream-processing engine like Flink or Kafka Streams. Instead, it's batch processing with streaming execution, which is especially useful for iterative ML workloads, ETL pipelines, and preprocessing before training or inference. Ray Data typically delivers a 2-17x throughput improvement over solutions like Spark and SageMaker Batch Transform.
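To make streaming execution concrete, here's a minimal, self-contained sketch with a toy dataset (not the dog images): nothing runs until the final write, and once triggered, blocks stream through all stages concurrently.
import ray
# Build a lazy pipeline; no work happens yet.
toy_ds = ray.data.range(1_000)  # rows like {"id": 0}
toy_ds = toy_ds.map(lambda row: {"id": row["id"], "square": row["id"] ** 2})
# The write triggers execution; blocks flow through map and the writer
# concurrently instead of materializing the full dataset between stages.
toy_ds.write_parquet("/tmp/squares")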
The previous section applied a mapping operation to each row in the dataset using a function. Now you're ready to generate embeddings from the data, this time using Ray Data's map_batches to apply an operation across batches of data. The operation takes the form of a callable, which is either a function or a class with a __call__ method.
import numpy as np
from PIL import Image
import torch
from transformers import CLIPModel, CLIPProcessor
class EmbedImages(object):
    def __init__(self, model_id, device):
        # Load the CLIP model and processor once per actor.
        self.processor = CLIPProcessor.from_pretrained(model_id)
        self.model = CLIPModel.from_pretrained(model_id)
        self.model.to(device)
        self.device = device

    def __call__(self, batch):
        # Convert raw arrays to PIL images and preprocess them.
        images = [Image.fromarray(np.uint8(img)).convert("RGB") for img in batch["image"]]
        inputs = self.processor(images=images, return_tensors="pt", padding=True).to(self.device)
        # Generate embeddings without gradient tracking.
        with torch.inference_mode():
            batch["embedding"] = self.model.get_image_features(**inputs).cpu().numpy()
        return batch
Instead of initializing the same model inside each instance of the class above, you can use references to Ray's shared-memory object store: load the model once, put it in the object store, and have each instance of your class fetch it from there. For example (a sketch; EmbedImagesFromRef is a hypothetical variant of the class above):
# Load the model once on the driver and put it in the object store.
model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
model_ref = ray.put(model)

class EmbedImagesFromRef:
    def __init__(self, model_ref):
        # Each actor fetches the shared model instead of loading its own copy.
        self.model = ray.get(model_ref)
# Generate batch embeddings
embeddings_ds = ds.map_batches(
    EmbedImages,
    fn_constructor_kwargs={
        "model_id": "openai/clip-vit-base-patch32",
        "device": "cuda",
    },  # class constructor kwargs
    fn_kwargs={},  # __call__ kwargs
    compute=ray.data.ActorPoolStrategy(size=4),  # pool of 4 actors
    batch_size=64,  # rows per batch
    num_gpus=1,  # GPUs per actor
    accelerator_type="T4",
)
embeddings_ds = embeddings_ds.drop_columns(["image"]) # remove image column
Ray Data not only makes it extremely easy to distribute workloads but also ensures that they run reliably and efficiently, with built-in features such as autoscaling actor pools and fault tolerance.
🔥 RayTurbo Data offers additional functionality on top of open source Ray Data.
import shutil
# Save to artifact storage.
embeddings_path = os.path.join("/mnt/cluster_storage", "doggos/embeddings")
if os.path.exists(embeddings_path):
    shutil.rmtree(embeddings_path)  # clean up
embeddings_ds.write_parquet(embeddings_path)
You can always store the data in any storage bucket, but Anyscale offers a default storage bucket to make things easier. You also have plenty of other storage options, for example, storage shared at the cluster, user, and cloud levels.
Note: ideally you would store these embeddings in a vector database for efficient search, filtering, indexing, etc., but for this tutorial, just store them on a shared file system.
While you're developing your workloads, Ray offers logs and an observability dashboard that you can use to monitor and debug. The dashboard includes many different components, such as:
- memory, utilization, etc., of the tasks running in the cluster
- views to see all running tasks, utilization across instance types, autoscaling, etc.
🔥 While OSS Ray comes with an extensive observability suite, Anyscale takes it many steps further to make it easier and faster to monitor and debug workloads, with:
- Ray workload-specific dashboards (Data, Train, etc.) that can break down the tasks
- a unified log viewer to see logs from all driver and worker processes
Anyscale Jobs (API ref) allows you to execute discrete workloads in production, such as batch inference, embeddings generation, or model fine-tuning.
Wrap the batch embedding generation workload as an Anyscale Job by providing the main command to run, python doggos/embed.py, along with the compute and dependencies it requires. Also set the working directory to the default directory so that the Job has access to all the files the workload needs.
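Such a config could look roughly like the following (an illustrative sketch, not the exact contents of configs/generate_embeddings.yaml; the job name is hypothetical):
name: generate-doggos-embeddings
entrypoint: python doggos/embed.py
working_dir: /home/ray/default
containerfile: /home/ray/default/containerfile  # or a pre-built image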
Note: This tutorial uses a containerfile to define dependencies, but you could easily use a pre-built image as well.
%%bash
# Production batch embedding generation job
anyscale job submit -f /home/ray/default/configs/generate_embeddings.yaml
Process a new image, embed it, and then retrieve the most similar images, based on cosine similarity of embeddings, from the larger dataset you just computed batch embeddings for.
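As a reference for what the retrieval step does, a top-k lookup by cosine similarity can be as simple as the following sketch (illustrative only; the actual implementation lives in doggos/embed.py):
import numpy as np

def top_k_cosine(query, embeddings, k=5):
    # Normalize both sides so that a dot product equals cosine similarity.
    query = query / np.linalg.norm(query)
    embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
    sims = embeddings @ query  # one similarity score per stored embedding
    top = np.argsort(sims)[::-1][:k]  # indices of the k highest scores
    return top, sims[top]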
from io import BytesIO
from PIL import Image
import numpy as np
import requests
from doggos.embed import get_top_matches, display_top_matches
def url_to_array(url):
    # Download the image and convert it to an RGB numpy array.
    return np.array(Image.open(BytesIO(requests.get(url).content)).convert("RGB"))
# Embed input image.
url = "https://doggos-dataset.s3.us-west-2.amazonaws.com/samara.png"
image = url_to_array(url=url)
embedding_generator = EmbedImages(model_id="openai/clip-vit-base-patch32", device="cpu")
embedding = embedding_generator({"image": [image]})["embedding"][0]
np.shape(embedding)
# Top matches by embedding similarity.
embeddings_ds = ray.data.read_parquet(embeddings_path)
top_matches = get_top_matches(embedding, embeddings_ds, n=5)
display_top_matches(url, top_matches)
🚨 Note: Reset this notebook using the "🔄 Restart" button located in the notebook's menu bar. This frees up all the variables, utils, etc. used in this notebook.