openviking/storage/vectordb/README.md
OpenViking 项目的高性能向量数据库模块,专为 AI Agent 场景设计,提供向量存储、检索和聚合分析能力。
VikingVectorIndex 采用分层架构设计:
        Application Layer (用户代码/API)
                     ↓
  Collection Layer (集合管理、数据操作、索引协调)
                     ↓
               ┌─────┴─────┐
               ↓           ↓
         Index Layer   Storage Layer
          (向量检索)     (三表存储)
               ↓           ↓
            C++ Engine (abi3 绑定)
VikingVectorIndex 使用三张表分离不同职责:
- C 表 (Candidate Table):label (uint64)
- D 表 (Delta Table):timestamp_label
- T 表 (TTL Table):expire_timestamp_label

索引类型:
- VolatileIndex (内存索引)
- PersistentIndex (持久化索引)
版本目录结构:
versions/
├── 1704067200000000000/              # 版本快照
└── 1704067200000000000.write_done    # 完成标记
插入流程:
用户数据 → 验证 → 生成label → 向量化
↓
写入C/D/T表 → 通知所有索引更新
↓
C++引擎更新向量索引和标量索引
搜索流程:
查询向量 → 索引检索 + 标量过滤
↓
返回labels和scores → 从C表批量获取完整数据
↓
构造SearchResult返回
import random

from openviking.storage.vectordb.collection.local_collection import get_or_create_local_collection

# Dimension shared by the schema, the stored embeddings and the query vector.
VECTOR_DIM = 128

# Step 1: define the collection metadata (schema).
collection_meta_data = {
    "CollectionName": "demo_collection",
    "Fields": [
        {"FieldName": "id", "FieldType": "int64", "IsPrimaryKey": True},
        {"FieldName": "embedding", "FieldType": "vector", "Dim": VECTOR_DIM},
        {"FieldName": "text", "FieldType": "text"},
        {"FieldName": "category", "FieldType": "text"},
        {"FieldName": "score", "FieldType": "float32"},
        {"FieldName": "priority", "FieldType": "int64"},
    ],
}

# Step 2: create the collection (in-memory mode: no `path` is given).
collection = get_or_create_local_collection(meta_data=collection_meta_data)
# Or create a persistent collection instead:
# collection = get_or_create_local_collection(meta_data=collection_meta_data, path="./demo_db/")

# Step 3: prepare 100 test documents with random embeddings,
# cycling through the categories below.
categories = ["tech", "science", "art", "sports", "music"]
data_list = [
    {
        "id": i,
        "embedding": [random.random() for _ in range(VECTOR_DIM)],
        "text": f"This is document number {i}",
        "category": categories[i % len(categories)],
        "score": round(random.uniform(0.5, 1.0), 2),
        "priority": random.randint(1, 10),
    }
    for i in range(1, 101)
]

# Step 4: insert the documents.
result = collection.upsert_data(data_list)
print(f"Successfully inserted {len(result.ids)} documents")

# Step 5: create an index: "flat" vector index with inner-product ("ip")
# distance, plus scalar indexes on `category` and `priority`.
index_meta_data = {
    "IndexName": "demo_index",
    "VectorIndex": {
        "IndexType": "flat",
        "Distance": "ip",
    },
    "ScalarIndex": ["category", "priority"],
}
collection.create_index("demo_index", index_meta_data)
print("Index created successfully")

# Step 6: plain vector search for the 5 best matches.
query_vector = [random.random() for _ in range(VECTOR_DIM)]
search_result = collection.search_by_vector(
    index_name="demo_index",
    dense_vector=query_vector,
    limit=5,
)
print("\n=== Search Results ===")
for item in search_result.data:
    print(f"ID: {item.id}, Score: {item.score:.4f}")

# Step 7: the same search restricted to category "tech" or "science",
# returning the listed scalar fields with each hit.
search_result = collection.search_by_vector(
    index_name="demo_index",
    dense_vector=query_vector,
    limit=5,
    filters={"op": "must", "field": "category", "conds": ["tech", "science"]},
    output_fields=["text", "category", "score"],
)
print("\n=== Filtered Search Results (tech or science) ===")
for item in search_result.data:
    print(f"ID: {item.id}, Category: {item.fields.get('category')}, "
          f"Score: {item.score:.4f}, Text: {item.fields.get('text')}")

# Step 8: release resources.
collection.close()
from openviking.storage.vectordb.collection.local_collection import get_or_create_local_collection

# Schema: int64 primary key, 128-dim vector, free-form text.
id_field = {"FieldName": "id", "FieldType": "int64", "IsPrimaryKey": True}
vector_field = {"FieldName": "vector", "FieldType": "vector", "Dim": 128}
text_field = {"FieldName": "text", "FieldType": "text"}
meta_data = {
    "CollectionName": "my_collection",
    "Fields": [id_field, vector_field, text_field],
}

# No `path` argument: the collection is in-memory, and its data is gone
# once the process exits.
collection = get_or_create_local_collection(meta_data=meta_data)
created_name = collection.get_meta_data()["CollectionName"]
print(f"Collection '{created_name}' created in memory")
import os

# Persistent mode: point the collection at a directory on disk.
persist_path = "./vectordb_data/my_persistent_collection"
os.makedirs(persist_path, exist_ok=True)  # make sure the directory exists

open_kwargs = {"meta_data": meta_data, "path": persist_path}
collection = get_or_create_local_collection(**open_kwargs)
print(f"Persistent collection created at: {persist_path}")

# Close the collection ...
collection.close()

# ... and reopen it from the same path alone; the stored data is restored.
collection = get_or_create_local_collection(path=persist_path)
print("Collection reopened with all data restored")
# Custom background intervals: TTL cleanup every 10 s,
# index maintenance every 60 s.
config = dict(ttl_cleanup_seconds=10, index_maintenance_seconds=60)

collection = get_or_create_local_collection(
    meta_data=meta_data, path="./vectordb_data/", config=config
)
cleanup_interval = config["ttl_cleanup_seconds"]
print(f"Collection created with custom config: TTL cleanup every {cleanup_interval}s")
# Add two new fields, each with a default value, to the existing collection.
new_field_specs = (
    ("timestamp", "int64", 0),
    ("tags", "text", ""),
)
collection.update(
    fields=[
        {"FieldName": name, "FieldType": ftype, "DefaultValue": default}
        for name, ftype, default in new_field_specs
    ]
)

# Verify: list every field now present in the collection metadata.
meta = collection.get_meta_data()
field_list = meta["Fields"]
print(f"Collection now has {len(field_list)} fields")
for spec in field_list:
    print(f" - {spec['FieldName']}: {spec['FieldType']}")
import time

# Sample rows: (id, vector fill value, text, tag).
sample_rows = [
    (1, 0.1, "First document", "important"),
    (2, 0.2, "Second document", "review"),
    (3, 0.3, "Third document", "archive"),
]
data_list = [
    {
        "id": doc_id,
        "vector": [fill] * 128,
        "text": text,
        "timestamp": int(time.time()),
        "tags": tag,
    }
    for doc_id, fill, text, tag in sample_rows
]

# Insert.
result = collection.upsert_data(data_list)
print(f"Inserted IDs: {result.ids}")

# Upsert again with an existing id (1) to update that record.
revised_doc = {
    "id": 1,
    "vector": [0.15] * 128,
    "text": "Updated first document",
    "timestamp": int(time.time()),
    "tags": "updated",
}
update_data = [revised_doc]
result = collection.upsert_data(update_data)
print(f"Updated IDs: {result.ids}")
import time

# A document inserted with ttl=5, i.e. meant to expire after 5 seconds.
temp_doc = {
    "id": 100,
    "vector": [1.0] * 128,
    "text": "Temporary document",
    "timestamp": int(time.time()),
    "tags": "temp",
}
ttl_data = [temp_doc]
result = collection.upsert_data(ttl_data, ttl=5)
print(f"Inserted temporary data with ID: {result.ids}")

# Fetching right away succeeds.
fetch_result = collection.fetch_data([100])
print(f"Immediately fetched: {len(fetch_result.items)} items")

# Wait well past the TTL.
print("Waiting 10 seconds for TTL expiration...")
time.sleep(10)

# Fetching again fails: id 100 is reported as not existing.
fetch_result = collection.fetch_data([100])
print(f"After TTL expiration: {fetch_result.ids_not_exist}")
# Fetch several records by primary key; id 999 does not exist.
primary_keys = [1, 2, 3, 999]
fetch_result = collection.fetch_data(primary_keys)
found_items = fetch_result.items
print(f"Found {len(found_items)} items")
for record in found_items:
    rec_fields = record.fields
    print(f" ID: {rec_fields['id']}, Text: {rec_fields['text']}")
print(f"Not found IDs: {fetch_result.ids_not_exist}")
# Delete a single record ...
collection.delete_data(primary_keys=[2])
print("Deleted ID: 2")

# ... then several records in one call.
collection.delete_data(primary_keys=[3, 100])
print("Deleted IDs: 3, 100")

# Verify the deletion by fetching ids 1-3.
fetch_result = collection.fetch_data([1, 2, 3])
remaining_count = len(fetch_result.items)
missing_ids = fetch_result.ids_not_exist
print(f"Remaining items: {remaining_count}")
print(f"Not found: {missing_ids}")
# Remove every record while keeping the collection and index structures.
collection.delete_all_data()
print("All data deleted")

# Verify: id 1 should no longer be returned.
fetch_result = collection.fetch_data([1])
item_count = len(fetch_result.items)
print(f"Items after delete_all: {item_count}")
# Basic vector index: "flat" index type, inner-product ("ip") distance.
basic_index_meta = {
    "IndexName": "basic_index",
    "VectorIndex": {"IndexType": "flat", "Distance": "ip"},
}
collection.create_index(basic_index_meta["IndexName"], basic_index_meta)
# Vector index ("flat", L2 distance) that also declares scalar indexes
# on three fields.
scalar_index_meta = {
    "IndexName": "scalar_index",
    "VectorIndex": {"IndexType": "flat", "Distance": "l2"},
    "ScalarIndex": ["category", "priority", "timestamp"],
}
collection.create_index(scalar_index_meta["IndexName"], scalar_index_meta)
# Hybrid index (dense + sparse vectors) using the "flat_hybrid" index type.
hybrid_vector_cfg = {
    "IndexType": "flat_hybrid",
    "Distance": "ip",
    # NOTE(review): presumably weights the sparse-vector score; confirm.
    "SearchWithSparseLogitAlpha": 1.0,
}
hybrid_index_meta = {"IndexName": "hybrid_index", "VectorIndex": hybrid_vector_cfg}
collection.create_index(hybrid_index_meta["IndexName"], hybrid_index_meta)
# Enumerate every index defined on the collection.
indexes = collection.list_indexes()
print(f"Total indexes: {len(indexes)}")
for index_name in indexes:
    print(f" - {index_name}")
# Replace basic_index's scalar fields and its description.
new_scalar_fields = ["text", "tags"]
collection.update_index(
    index_name="basic_index",
    scalar_index=new_scalar_fields,
    description="Updated basic index with text and tags fields",
)

# Read the index metadata back; optional keys fall back to defaults.
index_meta = collection.get_index_meta_data("basic_index")
index_title = index_meta["IndexName"]
print(f"Index: {index_title}")
print(f"Description: {index_meta.get('Description', 'N/A')}")
print(f"Scalar Index: {index_meta.get('ScalarIndex', [])}")
# Dropping an index does not affect the stored data.
collection.drop_index("hybrid_index")
print("Index 'hybrid_index' dropped")

# Verify which indexes remain.
remaining_indexes = collection.list_indexes()
print(f"Remaining indexes: {remaining_indexes}")
import random

# 50 test documents cycling through three categories.
category_names = ["tech", "science", "art"]
test_data = [
    {
        "id": doc_id,
        "vector": [random.random() for _ in range(128)],
        "text": f"Document {doc_id}",
        "category": category_names[doc_id % 3],
    }
    for doc_id in range(1, 51)
]
collection.upsert_data(test_data)

# Index: "flat" / inner-product vector search plus a scalar index on "category".
test_index_meta = {
    "IndexName": "test_index",
    "VectorIndex": {"IndexType": "flat", "Distance": "ip"},
    "ScalarIndex": ["category"],
}
collection.create_index("test_index", test_index_meta)
# Random query vector; the following search examples reuse it.
query_vector = [random.random() for _ in range(128)]
result = collection.search_by_vector(
    index_name="test_index", dense_vector=query_vector, limit=10
)

print("=== Top 10 Similar Documents ===")
for rank, hit in enumerate(result.data, start=1):
    print(f"{rank}. ID: {hit.id}, Score: {hit.score:.4f}")
# Restrict the search to category "tech" and return two scalar fields.
tech_only = {"op": "must", "field": "category", "conds": ["tech"]}
result = collection.search_by_vector(
    index_name="test_index",
    dense_vector=query_vector,
    limit=5,
    filters=tech_only,
    output_fields=["text", "category"],
)
print("\n=== Tech Category Results ===")
for hit in result.data:
    hit_fields = hit.fields
    print(f"ID: {hit.id}, Category: {hit_fields['category']}, "
          f"Text: {hit_fields['text']}, Score: {hit.score:.4f}")
# Upsert documents 1-20 with a "priority" value equal to their id.
priority_data = [
    {
        "id": doc_id,
        "vector": [random.random() for _ in range(128)],
        "text": f"Priority doc {doc_id}",
        "priority": doc_id,
    }
    for doc_id in range(1, 21)
]
collection.upsert_data(priority_data)

# Search only documents whose priority lies in the closed range [5, 15].
priority_window = {"op": "range", "field": "priority", "gte": 5, "lte": 15}
result = collection.search_by_vector(
    index_name="test_index",
    dense_vector=query_vector,
    limit=10,
    filters=priority_window,
    output_fields=["text", "priority"],
)
print("\n=== Priority Range [5, 15] Results ===")
for hit in result.data:
    print(f"ID: {hit.id}, Priority: {hit.fields['priority']}, "
          f"Score: {hit.score:.4f}")
# Page through the results with limit/offset: page 1 is hits 0-9,
# page 2 is hits 10-19.
page_plan = (
    ("\n=== Page 1 (offset=0, limit=10) ===", 0),
    ("\n=== Page 2 (offset=10, limit=10) ===", 10),
)
for header, start in page_plan:
    page = collection.search_by_vector(
        index_name="test_index",
        dense_vector=query_vector,
        limit=10,
        offset=start,
        output_fields=["text"],
    )
    print(header)
    for hit in page.data:
        print(f"ID: {hit.id}, Text: {hit.fields['text']}")
# Search for documents similar to the vector stored under id 5.
result = collection.search_by_id(
    index_name="test_index", id=5, limit=5, output_fields=["text"]
)
print("\n=== Similar to Document ID=5 ===")
for hit in result.data:
    text = hit.fields["text"]
    print(f"ID: {hit.id}, Text: {text}, Score: {hit.score:.4f}")
# Fetch 10 random documents.
random_hits = collection.search_by_random(
    index_name="test_index", limit=10, output_fields=["text", "category"]
)
print("\n=== Random 10 Documents ===")
for hit in random_hits.data:
    print(f"ID: {hit.id}, Category: {hit.fields.get('category')}, "
          f"Text: {hit.fields['text']}")

# Random sampling restricted to category "science".
science_only = {"op": "must", "field": "category", "conds": ["science"]}
result = collection.search_by_random(
    index_name="test_index", limit=5, filters=science_only, output_fields=["text"]
)
print("\n=== Random 5 Science Documents ===")
for hit in result.data:
    print(f"ID: {hit.id}, Text: {hit.fields['text']}")
def _print_priority_hits(header, hits):
    """Print a section header, then one line per hit with its priority and score."""
    print(header)
    for hit in hits.data:
        print(f"ID: {hit.id}, Priority: {hit.fields['priority']}, "
              f"Score: {hit.score}")


# Order by "priority", highest first.
result = collection.search_by_scalar(
    index_name="test_index",
    field="priority",
    order="desc",
    limit=5,
    output_fields=["text", "priority"],
)
_print_priority_hits("\n=== Top 5 by Priority (Descending) ===", result)

# Order by "priority", lowest first, considering only priority >= 5.
result = collection.search_by_scalar(
    index_name="test_index",
    field="priority",
    order="asc",
    limit=5,
    filters={"op": "range", "field": "priority", "gte": 5},
    output_fields=["text", "priority"],
)
_print_priority_hits("\n=== Top 5 by Priority (Ascending, priority >= 5) ===", result)
# Total number of documents in the index.
agg_result = collection.aggregate_data(
    index_name="test_index",
    op="count",
)
# Plain string: the original f-prefix had no placeholders (ruff F541).
print("\n=== Total Document Count ===")
print(f"Total: {agg_result.total_count}")

# Count documents grouped by category.
agg_result = collection.aggregate_data(
    index_name="test_index",
    op="count",
    field="category",
)
print("\n=== Count by Category ===")
for group in agg_result.groups:
    print(f"{group['value']}: {group['count']}")

# Count documents with priority >= 10, grouped by category.
agg_result = collection.aggregate_data(
    index_name="test_index",
    op="count",
    field="category",
    filters={"op": "range", "field": "priority", "gte": 10},
)
print("\n=== Count by Category (priority >= 10) ===")
for group in agg_result.groups:
    print(f"{group['value']}: {group['count']}")

# Count documents per category, keeping only categories whose count is
# strictly greater than 5 ("gt": 5), matching the header printed below.
agg_result = collection.aggregate_data(
    index_name="test_index",
    op="count",
    field="category",
    cond={"gt": 5},
)
print("\n=== Categories with Count > 5 ===")
for group in agg_result.groups:
    print(f"{group['value']}: {group['count']}")
# A collection without a primary-key field uses auto-generated ids (AUTO_ID).
auto_id_meta = {
    "CollectionName": "auto_id_collection",
    "Fields": [
        {"FieldName": "content", "FieldType": "text"},
        {"FieldName": "embedding", "FieldType": "vector", "Dim": 64},
    ],
}
auto_collection = get_or_create_local_collection(meta_data=auto_id_meta)

# Insert documents without specifying ids.
data = [
    {"content": content, "embedding": [random.random() for _ in range(64)]}
    for content in ("Document A", "Document B", "Document C")
]
auto_ids = auto_collection.upsert_data(data).ids
print(f"Auto-generated IDs: {auto_ids}")

# Fetch the first two documents back via their generated ids.
fetch_result = auto_collection.fetch_data(auto_ids[:2])
print(f"\nFetched {len(fetch_result.items)} items using auto-generated IDs")
for record in fetch_result.items:
    print(f"  Content: {record.fields['content']}")

auto_collection.close()
# Create a collection with vector normalization enabled.
# (The unused `import math` from the original snippet has been removed.)
normalized_meta = {
    "CollectionName": "normalized_vectors",
    "Fields": [
        {"FieldName": "id", "FieldType": "int64", "IsPrimaryKey": True},
        {"FieldName": "vector", "FieldType": "vector", "Dim": 128},
    ],
    "VectorIndex": {
        "NormalizeVector": True,  # enable vector normalization
    },
}
norm_collection = get_or_create_local_collection(meta_data=normalized_meta)

# Insert a non-normalized vector; the system normalizes it automatically.
raw_vector = [i * 0.1 for i in range(128)]
norm_collection.upsert_data([{"id": 1, "vector": raw_vector}])

# Create an index.
norm_collection.create_index("norm_index", {
    "IndexName": "norm_index",
    "VectorIndex": {"IndexType": "flat", "Distance": "ip"},
})

# The query vector is normalized automatically at search time as well.
query = [i * 0.05 for i in range(128)]
result = norm_collection.search_by_vector(
    index_name="norm_index",
    dense_vector=query,
    limit=1,
)
print("Vector normalization enabled")
# Guard against an empty result instead of raising IndexError on data[0].
if result.data:
    print(f"Search result score: {result.data[0].score:.4f}")
else:
    print("Search returned no results")

norm_collection.close()
must - 值必须在列表中
# 单个值
filters = dict(op="must", field="category", conds=["tech"])
# Multiple values (OR semantics).
filters = dict(op="must", field="status", conds=["active", "pending", "review"])
range - 范围查询
# 大于等于
filters = dict(op="range", field="score", gte=0.5)
# Less than or equal.
filters = dict(op="range", field="priority", lte=10)
# Closed interval [18, 65].
filters = dict(op="range", field="age", gte=18, lte=65)
# Strictly greater than.
filters = dict(op="range", field="price", gt=100)
# Strictly less than.
filters = dict(op="range", field="discount", lt=0.5)
time_range - 时间范围查询(date_time)
date_time 字段使用 datetime.isoformat() 格式,例如 2026-02-06T12:34:56.123456。
不带时区的时间会按本地时区解析。
# Lower bound only (ISO timestamp string).
filters = dict(op="time_range", field="created_at", gte="2026-02-01T00:00:00")
# Closed time interval.
filters = dict(
    op="time_range",
    field="created_at",
    gte="2026-02-01T00:00:00",
    lte="2026-02-07T23:59:59",
)
geo_range - 地理范围查询(geo_point)
geo_point 字段写入格式为 "longitude,latitude",其中:
- longitude ∈ (-180, 180)
- latitude ∈ (-90, 90)

radius 支持 m 和 km 单位。
# Points within 10 km of the given "longitude,latitude" center.
filters = dict(
    op="geo_range",
    field="f_geo_point",
    center="116.412138,39.914912",
    radius="10km",
)
# Example 1: documents in category "tech" or "science".
# (This filter restricts category only; it does not filter on priority.)
result = collection.search_by_vector(
    index_name="test_index",
    dense_vector=query_vector,
    filters={
        "op": "must",
        "field": "category",
        "conds": ["tech", "science"],
    },
    limit=10,
)

# Example 2: documents whose score lies in the closed range [0.7, 0.95].
result = collection.search_by_vector(
    index_name="test_index",
    dense_vector=query_vector,
    filters={
        "op": "range",
        "field": "score",
        "gte": 0.7,
        "lte": 0.95,
    },
    limit=10,
)
`collection.close()`:释放资源

| 方法 | 说明 | 返回值 |
|---|---|---|
| `create_index(name, meta)` | 创建索引 | `Index` |
| `drop_index(name)` | 删除索引 | `None` |
| `list_indexes()` | 列出所有索引 | `List[str]` |
| `get_index_meta_data(name)` | 获取索引元数据 | `Dict` |
| `update_index(name, scalar_index, description)` | 更新索引 | `None` |
| `upsert_data(data_list, ttl)` | 插入/更新数据 | `UpsertResult` |
| `fetch_data(primary_keys)` | 获取数据 | `FetchResult` |
| `delete_data(primary_keys)` | 删除数据 | `None` |
| `delete_all_data()` | 删除所有数据 | `None` |
| `search_by_vector(...)` | 向量搜索 | `SearchResult` |
| `search_by_id(...)` | ID 搜索 | `SearchResult` |
| `search_by_random(...)` | 随机搜索 | `SearchResult` |
| `search_by_scalar(...)` | 标量排序搜索 | `SearchResult` |
| `search_by_keywords(...)` | 关键词搜索 | `SearchResult` |
| `search_by_multimodal(...)` | 多模态搜索 | `SearchResult` |
| `aggregate_data(...)` | 数据聚合 | `AggregateResult` |
| `get_meta_data()` | 获取集合元数据 | `Dict` |
| `update(fields)` | 更新集合字段 | `None` |
| `close()` | 关闭集合 | `None` |
| `drop()` | 删除集合 | `None` |
欢迎提交 Issue 和 Pull Request!
本项目遵循 OpenViking 项目的许可证协议。