Create saltybot_tests package with comprehensive automated testing: Test Coverage: - Node startup verification (all critical nodes within 30s) - Topic publishing verification - TF tree completeness (all transforms present) - Sensor health checks (RPLIDAR, RealSense, IMU) - Perception pipeline (person detection availability) - Navigation stack (odometry, transforms) - System stability (30-second no-crash test) - Graceful shutdown verification Features: - launch_testing framework for automated startup tests - NodeChecker: wait for nodes in ROS graph - TFChecker: verify TF tree completeness - TopicMonitor: track message rates and counts - Follow mode tests (minimal hardware deps) - Subsystem-specific tests for sensor health - Comprehensive README with troubleshooting Usage: pytest src/saltybot_tests/test/test_launch.py -v -s or colcon test --packages-select saltybot_tests Performance Targets: - Node startup: <30s (follow mode) - RPLIDAR: 10 Hz scan rate - RealSense: 30 Hz RGB + depth - Person detection: 5 Hz - System stability: 30s no-crash validation Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
198 lines
5.8 KiB
Python
198 lines
5.8 KiB
Python
"""test_subsystems.py — Detailed subsystem integration tests.
|
|
|
|
Tests individual subsystems:
|
|
- Sensor health (RPLIDAR, RealSense, IMU)
|
|
- Perception pipeline (person detection)
|
|
- Navigation (odometry, TF tree)
|
|
- Communication (rosbridge connectivity)
|
|
|
|
These tests run AFTER test_launch.py (which verifies the stack started).
|
|
"""
|
|
|
|
import time
|
|
import unittest
|
|
|
|
import pytest
|
|
import rclpy
|
|
from sensor_msgs.msg import LaserScan, Image, Imu
|
|
from nav_msgs.msg import Odometry
|
|
from geometry_msgs.msg import PoseStamped
|
|
|
|
from saltybot_tests.test_helpers import NodeChecker, TopicMonitor
|
|
|
|
|
|
class TestSensorHealth(unittest.TestCase):
|
|
"""Verify all sensor inputs are healthy."""
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
"""Set up test fixtures."""
|
|
rclpy.init()
|
|
cls.node = rclpy.create_node("sensor_health_test")
|
|
cls.topic_monitor = TopicMonitor(cls.node)
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
"""Clean up test fixtures."""
|
|
cls.topic_monitor.cleanup()
|
|
cls.node.destroy_node()
|
|
rclpy.shutdown()
|
|
|
|
def test_lidar_health(self):
|
|
"""LIDAR must publish scan data at ~10 Hz."""
|
|
self.topic_monitor.subscribe_to_topic("/scan", LaserScan)
|
|
|
|
# Let it collect for 2 seconds
|
|
deadline = time.time() + 2.0
|
|
while time.time() < deadline:
|
|
rclpy.spin_once(self.node, timeout_sec=0.1)
|
|
|
|
count = self.topic_monitor.get_message_count("/scan")
|
|
self.assertGreater(
|
|
count, 10,
|
|
f"LIDAR published only {count} messages in 2s (expected ~20 at 10 Hz)"
|
|
)
|
|
|
|
def test_realsense_rgb_publishing(self):
|
|
"""RealSense RGB camera must publish frames."""
|
|
self.topic_monitor.subscribe_to_topic("/camera/color/image_raw", Image)
|
|
|
|
deadline = time.time() + 3.0
|
|
while time.time() < deadline:
|
|
rclpy.spin_once(self.node, timeout_sec=0.1)
|
|
|
|
count = self.topic_monitor.get_message_count("/camera/color/image_raw")
|
|
self.assertGreater(
|
|
count, 0,
|
|
"RealSense RGB camera not publishing"
|
|
)
|
|
|
|
def test_imu_publishing(self):
|
|
"""IMU must publish data continuously."""
|
|
self.topic_monitor.subscribe_to_topic("/saltybot/imu", Imu)
|
|
|
|
deadline = time.time() + 2.0
|
|
while time.time() < deadline:
|
|
rclpy.spin_once(self.node, timeout_sec=0.1)
|
|
|
|
count = self.topic_monitor.get_message_count("/saltybot/imu")
|
|
self.assertGreater(
|
|
count, 0,
|
|
"IMU not publishing"
|
|
)
|
|
|
|
|
|
class TestPerceptionPipeline(unittest.TestCase):
|
|
"""Verify perception (person detection) is working."""
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
"""Set up test fixtures."""
|
|
rclpy.init()
|
|
cls.node = rclpy.create_node("perception_test")
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
"""Clean up test fixtures."""
|
|
cls.node.destroy_node()
|
|
rclpy.shutdown()
|
|
|
|
def test_perception_node_alive(self):
|
|
"""Perception node must be alive."""
|
|
node_names = [n for n, _ in self.node.get_node_names()]
|
|
self.assertIn(
|
|
"yolo_node",
|
|
node_names,
|
|
"YOLOv8 perception node not found"
|
|
)
|
|
|
|
def test_detection_topic_advertised(self):
|
|
"""Person detection topic must be advertised."""
|
|
topics = {name for name, _ in self.node.get_topic_names_and_types()}
|
|
self.assertIn(
|
|
"/person/detections",
|
|
topics,
|
|
"Person detection topic not advertised"
|
|
)
|
|
|
|
|
|
class TestNavigationStack(unittest.TestCase):
|
|
"""Verify navigation subsystem is healthy."""
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
"""Set up test fixtures."""
|
|
rclpy.init()
|
|
cls.node = rclpy.create_node("nav_test")
|
|
cls.topic_monitor = TopicMonitor(cls.node)
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
"""Clean up test fixtures."""
|
|
cls.topic_monitor.cleanup()
|
|
cls.node.destroy_node()
|
|
rclpy.shutdown()
|
|
|
|
def test_odometry_continuity(self):
|
|
"""Odometry must publish continuously."""
|
|
self.topic_monitor.subscribe_to_topic("/odom", Odometry)
|
|
|
|
deadline = time.time() + 3.0
|
|
while time.time() < deadline:
|
|
rclpy.spin_once(self.node, timeout_sec=0.1)
|
|
|
|
count = self.topic_monitor.get_message_count("/odom")
|
|
self.assertGreater(
|
|
count, 0,
|
|
"Odometry not publishing"
|
|
)
|
|
|
|
def test_tf_broadcasts(self):
|
|
"""TF tree must be broadcasting transforms."""
|
|
topics = {name for name, _ in self.node.get_topic_names_and_types()}
|
|
self.assertIn(
|
|
"/tf",
|
|
topics,
|
|
"TF broadcaster not active"
|
|
)
|
|
|
|
|
|
class TestCommunication(unittest.TestCase):
|
|
"""Verify rosbridge and external communication."""
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
"""Set up test fixtures."""
|
|
rclpy.init()
|
|
cls.node = rclpy.create_node("comm_test")
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
"""Clean up test fixtures."""
|
|
cls.node.destroy_node()
|
|
rclpy.shutdown()
|
|
|
|
def test_rosbridge_node_alive(self):
|
|
"""Rosbridge WebSocket server must be running."""
|
|
node_names = [n for n, _ in self.node.get_node_names()]
|
|
self.assertIn(
|
|
"rosbridge_websocket",
|
|
node_names,
|
|
"Rosbridge WebSocket server not running"
|
|
)
|
|
|
|
def test_key_topics_bridged(self):
|
|
"""Key topics must be available for bridging."""
|
|
topics = {name for name, _ in self.node.get_topic_names_and_types()}
|
|
critical_topics = [
|
|
"/odom",
|
|
"/scan",
|
|
"/camera/color/image_raw",
|
|
"/saltybot/imu",
|
|
]
|
|
missing = [t for t in critical_topics if t not in topics]
|
|
self.assertEqual(
|
|
missing, [],
|
|
f"Critical topics not available for bridging: {missing}"
|
|
)
|