功能描述:生产者将消息发送到队列(队列的名字为hello)中,消费者从队列中获取消息。
二、生产者// 定义队列名 public final static String QUEUE_NAME="rabbitMQ.zhxb"; public void rabbitMQ (){ try { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 设置RabbitMQ参数 factory.setHost("IP"); factory.setUsername("*"); factory.setPassword("*"); factory.setPort(*); // 创建一个连接 Connection cOnnection= factory.newConnection(); // 创建一个通道(大连接中的一根线) Channel channel = connection.createChannel(); // 声明一个队列 // 第一个参数表示队列名称、第二个参数为是否持久化、第三个参数为是否是独占队列、第四个参数为当所有消费者客户端连接断开时是否自动删除队列、第五个参数为队列的其他参数 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定义一个消息 String message = "Hello RabbitMQ"; // 发送消息到队列
// 第一个参数为交换机名称、第二个参数为队列映射的路由key、第三个参数为消息的其他属性、第四个参数为发送信息的主体 channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println("我向"+QUEUE_NAME+"发送一个消息:+'" + message); // 关闭通道和连接 channel.close(); connection.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } }
连接工厂:生产Connection的的工厂
连接:RabbitMQ的socket链接,它封装了socket协议
通道:把Connection比作一条光纤电缆的话,那么Channel信道就比作成光纤电缆中的其中一束光纤
声明队列:rabbitMQ.zhxb,储存消息的容器(非持久化)
发送消息:将消息发送到交换机,然后交换机通过路由策略(规则)将消息路由到匹配的队列中去
步骤详解:// 声明一个队列
// 第一个参数表示队列名称、第二个参数为是否持久化、第三个参数为是否是独占队列、第四个参数为当所有消费者客户端连接断开时是否自动删除队列、第五个参数为队列的其他参数
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
参数详解
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Maparguments);
arguments具体说明
官方解释:
增加解释:
// 发送消息到队列 // 第一个参数为交换机名称、第二个参数为队列映射的路由key、第三个参数为消息的其他属性、第四个参数为发送信息的主体 channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
void basicPublish(String neme, String routingKey, boolean mandatory, boolean immediate, BasicProperties properties, byte[] message) throws IOException;
// 给某个消息设置存在时间 AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().expiration("6000"); channel.basicPublish("", QUEUE_NAME, properties.build(), message.getBytes("UTF-8"));
// 定义队列名 private final static String QUEUE_NAME = "rabbitMQ.zhxb"; public void rabbitMQCustomer() { try { // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 设置RabbitMQ参数 factory.setHost("IP"); factory.setUsername("*"); factory.setPassword("*"); factory.setPort(*); // 创建一个连接 Connection cOnnection= factory.newConnection(); // 创建一个通道 Channel channel = connection.createChannel(); // 声明一个队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println("消费者一,等待消息产生"); // DefaultConsumer类实现了Consumer接口,通过传入一个频道, // 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery // 消费一个消息 Consumer cOnsumer= new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("我是消费者一,我消费了:" + message); try { Thread thread = Thread.currentThread(); thread.sleep(5000);//暂停1.5秒后程序继续执行 } catch (InterruptedException e) { e.printStackTrace(); } } }; // 自动应答回复队列(RabbitMQ中的消息确认机制) channel.basicConsume(QUEUE_NAME, true, consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } }
1.声明一个队列
// 声明一个队列 // 第一个参数表示队列名称、第二个参数为是否持久化、第三个参数为是否是独占队列、第四个参数为当所有消费者客户端连接断开时是否自动删除队列、第五个参数为队列的其他参数 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 规则一单指定,必须保持一致
2.消费一个消息(订阅方式)
// 消费一个消息
Consumer cOnsumer= new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("我消费了:" + message);
}
};
订阅方式:
消费原理:
3.RabbitMQ中的消息确认机制
// 自动应答回复队列(RabbitMQ中的消息确认机制) channel.basicConsume(QUEUE_NAME, true, consumer);
原理: