diff --git a/include/jlink.h b/include/jlink.h index c008754..7e2289e 100644 --- a/include/jlink.h +++ b/include/jlink.h @@ -3,9 +3,10 @@ #include #include +#include "pid_flash.h" /* pid_sched_entry_t, PID_SCHED_MAX_BANDS */ /* - * JLink — Jetson serial binary protocol over USART1 (PB6=TX, PB7=RX). + * JLink -- Jetson serial binary protocol over USART1 (PB6=TX, PB7=RX). * * Issue #120: replaces jetson_cmd ASCII-over-USB-CDC with a dedicated * hardware UART at 921600 baud using DMA circular RX and IDLE interrupt. @@ -28,14 +29,21 @@ * 0x05 PID_SET - float kp, float ki, float kd (12 bytes, IEEE-754 LE) * 0x06 DFU_ENTER - no payload; request OTA DFU reboot (denied while armed) * 0x07 ESTOP - no payload; engage emergency stop + * 0x08 AUDIO - int16 PCM samples (up to 126 samples) + * 0x09 SLEEP - no payload; request STOP-mode sleep * 0x0A PID_SAVE - no payload; save current Kp/Ki/Kd to flash (Issue #531) * 0x0B GIMBAL_POS - int16 pan_x10, int16 tilt_x10, uint16 speed (Issue #547) + * 0x0C SCHED_GET - no payload; reply with TLM_SCHED (Issue #550) + * 0x0D SCHED_SET - uint8 num_bands + N*16-byte pid_sched_entry_t (Issue #550) + * 0x0E SCHED_SAVE - float kp, ki, kd (12 bytes); save sched+single to flash (Issue #550) * * STM32 to Jetson telemetry: - * 0x80 STATUS - jlink_tlm_status_t (20 bytes), sent at JLINK_TLM_HZ - * 0x81 POWER - jlink_tlm_power_t (11 bytes), sent at PM_TLM_HZ + * 0x80 STATUS - jlink_tlm_status_t (20 bytes), sent at JLINK_TLM_HZ + * 0x81 POWER - jlink_tlm_power_t (11 bytes), sent at PM_TLM_HZ + * 0x82 BATTERY - jlink_tlm_battery_t (10 bytes, Issue #533) * 0x83 PID_RESULT - jlink_tlm_pid_result_t (13 bytes), sent after PID_SAVE (Issue #531) * 0x84 GIMBAL_STATE - jlink_tlm_gimbal_state_t (10 bytes, Issue #547) + * 0x85 SCHED - jlink_tlm_sched_t (1+N*16 bytes), sent on SCHED_GET (Issue #550) * * Priority: CRSF RC always takes precedence. Jetson steer/speed only applied * when mode_manager_active() == MODE_AUTONOMOUS (CH6 high). In RC_MANUAL and @@ -62,13 +70,17 @@ #define JLINK_CMD_SLEEP 0x09u /* no payload; request STOP-mode sleep */ #define JLINK_CMD_PID_SAVE 0x0Au /* no payload; save Kp/Ki/Kd to flash (Issue #531) */ #define JLINK_CMD_GIMBAL_POS 0x0Bu /* int16 pan_x10, int16 tilt_x10, uint16 speed (Issue #547) */ +#define JLINK_CMD_SCHED_GET 0x0Cu /* no payload; reply TLM_SCHED (Issue #550) */ +#define JLINK_CMD_SCHED_SET 0x0Du /* uint8 num_bands + N*16-byte entries (Issue #550) */ +#define JLINK_CMD_SCHED_SAVE 0x0Eu /* float kp,ki,kd; save sched+single to flash (Issue #550) */ /* ---- Telemetry IDs (STM32 to Jetson) ---- */ #define JLINK_TLM_STATUS 0x80u #define JLINK_TLM_POWER 0x81u /* jlink_tlm_power_t (11 bytes) */ #define JLINK_TLM_BATTERY 0x82u /* jlink_tlm_battery_t (10 bytes, Issue #533) */ -#define JLINK_TLM_PID_RESULT 0x83u /* jlink_tlm_pid_result_t (13 bytes) Issue #531 */ +#define JLINK_TLM_PID_RESULT 0x83u /* jlink_tlm_pid_result_t (13 bytes, Issue #531) */ #define JLINK_TLM_GIMBAL_STATE 0x84u /* jlink_tlm_gimbal_state_t (10 bytes, Issue #547) */ +#define JLINK_TLM_SCHED 0x85u /* jlink_tlm_sched_t (1+N*16 bytes, Issue #550) */ /* ---- Telemetry STATUS payload (20 bytes, packed) ---- */ typedef struct __attribute__((packed)) { @@ -98,15 +110,15 @@ typedef struct __attribute__((packed)) { uint32_t idle_ms; /* ms since last cmd_vel activity */ } jlink_tlm_power_t; /* 11 bytes */ -/* ---- Telemetry BATTERY payload (10 bytes, packed) — Issue #533 ---- */ +/* ---- Telemetry BATTERY payload (10 bytes, packed) Issue #533 ---- */ typedef struct __attribute__((packed)) { - uint16_t vbat_mv; /* DMA-sampled LPF-filtered Vbat (mV) */ + uint16_t vbat_mv; /* DMA-sampled LPF-filtered Vbat (mV) */ int16_t ibat_ma; /* DMA-sampled LPF-filtered Ibat (mA, + = discharge) */ - uint16_t vbat_raw_mv; /* unfiltered last-tick average (mV) */ - uint8_t flags; /* bit0=low, bit1=critical, bit2=4S, bit3=adc_ready */ - int8_t cal_offset; /* vbat_offset_mv / 4 (±127 → ±508 mV) */ - uint8_t lpf_shift; /* IIR shift factor (α = 1/2^lpf_shift) */ - uint8_t soc_pct; /* voltage-based SoC 0–100, 255 = unknown */ + uint16_t vbat_raw_mv; /* unfiltered last-tick average (mV) */ + uint8_t flags; /* bit0=low, bit1=critical, bit2=4S, bit3=adc_ready */ + int8_t cal_offset; /* vbat_offset_mv / 4 (+-127 -> +-508 mV) */ + uint8_t lpf_shift; /* IIR shift factor (alpha = 1/2^lpf_shift) */ + uint8_t soc_pct; /* voltage-based SoC 0-100, 255 = unknown */ } jlink_tlm_battery_t; /* 10 bytes */ /* ---- Telemetry PID_RESULT payload (13 bytes, packed) Issue #531 ---- */ @@ -129,6 +141,13 @@ typedef struct __attribute__((packed)) { uint8_t rx_err_pct; /* bus error rate 0-100% (rx_err*100/(rx_ok+rx_err)) */ } jlink_tlm_gimbal_state_t; /* 10 bytes */ +/* ---- Telemetry SCHED payload (1 + N*16 bytes, packed) Issue #550 ---- */ +/* Sent in response to JLINK_CMD_SCHED_GET; N = num_bands (1..PID_SCHED_MAX_BANDS). */ +typedef struct __attribute__((packed)) { + uint8_t num_bands; /* number of valid entries */ + pid_sched_entry_t bands[PID_SCHED_MAX_BANDS]; /* up to 6 x 16 = 96 bytes */ +} jlink_tlm_sched_t; /* 1 + 96 = 97 bytes max */ + /* ---- Volatile state (read from main loop) ---- */ typedef struct { /* Drive command - updated on JLINK_CMD_DRIVE */ @@ -161,55 +180,37 @@ typedef struct { volatile int16_t gimbal_pan_x10; /* pan angle deg x10 */ volatile int16_t gimbal_tilt_x10; /* tilt angle deg x10 */ volatile uint16_t gimbal_speed; /* servo speed 0-4095 (0=max) */ + + /* PID schedule commands (Issue #550) - set by parser, cleared by main loop */ + volatile uint8_t sched_get_req; /* SCHED_GET: main loop calls jlink_send_sched_telemetry() */ + volatile uint8_t sched_save_req; /* SCHED_SAVE: main loop calls pid_schedule_flash_save() */ + volatile float sched_save_kp; /* kp for single-PID record in SCHED_SAVE */ + volatile float sched_save_ki; + volatile float sched_save_kd; } JLinkState; extern volatile JLinkState jlink_state; +/* ---- SCHED_SET receive buffer -- Issue #550 ---- */ +/* + * Populated by the parser on JLINK_CMD_SCHED_SET. Main loop reads via + * jlink_get_sched_set() and calls pid_schedule_set_table() before clearing. + */ +typedef struct { + volatile uint8_t ready; /* set by parser, cleared by main loop */ + volatile uint8_t num_bands; + pid_sched_entry_t bands[PID_SCHED_MAX_BANDS]; /* copied from frame */ +} JLinkSchedSetBuf; + /* ---- API ---- */ -/* - * jlink_init() - configure USART1 (PB6=TX, PB7=RX) at 921600 baud with - * DMA2_Stream2_Channel4 circular RX (128-byte buffer) and IDLE interrupt. - * Call once before safety_init(). - */ void jlink_init(void); - -/* - * jlink_is_active(now_ms) - returns true if a valid frame arrived within - * JLINK_HB_TIMEOUT_MS. Returns false if no frame ever received. - */ bool jlink_is_active(uint32_t now_ms); - -/* - * jlink_send_telemetry(status) - build and transmit a JLINK_TLM_STATUS frame - * over USART1 TX (blocking, ~0.2ms at 921600). Call at JLINK_TLM_HZ. - */ -void jlink_send_telemetry(const jlink_tlm_status_t *status); - -/* - * jlink_process() - drain DMA circular buffer and parse frames. - * Call from main loop every iteration (not ISR). Lightweight: O(bytes_pending). - */ void jlink_process(void); - -/* - * jlink_send_power_telemetry(power) - build and transmit a JLINK_TLM_POWER - * frame (17 bytes) at PM_TLM_HZ. Call from main loop when not in STOP mode. - */ +void jlink_send_telemetry(const jlink_tlm_status_t *status); void jlink_send_power_telemetry(const jlink_tlm_power_t *power); - -/* - * jlink_send_pid_result(result) - build and transmit a JLINK_TLM_PID_RESULT - * frame (19 bytes) to confirm PID flash save outcome (Issue #531). - */ -void jlink_send_pid_result(const jlink_tlm_pid_result_t *result); - -/* - * jlink_send_battery_telemetry(batt) - build and transmit JLINK_TLM_BATTERY - * (0x82) frame (16 bytes) at BATTERY_ADC_PUBLISH_HZ (1 Hz). - * Called by battery_adc_publish(); not normally called directly. - */ void jlink_send_battery_telemetry(const jlink_tlm_battery_t *batt); +void jlink_send_pid_result(const jlink_tlm_pid_result_t *result); /* * jlink_send_gimbal_state(state) - transmit JLINK_TLM_GIMBAL_STATE (0x84) @@ -217,4 +218,18 @@ void jlink_send_battery_telemetry(const jlink_tlm_battery_t *batt); */ void jlink_send_gimbal_state(const jlink_tlm_gimbal_state_t *state); +/* + * jlink_send_sched_telemetry(tlm) - transmit JLINK_TLM_SCHED (0x85) in + * response to SCHED_GET. tlm->num_bands determines actual frame size. + * Issue #550. + */ +void jlink_send_sched_telemetry(const jlink_tlm_sched_t *tlm); + +/* + * jlink_get_sched_set() - return pointer to the most-recently received + * SCHED_SET payload buffer (static storage in jlink.c). Main loop calls + * pid_schedule_set_table() from this buffer, then clears ready. Issue #550. + */ +JLinkSchedSetBuf *jlink_get_sched_set(void); + #endif /* JLINK_H */ diff --git a/jetson/ros2_ws/src/saltybot_health_monitor/config/sensor_health_params.yaml b/jetson/ros2_ws/src/saltybot_health_monitor/config/sensor_health_params.yaml new file mode 100644 index 0000000..f203362 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_health_monitor/config/sensor_health_params.yaml @@ -0,0 +1,66 @@ +sensor_health_node: + ros__parameters: + # Publish rate for DiagnosticArray and JSON summary + check_hz: 1.0 + + # MQTT broker for saltybot/health topic + mqtt_broker: "localhost" + mqtt_port: 1883 + mqtt_topic: "saltybot/health" + mqtt_enabled: true + +# Per-sensor thresholds and configuration +# Each entry: topic, name, warn_s (WARN threshold), error_s (ERROR threshold), critical +# +# critical=true: system cannot operate without this sensor +# warn_s: topic age (s) that triggers WARN level +# error_s: topic age (s) that triggers ERROR level + +sensors: + - topic: "/camera/color/image_raw" + name: "camera_color" + warn_s: 1.0 + error_s: 3.0 + critical: true + + - topic: "/camera/depth/image_rect_raw" + name: "camera_depth" + warn_s: 1.0 + error_s: 3.0 + critical: true + + - topic: "/camera/color/camera_info" + name: "camera_info" + warn_s: 2.0 + error_s: 5.0 + critical: false + + - topic: "/scan" + name: "lidar" + warn_s: 1.0 + error_s: 3.0 + critical: true + + - topic: "/imu/data" + name: "imu" + warn_s: 0.5 + error_s: 2.0 + critical: true + + - topic: "/saltybot/uwb/range" + name: "uwb" + warn_s: 2.0 + error_s: 5.0 + critical: false + + - topic: "/saltybot/battery" + name: "battery" + warn_s: 3.0 + error_s: 8.0 + critical: false + + - topic: "/saltybot/motor_daemon/status" + name: "motor_daemon" + warn_s: 2.0 + error_s: 5.0 + critical: true diff --git a/jetson/ros2_ws/src/saltybot_health_monitor/launch/sensor_health.launch.py b/jetson/ros2_ws/src/saltybot_health_monitor/launch/sensor_health.launch.py new file mode 100644 index 0000000..c333907 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_health_monitor/launch/sensor_health.launch.py @@ -0,0 +1,50 @@ +"""sensor_health.launch.py — Launch sensor health monitor node (Issue #566).""" + +import os +from ament_index_python.packages import get_package_share_directory +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument +from launch.substitutions import LaunchConfiguration +from launch_ros.actions import Node + + +def generate_launch_description() -> LaunchDescription: + pkg = get_package_share_directory("saltybot_health_monitor") + + check_hz_arg = DeclareLaunchArgument( + "check_hz", + default_value="1.0", + description="Health check publish rate (Hz)", + ) + mqtt_broker_arg = DeclareLaunchArgument( + "mqtt_broker", + default_value="localhost", + description="MQTT broker hostname", + ) + mqtt_enabled_arg = DeclareLaunchArgument( + "mqtt_enabled", + default_value="true", + description="Enable MQTT publishing to saltybot/health", + ) + + sensor_health_node = Node( + package="saltybot_health_monitor", + executable="sensor_health_node", + name="sensor_health_node", + output="screen", + parameters=[ + os.path.join(pkg, "config", "sensor_health_params.yaml"), + { + "check_hz": LaunchConfiguration("check_hz"), + "mqtt_broker": LaunchConfiguration("mqtt_broker"), + "mqtt_enabled": LaunchConfiguration("mqtt_enabled"), + }, + ], + ) + + return LaunchDescription([ + check_hz_arg, + mqtt_broker_arg, + mqtt_enabled_arg, + sensor_health_node, + ]) diff --git a/jetson/ros2_ws/src/saltybot_health_monitor/package.xml b/jetson/ros2_ws/src/saltybot_health_monitor/package.xml index b49a494..95dab9b 100644 --- a/jetson/ros2_ws/src/saltybot_health_monitor/package.xml +++ b/jetson/ros2_ws/src/saltybot_health_monitor/package.xml @@ -1,27 +1,24 @@ - saltybot_health_monitor - 0.1.0 + 0.2.0 - ROS2 system health monitor for SaltyBot. Central node that monitors heartbeats - from all critical nodes, detects when nodes go down (>5s silent), triggers - auto-restart, publishes /saltybot/system_health JSON, and alerts face display - on critical failures. + ROS2 health monitor for SaltyBot. Tracks node heartbeats and sensor topic + staleness. Publishes DiagnosticArray on /saltybot/diagnostics and MQTT on + saltybot/health. Issue #566. - sl-controls - MIT + sl-jetson + Apache-2.0 rclpy std_msgs geometry_msgs + sensor_msgs + diagnostic_msgs ament_python - ament_copyright - ament_flake8 - ament_pep257 - python3-pytest + pytest ament_python diff --git a/jetson/ros2_ws/src/saltybot_health_monitor/saltybot_health_monitor/sensor_health_node.py b/jetson/ros2_ws/src/saltybot_health_monitor/saltybot_health_monitor/sensor_health_node.py new file mode 100644 index 0000000..960132d --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_health_monitor/saltybot_health_monitor/sensor_health_node.py @@ -0,0 +1,385 @@ +#!/usr/bin/env python3 +"""sensor_health_node.py — ROS2 sensor topic health monitor (Issue #566). + +Monitors all sensor topics for staleness. Each sensor is checked against +configurable WARN and ERROR timeouts. Results are published as a ROS2 +DiagnosticArray and as an MQTT JSON summary. + +Monitored topics (configurable via sensor_health_params.yaml): + /camera/color/image_raw — RealSense colour stream + /camera/depth/image_rect_raw — RealSense depth stream + /camera/color/camera_info — RealSense camera info + /scan — LiDAR 2-D scan + /imu/data — IMU (BNO055 via JLink) + /saltybot/uwb/range — UWB ranging + /saltybot/battery — Battery telemetry (JSON string) + /saltybot/motor_daemon/status — Motor daemon status + +Published topics: + /saltybot/diagnostics (diagnostic_msgs/DiagnosticArray) — full per-sensor status + /saltybot/sensor_health (std_msgs/String) — JSON summary + +MQTT: + Topic: saltybot/health + Payload: JSON {timestamp, overall, sensors:{name: {status, age_s, hz}}} + +Parameters (config/sensor_health_params.yaml): + check_hz 1.0 Health check publish rate + mqtt_broker localhost + mqtt_port 1883 + mqtt_topic saltybot/health + mqtt_enabled true + sensors list of {topic, name, warn_s, error_s, critical} +""" + +from __future__ import annotations + +import json +import time +import threading +from dataclasses import dataclass, field +from typing import Dict, List, Optional + +import rclpy +from rclpy.node import Node +from rclpy.qos import ( + DurabilityPolicy, HistoryPolicy, QoSProfile, ReliabilityPolicy +) + +from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue +from sensor_msgs.msg import CameraInfo, Image, Imu, LaserScan +from std_msgs.msg import String + + +# ── Diagnostic level aliases ─────────────────────────────────────────────────── + +OK = DiagnosticStatus.OK # 0 +WARN = DiagnosticStatus.WARN # 1 +ERROR = DiagnosticStatus.ERROR # 2 + +_LEVEL_NAMES = {OK: "OK", WARN: "WARN", ERROR: "ERROR"} + +# ── Default sensor configuration ────────────────────────────────────────────── + +DEFAULT_SENSORS: List[dict] = [ + {"topic": "/camera/color/image_raw", "name": "camera_color", "warn_s": 1.0, "error_s": 3.0, "critical": True}, + {"topic": "/camera/depth/image_rect_raw", "name": "camera_depth", "warn_s": 1.0, "error_s": 3.0, "critical": True}, + {"topic": "/camera/color/camera_info", "name": "camera_info", "warn_s": 2.0, "error_s": 5.0, "critical": False}, + {"topic": "/scan", "name": "lidar", "warn_s": 1.0, "error_s": 3.0, "critical": True}, + {"topic": "/imu/data", "name": "imu", "warn_s": 0.5, "error_s": 2.0, "critical": True}, + {"topic": "/saltybot/uwb/range", "name": "uwb", "warn_s": 2.0, "error_s": 5.0, "critical": False}, + {"topic": "/saltybot/battery", "name": "battery", "warn_s": 3.0, "error_s": 8.0, "critical": False}, + {"topic": "/saltybot/motor_daemon/status", "name": "motor_daemon", "warn_s": 2.0, "error_s": 5.0, "critical": True}, +] + + +# ── SensorWatcher ───────────────────────────────────────────────────────────── + +class SensorWatcher: + """Tracks message receipt timestamps and rate for a single topic. + + Thread-safe: callback may fire from any executor thread. + """ + + def __init__(self, topic: str, name: str, + warn_s: float, error_s: float, critical: bool) -> None: + self.topic = topic + self.name = name + self.warn_s = warn_s + self.error_s = error_s + self.critical = critical + + self._last_rx: float = 0.0 # monotonic; 0 = never received + self._count: int = 0 + self._rate_hz: float = 0.0 + self._rate_ts: float = time.monotonic() + self._rate_cnt: int = 0 + self._lock = threading.Lock() + + # ── Callback (called from subscription) ──────────────────────────────── + + def on_message(self, _msg) -> None: + """Record receipt of any message on this topic.""" + now = time.monotonic() + with self._lock: + self._last_rx = now + self._count += 1 + self._rate_cnt += 1 + # Update rate estimate every ~2 s + elapsed = now - self._rate_ts + if elapsed >= 2.0: + self._rate_hz = self._rate_cnt / elapsed + self._rate_cnt = 0 + self._rate_ts = now + + # ── Status query ─────────────────────────────────────────────────────── + + def age_s(self, now: Optional[float] = None) -> float: + """Seconds since last message (∞ if never received).""" + if now is None: + now = time.monotonic() + with self._lock: + if self._last_rx == 0.0: + return float("inf") + return now - self._last_rx + + def rate_hz(self) -> float: + with self._lock: + return self._rate_hz + + def msg_count(self) -> int: + with self._lock: + return self._count + + def level(self, now: Optional[float] = None) -> int: + age = self.age_s(now) + if age >= self.error_s: + return ERROR + if age >= self.warn_s: + return WARN + return OK + + def diagnostic_status(self, now: Optional[float] = None) -> DiagnosticStatus: + if now is None: + now = time.monotonic() + age = self.age_s(now) + lvl = self.level(now) + hz = self.rate_hz() + cnt = self.msg_count() + + if age == float("inf"): + msg = f"ERROR: no data received on {self.topic}" + elif lvl == ERROR: + msg = f"ERROR: stale {age:.1f}s (threshold {self.error_s:.1f}s)" + elif lvl == WARN: + msg = f"WARN: stale {age:.1f}s (threshold {self.warn_s:.1f}s)" + else: + msg = f"OK ({hz:.1f} Hz)" + + age_str = "inf" if age == float("inf") else f"{age:.2f}" + status = DiagnosticStatus() + status.level = lvl + status.name = self.name + status.message = msg + status.hardware_id = self.topic + status.values = [ + KeyValue(key="topic", value=self.topic), + KeyValue(key="age_s", value=age_str), + KeyValue(key="rate_hz", value=f"{hz:.2f}"), + KeyValue(key="count", value=str(cnt)), + KeyValue(key="warn_s", value=str(self.warn_s)), + KeyValue(key="error_s", value=str(self.error_s)), + KeyValue(key="critical", value=str(self.critical)), + ] + return status + + def summary_dict(self, now: Optional[float] = None) -> dict: + if now is None: + now = time.monotonic() + age = self.age_s(now) + return { + "status": _LEVEL_NAMES[self.level(now)], + "age_s": round(age, 2) if age != float("inf") else None, + "rate_hz": round(self.rate_hz(), 2), + "count": self.msg_count(), + "critical": self.critical, + } + + +# ── SensorHealthNode ─────────────────────────────────────────────────────────── + +class SensorHealthNode(Node): + """Monitor all sensor topics; publish DiagnosticArray + MQTT JSON.""" + + def __init__(self) -> None: + super().__init__("sensor_health_node") + + # ── Parameters ───────────────────────────────────────────────────── + self.declare_parameter("check_hz", 1.0) + self.declare_parameter("mqtt_broker", "localhost") + self.declare_parameter("mqtt_port", 1883) + self.declare_parameter("mqtt_topic", "saltybot/health") + self.declare_parameter("mqtt_enabled", True) + + check_hz = self.get_parameter("check_hz").value + self._mqtt_broker = self.get_parameter("mqtt_broker").value + self._mqtt_port = int(self.get_parameter("mqtt_port").value) + self._mqtt_topic = self.get_parameter("mqtt_topic").value + mqtt_enabled = self.get_parameter("mqtt_enabled").value + + # ── Build sensor watchers ─────────────────────────────────────────── + self._watchers: Dict[str, SensorWatcher] = {} + for cfg in DEFAULT_SENSORS: + w = SensorWatcher( + topic = cfg["topic"], + name = cfg["name"], + warn_s = float(cfg.get("warn_s", 1.0)), + error_s = float(cfg.get("error_s", 3.0)), + critical = bool(cfg.get("critical", False)), + ) + self._watchers[cfg["name"]] = w + + # ── Subscriptions — one per sensor type ──────────────────────────── + # Best-effort QoS for sensor data (sensors may use BEST_EFFORT publishers) + be_qos = QoSProfile( + history=HistoryPolicy.KEEP_LAST, depth=1, + reliability=ReliabilityPolicy.BEST_EFFORT, + durability=DurabilityPolicy.VOLATILE, + ) + rel_qos = QoSProfile( + history=HistoryPolicy.KEEP_LAST, depth=5, + reliability=ReliabilityPolicy.RELIABLE, + ) + + self._subscribe(Image, "/camera/color/image_raw", "camera_color", be_qos) + self._subscribe(Image, "/camera/depth/image_rect_raw", "camera_depth", be_qos) + self._subscribe(CameraInfo, "/camera/color/camera_info", "camera_info", be_qos) + self._subscribe(LaserScan, "/scan", "lidar", be_qos) + self._subscribe(Imu, "/imu/data", "imu", be_qos) + self._subscribe(String, "/saltybot/uwb/range", "uwb", rel_qos) + self._subscribe(String, "/saltybot/battery", "battery", rel_qos) + self._subscribe(String, "/saltybot/motor_daemon/status", "motor_daemon", rel_qos) + + # ── Publishers ───────────────────────────────────────────────────── + self._pub_diag = self.create_publisher( + DiagnosticArray, "/saltybot/diagnostics", 10) + self._pub_health = self.create_publisher( + String, "/saltybot/sensor_health", 10) + + # ── MQTT ─────────────────────────────────────────────────────────── + self._mqtt_client = None + if mqtt_enabled: + self._connect_mqtt() + + # ── Timer ────────────────────────────────────────────────────────── + self.create_timer(1.0 / max(0.1, check_hz), self._publish_diagnostics) + + sensor_list = ", ".join(self._watchers.keys()) + self.get_logger().info( + f"[sensor_health] monitoring {len(self._watchers)} sensors: {sensor_list}" + ) + + # ── Subscription helper ──────────────────────────────────────────────── + + def _subscribe(self, msg_type, topic: str, name: str, qos) -> None: + if name not in self._watchers: + return + watcher = self._watchers[name] + self.create_subscription(msg_type, topic, watcher.on_message, qos) + + # ── MQTT ─────────────────────────────────────────────────────────────── + + def _connect_mqtt(self) -> None: + try: + import paho.mqtt.client as mqtt # type: ignore + self._mqtt_client = mqtt.Client(client_id="saltybot_sensor_health") + self._mqtt_client.on_connect = self._on_mqtt_connect + self._mqtt_client.connect_async(self._mqtt_broker, self._mqtt_port, 60) + self._mqtt_client.loop_start() + except ImportError: + self.get_logger().warn( + "[sensor_health] paho-mqtt not installed — MQTT publishing disabled" + ) + except Exception as e: + self.get_logger().warn(f"[sensor_health] MQTT connect failed: {e}") + + def _on_mqtt_connect(self, client, userdata, flags, rc) -> None: + if rc == 0: + self.get_logger().info( + f"[sensor_health] MQTT connected to {self._mqtt_broker}:{self._mqtt_port}" + ) + else: + self.get_logger().warn(f"[sensor_health] MQTT connect rc={rc}") + + def _publish_mqtt(self, payload: str) -> None: + if self._mqtt_client is None: + return + try: + self._mqtt_client.publish(self._mqtt_topic, payload, qos=0, retain=True) + except Exception as e: + self.get_logger().warn(f"[sensor_health] MQTT publish error: {e}") + + # ── Diagnostic publisher ─────────────────────────────────────────────── + + def _publish_diagnostics(self) -> None: + now = time.monotonic() + wall = time.time() + + # Build DiagnosticArray + diag_array = DiagnosticArray() + diag_array.header.stamp = self.get_clock().now().to_msg() + + sensor_summaries: dict = {} + worst_level = OK + + for name, watcher in self._watchers.items(): + ds = watcher.diagnostic_status(now) + diag_array.status.append(ds) + + if ds.level > worst_level: + worst_level = ds.level + + sensor_summaries[name] = watcher.summary_dict(now) + + # Overall system status + n_error = sum(1 for w in self._watchers.values() if w.level(now) == ERROR) + n_warn = sum(1 for w in self._watchers.values() if w.level(now) == WARN) + crit_err = [n for n, w in self._watchers.items() + if w.critical and w.level(now) == ERROR] + + overall = "OK" + if crit_err: + overall = "ERROR" + elif n_error > 0: + overall = "ERROR" + elif n_warn > 0: + overall = "WARN" + + # Publish DiagnosticArray + self._pub_diag.publish(diag_array) + + # JSON summary + summary = { + "timestamp": wall, + "overall": overall, + "n_ok": len(self._watchers) - n_error - n_warn, + "n_warn": n_warn, + "n_error": n_error, + "critical_err": crit_err, + "sensors": sensor_summaries, + } + payload = json.dumps(summary) + self._pub_health.publish(String(data=payload)) + self._publish_mqtt(payload) + + # Log on transitions or errors + if worst_level >= ERROR: + self.get_logger().warn( + f"[sensor_health] {overall}: {n_error} error(s), {n_warn} warn(s)" + + (f" — critical: {crit_err}" if crit_err else "") + ) + + def destroy_node(self) -> None: + if self._mqtt_client is not None: + try: + self._mqtt_client.loop_stop() + self._mqtt_client.disconnect() + except Exception: + pass + super().destroy_node() + + +def main(args=None) -> None: + rclpy.init(args=args) + node = SensorHealthNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == "__main__": + main() diff --git a/jetson/ros2_ws/src/saltybot_health_monitor/setup.py b/jetson/ros2_ws/src/saltybot_health_monitor/setup.py index 1b08bbb..5179230 100644 --- a/jetson/ros2_ws/src/saltybot_health_monitor/setup.py +++ b/jetson/ros2_ws/src/saltybot_health_monitor/setup.py @@ -4,27 +4,34 @@ package_name = "saltybot_health_monitor" setup( name=package_name, - version="0.1.0", + version="0.2.0", packages=[package_name], data_files=[ ("share/ament_index/resource_index/packages", [f"resource/{package_name}"]), (f"share/{package_name}", ["package.xml"]), - (f"share/{package_name}/launch", ["launch/health_monitor.launch.py"]), - (f"share/{package_name}/config", ["config/health_config.yaml"]), + (f"share/{package_name}/launch", [ + "launch/health_monitor.launch.py", + "launch/sensor_health.launch.py", + ]), + (f"share/{package_name}/config", [ + "config/health_config.yaml", + "config/sensor_health_params.yaml", + ]), ], - install_requires=["setuptools", "pyyaml"], + install_requires=["setuptools", "pyyaml", "paho-mqtt"], zip_safe=True, - maintainer="sl-controls", - maintainer_email="sl-controls@saltylab.local", + maintainer="sl-jetson", + maintainer_email="sl-jetson@saltylab.local", description=( - "System health monitor: tracks node heartbeats, detects down nodes, " - "triggers auto-restart, publishes system health status" + "System health monitor: node heartbeats + sensor topic staleness " + "detection with DiagnosticArray and MQTT (Issue #566)" ), - license="MIT", + license="Apache-2.0", tests_require=["pytest"], entry_points={ "console_scripts": [ "health_monitor_node = saltybot_health_monitor.health_monitor_node:main", + "sensor_health_node = saltybot_health_monitor.sensor_health_node:main", ], }, ) diff --git a/jetson/ros2_ws/src/saltybot_health_monitor/test/test_sensor_health.py b/jetson/ros2_ws/src/saltybot_health_monitor/test/test_sensor_health.py new file mode 100644 index 0000000..0ea909a --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_health_monitor/test/test_sensor_health.py @@ -0,0 +1,344 @@ +"""test_sensor_health.py — Unit tests for sensor_health_node (Issue #566). + +Runs entirely offline: no ROS2 runtime, no hardware, no MQTT broker required. +""" + +import json +import sys +import time +import types +import unittest +from unittest.mock import MagicMock, patch + +# ── Stub ROS2 and sensor_msgs so tests run offline ──────────────────────────── + +def _install_stubs(): + # rclpy + rclpy = types.ModuleType("rclpy") + rclpy.init = lambda **_: None + rclpy.spin = lambda _: None + rclpy.shutdown = lambda: None + + node_mod = types.ModuleType("rclpy.node") + class _Node: + def __init__(self, *a, **kw): pass + def declare_parameter(self, *a, **kw): pass + def get_parameter(self, name): + defaults = { + "check_hz": 1.0, "mqtt_broker": "localhost", + "mqtt_port": 1883, "mqtt_topic": "saltybot/health", + "mqtt_enabled": False, + } + m = MagicMock(); m.value = defaults.get(name, False) + return m + def get_logger(self): return MagicMock() + def get_clock(self): return MagicMock() + def create_subscription(self, *a, **kw): pass + def create_publisher(self, *a, **kw): return MagicMock() + def create_timer(self, *a, **kw): pass + def destroy_node(self): pass + node_mod.Node = _Node + rclpy.node = node_mod + + qos_mod = types.ModuleType("rclpy.qos") + for attr in ("QoSProfile", "HistoryPolicy", "ReliabilityPolicy", "DurabilityPolicy"): + setattr(qos_mod, attr, MagicMock()) + rclpy.qos = qos_mod + + sys.modules.setdefault("rclpy", rclpy) + sys.modules.setdefault("rclpy.node", rclpy.node) + sys.modules.setdefault("rclpy.qos", rclpy.qos) + + # diagnostic_msgs + diag = types.ModuleType("diagnostic_msgs") + diag_msg = types.ModuleType("diagnostic_msgs.msg") + + class _DiagStatus: + OK = 0 + WARN = 1 + ERROR = 2 + def __init__(self): + self.level = 0; self.name = ""; self.message = "" + self.hardware_id = ""; self.values = [] + + class _DiagArray: + def __init__(self): + self.header = MagicMock(); self.status = [] + + class _KeyValue: + def __init__(self, key="", value=""): + self.key = key; self.value = value + + diag_msg.DiagnosticStatus = _DiagStatus + diag_msg.DiagnosticArray = _DiagArray + diag_msg.KeyValue = _KeyValue + diag.msg = diag_msg + sys.modules.setdefault("diagnostic_msgs", diag) + sys.modules.setdefault("diagnostic_msgs.msg", diag_msg) + + # sensor_msgs + sens = types.ModuleType("sensor_msgs") + sens_msg = types.ModuleType("sensor_msgs.msg") + for cls_name in ("Image", "CameraInfo", "Imu", "LaserScan"): + setattr(sens_msg, cls_name, MagicMock) + sens.msg = sens_msg + sys.modules.setdefault("sensor_msgs", sens) + sys.modules.setdefault("sensor_msgs.msg", sens_msg) + + # std_msgs + std = types.ModuleType("std_msgs") + std_msg = types.ModuleType("std_msgs.msg") + class _String: + def __init__(self, data=""): self.data = data + std_msg.String = _String + std.msg = std_msg + sys.modules.setdefault("std_msgs", std) + sys.modules.setdefault("std_msgs.msg", std_msg) + + +_install_stubs() + +from saltybot_health_monitor.sensor_health_node import ( # noqa: E402 + SensorWatcher, OK, WARN, ERROR, _LEVEL_NAMES, +) + + +# ── SensorWatcher: initial state ────────────────────────────────────────────── + +class TestSensorWatcherInitial(unittest.TestCase): + + def setUp(self): + self.w = SensorWatcher("/scan", "lidar", warn_s=1.0, error_s=3.0, critical=True) + + def test_initial_age_is_inf(self): + self.assertEqual(self.w.age_s(), float("inf")) + + def test_initial_level_is_error(self): + # Never received → age=inf ≥ error_s → ERROR + self.assertEqual(self.w.level(), ERROR) + + def test_initial_count_zero(self): + self.assertEqual(self.w.msg_count(), 0) + + def test_initial_rate_zero(self): + self.assertAlmostEqual(self.w.rate_hz(), 0.0) + + def test_name_stored(self): + self.assertEqual(self.w.name, "lidar") + + def test_topic_stored(self): + self.assertEqual(self.w.topic, "/scan") + + def test_critical_stored(self): + self.assertTrue(self.w.critical) + + +# ── SensorWatcher: after receiving messages ─────────────────────────────────── + +class TestSensorWatcherAfterMessage(unittest.TestCase): + + def setUp(self): + self.w = SensorWatcher("/imu/data", "imu", warn_s=0.5, error_s=2.0, critical=True) + self.w.on_message(None) # simulate message receipt + + def test_age_is_small_after_message(self): + self.assertLess(self.w.age_s(), 0.1) + + def test_level_ok_immediately_after(self): + self.assertEqual(self.w.level(), OK) + + def test_count_increments(self): + self.w.on_message(None) + self.assertEqual(self.w.msg_count(), 2) + + def test_multiple_messages(self): + for _ in range(10): + self.w.on_message(None) + self.assertEqual(self.w.msg_count(), 11) + + +# ── SensorWatcher: staleness thresholds ────────────────────────────────────── + +class TestSensorWatcherStaleness(unittest.TestCase): + + def _make_stale(self, seconds_ago: float) -> SensorWatcher: + """Return a watcher whose last_rx was `seconds_ago` seconds in the past.""" + w = SensorWatcher("/scan", "lidar", warn_s=1.0, error_s=3.0, critical=False) + w.on_message(None) + # Backdate the last_rx directly + with w._lock: + w._last_rx -= seconds_ago + return w + + def test_ok_when_fresh(self): + w = self._make_stale(0.5) + self.assertEqual(w.level(), OK) + + def test_warn_at_warn_threshold(self): + w = self._make_stale(1.1) + self.assertEqual(w.level(), WARN) + + def test_error_at_error_threshold(self): + w = self._make_stale(3.1) + self.assertEqual(w.level(), ERROR) + + def test_age_matches_backdated_time(self): + w = self._make_stale(2.0) + self.assertAlmostEqual(w.age_s(), 2.0, delta=0.1) + + def test_warn_level_between_thresholds(self): + w = self._make_stale(2.0) # 1.0 < 2.0 < 3.0 + self.assertEqual(w.level(), WARN) + + +# ── SensorWatcher: diagnostic_status output ────────────────────────────────── + +class TestSensorWatcherDiagStatus(unittest.TestCase): + + def test_never_received_is_error(self): + w = SensorWatcher("/camera/color/image_raw", "camera_color", + warn_s=1.0, error_s=3.0, critical=True) + ds = w.diagnostic_status() + self.assertEqual(ds.level, ERROR) + self.assertIn("no data", ds.message) + + def test_ok_status_message(self): + w = SensorWatcher("/scan", "lidar", warn_s=1.0, error_s=3.0, critical=False) + w.on_message(None) + ds = w.diagnostic_status() + self.assertEqual(ds.level, OK) + self.assertIn("OK", ds.message) + + def test_warn_status_message(self): + w = SensorWatcher("/scan", "lidar", warn_s=1.0, error_s=3.0, critical=False) + w.on_message(None) + with w._lock: + w._last_rx -= 1.5 + ds = w.diagnostic_status() + self.assertEqual(ds.level, WARN) + self.assertIn("WARN", ds.message) + + def test_hardware_id_is_topic(self): + w = SensorWatcher("/scan", "lidar", warn_s=1.0, error_s=3.0, critical=False) + w.on_message(None) + ds = w.diagnostic_status() + self.assertEqual(ds.hardware_id, "/scan") + + def test_kv_keys_present(self): + w = SensorWatcher("/scan", "lidar", warn_s=1.0, error_s=3.0, critical=False) + w.on_message(None) + ds = w.diagnostic_status() + kv_keys = {kv.key for kv in ds.values} + for expected in ("topic", "age_s", "rate_hz", "count", "warn_s", "error_s"): + self.assertIn(expected, kv_keys) + + def test_age_inf_displayed_as_inf(self): + w = SensorWatcher("/scan", "lidar", warn_s=1.0, error_s=3.0, critical=False) + ds = w.diagnostic_status() + kv = {kv.key: kv.value for kv in ds.values} + self.assertEqual(kv["age_s"], "inf") + + +# ── SensorWatcher: summary_dict output ─────────────────────────────────────── + +class TestSensorWatcherSummaryDict(unittest.TestCase): + + def test_never_received_age_is_none(self): + w = SensorWatcher("/scan", "lidar", warn_s=1.0, error_s=3.0, critical=False) + d = w.summary_dict() + self.assertIsNone(d["age_s"]) + self.assertEqual(d["status"], "ERROR") + + def test_ok_status_string(self): + w = SensorWatcher("/scan", "lidar", warn_s=1.0, error_s=3.0, critical=False) + w.on_message(None) + d = w.summary_dict() + self.assertEqual(d["status"], "OK") + + def test_warn_status_string(self): + w = SensorWatcher("/scan", "lidar", warn_s=1.0, error_s=3.0, critical=False) + w.on_message(None) + with w._lock: + w._last_rx -= 1.5 + d = w.summary_dict() + self.assertEqual(d["status"], "WARN") + + def test_error_status_string(self): + w = SensorWatcher("/scan", "lidar", warn_s=1.0, error_s=3.0, critical=False) + w.on_message(None) + with w._lock: + w._last_rx -= 5.0 + d = w.summary_dict() + self.assertEqual(d["status"], "ERROR") + + def test_age_rounded(self): + w = SensorWatcher("/scan", "lidar", warn_s=1.0, error_s=3.0, critical=False) + w.on_message(None) + d = w.summary_dict() + self.assertIsInstance(d["age_s"], float) + + def test_critical_flag(self): + w = SensorWatcher("/scan", "lidar", warn_s=1.0, error_s=3.0, critical=True) + self.assertTrue(w.summary_dict()["critical"]) + + def test_non_critical_flag(self): + w = SensorWatcher("/scan", "lidar", warn_s=1.0, error_s=3.0, critical=False) + self.assertFalse(w.summary_dict()["critical"]) + + def test_count_in_summary(self): + w = SensorWatcher("/scan", "lidar", warn_s=1.0, error_s=3.0, critical=False) + for _ in range(5): + w.on_message(None) + self.assertEqual(w.summary_dict()["count"], 5) + + +# ── Level name mapping ───────────────────────────────────────────────────────── + +class TestLevelNames(unittest.TestCase): + + def test_ok_name(self): + self.assertEqual(_LEVEL_NAMES[OK], "OK") + + def test_warn_name(self): + self.assertEqual(_LEVEL_NAMES[WARN], "WARN") + + def test_error_name(self): + self.assertEqual(_LEVEL_NAMES[ERROR], "ERROR") + + +# ── Thread safety ───────────────────────────────────────────────────────────── + +class TestSensorWatcherThreadSafety(unittest.TestCase): + + def test_concurrent_messages(self): + import threading + w = SensorWatcher("/scan", "lidar", warn_s=1.0, error_s=3.0, critical=False) + N = 500 + threads = [threading.Thread(target=w.on_message, args=(None,)) for _ in range(N)] + for t in threads: + t.start() + for t in threads: + t.join() + self.assertEqual(w.msg_count(), N) + + def test_concurrent_reads(self): + import threading + w = SensorWatcher("/scan", "lidar", warn_s=1.0, error_s=3.0, critical=False) + w.on_message(None) + errors = [] + def read_loop(): + for _ in range(100): + try: + w.level() + w.age_s() + w.rate_hz() + except Exception as e: + errors.append(e) + threads = [threading.Thread(target=read_loop) for _ in range(5)] + for t in threads: t.start() + for t in threads: t.join() + self.assertEqual(errors, []) + + +if __name__ == "__main__": + unittest.main()