diff --git a/jetson/ros2_ws/src/saltybot_emergency/config/emergency_params.yaml b/jetson/ros2_ws/src/saltybot_emergency/config/emergency_params.yaml
new file mode 100644
index 0000000..161ab00
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_emergency/config/emergency_params.yaml
@@ -0,0 +1,43 @@
+/**:
+ ros__parameters:
+ # Control loop rate (Hz)
+ control_rate: 20.0
+
+ # Odometry topic for stuck detection
+ odom_topic: "/saltybot/rover_odom"
+
+ # ── LaserScan forward sector ───────────────────────────────────────────────
+ forward_scan_angle_rad: 0.785 # ±45° forward sector
+
+ # ── Obstacle proximity ────────────────────────────────────────────────────
+ stop_distance_m: 0.30 # MAJOR threshold (spec: <30 cm)
+ critical_distance_m: 0.10 # CRITICAL threshold
+ min_cmd_speed_ms: 0.05 # ignore obstacle when nearly stopped
+
+ # ── Fall detection (IMU tilt) ─────────────────────────────────────────────
+ minor_tilt_rad: 0.20 # advisory
+ major_tilt_rad: 0.35 # stop + recover
+ critical_tilt_rad: 0.52 # ~30° — full shutdown
+ floor_drop_m: 0.15 # depth discontinuity triggering MAJOR
+
+ # ── Stuck detection ───────────────────────────────────────────────────────
+ stuck_timeout_s: 3.0 # (spec: 3 s wheel stall)
+
+ # ── Bump / jerk detection ─────────────────────────────────────────────────
+ jerk_threshold_ms3: 8.0
+ critical_jerk_threshold_ms3: 25.0
+
+ # ── FSM / recovery ────────────────────────────────────────────────────────
+ stopped_ms: 0.03 # speed below which robot is "stopped" (m/s)
+ major_count_threshold: 3 # MAJOR alerts before escalation to CRITICAL
+ escalation_window_s: 10.0 # sliding window for escalation counter (s)
+ suppression_s: 1.0 # de-bounce period for duplicate alerts (s)
+
+ # Recovery sequence
+ reverse_speed_ms: -0.15 # back-up speed (m/s; must be negative)
+ reverse_distance_m: 0.30 # distance to reverse each cycle (m)
+ angular_speed_rads: 0.60 # turn speed (rad/s)
+ turn_angle_rad: 1.5708 # ~90° turn (rad)
+ retry_timeout_s: 3.0 # time in RETRYING per attempt (s)
+ clear_hold_s: 0.50 # consecutive clear time to declare success (s)
+ max_retries: 3 # maximum reverse+turn attempts before GAVE_UP
diff --git a/jetson/ros2_ws/src/saltybot_emergency/launch/emergency.launch.py b/jetson/ros2_ws/src/saltybot_emergency/launch/emergency.launch.py
new file mode 100644
index 0000000..62822ae
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_emergency/launch/emergency.launch.py
@@ -0,0 +1,53 @@
+"""
+emergency.launch.py — Launch the emergency behavior system (Issue #169).
+
+Usage
+-----
+ ros2 launch saltybot_emergency emergency.launch.py
+ ros2 launch saltybot_emergency emergency.launch.py \
+ stop_distance_m:=0.30 max_retries:=3
+"""
+
+import os
+
+from ament_index_python.packages import get_package_share_directory
+from launch import LaunchDescription
+from launch.actions import DeclareLaunchArgument
+from launch.substitutions import LaunchConfiguration
+from launch_ros.actions import Node
+
+
+def generate_launch_description():
+ pkg_share = get_package_share_directory("saltybot_emergency")
+ default_params = os.path.join(pkg_share, "config", "emergency_params.yaml")
+
+ return LaunchDescription([
+ DeclareLaunchArgument(
+ "params_file",
+ default_value=default_params,
+ description="Path to emergency_params.yaml",
+ ),
+ DeclareLaunchArgument(
+ "stop_distance_m",
+ default_value="0.30",
+ description="Obstacle distance triggering MAJOR stop (m)",
+ ),
+ DeclareLaunchArgument(
+ "max_retries",
+ default_value="3",
+ description="Maximum recovery cycles before ESCALATED",
+ ),
+ Node(
+ package="saltybot_emergency",
+ executable="emergency_node",
+ name="emergency",
+ output="screen",
+ parameters=[
+ LaunchConfiguration("params_file"),
+ {
+ "stop_distance_m": LaunchConfiguration("stop_distance_m"),
+ "max_retries": LaunchConfiguration("max_retries"),
+ },
+ ],
+ ),
+ ])
diff --git a/jetson/ros2_ws/src/saltybot_emergency/package.xml b/jetson/ros2_ws/src/saltybot_emergency/package.xml
new file mode 100644
index 0000000..0837ae4
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_emergency/package.xml
@@ -0,0 +1,24 @@
+
+
+
+ saltybot_emergency
+ 0.1.0
+ Emergency behavior system — collision avoidance, fall prevention, stuck detection, recovery (Issue #169)
+ sl-controls
+ MIT
+
+ rclpy
+ sensor_msgs
+ nav_msgs
+ geometry_msgs
+ std_msgs
+
+ ament_copyright
+ ament_flake8
+ ament_pep257
+ pytest
+
+
+ ament_python
+
+
diff --git a/jetson/ros2_ws/src/saltybot_emergency/resource/saltybot_emergency b/jetson/ros2_ws/src/saltybot_emergency/resource/saltybot_emergency
new file mode 100644
index 0000000..e69de29
diff --git a/jetson/ros2_ws/src/saltybot_emergency/saltybot_emergency/__init__.py b/jetson/ros2_ws/src/saltybot_emergency/saltybot_emergency/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/jetson/ros2_ws/src/saltybot_emergency/saltybot_emergency/alert_manager.py b/jetson/ros2_ws/src/saltybot_emergency/saltybot_emergency/alert_manager.py
new file mode 100644
index 0000000..c1097b4
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_emergency/saltybot_emergency/alert_manager.py
@@ -0,0 +1,139 @@
+"""
+alert_manager.py — Alert severity escalation for emergency behavior (Issue #169).
+
+Alert levels
+────────────
+ NONE : no action
+ MINOR : advisory beep → publish to /saltybot/alert_beep
+ MAJOR : stop + LED flash → publish to /saltybot/alert_flash; cmd_vel override
+ CRITICAL : full shutdown + MQTT → publish to /saltybot/e_stop + /saltybot/critical_alert
+
+Escalation
+──────────
+ If major_count_threshold MAJOR alerts occur within escalation_window_s, the
+ next MAJOR is promoted to CRITICAL. This catches persistent stuck / repeated
+ collision scenarios.
+
+Suppression
+───────────
+ Identical (type, level) alerts are suppressed within suppression_s to avoid
+ flooding downstream topics.
+
+Pure module — no ROS2 dependency.
+"""
+
+from __future__ import annotations
+
+from collections import deque
+from dataclasses import dataclass
+from enum import Enum
+from typing import Optional
+
+from saltybot_emergency.threat_detector import ThreatEvent, ThreatLevel, ThreatType
+
+
+# ── Alert level ───────────────────────────────────────────────────────────────
+
+class AlertLevel(Enum):
+ NONE = 0
+ MINOR = 1 # beep
+ MAJOR = 2 # stop + flash
+ CRITICAL = 3 # shutdown + MQTT
+
+
+# ── Alert ─────────────────────────────────────────────────────────────────────
+
+@dataclass
+class Alert:
+ level: AlertLevel
+ source: str # ThreatType value string
+ message: str
+ timestamp_s: float
+
+
+# ── AlertManager ─────────────────────────────────────────────────────────────
+
+class AlertManager:
+ """
+ Converts ThreatEvents to Alerts with escalation and suppression logic.
+
+ Parameters
+ ----------
+ major_count_threshold : number of MAJOR alerts within window to escalate
+ escalation_window_s : sliding window for escalation counting (s)
+ suppression_s : suppress duplicate (type, level) alerts within this period
+ """
+
+ def __init__(
+ self,
+ major_count_threshold: int = 3,
+ escalation_window_s: float = 10.0,
+ suppression_s: float = 1.0,
+ ):
+ self._major_threshold = max(1, int(major_count_threshold))
+ self._esc_window = float(escalation_window_s)
+ self._suppress = float(suppression_s)
+ self._major_times: deque = deque() # timestamps of MAJOR alerts
+ self._last_seen: dict = {} # (type, level) → timestamp
+
+ # ── Update ────────────────────────────────────────────────────────────────
+
+ def update(self, threat: ThreatEvent) -> Optional[Alert]:
+ """
+ Convert one ThreatEvent to an Alert, applying escalation and suppression.
+
+ Returns None if threat is CLEAR or the alert is suppressed.
+ """
+ if threat.level == ThreatLevel.CLEAR:
+ return None
+
+ now = threat.timestamp_s
+ alert_level = _threat_to_alert(threat.level)
+
+ # ── Suppression ───────────────────────────────────────────────────────
+ key = (threat.threat_type, threat.level)
+ last = self._last_seen.get(key)
+ if last is not None and (now - last) < self._suppress:
+ return None
+ self._last_seen[key] = now
+
+ # ── Escalation ────────────────────────────────────────────────────────
+ if alert_level == AlertLevel.MAJOR:
+ # Prune old timestamps
+ while self._major_times and (now - self._major_times[0]) > self._esc_window:
+ self._major_times.popleft()
+ self._major_times.append(now)
+ if len(self._major_times) >= self._major_threshold:
+ alert_level = AlertLevel.CRITICAL
+
+ msg = _build_message(alert_level, threat)
+ return Alert(
+ level=alert_level,
+ source=threat.threat_type.value,
+ message=msg,
+ timestamp_s=now,
+ )
+
+ def reset(self) -> None:
+ """Clear escalation history and suppression state."""
+ self._major_times.clear()
+ self._last_seen.clear()
+
+
+# ── Helpers ───────────────────────────────────────────────────────────────────
+
+def _threat_to_alert(level: ThreatLevel) -> AlertLevel:
+ return {
+ ThreatLevel.MINOR: AlertLevel.MINOR,
+ ThreatLevel.MAJOR: AlertLevel.MAJOR,
+ ThreatLevel.CRITICAL: AlertLevel.CRITICAL,
+ }.get(level, AlertLevel.NONE)
+
+
+def _build_message(level: AlertLevel, threat: ThreatEvent) -> str:
+ prefix = {
+ AlertLevel.MINOR: "[MINOR]",
+ AlertLevel.MAJOR: "[MAJOR]",
+ AlertLevel.CRITICAL: "[CRITICAL]",
+ }.get(level, "[?]")
+ return f"{prefix} {threat.threat_type.value}: {threat.detail}"
diff --git a/jetson/ros2_ws/src/saltybot_emergency/saltybot_emergency/emergency_fsm.py b/jetson/ros2_ws/src/saltybot_emergency/saltybot_emergency/emergency_fsm.py
new file mode 100644
index 0000000..ae725dc
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_emergency/saltybot_emergency/emergency_fsm.py
@@ -0,0 +1,232 @@
+"""
+emergency_fsm.py — Master emergency FSM integrating all detectors (Issue #169).
+
+States
+──────
+ NOMINAL : normal operation; minor alerts pass through; major/critical → STOPPING.
+ STOPPING : commanding zero velocity until robot speed drops below stopped_ms.
+ Critical threats skip RECOVERING → ESCALATED immediately.
+ RECOVERING : RecoverySequencer executing reverse+turn sequence.
+ Success → NOMINAL; gave-up → ESCALATED.
+ ESCALATED : full stop; critical alert emitted once; stays until acknowledge.
+
+Alert actions produced by state
+────────────────────────────────
+ NOMINAL : emit MINOR alert (beep only); no velocity override.
+ STOPPING : suppress nav, publish zero; emit MAJOR alert once.
+ RECOVERING : suppress nav, publish recovery cmds; no new alerts.
+ ESCALATED : suppress nav, publish zero; emit CRITICAL alert once per entry.
+
+Pure module — no ROS2 dependency.
+"""
+
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from enum import Enum
+from typing import Optional
+
+from saltybot_emergency.alert_manager import Alert, AlertLevel, AlertManager
+from saltybot_emergency.recovery_sequencer import RecoveryInputs, RecoverySequencer
+from saltybot_emergency.threat_detector import ThreatEvent, ThreatLevel
+
+
+# ── States ────────────────────────────────────────────────────────────────────
+
+class EmergencyState(Enum):
+ NOMINAL = "NOMINAL"
+ STOPPING = "STOPPING"
+ RECOVERING = "RECOVERING"
+ ESCALATED = "ESCALATED"
+
+
+# ── I/O ───────────────────────────────────────────────────────────────────────
+
+@dataclass
+class EmergencyInputs:
+ threat: ThreatEvent # highest-severity threat this tick
+ robot_speed_ms: float = 0.0 # actual speed from odometry (m/s)
+ acknowledge: bool = False # operator cleared the escalation
+
+
+@dataclass
+class EmergencyOutputs:
+ state: EmergencyState = EmergencyState.NOMINAL
+ cmd_override: bool = False # True = emergency owns cmd_vel
+ cmd_linear: float = 0.0
+ cmd_angular: float = 0.0
+ alert: Optional[Alert] = None
+ e_stop: bool = False # assert /saltybot/e_stop
+ state_changed: bool = False
+ recovery_progress: float = 0.0
+ recovery_retry_count: int = 0
+
+
+# ── EmergencyFSM ──────────────────────────────────────────────────────────────
+
+class EmergencyFSM:
+ """
+ Master emergency FSM.
+
+ Owns an AlertManager and a RecoverySequencer; coordinates them each tick.
+
+ Parameters
+ ----------
+ stopped_ms : speed below which robot is considered stopped (m/s)
+ major_count_threshold : MAJOR events within window before escalation
+ escalation_window_s : sliding window for escalation (s)
+ suppression_s : alert de-bounce period (s)
+ reverse_speed_ms : reverse speed during recovery (m/s)
+ reverse_distance_m : reverse travel per cycle (m)
+ angular_speed_rads : turn speed during recovery (rad/s)
+ turn_angle_rad : turn per cycle (rad)
+ retry_timeout_s : time in RETRYING before next cycle (s)
+ clear_hold_s : clear duration required to declare success (s)
+ max_retries : recovery cycles before GAVE_UP
+ """
+
+ def __init__(
+ self,
+ stopped_ms: float = 0.03,
+ major_count_threshold: int = 3,
+ escalation_window_s: float = 10.0,
+ suppression_s: float = 1.0,
+ reverse_speed_ms: float = -0.15,
+ reverse_distance_m: float = 0.30,
+ angular_speed_rads: float = 0.60,
+ turn_angle_rad: float = 1.5708,
+ retry_timeout_s: float = 3.0,
+ clear_hold_s: float = 0.5,
+ max_retries: int = 3,
+ ):
+ self._stopped_ms = max(0.0, stopped_ms)
+ self._alert_mgr = AlertManager(
+ major_count_threshold=major_count_threshold,
+ escalation_window_s=escalation_window_s,
+ suppression_s=suppression_s,
+ )
+ self._recovery = RecoverySequencer(
+ reverse_speed_ms=reverse_speed_ms,
+ reverse_distance_m=reverse_distance_m,
+ angular_speed_rads=angular_speed_rads,
+ turn_angle_rad=turn_angle_rad,
+ retry_timeout_s=retry_timeout_s,
+ clear_hold_s=clear_hold_s,
+ max_retries=max_retries,
+ )
+ self._state = EmergencyState.NOMINAL
+ self._critical_pending = False # STOPPING → ESCALATED (not RECOVERING)
+ self._escalation_alerted = False # CRITICAL alert emitted once per ESCALATED entry
+
+ # ── Public API ────────────────────────────────────────────────────────────
+
+ @property
+ def state(self) -> EmergencyState:
+ return self._state
+
+ def reset(self) -> None:
+ self._state = EmergencyState.NOMINAL
+ self._critical_pending = False
+ self._escalation_alerted = False
+ self._alert_mgr.reset()
+ self._recovery.reset()
+
+ def tick(self, inputs: EmergencyInputs) -> EmergencyOutputs:
+ prev = self._state
+ out = self._step(inputs)
+ out.state = self._state
+ out.state_changed = (self._state != prev)
+ return out
+
+ # ── Step ─────────────────────────────────────────────────────────────────
+
+ def _step(self, inp: EmergencyInputs) -> EmergencyOutputs:
+ out = EmergencyOutputs(state=self._state)
+
+ # Run alert manager for this threat
+ alert = self._alert_mgr.update(inp.threat)
+
+ # ── NOMINAL ───────────────────────────────────────────────────────────
+ if self._state == EmergencyState.NOMINAL:
+ if inp.threat.level == ThreatLevel.CRITICAL:
+ self._state = EmergencyState.STOPPING
+ self._critical_pending = True
+ out.alert = alert
+ out.cmd_override = True # start overriding on entry tick
+ elif inp.threat.level == ThreatLevel.MAJOR:
+ self._state = EmergencyState.STOPPING
+ self._critical_pending = False
+ out.alert = alert
+ out.cmd_override = True # start overriding on entry tick
+ elif inp.threat.level == ThreatLevel.MINOR:
+ # Advisory only — no override
+ out.alert = alert
+
+ # ── STOPPING ──────────────────────────────────────────────────────────
+ elif self._state == EmergencyState.STOPPING:
+ out.cmd_override = True
+ out.cmd_linear = 0.0
+ out.cmd_angular = 0.0
+ # Upgrade to critical if new critical arrives
+ if inp.threat.level == ThreatLevel.CRITICAL:
+ self._critical_pending = True
+ if abs(inp.robot_speed_ms) <= self._stopped_ms:
+ if self._critical_pending:
+ self._state = EmergencyState.ESCALATED
+ self._escalation_alerted = False
+ else:
+ self._state = EmergencyState.RECOVERING
+ self._recovery.reset()
+ self._recovery.tick(RecoveryInputs(trigger=True, dt=0.0))
+
+ # ── RECOVERING ────────────────────────────────────────────────────────
+ elif self._state == EmergencyState.RECOVERING:
+ threat_cleared = inp.threat.level == ThreatLevel.CLEAR
+ rec_inp = RecoveryInputs(
+ trigger=False,
+ threat_cleared=threat_cleared,
+ dt=0.02, # nominal dt; node should pass actual dt
+ )
+ rec_out = self._recovery.tick(rec_inp)
+
+ out.cmd_override = True
+ out.cmd_linear = rec_out.cmd_linear
+ out.cmd_angular = rec_out.cmd_angular
+ out.recovery_progress = rec_out.progress
+ out.recovery_retry_count = rec_out.retry_count
+
+ if rec_out.gave_up:
+ self._state = EmergencyState.ESCALATED
+ self._escalation_alerted = False
+ elif rec_out.state.value == "IDLE" and not inp.trigger if hasattr(inp, "trigger") else True:
+ # RecoverySequencer returned to IDLE = success
+ from saltybot_emergency.recovery_sequencer import RecoveryState
+ if self._recovery.state == RecoveryState.IDLE and not rec_out.gave_up:
+ self._state = EmergencyState.NOMINAL
+ self._recovery.reset()
+
+ # ── ESCALATED ─────────────────────────────────────────────────────────
+ elif self._state == EmergencyState.ESCALATED:
+ out.cmd_override = True
+ out.cmd_linear = 0.0
+ out.cmd_angular = 0.0
+ out.e_stop = True
+ if not self._escalation_alerted:
+ # Force a CRITICAL alert regardless of suppression
+ from saltybot_emergency.alert_manager import Alert, AlertLevel
+ out.alert = Alert(
+ level=AlertLevel.CRITICAL,
+ source=inp.threat.threat_type.value,
+ message=f"[CRITICAL] ESCALATED: {inp.threat.detail or 'Recovery gave up'}",
+ timestamp_s=inp.threat.timestamp_s,
+ )
+ self._escalation_alerted = True
+ if inp.acknowledge:
+ self._state = EmergencyState.NOMINAL
+ self._critical_pending = False
+ self._escalation_alerted = False
+ out.e_stop = False
+ self._alert_mgr.reset()
+ self._recovery.reset()
+
+ return out
diff --git a/jetson/ros2_ws/src/saltybot_emergency/saltybot_emergency/emergency_node.py b/jetson/ros2_ws/src/saltybot_emergency/saltybot_emergency/emergency_node.py
new file mode 100644
index 0000000..79039ba
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_emergency/saltybot_emergency/emergency_node.py
@@ -0,0 +1,383 @@
+"""
+emergency_node.py — Emergency behavior system orchestration (Issue #169).
+
+Overview
+────────
+ Aggregates threats from four independent detectors and drives the
+ EmergencyFSM. Overrides /cmd_vel when an emergency is active. Escalates
+ via /saltybot/e_stop and /saltybot/critical_alert for CRITICAL events.
+
+Pipeline (20 Hz)
+────────────────
+ 1. LaserScan callback → ObstacleDetector → ThreatEvent
+ 2. IMU callback → FallDetector + BumpDetector → ThreatEvent (×2)
+ 3. Odom callback → StuckDetector (fed in timer) → ThreatEvent
+ 4. 20 Hz timer → highest_threat() → EmergencyFSM.tick()
+ → publish overriding cmd_vel or pass-through
+ → publish /saltybot/emergency + /saltybot/recovery_action
+
+Subscribes
+──────────
+ /scan sensor_msgs/LaserScan — obstacle detection
+ /saltybot/imu sensor_msgs/Imu — fall + bump detection
+ nav_msgs/Odometry — stuck + speed tracking
+ /cmd_vel geometry_msgs/Twist — nav commands (pass-through)
+
+Publishes
+─────────
+ /saltybot/cmd_vel_out geometry_msgs/Twist — muxed cmd_vel (to drive nodes)
+ /saltybot/e_stop std_msgs/Bool — emergency stop flag
+ /saltybot/alert_beep std_msgs/Empty — beep trigger (MINOR)
+ /saltybot/alert_flash std_msgs/Empty — LED flash trigger (MAJOR)
+ /saltybot/critical_alert std_msgs/String (JSON) — CRITICAL event for MQTT bridge
+ /saltybot/emergency saltybot_emergency_msgs/EmergencyEvent
+ /saltybot/recovery_action saltybot_emergency_msgs/RecoveryAction
+
+Parameters
+──────────
+ control_rate 20.0 Hz
+ odom_topic /saltybot/rover_odom
+ forward_scan_angle_rad 0.785 rad (±45° forward sector for obstacle check)
+ stop_distance_m 0.30 m
+ critical_distance_m 0.10 m
+ min_cmd_speed_ms 0.05 m/s
+ minor_tilt_rad 0.20 rad
+ major_tilt_rad 0.35 rad
+ critical_tilt_rad 0.52 rad
+ floor_drop_m 0.15 m
+ stuck_timeout_s 3.0 s
+ jerk_threshold_ms3 8.0 m/s³
+ critical_jerk_threshold_ms3 25.0 m/s³
+ stopped_ms 0.03 m/s
+ major_count_threshold 3
+ escalation_window_s 10.0 s
+ suppression_s 1.0 s
+ reverse_speed_ms -0.15 m/s
+ reverse_distance_m 0.30 m
+ angular_speed_rads 0.60 rad/s
+ turn_angle_rad 1.5708 rad
+ retry_timeout_s 3.0 s
+ clear_hold_s 0.50 s
+ max_retries 3
+"""
+
+import json
+import math
+import time
+
+import rclpy
+from rclpy.node import Node
+from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy
+
+from geometry_msgs.msg import Twist
+from nav_msgs.msg import Odometry
+from sensor_msgs.msg import Imu, LaserScan
+from std_msgs.msg import Bool, Empty, String
+
+from saltybot_emergency.alert_manager import AlertLevel
+from saltybot_emergency.emergency_fsm import EmergencyFSM, EmergencyInputs, EmergencyState
+from saltybot_emergency.threat_detector import (
+ BumpDetector,
+ FallDetector,
+ ObstacleDetector,
+ StuckDetector,
+ ThreatEvent,
+ ThreatType,
+ highest_threat,
+)
+
+try:
+ from saltybot_emergency_msgs.msg import EmergencyEvent, RecoveryAction
+ _MSGS_OK = True
+except ImportError:
+ _MSGS_OK = False
+
+
+def _quaternion_to_pitch_roll(qx, qy, qz, qw):
+ pitch = math.asin(max(-1.0, min(1.0, 2.0 * (qw * qy - qz * qx))))
+ roll = math.atan2(2.0 * (qw * qx + qy * qz), 1.0 - 2.0 * (qx * qx + qy * qy))
+ return pitch, roll
+
+
+class EmergencyNode(Node):
+
+ def __init__(self):
+ super().__init__("emergency")
+
+ self._declare_params()
+ p = self._load_params()
+
+ # ── Detectors ────────────────────────────────────────────────────────
+ self._obstacle = ObstacleDetector(
+ stop_distance_m=p["stop_distance_m"],
+ critical_distance_m=p["critical_distance_m"],
+ min_speed_ms=p["min_cmd_speed_ms"],
+ )
+ self._fall = FallDetector(
+ minor_tilt_rad=p["minor_tilt_rad"],
+ major_tilt_rad=p["major_tilt_rad"],
+ critical_tilt_rad=p["critical_tilt_rad"],
+ floor_drop_m=p["floor_drop_m"],
+ )
+ self._stuck = StuckDetector(
+ stuck_timeout_s=p["stuck_timeout_s"],
+ min_cmd_ms=p["min_cmd_speed_ms"],
+ )
+ self._bump = BumpDetector(
+ jerk_threshold_ms3=p["jerk_threshold_ms3"],
+ critical_jerk_threshold_ms3=p["critical_jerk_threshold_ms3"],
+ )
+ self._fsm = EmergencyFSM(
+ stopped_ms=p["stopped_ms"],
+ major_count_threshold=p["major_count_threshold"],
+ escalation_window_s=p["escalation_window_s"],
+ suppression_s=p["suppression_s"],
+ reverse_speed_ms=p["reverse_speed_ms"],
+ reverse_distance_m=p["reverse_distance_m"],
+ angular_speed_rads=p["angular_speed_rads"],
+ turn_angle_rad=p["turn_angle_rad"],
+ retry_timeout_s=p["retry_timeout_s"],
+ clear_hold_s=p["clear_hold_s"],
+ max_retries=p["max_retries"],
+ )
+
+ # ── State ────────────────────────────────────────────────────────────
+ self._latest_obstacle_threat = ThreatEvent()
+ self._latest_fall_threat = ThreatEvent()
+ self._latest_bump_threat = ThreatEvent()
+ self._cmd_speed_ms = 0.0
+ self._actual_speed_ms = 0.0
+ self._last_ctrl_t = time.monotonic()
+ self._scan_forward_angle = p["forward_scan_angle_rad"]
+ self._acknowledge_flag = False
+
+ # ── QoS ──────────────────────────────────────────────────────────────
+ reliable = QoSProfile(
+ reliability=ReliabilityPolicy.RELIABLE,
+ history=HistoryPolicy.KEEP_LAST,
+ depth=10,
+ )
+ best_effort = QoSProfile(
+ reliability=ReliabilityPolicy.BEST_EFFORT,
+ history=HistoryPolicy.KEEP_LAST,
+ depth=1,
+ )
+
+ # ── Subscriptions ────────────────────────────────────────────────────
+ self.create_subscription(LaserScan, "/scan", self._scan_cb, best_effort)
+ self.create_subscription(Imu, "/saltybot/imu", self._imu_cb, best_effort)
+ self.create_subscription(Odometry, p["odom_topic"], self._odom_cb, reliable)
+ self.create_subscription(Twist, "/cmd_vel", self._cmd_vel_cb, reliable)
+ self.create_subscription(Bool, "/saltybot/emergency_ack", self._ack_cb, reliable)
+
+ # ── Publishers ───────────────────────────────────────────────────────
+ self._cmd_out_pub = self.create_publisher(Twist, "/saltybot/cmd_vel_out", reliable)
+ self._estop_pub = self.create_publisher(Bool, "/saltybot/e_stop", reliable)
+ self._beep_pub = self.create_publisher(Empty, "/saltybot/alert_beep", reliable)
+ self._flash_pub = self.create_publisher(Empty, "/saltybot/alert_flash", reliable)
+ self._critical_pub = self.create_publisher(String, "/saltybot/critical_alert", reliable)
+
+ self._event_pub = None
+ self._recovery_pub = None
+ if _MSGS_OK:
+ self._event_pub = self.create_publisher(EmergencyEvent, "/saltybot/emergency", reliable)
+ self._recovery_pub = self.create_publisher(RecoveryAction, "/saltybot/recovery_action", reliable)
+
+ # ── Timer ────────────────────────────────────────────────────────────
+ rate = p["control_rate"]
+ self._timer = self.create_timer(1.0 / rate, self._control_cb)
+ self.get_logger().info(f"EmergencyNode ready rate={rate}Hz")
+
+ # ── Parameters ────────────────────────────────────────────────────────────
+
+ def _declare_params(self) -> None:
+ self.declare_parameter("control_rate", 20.0)
+ self.declare_parameter("odom_topic", "/saltybot/rover_odom")
+ self.declare_parameter("forward_scan_angle_rad", 0.785)
+ self.declare_parameter("stop_distance_m", 0.30)
+ self.declare_parameter("critical_distance_m", 0.10)
+ self.declare_parameter("min_cmd_speed_ms", 0.05)
+ self.declare_parameter("minor_tilt_rad", 0.20)
+ self.declare_parameter("major_tilt_rad", 0.35)
+ self.declare_parameter("critical_tilt_rad", 0.52)
+ self.declare_parameter("floor_drop_m", 0.15)
+ self.declare_parameter("stuck_timeout_s", 3.0)
+ self.declare_parameter("jerk_threshold_ms3", 8.0)
+ self.declare_parameter("critical_jerk_threshold_ms3", 25.0)
+ self.declare_parameter("stopped_ms", 0.03)
+ self.declare_parameter("major_count_threshold", 3)
+ self.declare_parameter("escalation_window_s", 10.0)
+ self.declare_parameter("suppression_s", 1.0)
+ self.declare_parameter("reverse_speed_ms", -0.15)
+ self.declare_parameter("reverse_distance_m", 0.30)
+ self.declare_parameter("angular_speed_rads", 0.60)
+ self.declare_parameter("turn_angle_rad", 1.5708)
+ self.declare_parameter("retry_timeout_s", 3.0)
+ self.declare_parameter("clear_hold_s", 0.50)
+ self.declare_parameter("max_retries", 3)
+
+ def _load_params(self) -> dict:
+ g = self.get_parameter
+ return {k: g(k).value for k in [
+ "control_rate", "odom_topic",
+ "forward_scan_angle_rad",
+ "stop_distance_m", "critical_distance_m", "min_cmd_speed_ms",
+ "minor_tilt_rad", "major_tilt_rad", "critical_tilt_rad", "floor_drop_m",
+ "stuck_timeout_s", "jerk_threshold_ms3", "critical_jerk_threshold_ms3",
+ "stopped_ms",
+ "major_count_threshold", "escalation_window_s", "suppression_s",
+ "reverse_speed_ms", "reverse_distance_m",
+ "angular_speed_rads", "turn_angle_rad",
+ "retry_timeout_s", "clear_hold_s", "max_retries",
+ ]}
+
+ # ── Callbacks ─────────────────────────────────────────────────────────────
+
+ def _scan_cb(self, msg: LaserScan) -> None:
+ # Extract minimum range within forward sector (±forward_scan_angle_rad)
+ half = self._scan_forward_angle
+ ranges = []
+ for i, r in enumerate(msg.ranges):
+ angle = msg.angle_min + i * msg.angle_increment
+ if abs(angle) <= half and msg.range_min < r < msg.range_max:
+ ranges.append(r)
+ min_r = min(ranges) if ranges else float("inf")
+ self._latest_obstacle_threat = self._obstacle.update(
+ min_r, self._cmd_speed_ms, time.monotonic()
+ )
+
+ def _imu_cb(self, msg: Imu) -> None:
+ now = time.monotonic()
+ ax = msg.linear_acceleration.x
+ ay = msg.linear_acceleration.y
+ az = msg.linear_acceleration.z
+ pitch, roll = _quaternion_to_pitch_roll(
+ msg.orientation.x, msg.orientation.y,
+ msg.orientation.z, msg.orientation.w,
+ )
+ # dt for jerk is approximated; bump detector handles None on first call
+ dt = 0.02 # nominal 20 Hz
+ self._latest_fall_threat = self._fall.update(pitch, roll, 0.0, now)
+ self._latest_bump_threat = self._bump.update(ax, ay, az, dt, now)
+
+ def _odom_cb(self, msg: Odometry) -> None:
+ self._actual_speed_ms = msg.twist.twist.linear.x
+
+ def _cmd_vel_cb(self, msg: Twist) -> None:
+ self._cmd_speed_ms = msg.linear.x
+
+ def _ack_cb(self, msg: Bool) -> None:
+ if msg.data:
+ self._acknowledge_flag = True
+
+ # ── 20 Hz control loop ────────────────────────────────────────────────────
+
+ def _control_cb(self) -> None:
+ now = time.monotonic()
+ dt = now - self._last_ctrl_t
+ self._last_ctrl_t = now
+
+ stuck_threat = self._stuck.update(
+ self._cmd_speed_ms, self._actual_speed_ms, dt, now
+ )
+
+ threat = highest_threat([
+ self._latest_obstacle_threat,
+ self._latest_fall_threat,
+ self._latest_bump_threat,
+ stuck_threat,
+ ])
+
+ inp = EmergencyInputs(
+ threat=threat,
+ robot_speed_ms=self._actual_speed_ms,
+ acknowledge=self._acknowledge_flag,
+ )
+ self._acknowledge_flag = False
+
+ out = self._fsm.tick(inp)
+
+ if out.state_changed:
+ self.get_logger().info(f"Emergency FSM → {out.state.value}")
+
+ # ── Alert dispatch ────────────────────────────────────────────────────
+ if out.alert is not None:
+ lvl = out.alert.level
+ self.get_logger().warn(out.alert.message)
+ if lvl == AlertLevel.MINOR:
+ self._beep_pub.publish(Empty())
+ elif lvl == AlertLevel.MAJOR:
+ self._flash_pub.publish(Empty())
+ elif lvl == AlertLevel.CRITICAL:
+ self._flash_pub.publish(Empty())
+ self._publish_critical_alert(out.alert.message, threat)
+
+ # ── E-stop ───────────────────────────────────────────────────────────
+ estop_msg = Bool()
+ estop_msg.data = out.e_stop
+ self._estop_pub.publish(estop_msg)
+
+ # ── cmd_vel mux ───────────────────────────────────────────────────────
+ twist = Twist()
+ if out.cmd_override:
+ twist.linear.x = out.cmd_linear
+ twist.angular.z = out.cmd_angular
+ else:
+ twist.linear.x = self._cmd_speed_ms
+ self._cmd_out_pub.publish(twist)
+
+ # ── Status topics ─────────────────────────────────────────────────────
+ if self._event_pub is not None:
+ self._publish_event(out, threat)
+ if self._recovery_pub is not None:
+ self._publish_recovery(out)
+
+ # ── Publishers ────────────────────────────────────────────────────────────
+
+ def _publish_critical_alert(self, message: str, threat: ThreatEvent) -> None:
+ msg = String()
+ msg.data = json.dumps({
+ "severity": "CRITICAL",
+ "threat": threat.threat_type.value,
+ "value": round(threat.value, 3),
+ "detail": threat.detail,
+ "message": message,
+ })
+ self._critical_pub.publish(msg)
+
+ def _publish_event(self, out, threat: ThreatEvent) -> None:
+ msg = EmergencyEvent()
+ msg.stamp = self.get_clock().now().to_msg()
+ msg.state = out.state.value
+ msg.threat_type = threat.threat_type.value
+ msg.severity = threat.level.name
+ msg.threat_value = float(threat.value)
+ msg.detail = threat.detail
+ msg.cmd_override = out.cmd_override
+ self._event_pub.publish(msg)
+
+ def _publish_recovery(self, out) -> None:
+ msg = RecoveryAction()
+ msg.stamp = self.get_clock().now().to_msg()
+ msg.action = self._fsm._recovery.state.value
+ msg.retry_count = out.recovery_retry_count
+ msg.progress = float(out.recovery_progress)
+ self._recovery_pub.publish(msg)
+
+
+# ── Entry point ───────────────────────────────────────────────────────────────
+
+def main(args=None):
+ rclpy.init(args=args)
+ node = EmergencyNode()
+ try:
+ rclpy.spin(node)
+ except KeyboardInterrupt:
+ pass
+ finally:
+ node.destroy_node()
+ rclpy.try_shutdown()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/jetson/ros2_ws/src/saltybot_emergency/saltybot_emergency/recovery_sequencer.py b/jetson/ros2_ws/src/saltybot_emergency/saltybot_emergency/recovery_sequencer.py
new file mode 100644
index 0000000..94664f0
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_emergency/saltybot_emergency/recovery_sequencer.py
@@ -0,0 +1,193 @@
+"""
+recovery_sequencer.py — Reverse + turn recovery FSM for emergency behavior (Issue #169).
+
+Sequence
+────────
+ IDLE → REVERSING → TURNING → RETRYING → (IDLE on success)
+ → (REVERSING on re-threat, retry loop)
+ → (GAVE_UP after max_retries)
+
+ REVERSING : command reverse at reverse_speed_ms until reverse_distance_m covered.
+ TURNING : command angular_speed_rads until turn_angle_rad covered (90°).
+ RETRYING : zero velocity; wait up to retry_timeout_s for threat to clear.
+ If threat clears within clear_hold_s → back to IDLE (success).
+ If timeout without clearance → start another REVERSING cycle.
+ If retry_count >= max_retries → GAVE_UP.
+
+Pure module — no ROS2 dependency.
+"""
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+from enum import Enum
+
+
+# ── States ────────────────────────────────────────────────────────────────────
+
+class RecoveryState(Enum):
+ IDLE = "IDLE"
+ REVERSING = "REVERSING"
+ TURNING = "TURNING"
+ RETRYING = "RETRYING"
+ GAVE_UP = "GAVE_UP"
+
+
+# ── I/O ───────────────────────────────────────────────────────────────────────
+
+@dataclass
+class RecoveryInputs:
+ trigger: bool = False # True to start (or restart) recovery
+ threat_cleared: bool = False # True when all threats are CLEAR
+ dt: float = 0.02 # time step (s)
+
+
+@dataclass
+class RecoveryOutputs:
+ state: RecoveryState = RecoveryState.IDLE
+ cmd_linear: float = 0.0 # m/s
+ cmd_angular: float = 0.0 # rad/s
+ progress: float = 0.0 # [0, 1] completion of current phase
+ retry_count: int = 0
+ gave_up: bool = False
+ state_changed: bool = False
+
+
+# ── RecoverySequencer ────────────────────────────────────────────────────────
+
+class RecoverySequencer:
+ """
+ Tick-based FSM for executing reverse + turn recovery sequences.
+
+ Parameters
+ ----------
+ reverse_speed_ms : backward speed during REVERSING (m/s; stored as negative)
+ reverse_distance_m: total reverse travel before turning (m)
+ angular_speed_rads: yaw rate during TURNING (rad/s; positive = left)
+ turn_angle_rad : total turn angle before RETRYING (rad; default π/2)
+ retry_timeout_s : max RETRYING time per attempt before next reverse cycle
+ clear_hold_s : consecutive clear time needed to declare success
+ max_retries : maximum reverse+turn attempts before GAVE_UP
+ """
+
+ def __init__(
+ self,
+ reverse_speed_ms: float = -0.15,
+ reverse_distance_m: float = 0.30,
+ angular_speed_rads: float = 0.60,
+ turn_angle_rad: float = 1.5708, # π/2
+ retry_timeout_s: float = 3.0,
+ clear_hold_s: float = 0.5,
+ max_retries: int = 3,
+ ):
+ self._rev_speed = min(0.0, float(reverse_speed_ms)) # ensure negative
+ self._rev_dist = max(0.01, float(reverse_distance_m))
+ self._ang_speed = abs(float(angular_speed_rads))
+ self._turn_angle = max(0.01, float(turn_angle_rad))
+ self._retry_tout = max(0.1, float(retry_timeout_s))
+ self._clear_hold = max(0.0, float(clear_hold_s))
+ self._max_retry = max(1, int(max_retries))
+
+ self._state = RecoveryState.IDLE
+ self._rev_done = 0.0 # distance reversed so far
+ self._turn_done = 0.0 # angle turned so far
+ self._retry_count = 0
+ self._retry_timer = 0.0 # time spent in RETRYING
+ self._clear_timer = 0.0 # consecutive clear time in RETRYING
+
+ # ── Public API ────────────────────────────────────────────────────────────
+
+ @property
+ def state(self) -> RecoveryState:
+ return self._state
+
+ @property
+ def retry_count(self) -> int:
+ return self._retry_count
+
+ def reset(self) -> None:
+ """Return to IDLE and clear all counters."""
+ self._state = RecoveryState.IDLE
+ self._rev_done = 0.0
+ self._turn_done = 0.0
+ self._retry_count = 0
+ self._retry_timer = 0.0
+ self._clear_timer = 0.0
+
+ def tick(self, inputs: RecoveryInputs) -> RecoveryOutputs:
+ prev = self._state
+ out = self._step(inputs)
+ out.state = self._state
+ out.retry_count = self._retry_count
+ out.state_changed = (self._state != prev)
+ if out.state_changed:
+ self._on_enter(self._state)
+ return out
+
+ # ── Internal step ─────────────────────────────────────────────────────────
+
+ def _step(self, inp: RecoveryInputs) -> RecoveryOutputs:
+ out = RecoveryOutputs(state=self._state)
+ dt = max(0.0, inp.dt)
+
+ # ── IDLE ──────────────────────────────────────────────────────────────
+ if self._state == RecoveryState.IDLE:
+ if inp.trigger:
+ self._state = RecoveryState.REVERSING
+
+ # ── REVERSING ─────────────────────────────────────────────────────────
+ elif self._state == RecoveryState.REVERSING:
+ step = abs(self._rev_speed) * dt
+ self._rev_done += step
+ out.cmd_linear = self._rev_speed
+ out.progress = min(1.0, self._rev_done / self._rev_dist)
+ if self._rev_done >= self._rev_dist:
+ self._state = RecoveryState.TURNING
+
+ # ── TURNING ───────────────────────────────────────────────────────────
+ elif self._state == RecoveryState.TURNING:
+ step = self._ang_speed * dt
+ self._turn_done += step
+ out.cmd_angular = self._ang_speed
+ out.progress = min(1.0, self._turn_done / self._turn_angle)
+ if self._turn_done >= self._turn_angle:
+ self._retry_count += 1
+ self._state = RecoveryState.RETRYING
+
+ # ── RETRYING ──────────────────────────────────────────────────────────
+ elif self._state == RecoveryState.RETRYING:
+ self._retry_timer += dt
+
+ if inp.threat_cleared:
+ self._clear_timer += dt
+ if self._clear_timer >= self._clear_hold:
+ # Success — threat has cleared
+ self._state = RecoveryState.IDLE
+ return out
+ else:
+ self._clear_timer = 0.0
+
+ if self._retry_timer >= self._retry_tout:
+ if self._retry_count >= self._max_retry:
+ self._state = RecoveryState.GAVE_UP
+ out.gave_up = True
+ else:
+ self._state = RecoveryState.REVERSING
+
+ # ── GAVE_UP ───────────────────────────────────────────────────────────
+ elif self._state == RecoveryState.GAVE_UP:
+ # Stay in GAVE_UP until external reset()
+ out.gave_up = True
+
+ return out
+
+ # ── Entry actions ─────────────────────────────────────────────────────────
+
+ def _on_enter(self, state: RecoveryState) -> None:
+ if state == RecoveryState.REVERSING:
+ self._rev_done = 0.0
+ elif state == RecoveryState.TURNING:
+ self._turn_done = 0.0
+ elif state == RecoveryState.RETRYING:
+ self._retry_timer = 0.0
+ self._clear_timer = 0.0
diff --git a/jetson/ros2_ws/src/saltybot_emergency/saltybot_emergency/threat_detector.py b/jetson/ros2_ws/src/saltybot_emergency/saltybot_emergency/threat_detector.py
new file mode 100644
index 0000000..f0a68d1
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_emergency/saltybot_emergency/threat_detector.py
@@ -0,0 +1,354 @@
+"""
+threat_detector.py — Multi-source threat detection for emergency behavior (Issue #169).
+
+Detectors
+─────────
+ ObstacleDetector : Forward-sector minimum range < stop thresholds at speed.
+ Inputs: min_range_m (pre-filtered from LaserScan forward
+ sector), cmd_speed_ms.
+
+ FallDetector : Extreme pitch/roll from IMU, or depth floor-drop ahead.
+ Inputs: pitch_rad, roll_rad, floor_drop_m (depth-derived;
+ 0.0 if depth unavailable).
+
+ StuckDetector : Commanded speed vs actual speed mismatch sustained for
+ stuck_timeout_s. Tracks elapsed time with dt argument.
+
+ BumpDetector : IMU acceleration jerk (|Δ|a||/dt) above threshold.
+ MAJOR at jerk_threshold_ms3, CRITICAL at
+ critical_jerk_threshold_ms3.
+
+ThreatLevel
+───────────
+ CLEAR : no threat; normal operation
+ MINOR : advisory; log/beep only
+ MAJOR : stop and execute recovery
+ CRITICAL : full shutdown + MQTT escalation
+
+Pure module — no ROS2 dependency.
+"""
+
+from __future__ import annotations
+
+import math
+import time
+from dataclasses import dataclass
+from enum import Enum
+from typing import Optional
+
+
+# ── Enumerations ──────────────────────────────────────────────────────────────
+
+class ThreatLevel(Enum):
+ CLEAR = 0
+ MINOR = 1
+ MAJOR = 2
+ CRITICAL = 3
+
+
+class ThreatType(Enum):
+ NONE = "NONE"
+ OBSTACLE_PROXIMITY = "OBSTACLE_PROXIMITY"
+ FALL_RISK = "FALL_RISK"
+ WHEEL_STUCK = "WHEEL_STUCK"
+ BUMP = "BUMP"
+
+
+# ── ThreatEvent ───────────────────────────────────────────────────────────────
+
+@dataclass
+class ThreatEvent:
+ """Snapshot of a single detected threat."""
+ threat_type: ThreatType = ThreatType.NONE
+ level: ThreatLevel = ThreatLevel.CLEAR
+ value: float = 0.0 # triggering metric
+ detail: str = ""
+ timestamp_s: float = 0.0
+
+ @staticmethod
+ def clear(timestamp_s: float = 0.0) -> "ThreatEvent":
+ return ThreatEvent(timestamp_s=timestamp_s)
+
+
+_CLEAR = ThreatEvent()
+
+
+# ── ObstacleDetector ─────────────────────────────────────────────────────────
+
+class ObstacleDetector:
+ """
+ Obstacle proximity threat from forward-sector minimum range.
+
+ Parameters
+ ----------
+ stop_distance_m : range below which MAJOR is raised (default 0.30 m)
+ critical_distance_m : range below which CRITICAL is raised (default 0.10 m)
+ min_speed_ms : only active above this commanded speed (default 0.05 m/s)
+ """
+
+ def __init__(
+ self,
+ stop_distance_m: float = 0.30,
+ critical_distance_m: float = 0.10,
+ min_speed_ms: float = 0.05,
+ ):
+ self._stop = max(1e-3, stop_distance_m)
+ self._critical = max(1e-3, min(self._stop, critical_distance_m))
+ self._min_spd = abs(min_speed_ms)
+
+ def update(
+ self,
+ min_range_m: float,
+ cmd_speed_ms: float,
+ timestamp_s: float = 0.0,
+ ) -> ThreatEvent:
+ """
+ Parameters
+ ----------
+ min_range_m : minimum obstacle range in forward sector (m)
+ cmd_speed_ms : signed commanded forward speed (m/s)
+ """
+ if abs(cmd_speed_ms) < self._min_spd:
+ return ThreatEvent.clear(timestamp_s)
+
+ if min_range_m <= self._critical:
+ return ThreatEvent(
+ threat_type=ThreatType.OBSTACLE_PROXIMITY,
+ level=ThreatLevel.CRITICAL,
+ value=min_range_m,
+ detail=f"Obstacle {min_range_m:.2f} m (critical zone)",
+ timestamp_s=timestamp_s,
+ )
+ if min_range_m <= self._stop:
+ return ThreatEvent(
+ threat_type=ThreatType.OBSTACLE_PROXIMITY,
+ level=ThreatLevel.MAJOR,
+ value=min_range_m,
+ detail=f"Obstacle {min_range_m:.2f} m ahead",
+ timestamp_s=timestamp_s,
+ )
+ return ThreatEvent.clear(timestamp_s)
+
+
+# ── FallDetector ──────────────────────────────────────────────────────────────
+
+class FallDetector:
+ """
+ Fall / tipping risk from IMU pitch/roll and optional depth floor-drop.
+
+ Parameters
+ ----------
+ minor_tilt_rad : |pitch| or |roll| above which MINOR fires (default 0.20 rad)
+ major_tilt_rad : above which MAJOR fires (default 0.35 rad)
+ critical_tilt_rad : above which CRITICAL fires (default 0.52 rad ≈ 30°)
+ floor_drop_m : depth discontinuity (m) triggering MAJOR (default 0.15 m)
+ """
+
+ def __init__(
+ self,
+ minor_tilt_rad: float = 0.20,
+ major_tilt_rad: float = 0.35,
+ critical_tilt_rad: float = 0.52,
+ floor_drop_m: float = 0.15,
+ ):
+ self._minor = float(minor_tilt_rad)
+ self._major = float(major_tilt_rad)
+ self._critical = float(critical_tilt_rad)
+ self._drop = float(floor_drop_m)
+
+ def update(
+ self,
+ pitch_rad: float,
+ roll_rad: float,
+ floor_drop_m: float = 0.0,
+ timestamp_s: float = 0.0,
+ ) -> ThreatEvent:
+ """
+ Parameters
+ ----------
+ pitch_rad : forward tilt (rad); +ve = nose up
+ roll_rad : lateral tilt (rad); +ve = left side up
+ floor_drop_m : depth discontinuity ahead of robot (m); 0 = not measured
+ """
+ tilt = max(abs(pitch_rad), abs(roll_rad))
+
+ if tilt >= self._critical:
+ return ThreatEvent(
+ threat_type=ThreatType.FALL_RISK,
+ level=ThreatLevel.CRITICAL,
+ value=tilt,
+ detail=f"Critical tilt {math.degrees(tilt):.1f}°",
+ timestamp_s=timestamp_s,
+ )
+ if tilt >= self._major or floor_drop_m >= self._drop:
+ value = max(tilt, floor_drop_m)
+ detail = (
+ f"Floor drop {floor_drop_m:.2f} m" if floor_drop_m >= self._drop
+ else f"Major tilt {math.degrees(tilt):.1f}°"
+ )
+ return ThreatEvent(
+ threat_type=ThreatType.FALL_RISK,
+ level=ThreatLevel.MAJOR,
+ value=value,
+ detail=detail,
+ timestamp_s=timestamp_s,
+ )
+ if tilt >= self._minor:
+ return ThreatEvent(
+ threat_type=ThreatType.FALL_RISK,
+ level=ThreatLevel.MINOR,
+ value=tilt,
+ detail=f"Tilt advisory {math.degrees(tilt):.1f}°",
+ timestamp_s=timestamp_s,
+ )
+ return ThreatEvent.clear(timestamp_s)
+
+
+# ── StuckDetector ─────────────────────────────────────────────────────────────
+
+class StuckDetector:
+ """
+ Wheel stall / stuck detection from cmd_vel vs odometry mismatch.
+
+ Accumulates stuck time while |cmd| > min_cmd_ms AND |actual| < moving_ms.
+ Resets when motion resumes or commanded speed drops below min_cmd_ms.
+
+ Parameters
+ ----------
+ stuck_timeout_s : accumulated stuck time before MAJOR fires (default 3.0 s)
+ min_cmd_ms : minimum commanded speed to consider stalling (0.05 m/s)
+ moving_threshold_ms : actual speed above which robot is considered moving
+ """
+
+ def __init__(
+ self,
+ stuck_timeout_s: float = 3.0,
+ min_cmd_ms: float = 0.05,
+ moving_threshold_ms: float = 0.05,
+ ):
+ self._timeout = max(0.1, stuck_timeout_s)
+ self._min_cmd = abs(min_cmd_ms)
+ self._moving = abs(moving_threshold_ms)
+ self._stuck_time: float = 0.0
+
+ @property
+ def stuck_time(self) -> float:
+ """Accumulated stuck duration (s)."""
+ return self._stuck_time
+
+ def update(
+ self,
+ cmd_speed_ms: float,
+ actual_speed_ms: float,
+ dt: float,
+ timestamp_s: float = 0.0,
+ ) -> ThreatEvent:
+ """
+ Parameters
+ ----------
+ cmd_speed_ms : commanded forward speed (m/s)
+ actual_speed_ms : measured forward speed from odometry (m/s)
+ dt : elapsed time since last call (s)
+ """
+ commanding = abs(cmd_speed_ms) >= self._min_cmd
+ moving = abs(actual_speed_ms) >= self._moving
+
+ if not commanding or moving:
+ self._stuck_time = 0.0
+ return ThreatEvent.clear(timestamp_s)
+
+ self._stuck_time += max(0.0, dt)
+
+ if self._stuck_time >= self._timeout:
+ return ThreatEvent(
+ threat_type=ThreatType.WHEEL_STUCK,
+ level=ThreatLevel.MAJOR,
+ value=self._stuck_time,
+ detail=f"Wheels stuck for {self._stuck_time:.1f} s",
+ timestamp_s=timestamp_s,
+ )
+ return ThreatEvent.clear(timestamp_s)
+
+ def reset(self) -> None:
+ self._stuck_time = 0.0
+
+
+# ── BumpDetector ─────────────────────────────────────────────────────────────
+
+class BumpDetector:
+ """
+ Collision / bump detection via IMU acceleration jerk.
+
+ Jerk = |Δ|a|| / dt where |a| = sqrt(ax²+ay²+az²) − g (gravity removed)
+
+ Parameters
+ ----------
+ jerk_threshold_ms3 : MAJOR at jerk above this (m/s³, default 8.0)
+ critical_jerk_threshold_ms3 : CRITICAL at jerk above this (m/s³, default 25.0)
+ gravity_ms2 : gravity magnitude to subtract (default 9.81)
+ """
+
+ def __init__(
+ self,
+ jerk_threshold_ms3: float = 8.0,
+ critical_jerk_threshold_ms3: float = 25.0,
+ gravity_ms2: float = 9.81,
+ ):
+ self._jerk_major = float(jerk_threshold_ms3)
+ self._jerk_critical = float(critical_jerk_threshold_ms3)
+ self._gravity = float(gravity_ms2)
+ self._prev_dyn_mag: Optional[float] = None # previous |a_dynamic|
+
+ def update(
+ self,
+ ax: float,
+ ay: float,
+ az: float,
+ dt: float,
+ timestamp_s: float = 0.0,
+ ) -> ThreatEvent:
+ """
+ Parameters
+ ----------
+ ax, ay, az : IMU linear acceleration (m/s²)
+ dt : elapsed time since last call (s)
+ """
+ raw_mag = math.sqrt(ax * ax + ay * ay + az * az)
+ dyn_mag = abs(raw_mag - self._gravity) # remove gravity component
+
+ if self._prev_dyn_mag is None or dt <= 0.0:
+ self._prev_dyn_mag = dyn_mag
+ return ThreatEvent.clear(timestamp_s)
+
+ jerk = abs(dyn_mag - self._prev_dyn_mag) / dt
+ self._prev_dyn_mag = dyn_mag
+
+ if jerk >= self._jerk_critical:
+ return ThreatEvent(
+ threat_type=ThreatType.BUMP,
+ level=ThreatLevel.CRITICAL,
+ value=jerk,
+ detail=f"Critical impact: jerk {jerk:.1f} m/s³",
+ timestamp_s=timestamp_s,
+ )
+ if jerk >= self._jerk_major:
+ return ThreatEvent(
+ threat_type=ThreatType.BUMP,
+ level=ThreatLevel.MAJOR,
+ value=jerk,
+ detail=f"Bump detected: jerk {jerk:.1f} m/s³",
+ timestamp_s=timestamp_s,
+ )
+ return ThreatEvent.clear(timestamp_s)
+
+ def reset(self) -> None:
+ self._prev_dyn_mag = None
+
+
+# ── Utility: pick highest-severity threat ─────────────────────────────────────
+
+def highest_threat(events: list[ThreatEvent]) -> ThreatEvent:
+ """Return the ThreatEvent with the highest ThreatLevel from a list."""
+ if not events:
+ return _CLEAR
+ return max(events, key=lambda e: e.level.value)
diff --git a/jetson/ros2_ws/src/saltybot_emergency/setup.cfg b/jetson/ros2_ws/src/saltybot_emergency/setup.cfg
new file mode 100644
index 0000000..5d1b488
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_emergency/setup.cfg
@@ -0,0 +1,5 @@
+[develop]
+script_dir=$base/lib/saltybot_emergency
+
+[install]
+install_scripts=$base/lib/saltybot_emergency
diff --git a/jetson/ros2_ws/src/saltybot_emergency/setup.py b/jetson/ros2_ws/src/saltybot_emergency/setup.py
new file mode 100644
index 0000000..3427e55
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_emergency/setup.py
@@ -0,0 +1,32 @@
+from setuptools import setup, find_packages
+import os
+from glob import glob
+
+package_name = "saltybot_emergency"
+
+setup(
+ name=package_name,
+ version="0.1.0",
+ packages=find_packages(exclude=["test"]),
+ data_files=[
+ ("share/ament_index/resource_index/packages",
+ [f"resource/{package_name}"]),
+ (f"share/{package_name}", ["package.xml"]),
+ (os.path.join("share", package_name, "config"),
+ glob("config/*.yaml")),
+ (os.path.join("share", package_name, "launch"),
+ glob("launch/*.py")),
+ ],
+ install_requires=["setuptools"],
+ zip_safe=True,
+ maintainer="sl-controls",
+ maintainer_email="sl-controls@saltylab.local",
+ description="Emergency behavior system — collision avoidance, fall prevention, stuck detection, recovery",
+ license="MIT",
+ tests_require=["pytest"],
+ entry_points={
+ "console_scripts": [
+ f"emergency_node = {package_name}.emergency_node:main",
+ ],
+ },
+)
diff --git a/jetson/ros2_ws/src/saltybot_emergency/test/test_emergency.py b/jetson/ros2_ws/src/saltybot_emergency/test/test_emergency.py
new file mode 100644
index 0000000..ad76f66
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_emergency/test/test_emergency.py
@@ -0,0 +1,560 @@
+"""
+test_emergency.py — Unit tests for Issue #169 emergency behavior modules.
+
+Covers:
+ ObstacleDetector — proximity thresholds, speed gate
+ FallDetector — tilt levels, floor drop
+ StuckDetector — timeout accumulation, reset on motion
+ BumpDetector — jerk thresholds, first-call safety
+ AlertManager — severity mapping, escalation, suppression
+ RecoverySequencer — full sequence, retry, give-up
+ EmergencyFSM — all state transitions and guard conditions
+"""
+
+import math
+import sys
+import os
+
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
+
+import pytest
+
+from saltybot_emergency.threat_detector import (
+ BumpDetector,
+ FallDetector,
+ ObstacleDetector,
+ StuckDetector,
+ ThreatEvent,
+ ThreatLevel,
+ ThreatType,
+ highest_threat,
+)
+from saltybot_emergency.alert_manager import Alert, AlertLevel, AlertManager
+from saltybot_emergency.recovery_sequencer import (
+ RecoveryInputs,
+ RecoverySequencer,
+ RecoveryState,
+)
+from saltybot_emergency.emergency_fsm import (
+ EmergencyFSM,
+ EmergencyInputs,
+ EmergencyState,
+)
+
+
+# ── Helpers ───────────────────────────────────────────────────────────────────
+
+def _obs(**kw):
+ d = dict(stop_distance_m=0.30, critical_distance_m=0.10, min_speed_ms=0.05)
+ d.update(kw)
+ return ObstacleDetector(**d)
+
+def _fall(**kw):
+ d = dict(minor_tilt_rad=0.20, major_tilt_rad=0.35,
+ critical_tilt_rad=0.52, floor_drop_m=0.15)
+ d.update(kw)
+ return FallDetector(**d)
+
+def _stuck(**kw):
+ d = dict(stuck_timeout_s=3.0, min_cmd_ms=0.05, moving_threshold_ms=0.05)
+ d.update(kw)
+ return StuckDetector(**d)
+
+def _bump(**kw):
+ d = dict(jerk_threshold_ms3=8.0, critical_jerk_threshold_ms3=25.0)
+ d.update(kw)
+ return BumpDetector(**d)
+
+def _alert_mgr(**kw):
+ d = dict(major_count_threshold=3, escalation_window_s=10.0, suppression_s=1.0)
+ d.update(kw)
+ return AlertManager(**d)
+
+def _seq(**kw):
+ d = dict(
+ reverse_speed_ms=-0.15, reverse_distance_m=0.30,
+ angular_speed_rads=0.60, turn_angle_rad=1.5708,
+ retry_timeout_s=3.0, clear_hold_s=0.5, max_retries=3,
+ )
+ d.update(kw)
+ return RecoverySequencer(**d)
+
+def _fsm(**kw):
+ d = dict(
+ stopped_ms=0.03, major_count_threshold=3, escalation_window_s=10.0,
+ suppression_s=0.0, # disable suppression for cleaner tests
+ reverse_speed_ms=-0.15, reverse_distance_m=0.30,
+ angular_speed_rads=0.60, turn_angle_rad=1.5708,
+ retry_timeout_s=3.0, clear_hold_s=0.5, max_retries=3,
+ )
+ d.update(kw)
+ return EmergencyFSM(**d)
+
+def _major_threat(threat_type=ThreatType.OBSTACLE_PROXIMITY, ts=0.0):
+ return ThreatEvent(threat_type=threat_type, level=ThreatLevel.MAJOR,
+ value=1.0, detail="test", timestamp_s=ts)
+
+def _critical_threat(ts=0.0):
+ return ThreatEvent(threat_type=ThreatType.OBSTACLE_PROXIMITY,
+ level=ThreatLevel.CRITICAL, value=0.05,
+ detail="critical test", timestamp_s=ts)
+
+def _minor_threat(ts=0.0):
+ return ThreatEvent(threat_type=ThreatType.FALL_RISK, level=ThreatLevel.MINOR,
+ value=0.21, detail="tilt", timestamp_s=ts)
+
+def _clear_threat():
+ return ThreatEvent.clear()
+
+def _inp(threat=None, speed=0.0, ack=False):
+ return EmergencyInputs(
+ threat=threat or _clear_threat(),
+ robot_speed_ms=speed,
+ acknowledge=ack,
+ )
+
+
+# ══════════════════════════════════════════════════════════════════════════════
+# ObstacleDetector
+# ══════════════════════════════════════════════════════════════════════════════
+
+class TestObstacleDetector:
+
+ def test_clear_when_far(self):
+ ev = _obs().update(0.5, 0.3)
+ assert ev.level == ThreatLevel.CLEAR
+
+ def test_major_within_stop_distance(self):
+ ev = _obs(stop_distance_m=0.30).update(0.25, 0.3)
+ assert ev.level == ThreatLevel.MAJOR
+ assert ev.threat_type == ThreatType.OBSTACLE_PROXIMITY
+
+ def test_critical_within_critical_distance(self):
+ ev = _obs(critical_distance_m=0.10).update(0.05, 0.3)
+ assert ev.level == ThreatLevel.CRITICAL
+
+ def test_clear_when_stopped(self):
+ """Obstacle detection suppressed when not moving."""
+ ev = _obs(min_speed_ms=0.05).update(0.05, 0.01)
+ assert ev.level == ThreatLevel.CLEAR
+
+ def test_active_at_min_speed(self):
+ ev = _obs(min_speed_ms=0.05).update(0.20, 0.06)
+ assert ev.level == ThreatLevel.MAJOR
+
+ def test_value_is_distance(self):
+ ev = _obs().update(0.20, 0.3)
+ assert ev.value == pytest.approx(0.20, abs=1e-9)
+
+
+# ══════════════════════════════════════════════════════════════════════════════
+# FallDetector
+# ══════════════════════════════════════════════════════════════════════════════
+
+class TestFallDetector:
+
+ def test_clear_on_flat(self):
+ ev = _fall().update(0.0, 0.0)
+ assert ev.level == ThreatLevel.CLEAR
+
+ def test_minor_moderate_tilt(self):
+ ev = _fall(minor_tilt_rad=0.20, major_tilt_rad=0.35).update(0.25, 0.0)
+ assert ev.level == ThreatLevel.MINOR
+
+ def test_major_high_tilt(self):
+ ev = _fall(major_tilt_rad=0.35, critical_tilt_rad=0.52).update(0.40, 0.0)
+ assert ev.level == ThreatLevel.MAJOR
+
+ def test_critical_extreme_tilt(self):
+ ev = _fall(critical_tilt_rad=0.52).update(0.60, 0.0)
+ assert ev.level == ThreatLevel.CRITICAL
+
+ def test_major_on_floor_drop(self):
+ ev = _fall(floor_drop_m=0.15).update(0.0, 0.0, floor_drop_m=0.20)
+ assert ev.level == ThreatLevel.MAJOR
+ assert "drop" in ev.detail.lower()
+
+ def test_roll_triggers_same_as_pitch(self):
+ """Roll beyond minor threshold also fires."""
+ ev = _fall(minor_tilt_rad=0.20).update(0.0, 0.25)
+ assert ev.level == ThreatLevel.MINOR
+
+
+# ══════════════════════════════════════════════════════════════════════════════
+# StuckDetector
+# ══════════════════════════════════════════════════════════════════════════════
+
+class TestStuckDetector:
+
+ def test_clear_when_not_commanded(self):
+ s = _stuck(stuck_timeout_s=1.0, min_cmd_ms=0.05)
+ ev = s.update(0.01, 0.0, dt=1.0) # cmd below threshold
+ assert ev.level == ThreatLevel.CLEAR
+
+ def test_clear_when_moving(self):
+ s = _stuck(stuck_timeout_s=1.0)
+ ev = s.update(0.2, 0.2, dt=1.0) # actually moving
+ assert ev.level == ThreatLevel.CLEAR
+
+ def test_major_after_timeout(self):
+ s = _stuck(stuck_timeout_s=3.0, min_cmd_ms=0.05, moving_threshold_ms=0.05)
+ for _ in range(6):
+ ev = s.update(0.2, 0.0, dt=0.5) # cmd=0.2, actual=0 → stuck
+ assert ev.level == ThreatLevel.MAJOR
+
+ def test_no_major_before_timeout(self):
+ s = _stuck(stuck_timeout_s=3.0)
+ ev = s.update(0.2, 0.0, dt=1.0) # only 1s — not yet
+ assert ev.level == ThreatLevel.CLEAR
+
+ def test_reset_on_motion_resume(self):
+ s = _stuck(stuck_timeout_s=1.0)
+ s.update(0.2, 0.0, dt=0.8) # accumulate stuck time
+ s.update(0.2, 0.3, dt=0.1) # motion resumes → reset
+ ev = s.update(0.2, 0.0, dt=0.3) # only 0.3s since reset → still clear
+ assert ev.level == ThreatLevel.CLEAR
+
+ def test_stuck_time_property(self):
+ s = _stuck(stuck_timeout_s=5.0)
+ s.update(0.2, 0.0, dt=1.0)
+ s.update(0.2, 0.0, dt=1.0)
+ assert s.stuck_time == pytest.approx(2.0, abs=1e-6)
+
+
+# ══════════════════════════════════════════════════════════════════════════════
+# BumpDetector
+# ══════════════════════════════════════════════════════════════════════════════
+
+class TestBumpDetector:
+
+ def test_clear_on_first_call(self):
+ """No jerk on first sample (no previous value)."""
+ ev = _bump().update(0.0, 0.0, 9.81, dt=0.05)
+ assert ev.level == ThreatLevel.CLEAR
+
+ def test_major_on_jerk(self):
+ b = _bump(jerk_threshold_ms3=5.0, critical_jerk_threshold_ms3=20.0)
+ b.update(0.0, 0.0, 9.81, dt=0.05) # seed → dyn_mag = 0
+ # ax=4.5: raw≈10.79, dyn≈0.98, jerk≈9.8 m/s³ → MAJOR (5.0 < 9.8 < 20.0)
+ ev = b.update(4.5, 0.0, 9.81, dt=0.1)
+ assert ev.level == ThreatLevel.MAJOR
+
+ def test_critical_on_severe_jerk(self):
+ b = _bump(jerk_threshold_ms3=5.0, critical_jerk_threshold_ms3=20.0)
+ b.update(0.0, 0.0, 9.81, dt=0.05)
+ # Very large spike
+ ev = b.update(50.0, 0.0, 9.81, dt=0.1)
+ assert ev.level == ThreatLevel.CRITICAL
+
+ def test_clear_on_gentle_acceleration(self):
+ b = _bump(jerk_threshold_ms3=8.0)
+ b.update(0.0, 0.0, 9.81, dt=0.05)
+ ev = b.update(0.1, 0.0, 9.81, dt=0.05) # tiny change
+ assert ev.level == ThreatLevel.CLEAR
+
+
+# ══════════════════════════════════════════════════════════════════════════════
+# highest_threat
+# ══════════════════════════════════════════════════════════════════════════════
+
+class TestHighestThreat:
+
+ def test_empty_returns_clear(self):
+ assert highest_threat([]).level == ThreatLevel.CLEAR
+
+ def test_picks_highest(self):
+ a = ThreatEvent(level=ThreatLevel.MINOR)
+ b = ThreatEvent(level=ThreatLevel.CRITICAL)
+ c = ThreatEvent(level=ThreatLevel.MAJOR)
+ assert highest_threat([a, b, c]).level == ThreatLevel.CRITICAL
+
+ def test_single_item(self):
+ ev = ThreatEvent(level=ThreatLevel.MAJOR)
+ assert highest_threat([ev]) is ev
+
+
+# ══════════════════════════════════════════════════════════════════════════════
+# AlertManager
+# ══════════════════════════════════════════════════════════════════════════════
+
+class TestAlertManager:
+
+ def test_clear_returns_none(self):
+ am = _alert_mgr()
+ assert am.update(_clear_threat()) is None
+
+ def test_minor_threat_gives_minor_alert(self):
+ am = _alert_mgr(suppression_s=0.0)
+ alert = am.update(_minor_threat(ts=0.0))
+ assert alert is not None
+ assert alert.level == AlertLevel.MINOR
+
+ def test_major_threat_gives_major_alert(self):
+ am = _alert_mgr(suppression_s=0.0)
+ alert = am.update(_major_threat(ts=0.0))
+ assert alert is not None
+ assert alert.level == AlertLevel.MAJOR
+
+ def test_critical_threat_gives_critical_alert(self):
+ am = _alert_mgr(suppression_s=0.0)
+ alert = am.update(_critical_threat(ts=0.0))
+ assert alert is not None
+ assert alert.level == AlertLevel.CRITICAL
+
+ def test_suppression_blocks_duplicate(self):
+ am = _alert_mgr(suppression_s=5.0)
+ am.update(_major_threat(ts=0.0))
+ alert = am.update(_major_threat(ts=1.0)) # within 5s window
+ assert alert is None
+
+ def test_suppression_expires(self):
+ am = _alert_mgr(suppression_s=2.0)
+ am.update(_major_threat(ts=0.0))
+ alert = am.update(_major_threat(ts=3.0)) # after 2s window
+ assert alert is not None
+
+ def test_escalation_major_to_critical(self):
+ """After major_count_threshold major alerts, next one becomes CRITICAL."""
+ am = _alert_mgr(major_count_threshold=3, escalation_window_s=60.0,
+ suppression_s=0.0)
+ for i in range(3):
+ am.update(_major_threat(ts=float(i)))
+ # 4th should be escalated
+ alert = am.update(_major_threat(ts=4.0))
+ assert alert is not None
+ assert alert.level == AlertLevel.CRITICAL
+
+ def test_escalation_resets_after_window(self):
+ """Major alerts outside the window don't contribute to escalation."""
+ am = _alert_mgr(major_count_threshold=3, escalation_window_s=5.0,
+ suppression_s=0.0)
+ am.update(_major_threat(ts=0.0))
+ am.update(_major_threat(ts=1.0))
+ am.update(_major_threat(ts=2.0))
+ # All 3 are old; new one at t=10 is outside window
+ alert = am.update(_major_threat(ts=10.0))
+ assert alert is not None
+ assert alert.level == AlertLevel.MAJOR # not escalated
+
+ def test_reset_clears_escalation_state(self):
+ am = _alert_mgr(major_count_threshold=2, suppression_s=0.0)
+ am.update(_major_threat(ts=0.0))
+ am.update(_major_threat(ts=1.0)) # now at threshold
+ am.reset()
+ alert = am.update(_major_threat(ts=2.0))
+ assert alert.level == AlertLevel.MAJOR # back to major after reset
+
+
+# ══════════════════════════════════════════════════════════════════════════════
+# RecoverySequencer
+# ══════════════════════════════════════════════════════════════════════════════
+
+class TestRecoverySequencer:
+
+ def _trigger(self, seq):
+ return seq.tick(RecoveryInputs(trigger=True, dt=0.02))
+
+ def test_idle_on_init(self):
+ seq = _seq()
+ assert seq.state == RecoveryState.IDLE
+
+ def test_trigger_starts_reversing(self):
+ seq = _seq()
+ out = self._trigger(seq)
+ assert seq.state == RecoveryState.REVERSING
+
+ def test_reversing_backward_velocity(self):
+ seq = _seq(reverse_speed_ms=-0.15)
+ self._trigger(seq)
+ out = seq.tick(RecoveryInputs(dt=0.02))
+ assert out.cmd_linear < 0.0
+
+ def test_reversing_completes_to_turning(self):
+ seq = _seq(reverse_speed_ms=-1.0, reverse_distance_m=0.5)
+ self._trigger(seq)
+ for _ in range(30):
+ out = seq.tick(RecoveryInputs(dt=0.02))
+ assert seq.state == RecoveryState.TURNING
+
+ def test_turning_positive_angular(self):
+ seq = _seq(reverse_speed_ms=-1.0, reverse_distance_m=0.1,
+ angular_speed_rads=1.0)
+ self._trigger(seq)
+ # Skip through reversing quickly
+ for _ in range(20):
+ seq.tick(RecoveryInputs(dt=0.02))
+ if seq.state == RecoveryState.TURNING:
+ out = seq.tick(RecoveryInputs(dt=0.02))
+ assert out.cmd_angular > 0.0
+
+ def test_retrying_increments_count(self):
+ seq = _seq(reverse_speed_ms=-1.0, reverse_distance_m=0.05,
+ angular_speed_rads=10.0, turn_angle_rad=0.1)
+ self._trigger(seq)
+ for _ in range(100):
+ seq.tick(RecoveryInputs(dt=0.02))
+ assert seq.state == RecoveryState.RETRYING
+ assert seq.retry_count == 1
+
+ def test_threat_cleared_returns_idle(self):
+ seq = _seq(reverse_speed_ms=-1.0, reverse_distance_m=0.05,
+ angular_speed_rads=10.0, turn_angle_rad=0.1,
+ clear_hold_s=0.1)
+ self._trigger(seq)
+ # Fast-forward to RETRYING
+ for _ in range(100):
+ seq.tick(RecoveryInputs(dt=0.02))
+ assert seq.state == RecoveryState.RETRYING
+ # Feed cleared ticks until clear_hold met
+ for _ in range(20):
+ seq.tick(RecoveryInputs(threat_cleared=True, dt=0.02))
+ assert seq.state == RecoveryState.IDLE
+
+ def test_max_retries_gives_up(self):
+ seq = _seq(reverse_speed_ms=-1.0, reverse_distance_m=0.05,
+ angular_speed_rads=10.0, turn_angle_rad=0.1,
+ retry_timeout_s=0.1, max_retries=2)
+ self._trigger(seq)
+ for _ in range(500):
+ out = seq.tick(RecoveryInputs(threat_cleared=False, dt=0.05))
+ if seq.state == RecoveryState.GAVE_UP:
+ break
+ assert seq.state == RecoveryState.GAVE_UP
+
+ def test_reset_returns_to_idle(self):
+ seq = _seq()
+ self._trigger(seq)
+ seq.reset()
+ assert seq.state == RecoveryState.IDLE
+ assert seq.retry_count == 0
+
+
+# ══════════════════════════════════════════════════════════════════════════════
+# EmergencyFSM
+# ══════════════════════════════════════════════════════════════════════════════
+
+class TestEmergencyFSMBasic:
+
+ def test_initial_state_nominal(self):
+ fsm = _fsm()
+ assert fsm.state == EmergencyState.NOMINAL
+
+ def test_nominal_stays_on_clear(self):
+ fsm = _fsm()
+ out = fsm.tick(_inp())
+ assert fsm.state == EmergencyState.NOMINAL
+ assert out.cmd_override is False
+
+ def test_minor_alert_no_override(self):
+ fsm = _fsm()
+ out = fsm.tick(_inp(_minor_threat(ts=0.0)))
+ assert fsm.state == EmergencyState.NOMINAL
+ assert out.cmd_override is False
+ assert out.alert is not None
+ assert out.alert.level == AlertLevel.MINOR
+
+ def test_major_threat_enters_stopping(self):
+ fsm = _fsm()
+ out = fsm.tick(_inp(_major_threat()))
+ assert fsm.state == EmergencyState.STOPPING
+ assert out.cmd_override is True
+
+ def test_critical_threat_enters_stopping_critical_pending(self):
+ fsm = _fsm()
+ fsm.tick(_inp(_critical_threat()))
+ assert fsm.state == EmergencyState.STOPPING
+ assert fsm._critical_pending is True
+
+
+class TestEmergencyFSMStopping:
+
+ def test_stopping_commands_zero(self):
+ fsm = _fsm()
+ fsm.tick(_inp(_major_threat()))
+ out = fsm.tick(_inp(_major_threat(), speed=0.5))
+ assert out.cmd_linear == pytest.approx(0.0, abs=1e-9)
+ assert out.cmd_angular == pytest.approx(0.0, abs=1e-9)
+
+ def test_stopped_enters_recovering(self):
+ fsm = _fsm(stopped_ms=0.03)
+ fsm.tick(_inp(_major_threat()))
+ out = fsm.tick(_inp(_major_threat(), speed=0.01)) # below stopped_ms
+ assert fsm.state == EmergencyState.RECOVERING
+
+ def test_critical_pending_enters_escalated(self):
+ fsm = _fsm(stopped_ms=0.03)
+ fsm.tick(_inp(_critical_threat()))
+ fsm.tick(_inp(_critical_threat(), speed=0.01)) # stopped → ESCALATED
+ assert fsm.state == EmergencyState.ESCALATED
+
+
+class TestEmergencyFSMRecovering:
+
+ def _reach_recovering(self, fsm):
+ fsm.tick(_inp(_major_threat()))
+ fsm.tick(_inp(_major_threat(), speed=0.0)) # stopped → RECOVERING
+ assert fsm.state == EmergencyState.RECOVERING
+
+ def test_recovering_has_cmd_override(self):
+ fsm = _fsm()
+ self._reach_recovering(fsm)
+ out = fsm.tick(_inp(_clear_threat()))
+ assert out.cmd_override is True
+
+ def test_recovering_gave_up_escalates(self):
+ fsm = _fsm(max_retries=1, retry_timeout_s=0.05)
+ self._reach_recovering(fsm)
+ # Drive recovery to GAVE_UP by feeding many non-clearing ticks
+ for _ in range(500):
+ out = fsm.tick(_inp(_major_threat()))
+ if fsm.state == EmergencyState.ESCALATED:
+ break
+ assert fsm.state == EmergencyState.ESCALATED
+
+
+class TestEmergencyFSMEscalated:
+
+ def _reach_escalated(self, fsm):
+ fsm.tick(_inp(_critical_threat()))
+ fsm.tick(_inp(_critical_threat(), speed=0.0))
+ assert fsm.state == EmergencyState.ESCALATED
+
+ def test_escalated_emits_critical_alert_once(self):
+ fsm = _fsm()
+ self._reach_escalated(fsm)
+ out1 = fsm.tick(_inp())
+ out2 = fsm.tick(_inp())
+ assert out1.alert is not None
+ assert out1.alert.level == AlertLevel.CRITICAL
+ assert out2.alert is None # suppressed after first emission
+
+ def test_escalated_e_stop_asserted(self):
+ fsm = _fsm()
+ self._reach_escalated(fsm)
+ out = fsm.tick(_inp())
+ assert out.e_stop is True
+
+ def test_escalated_stays_without_ack(self):
+ fsm = _fsm()
+ self._reach_escalated(fsm)
+ for _ in range(5):
+ fsm.tick(_inp())
+ assert fsm.state == EmergencyState.ESCALATED
+
+ def test_acknowledge_returns_to_nominal(self):
+ fsm = _fsm()
+ self._reach_escalated(fsm)
+ fsm.tick(_inp(ack=True))
+ assert fsm.state == EmergencyState.NOMINAL
+
+ def test_reset_returns_to_nominal(self):
+ fsm = _fsm()
+ self._reach_escalated(fsm)
+ fsm.reset()
+ assert fsm.state == EmergencyState.NOMINAL
+
+ def test_e_stop_cleared_on_ack(self):
+ fsm = _fsm()
+ self._reach_escalated(fsm)
+ out = fsm.tick(_inp(ack=True))
+ assert out.e_stop is False
diff --git a/jetson/ros2_ws/src/saltybot_emergency_msgs/CMakeLists.txt b/jetson/ros2_ws/src/saltybot_emergency_msgs/CMakeLists.txt
new file mode 100644
index 0000000..49fc619
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_emergency_msgs/CMakeLists.txt
@@ -0,0 +1,15 @@
+cmake_minimum_required(VERSION 3.8)
+project(saltybot_emergency_msgs)
+
+find_package(ament_cmake REQUIRED)
+find_package(rosidl_default_generators REQUIRED)
+find_package(builtin_interfaces REQUIRED)
+
+rosidl_generate_interfaces(${PROJECT_NAME}
+ "msg/EmergencyEvent.msg"
+ "msg/RecoveryAction.msg"
+ DEPENDENCIES builtin_interfaces
+)
+
+ament_export_dependencies(rosidl_default_runtime)
+ament_package()
diff --git a/jetson/ros2_ws/src/saltybot_emergency_msgs/msg/EmergencyEvent.msg b/jetson/ros2_ws/src/saltybot_emergency_msgs/msg/EmergencyEvent.msg
new file mode 100644
index 0000000..32fd9ef
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_emergency_msgs/msg/EmergencyEvent.msg
@@ -0,0 +1,25 @@
+# EmergencyEvent.msg — Real-time emergency system state snapshot (Issue #169)
+# Published by: /saltybot/emergency_node
+# Topic: /saltybot/emergency
+
+builtin_interfaces/Time stamp
+
+# Overall FSM state
+# Values: "NOMINAL" | "STOPPING" | "RECOVERING" | "ESCALATED"
+string state
+
+# Active threat (highest severity across all detectors)
+# threat_type values: "NONE" | "OBSTACLE_PROXIMITY" | "FALL_RISK" | "WHEEL_STUCK" | "BUMP"
+string threat_type
+
+# Severity: "CLEAR" | "MINOR" | "MAJOR" | "CRITICAL"
+string severity
+
+# Triggering metric value (e.g. distance in m, jerk in m/s³, stuck seconds)
+float32 threat_value
+
+# Human-readable description of the active threat
+string detail
+
+# True when emergency system is overriding normal cmd_vel with its own commands
+bool cmd_override
diff --git a/jetson/ros2_ws/src/saltybot_emergency_msgs/msg/RecoveryAction.msg b/jetson/ros2_ws/src/saltybot_emergency_msgs/msg/RecoveryAction.msg
new file mode 100644
index 0000000..a0a0b50
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_emergency_msgs/msg/RecoveryAction.msg
@@ -0,0 +1,15 @@
+# RecoveryAction.msg — Recovery sequencer state (Issue #169)
+# Published by: /saltybot/emergency_node
+# Topic: /saltybot/recovery_action
+
+builtin_interfaces/Time stamp
+
+# Current recovery action
+# Values: "IDLE" | "REVERSING" | "TURNING" | "RETRYING" | "GAVE_UP"
+string action
+
+# Number of reverse+turn attempts completed so far
+int32 retry_count
+
+# Progress through current phase [0.0 – 1.0]
+float32 progress
diff --git a/jetson/ros2_ws/src/saltybot_emergency_msgs/package.xml b/jetson/ros2_ws/src/saltybot_emergency_msgs/package.xml
new file mode 100644
index 0000000..c2122b3
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_emergency_msgs/package.xml
@@ -0,0 +1,22 @@
+
+
+
+ saltybot_emergency_msgs
+ 0.1.0
+ Emergency behavior message definitions for SaltyBot (Issue #169)
+ sl-controls
+ MIT
+
+ ament_cmake
+ rosidl_default_generators
+
+ builtin_interfaces
+
+ rosidl_default_runtime
+
+ rosidl_interface_packages
+
+
+ ament_cmake
+
+