ReActAgent

总算是到了本次源码阅读的终点站,就是为了这碟醋包的这盘饺子。
总之先来看一下ReActAgent的类说明,感觉根据类说明来进行源码阅读不失为一个方便理解的方法。

ReActAgent 官方类说明翻译

ReAct(推理与行动)智能体实现

ReAct 是一种智能体设计模式,它将推理(思考和规划)与行动(工具执行)结合在一个迭代循环中。智能体会在这两个阶段之间交替进行,直到完成任务或达到最大迭代次数限制。

核心特性:

  • 响应式流处理:使用 Project Reactor 实现非阻塞执行
  • Hook 系统:可扩展的 Hook 机制,用于监控和拦截智能体执行过程
  • 人机协同支持(HITL):通过在 PostReasoningEvent/PostActingEvent 事件中调用 stopAgent() 实现人在回路
  • 结构化输出:继承自 StructuredOutputCapableAgent,提供类型安全的输出生成能力

使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 创建模型
DashScopeChatModel model = DashScopeChatModel.builder()
.apiKey(System.getenv("DASHSCOPE_API_KEY"))
.modelName("qwen-plus")
.build();

// 创建包含工具的工具包
Toolkit toolkit = new Toolkit();
toolkit.registerObject(new MyToolClass());

// 构建智能体
ReActAgent agent = ReActAgent.builder()
.name("Assistant")
.sysPrompt("You are a helpful assistant.")
.model(model)
.toolkit(toolkit)
.memory(new InMemoryMemory())
.maxIters(10)
.build();

// 使用智能体
Msg response = agent.call(Msg.builder()
.name("user")
.role(MsgRole.USER)
.content(TextBlock.builder().text("What's the weather?").build())
.build()).block();

自顶向下,ReActAgent的精髓就在于ReAct。直入主题,看看它是怎么Call的。

这里要援引一下之前的AgentBase中的callcall是由AgentBase定义的,而不是ReActAgent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public final Mono<Msg> call(List<Msg> msgs) {
return Mono.using(
() -> {
if (checkRunning && !running.compareAndSet(false, true)) {
throw new IllegalStateException(
"Agent is still running, please wait for it to finish");
}
resetInterruptFlag();
return this;
},
resource ->
TracerRegistry.get()
.callAgent(
this,
msgs,
() ->
notifyPreCall(msgs)
.flatMap(this::doCall)
.flatMap(this::notifyPostCall)
.onErrorResume(
createErrorHandler(
msgs.toArray(new Msg[0])))),
resource -> running.set(false),
true);
}

Mono.using 深度解析:响应式资源管理

这段代码展示了 Project Reactor 中一个非常重要的操作符——Mono.using。它的设计灵感来源于 Java 的 try-with-resources 语句,专门用于响应式流中的资源管理

1️⃣ 为什么需要 Mono.using?

问题背景
在响应式编程中,传统的 try-finally 或 try-with-resources 无法直接工作,因为:

  • ✅ 响应式流是异步延迟执行
  • ✅ 资源的生命周期需要与流的订阅/取消/完成事件同步
  • ❌ 普通 try-finally 会在订阅前就执行 finally,导致资源过早释放

示例对比

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// ❌ 错误做法:传统 try-finally 不适用于响应式流
try {
agent.running.set(true); // 获取资源
return doCall(msgs); // 返回 Mono,但此时还没执行
} finally {
agent.running.set(false); // ← 立即执行!资源过早释放
}

// ✅ 正确做法:使用 Mono.using
return Mono.using(
() -> { /* 获取资源 */ },
resource -> { /* 使用资源创建流 */ },
resource -> { /* 释放资源 */ } // ← 在流完成/错误/取消时执行
);

2️⃣ Mono.using 的三个参数详解

1
2
3
4
5
6
Mono.using(
resourceSupplier, // ① 资源供应器
resourceFactory, // ② 资源工厂(创建流)
resourceCleanup, // ③ 资源清理器
eager // ④ 是否提前清理(可选)
)
参数 ①:resourceSupplier(资源供应器)
1
2
3
4
5
6
7
8
() -> {
if (checkRunning && !running.compareAndSet(false, true)) {
throw new IllegalStateException(
"Agent is still running, please wait for it to finish");
}
resetInterruptFlag();
return this; // 返回资源对象
}

职责

  • 📥 获取资源:设置运行状态、重置中断标志
  • 🔒 并发控制:通过 CAS 确保同一时间只有一个调用在执行
  • ⚠️ 异常处理:如果 Agent 正在运行,立即抛出异常
  • 🔄 返回值:返回的资源对象会传递给后续两个参数

关键点

  • 这个方法在订阅时执行(不是定义时)
  • 如果抛出异常,整个流会立即失败
参数 ②:resourceFactory(资源工厂)
1
2
3
4
5
6
7
8
9
10
11
resource ->
TracerRegistry.get()
.callAgent(
this,
msgs,
() ->
notifyPreCall(msgs)
.flatMap(this::doCall)
.flatMap(this::notifyPostCall)
.onErrorResume(
createErrorHandler(msgs.toArray(new Msg[0]))))

职责

  • 🏭 创建实际的响应式流:这是真正的业务逻辑
  • 🔗 接收资源:从参数 ① 获取资源对象(这里是 this
  • 📤 返回 Mono:必须返回一个 Mono<T>Flux<T>

执行流程

1
2
3
4
5
6
graph LR
A[订阅] --> B[执行 resourceSupplier]
B --> C[获取资源 this]
C --> D[执行 resourceFactory]
D --> E[创建 Mono 流]
E --> F[开始异步执行]
参数 ③:resourceCleanup(资源清理器)
1
resource -> running.set(false)

职责

  • 🧹 释放资源:重置运行状态标志
  • 触发时机:在以下任一情况发生时执行:
    • ✅ 流正常完成(onComplete)
    • ❌ 流发生错误(onError)
    • 🛑 流被取消(cancel)

关键点

  • 无论成功还是失败,都会执行
  • 类似于 try-finally 中的 finally 块
  • 可以安全地访问资源对象进行清理
参数 ④:eager(是否提前清理,可选)
1
true  // 默认值

含义

  • true:在下游订阅者收到信号之前清理资源
  • false:在下游订阅者收到信号之后清理资源

什么是”下游订阅者收到信号”?

在响应式流中,”信号”指的是以下三种终止事件之一:

信号类型 触发条件 Subscriber 回调
onComplete 流正常完成 subscriber.onComplete()
onError 流发生错误 subscriber.onError(throwable)
cancel 流被取消 (无回调,但会停止接收数据)

“收到信号”的时机
当 Mono/Flux 执行完毕后,会向订阅者(Subscriber)发送上述信号之一,此时 Subscriber 的对应回调方法会被调用。

eager=true vs eager=false 的区别

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
sequenceDiagram
participant S as Subscriber
participant M as Mono.using
participant RC as ResourceCleanup

Note over M: 流执行完毕

alt eager=true (默认)
M->>RC: 先执行 cleanup
activate RC
RC->>RC: running.set(false)
deactivate RC
M->>S: 再发送信号 (onComplete/onError)
activate S
S->>S: 处理信号
deactivate S
else eager=false
M->>S: 先发送信号 (onComplete/onError)
activate S
S->>S: 处理信号
deactivate S
M->>RC: 再执行 cleanup
activate RC
RC->>RC: running.set(false)
deactivate RC
end

实际影响示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 场景:cleanup 操作可能会抛出异常
Mono.using(
() -> resource,
r -> doWork(),
r -> {
// 如果这里抛出异常
throw new RuntimeException("Cleanup failed!");
},
true // eager=true
)
.subscribe(
result -> System.out.println("Result: " + result),
error -> System.out.println("Error: " + error) // ← 收到的是业务错误,不是 cleanup 错误
);

两种模式的选择建议

模式 适用场景 优点 缺点
eager=true 大多数场景(默认推荐) - 资源尽快释放
- cleanup 异常不影响业务信号
- Subscriber 无法访问已清理的资源
eager=false 需要在 Subscriber 中访问资源 - Subscriber 可以访问资源状态 - 资源持有时间更长
- cleanup 异常可能掩盖业务异常

Agentscope 的选择

1
2
3
4
5
return Mono.using(
// ...
resource -> running.set(false), // cleanup
true // ← 选择 eager=true
);

原因分析

  1. 快速释放锁:让其他等待的调用能尽快获取锁
  2. 异常隔离:即使 running.set(false) 失败,也不影响业务逻辑的信号传递
  3. 符合预期:Agent 调用完成后,应该立即释放运行状态,而不是等到 Subscriber 处理完结果

影响:通常使用默认值 true 即可

3️⃣ 完整的执行时序图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
sequenceDiagram
participant S as Subscriber
participant U as Mono.using
participant RS as ResourceSupplier
participant RF as ResourceFactory
participant RC as ResourceCleanup
participant FL as Flux/Mono

S->>U: subscribe()
activate U
U->>RS: 调用 resourceSupplier
activate RS
RS->>RS: 检查运行状态
RS->>RS: 重置中断标志
RS-->>U: 返回资源 this
deactivate RS

U->>RF: 调用 resourceFactory(resource)
activate RF
RF->>FL: 创建 Mono 流
RF-->>U: 返回 Mono
deactivate RF

U->>S: 订阅 Mono
deactivate U

Note over FL: 异步执行业务逻辑
FL->>FL: notifyPreCall
FL->>FL: doCall
FL->>FL: notifyPostCall

alt 正常完成
FL-->>S: onComplete
S->>RC: 触发 cleanup
activate RC
RC->>RC: running.set(false)
deactivate RC
else 发生错误
FL-->>S: onError
S->>RC: 触发 cleanup
activate RC
RC->>RC: running.set(false)
deactivate RC
else 被取消
S->>FL: cancel
FL->>RC: 触发 cleanup
activate RC
RC->>RC: running.set(false)
deactivate RC
end

4️⃣ Agentscope 中的具体应用分析

回到我们的代码,看看每个部分的作用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
return Mono.using(
// ① 资源获取:进入临界区
() -> {
if (checkRunning && !running.compareAndSet(false, true)) {
throw new IllegalStateException("Agent is still running...");
}
resetInterruptFlag();
return this;
},

// ② 业务逻辑:执行 Agent 调用链
resource -> TracerRegistry.get().callAgent(
this, msgs,
() -> notifyPreCall(msgs)
.flatMap(this::doCall)
.flatMap(this::notifyPostCall)
.onErrorResume(createErrorHandler(...))
),

// ③ 资源释放:退出临界区
resource -> running.set(false),

true // 提前清理
);

设计意图

阶段 操作 目的
进入 compareAndSet(false, true) 防止并发调用,确保单例执行
执行 notifyPreCall → doCall → notifyPostCall Hook 系统 + 实际推理
退出 running.set(false) 释放锁,允许下次调用
异常 自动触发 cleanup 即使出错也能正确释放资源

5️⃣ 与其他资源管理方式对比

方式 适用场景 优点 缺点
Mono.using 响应式流中的资源管理 自动处理完成/错误/取消 仅适用于 Mono/Flux
try-with-resources 同步代码 简洁直观 不适用于异步流
doFinally 简单的清理操作 灵活 需要手动管理资源获取
doOnTerminate 终止时执行 细粒度控制 不处理取消场景

推荐原则

  • ✅ 如果需要成对的获取/释放资源 → 使用 Mono.using
  • ✅ 如果只是单向的清理操作 → 使用 doFinally
  • ❌ 不要在响应式流中使用 try-finally

6️⃣ 实际应用场景总结

Mono.using 的典型应用场景包括:

  1. 数据库连接管理

    1
    2
    3
    4
    5
    Mono.using(
    () -> connectionPool.getConnection(),
    conn -> executeQuery(conn),
    conn -> conn.close()
    )
  2. 文件流操作

    1
    2
    3
    4
    5
    Mono.using(
    () -> Files.newInputStream(path),
    stream -> readData(stream),
    stream -> stream.close()
    )
  3. 锁/信号量管理(本案例)

    1
    2
    3
    4
    5
    Mono.using(
    () -> { lock.lock(); return lock; },
    lock -> doWork(),
    lock -> lock.unlock()
    )
  4. HTTP 客户端会话

    1
    2
    3
    4
    5
    Mono.using(
    () -> httpClient.createSession(),
    session -> session.sendRequest(),
    session -> session.close()
    )

7️⃣ 关键要点总结

核心优势

  • 将资源生命周期与响应式流绑定
  • 自动处理所有终止场景(完成/错误/取消)
  • 避免资源泄漏

执行保证

  • resourceSupplier 只执行一次
  • resourceCleanup 必定执行(除非 JVM 崩溃)
  • 异常安全:cleanup 中的异常不会掩盖主流的异常

最佳实践

  • 资源获取逻辑要简单快速(不要阻塞)
  • 清理逻辑要幂等(可能被多次调用)
  • 优先使用 Mono.using 而非手动管理

在加载了相关知识的Context之后,回到我们的ReActAgent
这里有一个比较有意思的东西:

1
TracerRegistry.get().callAgent()

其实本质上也是一个hook,只不过是一个TraceHook用来记录整个call的链路,不过默认是NoopTracer,也就是无操作的Tracer(人话:默认不记录Trace,需要人为手动定义开启)。

OK,然后回过头来看ReActAgent中的doCall方法,也是除了preposthook,最为主要的一个调用方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
protected Mono<Msg> doCall(List<Msg> msgs) {
Set<String> pendingIds = getPendingToolUseIds();

// No pending tools -> normal processing
if (pendingIds.isEmpty()) {
addToMemory(msgs);
return executeIteration(0);
}

// Has pending tools -> validate and add tool results
validateAndAddToolResults(msgs, pendingIds);
return hasPendingToolUse() ? acting(0) : executeIteration(0);
}

n### doCall 方法流程分析

这个方法的核心逻辑是判断是否有待处理的工具调用,并根据情况选择不同的执行路径。

流程图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
flowchart TD
Start([doCall 开始]) --> GetPending[获取待处理工具 ID<br/>getPendingToolUseIds]
GetPending --> CheckEmpty{pendingIds<br/>是否为空?}

CheckEmpty -->|是| NormalPath[正常处理路径]
CheckEmpty -->|否| PendingPath[待处理工具路径]

NormalPath --> AddToMemory[addToMemory msgs<br/>将消息添加到记忆]
AddToMemory --> ExecuteIter1[executeIteration 0<br/>从第 0 轮开始执行迭代]
ExecuteIter1 --> End1([返回 Mono&lt;Msg&gt;])

PendingPath --> Validate[validateAndAddToolResults<br/>验证并添加工具结果]
Validate --> CheckHasPending{hasPendingToolUse<br/>是否仍有待处理工具?}

CheckHasPending -->|是| Acting[acting 0<br/>执行工具调用阶段]
CheckHasPending -->|否| ExecuteIter2[executeIteration 0<br/>从第 0 轮开始执行迭代]

Acting --> End2([返回 Mono&lt;Msg&gt;])
ExecuteIter2 --> End2

style Start fill:#000000
style End1 fill:#000000
style End2 fill:#000000
style NormalPath fill:#000000
style PendingPath fill:#000000
style Acting fill:#000000
style ExecuteIter1 fill:#000000
style ExecuteIter2 fill:#000000

关键分支说明

分支 1:无待处理工具(正常路径)

  • 条件pendingIds.isEmpty() == true
  • 场景:首次调用或所有工具已执行完毕
  • 操作
    1. 将输入消息添加到记忆
    2. 从第 0 轮开始执行完整的 ReAct 迭代(推理 → 行动)

分支 2:有待处理工具(恢复路径)

  • 条件pendingIds.isEmpty() == false
  • 场景:之前有工具调用未完成,现在收到工具执行结果
  • 操作
    1. 验证工具结果并添加到记忆
    2. 根据是否还有待处理工具选择:
      • 仍有待处理:进入 acting 阶段继续执行工具
      • 无待处理:进入 executeIteration 开始新的推理轮次

设计意图

这种设计支持断点续传人机协同(HITL)

特性 实现方式
断点续传 通过 pendingIds 识别未完成的工具调用
状态恢复 validateAndAddToolResults 将外部工具结果注入记忆
灵活路由 根据状态动态选择 actingexecuteIteration
迭代控制 统一从第 0 轮开始计数,简化状态管理

实际应用场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 场景 1:正常调用(无待处理工具)
agent.call(userMsg)
→ pendingIds 为空
→ addToMemory + executeIteration(0)

// 场景 2:工具执行后恢复(有待处理工具)
// 第 1 步:Agent 调用搜索工具
agent.call(userMsg)
→ 返回:需要调用 SearchTool

// 第 2 步:外部执行工具,传入结果
agent.call(toolResultMsg)
→ pendingIds 不为空
→ validateAndAddToolResults
→ 如果还有工具:acting(0)
→ 否则:executeIteration(0)

那么就先从比较不那么重要的工具调用逻辑说起:acting方法。
其中需要先找到Memory中的工具,这里很有意思的一点在于会根据agent的名字来进行提取工具,可能是为了在observe的时候用于判断确实是当前agent的工具流吧。

1
2
3
4
5
6
if (msg.getRole() == MsgRole.ASSISTANT && msg.getName().equals(agentName)) {
List<ToolUseBlock> toolCalls = msg.getContentBlocks(ToolUseBlock.class);
if (!toolCalls.isEmpty()) {
return toolCalls;
}
}

基于工具调用的特点,是这样的:一个RoleAssistantToolUseBlockContent消息中,会带有多个ToolUse调用,不会出现有多个Msg需要解析的情况,因此,框架只会去取最后一条Assistant消息进行解析,若为ToolUse且其中存在未调用的工具,那么就执行,所以是一条ToolUseBlock后面跟着多个ToolResultBlock的这样的结构。
然后在经过上述的流程拿到还未执行的工具之后,就出现了这样一条看起来很有B格的代码:

1
2
3
// Forward tool chunks into ActingChunkEvent hooks without overwriting user callbacks.
toolkit.setInternalChunkCallback(
(toolUse, chunk) -> notifyActingChunk(toolUse, chunk).subscribe());

内部 Chunk 回调机制:流式工具执行的钩子桥接

这段代码的核心作用是将 Toolkit 内部的工具执行分块(chunk)事件转发到 Agent 的 Hook 系统,同时避免覆盖用户自定义的回调。

1️⃣ 什么是 Tool Chunk?

在某些场景下,工具执行不是一次性返回结果,而是流式输出多个数据块。例如:

  • 📡 网络请求:分块接收 HTTP 响应
  • 🗄️ 数据库查询:逐行返回大量数据
  • 📝 文件读取:分块读取大文件
  • 🔍 搜索引擎:分页返回搜索结果

示例

1
2
3
4
5
6
7
8
9
// 传统方式:一次性返回
ToolResultBlock result = searchTool.execute(query);

// 流式方式:分块返回
searchTool.executeStream(query)
→ chunk 1: {"results": [1-10]}
→ chunk 2: {"results": [11-20]}
→ chunk 3: {"results": [21-30]}
→ complete

2️⃣ 为什么需要 setInternalChunkCallback?

通俗理解:餐厅点餐系统的启示

想象一个餐厅的场景:

  • Toolkit = 厨房:有自己的工作流程,厨师做菜时会记录自己的日志(”开始切菜”、”开始炒菜”等)
  • Agent = 服务员:也需要知道菜的进度,以便告诉顾客,但不能破坏厨房的原有工作

问题背景

Toolkit 本身已经有自己的 chunk 处理逻辑(用于内部监控、日志等),但 Agent 层也需要感知这些 chunk 事件以触发 Hook。

数据流转示意图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
graph TB
subgraph Toolkit_Layer [Toolkit 层 - 厨房]
A[工具执行] --> B[产生 Chunk]
B --> C[Toolkit 内部回调]
C --> D[记录内部日志]
C --> E[更新内部状态]
end

subgraph Bridge_Layer [桥接层 - Internal Callback]
B --> F[Internal Chunk Callback]
F --> G[notifyActingChunk]
end

subgraph Agent_Layer [Agent 层 - 服务员]
G --> H[触发 ActingChunkEvent]
H --> I[Hook 1: 业务日志]
H --> J[Hook 2: WebSocket推送]
H --> K[Hook 3: 监控面板]
end

style Toolkit_Layer fill:#e1f5ff,stroke:#0288d1,stroke-width:2px,color:#000000
style Bridge_Layer fill:#fff3cd,stroke:#f57c00,stroke-width:2px,color:#000000
style Agent_Layer fill:#d4edda,stroke:#28a745,stroke-width:2px,color:#000000
style B fill:#ffeb3b,stroke:#f57c00,stroke-width:3px,color:#000000

流程说明

  1. Toolkit 层(蓝色区域):工具执行产生 chunk,内部回调处理自己的逻辑
  2. 桥接层(黄色区域):通过 setInternalChunkCallback 设置的回调被触发
  3. Agent 层(绿色区域):Hook 系统接收到事件,执行所有注册的监听器

关键特点

  • ✅ Toolkit 的内部逻辑不受影响(C → D, E)
  • ✅ Agent 层能实时感知 chunk 事件(F → G → H)
  • ✅ 两者并行工作,互不干扰

核心机制

  • Toolkit 产生 chunk 时,会同时通知多个回调(类似发布-订阅模式)
  • setInternalChunkCallback 注册的回调是额外的监听者
  • Toolkit 自己的内部回调继续工作,Agent 层的 Hook 也能收到通知
  • 两者并行执行,互不阻塞

📡 类比:广播系统

就像电台广播:

  • Toolkit = 电台:自己记录播出日志
  • Internal Callback = 新增的接收器:把信号转发给 Agent
  • Agent Hook = 听众:可以有很多人同时收听

电台不会因为多了几个听众就停止自己的工作,听众之间也互不影响。

🔧 技术实现本质

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// Toolkit 内部可能是这样的结构:
List<ChunkCallback> internalCallbacks = new ArrayList<>();

// setInternalChunkCallback 实际上是:
void setInternalChunkCallback(ChunkCallback callback) {
internalCallbacks.add(callback); // 追加到列表
}

// 产生 chunk 时:
void onChunkGenerated(Chunk chunk) {
// 遍历所有注册的回调,逐个通知
for (ChunkCallback callback : internalCallbacks) {
callback.onChunk(chunk); // 每个回调独立执行
}
}

设计挑战

1
2
3
4
5
6
7
// ❌ 错误做法:覆盖原有回调
// 就像服务员冲进厨房说:"以后你们只准向我汇报,不准记自己的日志!"
toolkit.setChunkCallback(myCallback); // ← 会替换掉 Toolkit 内部的回调

// ✅ 正确做法:追加内部回调
// 就像在厨房装了一个"副本摄像头",厨房继续记日志,服务员也能看到进度
toolkit.setInternalChunkCallback(myCallback); // ← 与原有回调并存

核心区别

方式 效果 后果
setChunkCallback 覆盖原有回调 🔥 破坏 Toolkit 的内部逻辑
setInternalChunkCallback 追加新回调 ✅ 两者互不干扰,协同工作

实际执行流程

当工具执行产生一个数据块时(比如搜索到第10条结果):

  1. Toolkit 内部:继续执行自己的处理逻辑(记录日志、更新状态等)
  2. Internal Callback:额外触发我们设置的回调
  3. Agent Hook 系统:执行所有注册的监听器
    • 可能记录业务日志
    • 可能发送 WebSocket 消息给前端
    • 可能更新实时监控面板

这种设计实现了多层架构中的事件传递,每一层都保持独立,通过”追加”而不是”覆盖”的方式实现协作。

3️⃣ 代码解析

1
2
3
toolkit.setInternalChunkCallback(
(toolUse, chunk) -> notifyActingChunk(toolUse, chunk).subscribe()
);

分解说明

部分 含义
setInternalChunkCallback 设置内部 chunk 回调(不覆盖用户回调)
(toolUse, chunk) -> ... Lambda 表达式,接收工具调用信息和数据块
notifyActingChunk(...) 通知 Agent 的 Hook 系统,触发 ActingChunkEvent
.subscribe() 订阅响应式流,确保 Hook 执行

执行流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
sequenceDiagram
participant T as Toolkit
participant IC as InternalCallback
participant A as Agent
participant H as Hook System

T->>T: 工具执行中产生 chunk
T->>IC: 调用 internalChunkCallback
activate IC
IC->>A: notifyActingChunk(toolUse, chunk)
activate A
A->>H: 触发 ActingChunkEvent
activate H
H->>H: 执行所有注册的 Hook
H-->>A: 完成
deactivate H
A-->>IC: Mono 完成
deactivate A
IC-->>T: subscribe 完成
deactivate IC

4️⃣ ActingChunkEvent Hook 的作用

当工具执行产生 chunk 时,会触发 ActingChunkEvent,允许外部监听器:

应用场景

  1. 实时进度显示

    1
    2
    3
    4
    5
    6
    7
    agent.registerHook(new AgentHook() {
    @Override
    public Mono<Void> onActingChunk(ActingChunkEvent event) {
    System.out.println("工具执行进度: " + event.getChunk());
    return Mono.empty();
    }
    });
  2. 流式响应前端

    1
    2
    3
    4
    5
    6
    7
    8
    // WebSocket 推送工具执行的中间结果
    agent.registerHook(new AgentHook() {
    @Override
    public Mono<Void> onActingChunk(ActingChunkEvent event) {
    webSocketSession.send(event.getChunk());
    return Mono.empty();
    }
    });
  3. 实时监控与日志

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // 记录工具执行的详细过程
    agent.registerHook(new AgentHook() {
    @Override
    public Mono<Void> onActingChunk(ActingChunkEvent event) {
    logger.info("Tool {} chunk: {}",
    event.getToolUse().getName(),
    event.getChunk());
    return Mono.empty();
    }
    });

5️⃣ 为什么要 .subscribe()?

关键点notifyActingChunk 返回的是 Mono<Void>,这是一个冷 Observable,必须订阅才会执行。

1
2
3
4
5
// ❌ 错误:不会触发 Hook
(toolUse, chunk) -> notifyActingChunk(toolUse, chunk)

// ✅ 正确:订阅后才会执行
(toolUse, chunk) -> notifyActingChunk(toolUse, chunk).subscribe()

原因

  • Project Reactor 的 Mono/Flux 是惰性求值
  • 只有订阅后,链式调用才会真正执行
  • 这里使用”即发即弃”模式,不关心 Hook 的执行结果

6️⃣ 设计哲学:职责分离与扩展性

这种设计体现了 Agentscope 的核心思想:

层级 职责 实现
Toolkit 层 工具执行与内部管理 维护自己的 chunk 回调
Agent 层 业务逻辑与 Hook 系统 通过 internal callback 桥接
Hook 层 横切关注点(监控、日志等) 监听 ActingChunkEvent

优势

  • 解耦:Toolkit 不需要知道 Agent 的 Hook 系统
  • 可扩展:用户可以注册任意数量的 Hook
  • 向后兼容:不影响 Toolkit 的原有功能
  • 灵活性:Hook 可以是异步的(Mono)

7️⃣ 完整的数据流

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
graph LR
A[工具执行] --> B[产生 Chunk]
B --> C[Toolkit 内部处理]
B --> D[Internal Callback]
D --> E[notifyActingChunk]
E --> F[ActingChunkEvent]
F --> G[Hook 1: 日志]
F --> H[Hook 2: 监控]
F --> I[Hook 3: WebSocket]

style A fill:#e1f5ff
style F fill:#fff3cd
style G fill:#d4edda
style H fill:#d4edda
style I fill:#d4edda

8️⃣ 总结

这段看似简单的代码实际上是一个精巧的桥接模式

将底层的工具执行事件(Toolkit)与上层的业务钩子系统(Agent Hook)无缝连接,同时保持各层的独立性和可扩展性。

核心价值

  1. 非侵入式:不修改 Toolkit 的原有逻辑
  2. 响应式:支持异步 Hook 执行
  3. 可观测:让工具执行过程透明化
  4. 灵活扩展:用户可以自由添加监控逻辑

再回到工具调用的代码中来,省略工具调用的preHook不谈,就进入到工具具体调用的方法中:

1
2
3
4
5
6
7
8
9
private Mono<List<Map.Entry<ToolUseBlock, ToolResultBlock>>> executeToolCalls(
List<ToolUseBlock> toolCalls) {
return toolkit.callTools(toolCalls, toolExecutionConfig, this, toolExecutionContext)
.map(
results ->
IntStream.range(0, toolCalls.size())
.mapToObj(i -> Map.entry(toolCalls.get(i), results.get(i)))
.toList());
}

归纳起来工具调用有这么几个要素:工具 + 工具配置 + 工具上下文 + agent实例

概括成一句话的话就是:Agent实例根据工具配置结合工具上下文调用工具。最后得到的结果也是ToolUseToolResult一一对应的。

然后我们一个一个来说:

工具配置

  1. 超时时间(默认为5min)
  2. 最大尝试次数(默认为3次)
  3. 初始退避时间(初始为2s)
  4. 最大退避时间(最大为30s)
  5. 每次退避时间的乘子(2,即每次乘以2) => 2 * 2 * 2 * 2 … 最大达到最大退避时间后不动了
  6. 什么样的异常应当发生退避 Predicate<Throwable> => 默认根据字段的 RETRYABLE_ERRORS 配置的错误类型判断

工具上下文

一个比较简单的类结构

class ToolExecutionContext => private final List<ContextStore> stores;

但是在其中配置了stores必须是不可变的。
那么别的方法看了一圈乏善可陈,无非是对stores进行get/build/merge之类的。
那么只好进一步剖开这个ContextStore看看是什么了。

先摆一段类说明:

ContextStore 接口解析

这是工具执行上下文的存储层抽象接口,定义了上下文对象的存储契约。它支持两种检索模式:

  1. 仅按类型获取get(Class<T>) - 适用于单例场景
  2. 按键 + 类型获取get(String, Class<T>) - 适用于多实例场景

这种设计既能处理简单情况(一个 UserContext),也能处理复杂情况(不同用户的多个 UserContext 实例)。

实现方式

  • 简单的内存 Map 存储(DefaultContextStore
  • 自定义存储后端(Redis、数据库等)

使用示例

1
2
3
4
5
6
// 每种类型单个实例
DatabaseConfig config = store.get(DatabaseConfig.class);

// 同一类型的多个实例
UserContext admin = store.get("admin", UserContext.class);
UserContext guest = store.get("guest", UserContext.class);

设计意图

这个接口的核心价值在于解耦存储实现与业务逻辑

  • Agent 层不需要关心数据存在哪里(内存/Redis/数据库)
  • 可以根据实际需求灵活切换存储后端
  • 同时支持单例多实例两种访问模式,适应不同场景

从本质上看,这里的工具上下文实际上定义了一个名为ContextStore的类,它作为常规意义上的上下文对象存储在ToolExecutionContext中。可以将ContextStore视为AgentScope框架中的上下文管理器。值得注意的是,ToolExecutionContext本身并不关注具体的上下文内容,而是作为一个标识符存在,只要符合ContextStore结构规范的上下文数据都能被正确解析和处理。
然后便是工具的执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
return executor.executeAll(toolCalls, config.isParallel(), effectiveConfig, agent, agentContext);
↓↓↓
// Map each tool call to an execution Mono
List<Mono<ToolResultBlock>> monos =
toolCalls.stream()
.map(
toolCall ->
executeWithInfrastructure(
toolCall, executionConfig, agent, agentContext))
.toList();
// Parallel or sequential execution
if (parallel) {
return Flux.mergeSequential(monos).collectList();
}
return Flux.concat(monos).collectList();

总之先发现:工具调用既可以并行也可以串行!
那么再总之,先看看 executeWithInfrastructure 做了什么。

构建参数并执行

1
2
3
4
5
6
7
8
9
10
// Build tool call parameter
ToolCallParam param =
ToolCallParam.builder()
.toolUseBlock(toolCall)
.agent(agent)
.context(agentContext)
.build();

// Get core execution
Mono<ToolResultBlock> execution = execute(param);

其中主要是进行了工具有效性的校验,然后工具Input信息的merge

  • 优先级:工具默认参数 < 从ToolCallParam传入的Input参数(默认为空) < 模型自身给出的Input参数(从ToolUseBlock中获取)

然后创建出最终的Param,最后返回一个Mono

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
return tool.callAsync(executionParam)
.onErrorResume(
ToolSuspendException.class,
e -> {
// Convert ToolSuspendException to suspended result
logger.debug(
"Tool '{}' suspended: {}",
toolCall.getName(),
e.getReason() != null ? e.getReason() : "no reason");
return Mono.just(ToolResultBlock.suspended(toolCall, e));
})
.onErrorResume(
e -> {
String errorMsg =
e.getMessage() != null
? e.getMessage()
: e.getClass().getSimpleName();
return Mono.just(
ToolResultBlock.error("Tool execution failed: " + errorMsg));
});

其他其实看个意思,看看这个suspend是怎么个事儿:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 从 ToolSuspendException 创建挂起的工具结果。
*
* <p>此方法由框架使用,用于将 {@link ToolSuspendException}
* 转换为挂起的结果,该结果将返回给用户以进行外部执行。
*
* @param toolUse 触发异常的工具使用块
* @param exception 工具抛出的异常
* @return 一个挂起的 ToolResultBlock
*/
public static ToolResultBlock suspended(ToolUseBlock toolUse, ToolSuspendException exception) {
String content =
exception.getReason() != null
? exception.getReason()
: "[Awaiting external execution]";
return new ToolResultBlock(
toolUse.getId(),
toolUse.getName(),
List.of(TextBlock.builder().text(content).build()),
Map.of(METADATA_SUSPENDED, true));
}

不过暂时还不知道做什么用的,之后在具体的Msg的时候再看看,这里我也先suspend了。

那么到了这里之后,工具的调用就已经封装为了Mono<ToolResultBlock>,其实主要的工具调用工作就已经完成了,但是从框架中发现还有很多收尾的措施:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
// --------------- 应用基础设施层(调度、超时、重试)到工具执行流 ---------------

// 1. 应用线程调度策略:指定执行流在哪个线程池上运行
execution = applyScheduling(execution);

/**
* 为执行流配置线程调度器
* 作用:通过 subscribeOn 决定 Mono 订阅逻辑所在的线程池
*
* @param execution 原始工具执行流 Mono
* @return 绑定了调度器的新 Mono
*/
private Mono<ToolResultBlock> applyScheduling(Mono<ToolResultBlock> execution) {
// 分支1:未配置自定义线程池 → 使用 Reactor 内置的 boundedElastic 调度器
// boundedElastic 适用于阻塞 IO 场景,会按需创建线程池且有边界,防止资源耗尽
if (executorService == null) {
return execution.subscribeOn(Schedulers.boundedElastic());
}
// 分支2:已配置自定义线程池 → 将其包装为 Reactor Scheduler 使用
return execution.subscribeOn(Schedulers.fromExecutor(executorService));
}

// 2. 应用超时控制:限制工具执行的最大耗时
execution = applyTimeout(execution, executionConfig, toolCall);

/**
* 为执行流配置超时机制
* 作用:若工具执行超过指定时间,主动终止并抛出超时异常
*
* @param execution 原始工具执行流 Mono
* @param config 执行配置(含超时时间等参数)
* @param toolCall 当前工具调用信息(用于日志追踪)
* @return 绑定了超时控制的新 Mono
*/
private Mono<ToolResultBlock> applyTimeout(
Mono<ToolResultBlock> execution, ExecutionConfig config, ToolUseBlock toolCall) {
// 无有效超时配置 → 直接返回原始流,不应用超时
if (config == null || config.getTimeout() == null) {
return execution;
}

// 从配置中提取超时时间
Duration timeout = config.getTimeout();
// 记录调试日志:明确为哪个工具、应用了多久的超时
logger.debug("Applied timeout: {} for tool: {}", timeout, toolCall.getName());

// 应用 timeout 操作符:超时后触发自定义异常
return execution.timeout(
timeout,
Mono.error(new RuntimeException("Tool execution timeout after " + timeout)));
}

// 3. 应用重试机制:执行失败时按策略自动重试
execution = applyRetry(execution, executionConfig, toolCall);

/**
* 为执行流配置重试策略
* 作用:当工具执行异常时,根据退避策略自动重试,提升成功率
*
* @param execution 原始工具执行流 Mono
* @param config 执行配置(含重试次数、退避时间等参数)
* @param toolCall 当前工具调用信息(用于日志追踪)
* @return 绑定了重试机制的新 Mono
*/
private Mono<ToolResultBlock> applyRetry(
Mono<ToolResultBlock> execution, ExecutionConfig config, ToolUseBlock toolCall) {
// 检查是否需要重试:无配置、重试次数≤1(仅执行1次,不重试) → 直接返回
if (config == null || config.getMaxAttempts() == null || config.getMaxAttempts() <= 1) {
return execution;
}

// --------------- 提取/初始化重试配置参数 ---------------
Integer maxAttempts = config.getMaxAttempts(); // 总尝试次数(含首次执行)
// 初始退避时间(第一次重试前等待),未配置则默认 1 秒
Duration initialBackoff =
config.getInitialBackoff() != null
? config.getInitialBackoff()
: Duration.ofSeconds(1);
// 最大退避时间(重试等待上限),未配置则默认 10 秒
Duration maxBackoff =
config.getMaxBackoff() != null ? config.getMaxBackoff() : Duration.ofSeconds(10);
// 重试异常谓词(判断哪些异常需要重试),未配置则默认所有异常都重试
Predicate<Throwable> retryOn =
config.getRetryOn() != null ? config.getRetryOn() : error -> true;

// --------------- 构建 Reactor Retry 策略 ---------------
Retry retrySpec =
Retry.backoff(maxAttempts - 1, initialBackoff) // 参数1:重试次数(总尝试-1);参数2:初始退避
.maxBackoff(maxBackoff) // 退避时间上限,避免无限等待
.jitter(0.5) // 抖动因子(±50%随机),防止大量请求同时重试造成雪崩
.filter(retryOn) // 应用重试条件:仅满足谓词的异常才重试
.doBeforeRetry( // 重试前回调:记录警告日志
signal ->
logger.warn(
"Retrying tool call (attempt {}/{}) due to: {}",
signal.totalRetriesInARow() + 1, // 当前重试次数(从1开始)
maxAttempts - 1, // 总重试次数
signal.failure().getMessage(), // 异常信息
signal.failure())); // 异常对象(打印堆栈)

// 记录调试日志:明确为哪个工具、应用了怎样的重试配置
logger.debug(
"Applied retry config: maxAttempts={} for tool: {}",
maxAttempts,
toolCall.getName());

// 应用 retryWhen 操作符:将重试策略绑定到执行流
return execution.retryWhen(retrySpec);
}

那如上的一大堆事情结束之后,终于又回到了之前一开始的acting(0)的代码中,真是饶了好大好大的一圈。

在收集到了上述的整个工具调度MonoList之后,就会开始分配是否挂起suspend.

第一步:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
 // Separate success and pending results
List<Map.Entry<ToolUseBlock, ToolResultBlock>> successPairs =
results.stream()
.filter(e -> !e.getValue().isSuspended())
.toList();
List<Map.Entry<ToolUseBlock, ToolResultBlock>> pendingPairs =
results.stream()
.filter(e -> e.getValue().isSuspended())
.toList();

// If no success results to process
if (successPairs.isEmpty()) {
if (!pendingPairs.isEmpty()) {
// 如果没有成功调用的但是存在挂起的工具消息
return Mono.just(buildSuspendedMsg(pendingPairs));
}
// 说明没有需要进行处理的工具消息,直接走推理流程
return executeIteration(iter + 1);
}

// 处理挂起的工具消息
private Msg buildSuspendedMsg(List<Map.Entry<ToolUseBlock, ToolResultBlock>> pendingPairs) {
List<ContentBlock> content = new ArrayList<>();
for (Map.Entry<ToolUseBlock, ToolResultBlock> pair : pendingPairs) {
content.add(pair.getKey());
content.add(pair.getValue());
}
return Msg.builder()
.name(getName())
.role(MsgRole.ASSISTANT)
.content(content)
.generateReason(GenerateReason.TOOL_SUSPENDED) /** Tool execution was suspended, waiting for user to provide results. */
.build();
}

这个挂起消息在Msg类里了,之后单独看看,先略过。然后看如果有需要进行处理的工具消息的话:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Process success results through hooks and add to memory
return Flux.fromIterable(successPairs)
.concatMap(this::notifyPostActingHook)
.last()
.flatMap(
event -> {
// HITL stop (also triggered by
// StructuredOutputHook when completed)
if (event.isStopRequested()) {
return Mono.just(
event.getToolResultMsg()
.withGenerateReason(
/** Acting phase was stopped by a Hook (PostActingEvent.stopAgent()). */
GenerateReason
.ACTING_STOP_REQUESTED));
}

// If there are pending results, build suspended Msg
if (!pendingPairs.isEmpty()) {
return Mono.just(
buildSuspendedMsg(pendingPairs));
}

// Continue next iteration
return executeIteration(iter + 1);
});

和上面的结合一下其实还算比较简单,接着看具体的推理过程!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
private Mono<Msg> reasoning(int iter, boolean ignoreMaxIters) {
// Check maxIters unless ignoreMaxIters is set
if (!ignoreMaxIters && iter >= maxIters) {
return summarizing();
}

ReasoningContext context = new ReasoningContext(getName());

return checkInterruptedAsync()
.then(notifyPreReasoningEvent(prepareMessages()))
.flatMapMany(
event -> {
GenerateOptions options =
event.getEffectiveGenerateOptions() != null
? event.getEffectiveGenerateOptions()
: buildGenerateOptions();
return model.stream(
event.getInputMessages(),
toolkit.getToolSchemas(),
options)
.concatMap(chunk -> checkInterruptedAsync().thenReturn(chunk));
})
.doOnNext(
chunk -> {
List<Msg> chunkMsgs = context.processChunk(chunk);
// Notify streaming hooks for each chunk message
for (Msg msg : chunkMsgs) {
notifyReasoningChunk(msg, context).subscribe();
}
})
.then(Mono.defer(() -> Mono.justOrEmpty(context.buildFinalMessage())))
.onErrorResume(
InterruptedException.class,
error -> {
// Save accumulated message before propagating interrupt
Msg msg = context.buildFinalMessage();
if (msg != null) {
memory.addMessage(msg);
}
return Mono.error(error);
})
.flatMap(this::notifyPostReasoning)
.flatMap(
event -> {
Msg msg = event.getReasoningMessage();
if (msg != null) {
memory.addMessage(msg);
}

// HITL stop
if (event.isStopRequested()) {
return Mono.just(
msg.withGenerateReason(
GenerateReason.REASONING_STOP_REQUESTED));
}

// gotoReasoning requested (e.g., by StructuredOutputHook)
if (event.isGotoReasoningRequested()) {
// Validation already done in PostReasoningEvent.gotoReasoning()
List<Msg> gotoMsgs = event.getGotoReasoningMsgs();
if (gotoMsgs != null) {
gotoMsgs.forEach(memory::addMessage);
}
// Continue to next iteration, ignoring maxIters for this entry
return reasoning(iter + 1, true);
}

// Check finish conditions
if (isFinished(msg)) {
return Mono.just(msg);
}

// Continue to acting
return checkInterruptedAsync().then(acting(iter));
})
.switchIfEmpty(
Mono.defer(
() -> {
// No message was produced
return Mono.justOrEmpty((Msg) null);
}));
}
  1. 若轮数超出最大轮数,进行总结直接返回

    这里其实比较简单,是通过一段prompt指引大模型进行总结的,来看看框架里的prompt长什么样。

    You have failed to generate response within the maximum iterations. Now respond directly by summarizing the current situation.

    可以说在众多的注释里显得非常通俗易懂了,连我都可以直接看懂。(

    如果说没有达到最大轮数的话,那么就进入正常的推理环节。依旧跳过各种preHook

  2. 调用模型的stream方法进行请求获取模型响应chunks

  3. 判断是否存在HITL(Human-in-the-Loop) stop或者gotoReasoningRequested

  4. 无剩余工具调用则返回结果或调用acting处理剩余的toolUse

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
flowchart TD
Start([开始]) --> CheckInterrupt1{检查中断?}
CheckInterrupt1 -->|是| InterruptEnd([中断退出])
CheckInterrupt1 -->|否| NotifyPre[通知PreReasoningEvent]

NotifyPre --> BuildOptions{获取GenerateOptions?}
BuildOptions -->|event中有| UseEventOpts[使用event中的配置]
BuildOptions -->|event中无| BuildNewOpts[构建新配置]

UseEventOpts --> ModelStream
BuildNewOpts --> ModelStream

ModelStream[调用model.stream] --> CheckInterruptChunk{检查中断?}
CheckInterruptChunk -->|是| InterruptEnd
CheckInterruptChunk -->|否| ProcessChunk[处理Chunk数据]
ProcessChunk --> NotifyChunk[通知ReasoningChunk]
NotifyChunk --> HasMoreChunks{还有更多Chunk?}
HasMoreChunks -->|是| ModelStream

HasMoreChunks -->|否| BuildFinalMsg{构建最终消息?}
BuildFinalMsg -->|有内容| GetFinalMsg[获取最终消息]
BuildFinalMsg -->|为空| SwitchEmpty[返回null]

GetFinalMsg --> HandleInterruptError{处理中断异常?}
HandleInterruptError -->|捕获异常| SaveAndError[保存消息并报错]
SaveAndError --> InterruptEnd

HandleInterruptError -->|无异常| NotifyPost[通知PostReasoningEvent]

NotifyPost --> HitlCheck{用户请求停止?}
HitlCheck -->|是| ReturnStopMsg[返回停止消息]

HitlCheck -->|否| GotoCheck{请求跳转推理?}
GotoCheck -->|是| AddGotoMsgs[添加消息到memory]
AddGotoMsgs --> RecurseReasoning[递归调用reasoning]
RecurseReasoning --> CheckInterrupt1

GotoCheck -->|否| FinishCheck{推理是否完成?}
FinishCheck -->|是| ReturnFinishMsg[返回最终消息]

FinishCheck -->|否| CheckInterrupt2{检查中断?}
CheckInterrupt2 -->|是| InterruptEnd
CheckInterrupt2 -->|否| GoToActing[进入Acting阶段]

ReturnStopMsg --> End([结束])
ReturnFinishMsg --> End
GoToActing --> End
SwitchEmpty --> End

问了一圈AI,发现应该框架还是主要依赖于基模本身的模型思考能力,来建立整个ReAct架构的

1
Reasoning => Acting