Two new packages: - saltybot_emergency_msgs: EmergencyEvent.msg, RecoveryAction.msg - saltybot_emergency: threat_detector, alert_manager, recovery_sequencer, emergency_fsm, emergency_node Implements: - ObstacleDetector: MAJOR <30cm, CRITICAL <10cm; suppressed when not moving - FallDetector: MINOR/MAJOR/CRITICAL tilt thresholds; floor-drop edge detection - StuckDetector: MAJOR after 3s wheel stall (cmd>threshold, actual~0) - BumpDetector: jerk = |Δ|a||/dt with gravity removal; MAJOR/CRITICAL thresholds - AlertManager: per-(type,level) suppression; MAJOR×N within window → CRITICAL escalation - RecoverySequencer: REVERSING→TURNING→RETRYING FSM; max_retries before GAVE_UP - EmergencyFSM: NOMINAL→STOPPING→RECOVERING→ESCALATED; acknowledge to clear - EmergencyNode: 20Hz ROS2 node; /saltybot/emergency, /saltybot/e_stop, cmd_vel mux 59/59 tests passing. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
561 lines
22 KiB
Python
561 lines
22 KiB
Python
"""
|
|
test_emergency.py — Unit tests for Issue #169 emergency behavior modules.
|
|
|
|
Covers:
|
|
ObstacleDetector — proximity thresholds, speed gate
|
|
FallDetector — tilt levels, floor drop
|
|
StuckDetector — timeout accumulation, reset on motion
|
|
BumpDetector — jerk thresholds, first-call safety
|
|
AlertManager — severity mapping, escalation, suppression
|
|
RecoverySequencer — full sequence, retry, give-up
|
|
EmergencyFSM — all state transitions and guard conditions
|
|
"""
|
|
|
|
import math
|
|
import sys
|
|
import os
|
|
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
|
|
|
import pytest
|
|
|
|
from saltybot_emergency.threat_detector import (
|
|
BumpDetector,
|
|
FallDetector,
|
|
ObstacleDetector,
|
|
StuckDetector,
|
|
ThreatEvent,
|
|
ThreatLevel,
|
|
ThreatType,
|
|
highest_threat,
|
|
)
|
|
from saltybot_emergency.alert_manager import Alert, AlertLevel, AlertManager
|
|
from saltybot_emergency.recovery_sequencer import (
|
|
RecoveryInputs,
|
|
RecoverySequencer,
|
|
RecoveryState,
|
|
)
|
|
from saltybot_emergency.emergency_fsm import (
|
|
EmergencyFSM,
|
|
EmergencyInputs,
|
|
EmergencyState,
|
|
)
|
|
|
|
|
|
# ── Helpers ───────────────────────────────────────────────────────────────────
|
|
|
|
def _obs(**kw):
|
|
d = dict(stop_distance_m=0.30, critical_distance_m=0.10, min_speed_ms=0.05)
|
|
d.update(kw)
|
|
return ObstacleDetector(**d)
|
|
|
|
def _fall(**kw):
|
|
d = dict(minor_tilt_rad=0.20, major_tilt_rad=0.35,
|
|
critical_tilt_rad=0.52, floor_drop_m=0.15)
|
|
d.update(kw)
|
|
return FallDetector(**d)
|
|
|
|
def _stuck(**kw):
|
|
d = dict(stuck_timeout_s=3.0, min_cmd_ms=0.05, moving_threshold_ms=0.05)
|
|
d.update(kw)
|
|
return StuckDetector(**d)
|
|
|
|
def _bump(**kw):
|
|
d = dict(jerk_threshold_ms3=8.0, critical_jerk_threshold_ms3=25.0)
|
|
d.update(kw)
|
|
return BumpDetector(**d)
|
|
|
|
def _alert_mgr(**kw):
|
|
d = dict(major_count_threshold=3, escalation_window_s=10.0, suppression_s=1.0)
|
|
d.update(kw)
|
|
return AlertManager(**d)
|
|
|
|
def _seq(**kw):
|
|
d = dict(
|
|
reverse_speed_ms=-0.15, reverse_distance_m=0.30,
|
|
angular_speed_rads=0.60, turn_angle_rad=1.5708,
|
|
retry_timeout_s=3.0, clear_hold_s=0.5, max_retries=3,
|
|
)
|
|
d.update(kw)
|
|
return RecoverySequencer(**d)
|
|
|
|
def _fsm(**kw):
|
|
d = dict(
|
|
stopped_ms=0.03, major_count_threshold=3, escalation_window_s=10.0,
|
|
suppression_s=0.0, # disable suppression for cleaner tests
|
|
reverse_speed_ms=-0.15, reverse_distance_m=0.30,
|
|
angular_speed_rads=0.60, turn_angle_rad=1.5708,
|
|
retry_timeout_s=3.0, clear_hold_s=0.5, max_retries=3,
|
|
)
|
|
d.update(kw)
|
|
return EmergencyFSM(**d)
|
|
|
|
def _major_threat(threat_type=ThreatType.OBSTACLE_PROXIMITY, ts=0.0):
|
|
return ThreatEvent(threat_type=threat_type, level=ThreatLevel.MAJOR,
|
|
value=1.0, detail="test", timestamp_s=ts)
|
|
|
|
def _critical_threat(ts=0.0):
|
|
return ThreatEvent(threat_type=ThreatType.OBSTACLE_PROXIMITY,
|
|
level=ThreatLevel.CRITICAL, value=0.05,
|
|
detail="critical test", timestamp_s=ts)
|
|
|
|
def _minor_threat(ts=0.0):
|
|
return ThreatEvent(threat_type=ThreatType.FALL_RISK, level=ThreatLevel.MINOR,
|
|
value=0.21, detail="tilt", timestamp_s=ts)
|
|
|
|
def _clear_threat():
|
|
return ThreatEvent.clear()
|
|
|
|
def _inp(threat=None, speed=0.0, ack=False):
|
|
return EmergencyInputs(
|
|
threat=threat or _clear_threat(),
|
|
robot_speed_ms=speed,
|
|
acknowledge=ack,
|
|
)
|
|
|
|
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
# ObstacleDetector
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
|
|
class TestObstacleDetector:
|
|
|
|
def test_clear_when_far(self):
|
|
ev = _obs().update(0.5, 0.3)
|
|
assert ev.level == ThreatLevel.CLEAR
|
|
|
|
def test_major_within_stop_distance(self):
|
|
ev = _obs(stop_distance_m=0.30).update(0.25, 0.3)
|
|
assert ev.level == ThreatLevel.MAJOR
|
|
assert ev.threat_type == ThreatType.OBSTACLE_PROXIMITY
|
|
|
|
def test_critical_within_critical_distance(self):
|
|
ev = _obs(critical_distance_m=0.10).update(0.05, 0.3)
|
|
assert ev.level == ThreatLevel.CRITICAL
|
|
|
|
def test_clear_when_stopped(self):
|
|
"""Obstacle detection suppressed when not moving."""
|
|
ev = _obs(min_speed_ms=0.05).update(0.05, 0.01)
|
|
assert ev.level == ThreatLevel.CLEAR
|
|
|
|
def test_active_at_min_speed(self):
|
|
ev = _obs(min_speed_ms=0.05).update(0.20, 0.06)
|
|
assert ev.level == ThreatLevel.MAJOR
|
|
|
|
def test_value_is_distance(self):
|
|
ev = _obs().update(0.20, 0.3)
|
|
assert ev.value == pytest.approx(0.20, abs=1e-9)
|
|
|
|
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
# FallDetector
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
|
|
class TestFallDetector:
|
|
|
|
def test_clear_on_flat(self):
|
|
ev = _fall().update(0.0, 0.0)
|
|
assert ev.level == ThreatLevel.CLEAR
|
|
|
|
def test_minor_moderate_tilt(self):
|
|
ev = _fall(minor_tilt_rad=0.20, major_tilt_rad=0.35).update(0.25, 0.0)
|
|
assert ev.level == ThreatLevel.MINOR
|
|
|
|
def test_major_high_tilt(self):
|
|
ev = _fall(major_tilt_rad=0.35, critical_tilt_rad=0.52).update(0.40, 0.0)
|
|
assert ev.level == ThreatLevel.MAJOR
|
|
|
|
def test_critical_extreme_tilt(self):
|
|
ev = _fall(critical_tilt_rad=0.52).update(0.60, 0.0)
|
|
assert ev.level == ThreatLevel.CRITICAL
|
|
|
|
def test_major_on_floor_drop(self):
|
|
ev = _fall(floor_drop_m=0.15).update(0.0, 0.0, floor_drop_m=0.20)
|
|
assert ev.level == ThreatLevel.MAJOR
|
|
assert "drop" in ev.detail.lower()
|
|
|
|
def test_roll_triggers_same_as_pitch(self):
|
|
"""Roll beyond minor threshold also fires."""
|
|
ev = _fall(minor_tilt_rad=0.20).update(0.0, 0.25)
|
|
assert ev.level == ThreatLevel.MINOR
|
|
|
|
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
# StuckDetector
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
|
|
class TestStuckDetector:
|
|
|
|
def test_clear_when_not_commanded(self):
|
|
s = _stuck(stuck_timeout_s=1.0, min_cmd_ms=0.05)
|
|
ev = s.update(0.01, 0.0, dt=1.0) # cmd below threshold
|
|
assert ev.level == ThreatLevel.CLEAR
|
|
|
|
def test_clear_when_moving(self):
|
|
s = _stuck(stuck_timeout_s=1.0)
|
|
ev = s.update(0.2, 0.2, dt=1.0) # actually moving
|
|
assert ev.level == ThreatLevel.CLEAR
|
|
|
|
def test_major_after_timeout(self):
|
|
s = _stuck(stuck_timeout_s=3.0, min_cmd_ms=0.05, moving_threshold_ms=0.05)
|
|
for _ in range(6):
|
|
ev = s.update(0.2, 0.0, dt=0.5) # cmd=0.2, actual=0 → stuck
|
|
assert ev.level == ThreatLevel.MAJOR
|
|
|
|
def test_no_major_before_timeout(self):
|
|
s = _stuck(stuck_timeout_s=3.0)
|
|
ev = s.update(0.2, 0.0, dt=1.0) # only 1s — not yet
|
|
assert ev.level == ThreatLevel.CLEAR
|
|
|
|
def test_reset_on_motion_resume(self):
|
|
s = _stuck(stuck_timeout_s=1.0)
|
|
s.update(0.2, 0.0, dt=0.8) # accumulate stuck time
|
|
s.update(0.2, 0.3, dt=0.1) # motion resumes → reset
|
|
ev = s.update(0.2, 0.0, dt=0.3) # only 0.3s since reset → still clear
|
|
assert ev.level == ThreatLevel.CLEAR
|
|
|
|
def test_stuck_time_property(self):
|
|
s = _stuck(stuck_timeout_s=5.0)
|
|
s.update(0.2, 0.0, dt=1.0)
|
|
s.update(0.2, 0.0, dt=1.0)
|
|
assert s.stuck_time == pytest.approx(2.0, abs=1e-6)
|
|
|
|
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
# BumpDetector
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
|
|
class TestBumpDetector:
|
|
|
|
def test_clear_on_first_call(self):
|
|
"""No jerk on first sample (no previous value)."""
|
|
ev = _bump().update(0.0, 0.0, 9.81, dt=0.05)
|
|
assert ev.level == ThreatLevel.CLEAR
|
|
|
|
def test_major_on_jerk(self):
|
|
b = _bump(jerk_threshold_ms3=5.0, critical_jerk_threshold_ms3=20.0)
|
|
b.update(0.0, 0.0, 9.81, dt=0.05) # seed → dyn_mag = 0
|
|
# ax=4.5: raw≈10.79, dyn≈0.98, jerk≈9.8 m/s³ → MAJOR (5.0 < 9.8 < 20.0)
|
|
ev = b.update(4.5, 0.0, 9.81, dt=0.1)
|
|
assert ev.level == ThreatLevel.MAJOR
|
|
|
|
def test_critical_on_severe_jerk(self):
|
|
b = _bump(jerk_threshold_ms3=5.0, critical_jerk_threshold_ms3=20.0)
|
|
b.update(0.0, 0.0, 9.81, dt=0.05)
|
|
# Very large spike
|
|
ev = b.update(50.0, 0.0, 9.81, dt=0.1)
|
|
assert ev.level == ThreatLevel.CRITICAL
|
|
|
|
def test_clear_on_gentle_acceleration(self):
|
|
b = _bump(jerk_threshold_ms3=8.0)
|
|
b.update(0.0, 0.0, 9.81, dt=0.05)
|
|
ev = b.update(0.1, 0.0, 9.81, dt=0.05) # tiny change
|
|
assert ev.level == ThreatLevel.CLEAR
|
|
|
|
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
# highest_threat
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
|
|
class TestHighestThreat:
|
|
|
|
def test_empty_returns_clear(self):
|
|
assert highest_threat([]).level == ThreatLevel.CLEAR
|
|
|
|
def test_picks_highest(self):
|
|
a = ThreatEvent(level=ThreatLevel.MINOR)
|
|
b = ThreatEvent(level=ThreatLevel.CRITICAL)
|
|
c = ThreatEvent(level=ThreatLevel.MAJOR)
|
|
assert highest_threat([a, b, c]).level == ThreatLevel.CRITICAL
|
|
|
|
def test_single_item(self):
|
|
ev = ThreatEvent(level=ThreatLevel.MAJOR)
|
|
assert highest_threat([ev]) is ev
|
|
|
|
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
# AlertManager
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
|
|
class TestAlertManager:
|
|
|
|
def test_clear_returns_none(self):
|
|
am = _alert_mgr()
|
|
assert am.update(_clear_threat()) is None
|
|
|
|
def test_minor_threat_gives_minor_alert(self):
|
|
am = _alert_mgr(suppression_s=0.0)
|
|
alert = am.update(_minor_threat(ts=0.0))
|
|
assert alert is not None
|
|
assert alert.level == AlertLevel.MINOR
|
|
|
|
def test_major_threat_gives_major_alert(self):
|
|
am = _alert_mgr(suppression_s=0.0)
|
|
alert = am.update(_major_threat(ts=0.0))
|
|
assert alert is not None
|
|
assert alert.level == AlertLevel.MAJOR
|
|
|
|
def test_critical_threat_gives_critical_alert(self):
|
|
am = _alert_mgr(suppression_s=0.0)
|
|
alert = am.update(_critical_threat(ts=0.0))
|
|
assert alert is not None
|
|
assert alert.level == AlertLevel.CRITICAL
|
|
|
|
def test_suppression_blocks_duplicate(self):
|
|
am = _alert_mgr(suppression_s=5.0)
|
|
am.update(_major_threat(ts=0.0))
|
|
alert = am.update(_major_threat(ts=1.0)) # within 5s window
|
|
assert alert is None
|
|
|
|
def test_suppression_expires(self):
|
|
am = _alert_mgr(suppression_s=2.0)
|
|
am.update(_major_threat(ts=0.0))
|
|
alert = am.update(_major_threat(ts=3.0)) # after 2s window
|
|
assert alert is not None
|
|
|
|
def test_escalation_major_to_critical(self):
|
|
"""After major_count_threshold major alerts, next one becomes CRITICAL."""
|
|
am = _alert_mgr(major_count_threshold=3, escalation_window_s=60.0,
|
|
suppression_s=0.0)
|
|
for i in range(3):
|
|
am.update(_major_threat(ts=float(i)))
|
|
# 4th should be escalated
|
|
alert = am.update(_major_threat(ts=4.0))
|
|
assert alert is not None
|
|
assert alert.level == AlertLevel.CRITICAL
|
|
|
|
def test_escalation_resets_after_window(self):
|
|
"""Major alerts outside the window don't contribute to escalation."""
|
|
am = _alert_mgr(major_count_threshold=3, escalation_window_s=5.0,
|
|
suppression_s=0.0)
|
|
am.update(_major_threat(ts=0.0))
|
|
am.update(_major_threat(ts=1.0))
|
|
am.update(_major_threat(ts=2.0))
|
|
# All 3 are old; new one at t=10 is outside window
|
|
alert = am.update(_major_threat(ts=10.0))
|
|
assert alert is not None
|
|
assert alert.level == AlertLevel.MAJOR # not escalated
|
|
|
|
def test_reset_clears_escalation_state(self):
|
|
am = _alert_mgr(major_count_threshold=2, suppression_s=0.0)
|
|
am.update(_major_threat(ts=0.0))
|
|
am.update(_major_threat(ts=1.0)) # now at threshold
|
|
am.reset()
|
|
alert = am.update(_major_threat(ts=2.0))
|
|
assert alert.level == AlertLevel.MAJOR # back to major after reset
|
|
|
|
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
# RecoverySequencer
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
|
|
class TestRecoverySequencer:
|
|
|
|
def _trigger(self, seq):
|
|
return seq.tick(RecoveryInputs(trigger=True, dt=0.02))
|
|
|
|
def test_idle_on_init(self):
|
|
seq = _seq()
|
|
assert seq.state == RecoveryState.IDLE
|
|
|
|
def test_trigger_starts_reversing(self):
|
|
seq = _seq()
|
|
out = self._trigger(seq)
|
|
assert seq.state == RecoveryState.REVERSING
|
|
|
|
def test_reversing_backward_velocity(self):
|
|
seq = _seq(reverse_speed_ms=-0.15)
|
|
self._trigger(seq)
|
|
out = seq.tick(RecoveryInputs(dt=0.02))
|
|
assert out.cmd_linear < 0.0
|
|
|
|
def test_reversing_completes_to_turning(self):
|
|
seq = _seq(reverse_speed_ms=-1.0, reverse_distance_m=0.5)
|
|
self._trigger(seq)
|
|
for _ in range(30):
|
|
out = seq.tick(RecoveryInputs(dt=0.02))
|
|
assert seq.state == RecoveryState.TURNING
|
|
|
|
def test_turning_positive_angular(self):
|
|
seq = _seq(reverse_speed_ms=-1.0, reverse_distance_m=0.1,
|
|
angular_speed_rads=1.0)
|
|
self._trigger(seq)
|
|
# Skip through reversing quickly
|
|
for _ in range(20):
|
|
seq.tick(RecoveryInputs(dt=0.02))
|
|
if seq.state == RecoveryState.TURNING:
|
|
out = seq.tick(RecoveryInputs(dt=0.02))
|
|
assert out.cmd_angular > 0.0
|
|
|
|
def test_retrying_increments_count(self):
|
|
seq = _seq(reverse_speed_ms=-1.0, reverse_distance_m=0.05,
|
|
angular_speed_rads=10.0, turn_angle_rad=0.1)
|
|
self._trigger(seq)
|
|
for _ in range(100):
|
|
seq.tick(RecoveryInputs(dt=0.02))
|
|
assert seq.state == RecoveryState.RETRYING
|
|
assert seq.retry_count == 1
|
|
|
|
def test_threat_cleared_returns_idle(self):
|
|
seq = _seq(reverse_speed_ms=-1.0, reverse_distance_m=0.05,
|
|
angular_speed_rads=10.0, turn_angle_rad=0.1,
|
|
clear_hold_s=0.1)
|
|
self._trigger(seq)
|
|
# Fast-forward to RETRYING
|
|
for _ in range(100):
|
|
seq.tick(RecoveryInputs(dt=0.02))
|
|
assert seq.state == RecoveryState.RETRYING
|
|
# Feed cleared ticks until clear_hold met
|
|
for _ in range(20):
|
|
seq.tick(RecoveryInputs(threat_cleared=True, dt=0.02))
|
|
assert seq.state == RecoveryState.IDLE
|
|
|
|
def test_max_retries_gives_up(self):
|
|
seq = _seq(reverse_speed_ms=-1.0, reverse_distance_m=0.05,
|
|
angular_speed_rads=10.0, turn_angle_rad=0.1,
|
|
retry_timeout_s=0.1, max_retries=2)
|
|
self._trigger(seq)
|
|
for _ in range(500):
|
|
out = seq.tick(RecoveryInputs(threat_cleared=False, dt=0.05))
|
|
if seq.state == RecoveryState.GAVE_UP:
|
|
break
|
|
assert seq.state == RecoveryState.GAVE_UP
|
|
|
|
def test_reset_returns_to_idle(self):
|
|
seq = _seq()
|
|
self._trigger(seq)
|
|
seq.reset()
|
|
assert seq.state == RecoveryState.IDLE
|
|
assert seq.retry_count == 0
|
|
|
|
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
# EmergencyFSM
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
|
|
class TestEmergencyFSMBasic:
|
|
|
|
def test_initial_state_nominal(self):
|
|
fsm = _fsm()
|
|
assert fsm.state == EmergencyState.NOMINAL
|
|
|
|
def test_nominal_stays_on_clear(self):
|
|
fsm = _fsm()
|
|
out = fsm.tick(_inp())
|
|
assert fsm.state == EmergencyState.NOMINAL
|
|
assert out.cmd_override is False
|
|
|
|
def test_minor_alert_no_override(self):
|
|
fsm = _fsm()
|
|
out = fsm.tick(_inp(_minor_threat(ts=0.0)))
|
|
assert fsm.state == EmergencyState.NOMINAL
|
|
assert out.cmd_override is False
|
|
assert out.alert is not None
|
|
assert out.alert.level == AlertLevel.MINOR
|
|
|
|
def test_major_threat_enters_stopping(self):
|
|
fsm = _fsm()
|
|
out = fsm.tick(_inp(_major_threat()))
|
|
assert fsm.state == EmergencyState.STOPPING
|
|
assert out.cmd_override is True
|
|
|
|
def test_critical_threat_enters_stopping_critical_pending(self):
|
|
fsm = _fsm()
|
|
fsm.tick(_inp(_critical_threat()))
|
|
assert fsm.state == EmergencyState.STOPPING
|
|
assert fsm._critical_pending is True
|
|
|
|
|
|
class TestEmergencyFSMStopping:
|
|
|
|
def test_stopping_commands_zero(self):
|
|
fsm = _fsm()
|
|
fsm.tick(_inp(_major_threat()))
|
|
out = fsm.tick(_inp(_major_threat(), speed=0.5))
|
|
assert out.cmd_linear == pytest.approx(0.0, abs=1e-9)
|
|
assert out.cmd_angular == pytest.approx(0.0, abs=1e-9)
|
|
|
|
def test_stopped_enters_recovering(self):
|
|
fsm = _fsm(stopped_ms=0.03)
|
|
fsm.tick(_inp(_major_threat()))
|
|
out = fsm.tick(_inp(_major_threat(), speed=0.01)) # below stopped_ms
|
|
assert fsm.state == EmergencyState.RECOVERING
|
|
|
|
def test_critical_pending_enters_escalated(self):
|
|
fsm = _fsm(stopped_ms=0.03)
|
|
fsm.tick(_inp(_critical_threat()))
|
|
fsm.tick(_inp(_critical_threat(), speed=0.01)) # stopped → ESCALATED
|
|
assert fsm.state == EmergencyState.ESCALATED
|
|
|
|
|
|
class TestEmergencyFSMRecovering:
|
|
|
|
def _reach_recovering(self, fsm):
|
|
fsm.tick(_inp(_major_threat()))
|
|
fsm.tick(_inp(_major_threat(), speed=0.0)) # stopped → RECOVERING
|
|
assert fsm.state == EmergencyState.RECOVERING
|
|
|
|
def test_recovering_has_cmd_override(self):
|
|
fsm = _fsm()
|
|
self._reach_recovering(fsm)
|
|
out = fsm.tick(_inp(_clear_threat()))
|
|
assert out.cmd_override is True
|
|
|
|
def test_recovering_gave_up_escalates(self):
|
|
fsm = _fsm(max_retries=1, retry_timeout_s=0.05)
|
|
self._reach_recovering(fsm)
|
|
# Drive recovery to GAVE_UP by feeding many non-clearing ticks
|
|
for _ in range(500):
|
|
out = fsm.tick(_inp(_major_threat()))
|
|
if fsm.state == EmergencyState.ESCALATED:
|
|
break
|
|
assert fsm.state == EmergencyState.ESCALATED
|
|
|
|
|
|
class TestEmergencyFSMEscalated:
|
|
|
|
def _reach_escalated(self, fsm):
|
|
fsm.tick(_inp(_critical_threat()))
|
|
fsm.tick(_inp(_critical_threat(), speed=0.0))
|
|
assert fsm.state == EmergencyState.ESCALATED
|
|
|
|
def test_escalated_emits_critical_alert_once(self):
|
|
fsm = _fsm()
|
|
self._reach_escalated(fsm)
|
|
out1 = fsm.tick(_inp())
|
|
out2 = fsm.tick(_inp())
|
|
assert out1.alert is not None
|
|
assert out1.alert.level == AlertLevel.CRITICAL
|
|
assert out2.alert is None # suppressed after first emission
|
|
|
|
def test_escalated_e_stop_asserted(self):
|
|
fsm = _fsm()
|
|
self._reach_escalated(fsm)
|
|
out = fsm.tick(_inp())
|
|
assert out.e_stop is True
|
|
|
|
def test_escalated_stays_without_ack(self):
|
|
fsm = _fsm()
|
|
self._reach_escalated(fsm)
|
|
for _ in range(5):
|
|
fsm.tick(_inp())
|
|
assert fsm.state == EmergencyState.ESCALATED
|
|
|
|
def test_acknowledge_returns_to_nominal(self):
|
|
fsm = _fsm()
|
|
self._reach_escalated(fsm)
|
|
fsm.tick(_inp(ack=True))
|
|
assert fsm.state == EmergencyState.NOMINAL
|
|
|
|
def test_reset_returns_to_nominal(self):
|
|
fsm = _fsm()
|
|
self._reach_escalated(fsm)
|
|
fsm.reset()
|
|
assert fsm.state == EmergencyState.NOMINAL
|
|
|
|
def test_e_stop_cleared_on_ack(self):
|
|
fsm = _fsm()
|
|
self._reach_escalated(fsm)
|
|
out = fsm.tick(_inp(ack=True))
|
|
assert out.e_stop is False
|