Project 8: Agents and Automated Workflows (Cross-Border E-Commerce AI Operations Assistant Agent System)
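Project prototype
🤖 E-Commerce Intelligent Agent Operations System
==================================================
[Agent status monitor]
🟢 ProductResearchAgent - idle
🟢 PricingAgent - running (processing 3 tasks)
🟡 CustomerServiceAgent - high load (15 concurrent requests)
[Workflow execution overview]
Product listing workflow: ████████ 85% complete
Order processing workflow: ██████████ 100% complete
Customer service workflow: █████ 50% complete
[Live task execution]
┌─────────────────────────────────────────────────────┐
│ 📦 Product listing: "Smart Watch X1" │
│ ├── Market research: ✅ done (12.5s) │
│ ├── Pricing strategy: 🔄 in progress │
│ └── Listing preparation: ⏳ waiting │
│ │
│ 💰 Dynamic repricing: "Wireless Earbuds Pro" │
│ ├── Competitor price monitoring: ✅ done │
│ ├── Price optimization: ✅ done │
│ └── Platform price update: ✅ done │
│ │
│ 💬 Customer service: "Order 12345 shipping inquiry" │
│ ├── Intent recognition: ✅ shipping inquiry │
│ ├── Automatic reply: ✅ done │
│ └── Predicted satisfaction: ⭐ 4.2/5.0 │
└─────────────────────────────────────────────────────┘
[Decision recommendations]
🎯 Recommended actions:
• Market trends suggest increasing inventory for the "sports earbuds" category
• A competitor price cut was detected; consider adjusting the "smart watch" price
• Customer inquiries are rising; consider expanding the customer service team
[Agent communication monitor]
📨 Message traffic: 125 messages/minute
📊 Processing success rate: 98.7%
⏱️ Average response time: 0.8s
[Control panel]
1. 🚀 Start product listing workflow 2. 📊 Run competitive market analysis
3. 💬 Batch-process customer inquiries 4. ⚙️ Configure agent parameters
5. 📈 View performance metrics 6. 🔄 Refresh system status
Enter your choice [1-6]: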
Configuration parameters
# config/agent_workflow_config.yaml
agents:
  product_research:
    capabilities: ["market_analysis", "competitor_research", "trend_prediction"]
    data_sources: ["电商平台API", "社交媒体", "行业数据库"]  # e-commerce platform APIs, social media, industry databases
    research_depth: "comprehensive"
    update_interval: "24h"
  pricing:
    capabilities: ["price_optimization", "dynamic_pricing", "competitor_price_tracking"]
    optimization_method: "reinforcement_learning"
    price_adjustment_frequency: "1h"
    risk_tolerance: "medium"
  customer_service:
    capabilities: ["query_answering", "complaint_handling", "recommendation"]
    response_timeout: 30
    escalation_threshold: 0.3
    satisfaction_target: 0.85
workflows:
  product_listing:
    steps: 3
    timeout: 300
    retry_attempts: 3
    break_on_failure: true
  customer_service:
    steps: 2
    timeout: 60
    retry_attempts: 2
    break_on_failure: false
message_bus:
  max_queue_size: 1000
  processing_workers: 5
  retry_policy:
    max_attempts: 3
    backoff_factor: 1.5
monitoring:
  log_level: "INFO"
  metrics_collection: true
  health_check_interval: 30
  alerting:
    enabled: true
    slack_webhook: "https://hooks.slack.com/..."
    email_alerts: true
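The `retry_policy` block names only `max_attempts` and `backoff_factor`; the config does not say how they translate into wait times. As a minimal sketch, assuming exponential backoff with a hypothetical `base_delay` of one second (the function name and the base delay are illustrative and not part of the original config), the retry schedule could be derived like this:

```python
from typing import List


def backoff_delays(max_attempts: int, backoff_factor: float, base_delay: float = 1.0) -> List[float]:
    """Return the wait time in seconds that follows each failed attempt.

    Assumption: delays grow geometrically, i.e. base_delay * backoff_factor ** n.
    """
    return [base_delay * backoff_factor ** attempt for attempt in range(max_attempts)]


if __name__ == "__main__":
    # Values taken from message_bus.retry_policy in the config
    print(backoff_delays(max_attempts=3, backoff_factor=1.5))
```

With the values from the config this yields three delays that each grow by a factor of 1.5; a real deployment would probably also cap the delay and add jitter.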
Core code
from typing import Dict, List, Any, Callable, Optional
import asyncio
import logging
from datetime import datetime
from enum import Enum
import json
import uuid
from dataclasses import dataclass
from abc import ABC, abstractmethod


class AgentStatus(Enum):
    IDLE = "idle"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    PAUSED = "paused"


class TaskPriority(Enum):
    LOW = 1
    NORMAL = 2
    HIGH = 3
    URGENT = 4


@dataclass
class Task:
    """Task record passed between the orchestrator and the agents."""
    task_id: str
    agent_type: str
    payload: Dict[str, Any]
    priority: TaskPriority
    created_at: datetime
    status: AgentStatus
    result: Optional[Dict[str, Any]] = None  # filled in when the task completes
    error: Optional[str] = None              # filled in when the task fails
class BaseAgent(ABC):
    """Abstract base class for all agents."""

    def __init__(self, agent_id: str, capabilities: List[str]):
        self.agent_id = agent_id
        self.capabilities = capabilities
        self.status = AgentStatus.IDLE
        self.current_task = None
        self.logger = logging.getLogger(f"Agent_{agent_id}")
        self.message_bus = None

    def set_message_bus(self, message_bus):
        """Attach the message bus used for inter-agent communication."""
        self.message_bus = message_bus

    async def execute(self, task: Task) -> Task:
        """Run a task; failures are recorded on the task instead of being raised."""
        self.status = AgentStatus.RUNNING
        self.current_task = task
        try:
            self.logger.info(f"Starting task: {task.task_id}")
            result = await self._process(task.payload)
            task.status = AgentStatus.COMPLETED
            task.result = result
            self.status = AgentStatus.IDLE
            # Publish a task-completed message
            if self.message_bus:
                await self.message_bus.publish(
                    "task_completed",
                    {"agent_id": self.agent_id, "task": task}
                )
            self.logger.info(f"Task completed: {task.task_id}")
        except Exception as e:
            self.logger.error(f"Task failed: {task.task_id}, error: {e}")
            task.status = AgentStatus.FAILED
            task.error = str(e)
            self.status = AgentStatus.IDLE
            # Publish a task-failed message
            if self.message_bus:
                await self.message_bus.publish(
                    "task_failed",
                    {"agent_id": self.agent_id, "task": task, "error": str(e)}
                )
        return task

    @abstractmethod
    async def _process(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Task-specific logic, implemented by subclasses."""
        pass

    async def send_message(self, target_agent: str, message: Dict[str, Any]):
        """Send a message to another agent through the bus."""
        if self.message_bus:
            await self.message_bus.publish(
                "agent_message",
                {
                    "from_agent": self.agent_id,
                    "to_agent": target_agent,
                    "message": message,
                    "timestamp": datetime.now().isoformat()
                }
            )
class ProductResearchAgent(BaseAgent):
    """Product research agent - market analysis and competitor research."""

    def __init__(self):
        super().__init__("product_research", ["market_analysis", "competitor_research", "trend_prediction"])
        self.data_sources = ["电商平台API", "社交媒体", "行业数据库"]
        self.competitor_db = {}  # competitor database

    async def _process(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Handle a product research task."""
        product_query = payload.get('product_query', '')
        research_depth = payload.get('research_depth', 'standard')
        max_results = payload.get('max_results', 20)
        self.logger.info(f"Starting product research: {product_query}")
        # Run the three research tasks concurrently
        market_task = asyncio.create_task(self._analyze_market_trends(product_query))
        competitor_task = asyncio.create_task(self._research_competitors(product_query))
        pricing_task = asyncio.create_task(self._analyze_pricing_strategy(product_query))
        # Wait for all of them; exceptions are returned rather than raised
        results = await asyncio.gather(
            market_task, competitor_task, pricing_task,
            return_exceptions=True
        )
        # Turn a returned exception into an error record so the result stays a plain dict
        market_data, competitor_analysis, pricing_analysis = [
            {'error': str(item)} if isinstance(item, Exception) else item
            for item in results
        ]
        # Build the combined recommendation
        recommendation = await self._generate_recommendation(
            market_data, competitor_analysis, pricing_analysis
        )
        return {
            'research_query': product_query,
            'market_analysis': market_data,
            'competitor_analysis': competitor_analysis,
            'pricing_analysis': pricing_analysis,
            'recommendation': recommendation,
            'data_sources': self.data_sources,
            'research_timestamp': datetime.now().isoformat()
        }
    async def _analyze_market_trends(self, product_query: str) -> Dict[str, Any]:
        """Analyze market trends (returns mock data)."""
        # Simulate an API call and data collection
        await asyncio.sleep(1)  # simulated network latency
        return {
            'market_size': "百亿级",
            'growth_rate': "15% YoY",
            'seasonality': "Q4旺季",
            'customer_segments': ["年轻人", "都市白领", "家庭用户"],
            'key_trends': [
                "智能化升级",
                "环保材料需求增长",
                "个性化定制流行"
            ]
        }

    async def _research_competitors(self, product_query: str) -> Dict[str, Any]:
        """Research competitors (returns mock data)."""
        await asyncio.sleep(0.8)
        # Simulated competitor analysis
        competitors = [
            {
                'name': '品牌A',
                'market_share': '35%',
                'pricing_range': [199, 599],
                'strengths': ['品牌知名度', '渠道覆盖'],
                'weaknesses': ['创新不足', '价格偏高']
            },
            {
                'name': '品牌B',
                'market_share': '25%',
                'pricing_range': [159, 499],
                'strengths': ['性价比', '产品创新'],
                'weaknesses': ['品牌影响力弱', '售后服务']
            }
        ]
        return {
            'main_competitors': competitors,
            'competitive_landscape': "高度竞争",
            'barriers_to_entry': "中等",
            'differentiation_opportunities': ["智能化", "服务体验", "定制化"]
        }

    async def _analyze_pricing_strategy(self, product_query: str) -> Dict[str, Any]:
        """Analyze pricing strategy (returns mock data)."""
        await asyncio.sleep(0.5)
        return {
            'price_range_recommendation': [179, 429],
            'optimal_price_point': 299,
            'pricing_strategies': [
                "渗透定价法",
                "价值定价法",
                "捆绑销售"
            ],
            'discount_strategy': {
                'seasonal_discount': '10-15%',
                'volume_discount': '5-20%',
                'promotional_discount': '20-30%'
            }
        }

    async def _generate_recommendation(self, market_data: Dict, competitor_analysis: Dict, pricing_analysis: Dict) -> Dict[str, Any]:
        """Generate a product recommendation (static mock; the inputs are not used yet)."""
        return {
            'product_positioning': "中高端智能产品",
            'target_segment': "都市年轻白领",
            'key_features': ["智能化", "易用性", "设计感"],
            'pricing_recommendation': "299-399元主力价位",
            'marketing_focus': ["社交媒体营销", "KOL合作", "内容营销"],
            'risk_warnings': [
                "市场竞争激烈,需突出差异化",
                "注意成本控制,确保利润率"
            ]
        }
class PricingAgent(BaseAgent):
    """Pricing agent - dynamic pricing and price optimization."""

    def __init__(self):
        super().__init__("pricing", ["price_optimization", "dynamic_pricing", "competitor_price_tracking"])
        self.price_rules = self._load_pricing_rules()
        self.competitor_price_cache = {}

    def _load_pricing_rules(self) -> List[Dict[str, Any]]:
        """Load the static pricing rules."""
        return [
            {
                'rule_id': 'cost_plus',
                'condition': "default",
                'action': "cost * 1.8",
                'priority': 1
            },
            {
                'rule_id': 'competitor_match',
                'condition': "competitor_price < current_price",
                'action': "match_competitor_price",
                'priority': 2
            },
            {
                'rule_id': 'demand_based',
                'condition': "demand_high and inventory_low",
                'action': "increase_price_5%",
                'priority': 3
            }
        ]

    async def _process(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Handle a pricing task."""
        product_info = payload.get('product_info', {})
        market_conditions = payload.get('market_conditions', {})
        pricing_strategy = payload.get('pricing_strategy', 'auto')
        self.logger.info(f"Starting price calculation: {product_info.get('product_id', 'unknown')}")
        # Record the cost so that _assess_pricing_risk can run its margin check
        self.current_cost = product_info.get('cost', 0)
        # Collect live market data
        market_data = await self._gather_market_data(product_info)
        # Apply the pricing rules
        optimal_price = await self._calculate_optimal_price(product_info, market_data, pricing_strategy)
        # Generate dynamic pricing rules
        dynamic_rules = await self._generate_dynamic_rules(product_info, market_data)
        # Risk assessment
        risk_assessment = await self._assess_pricing_risk(optimal_price, market_data)
        return {
            'product_id': product_info.get('product_id'),
            'optimal_price': optimal_price,
            'pricing_strategy': pricing_strategy,
            'price_range': self._calculate_price_range(optimal_price),
            'dynamic_pricing_rules': dynamic_rules,
            'market_data': market_data,
            'risk_assessment': risk_assessment,
            'confidence_score': 0.87  # fixed placeholder value
        }
    async def _gather_market_data(self, product_info: Dict[str, Any]) -> Dict[str, Any]:
        """Collect market data from four sources concurrently."""
        # Note: these four helper methods are not defined in this listing
        tasks = [
            self._get_competitor_prices(product_info),
            self._analyze_demand_patterns(product_info),
            self._check_inventory_levels(product_info),
            self._monitor_market_trends(product_info)
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # A failed source falls back to an empty dict
        return {
            'competitor_prices': results[0] if not isinstance(results[0], Exception) else {},
            'demand_patterns': results[1] if not isinstance(results[1], Exception) else {},
            'inventory_levels': results[2] if not isinstance(results[2], Exception) else {},
            'market_trends': results[3] if not isinstance(results[3], Exception) else {}
        }

    async def _calculate_optimal_price(self, product_info: Dict, market_data: Dict, strategy: str) -> float:
        """Calculate the optimal price."""
        cost = product_info.get('cost', 0)
        competitor_prices = list(market_data.get('competitor_prices', {}).values())
        if competitor_prices:
            avg_competitor_price = sum(competitor_prices) / len(competitor_prices)
            if strategy == 'competitive':
                return avg_competitor_price * 0.95  # 5% below the competitor average
            elif strategy == 'premium':
                return avg_competitor_price * 1.15  # 15% above the competitor average
            else:  # auto
                return max(cost * 1.5, avg_competitor_price * 0.98)
        else:
            # No competitor data: fall back to cost-plus pricing
            return cost * 2.0
    async def _generate_dynamic_rules(self, product_info: Dict, market_data: Dict) -> List[Dict[str, Any]]:
        """Generate dynamic pricing rules, sorted by priority."""
        rules = []
        # Demand-based rules
        demand_rules = await self._create_demand_based_rules(market_data.get('demand_patterns', {}))
        rules.extend(demand_rules)
        # Competition-based rules
        competition_rules = await self._create_competition_rules(market_data.get('competitor_prices', {}))
        rules.extend(competition_rules)
        # Inventory-based rules
        inventory_rules = await self._create_inventory_rules(market_data.get('inventory_levels', {}))
        rules.extend(inventory_rules)
        return sorted(rules, key=lambda x: x.get('priority', 1))

    async def _assess_pricing_risk(self, price: float, market_data: Dict) -> Dict[str, Any]:
        """Assess pricing risk as an additive score."""
        risk_factors = []
        risk_score = 0
        competitor_prices = list(market_data.get('competitor_prices', {}).values())
        if competitor_prices:
            min_competitor_price = min(competitor_prices)
            if price > min_competitor_price * 1.2:
                risk_factors.append("价格显著高于竞争对手")
                risk_score += 30
        # Margin check; it only runs when the agent has recorded a current_cost
        current_cost = getattr(self, 'current_cost', None)
        if current_cost is not None and price < current_cost * 1.1:
            risk_factors.append("利润率过低")
            risk_score += 40
        # Demand elasticity
        demand_elasticity = market_data.get('demand_patterns', {}).get('elasticity', 1.0)
        if demand_elasticity > 1.5 and price > 100:  # highly elastic product
            risk_factors.append("需求弹性高,价格敏感")
            risk_score += 20
        risk_level = "低风险" if risk_score < 30 else "中等风险" if risk_score < 60 else "高风险"
        return {
            'risk_score': risk_score,
            'risk_level': risk_level,
            'risk_factors': risk_factors,
            'mitigation_suggestions': self._generate_risk_mitigation(risk_factors)
        }
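`PricingAgent` calls nine helpers that do not appear in this listing (`_get_competitor_prices`, `_analyze_demand_patterns`, `_check_inventory_levels`, `_monitor_market_trends`, `_calculate_price_range`, the three `_create_*_rules` methods, and `_generate_risk_mitigation`), so the class cannot run as shown. The mixin below is a hypothetical set of placeholders that would let it run end to end; every return value, rule name, and threshold in it is invented sample data, not the author's implementation.

```python
import asyncio
from typing import Any, Dict, List


class PricingHelpersMixin:
    """Hypothetical stand-ins for the helpers PricingAgent expects (sample data only)."""

    async def _get_competitor_prices(self, product_info: Dict[str, Any]) -> Dict[str, float]:
        return {"brand_a": 329.0, "brand_b": 289.0}

    async def _analyze_demand_patterns(self, product_info: Dict[str, Any]) -> Dict[str, Any]:
        return {"elasticity": 1.2, "trend": "stable"}

    async def _check_inventory_levels(self, product_info: Dict[str, Any]) -> Dict[str, int]:
        return {"in_stock": 120, "reorder_point": 40}

    async def _monitor_market_trends(self, product_info: Dict[str, Any]) -> Dict[str, str]:
        return {"direction": "flat"}

    def _calculate_price_range(self, optimal_price: float) -> List[float]:
        # Assumption: the price may float within +/-10% of the optimum
        return [round(optimal_price * 0.9, 2), round(optimal_price * 1.1, 2)]

    async def _create_demand_based_rules(self, demand_patterns: Dict[str, Any]) -> List[Dict[str, Any]]:
        if demand_patterns.get("elasticity", 1.0) > 1.5:
            return [{"rule_id": "elastic_demand", "action": "limit_increase_to_3%", "priority": 3}]
        return []

    async def _create_competition_rules(self, competitor_prices: Dict[str, float]) -> List[Dict[str, Any]]:
        if not competitor_prices:
            return []
        floor = round(min(competitor_prices.values()) * 0.9, 2)
        return [{"rule_id": "competitor_floor", "action": f"keep_price_above_{floor}", "priority": 2}]

    async def _create_inventory_rules(self, inventory_levels: Dict[str, int]) -> List[Dict[str, Any]]:
        if inventory_levels.get("in_stock", 0) < inventory_levels.get("reorder_point", 0):
            return [{"rule_id": "low_stock", "action": "increase_price_5%", "priority": 1}]
        return []

    def _generate_risk_mitigation(self, risk_factors: List[str]) -> List[str]:
        # One generic suggestion per detected risk factor
        return [f"Review: {factor}" for factor in risk_factors]


if __name__ == "__main__":
    helpers = PricingHelpersMixin()
    print(helpers._calculate_price_range(300.0))
    print(asyncio.run(helpers._create_inventory_rules({"in_stock": 10, "reorder_point": 40})))
```

Under this assumption the agent would be declared as `class PricingAgent(PricingHelpersMixin, BaseAgent)`, and each placeholder would later be replaced by a call to a real data source.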
class CustomerServiceAgent(BaseAgent):
    """Customer service agent - answers customer inquiries and resolves issues."""

    def __init__(self, knowledge_base: Dict[str, Any]):
        super().__init__("customer_service", ["query_answering", "complaint_handling", "recommendation"])
        self.knowledge_base = knowledge_base
        self.conversation_history = {}
        self.escalation_threshold = 0.3

    async def _process(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Handle a customer service task."""
        customer_query = payload.get('query', '')
        customer_id = payload.get('customer_id', 'anonymous')
        conversation_id = payload.get('conversation_id', str(uuid.uuid4()))
        self.logger.info(f"Handling customer inquiry: {customer_id}")
        # Update the conversation history
        if conversation_id not in self.conversation_history:
            self.conversation_history[conversation_id] = []
        self.conversation_history[conversation_id].append({
            'role': 'customer',
            'content': customer_query,
            'timestamp': datetime.now().isoformat()
        })
        # Analyze the customer's intent
        intent_analysis = await self._analyze_intent(customer_query)
        # Generate an answer. The comparison is inclusive because a single keyword
        # match scores exactly 0.3, which equals the threshold.
        if intent_analysis['confidence'] >= self.escalation_threshold:
            response = await self._generate_response(customer_query, intent_analysis)
            escalation_required = False
        else:
            response = "您的问题比较复杂,我将为您转接人工客服。"
            escalation_required = True
        # Record the reply
        self.conversation_history[conversation_id].append({
            'role': 'assistant',
            'content': response,
            'timestamp': datetime.now().isoformat()
        })
        # Predict satisfaction
        satisfaction_score = await self._predict_satisfaction(response, customer_query)
        return {
            'conversation_id': conversation_id,
            'customer_id': customer_id,
            'query': customer_query,
            'response': response,
            'intent': intent_analysis,
            'satisfaction_score': satisfaction_score,
            'escalation_required': escalation_required,
            'response_time': datetime.now().isoformat()
        }
    async def _analyze_intent(self, query: str) -> Dict[str, Any]:
        """Classify the customer's intent by keyword matching."""
        # Keywords stay in Chinese because they are matched against Chinese customer messages
        intents = {
            'shipping_inquiry': ['物流', '发货', '配送', '快递', '多久到'],
            'return_policy': ['退货', '退款', '换货', '退换', '售后'],
            'product_info': ['产品', '规格', '功能', '材质', '怎么用'],
            'complaint': ['投诉', '不满意', '问题', '故障', '坏了'],
            'pricing': ['价格', '多少钱', '优惠', '打折', '促销']
        }
        query_lower = query.lower()
        matched_intents = []
        for intent, keywords in intents.items():
            keyword_matches = [keyword for keyword in keywords if keyword in query_lower]
            if keyword_matches:
                # Confidence grows by 0.3 per matched keyword, capped at 0.9
                confidence = min(0.9, len(keyword_matches) * 0.3)
                matched_intents.append({
                    'intent': intent,
                    'confidence': confidence,
                    'matched_keywords': keyword_matches
                })
        # Return the intent with the highest confidence
        if matched_intents:
            best_intent = max(matched_intents, key=lambda x: x['confidence'])
            return best_intent
        else:
            return {'intent': 'general_inquiry', 'confidence': 0.1, 'matched_keywords': []}
    async def _generate_response(self, query: str, intent_analysis: Dict[str, Any]) -> str:
        """Generate an answer from a per-intent template."""
        intent = intent_analysis['intent']
        confidence = intent_analysis['confidence']
        # 'complaint' has no dedicated template and falls back to 'general_inquiry'
        response_templates = {
            'shipping_inquiry': "我们通常在下单后24小时内发货,配送需要3-5个工作日。您可以通过订单号查询具体物流信息。",
            'return_policy': "我们提供7天无理由退货服务,商品需保持完好。具体退换货流程请参考我们的售后政策。",
            'product_info': "请告诉我您想了解哪款产品的详细信息?我可以为您介绍规格、功能和使用方法。",
            # No price is available in this method, so the template avoids an unfilled "{}" placeholder
            'pricing': "当前价格请以产品页面为准,我们有各种优惠活动,具体可以查看产品页面或咨询客服。",
            'general_inquiry': "感谢您的咨询。请问您需要了解哪方面的信息?我可以帮您解答产品、价格、配送等问题。"
        }
        base_response = response_templates.get(intent, response_templates['general_inquiry'])
        # For low-confidence matches, echo the matched keyword back to the customer
        if confidence < 0.5:
            keywords = intent_analysis.get('matched_keywords') or ['这个']  # guard against an empty list
            base_response = "我理解您想问关于" + keywords[0] + "的问题。" + base_response
        return base_response
    async def _predict_satisfaction(self, response: str, query: str) -> float:
        """Predict customer satisfaction with a simplified heuristic."""
        base_score = 0.7
        # Response length: longer answers score slightly higher
        if len(response) > 50:
            base_score += 0.1
        # Query complexity: long questions are usually harder to satisfy
        if len(query) > 20:
            base_score -= 0.1
        # Positive wording in the response
        positive_keywords = ['很快', '立即', '保证', '免费', '优惠']
        if any(keyword in response for keyword in positive_keywords):
            base_score += 0.1
        # Clamp to [0.1, 1.0]
        return min(max(base_score, 0.1), 1.0)
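Each matched keyword contributes exactly 0.3 to the confidence, and the escalation threshold is also 0.3, so a query that matches a single keyword sits right on the boundary. Whether the comparison is strict or inclusive therefore decides if such a query is answered automatically or handed to a human. The standalone sketch below reproduces the scoring formula for the shipping intent (`keyword_confidence` is a hypothetical helper name) and evaluates both comparisons for the prototype's sample query:

```python
from typing import List

# Keyword list copied from the 'shipping_inquiry' entry of the intents table
SHIPPING_KEYWORDS = ['物流', '发货', '配送', '快递', '多久到']
ESCALATION_THRESHOLD = 0.3


def keyword_confidence(query: str, keywords: List[str]) -> float:
    """Same formula as _analyze_intent: 0.3 per matched keyword, capped at 0.9."""
    matches = [keyword for keyword in keywords if keyword in query]
    return min(0.9, len(matches) * 0.3)


if __name__ == "__main__":
    confidence = keyword_confidence("订单12345物流查询", SHIPPING_KEYWORDS)
    print(confidence)                          # exactly one keyword ('物流') matches
    print(confidence > ESCALATION_THRESHOLD)   # strict comparison
    print(confidence >= ESCALATION_THRESHOLD)  # inclusive comparison
```

The prototype dashboard shows this query being answered automatically, which matches the inclusive comparison.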
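class MessageBus:
    """Message bus - handles communication between agents."""

    def __init__(self):
        self.subscribers: Dict[str, List[Callable]] = {}
        self.message_queue = asyncio.Queue()
        self.logger = logging.getLogger("MessageBus")
        # Keep references to callback tasks so they are not garbage-collected mid-run
        self._callback_tasks = set()

    async def publish(self, topic: str, message: Dict[str, Any]):
        """Publish a message to a topic."""
        await self.message_queue.put({
            'topic': topic,
            'message': message,
            'timestamp': datetime.now().isoformat()
        })
        # Notify subscribers (callbacks must be coroutine functions)
        if topic in self.subscribers:
            for callback in self.subscribers[topic]:
                task = asyncio.create_task(callback(message))
                self._callback_tasks.add(task)
                task.add_done_callback(self._callback_tasks.discard)

    def subscribe(self, topic: str, callback: Callable):
        """Subscribe a callback to a topic."""
        if topic not in self.subscribers:
            self.subscribers[topic] = []
        self.subscribers[topic].append(callback)

    async def start_processing(self):
        """Drain the message queue until the task is cancelled."""
        self.logger.info("Message bus started processing messages")
        while True:
            try:
                message_item = await self.message_queue.get()
                self.logger.debug(f"Processing message: {message_item['topic']}")
                # Message persistence, retries, etc. could be added here
                self.message_queue.task_done()
            except Exception as e:
                self.logger.error(f"Message processing error: {e}")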
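The bus publishes `task_completed` and `task_failed` events, which is enough to derive a figure like the "processing success rate" shown on the prototype dashboard. The sketch below uses `MiniBus`, a hypothetical stripped-down stand-in for `MessageBus` that awaits its subscribers directly instead of queueing, so that the example is self-contained; the counting logic and the sample messages are illustrative only.

```python
import asyncio
from collections import Counter
from typing import Any, Awaitable, Callable, Dict, List

Handler = Callable[[Dict[str, Any]], Awaitable[None]]


class MiniBus:
    """Stripped-down stand-in for MessageBus: awaits subscribers, no queue."""

    def __init__(self) -> None:
        self.subscribers: Dict[str, List[Handler]] = {}

    def subscribe(self, topic: str, handler: Handler) -> None:
        self.subscribers.setdefault(topic, []).append(handler)

    async def publish(self, topic: str, message: Dict[str, Any]) -> None:
        # Run every handler registered for the topic and wait for all of them
        await asyncio.gather(*(handler(message) for handler in self.subscribers.get(topic, [])))


async def demo() -> float:
    bus = MiniBus()
    counts: Counter = Counter()

    async def on_completed(message: Dict[str, Any]) -> None:
        counts["completed"] += 1

    async def on_failed(message: Dict[str, Any]) -> None:
        counts["failed"] += 1

    bus.subscribe("task_completed", on_completed)
    bus.subscribe("task_failed", on_failed)

    # Sample events: three successes and one failure
    await bus.publish("task_completed", {"agent_id": "pricing"})
    await bus.publish("task_completed", {"agent_id": "product_research"})
    await bus.publish("task_completed", {"agent_id": "customer_service"})
    await bus.publish("task_failed", {"agent_id": "pricing", "error": "timeout"})

    return counts["completed"] / (counts["completed"] + counts["failed"])


if __name__ == "__main__":
    print(f"success rate: {asyncio.run(demo()):.0%}")
```

Awaiting the handlers keeps the example deterministic; the fire-and-forget style of the original bus trades that determinism for lower publish latency.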
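class WorkflowOrchestrator:
    """Workflow orchestrator - coordinates multiple agents to complete a task."""

    def __init__(self):
        self.agents = {}
        self.workflows = {}
        self.message_bus = MessageBus()
        self.task_queue = asyncio.Queue()
        self.logger = logging.getLogger("WorkflowOrchestrator")
        self._bus_task = None
        # asyncio.create_task() raises RuntimeError without a running event loop,
        # so start the bus here only when a loop is already running.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass  # call start() later from inside a coroutine
        else:
            self.start()

    def start(self) -> None:
        """Start the message bus consumer; must be called from a running event loop."""
        if self._bus_task is None:
            self._bus_task = asyncio.create_task(self.message_bus.start_processing())

    def register_agent(self, agent: BaseAgent):
        """Register an agent and attach it to the message bus."""
        self.agents[agent.agent_id] = agent
        agent.set_message_bus(self.message_bus)
        self.logger.info(f"Registered agent: {agent.agent_id}")

    def define_workflow(self, workflow_id: str, steps: List[Dict[str, Any]]):
        """Define a workflow as an ordered list of steps."""
        self.workflows[workflow_id] = steps
        self.logger.info(f"Defined workflow: {workflow_id} with {len(steps)} steps")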
    async def execute_workflow(self, workflow_id: str, initial_data: Dict[str, Any]) -> Dict[str, Any]:
        """Run a workflow's steps in order, passing each step's result to later steps."""
        if workflow_id not in self.workflows:
            raise ValueError(f"Workflow not defined: {workflow_id}")
        self.logger.info(f"Starting workflow: {workflow_id}")
        current_data = initial_data.copy()
        execution_results = {}
        workflow_status = 'completed'
        for step in self.workflows[workflow_id]:
            step_id = step['id']
            agent_id = step['agent']
            task_config = step.get('task', {})
            self.logger.info(f"Running step {step_id} with agent {agent_id}")
            completed_task = None
            if agent_id not in self.agents:
                error = f'Agent {agent_id} not found'
            else:
                # Prepare the payload and create the task
                task = Task(
                    task_id=f"{workflow_id}_{step_id}_{uuid.uuid4().hex[:8]}",
                    agent_type=agent_id,
                    payload=self._prepare_task_data(task_config, current_data),
                    priority=TaskPriority.NORMAL,
                    created_at=datetime.now(),
                    status=AgentStatus.RUNNING
                )
                try:
                    completed_task = await self.agents[agent_id].execute(task)
                    # BaseAgent.execute() catches exceptions and reports them through
                    # task.status, so a failed step must be detected from the status.
                    failed = completed_task.status == AgentStatus.FAILED
                    error = (completed_task.error or 'unknown error') if failed else None
                except Exception as e:
                    error = str(e)
            if error is None:
                execution_results[step_id] = {'status': 'completed', 'result': completed_task.result}
                # Make the result available to later steps
                current_data[step_id] = completed_task.result
                continue
            self.logger.error(f"Step failed: {step_id}, error: {error}")
            execution_results[step_id] = {'status': 'failed', 'error': error}
            # Stop the workflow unless the step sets break_on_failure to False
            if step.get('break_on_failure', True):
                self.logger.warning(f"Workflow interrupted at step {step_id}")
                workflow_status = 'failed'
                break
        self.logger.info(f"Workflow finished: {workflow_id} ({workflow_status})")
        return {
            'workflow_id': workflow_id,
            'status': workflow_status,
            'execution_results': execution_results,
            'final_data': current_data,
            'timestamp': datetime.now().isoformat()
        }
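    def _prepare_task_data(self, task_config: Dict[str, Any], current_data: Dict[str, Any]) -> Dict[str, Any]:
        """Build the task payload by resolving {{placeholder}} values against current_data."""
        def resolve(value: Any) -> Any:
            # Recurse into containers so nested placeholders (e.g. inside product_info) are resolved
            if isinstance(value, dict):
                return {key: resolve(item) for key, item in value.items()}
            if isinstance(value, list):
                return [resolve(item) for item in value]
            if not (isinstance(value, str) and value.startswith('{{') and value.endswith('}}')):
                return value
            node: Any = current_data
            for part in value[2:-2].strip().split('.'):
                if isinstance(node, dict) and part in node:
                    node = node[part]
                elif part != 'result':
                    # Step results are stored directly under the step id, so a "result"
                    # segment is optional; any other missing key leaves the placeholder as-is.
                    return value
            return node
        return resolve(task_config)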
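The configuration gives each workflow a `timeout` and a `retry_attempts` value, but the orchestrator shown here does not use either of them. As a minimal sketch of how a single step could honor both, assuming the timeout applies to each attempt rather than to the whole workflow (the source does not say which), `run_with_retry` below is a hypothetical helper built on `asyncio.wait_for`:

```python
import asyncio
from typing import Any, Awaitable, Callable


async def run_with_retry(step: Callable[[], Awaitable[Any]], timeout: float, retry_attempts: int) -> Any:
    """Run `step` up to `retry_attempts` times, limiting each attempt to `timeout` seconds."""
    last_error: Exception = RuntimeError("retry_attempts must be at least 1")
    for _ in range(retry_attempts):
        try:
            # wait_for cancels the attempt and raises a timeout error if it runs too long
            return await asyncio.wait_for(step(), timeout=timeout)
        except Exception as error:  # includes the timeout error raised by wait_for
            last_error = error
    raise last_error


async def demo() -> str:
    calls = {"count": 0}

    async def flaky_step() -> str:
        # Fails twice, then succeeds, to exercise the retry loop
        calls["count"] += 1
        if calls["count"] < 3:
            raise ConnectionError("temporary failure")
        return f"succeeded on attempt {calls['count']}"

    return await run_with_retry(flaky_step, timeout=1.0, retry_attempts=3)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In the orchestrator, `step` would wrap the call to `agent.execute(task)`. Because `execute` reports failures through the task status rather than by raising, the wrapper would need to raise on a `FAILED` status for the retry to take effect.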
class ECommerceWorkflowManager:
    """E-commerce workflow manager."""

    def __init__(self):
        self.orchestrator = WorkflowOrchestrator()
        self._setup_agents()
        self._define_workflows()

    def _setup_agents(self):
        """Create and register the agents."""
        product_agent = ProductResearchAgent()
        pricing_agent = PricingAgent()
        # Customer service knowledge base
        cs_knowledge_base = {
            'shipping_info': '标准配送3-5天,加急配送1-2天',
            'return_policy': '7天无理由退货,30天内质量问题可换货',
            'contact_info': '客服电话: 400-123-4567, 工作时间: 9:00-18:00'
        }
        service_agent = CustomerServiceAgent(cs_knowledge_base)
        # Register the agents
        self.orchestrator.register_agent(product_agent)
        self.orchestrator.register_agent(pricing_agent)
        self.orchestrator.register_agent(service_agent)

    def _define_workflows(self):
        """Define the workflows; {{name}} values are filled in from earlier data."""
        # Product listing workflow
        product_listing_workflow = [
            {
                'id': 'market_research',
                'agent': 'product_research',
                'task': {
                    'product_query': '{{product_name}}',
                    'research_depth': 'comprehensive',
                    'max_results': 25
                }
            },
            {
                'id': 'pricing_strategy',
                'agent': 'pricing',
                'task': {
                    'product_info': {
                        'product_id': '{{product_id}}',
                        'cost': '{{product_cost}}',
                        'category': '{{product_category}}'
                    },
                    'market_conditions': '{{market_research.result.market_analysis}}',
                    'pricing_strategy': 'auto'
                }
            },
            {
                'id': 'listing_preparation',
                'agent': 'customer_service',  # simplification: reuses the customer service agent
                'task': {
                    'query': '准备产品上架描述和FAQ',
                    'product_data': '{{pricing_strategy.result}}',
                    'market_insights': '{{market_research.result}}'
                }
            }
        ]
        self.orchestrator.define_workflow('product_listing', product_listing_workflow)
        # Customer service escalation workflow
        customer_service_workflow = [
            {
                'id': 'initial_response',
                'agent': 'customer_service',
                'task': {
                    'query': '{{customer_query}}',
                    'customer_id': '{{customer_id}}'
                }
            },
            {
                'id': 'escalation_check',
                'agent': 'customer_service',
                'task': {
                    'query': '评估是否需要升级处理',
                    'conversation_history': '{{initial_response.result.conversation_id}}'
                },
                'break_on_failure': False
            }
        ]
        self.orchestrator.define_workflow('customer_service', customer_service_workflow)

    async def list_new_product(self, product_name: str, product_id: str, cost: float, category: str) -> Dict[str, Any]:
        """List a new product."""
        initial_data = {
            'product_name': product_name,
            'product_id': product_id,
            'product_cost': cost,
            'product_category': category
        }
        return await self.orchestrator.execute_workflow('product_listing', initial_data)

    async def handle_customer_query(self, query: str, customer_id: str) -> Dict[str, Any]:
        """Handle a customer inquiry."""
        initial_data = {
            'customer_query': query,
            'customer_id': customer_id
        }
        return await self.orchestrator.execute_workflow('customer_service', initial_data)
=====================================
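Java architecture
Agent-driven automation of the HR recruitment process
Business scenario: a large technology company automates the entire process from resume screening to onboarding
┌─ Intelligent Recruitment Agent Workbench ───────────────┐
│ Position: Java Development Engineer (P7) | Openings: 3 | In progress │
├─────────────────────────────────────────────────────────┤
│ [Automated recruitment pipeline] │
│ │
│ Stage 1: Resume screening ✅ done │
│ ├─ Resumes received: 235 → passed initial screening: 48 │
│ └─ Agent screening criteria: tech-stack match > 70% & experience > 5 years │
│ │
│ Stage 2: Written technical test ⏳ in progress │
│ ├─ Tests sent: 48 → completed: 35 │
│ ├─ Average score: 82 | Pass rate: 65% │
│ └─ Agent auto-grading + execution tests for coding questions │
│ │
│ Stage 3: Interview scheduling 🔄 waiting to be scheduled │
│ ├─ Interviews to schedule: 23 │
│ ├─ Interviewers: Director Zhang (assisted by Wang Er and Li Si) │
│ └─ Agent-suggested dates: March 18-22 │
│ │
│ Stage 4: Offer ◼️ not started │
│ Stage 5: Onboarding ◼️ not started │
│ │
│ [Agent automation actions] │
│ ✓ Auto-reply with screening results ❌ Auto-schedule interviewer calendars │
│ ✓ Auto-send written test links ❌ Auto-generate interview evaluation forms │
│ ✓ Automatic technical scoring ❌ Auto-initiate background checks │
│ │
│ [Schedule interviews now] [Adjust screening criteria] [View candidate details] │
└─────────────────────────────────────────────────────────┘
- Complete dependency configuration: detailed dependencies for the workflow engine, rule engine, message queue, and so on
- Enterprise-grade entity design: complete JPA entity classes with detailed comments
- Workflow engine integration: full workflow management based on Flowable
- Rule engine decisions: an intelligent decision system based on Drools
- Agent architecture: modular agent design (resume screening, interview scheduling, offer decisions)
- Complete test cases: unit, integration, and performance tests
- Exception handling: thorough error handling and logging
- Performance optimization: enterprise features such as concurrent processing, batch operations, and caching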
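The workbench states the initial screening rule as "tech-stack match > 70% and experience > 5 years" without defining how the match is measured. A minimal sketch, assuming the match is the fraction of required skills that appear in the candidate's skill list (the function names and this definition of "match" are illustrative assumptions; Python is used here for consistency with the earlier examples even though this part of the system is Java):

```python
from typing import Iterable


def tech_match_ratio(candidate_skills: Iterable[str], required_skills: Iterable[str]) -> float:
    """Fraction of required skills the candidate has (case-insensitive)."""
    required = {skill.lower() for skill in required_skills}
    if not required:
        return 0.0
    owned = {skill.lower() for skill in candidate_skills}
    return len(required & owned) / len(required)


def passes_initial_screening(candidate_skills: Iterable[str],
                             required_skills: Iterable[str],
                             years_of_experience: float) -> bool:
    """Rule from the workbench: tech-stack match > 70% and experience > 5 years."""
    return tech_match_ratio(candidate_skills, required_skills) > 0.70 and years_of_experience > 5


if __name__ == "__main__":
    required = ["Java", "Spring Boot", "MySQL", "Kafka"]  # hypothetical P7 requirements
    print(passes_initial_screening(["java", "Spring Boot", "Kafka", "Redis"], required, 6))
    print(passes_initial_screening(["Java", "MySQL"], required, 8))
```

In the Java system this rule would more naturally live in a Drools rule file, which keeps the thresholds editable without redeploying code.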
Maven dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.company</groupId>
<artifactId>hr-automation-system</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<name>HR Automation System</name>
<description>Enterprise HR recruitment automation workflow system</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.5</version>
<relativePath/>
</parent>
<properties>
<java.version>17</java.version>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- Workflow engines (Spring Boot 3 / Jakarta EE needs Flowable 7.x and Camunda 7.20+) -->
<flowable.version>7.0.0</flowable.version>
<camunda.version>7.20.0</camunda.version>
<!-- AI/ML (TensorFlow Java has its own version line, separate from the Python release numbers) -->
<tensorflow.version>0.5.0</tensorflow.version>
<opennlp.version>2.2.0</opennlp.version>
<!-- Document processing -->
<pdfbox.version>3.0.1</pdfbox.version>
<poi.version>5.2.4</poi.version>
<!-- Message queue -->
<kafka.version>3.4.0</kafka.version>
<!-- Utilities -->
<hutool.version>5.8.22</hutool.version>
<guava.version>32.1.3-jre</guava.version>
<apache.commons.version>2.14.0</apache.commons.version>
</properties>
<dependencies>
<!-- Spring Boot Starters -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
</dependency>
<!-- Workflow engines -->
<dependency>
<groupId>org.flowable</groupId>
<artifactId>flowable-spring-boot-starter</artifactId>
<version>${flowable.version}</version>
</dependency>
<dependency>
<groupId>org.camunda.bpm.springboot</groupId>
<artifactId>camunda-bpm-spring-boot-starter</artifactId>
<version>${camunda.version}</version>
</dependency>
<!-- Database -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.33</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Message queue -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Rule engine -->
<dependency>
<groupId>org.drools</groupId>
<artifactId>drools-core</artifactId>
<version>8.40.0.Final</version>
</dependency>
<dependency>
<groupId>org.drools</groupId>
<artifactId>drools-compiler</artifactId>
<version>8.40.0.Final</version>
</dependency>
<dependency>
<groupId>org.drools</groupId>
<artifactId>drools-decisiontables</artifactId>
<version>8.40.0.Final</version>
</dependency>
<!-- AI / machine learning -->
<dependency>
<groupId>org.tensorflow</groupId>
<artifactId>tensorflow-core-api</artifactId>
<version>${tensorflow.version}</version>
</dependency>
<dependency>
<groupId>org.apache.opennlp</groupId>
<artifactId>opennlp-tools</artifactId>
<version>${opennlp.version}</version>
</dependency>
<!-- Document processing -->
<dependency>
<groupId>org.apache.pdfbox</groupId>
<artifactId>pdfbox</artifactId>
<version>${pdfbox.version}</version>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi</artifactId>
<version>${poi.version}</version>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>${poi.version}</version>
</dependency>
<!-- Utility libraries -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${apache.commons.version}</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>${hutool.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
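<!-- Lombok: used by the entity classes and excluded from the packaged jar in the build section -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Email templates -->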
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<!-- API documentation -->
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
<version>2.2.0</version>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>17</source>
<target>17</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
Core entity class design
package com.company.hrauto.entity;

import jakarta.persistence.*;
import lombok.*;
import org.hibernate.annotations.CreationTimestamp;
import org.hibernate.annotations.UpdateTimestamp;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

/**
 * Recruitment position entity - stores information about a position being recruited for.
 * Database table: recruitment_positions
 */
@Entity
@Table(name = "recruitment_positions", indexes = {
    @Index(name = "idx_position_status", columnList = "status"),
    @Index(name = "idx_position_department", columnList = "department"),
    @Index(name = "idx_position_created", columnList = "createdTime")
})
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@ToString(exclude = {"candidates", "workflowInstances"})
public class RecruitmentPosition {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    /** Unique position identifier */
    @Column(nullable = false, unique = true, length = 64)
    private String positionId;

    /** Position title */
    @Column(nullable = false, length = 200)
    private String title;

    /** Position description */
    @Column(columnDefinition = "TEXT")
    private String description;

    /** Department */
    @Column(nullable = false, length = 100)
    private String department;

    /** Job level */
    @Column(nullable = false, length = 50)
    private String level;

    /** Work location */
    @Column(nullable = false, length = 100)
    private String location;

    /** Number of openings */
    @Column(nullable = false)
    private Integer headcount;

    /** Number already hired */
    @Column(nullable = false)
    private Integer hiredCount;

    /** Position status: OPEN, CLOSED, ON_HOLD, CANCELLED */
    @Enumerated(EnumType.STRING)
    @Column(nullable = false, length = 20)
    private PositionStatus status;

    /** Urgency: LOW, MEDIUM, HIGH, URGENT */
    @Enumerated(EnumType.STRING)
    @Column(nullable = false, length = 20)
    private UrgencyLevel urgency;

    /** Position requirements (JSON) */
    @Column(columnDefinition = "JSON")
    private String requirements;

    /** Created by */
    @Column(nullable = false, length = 50)
    private String createdBy;

    /** Creation time (set by the application) */
    @Column(nullable = false)
    private LocalDateTime createdTime;

    /** Candidates; @Builder.Default keeps the empty list when the Lombok builder is used */
    @Builder.Default
    @OneToMany(mappedBy = "position", cascade = CascadeType.ALL, fetch = FetchType.LAZY)
    private List<Candidate> candidates = new ArrayList<>();

    /** Workflow instances */
    @Builder.Default
    @OneToMany(mappedBy = "position", cascade = CascadeType.ALL, fetch = FetchType.LAZY)
    private List<WorkflowInstance> workflowInstances = new ArrayList<>();

    /** Row creation timestamp, maintained by Hibernate */
    @CreationTimestamp
    private LocalDateTime createTime;

    /** Row update timestamp, maintained by Hibernate */
    @UpdateTimestamp
    private LocalDateTime updateTime;
    /** Position status */
    public enum PositionStatus {
        OPEN,       // open
        CLOSED,     // closed
        ON_HOLD,    // recruitment paused
        CANCELLED   // cancelled
    }

    /** Urgency level */
    public enum UrgencyLevel {
        LOW,     // low
        MEDIUM,  // medium
        HIGH,    // high
        URGENT   // urgent
    }
}
/**
 * Candidate entity - stores candidate information.
 * Database table: candidates
 */
@Entity
@Table(name = "candidates", indexes = {
    @Index(name = "idx_candidate_email", columnList = "email"),
    @Index(name = "idx_candidate_status", columnList = "status"),
    @Index(name = "idx_candidate_position", columnList = "position_id"),
    @Index(name = "idx_candidate_source", columnList = "source")
})
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@ToString(exclude = {"position", "applications", "interviews"})
public class Candidate {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
/**
* 候选人唯一标识
*/
@Column(nullable = false, unique = true, length = 64)
private String candidateId;
/**
* 关联的招聘职位
*/
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "position_id", nullable = false)
private RecruitmentPosition position;
/**
* 姓名
*/
@Column(nullable = false, length = 100)
private String name;
/**
* 邮箱
*/
@Column(nullable = false, length = 200)
private String email;
/**
* 手机号
*/
@Column(nullable = false, length = 20)
private String phone;
/**
* 简历文件路径
*/
@Column(nullable = false, length = 500)
private String resumePath;
/**
* 简历解析结果(JSON格式)
*/
@Column(columnDefinition = "JSON")
private String resumeData;
/**
* Candidate source: WEBSITE, REFERRAL, HEADHUNT, CAMPUS, SOCIAL_MEDIA, JOB_BOARD
*/
@Enumerated(EnumType.STRING)
@Column(nullable = false, length = 30)
private CandidateSource source;
/**
* 候选人状态:NEW, SCREENING, INTERVIEW, OFFER, HIRED, REJECTED
*/
@Enumerated(EnumType.STRING)
@Column(nullable = false, length = 30)
private CandidateStatus status;
/**
* Match score (0-100).
* precision/scale only apply to exact numeric (BigDecimal) columns, so they are not set on a Double.
*/
@Column
private Double matchScore;
/**
* AI评估结果(JSON格式)
*/
@Column(columnDefinition = "JSON")
private String aiAssessment;
/**
* 标签(JSON格式)
*/
@Column(columnDefinition = "JSON")
private String tags;
/**
* Application time
*/
@Column(nullable = false)
private LocalDateTime appliedTime;
/**
* 最后更新时间
*/
private LocalDateTime lastUpdatedTime;
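/**
* Application records.
* Marked with Builder.Default so the empty-list initializer is kept when the object is created through the Lombok builder.
*/
@Builder.Default
@OneToMany(mappedBy = "candidate", cascade = CascadeType.ALL, fetch = FetchType.LAZY)
private List<CandidateApplication> applications = new ArrayList<>();
/**
* Interview records
*/
@Builder.Default
@OneToMany(mappedBy = "candidate", cascade = CascadeType.ALL, fetch = FetchType.LAZY)
private List<Interview> interviews = new ArrayList<>();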
@CreationTimestamp
private LocalDateTime createTime;
@UpdateTimestamp
private LocalDateTime updateTime;
/**
* Candidate source enum
*/
public enum CandidateSource {
WEBSITE, // company website
REFERRAL, // internal referral
HEADHUNT, // headhunter
CAMPUS, // campus recruiting
SOCIAL_MEDIA, // social media
JOB_BOARD // job board
}
/**
* Candidate status enum
*/
public enum CandidateStatus {
NEW, // new application
SCREENING, // resume screening
INTERVIEW, // interviewing
OFFER, // offer stage
HIRED, // hired
REJECTED // rejected
}
}
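The CandidateStatus enum above lists the statuses but not which moves between them are legal. The following is a minimal sketch of a transition guard, assuming a linear pipeline (NEW, SCREENING, INTERVIEW, OFFER, HIRED) in which REJECTED can be reached from any non-terminal status. The class CandidateStatusFlow and this transition table are illustrative assumptions, not part of the original project.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper that checks whether a candidate status change is allowed. */
final class CandidateStatusFlow {

    /** Mirrors the CandidateStatus enum declared in the Candidate entity. */
    enum CandidateStatus { NEW, SCREENING, INTERVIEW, OFFER, HIRED, REJECTED }

    // Assumed transition table: each status maps to the statuses it may move to.
    private static final Map<CandidateStatus, Set<CandidateStatus>> ALLOWED =
            new EnumMap<>(CandidateStatus.class);

    static {
        ALLOWED.put(CandidateStatus.NEW,
                EnumSet.of(CandidateStatus.SCREENING, CandidateStatus.REJECTED));
        ALLOWED.put(CandidateStatus.SCREENING,
                EnumSet.of(CandidateStatus.INTERVIEW, CandidateStatus.REJECTED));
        ALLOWED.put(CandidateStatus.INTERVIEW,
                EnumSet.of(CandidateStatus.OFFER, CandidateStatus.REJECTED));
        ALLOWED.put(CandidateStatus.OFFER,
                EnumSet.of(CandidateStatus.HIRED, CandidateStatus.REJECTED));
        // HIRED and REJECTED are terminal in this sketch.
        ALLOWED.put(CandidateStatus.HIRED, EnumSet.noneOf(CandidateStatus.class));
        ALLOWED.put(CandidateStatus.REJECTED, EnumSet.noneOf(CandidateStatus.class));
    }

    static boolean canTransition(CandidateStatus from, CandidateStatus to) {
        return ALLOWED.get(from).contains(to);
    }

    /** Returns the new status, or throws if the move is not in the table. */
    static CandidateStatus transition(CandidateStatus from, CandidateStatus to) {
        if (!canTransition(from, to)) {
            throw new IllegalStateException("Illegal transition: " + from + " -> " + to);
        }
        return to;
    }
}
```

A service that updates Candidate.status could call transition(...) before saving, so that an out-of-order update (for example REJECTED back to OFFER) fails early instead of being persisted.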
/**
* Workflow instance entity: stores automated workflow instance information.
* Database table: workflow_instances
*/
@Entity
@Table(name = "workflow_instances", indexes = {
@Index(name = "idx_workflow_status", columnList = "status"),
@Index(name = "idx_workflow_type", columnList = "workflowType"),
@Index(name = "idx_workflow_position", columnList = "position_id")
})
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class WorkflowInstance {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
/**
* 工作流实例唯一标识
*/
@Column(nullable = false, unique = true, length = 64)
private String instanceId;
/**
* 关联的招聘职位
*/
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "position_id", nullable = false)
private RecruitmentPosition position;
/**
* Workflow type: RESUME_SCREENING, INTERVIEW_SCHEDULING, OFFER_MANAGEMENT, ONBOARDING, BACKGROUND_CHECK
*/
@Enumerated(EnumType.STRING)
@Column(nullable = false, length = 50)
private WorkflowType workflowType;
/**
* 工作流定义ID
*/
@Column(nullable = false, length = 100)
private String workflowDefinitionId;
/**
* 工作流引擎实例ID
*/
@Column(nullable = false, length = 100)
private String engineInstanceId;
/**
* 工作流状态:RUNNING, COMPLETED, SUSPENDED, TERMINATED, ERROR
*/
@Enumerated(EnumType.STRING)
@Column(nullable = false, length = 30)
private WorkflowStatus status;
/**
* 当前节点
*/
@Column(length = 100)
private String currentNode;
/**
* 输入参数(JSON格式)
*/
@Column(columnDefinition = "JSON")
private String inputParameters;
/**
* 输出结果(JSON格式)
*/
@Column(columnDefinition = "JSON")
private String outputResults;
/**
* 错误信息
*/
@Column(columnDefinition = "TEXT")
private String errorMessage;
/**
* 开始时间
*/
@Column(nullable = false)
private LocalDateTime startTime;
/**
* 结束时间
*/
private LocalDateTime endTime;
/**
* 执行时长(毫秒)
*/
private Long executionDuration;
@CreationTimestamp
private LocalDateTime createTime;
@UpdateTimestamp
private LocalDateTime updateTime;
/**
* Workflow type enum
*/
public enum WorkflowType {
RESUME_SCREENING, // resume screening
INTERVIEW_SCHEDULING, // interview scheduling
OFFER_MANAGEMENT, // offer management
ONBOARDING, // onboarding
BACKGROUND_CHECK // background check
}
/**
* Workflow status enum
*/
public enum WorkflowStatus {
RUNNING, // running
COMPLETED, // completed
SUSPENDED, // suspended
TERMINATED, // terminated
ERROR // failed
}
}
/**
* Agent decision entity: stores the decisions made by the AI agents.
* Database table: agent_decisions
*/
@Entity
@Table(name = "agent_decisions", indexes = {
@Index(name = "idx_agent_candidate", columnList = "candidate_id"),
@Index(name = "idx_agent_type", columnList = "agentType"),
@Index(name = "idx_agent_decision_time", columnList = "decisionTime")
})
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class AgentDecision {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
/**
* 决策记录唯一标识
*/
@Column(nullable = false, unique = true, length = 64)
private String decisionId;
/**
* 关联的候选人
*/
@ManyToOne(fetch = FetchType.LAZY)
@JoinColumn(name = "candidate_id", nullable = false)
private Candidate candidate;
/**
* Agent type: SCREENING_AGENT, INTERVIEW_AGENT, OFFER_AGENT, ONBOARDING_AGENT
*/
@Enumerated(EnumType.STRING)
@Column(nullable = false, length = 50)
private AgentType agentType;
/**
* 决策类型:PASS, REJECT, HOLD, ESCALATE
*/
@Enumerated(EnumType.STRING)
@Column(nullable = false, length = 30)
private DecisionType decisionType;
/**
* Decision confidence (0-1).
* precision/scale only apply to exact numeric (BigDecimal) columns, so they are not set on a Double.
*/
@Column(nullable = false)
private Double confidence;
/**
* 决策原因
*/
@Column(columnDefinition = "TEXT")
private String decisionReason;
/**
* 输入数据(JSON格式)
*/
@Column(columnDefinition = "JSON")
private String inputData;
/**
* 输出结果(JSON格式)
*/
@Column(columnDefinition = "JSON")
private String outputData;
/**
* 使用的规则(JSON格式)
*/
@Column(columnDefinition = "JSON")
private String usedRules;
/**
* 决策时间
*/
@Column(nullable = false)
private LocalDateTime decisionTime;
@CreationTimestamp
private LocalDateTime createTime;
/**
* Agent type enum
*/
public enum AgentType {
SCREENING_AGENT, // resume screening agent
INTERVIEW_AGENT, // interview scheduling agent
OFFER_AGENT, // offer decision agent
ONBOARDING_AGENT // onboarding agent
}
/**
* Decision type enum
*/
public enum DecisionType {
PASS, // pass
REJECT, // reject
HOLD, // on hold
ESCALATE // escalate to a human
}
}
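AgentDecision stores both a decision type and a confidence in the range 0 to 1, and DecisionType includes ESCALATE for handing a case to a human. The listing does not show how the two interact, so the sketch below is one possible policy under stated assumptions: any decision whose confidence falls below a caller-supplied threshold is turned into ESCALATE. The class DecisionGuard and the minConfidence parameter are hypothetical.

```java
/** Hypothetical guard that escalates low-confidence agent decisions. */
final class DecisionGuard {

    /** Mirrors AgentDecision.DecisionType. */
    enum DecisionType { PASS, REJECT, HOLD, ESCALATE }

    /**
     * Returns the proposed decision when the agent is confident enough,
     * otherwise ESCALATE. The confidence must lie in [0, 1].
     */
    static DecisionType finalDecision(DecisionType proposed, double confidence, double minConfidence) {
        if (Double.isNaN(confidence) || confidence < 0.0 || confidence > 1.0) {
            throw new IllegalArgumentException("confidence must be in [0, 1]: " + confidence);
        }
        return confidence < minConfidence ? DecisionType.ESCALATE : proposed;
    }
}
```

Validating the range at this point also keeps out-of-range values from reaching the confidence column.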
Workflow Engine Configuration and Services
package com.company.hrauto.config;
import org.flowable.spring.SpringProcessEngineConfiguration;
import org.flowable.spring.boot.EngineConfigurationConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.List;
/**
* Flowable workflow engine configuration.
* Sets engine parameters and registers custom behavior.
*/
@Configuration
public class FlowableConfig implements EngineConfigurationConfigurer<SpringProcessEngineConfiguration> {
/**
* Configures the Flowable process engine.
*
* @param configuration process engine configuration
*/
@Override
public void configure(SpringProcessEngineConfiguration configuration) {
// Database
configuration.setDatabaseSchemaUpdate("true"); // create and update the schema automatically
// Async executor (it supersedes the legacy job executor, so no separate job executor switch is set)
configuration.setAsyncExecutorActivate(true); // enable the async executor
configuration.setAsyncExecutorNumberOfRetries(3); // retries for failed async jobs
configuration.setAsyncExecutorDefaultAsyncJobAcquireWaitTime(5000); // 5 seconds
configuration.setAsyncExecutorDefaultTimerJobAcquireWaitTime(10000); // 10 seconds
// History: setHistory takes the level name, whereas setHistoryLevel expects a HistoryLevel enum value
configuration.setHistory("audit"); // audit-level history
// Mail server (placeholder credentials; load real ones from externalized configuration)
configuration.setMailServerHost("smtp.company.com");
configuration.setMailServerPort(587);
configuration.setMailServerUsername("noreply@company.com");
configuration.setMailServerPassword("password");
configuration.setMailServerUseTLS(true);
// Custom ID generator
configuration.setIdGenerator(new CustomIdGenerator());
// Custom BPMN parse handlers
configuration.setCustomDefaultBpmnParseHandlers(
List.of(new CustomBpmnParseHandler())
);
}
/**
* Registers the global event listener.
* TaskService has no addTaskListener method: task listeners are declared on user tasks
* in the BPMN model or attached by a parse handler such as CustomBpmnParseHandler.
*/
@Bean
public EngineConfigurationConfigurer<SpringProcessEngineConfiguration> flowableEventListenerConfigurer() {
return engineConfiguration -> engineConfiguration.setEventListeners(
List.of(new GlobalFlowableEventListener()));
}
}
package com.company.hrauto.service;
import lombok.extern.slf4j.Slf4j;
import org.flowable.engine.*;
import org.flowable.engine.history.HistoricProcessInstance;
import org.flowable.engine.runtime.ProcessInstance;
import org.flowable.task.api.Task;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Workflow engine service.
* Handles deployment, start, execution and management of workflows.
*/
@Service
@Slf4j
@Transactional
public class WorkflowEngineService {
private final RepositoryService repositoryService;
private final RuntimeService runtimeService;
private final TaskService taskService;
private final HistoryService historyService;
private final ManagementService managementService;
public WorkflowEngineService(RepositoryService repositoryService,
RuntimeService runtimeService,
TaskService taskService,
HistoryService historyService,
ManagementService managementService) {
this.repositoryService = repositoryService;
this.runtimeService = runtimeService;
this.taskService = taskService;
this.historyService = historyService;
this.managementService = managementService;
}
/**
* 部署工作流定义
*
* @param bpmnXml BPMN XML内容
* @param workflowName 工作流名称
* @return 部署ID
*/
public String deployWorkflow(String bpmnXml, String workflowName) {
log.info("开始部署工作流: {}", workflowName);
try {
var deployment = repositoryService.createDeployment()
.addString(workflowName + ".bpmn20.xml", bpmnXml)
.name(workflowName)
.category("HR_RECRUITMENT")
.deploy();
log.info("工作流部署成功: {}, 部署ID: {}", workflowName, deployment.getId());
return deployment.getId();
} catch (Exception e) {
log.error("工作流部署失败: {}, 错误: {}", workflowName, e.getMessage(), e);
throw new WorkflowDeploymentException("工作流部署失败: " + e.getMessage(), e);
}
}
/**
* 启动工作流实例
*
* @param processDefinitionKey 流程定义Key
* @param businessKey 业务Key
* @param variables 流程变量
* @return 流程实例ID
*/
public String startWorkflowInstance(String processDefinitionKey,
String businessKey,
Map<String, Object> variables) {
log.info("启动工作流实例: {}, 业务Key: {}", processDefinitionKey, businessKey);
try {
ProcessInstance processInstance = runtimeService.startProcessInstanceByKey(
processDefinitionKey, businessKey, variables);
String instanceId = processInstance.getId();
log.info("工作流实例启动成功: {}", instanceId);
return instanceId;
} catch (Exception e) {
log.error("工作流实例启动失败: {}, 错误: {}", processDefinitionKey, e.getMessage(), e);
throw new WorkflowExecutionException("工作流实例启动失败: " + e.getMessage(), e);
}
}
/**
* 完成当前任务
*
* @param taskId 任务ID
* @param variables 任务变量
*/
public void completeTask(String taskId, Map<String, Object> variables) {
log.debug("完成任务: {}", taskId);
try {
if (variables == null) {
variables = new HashMap<>();
}
taskService.complete(taskId, variables);
log.debug("任务完成成功: {}", taskId);
} catch (Exception e) {
log.error("任务完成失败: {}, 错误: {}", taskId, e.getMessage(), e);
throw new WorkflowExecutionException("任务完成失败: " + e.getMessage(), e);
}
}
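/**
* Returns pending tasks, newest first.
* When candidateGroup is given, the tasks claimable by that group are returned and userId is not used;
* otherwise the tasks assigned to userId are returned.
*
* @param userId user ID
* @param candidateGroup candidate group, may be null
* @return list of pending tasks
*/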
public List<Task> getUserTasks(String userId, String candidateGroup) {
log.debug("获取用户待办任务: {}, 候选组: {}", userId, candidateGroup);
try {
List<Task> tasks;
if (candidateGroup != null) {
tasks = taskService.createTaskQuery()
.taskCandidateGroup(candidateGroup)
.active()
.orderByTaskCreateTime().desc()
.list();
} else {
tasks = taskService.createTaskQuery()
.taskAssignee(userId)
.active()
.orderByTaskCreateTime().desc()
.list();
}
log.debug("找到 {} 个待办任务", tasks.size());
return tasks;
} catch (Exception e) {
log.error("获取待办任务失败: {}, 错误: {}", userId, e.getMessage(), e);
throw new WorkflowQueryException("获取待办任务失败: " + e.getMessage(), e);
}
}
/**
* 获取流程实例变量
*
* @param processInstanceId 流程实例ID
* @return 流程变量映射
*/
public Map<String, Object> getProcessVariables(String processInstanceId) {
log.debug("获取流程实例变量: {}", processInstanceId);
try {
Map<String, Object> variables = runtimeService.getVariables(processInstanceId);
log.debug("获取到 {} 个流程变量", variables.size());
return variables;
} catch (Exception e) {
log.error("获取流程变量失败: {}, 错误: {}", processInstanceId, e.getMessage(), e);
throw new WorkflowQueryException("获取流程变量失败: " + e.getMessage(), e);
}
}
/**
* 设置流程实例变量
*
* @param processInstanceId 流程实例ID
* @param variables 流程变量
*/
public void setProcessVariables(String processInstanceId, Map<String, Object> variables) {
log.debug("设置流程实例变量: {}, 变量数量: {}", processInstanceId, variables.size());
try {
runtimeService.setVariables(processInstanceId, variables);
log.debug("流程变量设置成功");
} catch (Exception e) {
log.error("设置流程变量失败: {}, 错误: {}", processInstanceId, e.getMessage(), e);
throw new WorkflowExecutionException("设置流程变量失败: " + e.getMessage(), e);
}
}
/**
* 获取流程实例历史
*
* @param processInstanceId 流程实例ID
* @return 历史流程实例
*/
public HistoricProcessInstance getProcessInstanceHistory(String processInstanceId) {
log.debug("获取流程实例历史: {}", processInstanceId);
try {
HistoricProcessInstance historicInstance = historyService
.createHistoricProcessInstanceQuery()
.processInstanceId(processInstanceId)
.singleResult();
return historicInstance;
} catch (Exception e) {
log.error("获取流程历史失败: {}, 错误: {}", processInstanceId, e.getMessage(), e);
throw new WorkflowQueryException("获取流程历史失败: " + e.getMessage(), e);
}
}
/**
* 挂起流程实例
*
* @param processInstanceId 流程实例ID
*/
public void suspendProcessInstance(String processInstanceId) {
log.info("挂起流程实例: {}", processInstanceId);
try {
runtimeService.suspendProcessInstanceById(processInstanceId);
log.info("流程实例挂起成功: {}", processInstanceId);
} catch (Exception e) {
log.error("挂起流程实例失败: {}, 错误: {}", processInstanceId, e.getMessage(), e);
throw new WorkflowExecutionException("挂起流程实例失败: " + e.getMessage(), e);
}
}
/**
* 激活流程实例
*
* @param processInstanceId 流程实例ID
*/
public void activateProcessInstance(String processInstanceId) {
log.info("激活流程实例: {}", processInstanceId);
try {
runtimeService.activateProcessInstanceById(processInstanceId);
log.info("流程实例激活成功: {}", processInstanceId);
} catch (Exception e) {
log.error("激活流程实例失败: {}, 错误: {}", processInstanceId, e.getMessage(), e);
throw new WorkflowExecutionException("激活流程实例失败: " + e.getMessage(), e);
}
}
/**
* 删除流程实例
*
* @param processInstanceId 流程实例ID
* @param deleteReason 删除原因
*/
public void deleteProcessInstance(String processInstanceId, String deleteReason) {
log.info("删除流程实例: {}, 原因: {}", processInstanceId, deleteReason);
try {
runtimeService.deleteProcessInstance(processInstanceId, deleteReason);
log.info("流程实例删除成功: {}", processInstanceId);
} catch (Exception e) {
log.error("删除流程实例失败: {}, 错误: {}", processInstanceId, e.getMessage(), e);
throw new WorkflowExecutionException("删除流程实例失败: " + e.getMessage(), e);
}
}
}
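Every method of WorkflowEngineService repeats the same shape: run an engine call, log any failure, and rethrow it as a workflow exception. As a sketch only, that pattern could be factored into one helper. The class EngineCalls, its call method and the use of java.util.logging are assumptions made to keep the example free of external dependencies; the real service uses SLF4J and its own exception classes.

```java
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical helper that centralizes the log-and-rethrow pattern. */
final class EngineCalls {

    private static final Logger LOG = Logger.getLogger(EngineCalls.class.getName());

    /** Stand-in for the WorkflowExecutionException from the listing. */
    static class WorkflowExecutionException extends RuntimeException {
        WorkflowExecutionException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * Runs the given engine call. On failure the error is logged once and
     * rethrown as WorkflowExecutionException with the original cause attached.
     */
    static <T> T call(String action, Supplier<T> body) {
        try {
            return body.get();
        } catch (RuntimeException e) {
            LOG.log(Level.SEVERE, action + " failed", e);
            throw new WorkflowExecutionException(action + " failed: " + e.getMessage(), e);
        }
    }
}
```

With such a helper, a method like getProcessVariables would shrink to a single call(...) expression wrapping runtimeService.getVariables(processInstanceId).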
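/**
* Workflow definition constants
*/
public final class WorkflowConstants {
private WorkflowConstants() {
// constants holder, not meant to be instantiated
}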
// 流程定义Key
public static final String PROCESS_RESUME_SCREENING = "resumeScreeningProcess";
public static final String PROCESS_INTERVIEW_SCHEDULING = "interviewSchedulingProcess";
public static final String PROCESS_OFFER_MANAGEMENT = "offerManagementProcess";
public static final String PROCESS_ONBOARDING = "onboardingProcess";
// 任务定义Key
public static final String TASK_AI_SCREENING = "aiResumeScreeningTask";
public static final String TASK_HR_REVIEW = "hrManualReviewTask";
public static final String TASK_SCHEDULE_INTERVIEW = "scheduleInterviewTask";
public static final String TASK_INTERVIEW_EVALUATION = "interviewEvaluationTask";
public static final String TASK_OFFER_APPROVAL = "offerApprovalTask";
public static final String TASK_BACKGROUND_CHECK = "backgroundCheckTask";
// 流程变量Key
public static final String VAR_CANDIDATE_ID = "candidateId";
public static final String VAR_POSITION_ID = "positionId";
public static final String VAR_MATCH_SCORE = "matchScore";
public static final String VAR_SCREENING_RESULT = "screeningResult";
public static final String VAR_INTERVIEW_SCORE = "interviewScore";
public static final String VAR_OFFER_APPROVED = "offerApproved";
public static final String VAR_BACKGROUND_CHECK_RESULT = "backgroundCheckResult";
// 候选组
public static final String GROUP_HR = "hrGroup";
public static final String GROUP_HIRING_MANAGER = "hiringManagerGroup";
public static final String GROUP_INTERVIEWER = "interviewerGroup";
public static final String GROUP_HR_BP = "hrBpGroup";
}
/**
* 工作流异常类
*/
public class WorkflowDeploymentException extends RuntimeException {
public WorkflowDeploymentException(String message) {
super(message);
}
public WorkflowDeploymentException(String message, Throwable cause) {
super(message, cause);
}
}
public class WorkflowExecutionException extends RuntimeException {
public WorkflowExecutionException(String message) {
super(message);
}
public WorkflowExecutionException(String message, Throwable cause) {
super(message, cause);
}
}
public class WorkflowQueryException extends RuntimeException {
public WorkflowQueryException(String message) {
super(message);
}
public WorkflowQueryException(String message, Throwable cause) {
super(message, cause);
}
}
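The constants above name the process variables and task keys, but the BPMN model that connects them is not part of this listing. The sketch below shows one way the screeningResult variable might select the next task, assuming PASS leads to interview scheduling, HOLD leads to manual HR review, and any other value ends the flow. The class ScreeningRouting and this routing are illustrative assumptions; the string values are copied from WorkflowConstants.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical illustration of routing on the screeningResult process variable. */
final class ScreeningRouting {

    // Values copied from WorkflowConstants.
    static final String TASK_HR_REVIEW = "hrManualReviewTask";
    static final String TASK_SCHEDULE_INTERVIEW = "scheduleInterviewTask";
    static final String VAR_CANDIDATE_ID = "candidateId";
    static final String VAR_POSITION_ID = "positionId";
    static final String VAR_SCREENING_RESULT = "screeningResult";

    /** Builds the variables passed when a screening process instance is started. */
    static Map<String, Object> startVariables(String candidateId, String positionId) {
        Map<String, Object> vars = new HashMap<>();
        vars.put(VAR_CANDIDATE_ID, candidateId);
        vars.put(VAR_POSITION_ID, positionId);
        return vars;
    }

    /** Assumed routing: PASS goes to interview scheduling, HOLD to HR review, anything else stops. */
    static Optional<String> nextTask(Map<String, Object> vars) {
        Object result = vars.get(VAR_SCREENING_RESULT);
        if ("PASS".equals(result)) {
            return Optional.of(TASK_SCHEDULE_INTERVIEW);
        }
        if ("HOLD".equals(result)) {
            return Optional.of(TASK_HR_REVIEW);
        }
        return Optional.empty();
    }
}
```

In a real deployment this decision would normally live in an exclusive gateway of the BPMN definition, with conditions written against the same variable name.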
Agent Service Implementation
package com.company.hrauto.service;
import lombok.extern.slf4j.Slf4j;
import org.kie.api.KieBase;
import org.kie.api.KieServices;
import org.kie.api.builder.KieBuilder;
import org.kie.api.builder.KieFileSystem;
import org.kie.api.builder.KieRepository;
import org.kie.api.builder.Message;
import org.kie.api.runtime.KieContainer;
import org.kie.api.runtime.KieSession;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Rule engine service.
* Implements the decision logic on top of the Drools rule engine.
*/
@Service
@Slf4j
public class RuleEngineService {
private final KieContainer kieContainer;
private final Map<String,KieBase> kieBases = new HashMap<>();
public RuleEngineService() {
// 初始化Drools规则引擎
this.kieContainer = initializeRuleEngine();
loadRuleBases();
}
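/**
* Initializes the Drools rule engine.
* Every named KieBase is declared in the KieModule model; without this declaration
* only the default KieBase exists and the lookups in loadRuleBases() fail.
*/
private KieContainer initializeRuleEngine() {
log.info("Initializing Drools rule engine");
try {
KieServices kieServices = KieServices.Factory.get();
KieRepository kieRepository = kieServices.getRepository();
KieFileSystem kieFileSystem = kieServices.newKieFileSystem();
// Declare one KieBase per rule set; a KieBase selects its resources by folder path
org.kie.api.builder.model.KieModuleModel kieModuleModel = kieServices.newKieModuleModel();
kieModuleModel.newKieBaseModel("resumeScreening").addPackage("rules.resumescreening");
kieModuleModel.newKieBaseModel("interviewEvaluation").addPackage("rules.interviewevaluation");
kieModuleModel.newKieBaseModel("offerDecision").addPackage("rules.offerdecision");
kieFileSystem.writeKModuleXML(kieModuleModel.toXML());
// Resume screening rules
kieFileSystem.write("src/main/resources/rules/resumescreening/resume-screening.drl",
loadResumeScreeningRules());
// Interview evaluation rules
kieFileSystem.write("src/main/resources/rules/interviewevaluation/interview-evaluation.drl",
loadInterviewEvaluationRules());
// Offer decision rules
kieFileSystem.write("src/main/resources/rules/offerdecision/offer-decision.drl",
loadOfferDecisionRules());
KieBuilder kieBuilder = kieServices.newKieBuilder(kieFileSystem);
kieBuilder.buildAll();
if (kieBuilder.getResults().hasMessages(Message.Level.ERROR)) {
log.error("Rule compilation errors: {}", kieBuilder.getResults().getMessages());
throw new RuleEngineException("Rule compilation failed");
}
KieContainer container = kieServices.newKieContainer(kieRepository.getDefaultReleaseId());
log.info("Drools rule engine initialized");
return container;
} catch (Exception e) {
log.error("Rule engine initialization failed: {}", e.getMessage(), e);
throw new RuleEngineException("Rule engine initialization failed: " + e.getMessage(), e);
}
}
// Look up the named rule bases declared above
private void loadRuleBases() {
kieBases.put("resumeScreening", kieContainer.getKieBase("resumeScreening"));
kieBases.put("interviewEvaluation", kieContainer.getKieBase("interviewEvaluation"));
kieBases.put("offerDecision", kieContainer.getKieBase("offerDecision"));
}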
/**
* 执行规则引擎决策
*
* @param ruleBase 规则库名称
* @param facts 事实对象列表
* @return 执行结果
*/
public RuleExecutionResult executeRules(String ruleBase, List<Object> facts) {
log.info("执行规则引擎决策,规则库: {}, 事实数量: {}", ruleBase, facts.size());
KieBase kieBase = kieBases.get(ruleBase);
if (kieBase == null) {
throw new RuleEngineException("规则库不存在: " + ruleBase);
}
KieSession kieSession = null;
try{
// 创建规则会话
kieSession = kieBase.newKieSession();
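// Set globals. Each global must also be declared in the DRL with a
// "global <Type> <name>" line; Drools rejects setGlobal for an undeclared name.
kieSession.setGlobal("logger", log);
kieSession.setGlobal("decisionResult", new DecisionResult());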
// 插入事实对象
for (Object fact : facts) {
kieSession.insert(fact);
}
// 执行规则
int firedRules = kieSession.fireAllRules();
// 获取执行结果
DecisionResult result = (DecisionResult) kieSession.getGlobal("decisionResult");
log.info("规则执行完成,触发规则数: {}, 决策结果: {}", firedRules, result);
return RuleExecutionResult.builder()
.firedRules(firedRules)
.decisionResult(result)
.success(true)
.build();
}catch(Exception e){
log.error("规则执行失败: {}, 错误: {}", ruleBase, e.getMessage(), e);
return RuleExecutionResult.builder()
.firedRules(0)
.decisionResult(new DecisionResult())
.success(false)
.errorMessage(e.getMessage())
.build();
}finally{
if (kieSession != null) {
kieSession.dispose();
}
}
}
/**
* 重新加载规则
*/
public void reloadRules() {
log.info("重新加载规则");
// 实现规则热加载逻辑
}
/**
* 加载简历筛选规则
*/
private String loadResumeScreeningRules() {
return """
package com.company.hrauto.rules.resumescreening
import com.company.hrauto.model.*
import java.util.*
// 高匹配度规则
rule "High Match Score"
when
$candidate: Candidate(matchScore >= 85)
$position: RecruitmentPosition()
then
$candidate.setScreeningResult("PASS");
$candidate.setScreeningReason("匹配度高达 " + $candidate.getMatchScore() + "%,直接通过");
update($candidate);
end
// 中等匹配度规则
rule "Medium Match Score - Manual Review"
when
$candidate: Candidate(matchScore >= 70 && matchScore < 85)
$position: RecruitmentPosition()
then
$candidate.setScreeningResult("HOLD");
$candidate.setScreeningReason("匹配度 " + $candidate.getMatchScore() + "%,需要人工审核");
update($candidate);
end
// 低匹配度规则
rule "Low Match Score - Reject"
when
$candidate: Candidate(matchScore < 70)
$position: RecruitmentPosition()
then
$candidate.setScreeningResult("REJECT");
$candidate.setScreeningReason("匹配度 " + $candidate.getMatchScore() + "%,不符合要求");
update($candidate);
end
// 紧急职位特殊规则
rule "Urgent Position - Relaxed Criteria"
when
$candidate: Candidate(matchScore >= 65)
$position: RecruitmentPosition(urgency == RecruitmentPosition.UrgencyLevel.URGENT)
then
$candidate.setScreeningResult("PASS");
$candidate.setScreeningReason("紧急职位,匹配度 " + $candidate.getMatchScore() + "%通过");
update($candidate);
end
// 关键技能匹配规则
rule "Key Skills Match"
when
$candidate: Candidate(matchScore >= 60, keySkillsMatches >= 3)
$position: RecruitmentPosition()
then
$candidate.setScreeningResult("PASS");
$candidate.setScreeningReason("关键技能匹配数: " + $candidate.getKeySkillsMatches() + ",通过");
update($candidate);
end
""";
}
/**
* 加载面试评估规则
*/
private String loadInterviewEvaluationRules() {
return """
package com.company.hrauto.rules.interviewevaluation
import com.company.hrauto.model.*
import java.util.*
// 优秀面试表现
rule "Excellent Interview Performance"
when
$interview: Interview(overallScore >= 90)
then
$interview.setRecommendation("STRONG_HIRE");
$interview.setDecisionReason("面试表现优秀,综合评分 " + $interview.getOverallScore());
update($interview);
end
// 良好面试表现
rule "Good Interview Performance"
when
$interview: Interview(overallScore >= 80 && overallScore < 90)
then
$interview.setRecommendation("HIRE");
$interview.setDecisionReason("面试表现良好,综合评分 " + $interview.getOverallScore());
update($interview);
end
// 需要额外面试
rule "Need Additional Interview"
when
$interview: Interview(overallScore >= 70 && overallScore < 80)
then
$interview.setRecommendation("ADDITIONAL_INTERVIEW");
$interview.setDecisionReason("需要额外面试确认,当前评分 " + $interview.getOverallScore());
update($interview);
end
// 不推荐录用
rule "Not Recommended"
when
$interview: Interview(overallScore < 70)
then
$interview.setRecommendation("NO_HIRE");
$interview.setDecisionReason("面试表现不符合要求,评分 " + $interview.getOverallScore());
update($interview);
end
// 技术能力突出
rule "Outstanding Technical Skills"
when
$interview: Interview(technicalScore >= 90, overallScore >= 75)
then
$interview.setRecommendation("HIRE");
$interview.setDecisionReason("技术能力突出,技术评分 " + $interview.getTechnicalScore());
update($interview);
end
""";
}
}
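The five resume screening rules overlap. A candidate scoring 66 for an urgent position matches both "Low Match Score - Reject" and "Urgent Position - Relaxed Criteria", and because each rule overwrites the screening result, the outcome depends on firing order unless salience or mutually exclusive conditions are added. The sketch below restates the same thresholds as one function with an explicit priority. The priority order (the relaxed PASS rules win over HOLD and REJECT) is an assumption; the source does not state which rule should prevail. ScreeningPolicy is a hypothetical name.

```java
/** Hypothetical single-function restatement of the screening thresholds. */
final class ScreeningPolicy {

    /**
     * Evaluates the thresholds from the DRL in a fixed priority order, so that
     * exactly one outcome is produced for any input.
     */
    static String decide(double matchScore, boolean urgentPosition, int keySkillsMatches) {
        if (matchScore >= 85) {
            return "PASS";   // "High Match Score"
        }
        if (urgentPosition && matchScore >= 65) {
            return "PASS";   // "Urgent Position - Relaxed Criteria"
        }
        if (matchScore >= 60 && keySkillsMatches >= 3) {
            return "PASS";   // "Key Skills Match"
        }
        if (matchScore >= 70) {
            return "HOLD";   // "Medium Match Score - Manual Review"
        }
        return "REJECT";     // "Low Match Score - Reject"
    }
}
```

The same ordering can be expressed in Drools by giving each rule a salience value or by adding the negated conditions of the higher-priority rules to the lower-priority ones.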
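package com.company.hrauto.service;
import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;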
/**
* Resume screening agent service.
* Handles automated resume screening and evaluation.
*/
@Service
@Slf4j
public class ResumeScreeningAgent {
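private final RuleEngineService ruleEngineService;
private final ResumeParserService resumeParserService;
private final AIService aiService;
// Assumed repositories (their types are not shown in this listing) backing the save(...) calls below
private final AgentDecisionRepository agentDecisionRepository;
private final ScreeningReportRepository screeningReportRepository;
public ResumeScreeningAgent(RuleEngineService ruleEngineService,
ResumeParserService resumeParserService,
AIService aiService,
AgentDecisionRepository agentDecisionRepository,
ScreeningReportRepository screeningReportRepository) {
this.ruleEngineService = ruleEngineService;
this.resumeParserService = resumeParserService;
this.aiService = aiService;
this.agentDecisionRepository = agentDecisionRepository;
this.screeningReportRepository = screeningReportRepository;
}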
/**
* 自动化简历筛选
*
* @param candidate 候选人信息
* @param position 招聘职位
* @return 筛选结果
*/
public ScreeningResult screenResume(Candidate candidate, RecruitmentPosition position) {
log.info("开始自动化简历筛选,候选人: {}, 职位: {}", candidate.getName(), position.getTitle());
long startTime = System.currentTimeMillis();
try{
//1.解析简历
ResumeData resumeData = resumeParserService.parseResume(candidate.getResumePath());
candidate.setResumeData(resumeData.toJson());
//2.AI能力评估
AIAssessment aiAssessment = aiService.assessCandidate(resumeData, position);
candidate.setAiAssessment(aiAssessment.toJson());
candidate.setMatchScore(aiAssessment.getOverallMatchScore());
//3.规则引擎决策
ScreeningResult screeningResult = executeScreeningRules(candidate, position, aiAssessment);
// 4. 记录决策日志
recordAgentDecision(candidate, screeningResult, aiAssessment);
long processingTime = System.currentTimeMillis() - startTime;
screeningResult.setProcessingTime(processingTime);
log.info("简历筛选完成,候选人: {}, 结果: {}, 处理时间: {}ms",
candidate.getName(), screeningResult.getDecision(), processingTime);
return screeningResult;
}catch(Exception e){
log.error("简历筛选失败,候选人: {}, 错误: {}", candidate.getName(), e.getMessage(), e);
return ScreeningResult.error("简历筛选失败: " + e.getMessage());
}
}
/**
* 批量简历筛选
*
* @param candidates 候选人列表
* @param position 招聘职位
* @return 筛选结果列表
*/
public Map<String, ScreeningResult> batchScreenResumes(List<Candidate> candidates,
RecruitmentPosition position) {
log.info("开始批量简历筛选,职位: {}, 候选人数量: {}", position.getTitle(), candidates.size());
Map<String, ScreeningResult> results = new HashMap<>();
for (Candidate candidate : candidates) {
try {
ScreeningResult result = screenResume(candidate, position);
results.put(candidate.getCandidateId(), result);
} catch (Exception e) {
log.warn("候选人 {} 筛选失败: {}", candidate.getName(), e.getMessage());
results.put(candidate.getCandidateId(), ScreeningResult.error(e.getMessage()));
}
}
// 生成批量筛选报告
generateBatchScreeningReport(results, position);
log.info("批量简历筛选完成,成功: {}, 失败: {}",
results.values().stream().filter(ScreeningResult::isSuccess).count(),
results.values().stream().filter(r -> !r.isSuccess()).count());
return results;
}
/**
* 执行筛选规则
*/
private ScreeningResult executeScreeningRules(Candidate candidate,
RecruitmentPosition position,
AIAssessment aiAssessment) {
// 准备事实对象
List<Object> facts = List.of(candidate, position, aiAssessment);
// 执行规则引擎
RuleExecutionResult ruleResult = ruleEngineService.executeRules("resumeScreening", facts);
if (!ruleResult.isSuccess()) {
return ScreeningResult.error("规则引擎执行失败: " + ruleResult.getErrorMessage());
}
// 构建筛选结果
DecisionResult decisionResult = ruleResult.getDecisionResult();
return ScreeningResult.builder()
.candidateId(candidate.getCandidateId())
.positionId(position.getPositionId())
.decision(decisionResult.getDecision())
.confidence(decisionResult.getConfidence())
.reason(decisionResult.getReason())
.matchScore(candidate.getMatchScore())
.aiAssessment(aiAssessment)
.ruleFiredCount(ruleResult.getFiredRules())
.success(true)
.build();
}
/**
* 记录Agent决策
*/
private void recordAgentDecision(Candidate candidate, ScreeningResult result, AIAssessment assessment) {
AgentDecision decision = AgentDecision.builder()
.decisionId(generateDecisionId())
.candidate(candidate)
.agentType(AgentDecision.AgentType.SCREENING_AGENT)
.decisionType(mapToDecisionType(result.getDecision()))
.confidence(result.getConfidence())
.decisionReason(result.getReason())
.inputData(buildInputData(candidate, assessment))
.outputData(buildOutputData(result))
.usedRules("resume-screening-rules")
.decisionTime(LocalDateTime.now())
.build();
// 保存到数据库
agentDecisionRepository.save(decision);
}
/**
* 生成批量筛选报告
*/
private void generateBatchScreeningReport(Map<String, ScreeningResult> results,
RecruitmentPosition position) {
BatchScreeningReport report = BatchScreeningReport.builder()
.positionId(position.getPositionId())
.totalCandidates(results.size())
.passedCandidates((int) results.values().stream()
.filter(r -> "PASS".equals(r.getDecision()))
.count())
.rejectedCandidates((int) results.values().stream()
.filter(r -> "REJECT".equals(r.getDecision()))
.count())
.holdCandidates((int) results.values().stream()
.filter(r -> "HOLD".equals(r.getDecision()))
.count())
.averageMatchScore(results.values().stream()
.filter(ScreeningResult::isSuccess)
.mapToDouble(ScreeningResult::getMatchScore)
.average()
.orElse(0.0))
.screeningResults(results)
.generatedTime(LocalDateTime.now())
.build();
// 保存报告
screeningReportRepository.save(report);
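// SLF4J placeholders do not support format specifiers such as {:.2f}, so format the value first
double passRate = report.getTotalCandidates() == 0 ? 0.0
: (double) report.getPassedCandidates() / report.getTotalCandidates() * 100;
log.info("Batch screening report generated, pass rate: {}%", String.format("%.2f", passRate));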
}
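private AgentDecision.DecisionType mapToDecisionType(String decision) {
// A switch on a null String throws NullPointerException, so treat a missing decision as ESCALATE
if (decision == null) {
return AgentDecision.DecisionType.ESCALATE;
}
return switch (decision) {
case "PASS" -> AgentDecision.DecisionType.PASS;
case "REJECT" -> AgentDecision.DecisionType.REJECT;
case "HOLD" -> AgentDecision.DecisionType.HOLD;
default -> AgentDecision.DecisionType.ESCALATE;
};
}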
private String generateDecisionId() {
return "DEC_" + System.currentTimeMillis() + "_" +
UUID.randomUUID().toString().substring(0, 8);
}
}
/**
* 筛选结果数据类
*/
@Data
@Builder
class ScreeningResult {
private String candidateId;
private String positionId;
private String decision; // PASS, REJECT, HOLD, ESCALATE
private Double confidence;
private String reason;
private Double matchScore;
private AIAssessment aiAssessment;
private Integer ruleFiredCount;
private boolean success; // primitive, so Lombok generates isSuccess() as used by the callers
private String errorMessage;
private Long processingTime;
public static ScreeningResult error(String errorMessage) {
return ScreeningResult.builder()
.success(false)
.errorMessage(errorMessage)
.build();
}
}
/**
* 规则执行结果数据类
*/
@Data
@Builder
class RuleExecutionResult {
private Integer firedRules;
private DecisionResult decisionResult;
private boolean success; // primitive, so Lombok generates isSuccess() as used by the callers
private String errorMessage;
}
/**
* 决策结果数据类
*/
@Data
class DecisionResult {
private String decision;
private Double confidence;
private String reason;
private Map<String, Object> details = new HashMap<>();
}
/**
* 批量筛选报告数据类
*/
@Data
@Builder
class BatchScreeningReport {
private String positionId;
private Integer totalCandidates;
private Integer passedCandidates;
private Integer rejectedCandidates;
private Integer holdCandidates;
private Double averageMatchScore;
private Map<String, ScreeningResult> screeningResults;
private LocalDateTime generatedTime;
}
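The batch report counts PASS, REJECT and HOLD decisions and derives a pass rate from them. As a minimal sketch of that arithmetic, assuming the decisions are available as a map from candidate ID to decision string, the class below guards against an empty batch and formats the rate with String.format, since logging placeholders do not apply number formats. BatchStats and its methods are hypothetical names.

```java
import java.util.Locale;
import java.util.Map;

/** Hypothetical helper computing the figures used in the batch screening report. */
final class BatchStats {

    /** Number of candidates whose decision equals the given value. */
    static long count(Map<String, String> decisions, String decision) {
        return decisions.values().stream().filter(decision::equals).count();
    }

    /** Pass rate as a percentage with two decimals, or "n/a" for an empty batch. */
    static String passRate(Map<String, String> decisions) {
        if (decisions.isEmpty()) {
            return "n/a";
        }
        double rate = 100.0 * count(decisions, "PASS") / decisions.size();
        return String.format(Locale.ROOT, "%.2f%%", rate);
    }
}
```

For a batch of four candidates with two PASS decisions, passRate returns "50.00%".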
Automated Workflow Service
package com.company.hrauto.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.HashMap;
import java.util.Map;
/**
* Recruitment workflow service.
* Coordinates the agents and the workflow engine to automate recruitment end to end.
*/
@Service
@Slf4j
@Transactional
public class RecruitmentWorkflowService {
private final WorkflowEngineService workflowEngineService;
private final ResumeScreeningAgent resumeScreeningAgent;
private final InterviewSchedulingAgent interviewSchedulingAgent;
private final OfferManagementAgent offerManagementAgent;
private final NotificationService notificationService;
// Assumed repositories (their types are not shown in this listing) used by the lookups below
private final CandidateRepository candidateRepository;
private final RecruitmentPositionRepository positionRepository;
public RecruitmentWorkflowService(WorkflowEngineService workflowEngineService,
ResumeScreeningAgent resumeScreeningAgent,
InterviewSchedulingAgent interviewSchedulingAgent,
OfferManagementAgent offerManagementAgent,
NotificationService notificationService,
CandidateRepository candidateRepository,
RecruitmentPositionRepository positionRepository) {
this.workflowEngineService = workflowEngineService;
this.resumeScreeningAgent = resumeScreeningAgent;
this.interviewSchedulingAgent = interviewSchedulingAgent;
this.offerManagementAgent = offerManagementAgent;
this.notificationService = notificationService;
this.candidateRepository = candidateRepository;
this.positionRepository = positionRepository;
}
/**
* 启动端到端招聘工作流
*
* @param candidate 候选人
* @param position 招聘职位
* @return 工作流实例ID
*/
public String startEndToEndRecruitmentWorkflow(Candidate candidate, RecruitmentPosition position) {
log.info("启动端到端招聘工作流,候选人: {}, 职位: {}", candidate.getName(), position.getTitle());
try {
// 1. 准备工作流变量
Map<String, Object> variables = prepareWorkflowVariables(candidate, position);
// 2. 启动工作流实例
String instanceId = workflowEngineService.startWorkflowInstance(
WorkflowConstants.PROCESS_RESUME_SCREENING,
candidate.getCandidateId(),
variables
);
// 3. 记录工作流实例
WorkflowInstance workflowInstance = createWorkflowInstance(
instanceId, candidate, position, WorkflowInstance.WorkflowType.RESUME_SCREENING);
// 4. 发送通知
notificationService.sendWorkflowStartedNotification(candidate, position, instanceId);
log.info("端到端招聘工作流启动成功,实例ID: {}", instanceId);
return instanceId;
} catch (Exception e) {
log.error("启动招聘工作流失败,候选人: {}, 错误: {}", candidate.getName(), e.getMessage(), e);
throw new RecruitmentWorkflowException("启动招聘工作流失败: " + e.getMessage(), e);
}
}
/**
* Handles the resume screening stage.
*
* @param candidateId the candidate ID
* @param positionId the position ID
* @return the screening result
*/
public ScreeningResult processResumeScreening(String candidateId, String positionId) {
log.info("处理简历筛选阶段,候选人ID: {}, 职位ID: {}", candidateId, positionId);
try {
// 1. Load the candidate and the position
Candidate candidate = candidateRepository.findByCandidateId(candidateId)
.orElseThrow(() -> new ResourceNotFoundException("候选人不存在: " + candidateId));
RecruitmentPosition position = positionRepository.findByPositionId(positionId)
.orElseThrow(() -> new ResourceNotFoundException("职位不存在: " + positionId));
// 2. Run the resume screening
ScreeningResult screeningResult = resumeScreeningAgent.screenResume(candidate, position);
// 3. Update the candidate status
updateCandidateStatus(candidate, screeningResult);
// 4. Let the workflow continue
continueWorkflowAfterScreening(candidate, position, screeningResult);
return screeningResult;
} catch (Exception e) {
log.error("简历筛选处理失败,候选人ID: {}, 错误: {}", candidateId, e.getMessage(), e);
throw new RecruitmentWorkflowException("简历筛选处理失败: " + e.getMessage(), e);
}
}
/**
* Handles the interview scheduling stage.
*
* @param candidateId the candidate ID
* @param positionId the position ID
* @return the interview scheduling result
*/
public InterviewScheduleResult processInterviewScheduling(String candidateId, String positionId) {
log.info("处理面试安排阶段,候选人ID: {}, 职位ID: {}", candidateId, positionId);
try {
Candidate candidate = candidateRepository.findByCandidateId(candidateId)
.orElseThrow(() -> new ResourceNotFoundException("候选人不存在: " + candidateId));
RecruitmentPosition position = positionRepository.findByPositionId(positionId)
.orElseThrow(() -> new ResourceNotFoundException("职位不存在: " + positionId));
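// Schedule the interview
InterviewScheduleResult scheduleResult = interviewSchedulingAgent.scheduleInterview(candidate, position);
// Update the workflow state (continueWorkflowAfterInterviewScheduling is not shown in this listing)
continueWorkflowAfterInterviewScheduling(candidate, position, scheduleResult);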
return scheduleResult;
} catch (Exception e) {
log.error("面试安排处理失败,候选人ID: {}, 错误: {}", candidateId, e.getMessage(), e);
throw new RecruitmentWorkflowException("面试安排处理失败: " + e.getMessage(), e);
}
}
/**
* Handles the offer decision stage.
*
* @param candidateId the candidate ID
* @param positionId the position ID
* @param interviewResults the interview results
* @return the offer decision result
*/
public OfferDecisionResult processOfferDecision(String candidateId, String positionId,
Map<String, Object> interviewResults) {
log.info("处理Offer决策阶段,候选人ID: {}, 职位ID: {}", candidateId, positionId);
try {
Candidate candidate = candidateRepository.findByCandidateId(candidateId)
.orElseThrow(() -> new ResourceNotFoundException("候选人不存在: " + candidateId));
RecruitmentPosition position = positionRepository.findByPositionId(positionId)
.orElseThrow(() -> new ResourceNotFoundException("职位不存在: " + positionId));
// Make the offer decision
OfferDecisionResult decisionResult = offerManagementAgent.makeOfferDecision(
candidate, position, interviewResults);
// Update the workflow state (continueWorkflowAfterOfferDecision is not shown in this listing)
continueWorkflowAfterOfferDecision(candidate, position, decisionResult);
return decisionResult;
} catch (Exception e) {
log.error("Offer决策处理失败,候选人ID: {}, 错误: {}", candidateId, e.getMessage(), e);
throw new RecruitmentWorkflowException("Offer决策处理失败: " + e.getMessage(), e);
}
}
/**
* Returns the status of a workflow.
*
* @param instanceId the workflow instance ID
* @return the workflow status information
*/
public WorkflowStatusInfo getWorkflowStatus(String instanceId) {
log.debug("获取工作流状态,实例ID: {}", instanceId);
try {
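// Load the persisted workflow instance record
WorkflowInstance workflowInstance = workflowInstanceRepository.findByInstanceId(instanceId)
.orElseThrow(() -> new ResourceNotFoundException("工作流实例不存在: " + instanceId));
// Read the engine state. getUserTasks(null, null) is not scoped to this instance
// (assuming its arguments are assignee and group), so keep only this instance's tasks.
Map<String, Object> engineVariables = workflowEngineService.getProcessVariables(instanceId);
List<Task> activeTasks = workflowEngineService.getUserTasks(null, null).stream()
.filter(task -> instanceId.equals(task.getProcessInstanceId()))
.collect(java.util.stream.Collectors.toList());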
return WorkflowStatusInfo.builder()
.instanceId(instanceId)
.status(workflowInstance.getStatus())
.currentNode(workflowInstance.getCurrentNode())
.engineVariables(engineVariables)
.activeTasks(activeTasks)
.startTime(workflowInstance.getStartTime())
.executionDuration(workflowInstance.getExecutionDuration())
.build();
} catch (Exception e) {
log.error("获取工作流状态失败,实例ID: {}, 错误: {}", instanceId, e.getMessage(), e);
throw new RecruitmentWorkflowException("获取工作流状态失败: " + e.getMessage(), e);
}
}
/**
* Applies a manual intervention to a workflow.
*
* @param instanceId the workflow instance ID
* @param intervention the intervention request
* @return the intervention result
*/
public WorkflowInterventionResult interveneWorkflow(String instanceId, WorkflowIntervention intervention) {
log.info("人工干预工作流,实例ID: {}, 操作: {}", instanceId, intervention.getAction());
try {
WorkflowInstance workflowInstance = workflowInstanceRepository.findByInstanceId(instanceId)
.orElseThrow(() -> new ResourceNotFoundException("工作流实例不存在: " + instanceId));
WorkflowInterventionResult result;
switch (intervention.getAction()) {
case "SUSPEND":
workflowEngineService.suspendProcessInstance(instanceId);
workflowInstance.setStatus(WorkflowInstance.WorkflowStatus.SUSPENDED);
result = WorkflowInterventionResult.success("工作流已挂起");
break;
case "RESUME":
workflowEngineService.activateProcessInstance(instanceId);
workflowInstance.setStatus(WorkflowInstance.WorkflowStatus.RUNNING);
result = WorkflowInterventionResult.success("工作流已恢复");
break;
case "TERMINATE":
workflowEngineService.deleteProcessInstance(instanceId, intervention.getReason());
workflowInstance.setStatus(WorkflowInstance.WorkflowStatus.TERMINATED);
result = WorkflowInterventionResult.success("工作流已终止");
break;
case "UPDATE_VARIABLES":
workflowEngineService.setProcessVariables(instanceId, intervention.getVariables());
result = WorkflowInterventionResult.success("工作流变量已更新");
break;
default:
throw new IllegalArgumentException("不支持的操作: " + intervention.getAction());
}
workflowInstanceRepository.save(workflowInstance);
return result;
} catch (Exception e) {
log.error("工作流干预失败,实例ID: {}, 错误: {}", instanceId, e.getMessage(), e);
throw new RecruitmentWorkflowException("工作流干预失败: " + e.getMessage(), e);
}
}
/**
* Prepares the workflow variables.
*/
private Map<String, Object> prepareWorkflowVariables(Candidate candidate, RecruitmentPosition position) {
Map<String, Object> variables = new HashMap<>();
variables.put(WorkflowConstants.VAR_CANDIDATE_ID, candidate.getCandidateId());
variables.put(WorkflowConstants.VAR_POSITION_ID, position.getPositionId());
variables.put("candidateName", candidate.getName());
variables.put("positionTitle", position.getTitle());
variables.put("department", position.getDepartment());
variables.put("urgencyLevel", position.getUrgency().name());
variables.put("initiator", "SYSTEM");
variables.put("startTime", LocalDateTime.now());
return variables;
}
/**
* Creates and saves the workflow instance record.
*/
private WorkflowInstance createWorkflowInstance(String instanceId, Candidate candidate,
RecruitmentPosition position,
WorkflowInstance.WorkflowType workflowType) {
WorkflowInstance workflowInstance = WorkflowInstance.builder()
.instanceId(instanceId)
.position(position)
.workflowType(workflowType)
.workflowDefinitionId(WorkflowConstants.PROCESS_RESUME_SCREENING)
.engineInstanceId(instanceId)
.status(WorkflowInstance.WorkflowStatus.RUNNING)
.inputParameters(buildInputParameters(candidate, position))
.startTime(LocalDateTime.now())
.build();
return workflowInstanceRepository.save(workflowInstance);
}
/**
* Updates the candidate status from the screening decision.
*/
private void updateCandidateStatus(Candidate candidate, ScreeningResult screeningResult) {
switch (screeningResult.getDecision()) {
case "PASS":
candidate.setStatus(Candidate.CandidateStatus.INTERVIEW);
break;
case "REJECT":
candidate.setStatus(Candidate.CandidateStatus.REJECTED);
break;
case "HOLD":
candidate.setStatus(Candidate.CandidateStatus.SCREENING);
break;
default:
candidate.setStatus(Candidate.CandidateStatus.SCREENING);
}
candidate.setLastUpdatedTime(LocalDateTime.now());
candidateRepository.save(candidate);
}
/**
* Continues the workflow after screening.
*/
private void continueWorkflowAfterScreening(Candidate candidate, RecruitmentPosition position,
ScreeningResult screeningResult) {
Map<String, Object> variables = new HashMap<>();
variables.put(WorkflowConstants.VAR_SCREENING_RESULT, screeningResult.getDecision());
variables.put(WorkflowConstants.VAR_MATCH_SCORE, screeningResult.getMatchScore());
variables.put("screeningReason", screeningResult.getReason());
// Find and complete the matching task
List<Task> tasks = workflowEngineService.getUserTasks(null, WorkflowConstants.GROUP_HR);
for (Task task : tasks) {
if (task.getProcessInstanceId().equals(getProcessInstanceId(candidate))) {
workflowEngineService.completeTask(task.getId(), variables);
break;
}
}
}
private String buildInputParameters(Candidate candidate, RecruitmentPosition position) {
Map<String, Object> params = new HashMap<>();
params.put("candidate", candidate.getCandidateId());
params.put("position", position.getPositionId());
params.put("timestamp", LocalDateTime.now().toString());
return JsonUtils.toJson(params);
}
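private String getProcessInstanceId(Candidate candidate) {
// Placeholder: look up the process instance ID for the candidate here.
// Until this is implemented, the fixed value never matches a real task in continueWorkflowAfterScreening.
return "mock-process-instance-id";
}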
}
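The `getProcessInstanceId` helper above is still a stub. One way it could be filled in is a small lookup from candidate ID to engine instance ID. The following is only a sketch under that assumption: `ProcessInstanceRegistry`, `register` and `find` are hypothetical names that do not appear in the original project, and a real system would more likely query the persisted `WorkflowInstance` records instead of keeping the mapping in memory.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory lookup from candidate ID to workflow instance ID. */
final class ProcessInstanceRegistry {
    // ConcurrentHashMap because workflows may be started from several threads
    private final Map<String, String> instanceByCandidate = new ConcurrentHashMap<>();

    /** Remembers which engine instance was started for a candidate. */
    void register(String candidateId, String instanceId) {
        instanceByCandidate.put(candidateId, instanceId);
    }

    /** Returns the instance ID, or empty if no workflow was started for the candidate. */
    Optional<String> find(String candidateId) {
        return Optional.ofNullable(instanceByCandidate.get(candidateId));
    }
}
```

With something like this in place, the service could call `register` right after starting a workflow and use `find` when matching tasks, skipping the task completion when the result is empty.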
/**
* Workflow status information.
*/
@Data
@Builder
class WorkflowStatusInfo {
private String instanceId;
private WorkflowInstance.WorkflowStatus status;
private String currentNode;
private Map<String, Object> engineVariables;
private List<Task> activeTasks;
private LocalDateTime startTime;
private Long executionDuration;
}
/**
* Workflow intervention request.
*/
@Data
@Builder
class WorkflowIntervention {
private String action; // SUSPEND, RESUME, TERMINATE, UPDATE_VARIABLES
private String reason;
private Map<String, Object> variables;
private String initiatedBy;
}
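/**
* Result of a workflow intervention.
*/
@Data
@Builder
class WorkflowInterventionResult {
// Primitive boolean so that Lombok generates isSuccess(), which the tests call
private boolean success;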
private String message;
private LocalDateTime interventionTime;
public static WorkflowInterventionResult success(String message) {
return WorkflowInterventionResult.builder()
.success(true)
.message(message)
.interventionTime(LocalDateTime.now())
.build();
}
public static WorkflowInterventionResult error(String message) {
return WorkflowInterventionResult.builder()
.success(false)
.message(message)
.interventionTime(LocalDateTime.now())
.build();
}
}
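/**
* Exception raised by the recruitment workflow.
* Declared public, so it belongs in its own file (RecruitmentWorkflowException.java).
*/
public class RecruitmentWorkflowException extends RuntimeException {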
public RecruitmentWorkflowException(String message) {
super(message);
}
public RecruitmentWorkflowException(String message, Throwable cause) {
super(message, cause);
}
}
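`WorkflowIntervention.action` is a plain `String`, and `interveneWorkflow` switches on its four documented values. A typo in the action only surfaces at runtime as "unsupported operation". The sketch below shows how an enum could make the allowed actions explicit; `InterventionAction` and `parse` are hypothetical names introduced here for illustration and are not part of the original code.

```java
import java.util.Locale;
import java.util.Optional;

/** Hypothetical enum mirroring the four actions listed in WorkflowIntervention. */
enum InterventionAction {
    SUSPEND, RESUME, TERMINATE, UPDATE_VARIABLES;

    /** Parses user input leniently; returns empty for null or unknown values. */
    static Optional<InterventionAction> parse(String raw) {
        if (raw == null) {
            return Optional.empty();
        }
        try {
            // Locale.ROOT keeps the upper-casing independent of the default locale
            return Optional.of(valueOf(raw.trim().toUpperCase(Locale.ROOT)));
        } catch (IllegalArgumentException e) {
            return Optional.empty();
        }
    }
}
```

A switch over such an enum lets the compiler and IDE flag missing cases, and the service could reject an empty result before touching the engine.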
Complete test cases
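package com.company.hrauto.service;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;
/**
* Tests for the resume screening agent.
*/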
@ExtendWith(MockitoExtension.class)
class ResumeScreeningAgentTest {
@Mock
private RuleEngineService ruleEngineService;
@Mock
private ResumeParserService resumeParserService;
@Mock
private AIService aiService;
@Mock
private AgentDecisionRepository agentDecisionRepository;
@Mock
private ScreeningReportRepository screeningReportRepository;
@InjectMocks
private ResumeScreeningAgent resumeScreeningAgent;
private Candidate testCandidate;
private RecruitmentPosition testPosition;
private ResumeData testResumeData;
private AIAssessment testAIAssessment;
@BeforeEach
void setUp() {
// 准备测试数据
testPosition = RecruitmentPosition.builder()
.positionId("POS-001")
.title("Java开发工程师")
.department("技术部")
.level("P6")
.location("北京")
.headcount(3)
.hiredCount(0)
.status(RecruitmentPosition.PositionStatus.OPEN)
.urgency(RecruitmentPosition.UrgencyLevel.HIGH)
.build();
testCandidate = Candidate.builder()
.candidateId("CAND-001")
.position(testPosition)
.name("张三")
.email("zhangsan@example.com")
.phone("13800138000")
.resumePath("/resumes/zhangsan.pdf")
.source(Candidate.CandidateSource.WEBSITE)
.status(Candidate.CandidateStatus.NEW)
.appliedTime(LocalDateTime.now())
.build();
testResumeData = ResumeData.builder()
.name("张三")
.education("本科")
.workExperience(5)
.skills(List.of("Java", "Spring", "MySQL"))
.build();
testAIAssessment = AIAssessment.builder()
.overallMatchScore(85.5)
.skillMatchScore(90.0)
.experienceMatchScore(80.0)
.educationMatchScore(85.0)
.keySkillsMatches(3)
.build();
}
@Test
@DisplayName("简历筛选 - 高匹配度通过")
void testScreenResume_HighMatch_Pass() {
// 模拟依赖行为
when(resumeParserService.parseResume(anyString())).thenReturn(testResumeData);
when(aiService.assessCandidate(any(), any())).thenReturn(testAIAssessment);
RuleExecutionResult ruleResult = RuleExecutionResult.builder()
.firedRules(2)
.decisionResult(new DecisionResult())
.success(true)
.build();
ruleResult.getDecisionResult().setDecision("PASS");
ruleResult.getDecisionResult().setConfidence(0.95);
ruleResult.getDecisionResult().setReason("匹配度高达85.5%,直接通过");
when(ruleEngineService.executeRules(eq("resumeScreening"), anyList()))
.thenReturn(ruleResult);
// 执行测试
ScreeningResult result = resumeScreeningAgent.screenResume(testCandidate, testPosition);
// 验证结果
assertNotNull(result);
assertTrue(result.isSuccess());
assertEquals("PASS", result.getDecision());
assertEquals(85.5, result.getMatchScore());
assertEquals(0.95, result.getConfidence());
assertEquals("匹配度高达85.5%,直接通过", result.getReason());
// 验证依赖调用
verify(resumeParserService).parseResume(testCandidate.getResumePath());
verify(aiService).assessCandidate(testResumeData, testPosition);
verify(ruleEngineService).executeRules(eq("resumeScreening"), anyList());
verify(agentDecisionRepository).save(any(AgentDecision.class));
}
@Test
@DisplayName("简历筛选 - 低匹配度拒绝")
void testScreenResume_LowMatch_Reject() {
// 设置低匹配度
testAIAssessment.setOverallMatchScore(65.0);
when(resumeParserService.parseResume(anyString())).thenReturn(testResumeData);
when(aiService.assessCandidate(any(), any())).thenReturn(testAIAssessment);
RuleExecutionResult ruleResult = RuleExecutionResult.builder()
.firedRules(1)
.decisionResult(new DecisionResult())
.success(true)
.build();
ruleResult.getDecisionResult().setDecision("REJECT");
ruleResult.getDecisionResult().setConfidence(0.88);
ruleResult.getDecisionResult().setReason("匹配度65.0%,不符合要求");
when(ruleEngineService.executeRules(eq("resumeScreening"), anyList()))
.thenReturn(ruleResult);
// 执行测试
ScreeningResult result = resumeScreeningAgent.screenResume(testCandidate, testPosition);
// 验证结果
assertNotNull(result);
assertTrue(result.isSuccess());
assertEquals("REJECT", result.getDecision());
assertEquals(65.0, result.getMatchScore());
assertEquals("匹配度65.0%,不符合要求", result.getReason());
}
@Test
@DisplayName("简历筛选 - 解析失败")
void testScreenResume_ParseFailure() {
// 模拟解析失败
when(resumeParserService.parseResume(anyString()))
.thenThrow(new ResumeParseException("简历格式不支持"));
// 执行测试
ScreeningResult result = resumeScreeningAgent.screenResume(testCandidate, testPosition);
// 验证结果
assertNotNull(result);
assertFalse(result.isSuccess());
assertTrue(result.getErrorMessage().contains("简历格式不支持"));
}
@Test
@DisplayName("批量简历筛选 - 成功案例")
void testBatchScreenResumes_Success() {
// Prepare test data
List<Candidate> candidates = List.of(testCandidate);
// Stub the collaborators so that the single screening succeeds
when(resumeParserService.parseResume(anyString())).thenReturn(testResumeData);
when(aiService.assessCandidate(any(), any())).thenReturn(testAIAssessment);
RuleExecutionResult ruleResult = RuleExecutionResult.builder()
.firedRules(2)
.decisionResult(new DecisionResult())
.success(true)
.build();
ruleResult.getDecisionResult().setDecision("PASS");
when(ruleEngineService.executeRules(eq("resumeScreening"), anyList()))
.thenReturn(ruleResult);
// 执行测试
Map<String, ScreeningResult> results = resumeScreeningAgent.batchScreenResumes(candidates, testPosition);
// 验证结果
assertNotNull(results);
assertEquals(1, results.size());
assertTrue(results.get(testCandidate.getCandidateId()).isSuccess());
// 验证报告生成
verify(screeningReportRepository).save(any(BatchScreeningReport.class));
}
}
/**
* Tests for the recruitment workflow service.
*/
@ExtendWith(MockitoExtension.class)
class RecruitmentWorkflowServiceTest {
@Mock
private WorkflowEngineService workflowEngineService;
@Mock
private ResumeScreeningAgent resumeScreeningAgent;
@Mock
private InterviewSchedulingAgent interviewSchedulingAgent;
@Mock
private OfferManagementAgent offerManagementAgent;
@Mock
private NotificationService notificationService;
@Mock
private CandidateRepository candidateRepository;
@Mock
private PositionRepository positionRepository;
@Mock
private WorkflowInstanceRepository workflowInstanceRepository;
@InjectMocks
private RecruitmentWorkflowService recruitmentWorkflowService;
private Candidate testCandidate;
private RecruitmentPosition testPosition;
@BeforeEach
void setUp() {
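testPosition = RecruitmentPosition.builder()
.positionId("POS-001")
.title("Java开发工程师")
.department("技术部")
// prepareWorkflowVariables calls getUrgency().name(), so urgency must not be null
.urgency(RecruitmentPosition.UrgencyLevel.HIGH)
.build();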
testCandidate = Candidate.builder()
.candidateId("CAND-001")
.position(testPosition)
.name("张三")
.email("zhangsan@example.com")
.build();
}
@Test
@DisplayName("启动端到端工作流 - 成功案例")
void testStartEndToEndRecruitmentWorkflow_Success() {
// 模拟依赖行为
when(workflowEngineService.startWorkflowInstance(anyString(), anyString(), anyMap()))
.thenReturn("PROCESS-001");
when(workflowInstanceRepository.save(any(WorkflowInstance.class)))
.thenAnswer(invocation -> invocation.getArgument(0));
// 执行测试
String instanceId = recruitmentWorkflowService.startEndToEndRecruitmentWorkflow(
testCandidate, testPosition);
// 验证结果
assertNotNull(instanceId);
assertEquals("PROCESS-001", instanceId);
// 验证依赖调用
verify(workflowEngineService).startWorkflowInstance(
eq(WorkflowConstants.PROCESS_RESUME_SCREENING),
eq(testCandidate.getCandidateId()),
anyMap());
verify(notificationService).sendWorkflowStartedNotification(
eq(testCandidate), eq(testPosition), eq("PROCESS-001"));
}
@Test
@DisplayName("处理简历筛选 - 通过案例")
void testProcessResumeScreening_Pass() {
// 模拟依赖行为
when(candidateRepository.findByCandidateId("CAND-001"))
.thenReturn(java.util.Optional.of(testCandidate));
when(positionRepository.findByPositionId("POS-001"))
.thenReturn(java.util.Optional.of(testPosition));
ScreeningResult screeningResult = ScreeningResult.builder()
.decision("PASS")
.matchScore(85.5)
.success(true)
.build();
when(resumeScreeningAgent.screenResume(testCandidate, testPosition))
.thenReturn(screeningResult);
// 执行测试
ScreeningResult result = recruitmentWorkflowService.processResumeScreening(
"CAND-001", "POS-001");
// 验证结果
assertNotNull(result);
assertEquals("PASS", result.getDecision());
assertEquals(85.5, result.getMatchScore());
// 验证状态更新
verify(candidateRepository).save(testCandidate);
assertEquals(Candidate.CandidateStatus.INTERVIEW, testCandidate.getStatus());
}
@Test
@DisplayName("工作流干预 - 挂起操作")
void testInterveneWorkflow_Suspend() {
// 准备测试数据
WorkflowInstance workflowInstance = WorkflowInstance.builder()
.instanceId("PROCESS-001")
.status(WorkflowInstance.WorkflowStatus.RUNNING)
.build();
when(workflowInstanceRepository.findByInstanceId("PROCESS-001"))
.thenReturn(java.util.Optional.of(workflowInstance));
WorkflowIntervention intervention = WorkflowIntervention.builder()
.action("SUSPEND")
.reason("系统维护")
.initiatedBy("admin")
.build();
// 执行测试
WorkflowInterventionResult result = recruitmentWorkflowService.interveneWorkflow(
"PROCESS-001", intervention);
// 验证结果
assertNotNull(result);
assertTrue(result.isSuccess());
assertEquals("工作流已挂起", result.getMessage());
// 验证状态更新
verify(workflowEngineService).suspendProcessInstance("PROCESS-001");
assertEquals(WorkflowInstance.WorkflowStatus.SUSPENDED, workflowInstance.getStatus());
}
@Test
@DisplayName("获取工作流状态 - 成功案例")
void testGetWorkflowStatus_Success() {
// 准备测试数据
WorkflowInstance workflowInstance = WorkflowInstance.builder()
.instanceId("PROCESS-001")
.status(WorkflowInstance.WorkflowStatus.RUNNING)
.currentNode("aiScreeningTask")
.startTime(LocalDateTime.now().minusHours(1))
.executionDuration(3600000L)
.build();
when(workflowInstanceRepository.findByInstanceId("PROCESS-001"))
.thenReturn(java.util.Optional.of(workflowInstance));
when(workflowEngineService.getProcessVariables("PROCESS-001"))
.thenReturn(Map.of("candidateId", "CAND-001"));
when(workflowEngineService.getUserTasks(null, null))
.thenReturn(List.of());
// 执行测试
WorkflowStatusInfo statusInfo = recruitmentWorkflowService.getWorkflowStatus("PROCESS-001");
// 验证结果
assertNotNull(statusInfo);
assertEquals("PROCESS-001", statusInfo.getInstanceId());
assertEquals(WorkflowInstance.WorkflowStatus.RUNNING, statusInfo.getStatus());
assertEquals("aiScreeningTask", statusInfo.getCurrentNode());
assertNotNull(statusInfo.getEngineVariables());
}
}
/**
* Integration test for the complete recruitment process.
*/
@SpringBootTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@Slf4j
class RecruitmentWorkflowIntegrationTest {
@Autowired
private RecruitmentWorkflowService recruitmentWorkflowService;
@Autowired
private CandidateRepository candidateRepository;
@Autowired
private PositionRepository positionRepository;
@Autowired
private WorkflowInstanceRepository workflowInstanceRepository;
@Autowired
private TestDataBuilder testDataBuilder;
private RecruitmentPosition testPosition;
private List<Candidate> testCandidates;
@BeforeAll
void setUp() {
// 初始化测试数据
testPosition = testDataBuilder.createTestPosition();
testCandidates = testDataBuilder.createTestCandidates(10, testPosition);
positionRepository.save(testPosition);
candidateRepository.saveAll(testCandidates);
}
@Test
@DisplayName("完整招聘流程集成测试")
void testCompleteRecruitmentProcess() {
// 1. 为每个候选人启动工作流
Map<String, String> workflowInstances = new HashMap<>();
for (Candidate candidate : testCandidates) {
String instanceId = recruitmentWorkflowService.startEndToEndRecruitmentWorkflow(
candidate, testPosition);
workflowInstances.put(candidate.getCandidateId(), instanceId);
assertNotNull(instanceId, "工作流实例ID不应为空");
}
// 2. 执行简历筛选
Map<String, ScreeningResult> screeningResults = new HashMap<>();
for (Candidate candidate : testCandidates) {
ScreeningResult result = recruitmentWorkflowService.processResumeScreening(
candidate.getCandidateId(), testPosition.getPositionId());
screeningResults.put(candidate.getCandidateId(), result);
assertTrue(result.isSuccess(), "简历筛选应该成功");
}
// 3. 统计筛选结果
long passedCount = screeningResults.values().stream()
.filter(r -> "PASS".equals(r.getDecision()))
.count();
long rejectedCount = screeningResults.values().stream()
.filter(r -> "REJECT".equals(r.getDecision()))
.count();
log.info("简历筛选结果 - 总数: {}, 通过: {}, 拒绝: {}",
testCandidates.size(), passedCount, rejectedCount);
// 4. 验证工作流状态
for (String instanceId : workflowInstances.values()) {
WorkflowStatusInfo statusInfo = recruitmentWorkflowService.getWorkflowStatus(instanceId);
assertNotNull(statusInfo, "工作流状态信息不应为空");
assertNotNull(statusInfo.getStatus(), "工作流状态不应为空");
}
// 5. 验证数据一致性
for (Candidate candidate : testCandidates) {
Candidate updatedCandidate = candidateRepository.findByCandidateId(candidate.getCandidateId())
.orElseThrow();
assertNotNull(updatedCandidate.getStatus(), "候选人状态不应为空");
assertNotNull(updatedCandidate.getLastUpdatedTime(), "最后更新时间不应为空");
}
// Performance check
long totalProcessingTime = screeningResults.values().stream()
.mapToLong(r -> r.getProcessingTime() != null ? r.getProcessingTime() : 0)
.sum();
double averageProcessingTime = (double) totalProcessingTime / testCandidates.size();
// SLF4J only understands "{}" placeholders, so format the decimals first
log.info("Performance - average processing time: {}ms", String.format("%.2f", averageProcessingTime));
assertTrue(averageProcessingTime < 5000, "Average processing time should be under 5 seconds");
}
@Test
@DisplayName("并发工作流处理测试")
void testConcurrentWorkflowProcessing() throws InterruptedException {
int threadCount = 5;
int candidatesPerThread = 4;
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
CountDownLatch latch = new CountDownLatch(threadCount);
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger failureCount = new AtomicInteger(0);
for (int i = 0; i < threadCount; i++) {
executor.submit(() -> {
try {
for (int j = 0; j < candidatesPerThread; j++) {
Candidate candidate = testDataBuilder.createTestCandidate(testPosition);
candidateRepository.save(candidate);
try {
String instanceId = recruitmentWorkflowService.startEndToEndRecruitmentWorkflow(
candidate, testPosition);
ScreeningResult result = recruitmentWorkflowService.processResumeScreening(
candidate.getCandidateId(), testPosition.getPositionId());
if (result.isSuccess()) {
successCount.incrementAndGet();
} else {
failureCount.incrementAndGet();
}
} catch (Exception e) {
failureCount.incrementAndGet();
log.warn("并发处理失败: {}", e.getMessage());
}
}
} finally {
latch.countDown();
}
});
}
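// Fail fast if the workers did not finish in time instead of asserting on partial counts
boolean finished = latch.await(2, TimeUnit.MINUTES);
executor.shutdown();
assertTrue(finished, "Concurrent tasks should finish within 2 minutes");
int totalProcessed = successCount.get() + failureCount.get();
assertEquals(threadCount * candidatesPerThread, totalProcessed);
double successRate = (double) successCount.get() / totalProcessed * 100;
// SLF4J only understands "{}" placeholders, so format the decimals first
log.info("Concurrency test - total: {}, succeeded: {}, failed: {}, success rate: {}%",
totalProcessed, successCount.get(), failureCount.get(), String.format("%.2f", successRate));
assertTrue(successRate >= 90, "Concurrent success rate should be at least 90%");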
}
}
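/** Performance benchmark test. */
@SpringBootTest
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@Slf4j
class RecruitmentPerformanceTest {
@Autowired
private RecruitmentWorkflowService recruitmentWorkflowService;
@Autowired
private ResumeScreeningAgent resumeScreeningAgent;
// The test below also needs these collaborators and a saved position
@Autowired
private CandidateRepository candidateRepository;
@Autowired
private PositionRepository positionRepository;
@Autowired
private TestDataBuilder testDataBuilder;
private RecruitmentPosition testPosition;
@BeforeAll
void setUp() {
testPosition = testDataBuilder.createTestPosition();
positionRepository.save(testPosition);
}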
@Test
@DisplayName("大规模简历筛选性能测试")
void testLargeScaleResumeScreeningPerformance() {
int candidateCount = 100;
List<Candidate> candidates = testDataBuilder.createTestCandidates(candidateCount, testPosition);
candidateRepository.saveAll(candidates);
long startTime = System.currentTimeMillis();
// 执行批量筛选
Map<String, ScreeningResult> results = resumeScreeningAgent.batchScreenResumes(
candidates, testPosition);
long endTime = System.currentTimeMillis();
long totalTime = endTime - startTime;
double averageTime = (double) totalTime / candidateCount;
long successCount = results.values().stream().filter(ScreeningResult::isSuccess).count();
double successRate = (double) successCount / candidateCount * 100;
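log.info("Large-scale screening performance:");
log.info("Candidates: {}", candidateCount);
log.info("Total time: {}ms", totalTime);
// SLF4J only understands "{}" placeholders, so format the decimals first
log.info("Average time: {}ms", String.format("%.2f", averageTime));
log.info("Success rate: {}%", String.format("%.2f", successRate));
log.info("Throughput: {} candidates/s", String.format("%.2f", candidateCount / (totalTime / 1000.0)));
// Performance assertions
assertTrue(averageTime < 1000, "Average processing time should be under 1 second");
assertTrue(successRate >= 95, "Success rate should be at least 95%");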
}
}
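The concurrency test above combines a fixed thread pool, a `CountDownLatch` and two atomic counters. The same pattern can be isolated into a small helper that also reports whether the workers finished before the timeout and avoids dividing by zero when nothing ran. This is a minimal sketch using only the JDK; `ConcurrentRunStats` and its `run` method are hypothetical names, and the `BooleanSupplier` stands in for "start the workflow and screen one candidate".

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

/** Hypothetical helper that runs a task concurrently and collects success/failure counts. */
final class ConcurrentRunStats {
    final int success;
    final int failure;
    final boolean finishedInTime;

    ConcurrentRunStats(int success, int failure, boolean finishedInTime) {
        this.success = success;
        this.failure = failure;
        this.finishedInTime = finishedInTime;
    }

    /** Success rate in percent; 0.0 when nothing was processed. */
    double successRate() {
        int total = success + failure;
        return total == 0 ? 0.0 : 100.0 * success / total;
    }

    static ConcurrentRunStats run(int threads, int tasksPerThread,
                                  BooleanSupplier task, long timeoutSeconds) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        CountDownLatch latch = new CountDownLatch(threads);
        AtomicInteger ok = new AtomicInteger();
        AtomicInteger failed = new AtomicInteger();
        for (int i = 0; i < threads; i++) {
            executor.submit(() -> {
                try {
                    for (int j = 0; j < tasksPerThread; j++) {
                        try {
                            if (task.getAsBoolean()) {
                                ok.incrementAndGet();
                            } else {
                                failed.incrementAndGet();
                            }
                        } catch (RuntimeException e) {
                            // A thrown exception counts as one failed task
                            failed.incrementAndGet();
                        }
                    }
                } finally {
                    // Always release the latch, even if a worker dies early
                    latch.countDown();
                }
            });
        }
        boolean done;
        try {
            done = latch.await(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            done = false;
        } finally {
            executor.shutdownNow();
        }
        return new ConcurrentRunStats(ok.get(), failed.get(), done);
    }
}
```

A test could then assert on `finishedInTime` first and only afterwards on `successRate()`, so a timeout is reported as a timeout rather than as a misleading success rate.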