From 62de9573ac2ed04d3a5083a17af1620a33bc1810 Mon Sep 17 00:00:00 2001 From: CaoWangrenbo Date: Sun, 26 Oct 2025 10:03:07 +0800 Subject: [PATCH] initial commit --- .gitignore | 12 ++ cam_cap.py | 208 ++++++++++++++++++++++++ cam_web.py | 314 +++++++++++++++++++++++++++++++++++++ cap_trigger.py | 102 ++++++++++++ static/received/.gitkeep | 0 templates/index.html | 128 +++++++++++++++ templates/list_images.html | 260 ++++++++++++++++++++++++++++++ 7 files changed, 1024 insertions(+) create mode 100644 .gitignore create mode 100644 cam_cap.py create mode 100644 cam_web.py create mode 100644 cap_trigger.py create mode 100644 static/received/.gitkeep create mode 100644 templates/index.html create mode 100644 templates/list_images.html diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9cf3ce4 --- /dev/null +++ b/.gitignore @@ -0,0 +1,12 @@ +venv/ +__pycache__/ +.vscode/ + + +/static/received/left/ +/static/received/right/ + +# 数据文件及数据库 +*.zip +*.db + diff --git a/cam_cap.py b/cam_cap.py new file mode 100644 index 0000000..0dee1f9 --- /dev/null +++ b/cam_cap.py @@ -0,0 +1,208 @@ +import cv2 +import pynng +import struct +import json +import time +import asyncio +import aiohttp +from threading import Thread, Lock +from queue import Queue, Empty +import logging + +HTTP_SERVER_URL = "http://127.0.0.1:5000/upload" + +# 设置日志 +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +class ImageBuffer: + def __init__(self, max_size=2): + self.left_queue = Queue(maxsize=max_size) + self.right_queue = Queue(maxsize=max_size) + self.lock = Lock() + + def update_frames(self, left_frame, right_frame): + """更新缓冲区中的帧,如果缓冲区满则丢弃旧帧""" + with self.lock: + try: + if not self.left_queue.full(): + self.left_queue.put_nowait(left_frame) + else: + try: + self.left_queue.get_nowait() # 丢弃旧帧 + self.left_queue.put_nowait(left_frame) + except Empty: + pass + + if not self.right_queue.full(): + self.right_queue.put_nowait(right_frame) + else: + try: + self.right_queue.get_nowait() # 丢弃旧帧 + self.right_queue.put_nowait(right_frame) + except Empty: + pass + except: + pass + + def get_latest_frames(self): + """获取最新的帧对""" + with self.lock: + try: + left_frames = [] + right_frames = [] + + # 获取所有可用的左帧 + while not self.left_queue.empty(): + try: + left_frames.append(self.left_queue.get_nowait()) + except Empty: + break + + # 获取所有可用的右帧 + while not self.right_queue.empty(): + try: + right_frames.append(self.right_queue.get_nowait()) + except Empty: + break + + if left_frames and right_frames: + # 返回最新的帧对 + return left_frames[-1], right_frames[-1] + except: + pass + return None, None + +# 全局缓冲区 +image_buffer = ImageBuffer() + +def capture_thread(): + """异步捕获线程""" + cam_left = cv2.VideoCapture('/dev/videoL') + cam_right = cv2.VideoCapture('/dev/videoR') + + # # 设置摄像头分辨率 + cam_left.set(cv2.CAP_PROP_FRAME_WIDTH, 1280) + cam_left.set(cv2.CAP_PROP_FRAME_HEIGHT, 720) + cam_right.set(cv2.CAP_PROP_FRAME_WIDTH, 1280) + cam_right.set(cv2.CAP_PROP_FRAME_HEIGHT, 720) + + while True: + ret_left, frame_left = cam_left.read() + ret_right, frame_right = cam_right.read() + + if ret_left and ret_right: + image_buffer.update_frames(frame_left.copy(), frame_right.copy()) + # print(f"num of left frames: {image_buffer.left_queue.qsize()}") + # print(f"num of right frames: {image_buffer.right_queue.qsize()}") + else: + print(f"Error capture thread, ret_left: {ret_left}, ret_right: {ret_right}") + + time.sleep(0.03) # ~30fps + + cam_left.release() + cam_right.release() + +async def send_image_to_web_server(frame_left, frame_right, metadata): + """异步发送图片到Web服务器(留空实现)""" + try: + # 编码为JPEG + _, jpeg_left = cv2.imencode('.jpg', frame_left) + _, jpeg_right = cv2.imencode('.jpg', frame_right) + jpeg_left = jpeg_left.tobytes() + jpeg_right = jpeg_right.tobytes() + + # HTTP POST请求实现留空 + print(f"Would send frame {metadata['timestamp']} to web server via HTTP POST") + # print(f"Client ID: {metadata['client_id']}") + cv2.imwrite(f"./saved/left_{metadata['timestamp']}.jpg", frame_left) + cv2.imwrite(f"./saved/right_{metadata['timestamp']}.jpg", frame_right) + # 这里可以添加实际的HTTP POST请求代码 + # 示例: + async with aiohttp.ClientSession() as session: + data = aiohttp.FormData() + data.add_field('left_image', jpeg_left, filename='left.jpg', content_type='image/jpeg') + data.add_field('right_image', jpeg_right, filename='right.jpg', content_type='image/jpeg') + data.add_field('metadata', json.dumps(metadata)) + + async with session.post(HTTP_SERVER_URL, data=data) as response: + if response.status == 200: + print("Images sent successfully") + else: + print(f"Failed to send images: {response.status}") + + # 模拟发送延迟 + await asyncio.sleep(0.01) + + except Exception as e: + print(f"Error sending images to web server: {e}") + +class ImageServer: + def __init__(self, address="tcp://0.0.0.0:54321"): + self.address = address + self.socket = None + + async def handle_client_request(self, client_id): + """处理客户端请求:获取最新帧并发送到Web服务器""" + try: + # 从缓冲区获取最新帧 + frame_left, frame_right = image_buffer.get_latest_frames() + if frame_left is not None and frame_right is not None: + # 创建元数据 + metadata = { + "timestamp": time.time(), + # "client_id": client_id, + # "resolution": { + # "width": frame_left.shape[1], + # "height": frame_left.shape[0] + # } + } + + # 异步发送图片到Web服务器 + asyncio.create_task(send_image_to_web_server(frame_left, frame_right, metadata)) + + logger.info(f"Processed request from client {client_id}, frames sent to web server") + return b'ACK' + else: + logger.warning(f"No frames available for client {client_id}") + return b'NO_FRAMES' + + except Exception as e: + logger.error(f"Error processing request for client {client_id}: {e}") + return b'ERROR' + + async def run(self): + """运行服务器""" + self.socket = pynng.Rep0() + self.socket.listen(self.address) + logger.info(f"Image server listening on {self.address}...") + + while True: + try: + # 接收客户端请求(客户端ID) + data = await self.socket.arecv() + + # 解析客户端ID + client_id = data.decode('utf-8') if data else "unknown" + # logger.info(f"Received request from client: {client_id}") + + # 处理请求并发送响应 + response = await self.handle_client_request(client_id) + await self.socket.asend(response) + + except Exception as e: + logger.error(f"Server error: {e}") + break + +async def main(): + # 启动捕获线程 + capture_thread_obj = Thread(target=capture_thread, daemon=True) + capture_thread_obj.start() + print("Capture thread started") + + # 启动图像服务器 + server = ImageServer() + await server.run() + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/cam_web.py b/cam_web.py new file mode 100644 index 0000000..c8aa1c2 --- /dev/null +++ b/cam_web.py @@ -0,0 +1,314 @@ +import cv2 +import numpy as np +from flask import Flask, render_template, request, jsonify, send_file +from flask_socketio import SocketIO, emit +import io +import base64 +import time +import threading +import logging +import os +import json +import sqlite3 +from datetime import datetime +import zipfile +import tempfile + +from cap_trigger import ImageClient + + + +# --- 配置 --- +SAVE_PATH_LEFT = "./static/received/left" +SAVE_PATH_RIGHT = "./static/received/right" +FLASK_HOST = "0.0.0.0" +FLASK_PORT = 5000 +MAX_LIVE_FRAMES = 2 # 保留最新的几帧用于实时显示 +DATABASE_PATH = "received_images.db" # SQLite 数据库文件路径 +# --- 配置 --- + +# 设置日志 +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# --- 数据库初始化 --- +def init_db(): + """初始化 SQLite 数据库和表""" + conn = sqlite3.connect(DATABASE_PATH) + cursor = conn.cursor() + # 创建表,存储图片信息 + cursor.execute(''' + CREATE TABLE IF NOT EXISTS images ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + left_filename TEXT NOT NULL, + right_filename TEXT NOT NULL, + timestamp REAL NOT NULL, + metadata TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + ''') + conn.commit() + conn.close() + logger.info(f"Database {DATABASE_PATH} initialized.") + +# --- 全局变量 --- +# 用于存储最新的左右帧,用于实时显示 +latest_left_frame = None +latest_right_frame = None +latest_timestamp = None +frame_lock = threading.Lock() # 保护全局帧变量 + +# --- Flask & SocketIO 应用 --- +app = Flask(__name__) +# 为生产环境配置 SECRET_KEY +app.config['SECRET_KEY'] = 'your-secret-key-change-this' +# 配置异步模式,如果需要异步处理可以调整 +socketio = SocketIO(app, cors_allowed_origins="*") # 允许所有来源,生产环境请具体配置 + +# 初始化图像客户端 +image_client = ImageClient("tcp://127.0.0.1:54321", client_id="local") + +# 初始化数据库 +init_db() + +# --- Flask 路由 --- + +@app.route('/') +def index(): + """主页,加载实时图像页面""" + return render_template('index.html') + +@app.route('/list') # 新增路由用于显示图片列表 +def list_images(): + """加载图片列表页面""" + return render_template('list_images.html') + +@app.route('/api/images', methods=['GET']) +def get_images_api(): + """API: 获取图片列表 (JSON 格式)""" + conn = sqlite3.connect(DATABASE_PATH) + cursor = conn.cursor() + # 按时间倒序排列 + cursor.execute("SELECT id, left_filename, right_filename, timestamp, metadata, created_at FROM images ORDER BY timestamp DESC") + rows = cursor.fetchall() + conn.close() + + images = [] + for row in rows: + images.append({ + "id": row[0], + "left_filename": row[1], + "right_filename": row[2], + "timestamp": row[3], + "metadata": row[4], + "created_at": row[5] + }) + return jsonify(images) + +@app.route('/api/images', methods=['DELETE']) +def delete_image_api(): + """API: 删除单张图片记录及其文件""" + image_id = request.json.get('id') + if not image_id: + return jsonify({"error": "Image ID is required"}), 400 + + conn = sqlite3.connect(DATABASE_PATH) + cursor = conn.cursor() + # 查询文件名 + cursor.execute("SELECT left_filename, right_filename FROM images WHERE id = ?", (image_id,)) + row = cursor.fetchone() + if not row: + conn.close() + return jsonify({"error": "Image not found"}), 404 + + left_filename, right_filename = row + # 删除数据库记录 + cursor.execute("DELETE FROM images WHERE id = ?", (image_id,)) + conn.commit() + conn.close() + + # 删除对应的文件 + left_path = os.path.join(SAVE_PATH_LEFT, left_filename) + right_path = os.path.join(SAVE_PATH_RIGHT, right_filename) + try: + if os.path.exists(left_path): + os.remove(left_path) + logger.info(f"Deleted file: {left_path}") + if os.path.exists(right_path): + os.remove(right_path) + logger.info(f"Deleted file: {right_path}") + except OSError as e: + logger.error(f"Error deleting files: {e}") + # 即使删除文件失败,数据库记录也已删除,返回成功 + pass + + return jsonify({"message": f"Image {image_id} deleted successfully"}) + +@app.route('/api/images/export', methods=['POST']) +def export_images_api(): + """API: 打包导出选中的图片""" + selected_ids = request.json.get('ids', []) + if not selected_ids: + return jsonify({"error": "No image IDs selected"}), 400 + + conn = sqlite3.connect(DATABASE_PATH) + cursor = conn.cursor() + placeholders = ','.join('?' * len(selected_ids)) + cursor.execute(f"SELECT left_filename, right_filename FROM images WHERE id IN ({placeholders})", selected_ids) + rows = cursor.fetchall() + conn.close() + + if not rows: + return jsonify({"error": "No matching images found"}), 404 + + # 创建临时 ZIP 文件 + temp_zip_fd, temp_zip_path = tempfile.mkstemp(suffix='.zip') + os.close(temp_zip_fd) # 关闭文件描述符,让 zipfile 模块管理 + + try: + with zipfile.ZipFile(temp_zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: + for left_fn, right_fn in rows: + left_path = os.path.join(SAVE_PATH_LEFT, left_fn) + right_path = os.path.join(SAVE_PATH_RIGHT, right_fn) + if os.path.exists(left_path): + zipf.write(left_path, os.path.join('left', left_fn)) + if os.path.exists(right_path): + zipf.write(right_path, os.path.join('right', right_fn)) + + logger.info(f"Exported {len(rows)} image pairs to {temp_zip_path}") + # 返回 ZIP 文件给客户端 + return send_file(temp_zip_path, as_attachment=True, download_name='exported_images.zip') + + except Exception as e: + logger.error(f"Error creating export ZIP: {e}") + if os.path.exists(temp_zip_path): + os.remove(temp_zip_path) + return jsonify({"error": str(e)}), 500 + + +@app.route('/upload', methods=['POST']) +def upload_images(): + """接收左右摄像头图片,保存并推送更新""" + try: + # 从 multipart/form-data 中获取文件 + left_file = request.files.get('left_image') + right_file = request.files.get('right_image') + metadata_str = request.form.get('metadata') # 如果需要处理元数据 + + if not left_file or not right_file: + logger.warning("Received request without required image files.") + return jsonify({"error": "Missing left_image or right_image"}), 400 + + # 读取图片数据 + left_img_bytes = left_file.read() + right_img_bytes = right_file.read() + + # 解码图片用于后续处理 (如显示、保存) + nparr_left = np.frombuffer(left_img_bytes, np.uint8) + nparr_right = np.frombuffer(right_img_bytes, np.uint8) + img_left = cv2.imdecode(nparr_left, cv2.IMREAD_COLOR) + img_right = cv2.imdecode(nparr_right, cv2.IMREAD_COLOR) + + if img_left is None or img_right is None: + logger.error("Failed to decode received images.") + return jsonify({"error": "Could not decode images"}), 400 + + # 解析元数据 (如果提供) + metadata = {} + if metadata_str: + try: + metadata = json.loads(metadata_str) + except json.JSONDecodeError as e: + logger.warning(f"Could not parse metadata: {e}") + + timestamp_str = str(metadata.get("timestamp", str(int(time.time())))) + timestamp_str_safe = timestamp_str.replace(".", "_") # 避免文件名中的点号问题 + + # 生成文件名 + left_filename = f"left_{timestamp_str_safe}.jpg" + right_filename = f"right_{timestamp_str_safe}.jpg" + + # 保存图片到本地 + left_path = os.path.join(SAVE_PATH_LEFT, left_filename) + right_path = os.path.join(SAVE_PATH_RIGHT, right_filename) + + # 确保目录存在 + os.makedirs(SAVE_PATH_LEFT, exist_ok=True) + os.makedirs(SAVE_PATH_RIGHT, exist_ok=True) + + cv2.imwrite(left_path, img_left) + cv2.imwrite(right_path, img_right) + logger.info(f"Saved images: {left_path}, {right_path}") + + # 将图片信息写入数据库 + conn = sqlite3.connect(DATABASE_PATH) + cursor = conn.cursor() + cursor.execute(''' + INSERT INTO images (left_filename, right_filename, timestamp, metadata) + VALUES (?, ?, ?, ?) + ''', (left_filename, right_filename, float(timestamp_str), json.dumps(metadata))) + conn.commit() + image_id = cursor.lastrowid # 获取新插入记录的 ID + conn.close() + logger.info(f"Recorded image pair (ID: {image_id}) in database.") + + # 将 OpenCV 图像编码为 base64 字符串,用于 WebSocket 传输 + _, left_encoded = cv2.imencode('.jpg', img_left) + _, right_encoded = cv2.imencode('.jpg', img_right) + left_b64 = base64.b64encode(left_encoded).decode('utf-8') + right_b64 = base64.b64encode(right_encoded).decode('utf-8') + + # 更新用于实时显示的全局变量 (如果需要) + with frame_lock: + global latest_left_frame, latest_right_frame, latest_timestamp + latest_left_frame = img_left + latest_right_frame = img_right + latest_timestamp = timestamp_str + + socketio.emit('update_image', { + 'left_image': left_b64, + 'right_image': right_b64, + 'timestamp': timestamp_str + }) + + return jsonify({"message": "Images received, saved, and pushed via WebSocket", "timestamp": timestamp_str, "id": image_id}) + + except Exception as e: + logger.error(f"Error processing upload: {e}") + return jsonify({"error": str(e)}), 500 + +# --- 可选:添加一个简单的状态检查路由 --- +@app.route('/status') +def status(): + with frame_lock: + has_frames = latest_left_frame is not None and latest_right_frame is not None + timestamp = latest_timestamp if has_frames else "N/A" + return jsonify({"has_frames": has_frames, "latest_timestamp": timestamp}) + +# --- SocketIO 事件处理程序 --- +@socketio.event +def connect(): + """客户端连接事件""" + logger.info("Client connected") + +@socketio.event +def disconnect(): + """客户端断开连接事件""" + logger.info("Client disconnected") + +@socketio.event +def capture_button(data): + """ + SocketIO 事件处理器,当客户端发送 'button_pressed' 事件时触发 + """ + try: + image_client.request_sync() + logger.info("Request sent to server.") + except Exception as e: + logger.error(f"Error sending request: {e}") + + + +if __name__ == '__main__': + logger.info(f"Starting Flask-SocketIO server on {FLASK_HOST}:{FLASK_PORT}") + socketio.run(app, host=FLASK_HOST, port=FLASK_PORT, debug=False, allow_unsafe_werkzeug=True) \ No newline at end of file diff --git a/cap_trigger.py b/cap_trigger.py new file mode 100644 index 0000000..6da23c9 --- /dev/null +++ b/cap_trigger.py @@ -0,0 +1,102 @@ +import pynng +import asyncio +import uuid +import time + +class ImageClient: + def __init__(self, server_address="tcp://10.42.70.1:54321", client_id=None): + self.server_address = server_address + self.client_id = client_id or f"client_{uuid.uuid4().hex[:8]}" + self.socket = None + + async def connect(self): + """连接到服务器""" + self.socket = pynng.Req0() + self.socket.dial(self.server_address, block=False) + print(f"Client {self.client_id} connected to server") + + async def send_request(self): + """发送请求到服务器""" + try: + # 发送客户端 ID 作为请求 + client_id_bytes = self.client_id.encode('utf-8') + self.socket.send(client_id_bytes) + + # 等待响应 + response = self.socket.recv() + print(f"Client {self.client_id}: Received response: {response}") + return response + except Exception as e: + print(f"Client {self.client_id} error: {e}") + return None + + async def request(self): + """发送请求并等待响应""" + try: + await self.connect() + response = await self.send_request() + return response + except Exception as e: + print(f"Client {self.client_id} error: {e}") + + def request_sync(self): + """发送同步请求到服务器""" + socket = None + try: + # 每次都创建一个新的socket实例以避免复用已关闭的socket + socket = pynng.Req0() + socket.dial(self.server_address, block=True) # 使用阻塞模式确保连接建立 + client_id_bytes = self.client_id.encode('utf-8') + socket.send(client_id_bytes) + response = socket.recv() + return response + except Exception as e: + print(f"Client {self.client_id} error: {e}") + return None + finally: + # 确保socket在使用后被正确关闭 + if socket: + try: + socket.close() + except: + pass + + + async def run(self, request_interval=0.033): # ~30fps + """运行客户端请求循环""" + await self.connect() + + request_count = 0 + while True: + try: + response = await self.send_request() + if response: + print(f"Client {self.client_id}: Request {request_count} completed") + + request_count += 1 + await asyncio.sleep(request_interval) + if request_count >= 1: + break + + except KeyboardInterrupt: + print(f"Client {self.client_id} interrupted by user") + break + except Exception as e: + print(f"Client {self.client_id} error in main loop: {e}") + await asyncio.sleep(0.02) + +async def main(): + # 创建多个客户端实例 + num_clients = 1 # 可以根据需要调整客户端数量 + tasks = [] + + for i in range(num_clients): + client = ImageClient(client_id=f"client_{i}_{uuid.uuid4().hex[:8]}") + task = asyncio.create_task(client.run()) + tasks.append(task) + + # 等待所有客户端任务完成 + await asyncio.gather(*tasks, return_exceptions=True) + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/static/received/.gitkeep b/static/received/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/templates/index.html b/templates/index.html new file mode 100644 index 0000000..81abd6c --- /dev/null +++ b/templates/index.html @@ -0,0 +1,128 @@ + + + + Real-time Image Display via WebSocket + + + + + +

Live Stereo Camera Feed (via WebSocket)

+ +

View Saved Images

+
Connecting to server...
+ + +
+ + +
+ +
+
+

Left Camera

+ No Left Image +
No timestamp
+
+
+

Right Camera

+ No Right Image +
No timestamp
+
+
+ + + + \ No newline at end of file diff --git a/templates/list_images.html b/templates/list_images.html new file mode 100644 index 0000000..1be3a19 --- /dev/null +++ b/templates/list_images.html @@ -0,0 +1,260 @@ + + + + Saved Images List + + + + +

Saved Images

+ +

Back to Live Feed

+
Loading images...
+
+ + + +
+ + + + + + + + + + + + + + + +
+ + + IDLeft ImageRight ImageTimestampCreated AtActions
+ + + + \ No newline at end of file