学前班
最后登录1970-1-1
在线时间 小时
注册时间2018-10-9
|
我在使用霸道开发板 移植 MQTT 订阅总是收不到消息。有经验的,帮帮我看看,感激不尽
订阅函数为
[mw_shl_code=c,true]
int mqtt_subscrib(char *pTopic,char *pMessage)
{
MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
int rc = 0;
unsigned char buf[200];
int buflen = sizeof(buf);
int msgid = 1;
MQTTString topicString = MQTTString_initializer;
int req_qos = 0;
int len = 0;
rc = transport_open();
if(rc < 0){
printf("transport_open error\r\n");
return rc;
}
data.clientID.cstring = "mmme"; //客户设备ID客户端标识,用于区分每个客户端 )
data.keepAliveInterval = 1; //服务器保持时间(服务器保持连接时间,超过该时间后,服务器会主动断开连接,单位为秒)
data.cleansession = 1; //该标志置1服务器必须丢弃之前保持的客户端的信息,将该连接视为“不存在”
data.username.cstring = "admin"; //MQTT云服务器登录用户名
data.password.cstring = "password"; //MQTT云服务器登录密码
len = MQTTSerialize_connect(buf, buflen, &data);
rc = transport_sendPacketBuffer(buf, len);
if(rc != len){
printf("connect transport_sendPacketBuffer error\r\n");
goto exit;
}
/* wait for connack */
if (MQTTPacket_read(buf, buflen, transport_getdata) == CONNACK)
{
unsigned char sessionPresent, connack_rc,refur=0;
refur =MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen);
if ( refur!= 1 ) //|| connack_rc != 0
{
printf("Unable to connect, return code %d\r\n ", connack_rc);
goto exit;
}
}else{
printf("MQTTPacket_read error\r\n");
goto exit;
}
/* subscribe */
topicString.cstring = pTopic;
len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos);
rc = transport_sendPacketBuffer(buf, len);
if(rc != len){
printf("connect transport_sendPacketBuffer error \r\n");
goto exit;
}
if (MQTTPacket_read(buf, buflen, transport_getdata) == SUBACK) /* wait for suback */
{
unsigned short submsgid;
int subcount;
int granted_qos;
rc = MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, buf, buflen);
if (granted_qos != 0)
{
printf("granted qos != 0, %d \r\n", granted_qos);
goto exit;
}
}
else
goto exit;
/* loop getting msgs on subscribed topic */
topicString.cstring = pTopic;
memset(buf,0,buflen);
//transport_getdata接收数据会阻塞,除非服务器断开连接后才返回
if (MQTTPacket_read(buf, buflen, transport_getdata) == PUBLISH){
unsigned char dup;
int qos;
unsigned char retained;
unsigned short msgid;
int payloadlen_in;
unsigned char* payload_in;
MQTTString receivedTopic;
rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid, &receivedTopic,
&payload_in, &payloadlen_in, buf, buflen);
printf("message arrived %d: %s \r\n", payloadlen_in, payload_in);
strcpy(pMessage,(const char *)payload_in);
}
printf("disconnecting \r\n ");
len = MQTTSerialize_disconnect(buf, buflen);
rc = transport_sendPacketBuffer(buf, len);
exit:
{
rc=-1;
transport_close();//关闭socket
}
return rc;
}[/mw_shl_code]
主函数部分就直接调用了
[mw_shl_code=c,true]int main(void)
{
int rc;
char meassage[200]={0};
systick_init(72); /*初始化Systick工作时钟*/
USART1_Config(); /*初始化串口通信:115200@8-n-1*/
i2c_CfgGpio(); /*初始化eeprom*/
gpio_for_w5500_config(); /*初始化MCU相关引脚*/
reset_w5500(); /*硬复位W5500*/
TIM3_Int_Init(100,719);
set_w5500_mac(); /*配置MAC地址*/
// set_w5500_ip(); /*配置IP地址*/
socket_buf_init(txsize, rxsize); /*初始化8个Socket的发送接收缓存大小*/
printf(" 电脑作为TCP服务器,让W5500作为 TCP客户端去连接 \r\n");
printf(" 服务器IP:%d.%d.%d.%d\r\n",domain_ip[0],domain_ip[1],domain_ip[2],domain_ip[3]);
printf(" 监听端口:%d \r\n",remote_port);
while(1) /*循环执行的函数*/
{
do_dhcp();
while(dhcp_ok)
{
// do_tcp_client();
// delay_s(1);
memset(meassage,0,sizeof(meassage));
rc=0;
printf("\r\n mqtt_subscrib state \r\n");
rc = mqtt_subscrib("147",meassage);
printf("\r\n rc = %d\r\n",rc);
if(rc >= 0){
printf("meassage = %s \r\n \r\n ",meassage);
delay_s(10);
}
}
}
}
[/mw_shl_code]
|
|