Files
m20_monitor_web/gimbal.py

211 lines
6.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import socket
import struct
import urllib.request
import urllib.error
import time
class gimbal_ctrl:
"""
云台控制器类,用于通过 TCP 和 HTTP 协议控制云台。
支持长连接模式,避免频繁新建连接导致设备 RST。
"""
def __init__(self, host, port, timeout=5):
"""
初始化控制器。
Args:
host (str): 云台设备的 IP 地址。
port (int): 云台设备的 TCP 控制端口。
timeout (int, optional): TCP 连接和接收响应的超时时间(秒)。默认为 5。
"""
self.host = host
self.port = port
self.base_url = f"http://{host}" # 基础 URL 用于 HTTP 请求
self.timeout = timeout
self.client_socket = None # 长连接 socket
def connect(self):
"""建立 TCP 长连接"""
if self.client_socket is not None:
return # 已连接,直接返回
try:
self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.client_socket.settimeout(self.timeout)
self.client_socket.connect((self.host, self.port))
print(f"Connected to {self.host}:{self.port}")
except Exception as e:
print(f"Failed to connect to {self.host}:{self.port}: {e}")
self.client_socket = None
def disconnect(self):
"""断开 TCP 连接"""
if self.client_socket:
try:
self.client_socket.close()
except:
pass
self.client_socket = None
print("TCP connection closed.")
def calculate_checksum(self, data_bytes):
"""
计算校验和。
使用简单累加和32位有符号与原始代码一致。
如设备使用 CRC32请替换为 zlib.crc32。
"""
checksum = 0
for i in range(0, len(data_bytes), 4):
chunk = data_bytes[i:i+4]
if len(chunk) == 4:
value = struct.unpack('<i', chunk)[0]
checksum += value
checksum_unsigned = checksum & 0xFFFFFFFF
checksum_signed = struct.unpack('<i', struct.pack('<I', checksum_unsigned))[0]
return checksum_signed
def _send_tcp_command(self, command_id, param_value):
"""
发送 TCP 二进制命令(使用长连接,不等待响应)。
自动重连(如果连接断开)。
"""
if self.client_socket is None:
self.connect()
if self.client_socket is None:
return # 连接失败,直接返回
# 打包数据
header_data = struct.pack('<ii', command_id, param_value)
checksum = self.calculate_checksum(header_data)
checksum_data = struct.pack('<i', checksum)
binary_data = header_data + checksum_data
try:
sent_bytes = self.client_socket.send(binary_data)
# print(f"Sent {sent_bytes} bytes successfully.")
except (ConnectionResetError, BrokenPipeError, OSError) as e:
print(f"Connection lost: {e}. Reconnecting...")
self.disconnect()
# 可选:尝试立即重发(谨慎使用,避免死循环)
# self._send_tcp_command(command_id, param_value)
# --- TCP 控制方法 ---
def set_pitch_speed(self, speed):
if -99 <= speed <= 99:
self._send_tcp_command(0x0000000C, speed)
else:
print(f"Pitch speed value {speed} is out of range [-99, 99].")
def set_yaw_speed(self, speed):
if -99 <= speed <= 99:
self._send_tcp_command(0x0000000D, speed)
else:
print(f"Yaw speed value {speed} is out of range [-99, 99].")
def set_roll_speed(self, speed):
if -99 <= speed <= 99:
self._send_tcp_command(0x0000000E, speed)
else:
print(f"Roll speed value {speed} is out of range [-99, 99].")
def center(self):
self._send_tcp_command(0x00000010, 0)
def take_photo(self):
self._send_tcp_command(0x045c8701, 0)
def zoom_in(self):
self._send_tcp_command(0x0000001F, 0x01)
def zoom_out(self):
self._send_tcp_command(0x0000001F, 0x02)
def osd_on(self):
self._send_tcp_command(0x00000005, 0x01)
def osd_off(self):
self._send_tcp_command(0x00000005, 0x02)
def laser_on(self):
self._send_tcp_command(0x0000001A, 0x01)
def laser_off(self):
self._send_tcp_command(0x0000001A, 0x02)
def track_on(self):
self._send_tcp_command(0x0000001C, 0x01)
def track_off(self):
self._send_tcp_command(0x0000001C, 0x02)
# --- HTTP 控制方法(保持不变)---
def _send_http_get(self, path):
full_url = self.base_url + path
try:
req = urllib.request.Request(full_url)
req.add_header('User-Agent', 'PanTiltController')
response = urllib.request.urlopen(req)
# print(f"HTTP GET request to {full_url} successful.")
except urllib.error.HTTPError as e:
print(f"HTTP Error for {full_url}: {e.code} - {e.reason}")
except urllib.error.URLError as e:
print(f"URL Error for {full_url}: {e.reason}")
except Exception as e:
print(f"Unexpected error for {full_url}: {e}")
def set_pip_mode(self, mode):
if 0 <= mode <= 3:
path = f"/cgi-bin/Config.cgi?action=set&property=Camera.IrPip&value={mode}"
self._send_http_get(path)
else:
print(f"PIP mode value {mode} is out of range [0, 3].")
def set_ir_mode(self, mode):
if 0 <= mode <= 9:
path = f"/cgi-bin/Config.cgi?action=set&property=Camera.IrColorMode&value={mode}"
self._send_http_get(path)
else:
print(f"IR mode value {mode} is out of range [0, 9].")
# --- 使用示例 ---
if __name__ == "__main__":
PTZ_HOST = '192.168.17.114'
PTZ_PORT = 49125
ptz = gimbal_ctrl(PTZ_HOST, PTZ_PORT)
try:
# 可选:显式连接(非必须,发送时会自动连接)
# ptz.connect()
print("Sending commands...")
ptz.center()
time.sleep(1)
# ptz.set_pitch_speed(99)
# time.sleep(1)
# ptz.set_pitch_speed(0)
# time.sleep(1)
# ptz.set_yaw_speed(-99)
# time.sleep(1)
# ptz.set_yaw_speed(0)
# time.sleep(1)
# ptz.set_roll_speed(-99)
# time.sleep(1)
# ptz.set_roll_speed(0)
# time.sleep(1)
print("All commands sent.")
finally:
# 程序结束时断开连接
ptz.disconnect()
print("Connection closed.")
time.sleep(1)
exit(1)