文章字数:178,阅读全文大约需要1分钟
使用org.eclipse.paho.client.mqttv3
包下的工具创建mqtt
连接,并订阅和发布消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
| public static void main(String[] args) {
String topic = "defaultTopic"; String content = "Message from 123xxx"; int qos = 2; String broker = "tcp://127.0.0.1:1883";
String clientId="123xxx"; String userName = "admin"; String pwd = "admin"; try {
MqttClient mqttClient = new MqttClient(broker, clientId, null); MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true); connOpts.setUserName(userName); connOpts.setPassword(pwd.toCharArray()); mqttClient.connect(connOpts);
MqttCallback callback = new MessageProcess(); mqttClient.setCallback(callback); mqttClient.subscribe(topic); MqttMessage message = new MqttMessage(content.getBytes()); message.setQos(qos); while (true) { TimeUnit.SECONDS.sleep(5); mqttClient.publish(topic, message); System.out.println("Message published"); } } catch (MqttException me) { System.out.println("reasonCode " + me.getReasonCode()); System.out.println("message " + me.getMessage()); System.out.println("localizedMessage " + me.getLocalizedMessage()); System.out.println("cause " + me.getCause()); System.out.println("exception " + me.getMessage()); me.printStackTrace(); } catch (Exception e) { e.getMessage(); } }
static class MessageProcess implements MqttCallback { @Override public void connectionLost(Throwable throwable) { }
@Override public void messageArrived(String topic, MqttMessage message) { byte[] mess = message.getPayload(); System.out.println("topic[" + topic + "]:" + new String(mess)); }
@Override public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { } }
|