feat: Diagnostics aggregator (Issue #658)

New saltybot_diagnostics_aggregator ROS2 package — unified health dashboard
aggregating telemetry into /saltybot/system_health JSON at 1 Hz.

Subscribes to /vesc/health, /diagnostics, /saltybot/safety_zone/status,
/saltybot/pose/status, /saltybot/uwb/status. Tracks 8 subsystems:
motors, battery, imu, uwb, lidar, camera, can_bus, estop.

- subsystem.py: SubsystemState, stale detection, transition log, severity
  ranking, keyword→subsystem routing (no ROS2 dependency)
- aggregator_node.py: ROS2 node, 1 Hz heartbeat, overall_status rollup,
  last_error field, 50-entry transition ring buffer
- test_aggregator.py: 46 unit tests, all passing
- config/aggregator_params.yaml, launch file

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
sl-firmware 2026-03-17 11:38:52 -04:00
parent d9b4b10b90
commit cd263c385d
11 changed files with 853 additions and 0 deletions

View File

@ -0,0 +1,15 @@
# Diagnostics Aggregator — Issue #658
# Unified health dashboard aggregating telemetry from all SaltyBot subsystems.
diagnostics_aggregator:
ros__parameters:
# Publish rate for /saltybot/system_health (Hz)
heartbeat_hz: 1.0
# Seconds without a topic update before a subsystem is marked STALE
# Increase for sensors with lower publish rates (e.g. UWB at 5 Hz)
stale_timeout_s: 5.0
# Maximum number of state transitions kept in the in-memory ring buffer
# Last 10 transitions are included in each /saltybot/system_health publish
transition_log_size: 50

View File

@ -0,0 +1,44 @@
"""Launch file for diagnostics aggregator node."""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
pkg_dir = get_package_share_directory("saltybot_diagnostics_aggregator")
config_file = os.path.join(pkg_dir, "config", "aggregator_params.yaml")
return LaunchDescription([
DeclareLaunchArgument(
"config_file",
default_value=config_file,
description="Path to aggregator_params.yaml",
),
DeclareLaunchArgument(
"heartbeat_hz",
default_value="1.0",
description="Publish rate for /saltybot/system_health (Hz)",
),
DeclareLaunchArgument(
"stale_timeout_s",
default_value="5.0",
description="Seconds without update before subsystem goes STALE",
),
Node(
package="saltybot_diagnostics_aggregator",
executable="diagnostics_aggregator_node",
name="diagnostics_aggregator",
output="screen",
parameters=[
LaunchConfiguration("config_file"),
{
"heartbeat_hz": LaunchConfiguration("heartbeat_hz"),
"stale_timeout_s": LaunchConfiguration("stale_timeout_s"),
},
],
),
])

View File

@ -0,0 +1,30 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_diagnostics_aggregator</name>
<version>0.1.0</version>
<description>
Diagnostics aggregator for SaltyBot — unified health dashboard node (Issue #658).
Subscribes to /vesc/health, /diagnostics, /saltybot/safety_zone/status,
/saltybot/pose/status, /saltybot/uwb/status. Aggregates into
/saltybot/system_health JSON at 1 Hz. Tracks motors, battery, imu, uwb,
lidar, camera, can_bus, estop subsystems with state-transition logging.
</description>
<maintainer email="sl-firmware@saltylab.local">sl-firmware</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>diagnostic_msgs</depend>
<buildtool_depend>ament_python</buildtool_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,312 @@
#!/usr/bin/env python3
"""Diagnostics aggregator — unified health dashboard node (Issue #658).
Subscribes to telemetry and diagnostics topics from all subsystems, aggregates
them into a single /saltybot/system_health JSON publish at 1 Hz.
Subscribed topics
-----------------
/vesc/health (std_msgs/String) motor VESC health JSON
/diagnostics (diagnostic_msgs/DiagnosticArray) ROS diagnostics bus
/saltybot/safety_zone/status (std_msgs/String) safety / estop state
/saltybot/pose/status (std_msgs/String) IMU / pose estimate state
/saltybot/uwb/status (std_msgs/String) UWB positioning state
Published topics
----------------
/saltybot/system_health (std_msgs/String) JSON health dashboard at 1 Hz
JSON schema for /saltybot/system_health
----------------------------------------
{
"overall_status": "OK" | "WARN" | "ERROR" | "STALE",
"uptime_s": <float>,
"subsystems": {
"motors": { "status": ..., "message": ..., "age_s": ..., "previous_status": ... },
"battery": { ... },
"imu": { ... },
"uwb": { ... },
"lidar": { ... },
"camera": { ... },
"can_bus": { ... },
"estop": { ... }
},
"last_error": { "subsystem": ..., "message": ..., "timestamp": ... } | null,
"recent_transitions": [ { "subsystem", "from_status", "to_status",
"message", "timestamp_iso" }, ... ],
"timestamp": "<ISO-8601>"
}
Parameters
----------
heartbeat_hz float 1.0 Publish rate (Hz)
stale_timeout_s float 5.0 Seconds without update STALE
transition_log_size int 50 Max recent transitions kept in memory
"""
import json
import time
from collections import deque
from datetime import datetime, timezone
from typing import Optional
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from diagnostic_msgs.msg import DiagnosticArray
from .subsystem import (
SubsystemState,
Transition,
STATUS_OK,
STATUS_WARN,
STATUS_ERROR,
STATUS_STALE,
STATUS_UNKNOWN,
worse,
ros_level_to_status,
SUBSYSTEM_NAMES as _SUBSYSTEM_NAMES,
keyword_to_subsystem as _keyword_to_subsystem,
)
class DiagnosticsAggregatorNode(Node):
"""Unified health dashboard node."""
def __init__(self):
super().__init__("diagnostics_aggregator")
# ----------------------------------------------------------------
# Parameters
# ----------------------------------------------------------------
self.declare_parameter("heartbeat_hz", 1.0)
self.declare_parameter("stale_timeout_s", 5.0)
self.declare_parameter("transition_log_size", 50)
hz = float(self.get_parameter("heartbeat_hz").value)
stale_timeout = float(self.get_parameter("stale_timeout_s").value)
log_size = int(self.get_parameter("transition_log_size").value)
# ----------------------------------------------------------------
# Subsystem state table
# ----------------------------------------------------------------
self._subsystems: dict[str, SubsystemState] = {
name: SubsystemState(name=name, stale_timeout_s=stale_timeout)
for name in _SUBSYSTEM_NAMES
}
self._transitions: deque[Transition] = deque(maxlen=log_size)
self._last_error: Optional[dict] = None
self._start_mono = time.monotonic()
# ----------------------------------------------------------------
# Subscriptions
# ----------------------------------------------------------------
self.create_subscription(String, "/vesc/health",
self._on_vesc_health, 10)
self.create_subscription(DiagnosticArray, "/diagnostics",
self._on_diagnostics, 10)
self.create_subscription(String, "/saltybot/safety_zone/status",
self._on_safety_zone, 10)
self.create_subscription(String, "/saltybot/pose/status",
self._on_pose_status, 10)
self.create_subscription(String, "/saltybot/uwb/status",
self._on_uwb_status, 10)
# ----------------------------------------------------------------
# Publisher
# ----------------------------------------------------------------
self._pub = self.create_publisher(String, "/saltybot/system_health", 1)
# ----------------------------------------------------------------
# 1 Hz heartbeat timer
# ----------------------------------------------------------------
self.create_timer(1.0 / max(0.1, hz), self._on_timer)
self.get_logger().info(
f"diagnostics_aggregator: {len(_SUBSYSTEM_NAMES)} subsystems, "
f"heartbeat={hz} Hz, stale_timeout={stale_timeout} s"
)
# ----------------------------------------------------------------
# Topic callbacks
# ----------------------------------------------------------------
def _on_vesc_health(self, msg: String) -> None:
"""Parse /vesc/health JSON → motors subsystem."""
now = time.monotonic()
try:
d = json.loads(msg.data)
fault = d.get("fault_code", 0)
alive = d.get("alive", True)
if not alive:
status, message = STATUS_STALE, "VESC offline"
elif fault != 0:
status = STATUS_ERROR
message = f"VESC fault {d.get('fault_name', fault)}"
else:
status = STATUS_OK
message = (
f"RPM={d.get('rpm', '?')} "
f"I={d.get('current_a', '?')} A "
f"V={d.get('voltage_v', '?')} V"
)
self._update("motors", status, message, now)
except Exception as exc:
self.get_logger().debug(f"/vesc/health parse error: {exc}")
def _on_diagnostics(self, msg: DiagnosticArray) -> None:
"""Fan /diagnostics entries out to per-subsystem states."""
now = time.monotonic()
# Accumulate worst status per subsystem across all entries in this array
pending: dict[str, tuple[str, str]] = {} # subsystem → (status, message)
for ds in msg.status:
subsystem = _keyword_to_subsystem(ds.name)
if subsystem is None:
continue
status = ros_level_to_status(ds.level)
message = ds.message or ""
if subsystem not in pending:
pending[subsystem] = (status, message)
else:
prev_s, prev_m = pending[subsystem]
new_worst = worse(status, prev_s)
pending[subsystem] = (
new_worst,
message if new_worst == status else prev_m,
)
for subsystem, (status, message) in pending.items():
self._update(subsystem, status, message, now)
def _on_safety_zone(self, msg: String) -> None:
"""Parse /saltybot/safety_zone/status JSON → estop subsystem."""
now = time.monotonic()
try:
d = json.loads(msg.data)
triggered = d.get("estop_triggered", d.get("triggered", False))
active = d.get("safety_zone_active", d.get("active", True))
if triggered:
status, message = STATUS_ERROR, "E-stop triggered"
elif not active:
status, message = STATUS_WARN, "Safety zone inactive"
else:
status, message = STATUS_OK, "Safety zone active"
self._update("estop", status, message, now)
except Exception as exc:
self.get_logger().debug(f"/saltybot/safety_zone/status parse error: {exc}")
def _on_pose_status(self, msg: String) -> None:
"""Parse /saltybot/pose/status JSON → imu subsystem."""
now = time.monotonic()
try:
d = json.loads(msg.data)
status_str = d.get("status", "OK").upper()
if status_str not in (STATUS_OK, STATUS_WARN, STATUS_ERROR):
status_str = STATUS_WARN
message = d.get("message", d.get("msg", ""))
self._update("imu", status_str, message, now)
except Exception as exc:
self.get_logger().debug(f"/saltybot/pose/status parse error: {exc}")
def _on_uwb_status(self, msg: String) -> None:
"""Parse /saltybot/uwb/status JSON → uwb subsystem."""
now = time.monotonic()
try:
d = json.loads(msg.data)
status_str = d.get("status", "OK").upper()
if status_str not in (STATUS_OK, STATUS_WARN, STATUS_ERROR):
status_str = STATUS_WARN
message = d.get("message", d.get("msg", ""))
self._update("uwb", status_str, message, now)
except Exception as exc:
self.get_logger().debug(f"/saltybot/uwb/status parse error: {exc}")
# ----------------------------------------------------------------
# Internal helpers
# ----------------------------------------------------------------
def _update(self, subsystem: str, status: str, message: str, now: float) -> None:
"""Update subsystem state and record any transition."""
s = self._subsystems.get(subsystem)
if s is None:
return
transition = s.update(status, message, now)
if transition is not None:
self._transitions.append(transition)
self.get_logger().info(
f"[{subsystem}] {transition.from_status}{transition.to_status}: {message}"
)
if transition.to_status == STATUS_ERROR:
self._last_error = {
"subsystem": subsystem,
"message": message,
"timestamp": transition.timestamp_iso,
}
def _apply_stale_checks(self, now: float) -> None:
for s in self._subsystems.values():
transition = s.apply_stale_check(now)
if transition is not None:
self._transitions.append(transition)
self.get_logger().warn(
f"[{transition.subsystem}] went STALE (no data)"
)
# ----------------------------------------------------------------
# Heartbeat timer
# ----------------------------------------------------------------
def _on_timer(self) -> None:
now = time.monotonic()
self._apply_stale_checks(now)
# Overall status = worst across all subsystems
overall = STATUS_OK
for s in self._subsystems.values():
overall = worse(overall, s.status)
# UNKNOWN does not degrade overall if at least one subsystem is known
known = [s for s in self._subsystems.values() if s.status != STATUS_UNKNOWN]
if not known:
overall = STATUS_UNKNOWN
uptime = now - self._start_mono
payload = {
"overall_status": overall,
"uptime_s": round(uptime, 1),
"subsystems": {
name: s.to_dict(now)
for name, s in self._subsystems.items()
},
"last_error": self._last_error,
"recent_transitions": [
{
"subsystem": t.subsystem,
"from_status": t.from_status,
"to_status": t.to_status,
"message": t.message,
"timestamp": t.timestamp_iso,
}
for t in list(self._transitions)[-10:] # last 10 in the JSON
],
"timestamp": datetime.now(timezone.utc).isoformat(),
}
self._pub.publish(String(data=json.dumps(payload)))
def main(args=None):
rclpy.init(args=args)
node = DiagnosticsAggregatorNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,148 @@
"""Subsystem state tracking for the diagnostics aggregator.
Each SubsystemState tracks one logical subsystem (motors, battery, etc.)
across potentially multiple source topics. Status priority:
ERROR > WARN > STALE > OK > UNKNOWN
"""
import time
from dataclasses import dataclass, field
from typing import Optional
# ---------------------------------------------------------------------------
# Status constants — ordered by severity (higher index = more severe)
# ---------------------------------------------------------------------------
STATUS_UNKNOWN = "UNKNOWN"
STATUS_OK = "OK"
STATUS_STALE = "STALE"
STATUS_WARN = "WARN"
STATUS_ERROR = "ERROR"
# ---------------------------------------------------------------------------
# Subsystem registry and diagnostic keyword routing
# ---------------------------------------------------------------------------
SUBSYSTEM_NAMES: list[str] = [
"motors", "battery", "imu", "uwb", "lidar", "camera", "can_bus", "estop"
]
# (keywords-tuple, subsystem-name) — first match wins, lower-cased comparison
DIAG_KEYWORD_MAP: list[tuple[tuple[str, ...], str]] = [
(("vesc", "motor", "esc", "fsesc"), "motors"),
(("battery", "ina219", "lvc", "coulomb", "vbat"), "battery"),
(("imu", "mpu6000", "bno055", "icm42688", "gyro", "accel"), "imu"),
(("uwb", "dw1000", "dw3000"), "uwb"),
(("lidar", "rplidar", "sick", "laser"), "lidar"),
(("camera", "realsense", "oak", "webcam"), "camera"),
(("can", "can_bus", "can_driver"), "can_bus"),
(("estop", "safety", "emergency", "e-stop"), "estop"),
]
def keyword_to_subsystem(name: str) -> Optional[str]:
"""Map a diagnostic status name to a subsystem key, or None."""
lower = name.lower()
for keywords, subsystem in DIAG_KEYWORD_MAP:
if any(kw in lower for kw in keywords):
return subsystem
return None
_SEVERITY: dict[str, int] = {
STATUS_UNKNOWN: 0,
STATUS_OK: 1,
STATUS_STALE: 2,
STATUS_WARN: 3,
STATUS_ERROR: 4,
}
def worse(a: str, b: str) -> str:
"""Return the more severe of two status strings."""
return a if _SEVERITY.get(a, 0) >= _SEVERITY.get(b, 0) else b
def ros_level_to_status(level: int) -> str:
"""Convert diagnostic_msgs/DiagnosticStatus.level to a status string."""
return {0: STATUS_OK, 1: STATUS_WARN, 2: STATUS_ERROR}.get(level, STATUS_UNKNOWN)
# ---------------------------------------------------------------------------
# Transition log entry
# ---------------------------------------------------------------------------
@dataclass
class Transition:
"""A logged status change for one subsystem."""
subsystem: str
from_status: str
to_status: str
message: str
timestamp_iso: str # ISO-8601 wall-clock
monotonic_s: float # time.monotonic() at the transition
# ---------------------------------------------------------------------------
# Per-subsystem state
# ---------------------------------------------------------------------------
@dataclass
class SubsystemState:
"""Live health state for one logical subsystem."""
name: str
stale_timeout_s: float = 5.0 # seconds without update → STALE
# Mutable state
status: str = STATUS_UNKNOWN
message: str = ""
last_updated_mono: float = field(default_factory=lambda: 0.0)
last_change_mono: float = field(default_factory=lambda: 0.0)
previous_status: str = STATUS_UNKNOWN
def update(self, new_status: str, message: str, now_mono: float) -> Optional[Transition]:
"""Apply a new status, record a transition if the status changed.
Returns a Transition if the status changed, else None.
"""
from datetime import datetime, timezone
self.last_updated_mono = now_mono
if new_status == self.status:
self.message = message
return None
transition = Transition(
subsystem=self.name,
from_status=self.status,
to_status=new_status,
message=message,
timestamp_iso=datetime.now(timezone.utc).isoformat(),
monotonic_s=now_mono,
)
self.previous_status = self.status
self.status = new_status
self.message = message
self.last_change_mono = now_mono
return transition
def apply_stale_check(self, now_mono: float) -> Optional[Transition]:
"""Mark STALE if no update received within stale_timeout_s.
Returns a Transition if newly staled, else None.
"""
if self.last_updated_mono == 0.0:
# Never received any data — leave as UNKNOWN
return None
if (now_mono - self.last_updated_mono) > self.stale_timeout_s:
if self.status not in (STATUS_STALE, STATUS_UNKNOWN):
return self.update(STATUS_STALE, "No data received", now_mono)
return None
def to_dict(self, now_mono: float) -> dict:
age = (now_mono - self.last_updated_mono) if self.last_updated_mono > 0 else None
return {
"status": self.status,
"message": self.message,
"age_s": round(age, 2) if age is not None else None,
"previous_status": self.previous_status,
}

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/saltybot_diagnostics_aggregator
[install]
install_scripts=$base/lib/saltybot_diagnostics_aggregator

View File

@ -0,0 +1,28 @@
from setuptools import setup
package_name = "saltybot_diagnostics_aggregator"
setup(
name=package_name,
version="0.1.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(f"share/{package_name}/launch", ["launch/diagnostics_aggregator.launch.py"]),
(f"share/{package_name}/config", ["config/aggregator_params.yaml"]),
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="sl-firmware",
maintainer_email="sl-firmware@saltylab.local",
description="Diagnostics aggregator — unified health dashboard for SaltyBot",
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": [
"diagnostics_aggregator_node = "
"saltybot_diagnostics_aggregator.aggregator_node:main",
],
},
)

View File

@ -0,0 +1,272 @@
"""Unit tests for diagnostics aggregator — subsystem logic and routing.
All pure-function tests; no ROS2 or live topics required.
"""
import time
import json
import pytest
from saltybot_diagnostics_aggregator.subsystem import (
SubsystemState,
Transition,
STATUS_OK,
STATUS_WARN,
STATUS_ERROR,
STATUS_STALE,
STATUS_UNKNOWN,
worse,
ros_level_to_status,
keyword_to_subsystem as _keyword_to_subsystem,
SUBSYSTEM_NAMES as _SUBSYSTEM_NAMES,
)
# ---------------------------------------------------------------------------
# worse()
# ---------------------------------------------------------------------------
class TestWorse:
def test_error_beats_warn(self):
assert worse(STATUS_ERROR, STATUS_WARN) == STATUS_ERROR
def test_warn_beats_ok(self):
assert worse(STATUS_WARN, STATUS_OK) == STATUS_WARN
def test_stale_beats_ok(self):
assert worse(STATUS_STALE, STATUS_OK) == STATUS_STALE
def test_warn_beats_stale(self):
assert worse(STATUS_WARN, STATUS_STALE) == STATUS_WARN
def test_error_beats_stale(self):
assert worse(STATUS_ERROR, STATUS_STALE) == STATUS_ERROR
def test_ok_vs_ok(self):
assert worse(STATUS_OK, STATUS_OK) == STATUS_OK
def test_error_vs_error(self):
assert worse(STATUS_ERROR, STATUS_ERROR) == STATUS_ERROR
def test_unknown_loses_to_ok(self):
assert worse(STATUS_OK, STATUS_UNKNOWN) == STATUS_OK
def test_symmetric(self):
for a in (STATUS_OK, STATUS_WARN, STATUS_ERROR, STATUS_STALE):
for b in (STATUS_OK, STATUS_WARN, STATUS_ERROR, STATUS_STALE):
assert worse(a, b) == worse(b, a) or True # just ensure no crash
# ---------------------------------------------------------------------------
# ros_level_to_status()
# ---------------------------------------------------------------------------
class TestRosLevelToStatus:
def test_level_0_is_ok(self):
assert ros_level_to_status(0) == STATUS_OK
def test_level_1_is_warn(self):
assert ros_level_to_status(1) == STATUS_WARN
def test_level_2_is_error(self):
assert ros_level_to_status(2) == STATUS_ERROR
def test_unknown_level(self):
assert ros_level_to_status(99) == STATUS_UNKNOWN
def test_negative_level(self):
assert ros_level_to_status(-1) == STATUS_UNKNOWN
# ---------------------------------------------------------------------------
# _keyword_to_subsystem()
# ---------------------------------------------------------------------------
class TestKeywordToSubsystem:
def test_vesc_maps_to_motors(self):
assert _keyword_to_subsystem("VESC/left (CAN ID 61)") == "motors"
def test_motor_maps_to_motors(self):
assert _keyword_to_subsystem("motor_controller") == "motors"
def test_battery_maps_to_battery(self):
assert _keyword_to_subsystem("battery_monitor") == "battery"
def test_ina219_maps_to_battery(self):
assert _keyword_to_subsystem("INA219 current sensor") == "battery"
def test_lvc_maps_to_battery(self):
assert _keyword_to_subsystem("lvc_cutoff") == "battery"
def test_imu_maps_to_imu(self):
assert _keyword_to_subsystem("IMU/mpu6000") == "imu"
def test_mpu6000_maps_to_imu(self):
assert _keyword_to_subsystem("mpu6000 driver") == "imu"
def test_uwb_maps_to_uwb(self):
assert _keyword_to_subsystem("UWB anchor 0") == "uwb"
def test_rplidar_maps_to_lidar(self):
assert _keyword_to_subsystem("rplidar_node") == "lidar"
def test_lidar_maps_to_lidar(self):
assert _keyword_to_subsystem("lidar/scan") == "lidar"
def test_realsense_maps_to_camera(self):
assert _keyword_to_subsystem("RealSense D435i") == "camera"
def test_camera_maps_to_camera(self):
assert _keyword_to_subsystem("camera_health") == "camera"
def test_can_maps_to_can_bus(self):
assert _keyword_to_subsystem("can_driver stats") == "can_bus"
def test_estop_maps_to_estop(self):
assert _keyword_to_subsystem("estop_monitor") == "estop"
def test_safety_maps_to_estop(self):
assert _keyword_to_subsystem("safety_zone") == "estop"
def test_unknown_returns_none(self):
assert _keyword_to_subsystem("totally_unrelated_sensor") is None
def test_case_insensitive(self):
assert _keyword_to_subsystem("RPLIDAR A2") == "lidar"
assert _keyword_to_subsystem("IMU_CALIBRATION") == "imu"
# ---------------------------------------------------------------------------
# SubsystemState.update()
# ---------------------------------------------------------------------------
class TestSubsystemStateUpdate:
def _make(self) -> SubsystemState:
return SubsystemState(name="motors", stale_timeout_s=5.0)
def test_initial_state(self):
s = self._make()
assert s.status == STATUS_UNKNOWN
assert s.message == ""
assert s.previous_status == STATUS_UNKNOWN
def test_first_update_creates_transition(self):
s = self._make()
t = s.update(STATUS_OK, "all good", time.monotonic())
assert t is not None
assert t.from_status == STATUS_UNKNOWN
assert t.to_status == STATUS_OK
assert t.subsystem == "motors"
def test_same_status_no_transition(self):
s = self._make()
s.update(STATUS_OK, "good", time.monotonic())
t = s.update(STATUS_OK, "still good", time.monotonic())
assert t is None
def test_status_change_produces_transition(self):
s = self._make()
now = time.monotonic()
s.update(STATUS_OK, "good", now)
t = s.update(STATUS_ERROR, "fault", now + 1)
assert t is not None
assert t.from_status == STATUS_OK
assert t.to_status == STATUS_ERROR
assert t.message == "fault"
def test_previous_status_saved(self):
s = self._make()
now = time.monotonic()
s.update(STATUS_OK, "good", now)
s.update(STATUS_WARN, "warn", now + 1)
assert s.previous_status == STATUS_OK
assert s.status == STATUS_WARN
def test_last_updated_advances(self):
s = self._make()
t1 = time.monotonic()
s.update(STATUS_OK, "x", t1)
assert s.last_updated_mono == pytest.approx(t1)
t2 = t1 + 1.0
s.update(STATUS_OK, "y", t2)
assert s.last_updated_mono == pytest.approx(t2)
def test_transition_has_iso_timestamp(self):
s = self._make()
t = s.update(STATUS_OK, "good", time.monotonic())
assert t is not None
assert "T" in t.timestamp_iso # ISO-8601 contains 'T'
# ---------------------------------------------------------------------------
# SubsystemState.apply_stale_check()
# ---------------------------------------------------------------------------
class TestSubsystemStateStale:
def test_never_updated_stays_unknown(self):
s = SubsystemState(name="imu", stale_timeout_s=2.0)
t = s.apply_stale_check(time.monotonic() + 100)
assert t is None
assert s.status == STATUS_UNKNOWN
def test_fresh_data_not_stale(self):
s = SubsystemState(name="imu", stale_timeout_s=5.0)
now = time.monotonic()
s.update(STATUS_OK, "good", now)
t = s.apply_stale_check(now + 3.0) # 3s < 5s timeout
assert t is None
assert s.status == STATUS_OK
def test_old_data_goes_stale(self):
s = SubsystemState(name="imu", stale_timeout_s=5.0)
now = time.monotonic()
s.update(STATUS_OK, "good", now)
t = s.apply_stale_check(now + 6.0) # 6s > 5s timeout
assert t is not None
assert t.to_status == STATUS_STALE
def test_already_stale_no_duplicate_transition(self):
s = SubsystemState(name="imu", stale_timeout_s=5.0)
now = time.monotonic()
s.update(STATUS_OK, "good", now)
s.apply_stale_check(now + 6.0) # → STALE
t2 = s.apply_stale_check(now + 7.0) # already STALE
assert t2 is None
# ---------------------------------------------------------------------------
# SubsystemState.to_dict()
# ---------------------------------------------------------------------------
class TestSubsystemStateToDict:
def test_unknown_state(self):
s = SubsystemState(name="uwb")
d = s.to_dict(time.monotonic())
assert d["status"] == STATUS_UNKNOWN
assert d["age_s"] is None
def test_known_state_has_age(self):
s = SubsystemState(name="uwb", stale_timeout_s=5.0)
now = time.monotonic()
s.update(STATUS_OK, "ok", now)
d = s.to_dict(now + 1.5)
assert d["status"] == STATUS_OK
assert d["age_s"] == pytest.approx(1.5, abs=0.01)
assert d["message"] == "ok"
# ---------------------------------------------------------------------------
# _SUBSYSTEM_NAMES completeness
# ---------------------------------------------------------------------------
class TestSubsystemNames:
def test_all_required_subsystems_present(self):
required = {"motors", "battery", "imu", "uwb", "lidar", "camera", "can_bus", "estop"}
assert required.issubset(set(_SUBSYSTEM_NAMES))
def test_no_duplicates(self):
assert len(_SUBSYSTEM_NAMES) == len(set(_SUBSYSTEM_NAMES))
if __name__ == "__main__":
pytest.main([__file__, "-v"])