import subprocess import time import os import psutil from loguru import logger # 定义要启动的程序和命令 binary_command = "cd /home/evan/Workplace/project_capture/build/; ./capture" # 替换为你的二进制程序路径 conda_envs = [ {"env": "conda_venv", "path": "/home/evan/Workplace/project_infer/lane_server/", "script": "python lane_infer_server.py"}, {"env": "conda_venv", "path": "/home/evan/Workplace/project_infer/yolo_server/", "script": "python yolo_infer_server.py"}, {"env": "conda_venv", "path": "/home/evan/Workplace/project_infer/ocr_server/", "script": "python ocr_infer_server.py"}, ] def kill_processes_by_name(process_name): """ 终止所有具有指定名称的进程。 """ for proc in psutil.process_iter(['pid', 'name', 'cmdline']): try: # 检查 cmdline 是否存在且是可迭代的 if 'cmdline' in proc.info and isinstance(proc.info['cmdline'], (list, tuple)): cmdline = ' '.join(proc.info['cmdline']).lower() # 检查进程名或命令行是否包含给定的字符串 if process_name.lower() in proc.info['name'].lower() or process_name.lower() in cmdline: logger.warning(f"Killing {proc.info['name']} (PID: {proc.info['pid']})") proc.kill() else: # 如果 cmdline 不存在或不是可迭代的,可以跳过或记录日志 logger.debug(f"cmdline is not iterable or does not exist for process {proc.info['name']} (PID: {proc.info['pid']})") except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): pass # 启动函数,用于启动并监控一个进程 def start_and_monitor_process(cmd, env=None): process = subprocess.Popen(cmd, shell=True, env=env) # while process.poll() is None: # 等待进程结束 # time.sleep(1) # 可以根据需要调整等待时间 # return process.returncode # 返回进程退出码 # 主程序 def main(): # 启动二进制程序 logger.error("hi") kill_processes_by_name("capture") binary_returncode = start_and_monitor_process(binary_command) time.sleep(2) logger.info("start python script") # 为每个 conda 环境启动 Python 脚本 conda_procs = [] for env_info in conda_envs: logger.info(f"start {env_info['script']}") # 激活 conda 环境并运行脚本(这里假设你使用的是 bash shell) activate_cmd = f"cd {env_info['path']}; {env_info['script']}" # activate_cmd = f"conda activate {env_info['env']}; {env_info['script']}" # 注意:在某些系统上,可能需要使用 `conda activate` 而不是 `source activate` # 并且可能需要设置 CONDA_PREFIX 等环境变量 env = os.environ.copy() # 如果有需要传递给子 shell 的环境变量,可以在这里设置 # 例如:env["CONDA_PREFIX"] = ... conda_proc = subprocess.Popen(["bash", "-c", activate_cmd], env=env) conda_procs.append(conda_proc) # 等待所有 conda 环境中的 Python 脚本完成 for proc in conda_procs: proc.wait() # 等待进程结束,但不会轮询检查 # 打印提示信息 print(f"Binary program exited with code {binary_returncode}") for idx, proc in enumerate(conda_procs): print(f"{env_info['script']} script exited with code {proc.returncode}") if __name__ == "__main__": main()