热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

基于Springboot实现Mqtt

转载:基于Springboot实现MqttJava端开发:pom.xml:

转载:基于Springboot实现Mqtt

Java端开发:
pom.xml:


<dependency><groupId>org.eclipse.pahogroupId><artifactId>org.eclipse.paho.client.mqttv3artifactId><version>1.2.2version>
dependency>

Client.java

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;&#64;Component
public class Client {Logger logger &#61; LoggerFactory.getLogger(Client.class);&#64;Autowiredprivate OnMessageCallback onMessageCallback;public final String HOST &#61; "tcp://连接ip:端口号"; //连接ippublic final String TOPIC &#61; "duilie"; //订阅队列public final String clientId &#61; "jy"; //连接id&#xff0c;注意不要和已有的连接id重复private MqttClient client;private MqttConnectOptions connOpts;private String userName &#61; "userName";private String passWord &#61; "passWord";&#64;PostConstructpublic void init() {start();}public void start() {try {client &#61; new MqttClient(HOST, clientId, new MemoryPersistence());connOpts &#61; new MqttConnectOptions();connOpts.setCleanSession(true); // 清空sessionconnOpts.setUserName(userName);connOpts.setPassword(passWord.toCharArray());connOpts.setConnectionTimeout(10);// 设置超时时间connOpts.setKeepAliveInterval(20);// 设置会话心跳时间client.setCallback(onMessageCallback); // 设置回调函数System.out.println("Connecting to broker: " &#43; HOST);int Qos &#61; 1;client.connect(connOpts); // 创立连接client.subscribe(TOPIC); // 订阅} catch (Exception e) {e.printStackTrace();}}public void publish(String topic, byte[] payload) {try {this.client.publish(topic, payload, 1, false);} catch (MqttException e) {e.printStackTrace();}}public void close() {try {this.client.close();} catch (MqttException e) {e.printStackTrace();}}public void reConnect() throws Exception {if (this.client !&#61; null) {this.logger.info("开始重连");this.client.connect(this.connOpts);int Qos &#61; 1;this.logger.info("主题&#xff1a;" &#43; TOPIC);this.client.subscribe(TOPIC, Qos);}}}

OnMessageCallback.java
回调消息处理类OnMessageCallback&#xff1a;

import org.eclipse.paho.client.mqttv3.MqttCallback;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;&#64;Component
public class OnMessageCallback implements MqttCallback {&#64;Autowiredprivate ServiceImpl service; //修改成自己的业务&#64;Autowiredprivate Client client;public void connectionLost(Throwable cause) {// 连接丢失后&#xff0c;一般在这里面进行重连System.out.println("连接断开&#xff0c;可以做重连");while (true) {try {Thread.sleep(5000L);this.client.reConnect();break;} catch (Exception e) {e.printStackTrace();}}}public void messageArrived(String topic, MqttMessage message) throws Exception {// subscribe后得到的消息会执行到这里面System.out.println("接收消息主题:" &#43; topic);System.out.println("接收消息Qos:" &#43; message.getQos());System.out.println("接收消息内容:" &#43; new String(message.getPayload()));service.insert(new String(message.getPayload())); //修改成自己的业务}public void deliveryComplete(IMqttDeliveryToken token) {System.out.println("deliveryComplete---------" &#43; token.isComplete());}
}

这里加入了断线重连。在 messageArrived 方法中调用业务即可。

连接成功会在控制台可以看到连接的id
在这里插入图片描述
Qos参数&#xff1a;
level 0&#xff1a;最多一次的传输

消息是基于TCP/IP网络传输的。没有回应&#xff0c;在协议中也没有定义重传的语义。消息可能到达服务器1次&#xff0c;也可能根本不会到达。
level 1&#xff1a;至少一次的传输&#xff08;一般配置为1&#xff09;

服务器接收到消息会被确认&#xff0c;通过传输一个PUBACK信息。如果有一个可以辨认的传输失败&#xff0c;无论是通讯连接还是发送设备&#xff0c;还是过了一段时间确认信息没有收到&#xff0c;发送方都会将消息头的DUP位置1&#xff0c;然后再次发送消息。消息最少一次到达服务器。SUBSCRIBE和UNSUBSCRIBE都使用level 1 的QoS。
如果客户端没有接收到PUBACK信息&#xff08;无论是应用定义的超时&#xff0c;还是检测到失败然后通讯session重启&#xff09;&#xff0c;客户端都会再次发送PUBLISH信息&#xff0c;并且将DUP位置1。
当它从客户端接收到重复的数据&#xff0c;服务器重新发送消息给订阅者&#xff0c;并且发送另一个PUBACK消息。
level 2&#xff1a; 只有一次的传输

在QoS level 1上附加的协议流保证了重复的消息不会传送到接收的应用。这是最高级别的传输&#xff0c;当重复的消息不被允许的情况下使用。这样增加了网络流量&#xff0c;但是它通常是可以接受的&#xff0c;因为消息内容很重要。
QoS level 2在消息头有Message ID。
接收消息和写入数据库解藕&#xff1a;

这里是直接调用 server 写入数据库的写法。可以改成接收到消息写入到一个 BlockingQueue 里&#xff0c;再由业务层去取数。

遇见问题&#xff1a;
clientId&#xff1a;是每一个去订阅连接的 client 的名称&#xff0c;注意不要和原有的的连接名重复。
Mqtt是不会消息堆积&#xff0c;也就是我这里设备一直发送消息&#xff0c;而我的java客户端断了5分钟&#xff0c;那么这5分钟的消息在重连时是获取不到的&#xff0c;只能获得之后的消息。&#xff08;具体的原理没有研究&#xff09;


推荐阅读
  • android listview OnItemClickListener失效原因
    最近在做listview时发现OnItemClickListener失效的问题,经过查找发现是因为button的原因。不仅listitem中存在button会影响OnItemClickListener事件的失效,还会导致单击后listview每个item的背景改变,使得item中的所有有关焦点的事件都失效。本文给出了一个范例来说明这种情况,并提供了解决方法。 ... [详细]
  • 计算机存储系统的层次结构及其优势
    本文介绍了计算机存储系统的层次结构,包括高速缓存、主存储器和辅助存储器三个层次。通过分层存储数据可以提高程序的执行效率。计算机存储系统的层次结构将各种不同存储容量、存取速度和价格的存储器有机组合成整体,形成可寻址存储空间比主存储器空间大得多的存储整体。由于辅助存储器容量大、价格低,使得整体存储系统的平均价格降低。同时,高速缓存的存取速度可以和CPU的工作速度相匹配,进一步提高程序执行效率。 ... [详细]
  • 闭包一直是Java社区中争论不断的话题,很多语言都支持闭包这个语言特性,闭包定义了一个依赖于外部环境的自由变量的函数,这个函数能够访问外部环境的变量。本文以JavaScript的一个闭包为例,介绍了闭包的定义和特性。 ... [详细]
  • Android系统源码分析Zygote和SystemServer启动过程详解
    本文详细解析了Android系统源码中Zygote和SystemServer的启动过程。首先介绍了系统framework层启动的内容,帮助理解四大组件的启动和管理过程。接着介绍了AMS、PMS等系统服务的作用和调用方式。然后详细分析了Zygote的启动过程,解释了Zygote在Android启动过程中的决定作用。最后通过时序图展示了整个过程。 ... [详细]
  • 基于Socket的多个客户端之间的聊天功能实现方法
    本文介绍了基于Socket的多个客户端之间实现聊天功能的方法,包括服务器端的实现和客户端的实现。服务器端通过每个用户的输出流向特定用户发送消息,而客户端通过输入流接收消息。同时,还介绍了相关的实体类和Socket的基本概念。 ... [详细]
  • 重入锁(ReentrantLock)学习及实现原理
    本文介绍了重入锁(ReentrantLock)的学习及实现原理。在学习synchronized的基础上,重入锁提供了更多的灵活性和功能。文章详细介绍了重入锁的特性、使用方法和实现原理,并提供了类图和测试代码供读者参考。重入锁支持重入和公平与非公平两种实现方式,通过对比和分析,读者可以更好地理解和应用重入锁。 ... [详细]
  • 本文介绍了一款名为TimeSelector的Android日期时间选择器,采用了Material Design风格,可以在Android Studio中通过gradle添加依赖来使用,也可以在Eclipse中下载源码使用。文章详细介绍了TimeSelector的构造方法和参数说明,以及如何使用回调函数来处理选取时间后的操作。同时还提供了示例代码和可选的起始时间和结束时间设置。 ... [详细]
  • Hibernate延迟加载深入分析-集合属性的延迟加载策略
    本文深入分析了Hibernate延迟加载的机制,特别是集合属性的延迟加载策略。通过延迟加载,可以降低系统的内存开销,提高Hibernate的运行性能。对于集合属性,推荐使用延迟加载策略,即在系统需要使用集合属性时才从数据库装载关联的数据,避免一次加载所有集合属性导致性能下降。 ... [详细]
  • 生成式对抗网络模型综述摘要生成式对抗网络模型(GAN)是基于深度学习的一种强大的生成模型,可以应用于计算机视觉、自然语言处理、半监督学习等重要领域。生成式对抗网络 ... [详细]
  • CSS3选择器的使用方法详解,提高Web开发效率和精准度
    本文详细介绍了CSS3新增的选择器方法,包括属性选择器的使用。通过CSS3选择器,可以提高Web开发的效率和精准度,使得查找元素更加方便和快捷。同时,本文还对属性选择器的各种用法进行了详细解释,并给出了相应的代码示例。通过学习本文,读者可以更好地掌握CSS3选择器的使用方法,提升自己的Web开发能力。 ... [详细]
  • JVM 学习总结(三)——对象存活判定算法的两种实现
    本文介绍了垃圾收集器在回收堆内存前确定对象存活的两种算法:引用计数算法和可达性分析算法。引用计数算法通过计数器判定对象是否存活,虽然简单高效,但无法解决循环引用的问题;可达性分析算法通过判断对象是否可达来确定存活对象,是主流的Java虚拟机内存管理算法。 ... [详细]
  • 本文介绍了Android 7的学习笔记总结,包括最新的移动架构视频、大厂安卓面试真题和项目实战源码讲义。同时还分享了开源的完整内容,并提醒读者在使用FileProvider适配时要注意不同模块的AndroidManfiest.xml中配置的xml文件名必须不同,否则会出现问题。 ... [详细]
  • MyBatis多表查询与动态SQL使用
    本文介绍了MyBatis多表查询与动态SQL的使用方法,包括一对一查询和一对多查询。同时还介绍了动态SQL的使用,包括if标签、trim标签、where标签、set标签和foreach标签的用法。文章还提供了相关的配置信息和示例代码。 ... [详细]
  • Netty源代码分析服务器端启动ServerBootstrap初始化
    本文主要分析了Netty源代码中服务器端启动的过程,包括ServerBootstrap的初始化和相关参数的设置。通过分析NioEventLoopGroup、NioServerSocketChannel、ChannelOption.SO_BACKLOG等关键组件和选项的作用,深入理解Netty服务器端的启动过程。同时,还介绍了LoggingHandler的作用和使用方法,帮助读者更好地理解Netty源代码。 ... [详细]
  • 从相邻元素对还原数组的解题思路和代码
    本文介绍了从相邻元素对还原数组的解题思路和代码。思路是使用HashMap存放邻接关系,并找出起始点,然后依次取。代码使用了HashMap来存放起始点所在的adjacentPairs中的位置,并对重复的起始点进行处理。 ... [详细]
author-avatar
萱筱璧
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有