1、环境准备
1、本地已搭建MQTT服务 参考 windows环境搭建MQTT_音乐土豆-CSDN博客
2、新建Springboot项目
2、pom文件引入依赖
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId>
</dependency>
3、application.properties文件新增mqtt配置
server.port&#61;8080
spring.mqtt.username&#61;admin
spring.mqtt.password&#61;password
spring.mqtt.url&#61;tcp://127.0.0.1:61613
spring.mqtt.client.id&#61;mqttId
spring.mqtt.default.topic&#61;test
mqtt.connection.timeout&#61;20
mqtt.keep.alive.interval&#61;20
4、编写MqttClientUtil
&#64;Component
&#64;Slf4j
public class MqttClientUtil {&#64;Value("${spring.mqtt.username}")private String username;&#64;Value("${spring.mqtt.password}")private String password;&#64;Value("${spring.mqtt.url}")private String host;&#64;Value("${spring.mqtt.client.id}")private String clientId;&#64;Value("${spring.mqtt.default.topic}")private String topic;&#64;Value("${mqtt.connection.timeout}")private int timeOut;&#64;Value("${mqtt.keep.alive.interval}")private int interval;private MqttClient mqttClient;private MqttConnectOptions mqttConnectOptions;&#64;PostConstructprivate void init(){connect(host, clientId);}private void connect(String host,String clientId){try{mqttClient &#61; new MqttClient(host,clientId,new MemoryPersistence());mqttConnectOptions &#61; getMqttConnectOptions();mqttClient.connect(mqttConnectOptions);}catch (Exception e){log.error("mqtt服务链接异常!");e.printStackTrace();}}private MqttConnectOptions getMqttConnectOptions(){MqttConnectOptions mqttConnectOptions &#61; new MqttConnectOptions();mqttConnectOptions.setUserName(username);mqttConnectOptions.setPassword(password.toCharArray());mqttConnectOptions.setServerURIs(new String[]{host});mqttConnectOptions.setKeepAliveInterval(interval);mqttConnectOptions.setConnectionTimeout(timeOut);mqttConnectOptions.setCleanSession(true);return mqttConnectOptions;}private boolean isConnect(){if(Objects.isNull(this.mqttClient)){return false;}return mqttClient.isConnected();}private void reConnect() throws Exception{if(Objects.nonNull(this.mqttClient)){log.info("mqtt 服务已重新链接...");this.mqttClient.connect(this.mqttConnectOptions);}}private void closeConnect() throws Exception{if(Objects.nonNull(this.mqttClient)){log.info("mqtt 服务已断开链接...");this.mqttClient.disconnect();}}public void sendMessage(String topic,String message,int qos) throws Exception {if(Objects.nonNull(this.mqttClient) && this.mqttClient.isConnected()){MqttMessage mqttMessage &#61; new MqttMessage();mqttMessage.setPayload(message.getBytes());mqttMessage.setQos(qos);MqttTopic mqttTopic &#61; mqttClient.getTopic(topic);if(Objects.nonNull(mqttTopic)){try{MqttDeliveryToken publish &#61; mqttTopic.publish(mqttMessage);if(publish.isComplete()){log.info("消息发送成功---->{}",message);}}catch(Exception e){log.error("消息发送异常",e);}}}else{reConnect();}}
}
5、编写controller
&#64;Slf4j
&#64;RestController
public class TestController {&#64;Autowiredprivate MqttClientUtil mqttClientUtil;&#64;RequestMapping("/send")public String sendMessage(){String topic &#61; "test";String message &#61; "测试 mqtt 消息发布";int qos &#61; 2;try{mqttClientUtil.sendMessage(topic,message,qos);}catch(Exception e){log.error("mqtt 消息发送异常",e);}return "mqtt 消息已发送&#xff01;";}
}
6、测试
http://localhost:8080/send
也可以打开Mqtt box 订阅topic为test
可以看到&#xff0c;消息已经成功发出&#xff01;