Redis Stream 与消息队列模式:从 Pub/Sub 到持久化消费
2026/6/11 23:06:57 网站建设 项目流程

Redis Stream 与消息队列模式:从 Pub/Sub 到持久化消费

一、Redis Pub/Sub 的"不可靠"困境:消息丢失与无法回溯

Redis 的 Pub/Sub 机制轻量高效,但存在两个致命缺陷:一是消息不持久化,如果消费者离线,期间发布的消息将永久丢失;二是无法回溯消费,消息一旦发布就被推送给在线消费者,没有历史记录可供查询。在需要可靠消息传递的场景(如订单状态变更通知、异步任务分发)中,Pub/Sub 的不可靠性成为硬伤。

Redis 5.0 引入的 Stream 数据类型,提供了类似 Kafka 的持久化消息队列能力:消息持久存储、支持消费者组、支持消息确认和重试、支持历史消息回溯。Stream 是 Redis 从"缓存"走向"消息基础设施"的关键一步。

二、Stream 的消费者组模型

Stream 的消费者组(Consumer Group)模型借鉴了 Kafka 的设计理念:多个消费者组成一个组,组内消费者共享消息(每条消息只被组内一个消费者处理),不同组独立消费(每条消息会被每个组各消费一次)。

flowchart TD A[Stream:消息流] --> B[消费者组 A] A --> C[消费者组 B] B --> D[消费者 A1:处理消息 1,4,7] B --> E[消费者 A2:处理消息 2,5,8] B --> F[消费者 A3:处理消息 3,6,9] C --> G[消费者 B1:处理消息 1,2,3] C --> H[消费者 B2:处理消息 4,5,6] subgraph 消息确认机制 I[XACK:确认消费] J[XPEND:待确认列表] K[XPCLAIM:转移超时消息] end D --> I E --> I J --> K

关键机制:消费者读取消息后,消息进入 Pending 列表(待确认状态)。消费者处理完成后发送 XACK 确认。如果消费者宕机,Pending 中的消息不会被确认,其他消费者可以通过 XPCLAIM 接管超时消息,实现故障转移。

三、工程化实现

3.1 消息生产者

// StreamProducer.java @Component @RequiredArgsConstructor public class StreamProducer { private final StringRedisTemplate redisTemplate; // 发送消息到 Stream public RecordId sendMessage(String streamKey, Map<String, String> message) { // 使用 MAXLEN 限制 Stream 长度,防止内存无限增长 // 近似裁剪(~)避免性能影响 StringRecord record = StreamRecords.string(message) .withStreamKey(streamKey); RecordId recordId = redisTemplate.opsForStream().add( record, XAddOptions.maxlen(100000).approximateTrimming() ); if (recordId == null) { throw new RuntimeException("消息发送失败"); } return recordId; } // 发送带业务 ID 的消息,支持幂等去重 public RecordId sendMessageWithId( String streamKey, String messageId, Map<String, String> message ) { message.put("_biz_id", messageId); // 检查是否已发送(基于业务 ID 去重) // 简化实现:实际应使用 Redis Hash 维护去重表 return sendMessage(streamKey, message); } }

3.2 消费者组与消息消费

// StreamConsumer.java @Component @RequiredArgsConstructor @Slf4j public class StreamConsumer { private final StringRedisTemplate redisTemplate; private final OrderEventHandler orderEventHandler; private static final String STREAM_KEY = "order:events"; private static final String GROUP_NAME = "order-processor"; private static final String CONSUMER_NAME = "consumer-1"; @PostConstruct public void init() { // 创建消费者组(如果不存在) try { redisTemplate.opsForStream().createGroup( STREAM_KEY, GROUP_NAME ); } catch (Exception e) { // 组已存在,忽略 log.info("消费者组已存在:{}", GROUP_NAME); } } // 拉取模式消费消息 @Scheduled(fixedDelay = 100) // 每 100ms 拉取一次 public void consumeMessages() { // 从消费者组读取未消费的消息 List<MapRecord<String, Object, Object>> records = redisTemplate.opsForStream().read( Consumer.from(GROUP_NAME, CONSUMER_NAME), StreamReadOptions.empty().count(10), StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()) ); if (records == null || records.isEmpty()) { return; } for (MapRecord<String, Object, Object> record : records) { try { // 处理消息 handleMessage(record); // 确认消费 redisTemplate.opsForStream().acknowledge( STREAM_KEY, GROUP_NAME, record.getId() ); } catch (Exception e) { // 处理失败:消息留在 Pending 列表,等待重试 log.error("消息处理失败:{}, 错误:{}", record.getId(), e.getMessage()); } } } private void handleMessage(MapRecord<String, Object, Object> record) { Map<Object, Object> body = record.getValue(); String eventType = (String) body.get("eventType"); switch (eventType) { case "ORDER_CREATED" -> orderEventHandler.handleCreated(body); case "ORDER_PAID" -> orderEventHandler.handlePaid(body); case "ORDER_CANCELLED" -> orderEventHandler.handleCancelled(body); default -> log.warn("未知事件类型:{}", eventType); } } }

3.3 Pending 消息监控与故障转移

// PendingMessageMonitor.java @Component @RequiredArgsConstructor @Slf4j public class PendingMessageMonitor { private final StringRedisTemplate redisTemplate; private static final String STREAM_KEY = "order:events"; private static final String GROUP_NAME = "order-processor"; private static final Duration MESSAGE_TIMEOUT = Duration.ofMinutes(5); // 定期检查 Pending 消息,转移超时消息 @Scheduled(fixedDelay = 60000) // 每分钟检查一次 public void monitorPendingMessages() { PendingMessagesSummary summary = redisTemplate.opsForStream() .pending(STREAM_KEY, GROUP_NAME); if (summary == null || summary.getTotalPendingMessages() == 0) { return; } log.info("Pending 消息数:{}", summary.getTotalPendingMessages()); // 获取超时的 Pending 消息 PendingMessages pending = redisTemplate.opsForStream().pending( STREAM_KEY, Consumer.from(GROUP_NAME, "monitor"), Range.unbounded(), 100 ); if (pending == null) return; for (PendingMessage pm : pending) { // 消息超过 5 分钟未确认,视为超时 if (pm.getElapsedTimeSinceLastDelivery().compareTo( MESSAGE_TIMEOUT) > 0) { // 将超时消息转移给当前消费者重新处理 redisTemplate.opsForStream().claim( STREAM_KEY, GROUP_NAME, "monitor", Duration.ZERO, pm.getId() ); log.warn("转移超时消息:{}, 原消费者:{}", pm.getId(), pm.getConsumerName()); } } } }

四、Redis Stream 的边界与权衡

内存容量限制:Stream 数据存储在 Redis 内存中,消息量受内存限制。虽然 MAXLEN 可以控制 Stream 长度,但裁剪后的消息无法恢复。对于需要长期保存的消息(如审计日志),必须将消息归档到外部存储(如 MySQL、对象存储)。

消费延迟与拉取模式:Redis Stream 的消费是拉取模式(XREADGROUP),需要客户端定期轮询。相比 Kafka 的推模式,拉取模式增加了消费延迟。优化策略是:低延迟场景使用 BLOCK 选项阻塞等待,高吞吐场景使用 COUNT 批量拉取。

消费者再平衡的缺失:Kafka 有完善的消费者再平衡机制(消费者加入/离开时自动重新分配分区),Redis Stream 没有内置的再平衡。消费者宕机后,Pending 消息需要手动通过 XPCLAIM 转移。建议实现自动化的 Pending 监控和转移机制。

与专业消息队列的差距:Redis Stream 适合轻量级消息场景,但在消息可靠性、分区扩展、消费者再平衡等方面与 Kafka/RabbitMQ 有显著差距。如果业务对消息可靠性要求极高(如金融交易),应使用专业消息队列而非 Redis Stream。

五、总结

Redis Stream 为轻量级消息队列场景提供了开箱即用的解决方案,在不需要引入 Kafka 的复杂度时是一个务实的选择。落地路线上,建议先用 Stream 替代 Pub/Sub 实现可靠消息传递,再逐步引入消费者组和 Pending 监控。关键原则:Stream 是 Redis 的功能而非专业消息队列,适合轻量场景,重度消息场景仍需 Kafka/RabbitMQ。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询