RokcetMQ 现在市面上功能都很完善的一款分布式,队列模型的消息中间件
首先来介绍一下Rocket支持的几个重要的特性:
订阅与发布:
消息的发布是指某个生产者向某个topic发送消息;
消息的订阅是指某个消费者关注了某个topic中带有某些tag的消息,进而从该topic消费数据。
消息顺序:
消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ可以严格的保证消息有序。
顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。
- 全局顺序:对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。
适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景
- 分区顺序:对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。 Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。
适用场景:性能要求高,以 sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。
消息过滤
RocketMQ的消费者可以根据Tag进行消息过滤,也支持自定义属性过滤。
消息过滤目前是在Broker端实现的,优点是减少了对于Consumer无用消息的网络传输,缺点是增加了Broker的负担、而且实现相对复杂。
至少一次:
至少一次(At least Once)指每个消息必须投递一次。Consumer先Pull消息到本地,消费完成后,才向服务器返回ack(确认消息),如果没有消费一定不会ack消息。
回溯消费:
回溯消费是指消费者已经消费成功的消息,由于业务上需求需要重新消费。
要支持此功能,Broker在向Consumer投递成功消息后,消息仍然需要保留。并且重新消费一般是按照时间维度,例如由于消费者系统故障,恢复后需要重新消费1小时前的数据,那么Broker要提供一种机制,可以按照时间维度来回退消费进度。RocketMQ支持按照时间回溯消费,时间维度精确到毫秒。
事务消息
RocketMQ事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败。
RocketMQ的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致。
定时消息
定时消息(延迟队列)是指消息发送到broker后,不会立即被消费,等待特定时间投递给真正的topic。
broker有配置项messageDelayLevel,默认值为“1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”,18个level。可以配置自定义messageDelayLevel,但是不支持配置手动的定时时间,如45s。注意,messageDelayLevel是broker的属性,不属于某个topic。发消息时,设置delayLevel等级即可:msg.setDelayLevel(level)。level有以下三种情况:
- level == 0,消息为非延迟消息
- 1<&#61;level<&#61;maxLevel&#xff0c;消息延迟特定时间&#xff0c;例如level&#61;&#61;1&#xff0c;延迟1s
- level > maxLevel&#xff0c;则level &#61;&#61; maxLevel&#xff0c;例如level&#61;&#61;20&#xff0c;延迟2h
定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中&#xff0c;并根据delayTimeLevel存入特定的queue&#xff0c;queueId &#61; delayTimeLevel – 1&#xff0c;即一个queue只存相同延迟的消息&#xff0c;保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX&#xff0c;将消息写入真实的topic。
需要注意的是&#xff0c;定时消息会在第一次写入和调度写入真实topic时都会计数&#xff0c;因此发送数量、tps都会变高。
业务场景&#xff1a;如订单过期&#xff0c;成功订单重复通知。
消息重试
Consumer消费消息失败后&#xff0c;要提供一种重试机制&#xff0c;令消息再消费一次。Consumer消费消息失败通常可以认为有以下几种情况&#xff1a;
- 由于消息本身的原因&#xff0c;例如加解密失败等。这种错误通常需要跳过这条消息&#xff0c;再消费其它消息&#xff0c;而这条失败的消息即使立刻重试消费&#xff0c;99%也不成功&#xff0c;所以最好提供一种定时重试机制&#xff0c;即过10秒后再重试。
- 由于依赖的下游应用服务不可用&#xff0c;例如db连接不可用&#xff0c;网关通讯异常等。遇到这种错误&#xff0c;即使跳过当前失败的消息&#xff0c;消费其他消息同样也会报错。这种情况建议应用sleep 30s&#xff0c;再消费下一条消息&#xff0c;这样可以减轻Broker重试消息的压力。
RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%&#43;consumerGroup”的重试队列&#xff08;这里需要注意的是&#xff0c;这个Topic的重试队列是针对消费组&#xff0c;而不是针对每个Topic设置的&#xff09;&#xff0c;用于暂时保存因为各种异常而导致Consumer端无法消费的消息。考虑到异常恢复起来需要一些时间&#xff0c;会为重试队列设置多个重试级别&#xff0c;每个重试级别都有与之对应的重新投递延时&#xff0c;重试次数越多投递延时就越大。RocketMQ对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC_XXXX”的延迟队列中&#xff08;请参考定时消息&#xff09;&#xff0c;后台定时任务按照对应的时间进行Delay后重新保存至“%RETRY%&#43;consumerGroup”的重试队列中。
现在让我们现在解释几个基本的名词概念&#xff1a;
消息模型&#xff08;Message Model&#xff09;&#xff1a;
RocketMQ主要由 Producer、Broker、Consumer 三部分组成&#xff0c;其中Producer 负责生产消息&#xff0c;Consumer 负责消费消息&#xff0c;Broker 负责存储消息。
Broker 在实际部署过程中对应一台服务器&#xff0c;每个 Broker 可以存储多个Topic的消息&#xff0c;每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址&#xff0c;每个Topic中的消息地址存储于多个 消息队列&#xff08;Message Queue&#xff09; 中。ConsumerGroup 由多个Consumer 实例构成。
NameServer&#xff1a;
nameserver相当于注册中心或者DNS&#xff0c;或者dubbo中的zookeeper&#xff0c;用于存储路由信息。同时由于nameserver没有频繁的读取以及在集群下即使一台挂掉也不会造成影响&#xff0c;所以稳定性特别高。主要功能为&#xff1a;
- Broker管理&#xff1a;nameserver接受broker的注册信息&#xff0c;同时提供心跳检测&#xff0c;检查broker是否存活
- 路由信息管理&#xff1a;每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息&#xff0c;从而进行消息的投递和消费。
注&#xff1a;NameServer通常也是集群的方式部署&#xff0c;各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息&#xff0c;所以每一个NameServer实例上面都保存一份完整的路由信息&#xff0c;这也就是为什么我上面说的一台挂掉也不会造成影响的原因了。
Broker&#xff1a;
Broker主要负责消息的存储、投递和查询以及服务高可用保证。
Topic&#xff1a;
表示一类消息的集合&#xff0c;每个主题包含若干条消息&#xff0c;每条消息只能属于一个主题&#xff0c;是RocketMQ进行消息订阅的基本单位。
topic在RocketMQ 的设计思想里&#xff0c;是作为同一个业务逻辑消息的组织形式&#xff0c;它仅仅是一个逻辑上的概念&#xff0c;而在一个topic下又包含若干个逻辑队列&#xff0c;即消息队列&#xff08;每一个生产者或者消费者只能同时连接一个队列&#xff09;&#xff0c;消息内容实际是存放在队列中&#xff0c;而队列又存储在broker中。Broker又通过同步刷盘和异步刷盘将信息保存起来。
默认情况下&#xff0c;topic不用手动创建&#xff0c;当producer进行消息发送时&#xff0c;会从nameserver拉取topic的路由信息&#xff0c;如果topic的路由信息不存在&#xff0c;那么会默认拉取broker启动时默认创建好名为“TBW102”的Topic
标签&#xff08;Tag&#xff09;&#xff1a;
为消息设置的标志&#xff0c;用于同一主题&#xff08;Topic&#xff09;下区分不同类型的消息。来自同一业务单元的消息&#xff0c;可以根据不同业务目的在同一主题下设置不同标签。
生产者&#xff08;Producer&#xff09;&#xff1a;
负责生产消息&#xff0c;一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式&#xff0c;同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息&#xff0c;单向发送不需要。
消费者&#xff08;Conumser&#xff09;&#xff1a;
负责消费消息&#xff0c;一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式&#xff1a;拉取式消费&#xff08;pull&#xff09;、推动式消费&#xff08;push&#xff09;。
拉取式消费&#xff1b;Consumer消费的一种类型&#xff0c;应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息&#xff0c;应用就会启动消费过程。
推动式消费&#xff1b;Consumer消费的一种类型&#xff0c;该模式下Broker收到数据后会主动推送给消费端&#xff0c;该消费模式一般实时性较高。&#xff08;但实际上&#xff0c;push也只是定时的去pull消息 造成push的假象&#xff09;
生产者组&#xff1a;
同一类Producer的集合&#xff0c;这类Producer发送同一类消息且发送逻辑一致。如果发送的是事物消息且原始生产者在发送之后崩溃&#xff0c;则Broker服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。
消费者组&#xff1a;
同一类Consumer的集合&#xff0c;这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面&#xff0c;实现负载均衡和容错的目标变得非常容易。要注意的是&#xff0c;消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式&#xff1a;集群消费&#xff08;Clustering&#xff09;和广播消费&#xff08;Broadcasting&#xff09;。
集群消费&#xff1a;集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。
广播消费&#xff1b;广播消费模式下&#xff0c;相同Consumer Group的每个Consumer实例都接收全量的消息。
未完待续。。。。。