from enum import Enum from loguru import logger from utils import label_filter from utils import tlabel from utils import LLM import utils import toml import zmq import time context = zmq.Context() socket = context.socket(zmq.REQ) socket.connect("tcp://localhost:6667") logger.info("subtask yolo client init") cfg = toml.load('cfg_subtask.toml') # 加载任务配置 logger.info("load subtask config") by_cmd = None filter = None llm_bot = None ''' description: main.py 里执行 引入全局变量 param {*} _by_cmd 控制器对象 return {*} ''' def import_obj(_by_cmd): global by_cmd global filter global llm_bot by_cmd = _by_cmd filter = label_filter(socket) if cfg['move_area']['llm_enable']: llm_bot = LLM() def car_stop(): for _ in range(3): by_cmd.send_speed_x(0) time.sleep(0.2) by_cmd.send_speed_omega(0) def calibrate_right_new(label, offset, run = True, run_speed = 3.5): not_found_counts = 0 ret, error = filter.aim_right(label) while not ret: not_found_counts += 1 if not_found_counts >= 20: not_found_counts = 0 error = -320 # error > 0 front run logger.info("找不到 直接向前") break ret, error = filter.aim_right(label) error += offset if abs(error) > 10 and run: if error > 0: by_cmd.send_speed_x(-run_speed) else: by_cmd.send_speed_x(run_speed) pass # 停的位置已经很接近目标,可以直接使用 distance 校准 else: error = error * 3 if error > 0: by_cmd.send_distance_x(-10, int(error)) else: by_cmd.send_distance_x(10, int(-error)) return while True: ret, error = filter.aim_right(label) while not ret: ret, error = filter.aim_right(label) error += offset if ret: if abs(error) <= 8: car_stop() logger.info("可以停车了") ret, error = filter.aim_right(label) while not ret: ret, error = filter.aim_right(label) error += offset if abs(error) > 8: error = error * 3 if error > 0: by_cmd.send_distance_x(-10, int(error)) else: by_cmd.send_distance_x(10, int(-error)) break ''' description: 校准新方法 找到后停止 然后根据 error 判断前后低速前进 error < 5 后直接停车 如果停车后 error > 8 则使用 distance 校准 这个方法仅用于在视野里能找到的情况下进行校准,并不能实现边走边寻找 然后再校准 param {*} label param {*} offset param {*} run param {*} run_speed return {*} ''' def calibrate_new(label, offset, run = True, run_speed = 3.5): ret, box = filter.get(label) while not ret: ret, box = filter.get(label) error = (box[0][2] + box[0][0] - 320) / 2 + offset if abs(error) > 10 and run: if error > 0: by_cmd.send_speed_x(-run_speed) else: by_cmd.send_speed_x(run_speed) # 停的位置已经很接近目标,可以直接使用 distance 校准 else: error = error * 3 if error > 0: by_cmd.send_distance_x(-10, int(error)) else: by_cmd.send_distance_x(10, int(-error)) return while True: ret, box = filter.get(label) while not ret: ret, box = filter.get(label) error = (box[0][2] + box[0][0] - 320) / 2 + offset if ret: if abs(error) <= 10: # 5 car_stop() logger.info("可以停车了") ret, box = filter.get(label) while not ret: ret, box = filter.get(label) error = (box[0][2] + box[0][0] - 320) / 2 + offset logger.info(f"停车后像素误差:{error}") if abs(error) > 8: error = error * 1.5 # 3 if error > 0: by_cmd.send_distance_x(-10, int(error)) else: by_cmd.send_distance_x(10, int(-error)) break def explore_calibrate_new(label, offset, run_direc ,run_speed = 3.5): # run_direc == 1 向前 if run_direc == 1: by_cmd.send_speed_x(run_speed) else: by_cmd.send_speed_x(-run_speed) while True: ret, box = filter.get(label) while not ret: ret, box = filter.get(label) error = (box[0][2] + box[0][0] - 320) / 2 + offset if ret: # 校准速度越大 停车的条件越宽泛 if abs(error) <= 15: car_stop() logger.info("可以停车了") ret, box = filter.get(label) while not ret: ret, box = filter.get(label) error = (box[0][2] + box[0][0] - 320) / 2 + offset logger.info(f"停车后像素误差:{error}") if abs(error) > 8: error = error * 3 if error > 0: by_cmd.send_distance_x(-10, int(error)) else: by_cmd.send_distance_x(10, int(-error)) break # 任务类 class task: def __init__(self, task_template, find_counts=10, enable=True): self.enable = enable self.task_t = task_template() self.counts = 0 self.find_counts = find_counts def init(self): self.task_t.init() def find(self): # 检查该任执行标志 while True: ret = self.task_t.find() self.counts += ret if self.counts >= self.find_counts: break def exec(self): # 根据标志位确定是否执行该任务 if self.enable is True: logger.debug(f"[Task ]# Executing task") self.task_t.exec() logger.debug(f"[Task ]# Task completed") else: logger.warning(f"[Task ]# Skip task") self.task_t.nexec() # 任务队列状态类 class task_queuem_status(Enum): IDEL = 0 SEARCHING = 1 EXECUTING = 2 # 任务队列类 非 EXECUTEING 时均执行 huigui,注意互斥操作 class task_queuem(task): # task_now = task(None, False) def __init__(self, queue): super(task_queuem, self) self.queue = queue self.status = task_queuem_status.IDEL self.busy = True logger.info(f"[TaskM]# Task num {self.queue.qsize()}") def exec(self): # 如果空闲状态则将下一个队列任务取出 if self.status is task_queuem_status.IDEL: if self.queue.qsize() == 0: self.busy = False logger.info(f"[TaskM]# Task queue empty, exit") return False self.task_now = self.queue.get() # 如果当前任务没有使能,则直接转入执行状态,由任务执行函数打印未执行信息 if self.task_now.enable is True: self.status = task_queuem_status.SEARCHING # 如果使能该任务则执行该任务的初始化动作 self.task_now.init() else: self.status = task_queuem_status.EXECUTING logger.info(f"[TaskM]# ---------------------->>>>") # 阻塞搜索任务标志位 elif self.status is task_queuem_status.SEARCHING: logger.info(f"[TaskM]# Start searching task target") self.task_now.find() self.status = task_queuem_status.EXECUTING # 执行任务函数 elif self.status is task_queuem_status.EXECUTING: logger.info(f"[TaskM]# Start execute task function") self.task_now.exec() # 执行当前任务函数 self.queue.task_done() # 弹出已执行的任务 self.status = task_queuem_status.IDEL # logger.info(f"[TaskM]# <<<<----------------------") return True # 人员施救 class get_block(): def init(self): logger.info("人员施救初始化") while (by_cmd.send_angle_camera(180) == -1): by_cmd.send_angle_camera(180) filter.switch_camera(1) if cfg['get_block']['first_block'] == "blue": self.target_label = tlabel.BBLOCK self.another_label = tlabel.RBLOCK cfg['get_block']['first_block'] = '' else: self.target_label = tlabel.RBLOCK self.another_label = tlabel.BBLOCK def find(self): # 目标检测红/蓝方块 ret = filter.find(self.target_label) if ret > 0: return True return False def exec(self): car_stop() calibrate_new(self.target_label, offset = 12, run = True) logger.info("抓取块") time.sleep(0.5) by_cmd.send_position_axis_z(20, 60) by_cmd.send_position_axis_x(1, 140) time.sleep(4) by_cmd.send_angle_claw_arm(220) by_cmd.send_angle_claw(63) by_cmd.send_position_axis_x(1, 20) time.sleep(2) by_cmd.send_angle_claw(30) by_cmd.send_light(1) by_cmd.send_beep(1) time.sleep(0.5) by_cmd.send_light(0) by_cmd.send_beep(0) time.sleep(0.5) by_cmd.send_position_axis_x(1, 140) time.sleep(3) pass # 调试 临时注释掉 # calibrate(tlabel.RBLOCK,15) # time.sleep(2) # by_cmd.send_position_axis_z(10, 150) # time.sleep(5) # by_cmd.send_angle_claw_arm(127) # time.sleep(1) # by_cmd.send_position_axis_x(4, 140) # time.sleep(4) # by_cmd.send_angle_claw_arm(220) # by_cmd.send_angle_claw(90) # time.sleep(1) # by_cmd.send_distance_axis_z(10, -70) # time.sleep(3) # by_cmd.send_angle_claw(27) # by_cmd.send_distance_axis_z(10, 10) # time.sleep(2) # by_cmd.send_distance_axis_x(4, -100) # time.sleep(1) # by_cmd.send_distance_axis_z(10, -40) # time.sleep(3) # by_cmd.send_angle_claw(35) # time.sleep(1) # by_cmd.send_position_axis_z(10, 150) # time.sleep(3) # by_cmd.send_position_axis_x(2, 140) # # 抓取第二个块后 收爪 # time.sleep(3) # by_cmd.send_position_axis_x(4, 0) def nexec(self): # TODO 完成不执行任务的空动作 pass # 紧急转移 class put_block(): def init(self): while (by_cmd.send_angle_camera(180) == -1): by_cmd.send_angle_camera(180) logger.info("紧急转移初始化") socket.send_string("1") socket.recv() def find(self): # 目标检测医院 ret, box = filter.get(tlabel.HOSPITAL) if ret > 0: width = box[0][2] - box[0][0] if width > 120: return True return False def exec(self): logger.info("找到医院") car_stop() calibrate_new(tlabel.HOSPITAL, offset = 7, run = False) time.sleep(1) by_cmd.send_position_axis_x(1, 0) time.sleep(0.5) by_cmd.send_position_axis_z(20, 0) time.sleep(3) by_cmd.send_angle_claw(63) # time.sleep(2) # by_cmd.send_position_axis_z(20, 130) # while True: # pass # by_cmd.send_position_axis_z(10, 150) # time.sleep(3) # # TODO 切换爪子方向 # by_cmd.send_position_axis_x(2, 140) # time.sleep(2) # by_cmd.send_position_axis_z(10, 170) # 下一动作预备位置 by_cmd.send_position_axis_z(20, 130) time.sleep(4) by_cmd.send_position_axis_x(1, 0) time.sleep(1) by_cmd.send_angle_claw_arm(36) pass def nexec(self): # 下一动作预备位置 by_cmd.send_position_axis_z(20, 130) time.sleep(1.5) by_cmd.send_position_axis_x(1, 0) by_cmd.send_angle_claw_arm(36) time.sleep(4) pass # 整装上阵 class get_bball(): def init(self): while (by_cmd.send_angle_camera(90) == -1): by_cmd.send_angle_camera(90) by_cmd.send_position_axis_z(20, 135) time.sleep(0.5) by_cmd.send_position_axis_x(1, 0) time.sleep(0.5) while (by_cmd.send_angle_storage(0) == -1): by_cmd.send_angle_storage(0) # 调试 临时换源 socket.send_string("1") socket.recv() logger.info("整装上阵初始化") # time.sleep(0.5) def find(self): # 目标检测蓝球 ret = filter.find(tlabel.BBALL) if ret: return True return False def exec(self): logger.info("找到蓝色球") car_stop() time.sleep(0.5) for _ in range(3): calibrate_right_new(tlabel.BBALL, offset = 27, run = True, run_speed = 4) logger.info("抓蓝色球") time.sleep(0.5) by_cmd.send_angle_claw_arm(36) by_cmd.send_angle_claw(54) by_cmd.send_position_axis_x(1, 160) time.sleep(1) by_cmd.send_angle_claw(70) time.sleep(1) by_cmd.send_angle_claw(30) time.sleep(1) by_cmd.send_distance_axis_z(20, 20) time.sleep(1) by_cmd.send_position_axis_x(1, 0) time.sleep(1.5) by_cmd.send_angle_claw_arm(67) time.sleep(0.5) by_cmd.send_angle_claw(54) time.sleep(1) by_cmd.send_angle_claw_arm(36) time.sleep(1) by_cmd.send_distance_axis_z(20, -20) time.sleep(1) # 继续向前走 # by_cmd.send_speed_x(4) # 下一动作预备位置 by_cmd.send_angle_claw(30) by_cmd.send_position_axis_z(20, 0) time.sleep(2) pass def nexec(self): by_cmd.send_angle_claw(30) by_cmd.send_position_axis_z(20, 0) time.sleep(2) pass # 通信抢修 class up_tower(): def init(self): logger.info("通信抢修初始化") while (by_cmd.send_angle_camera(90) == -1): by_cmd.send_angle_camera(90) def find(self): # 目标检测通信塔 ret = filter.find(tlabel.TOWER) if ret: return True return False def exec(self): logger.info("找到塔") car_stop() calibrate_new(tlabel.TOWER, offset = 27, run = True) by_cmd.send_light(1) time.sleep(0.5) by_cmd.send_light(0) # calibrate(tlabel.TOWER, 27, False, 6) time.sleep(3) # while True: # pass # 下一动作预备位置 by_cmd.send_position_axis_z(20, 0) def nexec(self): # 下一动作预备位置 by_cmd.send_position_axis_z(20, 0) time.sleep(4) pass # 高空排险 class get_rball(): def init(self): logger.info("高空排险初始化") while (by_cmd.send_angle_camera(0) == -1): by_cmd.send_angle_camera(0) def find(self): # 目标检测红球 ret = filter.find(tlabel.RBALL) if ret > 0: utils.task_speed = 5 return True else: return False def exec(self): logger.info("找到红球") utils.task_speed = 0 car_stop() time.sleep(0.5) # 靠近塔 by_cmd.send_angle_scoop(20) by_cmd.send_distance_y(-10, 65) time.sleep(1) calibrate_new(tlabel.RBALL,offset = 50, run = True) time.sleep(1) logger.info("抓红球") # by_cmd.send_angle_scoop(15) time.sleep(0.5) by_cmd.send_position_axis_z(20, 170) time.sleep(5) by_cmd.send_angle_scoop(7) by_cmd.send_distance_y(10, 65) time.sleep(0.5) # while True: # pass pass def nexec(self): pass # 派发物资 class put_bball(): def init(self): logger.info("派发物资初始化") socket.send_string("1") socket.recv() by_cmd.send_position_axis_z(20, 0) while (by_cmd.send_angle_camera(90) == -1): by_cmd.send_angle_camera(90) def find(self): ret = filter.find(tlabel.BASKET) if ret > 0: return True else: return False def exec(self): logger.info("找到篮筐") car_stop() calibrate_new(tlabel.BASKET,offset = -15, run = True, run_speed = 5.5) by_cmd.send_distance_y(-10, 65) time.sleep(1) by_cmd.send_angle_storage(55) logger.info("把球放篮筐里") time.sleep(2) by_cmd.send_distance_y(10, 65) time.sleep(1) by_cmd.send_angle_storage(20) pass def nexec(self): pass # 物资盘点 class put_hanoi1(): def init(self): logger.info("物资盘点 1 初始化") socket.send_string("2") socket.recv() def find(self): ret, label, error = filter.get_mult_box([tlabel.LMARK, tlabel.RMARK]) if label == tlabel.RMARK: if abs(error) < 55: logger.info("向右拐") utils.direction_right += 1 return True return False elif label == tlabel.LMARK: if abs(error) < 50: logger.info("向左拐") utils.direction_left += 1 return True return False else: return False def exec(self): global direction for _ in range(3): by_cmd.send_speed_x(0) time.sleep(0.2) by_cmd.send_speed_omega(0) time.sleep(0.2) by_cmd.send_position_axis_z(20, 130) # 校准牌子 if utils.direction_right > utils.direction_left: ret, error = filter.aim_near(tlabel.RMARK) while not ret: ret, error = filter.aim_near(tlabel.RMARK) utils.direction = tlabel.RMARK logger.info("应该向右转") else: ret, error = filter.aim_near(tlabel.LMARK) while not ret: ret, error = filter.aim_near(tlabel.LMARK) utils.direction = tlabel.LMARK logger.info("应该向左转") # 校准 omega if error > 0: by_cmd.send_angle_omega(-20,abs(utils.lane_error)) else: by_cmd.send_angle_omega(20,abs(utils.lane_error)) time.sleep(0.5) car_stop() time.sleep(0.5) # by_cmd.send_distance_x(10, 200) by_cmd.send_distance_x(10, 180) time.sleep(0.5) # 根据方向初始化执行器位置 if utils.direction is tlabel.RMARK: by_cmd.send_position_axis_x(1, 0) by_cmd.send_angle_claw_arm(36) by_cmd.send_angle_storage(0) else: by_cmd.send_position_axis_x(1, 150) by_cmd.send_angle_claw_arm(220) by_cmd.send_angle_storage(55) time.sleep(1.5) by_cmd.send_position_axis_z(20, 0) if utils.direction_right > utils.direction_left: utils.direction = tlabel.RMARK by_cmd.send_angle_omega(-25,430) time.sleep(2) while (by_cmd.send_angle_camera(90) == -1): by_cmd.send_angle_camera(90) else: utils.direction = tlabel.LMARK by_cmd.send_angle_omega(25,430) time.sleep(2) while (by_cmd.send_angle_camera(180) == -1): by_cmd.send_angle_camera(180) time.sleep(0.5) socket.send_string("1") socket.recv() def nexec(self): pass class put_hanoi2(): def __init__(self): if utils.direction_right > utils.direction_left: self.offset = 25 else: self.offset = 15 self.pos_lp = cfg['put_hanoi2']['pos_lp'] self.pos_mp = cfg['put_hanoi2']['pos_mp'] self.pos_sp = 6 - self.pos_lp - self.pos_mp if self.pos_lp == 1: self.target_label = tlabel.LPILLER elif self.pos_mp == 1: self.target_label = tlabel.MPILLER else: self.target_label = tlabel.SPILLER self.distance_lp = self.pos_lp * cfg['put_hanoi2']['pos_gap'] self.distance_mp = self.pos_mp * cfg['put_hanoi2']['pos_gap'] self.distance_sp = self.pos_sp * cfg['put_hanoi2']['pos_gap'] logger.info(f"setting hanoi pos_lp[{self.pos_lp}] pos_mp[{self.pos_mp}] pos_sp[{self.pos_sp}]") def init(self): logger.info("物资盘点 2 初始化") def find(self): ret, box = filter.get(self.target_label) if ret: utils.task_speed = 8.5 error = (box[0][2] + box[0][0] - 320) / 2 + self.offset if abs(error) < 40: return True return False def exec(self): logger.info(f"direction:{utils.direction.name}") logger.info(f"offset:{self.offset}") utils.task_speed = 0 car_stop() time.sleep(0.5) # by_cmd.send_distance_x(-8, cfg['put_hanoi2']['pos_gap'] - 30) time.sleep(1) explore_calibrate_new(tlabel.LPILLER, offset = self.offset, run_direc = 1, run_speed = 6) logger.info("抓大平台") if utils.direction is tlabel.RMARK: by_cmd.send_position_axis_z(20, 40) by_cmd.send_position_axis_x(1, 160) by_cmd.send_angle_claw(63) time.sleep(1) by_cmd.send_angle_claw(85) time.sleep(4) by_cmd.send_angle_claw(50) time.sleep(4) by_cmd.send_position_axis_x(2, 80) by_cmd.send_distance_axis_z(20, 10) else: by_cmd.send_position_axis_z(20, 30) by_cmd.send_position_axis_x(1, 40) by_cmd.send_angle_claw(63) time.sleep(1) by_cmd.send_angle_claw(90) time.sleep(4) by_cmd.send_angle_claw(50) time.sleep(2) by_cmd.send_distance_axis_z(20, 20) time.sleep(2) by_cmd.send_position_axis_x(2, 160) time.sleep(4) pass explore_calibrate_new(tlabel.TPLATFORM, offset = self.offset, run_direc = -1, run_speed = 6) logger.info("放大平台") if utils.direction is tlabel.RMARK: by_cmd.send_distance_axis_z(20, -10) by_cmd.send_position_axis_x(2, 160) time.sleep(4) by_cmd.send_angle_claw(90) by_cmd.send_position_axis_x(1, 0) else: by_cmd.send_position_axis_x(2, 40) time.sleep(4) by_cmd.send_distance_axis_z(20, -20) time.sleep(2) by_cmd.send_angle_claw(90) time.sleep(2) by_cmd.send_position_axis_x(1, 160) explore_calibrate_new(tlabel.MPILLER, offset = self.offset, run_direc = 1, run_speed = 6) logger.info("抓中平台") if utils.direction is tlabel.RMARK: pass else: pass explore_calibrate_new(tlabel.TPLATFORM, offset = self.offset, run_direc = -1, run_speed = 6) logger.info("放中平台") if utils.direction is tlabel.RMARK: pass else: pass explore_calibrate_new(tlabel.SPILLER, offset = self.offset, run_direc = 1, run_speed = 6) logger.info("抓小平台") if utils.direction is tlabel.RMARK: pass else: pass explore_calibrate_new(tlabel.TPLATFORM, offset = self.offset, run_direc = -1, run_speed = 6) logger.info("放小平台") if utils.direction is tlabel.RMARK: pass else: pass # while True: # pass def nexec(self): pass class put_hanoi3(): def init(self): logger.info("完成任务,爪回左侧") by_cmd.send_angle_claw_arm(128) time.sleep(0.5) by_cmd.send_position_axis_x(1, 150) time.sleep(1) by_cmd.send_angle_claw_arm(220) def find(self): time.sleep(1) return True def exec(self): by_cmd.send_position_axis_z(20, 100) pass def nexec(self): pass # 应急避险 第一阶段 找目标牌 class move_area1(): def init(self): logger.info("应急避险第一阶段初始化") while (by_cmd.send_angle_camera(180) == -1): by_cmd.send_angle_camera(180) def find(self): ret = filter.find(tlabel.SIGN) if ret: return True return False def exec(self): logger.info("找到标示牌") # 停车 ocr 识别文字 调用大模型 car_stop() time.sleep(2) utils.task_speed = 8 pass def nexec(self): pass # 应急避险 第二阶段 找停车区域 class move_area2(): def init(self): logger.info("应急避险第二阶段初始化") self.offset = 15 def find(self): # time.sleep(0.001) ret, box = filter.get(tlabel.SHELTER) if ret: error = (box[0][2] + box[0][0] - 320) / 2 + self.offset if abs(error) < 10: return 5000 return False def exec(self): utils.task_speed = 0 logger.info("开始寻找停车区域") car_stop() calibrate_new(tlabel.SHELTER, offset = 15, run = True) time.sleep(1) # 进入停车区域 by_cmd.send_distance_y(10, 450) time.sleep(3) # TODO 调用大模型 然后执行动作 by_cmd.send_light(1) time.sleep(2) by_cmd.send_light(0) # 离开停车区域 by_cmd.send_distance_y(-10, 450) time.sleep(3) by_cmd.send_position_axis_z(20, 0) pass def nexec(self): by_cmd.send_position_axis_z(20, 0) pass # 扫黑除暴 class kick_ass(): def init(self): while (by_cmd.send_angle_camera(180) == -1): by_cmd.send_angle_camera(180) logger.info("扫黑除暴初始化") self.pos_gap1 = cfg['kick_ass']['pos_gap1'] self.pos_gap2 = cfg['kick_ass']['pos_gap2'] self.target_person = cfg['kick_ass']['target_person'] def find(self): ret = filter.find(tlabel.SIGN) if ret: return True return False def exec(self): logger.info("找到标示牌") # 停的晚无需校准 omage car_stop() time.sleep(1) calibrate_new(tlabel.SIGN, offset = 8, run = True) time.sleep(0.5) by_cmd.send_position_axis_z(20, 60) if self.target_person == 1: target_distance = self.pos_gap1 else: target_distance = self.pos_gap1 + (self.target_person - 1) * self.pos_gap2 + (self.target_person - 1) * 10 by_cmd.send_distance_x(10, target_distance) time.sleep(5) by_cmd.send_angle_claw_arm(220) by_cmd.send_angle_claw(15) by_cmd.send_position_axis_x(1, 160) time.sleep(2) by_cmd.send_position_axis_x(1, 0) time.sleep(4) by_cmd.send_position_axis_x(1, 160) time.sleep(3) pass def nexec(self): pass