feat: 增加任务段pid参数设置

This commit is contained in:
bmy
2024-06-15 22:04:11 +08:00
parent db6ec06441
commit ac72d1e3e6
5 changed files with 132 additions and 27 deletions

View File

@@ -1,3 +1,9 @@
'''
待办事项:
- nexec 中添加延时,保证重试时不会立即跳入下个任务
- 确认 nexec 还是直接添加一个任务结束后执行的方法更好,或者两者都保留
- 医院第二个方块自由落体
'''
from enum import Enum
from loguru import logger
from utils import label_filter
@@ -8,6 +14,7 @@ import utils
import toml
import zmq
import time
import variable as var
context = zmq.Context()
socket = context.socket(zmq.REQ)
@@ -224,8 +231,8 @@ class task_queuem_status(Enum):
IDEL = 0
SEARCHING = 1
EXECUTING = 2
# 任务队列类 非 EXECUTEING 时均执行 huigui注意互斥操作
class task_queuem(task):
# task_now = task(None, False)
def __init__(self, queue):
@@ -275,7 +282,7 @@ class task_queuem(task):
# 人员施救
class get_block1():
def init(self):
utils.task_speed = 12
var.task_speed = 12
logger.info(f"人员施救 1 初始化,第一抓取块为 {cfg['get_block']['first_block']}")
while (by_cmd.send_angle_camera(0) == -1):
by_cmd.send_angle_camera(0)
@@ -370,16 +377,19 @@ class get_block2():
by_cmd.send_distance_x(15, 100)
time.sleep(2)
var.pid_turning.set(cfg["get_block"]["pid_kp"], cfg["get_block"]["pid_ki"], cfg["get_block"]["pid_kd"])
pass
def nexec(self):
# TODO 完成不执行任务的空动作
var.pid_turning.set(cfg["get_block"]["pid_kp"], cfg["get_block"]["pid_ki"], cfg["get_block"]["pid_kd"])
pass
# 紧急转移
class put_block():
def init(self):
utils.task_speed = 0
var.task_speed = 0
while (by_cmd.send_angle_camera(0) == -1):
by_cmd.send_angle_camera(0)
logger.info("紧急转移初始化")
@@ -434,6 +444,7 @@ class put_block():
by_cmd.send_position_axis_x(1, 0)
time.sleep(1)
by_cmd.send_angle_claw_arm(36)
var.pid_turning.set(cfg["put_block"]["pid_kp"], cfg["put_block"]["pid_ki"], cfg["put_block"]["pid_kd"])
pass
def nexec(self):
# 下一动作预备位置
@@ -442,6 +453,7 @@ class put_block():
by_cmd.send_position_axis_x(1, 0)
by_cmd.send_angle_claw_arm(36)
time.sleep(4)
var.pid_turning.set(cfg["put_block"]["pid_kp"], cfg["put_block"]["pid_ki"], cfg["put_block"]["pid_kd"])
pass
# 整装上阵
@@ -507,11 +519,13 @@ class get_bball():
by_cmd.send_angle_claw(30)
by_cmd.send_position_axis_z(20, 0)
time.sleep(2)
var.pid_turning.set(cfg["get_bball"]["pid_kp"], cfg["get_bball"]["pid_ki"], cfg["get_bball"]["pid_kd"])
pass
def nexec(self):
by_cmd.send_angle_claw(30)
by_cmd.send_position_axis_z(20, 0)
time.sleep(2)
var.pid_turning.set(cfg["get_bball"]["pid_kp"], cfg["get_bball"]["pid_ki"], cfg["get_bball"]["pid_kd"])
pass
# 通信抢修
@@ -543,15 +557,17 @@ class up_tower():
by_cmd.send_distance_y(10, 50)
time.sleep(1)
by_cmd.send_angle_zhuan(0)
time.sleep(1)
time.sleep(0.5)
# while True:
# pass
# 下一动作预备位置
by_cmd.send_position_axis_z(20, 0)
var.pid_turning.set(cfg["up_tower"]["pid_kp"], cfg["up_tower"]["pid_ki"], cfg["up_tower"]["pid_kd"])
def nexec(self):
# 下一动作预备位置
by_cmd.send_position_axis_z(20, 0)
time.sleep(4)
var.pid_turning.set(cfg["up_tower"]["pid_kp"], cfg["up_tower"]["pid_ki"], cfg["up_tower"]["pid_kd"])
pass
@@ -570,18 +586,18 @@ class get_rball():
if ret > 0:
# TODO 连续两帧才开始减速
if self.record(tlabel.RBALL):
utils.task_speed = 5
var.task_speed = 5
return True
else:
return False
def exec(self):
logger.info("找到红球")
utils.task_speed = 0
var.task_speed = 0
car_stop()
time.sleep(0.5)
# 靠近塔
by_cmd.send_angle_scoop(20)
by_cmd.send_distance_y(-15, 42) # 50
by_cmd.send_distance_y(-15, 50) # 50
time.sleep(2)
calibrate_new(tlabel.RBALL,offset = 44, run = True)
time.sleep(1)
@@ -596,8 +612,10 @@ class get_rball():
by_cmd.send_angle_omega(-55,30)
# while True:
# pass
var.pid_turning.set(cfg["get_block"]["pid_kp"], cfg["get_block"]["pid_ki"], cfg["get_block"]["pid_kd"])
pass
def nexec(self):
var.pid_turning.set(cfg["get_block"]["pid_kp"], cfg["get_block"]["pid_ki"], cfg["get_block"]["pid_kd"])
pass
# 派发物资
@@ -631,8 +649,10 @@ class put_bball():
by_cmd.send_angle_storage(20)
var.pid_turning.set(cfg["put_bball"]["pid_kp"], cfg["put_bball"]["pid_ki"], cfg["put_bball"]["pid_kd"])
pass
def nexec(self):
var.pid_turning.set(cfg["put_bball"]["pid_kp"], cfg["put_bball"]["pid_ki"], cfg["put_bball"]["pid_kd"])
pass
@@ -685,9 +705,9 @@ class put_hanoi1():
# 校准 omega
if error > 0:
by_cmd.send_angle_omega(-20,abs(utils.lane_error))
by_cmd.send_angle_omega(-20,abs(var.lane_error))
else:
by_cmd.send_angle_omega(20,abs(utils.lane_error))
by_cmd.send_angle_omega(20,abs(var.lane_error))
time.sleep(0.5)
car_stop()
time.sleep(0.5)
@@ -757,7 +777,7 @@ class put_hanoi2():
def find(self):
ret, box = filter.get(self.target_label)
if ret:
utils.task_speed = 8.5
var.task_speed = 8.5
error = (box[0][2] + box[0][0] - 320) / 2 + self.offset
if abs(error) < 40:
return True
@@ -766,7 +786,7 @@ class put_hanoi2():
def exec(self):
logger.info(f"direction:{utils.direction.name}")
utils.task_speed = 0
var.task_speed = 0
car_stop()
time.sleep(0.5)
# by_cmd.send_distance_x(-8, cfg['put_hanoi2']['pos_gap'] - 30)
@@ -943,8 +963,10 @@ class put_hanoi3():
return True
def exec(self):
by_cmd.send_position_axis_z(20, 100)
var.pid_turning.set(cfg["put_hanoi3"]["pid_kp"], cfg["put_hanoi3"]["pid_ki"], cfg["put_hanoi3"]["pid_kd"])
pass
def nexec(self):
var.pid_turning.set(cfg["put_hanoi3"]["pid_kp"], cfg["put_hanoi3"]["pid_ki"], cfg["put_hanoi3"]["pid_kd"])
pass
# 应急避险 第一阶段 找目标牌
@@ -963,7 +985,7 @@ class move_area1():
# 停车 ocr 识别文字 调用大模型
car_stop()
time.sleep(2)
utils.task_speed = 8
var.task_speed = 8
pass
def nexec(self):
pass
@@ -981,7 +1003,7 @@ class move_area2():
return 5000
return False
def exec(self):
utils.task_speed = 0
var.task_speed = 0
logger.info("开始寻找停车区域")
car_stop()
calibrate_new(tlabel.SHELTER, offset = 15, run = True)
@@ -998,9 +1020,11 @@ class move_area2():
by_cmd.send_distance_y(-10, 450)
time.sleep(3)
by_cmd.send_position_axis_z(20, 0)
var.pid_turning.set(cfg["move_area"]["pid_kp"], cfg["move_area"]["pid_ki"], cfg["move_area"]["pid_kd"])
pass
def nexec(self):
by_cmd.send_position_axis_z(20, 0)
var.pid_turning.set(cfg["move_area"]["pid_kp"], cfg["move_area"]["pid_ki"], cfg["move_area"]["pid_kd"])
pass
# 扫黑除暴
@@ -1040,7 +1064,9 @@ class kick_ass():
time.sleep(4)
by_cmd.send_position_axis_x(1, 160)
time.sleep(3)
var.pid_turning.set(cfg["kick_ass"]["pid_kp"], cfg["kick_ass"]["pid_ki"], cfg["kick_ass"]["pid_kd"])
pass
def nexec(self):
var.pid_turning.set(cfg["kick_ass"]["pid_kp"], cfg["kick_ass"]["pid_ki"], cfg["kick_ass"]["pid_kd"])
pass