This notebook is based on an official Hugging Face example, How to fine-tune a model on text classification. It shows how to move from vanilla HF to Ray Train without changing the training logic unless necessary.
Uncomment and run the following line to install all the necessary dependencies (this notebook was tested with transformers==4.19.1):
#! pip install "datasets" "transformers>=4.19.0" "torch>=1.10.0" "mlflow"
(hf-setup)=
Use ray.init() to initialize a local cluster. By default, this cluster contains only the machine you are running this notebook on. You can also run this notebook on an Anyscale cluster.
from pprint import pprint
import ray
ray.init()
Check the resources your cluster is composed of. If you are running this notebook on your local machine or Google Colab, you should see the number of CPU cores and GPUs available on your machine.
pprint(ray.cluster_resources())
This notebook fine-tunes a HF Transformers model on one of the text classification tasks of the GLUE Benchmark. It runs the training using Ray Train.
You can change these two variables to control whether the training, which happens later, uses CPUs or GPUs, and how many workers to spawn. Each worker claims one CPU or GPU. Make sure not to request more resources than your cluster has available. By default, the training runs with one GPU worker.
use_gpu = True # set this to False to run on CPUs
num_workers = 1 # set this to number of GPUs or CPUs you want to use
The GLUE Benchmark is a group of nine classification tasks on sentences or pairs of sentences. To learn more, see the original notebook.
Each task is named by its acronym, with mnli-mm indicating the mismatched version of MNLI; it has the same training set as mnli but different validation and test sets.
GLUE_TASKS = [
    "cola",
    "mnli",
    "mnli-mm",
    "mrpc",
    "qnli",
    "qqp",
    "rte",
    "sst2",
    "stsb",
    "wnli",
]
This notebook runs on any of the tasks in the list above, with any model checkpoint from the Model Hub as long as that model has a version with a classification head. Depending on the model and the GPU you are using, you might need to adjust the batch size to avoid out-of-memory errors. Set these three parameters, and the rest of the notebook should run smoothly:
task = "cola"
model_checkpoint = "distilbert-base-uncased"
batch_size = 16
(hf-preprocess)=
Below, we'll load a dataset from the Hugging Face Hub and preprocess it with a HF Transformers Tokenizer, which tokenizes the inputs, converts the tokens to their corresponding IDs in the pretrained vocabulary, and puts them in a format the model expects. It also generates the other inputs that the model requires.
To do all of this preprocessing, instantiate your tokenizer with the AutoTokenizer.from_pretrained method, which ensures that you get a tokenizer corresponding to the model architecture you want to use, and that you download the vocabulary used when pretraining this specific checkpoint.
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint, use_fast=True)
Pass use_fast=True to the preceding call to use one of the fast tokenizers, backed by Rust, from the HF Tokenizers library. These fast tokenizers are available for almost all models, but if you get an error with the previous call, remove the argument.
To preprocess the dataset, you need the names of the columns containing the sentence(s). The following dictionary keeps track of the correspondence from task to column names:
task_to_keys = {
    "cola": ("sentence", None),
    "mnli": ("premise", "hypothesis"),
    "mnli-mm": ("premise", "hypothesis"),
    "mrpc": ("sentence1", "sentence2"),
    "qnli": ("question", "sentence"),
    "qqp": ("question1", "question2"),
    "rte": ("sentence1", "sentence2"),
    "sst2": ("sentence", None),
    "stsb": ("sentence1", "sentence2"),
    "wnli": ("sentence1", "sentence2"),
}
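As a quick sanity check of how this mapping is unpacked later in the preprocessing function, here is a minimal sketch that re-declares two entries so the snippet stands alone:

```python
# Two entries re-declared from the mapping above, for a self-contained check
task_to_keys = {
    "cola": ("sentence", None),
    "mrpc": ("sentence1", "sentence2"),
}

sentence1_key, sentence2_key = task_to_keys["cola"]
# Single-sentence tasks carry None as the second key, which later selects
# the single-sentence form of the tokenizer call
print(sentence1_key, sentence2_key)  # sentence None
```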
Use the Hugging Face Hub HfFileSystem to read Parquet files directly from the Hub. Since the HuggingFace datasets here are backed by Parquet files, you can use {meth}`~ray.data.read_parquet` with the filesystem parameter to load them into Ray Data.
import ray.data
from huggingface_hub import HfFileSystem
actual_task = "mnli" if task == "mnli-mm" else task
# Load datasets using HfFileSystem
# GLUE datasets are backed by Parquet files on Hugging Face Hub
path = f"hf://datasets/nyu-mll/glue/{actual_task}/"
fs = HfFileSystem()
# List the parquet files for each split
files = [f["name"] for f in fs.ls(path)]
train_files = [f for f in files if "train" in f and f.endswith(".parquet")]
validation_files = [f for f in files if "validation" in f and f.endswith(".parquet")]
test_files = [f for f in files if "test" in f and f.endswith(".parquet")]
ray_datasets = {
    "train": ray.data.read_parquet(train_files, filesystem=fs),
    "validation": ray.data.read_parquet(validation_files, filesystem=fs),
    "test": ray.data.read_parquet(test_files, filesystem=fs),
}
ray_datasets
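The substring filters above do real work: a config directory on the Hub can also contain non-Parquet files such as a README. A minimal sketch with hypothetical file names shows how only the Parquet files for each split survive the filter:

```python
# Hypothetical listing, mimicking what fs.ls() might return for one GLUE config
files = [
    "datasets/nyu-mll/glue/cola/train-00000-of-00001.parquet",
    "datasets/nyu-mll/glue/cola/validation-00000-of-00001.parquet",
    "datasets/nyu-mll/glue/cola/test-00000-of-00001.parquet",
    "datasets/nyu-mll/glue/cola/README.md",
]

train_files = [f for f in files if "train" in f and f.endswith(".parquet")]
validation_files = [f for f in files if "validation" in f and f.endswith(".parquet")]

print(train_files)  # only the train split; README.md is filtered out
```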
You can then write the function that preprocesses the samples. Feed them to the tokenizer with the argument truncation=True. This configuration ensures that the tokenizer truncates any input longer than the maximum length the selected model can handle, while padding="longest" pads every sequence in the batch to the length of its longest member.
import numpy as np
import torch
from typing import Dict


# Tokenize input sentences
def collate_fn(examples: Dict[str, np.ndarray]):
    sentence1_key, sentence2_key = task_to_keys[task]
    if sentence2_key is None:
        outputs = tokenizer(
            list(examples[sentence1_key]),
            truncation=True,
            padding="longest",
            return_tensors="pt",
        )
    else:
        outputs = tokenizer(
            list(examples[sentence1_key]),
            list(examples[sentence2_key]),
            truncation=True,
            padding="longest",
            return_tensors="pt",
        )

    outputs["labels"] = torch.LongTensor(examples["label"])

    # Move all input tensors to the GPU, if one is available
    if torch.cuda.is_available():
        for key, value in outputs.items():
            outputs[key] = value.cuda()

    return outputs
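To see what padding="longest" means without loading a real tokenizer, here is a toy sketch with hypothetical token IDs: each sequence in the batch is padded with zeros up to the longest one, and an attention mask marks the real tokens.

```python
import numpy as np

# Hypothetical token-ID sequences of different lengths (0 is the pad ID here)
batch = [[101, 2054, 102], [101, 2054, 2003, 2023, 102]]
max_len = max(len(seq) for seq in batch)

input_ids = np.array([seq + [0] * (max_len - len(seq)) for seq in batch])
attention_mask = (input_ids != 0).astype(np.int64)

print(input_ids.shape)        # (2, 5): padded to the longest sequence
print(attention_mask.sum(1))  # [3 5]: number of real tokens per row
```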
(hf-train)=
Now that the data is ready, download the pretrained model and fine-tune it.
Because all of the tasks involve sentence classification, use the AutoModelForSequenceClassification class. For more specifics about each individual training component, see the original notebook. Training uses the same tokenizer that encoded the dataset in the preceding preprocessing step.
The main difference when using Ray Train is that you define the training logic as a function (train_func). You pass this training function to the {class}`~ray.train.torch.TorchTrainer`, which runs it on every Ray worker. The training then proceeds using PyTorch DDP.
Be sure to initialize the model, metric, and tokenizer within the function. Otherwise, you may encounter serialization errors.
import torch
import numpy as np
from evaluate import load
from transformers import AutoModelForSequenceClassification, TrainingArguments, Trainer
import ray.train
from ray.train.huggingface.transformers import prepare_trainer, RayTrainReportCallback
num_labels = 3 if task.startswith("mnli") else 1 if task == "stsb" else 2
metric_name = (
    "pearson"
    if task == "stsb"
    else "matthews_correlation"
    if task == "cola"
    else "accuracy"
)

model_name = model_checkpoint.split("/")[-1]

validation_key = (
    "validation_mismatched"
    if task == "mnli-mm"
    else "validation_matched"
    if task == "mnli"
    else "validation"
)
name = f"{model_name}-finetuned-{task}"
# Calculate the maximum steps per epoch based on the number of rows in the training dataset.
# Make sure to scale by the total number of training workers and the per device batch size.
max_steps_per_epoch = ray_datasets["train"].count() // (batch_size * num_workers)
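As a worked example of this calculation, using a hypothetical row count (the real count comes from the training dataset):

```python
# Hypothetical number of training rows, for illustration only
num_rows = 8_551
batch_size, num_workers = 16, 1

# Each optimizer step consumes batch_size rows on each of num_workers workers
max_steps_per_epoch = num_rows // (batch_size * num_workers)
print(max_steps_per_epoch)  # 534
```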
def train_func(config):
    print(f"Is CUDA available: {torch.cuda.is_available()}")

    metric = load("glue", actual_task)
    tokenizer = AutoTokenizer.from_pretrained(model_checkpoint, use_fast=True)
    model = AutoModelForSequenceClassification.from_pretrained(
        model_checkpoint, num_labels=num_labels
    )

    train_ds = ray.train.get_dataset_shard("train")
    eval_ds = ray.train.get_dataset_shard("eval")

    train_ds_iterable = train_ds.iter_torch_batches(
        batch_size=batch_size, collate_fn=collate_fn
    )
    eval_ds_iterable = eval_ds.iter_torch_batches(
        batch_size=batch_size, collate_fn=collate_fn
    )

    print("max_steps_per_epoch: ", max_steps_per_epoch)

    args = TrainingArguments(
        name,
        evaluation_strategy="epoch",
        save_strategy="epoch",
        logging_strategy="epoch",
        per_device_train_batch_size=batch_size,
        per_device_eval_batch_size=batch_size,
        learning_rate=config.get("learning_rate", 2e-5),
        num_train_epochs=config.get("epochs", 2),
        weight_decay=config.get("weight_decay", 0.01),
        push_to_hub=False,
        max_steps=max_steps_per_epoch * config.get("epochs", 2),
        disable_tqdm=True,  # declutter the output a little
        no_cuda=not use_gpu,  # you need to explicitly set no_cuda if you want CPUs
        report_to="none",
    )

    def compute_metrics(eval_pred):
        predictions, labels = eval_pred
        if task != "stsb":
            predictions = np.argmax(predictions, axis=1)
        else:
            predictions = predictions[:, 0]
        return metric.compute(predictions=predictions, references=labels)

    trainer = Trainer(
        model,
        args,
        train_dataset=train_ds_iterable,
        eval_dataset=eval_ds_iterable,
        tokenizer=tokenizer,
        compute_metrics=compute_metrics,
    )

    trainer.add_callback(RayTrainReportCallback())

    trainer = prepare_trainer(trainer)

    print("Starting training")
    trainer.train()
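As a toy illustration of what compute_metrics does for the classification tasks (hypothetical logits; the real scoring comes from the metric loaded with evaluate.load):

```python
import numpy as np

# Hypothetical model outputs for a batch of three examples with two classes
logits = np.array([[0.2, 1.5], [2.0, -1.0], [0.1, 0.3]])
labels = np.array([1, 0, 0])

# For every task except stsb, predictions are the argmax over class logits
predictions = np.argmax(logits, axis=1)
accuracy = float((predictions == labels).mean())

print(predictions)  # [1 0 1]: the last example is misclassified
```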
With your train_func complete, you can now instantiate the {class}`~ray.train.torch.TorchTrainer`. Aside from the training function, set the scaling_config, which controls the number of workers and the resources they use, and the datasets to use for training and evaluation.
from ray.train.torch import TorchTrainer
from ray.train import RunConfig, ScalingConfig, CheckpointConfig
trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
    datasets={
        "train": ray_datasets["train"],
        "eval": ray_datasets["validation"],
    },
    run_config=RunConfig(
        checkpoint_config=CheckpointConfig(
            num_to_keep=1,
            checkpoint_score_attribute="eval_loss",
            checkpoint_score_order="min",
        ),
    ),
)
Finally, call the fit method to start training with Ray Train. Save the Result object to a variable so you can access metrics and checkpoints.
result = trainer.fit()
You can use the returned Result object to access metrics and the Ray Train Checkpoint associated with the last iteration.
result
- `Ray Train Examples <../../examples>` for more use cases
- `Ray Train User Guides <train-user-guides>` for how-to guides