Bootstrap

RocketMQ学习笔记

根据最近对RocketMQ的学习,记录了对应的学习笔记和知识点。

学习过程应该从以下几个方面入手:

下面的学习笔记只包括的RocketMQ本身基础概念、关键设计相关的知识点,其他相关的知识待后续整理。

1. 基础概念和特性

  • 消息模型(Message Model)

        RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。

        Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。

        Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。

  • RocketMQ中的Queue的概念和Kafka中的分区概念是一致的,有些文档为了通用,提到的分区的概念,等同于RocketMQ中的Queue。

  • 消息顺序性

        顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一个分区的消息被顺序消费即可。

        例如一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。

        RocketMQ可以保证分区里的消息是顺序性的,如果需要全Topic顺序性,可以用一些折中的方案,例如把Topic只设置成1个Queue(默认是4个)。

  • 消息过滤

        支持按照Tag进行过滤,也支持自定义属性过滤,消息过滤是在Broker端实现。优点是减少了对于Consumer无用消息的网络传输,缺点是增加了Broker的负担、而且实现相对复杂。

  • 消息可靠性

        RocketMQ支持消息的高可靠,影响消息可靠性的几种情况: 

        1) Broker非正常关闭 

        2) Broker异常Crash

        3) OS Crash 

        4) 机器掉电,但是能立即恢复供电情况 

        5) 机器无法开机(可能是cpu、主板、内存等关键设备损坏) 

        6) 磁盘设备损坏

        1)、2)、3)、4) 四种情况都属于硬件资源可立即恢复情况,RocketMQ在这四种情况下能保证消息不丢,或者丢失少量数据(依赖刷盘方式是同步还是异步)。

        5)、6)属于单点故障,且无法恢复,一旦发生,在此单点上的消息全部丢失。RocketMQ在这两种情况下,通过异步复制,可保证99%的消息不丢,但是仍然会有极少量的消息可能丢失。通过同步双写技术可以完全避免单点,同步双写势必会影响性能,适合对消息可靠性要求极高的场合,例如与Money相关的应用。注:RocketMQ从3.0版本开始支持同步双写

        所以,最保证消息可靠的设置是:单机同步刷盘 + 集群同步双写。但是势必要牺牲性能,需要根据实际业务场景进行权衡。

  • 消息消费方式

  •  支持至少一次(At least Once),和 OneWay消费方式。在至少一次(At least Once)模式下,需要做好幂等性设计。

  • 支持回溯消费。例如由于Consumer系统故障,恢复后需要重新消费1小时前的数据,那么Broker要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ支持按照时间回溯消费,时间维度精确到毫秒。

  •  支持事务消息,这是RocketMQ的一大亮点。是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。但是这种方式带来的缺点就是业务入侵性比较强。需要编写处理成功以及失败后的业务逻辑代码。

  •  支持延迟消息(定时消息),broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。可以配置自定义messageDelayLevel。

注意,messageDelayLevel是broker的属性,不属于某个topic。

     定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并根据delayTimeLevel存入特定的queue,queueId = delayTimeLevel – 1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费 SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。

需要注意的是,定时消息会在第一次写入和调度写入真实topic时都会计数,因此发送数量、tps都会变高。

  • 消息重试和消息重投

        Consumer消费消息失败后,要提供一种重试机制,令消息再消费一次。

通常出现这样的场景有两种:1. 消息本身的原因,例如反序列化失败、消息本身处理失败等;2. 依赖的下游服务不可用,例如db连接断开等。

RocketMQ的处理方式为:每个消费组都设置一个Topic名称为%RETRY%+consumerGroup”的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而导致Consumer端无法消费的消息。

考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%+consumerGroup”的重试队列中。

 

消息重投是发生在Producer端,同步消息会重投,异步消息会重试,对于Oneway发送方式的消息没有任何保证。重投会更换Broker,异步重试只在当前Broker上进行。

具体如下:

  • retryTimesWhenSendFailed:同步发送失败重投次数,默认为2,因此生产者会最多尝试发送retryTimesWhenSendFailed + 1次。不会选择上次失败的broker,尝试向其他broker发送,最大程度保证消息不丢。超过重投次数,抛出异常,由客户端保证消息不丢。当出现RemotingException、MQClientException和部分MQBrokerException时会重投。

  • retryTimesWhenSendAsyncFailed:异步发送失败重试次数,异步重试不会选择其他broker,仅在同一个broker上做重试,不保证消息不丢。

  • retryAnotherBrokerWhenNotStoreOK:消息刷盘(主或备)超时或slave不可用(返回状态非SEND_OK),是否尝试发送到其他broker,默认false。十分重要消息可以开启。

    消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复,消息重复在RocketMQ中是无法避免的问题。消息重复在一般情况下不会发生,当出现消息量大、网络抖动,消息重复就会是大概率事件。另外,生产者主动重发、consumer负载变化也会导致重复消息。

  • 流量控制

    分为生产者流量控制和消费者流量控制,生产者流控是Broker达到了处理瓶颈,消费者流控是Consumer达到了处理瓶颈。

    生产者流控:

  • commitLog文件被锁时间超过osPageCacheBusyTimeOutMills时,参数默认为1000ms,返回流控。

  • 如果开启transientStorePoolEnable == true,且broker为异步刷盘的主机,且transientStorePool中资源不足,拒绝当前send请求,返回流控。

  • broker每隔10ms检查send请求队列头部请求的等待时间,如果超过waitTimeMillsInSendQueue,默认200ms,拒绝当前send请求,返回流控。

  •  broker通过拒绝send 请求方式实现流量控制。

        注意,生产者流控,不会尝试消息重投。

    

    消费者流控:

  • 消费者本地缓存消息数超过pullThresholdForQueue时,默认1000。

  • 消费者本地缓存消息大小超过pullThresholdSizeForQueue时,默认100MB。

  • 消费者本地缓存消息跨度超过consumeConcurrentlyMaxSpan时,默认2000。

消费者流控的结果是降低拉取频率。

  • 死信队列

  死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。

  RocketMQ将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。在RocketMQ中,可以通过使用console控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。

2. 关键技术架构

2.1 技术架构

RocketMQ架构上主要分为四部分,如上图所示:

  • Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。

  • Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。

  • NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。

  • BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块:

  • Remoting Module:整个Broker的实体,负责处理来自clients端的请求。

  • Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息

  • Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。

  • HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。

  • Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。

2.2 部署架构

  • NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

  • Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。 注意:当前RocketMQ版本在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。

  • Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。

  • Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取。

结合部署架构图,描述集群工作流程:

RocketMQ支持如下几种部署方式:

  • 主从模式

  • Master宕机,Broker可读不可写

  • 集群搭建方式

  • 单Master模式

  • 多Master模式

  • 多Master多Slave模式-异步复制

  • 多Master多Slave模式-同步双写

2.3 消息存储设计
  • 消息存储整体架构:

消息存储是RocketMQ中最为复杂和最为重要的一部分,主要概念有:CommitLog、ConsumeQueue、IndexFile(索引文件)。

(1) CommitLog:消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G ,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;

(2) ConsumeQueue:消息消费队列,引入的目的主要是提高消息消费的性能,由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件中根据topic检索消息是非常低效的。Consumer即可根据ConsumeQueue来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样consumequeue文件采取定长设计,每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M;

(3) IndexFile:IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。Index文件的存储位置是:$HOME \store\index${fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引。

  • 消息刷盘

(1) 同步刷盘:如上图所示,只有在消息真正持久化至磁盘后RocketMQ的Broker端才会真正返回给Producer端一个成功的ACK响应。同步刷盘对MQ消息可靠性来说是一种不错的保障,但是性能上会有较大影响,一般适用于金融业务应用该模式较多。

(2) 异步刷盘:能够充分利用OS的PageCache的优势,只要消息写入PageCache即可将成功的ACK返回给Producer端。消息刷盘采用后台异步线程提交的方式进行,降低了读写延迟,提高了MQ的性能和吞吐量。

注意:这里说的刷盘,都是单Broker上的消息可靠性保障,就像之前概念里提到的,如果整个服务器或者磁盘坏掉,还是不能保证高可用。所以为了保险,还需要搭建集群,通过Master-Slave的方式进行异步写入或者同步双写。

2.4 消息过滤

RocketMQ分布式消息队列的消息过滤方式有别于其它MQ中间件,是在Consumer端订阅消息时再做消息过滤的。RocketMQ这么做是在于其Producer端写入消息和Consumer端订阅消息采用分离存储的机制来实现的,Consumer端订阅消息是需要通过ConsumeQueue这个消息消费的逻辑队列拿到一个索引,然后再从CommitLog里面读取真正的消息实体内容,所以说到底也是还绕不开其存储结构。其ConsumeQueue的存储结构如下,可以看到其中有8个字节存储的Message Tag的哈希值,基于Tag的消息过滤正式基于这个字段值的。

主要支持如下2种的过滤方式:

2.5 负载均衡

RocketMQ的负载均衡都是在Client端完成的,分为Producer端发送消息时候的负载均衡和Consumer端订阅消息时的负载均衡。

  • Producer端负载均衡:

  • 定时获取Queue信息

  • 负载均衡算法:随机递增取模

  • 容错机制:故障延迟

每个实例在发消息的时候,默认会-轮询(调度方式)所有的message queue发送,以达到让消息平均落在不同的queue上。而由于queue可以散落在不同的broker,所以消息就发送到不同的broker下。所以选择broker是负载均衡的关键,基于方法selectOneMessageQueue(),这个方法会随机选择一个broker。跟根据selectOneMessageQueue()方法的实现内容,来选择一个队列(MessageQueue)进行发送消息。

有一个关键变量值sendLatencyFaultEnable(默认是false),如果关闭,采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息;如果开启,在随机递增取模的基础上,再过滤掉not available的Broker代理。

所谓的"latencyFaultTolerance",是指对之前失败的,按一定的时间做退避。例如,如果上次请求的latency超过550Lms,就退避3000Lms;超过1000L,就退避60000L。latencyFaultTolerance机制是实现消息发送高可用的核心关键所在。

private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
  • Consumer端负载均衡:

  • 客户端心跳上报数据

  • 定时Rebalance 20S

  • 获取队列信息

  • 获取消费者信息

  • 排序平均分配

在RocketMQ中,Consumer端的两种消费模式(Push/Pull)都是基于拉模式来获取消息的,而在Push模式只是对pull模式的一种封装,其本质实现为消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又“马不停蹄”的继续向服务器再次尝试拉取消息。如果未拉取到消息,则延迟一下又继续拉取。在两种基于拉模式的消费方式(Push/Pull)中,均需要Consumer端在知道从Broker端的哪一个消息队列—队列中去获取消息。因此,有必要在Consumer端来做负载均衡,即Broker端中多个MessageQueue分配给同一个ConsumerGroup中的哪些Consumer消费。

LongPoll:

  • Consumer发送拉取消息

  • Broker hold住请求,直到有新消息再返回

  • 请求超时,Consumer再次发起请求

  • 请求超时时间默认30S

在集群消费模式下,每条消息只需要投递到订阅这个topic的Consumer Group下的一个实例即可。RocketMQ采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条message queue。

Consumer通过重平衡(Rebalance)的方式进行实例的重新分配(默认值每20秒执行一次)。而每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照queue的数量和实例的数量平均分配queue给每个实例。

常用的负载均衡算法有以下几种:

  • 平均分配策略(默认)(AllocateMessageQueueAveragely)环形分配策略(AllocateMessageQueueAveragelyByCircle)

  • 手动配置分配策略(AllocateMessageQueueByConfig)

  • 机房分配策略(AllocateMessageQueueByMachineRoom)

  • 一致性哈希分配策略(AllocateMessageQueueConsistentHash)

  • 靠近机房策略(AllocateMachineRoomNearby)

2.6事务消息

RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息,如下图所示:

其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。

1.事务消息发送及提交:

(1) 发送消息(half消息)。

(2) 服务端响应消息写入结果。

(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。

(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)

2.补偿流程:

(5) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”

(6) Producer收到回查消息,检查回查消息对应的本地事务的状态

(7) 根据本地事务状态,重新Commit或者Rollback

其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。

2.7 消息查询

RocketMQ支持两种维度的查询:

  • 按照MessageId查询消息,没有业务含义

  • 按照Message Key查询消息,通常是用业务字段定义。

最佳事件:每个消息在业务层面的唯一标识码要设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic、key来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突。

// 订单Id   
String orderId = "20034568923546";   
message.setKeys(orderId);