今天从模板中给定的生成一个agent的代码出发,看看在创建一个ReActAgent实例的过程中发生了什么。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import io.agentscope.core.ReActAgent;
import io.agentscope.core.message.Msg;
import io.agentscope.core.model.DashScopeChatModel;
import io.agentscope.core.tool.Toolkit;
import io.agentscope.core.tool.Tool;
import io.agentscope.core.tool.ToolParam;

public class QuickStart {
public static void main(String[] args) {
// 准备工具
Toolkit toolkit = new Toolkit();
toolkit.registerTool(new SimpleTools());

// 创建智能体
ReActAgent jarvis = ReActAgent.builder()
.name("Jarvis")
.sysPrompt("你是一个名为 Jarvis 的助手")
.model(DashScopeChatModel.builder()
.apiKey(System.getenv("DASHSCOPE_API_KEY"))
.modelName("qwen3-max")
.build())
.toolkit(toolkit)
.build();

// 发送消息
Msg msg = Msg.builder()
.textContent("你好!Jarvis,现在几点了?")
.build();

Msg response = jarvis.call(msg).block();
System.out.println(response.getTextContent());
}
}

// 工具类
class SimpleTools {
@Tool(name = "get_time", description = "获取当前时间")
public String getTime(
@ToolParam(name = "zone", description = "时区,例如:北京") String zone) {
return java.time.LocalDateTime.now()
.format(java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
}
}

ReActAgent 的类族关系

1
2
3
4
5
6
7
8
classDiagram
ReActAgent --|> StructuredOutputCapableAgent : extends
StructuredOutputCapableAgent --|> AgentBase : extends
AgentBase ..|> StateModule : implements
AgentBase ..|> Agent : implements
Agent --|> CallableAgent : extends
Agent --|> StreamableAgent : extends
Agent --|> ObservableAgent : extends

个人认为可能还是需要从自顶向下来看,先从抽象,再到具体。

CallableAgent

整个CallableAgent接口中,只有一种方法

1
default Mono<Msg> call(xxx){}

Mono 类的作用:
Mono 是 Reactor 框架中的核心类型之一,代表一个异步计算的结果。它具有以下特点:

  • 响应式编程模型Mono<T>表示一个最多发射一个数据项的异步序列(0 或 1 个结果)
  • 非阻塞:与传统的 Future不同,Mono 是非阻塞的,不会等待结果完成
  • 惰性求值:只有在订阅(subscribe)时才会执行实际的操作
  • 丰富的操作符:支持 mapflatMapfilter 等操作符进行链式调用
  • 背压支持:天然支持背压(backpressure),可以控制数据流的处理速度

在 Agentscope 中,call方法返回 Mono<Msg>,意味着智能体的调用是异步非阻塞的,调用者可以通过 .block() 同步等待结果,或者通过 .subscribe() 进行异步订阅。

使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 方式一:使用 .block() 同步阻塞获取结果
// 适用于简单场景或主线程中,会阻塞当前线程直到结果返回
try {
Msg response = jarvis.call(msg).block();
System.out.println("同步响应:" + response.getTextContent());
} catch (Exception e) {
e.printStackTrace();
}

// 方式二:使用 .subscribe() 异步非阻塞获取结果
// 适用于响应式编程,不会阻塞当前线程,结果就绪时会自动回调
jarvis.call(msg)
.subscribe(
response -> {
// onNext: 收到数据时的处理
System.out.println("异步响应:" + response.getTextContent());
},
error -> {
// onError: 发生错误时的处理
System.err.println("发生错误:" + error.getMessage());
},
() -> {
// onComplete: 流完成时的处理(可选)
System.out.println("调用完成");
}
);

// 注意:使用 subscribe 后,主线程需要保持运行才能接收到异步结果
// 在实际应用中,通常需要结合其他响应式操作符或等待机制

两种方式的选择:

  • .block():适合简单的命令行程序、测试用例或需要同步等待结果的场景
  • .subscribe():适合高并发的响应式应用,可以充分利用非阻塞特性

代码中给出的类说明:

1
2
3
4
5
6
7
8
9
10
11
/**
* 可调用处理消息的智能体接口。
*
* <p>该接口定义了智能体的核心调用能力,包括:
* <ul>
* <li>通过 {@link #call(List)} 进行基础消息处理</li>
* <li>通过 {@link #call(List, Class)} 和 {@link #call(List, JsonNode)} 生成结构化输出</li>
* </ul>
*
* <p>提供了便捷方法的默认实现,这些方法会委托给核心的 {@link #call(List)} 方法。
*/

个人分析: 该接口将基于 Mono<Msg>的异步调用能力进行了封装,与下文的StreamableAgent(流式)、ObservableAgent(可观察)等接口设计理念一致,均遵循单一职责原则——每个接口只负责一种特定的交互模式,使得智能体可以灵活组合不同的能力。

异步调用 vs 流式的区别:

两者的核心区别在于数据返回的方式

特性 异步调用 (CallableAgent) 流式输出 (StreamableAgent)
返回形式 一次性返回完整结果 分块(chunk)逐步返回结果
等待时间 需要等待全部生成完成 可以立即看到部分结果
用户体验 较长等待后突然显示全部内容 类似打字机效果,逐字/逐句显示
适用场景 短文本、需要完整上下文处理 长文本生成、实时展示需求
网络传输 单次响应,数据包较大 多次小数据包,降低首字节延迟

举例说明:

假设 AI 要生成一段 100 字的回复:

  • 异步调用:用户等待 3 秒后,一次性收到完整的 100 字回复
  • 流式输出:用户在 0.3 秒后开始看到文字,然后每 100ms 收到几个字,像打字机一样逐步显示完整内容

技术实现差异:

1
2
3
4
5
6
7
// CallableAgent - 返回单个 Mono<Msg>
Mono<Msg> response = agent.call(msg);
// 结果:要么没有,要么是一个完整的 Msg 对象

// StreamableAgent - 返回 Flux<Msg>(多个数据项的流)
Flux<Msg> stream = agent.stream(msg);
// 结果:可能收到多个 Msg,每个包含一部分内容(如一个 token)

在 Agentscope 中,Flux是 Reactor 中表示0 到 N 个数据项的响应式类型,适合流式场景。

StreamableAgent

与上述Mono<T>类似,StreamableAgent中主要包装了Flux<T>类型的方法。

1
Flux<Event> stream(xxx) {}

Flux<T>Mono<T>在上面分析异步调用的时候已经基本OK了,但其实这里还需要了解一下这个Event是什么事件。

1
2
3
4
5
public class Event {
private final EventType type;
private final Msg message;
private final boolean isLast;
}

messageisLast比较简单了,这里的 EventType看看包含哪些类型。

EventType 枚举类型说明:

枚举值 用途说明
REASONING 推理事件:智能体思考和规划阶段,包含文本、思考块或工具调用请求
TOOL_RESULT 工具执行结果:记录工具调用的输出结果,支持长时间运行工具的流式输出
HINT 提示信息:来自 RAG、记忆或规划系统的上下文信息,通常为完整的检索内容
AGENT_RESULT 最终结果:智能体的完整响应,默认不包含在流中以避免与返回值重复
SUMMARY 摘要事件:达到最大迭代次数时生成的执行情况摘要
ALL 特殊值:用于订阅所有事件类型(AGENT_RESULT 除外),不过滤任何事件

这些事件类型覆盖了智能体从思考→行动→结果的完整生命周期,便于开发者监控和调试。也主要是用于进行输出判断,一般只输出REASONING中的内容。

ObservableAgent

看名字可以知道是为了 Agent 的可观察性,那么问题来了,何为 Agent 的可观察性。
于是先来看看类说明

ObservableAgent 接口说明:
这是一个支持观察消息但不生成回复的智能体接口。该接口使智能体能够接收和处理来自其他智能体或环境的消息而无需做出响应。它通常用于多智能体协作场景,让智能体可以感知彼此的行为。
典型使用场景包括:

  • 被动监控对话流程
  • 在多智能体系统中构建共享上下文
  • 在智能体流水线中实现观察者模式

理解与分析:

  1. 与 Java 守护线程的区别ObservableAgent 更像是一个被动的监听者/观察者,而不是守护线程。守护线程是在后台默默提供服务(如垃圾回收),而 ObservableAgent 是主动订阅和接收其他智能体的消息,只是不回复而已。

  2. 共享上下文的重要性:在多智能体系统中,ObservableAgent 确实扮演着关键角色:

    • 状态同步:让某些智能体了解整体协作进展
    • 信息传递:作为中间层收集和分发信息
    • 监控审计:记录智能体交互过程,便于调试和分析
    • 决策支持:基于观察到的全局信息做出更智能的决策
  3. 在 AI Agent 开发中的价值:是构建复杂多智能体系统的重要机制,特别是在以下场景:

    • 需要透明化智能体决策过程
    • 实现分层协作(部分智能体负责执行,部分负责协调)
    • 构建可追溯的对话历史和工作流

因此,掌握 ObservableAgent 的设计思想对于开发企业级、生产级的多智能体应用非常重要。

进一步理解:与旁路消息队列的类比

ObservableAgent旁路消息队列有很多相似之处:

特性 旁路消息队列 ObservableAgent
订阅模式 订阅各个 Topic/Channel 的消息 订阅其他智能体发出的消息
非侵入性 不影响主流程的执行 不回复消息,不影响对话流程
信息共享 在多个消费者之间共享数据 在多 Agent 系统中共享上下文
异步处理 异步接收和处理消息 基于响应式的 Flux/Mono 模型
解耦 生产者和消费者解耦 观察者和被观察者解耦

关键区别:

  • 消息队列是基础设施层的概念,而 ObservableAgent智能体协作层的设计
  • ObservableAgent 不仅仅是被动接收,还可以选择性关注特定类型的消息
  • 它具备语义理解能力,可以对观察到的内容进行一定程度的处理和转换

所以可以把它理解为:一个具有 AI 能力的、智能化的旁路消息监听器,它在多 Agent 系统中扮演着情报收集者信息中转站的角色。

Agent

Agent接口则是实现了以上的三个接口,那么也就意味着一个普通的实现了Agent接口的实例类同时具有以下特点:可异步调用、可流式调用、可观测!
而在Agent接口本身也不可能什么都不写,它主要是定义了一个Agent本身的信息:
name, description, agentId
而在此之外,有一个比较有意思的方法也被定义出来:interrupt

interrupt 方法说明:

1
2
3
4
5
6
7
/**
* 中断当前的智能体执行。
* 该方法会设置一个中断标志,智能体会在执行过程中的适当检查点检查此标志。
* 中断是协作式的,可能不会立即生效。
* @param msg 可选,与中断关联的用户消息
*/
void interrupt();

设计亮点:

  • 协作式中断:不是强制立即停止,而是在”适当的检查点”检查中断标志,这给了智能体优雅退出的机会
  • 支持中断原因:可以通过 Msg 传递中断的原因或上下文信息
  • 响应式设计:结合前面的 Mono/Flux模型,中断操作也是非阻塞的

    Maybe 可以用在检测到Agent说了什么不该说的时候,赶紧interrupt捂嘴(

总结:Agent 接口的三大核心特性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
flowchart TB
subgraph AgentCore["Agent 接口核心能力"]
direction TB

CA["CallableAgent<br/>异步调用能力"] --> |"返回 Mono<Msg>"| AgentI["Agent 接口"]
SA["StreamableAgent<br/>流式输出能力"] --> |"返回 Flux<Event>"| AgentI
OA["ObservableAgent<br/>可观测能力"] --> |"消息监听"| AgentI
end

subgraph Features["特性说明"]
direction LR
F1["📞 异步调用<br/>Mono&lt;Msg&gt;<br/>一次性完整响应"]
F2["📺 流式输出<br/>Flux&lt;Event&gt;<br/>逐步返回事件"]
F3["👁️ 可观测<br/>被动监听<br/>共享上下文"]
end

AgentI -.-> F1
AgentI -.-> F2
AgentI -.-> F3

style CA fill:#e1f5ff,color:#000000
style SA fill:#fff4e1,color:#000000
style OA fill:#f0e1ff,color:#000000
style AgentI fill:#ffe1e1,stroke-width:2px,color:#000000

因此归纳一下Agent的能力

能力 继承自 返回类型 特点 适用场景
异步调用 CallableAgent Mono<Msg> 一次性返回完整结果 短文本、需要完整上下文
流式输出 StreamableAgent Flux<Event> 分块逐步返回事件 长文本生成、实时展示
可观测 ObservableAgent Mono<Void> 消息监听, 被动接收不回复 多智能体协作、监控审计
可中断 self void 支持智能体的优雅退出
Agent属性定义 self 字段配置 name, agentId, description

因此,一个实现了 Agent 接口的智能体实例天然就具备了这三种交互模式,可以根据实际需求灵活选择使用方式。

StateModule

往下走可以发现到了与Agent并列的另一个接口,StateModule
通过对AgentScope的阶段性使用,看到接口内定义的方法基本可以知道是做什么用的。

1
2
3
default void saveTo(Session session, SessionKey sessionKey);
default void loadFrom(Session session, SessionKey sessionKey);
default boolean loadIfExists(Session session, SessionKey sessionKey);

简单来说,就是会话短期记忆 => 数据库长期记忆的转换过程(写入、读取)
这个后面看到Session部分可以再展开,感觉现在到这也就够了。

AgentBase

OKOK,那么很快就到了这个 AgentBase,为什么要取一个这样的名字呢,这样的话为什么不直接集成到 Agent 接口里。
容我康康。 XD.
一打开就看到了一大串的类说明注释啊。

AgentBase 类说明:

定义: AgentScope 框架中所有智能体的抽象基类。

核心功能: 提供智能体的通用功能,包括基础钩子集成、MsgHub 订阅者管理、中断处理、追踪以及通过 StateModule 实现的状态管理。注意: 它不负责内存管理——这是具体智能体实现(如 ReActAgent)的职责。

设计哲学:

  • 职责分离:AgentBase 提供基础设施支持(钩子、订阅、中断、状态),但不包含业务逻辑
  • 内存管理委托:将内存管理职责交给需要它的具体智能体(例如 ReActAgent)
  • 状态管理:通过实现 StateModule 接口来管理状态
  • 中断机制:使用响应式模式,子类在适当的检查点调用 checkInterruptedAsync(),通过 Mono 链传播 InterruptedException
  • 观察模式:智能体可以接收消息而无需生成回复

线程安全性:
智能体实例并非为并发执行而设计。单个智能体实例不应从多个线程同时调用(例如同时调用 call()stream())。hooks 列表是可变的,在流式操作期间修改时没有同步保护,这只有在每个智能体实例单线程执行的情况下才是安全的。

中断机制详解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 外部调用中断
agent.interrupt(userMsg);
// 在智能体的 Mono 链内部,在检查点处:
return checkInterruptedAsync()
.then(doWork())
.flatMap(result -> checkInterruptedAsync().thenReturn(result));

// AgentBase.call() 捕获异常:
.onErrorResume(error -> {
if (error instanceof InterruptedException) {
return handleInterrupt(context, msg);
}
...
});

为什么单独设计 AgentBase 抽象类?

  1. 接口与实现的分离Agent 接口定义能力契约,AgentBase 提供通用实现
  2. 避免重复代码:将基础设施相关的通用逻辑集中到基类
  3. 灵活性:允许不同的智能体实现选择不同的内存管理策略
  4. 关注点分离:基础设施 vs 业务逻辑的清晰划分

简单来说就是以上这样,但是考虑了一下我决定另起一篇来研究。TODO ITEM ++。