doc/source/ray-overview/examples/entity-recognition-with-llms/README.ipynb
This end-to-end tutorial fine-tunes an LLM to perform batch inference and online serving at scale. While named entity recognition (NER) is the main task in this tutorial, you can easily extend these end-to-end workflows to any use case.
Note: The intent of this tutorial is to show how you can use Ray to implement end-to-end LLM workflows that can extend to any use case, including multimodal.
This tutorial uses the Ray library, specifically its LLM APIs, to implement these workflows:
All of these workloads come with the observability views you need to debug and tune them to maximize throughput and minimize latency.
This Anyscale Workspace automatically provisions and autoscales the compute your workloads need. If you're not on Anyscale, you need to provision the appropriate compute (L4 GPUs) for this tutorial.
Start by downloading the dependencies required for this tutorial. Notice that the containerfile uses the base image anyscale/ray-llm:latest-py311-cu124 followed by a list of pip packages. If you're not on Anyscale, you can pull this Docker image yourself and install the dependencies.
%%bash
# Install dependencies
pip install -q \
"xgrammar==0.1.11" \
"pynvml==12.0.0" \
"hf_transfer==0.1.9" \
"tensorboard==2.19.0" \
"llamafactory@git+https://github.com/hiyouga/LLaMA-Factory.git@ac8c6fdd3ab7fb6372f231f238e6b8ba6a17eb16#egg=llamafactory"
import json
import textwrap
from IPython.display import Code, Image, display
Start by downloading the data from cloud storage to local shared storage.
%%bash
rm -rf /mnt/cluster_storage/viggo # clean up
mkdir /mnt/cluster_storage/viggo
wget https://viggo-ds.s3.amazonaws.com/train.jsonl -O /mnt/cluster_storage/viggo/train.jsonl
wget https://viggo-ds.s3.amazonaws.com/val.jsonl -O /mnt/cluster_storage/viggo/val.jsonl
wget https://viggo-ds.s3.amazonaws.com/test.jsonl -O /mnt/cluster_storage/viggo/test.jsonl
wget https://viggo-ds.s3.amazonaws.com/dataset_info.json -O /mnt/cluster_storage/viggo/dataset_info.json
%%bash
head -n 1 /mnt/cluster_storage/viggo/train.jsonl | python3 -m json.tool
with open("/mnt/cluster_storage/viggo/train.jsonl", "r") as fp:
    first_line = fp.readline()
    item = json.loads(first_line)
system_content = item["instruction"]
print(textwrap.fill(system_content, width=80))
You also have an info file that identifies the datasets and formats (Alpaca and ShareGPT) to use for post-training.
display(Code(filename="/mnt/cluster_storage/viggo/dataset_info.json", language="json"))
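If you prefer to inspect it programmatically, here's a quick sketch; the exact keys depend on the contents of your dataset_info.json:
# Peek at the dataset registry that LLaMA-Factory reads.
with open("/mnt/cluster_storage/viggo/dataset_info.json") as fp:
    dataset_info = json.load(fp)
print(list(dataset_info.keys()))  # dataset names registered for post-training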
Use Ray Train + LLaMA-Factory to perform multi-node training. Find the parameters for the training workload, post-training method, dataset location, train/val details, etc. in the lora_sft_ray.yaml config file. See the recipes for even more post-training methods, like SFT, pretraining, PPO, DPO, KTO, etc. on GitHub.
Note: Ray also supports using other tools like axolotl or even Ray Train + HF Accelerate + FSDP/DeepSpeed directly for complete control of your post-training workloads.
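For reference, here's a minimal sketch of what using Ray Train directly looks like. The training-loop body and hyperparameters are placeholders, not part of this tutorial's workload:
# Minimal Ray Train sketch (an alternative to the LLaMA-Factory CLI used below).
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

def train_func(config):
    # Set up your model, dataloaders, and training loop here, for example with
    # HF Accelerate + FSDP/DeepSpeed. Ray Train handles the distributed setup.
    ...

trainer = TorchTrainer(
    train_func,
    train_loop_config={"lr": 1e-4},  # hypothetical hyperparameters
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
)
# result = trainer.fit()  # launches the multi-node run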
import os
from pathlib import Path
import yaml
display(Code(filename="lora_sft_ray.yaml", language="yaml"))
model_id = "ft-model" # call it whatever you want
model_source = yaml.safe_load(open("lora_sft_ray.yaml"))["model_name_or_path"] # HF model ID, S3 mirror config, or GCS mirror config
print(model_source)
Use Ray Train + LLaMA-Factory to perform the multi-node training loop.
<div class="alert alert-block alert"> <b>Ray Train</b>: Using Ray Train has several advantages:
hostfile configs.
RayTurbo Train offers even more improvements to the price-performance ratio, performance monitoring, and more:
%%bash
# Run multi-node distributed fine-tuning workload
USE_RAY=1 llamafactory-cli train lora_sft_ray.yaml
display(Code(filename="/mnt/cluster_storage/viggo/outputs/all_results.json", language="json"))
OSS Ray offers an extensive observability suite with logs and an observability dashboard that you can use to monitor and debug. The dashboard includes many different components, such as:
- memory, utilization, etc., of the tasks running in the cluster
- views to see all running tasks, utilization across instance types, autoscaling, etc.
OSS Ray comes with an extensive observability suite, and Anyscale takes it many steps further to make monitoring and debugging your workloads even easier and faster with:
You can always store data in any storage bucket you like, but Anyscale offers a default storage bucket to make things even easier. You also have plenty of other storage options, shared at the cluster, user, and cloud levels.
%%bash
# Anyscale default storage bucket.
echo $ANYSCALE_ARTIFACT_STORAGE
%%bash
# Save fine-tuning artifacts to cloud storage.
STORAGE_PATH="$ANYSCALE_ARTIFACT_STORAGE/viggo"
LOCAL_OUTPUTS_PATH="/mnt/cluster_storage/viggo/outputs"
LOCAL_SAVES_PATH="/mnt/cluster_storage/viggo/saves"
# AWS S3 operations.
if [[ "$STORAGE_PATH" == s3://* ]]; then
    if aws s3 ls "$STORAGE_PATH" > /dev/null 2>&1; then
        aws s3 rm "$STORAGE_PATH" --recursive --quiet
    fi
    aws s3 cp "$LOCAL_OUTPUTS_PATH" "$STORAGE_PATH/outputs" --recursive --quiet
    aws s3 cp "$LOCAL_SAVES_PATH" "$STORAGE_PATH/saves" --recursive --quiet
# Google Cloud Storage operations.
elif [[ "$STORAGE_PATH" == gs://* ]]; then
    if gsutil ls "$STORAGE_PATH" > /dev/null 2>&1; then
        gsutil -m -q rm -r "$STORAGE_PATH"
    fi
    gsutil -m -q cp -r "$LOCAL_OUTPUTS_PATH" "$STORAGE_PATH/outputs"
    gsutil -m -q cp -r "$LOCAL_SAVES_PATH" "$STORAGE_PATH/saves"
else
    echo "Unsupported storage protocol: $STORAGE_PATH"
    exit 1
fi
%%bash
ls /mnt/cluster_storage/viggo/saves/lora_sft_ray
# LoRA paths.
save_dir = Path("/mnt/cluster_storage/viggo/saves/lora_sft_ray")
trainer_dirs = [d for d in save_dir.iterdir() if d.name.startswith("TorchTrainer_") and d.is_dir()]
latest_trainer = max(trainer_dirs, key=lambda d: d.stat().st_mtime, default=None)
lora_path = f"{latest_trainer}/checkpoint_000000/checkpoint"
cloud_lora_path = os.path.join(os.getenv("ANYSCALE_ARTIFACT_STORAGE"), lora_path.split("/mnt/cluster_storage/")[-1])
dynamic_lora_path, lora_id = cloud_lora_path.rsplit("/", 1)
print(lora_path)
print(cloud_lora_path)
print(dynamic_lora_path)
print(lora_id)
%%bash -s "$lora_path"
ls $1
The ray.data.llm module integrates with key large language model (LLM) inference engines and deployed models to enable LLM batch inference. These LLM modules use Ray Data under the hood, which not only makes it extremely easy to distribute workloads but also ensures that they happen:
RayTurbo Data has more features on top of Ray Data:
Start by defining the vLLM engine processor config, where you can select the model to use and configure the engine behavior. The model can come from the Hugging Face (HF) Hub or a local model path /path/to/your/model. Anyscale supports GPTQ, GGUF, or LoRA model formats.
import os
import ray
from ray.data.llm import vLLMEngineProcessorConfig
config = vLLMEngineProcessorConfig(
    model_source=model_source,
    runtime_env={
        "env_vars": {
            "VLLM_USE_V1": "0",  # v1 doesn't support lora adapters yet
            # "HF_TOKEN": os.environ.get("HF_TOKEN"),
        },
    },
    engine_kwargs={
        "enable_lora": True,
        "max_lora_rank": 8,
        "max_loras": 1,
        "pipeline_parallel_size": 1,
        "tensor_parallel_size": 1,
        "enable_prefix_caching": True,
        "enable_chunked_prefill": True,
        "max_num_batched_tokens": 4096,
        "max_model_len": 4096,  # or increase KV cache size
        # complete list: https://docs.vllm.ai/en/stable/serving/engine_args.html
    },
    concurrency=1,
    batch_size=16,
    accelerator_type="L4",
)
Next, pass the config to an LLM processor where you can define the preprocessing and postprocessing steps around inference. With your base model defined in the processor config, you can define the LoRA adapter layers as part of the preprocessing step of the LLM processor itself.
from ray.data.llm import build_llm_processor
processor = build_llm_processor(
    config,
    preprocess=lambda row: dict(
        model=lora_path,  # REMOVE this line if doing inference with just the base model
        messages=[
            {"role": "system", "content": system_content},
            {"role": "user", "content": row["input"]}
        ],
        sampling_params={
            "temperature": 0.3,
            "max_tokens": 250,
            # complete list: https://docs.vllm.ai/en/stable/api/inference_params.html
        },
    ),
    postprocess=lambda row: {
        **row,  # all contents
        "generated_output": row["generated_text"],
        # add additional outputs
    },
)
# Evaluation on test dataset
ds = ray.data.read_json("/mnt/cluster_storage/viggo/test.jsonl") # complete list: https://docs.ray.io/en/latest/data/api/input_output.html
ds = processor(ds)
results = ds.take_all()
results[0]
# Exact match (strict!)
matches = 0
for item in results:
    if item["output"] == item["generated_output"]:
        matches += 1
matches / float(len(results))
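Exact string match is a strict metric. If you want a rough sense of near-misses, a purely illustrative variant that normalizes case and whitespace might look like this (not part of the tutorial's metric):
# Illustrative only: lenient match that ignores case and extra whitespace.
def normalize(text: str) -> str:
    return " ".join(text.strip().lower().split())

lenient_matches = sum(
    normalize(item["output"]) == normalize(item["generated_output"]) for item in results
)
print(lenient_matches / float(len(results)))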
Note: The objective of fine-tuning here isn't to create the most performant model but to show that you can leverage it for downstream workloads, like batch inference and online serving at scale. However, you can increase num_train_epochs if you want to.
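For example, assuming you want to rerun training with more epochs, you could tweak the config like this; the new filename and epoch count are illustrative:
# Bump the number of epochs in the LLaMA-Factory config before re-running training.
cfg = yaml.safe_load(open("lora_sft_ray.yaml"))
cfg["num_train_epochs"] = 3.0  # illustrative value
with open("lora_sft_ray_3epochs.yaml", "w") as fp:  # hypothetical filename
    yaml.safe_dump(cfg, fp)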
Observe the individual steps in the batch inference workload through the Anyscale Ray Data dashboard:
<div class="alert alert-info">💡 For more advanced guides on topics like optimized model loading, multi-LoRA, OpenAI-compatible endpoints, etc., see more examples and the API reference.
</div>

The ray.serve.llm APIs allow you to deploy multiple LLM models together with a familiar Ray Serve API, while providing compatibility with the OpenAI API.
Ray Serve LLM is designed with the following features:
RayTurbo Serve on Anyscale has more features on top of Ray Serve:
import os
from openai import OpenAI # to use openai api format
from ray import serve
from ray.serve.llm import LLMConfig, build_openai_app
Define an LLM config that specifies where the model comes from, its autoscaling behavior, what hardware to use, and the engine arguments.
Note: If you're using AWS S3, replace the AWS_REGION value in the runtime_env's env_vars below with the region of the cloud storage where you saved your model artifacts. Do the same if you're using other cloud storage options.
# Define config.
llm_config = LLMConfig(
    model_loading_config={
        "model_id": model_id,
        "model_source": model_source
    },
    lora_config={  # REMOVE this section if you're only using a base model.
        "dynamic_lora_loading_path": dynamic_lora_path,
        "max_num_adapters_per_replica": 16,  # You only have 1.
    },
    runtime_env={"env_vars": {"AWS_REGION": "us-west-2"}},
    # runtime_env={"env_vars": {"HF_TOKEN": os.environ.get("HF_TOKEN")}},
    deployment_config={
        "autoscaling_config": {
            "min_replicas": 1,
            "max_replicas": 2,
            # complete list: https://docs.ray.io/en/latest/serve/autoscaling-guide.html#serve-autoscaling
        }
    },
    accelerator_type="L4",
    engine_kwargs={
        "max_model_len": 4096,  # Or increase KV cache size.
        "tensor_parallel_size": 1,
        "enable_lora": True,
        # complete list: https://docs.vllm.ai/en/stable/serving/engine_args.html
    },
)
Now deploy the LLM config as an application. Because this application is built on top of Ray Serve, you can add advanced service logic for composing models together, deploying multiple applications, model multiplexing, observability, and more.
# Deploy.
app = build_openai_app({"llm_configs": [llm_config]})
serve.run(app)
# Initialize client.
client = OpenAI(base_url="http://localhost:8000/v1", api_key="fake-key")
response = client.chat.completions.create(
    model=f"{model_id}:{lora_id}",
    messages=[
        {"role": "system", "content": "Given a target sentence construct the underlying meaning representation of the input sentence as a single function with attributes and attribute values. This function should describe the target string accurately and the function must be one of the following ['inform', 'request', 'give_opinion', 'confirm', 'verify_attribute', 'suggest', 'request_explanation', 'recommend', 'request_attribute']. The attributes must be one of the following: ['name', 'exp_release_date', 'release_year', 'developer', 'esrb', 'rating', 'genres', 'player_perspective', 'has_multiplayer', 'platforms', 'available_on_steam', 'has_linux_release', 'has_mac_release', 'specifier']"},
        {"role": "user", "content": "Blizzard North is mostly an okay developer, but they released Diablo II for the Mac and so that pushes the game from okay to good in my view."},
    ],
    stream=True
)
for chunk in response:
    if chunk.choices[0].delta.content is not None:
        print(chunk.choices[0].delta.content, end="", flush=True)
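To query the base model without the LoRA adapter, you can pass just the model_id. Here's a non-streaming sketch that reuses the same prompt as above:
# Query the base model (no LoRA adapter) by omitting the :lora_id suffix.
base_response = client.chat.completions.create(
    model=model_id,
    messages=[
        {"role": "system", "content": system_content},
        {"role": "user", "content": "Blizzard North is mostly an okay developer, but they released Diablo II for the Mac and so that pushes the game from okay to good in my view."},
    ],
)
print(base_response.choices[0].message.content)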
And of course, you can observe the running service, the deployments, and metrics like QPS, latency, etc., through the Ray Dashboard's Serve view:
<div class="alert alert-info">💡 See more examples and the API reference for advanced guides on topics like structured outputs (such as JSON), vision LMs, multi-LoRA on shared base models, using other inference engines (such as SGLang), fast model loading, etc.
</div>
Seamlessly integrate with your existing CI/CD pipelines by leveraging the Anyscale CLI or SDK to run reliable batch jobs and deploy highly available services. Because you've been developing in an environment that's almost identical to production (a multi-node cluster), this integration should drastically speed up your dev-to-prod velocity.
Anyscale Jobs (API ref) allows you to execute discrete workloads in production such as batch inference, embeddings generation, or model fine-tuning.
Anyscale Services (API ref) offers an extremely fault tolerant, scalable, and optimized way to serve your Ray Serve applications:
num_replicas=auto) and utilize replica compaction to consolidate nodes that are fractionally utilized.
%%bash
# clean up
rm -rf /mnt/cluster_storage/viggo
STORAGE_PATH="$ANYSCALE_ARTIFACT_STORAGE/viggo"
if [[ "$STORAGE_PATH" == s3://* ]]; then
    aws s3 rm "$STORAGE_PATH" --recursive --quiet
elif [[ "$STORAGE_PATH" == gs://* ]]; then
    gsutil -m -q rm -r "$STORAGE_PATH"
fi