"""Stereo camera capture server.

A background thread reads frame pairs from two cameras into a small shared
buffer. An NNG REP socket answers each client request by taking the newest
buffered pair, saving it to disk and uploading it to an HTTP endpoint.
"""
import asyncio
import json
import logging
import os
import struct
import time
from queue import Empty, Full, Queue
from threading import Lock, Thread

import aiohttp
import cv2
import pynng

HTTP_SERVER_URL = "http://127.0.0.1:5000/upload"

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class ImageBuffer:
    """Bounded buffer holding the most recent left/right frame pairs.

    Both queues are only modified while ``self.lock`` is held, so a left
    frame and its matching right frame are always inserted and removed
    together.
    """

    def __init__(self, max_size=2):
        """Create the two frame queues, each holding at most *max_size* frames."""
        self.left_queue = Queue(maxsize=max_size)
        self.right_queue = Queue(maxsize=max_size)
        self.lock = Lock()

    @staticmethod
    def _put_dropping_oldest(queue, frame):
        """Insert *frame* into *queue*, evicting the oldest entry if it is full."""
        if queue.full():
            try:
                queue.get_nowait()  # Drop the oldest frame to make room
            except Empty:
                pass  # Queue was drained in the meantime; there is room now
        try:
            queue.put_nowait(frame)
        except Full:
            # Best effort: losing one frame is preferable to killing the
            # capture thread, but do not hide the event entirely.
            logger.warning("Frame queue unexpectedly full; frame dropped")

    @staticmethod
    def _drain_latest(queue):
        """Empty *queue* and return its newest item, or None if it was empty."""
        latest = None
        while True:
            try:
                latest = queue.get_nowait()
            except Empty:
                return latest

    def update_frames(self, left_frame, right_frame):
        """Store a new frame pair, discarding the oldest pair when full."""
        with self.lock:
            self._put_dropping_oldest(self.left_queue, left_frame)
            self._put_dropping_oldest(self.right_queue, right_frame)

    def get_latest_frames(self):
        """Return the newest ``(left, right)`` pair and empty the buffer.

        Returns ``(None, None)`` when no complete pair is available. Because
        the buffer is drained, each pair is handed out at most once.
        """
        with self.lock:
            left = self._drain_latest(self.left_queue)
            right = self._drain_latest(self.right_queue)
        if left is None or right is None:
            return None, None
        return left, right


# Global buffer shared between the capture thread and the server
image_buffer = ImageBuffer()


def capture_thread():
    """Continuously read both cameras and push frame pairs into the buffer.

    Intended to run in its own thread. Returns immediately (after logging)
    if either camera cannot be opened; otherwise loops until an exception
    occurs, releasing both capture devices on the way out.
    """
    cam_left = cv2.VideoCapture('/dev/videoL')
    cam_right = cv2.VideoCapture('/dev/videoR')
    try:
        if not (cam_left.isOpened() and cam_right.isOpened()):
            logger.error(
                "Could not open cameras (left opened: %s, right opened: %s)",
                cam_left.isOpened(), cam_right.isOpened(),
            )
            return

        # Set the camera resolution
        for cam in (cam_left, cam_right):
            cam.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
            cam.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)

        while True:
            ret_left, frame_left = cam_left.read()
            ret_right, frame_right = cam_right.read()

            if ret_left and ret_right:
                image_buffer.update_frames(frame_left.copy(), frame_right.copy())
            else:
                print(f"Error capture thread, ret_left: {ret_left}, ret_right: {ret_right}")

            time.sleep(0.03)  # ~30fps
    finally:
        # Previously placed after the infinite loop and therefore unreachable.
        cam_left.release()
        cam_right.release()


def _encode_and_save_frames(frame_left, frame_right, timestamp):
    """JPEG-encode both frames and write copies under ``./saved``.

    Blocking helper meant to run in an executor thread.

    Returns:
        ``(left_bytes, right_bytes)`` with the JPEG-encoded frames.

    Raises:
        ValueError: if OpenCV reports that encoding failed.
    """
    ok_left, jpeg_left = cv2.imencode('.jpg', frame_left)
    ok_right, jpeg_right = cv2.imencode('.jpg', frame_right)
    if not (ok_left and ok_right):
        raise ValueError("JPEG encoding failed")

    # cv2.imwrite reports failure via its return value instead of raising,
    # so make sure the target directory exists and check the result.
    os.makedirs("./saved", exist_ok=True)
    saved_left = cv2.imwrite(f"./saved/left_{timestamp}.jpg", frame_left)
    saved_right = cv2.imwrite(f"./saved/right_{timestamp}.jpg", frame_right)
    if not (saved_left and saved_right):
        logger.warning("Could not save frame %s to ./saved", timestamp)

    return jpeg_left.tobytes(), jpeg_right.tobytes()


async def send_image_to_web_server(frame_left, frame_right, metadata):
    """Save a frame pair locally and upload it to ``HTTP_SERVER_URL``.

    Args:
        frame_left: left camera image accepted by ``cv2.imencode``.
        frame_right: right camera image accepted by ``cv2.imencode``.
        metadata: JSON-serialisable dict; must contain ``"timestamp"``.

    Never raises: any failure is reported on stdout, because this coroutine
    runs as a fire-and-forget background task.
    """
    try:
        # Encoding and disk writes are blocking; run them off the event loop
        # so the request/reply loop stays responsive.
        loop = asyncio.get_running_loop()
        jpeg_left, jpeg_right = await loop.run_in_executor(
            None,
            _encode_and_save_frames,
            frame_left,
            frame_right,
            metadata['timestamp'],
        )

        print(f"Would send frame {metadata['timestamp']} to web server via HTTP POST")

        async with aiohttp.ClientSession() as session:
            data = aiohttp.FormData()
            data.add_field('left_image', jpeg_left,
                           filename='left.jpg', content_type='image/jpeg')
            data.add_field('right_image', jpeg_right,
                           filename='right.jpg', content_type='image/jpeg')
            data.add_field('metadata', json.dumps(metadata))

            async with session.post(HTTP_SERVER_URL, data=data) as response:
                if response.status == 200:
                    print("Images sent successfully")
                else:
                    print(f"Failed to send images: {response.status}")
    except Exception as e:
        print(f"Error sending images to web server: {e}")


class ImageServer:
    """NNG request/reply server that uploads the newest frame pair on demand."""

    def __init__(self, address="tcp://0.0.0.0:54321"):
        """Remember the listen *address*; the socket is created in ``run``."""
        self.address = address
        self.socket = None
        # Strong references to in-flight upload tasks. The event loop only
        # keeps weak references, so an unreferenced task may be garbage
        # collected before it finishes.
        self._background_tasks = set()

    async def handle_client_request(self, client_id):
        """Fetch the newest frame pair and start uploading it in the background.

        Args:
            client_id: identifier sent by the client; used for logging only.

        Returns:
            ``b'ACK'`` when an upload was scheduled, ``b'NO_FRAMES'`` when the
            buffer held no complete pair, ``b'ERROR'`` on unexpected failure.
        """
        try:
            # Take the newest frames from the buffer
            frame_left, frame_right = image_buffer.get_latest_frames()
            if frame_left is None or frame_right is None:
                logger.warning(f"No frames available for client {client_id}")
                return b'NO_FRAMES'

            # Build the metadata (only the timestamp is sent at the moment)
            metadata = {
                "timestamp": time.time(),
            }

            # Upload asynchronously so the reply is not delayed by HTTP
            task = asyncio.create_task(
                send_image_to_web_server(frame_left, frame_right, metadata)
            )
            self._background_tasks.add(task)
            task.add_done_callback(self._background_tasks.discard)

            logger.info(f"Processed request from client {client_id}, frames sent to web server")
            return b'ACK'
        except Exception as e:
            logger.error(f"Error processing request for client {client_id}: {e}")
            return b'ERROR'

    async def run(self):
        """Listen on ``self.address`` and serve requests until an error occurs.

        Each received message is interpreted as the client ID. The socket is
        closed when the loop exits, whether normally or through an exception.
        """
        self.socket = pynng.Rep0()
        try:
            self.socket.listen(self.address)
            logger.info(f"Image server listening on {self.address}...")

            while True:
                try:
                    # Receive the client request (the client ID)
                    data = await self.socket.arecv()

                    # Decode leniently: a malformed ID must not take the
                    # whole server down.
                    client_id = (
                        data.decode('utf-8', errors='replace') if data else "unknown"
                    )

                    # Handle the request and send the reply
                    response = await self.handle_client_request(client_id)
                    await self.socket.asend(response)
                except Exception as e:
                    logger.error(f"Server error: {e}")
                    break
        finally:
            self.socket.close()


async def main():
    """Start the capture thread, then run the image server until it stops."""
    # Start the capture thread
    capture_thread_obj = Thread(target=capture_thread, daemon=True)
    capture_thread_obj.start()
    print("Capture thread started")

    # Start the image server
    server = ImageServer()
    await server.run()


if __name__ == "__main__":
    asyncio.run(main())