0%

mqttv3实现mqttCLient端

文章字数:178,阅读全文大约需要1分钟

使用org.eclipse.paho.client.mqttv3包下的工具创建mqtt连接,并订阅和发布消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public static void main(String[] args) {

String topic = "defaultTopic";
String content = "Message from 123xxx";
int qos = 2;
String broker = "tcp://127.0.0.1:1883";

String clientId="123xxx";
String userName = "admin";
String pwd = "admin";
try {
/**
* 不传递persistence默认使用文件保存未发送的消息,传递null使用内存保存未发送的消息
*/
MqttClient mqttClient = new MqttClient(broker, clientId, null);
MqttConnectOptions connOpts = new MqttConnectOptions();

/**
* 再次连接时是否清除上次的session,即重新连接时是否不接收未收到的消息。只有重新连接时才能够更改此参数
*/
connOpts.setCleanSession(true);
connOpts.setUserName(userName);
connOpts.setPassword(pwd.toCharArray());
// 发起连接
mqttClient.connect(connOpts);

// 订阅
MqttCallback callback = new MessageProcess();
mqttClient.setCallback(callback);
mqttClient.subscribe(topic);
// 发布
// 1.创建需要发布的消息
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
// 2.循环模拟发布
while (true) {
TimeUnit.SECONDS.sleep(5);
mqttClient.publish(topic, message);
System.out.println("Message published");
}
} catch (MqttException me) {
System.out.println("reasonCode " + me.getReasonCode());
System.out.println("message " + me.getMessage());
System.out.println("localizedMessage " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("exception " + me.getMessage());
me.printStackTrace();
} catch (Exception e) {
e.getMessage();
}
}

static class MessageProcess implements MqttCallback {
@Override
public void connectionLost(Throwable throwable) {
// 断线 手动重连
}

@Override
public void messageArrived(String topic, MqttMessage message) {
byte[] mess = message.getPayload();
System.out.println("topic[" + topic + "]:" + new String(mess));
}

@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
// 确认信息已经传递完毕调用
}
}