feat: Add Issue #375 — adaptive camera power mode manager
Implements a 5-mode FSM for dynamic sensor activation based on speed,
scenario, and battery level — avoids running all 4 CSI cameras + full
sensor suite when unnecessary, saving ~1 GB RAM and significant compute.
Five modes (sensor sets):
SLEEP — no sensors (~150 MB RAM)
SOCIAL — webcam only (~400 MB RAM, parked/socialising)
AWARE — front CSI + RealSense + LIDAR (~850 MB RAM, indoor/<5km/h)
ACTIVE — front+rear CSI + RealSense + LIDAR + UWB (~1.15 GB, 5-15km/h)
FULL — all 4 CSI + RealSense + LIDAR + UWB (~1.55 GB, >15km/h)
Core library — _camera_power_manager.py (pure Python, no ROS2 deps)
- CameraPowerFSM.update(speed_mps, scenario, battery_pct) → ModeDecision
- Speed-driven upgrades: instant (safety-first)
- Speed-driven downgrades: held for downgrade_hold_s (default 5s, anti-flap)
- Scenario overrides (instant, bypass hysteresis):
· CROSSING / EMERGENCY → FULL always
· PARKED → SOCIAL immediately
· INDOOR → cap at AWARE (never ACTIVE/FULL indoors)
- Battery low cap: battery_pct < threshold → cap at AWARE
- Idle timer: near-zero speed holds at AWARE for idle_to_social_s (30s)
before dropping to SOCIAL (avoids cycling at traffic lights)
ROS2 node — camera_power_node.py
- Subscribes: /saltybot/speed, /saltybot/scenario, /saltybot/battery_pct
- Publishes: /saltybot/camera_mode (CameraPowerMode, latched, 2 Hz)
- Publishes: /saltybot/camera_cmd/{front,rear,left,right,realsense,lidar,uwb,webcam}
(std_msgs/Bool, TRANSIENT_LOCAL so late subscribers get last state)
- Logs mode transitions with speed/scenario/battery context
Tests — test/test_camera_power_manager.py: 64/64 passing
- Sensor configs: counts, correct flags per mode, safety invariants
- Speed upgrades: instantaneous at all thresholds, no hold required
- Downgrade hysteresis: hold timer, cancellation on speed spike, hold=0 instant
- Scenario overrides: CROSSING/EMERGENCY/PARKED/INDOOR, all CSIs on crossing
- Battery low: cap at AWARE, threshold boundary
- Idle timer: delay AWARE→SOCIAL, motion resets timer
- Reset, labels, ModeDecision fields
- Integration: full ride scenario (walk→jog→sprint→crossing→indoor→park→low bat)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
46fc2db8e6
commit
042c0529a1
@ -0,0 +1,329 @@
|
||||
"""
|
||||
_camera_power_manager.py — Adaptive camera power mode FSM (Issue #375).
|
||||
|
||||
Pure-Python library (no ROS2 / hardware dependencies) for full unit-test
|
||||
coverage without hardware.
|
||||
|
||||
Five Power Modes
|
||||
----------------
|
||||
SLEEP (0) — charging / idle.
|
||||
Sensors: none (or 1 CSI at 1 fps as wake trigger).
|
||||
~150 MB RAM.
|
||||
|
||||
SOCIAL (1) — parked / socialising.
|
||||
Sensors: C920 webcam + face UI.
|
||||
~400 MB RAM.
|
||||
|
||||
AWARE (2) — indoor, slow walking, <5 km/h.
|
||||
Sensors: front CSI + RealSense + LIDAR.
|
||||
~850 MB RAM.
|
||||
|
||||
ACTIVE (3) — sidewalk / bike path, 5–15 km/h.
|
||||
Sensors: front+rear CSI + RealSense + LIDAR + UWB.
|
||||
~1.15 GB RAM.
|
||||
|
||||
FULL (4) — street / high-speed >15 km/h, or crossing.
|
||||
Sensors: all 4 CSI + RealSense + LIDAR + UWB.
|
||||
~1.55 GB RAM.
|
||||
|
||||
Transition Logic
|
||||
----------------
|
||||
Automatic transitions are speed-driven with hysteresis to prevent flapping:
|
||||
|
||||
Upgrade thresholds (instantaneous):
|
||||
SOCIAL → AWARE : speed ≥ 0.3 m/s (~1 km/h, any motion)
|
||||
AWARE → ACTIVE : speed ≥ 1.4 m/s (~5 km/h)
|
||||
ACTIVE → FULL : speed ≥ 4.2 m/s (~15 km/h)
|
||||
|
||||
Downgrade thresholds (held for downgrade_hold_s before applying):
|
||||
FULL → ACTIVE : speed < 3.6 m/s (~13 km/h, 2 km/h hysteresis)
|
||||
ACTIVE → AWARE : speed < 1.1 m/s (~4 km/h)
|
||||
AWARE → SOCIAL : speed < 0.1 m/s and idle ≥ idle_to_social_s
|
||||
|
||||
Scenario overrides (bypass hysteresis, instant):
|
||||
CROSSING → FULL immediately and held until scenario clears
|
||||
EMERGENCY → FULL immediately (also signals speed reduction upstream)
|
||||
INDOOR → cap at AWARE (never ACTIVE or FULL indoors)
|
||||
PARKED → SOCIAL immediately
|
||||
OUTDOOR → normal speed-based logic
|
||||
|
||||
Battery low override:
|
||||
battery_pct < battery_low_pct → cap at AWARE to save power
|
||||
|
||||
Safety invariant:
|
||||
Rear CSI is a safety sensor at speed — it is always on in ACTIVE and FULL.
|
||||
Any mode downgrade during CROSSING is blocked.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from dataclasses import dataclass
|
||||
from enum import IntEnum
|
||||
from typing import Optional
|
||||
|
||||
|
||||
# ── Mode enum ─────────────────────────────────────────────────────────────────
|
||||
|
||||
class CameraMode(IntEnum):
|
||||
SLEEP = 0
|
||||
SOCIAL = 1
|
||||
AWARE = 2
|
||||
ACTIVE = 3
|
||||
FULL = 4
|
||||
|
||||
@property
|
||||
def label(self) -> str:
|
||||
return self.name
|
||||
|
||||
|
||||
# ── Scenario enum ─────────────────────────────────────────────────────────────
|
||||
|
||||
class Scenario(str):
|
||||
"""
|
||||
Known scenario strings. The FSM accepts any string; these are the
|
||||
canonical values that trigger special behaviour.
|
||||
"""
|
||||
UNKNOWN = 'unknown'
|
||||
PARKED = 'parked'
|
||||
INDOOR = 'indoor'
|
||||
OUTDOOR = 'outdoor'
|
||||
CROSSING = 'crossing'
|
||||
EMERGENCY = 'emergency'
|
||||
|
||||
|
||||
# ── Sensor configuration per mode ─────────────────────────────────────────────
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ActiveSensors:
|
||||
csi_front: bool = False
|
||||
csi_rear: bool = False
|
||||
csi_left: bool = False
|
||||
csi_right: bool = False
|
||||
realsense: bool = False
|
||||
lidar: bool = False
|
||||
uwb: bool = False
|
||||
webcam: bool = False
|
||||
|
||||
@property
|
||||
def active_count(self) -> int:
|
||||
return sum([
|
||||
self.csi_front, self.csi_rear, self.csi_left, self.csi_right,
|
||||
self.realsense, self.lidar, self.uwb, self.webcam,
|
||||
])
|
||||
|
||||
|
||||
# Canonical sensor set for each mode
|
||||
MODE_SENSORS: dict[CameraMode, ActiveSensors] = {
|
||||
CameraMode.SLEEP: ActiveSensors(),
|
||||
CameraMode.SOCIAL: ActiveSensors(webcam=True),
|
||||
CameraMode.AWARE: ActiveSensors(csi_front=True, realsense=True, lidar=True),
|
||||
CameraMode.ACTIVE: ActiveSensors(csi_front=True, csi_rear=True,
|
||||
realsense=True, lidar=True, uwb=True),
|
||||
CameraMode.FULL: ActiveSensors(csi_front=True, csi_rear=True,
|
||||
csi_left=True, csi_right=True,
|
||||
realsense=True, lidar=True, uwb=True),
|
||||
}
|
||||
|
||||
# Speed thresholds (m/s)
|
||||
_SPD_MOTION = 0.3 # ~1 km/h — any meaningful motion
|
||||
_SPD_ACTIVE_UP = 5.0 / 3.6 # 5 km/h upgrade to ACTIVE
|
||||
_SPD_FULL_UP = 15.0 / 3.6 # 15 km/h upgrade to FULL
|
||||
_SPD_ACTIVE_DOWN = 4.0 / 3.6 # 4 km/h downgrade from ACTIVE (hysteresis gap)
|
||||
_SPD_FULL_DOWN = 13.0 / 3.6 # 13 km/h downgrade from FULL
|
||||
|
||||
# Scenario strings that force FULL
|
||||
_FORCE_FULL = frozenset({Scenario.CROSSING, Scenario.EMERGENCY})
|
||||
# Scenarios that cap the mode
|
||||
_CAP_AWARE = frozenset({Scenario.INDOOR})
|
||||
_CAP_SOCIAL = frozenset({Scenario.PARKED})
|
||||
|
||||
|
||||
# ── FSM result ────────────────────────────────────────────────────────────────
|
||||
|
||||
@dataclass
|
||||
class ModeDecision:
|
||||
mode: CameraMode
|
||||
sensors: ActiveSensors
|
||||
trigger_speed_mps: float
|
||||
trigger_scenario: str
|
||||
scenario_override: bool
|
||||
|
||||
|
||||
# ── FSM ───────────────────────────────────────────────────────────────────────
|
||||
|
||||
class CameraPowerFSM:
|
||||
"""
|
||||
Finite state machine for camera power mode management.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
downgrade_hold_s : seconds a downgrade condition must persist before
|
||||
the mode drops (hysteresis). Default 5.0 s.
|
||||
idle_to_social_s : seconds of near-zero speed before AWARE → SOCIAL.
|
||||
Default 30.0 s.
|
||||
battery_low_pct : battery level below which mode is capped at AWARE.
|
||||
Default 20.0 %.
|
||||
clock : callable() → float for monotonic time (injectable
|
||||
for tests; defaults to time.monotonic).
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
downgrade_hold_s: float = 5.0,
|
||||
idle_to_social_s: float = 30.0,
|
||||
battery_low_pct: float = 20.0,
|
||||
clock=None,
|
||||
) -> None:
|
||||
self._downgrade_hold = downgrade_hold_s
|
||||
self._idle_to_social = idle_to_social_s
|
||||
self._battery_low_pct = battery_low_pct
|
||||
self._clock = clock or time.monotonic
|
||||
|
||||
self._mode: CameraMode = CameraMode.SLEEP
|
||||
self._downgrade_pending_since: Optional[float] = None
|
||||
self._idle_since: Optional[float] = None
|
||||
|
||||
# Last input values (for reporting)
|
||||
self._last_speed: float = 0.0
|
||||
self._last_scenario: str = Scenario.UNKNOWN
|
||||
|
||||
# ── Public API ────────────────────────────────────────────────────────────
|
||||
|
||||
@property
|
||||
def mode(self) -> CameraMode:
|
||||
return self._mode
|
||||
|
||||
def update(
|
||||
self,
|
||||
speed_mps: float,
|
||||
scenario: str = Scenario.UNKNOWN,
|
||||
battery_pct: float = 100.0,
|
||||
) -> ModeDecision:
|
||||
"""
|
||||
Evaluate inputs and return the current mode decision.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
speed_mps : current speed in m/s (non-negative)
|
||||
scenario : current operating scenario string
|
||||
battery_pct : battery charge level 0–100
|
||||
|
||||
Returns
|
||||
-------
|
||||
ModeDecision with the resolved mode and active sensor set.
|
||||
"""
|
||||
now = self._clock()
|
||||
speed_mps = max(0.0, float(speed_mps))
|
||||
self._last_speed = speed_mps
|
||||
self._last_scenario = scenario
|
||||
|
||||
scenario_override = False
|
||||
|
||||
# ── 1. Hard overrides (no hysteresis) ─────────────────────────────
|
||||
if scenario in _FORCE_FULL:
|
||||
self._mode = CameraMode.FULL
|
||||
self._downgrade_pending_since = None
|
||||
self._idle_since = None
|
||||
scenario_override = True
|
||||
return self._decision(scenario, scenario_override)
|
||||
|
||||
if scenario in _CAP_SOCIAL:
|
||||
self._mode = CameraMode.SOCIAL
|
||||
self._downgrade_pending_since = None
|
||||
self._idle_since = None
|
||||
scenario_override = True
|
||||
return self._decision(scenario, scenario_override)
|
||||
|
||||
# ── 2. Compute desired mode from speed ────────────────────────────
|
||||
desired = self._speed_to_mode(speed_mps)
|
||||
|
||||
# ── 3. Apply scenario caps ────────────────────────────────────────
|
||||
if scenario in _CAP_AWARE:
|
||||
desired = min(desired, CameraMode.AWARE)
|
||||
|
||||
# ── 4. Apply battery low cap ──────────────────────────────────────
|
||||
if battery_pct < self._battery_low_pct:
|
||||
desired = min(desired, CameraMode.AWARE)
|
||||
|
||||
# ── 5. Idle timer: delay AWARE → SOCIAL when briefly stopped ─────────
|
||||
# _speed_to_mode already returns SOCIAL for speed < _SPD_MOTION.
|
||||
# When in AWARE (or above) and nearly stopped, hold at AWARE until
|
||||
# the idle timer expires to avoid flapping at traffic lights.
|
||||
if speed_mps < 0.1 and self._mode >= CameraMode.AWARE:
|
||||
if self._idle_since is None:
|
||||
self._idle_since = now
|
||||
if now - self._idle_since < self._idle_to_social:
|
||||
# Timer not yet expired — enforce minimum AWARE
|
||||
desired = max(desired, CameraMode.AWARE)
|
||||
# else: timer expired — let desired remain SOCIAL naturally
|
||||
else:
|
||||
self._idle_since = None
|
||||
|
||||
# ── 6. Apply hysteresis for downgrades ────────────────────────────
|
||||
if desired < self._mode:
|
||||
# Downgrade requested — start hold timer on first detection, then
|
||||
# check on every call (including the first, so hold=0 is instant).
|
||||
if self._downgrade_pending_since is None:
|
||||
self._downgrade_pending_since = now
|
||||
if now - self._downgrade_pending_since >= self._downgrade_hold:
|
||||
self._mode = desired
|
||||
self._downgrade_pending_since = None
|
||||
# else: hold not expired — keep current mode
|
||||
else:
|
||||
# Upgrade or no change — apply immediately, cancel any pending downgrade
|
||||
self._downgrade_pending_since = None
|
||||
self._mode = desired
|
||||
|
||||
return self._decision(scenario, scenario_override)
|
||||
|
||||
def reset(self, mode: CameraMode = CameraMode.SLEEP) -> None:
|
||||
"""Reset FSM state (e.g. on node restart)."""
|
||||
self._mode = mode
|
||||
self._downgrade_pending_since = None
|
||||
self._idle_since = None
|
||||
|
||||
# ── Helpers ───────────────────────────────────────────────────────────────
|
||||
|
||||
def _speed_to_mode(self, speed_mps: float) -> CameraMode:
|
||||
"""Map speed to desired mode using hysteresis thresholds."""
|
||||
if self._mode == CameraMode.FULL:
|
||||
# Downgrade from FULL uses lower threshold
|
||||
if speed_mps >= _SPD_FULL_DOWN:
|
||||
return CameraMode.FULL
|
||||
elif speed_mps >= _SPD_ACTIVE_DOWN:
|
||||
return CameraMode.ACTIVE
|
||||
elif speed_mps >= _SPD_MOTION:
|
||||
return CameraMode.AWARE
|
||||
else:
|
||||
return CameraMode.AWARE # idle handled separately
|
||||
|
||||
elif self._mode == CameraMode.ACTIVE:
|
||||
if speed_mps >= _SPD_FULL_UP:
|
||||
return CameraMode.FULL
|
||||
elif speed_mps >= _SPD_ACTIVE_DOWN:
|
||||
return CameraMode.ACTIVE
|
||||
elif speed_mps >= _SPD_MOTION:
|
||||
return CameraMode.AWARE
|
||||
else:
|
||||
return CameraMode.AWARE
|
||||
|
||||
else:
|
||||
# SLEEP / SOCIAL / AWARE — use upgrade thresholds
|
||||
if speed_mps >= _SPD_FULL_UP:
|
||||
return CameraMode.FULL
|
||||
elif speed_mps >= _SPD_ACTIVE_UP:
|
||||
return CameraMode.ACTIVE
|
||||
elif speed_mps >= _SPD_MOTION:
|
||||
return CameraMode.AWARE
|
||||
else:
|
||||
return CameraMode.SOCIAL
|
||||
|
||||
def _decision(self, scenario: str, override: bool) -> ModeDecision:
|
||||
return ModeDecision(
|
||||
mode = self._mode,
|
||||
sensors = MODE_SENSORS[self._mode],
|
||||
trigger_speed_mps = self._last_speed,
|
||||
trigger_scenario = scenario,
|
||||
scenario_override = override,
|
||||
)
|
||||
@ -0,0 +1,215 @@
|
||||
"""
|
||||
camera_power_node.py — Adaptive camera power mode manager (Issue #375).
|
||||
|
||||
Subscribes to speed, scenario, and battery level inputs, runs the
|
||||
CameraPowerFSM, and publishes camera enable/disable commands at 2 Hz.
|
||||
|
||||
Subscribes
|
||||
----------
|
||||
/saltybot/speed std_msgs/Float32 robot speed in m/s
|
||||
/saltybot/scenario std_msgs/String operating scenario label
|
||||
/saltybot/battery_pct std_msgs/Float32 battery charge level 0–100 %
|
||||
|
||||
Publishes
|
||||
---------
|
||||
/saltybot/camera_mode saltybot_scene_msgs/CameraPowerMode (2 Hz)
|
||||
/saltybot/camera_cmd/front std_msgs/Bool — front CSI enable
|
||||
/saltybot/camera_cmd/rear std_msgs/Bool — rear CSI enable
|
||||
/saltybot/camera_cmd/left std_msgs/Bool — left CSI enable
|
||||
/saltybot/camera_cmd/right std_msgs/Bool — right CSI enable
|
||||
/saltybot/camera_cmd/realsense std_msgs/Bool — D435i enable
|
||||
/saltybot/camera_cmd/lidar std_msgs/Bool — LIDAR enable
|
||||
/saltybot/camera_cmd/uwb std_msgs/Bool — UWB enable
|
||||
/saltybot/camera_cmd/webcam std_msgs/Bool — C920 webcam enable
|
||||
|
||||
Parameters
|
||||
----------
|
||||
rate_hz float 2.0 FSM evaluation and publish rate
|
||||
downgrade_hold_s float 5.0 Seconds before a downgrade is applied
|
||||
idle_to_social_s float 30.0 Idle time before AWARE→SOCIAL
|
||||
battery_low_pct float 20.0 Battery threshold for AWARE cap
|
||||
initial_mode int 0 Starting mode (0=SLEEP … 4=FULL)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy
|
||||
|
||||
from std_msgs.msg import Bool, Float32, String
|
||||
from saltybot_scene_msgs.msg import CameraPowerMode
|
||||
|
||||
from ._camera_power_manager import (
|
||||
CameraMode,
|
||||
CameraPowerFSM,
|
||||
Scenario,
|
||||
MODE_SENSORS,
|
||||
)
|
||||
|
||||
|
||||
_RELIABLE_LATCHED = QoSProfile(
|
||||
reliability=ReliabilityPolicy.RELIABLE,
|
||||
history=HistoryPolicy.KEEP_LAST,
|
||||
depth=1,
|
||||
durability=DurabilityPolicy.TRANSIENT_LOCAL,
|
||||
)
|
||||
_RELIABLE = QoSProfile(
|
||||
reliability=ReliabilityPolicy.RELIABLE,
|
||||
history=HistoryPolicy.KEEP_LAST,
|
||||
depth=10,
|
||||
)
|
||||
_SENSOR_QOS = QoSProfile(
|
||||
reliability=ReliabilityPolicy.BEST_EFFORT,
|
||||
history=HistoryPolicy.KEEP_LAST,
|
||||
depth=5,
|
||||
)
|
||||
|
||||
# Camera command topic suffixes and their ActiveSensors field name
|
||||
_CAM_TOPICS = [
|
||||
('front', 'csi_front'),
|
||||
('rear', 'csi_rear'),
|
||||
('left', 'csi_left'),
|
||||
('right', 'csi_right'),
|
||||
('realsense', 'realsense'),
|
||||
('lidar', 'lidar'),
|
||||
('uwb', 'uwb'),
|
||||
('webcam', 'webcam'),
|
||||
]
|
||||
|
||||
|
||||
class CameraPowerNode(Node):
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__('camera_power_node')
|
||||
|
||||
# ── Parameters ──────────────────────────────────────────────────────
|
||||
self.declare_parameter('rate_hz', 2.0)
|
||||
self.declare_parameter('downgrade_hold_s', 5.0)
|
||||
self.declare_parameter('idle_to_social_s', 30.0)
|
||||
self.declare_parameter('battery_low_pct', 20.0)
|
||||
self.declare_parameter('initial_mode', 0)
|
||||
|
||||
p = self.get_parameter
|
||||
rate_hz = max(float(p('rate_hz').value), 0.1)
|
||||
downgrade_hold_s = max(float(p('downgrade_hold_s').value), 0.0)
|
||||
idle_to_social_s = max(float(p('idle_to_social_s').value), 0.0)
|
||||
battery_low_pct = float(p('battery_low_pct').value)
|
||||
initial_mode = int(p('initial_mode').value)
|
||||
|
||||
# ── FSM ──────────────────────────────────────────────────────────────
|
||||
self._fsm = CameraPowerFSM(
|
||||
downgrade_hold_s = downgrade_hold_s,
|
||||
idle_to_social_s = idle_to_social_s,
|
||||
battery_low_pct = battery_low_pct,
|
||||
)
|
||||
try:
|
||||
self._fsm.reset(CameraMode(initial_mode))
|
||||
except ValueError:
|
||||
self._fsm.reset(CameraMode.SLEEP)
|
||||
|
||||
# ── Input state ──────────────────────────────────────────────────────
|
||||
self._speed_mps: float = 0.0
|
||||
self._scenario: str = Scenario.UNKNOWN
|
||||
self._battery_pct: float = 100.0
|
||||
|
||||
# ── Subscribers ──────────────────────────────────────────────────────
|
||||
self._speed_sub = self.create_subscription(
|
||||
Float32, '/saltybot/speed',
|
||||
lambda m: setattr(self, '_speed_mps', max(0.0, float(m.data))),
|
||||
_SENSOR_QOS,
|
||||
)
|
||||
self._scenario_sub = self.create_subscription(
|
||||
String, '/saltybot/scenario',
|
||||
lambda m: setattr(self, '_scenario', str(m.data)),
|
||||
_RELIABLE,
|
||||
)
|
||||
self._battery_sub = self.create_subscription(
|
||||
Float32, '/saltybot/battery_pct',
|
||||
lambda m: setattr(self, '_battery_pct',
|
||||
max(0.0, min(100.0, float(m.data)))),
|
||||
_RELIABLE,
|
||||
)
|
||||
|
||||
# ── Publishers ───────────────────────────────────────────────────────
|
||||
self._mode_pub = self.create_publisher(
|
||||
CameraPowerMode, '/saltybot/camera_mode', _RELIABLE_LATCHED,
|
||||
)
|
||||
self._cam_pubs: dict[str, rclpy.publisher.Publisher] = {}
|
||||
for suffix, _ in _CAM_TOPICS:
|
||||
self._cam_pubs[suffix] = self.create_publisher(
|
||||
Bool, f'/saltybot/camera_cmd/{suffix}', _RELIABLE_LATCHED,
|
||||
)
|
||||
|
||||
# ── Timer ────────────────────────────────────────────────────────────
|
||||
self._timer = self.create_timer(1.0 / rate_hz, self._step)
|
||||
self._prev_mode: CameraMode | None = None
|
||||
|
||||
self.get_logger().info(
|
||||
f'camera_power_node ready — '
|
||||
f'rate={rate_hz:.1f}Hz, '
|
||||
f'downgrade_hold={downgrade_hold_s}s, '
|
||||
f'idle_to_social={idle_to_social_s}s, '
|
||||
f'battery_low={battery_low_pct}%'
|
||||
)
|
||||
|
||||
# ── Step callback ─────────────────────────────────────────────────────────
|
||||
|
||||
def _step(self) -> None:
|
||||
decision = self._fsm.update(
|
||||
speed_mps = self._speed_mps,
|
||||
scenario = self._scenario,
|
||||
battery_pct = self._battery_pct,
|
||||
)
|
||||
|
||||
# Log transitions
|
||||
if decision.mode != self._prev_mode:
|
||||
prev_name = self._prev_mode.label if self._prev_mode else 'INIT'
|
||||
self.get_logger().info(
|
||||
f'Camera mode: {prev_name} → {decision.mode.label} '
|
||||
f'(speed={self._speed_mps:.2f}m/s '
|
||||
f'scenario={self._scenario} '
|
||||
f'battery={self._battery_pct:.0f}%)'
|
||||
)
|
||||
self._prev_mode = decision.mode
|
||||
|
||||
# Publish CameraPowerMode message
|
||||
msg = CameraPowerMode()
|
||||
msg.header.stamp = self.get_clock().now().to_msg()
|
||||
msg.header.frame_id = ''
|
||||
msg.mode = int(decision.mode)
|
||||
msg.mode_name = decision.mode.label
|
||||
s = decision.sensors
|
||||
msg.csi_front = s.csi_front
|
||||
msg.csi_rear = s.csi_rear
|
||||
msg.csi_left = s.csi_left
|
||||
msg.csi_right = s.csi_right
|
||||
msg.realsense = s.realsense
|
||||
msg.lidar = s.lidar
|
||||
msg.uwb = s.uwb
|
||||
msg.webcam = s.webcam
|
||||
msg.trigger_speed_mps = float(decision.trigger_speed_mps)
|
||||
msg.trigger_scenario = decision.trigger_scenario
|
||||
msg.scenario_override = decision.scenario_override
|
||||
self._mode_pub.publish(msg)
|
||||
|
||||
# Publish per-camera Bool commands
|
||||
for suffix, field in _CAM_TOPICS:
|
||||
enabled = bool(getattr(s, field))
|
||||
b = Bool()
|
||||
b.data = enabled
|
||||
self._cam_pubs[suffix].publish(b)
|
||||
|
||||
|
||||
def main(args=None) -> None:
|
||||
rclpy.init(args=args)
|
||||
node = CameraPowerNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@ -0,0 +1,575 @@
|
||||
"""
|
||||
test_camera_power_manager.py — Unit tests for _camera_power_manager.py (Issue #375).
|
||||
|
||||
Covers:
|
||||
- Mode sensor configurations
|
||||
- Speed-driven upgrade transitions
|
||||
- Speed-driven downgrade with hysteresis
|
||||
- Scenario overrides (CROSSING, EMERGENCY, PARKED, INDOOR)
|
||||
- Battery low cap
|
||||
- Idle → SOCIAL transition
|
||||
- Safety invariants (rear CSI in ACTIVE/FULL, CROSSING cannot downgrade)
|
||||
- Reset
|
||||
- ModeDecision fields
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from saltybot_bringup._camera_power_manager import (
|
||||
ActiveSensors,
|
||||
CameraMode,
|
||||
CameraPowerFSM,
|
||||
MODE_SENSORS,
|
||||
ModeDecision,
|
||||
Scenario,
|
||||
_SPD_ACTIVE_DOWN,
|
||||
_SPD_ACTIVE_UP,
|
||||
_SPD_FULL_DOWN,
|
||||
_SPD_FULL_UP,
|
||||
_SPD_MOTION,
|
||||
)
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Helpers
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class _FakeClock:
|
||||
"""Injectable monotonic clock for deterministic tests."""
|
||||
def __init__(self, t: float = 0.0) -> None:
|
||||
self.t = t
|
||||
def __call__(self) -> float:
|
||||
return self.t
|
||||
def advance(self, dt: float) -> None:
|
||||
self.t += dt
|
||||
|
||||
|
||||
def _fsm(hold: float = 5.0, idle: float = 30.0, bat_low: float = 20.0,
|
||||
clock=None) -> CameraPowerFSM:
|
||||
c = clock or _FakeClock()
|
||||
return CameraPowerFSM(
|
||||
downgrade_hold_s=hold,
|
||||
idle_to_social_s=idle,
|
||||
battery_low_pct=bat_low,
|
||||
clock=c,
|
||||
)
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Mode sensor configurations
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestModeSensors:
|
||||
|
||||
def test_sleep_no_sensors(self):
|
||||
s = MODE_SENSORS[CameraMode.SLEEP]
|
||||
assert s.active_count == 0
|
||||
|
||||
def test_social_webcam_only(self):
|
||||
s = MODE_SENSORS[CameraMode.SOCIAL]
|
||||
assert s.webcam is True
|
||||
assert s.csi_front is False
|
||||
assert s.realsense is False
|
||||
assert s.lidar is False
|
||||
|
||||
def test_aware_front_realsense_lidar(self):
|
||||
s = MODE_SENSORS[CameraMode.AWARE]
|
||||
assert s.csi_front is True
|
||||
assert s.realsense is True
|
||||
assert s.lidar is True
|
||||
assert s.csi_rear is False
|
||||
assert s.csi_left is False
|
||||
assert s.csi_right is False
|
||||
assert s.uwb is False
|
||||
|
||||
def test_active_front_rear_realsense_lidar_uwb(self):
|
||||
s = MODE_SENSORS[CameraMode.ACTIVE]
|
||||
assert s.csi_front is True
|
||||
assert s.csi_rear is True
|
||||
assert s.realsense is True
|
||||
assert s.lidar is True
|
||||
assert s.uwb is True
|
||||
assert s.csi_left is False
|
||||
assert s.csi_right is False
|
||||
|
||||
def test_full_all_sensors(self):
|
||||
s = MODE_SENSORS[CameraMode.FULL]
|
||||
assert s.csi_front is True
|
||||
assert s.csi_rear is True
|
||||
assert s.csi_left is True
|
||||
assert s.csi_right is True
|
||||
assert s.realsense is True
|
||||
assert s.lidar is True
|
||||
assert s.uwb is True
|
||||
assert s.webcam is False # webcam not needed at speed
|
||||
|
||||
def test_mode_sensor_counts_increase(self):
|
||||
counts = [MODE_SENSORS[m].active_count for m in CameraMode]
|
||||
assert counts == sorted(counts), "Higher modes should have more sensors"
|
||||
|
||||
def test_safety_rear_csi_in_active(self):
|
||||
assert MODE_SENSORS[CameraMode.ACTIVE].csi_rear is True
|
||||
|
||||
def test_safety_rear_csi_in_full(self):
|
||||
assert MODE_SENSORS[CameraMode.FULL].csi_rear is True
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Speed-driven upgrades (instant)
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestSpeedUpgrades:
|
||||
|
||||
def test_no_motion_stays_social(self):
|
||||
f = _fsm()
|
||||
d = f.update(speed_mps=0.0)
|
||||
assert d.mode == CameraMode.SOCIAL
|
||||
|
||||
def test_slow_motion_upgrades_to_aware(self):
|
||||
f = _fsm()
|
||||
d = f.update(speed_mps=_SPD_MOTION + 0.1)
|
||||
assert d.mode == CameraMode.AWARE
|
||||
|
||||
def test_5kmh_upgrades_to_active(self):
|
||||
f = _fsm()
|
||||
d = f.update(speed_mps=_SPD_ACTIVE_UP + 0.1)
|
||||
assert d.mode == CameraMode.ACTIVE
|
||||
|
||||
def test_15kmh_upgrades_to_full(self):
|
||||
f = _fsm()
|
||||
d = f.update(speed_mps=_SPD_FULL_UP + 0.1)
|
||||
assert d.mode == CameraMode.FULL
|
||||
|
||||
def test_upgrades_skip_intermediate_modes(self):
|
||||
"""At 20 km/h from rest, jumps directly to FULL."""
|
||||
f = _fsm()
|
||||
d = f.update(speed_mps=20.0 / 3.6)
|
||||
assert d.mode == CameraMode.FULL
|
||||
|
||||
def test_upgrade_is_immediate_no_hold(self):
|
||||
"""Upgrades do NOT require hold time."""
|
||||
clock = _FakeClock(0.0)
|
||||
f = _fsm(hold=10.0, clock=clock)
|
||||
# Still at t=0
|
||||
d = f.update(speed_mps=_SPD_FULL_UP + 0.5)
|
||||
assert d.mode == CameraMode.FULL
|
||||
|
||||
def test_exactly_at_motion_threshold(self):
|
||||
f = _fsm()
|
||||
d = f.update(speed_mps=_SPD_MOTION)
|
||||
assert d.mode == CameraMode.AWARE
|
||||
|
||||
def test_exactly_at_active_threshold(self):
|
||||
f = _fsm()
|
||||
d = f.update(speed_mps=_SPD_ACTIVE_UP)
|
||||
assert d.mode == CameraMode.ACTIVE
|
||||
|
||||
def test_exactly_at_full_threshold(self):
|
||||
f = _fsm()
|
||||
d = f.update(speed_mps=_SPD_FULL_UP)
|
||||
assert d.mode == CameraMode.FULL
|
||||
|
||||
def test_negative_speed_clamped_to_zero(self):
|
||||
f = _fsm()
|
||||
d = f.update(speed_mps=-1.0)
|
||||
assert d.mode == CameraMode.SOCIAL
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Downgrade hysteresis
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestDowngradeHysteresis:
|
||||
|
||||
def _reach_full(self, f: CameraPowerFSM, clock: _FakeClock) -> None:
|
||||
f.update(speed_mps=_SPD_FULL_UP + 0.5)
|
||||
assert f.mode == CameraMode.FULL
|
||||
|
||||
def test_downgrade_not_immediate(self):
|
||||
clock = _FakeClock(0.0)
|
||||
f = _fsm(hold=5.0, clock=clock)
|
||||
self._reach_full(f, clock)
|
||||
# Drop to below FULL threshold but don't advance clock
|
||||
d = f.update(speed_mps=0.0)
|
||||
assert d.mode == CameraMode.FULL # still held
|
||||
|
||||
def test_downgrade_after_hold_expires(self):
|
||||
clock = _FakeClock(0.0)
|
||||
f = _fsm(hold=5.0, clock=clock)
|
||||
self._reach_full(f, clock)
|
||||
f.update(speed_mps=0.0) # first low-speed call starts the hold timer
|
||||
clock.advance(5.1)
|
||||
d = f.update(speed_mps=0.0) # hold expired — downgrade to AWARE (not SOCIAL;
|
||||
# SOCIAL requires an additional idle timer from AWARE, see TestIdleToSocial)
|
||||
assert d.mode == CameraMode.AWARE
|
||||
|
||||
def test_downgrade_cancelled_by_speed_spike(self):
|
||||
"""If speed spikes back up during hold, downgrade is cancelled."""
|
||||
clock = _FakeClock(0.0)
|
||||
f = _fsm(hold=5.0, clock=clock)
|
||||
self._reach_full(f, clock)
|
||||
clock.advance(3.0)
|
||||
f.update(speed_mps=0.0) # start downgrade timer
|
||||
clock.advance(1.0)
|
||||
f.update(speed_mps=_SPD_FULL_UP + 1.0) # back to full speed
|
||||
clock.advance(3.0) # would have expired hold if not cancelled
|
||||
d = f.update(speed_mps=0.0)
|
||||
# Hold restarted; only 3s elapsed since the cancellation reset
|
||||
assert d.mode == CameraMode.FULL
|
||||
|
||||
def test_full_to_active_hysteresis_band(self):
|
||||
"""Speed in [_SPD_FULL_DOWN, _SPD_FULL_UP) while in FULL → stays FULL (hold pending)."""
|
||||
clock = _FakeClock(0.0)
|
||||
f = _fsm(hold=5.0, clock=clock)
|
||||
self._reach_full(f, clock)
|
||||
# Speed in hysteresis band (between down and up thresholds)
|
||||
mid = (_SPD_FULL_DOWN + _SPD_FULL_UP) / 2.0
|
||||
d = f.update(speed_mps=mid)
|
||||
assert d.mode == CameraMode.FULL # pending downgrade, not yet applied
|
||||
|
||||
def test_hold_zero_downgrades_immediately(self):
|
||||
clock = _FakeClock(0.0)
|
||||
f = _fsm(hold=0.0, clock=clock)
|
||||
f.update(speed_mps=_SPD_FULL_UP + 0.5)
|
||||
# hold=0: downgrade applies on the very first low-speed call.
|
||||
# From FULL at speed=0 → AWARE (SOCIAL requires a separate idle timer).
|
||||
d = f.update(speed_mps=0.0)
|
||||
assert d.mode == CameraMode.AWARE
|
||||
|
||||
def test_downgrade_full_to_active_at_13kmh(self):
|
||||
clock = _FakeClock(0.0)
|
||||
f = _fsm(hold=0.0, clock=clock)
|
||||
f.update(speed_mps=_SPD_FULL_UP + 0.5) # → FULL
|
||||
d = f.update(speed_mps=_SPD_FULL_DOWN - 0.05) # just below 13 km/h
|
||||
assert d.mode == CameraMode.ACTIVE
|
||||
|
||||
def test_downgrade_active_to_aware(self):
|
||||
clock = _FakeClock(0.0)
|
||||
f = _fsm(hold=0.0, clock=clock)
|
||||
f.update(speed_mps=_SPD_ACTIVE_UP + 0.5) # → ACTIVE
|
||||
d = f.update(speed_mps=_SPD_ACTIVE_DOWN - 0.05)
|
||||
assert d.mode == CameraMode.AWARE
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Scenario overrides
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestScenarioOverrides:
|
||||
|
||||
def test_crossing_forces_full_from_rest(self):
|
||||
f = _fsm()
|
||||
d = f.update(speed_mps=0.0, scenario=Scenario.CROSSING)
|
||||
assert d.mode == CameraMode.FULL
|
||||
assert d.scenario_override is True
|
||||
|
||||
def test_crossing_forces_full_from_aware(self):
|
||||
f = _fsm()
|
||||
f.update(speed_mps=0.5) # AWARE
|
||||
d = f.update(speed_mps=0.5, scenario=Scenario.CROSSING)
|
||||
assert d.mode == CameraMode.FULL
|
||||
|
||||
def test_emergency_forces_full(self):
|
||||
f = _fsm()
|
||||
d = f.update(speed_mps=0.0, scenario=Scenario.EMERGENCY)
|
||||
assert d.mode == CameraMode.FULL
|
||||
assert d.scenario_override is True
|
||||
|
||||
def test_crossing_bypasses_hysteresis(self):
|
||||
"""CROSSING forces FULL even if a downgrade is pending."""
|
||||
clock = _FakeClock(0.0)
|
||||
f = _fsm(hold=5.0, clock=clock)
|
||||
f.update(speed_mps=_SPD_FULL_UP + 1.0) # FULL
|
||||
clock.advance(4.0)
|
||||
f.update(speed_mps=0.0) # pending downgrade started
|
||||
d = f.update(speed_mps=0.0, scenario=Scenario.CROSSING)
|
||||
assert d.mode == CameraMode.FULL
|
||||
|
||||
def test_parked_forces_social(self):
|
||||
f = _fsm()
|
||||
f.update(speed_mps=_SPD_FULL_UP + 1.0) # FULL
|
||||
d = f.update(speed_mps=0.0, scenario=Scenario.PARKED)
|
||||
assert d.mode == CameraMode.SOCIAL
|
||||
assert d.scenario_override is True
|
||||
|
||||
def test_indoor_caps_at_aware(self):
|
||||
f = _fsm()
|
||||
# Speed says ACTIVE but indoor caps to AWARE
|
||||
d = f.update(speed_mps=_SPD_ACTIVE_UP + 0.5, scenario=Scenario.INDOOR)
|
||||
assert d.mode == CameraMode.AWARE
|
||||
|
||||
def test_indoor_cannot_reach_full(self):
|
||||
f = _fsm()
|
||||
d = f.update(speed_mps=_SPD_FULL_UP + 2.0, scenario=Scenario.INDOOR)
|
||||
assert d.mode == CameraMode.AWARE
|
||||
|
||||
def test_outdoor_uses_speed_logic(self):
|
||||
f = _fsm()
|
||||
d = f.update(speed_mps=_SPD_FULL_UP + 0.5, scenario=Scenario.OUTDOOR)
|
||||
assert d.mode == CameraMode.FULL
|
||||
|
||||
def test_unknown_scenario_uses_speed_logic(self):
|
||||
f = _fsm()
|
||||
d = f.update(speed_mps=_SPD_ACTIVE_UP + 0.5, scenario=Scenario.UNKNOWN)
|
||||
assert d.mode == CameraMode.ACTIVE
|
||||
|
||||
def test_crossing_sets_all_csi_active(self):
|
||||
f = _fsm()
|
||||
d = f.update(speed_mps=0.0, scenario=Scenario.CROSSING)
|
||||
s = d.sensors
|
||||
assert s.csi_front and s.csi_rear and s.csi_left and s.csi_right
|
||||
|
||||
def test_scenario_override_false_for_speed_transition(self):
|
||||
f = _fsm()
|
||||
d = f.update(speed_mps=_SPD_ACTIVE_UP + 0.5, scenario=Scenario.OUTDOOR)
|
||||
assert d.scenario_override is False
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Battery low cap
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestBatteryLow:
|
||||
|
||||
def test_battery_low_caps_at_aware(self):
|
||||
f = _fsm(bat_low=20.0)
|
||||
d = f.update(speed_mps=_SPD_FULL_UP + 1.0, battery_pct=15.0)
|
||||
assert d.mode == CameraMode.AWARE
|
||||
|
||||
def test_battery_low_prevents_active(self):
|
||||
f = _fsm(bat_low=20.0)
|
||||
d = f.update(speed_mps=_SPD_ACTIVE_UP + 0.5, battery_pct=19.9)
|
||||
assert d.mode == CameraMode.AWARE
|
||||
|
||||
def test_battery_full_no_cap(self):
|
||||
f = _fsm(bat_low=20.0)
|
||||
d = f.update(speed_mps=_SPD_FULL_UP + 1.0, battery_pct=100.0)
|
||||
assert d.mode == CameraMode.FULL
|
||||
|
||||
def test_battery_at_threshold_not_capped(self):
|
||||
f = _fsm(bat_low=20.0)
|
||||
d = f.update(speed_mps=_SPD_FULL_UP + 1.0, battery_pct=20.0)
|
||||
# At exactly threshold — not below — so no cap
|
||||
assert d.mode == CameraMode.FULL
|
||||
|
||||
def test_battery_low_allows_aware(self):
|
||||
f = _fsm(bat_low=20.0)
|
||||
d = f.update(speed_mps=_SPD_MOTION + 0.1, battery_pct=10.0)
|
||||
assert d.mode == CameraMode.AWARE
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Idle → SOCIAL transition
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestIdleToSocial:
|
||||
|
||||
def test_idle_transitions_to_social_after_timeout(self):
|
||||
clock = _FakeClock(0.0)
|
||||
f = _fsm(idle=10.0, hold=0.0, clock=clock)
|
||||
# Bring to AWARE
|
||||
f.update(speed_mps=_SPD_MOTION + 0.1)
|
||||
# Reduce to near-zero
|
||||
clock.advance(5.0)
|
||||
f.update(speed_mps=0.05) # below 0.1, idle timer starts
|
||||
clock.advance(10.1)
|
||||
d = f.update(speed_mps=0.05)
|
||||
assert d.mode == CameraMode.SOCIAL
|
||||
|
||||
def test_motion_resets_idle_timer(self):
|
||||
clock = _FakeClock(0.0)
|
||||
f = _fsm(idle=10.0, hold=0.0, clock=clock)
|
||||
f.update(speed_mps=_SPD_MOTION + 0.1) # AWARE
|
||||
clock.advance(5.0)
|
||||
f.update(speed_mps=0.05) # idle timer starts
|
||||
clock.advance(5.0)
|
||||
f.update(speed_mps=1.0) # motion resets timer
|
||||
clock.advance(6.0)
|
||||
d = f.update(speed_mps=0.05) # timer restarted, only 6s elapsed
|
||||
assert d.mode == CameraMode.AWARE
|
||||
|
||||
def test_not_idle_when_moving(self):
|
||||
clock = _FakeClock(0.0)
|
||||
f = _fsm(idle=5.0, clock=clock)
|
||||
f.update(speed_mps=_SPD_MOTION + 0.1)
|
||||
clock.advance(100.0)
|
||||
d = f.update(speed_mps=_SPD_MOTION + 0.1) # still moving
|
||||
assert d.mode == CameraMode.AWARE
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Reset
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestReset:
|
||||
|
||||
def test_reset_to_sleep(self):
|
||||
f = _fsm()
|
||||
f.update(speed_mps=_SPD_FULL_UP + 1.0)
|
||||
f.reset(CameraMode.SLEEP)
|
||||
assert f.mode == CameraMode.SLEEP
|
||||
|
||||
def test_reset_clears_downgrade_timer(self):
|
||||
clock = _FakeClock(0.0)
|
||||
f = _fsm(hold=5.0, clock=clock)
|
||||
f.update(speed_mps=_SPD_FULL_UP + 1.0)
|
||||
f.update(speed_mps=0.0) # pending downgrade started
|
||||
f.reset(CameraMode.AWARE)
|
||||
# After reset, pending downgrade should be cleared
|
||||
clock.advance(10.0)
|
||||
d = f.update(speed_mps=0.0)
|
||||
# From AWARE at speed 0 → idle timer not yet expired → stays AWARE (not SLEEP)
|
||||
# Actually: speed 0 < _SPD_MOTION → desired=SOCIAL, hold=5.0
|
||||
# With hold=5.0 and clock just advanced, pending since = now → no downgrade yet
|
||||
assert d.mode in (CameraMode.AWARE, CameraMode.SOCIAL)
|
||||
|
||||
def test_reset_to_full(self):
|
||||
f = _fsm()
|
||||
f.reset(CameraMode.FULL)
|
||||
assert f.mode == CameraMode.FULL
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# ModeDecision fields
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestModeDecisionFields:
|
||||
|
||||
def test_decision_has_mode(self):
|
||||
f = _fsm()
|
||||
d = f.update(speed_mps=0.0)
|
||||
assert isinstance(d.mode, CameraMode)
|
||||
|
||||
def test_decision_has_sensors(self):
|
||||
f = _fsm()
|
||||
d = f.update(speed_mps=0.0)
|
||||
assert isinstance(d.sensors, ActiveSensors)
|
||||
|
||||
def test_decision_sensors_match_mode(self):
|
||||
f = _fsm()
|
||||
d = f.update(speed_mps=_SPD_FULL_UP + 1.0)
|
||||
assert d.sensors == MODE_SENSORS[CameraMode.FULL]
|
||||
|
||||
def test_decision_trigger_speed_recorded(self):
|
||||
f = _fsm()
|
||||
d = f.update(speed_mps=2.5)
|
||||
assert abs(d.trigger_speed_mps - 2.5) < 1e-6
|
||||
|
||||
def test_decision_trigger_scenario_recorded(self):
|
||||
f = _fsm()
|
||||
d = f.update(speed_mps=0.0, scenario='indoor')
|
||||
assert d.trigger_scenario == 'indoor'
|
||||
|
||||
def test_scenario_override_false_for_normal(self):
|
||||
f = _fsm()
|
||||
d = f.update(speed_mps=1.0)
|
||||
assert d.scenario_override is False
|
||||
|
||||
def test_scenario_override_true_for_crossing(self):
|
||||
f = _fsm()
|
||||
d = f.update(speed_mps=0.0, scenario=Scenario.CROSSING)
|
||||
assert d.scenario_override is True
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Active sensor counts (RAM budget)
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestActiveSensors:
|
||||
|
||||
def test_active_sensor_count_non_negative(self):
|
||||
for m, s in MODE_SENSORS.items():
|
||||
assert s.active_count >= 0
|
||||
|
||||
def test_sleep_zero_sensors(self):
|
||||
assert MODE_SENSORS[CameraMode.SLEEP].active_count == 0
|
||||
|
||||
def test_full_seven_sensors(self):
|
||||
# csi x4 + realsense + lidar + uwb = 7
|
||||
assert MODE_SENSORS[CameraMode.FULL].active_count == 7
|
||||
|
||||
def test_active_sensors_correct(self):
|
||||
# csi_front + csi_rear + realsense + lidar + uwb = 5
|
||||
assert MODE_SENSORS[CameraMode.ACTIVE].active_count == 5
|
||||
|
||||
def test_aware_sensors_correct(self):
|
||||
# csi_front + realsense + lidar = 3
|
||||
assert MODE_SENSORS[CameraMode.AWARE].active_count == 3
|
||||
|
||||
def test_social_sensors_correct(self):
|
||||
# webcam = 1
|
||||
assert MODE_SENSORS[CameraMode.SOCIAL].active_count == 1
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Mode labels
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestModeLabels:
|
||||
|
||||
def test_all_modes_have_labels(self):
|
||||
for m in CameraMode:
|
||||
assert isinstance(m.label, str)
|
||||
assert len(m.label) > 0
|
||||
|
||||
def test_sleep_label(self):
|
||||
assert CameraMode.SLEEP.label == 'SLEEP'
|
||||
|
||||
def test_full_label(self):
|
||||
assert CameraMode.FULL.label == 'FULL'
|
||||
|
||||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Integration: typical ride scenario
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
class TestIntegrationRideScenario:
|
||||
"""Simulates a typical follow-me trip: start→walk→jog→sprint→crossing→indoor."""
|
||||
|
||||
def test_full_ride(self):
|
||||
clock = _FakeClock(0.0)
|
||||
f = _fsm(hold=2.0, idle=5.0, clock=clock)
|
||||
|
||||
# Starting up
|
||||
d = f.update(0.0, Scenario.OUTDOOR, 100.0)
|
||||
assert d.mode == CameraMode.SOCIAL
|
||||
|
||||
# Walking pace (~3 km/h)
|
||||
d = f.update(0.8, Scenario.OUTDOOR, 95.0)
|
||||
assert d.mode == CameraMode.AWARE
|
||||
|
||||
# Jogging (~7 km/h)
|
||||
d = f.update(2.0, Scenario.OUTDOOR, 90.0)
|
||||
assert d.mode == CameraMode.ACTIVE
|
||||
|
||||
# High-speed following (~20 km/h)
|
||||
d = f.update(5.6, Scenario.OUTDOOR, 85.0)
|
||||
assert d.mode == CameraMode.FULL
|
||||
|
||||
# Street crossing — even at slow speed, stays FULL
|
||||
d = f.update(1.0, Scenario.CROSSING, 85.0)
|
||||
assert d.mode == CameraMode.FULL
|
||||
assert d.scenario_override is True
|
||||
|
||||
# Back to outdoor walk
|
||||
clock.advance(3.0)
|
||||
d = f.update(1.0, Scenario.OUTDOOR, 80.0)
|
||||
assert d.mode == CameraMode.FULL # hold not expired yet
|
||||
|
||||
clock.advance(2.1)
|
||||
d = f.update(0.8, Scenario.OUTDOOR, 80.0)
|
||||
assert d.mode == CameraMode.AWARE # hold expired, down to walk speed
|
||||
|
||||
# Enter supermarket (indoor cap)
|
||||
d = f.update(0.8, Scenario.INDOOR, 78.0)
|
||||
assert d.mode == CameraMode.AWARE
|
||||
|
||||
# Park and wait
|
||||
d = f.update(0.0, Scenario.PARKED, 75.0)
|
||||
assert d.mode == CameraMode.SOCIAL
|
||||
|
||||
# Low battery during fast follow
|
||||
d = f.update(5.6, Scenario.OUTDOOR, 15.0)
|
||||
assert d.mode == CameraMode.AWARE # battery cap
|
||||
@ -0,0 +1,26 @@
|
||||
std_msgs/Header header
|
||||
|
||||
# Current power mode (use constants below)
|
||||
uint8 mode
|
||||
uint8 MODE_SLEEP = 0 # charging / idle — minimal sensors
|
||||
uint8 MODE_SOCIAL = 1 # parked / socialising — webcam + face UI only
|
||||
uint8 MODE_AWARE = 2 # indoor / slow (<5 km/h) — front CSI + RealSense + LIDAR
|
||||
uint8 MODE_ACTIVE = 3 # sidewalk / bike path (5–15 km/h) — front+rear + RealSense + LIDAR + UWB
|
||||
uint8 MODE_FULL = 4 # street / high-speed (>15 km/h) or crossing — all sensors
|
||||
|
||||
string mode_name # human-readable label
|
||||
|
||||
# Active sensor flags for this mode
|
||||
bool csi_front
|
||||
bool csi_rear
|
||||
bool csi_left
|
||||
bool csi_right
|
||||
bool realsense
|
||||
bool lidar
|
||||
bool uwb
|
||||
bool webcam
|
||||
|
||||
# Transition metadata
|
||||
float32 trigger_speed_mps # speed that triggered the last transition
|
||||
string trigger_scenario # scenario that triggered the last transition
|
||||
bool scenario_override # true when scenario (not speed) forced the mode
|
||||
Loading…
x
Reference in New Issue
Block a user