.agents/design/core/workflow/index.md
FastGPT 工作流系统是一个基于 Node.js/TypeScript 的可视化工作流引擎,支持拖拽式节点编排、实时执行、并发控制和交互式调试。系统采用队列式执行架构,通过有向图模型实现复杂的业务逻辑编排。
FastGPT/
├── packages/
│ ├── global/core/workflow/ # 全局工作流类型和常量
│ │ ├── constants.ts # 工作流常量定义
│ │ ├── node/ # 节点类型定义
│ │ │ └── constant.ts # 节点枚举和配置
│ │ ├── runtime/ # 运行时类型和工具
│ │ │ ├── constants.ts # 运行时常量
│ │ │ ├── type.d.ts # 运行时类型定义
│ │ │ └── utils.ts # 运行时工具函数
│ │ ├── template/ # 节点模板定义
│ │ │ └── system/ # 系统节点模板
│ │ └── type/ # 类型定义
│ │ ├── node.d.ts # 节点类型
│ │ ├── edge.d.ts # 边类型
│ │ └── io.d.ts # 输入输出类型
│ └── service/core/workflow/ # 工作流服务层
│ ├── constants.ts # 服务常量
│ ├── dispatch/ # 调度器核心
│ │ ├── index.ts # 工作流执行引擎 ⭐
│ │ ├── constants.ts # 节点调度映射表
│ │ ├── type.d.ts # 调度器类型
│ │ ├── ai/ # AI相关节点
│ │ ├── tools/ # 工具节点
│ │ ├── dataset/ # 数据集节点
│ │ ├── interactive/ # 交互节点
│ │ ├── loop/ # 循环节点
│ │ └── plugin/ # 插件节点
│ └── utils.ts # 工作流工具函数
└── projects/app/src/
├── pages/api/v1/chat/completions.ts # 聊天API入口
└── pages/api/core/workflow/debug.ts # 工作流调试API
工作流执行引擎采用队列式架构,主要特点:
1. 初始化 WorkflowQueue 实例
2. 识别入口节点(isEntry=true)
3. 将入口节点加入 activeRunQueue
4. 循环处理活跃节点队列:
- 检查节点执行条件
- 执行节点或跳过节点
- 更新边状态
- 将后续节点加入队列
5. 处理跳过节点队列
6. 返回执行结果
enum FlowNodeTypeEnum {
// 基础节点
workflowStart: 'workflowStart', // 工作流开始
chatNode: 'chatNode', // AI对话
answerNode: 'answerNode', // 回答节点
// 数据集相关
datasetSearchNode: 'datasetSearchNode', // 数据集搜索
datasetConcatNode: 'datasetConcatNode', // 数据集拼接
// 控制流节点
ifElseNode: 'ifElseNode', // 条件判断
loop: 'loop', // 循环
loopStart: 'loopStart', // 循环开始
loopEnd: 'loopEnd', // 循环结束
// 交互节点
userSelect: 'userSelect', // 用户选择
formInput: 'formInput', // 表单输入
// 工具节点
httpRequest468: 'httpRequest468', // HTTP请求
code: 'code', // 代码执行
readFiles: 'readFiles', // 文件读取
variableUpdate: 'variableUpdate', // 变量更新
// AI相关
classifyQuestion: 'classifyQuestion', // 问题分类
contentExtract: 'contentExtract', // 内容提取
agent: 'tools', // 智能体
queryExtension: 'cfr', // 查询扩展
// 插件系统
pluginModule: 'pluginModule', // 插件模块
appModule: 'appModule', // 应用模块
tool: 'tool', // 工具调用
// 系统节点
systemConfig: 'userGuide', // 系统配置
globalVariable: 'globalVariable', // 全局变量
comment: 'comment' // 注释节点
}
每个节点类型都有对应的调度函数:
export const callbackMap: Record<FlowNodeTypeEnum, Function> = {
[FlowNodeTypeEnum.workflowStart]: dispatchWorkflowStart,
[FlowNodeTypeEnum.chatNode]: dispatchChatCompletion,
[FlowNodeTypeEnum.datasetSearchNode]: dispatchDatasetSearch,
[FlowNodeTypeEnum.httpRequest468]: dispatchHttp468Request,
[FlowNodeTypeEnum.ifElseNode]: dispatchIfElse,
[FlowNodeTypeEnum.agent]: dispatchRunTools,
// ... 更多节点调度函数
};
enum WorkflowIOValueTypeEnum {
string: 'string',
number: 'number',
boolean: 'boolean',
object: 'object',
arrayString: 'arrayString',
arrayNumber: 'arrayNumber',
arrayBoolean: 'arrayBoolean',
arrayObject: 'arrayObject',
chatHistory: 'chatHistory', // 聊天历史
datasetQuote: 'datasetQuote', // 数据集引用
dynamic: 'dynamic', // 动态类型
any: 'any'
}
interface RuntimeNodeItemType {
nodeId: string;
name: string;
flowNodeType: FlowNodeTypeEnum;
inputs: FlowNodeInputItemType[];
outputs: FlowNodeOutputItemType[];
isEntry?: boolean;
catchError?: boolean;
}
interface RuntimeEdgeItemType {
source: string;
target: string;
sourceHandle: string;
targetHandle: string;
status: 'waiting' | 'active' | 'skipped';
}
enum RuntimeEdgeStatusEnum {
waiting: 'waiting', // 等待执行
active: 'active', // 活跃状态
skipped: 'skipped' // 已跳过
}
工作流调试: /api/core/workflow/debug
聊天完成: /api/v1/chat/completions
优化代码: /api/core/workflow/optimizeCode
interface DispatchFlowResponse {
flowUsages: ChatNodeUsageType[];
debugResponse: WorkflowDebugResponse;
workflowInteractiveResponse?: WorkflowInteractiveResponseType;
toolResponse: ToolRunResponseItemType;
assistantResponses: AIChatItemValueItemType[];
runTimes: number;
newVariables: Record<string, string>;
durationSeconds: number;
}
FlowNodeTypeEnum 中添加新类型callbackMap 中注册调度函数dispatch/ 目录下实现节点逻辑template/system/ 中定义节点模板/api/core/workflow/debug 进行测试projects/app/src/
├── pageComponents/app/detail/ # 应用详情页面
│ ├── Workflow/ # 工作流主页面
│ │ ├── Header.tsx # 工作流头部
│ │ └── index.tsx # 工作流入口
│ ├── WorkflowComponents/ # 工作流核心组件
│ │ ├── context/ # 状态管理上下文
│ │ │ ├── index.tsx # 主上下文提供者 ⭐
│ │ │ ├── workflowInitContext.tsx # 初始化上下文
│ │ │ ├── workflowEventContext.tsx # 事件上下文
│ │ │ └── workflowStatusContext.tsx # 状态上下文
│ │ ├── Flow/ # ReactFlow核心组件
│ │ │ ├── index.tsx # 工作流画布 ⭐
│ │ │ ├── components/ # 工作流UI组件
│ │ │ ├── hooks/ # 工作流逻辑钩子
│ │ │ └── nodes/ # 节点渲染组件
│ │ ├── constants.tsx # 常量定义
│ │ └── utils.ts # 工具函数
│ └── HTTPTools/ # HTTP工具页面
│ └── Edit.tsx # HTTP工具编辑器
├── web/core/workflow/ # 工作流核心逻辑
│ ├── api.ts # API调用 ⭐
│ ├── adapt.ts # 数据适配
│ ├── type.d.ts # 类型定义
│ └── utils.ts # 工具函数
└── global/core/workflow/ # 全局工作流定义
└── api.d.ts # API类型定义
前端采用分层Context架构,实现状态的高效管理和组件间通信:
// 1. ReactFlowCustomProvider - 最外层提供者
ReactFlowProvider → WorkflowInitContextProvider →
WorkflowContextProvider → WorkflowEventContextProvider →
WorkflowStatusContextProvider → children
// 2. 四层核心Context
- WorkflowInitContext: 节点数据和基础状态
- WorkflowDataContext: 节点/边操作和状态
- WorkflowEventContext: 事件处理和UI控制
- WorkflowStatusContext: 保存状态和父节点管理
interface WorkflowContextType {
// 节点管理
nodeList: FlowNodeItemType[];
onChangeNode: (props: FlowNodeChangeProps) => void;
onUpdateNodeError: (nodeId: string, isError: boolean) => void;
getNodeDynamicInputs: (nodeId: string) => FlowNodeInputItemType[];
// 边管理
onDelEdge: (edgeProps: EdgeDeleteProps) => void;
// 版本控制
past: WorkflowSnapshotsType[];
future: WorkflowSnapshotsType[];
undo: () => void;
redo: () => void;
pushPastSnapshot: (snapshot: SnapshotProps) => boolean;
// 调试功能
workflowDebugData?: DebugDataType;
onNextNodeDebug: (debugData: DebugDataType) => Promise<void>;
onStartNodeDebug: (debugProps: DebugStartProps) => Promise<void>;
onStopNodeDebug: () => void;
// 数据转换
flowData2StoreData: () => StoreWorkflowType;
splitToolInputs: (inputs, nodeId) => ToolInputsResult;
}
const nodeTypes: Record<FlowNodeTypeEnum, React.ComponentType> = {
[FlowNodeTypeEnum.workflowStart]: NodeWorkflowStart,
[FlowNodeTypeEnum.chatNode]: NodeSimple,
[FlowNodeTypeEnum.datasetSearchNode]: NodeSimple,
[FlowNodeTypeEnum.httpRequest468]: NodeHttp,
[FlowNodeTypeEnum.ifElseNode]: NodeIfElse,
[FlowNodeTypeEnum.agent]: NodeAgent,
[FlowNodeTypeEnum.code]: NodeCode,
[FlowNodeTypeEnum.loop]: NodeLoop,
[FlowNodeTypeEnum.userSelect]: NodeUserSelect,
[FlowNodeTypeEnum.formInput]: NodeFormInput,
// ... 40+ 种节点类型
};
nodes/
├── NodeSimple.tsx # 通用简单节点
├── NodeWorkflowStart.tsx # 工作流开始节点
├── NodeAgent.tsx # AI智能体节点
├── NodeHttp/ # HTTP请求节点
├── NodeCode/ # 代码执行节点
├── Loop/ # 循环节点组
├── NodeFormInput/ # 表单输入节点
├── NodePluginIO/ # 插件IO节点
├── NodeToolParams/ # 工具参数节点
└── render/ # 渲染组件库
├── NodeCard.tsx # 节点卡片容器
├── RenderInput/ # 输入渲染器
├── RenderOutput/ # 输出渲染器
└── templates/ # 输入模板组件
// 支持多种输入类型
const inputTemplates = {
reference: ReferenceTemplate, // 引用其他节点
input: TextInput, // 文本输入
textarea: TextareaInput, // 多行文本
selectApp: AppSelector, // 应用选择器
selectDataset: DatasetSelector, // 数据集选择
settingLLMModel: LLMModelConfig, // AI模型配置
// ... 更多模板类型
};
interface DebugDataType {
runtimeNodes: RuntimeNodeItemType[];
runtimeEdges: RuntimeEdgeItemType[];
entryNodeIds: string[];
variables: Record<string, any>;
history?: ChatItemMiniType[];
query?: UserChatItemValueItemType[];
workflowInteractiveResponse?: WorkflowInteractiveResponseType;
}
// ChatTest组件提供实时工作流测试
<ChatTest
isOpen={isOpenTest}
nodes={workflowTestData?.nodes}
edges={workflowTestData?.edges}
onClose={onCloseTest}
chatId={chatId}
/>
// 工作流调试API
export const postWorkflowDebug = (data: PostWorkflowDebugProps) =>
POST<PostWorkflowDebugResponse>(
'/core/workflow/debug',
{ ...data, mode: 'debug' },
{ timeout: 300000 }
);
// 支持的API操作
- 工作流调试和测试
- 节点模板获取
- 插件配置管理
- 版本控制操作
// 数据转换适配
- storeNode2FlowNode: 存储节点 → Flow节点
- storeEdge2RenderEdge: 存储边 → 渲染边
- uiWorkflow2StoreWorkflow: UI工作流 → 存储格式
- adaptCatchError: 错误处理适配
const keyboardShortcuts = {
'Ctrl+Z': undo, // 撤销
'Ctrl+Y': redo, // 重做
'Ctrl+S': saveWorkflow, // 保存工作流
'Delete': deleteSelectedNodes, // 删除选中节点
'Escape': cancelCurrentOperation, // 取消当前操作
};
// 快照系统
interface WorkflowSnapshotsType {
nodes: Node[];
edges: Edge[];
chatConfig: AppChatConfigType;
title: string;
isSaved?: boolean;
}
// 动态加载节点组件
const nodeTypes: Record<FlowNodeTypeEnum, any> = {
[FlowNodeTypeEnum.workflowStart]: dynamic(() => import('./nodes/NodeWorkflowStart')),
[FlowNodeTypeEnum.httpRequest468]: dynamic(() => import('./nodes/NodeHttp')),
// ... 按需加载
};
// 节点模板扩展
interface NodeTemplateListItemType {
id: string;
flowNodeType: FlowNodeTypeEnum;
templateType: string;
avatar?: string;
name: string;
intro?: string;
isTool?: boolean;
pluginId?: string;
}