热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

速入!3W字带你迅速上手MQ

????????关注后回复“进群”,拉你进程序员交流群????????作者丨SoWhat1412来源丨sowhat1412❝高清思维导图已同步Git:h

????????关注后回复 “进群” ,拉你进程序员交流群????????

作者丨SoWhat1412

来源丨sowhat1412

高清思维导图已同步Git:https://github.com/SoWhat1412/xmindfile,关注公众号sowhat1412获取海量资源

1. 消息队列解决了什么问题

消息中间件是目前比较流行的一个中间件,其中RabbitMQ更是占有一定的市场份额,主要用来做异步处理、应用解耦、流量削峰、日志处理等等方面。

1. 异步处理

一个用户登陆网址注册,然后系统发短信跟邮件告知注册成功,一般有三种解决方法。

  1. 串行到依次执行,问题是用户注册后就可以使用了,没必要等验证码跟邮件。

  2. 注册成功后,邮件跟验证码用并行等方式执行,问题是邮件跟验证码是非重要的任务,系统注册还要等这俩完成么?

  3. 基于异步MQ的处理,用户注册成功后直接把信息异步发送到MQ中,然后邮件系统跟验证码系统主动去拉取数据。

2. 应用解耦

比如我们有一个订单系统,还要一个库存系统,用户下订单了就要调用下库存系统来处理,直接调用到话库存系统出现问题咋办呢?

3. 流量削峰

举办一个 秒杀活动,如何较好到设计?服务层直接接受瞬间搞密度访问绝对不可以起码要加入一个MQ。

4. 日志处理

用户通过WebUI访问发送请求到时候后端如何接受跟处理呢一般?

2. RabbitMQ 安装跟配置

官网:https://www.rabbitmq.com/download.html

开发语言:https://www.erlang.org/

正式到安装跟允许需要Erlang跟RabbitMQ俩版本之间相互兼容!我这里图省事直接用Docker 拉取镜像了。下载:开启:管理页面 默认账号:guest  默认密码:guest 。Docker启动时候可以指定账号密码对外端口以及

docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management 

启动:用户添加:vitrual hosts 相当于mysql中的DB。创建一个virtual hosts,一般以/ 开头。对用户进行授权,点击/vhost_mmr,至于WebUI多点点即可了解。

3. 实战

RabbitMQ 官网支持任务模式:https://www.rabbitmq.com/getstarted.htm

l创建Maven项目导入必要依赖:

    com.rabbitmqamqp-client4.0.2org.slf4jslf4j-api1.7.10org.slf4jslf4j-log4j121.7.5log4jlog4j1.2.17junitjunit4.11

0. 获取MQ连接

package com.sowhat.mq.util;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConnectionUtils {/*** 连接器* @return* @throws IOException* @throws TimeoutException*/public static Connection getConnection() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setVirtualHost("/vhost_mmr");factory.setUsername("user_mmr");factory.setPassword("sowhat");Connection connection = factory.newConnection();return connection;}
}

1. 简单队列

P:Producer 消息的生产者 中间:Queue消息队列 C:Consumer 消息的消费者

package com.sowhat.mq.simple;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {public static final String QUEUE_NAME = "test_simple_queue";public static void main(String[] args) throws IOException, TimeoutException {// 获取一个连接Connection connection = ConnectionUtils.getConnection();// 从连接获取一个通道Channel channel = connection.createChannel();// 创建队列声明AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(QUEUE_NAME, false, false, false, null);String msg = "hello Simple";// exchange,队列,参数,消息字节体channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());System.out.println("--send msg:" + msg);channel.close();connection.close();}
}
---
package com.sowhat.mq.simple;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 消费者获取消息*/
public class Recv {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {newApi();oldApi();}private static void newApi() throws IOException, TimeoutException {// 创建连接Connection connection = ConnectionUtils.getConnection();// 创建频道Channel channel = connection.createChannel();// 队列声明  队列名,是否持久化,是否独占模式,无消息后是否自动删除,消息携带参数channel.queueDeclare(Send.QUEUE_NAME,false,false,false,null);// 定义消费者DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {@Override  // 事件模型,消息来了会触发该函数public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s = new String(body, "utf-8");System.out.println("---new api recv:" + s);}};// 监听队列channel.basicConsume(Send.QUEUE_NAME,true,defaultConsumer);}// 老方法 消费者 MQ 在3。4以下 用次方法,private static void oldApi() throws IOException, TimeoutException, InterruptedException {// 创建连接Connection connection = ConnectionUtils.getConnection();// 创建频道Channel channel = connection.createChannel();// 定义队列消费者QueueingConsumer consumer = new QueueingConsumer(channel);//监听队列channel.basicConsume(Send.QUEUE_NAME, true, consumer);while (true) {// 发货体QueueingConsumer.Delivery delivery = consumer.nextDelivery();byte[] body = delivery.getBody();String s = new String(body);System.out.println("---Recv:" + s);}}
}

右上角有可以设置页面刷新频率,然后可以在UI界面直接手动消费掉,如下图:简单队列的不足:耦合性过高,生产者一一对应消费者,如果有多个消费者想消费队列中信息就无法实现了。

2. WorkQueue 工作队列

Simple队列中只能一一对应的生产消费,实际开发中生产者发消息很简单,而消费者要跟业务结合,消费者接受到消息后要处理从而会耗时。「可能会出现队列中出现消息积压」。所以如果多个消费者可以加速消费。

1. round robin 轮询分发

代码编程一个生产者两个消费者:

package com.sowhat.mq.work;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {public static final String  QUEUE_NAME &#61; "test_work_queue";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {// 获取连接Connection connection &#61; ConnectionUtils.getConnection();// 获取 channelChannel channel &#61; connection.createChannel();// 声明队列AMQP.Queue.DeclareOk declareOk &#61; channel.queueDeclare(QUEUE_NAME, false, false, false, null);for (int i &#61; 0; i <50 ; i&#43;&#43;) {String msg &#61; "hello-" &#43; i;System.out.println("WQ send " &#43; msg);channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());Thread.sleep(i*20);}channel.close();connection.close();}
}---
package com.sowhat.mq.work;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv1 {public static void main(String[] args) throws IOException, TimeoutException {// 获取连接Connection connection &#61; ConnectionUtils.getConnection();// 获取通道Channel channel &#61; connection.createChannel();// 声明队列channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);//定义消费者DefaultConsumer consumer &#61; new DefaultConsumer(channel) {&#64;Override // 事件触发机制public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s &#61; new String(body, "utf-8");System.out.println("【1】&#xff1a;" &#43; s);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("【1】 done");}}};boolean autoAck &#61; true;channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);}
}
---
package com.sowhat.mq.work;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv2 {public static void main(String[] args) throws IOException, TimeoutException {// 获取连接Connection connection &#61; ConnectionUtils.getConnection();// 获取通道Channel channel &#61; connection.createChannel();// 声明队列channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);//定义消费者DefaultConsumer consumer &#61; new DefaultConsumer(channel) {&#64;Override // 事件触发机制public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s &#61; new String(body, "utf-8");System.out.println("【2】&#xff1a;" &#43; s);try {Thread.sleep(1000 );} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("【2】 done");}}};boolean autoAck &#61; true;channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);}
}

现象&#xff1a;消费者1 跟消费者2 处理的数据量完全一样的个数&#xff1a;消费者1:处理偶数 消费者2:处理奇数 这种方式叫轮询分发(round-robin)结果就是不管两个消费者谁忙&#xff0c;「数据总是你一个我一个」&#xff0c;MQ 给两个消费发数据的时候是不知道消费者性能的&#xff0c;默认就是雨露均沾。此时 autoAck &#61; true。

2. 公平分发 fair dipatch

如果要实现公平分发&#xff0c;要让消费者消费完毕一条数据后就告知MQ&#xff0c;再让MQ发数据即可。自动应答要关闭&#xff01;

package com.sowhat.mq.work;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {public static final String  QUEUE_NAME &#61; "test_work_queue";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {// 获取连接Connection connection &#61; ConnectionUtils.getConnection();// 获取 channelChannel channel &#61; connection.createChannel();// s声明队列AMQP.Queue.DeclareOk declareOk &#61; channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 每个消费者发送确认消息之前&#xff0c;消息队列不发送下一个消息到消费者&#xff0c;一次只发送一个消息// 从而限制一次性发送给消费者到消息不得超过1个。int perfetchCount &#61; 1;channel.basicQos(perfetchCount);for (int i &#61; 0; i <50 ; i&#43;&#43;) {String msg &#61; "hello-" &#43; i;System.out.println("WQ send " &#43; msg);channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());Thread.sleep(i*20);}channel.close();connection.close();}
}
---
package com.sowhat.mq.work;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv1 {public static void main(String[] args) throws IOException, TimeoutException {// 获取连接Connection connection &#61; ConnectionUtils.getConnection();// 获取通道final Channel channel &#61; connection.createChannel();// 声明队列channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);// 保证一次只分发一个channel.basicQos(1);//定义消费者DefaultConsumer consumer &#61; new DefaultConsumer(channel) {&#64;Override // 事件触发机制public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s &#61; new String(body, "utf-8");System.out.println("【1】&#xff1a;" &#43; s);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("【1】 done");// 手动回执channel.basicAck(envelope.getDeliveryTag(),false);}}};// 自动应答boolean autoAck &#61; false;channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);}
}
---
package com.sowhat.mq.work;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv2 {public static void main(String[] args) throws IOException, TimeoutException {// 获取连接Connection connection &#61; ConnectionUtils.getConnection();// 获取通道final Channel channel &#61; connection.createChannel();// 声明队列channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);// 保证一次只分发一个channel.basicQos(1);//定义消费者DefaultConsumer consumer &#61; new DefaultConsumer(channel) {&#64;Override // 事件触发机制public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s &#61; new String(body, "utf-8");System.out.println("【2】&#xff1a;" &#43; s);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("【2】 done");// 手动回执channel.basicAck(envelope.getDeliveryTag(),false);}}};// 自动应答boolean autoAck &#61; false;channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);}
}

结果&#xff1a;实现了公平分发&#xff0c;消费者2 是消费者1消费数量的2倍。

3. publish/subscribe 发布订阅模式

类似公众号的订阅跟发布&#xff0c;无需指定routingKey&#xff1a;

解读&#xff1a;

  1. 一个生产者多个消费者

  2. 每一个消费者都有一个自己的队列

  3. 生产者没有把消息直接发送到队列而是发送到了交换机转化器(exchange)

  4. 每一个队列都要绑定到交换机上。

  5. 生产者发送的消息经过交换机到达队列&#xff0c;从而实现一个消息被多个消费者消费。

生产者&#xff1a;

package com.sowhat.mq.ps;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {public static final String EXCHANGE_NAME &#61; "test_exchange_fanout";public static void main(String[] args) throws IOException, TimeoutException {Connection connection &#61; ConnectionUtils.getConnection();Channel channel &#61; connection.createChannel();//声明交换机channel.exchangeDeclare(EXCHANGE_NAME,"fanout");// 分发&#61; fanout// 发送消息String msg &#61; "hello ps ";channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());System.out.println("Send:" &#43; msg);channel.close();connection.close();}
}

消息哪儿去了&#xff1f;丢失了&#xff0c;在RabbitMQ中只有队列有存储能力&#xff0c;「因为这个时候队列还没有绑定到交换机 所以消息丢失了」。消费者&#xff1a;

package com.sowhat.mq.ps;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv1 {public static final String  QUEUE_NAME &#61; "test_queue_fanout_email";public static final String EXCHANGE_NAME &#61; "test_exchange_fanout";public static void main(String[] args) throws IOException, TimeoutException {Connection connection &#61; ConnectionUtils.getConnection();final Channel channel &#61; connection.createChannel();// 队列声明channel.queueDeclare(QUEUE_NAME,false,false,false,null);// 绑定队列到交换机转发器channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"" );// 保证一次只分发一个channel.basicQos(1);//定义消费者DefaultConsumer consumer &#61; new DefaultConsumer(channel) {&#64;Override // 事件触发机制public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s &#61; new String(body, "utf-8");System.out.println("【1】&#xff1a;" &#43; s);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("【1】 done");// 手动回执channel.basicAck(envelope.getDeliveryTag(),false);}}};// 自动应答boolean autoAck &#61; false;channel.basicConsume(QUEUE_NAME, autoAck, consumer);}
}
---
package com.sowhat.mq.ps;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv2 {public static final String  QUEUE_NAME &#61; "test_queue_fanout_sms";public static final String EXCHANGE_NAME &#61; "test_exchange_fanout";public static void main(String[] args) throws IOException, TimeoutException {Connection connection &#61; ConnectionUtils.getConnection();final Channel channel &#61; connection.createChannel();// 队列声明channel.queueDeclare(QUEUE_NAME,false,false,false,null);// 绑定队列到交换机转发器channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"" );// 保证一次只分发一个channel.basicQos(1);//定义消费者DefaultConsumer consumer &#61; new DefaultConsumer(channel) {&#64;Override // 事件触发机制public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s &#61; new String(body, "utf-8");System.out.println("【2】&#xff1a;" &#43; s);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("【2】 done");// 手动回执channel.basicAck(envelope.getDeliveryTag(),false);}}};// 自动应答boolean autoAck &#61; false;channel.basicConsume(QUEUE_NAME, autoAck, consumer);}
}

「同时还可以自己手动的添加一个队列监控到该exchange」

4. routing 路由选择 通配符模式

Exchange(交换机&#xff0c;转发器)&#xff1a;「一方面接受生产者消息&#xff0c;另一方面是向队列推送消息」。匿名转发用 ""  表示&#xff0c;比如前面到简单队列跟WorkQueue。fanout&#xff1a;不处理路由键。「不需要指定routingKey」&#xff0c;我们只需要把队列绑定到交换机&#xff0c; 「消息就会被发送到所有到队列中」direct&#xff1a;处理路由键&#xff0c;「需要指定routingKey」&#xff0c;此时生产者发送数据到时候会指定key&#xff0c;任务队列也会指定key&#xff0c;只有key一样消息才会被传送到队列中。如下图

package com.sowhat.mq.routing;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {public static final String  EXCHANGE_NAME &#61; "test_exchange_direct";public static void main(String[] args) throws IOException, TimeoutException {Connection connection &#61; ConnectionUtils.getConnection();Channel channel &#61; connection.createChannel();// exchangechannel.exchangeDeclare(EXCHANGE_NAME,"direct");String msg &#61; "hello info!";// 可以指定类型String routingKey &#61; "info";channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes());System.out.println("Send : " &#43; msg);channel.close();connection.close();}
}
---
package com.sowhat.mq.routing;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv1 {public static final String  EXCHANGE_NAME &#61; "test_exchange_direct";public static final String QUEUE_NAME &#61; "test_queue_direct_1";public static void main(String[] args) throws IOException, TimeoutException {Connection connection &#61; ConnectionUtils.getConnection();final Channel channel &#61; connection.createChannel();channel.queueDeclare(QUEUE_NAME,false,false,false,null);channel.basicQos(1);channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");//定义消费者DefaultConsumer consumer &#61; new DefaultConsumer(channel) {&#64;Override // 事件触发机制public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s &#61; new String(body, "utf-8");System.out.println("【1】&#xff1a;" &#43; s);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("【1】 done");// 手动回执channel.basicAck(envelope.getDeliveryTag(),false);}}};// 自动应答boolean autoAck &#61; false;channel.basicConsume(QUEUE_NAME, autoAck, consumer);}
}
---
package com.sowhat.mq.routing;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv2 {public static final String EXCHANGE_NAME &#61; "test_exchange_direct";public static final String QUEUE_NAME &#61; "test_queue_direct_2";public static void main(String[] args) throws IOException, TimeoutException {Connection connection &#61; ConnectionUtils.getConnection();final Channel channel &#61; connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.basicQos(1);// 绑定种类似 Keychannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");//定义消费者DefaultConsumer consumer &#61; new DefaultConsumer(channel) {&#64;Override // 事件触发机制public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s &#61; new String(body, "utf-8");System.out.println("【2】&#xff1a;" &#43; s);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("【2】 done");// 手动回执channel.basicAck(envelope.getDeliveryTag(), false);}}};// 自动应答boolean autoAck &#61; false;channel.basicConsume(QUEUE_NAME, autoAck, consumer);}
}

WebUI:缺点&#xff1a;路由key必须要明确&#xff0c;无法实现规则性模糊匹配。

5. Topics 主题

将路由键跟某个模式匹配&#xff0c;# 表示匹配 >&#61;1个字符&#xff0c; *表示匹配一个。生产者会带routingKey&#xff0c;但是消费者的MQ会带模糊routingKey。商品&#xff1a;发布、删除、修改、查询。

package com.sowhat.mq.topic;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {public static final String EXCHANGE_NAME &#61; "test_exchange_topic";public static void main(String[] args) throws IOException, TimeoutException {Connection connection &#61; ConnectionUtils.getConnection();Channel channel &#61; connection.createChannel();// exchangechannel.exchangeDeclare(EXCHANGE_NAME, "topic");String msg &#61; "商品!";// 可以指定类型String routingKey &#61; "goods.find";channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());System.out.println("Send : " &#43; msg);channel.close();connection.close();}
}
---
package com.sowhat.mq.topic;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv1 {public static final String  EXCHANGE_NAME &#61; "test_exchange_topic";public static final String QUEUE_NAME &#61; "test_queue_topic_1";public static void main(String[] args) throws IOException, TimeoutException {Connection connection &#61; ConnectionUtils.getConnection();final Channel channel &#61; connection.createChannel();channel.queueDeclare(QUEUE_NAME,false,false,false,null);channel.basicQos(1);channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.add");//定义消费者DefaultConsumer consumer &#61; new DefaultConsumer(channel) {&#64;Override // 事件触发机制public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s &#61; new String(body, "utf-8");System.out.println("【1】&#xff1a;" &#43; s);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("【1】 done");// 手动回执channel.basicAck(envelope.getDeliveryTag(),false);}}};// 自动应答boolean autoAck &#61; false;channel.basicConsume(QUEUE_NAME, autoAck, consumer);}
}
---
package com.sowhat.mq.topic;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv2 {public static final String EXCHANGE_NAME &#61; "test_exchange_topic";public static final String QUEUE_NAME &#61; "test_queue_topic_2";public static void main(String[] args) throws IOException, TimeoutException {Connection connection &#61; ConnectionUtils.getConnection();final Channel channel &#61; connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.basicQos(1);// 此乃重点channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");//定义消费者DefaultConsumer consumer &#61; new DefaultConsumer(channel) {&#64;Override // 事件触发机制public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String s &#61; new String(body, "utf-8");System.out.println("【2】&#xff1a;" &#43; s);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("【2】 done");// 手动回执channel.basicAck(envelope.getDeliveryTag(), false);}}};// 自动应答boolean autoAck &#61; false;channel.basicConsume(QUEUE_NAME, autoAck, consumer);}
}

6. MQ的持久化跟非持久化

因为消息在内存中&#xff0c;如果MQ挂了那么消息也丢失了&#xff0c;所以应该考虑MQ的持久化。MQ是支持持久化的&#xff0c;

// 声明队列
channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);/*** Declare a queue* &#64;see com.rabbitmq.client.AMQP.Queue.Declare* &#64;see com.rabbitmq.client.AMQP.Queue.DeclareOk* &#64;param queue the name of the queue* &#64;param durable true if we are declaring a durable queue (the queue will survive a server restart)* &#64;param exclusive true if we are declaring an exclusive queue (restricted to this connection)* &#64;param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)* &#64;param arguments other properties (construction arguments) for the queue* &#64;return a declaration-confirm method to indicate the queue was successfully declared* &#64;throws java.io.IOException if an error is encountered*/Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map arguments) throws IOException;

boolean durable就是表明是否可以持久化&#xff0c;如果我们将程序中的durable &#61; false改为true是不可以的&#xff01;因为我们已经定义过的test_work_queue&#xff0c;这个queue已声明为未持久化的。结论&#xff1a;MQ 不允许修改一个已经存在的队列参数。

7. 消费者端手动跟自动确认消息

// 自动应答boolean autoAck &#61; false;channel.basicConsume(Send.QUEUE_NAME, autoAck, consumer);

当MQ发送数据个消费者后&#xff0c;消费者要对收到对信息应答给MQ。

如果autoAck &#61; true 表示「自动确认模式」&#xff0c;一旦MQ把消息分发给消费者就会把消息从内存中删除。如果消费者收到消息但是还没有消费完而MQ中数据已删除则会导致丢失了正在处理对消息。

如果autoAck &#61; false表示「手动确认模式」&#xff0c;如果有个消费者挂了&#xff0c;MQ因为没有收到回执信息可以把该信息再发送给其他对消费者。

MQ支持消息应答(Message acknowledgement)&#xff0c;消费者发送一个消息应答告诉MQ这个消息已经被消费了&#xff0c;MQ才从内存中删除。消息应答模式「默认为 false」

8. RabbitMQ生产者端消息确认机制(事务 &#43; confirm)

在RabbitMQ中我们可以通过持久化来解决MQ服务器异常的数据丢失问题&#xff0c;但是「生产者如何确保数据发送到MQ了」&#xff1f;默认情况下生产者也是不知道的。如何解决 呢&#xff1f;

1. AMQP事务

第一种方式AMQP实现了事务机制&#xff0c;类似mysql的事务机制。txSelect&#xff1a;用户将当前channel设置为transition模式。txCommit&#xff1a;用于提交事务。txRollback&#xff1a;用于回滚事务。

以上都是对生产者对操作。

package com.sowhat.mq.tx;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class TxSend {public static final String QUEUE_NAME &#61; "test_queue_tx";public static void main(String[] args) throws IOException, TimeoutException {Connection connection &#61; ConnectionUtils.getConnection();Channel channel &#61; connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);String msg &#61; "hello tx message";try {//开启事务模式channel.txSelect();channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());int x &#61; 1 / 0;// 提交事务channel.txCommit();} catch (IOException e) {// 回滚channel.txRollback();System.out.println("send message rollback");} finally {channel.close();connection.close();}}
}
---
package com.sowhat.mq.tx;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class TxRecv {public static final String QUEUE_NAME &#61; "test_queue_tx";public static void main(String[] args) throws IOException, TimeoutException {Connection connection &#61; ConnectionUtils.getConnection();Channel channel &#61; connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);String s &#61; channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {&#64;Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("recv[tx] msg:" &#43; new String(body, "utf-8"));}});channel.close();connection.close();}
}

缺点就是大量对请求尝试然后失败然后回滚&#xff0c;会降低MQ的吞吐量。

2. Confirm模式。

「生产者端confirm实现原理」生产者将信道设置为confirm模式&#xff0c;一旦信道进入了confirm模式&#xff0c;所以该信道上发布的信息都会被派一个唯一的ID(从1开始)&#xff0c;一旦消息被投递到所有的匹配队列后&#xff0c;Broker就回发送一个确认给生产者(包含消息唯一ID)&#xff0c;这就使得生产者知道消息已经正确到达目的队列了&#xff0c;如果消息跟队列是可持久化的&#xff0c;那么确认消息会在消息写入到磁盘后才发出。broker回传给生产者到确认消息中deliver-tag域包含了确认消息到序列号&#xff0c;此外broker也可以设置basic.ack的multiple域&#xff0c;表示这个序列号之前所以信息都已经得到处理。

Confirm模式最大的好处在于是异步的。第一条消息发送后不用一直等待回复后才发第二条消息。

开启confirm模式&#xff1a;channel.confimSelect()编程模式&#xff1a;

1. 普通的发送一个消息后就 waitForConfirms()

package com.sowhat.confirm;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send1 {public static final String QUEUE_NAME &#61; "test_queue_confirm1";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {Connection connection &#61; ConnectionUtils.getConnection();Channel channel &#61; connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 将channel模式设置为 confirm模式&#xff0c;注意设置这个不能设置为事务模式。channel.confirmSelect();String msg &#61; "hello confirm message";channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());if (!channel.waitForConfirms()) {System.out.println("消息发送失败");} else {System.out.println("消息发送OK");}channel.close();connection.close();}
}
---
package com.sowhat.confirm;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv {public static final String QUEUE_NAME &#61; "test_queue_confirm1";public static void main(String[] args) throws IOException, TimeoutException {Connection connection &#61; ConnectionUtils.getConnection();Channel channel &#61; connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);String s &#61; channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {&#64;Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("recv[tx] msg:" &#43; new String(body, "utf-8"));}});}
}

2. 批量的发一批数据 waitForConfirms()

package com.sowhat.confirm;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send2 {public static final String QUEUE_NAME &#61; "test_queue_confirm1";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {Connection connection &#61; ConnectionUtils.getConnection();Channel channel &#61; connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 将channel模式设置为 confirm模式&#xff0c;注意设置这个不能设置为事务模式。channel.confirmSelect();String msg &#61; "hello confirm message";// 批量发送for (int i &#61; 0; i < 10; i&#43;&#43;) {channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());}// 确认if (!channel.waitForConfirms()) {System.out.println("消息发送失败");} else {System.out.println("消息发送OK");}channel.close();connection.close();}
}
---
接受信息跟上面一样

3. 异步confirm模式&#xff0c;提供一个回调方法。

Channel对象提供的ConfirmListener()回调方法只包含deliveryTag(包含当前发出消息序号)&#xff0c;我们需要自己为每一个Channel维护一个unconfirm的消息序号集合&#xff0c;每publish一条数据&#xff0c;集合中元素加1&#xff0c;每回调一次handleAck方法&#xff0c;unconfirm集合删掉响应的一条(multiple&#61;false)或多条(multiple&#61;true)记录&#xff0c;从运行效率来看&#xff0c;unconfirm集合最好采用有序集合SortedSet存储结构。

package com.sowhat.mq.confirm;import com.rabbitmq.client.*;
import com.sowhat.mq.util.ConnectionUtils;import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;public class Send3 {public static final String QUEUE_NAME &#61; "test_queue_confirm3";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {Connection connection &#61; ConnectionUtils.getConnection();Channel channel &#61; connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);//生产者调用confirmSelectchannel.confirmSelect();// 存放未确认消息final SortedSet confirmSet &#61; Collections.synchronizedSortedSet(new TreeSet());// 添加监听通道channel.addConfirmListener(new ConfirmListener() {// 回执有问题的public void handleAck(long deliveryTag, boolean multiple) throws IOException {if (multiple) {System.out.println("--handleNack---multiple");confirmSet.headSet(deliveryTag &#43; 1).clear();} else {System.out.println("--handleNack-- multiple false");confirmSet.remove(deliveryTag);}}// 没有问题的handleAckpublic void handleNack(long deliveryTag, boolean multiple) throws IOException {if (multiple) {System.out.println("--handleAck---multiple");confirmSet.headSet(deliveryTag &#43; 1).clear();} else {System.out.println("--handleAck--multiple false");confirmSet.remove(deliveryTag);}}});// 一般情况下是先开启 消费者&#xff0c;指定好 exchange跟routingkey&#xff0c;如果生产者等routingkey 就会触发这个return 方法channel.addReturnListener(new ReturnListener() {public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("---- handle return----");System.out.println("replyCode:" &#43; replyCode );System.out.println("replyText:" &#43;replyText );System.out.println("exchange:" &#43; exchange);System.out.println("routingKey:" &#43; routingKey);System.out.println("properties:" &#43; properties);System.out.println("body:" &#43; new String(body));}});String msgStr &#61; "sssss";while(true){long nextPublishSeqNo &#61; channel.getNextPublishSeqNo();channel.basicPublish("",QUEUE_NAME,null,msgStr.getBytes());confirmSet.add(nextPublishSeqNo);Thread.sleep(1000);}}
}

总结&#xff1a;AMQP模式相对来说没Confirm模式性能好些&#xff0c;推荐使用后者。

9. RabbitMQ延迟队列 跟死信

淘宝订单付款&#xff0c;验证码等限时类型服务。

        Map headers &#61;  new HashMap();headers.put("my1","111");headers.put("my2","222");AMQP.BasicProperties build &#61; new AMQP.BasicProperties().builder().deliveryMode(2).contentEncoding("utf-8").expiration("10000").headers(headers).build();

死信的处理&#xff1a;

10. SpringBoot Tpoic Demo

需求图&#xff1a;新建SpringBoot 项目添加如下依赖&#xff1a;

       org.springframework.bootspring-boot-starter-amqp

1. 生产者

application.yml

spring:rabbitmq:host: 127.0.0.1username: adminpassword: admin

测试用例&#xff1a;

package com.sowhat.mqpublisher;import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;&#64;SpringBootTest
class MqpublisherApplicationTests {&#64;Autowiredprivate AmqpTemplate amqpTemplate;&#64;Testvoid userInfo() {/*** exchange,routingKey,message*/this.amqpTemplate.convertAndSend("log.topic","user.log.error","Users...");}
}

2. 消费者

application.xml

spring:rabbitmq:host: 127.0.0.1username: adminpassword: admin# 自定义配置
mq:config:exchange_name: log.topic# 配置队列名称queue_name:info: log.infoerror: log.errorlogs: log.logs

三个不同的消费者&#xff1a;

package com.sowhat.mqconsumer.service;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;/*** &#64;QueueBinding value属性&#xff1a;用于绑定一个队列。&#64;Queue去查找一个名字为value属性中的值得队列&#xff0c;如果没有则创建&#xff0c;如果有则返回* type &#61; ExchangeTypes.TOPIC 指定交换器类型。默认的direct交换器*/
&#64;Service
public class ErrorReceiverService {/*** 把一个方法跟一个队列进行绑定&#xff0c;收到消息后绑定给msg*/&#64;RabbitListener(bindings &#61; &#64;QueueBinding(value &#61; &#64;Queue(value &#61; "${mq.config.queue_name.error}"),exchange &#61; &#64;Exchange(value &#61; "${mq.config.exchange_name}", type &#61; ExchangeTypes.TOPIC),key &#61; "*.log.error"))public void process(String msg) {System.out.println(msg &#43; " Logs...........");}
}
---
package com.sowhat.mqconsumer.service;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;/*** &#64;QueueBinding value属性&#xff1a;用于绑定一个队列。* &#64;Queue去查找一个名字为value属性中的值得队列&#xff0c;如果没有则创建&#xff0c;如果有则返回*/
&#64;Service
public class InfoReceiverService {/*** 添加一个能够处理消息的方法*/&#64;RabbitListener(bindings &#61; &#64;QueueBinding(value &#61; &#64;Queue(value &#61;"${mq.config.queue_name.info}"),exchange &#61; &#64;Exchange(value &#61; "${mq.config.exchange_name}",type &#61; ExchangeTypes.TOPIC),key &#61; "*.log.info"))public void process(String msg){System.out.println(msg&#43;" Info...........");}
}
--
package com.sowhat.mqconsumer.service;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;/*** &#64;QueueBinding value属性&#xff1a;用于绑定一个队列。* &#64;Queue去查找一个名字为value属性中的值得队列&#xff0c;如果没有则创建&#xff0c;如果有则返回*/
&#64;Service
public class LogsReceiverService {/*** 添加一个能够处理消息的方法*/&#64;RabbitListener(bindings &#61; &#64;QueueBinding(value &#61; &#64;Queue(value &#61;"${mq.config.queue_name.logs}"),exchange &#61; &#64;Exchange(value &#61; "${mq.config.exchange_name}",type &#61; ExchangeTypes.TOPIC),key &#61; "*.log.*"))public void process(String msg){System.out.println(msg&#43;" Error...........");}
}

详细安装跟代码看参考下载&#xff1a;

总结

如果需要指定模式一般是在消费者端设置&#xff0c;灵活性调节。

模式生产者Queue生产者exchange生产者routingKey消费者exchange消费者queueroutingKey
Simple(简单模式少用)指定不指定不指定不指定指定不指定
WorkQueue(多个消费者少用)指定不指定不指定不指定指定不指定
fanout(publish/subscribe模式)不指定指定不指定指定指定不指定
direct(路由模式)不指定指定指定指定指定消费者routingKey精确指定多个
topic(主题模糊匹配)不指定指定指定指定指定消费者routingKey可以进行模糊匹配

-End-

最近有一些小伙伴&#xff0c;让我帮忙找一些 面试题 资料&#xff0c;于是我翻遍了收藏的 5T 资料后&#xff0c;汇总整理出来&#xff0c;可以说是程序员面试必备&#xff01;所有资料都整理到网盘了&#xff0c;欢迎下载&#xff01;

点击????卡片&#xff0c;关注后回复【面试题】即可获取

在看点这里好文分享给更多人↓↓


推荐阅读
  • baresip android编译、运行教程1语音通话
    本文介绍了如何在安卓平台上编译和运行baresip android,包括下载相关的sdk和ndk,修改ndk路径和输出目录,以及创建一个c++的安卓工程并将目录考到cpp下。详细步骤可参考给出的链接和文档。 ... [详细]
  • Java验证码——kaptcha的使用配置及样式
    本文介绍了如何使用kaptcha库来实现Java验证码的配置和样式设置,包括pom.xml的依赖配置和web.xml中servlet的配置。 ... [详细]
  • MySQL中的MVVC多版本并发控制机制的应用及实现
    本文介绍了MySQL中MVCC的应用及实现机制。MVCC是一种提高并发性能的技术,通过对事务内读取的内存进行处理,避免写操作堵塞读操作的并发问题。与其他数据库系统的MVCC实现机制不尽相同,MySQL的MVCC是在undolog中实现的。通过undolog可以找回数据的历史版本,提供给用户读取或在回滚时覆盖数据页上的数据。MySQL的大多数事务型存储引擎都实现了MVCC,但各自的实现机制有所不同。 ... [详细]
  • 微软头条实习生分享深度学习自学指南
    本文介绍了一位微软头条实习生自学深度学习的经验分享,包括学习资源推荐、重要基础知识的学习要点等。作者强调了学好Python和数学基础的重要性,并提供了一些建议。 ... [详细]
  • 在Docker中,将主机目录挂载到容器中作为volume使用时,常常会遇到文件权限问题。这是因为容器内外的UID不同所导致的。本文介绍了解决这个问题的方法,包括使用gosu和suexec工具以及在Dockerfile中配置volume的权限。通过这些方法,可以避免在使用Docker时出现无写权限的情况。 ... [详细]
  • EPICS Archiver Appliance存储waveform记录的尝试及资源需求分析
    本文介绍了EPICS Archiver Appliance存储waveform记录的尝试过程,并分析了其所需的资源容量。通过解决错误提示和调整内存大小,成功存储了波形数据。然后,讨论了储存环逐束团信号的意义,以及通过记录多圈的束团信号进行参数分析的可能性。波形数据的存储需求巨大,每天需要近250G,一年需要90T。然而,储存环逐束团信号具有重要意义,可以揭示出每个束团的纵向振荡频率和模式。 ... [详细]
  • 云原生边缘计算之KubeEdge简介及功能特点
    本文介绍了云原生边缘计算中的KubeEdge系统,该系统是一个开源系统,用于将容器化应用程序编排功能扩展到Edge的主机。它基于Kubernetes构建,并为网络应用程序提供基础架构支持。同时,KubeEdge具有离线模式、基于Kubernetes的节点、群集、应用程序和设备管理、资源优化等特点。此外,KubeEdge还支持跨平台工作,在私有、公共和混合云中都可以运行。同时,KubeEdge还提供数据管理和数据分析管道引擎的支持。最后,本文还介绍了KubeEdge系统生成证书的方法。 ... [详细]
  • 如何去除Win7快捷方式的箭头
    本文介绍了如何去除Win7快捷方式的箭头的方法,通过生成一个透明的ico图标并将其命名为Empty.ico,将图标复制到windows目录下,并导入注册表,即可去除箭头。这样做可以改善默认快捷方式的外观,提升桌面整洁度。 ... [详细]
  • 如何用UE4制作2D游戏文档——计算篇
    篇首语:本文由编程笔记#小编为大家整理,主要介绍了如何用UE4制作2D游戏文档——计算篇相关的知识,希望对你有一定的参考价值。 ... [详细]
  • 使用在线工具jsonschema2pojo根据json生成java对象
    本文介绍了使用在线工具jsonschema2pojo根据json生成java对象的方法。通过该工具,用户只需将json字符串复制到输入框中,即可自动将其转换成java对象。该工具还能解析列表式的json数据,并将嵌套在内层的对象也解析出来。本文以请求github的api为例,展示了使用该工具的步骤和效果。 ... [详细]
  • 推荐系统遇上深度学习(十七)详解推荐系统中的常用评测指标
    原创:石晓文小小挖掘机2018-06-18笔者是一个痴迷于挖掘数据中的价值的学习人,希望在平日的工作学习中,挖掘数据的价值, ... [详细]
  • XML介绍与使用的概述及标签规则
    本文介绍了XML的基本概念和用途,包括XML的可扩展性和标签的自定义特性。同时还详细解释了XML标签的规则,包括标签的尖括号和合法标识符的组成,标签必须成对出现的原则以及特殊标签的使用方法。通过本文的阅读,读者可以对XML的基本知识有一个全面的了解。 ... [详细]
  • 本文介绍了在使用TortoiseSVN的Repo-browser浏览SVN时出现的错误,以及解决方法。文章提到了卸载当前版本、安装较低版本、使用更下一层的路径等解决方案。同时指出该问题可能是客户端与SVN服务端不匹配造成的,且服务端无法升级更高的SVN版本。 ... [详细]
  • FeatureRequestIsyourfeaturerequestrelatedtoaproblem?Please ... [详细]
  • 李逍遥寻找仙药的迷阵之旅
    本文讲述了少年李逍遥为了救治婶婶的病情,前往仙灵岛寻找仙药的故事。他需要穿越一个由M×N个方格组成的迷阵,有些方格内有怪物,有些方格是安全的。李逍遥需要避开有怪物的方格,并经过最少的方格,找到仙药。在寻找的过程中,他还会遇到神秘人物。本文提供了一个迷阵样例及李逍遥找到仙药的路线。 ... [详细]
author-avatar
frank52_445
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有