feat: 增加动作队列

This commit is contained in:
bmy
2024-06-16 17:28:00 +08:00
parent f5be160fd9
commit 0a0e597fe9
3 changed files with 131 additions and 12 deletions

125
action.py
View File

@@ -1,6 +1,8 @@
import threading import threading
import time import time
import zmq import zmq
import queue
import toml
import numpy as np import numpy as np
from loguru import logger from loguru import logger
from simple_pid import PID from simple_pid import PID
@@ -8,13 +10,21 @@ from utils import PidWrap
bycmd = None bycmd = None
move = None move = None
axis = None
cmd = None
cfg = toml.load('cfg_action.toml')
def import_obj(_bycmd): def import_obj(_bycmd):
global bycmd global bycmd
global move global move
global axis
global cmd
bycmd = _bycmd bycmd = _bycmd
move = move_cls() move = move_cls()
axis = axis_cls()
cmd = cmd_cls()
class lane_cls: class lane_cls:
def __init__(self, _speed, _time, _pid_argv = {"kp" : 1.2, "ki" : 0, "kd" : 0}) -> None: def __init__(self, _speed, _time, _pid_argv = {"kp" : 1.2, "ki" : 0, "kd" : 0}) -> None:
@@ -88,12 +98,119 @@ class move_cls:
# TODO 新运动指令类,指令的发送和完成查询功能,发送时开启新线程 # TODO 新运动指令类,指令的发送和完成查询功能,发送时开启新线程
# TODO 轴控制指令类,增加指令后加入动作队列,非阻塞
class cmd_cls():
# FIXME 延时计算时没有关联轴速度
def __init__(self) -> None:
# 根据设备上电复位后实际位置设置
self.__axis_x_pos = 160
self.__axis_z_pos = 100
# self.__axis_claw_pos = 0
# self.__axis_claw_arm_pos = 0
# self.__axis_storage_pos = 0
pass
def z(self, _speed, _dis, _time_via = -1):
# 设置绝对位置
self.__axis_z_pos += _dis
# 如果未指定延时时间,则按照相对位移计算延时时间
if _time_via == -1:
time_via = abs(_dis) * cfg["axis"]["axis_z_time_via_alpha"]
else:
time_via = _time_via
logger.info(f"z[{self.__axis_z_pos}] speed:{_speed:.2f}, dis:{_dis:.2f}, time_via:{time_via:.2f}")
while bycmd.send_position_axis_z(_speed, self.__axis_z_pos) == -1:
pass
time.sleep(time_via)
def z2(self, _speed, _pos, _time_via = -1):
# 计算绝对位置
_dis = self.__axis_z_pos - _pos
self.__axis_z_pos = _pos
# 如果未指定延时时间,则按照相对位移计算延时时间
if _time_via == -1:
time_via = abs(_dis) * cfg["axis"]["axis_z_time_via_alpha"]
else:
time_via = _time_via
logger.info(f"z[{self.__axis_z_pos}] speed:{_speed:.2f}, pos:{_pos:.2f}, time_via:{time_via:.2f}")
while bycmd.send_position_axis_z(_speed, self.__axis_z_pos) == -1:
pass
time.sleep(time_via)
def x(self, _speed, _dis, _time_via = -1):
# 设置绝对位置
self.__axis_x_pos += _dis
# 如果未指定延时时间,则按照相对位移计算延时时间
if _time_via == -1:
time_via = abs(_dis) * cfg["axis"]["axis_z_time_via_alpha"]
else:
time_via = _time_via
logger.info(f"x[{self.__axis_x_pos}] speed:{_speed:.2f}, dis:{_dis:.2f}, time_via:{time_via:.2f}")
while bycmd.send_position_axis_x(_speed, self.__axis_x_pos) == -1:
pass
time.sleep(time_via)
def x2(self, _speed, _pos, _time_via = -1):
# 计算绝对位置
_dis = self.__axis_x_pos - _pos
self.__axis_x_pos = _pos
# 如果未指定延时时间,则按照相对位移计算延时时间
if _time_via == -1:
time_via = abs(_dis) * cfg["axis"]["axis_x_time_via_alpha"]
else:
time_via = _time_via
logger.info(f"x[{self.__axis_x_pos}] speed:{_speed:.2f}, pos:{_pos:.2f}, time_via:{time_via:.2f}")
while bycmd.send_position_axis_x(_speed, self.__axis_x_pos) == -1:
pass
time.sleep(time_via)
def camera(self, angle):
while bycmd.send_angle_camera(angle) == -1:
pass
def claw(self, angle):
while bycmd.send_angle_claw(angle) == -1:
pass
time.sleep(0.5)
def claw_arm(self, angle):
while bycmd.send_angle_claw_arm(angle) == -1:
pass
time.sleep(1)
def scoop(self, angle):
while bycmd.send_angle_scoop(angle) == -1:
pass
time.sleep(0.5)
def storage(self, angle):
while bycmd.send_angle_storage(angle) == -1:
pass
time.sleep(0.5)
def zhuan(self, angle):
while bycmd.send_angle_zhuan(angle) == -1:
pass
time.sleep(0.5)
# TODO 轴控制指令类,增加指令后加入动作队列,非阻塞
class axis_cls(): class axis_cls():
def axis_z(self, _distance): def __init__(self) -> None:
self.axis_queue = queue.Queue()
pass pass
def axis_z2(self, _position): def axis_z(self, _distance, _time_via = -1):
self.axis_queue.put(lambda: cmd.z(20, _distance, _time_via))
pass pass
def exec(self): def axis_z2(self, _position, _time_via = -1):
self.axis_queue.put(lambda: cmd.z2(20, _position, _time_via))
pass
def axis_x(self, _distance, _time_via = -1):
self.axis_queue.put(lambda: cmd.x(1, _distance, _time_via))
pass
def axis_x2(self, _position, _time_via = -1):
self.axis_queue.put(lambda: cmd.x2(1, _position, _time_via))
pass
def pop(self):
while self.axis_queue.qsize() > 0:
logger.info(f"axis cmd {self.axis_queue.qsize()}")
self.axis_queue.get()()
self.axis_queue.task_done()
time.sleep(0.005)
pass
def exec(self, _block:bool = True):
if _block is True:
self.pop()
else:
thread = threading.Thread(target=self.pop)
thread.start()
pass pass

4
cfg_action.toml Normal file
View File

@@ -0,0 +1,4 @@
[axis]
# 轴运动延时系数
axis_z_time_via_alpha = 0.05
axis_x_time_via_alpha = 0.01

14
main.py
View File

@@ -18,6 +18,12 @@ cfg_main = toml.load('cfg_main.toml')
# 配置日志输出 # 配置日志输出
logger.add(cfg_main['debug']['logger_filename'], format=cfg_main['debug']['logger_format'], retention = 5, level="INFO") logger.add(cfg_main['debug']['logger_filename'], format=cfg_main['debug']['logger_format'], retention = 5, level="INFO")
act.cmd.camera(180)
act.cmd.x2(1, 140)
act.cmd.storage(20)
act.cmd.scoop(25)
# 向任务队列添加任务 # 向任务队列添加任务
task_queue = queue.Queue() task_queue = queue.Queue()
if cfg_main['task']['Subtask_enable'] is True: if cfg_main['task']['Subtask_enable'] is True:
@@ -47,14 +53,6 @@ def worker_thread():
# 启动工作线程 # 启动工作线程
worker = threading.Thread(target=worker_thread, daemon=True) worker = threading.Thread(target=worker_thread, daemon=True)
worker.start() worker.start()
# if (cmd_py_obj.send_angle_camera(180) == -1):
# cmd_py_obj.send_angle_camera(180)
cmd_py_obj.send_position_axis_x(1, 140)
time.sleep(0.5)
cmd_py_obj.send_angle_storage(20)
time.sleep(0.5)
cmd_py_obj.send_angle_scoop(25)
time.sleep(2)
# 创建主任务 # 创建主任务
main_task_t = mj.main_task(cmd_py_obj) # 初始化时传入 zmq socket 对象 main_task_t = mj.main_task(cmd_py_obj) # 初始化时传入 zmq socket 对象