208 lines
7.4 KiB
Python
208 lines
7.4 KiB
Python
import cv2
|
||
import pynng
|
||
import struct
|
||
import json
|
||
import time
|
||
import asyncio
|
||
import aiohttp
|
||
from threading import Thread, Lock
|
||
from queue import Queue, Empty
|
||
import logging
|
||
|
||
# Endpoint that receives the multipart upload (left/right JPEG + JSON metadata).
# NOTE(review): plain HTTP, no TLS — confirm this is acceptable for the deployment.
HTTP_SERVER_URL = "http://c1.sgc.brisky.space/upload"
|
||
|
||
# Configure logging
|
||
logging.basicConfig(level=logging.INFO)
|
||
logger = logging.getLogger(__name__)
|
||
|
||
class ImageBuffer:
    """Thread-safe, bounded buffer of stereo (left/right) frames.

    Frames are kept in two bounded queues that are only ever modified
    together while ``self.lock`` is held, so as long as callers go through
    ``update_frames`` / ``get_latest_frames`` both queues hold the same
    number of frames. When a queue is full the oldest frame is discarded to
    make room for the new one.
    """

    def __init__(self, max_size=2):
        """Create an empty buffer.

        Args:
            max_size: Maximum number of frames kept per side. The value is
                passed straight to ``queue.Queue(maxsize=...)``, so a value
                of zero or less means "unbounded".
        """
        self.left_queue = Queue(maxsize=max_size)
        self.right_queue = Queue(maxsize=max_size)
        # One lock for both queues so a left/right pair is updated atomically.
        self.lock = Lock()

    @staticmethod
    def _put_dropping_oldest(frame_queue, frame):
        """Append *frame*, discarding the oldest entry first if the queue is full."""
        if frame_queue.full():
            try:
                frame_queue.get_nowait()  # drop the oldest frame
            except Empty:
                # Something drained the queue behind our back; there is room now.
                pass
        frame_queue.put_nowait(frame)

    @staticmethod
    def _drain(frame_queue):
        """Remove and return every queued frame, oldest first."""
        frames = []
        while True:
            try:
                frames.append(frame_queue.get_nowait())
            except Empty:
                return frames

    def update_frames(self, left_frame, right_frame):
        """Store a new left/right frame pair, dropping the oldest pair if full.

        Args:
            left_frame: Frame from the left camera.
            right_frame: Frame from the right camera.
        """
        with self.lock:
            self._put_dropping_oldest(self.left_queue, left_frame)
            self._put_dropping_oldest(self.right_queue, right_frame)

    def get_latest_frames(self):
        """Return the most recent frame pair and empty the buffer.

        Both queues are drained completely; older frames are discarded.

        Returns:
            ``(left_frame, right_frame)`` for the newest pair, or
            ``(None, None)`` when either side has no frame available.
        """
        with self.lock:
            left_frames = self._drain(self.left_queue)
            right_frames = self._drain(self.right_queue)

        if left_frames and right_frames:
            return left_frames[-1], right_frames[-1]
        return None, None
|
||
|
||
# Global frame buffer shared by the capture thread and the request handler
|
||
image_buffer = ImageBuffer()
|
||
|
||
def capture_thread():
    """Continuously grab stereo frames and publish them to ``image_buffer``.

    Opens ``/dev/videoL`` and ``/dev/videoR``, requests a 640x480 capture
    size on both, then loops forever: read one frame from each camera, push
    the pair into the global ``image_buffer`` when both reads succeed, and
    sleep 30 ms before the next iteration.

    The function only returns if a camera cannot be opened; the cameras are
    released whenever the function exits, including through an exception.
    """
    cam_left = cv2.VideoCapture('/dev/videoL')
    cam_right = cv2.VideoCapture('/dev/videoR')

    try:
        if not (cam_left.isOpened() and cam_right.isOpened()):
            # Reading from an unopened device can never succeed, so report it
            # once instead of printing a read error on every iteration.
            logger.error(
                "Cannot open cameras, left opened: %s, right opened: %s",
                cam_left.isOpened(),
                cam_right.isOpened(),
            )
            return

        # Request the capture resolution on both cameras.
        for cam in (cam_left, cam_right):
            cam.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
            cam.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)

        while True:
            ret_left, frame_left = cam_left.read()
            ret_right, frame_right = cam_right.read()

            if ret_left and ret_right:
                # Only complete pairs are published, keeping both sides in step.
                image_buffer.update_frames(frame_left.copy(), frame_right.copy())
            else:
                print(f"Error capture thread, ret_left: {ret_left}, ret_right: {ret_right}")

            # Caps the loop at roughly 30 iterations per second; the camera
            # reads add their own latency on top of this.
            time.sleep(0.03)
    finally:
        # Previously unreachable after the infinite loop; now always executed.
        cam_left.release()
        cam_right.release()
|
||
|
||
async def send_image_to_web_server(frame_left, frame_right, metadata):
    """Encode a stereo frame pair as JPEG, save it locally and upload it.

    The frames are JPEG-encoded, written to ``./saved/left_<ts>.jpg`` and
    ``./saved/right_<ts>.jpg``, and then POSTed to ``HTTP_SERVER_URL`` as a
    multipart form with the fields ``left_image``, ``right_image`` and
    ``metadata`` (JSON).

    Args:
        frame_left: Left image, passed to ``cv2.imencode`` / ``cv2.imwrite``.
        frame_right: Right image, passed to ``cv2.imencode`` / ``cv2.imwrite``.
        metadata: JSON-serialisable dict containing at least a
            ``'timestamp'`` key, which is used in the saved file names.

    Returns:
        None. Every exception is caught and printed rather than raised, so a
        failed upload never propagates to whoever scheduled this coroutine.
    """
    import os  # local import: needed only for creating the output directory

    try:
        # Encode to JPEG and make sure both encodes actually succeeded.
        ok_left, jpeg_left = cv2.imencode('.jpg', frame_left)
        ok_right, jpeg_right = cv2.imencode('.jpg', frame_right)
        if not (ok_left and ok_right):
            print("Error sending images to web server: JPEG encoding failed")
            return
        jpeg_left = jpeg_left.tobytes()
        jpeg_right = jpeg_right.tobytes()

        print(f"Would send frame {metadata['timestamp']} to web server via HTTP POST")

        # Keep a local copy of every frame pair. Saving is best-effort: a
        # missing or unwritable directory must not prevent the upload.
        try:
            os.makedirs("./saved", exist_ok=True)
        except OSError as e:
            print(f"Error sending images to web server: {e}")
        # NOTE(review): cv2.imwrite is a synchronous call, so the event loop
        # is blocked while the two files are written.
        cv2.imwrite(f"./saved/left_{metadata['timestamp']}.jpg", frame_left)
        cv2.imwrite(f"./saved/right_{metadata['timestamp']}.jpg", frame_right)

        # Upload both images plus the metadata as one multipart POST.
        async with aiohttp.ClientSession() as session:
            form = aiohttp.FormData()
            form.add_field('left_image', jpeg_left, filename='left.jpg', content_type='image/jpeg')
            form.add_field('right_image', jpeg_right, filename='right.jpg', content_type='image/jpeg')
            form.add_field('metadata', json.dumps(metadata))

            async with session.post(HTTP_SERVER_URL, data=form) as response:
                if response.status == 200:
                    print("Images sent successfully")
                else:
                    print(f"Failed to send images: {response.status}")

    except Exception as e:
        # Task boundary: report and swallow so the failure stays contained.
        print(f"Error sending images to web server: {e}")
|
||
|
||
class ImageServer:
    """NNG request/reply server that triggers a frame upload per request.

    Each request carries a client id. The server answers ``b'ACK'`` after
    scheduling an upload of the newest frame pair, ``b'NO_FRAMES'`` when the
    buffer is empty, or ``b'ERROR'`` when handling the request failed.
    """

    def __init__(self, address="tcp://0.0.0.0:54321"):
        """Store the listen address; the socket is created in ``run``.

        Args:
            address: NNG URL the REP socket listens on.
        """
        self.address = address
        self.socket = None
        # Strong references to in-flight upload tasks. The event loop only
        # keeps weak references, so an unreferenced task may be garbage
        # collected before it finishes.
        self._upload_tasks = set()

    async def handle_client_request(self, client_id):
        """Take the newest frame pair and schedule its upload.

        Args:
            client_id: Identifier sent by the client; used for logging only.

        Returns:
            ``b'ACK'`` when an upload was scheduled, ``b'NO_FRAMES'`` when no
            frame pair is available, ``b'ERROR'`` on any exception.
        """
        try:
            frame_left, frame_right = image_buffer.get_latest_frames()
            if frame_left is None or frame_right is None:
                logger.warning(f"No frames available for client {client_id}")
                return b'NO_FRAMES'

            metadata = {"timestamp": time.time()}

            # Fire-and-forget: reply to the client without waiting for the
            # upload, but hold a reference until the task completes.
            task = asyncio.create_task(
                send_image_to_web_server(frame_left, frame_right, metadata)
            )
            self._upload_tasks.add(task)
            task.add_done_callback(self._upload_tasks.discard)

            logger.info(f"Processed request from client {client_id}, frames sent to web server")
            return b'ACK'

        except Exception as e:
            logger.error(f"Error processing request for client {client_id}: {e}")
            return b'ERROR'

    async def run(self):
        """Listen on ``self.address`` and serve requests until a socket error.

        The loop stops on the first exception raised while receiving or
        sending; the socket is closed whenever this coroutine exits.
        """
        self.socket = pynng.Rep0()
        try:
            self.socket.listen(self.address)
            logger.info(f"Image server listening on {self.address}...")

            while True:
                try:
                    # The request payload is the client id.
                    data = await self.socket.arecv()

                    # errors='replace' keeps a malformed id from raising and
                    # taking the whole server down.
                    client_id = data.decode('utf-8', errors='replace') if data else "unknown"

                    response = await self.handle_client_request(client_id)
                    await self.socket.asend(response)

                except Exception as e:
                    logger.error(f"Server error: {e}")
                    break
        finally:
            self.socket.close()
|
||
|
||
async def main():
    """Entry coroutine: start the camera capture thread, then serve requests.

    ``capture_thread`` is started in a background daemon thread, after which
    this coroutine awaits ``ImageServer.run()`` on the default address.
    """
    # Daemon thread, so it does not keep the process alive once the
    # server coroutine has finished.
    Thread(target=capture_thread, daemon=True).start()
    print("Capture thread started")

    # Serve client requests until the server loop exits.
    await ImageServer().run()
|
||
|
||
if __name__ == "__main__":
    # Start the event loop only when executed as a script, not on import.
    asyncio.run(main())