feat(#169): emergency behavior system — obstacle stop, fall prevention, stuck detection, recovery FSM #179

Merged
sl-jetson merged 1 commits from sl-controls/issue-169-emergency into main 2026-03-02 10:44:50 -05:00
17 changed files with 2095 additions and 0 deletions

View File

@ -0,0 +1,43 @@
/**:
ros__parameters:
# Control loop rate (Hz)
control_rate: 20.0
# Odometry topic for stuck detection
odom_topic: "/saltybot/rover_odom"
# ── LaserScan forward sector ───────────────────────────────────────────────
forward_scan_angle_rad: 0.785 # ±45° forward sector
# ── Obstacle proximity ────────────────────────────────────────────────────
stop_distance_m: 0.30 # MAJOR threshold (spec: <30 cm)
critical_distance_m: 0.10 # CRITICAL threshold
min_cmd_speed_ms: 0.05 # ignore obstacle when nearly stopped
# ── Fall detection (IMU tilt) ─────────────────────────────────────────────
minor_tilt_rad: 0.20 # advisory
major_tilt_rad: 0.35 # stop + recover
critical_tilt_rad: 0.52 # ~30° — full shutdown
floor_drop_m: 0.15 # depth discontinuity triggering MAJOR
# ── Stuck detection ───────────────────────────────────────────────────────
stuck_timeout_s: 3.0 # (spec: 3 s wheel stall)
# ── Bump / jerk detection ─────────────────────────────────────────────────
jerk_threshold_ms3: 8.0
critical_jerk_threshold_ms3: 25.0
# ── FSM / recovery ────────────────────────────────────────────────────────
stopped_ms: 0.03 # speed below which robot is "stopped" (m/s)
major_count_threshold: 3 # MAJOR alerts before escalation to CRITICAL
escalation_window_s: 10.0 # sliding window for escalation counter (s)
suppression_s: 1.0 # de-bounce period for duplicate alerts (s)
# Recovery sequence
reverse_speed_ms: -0.15 # back-up speed (m/s; must be negative)
reverse_distance_m: 0.30 # distance to reverse each cycle (m)
angular_speed_rads: 0.60 # turn speed (rad/s)
turn_angle_rad: 1.5708 # ~90° turn (rad)
retry_timeout_s: 3.0 # time in RETRYING per attempt (s)
clear_hold_s: 0.50 # consecutive clear time to declare success (s)
max_retries: 3 # maximum reverse+turn attempts before GAVE_UP

View File

@ -0,0 +1,53 @@
"""
emergency.launch.py Launch the emergency behavior system (Issue #169).
Usage
-----
ros2 launch saltybot_emergency emergency.launch.py
ros2 launch saltybot_emergency emergency.launch.py \
stop_distance_m:=0.30 max_retries:=3
"""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
pkg_share = get_package_share_directory("saltybot_emergency")
default_params = os.path.join(pkg_share, "config", "emergency_params.yaml")
return LaunchDescription([
DeclareLaunchArgument(
"params_file",
default_value=default_params,
description="Path to emergency_params.yaml",
),
DeclareLaunchArgument(
"stop_distance_m",
default_value="0.30",
description="Obstacle distance triggering MAJOR stop (m)",
),
DeclareLaunchArgument(
"max_retries",
default_value="3",
description="Maximum recovery cycles before ESCALATED",
),
Node(
package="saltybot_emergency",
executable="emergency_node",
name="emergency",
output="screen",
parameters=[
LaunchConfiguration("params_file"),
{
"stop_distance_m": LaunchConfiguration("stop_distance_m"),
"max_retries": LaunchConfiguration("max_retries"),
},
],
),
])

View File

@ -0,0 +1,24 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_emergency</name>
<version>0.1.0</version>
<description>Emergency behavior system — collision avoidance, fall prevention, stuck detection, recovery (Issue #169)</description>
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>sensor_msgs</depend>
<depend>nav_msgs</depend>
<depend>geometry_msgs</depend>
<depend>std_msgs</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,139 @@
"""
alert_manager.py Alert severity escalation for emergency behavior (Issue #169).
Alert levels
NONE : no action
MINOR : advisory beep publish to /saltybot/alert_beep
MAJOR : stop + LED flash publish to /saltybot/alert_flash; cmd_vel override
CRITICAL : full shutdown + MQTT publish to /saltybot/e_stop + /saltybot/critical_alert
Escalation
If major_count_threshold MAJOR alerts occur within escalation_window_s, the
next MAJOR is promoted to CRITICAL. This catches persistent stuck / repeated
collision scenarios.
Suppression
Identical (type, level) alerts are suppressed within suppression_s to avoid
flooding downstream topics.
Pure module no ROS2 dependency.
"""
from __future__ import annotations
from collections import deque
from dataclasses import dataclass
from enum import Enum
from typing import Optional
from saltybot_emergency.threat_detector import ThreatEvent, ThreatLevel, ThreatType
# ── Alert level ───────────────────────────────────────────────────────────────
class AlertLevel(Enum):
NONE = 0
MINOR = 1 # beep
MAJOR = 2 # stop + flash
CRITICAL = 3 # shutdown + MQTT
# ── Alert ─────────────────────────────────────────────────────────────────────
@dataclass
class Alert:
level: AlertLevel
source: str # ThreatType value string
message: str
timestamp_s: float
# ── AlertManager ─────────────────────────────────────────────────────────────
class AlertManager:
"""
Converts ThreatEvents to Alerts with escalation and suppression logic.
Parameters
----------
major_count_threshold : number of MAJOR alerts within window to escalate
escalation_window_s : sliding window for escalation counting (s)
suppression_s : suppress duplicate (type, level) alerts within this period
"""
def __init__(
self,
major_count_threshold: int = 3,
escalation_window_s: float = 10.0,
suppression_s: float = 1.0,
):
self._major_threshold = max(1, int(major_count_threshold))
self._esc_window = float(escalation_window_s)
self._suppress = float(suppression_s)
self._major_times: deque = deque() # timestamps of MAJOR alerts
self._last_seen: dict = {} # (type, level) → timestamp
# ── Update ────────────────────────────────────────────────────────────────
def update(self, threat: ThreatEvent) -> Optional[Alert]:
"""
Convert one ThreatEvent to an Alert, applying escalation and suppression.
Returns None if threat is CLEAR or the alert is suppressed.
"""
if threat.level == ThreatLevel.CLEAR:
return None
now = threat.timestamp_s
alert_level = _threat_to_alert(threat.level)
# ── Suppression ───────────────────────────────────────────────────────
key = (threat.threat_type, threat.level)
last = self._last_seen.get(key)
if last is not None and (now - last) < self._suppress:
return None
self._last_seen[key] = now
# ── Escalation ────────────────────────────────────────────────────────
if alert_level == AlertLevel.MAJOR:
# Prune old timestamps
while self._major_times and (now - self._major_times[0]) > self._esc_window:
self._major_times.popleft()
self._major_times.append(now)
if len(self._major_times) >= self._major_threshold:
alert_level = AlertLevel.CRITICAL
msg = _build_message(alert_level, threat)
return Alert(
level=alert_level,
source=threat.threat_type.value,
message=msg,
timestamp_s=now,
)
def reset(self) -> None:
"""Clear escalation history and suppression state."""
self._major_times.clear()
self._last_seen.clear()
# ── Helpers ───────────────────────────────────────────────────────────────────
def _threat_to_alert(level: ThreatLevel) -> AlertLevel:
return {
ThreatLevel.MINOR: AlertLevel.MINOR,
ThreatLevel.MAJOR: AlertLevel.MAJOR,
ThreatLevel.CRITICAL: AlertLevel.CRITICAL,
}.get(level, AlertLevel.NONE)
def _build_message(level: AlertLevel, threat: ThreatEvent) -> str:
prefix = {
AlertLevel.MINOR: "[MINOR]",
AlertLevel.MAJOR: "[MAJOR]",
AlertLevel.CRITICAL: "[CRITICAL]",
}.get(level, "[?]")
return f"{prefix} {threat.threat_type.value}: {threat.detail}"

View File

@ -0,0 +1,232 @@
"""
emergency_fsm.py Master emergency FSM integrating all detectors (Issue #169).
States
NOMINAL : normal operation; minor alerts pass through; major/critical STOPPING.
STOPPING : commanding zero velocity until robot speed drops below stopped_ms.
Critical threats skip RECOVERING ESCALATED immediately.
RECOVERING : RecoverySequencer executing reverse+turn sequence.
Success NOMINAL; gave-up ESCALATED.
ESCALATED : full stop; critical alert emitted once; stays until acknowledge.
Alert actions produced by state
NOMINAL : emit MINOR alert (beep only); no velocity override.
STOPPING : suppress nav, publish zero; emit MAJOR alert once.
RECOVERING : suppress nav, publish recovery cmds; no new alerts.
ESCALATED : suppress nav, publish zero; emit CRITICAL alert once per entry.
Pure module no ROS2 dependency.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
from saltybot_emergency.alert_manager import Alert, AlertLevel, AlertManager
from saltybot_emergency.recovery_sequencer import RecoveryInputs, RecoverySequencer
from saltybot_emergency.threat_detector import ThreatEvent, ThreatLevel
# ── States ────────────────────────────────────────────────────────────────────
class EmergencyState(Enum):
NOMINAL = "NOMINAL"
STOPPING = "STOPPING"
RECOVERING = "RECOVERING"
ESCALATED = "ESCALATED"
# ── I/O ───────────────────────────────────────────────────────────────────────
@dataclass
class EmergencyInputs:
threat: ThreatEvent # highest-severity threat this tick
robot_speed_ms: float = 0.0 # actual speed from odometry (m/s)
acknowledge: bool = False # operator cleared the escalation
@dataclass
class EmergencyOutputs:
state: EmergencyState = EmergencyState.NOMINAL
cmd_override: bool = False # True = emergency owns cmd_vel
cmd_linear: float = 0.0
cmd_angular: float = 0.0
alert: Optional[Alert] = None
e_stop: bool = False # assert /saltybot/e_stop
state_changed: bool = False
recovery_progress: float = 0.0
recovery_retry_count: int = 0
# ── EmergencyFSM ──────────────────────────────────────────────────────────────
class EmergencyFSM:
"""
Master emergency FSM.
Owns an AlertManager and a RecoverySequencer; coordinates them each tick.
Parameters
----------
stopped_ms : speed below which robot is considered stopped (m/s)
major_count_threshold : MAJOR events within window before escalation
escalation_window_s : sliding window for escalation (s)
suppression_s : alert de-bounce period (s)
reverse_speed_ms : reverse speed during recovery (m/s)
reverse_distance_m : reverse travel per cycle (m)
angular_speed_rads : turn speed during recovery (rad/s)
turn_angle_rad : turn per cycle (rad)
retry_timeout_s : time in RETRYING before next cycle (s)
clear_hold_s : clear duration required to declare success (s)
max_retries : recovery cycles before GAVE_UP
"""
def __init__(
self,
stopped_ms: float = 0.03,
major_count_threshold: int = 3,
escalation_window_s: float = 10.0,
suppression_s: float = 1.0,
reverse_speed_ms: float = -0.15,
reverse_distance_m: float = 0.30,
angular_speed_rads: float = 0.60,
turn_angle_rad: float = 1.5708,
retry_timeout_s: float = 3.0,
clear_hold_s: float = 0.5,
max_retries: int = 3,
):
self._stopped_ms = max(0.0, stopped_ms)
self._alert_mgr = AlertManager(
major_count_threshold=major_count_threshold,
escalation_window_s=escalation_window_s,
suppression_s=suppression_s,
)
self._recovery = RecoverySequencer(
reverse_speed_ms=reverse_speed_ms,
reverse_distance_m=reverse_distance_m,
angular_speed_rads=angular_speed_rads,
turn_angle_rad=turn_angle_rad,
retry_timeout_s=retry_timeout_s,
clear_hold_s=clear_hold_s,
max_retries=max_retries,
)
self._state = EmergencyState.NOMINAL
self._critical_pending = False # STOPPING → ESCALATED (not RECOVERING)
self._escalation_alerted = False # CRITICAL alert emitted once per ESCALATED entry
# ── Public API ────────────────────────────────────────────────────────────
@property
def state(self) -> EmergencyState:
return self._state
def reset(self) -> None:
self._state = EmergencyState.NOMINAL
self._critical_pending = False
self._escalation_alerted = False
self._alert_mgr.reset()
self._recovery.reset()
def tick(self, inputs: EmergencyInputs) -> EmergencyOutputs:
prev = self._state
out = self._step(inputs)
out.state = self._state
out.state_changed = (self._state != prev)
return out
# ── Step ─────────────────────────────────────────────────────────────────
def _step(self, inp: EmergencyInputs) -> EmergencyOutputs:
out = EmergencyOutputs(state=self._state)
# Run alert manager for this threat
alert = self._alert_mgr.update(inp.threat)
# ── NOMINAL ───────────────────────────────────────────────────────────
if self._state == EmergencyState.NOMINAL:
if inp.threat.level == ThreatLevel.CRITICAL:
self._state = EmergencyState.STOPPING
self._critical_pending = True
out.alert = alert
out.cmd_override = True # start overriding on entry tick
elif inp.threat.level == ThreatLevel.MAJOR:
self._state = EmergencyState.STOPPING
self._critical_pending = False
out.alert = alert
out.cmd_override = True # start overriding on entry tick
elif inp.threat.level == ThreatLevel.MINOR:
# Advisory only — no override
out.alert = alert
# ── STOPPING ──────────────────────────────────────────────────────────
elif self._state == EmergencyState.STOPPING:
out.cmd_override = True
out.cmd_linear = 0.0
out.cmd_angular = 0.0
# Upgrade to critical if new critical arrives
if inp.threat.level == ThreatLevel.CRITICAL:
self._critical_pending = True
if abs(inp.robot_speed_ms) <= self._stopped_ms:
if self._critical_pending:
self._state = EmergencyState.ESCALATED
self._escalation_alerted = False
else:
self._state = EmergencyState.RECOVERING
self._recovery.reset()
self._recovery.tick(RecoveryInputs(trigger=True, dt=0.0))
# ── RECOVERING ────────────────────────────────────────────────────────
elif self._state == EmergencyState.RECOVERING:
threat_cleared = inp.threat.level == ThreatLevel.CLEAR
rec_inp = RecoveryInputs(
trigger=False,
threat_cleared=threat_cleared,
dt=0.02, # nominal dt; node should pass actual dt
)
rec_out = self._recovery.tick(rec_inp)
out.cmd_override = True
out.cmd_linear = rec_out.cmd_linear
out.cmd_angular = rec_out.cmd_angular
out.recovery_progress = rec_out.progress
out.recovery_retry_count = rec_out.retry_count
if rec_out.gave_up:
self._state = EmergencyState.ESCALATED
self._escalation_alerted = False
elif rec_out.state.value == "IDLE" and not inp.trigger if hasattr(inp, "trigger") else True:
# RecoverySequencer returned to IDLE = success
from saltybot_emergency.recovery_sequencer import RecoveryState
if self._recovery.state == RecoveryState.IDLE and not rec_out.gave_up:
self._state = EmergencyState.NOMINAL
self._recovery.reset()
# ── ESCALATED ─────────────────────────────────────────────────────────
elif self._state == EmergencyState.ESCALATED:
out.cmd_override = True
out.cmd_linear = 0.0
out.cmd_angular = 0.0
out.e_stop = True
if not self._escalation_alerted:
# Force a CRITICAL alert regardless of suppression
from saltybot_emergency.alert_manager import Alert, AlertLevel
out.alert = Alert(
level=AlertLevel.CRITICAL,
source=inp.threat.threat_type.value,
message=f"[CRITICAL] ESCALATED: {inp.threat.detail or 'Recovery gave up'}",
timestamp_s=inp.threat.timestamp_s,
)
self._escalation_alerted = True
if inp.acknowledge:
self._state = EmergencyState.NOMINAL
self._critical_pending = False
self._escalation_alerted = False
out.e_stop = False
self._alert_mgr.reset()
self._recovery.reset()
return out

View File

@ -0,0 +1,383 @@
"""
emergency_node.py Emergency behavior system orchestration (Issue #169).
Overview
Aggregates threats from four independent detectors and drives the
EmergencyFSM. Overrides /cmd_vel when an emergency is active. Escalates
via /saltybot/e_stop and /saltybot/critical_alert for CRITICAL events.
Pipeline (20 Hz)
1. LaserScan callback ObstacleDetector ThreatEvent
2. IMU callback FallDetector + BumpDetector ThreatEvent (×2)
3. Odom callback StuckDetector (fed in timer) ThreatEvent
4. 20 Hz timer highest_threat() EmergencyFSM.tick()
publish overriding cmd_vel or pass-through
publish /saltybot/emergency + /saltybot/recovery_action
Subscribes
/scan sensor_msgs/LaserScan obstacle detection
/saltybot/imu sensor_msgs/Imu fall + bump detection
<odom_topic> nav_msgs/Odometry stuck + speed tracking
/cmd_vel geometry_msgs/Twist nav commands (pass-through)
Publishes
/saltybot/cmd_vel_out geometry_msgs/Twist muxed cmd_vel (to drive nodes)
/saltybot/e_stop std_msgs/Bool emergency stop flag
/saltybot/alert_beep std_msgs/Empty beep trigger (MINOR)
/saltybot/alert_flash std_msgs/Empty LED flash trigger (MAJOR)
/saltybot/critical_alert std_msgs/String (JSON) CRITICAL event for MQTT bridge
/saltybot/emergency saltybot_emergency_msgs/EmergencyEvent
/saltybot/recovery_action saltybot_emergency_msgs/RecoveryAction
Parameters
control_rate 20.0 Hz
odom_topic /saltybot/rover_odom
forward_scan_angle_rad 0.785 rad (±45° forward sector for obstacle check)
stop_distance_m 0.30 m
critical_distance_m 0.10 m
min_cmd_speed_ms 0.05 m/s
minor_tilt_rad 0.20 rad
major_tilt_rad 0.35 rad
critical_tilt_rad 0.52 rad
floor_drop_m 0.15 m
stuck_timeout_s 3.0 s
jerk_threshold_ms3 8.0 m/
critical_jerk_threshold_ms3 25.0 m/
stopped_ms 0.03 m/s
major_count_threshold 3
escalation_window_s 10.0 s
suppression_s 1.0 s
reverse_speed_ms -0.15 m/s
reverse_distance_m 0.30 m
angular_speed_rads 0.60 rad/s
turn_angle_rad 1.5708 rad
retry_timeout_s 3.0 s
clear_hold_s 0.50 s
max_retries 3
"""
import json
import math
import time
import rclpy
from rclpy.node import Node
from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy
from geometry_msgs.msg import Twist
from nav_msgs.msg import Odometry
from sensor_msgs.msg import Imu, LaserScan
from std_msgs.msg import Bool, Empty, String
from saltybot_emergency.alert_manager import AlertLevel
from saltybot_emergency.emergency_fsm import EmergencyFSM, EmergencyInputs, EmergencyState
from saltybot_emergency.threat_detector import (
BumpDetector,
FallDetector,
ObstacleDetector,
StuckDetector,
ThreatEvent,
ThreatType,
highest_threat,
)
try:
from saltybot_emergency_msgs.msg import EmergencyEvent, RecoveryAction
_MSGS_OK = True
except ImportError:
_MSGS_OK = False
def _quaternion_to_pitch_roll(qx, qy, qz, qw):
pitch = math.asin(max(-1.0, min(1.0, 2.0 * (qw * qy - qz * qx))))
roll = math.atan2(2.0 * (qw * qx + qy * qz), 1.0 - 2.0 * (qx * qx + qy * qy))
return pitch, roll
class EmergencyNode(Node):
def __init__(self):
super().__init__("emergency")
self._declare_params()
p = self._load_params()
# ── Detectors ────────────────────────────────────────────────────────
self._obstacle = ObstacleDetector(
stop_distance_m=p["stop_distance_m"],
critical_distance_m=p["critical_distance_m"],
min_speed_ms=p["min_cmd_speed_ms"],
)
self._fall = FallDetector(
minor_tilt_rad=p["minor_tilt_rad"],
major_tilt_rad=p["major_tilt_rad"],
critical_tilt_rad=p["critical_tilt_rad"],
floor_drop_m=p["floor_drop_m"],
)
self._stuck = StuckDetector(
stuck_timeout_s=p["stuck_timeout_s"],
min_cmd_ms=p["min_cmd_speed_ms"],
)
self._bump = BumpDetector(
jerk_threshold_ms3=p["jerk_threshold_ms3"],
critical_jerk_threshold_ms3=p["critical_jerk_threshold_ms3"],
)
self._fsm = EmergencyFSM(
stopped_ms=p["stopped_ms"],
major_count_threshold=p["major_count_threshold"],
escalation_window_s=p["escalation_window_s"],
suppression_s=p["suppression_s"],
reverse_speed_ms=p["reverse_speed_ms"],
reverse_distance_m=p["reverse_distance_m"],
angular_speed_rads=p["angular_speed_rads"],
turn_angle_rad=p["turn_angle_rad"],
retry_timeout_s=p["retry_timeout_s"],
clear_hold_s=p["clear_hold_s"],
max_retries=p["max_retries"],
)
# ── State ────────────────────────────────────────────────────────────
self._latest_obstacle_threat = ThreatEvent()
self._latest_fall_threat = ThreatEvent()
self._latest_bump_threat = ThreatEvent()
self._cmd_speed_ms = 0.0
self._actual_speed_ms = 0.0
self._last_ctrl_t = time.monotonic()
self._scan_forward_angle = p["forward_scan_angle_rad"]
self._acknowledge_flag = False
# ── QoS ──────────────────────────────────────────────────────────────
reliable = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST,
depth=10,
)
best_effort = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=1,
)
# ── Subscriptions ────────────────────────────────────────────────────
self.create_subscription(LaserScan, "/scan", self._scan_cb, best_effort)
self.create_subscription(Imu, "/saltybot/imu", self._imu_cb, best_effort)
self.create_subscription(Odometry, p["odom_topic"], self._odom_cb, reliable)
self.create_subscription(Twist, "/cmd_vel", self._cmd_vel_cb, reliable)
self.create_subscription(Bool, "/saltybot/emergency_ack", self._ack_cb, reliable)
# ── Publishers ───────────────────────────────────────────────────────
self._cmd_out_pub = self.create_publisher(Twist, "/saltybot/cmd_vel_out", reliable)
self._estop_pub = self.create_publisher(Bool, "/saltybot/e_stop", reliable)
self._beep_pub = self.create_publisher(Empty, "/saltybot/alert_beep", reliable)
self._flash_pub = self.create_publisher(Empty, "/saltybot/alert_flash", reliable)
self._critical_pub = self.create_publisher(String, "/saltybot/critical_alert", reliable)
self._event_pub = None
self._recovery_pub = None
if _MSGS_OK:
self._event_pub = self.create_publisher(EmergencyEvent, "/saltybot/emergency", reliable)
self._recovery_pub = self.create_publisher(RecoveryAction, "/saltybot/recovery_action", reliable)
# ── Timer ────────────────────────────────────────────────────────────
rate = p["control_rate"]
self._timer = self.create_timer(1.0 / rate, self._control_cb)
self.get_logger().info(f"EmergencyNode ready rate={rate}Hz")
# ── Parameters ────────────────────────────────────────────────────────────
def _declare_params(self) -> None:
self.declare_parameter("control_rate", 20.0)
self.declare_parameter("odom_topic", "/saltybot/rover_odom")
self.declare_parameter("forward_scan_angle_rad", 0.785)
self.declare_parameter("stop_distance_m", 0.30)
self.declare_parameter("critical_distance_m", 0.10)
self.declare_parameter("min_cmd_speed_ms", 0.05)
self.declare_parameter("minor_tilt_rad", 0.20)
self.declare_parameter("major_tilt_rad", 0.35)
self.declare_parameter("critical_tilt_rad", 0.52)
self.declare_parameter("floor_drop_m", 0.15)
self.declare_parameter("stuck_timeout_s", 3.0)
self.declare_parameter("jerk_threshold_ms3", 8.0)
self.declare_parameter("critical_jerk_threshold_ms3", 25.0)
self.declare_parameter("stopped_ms", 0.03)
self.declare_parameter("major_count_threshold", 3)
self.declare_parameter("escalation_window_s", 10.0)
self.declare_parameter("suppression_s", 1.0)
self.declare_parameter("reverse_speed_ms", -0.15)
self.declare_parameter("reverse_distance_m", 0.30)
self.declare_parameter("angular_speed_rads", 0.60)
self.declare_parameter("turn_angle_rad", 1.5708)
self.declare_parameter("retry_timeout_s", 3.0)
self.declare_parameter("clear_hold_s", 0.50)
self.declare_parameter("max_retries", 3)
def _load_params(self) -> dict:
g = self.get_parameter
return {k: g(k).value for k in [
"control_rate", "odom_topic",
"forward_scan_angle_rad",
"stop_distance_m", "critical_distance_m", "min_cmd_speed_ms",
"minor_tilt_rad", "major_tilt_rad", "critical_tilt_rad", "floor_drop_m",
"stuck_timeout_s", "jerk_threshold_ms3", "critical_jerk_threshold_ms3",
"stopped_ms",
"major_count_threshold", "escalation_window_s", "suppression_s",
"reverse_speed_ms", "reverse_distance_m",
"angular_speed_rads", "turn_angle_rad",
"retry_timeout_s", "clear_hold_s", "max_retries",
]}
# ── Callbacks ─────────────────────────────────────────────────────────────
def _scan_cb(self, msg: LaserScan) -> None:
# Extract minimum range within forward sector (±forward_scan_angle_rad)
half = self._scan_forward_angle
ranges = []
for i, r in enumerate(msg.ranges):
angle = msg.angle_min + i * msg.angle_increment
if abs(angle) <= half and msg.range_min < r < msg.range_max:
ranges.append(r)
min_r = min(ranges) if ranges else float("inf")
self._latest_obstacle_threat = self._obstacle.update(
min_r, self._cmd_speed_ms, time.monotonic()
)
def _imu_cb(self, msg: Imu) -> None:
now = time.monotonic()
ax = msg.linear_acceleration.x
ay = msg.linear_acceleration.y
az = msg.linear_acceleration.z
pitch, roll = _quaternion_to_pitch_roll(
msg.orientation.x, msg.orientation.y,
msg.orientation.z, msg.orientation.w,
)
# dt for jerk is approximated; bump detector handles None on first call
dt = 0.02 # nominal 20 Hz
self._latest_fall_threat = self._fall.update(pitch, roll, 0.0, now)
self._latest_bump_threat = self._bump.update(ax, ay, az, dt, now)
def _odom_cb(self, msg: Odometry) -> None:
self._actual_speed_ms = msg.twist.twist.linear.x
def _cmd_vel_cb(self, msg: Twist) -> None:
self._cmd_speed_ms = msg.linear.x
def _ack_cb(self, msg: Bool) -> None:
if msg.data:
self._acknowledge_flag = True
# ── 20 Hz control loop ────────────────────────────────────────────────────
def _control_cb(self) -> None:
now = time.monotonic()
dt = now - self._last_ctrl_t
self._last_ctrl_t = now
stuck_threat = self._stuck.update(
self._cmd_speed_ms, self._actual_speed_ms, dt, now
)
threat = highest_threat([
self._latest_obstacle_threat,
self._latest_fall_threat,
self._latest_bump_threat,
stuck_threat,
])
inp = EmergencyInputs(
threat=threat,
robot_speed_ms=self._actual_speed_ms,
acknowledge=self._acknowledge_flag,
)
self._acknowledge_flag = False
out = self._fsm.tick(inp)
if out.state_changed:
self.get_logger().info(f"Emergency FSM → {out.state.value}")
# ── Alert dispatch ────────────────────────────────────────────────────
if out.alert is not None:
lvl = out.alert.level
self.get_logger().warn(out.alert.message)
if lvl == AlertLevel.MINOR:
self._beep_pub.publish(Empty())
elif lvl == AlertLevel.MAJOR:
self._flash_pub.publish(Empty())
elif lvl == AlertLevel.CRITICAL:
self._flash_pub.publish(Empty())
self._publish_critical_alert(out.alert.message, threat)
# ── E-stop ───────────────────────────────────────────────────────────
estop_msg = Bool()
estop_msg.data = out.e_stop
self._estop_pub.publish(estop_msg)
# ── cmd_vel mux ───────────────────────────────────────────────────────
twist = Twist()
if out.cmd_override:
twist.linear.x = out.cmd_linear
twist.angular.z = out.cmd_angular
else:
twist.linear.x = self._cmd_speed_ms
self._cmd_out_pub.publish(twist)
# ── Status topics ─────────────────────────────────────────────────────
if self._event_pub is not None:
self._publish_event(out, threat)
if self._recovery_pub is not None:
self._publish_recovery(out)
# ── Publishers ────────────────────────────────────────────────────────────
def _publish_critical_alert(self, message: str, threat: ThreatEvent) -> None:
msg = String()
msg.data = json.dumps({
"severity": "CRITICAL",
"threat": threat.threat_type.value,
"value": round(threat.value, 3),
"detail": threat.detail,
"message": message,
})
self._critical_pub.publish(msg)
def _publish_event(self, out, threat: ThreatEvent) -> None:
msg = EmergencyEvent()
msg.stamp = self.get_clock().now().to_msg()
msg.state = out.state.value
msg.threat_type = threat.threat_type.value
msg.severity = threat.level.name
msg.threat_value = float(threat.value)
msg.detail = threat.detail
msg.cmd_override = out.cmd_override
self._event_pub.publish(msg)
def _publish_recovery(self, out) -> None:
msg = RecoveryAction()
msg.stamp = self.get_clock().now().to_msg()
msg.action = self._fsm._recovery.state.value
msg.retry_count = out.recovery_retry_count
msg.progress = float(out.recovery_progress)
self._recovery_pub.publish(msg)
# ── Entry point ───────────────────────────────────────────────────────────────
def main(args=None):
rclpy.init(args=args)
node = EmergencyNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.try_shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,193 @@
"""
recovery_sequencer.py Reverse + turn recovery FSM for emergency behavior (Issue #169).
Sequence
IDLE REVERSING TURNING RETRYING (IDLE on success)
(REVERSING on re-threat, retry loop)
(GAVE_UP after max_retries)
REVERSING : command reverse at reverse_speed_ms until reverse_distance_m covered.
TURNING : command angular_speed_rads until turn_angle_rad covered (90°).
RETRYING : zero velocity; wait up to retry_timeout_s for threat to clear.
If threat clears within clear_hold_s back to IDLE (success).
If timeout without clearance start another REVERSING cycle.
If retry_count >= max_retries GAVE_UP.
Pure module no ROS2 dependency.
"""
from __future__ import annotations
from dataclasses import dataclass
from enum import Enum
# ── States ────────────────────────────────────────────────────────────────────
class RecoveryState(Enum):
IDLE = "IDLE"
REVERSING = "REVERSING"
TURNING = "TURNING"
RETRYING = "RETRYING"
GAVE_UP = "GAVE_UP"
# ── I/O ───────────────────────────────────────────────────────────────────────
@dataclass
class RecoveryInputs:
trigger: bool = False # True to start (or restart) recovery
threat_cleared: bool = False # True when all threats are CLEAR
dt: float = 0.02 # time step (s)
@dataclass
class RecoveryOutputs:
state: RecoveryState = RecoveryState.IDLE
cmd_linear: float = 0.0 # m/s
cmd_angular: float = 0.0 # rad/s
progress: float = 0.0 # [0, 1] completion of current phase
retry_count: int = 0
gave_up: bool = False
state_changed: bool = False
# ── RecoverySequencer ────────────────────────────────────────────────────────
class RecoverySequencer:
"""
Tick-based FSM for executing reverse + turn recovery sequences.
Parameters
----------
reverse_speed_ms : backward speed during REVERSING (m/s; stored as negative)
reverse_distance_m: total reverse travel before turning (m)
angular_speed_rads: yaw rate during TURNING (rad/s; positive = left)
turn_angle_rad : total turn angle before RETRYING (rad; default π/2)
retry_timeout_s : max RETRYING time per attempt before next reverse cycle
clear_hold_s : consecutive clear time needed to declare success
max_retries : maximum reverse+turn attempts before GAVE_UP
"""
def __init__(
self,
reverse_speed_ms: float = -0.15,
reverse_distance_m: float = 0.30,
angular_speed_rads: float = 0.60,
turn_angle_rad: float = 1.5708, # π/2
retry_timeout_s: float = 3.0,
clear_hold_s: float = 0.5,
max_retries: int = 3,
):
self._rev_speed = min(0.0, float(reverse_speed_ms)) # ensure negative
self._rev_dist = max(0.01, float(reverse_distance_m))
self._ang_speed = abs(float(angular_speed_rads))
self._turn_angle = max(0.01, float(turn_angle_rad))
self._retry_tout = max(0.1, float(retry_timeout_s))
self._clear_hold = max(0.0, float(clear_hold_s))
self._max_retry = max(1, int(max_retries))
self._state = RecoveryState.IDLE
self._rev_done = 0.0 # distance reversed so far
self._turn_done = 0.0 # angle turned so far
self._retry_count = 0
self._retry_timer = 0.0 # time spent in RETRYING
self._clear_timer = 0.0 # consecutive clear time in RETRYING
# ── Public API ────────────────────────────────────────────────────────────
@property
def state(self) -> RecoveryState:
return self._state
@property
def retry_count(self) -> int:
return self._retry_count
def reset(self) -> None:
"""Return to IDLE and clear all counters."""
self._state = RecoveryState.IDLE
self._rev_done = 0.0
self._turn_done = 0.0
self._retry_count = 0
self._retry_timer = 0.0
self._clear_timer = 0.0
def tick(self, inputs: RecoveryInputs) -> RecoveryOutputs:
prev = self._state
out = self._step(inputs)
out.state = self._state
out.retry_count = self._retry_count
out.state_changed = (self._state != prev)
if out.state_changed:
self._on_enter(self._state)
return out
# ── Internal step ─────────────────────────────────────────────────────────
def _step(self, inp: RecoveryInputs) -> RecoveryOutputs:
out = RecoveryOutputs(state=self._state)
dt = max(0.0, inp.dt)
# ── IDLE ──────────────────────────────────────────────────────────────
if self._state == RecoveryState.IDLE:
if inp.trigger:
self._state = RecoveryState.REVERSING
# ── REVERSING ─────────────────────────────────────────────────────────
elif self._state == RecoveryState.REVERSING:
step = abs(self._rev_speed) * dt
self._rev_done += step
out.cmd_linear = self._rev_speed
out.progress = min(1.0, self._rev_done / self._rev_dist)
if self._rev_done >= self._rev_dist:
self._state = RecoveryState.TURNING
# ── TURNING ───────────────────────────────────────────────────────────
elif self._state == RecoveryState.TURNING:
step = self._ang_speed * dt
self._turn_done += step
out.cmd_angular = self._ang_speed
out.progress = min(1.0, self._turn_done / self._turn_angle)
if self._turn_done >= self._turn_angle:
self._retry_count += 1
self._state = RecoveryState.RETRYING
# ── RETRYING ──────────────────────────────────────────────────────────
elif self._state == RecoveryState.RETRYING:
self._retry_timer += dt
if inp.threat_cleared:
self._clear_timer += dt
if self._clear_timer >= self._clear_hold:
# Success — threat has cleared
self._state = RecoveryState.IDLE
return out
else:
self._clear_timer = 0.0
if self._retry_timer >= self._retry_tout:
if self._retry_count >= self._max_retry:
self._state = RecoveryState.GAVE_UP
out.gave_up = True
else:
self._state = RecoveryState.REVERSING
# ── GAVE_UP ───────────────────────────────────────────────────────────
elif self._state == RecoveryState.GAVE_UP:
# Stay in GAVE_UP until external reset()
out.gave_up = True
return out
# ── Entry actions ─────────────────────────────────────────────────────────
def _on_enter(self, state: RecoveryState) -> None:
if state == RecoveryState.REVERSING:
self._rev_done = 0.0
elif state == RecoveryState.TURNING:
self._turn_done = 0.0
elif state == RecoveryState.RETRYING:
self._retry_timer = 0.0
self._clear_timer = 0.0

View File

@ -0,0 +1,354 @@
"""
threat_detector.py Multi-source threat detection for emergency behavior (Issue #169).
Detectors
ObstacleDetector : Forward-sector minimum range < stop thresholds at speed.
Inputs: min_range_m (pre-filtered from LaserScan forward
sector), cmd_speed_ms.
FallDetector : Extreme pitch/roll from IMU, or depth floor-drop ahead.
Inputs: pitch_rad, roll_rad, floor_drop_m (depth-derived;
0.0 if depth unavailable).
StuckDetector : Commanded speed vs actual speed mismatch sustained for
stuck_timeout_s. Tracks elapsed time with dt argument.
BumpDetector : IMU acceleration jerk (|Δ|a||/dt) above threshold.
MAJOR at jerk_threshold_ms3, CRITICAL at
critical_jerk_threshold_ms3.
ThreatLevel
CLEAR : no threat; normal operation
MINOR : advisory; log/beep only
MAJOR : stop and execute recovery
CRITICAL : full shutdown + MQTT escalation
Pure module no ROS2 dependency.
"""
from __future__ import annotations
import math
import time
from dataclasses import dataclass
from enum import Enum
from typing import Optional
# ── Enumerations ──────────────────────────────────────────────────────────────
class ThreatLevel(Enum):
CLEAR = 0
MINOR = 1
MAJOR = 2
CRITICAL = 3
class ThreatType(Enum):
NONE = "NONE"
OBSTACLE_PROXIMITY = "OBSTACLE_PROXIMITY"
FALL_RISK = "FALL_RISK"
WHEEL_STUCK = "WHEEL_STUCK"
BUMP = "BUMP"
# ── ThreatEvent ───────────────────────────────────────────────────────────────
@dataclass
class ThreatEvent:
"""Snapshot of a single detected threat."""
threat_type: ThreatType = ThreatType.NONE
level: ThreatLevel = ThreatLevel.CLEAR
value: float = 0.0 # triggering metric
detail: str = ""
timestamp_s: float = 0.0
@staticmethod
def clear(timestamp_s: float = 0.0) -> "ThreatEvent":
return ThreatEvent(timestamp_s=timestamp_s)
_CLEAR = ThreatEvent()
# ── ObstacleDetector ─────────────────────────────────────────────────────────
class ObstacleDetector:
"""
Obstacle proximity threat from forward-sector minimum range.
Parameters
----------
stop_distance_m : range below which MAJOR is raised (default 0.30 m)
critical_distance_m : range below which CRITICAL is raised (default 0.10 m)
min_speed_ms : only active above this commanded speed (default 0.05 m/s)
"""
def __init__(
self,
stop_distance_m: float = 0.30,
critical_distance_m: float = 0.10,
min_speed_ms: float = 0.05,
):
self._stop = max(1e-3, stop_distance_m)
self._critical = max(1e-3, min(self._stop, critical_distance_m))
self._min_spd = abs(min_speed_ms)
def update(
self,
min_range_m: float,
cmd_speed_ms: float,
timestamp_s: float = 0.0,
) -> ThreatEvent:
"""
Parameters
----------
min_range_m : minimum obstacle range in forward sector (m)
cmd_speed_ms : signed commanded forward speed (m/s)
"""
if abs(cmd_speed_ms) < self._min_spd:
return ThreatEvent.clear(timestamp_s)
if min_range_m <= self._critical:
return ThreatEvent(
threat_type=ThreatType.OBSTACLE_PROXIMITY,
level=ThreatLevel.CRITICAL,
value=min_range_m,
detail=f"Obstacle {min_range_m:.2f} m (critical zone)",
timestamp_s=timestamp_s,
)
if min_range_m <= self._stop:
return ThreatEvent(
threat_type=ThreatType.OBSTACLE_PROXIMITY,
level=ThreatLevel.MAJOR,
value=min_range_m,
detail=f"Obstacle {min_range_m:.2f} m ahead",
timestamp_s=timestamp_s,
)
return ThreatEvent.clear(timestamp_s)
# ── FallDetector ──────────────────────────────────────────────────────────────
class FallDetector:
"""
Fall / tipping risk from IMU pitch/roll and optional depth floor-drop.
Parameters
----------
minor_tilt_rad : |pitch| or |roll| above which MINOR fires (default 0.20 rad)
major_tilt_rad : above which MAJOR fires (default 0.35 rad)
critical_tilt_rad : above which CRITICAL fires (default 0.52 rad 30°)
floor_drop_m : depth discontinuity (m) triggering MAJOR (default 0.15 m)
"""
def __init__(
self,
minor_tilt_rad: float = 0.20,
major_tilt_rad: float = 0.35,
critical_tilt_rad: float = 0.52,
floor_drop_m: float = 0.15,
):
self._minor = float(minor_tilt_rad)
self._major = float(major_tilt_rad)
self._critical = float(critical_tilt_rad)
self._drop = float(floor_drop_m)
def update(
self,
pitch_rad: float,
roll_rad: float,
floor_drop_m: float = 0.0,
timestamp_s: float = 0.0,
) -> ThreatEvent:
"""
Parameters
----------
pitch_rad : forward tilt (rad); +ve = nose up
roll_rad : lateral tilt (rad); +ve = left side up
floor_drop_m : depth discontinuity ahead of robot (m); 0 = not measured
"""
tilt = max(abs(pitch_rad), abs(roll_rad))
if tilt >= self._critical:
return ThreatEvent(
threat_type=ThreatType.FALL_RISK,
level=ThreatLevel.CRITICAL,
value=tilt,
detail=f"Critical tilt {math.degrees(tilt):.1f}°",
timestamp_s=timestamp_s,
)
if tilt >= self._major or floor_drop_m >= self._drop:
value = max(tilt, floor_drop_m)
detail = (
f"Floor drop {floor_drop_m:.2f} m" if floor_drop_m >= self._drop
else f"Major tilt {math.degrees(tilt):.1f}°"
)
return ThreatEvent(
threat_type=ThreatType.FALL_RISK,
level=ThreatLevel.MAJOR,
value=value,
detail=detail,
timestamp_s=timestamp_s,
)
if tilt >= self._minor:
return ThreatEvent(
threat_type=ThreatType.FALL_RISK,
level=ThreatLevel.MINOR,
value=tilt,
detail=f"Tilt advisory {math.degrees(tilt):.1f}°",
timestamp_s=timestamp_s,
)
return ThreatEvent.clear(timestamp_s)
# ── StuckDetector ─────────────────────────────────────────────────────────────
class StuckDetector:
"""
Wheel stall / stuck detection from cmd_vel vs odometry mismatch.
Accumulates stuck time while |cmd| > min_cmd_ms AND |actual| < moving_ms.
Resets when motion resumes or commanded speed drops below min_cmd_ms.
Parameters
----------
stuck_timeout_s : accumulated stuck time before MAJOR fires (default 3.0 s)
min_cmd_ms : minimum commanded speed to consider stalling (0.05 m/s)
moving_threshold_ms : actual speed above which robot is considered moving
"""
def __init__(
self,
stuck_timeout_s: float = 3.0,
min_cmd_ms: float = 0.05,
moving_threshold_ms: float = 0.05,
):
self._timeout = max(0.1, stuck_timeout_s)
self._min_cmd = abs(min_cmd_ms)
self._moving = abs(moving_threshold_ms)
self._stuck_time: float = 0.0
@property
def stuck_time(self) -> float:
"""Accumulated stuck duration (s)."""
return self._stuck_time
def update(
self,
cmd_speed_ms: float,
actual_speed_ms: float,
dt: float,
timestamp_s: float = 0.0,
) -> ThreatEvent:
"""
Parameters
----------
cmd_speed_ms : commanded forward speed (m/s)
actual_speed_ms : measured forward speed from odometry (m/s)
dt : elapsed time since last call (s)
"""
commanding = abs(cmd_speed_ms) >= self._min_cmd
moving = abs(actual_speed_ms) >= self._moving
if not commanding or moving:
self._stuck_time = 0.0
return ThreatEvent.clear(timestamp_s)
self._stuck_time += max(0.0, dt)
if self._stuck_time >= self._timeout:
return ThreatEvent(
threat_type=ThreatType.WHEEL_STUCK,
level=ThreatLevel.MAJOR,
value=self._stuck_time,
detail=f"Wheels stuck for {self._stuck_time:.1f} s",
timestamp_s=timestamp_s,
)
return ThreatEvent.clear(timestamp_s)
def reset(self) -> None:
self._stuck_time = 0.0
# ── BumpDetector ─────────────────────────────────────────────────────────────
class BumpDetector:
"""
Collision / bump detection via IMU acceleration jerk.
Jerk = |Δ|a|| / dt where |a| = sqrt(ax²+ay²+az²) g (gravity removed)
Parameters
----------
jerk_threshold_ms3 : MAJOR at jerk above this (m/, default 8.0)
critical_jerk_threshold_ms3 : CRITICAL at jerk above this (m/, default 25.0)
gravity_ms2 : gravity magnitude to subtract (default 9.81)
"""
def __init__(
self,
jerk_threshold_ms3: float = 8.0,
critical_jerk_threshold_ms3: float = 25.0,
gravity_ms2: float = 9.81,
):
self._jerk_major = float(jerk_threshold_ms3)
self._jerk_critical = float(critical_jerk_threshold_ms3)
self._gravity = float(gravity_ms2)
self._prev_dyn_mag: Optional[float] = None # previous |a_dynamic|
def update(
self,
ax: float,
ay: float,
az: float,
dt: float,
timestamp_s: float = 0.0,
) -> ThreatEvent:
"""
Parameters
----------
ax, ay, az : IMU linear acceleration (m/)
dt : elapsed time since last call (s)
"""
raw_mag = math.sqrt(ax * ax + ay * ay + az * az)
dyn_mag = abs(raw_mag - self._gravity) # remove gravity component
if self._prev_dyn_mag is None or dt <= 0.0:
self._prev_dyn_mag = dyn_mag
return ThreatEvent.clear(timestamp_s)
jerk = abs(dyn_mag - self._prev_dyn_mag) / dt
self._prev_dyn_mag = dyn_mag
if jerk >= self._jerk_critical:
return ThreatEvent(
threat_type=ThreatType.BUMP,
level=ThreatLevel.CRITICAL,
value=jerk,
detail=f"Critical impact: jerk {jerk:.1f} m/s³",
timestamp_s=timestamp_s,
)
if jerk >= self._jerk_major:
return ThreatEvent(
threat_type=ThreatType.BUMP,
level=ThreatLevel.MAJOR,
value=jerk,
detail=f"Bump detected: jerk {jerk:.1f} m/s³",
timestamp_s=timestamp_s,
)
return ThreatEvent.clear(timestamp_s)
def reset(self) -> None:
self._prev_dyn_mag = None
# ── Utility: pick highest-severity threat ─────────────────────────────────────
def highest_threat(events: list[ThreatEvent]) -> ThreatEvent:
"""Return the ThreatEvent with the highest ThreatLevel from a list."""
if not events:
return _CLEAR
return max(events, key=lambda e: e.level.value)

View File

@ -0,0 +1,5 @@
[develop]
script_dir=$base/lib/saltybot_emergency
[install]
install_scripts=$base/lib/saltybot_emergency

View File

@ -0,0 +1,32 @@
from setuptools import setup, find_packages
import os
from glob import glob
package_name = "saltybot_emergency"
setup(
name=package_name,
version="0.1.0",
packages=find_packages(exclude=["test"]),
data_files=[
("share/ament_index/resource_index/packages",
[f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(os.path.join("share", package_name, "config"),
glob("config/*.yaml")),
(os.path.join("share", package_name, "launch"),
glob("launch/*.py")),
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="sl-controls",
maintainer_email="sl-controls@saltylab.local",
description="Emergency behavior system — collision avoidance, fall prevention, stuck detection, recovery",
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": [
f"emergency_node = {package_name}.emergency_node:main",
],
},
)

View File

@ -0,0 +1,560 @@
"""
test_emergency.py Unit tests for Issue #169 emergency behavior modules.
Covers:
ObstacleDetector proximity thresholds, speed gate
FallDetector tilt levels, floor drop
StuckDetector timeout accumulation, reset on motion
BumpDetector jerk thresholds, first-call safety
AlertManager severity mapping, escalation, suppression
RecoverySequencer full sequence, retry, give-up
EmergencyFSM all state transitions and guard conditions
"""
import math
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import pytest
from saltybot_emergency.threat_detector import (
BumpDetector,
FallDetector,
ObstacleDetector,
StuckDetector,
ThreatEvent,
ThreatLevel,
ThreatType,
highest_threat,
)
from saltybot_emergency.alert_manager import Alert, AlertLevel, AlertManager
from saltybot_emergency.recovery_sequencer import (
RecoveryInputs,
RecoverySequencer,
RecoveryState,
)
from saltybot_emergency.emergency_fsm import (
EmergencyFSM,
EmergencyInputs,
EmergencyState,
)
# ── Helpers ───────────────────────────────────────────────────────────────────
def _obs(**kw):
d = dict(stop_distance_m=0.30, critical_distance_m=0.10, min_speed_ms=0.05)
d.update(kw)
return ObstacleDetector(**d)
def _fall(**kw):
d = dict(minor_tilt_rad=0.20, major_tilt_rad=0.35,
critical_tilt_rad=0.52, floor_drop_m=0.15)
d.update(kw)
return FallDetector(**d)
def _stuck(**kw):
d = dict(stuck_timeout_s=3.0, min_cmd_ms=0.05, moving_threshold_ms=0.05)
d.update(kw)
return StuckDetector(**d)
def _bump(**kw):
d = dict(jerk_threshold_ms3=8.0, critical_jerk_threshold_ms3=25.0)
d.update(kw)
return BumpDetector(**d)
def _alert_mgr(**kw):
d = dict(major_count_threshold=3, escalation_window_s=10.0, suppression_s=1.0)
d.update(kw)
return AlertManager(**d)
def _seq(**kw):
d = dict(
reverse_speed_ms=-0.15, reverse_distance_m=0.30,
angular_speed_rads=0.60, turn_angle_rad=1.5708,
retry_timeout_s=3.0, clear_hold_s=0.5, max_retries=3,
)
d.update(kw)
return RecoverySequencer(**d)
def _fsm(**kw):
d = dict(
stopped_ms=0.03, major_count_threshold=3, escalation_window_s=10.0,
suppression_s=0.0, # disable suppression for cleaner tests
reverse_speed_ms=-0.15, reverse_distance_m=0.30,
angular_speed_rads=0.60, turn_angle_rad=1.5708,
retry_timeout_s=3.0, clear_hold_s=0.5, max_retries=3,
)
d.update(kw)
return EmergencyFSM(**d)
def _major_threat(threat_type=ThreatType.OBSTACLE_PROXIMITY, ts=0.0):
return ThreatEvent(threat_type=threat_type, level=ThreatLevel.MAJOR,
value=1.0, detail="test", timestamp_s=ts)
def _critical_threat(ts=0.0):
return ThreatEvent(threat_type=ThreatType.OBSTACLE_PROXIMITY,
level=ThreatLevel.CRITICAL, value=0.05,
detail="critical test", timestamp_s=ts)
def _minor_threat(ts=0.0):
return ThreatEvent(threat_type=ThreatType.FALL_RISK, level=ThreatLevel.MINOR,
value=0.21, detail="tilt", timestamp_s=ts)
def _clear_threat():
return ThreatEvent.clear()
def _inp(threat=None, speed=0.0, ack=False):
return EmergencyInputs(
threat=threat or _clear_threat(),
robot_speed_ms=speed,
acknowledge=ack,
)
# ══════════════════════════════════════════════════════════════════════════════
# ObstacleDetector
# ══════════════════════════════════════════════════════════════════════════════
class TestObstacleDetector:
def test_clear_when_far(self):
ev = _obs().update(0.5, 0.3)
assert ev.level == ThreatLevel.CLEAR
def test_major_within_stop_distance(self):
ev = _obs(stop_distance_m=0.30).update(0.25, 0.3)
assert ev.level == ThreatLevel.MAJOR
assert ev.threat_type == ThreatType.OBSTACLE_PROXIMITY
def test_critical_within_critical_distance(self):
ev = _obs(critical_distance_m=0.10).update(0.05, 0.3)
assert ev.level == ThreatLevel.CRITICAL
def test_clear_when_stopped(self):
"""Obstacle detection suppressed when not moving."""
ev = _obs(min_speed_ms=0.05).update(0.05, 0.01)
assert ev.level == ThreatLevel.CLEAR
def test_active_at_min_speed(self):
ev = _obs(min_speed_ms=0.05).update(0.20, 0.06)
assert ev.level == ThreatLevel.MAJOR
def test_value_is_distance(self):
ev = _obs().update(0.20, 0.3)
assert ev.value == pytest.approx(0.20, abs=1e-9)
# ══════════════════════════════════════════════════════════════════════════════
# FallDetector
# ══════════════════════════════════════════════════════════════════════════════
class TestFallDetector:
def test_clear_on_flat(self):
ev = _fall().update(0.0, 0.0)
assert ev.level == ThreatLevel.CLEAR
def test_minor_moderate_tilt(self):
ev = _fall(minor_tilt_rad=0.20, major_tilt_rad=0.35).update(0.25, 0.0)
assert ev.level == ThreatLevel.MINOR
def test_major_high_tilt(self):
ev = _fall(major_tilt_rad=0.35, critical_tilt_rad=0.52).update(0.40, 0.0)
assert ev.level == ThreatLevel.MAJOR
def test_critical_extreme_tilt(self):
ev = _fall(critical_tilt_rad=0.52).update(0.60, 0.0)
assert ev.level == ThreatLevel.CRITICAL
def test_major_on_floor_drop(self):
ev = _fall(floor_drop_m=0.15).update(0.0, 0.0, floor_drop_m=0.20)
assert ev.level == ThreatLevel.MAJOR
assert "drop" in ev.detail.lower()
def test_roll_triggers_same_as_pitch(self):
"""Roll beyond minor threshold also fires."""
ev = _fall(minor_tilt_rad=0.20).update(0.0, 0.25)
assert ev.level == ThreatLevel.MINOR
# ══════════════════════════════════════════════════════════════════════════════
# StuckDetector
# ══════════════════════════════════════════════════════════════════════════════
class TestStuckDetector:
def test_clear_when_not_commanded(self):
s = _stuck(stuck_timeout_s=1.0, min_cmd_ms=0.05)
ev = s.update(0.01, 0.0, dt=1.0) # cmd below threshold
assert ev.level == ThreatLevel.CLEAR
def test_clear_when_moving(self):
s = _stuck(stuck_timeout_s=1.0)
ev = s.update(0.2, 0.2, dt=1.0) # actually moving
assert ev.level == ThreatLevel.CLEAR
def test_major_after_timeout(self):
s = _stuck(stuck_timeout_s=3.0, min_cmd_ms=0.05, moving_threshold_ms=0.05)
for _ in range(6):
ev = s.update(0.2, 0.0, dt=0.5) # cmd=0.2, actual=0 → stuck
assert ev.level == ThreatLevel.MAJOR
def test_no_major_before_timeout(self):
s = _stuck(stuck_timeout_s=3.0)
ev = s.update(0.2, 0.0, dt=1.0) # only 1s — not yet
assert ev.level == ThreatLevel.CLEAR
def test_reset_on_motion_resume(self):
s = _stuck(stuck_timeout_s=1.0)
s.update(0.2, 0.0, dt=0.8) # accumulate stuck time
s.update(0.2, 0.3, dt=0.1) # motion resumes → reset
ev = s.update(0.2, 0.0, dt=0.3) # only 0.3s since reset → still clear
assert ev.level == ThreatLevel.CLEAR
def test_stuck_time_property(self):
s = _stuck(stuck_timeout_s=5.0)
s.update(0.2, 0.0, dt=1.0)
s.update(0.2, 0.0, dt=1.0)
assert s.stuck_time == pytest.approx(2.0, abs=1e-6)
# ══════════════════════════════════════════════════════════════════════════════
# BumpDetector
# ══════════════════════════════════════════════════════════════════════════════
class TestBumpDetector:
def test_clear_on_first_call(self):
"""No jerk on first sample (no previous value)."""
ev = _bump().update(0.0, 0.0, 9.81, dt=0.05)
assert ev.level == ThreatLevel.CLEAR
def test_major_on_jerk(self):
b = _bump(jerk_threshold_ms3=5.0, critical_jerk_threshold_ms3=20.0)
b.update(0.0, 0.0, 9.81, dt=0.05) # seed → dyn_mag = 0
# ax=4.5: raw≈10.79, dyn≈0.98, jerk≈9.8 m/s³ → MAJOR (5.0 < 9.8 < 20.0)
ev = b.update(4.5, 0.0, 9.81, dt=0.1)
assert ev.level == ThreatLevel.MAJOR
def test_critical_on_severe_jerk(self):
b = _bump(jerk_threshold_ms3=5.0, critical_jerk_threshold_ms3=20.0)
b.update(0.0, 0.0, 9.81, dt=0.05)
# Very large spike
ev = b.update(50.0, 0.0, 9.81, dt=0.1)
assert ev.level == ThreatLevel.CRITICAL
def test_clear_on_gentle_acceleration(self):
b = _bump(jerk_threshold_ms3=8.0)
b.update(0.0, 0.0, 9.81, dt=0.05)
ev = b.update(0.1, 0.0, 9.81, dt=0.05) # tiny change
assert ev.level == ThreatLevel.CLEAR
# ══════════════════════════════════════════════════════════════════════════════
# highest_threat
# ══════════════════════════════════════════════════════════════════════════════
class TestHighestThreat:
def test_empty_returns_clear(self):
assert highest_threat([]).level == ThreatLevel.CLEAR
def test_picks_highest(self):
a = ThreatEvent(level=ThreatLevel.MINOR)
b = ThreatEvent(level=ThreatLevel.CRITICAL)
c = ThreatEvent(level=ThreatLevel.MAJOR)
assert highest_threat([a, b, c]).level == ThreatLevel.CRITICAL
def test_single_item(self):
ev = ThreatEvent(level=ThreatLevel.MAJOR)
assert highest_threat([ev]) is ev
# ══════════════════════════════════════════════════════════════════════════════
# AlertManager
# ══════════════════════════════════════════════════════════════════════════════
class TestAlertManager:
def test_clear_returns_none(self):
am = _alert_mgr()
assert am.update(_clear_threat()) is None
def test_minor_threat_gives_minor_alert(self):
am = _alert_mgr(suppression_s=0.0)
alert = am.update(_minor_threat(ts=0.0))
assert alert is not None
assert alert.level == AlertLevel.MINOR
def test_major_threat_gives_major_alert(self):
am = _alert_mgr(suppression_s=0.0)
alert = am.update(_major_threat(ts=0.0))
assert alert is not None
assert alert.level == AlertLevel.MAJOR
def test_critical_threat_gives_critical_alert(self):
am = _alert_mgr(suppression_s=0.0)
alert = am.update(_critical_threat(ts=0.0))
assert alert is not None
assert alert.level == AlertLevel.CRITICAL
def test_suppression_blocks_duplicate(self):
am = _alert_mgr(suppression_s=5.0)
am.update(_major_threat(ts=0.0))
alert = am.update(_major_threat(ts=1.0)) # within 5s window
assert alert is None
def test_suppression_expires(self):
am = _alert_mgr(suppression_s=2.0)
am.update(_major_threat(ts=0.0))
alert = am.update(_major_threat(ts=3.0)) # after 2s window
assert alert is not None
def test_escalation_major_to_critical(self):
"""After major_count_threshold major alerts, next one becomes CRITICAL."""
am = _alert_mgr(major_count_threshold=3, escalation_window_s=60.0,
suppression_s=0.0)
for i in range(3):
am.update(_major_threat(ts=float(i)))
# 4th should be escalated
alert = am.update(_major_threat(ts=4.0))
assert alert is not None
assert alert.level == AlertLevel.CRITICAL
def test_escalation_resets_after_window(self):
"""Major alerts outside the window don't contribute to escalation."""
am = _alert_mgr(major_count_threshold=3, escalation_window_s=5.0,
suppression_s=0.0)
am.update(_major_threat(ts=0.0))
am.update(_major_threat(ts=1.0))
am.update(_major_threat(ts=2.0))
# All 3 are old; new one at t=10 is outside window
alert = am.update(_major_threat(ts=10.0))
assert alert is not None
assert alert.level == AlertLevel.MAJOR # not escalated
def test_reset_clears_escalation_state(self):
am = _alert_mgr(major_count_threshold=2, suppression_s=0.0)
am.update(_major_threat(ts=0.0))
am.update(_major_threat(ts=1.0)) # now at threshold
am.reset()
alert = am.update(_major_threat(ts=2.0))
assert alert.level == AlertLevel.MAJOR # back to major after reset
# ══════════════════════════════════════════════════════════════════════════════
# RecoverySequencer
# ══════════════════════════════════════════════════════════════════════════════
class TestRecoverySequencer:
def _trigger(self, seq):
return seq.tick(RecoveryInputs(trigger=True, dt=0.02))
def test_idle_on_init(self):
seq = _seq()
assert seq.state == RecoveryState.IDLE
def test_trigger_starts_reversing(self):
seq = _seq()
out = self._trigger(seq)
assert seq.state == RecoveryState.REVERSING
def test_reversing_backward_velocity(self):
seq = _seq(reverse_speed_ms=-0.15)
self._trigger(seq)
out = seq.tick(RecoveryInputs(dt=0.02))
assert out.cmd_linear < 0.0
def test_reversing_completes_to_turning(self):
seq = _seq(reverse_speed_ms=-1.0, reverse_distance_m=0.5)
self._trigger(seq)
for _ in range(30):
out = seq.tick(RecoveryInputs(dt=0.02))
assert seq.state == RecoveryState.TURNING
def test_turning_positive_angular(self):
seq = _seq(reverse_speed_ms=-1.0, reverse_distance_m=0.1,
angular_speed_rads=1.0)
self._trigger(seq)
# Skip through reversing quickly
for _ in range(20):
seq.tick(RecoveryInputs(dt=0.02))
if seq.state == RecoveryState.TURNING:
out = seq.tick(RecoveryInputs(dt=0.02))
assert out.cmd_angular > 0.0
def test_retrying_increments_count(self):
seq = _seq(reverse_speed_ms=-1.0, reverse_distance_m=0.05,
angular_speed_rads=10.0, turn_angle_rad=0.1)
self._trigger(seq)
for _ in range(100):
seq.tick(RecoveryInputs(dt=0.02))
assert seq.state == RecoveryState.RETRYING
assert seq.retry_count == 1
def test_threat_cleared_returns_idle(self):
seq = _seq(reverse_speed_ms=-1.0, reverse_distance_m=0.05,
angular_speed_rads=10.0, turn_angle_rad=0.1,
clear_hold_s=0.1)
self._trigger(seq)
# Fast-forward to RETRYING
for _ in range(100):
seq.tick(RecoveryInputs(dt=0.02))
assert seq.state == RecoveryState.RETRYING
# Feed cleared ticks until clear_hold met
for _ in range(20):
seq.tick(RecoveryInputs(threat_cleared=True, dt=0.02))
assert seq.state == RecoveryState.IDLE
def test_max_retries_gives_up(self):
seq = _seq(reverse_speed_ms=-1.0, reverse_distance_m=0.05,
angular_speed_rads=10.0, turn_angle_rad=0.1,
retry_timeout_s=0.1, max_retries=2)
self._trigger(seq)
for _ in range(500):
out = seq.tick(RecoveryInputs(threat_cleared=False, dt=0.05))
if seq.state == RecoveryState.GAVE_UP:
break
assert seq.state == RecoveryState.GAVE_UP
def test_reset_returns_to_idle(self):
seq = _seq()
self._trigger(seq)
seq.reset()
assert seq.state == RecoveryState.IDLE
assert seq.retry_count == 0
# ══════════════════════════════════════════════════════════════════════════════
# EmergencyFSM
# ══════════════════════════════════════════════════════════════════════════════
class TestEmergencyFSMBasic:
def test_initial_state_nominal(self):
fsm = _fsm()
assert fsm.state == EmergencyState.NOMINAL
def test_nominal_stays_on_clear(self):
fsm = _fsm()
out = fsm.tick(_inp())
assert fsm.state == EmergencyState.NOMINAL
assert out.cmd_override is False
def test_minor_alert_no_override(self):
fsm = _fsm()
out = fsm.tick(_inp(_minor_threat(ts=0.0)))
assert fsm.state == EmergencyState.NOMINAL
assert out.cmd_override is False
assert out.alert is not None
assert out.alert.level == AlertLevel.MINOR
def test_major_threat_enters_stopping(self):
fsm = _fsm()
out = fsm.tick(_inp(_major_threat()))
assert fsm.state == EmergencyState.STOPPING
assert out.cmd_override is True
def test_critical_threat_enters_stopping_critical_pending(self):
fsm = _fsm()
fsm.tick(_inp(_critical_threat()))
assert fsm.state == EmergencyState.STOPPING
assert fsm._critical_pending is True
class TestEmergencyFSMStopping:
def test_stopping_commands_zero(self):
fsm = _fsm()
fsm.tick(_inp(_major_threat()))
out = fsm.tick(_inp(_major_threat(), speed=0.5))
assert out.cmd_linear == pytest.approx(0.0, abs=1e-9)
assert out.cmd_angular == pytest.approx(0.0, abs=1e-9)
def test_stopped_enters_recovering(self):
fsm = _fsm(stopped_ms=0.03)
fsm.tick(_inp(_major_threat()))
out = fsm.tick(_inp(_major_threat(), speed=0.01)) # below stopped_ms
assert fsm.state == EmergencyState.RECOVERING
def test_critical_pending_enters_escalated(self):
fsm = _fsm(stopped_ms=0.03)
fsm.tick(_inp(_critical_threat()))
fsm.tick(_inp(_critical_threat(), speed=0.01)) # stopped → ESCALATED
assert fsm.state == EmergencyState.ESCALATED
class TestEmergencyFSMRecovering:
def _reach_recovering(self, fsm):
fsm.tick(_inp(_major_threat()))
fsm.tick(_inp(_major_threat(), speed=0.0)) # stopped → RECOVERING
assert fsm.state == EmergencyState.RECOVERING
def test_recovering_has_cmd_override(self):
fsm = _fsm()
self._reach_recovering(fsm)
out = fsm.tick(_inp(_clear_threat()))
assert out.cmd_override is True
def test_recovering_gave_up_escalates(self):
fsm = _fsm(max_retries=1, retry_timeout_s=0.05)
self._reach_recovering(fsm)
# Drive recovery to GAVE_UP by feeding many non-clearing ticks
for _ in range(500):
out = fsm.tick(_inp(_major_threat()))
if fsm.state == EmergencyState.ESCALATED:
break
assert fsm.state == EmergencyState.ESCALATED
class TestEmergencyFSMEscalated:
def _reach_escalated(self, fsm):
fsm.tick(_inp(_critical_threat()))
fsm.tick(_inp(_critical_threat(), speed=0.0))
assert fsm.state == EmergencyState.ESCALATED
def test_escalated_emits_critical_alert_once(self):
fsm = _fsm()
self._reach_escalated(fsm)
out1 = fsm.tick(_inp())
out2 = fsm.tick(_inp())
assert out1.alert is not None
assert out1.alert.level == AlertLevel.CRITICAL
assert out2.alert is None # suppressed after first emission
def test_escalated_e_stop_asserted(self):
fsm = _fsm()
self._reach_escalated(fsm)
out = fsm.tick(_inp())
assert out.e_stop is True
def test_escalated_stays_without_ack(self):
fsm = _fsm()
self._reach_escalated(fsm)
for _ in range(5):
fsm.tick(_inp())
assert fsm.state == EmergencyState.ESCALATED
def test_acknowledge_returns_to_nominal(self):
fsm = _fsm()
self._reach_escalated(fsm)
fsm.tick(_inp(ack=True))
assert fsm.state == EmergencyState.NOMINAL
def test_reset_returns_to_nominal(self):
fsm = _fsm()
self._reach_escalated(fsm)
fsm.reset()
assert fsm.state == EmergencyState.NOMINAL
def test_e_stop_cleared_on_ack(self):
fsm = _fsm()
self._reach_escalated(fsm)
out = fsm.tick(_inp(ack=True))
assert out.e_stop is False

View File

@ -0,0 +1,15 @@
cmake_minimum_required(VERSION 3.8)
project(saltybot_emergency_msgs)
find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)
find_package(builtin_interfaces REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/EmergencyEvent.msg"
"msg/RecoveryAction.msg"
DEPENDENCIES builtin_interfaces
)
ament_export_dependencies(rosidl_default_runtime)
ament_package()

View File

@ -0,0 +1,25 @@
# EmergencyEvent.msg — Real-time emergency system state snapshot (Issue #169)
# Published by: /saltybot/emergency_node
# Topic: /saltybot/emergency
builtin_interfaces/Time stamp
# Overall FSM state
# Values: "NOMINAL" | "STOPPING" | "RECOVERING" | "ESCALATED"
string state
# Active threat (highest severity across all detectors)
# threat_type values: "NONE" | "OBSTACLE_PROXIMITY" | "FALL_RISK" | "WHEEL_STUCK" | "BUMP"
string threat_type
# Severity: "CLEAR" | "MINOR" | "MAJOR" | "CRITICAL"
string severity
# Triggering metric value (e.g. distance in m, jerk in m/s³, stuck seconds)
float32 threat_value
# Human-readable description of the active threat
string detail
# True when emergency system is overriding normal cmd_vel with its own commands
bool cmd_override

View File

@ -0,0 +1,15 @@
# RecoveryAction.msg — Recovery sequencer state (Issue #169)
# Published by: /saltybot/emergency_node
# Topic: /saltybot/recovery_action
builtin_interfaces/Time stamp
# Current recovery action
# Values: "IDLE" | "REVERSING" | "TURNING" | "RETRYING" | "GAVE_UP"
string action
# Number of reverse+turn attempts completed so far
int32 retry_count
# Progress through current phase [0.0 1.0]
float32 progress

View File

@ -0,0 +1,22 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_emergency_msgs</name>
<version>0.1.0</version>
<description>Emergency behavior message definitions for SaltyBot (Issue #169)</description>
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
<license>MIT</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<buildtool_depend>rosidl_default_generators</buildtool_depend>
<depend>builtin_interfaces</depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>