docs/develop/agent-go-port-design.md
Last cross-checked against code: 2026-06-17
Source of truth: internal/agent/ (canvas, component, tool, runtime, workflowx, dsl, audio, sandbox, observability/otel/) + internal/service/ (agent.go, canvas_decode.go) + internal/handler/ (agent.go) + tools/ (gen-component-parity, migrate-canvas)
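Port RAGFlow's Agent Canvas (the DSL executor that orchestrates 22 components + 21 tools) from Python to Go. The Python side lives in agent/canvas.py (Graph / Canvas) + agent/component/base.py (ComponentBase / ComponentParamBase) + agent/tools/. The Go side is an independent implementation under internal/agent/ and stays compatible with Python through a shared DSL JSON schema (the v1↔v2 bidirectional converter in internal/agent/dsl/ has since been collapsed into a single wire format).
State + Workflow hybrid: eino's compose.Workflow provides the declarative topology (nodes + exec edges) and concurrent scheduling; compose.WithGenLocalState + WithStatePreHandler/WithStatePostHandler provide "state variables" that let any node read any other node's output. State solves arbitrary {{cpn_id@param}} cross-references; Workflow handles execution topology, cancellation and checkpointing.
Five-tier porting strategy: T1 (reuse eino built-ins directly) → T2 (thin wrapper) → T3 (Lambda + State) → T4 (nested Workflow subgraph) → T5 (heavy I/O + third-party libraries). Decision rule: when functionality is equivalent, prefer eino built-ins, and never copy the Python side's black magic (_feeded_deprecated_params, the partial hack, thread_pool_exec async masquerading, etc.).
Checkpoints live in Redis: eino's compose.CheckPointStore is a pure KV interface, which Redis String + EXPIRE fits naturally. Business metadata (status / canvas_id / parent_run_id) goes into a separate Redis Hash, controlled explicitly by the application layer rather than written automatically by eino.
Observability uses OpenTelemetry: an OTLP HTTP exporter plus an eino callbacks.Handler that injects spans.
Zero tolerance for AGPL-3: for the T5 DOCX writer, every library surveyed was either AGPL-3 or unmaintained, so we implement our own OOXML writer (archive/zip stdlib + text/template). PDF uses signintech/gopdf (MIT), Excel uses xuri/excelize/v2 (BSD-3), Markdown uses yuin/goldmark (MIT).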
Wait-for-User uses eino's native interrupt: the home-grown sentinel chain (__wait_for_user__ / _user_input_provided / synthetic Loop / cycle_wrap.go / wait_for_user.go) is removed in favor of the first-class compose.Interrupt + compose.ResumeWithData APIs; inside the node, compose.GetResumeContext[T](ctx) reads the user input.
Real Compile/Invoke in the production path: buildRunFunc drives the actual canvas.Compile → CompiledCanvas.Invoke flow.
Before adding any new component, runtime abstraction, or third-party dependency, every phase must check whether the capability already exists elsewhere in the codebase or its declared dependencies.
Decision order (apply in sequence; first match wins):
1. If existing code such as internal/entity/models/anthropic.go, internal/handler/chat_session.go, or similar already has the capability, wire it through; don't reimplement it.
2. If an existing mechanism such as compose.NewGraphMultiBranch, workflowx.AddLoopNode, or internal/utility/mcp_client.go already provides it, build a thin adapter.
3. If the library is already declared in go.mod (even as // indirect), the work is to import it directly and use it.

Anti-patterns explicitly rejected:
- ❌ Adding InvokeAsync to the Component interface (it would compete with eino compose.Parallel)
- ❌ Registering LoopItem / ExitLoop as components
- ❌ Reimplementing Python's runtime path extension in Go
- ❌ Building a new MCP subsystem
- ❌ "Introducing" gonja (it is already a declared dependency)
```
internal/agent/
├── canvas/ # Canvas executor (eino compilation, state scheduling, checkpoint, cancel, stream, interrupt)
│ ├── canvas.go # Canvas struct, BuildWorkflow, Run/Stream
│ ├── runner.go # canvas.Runner; SSE event emission + interrupt catch
│ ├── scheduler.go # State pre/post handlers + node lambdas + legacyNoOpNames
│ ├── node_body.go # single-node lambda body (per-class timeout via resolveTimeout)
│ ├── timeout.go # componentDefaults map; 4-level resolver (per-class env → per-class table → uniform env → 600s fallback)
│ ├── loop_subgraph.go # Loop macro expansion (buildSubWorkflow + translateLoopCondition)
│ ├── interrupt_resume.go # eino interrupt wrappers: UserFillUpNodeBody / IsInterruptError / ExtractInterruptContexts
│ ├── multibranch.go # eino MultiBranch integration for Switch / Categorize routing
│ ├── cancel.go # Redis cancel protocol (watchCancel goroutine)
│ ├── stream.go # SSE channel
│ ├── compile.go # eino compilation + WithCheckPointStore + checkPointAdapter (does not override InternalSerializer)
│ ├── checkpoint_store.go # RedisCheckPointStore (Get/Set/Delete); the interface includes Delete
│ ├── run_tracker.go # RunTracker (Start/MarkSucceeded/MarkFailed/MarkCancelled/AttachCheckpoint)
│ ├── state_serializer.go # CanvasStateSerializer (encoding/json)
│ └── state_export.go # thin re-exports of WithState / GetStateFromContext
│
├── component/ # 19 components + 6 helpers (incl. fixture_stubs.go + universe_a_wrappers.go)
│ ├── base.go # Component interface + ParamError + ErrNotImplemented
│ ├── registry.go # name → factory mapping (auto-init)
│ ├── runtime_wire.go # bridge between components and the runtime package
│ ├── io_init.go # T5 component initialization
│ ├── fixture_stubs.go # IterationStub / IterationItemStub / RetrievalStub / SearchMyDataset alias / ExeSQLStub
│ ├── universe_a_wrappers.go # newRetrievalComponent / newExeSQLComponent / newTavilySearchComponent: Universe A → Universe B delegation
│ ├── production_chain_fixes_test.go # production-chain regression pin tests
│ ├── agent.go # T1: react.NewAgent + tool artifact capture + maybeAppendCitation + Reset() interface-assert
│ ├── llm.go # T1: thin EinoChatModel wrapper; VisualFiles / Cite / MessageHistoryWindowSize / ChatTemplateKwargs / OutputStructure / JSONOutput / TopP / MaxRetries / DelayAfterError
│ ├── llm_retry.go # retryInvoker + Unwrap(); unwrapChatInvoker helper
│ ├── switch.go # T2: 12 of 12 operators (==/!=/contains/not contains/start with/end with/empty/not empty/>/</>=/<=)
│ ├── begin.go / message.go / categorize.go / invoke.go / browser.go
│ ├── data_operations.go / list_operations.go / string_transform.go
│ ├── variable_aggregator.go / variable_assigner.go
│ ├── fillup.go / userfillup.go
│ ├── loop.go # T4: no-op marker; the actual work is taken over by loop_subgraph
│ ├── parallel.go # T4: workflowx.AddParallelNode wrapper
│ ├── docs_generator.go / excel_processor.go # T5
│ └── render.go # output_format HTML/Markdown/plain renderer
│
├── tool/ # 21 tools (all implement eino tool.InvokableTool)
│ ├── registry.go # BuildAll / BuildByName (alias: exesql=execute_sql, retrieval=search_my_dateset=search_my_dataset)
│ ├── http_helper.go # shared HTTP client (context + retry + backoff)
│ ├── ssrf.go # SSRF protection
│ ├── mcp.go # MCPToolAdapter: InvokableRun calls mcpclient.CallTool over streamable-HTTP
│ ├── retrieval.go / retrieval_service.go / retrieval_nlp.go / retrieval_kg.go # RetrievalService with two registries: nlp + kg
│ ├── sandbox_bridge.go # bridge to the CodeExec sandbox providers
│ └── akshare.go / arxiv.go / code_exec.go / code_exec_client.go / crawler.go / deepl.go
│ / duckduckgo.go / email.go / exesql.go / github.go / google.go
│ / google_scholar.go / jin10.go / pubmed.go / qweather.go
│ / searxng.go / tavily.go / tushare.go / wencai.go / wikipedia.go / yahoo_finance.go
│
├── runtime/ # runtime contracts shared by canvas + component (no import cycle)
│ ├── component.go # Component interface (extracted from component/base.go)
│ ├── context.go # GetStateFromContext / withState
│ ├── state.go # CanvasState + NewCanvasState + GetVar/SetVar/ReadVars + MarshalJSON/UnmarshalJSON + compose.RegisterSerializableType
│ ├── template.go # ResolveTemplate (regex fast path)
│ ├── template_jinja.go # gonja fallback
│ ├── selector.go # component selector helpers
│ └── metrics.go # runtime metrics + Prometheus counters
│
├── workflowx/ # eino extensions (zero-intrusion external helpers)
│ ├── loop.go # AddLoopNode[T]: generic do-while loop node
│ ├── parallel.go # AddParallelNode[I,O]: generic bounded-concurrency node
│ └── *_test.go # unit + integration tests
│
├── sandbox/ # CodeExec sandbox providers
│ ├── provider.go / manager.go / http.go / result_protocol.go / artifacts.go
│ ├── self_managed.go / aliyun.go / e2b.go / local.go / ssh.go
│ └── e2b_test.go / local_test.go / manager_test.go / result_protocol_test.go / self_managed_test.go / ssh_test.go
│
├── audio/ # TTS
│ ├── tts.go # Synthesizer interface + error sentinels + default stub
│ ├── model_provider_synthesizer.go # calls models.BaseModel.AudioSpeech (60+ driver impls)
│ ├── tts_dispatch.go # TTSDispatcher interface + NewTTSDispatchFunc
│ └── *_test.go
│
├── observability/otel/ # OTel SDK + eino callbacks.Handler
│ ├── provider.go # TracerProvider factory
│ └── handler.go # eino callbacks.Handler → OTel span
│
└── dsl/ # DSL normalization
    ├── normalize.go # NormalizeForCanvas (enforceHandleIds / buildGraphFromComponents / foldLegacyLoopVariants)
    ├── normalize_test.go
    └── testdata/ # 7 fixtures (all / browser / dfx_picture_parser / questions_category / resume / subaget / switch)
```
```
internal/handler/
├── agent.go # HTTP API (RunAgent SSE with RunEvent.Type dispatch)
├── agent_wait_for_user_test.go # 4 e2e tests pinning wait-for-user orchestrator side
└── admin_runtime.go # POST /api/v1/admin/canvas-runtime/:tenant_id

internal/service/
├── agent.go # AgentService.RunAgent / buildRunFunc / NewAgentService[WithOptions] / option injection
├── canvas_decode.go # decodeCanvasFromDSL
├── canvas_decode_test.go
├── agent_run_e2e_test.go # 4 e2e tests
└── agent_sessions.go # session CRUD

cmd/server_main.go # Redis CheckPointStore + RunTracker + TTS service wire-up

internal/observability/otel/
├── provider.go # TracerProvider factory (reads OTEL_EXPORTER_OTLP_ENDPOINT)
├── handler.go # eino callbacks.Handler → OTel span
└── handler_test.go # tracetest.SpanRecorder
```

Actual file counts:
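eino's compose.Workflow itself only supports DAGs (data passes between nodes through the outputs of declared predecessors); there is no ready-made API for "any node reads any node's output". The Python side of RAGFlow implements arbitrary {{cpn_id@param}} cross-references with self._canvas.get_variable_value("cpn_id@param").
Go-side approach: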
- *CanvasState, carried on context.Value. Every node reads and writes it through runtime.GetStateFromContext(ctx).
- wf.AddLambdaNode(...) attaches compose.WithStatePreHandler[map[string]any, *runtime.CanvasState](canvasPre), which extracts the node's input from State.
- compose.WithStatePostHandler writes the node's output back into State.
- downstream / upstream become exec edges only; data flows through State, not along the edges.

```go
// internal/agent/canvas/scheduler.go
node := wf.AddLambdaNode(cpnID, nodeBody,
	compose.WithStatePreHandler[map[string]any, *runtime.CanvasState](canvasPre),
	compose.WithStatePostHandler[map[string]any, *runtime.CanvasState](canvasPost),
)
for _, upID := range comp.Upstream {
	node.AddInput(upID)
}
```
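To make "data flows through State, not along edges" concrete, here is a stdlib-only sketch of how a pre-handler could resolve {{cpn_id@param}} references against one shared output map. State, SetOutput and Resolve are hypothetical stand-ins, not runtime.CanvasState or ResolveTemplate, and leaving unresolved references verbatim is an assumption.

```go
package main

import (
	"fmt"
	"regexp"
	"sync"
)

// State is a hypothetical stand-in for runtime.CanvasState: every node's
// outputs live in one map keyed by component ID, so any node can read any other.
type State struct {
	mu      sync.RWMutex
	outputs map[string]map[string]any // cpnID -> param -> value
}

func NewState() *State { return &State{outputs: map[string]map[string]any{}} }

// SetOutput is what a post-handler does after a node finishes.
func (s *State) SetOutput(cpnID, param string, v any) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.outputs[cpnID] == nil {
		s.outputs[cpnID] = map[string]any{}
	}
	s.outputs[cpnID][param] = v
}

// refPattern matches {{cpn_id@param}}; component IDs such as "Agent:Foo" contain no '@'.
var refPattern = regexp.MustCompile(`\{\{\s*([^@{}\s]+)@([^{}\s]+)\s*\}\}`)

// Resolve is what a pre-handler does before a node runs: substitute every
// reference, whether or not the referenced node is a direct predecessor.
func (s *State) Resolve(tmpl string) string {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return refPattern.ReplaceAllStringFunc(tmpl, func(m string) string {
		sub := refPattern.FindStringSubmatch(m)
		if v, ok := s.outputs[sub[1]][sub[2]]; ok {
			return fmt.Sprint(v)
		}
		return m // unresolved references are left verbatim (assumption)
	})
}

func main() {
	s := NewState()
	s.SetOutput("Agent:Foo", "content", "hello")
	fmt.Println(s.Resolve("Answer: {{Agent:Foo@content}}"))
}
```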
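CanvasState serialization:
CanvasState contains a sync.RWMutex, so it cannot simply be handed to encoding/json: marshaling has to happen under the lock, and the mutex must never be copied or exposed. This is handled by:
- MarshalJSON / UnmarshalJSON methods, which write/read an internal canvasStateJSON struct (the mutex is never exposed).
- compose.RegisterSerializableType[CanvasState], which lets eino's StatePre/PostHandlers marshal/unmarshal the state on the interrupt path.

eino's InternalSerializer is a separate, independent serialization mechanism (for eino's internal checkpoint payload) and is not shared with WithStateSerializer / compose.Serializer. Production code wires only WithCheckPointStore (keeping eino's default InternalSerializer) plus CanvasState's own MarshalJSON.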
The runtime package: removing the canvas <-> component import cycle. Many files in component/ (Begin/Message/Switch/Browser/...) need canvas.CanvasState / canvas.GetStateFromContext / canvas.ResolveTemplate / canvas.SetDefaultFactory, while canvas depends indirectly on the concrete component implementations through ComponentFactory. Forcing a canvas -> component import would create a Go import cycle.
Solution: extract the shared runtime contracts into internal/agent/runtime/. canvas and component both depend on runtime, but not on each other.
| Extracted to runtime | Stays in canvas | Stays in component |
|---|---|---|
| Component interface | DSL graph types (Canvas, CanvasComponent, CanvasComponentObj) | component registry + factory |
| CanvasState + GetVar/SetVar/ReadVars + MarshalJSON | topology construction (BuildWorkflow, buildLoopExpansion, scheduler wiring) | concrete component implementations |
| GetStateFromContext / withState / WithState | checkpoint / workflow compilation orchestration | NewBeginComponent, NewMessageComponent, ... |
| ResolveTemplate + template_jinja (gonja fallback) | Loop macro-expansion logic | |
| ParamError, ErrNotImplemented | | |
```
UserFillUp node → compose.Interrupt(ctx, inputSpec)
  ↓
returns *InterruptSignal (implements error)
  ↓
graph engine catches it → automatic checkpoint → propagates upward
  ↓
Runner.Run catches it → SSE "waiting_for_user" + saves the interrupt ID
  ↓
user submits → Runner.Run injects __resume_interrupt_id__ + __resume_data__
  ↓
buildRunFunc consumes them → compose.ResumeWithData(ctx2, id, data)
  ↓
node re-entered → compose.GetResumeContext[T](ctx) at the top → returns the user input
```
Core implementation:

```go
// internal/agent/canvas/interrupt_resume.go
package canvas

import (
	"context"
	"errors"

	"github.com/cloudwego/eino/compose"
)

// UserFillUpNodeBody returns the lambda body for a UserFillUp node.
// buildInputSpec (defined elsewhere in the package) describes the form shown to the user.
func UserFillUpNodeBody(cpnID string, params map[string]any) func(ctx context.Context, input map[string]any) (map[string]any, error) {
	inputSpec := buildInputSpec(params)
	return func(ctx context.Context, input map[string]any) (map[string]any, error) {
		// Resume path: when the node is re-entered, check the resume context first.
		if isResume, hasData, data := compose.GetResumeContext[any](ctx); isResume && hasData {
			return map[string]any{
				"user_input": data,
				cpnID:        data,
			}, nil
		}
		// First execution: call Interrupt to pause the graph.
		if err := compose.Interrupt(ctx, inputSpec); err != nil {
			return nil, err
		}
		return nil, errors.New("UserFillUp: interrupt did not halt execution")
	}
}
```
Interrupt catch in Runner.Run (internal/agent/canvas/runner.go):

```go
if info, ok := compose.ExtractInterruptInfo(runErr); ok {
	ctxs := info.InterruptContexts // []*compose.InterruptCtx
	if len(ctxs) > 0 {
		d.saveInterruptID(canvasID, sessionID, ctxs[0].ID)
		payload, _ := json.Marshal(WaitingForUserEvent{CpnID: ctxs[0].ID})
		push(out, RunEvent{Type: "waiting_for_user", Data: string(payload)})
		return
	}
}
```

Passing resume data (buildRunFunc):

```go
if resumeID, ok := root["__resume_interrupt_id__"].(string); ok && resumeID != "" {
	resumeData := root["__resume_data__"]
	delete(root, "__resume_interrupt_id__")
	delete(root, "__resume_data__")
	ctx2 = compose.ResumeWithData(ctx2, resumeID, resumeData)
}
```
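Cycle handling: the frontend contract guarantees that production canvases are acyclic (hasCanvasCycle blocks saving a cyclic canvas), and eino's DAG check rejects cyclic graphs automatically at Compile(), so no extra defense is needed.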
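For reference, the check hasCanvasCycle performs on the frontend amounts to a three-colour depth-first search over the downstream lists. A Go version is sketched below as a hypothetical helper (hasCycle is not part of the codebase), for example in case a server-side guard were ever wanted for DSLs that bypass the editor.

```go
package main

import "fmt"

// hasCycle reports whether a component graph (id -> downstream ids) contains a
// cycle, using a three-colour depth-first search. Hypothetical helper.
func hasCycle(downstream map[string][]string) bool {
	const white, grey, black = 0, 1, 2 // unvisited, on the DFS stack, finished
	color := map[string]int{}
	var visit func(n string) bool
	visit = func(n string) bool {
		color[n] = grey
		for _, m := range downstream[n] {
			switch color[m] {
			case grey: // back edge: m is still on the stack
				return true
			case white:
				if visit(m) {
					return true
				}
			}
		}
		color[n] = black
		return false
	}
	for n := range downstream {
		if color[n] == white && visit(n) {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(hasCycle(map[string][]string{"begin": {"llm"}, "llm": {"message"}}))
	fmt.Println(hasCycle(map[string][]string{"a": {"b"}, "b": {"a"}}))
}
```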
```go
// internal/service/agent.go: buildRunFunc
func (s *AgentService) buildRunFunc(canvasID string, versionRow *entity.UserCanvasVersion, dsl map[string]any) canvas.RunFunc {
	return func(ctx context.Context, root map[string]any) (*canvas.CanvasState, error) {
		if err := ctx.Err(); err != nil {
			return nil, err
		}
		taskID := ""
		if versionRow != nil {
			taskID = versionRow.ID
		}
		c, err := decodeCanvasFromDSL(dsl)
		if err != nil {
			return nil, err
		}
		runID := canvasID
		if sessionID, ok := root["session_id"].(string); ok && sessionID != "" {
			runID = runID + "-" + sessionID
		}
		state := canvas.NewCanvasState(runID, taskID)
		userInput, _ := root["user_input"].(string)
		state.Sys["query"] = userInput
		ctx2 := runtime.WithState(ctx, state)
		if resumeID, ok := root["__resume_interrupt_id__"].(string); ok && resumeID != "" {
			resumeData := root["__resume_data__"]
			delete(root, "__resume_interrupt_id__")
			delete(root, "__resume_data__")
			ctx2 = compose.ResumeWithData(ctx2, resumeID, resumeData)
		}
		if s.runTracker != nil {
			_ = s.runTracker.Start(ctx2, runID, canvasID, tenantIDFromRoot(root), userInput)
		}
		var cc *canvas.CompiledCanvas
		if s.checkpointStore != nil && s.stateSerializer != nil {
			cc, err = canvas.Compile(ctx2, c,
				canvas.WithCheckPointStore(s.checkpointStore),
				canvas.WithStateSerializer(s.stateSerializer),
			)
		} else {
			cc, err = canvas.Compile(ctx2, c)
		}
		if err != nil {
			s.markRunFailed(ctx2, runID, "compile: "+err.Error())
			return nil, fmt.Errorf("canvas compile: %w: %w", err, ErrAgentStorageError)
		}
		if s.runTracker != nil {
			_ = s.runTracker.AttachCheckpoint(ctx2, runID, runID)
		}
		_, err = cc.Workflow.Invoke(ctx2, map[string]any{"query": userInput})
		if err != nil {
			if canvas.IsInterruptError(err) || canvas.ExtractInterruptContexts(err) != nil {
				s.markRunFailed(ctx2, runID, "interrupt: "+err.Error())
				return state, err
			}
			s.markRunFailed(ctx2, runID, "invoke: "+err.Error())
			return nil, fmt.Errorf("canvas invoke: %w: %w", err, ErrAgentStorageError)
		}
		s.markRunSucceeded(ctx2, runID)
		return state, nil
	}
}
```
AgentService option injection (internal/service/agent.go):

```go
type AgentService struct {
	// ... existing fields
	checkpointStore canvas.CheckPointStore // nil = in-memory (test path)
	stateSerializer canvas.StateSerializer // nil = eino default
	runTracker      *canvas.RunTracker     // nil = best-effort, no tracking
	runner          *canvas.Runner
}

func NewAgentService() *AgentService {
	return NewAgentServiceWithOptions(nil, nil, nil)
}

func NewAgentServiceWithOptions(
	cp canvas.CheckPointStore,
	ser canvas.StateSerializer,
	rt *canvas.RunTracker,
) *AgentService {
	return &AgentService{...}
}
```
Production boot wiring (cmd/server_main.go):

```go
// SetRedisCheckPointStore + CanvasStateSerializer + RunTracker → NewAgentServiceWithOptions
// + configureTTSSynthesizer (audio.SetModelProviderSynthesizer)
// Graceful degradation when Redis is unreachable: fall back to in-memory (nil options)
```
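DSL decoder (internal/service/canvas_decode.go):
decodeCanvasFromDSL accepts two shapes:
- obj.component_name / obj.params (written directly by the Python v1 DSL)
- name / params (the production path, after NormalizeForCanvas)

No JSON round-trip is used: walking the map directly is clearer, because the production path has already been flattened by NormalizeForCanvas. Every failure mode wraps ErrAgentStorageError.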
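| Tier | Meaning | Acceptance |
|---|---|---|
| T1 | Use existing eino types/interfaces directly, zero code | covered by eino unit tests |
| T2 | Thin wrapper: 1 struct + factory, parameters aligned with Python behavior | test across the eino/RAGFlow boundary + 1 e2e |
| T3 | compose.Lambda + StatePre/PostHandler | 1 unit test + 1 e2e |
| T4 | nested compose.Workflow + getState[CanvasState](ctx) | subgraph unit tests + full e2e |
| T5 | heavy I/O + third-party libraries | unit tests + e2e + fault injection |
Decision rule: prefer tiers in the order T1 > T2 > T3 > T4 > T5; do not skip to a higher tier when a lower one fits.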
| Component | Python behavior | Tier | Go implementation | Status |
|---|---|---|---|---|
| LLM | LLMBundle single-turn chat + JSON output + cite + stream | T1 | thin EinoChatModel wrapper over internal/entity/models/<provider>.go; implements model.ToolCallingChatModel; retryInvoker.Unwrap() + unwrapChatInvoker give normal-absolute-count retry semantics | ✅ |
| Agent | ReAct + tool/MCP + multi-turn stream | T1 | react.NewAgent + compose.ToolsNodeConfig{Tools: tools} + all 22 tools registered; citation middleware + tool-artifact collection implemented; Reset() via an interface{ Reset() } type assertion | ✅ |
| Switch | multiple conditions (and/or) → multiple downstreams + ELSE | T2 | compose.NewGraphMultiBranch routing; 12 of 12 operators (==/!=/contains/not contains/start with/end with/empty/not empty/>/</>=/<=) + case-insensitive string ops | ✅ |
| Categorize | LLM classification + routing | T3 | Lambda calling the LLM + compose.NewGraphMultiBranch | ✅ |
| Begin | DSL entry + input injection + file inputs | T3 | Lambda + StatePreHandler; files go through internal/service/file_service.go | ✅ |
| UserFillUp / Fillup | Jinja2 + file inputs + wait-for-user interrupt | T3 | text/template in place of Jinja2 + eino interrupt via interrupt_resume.go | ✅ |
| Message | final output (jinja2 + stream + downloads + filegen + TTS + memory) | T3 | Lambda + schema.StreamReader + text/template + MinIO + TTS dispatch + MemorySaver | 🟡 real TTS binary + MemorySaver completion deferred |
| Invoke | HTTP client + HTML cleaning + JSON | T3 | net/http + golang.org/x/net/html | ✅ |
| Browser | LLM + HTTP + file download + MinIO | T3 | reuses Invoke + LLM + storage | ✅ |
| DataOperations | 7 kinds of dict operations | T3 | Lambda + encoding/json + go/ast | ✅ |
| ListOperations | 6 kinds of slice operations | T3 | Lambda + slices (Go 1.21+ stdlib) | ✅ |
| StringTransform | split/merge + Jinja2 | T3 | Lambda + strings.Split + text/template | ✅ |
| VariableAggregator | multiple groups, first-non-empty | T3 | Lambda + State reads | ✅ |
| VariableAssigner | 11 operators that modify State in place | T3 | Lambda + State writes | ✅ |
| Loop | conditional loop + loop_variables initialization + termination evaluation | T4 | compose.NewWorkflow + workflowx.AddLoopNode (loop.go itself becomes a no-op marker; the actual work is taken over by the macro expansion in canvas/loop_subgraph.go) | ✅ |
| Parallel | parallel processing over an array | T4 | workflowx.AddParallelNode wrapper | ✅ |
| DocsGenerator | pdf/docx/txt/md/html generation | T5 | signintech/gopdf (PDF) + self-implemented OOXML writer (DOCX) + yuin/goldmark (MD); render.go provides HTML/Markdown/plain rendering | 🟡 txt/md/html writers partially missing |
| ExcelProcessor | pandas read/merge/transform of Excel | T5 | xuri/excelize/v2 (BSD-3) | ✅ |
| Retrieval (Universe A) | canvas DAG node | T2 | newRetrievalComponent, delegating to the Universe B RetrievalTool | ✅ |
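The 12 Switch operators can be sketched as one function. This is a hypothetical evalOp, not switch.go. It assumes every string comparison (including == and !=) is case-folded via strings.ToLower, and that the four ordering operators parse both operands as float64 and return an error otherwise.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// evalOp is a hypothetical sketch of the 12 Switch operators. Assumptions:
// string operators compare case-folded text; ordering operators parse floats.
func evalOp(op string, left any, right string) (bool, error) {
	l := strings.ToLower(fmt.Sprint(left))
	r := strings.ToLower(right)
	switch op {
	case "==":
		return l == r, nil
	case "!=":
		return l != r, nil
	case "contains":
		return strings.Contains(l, r), nil
	case "not contains":
		return !strings.Contains(l, r), nil
	case "start with":
		return strings.HasPrefix(l, r), nil
	case "end with":
		return strings.HasSuffix(l, r), nil
	case "empty":
		return left == nil || l == "", nil
	case "not empty":
		return left != nil && l != "", nil
	case ">", "<", ">=", "<=":
		a, err := strconv.ParseFloat(l, 64)
		if err != nil {
			return false, fmt.Errorf("left operand %q: %w", l, err)
		}
		b, err := strconv.ParseFloat(r, 64)
		if err != nil {
			return false, fmt.Errorf("right operand %q: %w", r, err)
		}
		switch op {
		case ">":
			return a > b, nil
		case "<":
			return a < b, nil
		case ">=":
			return a >= b, nil
		default: // "<="
			return a <= b, nil
		}
	}
	return false, fmt.Errorf("unknown operator %q", op)
}

func main() {
	ok, _ := evalOp("start with", "Hello World", "hello")
	fmt.Println(ok)
}
```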
| Python side | Why it is not ported |
|---|---|
| _feeded_deprecated_params / _deprecated_params / _user_feeded_params three-layer decoration | removed in DSL v2; not introduced into Go's ComponentParamBase |
| ComponentParamBase.validate() + param_validation/*.json (96 files) | replaced by Go struct tags + go-playground/validator/v10 |
| ComponentBase.thread_limiter = asyncio.Semaphore(...) | Go errgroup.SetLimit(MAX_CONCURRENT_CHATS) (golang.org/x/sync, not part of the standard library) |
| partial streaming hack | eino schema.StreamReader native streaming |
| thread_pool_exec(self._invoke, **kwargs) async masquerade | plain goroutines throughout in Go |
| set_output("_ERROR", ...) + set_exception_default_value() dual track | a single Go error return + eino OnError callback |
| ExitLoop no-op node | DSL v1 compat absorbed at the canvas layer via legacyNoOpNames; no component registered |
| LoopItem component | its role is taken over by workflowx.AddLoopNode's internal machinery; no component registered; TestLoop_Registered enforces absence |
| Iteration / IterationItem components | IterationStub + IterationItemStub registered as compat stubs (DSL round-trip) |
```
┌──────────────────────────────────────────────────────────────┐
│ Universe A — Canvas DAG Components │
│ Registry: internal/agent/component/registry.go (auto-init) │
│ Interface: Component { Invoke, Stream, Inputs, Outputs } │
│ Output: map[string]any │
│ Names: PascalCase — Retrieval, TavilySearch, ExeSQL, │
│ Answer, Generate, Begin, LLM, Switch, … │
│ Used by: Canvas DAG nodes (placed on the canvas directly) │
├──────────────────────────────────────────────────────────────┤
│ Universe B — Agent ReAct Tools │
│ Registry: internal/agent/tool/registry.go │
│ Interface: einotool.BaseTool { Info, InvokableRun } │
│ Output: JSON string (envelope) │
│ Names: snake_case — retrieval, tavily, execute_sql, … │
│ Used by: Agent component's tools=["…"] list, called via │
│ eino ReAct loop │
└──────────────────────────────────────────────────────────────┘
```
Mapping table:
| Universe A (PascalCase) | Universe B (snake_case) | Current status |
|---|---|---|
| Retrieval | retrieval / search_my_dateset / search_my_dataset | delegates to the real Universe B tool (nlp + kg backends) |
| ExeSQL | execute_sql / exesql | delegates to the real Universe B tool (mysql/pg/mssql/oceanbase/trino) |
| TavilySearch | tavily | delegates to the real Universe B tool |
| Answer | — | needs orchestrator-side pause/resume (implemented via eino interrupt) |
| Generate | — | alias to the LLM component |
| SearchMyDataset | — | registered as a Retrieval alias (4 spellings: PascalCase + snake_case + Python typo) |
```go
// internal/agent/tool/registry.go
type Tool interface {
	einotool.InvokableTool // eino contract: Info() / InvokableRun(ctx, args, opts)
}

// Signatures only (bodies omitted):
func BuildAll(names []string, params map[string]map[string]any) ([]einotool.BaseTool, error)
func BuildByName(name string, params map[string]any) (einotool.BaseTool, error)
```
21 tools (aliases are not counted as separate tools): akshare, arxiv, code_exec, crawler, deepl, duckduckgo, email, exesql(=execute_sql), github, google, google_scholar, jin10, pubmed, qweather, retrieval(=search_my_dateset=search_my_dataset), searxng, tavily, tushare, wencai, wikipedia, yahoo_finance.
Retrieval has two registries:
- internal/agent/tool/retrieval_nlp.go: NLPRetrievalAdapter bridges nlp.RetrievalService
- internal/agent/tool/retrieval_kg.go: KGRetrievalAdapter bridges kg.Retrieval(...) (GraphRAG, use_kg=true)
- internal/agent/tool/retrieval_service.go: two independent registries, SetRetrievalService / SetKGRetrievalService; when un-wired they return ErrRetrievalServiceMissing / ErrKGRetrievalServiceMissing

MCP tools: in internal/agent/tool/mcp.go, MCPToolAdapter.InvokableRun dispatches through mcpclient.CallTool over streamable-HTTP.
Common tool patterns: HTTP tools go through http_helper.go (context + retry + exponential backoff); ExeSQL uses stdlib database/sql plus per-database drivers (mysql / pg / mssql / oceanbase / trino); CodeExec uses the 5 providers in internal/agent/sandbox/ (self_managed / aliyun / e2b / local / ssh), bridged by tool/sandbox_bridge.go; Retrieval runs in-process through internal/service/nlp/retrieval.go (the Dealer backend has already been ported to Go).
Parity legend: ✅ implemented & tested · 🟡 scaffolded (loud-fail sentinel) · ⚠️ implemented with a known gap vs Python.
| Name | Source | Status |
|---|---|---|
| Agent | internal/agent/component/agent.go | ✅ |
| Begin | internal/agent/component/begin.go | ✅ |
| Browser | internal/agent/component/browser.go | ✅ |
| Categorize | internal/agent/component/categorize.go | ✅ |
| DataOperations | internal/agent/component/data_operations.go | ✅ |
| DocsGenerator | internal/agent/component/docs_generator.go | ✅ |
| ExcelProcessor | internal/agent/component/excel_processor.go | ✅ |
| ExeSQL | internal/agent/component/universe_a_wrappers.go | ⚠️ Wrapper exists; registry primary still stub |
| Fillup | internal/agent/component/fillup.go | ✅ |
| Generate | internal/agent/component/fixture_stubs.go | ✅ Legacy alias for DSL round-trip |
| Invoke | internal/agent/component/invoke.go | ✅ |
| Iteration | internal/agent/component/fixture_stubs.go | ✅ Legacy alias; compat stub |
| IterationItem | internal/agent/component/fixture_stubs.go | ✅ Legacy alias; compat stub |
| ListOperations | internal/agent/component/list_operations.go | ✅ |
| LLM | internal/agent/component/llm.go | ✅ |
| Loop | internal/agent/component/loop.go | ✅ Engine-level macro (LoopItem/ExitLoop deliberately not registered) |
| Message | internal/agent/component/message.go | 🟡 TTS real engine + MemorySaver completion still deferred |
| Parallel | internal/agent/component/parallel.go | ✅ |
| Retrieval | internal/agent/component/universe_a_wrappers.go | ⚠️ Wrapper exists; registry primary still stub (also covers SearchMyDataset alias) |
| StringTransform | internal/agent/component/string_transform.go | ✅ |
| Switch | internal/agent/component/switch.go | ✅ All 12 operators with case-folded string ops |
| TavilySearch | internal/agent/component/universe_a_wrappers.go | ⚠️ Wrapper exists; registry primary still stub |
| UserFillUp | internal/agent/component/userfillup.go | ✅ |
| VariableAggregator | internal/agent/component/variable_aggregator.go | ✅ |
| VariableAssigner | internal/agent/component/variable_assigner.go | ✅ |
| Answer | internal/agent/component/fixture_stubs.go | 🟡 Compat stub; canvas pause/resume is real but the Answer node is still a placeholder |
Stub vs wrapper:
Retrieval / TavilySearch / ExeSQL have real delegation wrappers in universe_a_wrappers.go; the registry still maps them to stubs in fixture_stubs.go. Tracked in §14.
| Name | Source | Status |
|---|---|---|
| akshare | internal/agent/tool/akshare.go | ✅ |
| arxiv | internal/agent/tool/arxiv.go | ✅ |
| code_exec | internal/agent/tool/code_exec.go + code_exec_client.go | ✅ All 5 sandbox providers |
| crawler | internal/agent/tool/crawler.go | ✅ |
| deepl | internal/agent/tool/deepl.go | ✅ |
| duckduckgo | internal/agent/tool/duckduckgo.go | ✅ |
| email | internal/agent/tool/email.go | ✅ |
| execute_sql | internal/agent/tool/exesql.go | ⚠️ SELECT-only; rejects Trino/DB2 (ErrExeSQLUnsupportedDB) |
| exesql | internal/agent/tool/exesql.go | ⚠️ Alias of execute_sql |
| github | internal/agent/tool/github.go | ✅ |
| google | internal/agent/tool/google.go | ✅ |
| google_scholar | internal/agent/tool/google_scholar.go | ✅ |
| jin10 | internal/agent/tool/jin10.go | ✅ |
| mcp | internal/agent/tool/mcp.go | 🟡 MCPToolAdapter wraps mcpclient.Tool; InvokableRun returns "not yet implemented" until mcpclient.CallTools lands |
| pubmed | internal/agent/tool/pubmed.go | ✅ |
| qweather | internal/agent/tool/qweather.go | ✅ |
| retrieval | internal/agent/tool/retrieval.go | ✅ Adapter + boot wiring (cmd/server_main.go) |
| search_my_dataset | internal/agent/tool/registry.go | ✅ Alias of retrieval |
| search_my_dateset | internal/agent/tool/registry.go | ✅ Python-typo alias of retrieval |
| searxng | internal/agent/tool/searxng.go | ✅ |
| tavily | internal/agent/tool/tavily.go | ✅ |
| tushare | internal/agent/tool/tushare.go | ✅ |
| wencai | internal/agent/tool/wencai.go | ✅ |
| wikipedia | internal/agent/tool/wikipedia.go | ✅ |
| yahoo_finance | internal/agent/tool/yahoo_finance.go | ✅ |
Total: 51 named entities (26 components + 25 tools, aliases included).
RAGFlow's agent DSL now has only one wire format (the former v1/v2 dual track has been removed):

```jsonc
{
  "globals": {...},                              // sys.query / sys.user_id / ...
  "graph": { "nodes": [...], "edges": [...] },   // React-Flow layout
  "variables": {...},                            // user-level variables
  "components": { "<Name>:<UUID>": {             // execution topology
      "downstream": [...], "upstream": [...],
      "obj": { "component_name": "Name", "params": {...} }
  }},
  "path": [...], "retrieval": {...}, "history": [...]   // runtime state
}
```
Hard guarantees of the single wire format:
- Two blocks: graph + components. The frontend's use-build-dsl.ts fills both on PUT; the back end does not depend on graph.
- Normalization goes through dsl.NormalizeForCanvas (internal/handler/agent.go:226, internal/service/agent.go:217,273). Every DSL on a Python ↔ Go path passes through it once at the decoding boundary.
- The internal/agent/dsl/ package now contains only normalize.go + normalize_test.go + testdata/ (the v1↔v2 converters, v2.go / loader.go / converter_v1_to_v2.go / converter_v2_to_v1.go, have been git rm'd).

NormalizeForCanvas(dsl map[string]any) map[string]any in internal/agent/dsl/normalize.go:
- enforceHandleIds(dsl): normalizes graph.edges[*].sourceHandle / targetHandle to React-Flow conventions.
- buildGraphFromComponents(components): if graph.nodes is missing, derives a default layout from components.
- foldLegacyLoopVariants(dsl): folds Loop+LoopItem / Iteration+IterationItem into a single Loop / Parallel node.

Further notes:
- The legacy class names Loop+LoopItem / Iteration+IterationItem remain on the stable server and are not changed in this round.
- Loop is already a single node (internal/agent/component/loop.go), and so is Parallel.
- Iteration / IterationItem remain only as aliases in internal/agent/component/fixture_stubs.go; the stub bodies delegate to the Parallel factory.
- Iteration / IterationStart / LoopStart are kept in the Operator enum.
- The *Canvas passed to canvas.Compile(ctx, c *Canvas, opts...) is expected to have gone through NormalizeForCanvas already. If some path unmarshals a DSL and hands it straight to Compile without going through the decoder, Compile logs a one-line warning to stderr via log.Printf at entry.
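The top level of every internal/agent/dsl/testdata/{all, browser, dfx_picture_parser, questions_category, resume, subaget, switch}.json is {globals, graph, variables} (with complete graph.nodes / graph.edges); there is no top-level components key. This is the shape of import / export files.
web/src/pages/agent/utils/dsl-bridge.ts has been rewritten for a single mode:
- The DSL_MODE / DslMode / if (DSL_MODE === 'v1') / if (DSL_MODE === 'v2') compile-time branches are gone.
- importDsl(rawParsed, isAgent) has a single precedence rule: if raw.graph.nodes is non-empty, use it; otherwise fall back to an empty seed.
- dslToGraph(dsl) likewise reads only dsl.graph.nodes.

internal/agent/workflowx/ provides zero-intrusion eino extensions: it does not modify eino's source code and only provides external helpers.
AddLoopNode API: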
```go
func AddLoopNode[T any](
	ctx context.Context,
	wf *compose.Workflow[T, T],
	key string,
	sub *compose.Workflow[T, T],
	shouldQuit LoopCondition[T],
	opts ...LoopOption,
) (*compose.WorkflowNode, error)
```
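Execution model (do-while semantics):
1. Start with current = the node's input.
2. Run the sub-workflow on current to produce next.
3. Call shouldQuit(ctx, iteration, current, next); iteration starts at 1.
4. If it returns true, output next; otherwise set current = next and continue.

Implementation notes:
- compose.AnyLambda[T, T, struct{}](...) wraps both the invoke and the stream paths.
- WithLoopMaxIterations(n) is strongly recommended (it guards against accidental infinite loops).
- WithLoopStream(mode): LoopStreamFinalOnly (default) / LoopStreamEveryIteration.
- Errors: ErrLoopMaxIterationsExceeded / ErrLoopSubGraphInterrupted / ErrLoopResumeStateInvalid / ErrLoopQuitConditionFailed.
- compose.Runnable[T,T] + sub-checkpoints go through a loop-owned bridge store.

Checkpoint/Resume contract:
- An interrupt inside the sub-workflow propagates upward via compose.CompositeInterrupt; resume continues from the interrupted iteration, not from the start.
- Sub-checkpoint IDs come from WithLoopCheckpointIDBuilder(nodeKey, iteration); the default namespace is workflowx-loop:<nodeKey>:<iteration>.

Loop in the canvas:
- On the Go side Loop is a single node: it has a registry entry + factory, but LoopComponent.Invoke is a no-op.
- When BuildWorkflow sees a cpn named Loop, it calls expandLoopSubgraph to collect the downstream nodes, builds a sub compose.Workflow[map[string]any, map[string]any], and calls workflowx.AddLoopNode to insert the result into the outer graph as a single node.
- LoopItem / ExitLoop have been removed (v1 compat is absorbed at the canvas layer via legacyNoOpNames).

AddParallelNode API: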
```go
func AddParallelNode[I, O any](
	ctx context.Context,
	wf *compose.Workflow[[]I, []O],
	key string,
	sub Compilable[I, O],
	opts ...ParallelOption,
) (*compose.WorkflowNode, error)
```
Implementation notes:
- WithParallelMaxConcurrency(n int): 0 / 1 = sequential execution; > 1 = semaphore-bounded concurrency.
- Order preservation: outputs[i] always corresponds to inputs[i].
- Errors: ErrParallelCompileFailed / ErrParallelResumeStateInvalid; per-item errors are wrapped with fmt.Errorf("item %d: %w", idx, err).
- Interrupts are propagated with compose.CompositeInterrupt(ctx, nil, state, interruptErrs...).
- Resume-state invariant: CompletedResults ∪ InterruptedIndices = 0..TotalCount-1 (a complete partition).

Parallel in the canvas:
- The Parallel component is a T4 thin wrapper: at registration it passes agenttool.BuildByName("parallel", params) (actually the ParallelComponent in internal/agent/component/parallel.go), which internally uses workflowx.AddParallelNode to insert the sub-workflow into the outer graph.

Key finding: Phase 4.1 "Canvas parallel batch execution" needs no extra implementation, because eino's compose.Workflow.Run already spawns one go t.execute() per ready node within each topological wave.
- canvas/parallel_batch_test.go::TestBuildWorkflow_ParallelBatchStructure pins the compile of 4 sibling nodes.
- canvas/parallel_timing_test.go::TestCanvas_ParallelExecution_StaticAnalysis pins a static analysis of a 5-node DAG compile.
- workflowx/parallel.go still exists, but only for the Parallel component (Loop/Iteration-style array parallelism), not for canvas-level ready-node scheduling.
Key 1: agent:cp:{check_point_id}, eino payload storage
- Value: raw []byte (not JSON; eino's Serializer already handles serialization)
- EXPIRE 30*24*3600 (30 days), set once
- CheckPointStore is a pure KV interface: Get(ctx, id) ([]byte, bool, error) / Set(ctx, id, []byte) error

Key 2: agent:run:{run_id}, business metadata (Redis Hash)
| Field | Type | Meaning |
|---|---|---|
| canvas_id | string | user_canvas.id |
| tenant_id | string | from the user-tenant lookup |
| checkpoint_id | string | latest checkpoint of this run (points to Key 1) |
| parent_run_id | string | source run for resume_from (resume chain); may be empty |
| status | int (0/1/2/3) | 0=running 1=succeeded 2=failed 3=cancelled |
| failure_reason | string | failure reason (err.Error()) |
| cancel_requested | int (0/1) | 1 = cancel requested by a user/admin |
| started_at | int (epoch ms) | |
| finished_at | int (epoch ms) | set on exit |
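- Written only through explicit RunTracker.Start/MarkSucceeded/MarkFailed/MarkCancelled/AttachCheckpoint calls.
- status=failed is written by the application layer itself.
- eino itself writes only Key 1 (agent:cp:*).

eino checkpoint write triggers (Key 1):

| # | Trigger | eino source | Purpose |
|---|---|---|---|
| W1 | explicit compose.Interrupt(ctx, info) / StatefulInterrupt(ctx, info, state) in a node | compose/interrupt.go | human-in-the-loop, external API callbacks, rate-limit pauses |
| W2 | compile-time interception points compose.WithInterruptBeforeNodes([]string) / WithInterruptAfterNodes([]string) | compose/interrupt.go | on hit, persist and end the run (shares the handleInterrupt path with W1); none enabled by default |
| W3 | interrupt propagated up from a sub-graph | subGraphInterruptError | when a nested subgraph / ToolsNode / agentic node raises an interrupt, the parent graph persists synchronously |
| W4 | run exit | WithCheckPointID + WithWriteToCheckPointID | final persist when the run exits |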

| # | Trigger | Write function |
|---|---|---|
| B1 | Canvas run starts | `RunTracker.Start(runID, canvasID, tenantID, parentRunID)` |
| B2 | Run completes normally | `RunTracker.MarkSucceeded(runID)` |
| B3 | Run fails | `RunTracker.MarkFailed(runID, err.Error())` |
| B4 | Run is cancelled | `RunTracker.MarkCancelled(runID)` |
| B5 | After Compile succeeds | `RunTracker.AttachCheckpoint(runID, cpID)` |
| R1 | HTTP POST /run?resume_from=run_xxx | handler: HGetAll("agent:run:run_xxx") → checkpoint_id → WithCheckPointID(cpID) + WithWriteToCheckPointID(newCP) + RunTracker.Start(newRunID, canvas, tenant, "run_xxx") |
internal/agent/canvas/checkpoint_store.go:
```go
package canvas

import "context"

type CheckPointStore interface {
	Get(ctx context.Context, id string) ([]byte, bool, error)
	Set(ctx context.Context, id string, data []byte) error
	Delete(ctx context.Context, id string) error // custom extension; eino's compose.CheckPointStore has no such method
}
```
internal/agent/canvas/state_serializer.go:
```go
package canvas

import "encoding/json"

// StateSerializer converts canvas state to and from bytes.
type StateSerializer interface {
	Marshal(v any) ([]byte, error)
	Unmarshal(data []byte, v any) error
}

// CanvasStateSerializer implements StateSerializer with encoding/json.
type CanvasStateSerializer struct{}

func (CanvasStateSerializer) Marshal(v any) ([]byte, error)      { return json.Marshal(v) }
func (CanvasStateSerializer) Unmarshal(b []byte, v any) error { return json.Unmarshal(b, v) }
```
`internal/agent/canvas/compile.go`, with the key correction:

```go
// NOTE: do not override eino's InternalSerializer with compose.WithSerializer!
// eino's compose.Serializer controls both (a) serialization of user-provided
// state AND (b) serialization of eino's internal graph state. Overriding it
// breaks eino's internal marshal/unmarshal logic.
//
// Correct approach: wire only WithCheckPointStore (custom KV interface) and let
// eino keep its default InternalSerializer. CanvasState provides its own
// MarshalJSON so that eino's StatePre/PostHandler can serialize the state.
func Compile(ctx context.Context, c *Canvas, opts ...CompileOption) (*CompiledCanvas, error) {
	cfg := CompileOptions{}
	for _, o := range opts {
		o(&cfg)
	}
	// buildWorkflow (hypothetical name) turns the canvas DSL into a compose.Workflow.
	wf, err := buildWorkflow(c)
	if err != nil {
		return nil, err
	}
	compileOpts := []compose.GraphCompileOption{
		compose.WithCheckPointStore(checkPointAdapter{cfg.Store}), // adapter hides Delete
	}
	// Deliberately NOT calling compose.WithSerializer.
	runnable, err := wf.Compile(ctx, compileOpts...)
	if err != nil {
		return nil, err
	}
	return &CompiledCanvas{Workflow: runnable}, nil
}

// checkPointAdapter drops the Delete method that compose.CheckPointStore does not declare.
type checkPointAdapter struct{ inner CheckPointStore }

func (a checkPointAdapter) Get(ctx context.Context, id string) ([]byte, bool, error) {
	return a.inner.Get(ctx, id)
}

func (a checkPointAdapter) Set(ctx context.Context, id string, data []byte) error {
	return a.inner.Set(ctx, id, data)
}
```
CompiledCanvas struct:
```go
type CompiledCanvas struct {
	Workflow     compose.Runnable
	CheckPointID string // empty string for now; to be surfaced from the eino Runnable in V2.1
}
```
```go
// internal/agent/canvas/cancel.go
func Run(ctx context.Context, taskID string, compiled compose.Runnable[...]) error {
	einoCtx, interrupt := compose.WithGraphInterrupt(ctx)
	stopCh := make(chan struct{})
	defer close(stopCh) // stops the watcher once the run returns
	go watchCancel(taskID, stopCh, func() {
		// Ask eino to interrupt the running graph.
		interrupt(compose.WithGraphInterruptTimeout(30 * time.Second))
	})
	// input: the canvas input (its type is elided here, like the Runnable type parameters).
	_, err := compiled.Invoke(einoCtx, input,
		compose.WithCheckPointID(genID(taskID)),
		compose.WithWriteToCheckPointID(genID(taskID)),
	)
	return err
}

func watchCancel(taskID string, stopCh <-chan struct{}, onCancel func()) {
	ticker := time.NewTicker(500 * time.Millisecond) // poll every 500ms
	defer ticker.Stop()
	for {
		select {
		case <-stopCh:
			return
		case <-ticker.C:
			v, _ := redis.Get(context.Background(), fmt.Sprintf("%s-cancel", taskID))
			if v != "" {
				onCancel()
				return
			}
		}
	}
}
```

Python compatibility: the `{task_id}-cancel` Redis key naming exactly matches the protocol used by Python's `task_service.py`.

```text
Canvas run goroutine (Go)
↓
eino Graph Engine
↓ (OnStart / OnEnd / OnError auto-injected)
callbacks.Handler (business implementation)
├─ OTelHandler
│ └─ start span → inject attributes → end span
│ └─ otlphttpexporter → OTel Collector (external)
│ ├─ Jaeger / Tempo (trace UI)
│ ├─ Langfuse (LLM-specific)
│ └─ Prometheus / Grafana
└─ SSEHandler (business event stream) → admin UI
```

| Channel | Purpose | Protocol | Consumer |
|---|---|---|---|
| SSE | Business events ("node start/end/message") | text/event-stream over HTTP | admin UI |
| OTel span | System observability (node latency/errors/tokens) | OTLP HTTP | Ops/APM |

| eino hook | OTel action | Span attributes |
|---|---|---|
| `OnStart(ctx, info, input)` | `tracer.Start(ctx, info.Name)` → stored in ctx | `eino.component.name`, `eino.component.type`, `eino.input.size` |
| `OnEnd(ctx, info, output)` | `span.End()` | `eino.output.size` |
| `OnError(ctx, info, err)` | `span.RecordError(err)` + `span.SetStatus(codes.Error, ...)` | `eino.error.message` |

```bash
export OTEL_EXPORTER_OTLP_ENDPOINT="http://otel-collector:4318"
export OTEL_SERVICE_NAME="ragflow-agent"
export OTEL_RESOURCE_ATTRIBUTES="service.namespace=ragflow,deployment.environment=production"
export OTEL_TRACES_SAMPLER="parentbased_traceidratio"
export OTEL_TRACES_SAMPLER_ARG="0.1"  # 10% sampling
```

Fallback: if `OTEL_EXPORTER_OTLP_ENDPOINT` is not configured, the handler degrades to a no-op without affecting business logic.
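
The fallback rule amounts to a check at boot time. In this hypothetical sketch, `loadTracingConfig` reads the two OTel variables used above through an injected `getenv` (production code would pass `os.Getenv`) and decides between the exporter-backed path and a no-op. Only the variable names come from the OpenTelemetry conventions; the type and function names are illustrative, and the real wiring lives in `internal/observability/otel/provider.go`.

```go
package main

import (
	"strconv"
	"strings"
)

// tracingConfig is a hypothetical summary of what boot code derives from the
// OTel environment variables.
type tracingConfig struct {
	Enabled  bool    // false → install a no-op handler
	Endpoint string  // OTLP HTTP endpoint, e.g. http://otel-collector:4318
	Ratio    float64 // sampling probability for the traceidratio samplers
}

// loadTracingConfig disables tracing when OTEL_EXPORTER_OTLP_ENDPOINT is unset
// or blank, so callers fall back to a no-op handler and business logic is unaffected.
// A missing or invalid OTEL_TRACES_SAMPLER_ARG falls back to a ratio of 1.0.
func loadTracingConfig(getenv func(string) string) tracingConfig {
	endpoint := strings.TrimSpace(getenv("OTEL_EXPORTER_OTLP_ENDPOINT"))
	if endpoint == "" {
		return tracingConfig{} // Enabled=false → no-op
	}
	cfg := tracingConfig{Enabled: true, Endpoint: endpoint, Ratio: 1.0}
	if arg := strings.TrimSpace(getenv("OTEL_TRACES_SAMPLER_ARG")); arg != "" {
		if r, err := strconv.ParseFloat(arg, 64); err == nil && r >= 0 && r <= 1 {
			cfg.Ratio = r
		}
	}
	return cfg
}
```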

The Go side keeps multiple versions side by side (never overwriting):

| Scenario | Behavior |
|---|---|
| Editor saves a draft | `UPDATE user_canvas SET dsl=? WHERE id=?` (no version created) |
| Click "Publish" | `INSERT user_canvas_version(...)` (new row); `UPDATE user_canvas SET release=true, dsl=?, update_at=NOW()` |
| Run without `version` | Load the latest `user_canvas_version` (`create_time DESC LIMIT 1`) |
| Run `?version=v_xxx` | Load the specified `user_canvas_version` |
| Run `?version=draft` | Load `user_canvas.dsl` (the editor's unpublished state) |

API endpoints:

- `GET /api/v1/agents/{id}/versions`: list versions
- `POST /api/v1/agents/{id}/versions`: explicit publish
- `DELETE /api/v1/agents/{id}/versions/{version_id}`: delete a version
- `POST /api/v1/agents/{id}/run?version=xxx`: run a specific version

| Purpose | Choice | License | Notes |
|---|---|---|---|
| PDF generation | signintech/gopdf | MIT | Primary choice; built-in TTF font registration, CJK and header/footer support |
| DOCX generation | In-house OOXML writer | n/a | Go `archive/zip` stdlib + `text/template` + `//go:embed` |
| Excel read/write | xuri/excelize/v2 | BSD-3 | No license risk |
| Markdown parsing | yuin/goldmark | MIT | CommonMark-compliant |
| HTML parsing | golang.org/x/net/html | BSD-3 | Go sub-repository alongside the stdlib |
| OpenTelemetry SDK | go.opentelemetry.io/otel v1.44.0 | Apache-2.0 | |
| MySQL driver | go-sql-driver/mysql | MPL-2.0 | ExeSQL uses stdlib `database/sql` |
| PG driver | lib/pq | MIT | |
| MSSQL driver | denisenkom/go-mssqldb | BSD-3 | |
| Trino driver | trinodb/trino-go-client v0.333.0 | Apache-2.0 | ExeSQL Trino dialect |
| Jinja2 templating | nikolalohinski/gonja v1.5.3 | MIT | Phase 8a: promoted from an indirect to a direct import |
| Test SQL mock | DATA-DOG/go-sqlmock | BSD-3 | ExeSQL injection tests |

RAGFlow is licensed under Apache-2.0; AGPL-3's strong copyleft would force the entire RAGFlow Go binary to become AGPL-3. All AGPL-3 candidate libraries (unipdf / unioffice / fumiama-go-docx / baliance-gooxml) are therefore excluded.

AGPL-3 pre-screening rule:

- `go-licenses check` flags an AGPL dependency → the build fails

| Method | Path | Handler | Notes |
|---|---|---|---|
| GET | `/api/v1/agents` | ListAgents | Already exists |
| POST | `/api/v1/agents` | CreateAgent | |
| GET | `/api/v1/agents/{id}` | GetAgent | |
| PATCH | `/api/v1/agents/{id}` | UpdateAgent | |
| DELETE | `/api/v1/agents/{id}` | DeleteAgent | Cascade-deletes all versions |
| POST | `/api/v1/agents/{id}/run` | RunAgent | Synchronous; `?version=v_xxx`, defaults to the latest version |
| POST | `/api/v1/agents/{id}/stream` | StreamAgent | SSE; emits `message` / `waiting_for_user` / `error` / `done` events |
| POST | `/api/v1/agents/{id}/cancel` | CancelAgent | Writes the Redis cancel key |
| GET | `/api/v1/agents/{id}/versions` | ListVersions | |
| POST | `/api/v1/agents/{id}/versions` | PublishVersion | |
| GET | `/api/v1/agents/{id}/versions/{vid}` | GetVersion | |
| DELETE | `/api/v1/agents/{id}/versions/{vid}` | DeleteVersion | |
| POST | `/api/v1/admin/canvas-runtime/:tenant_id` | AdminRuntime | Toggles the tenant override |

SSE event payloads (a blank line terminates each event):

```text
event: message
data: {"answer": "...", "reference": [...]}

event: waiting_for_user
data: {"cpn_id": "node:userfillup_1"}

event: error
data: {"error": "..."}

event: done
data: [DONE]
```

| Category | Criterion |
|---|---|
| Functional | 19 components × ≥3 unit tests = ≥57 component unit tests |
| Functional | 21 tools × ≥2 unit tests = ≥42 tool unit tests |
| eino reuse | T1 components (LLM/Agent) regression: eino's own react_test.go / chatmodel_test.go / compose_test.go still pass |
| Functional | `{{cpn_id@param}}`: any node can read any node's output; covered by unit tests |
| Functional | SSE event sequence matches Python agent_api.py: message / waiting_for_user / error / done |
| wait-for-user | A canvas with a UserFillUp node: the first run pauses at UserFillUp → SSE waiting_for_user → resumes after the user submits → finally emits message + done events |
| RunAgent e2e | 4 e2e sub-tests: TestRunAgent_RealCanvas_BeginMessage / _CompileFails / _InvokeFails / _WaitForUserResume |
| RunTracker | miniredis-backed e2e pinning the Start → AttachCheckpoint → MarkSucceeded sequence |
| TTS dispatch | Model-provider integration wired (audio.NewTTSDispatchFunc) |
| Per-class timeout | ExeSQL→3s, TavilySearch→12s, uniform fallback, env override |
| LLM retry | MaxRetries=5 → exactly 6 invoker calls (absolute count) |
| Reliability | Redis cancel protocol: after cancel, nodes stop within 5 s (with 500 ms polling, p99 ≤ 500 ms) |
| Observability | OTel handler P99 overhead < 2% (100 nodes) |
| Checkpoint | Redis-backed RedisCheckPointStore Get/Set/Delete pass eino integration tests |
| Code quality | 100% godoc comments on the public API; ≥80% test coverage on internal/agent/canvas |

| Risk | Severity | Mitigation |
|---|---|---|
| eino State mutex contention under high concurrency | Medium | Benchmark at the end of Phase 1; introduce sharded mutexes if scheduling overhead exceeds 5% |
| 100% v1 DSL compatibility is impossible | Medium | Incompatible legacy DSL goes through an "auto-convert + notice" path |
| External HTTP failures in tools | Medium | Reuse the retry in http_helper.go |
| Frontend DSL editor only understands v1 | Medium | Phase 5 keeps the ability to write v1 |
| No LLM key in test environments | Low | All LLM component tests use a mock provider driver |
| LLM retry multiplicative stacking | Medium | retryInvoker.Unwrap() + unwrapChatInvoker make MaxRetries an absolute count |
| CodeExec feature gap vs Python | Medium | 5 sandbox providers ported; docs/develop/sandbox-python-go-diff.md records the per-provider diff in detail |
| Real TTS binary shape TBD | Medium | Routed through 60+ model-provider drivers; the real binary is decided by the model provider |
| Real MemorySaver port partial | Low | Partial port; user-deferred |

Actionable follow-ups for the next round (in priority order):

- Cache compiled artifacts keyed by `(canvasID, versionID, DSL-hash)`; start only when profiling shows Compile dominating the hot path. 1-2 weeks.
- `browser.py` is 29.4K vs. 8.9K in Go, a 3.3× gap. Needs a scope decision: full Playwright port vs. trimming to core scenarios. 1 week.
- Complete implementation of `internal/service/memory_message_service.go`. 1-2 weeks, user-deferred.
- `github.com/ibmdb/go_ibmdb` + native client lib. Start only when an e2e need emerges. 0.5-1 week.
- Mark `agent_api.py` as `@deprecated` and add a compatibility proxy shim. 2-4 weeks.
- … `{{cpn_id@param}}`).
- `AddAgenticModelNode` replaces the LLM component; `AddRetrieverNode` replaces the Retrieval component.

The five Python sandbox providers are ported to Go with functional parity (`self_managed`, `aliyun`, `local`, `ssh`) plus one strict superset (`e2b`: Go is real, Python is a stub). The admin-panel settings reader lands in `ProviderManager.LoadFromSettings` (see commit history). The remaining 7 gaps are intentional and tracked here:

- Aliyun provider: (1) `TemplateName` is not sent on `CreateCodeInterpreter` (operators must pre-create non-default templates via Python or the Aliyun console, then reference them by name in metadata); (2) `execute` uses raw HTTP because the SDK has no execute method (the wire format was reverse-engineered from the Python SDK). Swap to the SDK calls when both APIs land. (1-2 days once the SDK releases; no in-house workaround)
- `LocalProvider` rlimits not applied: Go `os/exec` has no portable pre-start hook, so rlimits (`RLIMIT_AS`/`CPU`/`FSIZE`/`NOFILE`) are not enforced. The Go `LocalProvider` is not a security boundary; for adversarial code, use `SelfManagedProvider` (executor_manager + gVisor) or `AliyunCodeInterpreterProvider` (cloud microVM). This matches the Python note that "local" is "for development / trusted environments". (No fix planned, by design)
- `SSHProvider` uses SSH exec, not SFTP, which avoids the `github.com/pkg/sftp` dependency. For workloads with many large artifacts, switch to `pkg/sftp` if profiling shows exec overhead. (1 day, deferred until profiling shows it matters)
- `LocalProvider` on Windows: `syscall.Setpgid` is POSIX-only, so the Go side is `//go:build !windows`; the Python side runs on Windows via `process.kill()`. Tracked; not blocking because RAGFlow production runs on Linux. (1-2 days, deferred)
- E2B SDK: `github.com/eric642/e2b-go-sdk` v0.1.3 (Apache-2.0). Re-evaluate quarterly; fork to `github.com/infiniflow/e2b-go-sdk` if maintenance lags. (1 day fork if needed)
- … `otelhttp.NewTransport`). Providers themselves do not emit OTel spans. (1 day)

`cmd/server_main.go` registers the runtime in three layers:

1. Sandbox (`internal/agent/sandbox/manager.go`): chooses which sandbox provider backs CodeExec. Default `self_managed`; override via `SANDBOX_PROVIDER_TYPE`. Falls back to env-driven init when the admin-panel settings table is empty or malformed.
2. Retrieval (`internal/agent/tool/retrieval_service.go`): `nlp.NewRetrievalService(docEngine, docDAO)` and `kg.NewRetrieval(...)` are wired via `tool.SetRetrievalService(...)` / `tool.SetKGRetrievalService(...)` at boot. The first backs `use_kg=false`; the second backs `use_kg=true`.
3. Agent service (`internal/service/agent.go`): accepts optional Redis-backed `CheckPointStore` / `StateSerializer` / `RunTracker` via `NewAgentServiceWithOptions(...)`. Boot installs these when Redis is up; otherwise the fields stay nil and the service falls back to in-memory mode (transparent to callers).

Any layer that is not wired at boot produces a loud-fail sentinel (see §15.3); stubs never silently return empty results.

| Env var | Default | Effect |
|---|---|---|
| `SANDBOX_PROVIDER_TYPE` | `self_managed` | One of `self_managed` / `aliyun_codeinterpreter` / `e2b` / `local` / `ssh` |
| `SANDBOX_EXECUTOR_MANAGER_URL` | `http://sandbox-executor-manager:9385` | Self-managed endpoint |
| `SANDBOX_EXECUTOR_MANAGER_TIMEOUT` | 30 (s) | Self-managed per-call timeout |
| `AGENTRUN_*` (5 vars) | n/a | Aliyun code interpreter |
| `E2B_API_KEY` / `E2B_ACCESS_TOKEN` | n/a | E2B (one required) |
| `E2B_TEMPLATE` | `base` | E2B sandbox template |
| `LOCAL_*` (8 vars) | n/a | Local subprocess |
| `SSH_HOST` / `SSH_PORT` / `SSH_USERNAME` / `SSH_PASSWORD` / `SSH_PRIVATE_KEY` / `SSH_PRIVATE_KEY_PATH` | n/a | SSH provider |
| `COMPONENT_EXEC_TIMEOUT` | 600 (s) | Canvas-level per-invocation timeout; per-class overrides via an env-derived map (see `canvas/timeout.go`) |

| Sentinel | Cause | Fix |
|---|---|---|
| `ErrRetrievalServiceMissing` | `tool.SetRetrievalService(...)` not called at boot | Wire `nlp.NewRetrievalService` at boot (default in `cmd/server_main.go`) |
| `ErrKGRetrievalServiceMissing` | Canvas uses `use_kg=true` and `tool.SetKGRetrievalService(...)` was not called | Wire `kg.NewRetrieval(...)` at boot (default in `cmd/server_main.go`) |
| `ErrMemoryServiceMissing` | `component.SetMemorySaver(...)` not called at boot | Wire `NewMemoryMessageService(...)` (default in `cmd/server_main.go`) |
| `ErrEmbedderNotWired` | MemorySaver reached but no embedder configured | Port the embedding model; see §14 |
| `ErrSandboxNotConfigured` | `SANDBOX_PROVIDER_TYPE` set to an unknown value | Set it to one of the 5 supported values |
| `ErrE2BProviderNotImplemented` | `SANDBOX_PROVIDER_TYPE=e2b` and no `E2B_API_KEY`/`E2B_ACCESS_TOKEN` | Provide one of the two env vars |
| `ErrTTSEngineNotConfigured` | Message runs with `auto_play=true` and no `audio.SetSynthesizer(...)` | Wire a TTS engine at boot; see §14 |
| `ErrExeSQLUnsupportedDB` | `db_type` is `trino` or `ibm db2` | Add the driver registration; see §14 |

tools/migrate-canvas cross-validates Python's normalize_chunker_dsl against Go's NormalizeForCanvas. Manual equivalent until the tool ships:

1. Export the canvas: `GET /api/v1/canvas/<id>/export`.
2. Normalize it on the Python side: `uv run python -c "import json; from agent.canvas import normalize_chunker_dsl; print(normalize_chunker_dsl(json.load(open('canvas.json'))))"`.
3. Run the Go normalization tests: `go test ./internal/agent/dsl/ -run TestNormalize -v` (uses the fixtures in `internal/agent/dsl/testdata/`).

```bash
go test -count=1 ./internal/agent/...            # all agent tests
go test -count=1 ./internal/agent/component/     # component tests
go test -count=1 ./internal/agent/tool/          # tool tests + retrieval + sandbox providers
go test -count=1 ./internal/agent/sandbox/       # 5 sandbox providers + manager
go test -count=1 ./internal/agent/canvas/        # canvas engine, parallel, interrupt/resume
go test -count=1 ./internal/agent/runtime/       # state, template, history window
```
Fixtures: internal/agent/dsl/testdata/ (7 JSONs) drive the e2e suite and match the input corpus Python's normalize_chunker_dsl accepts.

| Design point | Key files |
|---|---|
| State pattern | internal/agent/canvas/{state.go, scheduler.go} + internal/agent/runtime/{state.go, context.go, template.go, template_jinja.go} |
| CanvasState MarshalJSON | internal/agent/runtime/state.go |
| runtime extraction | internal/agent/runtime/*.go (8 files) + internal/agent/canvas/state_export.go |
| Loop macro expansion | internal/agent/canvas/loop_subgraph.go + internal/agent/component/loop.go (no-op marker) |
| Parallel | internal/agent/component/parallel.go + internal/agent/workflowx/parallel.go |
| Generic Loop node | internal/agent/workflowx/loop.go + loop_*_test.go |
| Interrupt path | internal/agent/canvas/interrupt_resume.go + internal/agent/canvas/runner.go |
| Checkpoint | internal/agent/canvas/{checkpoint_store.go, run_tracker.go, state_serializer.go, compile.go} |
| Compile adapter | internal/agent/canvas/compile.go (checkPointAdapter) |
| Per-class timeout | internal/agent/canvas/timeout.go + node_body.go |
| Cancel protocol | internal/agent/canvas/cancel.go |
| OTel | internal/observability/otel/{provider.go, handler.go, handler_test.go} |
| DSL normalize | internal/agent/dsl/{normalize.go, normalize_test.go} + testdata/ |
| Tool registry | internal/agent/tool/{registry.go, http_helper.go, ssrf.go, mcp.go, retrieval*.go} |
| Component 5-tier | internal/agent/component/{base.go, registry.go, runtime_wire.go, fixture_stubs.go, universe_a_wrappers.go} + 19 component .go |
| AgentService V2 | internal/service/agent.go (buildRunFunc) + internal/service/canvas_decode.go + internal/service/agent_run_e2e_test.go |
| Sandbox providers | internal/agent/sandbox/{self_managed.go, aliyun.go, e2b.go, local.go, ssh.go, manager.go} + tool/sandbox_bridge.go |
| TTS dispatch | internal/agent/audio/{tts.go, tts_dispatch.go, model_provider_synthesizer.go} |

| Package | Test files | Coverage |
|---|---|---|
| internal/agent/canvas | 19 | canvas_test.go, scheduler_test.go, state_test.go, variable_test.go, state_bench_test.go, state_serializer_test.go, checkpoint_store_test.go, run_tracker_test.go, cancel_test.go, stream_test.go, loop_subgraph_test.go, loop_semantics_test.go, dsl_examples_e2e_test.go, interrupt_resume_test.go, multibranch_test.go, node_body_timeout_test.go, node_body_per_class_timeout_integration_test.go, parallel_batch_test.go, parallel_timing_test.go |
| internal/agent/component | 50+ | per-component _test.go + verify_p1_test.go + production_chain_fixes_test.go |
| internal/agent/tool | 30+ | per-tool _test.go + registry_test.go + retrieval_nlp_test.go + retrieval_kg_test.go + exesql_trino_test.go + exesql_unsupported_test.go + http_helper_test.go + ssrf_test.go + mcp_test.go |
| internal/agent/runtime | 4 | metrics_test.go, selector_test.go, state_test.go, template_jinja_test.go |
| internal/agent/workflowx | 8 | loop_test.go, loop_options_test.go, loop_integration_test.go, loop_example_test.go, parallel_test.go, parallel_options_test.go, parallel_integration_test.go, parallel_helpers_test.go |
| internal/agent/dsl | 1 | normalize_test.go |
| internal/agent/audio | 3 | model_provider_synthesizer_test.go, tts_dispatch_test.go, tts_test.go |
| internal/agent/sandbox | 6 | e2b_test.go, local_test.go, manager_test.go, result_protocol_test.go, self_managed_test.go, ssh_test.go |
| internal/observability/otel | 1 | handler_test.go (tracetest.SpanRecorder) |
| internal/service | 8+ | canvas_decode_test.go, agent_run_e2e_test.go, agent_test.go, agent_sessions_test.go, chat_session_test.go, ... |
| internal/handler | 10+ | agent_test.go, agent_wait_for_user_test.go, admin_runtime_test.go, ... |

| Endpoint | URL | Status | Go port need |
|---|---|---|---|
| DLA (Document Layout Analysis) | POST {DEEPDOC_URL}/predict | Remote HTTP (via dla_cli.py) | Go client with 3-retry + 18s timeout |
| OCR | No remote endpoint | Local ONNX only | None — ErrNotImplemented stub |
| TSR (Table Structure Recognition) | No remote endpoint | Local ONNX only | None — ErrNotImplemented stub |

Single toggle: `DEEPDOC_URL` (preferred) or `TENSORRT_DLA_SVR` (legacy).

- Request: `POST {DEEPDOC_URL}/predict`, `multipart/form-data`, field name `request`, raw JPEG bytes
- Response: `{"bboxes": [[left, top, right, bottom, score, type_idx], ...]}`
- … `[]`

```json
{
  "components": {
    "<cpn_id>": {
      "obj": {"component_name": "Retrieval", "params": {...}},
      "downstream": ["generate_0"],
      "upstream": ["answer_0"]
    }
  },
  "path": ["begin"],
  "history": [],
  "retrieval": {"chunks": [], "doc_aggs": []},
  "globals": {"sys.query": "", "sys.user_id": "...", "sys.conversation_turns": 0,
              "sys.files": [], "sys.history": [], "sys.date": "..."},
  "variables": {},
  "memory": []
}
```
Two regexes:
```python
variable_ref_patt = r"\{* *\{([a-zA-Z:0-9]+@[A-Za-z0-9_.-]+|sys\.[A-Za-z0-9_.]+|env\.[A-Za-z0-9_.]+)\} *\}*"
iteration_alias_patt = r"\{* *\{(item|index|result)\} *\}*"
```
Key behaviors the Go port must mirror:
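- `{var}`, `{{var}}`, `{ {var} }` and `{{{var}}}` all match: extra outer braces are absorbed, and spaces are allowed only between the brace layers, not inside the innermost pair (so `{{ var }}` does not match)
- `sys.*` / `env.*`: namespace-only (no `@`), read from the State's flat namespace
- Dotted references (`cpn_id@param` followed by `.`-separated keys): dot-path traversal, applying `json.loads` to strings, `dict.get` to dicts and `[int]` indexing to lists
- Unresolvable `{{...}}`: resolves to `""`, never crashes
- `is_reff`: returns `True` only if `cpn_id@param` resolves to a known component

All comparisons use `.lower()`. Stored `cpn_id`s may be in any case, so the Go port must NOT key the component map by case-sensitive `cpn_id`.
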
| # | Component | File | component_name | Tier | Key behavior |
|---|---|---|---|---|---|
| 1 | Begin | begin.py | Begin | T3 | Consumes kwargs["inputs"], resolves file inputs via FileService |
| 2 | UserFillUp | fillup.py | UserFillUp | T3 | Renders tips with variable interpolation; eino interrupt |
| 3 | Fillup | (alias) | Fillup | T3 | Thin alias of UserFillUp (disable enable_tips) |
| 4 | Message | message.py | Message | T3 | jinja2 prompt + stream + TTS + filegen + memory save |
| 5 | LLM | llm.py | LLM | T1 | Sync + async paths; chatModel.Generate / Stream; structured JSON output |
| 6 | Categorize | categorize.py | Categorize | T3 | LLM one-shot classification → _next (routing list) + category_name |
| 7 | Switch | switch.py | Switch | T2 | 12 operators; _next = matching downstream(s) |
| 8 | Agent | agent_with_tools.py | Agent | T1 | ReAct loop with LLMBundle + tool binding + citations |
| 9 | Iteration | iteration.py | Iteration | T4 | Compat stub → Parallel (Go) |
| 10 | IterationItem | iterationitem.py | IterationItem | T4 | Compat stub |
| 11 | Loop | loop.py | Loop | T4 | workflowx.AddLoopNode (Go) |
| 12 | LoopItem | loopitem.py | LoopItem | (none) | Engine-handled, not registered |
| 13 | ExitLoop | exit_loop.py | ExitLoop | (none) | legacyNoOpNames (Go) |
| 14 | Invoke | invoke.py | Invoke | T3 | HTTP GET/POST/PUT/PATCH/DELETE + headers/proxy/timeout |
| 15 | Browser | browser.py | Browser | T3 | LLM-driven browsing |
| 16 | DataOperations | data_operations.py | DataOperations | T3 | 7 ops: select_keys/literal_eval/combine/filter/append_or_update/remove/rename |
| 17 | ListOperations | list_operations.py | ListOperations | T3 | 6 ops: nth/head/tail/filter/sort/drop_duplicates |
| 18 | StringTransform | string_transform.py | StringTransform | T3 | split/merge/jinja2 template ops |
| 19 | VariableAggregator | variable_aggregator.py | VariableAggregator | T3 | Returns first non-empty in each variable group |
| 20 | VariableAssigner | variable_assigner.py | VariableAssigner | T3 | 11 ops |
| 21 | DocsGenerator | docs_generator.py | DocGenerator | T5 | MD → PDF/DOCX/TXT/MD/HTML |
| 22 | ExcelProcessor | excel_processor.py | ExcelProcessor | T5 | pandas read/write/merge/convert |

All tools extend `ToolBase` and expose `get_meta()` (OpenAI function-call schema) plus `_invoke` / `_invoke_async`.

| # | Tool | component_name | Behavior |
|---|---|---|---|
| 1 | AkShare | AkShare | Chinese financial data (HTTP) |
| 2 | ArXiv | ArXiv | export.arxiv.org/api/query search |
| 3 | CodeExec | CodeExec | gRPC client to Python sandbox; 5 sandbox providers in internal/agent/sandbox/ |
| 4 | Crawler | Crawler | Generic HTML scraper |
| 5 | DeepL | DeepL | DeepL Translate API (HTTP) |
| 6 | DuckDuckGo | DuckDuckGo | html.duckduckgo.com/html search |
| 7 | Email | Email | SMTP send via smtplib |
| 8 | ExeSQL | ExeSQL | MySQL/PG/MSSQL/Trino/OceanBase via stdlib database/sql |
| 9 | GitHub | GitHub | GitHub REST API search |
| 10 | Google | Google | SerpAPI / Google CSE search |
| 11 | GoogleScholar | GoogleScholar | Scholar via SerpAPI |
| 12 | Jin10 | Jin10 | Chinese financial news feed |
| 13 | PubMed | PubMed | NCBI E-utilities |
| 14 | QWeather | QWeather | HeFeng weather API |
| 15 | Retrieval | Retrieval | nlp.Dealer + kg.Retrieval (Go dual-registry) |
| 16 | SearXNG | SearXNG | Meta-search |
| 17 | TavilySearch | TavilySearch | Tavily search API |
| 18 | TavilyExtract | TavilyExtract | Tavily extract API |
| 19 | TuShare | TuShare | Tushare Chinese financial data |
| 20 | WenCai | WenCai | 同花顺 问财 stock Q&A |
| 21 | Wikipedia | Wikipedia | Wikipedia REST API |
| 22 | YahooFinance | YahooFinance | Yahoo Finance unofficial API |
| — | MCP | (server_id) | MCPToolAdapter over streamable-HTTP |

| ID | Question | Action | Effort |
|---|---|---|---|
| OQ #1 | Iteration semantic preservation | ✅ Done — engine design | — |
| OQ #2 | MCP tool priority | ✅ Done — thin wrapper | — |
| OQ #3 | DSL normalization | ✅ Done — Go-side + tools/migrate-canvas built | — |
| OQ #4 | History window behavior | ✅ Done — canvas-level session | — |
| OQ #5 | Citation injection scope | ✅ Done — LLM + Agent | — |
| OQ #6 | Component timeout granularity | ✅ Done — per-class table is a Go enhancement over Python's uniform 600s | — |
| OQ #7 | Universe A/B naming asymmetry | ✅ Done — keep dual-naming convention | — |
| OQ #8 | GraphRAG scope | ✅ Done — KGRetrievalAdapter wired | — |
| OQ #9 | generate legacy alias | ⏸️ Deferred | — |
| OQ #10 | Phase 5a vs 5b ordering | ✅ Done — single Retrieval milestone | — |
| OQ #11 | Per-component env-driven timeout | ✅ Done — canvas-level uniform 600s | — |
| OQ #12 | Embedding model port | ✅ Done — model provider architecture | — |
| OQ #13 | Switch operator coverage | ✅ Done — 12/12 | — |
| OQ #14 | Universe A SearchMyDataset alias | ✅ Done — 4 spellings | — |
| OQ #15 | LLM max_retries / delay_after_error | ✅ Done — retryInvoker.Unwrap() normal-absolute-count | — |
| OQ #16 | Phase 4.4 orchestrator side | ✅ Done — Runner.Run catches interrupt | — |
| OQ #17 | Phase 5d CodeExec full feature parity | ⏸️ Partial — 5 providers + artifacts/args/timeout/per-language base image done; GraphRAG adapter remains | 1-2 weeks |
| OQ #18 | Phase 8b real TTS engine | ✅ Done — dispatcher routes through 60+ model drivers, no shell-out needed | — |
| OQ #19 | Phase 8b real MemorySaver completion | ⏸️ Open | 1-2 weeks |
| OQ #20 | Phase 5c DB2 e2e demand | ⏸️ Open (CGO + native lib) | 0.5-1 week if needed |
| OQ #21 | Compile LRU cache | ⏸️ Open — defer until profiling | 1-2 weeks |
| OQ #22 | Phase 6 component hardening | ⏸️ Open — Browser Playwright parity + ExcelProcessor audit | 1-2 weeks |
| OQ #23 | tools/gen-component-parity script | ✅ Done | — |

Last verified: 2026-06-17