''' 待办事项: - nexec 中添加延时,保证重试时不会立即跳入下个任务 - 确认 nexec 还是直接添加一个任务结束后执行的方法更好,或者两者都保留 - 医院第二个方块自由落体 ''' from enum import Enum from loguru import logger from utils import label_filter from utils import tlabel from utils import LLM from utils import CountRecord import utils import toml import zmq import time import variable as var import action as act context = zmq.Context() socket = context.socket(zmq.REQ) socket.connect("tcp://localhost:6667") logger.info("subtask yolo client init") cfg = toml.load('cfg_subtask.toml') # 加载任务配置 logger.info("load subtask config") by_cmd = None filter = None llm_bot = None ''' description: main.py 里执行 引入全局变量 param {*} _by_cmd 控制器对象 return {*} ''' def import_obj(_by_cmd): global by_cmd global filter global llm_bot by_cmd = _by_cmd filter = label_filter(socket) if cfg['move_area']['llm_enable']: llm_bot = LLM() def car_stop(): for _ in range(3): by_cmd.send_speed_x(0) time.sleep(0.2) by_cmd.send_speed_omega(0) # 蓝色球使用 def calibrate_right_new(label, offset, run = True, run_speed = 3.5): not_found_counts = 0 ret, error = filter.aim_right(label) while not ret: not_found_counts += 1 if not_found_counts >= 20: not_found_counts = 0 error = -320 # error > 0 front run logger.info("calibrate_right_new:找不到次数超过 20 帧 直接前进寻找") break ret, error = filter.aim_right(label) error += offset if abs(error) > 10 and run: if error > 0: by_cmd.send_speed_x(-run_speed) else: by_cmd.send_speed_x(run_speed) # 停的位置已经很接近目标,可以直接使用 distance 校准 else: return while True: ret, error = filter.aim_right(label) while not ret: ret, error = filter.aim_right(label) error += offset # if ret: if abs(error) <= 8: car_stop() logger.info("calibrate_right_new:行进时 误差小于 8 直接停车") ret, error = filter.aim_right(label) while not ret: not_found_counts += 1 if not_found_counts >=30: return ret, error = filter.aim_right(label) error += offset logger.info(f"calibrate_right_new:停车后的误差是{error}") if abs(error) > 8: logger.info(f"calibrate_right_new:停车后的误差大于 8 使用 distance 校准") error = error * 3 if error > 0: by_cmd.send_distance_x(-10, int(error)) else: by_cmd.send_distance_x(10, int(-error)) break ''' description: 校准新方法 找到后停止 然后根据 error 判断前后低速前进 error < 5 后直接停车 如果停车后 error > 8 则使用 distance 校准 这个方法仅用于在视野里能找到的情况下进行校准,并不能实现边走边寻找 然后再校准 param {*} label param {*} offset param {*} run param {*} run_speed return {*} ''' def calibrate_new(label, offset, run = True, run_speed = 3.5): not_found_counts = 0 ret, box = filter.get(label) while not ret: not_found_counts += 1 if not_found_counts >= 20: not_found_counts = 0 error = -320 # error > 0 front run logger.info("calibrate_new:找不到次数超过 20 帧 直接前进寻找") break ret, box = filter.get(label) # 如果超过二十帧跳出,则此时 box 为空值,需要再校验 ret if ret is True: error = (box[0][2] + box[0][0] - 320) / 2 + offset if abs(error) > 10 and run: if error > 0: by_cmd.send_speed_x(-run_speed) else: by_cmd.send_speed_x(run_speed) # 停的位置已经很接近目标,可以直接使用 distance 校准 else: if abs(error) > 8: logger.info(f"calibrate_new:停车后误差{error}大于 8 使用 distance 校准") # error = error # 3 if error > 0: by_cmd.send_distance_x(-10, int(error)) else: by_cmd.send_distance_x(10, int(-error)) return logger.info(f"calibrate_new:停车后误差{error}小于 8 不校准") return while True: ret, box = filter.get(label) while not ret: ret, box = filter.get(label) error = (box[0][2] + box[0][0] - 320) / 2 + offset if ret: if abs(error) <= 10: # 5 car_stop() logger.info("calibrate_new:行进时 误差小于 10 直接停车") ret, box = filter.get(label) while not ret: ret, box = filter.get(label) error = (box[0][2] + box[0][0] - 320) / 2 + offset logger.info(f"calibrate_new:停车后的误差是{error}") if abs(error) > 8: logger.info(f"calibrate_new:停车后的误差大于 8 使用 distance 校准") error = error * 3 # logger.error(f"error * 3 {error}") if error > 0: by_cmd.send_distance_x(-10, int(error)) else: by_cmd.send_distance_x(10, int(-error)) break def explore_calibrate_new(label, offset, run_direc ,run_speed = 3.5): # run_direc == 1 向前 if run_direc == 1: by_cmd.send_speed_x(run_speed) else: by_cmd.send_speed_x(-run_speed) while True: ret, box = filter.get(label) while not ret: ret, box = filter.get(label) error = (box[0][2] + box[0][0] - 320) / 2 + offset if ret: # 校准速度越大 停车的条件越宽泛 if abs(error) <= 20: car_stop() logger.info("explore_calibrate_new:行进时 误差小于 10 直接停车") ret, box = filter.get(label) while not ret: ret, box = filter.get(label) error = (box[0][2] + box[0][0] - 320) / 2 + offset logger.info(f"停车后像素误差:{error}") logger.info(f"explore_calibrate_new:停车后的误差大于 8 使用 distance 校准") if abs(error) < 8: error = error * 3 else: error = error if error > 0: by_cmd.send_distance_x(-10, int(error)) else: by_cmd.send_distance_x(10, int(-error)) break # 任务类 class task: def __init__(self, task_template, find_counts=10, enable=True): self.enable = enable self.task_t = task_template() self.counts = 0 self.find_counts = find_counts def init(self): if hasattr(self.task_t, 'init') and callable(getattr(self.task_t, 'init', None)): self.task_t.init() else: logger.warning("[Task ]# 该任务没有 init 方法") def find(self): if hasattr(self.task_t, 'find') and callable(getattr(self.task_t, 'find', None)): # 检查该任务执行标志 while True: ret = self.task_t.find() self.counts += ret if self.counts >= self.find_counts: break else: logger.warning("[Task ]# 该任务没有 find 方法") def exec(self): # 根据标志位确定是否执行该任务 if self.enable is True: if hasattr(self.task_t, 'exec') and callable(getattr(self.task_t, 'exec', None)): logger.info(f"[Task ]# Executing task") self.task_t.exec() else: logger.warning("[Task ]# 该任务没有 exec 方法") else: logger.warning(f"[Task ]# Skip task") if hasattr(self.task_t, 'nexec') and callable(getattr(self.task_t, 'nexec', None)): self.task_t.nexec() else: logger.warning("[Task ]# 该任务没有 nexec 方法") def after(self): if hasattr(self.task_t, 'after') and callable(getattr(self.task_t, 'after', None)): self.task_t.after() logger.debug(f"[Task ]# Task completed") else: logger.warning("[Task ]# 该任务没有 after 方法") # 任务队列状态类 class task_queuem_status(Enum): IDEL = 0 SEARCHING = 1 EXECUTING = 2 # 任务队列类 非 EXECUTEING 时均执行 huigui,注意互斥操作 class task_queuem(task): # task_now = task(None, False) def __init__(self, queue): super(task_queuem, self) self.queue = queue self.status = task_queuem_status.IDEL self.busy = True logger.info(f"[TaskM]# Task num {self.queue.qsize()}") def exec(self): # 如果空闲状态则将下一个队列任务取出 if self.status is task_queuem_status.IDEL: if self.queue.qsize() == 0: self.busy = False logger.info(f"[TaskM]# Task queue empty, exit") return False self.task_now = self.queue.get() # 如果当前任务没有使能,则直接转入执行状态,由任务执行函数打印未执行信息 if self.task_now.enable is True: self.status = task_queuem_status.SEARCHING # 如果使能该任务则执行该任务的初始化动作 self.task_now.init() else: self.status = task_queuem_status.EXECUTING logger.info(f"[TaskM]# ---------------------->>>>") # 阻塞搜索任务标志位 elif self.status is task_queuem_status.SEARCHING: logger.info(f"[TaskM]# Start searching task target") self.task_now.find() self.status = task_queuem_status.EXECUTING # 执行任务函数 elif self.status is task_queuem_status.EXECUTING: if self.task_now.enable is True: logger.info(f"[TaskM]# Start execute task function") self.task_now.exec() # 执行当前任务函数 self.status = task_queuem_status.IDEL # 执行完成后为退出巡线阻塞 self.task_now.after() # 执行任务后处理 self.queue.task_done() # 弹出已执行的任务 logger.info(f"[TaskM]# <<<<----------------------") else: logger.info(f"[TaskM]# Start execute task function (nexec)") self.status = task_queuem_status.IDEL # 空动作不需要阻塞巡线,直接置位 self.task_now.exec() # 执行当前任务函数 self.task_now.after() # 执行任务后处理 self.queue.task_done() # 弹出已执行的任务 logger.info(f"[TaskM]# <<<<----------------------") return True # 人员施救 class get_block1(): def init(self): var.task_speed = 0 logger.info(f"人员施救 1 初始化,第一抓取块为 {cfg['get_block']['first_block']}") act.cmd.camera(0) act.cmd.z2(20, 60, 0) filter.switch_camera(1) if cfg['get_block']['first_block'] == "blue": self.target_label = tlabel.BBLOCK self.another_label = tlabel.RBLOCK else: self.target_label = tlabel.RBLOCK self.another_label = tlabel.BBLOCK cfg['get_block']['first_block'] = "red" def find(self): # 目标检测红/蓝方块 ret = filter.find(self.target_label) if ret > 0: return True return False def exec(self): car_stop() calibrate_new(self.target_label, offset = 15, run = True) logger.info("抓取块") # act.axis.wait(0.5) # act.axis.z2(60) # act.axis.x2(100) # act.axis.claw_arm(220) # act.axis.claw(63) # act.axis.x2(20) # act.axis.wait(2) # act.axis.claw(30) # act.axis.x2(80) # act.axis.z2(130) # act.axis.claw_arm(36) # act.axis.z2(20) # act.axis.x2(40) # act.axis.claw(45) # act.axis.z2(80) # act.axis.claw_arm(220) # act.axis.x2(100) # act.axis.exec() by_cmd.send_position_axis_z(30, 60) time.sleep(1) by_cmd.send_angle_claw(63) by_cmd.send_position_axis_x(1, 20) time.sleep(2) by_cmd.send_angle_claw(30) by_cmd.send_beep(1) time.sleep(0.5) by_cmd.send_beep(0) time.sleep(0.5) by_cmd.send_position_axis_x(1, 80) by_cmd.send_position_axis_z(30, 130) time.sleep(2) by_cmd.send_angle_claw_arm(36) time.sleep(1) by_cmd.send_position_axis_z(30, 20) time.sleep(0.5) by_cmd.send_position_axis_x(1, 40) time.sleep(1) by_cmd.send_angle_claw(45) time.sleep(0.5) by_cmd.send_position_axis_z(30, 80) time.sleep(2) by_cmd.send_angle_claw_arm(220) time.sleep(0.5) by_cmd.send_position_axis_x(1, 100) time.sleep(1) pass def nexec(self): # TODO 完成不执行任务的空动作 pass def after(self): pass class get_block2(): def init(self): logger.info(f"人员施救 2 初始化,第一抓取块为 {cfg['get_block']['first_block']}") while (by_cmd.send_angle_camera(0) == -1): by_cmd.send_angle_camera(0) filter.switch_camera(1) if cfg['get_block']['first_block'] == "red": self.target_label = tlabel.BBLOCK self.another_label = tlabel.RBLOCK else: self.target_label = tlabel.RBLOCK self.another_label = tlabel.BBLOCK def find(self): # 目标检测红/蓝方块 ret = filter.find(self.target_label) if ret > 0: return True return False def exec(self): car_stop() calibrate_new(self.target_label, offset = 15, run = True) logger.info("抓取块") time.sleep(0.5) by_cmd.send_angle_claw_arm(220) by_cmd.send_angle_claw(53) by_cmd.send_position_axis_z(30, 60) time.sleep(1) by_cmd.send_position_axis_x(1, 20) time.sleep(1) by_cmd.send_angle_claw(30) by_cmd.send_beep(1) time.sleep(0.5) by_cmd.send_beep(0) time.sleep(0.5) by_cmd.send_position_axis_x(1, 100) # by_cmd.send_distance_x(15, 100) time.sleep(2) pass def nexec(self): # TODO 完成不执行任务的空动作 pass def after(self): var.pid_turning.set(cfg["get_block"]["pid_kp"], cfg["get_block"]["pid_ki"], cfg["get_block"]["pid_kd"]) # 任务检查间隔 time.sleep(9) # 紧急转移 class put_block(): def init(self): var.task_speed = 0 while (by_cmd.send_angle_camera(0) == -1): by_cmd.send_angle_camera(0) logger.info("紧急转移初始化") filter.switch_camera(1) def find(self): # 目标检测医院 ret, box = filter.get(tlabel.HOSPITAL) if ret > 0: width = box[0][2] - box[0][0] if width > 145: return True return False def exec(self): logger.info("找到医院") car_stop() calibrate_new(tlabel.HOSPITAL, offset = 7, run = False) by_cmd.send_distance_x(10, 100) by_cmd.send_position_axis_z(30, 0) time.sleep(3) by_cmd.send_position_axis_x(1, 40) time.sleep(1) by_cmd.send_angle_claw(63) time.sleep(1) by_cmd.send_position_axis_x(1, 80) by_cmd.send_position_axis_z(30, 130) time.sleep(2) by_cmd.send_angle_claw_arm(36) time.sleep(1) by_cmd.send_angle_claw(45) time.sleep(1) by_cmd.send_position_axis_z(30, 20) time.sleep(0.5) by_cmd.send_position_axis_x(1, 40) time.sleep(2) by_cmd.send_angle_claw(30) time.sleep(0.5) by_cmd.send_position_axis_z(30, 140) time.sleep(2) by_cmd.send_angle_claw_arm(220) time.sleep(1) by_cmd.send_distance_x(-10, 110) by_cmd.send_position_axis_z(30, 0) time.sleep(2) by_cmd.send_position_axis_x(1, 40) time.sleep(2) by_cmd.send_angle_claw(45) time.sleep(1) by_cmd.send_position_axis_x(1, 80) time.sleep(1) pass def nexec(self): pass def after(self): var.pid_turning.set(cfg["put_block"]["pid_kp"], cfg["put_block"]["pid_ki"], cfg["put_block"]["pid_kd"]) # 下一动作预备位置 while by_cmd.send_position_axis_z(30, 135) == -1: pass time.sleep(1) while by_cmd.send_position_axis_x(1, 0) == -1: pass while by_cmd.send_angle_claw_arm(36) == -1: pass # 任务检查间隔 # time.sleep(2) # 整装上阵 class get_bball(): def init(self): while (by_cmd.send_angle_camera(90) == -1): by_cmd.send_angle_camera(90) # by_cmd.send_position_axis_z(30, 135) # time.sleep(0.5) # by_cmd.send_position_axis_x(1, 0) # time.sleep(2) # by_cmd.send_angle_claw_arm(36) while (by_cmd.send_angle_storage(0) == -1): by_cmd.send_angle_storage(0) # 调试 临时换源 filter.switch_camera(1) logger.info("整装上阵初始化") # time.sleep(0.5) self.record = CountRecord(5) def find(self): # 目标检测蓝球 ret = filter.find(tlabel.BBALL) # ret = filter.find(tlabel.BBALL) or filter.find(tlabel.YBALL) # TODO 此处使用连续检出判断感觉不是很好,黄球放置远时停车较晚,可能跟请求速度相关 # if ret: # if self.record(tlabel.BBALL): # return True if ret: return True return False def exec(self): logger.info("找到蓝色球") car_stop() time.sleep(0.5) for _ in range(3): calibrate_right_new(tlabel.BBALL, offset = 16, run = True, run_speed = 4) logger.info("抓蓝色球") time.sleep(0.5) by_cmd.send_angle_claw_arm(36) by_cmd.send_angle_claw(54) by_cmd.send_position_axis_x(1, 160) time.sleep(1) by_cmd.send_angle_claw(60) time.sleep(1) by_cmd.send_angle_claw(30) time.sleep(1) by_cmd.send_distance_axis_z(30, 20) time.sleep(1) by_cmd.send_position_axis_x(1, 40) time.sleep(0.5) by_cmd.send_distance_axis_z(30, -40) time.sleep(0.5) by_cmd.send_angle_claw_arm(70) time.sleep(0.5) by_cmd.send_angle_claw(54) time.sleep(1) by_cmd.send_angle_claw_arm(36) time.sleep(1) by_cmd.send_position_axis_z(30, 135) # 继续向前走 # by_cmd.send_speed_x(4) pass def nexec(self): pass def after(self): var.pid_turning.set(cfg["get_bball"]["pid_kp"], cfg["get_bball"]["pid_ki"], cfg["get_bball"]["pid_kd"]) # 下一动作预备位置 by_cmd.send_angle_claw(30) time.sleep(0.5) by_cmd.send_position_axis_z(30, 0) time.sleep(1) # # 任务检查间隔 # time.sleep(1) # 通信抢修 class up_tower(): def init(self): logger.info("通信抢修初始化") while (by_cmd.send_angle_camera(90) == -1): by_cmd.send_angle_camera(90) filter.switch_camera(1) def find(self): # 目标检测通信塔 ret = filter.find(tlabel.TOWER) if ret: return True return False def exec(self): logger.info("找到塔") car_stop() time.sleep(1) calibrate_new(tlabel.TOWER, offset = 20, run = True) time.sleep(1) # calibrate(tlabel.TOWER, 27, False, 6) by_cmd.send_distance_x(-10, 120) time.sleep(1) by_cmd.send_distance_y(-10, 50) time.sleep(3) by_cmd.send_angle_zhuan(10) time.sleep(9) by_cmd.send_distance_y(10, 60) time.sleep(1) by_cmd.send_angle_zhuan(0) time.sleep(0.5) # while True: # pass def nexec(self): pass def after(self): var.pid_turning.set(cfg["up_tower"]["pid_kp"], cfg["up_tower"]["pid_ki"], cfg["up_tower"]["pid_kd"]) # 下一动作预备位置 by_cmd.send_position_axis_z(30, 0) # 任务检查间隔 time.sleep(4) # 高空排险 class get_rball(): def init(self): filter.switch_camera(1) logger.info("高空排险初始化") while (by_cmd.send_angle_camera(180) == -1): by_cmd.send_angle_camera(180) self.record = CountRecord(3) def find(self): # 目标检测红球 ret = filter.find(tlabel.RBALL) if ret > 0: # TODO 连续两帧才开始减速 if self.record(tlabel.RBALL): var.task_speed = 5 return True else: return False def exec(self): logger.info("找到红球") var.task_speed = 0 car_stop() time.sleep(0.5) # 靠近塔 by_cmd.send_angle_scoop(20) by_cmd.send_distance_y(-15, 50) # 50 time.sleep(2) calibrate_new(tlabel.RBALL,offset = 44, run = True) time.sleep(1) logger.info("抓红球") # by_cmd.send_angle_scoop(15) time.sleep(0.5) by_cmd.send_position_axis_z(30, 170) time.sleep(5) by_cmd.send_angle_scoop(7) by_cmd.send_distance_y(15, 70) time.sleep(1) by_cmd.send_angle_omega(-55,30) # while True: # pass pass def nexec(self): pass def after(self): var.pid_turning.set(cfg["get_rball"]["pid_kp"], cfg["get_rball"]["pid_ki"], cfg["get_rball"]["pid_kd"]) # 任务检查间隔 time.sleep(2) # 派发物资 class put_bball(): def init(self): logger.info("派发物资初始化") filter.switch_camera(1) by_cmd.send_position_axis_z(30, 0) while (by_cmd.send_angle_camera(90) == -1): by_cmd.send_angle_camera(90) def find(self): ret = filter.find(tlabel.BASKET) if ret > 0: return True else: return False def exec(self): logger.info("找到篮筐") car_stop() calibrate_new(tlabel.BASKET,offset = -21, run = True, run_speed = 5.5) by_cmd.send_distance_x(10, 20) by_cmd.send_distance_y(-10, 55) time.sleep(1) by_cmd.send_angle_storage(55) logger.info("把球放篮筐里") time.sleep(2) by_cmd.send_distance_y(10, 55) time.sleep(1) by_cmd.send_angle_storage(20) pass def nexec(self): pass def after(self): var.pid_turning.set(cfg["put_bball"]["pid_kp"], cfg["put_bball"]["pid_ki"], cfg["put_bball"]["pid_kd"]) # 任务检查间隔 time.sleep(2) # 物资盘点 class put_hanoi1(): def init(self): logger.info("物资盘点 1 初始化") filter.switch_camera(2) def find(self): ret, label, error = filter.get_mult_box([tlabel.LMARK, tlabel.RMARK]) if label == tlabel.RMARK: if abs(error) < 55: logger.info("向右拐") utils.direction_right += 1 return True return False elif label == tlabel.LMARK: if abs(error) < 50: logger.info("向左拐") utils.direction_left += 1 return True return False else: return False def exec(self): global direction for _ in range(3): by_cmd.send_speed_x(0) time.sleep(0.2) by_cmd.send_speed_omega(0) time.sleep(0.2) by_cmd.send_position_axis_z(30, 130) # 校准牌子 if utils.direction_right > utils.direction_left: ret, error = filter.aim_near(tlabel.RMARK) while not ret: ret, error = filter.aim_near(tlabel.RMARK) utils.direction = tlabel.RMARK logger.info("应该向右转") else: ret, error = filter.aim_near(tlabel.LMARK) while not ret: ret, error = filter.aim_near(tlabel.LMARK) utils.direction = tlabel.LMARK logger.info("应该向左转") # 校准 omega if error > 0: by_cmd.send_angle_omega(-20,abs(var.lane_error)) else: by_cmd.send_angle_omega(20,abs(var.lane_error)) time.sleep(0.5) car_stop() time.sleep(0.5) # by_cmd.send_distance_x(10, 200) by_cmd.send_distance_x(10, 180) time.sleep(0.5) # 根据方向初始化执行器位置 if utils.direction is tlabel.RMARK: # FIXME 右侧的爪子会被 storage 挡住 by_cmd.send_position_axis_x(1, 0) by_cmd.send_angle_claw_arm(36) by_cmd.send_angle_storage(0) else: by_cmd.send_position_axis_x(1, 150) by_cmd.send_angle_claw_arm(217) by_cmd.send_angle_storage(55) time.sleep(1.5) by_cmd.send_position_axis_z(30, 0) if utils.direction_right > utils.direction_left: utils.direction = tlabel.RMARK # by_cmd.send_angle_omega(-25,430) # by_cmd.send_angle_omega(-45,238) by_cmd.send_angle_omega(-55,194) time.sleep(2) while (by_cmd.send_angle_camera(90) == -1): by_cmd.send_angle_camera(90) else: utils.direction = tlabel.LMARK # by_cmd.send_angle_omega(25,430) # by_cmd.send_angle_omega(45,238) by_cmd.send_angle_omega(55,194) time.sleep(2) while (by_cmd.send_angle_camera(0) == -1): by_cmd.send_angle_camera(0) time.sleep(0.5) filter.switch_camera(1) def nexec(self): pass def after(self): var.pid_turning.set(0.8, 0, 0) pass class put_hanoi2(): def __init__(self): self.pos_lp = cfg['put_hanoi2']['pos_lp'] self.pos_mp = cfg['put_hanoi2']['pos_mp'] self.pos_sp = 6 - self.pos_lp - self.pos_mp if self.pos_lp == 1: self.target_label = tlabel.LPILLER elif self.pos_mp == 1: self.target_label = tlabel.MPILLER else: self.target_label = tlabel.SPILLER self.distance_lp = self.pos_lp * cfg['put_hanoi2']['pos_gap'] self.distance_mp = self.pos_mp * cfg['put_hanoi2']['pos_gap'] self.distance_sp = self.pos_sp * cfg['put_hanoi2']['pos_gap'] logger.info(f"setting hanoi pos_lp[{self.pos_lp}] pos_mp[{self.pos_mp}] pos_sp[{self.pos_sp}]") def init(self): logger.info("物资盘点 2 初始化") if utils.direction == tlabel.RMARK: self.offset = 25 else: self.offset = 10 def find(self): ret, box = filter.get(self.target_label) if ret: var.task_speed = 8.5 error = (box[0][2] + box[0][0] - 320) / 2 + self.offset if abs(error) < 40: return True return False def exec(self): logger.info(f"direction:{utils.direction.name}") var.task_speed = 0 car_stop() time.sleep(0.5) # by_cmd.send_distance_x(-8, cfg['put_hanoi2']['pos_gap'] - 30) time.sleep(1) explore_calibrate_new(tlabel.LPILLER, offset = self.offset, run_direc = 1, run_speed = 5) time.sleep(1) logger.info("抓大平台") if utils.direction is tlabel.RMARK: by_cmd.send_position_axis_z(30, 10) by_cmd.send_position_axis_x(1, 130) by_cmd.send_angle_claw(63) time.sleep(1) by_cmd.send_angle_claw(90) time.sleep(1.5) by_cmd.send_angle_claw(45) time.sleep(2) by_cmd.send_distance_axis_z(30, 20) time.sleep(1) by_cmd.send_position_axis_x(1, 10) time.sleep(2) pass else: by_cmd.send_position_axis_z(30, 10) by_cmd.send_position_axis_x(1, 40) by_cmd.send_angle_claw(63) time.sleep(1) by_cmd.send_angle_claw(90) time.sleep(1.5) by_cmd.send_angle_claw(45) time.sleep(2) by_cmd.send_distance_axis_z(30, 20) time.sleep(1) by_cmd.send_position_axis_x(1, 160) time.sleep(2) pass explore_calibrate_new(tlabel.TPLATFORM, offset = self.offset, run_direc = -1, run_speed = 5) time.sleep(1) logger.info("放大平台") if utils.direction is tlabel.RMARK: by_cmd.send_position_axis_x(1, 130) time.sleep(1.5) by_cmd.send_distance_axis_z(30, -20) time.sleep(1) by_cmd.send_angle_claw(90) time.sleep(1) by_cmd.send_position_axis_x(1, 10) pass else: by_cmd.send_position_axis_x(1, 40) time.sleep(1.5) by_cmd.send_distance_axis_z(30, -20) time.sleep(1) by_cmd.send_angle_claw(90) time.sleep(1) by_cmd.send_position_axis_x(1, 160) explore_calibrate_new(tlabel.MPILLER, offset = self.offset, run_direc = 1, run_speed = 5) time.sleep(1) logger.info("抓中平台") if utils.direction is tlabel.RMARK: by_cmd.send_position_axis_z(30, 10) by_cmd.send_position_axis_x(1, 130) by_cmd.send_angle_claw(63) time.sleep(1) by_cmd.send_angle_claw(85) time.sleep(2) by_cmd.send_angle_claw(40) time.sleep(2) by_cmd.send_distance_axis_z(30, 20) time.sleep(1) by_cmd.send_position_axis_x(1, 10) time.sleep(3) pass else: by_cmd.send_position_axis_z(30, 10) by_cmd.send_position_axis_x(1, 40) by_cmd.send_angle_claw(63) time.sleep(1) by_cmd.send_angle_claw(85) time.sleep(2) by_cmd.send_angle_claw(40) time.sleep(2) by_cmd.send_distance_axis_z(30, 20) time.sleep(1) by_cmd.send_position_axis_x(1, 160) time.sleep(3) pass explore_calibrate_new(tlabel.LPILLER, offset = self.offset, run_direc = -1, run_speed = 5) time.sleep(1) logger.info("放中平台") if utils.direction is tlabel.RMARK: by_cmd.send_position_axis_z(30, 100) time.sleep(2) by_cmd.send_position_axis_x(1, 130) time.sleep(2) by_cmd.send_distance_axis_z(30, -20) time.sleep(0.5) by_cmd.send_angle_claw(90) time.sleep(0.5) by_cmd.send_position_axis_x(1, 10) time.sleep(1) pass else: by_cmd.send_position_axis_z(30, 100) time.sleep(2) by_cmd.send_position_axis_x(1, 40) time.sleep(2) by_cmd.send_distance_axis_z(30, -20) time.sleep(0.5) by_cmd.send_angle_claw(90) time.sleep(0.5) by_cmd.send_position_axis_x(1, 160) time.sleep(1) explore_calibrate_new(tlabel.SPILLER, offset = self.offset, run_direc = 1, run_speed = 5) time.sleep(1) logger.info("抓小平台") if utils.direction is tlabel.RMARK: by_cmd.send_position_axis_z(30, 10) by_cmd.send_position_axis_x(1, 130) by_cmd.send_angle_claw(63) time.sleep(1) by_cmd.send_angle_claw(85) time.sleep(1) by_cmd.send_angle_claw(30) time.sleep(1) by_cmd.send_distance_axis_z(30, 10) time.sleep(1) by_cmd.send_position_axis_x(1, 10) time.sleep(2) pass else: by_cmd.send_position_axis_z(30, 10) by_cmd.send_position_axis_x(1, 40) by_cmd.send_angle_claw(63) time.sleep(1) by_cmd.send_angle_claw(85) time.sleep(1) by_cmd.send_angle_claw(30) time.sleep(1) by_cmd.send_distance_axis_z(30, 10) time.sleep(1) by_cmd.send_position_axis_x(1, 160) time.sleep(2) pass explore_calibrate_new(tlabel.MPILLER, offset = self.offset, run_direc = -1, run_speed = 5) time.sleep(1) logger.info("放小平台") if utils.direction is tlabel.RMARK: by_cmd.send_position_axis_z(30, 170) time.sleep(1.5) by_cmd.send_position_axis_x(1, 130) time.sleep(2) by_cmd.send_distance_axis_z(30, -20) time.sleep(0.5) by_cmd.send_angle_claw(90) time.sleep(1) by_cmd.send_position_axis_x(1, 10) time.sleep(1.5) pass else: by_cmd.send_position_axis_z(30, 170) time.sleep(1.5) by_cmd.send_position_axis_x(1, 40) time.sleep(2) by_cmd.send_distance_axis_z(30, -20) time.sleep(0.5) by_cmd.send_angle_claw(90) time.sleep(1) by_cmd.send_position_axis_x(1, 160) time.sleep(1.5) # while True: # pass def nexec(self): pass def after(self): pass class put_hanoi3(): def init(self): while by_cmd.send_position_axis_z(30, 130) == -1: pass time.sleep(3) logger.info("完成任务,爪回左侧") while by_cmd.send_angle_claw_arm(128) == -1: pass time.sleep(0.5) while by_cmd.send_position_axis_x(1, 150) == -1: pass time.sleep(1) while by_cmd.send_angle_claw_arm(220) == -1: pass def find(self): time.sleep(1) return True def exec(self): by_cmd.send_position_axis_z(30, 100) pass def nexec(self): pass def after(self): var.pid_turning.set(cfg["put_hanoi3"]["pid_kp"], cfg["put_hanoi3"]["pid_ki"], cfg["put_hanoi3"]["pid_kd"]) # 应急避险 第一阶段 找目标牌 class move_area1(): def init(self): logger.info("应急避险第一阶段初始化") while (by_cmd.send_angle_camera(0) == -1): by_cmd.send_angle_camera(0) def find(self): ret = filter.find(tlabel.SIGN) if ret: return True return False def exec(self): logger.info("找到标示牌") # 停车 ocr 识别文字 调用大模型 car_stop() time.sleep(2) var.task_speed = 8 pass def nexec(self): pass def after(self): # 任务检查间隔 time.sleep(2) pass # 应急避险 第二阶段 找停车区域 class move_area2(): def init(self): logger.info("应急避险第二阶段初始化") self.offset = 15 def find(self): # time.sleep(0.001) ret, box = filter.get(tlabel.SHELTER) if ret: error = (box[0][2] + box[0][0] - 320) / 2 + self.offset if abs(error) < 20: return 5000 return False def exec(self): var.task_speed = 0 logger.info("开始寻找停车区域") car_stop() calibrate_new(tlabel.SHELTER, offset = 15, run = True) time.sleep(1) # 进入停车区域 by_cmd.send_distance_y(10, 450) time.sleep(3) # TODO 调用大模型 然后执行动作 by_cmd.send_light(1) time.sleep(2) by_cmd.send_light(0) # 离开停车区域 by_cmd.send_distance_y(-10, 450) time.sleep(4) pass def nexec(self): pass def after(self): var.pid_turning.set(cfg["move_area"]["pid_kp"], cfg["move_area"]["pid_ki"], cfg["move_area"]["pid_kd"]) by_cmd.send_position_axis_z(30, 0) time.sleep(2) # 扫黑除暴 class kick_ass(): def init(self): while (by_cmd.send_angle_camera(0) == -1): by_cmd.send_angle_camera(0) logger.info("扫黑除暴初始化") self.pos_gap1 = cfg['kick_ass']['pos_gap1'] self.pos_gap2 = cfg['kick_ass']['pos_gap2'] self.target_person = cfg['kick_ass']['target_person'] by_cmd.send_angle_claw(15) by_cmd.send_position_axis_x(1, 160) def find(self): ret = filter.find(tlabel.SIGN) if ret: return True return False def exec(self): logger.info("找到标示牌") # 停的晚无需校准 omage car_stop() time.sleep(1) calibrate_new(tlabel.SIGN, offset = 8, run = True) time.sleep(0.5) by_cmd.send_position_axis_z(30, 60) if self.target_person == 1: target_distance = self.pos_gap1 else: target_distance = self.pos_gap1 + (self.target_person - 1) * self.pos_gap2 + (self.target_person - 1) * 10 by_cmd.send_distance_x(10, target_distance) time.sleep(5) by_cmd.send_angle_claw_arm(220) by_cmd.send_angle_claw(15) by_cmd.send_position_axis_x(1, 160) time.sleep(2) by_cmd.send_position_axis_x(1, 0) time.sleep(3) by_cmd.send_position_axis_x(1, 160) time.sleep(2) pass def nexec(self): pass def after(self): var.pid_turning.set(cfg["kick_ass"]["pid_kp"], cfg["kick_ass"]["pid_ki"], cfg["kick_ass"]["pid_kd"])