Building an Advanced Fusion Retriever from Scratch
Open this notebook in Colab: https://colab.research.google.com/github/run-llama/llama_index/blob/main/docs/examples/low_level/fusion_retriever.ipynb
In this tutorial, we show you how to build an advanced retriever from scratch.
Specifically, we show you how to build our QueryFusionRetriever from scratch.
This is heavily inspired by the RAG-fusion repo here: https://github.com/Raudaschl/rag-fusion.
We load documents and build a simple vector index.
%pip install llama-index-readers-file pymupdf
%pip install llama-index-llms-openai
%pip install llama-index-retrievers-bm25
import nest_asyncio
nest_asyncio.apply()
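nest_asyncio patches the notebook's already-running event loop so that the async retrieval calls below (including the asyncio.run call inside our custom retriever) work inside Jupyter.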
!mkdir -p data
!wget --user-agent "Mozilla" "https://arxiv.org/pdf/2307.09288.pdf" -O "data/llama2.pdf"
If you're opening this notebook on Colab, you will probably need to install LlamaIndex 🦙.
!pip install llama-index
from pathlib import Path
from llama_index.readers.file import PyMuPDFReader
loader = PyMuPDFReader()
documents = loader.load(file_path="./data/llama2.pdf")
import os
os.environ["OPENAI_API_KEY"] = "sk-..."
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.openai import OpenAIEmbedding
llm = OpenAI(model="gpt-3.5-turbo", temperature=0.1)
embed_model = OpenAIEmbedding(
model="text-embedding-3-small", embed_batch_size=256
)
from llama_index.core import VectorStoreIndex
from llama_index.core.node_parser import SentenceSplitter
splitter = SentenceSplitter(chunk_size=1024)
index = VectorStoreIndex.from_documents(
documents, transformations=[splitter], embed_model=embed_model
)
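As a quick, optional sanity check (not part of the original flow), you can run a single retrieval against the index before adding any fusion logic; the query string here is just an example:
nodes = index.as_retriever(similarity_top_k=2).retrieve("What is Llama 2?")
print(len(nodes))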
We define an advanced retriever that performs the following steps:
1. Query generation/rewriting: generate multiple queries given the original user query.
2. Perform retrieval for each query over an ensemble of retrievers.
3. Reranking/fusion: fuse results from all queries, and apply a reranking step to "fuse" the top relevant results.
Then in the next section we'll plug this into our response synthesis module.
The first step is to generate queries from the original query to better match the query intent and to increase the precision/recall of the retrieved results. For instance, we might rewrite the query into several smaller, more focused queries.
We can do this by prompting ChatGPT.
from llama_index.core import PromptTemplate
query_str = "How do the models developed in this work compare to open-source chat models based on the benchmarks tested?"
query_gen_prompt_str = (
"You are a helpful assistant that generates multiple search queries based on a "
"single input query. Generate {num_queries} search queries, one on each line, "
"related to the following input query:\n"
"Query: {query}\n"
"Queries:\n"
)
query_gen_prompt = PromptTemplate(query_gen_prompt_str)
def generate_queries(llm, query_str: str, num_queries: int = 4):
    # Ask the LLM for num_queries - 1 new queries; the original query counts as one.
    fmt_prompt = query_gen_prompt.format(
        num_queries=num_queries - 1, query=query_str
    )
    response = llm.complete(fmt_prompt)
    # One query per line; drop any blank lines the LLM may emit.
    queries = [q.strip() for q in response.text.split("\n") if q.strip()]
    return queries
queries = generate_queries(llm, query_str, num_queries=4)
print(queries)
Now we run retrieval for each query. This means that we fetch the top-k most relevant results from each retriever.
NOTE: We can also have multiple retrievers. The total number of retrieval calls is then N*M, where N is the number of retrievers and M is the number of generated queries, so there will also be N*M retrieved lists. For example, with N = 2 retrievers and M = 4 queries, we issue 8 retrieval calls and get back 8 ranked lists.
Here we'll use the retriever provided by our vector store, plus a BM25 retriever. If you want to see how to build a retriever from scratch, please see our tutorial on this.
from tqdm.asyncio import tqdm
async def run_queries(queries, retrievers):
    """Run each query against each retriever concurrently."""
    tasks = []
    for query in queries:
        for retriever in retrievers:
            tasks.append(retriever.aretrieve(query))
    task_results = await tqdm.gather(*tasks)
    # Tasks were created in (query, retriever) order, so key the results the
    # same way; zipping over queries alone would silently drop half the lists.
    results_dict = {}
    task_idx = 0
    for query in queries:
        for i, retriever in enumerate(retrievers):
            results_dict[(query, i)] = task_results[task_idx]
            task_idx += 1
    return results_dict
# get retrievers
from llama_index.retrievers.bm25 import BM25Retriever
## vector retriever
vector_retriever = index.as_retriever(similarity_top_k=2)
## bm25 retriever
bm25_retriever = BM25Retriever.from_defaults(
docstore=index.docstore, similarity_top_k=2
)
results_dict = await run_queries(queries, [vector_retriever, bm25_retriever])
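Note that the top-level await above works in a notebook; in a plain Python script you'd wrap the call instead, e.g.:
import asyncio
results_dict = asyncio.run(run_queries(queries, [vector_retriever, bm25_retriever]))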
The next step here is to perform fusion: combining the results from several retrievers into one and re-ranking.
Note that a given node might be retrieved multiple times from different retrievers, so there needs to be a way to de-dup and rerank the node given the multiple retrievals.
We'll show you how to perform "reciprocal rank fusion": for each node, sum its reciprocal rank across every list in which it's retrieved. Then reorder the nodes from highest fused score to lowest.
Full paper here: https://plg.uwaterloo.ca/~gvcormac/cormacksigir09-rrf.pdf
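To make the formula concrete: a node's fused score is the sum of 1/(rank + k) over every retrieved list it appears in (ranks are 0-based here, matching the enumerate call below). A quick illustrative calculation, not from the notebook:
k = 60.0
# A node ranked 0th in one list and 1st in another:
fused_score = 1.0 / (0 + k) + 1.0 / (1 + k)
print(fused_score)  # ~0.0331, vs. ~0.0167 for a node found in only one list
So nodes retrieved consistently across query/retriever combinations float to the top, even if no single list ranks them first.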
from typing import List
from llama_index.core.schema import NodeWithScore
def fuse_results(results_dict, similarity_top_k: int = 2):
"""Fuse results."""
k = 60.0 # `k` is a parameter used to control the impact of outlier rankings.
fused_scores = {}
text_to_node = {}
# compute reciprocal rank scores
for nodes_with_scores in results_dict.values():
for rank, node_with_score in enumerate(
sorted(
nodes_with_scores, key=lambda x: x.score or 0.0, reverse=True
)
):
text = node_with_score.node.get_content()
text_to_node[text] = node_with_score
if text not in fused_scores:
fused_scores[text] = 0.0
fused_scores[text] += 1.0 / (rank + k)
# sort results
reranked_results = dict(
sorted(fused_scores.items(), key=lambda x: x[1], reverse=True)
)
# adjust node scores
reranked_nodes: List[NodeWithScore] = []
for text, score in reranked_results.items():
reranked_nodes.append(text_to_node[text])
reranked_nodes[-1].score = score
return reranked_nodes[:similarity_top_k]
final_results = fuse_results(results_dict)
for n in final_results:
print(n.score, "\n", n.text, "\n********\n")
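Note that the printed scores are RRF scores, not raw similarities: each is a sum of 1/(rank + 60) terms over the lists where the node appears, so small values well below 1.0 are expected.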
Analysis: The above code has a few straightforward components:
1. Go through each node in each retrieved list, and add its reciprocal rank to the node's fused score. The node is keyed by its text content for de-dup purposes.
2. Sort the fused results from highest score to lowest.
3. Write the fused score back onto each node and return the top similarity_top_k nodes.
Now we're ready to define this as a custom retriever, and plug it into our RetrieverQueryEngine (which does retrieval and synthesis).
from typing import List
from llama_index.core import QueryBundle
from llama_index.core.retrievers import BaseRetriever
from llama_index.core.schema import NodeWithScore
import asyncio
class FusionRetriever(BaseRetriever):
"""Ensemble retriever with fusion."""
def __init__(
self,
llm,
retrievers: List[BaseRetriever],
similarity_top_k: int = 2,
) -> None:
"""Init params."""
self._retrievers = retrievers
self._similarity_top_k = similarity_top_k
self._llm = llm
super().__init__()
def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
"""Retrieve."""
queries = generate_queries(
self._llm, query_bundle.query_str, num_queries=4
)
results = asyncio.run(run_queries(queries, self._retrievers))
final_results = fuse_results(
results, similarity_top_k=self._similarity_top_k
)
return final_results
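One design note: _retrieve blocks on asyncio.run, which is why we applied nest_asyncio at the top of the notebook. If you want to call the retriever from async code directly, BaseRetriever also lets you override _aretrieve; a minimal sketch, not part of the original notebook:
    async def _aretrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        """Async retrieve: same pipeline as _retrieve, awaiting retrieval directly."""
        queries = generate_queries(
            self._llm, query_bundle.query_str, num_queries=4
        )
        results = await run_queries(queries, self._retrievers)
        return fuse_results(results, similarity_top_k=self._similarity_top_k)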
from llama_index.core.query_engine import RetrieverQueryEngine
fusion_retriever = FusionRetriever(
llm, [vector_retriever, bm25_retriever], similarity_top_k=2
)
query_engine = RetrieverQueryEngine(fusion_retriever)
response = query_engine.query(query_str)
print(str(response))
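For comparison, the packaged QueryFusionRetriever mentioned at the start implements this same pattern. A rough equivalent of the retriever we just built, sketched with the built-in class (argument defaults may differ across versions, so treat this as approximate):
from llama_index.core.retrievers import QueryFusionRetriever
fusion_retriever_builtin = QueryFusionRetriever(
    [vector_retriever, bm25_retriever],
    llm=llm,
    similarity_top_k=2,
    num_queries=4,  # includes the original query
    mode="reciprocal_rerank",
)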