热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Javaspringmvc结合阿里MQTT功能

近期,公司准备使用阿里MQTT,自己做了一个demo。准备工作首先,你歹有一个阿里云帐号,去阿里云注册一个MQTT实例&

近期,公司准备使用阿里MQTT,自己做了一个demo。


准备工作

首先,你歹有一个阿里云帐号,去阿里云注册一个MQTT实例,现在有免费试用一个月的

在这我们要记得尤其记一下我们购买的实例是买的哪的 比如我的这个是深圳  可以写进配置文件中

public final static String instanceId = "";public final static String accessKey= "";public final static String secretKey= "";public final static String cOnnectEndpoint= "";;public final static String topicId ="demo";public final static String groupId ="GID_demo";public final static String messageModel="5000";public final static String sendMsgTimeoutMillis="5000";public final static String suspendTimeMillis="5000";/*** 消息消费失败时的最大重试次数*/public final static String maxRecOnsumeTimes="10";/*** QoS参数代表传输质量,可选0,1,2*/public final static int qosLevel = 0;/*** 客户端超时时间*/public final static int timeToWait =5000;/*** MQTT所在地域*/ public final static String mqttAddress= "cn-shenzhen";

MQTT消息队列的实现功能

我们主要完成五个接口功能的实现

1.设备code发送消息到topic中

2.设备code发送消息到指定设备上(P2P模式)

3.服务启动监听队列消息

4.根据设备ID及所在队列查询设备当前状态消息(单独设备)

5.根据设备ID及所在队列批量查询设备当前状态消息(注意最大只是支持十个设备的查询)


设备code发送消息到topic中

主要思路是创建连接MQTT,直接将消息发送到对应的topic中

public void sendMqttMsgTopic(@RequestParam(value = "deviceCode", required = true) String deviceCode,@RequestParam(value = "msg", required = true) String msg) throws InterruptedException, MqttException{ String listenTopic ="earTagTopic";String clientId = MqttConfig.earTagGroupId + "@@@" + deviceCode;String topic = MqttConfig.earTagTopicId + "/" + listenTopic; //发送消息到消息队列MqttMessage message = new MqttMessage(msg.getBytes());message.setQos(MqttConfig.qosLevel);/*** 发送普通消息时,topic 必须和接收方订阅的 topic 一致,或者符合通配符匹配规则*/ try {mqttClient = MqtttClient.getProducerConnection(clientId,topic,deviceCode,MqttConfig.earTagGroupId); mqttClient.publish(topic, message);} catch (InvalidKeyException | NoSuchAlgorithmException | MqttException e1) {System.out.printf("消息发送失败:{}", e1.getMessage());e1.printStackTrace();}mqttClient.disconnect();Thread.sleep(Long.MAX_VALUE);return ;}

我们可以看到我们向earTag的eraTagTopic的主题消息发送成功,并且可以看到已经监听到并打印

如果我们不需要发送成功返回的回调,我们直接将callback代码去掉即可

mqttClient.setCallback(new MqttCallbackExtended()

我们可以看到这两条就是去掉成功回调,我们直接监听到返回结果了


设备code发送消息到指定设备上(P2P模式)

主要思路也是连接到MQTT上,然后直接向目标设备发送消息。

public void sendMqttp2pMsg(@RequestParam(value = "deviceCode", required = true) String deviceCode,@RequestParam(value = "msg", required = true) String msg) throws InterruptedException, MqttException{/*** MQ4IoT支持点对点消息,即如果发送方明确知道该消息只需要给特定的一个设备接收,且知道对端的 clientId,则可以直接发送点对点消息。* 点对点消息不需要经过订阅关系匹配,可以简化订阅方的逻辑。点对点消息的 topic 格式规范是 {{parentTopic}}/p2p/{{targetClientId}}*///对指定设备发送p2p消息 当前测试只是给监听我这个队列的设备发送消息String listenTopic ="earTagTopic";String clientId = MqttConfig.earTagGroupId + "@@@" + deviceCode+"aa";String topic = MqttConfig.earTagTopicId + "/" + listenTopic; //String msgSendTopic = MqttConfig.earTagTopicId + "/p2p/" + MqttConfig.earTagGroupId + "@@@" + deviceCode;String msgSendTopic = MqttConfig.topicId + "/p2p/" + MqttConfig.groupId + "@@@" + deviceCode;MqttMessage mqttMessage = new MqttMessage(msg.getBytes());mqttMessage.setQos(MqttConfig.qosLevel); try {mqttClient = MqtttClient.getProducerConnection(clientId,topic,deviceCode+"aa",MqttConfig.earTagGroupId); mqttClient.publish(msgSendTopic, mqttMessage);} catch (InvalidKeyException | NoSuchAlgorithmException | MqttException e1) {System.out.printf("消息发送失败:{}", e1.getMessage());e1.printStackTrace();}mqttClient.disconnect();Thread.sleep(Long.MAX_VALUE);return ;}

我向GID_demo@@@test3的设备发送指定的消息,GID_demo@@@test3这个设备成功接收到了。


服务启动监听队列消息

主要思路使用ApplicationListener来进行实现 项目启动自动启动监听


根据设备ID及所在队列查询设备当前状态消息(单独设备)

主要思路传想要查询的主题及设备code,返回当前状态信息

public SourceDataBean querySessionByClientId(@RequestParam(value = "topic", required = true) String topic,@RequestParam(value = "deviceCode", required = true) String deviceCode) throws InterruptedException, MqttException{ SourceDataBean sdb = new SourceDataBean(); List res = queryDeviceStateService.querySessionByClientId(topic, deviceCode);if(res.size()>0) {sdb.setDataSource(new DataSource<>(res,1,1,1));sdb.setMessage(RestCode.SUCCESS.code,PropertiesUtil.getValue("config.properties","QuerySuccessful"));}else {sdb.setMessage(RestCode.FAILED.code,res.get(0).getOnlineStatusName());}return sdb;}


根据设备ID及所在队列批量查询设备当前状态消息(注意最大只是支持十个设备的查询)

主要思路传想要查询的主题及设备codes,返回查询设备的状态信息

注意deviceCodes  使用逗号分隔即可,当前阿里MQTT官方文档写的是最大只是十个设备的查询,因为我这边的分页最大数量是10,未做超过10的测试。

public SourceDataBean querySessionByClientIds(@RequestParam(value = "topic", required = true) String topic,@RequestParam(value = "deviceCodes", required = true) String deviceCodes) throws InterruptedException, MqttException{ SourceDataBean sdb = new SourceDataBean();List res = queryDeviceStateService.querySessionByClientIds(topic, deviceCodes);if(res.size()>0) {sdb.setDataSource(new DataSource<>(res,res.size(),0,0));sdb.setMessage(RestCode.SUCCESS.code,PropertiesUtil.getValue("config.properties","QuerySuccessful"));}else {sdb.setMessage(RestCode.FAILED.code,res.get(0).getOnlineStatusName());}return sdb;}


这样,springMVC结合阿里MQTT的基本使用就完成了。

有不懂的地方可以QQ联系我,外接私活,源码出售,欢迎各位老板。

 


推荐阅读
author-avatar
霹靂一頁書_629
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有