from enum import Enum from loguru import logger from utils import label_filter from utils import tlabel from utils import LLM from utils import CountRecord import utils import toml import zmq import time import variable as var import action as act import re import threading import ctypes cfg = None cfg_args = None by_cmd = None filter = None llm_bot = None # 目标检测 socket 客户端 context = None socket = None context1 = None ocr_socket = None global_skip_queue = None ''' description: main.py 里执行 引入全局变量 param {*} _by_cmd 控制器对象 return {*} ''' def import_obj(_by_cmd, skip_queue): global by_cmd global filter global llm_bot global context global socket global context1 global ocr_socket global cfg global cfg_args global global_skip_queue cfg = toml.load('/home/evan/Workplace/project_main/cfg_subtask.toml') # 加载任务配置 cfg_args = toml.load('/home/evan/Workplace/project_main/cfg_args.toml') by_cmd = _by_cmd global_skip_queue = skip_queue # 目标检测 socket 客户端 context = zmq.Context() socket = context.socket(zmq.REQ) socket.connect("tcp://localhost:6667") logger.info("subtask yolo client init") # ocr socket 客户端 # context1 = zmq.Context() # ocr_socket = context1.socket(zmq.REQ) # ocr_socket.connect("tcp://localhost:6668") # logger.info("subtask ocr client init") filter = label_filter(socket) if cfg['move_area']['llm_enable']: llm_bot = LLM() def car_stop(): for _ in range(3): by_cmd.send_speed_x(0) time.sleep(0.2) by_cmd.send_speed_y(0) time.sleep(0.2) by_cmd.send_speed_omega(0) # 蓝色球使用 def calibrate_right_new(label, offset, run = True, run_speed = 3.5): not_found_counts = 0 ret, error = filter.aim_right(label) while not ret: not_found_counts += 1 if not_found_counts >= 20: not_found_counts = 0 error = -320 # error > 0 front run logger.info("calibrate_right_new:找不到次数超过 20 帧 直接前进寻找") break ret, error = filter.aim_right(label) error += offset if abs(error) > 10 and run: if error > 0: by_cmd.send_speed_x(-run_speed) else: by_cmd.send_speed_x(run_speed) # 停的位置已经很接近目标,可以直接使用 distance 校准 else: return while True: ret, error = filter.aim_right(label) while not ret: ret, error = filter.aim_right(label) error += offset # if ret: if abs(error) <= 8: car_stop() logger.info("calibrate_right_new:行进时 误差小于 8 直接停车") ret, error = filter.aim_right(label) while not ret: not_found_counts += 1 if not_found_counts >=30: return ret, error = filter.aim_right(label) error += offset logger.info(f"calibrate_right_new:停车后的误差是{error}") if abs(error) > 8: logger.info(f"calibrate_right_new:停车后的误差大于 8 使用 distance 校准") error = error * 1.5 if error > 0: by_cmd.send_distance_x(-10, int(error)) else: by_cmd.send_distance_x(10, int(-error)) break ''' description: 校准新方法 找到后停止 然后根据 error 判断前后低速前进 error < 5 后直接停车 如果停车后 error > 8 则使用 distance 校准 这个方法仅用于在视野里能找到的情况下进行校准,并不能实现边走边寻找 然后再校准 param {*} label param {*} offset param {*} run param {*} run_speed return {*} ''' def calibrate_new(label, offset, run = True, run_speed = 3.5): not_found_counts = 0 ret, box = filter.get(label) while not ret: not_found_counts += 1 if not_found_counts >= 20: not_found_counts = 0 error = -320 # error > 0 front run logger.info("calibrate_new:找不到次数超过 20 帧 直接前进寻找") break ret, box = filter.get(label) # 如果超过二十帧跳出,则此时 box 为空值,需要再校验 ret if ret is True: error = (box[0][2] + box[0][0] - 320) / 2 + offset if abs(error) > 10 and run: if error > 0: by_cmd.send_speed_x(-run_speed) else: by_cmd.send_speed_x(run_speed) # 停的位置已经很接近目标,可以直接使用 distance 校准 else: if abs(error) > 8: logger.info(f"calibrate_new:停车后误差{error}大于 8 使用 distance 校准") # error = error # 3 if error > 0: by_cmd.send_distance_x(-10, int(error)) else: by_cmd.send_distance_x(10, int(-error)) return logger.info(f"calibrate_new:停车后误差{error}小于 8 不校准") return while True: ret, box = filter.get(label) while not ret: ret, box = filter.get(label) error = (box[0][2] + box[0][0] - 320) / 2 + offset if ret: if abs(error) <= 10: # 5 car_stop() logger.info("calibrate_new:行进时 误差小于 10 直接停车") ret, box = filter.get(label) while not ret: ret, box = filter.get(label) error = (box[0][2] + box[0][0] - 320) / 2 + offset logger.info(f"calibrate_new:停车后的误差是{error}") if abs(error) > 8: logger.info(f"calibrate_new:停车后的误差大于 8 使用 distance 校准") error = error * 3 # logger.error(f"error * 3 {error}") if error > 0: by_cmd.send_distance_x(-10, int(error)) else: by_cmd.send_distance_x(10, int(-error)) break # 弃用 distance 校准 def explore_calibrate_new(label, offset, run_direc ,run_speed = 3.5): # run_direc == 1 向前 stop_error = 0 if run_direc == 1: by_cmd.send_speed_x(run_speed) else: by_cmd.send_speed_x(-run_speed) if label == tlabel.TPLATFORM: stop_error = 8 else: stop_error = 15 while True: ret, box = filter.get(label) while not ret: if not global_skip_queue.empty(): _ = global_skip_queue.get() logger.error("跳过 explore_calibrate_new") return False ret, box = filter.get(label) error = (box[0][2] + box[0][0] - 320) / 2 + offset if ret: # 校准速度越大 停车的条件越宽泛 20 15 if abs(error) <= stop_error: car_stop() logger.info("explore_calibrate_new:行进时 误差小于 15 直接停车") ret, box = filter.get(label) while not ret: ret, box = filter.get(label) error = (box[0][2] + box[0][0] - 320) / 2 + offset logger.info(f"停车后像素误差:{error}") # logger.info(f"explore_calibrate_new:停车后的误差大于 8 使用 distance 校准") # if abs(error) < 8: # error = error * 3 # else: # error = error * 2 # if error > 0: # by_cmd.send_distance_x(-10, int(error)) # else: # by_cmd.send_distance_x(10, int(-error)) break return True def hanoi_calibrate(target_label, error_label, offset, run_direc ,run_speed = 3.5): stop_error = 0 error_record = CountRecord(3) if run_direc == 1: by_cmd.send_speed_x(run_speed) else: by_cmd.send_speed_x(-run_speed) if target_label == tlabel.TPLATFORM: stop_error = 8 else: stop_error = 15 while True: ret1, ret2, box = filter.get_two(target_label, error_label) while not ret1: if not global_skip_queue.empty(): _ = global_skip_queue.get() logger.error("跳过 hanoi_calibrate") return False if ret2: if error_record(ret2): return False ret1, ret2, box = filter.get_two(target_label, error_label) error = (box[0][2] + box[0][0] - 320) / 2 + offset if ret1: # 校准速度越大 停车的条件越宽泛 20 15 if abs(error) <= stop_error: car_stop() logger.info("explore_calibrate_new:行进时 误差小于 15 直接停车") ret, box = filter.get(target_label) while not ret: ret, box = filter.get(target_label) error = (box[0][2] + box[0][0] - 320) / 2 + offset logger.info(f"停车后像素误差:{error}") break return True # 任务类 class task: def __init__(self, name, task_template, find_counts=10, enable=True): self.enable = enable self.task_t = task_template() self.counts = 0 self.find_counts = int(find_counts) self.name = name def init(self): if hasattr(self.task_t, 'init') and callable(getattr(self.task_t, 'init', None)): self.task_t.init() else: logger.warning("[Task ]# 该任务没有 init 方法") def find(self): if hasattr(self.task_t, 'find') and callable(getattr(self.task_t, 'find', None)): return self.task_t.find() else: logger.warning("[Task ]# 该任务没有 find 方法") def exec(self): # 根据标志位确定是否执行该任务 if self.enable is True: if hasattr(self.task_t, 'exec') and callable(getattr(self.task_t, 'exec', None)): logger.info(f"[Task ]# Executing task") self.task_t.exec() else: logger.warning("[Task ]# 该任务没有 exec 方法") else: logger.warning(f"[Task ]# Skip task") if hasattr(self.task_t, 'nexec') and callable(getattr(self.task_t, 'nexec', None)): self.task_t.nexec() else: logger.warning("[Task ]# 该任务没有 nexec 方法") def after(self): if hasattr(self.task_t, 'after') and callable(getattr(self.task_t, 'after', None)): logger.info(f"[Task ]# {self.name} 正在执行 after") self.task_t.after() logger.debug(f"[Task ]# Task completed") else: logger.warning("[Task ]# 该任务没有 after 方法") # 任务队列状态类 class task_queuem_status(Enum): IDEL = 0 SEARCHING = 1 EXECUTING = 2 # 任务队列类 非 EXECUTEING 时均执行 huigui,注意互斥操作 class task_queuem(task): # task_now = task(None, False) def __init__(self, queue): super(task_queuem, self) self.queue = queue self.status = task_queuem_status.IDEL self.busy = True logger.info(f"[TaskM]# Task num {self.queue.qsize()}") # exec 线程 self.exec_thread = None def exec(self, skip_queue): # 如果空闲状态则将下一个队列任务取出 if self.status is task_queuem_status.IDEL: if self.queue.qsize() == 0: self.busy = False logger.info(f"[TaskM]# Task queue empty, exit") return False self.task_now = self.queue.get() # 如果当前任务没有使能,则直接转入执行状态,由任务执行函数打印未执行信息 if self.task_now.enable is True: self.status = task_queuem_status.SEARCHING # 如果使能该任务则执行该任务的初始化动作 self.task_now.init() else: self.status = task_queuem_status.EXECUTING logger.info(f"[TaskM]# ---------------------->>>>") # 阻塞搜索任务标志位 elif self.status is task_queuem_status.SEARCHING: logger.info(f"[{self.task_now.name}]# Start searching task target") while True: if not skip_queue.empty(): _ = skip_queue.get() logger.error(f"{self.task_now.name} 任务在 find 中已经被手动跳过") self.status = task_queuem_status.IDEL # 空动作不需要阻塞巡线,直接置位 self.task_now.after() # 执行任务后处理 self.queue.task_done() # 弹出已执行的任务 return True ret = self.task_now.find() self.task_now.counts += ret if self.task_now.counts >= self.task_now.find_counts: self.status = task_queuem_status.EXECUTING break # 执行任务函数 elif self.status is task_queuem_status.EXECUTING: if self.task_now.enable is True: logger.info(f"[TaskM]# Start execute task function") # self.exec_thread = threading.Thread(target=self.task_now.exec) # # 启动线程 # self.exec_thread.start() # while True: # if not self.exec_thread.is_alive(): # break # else: # if not skip_queue.empty(): # car_stop() # thread_id = self.exec_thread.ident # res = ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, ctypes.py_object(SystemExit)) # _ = skip_queue.get() # logger.error(f"{self.task_now.name} 任务在 exec 中已经被手动跳过") # self.status = task_queuem_status.IDEL # 空动作不需要阻塞巡线,直接置位 # self.task_now.after() # 执行任务后处理 # self.queue.task_done() # 弹出已执行的任务 # return True self.task_now.exec() # 执行当前任务函数 self.status = task_queuem_status.IDEL # 执行完成后为退出巡线阻塞 self.task_now.after() # 执行任务后处理 self.queue.task_done() # 弹出已执行的任务 logger.info(f"[TaskM]# <<<<----------------------") else: logger.info(f"[TaskM]# Start execute task function (nexec)") self.status = task_queuem_status.IDEL # 空动作不需要阻塞巡线,直接置位 self.task_now.exec() # 执行当前任务函数 self.task_now.after() # 执行任务后处理 self.queue.task_done() # 弹出已执行的任务 logger.info(f"[TaskM]# <<<<----------------------") return True # 人员施救 class get_block1(): def init(self): var.task_speed = 15 act.cmd.camera(0) act.cmd.z2(20, 60, 0) filter.switch_camera(1) # if cfg['get_block']['first_block'] == "blue": # self.target_label = tlabel.BBLOCK # self.another_label = tlabel.RBLOCK # else: # self.target_label = tlabel.RBLOCK # self.another_label = tlabel.BBLOCK # cfg['get_block']['first_block'] = "red" self.target_label = [tlabel.RBLOCK, tlabel.BBLOCK] self.target_counts = [0, 0] def find(self): # 目标检测红/蓝方块 # ret = filter.find(self.target_label) # if ret > 0: # return True # return False ret = filter.find_mult(self.target_label) self.target_counts[0] += ret[0] self.target_counts[1] += ret[1] if any(ret): return True return False def exec(self): car_stop() if self.target_counts[0] > self.target_counts[1]: var.first_block = tlabel.RBLOCK var.second_block = tlabel.BBLOCK else: var.first_block = tlabel.BBLOCK var.second_block = tlabel.RBLOCK calibrate_new(var.first_block, offset = 16, run = True, run_speed = 5) logger.info("抓取块") by_cmd.send_position_axis_z(30, 60) time.sleep(1) by_cmd.send_angle_claw(63) by_cmd.send_position_axis_x(1, 20) time.sleep(1) by_cmd.send_angle_claw(25) time.sleep(0.5) by_cmd.send_position_axis_z(30, 90) time.sleep(0.5) by_cmd.send_angle_claw_arm(175) time.sleep(0.1) by_cmd.send_position_axis_x(1, 100) time.sleep(1) by_cmd.send_position_axis_z(30, 70) time.sleep(0.5) by_cmd.send_angle_claw(63) time.sleep(0.5) by_cmd.send_position_axis_z(30, 130) time.sleep(1) by_cmd.send_position_axis_x(1, 140) by_cmd.send_angle_claw_arm(225) time.sleep(0.5) # by_cmd.send_angle_storage(55) # time.sleep(1) by_cmd.send_position_axis_z(30, 60) pass def nexec(self): # TODO 完成不执行任务的空动作 pass def after(self): var.pid_turning.set(cfg["get_block"]["pid_kp"], cfg["get_block"]["pid_ki"], cfg["get_block"]["pid_kd"]) pass class get_block2(): def init(self): while (by_cmd.send_angle_camera(0) == -1): by_cmd.send_angle_camera(0) filter.switch_camera(1) # if cfg['get_block']['first_block'] == "red": # self.target_label = tlabel.BBLOCK # self.another_label = tlabel.RBLOCK # else: # self.target_label = tlabel.RBLOCK # self.another_label = tlabel.BBLOCK def find(self): # 目标检测红/蓝方块 ret = filter.find(var.second_block) if ret > 0: return True return False def exec(self): car_stop() calibrate_new(var.second_block, offset = 16, run = True, run_speed = 5) logger.info("抓取块") time.sleep(0.5) by_cmd.send_angle_claw_arm(225) by_cmd.send_angle_claw(63) time.sleep(0.1) by_cmd.send_position_axis_x(1, 20) time.sleep(1) by_cmd.send_angle_claw(25) time.sleep(0.5) by_cmd.send_position_axis_x(1, 100) # by_cmd.send_distance_x(15, 100) time.sleep(2) pass def nexec(self): # TODO 完成不执行任务的空动作 pass def after(self): var.task_speed = 0 var.pid_turning.set(cfg["get_block"]["pid_kp"], cfg["get_block"]["pid_ki"], cfg["get_block"]["pid_kd"]) # 任务检查间隔 time.sleep(7) # 紧急转移 class put_block(): def init(self): var.task_speed = 0 while (by_cmd.send_angle_camera(0) == -1): by_cmd.send_angle_camera(0) logger.info("紧急转移初始化") filter.switch_camera(1) def find(self): # 目标检测医院 ret, box = filter.get(tlabel.HOSPITAL) if ret > 0: width = box[0][2] - box[0][0] if width > 130: return True return False def exec(self): logger.info("找到医院") car_stop() calibrate_new(tlabel.HOSPITAL, offset = 7, run = False) by_cmd.send_distance_x(10, 100) by_cmd.send_position_axis_z(30, 0) time.sleep(0.5) by_cmd.send_position_axis_x(1, 20) time.sleep(1) by_cmd.send_angle_claw(63) time.sleep(1) # 放置第二個塊 by_cmd.send_angle_storage(20) by_cmd.send_position_axis_x(1, 110) by_cmd.send_position_axis_z(30, 120) time.sleep(1.5) by_cmd.send_angle_claw_arm(180) by_cmd.send_angle_claw(85) # by_cmd.send_angle_storage(0) time.sleep(1) by_cmd.send_position_axis_z(30,70) time.sleep(1) by_cmd.send_angle_claw(25) by_cmd.send_distance_x(-10, 110) time.sleep(1) by_cmd.send_position_axis_z(30, 110) time.sleep(1) by_cmd.send_angle_claw_arm(225) time.sleep(1) by_cmd.send_position_axis_z(30, 0) time.sleep(0.5) by_cmd.send_position_axis_x(1, 20) time.sleep(1.5) by_cmd.send_angle_claw(45) time.sleep(1) pass def nexec(self): pass def after(self): var.pid_turning.set(cfg["put_block"]["pid_kp"], cfg["put_block"]["pid_ki"], cfg["put_block"]["pid_kd"]) # 下一动作预备位置 while by_cmd.send_position_axis_z(30, 130) == -1: pass time.sleep(1) while by_cmd.send_position_axis_x(1, 0) == -1: pass while by_cmd.send_angle_claw_arm(45) == -1: pass # 任务检查间隔 # time.sleep(2) # 整装上阵 class get_bball(): def init(self): while (by_cmd.send_angle_camera(90) == -1): by_cmd.send_angle_camera(90) # by_cmd.send_position_axis_z(30, 135) # time.sleep(0.5) # by_cmd.send_position_axis_x(1, 0) # time.sleep(2) # by_cmd.send_angle_claw_arm(45) while (by_cmd.send_angle_storage(0) == -1): by_cmd.send_angle_storage(0) # 调试 临时换源 filter.switch_camera(1) logger.info("整装上阵初始化") # time.sleep(0.5) self.record = CountRecord(5) def find(self): # 目标检测蓝球 # ret = filter.find(tlabel.BBALL) ret = filter.find_mult([tlabel.BBALL, tlabel.YBALL]) # ret = filter.find(tlabel.BBALL) or filter.find(tlabel.YBALL) # TODO 此处使用连续检出判断感觉不是很好,黄球放置远时停车较晚,可能跟请求速度相关 # if ret: # if self.record(tlabel.BBALL): # return True if ret[0] or ret[1]: return True return False def exec(self): logger.info("找到蓝色球") car_stop() time.sleep(0.5) for _ in range(3): calibrate_right_new(tlabel.BBALL, offset = 18, run = True, run_speed = 5) logger.info("抓蓝色球") time.sleep(0.5) by_cmd.send_angle_claw_arm(45) by_cmd.send_angle_claw(54) by_cmd.send_position_axis_x(1, 160) time.sleep(1.2) by_cmd.send_angle_claw(25) time.sleep(1) by_cmd.send_distance_axis_z(30, 20) time.sleep(1) by_cmd.send_position_axis_x(1, 60) time.sleep(0.5) by_cmd.send_distance_axis_z(30, -40) time.sleep(0.5) by_cmd.send_angle_claw_arm(80) time.sleep(0.5) by_cmd.send_angle_claw(54) time.sleep(0.5) by_cmd.send_angle_claw_arm(45) time.sleep(1) by_cmd.send_position_axis_z(30, 135) # 继续向前走 # by_cmd.send_speed_x(4) pass def nexec(self): pass def after(self): var.pid_turning.set(cfg["get_bball"]["pid_kp"], cfg["get_bball"]["pid_ki"], cfg["get_bball"]["pid_kd"]) # 下一动作预备位置 by_cmd.send_angle_claw(30) time.sleep(0.5) while by_cmd.send_position_axis_z(30, 0) == -1: pass time.sleep(1) # # 任务检查间隔 # time.sleep(1) # 通信抢修 class up_tower(): def init(self): logger.info("通信抢修初始化") while (by_cmd.send_angle_camera(90) == -1): by_cmd.send_angle_camera(90) filter.switch_camera(1) def find(self): # 目标检测通信塔 ret = filter.find(tlabel.TOWER) if ret: return True return False def exec(self): logger.info("找到塔") car_stop() time.sleep(1) calibrate_new(tlabel.TOWER, offset = 20, run = True) time.sleep(1) # calibrate(tlabel.TOWER, 27, False, 6) by_cmd.send_distance_x(-10, 120) time.sleep(1) # 上古參數 # by_cmd.send_distance_y(-10, 50) # 6_9 模型參數 by_cmd.send_distance_y(-10, 40) # 7_12_3 模型參數 # by_cmd.send_distance_y(-10, 50) # time.sleep(1) car_stop() # FIXME 如果下發 distance 後直接 car_stop,則 distance 執行時間僅由指令間處理延時決定 # time.sleep(3) # by_cmd.send_speed_y(-10) # time.sleep(0.15) by_cmd.send_angle_zhuan(10) time.sleep(12) by_cmd.send_speed_y(10) time.sleep(0.3) car_stop() by_cmd.send_angle_zhuan(0) # while True: # pass def nexec(self): pass def after(self): var.pid_turning.set(cfg["up_tower"]["pid_kp"], cfg["up_tower"]["pid_ki"], cfg["up_tower"]["pid_kd"]) # 下一动作预备位置 by_cmd.send_position_axis_z(30, 0) # 任务检查间隔 time.sleep(4) # 高空排险 class get_rball(): def init(self): filter.switch_camera(1) logger.info("高空排险初始化") while (by_cmd.send_angle_camera(180) == -1): by_cmd.send_angle_camera(180) self.record = CountRecord(3) def find(self): # 目标检测红球 ret = filter.find(tlabel.RBALL) if ret > 0: # TODO 连续两帧才开始减速 if self.record(tlabel.RBALL): var.task_speed = 5 return True else: return False def exec(self): logger.info("找到红球") var.task_speed = 0 car_stop() time.sleep(0.5) # 靠近塔 by_cmd.send_angle_scoop(20) # 上古參數 # by_cmd.send_distance_y(-15, 45) # 50 # 6_9 參數 by_cmd.send_distance_y(-15, 35) time.sleep(2) # 7_12_3 參數 # by_cmd.send_distance_y(-15, 45) # time.sleep(2) car_stop() calibrate_new(tlabel.RBALL,offset = 44, run = True) time.sleep(1) logger.info("抓红球") # by_cmd.send_angle_scoop(15) time.sleep(0.5) by_cmd.send_position_axis_z(30, 170) time.sleep(2.5) by_cmd.send_angle_scoop(7) time.sleep(0.5) by_cmd.send_speed_y(15) time.sleep(0.2) car_stop() # by_cmd.send_angle_omega(-55,30) # while True: # pass pass def nexec(self): pass def after(self): var.pid_turning.set(cfg["get_rball"]["pid_kp"], cfg["get_rball"]["pid_ki"], cfg["get_rball"]["pid_kd"]) # 任务检查间隔 time.sleep(2) # 派发物资 class put_bball(): def init(self): logger.info("派发物资初始化") filter.switch_camera(1) while by_cmd.send_position_axis_z(30, 0) == -1: pass while by_cmd.send_angle_camera(90) == -1: pass def find(self): ret = filter.find(tlabel.BASKET) if ret > 0: return True else: return False def exec(self): logger.info("找到篮筐") car_stop() calibrate_new(tlabel.BASKET,offset = -40, run = True, run_speed = 6) # by_cmd.send_distance_x(10, 10) # 向左运动 # by_cmd.send_distance_y(-10, 35) # by_cmd.send_angle_storage(10) # time.sleep(1) by_cmd.send_angle_storage(50) logger.info("把球放篮筐里") time.sleep(1) # by_cmd.send_distance_y(10, 55) by_cmd.send_angle_storage(20) # time.sleep(1) # car_stop() pass def nexec(self): pass def after(self): var.pid_turning.set(cfg["put_bball"]["pid_kp"], cfg["put_bball"]["pid_ki"], cfg["put_bball"]["pid_kd"]) # 任务检查间隔 time.sleep(2) # 物资盘点 class put_hanoi1(): def init(self): logger.info("物资盘点 1 初始化") filter.switch_camera(2) def find(self): ret, label, error = filter.get_mult_box([tlabel.LMARK, tlabel.RMARK]) if label == tlabel.RMARK: if abs(error) < 55: logger.info("向右拐") utils.direction_right += 1 return True return False elif label == tlabel.LMARK: if abs(error) < 50: logger.info("向左拐") utils.direction_left += 1 return True return False else: return False def exec(self): global direction for _ in range(3): by_cmd.send_speed_x(0) time.sleep(0.2) by_cmd.send_speed_omega(0) time.sleep(0.2) by_cmd.send_position_axis_z(30, 130) # 校准牌子 if utils.direction_right > utils.direction_left: ret, error = filter.aim_near(tlabel.RMARK) while not ret: ret, error = filter.aim_near(tlabel.RMARK) utils.direction = tlabel.RMARK logger.info("应该向右转") else: ret, error = filter.aim_near(tlabel.LMARK) while not ret: ret, error = filter.aim_near(tlabel.LMARK) utils.direction = tlabel.LMARK logger.info("应该向左转") # 校准 omega for _ in range(10): ret, box = filter.get(utils.direction) if ret: error = (box[0][2] + box[0][0] - 320) / 2 by_cmd.send_speed_omega(-error * 0.8) time.sleep(0.2) car_stop() # 前进 # by_cmd.send_distance_x(10, 200) # by_cmd.send_distance_x(10, 180) # by_cmd.send_distance_x(10, 180) # time.sleep(1.5) # car_stop() while True: by_cmd.send_speed_x(8.5) ret, box = filter.get(utils.direction) if ret: if abs(box[0][2] - box[0][0]) > 41: car_stop() break # 根据方向初始化执行器位置 if utils.direction is tlabel.RMARK: # FIXME 右侧的爪子会被 storage 挡住 by_cmd.send_position_axis_x(1, 0) by_cmd.send_angle_claw_arm(45) by_cmd.send_angle_storage(0) else: by_cmd.send_position_axis_x(1, 150) by_cmd.send_angle_claw_arm(225) by_cmd.send_angle_storage(55) time.sleep(1) by_cmd.send_position_axis_z(30, 10) if utils.direction_right > utils.direction_left: utils.direction = tlabel.RMARK # by_cmd.send_angle_omega(-25,430) # by_cmd.send_angle_omega(-55,194) # by_cmd.send_angle_omega(-45,238) by_cmd.send_angle_omega(-45,252) time.sleep(2) while (by_cmd.send_angle_camera(90) == -1): by_cmd.send_angle_camera(90) else: utils.direction = tlabel.LMARK # by_cmd.send_angle_omega(25,430) # by_cmd.send_angle_omega(55,194) # by_cmd.send_angle_omega(45,238) by_cmd.send_angle_omega(45,252) time.sleep(2) while (by_cmd.send_angle_camera(0) == -1): by_cmd.send_angle_camera(0) time.sleep(0.5) filter.switch_camera(1) def nexec(self): pass def after(self): var.switch_lane_model = True if utils.direction == tlabel.RMARK: var.pid_turning.set(cfg["put_hanoi1"]["pid_kp"] - 0.2, cfg["put_hanoi1"]["pid_ki"], cfg["put_hanoi1"]["pid_kd"]) else: var.pid_turning.set(cfg["put_hanoi1"]["pid_kp"], cfg["put_hanoi1"]["pid_ki"], cfg["put_hanoi1"]["pid_kd"]) pass time.sleep(1.5) class put_hanoi2(): def __init__(self): if cfg['put_hanoi2']['first_target'] == "lp": self.target_label = tlabel.LPILLER elif cfg['put_hanoi2']['first_target'] == "mp": self.target_label = tlabel.MPILLER elif cfg['put_hanoi2']['first_target'] == "sp": self.target_label = tlabel.SPILLER def init(self): logger.info("物资盘点 2 初始化") var.task_speed = 8.5 if utils.direction == tlabel.RMARK: # 15 self.offset = 14 # self.platform_offset = -25 self.platform_offset = -19 else: self.offset = 14 #self.platform_offset = -30 self.platform_offset = -19 # 延时,防止过早看到 tplatform(虽然此现象相当少见且诡异) time.sleep(0.5) def find(self): # ret, box = filter.get(self.target_label) ret, box = filter.get(tlabel.TPLATFORM) if ret: error = (box[0][2] + box[0][0] - 320) / 2 + self.platform_offset if error > 0: return True return False def exec(self): logger.info(f"direction:{utils.direction.name}") var.task_speed = 0 car_stop() # if utils.direction is tlabel.RMARK: # by_cmd.send_distance_y(10, 50) # time.sleep(1) # time.sleep(0.5) # by_cmd.send_distance_x(-8, cfg['put_hanoi2']['pos_gap'] - 30) # time.sleep(1) ret = explore_calibrate_new(tlabel.LPILLER, offset = self.offset, run_direc = 1, run_speed = 5) if not ret: logger.error("跳过物资盘点 2 exec") return time.sleep(0.5) logger.info("抓大平台") if utils.direction is tlabel.RMARK: by_cmd.send_position_axis_z(30, 10) by_cmd.send_position_axis_x(1, 150) by_cmd.send_angle_claw(63) time.sleep(2) by_cmd.send_angle_claw(40) time.sleep(0.5) by_cmd.send_distance_axis_z(30, 20) time.sleep(0.5) by_cmd.send_position_axis_x(1, 10) time.sleep(1) pass else: by_cmd.send_position_axis_z(30, 10) by_cmd.send_position_axis_x(1, 40) by_cmd.send_angle_claw(63) time.sleep(2) by_cmd.send_angle_claw(40) time.sleep(0.5) by_cmd.send_distance_axis_z(30, 20) time.sleep(0.5) by_cmd.send_position_axis_x(1, 160) time.sleep(1) pass ret = explore_calibrate_new(tlabel.TPLATFORM, offset = self.offset, run_direc = -1, run_speed = 5) if not ret: logger.error("跳过物资盘点 2 exec") return time.sleep(0.5) logger.info("放大平台") if utils.direction is tlabel.RMARK: by_cmd.send_position_axis_x(1, 150) time.sleep(1.5) by_cmd.send_distance_axis_z(30, -20) time.sleep(1) by_cmd.send_angle_claw(81) time.sleep(0.5) by_cmd.send_angle_claw(63) time.sleep(0.5) by_cmd.send_position_axis_x(1, 10) time.sleep(1) pass else: by_cmd.send_position_axis_x(1, 40) time.sleep(1.5) by_cmd.send_distance_axis_z(30, -20) time.sleep(1) by_cmd.send_angle_claw(81) time.sleep(0.5) by_cmd.send_angle_claw(63) time.sleep(0.5) by_cmd.send_position_axis_x(1, 160) time.sleep(1) ret = explore_calibrate_new(tlabel.MPILLER, offset = self.offset, run_direc = 1, run_speed = 5) if not ret: logger.error("跳过物资盘点 2 exec") return time.sleep(0.5) logger.info("抓中平台") if utils.direction is tlabel.RMARK: by_cmd.send_position_axis_z(30, 10) by_cmd.send_position_axis_x(1, 150) by_cmd.send_angle_claw(63) time.sleep(2) by_cmd.send_angle_claw(35) time.sleep(0.5) by_cmd.send_distance_axis_z(30, 20) time.sleep(0.5) by_cmd.send_position_axis_x(1, 10) time.sleep(1) pass else: by_cmd.send_position_axis_z(30, 10) by_cmd.send_position_axis_x(1, 40) by_cmd.send_angle_claw(63) time.sleep(2) by_cmd.send_angle_claw(35) time.sleep(0.5) by_cmd.send_distance_axis_z(30, 20) time.sleep(0.5) by_cmd.send_position_axis_x(1, 160) time.sleep(1) pass # ret = explore_calibrate_new(tlabel.LPILLER, offset = self.offset, run_direc = -1, run_speed = 5) ret = hanoi_calibrate(tlabel.LPILLER, tlabel.TPLATFORM, offset = self.offset, run_direc = -1, run_speed = 5) if not ret: logger.error("跳过物资盘点 2 exec") return time.sleep(0.5) logger.info("放中平台") if utils.direction is tlabel.RMARK: by_cmd.send_position_axis_z(30, 120) time.sleep(2) by_cmd.send_position_axis_x(1, 150) time.sleep(2) by_cmd.send_distance_axis_z(30, -20) time.sleep(0.5) by_cmd.send_angle_claw(65) time.sleep(0.5) by_cmd.send_position_axis_x(1, 10) time.sleep(1) pass else: by_cmd.send_position_axis_z(30, 120) time.sleep(2) by_cmd.send_position_axis_x(1, 40) time.sleep(2) by_cmd.send_distance_axis_z(30, -20) time.sleep(0.5) by_cmd.send_angle_claw(65) time.sleep(0.5) by_cmd.send_position_axis_x(1, 160) time.sleep(1) ret = explore_calibrate_new(tlabel.SPILLER, offset = self.offset, run_direc = 1, run_speed = 5) if not ret: logger.error("跳过物资盘点 2 exec") return time.sleep(0.5) logger.info("抓小平台") if utils.direction is tlabel.RMARK: by_cmd.send_position_axis_z(30, 10) by_cmd.send_position_axis_x(1, 150) by_cmd.send_angle_claw(50) time.sleep(2) by_cmd.send_angle_claw(27) time.sleep(1) by_cmd.send_distance_axis_z(30, 10) time.sleep(0.5) by_cmd.send_position_axis_x(1, 10) time.sleep(1) pass else: by_cmd.send_position_axis_z(30, 10) by_cmd.send_position_axis_x(1, 40) by_cmd.send_angle_claw(50) time.sleep(2) by_cmd.send_angle_claw(27) time.sleep(1) by_cmd.send_distance_axis_z(30, 10) time.sleep(0.5) by_cmd.send_position_axis_x(1, 160) time.sleep(1) pass # ret = explore_calibrate_new(tlabel.MPILLER, offset = self.offset, run_direc = -1, run_speed = 5) ret = hanoi_calibrate(tlabel.MPILLER, tlabel.LPILLER, offset = self.offset, run_direc = -1, run_speed = 5) if not ret: logger.error("跳过物资盘点 2 exec") return time.sleep(0.5) logger.info("放小平台") if utils.direction is tlabel.RMARK: by_cmd.send_position_axis_z(30, 190) # 170 time.sleep(1.5) by_cmd.send_position_axis_x(1, 150) time.sleep(2) by_cmd.send_distance_axis_z(30, -20) time.sleep(0.5) by_cmd.send_angle_claw(80) time.sleep(0.5) by_cmd.send_position_axis_x(1, 10) time.sleep(1) # by_cmd.send_speed_y(10) # time.sleep(0.12) # car_stop() pass else: by_cmd.send_position_axis_z(30, 190) time.sleep(1.5) by_cmd.send_position_axis_x(1, 40) time.sleep(2) by_cmd.send_distance_axis_z(30, -20) time.sleep(0.5) by_cmd.send_angle_claw(80) time.sleep(0.5) by_cmd.send_position_axis_x(1, 160) time.sleep(1.5) # while True: # pass def nexec(self): pass def after(self): var.switch_lane_model = False if utils.direction is tlabel.RMARK: var.pid_turning.set(cfg["put_hanoi2"]["pid_kp"] - 0.2, cfg["put_hanoi2"]["pid_ki"], cfg["put_hanoi2"]["pid_kd"]) else: var.pid_turning.set(cfg["put_hanoi2"]["pid_kp"], cfg["put_hanoi2"]["pid_ki"], cfg["put_hanoi2"]["pid_kd"]) pass class put_hanoi3(): def init(self): while by_cmd.send_position_axis_z(30, 130) == -1: pass time.sleep(3) logger.info("完成任务,爪回左侧") while by_cmd.send_angle_claw_arm(128) == -1: pass time.sleep(0.5) while by_cmd.send_position_axis_x(1, 0) == -1: pass time.sleep(1) # while by_cmd.send_angle_claw_arm(225) == -1: # pass while by_cmd.send_angle_claw(85) == -1: pass def find(self): time.sleep(1) return True def exec(self): while by_cmd.send_position_axis_z(30, 100) == -1: pass pass def nexec(self): pass def after(self): by_cmd.send_position_axis_x(1, 150) var.pid_turning.set(cfg["put_hanoi3"]["pid_kp"], cfg["put_hanoi3"]["pid_ki"], cfg["put_hanoi3"]["pid_kd"]) # 应急避险 第一阶段 找目标牌 class move_area1(): def init(self): logger.info("应急避险第一阶段初始化") while (by_cmd.send_angle_camera(0) == -1): by_cmd.send_angle_camera(0) def find(self): ret = filter.find(tlabel.SIGN) if ret: return True return False def exec(self): logger.info("找到标示牌") # 停车 ocr 识别文字 调用大模型 car_stop() time.sleep(1) # calibrate_new(tlabel.SIGN, offset = 8, run = True) calibrate_new(tlabel.SIGN, offset = -30, run = True, run_speed = 5) time.sleep(1) by_cmd.send_position_axis_x(1, 50) time.sleep(1) # filter_w = (148, 560) # filter_h = (165, 390) counts = 0 while True: ocr_socket.send_string("") resp = ocr_socket.recv_pyobj() var.llm_text = '' counts += 1 if resp.get('code') == 0: for item in resp.get('content'): if item['probability']['average'] < 0.80: continue # box = item['location'] # center_x = box['left'] + box['width'] / 2 # center_y = box['top'] + box['height'] / 2 # if center_x < filter_w[0] or center_x > filter_w[1] \ # or center_y < filter_h[0] or center_y > filter_h[1]: # continue var.llm_text += item['words'] break if counts >= 2: var.skip_llm_task_flag = True return logger.error(var.llm_text) if len(var.llm_text) < 3: var.skip_llm_task_flag = True return var.task_speed = 9 # 12 pass def nexec(self): pass def after(self): # 任务检查间隔 by_cmd.send_position_axis_x(1, 150) # time.sleep(1) by_cmd.send_angle_claw_arm(225) pass # 应急避险 第二阶段 找停车区域 class move_area2(): def init(self): logger.info("应急避险第二阶段初始化") self.offset = 15 self.delta_x = 0 self.delta_y = 0 self.delta_omage = 0 def find(self): # time.sleep(0.001) if var.skip_llm_task_flag: return 5000 ret, box = filter.get(tlabel.SHELTER) if ret: error = (box[0][2] + box[0][0] - 320) / 2 + self.offset if abs(error) < 20: return 5000 return False def sub_light(self, delay_time): by_cmd.send_light(1) time.sleep(delay_time) by_cmd.send_light(0) def sub_beep(self,delay_time): by_cmd.send_beep(1) time.sleep(delay_time) by_cmd.send_beep(0) def sub_move(self, x, y): # FIXME 如果同時有 xy,是否會造成 delay 不足 self.delta_x += x self.delta_y += y if x != 0: delay_time = int(abs(x) * 500) if x > 0: by_cmd.send_distance_x(15, delay_time) else: by_cmd.send_distance_x(-15, delay_time) elif y != 0: delay_time = int(abs(y) * 500) if y > 0: # 向左 by_cmd.send_distance_y(-15, delay_time) else: by_cmd.send_distance_y(15, delay_time) time.sleep(delay_time / 500) car_stop() pass def sub_turn(self, angle): self.delta_omage += angle delay_time = int(abs(angle) * 400 / 90) if angle < 0: # 左转 by_cmd.send_angle_omega(+55, delay_time) else: # 右转 by_cmd.send_angle_omega(-55, delay_time) time.sleep(delay_time / 300 * 1.5) def exec(self): var.task_speed = 0 if var.skip_llm_task_flag: logger.error("ocr 识别出错 直接跳过改任务") return logger.info("开始寻找停车区域") car_stop() calibrate_new(tlabel.SHELTER, offset = 15, run = True) time.sleep(0.5) # 调用大模型 然后执行动作 try: resp = llm_bot.get_command_json(var.llm_text) logger.info(resp) except: logger.error("大模型超时,跳过任务") return try: resp_commands = eval(re.findall("```json(.*?)```", resp, re.S)[0]) if len(resp_commands) == 0: return # 进入停车区域 # by_cmd.send_speed_y(15) by_cmd.send_distance_y(25, 180) time.sleep(1) # time.sleep(1.25) car_stop() logger.info(resp_commands) for command in resp_commands: logger.info(command) if command['func'] == 'move': self.sub_move(float(command['x']), float(command['y'])) elif command['func'] == 'light': self.sub_light(int(command['time'])) elif command['func'] == 'beep': self.sub_beep(int(command['time'])) elif command['func'] == 'turn': self.sub_turn(int(command['angle'])) pass else: continue time.sleep(0.5) except: pass time.sleep(1) # 回到原位 delay_time = int(abs(self.delta_omage) * 400 / 90) if int(abs(self.delta_omage)) == 360: delay_time = 0 if self.delta_omage < 0: # 左转 by_cmd.send_angle_omega(-55, delay_time) else: # 右转 by_cmd.send_angle_omega(55, delay_time) time.sleep(delay_time / 300 * 1.5) if self.delta_y > 0: # 向左移动的距离就要比进入的时候少一些 因为 action 已经向左运动了 delay_time = 180 - (self.delta_y * 500) else: delay_time = 180 + (abs(self.delta_y) * 500) # 离开停车区域 by_cmd.send_distance_y(-25, delay_time) time.sleep(delay_time * 5e-3) car_stop() # FIXME 移动距离指令下发后未完成,再发送速度指令,将不会清除未完成的速度值 # by_cmd.send_distance_y(-15, 300) pass def nexec(self): logger.warning("正在跳過大模型任務") time.sleep(2) pass def after(self): var.pid_turning.set(cfg["move_area"]["pid_kp"], cfg["move_area"]["pid_ki"], cfg["move_area"]["pid_kd"]) by_cmd.send_position_axis_z(30, 0) while by_cmd.send_angle_claw(90) == -1: pass time.sleep(2) # 扫黑除暴 class kick_ass(): def init(self): while (by_cmd.send_angle_camera(0) == -1): by_cmd.send_angle_camera(0) logger.info("扫黑除暴初始化") self.pos_gap1 = cfg['kick_ass']['pos_gap1'] self.pos_gap2 = cfg['kick_ass']['pos_gap2'] self.target_person = cfg_args['kick_ass']['target_person'] # by_cmd.send_angle_claw(15) by_cmd.send_position_axis_x(1, 160) def find(self): ret = filter.find(tlabel.SIGN) if ret: return True return False def exec(self): logger.info("找到标示牌") # 停的晚无需校准 omage car_stop() time.sleep(1) calibrate_new(tlabel.SIGN, offset = 8, run = True) by_cmd.send_angle_claw(15) time.sleep(0.5) by_cmd.send_position_axis_z(30, 60) by_cmd.send_position_axis_x(1, 130) if self.target_person == 1: target_distance = self.pos_gap1 else: target_distance = self.pos_gap1 + (self.target_person - 1) * self.pos_gap2 + (self.target_person - 1) * 10 by_cmd.send_distance_x(10, target_distance) logger.info(f"target distance {target_distance}") time.sleep(1.5 + (self.target_person - 1) * 0.7 ) car_stop() # by_cmd.send_angle_claw_arm(225) # time.sleep(0.5) by_cmd.send_position_axis_x(1, 20) time.sleep(3) by_cmd.send_position_axis_x(1, 120) time.sleep(1) logger.debug("結束任務,前進四") # by_cmd.send_speed_x(25) # time.sleep(4) pass def nexec(self): pass def after(self): var.pid_turning.set(cfg["kick_ass"]["pid_kp"], cfg["kick_ass"]["pid_ki"], cfg["kick_ass"]["pid_kd"])