Merge pull request 'feat: adaptive camera power modes (Issue #375)' (#376) from sl-perception/issue-375-camera-power-modes into main

This commit is contained in:
sl-jetson 2026-03-03 17:20:04 -05:00
commit a8a9771ec7
4 changed files with 1145 additions and 0 deletions

View File

@ -0,0 +1,329 @@
"""
_camera_power_manager.py Adaptive camera power mode FSM (Issue #375).
Pure-Python library (no ROS2 / hardware dependencies) for full unit-test
coverage without hardware.
Five Power Modes
----------------
SLEEP (0) charging / idle.
Sensors: none (or 1 CSI at 1 fps as wake trigger).
~150 MB RAM.
SOCIAL (1) parked / socialising.
Sensors: C920 webcam + face UI.
~400 MB RAM.
AWARE (2) indoor, slow walking, <5 km/h.
Sensors: front CSI + RealSense + LIDAR.
~850 MB RAM.
ACTIVE (3) sidewalk / bike path, 515 km/h.
Sensors: front+rear CSI + RealSense + LIDAR + UWB.
~1.15 GB RAM.
FULL (4) street / high-speed >15 km/h, or crossing.
Sensors: all 4 CSI + RealSense + LIDAR + UWB.
~1.55 GB RAM.
Transition Logic
----------------
Automatic transitions are speed-driven with hysteresis to prevent flapping:
Upgrade thresholds (instantaneous):
SOCIAL AWARE : speed 0.3 m/s (~1 km/h, any motion)
AWARE ACTIVE : speed 1.4 m/s (~5 km/h)
ACTIVE FULL : speed 4.2 m/s (~15 km/h)
Downgrade thresholds (held for downgrade_hold_s before applying):
FULL ACTIVE : speed < 3.6 m/s (~13 km/h, 2 km/h hysteresis)
ACTIVE AWARE : speed < 1.1 m/s (~4 km/h)
AWARE SOCIAL : speed < 0.1 m/s and idle idle_to_social_s
Scenario overrides (bypass hysteresis, instant):
CROSSING FULL immediately and held until scenario clears
EMERGENCY FULL immediately (also signals speed reduction upstream)
INDOOR cap at AWARE (never ACTIVE or FULL indoors)
PARKED SOCIAL immediately
OUTDOOR normal speed-based logic
Battery low override:
battery_pct < battery_low_pct cap at AWARE to save power
Safety invariant:
Rear CSI is a safety sensor at speed it is always on in ACTIVE and FULL.
Any mode downgrade during CROSSING is blocked.
"""
from __future__ import annotations
import time
from dataclasses import dataclass
from enum import IntEnum
from typing import Optional
# ── Mode enum ─────────────────────────────────────────────────────────────────
class CameraMode(IntEnum):
SLEEP = 0
SOCIAL = 1
AWARE = 2
ACTIVE = 3
FULL = 4
@property
def label(self) -> str:
return self.name
# ── Scenario enum ─────────────────────────────────────────────────────────────
class Scenario(str):
"""
Known scenario strings. The FSM accepts any string; these are the
canonical values that trigger special behaviour.
"""
UNKNOWN = 'unknown'
PARKED = 'parked'
INDOOR = 'indoor'
OUTDOOR = 'outdoor'
CROSSING = 'crossing'
EMERGENCY = 'emergency'
# ── Sensor configuration per mode ─────────────────────────────────────────────
@dataclass(frozen=True)
class ActiveSensors:
csi_front: bool = False
csi_rear: bool = False
csi_left: bool = False
csi_right: bool = False
realsense: bool = False
lidar: bool = False
uwb: bool = False
webcam: bool = False
@property
def active_count(self) -> int:
return sum([
self.csi_front, self.csi_rear, self.csi_left, self.csi_right,
self.realsense, self.lidar, self.uwb, self.webcam,
])
# Canonical sensor set for each mode
MODE_SENSORS: dict[CameraMode, ActiveSensors] = {
CameraMode.SLEEP: ActiveSensors(),
CameraMode.SOCIAL: ActiveSensors(webcam=True),
CameraMode.AWARE: ActiveSensors(csi_front=True, realsense=True, lidar=True),
CameraMode.ACTIVE: ActiveSensors(csi_front=True, csi_rear=True,
realsense=True, lidar=True, uwb=True),
CameraMode.FULL: ActiveSensors(csi_front=True, csi_rear=True,
csi_left=True, csi_right=True,
realsense=True, lidar=True, uwb=True),
}
# Speed thresholds (m/s)
_SPD_MOTION = 0.3 # ~1 km/h — any meaningful motion
_SPD_ACTIVE_UP = 5.0 / 3.6 # 5 km/h upgrade to ACTIVE
_SPD_FULL_UP = 15.0 / 3.6 # 15 km/h upgrade to FULL
_SPD_ACTIVE_DOWN = 4.0 / 3.6 # 4 km/h downgrade from ACTIVE (hysteresis gap)
_SPD_FULL_DOWN = 13.0 / 3.6 # 13 km/h downgrade from FULL
# Scenario strings that force FULL
_FORCE_FULL = frozenset({Scenario.CROSSING, Scenario.EMERGENCY})
# Scenarios that cap the mode
_CAP_AWARE = frozenset({Scenario.INDOOR})
_CAP_SOCIAL = frozenset({Scenario.PARKED})
# ── FSM result ────────────────────────────────────────────────────────────────
@dataclass
class ModeDecision:
mode: CameraMode
sensors: ActiveSensors
trigger_speed_mps: float
trigger_scenario: str
scenario_override: bool
# ── FSM ───────────────────────────────────────────────────────────────────────
class CameraPowerFSM:
"""
Finite state machine for camera power mode management.
Parameters
----------
downgrade_hold_s : seconds a downgrade condition must persist before
the mode drops (hysteresis). Default 5.0 s.
idle_to_social_s : seconds of near-zero speed before AWARE SOCIAL.
Default 30.0 s.
battery_low_pct : battery level below which mode is capped at AWARE.
Default 20.0 %.
clock : callable() float for monotonic time (injectable
for tests; defaults to time.monotonic).
"""
def __init__(
self,
downgrade_hold_s: float = 5.0,
idle_to_social_s: float = 30.0,
battery_low_pct: float = 20.0,
clock=None,
) -> None:
self._downgrade_hold = downgrade_hold_s
self._idle_to_social = idle_to_social_s
self._battery_low_pct = battery_low_pct
self._clock = clock or time.monotonic
self._mode: CameraMode = CameraMode.SLEEP
self._downgrade_pending_since: Optional[float] = None
self._idle_since: Optional[float] = None
# Last input values (for reporting)
self._last_speed: float = 0.0
self._last_scenario: str = Scenario.UNKNOWN
# ── Public API ────────────────────────────────────────────────────────────
@property
def mode(self) -> CameraMode:
return self._mode
def update(
self,
speed_mps: float,
scenario: str = Scenario.UNKNOWN,
battery_pct: float = 100.0,
) -> ModeDecision:
"""
Evaluate inputs and return the current mode decision.
Parameters
----------
speed_mps : current speed in m/s (non-negative)
scenario : current operating scenario string
battery_pct : battery charge level 0100
Returns
-------
ModeDecision with the resolved mode and active sensor set.
"""
now = self._clock()
speed_mps = max(0.0, float(speed_mps))
self._last_speed = speed_mps
self._last_scenario = scenario
scenario_override = False
# ── 1. Hard overrides (no hysteresis) ─────────────────────────────
if scenario in _FORCE_FULL:
self._mode = CameraMode.FULL
self._downgrade_pending_since = None
self._idle_since = None
scenario_override = True
return self._decision(scenario, scenario_override)
if scenario in _CAP_SOCIAL:
self._mode = CameraMode.SOCIAL
self._downgrade_pending_since = None
self._idle_since = None
scenario_override = True
return self._decision(scenario, scenario_override)
# ── 2. Compute desired mode from speed ────────────────────────────
desired = self._speed_to_mode(speed_mps)
# ── 3. Apply scenario caps ────────────────────────────────────────
if scenario in _CAP_AWARE:
desired = min(desired, CameraMode.AWARE)
# ── 4. Apply battery low cap ──────────────────────────────────────
if battery_pct < self._battery_low_pct:
desired = min(desired, CameraMode.AWARE)
# ── 5. Idle timer: delay AWARE → SOCIAL when briefly stopped ─────────
# _speed_to_mode already returns SOCIAL for speed < _SPD_MOTION.
# When in AWARE (or above) and nearly stopped, hold at AWARE until
# the idle timer expires to avoid flapping at traffic lights.
if speed_mps < 0.1 and self._mode >= CameraMode.AWARE:
if self._idle_since is None:
self._idle_since = now
if now - self._idle_since < self._idle_to_social:
# Timer not yet expired — enforce minimum AWARE
desired = max(desired, CameraMode.AWARE)
# else: timer expired — let desired remain SOCIAL naturally
else:
self._idle_since = None
# ── 6. Apply hysteresis for downgrades ────────────────────────────
if desired < self._mode:
# Downgrade requested — start hold timer on first detection, then
# check on every call (including the first, so hold=0 is instant).
if self._downgrade_pending_since is None:
self._downgrade_pending_since = now
if now - self._downgrade_pending_since >= self._downgrade_hold:
self._mode = desired
self._downgrade_pending_since = None
# else: hold not expired — keep current mode
else:
# Upgrade or no change — apply immediately, cancel any pending downgrade
self._downgrade_pending_since = None
self._mode = desired
return self._decision(scenario, scenario_override)
def reset(self, mode: CameraMode = CameraMode.SLEEP) -> None:
"""Reset FSM state (e.g. on node restart)."""
self._mode = mode
self._downgrade_pending_since = None
self._idle_since = None
# ── Helpers ───────────────────────────────────────────────────────────────
def _speed_to_mode(self, speed_mps: float) -> CameraMode:
"""Map speed to desired mode using hysteresis thresholds."""
if self._mode == CameraMode.FULL:
# Downgrade from FULL uses lower threshold
if speed_mps >= _SPD_FULL_DOWN:
return CameraMode.FULL
elif speed_mps >= _SPD_ACTIVE_DOWN:
return CameraMode.ACTIVE
elif speed_mps >= _SPD_MOTION:
return CameraMode.AWARE
else:
return CameraMode.AWARE # idle handled separately
elif self._mode == CameraMode.ACTIVE:
if speed_mps >= _SPD_FULL_UP:
return CameraMode.FULL
elif speed_mps >= _SPD_ACTIVE_DOWN:
return CameraMode.ACTIVE
elif speed_mps >= _SPD_MOTION:
return CameraMode.AWARE
else:
return CameraMode.AWARE
else:
# SLEEP / SOCIAL / AWARE — use upgrade thresholds
if speed_mps >= _SPD_FULL_UP:
return CameraMode.FULL
elif speed_mps >= _SPD_ACTIVE_UP:
return CameraMode.ACTIVE
elif speed_mps >= _SPD_MOTION:
return CameraMode.AWARE
else:
return CameraMode.SOCIAL
def _decision(self, scenario: str, override: bool) -> ModeDecision:
return ModeDecision(
mode = self._mode,
sensors = MODE_SENSORS[self._mode],
trigger_speed_mps = self._last_speed,
trigger_scenario = scenario,
scenario_override = override,
)

View File

@ -0,0 +1,215 @@
"""
camera_power_node.py Adaptive camera power mode manager (Issue #375).
Subscribes to speed, scenario, and battery level inputs, runs the
CameraPowerFSM, and publishes camera enable/disable commands at 2 Hz.
Subscribes
----------
/saltybot/speed std_msgs/Float32 robot speed in m/s
/saltybot/scenario std_msgs/String operating scenario label
/saltybot/battery_pct std_msgs/Float32 battery charge level 0100 %
Publishes
---------
/saltybot/camera_mode saltybot_scene_msgs/CameraPowerMode (2 Hz)
/saltybot/camera_cmd/front std_msgs/Bool front CSI enable
/saltybot/camera_cmd/rear std_msgs/Bool rear CSI enable
/saltybot/camera_cmd/left std_msgs/Bool left CSI enable
/saltybot/camera_cmd/right std_msgs/Bool right CSI enable
/saltybot/camera_cmd/realsense std_msgs/Bool D435i enable
/saltybot/camera_cmd/lidar std_msgs/Bool LIDAR enable
/saltybot/camera_cmd/uwb std_msgs/Bool UWB enable
/saltybot/camera_cmd/webcam std_msgs/Bool C920 webcam enable
Parameters
----------
rate_hz float 2.0 FSM evaluation and publish rate
downgrade_hold_s float 5.0 Seconds before a downgrade is applied
idle_to_social_s float 30.0 Idle time before AWARESOCIAL
battery_low_pct float 20.0 Battery threshold for AWARE cap
initial_mode int 0 Starting mode (0=SLEEP 4=FULL)
"""
from __future__ import annotations
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy
from std_msgs.msg import Bool, Float32, String
from saltybot_scene_msgs.msg import CameraPowerMode
from ._camera_power_manager import (
CameraMode,
CameraPowerFSM,
Scenario,
MODE_SENSORS,
)
_RELIABLE_LATCHED = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST,
depth=1,
durability=DurabilityPolicy.TRANSIENT_LOCAL,
)
_RELIABLE = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST,
depth=10,
)
_SENSOR_QOS = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=5,
)
# Camera command topic suffixes and their ActiveSensors field name
_CAM_TOPICS = [
('front', 'csi_front'),
('rear', 'csi_rear'),
('left', 'csi_left'),
('right', 'csi_right'),
('realsense', 'realsense'),
('lidar', 'lidar'),
('uwb', 'uwb'),
('webcam', 'webcam'),
]
class CameraPowerNode(Node):
def __init__(self) -> None:
super().__init__('camera_power_node')
# ── Parameters ──────────────────────────────────────────────────────
self.declare_parameter('rate_hz', 2.0)
self.declare_parameter('downgrade_hold_s', 5.0)
self.declare_parameter('idle_to_social_s', 30.0)
self.declare_parameter('battery_low_pct', 20.0)
self.declare_parameter('initial_mode', 0)
p = self.get_parameter
rate_hz = max(float(p('rate_hz').value), 0.1)
downgrade_hold_s = max(float(p('downgrade_hold_s').value), 0.0)
idle_to_social_s = max(float(p('idle_to_social_s').value), 0.0)
battery_low_pct = float(p('battery_low_pct').value)
initial_mode = int(p('initial_mode').value)
# ── FSM ──────────────────────────────────────────────────────────────
self._fsm = CameraPowerFSM(
downgrade_hold_s = downgrade_hold_s,
idle_to_social_s = idle_to_social_s,
battery_low_pct = battery_low_pct,
)
try:
self._fsm.reset(CameraMode(initial_mode))
except ValueError:
self._fsm.reset(CameraMode.SLEEP)
# ── Input state ──────────────────────────────────────────────────────
self._speed_mps: float = 0.0
self._scenario: str = Scenario.UNKNOWN
self._battery_pct: float = 100.0
# ── Subscribers ──────────────────────────────────────────────────────
self._speed_sub = self.create_subscription(
Float32, '/saltybot/speed',
lambda m: setattr(self, '_speed_mps', max(0.0, float(m.data))),
_SENSOR_QOS,
)
self._scenario_sub = self.create_subscription(
String, '/saltybot/scenario',
lambda m: setattr(self, '_scenario', str(m.data)),
_RELIABLE,
)
self._battery_sub = self.create_subscription(
Float32, '/saltybot/battery_pct',
lambda m: setattr(self, '_battery_pct',
max(0.0, min(100.0, float(m.data)))),
_RELIABLE,
)
# ── Publishers ───────────────────────────────────────────────────────
self._mode_pub = self.create_publisher(
CameraPowerMode, '/saltybot/camera_mode', _RELIABLE_LATCHED,
)
self._cam_pubs: dict[str, rclpy.publisher.Publisher] = {}
for suffix, _ in _CAM_TOPICS:
self._cam_pubs[suffix] = self.create_publisher(
Bool, f'/saltybot/camera_cmd/{suffix}', _RELIABLE_LATCHED,
)
# ── Timer ────────────────────────────────────────────────────────────
self._timer = self.create_timer(1.0 / rate_hz, self._step)
self._prev_mode: CameraMode | None = None
self.get_logger().info(
f'camera_power_node ready — '
f'rate={rate_hz:.1f}Hz, '
f'downgrade_hold={downgrade_hold_s}s, '
f'idle_to_social={idle_to_social_s}s, '
f'battery_low={battery_low_pct}%'
)
# ── Step callback ─────────────────────────────────────────────────────────
def _step(self) -> None:
decision = self._fsm.update(
speed_mps = self._speed_mps,
scenario = self._scenario,
battery_pct = self._battery_pct,
)
# Log transitions
if decision.mode != self._prev_mode:
prev_name = self._prev_mode.label if self._prev_mode else 'INIT'
self.get_logger().info(
f'Camera mode: {prev_name}{decision.mode.label} '
f'(speed={self._speed_mps:.2f}m/s '
f'scenario={self._scenario} '
f'battery={self._battery_pct:.0f}%)'
)
self._prev_mode = decision.mode
# Publish CameraPowerMode message
msg = CameraPowerMode()
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = ''
msg.mode = int(decision.mode)
msg.mode_name = decision.mode.label
s = decision.sensors
msg.csi_front = s.csi_front
msg.csi_rear = s.csi_rear
msg.csi_left = s.csi_left
msg.csi_right = s.csi_right
msg.realsense = s.realsense
msg.lidar = s.lidar
msg.uwb = s.uwb
msg.webcam = s.webcam
msg.trigger_speed_mps = float(decision.trigger_speed_mps)
msg.trigger_scenario = decision.trigger_scenario
msg.scenario_override = decision.scenario_override
self._mode_pub.publish(msg)
# Publish per-camera Bool commands
for suffix, field in _CAM_TOPICS:
enabled = bool(getattr(s, field))
b = Bool()
b.data = enabled
self._cam_pubs[suffix].publish(b)
def main(args=None) -> None:
rclpy.init(args=args)
node = CameraPowerNode()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,575 @@
"""
test_camera_power_manager.py Unit tests for _camera_power_manager.py (Issue #375).
Covers:
- Mode sensor configurations
- Speed-driven upgrade transitions
- Speed-driven downgrade with hysteresis
- Scenario overrides (CROSSING, EMERGENCY, PARKED, INDOOR)
- Battery low cap
- Idle SOCIAL transition
- Safety invariants (rear CSI in ACTIVE/FULL, CROSSING cannot downgrade)
- Reset
- ModeDecision fields
"""
from __future__ import annotations
import pytest
from saltybot_bringup._camera_power_manager import (
ActiveSensors,
CameraMode,
CameraPowerFSM,
MODE_SENSORS,
ModeDecision,
Scenario,
_SPD_ACTIVE_DOWN,
_SPD_ACTIVE_UP,
_SPD_FULL_DOWN,
_SPD_FULL_UP,
_SPD_MOTION,
)
# ─────────────────────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────────────────────
class _FakeClock:
"""Injectable monotonic clock for deterministic tests."""
def __init__(self, t: float = 0.0) -> None:
self.t = t
def __call__(self) -> float:
return self.t
def advance(self, dt: float) -> None:
self.t += dt
def _fsm(hold: float = 5.0, idle: float = 30.0, bat_low: float = 20.0,
clock=None) -> CameraPowerFSM:
c = clock or _FakeClock()
return CameraPowerFSM(
downgrade_hold_s=hold,
idle_to_social_s=idle,
battery_low_pct=bat_low,
clock=c,
)
# ─────────────────────────────────────────────────────────────────────────────
# Mode sensor configurations
# ─────────────────────────────────────────────────────────────────────────────
class TestModeSensors:
def test_sleep_no_sensors(self):
s = MODE_SENSORS[CameraMode.SLEEP]
assert s.active_count == 0
def test_social_webcam_only(self):
s = MODE_SENSORS[CameraMode.SOCIAL]
assert s.webcam is True
assert s.csi_front is False
assert s.realsense is False
assert s.lidar is False
def test_aware_front_realsense_lidar(self):
s = MODE_SENSORS[CameraMode.AWARE]
assert s.csi_front is True
assert s.realsense is True
assert s.lidar is True
assert s.csi_rear is False
assert s.csi_left is False
assert s.csi_right is False
assert s.uwb is False
def test_active_front_rear_realsense_lidar_uwb(self):
s = MODE_SENSORS[CameraMode.ACTIVE]
assert s.csi_front is True
assert s.csi_rear is True
assert s.realsense is True
assert s.lidar is True
assert s.uwb is True
assert s.csi_left is False
assert s.csi_right is False
def test_full_all_sensors(self):
s = MODE_SENSORS[CameraMode.FULL]
assert s.csi_front is True
assert s.csi_rear is True
assert s.csi_left is True
assert s.csi_right is True
assert s.realsense is True
assert s.lidar is True
assert s.uwb is True
assert s.webcam is False # webcam not needed at speed
def test_mode_sensor_counts_increase(self):
counts = [MODE_SENSORS[m].active_count for m in CameraMode]
assert counts == sorted(counts), "Higher modes should have more sensors"
def test_safety_rear_csi_in_active(self):
assert MODE_SENSORS[CameraMode.ACTIVE].csi_rear is True
def test_safety_rear_csi_in_full(self):
assert MODE_SENSORS[CameraMode.FULL].csi_rear is True
# ─────────────────────────────────────────────────────────────────────────────
# Speed-driven upgrades (instant)
# ─────────────────────────────────────────────────────────────────────────────
class TestSpeedUpgrades:
def test_no_motion_stays_social(self):
f = _fsm()
d = f.update(speed_mps=0.0)
assert d.mode == CameraMode.SOCIAL
def test_slow_motion_upgrades_to_aware(self):
f = _fsm()
d = f.update(speed_mps=_SPD_MOTION + 0.1)
assert d.mode == CameraMode.AWARE
def test_5kmh_upgrades_to_active(self):
f = _fsm()
d = f.update(speed_mps=_SPD_ACTIVE_UP + 0.1)
assert d.mode == CameraMode.ACTIVE
def test_15kmh_upgrades_to_full(self):
f = _fsm()
d = f.update(speed_mps=_SPD_FULL_UP + 0.1)
assert d.mode == CameraMode.FULL
def test_upgrades_skip_intermediate_modes(self):
"""At 20 km/h from rest, jumps directly to FULL."""
f = _fsm()
d = f.update(speed_mps=20.0 / 3.6)
assert d.mode == CameraMode.FULL
def test_upgrade_is_immediate_no_hold(self):
"""Upgrades do NOT require hold time."""
clock = _FakeClock(0.0)
f = _fsm(hold=10.0, clock=clock)
# Still at t=0
d = f.update(speed_mps=_SPD_FULL_UP + 0.5)
assert d.mode == CameraMode.FULL
def test_exactly_at_motion_threshold(self):
f = _fsm()
d = f.update(speed_mps=_SPD_MOTION)
assert d.mode == CameraMode.AWARE
def test_exactly_at_active_threshold(self):
f = _fsm()
d = f.update(speed_mps=_SPD_ACTIVE_UP)
assert d.mode == CameraMode.ACTIVE
def test_exactly_at_full_threshold(self):
f = _fsm()
d = f.update(speed_mps=_SPD_FULL_UP)
assert d.mode == CameraMode.FULL
def test_negative_speed_clamped_to_zero(self):
f = _fsm()
d = f.update(speed_mps=-1.0)
assert d.mode == CameraMode.SOCIAL
# ─────────────────────────────────────────────────────────────────────────────
# Downgrade hysteresis
# ─────────────────────────────────────────────────────────────────────────────
class TestDowngradeHysteresis:
def _reach_full(self, f: CameraPowerFSM, clock: _FakeClock) -> None:
f.update(speed_mps=_SPD_FULL_UP + 0.5)
assert f.mode == CameraMode.FULL
def test_downgrade_not_immediate(self):
clock = _FakeClock(0.0)
f = _fsm(hold=5.0, clock=clock)
self._reach_full(f, clock)
# Drop to below FULL threshold but don't advance clock
d = f.update(speed_mps=0.0)
assert d.mode == CameraMode.FULL # still held
def test_downgrade_after_hold_expires(self):
clock = _FakeClock(0.0)
f = _fsm(hold=5.0, clock=clock)
self._reach_full(f, clock)
f.update(speed_mps=0.0) # first low-speed call starts the hold timer
clock.advance(5.1)
d = f.update(speed_mps=0.0) # hold expired — downgrade to AWARE (not SOCIAL;
# SOCIAL requires an additional idle timer from AWARE, see TestIdleToSocial)
assert d.mode == CameraMode.AWARE
def test_downgrade_cancelled_by_speed_spike(self):
"""If speed spikes back up during hold, downgrade is cancelled."""
clock = _FakeClock(0.0)
f = _fsm(hold=5.0, clock=clock)
self._reach_full(f, clock)
clock.advance(3.0)
f.update(speed_mps=0.0) # start downgrade timer
clock.advance(1.0)
f.update(speed_mps=_SPD_FULL_UP + 1.0) # back to full speed
clock.advance(3.0) # would have expired hold if not cancelled
d = f.update(speed_mps=0.0)
# Hold restarted; only 3s elapsed since the cancellation reset
assert d.mode == CameraMode.FULL
def test_full_to_active_hysteresis_band(self):
"""Speed in [_SPD_FULL_DOWN, _SPD_FULL_UP) while in FULL → stays FULL (hold pending)."""
clock = _FakeClock(0.0)
f = _fsm(hold=5.0, clock=clock)
self._reach_full(f, clock)
# Speed in hysteresis band (between down and up thresholds)
mid = (_SPD_FULL_DOWN + _SPD_FULL_UP) / 2.0
d = f.update(speed_mps=mid)
assert d.mode == CameraMode.FULL # pending downgrade, not yet applied
def test_hold_zero_downgrades_immediately(self):
clock = _FakeClock(0.0)
f = _fsm(hold=0.0, clock=clock)
f.update(speed_mps=_SPD_FULL_UP + 0.5)
# hold=0: downgrade applies on the very first low-speed call.
# From FULL at speed=0 → AWARE (SOCIAL requires a separate idle timer).
d = f.update(speed_mps=0.0)
assert d.mode == CameraMode.AWARE
def test_downgrade_full_to_active_at_13kmh(self):
clock = _FakeClock(0.0)
f = _fsm(hold=0.0, clock=clock)
f.update(speed_mps=_SPD_FULL_UP + 0.5) # → FULL
d = f.update(speed_mps=_SPD_FULL_DOWN - 0.05) # just below 13 km/h
assert d.mode == CameraMode.ACTIVE
def test_downgrade_active_to_aware(self):
clock = _FakeClock(0.0)
f = _fsm(hold=0.0, clock=clock)
f.update(speed_mps=_SPD_ACTIVE_UP + 0.5) # → ACTIVE
d = f.update(speed_mps=_SPD_ACTIVE_DOWN - 0.05)
assert d.mode == CameraMode.AWARE
# ─────────────────────────────────────────────────────────────────────────────
# Scenario overrides
# ─────────────────────────────────────────────────────────────────────────────
class TestScenarioOverrides:
def test_crossing_forces_full_from_rest(self):
f = _fsm()
d = f.update(speed_mps=0.0, scenario=Scenario.CROSSING)
assert d.mode == CameraMode.FULL
assert d.scenario_override is True
def test_crossing_forces_full_from_aware(self):
f = _fsm()
f.update(speed_mps=0.5) # AWARE
d = f.update(speed_mps=0.5, scenario=Scenario.CROSSING)
assert d.mode == CameraMode.FULL
def test_emergency_forces_full(self):
f = _fsm()
d = f.update(speed_mps=0.0, scenario=Scenario.EMERGENCY)
assert d.mode == CameraMode.FULL
assert d.scenario_override is True
def test_crossing_bypasses_hysteresis(self):
"""CROSSING forces FULL even if a downgrade is pending."""
clock = _FakeClock(0.0)
f = _fsm(hold=5.0, clock=clock)
f.update(speed_mps=_SPD_FULL_UP + 1.0) # FULL
clock.advance(4.0)
f.update(speed_mps=0.0) # pending downgrade started
d = f.update(speed_mps=0.0, scenario=Scenario.CROSSING)
assert d.mode == CameraMode.FULL
def test_parked_forces_social(self):
f = _fsm()
f.update(speed_mps=_SPD_FULL_UP + 1.0) # FULL
d = f.update(speed_mps=0.0, scenario=Scenario.PARKED)
assert d.mode == CameraMode.SOCIAL
assert d.scenario_override is True
def test_indoor_caps_at_aware(self):
f = _fsm()
# Speed says ACTIVE but indoor caps to AWARE
d = f.update(speed_mps=_SPD_ACTIVE_UP + 0.5, scenario=Scenario.INDOOR)
assert d.mode == CameraMode.AWARE
def test_indoor_cannot_reach_full(self):
f = _fsm()
d = f.update(speed_mps=_SPD_FULL_UP + 2.0, scenario=Scenario.INDOOR)
assert d.mode == CameraMode.AWARE
def test_outdoor_uses_speed_logic(self):
f = _fsm()
d = f.update(speed_mps=_SPD_FULL_UP + 0.5, scenario=Scenario.OUTDOOR)
assert d.mode == CameraMode.FULL
def test_unknown_scenario_uses_speed_logic(self):
f = _fsm()
d = f.update(speed_mps=_SPD_ACTIVE_UP + 0.5, scenario=Scenario.UNKNOWN)
assert d.mode == CameraMode.ACTIVE
def test_crossing_sets_all_csi_active(self):
f = _fsm()
d = f.update(speed_mps=0.0, scenario=Scenario.CROSSING)
s = d.sensors
assert s.csi_front and s.csi_rear and s.csi_left and s.csi_right
def test_scenario_override_false_for_speed_transition(self):
f = _fsm()
d = f.update(speed_mps=_SPD_ACTIVE_UP + 0.5, scenario=Scenario.OUTDOOR)
assert d.scenario_override is False
# ─────────────────────────────────────────────────────────────────────────────
# Battery low cap
# ─────────────────────────────────────────────────────────────────────────────
class TestBatteryLow:
def test_battery_low_caps_at_aware(self):
f = _fsm(bat_low=20.0)
d = f.update(speed_mps=_SPD_FULL_UP + 1.0, battery_pct=15.0)
assert d.mode == CameraMode.AWARE
def test_battery_low_prevents_active(self):
f = _fsm(bat_low=20.0)
d = f.update(speed_mps=_SPD_ACTIVE_UP + 0.5, battery_pct=19.9)
assert d.mode == CameraMode.AWARE
def test_battery_full_no_cap(self):
f = _fsm(bat_low=20.0)
d = f.update(speed_mps=_SPD_FULL_UP + 1.0, battery_pct=100.0)
assert d.mode == CameraMode.FULL
def test_battery_at_threshold_not_capped(self):
f = _fsm(bat_low=20.0)
d = f.update(speed_mps=_SPD_FULL_UP + 1.0, battery_pct=20.0)
# At exactly threshold — not below — so no cap
assert d.mode == CameraMode.FULL
def test_battery_low_allows_aware(self):
f = _fsm(bat_low=20.0)
d = f.update(speed_mps=_SPD_MOTION + 0.1, battery_pct=10.0)
assert d.mode == CameraMode.AWARE
# ─────────────────────────────────────────────────────────────────────────────
# Idle → SOCIAL transition
# ─────────────────────────────────────────────────────────────────────────────
class TestIdleToSocial:
def test_idle_transitions_to_social_after_timeout(self):
clock = _FakeClock(0.0)
f = _fsm(idle=10.0, hold=0.0, clock=clock)
# Bring to AWARE
f.update(speed_mps=_SPD_MOTION + 0.1)
# Reduce to near-zero
clock.advance(5.0)
f.update(speed_mps=0.05) # below 0.1, idle timer starts
clock.advance(10.1)
d = f.update(speed_mps=0.05)
assert d.mode == CameraMode.SOCIAL
def test_motion_resets_idle_timer(self):
clock = _FakeClock(0.0)
f = _fsm(idle=10.0, hold=0.0, clock=clock)
f.update(speed_mps=_SPD_MOTION + 0.1) # AWARE
clock.advance(5.0)
f.update(speed_mps=0.05) # idle timer starts
clock.advance(5.0)
f.update(speed_mps=1.0) # motion resets timer
clock.advance(6.0)
d = f.update(speed_mps=0.05) # timer restarted, only 6s elapsed
assert d.mode == CameraMode.AWARE
def test_not_idle_when_moving(self):
clock = _FakeClock(0.0)
f = _fsm(idle=5.0, clock=clock)
f.update(speed_mps=_SPD_MOTION + 0.1)
clock.advance(100.0)
d = f.update(speed_mps=_SPD_MOTION + 0.1) # still moving
assert d.mode == CameraMode.AWARE
# ─────────────────────────────────────────────────────────────────────────────
# Reset
# ─────────────────────────────────────────────────────────────────────────────
class TestReset:
def test_reset_to_sleep(self):
f = _fsm()
f.update(speed_mps=_SPD_FULL_UP + 1.0)
f.reset(CameraMode.SLEEP)
assert f.mode == CameraMode.SLEEP
def test_reset_clears_downgrade_timer(self):
clock = _FakeClock(0.0)
f = _fsm(hold=5.0, clock=clock)
f.update(speed_mps=_SPD_FULL_UP + 1.0)
f.update(speed_mps=0.0) # pending downgrade started
f.reset(CameraMode.AWARE)
# After reset, pending downgrade should be cleared
clock.advance(10.0)
d = f.update(speed_mps=0.0)
# From AWARE at speed 0 → idle timer not yet expired → stays AWARE (not SLEEP)
# Actually: speed 0 < _SPD_MOTION → desired=SOCIAL, hold=5.0
# With hold=5.0 and clock just advanced, pending since = now → no downgrade yet
assert d.mode in (CameraMode.AWARE, CameraMode.SOCIAL)
def test_reset_to_full(self):
f = _fsm()
f.reset(CameraMode.FULL)
assert f.mode == CameraMode.FULL
# ─────────────────────────────────────────────────────────────────────────────
# ModeDecision fields
# ─────────────────────────────────────────────────────────────────────────────
class TestModeDecisionFields:
def test_decision_has_mode(self):
f = _fsm()
d = f.update(speed_mps=0.0)
assert isinstance(d.mode, CameraMode)
def test_decision_has_sensors(self):
f = _fsm()
d = f.update(speed_mps=0.0)
assert isinstance(d.sensors, ActiveSensors)
def test_decision_sensors_match_mode(self):
f = _fsm()
d = f.update(speed_mps=_SPD_FULL_UP + 1.0)
assert d.sensors == MODE_SENSORS[CameraMode.FULL]
def test_decision_trigger_speed_recorded(self):
f = _fsm()
d = f.update(speed_mps=2.5)
assert abs(d.trigger_speed_mps - 2.5) < 1e-6
def test_decision_trigger_scenario_recorded(self):
f = _fsm()
d = f.update(speed_mps=0.0, scenario='indoor')
assert d.trigger_scenario == 'indoor'
def test_scenario_override_false_for_normal(self):
f = _fsm()
d = f.update(speed_mps=1.0)
assert d.scenario_override is False
def test_scenario_override_true_for_crossing(self):
f = _fsm()
d = f.update(speed_mps=0.0, scenario=Scenario.CROSSING)
assert d.scenario_override is True
# ─────────────────────────────────────────────────────────────────────────────
# Active sensor counts (RAM budget)
# ─────────────────────────────────────────────────────────────────────────────
class TestActiveSensors:
def test_active_sensor_count_non_negative(self):
for m, s in MODE_SENSORS.items():
assert s.active_count >= 0
def test_sleep_zero_sensors(self):
assert MODE_SENSORS[CameraMode.SLEEP].active_count == 0
def test_full_seven_sensors(self):
# csi x4 + realsense + lidar + uwb = 7
assert MODE_SENSORS[CameraMode.FULL].active_count == 7
def test_active_sensors_correct(self):
# csi_front + csi_rear + realsense + lidar + uwb = 5
assert MODE_SENSORS[CameraMode.ACTIVE].active_count == 5
def test_aware_sensors_correct(self):
# csi_front + realsense + lidar = 3
assert MODE_SENSORS[CameraMode.AWARE].active_count == 3
def test_social_sensors_correct(self):
# webcam = 1
assert MODE_SENSORS[CameraMode.SOCIAL].active_count == 1
# ─────────────────────────────────────────────────────────────────────────────
# Mode labels
# ─────────────────────────────────────────────────────────────────────────────
class TestModeLabels:
def test_all_modes_have_labels(self):
for m in CameraMode:
assert isinstance(m.label, str)
assert len(m.label) > 0
def test_sleep_label(self):
assert CameraMode.SLEEP.label == 'SLEEP'
def test_full_label(self):
assert CameraMode.FULL.label == 'FULL'
# ─────────────────────────────────────────────────────────────────────────────
# Integration: typical ride scenario
# ─────────────────────────────────────────────────────────────────────────────
class TestIntegrationRideScenario:
"""Simulates a typical follow-me trip: start→walk→jog→sprint→crossing→indoor."""
def test_full_ride(self):
clock = _FakeClock(0.0)
f = _fsm(hold=2.0, idle=5.0, clock=clock)
# Starting up
d = f.update(0.0, Scenario.OUTDOOR, 100.0)
assert d.mode == CameraMode.SOCIAL
# Walking pace (~3 km/h)
d = f.update(0.8, Scenario.OUTDOOR, 95.0)
assert d.mode == CameraMode.AWARE
# Jogging (~7 km/h)
d = f.update(2.0, Scenario.OUTDOOR, 90.0)
assert d.mode == CameraMode.ACTIVE
# High-speed following (~20 km/h)
d = f.update(5.6, Scenario.OUTDOOR, 85.0)
assert d.mode == CameraMode.FULL
# Street crossing — even at slow speed, stays FULL
d = f.update(1.0, Scenario.CROSSING, 85.0)
assert d.mode == CameraMode.FULL
assert d.scenario_override is True
# Back to outdoor walk
clock.advance(3.0)
d = f.update(1.0, Scenario.OUTDOOR, 80.0)
assert d.mode == CameraMode.FULL # hold not expired yet
clock.advance(2.1)
d = f.update(0.8, Scenario.OUTDOOR, 80.0)
assert d.mode == CameraMode.AWARE # hold expired, down to walk speed
# Enter supermarket (indoor cap)
d = f.update(0.8, Scenario.INDOOR, 78.0)
assert d.mode == CameraMode.AWARE
# Park and wait
d = f.update(0.0, Scenario.PARKED, 75.0)
assert d.mode == CameraMode.SOCIAL
# Low battery during fast follow
d = f.update(5.6, Scenario.OUTDOOR, 15.0)
assert d.mode == CameraMode.AWARE # battery cap

View File

@ -0,0 +1,26 @@
std_msgs/Header header
# Current power mode (use constants below)
uint8 mode
uint8 MODE_SLEEP = 0 # charging / idle — minimal sensors
uint8 MODE_SOCIAL = 1 # parked / socialising — webcam + face UI only
uint8 MODE_AWARE = 2 # indoor / slow (<5 km/h) — front CSI + RealSense + LIDAR
uint8 MODE_ACTIVE = 3 # sidewalk / bike path (515 km/h) — front+rear + RealSense + LIDAR + UWB
uint8 MODE_FULL = 4 # street / high-speed (>15 km/h) or crossing — all sensors
string mode_name # human-readable label
# Active sensor flags for this mode
bool csi_front
bool csi_rear
bool csi_left
bool csi_right
bool realsense
bool lidar
bool uwb
bool webcam
# Transition metadata
float32 trigger_speed_mps # speed that triggered the last transition
string trigger_scenario # scenario that triggered the last transition
bool scenario_override # true when scenario (not speed) forced the mode