
Distributed Ingestion Pipeline with Ray

docs/examples/ingestion/ray_ingestion_pipeline.ipynb


<a href="https://colab.research.google.com/github/run-llama/llama_index/blob/main/docs/examples/ingestion/parallel_execution_ingestion_pipeline.ipynb" target="_parent">Open In Colab</a>


In this notebook, we demonstrate how to execute ingestion pipelines in a distributed fashion using Ray.

```python
%pip install llama-index-ingestion-ray llama-index-embeddings-huggingface
```

Start a new Ray cluster, or connect to an existing one. See https://docs.ray.io/en/latest/ray-core/configure.html for details about Ray cluster configuration.

```python
import ray

# Starts a local Ray cluster; pass address="auto" instead to connect
# to an already-running cluster.
ray.init()
```

Load data

```python
from llama_index.core import SimpleDirectoryReader

documents = SimpleDirectoryReader(input_dir="./data/source_files").load_data()
```
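If `./data/source_files` does not exist yet, you can create a couple of sample text files to try the pipeline end to end. The file names and contents below are placeholders, not part of the original example:

```python
from pathlib import Path

# Create a tiny sample corpus (placeholder content) so the reader has input.
data_dir = Path("./data/source_files")
data_dir.mkdir(parents=True, exist_ok=True)

samples = {
    "ray.txt": "Ray is a framework for scaling Python workloads across a cluster.",
    "llamaindex.txt": "LlamaIndex provides ingestion pipelines for indexing documents.",
}
for name, text in samples.items():
    (data_dir / name).write_text(text)

print(sorted(p.name for p in data_dir.glob("*.txt")))
```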

Define the RayIngestionPipeline

First, we define our transformations. Each TransformComponent object is wrapped in a RayTransformComponent, which encapsulates its logic inside stateful Ray Actors; all transformations are executed with Ray Data. For details on configuring hardware requirements and actor pool strategies, see the ray.data.Dataset.map_batches documentation.

```python
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.ingestion.ray import RayTransformComponent

transformations = [
    RayTransformComponent(
        transform_class=SentenceSplitter,
        chunk_size=1024,
        chunk_overlap=20,
        map_batches_kwargs={
            "batch_size": 100,  # batch size per map_batches call
            "num_cpus": 1,  # request 1 CPU per actor
            "compute": ray.data.ActorPoolStrategy(
                size=20
            ),  # fixed pool of 20 actors
        },
    ),
    RayTransformComponent(
        transform_class=HuggingFaceEmbedding,
        model_name="BAAI/bge-small-en-v1.5",
        map_batches_kwargs={
            "batch_size": 100,
            # Fractional GPU usage:
            # "num_gpus": 0.25 tells Ray that one actor needs 25% of a GPU.
            # With 1 physical GPU, Ray autoscales to 4 actors;
            # with 4 physical GPUs, to 16 actors.
            "num_gpus": 0.25,
        },
    ),
]
```
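To make the fractional-GPU sizing concrete: the number of embedding actors Ray can schedule is the total GPU capacity divided by the per-actor request. This is a back-of-the-envelope sketch of that arithmetic, not a Ray API:

```python
import math

def max_actors(total_gpus: float, gpus_per_actor: float) -> int:
    # Each actor reserves a fraction of a GPU; actors fit until capacity runs out.
    return math.floor(total_gpus / gpus_per_actor)

print(max_actors(1, 0.25))  # 1 physical GPU  -> 4 actors
print(max_actors(4, 0.25))  # 4 physical GPUs -> 16 actors
```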

Then, we create the ingestion pipeline.

```python
from llama_index.ingestion.ray import RayIngestionPipeline

pipeline = RayIngestionPipeline(transformations=transformations)
```

Run the Pipeline

Finally, we can run the pipeline on our Ray cluster.

```python
nodes = pipeline.run(documents=documents)
```