RocketMQ源码深度解析(二)Netty通信、Broker心跳注册、消息收发、客户端负载均衡原理
2026/6/6 4:24:56 网站建设 项目流程

一、RocketMQ 底层 Netty 通信框架原理

1.1 核心定位

RocketMQ所有网络交互全部基于 Netty NIO 通信实现,是整个中间件的通信基石。所有组件(NameServer、Broker、Producer、Consumer)的请求、响应、心跳、数据传输,都统一基于 Netty 异步非阻塞模型完成。

Netty的所有远程通信功能都由remoting模块实现。remoting模块中有两个对象最为重要。就是RPC的服务端RemotingServer以及客户端RemotingClient。在RocketMQ中,涉及到的远程服务⾮常多,同⼀个服务,可能既是RPC的服务端也可以是RPC的客户端。例如Broker服务,对于Client来说,他需要作为服务端响应他们发送消息以及拉取消息等请求,所以Broker是需要RemotingServer的。⽽另⼀⽅⾯,Broker需要主动向NameServer发送⼼跳请求,这时,Broker⼜
需要RemotingClient。因此,Broker既是RPC的服务端⼜是RPC的客户端。

核心设计特点:

  • 基于Reactor 主从多线程模型,高吞吐、低延迟

  • 统一协议封装:RemotingCommand(所有请求、响应统一对象)

  • 全异步通信,支持同步/异步/单向调用模式

  • 服务端:NameServer(9876)、Broker(10911)

  • 客户端:Producer、Consumer、Broker 内部远程调用

1.2 Netty 服务端启动初始化流程

以 Broker、NameServer 公共 Netty 启动逻辑为例,启动流程标准化:

  1. 加载NettyServerConfig端口、线程数、缓冲区配置

  2. 创建 Boss 主线程组(1个线程,负责监听连接)

  3. 创建 Worker 工作线程组(多线程,负责读写处理)

  4. 初始化 Channel 流水线:注册自定义编解码器、空闲检测、业务处理器

  5. 绑定监听端口,启动 NIO 服务

  6. 循环接收客户端连接,异步处理所有 RemotingCommand 请求

1.3 Netty 通信流程图

1.4 核心源码片段(Netty 启动模板)

@Override public void start() { this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("NettyServerCodecThread_")); //初始化netty服务端参数信息 initServerBootstrap(serverBootstrap); //绑定对应通道信息 try { ChannelFuture sync = serverBootstrap.bind().sync(); InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress(); if (0 == nettyServerConfig.getListenPort()) { this.nettyServerConfig.setListenPort(addr.getPort()); } log.info("RemotingServer started, listening {}:{}", this.nettyServerConfig.getBindAddress(), this.nettyServerConfig.getListenPort()); this.remotingServerTable.put(this.nettyServerConfig.getListenPort(), this); } catch (Exception e) { throw new IllegalStateException(String.format("Failed to bind to %s:%d", nettyServerConfig.getBindAddress(), nettyServerConfig.getListenPort()), e); } if (this.channelEventListener != null) { this.nettyEventExecutor.start(); } //其他定时器启动 ...... }

1.5 服务端构建处理链的核心代码

protected ChannelPipeline configChannel(SocketChannel ch) { return ch.pipeline() .addLast(getDefaultEventExecutorGroup(), HANDSHAKE_HANDLER_NAME, new HandshakeHandler()) .addLast(getDefaultEventExecutorGroup(), encoder,//请求编码器 new NettyDecoder(),//请求解码器 distributionHandler,//请求计数器 new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),//⼼跳管理器 connectionManageHandler,//连接管理器 serverHandler//核⼼的业务处理器 ); }
1.5.1 请求参数
从请求的编解码器可以看出,RocketMQ的所有RPC请求数据都封装成RemotingCommand对象。RemotingCommand对象中有⼏个重要的属性
private int code; //响应码,表示请求处理成功还是失败 private int opaque = requestId.getAndIncrement(); //服务端内部会构建唯⼀的请求ID。 private transient CommandCustomHeader customHeader; //⾃定义的请求头。⽤来区分不同的业务请求 private transient byte[] body; //请求参数体 private int flag = 0; //参数类型, 默认0表示请求,1表示响应
1.5.2 处理逻辑
所有核⼼的业务请求都是通过⼀个NettyServerHandler进⾏统⼀处理。他处理时的核⼼代码如下:
@ChannelHandler.Sharable public class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> { //统一处理所有业务请求 @Override protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) { int localPort = RemotingHelper.parseSocketAddressPort(ctx.channel().localAddress()); NettyRemotingAbstract remotingAbstract = NettyRemotingServer.this.remotingServerTable.get(localPort); if (localPort != -1 && remotingAbstract != null) { remotingAbstract.processMessageReceived(ctx, msg);//核⼼处理请求的⽅法 return; } // The related remoting server has been shutdown, so close the connected channel RemotingHelper.closeChannel(ctx.channel()); } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { //调整channel的读写属性 } }
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) { if (msg != null) { switch (msg.getType()) { case REQUEST_COMMAND: processRequestCommand(ctx, msg); break; case RESPONSE_COMMAND: processResponseCommand(ctx, msg); break; default: break; } } }
  • 在最核⼼的处理请求的processMessageReceived⽅法中,会将请求类型分为 REQUEST__COMMAND 和 RESPONSE_COMMAND来处理。**为什么会有两种不同类型的请求呢?这是因为客户端的业务请求会有两种类型:⼀种是客户端发过来的业务请求,另⼀种是客户上次发过来的业务请求,可能并没有同步给出相应。这时就需要客户端再发⼀个response类型的请求,获取上⼀次请求的响应。这也就能⽀持异步的RPC调⽤。
  • 如何处理request类型的请求?

服务端和客户端都会维护⼀个processorTable。这是个HashMap,key是服务码,也就对应RemotingCommand的code。value是对应的运行单元Pair<NettyRequestProcessor,ExecutorService>。包含了执行线程的线程池和具体处理业务的Processor。而这些Processor,是由业务系统自主注册的。也就是说,想要看每个服务具体有哪些业务能力,就只要看他们注册了哪些Processor就知道了。

public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) { //根据业务类型找到线程中的事务处理器 final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode()); //未找到对应的事务处理器,则初始化一个默认的处理器 final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessorPair : matched; final int opaque = cmd.getOpaque(); //无默认处理器,抛出异常 if (pair == null) { String error = " request type " + cmd.getCode() + " not supported"; final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error); response.setOpaque(opaque); this.writeResponse(ctx.channel(), cmd, response, null); log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error); return; } //构建请求信息,并执行请求获取最终结果 Runnable run = buildProcessRequestHandler(ctx, cmd, pair, opaque); ...... }

Broker服务注册,详⻅ BrokerController.registerProcssor()⽅法。

NameServer的服务注册⽅法,重点如下:

private void registerProcessor() { if (namesrvConfig.isClusterTest()) {//是否测试集群模式,默认是false。也就是说现在阶段不推荐。 this.remotingServer.registerDefaultProcessor(new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()), this.defaultExecutor); } else { // Support get route info only temporarily ClientRequestProcessor clientRequestProcessor = new ClientRequestProcessor(this); this.remotingServer.registerProcessor(RequestCode.GET_ROUTEINFO_BY_TOPIC, clientRequestProcessor, this.clientRequestExecutor); this.remotingServer.registerDefaultProcessor(new DefaultRequestProcessor(this), this.defaultExecutor); } }
  • 如何处理response类型的请求
NettyServer处理完request请求后,会先缓存到responseTable中,等NettyClient下次发送response类型的请求,再来获取。这样就不⽤阻塞Channel,提升请求的吞吐量。优雅的⽀持了异步请求。
RocketMQ的RemotingServer服务端,会维护一个responseTable,这是一个线程同步的Map结构。 key为请求的ID,value是异步的消息结果。ConcurrentMap<Integer /* opaque */, ResponseFuture> 。
处理同步请求(NettyRemotingAbstract#invokeSyncImpl)时,处理的结果会存responseTable,通过ResponseFuture提供一定的服务端异步处理支持,提升服务端的吞吐量。 请求返回后,⽴即从responseTable中移除请求记录。
//org.apache.rocketmq.remoting.netty.ResponseFuture //发送消息后,通过countDownLatch阻塞当前线程,造成同步等待的效果。 public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException { this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS); return this.responseCommand; } //等待异步获取到消息后,再通过countDownLatch释放当前线程。 public void putResponse(final RemotingCommand responseCommand) { this.responseCommand = responseCommand; this.countDownLatch.countDown(); }

处理异步请求(NettyRemotingAbstract#invokeAsyncImpl)时,处理的结果依然会存responsTable,等待客户端后续再来请求结果。但是他保存的依然是一个ResponseFuture,也就是在客户端请求结果时再去获取真正的结果。另外,在RemotingServer启动时,会启动⼀个定时的线程任务,不断扫描responseTable,将其中过期的response清除掉。

TimerTask timerScanResponseTable = new TimerTask() { @Override public void run(Timeout timeout) { try { NettyRemotingServer.this.scanResponseTable(); } catch (Throwable e) { log.error("scanResponseTable exception", e); } finally { timer.newTimeout(this, 1000, TimeUnit.MILLISECONDS); } } }; this.timer.newTimeout(timerScanResponseTable, 1000 * 3, TimeUnit.MILLISECONDS);
  • 整体流程

可以看到,RocketMQ基于Netty框架实现的这一套基于服务码的服务注册机制,即可以让各种不同的组件都按照自己的需求注册自己的服务方法,又可以以一种统一的方式同时支持同步请求和异步请求。所以这一套框架,其实是非常简洁易用的。在使用Netty框架进行相关应用开发时,都可以借鉴他的这一套服务注册机制。例如开发一个大型的IM项目,要添加好友、发送文本、发送图片、发送附件、甚至还有表情、红包等等各种各样的请求。这些请求如何封装,就可以参考这一套服务注册框架。


二、Broker 心跳注册与路由管理机制

2.1 机制核心作用

Broker 启动后必须主动向 NameServer注册 + 定时心跳保活,让 NameServer 维护全局最新的集群路由信息,为生产者、消费者提供路由发现能力。

2.2 完整运行机制

  1. 启动立即注册:Broker 初始化完成后,立刻调用 registerBrokerAll 向所有 NameServer 注册自身信息

  2. 定时心跳保活:每30s执行一次注册请求,复用注册接口作为心跳

  3. NameServer 维护路由:NameServer 将 Broker 信息、Topic 队列信息存入内存 RouteInfoManager,

  4. 失效剔除机制:NameServer 每10s扫描一次,120s 无心跳则判定 Broker 下线,剔除路由

2.3 Broker 心跳注册流程图

2.4 核心源码:定时心跳任务

//K4 Broker向NameServer进⾏⼼跳注册 if (!isIsolated && !this.messageStoreConfig.isEnableDLegerCommitLog() && !this.messageStoreConfig.isDuplicationEnable()) { changeSpecialServiceStatus(this.brokerConfig.getBrokerId() == MixAll.MASTER_ID); this.registerBrokerAll(true, false, true); } //启动后定时注册 scheduledFutures.add(this.scheduledExecutorService.scheduleAtFixedRate(new AbstractBrokerRunnable(this.getBrokerIdentity()) { @Override public void run0() { try { if (System.currentTimeMillis() < shouldStartTime) { BrokerController.LOG.info("Register to namesrv after {}", shouldStartTime); return; } if (isIsolated) { BrokerController.LOG.info("Skip register for broker is isolated"); return; } BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()); } catch (Throwable e) { BrokerController.LOG.error("registerBrokerAll Exception", e); } } }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS));

2.5 整体运行流程


三、Producer 消息发送源码完整流程

3.1Producer的核⼼启动流程

所有Producer的启动过程,最终都会调用到DefaultMQProducerlmpl#start方法。在start方法中的通过一个mQClientFactory对象,启动生产者的一大堆重要服务。
这个mQClientFactory是最为重要的一个对象,负责生产所有的Client,包括Producer和Consumer。这里其实就是一种设计模式,虽然有很多种不同的客户端,但是这些客户端的启动流程最终都是统一的,全是交由mQClientFactory对象来启动。而不同之处在于这些客户端在启动过程中,按照服务端的要求注册不同的信息。例如生产者注册到producerTable,消费者注册到consumerTable,管理控制端注册到adminExtTable

3.2 发送消息的核⼼流程

  1. 发送消息时,会维护一个本地的topicPublishInfoTable缓存,DefaultMQProducer会尽量保证这个缓存数据是最新的。但是,如果NameServer挂了,那么DefaultMQProducer还是会基于这个本地缓存去找Broker。只要能找到Broker,还是可以正常发送消息到Broker的。-可以在生产者示例中,start后打一个断点,然后把NameServer停掉,这时,Producer还是可以发送消息的。
  2. 生产者如何找MessageQueue:默认情况下,生产者是按照轮训的方式,依次轮训各个MessageQueue。但是如果某一次往一个Broker发送请求失败后,下一次就会跳过这个Broker。
//org.apache.rocketmq.client.impl.producer.TopicPublishInfo //QueueFilter是⽤来过滤掉上⼀次失败的Broker的,表示上⼀次向这个Broker发送消息是失败的,这时就尽量不要再往这个Broker发送消息了。 private MessageQueue selectOneMessageQueue(List<MessageQueue> messageQueueList, ThreadLocalIndex sendQueue, QueueFilter ...filter) { if (messageQueueList == null || messageQueueList.isEmpty()) { return null; } if (filter != null && filter.length != 0) { for (int i = 0; i < messageQueueList.size(); i++) { int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size()); MessageQueue mq = messageQueueList.get(index); boolean filterResult = true; for (QueueFilter f: filter) { Preconditions.checkNotNull(f); filterResult &= f.filter(mq); } if (filterResult) { return mq; } } return null; } int index = Math.abs(sendQueue.incrementAndGet() % messageQueueList.size()); return messageQueueList.get(index); }
  1. 如果在发送消息时传了Selector,那么Producer就不会⾛这个负载均衡的逻辑,⽽是会使⽤Selector去寻找⼀个队列。 具体参⻅org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendSelectImpl ⽅法
//K4 Producer顺序消息的发送⽅法 public MessageQueue invokeMessageQueueSelector(Message msg, MessageQueueSelector selector, Object arg, final long timeout) throws MQClientException, RemotingTooMuchRequestException { long beginStartTime = System.currentTimeMillis(); this.makeSureStateOK(); Validators.checkMessage(msg, this.defaultMQProducer); TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { MessageQueue mq = null; try { List<MessageQueue> messageQueueList = mQClientFactory.getMQAdminImpl().parsePublishMessageQueues(topicPublishInfo.getMessageQueueList()); Message userMessage = MessageAccessor.cloneMessage(msg); String userTopic = NamespaceUtil.withoutNamespace(userMessage.getTopic(), mQClientFactory.getClientConfig().getNamespace()); userMessage.setTopic(userTopic); //由selector选择出⽬标mq mq = mQClientFactory.getClientConfig().queueWithNamespace(selector.select(messageQueueList, userMessage, arg)); } catch (Throwable e) { throw new MQClientException("select message queue threw exception.", e); } long costTime = System.currentTimeMillis() - beginStartTime; if (timeout < costTime) { throw new RemotingTooMuchRequestException("sendSelectImpl call timeout"); } if (mq != null) { return mq; } else { throw new MQClientException("select message queue return null.", null); } } validateNameServerSetting(); throw new MQClientException("No route info for this topic, " + msg.getTopic(), null); }

四、Consumer 消息拉取源码完整流程

4.1 核心特点

RocketMQ 消费者为主动 Pull 拉取模式,区别于推模式,流量完全可控、不会压垮消费者,稳定性极强。

4.1 启动流程:

Consumer的核⼼启动过程和Producer是⼀样的, 最终都是通过mQClientFactory对象启动。不过之间添加了⼀些注册信息。整体的启动过程如下:

4.3 ⼴播模式与集群模式的Offset处理

在DefaultMQPushConsumerImpl的start⽅法中,启动了⾮常多的核⼼服务。 ⽐如,对于⼴播模式与集群模式的Offset处理

if (this.defaultMQPushConsumer.getOffsetStore() != null) { this.offsetStore = this.defaultMQPushConsumer.getOffsetStore(); } else { switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; case CLUSTERING: this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; default: break; } this.defaultMQPushConsumer.setOffsetStore(this.offsetStore); } this.offsetStore.load();
可以看到,⼴播模式是使⽤LocalFileOffsetStore,在Consumer本地保存Offset,⽽集群模式是使⽤RemoteBrokerOffsetStore,在Broker端远程保存offset。 ⽽这两种Offset的存储⽅式,最终都是通过维护本地的offsetTable缓存来管理Offset。

4.4Consumer与MessageQueue建⽴绑定关系

在 Consumer 的 start 方法中,有一处关键逻辑是给rebalanceImpl设置队列分配策略AllocateMessageQueueStrategy,该策略用于完成 Consumer 和 MessageQueue 的分配绑定。

相关源码:

this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel()); //Consumer负载均衡策略 this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());

AllocateMessageQueueStrategy负责建立 Consumer 与 MessageQueue 的对应关系:Topic 的队列数量、同一个消费组下 Consumer 实例数量没有变动时,单个 Consumer 固定消费分配到自身的一个或多个 MessageQueue,同组其他 Consumer 不会抢占该队列

AllocateMessageQueueStrategy 的 7 个实现类

AllocateMessageQueueStrategy是队列分配顶层接口,RocketMQ 原生提供 7 种策略实现:

  1. AbstractAllocateMessageQueueStrategy(抽象父类)
  2. AllocateMachineRoomNearby
  3. AllocateMessageQueueAveragely
  4. AllocateMessageQueueAveragelyByCircle
  5. AllocateMessageQueueByConfig
  6. AllocateMessageQueueByMachineRoom
  7. AllocateMessageQueueConsistentHash

思考一下:为什么一个 MessageQueue 只能被同一个消费组中的一个 Consumer 消费

Broker 按照ConsumerGroup+MessageQueue维度管理队列的 Offset 消费位点。 如果同一个队列被同组多个 Consumer 实例消费,不同消费者的消息处理进度不一致,会造成队列的 Offset 错乱,因此 RocketMQ 做了约束:单个 MessageQueue 在同一个消费组内,仅能由一个 Consumer 实例负责消费。

4.5 顺序消费与并发消费启动逻辑

在 Consumer 的start()方法内,会根据注册的监听器类型,创建不同的消息拉取服务ConsumerMessageService,以此区分顺序消费并发消费

// 判断是否为顺序消费监听器 if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {//顺序消费监听器 this.consumeOrderly = true; this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); //POPTODO reuse Executor:POP消费模式,本篇暂不关注 this.consumeMessagePopService = new ConsumeMessagePopOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {//并发消费监听器 this.consumeOrderly = false; this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); //POPTODO reuse Executor:POP消费模式,本篇暂不关注 this.consumeMessagePopService = new ConsumeMessagePopConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); } //启动消息消费服务 this.consumeMessageService.start(); //POPTODO //this.consumeMessagePopService.start();

核心规则

  1. 注册MessageListenerOrderly→ 标记consumeOrderly=true,创建顺序消费服务 ConsumeMessageOrderlyService
  2. 注册MessageListenerConcurrently→ 标记consumeOrderly=false,创建并发消费服务 ConsumeMessageConcurrentlyService
  3. 代码预留POP新模式实现(ConsumeMessagePopOrderlyService/ConsumeMessagePopConcurrentlyService),源码 TODO 标记,当前暂不启用。

消费者通过registerMessageListener注册的回调监听,最终会被封装为对应ConsumerMessageService的实现类。


二、POP/PULL 消息拉取入口:PullMessageService#run

PullMessageService随客户端一同启动,run()为消息拉取主循环方法,区分PULL、POP两种拉取模式:

@Override public void run() { logger.info(this.getServiceName() + " service started"); while (!this.isStopped()) { try { MessageRequest messageRequest = this.messageRequestQueue.take(); if (messageRequest.getMessageRequestMode() == MessageRequestMode.POP) { this.popMessage((PopRequest) messageRequest); } else { this.pullMessage((PullRequest) messageRequest); } } catch (InterruptedException ignored) { } catch (Exception e) { logger.error("Pull Message Service Run Method exception", e); } } logger.info(this.getServiceName() + " service end"); }

代码说明

  1. 死循环阻塞从messageRequestQueue队列获取拉取任务;
  2. 根据枚举MessageRequestMode区分:
    • POP:执行popMessage()(新增 POP 工作模式,TODO 暂不深究)
    • PULL:执行pullMessage()(传统主动拉取消息逻辑)
  3. 异常捕获只打印日志,不终止循环,保证消息拉取服务持续运行。

4.6 PullCallback 与顺序 / 并发消费提交逻辑

消费者拉取消息后的逻辑流转会进入DefaultMQPushConsumerImpl#pullCallback回调对象:

Consumer 每从 Broker 拉取到一批消息,就通过该回调提交消费请求,由此印证:顺序消费仅在异步推送 (Push) 模式生效,同步拉取模式无法实现顺序消费,原因是同步拉取场景下 pullCallback 不会被传递执行;但拉模式可由业务代码手动指定消费队列,实现自定义顺序消费。

PullCallback 核心源码

PullCallback pullCallback = new PullCallback() { @Override public void onSuccess(PullResult pullResult) { if (pullResult != null) { switch (pullResult.getPullStatus()) { case FOUND: // 向消费服务提交消费任务 DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume); break; // ...其他状态 } } } }

拉取到消息状态为FOUND时,调用consumeMessageService.submitConsumeRequest()提交消费任务,生成ConsumeRequest交由消费线程处理。


并发消费:ConsumeMessageConcurrentlyService#submitConsumeRequest

源码逻辑

@Override public void submitConsumeRequest( final List<MessageExt> msgs, final ProcessQueue processQueue, final MessageQueue messageQueue, final boolean dispatchToConsume) { final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize(); // 批量消息小于等于单次消费上限,整批提交 if (msgs.size() <= consumeBatchSize) { ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue); try { this.consumeExecutor.submit(consumeRequest); } catch (RejectedExecutionException e) { this.submitConsumeRequestLater(consumeRequest); } } else { // 消息量大时,按批次拆分,循环提交多个消费任务 for (int total = 0; total < msgs.size(); ) { List<MessageExt> msgThis = new ArrayList<>(); for (int i = 0; i < consumeBatchSize; i++, total++) { if (total < msgs.size()) { msgThis.add(msgs.get(total)); } else { break; } } ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue); try { this.consumeExecutor.submit(consumeRequest); } catch (RejectedExecutionException e) { for (; total < msgs.size(); total++) { msgThis.add(msgs.get(total)); } this.submitConsumeRequestLater(consumeRequest); } } } }

并发消费特点📌

  1. 仅控制单次批量消费消息条数consumeBatchSize),不锁定 MessageQueue
  2. 线程池consumeExecutor多线程并行执行消费任务,同一个队列的消息会被多个线程并发处理,天然无法保证消息有序。

顺序消费:ConsumeMessageOrderlyService 核心规则

和并发消费最大区别:锁定单个 MessageQueue,当前队列全部消息处理完成后,才会拉取消费下一个队列消息,同一个队列消息单线程串行消费,以此实现消息顺序性。


五、客户端负载均衡机制(Producer + Consumer)

RocketMQ 负载均衡分为发送负载均衡(Producer)消费负载均衡(Consumer Rebalance),两者完全独立,是集群高可用、高吞吐的核心保障。

5.1 Producer 发送负载均衡

核心目的

将消息均匀分散到所有队列、所有 Broker 节点,最大化集群吞吐量,避免单队列热点瓶颈。

默认策略:轮询 RoundRobin
  • 基于原子计数器自增,对队列数量取模

  • 消息均匀打散到所有队列

  • 无状态、高性能、无需协调

5.2 Consumer 消费负载均衡(Rebalance重平衡)

核心目的

同一消费组内,将 Topic 的所有队列平均分配给组内所有消费者实例,保证消费均匀、不重复、不遗漏。

触发时机(默认20s一次)
  • 服务启动、服务上下线、集群扩容缩容

  • Topic 队列数量变更

默认策略:平均分配 AllocateMessageQueueAveragely
  • 队列总数、消费者总数做平均计算

  • 尽量保证每个消费者分配队列数量一致

  • 保证有序消息不冲突、消费负载均衡

5.3 消费者重平衡流程图

5.4 核心源码:消费者平均分配算法

public List<MessageQueue> allocate( String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { int currentIndex = cidAll.indexOf(currentCID); int mod = mqAll.size() % cidAll.size(); // 计算当前消费者需要分配的队列数量 int avgSize = mqAll.size() / cidAll.size() + (mod > currentIndex ? 1 : 0); int start = currentIndex * avgSize; return mqAll.subList(start, start + avgSize); }

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询