from flask import Flask, render_template, request from flask_socketio import SocketIO import toml from loguru import logger import logging from multiprocessing import Process, Queue import threading import multiprocessing import os import time import subprocess import signal import base64 import json from main_upper import main_func server_command = [ {"path": "/home/evan/Workplace/project_capture/build/", "script": "./capture"}, {"path": "/home/evan/Workplace/project_infer/lane_server/", "script": "lane_infer_server.py"}, {"path": "/home/evan/Workplace/project_infer/yolo_server/", "script": "yolo_infer_server.py"}, {"path": "/home/evan/Workplace/project_infer/ocr_server/", "script": "ocr_infer_server.py"}, ] processes = [] time_record = None # 日志队列 queue = Queue() # 跳过任务 干预任务调度 skip_task_queue = Queue() app = Flask(__name__) app.jinja_env.variable_start_string = '[(' app.jinja_env.variable_end_string = ')]' app.config['SECRET_KEY'] = 'secret!' socketio = SocketIO(app, allow_unsafe_werkzeug=True) server_process = None # FIXME 在列表里存所有的 task_process task_process = None task_run_flag = threading.Event() class WebSocketHandler(logging.Handler): def emit(self, record): log_entry = self.format(record) socketio.emit('log', {'level': record.levelname.lower(), 'content': log_entry}) # 设置日志 logger.remove() handler = WebSocketHandler() logger.add(handler, format="{time:MM-DD HH:mm:ss} {message}", level="DEBUG") fileOptions_path = '/home/evan/Workplace/project_main' fileOptions_list = ['cfg_args.toml','cfg_main.toml', 'cfg_subtask.toml'] cfg_args_path = os.path.join(fileOptions_path, 'cfg_args.toml') cfg_move_area_path = os.path.join(fileOptions_path, 'cfg_move_area.json') @app.route('/') def index(): return render_template('index.html') @app.route('/run') def run(): mode_index = request.args.get('mode') config_args = toml.load(cfg_args_path) config_args['lane_mode']['mode_index'] = int(mode_index) with open(cfg_args_path, 'w') as config_file: toml.dump(config_args, config_file) try: action_base64 = request.args.get('action') decoded_bytes = base64.b64decode(action_base64) decoded_str = decoded_bytes.decode('utf-8') json_data = json.loads(decoded_str) with open(cfg_move_area_path, 'w') as json_file: json.dump(json_data, json_file) except: # 当该字段没有传入参数时 清空配置文件 该任务按照正常流程去做 with open(cfg_move_area_path, 'w') as json_file: pass return render_template('index2.html') # @app.route('/csdn') # def csdn(): # return render_template('csdn.html') @socketio.on('operate') def operate_handle(data): global server_process global task_process global processes global time_record global task_run_flag if data['type'] == 'save_config': f = open(os.path.join(fileOptions_path,data['file_name']), 'w') ret = toml.dump(data['content'], f) logger.info(f"保存成功 {data['file_name']}") f.close() elif data['type'] == 'operate_server': logger.info(data) if data['content'] == 'run': log_file = "server_processes.log" log = open(log_file, "w") time.sleep(2) # 启动所有脚本 for i, env_info in enumerate(server_command): env_path = env_info["path"] script = env_info["script"] env = os.environ.copy() if i == 0: process = subprocess.Popen([script], cwd=env_path, env=env, stdout=log, stderr=subprocess.STDOUT) processes.append(process) time.sleep(2) process = subprocess.Popen(['python', script], cwd=env_path, env=env, stdout=log, stderr=subprocess.STDOUT) processes.append(process) logger.info("开启 server") elif data['content'] == 'stop': for process in processes: logger.error(process.pid) os.kill(process.pid, signal.SIGINT) logger.info("关闭 server") elif data['content'] == 'restart': logger.info("重启 server") elif data['type'] == 'operate_task': # 任务函数 if data['content'] == 'run': task_run_flag.set() # 开启 task 进程前先关闭所有历史进程 if task_process != None: task_process.terminate() time_record = time.perf_counter() task_process = Process(target=main_func, args=(task_run_flag,queue,skip_task_queue)) task_process.start() logger.info("开启 task") elif data['content'] == 'stop': task_run_flag.clear() if task_process != None: task_process.terminate() logger.info(f"任务结束 用时{time.perf_counter() - time_record}s") logger.info("关闭 task") elif data['content'] == 'restart': pass elif data['type'] == 'show_server_log': content = '' try: with open("server_processes.log", 'r') as file: content = file.read() except: pass socketio.emit('server_log', {'type': 'server_log', 'content': content}) elif data['type'] == 'skip_task': logger.info(data) skip_task_queue.put(1) # elif data['type'] == 'save_target_person': # config_path = os.path.join(fileOptions_path, 'cfg_args.toml') # config_args = toml.load(config_path) # config_args['lane_mode']['mode_index'] = int(data['content']) # with open(config_path, 'w') as config_file: # toml.dump(config_args, config_file) @socketio.on('connect') def test_connect(): logger.info('Client connected') socketio.emit('config_data', {'type': 'fileOptions', 'content': fileOptions_list}) config_data = {} for item in fileOptions_list: config_data[item] = toml.load(os.path.join(fileOptions_path,item)) socketio.emit('config_data', {'type': 'config_data', 'content': config_data}) socketio.emit('task_status', {'type': 'task_status', 'content': int(task_run_flag.isSet())}) def thread_function(): global queue while True: try: log = queue.get() socketio.emit('log', log) except multiprocessing.Queue.Empty: pass if __name__ == '__main__': config_path = os.path.join(fileOptions_path, 'cfg_args.toml') config_args = toml.load(config_path) config_args['lane_mode']['mode_index'] = 1 with open(config_path, 'w') as config_file: toml.dump(config_args, config_file) log_file = "server_processes.log" log = open(log_file, "w") time.sleep(2) # 启动所有脚本 for i, env_info in enumerate(server_command): env_path = env_info["path"] script = env_info["script"] env = os.environ.copy() if i == 0: process = subprocess.Popen([script], cwd=env_path, env=env, stdout=log, stderr=subprocess.STDOUT) processes.append(process) time.sleep(2) process = subprocess.Popen(['python', script], cwd=env_path, env=env, stdout=log, stderr=subprocess.STDOUT) processes.append(process) thread1 = threading.Thread(target=thread_function, daemon = True) thread1.start() socketio.run(app, host='0.0.0.0', port=5001, allow_unsafe_werkzeug=True) if server_process != None: server_process.terminate() if task_process != None: task_process.terminate() for process in processes: logger.error(process.pid) os.kill(process.pid, signal.SIGINT)