Merge remote-tracking branch 'origin/sl-firmware/issue-445-diagnostics'

# Conflicts:
#	jetson/ros2_ws/src/saltybot_diagnostics/README.md
#	jetson/ros2_ws/src/saltybot_diagnostics/config/diagnostic_checks.yaml
#	jetson/ros2_ws/src/saltybot_diagnostics/launch/diagnostics.launch.py
#	jetson/ros2_ws/src/saltybot_diagnostics/package.xml
#	jetson/ros2_ws/src/saltybot_diagnostics/saltybot_diagnostics/diagnostics_node.py
#	jetson/ros2_ws/src/saltybot_diagnostics/setup.py
#	jetson/ros2_ws/src/saltybot_diagnostics/test/test_diagnostics.py
This commit is contained in:
sl-jetson 2026-03-05 09:10:24 -05:00
commit b9a6eaa3fa
7 changed files with 202 additions and 875 deletions

View File

@ -1,198 +1,35 @@
# SaltyBot Diagnostic Self-Test System # SaltyBot Diagnostic Self-Test System
Comprehensive hardware diagnostics and health monitoring for SaltyBot. Performs startup checks on critical hardware, continuous runtime monitoring, and publishes detailed diagnostic data. Comprehensive hardware diagnostics and health monitoring for SaltyBot.
## Features ## Features
### Startup Hardware Checks ### Startup Checks
Validates hardware availability and connectivity at boot: - RPLIDAR, RealSense, VESC, Jabra mic, STM32, servos
- **RPLIDAR**: Serial port detection, rotation verification - WiFi, GPS, disk space, RAM
- **RealSense D435i**: USB enumeration, stream availability - Boot result TTS + face animation
- **VESC Motor Controller**: UART connection, firmware status - JSON logging
- **Jabra Microphone**: USB audio device detection
- **STM32 Bridge**: Serial port verification, watchdog status
- **Servo Controller**: I2C bus communication
- **WiFi**: Network interface status
- **GPS Module**: Serial port and fix detection
- **Disk Space**: Storage availability checking
- **System RAM**: Memory availability
### Runtime Monitoring ### Runtime Monitoring
Continuous health checks during operation: - Temperature (Orin GPU >80C, VESC >60C)
- **Sensor FPS**: RealSense and RPLIDAR frame rates - Network latency
- **Motor Stall**: Encoder latency detection - Sensor FPS drops
- **Temperature**: Orin GPU (>80°C warn, >85°C error), VESC (>60°C warn, >70°C error) - System resources
- **Network Latency**: Ping time monitoring
- **System Resources**: CPU, RAM usage trends
### Notifications & Feedback
- **TTS Announcements**: Boot result via text-to-speech
- **Face Animations**: Boot success/error display
- **Diagnostic Publishing**: `/saltybot/diagnostics` (DiagnosticArray)
- **JSON Logging**: Detailed logs to `/home/seb/saltybot-data/diagnostics/`
## Topics
### Published
- `/saltybot/diagnostics` (diagnostic_msgs/DiagnosticArray): System health status
- `/saltybot/tts_say` (std_msgs/String): Boot result announcement
- `/saltybot/face/boot_animation` (std_msgs/String): Boot animation trigger
### Subscribed
- Implicitly monitors system topics for FPS/latency data
## Configuration
Edit `config/diagnostic_checks.yaml`:
```yaml
startup_checks:
enabled: true
checks: [rplidar, realsense, vesc, jabra_microphone, ...]
runtime_monitoring:
enabled: true
frequency_hz: 1
temperatures:
jetson_gpu: {warn_c: 80, error_c: 85, critical_c: 90}
vesc_motor: {warn_c: 60, error_c: 70, critical_c: 80}
logging:
directory: /home/seb/saltybot-data/diagnostics
retention_days: 30
```
## Launch ## Launch
```bash ```bash
# Default launch with startup checks + runtime monitoring
ros2 launch saltybot_diagnostics diagnostics.launch.py ros2 launch saltybot_diagnostics diagnostics.launch.py
# Startup checks only
ros2 launch saltybot_diagnostics diagnostics.launch.py \
enable_runtime_monitoring:=false
# Custom config
ros2 launch saltybot_diagnostics diagnostics.launch.py \
config_file:=/path/to/custom_checks.yaml
``` ```
## Diagnostic Array Format ## Topics
Published to `/saltybot/diagnostics`: - `/saltybot/diagnostics` (DiagnosticArray)
- `/saltybot/tts_say` (String) - Boot announcements
- `/saltybot/face/boot_animation` (String)
```python ## Logs
diagnostic_msgs/DiagnosticArray:
header:
stamp: <timestamp>
status:
- name: "saltybot/rplidar"
level: 0 # OK=0, WARN=1, ERROR=2, STALE=3
message: "RPLIDAR detected on /dev/ttyUSB0"
values:
- key: "port"
value: "/dev/ttyUSB0"
- name: "saltybot/realsense" Diagnostic logs: `/home/seb/saltybot-data/diagnostics/`
level: 2 # ERROR
message: "RealSense not found on expected USB bus"
values: []
- name: "saltybot/gpu_temp" JSON format with hardware status, temperatures, and resource usage.
level: 1 # WARN
message: "Runtime check"
values:
- key: "temperature_c"
value: "82.5"
- key: "threshold_warn"
value: "80"
```
## JSON Diagnostics Log Format
Files saved to `/home/seb/saltybot-data/diagnostics/diagnostics_YYYYMMDD_HHMMSS.json`:
```json
{
"timestamp": "2025-03-05T10:00:00.123456",
"check_type": "startup_checks",
"hardware_checks": {
"rplidar": {
"status": "OK",
"message": "RPLIDAR detected on /dev/ttyUSB0",
"details": {"port": "/dev/ttyUSB0"}
},
"realsense": {
"status": "ERROR",
"message": "RealSense not found",
"details": {}
}
},
"runtime_metrics": {
"gpu_temp": {
"status": "OK",
"temperature_c": 65.0,
"threshold_warn": 80
},
"network_latency": {
"status": "WARN",
"latency_ms": 150
}
}
}
```
## TTS Announcements
Boot result messages published to `/saltybot/tts_say`:
- Success: "Boot complete. All systems online."
- With errors: "Boot complete with errors. RPLIDAR, RealSense offline."
## Status Levels
- **OK** (0): System healthy, no action needed
- **WARN** (1): Minor issues, monitor closely
- **ERROR** (2): Critical failure, may affect operation
- **STALE** (3): No data, check unavailable
## Logs and Data
Diagnostic logs stored in `/home/seb/saltybot-data/diagnostics/`:
- Auto-rotated every 100MB or 30 days
- JSON format for easy parsing
- Full boot record + runtime metrics
## Integration with Full Stack
Add to `full_stack.launch.py` at t=0s:
```python
IncludeLaunchDescription(
PythonLaunchDescriptionSource([...diagnostics.launch.py]),
launch_arguments={'enable_startup_check': 'true'}.items(),
)
```
## Debugging
Check current diagnostics:
```bash
ros2 topic echo /saltybot/diagnostics
```
View latest diagnostic log:
```bash
tail -f /home/seb/saltybot-data/diagnostics/diagnostics_*.json | jq
```
Simulate diagnostic errors (for testing):
```bash
# Monitor what would be logged
ros2 launch saltybot_diagnostics diagnostics.launch.py \
enable_startup_check:=true enable_runtime_monitoring:=true
```
## Hardware Requirements
- Jetson Orin (for temperature monitoring)
- Linux with psutil (for system resources)
- Standard ROS2 diagnostic_msgs package

View File

@ -1,9 +1,6 @@
# Diagnostic System Configuration
startup_checks: startup_checks:
# Hardware checks performed at boot
enabled: true enabled: true
timeout_s: 30 # Maximum time allowed for startup checks timeout_s: 30
checks: checks:
- rplidar - rplidar
- realsense - realsense
@ -16,68 +13,16 @@ startup_checks:
- disk_space - disk_space
- system_ram - system_ram
# Serial device mappings
serial_devices:
rplidar: /dev/ttyUSB0
vesc: [/dev/ttyUSB1, /dev/ttyUSB2, /dev/ttyACM0]
stm32_bridge: /dev/ttyUSB0
gps: [/dev/ttyUSB3, /dev/ttyUSB4, /dev/ttyACM1]
# Device thresholds
disk_space:
warn_percent: 80
error_percent: 90
critical_percent: 95
system_ram:
warn_percent: 80
error_percent: 90
critical_percent: 95
runtime_monitoring: runtime_monitoring:
# Continuous runtime health checks
enabled: true enabled: true
frequency_hz: 1 # Check frequency frequency_hz: 1
# Temperature thresholds
temperatures: temperatures:
jetson_gpu: jetson_gpu: {warn_c: 80, error_c: 85}
warn_c: 80 vesc_motor: {warn_c: 60, error_c: 70}
error_c: 85
critical_c: 90
vesc_motor:
warn_c: 60
error_c: 70
critical_c: 80
# Network monitoring
network: network:
ping_target: 8.8.8.8 ping_target: 8.8.8.8
latency_warn_ms: 100 latency_warn_ms: 100
latency_error_ms: 200
# Sensor monitoring
sensors:
realsense_min_fps: 15
rplidar_min_fps: 5
motor_stall_timeout_s: 2
# TTS notifications
notifications:
tts_boot_success: "Boot complete. All systems online."
tts_boot_error: "Boot complete with errors. Check diagnostics."
tts_on_critical: "System critical. {error_components} offline."
# Face animations
animations:
boot_success: boot_success
boot_error: boot_error
critical_alert: critical_alert
# Logging
logging: logging:
directory: /home/seb/saltybot-data/diagnostics directory: /home/seb/saltybot-data/diagnostics
enable_json_logs: true
enable_csv_logs: false
retention_days: 30 retention_days: 30
max_file_size_mb: 100

View File

@ -1,5 +1,4 @@
"""Launch diagnostic self-test node.""" """Launch diagnostic self-test node."""
import os import os
from ament_index_python.packages import get_package_share_directory from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription from launch import LaunchDescription
@ -7,59 +6,39 @@ from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node from launch_ros.actions import Node
def generate_launch_description(): def generate_launch_description():
"""Generate launch description for diagnostics."""
package_dir = get_package_share_directory("saltybot_diagnostics") package_dir = get_package_share_directory("saltybot_diagnostics")
config_dir = os.path.join(package_dir, "config") config_dir = os.path.join(package_dir, "config")
# Launch arguments return LaunchDescription([
config_file_arg = DeclareLaunchArgument( DeclareLaunchArgument(
"config_file", "config_file",
default_value=os.path.join(config_dir, "diagnostic_checks.yaml"), default_value=os.path.join(config_dir, "diagnostic_checks.yaml"),
description="Path to diagnostic checks configuration YAML file", description="Diagnostic checks configuration",
) ),
DeclareLaunchArgument(
enable_startup_arg = DeclareLaunchArgument( "enable_startup_check", default_value="true",
"enable_startup_check", description="Enable startup checks",
default_value="true", ),
description="Enable startup hardware checks", DeclareLaunchArgument(
) "enable_runtime_monitoring", default_value="true",
description="Enable runtime monitoring",
enable_runtime_arg = DeclareLaunchArgument( ),
"enable_runtime_monitoring", DeclareLaunchArgument(
default_value="true", "log_directory", default_value="/home/seb/saltybot-data/diagnostics",
description="Enable continuous runtime monitoring", description="Log directory",
) ),
Node(
log_dir_arg = DeclareLaunchArgument( package="saltybot_diagnostics",
"log_directory", executable="diagnostics_node",
default_value="/home/seb/saltybot-data/diagnostics", name="diagnostics",
description="Directory for diagnostic logs", output="screen",
) parameters=[{
# Diagnostics node
diagnostics_node = Node(
package="saltybot_diagnostics",
executable="diagnostics_node",
name="diagnostics",
output="screen",
parameters=[
{
"config_file": LaunchConfiguration("config_file"), "config_file": LaunchConfiguration("config_file"),
"enable_startup_check": LaunchConfiguration("enable_startup_check"), "enable_startup_check": LaunchConfiguration("enable_startup_check"),
"enable_runtime_monitoring": LaunchConfiguration("enable_runtime_monitoring"), "enable_runtime_monitoring": LaunchConfiguration("enable_runtime_monitoring"),
"log_directory": LaunchConfiguration("log_directory"), "log_directory": LaunchConfiguration("log_directory"),
"monitoring_frequency": 1.0, # Hz "monitoring_frequency": 1.0,
} }],
], ),
)
return LaunchDescription([
config_file_arg,
enable_startup_arg,
enable_runtime_arg,
log_dir_arg,
diagnostics_node,
]) ])

View File

@ -7,8 +7,6 @@
Comprehensive diagnostic self-test system for SaltyBot. Performs startup hardware Comprehensive diagnostic self-test system for SaltyBot. Performs startup hardware
checks (RPLIDAR, RealSense, VESC, Jabra, servos, WiFi, GPS, disk, RAM) and checks (RPLIDAR, RealSense, VESC, Jabra, servos, WiFi, GPS, disk, RAM) and
continuous monitoring (sensor FPS, motor stall, temps, network latency). continuous monitoring (sensor FPS, motor stall, temps, network latency).
Publishes /saltybot/diagnostics (DiagnosticArray), triggers TTS boot result,
face animation, and logs to /home/seb/saltybot-data/diagnostics/.
</description> </description>
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer> <maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
<license>MIT</license> <license>MIT</license>
@ -20,10 +18,8 @@
<depend>sensor_msgs</depend> <depend>sensor_msgs</depend>
<buildtool_depend>ament_python</buildtool_depend> <buildtool_depend>ament_python</buildtool_depend>
<test_depend>ament_copyright</test_depend> <test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend> <test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend> <test_depend>python3-pytest</test_depend>
<export> <export>

View File

@ -1,46 +1,17 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
"""Comprehensive diagnostic self-test for SaltyBot. """Diagnostic self-test system for SaltyBot.
Performs startup hardware checks and continuous runtime monitoring. Performs startup hardware checks and continuous runtime monitoring.
Publishes /saltybot/diagnostics (DiagnosticArray), triggers TTS boot result,
Startup checks: face animation, and logs to /home/seb/saltybot-data/diagnostics/.
- RPLIDAR (serial connection, rotation)
- RealSense D435i (USB enumeration, streams)
- VESC motor controller (UART connection, firmware)
- Jabra microphone (USB audio device)
- STM32 bridge (serial port, watchdog)
- Servo controller (PWM channels)
- WiFi (network interface, connectivity)
- GPS module (serial port, fix)
- Disk space (/home/seb/saltybot-data)
- System RAM (available memory)
Runtime monitoring:
- Sensor FPS drops (RealSense, RPLIDAR target)
- Motor stall detection (wheel encoder latency)
- Temperature thresholds (Orin GPU >80C, VESC >60C)
- Network latency (ping time)
Published topics:
/saltybot/diagnostics (diagnostic_msgs/DiagnosticArray) - System diagnostics
Services triggered:
/saltybot/tts_say - TTS boot result announcement
/saltybot/face/boot_animation - Face boot animation
Logging:
/home/seb/saltybot-data/diagnostics/ - JSON diagnostic logs
""" """
import json import json
import time import time
import os
import subprocess import subprocess
from pathlib import Path from pathlib import Path
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass, asdict
from datetime import datetime from datetime import datetime
from enum import Enum from typing import Dict
import threading import threading
import yaml import yaml
@ -48,41 +19,19 @@ import psutil
import rclpy import rclpy
from rclpy.node import Node from rclpy.node import Node
from rclpy.timer import Timer from rclpy.timer import Timer
from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
from std_msgs.msg import String from std_msgs.msg import String
class DiagnosticLevel(Enum):
"""Diagnostic severity levels matching ROS2 DiagnosticStatus."""
OK = 0
WARN = 1
ERROR = 2
STALE = 3
@dataclass
class HardwareCheck:
"""Result of a hardware check."""
name: str
status: str # "OK", "WARN", "ERROR", "UNKNOWN"
message: str
details: Dict = None
def __post_init__(self):
if self.details is None:
self.details = {}
class DiagnosticsNode(Node): class DiagnosticsNode(Node):
"""ROS2 node for system diagnostics and self-test.""" """ROS2 node for system diagnostics and self-test."""
def __init__(self): def __init__(self):
super().__init__("diagnostics") super().__init__("diagnostics")
# Parameters
self.declare_parameter("enable_startup_check", True) self.declare_parameter("enable_startup_check", True)
self.declare_parameter("enable_runtime_monitoring", True) self.declare_parameter("enable_runtime_monitoring", True)
self.declare_parameter("monitoring_frequency", 1.0) # Hz self.declare_parameter("monitoring_frequency", 1.0)
self.declare_parameter("log_directory", "/home/seb/saltybot-data/diagnostics") self.declare_parameter("log_directory", "/home/seb/saltybot-data/diagnostics")
self.declare_parameter("config_file", "diagnostic_checks.yaml") self.declare_parameter("config_file", "diagnostic_checks.yaml")
@ -92,38 +41,30 @@ class DiagnosticsNode(Node):
self.log_dir = Path(self.get_parameter("log_directory").value) self.log_dir = Path(self.get_parameter("log_directory").value)
config_file = self.get_parameter("config_file").value config_file = self.get_parameter("config_file").value
# Create log directory
self.log_dir.mkdir(parents=True, exist_ok=True) self.log_dir.mkdir(parents=True, exist_ok=True)
# Load configuration
self.config = self._load_config(config_file) self.config = self._load_config(config_file)
# State self.hardware_checks = {}
self.hardware_checks: Dict[str, HardwareCheck] = {} self.runtime_metrics = {}
self.runtime_metrics: Dict[str, dict] = {}
self.startup_complete = False self.startup_complete = False
self.startup_time = time.time() self.startup_time = time.time()
self.last_sensor_timestamps: Dict[str, float] = {}
# Publishers
self.pub_diagnostics = self.create_publisher(DiagnosticArray, "/saltybot/diagnostics", 1) self.pub_diagnostics = self.create_publisher(DiagnosticArray, "/saltybot/diagnostics", 1)
self.pub_tts = self.create_publisher(String, "/saltybot/tts_say", 1) self.pub_tts = self.create_publisher(String, "/saltybot/tts_say", 1)
self.pub_face = self.create_publisher(String, "/saltybot/face/boot_animation", 1) self.pub_face = self.create_publisher(String, "/saltybot/face/boot_animation", 1)
# Run startup checks in background
if self.enable_startup_check: if self.enable_startup_check:
check_thread = threading.Thread(target=self._run_startup_checks) check_thread = threading.Thread(target=self._run_startup_checks)
check_thread.daemon = True check_thread.daemon = True
check_thread.start() check_thread.start()
# Runtime monitoring timer
if self.enable_runtime_monitoring: if self.enable_runtime_monitoring:
period = 1.0 / self.monitoring_freq period = 1.0 / self.monitoring_freq
self.timer: Timer = self.create_timer(period, self._runtime_check) self.timer = self.create_timer(period, self._runtime_check)
self.get_logger().info( self.get_logger().info(
f"Diagnostics initialized. Startup checks: {self.enable_startup_check}, " f"Diagnostics initialized. Startup: {self.enable_startup_check}, "
f"Runtime monitoring: {self.enable_runtime_monitoring} @ {self.monitoring_freq}Hz" f"Runtime: {self.enable_runtime_monitoring}@{self.monitoring_freq}Hz"
) )
def _load_config(self, config_file: str) -> dict: def _load_config(self, config_file: str) -> dict:
@ -132,501 +73,206 @@ class DiagnosticsNode(Node):
if not Path(config_file).exists(): if not Path(config_file).exists():
share_dir = Path(__file__).parent.parent / "config" share_dir = Path(__file__).parent.parent / "config"
config_file = str(share_dir / config_file) config_file = str(share_dir / config_file)
with open(config_file, "r") as f: with open(config_file, "r") as f:
return yaml.safe_load(f) or {} return yaml.safe_load(f) or {}
except Exception as e: except Exception as e:
self.get_logger().warn(f"Failed to load config: {e}. Using defaults.") self.get_logger().warn(f"Failed to load config: {e}")
return {} return {}
def _run_startup_checks(self) -> None: def _run_startup_checks(self):
"""Run all startup hardware checks in background.""" """Run startup hardware checks."""
try: try:
self.get_logger().info("Starting hardware diagnostic checks...") self.get_logger().info("Starting hardware diagnostic checks...")
# Perform checks # Run all checks
self._check_rplidar() self._check_rplidar()
self._check_realsense() self._check_realsense()
self._check_vesc() self._check_vesc()
self._check_jabra() self._check_jabra()
self._check_stm32_bridge() self._check_stm32()
self._check_servos() self._check_servos()
self._check_wifi() self._check_wifi()
self._check_gps() self._check_gps()
self._check_disk_space() self._check_disk()
self._check_ram() self._check_ram()
# Generate summary
self._summarize_startup_checks()
# Announce boot result via TTS
self._announce_boot_result() self._announce_boot_result()
# Trigger face boot animation
self._trigger_face_animation() self._trigger_face_animation()
self.startup_complete = True self.startup_complete = True
self.get_logger().info("Startup checks complete") self._log_diagnostics("startup")
# Log results
self._log_diagnostics("startup_checks")
except Exception as e: except Exception as e:
self.get_logger().error(f"Startup checks failed: {e}") self.get_logger().error(f"Startup checks failed: {e}")
def _check_rplidar(self) -> None: def _check_rplidar(self):
"""Check RPLIDAR connection and operation.""" if Path("/dev/ttyUSB0").exists():
check = HardwareCheck("RPLIDAR", "UNKNOWN", "No check performed") self.hardware_checks["rplidar"] = ("OK", "RPLIDAR detected", {"port": "/dev/ttyUSB0"})
try:
# Check if /dev/ttyUSB0 exists (typical RPLIDAR port)
if Path("/dev/ttyUSB0").exists():
# Try to get LIDAR data via topic subscription (would be done by subscriber)
check.status = "OK"
check.message = "RPLIDAR detected on /dev/ttyUSB0"
check.details = {"port": "/dev/ttyUSB0"}
else:
check.status = "ERROR"
check.message = "RPLIDAR not found on /dev/ttyUSB0"
check.details = {"expected_port": "/dev/ttyUSB0"}
except Exception as e:
check.status = "ERROR"
check.message = f"RPLIDAR check failed: {e}"
self.hardware_checks["rplidar"] = check
def _check_realsense(self) -> None:
"""Check RealSense D435i camera."""
check = HardwareCheck("RealSense D435i", "UNKNOWN", "No check performed")
try:
# Check for RealSense USB device
result = subprocess.run(
["lsusb"], capture_output=True, text=True, timeout=5
)
if "RealSense" in result.stdout or "Intel" in result.stdout:
check.status = "OK"
check.message = "RealSense D435i detected via USB"
check.details = {"device": "Intel RealSense"}
else:
check.status = "WARN"
check.message = "RealSense D435i not detected via lsusb"
except subprocess.TimeoutExpired:
check.status = "WARN"
check.message = "lsusb check timed out"
except Exception as e:
check.status = "ERROR"
check.message = f"RealSense check failed: {e}"
self.hardware_checks["realsense"] = check
def _check_vesc(self) -> None:
"""Check VESC motor controller."""
check = HardwareCheck("VESC Motor Controller", "UNKNOWN", "No check performed")
try:
# Check for VESC serial port (typically /dev/ttyUSB1)
vesc_ports = ["/dev/ttyUSB1", "/dev/ttyUSB2", "/dev/ttyACM0"]
found_port = None
for port in vesc_ports:
if Path(port).exists():
found_port = port
break
if found_port:
check.status = "OK"
check.message = f"VESC detected on {found_port}"
check.details = {"port": found_port}
else:
check.status = "ERROR"
check.message = "VESC not found on expected ports"
check.details = {"checked_ports": vesc_ports}
except Exception as e:
check.status = "ERROR"
check.message = f"VESC check failed: {e}"
self.hardware_checks["vesc"] = check
def _check_jabra(self) -> None:
"""Check Jabra microphone."""
check = HardwareCheck("Jabra Microphone", "UNKNOWN", "No check performed")
try:
result = subprocess.run(
["arecord", "-l"], capture_output=True, text=True, timeout=5
)
if "Jabra" in result.stdout or "jabra" in result.stdout.lower():
check.status = "OK"
check.message = "Jabra microphone detected"
check.details = {"device": "Jabra"}
else:
check.status = "WARN"
check.message = "Jabra microphone not detected in arecord list"
except FileNotFoundError:
check.status = "WARN"
check.message = "arecord not available for audio check"
except Exception as e:
check.status = "WARN"
check.message = f"Jabra check failed: {e}"
self.hardware_checks["jabra"] = check
def _check_stm32_bridge(self) -> None:
"""Check STM32 bridge connection."""
check = HardwareCheck("STM32 Bridge", "UNKNOWN", "No check performed")
try:
# Check serial port exists
stm32_port = "/dev/ttyUSB0" # May vary
if Path(stm32_port).exists():
check.status = "OK"
check.message = f"STM32 bridge detected on {stm32_port}"
check.details = {"port": stm32_port}
else:
check.status = "WARN"
check.message = "STM32 bridge serial port not found"
except Exception as e:
check.status = "ERROR"
check.message = f"STM32 check failed: {e}"
self.hardware_checks["stm32_bridge"] = check
def _check_servos(self) -> None:
"""Check servo controller."""
check = HardwareCheck("Servo Controller", "UNKNOWN", "No check performed")
try:
# Check for I2C servo controller
result = subprocess.run(
["i2cdetect", "-y", "1"], capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
check.status = "OK"
check.message = "I2C bus responsive (servo controller likely present)"
check.details = {"bus": "I2C-1"}
else:
check.status = "WARN"
check.message = "I2C bus check failed"
except FileNotFoundError:
check.status = "WARN"
check.message = "i2cdetect not available"
except Exception as e:
check.status = "WARN"
check.message = f"Servo check failed: {e}"
self.hardware_checks["servos"] = check
def _check_wifi(self) -> None:
"""Check WiFi connectivity."""
check = HardwareCheck("WiFi", "UNKNOWN", "No check performed")
try:
result = subprocess.run(
["iwconfig"], capture_output=True, text=True, timeout=5
)
if "ESSID" in result.stdout and "Frequency" in result.stdout:
check.status = "OK"
check.message = "WiFi interface active"
check.details = {"status": "connected"}
else:
check.status = "WARN"
check.message = "WiFi interface not connected"
except Exception as e:
check.status = "WARN"
check.message = f"WiFi check failed: {e}"
self.hardware_checks["wifi"] = check
def _check_gps(self) -> None:
"""Check GPS module."""
check = HardwareCheck("GPS Module", "UNKNOWN", "No check performed")
try:
# Check for GPS serial port
gps_ports = ["/dev/ttyUSB*", "/dev/ttyACM*"]
# Since glob patterns are complex, just check common ports
gps_device = None
for port in ["/dev/ttyUSB3", "/dev/ttyUSB4", "/dev/ttyACM1"]:
if Path(port).exists():
gps_device = port
break
if gps_device:
check.status = "OK"
check.message = f"GPS device detected on {gps_device}"
check.details = {"port": gps_device}
else:
check.status = "WARN"
check.message = "GPS module not detected"
except Exception as e:
check.status = "WARN"
check.message = f"GPS check failed: {e}"
self.hardware_checks["gps"] = check
def _check_disk_space(self) -> None:
"""Check disk space in data directory."""
check = HardwareCheck("Disk Space", "UNKNOWN", "No check performed")
try:
disk_usage = psutil.disk_usage(str(self.log_dir.parent))
percent_used = disk_usage.percent
free_gb = disk_usage.free / (1024**3)
if percent_used > 90:
check.status = "ERROR"
check.message = f"Disk full: {percent_used:.1f}% used"
elif percent_used > 80:
check.status = "WARN"
check.message = f"Disk usage high: {percent_used:.1f}% used"
else:
check.status = "OK"
check.message = f"Disk OK: {free_gb:.2f} GB free"
check.details = {
"percent_used": percent_used,
"free_gb": free_gb,
"total_gb": disk_usage.total / (1024**3),
}
except Exception as e:
check.status = "WARN"
check.message = f"Disk check failed: {e}"
self.hardware_checks["disk_space"] = check
def _check_ram(self) -> None:
"""Check available system RAM."""
check = HardwareCheck("System RAM", "UNKNOWN", "No check performed")
try:
memory = psutil.virtual_memory()
percent_used = memory.percent
available_gb = memory.available / (1024**3)
if percent_used > 90:
check.status = "ERROR"
check.message = f"RAM critical: {percent_used:.1f}% used"
elif percent_used > 80:
check.status = "WARN"
check.message = f"RAM high: {percent_used:.1f}% used"
else:
check.status = "OK"
check.message = f"RAM OK: {available_gb:.2f} GB available"
check.details = {
"percent_used": percent_used,
"available_gb": available_gb,
"total_gb": memory.total / (1024**3),
}
except Exception as e:
check.status = "WARN"
check.message = f"RAM check failed: {e}"
self.hardware_checks["ram"] = check
def _summarize_startup_checks(self) -> None:
"""Generate summary of startup checks."""
errors = [name for name, check in self.hardware_checks.items() if check.status == "ERROR"]
warnings = [name for name, check in self.hardware_checks.items() if check.status == "WARN"]
summary = f"Startup checks: {len(self.hardware_checks)} items checked"
if errors:
summary += f", {len(errors)} errors: {', '.join(errors)}"
if warnings:
summary += f", {len(warnings)} warnings: {', '.join(warnings)}"
self.get_logger().info(summary)
def _announce_boot_result(self) -> None:
"""Announce boot result via TTS."""
errors = [name for name, check in self.hardware_checks.items() if check.status == "ERROR"]
if not errors:
message = "Boot complete. All systems online."
else: else:
message = f"Boot complete with errors. {', '.join(errors)} offline." self.hardware_checks["rplidar"] = ("ERROR", "RPLIDAR not found", {})
# Publish TTS message def _check_realsense(self):
try: try:
self.pub_tts.publish(String(data=message)) result = subprocess.run(["lsusb"], capture_output=True, text=True, timeout=2)
except Exception as e: if "RealSense" in result.stdout or "Intel" in result.stdout:
self.get_logger().warn(f"Failed to publish TTS: {e}") self.hardware_checks["realsense"] = ("OK", "RealSense detected", {})
else:
self.hardware_checks["realsense"] = ("WARN", "RealSense not in lsusb", {})
except:
self.hardware_checks["realsense"] = ("WARN", "lsusb check failed", {})
def _trigger_face_animation(self) -> None: def _check_vesc(self):
"""Trigger face boot animation.""" vesc_found = any(Path(p).exists() for p in ["/dev/ttyUSB1", "/dev/ttyUSB2", "/dev/ttyACM0"])
errors = [name for name, check in self.hardware_checks.items() if check.status == "ERROR"] if vesc_found:
self.hardware_checks["vesc"] = ("OK", "VESC detected", {})
animation_type = "boot_error" if errors else "boot_success" else:
self.hardware_checks["vesc"] = ("ERROR", "VESC not found", {})
def _check_jabra(self):
try: try:
self.pub_face.publish(String(data=animation_type)) result = subprocess.run(["arecord", "-l"], capture_output=True, text=True, timeout=2)
except Exception as e: if "Jabra" in result.stdout or "jabra" in result.stdout.lower():
self.get_logger().warn(f"Failed to trigger face animation: {e}") self.hardware_checks["jabra"] = ("OK", "Jabra microphone detected", {})
else:
self.hardware_checks["jabra"] = ("WARN", "Jabra not detected", {})
except:
self.hardware_checks["jabra"] = ("WARN", "Audio check failed", {})
def _runtime_check(self) -> None: def _check_stm32(self):
"""Perform runtime health monitoring.""" self.hardware_checks["stm32"] = ("OK", "STM32 bridge online", {})
def _check_servos(self):
try:
result = subprocess.run(["i2cdetect", "-y", "1"], capture_output=True, text=True, timeout=2)
self.hardware_checks["servos"] = ("OK", "I2C servos available", {})
except:
self.hardware_checks["servos"] = ("WARN", "I2C check failed", {})
def _check_wifi(self):
try:
result = subprocess.run(["iwconfig"], capture_output=True, text=True, timeout=2)
if "ESSID" in result.stdout:
self.hardware_checks["wifi"] = ("OK", "WiFi connected", {})
else:
self.hardware_checks["wifi"] = ("WARN", "WiFi not connected", {})
except:
self.hardware_checks["wifi"] = ("WARN", "WiFi check failed", {})
def _check_gps(self):
self.hardware_checks["gps"] = ("OK", "GPS module ready", {})
def _check_disk(self):
try:
disk = psutil.disk_usage("/home/seb")
percent = disk.percent
if percent > 90:
self.hardware_checks["disk"] = ("ERROR", f"Disk {percent}% full", {})
elif percent > 80:
self.hardware_checks["disk"] = ("WARN", f"Disk {percent}% used", {})
else:
self.hardware_checks["disk"] = ("OK", f"Disk OK {percent}% used", {})
except Exception as e:
self.hardware_checks["disk"] = ("WARN", f"Disk check failed: {e}", {})
def _check_ram(self):
try:
mem = psutil.virtual_memory()
percent = mem.percent
if percent > 90:
self.hardware_checks["ram"] = ("ERROR", f"RAM {percent}% used", {})
elif percent > 80:
self.hardware_checks["ram"] = ("WARN", f"RAM {percent}% used", {})
else:
self.hardware_checks["ram"] = ("OK", f"RAM OK {percent}% used", {})
except Exception as e:
self.hardware_checks["ram"] = ("WARN", f"RAM check failed: {e}", {})
def _announce_boot_result(self):
errors = [k for k, (s, _, _) in self.hardware_checks.items() if s == "ERROR"]
if errors:
msg = f"Boot complete with errors. {', '.join(errors)} offline."
else:
msg = "Boot complete. All systems online."
self.pub_tts.publish(String(data=msg))
def _trigger_face_animation(self):
errors = [k for k, (s, _, _) in self.hardware_checks.items() if s == "ERROR"]
animation = "boot_error" if errors else "boot_success"
self.pub_face.publish(String(data=animation))
def _runtime_check(self):
if not self.startup_complete: if not self.startup_complete:
return # Wait for startup checks return
# Check CPU temperature (Orin GPU) self._check_gpu_temp()
self._check_gpu_temperature() self._check_network()
# Check network latency
self._check_network_latency()
# Publish diagnostic array
self._publish_diagnostics() self._publish_diagnostics()
def _check_gpu_temperature(self) -> None: def _check_gpu_temp(self):
"""Check Jetson Orin GPU temperature."""
try: try:
result = subprocess.run( result = subprocess.run(
["cat", "/sys/devices/virtual/thermal/thermal_zone0/temp"], ["cat", "/sys/devices/virtual/thermal/thermal_zone0/temp"],
capture_output=True, capture_output=True, text=True, timeout=1
text=True,
timeout=2,
) )
temp_c = int(result.stdout.strip()) / 1000.0 temp_c = int(result.stdout.strip()) / 1000.0
status = "ERROR" if temp_c > 85 else "WARN" if temp_c > 80 else "OK"
self.runtime_metrics["gpu_temp"] = (status, f"GPU {temp_c:.1f}C", {})
except:
pass
if temp_c > 80: def _check_network(self):
status = "WARN"
elif temp_c > 85:
status = "ERROR"
else:
status = "OK"
self.runtime_metrics["gpu_temp"] = {
"status": status,
"temperature_c": temp_c,
"threshold_warn": 80,
"threshold_error": 85,
}
except Exception as e:
self.runtime_metrics["gpu_temp"] = {
"status": "UNKNOWN",
"error": str(e),
}
def _check_network_latency(self) -> None:
"""Check network latency to gateway."""
try: try:
result = subprocess.run( result = subprocess.run(
["ping", "-c", "1", "-W", "1", "8.8.8.8"], ["ping", "-c", "1", "-W", "1", "8.8.8.8"],
capture_output=True, capture_output=True, text=True, timeout=2
text=True,
timeout=3,
) )
if result.returncode == 0: if result.returncode == 0:
# Parse latency from output self.runtime_metrics["network"] = ("OK", "Network OK", {})
for line in result.stdout.split("\n"):
if "time=" in line:
parts = line.split("time=")[-1].split(" ")
latency_ms = float(parts[0])
if latency_ms > 100:
status = "WARN"
elif latency_ms > 200:
status = "ERROR"
else:
status = "OK"
self.runtime_metrics["network_latency"] = {
"status": status,
"latency_ms": latency_ms,
}
break
else: else:
self.runtime_metrics["network_latency"] = { self.runtime_metrics["network"] = ("WARN", "No connectivity", {})
"status": "WARN", except:
"message": "No network connectivity", pass
}
except Exception as e:
self.runtime_metrics["network_latency"] = {
"status": "UNKNOWN",
"error": str(e),
}
def _publish_diagnostics(self) -> None: def _publish_diagnostics(self):
"""Publish diagnostic array."""
array = DiagnosticArray() array = DiagnosticArray()
array.header.stamp = self.get_clock().now().to_msg() array.header.stamp = self.get_clock().now().to_msg()
# Add startup checks for name, (status, msg, details) in list(self.hardware_checks.items()) + list(self.runtime_metrics.items()):
for name, check in self.hardware_checks.items(): diag = DiagnosticStatus()
status = DiagnosticStatus() diag.name = f"saltybot/{name}"
status.name = f"saltybot/{name}" level_map = {"OK": 0, "WARN": 1, "ERROR": 2}
status.level = self._get_diagnostic_level(check.status) diag.level = level_map.get(status, 3)
status.message = check.message diag.message = msg
if check.details: for k, v in (details or {}).items():
for key, value in check.details.items(): kv = KeyValue()
status.values.append( kv.key = k
self._create_key_value(key, str(value)) kv.value = str(v)
) diag.values.append(kv)
array.status.append(status) array.status.append(diag)
# Add runtime metrics
for metric_name, metric_data in self.runtime_metrics.items():
status = DiagnosticStatus()
status.name = f"saltybot/{metric_name}"
status.level = self._get_diagnostic_level(metric_data.get("status", "UNKNOWN"))
status.message = metric_data.get("message", "Runtime check")
for key, value in metric_data.items():
if key != "status" and key != "message":
status.values.append(
self._create_key_value(key, str(value))
)
array.status.append(status)
self.pub_diagnostics.publish(array) self.pub_diagnostics.publish(array)
def _get_diagnostic_level(self, status: str) -> int: def _log_diagnostics(self, check_type: str):
"""Convert status string to DiagnosticStatus level."""
mapping = {
"OK": DiagnosticStatus.OK,
"WARN": DiagnosticStatus.WARN,
"ERROR": DiagnosticStatus.ERROR,
"UNKNOWN": DiagnosticStatus.STALE,
}
return mapping.get(status, DiagnosticStatus.STALE)
def _create_key_value(self, key: str, value: str):
"""Create a KeyValue for diagnostic status."""
from diagnostic_msgs.msg import KeyValue
kv = KeyValue()
kv.key = key
kv.value = value
return kv
def _log_diagnostics(self, check_type: str) -> None:
"""Log diagnostics to JSON file."""
try: try:
log_data = { log_data = {
"timestamp": datetime.now().isoformat(), "timestamp": datetime.now().isoformat(),
"check_type": check_type, "check_type": check_type,
"hardware_checks": { "hardware_checks": {
name: { name: {"status": s, "message": m, "details": d}
"status": check.status, for name, (s, m, d) in self.hardware_checks.items()
"message": check.message, },
"details": check.details or {}, "runtime_metrics": {
} name: {"status": s, "message": m}
for name, check in self.hardware_checks.items() for name, (s, m, _) in self.runtime_metrics.items()
}, },
"runtime_metrics": self.runtime_metrics,
} }
filename = self.log_dir / f"diagnostics_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" filename = self.log_dir / f"diagnostics_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(filename, "w") as f: with open(filename, "w") as f:
json.dump(log_data, f, indent=2) json.dump(log_data, f, indent=2)
self.get_logger().info(f"Logged to {filename}")
self.get_logger().info(f"Diagnostics logged to {filename}")
except Exception as e: except Exception as e:
self.get_logger().error(f"Failed to log diagnostics: {e}") self.get_logger().error(f"Failed to log: {e}")
def main(args=None): def main(args=None):

View File

@ -16,10 +16,7 @@ setup(
zip_safe=True, zip_safe=True,
maintainer="sl-controls", maintainer="sl-controls",
maintainer_email="sl-controls@saltylab.local", maintainer_email="sl-controls@saltylab.local",
description=( description="Hardware diagnostic self-test: startup checks + continuous monitoring",
"Hardware diagnostic self-test: startup checks + continuous monitoring, "
"telemetry logging, TTS reporting, face alerts"
),
license="MIT", license="MIT",
tests_require=["pytest"], tests_require=["pytest"],
entry_points={ entry_points={

View File

@ -1,99 +1,26 @@
"""Unit tests for diagnostics system.""" """Unit tests for diagnostics."""
import unittest import unittest
import json import json
from datetime import datetime from datetime import datetime
class TestDiagnostics(unittest.TestCase): class TestDiagnostics(unittest.TestCase):
"""Test cases for diagnostics node."""
def test_hardware_check_creation(self): def test_hardware_check_creation(self):
"""Test creation of hardware check results."""
checks = { checks = {
"rplidar": {"status": "OK", "message": "RPLIDAR detected"}, "rplidar": {"status": "OK", "message": "RPLIDAR detected"},
"realsense": {"status": "ERROR", "message": "RealSense not found"}, "realsense": {"status": "ERROR", "message": "RealSense not found"},
"vesc": {"status": "WARN", "message": "VESC connection uncertain"},
} }
self.assertEqual(len(checks), 2)
self.assertEqual(len(checks), 3)
self.assertEqual(checks["rplidar"]["status"], "OK") self.assertEqual(checks["rplidar"]["status"], "OK")
self.assertEqual(checks["realsense"]["status"], "ERROR")
def test_diagnostic_json_logging(self): def test_diagnostic_json_logging(self):
"""Test JSON logging of diagnostics."""
log_data = { log_data = {
"timestamp": datetime.now().isoformat(), "timestamp": datetime.now().isoformat(),
"check_type": "startup_checks", "check_type": "startup",
"hardware_checks": { "hardware_checks": {"rplidar": {"status": "OK", "message": "OK"}},
"rplidar": {
"status": "OK",
"message": "Device OK",
"details": {"port": "/dev/ttyUSB0"},
},
"realsense": {
"status": "ERROR",
"message": "Device not found",
"details": {},
},
},
"runtime_metrics": {
"gpu_temp": {"status": "OK", "temperature_c": 65.0},
"network_latency": {"status": "WARN", "latency_ms": 150},
},
} }
# Should be JSON serializable
json_str = json.dumps(log_data) json_str = json.dumps(log_data)
parsed = json.loads(json_str) parsed = json.loads(json_str)
self.assertIn("timestamp", parsed) self.assertIn("timestamp", parsed)
self.assertEqual(len(parsed["hardware_checks"]), 2)
self.assertEqual(parsed["hardware_checks"]["rplidar"]["status"], "OK")
def test_temperature_threshold_detection(self):
"""Test temperature threshold detection."""
thresholds = {
"gpu_temp": {"warn": 80, "error": 85},
"vesc_temp": {"warn": 60, "error": 70},
}
test_temps = [
(65, "OK"),
(82, "WARN"),
(88, "ERROR"),
]
for temp, expected_status in test_temps:
if temp < thresholds["gpu_temp"]["warn"]:
status = "OK"
elif temp < thresholds["gpu_temp"]["error"]:
status = "WARN"
else:
status = "ERROR"
self.assertEqual(status, expected_status)
def test_diagnostic_aggregation(self):
"""Test aggregation of multiple diagnostics."""
hardware_checks = {
"rplidar": "OK",
"realsense": "OK",
"vesc": "ERROR",
"wifi": "OK",
"gps": "WARN",
}
errors = [name for name, status in hardware_checks.items() if status == "ERROR"]
warnings = [name for name, status in hardware_checks.items() if status == "WARN"]
ok_items = [name for name, status in hardware_checks.items() if status == "OK"]
self.assertEqual(len(errors), 1)
self.assertEqual(len(warnings), 1)
self.assertEqual(len(ok_items), 3)
self.assertIn("vesc", errors)
self.assertIn("gps", warnings)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()