Java充当mqtt客户端发送订阅

使用 Java SDK 连接

Eclipse Paho Java Client 是用 Java 编写的 MQTT 客户端库(MQTT Java Client),可用于 JVM 或其他 Java 兼容平台(例如Android)。

Eclipse Paho Java Client 提供了MqttAsyncClient 和 MqttClient 异步和同步 API。

通过 Maven 安装 Paho Java

通过包管理工具 Maven 可以方便地安装 Paho Java 客户端库,截止目前最新版本安装如下:

1
2
3
4
5
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>

Paho Java 使用示例

Java 体系中 Paho Java 是比较稳定、广泛应用的 MQTT 客户端库,本示例包含 Java 语言的 Paho Java 连接 EMQX Broker,并进行消息收发完整代码:

Java发送订阅示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;

public class MqttClientExample {

public static void main(String[] args) {

// MQTT服务器地址,格式为tcp://[地址]:[端口]
String broker = "tcp://[地址]:[端口]";
// 客户端ID,可以随机生成
String clientId = MqttClient.generateClientId();
// MQTT服务器的用户名和密码---开启客户端密码验证需要此项
String username = "账号";
String password = "密码";

try {
// 创建MQTT客户端
MqttClient client = new MqttClient(broker, clientId);

// 创建连接选项对象
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(username);
connOpts.setPassword(password.toCharArray());

// 设置回调函数,用于处理消息
client.setCallback(new MqttCallback() {
public void connectionLost(Throwable cause) {
// 连接丢失时的回调
System.out.println("Connection lost!");
}

public void messageArrived(String topic, MqttMessage message) throws Exception {
// 收到消息时的回调
System.out.println("Received message: " + new String(message.getPayload()) + " on topic: " + topic);
}

public void deliveryComplete(IMqttDeliveryToken token) {
// 消息送达完成时的回调,此例中不需要实现
}
});

// 连接到MQTT服务器
client.connect(connOpts);

// 订阅主题
String subTopic = "status";
client.subscribe(subTopic);

// 发送消息
String pubTopic = "ledctl";
String content = "off";
MqttMessage message = new MqttMessage(content.getBytes());
client.publish(pubTopic, message);

// 等待消息被发送和接收,此处等待2秒
Thread.sleep(2000);

// 断开连接
client.disconnect();
System.out.println("Disconnected");

} catch(MqttException me) {
// 捕获并打印MQTT异常信息
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
} catch (InterruptedException ie) {
// 捕获并打印线程中断异常
ie.printStackTrace();
}
}
}