"""OCR inference server.

Waits for a request on a ZeroMQ REP socket, grabs one frame from the local
camera, sends it to the Baidu OCR HTTP API and replies with a dict of the
form ``{'code': 0 | -1, 'content': ...}`` via ``send_pyobj``.
"""
import base64
import datetime
import os
import signal
import sys

import cv2
import numpy as np
import requests
import toml
import zmq
from loguru import logger

CFG_PATH = '/home/evan/Workplace/project_infer/cfg_infer_server.toml'

TOKEN_URL = "https://aip.baidubce.com/oauth/2.0/token"
# Alternative OCR endpoints:
#   https://aip.baidubce.com/rest/2.0/ocr/v1/accurate_basic  (high accuracy)
#   https://aip.baidubce.com/rest/2.0/ocr/v1/general         (standard accuracy, with positions)
OCR_URL = "https://aip.baidubce.com/rest/2.0/ocr/v1/accurate"  # high accuracy, with positions

HTTP_TIMEOUT_S = 5   # applied to every HTTP call so the REP loop can never hang on the network
CAMERA_INDEX = 20
FRAME_READS = 5      # frames read per request; only the last one is used


def get_access_token():
    """Request an OAuth access token from the Baidu token endpoint.

    Returns:
        The access token string, or ``None`` when the HTTP request fails,
        the response is not valid JSON, or it carries no ``access_token``.
    """
    # NOTE(review): these credentials are committed in source. They are kept
    # as fallbacks so existing deployments keep working, but they should be
    # rotated and supplied through the environment variables instead.
    client_id = os.environ.get("BAIDU_OCR_CLIENT_ID", "MDCGplPqK0kteOgbXwt5cyn0")
    client_secret = os.environ.get(
        "BAIDU_OCR_CLIENT_SECRET", "yIHJQUUiMkkw53nlQqHpiLvRFsLGcqgn"
    )
    params = {
        'grant_type': 'client_credentials',
        'client_id': client_id,
        'client_secret': client_secret,
    }
    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json',
    }
    try:
        response = requests.post(
            TOKEN_URL, params=params, headers=headers, timeout=HTTP_TIMEOUT_S
        )
        response_json = response.json()
    except (requests.exceptions.RequestException, ValueError) as exc:
        # ValueError covers JSON decode failures on every requests version.
        logger.error("Failed to get access_token: {}", exc)
        return None

    if 'access_token' in response_json:
        return response_json['access_token']
    logger.error(
        "Failed to get access_token: {}", response_json.get('error_description')
    )
    return None


def ocr_api_request(image_base64):
    """Send a base64-encoded image to the Baidu OCR endpoint.

    Args:
        image_base64: Base64 text of the encoded image.

    Returns:
        The decoded JSON response, or ``None`` when no access token could be
        obtained, the HTTP request fails/times out, or the response body is
        not valid JSON.
    """
    token = get_access_token()
    if token is None:
        # Without a token the API can only answer with an error; skip the call.
        return None

    try:
        response = requests.post(
            OCR_URL,
            # The API expects the token in the URL query and a form-encoded body.
            params={'access_token': token},
            headers={'Content-Type': 'application/x-www-form-urlencoded'},
            data={'image': image_base64, 'probability': 'true'},
            timeout=HTTP_TIMEOUT_S,
        )
        return response.json()
    except (requests.exceptions.RequestException, ValueError) as exc:
        logger.warning("ocr request failed: {}", exc)
        return None


def _capture_frame():
    """Open the camera, read FRAME_READS frames and return the last one.

    Returns ``None`` when the final read fails. The camera is always released
    before returning, even if a read raises.
    """
    cap = cv2.VideoCapture(CAMERA_INDEX)
    try:
        cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter.fourcc('M', 'J', 'P', 'G'))
        cap.set(cv2.CAP_PROP_FPS, 20)
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 960)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 540)

        ret, frame = False, None
        # Earlier frames are discarded (presumably to let the camera settle).
        for _ in range(FRAME_READS):
            ret, frame = cap.read()
            cv2.waitKey(1)
        return frame if ret else None
    finally:
        cap.release()


def _build_reply():
    """Capture one frame, run OCR on it and build the reply dict for the client."""
    logger.info("构造摄像头")
    frame = _capture_frame()
    if frame is None:
        logger.critical("ocr 摄像头读取出错")
        return {'code': -1, 'content': "ocr 摄像头读取出错"}

    # Keep the left 480 columns, then rotate 90 degrees counter-clockwise.
    frame = frame[:, 0:480]
    frame = cv2.rotate(frame, cv2.ROTATE_90_COUNTERCLOCKWISE)

    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    output_file_path = f"./saved_picture/{timestamp}.jpg"
    if not cv2.imwrite(output_file_path, frame):
        # Saving is best-effort; a missing directory must not fail the request.
        logger.warning("failed to save picture to {}", output_file_path)

    _, jpeg_buffer = cv2.imencode('.jpg', frame)
    encoded_image = base64.b64encode(jpeg_buffer).decode('utf-8')

    result = ocr_api_request(encoded_image)
    if result is None or 'words_result' not in result:
        if result is not None:
            # The API answered, but with an error payload instead of results.
            logger.error("ocr api returned no words_result: {}", result)
        logger.error("ocr 没找到文字")
        return {'code': -1, 'content': " ocr 没找到文字"}

    words = result['words_result']
    logger.info(f"ocr 返回 {words}")
    return {'code': 0, 'content': words}


def main():
    """Load the configuration, bind the REP socket and serve OCR requests."""
    logger.info("ocr server 开始加载")
    cfg = toml.load(CFG_PATH)

    # Configure log output.
    logger.add(
        cfg['debug']['logger_filename'],
        format=cfg['debug']['logger_format'],
        retention=5,
        level="INFO",
    )

    # Initialise and start the server.
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind(f"tcp://*:{cfg['server']['ocr_infer_port']}")

    def signal_handler(signum, frame):
        logger.info(f"接收到退出信号 {signum}, 退出中")
        socket.close()
        context.term()
        sys.exit(0)

    # Register the signal handlers.
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    logger.info("ocr server 加载完成")
    while True:
        socket.recv_string()
        logger.info("收到客户端请求")
        try:
            reply = _build_reply()
        except Exception:
            # A REP socket must answer every request; never leave the client
            # waiting because of an unexpected failure while handling it.
            logger.exception("ocr 请求处理异常")
            reply = {'code': -1, 'content': "ocr 请求处理异常"}
        socket.send_pyobj(reply)

        if cv2.waitKey(1) == 27:
            break
    logger.info("ocr infer server exit")


if __name__ == "__main__":
    main()