# wren-ai-service Code Design
This document aims to dive deep into the implementation details of wren-ai-service. We had two goals in mind while writing it.
If you haven't set up the environment or don't know how to run wren-ai-service locally, please refer to the document here first.
wren-ai-service is basically an AI service that exposes REST API endpoints for access. There are 4 main concepts in wren-ai-service: API endpoints, Services, Pipelines, and Providers.
- **API endpoints**: entry points for users to access several kinds of RAG (retrieval-augmented generation) systems; you can also see API endpoints as encapsulations of Services. For example, when users want to ask a question in order to get SQL back, they call `/ask`, and `AskService` does the computation under the hood.
- **Services**: abstractions of business-logic concepts, such as `AskService` for asking questions and getting SQL results back, or `AskDetailsService` for breaking a SQL statement down into several sub-steps in order to understand the logic behind the original SQL. Every service is composed of a series of pipelines.
- **Pipelines**: where the RAG systems are actually implemented. Not every pipeline has all of the indexing, retrieval, and generation components; it depends on the purpose of the pipeline. Every pipeline also contains some providers, such as an LLM provider, which represents an LLM.
- **Providers**: there are currently 4 kinds of providers: LLM provider, embedder provider, document store provider, and engine.
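To make the relationship between these four concepts concrete, here is a minimal, hypothetical sketch of the layering. All class and method names below are illustrative stand-ins, not the actual ones in the codebase:

```python
from dataclasses import dataclass


class LLMProvider:
    """Provider: wraps an external dependency such as an LLM (illustrative)."""

    def generate(self, prompt: str) -> str:
        raise NotImplementedError


class RetrievalPipeline:
    """Pipeline: retrieves schema documents relevant to a question (illustrative)."""

    def run(self, question: str) -> list[str]:
        return []  # a real pipeline would query a document store here


@dataclass
class GenerationPipeline:
    """Pipeline: generates SQL from a question plus retrieved documents."""

    llm_provider: LLMProvider

    def run(self, question: str, documents: list[str]) -> str:
        prompt = f"Context: {documents}\nQuestion: {question}\nSQL:"
        return self.llm_provider.generate(prompt)


@dataclass
class AskService:
    """Service: business logic composed of a series of pipelines."""

    retrieval: RetrievalPipeline
    generation: GenerationPipeline

    def ask(self, question: str) -> str:
        documents = self.retrieval.run(question)
        return self.generation.run(question, documents)


# An API endpoint is then a thin wrapper, e.g. a FastAPI route handler for
# /ask that calls ask_service.ask(...) and returns the result.
```

The important property is the direction of dependency: endpoints know about services, services know about pipelines, and pipelines know about providers, never the other way around.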
Application startup and shutdown logic is defined in the `lifespan` function in `wren-ai-service/src/__main__.py`, which uses FastAPI's lifespan feature:

```python
# https://fastapi.tiangolo.com/advanced/events/#lifespan
@asynccontextmanager
async def lifespan(app: FastAPI):
    # startup events
    pipe_components = generate_components()
    app.state.service_container = create_service_container(
        pipe_components,
        column_indexing_batch_size=(
            int(os.getenv("COLUMN_INDEXING_BATCH_SIZE"))
            if os.getenv("COLUMN_INDEXING_BATCH_SIZE")
            else 50
        ),
        table_retrieval_size=(
            int(os.getenv("TABLE_RETRIEVAL_SIZE"))
            if os.getenv("TABLE_RETRIEVAL_SIZE")
            else 10
        ),
        table_column_retrieval_size=(
            int(os.getenv("TABLE_COLUMN_RETRIEVAL_SIZE"))
            if os.getenv("TABLE_COLUMN_RETRIEVAL_SIZE")
            else 1000
        ),
        query_cache={
            # maxsize is a required parameter for initializing the cache, but we
            # don't want to expose it to users, so we set it to a large number
            "maxsize": 1_000_000,
            "ttl": int(os.getenv("QUERY_CACHE_TTL") or 120),
        },
    )
    app.state.service_metadata = create_service_metadata(pipe_components)
    init_langfuse()

    yield

    # shutdown events
    langfuse_context.flush()
```
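Note how `query_cache` is passed as a plain dict of `maxsize` and `ttl`; these are the constructor arguments of a TTL cache. As a rough sketch, assuming a `cachetools.TTLCache` (or something similar) backs the services' query caches, the behavior looks like this; the query id and payload are hypothetical:

```python
from cachetools import TTLCache

# Entries are evicted when the cache exceeds `maxsize` items or when an
# entry is older than `ttl` seconds, which bounds how long task results
# stay available for polling.
query_cache = TTLCache(maxsize=1_000_000, ttl=120)

query_cache["some-query-id"] = {"status": "understanding"}  # hypothetical payload
print(query_cache.get("some-query-id"))  # {'status': 'understanding'}
# ...after 120 seconds the entry expires and .get() returns None
```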
Once you have `.env.dev` prepared locally, you can prepare `config.yaml` and run `just start`.

The key places in the codebase map onto the concepts above:

- `wren-ai-service/src/globals.py`: where the service container and service metadata used above (`create_service_container`, `create_service_metadata`) are defined.
- `wren-ai-service/src/web/v1/routers`: where API endpoints are defined. Long-running endpoints run their work in FastAPI's `background_tasks`. For example, after the ask API is invoked, the response is returned immediately; users then need to poll in order to get the latest task status, and once the status is finished, the result is returned correspondingly (see the sketch after this list).
- `wren-ai-service/src/web/v1/services`: where Services are defined.
- `wren-ai-service/src/pipelines`: where Pipelines are defined.
- `wren-ai-service/src/core/pipeline.py`: the base class that Pipelines inherit from.
- `wren-ai-service/src/providers`: where Providers are defined.
- `wren-ai-service/src/core/provider.py`: the base classes that LLM, embedder, and document store Providers inherit from.
- `wren-ai-service/src/core/engine.py`: the base class that engines inherit from.
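The polling flow mentioned for the routers can be sketched as follows. This is a simplified, self-contained FastAPI example; the route shapes, status values, and in-memory dict are stand-ins for the real `/ask` endpoints and their TTL-cached results:

```python
import uuid

from fastapi import BackgroundTasks, FastAPI
from pydantic import BaseModel

app = FastAPI()
results: dict[str, dict] = {}  # the real service keeps these in a TTL cache


class AskRequest(BaseModel):
    query: str


def run_ask_pipelines(query_id: str, query: str) -> None:
    # stand-in for the service running its pipelines
    results[query_id] = {"status": "finished", "sql": "SELECT ..."}


@app.post("/ask")
async def ask(request: AskRequest, background_tasks: BackgroundTasks):
    query_id = str(uuid.uuid4())
    results[query_id] = {"status": "understanding"}
    # respond immediately; the heavy work happens in the background
    background_tasks.add_task(run_ask_pipelines, query_id, request.query)
    return {"query_id": query_id}


@app.get("/ask/{query_id}/result")
async def ask_result(query_id: str):
    # clients poll this endpoint until the status is "finished"
    return results[query_id]
```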