.agents/design/core/workflow/runtime.md
FastGPT 工作流 Runtime 是一个基于有向图的工作流执行引擎,支持复杂的分支、循环、并行执行等场景。本文档详细描述了工作流 Runtime 的最新逻辑设计和实现。
工作流执行的核心类,负责管理节点执行队列和状态。
关键属性:
runtimeNodesMap: 节点 ID 到节点对象的映射edgeIndex: 边的索引(按 source 和 target 分组)nodeEdgeGroupsMap: 预构建的节点边分组 MapactiveRunQueue: 活跃运行队列skipNodeQueue: 跳过节点队列关键方法:
buildEdgeIndex(): 构建边索引buildNodeEdgeGroupsMap(): 预构建节点边分组getNodeRunStatus(): 获取节点运行状态addActiveNode(): 添加活跃节点到队列startProcessing(): 开始处理队列用于图论分析的核心算法模块,位于 packages/service/core/workflow/utils/tarjan.ts。
主要功能:
findSCCs(): 使用 Tarjan 算法找出所有强连通分量(SCC)classifyEdgesByDFS(): 使用 DFS 对边进行分类isNodeInCycle(): 判断节点是否在循环中getEdgeType(): 获取边的类型type EdgeStatus = 'waiting' | 'active' | 'skipped';
waiting: 等待执行active: 已激活(源节点已执行完成)skipped: 已跳过(源节点被跳过或分支未选中)type EdgeType = 'tree' | 'back' | 'forward' | 'cross';
tree: 树边(DFS 树中的边)back: 回边(循环边,从后代指向祖先)forward: 前向边(从祖先指向后代的非树边)cross: 跨边(连接不同子树的边)type NodeEdgeGroups = RuntimeEdgeItemType[][];
type NodeEdgeGroupsMap = Map<string, NodeEdgeGroups>;
每个节点的输入边被分成多个组,每组代表一个独立的执行路径。
1. 全局 DFS 边分类
└─> 识别回边(循环边)
2. Tarjan SCC 算法
└─> 找出所有强连通分量
└─> 判断节点是否在循环中
3. 为每个节点构建边分组
├─> 分类边:回边 vs 非回边
├─> 处理非回边
│ ├─> 节点在循环中 → 按 branchHandle 分组
│ └─> 节点不在循环中 → 所有非回边放在同一组
└─> 处理回边
└─> 按 branchHandle 分组
策略 1:节点不在循环中
策略 2:节点在循环中
findBranchHandle(edge) {
// 从边的源节点开始向上回溯
queue = [{ nodeId: edge.source, handle: edge.sourceHandle }]
while (queue.length > 0) {
{ nodeId, handle } = queue.shift()
// 如果当前节点是分支节点且有 handle,返回 handle
if (isBranchNode(node) && handle) {
return handle
}
// 继续向上回溯
for (inEdge of inEdges) {
newHandle = isBranchNode(sourceNode) ? inEdge.sourceHandle : handle
queue.push({ nodeId: inEdge.source, handle: newHandle })
}
}
return 'common'
}
getNodeRunStatus(node, nodeEdgeGroupsMap) {
edgeGroups = nodeEdgeGroupsMap.get(node.nodeId)
// 1. 没有输入边 → 入口节点,直接运行
if (!edgeGroups || edgeGroups.length === 0) {
return 'run'
}
// 2. 检查是否可以运行(任意一组边满足条件)
// 每组边内:至少有一个 active,且没有 waiting
if (edgeGroups.some(group =>
group.some(edge => edge.status === 'active') &&
group.every(edge => edge.status !== 'waiting')
)) {
return 'run'
}
// 3. 检查是否跳过(所有组的边都是 skipped)
if (edgeGroups.every(group =>
group.every(edge => edge.status === 'skipped')
)) {
return 'skip'
}
// 4. 否则等待
return 'wait'
}
规则 1:运行条件
规则 2:跳过条件
规则 3:等待条件
Tarjan 算法用于在有向图中找出所有强连通分量(Strongly Connected Components, SCC)。
强连通分量定义:
function findSCCs(runtimeNodes, edgeIndex) {
nodeToSCC = new Map()
sccSizes = new Map()
sccId = 0
stack = []
inStack = new Set()
lowLink = new Map()
discoveryTime = new Map()
time = 0
function tarjan(nodeId) {
// 初始化
discoveryTime.set(nodeId, time)
lowLink.set(nodeId, time)
time++
stack.push(nodeId)
inStack.add(nodeId)
// 遍历所有出边
for (edge of outEdges) {
targetId = edge.target
if (!discoveryTime.has(targetId)) {
// 未访问过,递归访问
tarjan(targetId)
lowLink.set(nodeId, min(lowLink.get(nodeId), lowLink.get(targetId)))
} else if (inStack.has(targetId)) {
// 在栈中,更新 lowLink
lowLink.set(nodeId, min(lowLink.get(nodeId), discoveryTime.get(targetId)))
}
}
// 如果是 SCC 的根节点
if (lowLink.get(nodeId) === discoveryTime.get(nodeId)) {
sccNodes = []
do {
w = stack.pop()
inStack.delete(w)
nodeToSCC.set(w, sccId)
sccNodes.push(w)
} while (w !== nodeId)
sccSizes.set(sccId, sccNodes.length)
sccId++
}
}
// 从所有未访问节点开始
for (node of runtimeNodes) {
if (!discoveryTime.has(node.nodeId)) {
tarjan(node.nodeId)
}
}
return { nodeToSCC, sccSizes }
}
使用深度优先搜索(DFS)对图中的边进行分类。
function classifyEdgesByDFS(runtimeNodes, edgeIndex) {
edgeTypes = new Map()
visited = new Set()
inStack = new Set()
discoveryTime = new Map()
finishTime = new Map()
time = 0
function dfs(nodeId) {
visited.add(nodeId)
inStack.add(nodeId)
discoveryTime.set(nodeId, ++time)
for (edge of outEdges) {
targetId = edge.target
if (!visited.has(targetId)) {
// 未访问 → 树边
edgeTypes.set(edgeKey, 'tree')
dfs(targetId)
} else if (inStack.has(targetId)) {
// 在当前路径上 → 回边(循环边)
edgeTypes.set(edgeKey, 'back')
} else if (discoveryTime.get(source) < discoveryTime.get(targetId)) {
// 从祖先指向后代 → 前向边
edgeTypes.set(edgeKey, 'forward')
} else {
// 跨边
edgeTypes.set(edgeKey, 'cross')
}
}
inStack.delete(nodeId)
finishTime.set(nodeId, ++time)
}
// 从所有入口节点开始 DFS
for (node of entryNodes) {
if (!visited.has(node.nodeId)) {
dfs(node.nodeId)
}
}
return edgeTypes
}
┌─ if ──→ B ──┐
start ──→ A ├──→ D
└─ else ─→ C ──┘
边分组:
运行逻辑:
start ──→ A ──→ B ──→ C ──┐
↑ |
└────────────────┘
边分组:
运行逻辑:
┌─ if ──→ B ──┐
start ──→ A ├──→ D ──┐
└─ else ─→ C ──┘ |
↑ |
└──────────────────────┘
边分组:
运行逻辑:
start ──→ A ──→ C
└──→ B ──→ C
边分组:
运行逻辑:
┌──selectedTools──→ Tool1 ──┐
start → Agent ─┤ ├──→ End
└──────────────────────────→ ┘
边分组:
运行逻辑:
优化前:
优化后:
优化前:
优化后:
优化前:
优化后:
测试文件:test/cases/global/core/workflow/dispatch/checkNodeRunStatus.test.ts
已覆盖场景:
测试结果:
问题: 场景14.7 测试失败,期望节点 F 在只有一条边 active 时等待,但实际返回 run。
原因: 场景14 包含了 D→E 的交叉路径,导致 F 的两条输入边(D→F 和 E→F)被分成了不同的组。当 D→F active 时,第一组满足条件,F 就可以运行。
解决方案: 删除场景14.7 测试,因为:
"或"关系:
"且"关系:
循环识别:
循环边分组:
应该避免的场景:
原因:
packages/service/core/workflow/dispatch/index.ts - WorkflowQueue 类packages/service/core/workflow/utils/tarjan.ts - Tarjan 算法packages/global/core/workflow/runtime/type.ts - 类型定义packages/global/core/workflow/runtime/utils.ts - 工具函数test/cases/global/core/workflow/dispatch/checkNodeRunStatus.test.ts - 节点状态判断测试test/cases/global/core/workflow/runtime/utils.test.ts - 工具函数测试.claude/issue/checkNodeRunStatus-test-fix.md - 测试修复文档.claude/issue/edge-grouping-*.md - 边分组问题分析文档FastGPT 工作流 Runtime 采用了基于图论的设计,通过 Tarjan SCC 算法和 DFS 边分类实现了对复杂工作流的支持。核心的边分组算法和节点状态判断逻辑经过了充分的测试验证,能够正确处理分支、循环、并行等各种场景。
通过预构建边分组、边索引等优化手段,Runtime 在保证正确性的同时也具有良好的性能表现。未来可以在并行执行、错误处理、可观测性等方面继续优化和增强。