初始化
[mw_shl_code=c,true]int mqtt_init(mqtt_client_t* c, client_init_params_t* init)[/mw_shl_code]
主要是配置mqtt_client_t结构的相关信息,如果没有指定初始化参数,则系统会提供默认的参数。
但连接部分的参数则必须指定:
[mw_shl_code=c,true] init_params.connect_params.network_params.addr = "[你的mqtt服务器IP地址或者是域名]";
init_params.connect_params.network_params.port = 1883; //端口号
init_params.connect_params.user_name = "jiejietop";
init_params.connect_params.password = "123456";
init_params.connect_params.client_id = "clientid";
mqtt_init(&client, &init_params);[/mw_shl_code]
连接服务器
[mw_shl_code=c,true]int mqtt_connect(mqtt_client_t* c);[/mw_shl_code]
参数只有 mqtt_client_t 类型的指针,字符串类型的主题(支持通配符"#" "+"),主题的服务质量,以及收到报文的处理函数,如不指定则有默认处理函数。连接服务器则是使用非异步的方式设计,因为必须等待连接上服务器才能进行下一步操作。
过程如下:
[mw_shl_code=c,true]c->network->connect(c->network);[/mw_shl_code]
[mw_shl_code=c,true]MQTTSerialize_connect(c->write_buf, c->write_buf_size, &connect_data)
mqtt_send_packet(c, len, &connect_timer)[/mw_shl_code]
[mw_shl_code=c,true]mqtt_wait_packet(c, CONNACK, &connect_timer)[/mw_shl_code]
- 连接成功后创建一个内部线程mqtt_yield_thread
[mw_shl_code=c,true]platform_thread_init("mqtt_yield_thread", mqtt_yield_thread, c, MQTT_THREAD_STACK_SIZE, MQTT_THREAD_PRIO, MQTT_THREAD_TICK)[/mw_shl_code]
订阅报文
[mw_shl_code=c,true]int mqtt_subscribe(mqtt_client_t* c, const char* topic_filter, mqtt_qos_t qos, message_handler_t handler)[/mw_shl_code]
订阅报文使用异步设计来实现的:
过程如下:
[mw_shl_code=c,true]MQTTSerialize_subscribe(c->write_buf, c->write_buf_size, 0, mqtt_get_next_packet_id(c), 1, &topic, (int*)&qos)
mqtt_send_packet(c, len, &timer)[/mw_shl_code]
- 创建对应的消息处理节点,这个消息节点在收到服务器的SUBACK订阅应答报文后会挂载到消息处理列表msg_handler_list上
[mw_shl_code=c,true]mqtt_msg_handler_create(topic_filter, qos, handler)[/mw_shl_code]
- 在发送了报文给服务器那就要等待服务器的响应了,记录这个等待SUBACK
[mw_shl_code=c,true]mqtt_ack_list_record(c, SUBACK, mqtt_get_next_packet_id(c), len, msg_handler)[/mw_shl_code]
取消订阅与订阅报文的逻辑基本差不多的~
[mw_shl_code=c,true]MQTTSerialize_unsubscribe(c->write_buf, c->write_buf_size, 0, packet_id, 1, &topic)
mqtt_send_packet(c, len, &timer)[/mw_shl_code]
- 创建对应的消息处理节点,这个消息节点在收到服务器的UNSUBACK取消订阅应答报文后将消息处理列表msg_handler_list上的已经订阅的主题消息节点销毁
[mw_shl_code=c,true]mqtt_msg_handler_create((const char*)topic_filter, QOS0, NULL)[/mw_shl_code]
- 在发送了报文给服务器那就要等待服务器的响应了,先记录这个等待UNSUBACK
[mw_shl_code=c,true]mqtt_ack_list_record(c, UNSUBACK, packet_id, len, msg_handler)[/mw_shl_code]
发布报文[mw_shl_code=c,true]int mqtt_publish(mqtt_client_t* c, const char* topic_filter, mqtt_message_t* msg)[/mw_shl_code]
参数只有 mqtt_client_t 类型的指针,字符串类型的主题(支持通配符),要发布的消息(包括服务质量、消息主体)。
[mw_shl_code=c,true] mqtt_message_t msg;
msg.qos = 2;
msg.payload = (void *) buf;
mqtt_publish(&client, "testtopic1", &msg);[/mw_shl_code]
核心思想都差不多,过程如下:
[mw_shl_code=c,true]MQTTSerialize_publish(c->write_buf, c->write_buf_size, 0, msg->qos, msg->retained, msg->id,
topic, (unsigned char*)msg->payload, msg->payloadlen);
mqtt_send_packet(c, len, &timer)[/mw_shl_code]
- 对于QOS0的逻辑,不做任何处理,对于QOS1和QOS2的报文则需要记录下来,在没收到服务器应答的时候进行重发
[mw_shl_code=c,true] if (QOS1 == msg->qos) {
rc = mqtt_ack_list_record(c, PUBACK, mqtt_get_next_packet_id(c), len, NULL);
} else if (QOS2 == msg->qos) {
rc = mqtt_ack_list_record(c, PUBREC, mqtt_get_next_packet_id(c), len, NULL);
}[/mw_shl_code]
- 还有非常重要的一点,重发报文的MQTT报文头部需要设置DUP标志位,这是MQTT协议的标准,因此,在重发的时候作者直接操作了报文的DUP标志位,因为修改DUP标志位的函数我没有从MQTT库中找到,所以我封装了一个函数,这与LwIP中的交叉存取思想是一个道理,它假设我知道MQTT报文的所有操作,所以我可以操作它,这样子可以提高很多效率:
[mw_shl_code=c,true]mqtt_set_publish_dup(c,1); /* may resend this data, set the udp flag in advance */[/mw_shl_code]
内部线程
[mw_shl_code=c,true]static void mqtt_yield_thread(void *arg)[/mw_shl_code]
主要是对mqtt_yield函数的返回值做处理,比如在disconnect的时候销毁这个线程。
核心的处理函数`mqtt_yield`[mw_shl_code=c,true]static int mqtt_packet_handle(mqtt_client_t* c, platform_timer_t* timer)[/mw_shl_code]
对不同的包使用不一样的处理:
[mw_shl_code=c,true] switch (packet_type) {
case 0: /* timed out reading packet */
break;
case CONNACK:
break;
case PUBACK:
case PUBCOMP:
rc = mqtt_puback_and_pubcomp_packet_handle(c, timer);
break;
case SUBACK:
rc = mqtt_suback_packet_handle(c, timer);
break;
case UNSUBACK:
rc = mqtt_unsuback_packet_handle(c, timer);
break;
case PUBLISH:
rc = mqtt_publish_packet_handle(c, timer);
break;
case PUBREC:
case PUBREL:
rc = mqtt_pubrec_and_pubrel_packet_handle(c, timer);
break;
case PINGRESP:
c->ping_outstanding = 0;
break;
default:
goto exit;
}[/mw_shl_code]
并且做保活的处理:
[mw_shl_code=c,true]mqtt_keep_alive(c)[/mw_shl_code]
当发生超时后,
[mw_shl_code=c,true]if (platform_timer_is_expired(&c->last_sent) || platform_timer_is_expired(&c->last_received)) [/mw_shl_code]
序列号一个心跳包并且发送给服务器
[mw_shl_code=c,true]MQTTSerialize_pingreq(c->write_buf, c->write_buf_size);
mqtt_send_packet(c, len, &timer);[/mw_shl_code]
当再次发生超时后,表示与服务器的连接已断开,需要重连的操作,设置客户端状态为断开连接
[mw_shl_code=c,true]mqtt_set_client_state(c, CLIENT_STATE_DISCONNECTED);[/mw_shl_code]
- ack链表的扫描,当收到服务器的报文时,对ack列表进行扫描操作
[mw_shl_code=c,true]mqtt_ack_list_scan(c);[/mw_shl_code]
当超时后就销毁ack链表节点:
[mw_shl_code=c,true]mqtt_ack_handler_destroy(ack_handler);[/mw_shl_code]
当然下面这几种报文则需要重发操作:(PUBACK 、PUBREC、 PUBREL 、PUBCOMP,保证QOS1 QOS2的服务质量)
[mw_shl_code=c,true]if ((ack_handler->type == PUBACK) || (ack_handler->type == PUBREC) || (ack_handler->type == PUBREL) || (ack_handler->type == PUBCOMP))
mqtt_ack_handler_resend(c, ack_handler);[/mw_shl_code]
[mw_shl_code=c,true]mqtt_try_reconnect(c);[/mw_shl_code]
重连成功后尝试重新订阅报文,保证恢复原始状态~
[mw_shl_code=c,true]mqtt_try_resubscribe(c)[/mw_shl_code]
发布应答与发布完成报文的处理
[mw_shl_code=c,true]static int mqtt_puback_and_pubcomp_packet_handle(mqtt_client_t *c, platform_timer_t *timer)[/mw_shl_code]
[mw_shl_code=c,true]MQTTDeserialize_ack(&packet_type, &dup, &packet_id, c->read_buf, c->read_buf_size)[/mw_shl_code]
[mw_shl_code=c,true]mqtt_ack_list_unrecord(c, packet_type, packet_id, NULL);[/mw_shl_code]
订阅应答报文的处理
[mw_shl_code=c,true]static int mqtt_suback_packet_handle(mqtt_client_t *c, platform_timer_t *timer)[/mw_shl_code]
[mw_shl_code=c,true]MQTTDeserialize_suback(&packet_id, 1, &count, (int*)&granted_qos, c->read_buf, c->read_buf_size)[/mw_shl_code]
[mw_shl_code=c,true]mqtt_ack_list_unrecord(c, packet_type, packet_id, NULL);[/mw_shl_code]
- 安装对应的订阅消息处理函数,如果是已存在的则不会安装
[mw_shl_code=c,true]mqtt_msg_handlers_install(c, msg_handler);[/mw_shl_code]
取消订阅应答报文的处理
[mw_shl_code=c,true]static int mqtt_unsuback_packet_handle(mqtt_client_t *c, platform_timer_t *timer)[/mw_shl_code]
[mw_shl_code=c,true]MQTTDeserialize_unsuback(&packet_id, c->read_buf, c->read_buf_size)[/mw_shl_code]
[mw_shl_code=c,true]mqtt_ack_list_unrecord(c, UNSUBACK, packet_id, &msg_handler)[/mw_shl_code]
[mw_shl_code=c,true]mqtt_msg_handler_destory(msg_handler);[/mw_shl_code]
来自服务器的发布报文的处理
[mw_shl_code=c,true]static int mqtt_publish_packet_handle(mqtt_client_t *c, platform_timer_t *timer)[/mw_shl_code]
[mw_shl_code=c,true]MQTTDeserialize_publish(&msg.dup, &qos, &msg.retained, &msg.id, &topic_name,
(unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->read_buf, c->read_buf_size)[/mw_shl_code]
[mw_shl_code=c,true]mqtt_deliver_message(c, &topic_name, &msg);[/mw_shl_code]
- 对于QOS1的报文,还需要发送一个PUBACK应答报文给服务器
[mw_shl_code=c,true]MQTTSerialize_ack(c->write_buf, c->write_buf_size, PUBACK, 0, msg.id);[/mw_shl_code]
- 而对于QOS2的报文则需要发送PUBREC报文给服务器,除此之外还需要记录PUBREL到ack链表上,等待服务器的发布释放报文,最后再去处理这个消息
[mw_shl_code=c,true]MQTTSerialize_ack(c->write_buf, c->write_buf_size, PUBREC, 0, msg.id);
mqtt_ack_list_record(c, PUBREL, msg.id + 1, len, NULL)
mqtt_deliver_message(c, &topic_name, &msg);[/mw_shl_code]
说明:一旦注册到ack列表上的报文,当具有重复的报文是不会重新被注册的,它会通过mqtt_ack_list_node_is_exist函数判断这个节点是否存在,主要是依赖等待响应的消息类型与msgid。
发布收到与发布释放`报文的处理
[mw_shl_code=c,true]static int mqtt_pubrec_and_pubrel_packet_handle(mqtt_client_t *c, platform_timer_t *timer)[/mw_shl_code]
[mw_shl_code=c,true]MQTTDeserialize_ack(&packet_type, &dup, &packet_id, c->read_buf, c->read_buf_size)[/mw_shl_code]
[mw_shl_code=c,true]mqtt_publish_ack_packet(c, packet_id, packet_type);[/mw_shl_code]
[mw_shl_code=c,true]mqtt_ack_list_unrecord(c, UNSUBACK, packet_id, &msg_handler)[/mw_shl_code]
在后台测试[mw_shl_code=c,true]nohup ./mqtt-client > log.out 2>&1 &[/mw_shl_code]