热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RabbitMQ运用负载均衡与消息持久化的实现

 Rabbitmq是对AMQP协议的一种实现。使用范围也比较广泛,主要用于消息异步通讯。

 

Rabbitmq 是对AMQP协议的一种实现。使用范围也比较广泛,主要用于消息异步通讯。



一,默认情况下Rabbitmq使用轮询(round-robin)方式转发消息。为了较好实现负载,可以在消息接收方指定,每次接收到一条,这样可以缓解单一服务器压力。

代码如下:



ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
cOnnection= factory.newConnection();
final Channel channel = connection.createChannel();
channel.basicQos(1);//设置每次接收一条
为了保证消息不丢失,取消自动ACK,改为只有在完全处理消息后再ACK。
如:

Consumer cOnsumer= new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
try {
Thread.sleep(10000);
}
catch(Exception ex)
{
}
System.out.println("received Message:" + message);
channel.basicAck(envelope.getDeliveryTag(), false);//处理完成后ACK
}
};
channel.basicConsume(QUEUE_NAME, false, consumer);//取消自动ACK
二,为了保证在Rabbitmq在宕机后,仍不丢失消息,需要将队列和发布的消息都声明为可持久化的。
如:

channel.queueDeclare(QUEUE_NAME, true, false, false, null);

channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, bytes);

三,Rabbitmq 的MessageModel(消息模型)
在Rabbitmq的消息模型中,我们决不应该将消息直接发送到queue.事实上,消息发送者并不关心消息是否被路由或被入队列或被接收并处理。
生产者应只与Exchange(交换器)打交道,
Exchange的作用:从生产者接受消息,向消费者发送消息。
The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.【所以交换器必须准确地知道怎样处理消息,是否应该加到一个指定的队列,还是发送到多个队列,还是应该抛弃该消息。】
指定Exchange的Rule,即以何种方式转发消息。
Rabbitmq共有四种:direct,topic ,headers和 fanout,NamelessExchange.

fanout,这种方式很简单,就是一个广播,把消息转给所有的订阅者;有几个订阅者,消息就会被复制几份。

NamelessExchange,无Exchange,消息以轮询(round-robin)方式,发送给消费者,通过routingKey识别对应的消费者。

【提示】:

rabbitmqctl list_exchanges ,用于查看当前Rabbitmq正在运行的交换器;rabbitmqctl list_bindings,查看当前绑定数
eg:
生产者只负责发送消息,而不关心这些消息是否被处理,也不关心消息是否被抛弃;消息由Exchange根据具体rule处理。

private static final String EXCHANGE_NAME = "logs";
private static final String EXCHANGE_TYPE="fanout";
public static void main(String[] argv)
throws java.io.IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection cOnnection= factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE);
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}

 如:以fanout方式处理消息:消息会发送给所有的订阅者,(与routingKey无关)

private static final String exchangeName="logs";
private static final String exchangeType = "fanout";
public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
Connection cOnnection= null;
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
cOnnection= factory.newConnection();
final Channel channel = connection.createChannel();
int prefetchCount = 1;
channel.basicQos(prefetchCount);
channel.exchangeDeclare(exchangeName,exchangeType);
//创建一个队列,用于接收消息
String queueName= channel.queueDeclare().getQueue();
channel.queueBind(queueName,exchangeName,"");
System.out.println("Waiting for messages...,over it ,Press CTRL+C ");
Consumer cOnsumer= new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println("received Message:" + message);
try {
doWork(message);
} catch (InterruptedException e) {
e.printStackTrace();
}
finally {
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(queueName, false, consumer);
} finally {
}
}

Bindings,将队列绑定到Exchange,说明可以从Exchange接收消息。【A binding is a relationship between an exchange and a queue. This can be simply read as: the queue is interested in messages from this exchange.】

channel.queueBind(queueName, EXCHANGE_NAME, "routingKey");
direct,只有当routingKey,与bindingKey相同Exchange才能会推送消息。
如:
生产者:

channel.basicPublish(exchangeName, "routingKey", MessageProperties.PERSISTENT_TEXT_PLAIN, bytes);//片断

生产者在发送消息时指定routingKey.
消费者:可以绑定多个Key,以接收来自多个routingKey的消息

for (String bindingKey : typeArr) {
channel.queueBind(queueName, exchangeName, bindingKey);
}
//接收者绑定Key.
topic ,生产者在发送消息时,指定准确的routingKey(多个单词以.号分隔),当bindingKey模式匹配到routingKey时,则接收消息。
注意:routingKey和bindingKey的长度不能超过255.
*(star),只匹配一个单词。#(hash),能匹配0个或多个单词
三RPC
分布式远程调用。

 

 



   



推荐阅读
  • Spring源码解密之默认标签的解析方式分析
    本文分析了Spring源码解密中默认标签的解析方式。通过对命名空间的判断,区分默认命名空间和自定义命名空间,并采用不同的解析方式。其中,bean标签的解析最为复杂和重要。 ... [详细]
  • 本文介绍了在开发Android新闻App时,搭建本地服务器的步骤。通过使用XAMPP软件,可以一键式搭建起开发环境,包括Apache、MySQL、PHP、PERL。在本地服务器上新建数据库和表,并设置相应的属性。最后,给出了创建new表的SQL语句。这个教程适合初学者参考。 ... [详细]
  • 开发笔记:加密&json&StringIO模块&BytesIO模块
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了加密&json&StringIO模块&BytesIO模块相关的知识,希望对你有一定的参考价值。一、加密加密 ... [详细]
  • 本文介绍了作者在开发过程中遇到的问题,即播放框架内容安全策略设置不起作用的错误。作者通过使用编译时依赖注入的方式解决了这个问题,并分享了解决方案。文章详细描述了问题的出现情况、错误输出内容以及解决方案的具体步骤。如果你也遇到了类似的问题,本文可能对你有一定的参考价值。 ... [详细]
  • 本文介绍了南邮ctf-web的writeup,包括签到题和md5 collision。在CTF比赛和渗透测试中,可以通过查看源代码、代码注释、页面隐藏元素、超链接和HTTP响应头部来寻找flag或提示信息。利用PHP弱类型,可以发现md5('QNKCDZO')='0e830400451993494058024219903391'和md5('240610708')='0e462097431906509019562988736854'。 ... [详细]
  • 本文介绍了Android 7的学习笔记总结,包括最新的移动架构视频、大厂安卓面试真题和项目实战源码讲义。同时还分享了开源的完整内容,并提醒读者在使用FileProvider适配时要注意不同模块的AndroidManfiest.xml中配置的xml文件名必须不同,否则会出现问题。 ... [详细]
  • 本文介绍了如何使用C#制作Java+Mysql+Tomcat环境安装程序,实现一键式安装。通过将JDK、Mysql、Tomcat三者制作成一个安装包,解决了客户在安装软件时的复杂配置和繁琐问题,便于管理软件版本和系统集成。具体步骤包括配置JDK环境变量和安装Mysql服务,其中使用了MySQL Server 5.5社区版和my.ini文件。安装方法为通过命令行将目录转到mysql的bin目录下,执行mysqld --install MySQL5命令。 ... [详细]
  • 在Xamarin XAML语言中如何在页面级别构建ControlTemplate控件模板
    本文介绍了在Xamarin XAML语言中如何在页面级别构建ControlTemplate控件模板的方法和步骤,包括将ResourceDictionary添加到页面中以及在ResourceDictionary中实现模板的构建。通过本文的阅读,读者可以了解到在Xamarin XAML语言中构建控件模板的具体操作步骤和语法形式。 ... [详细]
  • 基于dlib的人脸68特征点提取(眨眼张嘴检测)python版本
    文章目录引言开发环境和库流程设计张嘴和闭眼的检测引言(1)利用Dlib官方训练好的模型“shape_predictor_68_face_landmarks.dat”进行68个点标定 ... [详细]
  • 本文介绍了Swing组件的用法,重点讲解了图标接口的定义和创建方法。图标接口用来将图标与各种组件相关联,可以是简单的绘画或使用磁盘上的GIF格式图像。文章详细介绍了图标接口的属性和绘制方法,并给出了一个菱形图标的实现示例。该示例可以配置图标的尺寸、颜色和填充状态。 ... [详细]
  • Android工程师面试准备及设计模式使用场景
    本文介绍了Android工程师面试准备的经验,包括面试流程和重点准备内容。同时,还介绍了建造者模式的使用场景,以及在Android开发中的具体应用。 ... [详细]
  • 在Oracle11g以前版本中的的DataGuard物理备用数据库,可以以只读的方式打开数据库,但此时MediaRecovery利用日志进行数据同步的过 ... [详细]
  • 本文介绍了如何使用PHP代码将表格导出为UTF8格式的Excel文件。首先,需要连接到数据库并获取表格的列名。然后,设置文件名和文件指针,并将内容写入文件。最后,设置响应头部,将文件作为附件下载。 ... [详细]
  • 超级简单加解密工具的方案和功能
    本文介绍了一个超级简单的加解密工具的方案和功能。该工具可以读取文件头,并根据特定长度进行加密,加密后将加密部分写入源文件。同时,该工具也支持解密操作。加密和解密过程是可逆的。本文还提到了一些相关的功能和使用方法,并给出了Python代码示例。 ... [详细]
  • 本文详细介绍了使用C#实现Word模版打印的方案。包括添加COM引用、新建Word操作类、开启Word进程、加载模版文件等步骤。通过该方案可以实现C#对Word文档的打印功能。 ... [详细]
author-avatar
张伊韵育财育信
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有