文章目录
- 1. 概述
- 2. 自定义 Middleware
- 2.1 实现 MiddlewareBase
- 2.2 配置到 ReActAgent 中
- 3. 常用实现
- 3.1 记录模型调用耗时
- 3.2 模型调用限速
- 3.3 动态系统提示词
- 3.4 切换备用模型
1. 概述
Middleware是AgentScope 2.0的扩展机制,能在不修改Agent或Model代码的前提下,向执行流程的关键位置注入自定义逻辑。
两种模式:
| 类型 | 模式 | 涉及的 hook |
|---|---|---|
| Onion(洋葱) | 包裹下一层,可在next.apply(input)前后插入逻辑 | onAgent/onReasoning/onActing/onModelCall |
| Transformer(变换) | 串行接力,前一个输出作为后一个输入 | onSystemPrompt |
执行流程:
onAgent └── 每一轮 ReAct: ├── onReasoning │ ├── onSystemPrompt(Transformer 链,从左到右) │ └── onModelCall └── onActing2. 自定义 Middleware
2.1 实现 MiddlewareBase
实现MiddlewareBase接口,按需重写hook方法。未重写的hook默认透传,无调用开销。
五个hook的Input类型:
| Hook | Input 类型 | 可访问内容 |
|---|---|---|
onAgent | AgentInput | msgs()— 输入消息列表 |
onReasoning | ReasoningInput | messages()+tools()+options() |
onActing | ActingInput | toolCalls()— 待执行的 ToolUseBlock 列表 |
onModelCall | ModelCallInput | messages()+tools()+options()+model() |
onSystemPrompt | String | 当前 prompt,返回新 prompt |
示例:同时观察四个洋葱位置 +SystemPrompt
importio.agentscope.core.agent.Agent;importio.agentscope.core.event.AgentEvent;importio.agentscope.core.middleware.*;importjava.util.function.Function;importreactor.core.publisher.Flux;importreactor.core.publisher.Mono;publicclassFullObservabilityMiddlewareimplementsMiddlewareBase{@OverridepublicFlux<AgentEvent>onAgent(Agentagent,AgentInputinput,Function<AgentInput,Flux<AgentEvent>>next){System.out.println("[agent] start for "+agent.getName());returnnext.apply(input).doOnComplete(()->System.out.println("[agent] end for "+agent.getName()));}@OverridepublicFlux<AgentEvent>onReasoning(Agentagent,ReasoningInputinput,Function<ReasoningInput,Flux<AgentEvent>>next){System.out.println("[reasoning] start");returnnext.apply(input).doOnComplete(()->System.out.println("[reasoning] end"));}@OverridepublicFlux<AgentEvent>onModelCall(Agentagent,ModelCallInputinput,Function<ModelCallInput,Flux<AgentEvent>>next){System.out.println("[model_call] "+input.model().getClass().getSimpleName());returnnext.apply(input).doOnComplete(()->System.out.println("[model_call] done"));}@OverridepublicMono<String>onSystemPrompt(Agentagent,StringcurrentPrompt){System.out.println("[system_prompt] length="+currentPrompt.length());returnMono.just(currentPrompt);}}所有hook将Agent作为首个参数,通过agent.getRuntimeContext()获取当前调用的RuntimeContext,可读写会话属性,也可写入key供下游hook和tool使用。
publicclassRequestContextMiddlewareimplementsMiddlewareBase{@OverridepublicFlux<AgentEvent>onAgent(Agentagent,AgentInputinput,Function<AgentInput,Flux<AgentEvent>>next){RuntimeContextrc=agent.getRuntimeContext();if(rc!=null){System.out.printf("[req] user=%s session=%s%n",rc.getUserId(),rc.getSessionId());rc.put("trace_id",java.util.UUID.randomUUID().toString());}returnnext.apply(input);}}注意:不要把请求级状态缓存到
middleware实例字段——一个middleware实例通常被多个Agent/call复用。应放入RuntimeContext或使用ReactorcontextWrite。
2.2 配置到 ReActAgent 中
ReActAgentagent=ReActAgent.builder().name("my-agent").model(model).toolkit(toolkit).middlewares(List.of(newFullObservabilityMiddleware(),newRequestContextMiddleware())).build();.middleware(...)(单数)— 追加一个.middlewares(List)— 批量追加
Onion执行顺序(列表中第一个middleware处于最外层):
mw1 前 → mw2 前 → 内部逻辑 → mw2 后 → mw1 后对于流式事件:内层middleware先看到emit。
Transformer执行顺序(onSystemPrompt):
原始 prompt → mw1.onSystemPrompt() → mw2.onSystemPrompt() → 最终 prompt3. 常用实现
3.1 记录模型调用耗时
在onModelCall前后记录时间,通过doFinally输出耗时。适用于排查慢请求或做性能基线。
importio.agentscope.core.agent.Agent;importio.agentscope.core.event.AgentEvent;importio.agentscope.core.middleware.MiddlewareBase;importio.agentscope.core.middleware.ModelCallInput;importjava.util.function.Function;importreactor.core.publisher.Flux;publicclassTimingMiddlewareimplementsMiddlewareBase{@OverridepublicFlux<AgentEvent>onModelCall(Agentagent,ModelCallInputinput,Function<ModelCallInput,Flux<AgentEvent>>next){longstart=System.nanoTime();returnnext.apply(input).doFinally(sig->{longms=(System.nanoTime()-start)/1_000_000;System.out.println("[timing] "+agent.getName()+": "+ms+"ms");});}}3.2 模型调用限速
通过AtomicLong记录上次调用时间,在onModelCall中计算距上次调用的间隔,不足时用Mono.delay延迟。适用于有QPM/QPS限制的API。
importio.agentscope.core.agent.Agent;importio.agentscope.core.event.AgentEvent;importio.agentscope.core.middleware.MiddlewareBase;importio.agentscope.core.middleware.ModelCallInput;importjava.time.Duration;importjava.util.concurrent.atomic.AtomicLong;importjava.util.function.Function;importreactor.core.publisher.Flux;importreactor.core.publisher.Mono;publicclassRateLimitMiddlewareimplementsMiddlewareBase{privatefinallongminIntervalMs;privatefinalAtomicLonglastCall=newAtomicLong(0);publicRateLimitMiddleware(DurationminInterval){this.minIntervalMs=minInterval.toMillis();}@OverridepublicFlux<AgentEvent>onModelCall(Agentagent,ModelCallInputinput,Function<ModelCallInput,Flux<AgentEvent>>next){longnow=System.currentTimeMillis();longwait=minIntervalMs-(now-lastCall.get());Mono<Void>delay=wait>0?Mono.delay(Duration.ofMillis(wait)).then():Mono.empty();returndelay.thenMany(next.apply(input)).doOnSubscribe(s->lastCall.set(System.currentTimeMillis()));}}3.3 动态系统提示词
在onSystemPrompt中注入实时上下文(时间、用户信息、环境变量等),通过Supplier<String>实现每次调用时动态计算。适用于需要按请求变化的信息注入。
importio.agentscope.core.agent.Agent;importio.agentscope.core.middleware.MiddlewareBase;importjava.time.Instant;importjava.util.function.Supplier;importreactor.core.publisher.Mono;publicclassDynamicContextMiddlewareimplementsMiddlewareBase{privatefinalSupplier<String>contextFn;publicDynamicContextMiddleware(Supplier<String>contextFn){this.contextFn=contextFn;}@OverridepublicMono<String>onSystemPrompt(Agentagent,StringcurrentPrompt){returnMono.just(currentPrompt+"\n\n## Current Context\n"+contextFn.get());}}// 使用:// .middleware(new DynamicContextMiddleware(() -> "Time: " + Instant.now()))3.4 切换备用模型
在onModelCall中通过onErrorResume捕获主模型失败,构造新的ModelCallInput替换为备用模型重试。适用于需要高可用的生产环境。
importio.agentscope.core.agent.Agent;importio.agentscope.core.event.AgentEvent;importio.agentscope.core.middleware.MiddlewareBase;importio.agentscope.core.middleware.ModelCallInput;importio.agentscope.core.model.Model;importjava.util.function.Function;importreactor.core.publisher.Flux;publicclassModelFallbackMiddlewareimplementsMiddlewareBase{privatefinalModelfallback;publicModelFallbackMiddleware(Modelfallback){this.fallback=fallback;}@OverridepublicFlux<AgentEvent>onModelCall(Agentagent,ModelCallInputinput,Function<ModelCallInput,Flux<AgentEvent>>next){returnnext.apply(input).onErrorResume(err->{System.err.println("Primary model failed: "+err.getMessage()+", switching to fallback");returnnext.apply(newModelCallInput(input.messages(),input.tools(),input.options(),fallback));});}}Tip:若只是简单的「主 → 备」回退,
ReActAgent.Builder已暴露fallbackModel(...)和maxRetries(...),无需自己写middleware。