0%

SpringBoot集成MQTT

文章字数:282,阅读全文大约需要1分钟

使用 SpringBoot(基于 Spring Integration)实现 MQTT 消息的发布与主题订阅。

环境

  1. jdk1.8

  2. maven

在 pom.xml 中添加以下依赖:
<!-- Spring Boot starter for Spring Integration -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>

<!-- Spring Integration stream module (not referenced by the configuration class in this article) -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>

<!-- Spring Integration MQTT module: provides the org.springframework.integration.mqtt classes used below -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

使用

  1. 配置
MqttConfig 配置类:
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

/**
 * Spring Integration wiring for publishing to and subscribing from an MQTT broker.
 *
 * <p>Required properties: {@code mqtt.url}, {@code mqtt.client.id}, {@code mqtt.topic}.
 * Optional properties: {@code mqtt.username} and {@code mqtt.password}; when absent they
 * fall back to the literal values {@code userName} / {@code password}.
 *
 * <p>Outbound flow: messages sent to {@code mqttOutboundChannel} are published by
 * {@link #mqttOutbound(MqttPahoClientFactory)}. Inbound flow: messages received on the
 * subscribed topic are forwarded by {@link #mqttInbound(MqttPahoClientFactory)} to
 * {@code mqttInboundChannel}, where {@link #newHandler()} prints them.
 */
@Configuration
// Enables scanning for Spring Integration components (e.g. @MessagingGateway interfaces).
@IntegrationComponentScan
public class MqttConfig {

    /** Keep-alive (heartbeat) interval, in seconds, passed to the connect options. */
    private static final int KEEP_ALIVE_SECONDS = 2;

    /**
     * Suffixes appended to the configured client id so that the publishing and the
     * subscribing connection identify themselves differently. A broker drops an existing
     * connection when a second one arrives with the same client id, so sharing one id
     * between both adapters makes them repeatedly disconnect each other. The suffixes are
     * kept short because some brokers limit the client id length.
     * NOTE(review): broker-side ACLs or persistent sessions keyed on the bare client id
     * need to be updated to the suffixed ids.
     */
    private static final String OUTBOUND_CLIENT_SUFFIX = "-pub";
    private static final String INBOUND_CLIENT_SUFFIX = "-sub";

    /** Broker user name; defaults to {@code userName} when {@code mqtt.username} is not set. */
    @Value("${mqtt.username:userName}")
    private String username;

    /** Broker password; defaults to {@code password} when {@code mqtt.password} is not set. */
    @Value("${mqtt.password:password}")
    private String password;

    /** MQTT broker URL. */
    @Value("${mqtt.url}")
    private String hostUrl;

    /** Base client id used to tell different device connections apart. */
    @Value("${mqtt.client.id}")
    private String clientId;

    /** Topic to subscribe to; also used as the default topic for outbound messages. */
    @Value("${mqtt.topic}")
    private String defaultTopic;

    /**
     * Creates the client factory shared by the outbound handler and the inbound adapter.
     *
     * @return a factory configured with the broker URL, keep-alive interval,
     *         non-clean session and credentials
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] {hostUrl});
        options.setKeepAliveInterval(KEEP_ALIVE_SECONDS);
        // false = do not start with a clean session on connect.
        options.setCleanSession(false);
        options.setUserName(username);
        options.setPassword(password.toCharArray());

        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(options);
        return factory;
    }

    /* -------------------- publishing -------------------- */

    /**
     * Message handler that publishes every message arriving on
     * {@code mqttOutboundChannel} to the MQTT broker.
     *
     * <p>The configured topic is installed as the handler's default topic so that
     * messages sent without a topic header can still be published.
     *
     * @param mqttClientFactory factory used to create the underlying MQTT client
     * @return the outbound message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound(MqttPahoClientFactory mqttClientFactory) {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler(clientId + OUTBOUND_CLIENT_SUFFIX, mqttClientFactory);
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(defaultTopic);
        messageHandler.setDefaultQos(0);
        messageHandler.setDefaultRetained(false);
        messageHandler.setAsyncEvents(false);
        return messageHandler;
    }

    /**
     * Channel carrying messages that are to be published to MQTT.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /* -------------------- receiving -------------------- */

    /**
     * Message handler for {@code mqttInboundChannel}; prints the payload of each
     * received message to standard output.
     *
     * @return the inbound message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInboundChannel")
    public MessageHandler newHandler() {
        return message -> System.out.println("收到消息 = " + message.getPayload());
    }

    /**
     * Channel carrying messages received from MQTT.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean
    public MessageChannel mqttInboundChannel() {
        return new DirectChannel();
    }

    /**
     * Inbound adapter that subscribes to the configured topic and forwards each received
     * message to {@code mqttInboundChannel}.
     *
     * @param mqttClientFactory factory used to create the underlying MQTT client
     * @return the message-driven inbound adapter
     */
    @Bean
    public MessageProducerSupport mqttInbound(MqttPahoClientFactory mqttClientFactory) {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                clientId + INBOUND_CLIENT_SUFFIX, mqttClientFactory, defaultTopic);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(0);
        adapter.setOutputChannel(mqttInboundChannel());
        return adapter;
    }
}

上面的配置声明了两个 DirectChannel:发送到 mqttOutboundChannel 的消息会被推送到 MQTT;从订阅主题收到的消息会进入 mqttInboundChannel 并由对应的 MessageHandler 处理。
下面演示如何通过消息网关向 MQTT 发送消息。

MsgWriter 网关接口:
/**
 * Messaging gateway for publishing to MQTT. Each method sends its payload to the
 * {@code mqttOutboundChannel} channel declared as the default request channel.
 */
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MsgWriter {

    /**
     * Sends a payload without setting any MQTT header.
     * NOTE(review): no topic header is supplied here, so the handler behind the channel
     * must provide a default topic -- confirm it is configured.
     *
     * @param data the message payload
     */
    void sendToMqtt(String data);

    /**
     * Sends a payload with the {@code MqttHeaders.TOPIC} header set.
     *
     * @param payload the message payload
     * @param topic   value for the topic header
     */
    void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);

    /**
     * Sends a payload with the {@code MqttHeaders.TOPIC} and {@code MqttHeaders.QOS}
     * headers set. Note the parameter order differs from the two-argument overload:
     * the topic comes first here.
     *
     * @param topic   value for the topic header
     * @param qos     value for the QoS header
     * @param payload the message payload
     */
    void sendToMqtt(
            @Header(MqttHeaders.TOPIC) String topic,
            @Header(MqttHeaders.QOS) int qos,
            String payload);
}