Back to Hello Agents

DataAnalysisAgent - 智能数据分析助手

Co-creation-projects/1zrj-DataAnalysisAgent/main.ipynb

1.0.29.9 KB
Original Source

DataAnalysisAgent - 智能数据分析助手

第1部分:环境配置

python
# 导入库和配置
from hello_agents import SimpleAgent, HelloAgentsLLM
from hello_agents.tools import Tool, ToolParameter
from typing import Dict, Any, List
import ast
import os

# 配置LLM参数
os.environ["LLM_MODEL_ID"] = "Qwen/Qwen3-8B"
os.environ["LLM_API_KEY"] = "ms-9382e20f-96c2-456a-b609-af5c81201066"
os.environ["LLM_BASE_URL"] = "https://api-inference.modelscope.cn/v1/"
os.environ["LLM_TIMEOUT"] = "60"

print("✅ 库导入和配置完成")

第2部分:定义数据分析工具

python
import json
import pandas as pd
from typing import Dict, Any, List

class DataCleaningTool(Tool):
    """数据清洗工具 - 基于用户指定规则清洗表格数据"""

    def __init__(self):
        super().__init__(
            name="data_cleaner",
            description="对传入的表格数据执行清洗操作,包括去空值、列筛选等"
        )

    def run(self, parameters: Dict[str, Any]) -> str:
        """
        执行数据清洗
        parameters 应包含:
          - data_json: str,来自 excel_reader 的 JSON 字符串(必须)
          - drop_na: bool,是否删除含空值的行(默认 False)
          - columns_to_keep: List[str],保留的列名列表(可选)
        """
        data_json = parameters.get("data_json")
        if not data_json:
            return "错误:缺少原始数据(data_json 不能为空)"

        try:
            # 解析原始数据
            raw_data = json.loads(data_json)
            records = raw_data.get("完整数据", [])
            if not records:
                return "警告:原始数据为空,无法清洗"

            df = pd.DataFrame(records)

            # 1. 列筛选
            columns_to_keep = parameters.get("columns_to_keep")
            if columns_to_keep:
                missing_cols = [col for col in columns_to_keep if col not in df.columns]
                if missing_cols:
                    return f"错误:指定保留的列不存在:{missing_cols}"
                df = df[columns_to_keep]


            # 2. 删除空值行
            if parameters.get("drop_na", False):
                original_len = len(df)
                df = df.dropna()
                dropped = original_len - len(df)
                if dropped > 0:
                    pass
            
            df = df.fillna(0)
            # 构建清洗后结果
            cleaned_records = df.where(pd.notnull(df), None).to_dict(orient='records')
            result = {
                "clean_data": cleaned_records
            }

            return json.dumps(result, ensure_ascii=False, indent=2)

        except json.JSONDecodeError:
            return "错误:data_json 不是有效的 JSON 格式"
        except Exception as e:
            return f"清洗过程中出错:{str(e)}"

    def get_parameters(self) -> List[ToolParameter]:
        return [
            ToolParameter(
                name="data_json",
                type="string",
                description="原始数据的 JSON 字符串",
                required=True
            ),
            ToolParameter(
                name="drop_na",
                type="boolean",
                description="是否删除包含空值的行",
                required=False
            ),
            ToolParameter(
                name="columns_to_keep",
                type="array",
                description="要保留的列名列表",
                required=False
            ),
        ]

print("✅ DataCleaningTool 定义完成")
python
class DataStatisticsTool(Tool):
    """数据统计工具 - 提供描述性统计分析"""

    def __init__(self):
        super().__init__(
            name="data_statistics",
            description="对数据进行描述性统计分析,包括均值、中位数、标准差等"
        )

    def run(self, parameters: Dict[str, Any]) -> str:
        data_json = parameters.get("data_json")
        if not data_json:
            return "错误:缺少数据"

        try:
            raw_data = json.loads(data_json)
            records = raw_data.get("clean_data", [])
            df = pd.DataFrame(records)
            
            # 数值型列的统计
            numeric_stats = {}
            for col in df.select_dtypes(include=[np.number]).columns:
                numeric_stats[col] = {
                    "count": int(df[col].count()),
                    "mean": float(df[col].mean()),
                    "median": float(df[col].median()),
                    "std": float(df[col].std()),
                    "min": float(df[col].min()),
                    "max": float(df[col].max()),
                    "q25": float(df[col].quantile(0.25)),
                    "q75": float(df[col].quantile(0.75))
                }
            
            # 分类型列的统计
            categorical_stats = {}
            for col in df.select_dtypes(include=['object']).columns:
                value_counts = df[col].value_counts().head(10).to_dict()
                categorical_stats[col] = {
                    "unique_count": int(df[col].nunique()),
                    "top_values": value_counts
                }
            
            result = {
                "shape": f"{len(df)} 行, {len(df.columns)} 列",
                "numeric_stats": numeric_stats,
                "categorical_stats": categorical_stats,
            }
            
            return json.dumps(result, ensure_ascii=False, indent=2)
            
        except Exception as e:
            return f"统计分析出错:{str(e)}"

    def get_parameters(self) -> List[ToolParameter]:
        return [
            ToolParameter(
                name="data_json",
                type="string",
                description="数据的 JSON 字符串",
                required=True
            )
        ]

print("✅ DataStatisticsTool 定义完成")

第3部分:创建智能体

python
# 创建工具注册表和智能体
from hello_agents import ToolRegistry

# 创建工具注册表
tool_registry = ToolRegistry()
tool_registry.register_tool(DataCleaningTool())

system_prompt = """你是一名数据分析师,你的任务是:
    1. 使用data_cleaner工具清洗数据
    2. 使用data_statistics工具统计数据
    3. 选择合适的图表,用echarts代码绘制图表,例如:
    option = {
        xAxis: {
            type: 'category',
            data: ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun']
        },
        yAxis: {
            type: 'value'
        },
        series: [
            {
            data: [120, 200, 150, 80, 70, 110, 130],
            type: 'bar'
            }
        ]
    };
    3、不要对代码分析,不要输出html,只输出echarts代码
    4、最后基于数据,提供详细的数据分析报告
    
    数据分析报告应包括:
    - 分析背景与目标
    - 关键的发现
    - 进行统计计算、趋势识别、异常检测或对比分析
    避免主观臆断,结论需基于数据,
    请以Markdown格式输出报告。
    """
# 创建智能体
agent = SimpleAgent(
    name="数据分析助手",
    llm=HelloAgentsLLM(),
    system_prompt=system_prompt,
    tool_registry=tool_registry
)

print("✅ 智能体创建完成")
print(f"✅ 可用工具: {list(tool_registry._tools.keys())}")

第4部分:读取示例数据表格

python
file_path = "./data/simple_data.xls"

try:
    df = pd.read_excel(file_path)
    # ⚠️ 不做清洗!保留原始 NaN(pandas 会自动将 Excel 空单元格转为 NaN)
    data_records = df.to_dict(orient='records') 

    # 构造符合 DataCleaningTool 要求的输入格式
    clean_input = {
        "完整数据": data_records
    }
    sample_data = json.dumps(clean_input, ensure_ascii=False, indent=2)

except FileNotFoundError:
    sample_data = json.dumps({"error": f"Excel 文件不存在: {file_path}"}, ensure_ascii=False)
except Exception as e:
    sample_data = json.dumps({"error": f"读取 Excel 文件失败: {str(e)}"}, ensure_ascii=False)

print(sample_data)

第5部分:执行数据分析

python
# 执行数据分析
print("=== 开始数据分析 ===")
result = agent.run(f"对以下数据绘制图表和数据分析\n\n{sample_data}\n")

print(result)

第6部分:保存分析报告和图表

python
import re
import os

echarts_match = re.search(r"option\s*=\s*(\{[\s\S]*?\});", result)
if echarts_match:
    echarts_code = echarts_match.group(1)
    print(echarts_code)
else:
    print("未找到 ECharts 代码")
python
report_match = re.search(r"(# 数据分析报告[\s\S]*)", result)

markdown_report = report_match.group(1).strip()
print("提取到 Markdown 报告")


# ==============================
# 3. 保存 Markdown 报告到文件
# ==============================
output_dir = "./output"
os.makedirs(output_dir, exist_ok=True)
md_path = os.path.join(output_dir, "report.md")

with open(md_path, "w", encoding="utf-8") as f:
    f.write(markdown_report)

print(f"\nMarkdown 报告已保存至: {md_path}")
python
from IPython.display import HTML

html_code = f'''
<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>第一个 ECharts 实例</title>
    <!-- 引入 echarts.js -->
    <script src="https://cdn.staticfile.org/echarts/4.3.0/echarts.min.js"></script>
</head>
<body>
    <!-- 为ECharts准备一个具备大小(宽高)的Dom -->
    <div id="main" style="width: 600px;height:400px;"></div>
    <script type="text/javascript">
        // 基于准备好的dom,初始化echarts实例
        var myChart = echarts.init(document.getElementById('main'));
 
        // 指定图表的配置项和数据
        var option = {echarts_code}
        
        // 使用刚指定的配置项和数据显示图表。
        myChart.setOption(option);
    </script>
</body>
</html>
'''
python
from IPython.display import IFrame

# 将 HTML 代码保存为文件
with open('./output/echarts.html', 'w', encoding='utf-8') as f:
    f.write(html_code)