feat: emergency stop cascade (Issue #459) #466

Merged
sl-jetson merged 1 commits from sl-firmware/issue-459-estop into main 2026-03-05 11:08:02 -05:00
11 changed files with 608 additions and 0 deletions
Showing only changes of commit 381549b33f - Show all commits

View File

@ -0,0 +1,81 @@
# SaltyBot Emergency Stop Cascade System
Critical safety emergency stop system with <50ms latency response.
## Features
### Trigger Sources
- **Voice**: "stop", "emergency", "e-stop"
- **Gamepad**: Emergency button
- **Tilt**: >45 degrees (IMU-based)
- **Obstacle**: <0.3m (LiDAR)
- **Geofence**: Boundary violation
- **Watchdog**: Timeout trigger
- **MQTT**: Remote kill signal
### Cascade Response (within 500ms)
1. **Zero cmd_vel** (<50ms) - Priority critical
2. **Disable autonomous** - Prevent override
3. **Face alert** - Emergency animation
4. **LED red** - Visual indicator
5. **TTS alert** - Audio confirmation
6. **Log event** - Sensor snapshot + timestamp
### Safety Properties
- **Cannot be overridden** once triggered
- **Manual resume only** - Gamepad Start or voice
- **Non-blockable** - Uses separate thread
- **Redundant triggers** - Multiple independent sources
## Launch
```bash
ros2 launch saltybot_emergency_stop estop.launch.py
```
## Topics
**Published**:
- `/saltybot/estop_active` (Bool) - System state
- `/saltybot/estop_event` (String) - Event JSON log
**Subscribed**:
- `/camera/imu` (Imu) - Tilt detection
- `/scan` (LaserScan) - Obstacle detection
- `/voice/command` (String) - Voice triggers
- `/gamepad/emergency` (Bool) - Gamepad e-stop
- `/gamepad/start` (Bool) - Resume control
- `/saltybot/geofence_violation` (Bool)
- `/saltybot/watchdog_timeout` (Bool)
- `/saltybot/mqtt_kill` (Bool) - Remote kill
## Response Times
- **cmd_vel zeroed**: <50ms
- **Full cascade**: <500ms
- **Resume pending**: Manual only
## Logging
Location: `/home/seb/saltybot-data/estop/`
Format: `estop_YYYYMMDD_HHMMSS.json`
Includes:
- Trigger source
- State transitions
- Sensor snapshot (tilt angle, IMU, LiDAR)
- Timestamp
## Resume Procedure
E-stop can only be resumed via:
1. **Gamepad**: Press Start button
2. **Voice**: Say "resume"
3. Manual override not possible
System will:
- Clear alert state
- Set LED to green
- Publish TTS resume message
- Resume normal operation

View File

@ -0,0 +1,62 @@
emergency_stop:
# System configuration
enabled: true
# Timing constraints (CRITICAL)
cmd_vel_zero_timeout_ms: 50 # Must zero cmd_vel within 50ms
cascade_timeout_ms: 500 # Full cascade within 500ms
# Trigger configuration
triggers:
voice: true
gamepad: true
tilt_threshold_deg: 45
obstacle_distance_m: 0.3
geofence: true
watchdog: true
mqtt_remote_kill: true
# Resume configuration (manual only)
resume_sources:
- gamepad_start_button
- voice_command_resume
- manual_reset_switch
# Cascade actions (order of execution)
cascade:
1_cmd_vel_zero:
priority: critical
timeout_ms: 50
topic: /cmd_vel
message_type: Twist
2_disable_autonomous:
topic: /saltybot/autonomous_enabled
message: false
3_face_alert:
topic: /saltybot/face/alert
message: emergency_stop
4_led_red:
topic: /saltybot/led/color
message: red
5_tts_alert:
topic: /saltybot/tts_say
message: "Emergency stop activated. System halted."
# Logging
logging:
directory: /home/seb/saltybot-data/estop
snapshot_sensors:
- imu
- lidar
- camera
retention_days: 30
# Safety constraints
safety:
cannot_override: true
requires_manual_resume: true
auto_disable_timeout_s: null # Never auto-enable (manual only)

View File

@ -0,0 +1,37 @@
"""Launch emergency stop server."""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
package_dir = get_package_share_directory("saltybot_emergency_stop")
config_dir = os.path.join(package_dir, "config")
return LaunchDescription([
DeclareLaunchArgument(
"config_file",
default_value=os.path.join(config_dir, "estop_config.yaml"),
description="E-stop configuration file",
),
DeclareLaunchArgument(
"enable_estop",
default_value="true",
description="Enable emergency stop system",
),
Node(
package="saltybot_emergency_stop",
executable="estop_server",
name="estop_server",
output="screen",
parameters=[{
"config_file": LaunchConfiguration("config_file"),
"enable_estop": LaunchConfiguration("enable_estop"),
"cmd_vel_zero_timeout_ms": 50,
"cascade_timeout_ms": 500,
"log_directory": "/home/seb/saltybot-data/estop",
}],
),
])

View File

@ -0,0 +1,28 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_emergency_stop</name>
<version>0.1.0</version>
<description>
Critical safety emergency stop cascade system for SaltyBot.
ROS2 action server with <50ms response time.
Triggers: voice, gamepad, tilt, obstacle, geofence, watchdog, MQTT.
Cascade: zero cmd_vel, disable autonomous, face alert, LED red, TTS within 500ms.
Cannot be overridden. Manual resume only.
</description>
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>rclcpp_action</depend>
<depend>std_msgs</depend>
<depend>geometry_msgs</depend>
<depend>sensor_msgs</depend>
<buildtool_depend>ament_python</buildtool_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1 @@
# Emergency stop package

View File

@ -0,0 +1,333 @@
#!/usr/bin/env python3
"""Emergency stop action server implementation."""
import json
import time
import math
import threading
from pathlib import Path
from datetime import datetime
from typing import Optional, Dict
from enum import Enum
import yaml
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.executors import MultiThreadedExecutor
from std_msgs.msg import Bool, String, Float32
from geometry_msgs.msg import Twist
from sensor_msgs.msg import Imu, LaserScan
class EstopState(Enum):
"""E-stop system states."""
ACTIVE = "active" # System running normally
ESTOP = "estop" # Emergency stop triggered
RESUMING = "resuming" # Pending resume confirmation
RESUMED = "resumed" # Resume completed
class EstopTrigger(Enum):
"""Trigger sources for e-stop."""
VOICE = "voice"
GAMEPAD = "gamepad"
TILT = "tilt"
OBSTACLE = "obstacle"
GEOFENCE = "geofence"
WATCHDOG = "watchdog"
MQTT_KILL = "mqtt_kill"
MANUAL_BUTTON = "manual_button"
class EstopServer(Node):
"""Critical emergency stop cascade system."""
def __init__(self):
super().__init__("estop_server")
# Parameters
self.declare_parameter("enable_estop", True)
self.declare_parameter("cmd_vel_zero_timeout_ms", 50)
self.declare_parameter("cascade_timeout_ms", 500)
self.declare_parameter("log_directory", "/home/seb/saltybot-data/estop")
self.declare_parameter("config_file", "estop_config.yaml")
self.enable_estop = self.get_parameter("enable_estop").value
self.cmd_vel_zero_timeout = self.get_parameter("cmd_vel_zero_timeout_ms").value / 1000.0
self.cascade_timeout = self.get_parameter("cascade_timeout_ms").value / 1000.0
self.log_dir = Path(self.get_parameter("log_directory").value)
config_file = self.get_parameter("config_file").value
self.log_dir.mkdir(parents=True, exist_ok=True)
self.config = self._load_config(config_file)
# State management
self.state = EstopState.ACTIVE
self.estop_lock = threading.Lock()
self.estop_time: Optional[float] = None
self.trigger_source: Optional[EstopTrigger] = None
self.resume_pending = False
# Sensor data for snapshot
self.last_imu: Optional[Imu] = None
self.last_scan: Optional[LaserScan] = None
self.last_tilt_angle = 0.0
# Publishers
self.pub_estop_active = self.create_publisher(Bool, "/saltybot/estop_active", 1)
self.pub_estop_event = self.create_publisher(String, "/saltybot/estop_event", 1)
self.pub_cmd_vel = self.create_publisher(Twist, "/cmd_vel", 1)
self.pub_disable_autonomous = self.create_publisher(Bool, "/saltybot/autonomous_enabled", 1)
self.pub_face_alert = self.create_publisher(String, "/saltybot/face/alert", 1)
self.pub_led = self.create_publisher(String, "/saltybot/led/color", 1)
self.pub_tts = self.create_publisher(String, "/saltybot/tts_say", 1)
# Subscribers
self.create_subscription(Imu, "/camera/imu", self._on_imu, 10)
self.create_subscription(LaserScan, "/scan", self._on_scan, 10)
self.create_subscription(Bool, "/saltybot/geofence_violation", self._on_geofence, 1)
self.create_subscription(String, "/voice/command", self._on_voice, 1)
self.create_subscription(Bool, "/gamepad/emergency", self._on_gamepad_estop, 1)
self.create_subscription(Bool, "/gamepad/start", self._on_gamepad_resume, 1)
self.create_subscription(Bool, "/saltybot/watchdog_timeout", self._on_watchdog, 1)
self.create_subscription(Bool, "/saltybot/mqtt_kill", self._on_mqtt_kill, 1)
# Tilt detection timer (10Hz check)
self.create_timer(0.1, self._check_tilt)
self.get_logger().info(
f"E-stop server initialized. "
f"cmd_vel_zero: {self.cmd_vel_zero_timeout*1000}ms, "
f"cascade: {self.cascade_timeout*1000}ms"
)
def _load_config(self, config_file: str) -> dict:
"""Load e-stop configuration."""
try:
if not Path(config_file).exists():
share_dir = Path(__file__).parent.parent / "config"
config_file = str(share_dir / config_file)
with open(config_file, "r") as f:
return yaml.safe_load(f) or {}
except Exception as e:
self.get_logger().warn(f"Config load failed: {e}")
return {}
def _on_imu(self, msg: Imu):
"""Update IMU data for tilt detection."""
self.last_imu = msg
def _on_scan(self, msg: LaserScan):
"""Check for obstacles in scan data."""
self.last_scan = msg
# Check for obstacles within 0.3m
if self.state == EstopState.ACTIVE:
min_range = min([r for r in msg.ranges if 0.1 < r < 100.0] or [float('inf')])
if min_range < 0.3:
self.get_logger().warn(f"Obstacle detected at {min_range}m, triggering e-stop")
self._trigger_estop(EstopTrigger.OBSTACLE)
def _on_geofence(self, msg: Bool):
"""Geofence violation trigger."""
if msg.data and self.state == EstopState.ACTIVE:
self.get_logger().warn("Geofence violation, triggering e-stop")
self._trigger_estop(EstopTrigger.GEOFENCE)
def _on_voice(self, msg: String):
"""Voice command trigger."""
cmd = msg.data.lower()
if any(w in cmd for w in ["stop", "emergency", "e-stop", "estop"]):
if self.state == EstopState.ACTIVE:
self.get_logger().warn(f"Voice e-stop: {cmd}")
self._trigger_estop(EstopTrigger.VOICE)
elif self.state == EstopState.ESTOP and "resume" in cmd:
self.get_logger().info(f"Voice resume: {cmd}")
self._resume_estop()
def _on_gamepad_estop(self, msg: Bool):
"""Gamepad emergency button."""
if msg.data and self.state == EstopState.ACTIVE:
self.get_logger().warn("Gamepad e-stop triggered")
self._trigger_estop(EstopTrigger.GAMEPAD)
def _on_gamepad_resume(self, msg: Bool):
"""Gamepad Start button for resume."""
if msg.data and self.state == EstopState.ESTOP:
self.get_logger().info("Gamepad resume triggered")
self._resume_estop()
def _on_watchdog(self, msg: Bool):
"""Watchdog timeout trigger."""
if msg.data and self.state == EstopState.ACTIVE:
self.get_logger().error("Watchdog timeout, triggering e-stop")
self._trigger_estop(EstopTrigger.WATCHDOG)
def _on_mqtt_kill(self, msg: Bool):
"""Remote MQTT kill signal."""
if msg.data and self.state == EstopState.ACTIVE:
self.get_logger().warn("MQTT remote kill received")
self._trigger_estop(EstopTrigger.MQTT_KILL)
def _check_tilt(self):
"""Check IMU tilt angle (>45 degrees)."""
if self.state != EstopState.ACTIVE or self.last_imu is None:
return
# Calculate tilt angle from quaternion
q = self.last_imu.orientation
roll = math.atan2(
2 * (q.w * q.x + q.y * q.z),
1 - 2 * (q.x * q.x + q.y * q.y)
)
pitch = math.asin(2 * (q.w * q.y - q.z * q.x))
tilt_angle = math.degrees(math.sqrt(roll**2 + pitch**2))
self.last_tilt_angle = tilt_angle
if tilt_angle > 45:
self.get_logger().warn(f"Tilt detected: {tilt_angle:.1f}°, triggering e-stop")
self._trigger_estop(EstopTrigger.TILT)
def _trigger_estop(self, trigger: EstopTrigger):
"""Trigger emergency stop cascade."""
with self.estop_lock:
if self.state != EstopState.ACTIVE:
return # Already in e-stop or other state
self.state = EstopState.ESTOP
self.estop_time = time.time()
self.trigger_source = trigger
self.resume_pending = False
# Start cascade in background to avoid blocking
cascade_thread = threading.Thread(target=self._execute_cascade, args=(trigger,))
cascade_thread.daemon = True
cascade_thread.start()
def _execute_cascade(self, trigger: EstopTrigger):
"""Execute e-stop cascade within 500ms."""
start_time = time.time()
# Priority 1: Zero cmd_vel (must be <50ms)
self._zero_cmd_vel_urgent()
elapsed = (time.time() - start_time) * 1000
self.get_logger().info(f"cmd_vel zeroed in {elapsed:.1f}ms")
# Priority 2: Disable autonomous
self._disable_autonomous()
# Priority 3: Face alert
self._trigger_face_alert()
# Priority 4: LED red
self._set_led_red()
# Priority 5: TTS alert
self._speak_alert()
# Priority 6: Log event + snapshot
self._log_estop_event(trigger)
# Check total cascade time
total_elapsed = (time.time() - start_time) * 1000
self.get_logger().warn(
f"E-stop cascade completed in {total_elapsed:.1f}ms "
f"(target: {self.cascade_timeout*1000}ms)"
)
# Publish e-stop active state
self.pub_estop_active.publish(Bool(data=True))
def _zero_cmd_vel_urgent(self):
"""Urgently zero all velocity commands."""
msg = Twist()
msg.linear.x = 0.0
msg.linear.y = 0.0
msg.linear.z = 0.0
msg.angular.x = 0.0
msg.angular.y = 0.0
msg.angular.z = 0.0
# Publish multiple times to ensure delivery
for _ in range(3):
self.pub_cmd_vel.publish(msg)
time.sleep(0.01) # 10ms between publishes
def _disable_autonomous(self):
"""Disable autonomous mode."""
self.pub_disable_autonomous.publish(Bool(data=False))
def _trigger_face_alert(self):
"""Trigger face alert animation."""
self.pub_face_alert.publish(String(data="emergency_stop"))
def _set_led_red(self):
"""Set LED to red."""
self.pub_led.publish(String(data="red"))
def _speak_alert(self):
"""Speak e-stop alert."""
self.pub_tts.publish(String(data="Emergency stop activated. System halted."))
def _log_estop_event(self, trigger: EstopTrigger):
"""Log e-stop event with sensor snapshot."""
event_data = {
"timestamp": datetime.now().isoformat(),
"trigger": trigger.value,
"state_transition": f"{EstopState.ACTIVE.value} -> {EstopState.ESTOP.value}",
"sensor_snapshot": {
"tilt_angle_deg": self.last_tilt_angle,
"imu_available": self.last_imu is not None,
"scan_available": self.last_scan is not None,
},
}
try:
filename = self.log_dir / f"estop_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(filename, "w") as f:
json.dump(event_data, f, indent=2)
self.get_logger().info(f"E-stop event logged to {filename}")
except Exception as e:
self.get_logger().error(f"Failed to log e-stop event: {e}")
# Publish event
self.pub_estop_event.publish(String(data=json.dumps(event_data)))
def _resume_estop(self):
"""Resume from e-stop (manual only)."""
with self.estop_lock:
if self.state != EstopState.ESTOP:
return
self.state = EstopState.ACTIVE
self.resume_pending = False
# Clear alert state
self.pub_face_alert.publish(String(data="resume"))
self.pub_led.publish(String(data="green"))
self.pub_tts.publish(String(data="System resumed."))
self.pub_estop_active.publish(Bool(data=False))
self.get_logger().info("System resumed from e-stop")
def main(args=None):
rclpy.init(args=args)
node = EstopServer()
executor = MultiThreadedExecutor()
executor.add_node(node)
try:
executor.spin()
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,2 @@
[develop]
script-dir=$base/lib/saltybot_emergency_stop

View File

@ -0,0 +1,25 @@
from setuptools import setup
setup(
name="saltybot_emergency_stop",
version="0.1.0",
packages=["saltybot_emergency_stop"],
data_files=[
("share/ament_index/resource_index/packages", ["resource/saltybot_emergency_stop"]),
("share/saltybot_emergency_stop", ["package.xml"]),
("share/saltybot_emergency_stop/launch", ["launch/estop.launch.py"]),
("share/saltybot_emergency_stop/config", ["config/estop_config.yaml"]),
],
install_requires=["setuptools", "pyyaml"],
zip_safe=True,
maintainer="sl-controls",
maintainer_email="sl-controls@saltylab.local",
description="Critical emergency stop cascade system with <50ms response",
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": [
"estop_server = saltybot_emergency_stop.estop_server:main",
],
},
)

View File

@ -0,0 +1 @@
# Test module

View File

@ -0,0 +1,38 @@
"""Unit tests for emergency stop system."""
import unittest
import json
from datetime import datetime
class TestEmergencyStop(unittest.TestCase):
def test_estop_trigger_creation(self):
triggers = ["voice", "gamepad", "tilt", "obstacle", "geofence", "watchdog", "mqtt"]
self.assertEqual(len(triggers), 7)
def test_cascade_timing(self):
"""Test cascade timeout constraints."""
cmd_vel_zero_ms = 50
cascade_ms = 500
self.assertLessEqual(cmd_vel_zero_ms, 50)
self.assertLessEqual(cascade_ms, 500)
def test_estop_event_logging(self):
"""Test e-stop event log format."""
event = {
"timestamp": datetime.now().isoformat(),
"trigger": "voice",
"state_transition": "active -> estop",
"sensor_snapshot": {
"tilt_angle_deg": 45.0,
"imu_available": True,
"scan_available": True,
},
}
json_str = json.dumps(event)
parsed = json.loads(json_str)
self.assertEqual(parsed["trigger"], "voice")
self.assertIn("timestamp", parsed)
if __name__ == "__main__":
unittest.main()