sl-controls 783dedf4d4 feat(#158): docking station auto-return with ArUco/IR detection and visual servo
Two new ROS2 packages implementing Issue #158:

saltybot_docking_msgs (ament_cmake)
- DockingStatus.msg: stamp, state, dock_detected, distance_m, lateral_error_m,
  battery_pct, charging, aligned
- Dock.srv / Undock.srv: force + resume_mission flags

saltybot_docking (ament_python, 20 Hz)
- dock_detector.py: ArucoDetector (cv2.aruco PnP → DockPose) + IRBeaconDetector
  (EMA envelope with amplitude threshold); both gracefully degrade if unavailable
- visual_servo.py: IBVS proportional controller — v = k_lin×(d−target),
  ω = −k_ang×yaw; aligned when |lateral| < 5mm AND d < contact_distance
- charge_monitor.py: edge-triggered events (CHARGING_STARTED/STOPPED,
  THRESHOLD_LOW at 15%, THRESHOLD_HIGH at 80%)
- docking_state_machine.py: 7-state FSM (IDLE→DETECTING→NAV2_APPROACH→
  VISUAL_SERVO→CONTACT→CHARGING→UNDOCKING); mission_resume signal on
  auto-dock cycle; contact retry on timeout; lost-detection timeout
- docking_node.py: 20Hz ROS2 node; Nav2 NavigateToPose action client (optional);
  /saltybot/dock + /saltybot/undock services; /saltybot/docking_cmd_vel;
  /saltybot/resume_mission; /saltybot/docking_status
- config/docking_params.yaml, launch/docking.launch.py

Tests: 64/64 passing (IRBeaconDetector, VisualServo, ChargeMonitor,
DockingStateMachine — all state transitions and guard conditions covered)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-02 10:19:22 -05:00

581 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
test_docking.py — Unit tests for Issue #158 docking modules.
Covers:
IRBeaconDetector — envelope follower, threshold, reset
VisualServo — velocity commands, clamping, alignment declaration
ChargeMonitor — event detection, edge-triggering, threshold crossing
DockingStateMachine — all state transitions and guard conditions
"""
import math
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import pytest
from saltybot_docking.dock_detector import DockPose, IRBeaconDetector
from saltybot_docking.visual_servo import VisualServo, VisualServoResult
from saltybot_docking.charge_monitor import ChargeEvent, ChargeMonitor
from saltybot_docking.docking_state_machine import (
DockingInputs,
DockingState,
DockingStateMachine,
)
# ── Helpers ───────────────────────────────────────────────────────────────────
def _make_ir(**kw):
defaults = dict(threshold=0.5, smoothing_alpha=1.0)
defaults.update(kw)
return IRBeaconDetector(**defaults)
def _make_servo(**kw):
defaults = dict(
k_linear=1.0,
k_angular=1.0,
target_distance_m=0.02,
lateral_tol_m=0.005,
contact_distance_m=0.05,
max_linear_ms=0.5,
max_angular_rads=1.0,
)
defaults.update(kw)
return VisualServo(**defaults)
def _make_charge(**kw):
defaults = dict(
low_threshold_pct=15.0,
high_threshold_pct=80.0,
charging_current_threshold_a=0.10,
)
defaults.update(kw)
return ChargeMonitor(**defaults)
def _make_fsm(**kw):
defaults = dict(
battery_low_pct=15.0,
battery_high_pct=80.0,
servo_range_m=1.0,
undock_speed_ms=-0.20,
undock_ticks_max=5,
lost_ticks_max=3,
contact_timeout_ticks=3,
)
defaults.update(kw)
return DockingStateMachine(**defaults)
def _inp(**kw):
return DockingInputs(**kw)
def _near_pose(dist=0.5):
"""Dock pose within servo range (default 0.5 m)."""
return DockPose(distance_m=dist, yaw_rad=0.0, lateral_m=0.0, source="aruco")
def _far_pose(dist=3.0):
"""Dock pose outside servo range."""
return DockPose(distance_m=dist, yaw_rad=0.0, lateral_m=0.0, source="aruco")
# ══════════════════════════════════════════════════════════════════════════════
# IRBeaconDetector
# ══════════════════════════════════════════════════════════════════════════════
class TestIRBeaconDetector:
def test_zero_signal_not_detected(self):
ir = _make_ir()
assert ir.update(0.0) is False
assert ir.detected is False
def test_above_threshold_detected(self):
ir = _make_ir(threshold=0.5, smoothing_alpha=1.0)
assert ir.update(0.6) is True
assert ir.detected is True
def test_below_threshold_not_detected(self):
ir = _make_ir(threshold=0.5, smoothing_alpha=1.0)
assert ir.update(0.4) is False
def test_exactly_at_threshold_detected(self):
ir = _make_ir(threshold=0.5, smoothing_alpha=1.0)
assert ir.update(0.5) is True
def test_envelope_smoothing_alpha_one(self):
"""With alpha=1, envelope equals |sample| immediately."""
ir = _make_ir(smoothing_alpha=1.0)
ir.update(0.7)
assert ir.envelope == pytest.approx(0.7, abs=1e-9)
def test_envelope_smoothing_below_one(self):
"""With alpha=0.5, envelope is a running average."""
ir = _make_ir(threshold=999.0, smoothing_alpha=0.5)
ir.update(1.0) # envelope = 0.5
assert ir.envelope == pytest.approx(0.5, abs=1e-6)
ir.update(1.0) # envelope = 0.75
assert ir.envelope == pytest.approx(0.75, abs=1e-6)
def test_reset_clears_envelope(self):
ir = _make_ir(smoothing_alpha=1.0)
ir.update(0.8)
ir.reset()
assert ir.envelope == pytest.approx(0.0, abs=1e-9)
assert ir.detected is False
def test_negative_sample_uses_abs(self):
"""update() should use absolute value of sample."""
ir = _make_ir(threshold=0.5, smoothing_alpha=1.0)
assert ir.update(-0.7) is True
# ══════════════════════════════════════════════════════════════════════════════
# VisualServo
# ══════════════════════════════════════════════════════════════════════════════
class TestVisualServo:
def test_far_dock_gives_forward_velocity(self):
servo = _make_servo(k_linear=1.0, target_distance_m=0.02)
pose = DockPose(distance_m=0.5, yaw_rad=0.0, lateral_m=0.0, source="aruco")
result = servo.update(pose)
assert result.linear_x > 0.0
def test_at_target_distance_zero_linear(self):
servo = _make_servo(k_linear=1.0, target_distance_m=0.02)
pose = DockPose(distance_m=0.02, yaw_rad=0.0, lateral_m=0.0, source="aruco")
result = servo.update(pose)
assert result.linear_x == pytest.approx(0.0, abs=1e-9)
def test_behind_target_no_reverse(self):
"""When distance < target, linear velocity must be 0 (no reversing)."""
servo = _make_servo(k_linear=1.0, target_distance_m=0.10)
pose = DockPose(distance_m=0.01, yaw_rad=0.0, lateral_m=0.0, source="aruco")
result = servo.update(pose)
assert result.linear_x == pytest.approx(0.0, abs=1e-9)
def test_yaw_right_gives_negative_angular(self):
"""yaw_rad = atan2(tx, tz); tx > 0 → dock is right in camera → turn right (ω < 0)."""
servo = _make_servo(k_angular=1.0)
pose = DockPose(distance_m=0.5, yaw_rad=0.3, lateral_m=0.0, source="aruco")
result = servo.update(pose)
assert result.angular_z < 0.0
def test_yaw_left_gives_positive_angular(self):
"""tx < 0 → dock is left in camera → turn left (ω > 0)."""
servo = _make_servo(k_angular=1.0)
pose = DockPose(distance_m=0.5, yaw_rad=-0.3, lateral_m=0.0, source="aruco")
result = servo.update(pose)
assert result.angular_z > 0.0
def test_zero_yaw_zero_angular(self):
servo = _make_servo()
pose = DockPose(distance_m=0.5, yaw_rad=0.0, lateral_m=0.0, source="aruco")
result = servo.update(pose)
assert result.angular_z == pytest.approx(0.0, abs=1e-9)
def test_max_linear_clamped(self):
servo = _make_servo(k_linear=100.0, max_linear_ms=0.1)
pose = DockPose(distance_m=5.0, yaw_rad=0.0, lateral_m=0.0, source="aruco")
result = servo.update(pose)
assert result.linear_x <= 0.1 + 1e-9
def test_max_angular_clamped(self):
servo = _make_servo(k_angular=100.0, max_angular_rads=0.3)
pose = DockPose(distance_m=0.5, yaw_rad=5.0, lateral_m=0.0, source="aruco")
result = servo.update(pose)
assert abs(result.angular_z) <= 0.3 + 1e-9
def test_aligned_when_within_tolerances(self):
servo = _make_servo(lateral_tol_m=0.005, contact_distance_m=0.05)
pose = DockPose(distance_m=0.03, yaw_rad=0.0, lateral_m=0.002, source="aruco")
result = servo.update(pose)
assert result.aligned is True
def test_not_aligned_lateral_too_large(self):
servo = _make_servo(lateral_tol_m=0.005, contact_distance_m=0.05)
pose = DockPose(distance_m=0.03, yaw_rad=0.0, lateral_m=0.010, source="aruco")
result = servo.update(pose)
assert result.aligned is False
def test_not_aligned_too_far(self):
servo = _make_servo(lateral_tol_m=0.005, contact_distance_m=0.05)
pose = DockPose(distance_m=0.10, yaw_rad=0.0, lateral_m=0.001, source="aruco")
result = servo.update(pose)
assert result.aligned is False
def test_stop_returns_zero_velocity(self):
servo = _make_servo()
result = servo.stop()
assert result.linear_x == pytest.approx(0.0, abs=1e-9)
assert result.angular_z == pytest.approx(0.0, abs=1e-9)
assert result.aligned is False
def test_proportional_linear(self):
"""Linear velocity scales with (distance target)."""
servo = _make_servo(k_linear=2.0, target_distance_m=0.0, max_linear_ms=999.0)
pose = DockPose(distance_m=0.3, yaw_rad=0.0, lateral_m=0.0, source="aruco")
result = servo.update(pose)
assert result.linear_x == pytest.approx(2.0 * 0.3, rel=1e-6)
# ══════════════════════════════════════════════════════════════════════════════
# ChargeMonitor
# ══════════════════════════════════════════════════════════════════════════════
class TestChargeMonitor:
def test_initially_not_charging(self):
cm = _make_charge()
assert cm.is_charging is False
def test_charging_started_event(self):
cm = _make_charge(charging_current_threshold_a=0.1)
events = cm.update(50.0, 0.5)
assert ChargeEvent.CHARGING_STARTED in events
def test_charging_stopped_event(self):
cm = _make_charge(charging_current_threshold_a=0.1)
cm.update(50.0, 0.5) # start charging
events = cm.update(50.0, 0.0) # stop
assert ChargeEvent.CHARGING_STOPPED in events
def test_no_duplicate_charging_started(self):
"""CHARGING_STARTED fires only once per rising edge."""
cm = _make_charge(charging_current_threshold_a=0.1)
cm.update(50.0, 0.5) # fires CHARGING_STARTED
events = cm.update(50.0, 0.5) # still charging — no duplicate
assert ChargeEvent.CHARGING_STARTED not in events
def test_low_threshold_event_falling_edge(self):
"""THRESHOLD_LOW fires when SOC first drops below low_threshold_pct."""
cm = _make_charge(low_threshold_pct=20.0)
events = cm.update(19.0, 0.0)
assert ChargeEvent.THRESHOLD_LOW in events
def test_no_duplicate_low_threshold(self):
cm = _make_charge(low_threshold_pct=20.0)
cm.update(19.0, 0.0) # fires once
events = cm.update(18.0, 0.0) # still below — no repeat
assert ChargeEvent.THRESHOLD_LOW not in events
def test_high_threshold_event_rising_edge(self):
"""THRESHOLD_HIGH fires when SOC rises above high_threshold_pct."""
cm = _make_charge(low_threshold_pct=15.0, high_threshold_pct=80.0)
# Start below high threshold
cm.update(50.0, 0.5) # sets was_high = False (50 < 80)
events = cm.update(81.0, 0.5) # rises above 80
assert ChargeEvent.THRESHOLD_HIGH in events
def test_no_high_event_if_always_above(self):
"""THRESHOLD_HIGH should not fire if SOC starts above threshold."""
cm = _make_charge(high_threshold_pct=80.0)
# Default _was_high=True; remains above → no edge
events = cm.update(90.0, 0.0)
assert ChargeEvent.THRESHOLD_HIGH not in events
def test_is_depleted_property(self):
cm = _make_charge(low_threshold_pct=20.0)
cm.update(15.0, 0.0)
assert cm.is_depleted is True
def test_is_charged_property(self):
cm = _make_charge(high_threshold_pct=80.0)
cm.update(85.0, 0.0)
assert cm.is_charged is True
def test_percentage_tracking(self):
cm = _make_charge()
cm.update(42.5, 0.0)
assert cm.percentage == pytest.approx(42.5, abs=1e-6)
def test_reset_clears_state(self):
cm = _make_charge(charging_current_threshold_a=0.1)
cm.update(10.0, 0.5) # is_charging=True, is_depleted=True
cm.reset()
assert cm.is_charging is False
assert cm.is_depleted is False
assert cm.percentage == pytest.approx(100.0, abs=1e-6)
# ══════════════════════════════════════════════════════════════════════════════
# DockingStateMachine
# ══════════════════════════════════════════════════════════════════════════════
class TestDockingStateMachineBasic:
def test_initial_state_is_idle(self):
fsm = _make_fsm()
assert fsm.state == DockingState.IDLE
def test_idle_stays_idle_on_good_battery(self):
fsm = _make_fsm()
out = fsm.tick(_inp(battery_pct=80.0))
assert fsm.state == DockingState.IDLE
assert out.state_changed is False
def test_battery_low_triggers_detecting(self):
fsm = _make_fsm()
out = fsm.tick(_inp(battery_pct=10.0))
assert fsm.state == DockingState.DETECTING
assert out.mission_interrupted is True
assert out.state_changed is True
def test_dock_request_triggers_detecting(self):
fsm = _make_fsm()
out = fsm.tick(_inp(battery_pct=80.0, dock_requested=True))
assert fsm.state == DockingState.DETECTING
assert out.mission_interrupted is False
def test_dock_request_not_duplicated_mission_interrupted(self):
"""Explicit dock request should not set mission_interrupted."""
fsm = _make_fsm()
out = fsm.tick(_inp(battery_pct=80.0, dock_requested=True))
assert out.mission_interrupted is False
class TestDockingStateMachineDetecting:
def test_no_dock_stays_detecting(self):
fsm = _make_fsm()
fsm.tick(_inp(dock_requested=True))
out = fsm.tick(_inp()) # no dock_pose
assert fsm.state == DockingState.DETECTING
def test_far_dock_goes_to_nav2(self):
fsm = _make_fsm(servo_range_m=1.0)
fsm.tick(_inp(dock_requested=True))
out = fsm.tick(_inp(dock_pose=_far_pose()))
assert fsm.state == DockingState.NAV2_APPROACH
assert out.request_nav2 is True
def test_near_dock_skips_nav2_to_servo(self):
fsm = _make_fsm(servo_range_m=1.0)
fsm.tick(_inp(dock_requested=True))
out = fsm.tick(_inp(dock_pose=_near_pose()))
assert fsm.state == DockingState.VISUAL_SERVO
def test_undock_request_aborts_detecting(self):
fsm = _make_fsm()
fsm.tick(_inp(dock_requested=True))
fsm.tick(_inp(undock_requested=True))
assert fsm.state == DockingState.IDLE
class TestDockingStateMachineNav2Approach:
def _reach_nav2(self, fsm):
fsm.tick(_inp(dock_requested=True))
fsm.tick(_inp(dock_pose=_far_pose()))
assert fsm.state == DockingState.NAV2_APPROACH
def test_nav2_arrived_enters_visual_servo(self):
fsm = _make_fsm()
self._reach_nav2(fsm)
fsm.tick(_inp(nav2_arrived=True))
assert fsm.state == DockingState.VISUAL_SERVO
def test_dock_close_during_nav2_enters_servo(self):
fsm = _make_fsm(servo_range_m=1.0)
self._reach_nav2(fsm)
out = fsm.tick(_inp(dock_pose=_near_pose()))
assert fsm.state == DockingState.VISUAL_SERVO
assert out.cancel_nav2 is True
def test_nav2_failed_back_to_detecting(self):
fsm = _make_fsm()
self._reach_nav2(fsm)
fsm.tick(_inp(nav2_failed=True))
assert fsm.state == DockingState.DETECTING
def test_undock_during_nav2_cancels_and_idles(self):
fsm = _make_fsm()
self._reach_nav2(fsm)
out = fsm.tick(_inp(undock_requested=True))
assert fsm.state == DockingState.IDLE
assert out.cancel_nav2 is True
class TestDockingStateMachineVisualServo:
def _reach_servo(self, fsm):
fsm.tick(_inp(dock_requested=True))
fsm.tick(_inp(dock_pose=_near_pose()))
assert fsm.state == DockingState.VISUAL_SERVO
def test_aligned_enters_contact(self):
fsm = _make_fsm()
self._reach_servo(fsm)
fsm.tick(_inp(dock_pose=_near_pose(), servo_aligned=True))
assert fsm.state == DockingState.CONTACT
def test_not_aligned_stays_in_servo(self):
fsm = _make_fsm()
self._reach_servo(fsm)
fsm.tick(_inp(dock_pose=_near_pose(), servo_aligned=False))
assert fsm.state == DockingState.VISUAL_SERVO
def test_lost_detection_timeout_enters_detecting(self):
fsm = _make_fsm(lost_ticks_max=3)
self._reach_servo(fsm)
for _ in range(3): # dock_pose=None each tick
fsm.tick(_inp())
assert fsm.state == DockingState.DETECTING
def test_lost_counter_resets_on_rediscovery(self):
"""Lost count should reset when pose reappears."""
fsm = _make_fsm(lost_ticks_max=3)
self._reach_servo(fsm)
fsm.tick(_inp()) # 1 lost tick
fsm.tick(_inp(dock_pose=_near_pose())) # rediscovered
# Need 3 more lost ticks to time out
fsm.tick(_inp())
fsm.tick(_inp())
assert fsm.state == DockingState.VISUAL_SERVO
fsm.tick(_inp())
assert fsm.state == DockingState.DETECTING
def test_undock_aborts_visual_servo(self):
fsm = _make_fsm()
self._reach_servo(fsm)
fsm.tick(_inp(undock_requested=True))
assert fsm.state == DockingState.IDLE
class TestDockingStateMachineContact:
def _reach_contact(self, fsm):
fsm.tick(_inp(dock_requested=True))
fsm.tick(_inp(dock_pose=_near_pose()))
fsm.tick(_inp(dock_pose=_near_pose(), servo_aligned=True))
assert fsm.state == DockingState.CONTACT
def test_charging_enters_charging_state(self):
fsm = _make_fsm()
self._reach_contact(fsm)
fsm.tick(_inp(is_charging=True))
assert fsm.state == DockingState.CHARGING
def test_contact_timeout_retries_servo(self):
fsm = _make_fsm(contact_timeout_ticks=3)
self._reach_contact(fsm)
for _ in range(3):
fsm.tick(_inp(is_charging=False))
assert fsm.state == DockingState.VISUAL_SERVO
def test_undock_from_contact_to_undocking(self):
fsm = _make_fsm()
self._reach_contact(fsm)
fsm.tick(_inp(undock_requested=True))
assert fsm.state == DockingState.UNDOCKING
class TestDockingStateMachineCharging:
def _reach_charging(self, fsm):
fsm.tick(_inp(dock_requested=True))
fsm.tick(_inp(dock_pose=_near_pose()))
fsm.tick(_inp(dock_pose=_near_pose(), servo_aligned=True))
fsm.tick(_inp(is_charging=True))
assert fsm.state == DockingState.CHARGING
def test_full_battery_triggers_undocking(self):
fsm = _make_fsm(battery_high_pct=80.0)
self._reach_charging(fsm)
fsm.tick(_inp(is_charging=True, battery_pct=85.0))
assert fsm.state == DockingState.UNDOCKING
def test_undock_request_during_charging(self):
fsm = _make_fsm()
self._reach_charging(fsm)
fsm.tick(_inp(undock_requested=True))
assert fsm.state == DockingState.UNDOCKING
def test_charging_stays_charging_below_high_threshold(self):
fsm = _make_fsm(battery_high_pct=80.0)
self._reach_charging(fsm)
fsm.tick(_inp(is_charging=True, battery_pct=70.0))
assert fsm.state == DockingState.CHARGING
class TestDockingStateMachineUndocking:
def _reach_undocking_via_charge(self, fsm):
fsm.tick(_inp(dock_requested=True))
fsm.tick(_inp(dock_pose=_near_pose()))
fsm.tick(_inp(dock_pose=_near_pose(), servo_aligned=True))
fsm.tick(_inp(is_charging=True))
fsm.tick(_inp(is_charging=True, battery_pct=85.0)) # → UNDOCKING
assert fsm.state == DockingState.UNDOCKING
def test_undocking_backs_away(self):
fsm = _make_fsm(undock_ticks_max=5)
self._reach_undocking_via_charge(fsm)
out = fsm.tick(_inp())
assert out.cmd_linear < 0.0
def test_undocking_completes_back_to_idle(self):
fsm = _make_fsm(undock_ticks_max=5)
self._reach_undocking_via_charge(fsm)
for _ in range(5):
out = fsm.tick(_inp())
assert fsm.state == DockingState.IDLE
def test_mission_resume_after_battery_auto_dock(self):
"""mission_resume must be True when auto-dock cycle completes."""
fsm = _make_fsm(undock_ticks_max=5)
# Trigger via battery low
fsm.tick(_inp(battery_pct=10.0)) # IDLE → DETECTING
fsm.tick(_inp(dock_pose=_near_pose())) # → VISUAL_SERVO
fsm.tick(_inp(dock_pose=_near_pose(), servo_aligned=True)) # → CONTACT
fsm.tick(_inp(is_charging=True)) # → CHARGING
fsm.tick(_inp(is_charging=True, battery_pct=85.0)) # → UNDOCKING
out = None
for _ in range(5):
out = fsm.tick(_inp())
assert out is not None
assert out.mission_resume is True
assert fsm.state == DockingState.IDLE
def test_no_mission_resume_on_explicit_undock(self):
"""Explicit undock should not trigger mission_resume."""
fsm = _make_fsm(undock_ticks_max=5)
fsm.tick(_inp(dock_requested=True)) # explicit dock — no interrupt
fsm.tick(_inp(dock_pose=_near_pose()))
fsm.tick(_inp(dock_pose=_near_pose(), servo_aligned=True))
fsm.tick(_inp(is_charging=True))
fsm.tick(_inp(undock_requested=True)) # explicit undock
last_out = None
for _ in range(5):
last_out = fsm.tick(_inp())
assert last_out is not None
assert last_out.mission_resume is False
class TestDockingStateMachineReset:
def test_reset_returns_to_idle(self):
fsm = _make_fsm()
fsm.tick(_inp(dock_requested=True))
assert fsm.state == DockingState.DETECTING
fsm.reset()
assert fsm.state == DockingState.IDLE
def test_state_changed_flag_on_transition(self):
fsm = _make_fsm()
out = fsm.tick(_inp(dock_requested=True))
assert out.state_changed is True
def test_no_state_changed_when_same_state(self):
fsm = _make_fsm()
out = fsm.tick(_inp()) # stays IDLE
assert out.state_changed is False