import toml from loguru import logger import zmq import cv2 import numpy as np import requests import base64 def get_access_token(): client_id = "MDCGplPqK0kteOgbXwt5cyn0" client_secret = "yIHJQUUiMkkw53nlQqHpiLvRFsLGcqgn" url = "https://aip.baidubce.com/oauth/2.0/token" params = { 'grant_type': 'client_credentials', 'client_id': client_id, 'client_secret': client_secret } headers = { 'Content-Type': 'application/json', 'Accept': 'application/json' } response = requests.post(url, params=params, headers=headers) response_json = response.json() if 'access_token' in response_json: return response_json['access_token'] else: print("Failed to get access_token:", response_json.get('error_description')) return None def ocr_api_request(image_base64): # url = "https://aip.baidubce.com/rest/2.0/ocr/v1/accurate_basic" # 高精度 # url = "https://aip.baidubce.com/rest/2.0/ocr/v1/accurate" # 高精度带位置 url = "https://aip.baidubce.com/rest/2.0/ocr/v1/general" # 标准精度带位置 headers = { 'Content-Type': 'application/json' } params = { 'access_token': get_access_token(), 'image': image_base64, 'probability': 'true' } try: response = requests.post(url, headers=headers, data=params, timeout=5) try: return response.json() except requests.exceptions.JSONDecodeError: return None except requests.exceptions.Timeout: return None except requests.exceptions.RequestException as e: return None if __name__ == "__main__": cfg = toml.load('../cfg_infer_server.toml') # 配置日志输出 logger.add(cfg['debug']['logger_filename'], format=cfg['debug']['logger_format'], retention = 5, level="INFO") context1 = zmq.Context() camera_socket = context1.socket(zmq.REQ) camera_socket.connect(f"tcp://localhost:{cfg['camera']['camera2_port']}") logger.info("connect camera success") # 初始化 server context = zmq.Context() # 启动 server socket = context.socket(zmq.REP) socket.bind(f"tcp://*:{cfg['server']['ocr_infer_port']}") while True: message1 = socket.recv_string() logger.info("recv client request") for _ in range(5): camera_socket.send_string("") message = camera_socket.recv() np_array = np.frombuffer(message, dtype=np.uint8) image = cv2.imdecode(np_array, cv2.IMREAD_COLOR) output_file_path = 'output_image.jpg' success = cv2.imwrite(output_file_path, image) encoded_image = base64.b64encode(message).decode('utf-8') result = ocr_api_request(encoded_image) print(result) if result != None: socket.send_pyobj({'code': 0, 'content': result.get('words_result')}) else: socket.send_pyobj({'code': -1, 'content': None}) if cv2.waitKey(1) == 27: break logger.info("ocr infer server exit")