initial commit

This commit is contained in:
2025-11-01 01:11:35 +08:00
commit b3b34b5d91
6 changed files with 794 additions and 0 deletions

210
gimbal.py Normal file
View File

@@ -0,0 +1,210 @@
import socket
import struct
import urllib.request
import urllib.error
import time
class gimbal_ctrl:
"""
云台控制器类,用于通过 TCP 和 HTTP 协议控制云台。
支持长连接模式,避免频繁新建连接导致设备 RST。
"""
def __init__(self, host, port, timeout=5):
"""
初始化控制器。
Args:
host (str): 云台设备的 IP 地址。
port (int): 云台设备的 TCP 控制端口。
timeout (int, optional): TCP 连接和接收响应的超时时间(秒)。默认为 5。
"""
self.host = host
self.port = port
self.base_url = f"http://{host}" # 基础 URL 用于 HTTP 请求
self.timeout = timeout
self.client_socket = None # 长连接 socket
def connect(self):
"""建立 TCP 长连接"""
if self.client_socket is not None:
return # 已连接,直接返回
try:
self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.client_socket.settimeout(self.timeout)
self.client_socket.connect((self.host, self.port))
print(f"Connected to {self.host}:{self.port}")
except Exception as e:
print(f"Failed to connect to {self.host}:{self.port}: {e}")
self.client_socket = None
def disconnect(self):
"""断开 TCP 连接"""
if self.client_socket:
try:
self.client_socket.close()
except:
pass
self.client_socket = None
print("TCP connection closed.")
def calculate_checksum(self, data_bytes):
"""
计算校验和。
使用简单累加和32位有符号与原始代码一致。
如设备使用 CRC32请替换为 zlib.crc32。
"""
checksum = 0
for i in range(0, len(data_bytes), 4):
chunk = data_bytes[i:i+4]
if len(chunk) == 4:
value = struct.unpack('<i', chunk)[0]
checksum += value
checksum_unsigned = checksum & 0xFFFFFFFF
checksum_signed = struct.unpack('<i', struct.pack('<I', checksum_unsigned))[0]
return checksum_signed
def _send_tcp_command(self, command_id, param_value):
"""
发送 TCP 二进制命令(使用长连接,不等待响应)。
自动重连(如果连接断开)。
"""
if self.client_socket is None:
self.connect()
if self.client_socket is None:
return # 连接失败,直接返回
# 打包数据
header_data = struct.pack('<ii', command_id, param_value)
checksum = self.calculate_checksum(header_data)
checksum_data = struct.pack('<i', checksum)
binary_data = header_data + checksum_data
try:
sent_bytes = self.client_socket.send(binary_data)
# print(f"Sent {sent_bytes} bytes successfully.")
except (ConnectionResetError, BrokenPipeError, OSError) as e:
print(f"Connection lost: {e}. Reconnecting...")
self.disconnect()
# 可选:尝试立即重发(谨慎使用,避免死循环)
# self._send_tcp_command(command_id, param_value)
# --- TCP 控制方法 ---
def set_pitch_speed(self, speed):
if -99 <= speed <= 99:
self._send_tcp_command(0x0000000C, speed)
else:
print(f"Pitch speed value {speed} is out of range [-99, 99].")
def set_yaw_speed(self, speed):
if -99 <= speed <= 99:
self._send_tcp_command(0x0000000D, speed)
else:
print(f"Yaw speed value {speed} is out of range [-99, 99].")
def set_roll_speed(self, speed):
if -99 <= speed <= 99:
self._send_tcp_command(0x0000000E, speed)
else:
print(f"Roll speed value {speed} is out of range [-99, 99].")
def center(self):
self._send_tcp_command(0x00000010, 0)
def take_photo(self):
self._send_tcp_command(0x045c8701, 0)
def zoom_in(self):
self._send_tcp_command(0x0000001F, 0x01)
def zoom_out(self):
self._send_tcp_command(0x0000001F, 0x02)
def osd_on(self):
self._send_tcp_command(0x00000005, 0x01)
def osd_off(self):
self._send_tcp_command(0x00000005, 0x02)
def laser_on(self):
self._send_tcp_command(0x0000001A, 0x01)
def laser_off(self):
self._send_tcp_command(0x0000001A, 0x02)
def track_on(self):
self._send_tcp_command(0x0000001C, 0x01)
def track_off(self):
self._send_tcp_command(0x0000001C, 0x02)
# --- HTTP 控制方法(保持不变)---
def _send_http_get(self, path):
full_url = self.base_url + path
try:
req = urllib.request.Request(full_url)
req.add_header('User-Agent', 'PanTiltController')
response = urllib.request.urlopen(req)
# print(f"HTTP GET request to {full_url} successful.")
except urllib.error.HTTPError as e:
print(f"HTTP Error for {full_url}: {e.code} - {e.reason}")
except urllib.error.URLError as e:
print(f"URL Error for {full_url}: {e.reason}")
except Exception as e:
print(f"Unexpected error for {full_url}: {e}")
def set_pip_mode(self, mode):
if 0 <= mode <= 3:
path = f"/cgi-bin/Config.cgi?action=set&property=Camera.IrPip&value={mode}"
self._send_http_get(path)
else:
print(f"PIP mode value {mode} is out of range [0, 3].")
def set_ir_mode(self, mode):
if 0 <= mode <= 9:
path = f"/cgi-bin/Config.cgi?action=set&property=Camera.IrColorMode&value={mode}"
self._send_http_get(path)
else:
print(f"IR mode value {mode} is out of range [0, 9].")
# --- 使用示例 ---
if __name__ == "__main__":
PTZ_HOST = '192.168.17.114'
PTZ_PORT = 49125
ptz = gimbal_ctrl(PTZ_HOST, PTZ_PORT)
try:
# 可选:显式连接(非必须,发送时会自动连接)
# ptz.connect()
print("Sending commands...")
ptz.center()
time.sleep(1)
ptz.set_pitch_speed(99)
time.sleep(1)
ptz.set_pitch_speed(0)
time.sleep(1)
ptz.set_yaw_speed(-99)
time.sleep(1)
ptz.set_yaw_speed(0)
time.sleep(1)
ptz.set_roll_speed(-99)
time.sleep(1)
ptz.set_roll_speed(0)
time.sleep(1)
print("All commands sent.")
finally:
# 程序结束时断开连接
ptz.disconnect()
print("Connection closed.")
time.sleep(1)
exit(1)