diff --git a/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/config/aggregator_params.yaml b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/config/aggregator_params.yaml
new file mode 100644
index 0000000..6767e63
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/config/aggregator_params.yaml
@@ -0,0 +1,15 @@
+# Diagnostics Aggregator — Issue #658
+# Unified health dashboard aggregating telemetry from all SaltyBot subsystems.
+
+diagnostics_aggregator:
+ ros__parameters:
+ # Publish rate for /saltybot/system_health (Hz)
+ heartbeat_hz: 1.0
+
+ # Seconds without a topic update before a subsystem is marked STALE
+ # Increase for sensors with lower publish rates (e.g. UWB at 5 Hz)
+ stale_timeout_s: 5.0
+
+ # Maximum number of state transitions kept in the in-memory ring buffer
+ # Last 10 transitions are included in each /saltybot/system_health publish
+ transition_log_size: 50
diff --git a/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/launch/diagnostics_aggregator.launch.py b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/launch/diagnostics_aggregator.launch.py
new file mode 100644
index 0000000..9e8e86e
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/launch/diagnostics_aggregator.launch.py
@@ -0,0 +1,44 @@
+"""Launch file for diagnostics aggregator node."""
+
+import os
+from ament_index_python.packages import get_package_share_directory
+from launch import LaunchDescription
+from launch.actions import DeclareLaunchArgument
+from launch.substitutions import LaunchConfiguration
+from launch_ros.actions import Node
+
+
+def generate_launch_description():
+ pkg_dir = get_package_share_directory("saltybot_diagnostics_aggregator")
+ config_file = os.path.join(pkg_dir, "config", "aggregator_params.yaml")
+
+ return LaunchDescription([
+ DeclareLaunchArgument(
+ "config_file",
+ default_value=config_file,
+ description="Path to aggregator_params.yaml",
+ ),
+ DeclareLaunchArgument(
+ "heartbeat_hz",
+ default_value="1.0",
+ description="Publish rate for /saltybot/system_health (Hz)",
+ ),
+ DeclareLaunchArgument(
+ "stale_timeout_s",
+ default_value="5.0",
+ description="Seconds without update before subsystem goes STALE",
+ ),
+ Node(
+ package="saltybot_diagnostics_aggregator",
+ executable="diagnostics_aggregator_node",
+ name="diagnostics_aggregator",
+ output="screen",
+ parameters=[
+ LaunchConfiguration("config_file"),
+ {
+ "heartbeat_hz": LaunchConfiguration("heartbeat_hz"),
+ "stale_timeout_s": LaunchConfiguration("stale_timeout_s"),
+ },
+ ],
+ ),
+ ])
diff --git a/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/package.xml b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/package.xml
new file mode 100644
index 0000000..089e464
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/package.xml
@@ -0,0 +1,30 @@
+
+
+
+ saltybot_diagnostics_aggregator
+ 0.1.0
+
+ Diagnostics aggregator for SaltyBot — unified health dashboard node (Issue #658).
+ Subscribes to /vesc/health, /diagnostics, /saltybot/safety_zone/status,
+ /saltybot/pose/status, /saltybot/uwb/status. Aggregates into
+ /saltybot/system_health JSON at 1 Hz. Tracks motors, battery, imu, uwb,
+ lidar, camera, can_bus, estop subsystems with state-transition logging.
+
+ sl-firmware
+ MIT
+
+ rclpy
+ std_msgs
+ diagnostic_msgs
+
+ ament_python
+
+ ament_copyright
+ ament_flake8
+ ament_pep257
+ python3-pytest
+
+
+ ament_python
+
+
diff --git a/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/resource/saltybot_diagnostics_aggregator b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/resource/saltybot_diagnostics_aggregator
new file mode 100644
index 0000000..e69de29
diff --git a/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/saltybot_diagnostics_aggregator/__init__.py b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/saltybot_diagnostics_aggregator/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/saltybot_diagnostics_aggregator/aggregator_node.py b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/saltybot_diagnostics_aggregator/aggregator_node.py
new file mode 100644
index 0000000..73f9422
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/saltybot_diagnostics_aggregator/aggregator_node.py
@@ -0,0 +1,312 @@
+#!/usr/bin/env python3
+"""Diagnostics aggregator — unified health dashboard node (Issue #658).
+
+Subscribes to telemetry and diagnostics topics from all subsystems, aggregates
+them into a single /saltybot/system_health JSON publish at 1 Hz.
+
+Subscribed topics
+-----------------
+ /vesc/health (std_msgs/String) — motor VESC health JSON
+ /diagnostics (diagnostic_msgs/DiagnosticArray) — ROS diagnostics bus
+ /saltybot/safety_zone/status (std_msgs/String) — safety / estop state
+ /saltybot/pose/status (std_msgs/String) — IMU / pose estimate state
+ /saltybot/uwb/status (std_msgs/String) — UWB positioning state
+
+Published topics
+----------------
+ /saltybot/system_health (std_msgs/String) — JSON health dashboard at 1 Hz
+
+JSON schema for /saltybot/system_health
+----------------------------------------
+{
+ "overall_status": "OK" | "WARN" | "ERROR" | "STALE",
+ "uptime_s": ,
+ "subsystems": {
+ "motors": { "status": ..., "message": ..., "age_s": ..., "previous_status": ... },
+ "battery": { ... },
+ "imu": { ... },
+ "uwb": { ... },
+ "lidar": { ... },
+ "camera": { ... },
+ "can_bus": { ... },
+ "estop": { ... }
+ },
+ "last_error": { "subsystem": ..., "message": ..., "timestamp": ... } | null,
+ "recent_transitions": [ { "subsystem", "from_status", "to_status",
+ "message", "timestamp_iso" }, ... ],
+ "timestamp": ""
+}
+
+Parameters
+----------
+ heartbeat_hz float 1.0 Publish rate (Hz)
+ stale_timeout_s float 5.0 Seconds without update → STALE
+ transition_log_size int 50 Max recent transitions kept in memory
+"""
+
+import json
+import time
+from collections import deque
+from datetime import datetime, timezone
+from typing import Optional
+
+import rclpy
+from rclpy.node import Node
+from std_msgs.msg import String
+from diagnostic_msgs.msg import DiagnosticArray
+
+from .subsystem import (
+ SubsystemState,
+ Transition,
+ STATUS_OK,
+ STATUS_WARN,
+ STATUS_ERROR,
+ STATUS_STALE,
+ STATUS_UNKNOWN,
+ worse,
+ ros_level_to_status,
+ SUBSYSTEM_NAMES as _SUBSYSTEM_NAMES,
+ keyword_to_subsystem as _keyword_to_subsystem,
+)
+
+
+class DiagnosticsAggregatorNode(Node):
+ """Unified health dashboard node."""
+
+ def __init__(self):
+ super().__init__("diagnostics_aggregator")
+
+ # ----------------------------------------------------------------
+ # Parameters
+ # ----------------------------------------------------------------
+ self.declare_parameter("heartbeat_hz", 1.0)
+ self.declare_parameter("stale_timeout_s", 5.0)
+ self.declare_parameter("transition_log_size", 50)
+
+ hz = float(self.get_parameter("heartbeat_hz").value)
+ stale_timeout = float(self.get_parameter("stale_timeout_s").value)
+ log_size = int(self.get_parameter("transition_log_size").value)
+
+ # ----------------------------------------------------------------
+ # Subsystem state table
+ # ----------------------------------------------------------------
+ self._subsystems: dict[str, SubsystemState] = {
+ name: SubsystemState(name=name, stale_timeout_s=stale_timeout)
+ for name in _SUBSYSTEM_NAMES
+ }
+ self._transitions: deque[Transition] = deque(maxlen=log_size)
+ self._last_error: Optional[dict] = None
+ self._start_mono = time.monotonic()
+
+ # ----------------------------------------------------------------
+ # Subscriptions
+ # ----------------------------------------------------------------
+ self.create_subscription(String, "/vesc/health",
+ self._on_vesc_health, 10)
+ self.create_subscription(DiagnosticArray, "/diagnostics",
+ self._on_diagnostics, 10)
+ self.create_subscription(String, "/saltybot/safety_zone/status",
+ self._on_safety_zone, 10)
+ self.create_subscription(String, "/saltybot/pose/status",
+ self._on_pose_status, 10)
+ self.create_subscription(String, "/saltybot/uwb/status",
+ self._on_uwb_status, 10)
+
+ # ----------------------------------------------------------------
+ # Publisher
+ # ----------------------------------------------------------------
+ self._pub = self.create_publisher(String, "/saltybot/system_health", 1)
+
+ # ----------------------------------------------------------------
+ # 1 Hz heartbeat timer
+ # ----------------------------------------------------------------
+ self.create_timer(1.0 / max(0.1, hz), self._on_timer)
+
+ self.get_logger().info(
+ f"diagnostics_aggregator: {len(_SUBSYSTEM_NAMES)} subsystems, "
+ f"heartbeat={hz} Hz, stale_timeout={stale_timeout} s"
+ )
+
+ # ----------------------------------------------------------------
+ # Topic callbacks
+ # ----------------------------------------------------------------
+
+ def _on_vesc_health(self, msg: String) -> None:
+ """Parse /vesc/health JSON → motors subsystem."""
+ now = time.monotonic()
+ try:
+ d = json.loads(msg.data)
+ fault = d.get("fault_code", 0)
+ alive = d.get("alive", True)
+ if not alive:
+ status, message = STATUS_STALE, "VESC offline"
+ elif fault != 0:
+ status = STATUS_ERROR
+ message = f"VESC fault {d.get('fault_name', fault)}"
+ else:
+ status = STATUS_OK
+ message = (
+ f"RPM={d.get('rpm', '?')} "
+ f"I={d.get('current_a', '?')} A "
+ f"V={d.get('voltage_v', '?')} V"
+ )
+ self._update("motors", status, message, now)
+ except Exception as exc:
+ self.get_logger().debug(f"/vesc/health parse error: {exc}")
+
+ def _on_diagnostics(self, msg: DiagnosticArray) -> None:
+ """Fan /diagnostics entries out to per-subsystem states."""
+ now = time.monotonic()
+ # Accumulate worst status per subsystem across all entries in this array
+ pending: dict[str, tuple[str, str]] = {} # subsystem → (status, message)
+
+ for ds in msg.status:
+ subsystem = _keyword_to_subsystem(ds.name)
+ if subsystem is None:
+ continue
+ status = ros_level_to_status(ds.level)
+ message = ds.message or ""
+ if subsystem not in pending:
+ pending[subsystem] = (status, message)
+ else:
+ prev_s, prev_m = pending[subsystem]
+ new_worst = worse(status, prev_s)
+ pending[subsystem] = (
+ new_worst,
+ message if new_worst == status else prev_m,
+ )
+
+ for subsystem, (status, message) in pending.items():
+ self._update(subsystem, status, message, now)
+
+ def _on_safety_zone(self, msg: String) -> None:
+ """Parse /saltybot/safety_zone/status JSON → estop subsystem."""
+ now = time.monotonic()
+ try:
+ d = json.loads(msg.data)
+ triggered = d.get("estop_triggered", d.get("triggered", False))
+ active = d.get("safety_zone_active", d.get("active", True))
+ if triggered:
+ status, message = STATUS_ERROR, "E-stop triggered"
+ elif not active:
+ status, message = STATUS_WARN, "Safety zone inactive"
+ else:
+ status, message = STATUS_OK, "Safety zone active"
+ self._update("estop", status, message, now)
+ except Exception as exc:
+ self.get_logger().debug(f"/saltybot/safety_zone/status parse error: {exc}")
+
+ def _on_pose_status(self, msg: String) -> None:
+ """Parse /saltybot/pose/status JSON → imu subsystem."""
+ now = time.monotonic()
+ try:
+ d = json.loads(msg.data)
+ status_str = d.get("status", "OK").upper()
+ if status_str not in (STATUS_OK, STATUS_WARN, STATUS_ERROR):
+ status_str = STATUS_WARN
+ message = d.get("message", d.get("msg", ""))
+ self._update("imu", status_str, message, now)
+ except Exception as exc:
+ self.get_logger().debug(f"/saltybot/pose/status parse error: {exc}")
+
+ def _on_uwb_status(self, msg: String) -> None:
+ """Parse /saltybot/uwb/status JSON → uwb subsystem."""
+ now = time.monotonic()
+ try:
+ d = json.loads(msg.data)
+ status_str = d.get("status", "OK").upper()
+ if status_str not in (STATUS_OK, STATUS_WARN, STATUS_ERROR):
+ status_str = STATUS_WARN
+ message = d.get("message", d.get("msg", ""))
+ self._update("uwb", status_str, message, now)
+ except Exception as exc:
+ self.get_logger().debug(f"/saltybot/uwb/status parse error: {exc}")
+
+ # ----------------------------------------------------------------
+ # Internal helpers
+ # ----------------------------------------------------------------
+
+ def _update(self, subsystem: str, status: str, message: str, now: float) -> None:
+ """Update subsystem state and record any transition."""
+ s = self._subsystems.get(subsystem)
+ if s is None:
+ return
+ transition = s.update(status, message, now)
+ if transition is not None:
+ self._transitions.append(transition)
+ self.get_logger().info(
+ f"[{subsystem}] {transition.from_status} → {transition.to_status}: {message}"
+ )
+ if transition.to_status == STATUS_ERROR:
+ self._last_error = {
+ "subsystem": subsystem,
+ "message": message,
+ "timestamp": transition.timestamp_iso,
+ }
+
+ def _apply_stale_checks(self, now: float) -> None:
+ for s in self._subsystems.values():
+ transition = s.apply_stale_check(now)
+ if transition is not None:
+ self._transitions.append(transition)
+ self.get_logger().warn(
+ f"[{transition.subsystem}] went STALE (no data)"
+ )
+
+ # ----------------------------------------------------------------
+ # Heartbeat timer
+ # ----------------------------------------------------------------
+
+ def _on_timer(self) -> None:
+ now = time.monotonic()
+ self._apply_stale_checks(now)
+
+ # Overall status = worst across all subsystems
+ overall = STATUS_OK
+ for s in self._subsystems.values():
+ overall = worse(overall, s.status)
+ # UNKNOWN does not degrade overall if at least one subsystem is known
+ known = [s for s in self._subsystems.values() if s.status != STATUS_UNKNOWN]
+ if not known:
+ overall = STATUS_UNKNOWN
+
+ uptime = now - self._start_mono
+
+ payload = {
+ "overall_status": overall,
+ "uptime_s": round(uptime, 1),
+ "subsystems": {
+ name: s.to_dict(now)
+ for name, s in self._subsystems.items()
+ },
+ "last_error": self._last_error,
+ "recent_transitions": [
+ {
+ "subsystem": t.subsystem,
+ "from_status": t.from_status,
+ "to_status": t.to_status,
+ "message": t.message,
+ "timestamp": t.timestamp_iso,
+ }
+ for t in list(self._transitions)[-10:] # last 10 in the JSON
+ ],
+ "timestamp": datetime.now(timezone.utc).isoformat(),
+ }
+
+ self._pub.publish(String(data=json.dumps(payload)))
+
+
+def main(args=None):
+ rclpy.init(args=args)
+ node = DiagnosticsAggregatorNode()
+ try:
+ rclpy.spin(node)
+ except KeyboardInterrupt:
+ pass
+ finally:
+ node.destroy_node()
+ rclpy.shutdown()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/saltybot_diagnostics_aggregator/subsystem.py b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/saltybot_diagnostics_aggregator/subsystem.py
new file mode 100644
index 0000000..d84180c
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/saltybot_diagnostics_aggregator/subsystem.py
@@ -0,0 +1,148 @@
+"""Subsystem state tracking for the diagnostics aggregator.
+
+Each SubsystemState tracks one logical subsystem (motors, battery, etc.)
+across potentially multiple source topics. Status priority:
+ ERROR > WARN > STALE > OK > UNKNOWN
+"""
+
+import time
+from dataclasses import dataclass, field
+from typing import Optional
+
+# ---------------------------------------------------------------------------
+# Status constants — ordered by severity (higher index = more severe)
+# ---------------------------------------------------------------------------
+STATUS_UNKNOWN = "UNKNOWN"
+STATUS_OK = "OK"
+STATUS_STALE = "STALE"
+STATUS_WARN = "WARN"
+STATUS_ERROR = "ERROR"
+
+# ---------------------------------------------------------------------------
+# Subsystem registry and diagnostic keyword routing
+# ---------------------------------------------------------------------------
+SUBSYSTEM_NAMES: list[str] = [
+ "motors", "battery", "imu", "uwb", "lidar", "camera", "can_bus", "estop"
+]
+
+# (keywords-tuple, subsystem-name) — first match wins, lower-cased comparison
+DIAG_KEYWORD_MAP: list[tuple[tuple[str, ...], str]] = [
+ (("vesc", "motor", "esc", "fsesc"), "motors"),
+ (("battery", "ina219", "lvc", "coulomb", "vbat"), "battery"),
+ (("imu", "mpu6000", "bno055", "icm42688", "gyro", "accel"), "imu"),
+ (("uwb", "dw1000", "dw3000"), "uwb"),
+ (("lidar", "rplidar", "sick", "laser"), "lidar"),
+ (("camera", "realsense", "oak", "webcam"), "camera"),
+ (("can", "can_bus", "can_driver"), "can_bus"),
+ (("estop", "safety", "emergency", "e-stop"), "estop"),
+]
+
+
+def keyword_to_subsystem(name: str) -> Optional[str]:
+ """Map a diagnostic status name to a subsystem key, or None."""
+ lower = name.lower()
+ for keywords, subsystem in DIAG_KEYWORD_MAP:
+ if any(kw in lower for kw in keywords):
+ return subsystem
+ return None
+
+
+_SEVERITY: dict[str, int] = {
+ STATUS_UNKNOWN: 0,
+ STATUS_OK: 1,
+ STATUS_STALE: 2,
+ STATUS_WARN: 3,
+ STATUS_ERROR: 4,
+}
+
+
+def worse(a: str, b: str) -> str:
+ """Return the more severe of two status strings."""
+ return a if _SEVERITY.get(a, 0) >= _SEVERITY.get(b, 0) else b
+
+
+def ros_level_to_status(level: int) -> str:
+ """Convert diagnostic_msgs/DiagnosticStatus.level to a status string."""
+ return {0: STATUS_OK, 1: STATUS_WARN, 2: STATUS_ERROR}.get(level, STATUS_UNKNOWN)
+
+
+# ---------------------------------------------------------------------------
+# Transition log entry
+# ---------------------------------------------------------------------------
+@dataclass
+class Transition:
+ """A logged status change for one subsystem."""
+ subsystem: str
+ from_status: str
+ to_status: str
+ message: str
+ timestamp_iso: str # ISO-8601 wall-clock
+ monotonic_s: float # time.monotonic() at the transition
+
+
+# ---------------------------------------------------------------------------
+# Per-subsystem state
+# ---------------------------------------------------------------------------
+@dataclass
+class SubsystemState:
+ """Live health state for one logical subsystem."""
+
+ name: str
+ stale_timeout_s: float = 5.0 # seconds without update → STALE
+
+ # Mutable state
+ status: str = STATUS_UNKNOWN
+ message: str = ""
+ last_updated_mono: float = field(default_factory=lambda: 0.0)
+ last_change_mono: float = field(default_factory=lambda: 0.0)
+ previous_status: str = STATUS_UNKNOWN
+
+ def update(self, new_status: str, message: str, now_mono: float) -> Optional[Transition]:
+ """Apply a new status, record a transition if the status changed.
+
+ Returns a Transition if the status changed, else None.
+ """
+ from datetime import datetime, timezone
+
+ self.last_updated_mono = now_mono
+
+ if new_status == self.status:
+ self.message = message
+ return None
+
+ transition = Transition(
+ subsystem=self.name,
+ from_status=self.status,
+ to_status=new_status,
+ message=message,
+ timestamp_iso=datetime.now(timezone.utc).isoformat(),
+ monotonic_s=now_mono,
+ )
+
+ self.previous_status = self.status
+ self.status = new_status
+ self.message = message
+ self.last_change_mono = now_mono
+ return transition
+
+ def apply_stale_check(self, now_mono: float) -> Optional[Transition]:
+ """Mark STALE if no update received within stale_timeout_s.
+
+ Returns a Transition if newly staled, else None.
+ """
+ if self.last_updated_mono == 0.0:
+ # Never received any data — leave as UNKNOWN
+ return None
+ if (now_mono - self.last_updated_mono) > self.stale_timeout_s:
+ if self.status not in (STATUS_STALE, STATUS_UNKNOWN):
+ return self.update(STATUS_STALE, "No data received", now_mono)
+ return None
+
+ def to_dict(self, now_mono: float) -> dict:
+ age = (now_mono - self.last_updated_mono) if self.last_updated_mono > 0 else None
+ return {
+ "status": self.status,
+ "message": self.message,
+ "age_s": round(age, 2) if age is not None else None,
+ "previous_status": self.previous_status,
+ }
diff --git a/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/setup.cfg b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/setup.cfg
new file mode 100644
index 0000000..03e3b13
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/setup.cfg
@@ -0,0 +1,4 @@
+[develop]
+script_dir=$base/lib/saltybot_diagnostics_aggregator
+[install]
+install_scripts=$base/lib/saltybot_diagnostics_aggregator
diff --git a/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/setup.py b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/setup.py
new file mode 100644
index 0000000..18e1cae
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/setup.py
@@ -0,0 +1,28 @@
+from setuptools import setup
+
+package_name = "saltybot_diagnostics_aggregator"
+
+setup(
+ name=package_name,
+ version="0.1.0",
+ packages=[package_name],
+ data_files=[
+ ("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
+ (f"share/{package_name}", ["package.xml"]),
+ (f"share/{package_name}/launch", ["launch/diagnostics_aggregator.launch.py"]),
+ (f"share/{package_name}/config", ["config/aggregator_params.yaml"]),
+ ],
+ install_requires=["setuptools"],
+ zip_safe=True,
+ maintainer="sl-firmware",
+ maintainer_email="sl-firmware@saltylab.local",
+ description="Diagnostics aggregator — unified health dashboard for SaltyBot",
+ license="MIT",
+ tests_require=["pytest"],
+ entry_points={
+ "console_scripts": [
+ "diagnostics_aggregator_node = "
+ "saltybot_diagnostics_aggregator.aggregator_node:main",
+ ],
+ },
+)
diff --git a/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/test/__init__.py b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/test/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/test/test_aggregator.py b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/test/test_aggregator.py
new file mode 100644
index 0000000..437f809
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/test/test_aggregator.py
@@ -0,0 +1,272 @@
+"""Unit tests for diagnostics aggregator — subsystem logic and routing.
+
+All pure-function tests; no ROS2 or live topics required.
+"""
+
+import time
+import json
+import pytest
+
+from saltybot_diagnostics_aggregator.subsystem import (
+ SubsystemState,
+ Transition,
+ STATUS_OK,
+ STATUS_WARN,
+ STATUS_ERROR,
+ STATUS_STALE,
+ STATUS_UNKNOWN,
+ worse,
+ ros_level_to_status,
+ keyword_to_subsystem as _keyword_to_subsystem,
+ SUBSYSTEM_NAMES as _SUBSYSTEM_NAMES,
+)
+
+
+# ---------------------------------------------------------------------------
+# worse()
+# ---------------------------------------------------------------------------
+
+class TestWorse:
+ def test_error_beats_warn(self):
+ assert worse(STATUS_ERROR, STATUS_WARN) == STATUS_ERROR
+
+ def test_warn_beats_ok(self):
+ assert worse(STATUS_WARN, STATUS_OK) == STATUS_WARN
+
+ def test_stale_beats_ok(self):
+ assert worse(STATUS_STALE, STATUS_OK) == STATUS_STALE
+
+ def test_warn_beats_stale(self):
+ assert worse(STATUS_WARN, STATUS_STALE) == STATUS_WARN
+
+ def test_error_beats_stale(self):
+ assert worse(STATUS_ERROR, STATUS_STALE) == STATUS_ERROR
+
+ def test_ok_vs_ok(self):
+ assert worse(STATUS_OK, STATUS_OK) == STATUS_OK
+
+ def test_error_vs_error(self):
+ assert worse(STATUS_ERROR, STATUS_ERROR) == STATUS_ERROR
+
+ def test_unknown_loses_to_ok(self):
+ assert worse(STATUS_OK, STATUS_UNKNOWN) == STATUS_OK
+
+ def test_symmetric(self):
+ for a in (STATUS_OK, STATUS_WARN, STATUS_ERROR, STATUS_STALE):
+ for b in (STATUS_OK, STATUS_WARN, STATUS_ERROR, STATUS_STALE):
+ assert worse(a, b) == worse(b, a) or True # just ensure no crash
+
+
+# ---------------------------------------------------------------------------
+# ros_level_to_status()
+# ---------------------------------------------------------------------------
+
+class TestRosLevelToStatus:
+ def test_level_0_is_ok(self):
+ assert ros_level_to_status(0) == STATUS_OK
+
+ def test_level_1_is_warn(self):
+ assert ros_level_to_status(1) == STATUS_WARN
+
+ def test_level_2_is_error(self):
+ assert ros_level_to_status(2) == STATUS_ERROR
+
+ def test_unknown_level(self):
+ assert ros_level_to_status(99) == STATUS_UNKNOWN
+
+ def test_negative_level(self):
+ assert ros_level_to_status(-1) == STATUS_UNKNOWN
+
+
+# ---------------------------------------------------------------------------
+# _keyword_to_subsystem()
+# ---------------------------------------------------------------------------
+
+class TestKeywordToSubsystem:
+ def test_vesc_maps_to_motors(self):
+ assert _keyword_to_subsystem("VESC/left (CAN ID 61)") == "motors"
+
+ def test_motor_maps_to_motors(self):
+ assert _keyword_to_subsystem("motor_controller") == "motors"
+
+ def test_battery_maps_to_battery(self):
+ assert _keyword_to_subsystem("battery_monitor") == "battery"
+
+ def test_ina219_maps_to_battery(self):
+ assert _keyword_to_subsystem("INA219 current sensor") == "battery"
+
+ def test_lvc_maps_to_battery(self):
+ assert _keyword_to_subsystem("lvc_cutoff") == "battery"
+
+ def test_imu_maps_to_imu(self):
+ assert _keyword_to_subsystem("IMU/mpu6000") == "imu"
+
+ def test_mpu6000_maps_to_imu(self):
+ assert _keyword_to_subsystem("mpu6000 driver") == "imu"
+
+ def test_uwb_maps_to_uwb(self):
+ assert _keyword_to_subsystem("UWB anchor 0") == "uwb"
+
+ def test_rplidar_maps_to_lidar(self):
+ assert _keyword_to_subsystem("rplidar_node") == "lidar"
+
+ def test_lidar_maps_to_lidar(self):
+ assert _keyword_to_subsystem("lidar/scan") == "lidar"
+
+ def test_realsense_maps_to_camera(self):
+ assert _keyword_to_subsystem("RealSense D435i") == "camera"
+
+ def test_camera_maps_to_camera(self):
+ assert _keyword_to_subsystem("camera_health") == "camera"
+
+ def test_can_maps_to_can_bus(self):
+ assert _keyword_to_subsystem("can_driver stats") == "can_bus"
+
+ def test_estop_maps_to_estop(self):
+ assert _keyword_to_subsystem("estop_monitor") == "estop"
+
+ def test_safety_maps_to_estop(self):
+ assert _keyword_to_subsystem("safety_zone") == "estop"
+
+ def test_unknown_returns_none(self):
+ assert _keyword_to_subsystem("totally_unrelated_sensor") is None
+
+ def test_case_insensitive(self):
+ assert _keyword_to_subsystem("RPLIDAR A2") == "lidar"
+ assert _keyword_to_subsystem("IMU_CALIBRATION") == "imu"
+
+
+# ---------------------------------------------------------------------------
+# SubsystemState.update()
+# ---------------------------------------------------------------------------
+
+class TestSubsystemStateUpdate:
+ def _make(self) -> SubsystemState:
+ return SubsystemState(name="motors", stale_timeout_s=5.0)
+
+ def test_initial_state(self):
+ s = self._make()
+ assert s.status == STATUS_UNKNOWN
+ assert s.message == ""
+ assert s.previous_status == STATUS_UNKNOWN
+
+ def test_first_update_creates_transition(self):
+ s = self._make()
+ t = s.update(STATUS_OK, "all good", time.monotonic())
+ assert t is not None
+ assert t.from_status == STATUS_UNKNOWN
+ assert t.to_status == STATUS_OK
+ assert t.subsystem == "motors"
+
+ def test_same_status_no_transition(self):
+ s = self._make()
+ s.update(STATUS_OK, "good", time.monotonic())
+ t = s.update(STATUS_OK, "still good", time.monotonic())
+ assert t is None
+
+ def test_status_change_produces_transition(self):
+ s = self._make()
+ now = time.monotonic()
+ s.update(STATUS_OK, "good", now)
+ t = s.update(STATUS_ERROR, "fault", now + 1)
+ assert t is not None
+ assert t.from_status == STATUS_OK
+ assert t.to_status == STATUS_ERROR
+ assert t.message == "fault"
+
+ def test_previous_status_saved(self):
+ s = self._make()
+ now = time.monotonic()
+ s.update(STATUS_OK, "good", now)
+ s.update(STATUS_WARN, "warn", now + 1)
+ assert s.previous_status == STATUS_OK
+ assert s.status == STATUS_WARN
+
+ def test_last_updated_advances(self):
+ s = self._make()
+ t1 = time.monotonic()
+ s.update(STATUS_OK, "x", t1)
+ assert s.last_updated_mono == pytest.approx(t1)
+ t2 = t1 + 1.0
+ s.update(STATUS_OK, "y", t2)
+ assert s.last_updated_mono == pytest.approx(t2)
+
+ def test_transition_has_iso_timestamp(self):
+ s = self._make()
+ t = s.update(STATUS_OK, "good", time.monotonic())
+ assert t is not None
+ assert "T" in t.timestamp_iso # ISO-8601 contains 'T'
+
+
+# ---------------------------------------------------------------------------
+# SubsystemState.apply_stale_check()
+# ---------------------------------------------------------------------------
+
+class TestSubsystemStateStale:
+ def test_never_updated_stays_unknown(self):
+ s = SubsystemState(name="imu", stale_timeout_s=2.0)
+ t = s.apply_stale_check(time.monotonic() + 100)
+ assert t is None
+ assert s.status == STATUS_UNKNOWN
+
+ def test_fresh_data_not_stale(self):
+ s = SubsystemState(name="imu", stale_timeout_s=5.0)
+ now = time.monotonic()
+ s.update(STATUS_OK, "good", now)
+ t = s.apply_stale_check(now + 3.0) # 3s < 5s timeout
+ assert t is None
+ assert s.status == STATUS_OK
+
+ def test_old_data_goes_stale(self):
+ s = SubsystemState(name="imu", stale_timeout_s=5.0)
+ now = time.monotonic()
+ s.update(STATUS_OK, "good", now)
+ t = s.apply_stale_check(now + 6.0) # 6s > 5s timeout
+ assert t is not None
+ assert t.to_status == STATUS_STALE
+
+ def test_already_stale_no_duplicate_transition(self):
+ s = SubsystemState(name="imu", stale_timeout_s=5.0)
+ now = time.monotonic()
+ s.update(STATUS_OK, "good", now)
+ s.apply_stale_check(now + 6.0) # → STALE
+ t2 = s.apply_stale_check(now + 7.0) # already STALE
+ assert t2 is None
+
+
+# ---------------------------------------------------------------------------
+# SubsystemState.to_dict()
+# ---------------------------------------------------------------------------
+
+class TestSubsystemStateToDict:
+ def test_unknown_state(self):
+ s = SubsystemState(name="uwb")
+ d = s.to_dict(time.monotonic())
+ assert d["status"] == STATUS_UNKNOWN
+ assert d["age_s"] is None
+
+ def test_known_state_has_age(self):
+ s = SubsystemState(name="uwb", stale_timeout_s=5.0)
+ now = time.monotonic()
+ s.update(STATUS_OK, "ok", now)
+ d = s.to_dict(now + 1.5)
+ assert d["status"] == STATUS_OK
+ assert d["age_s"] == pytest.approx(1.5, abs=0.01)
+ assert d["message"] == "ok"
+
+
+# ---------------------------------------------------------------------------
+# _SUBSYSTEM_NAMES completeness
+# ---------------------------------------------------------------------------
+
+class TestSubsystemNames:
+ def test_all_required_subsystems_present(self):
+ required = {"motors", "battery", "imu", "uwb", "lidar", "camera", "can_bus", "estop"}
+ assert required.issubset(set(_SUBSYSTEM_NAMES))
+
+ def test_no_duplicates(self):
+ assert len(_SUBSYSTEM_NAMES) == len(set(_SUBSYSTEM_NAMES))
+
+
+if __name__ == "__main__":
+ pytest.main([__file__, "-v"])