from simple_pid import PID # import queue class PidWrap: def __init__(self, kp, ki, kd, setpoint=0, output_limits=1): self.pid_t = PID(kp, ki, kd, setpoint, output_limits=(0-output_limits, output_limits)) def set_target(self, target): self.pid_t.setpoint = target def set(self, kp, ki, kd): self.pid_t.kp = kp self.pid_t.ki = ki self.pid_t.kd = kd def get(self, val_in): return self.pid_t(val_in) class main_task(): def __init__(self,socket): self.lane_socket = socket # 赛道回归相关 self.x = 0 self.y = 0 self.error_counts = 0 self.lane_error = 0 # 车控制对象初始化 # self.by_cmd = by_cmd_py() # 转向 pid self.pid1 = PidWrap(0.7, 0, 0,output_limits=40) self.pid1.set_target(0) def parse_data(self,data): if data.get('code') == 0: if data.get('type') == 'infer': self.x += data.get('data')[0][0] self.y += data.get('data')[0][1] self.error_counts += 1 else: pass def run(self): # TODO 请求和解析回归值待完成 try: # data = self.queue.get_nowait() # self.parse_data(data) pass except: pass # 运行巡线任务 self.lane_task() def lane_task(self): # TODO 巡航参数从配置文件中读取 if self.error_counts > 2: self.x = self.x / 3 self.y = self.y / 3 self.lane_error = self.x - 160 self.error_counts = 0 self.x = 0 self.y = 0 if self.lane_error > 30: self.pid1.set(0.7, 0, 0) else: self.pid1.set(0.5, 0, 0) # TODO 待引入控制接口 # self.by_cmd.send_speed_x(7) pid_out = self.pid1.get(self.lane_error) # self.by_cmd.send_speed_omega(pid_out) # self.lane_socket.send_string("infer")