docs/research/AI-Trader-Analysis-Report.md
项目名称: AI-Trader: Benchmarking Autonomous Agents in Real-Time Financial Markets 研究机构: 香港大学数据智能实验室 (HKUDS) 论文编号: arXiv:2512.10971 报告日期: 2025年12月28日 报告版本: v2.0
本报告对香港大学数据智能实验室开发的 AI-Trader 系统进行深度技术分析。AI-Trader 是全球首个面向大语言模型(LLM)交易代理的全自动化、实时、数据无污染评测基准平台。本报告从系统架构、核心创新、Agent实现、工具链设计、评测方法论等维度进行全面剖析,旨在为该领域研究人员提供详尽的技术参考。
关键词: 大语言模型, 自主交易代理, 金融市场, 基准测试, ReAct框架, 工具调用
随着大语言模型(Large Language Models, LLMs)在自然语言处理领域取得突破性进展,研究者们开始探索将其应用于金融交易决策。然而,现有评测方法存在以下关键问题:
问题一: 数据污染 (Data Contamination)
现代LLMs的训练语料库通常包含大量历史金融数据。当使用历史数据进行回测时,模型可能已经"见过"测试期间的市场走势,导致评测结果失真。
训练数据时间线: ←──────────────────────────────→
|←── 训练数据 ──→|
测试数据时间线: |←── 测试期间 ──→|
↑
数据污染风险区域
问题二: 非实时性 (Non-Real-Time)
大多数现有评测基于历史数据回测,无法反映真实市场的动态特性:
问题三: 人工干预 (Human Intervention)
传统评测方法中,系统会向模型提供大量预处理数据(技术指标、新闻摘要等),这种方式:
问题四: 缺乏标准 (Lack of Standardization)
不同研究使用不同的:
导致结果无法直接比较。
AI-Trader项目旨在建立一个:
| 研究/系统 | 实时性 | 工具调用 | 多步推理 | 多市场 | 开源 |
|---|---|---|---|---|---|
| FinGPT | ❌ | ❌ | ❌ | ❌ | ✅ |
| BloombergGPT | ❌ | ❌ | ❌ | ❌ | ❌ |
| FinRL | ❌ | ❌ | ❌ | ✅ | ✅ |
| TradingGPT | ❌ | ❌ | ✅ | ❌ | ❌ |
| AI-Trader | ✅ | ✅ | ✅ | ✅ | ✅ |
AI-Trader 定位为评测基准平台(Benchmarking Platform),而非生产交易系统。其核心价值在于:
| 资源类型 | 链接 |
|---|---|
| GitHub仓库 | https://github.com/HKUDS/AI-Trader |
| 论文全文 | https://arxiv.org/abs/2512.10971 |
| 实时Dashboard | https://ai4trade.ai |
| 项目主页 | https://hkuds.github.io/AI-Trader/ |
系统支持三大金融市场,覆盖不同交易规则和市场特征:
| 市场 | 标的范围 | 初始资金 | 交易频率 | 结算规则 |
|---|---|---|---|---|
| 美股 | NASDAQ-100成分股 | $10,000 USD | 日级/小时级 | T+0 |
| A股 | SSE-50成分股 | ¥100,000 CNY | 日级 | T+1 |
| 加密货币 | BTC, ETH, XRP, SOL等10种 | 50,000 USDT | 小时级 | 24/7 |
系统已集成以下模型进行评测:
| 提供商 | 模型 | 参数规模 | 特点 |
|---|---|---|---|
| OpenAI | GPT-4o | ~1.8T | 多模态,推理能力强 |
| OpenAI | GPT-4o-mini | ~70B | 成本优化版本 |
| Anthropic | Claude-3.5-Sonnet | ~175B | 安全对齐,长上下文 |
| Gemini-2.0-Flash | ~100B | 快速响应 | |
| DeepSeek | DeepSeek-V3 | 671B | 开源,工具调用优化 |
| DeepSeek | DeepSeek-R1 | - | 推理增强 |
| 阿里云 | Qwen-2.5-72B | 72B | 中文优化 |
| Meta | Llama-3.1-405B | 405B | 开源最大模型 |
最小信息范式(Minimal Information Paradigm) 是AI-Trader的核心创新。该范式遵循以下原则:
系统仅向Agent提供完成任务所需的最小上下文信息,让Agent自主决定需要获取什么信息、如何获取、如何验证。
传统范式与最小信息范式的对比:
┌────────────────────────────────────────────────────────────────────┐
│ 传统范式 │
├────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 数据源 │ → │ 数据预处理│ → │ AI模型 │ → │ 决策输出 │ │
│ │ │ │ │ │ │ │ │ │
│ │ • K线 │ │ • 计算指标│ │ • 分析 │ │ • 买入 │ │
│ │ • 新闻 │ │ • 提取特征│ │ • 推理 │ │ • 卖出 │ │
│ │ • 公告 │ │ • 格式化 │ │ │ │ • 持有 │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
│ │
│ 问题: AI被动接收,无法评估其信息获取和验证能力 │
│ │
└────────────────────────────────────────────────────────────────────┘
┌────────────────────────────────────────────────────────────────────┐
│ 最小信息范式 │
├────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────┐ ┌──────────────────────────────┐ ┌──────────┐ │
│ │ 最小上下文│ → │ AI Agent │ → │ 决策输出 │ │
│ │ │ │ │ │ │ │
│ │ • 日期 │ │ 思考 → 行动 → 观察 → 思考...│ │ • 买入 │ │
│ │ • 资产表 │ │ ↓ │ │ • 卖出 │ │
│ │ • 持仓 │ │ 调用工具获取信息 │ │ • 持有 │ │
│ │ • 工具表 │ │ 验证信息可靠性 │ │ │ │
│ └──────────┘ └──────────────────────────────┘ └──────────┘ │
│ │
│ 优势: 评估AI的完整决策链,包括信息获取、验证、推理能力 │
│ │
└────────────────────────────────────────────────────────────────────┘
系统仅提供以下信息作为初始上下文:
minimal_context = {
"current_date": "2024-12-28", # 当前模拟日期
"available_assets": ["AAPL", "MSFT", ...], # 可交易资产列表
"current_positions": { # 当前持仓
"AAPL": {"quantity": 100, "avg_price": 150.0},
...
},
"available_cash": 5000.0, # 可用现金
"available_tools": [ # 可用工具列表
"get_price",
"search_news",
"execute_trade",
"calculate"
]
}
明确不提供:
最小信息范式源于以下认知科学和AI研究观点:
认知负荷理论(Cognitive Load Theory)
具身认知(Embodied Cognition)
元认知评估(Metacognitive Assessment)
┌────────��────────────────────────────────────────────────────────────────┐
│ AI-Trader System Architecture │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ Presentation Layer │ │
│ │ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ │
│ │ │ Web Dashboard │ │ Leaderboard │ │ Reasoning View │ │ │
│ │ │ (ai4trade.ai) │ │ │ │ │ │ │
│ │ └────────┬────────┘ └────────┬────────┘ └────────┬────────┘ │ │
│ └───────────┼────────────────────┼────────────────────┼───────────┘ │
│ │ │ │ │
│ └────────────────────┼────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ Agent Orchestration Layer │ │
│ │ │ │
│ │ ┌──────────────────────────────────────────────────────────┐ │ │
│ │ │ Agent Factory │ │ │
│ │ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ │
│ │ │ │US Agent │ │CN Agent │ │Crypto │ │Custom │ │ │ │
│ │ │ │ │ │ │ │Agent │ │Agent │ │ │ │
│ │ │ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │ │ │
│ │ │ └────────────┼────────────┴────────────┘ │ │ │
│ │ │ │ │ │ │
│ │ │ ▼ │ │ │
│ │ │ ┌──────────────────────────────────────────────────┐ │ │ │
│ │ │ │ BaseAgent (Core) │ │ │ │
│ │ │ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │ │ │
│ │ │ │ │ ReAct Loop │ │Tool Manager│ │State Tracker│ │ │ │ │
│ │ │ │ └────────────┘ └────────────┘ └────────────┘ │ │ │ │
│ │ │ └──────────────────────────────────────────────────┘ │ │ │
│ │ └──────────────────────────────────────────────────────────┘ │ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ MCP Tool Services Layer │ │
│ │ │ │
│ │ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │ │
│ │ │tool_trade│ │tool_price│ │tool_search│ │tool_math│ │ │
│ │ │ :8001 │ │ :8002 │ │ :8003 │ │ :8004 │ │ │
│ │ │ │ │ │ │ │ │ │ │ │
│ │ │ 交易执行 │ │ 价格查询 │ │ 信息搜索 │ │ 数学计算 │ │ │
│ │ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │ │
│ │ │ │
│ │ ┌──────────┐ ┌──────────┐ │ │
│ │ │tool_news │ │tool_crypto│ │ │
│ │ │ :8005 │ │ :8006 │ │ │
│ │ │ │ │ │ │ │
│ │ │ 新闻获取 │ │加密货币 │ │ │
│ │ └──────────┘ └──────────┘ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ Data Infrastructure Layer │ │
│ │ │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │Alpha Vantage │ │ Tushare │ │ Local JSONL │ │ │
│ │ │ │ │ │ │ │ │ │
│ │ │ • US Stocks │ │ • A-Shares │ │ • Historical │ │ │
│ │ │ • Crypto │ │ • CN Market │ │ • Cache │ │ │
│ │ │ • News API │ │ • Holidays │ │ • Replay │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────┘ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ LLM Provider Layer │ │
│ │ │
│ │ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ │ │
│ │ │ GPT-4o │ │Claude-3│ │DeepSeek│ │ Gemini │ │ Qwen │ │ │
│ │ └────────┘ └────────┘ └────────┘ └────────┘ └────────┘ │ │
│ │ │ │
│ │ ┌────────┐ ┌────────┐ ┌────────┐ │ │
│ │ │Llama-3 │ │ Mixtral│ │ Custom │ │ │
│ │ └────────┘ └────────┘ └────────┘ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
AI-Trader/
├── agent/ # Agent实现模块
│ ├── base_agent/ # 美股Agent
│ │ ├── base_agent.py # 日级交易Agent
│ │ └── base_agent_hour.py # 小时级交易Agent
│ ├── base_agent_astock/ # A股Agent
│ │ └── base_agent_astock.py # 含T+1规则适配
│ └── base_agent_crypto/ # 加密货币Agent
│ └── base_agent_crypto.py # 24/7交易适配
│
├── agent_tools/ # MCP工具服务模块
│ ├── start_mcp_services.py # 服务启动脚本
│ ├── tool_trade.py # 交易执行服务
│ ├── tool_get_price_local.py # 价格查询服务(本地)
│ ├── tool_get_price_av.py # 价格查询服务(Alpha Vantage)
│ ├── tool_jina_search.py # 信息搜索服务(Jina AI)
│ ├── tool_math.py # 数学计算服务
│ ├── tool_alphavantage_news.py # 新闻获取服务
│ └── tool_crypto_trade.py # 加密货币交易服务
│
├── prompts/ # Prompt模板模块
│ ├── agent_prompt.py # 通用交易Prompt
│ ├── agent_prompt_astock.py # A股专用Prompt
│ └── agent_prompt_crypto.py # 加密货币专用Prompt
│
├── configs/ # 配置文件模块
│ ├── gpt4o_config.yaml # GPT-4o配置
│ ├── claude_config.yaml # Claude配置
│ ├── deepseek_config.yaml # DeepSeek配置
│ └── ... # 其他模型配置
│
├── data/ # 数据存储模块
│ ├── us/ # 美股历史数据
│ │ └── {symbol}_{date}.jsonl
│ ├── cn/ # A股历史数据
│ │ └── {symbol}_{date}.jsonl
│ └── crypto/ # 加密货币历史数据
│ └── {symbol}_{date}.jsonl
│
├── results/ # 结果存储模块
│ ├── {model}/ # 按模型分组
│ │ ├── decisions/ # 决策记录
│ │ ├── reasoning/ # 推理链记录
│ │ └── metrics/ # 指标统计
│
├── scripts/ # 脚本工具
│ ├── run_benchmark.sh # 运行基准测试
│ ├── fetch_data.py # 数据获取脚本
│ └── analyze_results.py # 结果分析脚本
│
├── main.py # 单Agent运行入口
├── main_parallel.py # 多Agent并行运行入口
├── requirements.txt # Python依赖
└── README.md # 项目说明
| 层级 | 技术 | 版本 | 用途说明 |
|---|---|---|---|
| AI框架 | LangChain | 0.1.x | Agent编排、消息管理、工具绑定 |
| 工具协议 | FastMCP | 0.4.x | Model Context Protocol实现 |
| LLM接口 | OpenAI SDK | 1.x | 统一API调用接口 |
| HTTP服务 | FastAPI | 0.100+ | MCP工具服务 |
| 数据获取 | Alpha Vantage | - | 美股/加密货币数据 |
| 数据获取 | Tushare | - | A股数据 |
| 搜索服务 | Jina AI | - | 网络信息检索 |
| 数据存储 | JSONL | - | 高效追加写入 |
| 异步框架 | asyncio | - | 并发执行 |
| 运行环境 | Python | 3.10+ | 主要开发语言 |
┌────────────────────────────────────────────────────────────────────────┐
│ 数据流图 │
├────────────────────────────────────────────────────────────────────────┤
│ │
│ 1. 交易会话初始化 │
│ ┌──────────────┐ │
│ │ Scheduler │ ─── 触发交易会话 ───→ │ Agent │ │
│ │ (定时任务) │ └───┬───┘ │
│ └──────────────┘ │ │
│ ▼ │
│ 2. 构建最小上下文 │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Date/Assets │ + │ Positions │ + │ Tools │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │ │ │ │
│ └───────────────────┼───────────────────┘ │
│ ▼ │
│ ┌─────────────────┐ │
│ │ System Prompt │ │
│ └────────┬────────┘ │
│ │ │
│ 3. ReAct循环 (最多10步) │ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────────┐ │ │
│ │ │ Thought │ → │ Action │ → │Observat.│ → │ Thought... │ │ │
│ │ │ │ │(Tool │ │(Tool │ │ │ │ │
│ │ │ "我需要 │ │ Call) │ │ Result) │ │ "价格是... │ │ │
│ │ │ 查价格" │ │ │ │ │ │ 我应该..." │ │ │
│ │ └─────────┘ └────┬────┘ └────┬────┘ └─────────────┘ │ │
│ │ │ ▲ │ │
│ │ ▼ │ │ │
│ │ ┌─────────────────────────┐ │ │
│ │ │ MCP Tool Service │ │ │
│ │ │ • get_price │ │ │
│ │ │ • search_news │ │ │
│ │ │ • execute_trade │ │ │
│ │ └─────────────────────────┘ │ │
│ │ │ │
│ │ 终止条件: Agent输出[STOP]信号 或 达到最大步数 │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │ │
│ 4. 结果处理 │ │
│ ▼ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ 决策提取 │ → │ 持仓更新 │ → │ 结果存储 │ │
│ │ │ │ │ │ │ │
│ │ • 交易指令 │ │ • 执行交易 │ │ • JSONL写入 │ │
│ │ • 推理链 │ │ • 更新余额 │ │ • 指标计算 │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
│ │
└────────────────────────────────────────────────────────────────────────┘
class BaseAgent:
"""
基础交易Agent
实现了完整的ReAct推理循环,支持工具调用和多步决策。
Attributes:
signature (str): Agent唯一标识符
basemodel (str): 使用的LLM模型名称
stock_symbols (List[str]): 可交易资产列表
mcp_config (dict): MCP服务配置
max_steps (int): 最大推理步数
initial_cash (float): 初始资金
market (str): 市场类型
verbose (bool): 调试输出开关
核心组件:
llm: LangChain ChatModel实例
tools: 已绑定的工具列表
mcp_client: MCP客户端连接
portfolio: 持仓管理器
logger: 日志记录器
"""
def __init__(
self,
signature: str,
basemodel: str,
stock_symbols: List[str] = NASDAQ_100,
mcp_config: dict = None,
max_steps: int = 10,
initial_cash: float = 10000.0,
market: str = "us",
verbose: bool = False
):
self.signature = signature
self.basemodel = basemodel
self.stock_symbols = stock_symbols
self.max_steps = max_steps
self.initial_cash = initial_cash
self.market = market
self.verbose = verbose
# 延迟初始化的组件
self.llm = None
self.tools = []
self.mcp_client = None
self.portfolio = Portfolio(initial_cash)
self.logger = setup_logger(signature)
async def initialize(self):
"""
异步初始化Agent
执行步骤:
1. 建立与MCP服务的连接
2. 加载可用工具列表
3. 初始化LLM客户端
4. 绑定工具到LLM
"""
# Step 1: 连接MCP服务
self.mcp_client = MCPClient()
await self.mcp_client.connect(self.mcp_config)
# Step 2: 加载工具
self.tools = await self.mcp_client.list_tools()
self.logger.info(f"Loaded {len(self.tools)} tools: {[t.name for t in self.tools]}")
# Step 3: 初始化LLM (根据模型类型选择适配器)
self.llm = self._create_llm_client()
# Step 4: 绑定工具
self.llm = self.llm.bind_tools(
self.tools,
tool_choice="auto" # 让模型自动选择是否调用工具
)
def _create_llm_client(self):
"""
创建LLM客户端
根据模型名称选择合适的客户端类型:
- DeepSeek模型使用DeepSeekChatOpenAI (处理格式差异)
- 其他模型使用标准ChatOpenAI
"""
model_lower = self.basemodel.lower()
if "deepseek" in model_lower:
return DeepSeekChatOpenAI(
model=self.basemodel,
temperature=0.7,
api_key=os.environ.get("DEEPSEEK_API_KEY"),
base_url="https://api.deepseek.com/v1"
)
elif "claude" in model_lower:
return ChatAnthropic(
model=self.basemodel,
temperature=0.7,
api_key=os.environ.get("ANTHROPIC_API_KEY")
)
elif "gemini" in model_lower:
return ChatGoogleGenerativeAI(
model=self.basemodel,
temperature=0.7,
api_key=os.environ.get("GOOGLE_API_KEY")
)
else:
# 默认使用OpenAI兼容接口
return ChatOpenAI(
model=self.basemodel,
temperature=0.7,
api_key=os.environ.get("OPENAI_API_KEY")
)
def _build_system_prompt(self, date: str) -> str:
"""
构建系统提示词
遵循最小信息范式,只提供必要上下文
"""
positions_str = self._format_positions()
tools_str = self._format_tools()
return f"""You are an autonomous trading agent operating in the {self.market} market.
## Current Date
{date}
## Your Portfolio
- Available Cash: ${self.portfolio.cash:.2f}
- Current Positions:
{positions_str}
## Tradable Assets
{', '.join(self.stock_symbols)}
## Available Tools
{tools_str}
## Trading Rules
- You operate in {self.market} market
- Settlement: {'T+0 (can sell same day)' if self.market != 'cn' else 'T+1 (can only sell next day)'}
- Trading Hours: {'24/7' if self.market == 'crypto' else '9:30-16:00'}
## Instructions
1. Use the available tools to gather information before making decisions
2. Think step by step about market conditions
3. Consider risk management in your decisions
4. Explain your reasoning clearly
5. When ready to finalize decisions for today, output [STOP]
## Important
- You MUST search for relevant information before making trading decisions
- Do NOT assume any market information - use tools to verify
- Consider multiple sources of information
- Be explicit about your reasoning process"""
async def run_trading_session(self, date: str) -> TradingSessionResult:
"""
执行单日交易会话
实现ReAct (Reasoning + Acting) 循环:
- Thought: 模型思考当前状态
- Action: 模型选择执行的工具
- Observation: 工具返回结果
- (循环直到[STOP]或达到max_steps)
Args:
date: 交易日期 (YYYY-MM-DD格式)
Returns:
TradingSessionResult: 包含决策、推理链、持仓变化的结果对象
"""
self.logger.info(f"Starting trading session for {date}")
# 1. 构建初始消息
system_prompt = self._build_system_prompt(date)
messages = [SystemMessage(content=system_prompt)]
# 2. 记录推理链
reasoning_chain = []
# 3. ReAct循环
for step in range(self.max_steps):
step_record = {
"step": step + 1,
"timestamp": datetime.now().isoformat(),
"thought": None,
"action": None,
"observation": None
}
try:
# 3.1 调用LLM获取响应
response = await self._ainvoke_with_retry(messages)
# 3.2 记录思考内容
step_record["thought"] = response.content
# 3.3 处理工具调用
if response.tool_calls:
tool_results = []
for tool_call in response.tool_calls:
# 记录动作
step_record["action"] = {
"tool": tool_call["name"],
"arguments": tool_call["args"]
}
# 执行工具
result = await self._execute_tool(tool_call)
tool_results.append({
"tool_call_id": tool_call["id"],
"result": result
})
# 记录观察
step_record["observation"] = result
# 添加AI消息和工具结果到历史
messages.append(AIMessage(
content=response.content or "",
tool_calls=response.tool_calls
))
for tr in tool_results:
messages.append(ToolMessage(
content=str(tr["result"]),
tool_call_id=tr["tool_call_id"]
))
else:
# 无工具调用,直接添加响应
messages.append(response)
# 3.4 保存步骤记录
reasoning_chain.append(step_record)
# 3.5 检查停止条件
if self._should_stop(response):
self.logger.info(f"Agent decided to stop at step {step + 1}")
break
except Exception as e:
self.logger.error(f"Step {step + 1} failed: {e}")
step_record["error"] = str(e)
reasoning_chain.append(step_record)
continue
# 4. 提取最终决策
decisions = self._extract_decisions(messages)
# 5. 执行交易并更新持仓
execution_results = await self._execute_decisions(decisions, date)
# 6. 构建返回结果
result = TradingSessionResult(
date=date,
decisions=decisions,
execution_results=execution_results,
reasoning_chain=reasoning_chain,
final_portfolio=self.portfolio.snapshot(),
steps_taken=len(reasoning_chain),
tokens_used=self._count_tokens(messages)
)
# 7. 持久化结果
self._save_session_result(result)
return result
def _should_stop(self, response) -> bool:
"""检查是否应该停止推理循环"""
if response.content and "[STOP]" in response.content:
return True
return False
async def _execute_tool(self, tool_call: dict) -> Any:
"""
执行工具调用
Args:
tool_call: 包含工具名称和参数的字典
{
"id": "call_xxx",
"name": "get_price",
"args": {"symbol": "AAPL", "data_type": "current"}
}
Returns:
工具执行结果
"""
tool_name = tool_call["name"]
tool_args = tool_call["args"]
self.logger.debug(f"Executing tool: {tool_name} with args: {tool_args}")
try:
# 通过MCP客户端调用工具
result = await self.mcp_client.call_tool(tool_name, tool_args)
self.logger.debug(f"Tool result: {result}")
return result
except ToolExecutionError as e:
self.logger.warning(f"Tool {tool_name} execution failed: {e}")
return {"error": str(e)}
except TimeoutError:
self.logger.warning(f"Tool {tool_name} timed out")
return {"error": "Tool execution timed out"}
async def _ainvoke_with_retry(
self,
messages: List[BaseMessage],
max_retries: int = 3,
base_delay: float = 1.0
) -> AIMessage:
"""
带重试的LLM调用
实现指数退避策略处理常见错误:
- RateLimitError: API速率限制
- APIError: 服务端错误
- TimeoutError: 请求超时
Args:
messages: 消息列表
max_retries: 最大重试次数
base_delay: 基础延迟时间(秒)
Returns:
AIMessage: LLM响应
Raises:
最后一次异常(如果所有重试都失败)
"""
last_exception = None
for attempt in range(max_retries):
try:
response = await self.llm.ainvoke(messages)
return response
except RateLimitError as e:
last_exception = e
delay = base_delay * (2 ** attempt) # 指数退避
self.logger.warning(
f"Rate limited, retry {attempt + 1}/{max_retries} after {delay}s"
)
await asyncio.sleep(delay)
except APIError as e:
last_exception = e
error_msg = str(e).lower()
# 服务器过载,可重试
if "overloaded" in error_msg or "503" in error_msg:
delay = base_delay * (2 ** attempt)
self.logger.warning(
f"Server overloaded, retry {attempt + 1}/{max_retries} after {delay}s"
)
await asyncio.sleep(delay)
else:
# 其他API错误,不重试
raise
except TimeoutError as e:
last_exception = e
delay = base_delay * (2 ** attempt)
self.logger.warning(
f"Request timeout, retry {attempt + 1}/{max_retries} after {delay}s"
)
await asyncio.sleep(delay)
# 所有重试都失败
self.logger.error(f"All {max_retries} retries failed")
raise last_exception
由于DeepSeek API响应格式与OpenAI存在差异,系统实现了专门的适配器:
class DeepSeekChatOpenAI(ChatOpenAI):
"""
DeepSeek API适配器
解决的格式差异:
1. tool_calls中的arguments可能是字符串而非字典
2. 部分响应字段名称不同
"""
def _fix_tool_calls(self, response: AIMessage) -> AIMessage:
"""
修复工具调用格式
DeepSeek返回的tool_calls中,arguments字段可能是JSON字符串
而非已解析的字典,需要进行转换
"""
if not response.tool_calls:
return response
fixed_tool_calls = []
for tc in response.tool_calls:
fixed_tc = tc.copy()
# 如果arguments是字符串,尝试解析为JSON
if isinstance(tc.get("args"), str):
try:
fixed_tc["args"] = json.loads(tc["args"])
except json.JSONDecodeError:
self.logger.warning(
f"Failed to parse tool arguments: {tc['args']}"
)
fixed_tool_calls.append(fixed_tc)
response.tool_calls = fixed_tool_calls
return response
def _generate(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
**kwargs
) -> ChatResult:
"""同步生成方法"""
result = super()._generate(messages, stop, **kwargs)
# 修复每个生成结果中的tool_calls
for generation in result.generations:
if hasattr(generation, 'message'):
generation.message = self._fix_tool_calls(generation.message)
return result
async def _agenerate(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
**kwargs
) -> ChatResult:
"""异步生成方法"""
result = await super()._agenerate(messages, stop, **kwargs)
for generation in result.generations:
if hasattr(generation, 'message'):
generation.message = self._fix_tool_calls(generation.message)
return result
Model Context Protocol (MCP) 是一种标准化的工具调用协议,定义了LLM与外部工具交互的接口规范。
┌────────────────────────────────────────────────────────────────────┐
│ MCP协议架构 │
├────────────────────────────────────────────────────────────────────┤
│ │
│ ┌────────────────┐ ┌────────────────┐ │
│ │ MCP Client │ │ MCP Server │ │
│ │ │ │ │ │
│ │ (Agent侧) │ ←── HTTP/SSE ───→ │ (工具服务侧) │ │
│ │ │ │ │ │
│ │ • list_tools │ │ • 工具注册 │ │
│ │ • call_tool │ │ • 请求处理 │ │
│ │ • 结果处理 │ │ • 结果返回 │ │
│ └────────────────┘ └────────────────┘ │
│ │
│ 协议特点: │
│ • 声明式工具定义 (JSON Schema) │
│ • 异步调用支持 │
│ • 错误处理标准化 │
│ • 流式响应支持 │
│ │
└────────────────────────────────────────────────────────────────────┘
from fastmcp import FastMCP, tool
mcp = FastMCP("trade_service")
@mcp.tool()
async def execute_trade(
symbol: str,
action: Literal["buy", "sell"],
quantity: float,
order_type: Literal["market", "limit"] = "market",
limit_price: Optional[float] = None
) -> dict:
"""
执行交易指令
Args:
symbol: 交易标的代码
- 美股: "AAPL", "MSFT", "GOOGL"等
- 加密货币: "BTCUSDT", "ETHUSDT"等
- A股: "600519.SH", "000001.SZ"等
action: 交易方向
- "buy": 买入
- "sell": 卖出
quantity: 交易数量
- 美股: 股数 (支持小数,表示零股交易)
- 加密货币: 数量 (支持小数)
- A股: 手数 (1手=100股)
order_type: 订单类型
- "market": 市价单 (立即成交)
- "limit": 限价单 (指定价格)
limit_price: 限价单价格 (order_type="limit"时必填)
Returns:
dict: 交易执行结果
{
"success": True/False,
"order_id": "ord_xxx",
"symbol": "AAPL",
"action": "buy",
"order_type": "market",
"requested_quantity": 100,
"filled_quantity": 100,
"filled_price": 150.25,
"commission": 0.01,
"timestamp": "2024-12-28T10:30:00Z",
"message": "Order executed successfully"
}
Raises:
InsufficientFundsError: 资金不足
InsufficientPositionError: 持仓不足 (卖出时)
MarketClosedError: 市场已关闭
T1RestrictionError: T+1限制 (A股当日买入不可卖)
"""
# 获取当前模拟环境上下文
context = get_simulation_context()
# 1. 验证市场规则
market_rules = get_market_rules(symbol)
if not market_rules.is_trading_hours(context.current_time):
return {
"success": False,
"error": "MarketClosedError",
"message": f"Market is closed. Trading hours: {market_rules.trading_hours}"
}
# 2. 验证T+1规则 (A股)
if action == "sell" and market_rules.settlement == "T+1":
position = context.portfolio.get_position(symbol)
if position and position.buy_date == context.current_date:
return {
"success": False,
"error": "T1RestrictionError",
"message": "Cannot sell shares bought today (T+1 settlement)"
}
# 3. 验证资金/持仓
if action == "buy":
current_price = await get_current_price(symbol)
required_funds = current_price * quantity * (1 + market_rules.commission_rate)
if required_funds > context.portfolio.available_cash:
return {
"success": False,
"error": "InsufficientFundsError",
"message": f"Insufficient funds. Required: ${required_funds:.2f}, Available: ${context.portfolio.available_cash:.2f}"
}
else: # sell
position = context.portfolio.get_position(symbol)
if not position or position.quantity < quantity:
available = position.quantity if position else 0
return {
"success": False,
"error": "InsufficientPositionError",
"message": f"Insufficient position. Requested: {quantity}, Available: {available}"
}
# 4. 执行模拟交易
filled_price = await simulate_execution(symbol, action, quantity, order_type, limit_price)
commission = calculate_commission(filled_price * quantity, market_rules)
# 5. 更新持仓
context.portfolio.update(symbol, action, quantity, filled_price, commission)
# 6. 记录交易
order_id = generate_order_id()
trade_record = {
"order_id": order_id,
"symbol": symbol,
"action": action,
"quantity": quantity,
"filled_price": filled_price,
"commission": commission,
"timestamp": context.current_time.isoformat()
}
context.trade_history.append(trade_record)
return {
"success": True,
"order_id": order_id,
"symbol": symbol,
"action": action,
"order_type": order_type,
"requested_quantity": quantity,
"filled_quantity": quantity,
"filled_price": filled_price,
"commission": commission,
"timestamp": context.current_time.isoformat(),
"message": "Order executed successfully"
}
@mcp.tool()
async def get_price(
symbol: str,
data_type: Literal["current", "historical"] = "current",
start_date: Optional[str] = None,
end_date: Optional[str] = None,
interval: Literal["1d", "1h", "5m"] = "1d"
) -> dict:
"""
获取价格数据
Args:
symbol: 标的代码
data_type: 数据类型
- "current": 当前价格
- "historical": 历史数据
start_date: 历史数据起始日期 (YYYY-MM-DD)
end_date: 历史数据结束日期 (YYYY-MM-DD)
interval: 数据间隔
- "1d": 日线
- "1h": 小时线 (仅加密货币)
- "5m": 5分钟线 (仅加密货币)
Returns:
dict: 价格数据
{
"symbol": "AAPL",
"current_price": 150.25,
"change": 2.50,
"change_percent": 1.69,
"volume": 45678900,
"timestamp": "2024-12-28T16:00:00Z",
"historical": [ // 仅data_type="historical"时返回
{
"date": "2024-12-27",
"open": 148.00,
"high": 151.50,
"low": 147.50,
"close": 150.25,
"volume": 45678900
},
...
]
}
Note:
- 历史数据严格遵循时间限制,不会返回模拟日期之后的数据
- 这是防止前视偏差(look-ahead bias)的关键机制
"""
context = get_simulation_context()
# 关键: 防止前视偏差
if end_date:
end_date = min(
datetime.strptime(end_date, "%Y-%m-%d"),
context.current_date
).strftime("%Y-%m-%d")
else:
end_date = context.current_date.strftime("%Y-%m-%d")
# 从数据源获取价格
if data_type == "current":
price_data = await data_provider.get_current_price(
symbol,
as_of=context.current_date
)
return {
"symbol": symbol,
"current_price": price_data["close"],
"change": price_data["change"],
"change_percent": price_data["change_percent"],
"volume": price_data["volume"],
"timestamp": context.current_time.isoformat()
}
else:
historical = await data_provider.get_historical_prices(
symbol,
start_date=start_date,
end_date=end_date,
interval=interval
)
current = historical[-1] if historical else None
return {
"symbol": symbol,
"current_price": current["close"] if current else None,
"change": current["change"] if current else None,
"change_percent": current["change_percent"] if current else None,
"volume": current["volume"] if current else None,
"timestamp": context.current_time.isoformat(),
"historical": historical
}
@mcp.tool()
async def search_information(
query: str,
search_type: Literal["news", "analysis", "general"] = "general",
max_results: int = 5,
time_range: Optional[str] = None
) -> dict:
"""
搜索市场相关信息
使用Jina AI搜索服务获取实时市场信息。
Args:
query: 搜索关键词
示例:
- "AAPL earnings report Q4 2024"
- "Bitcoin price prediction"
- "Federal Reserve interest rate decision"
search_type: 搜索类型
- "news": 新闻报道
- "analysis": 分析文章
- "general": 通用搜索
max_results: 返回结果数量上限 (1-10)
time_range: 时间范围
- "24h": 过去24小时
- "7d": 过去7天
- "30d": 过去30天
- None: 不限制
Returns:
dict: 搜索结果
{
"query": "AAPL earnings report",
"results": [
{
"title": "Apple Reports Record Q4 Revenue",
"snippet": "Apple Inc. announced...",
"url": "https://...",
"source": "Reuters",
"publish_date": "2024-12-27",
"relevance_score": 0.95
},
...
],
"total_found": 156,
"returned": 5
}
Note:
- 搜索结果经过时间过滤,不会包含模拟日期之后发布的内容
- 结果按相关性排序
"""
context = get_simulation_context()
# 构建Jina搜索请求
jina_params = {
"query": query,
"num_results": max_results,
"search_type": search_type
}
if time_range:
jina_params["time_range"] = time_range
# 调用Jina API
raw_results = await jina_client.search(**jina_params)
# 关键: 过滤未来信息
filtered_results = []
for result in raw_results:
publish_date = parse_date(result.get("publish_date"))
if publish_date and publish_date <= context.current_date:
filtered_results.append({
"title": result["title"],
"snippet": result["snippet"][:500], # 截断长文本
"url": result["url"],
"source": result.get("source", "Unknown"),
"publish_date": publish_date.strftime("%Y-%m-%d"),
"relevance_score": result.get("score", 0)
})
return {
"query": query,
"results": filtered_results[:max_results],
"total_found": len(raw_results),
"returned": len(filtered_results[:max_results])
}
@mcp.tool()
async def calculate(expression: str) -> dict:
"""
执行数学计算
支持基础数学运算和金融计算函数。
Args:
expression: 数学表达式字符串
支持的运算:
基础运算: +, -, *, /, ** (幂), % (取模)
数学函数: sqrt, log, log10, exp, sin, cos, tan
统计函数: mean, std, var, median, min, max
金融函数:
- returns(prices): 计算收益率序列
- cumulative_returns(prices): 计算累积收益率
- sharpe_ratio(returns, rf=0): 计算夏普比率
- max_drawdown(prices): 计算最大回撤
- volatility(returns): 计算波动率
- var(returns, confidence=0.95): 计算VaR
Returns:
dict: 计算结果
{
"expression": "sharpe_ratio([0.1, 0.05, -0.02, 0.08])",
"result": 1.23,
"type": "float"
}
Examples:
>>> calculate("100 * 1.05 ** 12")
{"result": 179.58, ...}
>>> calculate("sharpe_ratio([0.1, 0.05, -0.02, 0.08])")
{"result": 1.23, ...}
>>> calculate("max_drawdown([100, 95, 98, 92, 96])")
{"result": -0.08, ...}
"""
try:
# 使用安全的表达式求值
result = safe_eval(
expression,
allowed_functions=FINANCIAL_FUNCTIONS,
allowed_operations=MATH_OPERATIONS
)
return {
"expression": expression,
"result": result,
"type": type(result).__name__
}
except SyntaxError as e:
return {
"expression": expression,
"error": "SyntaxError",
"message": f"Invalid expression syntax: {e}"
}
except ValueError as e:
return {
"expression": expression,
"error": "ValueError",
"message": str(e)
}
except Exception as e:
return {
"expression": expression,
"error": "CalculationError",
"message": str(e)
}
# 安全求值实现
ALLOWED_NAMES = {
# 数学常量
"pi": math.pi,
"e": math.e,
# 数学函数
"sqrt": math.sqrt,
"log": math.log,
"log10": math.log10,
"exp": math.exp,
"sin": math.sin,
"cos": math.cos,
"tan": math.tan,
"abs": abs,
"round": round,
# 统计函数
"mean": lambda x: sum(x) / len(x),
"std": lambda x: statistics.stdev(x),
"var": lambda x: statistics.variance(x),
"median": statistics.median,
"min": min,
"max": max,
# 金融函数
"returns": financial.calculate_returns,
"cumulative_returns": financial.cumulative_returns,
"sharpe_ratio": financial.sharpe_ratio,
"max_drawdown": financial.max_drawdown,
"volatility": financial.volatility,
"var": financial.value_at_risk,
}
def safe_eval(expression: str, **kwargs) -> Any:
"""安全的表达式求值"""
# 编译表达式
code = compile(expression, "<string>", "eval")
# 验证只使用允许的名称
for name in code.co_names:
if name not in ALLOWED_NAMES:
raise ValueError(f"Name '{name}' is not allowed")
# 执行求值
return eval(code, {"__builtins__": {}}, ALLOWED_NAMES)
# start_mcp_services.py
import asyncio
from fastmcp import FastMCP
import uvicorn
# 工具服务配置
SERVICES = [
{
"name": "trade",
"module": "tool_trade",
"port": 8001,
"description": "Trade execution service"
},
{
"name": "price",
"module": "tool_get_price_local",
"port": 8002,
"description": "Price data service"
},
{
"name": "search",
"module": "tool_jina_search",
"port": 8003,
"description": "Information search service"
},
{
"name": "math",
"module": "tool_math",
"port": 8004,
"description": "Mathematical calculation service"
},
{
"name": "news",
"module": "tool_alphavantage_news",
"port": 8005,
"description": "News retrieval service"
},
]
async def start_service(service_config: dict):
"""启动单个MCP服务"""
mcp = FastMCP(service_config["name"])
# 动态导入工具模块
module = __import__(service_config["module"])
# 注册工具
for tool_func in module.get_tools():
mcp.add_tool(tool_func)
# 启动服务
config = uvicorn.Config(
mcp.app,
host="0.0.0.0",
port=service_config["port"],
log_level="info"
)
server = uvicorn.Server(config)
print(f"Starting {service_config['name']} service on port {service_config['port']}")
await server.serve()
async def start_all_services():
"""并行启动所有MCP服务"""
tasks = [start_service(cfg) for cfg in SERVICES]
await asyncio.gather(*tasks)
if __name__ == "__main__":
print("Starting MCP Tool Services...")
print("=" * 50)
for svc in SERVICES:
print(f" - {svc['name']}: port {svc['port']} ({svc['description']})")
print("=" * 50)
asyncio.run(start_all_services())
系统通过抽象基类定义统一的市场规则接口:
from abc import ABC, abstractmethod
from datetime import datetime, time
from typing import Optional
class MarketRules(ABC):
"""市场规则抽象基类"""
@property
@abstractmethod
def market_code(self) -> str:
"""市场代码"""
pass
@property
@abstractmethod
def currency(self) -> str:
"""交易货币"""
pass
@property
@abstractmethod
def settlement_type(self) -> str:
"""结算类型: T+0 或 T+1"""
pass
@property
@abstractmethod
def trading_hours(self) -> list:
"""交易时段列表"""
pass
@property
@abstractmethod
def commission_rate(self) -> float:
"""手续费率"""
pass
@abstractmethod
def is_trading_day(self, date: datetime) -> bool:
"""是否为交易日"""
pass
@abstractmethod
def is_trading_hours(self, dt: datetime) -> bool:
"""是否在交易时段"""
pass
@abstractmethod
def can_sell(self, buy_date: datetime, sell_date: datetime) -> bool:
"""是否可以卖出 (考虑T+1规则)"""
pass
@abstractmethod
def get_lot_size(self, symbol: str) -> int:
"""获取最小交易单位"""
pass
class USMarketRules(MarketRules):
"""美股市场规则"""
# NYSE/NASDAQ节假日列表
US_HOLIDAYS = [
"2024-01-01", # New Year's Day
"2024-01-15", # Martin Luther King Jr. Day
"2024-02-19", # Presidents' Day
"2024-03-29", # Good Friday
"2024-05-27", # Memorial Day
"2024-06-19", # Juneteenth
"2024-07-04", # Independence Day
"2024-09-02", # Labor Day
"2024-11-28", # Thanksgiving
"2024-12-25", # Christmas
]
@property
def market_code(self) -> str:
return "US"
@property
def currency(self) -> str:
return "USD"
@property
def settlement_type(self) -> str:
return "T+0" # 美股T+0,当日可卖
@property
def trading_hours(self) -> list:
"""
美股交易时段 (东部时间):
- 盘前: 04:00 - 09:30
- 正常: 09:30 - 16:00
- 盘后: 16:00 - 20:00
"""
return [
{"name": "pre_market", "start": time(4, 0), "end": time(9, 30)},
{"name": "regular", "start": time(9, 30), "end": time(16, 0)},
{"name": "after_hours", "start": time(16, 0), "end": time(20, 0)},
]
@property
def commission_rate(self) -> float:
return 0.0001 # 0.01%
def is_trading_day(self, date: datetime) -> bool:
"""周一至周五,且非节假日"""
if date.weekday() >= 5: # 周六日
return False
if date.strftime("%Y-%m-%d") in self.US_HOLIDAYS:
return False
return True
def is_trading_hours(self, dt: datetime) -> bool:
"""检查是否在交易时段"""
if not self.is_trading_day(dt):
return False
current_time = dt.time()
for session in self.trading_hours:
if session["start"] <= current_time <= session["end"]:
return True
return False
def can_sell(self, buy_date: datetime, sell_date: datetime) -> bool:
"""T+0: 当日买入可当日卖出"""
return True
def get_lot_size(self, symbol: str) -> int:
"""美股支持零股交易,最小单位1股"""
return 1
class ChinaAShareRules(MarketRules):
"""A股市场规则"""
@property
def market_code(self) -> str:
return "CN"
@property
def currency(self) -> str:
return "CNY"
@property
def settlement_type(self) -> str:
return "T+1" # A股T+1,次日才能卖
@property
def trading_hours(self) -> list:
"""
A股交易时段 (北京时间):
- 上午: 09:30 - 11:30
- 下午: 13:00 - 15:00
"""
return [
{"name": "morning", "start": time(9, 30), "end": time(11, 30)},
{"name": "afternoon", "start": time(13, 0), "end": time(15, 0)},
]
@property
def commission_rate(self) -> float:
return 0.0003 # 0.03% (券商佣金) + 印花税另计
def is_trading_day(self, date: datetime) -> bool:
"""使用Tushare获取交易日历"""
# 实际实现会查询交易日历
if date.weekday() >= 5:
return False
# TODO: 检查中国法定节假日
return True
def is_trading_hours(self, dt: datetime) -> bool:
if not self.is_trading_day(dt):
return False
current_time = dt.time()
for session in self.trading_hours:
if session["start"] <= current_time <= session["end"]:
return True
return False
def can_sell(self, buy_date: datetime, sell_date: datetime) -> bool:
"""T+1: 买入后的下一个交易日才能卖出"""
return sell_date.date() > buy_date.date()
def get_lot_size(self, symbol: str) -> int:
"""A股最小交易单位: 1手 = 100股"""
return 100
class CryptoMarketRules(MarketRules):
"""加密货币市场规则"""
@property
def market_code(self) -> str:
return "CRYPTO"
@property
def currency(self) -> str:
return "USDT"
@property
def settlement_type(self) -> str:
return "T+0" # 即时结算
@property
def trading_hours(self) -> list:
"""24/7全天候交易"""
return [
{"name": "24h", "start": time(0, 0), "end": time(23, 59, 59)},
]
@property
def commission_rate(self) -> float:
return 0.001 # 0.1% (典型交易所费率)
def is_trading_day(self, date: datetime) -> bool:
"""加密货币全年无休"""
return True
def is_trading_hours(self, dt: datetime) -> bool:
"""24/7交易"""
return True
def can_sell(self, buy_date: datetime, sell_date: datetime) -> bool:
"""即时可卖"""
return True
def get_lot_size(self, symbol: str) -> int:
"""加密货币支持极小单位交易"""
# 根据币种返回最小精度
lot_sizes = {
"BTCUSDT": 0.00001,
"ETHUSDT": 0.0001,
"XRPUSDT": 1,
"SOLUSDT": 0.01,
}
return lot_sizes.get(symbol, 0.001)
class BaseAgent:
"""基础Agent - 支持多市场适配"""
def __init__(self, market: str = "us", **kwargs):
self.market = market
self.market_rules = self._get_market_rules(market)
def _get_market_rules(self, market: str) -> MarketRules:
"""获取市场规则实例"""
rules_map = {
"us": USMarketRules(),
"cn": ChinaAShareRules(),
"crypto": CryptoMarketRules(),
}
if market not in rules_map:
raise ValueError(f"Unsupported market: {market}")
return rules_map[market]
def _build_system_prompt(self, date: str) -> str:
"""构建包含市场规则的系统提示"""
return f"""
...
## Market Rules
- Market: {self.market_rules.market_code}
- Currency: {self.market_rules.currency}
- Settlement: {self.market_rules.settlement_type}
- Trading Hours: {self._format_trading_hours()}
- Commission Rate: {self.market_rules.commission_rate * 100:.3f}%
## Important Trading Restrictions
{self._get_trading_restrictions()}
...
"""
def _get_trading_restrictions(self) -> str:
"""获取市场特定的交易限制说明"""
if self.market == "cn":
return """- T+1 Settlement: Shares bought today CANNOT be sold until the next trading day
- Lot Size: Must trade in multiples of 100 shares (1 lot)
- Price Limits: ±10% daily price limit (±20% for ChiNext/STAR)"""
elif self.market == "crypto":
return """- 24/7 Trading: Market never closes
- High Volatility: Be cautious of large price swings
- No Settlement Restrictions: Can buy and sell instantly"""
else: # us
return """- T+0 Settlement: Can sell shares on the same day you buy them
- Fractional Shares: Can trade partial shares
- Pattern Day Trader Rule: Be aware if account < $25,000"""
AI-Trader采用多维度指标体系评估模型表现:
| 指标 | 定义 | 公式 | 说明 |
|---|---|---|---|
| 总收益率 | Total Return | $R_{total} = \frac{V_{end} - V_{start}}{V_{start}}$ | 整体收益表现 |
| 年化收益率 | Annualized Return | $R_{ann} = (1 + R_{total})^{\frac{252}{n}} - 1$ | 标准化年度收益 |
| 超额收益 | Alpha | $\alpha = R_{agent} - R_{benchmark}$ | 相对基准的超额 |
| 日均收益 | Average Daily Return | $\bar{r} = \frac{1}{n}\sum_{i=1}^{n} r_i$ | 收益稳定性 |
| 指标 | 定义 | 公式 | 说明 |
|---|---|---|---|
| 最大回撤 | Maximum Drawdown | $MDD = \max_{t}\left(\frac{Peak_t - Trough_t}{Peak_t}\right)$ | 最大亏损幅度 |
| 波动率 | Volatility | $\sigma = \sqrt{\frac{1}{n}\sum_{i=1}^{n}(r_i - \bar{r})^2}$ | 收益波动程度 |
| 下行风险 | Downside Deviation | $\sigma_d = \sqrt{\frac{1}{n}\sum_{i=1}^{n} \min(r_i - T, 0)^2}$ | 负收益波动 |
| VaR (95%) | Value at Risk | $P(R < VaR) = 5%$ | 极端损失估计 |
| 指标 | 定义 | 公式 | 说明 |
|---|---|---|---|
| 夏普比率 | Sharpe Ratio | $SR = \frac{\bar{r} - r_f}{\sigma}$ | 单位风险收益 |
| 索提诺比率 | Sortino Ratio | $Sortino = \frac{\bar{r} - r_f}{\sigma_d}$ | 单位下行风险收益 |
| 卡玛比率 | Calmar Ratio | $Calmar = \frac{R_{ann}}{ | MDD |
| 信息比率 | Information Ratio | $IR = \frac{\alpha}{\sigma_{tracking}}$ | 主动管理能力 |
| 指标 | 说明 |
|---|---|
| 交易频率 | 平均每个交易日的交易次数 |
| 胜率 | 盈利交易占总交易的比例 |
| 盈亏比 | 平均盈利金额 / 平均亏损金额 |
| 平均持仓周期 | 从买入到卖出的平均天数 |
| 换手率 | 交易量 / 平均持仓价值 |
| 工具使用频率 | 每次决策平均调用工具次数 |
| 推理步数 | 每次决策平均推理步数 |
┌────────────────────────────────────────────────────────────────────────┐
│ 评测流程 │
├────────────────────────────────────────────────────────────────────────┤
│ │
│ Phase 1: 初始化 │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ 1.1 设定评测参数 │ │
│ │ • 时间范围: 2024-01-01 ~ 2024-12-31 │ │
│ │ • 初始资金: $10,000 / ¥100,000 / 50,000 USDT │ │
│ │ • 市场类型: US / CN / CRYPTO │ │
│ │ │ │
│ │ 1.2 加载待评测模型 │ │
│ │ • GPT-4o │ │
│ │ • Claude-3.5-Sonnet │ │
│ │ • DeepSeek-V3 │ │
│ │ • Gemini-2.0-Flash │ │
│ │ • Qwen-2.5-72B │ │
│ │ • ... │ │
│ │ │ │
│ │ 1.3 启动MCP工具服务 │ │
│ │ • tool_trade (8001) │ │
│ │ • tool_price (8002) │ │
│ │ • tool_search (8003) │ │
│ │ • tool_math (8004) │ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ Phase 2: 并行执行 │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ │ │
│ │ For each trading_day in date_range: │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌───────────────────────────────────────────────────────────┐ │ │
│ │ │ For each model in models (并行): │ │ │
│ │ │ agent = create_agent(model, trading_day) │ │ │
│ │ │ result = agent.run_trading_session() │ │ │
│ │ │ save_result(model, trading_day, result) │ │ │
│ │ └───────────────────────────────────────────────────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌───────────────────────────────────────────────────────────┐ │ │
│ │ │ Update progress: │ │ │
│ │ │ [████████████░░░░░░░░] 60% - Day 180/300 │ │ │
│ │ └───────────────────────────────────────────────────────────┘ │ │
│ │ │ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ Phase 3: 结果汇总 │
│ ┌──────────────────────────────────────────────────────────────────┐ │
│ │ 3.1 计算评测指标 │ │
│ │ • 收益类: Total Return, Alpha, Sharpe Ratio │ │
│ │ • 风险类: MDD, Volatility, VaR │ │
│ │ • 行为类: Win Rate, Trade Frequency, Tool Usage │ │
│ │ │ │
│ │ 3.2 生成排行榜 │ │
│ │ ┌─────┬────────────┬────────┬───────┬─────────┐ │ │
│ │ │Rank │ Model │ Return │ Sharpe│ MDD │ │ │
│ │ ├─────┼────────────┼────────┼───────┼─────────┤ │ │
│ │ │ 1 │ Model A │ +25.3% │ 1.85 │ -8.2% │ │ │
│ │ │ 2 │ Model B │ +18.7% │ 1.42 │ -12.5% │ │ │
│ │ │ ... │ ... │ ... │ ... │ ... │ │ │
│ │ └─────┴────────────┴────────┴───────┴─────────┘ │ │
│ │ │ │
│ │ 3.3 导出推理链记录 │ │
│ │ • JSON格式完整记录 │ │
│ │ • 可视化时间线 │ │
│ │ │ │
│ │ 3.4 更新Dashboard (ai4trade.ai) │ │
│ └──────────────────────────────────────────────────────────────────┘ │
│ │
└────────────────────────────────────────────────────────────────────────┘
| 条件类型 | 保障措施 | 实现方式 |
|---|---|---|
| 相同资金 | 所有模型使用相同初始资金 | 配置文件统一设定 |
| 相同数据 | 使用相同数据源和时间点 | 统一数据管道 |
| 相同工具 | 调用相同的MCP服务 | 共享工具服务实例 |
| 相同规则 | 遵循相同交易规则 | 统一市场规则类 |
| 同步时间 | 模拟相同的市场时间 | 统一时间上下文 |
class DataFilter:
"""
数据过滤器 - 确保无前视偏差
关键原则:
- 所有数据查询必须指定as_of日期
- 返回结果不包含as_of之后的任何信息
"""
def __init__(self, simulation_date: datetime):
self.simulation_date = simulation_date
def filter_price_data(self, data: pd.DataFrame) -> pd.DataFrame:
"""
过滤价格数据
确保不返回模拟日期之后的价格数据
"""
if "date" in data.columns:
mask = pd.to_datetime(data["date"]) <= self.simulation_date
return data[mask].copy()
return data
def filter_news(self, news_list: List[dict]) -> List[dict]:
"""
过滤新闻数据
确保不返回模拟日期之后发布的新闻
"""
filtered = []
for news in news_list:
publish_date = parse_date(news.get("publish_date"))
if publish_date and publish_date <= self.simulation_date:
filtered.append(news)
return filtered
def validate_agent_decision(self, decision: dict, reasoning: List[dict]) -> bool:
"""
验证Agent决策未使用未来信息
检查推理链中引用的所有信息源的时间戳
"""
for step in reasoning:
if step.get("observation"):
# 检查工具返回结果中的时间戳
obs = step["observation"]
if isinstance(obs, dict):
for key, value in obs.items():
if "date" in key.lower() and value:
ref_date = parse_date(str(value))
if ref_date and ref_date > self.simulation_date:
self.logger.error(
f"Future information leak detected: "
f"{key}={value} > {self.simulation_date}"
)
return False
return True
@dataclass
class TradingSessionResult:
"""单次交易会话结果"""
date: str
model: str
market: str
# 决策信息
decisions: List[TradeDecision]
execution_results: List[ExecutionResult]
# 推理过程
reasoning_chain: List[ReasoningStep]
steps_taken: int
tokens_used: int
# 持仓状态
portfolio_before: PortfolioSnapshot
portfolio_after: PortfolioSnapshot
# 指标
session_return: float
session_pnl: float
# 元数据
start_time: datetime
end_time: datetime
duration_seconds: float
@dataclass
class BenchmarkResult:
"""完整评测结果"""
config: BenchmarkConfig
# 按模型分组的结果
model_results: Dict[str, ModelResult]
# 排行榜
leaderboard: List[LeaderboardEntry]
# 统计信息
total_trading_days: int
total_sessions: int
# 生成时间
generated_at: datetime
# 存储为JSONL格式 (便于追加和流式处理)
# results/{model}/{market}/sessions/{date}.jsonl
| 属性 | 值 |
|---|---|
| 标的范围 | NASDAQ-100成分股 (100只) |
| 时间范围 | 2023-01-01 ~ 2024-12-31 |
| 数据类型 | 日线OHLCV, 分时数据, 新闻 |
| 数据源 | Alpha Vantage API |
| 交易日数 | ~500个交易日 |
代表性标的:
AAPL (Apple) MSFT (Microsoft) GOOGL (Alphabet)
AMZN (Amazon) NVDA (NVIDIA) META (Meta)
TSLA (Tesla) AMD INTC (Intel)
...
| 属性 | 值 |
|---|---|
| 标的范围 | SSE-50成分股 (50只) |
| 时间范围 | 2023-01-01 ~ 2024-12-31 |
| 数据类型 | 日线OHLCV, 公告数据 |
| 数据源 | Tushare Pro API |
| 交易日数 | ~480个交易日 |
代表性标的:
600519.SH (贵州茅台) 601318.SH (中国平安)
600036.SH (招商银行) 601166.SH (兴业银行)
600276.SH (恒瑞医药) 000858.SZ (五粮液)
...
| 属性 | 值 |
|---|---|
| 标的范围 | 10种主流加密货币 |
| 时间范围 | 2023-01-01 ~ 2024-12-31 |
| 数据类型 | 小时线OHLCV |
| 数据源 | Alpha Vantage Crypto API |
| 数据点数 | ~17,520 * 10 = 175,200 |
标的列表:
BTC (Bitcoin) ETH (Ethereum) XRP (Ripple)
SOL (Solana) ADA (Cardano) SUI
LINK (Chainlink) AVAX (Avalanche) LTC (Litecoin)
DOT (Polkadot)
# benchmark_config.yaml
experiment:
name: "AI-Trader Benchmark 2024"
version: "1.0"
markets:
us:
initial_cash: 10000
currency: USD
assets: nasdaq_100
frequency: daily
cn:
initial_cash: 100000
currency: CNY
assets: sse_50
frequency: daily
crypto:
initial_cash: 50000
currency: USDT
assets: [BTC, ETH, XRP, SOL, ADA, SUI, LINK, AVAX, LTC, DOT]
frequency: hourly
models:
- name: gpt-4o
provider: openai
temperature: 0.7
- name: claude-3-5-sonnet
provider: anthropic
temperature: 0.7
- name: deepseek-v3
provider: deepseek
temperature: 0.7
- name: gemini-2.0-flash
provider: google
temperature: 0.7
- name: qwen-2.5-72b
provider: alibaba
temperature: 0.7
agent:
max_steps: 10
timeout_seconds: 300
retry_attempts: 3
# 单模型单市场运行
python main.py \
--model gpt-4o \
--market us \
--start-date 2024-01-01 \
--end-date 2024-12-31
# 多模型并行运行
python main_parallel.py \
--config configs/benchmark_config.yaml \
--parallel-models 5 \
--output-dir results/benchmark_2024
通用智能评分 vs 交易收益
智能评分 │
(基准测试) │
95 ────┤ ● GPT-4o
│ ● Claude-3.5
90 ────┤
│ ● Gemini
85 ────┤
│ ● DeepSeek
80 ────┤ ● Qwen
│
75 ────┤
└─────────────────────────────────
5% 10% 15% 20% 25% 交易收益
观察: 智能评分最高的模型并非交易收益最高
结论: 通用智能能力不能直接转化为交易决策能力
分析:
| 模型 | 美股收益 | A股收益 | 加密货币收益 | 最大回撤 | 稳定性评级 |
|---|---|---|---|---|---|
| Model A | +15% | +8% | +22% | -9% | ★★★★★ |
| Model B | +28% | -3% | -15% | -35% | ★★☆☆☆ |
| Model C | +8% | +12% | +10% | -7% | ★★★★☆ |
| Model D | +35% | +5% | -8% | -42% | ★☆☆☆☆ |
关键观察:
工具调用频率 vs 决策胜率
工具调用次数/决策 │
│ ★ 高胜率模型
8 ────────────┤ ●
│ ●
6 ────────────┤ ●
│ ●
4 ────────────┤ ●
│ ★ 低胜率模型
2 ────────────┤●
└───────────────────────────
40% 45% 50% 55% 60% 65% 胜率
r = 0.73 (强正相关)
分析:
| 平均推理步数 | 策略一致性 | 描述 |
|---|---|---|
| 2-3步 | 低 | 冲动决策,频繁改变立场 |
| 4-6步 | 中 | 基本逻辑,但分析不深入 |
| 7-10步 | 高 | 深思熟虑,策略执行稳定 |
| 排名 | 模型 | 美股 | A股 | 加密货币 | 综合Sharpe |
|---|---|---|---|---|---|
| 1 | - | - | - | - | - |
| 2 | - | - | - | - | - |
| 3 | - | - | - | - | - |
注: 实时排名请访问 ai4trade.ai
GPT-4o:
Claude-3.5-Sonnet:
DeepSeek-V3:
Gemini-2.0-Flash:
套利难度 (基于平均Alpha)
加密货币 ████████░░ Easy (高波动,套利机会多)
美股 █████░░░░░ Medium (效率高,机会有限)
A股 ██░░░░░░░░ Hard (T+1限制,政策影响)
| 市场 | 适合策略 | 不适合策略 |
|---|---|---|
| 美股 | 趋势跟随、动量策略 | 高频套利 |
| A股 | 价值投资、事件驱动 | 日内交易(T+1限制) |
| 加密货币 | 动量策略、均值回归 | 长期持有(高波动) |
| 差异点 | 模拟环境 | 真实环境 |
|---|---|---|
| 滑点 | 假设零滑点 | 大单有明显滑点 |
| 流动性 | 假设无限流动性 | 受市场深度限制 |
| 延迟 | 理想化执行 | 网络和处理延迟 |
| 市场冲击 | 未考虑 | 大单影响市场价格 |
| 极端行情 | 数据平滑 | 闪崩/熔断等异常 |
有限的时间范围
工具能力限制
模型调用成本
Prompt工程影响
# 已知问题列表
KNOWN_ISSUES = [
{
"id": "ISSUE-001",
"description": "DeepSeek模型偶尔返回格式不规范的tool_calls",
"status": "Workaround implemented",
"solution": "DeepSeekChatOpenAI适配器自动修复"
},
{
"id": "ISSUE-002",
"description": "A股数据在节假日后第一天可能延迟",
"status": "Known limitation",
"solution": "使用缓存数据或等待数据更新"
},
{
"id": "ISSUE-003",
"description": "加密货币市场极端波动时模型可能无法及时响应",
"status": "Under investigation",
"solution": "考虑增加紧急停止机制"
}
]
增加更多模型
优化工具链
改进可视化
多Agent协作
强化学习集成
私有部署方案
生产级交易系统
多模态输入
因果推理增强
| 术语 | 全称 | 说明 |
|---|---|---|
| LLM | Large Language Model | 大语言模型 |
| MCP | Model Context Protocol | 模型上下文协议,工具调用标准 |
| ReAct | Reasoning + Acting | 推理与行动结合的Agent框架 |
| RAG | Retrieval Augmented Generation | 检索增强生成 |
| T+0 | Trade Today + 0 | 当日买入当日可卖 |
| T+1 | Trade Today + 1 | 当日买入次日可卖 |
| MDD | Maximum Drawdown | 最大回撤 |
| SR | Sharpe Ratio | 夏普比率 |
| VaR | Value at Risk | 风险价值 |
| OHLCV | Open High Low Close Volume | 开高低收量 |
原始论文
相关研究
金融量化
| 示例 | 位置 | 说明 |
|---|---|---|
| Agent初始化 | 5.2节 | 完整初始化流程 |
| ReAct循环 | 5.4节 | 核心推理实现 |
| MCP工具定义 | 6.2节 | 各工具详细实现 |
| 市场规则 | 7.2-7.4节 | 三大市场规则类 |
| 数据过滤 | 8.3节 | 防止前视偏差 |
| 版本 | 日期 | 更新内容 |
|---|---|---|
| v1.0 | 2025-12-28 | 初始版本 |
| v2.0 | 2025-12-28 | 重写为纯AI-Trader系统研究报告 |
报告作者: NOFX Research Team 版权声明: 本报告仅供学术研究参考