消息队列(Message Queue,简称 MQ)是阿里巴巴集团中间件技术部自主研发的专业消息中间件。产品基于高可用分布式集群技术,提供消息发布订阅、消息轨迹查询、定时(延时)消息、资源统计、监控报警等一系列消息云服务,是企业级互联网架构的核心产品。MQ 历史超过9年,为分布式应用系统提供异步解耦、削峰填谷的能力,同时具备海量消息堆积、高吞吐、可靠重试等互联网应用所需的特性,是阿里巴巴双11使用的核心产品。
阿里云官方文档及demo是整合Spring,使用的是xml配置,在这里,我结合官方文档,在Spring Boot框架下整合MQ,由于在网上相关文档及demo不是很多,所以有很多不足的地方,如果有更好的建议,还请多多指教。
基础的项目搭建在这里就不多说了,在网上也有很多教程,就直接进入正题了。
我把MQ相关的参数写在Application.properties中
#aliyun MQ config
producerId=*** //替换为自己的账号
consumerId=***
accessKey=***
secretKey=***
tag=***
topic=***
onsAddr=http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
sendMsgTimeoutMillis=3000
suspendTimeMillis=100
maxReconsumeTimes=20
@Configuration标注在类上,相当于把该类作为spring的xml配置文件中的,作用为:配置spring容器(应用上下文)
@Bean标注在方法上(返回某个实例的方法),等价于spring的xml配置文件中的,作用为:注册bean对象
@Configuration
public class AliMQConfig {@Value("${producerId}")public String producerId;@Value("${consumerId}")public String consumerId;@Value("${accessKey}")public String accessKey;@Value("${secretKey}")public String secretKey;@Value("${topic}")public String topic;@Value("${tag}")public String tag;@Value("${onsAddr}")public String onsAddr;//超时时间@Value("${sendMsgTimeoutMillis}")public String sendMsgTimeoutMillis;@Value("${suspendTimeMillis}")public String suspendTimeMillis;@Value("${maxReconsumeTimes}")public String maxReconsumeTimes;@Bean(initMethod = "start", destroyMethod = "shutdown")public ProducerBean getProducer() {ProducerBean producerBean = new ProducerBean();Properties properties = new Properties();properties.put(PropertyKeyConst.ProducerId, producerId);// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建properties.put(PropertyKeyConst.AccessKey, accessKey);// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建properties.put(PropertyKeyConst.SecretKey, secretKey);properties.put(PropertyKeyConst.SendMsgTimeoutMillis, sendMsgTimeoutMillis);properties.put(PropertyKeyConst.ONSAddr, onsAddr);producerBean.setProperties(properties);return producerBean;}@Bean(initMethod = "start", destroyMethod = "shutdown")public ConsumerBean getConsumer() {ConsumerBean consumerBean = new ConsumerBean();Properties properties = new Properties();properties.put(PropertyKeyConst.ConsumerId, consumerId);// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建properties.put(PropertyKeyConst.AccessKey, accessKey);// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建properties.put(PropertyKeyConst.SecretKey, secretKey);properties.put(PropertyKeyConst.SuspendTimeMillis, suspendTimeMillis);properties.put(PropertyKeyConst.MaxReconsumeTimes, maxReconsumeTimes);properties.put(PropertyKeyConst.ONSAddr, onsAddr);consumerBean.setProperties(properties);Subscription subscription = new Subscription();subscription.setTopic(topic);subscription.setExpression(tag);Map
这个类相当于官方demo里的xml配置,由于SpringBoot旨在减少xml配置,所以在这里就把xml配置转换为java代码。
下面是生产者发送消息:
@Component
public class AliMQUtil {private static final Logger logger = LoggerFactory.getLogger(AliMQUtil.class);@Autowiredprivate AliMQConfig aliMQConfig;@Value("${topic}")public String topic;// 发送消息public void sendMessage(String tag, String key, byte[] body) {Producer producer = aliMQConfig.getProducer();Message msg = new Message(topic, tag, body);//msg.setKey(key);try {SendResult sendResult = producer.send(msg);if (sendResult != null) {logger.info("消息发送成功:" + sendResult.toString());}} catch (ONSClientException e) {logger.info("消息发送失败:", e);// 出现异常意味着发送失败,为了避免消息丢失,建议缓存该消息然后进行重试。}}
}
消费者监听:
public class AliMQConsumerListener implements MessageListener {private static final Logger logger = LoggerFactory.getLogger(AliMQConsumerListener.class);@Overridepublic Action consume(Message message, ConsumeContext context) {String msg = "";try {//do something..msg = new String(message.getBody(), "UTF-8");logger.info("订阅消息:" + msg);return Action.CommitMessage;} catch (Exception e) {//消费失败logger.info("消费失败:" + msg);return Action.ReconsumeLater;}}
当项目启动,消费者就开始监听消息,实测,项目启动后台就会打印消息日志。
由于官方文档中说明MQ 的消费者和生产者客户端对象是线程安全的,可以在多个线程之间共享使用,所以这里配置的是跟随项目启动,创建一个实例,多线程之间可以共享这一实例。由于对多线程的理解不是很深刻,如果不当,还请指正。
以上就是我在Spring Boot中对于MQ的整合方案,有很多不足的地方,希望和大家一起交流学习。
针对用户在移动互联网以及物联网领域的存在的特殊消息传输需求,MQ 开放了 MQTT 协议的完整支持。
消息队列遥测传输(Message Queuing Telemetry Transport,简称 MQTT)是一种轻量的,基于发布订阅模型的即时通讯协议。该协议设计开放,协议简单,平台支持丰富,几乎可以把所有联网物品和外部连接起来,因此在移动互联网和物联网领域拥有众多优势。
协议的特点包括:
使用发布/订阅消息模式,提供一对多的消息分发,解除了应用程序之间的耦合;
对负载内容屏蔽的消息传输;
使用 TCP/IP 提供基础的网络连接;
有三种级别的消息传递服务;
小型传输,开销很小(固定长度的头部是2字节),协议交换最小化,以降低网络流量。
客户端分布:
使用 MQTT 协议的客户端作为移动端接入 MQ,一般分布在公网环境,比如嵌入式设备、移动手机、平板、浏览器之类的平台上。
使用 MQ 协议的客户端一般作为业务上的服务端接入 MQ,应该部署在阿里云的 ECS 环境。
MQ服务端:MQ发送MQTT消息,只需要在原来的代码做点小改动即可。
@Component
public class AliMQUtil {private static final Logger logger = LoggerFactory.getLogger(AliMQUtil.class);@Autowiredprivate AliMQConfig aliMQConfig;private static String topic;private static String groupId;@Value("${topic}")public String top;@Value("${groupId}")public String gId;public void setAliMQConfig(AliMQConfig aliMQConfig) {this.aliMQConfig = aliMQConfig;}public AliMQConfig getAliMQConfig() {return aliMQConfig;}private static AliMQUtil aliMQUtil;@PostConstructpublic void init() {topic = top;groupId = gId;aliMQUtil = this;aliMQUtil.aliMQConfig = this.aliMQConfig;}// 发送消息public static void sendMessage(String vehicleId, byte[] body) {Producer producer = aliMQUtil.aliMQConfig.getProducer();final Message msg = new Message(topic, // MQ 消息的 Topic,需要事先创建"MQ2MQTT", // MQ Tag,通过 MQ 向 MQTT 客户端发消息时,必须指定 MQ2MQTT 作为 Tag,其他 Tag 或者不设都将导致 MQTT 客户端收不到消息body);// 消息体,和 MQTT 的 body 对应/*** 使用 MQ 客户端给 MQTT 设备发送 P2P 消息时,需要在 MQ 消息中设置 mqttSecondTopic 属性* 设置的值是“/p2p/”+目标 ClientID*/String targetClientID = groupId + "@@@" + vehicleId;msg.putUserProperties("mqttSecondTopic", "/p2p/" + targetClientID);try {SendResult sendResult = producer.send(msg);if (sendResult != null) {logger.info("消息发送成功:" + sendResult.toString());}} catch (ONSClientException e) {logger.info("消息发送失败:", e);// 出现异常意味着发送失败,为了避免消息丢失,建议缓存该消息然后进行重试。}}
}
客户端:不能开发语言的demo都可以在阿里云帮助文档找到,我这里使用的是java,用于简单测试。
public static void acceptMessage() throws Exception {/*** 设置当前用户私有的 MQTT 的接入点。例如此处示意使用 XXX,实际使用请替换用户自己的接入点。接入点的获取方法是,在控制台创建 MQTT* 实例,每个实例都会分配一个接入点域名。*/final String broker = "tcp://****.mqtt.aliyuncs.com:1883";/*** 设置阿里云的 AccessKey,用于鉴权*/final String acessKey = "***";/*** 设置阿里云的 SecretKey,用于鉴权*/final String secretKey = "***";/*** 发消息使用的一级 Topic,需要先在 MQ 控制台里创建*/final String topic = "***";/*** MQTT 的 ClientID,一般由两部分组成,GroupID@@@DeviceID* 其中 GroupID 在 MQ 控制台里创建* DeviceID 由应用方设置,可能是设备编号等,需要唯一,否则服务端拒绝重复的 ClientID 连接*/final String clientId = "***@@@0001";String sign;MemoryPersistence persistence = new MemoryPersistence();try {final MqttClient sampleClient = new MqttClient(broker, clientId, persistence);final MqttConnectOptions connOpts = new MqttConnectOptions();System.out.println("Connecting to broker: " + broker);/*** 计算签名,将签名作为 MQTT 的 password* 签名的计算方法,参考工具类 MacSignature,第一个参数是 ClientID 的前半部分,即 GroupID* 第二个参数阿里云的 SecretKey*/sign = MacSignature.macSignature(clientId.split("@@@")[0], secretKey);/*** 设置订阅方订阅的 Topic 集合,此处遵循 MQTT 的订阅规则,可以是一级 Topic,二级 Topic,P2P 消息请订阅/p2p*/final String[] topicFilters = new String[] { topic + "/notice/", topic + "/p2p" };final int[] qos = { 0, 0 };connOpts.setUserName(acessKey);connOpts.setServerURIs(new String[] { broker });connOpts.setPassword(sign.toCharArray());connOpts.setCleanSession(true);connOpts.setKeepAliveInterval(90);connOpts.setAutomaticReconnect(true);sampleClient.setCallback(new MqttCallbackExtended() {public void connectComplete(boolean reconnect, String serverURI) {System.out.println("connect success");// 连接成功,需要上传客户端所有的订阅关系try {sampleClient.subscribe(topicFilters, qos);} catch (MqttException e) {e.printStackTrace();}}public void connectionLost(Throwable throwable) {System.out.println("mqtt connection lost");}public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {System.out.println("messageArrived:" + topic + "------" + new String(mqttMessage.getPayload()));}public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {System.out.println("deliveryComplete:" + iMqttDeliveryToken.getMessageId());}});// 客户端每次上线都必须上传自己所有涉及的订阅关系,否则可能会导致消息接收延迟sampleClient.connect(connOpts);// 每个客户端最多允许存在30个订阅关系,超出限制可能会丢弃导致收不到部分消息sampleClient.subscribe(topicFilters, qos);Thread.sleep(Integer.MAX_VALUE);} catch (Exception me) {me.printStackTrace();}}