Files
project_main/subtask.py
2024-05-21 23:53:47 +08:00

109 lines
3.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from enum import Enum
from loguru import logger
import time
# 任务类
class task:
def __init__(self, task_operation, enable=True):
self.enable = enable
self.task_operation = task_operation
def check(self):
# 检查该任务是否需要执行
# TODO 完善该接口,是否需要单独为每种 task 编写一个函数,还是设置一个通用的过滤器(从 detection 模块过滤结果)
ret = True
return ret
def execute(self):
# 根据标志位确定是否执行该任务
if self.enable is True:
logger.info(f"[Task]#Executing task \"{self.task_operation.__name__}\"")
self.task_operation()
logger.info(f"[Task]#Task \"{self.task_operation.__name__}\" completed.")
else:
logger.warning(f"[Task]#Skip task \"{self.task_operation.__name__}\"")
# 任务队列状态类
class task_queuem_status(Enum):
IDEL = 0
SEARCHING = 1
EXECUTING = 2
# 任务队列类 非EXECUTEING 时均执行 huigui注意互斥操作
class task_queuem(task):
# task_now = task(None, False)
def __init__(self, queue):
super(task_queuem, self)
self.queue = queue
self.status = task_queuem_status.IDEL
self.busy = True
logger.info(f"[TaskM]#Task num {self.queue.qsize()}")
def exec(self):
# 如果空闲状态则将下一个队列任务取出
if self.status is task_queuem_status.IDEL:
if self.queue.qsize() == 0:
self.busy = False
logger.info(f"[TaskM]#Task queue empty, exit")
return False
self.task_now = self.queue.get()
# 如果当前任务没有使能,则直接转入执行状态,由任务执行函数打印未执行信息
if self.task_now.enable is True:
self.status = task_queuem_status.SEARCHING
else:
self.status = task_queuem_status.EXECUTING
logger.info(f"[TaskM]#Start process task \"{self.task_now.task_operation.__name__}\" >>>>")
# 阻塞搜索任务标志位
elif self.status is task_queuem_status.SEARCHING:
logger.info(f"[TaskM]#Start searching task target")
while self.task_now.check() is False: # TODO 增加超时处理
break
self.status = task_queuem_status.EXECUTING
# 执行任务函数
elif self.status is task_queuem_status.EXECUTING:
logger.info(f"[TaskM]#Start execute task function")
self.task_now.execute() # 执行当前任务函数
self.queue.task_done() # 弹出已执行的任务
self.status = task_queuem_status.IDEL #
logger.info(f"[TaskM]#finish process task {self.task_now.task_operation.__name__} <<<<")
return True
# 人员施救
def get_block():
pass
# 紧急转移
def put_block():
pass
# 整装上阵
def get_bball():
pass
# 通信抢修
def up_tower():
pass
# 高空排险
def get_rball():
pass
# 派发物资
def put_bball():
pass
# 物资盘点
def put_hanoi():
pass
# 应急避险
def move_area():
pass
# 扫黑除暴
def kick_ass():
pass