通用本地持久化RPC失败重试组件|SpringBoot + OpenHFT Chronicle-Queue
2026/6/6 8:21:17 网站建设 项目流程

《金融支付架构实战指南:技术、安全与合规》介绍了6种容错性,其中重试性是其中一种,本文介绍重试性的一种实现方法。

一、前言

当程序发布重启,程序宕机(kill -9) 执行一部分的数据,但仍有一部分业务逻辑未执行,支付敏感业务场景需要重试。痛点汇总(所有微服务通用)

  • 本地事务已落库,RPC 调用下游(订单 / 账务 / 清算)失败,同步抛异常回滚主事务影响业务;若主事务已提交、RPC 失败 → 上下游数据不一致、资损;
  • 临时内存重试:JVM 宕机、进程重启,待重试 RPC 全部丢失;
  • 自建 MySQL 重试表:大促高频 RPC 落库压垮 DB、事务锁、性能差;
  • Redis 重试队列:Redis 集群宕机后整个补偿链路瘫痪;

设计目标:做成通用组件

一套代码统一支撑:Dubbo RPC、OpenFeign HTTP、RestTemplate、Redis 消息发送四大场景调用失败持久化重试,业务只需一行代码接入,无需重复编写重试落盘逻辑;

定位:RPC 调用失败兜底组件,正常链路直接同步调用,异常自动落地本地磁盘异步重试,不侵入原有业务事务

二、整体通用架构

业务执行入库完成 → 通用RPC调用模板发起远程调用
↓调用成功 → 直接返回,无落盘
↓异常/超时/服务不可用 → 组装通用RPC参数落地OpenHFT磁盘
后台定时任务轮询磁盘队列 → 反射/代理执行原RPC
↓调用成功 → 不再落盘(等效删除消息)
↓调用失败 → 更新重试次数+阶梯间隔,重新入磁盘队列
↓超出最大重试次数 → 移入本地死信队列,人工运维补发对账

三、Maven 依赖

xml
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- OpenHFT持久队列 -->
<dependency>
<groupId>net.openhft</groupId>
<artifactId>chronicle-queue</artifactId>
<version>5.25ea10</version>
</dependency>
<!-- JSON序列化 -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.48</version>
</dependency>
<!-- Dubbo/Feign按需引入 -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>

四、通用 RPC 重试实体(万能结构,适配所有调用)

不绑定任何业务、不绑定 Dubbo/Feign,通过「调用类型 + 全类名 + 方法名 + 入参数组」反射执行,通用所有远程调用

import lombok.Data;
import java.io.Serializable;

/**
* 通用RPC/HTTP调用失败重试实体
*/
@Data
public class CommonRpcRetryMsg implements Serializable {
// 全局唯一ID(幂等标识)
private String uniqueId;
// 业务单据号:支付单号/订单号(用于对账)
private String bizNo;
/**
* 调用类型:DUBBO / FEIGN / REST / REDIS_MSG
*/
private String invokeType;
// 接口全限定类名:com.xxx.api.OrderRemoteApi
private String interfaceClassName;
// 目标方法名
private String methodName;
// 方法入参类型全类名数组
private String[] paramClassNames;
// 方法入参JSON数组
private String[] paramJsonArr;

// 重试调度字段
private Integer retryCount;
private Long nextRetryTime;
private Long createTime;
}

五、全局常量配置


public final class RpcRetryConfig {
// 正常重试队列磁盘路径
public static final String RETRY_QUEUE_PATH = "./common/rpc/retry";
// 死信队列路径
public static final String DEAD_QUEUE_PATH = "./common/rpc/dead";
// 最大重试次数 金融建议12次
public static final int MAX_RETRY_NUM = 12;
}

六、通用反射 RPC 执行工具(核心:根据实体动态执行任意 RPC)


import com.alibaba.fastjson2.JSON;
import org.springframework.context.ApplicationContext;
import javax.annotation.Resource;
import java.lang.reflect.Method;

public class RpcInvokeUtil {
@Resource
private ApplicationContext applicationContext;

/**
* 根据重试消息反射执行远程调用,兼容Dubbo/Feign/普通Bean
* @return true成功 false失败
*/
public boolean invoke(CommonRpcRetryMsg msg) {
try {
// 1.获取接口Class
Class<?> interfaceCls = Class.forName(msg.getInterfaceClassName());
// 2.从Spring容器获取代理Bean(Dubbo/Feign代理对象都在Spring)
Object proxyBean = applicationContext.getBean(interfaceCls);
// 3.拼装参数Class数组
Class<?>[] paramClsArr = new Class<?>[msg.getParamClassNames().length];
for (int i = 0; i < msg.getParamClassNames().length; i++) {
paramClsArr[i] = Class.forName(msg.getParamClassNames()[i]);
}
// 4.获取Method
Method method = interfaceCls.getMethod(msg.getMethodName(), paramClsArr);
// 5.反序列化入参
Object[] params = new Object[msg.getParamJsonArr().length];
for (int i = 0; i < params.length; i++) {
params[i] = JSON.parseObject(msg.getParamJsonArr()[i], paramClsArr[i]);
}
// 6.执行RPC
method.invoke(proxyBean, params);
return true;
} catch (Exception e) {
return false;
}
}
}

七、OpenHFT 通用持久化重试管理器(全局唯一组件)

java
import com.alibaba.fastjson2.JSON;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycle;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.File;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

@Component
public class CommonRpcRetryManager {

private ChronicleQueue retryQueue;
private ChronicleQueue deadQueue;
private final RpcInvokeUtil rpcInvokeUtil;
private final ScheduledExecutorService schedule = Executors.newSingleThreadScheduledExecutor();

public CommonRpcRetryManager(RpcInvokeUtil rpcInvokeUtil) {
this.rpcInvokeUtil = rpcInvokeUtil;
}

@PostConstruct
public void init() {
// 按日拆分队列文件,避免超大文件
retryQueue = ChronicleQueue.singleBuilder(new File(RpcRetryConfig.RETRY_QUEUE_PATH))
.rollCycle(RollCycle.DAILY).build();
deadQueue = ChronicleQueue.singleBuilder(new File(RpcRetryConfig.DEAD_QUEUE_PATH))
.rollCycle(RollCycle.DAILY).build();
// 每15s扫描重试
schedule.scheduleWithFixedDelay(this::scanRetryTask, 3, 15, TimeUnit.SECONDS);
}

/**
* 调用失败:存入本地磁盘
*/
public void saveFailMsg(CommonRpcRetryMsg msg) {
msg.setCreateTime(System.currentTimeMillis());
msg.setRetryCount(0);
msg.setNextRetryTime(System.currentTimeMillis());
try (ExcerptAppender app = retryQueue.createAppender()) {
app.writeText(JSON.toJSONString(msg));
}
}

/**
* 定时扫描重试
*/
private void scanRetryTask() {
ExcerptTailer tailer = retryQueue.createTailer();
String json;
while ((json = tailer.readText()) != null) {
CommonRpcRetryMsg retryMsg = JSON.parseObject(json, CommonRpcRetryMsg.class);
long now = System.currentTimeMillis();
// 未到重试时间跳过
if (retryMsg.getNextRetryTime() > now) continue;
// 超限移入死信
if (retryMsg.getRetryCount() >= RpcRetryConfig.MAX_RETRY_NUM) {
moveToDead(retryMsg);
continue;
}
// 反射执行RPC
boolean success = rpcInvokeUtil.invoke(retryMsg);
if (success) {
// 成功:不重写=删除消息
continue;
} else {
// 失败:更新次数、下次时间重新落盘
retryMsg.setRetryCount(retryMsg.getRetryCount() + 1);
retryMsg.setNextRetryTime(getNextRetryTime(retryMsg.getRetryCount()));
try (ExcerptAppender app = retryQueue.createAppender()) {
app.writeText(JSON.toJSONString(retryMsg));
}
}
}
}

/**
* 阶梯退避策略:通用金融标准间隔
*/
private long getNextRetryTime(int count) {
long now = System.currentTimeMillis();
return switch (count) {
case 0 -> now + 10 * 1000;
case 1 -> now + 30 * 1000;
case 2 -> now + 60 * 1000;
case 3 -> now + 3 * 60 * 1000;
case 4 -> now + 5 * 60 * 1000;
default -> now + 10 * 60 * 1000;
};
}

private void moveToDead(CommonRpcRetryMsg msg) {
try (ExcerptAppender app = deadQueue.createAppender()) {
app.writeText(JSON.toJSONString(msg));
}
}

@PreDestroy
public void close() {
schedule.shutdown();
retryQueue.close();
deadQueue.close();
}
}

八、通用 RPC 调用门面(业务统一入口,一行代码接入)

java
import com.alibaba.fastjson2.JSON;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.lang.reflect.Method;
import java.util.UUID;

@Service
public class RpcRetryTemplate {
@Resource
private CommonRpcRetryManager retryManager;

/**
* 通用RPC调用模板
* @param bizNo 业务单号
* @param invokeType DUBBO/FEIGN/REST
* @param targetBean 远程接口代理对象(Dubbo/Feign)
* @param method 要执行的方法
* @param args 入参
*/
public <T> Object callRpc(String bizNo, String invokeType, T targetBean, Method method, Object... args) {
try {
// 同步发起远程调用
return method.invoke(targetBean, args);
} catch (Exception e) {
// 调用异常,自动组装消息落盘
CommonRpcRetryMsg msg = buildMsg(bizNo, invokeType, targetBean.getClass().getInterfaces()[0].getName(), method, args);
retryManager.saveFailMsg(msg);
return null;
}
}

private CommonRpcRetryMsg buildMsg(String bizNo, String invokeType, String interfaceName, Method method, Object[] args) {
CommonRpcRetryMsg msg = new CommonRpcRetryMsg();
msg.setUniqueId(UUID.randomUUID().toString().replace("-", ""));
msg.setBizNo(bizNo);
msg.setInvokeType(invokeType);
msg.setInterfaceClassName(interfaceName);
msg.setMethodName(method.getName());

// 参数Class数组
Class<?>[] paramTypes = method.getParameterTypes();
String[] clsArr = new String[paramTypes.length];
String[] jsonArr = new String[paramTypes.length];
for (int i = 0; i < paramTypes.length; i++) {
clsArr[i] = paramTypes[i].getName();
jsonArr[i] = JSON.toJSONString(args[i]);
}
msg.setParamClassNames(clsArr);
msg.setParamJsonArr(jsonArr);
return msg;
}
}

九、业务层使用示例(Dubbo + Feign 两种演示)

示例 1:Dubbo 远程调用

java
@Service
public class PayBizService {
@Resource
private OrderDubboApi orderDubboApi; // Dubbo远程接口
@Resource
private RpcRetryTemplate rpcTemplate;

public void paySuccess(String payNo) throws Exception {
// 1.本地数据库更新支付成功
// payOrderMapper.updateSuccess(payNo);

// 2.调用下游订单Dubbo,失败自动落盘重试
Method method = OrderDubboApi.class.getDeclaredMethod("createOrder", String.class, Long.class);
rpcTemplate.callRpc(payNo, "DUBBO", orderDubboApi, method, payNo, 100L);
}
}

示例 2:OpenFeign 远程调用

java
@Service
public class SettleBizService {
@Resource
private SettleFeignApi settleFeignApi; // Feign远程接口
@Resource
private RpcRetryTemplate rpcTemplate;

public void createSettle(String settleNo) throws Exception {
// 本地落库
Method method = SettleFeignApi.class.getDeclaredMethod("notifySettle", String.class);
rpcTemplate.callRpc(settleNo, "FEIGN", settleFeignApi, method, settleNo);
}
}

十、扩展:注解版(进阶,可选 AOP 自动拦截 @RpcRetry 注解方法)

自定义注解@RpcRetry(invokeType = "DUBBO"),AOP 环绕拦截方法异常,自动封装参数落盘,业务连手动构建 Method 都不需要,极简接入。

java
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcRetry {
String invokeType() default "DUBBO";
}

十一、核心通用特性说明

1、全场景兼容

  • Dubbo 泛化调用、接口代理调用;
  • OpenFeign/HttpClient HTTP 远程调用;
  • Redis Stream/MQ 消息投递失败;
    一套组件全部接管失败重试。

2、宕机数据安全

OpenHFT mmap 内存映射落磁盘,进程 / 服务器宕机消息永久保存在本地文件,重启自动加载全部未完成 RPC 继续重试,金融杜绝资损。

3、删除机制

Chronicle 只追加不删除:重试成功不再写入队列 = 逻辑删除;失败更新重试参数重新入队,无冗余数据堆积。

4、死信运维

重试耗尽转入死信目录,可开发后台管理接口:读取死信、手动重新入重试队列,满足金融审计、人工补单。

十二、四种重试方案横向对比

方案

通用 RPC 适配

宕机不丢消息

性能

中间件依赖

金融适配度

内存重试

全适配

极高

禁止生产

DB 重试表

全适配

Mysql

高并发不推荐

Redis 队列

全适配

Redis

中间件故障不可用

OpenHFT 通用组件

✅DUBBO/FEIGN/REST

超高

✅金融首选通用兜底

十三、生产优化拓展

  • 目录监控:监控./common/rpc目录磁盘容量、文件增长,死信突增告警(下游服务大面积宕机);
  • 幂等控制:uniqueId 全局唯一,下游接口根据 ID 做幂等,防止重复下单 / 重复入账;
  • 多环境隔离:测试 / 生产区分磁盘目录,避免文件混杂;
  • 超大流量分片:多实例部署天然队列隔离,每个实例维护自身本地磁盘。

十四、总结

  1. 本组件完全通用化,不绑定任何业务、任何 RPC 框架,新项目老项目无缝接入;
  2. 作为 RPC 调用最后兜底:正常走同步远程调用,异常自动降级本地磁盘异步重试,解决上下游数据不一致;
  3. 替换项目中 DB 重试表、内存重试、Redis 重试队列,金融级可靠性,满足支付、账务、清算合规要求。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询