热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

提升RabbitMQ消费速度的一些实践

RabbitMQ是一个开源的消息中间件,自带管理界面友好、开发语言支持广泛、没有对其它中间件的依赖,而且社区非常活跃,特别适合中小型企业拿来就用。这篇文章主要探讨提升RabbitM

RabbitMQ是一个开源的消息中间件,自带管理界面友好、开发语言支持广泛、没有对其它中间件的依赖,而且社区非常活跃,特别适合中小型企业拿来就用。这篇文章主要探讨提升RabbitMQ消费速度的一些方法和实践,比如增加消费者、提高Prefetch count、多线程处理、批量Ack等。


增加消费者


这个道理比较容易理解,多个人搬砖的速度肯定比一个人要快很多。

不过实际情况中还需要面对一些技术挑战,比如后端处理能力、并发冲突,以及处理顺序。

后端处理能力:比如多个消费者都要操作数据库,那么数据库连接的并发数和读写吞吐量就是后端处理能力,如果达到了数据库的最大处理能力,增加再多的消费者也没有用,甚至会因为数据库拥塞导致整体消费速度的下降。这个问题还存在另一种情况,就是消费者是否真正的发挥了后端服务的处理能力,比如使用Redis时是否采用了多线程、IO复用等方式来进一步提升吞吐量。

并发冲突:比如两个消费者都要去修改用户的积分,单个消费者的做法可能就是取出来、改下字段的值、最后再update到数据库,多个消费者时如果同时取出了相同的数据,还这样处理的话就会出问题了。这时候可能需要修改下SQL语句,直接在SQL语句中修改积分,由数据库写入事务来处理并发冲突;或者搞一个分布式锁,对于具体的某个用户同时只能有一个消费者来处理其积分。

处理顺序:如果消息需要被顺序处理,那么各个消费者之间还需要增加一个同步机制。比如基于GPS定位的电子围栏,在出围栏的某个时段,先产生了围栏内定位消息、然后产生了围栏外定位消息;如果围栏外定位消息先被一个消费者处理,则判定为出围栏,这没有问题;然后围栏内定位消息被另一个消费者处理,则会被判定为入围栏,这个就属于误判了。这时候可能要同步一个已处理定位时间,早于这个时间的定位就抛弃掉;或者同一个设备的定位消息通过某种算法控制只能由某个消费者进行处理。

解决后边两个问题的方法不可避免的要引入多个消费者之间的协商机制,如果这些协商机制设计不好会对处理速度带来很大影响。因此多人搬砖速度快的前提是多个人搬砖时不需要大家频繁的坐下来协商谁搬哪块砖,否则就会浪费很多时间在相互协调上,反而不能提升搬砖的速度。

所以通过增加消费者提升消费速度得以成立的前提是消费者业务并发处理能力要足够,消费者依赖的后端服务处理能力也要足够。这是此种方式的关键点。


提高Prefetch count


消息消费速度主要受到发送消息时间、消费者处理时间、消息Ack时间这几个时间的影响,如果一个消息走完这个流程再发送另一个的话,效率将会非常低。可以让消息在这几个时间内恰当的分配,让消息总是连续不断的被消费者接收处理,就可以提升消费者的消费速度。

根据如上描述,有些消息可能正在被消费者处理,有些可能在等待消费者处理,有的消息可能还在网络传输中,而如果不限制传输的数量,消费者端可能因处理能力补足会堆积大量的消息,首先内存使用将不可控制,其次此时也无法将这些消息再分配给别的消费者。因此才有了Prefetch count,用于控制消息发送给消费者的速度;这个方案需要配合Ack使用,消费者回复消息Ack后,RabbitMQ才会继续发送同等数量的消息到消费者。提高Prefetch count到一个合适的值可以提升消息的消费速度。这个值的设定可能还要实时参考上边提到的三个时间,这有点类似TCP的流控措施。这个值的计算方法请看下文:


RabbitMQ关于吞吐量,延迟和带宽的一些理论


参考文档:https://blog.csdn.net/gbbqrglvir3dyi82/article/details/78663828

多线程处理


多线程处理和增加消费者有异曲同工之妙。多线程处理不需要建立多个到RabbitMQ的连接,它在收到队列消息后将其放入不同的线程中进行处理,这样进程中就会有多个消息同时处理,增加了消费吞吐量,从而提升了消费速度。

来看一个例子:

consumer.Received += (o, e) =>
{
ThreadPool.QueueUserWorkItem(new WaitCallback(ProcessSingleContextMessage), e);
};

 

在这个例子中波斯码将收到的消息放入线程池队列进行处理,注意这里需要配合上一节提到的Prefetch count,设置一个合适的值,消费者就可以同时处理多条消息了。

多线程处理也存在多消费者处理时的问题,只不过在一个进程中处理并发冲突和消息顺序的成本可能更低一些。下边的代码片段展示了一个解决消息顺序处理问题的方案:

// 接收消息存入列表,当接收数量达到prefetchCount/2时就加入处理队列;
// 1/2是考虑了消息从RabbitMQ到消费者的传输时间,不需要等所有的消息都到达了才开始处理。
consumer.Received += (o, e) =>
{
lock(receiveLocker){
basicDeliverEventArgsList.Add(e);
if (basicDeliverEventArgsList.Count >= prefetchCount/2)
{
var deliverEventArgs = basicDeliverEventArgsList.ToArray();
basicDeliverEventArgsList.Clear();
EnProcessQueue(deliverEventArgs);
}
}
};
// 此处省略数据出队列的代码,请自行脑补
....
// 然后这个方法是用来处理消息的,将消息根据数据Key分成若干组,放到多个任务中并行处理;
// 相同数据Key的消息将分配到一个组中,在这个组中数据被顺序处理
private void Process(BasicDeliverEventArgs[] args)
{
if (args.Length <= 0)
{
return;
}
try
{
var tasks = CreateParallelProcessTasksByDataKey(args);
Task.WaitAll(tasks);
}
catch (Exception ex)
{
ToLog("处理任务发生异常", ex);
}
}
// 创建并行处理多条消息的任务
private Task[] CreateParallelProcessTasksByDataKey(BasicDeliverEventArgs[] args)
{
// 根据dataKey进行分组,dataKey可以放到消息的header中进行传输,这里就不给出具体的分组方法了
Dictionary> eDic = GetMessgeGroupByDataKey(args);
// 任务数量
var paralleTaskNum = this.parallelNum;
if (paralleTaskNum > eDic.Count)
{
paralleTaskNum = eDic.Count;
}
// 每个任务处理的消息数量
var perTaskNum = (int)Math.Ceiling(args.Length / (double)paralleTaskNum);
// 任务数组
List tasks = new List();
var taskArgs = new List();
for (int j = eDic.Count - 1; j >= 0; j--)
{
var currentElement = eDic.ElementAt(j);
taskArgs.AddRange(currentElement.Value);
eDic.Remove(currentElement.Key);
if (taskArgs.Count >= perTaskNum || j == 0)
{
// 创建任务,并处理分配的消息
var taskList = taskArgs.Select(d => d).ToList();
taskArgs.Clear();
var task = Task.Factory.StartNew(() =>
{
// 这这里处理分组中的消息
...
});
tasks.Add(task);
}
}
return tasks.ToArray();
}

 

上边这段代码中解决问题的关键就是将消息进行分组,同组内的消息顺序处理,分组间并行处理,既通过多线程提升了消息整体的处理速度,又能支持消息的顺序处理。


批量Ack


这种方式有效的原理是:每条消息分别Ack的情况下,RabbitMQ收到一个Ack才发送一条消息,这中间就会有很多的时间在等待Ack回来,通过批量Ack的方式,减少了很多Ack传输的时间。注意这里隐含的方式是RabbitMQ通过设置的Prefetch count连续向消费者发送多条消息,否则这个批量就没意义了。

下边的代码片段给出其使用方式:

channel.BasicAck(e.DeliveryTag, true);

 

第2个参数为true就是指示采用批量Ack的方式,凡是delivery-­tag比第1个参数小的消息都会被Ack。

这里需要注意:如果消费者在处理某条消息时失败了,业务上又要求不能丢失任何消息,这时就不能对所有的消息进行批量Ack,否则RabbitMQ就不会再次投递这条消息了,这需要根据自己的实际情况进行取舍。解决此问题的一个简单方法是,跟踪所有消息的处理结果,如果全部成功则使用批量Ack,如果部分成功则有两个选择:如果不关注顺序则退化为每个消息发送Ack或Reject的方式;如果关注顺序则本次接收到Prefetch count数量的消息全部nack,否则reject的消息再次投递时顺序就不对了,这时候业务还要做好处理重复数据的逻辑。


总结

通过分析上边的这些方法,在使用RabbitMQ消费时可以遵循这样一个路径:



  1. 启用Prefetch count设置;

  2. 先1个消费者,1次只接收1条,处理完毕后再传输下一条,这样可以避免并发冲突和消息顺序问题;

  3. 如果消费速度不满足要求,则1次接收多条,按接收顺序处理;

  4. 如果消费速度还是不满足要求,则1次接收多条,并行处理;

  5. 如果消费速度还是不满足要求,则启动多个消费者,并行处理。

  6. 如果消费速度还是不满足要求,改需求,或者换别的中间件。

在这个过程中需要始终关注优化消费者及后端程序处理能力,比如优化SQL语句、使用缓存、使用负载均衡等等,加快处理速度就能提升消费速度,而且很多时候就是程序处理太耗时了。

关于重复数据、并发冲突、顺序处理问题的处理:



  • 随时做好处理重复数据的准备,因为不只消费者端可能会触发消息的重复投递,发送端也可能重复发送消息,这个很难避免。

  • 对于并发冲突问题,消费者进程内可以使用锁,跨消费者引入第三方机制来处理,比如使用Redis原子操作、数据库原子操作或者分布式锁。

  • 对于顺序处理问题,最好没有这个需求;在同一个消费者内可以分组处理;在多个消费者时使用队列分组,每个队列绑定不同的Route key,不同Route key代表的消息之间没有顺序关联。波斯码再次提醒还要注意处理失败时的逻辑,避免重新投递消息的顺序问题。



推荐阅读
  • 讨伐Java多线程与高并发——MQ篇
    本文是学习Java多线程与高并发知识时做的笔记。这部分内容比较多,按照内容分为5个部分:多线程基础篇JUC篇同步容器和并发容器篇线程池篇MQ篇本篇 ... [详细]
  • RabbitMQ的消息持久化处理
    1、RabbitMQ的消息持久化处理,消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢——消息持久化。2、auto ... [详细]
  • 前景:当UI一个查询条件为多项选择,或录入多个条件的时候,比如查询所有名称里面包含以下动态条件,需要模糊查询里面每一项时比如是这样一个数组条件:newstring[]{兴业银行, ... [详细]
  • Oracle优化新常态的五大禁止及其性能隐患
    本文介绍了Oracle优化新常态中的五大禁止措施,包括禁止外键、禁止视图、禁止触发器、禁止存储过程和禁止JOB,并分析了这些禁止措施可能带来的性能隐患。文章还讨论了这些禁止措施在C/S架构和B/S架构中的不同应用情况,并提出了解决方案。 ... [详细]
  • Android工程师面试准备及设计模式使用场景
    本文介绍了Android工程师面试准备的经验,包括面试流程和重点准备内容。同时,还介绍了建造者模式的使用场景,以及在Android开发中的具体应用。 ... [详细]
  • 2021最新总结网易/腾讯/CVTE/字节面经分享(附答案解析)
    本文分享作者在2021年面试网易、腾讯、CVTE和字节等大型互联网企业的经历和问题,包括稳定性设计、数据库优化、分布式锁的设计等内容。同时提供了大厂最新面试真题笔记,并附带答案解析。 ... [详细]
  • RabbitMq之发布确认高级部分1.为什么会需要发布确认高级部分?在生产环境中由于一些不明原因,导致rabbitmq重启,在RabbitMQ重启期间生产者消息投递失败,导致消息丢 ... [详细]
  • 本文介绍了一个在线急等问题解决方法,即如何统计数据库中某个字段下的所有数据,并将结果显示在文本框里。作者提到了自己是一个菜鸟,希望能够得到帮助。作者使用的是ACCESS数据库,并且给出了一个例子,希望得到的结果是560。作者还提到自己已经尝试了使用"select sum(字段2) from 表名"的语句,得到的结果是650,但不知道如何得到560。希望能够得到解决方案。 ... [详细]
  • 第四章高阶函数(参数传递、高阶函数、lambda表达式)(python进阶)的讲解和应用
    本文主要讲解了第四章高阶函数(参数传递、高阶函数、lambda表达式)的相关知识,包括函数参数传递机制和赋值机制、引用传递的概念和应用、默认参数的定义和使用等内容。同时介绍了高阶函数和lambda表达式的概念,并给出了一些实例代码进行演示。对于想要进一步提升python编程能力的读者来说,本文将是一个不错的学习资料。 ... [详细]
  • This article discusses the efficiency of using char str[] and char *str and whether there is any reason to prefer one over the other. It explains the difference between the two and provides an example to illustrate their usage. ... [详细]
  • 本文介绍了OpenStack的逻辑概念以及其构成简介,包括了软件开源项目、基础设施资源管理平台、三大核心组件等内容。同时还介绍了Horizon(UI模块)等相关信息。 ... [详细]
  • Android自定义控件绘图篇之Paint函数大汇总
    本文介绍了Android自定义控件绘图篇中的Paint函数大汇总,包括重置画笔、设置颜色、设置透明度、设置样式、设置宽度、设置抗锯齿等功能。通过学习这些函数,可以更好地掌握Paint的用法。 ... [详细]
  • MySQL数据库锁机制及其应用(数据库锁的概念)
    本文介绍了MySQL数据库锁机制及其应用。数据库锁是计算机协调多个进程或线程并发访问某一资源的机制,在数据库中,数据是一种供许多用户共享的资源,如何保证数据并发访问的一致性和有效性是数据库必须解决的问题。MySQL的锁机制相对简单,不同的存储引擎支持不同的锁机制,主要包括表级锁、行级锁和页面锁。本文详细介绍了MySQL表级锁的锁模式和特点,以及行级锁和页面锁的特点和应用场景。同时还讨论了锁冲突对数据库并发访问性能的影响。 ... [详细]
  • 云原生应用最佳开发实践之十二原则(12factor)
    目录简介一、基准代码二、依赖三、配置四、后端配置五、构建、发布、运行六、进程七、端口绑定八、并发九、易处理十、开发与线上环境等价十一、日志十二、进程管理当 ... [详细]
  • {moduleinfo:{card_count:[{count_phone:1,count:1}],search_count:[{count_phone:4 ... [详细]
author-avatar
SCY瑶_450
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有