import threading import time import zmq import queue import toml import numpy as np from loguru import logger from simple_pid import PID from utils import PidWrap bycmd = None move = None axis = None cmd = None cfg = toml.load('/home/evan/Workplace/project_main/cfg_action.toml') def import_obj(_bycmd): global bycmd global move global axis global cmd bycmd = _bycmd move = move_cls() axis = axis_cls() cmd = cmd_cls() class lane_cls: def __init__(self, _speed, _time, _pid_argv = {"kp" : 1.2, "ki" : 0, "kd" : 0}) -> None: self.context = zmq.Context() self.socket = self.context.socket(zmq.REQ) self.socket.connect("tcp://localhost:6666") self.speed = _speed self.time = _time # 每 20ms 控制一次,故次数乘以 50 self.count = self.time * 50 logger.info(f"count = {self.count}") self.busy = False self. pid = PidWrap(_pid_argv["kp"], _pid_argv["ki"], _pid_argv["kd"], output_limits=50) pass def loop(self): while self.count >= 0: self.count -= 1 self.socket.send_string("") resp = self.socket.recv_pyobj() # 检查该值是否有效 ck_val = resp.get('data') if isinstance(ck_val, np.ndarray): x = ck_val[0] y = ck_val[1] else: continue err = x - 160 pid_out = self.pid.get(err) bycmd.send_speed_omega(pid_out) # logger.debug(f"[任务中巡线模式]# error:{err} out:{pid_out}") time.sleep(0.012) # 时间补偿,保证控制周期 (虽然还是挺不准的) for _ in range(3): bycmd.send_speed_x(0) bycmd.send_speed_omega(0) self.busy = False logger.info("[任务中巡线模式] #结束") return def start(self): logger.info(f"[任务中巡线模式] #开启,速度:{self.speed} 时间:{self.time}") bycmd.send_speed_x(self.speed) self.busy = True thread = threading.Thread(target=self.loop) thread.start() pass def inquire(self): # logger.debug(f"[任务中巡线模式]# 查询 当前 busy 状态 {self.busy}") return self.busy class move_cls: # def x(self, _speed): # bycmd.send_speed_x(_speed) # pass # def y(self, _speed): # bycmd.send_speed_y(_speed) # pass # def z(self, _speed): # bycmd.send_speed_omega(_speed) # pass def lane(self, _speed, _time): lane_obj = lane_cls(_speed, _time) lane_obj.start() # 此处为阻塞实现,非阻塞调用可直接从 lane_cls 类构造对象,然后使用查询方法 # time.sleep(0.5) # 等待子线程启动 while lane_obj.inquire() is True: pass # TODO 新运动指令类,指令的发送和完成查询功能,发送时开启新线程 class cmd_cls(): # FIXME 延时计算时没有关联轴速度 def __init__(self) -> None: # 根据设备上电复位后实际位置设置 self.__axis_x_pos = 160 self.__axis_z_pos = 100 # self.__axis_claw_pos = 0 # self.__axis_claw_arm_pos = 0 # self.__axis_storage_pos = 0 pass def z(self, _speed, _dis, _time_via = -1): # 设置绝对位置 self.__axis_z_pos += _dis # 如果未指定延时时间,则按照相对位移计算延时时间 if _time_via == -1: time_via = abs(_dis) * cfg["axis"]["axis_z_time_via_alpha"] else: time_via = _time_via logger.info(f"z[{self.__axis_z_pos}] speed:{_speed:.2f}, dis:{_dis:.2f}, time_via:{time_via:.2f}") while bycmd.send_position_axis_z(_speed, self.__axis_z_pos) == -1: pass time.sleep(time_via) def z2(self, _speed, _pos, _time_via = -1): # 计算绝对位置 _dis = self.__axis_z_pos - _pos self.__axis_z_pos = _pos # 如果未指定延时时间,则按照相对位移计算延时时间 if _time_via == -1: time_via = abs(_dis) * cfg["axis"]["axis_z_time_via_alpha"] else: time_via = _time_via logger.info(f"z[{self.__axis_z_pos}] speed:{_speed:.2f}, pos:{_pos:.2f}, time_via:{time_via:.2f}") while bycmd.send_position_axis_z(_speed, self.__axis_z_pos) == -1: pass time.sleep(time_via) def x(self, _speed, _dis, _time_via = -1): # 设置绝对位置 self.__axis_x_pos += _dis # 如果未指定延时时间,则按照相对位移计算延时时间 if _time_via == -1: time_via = abs(_dis) * cfg["axis"]["axis_z_time_via_alpha"] else: time_via = _time_via logger.info(f"x[{self.__axis_x_pos}] speed:{_speed:.2f}, dis:{_dis:.2f}, time_via:{time_via:.2f}") while bycmd.send_position_axis_x(_speed, self.__axis_x_pos) == -1: pass time.sleep(time_via) def x2(self, _speed, _pos, _time_via = -1): # 计算绝对位置 _dis = self.__axis_x_pos - _pos self.__axis_x_pos = _pos # 如果未指定延时时间,则按照相对位移计算延时时间 if _time_via == -1: time_via = abs(_dis) * cfg["axis"]["axis_x_time_via_alpha"] else: time_via = _time_via logger.info(f"x[{self.__axis_x_pos}] speed:{_speed:.2f}, pos:{_pos:.2f}, time_via:{time_via:.2f}") while bycmd.send_position_axis_x(_speed, self.__axis_x_pos) == -1: pass time.sleep(time_via) def camera(self, angle): while bycmd.send_angle_camera(angle) == -1: pass def claw(self, angle): while bycmd.send_angle_claw(angle) == -1: pass time.sleep(0.5) def claw_arm(self, angle): while bycmd.send_angle_claw_arm(angle) == -1: pass time.sleep(1) def scoop(self, angle): while bycmd.send_angle_scoop(angle) == -1: pass time.sleep(0.5) def storage(self, angle): while bycmd.send_angle_storage(angle) == -1: pass time.sleep(0.5) def zhuan(self, angle): while bycmd.send_angle_zhuan(angle) == -1: pass time.sleep(0.5) def wait(self, _time): time.sleep(_time) # TODO 增加任务队列中非阻塞控制 class axis_cls(): def __init__(self) -> None: self.axis_queue = queue.Queue() self.busy = False pass def z(self, _distance, _time_via = -1): while self.busy is True: pass self.axis_queue.put(lambda: cmd.z(20, _distance, _time_via)) pass def z2(self, _position, _time_via = -1): while self.busy is True: pass self.axis_queue.put(lambda: cmd.z2(20, _position, _time_via)) pass def x(self, _distance, _time_via = -1): while self.busy is True: pass self.axis_queue.put(lambda: cmd.x(1, _distance, _time_via)) pass def x2(self, _position, _time_via = -1): while self.busy is True: pass self.axis_queue.put(lambda: cmd.x2(1, _position, _time_via)) pass def camera(self, _angle): while self.busy is True: pass self.axis_queue.put(lambda: cmd.camera(_angle)) pass def claw(self, _angle): while self.busy is True: pass self.axis_queue.put(lambda: cmd.claw(_angle)) pass def claw_arm(self, _angle): while self.busy is True: pass self.axis_queue.put(lambda: cmd.claw_arm(_angle)) pass def scoop(self, _angle): while self.busy is True: pass self.axis_queue.put(lambda: cmd.scoop(_angle)) pass def storage(self, _angle): while self.busy is True: pass self.axis_queue.put(lambda: cmd.storage(_angle)) pass def wait(self, _time): while self.busy is True: pass self.axis_queue.put(lambda: cmd.wait(_time)) def pop(self): self.busy = True while self.axis_queue.qsize() > 0: logger.info(f"axis cmd {self.axis_queue.qsize()}") self.axis_queue.get()() self.axis_queue.task_done() time.sleep(0.005) self.busy = False pass def exec(self, _block:bool = True): while self.busy is True: pass if _block is True: self.pop() else: thread = threading.Thread(target=self.pop) thread.start() pass