
| import usocket as socket import ustruct as struct from ubinascii import hexlify
class MQTTException(Exception): pass
class MQTTClient: def __init__( self, client_id, server, port=0, user=None, password=None, keepalive=0, ssl=False, ssl_params={}, ): """ 初始化MQTT客户端
Args: client_id (str): 客户端ID server (str): MQTT服务器地址 port (int, optional): 服务器端口,默认为0,自动选择1883或8883(取决于是否使用SSL) user (str, optional): 用户名,默认为None password (str, optional): 密码,默认为None keepalive (int, optional): 保持连接活跃的时间间隔(秒),默认为0(禁用保持连接) ssl (bool, optional): 是否使用SSL,默认为False ssl_params (dict, optional): SSL参数字典,默认为空字典 """ if port == 0: port = 8883 if ssl else 1883 self.client_id = client_id self.sock = None self.server = server self.port = port self.ssl = ssl self.ssl_params = ssl_params self.pid = 0 self.cb = None self.user = user self.pswd = password self.keepalive = keepalive self.lw_topic = None self.lw_msg = None self.lw_qos = 0 self.lw_retain = False
def _send_str(self, s): """ 发送字符串
Args: s (str): 要发送的字符串 """ self.sock.write(struct.pack("!H", len(s))) self.sock.write(s)
def _recv_len(self): """ 接收长度字段,用于处理可变长度的MQTT消息
Returns: int: 长度值 """ n = 0 sh = 0 while 1: b = self.sock.read(1)[0] n |= (b & 0x7F) << sh if not b & 0x80: return n sh += 7
def set_callback(self, f): """ 设置回调函数,处理订阅消息的回调
Args: f (function): 回调函数,接收两个参数:主题(topic)和消息(msg) """ self.cb = f
def set_last_will(self, topic, msg, retain=False, qos=0): """ 设置遗嘱消息
Args: topic (str): 遗嘱消息主题 msg (str): 遗嘱消息内容 retain (bool, optional): 是否保留消息,默认为False qos (int, optional): 消息质量服务等级(0、1或2),默认为0 """ assert 0 <= qos <= 2 assert topic self.lw_topic = topic self.lw_msg = msg self.lw_qos = qos self.lw_retain = retain
def connect(self, clean_session=True): """ 建立MQTT连接
Args: clean_session (bool, optional): 是否清除会话状态,默认为True
Returns: int: 连接状态,0表示成功,1表示失败 """ self.sock = socket.socket() addr = socket.getaddrinfo(self.server, self.port)[0][-1] self.sock.connect(addr) if self.ssl: import ussl
self.sock = ussl.wrap_socket(self.sock, **self.ssl_params) premsg = bytearray(b"\x10\0\0\0\0\0") msg = bytearray(b"\x04MQTT\x04\x02\0\0")
sz = 10 + 2 + len(self.client_id) msg[6] = clean_session << 1 if self.user is not None: sz += 2 + len(self.user) + 2 + len(self.pswd) msg[6] |= 0xC0 if self.keepalive: assert self.keepalive < 65536 msg[7] |= self.keepalive >> 8 msg[8] |= self.keepalive & 0x00FF if self.lw_topic: sz += 2 + len(self.lw_topic) + 2 + len(self.lw_msg) msg[6] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3 msg[6] |= self.lw_retain << 5
i = 1 while sz > 0x7F: premsg[i] = (sz & 0x7F) | 0x80 sz >>= 7 i += 1 premsg[i] = sz
self.sock.write(premsg, i + 2) self.sock.write(msg) self._send_str(self.client_id) if self.lw_topic: self._send_str(self.lw_topic) self._send_str(self.lw_msg) if self.user is not None: self._send_str(self.user) self._send_str(self.pswd) resp = self.sock.read(4) assert resp[0] == 0x20 and resp[1] == 0x02 if resp[3] != 0: raise MQTTException(resp[3]) return resp[2] & 1
def disconnect(self): """ 断开MQTT连接 """ self.sock.write(b"\xe0\0") self.sock.close()
def ping(self): """ 发送PINGREQ消息 """ self.sock.write(b"\xc0\0")
def publish(self, topic, msg, retain=False, qos=0): """ 发布MQTT消息
Args: topic (str): 消息主题 msg (str): 消息内容 retain (bool, optional): 是否保留消息,默认为False qos (int, optional): 消息质量服务等级(0、1或2),默认为0 """ pkt = bytearray(b"\x30\0\0\0") pkt[0] |= qos << 1 | retain sz = 2 + len(topic) + len(msg) if qos > 0: sz += 2 assert sz < 2097152 i = 1 while sz > 0x7F: pkt[i] = (sz & 0x7F) | 0x80 sz >>= 7 i += 1 pkt[i] = sz self.sock.write(pkt, i + 1) self._send_str(topic) if qos > 0: self.pid += 1 pid = self.pid struct.pack_into("!H", pkt, 0, pid) self.sock.write(pkt, 2) self.sock.write(msg) if qos == 1: while 1: op = self.wait_msg() if op == 0x40: sz = self.sock.read(1) assert sz == b"\x02" rcv_pid = self.sock.read(2) rcv_pid = rcv_pid[0] << 8 | rcv_pid[1] if pid == rcv_pid: return elif qos == 2: assert 0
def subscribe(self, topic, qos=0): """ 订阅MQTT主题
Args: topic (str): 要订阅的主题 qos (int, optional): 订阅的消息质量服务等级(0、1或2),默认为0 """ assert self.cb is not None, "Subscribe callback is not set" pkt = bytearray(b"\x82\0\0\0") self.pid += 1 struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, self.pid) self.sock.write(pkt) self._send_str(topic) self.sock.write(qos.to_bytes(1, "little")) while 1: op = self.wait_msg() if op == 0x90: resp = self.sock.read(4) assert resp[1] == pkt[2] and resp[2] == pkt[3] if resp[3] == 0x80: raise MQTTException(resp[3]) return
def wait_msg(self): """ 等待单个MQTT消息,并处理它
Returns: int: 操作码,表示消息类型 """ res = self.sock.read(1) self.sock.setblocking(True) if res is None: return None if res == b"": raise OSError(-1) if res == b"\xd0": sz = self.sock.read(1)[0] assert sz == 0 return None op = res[0] if op & 0xF0 != 0x30: return op sz = self._recv_len() topic_len = self.sock.read(2) topic_len = (topic_len[0] << 8) | topic_len[1] topic = self.sock.read(topic_len) sz -= topic_len + 2 if op & 6: pid = self.sock.read(2) pid = pid[0] << 8 | pid[1] sz -= 2 msg = self.sock.read(sz) self.cb(topic, msg) if op & 6 == 2: pkt = bytearray(b"\x40\x02\0\0") struct.pack_into("!H", pkt, 2, pid) self.sock.write(pkt) elif op & 6 == 4: assert 0
def check_msg(self): """ 检查是否有来自服务器的待处理消息
Returns: int: 操作码,表示消息类型 """ self.sock.setblocking(False) return self.wait_msg()
|