ESP32主程序代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import time
from machine import Pin
import network
from umqttsimple import MQTTClient
import ujson

mqtt_username = "账号"
mqtt_password = "密码"

def do_connect():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
print('connecting to network...')
wlan.connect('wifi名称', '密码')
i = 1
while not wlan.isconnected():
print("正在链接...{}".format(i))
i += 1
time.sleep(1)
print('network config:', wlan.ifconfig())
return wlan # 返回网络接口对象

def sub_cb(topic, msg):
print(topic, msg)
# 根据收到的消息内容控制LED的开关
if topic.decode("utf-8") == "ledctl" and msg.decode("utf-8") == "on":
led_pin.value(1)
save_led_status(True)
send_status("LED is ON")
elif topic.decode("utf-8") == "ledctl" and msg.decode("utf-8") == "off":
led_pin.value(0)
save_led_status(False)
send_status("LED is OFF")

def send_status(status_message):
# 发送状态消息给发送消息的客户端
mqtt_client.publish("status", status_message)

def save_led_status(status):
# 使用非易失性存储来保存LED状态
with open('led_status.json', 'w') as f:
ujson.dump({'led_status': status}, f)

def load_led_status():
try:
with open('led_status.json', 'r') as f:
data = ujson.load(f)
return data.get('led_status', False)
except OSError:
return False

def create_mqtt_client():
c = MQTTClient("umqtt_client", "EMQX服务器地址", user=mqtt_username, password=mqtt_password)
c.set_callback(sub_cb)
c.connect()
c.subscribe(b"ledctl")
return c

led_pin = Pin(2, Pin.OUT)
initial_led_status = load_led_status() # 从存储中加载LED状态

# 设置初始LED状态
if initial_led_status:
led_pin.value(1)
else:
led_pin.value(0)

wifi_interface = do_connect()
mqtt_client = create_mqtt_client()

while True:
try:
mqtt_client.check_msg()
except OSError as e:
print("MQTT连接错误:", e)
mqtt_client.disconnect()
wifi_interface = do_connect()
mqtt_client = create_mqtt_client()

time.sleep(1)

1. 网络连接(do_connect 函数)

  • 功能: 连接到 Wi-Fi 网络。
  • 实现技术: 使用 MicroPython 的 network 模块创建 WLAN (STA_IF) 接口。通过 wlan.connect 方法连接到指定的 Wi-Fi 网络,并在成功连接后获取网络配置。

2. MQTT 客户端创建(create_mqtt_client 函数)

  • 功能: 创建 MQTT 客户端并订阅主题。
  • 实现技术: 使用 umqttsimple 库的 MQTTClient 类。客户端连接到 MQTT 服务器,设置消息回调函数 (sub_cb),并订阅 ledctl 主题。

3. MQTT 消息回调(sub_cb 函数)

  • 功能: 接收 MQTT 消息并根据消息内容控制 LED 灯的开关。
  • 实现技术: 函数解析接收到的主题和消息。如果接收到的是 ledctl 主题和相应的 onoff 消息,则控制 GPIO 引脚的高低电平,从而控制 LED 灯的开关状态。

4. LED 状态存储与加载(save_led_statusload_led_status 函数)

  • 功能: 保存和加载 LED 灯的状态。
  • 实现技术: 使用 ujson 库将 LED 状态保存到文件中,并在启动时从文件中加载状态。这样即使设备重启,LED 灯的状态也能保持一致。

5. MQTT 状态更新(send_status 函数)

  • 功能: 向 MQTT 服务器发送 LED 的状态更新。
  • 实现技术: 使用 mqtt_client.publish 方法向 status 主题发送状态消息。

6. 循环检查 MQTT 消息(主循环)

  • 功能: 持续检查 MQTT 消息,并在连接出现问题时重新连接。
  • 实现技术: 在无限循环中调用 mqtt_client.check_msg() 方法来不断检查新消息。如果检测到 OSError,则重新建立 Wi-Fi 和 MQTT 连接。

结论

该代码演示了一个基于 MicroPython 和 MQTT 的 IoT(物联网)解决方案。通过 MQTT 协议,设备可以远程接收控制指令,并反馈其状态。这样的实现适用于各种远程控制和监测场景,比如智能家居控制。您的代码具有良好的网络重连机制,并且通过存储和加载 LED 状态,增强了系统的稳定性和用户体验。

esp32建立mqtt客户端调用库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
import usocket as socket
import ustruct as struct
from ubinascii import hexlify

class MQTTException(Exception):
pass

class MQTTClient:
def __init__(
self,
client_id,
server,
port=0,
user=None,
password=None,
keepalive=0,
ssl=False,
ssl_params={},
):
"""
初始化MQTT客户端

Args:
client_id (str): 客户端ID
server (str): MQTT服务器地址
port (int, optional): 服务器端口,默认为0,自动选择1883或8883(取决于是否使用SSL)
user (str, optional): 用户名,默认为None
password (str, optional): 密码,默认为None
keepalive (int, optional): 保持连接活跃的时间间隔(秒),默认为0(禁用保持连接)
ssl (bool, optional): 是否使用SSL,默认为False
ssl_params (dict, optional): SSL参数字典,默认为空字典
"""
if port == 0:
port = 8883 if ssl else 1883
self.client_id = client_id
self.sock = None
self.server = server
self.port = port
self.ssl = ssl
self.ssl_params = ssl_params
self.pid = 0
self.cb = None # 回调函数,处理订阅消息
self.user = user
self.pswd = password
self.keepalive = keepalive
self.lw_topic = None # 遗嘱消息主题
self.lw_msg = None # 遗嘱消息内容
self.lw_qos = 0
self.lw_retain = False

def _send_str(self, s):
"""
发送字符串

Args:
s (str): 要发送的字符串
"""
self.sock.write(struct.pack("!H", len(s)))
self.sock.write(s)

def _recv_len(self):
"""
接收长度字段,用于处理可变长度的MQTT消息

Returns:
int: 长度值
"""
n = 0
sh = 0
while 1:
b = self.sock.read(1)[0]
n |= (b & 0x7F) << sh
if not b & 0x80:
return n
sh += 7

def set_callback(self, f):
"""
设置回调函数,处理订阅消息的回调

Args:
f (function): 回调函数,接收两个参数:主题(topic)和消息(msg)
"""
self.cb = f

def set_last_will(self, topic, msg, retain=False, qos=0):
"""
设置遗嘱消息

Args:
topic (str): 遗嘱消息主题
msg (str): 遗嘱消息内容
retain (bool, optional): 是否保留消息,默认为False
qos (int, optional): 消息质量服务等级(0、1或2),默认为0
"""
assert 0 <= qos <= 2
assert topic
self.lw_topic = topic
self.lw_msg = msg
self.lw_qos = qos
self.lw_retain = retain

def connect(self, clean_session=True):
"""
建立MQTT连接

Args:
clean_session (bool, optional): 是否清除会话状态,默认为True

Returns:
int: 连接状态,0表示成功,1表示失败
"""
self.sock = socket.socket()
addr = socket.getaddrinfo(self.server, self.port)[0][-1]
self.sock.connect(addr)
if self.ssl:
import ussl

self.sock = ussl.wrap_socket(self.sock, **self.ssl_params)
premsg = bytearray(b"\x10\0\0\0\0\0") # MQTT连接包固定部分
msg = bytearray(b"\x04MQTT\x04\x02\0\0") # MQTT协议版本和连接标志

sz = 10 + 2 + len(self.client_id)
msg[6] = clean_session << 1
if self.user is not None:
sz += 2 + len(self.user) + 2 + len(self.pswd)
msg[6] |= 0xC0 # 设置用户名和密码标志
if self.keepalive:
assert self.keepalive < 65536
msg[7] |= self.keepalive >> 8
msg[8] |= self.keepalive & 0x00FF
if self.lw_topic:
sz += 2 + len(self.lw_topic) + 2 + len(self.lw_msg)
msg[6] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3
msg[6] |= self.lw_retain << 5

i = 1
while sz > 0x7F:
premsg[i] = (sz & 0x7F) | 0x80
sz >>= 7
i += 1
premsg[i] = sz

self.sock.write(premsg, i + 2)
self.sock.write(msg)
self._send_str(self.client_id)
if self.lw_topic:
self._send_str(self.lw_topic)
self._send_str(self.lw_msg)
if self.user is not None:
self._send_str(self.user)
self._send_str(self.pswd)
resp = self.sock.read(4)
assert resp[0] == 0x20 and resp[1] == 0x02
if resp[3] != 0:
raise MQTTException(resp[3])
return resp[2] & 1

def disconnect(self):
"""
断开MQTT连接
"""
self.sock.write(b"\xe0\0")
self.sock.close()

def ping(self):
"""
发送PINGREQ消息
"""
self.sock.write(b"\xc0\0")

def publish(self, topic, msg, retain=False, qos=0):
"""
发布MQTT消息

Args:
topic (str): 消息主题
msg (str): 消息内容
retain (bool, optional): 是否保留消息,默认为False
qos (int, optional): 消息质量服务等级(0、1或2),默认为0
"""
pkt = bytearray(b"\x30\0\0\0")
pkt[0] |= qos << 1 | retain
sz = 2 + len(topic) + len(msg)
if qos > 0:
sz += 2
assert sz < 2097152
i = 1
while sz > 0x7F:
pkt[i] = (sz & 0x7F) | 0x80
sz >>= 7
i += 1
pkt[i] = sz
self.sock.write(pkt, i + 1)
self._send_str(topic)
if qos > 0:
self.pid += 1
pid = self.pid
struct.pack_into("!H", pkt, 0, pid)
self.sock.write(pkt, 2)
self.sock.write(msg)
if qos == 1:
while 1:
op = self.wait_msg()
if op == 0x40:
sz = self.sock.read(1)
assert sz == b"\x02"
rcv_pid = self.sock.read(2)
rcv_pid = rcv_pid[0] << 8 | rcv_pid[1]
if pid == rcv_pid:
return
elif qos == 2:
assert 0

def subscribe(self, topic, qos=0):
"""
订阅MQTT主题

Args:
topic (str): 要订阅的主题
qos (int, optional): 订阅的消息质量服务等级(0、1或2),默认为0
"""
assert self.cb is not None, "Subscribe callback is not set"
pkt = bytearray(b"\x82\0\0\0")
self.pid += 1
struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, self.pid)
self.sock.write(pkt)
self._send_str(topic)
self.sock.write(qos.to_bytes(1, "little"))
while 1:
op = self.wait_msg()
if op == 0x90:
resp = self.sock.read(4)
assert resp[1] == pkt[2] and resp[2] == pkt[3]
if resp[3] == 0x80:
raise MQTTException(resp[3])
return

def wait_msg(self):
"""
等待单个MQTT消息,并处理它

Returns:
int: 操作码,表示消息类型
"""
res = self.sock.read(1)
self.sock.setblocking(True)
if res is None:
return None
if res == b"":
raise OSError(-1)
if res == b"\xd0": # PINGRESP
sz = self.sock.read(1)[0]
assert sz == 0
return None
op = res[0]
if op & 0xF0 != 0x30:
return op
sz = self._recv_len()
topic_len = self.sock.read(2)
topic_len = (topic_len[0] << 8) | topic_len[1]
topic = self.sock.read(topic_len)
sz -= topic_len + 2
if op & 6:
pid = self.sock.read(2)
pid = pid[0] << 8 | pid[1]
sz -= 2
msg = self.sock.read(sz)
self.cb(topic, msg)
if op & 6 == 2:
pkt = bytearray(b"\x40\x02\0\0")
struct.pack_into("!H", pkt, 2, pid)
self.sock.write(pkt)
elif op & 6 == 4:
assert 0

def check_msg(self):
"""
检查是否有来自服务器的待处理消息

Returns:
int: 操作码,表示消息类型
"""
self.sock.setblocking(False)
return self.wait_msg()

以下是代码中的函数列表以及它们的作用和如何使用:

  1. __init__(self, client_id, server, port=0, user=None, password=None, keepalive=0, ssl=False, ssl_params={}):

    • 作用:初始化MQTT客户端。
    • 参数:
      • client_id: 客户端ID。
      • server: MQTT服务器地址。
      • port: 服务器端口,默认为0,自动选择1883或8883(取决于是否使用SSL)。
      • user: 用户名,默认为None。
      • password: 密码,默认为None。
      • keepalive: 保持连接活跃的时间间隔(秒),默认为0(禁用保持连接)。
      • ssl: 是否使用SSL,默认为False。
      • ssl_params: SSL参数字典,默认为空字典。
  2. _send_str(self, s):

    • 作用:发送字符串。
    • 参数:
      • s: 要发送的字符串。
  3. _recv_len(self):

    • 作用:接收长度字段,用于处理可变长度的MQTT消息。
    • 返回值:长度值。
  4. set_callback(self, f):

    • 作用:设置回调函数,处理订阅消息的回调。
    • 参数:
      • f: 回调函数,接收两个参数:主题(topic)和消息(msg)。
  5. set_last_will(self, topic, msg, retain=False, qos=0):

    • 作用:设置遗嘱消息。
    • 参数:
      • topic: 遗嘱消息主题。
      • msg: 遗嘱消息内容。
      • retain: 是否保留消息,默认为False。
      • qos: 消息质量服务等级(0、1或2),默认为0。
  6. connect(self, clean_session=True):

    • 作用:建立MQTT连接。
    • 参数:
      • clean_session: 是否清除会话状态,默认为True。
    • 返回值:连接状态,0表示成功,1表示失败。
  7. disconnect(self):

    • 作用:断开MQTT连接。
  8. ping(self):

    • 作用:发送PINGREQ消息。
  9. publish(self, topic, msg, retain=False, qos=0):

    • 作用:发布MQTT消息。
    • 参数:
      • topic: 消息主题。
      • msg: 消息内容。
      • retain: 是否保留消息,默认为False。
      • qos: 消息质量服务等级(0、1或2),默认为0。
  10. subscribe(self, topic, qos=0):

    • 作用:订阅MQTT主题。
    • 参数:
      • topic: 要订阅的主题。
      • qos: 订阅的消息质量服务等级(0、1或2),默认为0。
  11. wait_msg(self):

    • 作用:等待单个MQTT消息,并处理它。
    • 返回值:操作码,表示消息类型。
  12. check_msg(self):

    • 作用:检查是否有来自服务器的待处理消息。
    • 返回值:操作码,表示消息类型。