野火电子论坛

 找回密码
 注册

QQ登录

只需一步,快速开始

查看: 12338|回复: 1

【转】各类应用MQTT接入OneNet心得

[复制链接]
发表于 2017-7-27 13:47:24 | 显示全部楼层 |阅读模式

各位朋友大家好,本人为重庆工商大学物联网工程大二在校学生,欢迎各位与我交流。同时,我们在学校创立了一个创意科技团队,CO团队,平时做一些物联网相关的小创意项目。希望大家了解一下我们,也欢迎各位与我联系。我的邮箱是chnhawk@foxmail.com

前言:本篇主要记录一下我个人使用mqtt协议上OneNet云时踩过的一些坑。之前使用过mosquito在自己的服务器上搭建的mqtt转发服务,但是和OneNet的使用还是有一些不一样的地方。前面记录的大都是开发的过程和一些小心得,后面会附上自己的测试代码。对于大佬来说可能这算不上什么干货,但是对于高校学生或者初学者来讲,还没有较多的开发经验,故作此篇文章,记录一下上云的心得。本文存在有错误以及不正规的地方,还恳请各位批评斧正!

一、Android设备使用MQTT协议接入OneNet平台
1. 用到的MQTT库:eclipse.paho
直接在app的gradle内dependencies添加依赖
compile 'org.eclipse.pahorg.eclipse.paho.client.mqttv3:1.1.0'
compile 'org.eclipse.pahorg.eclipse.paho.android.service:1.1.1'

注意:使用eclipse.paho一定记得要在AndroidManifest.xml中添加服务
<serviceandroid:name="org.eclipse.paho.android.service.MqttService"/>
否则没有办法正常使用

2. 连接上OneNet的服务器
需要设置
clientId为deviceId,即创建设备时的设备号
userName为productId,即创建产品的产品号
password为APIkey或者authInfo(即自己设置鉴权信息字符串)
因为APIkey太长不好记忆,所以我选择鉴权

1) 首先配置连接设置
MqttConnectOptions option = newMqttConnectOptions();
option.setUserName(productId);
option.setPassword(authInfo.toCharArray());
option.setCleanSession(false);

2) 调用mqttclient的连接方法
MqttAndroidClient mqttClient = newMqttAndroidClient(mContext, ("tcp://" + serverIP + ":" +serverPort), deviceId);
mqttClient.connet(option);
//OneNet的ip为183.230.40.39
//mqtt端口为6002

这样基本上就连接上OneNet的服务器了,ConnAck包这些是paho库帮我们完成的接收确认,我们只需要调用连接的这个函数即可,非常方便。

ps.当初看开发文档的时候一开始没有看到设置用户名这些,因为照着接入流程做,看到了服务器的ip和端口就直接跑去连接了,发现一直连接不上。直到再次看了一遍开发文档,拖到文档下面的常见问题才发现还需要设置这些。感觉这些重要的设置内容应该写在前面比较好,第一次没注意看,很尴尬。

3.发布及订阅自己的topic
这个很简单,之前用自己的服务器也是这样做的,
1) 订阅:
mqttClient.subscribe("topic",0); //参数一为topic字符串, 参数二为QoS级别

2) 发布:
MqttMessage msg = new MqttMessage();
msg.setPayload(payload.getBytes());  //发布内容
msg.setQos(1);                                 //设置发布级别
mqttClient.publish(topic, msg);          //发布

我们的项目是以topic来区别设备的
譬如,Android应用就订阅一个topic为Android/id,id为编号
底层设备就订阅一个topic为Device/id
然后发送指令直接向对应的topic发送就行

ps.一开始接入OneNet我也就是这么做的,使用的自己的topic来通信,完全把OneNet平台当成一个broker(转发器)了,到后面才发现我们简直是用高射炮打了蚊子,根本没有使用正确的姿势来使用OneNet平台。

4.使用mqtt上传数据点/获取数据
OneNet为每一个产品提供了数据流,在平台上记录并且可以生成数据的曲线,要想让我们通过mqtt协议上传的数据能够保存在数据流上,就得向$dp这个topic发送一串规定格式的数据。
我翻阅了一下OneNet的mqtt协议手册,没能搞清楚,翻车了。试了几次向$dp这个topic发送数据,在设备数据界面看都没有成功更新到数据,最让人疑惑的是,一旦发送了数据,设备就会掉线,需要重连。不太清楚OneNet平台是不是有什么发布数据就让你强制下线的机制。并且因为平台不允许订阅$打头的数据,所以也不清楚数据成功发送出去了没有。
起初我以为是OneNet服务器的问题,不过其他的topic还是可以正常的使用,并且换用平台提供的官方客户端,又可以成功发送且对数据进行更新。
1.png (0 Bytes, 下载次数: 0)
下载附件
OneNet提供的mqtt测试客户端
昨天 21:51 上传




(使用OneNet提供的mqtt测试程序上传数据点,同时可以看到平台上显示设备的数据更新了)
1.png (0 Bytes, 下载次数: 0)
下载附件
数据点显示更新
昨天 21:52 上传



于是我在想是不是发送的格式还是不正确,就发现了这个
1.png (0 Bytes, 下载次数: 0)
下载附件
测试客户端的调试窗口显示刚刚发布的内容
昨天 21:53 上传



这个是官方软件的调试输出窗口,可以看到刚刚发布出去的payload为上图所示的一串16进制数据。于是我尝试将这段数据复制到mqtt.fx软件并进行发布。

ps.mqttfx是一款非常方便的测试mqtt的软件。不过在win7系统下经常容易出现窗口消失的bug,实质上是窗口跑到屏幕外面去了。可以通过在状态栏-右键-移动(M)-使用方向键让窗口回到屏幕内。
2.png (0 Bytes, 下载次数: 0)
下载附件
移动mqtt.fx窗口
昨天 21:55 上传



使用mqtt.fx进行测试
3.png (0 Bytes, 下载次数: 0)
下载附件
通过mqtt.fx发布消息
昨天 21:55 上传



经过测试,很遗憾还是没有成功,这让我有些郁闷。

于是决定还是通过wireshark软件对OneNet平台提供的客户端进行抓包,看看他究竟发送的是什么。

1.png (0 Bytes, 下载次数: 0)
下载附件
OneNet平台提供的客户端发送的数据
昨天 21:57 上传




可以看到确实是向$dp这个topic发送了消息,7b是”{”转为16进制,那么前面的01 00 5e就是协议文档里面提到的发送类型、长度高位、长度低位了。
我之前发送的也确实是按照这个协议走的啊,为什么不成功呢?

于是再来抓一下我之前程序发送的内容(这里丢人了,捂脸)

2.png (0 Bytes, 下载次数: 0)
下载附件
我自己发送的数据
昨天 21:57 上传



附上我之前的代码:

3.png (0 Bytes, 下载次数: 0)
下载附件
错误的代码示范
昨天 21:57 上传




这样问题就显而易见了。
协议中的格式讲的是发送16进制的01 00 length,而通过我之前编写的这段代码发送出去的是字符串”01 00length”,同理,之前测试通过mqttfx发送出去的就更不对了。

故赶紧修改代码为:

4.png (0 Bytes, 下载次数: 0)
下载附件
修改之后的代码
昨天 21:57 上传



这样就能成功地将数据库上传到OneNet平台了。

总结一下,在这一步翻车的原因还是对mqtt协议理解的不够充分,错把16进制字符串当做16进制在发送。做物联网项目,会经常在16进制数组和16进制字符串之间来回切换,一定要弄清楚,不然像这样因为这样一个小问题而耗费大量时间去处理、测试就很不划算了。

下面附上自己编写的基于paho的mqtt使用帮助类:
  • import android.content.Context;
  • import android.util.Log;
  • import org.eclipse.paho.android.service.MqttAndroidClient;
  • import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
  • import org.eclipse.paho.client.mqttv3.MqttCallback;
  • import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  • import org.eclipse.paho.client.mqttv3.MqttException;
  • import org.eclipse.paho.client.mqttv3.MqttMessage;
  • import java.util.ArrayList;
  • /**
  • * Created by CHNhawk on 2017/3/7
  • * 重庆工商大学 物联网工程 熊廷宇
  • * update on 2017/5/16
  • * 添加了消息队列转发以及MsgHandler接口
  • * 支持更好地用于多Activity
  • *
  • */
  • public class MqttUtil {
  •     //region 网络设定
  •     private static String serverIP = "183.230.40.39";               // 服务器ip
  •     private static int serverPort = 6002;                           // 服务器端口
  •     private static String deviceId = "填设备id";                            // 终端id
  •     private static String userId = "填应用id";                           // 服务器登陆名
  •     private static String password = "填鉴权或apiKey";                         // 服务器登陆密码
  •     //endregion
  •     //region 消息类型
  •     public static final int MQTT_CONNECTED = 1000;                 // mqtt连接成功
  •     public static final int MQTT_CONNECTFAIL = 1001;               // mqtt连接失败
  •     public static final int MQTT_DISCONNECT = 1002;                // mqtt断开连接
  •     public static final int MQTT_SUBSCRIBED = 1010;                // 订阅成功
  •     public static final int MQTT_SUBSCRIBEFAIL = 1011;             // 订阅失败
  •     public static final int MQTT_MSG_TEST = 2001;                  // 接收到TEST消息
  •     public static final int MQTT_PUBLISHED = 2010;                 // 发布成功
  •     public static final int MQTT_PUBLISHFAIL = 2011;               // 发布失败
  •     //endregion
  •     //region MQTT客户端服务
  •     private static MqttUtil instance;                               // 单例对象
  •     private MqttAndroidClient mqttClient;                           // mqtt客户端
  •     private MqttConnectOptions option;                              // mqtt设置
  •     private MqttCallback clientCallback;                            // 客户端回调
  •     private Context mContext;                                       // 上下文
  •     private ArrayList<MsgHandler> listenerList = new ArrayList<>(); // 消息接收者
  •     //endregion
  •     //封闭构造函数
  •     private MqttUtil(){}
  •     //获取单例
  •     public static MqttUtil getInstance(){
  •         if(instance == null){
  •             instance = new MqttUtil();
  •             //deviceId =  android.os.Build.SERIAL + (int)(Math.random() * 10);
  •         }
  •         return instance;
  •     }
  •     //初始化连接
  •     public void initMqtt(Context context) {
  •         mContext = context;
  •         connect();
  •     }
  •     //重连
  •     public void reConnect() {
  •         //尝试重连
  •         connect();
  •     }
  •     //发布消息
  •     public void publish(String topic, String payload) {
  •         MqttMessage msg = new MqttMessage();
  •         msg.setPayload(payload.getBytes());
  •         msg.setQos(1);
  •         if (mqttClient != null){
  •             if(mqttClient.isConnected()) {
  •                 try {
  •                     mqttClient.publish(topic, msg);
  •                     DispachEvent(MQTT_PUBLISHED);
  •                 } catch (MqttException e) {
  •                     //发布失败
  •                     Log.e("mqtt", "发布失败" + e);
  •                     DispachEvent(MQTT_PUBLISHFAIL);
  •                 }
  •             } else {
  •                 Log.e("mqtt", "网络未连接 - 尝试重新连接" );
  •                 connect();
  •                 DispachEvent(MQTT_PUBLISHFAIL);
  •             }
  •         } else {
  •             Log.e("mqtt", "客户端初始化失败" );
  •             DispachEvent(MQTT_PUBLISHFAIL);
  •         }
  •     }
  •     //订阅主题
  •     public void subscribe(String topic){
  •         try {
  •             //订阅消息
  •             mqttClient.subscribe(topic, 0);
  •             DispachEvent(MQTT_SUBSCRIBED);
  •         } catch (MqttException e) {
  •             DispachEvent(MQTT_SUBSCRIBEFAIL);
  •             Log.e("mqtt", "订阅错误:" + e);
  •         }
  •     }
  •     //返回是否连接
  •     public boolean isConnected(){
  •         return mqttClient.isConnected();
  •     }
  •     //获取本机deviceId
  •     public String getDeviceId(){
  •         return deviceId;
  •     }
  •     //连接
  •     private void connect() {
  •         if (mqttClient == null) {
  •             option = new MqttConnectOptions();
  •             option.setUserName(userId);
  •             option.setPassword(password.toCharArray());
  •             option.setCleanSession(false);
  •             //设置回调
  •             clientCallback = new MqttCallback() {
  •                 @Override
  •                 public void connectionLost(Throwable cause) {
  •                     //断开连接
  •                     DispachEvent(MQTT_DISCONNECT);
  •                 }
  •                 @Override
  •                 public void messageArrived(String topic, MqttMessage message) throws Exception {
  •                     //接收到消息
  •                     Log.v("mqtt", "接收到信息:" + topic);
  •                     DispachMessage(topic, message);
  •                 }
  •                 @Override
  •                 public void deliveryComplete(IMqttDeliveryToken token) {
  •                     //publish成功后调用
  •                     Log.v("mqtt","发送成功");
  •                 }
  •             };
  •             try {
  •                 mqttClient = new MqttAndroidClient(mContext, ("tcp://" + serverIP + ":" + serverPort), deviceId);
  •                 mqttClient.setCallback(clientCallback);
  •             } catch (Exception e) {
  •                 Log.e("mqtt", "启动服务错误:" + e);
  •             }
  •         }
  •         if (!mqttClient.isConnected()) {
  •             //匿名连接线程
  •             new Thread(new Runnable() {
  •                 @Override
  •                 public void run() {
  •                     try {
  •                         int count = 0;  //重试次数
  •                         while(count < 5 && !mqttClient.isConnected()) {
  •                             mqttClient.connect(option);
  •                             Thread.sleep(1000);
  •                             count ++;
  •                         }
  •                         //连接成功
  •                         if(mqttClient.isConnected()) {
  •                             DispachEvent(MQTT_CONNECTED);
  •                             Log.v("mqtt", "连接成功");
  •                         } else {
  •                             Log.e("mqtt", "连接网络错误");
  •                             DispachEvent(MQTT_CONNECTFAIL);
  •                         }
  •                     } catch (Exception e) {
  •                         Log.e("mqtt", "连接错误:" + e);
  •                         DispachEvent(MQTT_CONNECTFAIL);
  •                     }
  •                 }
  •             }).start();
  •         }
  •     }
  •     //region 消息转发部分
  •     //添加接收者
  •     public void addListener(MsgHandler msgHandler){
  •         if(!listenerList.contains(msgHandler)) {
  •             listenerList.add(msgHandler);
  •         }
  •     }
  •     //移除接收者
  •     public void removeListener(MsgHandler msgHandler){
  •         listenerList.remove(msgHandler);
  •     }
  •     //移除所有接收者
  •     public void removeAll(){
  •         listenerList.clear();
  •     }
  •     //发送消息
  •     public void DispachMessage(String type, Object data){
  •         if(listenerList.isEmpty()) {
  •             Log.v("mqtt", "没有消息接收者:" + type);
  •             return;
  •         }
  •         Log.v("mqtt", "发送消息:" + type);
  •         for (MsgHandler msgHandler : listenerList)
  •         {
  •             msgHandler.onMessage(type, data);
  •         }
  •     }
  •     //发送事件
  •     public void DispachEvent(int event){
  •         if(listenerList.isEmpty()) {
  •             Log.v("mqtt", "没有消息接收者:");
  •             return;
  •         }
  •         Log.v("mqtt", "派发事件:" + event);
  •         for (MsgHandler msgHandler : listenerList)
  •         {
  •             msgHandler.onEvent(event);
  •         }
  •     }
  •     //endregion
  • }


复制代码
消息接口如下
  • /**
  • * Created by CHNhawk on 2017/5/16.
  • * 重庆工商大学 物联网工程 熊廷宇
  • * 自定义的消息接口
  • */
  • public interface MsgHandler
  • {
  •     /**
  •      * 消息
  •      * @param type 消息类型
  •      * @param data 数据
  •      */
  •     void onMessage(String type, Object data);
  •     /**
  •      * 事件
  •      * @param event x
  •      */
  •     void onEvent(int event);
  • }


复制代码
这样就可以很方便的使用mqtt通过Android设备连接上OneNet服务器了。

二、C#使用MQTT协议接入OneNet平台
讲完了Android,再来简单介绍一下其他平台或者应用接入OneNet平台的方式吧。
网上虽然有大量关于mqtt的文章,不过我再复述一遍,增加一下自己的印象也是极好的。
这里说一句,C#用来编写.NET,同时也可以用在Unity引擎中,再导出到各个平台,C#也可以用来写Android程序,哈哈,也是非常方便的!

这里附上一张我使用Unity+mqtt实现的具有房间聊天功能的安卓对战小游戏,游戏可以同步玩家数据,实现登陆、同一房间聊天的功能。为了练习,网络通信部分我完全使用的mqtt协议,测试联机的效果还不错。
1.png (0 Bytes, 下载次数: 0)
下载附件
自制的mqtt小游戏mcio
昨天 22:02 上传




我们知道untiy用于AR以及VR项目也是很火的,而物联网+AR/VR这个组合也是很火爆,下面我就介绍一下unity怎么使用C#中的mqtt连接上OneNet平台。纯C#的.NET应用同理~

首先需要的mqtt库,M2Mqtt
这个网上可以下载到源码,在unity中我们需要把它导出为dll文件来调用。
注意,导出的时候记得选择3.5版的框架,否则Unity不兼容。
然后将导出的dll文件移动到项目的\Assets\Plugins文件夹下,注意这里只能放在这个文件夹下,不然没有用。
然后我们就可以愉快地使用m2mqtt了。
1. 导入包
using uPLibrary.Networking.M2Mqtt;
usinguPLibrary.Networking.M2Mqtt.Messages;
2. 创建mqtt客户端对象并且进行连接
MqttClient mqttClient;
//设置ip、端口
mqttClient = new MqttClient(“183.230.40.39”,6002, false, null);      
mqttClient. Connect(clientId, username,password);
//订阅主题
mqttClient.Subscribe(new string[] {topic }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE });
//取消订阅
mqttClient.Unsubscribe(new string[] {topic });
//发布信息
mqttClient.Publish(topic,Encoding.UTF8.GetBytes(payload));


下面附上自己编写的辅助类:
  • using UnityEngine;
  • using System.Collections;
  • using System.Net;
  • using System.Text;
  • using uPLibrary.Networking.M2Mqtt;
  • using uPLibrary.Networking.M2Mqtt.Messages;
  • using System;
  • // mqtt网络通讯模块
  • public class MqttNetwork
  • {
  •     private static MqttNetwork instance; //单例的网络连接对象
  •     private MqttClient mqttClient; //mqtt客户端
  •     private EventManager manager; //事件管理
  •     private MqttNetwork()
  •     {
  •         //构造函数
  •         manager = EventManager.GetInstance();
  •         //initNetWork();
  •     }
  •     public static MqttNetwork GetInstance()
  •     {
  •         //获取实例
  •         if (instance == null)
  •         {
  •             instance = new MqttNetwork();
  •         }
  •         return instance;
  •     }
  •     public void initNetWork()
  •     {
  •         mqttClient = new MqttClient(“183.230.40.39”, 6002, false, null);
  •         //注册服务器返回信息接受函数
  •         mqttClient.MqttMsgPublishReceived += client_MqttMsgPublishReceived;
  •         //注册服务器断开函数
  •         mqttClient.ConnectionClosed += client_ConnectionClosed;
  •     }
  •     public void subsribeTopics(string topic)
  •     {  //订阅主题
  •         mqttClient.Subscribe(new string[] { topic }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE });
  •     }
  •     public void unSubsribeTopics(string topic)
  •     {  //取消订阅主题
  •         if (mqttClient != null)
  •         {
  •             mqttClient.Unsubscribe(new string[] { topic });
  •         }
  •     }
  •     public void publish(string topic, string payload)
  •     {//发布消息
  •         if (mqttClient != null)
  •         {
  •             mqttClient.Publish(topic, Encoding.UTF8.GetBytes(payload));
  •         }
  •     }
  •     public void connect()
  •     {
  •         if (mqttClient == null)
  •         {
  •             //初始化mqtt客户端
  •             initNetWork();
  •         }
  •         if (!mqttClient.IsConnected)
  •         {
  •             //若未连接则进行连接
  •             try
  •             {
  •                 mqttClient.Connect(“clientId”);//客户端ID
  •                 if (mqttClient.IsConnected)
  •                 {
  •                     //订阅需要的主题
  •                     subsribeTopics("test")
  •                 }
  •             }
  •             catch (System.Exception e)
  •             {
  •                 Debug.Log("连接出错 : " + e);
  •                 manager.DispachEvent("mqtt-disconnected", null);
  •             }
  •         }
  •     }
  •     public void disConnect()
  •     {
  •         //关闭连接
  •         if (mqttClient != null && mqttClient.IsConnected)
  •         {
  •             mqttClient.Disconnect();
  •         }
  •     }
  •     public bool isNetConnected()
  •     {
  •         if (mqttClient == null)
  •         {
  •             return false;
  •         }
  •         return mqttClient.IsConnected;
  •     }
  •     void client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs package)
  •     {
  •         //过滤并处理接收到的消息
  •         string topic = package.Topic;
  •         string msg = System.Text.Encoding.UTF8.GetString(package.Message);
  • //To do sth.
  •     }
  •     private void client_ConnectionClosed(object sender, EventArgs e)
  •     {
  •         Debug.Log("[系统] : 服务器已断开");
  •         manager.DispachEvent("mqtt-disconnected", null);
  •     }
  • }


复制代码
用到的一个事件转发接口
  • public interface EventHandler
  • {
  •     /// <summary>
  •     ///
  •     /// </summary>
  •     /// <param name="type"></param>
  •     /// <param name="data"></param>
  •     void OnEvent(string type, object data);
  • }


复制代码
事件转发的实现
  • using UnityEngine;
  • using System.Collections.Generic;
  • //事件管理类
  • public class EventManager
  • {
  •     private static EventManager instance; // 同样使用单例模式
  •     private Dictionary<string, List<EventHandler>> dicHandler; //事件句柄集合
  •     private EventManager()
  •     {
  •         //内部的构造函数,不对外开放
  •         dicHandler = new Dictionary<string, List<EventHandler>>();
  •     }
  •     public static EventManager GetInstance()
  •     {
  •         if (instance == null)
  •         {
  •             instance = new EventManager();
  •         }
  •         return instance;
  •     }
  •     /// <summary>
  •     /// 注册事件监听
  •     /// </summary>
  •     /// <param name="type">监听类型</param>
  •     /// <param name="listher">监听对象</param>
  •     public void AddEventListener(string type, EventHandler listher)
  •     {
  •         if (!dicHandler.ContainsKey(type))
  •         {
  •             dicHandler.Add(type, new List<EventHandler>());
  •         }
  •         dicHandler[type].Add(listher);
  •     }
  •     /// <summary>
  •     /// 移除对type的所有监听
  •     /// </summary>
  •     /// <param name="type"></param>
  •     public void RemoveEventListener(string type)
  •     {
  •         if (dicHandler.ContainsKey(type))
  •         {
  •             dicHandler.Remove(type);
  •         }
  •     }
  •     /// <summary>
  •     /// 移除监听者的所有监听
  •     /// </summary>
  •     /// <param name="listener">监听者</param>
  •     public void RemoveEventListener(EventHandler listener)
  •     {
  •         foreach (var item in dicHandler)
  •         {
  •             if (item.Value.Contains(listener))
  •             {
  •                 item.Value.Remove(listener);
  •             }
  •         }
  •     }
  •     /// <summary>
  •     /// 清空所有监听事件
  •     /// </summary>
  •     public void ClearEventListener()
  •     {
  •         Debug.Log("清空对所有所有所有事件的监听");
  •         if (dicHandler != null)
  •         {
  •             dicHandler.Clear();
  •         }
  •     }
  •     /// <summary>
  •     /// 派发事件
  •     /// </summary>
  •     /// <param name="type">事件类型</param>
  •     /// <param name="data">事件传达的数据</param>
  •     public void DispachEvent(string type, object data)
  •     {
  •         if (!dicHandler.ContainsKey(type))
  •         {
  •             Debug.Log("未添加任何" + type + " 类型的监听器");
  •             return;
  •         }
  •         //Debug.Log("派发事件:" + type);
  •         List<EventHandler> list = dicHandler[type];
  •         for (int i = 0; i < list.Count; i++)
  •         {
  •             list.OnEvent(type, data);
  •         }
  •     }
  • }


复制代码
这样,在自己的接收类实现OnEvent方法就行

三、NodeJS使用MQTT协议接入OneNet平台
NodeJs是近年比较火热的一个服务器js环境,可以在Linux系统上运行,配合mqtt实现一些功能,效率挺不错的。这里提供两个mqtt+nodejs的思路:
其一,用于树莓派等基于Linux系统的物联网设备连接至OneNet平台。
其二,用于自己的服务器配置mqtt转发服务以及一些自动处理服务,这个和OneNet平台就没有关系了,不过可以用在物联网相关项目上,让服务器自动完成一些数据的处理,我在这里也简单提一下。

首先讲其一:
树莓派一直是硬件爱好者比较喜欢的玩具设备(雾),也可以用树莓派来做一些物联网相关的东西,那么运行Linux系统的树莓派使用nodejs来连接上OneNet平台也是很不错的。不久前我尝试在树莓派上通过Python获取传感器数据并保存在本地文件中,然后再通过NodeJs上传到服务器,这样就可以在手机app上同步获取到传感器数据了。
废话不多说,首先下载配置nodejs+npm环境,这一块我不过多叙述,网上有太多太多的教程和步骤详细截图,本部分重点讲mqtt模块。
一般来说使用
apt-get install nodejs
apt-get install npm
这两个命令就可以安装,安装完毕可以输入node -v查看版本。
无法安装的可以先对apt update/upgrade一下,实在不行就或者直接wget下载nodejs解压。
要运行mqtt模块,nodejs版本不能太低,版本低的话可以安装n模块来升级nodejs,下面的内容假定你已经配置好了合适的nodejs环境。

nodejs中提供mqtt服务的有很多模块,我们就用mqtt这个模块,简单粗暴。
使用npm install mqtt命令安装mqtt模块
安装完成后,便可以实现一个mqtt的客户端连接至服务器的脚本了。


附上我自己编写的一个测试脚本:
  • /// nodejs上传数据测试脚本
  • /// 使用MQTT协议
  • /// author CHNhawk (重庆工商大学 15物联网工程 熊廷宇)
  • var mqtt = require('mqtt');                         // 导入mqttCilent模块
  • var ip = '183.230.40.39';                           // oneNet服务器ip
  • var port = '6002';
  • var deviceId = '4734262';                           // 设备号
  • var userName = '81695';                             // 用户名-产品id
  • var passWord = '12345678';                          // 密码-鉴权
  • // 创建连接
  • var client = mqtt.connect('mqtt://' + ip + ':' + port, {
  •     username: userName,
  •     password: passWord,
  •     clientId: deviceId
  • });
  • var isConnected = false;        // 是否连上服务器的标识
  • console.log('Connecting to ' ,ip + ':' + port + '...');
  • client.on('connect', function() {
  •     // 连接成功回调函数
  •     console.log('Server is connected to', ip + ':' + port);
  •     isConnected = true;
  •     // 订阅主题 - 指令接收topic
  •     client.subscribe('ctbu_cmd/' + deviceId);
  •     console.log('Subscribe', 'ctbu_cmd/' + deviceId);
  • });
  • client.on('message', function(topic, message) {
  •     // 接收信息回调函数
  •     console.log(topic.toString() + ' - ' + message.toString());
  •     var payloadMsg = message.toString();
  •     switch (topic) {
  •         case 'ctbu_cmd/' + deviceId:
  •             console.log('Received command: ', payloadMsg);
  •             /////////////
  •             //以下处理命令
  •             /////////////
  •             break;
  •     }
  • });
  • client.on('offline', function(error){
  •     // 掉线回调函数
  •     isConnected = false;
  •     console.log('Connecting failed cz:', error);
  • });
  • client.on('error', function(){
  •     // 连接错误回调函数
  •     isConnected = false;
  •     console.log('Lost connecting :', ip + ':' + port);
  • });
  • setInterval(function() {
  •     // 设置定时上传数据
  •     if(isConnected){
  •         var time = getTime();
  •         var msg = "value" + time;
  •         var payload = buf.join("");
  •         // 向topic发布数据
  •         client.publish('ctbu_data', payload);
  •     } else {
  •         console.log('Offline state!');
  •     }
  • }, 5000);
  • //获取当前时间
  • function getTime() {
  •     return new Date().toLocaleString();
  • }


复制代码
其二,在Linux服务器上搭建一个mqtt转发器
那么假定你已经配置好了nodejs。
使用npm install mosca 命令安装mosca模块
mosca是一个能够创建mqtt服务器的一个模块,使用起来也很方便。

我在之前提到的小游戏就是用这个模块加上mongodb数据库搭建的服务器,比较简单,效果也还好,700多行代码就搞定了。对于我那种高并发状态的游戏十几个人一起玩玩还是妥妥的,没做过压力测试,我的服务器的买的阿里云的学生专用版,9.9一个月那种,不敢奢求太多。不过用于物联网项目,还是够用。


也附上我自己编写的一个简单的测试脚本
  • /// nodejs服务器测试脚本
  • /// 使用MQTT协议
  • /// author CHNhawk (重庆工商大学 15物联网工程 熊廷宇)
  • var mosca = require('mosca');          // 导入mosca模块
  • // 创建mqtt服务器对象并设置开放端口
  • var MqttServer = new mosca.Server({
  •     port: 1883
  • });
  • MqttServer.on('clientConnected', function(client){
  •         // 设备连接回调函数
  •     console.log('client connected', client.id);
  • });
  • MqttServer.on('published', function(packet, client) {
  •         // 发布topic回调函数
  •     var topic = packet.topic;
  •     console.log('message-arrived--->','topic :'+topic+',message :'+ packet.payload.toString());
  • });
  • MqttServer.on('ready', function(){
  •         // 成功运行服务
  •     console.log('mqtt is running...');
  • });


复制代码
后记:这篇文章是参加实训需要完成的任务之一,之前很少在网上发布类似的帖子,虽然平时倒是经常写点文档做些记录,不过都是写给自己看的。由于本人技术水平及文字功底有限,如果大家有什么没有看明白或者质疑的地方,非常欢迎各位给我拍砖。希望与大家共同进步,最后祝OneNet平台越办越好!
在网上编辑帖子感觉比写文档还要麻烦一点,还不是很习惯....

回复

使用道具 举报

发表于 2017-7-27 14:02:33 | 显示全部楼层
帮顶                       
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 注册

本版积分规则

联系站长|手机版|野火电子官网|野火淘宝店铺|野火电子论坛 ( 粤ICP备14069197号 ) 大学生ARM嵌入式2群

GMT+8, 2025-1-29 06:01 , Processed in 0.049461 second(s), 23 queries , Gzip On.

Powered by Discuz! X3.4

Copyright © 2001-2021, Tencent Cloud.

快速回复 返回顶部 返回列表