docs/examples/ingestion/ray_ingestion_pipeline.ipynb
<a href="https://colab.research.google.com/github/run-llama/llama_index/blob/main/docs/examples/ingestion/ray_ingestion_pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>
In this notebook, we demonstrate how to execute ingestion pipelines using Ray.
%pip install llama-index-ingestion-ray llama-index-embeddings-huggingface
Start a new cluster, or connect to an existing one. See https://docs.ray.io/en/latest/ray-core/configure.html for details about Ray cluster configurations.
import ray
ray.init()
from llama_index.core import SimpleDirectoryReader
documents = SimpleDirectoryReader(input_dir="./data/source_files").load_data()
First, we define our transformations. Each `TransformComponent` is wrapped in a `RayTransformComponent`, which runs the transformation logic inside stateful Ray actors; all of the work is executed through Ray Data. For details on configuring hardware requirements and actor pool strategies, see the `ray.data.Dataset.map_batches` documentation.
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.ingestion.ray import RayTransformComponent
transformations = [
    RayTransformComponent(
        transform_class=SentenceSplitter,
        chunk_size=1024,
        chunk_overlap=20,
        map_batches_kwargs={
            "batch_size": 100,  # Rows per batch handed to each actor
            "num_cpus": 1,  # Request 1 CPU per actor
            "compute": ray.data.ActorPoolStrategy(
                size=20
            ),  # Fixed pool of 20 actors
        },
    ),
    RayTransformComponent(
        transform_class=HuggingFaceEmbedding,
        model_name="BAAI/bge-small-en-v1.5",
        map_batches_kwargs={
            "batch_size": 100,
            # Fractional GPU usage:
            # each actor requests 25% of a GPU.
            # With 1 physical GPU, Ray autoscales to 4 actors;
            # with 4 physical GPUs, to 16 actors.
            "num_gpus": 0.25,
        },
    ),
]
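The fractional-GPU comment above is simple division: the number of actors Ray can schedule is the total GPU capacity divided by the per-actor request. A quick sanity check of that arithmetic (plain Python, no Ray required; `max_actors` is an illustrative helper, not part of any library):

```python
def max_actors(total_gpus: float, gpus_per_actor: float) -> int:
    """How many actors fit, given a fractional per-actor GPU request."""
    return int(total_gpus / gpus_per_actor)


print(max_actors(1, 0.25))  # 1 GPU at 0.25 each -> 4 actors
print(max_actors(4, 0.25))  # 4 GPUs at 0.25 each -> 16 actors
```

Lowering `num_gpus` packs more actors per device but also shares its memory, so the model must fit within the fraction you request.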
Then, we create the ingestion pipeline.
from llama_index.ingestion.ray import RayIngestionPipeline
pipeline = RayIngestionPipeline(transformations=transformations)
Finally, we run the pipeline on our Ray cluster.
nodes = pipeline.run(documents=documents)