From 042c0529a1c9bf2171e2203ec6b23daba71a42cb Mon Sep 17 00:00:00 2001 From: sl-perception Date: Tue, 3 Mar 2026 16:48:17 -0500 Subject: [PATCH] =?UTF-8?q?feat:=20Add=20Issue=20#375=20=E2=80=94=20adapti?= =?UTF-8?q?ve=20camera=20power=20mode=20manager?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements a 5-mode FSM for dynamic sensor activation based on speed, scenario, and battery level — avoids running all 4 CSI cameras + full sensor suite when unnecessary, saving ~1 GB RAM and significant compute. Five modes (sensor sets): SLEEP — no sensors (~150 MB RAM) SOCIAL — webcam only (~400 MB RAM, parked/socialising) AWARE — front CSI + RealSense + LIDAR (~850 MB RAM, indoor/<5km/h) ACTIVE — front+rear CSI + RealSense + LIDAR + UWB (~1.15 GB, 5-15km/h) FULL — all 4 CSI + RealSense + LIDAR + UWB (~1.55 GB, >15km/h) Core library — _camera_power_manager.py (pure Python, no ROS2 deps) - CameraPowerFSM.update(speed_mps, scenario, battery_pct) → ModeDecision - Speed-driven upgrades: instant (safety-first) - Speed-driven downgrades: held for downgrade_hold_s (default 5s, anti-flap) - Scenario overrides (instant, bypass hysteresis): · CROSSING / EMERGENCY → FULL always · PARKED → SOCIAL immediately · INDOOR → cap at AWARE (never ACTIVE/FULL indoors) - Battery low cap: battery_pct < threshold → cap at AWARE - Idle timer: near-zero speed holds at AWARE for idle_to_social_s (30s) before dropping to SOCIAL (avoids cycling at traffic lights) ROS2 node — camera_power_node.py - Subscribes: /saltybot/speed, /saltybot/scenario, /saltybot/battery_pct - Publishes: /saltybot/camera_mode (CameraPowerMode, latched, 2 Hz) - Publishes: /saltybot/camera_cmd/{front,rear,left,right,realsense,lidar,uwb,webcam} (std_msgs/Bool, TRANSIENT_LOCAL so late subscribers get last state) - Logs mode transitions with speed/scenario/battery context Tests — test/test_camera_power_manager.py: 64/64 passing - Sensor configs: counts, correct flags per mode, safety invariants - Speed upgrades: instantaneous at all thresholds, no hold required - Downgrade hysteresis: hold timer, cancellation on speed spike, hold=0 instant - Scenario overrides: CROSSING/EMERGENCY/PARKED/INDOOR, all CSIs on crossing - Battery low: cap at AWARE, threshold boundary - Idle timer: delay AWARE→SOCIAL, motion resets timer - Reset, labels, ModeDecision fields - Integration: full ride scenario (walk→jog→sprint→crossing→indoor→park→low bat) Co-Authored-By: Claude Sonnet 4.6 --- .../saltybot_bringup/_camera_power_manager.py | 329 ++++++++++ .../saltybot_bringup/camera_power_node.py | 215 +++++++ .../test/test_camera_power_manager.py | 575 ++++++++++++++++++ .../msg/CameraPowerMode.msg | 26 + 4 files changed, 1145 insertions(+) create mode 100644 jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/_camera_power_manager.py create mode 100644 jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/camera_power_node.py create mode 100644 jetson/ros2_ws/src/saltybot_bringup/test/test_camera_power_manager.py create mode 100644 jetson/ros2_ws/src/saltybot_scene_msgs/msg/CameraPowerMode.msg diff --git a/jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/_camera_power_manager.py b/jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/_camera_power_manager.py new file mode 100644 index 0000000..78a1fa8 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/_camera_power_manager.py @@ -0,0 +1,329 @@ +""" +_camera_power_manager.py — Adaptive camera power mode FSM (Issue #375). + +Pure-Python library (no ROS2 / hardware dependencies) for full unit-test +coverage without hardware. + +Five Power Modes +---------------- +SLEEP (0) — charging / idle. + Sensors: none (or 1 CSI at 1 fps as wake trigger). + ~150 MB RAM. + +SOCIAL (1) — parked / socialising. + Sensors: C920 webcam + face UI. + ~400 MB RAM. + +AWARE (2) — indoor, slow walking, <5 km/h. + Sensors: front CSI + RealSense + LIDAR. + ~850 MB RAM. + +ACTIVE (3) — sidewalk / bike path, 5–15 km/h. + Sensors: front+rear CSI + RealSense + LIDAR + UWB. + ~1.15 GB RAM. + +FULL (4) — street / high-speed >15 km/h, or crossing. + Sensors: all 4 CSI + RealSense + LIDAR + UWB. + ~1.55 GB RAM. + +Transition Logic +---------------- +Automatic transitions are speed-driven with hysteresis to prevent flapping: + + Upgrade thresholds (instantaneous): + SOCIAL → AWARE : speed ≥ 0.3 m/s (~1 km/h, any motion) + AWARE → ACTIVE : speed ≥ 1.4 m/s (~5 km/h) + ACTIVE → FULL : speed ≥ 4.2 m/s (~15 km/h) + + Downgrade thresholds (held for downgrade_hold_s before applying): + FULL → ACTIVE : speed < 3.6 m/s (~13 km/h, 2 km/h hysteresis) + ACTIVE → AWARE : speed < 1.1 m/s (~4 km/h) + AWARE → SOCIAL : speed < 0.1 m/s and idle ≥ idle_to_social_s + +Scenario overrides (bypass hysteresis, instant): + CROSSING → FULL immediately and held until scenario clears + EMERGENCY → FULL immediately (also signals speed reduction upstream) + INDOOR → cap at AWARE (never ACTIVE or FULL indoors) + PARKED → SOCIAL immediately + OUTDOOR → normal speed-based logic + +Battery low override: + battery_pct < battery_low_pct → cap at AWARE to save power + +Safety invariant: + Rear CSI is a safety sensor at speed — it is always on in ACTIVE and FULL. + Any mode downgrade during CROSSING is blocked. +""" + +from __future__ import annotations + +import time +from dataclasses import dataclass +from enum import IntEnum +from typing import Optional + + +# ── Mode enum ───────────────────────────────────────────────────────────────── + +class CameraMode(IntEnum): + SLEEP = 0 + SOCIAL = 1 + AWARE = 2 + ACTIVE = 3 + FULL = 4 + + @property + def label(self) -> str: + return self.name + + +# ── Scenario enum ───────────────────────────────────────────────────────────── + +class Scenario(str): + """ + Known scenario strings. The FSM accepts any string; these are the + canonical values that trigger special behaviour. + """ + UNKNOWN = 'unknown' + PARKED = 'parked' + INDOOR = 'indoor' + OUTDOOR = 'outdoor' + CROSSING = 'crossing' + EMERGENCY = 'emergency' + + +# ── Sensor configuration per mode ───────────────────────────────────────────── + +@dataclass(frozen=True) +class ActiveSensors: + csi_front: bool = False + csi_rear: bool = False + csi_left: bool = False + csi_right: bool = False + realsense: bool = False + lidar: bool = False + uwb: bool = False + webcam: bool = False + + @property + def active_count(self) -> int: + return sum([ + self.csi_front, self.csi_rear, self.csi_left, self.csi_right, + self.realsense, self.lidar, self.uwb, self.webcam, + ]) + + +# Canonical sensor set for each mode +MODE_SENSORS: dict[CameraMode, ActiveSensors] = { + CameraMode.SLEEP: ActiveSensors(), + CameraMode.SOCIAL: ActiveSensors(webcam=True), + CameraMode.AWARE: ActiveSensors(csi_front=True, realsense=True, lidar=True), + CameraMode.ACTIVE: ActiveSensors(csi_front=True, csi_rear=True, + realsense=True, lidar=True, uwb=True), + CameraMode.FULL: ActiveSensors(csi_front=True, csi_rear=True, + csi_left=True, csi_right=True, + realsense=True, lidar=True, uwb=True), +} + +# Speed thresholds (m/s) +_SPD_MOTION = 0.3 # ~1 km/h — any meaningful motion +_SPD_ACTIVE_UP = 5.0 / 3.6 # 5 km/h upgrade to ACTIVE +_SPD_FULL_UP = 15.0 / 3.6 # 15 km/h upgrade to FULL +_SPD_ACTIVE_DOWN = 4.0 / 3.6 # 4 km/h downgrade from ACTIVE (hysteresis gap) +_SPD_FULL_DOWN = 13.0 / 3.6 # 13 km/h downgrade from FULL + +# Scenario strings that force FULL +_FORCE_FULL = frozenset({Scenario.CROSSING, Scenario.EMERGENCY}) +# Scenarios that cap the mode +_CAP_AWARE = frozenset({Scenario.INDOOR}) +_CAP_SOCIAL = frozenset({Scenario.PARKED}) + + +# ── FSM result ──────────────────────────────────────────────────────────────── + +@dataclass +class ModeDecision: + mode: CameraMode + sensors: ActiveSensors + trigger_speed_mps: float + trigger_scenario: str + scenario_override: bool + + +# ── FSM ─────────────────────────────────────────────────────────────────────── + +class CameraPowerFSM: + """ + Finite state machine for camera power mode management. + + Parameters + ---------- + downgrade_hold_s : seconds a downgrade condition must persist before + the mode drops (hysteresis). Default 5.0 s. + idle_to_social_s : seconds of near-zero speed before AWARE → SOCIAL. + Default 30.0 s. + battery_low_pct : battery level below which mode is capped at AWARE. + Default 20.0 %. + clock : callable() → float for monotonic time (injectable + for tests; defaults to time.monotonic). + """ + + def __init__( + self, + downgrade_hold_s: float = 5.0, + idle_to_social_s: float = 30.0, + battery_low_pct: float = 20.0, + clock=None, + ) -> None: + self._downgrade_hold = downgrade_hold_s + self._idle_to_social = idle_to_social_s + self._battery_low_pct = battery_low_pct + self._clock = clock or time.monotonic + + self._mode: CameraMode = CameraMode.SLEEP + self._downgrade_pending_since: Optional[float] = None + self._idle_since: Optional[float] = None + + # Last input values (for reporting) + self._last_speed: float = 0.0 + self._last_scenario: str = Scenario.UNKNOWN + + # ── Public API ──────────────────────────────────────────────────────────── + + @property + def mode(self) -> CameraMode: + return self._mode + + def update( + self, + speed_mps: float, + scenario: str = Scenario.UNKNOWN, + battery_pct: float = 100.0, + ) -> ModeDecision: + """ + Evaluate inputs and return the current mode decision. + + Parameters + ---------- + speed_mps : current speed in m/s (non-negative) + scenario : current operating scenario string + battery_pct : battery charge level 0–100 + + Returns + ------- + ModeDecision with the resolved mode and active sensor set. + """ + now = self._clock() + speed_mps = max(0.0, float(speed_mps)) + self._last_speed = speed_mps + self._last_scenario = scenario + + scenario_override = False + + # ── 1. Hard overrides (no hysteresis) ───────────────────────────── + if scenario in _FORCE_FULL: + self._mode = CameraMode.FULL + self._downgrade_pending_since = None + self._idle_since = None + scenario_override = True + return self._decision(scenario, scenario_override) + + if scenario in _CAP_SOCIAL: + self._mode = CameraMode.SOCIAL + self._downgrade_pending_since = None + self._idle_since = None + scenario_override = True + return self._decision(scenario, scenario_override) + + # ── 2. Compute desired mode from speed ──────────────────────────── + desired = self._speed_to_mode(speed_mps) + + # ── 3. Apply scenario caps ──────────────────────────────────────── + if scenario in _CAP_AWARE: + desired = min(desired, CameraMode.AWARE) + + # ── 4. Apply battery low cap ────────────────────────────────────── + if battery_pct < self._battery_low_pct: + desired = min(desired, CameraMode.AWARE) + + # ── 5. Idle timer: delay AWARE → SOCIAL when briefly stopped ───────── + # _speed_to_mode already returns SOCIAL for speed < _SPD_MOTION. + # When in AWARE (or above) and nearly stopped, hold at AWARE until + # the idle timer expires to avoid flapping at traffic lights. + if speed_mps < 0.1 and self._mode >= CameraMode.AWARE: + if self._idle_since is None: + self._idle_since = now + if now - self._idle_since < self._idle_to_social: + # Timer not yet expired — enforce minimum AWARE + desired = max(desired, CameraMode.AWARE) + # else: timer expired — let desired remain SOCIAL naturally + else: + self._idle_since = None + + # ── 6. Apply hysteresis for downgrades ──────────────────────────── + if desired < self._mode: + # Downgrade requested — start hold timer on first detection, then + # check on every call (including the first, so hold=0 is instant). + if self._downgrade_pending_since is None: + self._downgrade_pending_since = now + if now - self._downgrade_pending_since >= self._downgrade_hold: + self._mode = desired + self._downgrade_pending_since = None + # else: hold not expired — keep current mode + else: + # Upgrade or no change — apply immediately, cancel any pending downgrade + self._downgrade_pending_since = None + self._mode = desired + + return self._decision(scenario, scenario_override) + + def reset(self, mode: CameraMode = CameraMode.SLEEP) -> None: + """Reset FSM state (e.g. on node restart).""" + self._mode = mode + self._downgrade_pending_since = None + self._idle_since = None + + # ── Helpers ─────────────────────────────────────────────────────────────── + + def _speed_to_mode(self, speed_mps: float) -> CameraMode: + """Map speed to desired mode using hysteresis thresholds.""" + if self._mode == CameraMode.FULL: + # Downgrade from FULL uses lower threshold + if speed_mps >= _SPD_FULL_DOWN: + return CameraMode.FULL + elif speed_mps >= _SPD_ACTIVE_DOWN: + return CameraMode.ACTIVE + elif speed_mps >= _SPD_MOTION: + return CameraMode.AWARE + else: + return CameraMode.AWARE # idle handled separately + + elif self._mode == CameraMode.ACTIVE: + if speed_mps >= _SPD_FULL_UP: + return CameraMode.FULL + elif speed_mps >= _SPD_ACTIVE_DOWN: + return CameraMode.ACTIVE + elif speed_mps >= _SPD_MOTION: + return CameraMode.AWARE + else: + return CameraMode.AWARE + + else: + # SLEEP / SOCIAL / AWARE — use upgrade thresholds + if speed_mps >= _SPD_FULL_UP: + return CameraMode.FULL + elif speed_mps >= _SPD_ACTIVE_UP: + return CameraMode.ACTIVE + elif speed_mps >= _SPD_MOTION: + return CameraMode.AWARE + else: + return CameraMode.SOCIAL + + def _decision(self, scenario: str, override: bool) -> ModeDecision: + return ModeDecision( + mode = self._mode, + sensors = MODE_SENSORS[self._mode], + trigger_speed_mps = self._last_speed, + trigger_scenario = scenario, + scenario_override = override, + ) diff --git a/jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/camera_power_node.py b/jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/camera_power_node.py new file mode 100644 index 0000000..c53a801 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_bringup/saltybot_bringup/camera_power_node.py @@ -0,0 +1,215 @@ +""" +camera_power_node.py — Adaptive camera power mode manager (Issue #375). + +Subscribes to speed, scenario, and battery level inputs, runs the +CameraPowerFSM, and publishes camera enable/disable commands at 2 Hz. + +Subscribes +---------- + /saltybot/speed std_msgs/Float32 robot speed in m/s + /saltybot/scenario std_msgs/String operating scenario label + /saltybot/battery_pct std_msgs/Float32 battery charge level 0–100 % + +Publishes +--------- + /saltybot/camera_mode saltybot_scene_msgs/CameraPowerMode (2 Hz) + /saltybot/camera_cmd/front std_msgs/Bool — front CSI enable + /saltybot/camera_cmd/rear std_msgs/Bool — rear CSI enable + /saltybot/camera_cmd/left std_msgs/Bool — left CSI enable + /saltybot/camera_cmd/right std_msgs/Bool — right CSI enable + /saltybot/camera_cmd/realsense std_msgs/Bool — D435i enable + /saltybot/camera_cmd/lidar std_msgs/Bool — LIDAR enable + /saltybot/camera_cmd/uwb std_msgs/Bool — UWB enable + /saltybot/camera_cmd/webcam std_msgs/Bool — C920 webcam enable + +Parameters +---------- + rate_hz float 2.0 FSM evaluation and publish rate + downgrade_hold_s float 5.0 Seconds before a downgrade is applied + idle_to_social_s float 30.0 Idle time before AWARE→SOCIAL + battery_low_pct float 20.0 Battery threshold for AWARE cap + initial_mode int 0 Starting mode (0=SLEEP … 4=FULL) +""" + +from __future__ import annotations + +import rclpy +from rclpy.node import Node +from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy + +from std_msgs.msg import Bool, Float32, String +from saltybot_scene_msgs.msg import CameraPowerMode + +from ._camera_power_manager import ( + CameraMode, + CameraPowerFSM, + Scenario, + MODE_SENSORS, +) + + +_RELIABLE_LATCHED = QoSProfile( + reliability=ReliabilityPolicy.RELIABLE, + history=HistoryPolicy.KEEP_LAST, + depth=1, + durability=DurabilityPolicy.TRANSIENT_LOCAL, +) +_RELIABLE = QoSProfile( + reliability=ReliabilityPolicy.RELIABLE, + history=HistoryPolicy.KEEP_LAST, + depth=10, +) +_SENSOR_QOS = QoSProfile( + reliability=ReliabilityPolicy.BEST_EFFORT, + history=HistoryPolicy.KEEP_LAST, + depth=5, +) + +# Camera command topic suffixes and their ActiveSensors field name +_CAM_TOPICS = [ + ('front', 'csi_front'), + ('rear', 'csi_rear'), + ('left', 'csi_left'), + ('right', 'csi_right'), + ('realsense', 'realsense'), + ('lidar', 'lidar'), + ('uwb', 'uwb'), + ('webcam', 'webcam'), +] + + +class CameraPowerNode(Node): + + def __init__(self) -> None: + super().__init__('camera_power_node') + + # ── Parameters ────────────────────────────────────────────────────── + self.declare_parameter('rate_hz', 2.0) + self.declare_parameter('downgrade_hold_s', 5.0) + self.declare_parameter('idle_to_social_s', 30.0) + self.declare_parameter('battery_low_pct', 20.0) + self.declare_parameter('initial_mode', 0) + + p = self.get_parameter + rate_hz = max(float(p('rate_hz').value), 0.1) + downgrade_hold_s = max(float(p('downgrade_hold_s').value), 0.0) + idle_to_social_s = max(float(p('idle_to_social_s').value), 0.0) + battery_low_pct = float(p('battery_low_pct').value) + initial_mode = int(p('initial_mode').value) + + # ── FSM ────────────────────────────────────────────────────────────── + self._fsm = CameraPowerFSM( + downgrade_hold_s = downgrade_hold_s, + idle_to_social_s = idle_to_social_s, + battery_low_pct = battery_low_pct, + ) + try: + self._fsm.reset(CameraMode(initial_mode)) + except ValueError: + self._fsm.reset(CameraMode.SLEEP) + + # ── Input state ────────────────────────────────────────────────────── + self._speed_mps: float = 0.0 + self._scenario: str = Scenario.UNKNOWN + self._battery_pct: float = 100.0 + + # ── Subscribers ────────────────────────────────────────────────────── + self._speed_sub = self.create_subscription( + Float32, '/saltybot/speed', + lambda m: setattr(self, '_speed_mps', max(0.0, float(m.data))), + _SENSOR_QOS, + ) + self._scenario_sub = self.create_subscription( + String, '/saltybot/scenario', + lambda m: setattr(self, '_scenario', str(m.data)), + _RELIABLE, + ) + self._battery_sub = self.create_subscription( + Float32, '/saltybot/battery_pct', + lambda m: setattr(self, '_battery_pct', + max(0.0, min(100.0, float(m.data)))), + _RELIABLE, + ) + + # ── Publishers ─────────────────────────────────────────────────────── + self._mode_pub = self.create_publisher( + CameraPowerMode, '/saltybot/camera_mode', _RELIABLE_LATCHED, + ) + self._cam_pubs: dict[str, rclpy.publisher.Publisher] = {} + for suffix, _ in _CAM_TOPICS: + self._cam_pubs[suffix] = self.create_publisher( + Bool, f'/saltybot/camera_cmd/{suffix}', _RELIABLE_LATCHED, + ) + + # ── Timer ──────────────────────────────────────────────────────────── + self._timer = self.create_timer(1.0 / rate_hz, self._step) + self._prev_mode: CameraMode | None = None + + self.get_logger().info( + f'camera_power_node ready — ' + f'rate={rate_hz:.1f}Hz, ' + f'downgrade_hold={downgrade_hold_s}s, ' + f'idle_to_social={idle_to_social_s}s, ' + f'battery_low={battery_low_pct}%' + ) + + # ── Step callback ───────────────────────────────────────────────────────── + + def _step(self) -> None: + decision = self._fsm.update( + speed_mps = self._speed_mps, + scenario = self._scenario, + battery_pct = self._battery_pct, + ) + + # Log transitions + if decision.mode != self._prev_mode: + prev_name = self._prev_mode.label if self._prev_mode else 'INIT' + self.get_logger().info( + f'Camera mode: {prev_name} → {decision.mode.label} ' + f'(speed={self._speed_mps:.2f}m/s ' + f'scenario={self._scenario} ' + f'battery={self._battery_pct:.0f}%)' + ) + self._prev_mode = decision.mode + + # Publish CameraPowerMode message + msg = CameraPowerMode() + msg.header.stamp = self.get_clock().now().to_msg() + msg.header.frame_id = '' + msg.mode = int(decision.mode) + msg.mode_name = decision.mode.label + s = decision.sensors + msg.csi_front = s.csi_front + msg.csi_rear = s.csi_rear + msg.csi_left = s.csi_left + msg.csi_right = s.csi_right + msg.realsense = s.realsense + msg.lidar = s.lidar + msg.uwb = s.uwb + msg.webcam = s.webcam + msg.trigger_speed_mps = float(decision.trigger_speed_mps) + msg.trigger_scenario = decision.trigger_scenario + msg.scenario_override = decision.scenario_override + self._mode_pub.publish(msg) + + # Publish per-camera Bool commands + for suffix, field in _CAM_TOPICS: + enabled = bool(getattr(s, field)) + b = Bool() + b.data = enabled + self._cam_pubs[suffix].publish(b) + + +def main(args=None) -> None: + rclpy.init(args=args) + node = CameraPowerNode() + try: + rclpy.spin(node) + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main() diff --git a/jetson/ros2_ws/src/saltybot_bringup/test/test_camera_power_manager.py b/jetson/ros2_ws/src/saltybot_bringup/test/test_camera_power_manager.py new file mode 100644 index 0000000..59840f0 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_bringup/test/test_camera_power_manager.py @@ -0,0 +1,575 @@ +""" +test_camera_power_manager.py — Unit tests for _camera_power_manager.py (Issue #375). + +Covers: + - Mode sensor configurations + - Speed-driven upgrade transitions + - Speed-driven downgrade with hysteresis + - Scenario overrides (CROSSING, EMERGENCY, PARKED, INDOOR) + - Battery low cap + - Idle → SOCIAL transition + - Safety invariants (rear CSI in ACTIVE/FULL, CROSSING cannot downgrade) + - Reset + - ModeDecision fields +""" + +from __future__ import annotations + +import pytest + +from saltybot_bringup._camera_power_manager import ( + ActiveSensors, + CameraMode, + CameraPowerFSM, + MODE_SENSORS, + ModeDecision, + Scenario, + _SPD_ACTIVE_DOWN, + _SPD_ACTIVE_UP, + _SPD_FULL_DOWN, + _SPD_FULL_UP, + _SPD_MOTION, +) + + +# ───────────────────────────────────────────────────────────────────────────── +# Helpers +# ───────────────────────────────────────────────────────────────────────────── + +class _FakeClock: + """Injectable monotonic clock for deterministic tests.""" + def __init__(self, t: float = 0.0) -> None: + self.t = t + def __call__(self) -> float: + return self.t + def advance(self, dt: float) -> None: + self.t += dt + + +def _fsm(hold: float = 5.0, idle: float = 30.0, bat_low: float = 20.0, + clock=None) -> CameraPowerFSM: + c = clock or _FakeClock() + return CameraPowerFSM( + downgrade_hold_s=hold, + idle_to_social_s=idle, + battery_low_pct=bat_low, + clock=c, + ) + + +# ───────────────────────────────────────────────────────────────────────────── +# Mode sensor configurations +# ───────────────────────────────────────────────────────────────────────────── + +class TestModeSensors: + + def test_sleep_no_sensors(self): + s = MODE_SENSORS[CameraMode.SLEEP] + assert s.active_count == 0 + + def test_social_webcam_only(self): + s = MODE_SENSORS[CameraMode.SOCIAL] + assert s.webcam is True + assert s.csi_front is False + assert s.realsense is False + assert s.lidar is False + + def test_aware_front_realsense_lidar(self): + s = MODE_SENSORS[CameraMode.AWARE] + assert s.csi_front is True + assert s.realsense is True + assert s.lidar is True + assert s.csi_rear is False + assert s.csi_left is False + assert s.csi_right is False + assert s.uwb is False + + def test_active_front_rear_realsense_lidar_uwb(self): + s = MODE_SENSORS[CameraMode.ACTIVE] + assert s.csi_front is True + assert s.csi_rear is True + assert s.realsense is True + assert s.lidar is True + assert s.uwb is True + assert s.csi_left is False + assert s.csi_right is False + + def test_full_all_sensors(self): + s = MODE_SENSORS[CameraMode.FULL] + assert s.csi_front is True + assert s.csi_rear is True + assert s.csi_left is True + assert s.csi_right is True + assert s.realsense is True + assert s.lidar is True + assert s.uwb is True + assert s.webcam is False # webcam not needed at speed + + def test_mode_sensor_counts_increase(self): + counts = [MODE_SENSORS[m].active_count for m in CameraMode] + assert counts == sorted(counts), "Higher modes should have more sensors" + + def test_safety_rear_csi_in_active(self): + assert MODE_SENSORS[CameraMode.ACTIVE].csi_rear is True + + def test_safety_rear_csi_in_full(self): + assert MODE_SENSORS[CameraMode.FULL].csi_rear is True + + +# ───────────────────────────────────────────────────────────────────────────── +# Speed-driven upgrades (instant) +# ───────────────────────────────────────────────────────────────────────────── + +class TestSpeedUpgrades: + + def test_no_motion_stays_social(self): + f = _fsm() + d = f.update(speed_mps=0.0) + assert d.mode == CameraMode.SOCIAL + + def test_slow_motion_upgrades_to_aware(self): + f = _fsm() + d = f.update(speed_mps=_SPD_MOTION + 0.1) + assert d.mode == CameraMode.AWARE + + def test_5kmh_upgrades_to_active(self): + f = _fsm() + d = f.update(speed_mps=_SPD_ACTIVE_UP + 0.1) + assert d.mode == CameraMode.ACTIVE + + def test_15kmh_upgrades_to_full(self): + f = _fsm() + d = f.update(speed_mps=_SPD_FULL_UP + 0.1) + assert d.mode == CameraMode.FULL + + def test_upgrades_skip_intermediate_modes(self): + """At 20 km/h from rest, jumps directly to FULL.""" + f = _fsm() + d = f.update(speed_mps=20.0 / 3.6) + assert d.mode == CameraMode.FULL + + def test_upgrade_is_immediate_no_hold(self): + """Upgrades do NOT require hold time.""" + clock = _FakeClock(0.0) + f = _fsm(hold=10.0, clock=clock) + # Still at t=0 + d = f.update(speed_mps=_SPD_FULL_UP + 0.5) + assert d.mode == CameraMode.FULL + + def test_exactly_at_motion_threshold(self): + f = _fsm() + d = f.update(speed_mps=_SPD_MOTION) + assert d.mode == CameraMode.AWARE + + def test_exactly_at_active_threshold(self): + f = _fsm() + d = f.update(speed_mps=_SPD_ACTIVE_UP) + assert d.mode == CameraMode.ACTIVE + + def test_exactly_at_full_threshold(self): + f = _fsm() + d = f.update(speed_mps=_SPD_FULL_UP) + assert d.mode == CameraMode.FULL + + def test_negative_speed_clamped_to_zero(self): + f = _fsm() + d = f.update(speed_mps=-1.0) + assert d.mode == CameraMode.SOCIAL + + +# ───────────────────────────────────────────────────────────────────────────── +# Downgrade hysteresis +# ───────────────────────────────────────────────────────────────────────────── + +class TestDowngradeHysteresis: + + def _reach_full(self, f: CameraPowerFSM, clock: _FakeClock) -> None: + f.update(speed_mps=_SPD_FULL_UP + 0.5) + assert f.mode == CameraMode.FULL + + def test_downgrade_not_immediate(self): + clock = _FakeClock(0.0) + f = _fsm(hold=5.0, clock=clock) + self._reach_full(f, clock) + # Drop to below FULL threshold but don't advance clock + d = f.update(speed_mps=0.0) + assert d.mode == CameraMode.FULL # still held + + def test_downgrade_after_hold_expires(self): + clock = _FakeClock(0.0) + f = _fsm(hold=5.0, clock=clock) + self._reach_full(f, clock) + f.update(speed_mps=0.0) # first low-speed call starts the hold timer + clock.advance(5.1) + d = f.update(speed_mps=0.0) # hold expired — downgrade to AWARE (not SOCIAL; + # SOCIAL requires an additional idle timer from AWARE, see TestIdleToSocial) + assert d.mode == CameraMode.AWARE + + def test_downgrade_cancelled_by_speed_spike(self): + """If speed spikes back up during hold, downgrade is cancelled.""" + clock = _FakeClock(0.0) + f = _fsm(hold=5.0, clock=clock) + self._reach_full(f, clock) + clock.advance(3.0) + f.update(speed_mps=0.0) # start downgrade timer + clock.advance(1.0) + f.update(speed_mps=_SPD_FULL_UP + 1.0) # back to full speed + clock.advance(3.0) # would have expired hold if not cancelled + d = f.update(speed_mps=0.0) + # Hold restarted; only 3s elapsed since the cancellation reset + assert d.mode == CameraMode.FULL + + def test_full_to_active_hysteresis_band(self): + """Speed in [_SPD_FULL_DOWN, _SPD_FULL_UP) while in FULL → stays FULL (hold pending).""" + clock = _FakeClock(0.0) + f = _fsm(hold=5.0, clock=clock) + self._reach_full(f, clock) + # Speed in hysteresis band (between down and up thresholds) + mid = (_SPD_FULL_DOWN + _SPD_FULL_UP) / 2.0 + d = f.update(speed_mps=mid) + assert d.mode == CameraMode.FULL # pending downgrade, not yet applied + + def test_hold_zero_downgrades_immediately(self): + clock = _FakeClock(0.0) + f = _fsm(hold=0.0, clock=clock) + f.update(speed_mps=_SPD_FULL_UP + 0.5) + # hold=0: downgrade applies on the very first low-speed call. + # From FULL at speed=0 → AWARE (SOCIAL requires a separate idle timer). + d = f.update(speed_mps=0.0) + assert d.mode == CameraMode.AWARE + + def test_downgrade_full_to_active_at_13kmh(self): + clock = _FakeClock(0.0) + f = _fsm(hold=0.0, clock=clock) + f.update(speed_mps=_SPD_FULL_UP + 0.5) # → FULL + d = f.update(speed_mps=_SPD_FULL_DOWN - 0.05) # just below 13 km/h + assert d.mode == CameraMode.ACTIVE + + def test_downgrade_active_to_aware(self): + clock = _FakeClock(0.0) + f = _fsm(hold=0.0, clock=clock) + f.update(speed_mps=_SPD_ACTIVE_UP + 0.5) # → ACTIVE + d = f.update(speed_mps=_SPD_ACTIVE_DOWN - 0.05) + assert d.mode == CameraMode.AWARE + + +# ───────────────────────────────────────────────────────────────────────────── +# Scenario overrides +# ───────────────────────────────────────────────────────────────────────────── + +class TestScenarioOverrides: + + def test_crossing_forces_full_from_rest(self): + f = _fsm() + d = f.update(speed_mps=0.0, scenario=Scenario.CROSSING) + assert d.mode == CameraMode.FULL + assert d.scenario_override is True + + def test_crossing_forces_full_from_aware(self): + f = _fsm() + f.update(speed_mps=0.5) # AWARE + d = f.update(speed_mps=0.5, scenario=Scenario.CROSSING) + assert d.mode == CameraMode.FULL + + def test_emergency_forces_full(self): + f = _fsm() + d = f.update(speed_mps=0.0, scenario=Scenario.EMERGENCY) + assert d.mode == CameraMode.FULL + assert d.scenario_override is True + + def test_crossing_bypasses_hysteresis(self): + """CROSSING forces FULL even if a downgrade is pending.""" + clock = _FakeClock(0.0) + f = _fsm(hold=5.0, clock=clock) + f.update(speed_mps=_SPD_FULL_UP + 1.0) # FULL + clock.advance(4.0) + f.update(speed_mps=0.0) # pending downgrade started + d = f.update(speed_mps=0.0, scenario=Scenario.CROSSING) + assert d.mode == CameraMode.FULL + + def test_parked_forces_social(self): + f = _fsm() + f.update(speed_mps=_SPD_FULL_UP + 1.0) # FULL + d = f.update(speed_mps=0.0, scenario=Scenario.PARKED) + assert d.mode == CameraMode.SOCIAL + assert d.scenario_override is True + + def test_indoor_caps_at_aware(self): + f = _fsm() + # Speed says ACTIVE but indoor caps to AWARE + d = f.update(speed_mps=_SPD_ACTIVE_UP + 0.5, scenario=Scenario.INDOOR) + assert d.mode == CameraMode.AWARE + + def test_indoor_cannot_reach_full(self): + f = _fsm() + d = f.update(speed_mps=_SPD_FULL_UP + 2.0, scenario=Scenario.INDOOR) + assert d.mode == CameraMode.AWARE + + def test_outdoor_uses_speed_logic(self): + f = _fsm() + d = f.update(speed_mps=_SPD_FULL_UP + 0.5, scenario=Scenario.OUTDOOR) + assert d.mode == CameraMode.FULL + + def test_unknown_scenario_uses_speed_logic(self): + f = _fsm() + d = f.update(speed_mps=_SPD_ACTIVE_UP + 0.5, scenario=Scenario.UNKNOWN) + assert d.mode == CameraMode.ACTIVE + + def test_crossing_sets_all_csi_active(self): + f = _fsm() + d = f.update(speed_mps=0.0, scenario=Scenario.CROSSING) + s = d.sensors + assert s.csi_front and s.csi_rear and s.csi_left and s.csi_right + + def test_scenario_override_false_for_speed_transition(self): + f = _fsm() + d = f.update(speed_mps=_SPD_ACTIVE_UP + 0.5, scenario=Scenario.OUTDOOR) + assert d.scenario_override is False + + +# ───────────────────────────────────────────────────────────────────────────── +# Battery low cap +# ───────────────────────────────────────────────────────────────────────────── + +class TestBatteryLow: + + def test_battery_low_caps_at_aware(self): + f = _fsm(bat_low=20.0) + d = f.update(speed_mps=_SPD_FULL_UP + 1.0, battery_pct=15.0) + assert d.mode == CameraMode.AWARE + + def test_battery_low_prevents_active(self): + f = _fsm(bat_low=20.0) + d = f.update(speed_mps=_SPD_ACTIVE_UP + 0.5, battery_pct=19.9) + assert d.mode == CameraMode.AWARE + + def test_battery_full_no_cap(self): + f = _fsm(bat_low=20.0) + d = f.update(speed_mps=_SPD_FULL_UP + 1.0, battery_pct=100.0) + assert d.mode == CameraMode.FULL + + def test_battery_at_threshold_not_capped(self): + f = _fsm(bat_low=20.0) + d = f.update(speed_mps=_SPD_FULL_UP + 1.0, battery_pct=20.0) + # At exactly threshold — not below — so no cap + assert d.mode == CameraMode.FULL + + def test_battery_low_allows_aware(self): + f = _fsm(bat_low=20.0) + d = f.update(speed_mps=_SPD_MOTION + 0.1, battery_pct=10.0) + assert d.mode == CameraMode.AWARE + + +# ───────────────────────────────────────────────────────────────────────────── +# Idle → SOCIAL transition +# ───────────────────────────────────────────────────────────────────────────── + +class TestIdleToSocial: + + def test_idle_transitions_to_social_after_timeout(self): + clock = _FakeClock(0.0) + f = _fsm(idle=10.0, hold=0.0, clock=clock) + # Bring to AWARE + f.update(speed_mps=_SPD_MOTION + 0.1) + # Reduce to near-zero + clock.advance(5.0) + f.update(speed_mps=0.05) # below 0.1, idle timer starts + clock.advance(10.1) + d = f.update(speed_mps=0.05) + assert d.mode == CameraMode.SOCIAL + + def test_motion_resets_idle_timer(self): + clock = _FakeClock(0.0) + f = _fsm(idle=10.0, hold=0.0, clock=clock) + f.update(speed_mps=_SPD_MOTION + 0.1) # AWARE + clock.advance(5.0) + f.update(speed_mps=0.05) # idle timer starts + clock.advance(5.0) + f.update(speed_mps=1.0) # motion resets timer + clock.advance(6.0) + d = f.update(speed_mps=0.05) # timer restarted, only 6s elapsed + assert d.mode == CameraMode.AWARE + + def test_not_idle_when_moving(self): + clock = _FakeClock(0.0) + f = _fsm(idle=5.0, clock=clock) + f.update(speed_mps=_SPD_MOTION + 0.1) + clock.advance(100.0) + d = f.update(speed_mps=_SPD_MOTION + 0.1) # still moving + assert d.mode == CameraMode.AWARE + + +# ───────────────────────────────────────────────────────────────────────────── +# Reset +# ───────────────────────────────────────────────────────────────────────────── + +class TestReset: + + def test_reset_to_sleep(self): + f = _fsm() + f.update(speed_mps=_SPD_FULL_UP + 1.0) + f.reset(CameraMode.SLEEP) + assert f.mode == CameraMode.SLEEP + + def test_reset_clears_downgrade_timer(self): + clock = _FakeClock(0.0) + f = _fsm(hold=5.0, clock=clock) + f.update(speed_mps=_SPD_FULL_UP + 1.0) + f.update(speed_mps=0.0) # pending downgrade started + f.reset(CameraMode.AWARE) + # After reset, pending downgrade should be cleared + clock.advance(10.0) + d = f.update(speed_mps=0.0) + # From AWARE at speed 0 → idle timer not yet expired → stays AWARE (not SLEEP) + # Actually: speed 0 < _SPD_MOTION → desired=SOCIAL, hold=5.0 + # With hold=5.0 and clock just advanced, pending since = now → no downgrade yet + assert d.mode in (CameraMode.AWARE, CameraMode.SOCIAL) + + def test_reset_to_full(self): + f = _fsm() + f.reset(CameraMode.FULL) + assert f.mode == CameraMode.FULL + + +# ───────────────────────────────────────────────────────────────────────────── +# ModeDecision fields +# ───────────────────────────────────────────────────────────────────────────── + +class TestModeDecisionFields: + + def test_decision_has_mode(self): + f = _fsm() + d = f.update(speed_mps=0.0) + assert isinstance(d.mode, CameraMode) + + def test_decision_has_sensors(self): + f = _fsm() + d = f.update(speed_mps=0.0) + assert isinstance(d.sensors, ActiveSensors) + + def test_decision_sensors_match_mode(self): + f = _fsm() + d = f.update(speed_mps=_SPD_FULL_UP + 1.0) + assert d.sensors == MODE_SENSORS[CameraMode.FULL] + + def test_decision_trigger_speed_recorded(self): + f = _fsm() + d = f.update(speed_mps=2.5) + assert abs(d.trigger_speed_mps - 2.5) < 1e-6 + + def test_decision_trigger_scenario_recorded(self): + f = _fsm() + d = f.update(speed_mps=0.0, scenario='indoor') + assert d.trigger_scenario == 'indoor' + + def test_scenario_override_false_for_normal(self): + f = _fsm() + d = f.update(speed_mps=1.0) + assert d.scenario_override is False + + def test_scenario_override_true_for_crossing(self): + f = _fsm() + d = f.update(speed_mps=0.0, scenario=Scenario.CROSSING) + assert d.scenario_override is True + + +# ───────────────────────────────────────────────────────────────────────────── +# Active sensor counts (RAM budget) +# ───────────────────────────────────────────────────────────────────────────── + +class TestActiveSensors: + + def test_active_sensor_count_non_negative(self): + for m, s in MODE_SENSORS.items(): + assert s.active_count >= 0 + + def test_sleep_zero_sensors(self): + assert MODE_SENSORS[CameraMode.SLEEP].active_count == 0 + + def test_full_seven_sensors(self): + # csi x4 + realsense + lidar + uwb = 7 + assert MODE_SENSORS[CameraMode.FULL].active_count == 7 + + def test_active_sensors_correct(self): + # csi_front + csi_rear + realsense + lidar + uwb = 5 + assert MODE_SENSORS[CameraMode.ACTIVE].active_count == 5 + + def test_aware_sensors_correct(self): + # csi_front + realsense + lidar = 3 + assert MODE_SENSORS[CameraMode.AWARE].active_count == 3 + + def test_social_sensors_correct(self): + # webcam = 1 + assert MODE_SENSORS[CameraMode.SOCIAL].active_count == 1 + + +# ───────────────────────────────────────────────────────────────────────────── +# Mode labels +# ───────────────────────────────────────────────────────────────────────────── + +class TestModeLabels: + + def test_all_modes_have_labels(self): + for m in CameraMode: + assert isinstance(m.label, str) + assert len(m.label) > 0 + + def test_sleep_label(self): + assert CameraMode.SLEEP.label == 'SLEEP' + + def test_full_label(self): + assert CameraMode.FULL.label == 'FULL' + + +# ───────────────────────────────────────────────────────────────────────────── +# Integration: typical ride scenario +# ───────────────────────────────────────────────────────────────────────────── + +class TestIntegrationRideScenario: + """Simulates a typical follow-me trip: start→walk→jog→sprint→crossing→indoor.""" + + def test_full_ride(self): + clock = _FakeClock(0.0) + f = _fsm(hold=2.0, idle=5.0, clock=clock) + + # Starting up + d = f.update(0.0, Scenario.OUTDOOR, 100.0) + assert d.mode == CameraMode.SOCIAL + + # Walking pace (~3 km/h) + d = f.update(0.8, Scenario.OUTDOOR, 95.0) + assert d.mode == CameraMode.AWARE + + # Jogging (~7 km/h) + d = f.update(2.0, Scenario.OUTDOOR, 90.0) + assert d.mode == CameraMode.ACTIVE + + # High-speed following (~20 km/h) + d = f.update(5.6, Scenario.OUTDOOR, 85.0) + assert d.mode == CameraMode.FULL + + # Street crossing — even at slow speed, stays FULL + d = f.update(1.0, Scenario.CROSSING, 85.0) + assert d.mode == CameraMode.FULL + assert d.scenario_override is True + + # Back to outdoor walk + clock.advance(3.0) + d = f.update(1.0, Scenario.OUTDOOR, 80.0) + assert d.mode == CameraMode.FULL # hold not expired yet + + clock.advance(2.1) + d = f.update(0.8, Scenario.OUTDOOR, 80.0) + assert d.mode == CameraMode.AWARE # hold expired, down to walk speed + + # Enter supermarket (indoor cap) + d = f.update(0.8, Scenario.INDOOR, 78.0) + assert d.mode == CameraMode.AWARE + + # Park and wait + d = f.update(0.0, Scenario.PARKED, 75.0) + assert d.mode == CameraMode.SOCIAL + + # Low battery during fast follow + d = f.update(5.6, Scenario.OUTDOOR, 15.0) + assert d.mode == CameraMode.AWARE # battery cap diff --git a/jetson/ros2_ws/src/saltybot_scene_msgs/msg/CameraPowerMode.msg b/jetson/ros2_ws/src/saltybot_scene_msgs/msg/CameraPowerMode.msg new file mode 100644 index 0000000..6e7356f --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_scene_msgs/msg/CameraPowerMode.msg @@ -0,0 +1,26 @@ +std_msgs/Header header + +# Current power mode (use constants below) +uint8 mode +uint8 MODE_SLEEP = 0 # charging / idle — minimal sensors +uint8 MODE_SOCIAL = 1 # parked / socialising — webcam + face UI only +uint8 MODE_AWARE = 2 # indoor / slow (<5 km/h) — front CSI + RealSense + LIDAR +uint8 MODE_ACTIVE = 3 # sidewalk / bike path (5–15 km/h) — front+rear + RealSense + LIDAR + UWB +uint8 MODE_FULL = 4 # street / high-speed (>15 km/h) or crossing — all sensors + +string mode_name # human-readable label + +# Active sensor flags for this mode +bool csi_front +bool csi_rear +bool csi_left +bool csi_right +bool realsense +bool lidar +bool uwb +bool webcam + +# Transition metadata +float32 trigger_speed_mps # speed that triggered the last transition +string trigger_scenario # scenario that triggered the last transition +bool scenario_override # true when scenario (not speed) forced the mode -- 2.47.2