diff --git a/cfg_main.toml b/cfg_main.toml index 57c5d3a..c6eed12 100644 --- a/cfg_main.toml +++ b/cfg_main.toml @@ -3,25 +3,29 @@ logger_filename = "log/file_{time}.log" logger_format = "{time} {level} {message}" [task] -GetBlock_enable = false # 人员施救使能 -PutBlock_enable = false # 紧急转移使能 -GetBBall_enable = false # 整装上阵使能 -UpTower_enable = false # 通信抢修使能 -GetRBall_enable = false # 高空排险使能 -PutBBall_enable = false # 派发物资使能 -PutHanoi_enable = false # 物资盘点使能 -MoveArea_enable = false # 应急避险使能 +GetBlock_enable = true # 人员施救使能 +PutBlock_enable = true # 紧急转移使能 +GetBBall_enable = true # 整装上阵使能 +UpTower_enable = true # 通信抢修使能 +GetRBall_enable = true # 高空排险使能 +PutBBall_enable = true # 派发物资使能 +PutHanoi_enable = true # 物资盘点使能 +MoveArea_enable = true # 应急避险使能 KickAss_enable = true # 扫黑除暴使能 [find_counts] -GetBlock_counts = 5 # 人员施救计数 -PutBlock_counts = 5 # 紧急转移计数 -GetBBall_counts = 5 # 整装上阵计数 -UpTower_counts = 5 # 通信抢修计数 -GetRBall_counts = 5 # 高空排险计数 +GetBlock_counts = 20 # 人员施救计数 +PutBlock_counts = 20 # 紧急转移计数 +GetBBall_counts = 10 # 整装上阵计数 +UpTower_counts = 10 # 通信抢修计数 +GetRBall_counts = 2 # 高空排险计数 + + + + PutBBall_counts = 5 # 派发物资计数 -PutHanoi1_counts = 10 # 物资盘点计数 -PutHanoi2_counts = 4300 # 物资盘点计数 +PutHanoi1_counts = 8 # 物资盘点计数 +PutHanoi2_counts = 4200 # 物资盘点计数 MoveArea1_counts = 2 # 应急避险计数 MoveArea2_counts = 1700 # 应急避险第二阶段计数 KickAss_counts = 2 # 扫黑除暴计数 diff --git a/cfg_subtask.toml b/cfg_subtask.toml index bc53026..f147a7a 100644 --- a/cfg_subtask.toml +++ b/cfg_subtask.toml @@ -1,5 +1,5 @@ [get_block] - +first_block = "blue" [put_block] [get_bball] @@ -14,10 +14,11 @@ [put_hanoi2] pos_gap = 160 # 标定值,两个放置位置的标定距离 -pos_lp = 1 # 1\2\3 数字越小越靠近红色置物区 -pos_mp = 3 +pos_lp = 2 # 1\2\3 数字越小越靠近红色置物区 +pos_mp = 1 [move_area] +llm_enable = false # 大模型机器人 [kick_ass] pos_gap1 = 150 # 目标牌和第一个 person 之间的距离 diff --git a/main.py b/main.py index 3722d80..278e765 100644 --- a/main.py +++ b/main.py @@ -18,13 +18,14 @@ logger.add(cfg_main['debug']['logger_filename'], format=cfg_main['debug']['logge # 向任务队列添加任务 task_queue = queue.Queue() -task_queue.put(sb.task(sb.get_block, cfg_main['find_counts']['GetBlock_counts'], cfg_main['task']['GetBlock_enable'])) -task_queue.put(sb.task(sb.put_block, cfg_main['find_counts']['PutBlock_counts'], cfg_main['task']['GetBlock_enable'] and cfg_main['task']['PutBlock_enable'])) -task_queue.put(sb.task(sb.get_bball, cfg_main['find_counts']['GetBBall_counts'], cfg_main['task']['GetBBall_enable'])) -task_queue.put(sb.task(sb.up_tower, cfg_main['find_counts']['UpTower_counts'], cfg_main['task']['UpTower_enable'])) -task_queue.put(sb.task(sb.get_rball, cfg_main['find_counts']['GetRBall_counts'], cfg_main['task']['GetRBall_enable'])) +# task_queue.put(sb.task(sb.get_block, cfg_main['find_counts']['GetBlock_counts'], cfg_main['task']['GetBlock_enable'])) +# task_queue.put(sb.task(sb.get_block, cfg_main['find_counts']['GetBlock_counts'], cfg_main['task']['GetBlock_enable'])) +# task_queue.put(sb.task(sb.put_block, cfg_main['find_counts']['PutBlock_counts'], cfg_main['task']['GetBlock_enable'] and cfg_main['task']['PutBlock_enable'])) +# task_queue.put(sb.task(sb.get_bball, cfg_main['find_counts']['GetBBall_counts'], cfg_main['task']['GetBBall_enable'])) +# task_queue.put(sb.task(sb.up_tower, cfg_main['find_counts']['UpTower_counts'], cfg_main['task']['UpTower_enable'])) +# task_queue.put(sb.task(sb.get_rball, cfg_main['find_counts']['GetRBall_counts'], cfg_main['task']['GetRBall_enable'])) task_queue.put(sb.task(sb.put_bball, cfg_main['find_counts']['PutBBall_counts'], cfg_main['task']['GetBBall_enable'] and cfg_main['task']['PutBBall_enable'])) -task_queue.put(sb.task(sb.put_hanoi1, cfg_main['find_counts']['PutHanoi1_counts'], False)) # 无论是否进行任务,检测标识并转向都是必须进行的 +task_queue.put(sb.task(sb.put_hanoi1, cfg_main['find_counts']['PutHanoi1_counts'], True)) # 无论是否进行任务,检测标识并转向都是必须进行的 task_queue.put(sb.task(sb.put_hanoi2, cfg_main['find_counts']['PutHanoi2_counts'], cfg_main['task']['PutHanoi_enable'])) task_queue.put(sb.task(sb.move_area1, cfg_main['find_counts']['MoveArea1_counts'], cfg_main['task']['MoveArea_enable'])) task_queue.put(sb.task(sb.move_area2, cfg_main['find_counts']['MoveArea2_counts'], cfg_main['task']['MoveArea_enable'])) diff --git a/majtask.py b/majtask.py index f63dd18..099227c 100644 --- a/majtask.py +++ b/majtask.py @@ -1,3 +1,4 @@ + from simple_pid import PID import zmq import time @@ -69,7 +70,7 @@ class main_task(): error_abs = abs(self.lane_error) if error_abs < 10: self.pid1.set(0.7, 0, 0) - self.by_cmd.send_speed_x(12) + self.by_cmd.send_speed_x(13) # 12 elif error_abs > 45: self.by_cmd.send_speed_x(6) self.pid1.set(1.8, 0, 0) @@ -84,8 +85,9 @@ class main_task(): self.by_cmd.send_speed_x(11) # pid_out = self.pid1.get(self.lane_error*0.65) - pid_out = self.pid1.get(self.lane_error*0.75) + pid_out = self.pid1.get(self.lane_error*0.7) self.by_cmd.send_speed_omega(pid_out) + self.socket.send_string("") resp = self.socket.recv_pyobj() # logger.info(resp) diff --git a/subtask.py b/subtask.py index 2fde91d..a7d53d9 100644 --- a/subtask.py +++ b/subtask.py @@ -2,6 +2,7 @@ from enum import Enum from loguru import logger from utils import label_filter from utils import tlabel +from utils import LLM import utils import toml import zmq @@ -17,18 +18,125 @@ logger.info("load subtask config") by_cmd = None filter = None - +llm_bot = None +''' +description: main.py 里执行 引入全局变量 +param {*} _by_cmd 控制器对象 +return {*} +''' def import_obj(_by_cmd): global by_cmd global filter + global llm_bot by_cmd = _by_cmd filter = label_filter(socket) + if cfg['move_area']['llm_enable']: + llm_bot = LLM() def car_stop(): - pass -# run_on 是否继续向前行进 等待 error<5 后停止 如果不需要前进(在校准前车已经停下) -# 则不需要等待 error < 5 可以直接 aim_near + for _ in range(3): + by_cmd.send_speed_x(0) + time.sleep(0.2) + by_cmd.send_speed_omega(0) +def calibrate_right_new(label, offset, run = True, run_speed = 3.5): + ret, error = filter.aim_right(label) + while not ret: + ret, error = filter.aim_right(label) + + error += offset + if abs(error) > 10 and run: + if error > 0: + by_cmd.send_speed_x(-run_speed) + else: + by_cmd.send_speed_x(run_speed) + # 停的位置已经很接近目标,可以直接使用 distance 校准 + else: + error = error * 3 + if error > 0: + by_cmd.send_distance_x(-10, int(error)) + else: + by_cmd.send_distance_x(10, int(-error)) + return + while True: + ret, error = filter.aim_right(label) + while not ret: + ret, error = filter.aim_right(label) + error += offset + if ret: + if abs(error) <= 5: + car_stop() + logger.info("可以停车了") -def calibrate(label, offset, run_on = True, cali_speed = 10): + ret, error = filter.aim_right(label) + while not ret: + ret, error = filter.aim_right(label) + error += offset + if abs(error) > 8: + error = error * 3 + if error > 0: + by_cmd.send_distance_x(-10, int(error)) + else: + by_cmd.send_distance_x(10, int(-error)) + + break +''' +description: 校准新方法 找到后停止 然后根据 error 判断前后低速前进 error < 5 后直接停车 +param {*} label +param {*} offset +param {*} run +param {*} run_speed +return {*} +''' +def calibrate_new(label, offset, run = True, run_speed = 3.5): + ret, box = filter.get(label) + while not ret: + ret, box = filter.get(label) + + error = (box[0][2] + box[0][0] - 320) / 2 + offset + if abs(error) > 10 and run: + if error > 0: + by_cmd.send_speed_x(-run_speed) + else: + by_cmd.send_speed_x(run_speed) + # 停的位置已经很接近目标,可以直接使用 distance 校准 + else: + error = error * 3 + if error > 0: + by_cmd.send_distance_x(-10, int(error)) + else: + by_cmd.send_distance_x(10, int(-error)) + return + while True: + ret, box = filter.get(label) + while not ret: + ret, box = filter.get(label) + + error = (box[0][2] + box[0][0] - 320) / 2 + offset + if ret: + if abs(error) <= 5: + car_stop() + logger.info("可以停车了") + + ret, box = filter.get(label) + while not ret: + ret, box = filter.get(label) + error = (box[0][2] + box[0][0] - 320) / 2 + offset + if abs(error) > 8: + error = error * 3 + if error > 0: + by_cmd.send_distance_x(-10, int(error)) + else: + by_cmd.send_distance_x(10, int(-error)) + + break +''' +description: 与 calibrate 一样 只不过寻找最右侧的 整装上阵时使用 +param {*} label +param {*} offset +param {*} run_on +param {*} cali_speed +return {*} +''' +def calibrate_right(label, offset, run_on = True, cali_speed = 10): logger.info("开始校准") # go on if run_on: @@ -37,9 +145,9 @@ def calibrate(label, offset, run_on = True, cali_speed = 10): by_cmd.send_speed_omega(0) time.sleep(0.1) while True: - ret, error = filter.aim_near(label) + ret, error = filter.aim_right(label) while not ret: - ret, error = filter.aim_near(label) + ret, error = filter.aim_right(label) error += offset if run_on: if abs(error) < 5: @@ -51,9 +159,9 @@ def calibrate(label, offset, run_on = True, cali_speed = 10): else: break - ret, error = filter.aim_near(label) + ret, error = filter.aim_right(label) while not ret: - ret, error = filter.aim_near(label) + ret, error = filter.aim_right(label) error += offset time.sleep(1) logger.error(error) @@ -63,7 +171,6 @@ def calibrate(label, offset, run_on = True, cali_speed = 10): by_cmd.send_distance_x(-cali_speed, int(error*4)) else: by_cmd.send_distance_x(cali_speed, int(-error*4)) - logger.error(error) time.sleep(1) # stop for _ in range(3): @@ -71,6 +178,67 @@ def calibrate(label, offset, run_on = True, cali_speed = 10): time.sleep(0.2) by_cmd.send_speed_omega(0) pass +''' +description: 校准函数 传入对应的标签将车校准到最接近中心的标签的位置 +param {*} label +param {*} offset 摄像头 error offset +param {*} run_on 校准前是否继续向前行进以寻找 label +param {*} cali_speed send_distance_x 校准时移动的速度 +return {*} +''' +def calibrate(label, offset, run_on = True, cali_speed = 10): + logger.info("开始校准") + # go on + if run_on: + for _ in range(3): + by_cmd.send_speed_x(7) + by_cmd.send_speed_omega(0) + time.sleep(0.1) + while True: + # 寻找距离屏幕中心最近的标签 + ret, error = filter.aim_near(label) + while not ret: + # 注意这里一定要保证摄像头内有该目标 否则会无限循环 + ret, error = filter.aim_near(label) + error += offset + if run_on: + if abs(error) < 5: + for _ in range(3): + by_cmd.send_speed_x(0) + time.sleep(0.2) + by_cmd.send_speed_omega(0) + break + else: + break + # 代码运行到这里代表摄像头内一定有物体 label,此时再获取一次 error 用于校准 + ret, error = filter.aim_near(label) + while not ret: + ret, error = filter.aim_near(label) + error += offset + time.sleep(1) + logger.error(error) + if abs(error) > 5: + logger.info("校准中") + if abs(error) < 5: + error = error * 5 + logger.error("校准分段 error * 5") + elif abs(error) < 20: + error = error * 4 + logger.error("校准分段 error * 4") + else: + error = error * 3 + logger.error("校准分段 error * 3") + if error > 0: + by_cmd.send_distance_x(-cali_speed, int(error)) + else: + by_cmd.send_distance_x(cali_speed, int(-error)) + time.sleep(2) + # stop + for _ in range(3): + by_cmd.send_speed_x(0) + time.sleep(0.2) + by_cmd.send_speed_omega(0) + pass # 任务类 class task: @@ -152,84 +320,107 @@ class task_queuem(task): logger.info(f"[TaskM]# <<<<----------------------") return True - - - # 人员施救 class get_block(): def init(self): logger.info("人员施救初始化") + filter.switch_camera(1) + if cfg['get_block']['first_block'] == "blue": + self.target_label = tlabel.BBLOCK + self.another_label = tlabel.RBLOCK + cfg['get_block']['first_block'] = '' + else: + self.target_label = tlabel.RBLOCK + self.another_label = tlabel.BBLOCK def find(self): # 目标检测红/蓝方块 - ret1, list1 = filter.get(tlabel.RBLOCK) - if ret1 > 0: - logger.info("[抓方块]# find label") + ret = filter.find(self.target_label) + if ret > 0: return True else: return False def exec(self): - calibrate(tlabel.RBLOCK,15) - # for _ in range(3): - # by_cmd.send_speed_x(7) - # by_cmd.send_speed_omega(0) - # time.sleep(0.1) - # while True: - # # logger.info("等待进入准确区域") - # ret, error = filter.aim_near(tlabel.RBLOCK) - # while not ret: - # ret, error = filter.aim_near(tlabel.RBLOCK) - # # logger.info(error) - # if abs(error) < 5: - # for _ in range(3): - # by_cmd.send_speed_x(0) - # time.sleep(0.2) - # by_cmd.send_speed_omega(0) - # break - # ret, error = filter.aim_near(tlabel.RBLOCK) - # while not ret: - # ret, error = filter.aim_near(tlabel.RBLOCK) - # time.sleep(1) - # logger.error(error) - # if abs(error) > 5: - # logger.info("校准中") - # if error > 0: - # by_cmd.send_distance_x(-10, int(error*3)) - # else: - # by_cmd.send_distance_x(10, int(-error*3)) - # logger.error(error) - # time.sleep(1) + car_stop() + calibrate_new(self.target_label, offset = 12) + logger.info("抓取块") + time.sleep(3) + # 测试新方案 + + + + # 旧方案 # for _ in range(3): # by_cmd.send_speed_x(0) # time.sleep(0.2) # by_cmd.send_speed_omega(0) - time.sleep(2) + # calibrate(self.target_label, 12, False, 7) + # logger.info("抓取块") + # time.sleep(2) + # by_cmd.send_distance_axis_z(20, 20*7) + # time.sleep(3) + # # 向内退 + # by_cmd.send_distance_axis_x(3, 20*2) + # time.sleep(2) + # # 旋转机械臂到最右侧 + # by_cmd.send_angle_claw_arm(36 + 184) + # time.sleep(1.5) + # # 开爪子 + # by_cmd.send_angle_claw(27.0 + 9 * 3) + # # 下降 + # by_cmd.send_distance_axis_z(20, -20*4) + # time.sleep(1.5) + # # 关爪子 + # by_cmd.send_angle_claw(27.0 - 2) + # time.sleep(1) + # by_cmd.send_distance_axis_z(15, 20*2) + # time.sleep(2) + # while True: + # pass + + # by_cmd.send_distance_x(10, 500) + # counts = 0 + # while True: + # counts += filter.find(self.another_label) + # if counts >= 35: + # break + # for _ in range(3): + # by_cmd.send_speed_x(0) + # time.sleep(0.2) + # by_cmd.send_speed_omega(0) + # calibrate(self.another_label, 12, False, 7) + # logger.info("抓取块") + # time.sleep(2) + pass + # 调试 临时注释掉 + # calibrate(tlabel.RBLOCK,15) + # time.sleep(2) - by_cmd.send_position_axis_z(10, 150) - time.sleep(5) - by_cmd.send_angle_claw_arm(127) - time.sleep(1) - by_cmd.send_position_axis_x(4, 140) - time.sleep(4) - by_cmd.send_angle_claw_arm(220) - by_cmd.send_angle_claw(90) - time.sleep(1) - by_cmd.send_distance_axis_z(10, -70) - time.sleep(3) - by_cmd.send_angle_claw(27) - by_cmd.send_distance_axis_z(10, 10) - time.sleep(2) - by_cmd.send_distance_axis_x(4, -100) - time.sleep(1) - by_cmd.send_distance_axis_z(10, -40) - time.sleep(3) - by_cmd.send_angle_claw(35) - time.sleep(1) - by_cmd.send_position_axis_z(10, 150) - time.sleep(3) - by_cmd.send_position_axis_x(2, 140) - # 抓取第二个块后 收爪 - time.sleep(3) - by_cmd.send_position_axis_x(4, 0) + # by_cmd.send_position_axis_z(10, 150) + # time.sleep(5) + # by_cmd.send_angle_claw_arm(127) + # time.sleep(1) + # by_cmd.send_position_axis_x(4, 140) + # time.sleep(4) + # by_cmd.send_angle_claw_arm(220) + # by_cmd.send_angle_claw(90) + # time.sleep(1) + # by_cmd.send_distance_axis_z(10, -70) + # time.sleep(3) + # by_cmd.send_angle_claw(27) + # by_cmd.send_distance_axis_z(10, 10) + # time.sleep(2) + # by_cmd.send_distance_axis_x(4, -100) + # time.sleep(1) + # by_cmd.send_distance_axis_z(10, -40) + # time.sleep(3) + # by_cmd.send_angle_claw(35) + # time.sleep(1) + # by_cmd.send_position_axis_z(10, 150) + # time.sleep(3) + # by_cmd.send_position_axis_x(2, 140) + # # 抓取第二个块后 收爪 + # time.sleep(3) + # by_cmd.send_position_axis_x(4, 0) def nexec(self): # TODO 完成不执行任务的空动作 pass @@ -239,53 +430,31 @@ class get_block(): class put_block(): def init(self): logger.info("紧急转移初始化") + socket.send_string("1") + socket.recv() def find(self): # 目标检测医院 ret1, list1 = filter.get(tlabel.HOSPITAL) if ret1 > 0: - return True + width = list1[0][2] - list1[0][0] + if width > 120: + return True + return False else: return False def exec(self): - # TODO 测试对准效果 logger.info("找到医院") - for _ in range(3): - by_cmd.send_speed_x(0) - time.sleep(0.2) - by_cmd.send_speed_omega(0) - time.sleep(0.1) - while True: - # logger.info("等待进入准确区域") - ret, error = filter.aim_near(tlabel.HOSPITAL) - while not ret: - ret, error = filter.aim_near(tlabel.HOSPITAL) - # logger.info(error) - if abs(error) < 5: - for _ in range(3): - by_cmd.send_speed_x(0) - time.sleep(0.2) - by_cmd.send_speed_omega(0) - break - ret, error = filter.aim_near(tlabel.HOSPITAL) - while not ret: - ret, error = filter.aim_near(tlabel.HOSPITAL) - time.sleep(1) - logger.error(error) - if abs(error) > 5: - logger.info("校准中") - if error > 0: - by_cmd.send_distance_x(-10, int(error*3)) - else: - by_cmd.send_distance_x(10, int(-error*3)) - logger.error(error) - time.sleep(1) - - by_cmd.send_position_axis_z(10, 150) + car_stop() + calibrate_new(tlabel.HOSPITAL, offset = 7, run = False) time.sleep(3) - # TODO 切换爪子方向 - by_cmd.send_position_axis_x(2, 140) - time.sleep(2) - by_cmd.send_position_axis_z(10, 170) + # calibrate(tlabel.HOSPITAL, 7, False, 6) + + # by_cmd.send_position_axis_z(10, 150) + # time.sleep(3) + # # TODO 切换爪子方向 + # by_cmd.send_position_axis_x(2, 140) + # time.sleep(2) + # by_cmd.send_position_axis_z(10, 170) pass def nexec(self): pass @@ -293,70 +462,70 @@ class put_block(): # 整装上阵 class get_bball(): def init(self): - by_cmd.send_position_axis_x(2, 140) + # by_cmd.send_position_axis_x(2, 140) + # 调试 临时换源 + socket.send_string("1") + socket.recv() logger.info("整装上阵初始化") time.sleep(0.5) while (by_cmd.send_angle_camera(90) == -1): by_cmd.send_angle_camera(90) def find(self): - # 目标检测黄球 - ret1, list1 = filter.get(tlabel.YBALL) - if ret1 > 0: + # 目标检测蓝球 + ret = filter.find(tlabel.BBALL) + if ret: return True else: return False def exec(self): - logger.info("找到黄色球") + logger.info("找到蓝色球") + car_stop() + time.sleep(0.5) for _ in range(3): - by_cmd.send_speed_x(7) - by_cmd.send_speed_omega(0) - time.sleep(0.1) - while True: - # logger.info("等待进入准确区域") - ret, error = filter.aim_near(tlabel.YBALL) - while not ret: - ret, error = filter.aim_near(tlabel.YBALL) - # logger.info(error) - if abs(error) < 5: - for _ in range(3): - by_cmd.send_speed_x(0) - time.sleep(0.2) - by_cmd.send_speed_omega(0) - break - ret, error = filter.aim_near(tlabel.YBALL) - while not ret: - ret, error = filter.aim_near(tlabel.YBALL) - time.sleep(1) - logger.error(error) - if abs(error) > 5: - logger.info("校准中") - if error > 0: - by_cmd.send_distance_x(-10, int(error*3)) - else: - by_cmd.send_distance_x(10, int(-error*3)) - logger.error(error) - time.sleep(1) - while (by_cmd.send_angle_camera(0) == -1): - by_cmd.send_angle_camera(0) - by_cmd.send_position_axis_z(20, 160) - time.sleep(2) - by_cmd.send_position_axis_x(2, 70) - time.sleep(2) - by_cmd.send_angle_claw(90) - time.sleep(0.2) - by_cmd.send_position_axis_x(2, 0) - time.sleep(2) - by_cmd.send_angle_claw(27) - time.sleep(1) - by_cmd.send_position_axis_z(20, 180) - time.sleep(1) - by_cmd.send_position_axis_x(4, 45) - time.sleep(1) - by_cmd.send_position_axis_z(20, 140) - time.sleep(3) - by_cmd.send_position_axis_x(2, 140) - time.sleep(2) - by_cmd.send_angle_claw(90) + calibrate_right_new(tlabel.BBALL, offset = 27, run = True, run_speed = 3.5) + logger.info("抓蓝色球") + time.sleep(5) + # 原方案 + # time.sleep(1) + # calibrate_right(tlabel.BBALL, 27, False, 5) + # logger.info("抓到了蓝色球") + # time.sleep(5) + # for i in range(2): + # while True: + # by_cmd.send_distance_x(6, 60) + # ret, error = filter.aim_near(tlabel.BBALL) + # if ret and abs(error) < 30: + # break + # else: + # by_cmd.send_distance_x(6, 60) + # for _ in range(3): + # by_cmd.send_speed_x(0) + # time.sleep(0.1) + # by_cmd.send_speed_omega(0) + + # calibrate_right(tlabel.BBALL, 27, False, 5) + # time.sleep(5) + + + # by_cmd.send_position_axis_z(20, 160) + # time.sleep(2) + # by_cmd.send_position_axis_x(2, 70) + # time.sleep(2) + # by_cmd.send_angle_claw(90) + # time.sleep(0.2) + # by_cmd.send_position_axis_x(2, 0) + # time.sleep(2) + # by_cmd.send_angle_claw(27) + # time.sleep(1) + # by_cmd.send_position_axis_z(20, 180) + # time.sleep(1) + # by_cmd.send_position_axis_x(4, 45) + # time.sleep(1) + # by_cmd.send_position_axis_z(20, 140) + # time.sleep(3) + # by_cmd.send_position_axis_x(2, 140) + # time.sleep(2) + # by_cmd.send_angle_claw(90) pass def nexec(self): pass @@ -365,46 +534,24 @@ class get_bball(): class up_tower(): def init(self): logger.info("通信抢修初始化") + while (by_cmd.send_angle_camera(90) == -1): + by_cmd.send_angle_camera(90) def find(self): # 目标检测通信塔 - ret1, list1 = filter.get(tlabel.TOWER) - if ret1 > 0: + ret, error = filter.aim_near(tlabel.TOWER) + if ret: return True else: return False def exec(self): logger.info("找到塔") - for _ in range(3): - by_cmd.send_speed_x(7) - by_cmd.send_speed_omega(0) - time.sleep(0.1) + car_stop() + calibrate_new(tlabel.TOWER, offset = 27, run = True) - + # calibrate(tlabel.TOWER, 27, False, 6) + time.sleep(3) while True: - # logger.info("等待进入准确区域") - ret, error = filter.aim_near(tlabel.TOWER) - while not ret: - ret, error = filter.aim_near(tlabel.TOWER) - # logger.info(error) - if abs(error) < 5: - for _ in range(3): - by_cmd.send_speed_x(0) - time.sleep(0.2) - by_cmd.send_speed_omega(0) - break - ret, error = filter.aim_near(tlabel.TOWER) - while not ret: - ret, error = filter.aim_near(tlabel.TOWER) - time.sleep(1) - logger.error(error) - if abs(error) > 5: - logger.info("校准中") - if error > 0: - by_cmd.send_distance_x(-10, int(error*3)) - else: - by_cmd.send_distance_x(10, int(-error*3)) - logger.error(error) - time.sleep(1) + pass def nexec(self): pass @@ -417,15 +564,23 @@ class get_rball(): by_cmd.send_angle_camera(0) def find(self): # 目标检测红球 - ret1, list1 = filter.get(tlabel.RBALL) - if ret1 > 0: + ret = filter.find(tlabel.RBALL) + if ret > 0: return True else: return False def exec(self): logger.info("找到红球") - # TODO 高空排险 offset 需要调整 - calibrate(tlabel.RBALL,15, True, 6) + car_stop() + calibrate_new(tlabel.RBALL,offset = -4, run = True) + time.sleep(1) + by_cmd.send_distance_y(-10, 65) + time.sleep(1) + by_cmd.send_distance_x(-10, 80) + time.sleep(1) + logger.info("抓红球") + while True: + pass pass def nexec(self): pass @@ -434,22 +589,22 @@ class get_rball(): class put_bball(): def init(self): logger.info("派发物资初始化") + socket.send_string("1") + socket.recv() while (by_cmd.send_angle_camera(90) == -1): by_cmd.send_angle_camera(90) def find(self): - # 目标检测通信塔 - ret1, list1 = filter.get(tlabel.BASKET) - if ret1 > 0: + ret = filter.find(tlabel.BASKET) + if ret > 0: return True else: return False def exec(self): logger.info("找到篮筐") - for _ in range(3): - by_cmd.send_speed_x(0) - time.sleep(0.2) - by_cmd.send_speed_omega(0) - time.sleep(1) + car_stop() + calibrate_new(tlabel.BASKET,offset = 12, run = True) + logger.info("把球放篮筐里") + time.sleep(2) pass def nexec(self): pass @@ -469,27 +624,23 @@ class put_hanoi1(): global direction_right # 目标检测左右转向标识 # TODO 框的大小判断距离 - ret1, list1 = filter.get(tlabel.RMARK) - ret2, list2 = filter.get(tlabel.LMARK) - if ret1: - logger.info("向右拐") - list1 = list1[0] - area = (list1[2] - list1[0]) * (list1[3] - list1[1]) - logger.info(area) - if area > 2500: + # ret1, list1 = filter.get(tlabel.RMARK) + # ret2, list2 = filter.get(tlabel.LMARK) + ret, label, error = filter.get_mult_box([tlabel.LMARK, tlabel.RMARK]) + if label == tlabel.RMARK: + if abs(error) < 55: + logger.info("向右拐") direction_right += 1 return True return False - elif ret2: - logger.info("向左拐") - list2 = list2[0] - area = (list2[2] - list2[0]) * (list2[3] - list2[1]) - logger.info(area) - if area > 2500: + elif label == tlabel.LMARK: + if abs(error) < 50: + logger.info("向左拐") direction_left += 1 return True return False - return False + else: + return False def exec(self): global direction for _ in range(3): @@ -497,17 +648,36 @@ class put_hanoi1(): time.sleep(0.2) by_cmd.send_speed_omega(0) time.sleep(0.2) - # if direction == tlabel.RMARK: + + # 校准牌子 + if direction_right > direction_left: + ret, error = filter.aim_near(tlabel.RMARK) + while not ret: + ret, error = filter.aim_near(tlabel.RMARK) + direction = tlabel.RMARK + else: + ret, error = filter.aim_near(tlabel.LMARK) + while not ret: + ret, error = filter.aim_near(tlabel.LMARK) + direction = tlabel.LMARK + # 校准 omega + if error > 0: + by_cmd.send_angle_omega(-20,abs(utils.lane_error)*13) + else: + by_cmd.send_angle_omega(20,abs(utils.lane_error)*13) + time.sleep(0.5) + by_cmd.send_speed_omega(0) + time.sleep(0.5) + by_cmd.send_distance_x(10, 250) + time.sleep(1) if direction_right > direction_left: direction = tlabel.RMARK - # by_cmd.send_distance_x(6, 200) by_cmd.send_angle_omega(-20,380) time.sleep(2) while (by_cmd.send_angle_camera(90) == -1): by_cmd.send_angle_camera(90) else: direction = tlabel.LMARK - # by_cmd.send_distance_x(6, 200) by_cmd.send_angle_omega(20,500) time.sleep(2) while (by_cmd.send_angle_camera(180) == -1): @@ -529,7 +699,7 @@ def sub_put_hanoi2(label,distance_type,run_on,back_flag): time.sleep(2) # 对准大红色柱体 - calibrate(label,15, run_on, 6) + calibrate(label,7, run_on, 10) # 抓取大红色柱体 logger.info("抓取柱体") # 根据 direction 确定移动方向 @@ -559,21 +729,6 @@ class put_hanoi2(): def find(self): time.sleep(0.001) return True - # # 目标检测左右转向标识 - # ret1, list1 = filter.get(tlabel.LPILLER) - # ret, _ = filter.get_mult([tlabel.LPILLER]) - # if ret: - # logger.info("看到了三个 直接停车") - # return 100 - # if ret1 > 0: - # list1 = list1[0] - # area = (list1[2] - list1[0]) * (list1[3] - list1[1]) - # logger.info(area) - # if area > 3300: - # return True - # return False - # else: - # return False def exec(self): # TODO 延时需要根据移动的 distance 判断 for _ in range(3): @@ -625,10 +780,12 @@ class move_area1(): return False def exec(self): logger.info("找到标示牌") - # for _ in range(3): - # by_cmd.send_speed_x(0) - # time.sleep(0.2) - # by_cmd.send_speed_omega(0) + # 停车 ocr 识别文字 调用大模型 + for _ in range(3): + by_cmd.send_speed_x(0) + time.sleep(0.2) + by_cmd.send_speed_omega(0) + time.sleep(2) pass def nexec(self): @@ -651,11 +808,14 @@ class move_area2(): # 进入停车区域 by_cmd.send_distance_y(10, 450) time.sleep(3) + + # TODO 调用大模型 然后执行动作 + by_cmd.send_light(1) + time.sleep(2) + by_cmd.send_light(0) # 离开停车区域 by_cmd.send_distance_y(-10, 450) time.sleep(3) - while True: - pass pass def nexec(self): @@ -669,7 +829,7 @@ class kick_ass(): self.target_person = cfg['kick_ass']['target_person'] def find(self): ret, error = filter.aim_near(tlabel.SIGN) - if ret > 0 and abs(error) < 8: + if ret > 0 and abs(error) < 16: return True else: return False diff --git a/utils.py b/utils.py index d28a063..e3a9a13 100644 --- a/utils.py +++ b/utils.py @@ -1,6 +1,7 @@ from enum import Enum import numpy as np +import erniebot lane_error = 0 @@ -40,7 +41,7 @@ test1_resp = { description: yolo 目标检测标签过滤器,需要传入连接到 yolo server 的 socket 对象 ''' class label_filter: - def __init__(self, socket, threshold=0.6): + def __init__(self, socket, threshold=0.5): self.num = 0 self.pos = [] self.socket = socket @@ -92,7 +93,7 @@ class label_filter: return True, np.array(results) return False, None ''' - description: 根据传入的标签过滤,返回该标签的个数以及 box + description: 根据传入的标签过滤,返回该标签的个数、box param {*} self param {*} tlabel return {int, array} @@ -107,34 +108,59 @@ class label_filter: expect_boxes = (results[:, 0] == tlabel.value) boxes = results[expect_boxes, :] self.num = len(boxes) - self.pos = boxes[:, 2:] # [[x1 y1 x2 y2]] - return self.num, self.pos - return 0, [] + if self.num: + self.pos = boxes[:, 2:] # [[x1 y1 x2 y2]] + return True, self.pos + return False, [] ''' - description: + description: 仅限在岔路口判断方向牌处使用 param {*} self param {*} tlabel_list return {*} ''' - def get_mult(self, tlabel_list): + def get_mult_box(self, tlabel_list): response = self.get_resp() if response['code'] == 0: ret, results = self.filter_box(response['data']) - target_counts = len(tlabel_list) - counts = 0 + except_label = None if ret: for tlabel in tlabel_list: expect_boxes = (results[:, 0] == tlabel.value) has_true = np.any(expect_boxes) if has_true: - counts += 1 - else: - return False, [] - if counts == target_counts: - return True, counts - return False, [] - return False, [] - return False, [] + except_label = tlabel + box = results[expect_boxes, :][:, 2:][0] + error = (box[2] + box[0] - self.img_size[0]) / 2 + break + if except_label != None: + return True, except_label, error + return False, None, None + return False, None, None + return False, None, None + def get_near_box(self, tlabel_list): + response = self.get_resp() + if response['code'] == 0: + ret, results = self.filter_box(response['data']) + except_label = [] + abs_error_list = [] + error_list = [] + if ret: + for tlabel in tlabel_list: + expect_boxes = (results[:, 0] == tlabel.value) + has_true = np.any(expect_boxes) + if has_true: + except_label.append(tlabel) + box = results[expect_boxes, :][:, 2:][0] + error = (box[2] + box[0] - self.img_size[0]) / 2 + abs_error_list.append(abs(error)) + error_list.append(error) + if len(error_list) != 0: + abs_error_list = np.array(abs_error_list) + errormin_index = np.argmin(abs_error_list) + return True, except_label[errormin_index], error_list[errormin_index] + return False, None, None + return False, None, None + return False, None, None ''' description: 判断传入的标签是否存在,存在返回 True param {*} self @@ -152,10 +178,10 @@ class label_filter: return True return False ''' - description: 根据传入的标签, + description: 根据传入的标签,寻找画面中最左侧的并返回 error param {*} self param {*} tlabel - return {*} + return {bool, error} ''' def aim_left(self, tlabel): # 如果标签存在,则返回列表中位置最靠左的目标框和中心的偏移值 @@ -181,12 +207,12 @@ class label_filter: expect_boxes = (results[:, 0] == tlabel.value) boxes = results[expect_boxes, :] if len(boxes) == 0: - return (False, ) + return (False, None) xmax_values = boxes[:, 4] # xmax xmax_index = np.argmax(xmax_values) error = (boxes[xmax_index][4] + boxes[xmax_index][2] - self.img_size[0]) / 2 return (True, error) - return (False, ) + return (False, None) def aim_near(self, tlabel): # 如果标签存在,则返回列表中位置最近的目标框和中心的偏移值 response = self.get_resp() @@ -203,12 +229,47 @@ class label_filter: return (True, error) return (False, 0) -# class Calibrate: -# def __init__(self,by_cmd): -# # 车控制对象初始化 -# self.by_cmd = by_cmd -# def aim(self,error): -# self.by_cmd.send_distance_x(error,) + +class LLM: + def __init__(self): + erniebot.api_type = "qianfan" + erniebot.ak = "jReawMtWhPu0wrxN9Rp1MzZX" + erniebot.sk = "eowS1BqsNgD2i0C9xNnHUVOSNuAzVTh6" + self.model = 'ernie-3.5' + self.prompt = '''你是一个机器人动作规划者,需要把我的话翻译成机器人动作规划并生成对应的 json 结果,机器人工作空间参考右手坐标系。 + 严格按照下面的描述生成给定格式 json,从现在开始你仅仅给我返回 json 数据!''' + self.prompt += '''正确的示例如下: + 向左移 0.1m, 向左转弯 85 度 [{'func': 'move', 'x': 0, 'y': 0.1},{'func': 'turn','angle': -85}], + 向右移 0.2m, 向前 0.1m [{'func': 'move', 'x': 0, 'y': -0.2},{'func': 'move', 'x': 0.1, 'y': 0}], + 向右转 85 度,向右移 0.1m [{'func': 'turn','angle': 85},{'func': 'move', 'x': 0, 'y': -0.1}], + 原地左转 38 度 [{'func': 'turn','angle': -38}], + 蜂鸣器发声 5 秒 [{'func': 'beep', 'time': 5}] + 发光或者照亮 5 秒 [{'func': 'light', 'time': 5}] + ''' + self.prompt += '''你无需回复我''' + self.messages = [] + self.resp = None + self.reset() + def reset(self): + self.messages = [self.make_message(self.prompt)] + self.resp = erniebot.ChatCompletion.create( + model=self.model, + messages=self.messages, + ) + self.messages.append(self.resp.to_message()) + def make_message(self,content): + return {'role': 'user', 'content': content} + def get_command_json(self,chat): + self.messages.append(self.make_message(chat)) + self.resp = erniebot.ChatCompletion.create( + model=self.model, + messages=self.messages, + ) + self.messages.append(self.resp.to_message()) + return self.resp.get_result() + + + if __name__ == '__main__': @@ -222,4 +283,9 @@ if __name__ == '__main__': print(obj.aim_left(tlabel.BBALL)) print(obj.aim_right(tlabel.BBALL)) print(obj.aim_near(tlabel.BBALL)) - print(obj.get(tlabel.HOSPITAL)) \ No newline at end of file + print(obj.get(tlabel.HOSPITAL)) + lmm_bot = LLM() + while True: + chat = input("输入:") + print(lmm_bot.get_command_json(chat)) +