热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

五、RabbitMQJavaClient基本使用详解

JavaClient的5.x版本系列需要JDK8,用于编译和运行。在Android上,仅支持Android7.0或更高版本。4.x版本系列支持7.0之前

Java Client的5.x版本系列需要JDK 8,用于编译和运行。在Android上,仅支持Android 7.0或更高版本。4.x版本系列支持7.0之前的JDK 6和Android版本。

加入RabbitMQ java client(基于4.x最新版本)依赖:

com.rabbitmqamqp-client4.9.0

RabbitMQ默认用户guest默认情况下,仅限于localhost连接 。所以需要添加新用户,用户名为"admin",密码为"admin"。参考前面RabbitMQ安装新建用户。

简单Hello World

使用Java Client开发RabbitMQ第一个最简单的程序Hello World

后面的demo都使用下面的类获取Connection。

/*** @author Hayson* @date 2018/11/23 13:44* @description rabbitmq Connection工具类*/
public class ConnectionUtils {final static String HOST = "192.168.239.134";final static Integer PORT = 5672;final static String USERNAME = "admin";final static String PASSWORD = "admin";final static String VIRTUALHOST = "/";public static Connection getConnection() throws IOException, TimeoutException {//创建连接工厂,设置连接rabbitmq的参数ConnectionFactory factory = new ConnectionFactory();factory.setHost(HOST);factory.setPort(PORT);factory.setUsername(USERNAME);factory.setPassword(PASSWORD);factory.setVirtualHost(VIRTUALHOST);//通过连接工厂创建连接return factory.newConnection();}
}

生产者客户端发送消息:

/*** @author Hayson* @date 2018/11/23 13:39* @description rabbitmq生产者发送消息*/
public class Send {final static String QUEUE = "helloWorld";public static void main(String[] args) throws IOException, TimeoutException {send();}public static void send() throws IOException, TimeoutException {//获取连接Connection connection = ConnectionUtils.getConnection();//通过连接创建信道Channel channel = connection.createChannel();// 创建一个type="direct"、持久化的、非自动删除的交换器channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);//创建队列,设置队列名、不持久化、不排他、不自动删除、参数为空channel.queueDeclare(QUEUE, false, false, false, null);//将交换器与队列通过路由键绑定channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);//发送消息,指定发送交换器(""则为自带默认交换器)、队列、消息基本属性集为空,发送内容为字节数组String message = "Hello World!";channel.basicPublish("", QUEUE, null, message.getBytes("UTF-8"));System.out.println("发送消息:" + message);//关闭信道和连接channel.close();connection.close();}
}

消费者客户端接受消息:

/*** @author Hayson* @date 2018/11/23 13:41* @description rabbitmq消费者接收消息*/
public class Receiver {final static String QUEUE = "helloWorld";public static void main(String[] args) throws IOException, TimeoutException {recevier();}public static void recevier() throws IOException, TimeoutException {//获取连接Connection connection = ConnectionUtils.getConnection();//通过连接创建信道Channel channel = connection.createChannel();//创建队列,设置队列名、不持久化、不排他、不自动删除、参数为空。//消费者可以不用创建队列,只需指定队列名,但还是建议创建,//如果先启动消费者客户端监听生产者消息,而队列不存在会报异常channel.queueDeclare(QUEUE, false, false, false, null);//继承DefaultConsumer类来实现消费,获取消息Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {String message = new String(body, "UTF-8");System.out.println("接收到消息:" + message);}};channel.basicConsume(QUEUE, true, consumer);//关闭信道、连接channel.close();connection.close();}
}

运行生产者客户端发送消息后,在Web管理页面可以看到队列存在一条消息:

1070782-20181204170317807-2035743692.png

此时运行消费者客户端接收消息,控制台打印"Hello World!",Web管理页面队列消息被消费后删除。

Connections(连接)和Channels(管道)

核心的类是Connections(连接)和Channels(管道),分别代表着AMQP 0-9-1协议中的Connections(连接)和Channels(管道),使用前导入

import com.rabbitmq.client.Connection;import com.rabbitmq.client.Channel;

下面的代码用来在给定的参数(lP地址、端口号、用户名、密码等)下连接RabbitMQ:

ConnectionFactoryfactory factory = new ConnectionFactory();factory.setUsername(USERNAME);factory.setPassword(PASSWORD);factory.setVirtualHost(virtualHost);factory.setHost(IPADDRESS);factory.setPort(PORT);Connectionconn connection = factory.newConnection();

所有的这些参数RabbitMQ服务器都设置了默认值,可以在ConnectionFactory类中查看这些默认值:

PropertyDefault Value
Username"guest"
Password"guest"
Virtual host"/"
Hostname"localhost"
port5672用于常规连接, 5671用于使用TLS的连接

也可以选择使用URI的方式来实现:

ConnectionFactoryfactory factory = new ConnectionFactory();factory.setUri("amqp://userName:password@ipAddress:portNumber/virtualHost");Connectionconn connection = factory.newConnection();

Connection(连接)接口被用来创建一个Channel(管道):

Channel channel = connection.createChannel();

在创建之后,Channel可以用来发送或者接收消息了。

Connection可以用来创建多个Channel实例,但是Channel实例不能在线程问共享,应用程序应该为每一个线程开辟一个Channel。

某些情况下Channel的操作可以并发运行,但是在其他情况下会导致在网络上出现错误的通信帧交错,同时也会影响发送方确认(publisherconfrrm)机制的运行,所以多线程问共享Channel实例是非线程安全的。

Exchange(交换器)和Queue(队列)

Exchange和Queue是AMQP协议中的构建模块,应用程序需确保在使用它们的时候就已经存在了,在使用之前需要先声明(declare)它们。

如果不声明Exchange,RabbitMQ使用默认Exchange

Channel channel = connection.createChannel();
channel.queueDeclare(queueName, false, false, false, null);
channel.basicPublish("", queueName, null, "Hello World!".getBytes("UTF-8"));

声明一个Exchange和Queue,并将它们绑定在一起:

channel.exchangeDeclare(exchangeName,"direct",true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName,exchangeName,routingKey);

上面声明了:

  1. 一个持久化的、非自动删除的、绑定类型为direct的Exchange
  2. 一个非持久化的、排他的、自动删除的Queue(此Queue的名称由RabbitMQ自动生成)。

上面的Exchange和Queue也都没有设置特殊的参数。channel.queueBind方法使用RoutingKey将Exchange和Queue绑定起来。

上面代码作用:当只有一个客户端在使用时的声明队列的方式:它不需要一个确定的名称,没有其他客户端来使用它(独占)并且将自动清除(自动删除)。

如果多个客户端希望共享具有名称的队列,则应该使用下面的代码:

channel.exchangeDeclare(exchangeName,"direct",true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(queueName,exchangeName,routingKey);

上面创建了一个持久化的、非排他的、非自动删除的、绑定类型为direct的Queue,而且也被分配一个确定的已知名称(由客户端手动分配而非RabbitMQ自动生成)。

Channel的API方法都是可以重载的,比如exchangeDeclare、queueDeclare。根据参数不同,可以有不同的重载形式,根据自身的需要进行调用。

exchange相关方法详解

  • 创建Exchange

    exchangeDeclare有多个重载方法,这些重载方法都是由下面这个方法中缺省的某些参数构成。

    Exchange.DeclareOk exchangeDeclare(String exchange,String type,boolean durable,boolean autoDelete,boolean internal,Map arguments) throws IOException;

    方法的返回值是Exchange.DeclareOK,用来标识成功声明了一个交换器。各个参数详细:

    参数描述
    exchange交换器的名称
    type交换器的类型,常见的如:fanout、direct、topic
    durable是否持久化。为true,表示持久化,反之非持久化。持久化可以将交换器存储,在服务器重启的时候不会丢失信息。
    autoDelete是否自动删除。为true,表示自动删除。自动删除的前提是至少有一个队列或者交换器与交换器绑定,在所有队列或交换器被解绑后自动删除。
    internal是否是内置。为true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。
    argument其他一些结构化参数,比如alternate-exchange

    exchangeDeclare的其他重载方法:

    与此对应的,将第二个参数String type换成BuiltInExchangeType type对应的几个重载方法:

    与exchangeDeclare师出同门的还有几个方法,比如

    1. exchangeDeclareNoWait方法

      void exchangeDeclareNoWait(String exchange,String type,boolean durable,boolean autoDelete,boolean internal,Map arguments) throws IOException;
      void exchangeDeclareNoWait(String exchange,BuiltinExchangeType type,boolean durable,boolean autoDelete,boolean internal,Map arguments) throws IOException;

      NoWait意思是不需要服务器返回(异步),这里返回值是void,而普通的exchangeDeclare方法的返回值是Exchange.DeclareOk,意思是在客户端声明了一个交换器后,需要等待服务器的返回。

      关于这方法不建议使用,因为如果客户端在声明了一个交换器后不等待服务器返回(可能服务器未完成交换器创建),此时客户端紧接着使用这个交换器,会发生异常。

    2. exchangeDeclarePassive方法

      Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;

      该方法比较有用,用来检测相应的交换器是否存在。如果存在则正常返回;如果不存在则抛出异常,并关闭Channel。

  • 删除Exchange

    Exchange.DeleteOk exchangeDelete(String exchange) throws IOException;
    Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException;
    void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException;

    参数描述
    exchange交换器的名称
    ifUnused是否在交换器没有被使用的情况下删除。为true,则只有该交换器没有被使用的情况下才被删除;为false,则无论如何这个交换器都要被删除。
  • 绑定Exchange

    将交换器与队列绑定方法如下:

    Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException;
    Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map arguments) throws IOException;
    void exchangeBindNoWait(String destination, String source, String routingKey, Map arguments) throws IOException;

    方法中参数:

    参数描述
    destination目标exchange
    source源exchange
    routingKey绑定的路由键
    arguments绑定时设置的参数

    绑定之后,消息从source交换器转发到destination交换器,某种程度上来说destination交换可以看作一队列。如下代码

    channel.exchangeDeclare("source","direct",false,true,null);
    channel.exchangeDeclare("destination","fanout",false,true,null);
    channel.exchangeBind("destination","source","exKey");
    channel.queueDeclare("queue",false,false,true,null);
    channel.queueBind("queue","destination","");
    channel.basicPublish("source","exKey",null,"exToExDemo".getBytes());

    生产者发送消息至交换器source中,交换器source根据路由键找到与其匹配的另一个交换器destination,并把消息转发到destination中,进而存储在destination绑定的队列queue中,如图

1070782-20181204170243531-990753115.png

交换器解绑的方法有如下:

Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey) throws IOException;
Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey, Map arguments) throws IOException;

  • 解绑Exchange

    交换器解绑的方法有如下:

    Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey) throws IOException;
    Exchange.UnbindOk exchangeUnbind(String destination, String source, String routingKey, Map arguments) throws IOException;

queue方法详解

  • 创建Queue

    queueDeclare只有两个重载方法:

    Queue.DeclareOk queueDeclare() throws IOException;
    Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map arguments) throws IOException;

    不带任何参数的queueDeclare方法默认创建一个由RabbitMQ命名的(类似这种amq.gen-LhQzlgv3GhDOv8PIDabOXA名称,这种队列也称之为匿名队列)、排他的、自动删除的、非持久化的队列。

    方法的参数:

    参数描述
    queue队列的名称
    durable是否持久化。为true,则队列为持久化。持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关信息
    exclusive是否排他。为true,则队列排他。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。
    注意:
    1. 排他队列是基于连接(Connection)可见的,同一个连接的不同信道(Channel)是可以同时访问同一连接创建的排他队列
    2. "首次"是指如果一个连接已经声明了一个排他队列,其他连接时不允许建立同名的排他队列的,这个与普通队列不同
    3. 即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景。
    autoDelete是否自动删除。为true,则队列自动删除。
    自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除
    argurnents队列的其他一些参数,如x-rnessage-ttl、x-expires、x-rnax-length、x-rnax-length-bytes、x-dead-letter-exchange、x-dead­letter-routing-key,x-rnax-priority等。

    生产者和消费者都能够使用queueDeclare来声明一个队列,但是如果消费者在同一个信道上订阅了另一个队列,就无法再声明队列了。

    对应于exchangeDeclare师出同门的方法,queueDeclare同样也有:

    1. queueDeclareNoWait方法

      void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map arguments) throws IOException;

      方法的返回值是void,表示不需要服务端的任何返回(异步)。同样也需要注意,在调用完queueDeclareNoWait方法之后,紧接着使用声明的队列时有可能会发生异常情况。

    2. queueDeclarePassive方法

      Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;

      该方法比较有用,通过队列名获取队列的存在,成功返回与queueDeclare相同的信息。

  • 删除Queue

    Queue.DeleteOk queueDelete(String queue) throws IOException;
    Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;
    void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;

    参数描述
    queue队列的名称
    ifUnused是否在队列没有被使用的情况下删除。为true,则只有该队列没有被使用的情况下才被删除;为false,则无论如何这个队列都要被删除。
    ifEmptytrue表示在队列为空才能删除。
  • 清空Queue

    Queue.PurgeOk queuePurge(String queue) throws IOException;

  • 绑定Queue

    将队列和交换器绑定的方法如下

    Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException;Queue.BindOk queueBind(String queue, String exchange, String routingKey,Map arguments) throws IOException;
    void queueBindNoWait(String queue, String exchange, String routingKey, Map arguments) throws IOException;

    方法参数:

    参数描述
    queue队列名称
    exchange交换器的名称
    routingKey用来绑定队列和交换器的路由键
    argument定义绑定的一些参数
  • 解绑Queue

    将队列和交换器绑定,也可以将绑定的队列和交换器进行解绑:

    Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException;
    Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map arguments) throws IOException;

创建时机

RabbitMQ的消息存储在队列中,交换器的使用并不真正耗费服务器性能,而队列会。如要衡量RabbitMQ当前的QPSI(每秒查询率)只需看队列的即可。在实际业务应用中,需要对所创建的队列的流量、内存占用及网卡占用有一个清晰的认知,预估其平均值和峰值,以便在固定硬件资源的情况下能够进行合理有效的分配。

按照RabbitMQ官方建议,生产者和消费者都应该尝试创建(这里指声明操作)队列。这是一个很好的建议,但不适用于所有的情况。如果业务本身在架构设计之初己经充分地预估了队列的使用情况,完全可以在业务程序上线之前在服务器上创建好(比如通过页面管理、RabbitMQ命令或者更好的是从配置中心下发),这样业务程序也可以免去声明的过程,直接使用即可。

预先创建好资源还有一个好处是,可以确保交换器和队列之间正确地绑定匹配。很多时候,由于人为因素、代码缺陷等,发送消息的交换器并没有绑定任何队列,那么消息将会丢失。或者交换器绑定了某个队列,但是发送消息时的路由键无法与现存的队列匹配,那么消息也会丢失。当然可以配合mandatory参数或者备份交换器来提高程序的健壮性

与此同时,预估好队列的使用情况非常重要,如果在后期运行过程中超过预定的阔值,可以根据实际情况对当前集群进行扩容或者将相应的队列迁移到其他集群。迁移的过程也可以对业务程序完全透明。此种方法也更有利于开发和运维分工,便于相应资源的管理。

如果集群资源充足,而即将使用的队列所占用的资源又在可控的范围之内,为了增加业务程序的灵活性,也完全可以在业务程序中声明队列。至于是使用预先分配创建资源的静态方式还是动态的创建方式,需要从业务逻辑本身、公司运维体系和公司硬件资源等方面考虑。

发送消息(Publishing messages)

使用Channel.basicPublish方法将消息发送给一个exchange:

byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

为了更好的控制,可以使用重载的参数来设置消息的一些属性(比如说mandatory标志,关于mandatory标志,下面会讲到),或者在发送消息前设定一些消息属性。

channel.basicPublish(exchangeName, routingKey, mandatory,MessageProperties.PERSISTENT_TEXT_PLAIN,messageBodyBytes);

查看MessageProperties.PERSISTENT_TEXT_PLAIN源码

/** Content-type "text/plain", deliveryMode 2 (persistent), priority zero */public static final BasicProperties PERSISTENT_TEXT_PLAIN =new BasicProperties("text/plain",null,null,2,0, null, null, null,null, null, null, null,null, null);

可以发现上面代码发送一条消息,消息的投递模式(delivery mode)设置为2,即消息会被持久化(即存入磁盘)在服务器中。同时优先级(priority)设置为0,content-type为text/plain

可以使用Builder类自己设定消息的属性

channel.basicPublish(exchangeName, routingKey,new AMQP.BasicProperties.Builder().contentType("text/plain").deliveryMode(2).priority(1).userId("main").build()),messageBodyBytes);

也可以发送一条带有自定义headers的消息

Map headers = new HashMap();
headers.put("localtion", "here");
headers.put("time", "today");
channel.basicPublish(exchangeName, routingKey,new AMQP.BasicProperties.Builder().headers(headers).build()),messageBodyBytes);

还可以发送一条带有过期时间(expiration)的消息

channel.basicPublish(exchangeName, routingKey,new AMQP.BasicProperties.Builder().expiration("60000").build()),messageBodyBytes);

对于basicPublish而言,有几个重载方法:

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)throws IOException;

方法参数

  • exchange:交换器名称。指定消息需要发送到哪个交换器中。如果设置为空字符串,则消息会被发送到RabbitMQ默认的交换器中

  • routingKey:路由键,交换器根据路由键将消息存储到相应的队列中。

  • props:消息的基本属性集,包含14个属性成员,分别有contentTypecontentEncodingheaders(Map)deliveryModeprioritycorrelationldreplyToexpirationmessageldtimestamptypeuserldappldclusterld。其中常用的几种都在上面的示例中进行了演示。

  • byte[] body:消息体(payload),真正需要发送的消息

  • mandatory:默认false。为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ将消息返回给生产者;为false时,消息直接丢弃。

    可以通过调用channel.addReturnListener来添加ReturnListener监听器获取没有被正确路由到队列的消息:

    channel.basicPublish(EXCHANGE_NAME,"",true,false,MessageProperties.PERSISTENT_TEXT_PLAIN,"Hello World!".getBytes());
    channel.addReturnListener(new ReturnListener() {@Overridepublic void handleReturn(int replyCode,String replyText,String exchange,String routingKey,AMQP.BasicProperties properties,byte[] body) throws IOException {String message = new String(body);System.out.println("返回的消息:" + message);}
    });

    发送消息指定的RoutingKey为字符串"",不能成功将消息路由到队列,此时RabbitMQ返回"Hello World!"消息,通过ReturnListener监听返回的消息并打印控制台。

  • immediate:为true时,当消息不能路由到消费者时返回消息。RabbitMQ3.0版本后以废弃,影响镜像队列性能,建议采用消息TTL和DLX。

消费消息

RabbitMQ的消费模式分两种:推(Push)模式和拉(Pull)模式。推模式采用Basic.Consume进行消费,而拉模式则是调用Basic.Get进行消费。

推模式

在推模式中,可以通过持续订阅的方式来消费消息,使用到的相关类有:

importcom.rabbitmq.client.Consumer;
importcom.rabbitmq.client.DefaultConsumer;

接收消息一般通过继承DefaultConsumer类来实现。当调用与Consumer相关的API方法时,不同的订阅采用不同的消费者标签(consuerTag)来区分彼此,在同一个Channel中的消费者也需要通过唯一的消费者标签以作区分,关键消费代码如下

boolean autoAck = false;
channel.basicQos(64);
channel.basicConsume(queueName,autoAck,"myConsumerTag",new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,Byte[] body)throws IOException{String routingKey = envelope.getRoutingKey();String contentType = properties.getContentType();long deliveryTag = envelope.getDeliveryTag();//(process the message components here ...)channel.basicAck(deliveryTag,false);}});

注意,上面代码中显式地设置autoAck为false,然后在接收到消息之后进行显式ack操作(channel.basicAck),对于消费者来说这个设置是非常必要的,可以防止消息不必要地丢失。

推模式过程:

1070782-20181204170206465-633281438.png

Channel类中basicConsume方法:

String basicConsume(String queue, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, Map arguments, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map arguments, Consumer callback) throws IOException;

方法参数

参数描述
queue队列的名称
autoAck是否自动确认
1. 为true时,自动确认。
2. 为false时,需要使用channel.basicAck、channel.basicNack、channel.basicReject 进行消息应答,具体看下面消息确认与拒绝。
consumerTag消费者标签,用来区分多个消费者
noLocal为true,则表示将同一个Connection中生产者发送的消息传送给这个Connection中的消费者
exclusive是否排他
argument设置消费者的其他参数
callback设置消费者的回调函数。用来处理RabbitMQ推送过来的消息,比如DefaultConsumer,使用时需要客户端重写(override)其中的方法

对于上面消费者客户端代码,重写handleDelivery方法是十分方便的。更复杂的消费者客户端会重写更多的方法,如下

// 在其他方法之前调用,返回消费者标签。
void handleConsumeOk(String consumerTag);
// 下面两个显式地或者隐式地取消订阅的时候调用
void handleCancelOk(String consumerTag);
void handleCancel(String consumerTag);
// 当Channel或者Connection关闭的时候会调用。
void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);
void handleRecoverOk(String consumerTag);
// 消费者callback回调重写接收消息
void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body);

也可以通过channel.basicCancel方法来显式地取消一个消费者的订阅:

channel.basicCancel(consumerTag);

上面这行代码会首先触发handleConsumerOk方法,之后触发handleDelivery方法,最后才触发handleCancelOk方法。

和生产者一样,消费者客户端同样需要考虑线程安全的问题。消费者客户端的这些callback会被分配到与Channel不同的线程池上,这意味着消费者客户端可以安全地调用这些阻塞方法,比如channel.queueDeclare、channel.basicCancel等。

每个Channel都拥有自己独立的线程。最常用的做法是一个Channel对应一个消费者,也就是意味着消费者彼此之间没有任何关联。当然也可以在一个Channel中维持多个消费者,但是要注意一个问题,如果Channel中的一个消费者一直在运行,那么其他消费者的callback会被"耽搁"。

拉模式

拉模式的消费方式。通过channel.basicGet方法可以单条地获取消息,其返回值是GetRespone,Channel类的basicGet方法没有其他重载方法,只有:

GetResponse basicGet(String queue,boolean autoAck)throws IOException;

queue代表队列的名称,如果设置autoAck为false,那么同样需要调用channel.basicAck来确认消息己被成功接收。

拉模式关键代码:

boolean autoAck = false;
GetResponse response = channel.basicGet(queueName,autoAck);
if(response == null){//未检索到任何消息。
} else { AMQP.BasicProperties props = response.getProps();byte[] body = response.getBody();long deliveryTag = response.getEnvelope().getDeliveryTag();...

由于上面的autoAck = false,还必须调用Channel.basicAck以确认已成功收到消息:

// 确认收到消息
channel.basicAck(response.getEnvelope().getDeliveryTag(), false);

拉模式过程:

1070782-20181204170145636-846624304.png

注意:

Basic.Cnsume将信道(Channel)置为接收模式,直到取消队列的订阅为止。在接收模式期间,RabbitMQ会不断地推送消息给消费者,当然推送消息的个数还是会受到Basic.Qos的限制。

如果只想从队列获得单条消息而不是持续订阅,建议还是使用Basic.Get进行消费.但是不能将Basic.Get放在一个循环里来代替Basic.Consume,这样做会严重影响RabbitMQ的性能。

如果要实现高吞吐量,消费者理应使用Basic.Consume方法。

消费端的确认与拒绝

  • 消息确认

    RabbitMQ提供了消息确认机制以保证消息从队列可靠地到达消费者。

    消费者在订阅时,可以指定autoAck参数

    • autoAck=false:RabbitMQ会等待消费者回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,之后再删除)。

      队列中的消息可以分为两个部分:

      1. 等待投递给消费者的消息
      2. 已经投递给消费者,但还未收到消费者确认信号的消息

      在为false时,需要使用channel.basicAckchannel.basicNackchannel.basicReject进行消息应答:

      • 对于channel.basicAck:
        • true时,RabbitMQ将自动确认所有指定小于deliveryTag(通过envelope.getDeliveryTag()获得)的消息,例如:在Channel上传递过来的deliveryTag标签有5678,当消费者先接收到标签8并确认,则小于8的有567都全部确认。
        • false时,对于上面的例子,只有8确认,其他不会确认。

      如果RabbitMQ一直没有收到消费者的确认信号,且消费者断开连接,则消息重新进入队列,等待投递给下一个消费者。

    • autoAck=true:RabbitMQ会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。

    RabbitMQ的Web管理平台可以看到当前队列的"Ready"状态和"Unacked"状态的消息数,分别对应上文中的等待投递给消费者的消息数和已经投递给消费者,但还未收到消费者确认信号的消息数

1070782-20181204170059013-87903635.png

可以通过命令行查看:

[hayson@localhost ~]$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues
hello_world 1 0

  • 消息拒绝

    如果消费者接收到消息后,需要拒绝消息,RabbitMQ提供了两个方法拒绝消息

    1. 拒绝单条消息

      channel.basicReject方法拒绝单条消息

      void basicReject(long deliveryTag, boolean requeue) throws IOException;

      方法参数:

      参数描述
      deliveryTag消息的编号,可通过envelope.getDeliveryTag()获取
      requeue被拒绝的消息是否重新入队列。
      true时,重新入队列以发给下一个消费者;
      false时,立即把消息移除,不再会发给消费者
    2. 批量拒绝消息

      channel.basicNack批量拒绝消息

      void basicNack(long deliveryTag, boolean multiple, boolean requeue)throws IOException;

      其中deliveryTagrequeue的含义可以参考basicReject方法。

      multiple:

      • false,则表示拒绝编号为deliveryTag的这一条消息,这时候basicNackbasicReject方法一样;
      • 为true,则表示拒绝deliveryTag编号以下所有未被当前消费者确认的消息。

注意:

channel.basicRejectchannel.basicNack中的requeue设置为false,可以启用"死信队列"的功能,后面说到。

对于requeue,还有两个方法具备可重入队列的特性:

Basic.RecoverOk basicRecover() throws IOException;
Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

channel.basicRecover方法用来请求RabbitMQ重新发送还未被确认的消息。

如果requeue参数设置为true,则未被确认的消息会被重新进入到队列中,对于同一条消息来说,可能会被分配给与之前不同的消费者。

如果requeue参数设置为false,同一条消息会被分配给与之前相同的消费者。

默认情况下,requeue默认为true

关闭连接

在应用程序使用完后,需要关闭连接,释放资源

channel.close();
conn.close();

显式地关闭Channel是个好习惯,但这不是必须的,在Connection关闭的时候,Channel也会自动关闭。

转:https://www.cnblogs.com/zenghi-home/p/9994764.html



推荐阅读
  • 在Android开发中,使用Picasso库可以实现对网络图片的等比例缩放。本文介绍了使用Picasso库进行图片缩放的方法,并提供了具体的代码实现。通过获取图片的宽高,计算目标宽度和高度,并创建新图实现等比例缩放。 ... [详细]
  • XML介绍与使用的概述及标签规则
    本文介绍了XML的基本概念和用途,包括XML的可扩展性和标签的自定义特性。同时还详细解释了XML标签的规则,包括标签的尖括号和合法标识符的组成,标签必须成对出现的原则以及特殊标签的使用方法。通过本文的阅读,读者可以对XML的基本知识有一个全面的了解。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • 本文分享了一个关于在C#中使用异步代码的问题,作者在控制台中运行时代码正常工作,但在Windows窗体中却无法正常工作。作者尝试搜索局域网上的主机,但在窗体中计数器没有减少。文章提供了相关的代码和解决思路。 ... [详细]
  • 本文介绍了使用Java实现大数乘法的分治算法,包括输入数据的处理、普通大数乘法的结果和Karatsuba大数乘法的结果。通过改变long类型可以适应不同范围的大数乘法计算。 ... [详细]
  • HDU 2372 El Dorado(DP)的最长上升子序列长度求解方法
    本文介绍了解决HDU 2372 El Dorado问题的一种动态规划方法,通过循环k的方式求解最长上升子序列的长度。具体实现过程包括初始化dp数组、读取数列、计算最长上升子序列长度等步骤。 ... [详细]
  • 本文讨论了如何优化解决hdu 1003 java题目的动态规划方法,通过分析加法规则和最大和的性质,提出了一种优化的思路。具体方法是,当从1加到n为负时,即sum(1,n)sum(n,s),可以继续加法计算。同时,还考虑了两种特殊情况:都是负数的情况和有0的情况。最后,通过使用Scanner类来获取输入数据。 ... [详细]
  • 本文介绍了C#中数据集DataSet对象的使用及相关方法详解,包括DataSet对象的概述、与数据关系对象的互联、Rows集合和Columns集合的组成,以及DataSet对象常用的方法之一——Merge方法的使用。通过本文的阅读,读者可以了解到DataSet对象在C#中的重要性和使用方法。 ... [详细]
  • baresip android编译、运行教程1语音通话
    本文介绍了如何在安卓平台上编译和运行baresip android,包括下载相关的sdk和ndk,修改ndk路径和输出目录,以及创建一个c++的安卓工程并将目录考到cpp下。详细步骤可参考给出的链接和文档。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • 后台获取视图对应的字符串
    1.帮助类后台获取视图对应的字符串publicclassViewHelper{将View输出为字符串(注:不会执行对应的ac ... [详细]
  • 《数据结构》学习笔记3——串匹配算法性能评估
    本文主要讨论串匹配算法的性能评估,包括模式匹配、字符种类数量、算法复杂度等内容。通过借助C++中的头文件和库,可以实现对串的匹配操作。其中蛮力算法的复杂度为O(m*n),通过随机取出长度为m的子串作为模式P,在文本T中进行匹配,统计平均复杂度。对于成功和失败的匹配分别进行测试,分析其平均复杂度。详情请参考相关学习资源。 ... [详细]
  • 本文介绍了Java高并发程序设计中线程安全的概念与synchronized关键字的使用。通过一个计数器的例子,演示了多线程同时对变量进行累加操作时可能出现的问题。最终值会小于预期的原因是因为两个线程同时对变量进行写入时,其中一个线程的结果会覆盖另一个线程的结果。为了解决这个问题,可以使用synchronized关键字来保证线程安全。 ... [详细]
  • 本文详细介绍了Java中vector的使用方法和相关知识,包括vector类的功能、构造方法和使用注意事项。通过使用vector类,可以方便地实现动态数组的功能,并且可以随意插入不同类型的对象,进行查找、插入和删除操作。这篇文章对于需要频繁进行查找、插入和删除操作的情况下,使用vector类是一个很好的选择。 ... [详细]
  • 猜字母游戏
    猜字母游戏猜字母游戏——设计数据结构猜字母游戏——设计程序结构猜字母游戏——实现字母生成方法猜字母游戏——实现字母检测方法猜字母游戏——实现主方法1猜字母游戏——设计数据结构1.1 ... [详细]
author-avatar
阿都欧巴
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有