京东 JoyAgent 有多强?ReAct+Plan-Executor 双模式拆解,搞定多智能体落地最后一公里
JoyAgent-JDGenie是京东开源的轻量化多智能体框架,采用SSE流式响应实现实时交互。其核心代理协作机制包括:1)BaseAgent提供统一执行骨架;2)ReAct模式通过"思考-行动-观察"循环反馈驱动任务;3)Plan-Executor模式先拆解目标再执行;4)异步线程池处理工具调用。框架支持策略模式的分发机制和动态提示词注入,具有较好的扩展性。当前存在提示词管理
- 项目名称: JoyAgent-JDGenie
- 开发公司: 京东
- 开源性质: 完全开源
- 定位: 解决快速构建多智能体产品的最后一公里问题
- 特点: 轻量化、通用型、开箱即用
之前我们得出结论,从代理协作机制、工具系统集成、实时通信三个核心维度深入分析该项目是如何运作的。本文将围绕代理协作机制展开研究,共同探讨JoyAgent的设计理念!
那么,我们以“最简单的开始”来走进JoyAgent!
用户请求 → MultiAgentService → handleMultiAgentRequest
↓
智能体选择 (Planning/Executor/ReAct/Summary)
↓【异步线程池工具】【深度 | 非深度存在差异】
BaseAgent.run() → think() → act() → observe()
↓
ToolCollection.execute() → 工具执行 → 结果返回
↓
Memory.update() → 状态更新 → 下一步决策
↓
流式响应 → 前端展示 → 任务完成



明显,我们需要研究的接口(控制层)就是 gpt/queryAgentStreamIncr
queryAgentStreamIncr——一切的入口
通过 SSE 协议向客户端持续推送 GPT 或多 Agent 的流式输出,实现实时显示的效果。
/**
* 处理Agent流式增量查询请求,返回SSE事件流
* @param params 查询请求参数对象,包含GPT查询所需信息
* @return 返回SSE事件发射器,用于流式传输增量响应结果
*/
@RequestMapping(value = "/web/api/v1/gpt/queryAgentStreamIncr", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter queryAgentStreamIncr(@RequestBody GptQueryReq params) {
return gptProcessService.queryMultiAgentIncrStream(params);
}
produces = MediaType.TEXT_EVENT_STREAM_VALUE:指定响应的 MIME 类型为text/event-stream,这是 SSE 的标准类型,客户端会保持长连接,服务器可以持续推送数据。SseEmitter:Spring 提供的一个类,用于支持 Server-Sent Events,可以从服务器向客户端异步发送事件流。
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class GptQueryReq {
// 自然语言查询
private String query;
private String sessionId;
private String requestId;
// 是否开启深度思考
private Integer deepThink;
/**
* 前端传入交付物格式:html(网页模式),docs(文档模式), table(表格模式)
*/
private String outputStyle;
private String traceId;
private String user;
}
queryMultiAgentIncrStream——查询多智能体增量流的方法。
@Slf4j
@Service
public class GptProcessServiceImpl implements IGptProcessService {
@Autowired
private IMultiAgentService multiAgentService;
/**
* 查询多智能体增量流的方法。
*
* @param req GptQueryReq类型的请求对象,包含查询所需的各种参数
* @return 返回一个SseEmitter对象,用于服务器发送事件流
*/
public SseEmitter queryMultiAgentIncrStream(GptQueryReq req) {
// 设置SseEmitter的超时时间为1小时
long timeoutMillis = TimeUnit.HOURS.toMillis(1);
// 设置请求中的用户为 "genie"
req.setUser("genie");
// 如果请求中的深度思考参数deepThink为null,则设置为0,否则保持原值
req.setDeepThink(req.getDeepThink() == null? 0: req.getDeepThink());
String traceId = ChateiUtils.getRequestId(req);
req.setTraceId(traceId);
// 使用设置的超时时间和追踪ID构建SseEmitter对象
final SseEmitter emitter = SseUtil.build(timeoutMillis, req.getTraceId());
// 调用多智能体服务的方法,进行基于请求和SseEmitter的搜索操作
multiAgentService.searchForAgentRequest(req, emitter);
// 记录请求信息到日志,便于后续查看和调试
log.info("queryMultiAgentIncrStream GptQueryReq request:{}", req);
// 返回构建好并使用过的SseEmitter对象
return emitter;
}
}
searchForAgentRequest——处理智能体请求
@Slf4j
@Component
// 多智能体服务实现类,实现IMultiAgentService接口
public class MultiAgentServiceImpl implements IMultiAgentService {
@Autowired
private GenieConfig genieConfig;
@Autowired
private Map<AgentType, AgentResponseHandler> handlerMap;
// 处理智能体请求
@Override
public AutoBotsResult searchForAgentRequest(GptQueryReq gptQueryReq, SseEmitter sseEmitter) {
AgentRequest agentRequest = buildAgentRequest(gptQueryReq);
log.info("{} start handle Agent request: {}", gptQueryReq.getRequestId(), JSON.toJSONString(agentRequest));
try {
handleMultiAgentRequest(agentRequest, sseEmitter);
} catch (Exception e) {
log.error("{}, error in requestMultiAgent, deepThink: {}, errorMsg: {}", gptQueryReq.getRequestId(), gptQueryReq.getDeepThink(), e.getMessage(), e);
throw e;
} finally {
log.info("{}, agent.query.web.singleRequest end, requestId: {}", gptQueryReq.getRequestId(), JSON.toJSONString(gptQueryReq));
}
// 将AgentRequest转换为AutoBotsResult并设置状态为loading
return ChateiUtils.toAutoBotsResult(agentRequest, AutoBotsResultStatus.loading.name());
}
// 处理多智能体请求的具体方法
public void handleMultiAgentRequest(AgentRequest autoReq, SseEmitter sseEmitter) {
long startTime = System.currentTimeMillis();
Request request = buildHttpRequest(autoReq);
log.info("{} agentRequest:{}", autoReq.getRequestId(), JSON.toJSONString(request));
// 创建OkHttpClient,并设置各种超时时间
OkHttpClient client = new OkHttpClient.Builder()
.connectTimeout(60, TimeUnit.SECONDS)
.readTimeout(genieConfig.getSseClientReadTimeout(), TimeUnit.SECONDS)
.writeTimeout(1800, TimeUnit.SECONDS)
.callTimeout(genieConfig.getSseClientConnectTimeout(), TimeUnit.SECONDS)
.build();
// 异步执行HTTP请求
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
log.error("onFailure {}", e.getMessage(), e);
}
@Override
public void onResponse(Call call, Response response) {
List<AgentResponse> agentRespList = new ArrayList<>();
EventResult eventResult = new EventResult();
ResponseBody responseBody = response.body();
if (responseBody == null) {
log.error("{} auto agent empty response body", autoReq.getRequestId());
return;
}
try {
if (!response.isSuccessful()) {
log.error("{}, response body is failed: {}", autoReq.getRequestId(), responseBody.string());
return;
}
String line;
BufferedReader reader = new BufferedReader(
new InputStreamReader(responseBody.byteStream())
);
while ((line = reader.readLine()) != null) {
if (!line.startsWith("data:")) {
continue;
}
String data = line.substring(5);
if (data.equals("[DONE]")) {
// 接收到[DONE]标记时记录日志并结束循环
log.info("{} data equals with [DONE] {}:", autoReq.getRequestId(), data);
break;
}
if (data.startsWith("heartbeat")) {
// 接收到心跳数据时构建心跳结果并发送
GptProcessResult result = buildHeartbeatData(autoReq.getRequestId());
sseEmitter.send(result);
log.info("{} heartbeat-data: {}", autoReq.getRequestId(), data);
continue;
}
// 记录接收到来自自动控制器的数据日志
log.info("{} recv from autocontroller: {}", autoReq.getRequestId(), data);
// 将接收到的数据解析为AgentResponse
AgentResponse agentResponse = JSON.parseObject(data, AgentResponse.class);
AgentType agentType = AgentType.fromCode(autoReq.getAgentType());
AgentResponseHandler handler = handlerMap.get(agentType);
// 使用处理器处理响应并构建GptProcessResult
GptProcessResult result = handler.handle(autoReq, agentResponse, agentRespList, eventResult);
// 发送处理结果
sseEmitter.send(result);
if (result.isFinished()) {
// 任务完成时记录任务执行时间并完成SseEmitter
log.info("{} task total cost time:{}ms", autoReq.getRequestId(), System.currentTimeMillis() - startTime);
sseEmitter.complete();
}
}
} catch (Exception e) {
log.error("", e);
}
}
});
}
private Request buildHttpRequest(AgentRequest autoReq) {
String reqId = autoReq.getRequestId();
autoReq.setRequestId(autoReq.getRequestId());
String url = "http://127.0.0.1:8080/AutoAgent";
RequestBody body = RequestBody.create(
MediaType.parse("application/json"),
JSONObject.toJSONString(autoReq)
);
autoReq.setRequestId(reqId);
return new Request.Builder().url(url).post(body).build();
}
private GptProcessResult buildDefaultAutobotsResult(AgentRequest autoReq, String errMsg) {
GptProcessResult result = new GptProcessResult();
boolean isRouter = AgentType.ROUTER.getValue().equals(autoReq.getAgentType());
if (isRouter) {
result.setStatus("success");
result.setFinished(true);
result.setResponse(errMsg);
result.setTraceId(autoReq.getRequestId());
} else {
result.setResultMap(new HashMap<>());
result.setStatus("failed");
result.setFinished(true);
result.setErrorMsg(errMsg);
}
return result;
}
private AgentRequest buildAgentRequest(GptQueryReq req) {
AgentRequest request = new AgentRequest();
request.setRequestId(req.getTraceId());
request.setErp(req.getUser());
request.setQuery(req.getQuery());
request.setAgentType(req.getDeepThink() == 0 ? 5 : 3);
request.setSopPrompt(request.getAgentType() == 3 ? genieConfig.getGenieSopPrompt() : "");
request.setBasePrompt(request.getAgentType() == 5 ? genieConfig.getGenieBasePrompt() : "");
request.setIsStream(true);
request.setOutputStyle(req.getOutputStyle());
return request;
}
// 构建心跳数据的GptProcessResult
private GptProcessResult buildHeartbeatData(String requestId) {
GptProcessResult result = new GptProcessResult();
result.setFinished(false);
result.setStatus("success");
result.setResponseType(ResponseTypeEnum.text.name());
result.setResponse("");
result.setResponseAll("");
result.setUseTimes(0);
result.setUseTokens(0);
result.setReqId(requestId);
result.setPackageType("heartbeat");
result.setEncrypted(false);
return result;
}
}

AutoAgent—执行智能体调度

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class AgentRequest {
private String requestId;
private String erp;
private String query;
private Integer agentType;
private String basePrompt;
private String sopPrompt;
private Boolean isStream;
private List<Message> messages;
private String outputStyle; // 交付物产出格式:html(网页模式), docs(文档模式), table(表格模式)
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public static class Message {
private String role;
private String content;
private String commandCode;
private List<FileInformation> uploadFile;
private List<FileInformation> files;
}
}
我们能够看到这里是通过线程池异步处理AI请求,当前的线程池数量通过源码能够看到相关的参数设置。

// 创建一个线程池执行器对象 executor
// 核心线程数为 poolSize (100)
// 最大线程数为 maxPoolSize (1000)
// 线程存活时间为 60000 毫秒(60 秒),即当线程数超过核心线程数时,多余的空闲线程在存活 60000 毫秒后会被销毁
// 使用 SynchronousQueue 作为任务队列,SynchronousQueue 没有容量,每个插入操作必须等待另一个线程的移除操作,反之亦然。
// 使用自定义的线程工厂 threadFactory 来创建新线程,线程工厂可以控制线程的一些属性,如线程名、线程优先级等。
// 使用自定义的拒绝策略处理器 handler 来处理当线程池无法接受新任务时的情况
executor = new ThreadPoolExecutor(poolSize, maxPoolSize, 60000L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), threadFactory, handler);
// 执行调度引擎
ThreadUtil.execute(() -> {
try {
Printer printer = new SSEPrinter(emitter, request, request.getAgentType());
AgentContext agentContext = AgentContext.builder()
.requestId(request.getRequestId())
.sessionId(request.getRequestId())
.printer(printer)
.query(request.getQuery())
.task("")
.dateInfo(DateUtil.CurrentDateInfo())
.productFiles(new ArrayList<>())
.taskProductFiles(new ArrayList<>())
.sopPrompt(request.getSopPrompt())
.basePrompt(request.getBasePrompt())
.agentType(request.getAgentType())
.isStream(Objects.nonNull(request.getIsStream()) ? request.getIsStream() : false)
.templateType("dataAgent".equals(request.getOutputStyle()) ? "fix" : "empty")
.build();
// 构建工具列表
agentContext.setToolCollection(buildToolCollection(agentContext, request));
// 根据数据类型获取对应的处理器
AgentHandlerService handler = agentHandlerFactory.getHandler(agentContext, request);
// 执行处理逻辑
handler.handle(agentContext, request);
// 关闭连接
emitter.complete();
} catch (Exception e) {
log.error("{} auto agent error", request.getRequestId(), e);
}
});
getBasePrompt()——此时Agent提示词
# 要求
- 需要结合互联网知识来完成用户的问题时,需要先试用搜索工具搜索最新的信息
- 如果回答用户问题时,如果用户没有指定输出格式,尽量使用HTML网页报告输出网页版报告, 如果用户指定了输出格式,则按用户指定的格式输出。
- 如果用户指定“输出表格”、“结构化展示”、“结构化输出”或者“抽取相关指标”,尽量使用excel或者csv输出数据;如果已经生成了相应的Excel、csv文件,说明已经满足了“结构化展示”、“结构化输出”等要求。
- 默认工作语言: **中文**
- 如果明确提供,则使用用户指定的语言作为工作语言
- 所有思维和响应必须使用工作语言
- 优先选择合适 的工具完成任务,不要重复使用相同工具进行尝试
# 解决问题的流程
请使用交替进行的“思考(Thought)、行动(Action)、观察(Observation)"三个步骤来系统地解决回答任务。
思考:基于当前获得的信息进行推理和反思,明确下一步行动的目标,使用平文本输出,不超过200字。
行动:用于表示需要调用的工具,每一步行动必须是以下两种之一:
1、工具调用 [Function Calling]:根据任务需要,确定调用工具。如果用户问题是从上传的文件中直接抽取相关指标,请不要调用code_interpreter工具。
2、Finish[答案]:得出明确答案后使用此操作,返回答案并终止任务。
观察:记录前一步行动的结果。
你可以进行多轮推理和检索,但必须严格按照上述格式进行操作,尤其是每一步“行动”只能使用上述两种类型之一。
# 示例
## 问题 1:
科罗拉多造山带东部区域延伸到的区域的海拔范围是多少?
思考:了解科罗拉多造山带东部区域延伸到的区域的海拔范围,我需要先明确科罗拉多造山带东部区域延伸到哪些区域,再查找这些区域的海拔范围。第一步,我将通过网络搜索获取科罗拉多造山带东部区域延伸到的区域的相关信息。
行动:搜索[“科罗拉多造山带概况,特别是东部延伸区域的信息”]
观察:科罗拉多造山带是科罗拉多及其周边地区造山运动的一段。
思考:通过 搜索得知,科罗拉多造山带东部区域延伸至高平原。接下来,我需要搜索高平原的海拔范围。
行动:搜索 [高平原的海拔范围]
观察:科罗拉多造山带东部区域延伸至高平原,高平原是大平原的一个分区,其海拔从 1800 到 7000 英尺(550 到 2130 米)不等。
思考:我已经得到了答案[1800 到 7000 英尺],可以结束任务。
行动:Finish
## 问题2:
分析一下三大电商平台京东、淘宝、拼多多 的优劣势
思考:分析京东、淘宝、拼多多三大电商平台的优劣势,我计划先分别搜索各平台优势和劣势的相关信息,再进行整理和分析,最后将结果保存为 HTML 文件。第一步,我需要使用 搜索 工具搜索京东、淘宝、拼多多优势和劣势的相关内容。
行动:搜索 [搜索京东、淘宝、拼多多优势和劣势的相关内容]
观察:搜索结果已经保存到文件中。
思考:已获取到京东、淘宝、拼多多优势和劣势的 相关信息,接下来我将对这些信息进行整理和分析,形成一份详细的分析报告,并使用工具将输出 HTML 报告文件。
行动:执行 HTML 报告工具
观察:已获取到京东、淘宝、拼多多优势和劣势的相关信息,接下来我将对这些信息进行整理和分析,形成一份详细的分析报告。
思考:我已经得到了答案,可以结束任务。
行动:Finish
AgentHandlerService——策略模式-开启深度思考的密钥

React 模式和 Plan-Executor 模式的对比
表格 还在加载中,请等待加载完成后再尝试复制
- React 模式是 “反馈驱动”:流程的起点是 “当前结果是否达标”,核心是 “根据反馈调整动作”(比如 “看到文案有语病→修改语病→再检查是否还有语病”);
- Plan-Executor 模式是 “目标驱动”:流程的起点是 “用户需求目标”,核心是 “先拆解目标为步骤,再落地步骤”(比如 “用户要一篇报告→拆解为‘查资料→列提纲→写正文’→逐一执行”)。
那么接下来针对两种模式的代码实现进行分析
public interface AgentHandlerService {
/**
* 处理Agent请求
*/
String handle(AgentContext context, AgentRequest request);
/**
* 进入handler条件
*/
Boolean support(AgentContext context, AgentRequest request);
}
@Component
public class AgentHandlerFactory {
private final Map<String, AgentHandlerService> handlerMap = new ConcurrentHashMap<>();
// 构造函数注入所有DataHandler实现
@Autowired
public AgentHandlerFactory(List<AgentHandlerService> handlers) {
// 初始化处理器映射
for (AgentHandlerService handler : handlers) {
// 可根据Handler的supports方法或自定义注解来注册
handlerMap.put(handler.getClass().getSimpleName().toLowerCase(), handler);
}
}
// 根据类型获取处理器
public AgentHandlerService getHandler(AgentContext context, AgentRequest request) {
if (Objects.isNull(context) || Objects.isNull(request)) {
return null;
}
// 方法1:通过supports方法匹配
for (AgentHandlerService handler : handlerMap.values()) {
if (handler.support(context, request)) {
return handler;
}
}
return null;
}
}
ReactHandlerImpl——React 模式
@Component
public class ReactHandlerImpl implements AgentHandlerService {
@Override
public String handle(AgentContext agentContext, AgentRequest request) {
// 创建ReActAgent执行器实例,传入智能体上下文
ReActAgent executor = new ReactImplAgent(agentContext);
// 创建SummaryAgent总结器实例,传入智能体上下文
SummaryAgent summary = new SummaryAgent(agentContext);
// 替换总结器系统提示中的占位符 {{query}} 为实际的请求查询内容
summary.setSystemPrompt(summary.getSystemPrompt().replace("{{query}}", request.getQuery()));
// 执行ReActAgent,传入请求的查询内容
executor.run(request.getQuery());
// 使用SummaryAgent总结任务结果,传入执行器内存中的消息和请求查询内容
TaskSummaryResult result = summary.summaryTaskResult(executor.getMemory().getMessages(), request.getQuery());
// 创建一个用于存储任务结果的Map
Map<String, Object> taskResult = new HashMap<>();
// 将任务总结信息放入任务结果Map中
taskResult.put("taskSummary", result.getTaskSummary());
// 如果任务总结结果中的文件列表为空
if (CollectionUtils.isEmpty(result.getFiles())) {
// 并且智能体上下文中的产品文件列表不为空
if (!CollectionUtils.isEmpty(agentContext.getProductFiles())) {
// 获取智能体上下文中的产品文件列表
List<File> fileResponses = agentContext.getProductFiles();
// 过滤掉标记为内部文件的中间搜索结果文件
fileResponses.removeIf(file -> Objects.nonNull(file) && file.getIsInternalFile());
// 反转文件列表顺序
Collections.reverse(fileResponses);
// 将处理后的文件列表放入任务结果Map中
taskResult.put("fileList", fileResponses);
}
} else {
// 如果任务总结结果中的文件列表不为空,直接将其放入任务结果Map中
taskResult.put("fileList", result.getFiles());
}
// 通过智能体上下文的打印机发送 "result" 及任务结果
agentContext.getPrinter().send("result", taskResult);
// 返回空字符串,这里可能根据具体业务逻辑决定返回值的含义,当前逻辑返回空字符串
return "";
}
// 判断该处理器是否支持给定的智能体上下文和请求
@Override
public Boolean support(AgentContext agentContext, AgentRequest request) {
return AgentType.REACT.getValue().equals(request.getAgentType());
}
}
PlanSolveHandlerImpl——Plan-Executor 模式
@Slf4j
@Component
public class PlanSolveHandlerImpl implements AgentHandlerService {
@Autowired
private GenieConfig genieConfig;
@Autowired
private SopRecallService sopRecallService;
@Override
public String handle(AgentContext agentContext, AgentRequest request) {
// 执行SOP召回逻辑
handleSopRecall(agentContext, request);
// 创建规划智能体实例,传入智能体上下文
PlanningAgent planning = new PlanningAgent(agentContext);
// 创建执行智能体实例,传入智能体上下文
ExecutorAgent executor = new ExecutorAgent(agentContext);
// 创建总结智能体实例,传入智能体上下文
SummaryAgent summary = new SummaryAgent(agentContext);
// 替换总结智能体系统提示中的占位符 {{query}} 为实际的请求查询内容
summary.setSystemPrompt(summary.getSystemPrompt().replace("{{query}}", request.getQuery()));
// 运行规划智能体,获取规划结果
String planningResult = planning.run(agentContext.getQuery());
int stepIdx = 0;
// 最大步骤数,从配置中获取
int maxStepNum = genieConfig.getPlannerMaxSteps();
// 循环执行任务,直到达到最大步骤数
while (stepIdx <= maxStepNum) {
// 将规划结果按 <sep> 分割,并为每个任务添加前缀 "你的任务是:"
List<String> planningResults = Arrays.stream(planningResult.split("<sep>"))
.map(task -> "你的任务是:" + task)
.collect(Collectors.toList());
String executorResult;
// 清空任务产品文件列表
agentContext.getTaskProductFiles().clear();
// 如果只有一个规划任务
if (planningResults.size() == 1) {
// 运行执行智能体处理该任务
executorResult = executor.run(planningResults.get(0));
} else {
// 用于存储每个子任务的执行结果
Map<String, String> tmpTaskResult = new ConcurrentHashMap<>();
// 创建CountDownLatch,用于等待所有子任务执行完成
CountDownLatch taskCount = ThreadUtil.getCountDownLatch(planningResults.size());
// 记录当前执行智能体内存大小,用于后续合并内存信息
int memoryIndex = executor.getMemory().size();
// 用于存储子执行智能体实例
List<ExecutorAgent> slaveExecutors = new ArrayList<>();
for (String task : planningResults) {
// 创建子执行智能体实例,传入智能体上下文
ExecutorAgent slaveExecutor = new ExecutorAgent(agentContext);
// 设置子执行智能体状态与主执行智能体相同
slaveExecutor.setState(executor.getState());
// 将主执行智能体的内存消息添加到子执行智能体
slaveExecutor.getMemory().addMessages(executor.getMemory().getMessages());
slaveExecutors.add(slaveExecutor);
// 异步执行子任务
ThreadUtil.execute(() -> {
// 运行子执行智能体处理任务
String taskResult = slaveExecutor.run(task);
// 将子任务执行结果存入临时结果Map
tmpTaskResult.put(task, taskResult);
// 减少CountDownLatch计数
taskCount.countDown();
});
}
// 等待所有子任务执行完成
ThreadUtil.await(taskCount);
for (ExecutorAgent slaveExecutor : slaveExecutors) {
// 将子执行智能体新增的内存消息合并到主执行智能体
for (int i = memoryIndex; i < slaveExecutor.getMemory().size(); i++) {
executor.getMemory().addMessage(slaveExecutor.getMemory().get(i));
}
// 清空子执行智能体内存
slaveExecutor.getMemory().clear();
// 设置主执行智能体状态与子执行智能体相同
executor.setState(slaveExecutor.getState());
}
// 将所有子任务执行结果拼接成字符串
executorResult = String.join("\n", tmpTaskResult.values());
}
// 使用执行结果再次运行规划智能体,获取新的规划结果
planningResult = planning.run(executorResult);
// 如果规划结果为 "finish",表示任务成功结束
if ("finish".equals(planningResult)) {
// 总结任务
TaskSummaryResult result = summary.summaryTaskResult(executor.getMemory().getMessages(), request.getQuery());
// 创建用于存储任务结果的Map
Map<String, Object> taskResult = new HashMap<>();
// 将任务总结信息放入任务结果Map
taskResult.put("taskSummary", result.getTaskSummary());
// 如果任务总结结果中的文件列表为空
if (CollectionUtils.isEmpty(result.getFiles())) {
// 并且智能体上下文中的产品文件列表不为空
if (!CollectionUtils.isEmpty(agentContext.getProductFiles())) {
// 获取智能体上下文中的产品文件列表
List<File> fileResponses = agentContext.getProductFiles();
// 过滤掉标记为内部文件的中间搜索结果文件
fileResponses.removeIf(file -> Objects.nonNull(file) && file.getIsInternalFile());
// 反转文件列表顺序
Collections.reverse(fileResponses);
// 将处理后的文件列表放入任务结果Map
taskResult.put("fileList", fileResponses);
}
} else {
// 如果任务总结结果中的文件列表不为空,直接将其放入任务结果Map
taskResult.put("fileList", result.getFiles());
}
// 通过智能体上下文的打印机发送 "result" 及任务结果
agentContext.getPrinter().send("result", taskResult);
// 跳出循环
break;
}
// 如果规划智能体或执行智能体状态为IDLE(空闲),表示达到最大迭代次数,任务终止
if (planning.getState() == AgentState.IDLE || executor.getState() == AgentState.IDLE) {
agentContext.getPrinter().send("result", "达到最大迭代次数,任务终止。");
break;
}
// 如果规划智能体或执行智能体状态为ERROR(错误),表示任务执行异常,任务终止
if (planning.getState() == AgentState.ERROR || executor.getState() == AgentState.ERROR) {
agentContext.getPrinter().send("result", "任务执行异常,请联系管理员,任务终止。");
break;
}
// 步骤索引加1
stepIdx++;
}
// 返回空字符串,这里可能根据具体业务逻辑决定返回值的含义,当前逻辑返回空字符串
return "";
}
@Override
public Boolean support(AgentContext agentContext, AgentRequest request) {
return AgentType.PLAN_SOLVE.getValue().equals(request.getAgentType());
}
/**
* 处理SOP召回逻辑
*
* @param agentContext 代理上下文
* @param request 请求对象
*/
private void handleSopRecall(AgentContext agentContext, AgentRequest request) {
try {
log.info("{} 开始执行SOP召回", request.getRequestId());
// 调用SOP召回服务
SopRecallResponse sopResponse = sopRecallService.sopRecall(
request.getRequestId(),
request.getQuery()
);
// 检查召回结果
if (sopRecallService.isValidSopResult(sopResponse)) {
String sopContent = sopResponse.getData().getChoosed_sop_string();
String sopMode = sopResponse.getData().getSop_mode();
log.info("{} SOP召回成功,模式:{},内容长度:{}",
request.getRequestId(), sopMode, sopContent.length());
// 注入sopPrompt
String sopPrompt = agentContext.getSopPrompt().replace("{{sop}}", sopContent);
agentContext.setSopPrompt(sopPrompt);
} else {
log.warn("{} SOP召回失败或结果无效", request.getRequestId());
}
} catch (Exception e) {
log.error("{} SOP召回处理异常", request.getRequestId(), e);
// SOP召回失败不影响主流程,继续执行
}
}
}
此时,我们再回过头来看 源码中关于Agent实现的机制!

BaseAgent 代理基类 - 管理代理状态和执行的基础类
/**
* 代理基类 - 管理代理状态和执行的基础类
*/
@Slf4j
@Data
@Accessors(chain = true)
public abstract class BaseAgent {
// 核心属性
private String name;
private String description;
private String systemPrompt;
private String nextStepPrompt;
public ToolCollection availableTools = new ToolCollection();
private Memory memory = new Memory();
protected LLM llm;
protected AgentContext context;
// 执行控制
private AgentState state = AgentState.IDLE;
private int maxSteps = 10;
private int currentStep = 0;
private int duplicateThreshold = 2;
// emitter
Printer printer;
// digital employee prompt
private String digitalEmployeePrompt;
/**
* 执行单个步骤
*/
public abstract String step();
/**
* 运行代理主循环
*/
public String run(String query) {
setState(AgentState.IDLE);
if (!query.isEmpty()) {
updateMemory(RoleType.USER, query, null);
}
List<String> results = new ArrayList<>();
try {
while (currentStep < maxSteps && state != AgentState.FINISHED) {
currentStep++;
log.info("{} {} Executing step {}/{}", context.getRequestId(), getName(), currentStep, maxSteps);
String stepResult = step();
results.add(stepResult);
}
if (currentStep >= maxSteps) {
currentStep = 0;
state = AgentState.IDLE;
results.add("Terminated: Reached max steps (" + maxSteps + ")");
}
} catch (Exception e) {
state = AgentState.ERROR;
throw e;
}
return results.isEmpty() ? "No steps executed" : results.get(results.size() - 1);
}
/**
* 更新代理记忆
*/
public void updateMemory(RoleType role, String content, String base64Image, Object... args) {
Message message;
switch (role) {
case USER:
message = Message.userMessage(content, base64Image);
break;
case SYSTEM:
message = Message.systemMessage(content, base64Image);
break;
case ASSISTANT:
message = Message.assistantMessage(content, base64Image);
break;
case TOOL:
message = Message.toolMessage(content, (String) args[0], base64Image);
break;
default:
throw new IllegalArgumentException("Unsupported role type: " + role);
}
memory.addMessage(message);
}
public String executeTool(ToolCall command) {
if (command == null || command.getFunction() == null || command.getFunction().getName() == null) {
return "Error: Invalid function call format";
}
String name = command.getFunction().getName();
try {
// 解析参数
ObjectMapper mapper = new ObjectMapper();
Object args = mapper.readValue(command.getFunction().getArguments(), Object.class);
// 执行工具
Object result = availableTools.execute(name, args);
log.info("{} execute tool: {} {} result {}", context.getRequestId(), name, args, result);
// 格式化结果
if (Objects.nonNull(result)) {
return (String) result;
}
} catch (Exception e) {
log.error("{} execute tool {} failed ", context.getRequestId(), name, e);
}
return "Tool" + name + " Error.";
}
/**
* 并发执行多个工具调用命令并返回执行结果
*
* @param commands 工具调用命令列表
* @return 返回工具执行结果映射,key为工具ID,value为执行结果
*/
public Map<String, String> executeTools(List<ToolCall> commands) {
Map<String, String> result = new ConcurrentHashMap<>();
CountDownLatch taskCount = ThreadUtil.getCountDownLatch(commands.size());
for (ToolCall tooCall : commands) {
ThreadUtil.execute(() -> {
String toolResult = executeTool(tooCall);
result.put(tooCall.getId(), toolResult);
taskCount.countDown();
});
}
ThreadUtil.await(taskCount);
return result;
}
}
ReAct代理 - 基于ReAct模式的智能代理
/**
* ReAct代理 - 基于ReAct模式的智能代理
*/
@Data
@Slf4j
@EqualsAndHashCode(callSuper = true)
public abstract class ReActAgent extends BaseAgent {
/**
* 思考过程
*/
public abstract boolean think();
/**
* 执行行动
*/
public abstract String act();
/**
* 执行单个步骤
*/
@Override
public String step() {
boolean shouldAct = think();
if (!shouldAct) {
return "Thinking complete - no action needed";
}
return act();
}
public void generateDigitalEmployee(String task) {
// 1、参数检查
if (StringUtils.isEmpty(task)) {
return;
}
try {
// 2. 构建系统消息(提取为独立方法)
String formattedPrompt = formatSystemPrompt(task);
Message userMessage = Message.userMessage(formattedPrompt, null);
// 3. 调用LLM并处理结果
CompletableFuture<String> summaryFuture = getLlm().ask(
context,
Collections.singletonList(userMessage),
Collections.emptyList(),
false,
0.01);
// 4. 解析响应
String llmResponse = summaryFuture.get();
log.info("requestId: {} task:{} generateDigitalEmployee: {}", context.getRequestId(), task, llmResponse);
JSONObject jsonObject = parseDigitalEmployee(llmResponse);
if (jsonObject != null) {
log.info("requestId:{} generateDigitalEmployee: {}", context.getRequestId(), jsonObject);
context.getToolCollection().updateDigitalEmployee(jsonObject);
context.getToolCollection().setCurrentTask(task);
// 更新 availableTools 添加数字员工
availableTools = context.getToolCollection();
} else {
log.error("requestId: {} generateDigitalEmployee failed", context.getRequestId());
}
} catch (Exception e) {
log.error("requestId: {} in generateDigitalEmployee failed,", context.getRequestId(), e);
}
}
// 解析数据员工大模型响应
private JSONObject parseDigitalEmployee(String response) {
/**
* 格式一:
* ```json
* {
* "file_tool": "市场洞察专员"
* }
* ```
* 格式二:
* {
* "file_tool": "市场洞察专员"
* }
*/
if (StringUtils.isBlank(response)) {
return null;
}
String jsonString = response;
String regex = "```\\s*json([\\d\\D]+?)```";
Pattern pattern = Pattern.compile(regex);
Matcher matcher = pattern.matcher(response);
if (matcher.find()) {
String temp = matcher.group(1).trim();
if (!jsonString.isEmpty()) {
jsonString = temp;
}
}
try {
return JSON.parseObject(jsonString);
} catch (Exception e) {
log.error("requestId: {} in parseDigitalEmployee error:", context.getRequestId(), e);
return null;
}
}
// 提取系统提示格式化逻辑
private String formatSystemPrompt(String task) {
String digitalEmployeePrompt = getDigitalEmployeePrompt();
if (digitalEmployeePrompt == null) {
throw new IllegalStateException("System prompt is not configured");
}
StringBuilder toolPrompt = new StringBuilder();
for (BaseTool tool : context.getToolCollection().getToolMap().values()) {
toolPrompt.append(String.format("工具名:%s 工具描述:%s\n", tool.getName(), tool.getDescription()));
}
// 替换占位符
return digitalEmployeePrompt
.replace("{{task}}", task)
.replace("{{ToolsDesc}}", toolPrompt.toString())
.replace("{{query}}", context.getQuery());
}
}
Summary代理-基于上下文信息进行处理以及类似执行任务产生结果的行为 (不属于典型模式!)
// 定义SummaryAgent类,继承自BaseAgent,用于总结任务结果
@Data
@Slf4j
@EqualsAndHashCode(callSuper = true)
public class SummaryAgent extends BaseAgent {
// 请求ID
private String requestId;
// 消息大小限制
private Integer messageSizeLimit;
// 日志标识
public static final String logFlag = "summaryTaskResult";
// 构造函数,初始化SummaryAgent
public SummaryAgent(AgentContext context) {
// 获取Spring应用上下文
ApplicationContext applicationContext = SpringContextHolder.getApplicationContext();
// 从Spring应用上下文中获取GenieConfig实例
GenieConfig genieConfig = applicationContext.getBean(GenieConfig.class);
// 设置系统提示,从GenieConfig中获取总结系统提示
setSystemPrompt(genieConfig.getSummarySystemPrompt());
// 设置上下文
setContext(context);
// 设置请求ID
setRequestId(context.getRequestId());
// 根据上下文的智能体类型设置LLM模型名称
setLlm(new LLM(context.getAgentType() == 3? genieConfig.getPlannerModelName() : genieConfig.getReactModelName(), ""));
// 设置消息大小限制
setMessageSizeLimit(genieConfig.getMessageSizeLimit());
}
/**
* 执行单个步骤,当前返回空字符串,可能在子类中具体实现
*/
public String step() {
return "";
}
// 构造文件信息,用于生成文件相关的描述字符串
private String createFileInfo() {
// 获取上下文中的产品文件列表
List<File> files = context.getProductFiles();
// 如果文件列表为空
if (CollectionUtils.isEmpty(files)) {
// 记录日志,表明上下文中未找到文件
log.info("requestId: {} no files found in context", requestId);
return "";
}
// 记录日志,打印请求ID、日志标识和产品文件列表
log.info("requestId: {} {} product files:{}", requestId, logFlag, files);
// 过滤掉内部文件,并将文件名和描述拼接成字符串,每个文件占一行
String result = files.stream()
.filter(file ->!file.getIsInternalFile())
.map(file -> file.getFileName() + " : " + file.getDescription())
.collect(Collectors.joining("\n"));
// 记录日志,打印生成的文件信息
log.info("requestId: {} generated file info: {}", requestId, result);
return result;
}
// 提取系统提示格式化逻辑,根据任务历史、文件信息和查询替换系统提示中的占位符
private String formatSystemPrompt(String taskHistory, String query) {
// 获取系统提示
String systemPrompt = getSystemPrompt();
// 如果系统提示为空
if (systemPrompt == null) {
// 记录错误日志,表明系统提示未配置
log.error("requestId: {} {} systemPrompt is null", requestId, logFlag);
throw new IllegalStateException("System prompt is not configured");
}
// 替换占位符并返回格式化后的系统提示
return systemPrompt
.replace("{{taskHistory}}", taskHistory)
.replace("{{fileNameDesc}}", createFileInfo())
.replace("{{query}}", query);
}
// 提取消息创建逻辑,创建系统消息
private Message createSystemMessage(String content) {
// 创建用户消息,这里假设Message.userMessage方法用于创建用户消息
return Message.userMessage(content, null);
}
/**
* 解析LLM响应并处理文件关联,根据LLM响应生成任务总结结果
*/
private TaskSummaryResult parseLlmResponse(String llmResponse) {
// 如果LLM响应为空
if (StringUtils.isEmpty(llmResponse)) {
// 记录错误日志,表明响应为空导致模式匹配失败
log.error("requestId: {} pattern matcher failed for response is null", requestId);
}
// 使用$$$分割LLM响应
String[] parts1 = llmResponse.split("\\$\\$\\$");
// 如果分割后的数组长度小于2
if (parts1.length < 2) {
// 只返回总结内容
return TaskSummaryResult.builder().taskSummary(parts1[0]).build();
}
// 提取总结内容
String summary = parts1[0];
// 提取文件名部分
String fileNames = parts1[1];
// 获取上下文中的产品文件列表
List<File> files = context.getProductFiles();
// 如果文件列表不为空,反转文件列表顺序
if (!CollectionUtils.isEmpty(files)) {
Collections.reverse(files);
} else {
// 如果文件列表为空,记录错误日志,并返回只包含总结内容的任务总结结果
log.error("requestId: {} llmResponse:{} productFile list is empty", requestId, llmResponse);
return TaskSummaryResult.builder().taskSummary(summary).build();
}
// 用于存储匹配的文件
List<File> product = new ArrayList<>();
// 使用、分割文件名部分
String[] items = fileNames.split("、");
for (String item : items) {
// 去除文件名前后的空白字符
String trimmedItem = item.trim();
if (StringUtils.isBlank(trimmedItem)) {
continue;
}
for (File file : files) {
// 如果文件名包含在LLM响应的文件名部分中
if (item.contains(file.getFileName().trim())) {
// 记录日志,表明添加了匹配的文件
log.info("requestId: {} add file:{}", requestId, file);
product.add(file);
break;
}
}
}
// 返回包含总结内容和匹配文件的任务总结结果
return TaskSummaryResult.builder().taskSummary(summary).files(product).build();
}
// 总结任务,生成任务总结结果
public TaskSummaryResult summaryTaskResult(List<Message> messages, String query) {
long startTime = System.currentTimeMillis();
// 1. 参数校验(可选)
if (CollectionUtils.isEmpty(messages) || StringUtils.isEmpty(query)) {
// 记录警告日志,表明消息或查询为空
log.warn("requestId: {} summaryTaskResult messages:{} or query:{} is empty", requestId, messages, query);
return TaskSummaryResult.builder().taskSummary("").build();
}
try {
// 2. 构建系统消息(提取为独立方法)
log.info("requestId: {} summaryTaskResult: messages:{}", requestId, messages.size());
StringBuilder sb = new StringBuilder();
for (Message message : messages) {
String content = message.getContent();
// 如果消息内容长度超过限制,进行截断
if (content != null && content.length() > getMessageSizeLimit()) {
log.info("requestId: {} message truncate,{}", requestId, message);
content = content.substring(0, getMessageSizeLimit());
}
// 将消息的角色和内容拼接成字符串
sb.append(String.format("role:%s content:%s\n", message.getRole(), content));
}
// 格式化系统提示
String formattedPrompt = formatSystemPrompt(sb.toString(), query);
// 创建系统消息
Message userMessage = createSystemMessage(formattedPrompt);
// 3. 调用LLM并处理结果
CompletableFuture<String> summaryFuture = getLlm().ask(
context,
Collections.singletonList(userMessage),
Collections.emptyList(),
false,
0.01);
// 5. 解析响应
String llmResponse = summaryFuture.get();
log.info("requestId: {} summaryTaskResult: {}", requestId, llmResponse);
// 解析LLM响应并返回任务总结结果
return parseLlmResponse(llmResponse);
} catch (Exception e) {
// 记录错误日志,表明总结任务失败
log.error("requestId: {} in summaryTaskResult failed,", requestId, e);
return TaskSummaryResult.builder().taskSummary("任务执行失败,请联系管理员!").build();
}
}
}
Executor 工具调用代理 - 处理工具/函数调用的基础代理类
/**
* 工具调用代理 - 处理工具/函数调用的基础代理类
*/
@Data
@Slf4j
@EqualsAndHashCode(callSuper = true)
public class ExecutorAgent extends ReActAgent {
private List<ToolCall> toolCalls;
private Integer maxObserve;
private String systemPromptSnapshot;
private String nextStepPromptSnapshot;
private Integer taskId;
public ExecutorAgent(AgentContext context) {
setName("executor");
setDescription("an agent that can execute tool calls.");
ApplicationContext applicationContext = SpringContextHolder.getApplicationContext();
GenieConfig genieConfig = applicationContext.getBean(GenieConfig.class);
StringBuilder toolPrompt = new StringBuilder();
for (BaseTool tool : context.getToolCollection().getToolMap().values()) {
toolPrompt.append(String.format("工具名:%s 工具描述:%s\n", tool.getName(), tool.getDescription()));
}
String promptKey = "default";
String sopPromptKey = "default";
String nextPromptKey = "default";
setSystemPrompt(genieConfig.getExecutorSystemPromptMap().getOrDefault(promptKey, ToolCallPrompt.SYSTEM_PROMPT)
.replace("{{tools}}", toolPrompt.toString())
.replace("{{query}}", context.getQuery())
.replace("{{date}}", context.getDateInfo())
.replace("{{sopPrompt}}", context.getSopPrompt())
.replace("{{executorSopPrompt}}", genieConfig.getExecutorSopPromptMap().getOrDefault(sopPromptKey, "")));
setNextStepPrompt(genieConfig.getExecutorNextStepPromptMap().getOrDefault(nextPromptKey, ToolCallPrompt.NEXT_STEP_PROMPT)
.replace("{{tools}}", toolPrompt.toString())
.replace("{{query}}", context.getQuery())
.replace("{{date}}", context.getDateInfo())
.replace("{{sopPrompt}}", context.getSopPrompt())
.replace("{{executorSopPrompt}}", genieConfig.getExecutorSopPromptMap().getOrDefault(sopPromptKey, "")));
setSystemPromptSnapshot(getSystemPrompt());
setNextStepPromptSnapshot(getNextStepPrompt());
setPrinter(context.printer);
setMaxSteps(genieConfig.getPlannerMaxSteps());
setLlm(new LLM(genieConfig.getExecutorModelName(), ""));
setContext(context);
setMaxObserve(Integer.parseInt(genieConfig.getMaxObserve()));
// 初始化工具集合
availableTools = context.getToolCollection();
setDigitalEmployeePrompt(genieConfig.getDigitalEmployeePrompt());
setTaskId(0);
}
@Override
public boolean think() {
// 获取文件内容
String filesStr = FileUtil.formatFileInfo(context.getProductFiles(), true);
setSystemPrompt(getSystemPromptSnapshot().replace("{{files}}", filesStr));
setNextStepPrompt(getNextStepPromptSnapshot().replace("{{files}}", filesStr));
if (!getMemory().getLastMessage().getRole().equals(RoleType.USER)) {
Message userMsg = Message.userMessage(getNextStepPrompt(), null);
getMemory().addMessage(userMsg);
}
try {
// 获取带工具选项的响应
log.info("{} executor ask tool {}", context.getRequestId(), JSON.toJSONString(availableTools));
CompletableFuture<LLM.ToolCallResponse> future = getLlm().askTool(
context,
getMemory().getMessages(),
Message.systemMessage(getSystemPrompt(), null),
availableTools,
ToolChoice.AUTO, null, false, 300
);
LLM.ToolCallResponse response = future.get();
setToolCalls(response.getToolCalls());
// 记录响应信息
if (response.getContent() != null && !response.getContent().trim().isEmpty()) {
String thinkResult = response.getContent();
String subType = "taskThought";
if (toolCalls.isEmpty()) {
Map<String, Object> taskSummary = new HashMap<>();
taskSummary.put("taskSummary", response.getContent());
taskSummary.put("fileList", context.getTaskProductFiles());
thinkResult = JSON.toJSONString(taskSummary);
subType = "taskSummary";
printer.send("task_summary", taskSummary);
} else {
printer.send("tool_thought", response.getContent());
}
}
// 创建并添加助手消息
Message assistantMsg = response.getToolCalls() != null && !response.getToolCalls().isEmpty() && !"struct_parse".equals(llm.getFunctionCallType()) ?
Message.fromToolCalls(response.getContent(), response.getToolCalls()) :
Message.assistantMessage(response.getContent(), null);
getMemory().addMessage(assistantMsg);
} catch (Exception e) {
log.error("Oops! The " + getName() + "'s thinking process hit a snag: " + e.getMessage());
getMemory().addMessage(Message.assistantMessage(
"Error encountered while processing: " + e.getMessage(), null));
setState(AgentState.FINISHED);
return false;
}
return true;
}
@Override
public String act() {
if (toolCalls.isEmpty()) {
GenieConfig genieConfig = SpringContextHolder.getApplicationContext().getBean(GenieConfig.class);
setState(AgentState.FINISHED);
// 删除工具结果
if ("1".equals(genieConfig.getClearToolMessage())) {
getMemory().clearToolContext();
}
// 返回固定话术
if (!genieConfig.getTaskCompleteDesc().isEmpty()) {
return genieConfig.getTaskCompleteDesc();
}
return getMemory().getLastMessage().getContent();
}
Map<String, String> toolResults = executeTools(toolCalls);
List<String> results = new ArrayList<>();
for (ToolCall command : toolCalls) {
String result = toolResults.get(command.getId());
if (!Arrays.asList("code_interpreter", "report_tool", "file_tool", "deep_search", "data_analysis").contains(command.getFunction().getName())) {
String toolName = command.getFunction().getName();
printer.send("tool_result", AgentResponse.ToolResult.builder()
.toolName(toolName)
.toolParam(JSON.parseObject(command.getFunction().getArguments(), Map.class))
.toolResult(result)
.build(), null);
}
if (maxObserve != null) {
result = result.substring(0, Math.min(result.length(), maxObserve));
}
// 添加工具响应到记忆
if ("struct_parse".equals(llm.getFunctionCallType())) {
String content = getMemory().getLastMessage().getContent();
getMemory().getLastMessage().setContent(content + "\n 工具执行结果为:\n" + result);
} else { // function_call
Message toolMsg = Message.toolMessage(
result,
command.getId(),
null
);
getMemory().addMessage(toolMsg);
}
results.add(result);
}
return String.join("\n\n", results);
}
@Override
public String run(String request) {
generateDigitalEmployee(request);
GenieConfig genieConfig = SpringContextHolder.getApplicationContext().getBean(GenieConfig.class);
request = genieConfig.getTaskPrePrompt() + request;
// 更新当前task
context.setTask(request);
return super.run(request);
}
}
React 工具调用代理 - 处理工具/函数调用的基础代理类
/**
* 工具调用代理 - 处理工具/函数调用的基础代理类
*/
@Data
@Slf4j
@EqualsAndHashCode(callSuper = true)
public class ReactImplAgent extends ReActAgent {
private List<ToolCall> toolCalls;
private Integer maxObserve;
private String systemPromptSnapshot;
private String nextStepPromptSnapshot;
public ReactImplAgent(AgentContext context) {
setName("react");
setDescription("an agent that can execute tool calls.");
ApplicationContext applicationContext = SpringContextHolder.getApplicationContext();
GenieConfig genieConfig = applicationContext.getBean(GenieConfig.class);
StringBuilder toolPrompt = new StringBuilder();
for (BaseTool tool : context.getToolCollection().getToolMap().values()) {
toolPrompt.append(String.format("工具名:%s 工具描述:%s\n", tool.getName(), tool.getDescription()));
}
String promptKey = "default";
String nextPromptKey = "default";
setSystemPrompt(genieConfig.getReactSystemPromptMap().getOrDefault(promptKey, ToolCallPrompt.SYSTEM_PROMPT)
.replace("{{tools}}", toolPrompt.toString())
.replace("{{query}}", context.getQuery())
.replace("{{date}}", context.getDateInfo())
.replace("{{basePrompt}}", context.getBasePrompt()));
setNextStepPrompt(genieConfig.getReactNextStepPromptMap().getOrDefault(nextPromptKey, ToolCallPrompt.NEXT_STEP_PROMPT)
.replace("{{tools}}", toolPrompt.toString())
.replace("{{query}}", context.getQuery())
.replace("{{date}}", context.getDateInfo())
.replace("{{basePrompt}}", context.getBasePrompt()));
setSystemPromptSnapshot(getSystemPrompt());
setNextStepPromptSnapshot(getNextStepPrompt());
setPrinter(context.printer);
setMaxSteps(genieConfig.getReactMaxSteps());
setLlm(new LLM(genieConfig.getReactModelName(), ""));
setContext(context);
// 初始化工具集合
availableTools = context.getToolCollection();
setDigitalEmployeePrompt(genieConfig.getDigitalEmployeePrompt());
}
@Override
public boolean think() {
// 获取文件内容
String filesStr = FileUtil.formatFileInfo(context.getProductFiles(), true);
setSystemPrompt(getSystemPromptSnapshot().replace("{{files}}", filesStr));
setNextStepPrompt(getNextStepPromptSnapshot().replace("{{files}}", filesStr));
if (!getMemory().getLastMessage().getRole().equals(RoleType.USER)) {
Message userMsg = Message.userMessage(getNextStepPrompt(), null);
getMemory().addMessage(userMsg);
}
try {
// 获取带工具选项的响应
context.setStreamMessageType("tool_thought");
CompletableFuture<LLM.ToolCallResponse> future = getLlm().askTool(
context,
getMemory().getMessages(),
Message.systemMessage(getSystemPrompt(), null),
availableTools,
ToolChoice.AUTO, null, context.getIsStream(), 300
);
LLM.ToolCallResponse response = future.get();
setToolCalls(response.getToolCalls());
// 记录响应信息
if (!context.getIsStream() && response.getContent() != null && !response.getContent().isEmpty()) {
printer.send("tool_thought", response.getContent());
}
// 创建并添加助手消息
Message assistantMsg = response.getToolCalls() != null && !response.getToolCalls().isEmpty() && !"struct_parse".equals(llm.getFunctionCallType()) ?
Message.fromToolCalls(response.getContent(), response.getToolCalls()) :
Message.assistantMessage(response.getContent(), null);
getMemory().addMessage(assistantMsg);
} catch (Exception e) {
log.error("{} react think error", context.getRequestId(), e);
getMemory().addMessage(Message.assistantMessage(
"Error encountered while processing: " + e.getMessage(), null));
setState(AgentState.FINISHED);
return false;
}
return true;
}
@Override
public String act() {
if (toolCalls.isEmpty()) {
setState(AgentState.FINISHED);
return getMemory().getLastMessage().getContent();
}
// action
Map<String, String> toolResults = executeTools(toolCalls);
List<String> results = new ArrayList<>();
for (ToolCall command : toolCalls) {
String result = toolResults.get(command.getId());
if (!Arrays.asList("code_interpreter", "report_tool", "file_tool", "deep_search", "data_analysis").contains(command.getFunction().getName())) {
String toolName = command.getFunction().getName();
printer.send("tool_result", AgentResponse.ToolResult.builder()
.toolName(toolName)
.toolParam(JSON.parseObject(command.getFunction().getArguments(), Map.class))
.toolResult(result)
.build(), null);
}
if (maxObserve != null) {
result = result.substring(0, Math.min(result.length(), maxObserve));
}
// 添加工具响应到记忆
if ("struct_parse".equals(llm.getFunctionCallType())) {
String content = getMemory().getLastMessage().getContent();
getMemory().getLastMessage().setContent(content + "\n 工具执行结果为:\n" + result);
} else { // function_call
Message toolMsg = Message.toolMessage(
result,
command.getId(),
null
);
getMemory().addMessage(toolMsg);
}
results.add(result);
}
return String.join("\n\n", results);
}
@Override
public String run(String request) {
return super.run(request);
}
}
Planning规划代理 - 创建和管理任务计划的代理
/**
* 规划代理 - 创建和管理任务计划的代理
*/
@Slf4j
@Data
@EqualsAndHashCode(callSuper = true)
public class PlanningAgent extends ReActAgent {
private List<ToolCall> toolCalls;
private Integer maxObserve;
private PlanningTool planningTool = new PlanningTool();
private Boolean isColseUpdate;
private String systemPromptSnapshot;
private String nextStepPromptSnapshot;
private String planId;
public PlanningAgent(AgentContext context) {
setName("planning");
setDescription("An agent that creates and manages plans to solve tasks");
ApplicationContext applicationContext = SpringContextHolder.getApplicationContext();
GenieConfig genieConfig = applicationContext.getBean(GenieConfig.class);
StringBuilder toolPrompt = new StringBuilder();
for (BaseTool tool : context.getToolCollection().getToolMap().values()) {
toolPrompt.append(String.format("工具名:%s 工具描述:%s\n", tool.getName(), tool.getDescription()));
}
String promptKey = "default";
String nextPromptKey = "default";
setSystemPrompt(genieConfig.getPlannerSystemPromptMap().getOrDefault(promptKey, PlanningPrompt.SYSTEM_PROMPT)
.replace("{{tools}}", toolPrompt.toString())
.replace("{{query}}", context.getQuery())
.replace("{{date}}", context.getDateInfo())
.replace("{{sopPrompt}}", context.getSopPrompt()));
setNextStepPrompt(genieConfig.getPlannerNextStepPromptMap().getOrDefault(nextPromptKey, PlanningPrompt.NEXT_STEP_PROMPT)
.replace("{{tools}}", toolPrompt.toString())
.replace("{{query}}", context.getQuery())
.replace("{{date}}", context.getDateInfo())
.replace("{{sopPrompt}}", context.getSopPrompt()));
setSystemPromptSnapshot(getSystemPrompt());
setNextStepPromptSnapshot(getNextStepPrompt());
setPrinter(context.printer);
setMaxSteps(genieConfig.getPlannerMaxSteps());
setLlm(new LLM(genieConfig.getPlannerModelName(), ""));
setContext(context);
setIsColseUpdate("1".equals(genieConfig.getPlanningCloseUpdate()));
// 初始化工具集合
availableTools.addTool(planningTool);
planningTool.setAgentContext(context);
}
@Override
public boolean think() {
long startTime = System.currentTimeMillis();
// 获取文件内容
String filesStr = FileUtil.formatFileInfo(context.getProductFiles(), false);
setSystemPrompt(getSystemPromptSnapshot().replace("{{files}}", filesStr));
setNextStepPrompt(getNextStepPromptSnapshot().replace("{{files}}", filesStr));
log.info("{} planer fileStr {}", context.getRequestId(), filesStr);
// 关闭了动态更新Plan,直接执行下一个task
if (isColseUpdate) {
if (Objects.nonNull(planningTool.getPlan())) {
planningTool.stepPlan();
return true;
}
}
try {
if (!getMemory().getLastMessage().getRole().equals(RoleType.USER)) {
Message userMsg = Message.userMessage(getNextStepPrompt(), null);
getMemory().addMessage(userMsg);
}
context.setStreamMessageType("plan_thought");
CompletableFuture<LLM.ToolCallResponse> future = getLlm().askTool(context,
getMemory().getMessages(),
Message.systemMessage(getSystemPrompt(), null),
availableTools,
ToolChoice.AUTO, null, context.getIsStream(), 300
);
LLM.ToolCallResponse response = future.get();
setToolCalls(response.getToolCalls());
// 记录响应信息
if (!context.getIsStream() && response.getContent() != null && !response.getContent().isEmpty()) {
printer.send("plan_thought", response.getContent());
}
// 记录响应信息
log.info("{} {}'s thoughts: {}", context.getRequestId(), getName(), response.getContent());
log.info("{} {} selected {} tools to use", context.getRequestId(), getName(),
response.getToolCalls() != null ? response.getToolCalls().size() : 0);
// 创建并添加助手消息
Message assistantMsg = response.getToolCalls() != null && !response.getToolCalls().isEmpty() && !"struct_parse".equals(llm.getFunctionCallType()) ?
Message.fromToolCalls(response.getContent(), response.getToolCalls()) :
Message.assistantMessage(response.getContent(), null);
getMemory().addMessage(assistantMsg);
} catch (Exception e) {
log.error("{} think error ", context.getRequestId(), e);
}
return true;
}
@Override
public String act() {
// 关闭了动态更新Plan,直接执行下一个task
if (isColseUpdate) {
if (Objects.nonNull(planningTool.getPlan())) {
return getNextTask();
}
}
List<String> results = new ArrayList<>();
long startTime = System.currentTimeMillis();
for (ToolCall toolCall : toolCalls) {
String result = executeTool(toolCall);
if (maxObserve != null) {
result = result.substring(0, Math.min(result.length(), maxObserve));
}
results.add(result);
// 添加工具响应到记忆
if ("struct_parse".equals(llm.getFunctionCallType())) {
String content = getMemory().getLastMessage().getContent();
getMemory().getLastMessage().setContent(content + "\n 工具执行结果为:\n" + result);
} else { // function_call
Message toolMsg = Message.toolMessage(
result,
toolCall.getId(),
null
);
getMemory().addMessage(toolMsg);
}
}
if (Objects.nonNull(planningTool.getPlan())) {
if (isColseUpdate) {
planningTool.stepPlan();
}
return getNextTask();
}
return String.join("\n\n", results);
}
private String getNextTask() {
boolean allComplete = true;
for (String status : planningTool.getPlan().getStepStatus()) {
if (!"completed".equals(status)) {
allComplete = false;
break;
}
}
if (allComplete) {
setState(AgentState.FINISHED);
printer.send("plan", planningTool.getPlan());
return "finish";
}
if (!planningTool.getPlan().getCurrentStep().isEmpty()) {
setState(AgentState.FINISHED);
String[] currentSteps = planningTool.getPlan().getCurrentStep().split("<sep>");
printer.send("plan", planningTool.getPlan());
Arrays.stream(currentSteps).forEach(step -> printer.send("task", step));
return planningTool.getPlan().getCurrentStep();
}
return "";
}
@Override
public String run(String request) {
if (Objects.isNull(planningTool.getPlan())) {
GenieConfig genieConfig = SpringContextHolder.getApplicationContext().getBean(GenieConfig.class);
request = genieConfig.getPlanPrePrompt() + request;
}
return super.run(request);
}
}
绘图理解Agent请求链路


ReAct模式细则

Plan Exector模式

总结
总体来看,JoyAgent-JDGenie 在代理协作机制上的设计确实体现了 “轻量化、通用型、开箱即用” 的定位 —— 从 SSE 流式响应到 ReAct 与 Plan-Executor 双模式切换,再到 BaseAgent 提供的统一骨架,整个链路清晰且易于扩展。尤其是线程池异步调度、工具集合动态注入、以及策略模式的 Handler 分发,都为多智能体快速落地提供了扎实的技术底座。
当然,任何开源项目都有持续优化的空间。例如,SummaryAgent 的抽象层次偏高,虽然保证了通用性,但也增加了初读源码的理解成本;再比如,核心提示词目前集中写在 YAML 配置文件中,虽然便于集中管理,但在实际业务中频繁调整时,需要重新打包或重启服务,不够灵活,也不利于动态优化。
瑕不掩瑜,这些问题恰恰是社区可以参与共建的方向 —— 比如将 SummaryAgent 拆分为更明确的功能分层,或引入数据库 / 配置中心实现提示词的热更新。相信随着更多开发者的参与,JoyAgent 会在保持轻量的同时,变得更加易用、易维护。
如果你对多智能体协作感兴趣,不妨亲自拉取源码跑一遍流程,也许你会在这些 “槽点” 中找到属于自己的改进灵感。
更多推荐

所有评论(0)