热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

SpringBoot整合阿里云消息队列MQ

一、概述消息队列(MessageQueue,简称MQ)是阿里巴巴集团中间件技术部自主研发的专业消息中间件。产品基于高可用分布式集群技术&#

一、概述

      消息队列(Message Queue,简称 MQ)是阿里巴巴集团中间件技术部自主研发的专业消息中间件。产品基于高可用分布式集群技术,提供消息发布订阅、消息轨迹查询、定时(延时)消息、资源统计、监控报警等一系列消息云服务,是企业级互联网架构的核心产品。MQ 历史超过9年,为分布式应用系统提供异步解耦、削峰填谷的能力,同时具备海量消息堆积、高吞吐、可靠重试等互联网应用所需的特性,是阿里巴巴双11使用的核心产品。

二、Spring Boot集成

      阿里云官方文档及demo是整合Spring,使用的是xml配置,在这里,我结合官方文档,在Spring Boot框架下整合MQ,由于在网上相关文档及demo不是很多,所以有很多不足的地方,如果有更好的建议,还请多多指教。
      基础的项目搭建在这里就不多说了,在网上也有很多教程,就直接进入正题了。

1. 参数的配置

      我把MQ相关的参数写在Application.properties中

#aliyun MQ config
producerId=*** //替换为自己的账号
consumerId=***
accessKey=***
secretKey=***
tag=***
topic=***
onsAddr=http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
sendMsgTimeoutMillis=3000
suspendTimeMillis=100
maxReconsumeTimes=20

2. demo

@Configuration标注在类上,相当于把该类作为spring的xml配置文件中的,作用为:配置spring容器(应用上下文)
@Bean标注在方法上(返回某个实例的方法),等价于spring的xml配置文件中的,作用为:注册bean对象

@Configuration
public class AliMQConfig {@Value("${producerId}")public String producerId;@Value("${consumerId}")public String consumerId;@Value("${accessKey}")public String accessKey;@Value("${secretKey}")public String secretKey;@Value("${topic}")public String topic;@Value("${tag}")public String tag;@Value("${onsAddr}")public String onsAddr;//超时时间@Value("${sendMsgTimeoutMillis}")public String sendMsgTimeoutMillis;@Value("${suspendTimeMillis}")public String suspendTimeMillis;@Value("${maxReconsumeTimes}")public String maxReconsumeTimes;@Bean(initMethod = "start", destroyMethod = "shutdown")public ProducerBean getProducer() {ProducerBean producerBean = new ProducerBean();Properties properties = new Properties();properties.put(PropertyKeyConst.ProducerId, producerId);// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建properties.put(PropertyKeyConst.AccessKey, accessKey);// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建properties.put(PropertyKeyConst.SecretKey, secretKey);properties.put(PropertyKeyConst.SendMsgTimeoutMillis, sendMsgTimeoutMillis);properties.put(PropertyKeyConst.ONSAddr, onsAddr);producerBean.setProperties(properties);return producerBean;}@Bean(initMethod = "start", destroyMethod = "shutdown")public ConsumerBean getConsumer() {ConsumerBean consumerBean = new ConsumerBean();Properties properties = new Properties();properties.put(PropertyKeyConst.ConsumerId, consumerId);// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建properties.put(PropertyKeyConst.AccessKey, accessKey);// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建properties.put(PropertyKeyConst.SecretKey, secretKey);properties.put(PropertyKeyConst.SuspendTimeMillis, suspendTimeMillis);properties.put(PropertyKeyConst.MaxReconsumeTimes, maxReconsumeTimes);properties.put(PropertyKeyConst.ONSAddr, onsAddr);consumerBean.setProperties(properties);Subscription subscription = new Subscription();subscription.setTopic(topic);subscription.setExpression(tag);Map map = new HashMap();map.put(subscription, new AliMQConsumerListener());consumerBean.setSubscriptionTable(map);return consumerBean;}

这个类相当于官方demo里的xml配置,由于SpringBoot旨在减少xml配置,所以在这里就把xml配置转换为java代码。
下面是生产者发送消息:

@Component
public class AliMQUtil {private static final Logger logger = LoggerFactory.getLogger(AliMQUtil.class);@Autowiredprivate AliMQConfig aliMQConfig;@Value("${topic}")public String topic;// 发送消息public void sendMessage(String tag, String key, byte[] body) {Producer producer = aliMQConfig.getProducer();Message msg = new Message(topic, tag, body);//msg.setKey(key);try {SendResult sendResult = producer.send(msg);if (sendResult != null) {logger.info("消息发送成功:" + sendResult.toString());}} catch (ONSClientException e) {logger.info("消息发送失败:", e);// 出现异常意味着发送失败,为了避免消息丢失,建议缓存该消息然后进行重试。}}
}

消费者监听:

public class AliMQConsumerListener implements MessageListener {private static final Logger logger = LoggerFactory.getLogger(AliMQConsumerListener.class);@Overridepublic Action consume(Message message, ConsumeContext context) {String msg = "";try {//do something..msg = new String(message.getBody(), "UTF-8");logger.info("订阅消息:" + msg);return Action.CommitMessage;} catch (Exception e) {//消费失败logger.info("消费失败:" + msg);return Action.ReconsumeLater;}}

当项目启动,消费者就开始监听消息,实测,项目启动后台就会打印消息日志。
由于官方文档中说明MQ 的消费者和生产者客户端对象是线程安全的,可以在多个线程之间共享使用,所以这里配置的是跟随项目启动,创建一个实例,多线程之间可以共享这一实例。由于对多线程的理解不是很深刻,如果不当,还请指正。
      以上就是我在Spring Boot中对于MQ的整合方案,有很多不足的地方,希望和大家一起交流学习。



补充:MQTT的使用

针对用户在移动互联网以及物联网领域的存在的特殊消息传输需求,MQ 开放了 MQTT 协议的完整支持。

消息队列遥测传输(Message Queuing Telemetry Transport,简称 MQTT)是一种轻量的,基于发布订阅模型的即时通讯协议。该协议设计开放,协议简单,平台支持丰富,几乎可以把所有联网物品和外部连接起来,因此在移动互联网和物联网领域拥有众多优势。

协议的特点包括:

使用发布/订阅消息模式,提供一对多的消息分发,解除了应用程序之间的耦合;
对负载内容屏蔽的消息传输;
使用 TCP/IP 提供基础的网络连接;
有三种级别的消息传递服务;
小型传输,开销很小(固定长度的头部是2字节),协议交换最小化,以降低网络流量。

客户端分布:
使用 MQTT 协议的客户端作为移动端接入 MQ,一般分布在公网环境,比如嵌入式设备、移动手机、平板、浏览器之类的平台上。
使用 MQ 协议的客户端一般作为业务上的服务端接入 MQ,应该部署在阿里云的 ECS 环境。

MQ服务端:MQ发送MQTT消息,只需要在原来的代码做点小改动即可。

@Component
public class AliMQUtil {private static final Logger logger = LoggerFactory.getLogger(AliMQUtil.class);@Autowiredprivate AliMQConfig aliMQConfig;private static String topic;private static String groupId;@Value("${topic}")public String top;@Value("${groupId}")public String gId;public void setAliMQConfig(AliMQConfig aliMQConfig) {this.aliMQConfig = aliMQConfig;}public AliMQConfig getAliMQConfig() {return aliMQConfig;}private static AliMQUtil aliMQUtil;@PostConstructpublic void init() {topic = top;groupId = gId;aliMQUtil = this;aliMQUtil.aliMQConfig = this.aliMQConfig;}// 发送消息public static void sendMessage(String vehicleId, byte[] body) {Producer producer = aliMQUtil.aliMQConfig.getProducer();final Message msg = new Message(topic, // MQ 消息的 Topic,需要事先创建"MQ2MQTT", // MQ Tag,通过 MQ 向 MQTT 客户端发消息时,必须指定 MQ2MQTT 作为 Tag,其他 Tag 或者不设都将导致 MQTT 客户端收不到消息body);// 消息体,和 MQTT 的 body 对应/*** 使用 MQ 客户端给 MQTT 设备发送 P2P 消息时,需要在 MQ 消息中设置 mqttSecondTopic 属性* 设置的值是“/p2p/”+目标 ClientID*/String targetClientID = groupId + "@@@" + vehicleId;msg.putUserProperties("mqttSecondTopic", "/p2p/" + targetClientID);try {SendResult sendResult = producer.send(msg);if (sendResult != null) {logger.info("消息发送成功:" + sendResult.toString());}} catch (ONSClientException e) {logger.info("消息发送失败:", e);// 出现异常意味着发送失败,为了避免消息丢失,建议缓存该消息然后进行重试。}}
}

客户端:不能开发语言的demo都可以在阿里云帮助文档找到,我这里使用的是java,用于简单测试。

public static void acceptMessage() throws Exception {/*** 设置当前用户私有的 MQTT 的接入点。例如此处示意使用 XXX,实际使用请替换用户自己的接入点。接入点的获取方法是,在控制台创建 MQTT* 实例,每个实例都会分配一个接入点域名。*/final String broker = "tcp://****.mqtt.aliyuncs.com:1883";/*** 设置阿里云的 AccessKey,用于鉴权*/final String acessKey = "***";/*** 设置阿里云的 SecretKey,用于鉴权*/final String secretKey = "***";/*** 发消息使用的一级 Topic,需要先在 MQ 控制台里创建*/final String topic = "***";/*** MQTT 的 ClientID,一般由两部分组成,GroupID@@@DeviceID* 其中 GroupID 在 MQ 控制台里创建* DeviceID 由应用方设置,可能是设备编号等,需要唯一,否则服务端拒绝重复的 ClientID 连接*/final String clientId = "***@@@0001";String sign;MemoryPersistence persistence = new MemoryPersistence();try {final MqttClient sampleClient = new MqttClient(broker, clientId, persistence);final MqttConnectOptions connOpts = new MqttConnectOptions();System.out.println("Connecting to broker: " + broker);/*** 计算签名,将签名作为 MQTT 的 password* 签名的计算方法,参考工具类 MacSignature,第一个参数是 ClientID 的前半部分,即 GroupID* 第二个参数阿里云的 SecretKey*/sign = MacSignature.macSignature(clientId.split("@@@")[0], secretKey);/*** 设置订阅方订阅的 Topic 集合,此处遵循 MQTT 的订阅规则,可以是一级 Topic,二级 Topic,P2P 消息请订阅/p2p*/final String[] topicFilters = new String[] { topic + "/notice/", topic + "/p2p" };final int[] qos = { 0, 0 };connOpts.setUserName(acessKey);connOpts.setServerURIs(new String[] { broker });connOpts.setPassword(sign.toCharArray());connOpts.setCleanSession(true);connOpts.setKeepAliveInterval(90);connOpts.setAutomaticReconnect(true);sampleClient.setCallback(new MqttCallbackExtended() {public void connectComplete(boolean reconnect, String serverURI) {System.out.println("connect success");// 连接成功,需要上传客户端所有的订阅关系try {sampleClient.subscribe(topicFilters, qos);} catch (MqttException e) {e.printStackTrace();}}public void connectionLost(Throwable throwable) {System.out.println("mqtt connection lost");}public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {System.out.println("messageArrived:" + topic + "------" + new String(mqttMessage.getPayload()));}public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {System.out.println("deliveryComplete:" + iMqttDeliveryToken.getMessageId());}});// 客户端每次上线都必须上传自己所有涉及的订阅关系,否则可能会导致消息接收延迟sampleClient.connect(connOpts);// 每个客户端最多允许存在30个订阅关系,超出限制可能会丢弃导致收不到部分消息sampleClient.subscribe(topicFilters, qos);Thread.sleep(Integer.MAX_VALUE);} catch (Exception me) {me.printStackTrace();}}


推荐阅读
  • 深入解析Linux下的I/O多路转接epoll技术
    本文深入解析了Linux下的I/O多路转接epoll技术,介绍了select和poll函数的问题,以及epoll函数的设计和优点。同时讲解了epoll函数的使用方法,包括epoll_create和epoll_ctl两个系统调用。 ... [详细]
  • 本文讨论了一个数列求和问题,该数列按照一定规律生成。通过观察数列的规律,我们可以得出求解该问题的算法。具体算法为计算前n项i*f[i]的和,其中f[i]表示数列中有i个数字。根据参考的思路,我们可以将算法的时间复杂度控制在O(n),即计算到5e5即可满足1e9的要求。 ... [详细]
  • 本文介绍了SPOJ2829题目的解法及优化方法。题目要求找出满足一定条件的数列,并对结果取模。文章详细解释了解题思路和算法实现,并提出了使用FMT优化的方法。最后,对于第三个限制条件,作者给出了处理方法。文章最后给出了代码实现。 ... [详细]
  • Ansibleplaybook roles安装redis实例(学习笔记二十九)
    1、相关redis参数:2、templatesredis.conf配置相关参数:daemonizeyespidfilevarrunredis_{{red ... [详细]
  • 一、死锁现象与递归锁进程也是有死锁的所谓死锁:是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作 ... [详细]
  • 本文主要解析了Open judge C16H问题中涉及到的Magical Balls的快速幂和逆元算法,并给出了问题的解析和解决方法。详细介绍了问题的背景和规则,并给出了相应的算法解析和实现步骤。通过本文的解析,读者可以更好地理解和解决Open judge C16H问题中的Magical Balls部分。 ... [详细]
  • 本文介绍了P1651题目的描述和要求,以及计算能搭建的塔的最大高度的方法。通过动态规划和状压技术,将问题转化为求解差值的问题,并定义了相应的状态。最终得出了计算最大高度的解法。 ... [详细]
  • http:my.oschina.netleejun2005blog136820刚看到群里又有同学在说HTTP协议下的Get请求参数长度是有大小限制的,最大不能超过XX ... [详细]
  • 判断数组是否全为0_连续子数组的最大和的解题思路及代码方法一_动态规划
    本文介绍了判断数组是否全为0以及求解连续子数组的最大和的解题思路及代码方法一,即动态规划。通过动态规划的方法,可以找出连续子数组的最大和,具体思路是尽量选择正数的部分,遇到负数则不选择进去,遇到正数则保留并继续考察。本文给出了状态定义和状态转移方程,并提供了具体的代码实现。 ... [详细]
  • 本文介绍了RPC框架Thrift的安装环境变量配置与第一个实例,讲解了RPC的概念以及如何解决跨语言、c++客户端、web服务端、远程调用等需求。Thrift开发方便上手快,性能和稳定性也不错,适合初学者学习和使用。 ... [详细]
  • 本文介绍了通过ABAP开发往外网发邮件的需求,并提供了配置和代码整理的资料。其中包括了配置SAP邮件服务器的步骤和ABAP写发送邮件代码的过程。通过RZ10配置参数和icm/server_port_1的设定,可以实现向Sap User和外部邮件发送邮件的功能。希望对需要的开发人员有帮助。摘要长度:184字。 ... [详细]
  • 本文讨论了在VMWARE5.1的虚拟服务器Windows Server 2008R2上安装oracle 10g客户端时出现的问题,并提供了解决方法。错误日志显示了异常访问违例,通过分析日志中的问题帧,找到了解决问题的线索。文章详细介绍了解决方法,帮助读者顺利安装oracle 10g客户端。 ... [详细]
  • 本文介绍了一道经典的状态压缩题目——关灯问题2,并提供了解决该问题的算法思路。通过使用二进制表示灯的状态,并枚举所有可能的状态,可以求解出最少按按钮的次数,从而将所有灯关掉。本文还对状压和位运算进行了解释,并指出了该方法的适用性和局限性。 ... [详细]
  • 基于分布式锁的防止重复请求解决方案
    一、前言关于重复请求,指的是我们服务端接收到很短的时间内的多个相同内容的重复请求。而这样的重复请求如果是幂等的(每次请求的结果都相同,如查 ... [详细]
  • QuestionThereareatotalofncoursesyouhavetotake,labeledfrom0ton-1.Somecoursesmayhaveprerequi ... [详细]
author-avatar
H-蔡鸿晖_515
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有