Files
m20_core_web/cam_web.py
2025-10-26 15:34:58 +08:00

623 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import cv2
import numpy as np
from flask import Flask, render_template, request, jsonify, send_file
from flask_socketio import SocketIO, emit
import io
import base64
import time
import threading
import logging
import os
import json
import sqlite3
from datetime import datetime
import zipfile
import tempfile
# 导入视觉处理相关的模块
from llm_req import VisionAPIClient, DetectionResult
from cap_trigger import ImageClient
# --- Configuration ---
# Directories that receive the original left/right camera images.
SAVE_PATH_LEFT = "./static/received/left"
SAVE_PATH_RIGHT = "./static/received/right"
SAVE_PATH_LEFT_MARKED = "./static/received/left_marked" # save path for annotated (marked) images
SAVE_PATH_RIGHT_MARKED = "./static/received/right_marked" # save path for annotated (marked) images
# Address and port passed to socketio.run() when this file is run as a script.
FLASK_HOST = "0.0.0.0"
FLASK_PORT = 5000
MAX_LIVE_FRAMES = 2 # number of newest frames to keep for live display (not referenced in the visible part of this file)
DATABASE_PATH = "received_images.db" # path of the SQLite database file
# --- Configuration (end) ---
# Logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# --- 数据库初始化 ---
def init_db():
    """Create the SQLite database file and the ``images`` table if missing.

    The table stores one row per left/right image pair: the original file
    names, the optional annotated ("marked") file names, the capture
    timestamp, the metadata JSON text and a free-form comment.

    The connection is always closed, even when table creation fails, so a
    failed start-up does not leave the database file locked.

    Raises:
        sqlite3.Error: if the database cannot be opened or the table cannot
            be created.
    """
    conn = sqlite3.connect(DATABASE_PATH)
    try:
        cursor = conn.cursor()
        # Image bookkeeping table, including the marked-image columns.
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS images (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                left_filename TEXT NOT NULL,
                right_filename TEXT NOT NULL,
                left_marked_filename TEXT,
                right_marked_filename TEXT,
                timestamp REAL NOT NULL,
                metadata TEXT,
                comment TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        conn.commit()
    finally:
        conn.close()
    logger.info(f"Database {DATABASE_PATH} initialized.")
# --- Global state ---
# Most recently received left/right frames and their timestamp, kept for live display.
latest_left_frame = None
latest_right_frame = None
latest_timestamp = None
frame_lock = threading.Lock() # guards the latest_* globals above
# --- Flask & SocketIO application ---
app = Flask(__name__)
# Secret key used by Flask. Prefer the SECRET_KEY environment variable so a
# real deployment never runs with the placeholder committed to the repository;
# the placeholder remains the fallback so existing setups keep working.
_PLACEHOLDER_SECRET_KEY = 'your-secret-key-change-this'
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY') or _PLACEHOLDER_SECRET_KEY
if app.config['SECRET_KEY'] == _PLACEHOLDER_SECRET_KEY:
    logger.warning("SECRET_KEY is the built-in placeholder; set the SECRET_KEY environment variable.")
# NOTE(review): cors_allowed_origins="*" accepts every origin; restrict it for production.
socketio = SocketIO(app, cors_allowed_origins="*")
# Client used by the capture_button handler to send capture requests.
image_client = ImageClient("tcp://127.0.0.1:54321", client_id="local")
# Make sure the database schema exists before any request is served.
init_db()
# --- 辅助函数 ---
def draw_detections_on_image(image: np.ndarray, detections: list) -> np.ndarray:
    """Return a copy of ``image`` with one labelled box drawn per detection.

    Args:
        image: Image array; only ``image.shape[:2]`` (height, width) is read
            and the input itself is never modified.
        detections: Iterable of dicts with keys ``id`` (class id), ``label``
            (text) and ``bbox`` (``[x_min, y_min, x_max, y_max]`` on a
            0-999 scale relative to the image size). ``None`` is treated as
            "no detections". Entries whose ``bbox`` is missing, ``None`` or
            not of length 4 are skipped.

    Returns:
        A new array with the boxes and ``"<label> (<id>)"`` captions drawn.
    """
    # Work on a copy so the caller's image stays untouched.
    marked_image = image.copy()
    # Box colour per class id, as OpenCV BGR tuples.
    color_map = {
        1: (0, 255, 0),    # green - ammunition box
        2: (255, 0, 0),    # blue - soldier
        3: (0, 0, 255),    # red - gun
        4: (255, 255, 0),  # cyan - number plate
    }
    h, w = image.shape[:2]
    for detection in detections or ():
        # `or []` also covers an explicit "bbox": None, which would otherwise
        # make len() raise TypeError.
        bbox = detection.get("bbox") or []
        if len(bbox) != 4:
            continue
        obj_id = detection.get("id", 0)
        label = detection.get("label", "")
        # Scale the 0-999 coordinates to pixel coordinates.
        x_min = int(bbox[0] * w / 999)
        y_min = int(bbox[1] * h / 999)
        x_max = int(bbox[2] * w / 999)
        y_max = int(bbox[3] * h / 999)
        color = color_map.get(obj_id, (255, 255, 255))  # white for unknown ids
        cv2.rectangle(marked_image, (x_min, y_min), (x_max, y_max), color, 2)
        label_text = f"{label} ({obj_id})"
        # Caption normally sits just above the box. When the box touches the
        # top of the image there is no room, so draw it just inside the box
        # instead of off-image.
        label_y = y_min - 10
        if label_y < 12:
            label_y = y_min + 15
        cv2.putText(marked_image, label_text, (x_min, label_y),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)
    return marked_image
# --- Flask 路由 ---
@app.route('/')
def index():
    """Home page: render the template that shows the live images."""
    template_name = 'index.html'
    return render_template(template_name)
@app.route('/list')  # route for the image-list page
def list_images():
    """Render the template of the image-list page."""
    template_name = 'list_images.html'
    return render_template(template_name)
@app.route('/api/images', methods=['GET'])
def get_images_api():
    """API: return every image record as a JSON list, newest timestamp first.

    The manual-annotation endpoint stores its results in the per-side columns
    ``manual_detections_left/right`` and ``is_manual_labeled_left/right``,
    which are added to the table lazily. This handler therefore inspects the
    table schema and selects whichever optional columns exist, substituting
    NULL/0 for the missing ones. The legacy keys ``manual_detections`` and
    ``is_manual_labeled`` are still present in every item for existing
    clients; the per-side keys are returned in addition.

    Returns:
        JSON array of objects, one per row of ``images``.
    """
    required_columns = [
        "id", "left_filename", "right_filename",
        "left_marked_filename", "right_marked_filename",
        "timestamp", "metadata", "comment", "created_at",
    ]
    # Optional column name -> SQL literal used when the column does not exist.
    optional_columns = {
        "manual_detections": "NULL",
        "is_manual_labeled": "0",
        "manual_detections_left": "NULL",
        "manual_detections_right": "NULL",
        "is_manual_labeled_left": "0",
        "is_manual_labeled_right": "0",
    }
    conn = sqlite3.connect(DATABASE_PATH)
    try:
        conn.row_factory = sqlite3.Row  # access result columns by name
        cursor = conn.cursor()
        cursor.execute("PRAGMA table_info(images)")
        existing = {info["name"] for info in cursor.fetchall()}
        select_items = list(required_columns)
        select_items.extend(
            name if name in existing else f"{fallback} AS {name}"
            for name, fallback in optional_columns.items()
        )
        # The SELECT list is assembled only from the constants above, never
        # from request data, so building it with an f-string is safe.
        cursor.execute(
            f"SELECT {', '.join(select_items)} FROM images ORDER BY timestamp DESC"
        )
        rows = cursor.fetchall()
    finally:
        conn.close()
    images = [
        {
            "id": row["id"],
            "left_filename": row["left_filename"],
            "right_filename": row["right_filename"],
            # Empty string when no marked image / comment exists.
            "left_marked_filename": row["left_marked_filename"] or "",
            "right_marked_filename": row["right_marked_filename"] or "",
            "timestamp": row["timestamp"],
            "metadata": row["metadata"],
            "comment": row["comment"] or "",
            "created_at": row["created_at"],
            # Legacy single-side annotation fields.
            "manual_detections": row["manual_detections"] or "[]",
            "is_manual_labeled": bool(row["is_manual_labeled"]),
            # Per-side annotation fields written by the manual-detections endpoint.
            "manual_detections_left": row["manual_detections_left"] or "[]",
            "manual_detections_right": row["manual_detections_right"] or "[]",
            "is_manual_labeled_left": bool(row["is_manual_labeled_left"]),
            "is_manual_labeled_right": bool(row["is_manual_labeled_right"]),
        }
        for row in rows
    ]
    return jsonify(images)
@app.route('/api/images', methods=['DELETE'])
def delete_image_api():
    """API: delete one image record together with its files on disk.

    Request body: JSON object ``{"id": <image id>}``.

    Returns:
        200 with a confirmation message, 400 when the id is missing (or the
        body is not a JSON object), 404 when no such record exists.

    File removal is best-effort: the database row is deleted first, and a
    file that cannot be removed is logged without failing the request. Each
    file is handled independently so one failure does not skip the others.
    """
    payload = request.get_json(silent=True)
    image_id = payload.get('id') if isinstance(payload, dict) else None
    if not image_id:
        return jsonify({"error": "Image ID is required"}), 400
    conn = sqlite3.connect(DATABASE_PATH)
    try:
        cursor = conn.cursor()
        # Look up the file names (including marked images) before deleting the row.
        cursor.execute(
            "SELECT left_filename, right_filename, left_marked_filename, right_marked_filename "
            "FROM images WHERE id = ?",
            (image_id,),
        )
        row = cursor.fetchone()
        if not row:
            return jsonify({"error": "Image not found"}), 404
        cursor.execute("DELETE FROM images WHERE id = ?", (image_id,))
        conn.commit()
    finally:
        conn.close()
    left_filename, right_filename, left_marked_filename, right_marked_filename = row
    targets = (
        (SAVE_PATH_LEFT, left_filename),
        (SAVE_PATH_RIGHT, right_filename),
        (SAVE_PATH_LEFT_MARKED, left_marked_filename),
        (SAVE_PATH_RIGHT_MARKED, right_marked_filename),
    )
    for directory, filename in targets:
        if not filename:
            # Marked-image columns are NULL when no marked image was produced.
            continue
        path = os.path.join(directory, filename)
        try:
            os.remove(path)
            logger.info(f"Deleted file: {path}")
        except FileNotFoundError:
            # Already gone: nothing to clean up.
            pass
        except OSError as e:
            # The record is already deleted, so report success regardless.
            logger.error(f"Error deleting files: {e}")
    return jsonify({"message": f"Image {image_id} deleted successfully"})
def _resolve_export_file(marked_dir, marked_filename, plain_dir, plain_filename):
    """Return ``(path, filename)`` of the file to export for one side, or None.

    The marked image is preferred; the original is used when there is no
    marked image or the marked file is missing on disk.
    """
    candidates = []
    if marked_filename:
        candidates.append((marked_dir, marked_filename))
    candidates.append((plain_dir, plain_filename))
    for directory, filename in candidates:
        path = os.path.join(directory, filename)
        if os.path.exists(path):
            return path, filename
    return None


@app.route('/api/images/export', methods=['POST'])
def export_images_api():
    """API: download the selected images as a ZIP, preferring marked images.

    Request body: JSON object ``{"ids": [<image id>, ...]}``.

    Returns:
        The ZIP as an attachment named ``exported_images.zip`` with entries
        under ``left/`` and ``right/``; 400 when no ids are given, 404 when
        none match, 500 when building the archive fails.

    The archive is written to an anonymous temporary file that disappears
    when it is closed, so nothing is left in the temp directory after the
    response has been sent.
    """
    payload = request.get_json(silent=True)
    selected_ids = payload.get('ids', []) if isinstance(payload, dict) else []
    if not isinstance(selected_ids, list) or not selected_ids:
        return jsonify({"error": "No image IDs selected"}), 400
    conn = sqlite3.connect(DATABASE_PATH)
    try:
        cursor = conn.cursor()
        # Only "?" placeholders are interpolated; the ids themselves are bound.
        placeholders = ','.join('?' * len(selected_ids))
        cursor.execute(
            "SELECT left_filename, right_filename, left_marked_filename, right_marked_filename "
            f"FROM images WHERE id IN ({placeholders})",
            selected_ids,
        )
        rows = cursor.fetchall()
    finally:
        conn.close()
    if not rows:
        return jsonify({"error": "No matching images found"}), 404
    archive = tempfile.TemporaryFile(suffix='.zip')
    try:
        with zipfile.ZipFile(archive, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for left_fn, right_fn, left_marked_fn, right_marked_fn in rows:
                sides = (
                    ('left', SAVE_PATH_LEFT_MARKED, left_marked_fn, SAVE_PATH_LEFT, left_fn),
                    ('right', SAVE_PATH_RIGHT_MARKED, right_marked_fn, SAVE_PATH_RIGHT, right_fn),
                )
                for folder, marked_dir, marked_fn, plain_dir, plain_fn in sides:
                    resolved = _resolve_export_file(marked_dir, marked_fn, plain_dir, plain_fn)
                    if resolved is not None:
                        path, filename = resolved
                        zipf.write(path, os.path.join(folder, filename))
        archive.seek(0)  # rewind so the response streams from the beginning
        logger.info(f"Exported {len(rows)} image pairs")
        return send_file(archive, mimetype='application/zip',
                         as_attachment=True, download_name='exported_images.zip')
    except Exception as e:
        archive.close()  # closing the temporary file also removes it
        logger.error(f"Error creating export ZIP: {e}")
        return jsonify({"error": str(e)}), 500
@app.route('/upload', methods=['POST'])
def upload_images():
    """Receive a left/right image pair, store it and push it to live viewers.

    Expects ``multipart/form-data`` with file parts ``left_image`` and
    ``right_image`` plus optional form fields ``metadata`` (JSON text; its
    ``timestamp`` entry names the files) and ``comment``.

    Steps: decode both images, validate the timestamp, write the originals
    and the marked copies, insert a database row, update the latest-frame
    globals and emit an ``update_image`` SocketIO event carrying both images
    as base64 JPEG.

    Returns:
        200 with ``message``, ``timestamp`` and ``id``; 400 for missing or
        undecodable images or a non-numeric timestamp; 500 for any other
        failure.
    """
    global latest_left_frame, latest_right_frame, latest_timestamp
    try:
        left_file = request.files.get('left_image')
        right_file = request.files.get('right_image')
        metadata_str = request.form.get('metadata')
        comment = request.form.get('comment', '')
        if not left_file or not right_file:
            logger.warning("Received request without required image files.")
            return jsonify({"error": "Missing left_image or right_image"}), 400

        # Decode the uploaded bytes into OpenCV images.
        img_left = cv2.imdecode(np.frombuffer(left_file.read(), np.uint8), cv2.IMREAD_COLOR)
        img_right = cv2.imdecode(np.frombuffer(right_file.read(), np.uint8), cv2.IMREAD_COLOR)
        if img_left is None or img_right is None:
            logger.error("Failed to decode received images.")
            return jsonify({"error": "Could not decode images"}), 400

        # Parse the optional metadata; unparsable JSON is ignored.
        metadata = {}
        if metadata_str:
            try:
                metadata = json.loads(metadata_str)
            except json.JSONDecodeError as e:
                logger.warning(f"Could not parse metadata: {e}")
        # Valid JSON that is not an object (e.g. a list) has no "timestamp"
        # entry to read; it is still stored as-is in the database.
        timestamp_source = metadata if isinstance(metadata, dict) else {}
        timestamp_str = str(timestamp_source.get("timestamp", str(int(time.time()))))
        # Validate before touching the disk: the timestamp is client-supplied
        # and ends up in file names and in the REAL timestamp column. Checking
        # late would leave orphaned files behind a failing request.
        try:
            timestamp_value = float(timestamp_str)
        except ValueError:
            logger.warning(f"Rejected upload with non-numeric timestamp: {timestamp_str!r}")
            return jsonify({"error": "Invalid timestamp in metadata"}), 400
        timestamp_str_safe = timestamp_str.replace(".", "_")  # avoid dots inside the file name

        left_filename = f"left_{timestamp_str_safe}.jpg"
        right_filename = f"right_{timestamp_str_safe}.jpg"
        left_marked_filename = f"left_marked_{timestamp_str_safe}.jpg"
        right_marked_filename = f"right_marked_{timestamp_str_safe}.jpg"
        left_path = os.path.join(SAVE_PATH_LEFT, left_filename)
        right_path = os.path.join(SAVE_PATH_RIGHT, right_filename)

        # Make sure every target directory exists.
        for directory in (SAVE_PATH_LEFT, SAVE_PATH_RIGHT,
                          SAVE_PATH_LEFT_MARKED, SAVE_PATH_RIGHT_MARKED):
            os.makedirs(directory, exist_ok=True)

        # cv2.imwrite reports failure through its return value, not an exception.
        if not cv2.imwrite(left_path, img_left):
            raise OSError(f"Could not write image file: {left_path}")
        if not cv2.imwrite(right_path, img_right):
            raise OSError(f"Could not write image file: {right_path}")
        logger.info(f"Saved original images: {left_path}, {right_path}")

        # Marked images.
        # TODO: run both images through VisionAPIClient and draw the returned
        # detections with draw_detections_on_image(). Until that is enabled,
        # the debug behaviour is to store the unmodified originals as the
        # "marked" images.
        left_marked_path = None
        right_marked_path = None
        try:
            candidate = os.path.join(SAVE_PATH_LEFT_MARKED, left_marked_filename)
            if cv2.imwrite(candidate, img_left):
                left_marked_path = candidate
                logger.info(f"Saved marked left image: {left_marked_path}")
            candidate = os.path.join(SAVE_PATH_RIGHT_MARKED, right_marked_filename)
            if cv2.imwrite(candidate, img_right):
                right_marked_path = candidate
                logger.info(f"Saved marked right image: {right_marked_path}")
        except Exception as e:
            # Marked images are optional: the originals are recorded regardless.
            logger.error(f"Error processing images with VisionAPIClient: {e}")

        # Record the pair; marked file names are NULL when no marked file was written.
        conn = sqlite3.connect(DATABASE_PATH)
        try:
            cursor = conn.cursor()
            cursor.execute('''
                INSERT INTO images (left_filename, right_filename, left_marked_filename, right_marked_filename, timestamp, metadata, comment)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            ''', (left_filename, right_filename,
                  left_marked_filename if left_marked_path else None,
                  right_marked_filename if right_marked_path else None,
                  timestamp_value, json.dumps(metadata), comment))
            conn.commit()
            image_id = cursor.lastrowid  # id of the newly inserted row
        finally:
            conn.close()
        logger.info(f"Recorded image pair (ID: {image_id}) in database.")

        # Encode as base64 JPEG for transport over the WebSocket.
        _, left_encoded = cv2.imencode('.jpg', img_left)
        _, right_encoded = cv2.imencode('.jpg', img_right)
        left_b64 = base64.b64encode(left_encoded).decode('utf-8')
        right_b64 = base64.b64encode(right_encoded).decode('utf-8')

        # Publish the newest frames for /status.
        with frame_lock:
            latest_left_frame = img_left
            latest_right_frame = img_right
            latest_timestamp = timestamp_str
        socketio.emit('update_image', {
            'left_image': left_b64,
            'right_image': right_b64,
            'timestamp': timestamp_str
        })
        return jsonify({"message": "Images received, saved, and pushed via WebSocket", "timestamp": timestamp_str, "id": image_id})
    except Exception as e:
        logger.error(f"Error processing upload: {e}")
        return jsonify({"error": str(e)}), 500
@app.route('/api/images/comment', methods=['PUT'])
def update_image_comment():
    """API: replace the comment of one image record.

    Request body: JSON object ``{"id": <image id>, "comment": <text>}``;
    ``comment`` defaults to the empty string.

    Returns:
        200 with a confirmation message, 400 when the id is missing (or the
        body is not a JSON object), 404 when no record has that id.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    image_id = data.get('id')
    comment = data.get('comment', '')
    if not image_id:
        return jsonify({"error": "Image ID is required"}), 400
    conn = sqlite3.connect(DATABASE_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute("UPDATE images SET comment = ? WHERE id = ?", (comment, image_id))
        conn.commit()
        updated_rows = cursor.rowcount
    finally:
        conn.close()
    if updated_rows == 0:
        # No row matched: do not claim success for an id that does not exist.
        return jsonify({"error": "Image not found"}), 404
    return jsonify({"message": f"Comment for image {image_id} updated successfully"})
@app.route('/status')
def status():
    """Report whether a left/right frame pair is available and its timestamp.

    The response contains ``has_frames`` and ``latest_timestamp``; the latter
    is ``"N/A"`` until both frames have been stored.
    """
    # Take a consistent snapshot of the shared state under the lock.
    with frame_lock:
        left_frame = latest_left_frame
        right_frame = latest_right_frame
        stamp = latest_timestamp
    has_frames = not (left_frame is None or right_frame is None)
    if has_frames:
        timestamp = stamp
    else:
        timestamp = "N/A"
    return jsonify({"has_frames": has_frames, "latest_timestamp": timestamp})
_MANUAL_DETECTION_IDS = (1, 2, 3, 4)
_MANUAL_DETECTION_LABELS = ('caisson', 'soldier', 'gun', 'number')
# Column name -> column definition for the lazily added annotation columns.
_MANUAL_DETECTION_COLUMNS = (
    ("manual_detections_left", "TEXT"),
    ("manual_detections_right", "TEXT"),
    ("is_manual_labeled_left", "INTEGER DEFAULT 0"),
    ("is_manual_labeled_right", "INTEGER DEFAULT 0"),
)


def _is_plain_int(value):
    """Return True for ints that are not bools (bool is a subclass of int)."""
    return isinstance(value, int) and not isinstance(value, bool)


def _validate_detections(detections):
    """Return an error message for the first invalid detection, else None."""
    if not isinstance(detections, list):
        return "Detections must be a list"
    for detection in detections:
        if not isinstance(detection, dict):
            return "Each detection must be a dictionary"
        for key in ('id', 'label', 'bbox'):
            if key not in detection:
                return f"Missing required key '{key}' in detection"
        if not _is_plain_int(detection['id']) or detection['id'] not in _MANUAL_DETECTION_IDS:
            return f"Invalid ID in detection: {detection['id']}"
        if detection['label'] not in _MANUAL_DETECTION_LABELS:
            return f"Invalid label in detection: {detection['label']}"
        bbox = detection['bbox']
        if not isinstance(bbox, list) or len(bbox) != 4:
            return "Invalid bbox format in detection"
        for coord in bbox:
            if not _is_plain_int(coord) or not (0 <= coord <= 999):
                return f"Invalid bbox coordinate: {coord}"
    return None


def _ensure_manual_detection_columns(cursor):
    """Add the per-side annotation columns to ``images`` if they are missing."""
    cursor.execute("PRAGMA table_info(images)")
    existing = {info[1] for info in cursor.fetchall()}  # index 1 is the column name
    for name, definition in _MANUAL_DETECTION_COLUMNS:
        if name in existing:
            continue
        try:
            # Names and definitions come from the constant above, not from the request.
            cursor.execute(f"ALTER TABLE images ADD COLUMN {name} {definition}")
        except sqlite3.OperationalError as e:
            # Typically another request added the column in the meantime.
            logger.warning(f"Could not add column {name}: {e}")


@app.route('/api/images/manual-detections', methods=['PUT'])
def update_manual_detections():
    """API: store manual detection boxes for the left or right image.

    Request body: JSON object with ``id`` (image id), ``side`` (``"left"``
    or ``"right"``, default ``"left"``) and ``detections`` (list of objects
    with ``id`` in 1-4, ``label`` in caisson/soldier/gun/number and ``bbox``
    as four integers in 0-999).

    The detections are saved as JSON text in ``manual_detections_<side>``,
    ``is_manual_labeled_<side>`` is set to 1, and the marked image for that
    side is regenerated.

    Returns:
        200 on success; 400 for missing/invalid input (including an unknown
        ``side``); 404 for an unknown image id; 500 when the data was saved
        but regenerating the marked image failed.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    image_id = data.get('id')
    side = data.get('side', 'left')
    detections = data.get('detections')
    if not image_id or detections is None:
        return jsonify({"error": "Image ID and detections are required"}), 400
    # Reject unknown sides instead of silently treating them as "right".
    if side not in ('left', 'right'):
        return jsonify({"error": f"Invalid side: {side!r}"}), 400
    validation_error = _validate_detections(detections)
    if validation_error:
        return jsonify({"error": validation_error}), 400

    conn = sqlite3.connect(DATABASE_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT id FROM images WHERE id = ?", (image_id,))
        if not cursor.fetchone():
            return jsonify({"error": "Image not found"}), 404
        _ensure_manual_detection_columns(cursor)
        # `side` was checked against the two allowed values above, so it is
        # safe to use it in the column names.
        cursor.execute(f"""
            UPDATE images
            SET manual_detections_{side} = ?, is_manual_labeled_{side} = 1
            WHERE id = ?
        """, (json.dumps(detections), image_id))
        conn.commit()
    finally:
        conn.close()

    # Redraw the marked image for the annotated side.
    try:
        regenerate_marked_images(image_id, detections, side)
        return jsonify({"message": f"Manual detections for image {image_id} ({side}) updated successfully and marked images regenerated"})
    except Exception as e:
        return jsonify({"message": f"Manual detections for image {image_id} ({side}) updated successfully but failed to regenerate marked images: {str(e)}"}), 500
# 添加重新生成标注图片的函数
def regenerate_marked_images(image_id, detections, side):
    """Redraw the marked image of one side from its original image.

    Args:
        image_id: Primary key of the row in ``images``.
        detections: Detections passed on to ``draw_detections_on_image``.
        side: ``"left"`` or ``"right"``; selects which image is redrawn.

    Raises:
        ValueError: if ``side`` is neither ``"left"`` nor ``"right"``.
        LookupError: if no record with ``image_id`` exists.
        FileNotFoundError: if the original image is missing or unreadable.
        OSError: if the marked image cannot be written.

    When the record has no marked file name for that side there is nothing
    to redraw; this is logged and the function returns normally.
    """
    # side -> (original dir, marked dir, index of original name, index of marked name)
    layout = {
        'left': (SAVE_PATH_LEFT, SAVE_PATH_LEFT_MARKED, 0, 2),
        'right': (SAVE_PATH_RIGHT, SAVE_PATH_RIGHT_MARKED, 1, 3),
    }
    if side not in layout:
        raise ValueError(f"Unknown side: {side!r}")
    conn = sqlite3.connect(DATABASE_PATH)
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT left_filename, right_filename, left_marked_filename, right_marked_filename
            FROM images
            WHERE id = ?
        """, (image_id,))
        row = cursor.fetchone()
    finally:
        conn.close()
    if not row:
        raise LookupError("Image not found")

    source_dir, marked_dir, source_index, marked_index = layout[side]
    source_filename = row[source_index]
    marked_filename = row[marked_index]
    if not marked_filename:
        logger.warning(f"Image {image_id} has no {side} marked file name; nothing to regenerate.")
        return

    source_path = os.path.join(source_dir, source_filename)
    marked_path = os.path.join(marked_dir, marked_filename)
    # Fail loudly instead of silently skipping, so the caller does not report
    # a regeneration that never happened.
    image = cv2.imread(source_path) if os.path.exists(source_path) else None
    if image is None:
        raise FileNotFoundError(f"Original image missing or unreadable: {source_path}")
    marked_image = draw_detections_on_image(image, detections)
    # cv2.imwrite signals failure through its return value.
    if not cv2.imwrite(marked_path, marked_image):
        raise OSError(f"Could not write marked image: {marked_path}")
# 添加人工标注页面路由
@app.route('/manual-annotation')
def manual_annotation():
    """Render the template of the manual-annotation page."""
    template_name = 'manual_annotation.html'
    return render_template(template_name)
# --- SocketIO 事件处理程序 ---
@socketio.event
def connect():
    """SocketIO ``connect`` handler: log that a client connected."""
    message = "Client connected"
    logger.info(message)
@socketio.event
def disconnect():
    """SocketIO ``disconnect`` handler: log that a client disconnected."""
    message = "Client disconnected"
    logger.info(message)
@socketio.event
def capture_button(data=None):
    """SocketIO handler for the ``capture_button`` event.

    ``@socketio.event`` registers the handler under the function's own name,
    so clients must emit ``capture_button`` (not ``button_pressed``).

    Args:
        data: Optional event payload; it is not used. The default lets
            clients emit the event without any payload.

    Calls ``image_client.request_sync()`` (presumably this triggers a
    capture; confirm against ``cap_trigger.ImageClient``). Failures are
    logged and never propagated to the client.
    """
    try:
        image_client.request_sync()
        logger.info("Request sent to server.")
    except Exception as e:
        logger.error(f"Error sending request: {e}")
if __name__ == '__main__':
    # Script entry point: start the Flask-SocketIO server.
    # NOTE(review): allow_unsafe_werkzeug=True permits running on the Werkzeug
    # development server; confirm this is acceptable for the deployment.
    run_options = {
        "host": FLASK_HOST,
        "port": FLASK_PORT,
        "debug": False,
        "allow_unsafe_werkzeug": True,
    }
    logger.info(f"Starting Flask-SocketIO server on {FLASK_HOST}:{FLASK_PORT}")
    socketio.run(app, **run_options)