03.连接与断开
MQTT 的连接过程
Client 建立到 Broker 的连接过程如下图所示:
- Client 发送 CONNECT 数据包给 Broker
- Broker 在收到 CONNECT 数据包之后,给 Client 返回一个 CONNACK 数据包
CONNECT 数据包
连接的建立由 Client 发起,Client 端首先向 Broker 发送一个 CONNECT 数据包,CONNECT 数据包包含以下内容。
可变头
在 CONNECT 数据包可变头中,包含以下信息:
- 协议名称(Protocol Name):值固定为字符 “MQTT”。
- 协议版本(Protocol Level):对 MQTT 3.1.1 来说,该值为 4。
- 用户名标识(User Name Flag):消息体中是否有用户名字段,1bit,0 或者 1。
- 密码标识(Password Flag):消息体中是否有密码字段,1bit,0 或者 1。
- 遗愿消息 Retain 标识(Will Retain):标识遗愿消息是否是 Retain 消息,1bit,0 或者 1。
- 遗愿消息 QOS 标识(Will QOS):标识遗愿消息的 QOS,2bit,0、1 或者 2。
- 遗愿标识(Will Flag):标识是否使用遗愿消息,1bit,0 或者 1。
- 会话清除标识(Clean Session):标识 Client 是否建立一个持久化的会话,1bit,0 或者 1。当该标识设为 0 时,代表 Client 希望建立一个持久会话的连接,Broker 将存储该 Client 订阅的主题和未接受的消息,否则 Broker 不会存储这些数据,同时在建立连接时清楚这个 Client 之前存在的持久化会话所保存的数据。
- 连接保活(Keep Alive):设置一个以秒为单位的时间间隔,Client 和 Broker 之间在这个时间间隔之内需要至少一次消息交互,否则 Client 和 Broker 会认为它们之间的连接已经断开。
消息体
CONNECT 数据包的消息体中包含以下数据:
-
客户端标识符(Client Identifier):Client Identifier 是用来标识 Client 身份的字段,在 MQTT 3.1.1 的版本中,这个字段的长度是 1 到 23 个字节,而且只能包含数字和 26 个字母(包括大小写),Broker 通过这个字段来区分不同的 Client。所以在连接的时候,应该保证 Client Identifier 是唯一的,所以我们可以使用 UUID,唯一的设备硬件标识,或者在 Android 设备中使用的话,可以使用 DEVICE_ID 等作为 Client Identifier 的取值来源。MQTT 协议中要求 Client 连接时必须带上 Client Identifier,但是也允许 Broker 在 Client Identifier 为空时,会为 Client 分配一个内部唯一的 Identifier。如果需要持久化会话的话,那必须为 Client 设定一个唯一的 Identifier。
-
用户名(Username):如果可变头中的用户名标识设为 1,那么消息体中将包含用户名字段,Broker 可以使用用户名和密码来对接入的 Client 进行验证,只允许已授权的 Client 接入。注意不同的 Client 需要使用不同的 Client Identifier,但它们可以使用同样的用户名和密码进行连接。
-
密码(Password):如果可变头中的密码标识设为 1,那么消息体中将包含密码字段。
-
遗愿主题(Will Topic):如果可变头中的遗愿标识设为 1,那么消息体中将包含遗愿主题,当 Client 非正常地中断连接的时候,Broker 将向指定的遗愿主题中发布遗愿消息。
-
遗愿消息(Will Message):如果可变头中的遗愿标识设为 1,那么消息体中将包含遗愿消息,当 Client 非正常地中断连接的时候,Broker 将向指定的遗愿主题中发布由该字段指定的内容。
CONNACK 数据包
当 Broker 收到 Client 的 CONNECT 数据包之后,将检查并检验 CONNECT 数据包的内容,之后回复 Client 一个 CONNACK 数据包。CONNACK 数据包包含以下内容。
可变头
CONNACK 数据包的可变头中,包含以下信息:
-
会话存在标识(Session Present Flag):用于标识在 Broker 上是否已存在该 Client(用 Client Identifier 区分)的持久性会话,1bit,0 或者 1。当 Client 在连接时设置
Clean Session=1
(会话清除标识见 CONNECT 数据包的可变头),则 CONNACK 中的Session Present Flag
始终为 0;当 Client 在连接时设置Clean Session=0
,那么存在下面两种情况 -
- 如果 Broker 上面保留了这个 Client 之前留下的持久性会话,那么 CONNACK 中的
Session Present Flag
值为 1; - 如果 Broker 上面没有保存这个 Client 之前留下的会话数据,那么 CONNACK 中的
Session Present Flag
值为 0;
- 如果 Broker 上面保留了这个 Client 之前留下的持久性会话,那么 CONNACK 中的
Session Present Flag 这个特性是在 MQTT3.1.1 版本中新加入的,之前的版本中没有这个标识
- 连接返回码(Connect Return code):用于标识 Client 是 Broker 的连接是否建立成功,连接返回码有以下一些值:
Return code 连接状态 0 连接已经建立 1 连接被拒绝,不允许的协议版本 2 连接被拒绝,Client Identifier 被拒绝 3 连接被拒绝,服务器不可用 4 连接被拒绝,错误的用户名或密码 5 连接被拒绝,未授权
Return code=2
代表的是 Client Identifier 格式不规范,比如长度超过 23 个字符,包含了不允许的字符等(部分 Broker 的实现在协议标准上做了扩展,比如允许超过 23 个字符的 Client Identifer 等)Return code=4
在 MQTT 协议中的含义是 Username 和 Password 的格式不正确,但是在大部分的 Broker 实现中,使用错误的用户名和密码时返回的也是 4,所以可以认为 4 表示就是错误的用户名或者密码;Return code=5
一般表示 Broker 不使用用户名和密码验证而是采用 IP 地址或者 Client Identifier 进行认证的时候使用,来标识 Client 没有通过验证。
消息头
CONNACK 没有消息体。综上所述当 Client 向 Broker 发送了 CONNECT 数据包并获得了 Return code 为 0 的 CONNACK 数据包后,则代表连接建立成功了。之后则可以进行消息的发布和订阅了。
MQTT 断开过程
MQTT 的断开过程分为以下两种:
- 一种是 Client 主动关闭连接
- 一种是 Broker 主动关闭连接
Client 主动关闭连接
Client 主动关闭连接的流程非常简单,只需要 Client 向 Broker 发送一个 DISCONNECT 数据包就可以了。DISCONNECT 数据包没有可变头(Variable header)和消息体(Payload)。在 Client 发送完 DISCONNECT 之后,就可以关闭底层的 TCP 连接了,不需要等待 Broker 的回复(Broker 也不会对 DISCONNECT 数据包回复)。
为什么 Client 关闭 TCP 连接之前,要发送一个和 Broker 没有交互的数据包,而不是关闭底层的 TCP 连接的?因为这涉及到 MQTT 协议的一个特性,在 MQTT 协议中,Broker 需要判断 Client 是否是正常的断开连接。当 Broker 收到 Client 的 DISCONNECT 数据包的时候,Broker 则认为 Client 是正常断开连接的,那么会丢弃当前连接指定的遗愿消息。如果 Broker 检测到 Client 连接丢失,但是又没有收到 DISCONNECT 数据包,则认为 Client 是非正常断开的,就会向在连接的时候指定的遗愿主题发布遗愿消息。
Broker 主动关闭连接
MQTT 协议规定 Broker 在没有收到 Client 的 DISCONNECT 数据包之前都应该和 Client 保持连接。只有当 Broker 在 Keep Alive 的时间间隔内,没有收到 Client 的任何 MQTT 数据包的时候会主动关闭连接。一些 Broker 的实现在 MQTT 协议上做了一些拓展,支持 Client 的连接管理,可以主动和某个 Client 断开连接。
Broker 主动关闭连接之前不会向 Client 发送任何 MQTT 数据包,而是直接关闭底层的 TCP 连接。
3. 代码实践
下面使用 Python 的 paho mqtt 库来实现 MQTT 的连接,Broker 的话使用自己搭建的 mosquitto。
3.1. 建立持久会话的连接
import paho.mqtt.client as mqtt
'''
收到Broker发来的CONNACK消息,就会执行on_connect()回调函数
打印出CONNACK数据包中的Connect Return code、Session Present Flag
'''
def on_connect(client, userdata, flags, rc):
print("return code:", rc)
print("session present:", flags['session present'])
# 通过client_id指定Client Identifier
# 设置clean_session为False表示要建立一个持久性会话
mqtt_client = mqtt.Client(client_id="demo_mqtt", clean_session=False)
# 将回调函数指派给客户端实例
mqtt_client.on_connect = on_connect
mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()
运行 Python 代码之后,输出结果如下所示:
return code: 0
session present: 0
return code 为 0 表示连接已成功建立,因为demo_mqtt
的 Client 第一次建立,所以SessionPresent
为 0。再次运行,输出结果变成如下:
return code: 0
session present: 1
3.2. 建立非持久会话的连接
相比建立持久会话的连接的代码,只需要将clean_session
指定为了 True 即可建立一个非持久会话的连接了。
import paho.mqtt.client as mqtt
'''
收到Broker发来的CONNACK消息,就会执行on_connect()回调函数
打印出CONNACK数据包中的Connect Return code、Session Present Flag
'''
def on_connect(client, userdata, flags, rc):
print("return code:", rc)
print("session present:", flags['session present'])
# 通过client_id指定Client Identifier
# 设置clean_session为True表示要建立一个非持久性的会话
mqtt_client = mqtt.Client(client_id="demo_mqtt", clean_session=True)
# 将回调函数指派给客户端实例
mqtt_client.on_connect = on_connect
mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()
运行该代码,输出结果为:
return code: 0
session present: 0
并且无论运行多少次,SessionPresent
都是为 0
paho mqtt 的 Python 版本,默认
clean_session
为 True
3.3. 使用相同的 Client Identifier 进行连接
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
print("return code:", rc)
print("session present:", flags['session present'])
mqtt_client = mqtt.Client(client_id="demo_mqtt")
mqtt_client.on_connect = on_connect
mqtt_client.connect("192.168.10.239", 1883)
mqtt_client.loop_forever()
分别在两个终端中运行上述同样的代码,那么两个终端中会不停打印如下内容:
return code: 0
session present: 0
return code: 0
session present: 0
......
因为当两个 Client 中使用同样的 Client Identifie 进行连接时,那么第二个 Client 连接成功后,Broker 会关闭和第一个已经连接上的 Client 连接。然而因为我们使用了loop_forever()
函数,这个函数会一直阻塞,直到 Client 调用disconnect()
,并且这个函数会在断开后自动重连。所以当连接被 Broker 关闭时,它又会尝试重新连接,结果就是这两个 Client 交替地把对方顶下线,因此在使用中我们需要保证每一个设备使用的 Client Identifier 是唯一的。