initial commit
This commit is contained in:
208
cam_cap.py
Normal file
208
cam_cap.py
Normal file
@@ -0,0 +1,208 @@
|
||||
import cv2
|
||||
import pynng
|
||||
import struct
|
||||
import json
|
||||
import time
|
||||
import asyncio
|
||||
import aiohttp
|
||||
from threading import Thread, Lock
|
||||
from queue import Queue, Empty
|
||||
import logging
|
||||
|
||||
HTTP_SERVER_URL = "http://127.0.0.1:5000/upload"
|
||||
|
||||
# 设置日志
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class ImageBuffer:
|
||||
def __init__(self, max_size=2):
|
||||
self.left_queue = Queue(maxsize=max_size)
|
||||
self.right_queue = Queue(maxsize=max_size)
|
||||
self.lock = Lock()
|
||||
|
||||
def update_frames(self, left_frame, right_frame):
|
||||
"""更新缓冲区中的帧,如果缓冲区满则丢弃旧帧"""
|
||||
with self.lock:
|
||||
try:
|
||||
if not self.left_queue.full():
|
||||
self.left_queue.put_nowait(left_frame)
|
||||
else:
|
||||
try:
|
||||
self.left_queue.get_nowait() # 丢弃旧帧
|
||||
self.left_queue.put_nowait(left_frame)
|
||||
except Empty:
|
||||
pass
|
||||
|
||||
if not self.right_queue.full():
|
||||
self.right_queue.put_nowait(right_frame)
|
||||
else:
|
||||
try:
|
||||
self.right_queue.get_nowait() # 丢弃旧帧
|
||||
self.right_queue.put_nowait(right_frame)
|
||||
except Empty:
|
||||
pass
|
||||
except:
|
||||
pass
|
||||
|
||||
def get_latest_frames(self):
|
||||
"""获取最新的帧对"""
|
||||
with self.lock:
|
||||
try:
|
||||
left_frames = []
|
||||
right_frames = []
|
||||
|
||||
# 获取所有可用的左帧
|
||||
while not self.left_queue.empty():
|
||||
try:
|
||||
left_frames.append(self.left_queue.get_nowait())
|
||||
except Empty:
|
||||
break
|
||||
|
||||
# 获取所有可用的右帧
|
||||
while not self.right_queue.empty():
|
||||
try:
|
||||
right_frames.append(self.right_queue.get_nowait())
|
||||
except Empty:
|
||||
break
|
||||
|
||||
if left_frames and right_frames:
|
||||
# 返回最新的帧对
|
||||
return left_frames[-1], right_frames[-1]
|
||||
except:
|
||||
pass
|
||||
return None, None
|
||||
|
||||
# 全局缓冲区
|
||||
image_buffer = ImageBuffer()
|
||||
|
||||
def capture_thread():
|
||||
"""异步捕获线程"""
|
||||
cam_left = cv2.VideoCapture('/dev/videoL')
|
||||
cam_right = cv2.VideoCapture('/dev/videoR')
|
||||
|
||||
# # 设置摄像头分辨率
|
||||
cam_left.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
|
||||
cam_left.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
|
||||
cam_right.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
|
||||
cam_right.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
|
||||
|
||||
while True:
|
||||
ret_left, frame_left = cam_left.read()
|
||||
ret_right, frame_right = cam_right.read()
|
||||
|
||||
if ret_left and ret_right:
|
||||
image_buffer.update_frames(frame_left.copy(), frame_right.copy())
|
||||
# print(f"num of left frames: {image_buffer.left_queue.qsize()}")
|
||||
# print(f"num of right frames: {image_buffer.right_queue.qsize()}")
|
||||
else:
|
||||
print(f"Error capture thread, ret_left: {ret_left}, ret_right: {ret_right}")
|
||||
|
||||
time.sleep(0.03) # ~30fps
|
||||
|
||||
cam_left.release()
|
||||
cam_right.release()
|
||||
|
||||
async def send_image_to_web_server(frame_left, frame_right, metadata):
|
||||
"""异步发送图片到Web服务器(留空实现)"""
|
||||
try:
|
||||
# 编码为JPEG
|
||||
_, jpeg_left = cv2.imencode('.jpg', frame_left)
|
||||
_, jpeg_right = cv2.imencode('.jpg', frame_right)
|
||||
jpeg_left = jpeg_left.tobytes()
|
||||
jpeg_right = jpeg_right.tobytes()
|
||||
|
||||
# HTTP POST请求实现留空
|
||||
print(f"Would send frame {metadata['timestamp']} to web server via HTTP POST")
|
||||
# print(f"Client ID: {metadata['client_id']}")
|
||||
cv2.imwrite(f"./saved/left_{metadata['timestamp']}.jpg", frame_left)
|
||||
cv2.imwrite(f"./saved/right_{metadata['timestamp']}.jpg", frame_right)
|
||||
# 这里可以添加实际的HTTP POST请求代码
|
||||
# 示例:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
data = aiohttp.FormData()
|
||||
data.add_field('left_image', jpeg_left, filename='left.jpg', content_type='image/jpeg')
|
||||
data.add_field('right_image', jpeg_right, filename='right.jpg', content_type='image/jpeg')
|
||||
data.add_field('metadata', json.dumps(metadata))
|
||||
|
||||
async with session.post(HTTP_SERVER_URL, data=data) as response:
|
||||
if response.status == 200:
|
||||
print("Images sent successfully")
|
||||
else:
|
||||
print(f"Failed to send images: {response.status}")
|
||||
|
||||
# 模拟发送延迟
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Error sending images to web server: {e}")
|
||||
|
||||
class ImageServer:
|
||||
def __init__(self, address="tcp://0.0.0.0:54321"):
|
||||
self.address = address
|
||||
self.socket = None
|
||||
|
||||
async def handle_client_request(self, client_id):
|
||||
"""处理客户端请求:获取最新帧并发送到Web服务器"""
|
||||
try:
|
||||
# 从缓冲区获取最新帧
|
||||
frame_left, frame_right = image_buffer.get_latest_frames()
|
||||
if frame_left is not None and frame_right is not None:
|
||||
# 创建元数据
|
||||
metadata = {
|
||||
"timestamp": time.time(),
|
||||
# "client_id": client_id,
|
||||
# "resolution": {
|
||||
# "width": frame_left.shape[1],
|
||||
# "height": frame_left.shape[0]
|
||||
# }
|
||||
}
|
||||
|
||||
# 异步发送图片到Web服务器
|
||||
asyncio.create_task(send_image_to_web_server(frame_left, frame_right, metadata))
|
||||
|
||||
logger.info(f"Processed request from client {client_id}, frames sent to web server")
|
||||
return b'ACK'
|
||||
else:
|
||||
logger.warning(f"No frames available for client {client_id}")
|
||||
return b'NO_FRAMES'
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing request for client {client_id}: {e}")
|
||||
return b'ERROR'
|
||||
|
||||
async def run(self):
|
||||
"""运行服务器"""
|
||||
self.socket = pynng.Rep0()
|
||||
self.socket.listen(self.address)
|
||||
logger.info(f"Image server listening on {self.address}...")
|
||||
|
||||
while True:
|
||||
try:
|
||||
# 接收客户端请求(客户端ID)
|
||||
data = await self.socket.arecv()
|
||||
|
||||
# 解析客户端ID
|
||||
client_id = data.decode('utf-8') if data else "unknown"
|
||||
# logger.info(f"Received request from client: {client_id}")
|
||||
|
||||
# 处理请求并发送响应
|
||||
response = await self.handle_client_request(client_id)
|
||||
await self.socket.asend(response)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Server error: {e}")
|
||||
break
|
||||
|
||||
async def main():
|
||||
# 启动捕获线程
|
||||
capture_thread_obj = Thread(target=capture_thread, daemon=True)
|
||||
capture_thread_obj.start()
|
||||
print("Capture thread started")
|
||||
|
||||
# 启动图像服务器
|
||||
server = ImageServer()
|
||||
await server.run()
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
Reference in New Issue
Block a user