小学生
最后登录1970-1-1
在线时间 小时
注册时间2023-5-18
|
项目材料:
鲁班猫卡片电脑
甲醛检测传感器 ZE08-CH2O
鲁班猫ubuntu20.04固件
连接甲醛传感器:
选择5V的电源供电并选用UART4作为通讯口
在/boot/uEnv/uEnvLubanCatZN.txt中,取消以下三行的注释:dtoverlay=/dtb/overlay/rk356x-lubancat-uart4-m1-overlay.dtbo,dtoverlay=/dtb/overlay/rk356x-lubancat-uart8-m0-overlay.dtbo,dtoverlay=/dtb/overlay/rk356x-lubancat-uart9-m1-overlay.dtbo。重启鲁班猫卡片电脑后,可以在/dev/目录下找到ttyS4、ttyS8、ttyS9。
硬件连接如下:
安装软件环境
1. 增加emqx的apt源
cat@lubancat:~/serial$ curl -s https://assets.emqx.com/scripts/install-emqx-deb.sh | sudo bash
2. 安装emqx
cat@lubancat:~/serial$ sudo apt install emqx
3. 启动emqx
cat@lubancat:~/serial$ sudo systemctl start emqx.service
4. 安装pyserial
sudo pip3 install pyserial
5. 安装crc
pip3 install crc==0.9.5
class MyMqtt(object):
    """Small MQTT client wrapper that publishes formaldehyde telemetry.

    The connection is opened in the constructor on port 1883 with a 60 s
    keepalive, and the client's network loop is started in the background.
    ``onConnect`` mirrors the connection state as reported by the
    connect/disconnect callbacks and gates :meth:`publish`.
    """

    def __init__(self, ip, username):
        """Connect to the broker at ``ip`` using ``username`` as credential.

        Args:
            ip: Broker host name or address.
            username: User name sent with the CONNECT packet.
        """
        self.client = mqtt.Client()
        # Initialise the flag before the network loop exists; assigning it
        # after loop_start() could overwrite a True already set by on_connect.
        self.onConnect = False
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self.client.on_disconnect = self.on_disconnect
        # Credentials must be configured before connect(), otherwise the
        # first CONNECT packet is sent without them.
        self.client.username_pw_set(username)
        self.client.connect(ip, 1883, 60)
        self.client.loop_start()

    def on_connect(self, client, userdata, flags, rc):
        """Record the outcome of a connection attempt (``rc == 0`` is success)."""
        print("Connected with result code "+str(rc))
        self.onConnect = (rc == 0)

    def on_disconnect(self, client, userdata, rc):
        """Mark the link as down for both clean and unexpected disconnects."""
        self.onConnect = False

    def on_message(self, client, userdata, msg):
        """Print every received message as ``<topic> <payload>``."""
        print(msg.topic+" "+str(msg.payload))

    def publish(self, value):
        """Publish ``value`` as a JSON ``formaldehyde`` telemetry reading.

        Nothing is sent while the client is not connected.
        """
        if not self.onConnect:
            return
        msg = f'{{"formaldehyde": {value} }}'
        self.client.publish(topic="v1/devices/me/telemetry", payload=msg)
class MySerial(object):
    """Minimal wrapper around a single pyserial port.

    ``ser`` holds the open port object, or ``None`` when no port is open.
    """

    def __init__(self):
        self.ser = None

    def get_port_list(self):
        """Return the serial ports reported by ``serial.tools.list_ports``."""
        return list(serial.tools.list_ports.comports())

    def open_serial_port(self, port, baud):
        """Open ``port`` at ``baud`` with a 0.3 s read timeout.

        Raises:
            Exception: Whatever ``serial.Serial`` raised, re-raised unchanged
                so the original type and traceback are preserved. ``ser`` is
                reset to ``None`` in that case.
        """
        try:
            self.ser = serial.Serial(port, baud, timeout=0.3, interCharTimeout=0.05)
        except Exception:
            self.ser = None
            raise

    def close_serial_port(self):
        """Close the open port and forget it.

        Raises:
            Exception: ``"no selected serial"`` when no port is open; any
                error from the underlying ``close()`` propagates unchanged
                (``ser`` is left untouched in that case).
        """
        if self.ser is None:
            raise Exception("no selected serial")
        self.ser.close()
        self.ser = None

    def read(self):
        """Return the bytes currently waiting in the port's input buffer.

        Raises:
            Exception: ``"no selected serial"`` when no port is open; errors
                from the underlying port propagate unchanged.
        """
        if self.ser is None:
            raise Exception("no selected serial")
        # Brief pause before sampling in_waiting, kept from the original.
        time.sleep(0.001)
        return self.ser.read(self.ser.in_waiting)
class ze08_ch20(object):
    """Reads 9-byte sensor frames from /dev/ttyS4 and publishes them over MQTT.

    Constructing an instance opens the serial port at 9600 baud, connects the
    MQTT interface and starts a background thread running :meth:`recv_thread`.
    """

    FRAME_LENGTH = 9    # bytes per frame, including start byte and checksum
    START_BYTE = 0xFF   # first byte of every frame

    def __init__(self) -> None:
        self.tools = MyTools()
        self.serial = MySerial()
        self.interface = MyMqtt(MQTT_IP, MQTT_USERNAME)
        self.serial.open_serial_port('/dev/ttyS4', 9600)
        # Deliberately non-daemon: this thread may be the only thing keeping
        # the process alive after the constructor returns.
        self.recv_thread_ = threading.Thread(target=self.recv_thread)
        self.recv_thread_.start()

    def calc_checksum(self, bytes_value):
        """Return True when the byte values sum to 0xFF modulo 256."""
        return sum(bytes_value) % 0x100 == 0xFF

    def _extract_readings(self, buffer):
        """Parse every complete frame in ``buffer``.

        Returns:
            A ``(readings, remaining)`` tuple: the converted values of all
            valid frames in order, and the unconsumed tail of ``buffer``
            (always shorter than one frame).
        """
        readings = []
        while len(buffer) >= self.FRAME_LENGTH:
            frame = buffer[:self.FRAME_LENGTH]
            if frame[0] != self.START_BYTE or not self.calc_checksum(frame):
                # Not a frame boundary (or a spurious 0xFF): skip one byte
                # only, so a real frame starting inside this window is kept.
                buffer = buffer[1:]
                continue
            buffer = buffer[self.FRAME_LENGTH:]
            raw = int.from_bytes(frame[4:6], byteorder='big')
            # NOTE(review): presumably ppb -> ppm -> mg/m3; confirm the 1.25
            # factor against the sensor datasheet.
            readings.append(raw / 1000.0 * 1.25)
        return readings, buffer

    def recv_thread(self):
        """Loop forever: accumulate serial data, decode frames, publish values.

        Read errors are printed and the loop keeps running.
        """
        recv_buffer = bytes()
        while True:
            try:
                recv_buffer += self.serial.read()
            except Exception as e:
                print(str(e))
            if len(recv_buffer) < self.FRAME_LENGTH:
                time.sleep(0.01)
                continue
            readings, recv_buffer = self._extract_readings(recv_buffer)
            for float_formaldehyde in readings:
                self.interface.publish(float_formaldehyde)
            time.sleep(0.001)
复制代码
|
|