Files
uzh-fpv-sv-test/src/event_utils.py
2026-05-29 18:49:01 +08:00

104 lines
3.5 KiB
Python

"""
Event camera simulation utilities for ML preprocessing.
Core logic extracted from EventCameraSimulator (test.py).
Designed for frame-by-frame preprocessing in training pipelines.
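Output (per processed frame):
    events_binary: int8 map with values in {-1, 0, +1}, hard threshold decision
    events_strength: float map in [-1, 1], continuous change intensity (clipped & normalized)
    event_count: number of non-zero entries in events_binary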
"""
import numpy as np
from collections import deque
class EventProcessor:
    """Lightweight event computation module. No visualization, no OpenCV dependency.

    The processor is stateful: it remembers the grayscale brightness of the
    previously seen frame and, on every call, emits per-pixel events
    describing how the brightness changed since then.

    Attributes:
        threshold: Magnitude a change must exceed to produce a binary event.
            Rewritten in place when ``auto_threshold`` is enabled.
        use_log: If True, changes are differences of log-brightness;
            otherwise plain brightness differences.
        auto_threshold: If True, ``threshold`` is adapted from the running
            mean of absolute changes.
        prev_brightness: Grayscale float32 copy of the last frame, or None
            before the first frame / after ``reset()``.
        change_history: Mean absolute change of the most recent frames
            (at most 100 entries).
        threshold_scale: Factor applied to the average change when deriving
            the adaptive threshold target.
    """

    def __init__(self, threshold=0.1, use_log=True, auto_threshold=False):
        """Store the configuration and start with empty temporal state."""
        self.threshold = threshold
        self.use_log = use_log
        self.auto_threshold = auto_threshold
        self.prev_brightness = None
        self.change_history = deque(maxlen=100)
        self.threshold_scale = 1.5

    def reset(self):
        """Clear temporal state (call when starting a new video/sequence).

        The current ``threshold`` value is left untouched.
        """
        self.prev_brightness = None
        self.change_history.clear()

    def _to_grayscale(self, frame):
        """Convert a frame to a 2-D grayscale float32 array.

        3-D inputs with three or more channels are reduced with the
        0.299/0.587/0.114 luminance weights applied to channels 0, 1, 2
        (any further channel, e.g. alpha, is ignored). 3-D inputs with
        fewer than three channels use channel 0 as the intensity. Inputs
        of any other rank are passed through unchanged apart from the cast.
        """
        frame = np.asarray(frame)
        if frame.ndim == 3:
            if frame.shape[-1] >= 3:
                # RGB/HWC -> gray via luminance weights
                gray = (
                    0.299 * frame[..., 0]
                    + 0.587 * frame[..., 1]
                    + 0.114 * frame[..., 2]
                )
            else:
                # (H, W, 1) or (H, W, 2): there is no channel 1/2 to weight,
                # so treat the first channel as the intensity.
                gray = frame[..., 0]
        else:
            gray = frame
        # astype() copies by default, so the array stored as prev_brightness
        # never aliases a buffer the caller might overwrite later.
        return gray.astype(np.float32)

    def _compute_change(self, brightness):
        """Return the log or linear brightness change versus the previous frame.

        Requires ``self.prev_brightness`` to be set.
        """
        if self.use_log:
            # Small offset keeps log() finite for zero-valued pixels.
            eps = 1e-3
            return np.log(brightness + eps) - np.log(self.prev_brightness + eps)
        return brightness - self.prev_brightness

    def _update_auto_threshold(self, change):
        """Adapt ``self.threshold`` based on global change statistics.

        The mean absolute change of every frame is recorded. Once more than
        10 samples are available, the threshold moves 10% of the way towards
        ``average_change * threshold_scale`` (target floored at 0.01) and is
        then bounded to [0.01, 0.5] in log mode or [1, 50] in linear mode.
        """
        # Stored as Python floats so neither the history nor the threshold
        # drags NumPy scalar dtypes into later arithmetic.
        self.change_history.append(float(np.mean(np.abs(change))))
        if len(self.change_history) > 10:
            avg_change = float(np.mean(self.change_history))
            target = max(avg_change * self.threshold_scale, 0.01)
            blended = self.threshold * 0.9 + target * 0.1
            lower, upper = (0.01, 0.5) if self.use_log else (1, 50)
            self.threshold = float(np.clip(blended, lower, upper))

    def __call__(self, frame):
        """
        Process a single frame.

        Args:
            frame: np.ndarray, shape (H, W) or (H, W, C), uint8 or float.

        Returns:
            events_binary: np.ndarray (H, W), int8, values in {-1, 0, +1}
            events_strength: np.ndarray (H, W), float32, values in [-1, 1]
            event_count: int, number of non-zero events

        The first frame after construction or ``reset()`` only initialises
        the internal state and yields all-zero outputs with a count of 0.
        """
        brightness = self._to_grayscale(frame)

        # First frame - initialise, no events
        if self.prev_brightness is None:
            self.prev_brightness = brightness
            h, w = brightness.shape
            return (
                np.zeros((h, w), dtype=np.int8),
                np.zeros((h, w), dtype=np.float32),
                0,
            )

        change = self._compute_change(brightness)
        if self.auto_threshold:
            self._update_auto_threshold(change)

        # A plain Python float keeps the arithmetic below in the dtype of
        # `change`, whatever scalar type the stored threshold has.
        threshold = float(self.threshold)

        # Binary events
        events_binary = np.zeros_like(brightness, dtype=np.int8)
        events_binary[change > threshold] = 1
        events_binary[change < -threshold] = -1

        # Continuous strength: clip to [-threshold, threshold] then normalise to [-1, 1]
        if threshold > 0:
            events_strength = np.clip(change, -threshold, threshold) / threshold
        else:
            # With a zero threshold the ratio above would be 0/0 (NaN);
            # the limit of the clipped ratio is simply the sign of the change.
            events_strength = np.sign(change)
        events_strength = events_strength.astype(np.float32, copy=False)

        event_count = int(np.count_nonzero(events_binary))
        self.prev_brightness = brightness
        return events_binary, events_strength, event_count