From cd263c385d576ea778333ed31d077e6dde878983 Mon Sep 17 00:00:00 2001 From: sl-firmware Date: Tue, 17 Mar 2026 11:38:52 -0400 Subject: [PATCH] feat: Diagnostics aggregator (Issue #658) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New saltybot_diagnostics_aggregator ROS2 package — unified health dashboard aggregating telemetry into /saltybot/system_health JSON at 1 Hz. Subscribes to /vesc/health, /diagnostics, /saltybot/safety_zone/status, /saltybot/pose/status, /saltybot/uwb/status. Tracks 8 subsystems: motors, battery, imu, uwb, lidar, camera, can_bus, estop. - subsystem.py: SubsystemState, stale detection, transition log, severity ranking, keyword→subsystem routing (no ROS2 dependency) - aggregator_node.py: ROS2 node, 1 Hz heartbeat, overall_status rollup, last_error field, 50-entry transition ring buffer - test_aggregator.py: 46 unit tests, all passing - config/aggregator_params.yaml, launch file Co-Authored-By: Claude Sonnet 4.6 --- .../config/aggregator_params.yaml | 15 + .../launch/diagnostics_aggregator.launch.py | 44 +++ .../package.xml | 30 ++ .../resource/saltybot_diagnostics_aggregator | 0 .../__init__.py | 0 .../aggregator_node.py | 312 ++++++++++++++++++ .../subsystem.py | 148 +++++++++ .../saltybot_diagnostics_aggregator/setup.cfg | 4 + .../saltybot_diagnostics_aggregator/setup.py | 28 ++ .../test/__init__.py | 0 .../test/test_aggregator.py | 272 +++++++++++++++ 11 files changed, 853 insertions(+) create mode 100644 jetson/ros2_ws/src/saltybot_diagnostics_aggregator/config/aggregator_params.yaml create mode 100644 jetson/ros2_ws/src/saltybot_diagnostics_aggregator/launch/diagnostics_aggregator.launch.py create mode 100644 jetson/ros2_ws/src/saltybot_diagnostics_aggregator/package.xml create mode 100644 jetson/ros2_ws/src/saltybot_diagnostics_aggregator/resource/saltybot_diagnostics_aggregator create mode 100644 jetson/ros2_ws/src/saltybot_diagnostics_aggregator/saltybot_diagnostics_aggregator/__init__.py create mode 100644 jetson/ros2_ws/src/saltybot_diagnostics_aggregator/saltybot_diagnostics_aggregator/aggregator_node.py create mode 100644 jetson/ros2_ws/src/saltybot_diagnostics_aggregator/saltybot_diagnostics_aggregator/subsystem.py create mode 100644 jetson/ros2_ws/src/saltybot_diagnostics_aggregator/setup.cfg create mode 100644 jetson/ros2_ws/src/saltybot_diagnostics_aggregator/setup.py create mode 100644 jetson/ros2_ws/src/saltybot_diagnostics_aggregator/test/__init__.py create mode 100644 jetson/ros2_ws/src/saltybot_diagnostics_aggregator/test/test_aggregator.py diff --git a/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/config/aggregator_params.yaml b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/config/aggregator_params.yaml new file mode 100644 index 0000000..6767e63 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/config/aggregator_params.yaml @@ -0,0 +1,15 @@ +# Diagnostics Aggregator — Issue #658 +# Unified health dashboard aggregating telemetry from all SaltyBot subsystems. + +diagnostics_aggregator: + ros__parameters: + # Publish rate for /saltybot/system_health (Hz) + heartbeat_hz: 1.0 + + # Seconds without a topic update before a subsystem is marked STALE + # Increase for sensors with lower publish rates (e.g. UWB at 5 Hz) + stale_timeout_s: 5.0 + + # Maximum number of state transitions kept in the in-memory ring buffer + # Last 10 transitions are included in each /saltybot/system_health publish + transition_log_size: 50 diff --git a/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/launch/diagnostics_aggregator.launch.py b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/launch/diagnostics_aggregator.launch.py new file mode 100644 index 0000000..9e8e86e --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/launch/diagnostics_aggregator.launch.py @@ -0,0 +1,44 @@ +"""Launch file for diagnostics aggregator node.""" + +import os +from ament_index_python.packages import get_package_share_directory +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument +from launch.substitutions import LaunchConfiguration +from launch_ros.actions import Node + + +def generate_launch_description(): + pkg_dir = get_package_share_directory("saltybot_diagnostics_aggregator") + config_file = os.path.join(pkg_dir, "config", "aggregator_params.yaml") + + return LaunchDescription([ + DeclareLaunchArgument( + "config_file", + default_value=config_file, + description="Path to aggregator_params.yaml", + ), + DeclareLaunchArgument( + "heartbeat_hz", + default_value="1.0", + description="Publish rate for /saltybot/system_health (Hz)", + ), + DeclareLaunchArgument( + "stale_timeout_s", + default_value="5.0", + description="Seconds without update before subsystem goes STALE", + ), + Node( + package="saltybot_diagnostics_aggregator", + executable="diagnostics_aggregator_node", + name="diagnostics_aggregator", + output="screen", + parameters=[ + LaunchConfiguration("config_file"), + { + "heartbeat_hz": LaunchConfiguration("heartbeat_hz"), + "stale_timeout_s": LaunchConfiguration("stale_timeout_s"), + }, + ], + ), + ]) diff --git a/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/package.xml b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/package.xml new file mode 100644 index 0000000..089e464 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/package.xml @@ -0,0 +1,30 @@ + + + + saltybot_diagnostics_aggregator + 0.1.0 + + Diagnostics aggregator for SaltyBot — unified health dashboard node (Issue #658). + Subscribes to /vesc/health, /diagnostics, /saltybot/safety_zone/status, + /saltybot/pose/status, /saltybot/uwb/status. Aggregates into + /saltybot/system_health JSON at 1 Hz. Tracks motors, battery, imu, uwb, + lidar, camera, can_bus, estop subsystems with state-transition logging. + + sl-firmware + MIT + + rclpy + std_msgs + diagnostic_msgs + + ament_python + + ament_copyright + ament_flake8 + ament_pep257 + python3-pytest + + + ament_python + + diff --git a/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/resource/saltybot_diagnostics_aggregator b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/resource/saltybot_diagnostics_aggregator new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/saltybot_diagnostics_aggregator/__init__.py b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/saltybot_diagnostics_aggregator/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/saltybot_diagnostics_aggregator/aggregator_node.py b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/saltybot_diagnostics_aggregator/aggregator_node.py new file mode 100644 index 0000000..73f9422 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/saltybot_diagnostics_aggregator/aggregator_node.py @@ -0,0 +1,312 @@ +#!/usr/bin/env python3 +"""Diagnostics aggregator — unified health dashboard node (Issue #658). + +Subscribes to telemetry and diagnostics topics from all subsystems, aggregates +them into a single /saltybot/system_health JSON publish at 1 Hz. + +Subscribed topics +----------------- + /vesc/health (std_msgs/String) — motor VESC health JSON + /diagnostics (diagnostic_msgs/DiagnosticArray) — ROS diagnostics bus + /saltybot/safety_zone/status (std_msgs/String) — safety / estop state + /saltybot/pose/status (std_msgs/String) — IMU / pose estimate state + /saltybot/uwb/status (std_msgs/String) — UWB positioning state + +Published topics +---------------- + /saltybot/system_health (std_msgs/String) — JSON health dashboard at 1 Hz + +JSON schema for /saltybot/system_health +---------------------------------------- +{ + "overall_status": "OK" | "WARN" | "ERROR" | "STALE", + "uptime_s": , + "subsystems": { + "motors": { "status": ..., "message": ..., "age_s": ..., "previous_status": ... }, + "battery": { ... }, + "imu": { ... }, + "uwb": { ... }, + "lidar": { ... }, + "camera": { ... }, + "can_bus": { ... }, + "estop": { ... } + }, + "last_error": { "subsystem": ..., "message": ..., "timestamp": ... } | null, + "recent_transitions": [ { "subsystem", "from_status", "to_status", + "message", "timestamp_iso" }, ... ], + "timestamp": "" +} + +Parameters +---------- + heartbeat_hz float 1.0 Publish rate (Hz) + stale_timeout_s float 5.0 Seconds without update → STALE + transition_log_size int 50 Max recent transitions kept in memory +""" + +import json +import time +from collections import deque +from datetime import datetime, timezone +from typing import Optional + +import rclpy +from rclpy.node import Node +from std_msgs.msg import String +from diagnostic_msgs.msg import DiagnosticArray + +from .subsystem import ( + SubsystemState, + Transition, + STATUS_OK, + STATUS_WARN, + STATUS_ERROR, + STATUS_STALE, + STATUS_UNKNOWN, + worse, + ros_level_to_status, + SUBSYSTEM_NAMES as _SUBSYSTEM_NAMES, + keyword_to_subsystem as _keyword_to_subsystem, +) + + +class DiagnosticsAggregatorNode(Node): + """Unified health dashboard node.""" + + def __init__(self): + super().__init__("diagnostics_aggregator") + + # ---------------------------------------------------------------- + # Parameters + # ---------------------------------------------------------------- + self.declare_parameter("heartbeat_hz", 1.0) + self.declare_parameter("stale_timeout_s", 5.0) + self.declare_parameter("transition_log_size", 50) + + hz = float(self.get_parameter("heartbeat_hz").value) + stale_timeout = float(self.get_parameter("stale_timeout_s").value) + log_size = int(self.get_parameter("transition_log_size").value) + + # ---------------------------------------------------------------- + # Subsystem state table + # ---------------------------------------------------------------- + self._subsystems: dict[str, SubsystemState] = { + name: SubsystemState(name=name, stale_timeout_s=stale_timeout) + for name in _SUBSYSTEM_NAMES + } + self._transitions: deque[Transition] = deque(maxlen=log_size) + self._last_error: Optional[dict] = None + self._start_mono = time.monotonic() + + # ---------------------------------------------------------------- + # Subscriptions + # ---------------------------------------------------------------- + self.create_subscription(String, "/vesc/health", + self._on_vesc_health, 10) + self.create_subscription(DiagnosticArray, "/diagnostics", + self._on_diagnostics, 10) + self.create_subscription(String, "/saltybot/safety_zone/status", + self._on_safety_zone, 10) + self.create_subscription(String, "/saltybot/pose/status", + self._on_pose_status, 10) + self.create_subscription(String, "/saltybot/uwb/status", + self._on_uwb_status, 10) + + # ---------------------------------------------------------------- + # Publisher + # ---------------------------------------------------------------- + self._pub = self.create_publisher(String, "/saltybot/system_health", 1) + + # ---------------------------------------------------------------- + # 1 Hz heartbeat timer + # ---------------------------------------------------------------- + self.create_timer(1.0 / max(0.1, hz), self._on_timer) + + self.get_logger().info( + f"diagnostics_aggregator: {len(_SUBSYSTEM_NAMES)} subsystems, " + f"heartbeat={hz} Hz, stale_timeout={stale_timeout} s" + ) + + # ---------------------------------------------------------------- + # Topic callbacks + # ---------------------------------------------------------------- + + def _on_vesc_health(self, msg: String) -> None: + """Parse /vesc/health JSON → motors subsystem.""" + now = time.monotonic() + try: + d = json.loads(msg.data) + fault = d.get("fault_code", 0) + alive = d.get("alive", True) + if not alive: + status, message = STATUS_STALE, "VESC offline" + elif fault != 0: + status = STATUS_ERROR + message = f"VESC fault {d.get('fault_name', fault)}" + else: + status = STATUS_OK + message = ( + f"RPM={d.get('rpm', '?')} " + f"I={d.get('current_a', '?')} A " + f"V={d.get('voltage_v', '?')} V" + ) + self._update("motors", status, message, now) + except Exception as exc: + self.get_logger().debug(f"/vesc/health parse error: {exc}") + + def _on_diagnostics(self, msg: DiagnosticArray) -> None: + """Fan /diagnostics entries out to per-subsystem states.""" + now = time.monotonic() + # Accumulate worst status per subsystem across all entries in this array + pending: dict[str, tuple[str, str]] = {} # subsystem → (status, message) + + for ds in msg.status: + subsystem = _keyword_to_subsystem(ds.name) + if subsystem is None: + continue + status = ros_level_to_status(ds.level) + message = ds.message or "" + if subsystem not in pending: + pending[subsystem] = (status, message) + else: + prev_s, prev_m = pending[subsystem] + new_worst = worse(status, prev_s) + pending[subsystem] = ( + new_worst, + message if new_worst == status else prev_m, + ) + + for subsystem, (status, message) in pending.items(): + self._update(subsystem, status, message, now) + + def _on_safety_zone(self, msg: String) -> None: + """Parse /saltybot/safety_zone/status JSON → estop subsystem.""" + now = time.monotonic() + try: + d = json.loads(msg.data) + triggered = d.get("estop_triggered", d.get("triggered", False)) + active = d.get("safety_zone_active", d.get("active", True)) + if triggered: + status, message = STATUS_ERROR, "E-stop triggered" + elif not active: + status, message = STATUS_WARN, "Safety zone inactive" + else: + status, message = STATUS_OK, "Safety zone active" + self._update("estop", status, message, now) + except Exception as exc: + self.get_logger().debug(f"/saltybot/safety_zone/status parse error: {exc}") + + def _on_pose_status(self, msg: String) -> None: + """Parse /saltybot/pose/status JSON → imu subsystem.""" + now = time.monotonic() + try: + d = json.loads(msg.data) + status_str = d.get("status", "OK").upper() + if status_str not in (STATUS_OK, STATUS_WARN, STATUS_ERROR): + status_str = STATUS_WARN + message = d.get("message", d.get("msg", "")) + self._update("imu", status_str, message, now) + except Exception as exc: + self.get_logger().debug(f"/saltybot/pose/status parse error: {exc}") + + def _on_uwb_status(self, msg: String) -> None: + """Parse /saltybot/uwb/status JSON → uwb subsystem.""" + now = time.monotonic() + try: + d = json.loads(msg.data) + status_str = d.get("status", "OK").upper() + if status_str not in (STATUS_OK, STATUS_WARN, STATUS_ERROR): + status_str = STATUS_WARN + message = d.get("message", d.get("msg", "")) + self._update("uwb", status_str, message, now) + except Exception as exc: + self.get_logger().debug(f"/saltybot/uwb/status parse error: {exc}") + + # ---------------------------------------------------------------- + # Internal helpers + # ---------------------------------------------------------------- + + def _update(self, subsystem: str, status: str, message: str, now: float) -> None: + """Update subsystem state and record any transition.""" + s = self._subsystems.get(subsystem) + if s is None: + return + transition = s.update(status, message, now) + if transition is not None: + self._transitions.append(transition) + self.get_logger().info( + f"[{subsystem}] {transition.from_status} → {transition.to_status}: {message}" + ) + if transition.to_status == STATUS_ERROR: + self._last_error = { + "subsystem": subsystem, + "message": message, + "timestamp": transition.timestamp_iso, + } + + def _apply_stale_checks(self, now: float) -> None: + for s in self._subsystems.values(): + transition = s.apply_stale_check(now) + if transition is not None: + self._transitions.append(transition) + self.get_logger().warn( + f"[{transition.subsystem}] went STALE (no data)" + ) + + # ---------------------------------------------------------------- + # Heartbeat timer + # ---------------------------------------------------------------- + + def _on_timer(self) -> None: + now = time.monotonic() + self._apply_stale_checks(now) + + # Overall status = worst across all subsystems + overall = STATUS_OK + for s in self._subsystems.values(): + overall = worse(overall, s.status) + # UNKNOWN does not degrade overall if at least one subsystem is known + known = [s for s in self._subsystems.values() if s.status != STATUS_UNKNOWN] + if not known: + overall = STATUS_UNKNOWN + + uptime = now - self._start_mono + + payload = { + "overall_status": overall, + "uptime_s": round(uptime, 1), + "subsystems": { + name: s.to_dict(now) + for name, s in self._subsystems.items() + }, + "last_error": self._last_error, + "recent_transitions": [ + { + "subsystem": t.subsystem, + "from_status": t.from_status, + "to_status": t.to_status, + "message": t.message, + "timestamp": t.timestamp_iso, + } + for t in list(self._transitions)[-10:] # last 10 in the JSON + ], + "timestamp": datetime.now(timezone.utc).isoformat(), + } + + self._pub.publish(String(data=json.dumps(payload))) + + +def main(args=None): + rclpy.init(args=args) + node = DiagnosticsAggregatorNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == "__main__": + main() diff --git a/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/saltybot_diagnostics_aggregator/subsystem.py b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/saltybot_diagnostics_aggregator/subsystem.py new file mode 100644 index 0000000..d84180c --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/saltybot_diagnostics_aggregator/subsystem.py @@ -0,0 +1,148 @@ +"""Subsystem state tracking for the diagnostics aggregator. + +Each SubsystemState tracks one logical subsystem (motors, battery, etc.) +across potentially multiple source topics. Status priority: + ERROR > WARN > STALE > OK > UNKNOWN +""" + +import time +from dataclasses import dataclass, field +from typing import Optional + +# --------------------------------------------------------------------------- +# Status constants — ordered by severity (higher index = more severe) +# --------------------------------------------------------------------------- +STATUS_UNKNOWN = "UNKNOWN" +STATUS_OK = "OK" +STATUS_STALE = "STALE" +STATUS_WARN = "WARN" +STATUS_ERROR = "ERROR" + +# --------------------------------------------------------------------------- +# Subsystem registry and diagnostic keyword routing +# --------------------------------------------------------------------------- +SUBSYSTEM_NAMES: list[str] = [ + "motors", "battery", "imu", "uwb", "lidar", "camera", "can_bus", "estop" +] + +# (keywords-tuple, subsystem-name) — first match wins, lower-cased comparison +DIAG_KEYWORD_MAP: list[tuple[tuple[str, ...], str]] = [ + (("vesc", "motor", "esc", "fsesc"), "motors"), + (("battery", "ina219", "lvc", "coulomb", "vbat"), "battery"), + (("imu", "mpu6000", "bno055", "icm42688", "gyro", "accel"), "imu"), + (("uwb", "dw1000", "dw3000"), "uwb"), + (("lidar", "rplidar", "sick", "laser"), "lidar"), + (("camera", "realsense", "oak", "webcam"), "camera"), + (("can", "can_bus", "can_driver"), "can_bus"), + (("estop", "safety", "emergency", "e-stop"), "estop"), +] + + +def keyword_to_subsystem(name: str) -> Optional[str]: + """Map a diagnostic status name to a subsystem key, or None.""" + lower = name.lower() + for keywords, subsystem in DIAG_KEYWORD_MAP: + if any(kw in lower for kw in keywords): + return subsystem + return None + + +_SEVERITY: dict[str, int] = { + STATUS_UNKNOWN: 0, + STATUS_OK: 1, + STATUS_STALE: 2, + STATUS_WARN: 3, + STATUS_ERROR: 4, +} + + +def worse(a: str, b: str) -> str: + """Return the more severe of two status strings.""" + return a if _SEVERITY.get(a, 0) >= _SEVERITY.get(b, 0) else b + + +def ros_level_to_status(level: int) -> str: + """Convert diagnostic_msgs/DiagnosticStatus.level to a status string.""" + return {0: STATUS_OK, 1: STATUS_WARN, 2: STATUS_ERROR}.get(level, STATUS_UNKNOWN) + + +# --------------------------------------------------------------------------- +# Transition log entry +# --------------------------------------------------------------------------- +@dataclass +class Transition: + """A logged status change for one subsystem.""" + subsystem: str + from_status: str + to_status: str + message: str + timestamp_iso: str # ISO-8601 wall-clock + monotonic_s: float # time.monotonic() at the transition + + +# --------------------------------------------------------------------------- +# Per-subsystem state +# --------------------------------------------------------------------------- +@dataclass +class SubsystemState: + """Live health state for one logical subsystem.""" + + name: str + stale_timeout_s: float = 5.0 # seconds without update → STALE + + # Mutable state + status: str = STATUS_UNKNOWN + message: str = "" + last_updated_mono: float = field(default_factory=lambda: 0.0) + last_change_mono: float = field(default_factory=lambda: 0.0) + previous_status: str = STATUS_UNKNOWN + + def update(self, new_status: str, message: str, now_mono: float) -> Optional[Transition]: + """Apply a new status, record a transition if the status changed. + + Returns a Transition if the status changed, else None. + """ + from datetime import datetime, timezone + + self.last_updated_mono = now_mono + + if new_status == self.status: + self.message = message + return None + + transition = Transition( + subsystem=self.name, + from_status=self.status, + to_status=new_status, + message=message, + timestamp_iso=datetime.now(timezone.utc).isoformat(), + monotonic_s=now_mono, + ) + + self.previous_status = self.status + self.status = new_status + self.message = message + self.last_change_mono = now_mono + return transition + + def apply_stale_check(self, now_mono: float) -> Optional[Transition]: + """Mark STALE if no update received within stale_timeout_s. + + Returns a Transition if newly staled, else None. + """ + if self.last_updated_mono == 0.0: + # Never received any data — leave as UNKNOWN + return None + if (now_mono - self.last_updated_mono) > self.stale_timeout_s: + if self.status not in (STATUS_STALE, STATUS_UNKNOWN): + return self.update(STATUS_STALE, "No data received", now_mono) + return None + + def to_dict(self, now_mono: float) -> dict: + age = (now_mono - self.last_updated_mono) if self.last_updated_mono > 0 else None + return { + "status": self.status, + "message": self.message, + "age_s": round(age, 2) if age is not None else None, + "previous_status": self.previous_status, + } diff --git a/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/setup.cfg b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/setup.cfg new file mode 100644 index 0000000..03e3b13 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/setup.cfg @@ -0,0 +1,4 @@ +[develop] +script_dir=$base/lib/saltybot_diagnostics_aggregator +[install] +install_scripts=$base/lib/saltybot_diagnostics_aggregator diff --git a/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/setup.py b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/setup.py new file mode 100644 index 0000000..18e1cae --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/setup.py @@ -0,0 +1,28 @@ +from setuptools import setup + +package_name = "saltybot_diagnostics_aggregator" + +setup( + name=package_name, + version="0.1.0", + packages=[package_name], + data_files=[ + ("share/ament_index/resource_index/packages", [f"resource/{package_name}"]), + (f"share/{package_name}", ["package.xml"]), + (f"share/{package_name}/launch", ["launch/diagnostics_aggregator.launch.py"]), + (f"share/{package_name}/config", ["config/aggregator_params.yaml"]), + ], + install_requires=["setuptools"], + zip_safe=True, + maintainer="sl-firmware", + maintainer_email="sl-firmware@saltylab.local", + description="Diagnostics aggregator — unified health dashboard for SaltyBot", + license="MIT", + tests_require=["pytest"], + entry_points={ + "console_scripts": [ + "diagnostics_aggregator_node = " + "saltybot_diagnostics_aggregator.aggregator_node:main", + ], + }, +) diff --git a/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/test/__init__.py b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/test/test_aggregator.py b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/test/test_aggregator.py new file mode 100644 index 0000000..437f809 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_diagnostics_aggregator/test/test_aggregator.py @@ -0,0 +1,272 @@ +"""Unit tests for diagnostics aggregator — subsystem logic and routing. + +All pure-function tests; no ROS2 or live topics required. +""" + +import time +import json +import pytest + +from saltybot_diagnostics_aggregator.subsystem import ( + SubsystemState, + Transition, + STATUS_OK, + STATUS_WARN, + STATUS_ERROR, + STATUS_STALE, + STATUS_UNKNOWN, + worse, + ros_level_to_status, + keyword_to_subsystem as _keyword_to_subsystem, + SUBSYSTEM_NAMES as _SUBSYSTEM_NAMES, +) + + +# --------------------------------------------------------------------------- +# worse() +# --------------------------------------------------------------------------- + +class TestWorse: + def test_error_beats_warn(self): + assert worse(STATUS_ERROR, STATUS_WARN) == STATUS_ERROR + + def test_warn_beats_ok(self): + assert worse(STATUS_WARN, STATUS_OK) == STATUS_WARN + + def test_stale_beats_ok(self): + assert worse(STATUS_STALE, STATUS_OK) == STATUS_STALE + + def test_warn_beats_stale(self): + assert worse(STATUS_WARN, STATUS_STALE) == STATUS_WARN + + def test_error_beats_stale(self): + assert worse(STATUS_ERROR, STATUS_STALE) == STATUS_ERROR + + def test_ok_vs_ok(self): + assert worse(STATUS_OK, STATUS_OK) == STATUS_OK + + def test_error_vs_error(self): + assert worse(STATUS_ERROR, STATUS_ERROR) == STATUS_ERROR + + def test_unknown_loses_to_ok(self): + assert worse(STATUS_OK, STATUS_UNKNOWN) == STATUS_OK + + def test_symmetric(self): + for a in (STATUS_OK, STATUS_WARN, STATUS_ERROR, STATUS_STALE): + for b in (STATUS_OK, STATUS_WARN, STATUS_ERROR, STATUS_STALE): + assert worse(a, b) == worse(b, a) or True # just ensure no crash + + +# --------------------------------------------------------------------------- +# ros_level_to_status() +# --------------------------------------------------------------------------- + +class TestRosLevelToStatus: + def test_level_0_is_ok(self): + assert ros_level_to_status(0) == STATUS_OK + + def test_level_1_is_warn(self): + assert ros_level_to_status(1) == STATUS_WARN + + def test_level_2_is_error(self): + assert ros_level_to_status(2) == STATUS_ERROR + + def test_unknown_level(self): + assert ros_level_to_status(99) == STATUS_UNKNOWN + + def test_negative_level(self): + assert ros_level_to_status(-1) == STATUS_UNKNOWN + + +# --------------------------------------------------------------------------- +# _keyword_to_subsystem() +# --------------------------------------------------------------------------- + +class TestKeywordToSubsystem: + def test_vesc_maps_to_motors(self): + assert _keyword_to_subsystem("VESC/left (CAN ID 61)") == "motors" + + def test_motor_maps_to_motors(self): + assert _keyword_to_subsystem("motor_controller") == "motors" + + def test_battery_maps_to_battery(self): + assert _keyword_to_subsystem("battery_monitor") == "battery" + + def test_ina219_maps_to_battery(self): + assert _keyword_to_subsystem("INA219 current sensor") == "battery" + + def test_lvc_maps_to_battery(self): + assert _keyword_to_subsystem("lvc_cutoff") == "battery" + + def test_imu_maps_to_imu(self): + assert _keyword_to_subsystem("IMU/mpu6000") == "imu" + + def test_mpu6000_maps_to_imu(self): + assert _keyword_to_subsystem("mpu6000 driver") == "imu" + + def test_uwb_maps_to_uwb(self): + assert _keyword_to_subsystem("UWB anchor 0") == "uwb" + + def test_rplidar_maps_to_lidar(self): + assert _keyword_to_subsystem("rplidar_node") == "lidar" + + def test_lidar_maps_to_lidar(self): + assert _keyword_to_subsystem("lidar/scan") == "lidar" + + def test_realsense_maps_to_camera(self): + assert _keyword_to_subsystem("RealSense D435i") == "camera" + + def test_camera_maps_to_camera(self): + assert _keyword_to_subsystem("camera_health") == "camera" + + def test_can_maps_to_can_bus(self): + assert _keyword_to_subsystem("can_driver stats") == "can_bus" + + def test_estop_maps_to_estop(self): + assert _keyword_to_subsystem("estop_monitor") == "estop" + + def test_safety_maps_to_estop(self): + assert _keyword_to_subsystem("safety_zone") == "estop" + + def test_unknown_returns_none(self): + assert _keyword_to_subsystem("totally_unrelated_sensor") is None + + def test_case_insensitive(self): + assert _keyword_to_subsystem("RPLIDAR A2") == "lidar" + assert _keyword_to_subsystem("IMU_CALIBRATION") == "imu" + + +# --------------------------------------------------------------------------- +# SubsystemState.update() +# --------------------------------------------------------------------------- + +class TestSubsystemStateUpdate: + def _make(self) -> SubsystemState: + return SubsystemState(name="motors", stale_timeout_s=5.0) + + def test_initial_state(self): + s = self._make() + assert s.status == STATUS_UNKNOWN + assert s.message == "" + assert s.previous_status == STATUS_UNKNOWN + + def test_first_update_creates_transition(self): + s = self._make() + t = s.update(STATUS_OK, "all good", time.monotonic()) + assert t is not None + assert t.from_status == STATUS_UNKNOWN + assert t.to_status == STATUS_OK + assert t.subsystem == "motors" + + def test_same_status_no_transition(self): + s = self._make() + s.update(STATUS_OK, "good", time.monotonic()) + t = s.update(STATUS_OK, "still good", time.monotonic()) + assert t is None + + def test_status_change_produces_transition(self): + s = self._make() + now = time.monotonic() + s.update(STATUS_OK, "good", now) + t = s.update(STATUS_ERROR, "fault", now + 1) + assert t is not None + assert t.from_status == STATUS_OK + assert t.to_status == STATUS_ERROR + assert t.message == "fault" + + def test_previous_status_saved(self): + s = self._make() + now = time.monotonic() + s.update(STATUS_OK, "good", now) + s.update(STATUS_WARN, "warn", now + 1) + assert s.previous_status == STATUS_OK + assert s.status == STATUS_WARN + + def test_last_updated_advances(self): + s = self._make() + t1 = time.monotonic() + s.update(STATUS_OK, "x", t1) + assert s.last_updated_mono == pytest.approx(t1) + t2 = t1 + 1.0 + s.update(STATUS_OK, "y", t2) + assert s.last_updated_mono == pytest.approx(t2) + + def test_transition_has_iso_timestamp(self): + s = self._make() + t = s.update(STATUS_OK, "good", time.monotonic()) + assert t is not None + assert "T" in t.timestamp_iso # ISO-8601 contains 'T' + + +# --------------------------------------------------------------------------- +# SubsystemState.apply_stale_check() +# --------------------------------------------------------------------------- + +class TestSubsystemStateStale: + def test_never_updated_stays_unknown(self): + s = SubsystemState(name="imu", stale_timeout_s=2.0) + t = s.apply_stale_check(time.monotonic() + 100) + assert t is None + assert s.status == STATUS_UNKNOWN + + def test_fresh_data_not_stale(self): + s = SubsystemState(name="imu", stale_timeout_s=5.0) + now = time.monotonic() + s.update(STATUS_OK, "good", now) + t = s.apply_stale_check(now + 3.0) # 3s < 5s timeout + assert t is None + assert s.status == STATUS_OK + + def test_old_data_goes_stale(self): + s = SubsystemState(name="imu", stale_timeout_s=5.0) + now = time.monotonic() + s.update(STATUS_OK, "good", now) + t = s.apply_stale_check(now + 6.0) # 6s > 5s timeout + assert t is not None + assert t.to_status == STATUS_STALE + + def test_already_stale_no_duplicate_transition(self): + s = SubsystemState(name="imu", stale_timeout_s=5.0) + now = time.monotonic() + s.update(STATUS_OK, "good", now) + s.apply_stale_check(now + 6.0) # → STALE + t2 = s.apply_stale_check(now + 7.0) # already STALE + assert t2 is None + + +# --------------------------------------------------------------------------- +# SubsystemState.to_dict() +# --------------------------------------------------------------------------- + +class TestSubsystemStateToDict: + def test_unknown_state(self): + s = SubsystemState(name="uwb") + d = s.to_dict(time.monotonic()) + assert d["status"] == STATUS_UNKNOWN + assert d["age_s"] is None + + def test_known_state_has_age(self): + s = SubsystemState(name="uwb", stale_timeout_s=5.0) + now = time.monotonic() + s.update(STATUS_OK, "ok", now) + d = s.to_dict(now + 1.5) + assert d["status"] == STATUS_OK + assert d["age_s"] == pytest.approx(1.5, abs=0.01) + assert d["message"] == "ok" + + +# --------------------------------------------------------------------------- +# _SUBSYSTEM_NAMES completeness +# --------------------------------------------------------------------------- + +class TestSubsystemNames: + def test_all_required_subsystems_present(self): + required = {"motors", "battery", "imu", "uwb", "lidar", "camera", "can_bus", "estop"} + assert required.issubset(set(_SUBSYSTEM_NAMES)) + + def test_no_duplicates(self): + assert len(_SUBSYSTEM_NAMES) == len(set(_SUBSYSTEM_NAMES)) + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])