Back to Weknora

数据源导入开发文档

docs/数据源导入开发文档.md

0.5.136.2 KB
Original Source

数据源导入开发文档

WeKnora 的数据源导入模块支持从外部平台(飞书、企业微信、Notion、Confluence 等)自动导入和同步内容到知识库。用户可配置数据源连接,选择需要同步的资源,并通过手动触发或定时调度自动完成内容的增量/全量同步。

数据源绑定到知识库,一个知识库可接入多个数据源。所有配置通过前端知识库设置页面管理,凭证使用 AES-256-GCM 加密存储在数据库中。

目录


快速接入指南

前置条件

  • WeKnora 已部署并运行
  • 已创建至少一个知识库
  • 拥有外部平台(如飞书、企业微信)的管理权限,可创建应用并授权

飞书知识库接入

从飞书知识库(Wiki)同步文档到 WeKnora。支持 docx、doc、sheet、bitable、file 类型文档的自动导入。

第一步:创建飞书应用

  1. 登录 飞书开放平台开发者后台创建企业自建应用
  2. 凭证与基础信息 页获取:
    • App ID
    • App Secret

第二步:开通权限

权限管理 中搜索并开通以下权限:

权限说明
wiki:wiki:readonly读取知识库空间列表和节点树
drive:drive:readonly读取云文档基础信息
drive:export:readonly导出文档内容(docx/xlsx)
docx:document:readonly读取新版文档内容

第三步:发布应用

版本管理与发布 中创建版本并提交审核。审核通过后方可正常调用 API。

第四步:在 WeKnora 中添加数据源

  1. 进入知识库设置页 → 数据源 标签页
  2. 点击 添加数据源
  3. 选择连接器类型:飞书
  4. 填写凭证:
    • App ID:填入从飞书获取的 App ID
    • App Secret:填入从飞书获取的 App Secret
  5. 点击 测试连接,验证凭证有效
  6. 选择要同步的知识库空间(Wiki Space)
  7. 配置同步策略:
    • 同步模式:增量同步(推荐)或全量同步
    • 同步频率:选择预设的 Cron 表达式(如每 30 分钟、每小时、每天凌晨 2 点)
    • 冲突策略:覆盖(推荐)或跳过
    • 同步删除:是否同步源端的删除操作
  8. 点击保存

第五步:验证

保存后可点击 立即同步 触发首次同步。在同步日志中查看同步进度和结果:

  • 成功:显示已创建/更新/跳过/失败的文档数量
  • 失败:显示错误信息,常见原因为权限不足或 Token 过期

注意:飞书国际版(Lark)同样支持,在创建数据源时会自动适配 https://open.larksuite.com 的 API 地址。


前端管理

数据源在知识库设置页的 数据源 标签页中管理。

创建向导

添加数据源使用多步向导流程:

  1. 选择类型:从可用连接器列表中选择平台
  2. 配置凭证:填写平台凭证,支持即时「测试连接」验证
  3. 选择资源:浏览并勾选要同步的资源(如飞书知识库空间)
  4. 同步策略:配置 Cron 调度、同步模式、冲突策略等

数据源列表

每个数据源以卡片形式展示,包含:

  • 连接器类型标识:飞书(蓝色)等
  • 数据源名称:用户自定义
  • 同步状态:活跃 / 暂停 / 错误
  • 最近同步时间:上次成功同步的时间
  • 最近同步结果:已同步的文档数量摘要
  • 操作按钮:立即同步、暂停/恢复、编辑、删除

同步日志

每个数据源可查看历史同步日志,包含:

  • 同步状态:运行中 / 成功 / 部分成功 / 失败 / 已取消
  • 开始时间 / 结束时间
  • 文档计数:总数、已创建、已更新、已删除、已跳过、失败
  • 错误信息(如有)

架构总览

┌───────────────────────────────────────────────────────────────────────────┐
│                          数据源导入架构                                     │
│                                                                           │
│   ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐   │
│   │   飞书    │  │  Notion  │  │Confluence│  │  GitHub  │  │  其他...  │   │
│   │  (Wiki)  │  │          │  │          │  │          │  │          │   │
│   └────┬─────┘  └────┬─────┘  └────┬─────┘  └────┬─────┘  └────┬─────┘   │
│        │ API         │ API         │ API         │ API         │          │
│   ─────┼─────────────┼────────────┼─────────────┼─────────────┼──────    │
│        ▼             ▼            ▼             ▼             ▼          │
│   ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐  │
│   │  Feishu  │  │  Notion  │  │Confluence│  │  GitHub  │  │   ...    │  │
│   │Connector │  │Connector │  │Connector │  │Connector │  │Connector │  │
│   └────┬─────┘  └────┬─────┘  └────┬─────┘  └────┬─────┘  └────┬─────┘  │
│        │             │             │              │             │   连接  │
│   ─────┴─────────────┴─────────────┴──────────────┴─────────────┘   器层  │
│                      │                                                    │
│                      ▼                                                    │
│   ┌──────────────────────────────────────────────┐                        │
│   │         ConnectorRegistry                    │   连接器注册表          │
│   │   · 按类型查找连接器实例                        │                       │
│   │   · 连接器元数据(名称、认证方式、能力)          │                       │
│   └──────────────────┬───────────────────────────┘                        │
│                      │                                                    │
│   ───────────────────┼────────────────────────────────────────────────    │
│                      ▼                                                    │
│   ┌──────────────────────────────────────────────┐                        │
│   │       DataSourceService (业务逻辑层)          │                        │
│   │                                              │                        │
│   │  ┌────────────────────────────────────────┐  │                        │
│   │  │ · 数据源 CRUD                          │  │                        │
│   │  │ · 连接验证 (ValidateConnection)        │  │                        │
│   │  │ · 资源发现 (ListAvailableResources)    │  │                        │
│   │  │ · 手动同步 (ManualSync)                │  │                        │
│   │  │ · 暂停/恢复                            │  │                        │
│   │  │ · 同步执行 (ProcessSync)               │  │                        │
│   │  │ · 内容入库 (ingestItem)                │  │                        │
│   │  └────────────────────────────────────────┘  │                        │
│   └──────────────────┬───────────────────────────┘                        │
│                      │                                                    │
│   ───────────────────┼────────────────────────────────────────────────    │
│                      ▼                                                    │
│   ┌──────────────────────────────────┐  ┌────────────────────────────┐    │
│   │      Scheduler (Cron 调度器)     │  │    Task Queue (asynq)     │    │
│   │  · robfig/cron(含秒)           │  │  · 异步同步任务            │    │
│   │  · DB 加载活跃数据源             │  │  · Redis / Lite 模式      │    │
│   │  · 任务去重 (TaskID + Running)   │  │  · TypeDataSourceSync     │    │
│   └──────────────────────────────────┘  └────────────────────────────┘    │
│                      │                                                    │
│   ───────────────────┼────────────────────────────────────────────────    │
│                      ▼                                                    │
│   ┌──────────────────────────────────────────────────────┐                │
│   │            WeKnora Core (知识入库管道)                │                │
│   │  KnowledgeService · KnowledgeBaseService             │                │
│   │  文档解析 → 分块 → 向量化 → 索引                      │                │
│   └──────────────────────────────────────────────────────┘                │
└───────────────────────────────────────────────────────────────────────────┘

设计模式:

模式用途
Adapter Pattern统一不同平台的差异,每个平台实现 Connector 接口
Registry Pattern通过 ConnectorRegistry 按类型动态查找连接器实例
Strategy PatternSyncMode(增量/全量)、ConflictStrategy(覆盖/跳过)可选策略
Producer-Consumer手动/定时触发 → asynq 任务队列 → Worker 异步执行同步
Cursor-based Pagination增量同步基于 SyncCursor 状态跟踪变更

数据模型

data_sources 表

数据源配置存储在 data_sources 表中,绑定到知识库:

sql
CREATE TABLE data_sources (
    id                      VARCHAR(36) PRIMARY KEY,
    tenant_id               BIGINT NOT NULL,
    knowledge_base_id       VARCHAR(36) NOT NULL,
    name                    VARCHAR(255) NOT NULL DEFAULT '',
    type                    VARCHAR(50) NOT NULL,          -- 连接器类型
    config                  JSONB NOT NULL DEFAULT '{}',   -- 加密的凭证和配置
    sync_schedule           VARCHAR(100) DEFAULT '',       -- Cron 表达式(6 段,含秒)
    sync_mode               VARCHAR(20) DEFAULT 'incremental',  -- 'incremental' | 'full'
    status                  VARCHAR(20) DEFAULT 'active',  -- 'active' | 'paused' | 'error' | 'deleted'
    conflict_strategy       VARCHAR(20) DEFAULT 'overwrite',    -- 'overwrite' | 'skip'
    sync_deletions          BOOLEAN DEFAULT true,
    last_sync_at            TIMESTAMPTZ,
    last_sync_cursor        JSONB,
    last_sync_result        JSONB,
    error_message           TEXT DEFAULT '',
    sync_log_retention_days INTEGER DEFAULT 30,
    created_at              TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at              TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
    deleted_at              TIMESTAMPTZ
);

config 字段结构(解密后):

json
{
  "type": "feishu",
  "credentials": {
    "app_id": "cli_xxx",
    "app_secret": "xxx"
  },
  "resource_ids": ["space_id_1", "space_id_2"],
  "settings": {}
}

各连接器凭证字段:

连接器类型凭证字段认证方式
飞书 (feishu)app_id, app_secretOAuth2 (Tenant Access Token)

sync_logs 表

同步日志记录每次同步操作的执行情况:

sql
CREATE TABLE sync_logs (
    id              VARCHAR(36) PRIMARY KEY,
    data_source_id  VARCHAR(36) NOT NULL REFERENCES data_sources(id) ON DELETE CASCADE,
    tenant_id       BIGINT NOT NULL,
    status          VARCHAR(20) NOT NULL,    -- 'running' | 'success' | 'partial' | 'failed' | 'canceled'
    started_at      TIMESTAMPTZ NOT NULL,
    finished_at     TIMESTAMPTZ,
    items_total     INTEGER DEFAULT 0,
    items_created   INTEGER DEFAULT 0,
    items_updated   INTEGER DEFAULT 0,
    items_deleted   INTEGER DEFAULT 0,
    items_skipped   INTEGER DEFAULT 0,
    items_failed    INTEGER DEFAULT 0,
    error_message   TEXT DEFAULT '',
    result          JSONB,                   -- 详细同步结果
    created_at      TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at      TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP
);

API 端点

所有端点位于 /api/v1/datasource 路径下,需认证。

连接器元数据

方法路径说明
GET/api/v1/datasource/types获取可用连接器类型列表

响应示例:

json
[
  {
    "type": "feishu",
    "name": "飞书",
    "description": "Import documents from Feishu/Lark Wiki spaces",
    "icon": "feishu",
    "priority": 1,
    "auth_type": "oauth2",
    "capabilities": ["incremental", "deletion_sync"]
  }
]

凭证验证(不落库)

方法路径说明
POST/api/v1/datasource/validate-credentials验证凭证(创建前的测试连接)

请求体:

json
{
  "type": "feishu",
  "credentials": {
    "app_id": "cli_xxx",
    "app_secret": "xxx"
  }
}

数据源管理

方法路径说明
POST/api/v1/datasource创建数据源
GET/api/v1/datasource?kb_id=xxx列出知识库的所有数据源
GET/api/v1/datasource/:id获取数据源详情
PUT/api/v1/datasource/:id更新数据源配置
DELETE/api/v1/datasource/:id删除数据源(软删除)

操作

方法路径说明
POST/api/v1/datasource/:id/validate测试已有数据源的连接(会更新状态)
GET/api/v1/datasource/:id/resources浏览外部系统的可用资源
POST/api/v1/datasource/:id/sync手动触发同步
POST/api/v1/datasource/:id/pause暂停数据源(停止定时同步)
POST/api/v1/datasource/:id/resume恢复数据源

同步日志

方法路径说明
GET/api/v1/datasource/:id/logs?limit=10&offset=0获取同步日志列表
GET/api/v1/datasource/logs/:log_id获取单条同步日志详情

核心概念

DataSource — 数据源

每个数据源代表一个外部平台连接与 WeKnora 知识库的绑定关系。一个知识库可以添加多个数据源,实现从多个平台汇聚内容。

数据源的核心属性:

  • 类型 (Type):绑定的连接器类型,如 feishunotion
  • 配置 (Config):加密存储的凭证、选中的资源 ID 列表和额外设置
  • 同步模式 (SyncMode)incremental(增量同步,只同步变更)或 full(全量同步,每次拉取所有)
  • 同步调度 (SyncSchedule):6 段 Cron 表达式(含秒),为空则仅支持手动触发
  • 冲突策略 (ConflictStrategy):源端与本地均有变更时的处理策略
  • 同步删除 (SyncDeletions):源端删除文档时是否同步删除本地知识

Connector — 连接器

连接器是与外部平台交互的适配层,每个平台实现一个连接器。所有连接器通过 ConnectorRegistry 注册和管理。

连接器提供四个核心能力:

能力说明
Validate验证凭证和连接是否可用
ListResources列出外部系统中可选的资源(如知识库空间、文件夹)
FetchAll全量拉取指定资源中的所有文档
FetchIncremental基于游标增量拉取变更的文档

SyncCursor — 同步游标

增量同步的状态标记,记录上次同步的位点。游标内容因连接器类型而异,由连接器自行管理。如飞书连接器的游标包含每个节点的最后编辑时间,用于判断哪些文档发生了变更。

go
type SyncCursor struct {
    LastSyncTime    *time.Time             // 上次同步时间
    ConnectorCursor map[string]interface{} // 连接器自定义状态
    LastSchemaHash  string                 // Schema 变更检测
}

FetchedItem — 拉取到的文档条目

连接器从外部平台拉取文档后,统一封装为 FetchedItem

go
type FetchedItem struct {
    ExternalID       string            // 外部平台的文档唯一标识
    Title            string            // 文档标题
    Content          []byte            // 文档内容(二进制)
    ContentType      string            // MIME 类型
    FileName         string            // 文件名(含扩展名)
    URL              string            // 原始 URL
    UpdatedAt        *time.Time        // 最后更新时间
    Metadata         map[string]string // 元数据(来源、作者等)
    IsDeleted        bool              // 标记为已删除(增量同步)
    SourceResourceID string            // 所属资源 ID
}

Resource — 可选资源

外部系统中可选择同步的资源节点,支持层级结构:

go
type Resource struct {
    ExternalID  string            // 外部 ID
    Name        string            // 名称
    Type        string            // 类型(如 wiki_space)
    Description string            // 描述
    URL         string            // 外部链接
    ModifiedAt  *time.Time        // 最后修改时间
    ParentID    string            // 父节点 ID(支持树形结构)
    Metadata    map[string]string // 附加信息
}

同步处理流程

完整同步流程

手动触发 / Cron 定时触发
        │
        ▼
┌─ 创建 SyncLog (status: running) ──────────────────┐
│  入队异步任务 (asynq: TypeDataSourceSync)           │
│  · 手动同步 → 队列: default                        │
│  · 定时同步 → 队列: low, TaskID 去重               │
└──────────────────────────┬────────────────────────┘
                           │
                           ▼
┌─ ProcessSync (Worker 异步执行) ───────────────────┐
│  1. 解析任务体 (DataSourceSyncPayload)             │
│  2. 加载数据源配置                                  │
│  3. 解密凭证 → 获取 Connector                      │
│  4. 设置租户上下文                                  │
│  5. 创建/获取自动标签 (连接器名·数据源名)            │
│  6. 判断同步模式:                                   │
│     ├─ ForceFull 或 SyncModeFull → FetchAll        │
│     └─ SyncModeIncremental → FetchIncremental      │
│  7. 遍历 FetchedItem 列表:                         │
│     └─ ingestItem → KnowledgeService               │
│  8. 更新 SyncLog (计数、状态、耗时)                 │
│  9. 更新 DataSource (游标、最后同步时间、结果)       │
└──────────────────────────────────────────────────┘

ingestItem — 单文档入库

FetchedItem
    │
    ├─ IsDeleted = true?
    │     └─ 按 external_id 查找已有知识 → 软删除
    │
    ├─ 有文件内容 (Content)?
    │     └─ CreateKnowledgeFromFile
    │        · 按 external_id 查找已有知识
    │        · 已存在 → 删除旧版本 → 重建
    │        · 不存在 → 新建
    │        · metadata: external_id, source_resource_id, datasource_id
    │        · channel: 连接器类型 (如 "feishu")
    │        · 自动关联标签
    │
    └─ 仅有 URL (无 Content)?
          └─ CreateKnowledgeFromURL
             · 同上查重逻辑

数据源生命周期

创建数据源 (前端向导)
        │
        ▼
┌─ DataSourceService.Create ────────────────────┐
│  1. 验证知识库存在                              │
│  2. 获取对应 Connector                         │
│  3. 验证配置和凭证 (connector.Validate)         │
│  4. 加密凭证写库                               │
│  5. 若有 SyncSchedule → 注册 Cron 任务         │
└───────────────────────────────────────────────┘

暂停数据源:
  status → paused, 移除 Cron 任务

恢复数据源:
  status → active, 重新注册 Cron 任务

删除数据源:
  deleted_at = NOW(), 移除 Cron 任务, 级联删除 sync_logs

连接错误:
  ValidateConnection 失败 → status → error, 记录 error_message
  ValidateConnection 成功且原为 error → status → active, 清空 error_message

接口定义

Connector — 连接器接口(必须实现)

go
type Connector interface {
    Type() string
    Validate(ctx context.Context, config *types.DataSourceConfig) error
    ListResources(ctx context.Context, config *types.DataSourceConfig) ([]types.Resource, error)
    FetchAll(ctx context.Context, config *types.DataSourceConfig, resourceIDs []string) ([]types.FetchedItem, error)
    FetchIncremental(ctx context.Context, config *types.DataSourceConfig, cursor *types.SyncCursor) ([]types.FetchedItem, *types.SyncCursor, error)
}
方法职责
Type()返回连接器类型标识,用于注册和路由
Validate()验证凭证和连接可用性,如尝试获取 Access Token
ListResources()列出外部系统中可选的顶层资源(如知识库空间、工作区)
FetchAll()全量拉取指定资源下的所有文档,返回 FetchedItem 列表
FetchIncremental()基于上次游标增量拉取变更,返回变更列表和新游标

DataSourceService — 业务服务接口

go
type DataSourceService interface {
    CreateDataSource(ctx context.Context, ds *types.DataSource) (*types.DataSource, error)
    GetDataSource(ctx context.Context, id string) (*types.DataSource, error)
    ListDataSources(ctx context.Context, knowledgeBaseID string) ([]*types.DataSource, error)
    UpdateDataSource(ctx context.Context, ds *types.DataSource) (*types.DataSource, error)
    DeleteDataSource(ctx context.Context, id string) error
    ValidateConnection(ctx context.Context, dataSourceID string) error
    ValidateCredentials(ctx context.Context, connectorType string, credentials map[string]interface{}) error
    ListAvailableResources(ctx context.Context, dataSourceID string) ([]types.Resource, error)
    ManualSync(ctx context.Context, dataSourceID string) (*types.SyncLog, error)
    PauseDataSource(ctx context.Context, dataSourceID string) error
    ResumeDataSource(ctx context.Context, dataSourceID string) error
    GetSyncLogs(ctx context.Context, dataSourceID string, limit, offset int) ([]*types.SyncLog, error)
    GetSyncLog(ctx context.Context, syncLogID string) (*types.SyncLog, error)
    ProcessSync(ctx context.Context, task *asynq.Task) error
}

ConnectorMetadata — 连接器元数据

go
type ConnectorMetadata struct {
    Type         string   `json:"type"`
    Name         string   `json:"name"`
    Description  string   `json:"description"`
    Icon         string   `json:"icon"`
    Priority     int      `json:"priority"`
    AuthType     string   `json:"auth_type"`
    Capabilities []string `json:"capabilities"`
}
字段说明
Type连接器类型标识(如 feishu
Name显示名称(如 飞书
Priority排序优先级,数值越小越靠前
AuthType认证方式:oauth2 / api_key / token / password / none
Capabilities支持的能力:incremental(增量同步)、webhookdeletion_sync(删除同步)

连接器详解

飞书 (Feishu)

飞书连接器支持从飞书知识库(Wiki)同步文档到 WeKnora。

认证机制

App ID + App Secret
        │
        ▼
  POST /open-apis/auth/v3/tenant_access_token/internal
        │
        ▼
  获取 Tenant Access Token(有效期 2 小时)
        │
        ▼
  Token 缓存,提前 5 分钟刷新
  • 认证类型:企业自建应用,Tenant Access Token
  • Token 有效期:2 小时,内置缓存,提前 5 分钟自动刷新
  • API 地址:默认 https://open.feishu.cn,国际版 Lark 使用 https://open.larksuite.com

资源发现

ListResources 调用飞书 Wiki Space API 列出所有知识库空间:

GET /open-apis/wiki/v2/spaces (分页, page_size=50)

每个知识库空间映射为一个 Resource

字段
Typewiki_space
ExternalID空间 ID (space_id)
URLhttps://feishu.cn/wiki/{space_id}
Metadata.visibility空间可见性

全量同步 (FetchAll)

对每个选中的 Wiki Space:
  1. 递归列举空间下所有节点 (ListAllWikiNodesRecursive)
     └─ GET /open-apis/wiki/v2/spaces/{space_id}/nodes (parent 为空 → 顶层)
     └─ 对 has_child=true 的节点递归
  2. 对每个节点:
     └─ fetchNodeContent → 判断文档类型 → 导出/下载

增量同步 (FetchIncremental)

增量同步基于节点编辑时间对比,而非飞书事件订阅:

1. 加载上次游标 (feishuCursor)
   └─ SpaceNodeTimes[space_id][node_token] = 上次记录的编辑时间

2. 对每个 Space 递归列举所有节点(全树遍历)

3. 对比变更:
   ├─ 新节点(游标中不存在)→ 拉取内容
   ├─ obj_edit_time 变更(优先)或 node_edit_time 变更 → 拉取内容
   ├─ 编辑时间未变 → 跳过
   └─ 游标中存在但本次遍历不存在 → 标记为 IsDeleted

4. 返回新游标(含本轮每个节点的最新编辑时间)

注意:增量同步的 API 调用量与知识库空间的节点树大小相关,因为需要遍历完整节点树来检测变更。对于大型空间,建议适当降低同步频率。

支持的文档类型

obj_type支持获取方式导出格式
docx导出任务 (Export API).docx
doc导出任务 (Export API).docx
sheet导出任务 (Export API).xlsx
bitable导出任务 (Export API).xlsx
file直接下载 (Drive Download API)原格式
mindnote
slides

内容获取流程

文档 (docx/doc/sheet/bitable):

1. CreateExportTask → 创建导出任务
2. 轮询 GetExportTaskStatus(间隔 2 秒,最长约 60 秒)
3. DownloadExportFile → 用 file_token 下载导出文件
4. 清理文件名 (sanitizeFileName) + 补全扩展名

文件 (file):

DownloadDriveFile → GET /drive/v1/files/{token}/download

源码文件

文件职责
internal/datasource/connector/feishu/types.go飞书 API 类型定义、配置结构、常量
internal/datasource/connector/feishu/client.goAPI 客户端:Token 管理、Wiki/Drive API 调用、导出/下载
internal/datasource/connector/feishu/connector.goConnector 接口实现:Validate、ListResources、FetchAll、FetchIncremental
internal/datasource/connector/feishu/connector_test.go单元测试:使用 HTTP Mock 模拟飞书开放平台

定时调度

Cron 调度器

数据源同步调度基于 robfig/cron/v3,使用 6 段 Cron 表达式(含秒)

秒 分 时 日 月 星期

常用预设:

表达式说明
0 */30 * * * *每 30 分钟
0 0 * * * *每小时整点
0 0 */6 * * *每 6 小时
0 0 0 * * *每天午夜
0 0 2 * * *每天凌晨 2 点

调度生命周期

服务启动
    │
    ▼
Scheduler.Start(ctx)
    │
    ├─ 1. 从 DB 加载所有活跃数据源 (FindActive)
    │     条件: status = active, sync_schedule != '', deleted_at IS NULL
    │
    ├─ 2. 为每个数据源注册 Cron Entry
    │
    └─ 3. 启动 Cron Runner

Cron 触发时:
    ├─ 检查是否有运行中的同步 (HasRunningSync)
    │     是 → 跳过本次触发
    │
    ├─ 创建 SyncLog (status: running)
    │
    └─ 入队 asynq 任务
          队列: low
          TaskID: dssync:<dsID>:<UTC minute>(去重)

任务去重(两层保护)

  1. DB 层HasRunningSync 检查该数据源是否已有运行中的同步任务,防止同步耗时超过调度间隔时产生重叠
  2. 队列层:基于 TaskID(格式 dssync:<dataSourceID>:<UTC minute>)去重,同一分钟内多实例触发时只有第一个成功入队,其余标记为 canceled

动态管理

操作行为
创建数据源(含 schedule)注册 Cron Entry
更新数据源 schedule移除旧 Entry → 注册新 Entry
暂停数据源移除 Cron Entry
恢复数据源重新注册 Cron Entry
删除数据源移除 Cron Entry

关键参数与阈值

参数说明
Token 缓存安全余量5 minToken 过期前提前刷新(飞书/企微通用)
飞书 Wiki Space 分页大小50ListResources 每页返回数
飞书导出任务轮询间隔2s飞书 Export API 状态检查间隔
飞书导出任务最大等待~60s导出任务超时时间
企微文档列表分页大小50wedoc/doc_list 每页返回数
企微微盘文件列表分页大小1000wedrive/file_list 每次拉取最大数
企微 access_token 有效期7200s2 小时,缓存提前 5 分钟刷新
同步日志保留天数30 天默认值,可配置
同步日志默认分页10GetSyncLogs 默认 limit
Cron 表达式格式6 段含秒robfig/cron WithSeconds()
定时同步队列lowasynq 低优先级队列
手动同步队列defaultasynq 默认队列

错误处理

场景处理策略
凭证验证失败返回错误信息,数据源状态标记为 error,记录 error_message
凭证验证成功(原为 error)恢复状态为 active,清空 error_message
连接器未找到返回 ErrConnectorNotFound
知识库不存在创建/更新时校验,返回 ErrKnowledgeBaseNotFound
数据源不存在返回 ErrDataSourceNotFound
数据源非活跃状态触发同步仅允许 activeerror 状态手动同步
同步任务入队失败SyncLog 标记为 failed,记录错误
定时同步任务去重SyncLog 标记为 canceled,错误信息 deduplicated: another instance enqueued first
单文档拉取失败记录错误到 FetchedItem.Metadata,不中断整体同步
单文档入库失败计入 items_failed,不中断整体同步
重复文档 (DuplicateKnowledgeError)计入 items_skipped
不支持的文档类型静默跳过(如飞书的 mindnote、slides)

扩展新连接器

接入新的外部平台只需 3 步:

1. 实现 Connector 接口

internal/datasource/connector/<type>/ 下创建连接器:

go
package myplatform

type Connector struct {
    // 连接器配置
}

func NewConnector() *Connector {
    return &Connector{}
}

func (c *Connector) Type() string {
    return types.ConnectorTypeMyPlatform
}

func (c *Connector) Validate(ctx context.Context, config *types.DataSourceConfig) error {
    // 验证凭证:尝试获取 Token 或调用 API
}

func (c *Connector) ListResources(ctx context.Context, config *types.DataSourceConfig) ([]types.Resource, error) {
    // 列出可选资源(工作区、空间、文件夹等)
}

func (c *Connector) FetchAll(ctx context.Context, config *types.DataSourceConfig, resourceIDs []string) ([]types.FetchedItem, error) {
    // 全量拉取所有文档
}

func (c *Connector) FetchIncremental(ctx context.Context, config *types.DataSourceConfig, cursor *types.SyncCursor) ([]types.FetchedItem, *types.SyncCursor, error) {
    // 增量拉取变更(可复用 FetchAll 逻辑 + 编辑时间对比)
}

建议的文件结构:

internal/datasource/connector/myplatform/
├── types.go        # 平台 API 类型定义
├── client.go       # API 客户端(Token 管理、HTTP 调用)
└── connector.go    # Connector 接口实现

2. 注册连接器

internal/container/container.goinitConnectorRegistry 中注册:

go
func initConnectorRegistry() *datasource.ConnectorRegistry {
    registry := datasource.NewConnectorRegistry()
    registry.Register(feishuConnector.NewConnector())
    registry.Register(myplatform.NewConnector()) // 新增
    return registry
}

internal/datasource/connector.goConnectorMetadataRegistry 中添加元数据:

go
types.ConnectorTypeMyPlatform: {
    Type:         types.ConnectorTypeMyPlatform,
    Name:         "My Platform",
    Description:  "Import documents from My Platform",
    Icon:         "myplatform",
    Priority:     10,
    AuthType:     "api_key",
    Capabilities: []string{"incremental"},
},

internal/types/datasource.go 中添加类型常量:

go
const ConnectorTypeMyPlatform = "myplatform"

3. 前端添加配置

frontend/src/views/knowledge/settings/DataSourceEditorDialog.vue 中:

  • 添加连接器类型选项
  • 添加该平台的凭证表单字段
  • 标记 available: true

在 i18n 文件中添加平台名称翻译。

Service 层(DataSourceService)不需要修改 — 数据源管理、同步调度、内容入库、日志记录全部由 Service 统一处理。可参考飞书连接器 (internal/datasource/connector/feishu/) 作为实现参考。