doc/source/train/examples/pytorch/torch_detection.ipynb
This tutorial explains how to fine-tune fasterrcnn_resnet50_fpn using the Ray AI libraries for parallel data ingest and training.
Here's what you'll do:
- Load a subset of Pascal VOC into a Ray Dataset and parse the VOC-style annotations with Ray Data.
- Fine-tune fasterrcnn_resnet50_fpn (the backbone is pre-trained on ImageNet) with Ray Train.

You should be familiar with PyTorch before starting the tutorial. If you need a refresher, read PyTorch's training a classifier tutorial.

First, install Ray Data and Ray Train:
!pip install 'ray[data,train]'
Then, install torch, torchmetrics, torchvision, and xmltodict:

!pip install torch torchmetrics torchvision xmltodict
Dataset

You'll work with a subset of Pascal VOC that contains cats and dogs (the full dataset has 20 classes).
CLASS_TO_LABEL = {
"background": 0,
"cat": 1,
"dog": 2,
}
The dataset contains two subdirectories: JPEGImages and Annotations. JPEGImages contains raw images, and
Annotations contains XML annotations.
AnimalDetection
├── Annotations
│   ├── 2007_000063.xml
│   ├── 2007_000528.xml
│   └── ...
└── JPEGImages
    ├── 2007_000063.jpg
    ├── 2007_000528.jpg
    └── ...
Each annotation describes the objects in an image.
For example, view this image of a dog:
import io
from PIL import Image
import requests
response = requests.get("https://s3-us-west-2.amazonaws.com/air-example-data/AnimalDetection/JPEGImages/2007_000063.jpg")
image = Image.open(io.BytesIO(response.content))
image
Then, print the image's annotation:
!curl "https://s3-us-west-2.amazonaws.com/air-example-data/AnimalDetection/Annotations/2007_000063.xml"
Notice how there's one object labeled "dog":
<name>dog</name>
<pose>Unspecified</pose>
<truncated>0</truncated>
<difficult>0</difficult>
<bndbox>
<xmin>123</xmin>
<ymin>115</ymin>
<xmax>379</xmax>
<ymax>275</ymax>
</bndbox>
Ray Data lets you read and preprocess data in parallel. Ray Data doesn't have built-in support for VOC-style annotations, so you'll need to define logic to parse the annotations.
from typing import Any, Dict, List, Tuple
import xmltodict
def decode_annotation(row: Dict[str, Any]) -> Dict[str, Any]:
    text = row["bytes"].decode("utf-8")
    annotation = xmltodict.parse(text)["annotation"]

    objects = annotation["object"]
    # If there's one object, `objects` is a `dict`; otherwise, it's a `list[dict]`.
    if isinstance(objects, dict):
        objects = [objects]

    boxes: List[Tuple] = []
    for obj in objects:
        x1 = float(obj["bndbox"]["xmin"])
        y1 = float(obj["bndbox"]["ymin"])
        x2 = float(obj["bndbox"]["xmax"])
        y2 = float(obj["bndbox"]["ymax"])
        boxes.append((x1, y1, x2, y2))

    labels: List[int] = [CLASS_TO_LABEL[obj["name"]] for obj in objects]

    filename = annotation["filename"]

    return {
        "boxes": boxes,
        "labels": labels,
        "filename": filename,
    }
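Before wiring the parser into Ray Data, you can sanity-check it on a single annotation file. This quick check isn't part of the pipeline; it reuses the annotation URL shown earlier and the requests import from the image example.

# Optional sanity check: fetch one annotation and decode it locally.
response = requests.get("https://s3-us-west-2.amazonaws.com/air-example-data/AnimalDetection/Annotations/2007_000063.xml")
decode_annotation({"bytes": response.content})
# Expect a dict with "boxes", "labels", and "filename" keys.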
import os
import ray
path = "s3://anonymous@air-example-data/AnimalDetection/Annotations"
annotations: ray.data.Dataset = (
    ray.data.read_binary_files(path)
    .map(decode_annotation)
)
Look at the first two samples. Ray Data should've correctly parsed labels and bounding boxes.
annotations.take(2)
Each row of annotations contains the filename of an image.
Write a user-defined function that loads these images. For each annotation, your function will:
- Download the image that matches the row's filename.
- Add the decoded image to the row's "image" column.

from typing import Dict
import numpy as np
from PIL import Image
def read_images(row: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    url = os.path.join("https://s3-us-west-2.amazonaws.com/air-example-data/AnimalDetection/JPEGImages", row["filename"])
    response = requests.get(url)
    image = Image.open(io.BytesIO(response.content))
    row["image"] = np.array(image)
    return row
dataset = annotations.map(read_images)
dataset
Once you've created a Dataset, split the dataset into train and test sets.
train_dataset, test_dataset = dataset.train_test_split(0.2)
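If you want to confirm the split sizes, a quick check (not part of the original tutorial) is to count the rows in each split. Note that counting triggers execution of the lazy read and map steps.

# Optional check: counting materializes the datasets.
print(train_dataset.count(), test_dataset.count())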
Create a function that preprocesses the images in the dataset. First, transpose and scale the images (ToTensor). Then,
randomly augment images every epoch (RandomHorizontalFlip). Apply this transformation to each row in the dataset with map.
from typing import Any
from torchvision import transforms
def preprocess_image(row: Dict[str, Any]) -> Dict[str, Any]:
    transform = transforms.Compose([transforms.ToTensor(), transforms.RandomHorizontalFlip(p=0.5)])
    row["image"] = transform(row["image"])
    return row
# The following transform operation is lazy.
# It will be re-run every epoch.
train_dataset = train_dataset.map(preprocess_image)
test_dataset.take(1)
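The random flip is an augmentation meant only for the training split. If you later need the test images as tensors as well, one possible approach, shown here as a sketch rather than a required step, is a deterministic transform without augmentation:

# Hypothetical test-time preprocessing: convert images to tensors without random augmentation.
def preprocess_test_image(row: Dict[str, Any]) -> Dict[str, Any]:
    row["image"] = transforms.ToTensor()(row["image"])
    return row

test_dataset = test_dataset.map(preprocess_test_image)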
Write a function that trains fasterrcnn_resnet50_fpn. Your code will look like
standard Torch code with a few changes.
Here are a few things to point out:
- Distribute the model with ray.train.torch.prepare_model. Don't use DistributedDataParallel.
- Iterate over the Ray Dataset shard with DataIterator.iter_batches. Don't use a Torch DataLoader.

In addition, report metrics and checkpoints with train.report. train.report tracks these metrics in Ray Train's internal bookkeeping, allowing you to monitor training and analyze training runs after they've finished.
import os
import torch
from torchvision import models
from tempfile import TemporaryDirectory
from ray import train
def train_one_epoch(*, model, optimizer, batch_size, epoch):
    model.train()

    lr_scheduler = None
    if epoch == 0:
        warmup_factor = 1.0 / 1000
        lr_scheduler = torch.optim.lr_scheduler.LinearLR(
            optimizer, start_factor=warmup_factor, total_iters=250
        )

    device = ray.train.torch.get_device()

    train_dataset_shard = train.get_dataset_shard("train")
    batches = train_dataset_shard.iter_batches(batch_size=batch_size)
    for batch in batches:
        inputs = [torch.as_tensor(image).to(device) for image in batch["image"]]
        targets = []
        for i in range(len(batch["boxes"])):
            # `boxes` is a (B, 4) tensor, where B is the number of boxes in the image.
            boxes = torch.as_tensor([box for box in batch["boxes"][i]]).to(device)
            # `labels` is a (B,) tensor, where B is the number of boxes in the image.
            labels = torch.as_tensor(batch["labels"][i]).to(device)
            targets.append({"boxes": boxes, "labels": labels})

        loss_dict = model(inputs, targets)
        losses = sum(loss for loss in loss_dict.values())

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        if lr_scheduler is not None:
            lr_scheduler.step()

        train.report(
            {
                "losses": losses.item(),
                "epoch": epoch,
                "lr": optimizer.param_groups[0]["lr"],
                **{key: value.item() for key, value in loss_dict.items()},
            }
        )
def train_loop_per_worker(config):
    # By default, `fasterrcnn_resnet50_fpn`'s backbone is pre-trained on ImageNet.
    model = models.detection.fasterrcnn_resnet50_fpn(num_classes=3)
    model = ray.train.torch.prepare_model(model)

    parameters = [p for p in model.parameters() if p.requires_grad]
    optimizer = torch.optim.SGD(
        parameters,
        lr=config["lr"],
        momentum=config["momentum"],
        weight_decay=config["weight_decay"],
    )
    lr_scheduler = torch.optim.lr_scheduler.MultiStepLR(
        optimizer, milestones=config["lr_steps"], gamma=config["lr_gamma"]
    )

    for epoch in range(config["epochs"]):
        train_one_epoch(
            model=model,
            optimizer=optimizer,
            batch_size=config["batch_size"],
            epoch=epoch,
        )
        lr_scheduler.step()
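The training loop above reports metrics only. If you also want to save a checkpoint at the end of each epoch, a minimal sketch using Ray Train's Checkpoint API could look like the following. Place it inside train_loop_per_worker after the call to train_one_epoch; the file name model.pt is an arbitrary choice.

from ray.train import Checkpoint

# Sketch: save the model weights and report them as a checkpoint each epoch.
with TemporaryDirectory() as tmpdir:
    # `prepare_model` may wrap the model in DistributedDataParallel,
    # so unwrap it before saving the state dict.
    module = model.module if hasattr(model, "module") else model
    torch.save(module.state_dict(), os.path.join(tmpdir, "model.pt"))
    train.report({"epoch": epoch}, checkpoint=Checkpoint.from_directory(tmpdir))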
Once you've defined the training loop, create a TorchTrainer and pass the training
loop to the constructor. Then, call TorchTrainer.fit to train the model.
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer
trainer = TorchTrainer(
    train_loop_per_worker=train_loop_per_worker,
    train_loop_config={
        "batch_size": 2,
        "lr": 0.02,
        "epochs": 1,  # You'd normally train for 26 epochs.
        "momentum": 0.9,
        "weight_decay": 1e-4,
        "lr_steps": [16, 22],
        "lr_gamma": 0.1,
    },
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
    datasets={"train": train_dataset},
)
results = trainer.fit()
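After fit returns, you can inspect the training run through the returned Result object.

# Inspect the final reported metrics and the latest checkpoint (None if no checkpoint was saved).
print(results.metrics)
print(results.checkpoint)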
Next steps

See End-to-end: Offline Batch Inference <batch_inference_home>.