热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

springboot整合mqtt实现消息发送

1、环境准备1、本地已搭建MQTT服务 参考 windows环境搭建MQTT_音乐土豆-CSDN博客2、新建Springboot项目2、pom文件引入依赖org.springframework.int

1、环境准备

1、本地已搭建MQTT服务 参考 windows环境搭建MQTT_音乐土豆-CSDN博客

2、新建Springboot项目

2、pom文件引入依赖

<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId>
</dependency>

3、application.properties文件新增mqtt配置

server.port&#61;8080
#MQTT配置信息
#MQTT-用户名
spring.mqtt.username&#61;admin
#MQTT-密码
spring.mqtt.password&#61;password
#MQTT-服务器连接地址&#xff0c;如果有多个&#xff0c;用逗号隔开&#xff0c;如&#xff1a;tcp://127.0.0.1:61613,tcp://127.0.0.1:61613
spring.mqtt.url&#61;tcp://127.0.0.1:61613
#MQTT-连接服务器默认客户端ID 可自定义
spring.mqtt.client.id&#61;mqttId
#MQTT-默认的消息推送主题&#xff0c;实际可在调用接口时指定
spring.mqtt.default.topic&#61;test
#timeout 链接超时时间
mqtt.connection.timeout&#61;20
#keep alive 单位为秒&#xff0c;MQTT 协议中约定&#xff1a;在 1.5*Keep Alive 的时间间隔内&#xff0c;如果 Broker 没有收到来自 Client 的任何数据包&#xff0c;那么 Broker 认为它和 Client 之间的连接已经断开&#xff1b;同样地, 如果 Client 没有收到来自 Broker 的任何数据包&#xff0c;那么 Client 认为它和 Broker 之间的连接已经断开
mqtt.keep.alive.interval&#61;20

4、编写MqttClientUtil

&#64;Component
&#64;Slf4j
public class MqttClientUtil {&#64;Value("${spring.mqtt.username}")private String username;&#64;Value("${spring.mqtt.password}")private String password;&#64;Value("${spring.mqtt.url}")private String host;&#64;Value("${spring.mqtt.client.id}")private String clientId;&#64;Value("${spring.mqtt.default.topic}")private String topic;&#64;Value("${mqtt.connection.timeout}")private int timeOut;&#64;Value("${mqtt.keep.alive.interval}")private int interval;private MqttClient mqttClient;private MqttConnectOptions mqttConnectOptions;&#64;PostConstructprivate void init(){connect(host, clientId);}/*** 链接mqtt* &#64;param host* &#64;param clientId*/private void connect(String host,String clientId){try{mqttClient &#61; new MqttClient(host,clientId,new MemoryPersistence());mqttConnectOptions &#61; getMqttConnectOptions();mqttClient.connect(mqttConnectOptions);}catch (Exception e){log.error("mqtt服务链接异常!");e.printStackTrace();}}/*** 设置链接对象信息* setCleanSession true 断开链接即清楚会话 false 保留链接信息 离线还会继续发消息* &#64;return*/private MqttConnectOptions getMqttConnectOptions(){MqttConnectOptions mqttConnectOptions &#61; new MqttConnectOptions();mqttConnectOptions.setUserName(username);mqttConnectOptions.setPassword(password.toCharArray());mqttConnectOptions.setServerURIs(new String[]{host});mqttConnectOptions.setKeepAliveInterval(interval);mqttConnectOptions.setConnectionTimeout(timeOut);mqttConnectOptions.setCleanSession(true);return mqttConnectOptions;}/***mqtt链接状态* &#64;return*/private boolean isConnect(){if(Objects.isNull(this.mqttClient)){return false;}return mqttClient.isConnected();}/*** 设置重连* &#64;throws Exception*/private void reConnect() throws Exception{if(Objects.nonNull(this.mqttClient)){log.info("mqtt 服务已重新链接...");this.mqttClient.connect(this.mqttConnectOptions);}}/*** 断开链接* &#64;throws Exception*/private void closeConnect() throws Exception{if(Objects.nonNull(this.mqttClient)){log.info("mqtt 服务已断开链接...");this.mqttClient.disconnect();}}/*** 发布消息* &#64;param topic* &#64;param message* &#64;param qos* &#64;throws Exception*/public void sendMessage(String topic,String message,int qos) throws Exception {if(Objects.nonNull(this.mqttClient) && this.mqttClient.isConnected()){MqttMessage mqttMessage &#61; new MqttMessage();mqttMessage.setPayload(message.getBytes());mqttMessage.setQos(qos);MqttTopic mqttTopic &#61; mqttClient.getTopic(topic);if(Objects.nonNull(mqttTopic)){try{MqttDeliveryToken publish &#61; mqttTopic.publish(mqttMessage);if(publish.isComplete()){log.info("消息发送成功---->{}",message);}}catch(Exception e){log.error("消息发送异常",e);}}}else{reConnect();}}
}

5、编写controller

&#64;Slf4j
&#64;RestController
public class TestController {&#64;Autowiredprivate MqttClientUtil mqttClientUtil;&#64;RequestMapping("/send")public String sendMessage(){String topic &#61; "test";String message &#61; "测试 mqtt 消息发布";//0:最多一次 1&#xff1a;最少一次 2&#xff1a;至少一次int qos &#61; 2;try{mqttClientUtil.sendMessage(topic,message,qos);}catch(Exception e){log.error("mqtt 消息发送异常",e);}return "mqtt 消息已发送&#xff01;";}
}

6、测试

http://localhost:8080/send

在这里插入图片描述
也可以打开Mqtt box 订阅topic为test
在这里插入图片描述
可以看到&#xff0c;消息已经成功发出&#xff01;


推荐阅读
author-avatar
滨州电信1988_259
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有