Back to Weknora

使用其他向量数据库

docs/使用其他向量数据库.md

0.5.210.4 KB
Original Source

如何集成新的向量数据库

本文提供了向 WeKnora 项目添加新向量数据库支持的完整指南。通过实现标准化接口和遵循结构化流程,开发者可以高效地集成自定义向量数据库。

集成流程

1. 实现基础检索引擎接口

首先需要实现 interfaces 包中的 RetrieveEngine 接口,定义检索引擎的核心能力:

go
type RetrieveEngine interface {
    // 返回检索引擎的类型标识
    EngineType() types.RetrieverEngineType

    // 执行检索操作,返回匹配结果
    Retrieve(ctx context.Context, params types.RetrieveParams) ([]*types.RetrieveResult, error)

    // 返回该引擎支持的检索类型列表
    Support() []types.RetrieverType
}

2. 实现存储层接口

实现 RetrieveEngineRepository 接口,扩展基础检索引擎能力,添加索引管理功能:

go
type RetrieveEngineRepository interface {
    // 保存单个索引信息
    Save(ctx context.Context, indexInfo *types.IndexInfo, params map[string]any) error
    
    // 批量保存多个索引信息
    BatchSave(ctx context.Context, indexInfoList []*types.IndexInfo, params map[string]any) error
    
    // 估算索引存储所需空间
    EstimateStorageSize(ctx context.Context, indexInfoList []*types.IndexInfo, params map[string]any) int64
    
    // 通过分块ID列表删除索引
    DeleteByChunkIDList(ctx context.Context, indexIDList []string, dimension int) error
    
    // 复制索引数据,避免重新计算嵌入向量
    CopyIndices(
        ctx context.Context,
        sourceKnowledgeBaseID string,
        sourceToTargetKBIDMap map[string]string,
        sourceToTargetChunkIDMap map[string]string,
        targetKnowledgeBaseID string,
        dimension int,
    ) error
    
    // 通过知识ID列表删除索引
    DeleteByKnowledgeIDList(ctx context.Context, knowledgeIDList []string, dimension int) error
    
    // 继承RetrieveEngine接口
    RetrieveEngine
}

3. 实现服务层接口

创建实现 RetrieveEngineService 接口的服务,负责处理索引创建和管理的业务逻辑:

go
type RetrieveEngineService interface {
    // 创建单个索引
    Index(ctx context.Context,
        embedder embedding.Embedder,
        indexInfo *types.IndexInfo,
        retrieverTypes []types.RetrieverType,
    ) error

    // 批量创建索引
    BatchIndex(ctx context.Context,
        embedder embedding.Embedder,
        indexInfoList []*types.IndexInfo,
        retrieverTypes []types.RetrieverType,
    ) error

    // 估算索引存储空间
    EstimateStorageSize(ctx context.Context,
        embedder embedding.Embedder,
        indexInfoList []*types.IndexInfo,
        retrieverTypes []types.RetrieverType,
    ) int64
    
    // 复制索引数据
    CopyIndices(
        ctx context.Context,
        sourceKnowledgeBaseID string,
        sourceToTargetKBIDMap map[string]string,
        sourceToTargetChunkIDMap map[string]string,
        targetKnowledgeBaseID string,
        dimension int,
    ) error

    // 删除索引
    DeleteByChunkIDList(ctx context.Context, indexIDList []string, dimension int) error
    DeleteByKnowledgeIDList(ctx context.Context, knowledgeIDList []string, dimension int) error

    // 继承RetrieveEngine接口
    RetrieveEngine
}

4. 添加环境变量配置

在环境配置中添加新数据库的必要连接参数:

# 在RETRIEVE_DRIVER中添加新数据库驱动名称(多个驱动用逗号分隔)
RETRIEVE_DRIVER=postgres,elasticsearch_v8,your_database

# 新数据库的连接参数
YOUR_DATABASE_ADDR=your_database_host:port
YOUR_DATABASE_USERNAME=username
YOUR_DATABASE_PASSWORD=password
# 其他必要的连接参数...

5. 注册检索引擎

internal/container/container.go 文件的 initRetrieveEngineRegistry 函数中添加新数据库的初始化与注册逻辑:

go
func initRetrieveEngineRegistry(db *gorm.DB, cfg *config.Config) (interfaces.RetrieveEngineRegistry, error) {
    registry := retriever.NewRetrieveEngineRegistry()
    retrieveDriver := strings.Split(os.Getenv("RETRIEVE_DRIVER"), ",")
    log := logger.GetLogger(context.Background())

    // 已有的PostgreSQL和Elasticsearch初始化代码...
    
    // 添加新向量数据库的初始化代码
    if slices.Contains(retrieveDriver, "your_database") {
        // 初始化数据库客户端
        client, err := your_database.NewClient(your_database.Config{
            Addresses: []string{os.Getenv("YOUR_DATABASE_ADDR")},
            Username:  os.Getenv("YOUR_DATABASE_USERNAME"),
            Password:  os.Getenv("YOUR_DATABASE_PASSWORD"),
            // 其他连接参数...
        })
        
        if err != nil {
            log.Errorf("Create your_database client failed: %v", err)
        } else {
            // 创建检索引擎仓库
            yourDatabaseRepo := your_database.NewYourDatabaseRepository(client, cfg)
            
            // 注册检索引擎
            if err := registry.Register(
                retriever.NewKVHybridRetrieveEngine(
                    yourDatabaseRepo, types.YourDatabaseRetrieverEngineType,
                ),
            ); err != nil {
                log.Errorf("Register your_database retrieve engine failed: %v", err)
            } else {
                log.Infof("Register your_database retrieve engine success")
            }
        }
    }

    return registry, nil
}

6. 定义检索引擎类型常量

internal/types/retriever.go 文件中添加新的检索引擎类型常量:

go
// RetrieverEngineType 定义检索引擎类型
const (
    ElasticsearchRetrieverEngineType RetrieverEngineType = "elasticsearch"
    PostgresRetrieverEngineType      RetrieverEngineType = "postgres"
    YourDatabaseRetrieverEngineType  RetrieverEngineType = "your_database" // 添加新数据库类型
)

参考实现示例

建议参考现有的 PostgreSQL 和 Elasticsearch 实现作为开发模板。这些实现位于以下目录:

  • PostgreSQL: internal/application/repository/retriever/postgres/
  • ElasticsearchV7: internal/application/repository/retriever/elasticsearch/v7/
  • ElasticsearchV8: internal/application/repository/retriever/elasticsearch/v8/
  • Apache Doris 4.1: internal/application/repository/retriever/doris/
  • Tencent VectorDB: internal/application/repository/retriever/tencentvectordb/

通过遵循以上步骤和参考现有实现,你可以成功集成新的向量数据库到 WeKnora 系统中,扩展其向量检索能力。

Apache Doris 4.1 集成说明

Doris 是一种 MPP 风格的分析型数据库;它的接入策略与 NoSQL 向量库(Qdrant/Milvus/Weaviate)有几处特殊点:

协议层

通道端口用途
MySQL 协议FE 9030主链路 CRUD、ANN 检索、全文检索
HTTP APIFE 8030 / BE 8040Stream Load partial update

WeKnora 通过 database/sql + go-sql-driver/mysql 调用 MySQL 协议; 通过 net/http 调用 Stream Load。两条通道复用同一份用户名/密码。

表结构与维度分表

每个 embedding 维度对应一张物理表 <DORIS_TABLE_PREFIX>_<dim>(如 weknora_embeddings_768)。 表关键属性:

sql
ENGINE=OLAP
UNIQUE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 10
PROPERTIES(
    "replication_num"="1",
    "enable_unique_key_merge_on_write"="true"
);

enable_unique_key_merge_on_write=true 是 Stream Load partial update 的前提条件。

索引

  • 倒排索引(INVERTED):chunk_id / knowledge_id / knowledge_base_id / source_id / tag_id / is_enabled 用于过滤;content 加上 parser=chinese 支持中文全文检索。
  • ANN 索引(HNSW + cosine_distance):在 embedding ARRAY<FLOAT> 列上构建。

注:Doris ANN 索引在建表后异步构建,索引未就绪期间查询会退化为 brute-force(结果正确,速度较慢)。WeKnora 在 ensureTable 中会轮询 SHOW INDEX FROM <table> 等待 idx_emb 进入 FINISHED/NORMAL 状态,超时上限 30s。

分数语义

向量检索使用:

sql
1 - cosine_distance_approximate(embedding, <vec>) AS score

将 distance 翻转为 similarity,与 Qdrant cosine 相似度方向一致:值越大越相似。 threshold 比较使用 HAVING score >= ?、排序使用 ORDER BY score DESC LIMIT ?

关键词检索

依赖 Doris 内建的 MATCH_ANYchinese parser,无需在 Go 端做 jieba 分词。 跨维度的多张表会逐表查询并合并取 topK,与 Milvus/Weaviate 现状一致。

批量字段更新

BatchUpdateChunkEnabledStatus / BatchUpdateChunkTagID 通过 Stream Load partial update 实现:

  • HTTP PUT http://<fe_http>/api/<db>/<table>/_stream_load
  • Headers:partial_columns: truecolumns: id,is_enabledmerge_type: APPENDformat: jsonstrip_outer_array: true
  • Body:[{"id": "...", "is_enabled": true}, ...]

每批 ≤ 1MiB 自动拆批,请求体通过 bytes.Reader + req.GetBody 闭包构造, 确保 FE → BE 的 307 redirect 时可以重发 Body。

环境变量

bash
RETRIEVE_DRIVER=doris

DORIS_ADDR=doris-fe:9030       # FE MySQL 协议地址
DORIS_HTTP_PORT=8030           # FE HTTP 端口(Stream Load)
DORIS_DATABASE=weknora         # 目标库
DORIS_USERNAME=root
DORIS_PASSWORD=
DORIS_TABLE_PREFIX=weknora_embeddings

本地起 Doris

bash
docker compose --profile doris up -d
docker exec -it WeKnora-doris-fe mysql -h 127.0.0.1 -P 9030 -uroot \
    -e "CREATE DATABASE IF NOT EXISTS weknora;"

随后启动 WeKnora 后端,知识库写入即会按维度自动建表。

Tencent VectorDB

WeKnora 内置 Tencent VectorDB 适配器,驱动名为 tencent_vectordb。该适配器支持向量检索、基于 BM25 sparse vector 的关键词检索和索引管理,可参与 WeKnora 上层混合检索。

环境变量示例

env
RETRIEVE_DRIVER=tencent_vectordb
TENCENT_VECTORDB_ADDR=http://your-instance.tencentvectordb.com
TENCENT_VECTORDB_USERNAME=root
TENCENT_VECTORDB_API_KEY=your_tencent_vectordb_api_key
TENCENT_VECTORDB_DATABASE=weknora
TENCENT_VECTORDB_COLLECTION=weknora_embeddings

TENCENT_VECTORDB_COLLECTION 是集合名前缀。WeKnora 会按向量维度创建实际集合,例如 weknora_embeddings_768,用于隔离不同 embedding 模型维度的数据。

关键词检索依赖 Tencent VectorDB sparse vector 索引。新建集合会自动创建 sparse_vector 索引;旧版本已创建的向量集合如果没有该索引,需要重建集合并重新导入知识库数据后才能启用关键词检索。