Spring AI Graph: From Zero to Supervisor (Part 2): Parallel Execution and HITL in Practice
2026/6/9 9:40:08

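Parallel execution and HITL (human-in-the-loop) are the two most important advanced features of the Spring AI Graph module. This article is based on a real deployment of the Dream-SaaS code review agent and walks through API usage, pitfalls, and timing data from 5 production runs. Parallel execution takes two statements and HITL takes four steps, but three pitfalls can cost two days of debugging if you do not know them in advance: storing nested objects in State breaks checkpoint serialization, a conditional edge cannot directly follow a parallel branch, and state leaks across requests.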


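Conclusions first

| Feature | API | Key points |
|---|---|---|
| Parallel execution | addEdge(START, List.of(...)) | Scheduled automatically on a thread pool; fan-in waits for all branches to finish |
| HITL | interruptBefore + checkpoint + resume | Four steps: updateState → getState → withNodeResumed → invoke |
| Checkpoint recovery | RedisSaver / MemorySaver | Measured resume time: 30-61 ms |

The 3 pitfalls:

  1. A conditional edge cannot directly follow parallel branches: fan in to a regular node first
  2. Storing nested objects in State breaks checkpoint serialization: invoke runs, but the state cannot be saved to Redis
  3. State pollution across requests: the default threadId is reused, so reasoningTrace keeps growing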

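Overall architecture

The full flow of the CodeReview subgraph:

    START
      ├─ codeCheckNode   (calls Feign → codeReviewAnalyzeResult JSON)
      └─ styleCheckNode  (local rules → styleCheckResult JSON)
            ↓ (parallel fan-in)
      → mergeCheckNode   (merges STYLE issues; deduction rule: -2 per issue, floor 0)
      → riskAssessNode   (writes riskLevel / needHumanReview)
      → [conditional edge] → humanReviewNode or reportGenNode
      → humanReviewNode  (optional, HITL) → reportGenNode
    END

Part 1 built the Supervisor and the RAG subgraph and left the CodeReview subgraph as a placeholder. This part fills it in.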
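The deduction rule noted on mergeCheckNode in the flow (2 points off per STYLE issue, never below 0) can be sketched in a few lines. This is only an illustration under that stated rule: the class and method names are hypothetical, and the project's actual merge node is not shown in the article.

```java
// Hypothetical sketch of the merge-stage deduction rule; not the project's actual merge node.
final class MergeScoreSketch {
    static final int PENALTY_PER_STYLE_ISSUE = 2; // from the flow description: -2 per issue
    static final int MIN_SCORE = 0;               // from the flow description: floor of 0

    /** Subtracts 2 points per style issue and never returns less than 0. */
    static int applyStylePenalty(int baseScore, int styleIssueCount) {
        if (styleIssueCount < 0) {
            throw new IllegalArgumentException("styleIssueCount must be >= 0");
        }
        return Math.max(MIN_SCORE, baseScore - PENALTY_PER_STYLE_ISSUE * styleIssueCount);
    }

    public static void main(String[] args) {
        System.out.println(applyStylePenalty(85, 3));  // 79
        System.out.println(applyStylePenalty(3, 10));  // 0
    }
}
```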


1. Parallel execution: addEdge(START, List.of(...))

1.1 API usage

Since Spring AI Graph 1.0.0.3, parallel branches are declared with the List overloads of addEdge:

    // Fan-out: START → codeCheck ∥ styleCheck
    stateGraph.addEdge(
            START,
            List.of(CodeReviewSubGraphNames.codeCheckNode,
                    CodeReviewSubGraphNames.styleCheckNode));

    // Fan-in: mergeCheck runs only after both branches have completed
    stateGraph.addEdge(
            List.of(CodeReviewSubGraphNames.codeCheckNode,
                    CodeReviewSubGraphNames.styleCheckNode),
            CodeReviewSubGraphNames.mergeCheckNode);

Two statements. No dedicated API such as addParallelNode is needed: addEdge(START, List.of(...)) creates the parallel branches, and addEdge(List.of(...), target) waits for all source nodes to finish before fanning in.
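For readers who want a mental model of fan-out and fan-in, the same shape can be written with plain JDK futures. This is an analogy only, not how Spring AI Graph is implemented internally; the class name and the JSON values are made up for the sketch, while the state keys are the ones used in this article.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-JDK analogy for fan-out/fan-in; NOT the framework's internal implementation.
final class FanOutFanInSketch {

    /** Runs two branches concurrently and returns only after both have written their result. */
    static Map<String, Object> runParallel() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Map<String, Object> state = new ConcurrentHashMap<>();
        try {
            // Fan-out: both branches start from the same point.
            CompletableFuture<Void> codeCheck = CompletableFuture.runAsync(
                    () -> state.put("codeReviewAnalyzeResult", "{\"score\":85}"), pool);
            CompletableFuture<Void> styleCheck = CompletableFuture.runAsync(
                    () -> state.put("styleCheckResult", "{\"compliant\":true}"), pool);

            // Fan-in: the merge step waits for every branch before it runs.
            CompletableFuture.allOf(codeCheck, styleCheck).join();
            state.put("merged", List.of("codeReviewAnalyzeResult", "styleCheckResult"));
            return state;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runParallel().keySet());
    }
}
```

The point of the analogy is the `allOf(...).join()` line: the merge step cannot observe a state in which only one branch has finished.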

1.2 The two parallel nodes

codeCheckNode calls a remote service through Feign (heavy):

    @Component
    public class CodeCheckNode implements NodeAction {

        // The excerpt omits the logger field and the constructor that injects codeReviewClient.
        private final CodeReviewClient codeReviewClient;

        @Override
        public Map<String, Object> apply(OverAllState state) {
            CodeReviewGraphTiming timing = CodeReviewGraphTiming.begin();
            String input = CodeReviewInputSupport.resolveReviewCode(state);
            log.info("[CodeReviewGraph] node=codeCheckNode start atMs={} codeLength={}",
                    timing.startEpochMs(), input.length());
            try {
                CodeReviewAnalyzeRequest request = CodeReviewAnalyzeRequest.fromReview(
                        input, CodeReviewInputSupport.resolveReviewInstruction(state));
                CodeReviewAnalyzeResult result = codeReviewClient.analyze(request);
                log.info("[CodeReviewGraph] node=codeCheckNode done atMs={} elapsedMs={} score={} issueCount={}",
                        timing.nowEpochMs(), timing.elapsedMs(), result.score(), result.issues().size());
                // Store the result as a JSON string, not as an object (see pitfall 2).
                return Map.of(CodeReviewSubGraphNames.codeReviewAnalyzeResult,
                        CodeReviewAnalyzeResults.toJson(result));
            } catch (Exception ex) {
                return Map.of(DreamSaaSOverAllState.errorMessage,
                        "Call to code-review-agent failed: " + ex.getMessage());
            }
        }
    }

styleCheckNode runs local rule checks (light):

    @Component
    public class StyleCheckNode implements NodeAction {

        @Override
        public Map<String, Object> apply(OverAllState state) {
            CodeReviewGraphTiming timing = CodeReviewGraphTiming.begin();
            String input = CodeReviewInputSupport.resolveReviewCode(state);
            log.info("[CodeReviewGraph] node=styleCheckNode start atMs={} inputLength={}",
                    timing.startEpochMs(), input.length());
            List<StyleCheckViolation> violations = CodeStyleRules.scan(input);
            boolean compliant = violations.isEmpty();
            StyleCheckResultPayload payload = new StyleCheckResultPayload(
                    compliant,
                    violations,
                    compliant ? "Style check passed"
                              : "Style check found " + violations.size() + " issue(s)");
            log.info("[CodeReviewGraph] node=styleCheckNode done atMs={} elapsedMs={} compliant={} violationCount={}",
                    timing.nowEpochMs(), timing.elapsedMs(), compliant, violations.size());
            return Map.of(CodeReviewSubGraphNames.styleCheckResult,
                    StyleCheckResults.toJson(payload));
        }
    }

Both nodes implement NodeAction. At registration each is wrapped with AsyncNodeAction.node_async(), and the framework runs them in parallel on its thread pool.

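1.3 Verifying parallel execution: production logs

After deployment, the thread names and timestamps in the logs confirm the parallel run:

    23:18:39.570 [parallel-node-action-thread-6] INFO CodeCheckNode  - start atMs=1780499919570
    23:18:39.573 [parallel-node-action-thread-5] INFO StyleCheckNode - start atMs=1780499919573
    23:18:39.573 [parallel-node-action-thread-5] INFO StyleCheckNode - done elapsedMs=0
    23:19:00.026 [parallel-node-action-thread-6] INFO CodeCheckNode  - done elapsedMs=20455

Key evidence:

  • codeCheckNode runs on thread-6 and styleCheckNode on thread-5: different threads
  • The two nodes start 3 ms apart, which confirms a parallel start
  • styleCheckNode finishes in under 1 ms (local rules); codeCheckNode takes 20455 ms (Feign call to an LLM)
  • Total parallel time = max(20455, <1) = 20455 ms

Because styleCheckNode currently runs local rules only, the gain from parallelism is negligible. If styleCheck also called an LLM (estimated 5-10 s), the parallel time would be max(22 s, 10 s) ≈ 22 s against a serial time of 22 s + 10 s = 32 s, a saving of about 31%. The architecture is in place; adding nodes is all that remains.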
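The estimate above is simple arithmetic, and writing it out makes it easy to plug in other branch timings. The sketch below uses the article's estimated 22 s and 10 s, which are projections rather than measurements; the class and method names are illustrative.

```java
// Worked arithmetic for the hypothetical case where styleCheck also calls an LLM.
final class ParallelSavingSketch {

    /** Wall-clock time when branches run concurrently: the slowest branch dominates. */
    static long parallelMs(long... branchMs) {
        long max = 0;
        for (long ms : branchMs) {
            max = Math.max(max, ms);
        }
        return max;
    }

    /** Wall-clock time when branches run one after another. */
    static long serialMs(long... branchMs) {
        long sum = 0;
        for (long ms : branchMs) {
            sum += ms;
        }
        return sum;
    }

    /** Percentage of the serial time saved by running the branches in parallel. */
    static double savingPercent(long... branchMs) {
        long serial = serialMs(branchMs);
        return 100.0 * (serial - parallelMs(branchMs)) / serial;
    }

    public static void main(String[] args) {
        // 22 s and 10 s are the article's estimates, not measurements.
        System.out.printf("parallel=%dms serial=%dms saving=%.2f%%%n",
                parallelMs(22_000, 10_000),
                serialMs(22_000, 10_000),
                savingPercent(22_000, 10_000));
    }
}
```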

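1.4 Pitfall 1: no conditional edge directly after a parallel branch

After the fan-in, mergeCheckNode → riskAssessNode → [conditional edge] works as expected. Adding a conditional edge directly on a parallel branch, however, raises an error. Parallel branches must first fan in to a regular node; the conditional edge can only be added downstream of that node.

    // ❌ Wrong: conditional edge directly after a parallel node
    stateGraph.addConditionalEdges(CodeReviewSubGraphNames.codeCheckNode, ...);

    // ✅ Right: fan in → mergeCheck → riskAssess → conditional edge
    stateGraph.addEdge(
            List.of(codeCheckNode, styleCheckNode),
            mergeCheckNode);
    stateGraph.addConditionalEdges(riskAssessNode, routeNode, Map.of(...));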
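The conditional edge after riskAssessNode ultimately needs a routing decision. A minimal way to think about it is a pure function from state to the next node name. The sketch below assumes the needHumanReview flag is stored as a Boolean under that key; the class and method are hypothetical and deliberately independent of the framework's edge-action types.

```java
import java.util.Map;

// Routing decision as a pure function of state. Node names come from the flow diagram;
// the class and method are hypothetical and do not use the framework's edge-action API.
final class ReviewRouteSketch {

    /** Returns the name of the node that should run after riskAssessNode. */
    static String route(Map<String, Object> state) {
        boolean needHuman = Boolean.TRUE.equals(state.get("needHumanReview"));
        return needHuman ? "humanReviewNode" : "reportGenNode";
    }

    public static void main(String[] args) {
        System.out.println(route(Map.of("needHumanReview", true)));   // humanReviewNode
        System.out.println(route(Map.of("needHumanReview", false)));  // reportGenNode
    }
}
```

Keeping the decision free of side effects makes it trivial to unit test without compiling a graph.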

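2. HITL: interruptBefore + checkpoint + resume

The essence of HITL: the graph pauses at a given node, waits for a human decision, and then continues from the breakpoint.

2.1 interruptBefore vs interruptAfter

| Setting | Effect | When to use |
|---|---|---|
| interruptBefore(node) | Pauses before the node runs; state does not contain the node's output | A human decides whether the node should run |
| interruptAfter(node) | Pauses after the node runs; state already contains the node's output | A human reviews the node's output and decides where to go next |

This project uses interruptBefore(humanReviewNode): the risk assessment has already produced its conclusion, so the human steps in before the review node runs and can answer APPROVED or REJECTED directly.

If the scenario is "let the AI suggest first, then a human decides", use interruptAfter. For example, the LLM generates suggested code changes and a human then decides whether to adopt them.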
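The difference between the two settings comes down to what the state contains at the moment of the pause. The toy runner below models just that. It is a teaching sketch with hypothetical names, not the Spring AI Graph engine.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of interrupt semantics; a teaching sketch, not the Spring AI Graph engine.
final class InterruptSemanticsSketch {

    /** Runs nodes in order and returns the state as it looks when the graph pauses. */
    static Map<String, String> runUntilPause(List<String> nodes,
                                             Set<String> interruptBefore,
                                             Set<String> interruptAfter) {
        Map<String, String> state = new LinkedHashMap<>();
        for (String node : nodes) {
            if (interruptBefore.contains(node)) {
                return state;                      // paused: this node has NOT run yet
            }
            state.put(node, "output-of-" + node);  // the node runs and writes its output
            if (interruptAfter.contains(node)) {
                return state;                      // paused: this node's output is in the state
            }
        }
        return state;                              // no interrupt hit: ran to completion
    }

    public static void main(String[] args) {
        List<String> nodes = List.of("riskAssessNode", "humanReviewNode", "reportGenNode");
        // interruptBefore: only riskAssessNode has produced output
        System.out.println(runUntilPause(nodes, Set.of("humanReviewNode"), Set.of()).keySet());
        // interruptAfter: humanReviewNode's output is already present
        System.out.println(runUntilPause(nodes, Set.of(), Set.of("humanReviewNode")).keySet());
    }
}
```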

2.2 Declaring the interrupt point in CompileConfig

    CompileConfig compileConfig = GraphCheckpointCompileSupport.withSharedCheckpointSaver(
                    CompileConfig.builder()
                            .interruptBefore(CodeReviewSubGraphNames.humanReviewNode),
                    graphSaverConfig)
            .build();
    CompiledGraph compiled = stateGraph.compile(compileConfig);

A single interruptBefore(humanReviewNode) call makes the graph stop before humanReviewNode. The checkpoint mechanism persists the state to Redis and restores it on resume.

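2.3 Pitfall 2: State value type restrictions

Graph state only supports String / Number / Boolean / Map / List. Nested objects must be converted to JSON strings before they are stored and deserialized after they are read.

    // Write: object → JSON string
    return Map.of(CodeReviewSubGraphNames.codeReviewAnalyzeResult,
            CodeReviewAnalyzeResults.toJson(result));

    // Read: JSON string → object
    CodeReviewAnalyzeResult analyze = CodeReviewAnalyzeResults.parse(
            state.value(CodeReviewSubGraphNames.codeReviewAnalyzeResult)
                 .orElse(null));

If you put the object in directly, invoke still runs, but checkpoint serialization fails: the state cannot be written to Redis and resume cannot deserialize it. This pitfall is the most expensive to track down because everything works locally and the failure only appears once Redis is in use.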
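The "works in memory, fails at checkpoint time" behaviour is typical of any state that is only serialized at a persistence boundary. The sketch below reproduces that class of failure with JDK serialization as a stand-in. This is an assumption for illustration: the article does not show which serializer RedisSaver uses, and the class names here are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

// Illustration only: JDK serialization stands in for a checkpoint saver.
// The serializer actually used by RedisSaver is not shown in the article.
final class StateSerializationSketch {

    /** A nested result object that is fine in memory but not serializable. */
    static final class AnalyzeResult {
        final int score;

        AnalyzeResult(int score) {
            this.score = score;
        }
    }

    /** Returns true if the whole state map can be written out, false if serialization fails. */
    static boolean canCheckpoint(Map<String, Object> state) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(new HashMap<>(state));
            return true;
        } catch (IOException e) { // includes NotSerializableException
            return false;
        }
    }

    public static void main(String[] args) {
        Map<String, Object> raw = Map.of("codeReviewAnalyzeResult", new AnalyzeResult(70));
        Map<String, Object> json = Map.of("codeReviewAnalyzeResult", "{\"score\":70}");
        System.out.println(canCheckpoint(raw));   // false: nested object cannot be written
        System.out.println(canCheckpoint(json));  // true: a JSON string is a plain String
    }
}
```

A check like `canCheckpoint` in a unit test is one inexpensive way to surface the problem before the code reaches an environment with a real saver.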

2.4 The HITL flow: start → interrupt → resume

Step 1: startHitl runs the graph until it is interrupted and returns the threadId

    public Map<String, Object> startHitl(String instruction, String code, boolean forceHumanReview) {
        // A fresh threadId per request keeps checkpoints isolated.
        String threadId = UUID.randomUUID().toString();
        RunnableConfig runConfig = GraphInvokeSupport.threadConfig(threadId);
        Optional<OverAllState> finished = codeReviewSubGraph.invoke(
                GraphInvokeSupport.codeReviewInputs(instruction, code, forceHumanReview),
                runConfig);
        Map<String, Object> result = invokeAndPack(finished, threadId);
        enrichInterruptFlags(result, runConfig);
        return result;
    }

When forceHumanReview=true, RiskAssessNode ignores the actual risk level and forces needHumanReview=true, which is convenient for testing.

Step 2: resumeHitl writes the human decision and resumes execution

    public Map<String, Object> resumeHitl(String threadId, String decision, String reason) {
        RunnableConfig runConfig = GraphInvokeSupport.threadConfig(threadId);

        // 1. updateState: write the human decision into the checkpoint
        Map<String, Object> patch = new HashMap<>();
        patch.put(CodeReviewSubGraphNames.humanDecision, decision);
        patch.put(CodeReviewSubGraphNames.humanReviewReason, reason);
        runConfig = codeReviewSubGraph.updateState(runConfig, patch, null);

        // 2. getState: read the snapshot and get the ID of the next node
        StateSnapshot snapshot = codeReviewSubGraph.getState(runConfig);
        String nextNode = snapshot.next();

        // 3. withNodeResumed: mark the interrupted node as ready to continue
        runConfig.withNodeResumed(nextNode);

        // 4. invoke: continue from the checkpointed state
        OverAllState state = codeReviewSubGraph
                .invoke(snapshot.state(), runConfig)
                .orElseThrow();
        return invokeAndPack(Optional.of(state), threadId);
    }

Resuming takes four steps: updateState → getState → withNodeResumed → invoke. None of them can be skipped.
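To see why each of the four steps matters, it helps to model the checkpoint store directly. The toy below keeps one checkpoint per threadId, pauses before humanReviewNode, and resumes from the saved node after patching in the decision. Everything in it is hypothetical plain Java; it mirrors the order of the steps above and does not call or reproduce the framework.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy checkpoint/resume loop keyed by threadId. All names are hypothetical; the real
// framework calls are updateState, getState, withNodeResumed and invoke as shown above.
final class HitlResumeSketch {
    private static final List<String> NODES =
            List.of("riskAssessNode", "humanReviewNode", "reportGenNode");
    private static final String INTERRUPT_BEFORE = "humanReviewNode";

    private record Checkpoint(Map<String, Object> state, String next) {}

    private final Map<String, Checkpoint> store = new ConcurrentHashMap<>();

    /** Runs from the first node and pauses before the interrupt node, saving a checkpoint. */
    Map<String, Object> start(String threadId) {
        return run(threadId, new HashMap<>(), 0, false);
    }

    /** Patches the saved state with the human decision, then continues from the saved node. */
    Map<String, Object> resume(String threadId, String decision) {
        Checkpoint cp = store.get(threadId);
        if (cp == null) {
            throw new IllegalStateException("no checkpoint for " + threadId);
        }
        Map<String, Object> state = new HashMap<>(cp.state());
        state.put("humanDecision", decision);       // step 1: update the checkpointed state
        int from = NODES.indexOf(cp.next());        // step 2: read which node is next
        return run(threadId, state, from, true);    // steps 3-4: mark resumed, then invoke
    }

    private Map<String, Object> run(String threadId, Map<String, Object> state,
                                    int from, boolean resumed) {
        for (int i = from; i < NODES.size(); i++) {
            String node = NODES.get(i);
            if (node.equals(INTERRUPT_BEFORE) && !resumed) {
                store.put(threadId, new Checkpoint(Map.copyOf(state), node));
                state.put("interrupted", true);
                return state;
            }
            state.put(node, "done");
        }
        store.remove(threadId);                     // finished: the checkpoint is no longer needed
        state.put("interrupted", false);
        return state;
    }

    public static void main(String[] args) {
        HitlResumeSketch graph = new HitlResumeSketch();
        System.out.println(graph.start("thread-1"));
        System.out.println(graph.resume("thread-1", "APPROVED"));
    }
}
```

If the "resumed" flag were dropped, the loop would hit the interrupt again and pause forever, which is the toy equivalent of skipping withNodeResumed.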

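2.5 Risk assessment rules

RiskAssessNode decides whether a human needs to step in:

    static boolean needHumanReview(int score, SeverityCounts counts) {
        if (counts.critical > 0) return true;  // at least one CRITICAL issue
        if (score < 60) return true;           // score below 60
        return counts.high >= 3;               // three or more HIGH issues
    }

The risk level is the higher of two dimensions. The score dimension maps to levels as follows:

| Score | Level |
|---|---|
| Below 60 | CRITICAL |
| 60-74 | HIGH |
| 75-84 | MEDIUM |
| 85 and above | LOW |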
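The score bands and the needHumanReview rule translate directly into a small, testable class. The sketch below covers only what the article states: the score-to-level table and the three review conditions. The second dimension of the "higher of two" rule is not spelled out in the article, so it is left out, and the type names are hypothetical.

```java
// Sketch of the score bands and the needHumanReview rule. The second dimension of the
// "higher of two" rule is not spelled out in the article, so only the score band is modelled.
final class RiskRulesSketch {

    enum RiskLevel { LOW, MEDIUM, HIGH, CRITICAL }

    /** Hypothetical holder for issue counts by severity. */
    record SeverityCounts(int critical, int high) {}

    /** Maps a score to its band: <60 CRITICAL, 60-74 HIGH, 75-84 MEDIUM, >=85 LOW. */
    static RiskLevel bandForScore(int score) {
        if (score < 60) return RiskLevel.CRITICAL;
        if (score < 75) return RiskLevel.HIGH;
        if (score < 85) return RiskLevel.MEDIUM;
        return RiskLevel.LOW;
    }

    /** Same three conditions as the article's needHumanReview. */
    static boolean needHumanReview(int score, SeverityCounts counts) {
        if (counts.critical() > 0) return true;
        if (score < 60) return true;
        return counts.high() >= 3;
    }

    public static void main(String[] args) {
        System.out.println(bandForScore(70));                               // HIGH
        System.out.println(needHumanReview(70, new SeverityCounts(1, 0)));  // true
    }
}
```

Boundary values (59/60, 74/75, 84/85) are where off-by-one mistakes tend to hide, so they are the first cases worth covering in a unit test.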

2.6 Measured: the complete HITL flow

The test input is a piece of code that contains a SQL injection:

    public class AuthController {
        @PostMapping("/login")
        public String login(String username, String password) {
            String sql = "SELECT * FROM users WHERE username=" + username
                    + " AND password=" + password;
            Statement stmt = conn.createStatement();
            ResultSet rs = stmt.executeQuery(sql);
            return rs.next() ? "token-" + username : null;
        }
    }

HITL start log:

    23:21:04.241 [parallel-node-action-thread-5] INFO CodeCheckNode  - start codeLength=417
    23:21:04.242 [parallel-node-action-thread-6] INFO StyleCheckNode - start inputLength=417
    23:21:04.242 [parallel-node-action-thread-6] INFO StyleCheckNode - done elapsedMs=0
    23:21:25.823 [parallel-node-action-thread-5] INFO CodeCheckNode  - done elapsedMs=21581 score=70
    23:21:25.832 [parallel-node-action-thread-5] INFO MergeCheckNode - done elapsedMs=0
    23:21:25.838 [parallel-node-action-thread-5] INFO RiskAssessNode - done elapsedMs=0 riskLevel=CRITICAL needHumanReview=true
    23:21:25.846 [http-nio-8098-exec-2] INFO Service - graph interrupted nextNode=humanReviewNode awaiting resume

Parallel execution → risk CRITICAL → HITL interrupt, waiting for a human.

HITL resume (APPROVED) log:

    23:21:40.872 [http-nio-8098-exec-7] INFO Controller - api hitl/resume threadId=9d3c8010 decision=APPROVED
    23:21:40.907 [http-nio-8098-exec-7] INFO HumanReviewNode - approved elapsedMs=0 note=human review approved: security risk confirmed acceptable, fix planned
    23:21:40.916 [http-nio-8098-exec-7] INFO ReportGenNode - done elapsedMs=0 mode=success score=70
    23:21:40.933 [http-nio-8098-exec-7] INFO Service - resumeHitl done elapsedMs=61

The resume from the checkpoint completes in 61 ms.

HITL resume (REJECTED) log:

    23:22:23.940 [http-nio-8098-exec-8] INFO HumanReviewNode - rejected elapsedMs=0 note=human review rejected: SQL injection vulnerability is unacceptable, must be fixed and resubmitted
    23:22:23.946 [http-nio-8098-exec-8] INFO ReportGenNode - done elapsedMs=0 mode=failure reportLength=40
    23:22:23.954 [http-nio-8098-exec-8] INFO Service - resumeHitl done elapsedMs=30

A REJECTED decision takes the failure branch and completes in 30 ms.


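3. CheckpointSaver: how HITL state is stored

HITL can resume from a breakpoint because of the checkpoint mechanism: each time the graph reaches a node, the Saver persists the state, and on resume after an interrupt the graph is restored from the last saved snapshot.

3.1 The 8 built-in Savers

| Saver | Storage | Shared across instances | Survives restart | When to use |
|---|---|---|---|---|
| MemorySaver | In-process HashMap | No | No | Development and debugging, single instance |
| VersionedMemorySaver | In-process, versioned | No | No | Checkpoint history and rollback |
| FileSystemSaver | Local files | No | Yes | Single instance that needs restart recovery |
| RedisSaver | Redis | Yes | Yes | First choice for production |
| MysqlSaver | MySQL | Yes | Yes | Existing MySQL infrastructure |
| PostgresSaver | PostgreSQL | Yes | Yes | Existing PostgreSQL infrastructure |
| MongoSaver | MongoDB | Yes | Yes | Existing MongoDB infrastructure |
| OracleSaver | Oracle | Yes | Yes | Enterprise databases |

Start with MemorySaver (zero dependencies); use RedisSaver in production (this project's choice).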

3.2 Redis / Memory dual-mode configuration

One configuration, Redis first, with automatic fallback:

    @Configuration
    @EnableConfigurationProperties(GraphCheckpointProperties.class)
    @ConditionalOnClass(BaseCheckpointSaver.class)
    public class GraphCheckpointConfiguration {

        @Bean(name = "graphCheckpointSaver")
        @ConditionalOnMissingBean(name = "graphCheckpointSaver")
        public BaseCheckpointSaver graphCheckpointSaver(
                GraphCheckpointProperties properties,
                @Qualifier("graphRedissonClient") ObjectProvider<RedissonClient> redissonProvider) {
            if (properties.isRedisEnabled()) {
                RedissonClient client = redissonProvider.getIfAvailable();
                if (client != null) {
                    return RedisSaver.builder().redisson(client).build();
                }
                log.warn("[GraphCheckpoint] redis-enabled=true but RedissonClient missing; falling back to MemorySaver");
            }
            // Fallback: in-memory checkpoints, lost on restart
            return MemorySaver.builder().build();
        }
    }

There are only two configuration properties:

    dream:
      ai:
        graph:
          checkpoint:
            redis-enabled: true               # false → MemorySaver
            redis-address-prefix: "redis://"  # use rediss:// for TLS connections

The Redis connection reuses spring.data.redis.*, so no additional data source needs to be configured.

With MemorySaver, all checkpoints are lost when the process restarts, and an HITL resume can no longer find its state. In production, always use Redis or a database-backed Saver.

3.3 Sharing one SaverConfig across subgraphs

When several subgraphs in a project need HITL, each one does not need its own Saver. GraphCheckpointCompileSupport attaches the shared SaverConfig to the CompileConfig:

    public final class GraphCheckpointCompileSupport {

        public static CompileConfig.Builder withSharedCheckpointSaver(
                CompileConfig.Builder builder,
                ObjectProvider<SaverConfig> graphSaverConfig) {
            return builder.saverConfig(resolveSaverConfig(graphSaverConfig));
        }

        public static SaverConfig resolveSaverConfig(ObjectProvider<SaverConfig> graphSaverConfig) {
            // Use the shared SaverConfig bean if present, otherwise fall back to MemorySaver
            return graphSaverConfig.getIfAvailable(
                    () -> SaverConfig.builder()
                            .register(MemorySaver.builder().build())
                            .build());
        }
    }

Wiring a subgraph then takes one statement:

    CompileConfig compileConfig = GraphCheckpointCompileSupport.withSharedCheckpointSaver(
                    CompileConfig.builder()
                            .interruptBefore(CodeReviewSubGraphNames.humanReviewNode),
                    graphSaverConfig)
            .build();

When the dream-ai-graph module is not wired in, the code falls back to MemorySaver automatically, without errors or blocking.


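4. Pitfall 3: preventing state pollution across requests

Multiple requests reuse the same CompiledGraph instance, so state that is not cleaned up leaks from one request into the next.

4.1 Symptom

On consecutive requests, the Chat subgraph picks up the reasoningTrace from the previous inference, and it keeps growing. The first request is fine; the second one already goes wrong.

4.2 Root cause

    // ❌ Wrong
    graph.invoke(inputs); // no RunnableConfig passed

When no RunnableConfig is passed:

  1. threadId reuse: Spring AI Graph falls back to a fixed default threadId
  2. Leftover checkpoint: the previous request's state is still there
  3. reasoningTrace accumulation: the node's append logic keeps adding to it

4.3 Solution

    public final class GraphInvokeStateDefaults {

        public static Map<String, Object> codeReviewInputs(
                String instruction, String code, boolean forceHumanReview) {
            Map<String, Object> inputs = new HashMap<>();
            inputs.put(DreamSaaSOverAllState.userInput, instruction + "\n" + code);
            inputs.put(CodeReviewSubGraphNames.reviewInstruction,
                    StringUtils.hasText(instruction) ? instruction : "");
            inputs.put(CodeReviewSubGraphNames.reviewCode,
                    StringUtils.hasText(code) ? code : "");
            // Reset every output key so nothing from a previous request survives
            inputs.put(CodeReviewSubGraphNames.codeReviewAnalyzeResult, "");
            inputs.put(CodeReviewSubGraphNames.styleCheckResult, "");
            inputs.put(CodeReviewSubGraphNames.riskLevel, "");
            inputs.put(CodeReviewSubGraphNames.needHumanReview, false);
            inputs.put(CodeReviewSubGraphNames.hitlTestForceHumanReview, forceHumanReview);
            inputs.put(DreamSaaSOverAllState.errorMessage, "");
            inputs.put(DreamSaaSOverAllState.finalAnswer, "");
            return inputs;
        }
    }

Each request explicitly initializes every state key so that leftover values from the previous request cannot pollute the current one. This matters for parallel execution too: if two branches write to the same state key and its initial value is not clean, the merge stage reads dirty data. Passing a RunnableConfig with a fresh threadId per request, as startHitl does with UUID.randomUUID(), removes the threadId reuse itself.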
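The accumulation described in 4.2 is easy to reproduce with a toy store keyed by thread id. The sketch below is hypothetical plain Java and does not reproduce the framework's internals; it only shows why a shared key lets one request see another's trace while a fresh UUID per request does not.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Toy checkpoint store showing why a shared thread id leaks state between requests.
// Single-threaded sketch with hypothetical names; not the framework's implementation.
final class ThreadIdIsolationSketch {
    private final Map<String, List<String>> traceByThread = new HashMap<>();

    /** Appends one trace entry under the thread id and returns the trace this request sees. */
    List<String> invoke(String threadId, String step) {
        List<String> trace = traceByThread.computeIfAbsent(threadId, id -> new ArrayList<>());
        trace.add(step);
        return List.copyOf(trace);
    }

    public static void main(String[] args) {
        ThreadIdIsolationSketch graph = new ThreadIdIsolationSketch();

        // Shared id: the second request also sees the first request's entry.
        graph.invoke("default", "request-1");
        System.out.println(graph.invoke("default", "request-2").size());                    // 2

        // Fresh id per request: the trace starts empty every time.
        System.out.println(graph.invoke(UUID.randomUUID().toString(), "request-3").size()); // 1
    }
}
```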


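5. Timing data from 5 production runs

Node-level timings extracted from the [CodeReviewGraph] entries in the server logs:

| Run | codeCheckNode | styleCheckNode | merge | riskAssess | Total | Score | Risk | HITL |
|---|---|---|---|---|---|---|---|---|
| Run1 | 20455 ms | <1 ms | 1 ms | 1 ms | 20521 ms | 85 | HIGH | |
| Run2 | 16541 ms | <1 ms | 1 ms | 1 ms | 16577 ms | 85 | CRITICAL | |
| Run3 | 24706 ms | <1 ms | 1 ms | 1 ms | 24755 ms | 85 | HIGH | |
| HITL-1 | 21581 ms | <1 ms | 1 ms | 1 ms | 21615 ms | 70 | CRITICAL | |
| HITL-2 | 26938 ms | <1 ms | 1 ms | 1 ms | 26975 ms | 60 | CRITICAL | |

Key conclusions:

  1. codeCheckNode (the Feign remote call) is the only bottleneck and accounts for more than 99.6% of the total time in every run
  2. styleCheckNode <1 ms / mergeCheckNode ≈1 ms / riskAssessNode ≈1 ms (local rules and in-memory computation)
  3. In the logs shown earlier, codeCheckNode and styleCheckNode start within 1 to 3 ms of each other, which confirms parallel execution
  4. HITL resume takes 30-61 ms, so resuming from a checkpoint adds almost no latency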
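The share of total time spent in codeCheckNode can be recomputed from the codeCheckNode and Total columns of the table. The snippet below does only that arithmetic; the class name is illustrative and the numbers are copied from the table.

```java
// Recomputes codeCheckNode's share of total time from the timing table.
final class BottleneckShareSketch {
    // {codeCheckNode ms, total ms} for Run1, Run2, Run3, HITL-1, HITL-2
    static final long[][] RUNS = {
        {20455, 20521}, {16541, 16577}, {24706, 24755}, {21581, 21615}, {26938, 26975}
    };

    /** Share of the total time spent in one node, as a percentage. */
    static double sharePercent(long nodeMs, long totalMs) {
        return 100.0 * nodeMs / totalMs;
    }

    /** Smallest codeCheckNode share across all recorded runs. */
    static double minSharePercent() {
        double min = 100.0;
        for (long[] run : RUNS) {
            min = Math.min(min, sharePercent(run[0], run[1]));
        }
        return min;
    }

    public static void main(String[] args) {
        for (long[] run : RUNS) {
            System.out.printf("%.2f%%%n", sharePercent(run[0], run[1]));
        }
        System.out.printf("min=%.2f%%%n", minSharePercent());
    }
}
```

The gap between the codeCheckNode time and the total (for example 66 ms in Run1) is larger than the listed local nodes add up to; the article does not break that remainder down further.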

6. Code structure

    src/main/java/com/zhu/dream/ai/graph/subgraph/codereview/
    ├── CodeReviewSubGraphConfiguration.java   # subgraph wiring (parallel + conditional edge + interruptBefore)
    ├── CodeCheckNode.java                     # Feign call to code-review-agent
    ├── StyleCheckNode.java                    # local style rules
    ├── CodeReviewMergeNode.java               # merges STYLE issues
    ├── RiskAssessNode.java                    # risk assessment + needHumanReview
    ├── CodeReviewRouteNode.java               # conditional edge
    ├── HumanReviewNode.java                   # HITL human review
    ├── ReportGenNode.java                     # generates the Markdown report
    ├── CodeReviewSubGraphNames.java           # constants
    ├── CodeReviewSubGraphTestService.java     # invoke / startHitl / resumeHitl
    └── CodeReviewGraphTiming.java             # per-node timing

    src/main/java/com/zhu/dream/ai/graph/support/
    ├── CodeReviewInputSupport.java            # input splitting
    ├── GraphInvokeStateDefaults.java          # registry of default values for state keys
    └── GraphInvokeSupport.java                # invoke helpers (prevents state pollution)

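Summary of the 3 pitfalls

| Pitfall | Symptom | Fix |
|---|---|---|
| Conditional edge directly after a parallel branch | Graph compilation fails | Fan in to a regular node first, then add the conditional edge |
| Nested objects stored in State | invoke runs, checkpoint serialization fails | Store nested objects as JSON strings and deserialize on read |
| State pollution across requests | Parallel branches read dirty data at merge | Explicitly initialize every state key before each invoke |

Versions: spring-ai-bom 1.1.6 + spring-ai-alibaba 1.1.2.2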
