code/chapter1/FirstAgentTest.ipynb
import requests
import os
import re
from openai import OpenAI
from tavily import TavilyClient
from dotenv import load_dotenv
# 加载环境变量
load_dotenv()
# 配置API密钥
API_KEY = os.getenv("API_KEY")
BASE_URL = os.getenv("BASE_URL")
MODEL_ID = os.getenv("MODEL_ID")
TAVILY_API_KEY = os.getenv("TAVILY_API_KEY")
os.environ['TAVILY_API_KEY'] = TAVILY_API_KEY
# 系统提示词
AGENT_SYSTEM_PROMPT = """
你是一个智能旅行助手。你的任务是分析用户的请求,并使用可用工具一步步地解决问题。
# 可用工具:
- `get_weather(city: str)`: 查询指定城市的实时天气。
- `get_attraction(city: str, weather: str)`: 根据城市和天气搜索推荐的旅游景点。
# 输出格式要求:
你的每次回复必须严格遵循以下格式,包含一对Thought和Action:
Thought: [你的思考过程和下一步计划]
Action: [你要执行的具体行动]
Action的格式必须是以下之一:
1. 调用工具:function_name(arg_name="arg_value")
2. 结束任务:Finish[最终答案]
# 重要提示:
- 每次只输出一对Thought-Action
- Action必须在同一行,不要换行
- 当收集到足够信息可以回答用户问题时,必须使用 Action: Finish[最终答案] 格式结束
请开始吧!
"""
def get_weather(city: str) -> str:
"""
通过调用 wttr.in API 查询真实的天气信息。
"""
# API端点,我们请求JSON格式的数据
url = f"https://wttr.in/{city}?format=j1"
try:
# 发起网络请求
response = requests.get(url)
# 检查响应状态码是否为200 (成功)
response.raise_for_status()
# 解析返回的JSON数据
data = response.json()
# 提取当前天气状况
current_condition = data['current_condition'][0]
weather_desc = current_condition['weatherDesc'][0]['value']
temp_c = current_condition['temp_C']
# 格式化成自然语言返回
return f"{city}当前天气:{weather_desc},气温{temp_c}摄氏度"
except requests.exceptions.RequestException as e:
# 处理网络错误
return f"错误:查询天气时遇到网络问题 - {e}"
except (KeyError, IndexError) as e:
# 处理数据解析错误
return f"错误:解析天气数据失败,可能是城市名称无效 - {e}"
def get_attraction(city: str, weather: str) -> str:
"""
根据城市和天气,使用Tavily Search API搜索并返回优化后的景点推荐。
"""
api_key = os.environ.get("TAVILY_API_KEY")
if not api_key:
return "错误:未配置TAVILY_API_KEY。"
# 初始化Tavily客户端
tavily = TavilyClient(api_key=api_key)
# 构造一个精确的查询
query = f"'{city}' 在'{weather}'天气下最值得去的旅游景点推荐及理由"
try:
# 调用API,include_answer=True会返回一个综合性的回答
response = tavily.search(query=query, search_depth="basic", include_answer=True)
# Tavily返回的结果已经非常干净,可以直接使用
if response.get("answer"):
return response["answer"]
# 如果没有综合性回答,则格式化原始结果
formatted_results = []
for result in response.get("results", []):
formatted_results.append(f"- {result['title']}: {result['content']}")
if not formatted_results:
return "抱歉,没有找到相关的旅游景点推荐。"
return "根据搜索,为您找到以下信息:\n" + "\n".join(formatted_results)
except Exception as e:
return f"错误:执行Tavily搜索时出现问题 - {e}"
# 将所有工具函数放入一个字典,方便后续调用
available_tools = {
"get_weather": get_weather,
"get_attraction": get_attraction,
}
print("✅ 工具函数定义完成!")
class OpenAICompatibleClient:
"""
一个用于调用任何兼容OpenAI接口的LLM服务的客户端。
"""
def __init__(self, model: str, api_key: str, base_url: str):
self.model = model
self.client = OpenAI(api_key=api_key, base_url=base_url)
def generate(self, prompt: str, system_prompt: str) -> str:
"""调用LLM API来生成回应。"""
print("正在调用大语言模型...")
try:
messages = [
{'role': 'system', 'content': system_prompt},
{'role': 'user', 'content': prompt}
]
response = self.client.chat.completions.create(
model=self.model,
messages=messages,
stream=False
)
answer = response.choices[0].message.content
print("大语言模型响应成功。")
return answer
except Exception as e:
print(f"调用LLM API时发生错误: {e}")
return "错误:调用语言模型服务时出错。"
class TravelAssistant:
"""
智能旅行助手类
"""
def __init__(self):
self.llm = OpenAICompatibleClient(
model=MODEL_ID,
api_key=API_KEY,
base_url=BASE_URL
)
self.prompt_history = []
def reset(self):
"""重置对话历史"""
self.prompt_history = []
def add_user_message(self, message: str):
"""添加用户消息到历史"""
self.prompt_history.append(f"用户请求: {message}")
def add_assistant_message(self, message: str):
"""添加助手消息到历史"""
self.prompt_history.append(message)
def add_observation(self, observation: str):
"""添加观察结果到历史"""
self.prompt_history.append(f"Observation: {observation}")
print("✅ 智能助手类定义完成!")
def display_conversation(history):
"""美观地显示对话历史"""
print("\n" + "="*60)
print("📝 对话历史")
print("="*60)
for i, message in enumerate(history, 1):
if message.startswith("用户请求:"):
print(f"\n👤 用户 [{i}]: {message[5:]}")
elif message.startswith("Thought:"):
print(f"\n🤔 思考 [{i}]: {message[8:].strip()}")
elif message.startswith("Action:"):
print(f"🛠️ 行动 [{i}]: {message[7:].strip()}")
elif message.startswith("Observation:"):
print(f"📊 观察 [{i}]: {message[12:].strip()}")
else:
print(f"💬 消息 [{i}]: {message}")
print("="*60 + "\n")
def parse_action(action_str):
"""解析行动字符串"""
if action_str.startswith("Finish"):
match = re.match(r"\w+\[(.*)\]", action_str)
if match:
return "finish", {"answer": match.group(1)}
return "finish", {"answer": "任务完成"}
tool_name_match = re.search(r"(\w+)\(", action_str)
if not tool_name_match:
return None, {}
tool_name = tool_name_match.group(1)
args_match = re.search(r"\((.*)\)", action_str)
if args_match:
args_str = args_match.group(1)
kwargs = dict(re.findall(r'(\w+)="([^"]*)"', args_str))
else:
kwargs = {}
return tool_name, kwargs
print("✅ 显示函数定义完成!")
def run_assistant(user_input, max_iterations=5, display=True):
"""
运行旅行助手的主函数
Args:
user_input: 用户输入的问题
max_iterations: 最大循环次数
display: 是否显示对话历史
Returns:
tuple: (最终答案, 完整的对话历史)
"""
assistant = TravelAssistant()
assistant.add_user_message(user_input)
if display:
print(f"👤 用户输入: {user_input}")
print("="*50)
for i in range(max_iterations):
if display:
print(f"\n🔄 循环 {i+1}/{max_iterations}")
# 构建完整prompt并调用LLM
full_prompt = "\n".join(assistant.prompt_history)
llm_output = assistant.llm.generate(full_prompt, AGENT_SYSTEM_PROMPT)
# 模型可能会输出多余的Thought-Action,需要截断
match = re.search(r'(Thought:.*?Action:.*?)(?=\n\s*(?:Thought:|Action:|Observation:)|\Z)', llm_output, re.DOTALL)
if match:
truncated = match.group(1).strip()
if truncated != llm_output.strip():
llm_output = truncated
print("⚠️ 已截断多余的 Thought-Action 对")
assistant.add_assistant_message(llm_output)
if display:
print(f"🤖 模型输出:\n{llm_output}")
# 解析行动
action_match = re.search(r"Action: (.*)", llm_output, re.DOTALL)
if not action_match:
observation = "错误: 未能解析到 Action 字段。请确保你的回复严格遵循 'Thought: ... Action: ...' 的格式。"
observation_str = f"Observation: {observation}"
print(f"{observation_str}\n" + "="*40)
assistant.prompt_history.append(observation_str)
continue
action_str = action_match.group(1).strip()
tool_name, kwargs = parse_action(action_str)
# 处理完成行动
if tool_name == "finish":
final_answer = kwargs.get("answer", "任务完成")
if display:
print(f"🎉 任务完成!")
print(f"📋 最终答案: {final_answer}")
return final_answer, assistant.prompt_history
# 处理工具调用
if tool_name in available_tools:
if display:
print(f"🛠️ 调用工具: {tool_name}({kwargs})")
observation = available_tools[tool_name](**kwargs)
else:
observation = f"错误:未定义的工具 '{tool_name}'"
# 记录观察结果
if display:
print(f"📊 观察结果: {observation}")
print("="*50)
assistant.add_observation(observation)
# 如果达到最大循环次数仍未完成
timeout_answer = "抱歉,经过多次尝试仍未完成您的请求。请尝试简化您的问题或稍后重试。"
if display:
print(f"⏰ 达到最大循环次数: {timeout_answer}")
return timeout_answer, assistant.prompt_history
# 测试示例
def test_basic_example():
"""测试北京天气+景点推荐的示例"""
print("🚀 开始测试北京天气+景点推荐示例")
user_input = "你好,请帮我查询一下今天北京的天气,然后根据天气推荐一个合适的旅游景点。"
final_answer, history = run_assistant(user_input, display=True)
print("\n" + "="*60)
print("📊 测试完成!")
print("="*60)
print(f"最终答案: {final_answer}")
# 显示完整对话历史
display_conversation(history)
return final_answer, history
# 运行测试示例
final_answer, history = test_basic_example()
def interactive_travel_assistant():
"""
交互式旅行助手
"""
print("🌍 欢迎使用智能旅行助手!")
print("💡 您可以询问任何城市的天气和旅游景点推荐")
print("❌ 输入 'quit' 或 '退出' 来结束对话\n")
while True:
user_input = input("👤 请输入您的问题: ").strip()
if user_input.lower() in ['quit', '退出', 'exit']:
print("👋 感谢使用智能旅行助手,再见!")
break
if not user_input:
print("⚠️ 请输入有效的问题")
continue
print("\n" + "="*50)
print("🔄 正在处理您的请求...")
final_answer, history = run_assistant(user_input, display=True)
print("\n🎯 最终回答:")
print("="*30)
print(final_answer)
print("="*30)
# 询问是否显示完整对话历史
show_history = input("\n📖 是否显示完整对话历史? (y/n): ").strip().lower()
if show_history in ['y', 'yes', '是']:
display_conversation(history)
print("\n" + "="*60)
print("🔄 准备接受下一个问题...\n")
# 快速测试函数
def quick_test(city="上海"):
"""快速测试指定城市的天气和景点"""
user_input = f"请帮我查询{city}的天气,并推荐适合的旅游景点"
print(f"🚀 快速测试: {user_input}")
final_answer, _ = run_assistant(user_input, display=True)
return final_answer
# 主启动入口
if __name__ == "__main__":
# 可以选择直接运行测试示例
print("选择运行模式:")
print("1. 运行测试示例 (北京)")
print("2. 交互模式")
print("3. 快速测试其他城市")
choice = input("请输入选择 (1/2/3): ").strip()
if choice == "1":
test_basic_example()
elif choice == "2":
interactive_travel_assistant()
elif choice == "3":
city = input("请输入要测试的城市: ").strip() or "上海"
quick_test(city)
else:
print("无效选择,运行测试示例...")
test_basic_example()