ReActAgent 总算是到了本次源码阅读的终点站,就是为了这碟醋包的这盘饺子。 总之先来看一下ReActAgent的类说明,感觉根据类说明来进行源码阅读不失为一个方便理解的方法。
ReActAgent 官方类说明翻译 ReAct(推理与行动)智能体实现
ReAct 是一种智能体设计模式,它将推理(思考和规划)与行动(工具执行)结合在一个迭代循环中。智能体会在这两个阶段之间交替进行,直到完成任务或达到最大迭代次数限制。
核心特性:
响应式流处理 :使用 Project Reactor 实现非阻塞执行
Hook 系统 :可扩展的 Hook 机制,用于监控和拦截智能体执行过程
人机协同支持(HITL) :通过在 PostReasoningEvent/PostActingEvent 事件中调用 stopAgent() 实现人在回路
结构化输出 :继承自 StructuredOutputCapableAgent,提供类型安全的输出生成能力
使用示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 DashScopeChatModel model = DashScopeChatModel.builder() .apiKey(System.getenv("DASHSCOPE_API_KEY" )) .modelName("qwen-plus" ) .build(); Toolkit toolkit = new Toolkit ();toolkit.registerObject(new MyToolClass ()); ReActAgent agent = ReActAgent.builder() .name("Assistant" ) .sysPrompt("You are a helpful assistant." ) .model(model) .toolkit(toolkit) .memory(new InMemoryMemory ()) .maxIters(10 ) .build(); Msg response = agent.call(Msg.builder() .name("user" ) .role(MsgRole.USER) .content(TextBlock.builder().text("What's the weather?" ).build()) .build()).block();
自顶向下,ReActAgent的精髓就在于ReAct。直入主题,看看它是怎么Call的。
这里要援引一下之前的AgentBase中的call:call是由AgentBase定义的,而不是ReActAgent
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public final Mono<Msg> call (List<Msg> msgs) { return Mono.using( () -> { if (checkRunning && !running.compareAndSet(false , true )) { throw new IllegalStateException ( "Agent is still running, please wait for it to finish" ); } resetInterruptFlag(); return this ; }, resource -> TracerRegistry.get() .callAgent( this , msgs, () -> notifyPreCall(msgs) .flatMap(this ::doCall) .flatMap(this ::notifyPostCall) .onErrorResume( createErrorHandler( msgs.toArray(new Msg [0 ])))), resource -> running.set(false ), true ); }
Mono.using 深度解析:响应式资源管理 这段代码展示了 Project Reactor 中一个非常重要的操作符——Mono.using。它的设计灵感来源于 Java 的 try-with-resources 语句,专门用于响应式流中的资源管理 。
1️⃣ 为什么需要 Mono.using? 问题背景 : 在响应式编程中,传统的 try-finally 或 try-with-resources 无法直接工作,因为:
✅ 响应式流是异步 和延迟执行 的
✅ 资源的生命周期需要与流的订阅/取消/完成事件同步
❌ 普通 try-finally 会在订阅前就执行 finally,导致资源过早释放
示例对比 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 try { agent.running.set(true ); return doCall(msgs); } finally { agent.running.set(false ); } return Mono.using( () -> { }, resource -> { }, resource -> { } );
2️⃣ Mono.using 的三个参数详解 1 2 3 4 5 6 Mono.using( resourceSupplier, resourceFactory, resourceCleanup, eager )
参数 ①:resourceSupplier(资源供应器) 1 2 3 4 5 6 7 8 () -> { if (checkRunning && !running.compareAndSet(false , true )) { throw new IllegalStateException ( "Agent is still running, please wait for it to finish" ); } resetInterruptFlag(); return this ; }
职责 :
📥 获取资源 :设置运行状态、重置中断标志
🔒 并发控制 :通过 CAS 确保同一时间只有一个调用在执行
⚠️ 异常处理 :如果 Agent 正在运行,立即抛出异常
🔄 返回值 :返回的资源对象会传递给后续两个参数
关键点 :
这个方法在订阅时 执行(不是定义时)
如果抛出异常,整个流会立即失败
参数 ②:resourceFactory(资源工厂) 1 2 3 4 5 6 7 8 9 10 11 resource -> TracerRegistry.get() .callAgent( this , msgs, () -> notifyPreCall(msgs) .flatMap(this ::doCall) .flatMap(this ::notifyPostCall) .onErrorResume( createErrorHandler(msgs.toArray(new Msg [0 ]))))
职责 :
🏭 创建实际的响应式流 :这是真正的业务逻辑
🔗 接收资源 :从参数 ① 获取资源对象(这里是 this)
📤 返回 Mono :必须返回一个 Mono<T> 或 Flux<T>
执行流程 :
1 2 3 4 5 6 graph LR A[订阅] --> B[执行 resourceSupplier] B --> C[获取资源 this] C --> D[执行 resourceFactory] D --> E[创建 Mono 流] E --> F[开始异步执行]
参数 ③:resourceCleanup(资源清理器) 1 resource -> running.set(false )
职责 :
🧹 释放资源 :重置运行状态标志
⏰ 触发时机 :在以下任一情况发生时执行:
✅ 流正常完成(onComplete)
❌ 流发生错误(onError)
🛑 流被取消(cancel)
关键点 :
无论成功还是失败,都会执行
类似于 try-finally 中的 finally 块
可以安全地访问资源对象进行清理
参数 ④:eager(是否提前清理,可选)
含义 :
true:在下游订阅者收到信号之前 清理资源
false:在下游订阅者收到信号之后 清理资源
什么是”下游订阅者收到信号”?
在响应式流中,”信号”指的是以下三种终止事件之一:
信号类型
触发条件
Subscriber 回调
onComplete
流正常完成
subscriber.onComplete()
onError
流发生错误
subscriber.onError(throwable)
cancel
流被取消
(无回调,但会停止接收数据)
“收到信号”的时机 : 当 Mono/Flux 执行完毕后,会向订阅者(Subscriber)发送上述信号之一,此时 Subscriber 的对应回调方法会被调用。
eager=true vs eager=false 的区别 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 sequenceDiagram participant S as Subscriber participant M as Mono.using participant RC as ResourceCleanup Note over M: 流执行完毕 alt eager=true (默认) M->>RC: 先执行 cleanup activate RC RC->>RC: running.set(false) deactivate RC M->>S: 再发送信号 (onComplete/onError) activate S S->>S: 处理信号 deactivate S else eager=false M->>S: 先发送信号 (onComplete/onError) activate S S->>S: 处理信号 deactivate S M->>RC: 再执行 cleanup activate RC RC->>RC: running.set(false) deactivate RC end
实际影响示例 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 Mono.using( () -> resource, r -> doWork(), r -> { throw new RuntimeException ("Cleanup failed!" ); }, true ) .subscribe( result -> System.out.println("Result: " + result), error -> System.out.println("Error: " + error) );
两种模式的选择建议 :
模式
适用场景
优点
缺点
eager=true ✅
大多数场景(默认推荐)
- 资源尽快释放 - cleanup 异常不影响业务信号
- Subscriber 无法访问已清理的资源
eager=false
需要在 Subscriber 中访问资源
- Subscriber 可以访问资源状态
- 资源持有时间更长 - cleanup 异常可能掩盖业务异常
Agentscope 的选择 :
1 2 3 4 5 return Mono.using( resource -> running.set(false ), true );
原因分析 :
✅ 快速释放锁 :让其他等待的调用能尽快获取锁
✅ 异常隔离 :即使 running.set(false) 失败,也不影响业务逻辑的信号传递
✅ 符合预期 :Agent 调用完成后,应该立即释放运行状态,而不是等到 Subscriber 处理完结果
影响 :通常使用默认值 true 即可
3️⃣ 完整的执行时序图 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 sequenceDiagram participant S as Subscriber participant U as Mono.using participant RS as ResourceSupplier participant RF as ResourceFactory participant RC as ResourceCleanup participant FL as Flux/Mono S->>U: subscribe() activate U U->>RS: 调用 resourceSupplier activate RS RS->>RS: 检查运行状态 RS->>RS: 重置中断标志 RS-->>U: 返回资源 this deactivate RS U->>RF: 调用 resourceFactory(resource) activate RF RF->>FL: 创建 Mono 流 RF-->>U: 返回 Mono deactivate RF U->>S: 订阅 Mono deactivate U Note over FL: 异步执行业务逻辑 FL->>FL: notifyPreCall FL->>FL: doCall FL->>FL: notifyPostCall alt 正常完成 FL-->>S: onComplete S->>RC: 触发 cleanup activate RC RC->>RC: running.set(false) deactivate RC else 发生错误 FL-->>S: onError S->>RC: 触发 cleanup activate RC RC->>RC: running.set(false) deactivate RC else 被取消 S->>FL: cancel FL->>RC: 触发 cleanup activate RC RC->>RC: running.set(false) deactivate RC end
4️⃣ Agentscope 中的具体应用分析 回到我们的代码,看看每个部分的作用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 return Mono.using( () -> { if (checkRunning && !running.compareAndSet(false , true )) { throw new IllegalStateException ("Agent is still running..." ); } resetInterruptFlag(); return this ; }, resource -> TracerRegistry.get().callAgent( this , msgs, () -> notifyPreCall(msgs) .flatMap(this ::doCall) .flatMap(this ::notifyPostCall) .onErrorResume(createErrorHandler(...)) ), resource -> running.set(false ), true );
设计意图 :
阶段
操作
目的
进入
compareAndSet(false, true)
防止并发调用,确保单例执行
执行
notifyPreCall → doCall → notifyPostCall
Hook 系统 + 实际推理
退出
running.set(false)
释放锁,允许下次调用
异常
自动触发 cleanup
即使出错也能正确释放资源
5️⃣ 与其他资源管理方式对比
方式
适用场景
优点
缺点
Mono.using
响应式流中的资源管理
自动处理完成/错误/取消
仅适用于 Mono/Flux
try-with-resources
同步代码
简洁直观
不适用于异步流
doFinally
简单的清理操作
灵活
需要手动管理资源获取
doOnTerminate
终止时执行
细粒度控制
不处理取消场景
推荐原则 :
✅ 如果需要成对的获取/释放 资源 → 使用 Mono.using
✅ 如果只是单向的清理 操作 → 使用 doFinally
❌ 不要在响应式流中使用 try-finally
6️⃣ 实际应用场景总结 Mono.using 的典型应用场景包括:
数据库连接管理
1 2 3 4 5 Mono.using( () -> connectionPool.getConnection(), conn -> executeQuery(conn), conn -> conn.close() )
文件流操作
1 2 3 4 5 Mono.using( () -> Files.newInputStream(path), stream -> readData(stream), stream -> stream.close() )
锁/信号量管理 (本案例)
1 2 3 4 5 Mono.using( () -> { lock.lock(); return lock; }, lock -> doWork(), lock -> lock.unlock() )
HTTP 客户端会话
1 2 3 4 5 Mono.using( () -> httpClient.createSession(), session -> session.sendRequest(), session -> session.close() )
7️⃣ 关键要点总结 ✅ 核心优势 :
将资源生命周期与响应式流绑定
自动处理所有终止场景(完成/错误/取消)
避免资源泄漏
✅ 执行保证 :
resourceSupplier 只执行一次
resourceCleanup 必定执行(除非 JVM 崩溃)
异常安全:cleanup 中的异常不会掩盖主流的异常
✅ 最佳实践 :
资源获取逻辑要简单快速(不要阻塞)
清理逻辑要幂等(可能被多次调用)
优先使用 Mono.using 而非手动管理
在加载了相关知识的Context之后,回到我们的ReActAgent。 这里有一个比较有意思的东西:
1 TracerRegistry.get().callAgent()
其实本质上也是一个hook,只不过是一个TraceHook用来记录整个call的链路,不过默认是NoopTracer,也就是无操作的Tracer(人话:默认不记录Trace,需要人为手动定义开启)。
OK,然后回过头来看ReActAgent中的doCall方法,也是除了pre和post的hook,最为主要的一个调用方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 protected Mono<Msg> doCall (List<Msg> msgs) { Set<String> pendingIds = getPendingToolUseIds(); if (pendingIds.isEmpty()) { addToMemory(msgs); return executeIteration(0 ); } validateAndAddToolResults(msgs, pendingIds); return hasPendingToolUse() ? acting(0 ) : executeIteration(0 ); }
n### doCall 方法流程分析
这个方法的核心逻辑是判断是否有待处理的工具调用 ,并根据情况选择不同的执行路径。
流程图 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 flowchart TD Start([doCall 开始]) --> GetPending[获取待处理工具 ID<br/>getPendingToolUseIds] GetPending --> CheckEmpty{pendingIds<br/>是否为空?} CheckEmpty -->|是| NormalPath[正常处理路径] CheckEmpty -->|否| PendingPath[待处理工具路径] NormalPath --> AddToMemory[addToMemory msgs<br/>将消息添加到记忆] AddToMemory --> ExecuteIter1[executeIteration 0<br/>从第 0 轮开始执行迭代] ExecuteIter1 --> End1([返回 Mono<Msg>]) PendingPath --> Validate[validateAndAddToolResults<br/>验证并添加工具结果] Validate --> CheckHasPending{hasPendingToolUse<br/>是否仍有待处理工具?} CheckHasPending -->|是| Acting[acting 0<br/>执行工具调用阶段] CheckHasPending -->|否| ExecuteIter2[executeIteration 0<br/>从第 0 轮开始执行迭代] Acting --> End2([返回 Mono<Msg>]) ExecuteIter2 --> End2 style Start fill:#000000 style End1 fill:#000000 style End2 fill:#000000 style NormalPath fill:#000000 style PendingPath fill:#000000 style Acting fill:#000000 style ExecuteIter1 fill:#000000 style ExecuteIter2 fill:#000000
关键分支说明 分支 1:无待处理工具(正常路径)
条件 :pendingIds.isEmpty() == true
场景 :首次调用或所有工具已执行完毕
操作 :
将输入消息添加到记忆
从第 0 轮开始执行完整的 ReAct 迭代(推理 → 行动)
分支 2:有待处理工具(恢复路径)
条件 :pendingIds.isEmpty() == false
场景 :之前有工具调用未完成,现在收到工具执行结果
操作 :
验证工具结果并添加到记忆
根据是否还有待处理工具选择:
仍有待处理 :进入 acting 阶段继续执行工具
无待处理 :进入 executeIteration 开始新的推理轮次
设计意图 这种设计支持断点续传 和人机协同(HITL) :
特性
实现方式
断点续传
通过 pendingIds 识别未完成的工具调用
状态恢复
validateAndAddToolResults 将外部工具结果注入记忆
灵活路由
根据状态动态选择 acting 或 executeIteration
迭代控制
统一从第 0 轮开始计数,简化状态管理
实际应用场景 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 agent.call(userMsg) → pendingIds 为空 → addToMemory + executeIteration(0 ) agent.call(userMsg) → 返回:需要调用 SearchTool agent.call(toolResultMsg) → pendingIds 不为空 → validateAndAddToolResults → 如果还有工具:acting(0 ) → 否则:executeIteration(0 )
那么就先从比较不那么重要的工具调用逻辑说起:acting方法。 其中需要先找到Memory中的工具,这里很有意思的一点在于会根据agent的名字来进行提取工具,可能是为了在observe的时候用于判断确实是当前agent的工具流吧。
1 2 3 4 5 6 if (msg.getRole() == MsgRole.ASSISTANT && msg.getName().equals(agentName)) { List<ToolUseBlock> toolCalls = msg.getContentBlocks(ToolUseBlock.class); if (!toolCalls.isEmpty()) { return toolCalls; } }
基于工具调用的特点,是这样的:一个Role为Assistant的ToolUseBlockContent消息中,会带有多个ToolUse调用,不会出现有多个Msg需要解析的情况,因此,框架只会去取最后一条Assistant消息进行解析,若为ToolUse且其中存在未调用的工具,那么就执行,所以是一条ToolUseBlock后面跟着多个ToolResultBlock的这样的结构。 然后在经过上述的流程拿到还未执行的工具之后,就出现了这样一条看起来很有B格的代码:
1 2 3 toolkit.setInternalChunkCallback( (toolUse, chunk) -> notifyActingChunk(toolUse, chunk).subscribe());
内部 Chunk 回调机制:流式工具执行的钩子桥接 这段代码的核心作用是将 Toolkit 内部的工具执行分块(chunk)事件转发到 Agent 的 Hook 系统 ,同时避免覆盖用户自定义的回调。
在某些场景下,工具执行不是一次性返回结果,而是流式输出多个数据块 。例如:
📡 网络请求 :分块接收 HTTP 响应
🗄️ 数据库查询 :逐行返回大量数据
📝 文件读取 :分块读取大文件
🔍 搜索引擎 :分页返回搜索结果
示例 :
1 2 3 4 5 6 7 8 9 ToolResultBlock result = searchTool.execute(query);searchTool.executeStream(query) → chunk 1 : {"results" : [1 -10 ]} → chunk 2 : {"results" : [11 -20 ]} → chunk 3 : {"results" : [21 -30 ]} → complete
2️⃣ 为什么需要 setInternalChunkCallback? 通俗理解:餐厅点餐系统的启示
想象一个餐厅的场景:
Toolkit = 厨房 :有自己的工作流程,厨师做菜时会记录自己的日志(”开始切菜”、”开始炒菜”等)
Agent = 服务员 :也需要知道菜的进度,以便告诉顾客,但不能破坏厨房的原有工作
问题背景 :
Toolkit 本身已经有自己的 chunk 处理逻辑(用于内部监控、日志等),但 Agent 层也需要感知这些 chunk 事件以触发 Hook。
数据流转示意图 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 graph TB subgraph Toolkit_Layer [Toolkit 层 - 厨房] A[工具执行] --> B[产生 Chunk] B --> C[Toolkit 内部回调] C --> D[记录内部日志] C --> E[更新内部状态] end subgraph Bridge_Layer [桥接层 - Internal Callback] B --> F[Internal Chunk Callback] F --> G[notifyActingChunk] end subgraph Agent_Layer [Agent 层 - 服务员] G --> H[触发 ActingChunkEvent] H --> I[Hook 1: 业务日志] H --> J[Hook 2: WebSocket推送] H --> K[Hook 3: 监控面板] end style Toolkit_Layer fill:#e1f5ff,stroke:#0288d1,stroke-width:2px,color:#000000 style Bridge_Layer fill:#fff3cd,stroke:#f57c00,stroke-width:2px,color:#000000 style Agent_Layer fill:#d4edda,stroke:#28a745,stroke-width:2px,color:#000000 style B fill:#ffeb3b,stroke:#f57c00,stroke-width:3px,color:#000000
流程说明 :
Toolkit 层(蓝色区域) :工具执行产生 chunk,内部回调处理自己的逻辑
桥接层(黄色区域) :通过 setInternalChunkCallback 设置的回调被触发
Agent 层(绿色区域) :Hook 系统接收到事件,执行所有注册的监听器
关键特点 :
✅ Toolkit 的内部逻辑不受影响(C → D, E)
✅ Agent 层能实时感知 chunk 事件(F → G → H)
✅ 两者并行工作,互不干扰
核心机制 :
Toolkit 产生 chunk 时,会同时通知多个回调(类似发布-订阅模式)
setInternalChunkCallback 注册的回调是额外的监听者
Toolkit 自己的内部回调继续工作,Agent 层的 Hook 也能收到通知
两者并行执行,互不阻塞
📡 类比:广播系统
就像电台广播:
Toolkit = 电台:自己记录播出日志
Internal Callback = 新增的接收器:把信号转发给 Agent
Agent Hook = 听众:可以有很多人同时收听
电台不会因为多了几个听众就停止自己的工作,听众之间也互不影响。
🔧 技术实现本质
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 List<ChunkCallback> internalCallbacks = new ArrayList <>(); void setInternalChunkCallback (ChunkCallback callback) { internalCallbacks.add(callback); } void onChunkGenerated (Chunk chunk) { for (ChunkCallback callback : internalCallbacks) { callback.onChunk(chunk); } }
设计挑战 :
1 2 3 4 5 6 7 toolkit.setChunkCallback(myCallback); toolkit.setInternalChunkCallback(myCallback);
核心区别 :
方式
效果
后果
setChunkCallback
覆盖 原有回调
🔥 破坏 Toolkit 的内部逻辑
setInternalChunkCallback
追加 新回调
✅ 两者互不干扰,协同工作
实际执行流程 :
当工具执行产生一个数据块时(比如搜索到第10条结果):
Toolkit 内部 :继续执行自己的处理逻辑(记录日志、更新状态等)
Internal Callback :额外触发我们设置的回调
Agent Hook 系统 :执行所有注册的监听器
可能记录业务日志
可能发送 WebSocket 消息给前端
可能更新实时监控面板
这种设计实现了多层架构中的事件传递 ,每一层都保持独立,通过”追加”而不是”覆盖”的方式实现协作。
3️⃣ 代码解析 1 2 3 toolkit.setInternalChunkCallback( (toolUse, chunk) -> notifyActingChunk(toolUse, chunk).subscribe() );
分解说明 :
部分
含义
setInternalChunkCallback
设置内部 chunk 回调(不覆盖用户回调)
(toolUse, chunk) -> ...
Lambda 表达式,接收工具调用信息和数据块
notifyActingChunk(...)
通知 Agent 的 Hook 系统,触发 ActingChunkEvent
.subscribe()
订阅响应式流,确保 Hook 执行
执行流程 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 sequenceDiagram participant T as Toolkit participant IC as InternalCallback participant A as Agent participant H as Hook System T->>T: 工具执行中产生 chunk T->>IC: 调用 internalChunkCallback activate IC IC->>A: notifyActingChunk(toolUse, chunk) activate A A->>H: 触发 ActingChunkEvent activate H H->>H: 执行所有注册的 Hook H-->>A: 完成 deactivate H A-->>IC: Mono 完成 deactivate A IC-->>T: subscribe 完成 deactivate IC
4️⃣ ActingChunkEvent Hook 的作用 当工具执行产生 chunk 时,会触发 ActingChunkEvent,允许外部监听器:
应用场景 :
实时进度显示
1 2 3 4 5 6 7 agent.registerHook(new AgentHook () { @Override public Mono<Void> onActingChunk (ActingChunkEvent event) { System.out.println("工具执行进度: " + event.getChunk()); return Mono.empty(); } });
流式响应前端
1 2 3 4 5 6 7 8 agent.registerHook(new AgentHook () { @Override public Mono<Void> onActingChunk (ActingChunkEvent event) { webSocketSession.send(event.getChunk()); return Mono.empty(); } });
实时监控与日志
1 2 3 4 5 6 7 8 9 10 agent.registerHook(new AgentHook () { @Override public Mono<Void> onActingChunk (ActingChunkEvent event) { logger.info("Tool {} chunk: {}" , event.getToolUse().getName(), event.getChunk()); return Mono.empty(); } });
5️⃣ 为什么要 .subscribe()? 关键点 :notifyActingChunk 返回的是 Mono<Void>,这是一个冷 Observable ,必须订阅才会执行。
1 2 3 4 5 (toolUse, chunk) -> notifyActingChunk(toolUse, chunk) (toolUse, chunk) -> notifyActingChunk(toolUse, chunk).subscribe()
原因 :
Project Reactor 的 Mono/Flux 是惰性求值 的
只有订阅后,链式调用才会真正执行
这里使用”即发即弃”模式,不关心 Hook 的执行结果
6️⃣ 设计哲学:职责分离与扩展性 这种设计体现了 Agentscope 的核心思想:
层级
职责
实现
Toolkit 层
工具执行与内部管理
维护自己的 chunk 回调
Agent 层
业务逻辑与 Hook 系统
通过 internal callback 桥接
Hook 层
横切关注点(监控、日志等)
监听 ActingChunkEvent
优势 :
✅ 解耦 :Toolkit 不需要知道 Agent 的 Hook 系统
✅ 可扩展 :用户可以注册任意数量的 Hook
✅ 向后兼容 :不影响 Toolkit 的原有功能
✅ 灵活性 :Hook 可以是异步的(Mono)
7️⃣ 完整的数据流 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 graph LR A[工具执行] --> B[产生 Chunk] B --> C[Toolkit 内部处理] B --> D[Internal Callback] D --> E[notifyActingChunk] E --> F[ActingChunkEvent] F --> G[Hook 1: 日志] F --> H[Hook 2: 监控] F --> I[Hook 3: WebSocket] style A fill:#e1f5ff style F fill:#fff3cd style G fill:#d4edda style H fill:#d4edda style I fill:#d4edda
8️⃣ 总结 这段看似简单的代码实际上是一个精巧的桥接模式 :
将底层的工具执行事件(Toolkit)与上层的业务钩子系统(Agent Hook)无缝连接,同时保持各层的独立性和可扩展性。
核心价值 :
✅ 非侵入式 :不修改 Toolkit 的原有逻辑
✅ 响应式 :支持异步 Hook 执行
✅ 可观测 :让工具执行过程透明化
✅ 灵活扩展 :用户可以自由添加监控逻辑
再回到工具调用的代码中来,省略工具调用的preHook不谈,就进入到工具具体调用的方法中:
1 2 3 4 5 6 7 8 9 private Mono<List<Map.Entry<ToolUseBlock, ToolResultBlock>>> executeToolCalls( List<ToolUseBlock> toolCalls) { return toolkit.callTools(toolCalls, toolExecutionConfig, this , toolExecutionContext) .map( results -> IntStream.range(0 , toolCalls.size()) .mapToObj(i -> Map.entry(toolCalls.get(i), results.get(i))) .toList()); }
归纳起来工具调用有这么几个要素:工具 + 工具配置 + 工具上下文 + agent实例
概括成一句话的话就是:Agent实例根据工具配置结合工具上下文调用工具。最后得到的结果也是ToolUse和ToolResult一一对应的。
然后我们一个一个来说:
工具配置 :
超时时间(默认为5min)
最大尝试次数(默认为3次)
初始退避时间(初始为2s)
最大退避时间(最大为30s)
每次退避时间的乘子(2,即每次乘以2) => 2 * 2 * 2 * 2 … 最大达到最大退避时间后不动了
什么样的异常应当发生退避 Predicate<Throwable> => 默认根据字段的 RETRYABLE_ERRORS 配置的错误类型判断
工具上下文 :
一个比较简单的类结构
class ToolExecutionContext => private final List<ContextStore> stores;
但是在其中配置了stores必须是不可变 的。 那么别的方法看了一圈乏善可陈,无非是对stores进行get/build/merge之类的。 那么只好进一步剖开这个ContextStore看看是什么了。
先摆一段类说明:
ContextStore 接口解析 :
这是工具执行上下文的存储层抽象接口,定义了上下文对象的存储契约。它支持两种检索模式:
仅按类型获取 :get(Class<T>) - 适用于单例场景
按键 + 类型获取 :get(String, Class<T>) - 适用于多实例场景
这种设计既能处理简单情况(一个 UserContext),也能处理复杂情况(不同用户的多个 UserContext 实例)。
实现方式 :
简单的内存 Map 存储(DefaultContextStore)
自定义存储后端(Redis、数据库等)
使用示例 :
1 2 3 4 5 6 DatabaseConfig config = store.get(DatabaseConfig.class);UserContext admin = store.get("admin" , UserContext.class);UserContext guest = store.get("guest" , UserContext.class);
设计意图 :
这个接口的核心价值在于解耦存储实现与业务逻辑 :
Agent 层不需要关心数据存在哪里(内存/Redis/数据库)
可以根据实际需求灵活切换存储后端
同时支持单例 和多实例 两种访问模式,适应不同场景
从本质上看,这里的工具上下文实际上定义了一个名为ContextStore的类,它作为常规意义上的上下文对象存储在ToolExecutionContext中。可以将ContextStore视为AgentScope框架中的上下文管理器 。值得注意的是,ToolExecutionContext本身并不关注具体的上下文内容,而是作为一个标识符存在,只要符合ContextStore结构规范的上下文数据都能被正确解析和处理。 然后便是工具的执行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 return executor.executeAll(toolCalls, config.isParallel(), effectiveConfig, agent, agentContext);↓↓↓ List<Mono<ToolResultBlock>> monos = toolCalls.stream() .map( toolCall -> executeWithInfrastructure( toolCall, executionConfig, agent, agentContext)) .toList(); if (parallel) { return Flux.mergeSequential(monos).collectList(); } return Flux.concat(monos).collectList();
总之先发现:工具调用既可以并行也可以串行! 那么再总之,先看看 executeWithInfrastructure 做了什么。
构建参数并执行
1 2 3 4 5 6 7 8 9 10 ToolCallParam param = ToolCallParam.builder() .toolUseBlock(toolCall) .agent(agent) .context(agentContext) .build(); Mono<ToolResultBlock> execution = execute(param);
其中主要是进行了工具有效性的校验,然后工具Input信息的merge:
优先级:工具默认参数 < 从ToolCallParam传入的Input参数(默认为空) < 模型自身给出的Input参数(从ToolUseBlock中获取)
然后创建出最终的Param,最后返回一个Mono
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 return tool.callAsync(executionParam) .onErrorResume( ToolSuspendException.class, e -> { logger.debug( "Tool '{}' suspended: {}" , toolCall.getName(), e.getReason() != null ? e.getReason() : "no reason" ); return Mono.just(ToolResultBlock.suspended(toolCall, e)); }) .onErrorResume( e -> { String errorMsg = e.getMessage() != null ? e.getMessage() : e.getClass().getSimpleName(); return Mono.just( ToolResultBlock.error("Tool execution failed: " + errorMsg)); });
其他其实看个意思,看看这个suspend是怎么个事儿:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public static ToolResultBlock suspended (ToolUseBlock toolUse, ToolSuspendException exception) { String content = exception.getReason() != null ? exception.getReason() : "[Awaiting external execution]" ; return new ToolResultBlock ( toolUse.getId(), toolUse.getName(), List.of(TextBlock.builder().text(content).build()), Map.of(METADATA_SUSPENDED, true )); }
不过暂时还不知道做什么用的,之后在具体的Msg的时候再看看,这里我也先suspend了。
那么到了这里之后,工具的调用就已经封装为了Mono<ToolResultBlock>,其实主要的工具调用工作就已经完成了,但是从框架中发现还有很多收尾的措施:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 execution = applyScheduling(execution); private Mono<ToolResultBlock> applyScheduling (Mono<ToolResultBlock> execution) { if (executorService == null ) { return execution.subscribeOn(Schedulers.boundedElastic()); } return execution.subscribeOn(Schedulers.fromExecutor(executorService)); } execution = applyTimeout(execution, executionConfig, toolCall); private Mono<ToolResultBlock> applyTimeout ( Mono<ToolResultBlock> execution, ExecutionConfig config, ToolUseBlock toolCall) { if (config == null || config.getTimeout() == null ) { return execution; } Duration timeout = config.getTimeout(); logger.debug("Applied timeout: {} for tool: {}" , timeout, toolCall.getName()); return execution.timeout( timeout, Mono.error(new RuntimeException ("Tool execution timeout after " + timeout))); } execution = applyRetry(execution, executionConfig, toolCall); private Mono<ToolResultBlock> applyRetry ( Mono<ToolResultBlock> execution, ExecutionConfig config, ToolUseBlock toolCall) { if (config == null || config.getMaxAttempts() == null || config.getMaxAttempts() <= 1 ) { return execution; } Integer maxAttempts = config.getMaxAttempts(); Duration initialBackoff = config.getInitialBackoff() != null ? config.getInitialBackoff() : Duration.ofSeconds(1 ); Duration maxBackoff = config.getMaxBackoff() != null ? config.getMaxBackoff() : Duration.ofSeconds(10 ); Predicate<Throwable> retryOn = config.getRetryOn() != null ? config.getRetryOn() : error -> true ; Retry retrySpec = Retry.backoff(maxAttempts - 1 , initialBackoff) .maxBackoff(maxBackoff) .jitter(0.5 ) .filter(retryOn) .doBeforeRetry( signal -> logger.warn( "Retrying tool call (attempt {}/{}) due to: {}" , signal.totalRetriesInARow() + 1 , maxAttempts - 1 , signal.failure().getMessage(), signal.failure())); logger.debug( "Applied retry config: maxAttempts={} for tool: {}" , maxAttempts, toolCall.getName()); return execution.retryWhen(retrySpec); }
那如上的一大堆事情结束之后,终于又回到了之前一开始的acting(0)的代码中,真是饶了好大好大的一圈。
在收集到了上述的整个工具调度MonoList之后,就会开始分配是否挂起suspend.
第一步:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 List<Map.Entry<ToolUseBlock, ToolResultBlock>> successPairs = results.stream() .filter(e -> !e.getValue().isSuspended()) .toList(); List<Map.Entry<ToolUseBlock, ToolResultBlock>> pendingPairs = results.stream() .filter(e -> e.getValue().isSuspended()) .toList(); if (successPairs.isEmpty()) { if (!pendingPairs.isEmpty()) { return Mono.just(buildSuspendedMsg(pendingPairs)); } return executeIteration(iter + 1 ); } private Msg buildSuspendedMsg (List<Map.Entry<ToolUseBlock, ToolResultBlock>> pendingPairs) { List<ContentBlock> content = new ArrayList <>(); for (Map.Entry<ToolUseBlock, ToolResultBlock> pair : pendingPairs) { content.add(pair.getKey()); content.add(pair.getValue()); } return Msg.builder() .name(getName()) .role(MsgRole.ASSISTANT) .content(content) .generateReason(GenerateReason.TOOL_SUSPENDED) .build(); }
这个挂起消息在Msg类里了,之后单独看看,先略过。然后看如果有需要进行处理的工具消息的话:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 return Flux.fromIterable(successPairs) .concatMap(this ::notifyPostActingHook) .last() .flatMap( event -> { if (event.isStopRequested()) { return Mono.just( event.getToolResultMsg() .withGenerateReason( GenerateReason .ACTING_STOP_REQUESTED)); } if (!pendingPairs.isEmpty()) { return Mono.just( buildSuspendedMsg(pendingPairs)); } return executeIteration(iter + 1 ); });
和上面的结合一下其实还算比较简单,接着看具体的推理过程!
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 private Mono<Msg> reasoning (int iter, boolean ignoreMaxIters) { if (!ignoreMaxIters && iter >= maxIters) { return summarizing(); } ReasoningContext context = new ReasoningContext (getName()); return checkInterruptedAsync() .then(notifyPreReasoningEvent(prepareMessages())) .flatMapMany( event -> { GenerateOptions options = event.getEffectiveGenerateOptions() != null ? event.getEffectiveGenerateOptions() : buildGenerateOptions(); return model.stream( event.getInputMessages(), toolkit.getToolSchemas(), options) .concatMap(chunk -> checkInterruptedAsync().thenReturn(chunk)); }) .doOnNext( chunk -> { List<Msg> chunkMsgs = context.processChunk(chunk); for (Msg msg : chunkMsgs) { notifyReasoningChunk(msg, context).subscribe(); } }) .then(Mono.defer(() -> Mono.justOrEmpty(context.buildFinalMessage()))) .onErrorResume( InterruptedException.class, error -> { Msg msg = context.buildFinalMessage(); if (msg != null ) { memory.addMessage(msg); } return Mono.error(error); }) .flatMap(this ::notifyPostReasoning) .flatMap( event -> { Msg msg = event.getReasoningMessage(); if (msg != null ) { memory.addMessage(msg); } if (event.isStopRequested()) { return Mono.just( msg.withGenerateReason( GenerateReason.REASONING_STOP_REQUESTED)); } if (event.isGotoReasoningRequested()) { List<Msg> gotoMsgs = event.getGotoReasoningMsgs(); if (gotoMsgs != null ) { gotoMsgs.forEach(memory::addMessage); } return reasoning(iter + 1 , true ); } if (isFinished(msg)) { return Mono.just(msg); } return checkInterruptedAsync().then(acting(iter)); }) .switchIfEmpty( Mono.defer( () -> { return Mono.justOrEmpty((Msg) null ); })); }
若轮数超出最大轮数,进行总结直接返回
这里其实比较简单,是通过一段prompt指引大模型进行总结的,来看看框架里的prompt长什么样。
You have failed to generate response within the maximum iterations. Now respond directly by summarizing the current situation.
可以说在众多的注释里显得非常通俗易懂了,连我都可以直接看懂。(
如果说没有达到最大轮数的话,那么就进入正常的推理环节。依旧跳过各种preHook。
调用模型的stream方法进行请求获取模型响应chunks
判断是否存在HITL(Human-in-the-Loop) stop或者gotoReasoningRequested
无剩余工具调用则返回结果或调用acting处理剩余的toolUse
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 flowchart TD Start([开始]) --> CheckInterrupt1{检查中断?} CheckInterrupt1 -->|是| InterruptEnd([中断退出]) CheckInterrupt1 -->|否| NotifyPre[通知PreReasoningEvent] NotifyPre --> BuildOptions{获取GenerateOptions?} BuildOptions -->|event中有| UseEventOpts[使用event中的配置] BuildOptions -->|event中无| BuildNewOpts[构建新配置] UseEventOpts --> ModelStream BuildNewOpts --> ModelStream ModelStream[调用model.stream] --> CheckInterruptChunk{检查中断?} CheckInterruptChunk -->|是| InterruptEnd CheckInterruptChunk -->|否| ProcessChunk[处理Chunk数据] ProcessChunk --> NotifyChunk[通知ReasoningChunk] NotifyChunk --> HasMoreChunks{还有更多Chunk?} HasMoreChunks -->|是| ModelStream HasMoreChunks -->|否| BuildFinalMsg{构建最终消息?} BuildFinalMsg -->|有内容| GetFinalMsg[获取最终消息] BuildFinalMsg -->|为空| SwitchEmpty[返回null] GetFinalMsg --> HandleInterruptError{处理中断异常?} HandleInterruptError -->|捕获异常| SaveAndError[保存消息并报错] SaveAndError --> InterruptEnd HandleInterruptError -->|无异常| NotifyPost[通知PostReasoningEvent] NotifyPost --> HitlCheck{用户请求停止?} HitlCheck -->|是| ReturnStopMsg[返回停止消息] HitlCheck -->|否| GotoCheck{请求跳转推理?} GotoCheck -->|是| AddGotoMsgs[添加消息到memory] AddGotoMsgs --> RecurseReasoning[递归调用reasoning] RecurseReasoning --> CheckInterrupt1 GotoCheck -->|否| FinishCheck{推理是否完成?} FinishCheck -->|是| ReturnFinishMsg[返回最终消息] FinishCheck -->|否| CheckInterrupt2{检查中断?} CheckInterrupt2 -->|是| InterruptEnd CheckInterrupt2 -->|否| GoToActing[进入Acting阶段] ReturnStopMsg --> End([结束]) ReturnFinishMsg --> End GoToActing --> End SwitchEmpty --> End
问了一圈AI,发现应该框架还是主要依赖于基模本身的模型思考能力,来建立整个ReAct架构的