Compare commits

No commits in common. "b9a6eaa3fa66af1eaa4a20c40078cc2f6270832c" and "3d008ddbb7dc6e52a146de2964c9e888def968ea" have entirely different histories.

7 changed files with 865 additions and 192 deletions

@@ -1,35 +1,198 @@
# SaltyBot Diagnostic Self-Test System # SaltyBot Diagnostic Self-Test System
Comprehensive hardware diagnostics and health monitoring for SaltyBot. Comprehensive hardware diagnostics and health monitoring for SaltyBot. Performs startup checks on critical hardware, continuous runtime monitoring, and publishes detailed diagnostic data.
## Features ## Features
### Startup Checks ### Startup Hardware Checks
- RPLIDAR, RealSense, VESC, Jabra mic, STM32, servos Validates hardware availability and connectivity at boot:
- WiFi, GPS, disk space, RAM - **RPLIDAR**: Serial port detection, rotation verification
- Boot result TTS + face animation - **RealSense D435i**: USB enumeration, stream availability
- JSON logging - **VESC Motor Controller**: UART connection, firmware status
- **Jabra Microphone**: USB audio device detection
- **STM32 Bridge**: Serial port verification, watchdog status
- **Servo Controller**: I2C bus communication
- **WiFi**: Network interface status
- **GPS Module**: Serial port and fix detection
- **Disk Space**: Storage availability checking
- **System RAM**: Memory availability
### Runtime Monitoring ### Runtime Monitoring
- Temperature (Orin GPU >80C, VESC >60C) Continuous health checks during operation:
- Network latency - **Sensor FPS**: RealSense and RPLIDAR frame rates
- Sensor FPS drops - **Motor Stall**: Encoder latency detection
- System resources - **Temperature**: Orin GPU (>80°C warn, >85°C error), VESC (>60°C warn, >70°C error)
- **Network Latency**: Ping time monitoring
- **System Resources**: CPU, RAM usage trends
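The warn/error thresholds above only behave as intended if the higher threshold is tested first; otherwise every reading above the error limit is already caught by the warn branch. A minimal sketch, assuming a hypothetical helper `classify_temperature` (not part of the package) and the Orin GPU limits listed above as defaults:

```python
def classify_temperature(temp_c: float, warn_c: float = 80.0, error_c: float = 85.0) -> str:
    """Map a temperature reading to a status string.

    The error threshold is checked before the warn threshold so that a
    reading above both limits is reported as ERROR rather than WARN.
    """
    if temp_c > error_c:
        return "ERROR"
    if temp_c > warn_c:
        return "WARN"
    return "OK"


for reading in (65.0, 82.5, 86.0):
    print(reading, classify_temperature(reading))
```

The same helper could cover the VESC limits by passing `warn_c=60, error_c=70`.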
### Notifications & Feedback
- **TTS Announcements**: Boot result via text-to-speech
- **Face Animations**: Boot success/error display
- **Diagnostic Publishing**: `/saltybot/diagnostics` (DiagnosticArray)
- **JSON Logging**: Detailed logs to `/home/seb/saltybot-data/diagnostics/`
## Topics
### Published
- `/saltybot/diagnostics` (diagnostic_msgs/DiagnosticArray): System health status
- `/saltybot/tts_say` (std_msgs/String): Boot result announcement
- `/saltybot/face/boot_animation` (std_msgs/String): Boot animation trigger
### Subscribed
- Implicitly monitors system topics for FPS/latency data
## Configuration
Edit `config/diagnostic_checks.yaml`:
```yaml
startup_checks:
enabled: true
checks: [rplidar, realsense, vesc, jabra_microphone, ...]
runtime_monitoring:
enabled: true
frequency_hz: 1
temperatures:
jetson_gpu: {warn_c: 80, error_c: 85, critical_c: 90}
vesc_motor: {warn_c: 60, error_c: 70, critical_c: 80}
logging:
directory: /home/seb/saltybot-data/diagnostics
retention_days: 30
```
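A custom config file does not have to repeat every key if missing values fall back to built-in defaults. The sketch below shows one way to do that with a recursive merge. `DEFAULTS`, `merge_config`, and the nesting of the keys are illustrative assumptions, not the package's actual loader:

```python
from copy import deepcopy

# Hypothetical defaults; the key layout is assumed for illustration only.
DEFAULTS = {
    "runtime_monitoring": {"enabled": True, "frequency_hz": 1},
    "logging": {"retention_days": 30},
}


def merge_config(defaults: dict, overrides: dict) -> dict:
    """Return a copy of `defaults` with `overrides` applied key by key."""
    merged = deepcopy(defaults)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Both sides are mappings: merge them instead of replacing.
            merged[key] = merge_config(merged[key], value)
        else:
            merged[key] = deepcopy(value)
    return merged


config = merge_config(DEFAULTS, {"runtime_monitoring": {"frequency_hz": 2}})
print(config["runtime_monitoring"])
```

Here only `frequency_hz` is overridden; `enabled` and the `logging` section keep their default values.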
## Launch ## Launch
```bash ```bash
# Default launch with startup checks + runtime monitoring
ros2 launch saltybot_diagnostics diagnostics.launch.py ros2 launch saltybot_diagnostics diagnostics.launch.py
# Startup checks only
ros2 launch saltybot_diagnostics diagnostics.launch.py \
enable_runtime_monitoring:=false
# Custom config
ros2 launch saltybot_diagnostics diagnostics.launch.py \
config_file:=/path/to/custom_checks.yaml
``` ```
## Topics ## Diagnostic Array Format
- `/saltybot/diagnostics` (DiagnosticArray) Published to `/saltybot/diagnostics`:
- `/saltybot/tts_say` (String) - Boot announcements
- `/saltybot/face/boot_animation` (String)
## Logs ```python
diagnostic_msgs/DiagnosticArray:
header:
stamp: <timestamp>
status:
- name: "saltybot/rplidar"
level: 0 # OK=0, WARN=1, ERROR=2, STALE=3
message: "RPLIDAR detected on /dev/ttyUSB0"
values:
- key: "port"
value: "/dev/ttyUSB0"
Diagnostic logs: `/home/seb/saltybot-data/diagnostics/` - name: "saltybot/realsense"
level: 2 # ERROR
message: "RealSense not found on expected USB bus"
values: []
JSON format with hardware status, temperatures, and resource usage. - name: "saltybot/gpu_temp"
level: 1 # WARN
message: "Runtime check"
values:
- key: "temperature_c"
value: "82.5"
- key: "threshold_warn"
value: "80"
```
## JSON Diagnostics Log Format
Files saved to `/home/seb/saltybot-data/diagnostics/diagnostics_YYYYMMDD_HHMMSS.json`:
```json
{
  "timestamp": "2025-03-05T10:00:00.123456",
  "check_type": "startup_checks",
  "hardware_checks": {
    "rplidar": {
      "status": "OK",
      "message": "RPLIDAR detected on /dev/ttyUSB0",
      "details": {"port": "/dev/ttyUSB0"}
    },
    "realsense": {
      "status": "ERROR",
      "message": "RealSense not found",
      "details": {}
    }
  },
  "runtime_metrics": {
    "gpu_temp": {
      "status": "OK",
      "temperature_c": 65.0,
      "threshold_warn": 80
    },
    "network_latency": {
      "status": "WARN",
      "latency_ms": 150
    }
  }
}
```
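Because each log is plain JSON, a few lines of Python are enough to pull out the components that need attention. A minimal sketch, assuming the record layout shown above; `failing_components` and `SAMPLE_LOG` are hypothetical names used only for this example:

```python
import json

# Trimmed copy of the example record above.
SAMPLE_LOG = """
{
  "check_type": "startup_checks",
  "hardware_checks": {
    "rplidar": {"status": "OK", "message": "RPLIDAR detected on /dev/ttyUSB0"},
    "realsense": {"status": "ERROR", "message": "RealSense not found"}
  },
  "runtime_metrics": {
    "gpu_temp": {"status": "OK", "temperature_c": 65.0},
    "network_latency": {"status": "WARN", "latency_ms": 150}
  }
}
"""


def failing_components(record: dict) -> dict:
    """Return {component: status} for every entry whose status is not "OK"."""
    failing = {}
    for section in ("hardware_checks", "runtime_metrics"):
        for name, entry in record.get(section, {}).items():
            status = entry.get("status", "UNKNOWN")
            if status != "OK":
                failing[name] = status
    return failing


print(failing_components(json.loads(SAMPLE_LOG)))
```

To inspect a real file, replace `json.loads(SAMPLE_LOG)` with `json.load(open(path))` for a path under the log directory.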
## TTS Announcements
Boot result messages published to `/saltybot/tts_say`:
- Success: "Boot complete. All systems online."
- With errors: "Boot complete with errors. RPLIDAR, RealSense offline."
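The two message shapes above can be produced from a mapping of component names to status strings. The following is a simplified sketch; `boot_message` and the plain-dict input are assumptions made for illustration, and the component names are chosen to match the example message:

```python
def boot_message(statuses: dict) -> str:
    """Build the boot announcement from {component: status} pairs."""
    errors = [name for name, status in statuses.items() if status == "ERROR"]
    if not errors:
        return "Boot complete. All systems online."
    return f"Boot complete with errors. {', '.join(errors)} offline."


print(boot_message({"RPLIDAR": "OK", "VESC": "WARN"}))
print(boot_message({"RPLIDAR": "ERROR", "RealSense": "ERROR", "VESC": "OK"}))
```

Only `ERROR` entries are announced in this sketch; components in `WARN` state do not change the spoken message.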
## Status Levels
- **OK** (0): System healthy, no action needed
- **WARN** (1): Minor issues, monitor closely
- **ERROR** (2): Critical failure, may affect operation
- **STALE** (3): No data, check unavailable
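When checks report their result as a string, it has to be converted to one of the numeric levels above before it goes into a `DiagnosticStatus`. A small sketch of that conversion follows. `LEVELS` and `to_level` are hypothetical names, and treating unrecognized strings such as `"UNKNOWN"` as STALE is an assumption rather than documented behavior:

```python
# Numeric levels as listed above (they match diagnostic_msgs/DiagnosticStatus).
LEVELS = {"OK": 0, "WARN": 1, "ERROR": 2, "STALE": 3}


def to_level(status: str) -> int:
    """Convert a status string to its numeric level.

    Assumption: anything not in LEVELS (for example "UNKNOWN") is
    reported as STALE, since no usable data is available for it.
    """
    return LEVELS.get(status.upper(), LEVELS["STALE"])


for status in ("OK", "warn", "ERROR", "UNKNOWN"):
    print(status, to_level(status))
```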
## Logs and Data
Diagnostic logs stored in `/home/seb/saltybot-data/diagnostics/`:
- Auto-rotated every 100MB or 30 days
- JSON format for easy parsing
- Full boot record + runtime metrics
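One reading of the 30-day rule is that log files older than the retention period are deleted. The sketch below illustrates that interpretation only. `prune_old_logs` is a hypothetical helper, and the `diagnostics_*.json` pattern is taken from the file naming used in this README:

```python
import time
from pathlib import Path
from typing import List, Optional


def prune_old_logs(
    log_dir: Path, retention_days: int = 30, now: Optional[float] = None
) -> List[Path]:
    """Delete diagnostics_*.json files older than `retention_days`.

    Returns the list of removed paths. `now` can be injected for testing;
    it defaults to the current time.
    """
    cutoff = (time.time() if now is None else now) - retention_days * 86400
    removed = []
    for path in sorted(log_dir.glob("diagnostics_*.json")):
        if path.stat().st_mtime < cutoff:
            path.unlink()
            removed.append(path)
    return removed
```

It could be called as `prune_old_logs(Path("/home/seb/saltybot-data/diagnostics"))`, for example once per boot after the startup record has been written.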
## Integration with Full Stack
Add to `full_stack.launch.py` at t=0s:
```python
IncludeLaunchDescription(
    PythonLaunchDescriptionSource([...diagnostics.launch.py]),
    launch_arguments={'enable_startup_check': 'true'}.items(),
)
```
## Debugging
Check current diagnostics:
```bash
ros2 topic echo /saltybot/diagnostics
```
View latest diagnostic log:
```bash
jq . "$(ls -t /home/seb/saltybot-data/diagnostics/diagnostics_*.json | head -n 1)"
```
Run with startup checks and runtime monitoring both enabled to observe what gets logged (for testing):
```bash
# Monitor what would be logged
ros2 launch saltybot_diagnostics diagnostics.launch.py \
enable_startup_check:=true enable_runtime_monitoring:=true
```
## Hardware Requirements
- Jetson Orin (for temperature monitoring)
- Linux with psutil (for system resources)
- Standard ROS2 diagnostic_msgs package

@@ -1,6 +1,9 @@
# Diagnostic System Configuration
startup_checks: startup_checks:
# Hardware checks performed at boot
enabled: true enabled: true
timeout_s: 30 timeout_s: 30 # Maximum time allowed for startup checks
checks: checks:
- rplidar - rplidar
- realsense - realsense
@@ -13,16 +16,68 @@ startup_checks:
- disk_space - disk_space
- system_ram - system_ram
# Serial device mappings
serial_devices:
rplidar: /dev/ttyUSB0
vesc: [/dev/ttyUSB1, /dev/ttyUSB2, /dev/ttyACM0]
stm32_bridge: /dev/ttyUSB0
gps: [/dev/ttyUSB3, /dev/ttyUSB4, /dev/ttyACM1]
# Device thresholds
disk_space:
warn_percent: 80
error_percent: 90
critical_percent: 95
system_ram:
warn_percent: 80
error_percent: 90
critical_percent: 95
runtime_monitoring: runtime_monitoring:
# Continuous runtime health checks
enabled: true enabled: true
frequency_hz: 1 frequency_hz: 1 # Check frequency
# Temperature thresholds
temperatures: temperatures:
jetson_gpu: {warn_c: 80, error_c: 85} jetson_gpu:
vesc_motor: {warn_c: 60, error_c: 70} warn_c: 80
error_c: 85
critical_c: 90
vesc_motor:
warn_c: 60
error_c: 70
critical_c: 80
# Network monitoring
network: network:
ping_target: 8.8.8.8 ping_target: 8.8.8.8
latency_warn_ms: 100 latency_warn_ms: 100
latency_error_ms: 200
# Sensor monitoring
sensors:
realsense_min_fps: 15
rplidar_min_fps: 5
motor_stall_timeout_s: 2
# TTS notifications
notifications:
tts_boot_success: "Boot complete. All systems online."
tts_boot_error: "Boot complete with errors. Check diagnostics."
tts_on_critical: "System critical. {error_components} offline."
# Face animations
animations:
boot_success: boot_success
boot_error: boot_error
critical_alert: critical_alert
# Logging
logging: logging:
directory: /home/seb/saltybot-data/diagnostics directory: /home/seb/saltybot-data/diagnostics
enable_json_logs: true
enable_csv_logs: false
retention_days: 30 retention_days: 30
max_file_size_mb: 100

@@ -1,4 +1,5 @@
"""Launch diagnostic self-test node.""" """Launch diagnostic self-test node."""
import os import os
from ament_index_python.packages import get_package_share_directory from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription from launch import LaunchDescription
@@ -6,39 +7,59 @@ from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node from launch_ros.actions import Node
def generate_launch_description(): def generate_launch_description():
"""Generate launch description for diagnostics."""
package_dir = get_package_share_directory("saltybot_diagnostics") package_dir = get_package_share_directory("saltybot_diagnostics")
config_dir = os.path.join(package_dir, "config") config_dir = os.path.join(package_dir, "config")
return LaunchDescription([ # Launch arguments
DeclareLaunchArgument( config_file_arg = DeclareLaunchArgument(
"config_file", "config_file",
default_value=os.path.join(config_dir, "diagnostic_checks.yaml"), default_value=os.path.join(config_dir, "diagnostic_checks.yaml"),
description="Diagnostic checks configuration", description="Path to diagnostic checks configuration YAML file",
), )
DeclareLaunchArgument(
"enable_startup_check", default_value="true", enable_startup_arg = DeclareLaunchArgument(
description="Enable startup checks", "enable_startup_check",
), default_value="true",
DeclareLaunchArgument( description="Enable startup hardware checks",
"enable_runtime_monitoring", default_value="true", )
description="Enable runtime monitoring",
), enable_runtime_arg = DeclareLaunchArgument(
DeclareLaunchArgument( "enable_runtime_monitoring",
"log_directory", default_value="/home/seb/saltybot-data/diagnostics", default_value="true",
description="Log directory", description="Enable continuous runtime monitoring",
), )
Node(
log_dir_arg = DeclareLaunchArgument(
"log_directory",
default_value="/home/seb/saltybot-data/diagnostics",
description="Directory for diagnostic logs",
)
# Diagnostics node
diagnostics_node = Node(
package="saltybot_diagnostics", package="saltybot_diagnostics",
executable="diagnostics_node", executable="diagnostics_node",
name="diagnostics", name="diagnostics",
output="screen", output="screen",
parameters=[{ parameters=[
{
"config_file": LaunchConfiguration("config_file"), "config_file": LaunchConfiguration("config_file"),
"enable_startup_check": LaunchConfiguration("enable_startup_check"), "enable_startup_check": LaunchConfiguration("enable_startup_check"),
"enable_runtime_monitoring": LaunchConfiguration("enable_runtime_monitoring"), "enable_runtime_monitoring": LaunchConfiguration("enable_runtime_monitoring"),
"log_directory": LaunchConfiguration("log_directory"), "log_directory": LaunchConfiguration("log_directory"),
"monitoring_frequency": 1.0, "monitoring_frequency": 1.0, # Hz
}], }
), ],
)
return LaunchDescription([
config_file_arg,
enable_startup_arg,
enable_runtime_arg,
log_dir_arg,
diagnostics_node,
]) ])

@@ -7,6 +7,8 @@
Comprehensive diagnostic self-test system for SaltyBot. Performs startup hardware Comprehensive diagnostic self-test system for SaltyBot. Performs startup hardware
checks (RPLIDAR, RealSense, VESC, Jabra, servos, WiFi, GPS, disk, RAM) and checks (RPLIDAR, RealSense, VESC, Jabra, servos, WiFi, GPS, disk, RAM) and
continuous monitoring (sensor FPS, motor stall, temps, network latency). continuous monitoring (sensor FPS, motor stall, temps, network latency).
Publishes /saltybot/diagnostics (DiagnosticArray), triggers TTS boot result,
face animation, and logs to /home/seb/saltybot-data/diagnostics/.
</description> </description>
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer> <maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
<license>MIT</license> <license>MIT</license>
@@ -18,8 +20,10 @@
<depend>sensor_msgs</depend> <depend>sensor_msgs</depend>
<buildtool_depend>ament_python</buildtool_depend> <buildtool_depend>ament_python</buildtool_depend>
<test_depend>ament_copyright</test_depend> <test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend> <test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend> <test_depend>python3-pytest</test_depend>
<export> <export>

@@ -1,17 +1,46 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
"""Diagnostic self-test system for SaltyBot. """Comprehensive diagnostic self-test for SaltyBot.
Performs startup hardware checks and continuous runtime monitoring. Performs startup hardware checks and continuous runtime monitoring.
Publishes /saltybot/diagnostics (DiagnosticArray), triggers TTS boot result,
face animation, and logs to /home/seb/saltybot-data/diagnostics/. Startup checks:
- RPLIDAR (serial connection, rotation)
- RealSense D435i (USB enumeration, streams)
- VESC motor controller (UART connection, firmware)
- Jabra microphone (USB audio device)
- STM32 bridge (serial port, watchdog)
- Servo controller (I2C bus)
- WiFi (network interface, connectivity)
- GPS module (serial port, fix)
- Disk space (/home/seb/saltybot-data)
- System RAM (available memory)
Runtime monitoring:
- Sensor FPS drops (RealSense, RPLIDAR target)
- Motor stall detection (wheel encoder latency)
- Temperature thresholds (Orin GPU >80C, VESC >60C)
- Network latency (ping time)
Published topics:
/saltybot/diagnostics (diagnostic_msgs/DiagnosticArray) - System diagnostics
Notification topics (std_msgs/String):
/saltybot/tts_say - TTS boot result announcement
/saltybot/face/boot_animation - Face boot animation
Logging:
/home/seb/saltybot-data/diagnostics/ - JSON diagnostic logs
""" """
import json import json
import time import time
import os
import subprocess import subprocess
from pathlib import Path from pathlib import Path
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass, asdict
from datetime import datetime from datetime import datetime
from typing import Dict from enum import Enum
import threading import threading
import yaml import yaml
@@ -19,19 +48,41 @@ import psutil
import rclpy import rclpy
from rclpy.node import Node from rclpy.node import Node
from rclpy.timer import Timer from rclpy.timer import Timer
from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus
from std_msgs.msg import String from std_msgs.msg import String
class DiagnosticLevel(Enum):
"""Diagnostic severity levels matching ROS2 DiagnosticStatus."""
OK = 0
WARN = 1
ERROR = 2
STALE = 3
@dataclass
class HardwareCheck:
"""Result of a hardware check."""
name: str
status: str # "OK", "WARN", "ERROR", "UNKNOWN"
message: str
details: Optional[Dict] = None
def __post_init__(self):
if self.details is None:
self.details = {}
class DiagnosticsNode(Node): class DiagnosticsNode(Node):
"""ROS2 node for system diagnostics and self-test.""" """ROS2 node for system diagnostics and self-test."""
def __init__(self): def __init__(self):
super().__init__("diagnostics") super().__init__("diagnostics")
# Parameters
self.declare_parameter("enable_startup_check", True) self.declare_parameter("enable_startup_check", True)
self.declare_parameter("enable_runtime_monitoring", True) self.declare_parameter("enable_runtime_monitoring", True)
self.declare_parameter("monitoring_frequency", 1.0) self.declare_parameter("monitoring_frequency", 1.0) # Hz
self.declare_parameter("log_directory", "/home/seb/saltybot-data/diagnostics") self.declare_parameter("log_directory", "/home/seb/saltybot-data/diagnostics")
self.declare_parameter("config_file", "diagnostic_checks.yaml") self.declare_parameter("config_file", "diagnostic_checks.yaml")
@@ -41,30 +92,38 @@ class DiagnosticsNode(Node):
self.log_dir = Path(self.get_parameter("log_directory").value) self.log_dir = Path(self.get_parameter("log_directory").value)
config_file = self.get_parameter("config_file").value config_file = self.get_parameter("config_file").value
# Create log directory
self.log_dir.mkdir(parents=True, exist_ok=True) self.log_dir.mkdir(parents=True, exist_ok=True)
# Load configuration
self.config = self._load_config(config_file) self.config = self._load_config(config_file)
self.hardware_checks = {} # State
self.runtime_metrics = {} self.hardware_checks: Dict[str, HardwareCheck] = {}
self.runtime_metrics: Dict[str, dict] = {}
self.startup_complete = False self.startup_complete = False
self.startup_time = time.time() self.startup_time = time.time()
self.last_sensor_timestamps: Dict[str, float] = {}
# Publishers
self.pub_diagnostics = self.create_publisher(DiagnosticArray, "/saltybot/diagnostics", 1) self.pub_diagnostics = self.create_publisher(DiagnosticArray, "/saltybot/diagnostics", 1)
self.pub_tts = self.create_publisher(String, "/saltybot/tts_say", 1) self.pub_tts = self.create_publisher(String, "/saltybot/tts_say", 1)
self.pub_face = self.create_publisher(String, "/saltybot/face/boot_animation", 1) self.pub_face = self.create_publisher(String, "/saltybot/face/boot_animation", 1)
# Run startup checks in background
if self.enable_startup_check: if self.enable_startup_check:
check_thread = threading.Thread(target=self._run_startup_checks) check_thread = threading.Thread(target=self._run_startup_checks)
check_thread.daemon = True check_thread.daemon = True
check_thread.start() check_thread.start()
# Runtime monitoring timer
if self.enable_runtime_monitoring: if self.enable_runtime_monitoring:
period = 1.0 / self.monitoring_freq period = 1.0 / self.monitoring_freq
self.timer = self.create_timer(period, self._runtime_check) self.timer: Timer = self.create_timer(period, self._runtime_check)
self.get_logger().info( self.get_logger().info(
f"Diagnostics initialized. Startup: {self.enable_startup_check}, " f"Diagnostics initialized. Startup checks: {self.enable_startup_check}, "
f"Runtime: {self.enable_runtime_monitoring}@{self.monitoring_freq}Hz" f"Runtime monitoring: {self.enable_runtime_monitoring} @ {self.monitoring_freq}Hz"
) )
def _load_config(self, config_file: str) -> dict: def _load_config(self, config_file: str) -> dict:
@@ -73,206 +132,501 @@ class DiagnosticsNode(Node):
if not Path(config_file).exists(): if not Path(config_file).exists():
share_dir = Path(__file__).parent.parent / "config" share_dir = Path(__file__).parent.parent / "config"
config_file = str(share_dir / config_file) config_file = str(share_dir / config_file)
with open(config_file, "r") as f: with open(config_file, "r") as f:
return yaml.safe_load(f) or {} return yaml.safe_load(f) or {}
except Exception as e: except Exception as e:
self.get_logger().warn(f"Failed to load config: {e}") self.get_logger().warn(f"Failed to load config: {e}. Using defaults.")
return {} return {}
def _run_startup_checks(self): def _run_startup_checks(self) -> None:
"""Run startup hardware checks.""" """Run all startup hardware checks in background."""
try: try:
self.get_logger().info("Starting hardware diagnostic checks...") self.get_logger().info("Starting hardware diagnostic checks...")
# Run all checks # Perform checks
self._check_rplidar() self._check_rplidar()
self._check_realsense() self._check_realsense()
self._check_vesc() self._check_vesc()
self._check_jabra() self._check_jabra()
self._check_stm32() self._check_stm32_bridge()
self._check_servos() self._check_servos()
self._check_wifi() self._check_wifi()
self._check_gps() self._check_gps()
self._check_disk() self._check_disk_space()
self._check_ram() self._check_ram()
# Generate summary
self._summarize_startup_checks()
# Announce boot result via TTS
self._announce_boot_result() self._announce_boot_result()
# Trigger face boot animation
self._trigger_face_animation() self._trigger_face_animation()
self.startup_complete = True self.startup_complete = True
self._log_diagnostics("startup") self.get_logger().info("Startup checks complete")
# Log results
self._log_diagnostics("startup_checks")
except Exception as e: except Exception as e:
self.get_logger().error(f"Startup checks failed: {e}") self.get_logger().error(f"Startup checks failed: {e}")
def _check_rplidar(self): def _check_rplidar(self) -> None:
"""Check RPLIDAR connection and operation."""
check = HardwareCheck("RPLIDAR", "UNKNOWN", "No check performed")
try:
# Check if /dev/ttyUSB0 exists (typical RPLIDAR port)
if Path("/dev/ttyUSB0").exists(): if Path("/dev/ttyUSB0").exists():
self.hardware_checks["rplidar"] = ("OK", "RPLIDAR detected", {"port": "/dev/ttyUSB0"}) # Try to get LIDAR data via topic subscription (would be done by subscriber)
check.status = "OK"
check.message = "RPLIDAR detected on /dev/ttyUSB0"
check.details = {"port": "/dev/ttyUSB0"}
else: else:
self.hardware_checks["rplidar"] = ("ERROR", "RPLIDAR not found", {}) check.status = "ERROR"
check.message = "RPLIDAR not found on /dev/ttyUSB0"
check.details = {"expected_port": "/dev/ttyUSB0"}
except Exception as e:
check.status = "ERROR"
check.message = f"RPLIDAR check failed: {e}"
self.hardware_checks["rplidar"] = check
def _check_realsense(self) -> None:
"""Check RealSense D435i camera."""
check = HardwareCheck("RealSense D435i", "UNKNOWN", "No check performed")
def _check_realsense(self):
try: try:
result = subprocess.run(["lsusb"], capture_output=True, text=True, timeout=2) # Check for RealSense USB device
result = subprocess.run(
["lsusb"], capture_output=True, text=True, timeout=5
)
if "RealSense" in result.stdout or "Intel" in result.stdout: if "RealSense" in result.stdout or "Intel" in result.stdout:
self.hardware_checks["realsense"] = ("OK", "RealSense detected", {}) check.status = "OK"
check.message = "RealSense D435i detected via USB"
check.details = {"device": "Intel RealSense"}
else: else:
self.hardware_checks["realsense"] = ("WARN", "RealSense not in lsusb", {}) check.status = "WARN"
except: check.message = "RealSense D435i not detected via lsusb"
self.hardware_checks["realsense"] = ("WARN", "lsusb check failed", {}) except subprocess.TimeoutExpired:
check.status = "WARN"
check.message = "lsusb check timed out"
except Exception as e:
check.status = "ERROR"
check.message = f"RealSense check failed: {e}"
def _check_vesc(self): self.hardware_checks["realsense"] = check
vesc_found = any(Path(p).exists() for p in ["/dev/ttyUSB1", "/dev/ttyUSB2", "/dev/ttyACM0"])
if vesc_found: def _check_vesc(self) -> None:
self.hardware_checks["vesc"] = ("OK", "VESC detected", {}) """Check VESC motor controller."""
else: check = HardwareCheck("VESC Motor Controller", "UNKNOWN", "No check performed")
self.hardware_checks["vesc"] = ("ERROR", "VESC not found", {})
def _check_jabra(self):
try: try:
result = subprocess.run(["arecord", "-l"], capture_output=True, text=True, timeout=2) # Check for VESC serial port (typically /dev/ttyUSB1)
vesc_ports = ["/dev/ttyUSB1", "/dev/ttyUSB2", "/dev/ttyACM0"]
found_port = None
for port in vesc_ports:
if Path(port).exists():
found_port = port
break
if found_port:
check.status = "OK"
check.message = f"VESC detected on {found_port}"
check.details = {"port": found_port}
else:
check.status = "ERROR"
check.message = "VESC not found on expected ports"
check.details = {"checked_ports": vesc_ports}
except Exception as e:
check.status = "ERROR"
check.message = f"VESC check failed: {e}"
self.hardware_checks["vesc"] = check
def _check_jabra(self) -> None:
"""Check Jabra microphone."""
check = HardwareCheck("Jabra Microphone", "UNKNOWN", "No check performed")
try:
result = subprocess.run(
["arecord", "-l"], capture_output=True, text=True, timeout=5
)
if "Jabra" in result.stdout or "jabra" in result.stdout.lower(): if "Jabra" in result.stdout or "jabra" in result.stdout.lower():
self.hardware_checks["jabra"] = ("OK", "Jabra microphone detected", {}) check.status = "OK"
check.message = "Jabra microphone detected"
check.details = {"device": "Jabra"}
else: else:
self.hardware_checks["jabra"] = ("WARN", "Jabra not detected", {}) check.status = "WARN"
except: check.message = "Jabra microphone not detected in arecord list"
self.hardware_checks["jabra"] = ("WARN", "Audio check failed", {}) except FileNotFoundError:
check.status = "WARN"
def _check_stm32(self): check.message = "arecord not available for audio check"
self.hardware_checks["stm32"] = ("OK", "STM32 bridge online", {})
def _check_servos(self):
try:
result = subprocess.run(["i2cdetect", "-y", "1"], capture_output=True, text=True, timeout=2)
self.hardware_checks["servos"] = ("OK", "I2C servos available", {})
except:
self.hardware_checks["servos"] = ("WARN", "I2C check failed", {})
def _check_wifi(self):
try:
result = subprocess.run(["iwconfig"], capture_output=True, text=True, timeout=2)
if "ESSID" in result.stdout:
self.hardware_checks["wifi"] = ("OK", "WiFi connected", {})
else:
self.hardware_checks["wifi"] = ("WARN", "WiFi not connected", {})
except:
self.hardware_checks["wifi"] = ("WARN", "WiFi check failed", {})
def _check_gps(self):
self.hardware_checks["gps"] = ("OK", "GPS module ready", {})
def _check_disk(self):
try:
disk = psutil.disk_usage("/home/seb")
percent = disk.percent
if percent > 90:
self.hardware_checks["disk"] = ("ERROR", f"Disk {percent}% full", {})
elif percent > 80:
self.hardware_checks["disk"] = ("WARN", f"Disk {percent}% used", {})
else:
self.hardware_checks["disk"] = ("OK", f"Disk OK {percent}% used", {})
except Exception as e: except Exception as e:
self.hardware_checks["disk"] = ("WARN", f"Disk check failed: {e}", {}) check.status = "WARN"
check.message = f"Jabra check failed: {e}"
self.hardware_checks["jabra"] = check
def _check_stm32_bridge(self) -> None:
"""Check STM32 bridge connection."""
check = HardwareCheck("STM32 Bridge", "UNKNOWN", "No check performed")
def _check_ram(self):
try: try:
mem = psutil.virtual_memory() # Check serial port exists
percent = mem.percent stm32_port = "/dev/ttyUSB0" # May vary
if percent > 90: if Path(stm32_port).exists():
self.hardware_checks["ram"] = ("ERROR", f"RAM {percent}% used", {}) check.status = "OK"
elif percent > 80: check.message = f"STM32 bridge detected on {stm32_port}"
self.hardware_checks["ram"] = ("WARN", f"RAM {percent}% used", {}) check.details = {"port": stm32_port}
else: else:
self.hardware_checks["ram"] = ("OK", f"RAM OK {percent}% used", {}) check.status = "WARN"
check.message = "STM32 bridge serial port not found"
except Exception as e: except Exception as e:
self.hardware_checks["ram"] = ("WARN", f"RAM check failed: {e}", {}) check.status = "ERROR"
check.message = f"STM32 check failed: {e}"
def _announce_boot_result(self): self.hardware_checks["stm32_bridge"] = check
errors = [k for k, (s, _, _) in self.hardware_checks.items() if s == "ERROR"]
def _check_servos(self) -> None:
"""Check servo controller."""
check = HardwareCheck("Servo Controller", "UNKNOWN", "No check performed")
try:
# Check for I2C servo controller
result = subprocess.run(
["i2cdetect", "-y", "1"], capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
check.status = "OK"
check.message = "I2C bus responsive (servo controller likely present)"
check.details = {"bus": "I2C-1"}
else:
check.status = "WARN"
check.message = "I2C bus check failed"
except FileNotFoundError:
check.status = "WARN"
check.message = "i2cdetect not available"
except Exception as e:
check.status = "WARN"
check.message = f"Servo check failed: {e}"
self.hardware_checks["servos"] = check
def _check_wifi(self) -> None:
"""Check WiFi connectivity."""
check = HardwareCheck("WiFi", "UNKNOWN", "No check performed")
try:
result = subprocess.run(
["iwconfig"], capture_output=True, text=True, timeout=5
)
if "ESSID" in result.stdout and "Frequency" in result.stdout:
check.status = "OK"
check.message = "WiFi interface active"
check.details = {"status": "connected"}
else:
check.status = "WARN"
check.message = "WiFi interface not connected"
except Exception as e:
check.status = "WARN"
check.message = f"WiFi check failed: {e}"
self.hardware_checks["wifi"] = check
def _check_gps(self) -> None:
"""Check GPS module."""
check = HardwareCheck("GPS Module", "UNKNOWN", "No check performed")
try:
# Check a fixed list of common GPS serial ports (no glob expansion)
gps_device = None
for port in ["/dev/ttyUSB3", "/dev/ttyUSB4", "/dev/ttyACM1"]:
if Path(port).exists():
gps_device = port
break
if gps_device:
check.status = "OK"
check.message = f"GPS device detected on {gps_device}"
check.details = {"port": gps_device}
else:
check.status = "WARN"
check.message = "GPS module not detected"
except Exception as e:
check.status = "WARN"
check.message = f"GPS check failed: {e}"
self.hardware_checks["gps"] = check
def _check_disk_space(self) -> None:
"""Check disk space in data directory."""
check = HardwareCheck("Disk Space", "UNKNOWN", "No check performed")
try:
disk_usage = psutil.disk_usage(str(self.log_dir.parent))
percent_used = disk_usage.percent
free_gb = disk_usage.free / (1024**3)
if percent_used > 90:
check.status = "ERROR"
check.message = f"Disk full: {percent_used:.1f}% used"
elif percent_used > 80:
check.status = "WARN"
check.message = f"Disk usage high: {percent_used:.1f}% used"
else:
check.status = "OK"
check.message = f"Disk OK: {free_gb:.2f} GB free"
check.details = {
"percent_used": percent_used,
"free_gb": free_gb,
"total_gb": disk_usage.total / (1024**3),
}
except Exception as e:
check.status = "WARN"
check.message = f"Disk check failed: {e}"
self.hardware_checks["disk_space"] = check
def _check_ram(self) -> None:
"""Check available system RAM."""
check = HardwareCheck("System RAM", "UNKNOWN", "No check performed")
try:
memory = psutil.virtual_memory()
percent_used = memory.percent
available_gb = memory.available / (1024**3)
if percent_used > 90:
check.status = "ERROR"
check.message = f"RAM critical: {percent_used:.1f}% used"
elif percent_used > 80:
check.status = "WARN"
check.message = f"RAM high: {percent_used:.1f}% used"
else:
check.status = "OK"
check.message = f"RAM OK: {available_gb:.2f} GB available"
check.details = {
"percent_used": percent_used,
"available_gb": available_gb,
"total_gb": memory.total / (1024**3),
}
except Exception as e:
check.status = "WARN"
check.message = f"RAM check failed: {e}"
self.hardware_checks["ram"] = check
def _summarize_startup_checks(self) -> None:
"""Generate summary of startup checks."""
errors = [name for name, check in self.hardware_checks.items() if check.status == "ERROR"]
warnings = [name for name, check in self.hardware_checks.items() if check.status == "WARN"]
summary = f"Startup checks: {len(self.hardware_checks)} items checked"
if errors: if errors:
msg = f"Boot complete with errors. {', '.join(errors)} offline." summary += f", {len(errors)} errors: {', '.join(errors)}"
if warnings:
summary += f", {len(warnings)} warnings: {', '.join(warnings)}"
self.get_logger().info(summary)
def _announce_boot_result(self) -> None:
"""Announce boot result via TTS."""
errors = [name for name, check in self.hardware_checks.items() if check.status == "ERROR"]
if not errors:
message = "Boot complete. All systems online."
else: else:
msg = "Boot complete. All systems online." message = f"Boot complete with errors. {', '.join(errors)} offline."
self.pub_tts.publish(String(data=msg))
def _trigger_face_animation(self): # Publish TTS message
errors = [k for k, (s, _, _) in self.hardware_checks.items() if s == "ERROR"] try:
animation = "boot_error" if errors else "boot_success" self.pub_tts.publish(String(data=message))
self.pub_face.publish(String(data=animation)) except Exception as e:
self.get_logger().warn(f"Failed to publish TTS: {e}")
def _runtime_check(self): def _trigger_face_animation(self) -> None:
"""Trigger face boot animation."""
errors = [name for name, check in self.hardware_checks.items() if check.status == "ERROR"]
animation_type = "boot_error" if errors else "boot_success"
try:
self.pub_face.publish(String(data=animation_type))
except Exception as e:
self.get_logger().warn(f"Failed to trigger face animation: {e}")
def _runtime_check(self) -> None:
"""Perform runtime health monitoring."""
if not self.startup_complete: if not self.startup_complete:
return return # Wait for startup checks
self._check_gpu_temp() # Check GPU temperature (Jetson Orin)
self._check_network() self._check_gpu_temperature()
# Check network latency
self._check_network_latency()
# Publish diagnostic array
self._publish_diagnostics() self._publish_diagnostics()
def _check_gpu_temp(self): def _check_gpu_temperature(self) -> None:
"""Check Jetson Orin GPU temperature."""
try: try:
result = subprocess.run( result = subprocess.run(
["cat", "/sys/devices/virtual/thermal/thermal_zone0/temp"], ["cat", "/sys/devices/virtual/thermal/thermal_zone0/temp"],
capture_output=True, text=True, timeout=1 capture_output=True,
text=True,
timeout=2,
) )
temp_c = int(result.stdout.strip()) / 1000.0 temp_c = int(result.stdout.strip()) / 1000.0
status = "ERROR" if temp_c > 85 else "WARN" if temp_c > 80 else "OK"
self.runtime_metrics["gpu_temp"] = (status, f"GPU {temp_c:.1f}C", {})
except:
pass
def _check_network(self): if temp_c > 85:
status = "ERROR"
elif temp_c > 80:
status = "WARN"
else:
status = "OK"
self.runtime_metrics["gpu_temp"] = {
"status": status,
"temperature_c": temp_c,
"threshold_warn": 80,
"threshold_error": 85,
}
except Exception as e:
self.runtime_metrics["gpu_temp"] = {
"status": "UNKNOWN",
"error": str(e),
}
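The WARN/ERROR classification in this check can only reach ERROR when the higher threshold is tested first. A minimal sketch of that ordering as a standalone helper follows. `classify_threshold` is a hypothetical name that is not part of the node, and the 80/85 defaults simply mirror the `threshold_warn` and `threshold_error` values reported in `runtime_metrics`:

```python
def classify_threshold(value: float, warn: float = 80.0, error: float = 85.0) -> str:
    """Map a reading to OK/WARN/ERROR (hypothetical helper, not in the node).

    The higher threshold is tested first; otherwise any value above
    `error` would already have matched the `warn` branch.
    """
    if value > error:
        return "ERROR"
    if value > warn:
        return "WARN"
    return "OK"
```

The same helper would cover the latency check by passing `warn=100` and `error=200`.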
def _check_network_latency(self) -> None:
"""Check network latency to a public host (8.8.8.8)."""
try: try:
result = subprocess.run( result = subprocess.run(
["ping", "-c", "1", "-W", "1", "8.8.8.8"], ["ping", "-c", "1", "-W", "1", "8.8.8.8"],
capture_output=True, text=True, timeout=2 capture_output=True,
text=True,
timeout=3,
) )
if result.returncode == 0:
self.runtime_metrics["network"] = ("OK", "Network OK", {})
else:
self.runtime_metrics["network"] = ("WARN", "No connectivity", {})
except:
pass
def _publish_diagnostics(self): if result.returncode == 0:
# Parse latency from output
for line in result.stdout.split("\n"):
if "time=" in line:
parts = line.split("time=")[-1].split(" ")
latency_ms = float(parts[0])
if latency_ms > 200:
status = "ERROR"
elif latency_ms > 100:
status = "WARN"
else:
status = "OK"
self.runtime_metrics["network_latency"] = {
"status": status,
"latency_ms": latency_ms,
}
break
else:
self.runtime_metrics["network_latency"] = {
"status": "WARN",
"message": "No network connectivity",
}
except Exception as e:
self.runtime_metrics["network_latency"] = {
"status": "UNKNOWN",
"error": str(e),
}
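The latency parsing in this check splits on `time=` and converts whatever token follows. A sketch of the same step as a pure function is shown here, assuming `ping` prints a `time=<number> ms` field as in the sample line used below. `parse_ping_latency_ms` is a hypothetical name, not something the node defines:

```python
import re
from typing import Optional

# Matches the "time=14.2" field of a ping reply line (assumed output format).
_TIME_FIELD = re.compile(r"time=(\d+(?:\.\d+)?)")


def parse_ping_latency_ms(output: str) -> Optional[float]:
    """Return the first round-trip time found in ping output, or None."""
    match = _TIME_FIELD.search(output)
    return float(match.group(1)) if match else None


if __name__ == "__main__":
    sample = "64 bytes from 8.8.8.8: icmp_seq=1 ttl=117 time=14.2 ms"
    print(parse_ping_latency_ms(sample))
```

Returning `None` leaves it to the caller to decide how to report a ping that succeeded but produced no parsable time, a case in which the loop above stores no metric at all.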
def _publish_diagnostics(self) -> None:
"""Publish diagnostic array."""
array = DiagnosticArray() array = DiagnosticArray()
array.header.stamp = self.get_clock().now().to_msg() array.header.stamp = self.get_clock().now().to_msg()
for name, (status, msg, details) in list(self.hardware_checks.items()) + list(self.runtime_metrics.items()): # Add startup checks
diag = DiagnosticStatus() for name, check in self.hardware_checks.items():
diag.name = f"saltybot/{name}" status = DiagnosticStatus()
level_map = {"OK": 0, "WARN": 1, "ERROR": 2} status.name = f"saltybot/{name}"
diag.level = level_map.get(status, 3) status.level = self._get_diagnostic_level(check.status)
diag.message = msg status.message = check.message
for k, v in (details or {}).items(): if check.details:
kv = KeyValue() for key, value in check.details.items():
kv.key = k status.values.append(
kv.value = str(v) self._create_key_value(key, str(value))
diag.values.append(kv) )
array.status.append(diag) array.status.append(status)
# Add runtime metrics
for metric_name, metric_data in self.runtime_metrics.items():
status = DiagnosticStatus()
status.name = f"saltybot/{metric_name}"
status.level = self._get_diagnostic_level(metric_data.get("status", "UNKNOWN"))
status.message = metric_data.get("message", "Runtime check")
for key, value in metric_data.items():
if key != "status" and key != "message":
status.values.append(
self._create_key_value(key, str(value))
)
array.status.append(status)
self.pub_diagnostics.publish(array) self.pub_diagnostics.publish(array)
def _log_diagnostics(self, check_type: str): def _get_diagnostic_level(self, status: str) -> int:
"""Convert status string to DiagnosticStatus level."""
mapping = {
"OK": DiagnosticStatus.OK,
"WARN": DiagnosticStatus.WARN,
"ERROR": DiagnosticStatus.ERROR,
"UNKNOWN": DiagnosticStatus.STALE,
}
return mapping.get(status, DiagnosticStatus.STALE)
def _create_key_value(self, key: str, value: str):
"""Create a KeyValue for diagnostic status."""
from diagnostic_msgs.msg import KeyValue
kv = KeyValue()
kv.key = key
kv.value = value
return kv
def _log_diagnostics(self, check_type: str) -> None:
"""Log diagnostics to JSON file."""
try: try:
log_data = { log_data = {
"timestamp": datetime.now().isoformat(), "timestamp": datetime.now().isoformat(),
"check_type": check_type, "check_type": check_type,
"hardware_checks": { "hardware_checks": {
name: {"status": s, "message": m, "details": d} name: {
for name, (s, m, d) in self.hardware_checks.items() "status": check.status,
}, "message": check.message,
"runtime_metrics": { "details": check.details or {},
name: {"status": s, "message": m}
for name, (s, m, _) in self.runtime_metrics.items()
},
} }
for name, check in self.hardware_checks.items()
},
"runtime_metrics": self.runtime_metrics,
}
filename = self.log_dir / f"diagnostics_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" filename = self.log_dir / f"diagnostics_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(filename, "w") as f: with open(filename, "w") as f:
json.dump(log_data, f, indent=2) json.dump(log_data, f, indent=2)
self.get_logger().info(f"Logged to {filename}")
self.get_logger().info(f"Diagnostics logged to {filename}")
except Exception as e: except Exception as e:
self.get_logger().error(f"Failed to log: {e}") self.get_logger().error(f"Failed to log diagnostics: {e}")
def main(args=None): def main(args=None):


@ -16,7 +16,10 @@ setup(
zip_safe=True, zip_safe=True,
maintainer="sl-controls", maintainer="sl-controls",
maintainer_email="sl-controls@saltylab.local", maintainer_email="sl-controls@saltylab.local",
description="Hardware diagnostic self-test: startup checks + continuous monitoring", description=(
"Hardware diagnostic self-test: startup checks + continuous monitoring, "
"telemetry logging, TTS reporting, face alerts"
),
license="MIT", license="MIT",
tests_require=["pytest"], tests_require=["pytest"],
entry_points={ entry_points={


@ -1,26 +1,99 @@
"""Unit tests for diagnostics.""" """Unit tests for diagnostics system."""
import unittest import unittest
import json import json
from datetime import datetime from datetime import datetime
class TestDiagnostics(unittest.TestCase): class TestDiagnostics(unittest.TestCase):
"""Test cases for diagnostics node."""
def test_hardware_check_creation(self): def test_hardware_check_creation(self):
"""Test creation of hardware check results."""
checks = { checks = {
"rplidar": {"status": "OK", "message": "RPLIDAR detected"}, "rplidar": {"status": "OK", "message": "RPLIDAR detected"},
"realsense": {"status": "ERROR", "message": "RealSense not found"}, "realsense": {"status": "ERROR", "message": "RealSense not found"},
"vesc": {"status": "WARN", "message": "VESC connection uncertain"},
} }
self.assertEqual(len(checks), 2)
self.assertEqual(len(checks), 3)
self.assertEqual(checks["rplidar"]["status"], "OK") self.assertEqual(checks["rplidar"]["status"], "OK")
self.assertEqual(checks["realsense"]["status"], "ERROR")
def test_diagnostic_json_logging(self): def test_diagnostic_json_logging(self):
"""Test JSON logging of diagnostics."""
log_data = { log_data = {
"timestamp": datetime.now().isoformat(), "timestamp": datetime.now().isoformat(),
"check_type": "startup", "check_type": "startup_checks",
"hardware_checks": {"rplidar": {"status": "OK", "message": "OK"}}, "hardware_checks": {
"rplidar": {
"status": "OK",
"message": "Device OK",
"details": {"port": "/dev/ttyUSB0"},
},
"realsense": {
"status": "ERROR",
"message": "Device not found",
"details": {},
},
},
"runtime_metrics": {
"gpu_temp": {"status": "OK", "temperature_c": 65.0},
"network_latency": {"status": "WARN", "latency_ms": 150},
},
} }
# Should be JSON serializable
json_str = json.dumps(log_data) json_str = json.dumps(log_data)
parsed = json.loads(json_str) parsed = json.loads(json_str)
self.assertIn("timestamp", parsed) self.assertIn("timestamp", parsed)
self.assertEqual(len(parsed["hardware_checks"]), 2)
self.assertEqual(parsed["hardware_checks"]["rplidar"]["status"], "OK")
def test_temperature_threshold_detection(self):
"""Test temperature threshold detection."""
thresholds = {
"gpu_temp": {"warn": 80, "error": 85},
"vesc_temp": {"warn": 60, "error": 70},
}
test_temps = [
(65, "OK"),
(82, "WARN"),
(88, "ERROR"),
]
for temp, expected_status in test_temps:
if temp > thresholds["gpu_temp"]["error"]:
status = "ERROR"
elif temp > thresholds["gpu_temp"]["warn"]:
status = "WARN"
else:
status = "OK"
self.assertEqual(status, expected_status)
def test_diagnostic_aggregation(self):
"""Test aggregation of multiple diagnostics."""
hardware_checks = {
"rplidar": "OK",
"realsense": "OK",
"vesc": "ERROR",
"wifi": "OK",
"gps": "WARN",
}
errors = [name for name, status in hardware_checks.items() if status == "ERROR"]
warnings = [name for name, status in hardware_checks.items() if status == "WARN"]
ok_items = [name for name, status in hardware_checks.items() if status == "OK"]
self.assertEqual(len(errors), 1)
self.assertEqual(len(warnings), 1)
self.assertEqual(len(ok_items), 3)
self.assertIn("vesc", errors)
self.assertIn("gps", warnings)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
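These tests re-implement the node's logic inline rather than calling it, so they would keep passing even if the node itself regressed. One possible direction, sketched under the assumption that the boot announcement text were factored out of `_announce_boot_result` into a pure function, is shown below. `boot_message` is a hypothetical name, and the function takes a plain name-to-status mapping so that it needs no ROS imports:

```python
from typing import Dict


def boot_message(statuses: Dict[str, str]) -> str:
    """Build the TTS boot announcement from a name -> status mapping.

    Hypothetical refactoring: the strings match those published by
    _announce_boot_result, but this function is not part of the package.
    """
    errors = [name for name, status in statuses.items() if status == "ERROR"]
    if not errors:
        return "Boot complete. All systems online."
    return f"Boot complete with errors. {', '.join(errors)} offline."
```

A unit test could then assert on the returned string directly, without creating a publisher or spinning up a node.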