Files
project_main/subtask.py
2024-05-22 12:37:15 +08:00

137 lines
3.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from enum import Enum
from loguru import logger
import time
# 任务类
class task:
def __init__(self, func_exec, func_find, enable=True):
self.enable = enable
self.func_exec = func_exec
self.func_find = func_find
def find(self):
# 检查该任执行标志
# TODO 完善该接口,是否需要单独为每种 task 编写一个函数,还是设置一个通用的过滤器(从 detection 模块过滤结果)
while self.func_find() is False:
pass
def exec(self):
# 根据标志位确定是否执行该任务
if self.enable is True:
logger.debug(f"[Task ]# Executing task")
self.func_exec()
logger.debug(f"[Task ]# Task completed")
else:
logger.warning(f"[Task ]# Skip task")
# 任务队列状态类
class task_queuem_status(Enum):
IDEL = 0
SEARCHING = 1
EXECUTING = 2
# 任务队列类 非EXECUTEING 时均执行 huigui注意互斥操作
class task_queuem(task):
# task_now = task(None, False)
def __init__(self, queue):
super(task_queuem, self)
self.queue = queue
self.status = task_queuem_status.IDEL
self.busy = True
logger.info(f"[TaskM]# Task num {self.queue.qsize()}")
def exec(self):
# 如果空闲状态则将下一个队列任务取出
if self.status is task_queuem_status.IDEL:
if self.queue.qsize() == 0:
self.busy = False
logger.info(f"[TaskM]# Task queue empty, exit")
return False
self.task_now = self.queue.get()
# 如果当前任务没有使能,则直接转入执行状态,由任务执行函数打印未执行信息
if self.task_now.enable is True:
self.status = task_queuem_status.SEARCHING
else:
self.status = task_queuem_status.EXECUTING
logger.info(f"[TaskM]# ---------------------->>>>")
# 阻塞搜索任务标志位
elif self.status is task_queuem_status.SEARCHING:
logger.info(f"[TaskM]# Start searching task target")
self.task_now.find()
self.status = task_queuem_status.EXECUTING
# 执行任务函数
elif self.status is task_queuem_status.EXECUTING:
logger.info(f"[TaskM]# Start execute task function")
self.task_now.exec() # 执行当前任务函数
self.queue.task_done() # 弹出已执行的任务
self.status = task_queuem_status.IDEL #
logger.info(f"[TaskM]# <<<<----------------------")
return True
# 人员施救
class get_block():
def find():
return True
def exec():
pass
# 紧急转移
class put_block():
def find():
return False
def exec():
pass
# 整装上阵
class get_bball():
def find():
return False
def exec():
pass
# 通信抢修
class up_tower():
def find():
return False
def exec():
pass
# 高空排险
class get_rball():
def find():
return False
def exec():
pass
# 派发物资
class put_bball():
def find():
return False
def exec():
pass
# 物资盘点
class put_hanoi():
def find():
return False
def exec():
pass
# 应急避险
class move_area():
def find():
return False
def exec():
pass
# 扫黑除暴
class kick_ass():
def find():
return False
def exec():
pass