Co-creation-projects/1zrj-DataAnalysisAgent/main.ipynb
# 导入库和配置
from hello_agents import SimpleAgent, HelloAgentsLLM
from hello_agents.tools import Tool, ToolParameter
from typing import Dict, Any, List
import ast
import os
# 配置LLM参数
os.environ["LLM_MODEL_ID"] = "Qwen/Qwen3-8B"
os.environ["LLM_API_KEY"] = "ms-9382e20f-96c2-456a-b609-af5c81201066"
os.environ["LLM_BASE_URL"] = "https://api-inference.modelscope.cn/v1/"
os.environ["LLM_TIMEOUT"] = "60"
print("✅ 库导入和配置完成")
import json
import pandas as pd
from typing import Dict, Any, List
class DataCleaningTool(Tool):
"""数据清洗工具 - 基于用户指定规则清洗表格数据"""
def __init__(self):
super().__init__(
name="data_cleaner",
description="对传入的表格数据执行清洗操作,包括去空值、列筛选等"
)
def run(self, parameters: Dict[str, Any]) -> str:
"""
执行数据清洗
parameters 应包含:
- data_json: str,来自 excel_reader 的 JSON 字符串(必须)
- drop_na: bool,是否删除含空值的行(默认 False)
- columns_to_keep: List[str],保留的列名列表(可选)
"""
data_json = parameters.get("data_json")
if not data_json:
return "错误:缺少原始数据(data_json 不能为空)"
try:
# 解析原始数据
raw_data = json.loads(data_json)
records = raw_data.get("完整数据", [])
if not records:
return "警告:原始数据为空,无法清洗"
df = pd.DataFrame(records)
# 1. 列筛选
columns_to_keep = parameters.get("columns_to_keep")
if columns_to_keep:
missing_cols = [col for col in columns_to_keep if col not in df.columns]
if missing_cols:
return f"错误:指定保留的列不存在:{missing_cols}"
df = df[columns_to_keep]
# 2. 删除空值行
if parameters.get("drop_na", False):
original_len = len(df)
df = df.dropna()
dropped = original_len - len(df)
if dropped > 0:
pass
df = df.fillna(0)
# 构建清洗后结果
cleaned_records = df.where(pd.notnull(df), None).to_dict(orient='records')
result = {
"clean_data": cleaned_records
}
return json.dumps(result, ensure_ascii=False, indent=2)
except json.JSONDecodeError:
return "错误:data_json 不是有效的 JSON 格式"
except Exception as e:
return f"清洗过程中出错:{str(e)}"
def get_parameters(self) -> List[ToolParameter]:
return [
ToolParameter(
name="data_json",
type="string",
description="原始数据的 JSON 字符串",
required=True
),
ToolParameter(
name="drop_na",
type="boolean",
description="是否删除包含空值的行",
required=False
),
ToolParameter(
name="columns_to_keep",
type="array",
description="要保留的列名列表",
required=False
),
]
print("✅ DataCleaningTool 定义完成")
class DataStatisticsTool(Tool):
"""数据统计工具 - 提供描述性统计分析"""
def __init__(self):
super().__init__(
name="data_statistics",
description="对数据进行描述性统计分析,包括均值、中位数、标准差等"
)
def run(self, parameters: Dict[str, Any]) -> str:
data_json = parameters.get("data_json")
if not data_json:
return "错误:缺少数据"
try:
raw_data = json.loads(data_json)
records = raw_data.get("clean_data", [])
df = pd.DataFrame(records)
# 数值型列的统计
numeric_stats = {}
for col in df.select_dtypes(include=[np.number]).columns:
numeric_stats[col] = {
"count": int(df[col].count()),
"mean": float(df[col].mean()),
"median": float(df[col].median()),
"std": float(df[col].std()),
"min": float(df[col].min()),
"max": float(df[col].max()),
"q25": float(df[col].quantile(0.25)),
"q75": float(df[col].quantile(0.75))
}
# 分类型列的统计
categorical_stats = {}
for col in df.select_dtypes(include=['object']).columns:
value_counts = df[col].value_counts().head(10).to_dict()
categorical_stats[col] = {
"unique_count": int(df[col].nunique()),
"top_values": value_counts
}
result = {
"shape": f"{len(df)} 行, {len(df.columns)} 列",
"numeric_stats": numeric_stats,
"categorical_stats": categorical_stats,
}
return json.dumps(result, ensure_ascii=False, indent=2)
except Exception as e:
return f"统计分析出错:{str(e)}"
def get_parameters(self) -> List[ToolParameter]:
return [
ToolParameter(
name="data_json",
type="string",
description="数据的 JSON 字符串",
required=True
)
]
print("✅ DataStatisticsTool 定义完成")
# 创建工具注册表和智能体
from hello_agents import ToolRegistry
# 创建工具注册表
tool_registry = ToolRegistry()
tool_registry.register_tool(DataCleaningTool())
system_prompt = """你是一名数据分析师,你的任务是:
1. 使用data_cleaner工具清洗数据
2. 使用data_statistics工具统计数据
3. 选择合适的图表,用echarts代码绘制图表,例如:
option = {
xAxis: {
type: 'category',
data: ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
},
yAxis: {
type: 'value'
},
series: [
{
data: [120, 200, 150, 80, 70, 110, 130],
type: 'bar'
}
]
};
3、不要对代码分析,不要输出html,只输出echarts代码
4、最后基于数据,提供详细的数据分析报告
数据分析报告应包括:
- 分析背景与目标
- 关键的发现
- 进行统计计算、趋势识别、异常检测或对比分析
避免主观臆断,结论需基于数据,
请以Markdown格式输出报告。
"""
# 创建智能体
agent = SimpleAgent(
name="数据分析助手",
llm=HelloAgentsLLM(),
system_prompt=system_prompt,
tool_registry=tool_registry
)
print("✅ 智能体创建完成")
print(f"✅ 可用工具: {list(tool_registry._tools.keys())}")
file_path = "./data/simple_data.xls"
try:
df = pd.read_excel(file_path)
# ⚠️ 不做清洗!保留原始 NaN(pandas 会自动将 Excel 空单元格转为 NaN)
data_records = df.to_dict(orient='records')
# 构造符合 DataCleaningTool 要求的输入格式
clean_input = {
"完整数据": data_records
}
sample_data = json.dumps(clean_input, ensure_ascii=False, indent=2)
except FileNotFoundError:
sample_data = json.dumps({"error": f"Excel 文件不存在: {file_path}"}, ensure_ascii=False)
except Exception as e:
sample_data = json.dumps({"error": f"读取 Excel 文件失败: {str(e)}"}, ensure_ascii=False)
print(sample_data)
# 执行数据分析
print("=== 开始数据分析 ===")
result = agent.run(f"对以下数据绘制图表和数据分析\n\n{sample_data}\n")
print(result)
import re
import os
echarts_match = re.search(r"option\s*=\s*(\{[\s\S]*?\});", result)
if echarts_match:
echarts_code = echarts_match.group(1)
print(echarts_code)
else:
print("未找到 ECharts 代码")
report_match = re.search(r"(# 数据分析报告[\s\S]*)", result)
markdown_report = report_match.group(1).strip()
print("提取到 Markdown 报告")
# ==============================
# 3. 保存 Markdown 报告到文件
# ==============================
output_dir = "./output"
os.makedirs(output_dir, exist_ok=True)
md_path = os.path.join(output_dir, "report.md")
with open(md_path, "w", encoding="utf-8") as f:
f.write(markdown_report)
print(f"\nMarkdown 报告已保存至: {md_path}")
from IPython.display import HTML
html_code = f'''
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>第一个 ECharts 实例</title>
<!-- 引入 echarts.js -->
<script src="https://cdn.staticfile.org/echarts/4.3.0/echarts.min.js"></script>
</head>
<body>
<!-- 为ECharts准备一个具备大小(宽高)的Dom -->
<div id="main" style="width: 600px;height:400px;"></div>
<script type="text/javascript">
// 基于准备好的dom,初始化echarts实例
var myChart = echarts.init(document.getElementById('main'));
// 指定图表的配置项和数据
var option = {echarts_code}
// 使用刚指定的配置项和数据显示图表。
myChart.setOption(option);
</script>
</body>
</html>
'''
from IPython.display import IFrame
# 将 HTML 代码保存为文件
with open('./output/echarts.html', 'w', encoding='utf-8') as f:
f.write(html_code)