Files
uzh-fpv-sv-test/src/event_utils.py
CaoWangrenbo 02d429282e feat: add --show-events overlay with raw log intensity
Visualize raw temporal brightness change (threshold=0, log domain)
as green(+)/red(-) gradient overlay proportional to |change|.
Supports video output and live display modes.
Enables EventProcessor threshold=0 for raw mode without clipping.

Generated by Mistral Vibe.
Co-Authored-By: Mistral Vibe <vibe@mistral.ai>
2026-06-08 11:47:19 +08:00

117 lines
4.2 KiB
Python

"""
Event camera simulation utilities for ML preprocessing.
Core logic extracted from EventCameraSimulator (test.py).
Designed for frame-by-frame preprocessing in training pipelines.
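Output (per frame):
    events_binary: (-1, 0, +1) hard threshold decision
    events_strength: [-1, 1] continuous change intensity (clipped & normalized)
    With threshold == 0 (raw mode) both outputs are instead the raw,
    unthresholded brightness change.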
"""
import numpy as np
from collections import deque
class EventProcessor:
    """Lightweight event computation module. No visualization, no OpenCV dependency.

    The processor is stateful: it remembers the previous frame's brightness
    and emits per-pixel "events" describing how brightness changed between
    consecutive calls. Call :meth:`reset` when starting a new sequence.

    Args:
        threshold: Minimum absolute brightness change that triggers an event.
            A value of exactly 0 selects raw mode, in which the unthresholded
            change is returned instead of events. Must be >= 0.
        use_log: If True, changes are measured between log-brightness values;
            otherwise as a plain difference.
        auto_threshold: If True, the threshold is adapted from a running
            history of the mean absolute change (ignored in raw mode).

    Raises:
        ValueError: If ``threshold`` is negative.
    """

    def __init__(self, threshold=0.1, use_log=True, auto_threshold=False):
        # A negative threshold would invert the clip bounds and make the
        # +1/-1 event masks overlap, so reject it up front.
        if threshold < 0:
            raise ValueError(f"threshold must be >= 0, got {threshold!r}")
        self.threshold = threshold
        self.use_log = use_log
        self.auto_threshold = auto_threshold
        # Brightness of the previously processed frame (None before the first call).
        self.prev_brightness = None
        # Rolling window of per-frame mean |change| used by the auto-threshold.
        self.change_history = deque(maxlen=100)
        # Multiplier applied to the averaged change to get the target threshold.
        self.threshold_scale = 1.5

    def reset(self):
        """Clear temporal state (call on video/reset)."""
        self.prev_brightness = None
        self.change_history.clear()

    def _to_grayscale(self, frame):
        """Convert a frame to a grayscale float32 array.

        Args:
            frame: Array-like of shape (H, W), (H, W, 1) or (H, W, C) with
                C >= 3. For C >= 3 the first three channels are combined
                with weights 0.299 / 0.587 / 0.114 (the original comment
                labels them as RGB; channel order is not checked here).

        Returns:
            A new float32 array. ``astype`` copies, so the result never
            aliases the caller's buffer.
        """
        frame = np.asarray(frame)
        if frame.ndim == 3 and frame.shape[-1] == 1:
            # Single-channel image stored as (H, W, 1): just drop the axis.
            gray = frame[..., 0]
        elif frame.ndim == 3:
            # RGB/HWC -> gray via luminance weights
            gray = 0.299 * frame[..., 0] + 0.587 * frame[..., 1] + 0.114 * frame[..., 2]
        else:
            gray = frame
        return gray.astype(np.float32)

    def _compute_change(self, brightness):
        """Return the log or linear brightness change versus the previous frame."""
        if self.use_log:
            # Small offset keeps log() finite for zero-valued pixels.
            eps = 1e-3
            return np.log(brightness + eps) - np.log(self.prev_brightness + eps)
        return brightness - self.prev_brightness

    def _update_auto_threshold(self, change):
        """Adapt ``self.threshold`` based on global change statistics.

        The mean absolute change of the current frame is appended to the
        history. Once more than 10 samples are available, the threshold is
        moved 10% of the way towards ``history mean * threshold_scale``
        (floored at 0.01) and then clamped to [0.01, 0.5] in log mode or
        [1, 50] in linear mode.
        """
        mean_change = float(np.mean(np.abs(change)))
        self.change_history.append(mean_change)
        if len(self.change_history) > 10:
            avg_change = float(np.mean(self.change_history))
            target = max(avg_change * self.threshold_scale, 0.01)
            blended = self.threshold * 0.9 + target * 0.1
            # NOTE(review): the linear bounds look tuned for 0-255 intensities;
            # confirm before feeding [0, 1]-scaled float frames.
            low, high = (0.01, 0.5) if self.use_log else (1.0, 50.0)
            # Keep the threshold a plain Python float so later array
            # arithmetic is not affected by a NumPy scalar's dtype.
            self.threshold = float(np.clip(blended, low, high))

    def __call__(self, frame):
        """
        Process a single frame.

        Args:
            frame: np.ndarray, shape (H, W), (H, W, 1) or (H, W, C) with
                C >= 3, uint8 or float.

        Returns:
            When threshold > 0:
                events_binary: np.ndarray (H, W), int8, values in {-1, 0, +1}
                events_strength: np.ndarray (H, W), float32, values in [-1, 1]
                event_count: int, number of non-zero events
            When threshold == 0 (raw output, no thresholding):
                change_raw: np.ndarray (H, W), raw log/linear brightness change (float32)
                change_raw: same as above (the very same array object)
                event_count: int, number of pixels with non-zero change
            The first frame after construction or reset() only initialises
            the state and yields all-zero arrays with a count of 0.

        Raises:
            ValueError: If the frame does not reduce to a 2-D image, or its
                shape differs from the previous frame's shape.
        """
        brightness = self._to_grayscale(frame)
        if brightness.ndim != 2:
            raise ValueError(
                f"expected a frame that reduces to 2-D (H, W), got shape {brightness.shape}"
            )
        raw_mode = self.threshold == 0

        # First frame — initialise, no events
        if self.prev_brightness is None:
            self.prev_brightness = brightness
            zeros_f32 = np.zeros(brightness.shape, dtype=np.float32)
            if raw_mode:
                return zeros_f32, np.zeros(brightness.shape, dtype=np.float32), 0
            return np.zeros(brightness.shape, dtype=np.int8), zeros_f32, 0

        # Without this check a broadcastable mismatch such as (1, W) vs (H, W)
        # would silently produce output of the wrong shape.
        if brightness.shape != self.prev_brightness.shape:
            raise ValueError(
                f"frame shape {brightness.shape} does not match previous frame shape "
                f"{self.prev_brightness.shape}; call reset() before changing resolution"
            )

        change = self._compute_change(brightness)
        self.prev_brightness = brightness

        # threshold == 0: raw mode, skip thresholding
        if raw_mode:
            change_f32 = change.astype(np.float32)
            return change_f32, change_f32, int(np.count_nonzero(change))

        if self.auto_threshold:
            self._update_auto_threshold(change)
        limit = self.threshold

        # Binary events: +1 where brightness rose past the threshold,
        # -1 where it fell past it, 0 elsewhere.
        events_binary = np.zeros(brightness.shape, dtype=np.int8)
        events_binary[change > limit] = 1
        events_binary[change < -limit] = -1

        # Continuous strength: clip to [-threshold, threshold] then normalise
        # to [-1, 1]; force float32 so the dtype matches the first-frame output
        # regardless of the threshold's own type.
        events_strength = (np.clip(change, -limit, limit) / limit).astype(np.float32, copy=False)

        event_count = int(np.count_nonzero(events_binary))
        return events_binary, events_strength, event_count