docs/examples/rag_opensearch.ipynb
| Step | Tech | Execution |
|---|---|---|
| Embedding | HuggingFace (IBM Granite Embedding 30M) | 💻 Local |
| Vector store | OpenSearch 3.0.0 | 💻 Local |
| Gen AI | Ollama (IBM Granite 4.0 Tiny) | 💻 Local |
This is a code recipe that uses OpenSearch, an open-source search and analytics tool, and the LlamaIndex framework to perform RAG over documents parsed by Docling.
In this notebook, we accomplish the following:
To run this notebook on your machine, you can use an application such as Jupyter Notebook or Visual Studio Code.
💡 For best results, please use GPU acceleration to run this notebook.
To avoid conflicts in your environment, we recommend installing the dependencies in a virtual environment (venv). For instance, uv is a popular tool to manage virtual environments and dependencies. You can install it with:
curl -LsSf https://astral.sh/uv/install.sh | sh
Then create the virtual environment and activate it:
uv venv
source .venv/bin/activate
Refer to Installing uv for more details.
To start, install the required dependencies by running the following command:
import os
os.environ["TOKENIZERS_PARALLELISM"] = "false"
! uv pip install -q --no-progress notebook ipywidgets docling llama-index-readers-file llama-index-readers-docling llama-index-readers-elasticsearch llama-index-node-parser-docling llama-index-vector-stores-opensearch llama-index-embeddings-huggingface llama-index-llms-ollama
We now import all the necessary modules for this notebook:
import logging
from pathlib import Path
from tempfile import mkdtemp
import requests
import torch
from docling_core.transforms.chunker import HierarchicalChunker
from docling_core.transforms.chunker.hierarchical_chunker import (
ChunkingDocSerializer,
ChunkingSerializerProvider,
)
from docling_core.transforms.chunker.tokenizer.huggingface import HuggingFaceTokenizer
from docling_core.transforms.serializer.markdown import MarkdownTableSerializer
from llama_index.core import SimpleDirectoryReader, StorageContext, VectorStoreIndex
from llama_index.core.data_structs import Node
from llama_index.core.response_synthesizers import get_response_synthesizer
from llama_index.core.schema import NodeWithScore, TransformComponent
from llama_index.core.vector_stores import MetadataFilter, MetadataFilters
from llama_index.core.vector_stores.types import VectorStoreQueryMode
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.llms.ollama import Ollama
from llama_index.node_parser.docling import DoclingNodeParser
from llama_index.readers.docling import DoclingReader
from llama_index.readers.elasticsearch import ElasticsearchReader
from llama_index.vector_stores.opensearch import (
OpensearchVectorClient,
OpensearchVectorStore,
)
from rich.console import Console
from rich.pretty import pprint
from transformers import AutoTokenizer
from docling.chunking import HybridChunker
logging.getLogger().setLevel(logging.WARNING)
Docling can run on commodity hardware, so this notebook can be run on a local machine with GPU acceleration. If you're using a MacBook with an Apple silicon chip, Docling integrates with Metal Performance Shaders (MPS). MPS provides out-of-the-box GPU acceleration on macOS, integrates with PyTorch and TensorFlow, and offers energy-efficient performance on Apple silicon and broad compatibility with all Metal-supported GPUs.
The code below checks whether a GPU is available, either via CUDA or MPS, and raises an error otherwise.
# Check if GPU or MPS is available
if torch.cuda.is_available():
    device = torch.device("cuda")
    print(f"CUDA GPU is enabled: {torch.cuda.get_device_name(0)}")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
    print("MPS GPU is enabled.")
else:
    raise OSError(
        "No GPU or MPS device found. Please check your environment and ensure GPU or MPS support is configured."
    )
To run the notebook locally, we can pull an OpenSearch image and run a single node for local development. You can use a container tool like Podman or Docker. In the interest of simplicity, we disable the security plugin (and therefore SSL) for this example.
💡 The version of the OpenSearch instance needs to be compatible with the version of the OpenSearch Python Client library, since this library is used by the LlamaIndex framework, which we leverage in this notebook.
In a terminal on your computer, run:
podman run \
-it \
--pull always \
-p 9200:9200 \
-p 9600:9600 \
-e "discovery.type=single-node" \
-e DISABLE_INSTALL_DEMO_CONFIG=true \
-e DISABLE_SECURITY_PLUGIN=true \
--name opensearch-node \
-d opensearchproject/opensearch:3.0.0
Once the instance is running, verify that you can connect to OpenSearch:
response = requests.get("http://localhost:9200")
print(response.text)
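Because the OpenSearch version has to match the Python client, it can help to read the version from the response rather than scan the raw text. The following is a minimal sketch, assuming the root endpoint returns a JSON object with a `version` entry that holds `number` and `distribution` fields. The helper `describe_cluster` and the sample payload are hypothetical and only illustrate the parsing.

```python
import json


def describe_cluster(payload: str) -> tuple[str, tuple[int, ...]]:
    """Extract the distribution name and numeric version from a root-endpoint payload."""
    info = json.loads(payload)
    version = info["version"]
    # Drop any qualifier such as "-SNAPSHOT" before parsing the numeric parts
    core = version["number"].split("-")[0]
    number = tuple(int(part) for part in core.split("."))
    return version.get("distribution", "unknown"), number


# Hypothetical payload, shaped like the JSON assumed above
sample_payload = json.dumps(
    {"name": "opensearch-node", "version": {"distribution": "opensearch", "number": "3.0.0"}}
)
distribution, number = describe_cluster(sample_payload)
print(f"{distribution} {'.'.join(map(str, number))}")
```

In the notebook, you would pass `response.text` instead of the sample payload.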
We will use HuggingFace and Ollama to run language models on your local computer, rather than relying on cloud services.
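In this example, the following models are considered:
- Embedding: IBM Granite Embedding 30M (ibm-granite/granite-embedding-30m-english), run locally via HuggingFace
- Generation: IBM Granite 4.0 Tiny (granite4:tiny-h), run locally via Ollama
Once Ollama is installed on your computer, you can pull the generation model from your terminal: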
ollama pull granite4:tiny-h
We set up the main variables for OpenSearch and the embedding and generation models.
# http endpoint for your cluster
OPENSEARCH_ENDPOINT = "http://localhost:9200"
# index to store the Docling document vectors
OPENSEARCH_INDEX = "docling-index"
# the embedding model
EMBED_MODEL = HuggingFaceEmbedding(
model_name="ibm-granite/granite-embedding-30m-english"
)
# maximum chunk size in tokens
EMBED_MAX_TOKENS = 200
# the generation model
GEN_MODEL = Ollama(
model="granite4:tiny-h",
request_timeout=120.0,
# Manually set the context window to limit memory usage
context_window=8000,
# Set temperature to 0 for reproducibility of the results
temperature=0.0,
)
# a sample document
SOURCE = "https://arxiv.org/pdf/2408.09869"
embed_dim = len(EMBED_MODEL.get_text_embedding("hi"))
print(f"The embedding dimension is {embed_dim}.")
Docling can parse various document formats into a unified representation (DoclingDocument), which can then be exported to different output formats. For a full list of supported input and output formats, please refer to the Supported formats section of Docling's documentation.
In this recipe, we will use a single PDF file, the Docling Technical Report. We will process it using the Hybrid Chunker provided by Docling to generate structured, hierarchical chunks suitable for downstream RAG tasks.
We will convert the original PDF file into a DoclingDocument using a DoclingReader object. We specify the JSON export type to retain the document's hierarchical structure as input for the next step (chunking the document).
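tmp_dir_path = Path(mkdtemp())
# download the sample PDF into a temporary directory
req = requests.get(SOURCE)
# fail early if the download did not succeed
req.raise_for_status()
with open(tmp_dir_path / f"{Path(SOURCE).name}.pdf", "wb") as out_file:
    out_file.write(req.content)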
reader = DoclingReader(export_type=DoclingReader.ExportType.JSON)
dir_reader = SimpleDirectoryReader(
input_dir=tmp_dir_path,
file_extractor={".pdf": reader},
)
# load the PDF files
documents = dir_reader.load_data()
Before the actual ingestion of data, we need to define the data transformations to apply on the DoclingDocument:
- DoclingNodeParser executes the document-based chunking with the hybrid chunker, which leverages the tokenizer of the embedding model to ensure that the resulting chunks fit within the model's input text limit.
- MetadataTransform is a custom transformation to ensure that the generated chunk metadata is suitably formatted for indexing with OpenSearch.

💡 For demonstration purposes, we configure the hybrid chunker to produce chunks capped at 200 tokens. The optimal limit will vary according to the specific requirements of the AI application in question. If this value is omitted, the chunker automatically derives the maximum size from the tokenizer. This safeguard guarantees that each chunk remains within the bounds supported by the underlying embedding model.
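To make the effect of a token cap concrete, here is a minimal sketch of splitting text under a token budget. It is not Docling's hybrid chunker: whitespace-separated words stand in for the embedding model's tokens, the document structure is ignored, and `split_by_token_budget` is a hypothetical helper.

```python
def split_by_token_budget(text: str, max_tokens: int) -> list[str]:
    """Greedily pack whitespace-separated tokens into chunks of at most max_tokens."""
    if max_tokens < 1:
        raise ValueError("max_tokens must be at least 1")
    tokens = text.split()
    return [
        " ".join(tokens[start : start + max_tokens])
        for start in range(0, len(tokens), max_tokens)
    ]


sample = "Docling parses documents into a unified representation for downstream tasks"
chunks = split_by_token_budget(sample, max_tokens=4)
for chunk in chunks:
    # Print the token count next to each chunk to confirm that the cap holds
    print(len(chunk.split()), chunk)
```

A smaller budget yields more, shorter chunks. A real tokenizer usually counts subword units, so its counts will differ from these word counts.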
# create the hybrid chunker
tokenizer = HuggingFaceTokenizer(
tokenizer=AutoTokenizer.from_pretrained(EMBED_MODEL.model_name),
max_tokens=EMBED_MAX_TOKENS,
)
chunker = HybridChunker(tokenizer=tokenizer)
# create a Docling node parser
node_parser = DoclingNodeParser(chunker=chunker)
# create a custom transformation to avoid out-of-range integers
class MetadataTransform(TransformComponent):
    def __call__(self, nodes, **kwargs):
        for node in nodes:
            binary_hash = node.metadata.get("origin", {}).get("binary_hash", None)
            if binary_hash is not None:
                node.metadata["origin"]["binary_hash"] = str(binary_hash)
        return nodes
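The comment above mentions out-of-range integers. A plausible reading, stated here as an assumption, is that the file hash is an unsigned 64-bit number that may not fit into a signed 64-bit integer field of the index. The sketch below illustrates that idea in plain Python with a made-up hash value. Unlike the transform above, the hypothetical helper `stringify_large_ints` converts a value only when it is actually out of range.

```python
# Bounds of a signed 64-bit integer
SIGNED_64_MIN = -(2**63)
SIGNED_64_MAX = 2**63 - 1


def stringify_large_ints(metadata: dict) -> dict:
    """Return a copy in which ints outside the signed 64-bit range become strings."""
    safe = {}
    for key, value in metadata.items():
        if isinstance(value, dict):
            # Recurse into nested metadata such as the "origin" entry
            safe[key] = stringify_large_ints(value)
        elif (
            isinstance(value, int)
            and not isinstance(value, bool)
            and not SIGNED_64_MIN <= value <= SIGNED_64_MAX
        ):
            safe[key] = str(value)
        else:
            safe[key] = value
    return safe


# Hypothetical metadata: the hash is made up, but it is a valid unsigned 64-bit value
meta = {"origin": {"filename": "2408.09869.pdf", "binary_hash": 2**64 - 1}}
safe_meta = stringify_large_ints(meta)
print(safe_meta["origin"]["binary_hash"])
```

Always converting the hash to a string, as the notebook does, is simpler and keeps the field type consistent across documents.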
In this step, we create an OpensearchVectorClient, which encapsulates the logic for a single OpenSearch index with vector search enabled.
We then initialize the index using our sample data (a single PDF file), the Docling node parser, and the OpenSearch client that we just created.
💡 You may get a warning message like:
Token indices sequence length is longer than the specified maximum sequence length for this model
This is a false alarm; see Docling's FAQ page for more background.
# OpensearchVectorClient stores text in this field by default
text_field = "content"
# OpensearchVectorClient stores embeddings in this field by default
embed_field = "embedding"
client = OpensearchVectorClient(
endpoint=OPENSEARCH_ENDPOINT,
index=OPENSEARCH_INDEX,
dim=embed_dim,
engine="faiss",
embedding_field=embed_field,
text_field=text_field,
)
vector_store = OpensearchVectorStore(client)
storage_context = StorageContext.from_defaults(vector_store=vector_store)
index = VectorStoreIndex.from_documents(
documents=documents,
transformations=[node_parser, MetadataTransform()],
storage_context=storage_context,
embed_model=EMBED_MODEL,
)
In this section, we will see how to assemble a RAG system, execute a query, and get a generated response.
We will also describe how to leverage Docling capabilities to improve RAG results.
With LlamaIndex's query engine, we can simply run a RAG system as follows:
console = Console(width=88)
QUERY = "Which are the main AI models in Docling?"
query_engine = index.as_query_engine(llm=GEN_MODEL)
res = query_engine.query(QUERY)
console.print(f"👤: {QUERY}\n🤖: {res.response.strip()}")
Docling can extract the table content and process it for chunking, like other text elements.
In the following example, the response is generated from a retrieved chunk containing a table.
QUERY = "What is the time to solution with the native backend on Intel?"
query_engine = index.as_query_engine(llm=GEN_MODEL)
res = query_engine.query(QUERY)
console.print(f"👤: {QUERY}\n🤖: {res.response.strip()}")
The result above was generated with the table serialized in a triplet format. Language models may perform better on complex tables if the structure is represented in a format that is widely adopted, like markdown.
For this purpose, we can leverage a custom serializer that renders tables in Markdown format:
class MDTableSerializerProvider(ChunkingSerializerProvider):
    def get_serializer(self, doc):
        return ChunkingDocSerializer(
            doc=doc,
            # configuring a different table serializer
            table_serializer=MarkdownTableSerializer(),
        )
# clear the database from the previous chunks
client.clear()
vector_store.clear()
chunker = HybridChunker(
tokenizer=tokenizer,
max_tokens=EMBED_MAX_TOKENS,
serializer_provider=MDTableSerializerProvider(),
)
node_parser = DoclingNodeParser(chunker=chunker)
index = VectorStoreIndex.from_documents(
documents=documents,
transformations=[node_parser, MetadataTransform()],
storage_context=storage_context,
embed_model=EMBED_MODEL,
)
query_engine = index.as_query_engine(llm=GEN_MODEL)
res = query_engine.query(QUERY)
console.print(f"👤: {QUERY}\n🤖: {res.response.strip()}")
Observe that the generated response is now more accurate. Refer to the Advanced chunking & serialization example for more details on serialization strategies.
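To see why the serialization format can matter, the sketch below renders one small table in both styles. It is a simplified imitation rather than Docling's serializers: the functions `to_triplets` and `to_markdown` are hypothetical, and so are the table values.

```python
def to_triplets(header: list[str], rows: list[list[str]]) -> str:
    """Serialize each cell as 'row label, column name = value'."""
    parts = []
    for row in rows:
        row_label, values = row[0], row[1:]
        for column, value in zip(header[1:], values):
            parts.append(f"{row_label}, {column} = {value}")
    return ". ".join(parts)


def to_markdown(header: list[str], rows: list[list[str]]) -> str:
    """Serialize the table as a Markdown table, one line per row."""
    lines = [
        "| " + " | ".join(header) + " |",
        "| " + " | ".join("---" for _ in header) + " |",
    ]
    lines += ["| " + " | ".join(row) + " |" for row in rows]
    return "\n".join(lines)


# Hypothetical table: the labels and timings are made up for illustration
header = ["CPU", "Backend", "TTS"]
rows = [["CPU-A", "native", "10 s"], ["CPU-A", "pypdfium", "4 s"]]

print(to_triplets(header, rows))
print(to_markdown(header, rows))
```

In the triplet output, values that belong to the same row are only linked by the repeated row label, whereas the Markdown output keeps each row on a single line.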
By default, the DoclingNodeParser will keep the hierarchical information of items when creating the chunks.
That information will be stored as metadata in the OpenSearch index. Leveraging the document structure is a powerful
feature of Docling for improving RAG systems, both for retrieval and for answer generation.
For example, we can use chunk metadata with layout information to run queries in a filter context, for high retrieval accuracy.
Using the previous setup, we can see that the most similar chunk corresponds to a paragraph without enough grounding for the question:
def display_nodes(nodes):
    res = []
    for idx, item in enumerate(nodes):
        doc_res = {"k": idx + 1, "score": item.score, "text": item.text, "items": []}
        doc_items = item.metadata["doc_items"]
        for doc in doc_items:
            doc_res["items"].append({"ref": doc["self_ref"], "label": doc["label"]})
        res.append(doc_res)
    pprint(res, max_string=200)
retriever = index.as_retriever(similarity_top_k=1)
QUERY = "How does pypdfium perform?"
nodes = retriever.retrieve(QUERY)
print(QUERY)
display_nodes(nodes)
We may want to restrict the retrieval to only those chunks containing tabular data, expecting to retrieve more quantitative information for our type of question:
filters = MetadataFilters(
filters=[MetadataFilter(key="doc_items.label", value="table")]
)
table_retriever = index.as_retriever(filters=filters, similarity_top_k=1)
nodes = table_retriever.retrieve(QUERY)
print(QUERY)
display_nodes(nodes)
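Conceptually, a metadata filter narrows the candidate set before the similarity ranking picks the top results. The sketch below imitates that behavior in plain Python. The chunks and their scores are hypothetical, as are the helpers `has_label` and `filter_then_rank`. In the notebook, OpenSearch evaluates the filter as part of the query.

```python
def has_label(chunk: dict, label: str) -> bool:
    """Check whether any document item of the chunk carries the given label."""
    return any(
        item.get("label") == label
        for item in chunk["metadata"].get("doc_items", [])
    )


def filter_then_rank(chunks: list[dict], label: str, top_k: int = 1) -> list[dict]:
    """Keep only the chunks with the label, then return the top_k by score."""
    candidates = [chunk for chunk in chunks if has_label(chunk, label)]
    return sorted(candidates, key=lambda chunk: chunk["score"], reverse=True)[:top_k]


# Hypothetical retrieval candidates with made-up similarity scores
candidates = [
    {
        "text": "a paragraph that mentions pypdfium",
        "score": 0.83,
        "metadata": {"doc_items": [{"self_ref": "#/texts/1", "label": "text"}]},
    },
    {
        "text": "a table with runtime measurements",
        "score": 0.71,
        "metadata": {"doc_items": [{"self_ref": "#/tables/0", "label": "table"}]},
    },
    {
        "text": "another table",
        "score": 0.40,
        "metadata": {"doc_items": [{"self_ref": "#/tables/1", "label": "table"}]},
    },
]

top_tables = filter_then_rank(candidates, label="table")
print(top_tables[0]["text"])
```

Without the filter, the paragraph would rank first. With it, the best-scoring table chunk is returned instead.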
Hybrid search combines keyword and semantic search to improve search relevance. Reciprocal rank fusion (RRF) merges the two result lists based on the rank of each hit rather than its raw score, which avoids relying on traditional score normalization techniques and can significantly improve the relevance of the retrieved chunks in our RAG system.
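As an illustration of the idea behind RRF, the sketch below fuses two ranked lists by summing 1 / (rank_constant + rank) for every list in which a chunk appears. It is a plain-Python approximation, not the OpenSearch implementation. The chunk identifiers are hypothetical, and the rank constant of 60 is a commonly used value that is assumed here.

```python
from collections import defaultdict


def reciprocal_rank_fusion(
    rankings: list[list[str]], rank_constant: int = 60
) -> list[tuple[str, float]]:
    """Fuse ranked lists of ids into one list sorted by descending RRF score."""
    scores: dict[str, float] = defaultdict(float)
    for ranking in rankings:
        # Ranks start at 1, so the best hit of a list contributes 1 / (rank_constant + 1)
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (rank_constant + rank)
    return sorted(scores.items(), key=lambda pair: pair[1], reverse=True)


# Hypothetical results of a keyword search and a vector search for the same query
keyword_hits = ["chunk-7", "chunk-2", "chunk-9"]
vector_hits = ["chunk-2", "chunk-5", "chunk-7"]

fused = reciprocal_rank_fusion([keyword_hits, vector_hits])
for doc_id, score in fused:
    print(f"{doc_id}: {score:.5f}")
```

Chunks that appear near the top of both lists accumulate the highest score, regardless of how the underlying keyword and vector scores are scaled.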
First, create a search pipeline and specify RRF as the combination technique:
url = f"{OPENSEARCH_ENDPOINT}/_search/pipeline/rrf-pipeline"
headers = {"Content-Type": "application/json"}
body = {
"description": "Post processor for hybrid RRF search",
"phase_results_processors": [
{"score-ranker-processor": {"combination": {"technique": "rrf"}}}
],
}
response = requests.put(url, json=body, headers=headers)
print(response.text)
We can then repeat the previous steps to get a VectorStoreIndex object, leveraging the search pipeline that we just created:
client_rrf = OpensearchVectorClient(
endpoint=OPENSEARCH_ENDPOINT,
index=f"{OPENSEARCH_INDEX}-rrf",
dim=embed_dim,
engine="faiss",
embedding_field=embed_field,
text_field=text_field,
search_pipeline="rrf-pipeline",
)
vector_store_rrf = OpensearchVectorStore(client_rrf)
storage_context_rrf = StorageContext.from_defaults(vector_store=vector_store_rrf)
index_hybrid = VectorStoreIndex.from_documents(
documents=documents,
transformations=[node_parser, MetadataTransform()],
storage_context=storage_context_rrf,
embed_model=EMBED_MODEL,
)
The first retriever, which relies entirely on semantic (vector) search, fails to rank the supporting chunk for the given question in the top position. Note that we highlight a few expected keywords for illustration purposes.
QUERY = "Does Docling project provide a Dockerfile?"
retriever = index.as_retriever(similarity_top_k=3)
nodes = retriever.retrieve(QUERY)
exp = "Docling also provides a Dockerfile"
start = "[bold yellow]"
end = "[/]"
for idx, item in enumerate(nodes):
    console.print(
        f"*** k={idx + 1} ***\n{item.text.strip().replace(exp, f'{start}{exp}{end}')}"
    )
However, the retriever with the hybrid search pipeline effectively recognizes the key paragraph in the first position:
retriever_rrf = index_hybrid.as_retriever(
vector_store_query_mode=VectorStoreQueryMode.HYBRID, similarity_top_k=3
)
nodes = retriever_rrf.retrieve(QUERY)
for idx, item in enumerate(nodes):
    console.print(
        f"*** k={idx + 1} ***\n{item.text.strip().replace(exp, f'{start}{exp}{end}')}"
    )
Using small chunks can offer several benefits: it increases retrieval precision and it keeps the answer generation tightly focused, which improves accuracy, reduces hallucination, and speeds up inference. However, your RAG system may overlook contextual information necessary for producing a fully grounded response.
Docling's preservation of document structure enables you to employ various strategies for enriching the context available during answer generation within the RAG pipeline. For example, after identifying the most relevant chunk, you might include adjacent chunks from the same section as additional grounding material before generating the final answer.
In the following example, the generated response is wrong, since the top retrieved chunks do not contain all the information that is required to answer the question.
QUERY = "According to the tests with arXiv and IBM Redbooks, which backend should I use if I have limited resources and complex tables?"
query_rrf = index_hybrid.as_query_engine(
vector_store_query_mode=VectorStoreQueryMode.HYBRID,
llm=GEN_MODEL,
similarity_top_k=3,
)
res = query_rrf.query(QUERY)
console.print(f"👤: {QUERY}\n🤖: {res.response.strip()}")
nodes = retriever_rrf.retrieve(QUERY)
for idx, item in enumerate(nodes):
    console.print(
        f"*** k={idx + 1} ***\n{item.text.strip().replace(exp, f'{start}{exp}{end}')}"
    )
Even though the top retrieved chunks are relevant to the question, the key information lies in the paragraph after the first chunk:
If you need to run Docling in very low-resource environments, please consider configuring the pypdfium backend. While it is faster and more memory efficient than the default docling-parse backend, it will come at the expense of worse quality results, especially in table structure recovery.
We next examine the fragments that immediately precede and follow the top‑retrieved chunk, so long as those neighbors remain within the same section, to preserve the semantic integrity of the context. The generated answer is now accurate because it has been grounded in the necessary contextual information.
💡 In a production setting, it may be preferable to persist the parsed documents (i.e., DoclingDocument objects) as JSON in an object store or database and then fetch them when you need to traverse the document for context‑expansion scenarios. In this simplified example, however, we will query the OpenSearch index directly to obtain the required chunks.
top_headings = nodes[0].metadata["headings"]
top_text = nodes[0].text
rdr = ElasticsearchReader(endpoint=OPENSEARCH_ENDPOINT, index=OPENSEARCH_INDEX)
docs = rdr.load_data(
field=text_field,
query={
"query": {
"terms_set": {
"metadata.headings.keyword": {
"terms": top_headings,
"minimum_should_match_script": {"source": "params.num_terms"},
}
}
}
},
)
ext_nodes = []
for idx, item in enumerate(docs):
    if item.text == top_text:
        ext_nodes.append(NodeWithScore(node=Node(text=item.text), score=1.0))
        if idx > 0:
            ext_nodes.append(
                NodeWithScore(node=Node(text=docs[idx - 1].text), score=1.0)
            )
        if idx < len(docs) - 1:
            ext_nodes.append(
                NodeWithScore(node=Node(text=docs[idx + 1].text), score=1.0)
            )
        break
synthesizer = get_response_synthesizer(llm=GEN_MODEL)
res = synthesizer.synthesize(query=QUERY, nodes=ext_nodes)
console.print(f"👤: {QUERY}\n🤖: {res.response.strip()}")
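The expansion step above can be sketched independently of OpenSearch. The example below is a minimal variant under stated assumptions: the chunks are available as an ordered list of hypothetical dictionaries with `headings` and `text` keys, and `expand_with_neighbors` is a made-up helper. Unlike the code above, it returns the chunks in document order and checks the section of each neighbor explicitly.

```python
def expand_with_neighbors(chunks: list[dict], top_index: int) -> list[dict]:
    """Return the top chunk plus its direct neighbors that share the same headings."""
    top = chunks[top_index]
    expanded = []
    for idx in (top_index - 1, top_index, top_index + 1):
        # Skip positions outside the document and neighbors from other sections
        if 0 <= idx < len(chunks) and chunks[idx]["headings"] == top["headings"]:
            expanded.append(chunks[idx])
    return expanded


# Hypothetical chunks in document order
ordered_chunks = [
    {"headings": ["Introduction"], "text": "intro paragraph"},
    {"headings": ["Usage"], "text": "first usage paragraph"},
    {"headings": ["Usage"], "text": "second usage paragraph"},
    {"headings": ["Usage"], "text": "third usage paragraph"},
    {"headings": ["Results"], "text": "results paragraph"},
]

context = expand_with_neighbors(ordered_chunks, top_index=1)
print([chunk["text"] for chunk in context])
```

Restricting the neighbors to the same section keeps unrelated text, such as the end of the previous section, out of the prompt.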