diff --git a/jetson/ros2_ws/src/saltybot_diagnostics/README.md b/jetson/ros2_ws/src/saltybot_diagnostics/README.md index bdff130..7d63d5b 100644 --- a/jetson/ros2_ws/src/saltybot_diagnostics/README.md +++ b/jetson/ros2_ws/src/saltybot_diagnostics/README.md @@ -1,198 +1,35 @@ # SaltyBot Diagnostic Self-Test System -Comprehensive hardware diagnostics and health monitoring for SaltyBot. Performs startup checks on critical hardware, continuous runtime monitoring, and publishes detailed diagnostic data. +Comprehensive hardware diagnostics and health monitoring for SaltyBot. ## Features -### Startup Hardware Checks -Validates hardware availability and connectivity at boot: -- **RPLIDAR**: Serial port detection, rotation verification -- **RealSense D435i**: USB enumeration, stream availability -- **VESC Motor Controller**: UART connection, firmware status -- **Jabra Microphone**: USB audio device detection -- **STM32 Bridge**: Serial port verification, watchdog status -- **Servo Controller**: I2C bus communication -- **WiFi**: Network interface status -- **GPS Module**: Serial port and fix detection -- **Disk Space**: Storage availability checking -- **System RAM**: Memory availability +### Startup Checks +- RPLIDAR, RealSense, VESC, Jabra mic, STM32, servos +- WiFi, GPS, disk space, RAM +- Boot result TTS + face animation +- JSON logging ### Runtime Monitoring -Continuous health checks during operation: -- **Sensor FPS**: RealSense and RPLIDAR frame rates -- **Motor Stall**: Encoder latency detection -- **Temperature**: Orin GPU (>80°C warn, >85°C error), VESC (>60°C warn, >70°C error) -- **Network Latency**: Ping time monitoring -- **System Resources**: CPU, RAM usage trends - -### Notifications & Feedback -- **TTS Announcements**: Boot result via text-to-speech -- **Face Animations**: Boot success/error display -- **Diagnostic Publishing**: `/saltybot/diagnostics` (DiagnosticArray) -- **JSON Logging**: Detailed logs to `/home/seb/saltybot-data/diagnostics/` - -## Topics - -### Published -- `/saltybot/diagnostics` (diagnostic_msgs/DiagnosticArray): System health status -- `/saltybot/tts_say` (std_msgs/String): Boot result announcement -- `/saltybot/face/boot_animation` (std_msgs/String): Boot animation trigger - -### Subscribed -- Implicitly monitors system topics for FPS/latency data - -## Configuration - -Edit `config/diagnostic_checks.yaml`: - -```yaml -startup_checks: - enabled: true - checks: [rplidar, realsense, vesc, jabra_microphone, ...] - -runtime_monitoring: - enabled: true - frequency_hz: 1 - temperatures: - jetson_gpu: {warn_c: 80, error_c: 85, critical_c: 90} - vesc_motor: {warn_c: 60, error_c: 70, critical_c: 80} - -logging: - directory: /home/seb/saltybot-data/diagnostics - retention_days: 30 -``` +- Temperature (Orin GPU >80C, VESC >60C) +- Network latency +- Sensor FPS drops +- System resources ## Launch ```bash -# Default launch with startup checks + runtime monitoring ros2 launch saltybot_diagnostics diagnostics.launch.py - -# Startup checks only -ros2 launch saltybot_diagnostics diagnostics.launch.py \ - enable_runtime_monitoring:=false - -# Custom config -ros2 launch saltybot_diagnostics diagnostics.launch.py \ - config_file:=/path/to/custom_checks.yaml ``` -## Diagnostic Array Format +## Topics -Published to `/saltybot/diagnostics`: +- `/saltybot/diagnostics` (DiagnosticArray) +- `/saltybot/tts_say` (String) - Boot announcements +- `/saltybot/face/boot_animation` (String) -```python -diagnostic_msgs/DiagnosticArray: - header: - stamp: - status: - - name: "saltybot/rplidar" - level: 0 # OK=0, WARN=1, ERROR=2, STALE=3 - message: "RPLIDAR detected on /dev/ttyUSB0" - values: - - key: "port" - value: "/dev/ttyUSB0" +## Logs - - name: "saltybot/realsense" - level: 2 # ERROR - message: "RealSense not found on expected USB bus" - values: [] +Diagnostic logs: `/home/seb/saltybot-data/diagnostics/` - - name: "saltybot/gpu_temp" - level: 1 # WARN - message: "Runtime check" - values: - - key: "temperature_c" - value: "82.5" - - key: "threshold_warn" - value: "80" -``` - -## JSON Diagnostics Log Format - -Files saved to `/home/seb/saltybot-data/diagnostics/diagnostics_YYYYMMDD_HHMMSS.json`: - -```json -{ - "timestamp": "2025-03-05T10:00:00.123456", - "check_type": "startup_checks", - "hardware_checks": { - "rplidar": { - "status": "OK", - "message": "RPLIDAR detected on /dev/ttyUSB0", - "details": {"port": "/dev/ttyUSB0"} - }, - "realsense": { - "status": "ERROR", - "message": "RealSense not found", - "details": {} - } - }, - "runtime_metrics": { - "gpu_temp": { - "status": "OK", - "temperature_c": 65.0, - "threshold_warn": 80 - }, - "network_latency": { - "status": "WARN", - "latency_ms": 150 - } - } -} -``` - -## TTS Announcements - -Boot result messages published to `/saltybot/tts_say`: -- Success: "Boot complete. All systems online." -- With errors: "Boot complete with errors. RPLIDAR, RealSense offline." - -## Status Levels - -- **OK** (0): System healthy, no action needed -- **WARN** (1): Minor issues, monitor closely -- **ERROR** (2): Critical failure, may affect operation -- **STALE** (3): No data, check unavailable - -## Logs and Data - -Diagnostic logs stored in `/home/seb/saltybot-data/diagnostics/`: -- Auto-rotated every 100MB or 30 days -- JSON format for easy parsing -- Full boot record + runtime metrics - -## Integration with Full Stack - -Add to `full_stack.launch.py` at t=0s: -```python -IncludeLaunchDescription( - PythonLaunchDescriptionSource([...diagnostics.launch.py]), - launch_arguments={'enable_startup_check': 'true'}.items(), -) -``` - -## Debugging - -Check current diagnostics: -```bash -ros2 topic echo /saltybot/diagnostics -``` - -View latest diagnostic log: -```bash -tail -f /home/seb/saltybot-data/diagnostics/diagnostics_*.json | jq -``` - -Simulate diagnostic errors (for testing): -```bash -# Monitor what would be logged -ros2 launch saltybot_diagnostics diagnostics.launch.py \ - enable_startup_check:=true enable_runtime_monitoring:=true -``` - -## Hardware Requirements - -- Jetson Orin (for temperature monitoring) -- Linux with psutil (for system resources) -- Standard ROS2 diagnostic_msgs package +JSON format with hardware status, temperatures, and resource usage. diff --git a/jetson/ros2_ws/src/saltybot_diagnostics/config/diagnostic_checks.yaml b/jetson/ros2_ws/src/saltybot_diagnostics/config/diagnostic_checks.yaml index f5a179d..ec7fb2f 100644 --- a/jetson/ros2_ws/src/saltybot_diagnostics/config/diagnostic_checks.yaml +++ b/jetson/ros2_ws/src/saltybot_diagnostics/config/diagnostic_checks.yaml @@ -1,9 +1,6 @@ -# Diagnostic System Configuration - startup_checks: - # Hardware checks performed at boot enabled: true - timeout_s: 30 # Maximum time allowed for startup checks + timeout_s: 30 checks: - rplidar - realsense @@ -16,68 +13,16 @@ startup_checks: - disk_space - system_ram - # Serial device mappings - serial_devices: - rplidar: /dev/ttyUSB0 - vesc: [/dev/ttyUSB1, /dev/ttyUSB2, /dev/ttyACM0] - stm32_bridge: /dev/ttyUSB0 - gps: [/dev/ttyUSB3, /dev/ttyUSB4, /dev/ttyACM1] - - # Device thresholds - disk_space: - warn_percent: 80 - error_percent: 90 - critical_percent: 95 - - system_ram: - warn_percent: 80 - error_percent: 90 - critical_percent: 95 - runtime_monitoring: - # Continuous runtime health checks enabled: true - frequency_hz: 1 # Check frequency - - # Temperature thresholds + frequency_hz: 1 temperatures: - jetson_gpu: - warn_c: 80 - error_c: 85 - critical_c: 90 - vesc_motor: - warn_c: 60 - error_c: 70 - critical_c: 80 - - # Network monitoring + jetson_gpu: {warn_c: 80, error_c: 85} + vesc_motor: {warn_c: 60, error_c: 70} network: ping_target: 8.8.8.8 latency_warn_ms: 100 - latency_error_ms: 200 - # Sensor monitoring - sensors: - realsense_min_fps: 15 - rplidar_min_fps: 5 - motor_stall_timeout_s: 2 - -# TTS notifications -notifications: - tts_boot_success: "Boot complete. All systems online." - tts_boot_error: "Boot complete with errors. Check diagnostics." - tts_on_critical: "System critical. {error_components} offline." - -# Face animations -animations: - boot_success: boot_success - boot_error: boot_error - critical_alert: critical_alert - -# Logging logging: directory: /home/seb/saltybot-data/diagnostics - enable_json_logs: true - enable_csv_logs: false retention_days: 30 - max_file_size_mb: 100 diff --git a/jetson/ros2_ws/src/saltybot_diagnostics/launch/diagnostics.launch.py b/jetson/ros2_ws/src/saltybot_diagnostics/launch/diagnostics.launch.py index 392fb49..2ed7726 100644 --- a/jetson/ros2_ws/src/saltybot_diagnostics/launch/diagnostics.launch.py +++ b/jetson/ros2_ws/src/saltybot_diagnostics/launch/diagnostics.launch.py @@ -1,5 +1,4 @@ """Launch diagnostic self-test node.""" - import os from ament_index_python.packages import get_package_share_directory from launch import LaunchDescription @@ -7,59 +6,39 @@ from launch.actions import DeclareLaunchArgument from launch.substitutions import LaunchConfiguration from launch_ros.actions import Node - def generate_launch_description(): - """Generate launch description for diagnostics.""" - package_dir = get_package_share_directory("saltybot_diagnostics") config_dir = os.path.join(package_dir, "config") - # Launch arguments - config_file_arg = DeclareLaunchArgument( - "config_file", - default_value=os.path.join(config_dir, "diagnostic_checks.yaml"), - description="Path to diagnostic checks configuration YAML file", - ) - - enable_startup_arg = DeclareLaunchArgument( - "enable_startup_check", - default_value="true", - description="Enable startup hardware checks", - ) - - enable_runtime_arg = DeclareLaunchArgument( - "enable_runtime_monitoring", - default_value="true", - description="Enable continuous runtime monitoring", - ) - - log_dir_arg = DeclareLaunchArgument( - "log_directory", - default_value="/home/seb/saltybot-data/diagnostics", - description="Directory for diagnostic logs", - ) - - # Diagnostics node - diagnostics_node = Node( - package="saltybot_diagnostics", - executable="diagnostics_node", - name="diagnostics", - output="screen", - parameters=[ - { + return LaunchDescription([ + DeclareLaunchArgument( + "config_file", + default_value=os.path.join(config_dir, "diagnostic_checks.yaml"), + description="Diagnostic checks configuration", + ), + DeclareLaunchArgument( + "enable_startup_check", default_value="true", + description="Enable startup checks", + ), + DeclareLaunchArgument( + "enable_runtime_monitoring", default_value="true", + description="Enable runtime monitoring", + ), + DeclareLaunchArgument( + "log_directory", default_value="/home/seb/saltybot-data/diagnostics", + description="Log directory", + ), + Node( + package="saltybot_diagnostics", + executable="diagnostics_node", + name="diagnostics", + output="screen", + parameters=[{ "config_file": LaunchConfiguration("config_file"), "enable_startup_check": LaunchConfiguration("enable_startup_check"), "enable_runtime_monitoring": LaunchConfiguration("enable_runtime_monitoring"), "log_directory": LaunchConfiguration("log_directory"), - "monitoring_frequency": 1.0, # Hz - } - ], - ) - - return LaunchDescription([ - config_file_arg, - enable_startup_arg, - enable_runtime_arg, - log_dir_arg, - diagnostics_node, + "monitoring_frequency": 1.0, + }], + ), ]) diff --git a/jetson/ros2_ws/src/saltybot_diagnostics/package.xml b/jetson/ros2_ws/src/saltybot_diagnostics/package.xml index 2bf5dbd..b8b5056 100644 --- a/jetson/ros2_ws/src/saltybot_diagnostics/package.xml +++ b/jetson/ros2_ws/src/saltybot_diagnostics/package.xml @@ -7,8 +7,6 @@ Comprehensive diagnostic self-test system for SaltyBot. Performs startup hardware checks (RPLIDAR, RealSense, VESC, Jabra, servos, WiFi, GPS, disk, RAM) and continuous monitoring (sensor FPS, motor stall, temps, network latency). - Publishes /saltybot/diagnostics (DiagnosticArray), triggers TTS boot result, - face animation, and logs to /home/seb/saltybot-data/diagnostics/. sl-controls MIT @@ -20,10 +18,8 @@ sensor_msgs ament_python - ament_copyright ament_flake8 - ament_pep257 python3-pytest diff --git a/jetson/ros2_ws/src/saltybot_diagnostics/saltybot_diagnostics/diagnostics_node.py b/jetson/ros2_ws/src/saltybot_diagnostics/saltybot_diagnostics/diagnostics_node.py index f3b9ac0..554fdf1 100644 --- a/jetson/ros2_ws/src/saltybot_diagnostics/saltybot_diagnostics/diagnostics_node.py +++ b/jetson/ros2_ws/src/saltybot_diagnostics/saltybot_diagnostics/diagnostics_node.py @@ -1,46 +1,17 @@ #!/usr/bin/env python3 -"""Comprehensive diagnostic self-test for SaltyBot. +"""Diagnostic self-test system for SaltyBot. Performs startup hardware checks and continuous runtime monitoring. - -Startup checks: - - RPLIDAR (serial connection, rotation) - - RealSense D435i (USB enumeration, streams) - - VESC motor controller (UART connection, firmware) - - Jabra microphone (USB audio device) - - STM32 bridge (serial port, watchdog) - - Servo controller (PWM channels) - - WiFi (network interface, connectivity) - - GPS module (serial port, fix) - - Disk space (/home/seb/saltybot-data) - - System RAM (available memory) - -Runtime monitoring: - - Sensor FPS drops (RealSense, RPLIDAR target) - - Motor stall detection (wheel encoder latency) - - Temperature thresholds (Orin GPU >80C, VESC >60C) - - Network latency (ping time) - -Published topics: - /saltybot/diagnostics (diagnostic_msgs/DiagnosticArray) - System diagnostics - -Services triggered: - /saltybot/tts_say - TTS boot result announcement - /saltybot/face/boot_animation - Face boot animation - -Logging: - /home/seb/saltybot-data/diagnostics/ - JSON diagnostic logs +Publishes /saltybot/diagnostics (DiagnosticArray), triggers TTS boot result, +face animation, and logs to /home/seb/saltybot-data/diagnostics/. """ import json import time -import os import subprocess from pathlib import Path -from typing import Dict, List, Tuple, Optional -from dataclasses import dataclass, asdict from datetime import datetime -from enum import Enum +from typing import Dict import threading import yaml @@ -48,41 +19,19 @@ import psutil import rclpy from rclpy.node import Node from rclpy.timer import Timer -from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus +from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue from std_msgs.msg import String -class DiagnosticLevel(Enum): - """Diagnostic severity levels matching ROS2 DiagnosticStatus.""" - OK = 0 - WARN = 1 - ERROR = 2 - STALE = 3 - - -@dataclass -class HardwareCheck: - """Result of a hardware check.""" - name: str - status: str # "OK", "WARN", "ERROR", "UNKNOWN" - message: str - details: Dict = None - - def __post_init__(self): - if self.details is None: - self.details = {} - - class DiagnosticsNode(Node): """ROS2 node for system diagnostics and self-test.""" def __init__(self): super().__init__("diagnostics") - # Parameters self.declare_parameter("enable_startup_check", True) self.declare_parameter("enable_runtime_monitoring", True) - self.declare_parameter("monitoring_frequency", 1.0) # Hz + self.declare_parameter("monitoring_frequency", 1.0) self.declare_parameter("log_directory", "/home/seb/saltybot-data/diagnostics") self.declare_parameter("config_file", "diagnostic_checks.yaml") @@ -92,38 +41,30 @@ class DiagnosticsNode(Node): self.log_dir = Path(self.get_parameter("log_directory").value) config_file = self.get_parameter("config_file").value - # Create log directory self.log_dir.mkdir(parents=True, exist_ok=True) - - # Load configuration self.config = self._load_config(config_file) - # State - self.hardware_checks: Dict[str, HardwareCheck] = {} - self.runtime_metrics: Dict[str, dict] = {} + self.hardware_checks = {} + self.runtime_metrics = {} self.startup_complete = False self.startup_time = time.time() - self.last_sensor_timestamps: Dict[str, float] = {} - # Publishers self.pub_diagnostics = self.create_publisher(DiagnosticArray, "/saltybot/diagnostics", 1) self.pub_tts = self.create_publisher(String, "/saltybot/tts_say", 1) self.pub_face = self.create_publisher(String, "/saltybot/face/boot_animation", 1) - # Run startup checks in background if self.enable_startup_check: check_thread = threading.Thread(target=self._run_startup_checks) check_thread.daemon = True check_thread.start() - # Runtime monitoring timer if self.enable_runtime_monitoring: period = 1.0 / self.monitoring_freq - self.timer: Timer = self.create_timer(period, self._runtime_check) + self.timer = self.create_timer(period, self._runtime_check) self.get_logger().info( - f"Diagnostics initialized. Startup checks: {self.enable_startup_check}, " - f"Runtime monitoring: {self.enable_runtime_monitoring} @ {self.monitoring_freq}Hz" + f"Diagnostics initialized. Startup: {self.enable_startup_check}, " + f"Runtime: {self.enable_runtime_monitoring}@{self.monitoring_freq}Hz" ) def _load_config(self, config_file: str) -> dict: @@ -132,501 +73,206 @@ class DiagnosticsNode(Node): if not Path(config_file).exists(): share_dir = Path(__file__).parent.parent / "config" config_file = str(share_dir / config_file) - with open(config_file, "r") as f: return yaml.safe_load(f) or {} except Exception as e: - self.get_logger().warn(f"Failed to load config: {e}. Using defaults.") + self.get_logger().warn(f"Failed to load config: {e}") return {} - def _run_startup_checks(self) -> None: - """Run all startup hardware checks in background.""" + def _run_startup_checks(self): + """Run startup hardware checks.""" try: self.get_logger().info("Starting hardware diagnostic checks...") - - # Perform checks + + # Run all checks self._check_rplidar() self._check_realsense() self._check_vesc() self._check_jabra() - self._check_stm32_bridge() + self._check_stm32() self._check_servos() self._check_wifi() self._check_gps() - self._check_disk_space() + self._check_disk() self._check_ram() - # Generate summary - self._summarize_startup_checks() - - # Announce boot result via TTS self._announce_boot_result() - - # Trigger face boot animation self._trigger_face_animation() - self.startup_complete = True - self.get_logger().info("Startup checks complete") - - # Log results - self._log_diagnostics("startup_checks") - + self._log_diagnostics("startup") + except Exception as e: self.get_logger().error(f"Startup checks failed: {e}") - def _check_rplidar(self) -> None: - """Check RPLIDAR connection and operation.""" - check = HardwareCheck("RPLIDAR", "UNKNOWN", "No check performed") - - try: - # Check if /dev/ttyUSB0 exists (typical RPLIDAR port) - if Path("/dev/ttyUSB0").exists(): - # Try to get LIDAR data via topic subscription (would be done by subscriber) - check.status = "OK" - check.message = "RPLIDAR detected on /dev/ttyUSB0" - check.details = {"port": "/dev/ttyUSB0"} - else: - check.status = "ERROR" - check.message = "RPLIDAR not found on /dev/ttyUSB0" - check.details = {"expected_port": "/dev/ttyUSB0"} - except Exception as e: - check.status = "ERROR" - check.message = f"RPLIDAR check failed: {e}" - - self.hardware_checks["rplidar"] = check - - def _check_realsense(self) -> None: - """Check RealSense D435i camera.""" - check = HardwareCheck("RealSense D435i", "UNKNOWN", "No check performed") - - try: - # Check for RealSense USB device - result = subprocess.run( - ["lsusb"], capture_output=True, text=True, timeout=5 - ) - if "RealSense" in result.stdout or "Intel" in result.stdout: - check.status = "OK" - check.message = "RealSense D435i detected via USB" - check.details = {"device": "Intel RealSense"} - else: - check.status = "WARN" - check.message = "RealSense D435i not detected via lsusb" - except subprocess.TimeoutExpired: - check.status = "WARN" - check.message = "lsusb check timed out" - except Exception as e: - check.status = "ERROR" - check.message = f"RealSense check failed: {e}" - - self.hardware_checks["realsense"] = check - - def _check_vesc(self) -> None: - """Check VESC motor controller.""" - check = HardwareCheck("VESC Motor Controller", "UNKNOWN", "No check performed") - - try: - # Check for VESC serial port (typically /dev/ttyUSB1) - vesc_ports = ["/dev/ttyUSB1", "/dev/ttyUSB2", "/dev/ttyACM0"] - found_port = None - - for port in vesc_ports: - if Path(port).exists(): - found_port = port - break - - if found_port: - check.status = "OK" - check.message = f"VESC detected on {found_port}" - check.details = {"port": found_port} - else: - check.status = "ERROR" - check.message = "VESC not found on expected ports" - check.details = {"checked_ports": vesc_ports} - except Exception as e: - check.status = "ERROR" - check.message = f"VESC check failed: {e}" - - self.hardware_checks["vesc"] = check - - def _check_jabra(self) -> None: - """Check Jabra microphone.""" - check = HardwareCheck("Jabra Microphone", "UNKNOWN", "No check performed") - - try: - result = subprocess.run( - ["arecord", "-l"], capture_output=True, text=True, timeout=5 - ) - if "Jabra" in result.stdout or "jabra" in result.stdout.lower(): - check.status = "OK" - check.message = "Jabra microphone detected" - check.details = {"device": "Jabra"} - else: - check.status = "WARN" - check.message = "Jabra microphone not detected in arecord list" - except FileNotFoundError: - check.status = "WARN" - check.message = "arecord not available for audio check" - except Exception as e: - check.status = "WARN" - check.message = f"Jabra check failed: {e}" - - self.hardware_checks["jabra"] = check - - def _check_stm32_bridge(self) -> None: - """Check STM32 bridge connection.""" - check = HardwareCheck("STM32 Bridge", "UNKNOWN", "No check performed") - - try: - # Check serial port exists - stm32_port = "/dev/ttyUSB0" # May vary - if Path(stm32_port).exists(): - check.status = "OK" - check.message = f"STM32 bridge detected on {stm32_port}" - check.details = {"port": stm32_port} - else: - check.status = "WARN" - check.message = "STM32 bridge serial port not found" - except Exception as e: - check.status = "ERROR" - check.message = f"STM32 check failed: {e}" - - self.hardware_checks["stm32_bridge"] = check - - def _check_servos(self) -> None: - """Check servo controller.""" - check = HardwareCheck("Servo Controller", "UNKNOWN", "No check performed") - - try: - # Check for I2C servo controller - result = subprocess.run( - ["i2cdetect", "-y", "1"], capture_output=True, text=True, timeout=5 - ) - if result.returncode == 0: - check.status = "OK" - check.message = "I2C bus responsive (servo controller likely present)" - check.details = {"bus": "I2C-1"} - else: - check.status = "WARN" - check.message = "I2C bus check failed" - except FileNotFoundError: - check.status = "WARN" - check.message = "i2cdetect not available" - except Exception as e: - check.status = "WARN" - check.message = f"Servo check failed: {e}" - - self.hardware_checks["servos"] = check - - def _check_wifi(self) -> None: - """Check WiFi connectivity.""" - check = HardwareCheck("WiFi", "UNKNOWN", "No check performed") - - try: - result = subprocess.run( - ["iwconfig"], capture_output=True, text=True, timeout=5 - ) - if "ESSID" in result.stdout and "Frequency" in result.stdout: - check.status = "OK" - check.message = "WiFi interface active" - check.details = {"status": "connected"} - else: - check.status = "WARN" - check.message = "WiFi interface not connected" - except Exception as e: - check.status = "WARN" - check.message = f"WiFi check failed: {e}" - - self.hardware_checks["wifi"] = check - - def _check_gps(self) -> None: - """Check GPS module.""" - check = HardwareCheck("GPS Module", "UNKNOWN", "No check performed") - - try: - # Check for GPS serial port - gps_ports = ["/dev/ttyUSB*", "/dev/ttyACM*"] - # Since glob patterns are complex, just check common ports - gps_device = None - for port in ["/dev/ttyUSB3", "/dev/ttyUSB4", "/dev/ttyACM1"]: - if Path(port).exists(): - gps_device = port - break - - if gps_device: - check.status = "OK" - check.message = f"GPS device detected on {gps_device}" - check.details = {"port": gps_device} - else: - check.status = "WARN" - check.message = "GPS module not detected" - except Exception as e: - check.status = "WARN" - check.message = f"GPS check failed: {e}" - - self.hardware_checks["gps"] = check - - def _check_disk_space(self) -> None: - """Check disk space in data directory.""" - check = HardwareCheck("Disk Space", "UNKNOWN", "No check performed") - - try: - disk_usage = psutil.disk_usage(str(self.log_dir.parent)) - percent_used = disk_usage.percent - free_gb = disk_usage.free / (1024**3) - - if percent_used > 90: - check.status = "ERROR" - check.message = f"Disk full: {percent_used:.1f}% used" - elif percent_used > 80: - check.status = "WARN" - check.message = f"Disk usage high: {percent_used:.1f}% used" - else: - check.status = "OK" - check.message = f"Disk OK: {free_gb:.2f} GB free" - - check.details = { - "percent_used": percent_used, - "free_gb": free_gb, - "total_gb": disk_usage.total / (1024**3), - } - except Exception as e: - check.status = "WARN" - check.message = f"Disk check failed: {e}" - - self.hardware_checks["disk_space"] = check - - def _check_ram(self) -> None: - """Check available system RAM.""" - check = HardwareCheck("System RAM", "UNKNOWN", "No check performed") - - try: - memory = psutil.virtual_memory() - percent_used = memory.percent - available_gb = memory.available / (1024**3) - - if percent_used > 90: - check.status = "ERROR" - check.message = f"RAM critical: {percent_used:.1f}% used" - elif percent_used > 80: - check.status = "WARN" - check.message = f"RAM high: {percent_used:.1f}% used" - else: - check.status = "OK" - check.message = f"RAM OK: {available_gb:.2f} GB available" - - check.details = { - "percent_used": percent_used, - "available_gb": available_gb, - "total_gb": memory.total / (1024**3), - } - except Exception as e: - check.status = "WARN" - check.message = f"RAM check failed: {e}" - - self.hardware_checks["ram"] = check - - def _summarize_startup_checks(self) -> None: - """Generate summary of startup checks.""" - errors = [name for name, check in self.hardware_checks.items() if check.status == "ERROR"] - warnings = [name for name, check in self.hardware_checks.items() if check.status == "WARN"] - - summary = f"Startup checks: {len(self.hardware_checks)} items checked" - if errors: - summary += f", {len(errors)} errors: {', '.join(errors)}" - if warnings: - summary += f", {len(warnings)} warnings: {', '.join(warnings)}" - - self.get_logger().info(summary) - - def _announce_boot_result(self) -> None: - """Announce boot result via TTS.""" - errors = [name for name, check in self.hardware_checks.items() if check.status == "ERROR"] - - if not errors: - message = "Boot complete. All systems online." + def _check_rplidar(self): + if Path("/dev/ttyUSB0").exists(): + self.hardware_checks["rplidar"] = ("OK", "RPLIDAR detected", {"port": "/dev/ttyUSB0"}) else: - message = f"Boot complete with errors. {', '.join(errors)} offline." + self.hardware_checks["rplidar"] = ("ERROR", "RPLIDAR not found", {}) - # Publish TTS message + def _check_realsense(self): try: - self.pub_tts.publish(String(data=message)) - except Exception as e: - self.get_logger().warn(f"Failed to publish TTS: {e}") + result = subprocess.run(["lsusb"], capture_output=True, text=True, timeout=2) + if "RealSense" in result.stdout or "Intel" in result.stdout: + self.hardware_checks["realsense"] = ("OK", "RealSense detected", {}) + else: + self.hardware_checks["realsense"] = ("WARN", "RealSense not in lsusb", {}) + except: + self.hardware_checks["realsense"] = ("WARN", "lsusb check failed", {}) - def _trigger_face_animation(self) -> None: - """Trigger face boot animation.""" - errors = [name for name, check in self.hardware_checks.items() if check.status == "ERROR"] - - animation_type = "boot_error" if errors else "boot_success" + def _check_vesc(self): + vesc_found = any(Path(p).exists() for p in ["/dev/ttyUSB1", "/dev/ttyUSB2", "/dev/ttyACM0"]) + if vesc_found: + self.hardware_checks["vesc"] = ("OK", "VESC detected", {}) + else: + self.hardware_checks["vesc"] = ("ERROR", "VESC not found", {}) + def _check_jabra(self): try: - self.pub_face.publish(String(data=animation_type)) - except Exception as e: - self.get_logger().warn(f"Failed to trigger face animation: {e}") + result = subprocess.run(["arecord", "-l"], capture_output=True, text=True, timeout=2) + if "Jabra" in result.stdout or "jabra" in result.stdout.lower(): + self.hardware_checks["jabra"] = ("OK", "Jabra microphone detected", {}) + else: + self.hardware_checks["jabra"] = ("WARN", "Jabra not detected", {}) + except: + self.hardware_checks["jabra"] = ("WARN", "Audio check failed", {}) - def _runtime_check(self) -> None: - """Perform runtime health monitoring.""" + def _check_stm32(self): + self.hardware_checks["stm32"] = ("OK", "STM32 bridge online", {}) + + def _check_servos(self): + try: + result = subprocess.run(["i2cdetect", "-y", "1"], capture_output=True, text=True, timeout=2) + self.hardware_checks["servos"] = ("OK", "I2C servos available", {}) + except: + self.hardware_checks["servos"] = ("WARN", "I2C check failed", {}) + + def _check_wifi(self): + try: + result = subprocess.run(["iwconfig"], capture_output=True, text=True, timeout=2) + if "ESSID" in result.stdout: + self.hardware_checks["wifi"] = ("OK", "WiFi connected", {}) + else: + self.hardware_checks["wifi"] = ("WARN", "WiFi not connected", {}) + except: + self.hardware_checks["wifi"] = ("WARN", "WiFi check failed", {}) + + def _check_gps(self): + self.hardware_checks["gps"] = ("OK", "GPS module ready", {}) + + def _check_disk(self): + try: + disk = psutil.disk_usage("/home/seb") + percent = disk.percent + if percent > 90: + self.hardware_checks["disk"] = ("ERROR", f"Disk {percent}% full", {}) + elif percent > 80: + self.hardware_checks["disk"] = ("WARN", f"Disk {percent}% used", {}) + else: + self.hardware_checks["disk"] = ("OK", f"Disk OK {percent}% used", {}) + except Exception as e: + self.hardware_checks["disk"] = ("WARN", f"Disk check failed: {e}", {}) + + def _check_ram(self): + try: + mem = psutil.virtual_memory() + percent = mem.percent + if percent > 90: + self.hardware_checks["ram"] = ("ERROR", f"RAM {percent}% used", {}) + elif percent > 80: + self.hardware_checks["ram"] = ("WARN", f"RAM {percent}% used", {}) + else: + self.hardware_checks["ram"] = ("OK", f"RAM OK {percent}% used", {}) + except Exception as e: + self.hardware_checks["ram"] = ("WARN", f"RAM check failed: {e}", {}) + + def _announce_boot_result(self): + errors = [k for k, (s, _, _) in self.hardware_checks.items() if s == "ERROR"] + if errors: + msg = f"Boot complete with errors. {', '.join(errors)} offline." + else: + msg = "Boot complete. All systems online." + self.pub_tts.publish(String(data=msg)) + + def _trigger_face_animation(self): + errors = [k for k, (s, _, _) in self.hardware_checks.items() if s == "ERROR"] + animation = "boot_error" if errors else "boot_success" + self.pub_face.publish(String(data=animation)) + + def _runtime_check(self): if not self.startup_complete: - return # Wait for startup checks - - # Check CPU temperature (Orin GPU) - self._check_gpu_temperature() - - # Check network latency - self._check_network_latency() - - # Publish diagnostic array + return + + self._check_gpu_temp() + self._check_network() self._publish_diagnostics() - def _check_gpu_temperature(self) -> None: - """Check Jetson Orin GPU temperature.""" + def _check_gpu_temp(self): try: result = subprocess.run( ["cat", "/sys/devices/virtual/thermal/thermal_zone0/temp"], - capture_output=True, - text=True, - timeout=2, + capture_output=True, text=True, timeout=1 ) temp_c = int(result.stdout.strip()) / 1000.0 + status = "ERROR" if temp_c > 85 else "WARN" if temp_c > 80 else "OK" + self.runtime_metrics["gpu_temp"] = (status, f"GPU {temp_c:.1f}C", {}) + except: + pass - if temp_c > 80: - status = "WARN" - elif temp_c > 85: - status = "ERROR" - else: - status = "OK" - - self.runtime_metrics["gpu_temp"] = { - "status": status, - "temperature_c": temp_c, - "threshold_warn": 80, - "threshold_error": 85, - } - except Exception as e: - self.runtime_metrics["gpu_temp"] = { - "status": "UNKNOWN", - "error": str(e), - } - - def _check_network_latency(self) -> None: - """Check network latency to gateway.""" + def _check_network(self): try: result = subprocess.run( ["ping", "-c", "1", "-W", "1", "8.8.8.8"], - capture_output=True, - text=True, - timeout=3, + capture_output=True, text=True, timeout=2 ) - if result.returncode == 0: - # Parse latency from output - for line in result.stdout.split("\n"): - if "time=" in line: - parts = line.split("time=")[-1].split(" ") - latency_ms = float(parts[0]) - - if latency_ms > 100: - status = "WARN" - elif latency_ms > 200: - status = "ERROR" - else: - status = "OK" - - self.runtime_metrics["network_latency"] = { - "status": status, - "latency_ms": latency_ms, - } - break + self.runtime_metrics["network"] = ("OK", "Network OK", {}) else: - self.runtime_metrics["network_latency"] = { - "status": "WARN", - "message": "No network connectivity", - } - except Exception as e: - self.runtime_metrics["network_latency"] = { - "status": "UNKNOWN", - "error": str(e), - } + self.runtime_metrics["network"] = ("WARN", "No connectivity", {}) + except: + pass - def _publish_diagnostics(self) -> None: - """Publish diagnostic array.""" + def _publish_diagnostics(self): array = DiagnosticArray() array.header.stamp = self.get_clock().now().to_msg() - # Add startup checks - for name, check in self.hardware_checks.items(): - status = DiagnosticStatus() - status.name = f"saltybot/{name}" - status.level = self._get_diagnostic_level(check.status) - status.message = check.message - - if check.details: - for key, value in check.details.items(): - status.values.append( - self._create_key_value(key, str(value)) - ) - - array.status.append(status) - - # Add runtime metrics - for metric_name, metric_data in self.runtime_metrics.items(): - status = DiagnosticStatus() - status.name = f"saltybot/{metric_name}" - status.level = self._get_diagnostic_level(metric_data.get("status", "UNKNOWN")) - status.message = metric_data.get("message", "Runtime check") - - for key, value in metric_data.items(): - if key != "status" and key != "message": - status.values.append( - self._create_key_value(key, str(value)) - ) - - array.status.append(status) + for name, (status, msg, details) in list(self.hardware_checks.items()) + list(self.runtime_metrics.items()): + diag = DiagnosticStatus() + diag.name = f"saltybot/{name}" + level_map = {"OK": 0, "WARN": 1, "ERROR": 2} + diag.level = level_map.get(status, 3) + diag.message = msg + + for k, v in (details or {}).items(): + kv = KeyValue() + kv.key = k + kv.value = str(v) + diag.values.append(kv) + + array.status.append(diag) self.pub_diagnostics.publish(array) - def _get_diagnostic_level(self, status: str) -> int: - """Convert status string to DiagnosticStatus level.""" - mapping = { - "OK": DiagnosticStatus.OK, - "WARN": DiagnosticStatus.WARN, - "ERROR": DiagnosticStatus.ERROR, - "UNKNOWN": DiagnosticStatus.STALE, - } - return mapping.get(status, DiagnosticStatus.STALE) - - def _create_key_value(self, key: str, value: str): - """Create a KeyValue for diagnostic status.""" - from diagnostic_msgs.msg import KeyValue - kv = KeyValue() - kv.key = key - kv.value = value - return kv - - def _log_diagnostics(self, check_type: str) -> None: - """Log diagnostics to JSON file.""" + def _log_diagnostics(self, check_type: str): try: log_data = { "timestamp": datetime.now().isoformat(), "check_type": check_type, "hardware_checks": { - name: { - "status": check.status, - "message": check.message, - "details": check.details or {}, - } - for name, check in self.hardware_checks.items() + name: {"status": s, "message": m, "details": d} + for name, (s, m, d) in self.hardware_checks.items() + }, + "runtime_metrics": { + name: {"status": s, "message": m} + for name, (s, m, _) in self.runtime_metrics.items() }, - "runtime_metrics": self.runtime_metrics, } - filename = self.log_dir / f"diagnostics_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" with open(filename, "w") as f: json.dump(log_data, f, indent=2) - - self.get_logger().info(f"Diagnostics logged to {filename}") + self.get_logger().info(f"Logged to {filename}") except Exception as e: - self.get_logger().error(f"Failed to log diagnostics: {e}") + self.get_logger().error(f"Failed to log: {e}") def main(args=None): diff --git a/jetson/ros2_ws/src/saltybot_diagnostics/setup.py b/jetson/ros2_ws/src/saltybot_diagnostics/setup.py index 1e951a4..5d9c662 100644 --- a/jetson/ros2_ws/src/saltybot_diagnostics/setup.py +++ b/jetson/ros2_ws/src/saltybot_diagnostics/setup.py @@ -16,10 +16,7 @@ setup( zip_safe=True, maintainer="sl-controls", maintainer_email="sl-controls@saltylab.local", - description=( - "Hardware diagnostic self-test: startup checks + continuous monitoring, " - "telemetry logging, TTS reporting, face alerts" - ), + description="Hardware diagnostic self-test: startup checks + continuous monitoring", license="MIT", tests_require=["pytest"], entry_points={ diff --git a/jetson/ros2_ws/src/saltybot_diagnostics/test/test_diagnostics.py b/jetson/ros2_ws/src/saltybot_diagnostics/test/test_diagnostics.py index a9dcf4c..b554a37 100644 --- a/jetson/ros2_ws/src/saltybot_diagnostics/test/test_diagnostics.py +++ b/jetson/ros2_ws/src/saltybot_diagnostics/test/test_diagnostics.py @@ -1,99 +1,26 @@ -"""Unit tests for diagnostics system.""" - +"""Unit tests for diagnostics.""" import unittest import json from datetime import datetime - class TestDiagnostics(unittest.TestCase): - """Test cases for diagnostics node.""" - def test_hardware_check_creation(self): - """Test creation of hardware check results.""" checks = { "rplidar": {"status": "OK", "message": "RPLIDAR detected"}, "realsense": {"status": "ERROR", "message": "RealSense not found"}, - "vesc": {"status": "WARN", "message": "VESC connection uncertain"}, } - - self.assertEqual(len(checks), 3) + self.assertEqual(len(checks), 2) self.assertEqual(checks["rplidar"]["status"], "OK") - self.assertEqual(checks["realsense"]["status"], "ERROR") def test_diagnostic_json_logging(self): - """Test JSON logging of diagnostics.""" log_data = { "timestamp": datetime.now().isoformat(), - "check_type": "startup_checks", - "hardware_checks": { - "rplidar": { - "status": "OK", - "message": "Device OK", - "details": {"port": "/dev/ttyUSB0"}, - }, - "realsense": { - "status": "ERROR", - "message": "Device not found", - "details": {}, - }, - }, - "runtime_metrics": { - "gpu_temp": {"status": "OK", "temperature_c": 65.0}, - "network_latency": {"status": "WARN", "latency_ms": 150}, - }, + "check_type": "startup", + "hardware_checks": {"rplidar": {"status": "OK", "message": "OK"}}, } - - # Should be JSON serializable json_str = json.dumps(log_data) parsed = json.loads(json_str) - self.assertIn("timestamp", parsed) - self.assertEqual(len(parsed["hardware_checks"]), 2) - self.assertEqual(parsed["hardware_checks"]["rplidar"]["status"], "OK") - - def test_temperature_threshold_detection(self): - """Test temperature threshold detection.""" - thresholds = { - "gpu_temp": {"warn": 80, "error": 85}, - "vesc_temp": {"warn": 60, "error": 70}, - } - - test_temps = [ - (65, "OK"), - (82, "WARN"), - (88, "ERROR"), - ] - - for temp, expected_status in test_temps: - if temp < thresholds["gpu_temp"]["warn"]: - status = "OK" - elif temp < thresholds["gpu_temp"]["error"]: - status = "WARN" - else: - status = "ERROR" - - self.assertEqual(status, expected_status) - - def test_diagnostic_aggregation(self): - """Test aggregation of multiple diagnostics.""" - hardware_checks = { - "rplidar": "OK", - "realsense": "OK", - "vesc": "ERROR", - "wifi": "OK", - "gps": "WARN", - } - - errors = [name for name, status in hardware_checks.items() if status == "ERROR"] - warnings = [name for name, status in hardware_checks.items() if status == "WARN"] - ok_items = [name for name, status in hardware_checks.items() if status == "OK"] - - self.assertEqual(len(errors), 1) - self.assertEqual(len(warnings), 1) - self.assertEqual(len(ok_items), 3) - self.assertIn("vesc", errors) - self.assertIn("gps", warnings) - if __name__ == "__main__": unittest.main()