docs/数据源导入开发文档.md
WeKnora 的数据源导入模块支持从外部平台(飞书、企业微信、Notion、Confluence 等)自动导入和同步内容到知识库。用户可配置数据源连接,选择需要同步的资源,并通过手动触发或定时调度自动完成内容的增量/全量同步。
数据源绑定到知识库,一个知识库可接入多个数据源。所有配置通过前端知识库设置页面管理,凭证使用 AES-256-GCM 加密存储在数据库中。
从飞书知识库(Wiki)同步文档到 WeKnora。支持 docx、doc、sheet、bitable、file 类型文档的自动导入。
在 权限管理 中搜索并开通以下权限:
| 权限 | 说明 |
|---|---|
wiki:wiki:readonly | 读取知识库空间列表和节点树 |
drive:drive:readonly | 读取云文档基础信息 |
drive:export:readonly | 导出文档内容(docx/xlsx) |
docx:document:readonly | 读取新版文档内容 |
在 版本管理与发布 中创建版本并提交审核。审核通过后方可正常调用 API。
保存后可点击 立即同步 触发首次同步。在同步日志中查看同步进度和结果:
注意:飞书国际版(Lark)同样支持,在创建数据源时会自动适配
https://open.larksuite.com的 API 地址。
数据源在知识库设置页的 数据源 标签页中管理。
添加数据源使用多步向导流程:
每个数据源以卡片形式展示,包含:
每个数据源可查看历史同步日志,包含:
┌───────────────────────────────────────────────────────────────────────────┐
│ 数据源导入架构 │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 飞书 │ │ Notion │ │Confluence│ │ GitHub │ │ 其他... │ │
│ │ (Wiki) │ │ │ │ │ │ │ │ │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ API │ API │ API │ API │ │
│ ─────┼─────────────┼────────────┼─────────────┼─────────────┼────── │
│ ▼ ▼ ▼ ▼ ▼ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Feishu │ │ Notion │ │Confluence│ │ GitHub │ │ ... │ │
│ │Connector │ │Connector │ │Connector │ │Connector │ │Connector │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │ │ 连接 │
│ ─────┴─────────────┴─────────────┴──────────────┴─────────────┘ 器层 │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────┐ │
│ │ ConnectorRegistry │ 连接器注册表 │
│ │ · 按类型查找连接器实例 │ │
│ │ · 连接器元数据(名称、认证方式、能力) │ │
│ └──────────────────┬───────────────────────────┘ │
│ │ │
│ ───────────────────┼──────────────────────────────────────────────── │
│ ▼ │
│ ┌──────────────────────────────────────────────┐ │
│ │ DataSourceService (业务逻辑层) │ │
│ │ │ │
│ │ ┌────────────────────────────────────────┐ │ │
│ │ │ · 数据源 CRUD │ │ │
│ │ │ · 连接验证 (ValidateConnection) │ │ │
│ │ │ · 资源发现 (ListAvailableResources) │ │ │
│ │ │ · 手动同步 (ManualSync) │ │ │
│ │ │ · 暂停/恢复 │ │ │
│ │ │ · 同步执行 (ProcessSync) │ │ │
│ │ │ · 内容入库 (ingestItem) │ │ │
│ │ └────────────────────────────────────────┘ │ │
│ └──────────────────┬───────────────────────────┘ │
│ │ │
│ ───────────────────┼──────────────────────────────────────────────── │
│ ▼ │
│ ┌──────────────────────────────────┐ ┌────────────────────────────┐ │
│ │ Scheduler (Cron 调度器) │ │ Task Queue (asynq) │ │
│ │ · robfig/cron(含秒) │ │ · 异步同步任务 │ │
│ │ · DB 加载活跃数据源 │ │ · Redis / Lite 模式 │ │
│ │ · 任务去重 (TaskID + Running) │ │ · TypeDataSourceSync │ │
│ └──────────────────────────────────┘ └────────────────────────────┘ │
│ │ │
│ ───────────────────┼──────────────────────────────────────────────── │
│ ▼ │
│ ┌──────────────────────────────────────────────────────┐ │
│ │ WeKnora Core (知识入库管道) │ │
│ │ KnowledgeService · KnowledgeBaseService │ │
│ │ 文档解析 → 分块 → 向量化 → 索引 │ │
│ └──────────────────────────────────────────────────────┘ │
└───────────────────────────────────────────────────────────────────────────┘
设计模式:
| 模式 | 用途 |
|---|---|
| Adapter Pattern | 统一不同平台的差异,每个平台实现 Connector 接口 |
| Registry Pattern | 通过 ConnectorRegistry 按类型动态查找连接器实例 |
| Strategy Pattern | SyncMode(增量/全量)、ConflictStrategy(覆盖/跳过)可选策略 |
| Producer-Consumer | 手动/定时触发 → asynq 任务队列 → Worker 异步执行同步 |
| Cursor-based Pagination | 增量同步基于 SyncCursor 状态跟踪变更 |
数据源配置存储在 data_sources 表中,绑定到知识库:
CREATE TABLE data_sources (
id VARCHAR(36) PRIMARY KEY,
tenant_id BIGINT NOT NULL,
knowledge_base_id VARCHAR(36) NOT NULL,
name VARCHAR(255) NOT NULL DEFAULT '',
type VARCHAR(50) NOT NULL, -- 连接器类型
config JSONB NOT NULL DEFAULT '{}', -- 加密的凭证和配置
sync_schedule VARCHAR(100) DEFAULT '', -- Cron 表达式(6 段,含秒)
sync_mode VARCHAR(20) DEFAULT 'incremental', -- 'incremental' | 'full'
status VARCHAR(20) DEFAULT 'active', -- 'active' | 'paused' | 'error' | 'deleted'
conflict_strategy VARCHAR(20) DEFAULT 'overwrite', -- 'overwrite' | 'skip'
sync_deletions BOOLEAN DEFAULT true,
last_sync_at TIMESTAMPTZ,
last_sync_cursor JSONB,
last_sync_result JSONB,
error_message TEXT DEFAULT '',
sync_log_retention_days INTEGER DEFAULT 30,
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
deleted_at TIMESTAMPTZ
);
config 字段结构(解密后):
{
"type": "feishu",
"credentials": {
"app_id": "cli_xxx",
"app_secret": "xxx"
},
"resource_ids": ["space_id_1", "space_id_2"],
"settings": {}
}
各连接器凭证字段:
| 连接器类型 | 凭证字段 | 认证方式 |
|---|---|---|
| 飞书 (feishu) | app_id, app_secret | OAuth2 (Tenant Access Token) |
同步日志记录每次同步操作的执行情况:
CREATE TABLE sync_logs (
id VARCHAR(36) PRIMARY KEY,
data_source_id VARCHAR(36) NOT NULL REFERENCES data_sources(id) ON DELETE CASCADE,
tenant_id BIGINT NOT NULL,
status VARCHAR(20) NOT NULL, -- 'running' | 'success' | 'partial' | 'failed' | 'canceled'
started_at TIMESTAMPTZ NOT NULL,
finished_at TIMESTAMPTZ,
items_total INTEGER DEFAULT 0,
items_created INTEGER DEFAULT 0,
items_updated INTEGER DEFAULT 0,
items_deleted INTEGER DEFAULT 0,
items_skipped INTEGER DEFAULT 0,
items_failed INTEGER DEFAULT 0,
error_message TEXT DEFAULT '',
result JSONB, -- 详细同步结果
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
);
所有端点位于 /api/v1/datasource 路径下,需认证。
| 方法 | 路径 | 说明 |
|---|---|---|
| GET | /api/v1/datasource/types | 获取可用连接器类型列表 |
响应示例:
[
{
"type": "feishu",
"name": "飞书",
"description": "Import documents from Feishu/Lark Wiki spaces",
"icon": "feishu",
"priority": 1,
"auth_type": "oauth2",
"capabilities": ["incremental", "deletion_sync"]
}
]
| 方法 | 路径 | 说明 |
|---|---|---|
| POST | /api/v1/datasource/validate-credentials | 验证凭证(创建前的测试连接) |
请求体:
{
"type": "feishu",
"credentials": {
"app_id": "cli_xxx",
"app_secret": "xxx"
}
}
| 方法 | 路径 | 说明 |
|---|---|---|
| POST | /api/v1/datasource | 创建数据源 |
| GET | /api/v1/datasource?kb_id=xxx | 列出知识库的所有数据源 |
| GET | /api/v1/datasource/:id | 获取数据源详情 |
| PUT | /api/v1/datasource/:id | 更新数据源配置 |
| DELETE | /api/v1/datasource/:id | 删除数据源(软删除) |
| 方法 | 路径 | 说明 |
|---|---|---|
| POST | /api/v1/datasource/:id/validate | 测试已有数据源的连接(会更新状态) |
| GET | /api/v1/datasource/:id/resources | 浏览外部系统的可用资源 |
| POST | /api/v1/datasource/:id/sync | 手动触发同步 |
| POST | /api/v1/datasource/:id/pause | 暂停数据源(停止定时同步) |
| POST | /api/v1/datasource/:id/resume | 恢复数据源 |
| 方法 | 路径 | 说明 |
|---|---|---|
| GET | /api/v1/datasource/:id/logs?limit=10&offset=0 | 获取同步日志列表 |
| GET | /api/v1/datasource/logs/:log_id | 获取单条同步日志详情 |
每个数据源代表一个外部平台连接与 WeKnora 知识库的绑定关系。一个知识库可以添加多个数据源,实现从多个平台汇聚内容。
数据源的核心属性:
feishu、notion 等incremental(增量同步,只同步变更)或 full(全量同步,每次拉取所有)连接器是与外部平台交互的适配层,每个平台实现一个连接器。所有连接器通过 ConnectorRegistry 注册和管理。
连接器提供四个核心能力:
| 能力 | 说明 |
|---|---|
Validate | 验证凭证和连接是否可用 |
ListResources | 列出外部系统中可选的资源(如知识库空间、文件夹) |
FetchAll | 全量拉取指定资源中的所有文档 |
FetchIncremental | 基于游标增量拉取变更的文档 |
增量同步的状态标记,记录上次同步的位点。游标内容因连接器类型而异,由连接器自行管理。如飞书连接器的游标包含每个节点的最后编辑时间,用于判断哪些文档发生了变更。
type SyncCursor struct {
LastSyncTime *time.Time // 上次同步时间
ConnectorCursor map[string]interface{} // 连接器自定义状态
LastSchemaHash string // Schema 变更检测
}
连接器从外部平台拉取文档后,统一封装为 FetchedItem:
type FetchedItem struct {
ExternalID string // 外部平台的文档唯一标识
Title string // 文档标题
Content []byte // 文档内容(二进制)
ContentType string // MIME 类型
FileName string // 文件名(含扩展名)
URL string // 原始 URL
UpdatedAt *time.Time // 最后更新时间
Metadata map[string]string // 元数据(来源、作者等)
IsDeleted bool // 标记为已删除(增量同步)
SourceResourceID string // 所属资源 ID
}
外部系统中可选择同步的资源节点,支持层级结构:
type Resource struct {
ExternalID string // 外部 ID
Name string // 名称
Type string // 类型(如 wiki_space)
Description string // 描述
URL string // 外部链接
ModifiedAt *time.Time // 最后修改时间
ParentID string // 父节点 ID(支持树形结构)
Metadata map[string]string // 附加信息
}
手动触发 / Cron 定时触发
│
▼
┌─ 创建 SyncLog (status: running) ──────────────────┐
│ 入队异步任务 (asynq: TypeDataSourceSync) │
│ · 手动同步 → 队列: default │
│ · 定时同步 → 队列: low, TaskID 去重 │
└──────────────────────────┬────────────────────────┘
│
▼
┌─ ProcessSync (Worker 异步执行) ───────────────────┐
│ 1. 解析任务体 (DataSourceSyncPayload) │
│ 2. 加载数据源配置 │
│ 3. 解密凭证 → 获取 Connector │
│ 4. 设置租户上下文 │
│ 5. 创建/获取自动标签 (连接器名·数据源名) │
│ 6. 判断同步模式: │
│ ├─ ForceFull 或 SyncModeFull → FetchAll │
│ └─ SyncModeIncremental → FetchIncremental │
│ 7. 遍历 FetchedItem 列表: │
│ └─ ingestItem → KnowledgeService │
│ 8. 更新 SyncLog (计数、状态、耗时) │
│ 9. 更新 DataSource (游标、最后同步时间、结果) │
└──────────────────────────────────────────────────┘
FetchedItem
│
├─ IsDeleted = true?
│ └─ 按 external_id 查找已有知识 → 软删除
│
├─ 有文件内容 (Content)?
│ └─ CreateKnowledgeFromFile
│ · 按 external_id 查找已有知识
│ · 已存在 → 删除旧版本 → 重建
│ · 不存在 → 新建
│ · metadata: external_id, source_resource_id, datasource_id
│ · channel: 连接器类型 (如 "feishu")
│ · 自动关联标签
│
└─ 仅有 URL (无 Content)?
└─ CreateKnowledgeFromURL
· 同上查重逻辑
创建数据源 (前端向导)
│
▼
┌─ DataSourceService.Create ────────────────────┐
│ 1. 验证知识库存在 │
│ 2. 获取对应 Connector │
│ 3. 验证配置和凭证 (connector.Validate) │
│ 4. 加密凭证写库 │
│ 5. 若有 SyncSchedule → 注册 Cron 任务 │
└───────────────────────────────────────────────┘
暂停数据源:
status → paused, 移除 Cron 任务
恢复数据源:
status → active, 重新注册 Cron 任务
删除数据源:
deleted_at = NOW(), 移除 Cron 任务, 级联删除 sync_logs
连接错误:
ValidateConnection 失败 → status → error, 记录 error_message
ValidateConnection 成功且原为 error → status → active, 清空 error_message
type Connector interface {
Type() string
Validate(ctx context.Context, config *types.DataSourceConfig) error
ListResources(ctx context.Context, config *types.DataSourceConfig) ([]types.Resource, error)
FetchAll(ctx context.Context, config *types.DataSourceConfig, resourceIDs []string) ([]types.FetchedItem, error)
FetchIncremental(ctx context.Context, config *types.DataSourceConfig, cursor *types.SyncCursor) ([]types.FetchedItem, *types.SyncCursor, error)
}
| 方法 | 职责 |
|---|---|
Type() | 返回连接器类型标识,用于注册和路由 |
Validate() | 验证凭证和连接可用性,如尝试获取 Access Token |
ListResources() | 列出外部系统中可选的顶层资源(如知识库空间、工作区) |
FetchAll() | 全量拉取指定资源下的所有文档,返回 FetchedItem 列表 |
FetchIncremental() | 基于上次游标增量拉取变更,返回变更列表和新游标 |
type DataSourceService interface {
CreateDataSource(ctx context.Context, ds *types.DataSource) (*types.DataSource, error)
GetDataSource(ctx context.Context, id string) (*types.DataSource, error)
ListDataSources(ctx context.Context, knowledgeBaseID string) ([]*types.DataSource, error)
UpdateDataSource(ctx context.Context, ds *types.DataSource) (*types.DataSource, error)
DeleteDataSource(ctx context.Context, id string) error
ValidateConnection(ctx context.Context, dataSourceID string) error
ValidateCredentials(ctx context.Context, connectorType string, credentials map[string]interface{}) error
ListAvailableResources(ctx context.Context, dataSourceID string) ([]types.Resource, error)
ManualSync(ctx context.Context, dataSourceID string) (*types.SyncLog, error)
PauseDataSource(ctx context.Context, dataSourceID string) error
ResumeDataSource(ctx context.Context, dataSourceID string) error
GetSyncLogs(ctx context.Context, dataSourceID string, limit, offset int) ([]*types.SyncLog, error)
GetSyncLog(ctx context.Context, syncLogID string) (*types.SyncLog, error)
ProcessSync(ctx context.Context, task *asynq.Task) error
}
type ConnectorMetadata struct {
Type string `json:"type"`
Name string `json:"name"`
Description string `json:"description"`
Icon string `json:"icon"`
Priority int `json:"priority"`
AuthType string `json:"auth_type"`
Capabilities []string `json:"capabilities"`
}
| 字段 | 说明 |
|---|---|
Type | 连接器类型标识(如 feishu) |
Name | 显示名称(如 飞书) |
Priority | 排序优先级,数值越小越靠前 |
AuthType | 认证方式:oauth2 / api_key / token / password / none |
Capabilities | 支持的能力:incremental(增量同步)、webhook、deletion_sync(删除同步) |
飞书连接器支持从飞书知识库(Wiki)同步文档到 WeKnora。
App ID + App Secret
│
▼
POST /open-apis/auth/v3/tenant_access_token/internal
│
▼
获取 Tenant Access Token(有效期 2 小时)
│
▼
Token 缓存,提前 5 分钟刷新
https://open.feishu.cn,国际版 Lark 使用 https://open.larksuite.comListResources 调用飞书 Wiki Space API 列出所有知识库空间:
GET /open-apis/wiki/v2/spaces (分页, page_size=50)
每个知识库空间映射为一个 Resource:
| 字段 | 值 |
|---|---|
Type | wiki_space |
ExternalID | 空间 ID (space_id) |
URL | https://feishu.cn/wiki/{space_id} |
Metadata.visibility | 空间可见性 |
对每个选中的 Wiki Space:
1. 递归列举空间下所有节点 (ListAllWikiNodesRecursive)
└─ GET /open-apis/wiki/v2/spaces/{space_id}/nodes (parent 为空 → 顶层)
└─ 对 has_child=true 的节点递归
2. 对每个节点:
└─ fetchNodeContent → 判断文档类型 → 导出/下载
增量同步基于节点编辑时间对比,而非飞书事件订阅:
1. 加载上次游标 (feishuCursor)
└─ SpaceNodeTimes[space_id][node_token] = 上次记录的编辑时间
2. 对每个 Space 递归列举所有节点(全树遍历)
3. 对比变更:
├─ 新节点(游标中不存在)→ 拉取内容
├─ obj_edit_time 变更(优先)或 node_edit_time 变更 → 拉取内容
├─ 编辑时间未变 → 跳过
└─ 游标中存在但本次遍历不存在 → 标记为 IsDeleted
4. 返回新游标(含本轮每个节点的最新编辑时间)
注意:增量同步的 API 调用量与知识库空间的节点树大小相关,因为需要遍历完整节点树来检测变更。对于大型空间,建议适当降低同步频率。
obj_type | 支持 | 获取方式 | 导出格式 |
|---|---|---|---|
docx | 是 | 导出任务 (Export API) | .docx |
doc | 是 | 导出任务 (Export API) | .docx |
sheet | 是 | 导出任务 (Export API) | .xlsx |
bitable | 是 | 导出任务 (Export API) | .xlsx |
file | 是 | 直接下载 (Drive Download API) | 原格式 |
mindnote | 否 | — | — |
slides | 否 | — | — |
文档 (docx/doc/sheet/bitable):
1. CreateExportTask → 创建导出任务
2. 轮询 GetExportTaskStatus(间隔 2 秒,最长约 60 秒)
3. DownloadExportFile → 用 file_token 下载导出文件
4. 清理文件名 (sanitizeFileName) + 补全扩展名
文件 (file):
DownloadDriveFile → GET /drive/v1/files/{token}/download
| 文件 | 职责 |
|---|---|
internal/datasource/connector/feishu/types.go | 飞书 API 类型定义、配置结构、常量 |
internal/datasource/connector/feishu/client.go | API 客户端:Token 管理、Wiki/Drive API 调用、导出/下载 |
internal/datasource/connector/feishu/connector.go | Connector 接口实现:Validate、ListResources、FetchAll、FetchIncremental |
internal/datasource/connector/feishu/connector_test.go | 单元测试:使用 HTTP Mock 模拟飞书开放平台 |
数据源同步调度基于 robfig/cron/v3,使用 6 段 Cron 表达式(含秒):
秒 分 时 日 月 星期
常用预设:
| 表达式 | 说明 |
|---|---|
0 */30 * * * * | 每 30 分钟 |
0 0 * * * * | 每小时整点 |
0 0 */6 * * * | 每 6 小时 |
0 0 0 * * * | 每天午夜 |
0 0 2 * * * | 每天凌晨 2 点 |
服务启动
│
▼
Scheduler.Start(ctx)
│
├─ 1. 从 DB 加载所有活跃数据源 (FindActive)
│ 条件: status = active, sync_schedule != '', deleted_at IS NULL
│
├─ 2. 为每个数据源注册 Cron Entry
│
└─ 3. 启动 Cron Runner
Cron 触发时:
├─ 检查是否有运行中的同步 (HasRunningSync)
│ 是 → 跳过本次触发
│
├─ 创建 SyncLog (status: running)
│
└─ 入队 asynq 任务
队列: low
TaskID: dssync:<dsID>:<UTC minute>(去重)
HasRunningSync 检查该数据源是否已有运行中的同步任务,防止同步耗时超过调度间隔时产生重叠TaskID(格式 dssync:<dataSourceID>:<UTC minute>)去重,同一分钟内多实例触发时只有第一个成功入队,其余标记为 canceled| 操作 | 行为 |
|---|---|
| 创建数据源(含 schedule) | 注册 Cron Entry |
| 更新数据源 schedule | 移除旧 Entry → 注册新 Entry |
| 暂停数据源 | 移除 Cron Entry |
| 恢复数据源 | 重新注册 Cron Entry |
| 删除数据源 | 移除 Cron Entry |
| 参数 | 值 | 说明 |
|---|---|---|
| Token 缓存安全余量 | 5 min | Token 过期前提前刷新(飞书/企微通用) |
| 飞书 Wiki Space 分页大小 | 50 | ListResources 每页返回数 |
| 飞书导出任务轮询间隔 | 2s | 飞书 Export API 状态检查间隔 |
| 飞书导出任务最大等待 | ~60s | 导出任务超时时间 |
| 企微文档列表分页大小 | 50 | wedoc/doc_list 每页返回数 |
| 企微微盘文件列表分页大小 | 1000 | wedrive/file_list 每次拉取最大数 |
| 企微 access_token 有效期 | 7200s | 2 小时,缓存提前 5 分钟刷新 |
| 同步日志保留天数 | 30 天 | 默认值,可配置 |
| 同步日志默认分页 | 10 | GetSyncLogs 默认 limit |
| Cron 表达式格式 | 6 段含秒 | robfig/cron WithSeconds() |
| 定时同步队列 | low | asynq 低优先级队列 |
| 手动同步队列 | default | asynq 默认队列 |
| 场景 | 处理策略 |
|---|---|
| 凭证验证失败 | 返回错误信息,数据源状态标记为 error,记录 error_message |
| 凭证验证成功(原为 error) | 恢复状态为 active,清空 error_message |
| 连接器未找到 | 返回 ErrConnectorNotFound |
| 知识库不存在 | 创建/更新时校验,返回 ErrKnowledgeBaseNotFound |
| 数据源不存在 | 返回 ErrDataSourceNotFound |
| 数据源非活跃状态触发同步 | 仅允许 active 或 error 状态手动同步 |
| 同步任务入队失败 | SyncLog 标记为 failed,记录错误 |
| 定时同步任务去重 | SyncLog 标记为 canceled,错误信息 deduplicated: another instance enqueued first |
| 单文档拉取失败 | 记录错误到 FetchedItem.Metadata,不中断整体同步 |
| 单文档入库失败 | 计入 items_failed,不中断整体同步 |
| 重复文档 (DuplicateKnowledgeError) | 计入 items_skipped |
| 不支持的文档类型 | 静默跳过(如飞书的 mindnote、slides) |
接入新的外部平台只需 3 步:
在 internal/datasource/connector/<type>/ 下创建连接器:
package myplatform
type Connector struct {
// 连接器配置
}
func NewConnector() *Connector {
return &Connector{}
}
func (c *Connector) Type() string {
return types.ConnectorTypeMyPlatform
}
func (c *Connector) Validate(ctx context.Context, config *types.DataSourceConfig) error {
// 验证凭证:尝试获取 Token 或调用 API
}
func (c *Connector) ListResources(ctx context.Context, config *types.DataSourceConfig) ([]types.Resource, error) {
// 列出可选资源(工作区、空间、文件夹等)
}
func (c *Connector) FetchAll(ctx context.Context, config *types.DataSourceConfig, resourceIDs []string) ([]types.FetchedItem, error) {
// 全量拉取所有文档
}
func (c *Connector) FetchIncremental(ctx context.Context, config *types.DataSourceConfig, cursor *types.SyncCursor) ([]types.FetchedItem, *types.SyncCursor, error) {
// 增量拉取变更(可复用 FetchAll 逻辑 + 编辑时间对比)
}
建议的文件结构:
internal/datasource/connector/myplatform/
├── types.go # 平台 API 类型定义
├── client.go # API 客户端(Token 管理、HTTP 调用)
└── connector.go # Connector 接口实现
在 internal/container/container.go 的 initConnectorRegistry 中注册:
func initConnectorRegistry() *datasource.ConnectorRegistry {
registry := datasource.NewConnectorRegistry()
registry.Register(feishuConnector.NewConnector())
registry.Register(myplatform.NewConnector()) // 新增
return registry
}
在 internal/datasource/connector.go 的 ConnectorMetadataRegistry 中添加元数据:
types.ConnectorTypeMyPlatform: {
Type: types.ConnectorTypeMyPlatform,
Name: "My Platform",
Description: "Import documents from My Platform",
Icon: "myplatform",
Priority: 10,
AuthType: "api_key",
Capabilities: []string{"incremental"},
},
在 internal/types/datasource.go 中添加类型常量:
const ConnectorTypeMyPlatform = "myplatform"
在 frontend/src/views/knowledge/settings/DataSourceEditorDialog.vue 中:
available: true在 i18n 文件中添加平台名称翻译。
Service 层(
DataSourceService)不需要修改 — 数据源管理、同步调度、内容入库、日志记录全部由 Service 统一处理。可参考飞书连接器 (internal/datasource/connector/feishu/) 作为实现参考。