Practical techniques for AI that doesn't just chat, but actually executes tasks, calls tools, and completes complex workflows

Introduction: From Thinking to Doing

Over the past few days we have covered environment setup and prompt engineering, teaching the AI to understand and respond to our needs better. But if you have watched a real work scenario, an obvious bottleneck appears: the AI can only think, not act.

Imagine scenarios like these:

  • You ask the AI for today's weather. It knows how to describe the query logic, but it cannot actually call a weather API
  • You want the AI to analyze a data report. It understands the analysis methods, but it cannot read your Excel file
  • You need the AI to schedule a team meeting. It can generate a perfect agenda, but it cannot add anything to your calendar

This is the core problem we tackle today: turning the AI from a thinker into a doer.

Function calling and tool integration are the key to bridging this "last mile." With these techniques, an AI can:

  1. Understand your intent and translate it into a concrete operation request
  2. Select the right tool (a function, an API, a system call)
  3. Invoke the tool correctly and handle the returned result
  4. Integrate the result into the rest of the workflow

This article takes you from principles to practice so you can fully master function calling, and builds an automated data-report generation system as a hands-on case study.

Part 1: Function Calling — Principles and Implementation

1.1 The Technical Essence of Function Calling

Function calling is not magic; it is the art of structured data exchange. The core flow can be summarized as:

User request in natural language
→ the LLM interprets the intent
→ the LLM emits a structured call request
→ an external system executes the function
→ the execution result is collected
→ the LLM interprets the result
→ the final response goes to the user

Throughout this process, the LLM acts as an "intelligent router": it executes no code itself, but it knows which code to call and which arguments to pass.
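This loop can be simulated end to end without any model API. In the sketch below (all names are illustrative, not any vendor's schema), a registry maps function names to plain Python callables, and a dict stands in for the structured call request an LLM would emit:

```python
import json

# Registry mapping tool names to plain Python callables (illustrative)
TOOL_REGISTRY = {
    "get_weather": lambda city, unit="celsius": {"city": city, "temp": 22, "unit": unit},
}

def dispatch(call_request: dict) -> dict:
    """Execute a structured call request of the kind an LLM emits."""
    func = TOOL_REGISTRY.get(call_request["name"])
    if func is None:
        return {"error": f"unknown function {call_request['name']}"}
    # Some APIs deliver arguments as a JSON string rather than an object
    args = call_request["arguments"]
    if isinstance(args, str):
        args = json.loads(args)
    return func(**args)

# Simulated model output, executed locally; the result would be fed back to the LLM
result = dispatch({"name": "get_weather", "arguments": '{"city": "Beijing"}'})
print(result)  # {'city': 'Beijing', 'temp': 22, 'unit': 'celsius'}
```

The LLM only ever produces the dict; the registry and the `dispatch` step are where your own code takes over.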

1.2 Comparing Mainstream Platform Implementations

Two main technical routes dominate the market today:

OpenAI Functions
# Example OpenAI function definition
functions = [
    {
        "name": "get_weather",
        "description": "获取指定城市的天气信息",
        "parameters": {
            "type": "object",
            "properties": {
                "city": {
                    "type": "string",
                    "description": "城市名称,如北京、上海"
                },
                "unit": {
                    "type": "string",
                    "enum": ["celsius", "fahrenheit"],
                    "description": "温度单位"
                }
            },
            "required": ["city"]
        }
    }
]

Characteristics

  • Function definitions based on JSON Schema
  • The model returns a function_call field instead of plain text
  • The developer must execute the function manually and pass the result back

Anthropic Tools
# Example Anthropic tool definition
tools = [
    {
        "name": "get_weather",
        "description": "获取指定城市的天气信息",
        "input_schema": {
            "type": "object",
            "properties": {
                "city": {"type": "string"},
                "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}
            },
            "required": ["city"]
        }
    }
]

Characteristics

  • Similar to OpenAI's design, with minor differences
  • Supports a history of tool use
  • Stronger tool-composition capabilities

Implementations from Chinese Vendors

Chinese vendors such as Baidu Wenxin, Alibaba Tongyi, and Zhipu GLM offer similar capabilities, though each API has its own flavor:

  • Baidu Wenxin: uses a tools parameter, supporting function definition and invocation
  • Zhipu GLM: uses a functions parameter, highly compatible with OpenAI
  • Alibaba Tongyi: provides function_call capability with support for complex parameters
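Despite the naming differences, every provider's round trip follows the same pattern: the model emits a tool call, your code executes it, and the result is appended as a new message before the model is called again. Here is a provider-neutral simulation; the message field names below are illustrative, not any vendor's exact schema:

```python
def run_tool_round_trip(messages, model_reply, tools_impl):
    """Simulate one tool-use round trip.

    `model_reply` stands in for the provider's API response; the message
    field names here are illustrative, not any vendor's exact schema.
    """
    # 1. Record the model's tool call as an assistant message
    messages.append({"role": "assistant", "tool_call": model_reply})
    # 2. Execute the requested tool locally
    impl = tools_impl[model_reply["name"]]
    result = impl(**model_reply["input"])
    # 3. Hand the result back as its own message so the model can continue
    messages.append({"role": "tool", "name": model_reply["name"], "content": result})
    return messages

history = [{"role": "user", "content": "What's the weather in Beijing?"}]
reply = {"name": "get_weather", "input": {"city": "Beijing"}}
impls = {"get_weather": lambda city: f"{city}: sunny, 22°C"}
history = run_tool_round_trip(history, reply, impls)
print(history[-1]["content"])  # Beijing: sunny, 22°C
```

When porting this to a real SDK, only the message shapes change; the execute-then-append loop stays the same.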

1.3 Parameter Validation and Error Handling

The reliability of function calling depends on strict parameter validation:

import jsonschema
from typing import Dict, Any

def validate_function_params(func_def: Dict, params: Dict) -> Dict[str, Any]:
    """Validate function parameters against their schema definition"""
    try:
        jsonschema.validate(params, func_def["parameters"])
        return {"valid": True, "params": params}
    except jsonschema.ValidationError as e:
        # Attempt a simple automatic repair
        if "city" in params and isinstance(params["city"], str):
            # Basic cleanup: strip whitespace, title-case the name
            params["city"] = params["city"].strip().title()
            try:
                jsonschema.validate(params, func_def["parameters"])
                return {"valid": True, "params": params, "note": "Parameters were auto-repaired"}
            except jsonschema.ValidationError:
                pass
        
        return {
            "valid": False,
            "error": str(e),
            "suggestion": "Check the parameter format; provide the city name as a string"
        }

# Usage example
weather_func = {
    "name": "get_weather",
    "parameters": {
        "type": "object",
        "properties": {
            "city": {"type": "string", "minLength": 1},
            "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}
        },
        "required": ["city"]
    }
}

# Test the validation
test_params = {"city": "  beijing  ", "unit": "celsius"}
result = validate_function_params(weather_func, test_params)
print(f"Validation result: {result}")

1.4 Retry Logic and Fault-Tolerant Design

In real applications function calls can fail, so a smart retry mechanism is needed:

import time
from functools import wraps
from typing import Callable, Optional

class FunctionCallRetry:
    """函数调用重试管理器"""
    
    def __init__(self, max_retries: int = 3, base_delay: float = 1.0):
        self.max_retries = max_retries
        self.base_delay = base_delay
    
    def retry(self, func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_error = None
            
            for attempt in range(self.max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_error = e
                    
                    if attempt < self.max_retries:
                        # Exponential backoff
                        delay = self.base_delay * (2 ** attempt)
                        print(f"Attempt {attempt + 1} failed, retrying in {delay}s: {str(e)}")
                        time.sleep(delay)
            
            # All retries exhausted
            raise FunctionCallError(
                f"Function call failed after {self.max_retries + 1} attempts",
                original_error=last_error
            )
        
        return wrapper

class FunctionCallError(Exception):
    """函数调用专用异常"""
    def __init__(self, message: str, original_error: Optional[Exception] = None):
        super().__init__(message)
        self.original_error = original_error

# Usage example
@FunctionCallRetry(max_retries=2).retry
def call_external_api(endpoint: str, data: dict):
    """调用外部API(模拟可能失败的情况)"""
    import random
    # Simulate a 30% failure rate
    if random.random() < 0.3:
        raise ConnectionError("API connection timed out")
    return {"status": "success", "data": data}

Part 2: Integrating Common Tools in Practice

2.1 File Operation Tools

AI applications frequently need to read and write files. Here is a complete file-operations toolkit:

import csv
import json
from pathlib import Path
from typing import Any, Dict, List

class FileOperations:
    """文件操作工具类"""
    
    @staticmethod
    def read_file(file_path: str, encoding: str = "utf-8") -> str:
        """读取文本文件"""
        path = Path(file_path)
        if not path.exists():
            raise FileNotFoundError(f"File not found: {file_path}")
        
        with open(path, 'r', encoding=encoding) as f:
            return f.read()
    
    @staticmethod
    def write_file(file_path: str, content: str, encoding: str = "utf-8") -> bool:
        """写入文本文件"""
        path = Path(file_path)
        # Make sure the parent directory exists
        path.parent.mkdir(parents=True, exist_ok=True)
        
        with open(path, 'w', encoding=encoding) as f:
            f.write(content)
        return True
    
    @staticmethod
    def read_csv(file_path: str) -> List[Dict[str, Any]]:
        """读取CSV文件为字典列表"""
        with open(file_path, 'r', encoding='utf-8') as f:
            reader = csv.DictReader(f)
            return [row for row in reader]
    
    @staticmethod
    def read_json(file_path: str) -> Any:
        """读取JSON文件"""
        with open(file_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    
    @staticmethod
    def list_files(directory: str, pattern: str = "*") -> List[str]:
        """列出目录中的文件"""
        path = Path(directory)
        if not path.exists():
            return []
        
        return [str(p) for p in path.glob(pattern)]

# Function definitions exposing the tools to the AI
file_tools = [
    {
        "name": "read_text_file",
        "description": "Read the contents of a text file",
        "parameters": {
            "type": "object",
            "properties": {
                "file_path": {"type": "string", "description": "File path"},
                "encoding": {"type": "string", "description": "File encoding, defaults to utf-8"}
            },
            "required": ["file_path"]
        }
    },
    {
        "name": "write_text_file",
        "description": "Write content to a text file",
        "parameters": {
            "type": "object",
            "properties": {
                "file_path": {"type": "string", "description": "File path"},
                "content": {"type": "string", "description": "Content to write"},
                "encoding": {"type": "string", "description": "File encoding, defaults to utf-8"}
            },
            "required": ["file_path", "content"]
        }
    },
    {
        "name": "read_csv_file",
        "description": "Read a CSV file as structured data",
        "parameters": {
            "type": "object",
            "properties": {
                "file_path": {"type": "string", "description": "Path to the CSV file"}
            },
            "required": ["file_path"]
        }
    }
]

2.2 Network Request Tools

Enabling the AI to fetch external information on its own:

import requests
import aiohttp
from typing import Dict, Any, Optional

class NetworkTools:
    """网络请求工具集"""
    
    @staticmethod
    def fetch_url(url: str, params: Optional[Dict] = None, 
                  headers: Optional[Dict] = None, timeout: int = 10) -> Dict[str, Any]:
        """同步获取URL内容"""
        try:
            response = requests.get(
                url, 
                params=params, 
                headers=headers or {},
                timeout=timeout
            )
            response.raise_for_status()
            
            # Return a different shape depending on the content type
            content_type = response.headers.get('content-type', '')
            if 'application/json' in content_type:
                data = response.json()
                return {"success": True, "data": data, "type": "json"}
            else:
                text = response.text
                return {"success": True, "data": text, "type": "text"}
                
        except requests.exceptions.RequestException as e:
            return {"success": False, "error": str(e)}
    
    @staticmethod
    async def fetch_url_async(url: str, params: Optional[Dict] = None,
                             headers: Optional[Dict] = None, timeout: int = 10) -> Dict[str, Any]:
        """异步获取URL内容"""
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    url, 
                    params=params, 
                    headers=headers or {},
                    timeout=aiohttp.ClientTimeout(total=timeout)
                ) as response:
                    response.raise_for_status()
                    
                    content_type = response.headers.get('content-type', '')
                    if 'application/json' in content_type:
                        data = await response.json()
                        return {"success": True, "data": data, "type": "json"}
                    else:
                        text = await response.text()
                        return {"success": True, "data": text, "type": "text"}
                        
        except Exception as e:
            return {"success": False, "error": str(e)}
    
    @staticmethod
    def search_web(query: str, limit: int = 5) -> Dict[str, Any]:
        """模拟网络搜索(实际项目中可接入真实搜索API)"""
        # 这里使用模拟数据,实际可接入Google Custom Search、SerpAPI等
        mock_results = [
            {"title": f"{query}的相关信息1", "url": "https://example.com/1", "snippet": f"这是关于{query}的第一个搜索结果"},
            {"title": f"{query}的深入分析", "url": "https://example.com/2", "snippet": f"深入探讨{query}的技术细节"},
            {"title": f"{query}的最佳实践", "url": "https://example.com/3", "snippet": f"分享{query}的实际应用经验"},
        ]
        
        return {
            "success": True,
            "query": query,
            "results": mock_results[:limit]
        }

# Network tool function definitions
network_tools = [
    {
        "name": "fetch_web_content",
        "description": "Fetch the contents of a web page",
        "parameters": {
            "type": "object",
            "properties": {
                "url": {"type": "string", "description": "Page URL"},
                "params": {"type": "object", "description": "Query parameters"},
                "timeout": {"type": "integer", "description": "Timeout in seconds"}
            },
            "required": ["url"]
        }
    },
    {
        "name": "search_internet",
        "description": "Search the internet for information",
        "parameters": {
            "type": "object",
            "properties": {
                "query": {"type": "string", "description": "Search keywords"},
                "limit": {"type": "integer", "description": "Maximum number of results"}
            },
            "required": ["query"]
        }
    }
]

2.3 Database Operation Tools

A standardized interface for AI-to-database interaction:

import sqlite3
from contextlib import contextmanager
from typing import List, Dict, Any, Optional

class DatabaseTools:
    """数据库操作工具集"""
    
    def __init__(self, db_path: str = "data/app.db"):
        self.db_path = db_path
    
    @contextmanager
    def get_connection(self):
        """获取数据库连接(上下文管理器)"""
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row  # 返回字典格式的结果
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()
    
    def execute_query(self, sql: str, params: tuple = ()) -> List[Dict[str, Any]]:
        """执行查询语句"""
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(sql, params)
            rows = cursor.fetchall()
            return [dict(row) for row in rows]
    
    def execute_update(self, sql: str, params: tuple = ()) -> int:
        """执行更新/插入/删除语句,返回影响行数"""
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(sql, params)
            return cursor.rowcount
    
    def get_table_schema(self, table_name: str) -> Dict[str, Any]:
        """获取表结构信息"""
        sql = """
        SELECT name, type, "notnull", dflt_value, pk 
        FROM pragma_table_info(?)
        ORDER BY cid
        """
        columns = self.execute_query(sql, (table_name,))
        
        # Fetch index information
        index_sql = """
        SELECT name, "unique", origin, partial
        FROM pragma_index_list(?)
        """
        indexes = self.execute_query(index_sql, (table_name,))
        
        return {
            "table_name": table_name,
            "columns": columns,
            "indexes": indexes
        }
    
    def search_data(self, table_name: str, search_field: str, 
                   search_value: str, limit: int = 10) -> List[Dict[str, Any]]:
        """Search for rows in a given table"""
        # Note: identifiers cannot be bound as SQL parameters, so table_name and
        # search_field should be validated against an allowlist before use
        sql = f"SELECT * FROM {table_name} WHERE {search_field} LIKE ? LIMIT ?"
        return self.execute_query(sql, (f"%{search_value}%", limit))

# Database tool function definitions
database_tools = [
    {
        "name": "query_database",
        "description": "Run a database query",
        "parameters": {
            "type": "object",
            "properties": {
                "sql": {"type": "string", "description": "SQL query statement"},
                "params": {"type": "array", "description": "Query parameters"}
            },
            "required": ["sql"]
        }
    },
    {
        "name": "get_table_info",
        "description": "Get schema information for a database table",
        "parameters": {
            "type": "object",
            "properties": {
                "table_name": {"type": "string", "description": "Table name"}
            },
            "required": ["table_name"]
        }
    },
    {
        "name": "search_in_table",
        "description": "Search for data in a database table",
        "parameters": {
            "type": "object",
            "properties": {
                "table_name": {"type": "string", "description": "Table name"},
                "search_field": {"type": "string", "description": "Field to search"},
                "search_value": {"type": "string", "description": "Value to search for"},
                "limit": {"type": "integer", "description": "Maximum number of results"}
            },
            "required": ["table_name", "search_field", "search_value"]
        }
    }
]
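As a standalone sanity check of the pattern used above (sqlite3.Row plus parameter binding), here is a minimal in-memory session; the users table is purely illustrative:

```python
import sqlite3

# In-memory database; sqlite3.Row makes columns addressable by name
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
# Values are bound as parameters, never interpolated into the SQL string
conn.execute("INSERT INTO users (name) VALUES (?)", ("Alice",))
rows = [dict(r) for r in conn.execute("SELECT * FROM users WHERE name LIKE ?", ("%Ali%",))]
conn.close()
print(rows)  # [{'id': 1, 'name': 'Alice'}]
```

Only values can be bound this way; table and column names still have to be validated separately, as noted in `search_data`.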

Part 3: Designing Complex Workflows

3.1 Decomposing Multi-Step Tasks

Complex tasks must be broken down into executable steps:

from enum import Enum
from dataclasses import dataclass
from typing import List, Dict, Any, Optional, Callable

class TaskStatus(Enum):
    """任务状态枚举"""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

@dataclass
class TaskStep:
    """任务步骤定义"""
    id: str
    name: str
    description: str
    required_tools: List[str]  # tools this step needs
    function_name: str  # function to call
    function_params: Dict[str, Any]  # function arguments
    depends_on: List[str] = None  # IDs of prerequisite steps
    timeout: int = 30  # timeout in seconds
    
    def __post_init__(self):
        if self.depends_on is None:
            self.depends_on = []

@dataclass
class TaskExecution:
    """任务执行状态"""
    task_id: str
    steps: List[TaskStep]
    current_step: int = 0
    status: TaskStatus = TaskStatus.PENDING
    results: Dict[str, Any] = None
    errors: List[str] = None
    
    def __post_init__(self):
        if self.results is None:
            self.results = {}
        if self.errors is None:
            self.errors = []

class WorkflowEngine:
    """工作流引擎"""
    
    def __init__(self, available_tools: Dict[str, Callable]):
        self.tools = available_tools
        self.tasks: Dict[str, TaskExecution] = {}
    
    def create_task(self, steps: List[TaskStep]) -> str:
        """创建新任务"""
        import uuid
        task_id = f"task_{uuid.uuid4().hex[:8]}"
        
        # Validate step dependencies
        step_ids = {step.id for step in steps}
        for step in steps:
            for dep in step.depends_on:
                if dep not in step_ids:
                    raise ValueError(f"Step {step.id} depends on unknown step {dep}")
        
        # Validate tool availability
        for step in steps:
            for tool in step.required_tools:
                if tool not in self.tools:
                    raise ValueError(f"Step {step.id} requires unregistered tool {tool}")
        
        task = TaskExecution(task_id=task_id, steps=steps)
        self.tasks[task_id] = task
        return task_id
    
    def execute_step(self, task_id: str, step_id: str) -> Dict[str, Any]:
        """执行单个步骤"""
        task = self.tasks.get(task_id)
        if not task:
            return {"success": False, "error": f"任务 {task_id} 不存在"}
        
        # Locate the step
        step = None
        step_index = -1
        for i, s in enumerate(task.steps):
            if s.id == step_id:
                step = s
                step_index = i
                break
        
        if not step:
            return {"success": False, "error": f"Step {step_id} does not exist"}
        
        # Check that all dependencies have completed
        for dep_id in step.depends_on:
            if dep_id not in task.results:
                return {
                    "success": False, 
                    "error": f"Step {step_id} depends on step {dep_id}, which has not completed"
                }
        
        # Run the step
        task.status = TaskStatus.RUNNING
        task.current_step = step_index
        
        try:
            # Look up the tool function
            tool_func = self.tools.get(step.function_name)
            if not tool_func:
                raise ValueError(f"Tool function {step.function_name} is not registered")
            
            # Invoke the function
            result = tool_func(**step.function_params)
            
            # Store the result
            task.results[step_id] = result
            return {"success": True, "result": result}
            
        except Exception as e:
            error_msg = f"Step {step_id} failed: {str(e)}"
            task.errors.append(error_msg)
            task.status = TaskStatus.FAILED
            return {"success": False, "error": error_msg}
    
    def execute_task(self, task_id: str) -> Dict[str, Any]:
        """执行整个任务"""
        task = self.tasks.get(task_id)
        if not task:
            return {"success": False, "error": f"任务 {task_id} 不存在"}
        
        if task.status != TaskStatus.PENDING:
            return {"success": False, "error": f"任务状态为 {task.status.value},无法执行"}
        
        task.status = TaskStatus.RUNNING
        all_results = {}
        
        # Execute steps in listed order (simplified; a real engine should walk the dependency graph)
        for step in task.steps:
            result = self.execute_step(task_id, step.id)
            if not result["success"]:
                task.status = TaskStatus.FAILED
                return {
                    "success": False,
                    "error": result["error"],
                    "completed_steps": list(all_results.keys())
                }
            
            all_results[step.id] = result.get("result")
        
        task.status = TaskStatus.COMPLETED
        return {
            "success": True,
            "task_id": task_id,
            "results": all_results
        }
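The loop in execute_task runs steps in listed order and, as its comment notes, ignores depends_on. The standard library's graphlib can produce a dependency-respecting order; a minimal sketch (the step names are illustrative):

```python
from graphlib import TopologicalSorter

def resolve_order(deps: dict) -> list:
    """Return step IDs in an order that satisfies their dependencies.

    `deps` maps step ID -> the set of step IDs it depends on.
    Raises graphlib.CycleError if the dependencies form a cycle.
    """
    return list(TopologicalSorter(deps).static_order())

# Illustrative workflow: analyze needs load; report needs analyze and fetch
order = resolve_order({
    "load": set(),
    "fetch": set(),
    "analyze": {"load"},
    "report": {"analyze", "fetch"},
})
print(order)
```

Feeding `execute_step` the steps in this order would make `depends_on` checks pass without requiring the caller to pre-sort the list.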

3.2 Optimizing Tool-Call Ordering

Intelligently ordering tool calls to reduce waiting time:

from collections import defaultdict, deque
from typing import Set, List, Dict, Tuple

class ToolScheduler:
    """工具调用调度器"""
    
    def __init__(self):
        self.tool_durations = defaultdict(float)  # average execution time per tool
        self.tool_dependencies = defaultdict(set)  # dependency relations between tools
    
    def add_tool(self, name: str, avg_duration: float = 1.0, 
                 dependencies: List[str] = None):
        """添加工具到调度器"""
        self.tool_durations[name] = avg_duration
        if dependencies:
            self.tool_dependencies[name] = set(dependencies)
    
    def schedule_tools(self, required_tools: List[str]) -> List[str]:
        """调度工具执行顺序"""
        # 构建依赖图
        graph = defaultdict(list)
        in_degree = defaultdict(int)
        
        # Initialize every node
        all_tools = set(required_tools)
        for tool in all_tools:
            graph[tool] = []
            in_degree[tool] = 0
        
        # Add dependency edges
        for tool in required_tools:
            for dep in self.tool_dependencies[tool]:
                if dep in all_tools:
                    graph[dep].append(tool)
                    in_degree[tool] += 1
        
        # Topological sort
        queue = deque([tool for tool in all_tools if in_degree[tool] == 0])
        execution_order = []
        
        while queue:
            # Prefer tools with shorter execution times
            queue = deque(sorted(queue, key=lambda x: self.tool_durations[x]))
            current = queue.popleft()
            execution_order.append(current)
            
            for neighbor in graph[current]:
                in_degree[neighbor] -= 1
                if in_degree[neighbor] == 0:
                    queue.append(neighbor)
        
        # Detect cycles
        if len(execution_order) != len(all_tools):
            # A cycle exists; fall back to sorting by execution time
            return sorted(required_tools, key=lambda x: self.tool_durations[x])
        
        return execution_order
    
    def estimate_completion_time(self, tool_sequence: List[str]) -> float:
        """估计完成时间"""
        total = 0.0
        for tool in tool_sequence:
            total += self.tool_durations.get(tool, 1.0)
        return total

# Usage example
scheduler = ToolScheduler()
scheduler.add_tool("read_file", avg_duration=0.5)
scheduler.add_tool("analyze_data", avg_duration=2.0, dependencies=["read_file"])
scheduler.add_tool("fetch_external", avg_duration=1.5)
scheduler.add_tool("generate_report", avg_duration=1.0, 
                   dependencies=["analyze_data", "fetch_external"])

required = ["read_file", "analyze_data", "fetch_external", "generate_report"]
optimal_order = scheduler.schedule_tools(required)
estimated_time = scheduler.estimate_completion_time(optimal_order)

print(f"最优执行顺序: {optimal_order}")
print(f"预计完成时间: {estimated_time}秒")

3.3 State Management and Context Persistence

Maintaining state consistency across a multi-step workflow:

import pickle
import hashlib
from datetime import datetime
from typing import Any, Dict, Optional

class WorkflowState:
    """工作流状态管理器"""
    
    def __init__(self, workflow_id: str):
        self.workflow_id = workflow_id
        self.state_data = {}
        self.state_history = []
        self.created_at = datetime.now()
        self.last_updated = self.created_at
    
    def set(self, key: str, value: Any, reason: str = ""):
        """设置状态值"""
        old_value = self.state_data.get(key)
        self.state_data[key] = value
        self.last_updated = datetime.now()
        
        # Record history
        self.state_history.append({
            "timestamp": self.last_updated,
            "key": key,
            "old_value": old_value,
            "new_value": value,
            "reason": reason
        })
        
        # Cap the history length
        if len(self.state_history) > 100:
            self.state_history = self.state_history[-100:]
    
    def get(self, key: str, default: Any = None) -> Any:
        """获取状态值"""
        return self.state_data.get(key, default)
    
    def snapshot(self) -> Dict[str, Any]:
        """创建状态快照"""
        return {
            "workflow_id": self.workflow_id,
            "state_data": self.state_data.copy(),
            "history_length": len(self.state_history),
            "created_at": self.created_at.isoformat(),
            "last_updated": self.last_updated.isoformat()
        }
    
    def compute_checksum(self) -> str:
        """计算状态校验和"""
        state_bytes = pickle.dumps(self.state_data)
        return hashlib.md5(state_bytes).hexdigest()
    
    def rollback(self, steps: int = 1) -> bool:
        """回滚指定步数的状态变化"""
        if steps <= 0 or steps > len(self.state_history):
            return False
        
        # 获取要回滚的历史记录
        rollback_history = self.state_history[-steps:]
        
        # 逆向应用回滚
        for entry in reversed(rollback_history):
            if entry["old_value"] is None:
                # 原来没有值,删除键
                self.state_data.pop(entry["key"], None)
            else:
                # 恢复旧值
                self.state_data[entry["key"]] = entry["old_value"]
        
        # Drop the rolled-back history entries
        self.state_history = self.state_history[:-steps]
        self.last_updated = datetime.now()
        
        return True

class StatefulWorkflow:
    """带状态的工作流"""
    
    def __init__(self, name: str):
        self.name = name
        self.state = WorkflowState(f"{name}_{datetime.now().timestamp()}")
        self.steps_completed = 0
    
    def execute_step(self, step_func, *args, **kwargs):
        """执行步骤并管理状态"""
        step_name = step_func.__name__
        
        try:
            # Mark the step as started
            self.state.set(f"step_{step_name}_started", True, 
                          f"Starting step {step_name}")
            
            # Run the step
            result = step_func(*args, **kwargs)
            
            # Record completion
            self.state.set(f"step_{step_name}_result", result,
                          f"Step {step_name} finished")
            self.state.set(f"step_{step_name}_completed", True,
                          f"Step {step_name} completed")
            
            self.steps_completed += 1
            return {"success": True, "result": result}
            
        except Exception as e:
            # Record the error
            self.state.set(f"step_{step_name}_error", str(e),
                          f"Step {step_name} failed")
            return {"success": False, "error": str(e)}

Part 4: Security Considerations and Best Practices

4.1 Tool-Call Permission Control

Different users and scenarios require different tool access permissions:

from enum import Enum
from typing import Set, List

class PermissionLevel(Enum):
    """Permission levels"""
    GUEST = 1      # view only
    USER = 2       # basic operations
    POWER_USER = 3 # advanced operations
    ADMIN = 4      # all operations
    SYSTEM = 5     # system-level operations

class ToolPermission:
    """工具权限管理器"""
    
    def __init__(self):
        # Map each tool to its required permissions
        self.tool_permissions = {}
        # Cached user permissions
        self.user_permissions = {}
    
    def register_tool(self, tool_name: str, required_level: PermissionLevel,
                     allowed_users: List[str] = None, denied_users: List[str] = None):
        """注册工具权限要求"""
        self.tool_permissions[tool_name] = {
            "required_level": required_level,
            "allowed_users": set(allowed_users) if allowed_users else None,
            "denied_users": set(denied_users) if denied_users else None
        }
    
    def set_user_permission(self, user_id: str, level: PermissionLevel,
                           custom_tools: List[str] = None):
        """设置用户权限"""
        self.user_permissions[user_id] = {
            "level": level,
            "custom_tools": set(custom_tools) if custom_tools else set()
        }
    
    def check_permission(self, user_id: str, tool_name: str) -> bool:
        """Check whether a user is allowed to use a tool"""
        # Look up the tool's permission requirements
        tool_perm = self.tool_permissions.get(tool_name)
        if not tool_perm:
            # Unregistered tools are denied by default
            return False
        
        # Look up the user's permissions
        user_perm = self.user_permissions.get(user_id)
        if not user_perm:
            # Unknown users get the lowest permission level
            user_level = PermissionLevel.GUEST
            user_custom_tools = set()
        else:
            user_level = user_perm["level"]
            user_custom_tools = user_perm["custom_tools"]
        
        # Check explicit allow/deny lists
        if tool_perm["allowed_users"] and user_id not in tool_perm["allowed_users"]:
            return False
        
        if tool_perm["denied_users"] and user_id in tool_perm["denied_users"]:
            return False
        
        # Check the permission level
        if user_level.value < tool_perm["required_level"].value:
            # Level insufficient; check for a per-user tool grant
            if tool_name in user_custom_tools:
                return True
            return False
        
        return True
    
    def audit_tool_usage(self, user_id: str, tool_name: str, 
                        params: dict, success: bool) -> dict:
        """审计工具使用记录"""
        import datetime
        return {
            "timestamp": datetime.datetime.now().isoformat(),
            "user_id": user_id,
            "tool_name": tool_name,
            "params_summary": str(params)[:200],  # 限制参数长度
            "success": success,
            "permission_granted": self.check_permission(user_id, tool_name)
        }

# Usage example
permission_mgr = ToolPermission()

# Register tool permission requirements
permission_mgr.register_tool("read_file", PermissionLevel.USER)
permission_mgr.register_tool("write_file", PermissionLevel.POWER_USER)
permission_mgr.register_tool("execute_system", PermissionLevel.SYSTEM,
                           allowed_users=["admin_user"])

# Assign user permissions
permission_mgr.set_user_permission("alice", PermissionLevel.POWER_USER)
permission_mgr.set_user_permission("bob", PermissionLevel.USER)
permission_mgr.set_user_permission("admin_user", PermissionLevel.ADMIN,
                                 custom_tools=["execute_system"])

# Check permissions
print(f"Alice can write files: {permission_mgr.check_permission('alice', 'write_file')}")
print(f"Bob can write files: {permission_mgr.check_permission('bob', 'write_file')}")
print(f"Admin can run system commands: {permission_mgr.check_permission('admin_user', 'execute_system')}")

4.2 Input Validation and Sanitization

Preventing security problems caused by malicious input:

import re
import html

class InputSanitizer:
    """Input sanitizer"""
    
    def __init__(self):
        # Patterns considered dangerous
        self.dangerous_patterns = [
            r"(\b)(DROP\s+TABLE\b)",  # SQL injection
            r"(\b)(DELETE\s+FROM\b)",
            r"(\b)(INSERT\s+INTO\b)",
            r"(\b)(UPDATE\b)",
            r"(\b)(UNION\s+SELECT\b)",
            r"(<\s*script\b[^>]*>)",  # XSS
            r"(javascript\s*:\s*)",
            r"(on\w+\s*=\s*)",
            r"(<\s*iframe\b[^>]*>)",
            r"(\b)(rm\s+-rf\b)",  # command injection
            r"(\b)(wget\b)",
            r"(\b)(curl\b)",
            r"(\.\.\/\.\.\/)",  # path traversal
        ]
        
        # Precompile the regular expressions
        self.patterns = [re.compile(pattern, re.IGNORECASE) 
                        for pattern in self.dangerous_patterns]
    
    def sanitize_string(self, input_str: str, max_length: int = 1000) -> str:
        """Sanitize a string input"""
        if not isinstance(input_str, str):
            raise TypeError("Input must be a string")
        
        # Truncate to the maximum length
        if len(input_str) > max_length:
            input_str = input_str[:max_length]
        
        # Strip dangerous content (replace each whole match; a backreference
        # like \1 would preserve matches whose first group is the full pattern)
        sanitized = input_str
        for pattern in self.patterns:
            sanitized = pattern.sub("[BLOCKED]", sanitized)
        
        # HTML-escape the result
        sanitized = html.escape(sanitized)
        
        return sanitized
    
    def sanitize_dict(self, input_dict: dict, string_fields: list = None) -> dict:
        """Sanitize a dict input"""
        if not isinstance(input_dict, dict):
            raise TypeError("Input must be a dict")
        
        sanitized = {}
        for key, value in input_dict.items():
            if string_fields and key not in string_fields:
                # Field not designated for sanitization; copy as-is (may need separate validation)
                sanitized[key] = value
            elif isinstance(value, str):
                # Sanitize string fields
                sanitized[key] = self.sanitize_string(value)
            elif isinstance(value, dict):
                # Recurse into nested dicts
                sanitized[key] = self.sanitize_dict(value, string_fields)
            elif isinstance(value, list):
                # Handle lists
                sanitized[key] = [
                    self.sanitize_string(item) if isinstance(item, str) else item
                    for item in value
                ]
            else:
                # Copy other types unchanged
                sanitized[key] = value
        
        return sanitized
    
    def validate_file_path(self, file_path: str, allowed_dirs: list = None) -> bool:
        """验证文件路径安全性"""
        import os
        from pathlib import Path
        
        # 检查路径遍历
        if ".." in file_path:
            return False
        
        # 规范化路径
        try:
            normalized = os.path.normpath(file_path)
        except (TypeError, ValueError):
            return False
        
        # 检查绝对路径限制
        if os.path.isabs(normalized):
            # 只允许在特定目录下
            if allowed_dirs:
                allowed = False
                for allowed_dir in allowed_dirs:
                    # 拼接路径分隔符,避免 /safe 前缀误匹配 /safe_evil
                    if normalized.startswith(os.path.join(allowed_dir, "")):
                        allowed = True
                        break
                if not allowed:
                    return False
        
        # 检查文件类型(扩展名)
        safe_extensions = {'.txt', '.md', '.json', '.csv', '.log'}
        ext = Path(normalized).suffix.lower()
        if ext not in safe_extensions:
            return False
        
        return True

# 使用示例
sanitizer = InputSanitizer()

# 测试危险输入
dangerous_input = "用户输入<script>alert('xss')</script> AND DROP TABLE users"
safe_input = sanitizer.sanitize_string(dangerous_input)
print(f"净化前: {dangerous_input}")
print(f"净化后: {safe_input}")

# 测试文件路径验证
print(f"安全路径: {sanitizer.validate_file_path('data/report.txt')}")
print(f"危险路径: {sanitizer.validate_file_path('../../../etc/passwd')}")

4.3 资源消耗监控

防止恶意或错误的工具调用消耗过多资源:

import time
import psutil
import threading
from typing import Dict, Any, Optional
from dataclasses import dataclass

@dataclass
class ResourceUsage:
    """资源使用情况"""
    cpu_percent: float
    memory_mb: float
    execution_time: float
    network_requests: int
    file_operations: int

class ResourceMonitor:
    """资源监控器"""
    
    def __init__(self, limits: Optional[Dict[str, float]] = None):
        self.limits = limits or {
            "max_cpu_percent": 80.0,
            "max_memory_mb": 1024,  # 1GB
            "max_execution_time": 30,  # 30秒
            "max_network_requests": 10,
            "max_file_operations": 50
        }
        
        self.current_usage = {
            "cpu_percent": 0.0,
            "memory_mb": 0.0,
            "execution_time": 0.0,
            "network_requests": 0,
            "file_operations": 0
        }
        
        self.lock = threading.Lock()
        self.start_time = time.time()
    
    def check_limits(self) -> Dict[str, Any]:
        """检查是否超过资源限制"""
        with self.lock:
            violations = []
            
            for key, limit in self.limits.items():
                usage_key = key.replace("max_", "")
                current = self.current_usage.get(usage_key, 0)
                
                if current > limit:
                    violations.append({
                        "resource": usage_key,
                        "current": current,
                        "limit": limit,
                        "exceeded_by": ((current - limit) / limit) * 100
                    })
            
            return {
                "within_limits": len(violations) == 0,
                "violations": violations,
                "current_usage": self.current_usage.copy()
            }
    
    def update_usage(self, metric: str, value: float):
        """更新资源使用量"""
        with self.lock:
            if metric in self.current_usage:
                if metric in ["network_requests", "file_operations"]:
                    self.current_usage[metric] += int(value)
                else:
                    self.current_usage[metric] = value
    
    def monitor_execution(self, func, *args, **kwargs):
        """监控函数执行的资源使用"""
        start_time = time.time()
        
        # 启动监控线程
        stop_monitoring = threading.Event()
        
        def monitor_loop():
            while not stop_monitoring.is_set():
                # 获取当前进程的CPU和内存使用(系统级的psutil.cpu_percent会把其他进程也算进来)
                proc = psutil.Process()
                cpu = proc.cpu_percent(interval=0.1)
                memory = proc.memory_info().rss / 1024 / 1024  # MB
                
                self.update_usage("cpu_percent", cpu)
                self.update_usage("memory_mb", memory)
                
                # 更新执行时间
                elapsed = time.time() - start_time
                self.update_usage("execution_time", elapsed)
                
                # 检查限制
                limits_check = self.check_limits()
                if not limits_check["within_limits"]:
                    print(f"资源限制超出: {limits_check['violations']}")
                    # 这里可以触发告警或停止执行
                
                time.sleep(0.5)
        
        monitor_thread = threading.Thread(target=monitor_loop)
        monitor_thread.daemon = True
        monitor_thread.start()
        
        try:
            # 执行函数
            result = func(*args, **kwargs)
        finally:
            # 停止监控
            stop_monitoring.set()
            monitor_thread.join(timeout=1.0)
            
            # 最终检查(注意:不要在finally中return,否则会覆盖函数的返回值并吞掉异常)
            final_check = self.check_limits()
            if not final_check["within_limits"]:
                print(f"执行完成,但资源使用超出限制: {final_check['violations']}")
        
        return result

# 使用示例
def expensive_operation():
    """模拟消耗资源的操作"""
    import random
    data = []
    for i in range(100000):
        data.append(random.random() * random.random())
    return sum(data)

monitor = ResourceMonitor(limits={
    "max_cpu_percent": 50,
    "max_memory_mb": 500,
    "max_execution_time": 5,
    "max_network_requests": 5,
    "max_file_operations": 10
})

# 监控执行(返回被监控函数自身的结果)
result = monitor.monitor_execution(expensive_operation)
print(f"执行结果: {result}")
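
上面的监控循环在发现超限时只是打印告警("这里可以触发告警或停止执行")。若要真正"不再等待"超时的调用,常见做法是把函数提交到独立线程,并对 future.result 设置超时。下面是一个最小示意,run_with_timeout 是本文虚构的辅助函数,并非某个库的现成API:

```python
import concurrent.futures
import time

def run_with_timeout(func, timeout_seconds, *args, **kwargs):
    """在工作线程中执行func;超过timeout_seconds则放弃等待并抛出TimeoutError"""
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = executor.submit(func, *args, **kwargs)
    try:
        return future.result(timeout=timeout_seconds)
    except concurrent.futures.TimeoutError:
        raise TimeoutError(f"执行超过 {timeout_seconds} 秒限制")
    finally:
        # 不等待工作线程结束,立即把控制权还给调用方;
        # 注意:Python线程无法被强杀,超时的任务仍会在后台跑完
        executor.shutdown(wait=False)

print(run_with_timeout(lambda: sum(range(1000)), 2))  # 499500

try:
    run_with_timeout(time.sleep, 0.5, 3)  # sleep(3) 超过0.5秒限制
except TimeoutError as e:
    print(f"已拦截: {e}")
```

真正的"硬中止"需要进程级隔离(如 multiprocessing 或容器),线程方案只能做到调用方不再等待。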

第五部分:实战案例 - 自动化数据报表生成系统

5.1 系统架构设计

让我们构建一个完整的自动化数据报表生成系统,整合今天学习的所有技术:

"""
自动化数据报表生成系统
功能:从多个数据源收集数据,分析处理,生成可视化报表
"""
import json
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
import seaborn as sns

class DataReportSystem:
    """自动化数据报表生成系统"""
    
    def __init__(self, config_path: str = "config/report_config.json"):
        self.config = self._load_config(config_path)
        self.tools_registry = {}
        self.register_default_tools()
    
    def _load_config(self, config_path: str) -> Dict[str, Any]:
        """加载配置文件"""
        import os
        if os.path.exists(config_path):
            with open(config_path, 'r', encoding='utf-8') as f:
                return json.load(f)
        return {
            "data_sources": [],
            "report_templates": {},
            "schedule": {"interval_hours": 24},
            "output_formats": ["html", "pdf", "markdown"]
        }
    
    def register_default_tools(self):
        """注册默认工具集"""
        # 数据收集工具
        self.register_tool("fetch_database_data", self.fetch_database_data)
        self.register_tool("fetch_api_data", self.fetch_api_data)
        self.register_tool("read_local_files", self.read_local_files)
        
        # 数据处理工具
        self.register_tool("clean_data", self.clean_data)
        self.register_tool("aggregate_data", self.aggregate_data)
        self.register_tool("calculate_metrics", self.calculate_metrics)
        
        # 报表生成工具
        self.register_tool("create_visualizations", self.create_visualizations)
        self.register_tool("generate_report_html", self.generate_report_html)
        self.register_tool("export_report", self.export_report)
    
    def register_tool(self, name: str, function):
        """注册工具函数"""
        self.tools_registry[name] = function
    
    def fetch_database_data(self, query: str, connection_params: Dict) -> pd.DataFrame:
        """从数据库获取数据"""
        import sqlalchemy
        # 这里使用SQLAlchemy作为示例
        engine = sqlalchemy.create_engine(
            f"{connection_params['dialect']}://"
            f"{connection_params['user']}:{connection_params['password']}@"
            f"{connection_params['host']}:{connection_params['port']}/"
            f"{connection_params['database']}"
        )
        
        df = pd.read_sql(query, engine)
        return df
    
    def fetch_api_data(self, api_url: str, params: Dict = None) -> Dict:
        """从API获取数据"""
        import requests
        # 设置超时,避免外部API无响应时一直阻塞
        response = requests.get(api_url, params=params or {}, timeout=30)
        response.raise_for_status()
        return response.json()
    
    def read_local_files(self, file_pattern: str) -> List[pd.DataFrame]:
        """读取本地数据文件"""
        from glob import glob
        import os
        
        data_frames = []
        for file_path in glob(file_pattern):
            if file_path.endswith('.csv'):
                df = pd.read_csv(file_path)
                data_frames.append(df)
            elif file_path.endswith('.json'):
                df = pd.read_json(file_path)
                data_frames.append(df)
            elif file_path.endswith('.xlsx') or file_path.endswith('.xls'):
                df = pd.read_excel(file_path)
                data_frames.append(df)
        
        return data_frames
    
    def clean_data(self, df: pd.DataFrame, cleaning_rules: Dict) -> pd.DataFrame:
        """清洗数据"""
        cleaned_df = df.copy()
        
        # 处理缺失值
        if "missing_value_strategy" in cleaning_rules:
            strategy = cleaning_rules["missing_value_strategy"]
            if strategy == "drop":
                cleaned_df = cleaned_df.dropna()
            elif strategy == "fill_mean":
                cleaned_df = cleaned_df.fillna(cleaned_df.mean(numeric_only=True))
            elif strategy == "fill_median":
                cleaned_df = cleaned_df.fillna(cleaned_df.median(numeric_only=True))
        
        # 处理重复值
        if cleaning_rules.get("remove_duplicates", False):
            cleaned_df = cleaned_df.drop_duplicates()
        
        # 数据类型转换
        if "type_conversions" in cleaning_rules:
            for column, target_type in cleaning_rules["type_conversions"].items():
                if column in cleaned_df.columns:
                    try:
                        cleaned_df[column] = cleaned_df[column].astype(target_type)
                    except (ValueError, TypeError):
                        pass  # 转换失败保持原类型
        
        return cleaned_df
    
    def aggregate_data(self, df: pd.DataFrame, group_by: List[str], 
                      aggregations: Dict) -> pd.DataFrame:
        """聚合数据"""
        return df.groupby(group_by).agg(aggregations).reset_index()
    
    def calculate_metrics(self, df: pd.DataFrame, metrics_def: Dict) -> Dict[str, float]:
        """计算关键指标"""
        results = {}
        
        for metric_name, metric_config in metrics_def.items():
            metric_type = metric_config.get("type", "simple")
            
            if metric_type == "simple":
                # 简单计算:平均值、总和等
                column = metric_config["column"]
                operation = metric_config["operation"]
                
                if operation == "mean":
                    results[metric_name] = df[column].mean()
                elif operation == "sum":
                    results[metric_name] = df[column].sum()
                elif operation == "count":
                    results[metric_name] = df[column].count()
                elif operation == "std":
                    results[metric_name] = df[column].std()
            
            elif metric_type == "ratio":
                # 比率计算
                numerator = metric_config["numerator"]
                denominator = metric_config["denominator"]
                results[metric_name] = df[numerator].sum() / max(df[denominator].sum(), 1)
            
            elif metric_type == "growth":
                # 增长率计算(需要时间序列数据)
                pass  # 实现略
        
        return results
    
    def create_visualizations(self, df: pd.DataFrame, viz_config: List[Dict]) -> List[str]:
        """创建可视化图表"""
        import os
        os.makedirs("output", exist_ok=True)  # 确保图表输出目录存在
        output_files = []
        
        for i, config in enumerate(viz_config):
            viz_type = config.get("type", "line")
            title = config.get("title", f"Chart {i+1}")
            
            plt.figure(figsize=config.get("figsize", (10, 6)))
            
            if viz_type == "line":
                if "x" in config and "y" in config:
                    plt.plot(df[config["x"]], df[config["y"]])
                else:
                    # 默认绘制所有数值列
                    numeric_cols = df.select_dtypes(include=['number']).columns
                    for col in numeric_cols[:5]:  # 最多5列
                        plt.plot(df.index, df[col], label=col)
            
            elif viz_type == "bar":
                if "x" in config and "y" in config:
                    plt.bar(df[config["x"]], df[config["y"]])
            
            elif viz_type == "histogram":
                if "column" in config:
                    plt.hist(df[config["column"]].dropna(), bins=config.get("bins", 20))
            
            elif viz_type == "scatter":
                if "x" in config and "y" in config:
                    plt.scatter(df[config["x"]], df[config["y"]])
            
            elif viz_type == "heatmap":
                # 相关性热图
                corr_matrix = df.select_dtypes(include=['number']).corr()
                sns.heatmap(corr_matrix, annot=True, cmap='coolwarm')
            
            plt.title(title)
            if config.get("xlabel"):
                plt.xlabel(config["xlabel"])
            if config.get("ylabel"):
                plt.ylabel(config["ylabel"])
            
            if config.get("legend", False) and viz_type == "line":
                plt.legend()
            
            # 保存图表
            filename = f"output/chart_{i+1}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
            plt.savefig(filename, dpi=150, bbox_inches='tight')
            plt.close()
            
            output_files.append(filename)
        
        return output_files
    
    def generate_report_html(self, data: Dict, metrics: Dict, 
                           charts: List[str], template: str = "basic") -> str:
        """生成HTML报表"""
        import jinja2
        
        # 加载模板
        if template == "basic":
            html_template = """
            <!DOCTYPE html>
            <html>
            <head>
                <title>数据报表 - {{ report_date }}</title>
                <style>
                    body { font-family: Arial, sans-serif; margin: 40px; }
                    .header { background: #f0f0f0; padding: 20px; border-radius: 5px; }
                    .metrics { display: grid; grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); gap: 20px; margin: 30px 0; }
                    .metric-card { background: white; border: 1px solid #ddd; padding: 15px; border-radius: 5px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }
                    .metric-value { font-size: 24px; font-weight: bold; color: #2c3e50; }
                    .metric-label { color: #7f8c8d; font-size: 14px; }
                    .charts { margin: 30px 0; }
                    .chart-container { margin: 20px 0; }
                    .chart-img { max-width: 100%; border: 1px solid #ddd; border-radius: 5px; }
                    table { width: 100%; border-collapse: collapse; margin: 20px 0; }
                    th, td { border: 1px solid #ddd; padding: 12px; text-align: left; }
                    th { background-color: #f8f9fa; }
                </style>
            </head>
            <body>
                <div class="header">
                    <h1>数据报表</h1>
                    <p>生成时间: {{ report_date }}</p>
                    <p>数据源: {{ data_source }}</p>
                </div>
                
                {% if metrics %}
                <div class="metrics">
                    {% for name, value in metrics.items() %}
                    <div class="metric-card">
                        <div class="metric-label">{{ name }}</div>
                        <div class="metric-value">{{ "%.2f"|format(value) }}</div>
                    </div>
                    {% endfor %}
                </div>
                {% endif %}
                
                {% if charts %}
                <div class="charts">
                    <h2>可视化图表</h2>
                    {% for chart in charts %}
                    <div class="chart-container">
                        <img src="{{ chart }}" alt="Chart" class="chart-img">
                    </div>
                    {% endfor %}
                </div>
                {% endif %}
                
                {% if sample_data %}
                <div class="data-preview">
                    <h2>数据预览</h2>
                    {{ sample_data|safe }}
                </div>
                {% endif %}
            </body>
            </html>
            """
        else:
            # 可以加载外部模板文件
            html_template = template
        
        # 准备数据
        context = {
            "report_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "data_source": data.get("source", "Multiple Sources"),
            "metrics": metrics,
            "charts": charts,
            "sample_data": data.get("preview_html", "")
        }
        
        # 渲染模板
        template_engine = jinja2.Template(html_template)
        return template_engine.render(context)
    
    def export_report(self, html_content: str, formats: List[str] = None) -> Dict[str, str]:
        """导出报表到多种格式"""
        if formats is None:
            formats = ["html"]
        
        import os
        os.makedirs("output", exist_ok=True)  # 确保输出目录存在
        output_files = {}
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        
        for fmt in formats:
            if fmt == "html":
                filename = f"output/report_{timestamp}.html"
                with open(filename, 'w', encoding='utf-8') as f:
                    f.write(html_content)
                output_files["html"] = filename
            
            elif fmt == "pdf":
                # 需要安装wkhtmltopdf
                try:
                    import pdfkit
                    filename = f"output/report_{timestamp}.pdf"
                    pdfkit.from_string(html_content, filename)
                    output_files["pdf"] = filename
                except (ImportError, OSError):
                    print("PDF导出失败,请安装wkhtmltopdf")
            
            elif fmt == "markdown":
                # 简化转换
                filename = f"output/report_{timestamp}.md"
                markdown_content = f"""# 数据报表\n\n生成时间: {timestamp}\n\n"""
                with open(filename, 'w', encoding='utf-8') as f:
                    f.write(markdown_content)
                output_files["markdown"] = filename
        
        return output_files
    
    def execute_workflow(self, workflow_config: Dict) -> Dict[str, Any]:
        """执行完整的工作流"""
        results = {
            "success": False,
            "steps": {},
            "outputs": {},
            "errors": []
        }
        
        try:
            # 步骤1: 收集数据
            print("步骤1: 收集数据...")
            all_data = []
            
            for source in workflow_config.get("data_sources", []):
                source_type = source.get("type")
                
                if source_type == "database":
                    data = self.fetch_database_data(
                        source["query"], 
                        source["connection"]
                    )
                    all_data.append({"source": source["name"], "data": data})
                
                elif source_type == "api":
                    data = self.fetch_api_data(
                        source["url"],
                        source.get("params")
                    )
                    all_data.append({"source": source["name"], "data": pd.DataFrame(data)})
                
                elif source_type == "file":
                    files_data = self.read_local_files(source["pattern"])
                    for i, df in enumerate(files_data):
                        all_data.append({"source": f"{source['name']}_{i}", "data": df})
            
            results["steps"]["data_collection"] = {
                "status": "completed",
                "sources_count": len(all_data)
            }
            
            # 步骤2: 数据处理
            print("步骤2: 数据处理...")
            processed_data = []
            
            for data_item in all_data:
                df = data_item["data"]
                
                # 清洗
                if workflow_config.get("data_cleaning"):
                    df = self.clean_data(df, workflow_config["data_cleaning"])
                
                # 聚合
                if workflow_config.get("aggregation"):
                    agg_config = workflow_config["aggregation"]
                    df = self.aggregate_data(
                        df, 
                        agg_config.get("group_by", []),
                        agg_config.get("aggregations", {})
                    )
                
                processed_data.append({
                    "source": data_item["source"],
                    "data": df,
                    "row_count": len(df),
                    "column_count": len(df.columns)
                })
            
            results["steps"]["data_processing"] = {
                "status": "completed",
                "datasets_processed": len(processed_data)
            }
            
            # 步骤3: 计算指标
            print("步骤3: 计算指标...")
            all_metrics = {}
            
            for data_item in processed_data:
                df = data_item["data"]
                metrics = self.calculate_metrics(
                    df, 
                    workflow_config.get("metrics", {})
                )
                
                # 添加数据源前缀
                prefixed_metrics = {
                    f"{data_item['source']}_{k}": v 
                    for k, v in metrics.items()
                }
                all_metrics.update(prefixed_metrics)
            
            results["steps"]["metrics_calculation"] = {
                "status": "completed",
                "metrics_count": len(all_metrics)
            }
            
            # 步骤4: 创建可视化
            print("步骤4: 创建可视化...")
            all_charts = []
            
            for data_item in processed_data:
                df = data_item["data"]
                charts = self.create_visualizations(
                    df,
                    workflow_config.get("visualizations", [])
                )
                all_charts.extend(charts)
            
            results["steps"]["visualization"] = {
                "status": "completed",
                "charts_created": len(all_charts)
            }
            
            # 步骤5: 生成报表
            print("步骤5: 生成报表...")
            
            # 准备样本数据用于预览
            sample_df = processed_data[0]["data"] if processed_data else pd.DataFrame()
            sample_html = sample_df.head(10).to_html() if not sample_df.empty else ""
            
            html_report = self.generate_report_html(
                data={"source": "自动化报表系统", "preview_html": sample_html},
                metrics=all_metrics,
                charts=all_charts,
                template=workflow_config.get("template", "basic")
            )
            
            results["steps"]["report_generation"] = {
                "status": "completed",
                "html_length": len(html_report)
            }
            
            # 步骤6: 导出报表
            print("步骤6: 导出报表...")
            output_files = self.export_report(
                html_report,
                formats=workflow_config.get("output_formats", ["html"])
            )
            
            results["steps"]["export"] = {
                "status": "completed",
                "files": output_files
            }
            
            results["success"] = True
            results["outputs"] = {
                "metrics": all_metrics,
                "charts": all_charts,
                "files": output_files,
                "report_html": html_report[:500] + "..."  # 只存储前500字符
            }
            
        except Exception as e:
            results["success"] = False
            results["errors"].append(str(e))
            print(f"工作流执行失败: {e}")
        
        return results

# 使用示例
def run_automated_reporting():
    """运行自动化报表生成示例"""
    system = DataReportSystem()
    
    # 定义工作流配置
    workflow_config = {
        "data_sources": [
            {
                "name": "销售数据",
                "type": "file",
                "pattern": "data/sales_*.csv"
            }
        ],
        "data_cleaning": {
            "missing_value_strategy": "fill_mean",
            "remove_duplicates": True
        },
        "aggregation": {
            "group_by": ["region", "product_category"],
            "aggregations": {
                "sales_amount": "sum",
                "quantity": "sum",
                "customer_count": "count"
            }
        },
        "metrics": {
            "total_sales": {"type": "simple", "column": "sales_amount", "operation": "sum"},
            "avg_transaction": {"type": "simple", "column": "sales_amount", "operation": "mean"},
            "sales_per_customer": {"type": "ratio", "numerator": "sales_amount", "denominator": "customer_count"}
        },
        "visualizations": [
            {
                "type": "bar",
                "x": "region",
                "y": "sales_amount",
                "title": "各地区销售额对比"
            },
            {
                "type": "line",
                "title": "销售趋势分析"
            }
        ],
        "output_formats": ["html", "pdf"],
        "template": "basic"
    }
    
    # 执行工作流
    results = system.execute_workflow(workflow_config)
    
    if results["success"]:
        print("报表生成成功!")
        print(f"生成的文件: {results['outputs']['files']}")
        print(f"计算的指标数量: {len(results['outputs']['metrics'])}")
        print(f"创建的图表数量: {len(results['outputs']['charts'])}")
    else:
        print(f"报表生成失败: {results['errors']}")
    
    return results

if __name__ == "__main__":
    # 创建示例数据文件(如果不存在)
    import os
    if not os.path.exists("data"):
        os.makedirs("data")
    
    # 生成示例销售数据
    import numpy as np
    dates = pd.date_range(start="2024-01-01", end="2024-12-31", freq='D')
    regions = ["North", "South", "East", "West"]
    categories = ["Electronics", "Clothing", "Home", "Books"]
    
    sample_data = []
    for i in range(100):
        row = {
            "date": np.random.choice(dates),
            "region": np.random.choice(regions),
            "product_category": np.random.choice(categories),
            "sales_amount": np.random.uniform(100, 10000),
            "quantity": np.random.randint(1, 100),
            "customer_id": f"CUST{np.random.randint(1000, 9999)}"
        }
        sample_data.append(row)
    
    df = pd.DataFrame(sample_data)
    df.to_csv("data/sales_sample.csv", index=False)
    
    # 运行报表系统
    run_automated_reporting()

第六部分:今日行动 - 实践函数调用技术

行动选项1:基础实践 - 创建天气查询助手

目标:实现一个能够查询天气并给出穿衣建议的AI助手

步骤

  1. 定义天气查询函数(可以使用模拟数据或免费API如OpenWeatherMap)
  2. 实现穿衣建议函数(根据温度、天气状况给出建议)
  3. 创建AI对话流程,让AI理解用户意图并调用相应函数
  4. 测试不同场景下的响应

代码框架

import random
from datetime import datetime
from typing import Dict, List

def get_weather(city: str, date: str = None) -> Dict:
    """获取城市天气信息(模拟实现)"""
    # 模拟数据 - 实际项目中使用真实API
    weather_conditions = ["晴", "多云", "小雨", "中雨", "大雨", "雪", "雾"]
    temperatures = {
        "北京": (-5, 5),
        "上海": (5, 15),
        "广州": (15, 25),
        "深圳": (16, 26),
        "成都": (3, 12)
    }
    
    temp_range = temperatures.get(city, (0, 20))
    temp = random.randint(temp_range[0], temp_range[1])
    
    return {
        "city": city,
        "date": date or datetime.now().strftime("%Y-%m-%d"),
        "temperature": temp,
        "condition": random.choice(weather_conditions),
        "humidity": f"{random.randint(30, 90)}%",
        "wind_speed": f"{random.randint(1, 20)}km/h"
    }

def get_clothing_advice(temperature: int, condition: str) -> Dict:
    """根据天气给出穿衣建议"""
    if temperature < 0:
        advice = "保暖内衣 + 毛衣 + 羽绒服,戴手套和帽子"
    elif temperature < 10:
        advice = "秋衣 + 外套,建议穿长裤"
    elif temperature < 20:
        advice = "长袖T恤 + 薄外套,可根据体感调整"
    else:
        advice = "短袖或薄长袖,注意防晒"
    
    # 根据天气状况调整
    if "雨" in condition:
        advice += ",记得带雨伞"
    elif "雪" in condition:
        advice += ",穿防滑鞋"
    elif "雾" in condition:
        advice += ",注意交通安全"
    
    return {
        "temperature": f"{temperature}°C",
        "condition": condition,
        "clothing_advice": advice,
        "accessories": ["雨伞"] if "雨" in condition else []
    }

# 函数定义供AI使用
weather_functions = [
    {
        "name": "get_weather",
        "description": "获取城市天气信息",
        "parameters": {
            "type": "object",
            "properties": {
                "city": {"type": "string", "description": "城市名称"},
                "date": {"type": "string", "description": "日期,格式YYYY-MM-DD"}
            },
            "required": ["city"]
        }
    },
    {
        "name": "get_clothing_advice",
        "description": "根据天气状况给出穿衣建议",
        "parameters": {
            "type": "object",
            "properties": {
                "temperature": {"type": "integer", "description": "温度(摄氏度)"},
                "condition": {"type": "string", "description": "天气状况"}
            },
            "required": ["temperature", "condition"]
        }
    }
]
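
定义好函数描述后,还需要"执行侧"的分发逻辑:模型返回结构化的 function_call,由我们的代码查表执行并把结果回传。下面是一个最小示意,其中模型响应是手工模拟的字典(实际项目中应取自各厂商SDK的返回对象),get_weather 也换成了返回固定值的简化版:

```python
import json

# 简化版工具实现(真实实现见上文的 get_weather,这里返回固定值便于演示)
def get_weather(city: str, date: str = None) -> dict:
    return {"city": city, "temperature": 8, "condition": "多云"}

AVAILABLE_FUNCTIONS = {"get_weather": get_weather}

def dispatch_function_call(model_response: dict):
    """解析模型返回的function_call字段,查表执行对应函数并返回结果"""
    call = model_response.get("function_call")
    if call is None:
        return None  # 模型选择直接回复文本,无需调用工具
    func = AVAILABLE_FUNCTIONS.get(call["name"])
    if func is None:
        raise ValueError(f"未注册的函数: {call['name']}")
    args = json.loads(call["arguments"])  # 参数以JSON字符串形式返回
    return func(**args)

# 手工模拟一次模型的函数调用响应(假设数据)
simulated_response = {
    "function_call": {"name": "get_weather", "arguments": '{"city": "北京"}'}
}
print(dispatch_function_call(simulated_response))
```

把这个返回结果再拼回对话历史发给模型,让它用自然语言解释,就完成了一轮完整的函数调用循环。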

行动选项2:中级实践 - 构建智能文档分析系统

目标:创建一个能够读取文档、分析内容、提取关键信息的系统

步骤

  1. 实现文件读取工具(支持txt、pdf、docx等格式)
  2. 创建文本分析工具(关键词提取、摘要生成、情感分析)
  3. 设计多步骤工作流:读取→分析→生成报告
  4. 添加错误处理和资源监控

核心功能

  • 批量处理文档
  • 自动分类和标签生成
  • 生成分析报告摘要
  • 支持自定义分析规则
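
其中的"关键词提取"可以先用最朴素的词频统计起步,后续再替换为TF-IDF或大模型方案。下面是仅依赖标准库的示意,停用词表是演示用的假设数据:

```python
import re
from collections import Counter

# 演示用的极简停用词表(实际项目应使用完整词表)
STOPWORDS = {"the", "a", "an", "of", "and", "to", "in", "is", "are", "what", "can"}

def extract_keywords(text: str, top_n: int = 5) -> list:
    """最朴素的关键词提取:正则分词 -> 去停用词 -> 按词频取前top_n"""
    words = re.findall(r"[a-zA-Z]+", text.lower())
    words = [w for w in words if w not in STOPWORDS and len(w) > 1]
    return [word for word, count in Counter(words).most_common(top_n)]

doc = ("Function calling lets the model call external tools. "
       "Tools extend what the model can do.")
print(extract_keywords(doc, 3))
```

中文文本需要先用 jieba 等分词库切词,再套用同样的统计流程。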

行动选项3:高级实践 - 开发自动化工作流引擎

目标:实现一个可配置的自动化工作流引擎,支持复杂任务编排

步骤

  1. 设计工作流DSL(领域特定语言)或JSON配置格式
  2. 实现任务调度器,支持顺序、并行、条件分支
  3. 添加工具注册和发现机制
  4. 实现状态管理和错误恢复
  5. 创建可视化工作流编辑器(可选)

高级特性

  • 实时监控工作流执行状态
  • 支持人工干预和步骤重试
  • 工作流版本管理和回滚
  • 性能优化和资源限制
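
这样一个引擎的骨架可以非常小:用一份配置描述步骤顺序,每步从共享上下文读取前序结果、写回自己的输出,条件分支用谓词控制是否执行。下面是顺序+条件分支的最小示意(为简洁起见,条件用Python可调用对象代替真正的DSL表达式):

```python
def run_workflow(config: dict, tools: dict) -> dict:
    """按配置顺序执行步骤;每步结果写入共享上下文,供后续步骤引用"""
    context = {}
    for step in config["steps"]:
        when = step.get("when")
        if when is not None and not when(context):
            continue  # 条件不满足,跳过该步骤
        func = tools[step["tool"]]
        context[step["name"]] = func(context)
    return context

# 工具注册表:每个工具接收共享上下文,返回自己的结果
tools = {
    "load": lambda ctx: [3, 1, 2],
    "check": lambda ctx: len(ctx["load"]) > 0,
    "sort": lambda ctx: sorted(ctx["load"]),
}

config = {
    "steps": [
        {"name": "load", "tool": "load"},
        {"name": "check", "tool": "check"},
        # 仅当check通过时才执行排序
        {"name": "sort", "tool": "sort", "when": lambda ctx: ctx["check"]},
    ]
}

print(run_workflow(config, tools)["sort"])  # [1, 2, 3]
```

并行分支可在此基础上用 concurrent.futures 扩展;状态管理则可把 context 持久化,以支持断点恢复和重试。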

额外挑战:函数调用优化竞赛

挑战内容

  1. 最少调用优化:设计一个场景,用最少的函数调用完成复杂任务
  2. 并行执行优化:识别可以并行执行的函数调用,减少总执行时间
  3. 错误恢复优化:设计智能错误恢复机制,在部分失败时最大化完成任务
  4. 成本优化:考虑API调用成本,设计成本最优的调用策略
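
以第2项为例:彼此独立的工具调用不必串行等待,可用线程池并发发起。下面是一个示意,call_tool 用 sleep 模拟耗时的外部调用:

```python
import concurrent.futures
import time

def call_tool(name: str) -> str:
    """模拟一次约0.2秒的外部工具调用"""
    time.sleep(0.2)
    return f"{name}:ok"

independent_calls = ["weather", "calendar", "news"]

start = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as pool:
    # map按输入顺序返回结果,便于与调用请求一一对应
    results = list(pool.map(call_tool, independent_calls))
elapsed = time.time() - start

print(results)
print(f"总耗时约 {elapsed:.2f} 秒")  # 并行后接近单次调用的0.2秒,而非串行的约0.6秒
```

前提是这些调用之间没有数据依赖;有依赖的调用仍需按拓扑顺序分批并发。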

评估标准

  • 任务完成度
  • 调用次数和效率
  • 错误处理能力
  • 代码可读性和可维护性

总结与展望

通过今天的学习,我们掌握了让AI从"思考者"转变为"执行者"的关键技术。函数调用不仅仅是API调用,它是一种思维方式和工作方法的转变:

关键收获:

  1. 结构化思维:学会将复杂需求分解为可执行的函数调用
  2. 工具集成能力:掌握将外部工具和服务集成到AI工作流中
  3. 安全与可靠性:理解函数调用中的安全风险和防范措施
  4. 工程化实践:学习如何设计可维护、可扩展的函数调用系统

未来方向:

  1. 自动化程度的提升:从单次调用到自动化工作流
  2. 智能调度优化:基于历史数据和预测优化工具调用顺序
  3. 跨平台集成:在不同AI平台间实现函数调用兼容
  4. 低代码/无代码工具:让非技术用户也能配置复杂工作流

行业应用:

  • 客户服务:自动查询订单、处理退款、安排售后
  • 数据分析:自动收集、清洗、分析、可视化数据
  • 内容创作:研究、写作、编辑、发布全流程自动化
  • 开发运维:代码审查、测试、部署、监控自动化

记住,函数调用的真正价值不在于技术本身,而在于它如何扩展AI的能力边界,让人工智能真正成为你的数字伙伴,而不仅仅是对话伙伴。

今日行动建议:从最简单的天气查询助手开始,逐步构建更复杂的系统。每一次实践都会加深你对函数调用本质的理解。

技术不是目的,而是手段。真正的目标是让技术服务于人,让AI成为放大人类能力的工具,而不是替代品。
