本帖最后由 ChenChenRui 于 2023-11-30 22:20 编辑
大规模预训练语言模型(Large Language Model, LLM)的出现推动了机器人领域的发展。这些模型通过在大规模文本数据上进行预训练,可以学习到丰富的语言知识和语义表示。然后,这些模型可以通过微调来适应特定的任务或领域。自然语言是一种大众掌握的技能。通过使用自然语言与计算机交互,降低了新手的使用难度,直观有效,降低了学习成本。本项目以机械臂为大语言模型的身体,实现具身智能。 1. 什么是具身智能? 具身智能通过在物理世界和数字世界的学习和进化,达到理解世界、互动交互并完成任务的目标。 具身智能是由“本体”和“智能体”耦合而成且能够在复杂环境中执行任务的智能系统。一般认为,具身智能具有如下的几个核心要素: 第一是本体,作为实际的执行者,是在物理或者虚拟世界进行感知和任务执行的机构。 本体通常是具有物理实体的机器人,可以有多种形态。本体的能力边界会限制智能体的能力发挥,本体具备环境感知能力、运动能力和操作执行能力,是连接数字世界和物理世界的载体,本项目采用三轴步进机械臂。 具身智能的第二个要素是智能体(EmbodiedAgents),是具身于本体之上的智能核心,负责感知、理解、决策、控制等的核心工作。 智能体可以感知复杂环境,理解环境所包含的语义信息,能够和环境进行交互;可以理解具体任务,并且根据环境的变化和目标状态做出决策,进而控制本体完成任务。 本项目的方案 硬件方面: 鲁班猫4、usb摄像头、usb音频模块、机械臂及其控制器 机械臂为三轴步进机械臂。控制器为stm32,接收串口指令完成运动学正逆解以及圆弧直线插补等算法。 上位机为鲁班猫4通过串口与stm32连接,完成对机械臂的控制以及连接usb摄像头和usb音频获取环境。 软件方面: 基于ChatGPT的强大自然语言理解和推理能力,生成控制机器人的相关代码并通过串口向机械臂发送指令,利用usb摄像头获取环境和物体信息,以及通过usb音频模块获取人类指令并向人类反馈状态。 物体定位:采用opencv OpenCV是一个开源的计算机视觉和机器学习软件库,由英特尔公司发起并开发。它提供了丰富的图像处理和计算机视觉的功能,涵盖了从基本的图像操作、图像处理到目标检测、人脸识别等高级功能。OpenCV广泛应用于各种领域,包括医学图像处理、安防监控、自动驾驶、机器人技术等。 本项目采用将采集到的图片先进行仿射变换完成投影到xy平面后再转换到hsv颜色空间后寻找物体轮廓和定位。 语言转文字为调用讯飞api - import base64
- import hashlib
- import hmac
- import json
- import os
- import time
- import requests
- lfasr_host = 'http://raasr.xfyun.cn/api'
- # 请求的接口名
- api_prepare = '/prepare'
- api_upload = '/upload'
- api_merge = '/merge'
- api_get_progress = '/getProgress'
- api_get_result = '/getResult'
- # 文件分片大小10M
- file_piece_sice = 10485760
- # ——————————————————转写可配置参数————————————————
- # 参数可在官网界面(https://doc.xfyun.cn/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html)查看,根据需求可自行在gene_params方法里添加修改
- # 转写类型
- lfasr_type = 0
- # 是否开启分词
- has_participle = 'false'
- has_seperate = 'true'
- # 多候选词个数
- max_alternatives = 0
- # 子用户标识
- suid = ''
- class SliceIdGenerator:
- """slice id生成器"""
- def __init__(self):
- self.__ch = 'aaaaaaaaa`'
- def getNextSliceId(self):
- ch = self.__ch
- j = len(ch) - 1
- while j >= 0:
- cj = ch[j]
- if cj != 'z':
- ch = ch[:j] + chr(ord(cj) + 1) + ch[j + 1:]
- break
- else:
- ch = ch[:j] + 'a' + ch[j + 1:]
- j = j - 1
- self.__ch = ch
- return self.__ch
- class RequestApi(object):
- def __init__(self, appid, secret_key, upload_file_path):
- self.appid = appid
- self.secret_key = secret_key
- self.upload_file_path = upload_file_path
- # 根据不同的apiname生成不同的参数,本示例中未使用全部参数您可在官网(https://doc.xfyun.cn/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html)查看后选择适合业务场景的进行更换
- def gene_params(self, apiname, taskid=None, slice_id=None):
- appid = self.appid
- secret_key = self.secret_key
- upload_file_path = self.upload_file_path
- ts = str(int(time.time()))
- m2 = hashlib.md5()
- m2.update((appid + ts).encode('utf-8'))
- md5 = m2.hexdigest()
- md5 = bytes(md5, encoding='utf-8')
- # 以secret_key为key, 上面的md5为msg, 使用hashlib.sha1加密结果为signa
- signa = hmac.new(secret_key.encode('utf-8'), md5, hashlib.sha1).digest()
- signa = base64.b64encode(signa)
- signa = str(signa, 'utf-8')
- file_len = os.path.getsize(upload_file_path)
- file_name = os.path.basename(upload_file_path)
- param_dict = {}
- if apiname == api_prepare:
- # slice_num是指分片数量,如果您使用的音频都是较短音频也可以不分片,直接将slice_num指定为1即可
- slice_num = int(file_len / file_piece_sice) + (0 if (file_len % file_piece_sice == 0) else 1)
- param_dict['app_id'] = appid
- param_dict['signa'] = signa
- param_dict['ts'] = ts
- param_dict['file_len'] = str(file_len)
- param_dict['file_name'] = file_name
- param_dict['slice_num'] = str(slice_num)
- elif apiname == api_upload:
- param_dict['app_id'] = appid
- param_dict['signa'] = signa
- param_dict['ts'] = ts
- param_dict['task_id'] = taskid
- param_dict['slice_id'] = slice_id
- elif apiname == api_merge:
- param_dict['app_id'] = appid
- param_dict['signa'] = signa
- param_dict['ts'] = ts
- param_dict['task_id'] = taskid
- param_dict['file_name'] = file_name
- elif apiname == api_get_progress or apiname == api_get_result:
- param_dict['app_id'] = appid
- param_dict['signa'] = signa
- param_dict['ts'] = ts
- param_dict['task_id'] = taskid
- return param_dict
- # 请求和结果解析,结果中各个字段的含义可参考:https://doc.xfyun.cn/rest_api/%E8%AF%AD%E9%9F%B3%E8%BD%AC%E5%86%99.html
- def gene_request(self, apiname, data, files=None, headers=None):
- response = requests.post(lfasr_host + apiname, data=data, files=files, headers=headers)
- result = json.loads(response.text)
- if result["ok"] == 0:
- print("{} success:".format(apiname) + str(result))
- return result
- else:
- print("{} error:".format(apiname) + str(result))
- exit(0)
- return result
- # 预处理
- def prepare_request(self):
- return self.gene_request(apiname=api_prepare,
- data=self.gene_params(api_prepare))
- # 上传
- def upload_request(self, taskid, upload_file_path):
- file_object = open(upload_file_path, 'rb')
- try:
- index = 1
- sig = SliceIdGenerator()
- while True:
- content = file_object.read(file_piece_sice)
- if not content or len(content) == 0:
- break
- files = {
- "filename": self.gene_params(api_upload).get("slice_id"),
- "content": content
- }
- response = self.gene_request(api_upload,
- data=self.gene_params(api_upload, taskid=taskid,
- slice_id=sig.getNextSliceId()),
- files=files)
- if response.get('ok') != 0:
- # 上传分片失败
- print('upload slice fail, response: ' + str(response))
- return False
- print('upload slice ' + str(index) + ' success')
- index += 1
- finally:
- 'file index:' + str(file_object.tell())
- file_object.close()
- return True
- # 合并
- def merge_request(self, taskid):
- return self.gene_request(api_merge, data=self.gene_params(api_merge, taskid=taskid))
- # 获取进度
- def get_progress_request(self, taskid):
- return self.gene_request(api_get_progress, data=self.gene_params(api_get_progress, taskid=taskid))
- # 获取结果
- def get_result_request(self, taskid):
- return self.gene_request(api_get_result, data=self.gene_params(api_get_result, taskid=taskid))
- def all_api_request(self):
- # 1. 预处理
- pre_result = self.prepare_request()
- taskid = pre_result["data"]
- # 2 . 分片上传
- self.upload_request(taskid=taskid, upload_file_path=self.upload_file_path)
- # 3 . 文件合并
- self.merge_request(taskid=taskid)
- # 4 . 获取任务进度
- while True:
- # 每隔20秒获取一次任务进度
- progress = self.get_progress_request(taskid)
- progress_dic = progress
- if progress_dic['err_no'] != 0 and progress_dic['err_no'] != 26605:
- print('task error: ' + progress_dic['failed'])
- return
- else:
- data = progress_dic['data']
- task_status = json.loads(data)
- if task_status['status'] == 9:
- print('task ' + taskid + ' finished')
- break
- print('The task ' + taskid + ' is in processing, task status: ' + str(data))
- # 每次获取进度间隔20S
- time.sleep(20)
- # 5 . 获取结果
- res = self.get_result_request(taskid=taskid)
- # 保存到txt
- data = res['data'].strip('[').strip(']')
- for d in eval(data):
- a = d['onebest']
- fw.write(a + '\n') # 逐行保存
- # 注意:如果出现requests模块报错:"NoneType" object has no attribute 'read', 请尝试将requests模块更新到2.20.0或以上版本(本demo测试版本为2.20.0)
- # 输入讯飞开放平台的appid,secret_key和待转写的文件路径
- if __name__ == '__main__':
- # 单个语音文件解析并保存
- root = "D:/xxxx/"
- filename = "xxxx" # mp3文件名,需修改
- speechpath = root +filename
- save_path = speechpath + '.txt' # txt与mp3同名,并保存在同一路径下
- fw = open(save_path, 'w')
- # 解析语音并保存到txt
- api = RequestApi(appid="xxx", secret_key="xxx", upload_file_path=speechpath+".mp3")
- api.all_api_request()
复制代码因为此项目还在参加比赛,所以未公开整个工程和大语言模型部分。等后续比赛结束整理资料后再开源。
|