import zmq import time import numpy as np from loguru import logger import utils import variable as var class main_task(): def __init__(self,by_cmd): # lane infer server self.context = zmq.Context() self.socket = self.context.socket(zmq.REQ) self.socket.connect("tcp://localhost:6666") # lane infer server1 self.context1 = zmq.Context() self.socket1 = self.context.socket(zmq.REQ) self.socket1.connect("tcp://localhost:6669") # 赛道回归相关 self.x = 0 self.y = 0 self.error_counts = 0 self.lane_error = 0 # 车控制对象初始化 self.by_cmd = by_cmd # 转向 pid # var.pid_turning = PidWrap(0.5, 0, 0, output_limits=55) # 1.2 # pid 参数在 var.pid_turning.set_target(0) def parse_data(self,data): if data.get('code') == 0: ck_val = data.get('data') # logger.debug(ck_val) if isinstance(ck_val, np.ndarray): self.x += ck_val[0] self.y += ck_val[1] self.error_counts += 1 else: pass def run(self): # 运行巡线任务 self.lane_task() def lane_ntask(self): # time.sleep(0.002) # self.socket.send_string("") # resp = self.socket.recv_pyobj() # return resp.get('data')[0] - 160 pass def lane_task(self): # TODO 巡航参数从配置文件中读取 time.sleep(0.002) if self.error_counts > 2: self.x = self.x / 3 self.y = self.y / 3 self.lane_error = self.x - 160 var.lane_error = self.lane_error # 赋全局变量 self.error_counts = 0 self.x = 0 self.y = 0 error_abs = abs(self.lane_error) if error_abs > 50: speed = 13 elif error_abs > 45: speed = 15 elif error_abs > 35: speed = 17 elif error_abs > 25: speed = 18 elif error_abs > 15: speed = 19 else: speed = 21 # lane_model initial # if error_abs > 50: # speed = 10 # elif error_abs > 45: # speed = 11 # elif error_abs > 35: # speed = 13 # elif error_abs > 25: # speed = 15 # elif error_abs > 15: # speed = 15 # else: # speed = 18 if var.task_speed != 0: speed = var.task_speed self.by_cmd.send_speed_x(speed) # pid_out = var.pid_turning.get(self.lane_error*0.65) pid_out = var.pid_turning.get(self.lane_error) # pid_out = -pid_out # logger.debug(f"err={self.lane_error}, pwm out={pid_out}") self.by_cmd.send_speed_omega(pid_out) if var.switch_lane_model: self.socket1.send_string("") resp = self.socket1.recv_pyobj() else: self.socket.send_string("") resp = self.socket.recv_pyobj() self.parse_data(resp)