文章字数:282,阅读全文大约需要1分钟
使用 SpringBoot 集成 MQTT,实现消息的推送(发布)和主题订阅。
环境
jdk1.8
maven
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
使用
- 配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
| import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.IntegrationComponentScan; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.endpoint.MessageProducerSupport; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler;
/**
 * Spring Integration wiring for publishing to and subscribing from an MQTT broker.
 *
 * <p>Outbound: messages sent to {@code mqttOutboundChannel} are published by
 * {@link #mqttOutbound(MqttPahoClientFactory)}. Inbound: messages received on
 * {@code mqtt.topic} are delivered to {@code mqttInboundChannel} and handled by
 * {@link #newHandler()}.
 *
 * <p>Required properties: {@code mqtt.url}, {@code mqtt.client.id}, {@code mqtt.topic}.
 * Optional properties: {@code mqtt.username}, {@code mqtt.password}.
 */
@Configuration
@IntegrationComponentScan
public class MqttConfig {

    /**
     * Suffixes appended to the configured client ID. The inbound adapter and the outbound
     * handler each open their own broker connection; a broker drops an existing connection
     * when a new one arrives with the same client identifier, so the two must differ.
     */
    private static final String INBOUND_CLIENT_SUFFIX = "_inbound";
    private static final String OUTBOUND_CLIENT_SUFFIX = "_outbound";

    /**
     * Keep-alive interval passed to the Paho connect options (seconds).
     * NOTE(review): 2 is very aggressive compared with the Paho default — confirm it is intended.
     */
    private static final int KEEP_ALIVE_SECONDS = 2;

    /** Broker user name; falls back to the previously hard-coded value when not configured. */
    @Value("${mqtt.username:userName}")
    private String username;

    /** Broker password; falls back to the previously hard-coded value when not configured. */
    @Value("${mqtt.password:password}")
    private String password;

    /** Broker URI, taken from the {@code mqtt.url} property. */
    @Value("${mqtt.url}")
    private String hostUrl;

    /** Base client ID; the inbound/outbound suffixes are appended to it. */
    @Value("${mqtt.client.id}")
    private String clientId;

    /** Topic subscribed to by the inbound adapter and used as the default publish topic. */
    @Value("${mqtt.topic}")
    private String defaultTopic;

    /**
     * Builds the client factory shared by the inbound adapter and the outbound handler.
     *
     * @return a factory configured with the broker URI, credentials, keep-alive and a
     *         non-clean (persistent) session
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] {hostUrl});
        options.setKeepAliveInterval(KEEP_ALIVE_SECONDS);
        options.setCleanSession(false);
        options.setUserName(username);
        options.setPassword(password.toCharArray());

        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(options);
        return factory;
    }

    /**
     * Handler that publishes every message arriving on {@code mqttOutboundChannel}.
     *
     * @param mqttClientFactory factory used to create the publishing client
     * @return an asynchronous handler publishing with QoS 0, not retained
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound(MqttPahoClientFactory mqttClientFactory) {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler(clientId + OUTBOUND_CLIENT_SUFFIX, mqttClientFactory);
        messageHandler.setAsync(true);
        // Without a default topic, messages sent without a topic header cannot be published.
        messageHandler.setDefaultTopic(defaultTopic);
        messageHandler.setDefaultQos(0);
        messageHandler.setDefaultRetained(false);
        messageHandler.setAsyncEvents(false);
        return messageHandler;
    }

    /**
     * Channel that callers write to in order to publish to MQTT.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * Handler for messages received from the broker; prints each payload to standard output.
     *
     * @return the inbound message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInboundChannel")
    public MessageHandler newHandler() {
        return message -> System.out.println("收到消息 = " + message.getPayload());
    }

    /**
     * Channel that receives messages produced by the inbound adapter.
     *
     * @return a new {@link DirectChannel}
     */
    @Bean
    public MessageChannel mqttInboundChannel() {
        return new DirectChannel();
    }

    /**
     * Adapter that subscribes to the configured topic and forwards received messages to
     * {@code mqttInboundChannel}.
     *
     * @param mqttClientFactory factory used to create the subscribing client
     * @return the message-driven adapter, subscribed with QoS 0
     */
    @Bean
    public MessageProducerSupport mqttInbound(MqttPahoClientFactory mqttClientFactory) {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(
                        clientId + INBOUND_CLIENT_SUFFIX, mqttClientFactory, defaultTopic);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(0);
        adapter.setOutputChannel(mqttInboundChannel());
        return adapter;
    }
}
|
上面的配置中,MQTT 的发送端订阅了一个 DirectChannel(mqttOutboundChannel),该通道接收到消息后会将其推送给 MQTT。
下面就是如何使用
1 2 3 4 5 6 7
| @Component @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel") public interface MsgWriter { void sendToMqtt(String data); void sendToMqtt(String payload,@Header(MqttHeaders.TOPIC) String topic); void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload); }
|