
<a href="https://colab.research.google.com/github/run-llama/llama_index/blob/main/docs/examples/ingestion/advanced_ingestion_pipeline.ipynb" target="_parent"></a>

python
%pip install llama-index-vector-stores-weaviate
%pip install llama-index-embeddings-huggingface
python
!pip install llama-index

Advanced Ingestion Pipeline

In this notebook, we implement an IngestionPipeline with the following features:

  • Redis transformation caching
  • Automatic vector database insertion
  • A custom transformation

Redis Cache Setup

All node + transformation combinations will have their outputs cached, which will save time on duplicate runs.

python
from llama_index.core.ingestion.cache import RedisCache
from llama_index.core.ingestion import IngestionCache

ingest_cache = IngestionCache(
    cache=RedisCache.from_host_and_port(host="127.0.0.1", port=6379),
    collection="my_test_cache",
)
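This assumes a Redis server is reachable at 127.0.0.1:6379. If you don't have one running, a quick way to start a local instance is with Docker (assuming Docker is installed):

python
!docker run -d --name redis-cache -p 6379:6379 redis:latest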

Vector DB Setup

For this example, we use Weaviate as the vector store; the URL and API key below come from a hosted Weaviate instance.

python
!pip install weaviate-client
python
import weaviate

# Replace the placeholders with your Weaviate API key and cluster URL
auth_config = weaviate.AuthApiKey(api_key="...")

client = weaviate.Client(url="https://...", auth_client_secret=auth_config)
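If you don't have a hosted Weaviate cluster, the v3 Python client can also launch an embedded instance locally. A minimal sketch, assuming a recent weaviate-client 3.x with embedded support:

python
import weaviate
from weaviate.embedded import EmbeddedOptions

# Starts a local embedded Weaviate process instead of connecting to a remote cluster
client = weaviate.Client(embedded_options=EmbeddedOptions())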
python
from llama_index.vector_stores.weaviate import WeaviateVectorStore

vector_store = WeaviateVectorStore(
    weaviate_client=client, index_name="CachingTest"
)

Transformation Setup

python
from llama_index.core.node_parser import TokenTextSplitter
from llama_index.embeddings.huggingface import HuggingFaceEmbedding

text_splitter = TokenTextSplitter(chunk_size=512)
embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-small-en-v1.5")
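As an optional sanity check, both components can be exercised standalone: split_text chunks a raw string, and get_text_embedding embeds one (the sample text here is made up):

python
# Chunk a long sample string, then embed the first chunk
chunks = text_splitter.split_text(
    "LlamaIndex ingestion pipelines chain transformations together. " * 200
)
print(len(chunks), "chunks")
print(len(embed_model.get_text_embedding(chunks[0])), "embedding dimensions")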

Custom Transformation

python
import re
from llama_index.core.schema import TransformComponent


class TextCleaner(TransformComponent):
    """Removes every character that is not a letter, digit, or space."""

    def __call__(self, nodes, **kwargs):
        for node in nodes:
            node.text = re.sub(r"[^0-9A-Za-z ]", "", node.text)
        return nodes
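A quick sanity check of the cleaner on a standalone node (the sample text is made up):

python
from llama_index.core.schema import TextNode

sample = TextNode(text="Hello, world! (a quick test)")
print(TextCleaner()([sample])[0].text)  # -> Hello world a quick test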

Running the pipeline

python
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.extractors import TitleExtractor

pipeline = IngestionPipeline(
    transformations=[
        TextCleaner(),
        text_splitter,
        embed_model,
        TitleExtractor(),  # uses an LLM (OpenAI by default); requires OPENAI_API_KEY
    ],
    vector_store=vector_store,
    cache=ingest_cache,
)
python
from llama_index.core import SimpleDirectoryReader

documents = SimpleDirectoryReader("../data/paul_graham/").load_data()
python
nodes = pipeline.run(documents=documents)
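A quick check that the run produced embedded nodes (BAAI/bge-small-en-v1.5 emits 384-dimensional vectors):

python
print(len(nodes), "nodes")
print(len(nodes[0].embedding), "embedding dimensions")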

Using our populated vector store

python
import os

# needed for the LLM in the query engine
os.environ["OPENAI_API_KEY"] = "sk-..."
python
from llama_index.core import VectorStoreIndex

index = VectorStoreIndex.from_vector_store(
    vector_store=vector_store,
    embed_model=embed_model,
)
python
query_engine = index.as_query_engine()

print(query_engine.query("What did the author do growing up?"))

Re-run Ingestion to test Caching

The next code block will execute almost instantly due to caching.

python
pipeline = IngestionPipeline(
    transformations=[TextCleaner(), text_splitter, embed_model],
    cache=ingest_cache,  # same cache as before, so prior results are reused
)

nodes = pipeline.run(documents=documents)
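To see the cache hit concretely, wrap the run in a simple timer (standard-library sketch):

python
import time

start = time.perf_counter()
nodes = pipeline.run(documents=documents)
print(f"Cached re-run took {time.perf_counter() - start:.2f}s")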

Clear the cache

Clearing removes every cached entry in the collection, so the next pipeline run recomputes all transformations from scratch.

python
ingest_cache.clear()