热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

RabbitMQ学习笔记之五种模式及消息确认机制

本文详细介绍简单模式Simple、工作模式Work、发布订阅模式PublishSubscribe、Topic、Routing。Maven依赖引用

本文详细介绍简单模式Simple、工作模式Work、发布订阅模式Publish/Subscribe、Topic、Routing。

Maven依赖引用

junitjunit4.11com.rabbitmqamqp-client4.0.2org.slf4jslf4j-api1.7.10org.slf4jslf4j-log4j121.7.5log4jlog4j1.2.17

连接RabbitMQ服务公用方法

package com.test.testboot.mq;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConnectionUtil {/*** 获取MQ的连接* @return* @throws IOException* @throws TimeoutException*/public static Connection getConection() throws IOException, TimeoutException {//创建连接工厂ConnectionFactory factory=new ConnectionFactory();//设置服务地址factory.setHost("127.0.0.1");//设置服务端口号factory.setPort(5672);//设置Hostfactory.setVirtualHost("/");//设置用户名factory.setUsername("");//设置密码factory.setPassword("123456");return factory.newConnection() ;}
}

模式1:简单队列模式(Simple)

消息生产者p将消息放入队列
消费者监听队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列删除
(隐患,消息可能没有被消费者正确处理,已经消失了,无法恢复)

应用场景:聊天室


  • 生产者

package com.test.testboot.mq;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Sender {private static String QUEUE_NAME="test_simple_queue";public static void main(String[] args) throws IOException, TimeoutException {//获取一个连接Connection connection=ConnectionUtil.getConection();//获取一个通道Channel channel=connection.createChannel();//创建队列声明channel.queueDeclare(QUEUE_NAME,false,false,false,null);String msg="Hello World!";System.out.println("sendMsg:"+msg);channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());channel.close();connection.close();}
}

  • 消费者

package com.test.testboot.mq;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv {private static String QUEUE_NAME="test_simple_queue";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取连接Connection connection=ConnectionUtil.getConection();//创建通道Channel channel = connection.createChannel();//定义队列消费者channel.queueDeclare(QUEUE_NAME,false,false,false,null);DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {//获取到达的消息@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String recvMsg = new String(body, "UTF-8");System.out.println("recvMsg:" + recvMsg);}};channel.basicConsume(QUEUE_NAME,true,defaultConsumer);;//老版本api/* QueueingConsumer comsumer= new QueueingConsumer(channel);channel.queueDeclare(QUEUE_NAME,false,false,false,null);//监听队列channel.basicConsume(QUEUE_NAME,true,comsumer);while(true){QueueingConsumer.Delivery delivery=comsumer.nextDelivery();String recvMsg=new String(delivery.getBody());System.out.println("recvMsg:"+recvMsg);}*/}
}

模式2:工作模式(Work)

生产者将消息放入队列
多个消费者同时监听同一个队列,消息如何被消费?(与具体的分发方式有关系)
C1,C2共同争抢当前消息队列的内容,谁先拿到消息,谁来负责消费
应用场景:红包;大型项目中的资源调度过程(直接由最空闲的系统争抢到资源处理任务)


  • 轮询分发(Round-Robin)

    • 生产者

package com.test.testboot.mq.work;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.test.testboot.mq.ConnectionUtil;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {/** |--C1* p-----Queue---|* |--C2*/private static final String QUEUE_NAME&#61;"test_work_queue";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取连接Connection conection &#61; ConnectionUtil.getConection();//获取ChannelChannel channel &#61; conection.createChannel();//声明队列channel.queueDeclare(QUEUE_NAME,false,false,false,null);for (int i &#61; 0; i <10 ; i&#43;&#43;) {String msg&#61;"Hello"&#43;i;System.out.println("[WorkQueue] Send &#xff1a;"&#43;msg);channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());Thread.sleep(i*10);}channel.close();conection.close();}}

  • 消费者1

package com.test.testboot.mq.work;import com.rabbitmq.client.*;
import com.test.testboot.mq.ConnectionUtil;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv1 {private static final String QUEUE_NAME&#61;"test_work_queue";public static void main(String[] args) throws IOException, TimeoutException {//获取连接Connection conection &#61; ConnectionUtil.getConection();//获取ChannelChannel channel &#61; conection.createChannel();//声明队列channel.queueDeclare(QUEUE_NAME,false,false,false,null);//定义一个消费者DefaultConsumer consumer &#61; new DefaultConsumer(channel) {//消息到达触发这个方法&#64;Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg&#61;new String(body,"UTF-8");System.out.println("Recv[1] recv msg:"&#43;msg);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}finally {System.out.println("Recv[1] down");}}};/*true:表示自动确认,只要消息从队列中获取,无论消费者获取到消息后是否成功消费,都会认为消息成功消费.false:表示手动确认,消费者获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态,并且服务器会认为该消费者已经挂掉,不会再给其发送消息,直到该消费者反馈.*/boolean autoAck&#61;true;channel.basicConsume(QUEUE_NAME,autoAck,consumer);}
}

  • 消费者2

package com.test.testboot.mq.work;import com.rabbitmq.client.*;
import com.test.testboot.mq.ConnectionUtil;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv2 {private static final String QUEUE_NAME&#61;"test_work_queue";public static void main(String[] args) throws IOException, TimeoutException {//获取连接Connection conection &#61; ConnectionUtil.getConection();//获取ChannelChannel channel &#61; conection.createChannel();//声明队列channel.queueDeclare(QUEUE_NAME,false,false,false,null);//定义一个消费者DefaultConsumer consumer &#61; new DefaultConsumer(channel) {//消息到达触发这个方法&#64;Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg&#61;new String(body,"UTF-8");System.out.println("Recv[2] recv msg:"&#43;msg);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}finally {System.out.println("Recv[2] down");}}};boolean autoAck&#61;true;channel.basicConsume(QUEUE_NAME,autoAck,consumer);}
}

  • 公平分发&#xff08;Work Fair&#xff09;

  • 生产者

package com.test.testboot.mq.workfair;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.test.testboot.mq.ConnectionUtil;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {/** |--C1* p-----Queue---|* |--C2*/private static final String QUEUE_NAME&#61;"test_work_queue";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取连接Connection conection &#61; ConnectionUtil.getConection();//获取ChannelChannel channel &#61; conection.createChannel();//每个消费者 发送确认消息之前&#xff0c;消息队列不发送下一个消息到消费者&#xff0c;一次只处理一个消息//限制发送给同一个消费者不得超过一条消息channel.basicQos(1);//声明队列channel.queueDeclare(QUEUE_NAME,false,false,false,null);for (int i &#61; 0; i <10 ; i&#43;&#43;) {String msg&#61;"Hello"&#43;i;System.out.println("[WorkQueue] Send &#xff1a;"&#43;msg);channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());Thread.sleep(i*5);}channel.close();conection.close();}}

  • 消费者1

package com.test.testboot.mq.workfair;import com.rabbitmq.client.*;
import com.test.testboot.mq.ConnectionUtil;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv1 {private static final String QUEUE_NAME&#61;"test_work_queue";public static void main(String[] args) throws IOException, TimeoutException {//获取连接Connection conection &#61; ConnectionUtil.getConection();//获取ChannelChannel channel &#61; conection.createChannel();//声明队列channel.queueDeclare(QUEUE_NAME,false,false,false,null);//每次只分发一个channel.basicQos(1);//定义一个消费者DefaultConsumer consumer &#61; new DefaultConsumer(channel) {//消息到达触发这个方法&#64;Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg&#61;new String(body,"UTF-8");System.out.println("Recv[1] recv msg:"&#43;msg);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}finally {System.out.println("Recv[1] down");//手动回执channel.basicAck(envelope.getDeliveryTag(),false);}}};boolean autoAck&#61;false; //自动应答channel.basicConsume(QUEUE_NAME,autoAck,consumer);}
}

  • 消费者2

package com.test.testboot.mq.workfair;import com.rabbitmq.client.*;
import com.test.testboot.mq.ConnectionUtil;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv2 {private static final String QUEUE_NAME&#61;"test_work_queue";public static void main(String[] args) throws IOException, TimeoutException {//获取连接Connection conection &#61; ConnectionUtil.getConection();//获取Channelfinal Channel channel &#61; conection.createChannel();channel.basicQos(1);//声明队列channel.queueDeclare(QUEUE_NAME,false,false,false,null);//定义一个消费者DefaultConsumer consumer &#61; new DefaultConsumer(channel) {//消息到达触发这个方法&#64;Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg&#61;new String(body,"UTF-8");System.out.println("Recv[2] recv msg:"&#43;msg);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}finally {System.out.println("Recv[2] down");channel.basicAck(envelope.getDeliveryTag(),false);}}};boolean autoAck&#61;false;//自动应答channel.basicConsume(QUEUE_NAME,autoAck,consumer);}
}

模式三&#xff1a;订阅模式(publish/subscribe)

生产者将消息交给交换机
有交换机根据发布订阅的模式设定将消息同步到所有的绑定队列中;
后端的消费者都能拿到消息

应用场景:短信、邮件群发,群聊天,广告


  • 生产者

package com.test.testboot.mq.ps;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.test.testboot.mq.ConnectionUtil;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {/** |--C1* p---exchange----Queue-----|* |--C2*/private static final String EXCHANGE_NAME&#61;"test_exchange_fanout";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取连接Connection conection &#61; ConnectionUtil.getConection();//获取ChannelChannel channel &#61; conection.createChannel();//声明交换机channel.exchangeDeclare(EXCHANGE_NAME,"fanout");String msg&#61;"Hello";System.out.println("Send &#xff1a;"&#43;msg);channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());channel.close();conection.close();}}

  • 消费者1

package com.test.testboot.mq.ps;import com.rabbitmq.client.*;
import com.test.testboot.mq.ConnectionUtil;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv1 {private static final String QUEUE_NAME&#61;"test_queue_fanout_email";private static final String EXCHANGE_NAME&#61;"test_exchange_fanout";public static void main(String[] args) throws IOException, TimeoutException {//获取连接Connection conection &#61; ConnectionUtil.getConection();//获取ChannelChannel channel &#61; conection.createChannel();//声明队列channel.queueDeclare(QUEUE_NAME,false,false,false,null);//绑定交换机channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");//保证一次只分发一个channel.basicQos(1);//定义一个消费者DefaultConsumer consumer &#61; new DefaultConsumer(channel) {//消息到达触发这个方法&#64;Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg&#61;new String(body,"UTF-8");System.out.println("[1] recv msg:"&#43;msg);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}finally {System.out.println("[1] down");channel.basicAck(envelope.getDeliveryTag(),false);}}};boolean autoAck&#61;false;channel.basicConsume(QUEUE_NAME,autoAck,consumer);}
}

  • 消费者2

package com.test.testboot.mq.ps;import com.rabbitmq.client.*;
import com.test.testboot.mq.ConnectionUtil;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv2 {private static final String QUEUE_NAME&#61;"test_queue_fanout_sms";private static final String EXCHANGE_NAME&#61;"test_exchange_fanout";public static void main(String[] args) throws IOException, TimeoutException {//获取连接Connection conection &#61; ConnectionUtil.getConection();//获取ChannelChannel channel &#61; conection.createChannel();//声明队列channel.queueDeclare(QUEUE_NAME,false,false,false,null);//绑定交换机channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");//保证每次只分发一个channel.basicQos(1);//定义一个消费者DefaultConsumer consumer &#61; new DefaultConsumer(channel) {//消息到达触发这个方法&#64;Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg&#61;new String(body,"UTF-8");System.out.println("[2]recv msg:"&#43;msg);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}finally {System.out.println("[2] down");channel.basicAck(envelope.getDeliveryTag(),false);}}};boolean autoAck&#61;true;channel.basicConsume(QUEUE_NAME,autoAck,consumer);}
}

模式四&#xff1a;路由模式&#xff08;Routing&#xff09;

生产者发送消息到交换机,同时绑定一个路由Key,交换机根据路由key对下游绑定的队列进行路
由key的判断,满足路由key的队列才会接收到消息,消费者消费消息

应用场景: 项目中的error报错


  • 生产者

package com.test.testboot.mq.routing;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.test.testboot.mq.ConnectionUtil;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** &#64;description: 路由模式生产者* &#64;author: Mr.ADiao* &#64;create: 2019-10-21 11:04**/
public class Send {private static final String EXCHANGE_NAME&#61;"test_exchange_direct";public static void main(String[] args) throws IOException, TimeoutException {//获取连接Connection conection &#61; ConnectionUtil.getConection();//创建频道Channel channel &#61; conection.createChannel();channel.basicQos(1);//声明交换机channel.exchangeDeclare(EXCHANGE_NAME, "direct");String msg&#61;"Hello Routing";String routingKey&#61;"warning";channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes());System.out.println("send:"&#43;msg);channel.close();conection.close();}
}

  • 消费者1

package com.test.testboot.mq.routing;import com.rabbitmq.client.*;
import com.test.testboot.mq.ConnectionUtil;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv1 {private static final String EXCHANGE_NAME&#61;"test_exchange_direct";private static final String QUEUE_NAME&#61;"test_queue_direct1";public static void main(String[] args) throws IOException, TimeoutException {Connection connection &#61; ConnectionUtil.getConection();Channel channel &#61; connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");channel.basicQos(1);//定义一个消费者DefaultConsumer consumer &#61; new DefaultConsumer(channel) {//消息到达触发这个方法&#64;Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg&#61;new String(body,"UTF-8");System.out.println("[1] recv msg:"&#43;msg);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}finally {System.out.println("[1] down");channel.basicAck(envelope.getDeliveryTag(),false);}}};boolean autoAck&#61;false;channel.basicConsume(QUEUE_NAME,autoAck,consumer);}
}

  • 消费者2

package com.test.testboot.mq.routing;import com.rabbitmq.client.*;
import com.test.testboot.mq.ConnectionUtil;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** &#64;description: 路由模式消费者2* &#64;author: Mr.ADiao* &#64;create: 2019-10-21 11:05**/
public class Recv2 {private static final String EXCHANGE_NAME&#61;"test_exchange_direct";private static final String QUEUE_NAME&#61;"test_queue_direct2";public static void main(String[] args) throws IOException, TimeoutException {Connection connection &#61; ConnectionUtil.getConection();Channel channel &#61; connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");channel.basicQos(1);//定义一个消费者DefaultConsumer consumer &#61; new DefaultConsumer(channel) {//消息到达触发这个方法&#64;Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg&#61;new String(body,"UTF-8");System.out.println("[2] recv msg:"&#43;msg);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}finally {System.out.println("[2] down");channel.basicAck(envelope.getDeliveryTag(),false);}}};boolean autoAck&#61;false;channel.basicConsume(QUEUE_NAME,autoAck,consumer);}
}

模式五&#xff1a;主题模式&#xff08;Topics&#xff09;

topic模式也称为通配符模式&#xff0c;其实他相对于routing模式最大的好处就是他多了一种匹配模式的路由&#xff0c;怎么理解匹配呢&#xff0c;其实就相当于我们之前正则的.*这种&#xff0c;不过他的匹配机制可能不是这种&#xff08;其实除了匹配规则外&#xff0c;他的作用就和routing模式一样 &#xff09;

匹配规则&#xff1a;

绑定键binding key也必须是这种形式。以特定路由键发送的消息将会发送到所有绑定键与之匹配的队列中。但绑定键有两种特殊的情况&#xff1a; 
①*&#xff08;星号&#xff09;仅代表一个单词 
②#&#xff08;井号&#xff09;代表任意个单词

示例&#xff1a;

*.apple.* :  匹配以 任意一个单词字符开头中间包含 .orange. 以任意一个单词字符结尾 的字符串。比如 a.apple.b, asd.apple.qewf 等&#xff08;注意是一个单词&#xff09;。

log.# &#xff1a;只要一lay.开头的都匹配&#xff0c;他可以匹配product.a,product.a.b, product.b.c等。


  • 生产者

package com.adiao.rabbitmq.topics;import java.io.IOException;
import java.util.concurrent.TimeoutException;import com.adiao.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Send {private static final String EXCHANGE_NAME&#61;"test_exchange_topic";public static void main(String[] args) throws IOException, TimeoutException {Connection connection &#61; ConnectionUtils.getConnection();Channel channel &#61; connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "topic");channel.basicQos(1);String msg&#61;"This is Topic Msg";channel.basicPublish(EXCHANGE_NAME, "myTopic.key2", null, msg.getBytes());System.out.println("Send:"&#43;msg);channel.close();connection.close();}}

  • 消费者1

package com.adiao.rabbitmq.topics;import java.io.IOException;
import java.util.concurrent.TimeoutException;import com.adiao.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;public class Recv1 {private static final String EXCHANGE_NAME&#61;"test_exchange_topic";private static final String QUEUE_NAME&#61;"test_queue_topic1";public static void main(String[] args) throws IOException, TimeoutException {Connection connection &#61; ConnectionUtils.getConnection();Channel channel &#61; connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "myTopic.#");channel.basicQos(1);DefaultConsumer consumer &#61; new DefaultConsumer(channel){&#64;Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {System.out.println("[1] Recv:"&#43;new String(body,"UTF-8"));//手动回执channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(QUEUE_NAME, false, consumer);}}

  • 消费者2

package com.adiao.rabbitmq.topics;import java.io.IOException;
import java.util.concurrent.TimeoutException;import com.adiao.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;public class Recv2 {private static final String EXCHANGE_NAME&#61;"test_exchange_topic";private static final String QUEUE_NAME&#61;"test_queue_topic2";public static void main(String[] args) throws IOException, TimeoutException {Connection connection &#61; ConnectionUtils.getConnection();Channel channel &#61; connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "myTopic.key1");channel.basicQos(1);DefaultConsumer consumer &#61; new DefaultConsumer(channel){&#64;Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {System.out.println("[2]Recv:"&#43;new String(body,"UTF-8"));channel.basicAck(envelope.getDeliveryTag(), false);}};channel.basicConsume(QUEUE_NAME, false, consumer);}}

  • 消息确认机制之事务机制

AMQP协议自带机制

弊端&#xff1a;降低RabibtMQ的吞吐量


  • 生产者

package com.adiao.rabbitmq.tx;import java.io.IOException;
import java.util.concurrent.TimeoutException;import com.adiao.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Send {private static final String QUEUE_NAME&#61;"test_queue_tx";public static void main(String[] args) throws IOException, TimeoutException {Connection connection &#61; ConnectionUtils.getConnection();Channel channel &#61; connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);String msg&#61;"This is tx";try {//开启事务channel.txSelect();channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());int i&#61;1/0;System.out.println("Send"&#43;msg);//提交事务channel.txCommit();} catch (Exception e) {e.printStackTrace();//回滚事务channel.txRollback();System.out.println("rollback");}channel.close();connection.close();}}

  • 消费者

package com.adiao.rabbitmq.tx;import java.io.IOException;
import java.util.concurrent.TimeoutException;import com.adiao.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;public class Recv {private static final String QUEUE_NAME &#61; "test_queue_tx";public static void main(String[] args) throws IOException, TimeoutException {Connection connection &#61; ConnectionUtils.getConnection();Channel channel &#61; connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){&#64;Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {System.out.println("Recv:"&#43;new String(body,"UTF-8"));}});}}

  • 消息确认机制之Confirm同步确认

生产者

package com.adiao.rabbitmq.confirm;import java.io.IOException;
import java.util.concurrent.TimeoutException;import com.adiao.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Send {private static final String QUEUE_NAME&#61;"test_queue_confirm";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {Connection connection &#61; ConnectionUtils.getConnection();Channel channel &#61; connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);//单条开启channel.confirmSelect();String msg&#61;"This is Confirm";channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());if(!channel.waitForConfirms()){System.out.println("Send Faile"); }else{System.out.println("Send Success"); }channel.close();connection.close();}}

消费者

package com.adiao.rabbitmq.confirm;import java.io.IOException;
import java.util.concurrent.TimeoutException;import com.adiao.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;public class Recv {private static final String QUEUE_NAME &#61; "test_queue_confirm";public static void main(String[] args) throws IOException, TimeoutException {Connection connection &#61; ConnectionUtils.getConnection();final Channel channel &#61; connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {&#64;Overridepublic void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body) throws IOException {System.out.println("Recv:" &#43; new String(body, "UTF-8"));channel.basicAck(envelope.getDeliveryTag(), false);}});}}

  • 批量确认消费者

package com.adiao.rabbitmq.confirm;import java.io.IOException;
import java.util.concurrent.TimeoutException;import com.adiao.rabbitmq.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class SendManyMsg {private static final String QUEUE_NAME&#61;"test_queue_confirm";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {Connection connection &#61; ConnectionUtils.getConnection();Channel channel &#61; connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);//单条开启channel.confirmSelect();String msg&#61;"This is Confirm";for (int i &#61; 0; i <10; i&#43;&#43;) {if(i&#61;&#61;5){int j&#61;i/0;}channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());}channel.waitForConfirmsOrDie();System.out.println("全部发送完成");channel.close();connection.close();}}

  • 异步确认消费者

    package com.adiao.rabbitmq.confirm;import java.io.IOException;
    import java.util.Date;
    import java.util.concurrent.TimeoutException;import com.adiao.rabbitmq.util.ConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConfirmListener;
    import com.rabbitmq.client.Connection;public class SendAsyn {private static final String QUEUE_NAME&#61;"test_queue_confirm";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {Connection connection &#61; ConnectionUtils.getConnection();Channel channel &#61; connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.confirmSelect();channel.addConfirmListener(new ConfirmListener() {public void handleNack(long deliveryTag, boolean multiple)throws IOException {System.out.println("未确认消息标识&#xff1a;"&#43;deliveryTag);}public void handleAck(long deliveryTag, boolean multiple)throws IOException {System.out.println(String.format("已确认消息标识:%d (%b)", deliveryTag,multiple));}});for (int i &#61; 0; i <10; i&#43;&#43;) {String msg&#61;new Date().getTime()&#43; ":This is Asyn Confirm";System.out.println("Send:"&#43;msg);channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());Thread.sleep(1000);}channel.close();connection.close();}}

     


推荐阅读
  • 线上问题:JavaBean赋值基础类型抛出异常
    1问题复现1.1问题实体(JavaBean规范)赋值时,抛出异常。1.2原因使用基础类型定义属性,当使用null给属 ... [详细]
  • Iamtryingtomakeaclassthatwillreadatextfileofnamesintoanarray,thenreturnthatarra ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • Java自带的观察者模式及实现方法详解
    本文介绍了Java自带的观察者模式,包括Observer和Observable对象的定义和使用方法。通过添加观察者和设置内部标志位,当被观察者中的事件发生变化时,通知观察者对象并执行相应的操作。实现观察者模式非常简单,只需继承Observable类和实现Observer接口即可。详情请参考Java官方api文档。 ... [详细]
  • 面向对象之3:封装的总结及实现方法
    本文总结了面向对象中封装的概念和好处,以及在Java中如何实现封装。封装是将过程和数据用一个外壳隐藏起来,只能通过提供的接口进行访问。适当的封装可以提高程序的理解性和维护性,增强程序的安全性。在Java中,封装可以通过将属性私有化并使用权限修饰符来实现,同时可以通过方法来访问属性并加入限制条件。 ... [详细]
  • 重入锁(ReentrantLock)学习及实现原理
    本文介绍了重入锁(ReentrantLock)的学习及实现原理。在学习synchronized的基础上,重入锁提供了更多的灵活性和功能。文章详细介绍了重入锁的特性、使用方法和实现原理,并提供了类图和测试代码供读者参考。重入锁支持重入和公平与非公平两种实现方式,通过对比和分析,读者可以更好地理解和应用重入锁。 ... [详细]
  • 本文介绍了在Android开发中使用软引用和弱引用的应用。如果一个对象只具有软引用,那么只有在内存不够的情况下才会被回收,可以用来实现内存敏感的高速缓存;而如果一个对象只具有弱引用,不管内存是否足够,都会被垃圾回收器回收。软引用和弱引用还可以与引用队列联合使用,当被引用的对象被回收时,会将引用加入到关联的引用队列中。软引用和弱引用的根本区别在于生命周期的长短,弱引用的对象可能随时被回收,而软引用的对象只有在内存不够时才会被回收。 ... [详细]
  • 在开发中,有时候一个业务上要求的原子操作不仅仅包括数据库,还可能涉及外部接口或者消息队列。此时,传统的数据库事务无法满足需求。本文介绍了Java中如何利用java.lang.Runtime.addShutdownHook方法来保证业务线程的完整性。通过添加钩子,在程序退出时触发钩子,可以执行一些操作,如循环检查某个线程的状态,直到业务线程正常退出,再结束钩子程序。例子程序展示了如何利用钩子来保证业务线程的完整性。 ... [详细]
  • 巧用arguments在Javascript的函数中有个名为arguments的类数组对象。它看起来是那么的诡异而且名不经传,但众多的Javascript库都使用着它强大的功能。所 ... [详细]
  • 基于分布式锁的防止重复请求解决方案
    一、前言关于重复请求,指的是我们服务端接收到很短的时间内的多个相同内容的重复请求。而这样的重复请求如果是幂等的(每次请求的结果都相同,如查 ... [详细]
  • packagecom.huawei.it.citools.utils;importjava.io.File;importjava.io.IOException;importjava ... [详细]
  • 学习mybatis的基础知识:mybatis入门教程(二)
    2019独角兽企业重金招聘Python工程师标准2.3MyBatisprintsql在log4j.properties配置文件中添加如下配置,让mybatis打 ... [详细]
  • springboot日志【转】【补】
     市面上的日志框架日志门面(日志的抽象层)日志实现JCL(JakartaCommonsLogging)(2014)SLF4j(SimpleLoggingFacadeforJava) ... [详细]
  • java日志框架详解
    Java日志框架详解1.常用日志框架1.1Java常用日志框架类别1.2Java常用日志框架历史1.3两大日志接口阵营1.3.1基于CommonsLogging接口实现的常用日志框 ... [详细]
  • springboot项目组引入JMeter的实现步骤
    本文主要介绍了springboot项目组引入JMeter的实现步骤,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的 ... [详细]
author-avatar
幻竞_847
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有