1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286
| import usocket as socket import ustruct as struct from ubinascii import hexlify
class MQTTException(Exception): pass
class MQTTClient: def __init__( self, client_id, server, port=0, user=None, password=None, keepalive=0, ssl=False, ssl_params={}, ): """ 初始化MQTT客户端
Args: client_id (str): 客户端ID server (str): MQTT服务器地址 port (int, optional): 服务器端口,默认为0,自动选择1883或8883(取决于是否使用SSL) user (str, optional): 用户名,默认为None password (str, optional): 密码,默认为None keepalive (int, optional): 保持连接活跃的时间间隔(秒),默认为0(禁用保持连接) ssl (bool, optional): 是否使用SSL,默认为False ssl_params (dict, optional): SSL参数字典,默认为空字典 """ if port == 0: port = 8883 if ssl else 1883 self.client_id = client_id self.sock = None self.server = server self.port = port self.ssl = ssl self.ssl_params = ssl_params self.pid = 0 self.cb = None self.user = user self.pswd = password self.keepalive = keepalive self.lw_topic = None self.lw_msg = None self.lw_qos = 0 self.lw_retain = False
def _send_str(self, s): """ 发送字符串
Args: s (str): 要发送的字符串 """ self.sock.write(struct.pack("!H", len(s))) self.sock.write(s)
def _recv_len(self): """ 接收长度字段,用于处理可变长度的MQTT消息
Returns: int: 长度值 """ n = 0 sh = 0 while 1: b = self.sock.read(1)[0] n |= (b & 0x7F) << sh if not b & 0x80: return n sh += 7
def set_callback(self, f): """ 设置回调函数,处理订阅消息的回调
Args: f (function): 回调函数,接收两个参数:主题(topic)和消息(msg) """ self.cb = f
def set_last_will(self, topic, msg, retain=False, qos=0): """ 设置遗嘱消息
Args: topic (str): 遗嘱消息主题 msg (str): 遗嘱消息内容 retain (bool, optional): 是否保留消息,默认为False qos (int, optional): 消息质量服务等级(0、1或2),默认为0 """ assert 0 <= qos <= 2 assert topic self.lw_topic = topic self.lw_msg = msg self.lw_qos = qos self.lw_retain = retain
def connect(self, clean_session=True): """ 建立MQTT连接
Args: clean_session (bool, optional): 是否清除会话状态,默认为True
Returns: int: 连接状态,0表示成功,1表示失败 """ self.sock = socket.socket() addr = socket.getaddrinfo(self.server, self.port)[0][-1] self.sock.connect(addr) if self.ssl: import ussl
self.sock = ussl.wrap_socket(self.sock, **self.ssl_params) premsg = bytearray(b"\x10\0\0\0\0\0") msg = bytearray(b"\x04MQTT\x04\x02\0\0")
sz = 10 + 2 + len(self.client_id) msg[6] = clean_session << 1 if self.user is not None: sz += 2 + len(self.user) + 2 + len(self.pswd) msg[6] |= 0xC0 if self.keepalive: assert self.keepalive < 65536 msg[7] |= self.keepalive >> 8 msg[8] |= self.keepalive & 0x00FF if self.lw_topic: sz += 2 + len(self.lw_topic) + 2 + len(self.lw_msg) msg[6] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3 msg[6] |= self.lw_retain << 5
i = 1 while sz > 0x7F: premsg[i] = (sz & 0x7F) | 0x80 sz >>= 7 i += 1 premsg[i] = sz
self.sock.write(premsg, i + 2) self.sock.write(msg) self._send_str(self.client_id) if self.lw_topic: self._send_str(self.lw_topic) self._send_str(self.lw_msg) if self.user is not None: self._send_str(self.user) self._send_str(self.pswd) resp = self.sock.read(4) assert resp[0] == 0x20 and resp[1] == 0x02 if resp[3] != 0: raise MQTTException(resp[3]) return resp[2] & 1
def disconnect(self): """ 断开MQTT连接 """ self.sock.write(b"\xe0\0") self.sock.close()
def ping(self): """ 发送PINGREQ消息 """ self.sock.write(b"\xc0\0")
def publish(self, topic, msg, retain=False, qos=0): """ 发布MQTT消息
Args: topic (str): 消息主题 msg (str): 消息内容 retain (bool, optional): 是否保留消息,默认为False qos (int, optional): 消息质量服务等级(0、1或2),默认为0 """ pkt = bytearray(b"\x30\0\0\0") pkt[0] |= qos << 1 | retain sz = 2 + len(topic) + len(msg) if qos > 0: sz += 2 assert sz < 2097152 i = 1 while sz > 0x7F: pkt[i] = (sz & 0x7F) | 0x80 sz >>= 7 i += 1 pkt[i] = sz self.sock.write(pkt, i + 1) self._send_str(topic) if qos > 0: self.pid += 1 pid = self.pid struct.pack_into("!H", pkt, 0, pid) self.sock.write(pkt, 2) self.sock.write(msg) if qos == 1: while 1: op = self.wait_msg() if op == 0x40: sz = self.sock.read(1) assert sz == b"\x02" rcv_pid = self.sock.read(2) rcv_pid = rcv_pid[0] << 8 | rcv_pid[1] if pid == rcv_pid: return elif qos == 2: assert 0
def subscribe(self, topic, qos=0): """ 订阅MQTT主题
Args: topic (str): 要订阅的主题 qos (int, optional): 订阅的消息质量服务等级(0、1或2),默认为0 """ assert self.cb is not None, "Subscribe callback is not set" pkt = bytearray(b"\x82\0\0\0") self.pid += 1 struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, self.pid) self.sock.write(pkt) self._send_str(topic) self.sock.write(qos.to_bytes(1, "little")) while 1: op = self.wait_msg() if op == 0x90: resp = self.sock.read(4) assert resp[1] == pkt[2] and resp[2] == pkt[3] if resp[3] == 0x80: raise MQTTException(resp[3]) return
def wait_msg(self): """ 等待单个MQTT消息,并处理它
Returns: int: 操作码,表示消息类型 """ res = self.sock.read(1) self.sock.setblocking(True) if res is None: return None if res == b"": raise OSError(-1) if res == b"\xd0": sz = self.sock.read(1)[0] assert sz == 0 return None op = res[0] if op & 0xF0 != 0x30: return op sz = self._recv_len() topic_len = self.sock.read(2) topic_len = (topic_len[0] << 8) | topic_len[1] topic = self.sock.read(topic_len) sz -= topic_len + 2 if op & 6: pid = self.sock.read(2) pid = pid[0] << 8 | pid[1] sz -= 2 msg = self.sock.read(sz) self.cb(topic, msg) if op & 6 == 2: pkt = bytearray(b"\x40\x02\0\0") struct.pack_into("!H", pkt, 2, pid) self.sock.write(pkt) elif op & 6 == 4: assert 0
def check_msg(self): """ 检查是否有来自服务器的待处理消息
Returns: int: 操作码,表示消息类型 """ self.sock.setblocking(False) return self.wait_msg()
|