.agents/issue/workflow-deep-analysis.md
本报告对 dispatchWorkFlow 函数及其完整调用链进行了深度分析,包括:
dispatchWorkFlow (1277 行)WorkflowQueue 类及其所有方法replaceEditorVariable, getReferenceVariableValue, checkNodeRunStatus, filterWorkflowEdges 等位置: index.ts:851-909
问题代码:
const nodeOutput = (node: RuntimeNodeItemType, result: NodeResponseCompleteType) => {
// 1. 过滤边 - O(m)
const targetEdges = filterWorkflowEdges(runtimeEdges).filter(
(item) => item.source === node.nodeId
);
// 2. 遍历所有节点查找下一步节点 - O(n)
runtimeNodes.forEach((node) => {
// 3. 对每个节点,遍历 targetEdges - O(k)
if (targetEdges.some((item) => item.target === node.nodeId && item.status === 'active')) {
nextStepActiveNodesMap.set(node.nodeId, node);
}
if (targetEdges.some((item) => item.target === node.nodeId && item.status === 'skipped')) {
nextStepSkipNodesMap.set(node.nodeId, node);
}
});
};
复杂度分析:
nodeOutput内存影响:
优化方案:
// 🟢 预构建边索引(在 WorkflowQueue 构造函数中)
class WorkflowQueue {
private edgeIndex = {
bySource: new Map<string, RuntimeEdgeItemType[]>(),
byTarget: new Map<string, RuntimeEdgeItemType[]>()
};
constructor() {
// 一次性构建索引 - O(m)
const filteredEdges = filterWorkflowEdges(runtimeEdges);
filteredEdges.forEach(edge => {
if (!this.edgeIndex.bySource.has(edge.source)) {
this.edgeIndex.bySource.set(edge.source, []);
}
this.edgeIndex.bySource.get(edge.source)!.push(edge);
if (!this.edgeIndex.byTarget.has(edge.target)) {
this.edgeIndex.byTarget.set(edge.target, []);
}
this.edgeIndex.byTarget.get(edge.target)!.push(edge);
});
}
// 🟢 优化后的 nodeOutput - O(k),k 是目标边数量
const nodeOutput = (node: RuntimeNodeItemType, result: NodeResponseCompleteType) => {
// O(1) 查询
const targetEdges = this.edgeIndex.bySource.get(node.nodeId) || [];
// O(k) - 只遍历目标边
const nextStepActiveNodesMap = new Map<string, RuntimeNodeItemType>();
const nextStepSkipNodesMap = new Map<string, RuntimeNodeItemType>();
targetEdges.forEach((edge) => {
const targetNode = this.runtimeNodesMap.get(edge.target);
if (!targetNode) return;
if (edge.status === 'active') {
nextStepActiveNodesMap.set(targetNode.nodeId, targetNode);
} else if (edge.status === 'skipped') {
nextStepSkipNodesMap.set(targetNode.nodeId, targetNode);
}
});
return {
nextStepActiveNodes: Array.from(nextStepActiveNodesMap.values()),
nextStepSkipNodes: Array.from(nextStepSkipNodesMap.values())
};
};
}
收益:
位置: packages/global/core/workflow/runtime/utils.ts:495-597
问题分析:
export function replaceEditorVariable({ text, nodes, variables, depth = 0 }) {
// 1. 正则匹配所有变量 - O(m),m 是文本长度
const variablePattern = /\{\{\$([^.]+)\.([^$]+)\$\}\}/g;
const matches = [...text.matchAll(variablePattern)];
for (const match of matches) {
const nodeId = match[1];
const id = match[2];
// 2. 查找节点 - O(n)
const node = nodes.find((node) => node.nodeId === nodeId);
if (!node) return;
// 3. 查找输出 - O(k)
const output = node.outputs.find((output) => output.id === id);
if (output) return formatVariableValByType(output.value, output.valueType);
// 4. 查找输入 - O(k)
const input = node.inputs.find((input) => input.key === id);
if (input) return getReferenceVariableValue({ value: input.value, nodes, variables });
// ^^^^^ 又传递完整 nodes
}
// 5. 递归处理嵌套变量 - 最多 10 层
if (hasReplacements && /\{\{\$[^.]+\.[^$]+\$\}\}/.test(result)) {
result = replaceEditorVariable({ text: result, nodes, variables, depth: depth + 1 });
}
}
调用频率:
// 在 nodeRunWithActive 中,每个 input 都调用
node.inputs.forEach((input) => {
let value = replaceEditorVariable({
text: input.value,
nodes: runtimeNodes, // 传递 150 个节点
variables
});
});
复杂度估算:
为什么不能简单优化:
valueType 进行格式化node.inputsgetReferenceVariableValue 内部递归需要完整 nodes 数组可行的优化方案:
class WorkflowQueue {
// 🟢 构建节点索引
private nodeIndex = {
byId: new Map<string, RuntimeNodeItemType>(),
outputsByNodeId: new Map<string, Map<string, { value: any; valueType: WorkflowIOValueTypeEnum }>>(),
inputsByNodeId: new Map<string, Map<string, any>>()
};
constructor() {
runtimeNodes.forEach(node => {
this.nodeIndex.byId.set(node.nodeId, node);
// 索引 outputs
const outputsMap = new Map();
node.outputs.forEach(output => {
outputsMap.set(output.id, { value: output.value, valueType: output.valueType });
});
this.nodeIndex.outputsByNodeId.set(node.nodeId, outputsMap);
// 索引 inputs
const inputsMap = new Map();
node.inputs.forEach(input => {
inputsMap.set(input.key, input.value);
});
this.nodeIndex.inputsByNodeId.set(node.nodeId, inputsMap);
});
}
// 🟢 优化的变量查找
private getNodeOutput(nodeId: string, outputId: string) {
return this.nodeIndex.outputsByNodeId.get(nodeId)?.get(outputId);
}
private getNodeInput(nodeId: string, inputKey: string) {
return this.nodeIndex.inputsByNodeId.get(nodeId)?.get(inputKey);
}
}
// 🟢 修改 replaceEditorVariable 使用索引
export function replaceEditorVariable({
text,
nodeIndex, // 传递索引而不是完整数组
variables,
depth = 0
}) {
// ... 其他逻辑
const variableVal = (() => {
if (nodeId === VARIABLE_NODE_ID) {
return variables[id];
}
// O(1) 查询而不是 O(n)
const output = nodeIndex.outputsByNodeId.get(nodeId)?.get(id);
if (output) return formatVariableValByType(output.value, output.valueType);
const input = nodeIndex.inputsByNodeId.get(nodeId)?.get(id);
if (input) return getReferenceVariableValue({ value: input, nodeIndex, variables });
})();
}
收益:
位置: packages/global/core/workflow/runtime/utils.ts:297-413
问题代码:
export const checkNodeRunStatus = ({ nodesMap, node, runtimeEdges }) => {
const splitNodeEdges = (targetNode: RuntimeNodeItemType) => {
const commonEdges: RuntimeEdgeItemType[] = [];
const recursiveEdgeGroupsMap = new Map<string, RuntimeEdgeItemType[]>();
// 1. 获取所有源边 - O(m)
const sourceEdges = runtimeEdges.filter((item) => item.target === targetNode.nodeId);
sourceEdges.forEach((sourceEdge) => {
// 2. 使用栈进行深度遍历 - 最多 3000 次迭代
const stack: Array<{ edge: RuntimeEdgeItemType; visited: Set<string> }> = [
{ edge: sourceEdge, visited: new Set([targetNode.nodeId]) }
];
const MAX_DEPTH = 3000;
let iterations = 0;
while (stack.length > 0 && iterations < MAX_DEPTH) {
iterations++;
const { edge, visited } = stack.pop()!;
const sourceNode = nodesMap.get(edge.source);
if (!sourceNode) continue;
// 检查是否是起始节点
if (isStartNode(sourceNode.flowNodeType)) {
commonEdges.push(sourceEdge);
continue;
}
// 检查循环
if (edge.source === targetNode.nodeId) {
recursiveEdgeGroupsMap.set(edge.target, [
...(recursiveEdgeGroupsMap.get(edge.target) || []),
sourceEdge
]);
continue;
}
// 继续向上遍历
const nextEdges = runtimeEdges.filter((item) => item.target === edge.source);
for (const nextEdge of nextEdges) {
stack.push({
edge: nextEdge,
visited: new Set([...visited, edge.source])
});
}
}
});
return { commonEdges, recursiveEdgeGroups: Array.from(recursiveEdgeGroupsMap.values()) };
};
const { commonEdges, recursiveEdgeGroups } = splitNodeEdges(node);
// ... 检查逻辑
};
性能问题:
runtimeEdges.filter 被多次调用调用频率:
优化方案:
class WorkflowQueue {
// 🟢 缓存节点运行状态检查结果
private nodeStatusCache = new Map<string, 'run' | 'skip' | 'wait'>();
// 🟢 预构建边的反向索引
private edgesByTarget = new Map<string, RuntimeEdgeItemType[]>();
constructor() {
runtimeEdges.forEach(edge => {
if (!this.edgesByTarget.has(edge.target)) {
this.edgesByTarget.set(edge.target, []);
}
this.edgesByTarget.get(edge.target)!.push(edge);
});
}
private checkNodeCanRun(node: RuntimeNodeItemType) {
// 🟢 检查缓存
const cached = this.nodeStatusCache.get(node.nodeId);
if (cached) return cached;
// 使用索引而不是过滤
const sourceEdges = this.edgesByTarget.get(node.nodeId) || [];
const status = checkNodeRunStatus({
nodesMap: this.runtimeNodesMap,
node,
runtimeEdges,
sourceEdges // 传递预过滤的边
});
// 🟢 缓存结果(边状态变化时需要清除)
this.nodeStatusCache.set(node.nodeId, status);
return status;
}
// 🟢 边状态更新时清除相关缓存
private updateEdgeStatus(edge: RuntimeEdgeItemType, status: string) {
edge.status = status;
// 清除目标节点的缓存
this.nodeStatusCache.delete(edge.target);
}
}
收益:
位置: index.ts:515-570
问题代码:
function getNodeRunParams(node: RuntimeNodeItemType) {
const params: Record<string, any> = {};
node.inputs.forEach((input) => {
// 1. 第一次变量替换 - O(n)
let value = replaceEditorVariable({
text: input.value,
nodes: runtimeNodes,
variables
});
// 2. 第二次变量替换 - O(n)
value = getReferenceVariableValue({
value,
nodes: runtimeNodes,
variables
});
// 3. 类型格式化
params[input.key] = valueTypeFormat(value, input.valueType);
});
return params;
}
问题:
优化建议:
replaceEditorVariable 内部处理引用变量位置: index.ts:587-601
问题代码:
const dispatchData: ModuleDispatchProps<Record<string, any>> = {
...data, // 🔴 展开整个 data 对象
usagePush: this.usagePush.bind(this),
variables,
histories, // 🔴 完整的历史记录数组
node,
runtimeNodes, // 🔴 150 个节点的完整数组
runtimeEdges, // 🔴 所有边的完整数组
params,
mode: isDebugMode ? 'test' : data.mode
};
// 传递给每个节点处理函数
const dispatchRes = await callbackMap[node.flowNodeType](dispatchData);
内存影响:
dispatchData 对象...data 会复制所有属性优化方案:
// 🟢 创建共享的不可变数据
class WorkflowQueue {
private sharedDispatchData: Readonly<Omit<ModuleDispatchProps, 'node' | 'params'>>;
constructor() {
// 一次性创建共享数据
this.sharedDispatchData = Object.freeze({
...data,
usagePush: this.usagePush.bind(this),
variables,
histories,
runtimeNodes,
runtimeEdges
});
}
async nodeRunWithActive(node: RuntimeNodeItemType) {
const params = getNodeRunParams(node);
// 🟢 只传递变化的部分
const dispatchData = {
...this.sharedDispatchData,
node,
params
};
return await callbackMap[node.flowNodeType](dispatchData);
}
}
收益:
位置: index.ts:155-182, 207-215
问题代码:
// 🔴 100ms 定时器检查停止状态
const checkStoppingTimer = apiVersion === 'v2'
? setInterval(async () => {
stopping = await shouldWorkflowStop({
appId: runningAppInfo.id,
chatId
});
}, 100) // 每 100ms 触发一次
: undefined;
// 🔴 10秒定时器保持连接
streamCheckTimer = setInterval(() => {
data?.workflowStreamResponse?.({
event: SseResponseEventEnum.answer,
data: textAdaptGptResponse({ text: '' })
});
}, 10000);
问题:
shouldWorkflowStop)优化方案:
// 🟢 增加检查间隔
const checkStoppingTimer = apiVersion === 'v2'
? setInterval(async () => {
stopping = await shouldWorkflowStop({
appId: runningAppInfo.id,
chatId
});
}, 500) // 从 100ms 改为 500ms
: undefined;
// 🟢 添加定时器清理保护
const cleanupTimers = () => {
if (streamCheckTimer) {
clearInterval(streamCheckTimer);
streamCheckTimer = null;
}
if (checkStoppingTimer) {
clearInterval(checkStoppingTimer);
}
};
// 🟢 确保清理
try {
await runWorkflow(...);
} finally {
cleanupTimers();
}
收益:
位置: 多处调用
当前实现:
export const surrenderProcess = () => new Promise((resolve) => setImmediate(resolve));
调用频率:
startProcessing 循环中每次迭代调用processSkipNodes 中调用影响:
优化建议:
// 🟢 批量处理,减少 surrenderProcess 调用
private async startProcessing() {
let processedCount = 0;
const BATCH_SIZE = 5;
while (true) {
// 处理节点...
processedCount++;
// 每处理 5 个节点才让出一次
if (processedCount % BATCH_SIZE === 0) {
await surrenderProcess();
}
}
}
位置: 多处调用
问题:
// 在多个地方重复调用
const targetEdges = filterWorkflowEdges(runtimeEdges).filter(...);
优化方案:
class WorkflowQueue {
private filteredEdges: RuntimeEdgeItemType[];
constructor() {
// 🟢 一次性过滤
this.filteredEdges = filterWorkflowEdges(runtimeEdges);
}
// 使用缓存的过滤结果
private getTargetEdges(nodeId: string) {
return this.filteredEdges.filter(item => item.source === nodeId);
}
}
interface WorkflowMetrics {
// 性能指标
totalExecutionTime: number;
nodeExecutionTimes: Map<string, number>;
averageNodeTime: number;
// 内存指标
peakMemoryUsage: number;
averageMemoryUsage: number;
gcCount: number;
// 操作计数
nodeTraversalCount: number;
variableLookupCount: number;
edgeFilterCount: number;
statusCheckCount: number;
// 并发指标
maxConcurrentNodes: number;
averageConcurrentNodes: number;
queueWaitTime: number;
// 缓存指标
cacheHitRate: number;
cacheMissRate: number;
}
通过深度分析,发现了 8 个主要性能瓶颈:
核心问题: 缺少索引和缓存机制,导致大量 O(n) 和 O(n²) 操作。
解决方案: 通过预构建索引、缓存结果、批量处理等方式,将复杂度降低到 O(1) 或 O(k)。
预期收益: 实施 P0 和 P1 优化后,可以解决 90% 的性能问题。