《金融支付架构实战指南:技术、安全与合规》介绍了6种容错性,其中重试性是其中一种,本文介绍重试性的一种实现方法。
一、前言
当程序发布重启,程序宕机(kill -9) 执行一部分的数据,但仍有一部分业务逻辑未执行,支付敏感业务场景需要重试。痛点汇总(所有微服务通用)
- 本地事务已落库,RPC 调用下游(订单 / 账务 / 清算)失败,同步抛异常回滚主事务影响业务;若主事务已提交、RPC 失败 → 上下游数据不一致、资损;
- 临时内存重试:JVM 宕机、进程重启,待重试 RPC 全部丢失;
- 自建 MySQL 重试表:大促高频 RPC 落库压垮 DB、事务锁、性能差;
- Redis 重试队列:Redis 集群宕机后整个补偿链路瘫痪;
设计目标:做成通用组件
一套代码统一支撑:Dubbo RPC、OpenFeign HTTP、RestTemplate、Redis 消息发送四大场景调用失败持久化重试,业务只需一行代码接入,无需重复编写重试落盘逻辑;
定位:RPC 调用失败兜底组件,正常链路直接同步调用,异常自动落地本地磁盘异步重试,不侵入原有业务事务。 |
二、整体通用架构
业务执行入库完成 → 通用RPC调用模板发起远程调用 ↓调用成功 → 直接返回,无落盘 ↓异常/超时/服务不可用 → 组装通用RPC参数落地OpenHFT磁盘 后台定时任务轮询磁盘队列 → 反射/代理执行原RPC ↓调用成功 → 不再落盘(等效删除消息) ↓调用失败 → 更新重试次数+阶梯间隔,重新入磁盘队列 ↓超出最大重试次数 → 移入本地死信队列,人工运维补发对账 |
三、Maven 依赖
xml <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <!-- OpenHFT持久队列 --> <dependency> <groupId>net.openhft</groupId> <artifactId>chronicle-queue</artifactId> <version>5.25ea10</version> </dependency> <!-- JSON序列化 --> <dependency> <groupId>com.alibaba.fastjson2</groupId> <artifactId>fastjson2</artifactId> <version>2.0.48</version> </dependency> <!-- Dubbo/Feign按需引入 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> </dependency> <dependency> <groupId>org.apache.dubbo</groupId> <artifactId>dubbo-spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> </dependencies> |
四、通用 RPC 重试实体(万能结构,适配所有调用)
不绑定任何业务、不绑定 Dubbo/Feign,通过「调用类型 + 全类名 + 方法名 + 入参数组」反射执行,通用所有远程调用 |
import lombok.Data; import java.io.Serializable;
/** * 通用RPC/HTTP调用失败重试实体 */ @Data public class CommonRpcRetryMsg implements Serializable { // 全局唯一ID(幂等标识) private String uniqueId; // 业务单据号:支付单号/订单号(用于对账) private String bizNo; /** * 调用类型:DUBBO / FEIGN / REST / REDIS_MSG */ private String invokeType; // 接口全限定类名:com.xxx.api.OrderRemoteApi private String interfaceClassName; // 目标方法名 private String methodName; // 方法入参类型全类名数组 private String[] paramClassNames; // 方法入参JSON数组 private String[] paramJsonArr;
// 重试调度字段 private Integer retryCount; private Long nextRetryTime; private Long createTime; } |
五、全局常量配置
public final class RpcRetryConfig { // 正常重试队列磁盘路径 public static final String RETRY_QUEUE_PATH = "./common/rpc/retry"; // 死信队列路径 public static final String DEAD_QUEUE_PATH = "./common/rpc/dead"; // 最大重试次数 金融建议12次 public static final int MAX_RETRY_NUM = 12; }
|
六、通用反射 RPC 执行工具(核心:根据实体动态执行任意 RPC)
import com.alibaba.fastjson2.JSON; import org.springframework.context.ApplicationContext; import javax.annotation.Resource; import java.lang.reflect.Method;
public class RpcInvokeUtil { @Resource private ApplicationContext applicationContext;
/** * 根据重试消息反射执行远程调用,兼容Dubbo/Feign/普通Bean * @return true成功 false失败 */ public boolean invoke(CommonRpcRetryMsg msg) { try { // 1.获取接口Class Class<?> interfaceCls = Class.forName(msg.getInterfaceClassName()); // 2.从Spring容器获取代理Bean(Dubbo/Feign代理对象都在Spring) Object proxyBean = applicationContext.getBean(interfaceCls); // 3.拼装参数Class数组 Class<?>[] paramClsArr = new Class<?>[msg.getParamClassNames().length]; for (int i = 0; i < msg.getParamClassNames().length; i++) { paramClsArr[i] = Class.forName(msg.getParamClassNames()[i]); } // 4.获取Method Method method = interfaceCls.getMethod(msg.getMethodName(), paramClsArr); // 5.反序列化入参 Object[] params = new Object[msg.getParamJsonArr().length]; for (int i = 0; i < params.length; i++) { params[i] = JSON.parseObject(msg.getParamJsonArr()[i], paramClsArr[i]); } // 6.执行RPC method.invoke(proxyBean, params); return true; } catch (Exception e) { return false; } } }
|
七、OpenHFT 通用持久化重试管理器(全局唯一组件)
java import com.alibaba.fastjson2.JSON; import net.openhft.chronicle.queue.ChronicleQueue; import net.openhft.chronicle.queue.ExcerptAppender; import net.openhft.chronicle.queue.ExcerptTailer; import net.openhft.chronicle.queue.RollCycle; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.io.File; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit;
@Component public class CommonRpcRetryManager {
private ChronicleQueue retryQueue; private ChronicleQueue deadQueue; private final RpcInvokeUtil rpcInvokeUtil; private final ScheduledExecutorService schedule = Executors.newSingleThreadScheduledExecutor();
public CommonRpcRetryManager(RpcInvokeUtil rpcInvokeUtil) { this.rpcInvokeUtil = rpcInvokeUtil; }
@PostConstruct public void init() { // 按日拆分队列文件,避免超大文件 retryQueue = ChronicleQueue.singleBuilder(new File(RpcRetryConfig.RETRY_QUEUE_PATH)) .rollCycle(RollCycle.DAILY).build(); deadQueue = ChronicleQueue.singleBuilder(new File(RpcRetryConfig.DEAD_QUEUE_PATH)) .rollCycle(RollCycle.DAILY).build(); // 每15s扫描重试 schedule.scheduleWithFixedDelay(this::scanRetryTask, 3, 15, TimeUnit.SECONDS); }
/** * 调用失败:存入本地磁盘 */ public void saveFailMsg(CommonRpcRetryMsg msg) { msg.setCreateTime(System.currentTimeMillis()); msg.setRetryCount(0); msg.setNextRetryTime(System.currentTimeMillis()); try (ExcerptAppender app = retryQueue.createAppender()) { app.writeText(JSON.toJSONString(msg)); } }
/** * 定时扫描重试 */ private void scanRetryTask() { ExcerptTailer tailer = retryQueue.createTailer(); String json; while ((json = tailer.readText()) != null) { CommonRpcRetryMsg retryMsg = JSON.parseObject(json, CommonRpcRetryMsg.class); long now = System.currentTimeMillis(); // 未到重试时间跳过 if (retryMsg.getNextRetryTime() > now) continue; // 超限移入死信 if (retryMsg.getRetryCount() >= RpcRetryConfig.MAX_RETRY_NUM) { moveToDead(retryMsg); continue; } // 反射执行RPC boolean success = rpcInvokeUtil.invoke(retryMsg); if (success) { // 成功:不重写=删除消息 continue; } else { // 失败:更新次数、下次时间重新落盘 retryMsg.setRetryCount(retryMsg.getRetryCount() + 1); retryMsg.setNextRetryTime(getNextRetryTime(retryMsg.getRetryCount())); try (ExcerptAppender app = retryQueue.createAppender()) { app.writeText(JSON.toJSONString(retryMsg)); } } } }
/** * 阶梯退避策略:通用金融标准间隔 */ private long getNextRetryTime(int count) { long now = System.currentTimeMillis(); return switch (count) { case 0 -> now + 10 * 1000; case 1 -> now + 30 * 1000; case 2 -> now + 60 * 1000; case 3 -> now + 3 * 60 * 1000; case 4 -> now + 5 * 60 * 1000; default -> now + 10 * 60 * 1000; }; }
private void moveToDead(CommonRpcRetryMsg msg) { try (ExcerptAppender app = deadQueue.createAppender()) { app.writeText(JSON.toJSONString(msg)); } }
@PreDestroy public void close() { schedule.shutdown(); retryQueue.close(); deadQueue.close(); } } |
八、通用 RPC 调用门面(业务统一入口,一行代码接入)
java import com.alibaba.fastjson2.JSON; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.lang.reflect.Method; import java.util.UUID;
@Service public class RpcRetryTemplate { @Resource private CommonRpcRetryManager retryManager;
/** * 通用RPC调用模板 * @param bizNo 业务单号 * @param invokeType DUBBO/FEIGN/REST * @param targetBean 远程接口代理对象(Dubbo/Feign) * @param method 要执行的方法 * @param args 入参 */ public <T> Object callRpc(String bizNo, String invokeType, T targetBean, Method method, Object... args) { try { // 同步发起远程调用 return method.invoke(targetBean, args); } catch (Exception e) { // 调用异常,自动组装消息落盘 CommonRpcRetryMsg msg = buildMsg(bizNo, invokeType, targetBean.getClass().getInterfaces()[0].getName(), method, args); retryManager.saveFailMsg(msg); return null; } }
private CommonRpcRetryMsg buildMsg(String bizNo, String invokeType, String interfaceName, Method method, Object[] args) { CommonRpcRetryMsg msg = new CommonRpcRetryMsg(); msg.setUniqueId(UUID.randomUUID().toString().replace("-", "")); msg.setBizNo(bizNo); msg.setInvokeType(invokeType); msg.setInterfaceClassName(interfaceName); msg.setMethodName(method.getName());
// 参数Class数组 Class<?>[] paramTypes = method.getParameterTypes(); String[] clsArr = new String[paramTypes.length]; String[] jsonArr = new String[paramTypes.length]; for (int i = 0; i < paramTypes.length; i++) { clsArr[i] = paramTypes[i].getName(); jsonArr[i] = JSON.toJSONString(args[i]); } msg.setParamClassNames(clsArr); msg.setParamJsonArr(jsonArr); return msg; } } |
九、业务层使用示例(Dubbo + Feign 两种演示)
示例 1:Dubbo 远程调用
java @Service public class PayBizService { @Resource private OrderDubboApi orderDubboApi; // Dubbo远程接口 @Resource private RpcRetryTemplate rpcTemplate;
public void paySuccess(String payNo) throws Exception { // 1.本地数据库更新支付成功 // payOrderMapper.updateSuccess(payNo);
// 2.调用下游订单Dubbo,失败自动落盘重试 Method method = OrderDubboApi.class.getDeclaredMethod("createOrder", String.class, Long.class); rpcTemplate.callRpc(payNo, "DUBBO", orderDubboApi, method, payNo, 100L); } } |
示例 2:OpenFeign 远程调用
java @Service public class SettleBizService { @Resource private SettleFeignApi settleFeignApi; // Feign远程接口 @Resource private RpcRetryTemplate rpcTemplate;
public void createSettle(String settleNo) throws Exception { // 本地落库 Method method = SettleFeignApi.class.getDeclaredMethod("notifySettle", String.class); rpcTemplate.callRpc(settleNo, "FEIGN", settleFeignApi, method, settleNo); } } |
十、扩展:注解版(进阶,可选 AOP 自动拦截 @RpcRetry 注解方法)
自定义注解@RpcRetry(invokeType = "DUBBO"),AOP 环绕拦截方法异常,自动封装参数落盘,业务连手动构建 Method 都不需要,极简接入。 |
java @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface RpcRetry { String invokeType() default "DUBBO"; } |
十一、核心通用特性说明
1、全场景兼容
- OpenFeign/HttpClient HTTP 远程调用;
- Redis Stream/MQ 消息投递失败;
一套组件全部接管失败重试。
2、宕机数据安全
OpenHFT mmap 内存映射落磁盘,进程 / 服务器宕机消息永久保存在本地文件,重启自动加载全部未完成 RPC 继续重试,金融杜绝资损。
3、删除机制
Chronicle 只追加不删除:重试成功不再写入队列 = 逻辑删除;失败更新重试参数重新入队,无冗余数据堆积。
4、死信运维
重试耗尽转入死信目录,可开发后台管理接口:读取死信、手动重新入重试队列,满足金融审计、人工补单。
十二、四种重试方案横向对比
方案 | 通用 RPC 适配 | 宕机不丢消息 | 性能 | 中间件依赖 | 金融适配度 |
内存重试 | 全适配 | ❌ | 极高 | 无 | 禁止生产 |
DB 重试表 | 全适配 | ✅ | 低 | Mysql | 高并发不推荐 |
Redis 队列 | 全适配 | ✅ | 高 | Redis | 中间件故障不可用 |
OpenHFT 通用组件 | ✅DUBBO/FEIGN/REST | ✅ | 超高 | 无 | ✅金融首选通用兜底 |
十三、生产优化拓展
- 目录监控:监控./common/rpc目录磁盘容量、文件增长,死信突增告警(下游服务大面积宕机);
- 幂等控制:uniqueId 全局唯一,下游接口根据 ID 做幂等,防止重复下单 / 重复入账;
- 多环境隔离:测试 / 生产区分磁盘目录,避免文件混杂;
- 超大流量分片:多实例部署天然队列隔离,每个实例维护自身本地磁盘。
十四、总结
- 本组件完全通用化,不绑定任何业务、任何 RPC 框架,新项目老项目无缝接入;
- 作为 RPC 调用最后兜底:正常走同步远程调用,异常自动降级本地磁盘异步重试,解决上下游数据不一致;
- 替换项目中 DB 重试表、内存重试、Redis 重试队列,金融级可靠性,满足支付、账务、清算合规要求。