RocketMQ一些事情

家电修理 2023-07-16 19:17www.caominkang.com电器维修

1、RocketMQ如何保证消息队列按照顺序执行
原因是因为发送消息的时候,消息发送默认是会采用轮询的方式发送到不通的queue(分区),而消费端消费的时候,是会分配到多个queue的,多个queue是拉取提交消费
如图所示


 

实现步骤
如果控制发送只能依次发送到同一个queue上,然后消费时候一个队列只会被一个消费者消费的原则,也只从这个queue上拉取,就保证了顺序性
如果消费端也不控制,也会导致不按顺序执行

消费端处理问题--串行执行策略
RokcetMQ的完成顺序性主要是由3把琐来实现的。下图是RocketMQ顺序消费的工作原理

1、消费端在启动时会进行队列负载机制,遵循一个消费者可以分配多个队列,但一个队列只会被一个消费者消费的原则。

2、消费者根据分配的队列,向 Broker 申请琐,如果申请到琐,则拉取消息,否则放弃消息拉取,等到下一个队列负载周期(20s)再试。

3、拉取到消息后会在消费端的线程池中进行消费,但消费的时候,会对消费队列进行加锁,即同一个消费队列中的多条消息会串行执行。

4、在消费的过程中,会对处理队列(ProessQueue)进行加锁,保证处理中的消息消费完成,发生队列负载后,其他消费者才能继续消费。

2、Rocketmq高可用搭建
要保证高可用,技术方案如图所示


RocketMQ是通过broker主从机制来实现高可用的。相同broker名称,不同brokerid的机器组成一个broker组,brokerId=0表明这个broker是master,brokerId>0表明这个broker是slave。

Master角色的 Broker 支持读和写,Slave 角色的 Broker 仅支持读,也就是Producer只能和Master角色的Broker连接写入消息;Consumer可以连接Master 角色的 Broker,也可以连接Slave角色的Broker 来读取消息。
 
(1)、集群部署模式多台NameServer+多台主备的broker(Master Broker和Slave Broker)+主从同步复制+异步刷盘

 

(2)、同步刷盘、异步刷盘
RocketMQ的消息是存储到磁盘上的,这样既能保证断电后恢复,又可以让存储的消息量超出内存的限制。
RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ的时候,有两种写磁盘方式

  1)异步刷盘方式在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入 

           优点性能高

           缺点Master宕机,磁盘损坏的情况下,会丢失少量的消息, 导致MQ的消息状态和生产者/消费者的消息状态不一致

  2)同步刷盘方式在返回应用写成功状态前,消息已经被写入磁盘。具体流程是,消息写入内存的PAGECACHE后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,给应用返回消息写成功的状态。

          优点可以保持MQ的消息状态和生产者/消费者的消息状态一致

          缺点性能比异步的低

刷盘消息存储重要部件
CommitLog存储消息的元数据
ConsumerQueue存储消息在CommitLog的索引
IndexFile提供了一种通过key或者时间区间来查询消息的方法

 

(3)、主从复制(同步)
如果一个broker有master和slave时,就需要将master上的消息复制到slave上,复制的方式有两种方式

同步复制master和slave均写成功,才返回客户端成功。maste挂了以后可以保证数据不丢失,同步复制会增加数据写入延迟,降低吞吐量
异步复制master写成功,返回客户端成功。拥有较低的延迟和较高的吞吐量,当master出现故障后,有可能造成数据丢失

主从同步期间读取数据是如何拉取?---拉去消息过程
Master负责接收数据之后,再将数据同步给Slave,Master和Slave都会向NameServer注册路由信息,每隔30秒发送一次心跳。RocketMQ主从架构数据同步采用的是Pull模式,Slave不停的发送请求到Master上去拉取消息
与其他传统的主从架构不一样的是,RocketMQ的主从架构并不是纯粹的读写分离。写的话还是依靠Master,而读的话,既可能是从Master读,也可能是从Slave读,这取决于Master

读消息的时候,第一次是从Master上去读的,之后是去Master还是Slave上读消息,是由Master根据自身的负载情况和Slave的数据同步情况来向消费者建议。如果Master的写负载已经很高的话,那么就会建议消费者下次拉取消息的话就去Slave上拉取;如果Slave数据同步较慢,还没有完全同步Master的消息的话,那么下一次还是会来Master上拉取消息


(4)、Dledger技术技术实现故障切换----防止脑裂
当Master节点宕机之后,会对Slave节点进行选举然后选出新的Master继续提供服务。在RocketMQ 4.5版本之前,还不支持自动故障切换,需要手工调整才能完成
在RocketMQ 4.5版本之后,引入了Dledger技术,是基于Raft协议实现的(如果不清楚Raft协议可以参考一下这个链接)。DLedger引入之后,可以让一个Master对应多个Slave,也就是存在多个副本,一旦Master宕机了,多个Slave之间就会通过DLedger技术和Raft协议算法进行选举,选出来的Slave节点就作为新的Master继续提供服务。

那么DLedger技术的原理是什么呢?
其实DLedger只做一件事情,就是CommitLog,DLedger可以让我们忽略其他的东西,直接关注CommitLog即可。CommitLog在RocketMQ中就是持久化的消息,所以使用DLedger技术其实就是让DLedger来替代RocketMQ的Broker管理CommitLog,然后Broker还是可以基于DLedger管理的CommitLog去构建出来机器上的各个ConsumerQueue磁盘文件

绕来绕去可能有点晕,来理一下里面的因果关系。DLedger技术可以基于Raft协议来进行选主,将RocketMQ的CommitLog交给DLedger来管理,那么在Master宕机的时候,DLedger就可以自己进行选举,然后把选举的结果交给RocketMQ的Broker,下次通过NameServer获取路由信息的时候就可以直接找到新的Master,从而完成自动故障切换

DLedger的选举的机制到底是什么样的
DLedger是基于Raft协议来进行选主的,大致流程为
当Leader宕机,我们假设此时有3个Folloer,分别为Broker1、Broker2、Broker3,此时他们都没有接收到其他人的投票,所以此时都会投给自己,这一轮无法选出新的Leader,接下来所有的Folloer随机休眠一段时间,比如依次休眠1s、2s、3s,优先苏醒的Broker1还会投票给自己,然后将投票信息发送给其他的节点,此时Broker2苏醒,接收到了Broker1的投票信息,就会尊重他的决定,也给Broker1投票,接着Broker3苏醒之后,也是一样的给Broker1投票,此时Broker1就获得了大多数节点的支持,当有一个节点获得了(N / 2) + 1个,也就是大多数节点的支持之后,就能成功当选
上面的那个步骤,反复几次之后,一定是可以选出新的Leader的

来说说DLedger如何基于Raft协议进行多副本同步
DLedger多副本数据同步,Leader Broker收到一条消息,通过DledgerServer组件发送一个unmited消息给Follo Broker的DledgerServer,Folloer接收到消息之后发送一个ack确认消息给Leader,多数Folloer都确认之后,Leader再发送一个mited消息到Folloer上,Folloer将接收到的消息置为mited,此时就完成了数据同步
 

DLedger做的三件事情
1.接管broker的CommitLog消息存储
2.从集群中选举出master节点
3.完成master节点往slave节点的消息同步
 

(5)、消息重试和死信队列
发送端重试
producer向broker发送消息后,没有收到broker的ack时,rocketmq会自动重试。重试的最大次数和发送超时时间都可以设置。如设置producer3秒内没有发送成功,则重试,重试的最大次数为3

消费端重试
顺序消息的重试
对于顺序消息,当Consumer消费消息失败后,RocketMQ会不断进行消息重试,此时后续消息会被阻塞。所以当使用顺序消息的时候,监控一定要做好,避免后续消息被阻塞

无序消息的重试
当消费模式为集群模式时,Broker才会自动进行重试,对于广播消息是不会进行重试的
当consumer消费消息后返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS表明消费消息成功,不会进行重试
当consumer符合如下三种场景之一时,会对消息进行重试

返回ConsumeConcurrentlyStatus.RECONSUME_LATER
返回null
抛出抛出异常
RocketMQ默认每条消息会被重试16次,超过16次则不再重试,会将消息放到死信队列,

每次重试的时间间隔如下


重试队列和死信队列
当消息消费失败,会被发送到重试队列
当消息消费失败,并达到最大重试次数,rocketmq并不会将消息丢弃,而是将消息发送到死信队列

死信队列有如下特点
里面存的是不能被正常消费的消息
有效期与正常消息相同,都是3天,3天后会被删除
重试队列的命名为 %RETRY%消费组名称 死信队列的命名为 %DLQ%消费组名称

一个死信队列包含了一个group id产生的所有消息,不管当前消息处于哪个ic。重试队列和死信队列只有在需要的时候才会被创建出来

消费模式集群模式和广播模式的区别
(1)集群模式同一个consumerGroupName下的多个consumer平摊消息队列中的消息,例如三个消费者处于同一个group下,且订阅了同一个ic,加入生产者往消息队列中放入了这个ic的6条消息,那么消费者消费消息的总和为6条,消费完的消息不能被其他实例所消费
(2)广播模式指的是consumer属于同一个ConsumerGroup,消息也会被ConsumerGroup中的每个Consumer都消费一次,广播消费中ConsumerGroup概念可以认为在消息划分方面无意义
 

集群工作流程

  • 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
  • Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
  • 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
  • Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
  • Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。


3、RocketMQ 消息丢失场景分析及如何解决?

这三种场景都可能会产生消息的丢失,如下图所示

1、场景1中生产者将消息发送给Rocket MQ的时候,如果出现了网络抖动或者通信异常等问题,消息就有可能会丢失
2、场景2中消息需要持久化到磁盘中,这时会有两种情况导致消息丢失

  • RocketMQ为了减少磁盘的IO,会先将消息写入到os cache中,而不是直接写入到磁盘中,消费者从os cache中获取消息类似于直接从内存中获取消息,速度更快,过一段时间会由os线程异步的将消息刷入磁盘中,此时才算真正完成了消息的持久化。在这个过程中,如果消息还没有完成异步刷盘,RocketMQ中的Broker宕机的话,就会导致消息丢失
  • 如果消息已经被刷入了磁盘中,数据没有做任何备份,一旦磁盘损坏,那么消息也会丢失

3、消费者成功从RocketMQ中获取到了消息,还没有将消息完全消费完的时候,就通知RocketMQ我已经将消息消费了,然后消费者宕机,RocketMQ认为消费者已经成功消费了数据,所以数据依旧丢失了。那么如何保证消息的零丢失呢?

解决方案
1、场景1中保证消息不丢失的方案是使用RocketMQ自带的事务机制来发送消息,大致流程为

  • 生产者发送half消息到RocketMQ中,此时消费者是无法消费half消息的,若half消息就发送失败了,则执行相应的回滚逻辑
  • half消息发送成功之后,且RocketMQ返回成功响应,则执行生产者的核心链路
  • 如果生产者自己的核心链路执行失败,则回滚,并通知RocketMQ删除half消息
  • 如果生产者的核心链路执行成功,则通知RocketMQ mit half消息,让消费者可以消费这条数据

half消息指的是不可消费消息

2、场景2中要保证消息不丢失,大致流程为
需要将os cache的异步刷盘策略改为同步刷盘,这一步需要修改Broker的配置文件,将flushDiskType改为SYNC_FLUSH同步刷盘策略,默认的是ASYNC_FLUSH异步刷盘。

一旦同步刷盘返回成功,那么就一定保证消息已经持久化到磁盘中了;为了保证磁盘损坏不会丢失数据,我们需要对RocketMQ采用主从机构,集群部署,Leader中的数据在多个Folloer中都存有备份,防止单点故障。3、在场景3中,消息到达了消费者,RocketMQ在代码中就能保证消息不会丢失

3、场景3中要保证消息不丢失,大致流程为
第一种情况如果消费者还没有返回CONSUME_SUCCESS时就宕机了,那么RocketMQ就会认为你这个消费者节点挂掉了,会自动故障转移,将消息交给消费者组的其他消费者去消费这个消息,保证消息不会丢失为了保证消息不会丢失

第二种情况就有可能出现消息还没有被消费完,消费者告诉RocketMQ消息已经被消费了,返回CONSUME_SUCCESS,结果消费端业务层宕机丢失消息的情况
当业务中开了个线程导致主业务完成,开启的线程导致子任务失败,可以根据情况做补偿机制


 

4、延迟队列实现原理?
Broker端内置延迟消息处理能力,核心实现思路都是一样将延迟消息通过一个临时存储进行暂存,到期后才投递到目标Topic中。如下图所示
 

 

 步骤说明如下

  1. producer要将一个延迟消息发送到某个Topic中
  2. Broker判断这是一个延迟消息后,将其通过临时存储进行暂存。
  3. Broker内部通过一个延迟服务(delay service)检查消息是否到期,将到期的消息投递到目标Topic中。这个的延迟服务名字为delay service,不同消息中间件的延迟服务模块名称可能不同。
  4. 消费者消费目标ic中的延迟投递的消息



5、消费消息是push还是pull?

RocketMQ没有真正意义的push,都是pull,虽然有push类,但实际底层实现采用的是长轮询机制,即拉取方式

broker端属性 longPollingEnable 标记是否开启长轮询。默认开启

为什么要主动拉取消息而不使用事件监听方式?

事件驱动方式是建立好长连接,由事件(发送数据)的方式来实时推送。

如果broker主动推送消息的话有可能push速度快,消费速度慢的情况,那么就会造成消息在consumer端堆积过多,又不能被其他consumer消费的情况。而pull的方式可以根据当前自身情况来pull,不会造成过多的压力而造成瓶颈。所以采取了pull的方式。

Copyright © 2016-2025 www.caominkang.com 曹敏电脑维修网 版权所有 Power by