feat(#158): docking station auto-return — ArUco/IR detection, visual servo, charge monitoring #165

Merged
sl-jetson merged 1 commits from sl-controls/issue-158-docking into main 2026-03-02 10:26:18 -05:00
18 changed files with 2030 additions and 0 deletions
Showing only changes of commit 783dedf4d4 - Show all commits

View File

@ -0,0 +1,43 @@
/**:
ros__parameters:
# Control loop rate (Hz)
control_rate: 20.0
# Topic names
image_topic: "/camera/image_raw"
camera_info_topic: "/camera/camera_info"
ir_topic: "/saltybot/ir_beacon"
battery_topic: "/saltybot/battery_state"
# ── ArUco detection ────────────────────────────────────────────────────────
aruco_marker_id: 42 # marker ID on the dock face
aruco_marker_size_m: 0.10 # printed marker side length (m)
# ── IR beacon ─────────────────────────────────────────────────────────────
ir_threshold: 0.50 # amplitude threshold for beacon detection
# ── Battery thresholds ────────────────────────────────────────────────────
battery_low_pct: 15.0 # SOC triggering auto-dock (%)
battery_high_pct: 80.0 # SOC declaring charge complete (%)
# ── Visual servo ──────────────────────────────────────────────────────────
servo_range_m: 1.00 # switch from Nav2 to IBVS at this distance (m)
k_linear: 0.30 # distance P-gain
k_angular: 0.80 # yaw P-gain
lateral_tol_m: 0.005 # ±5 mm alignment tolerance
contact_distance_m: 0.05 # distance below which contact is assumed (m)
max_linear_ms: 0.10 # forward speed ceiling during servo (m/s)
max_angular_rads: 0.30 # yaw rate ceiling (rad/s)
# ── Undocking maneuver ────────────────────────────────────────────────────
undock_speed_ms: -0.20 # reverse speed (m/s; must be negative)
undock_ticks_max: 50 # ticks at undock speed (2.5 s at 20 Hz)
# ── FSM timeouts ──────────────────────────────────────────────────────────
lost_ticks_max: 40 # ticks of lost detection before retry (2 s)
contact_timeout_ticks: 20 # ticks at CONTACT without charging before retry
# ── Nav2 waypoints [x, y, z, qx, qy, qz, qw] in map frame ───────────────
# Set these to the actual dock and pre-dock poses in your environment.
dock_pose: [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0]
pre_dock_pose: [0.0, -1.0, 0.0, 0.0, 0.0, 0.0, 1.0] # 1 m in front of dock

View File

@ -0,0 +1,53 @@
"""
docking.launch.py Launch the autonomous docking node (Issue #158).
Usage
-----
ros2 launch saltybot_docking docking.launch.py
ros2 launch saltybot_docking docking.launch.py \
battery_low_pct:=15.0 servo_range_m:=1.0
"""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
pkg_share = get_package_share_directory("saltybot_docking")
default_params = os.path.join(pkg_share, "config", "docking_params.yaml")
return LaunchDescription([
DeclareLaunchArgument(
"params_file",
default_value=default_params,
description="Path to docking_params.yaml",
),
DeclareLaunchArgument(
"battery_low_pct",
default_value="15.0",
description="SOC threshold to trigger auto-dock (%)",
),
DeclareLaunchArgument(
"servo_range_m",
default_value="1.0",
description="Switch to visual servo within this distance (m)",
),
Node(
package="saltybot_docking",
executable="docking_node",
name="docking",
output="screen",
parameters=[
LaunchConfiguration("params_file"),
{
"battery_low_pct": LaunchConfiguration("battery_low_pct"),
"servo_range_m": LaunchConfiguration("servo_range_m"),
},
],
),
])

View File

@ -0,0 +1,24 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_docking</name>
<version>0.1.0</version>
<description>Docking station auto-return with ArUco/IR detection and visual servo (Issue #158)</description>
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>sensor_msgs</depend>
<depend>geometry_msgs</depend>
<depend>std_msgs</depend>
<depend>nav2_msgs</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,137 @@
"""
charge_monitor.py Battery charge monitoring for auto-dock return (Issue #158).
Consumes BatteryState data (voltage, current, percentage) each control tick
and emits discrete ChargeEvents when meaningful thresholds are crossed.
Events
CHARGING_STARTED : charge current rises above charging_current_threshold_a
(docking pins making contact and charger active)
CHARGING_STOPPED : charge current drops back to near-zero (unplugged or
charge complete at hardware level)
THRESHOLD_LOW : SOC dropped below low_threshold_pct trigger auto-dock
THRESHOLD_HIGH : SOC rose above high_threshold_pct resume mission
Each event is emitted at most once per crossing (edge-triggered).
Pure module no ROS2 dependency.
"""
from __future__ import annotations
from enum import Enum, auto
from typing import List
# ── Events ────────────────────────────────────────────────────────────────────
class ChargeEvent(Enum):
CHARGING_STARTED = auto()
CHARGING_STOPPED = auto()
THRESHOLD_LOW = auto()
THRESHOLD_HIGH = auto()
# ── ChargeMonitor ─────────────────────────────────────────────────────────────
class ChargeMonitor:
"""
Edge-triggered monitor for battery charging state and SOC thresholds.
Parameters
----------
low_threshold_pct : SOC below which THRESHOLD_LOW fires (default 15 %)
high_threshold_pct : SOC above which THRESHOLD_HIGH fires (default 80 %)
charging_current_threshold_a : minimum current (A) to declare charging
(positive = charging convention)
"""
def __init__(
self,
low_threshold_pct: float = 15.0,
high_threshold_pct: float = 80.0,
charging_current_threshold_a: float = 0.10,
):
self._low_pct = float(low_threshold_pct)
self._high_pct = float(high_threshold_pct)
self._i_thresh = float(charging_current_threshold_a)
# Internal state (edge-detection flags)
self._is_charging: bool = False
self._was_low: bool = False # True while SOC is in low zone
self._was_high: bool = True # True while SOC is in high zone
self._percentage: float = 100.0
# ── Properties ────────────────────────────────────────────────────────────
@property
def is_charging(self) -> bool:
"""True when charge current has been detected."""
return self._is_charging
@property
def percentage(self) -> float:
"""Most recently observed state of charge (0100 %)."""
return self._percentage
@property
def is_depleted(self) -> bool:
"""True when SOC is currently below the low threshold."""
return self._percentage < self._low_pct
@property
def is_charged(self) -> bool:
"""True when SOC is currently above the high threshold."""
return self._percentage >= self._high_pct
# ── Update ────────────────────────────────────────────────────────────────
def update(
self,
percentage: float,
current_a: float,
) -> List[ChargeEvent]:
"""
Process one sample from BatteryState.
Parameters
----------
percentage : state of charge 0100 (from BatteryState.percentage × 100)
current_a : charge/discharge current (A); positive = charging
Returns
-------
List of ChargeEvents (may be empty; never duplicates per edge).
"""
events: List[ChargeEvent] = []
self._percentage = float(percentage)
now_charging = current_a >= self._i_thresh
# ── Charging edge detection ───────────────────────────────────────────
if now_charging and not self._is_charging:
events.append(ChargeEvent.CHARGING_STARTED)
elif not now_charging and self._is_charging:
events.append(ChargeEvent.CHARGING_STOPPED)
self._is_charging = now_charging
# ── Low threshold — edge on falling through ───────────────────────────
in_low_zone = self._percentage < self._low_pct
if in_low_zone and not self._was_low:
events.append(ChargeEvent.THRESHOLD_LOW)
self._was_low = in_low_zone
# ── High threshold — edge on rising through ───────────────────────────
in_high_zone = self._percentage >= self._high_pct
if in_high_zone and not self._was_high:
events.append(ChargeEvent.THRESHOLD_HIGH)
self._was_high = in_high_zone
return events
def reset(self) -> None:
"""Clear all state (use when node restarts)."""
self._is_charging = False
self._was_low = False
self._was_high = True
self._percentage = 100.0

View File

@ -0,0 +1,210 @@
"""
dock_detector.py ArUco marker + IR beacon detection for docking (Issue #158).
Detection sources
ArUco marker : cv2.aruco marker on the dock face; PnP solve gives full 3-D
pose relative to the robot camera. Marker ID and physical
size are configurable.
IR beacon : single ADC channel (normalised 01) with envelope follower
and amplitude threshold. Provides presence detection only
(no pose), used as a fallback or range-alert.
Outputs
DockPose(distance_m, yaw_rad, lateral_m, source)
distance_m straight-line distance to dock marker centre (m)
yaw_rad bearing error (rad); +ve = dock is right of camera axis (tx > 0)
lateral_m lateral offset (m) in camera frame; +ve = dock is right
source "aruco" | "ir"
ArucoDetector.detect() returns Optional[DockPose]; None if marker absent or
OpenCV / cv2.aruco are unavailable at runtime.
IRBeaconDetector.update(sample) returns bool (True = beacon detected).
Pure module no ROS2 dependency.
"""
from __future__ import annotations
import math
from dataclasses import dataclass
from typing import Optional
# ── DockPose ─────────────────────────────────────────────────────────────────
@dataclass
class DockPose:
"""Dock pose in the robot's camera frame."""
distance_m: float # straight-line distance to marker (m)
yaw_rad: float # bearing error (rad); +ve = dock is right of camera axis (tx > 0)
lateral_m: float # lateral offset (m) in camera X; +ve = dock is right
source: str # "aruco" | "ir"
# ── ArucoDetector ─────────────────────────────────────────────────────────────
class ArucoDetector:
"""
Detect a single ArUco marker and solve its 3-D pose with PnP.
Parameters
----------
marker_id : ArUco marker ID placed on the dock face (default 42)
marker_size_m : physical side length of the printed marker (m)
aruco_dict_id : cv2.aruco dictionary constant (default DICT_4X4_50 = 0)
"""
MARKER_ID_DEFAULT = 42
MARKER_SIZE_DEFAULT = 0.10 # 10 cm
def __init__(
self,
marker_id: int = MARKER_ID_DEFAULT,
marker_size_m: float = MARKER_SIZE_DEFAULT,
aruco_dict_id: int = 0, # cv2.aruco.DICT_4X4_50
):
self._marker_id = marker_id
self._marker_size = marker_size_m
self._dict_id = aruco_dict_id
self._ready: Optional[bool] = None # None = not yet attempted
# Set by _init(); None when OpenCV unavailable
self._cv2 = None
self._detector = None
self._obj_pts = None
# ── Initialisation ────────────────────────────────────────────────────────
def _init(self) -> bool:
"""Lazy-initialise cv2.aruco. Returns False if unavailable."""
if self._ready is not None:
return self._ready
try:
import cv2
import numpy as np
aruco_dict = cv2.aruco.getPredefinedDictionary(self._dict_id)
aruco_params = cv2.aruco.DetectorParameters()
self._detector = cv2.aruco.ArucoDetector(aruco_dict, aruco_params)
half = self._marker_size / 2.0
self._obj_pts = np.array([
[-half, half, 0.0],
[ half, half, 0.0],
[ half, -half, 0.0],
[-half, -half, 0.0],
], dtype=np.float64)
self._cv2 = cv2
self._ready = True
except Exception:
self._ready = False
return self._ready
# ── Public API ────────────────────────────────────────────────────────────
@property
def available(self) -> bool:
"""True if cv2.aruco is importable and initialised."""
return self._init()
def detect(
self,
image, # numpy.ndarray H×W×3 BGR (or greyscale)
camera_matrix, # numpy.ndarray 3×3
dist_coeffs, # numpy.ndarray 1×4 or 1×5
) -> Optional[DockPose]:
"""
Detect the dock marker in *image* and return its pose.
Returns None if the target marker is absent, PnP fails, or OpenCV is
not available.
"""
if not self._init():
return None
try:
corners, ids, _ = self._detector.detectMarkers(image)
except Exception:
return None
if ids is None or len(ids) == 0:
return None
import numpy as np
for i, mid in enumerate(ids.flatten()):
if int(mid) != self._marker_id:
continue
img_pts = corners[i].reshape(4, 2).astype(np.float64)
ok, rvec, tvec = self._cv2.solvePnP(
self._obj_pts, img_pts,
camera_matrix, dist_coeffs,
flags=self._cv2.SOLVEPNP_IPPE_SQUARE,
)
if not ok:
continue
tx = float(tvec[0])
ty = float(tvec[1])
tz = float(tvec[2])
distance = math.sqrt(tx * tx + ty * ty + tz * tz)
yaw = math.atan2(tx, tz) # camera-x deviation from optical axis
return DockPose(
distance_m=distance,
yaw_rad=yaw,
lateral_m=tx,
source="aruco",
)
return None
# ── IRBeaconDetector ──────────────────────────────────────────────────────────
class IRBeaconDetector:
"""
IR beacon presence detector from a single normalised ADC channel.
Maintains an exponential envelope follower. When the smoothed amplitude
exceeds *threshold* the beacon is considered detected.
Parameters
----------
threshold : amplitude above which beacon is 'detected' (default 0.5)
smoothing_alpha : EMA coefficient (0, 1]; 1 = no smoothing (default 0.3)
"""
def __init__(
self,
threshold: float = 0.5,
smoothing_alpha: float = 0.3,
):
self._threshold = max(1e-9, float(threshold))
self._alpha = max(1e-9, min(1.0, smoothing_alpha))
self._envelope = 0.0
# ── Properties ────────────────────────────────────────────────────────────
@property
def envelope(self) -> float:
"""Current EMA envelope value."""
return self._envelope
@property
def detected(self) -> bool:
"""True when smoothed amplitude ≥ threshold."""
return self._envelope >= self._threshold
# ── Update ────────────────────────────────────────────────────────────────
def update(self, sample: float) -> bool:
"""
Submit one ADC sample (normalised 01 or raw voltage).
Returns True if the beacon is currently detected.
"""
amp = abs(sample)
self._envelope = (1.0 - self._alpha) * self._envelope + self._alpha * amp
return self.detected
def reset(self) -> None:
"""Clear envelope to zero (e.g. when node restarts)."""
self._envelope = 0.0

View File

@ -0,0 +1,482 @@
"""
docking_node.py Autonomous docking station return (Issue #158).
Overview
Orchestrates dock detection, Nav2 corridor approach, visual-servo final
alignment, and charge monitoring. Interrupts the active Nav2 mission when
battery drops to 15 % and resumes when charged to 80 %.
Pipeline (20 Hz)
1. Camera callback ArucoDetector DockPose (if marker visible)
2. IR callback IRBeaconDetector presence flag (fallback)
3. Battery callback ChargeMonitor ChargeEvents
4. 20 Hz timer DockingStateMachine.tick()
publish cmd_vel (VISUAL_SERVO + UNDOCKING)
Nav2 action calls (NAV2_APPROACH)
publish DockingStatus
Subscribes
<image_topic> sensor_msgs/Image dock camera
<camera_info_topic> sensor_msgs/CameraInfo intrinsics for PnP
<ir_topic> std_msgs/Float32 IR beacon ADC sample
<battery_topic> sensor_msgs/BatteryState SOC + current
Publishes
/saltybot/docking_status saltybot_docking_msgs/DockingStatus
/saltybot/docking_cmd_vel geometry_msgs/Twist servo + undock commands
/saltybot/resume_mission std_msgs/Bool trigger mission resume
Services
/saltybot/dock saltybot_docking_msgs/Dock
/saltybot/undock saltybot_docking_msgs/Undock
Action client
/navigate_to_pose nav2_msgs/action/NavigateToPose (optional)
Parameters
control_rate 20.0 Hz
image_topic /camera/image_raw
camera_info_topic /camera/camera_info
ir_topic /saltybot/ir_beacon
battery_topic /saltybot/battery_state
aruco_marker_id 42
aruco_marker_size_m 0.10
ir_threshold 0.50
battery_low_pct 15.0
battery_high_pct 80.0
servo_range_m 1.00 m (switch to IBVS within this distance)
k_linear 0.30
k_angular 0.80
lateral_tol_m 0.005 m (±5 mm alignment tolerance)
contact_distance_m 0.05 m
max_linear_ms 0.10 m/s
max_angular_rads 0.30 rad/s
undock_speed_ms -0.20 m/s
undock_ticks_max 50 (2.5 s at 20 Hz)
lost_ticks_max 40 (2.0 s at 20 Hz)
contact_timeout_ticks 20 (1.0 s at 20 Hz)
dock_pose [0,0,0,0,0,0,1] [x,y,z,qx,qy,qz,qw] map frame
pre_dock_pose [0,0,0,0,0,0,1] 1 m in front of dock
"""
import json
import math
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy
from geometry_msgs.msg import PoseStamped, Twist
from sensor_msgs.msg import BatteryState, CameraInfo, Image
from std_msgs.msg import Bool, Float32
from saltybot_docking.charge_monitor import ChargeEvent, ChargeMonitor
from saltybot_docking.dock_detector import ArucoDetector, DockPose, IRBeaconDetector
from saltybot_docking.docking_state_machine import (
DockingInputs,
DockingState,
DockingStateMachine,
)
from saltybot_docking.visual_servo import VisualServo
try:
from saltybot_docking_msgs.msg import DockingStatus
from saltybot_docking_msgs.srv import Dock, Undock
_MSGS_OK = True
except ImportError:
_MSGS_OK = False
try:
from nav2_msgs.action import NavigateToPose
_NAV2_OK = True
except ImportError:
_NAV2_OK = False
try:
import cv2
import numpy as np
_CV2_OK = True
except ImportError:
_CV2_OK = False
# ── Helper ────────────────────────────────────────────────────────────────────
def _pose_param_to_stamped(vals: list, frame_id: str, stamp) -> PoseStamped:
"""Convert [x,y,z,qx,qy,qz,qw] list to PoseStamped."""
ps = PoseStamped()
ps.header.frame_id = frame_id
ps.header.stamp = stamp
if len(vals) >= 7:
ps.pose.position.x = float(vals[0])
ps.pose.position.y = float(vals[1])
ps.pose.position.z = float(vals[2])
ps.pose.orientation.x = float(vals[3])
ps.pose.orientation.y = float(vals[4])
ps.pose.orientation.z = float(vals[5])
ps.pose.orientation.w = float(vals[6])
return ps
# ── Node ──────────────────────────────────────────────────────────────────────
class DockingNode(Node):
def __init__(self):
super().__init__("docking")
# ── Parameters ───────────────────────────────────────────────────────
self._declare_params()
p = self._load_params()
# ── Sub-systems ──────────────────────────────────────────────────────
self._aruco = ArucoDetector(
marker_id=p["aruco_marker_id"],
marker_size_m=p["aruco_marker_size_m"],
)
self._ir = IRBeaconDetector(
threshold=p["ir_threshold"],
)
self._servo = VisualServo(
k_linear=p["k_linear"],
k_angular=p["k_angular"],
lateral_tol_m=p["lateral_tol_m"],
contact_distance_m=p["contact_distance_m"],
max_linear_ms=p["max_linear_ms"],
max_angular_rads=p["max_angular_rads"],
)
self._charge = ChargeMonitor(
low_threshold_pct=p["battery_low_pct"],
high_threshold_pct=p["battery_high_pct"],
)
self._fsm = DockingStateMachine(
battery_low_pct=p["battery_low_pct"],
battery_high_pct=p["battery_high_pct"],
servo_range_m=p["servo_range_m"],
undock_speed_ms=p["undock_speed_ms"],
undock_ticks_max=p["undock_ticks_max"],
lost_ticks_max=p["lost_ticks_max"],
contact_timeout_ticks=p["contact_timeout_ticks"],
)
# ── Mutable state ────────────────────────────────────────────────────
self._latest_dock_pose: DockPose | None = None
self._latest_servo_aligned: bool = False
self._nav2_arrived: bool = False
self._nav2_failed: bool = False
self._dock_requested: bool = False
self._undock_requested: bool = False
self._camera_matrix = None
self._dist_coeffs = None
self._nav2_goal_handle = None
# ── QoS ──────────────────────────────────────────────────────────────
reliable = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST,
depth=10,
)
best_effort = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=1,
)
# ── Subscriptions ────────────────────────────────────────────────────
self.create_subscription(Image, p["image_topic"], self._image_cb, best_effort)
self.create_subscription(CameraInfo, p["camera_info_topic"], self._cam_info_cb, reliable)
self.create_subscription(Float32, p["ir_topic"], self._ir_cb, best_effort)
self.create_subscription(BatteryState, p["battery_topic"], self._battery_cb, reliable)
# ── Publishers ───────────────────────────────────────────────────────
self._cmd_vel_pub = self.create_publisher(
Twist, "/saltybot/docking_cmd_vel", reliable
)
self._resume_pub = self.create_publisher(
Bool, "/saltybot/resume_mission", reliable
)
self._status_pub = None
if _MSGS_OK:
self._status_pub = self.create_publisher(
DockingStatus, "/saltybot/docking_status", reliable
)
# ── Services ─────────────────────────────────────────────────────────
if _MSGS_OK:
self.create_service(Dock, "/saltybot/dock", self._dock_srv)
self.create_service(Undock, "/saltybot/undock", self._undock_srv)
# ── Nav2 action client (optional) ────────────────────────────────────
self._nav2_client = None
if _NAV2_OK:
self._nav2_client = ActionClient(self, NavigateToPose, "navigate_to_pose")
# ── Timer ────────────────────────────────────────────────────────────
rate = p["control_rate"]
self._timer = self.create_timer(1.0 / rate, self._control_cb)
self.get_logger().info(
f"DockingNode ready rate={rate}Hz "
f"nav2={'yes' if _NAV2_OK else 'no'} "
f"aruco={'yes' if _CV2_OK else 'no'}"
)
# ── Parameters ────────────────────────────────────────────────────────────
def _declare_params(self) -> None:
self.declare_parameter("control_rate", 20.0)
self.declare_parameter("image_topic", "/camera/image_raw")
self.declare_parameter("camera_info_topic", "/camera/camera_info")
self.declare_parameter("ir_topic", "/saltybot/ir_beacon")
self.declare_parameter("battery_topic", "/saltybot/battery_state")
self.declare_parameter("aruco_marker_id", 42)
self.declare_parameter("aruco_marker_size_m", 0.10)
self.declare_parameter("ir_threshold", 0.50)
self.declare_parameter("battery_low_pct", 15.0)
self.declare_parameter("battery_high_pct", 80.0)
self.declare_parameter("servo_range_m", 1.00)
self.declare_parameter("k_linear", 0.30)
self.declare_parameter("k_angular", 0.80)
self.declare_parameter("lateral_tol_m", 0.005)
self.declare_parameter("contact_distance_m", 0.05)
self.declare_parameter("max_linear_ms", 0.10)
self.declare_parameter("max_angular_rads", 0.30)
self.declare_parameter("undock_speed_ms", -0.20)
self.declare_parameter("undock_ticks_max", 50)
self.declare_parameter("lost_ticks_max", 40)
self.declare_parameter("contact_timeout_ticks", 20)
self.declare_parameter("dock_pose", [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0])
self.declare_parameter("pre_dock_pose", [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0])
def _load_params(self) -> dict:
g = self.get_parameter
keys = [
"control_rate",
"image_topic", "camera_info_topic", "ir_topic", "battery_topic",
"aruco_marker_id", "aruco_marker_size_m", "ir_threshold",
"battery_low_pct", "battery_high_pct",
"servo_range_m", "k_linear", "k_angular",
"lateral_tol_m", "contact_distance_m", "max_linear_ms", "max_angular_rads",
"undock_speed_ms", "undock_ticks_max", "lost_ticks_max", "contact_timeout_ticks",
"dock_pose", "pre_dock_pose",
]
return {k: g(k).value for k in keys}
# ── Subscriptions ─────────────────────────────────────────────────────────
def _cam_info_cb(self, msg: CameraInfo) -> None:
if _CV2_OK and self._camera_matrix is None:
self._camera_matrix = np.array(msg.k, dtype=np.float64).reshape(3, 3)
self._dist_coeffs = np.array(msg.d, dtype=np.float64)
def _image_cb(self, msg: Image) -> None:
if not _CV2_OK or self._camera_matrix is None:
return
try:
img = self._ros_image_to_bgr(msg)
except Exception:
return
pose = self._aruco.detect(img, self._camera_matrix, self._dist_coeffs)
if pose is not None:
self._latest_dock_pose = pose
# Update visual servo for aligned flag
servo_result = self._servo.update(pose)
self._latest_servo_aligned = servo_result.aligned
def _ir_cb(self, msg: Float32) -> None:
detected = self._ir.update(msg.data)
# IR gives presence only — supply a synthetic far DockPose if no ArUco
if detected and self._latest_dock_pose is None:
# Beacon says dock is ahead, distance unknown; use sentinel 5.0 m
self._latest_dock_pose = DockPose(
distance_m=5.0, yaw_rad=0.0, lateral_m=0.0, source="ir"
)
def _battery_cb(self, msg: BatteryState) -> None:
pct = float(msg.percentage) * 100.0 if msg.percentage <= 1.0 else float(msg.percentage)
current = float(msg.current)
events = self._charge.update(pct, current)
for ev in events:
if ev == ChargeEvent.THRESHOLD_LOW:
self.get_logger().warn(
f"Battery low ({pct:.1f}%) — initiating auto-dock"
)
elif ev == ChargeEvent.CHARGING_STARTED:
self.get_logger().info("Charging started")
elif ev == ChargeEvent.THRESHOLD_HIGH:
self.get_logger().info(
f"Battery charged ({pct:.1f}%) — ready to undock"
)
# ── 20 Hz control loop ────────────────────────────────────────────────────
def _control_cb(self) -> None:
inp = DockingInputs(
battery_pct=self._charge.percentage,
is_charging=self._charge.is_charging,
dock_pose=self._latest_dock_pose,
servo_aligned=self._latest_servo_aligned,
dock_requested=self._dock_requested,
undock_requested=self._undock_requested,
nav2_arrived=self._nav2_arrived,
nav2_failed=self._nav2_failed,
)
# Consume one-shot flags
self._dock_requested = False
self._undock_requested = False
self._nav2_arrived = False
self._nav2_failed = False
# Clear dock pose so stale detections don't persist past one tick
self._latest_dock_pose = None
self._latest_servo_aligned = False
out = self._fsm.tick(inp)
# ── State-entry side effects ──────────────────────────────────────────
if out.state_changed:
self.get_logger().info(f"Docking FSM → {out.state.value}")
if out.request_nav2:
self._send_nav2_goal()
if out.cancel_nav2:
self._cancel_nav2_goal()
if out.mission_interrupted:
self.get_logger().warn("Mission interrupted for auto-dock (battery low)")
if out.mission_resume:
msg = Bool()
msg.data = True
self._resume_pub.publish(msg)
self.get_logger().info("Auto-dock complete — resuming mission")
# ── Velocity commands ─────────────────────────────────────────────────
if out.state in (DockingState.VISUAL_SERVO, DockingState.UNDOCKING):
twist = Twist()
twist.linear.x = out.cmd_linear
twist.angular.z = out.cmd_angular
self._cmd_vel_pub.publish(twist)
# ── Status ────────────────────────────────────────────────────────────
if self._status_pub is not None:
self._publish_status(inp, out)
# ── Nav2 integration ──────────────────────────────────────────────────────
def _send_nav2_goal(self) -> None:
if not _NAV2_OK or self._nav2_client is None:
self.get_logger().warn("Nav2 unavailable — skipping pre-dock approach")
self._nav2_arrived = True # skip straight to VISUAL_SERVO next tick
return
if not self._nav2_client.wait_for_server(timeout_sec=1.0):
self.get_logger().warn("Nav2 action server not ready — skipping approach")
self._nav2_arrived = True
return
p = self._load_params()
goal = NavigateToPose.Goal()
goal.pose = _pose_param_to_stamped(
p["pre_dock_pose"], "map", self.get_clock().now().to_msg()
)
send_future = self._nav2_client.send_goal_async(goal)
send_future.add_done_callback(self._nav2_goal_response_cb)
def _nav2_goal_response_cb(self, future) -> None:
goal_handle = future.result()
if not goal_handle or not goal_handle.accepted:
self.get_logger().warn("Nav2 goal rejected")
self._nav2_failed = True
return
self._nav2_goal_handle = goal_handle
result_future = goal_handle.get_result_async()
result_future.add_done_callback(self._nav2_result_cb)
def _nav2_result_cb(self, future) -> None:
result = future.result()
if result and result.status == 4: # SUCCEEDED
self._nav2_arrived = True
else:
self._nav2_failed = True
def _cancel_nav2_goal(self) -> None:
if self._nav2_goal_handle is not None:
self._nav2_goal_handle.cancel_goal_async()
self._nav2_goal_handle = None
# ── Services ──────────────────────────────────────────────────────────────
def _dock_srv(self, request, response):
if self._fsm.state not in (DockingState.IDLE,) and not request.force:
response.success = False
response.message = f"Already in state {self._fsm.state.value}; use force=True to override"
return response
self._dock_requested = True
response.success = True
response.message = "Docking initiated"
return response
def _undock_srv(self, request, response):
if self._fsm.state == DockingState.IDLE:
response.success = False
response.message = "Already idle"
return response
self._undock_requested = True
response.success = True
response.message = "Undocking initiated"
return response
# ── Publishing ────────────────────────────────────────────────────────────
def _publish_status(self, inp: DockingInputs, out) -> None:
msg = DockingStatus()
msg.stamp = self.get_clock().now().to_msg()
msg.state = out.state.value
msg.dock_detected = inp.dock_pose is not None
msg.distance_m = float(inp.dock_pose.distance_m) if inp.dock_pose else float("nan")
msg.lateral_error_m = float(inp.dock_pose.lateral_m) if inp.dock_pose else float("nan")
msg.battery_pct = float(self._charge.percentage)
msg.charging = self._charge.is_charging
msg.aligned = inp.servo_aligned
self._status_pub.publish(msg)
# ── Utilities ─────────────────────────────────────────────────────────────
@staticmethod
def _ros_image_to_bgr(msg: Image):
"""Convert sensor_msgs/Image to cv2 BGR numpy array."""
import numpy as np
data = np.frombuffer(msg.data, dtype=np.uint8)
if msg.encoding in ("bgr8", "rgb8"):
img = data.reshape(msg.height, msg.width, 3)
if msg.encoding == "rgb8":
img = img[:, :, ::-1].copy()
elif msg.encoding == "mono8":
img = data.reshape(msg.height, msg.width)
else:
img = data.reshape(msg.height, msg.width, -1)
return img
# ── Entry point ───────────────────────────────────────────────────────────────
def main(args=None):
rclpy.init(args=args)
node = DockingNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.try_shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,257 @@
"""
docking_state_machine.py FSM for autonomous docking / undocking (Issue #158).
States
IDLE : normal operation; watching battery.
DETECTING : scanning for dock marker or IR beacon.
NAV2_APPROACH : Nav2 navigating to pre-dock waypoint (~1 m from dock face).
VISUAL_SERVO : IBVS final approach (<= servo_range_m to dock).
CONTACT : dock pins touched; waiting for charge current.
CHARGING : charging in progress.
UNDOCKING : backing away from dock at fixed reverse speed.
Transitions
IDLE DETECTING : battery_pct < battery_low_pct OR dock_requested
DETECTING NAV2_APPROACH : dock_pose found AND distance > servo_range_m
DETECTING VISUAL_SERVO : dock_pose found AND distance servo_range_m
NAV2_APPROACH VISUAL_SERVO : dock_pose found AND distance servo_range_m
OR nav2_arrived
NAV2_APPROACH DETECTING : nav2_failed (retry detection)
VISUAL_SERVO CONTACT : servo_aligned
VISUAL_SERVO DETECTING : dock lost for > lost_ticks_max ticks
CONTACT CHARGING : is_charging
CONTACT VISUAL_SERVO : contact_timeout_ticks exceeded (retry)
CHARGING UNDOCKING : battery_pct >= battery_high_pct
OR undock_requested
UNDOCKING IDLE : undock_ticks >= undock_ticks_max
Any state IDLE : undock_requested (except CHARGING UNDOCKING)
DETECTING IDLE : undock_requested (abort scan)
Output signals
cmd_linear, cmd_angular : velocity during VISUAL_SERVO and UNDOCKING
request_nav2 : send Nav2 goal (emitted once on ENTRY to NAV2_APPROACH)
cancel_nav2 : cancel Nav2 goal
mission_interrupted : battery forced dock (for mission planner)
mission_resume : auto-resume after charge cycle completes
state_changed : FSM state changed this tick
Pure module no ROS2 dependency.
"""
from __future__ import annotations
import math
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
from saltybot_docking.dock_detector import DockPose
# ── States ────────────────────────────────────────────────────────────────────
class DockingState(Enum):
IDLE = "IDLE"
DETECTING = "DETECTING"
NAV2_APPROACH = "NAV2_APPROACH"
VISUAL_SERVO = "VISUAL_SERVO"
CONTACT = "CONTACT"
CHARGING = "CHARGING"
UNDOCKING = "UNDOCKING"
# ── I/O dataclasses ───────────────────────────────────────────────────────────
@dataclass
class DockingInputs:
"""Sensor snapshot fed to the FSM each tick."""
battery_pct: float = 100.0
is_charging: bool = False
dock_pose: Optional[DockPose] = None # None = dock not visible
servo_aligned: bool = False # visual servo declared aligned
dock_requested: bool = False # explicit /saltybot/dock call
undock_requested: bool = False # explicit /saltybot/undock call
nav2_arrived: bool = False # Nav2 action succeeded
nav2_failed: bool = False # Nav2 action failed/cancelled
@dataclass
class DockingOutputs:
"""Commands and signals produced by the FSM each tick."""
state: DockingState = DockingState.IDLE
cmd_linear: float = 0.0 # m/s (VISUAL_SERVO + UNDOCKING)
cmd_angular: float = 0.0 # rad/s (VISUAL_SERVO)
request_nav2: bool = False # send Nav2 goal
cancel_nav2: bool = False # cancel Nav2 goal
mission_interrupted: bool = False # battery dock forced
mission_resume: bool = False # charge cycle complete, resume
state_changed: bool = False # FSM state changed this tick
# ── DockingStateMachine ───────────────────────────────────────────────────────
class DockingStateMachine:
"""
Deterministic docking FSM.
Parameters
----------
battery_low_pct : SOC that triggers auto-dock (default 15 %)
battery_high_pct : SOC at which charging completes (default 80 %)
servo_range_m : switch from Nav2 to visual servo (default 1.0 m)
undock_speed_ms : reverse speed during undocking (m/s, must be 0)
undock_ticks_max : ticks at undock_speed before returning to IDLE
lost_ticks_max : allowed VISUAL_SERVO ticks with no detection
contact_timeout_ticks : CONTACT ticks before retry alignment
"""
def __init__(
self,
battery_low_pct: float = 15.0,
battery_high_pct: float = 80.0,
servo_range_m: float = 1.0,
undock_speed_ms: float = -0.20,
undock_ticks_max: int = 50, # 2.5 s at 20 Hz
lost_ticks_max: int = 40, # 2.0 s at 20 Hz
contact_timeout_ticks: int = 20, # 1.0 s at 20 Hz
):
self._bat_low = float(battery_low_pct)
self._bat_high = float(battery_high_pct)
self._servo_range = float(servo_range_m)
self._undock_speed = min(0.0, float(undock_speed_ms))
self._undock_max = max(1, int(undock_ticks_max))
self._lost_max = max(1, int(lost_ticks_max))
self._contact_max = max(1, int(contact_timeout_ticks))
self._state = DockingState.IDLE
self._undock_ticks = 0
self._lost_ticks = 0
self._contact_ticks = 0
self._mission_interrupted = False
self._resume_on_undock = False
# ── Public API ────────────────────────────────────────────────────────────
@property
def state(self) -> DockingState:
return self._state
def reset(self) -> None:
"""Return to IDLE and clear all counters (use for unit tests)."""
self._state = DockingState.IDLE
self._undock_ticks = 0
self._lost_ticks = 0
self._contact_ticks = 0
self._mission_interrupted = False
self._resume_on_undock = False
def tick(self, inputs: DockingInputs) -> DockingOutputs:
"""Advance the FSM by one tick and return output commands."""
prev_state = self._state
out = self._step(inputs)
out.state = self._state
out.state_changed = (self._state != prev_state)
if out.state_changed:
self._on_enter(self._state)
return out
# ── Internal step ─────────────────────────────────────────────────────────
def _step(self, inp: DockingInputs) -> DockingOutputs:
out = DockingOutputs(state=self._state)
# ── IDLE ──────────────────────────────────────────────────────────────
if self._state == DockingState.IDLE:
if inp.dock_requested:
self._state = DockingState.DETECTING
elif inp.battery_pct < self._bat_low:
self._state = DockingState.DETECTING
out.mission_interrupted = True
self._mission_interrupted = True
# ── DETECTING ─────────────────────────────────────────────────────────
elif self._state == DockingState.DETECTING:
if inp.undock_requested:
self._state = DockingState.IDLE
self._mission_interrupted = False
elif inp.dock_pose is not None:
if inp.dock_pose.distance_m <= self._servo_range:
self._state = DockingState.VISUAL_SERVO
else:
self._state = DockingState.NAV2_APPROACH
out.request_nav2 = True
# ── NAV2_APPROACH ─────────────────────────────────────────────────────
elif self._state == DockingState.NAV2_APPROACH:
if inp.undock_requested:
self._state = DockingState.IDLE
out.cancel_nav2 = True
self._mission_interrupted = False
elif inp.dock_pose is not None and inp.dock_pose.distance_m <= self._servo_range:
self._state = DockingState.VISUAL_SERVO
out.cancel_nav2 = True
elif inp.nav2_arrived:
self._state = DockingState.VISUAL_SERVO
elif inp.nav2_failed:
self._state = DockingState.DETECTING # retry scan
# ── VISUAL_SERVO ──────────────────────────────────────────────────────
elif self._state == DockingState.VISUAL_SERVO:
if inp.undock_requested:
self._state = DockingState.IDLE
elif inp.dock_pose is None:
self._lost_ticks += 1
if self._lost_ticks >= self._lost_max:
self._state = DockingState.DETECTING
else:
self._lost_ticks = 0
if inp.servo_aligned:
self._state = DockingState.CONTACT
# ── CONTACT ───────────────────────────────────────────────────────────
elif self._state == DockingState.CONTACT:
if inp.undock_requested:
self._state = DockingState.UNDOCKING
self._resume_on_undock = False
elif inp.is_charging:
self._state = DockingState.CHARGING
else:
self._contact_ticks += 1
if self._contact_ticks >= self._contact_max:
self._state = DockingState.VISUAL_SERVO # realign
# ── CHARGING ──────────────────────────────────────────────────────────
elif self._state == DockingState.CHARGING:
if inp.undock_requested:
self._state = DockingState.UNDOCKING
self._resume_on_undock = False
elif inp.battery_pct >= self._bat_high:
self._state = DockingState.UNDOCKING
self._resume_on_undock = self._mission_interrupted
# ── UNDOCKING ─────────────────────────────────────────────────────────
elif self._state == DockingState.UNDOCKING:
out.cmd_linear = self._undock_speed
self._undock_ticks += 1
if self._undock_ticks >= self._undock_max:
out.mission_resume = self._resume_on_undock
self._mission_interrupted = False
self._resume_on_undock = False
self._state = DockingState.IDLE
return out
# ── Entry actions ─────────────────────────────────────────────────────────
def _on_enter(self, state: DockingState) -> None:
"""Reset per-state counters when entering a new state."""
if state == DockingState.VISUAL_SERVO:
self._lost_ticks = 0
elif state == DockingState.CONTACT:
self._contact_ticks = 0
elif state == DockingState.UNDOCKING:
self._undock_ticks = 0

View File

@ -0,0 +1,130 @@
"""
visual_servo.py Image-based visual servo for final dock approach (Issue #158).
Control law
Operates once the robot is within *servo_range_m* of the dock.
Linear velocity (forward):
v = clip( k_linear × (distance_m target_distance_m), 0, max_linear_ms )
Angular velocity (yaw correction):
ω = clip( k_angular × yaw_rad, max_angular_rads, +max_angular_rads )
The linear term drives the robot toward the dock until it reaches
target_distance_m. The clip to [0, ] prevents reversing.
The angular term zeroes the bearing error; sign convention:
yaw_rad > 0 dock is to the robot's left → turn left (ω > 0).
Alignment declaration
`aligned` is True when both:
|lateral_m| < lateral_tol_m (default ±5 mm)
distance_m < contact_distance_m (default 50 mm)
Pure module no ROS2 dependency.
"""
from __future__ import annotations
import math
from dataclasses import dataclass
from typing import Optional
from saltybot_docking.dock_detector import DockPose
# ── Result ────────────────────────────────────────────────────────────────────
@dataclass
class VisualServoResult:
linear_x: float # forward velocity (m/s); ≥ 0
angular_z: float # yaw velocity (rad/s)
distance_m: float # current distance to dock (m)
lateral_error_m: float # current lateral offset (m); +ve = dock is right
aligned: bool # True when within contact tolerances
# ── VisualServo ───────────────────────────────────────────────────────────────
class VisualServo:
"""
Proportional visual servo controller for dock final approach.
Parameters
----------
k_linear : distance gain (m/s per metre of error)
k_angular : yaw gain (rad/s per radian of bearing error)
target_distance_m : desired stop distance from dock face (m)
lateral_tol_m : lateral alignment tolerance (m) [spec ±5 mm 0.005]
contact_distance_m: distance below which contact is assumed (m)
max_linear_ms : velocity ceiling (m/s)
max_angular_rads : yaw-rate ceiling (rad/s)
"""
def __init__(
self,
k_linear: float = 0.30,
k_angular: float = 0.80,
target_distance_m: float = 0.02, # 20 mm — at the dock face
lateral_tol_m: float = 0.005, # ±5 mm
contact_distance_m: float = 0.05, # 50 mm — within contact zone
max_linear_ms: float = 0.10, # slow for precision approach
max_angular_rads: float = 0.30,
):
self._k_lin = float(k_linear)
self._k_ang = float(k_angular)
self._target = float(target_distance_m)
self._lat_tol = max(1e-6, float(lateral_tol_m))
self._contact = max(self._target, float(contact_distance_m))
self._v_max = max(0.0, float(max_linear_ms))
self._w_max = max(0.0, float(max_angular_rads))
# ── Control step ─────────────────────────────────────────────────────────
def update(self, dock_pose: DockPose) -> VisualServoResult:
"""
Compute one velocity command from the current dock pose.
Parameters
----------
dock_pose : DockPose from ArucoDetector or IRBeaconDetector
Returns
-------
VisualServoResult with cmd velocities and alignment flag
"""
dist = dock_pose.distance_m
yaw = dock_pose.yaw_rad
lat = dock_pose.lateral_m
# Linear: approach until target_distance; no reversing
v_raw = self._k_lin * (dist - self._target)
v = max(0.0, min(self._v_max, v_raw))
# Angular: null yaw error; positive yaw → turn left (+ω)
w_raw = -self._k_ang * yaw
w = max(-self._w_max, min(self._w_max, w_raw))
aligned = (abs(lat) < self._lat_tol) and (dist < self._contact)
return VisualServoResult(
linear_x=v,
angular_z=w,
distance_m=dist,
lateral_error_m=lat,
aligned=aligned,
)
def stop(self) -> VisualServoResult:
"""
Zero-velocity command used when dock pose is momentarily lost.
"""
return VisualServoResult(
linear_x=0.0,
angular_z=0.0,
distance_m=float("nan"),
lateral_error_m=float("nan"),
aligned=False,
)

View File

@ -0,0 +1,5 @@
[develop]
script_dir=$base/lib/saltybot_docking
[install]
install_scripts=$base/lib/saltybot_docking

View File

@ -0,0 +1,32 @@
from setuptools import setup, find_packages
import os
from glob import glob
package_name = "saltybot_docking"
setup(
name=package_name,
version="0.1.0",
packages=find_packages(exclude=["test"]),
data_files=[
("share/ament_index/resource_index/packages",
[f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(os.path.join("share", package_name, "config"),
glob("config/*.yaml")),
(os.path.join("share", package_name, "launch"),
glob("launch/*.py")),
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="sl-controls",
maintainer_email="sl-controls@saltylab.local",
description="Docking station auto-return with ArUco/IR detection and visual servo",
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": [
f"docking_node = {package_name}.docking_node:main",
],
},
)

View File

@ -0,0 +1,580 @@
"""
test_docking.py Unit tests for Issue #158 docking modules.
Covers:
IRBeaconDetector envelope follower, threshold, reset
VisualServo velocity commands, clamping, alignment declaration
ChargeMonitor event detection, edge-triggering, threshold crossing
DockingStateMachine all state transitions and guard conditions
"""
import math
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import pytest
from saltybot_docking.dock_detector import DockPose, IRBeaconDetector
from saltybot_docking.visual_servo import VisualServo, VisualServoResult
from saltybot_docking.charge_monitor import ChargeEvent, ChargeMonitor
from saltybot_docking.docking_state_machine import (
DockingInputs,
DockingState,
DockingStateMachine,
)
# ── Helpers ───────────────────────────────────────────────────────────────────
def _make_ir(**kw):
defaults = dict(threshold=0.5, smoothing_alpha=1.0)
defaults.update(kw)
return IRBeaconDetector(**defaults)
def _make_servo(**kw):
defaults = dict(
k_linear=1.0,
k_angular=1.0,
target_distance_m=0.02,
lateral_tol_m=0.005,
contact_distance_m=0.05,
max_linear_ms=0.5,
max_angular_rads=1.0,
)
defaults.update(kw)
return VisualServo(**defaults)
def _make_charge(**kw):
defaults = dict(
low_threshold_pct=15.0,
high_threshold_pct=80.0,
charging_current_threshold_a=0.10,
)
defaults.update(kw)
return ChargeMonitor(**defaults)
def _make_fsm(**kw):
defaults = dict(
battery_low_pct=15.0,
battery_high_pct=80.0,
servo_range_m=1.0,
undock_speed_ms=-0.20,
undock_ticks_max=5,
lost_ticks_max=3,
contact_timeout_ticks=3,
)
defaults.update(kw)
return DockingStateMachine(**defaults)
def _inp(**kw):
return DockingInputs(**kw)
def _near_pose(dist=0.5):
"""Dock pose within servo range (default 0.5 m)."""
return DockPose(distance_m=dist, yaw_rad=0.0, lateral_m=0.0, source="aruco")
def _far_pose(dist=3.0):
"""Dock pose outside servo range."""
return DockPose(distance_m=dist, yaw_rad=0.0, lateral_m=0.0, source="aruco")
# ══════════════════════════════════════════════════════════════════════════════
# IRBeaconDetector
# ══════════════════════════════════════════════════════════════════════════════
class TestIRBeaconDetector:
def test_zero_signal_not_detected(self):
ir = _make_ir()
assert ir.update(0.0) is False
assert ir.detected is False
def test_above_threshold_detected(self):
ir = _make_ir(threshold=0.5, smoothing_alpha=1.0)
assert ir.update(0.6) is True
assert ir.detected is True
def test_below_threshold_not_detected(self):
ir = _make_ir(threshold=0.5, smoothing_alpha=1.0)
assert ir.update(0.4) is False
def test_exactly_at_threshold_detected(self):
ir = _make_ir(threshold=0.5, smoothing_alpha=1.0)
assert ir.update(0.5) is True
def test_envelope_smoothing_alpha_one(self):
"""With alpha=1, envelope equals |sample| immediately."""
ir = _make_ir(smoothing_alpha=1.0)
ir.update(0.7)
assert ir.envelope == pytest.approx(0.7, abs=1e-9)
def test_envelope_smoothing_below_one(self):
"""With alpha=0.5, envelope is a running average."""
ir = _make_ir(threshold=999.0, smoothing_alpha=0.5)
ir.update(1.0) # envelope = 0.5
assert ir.envelope == pytest.approx(0.5, abs=1e-6)
ir.update(1.0) # envelope = 0.75
assert ir.envelope == pytest.approx(0.75, abs=1e-6)
def test_reset_clears_envelope(self):
ir = _make_ir(smoothing_alpha=1.0)
ir.update(0.8)
ir.reset()
assert ir.envelope == pytest.approx(0.0, abs=1e-9)
assert ir.detected is False
def test_negative_sample_uses_abs(self):
"""update() should use absolute value of sample."""
ir = _make_ir(threshold=0.5, smoothing_alpha=1.0)
assert ir.update(-0.7) is True
# ══════════════════════════════════════════════════════════════════════════════
# VisualServo
# ══════════════════════════════════════════════════════════════════════════════
class TestVisualServo:
def test_far_dock_gives_forward_velocity(self):
servo = _make_servo(k_linear=1.0, target_distance_m=0.02)
pose = DockPose(distance_m=0.5, yaw_rad=0.0, lateral_m=0.0, source="aruco")
result = servo.update(pose)
assert result.linear_x > 0.0
def test_at_target_distance_zero_linear(self):
servo = _make_servo(k_linear=1.0, target_distance_m=0.02)
pose = DockPose(distance_m=0.02, yaw_rad=0.0, lateral_m=0.0, source="aruco")
result = servo.update(pose)
assert result.linear_x == pytest.approx(0.0, abs=1e-9)
def test_behind_target_no_reverse(self):
"""When distance < target, linear velocity must be 0 (no reversing)."""
servo = _make_servo(k_linear=1.0, target_distance_m=0.10)
pose = DockPose(distance_m=0.01, yaw_rad=0.0, lateral_m=0.0, source="aruco")
result = servo.update(pose)
assert result.linear_x == pytest.approx(0.0, abs=1e-9)
def test_yaw_right_gives_negative_angular(self):
"""yaw_rad = atan2(tx, tz); tx > 0 → dock is right in camera → turn right (ω < 0)."""
servo = _make_servo(k_angular=1.0)
pose = DockPose(distance_m=0.5, yaw_rad=0.3, lateral_m=0.0, source="aruco")
result = servo.update(pose)
assert result.angular_z < 0.0
def test_yaw_left_gives_positive_angular(self):
"""tx < 0 → dock is left in camera → turn left (ω > 0)."""
servo = _make_servo(k_angular=1.0)
pose = DockPose(distance_m=0.5, yaw_rad=-0.3, lateral_m=0.0, source="aruco")
result = servo.update(pose)
assert result.angular_z > 0.0
def test_zero_yaw_zero_angular(self):
servo = _make_servo()
pose = DockPose(distance_m=0.5, yaw_rad=0.0, lateral_m=0.0, source="aruco")
result = servo.update(pose)
assert result.angular_z == pytest.approx(0.0, abs=1e-9)
def test_max_linear_clamped(self):
servo = _make_servo(k_linear=100.0, max_linear_ms=0.1)
pose = DockPose(distance_m=5.0, yaw_rad=0.0, lateral_m=0.0, source="aruco")
result = servo.update(pose)
assert result.linear_x <= 0.1 + 1e-9
def test_max_angular_clamped(self):
servo = _make_servo(k_angular=100.0, max_angular_rads=0.3)
pose = DockPose(distance_m=0.5, yaw_rad=5.0, lateral_m=0.0, source="aruco")
result = servo.update(pose)
assert abs(result.angular_z) <= 0.3 + 1e-9
def test_aligned_when_within_tolerances(self):
servo = _make_servo(lateral_tol_m=0.005, contact_distance_m=0.05)
pose = DockPose(distance_m=0.03, yaw_rad=0.0, lateral_m=0.002, source="aruco")
result = servo.update(pose)
assert result.aligned is True
def test_not_aligned_lateral_too_large(self):
servo = _make_servo(lateral_tol_m=0.005, contact_distance_m=0.05)
pose = DockPose(distance_m=0.03, yaw_rad=0.0, lateral_m=0.010, source="aruco")
result = servo.update(pose)
assert result.aligned is False
def test_not_aligned_too_far(self):
servo = _make_servo(lateral_tol_m=0.005, contact_distance_m=0.05)
pose = DockPose(distance_m=0.10, yaw_rad=0.0, lateral_m=0.001, source="aruco")
result = servo.update(pose)
assert result.aligned is False
def test_stop_returns_zero_velocity(self):
servo = _make_servo()
result = servo.stop()
assert result.linear_x == pytest.approx(0.0, abs=1e-9)
assert result.angular_z == pytest.approx(0.0, abs=1e-9)
assert result.aligned is False
def test_proportional_linear(self):
"""Linear velocity scales with (distance target)."""
servo = _make_servo(k_linear=2.0, target_distance_m=0.0, max_linear_ms=999.0)
pose = DockPose(distance_m=0.3, yaw_rad=0.0, lateral_m=0.0, source="aruco")
result = servo.update(pose)
assert result.linear_x == pytest.approx(2.0 * 0.3, rel=1e-6)
# ══════════════════════════════════════════════════════════════════════════════
# ChargeMonitor
# ══════════════════════════════════════════════════════════════════════════════
class TestChargeMonitor:
def test_initially_not_charging(self):
cm = _make_charge()
assert cm.is_charging is False
def test_charging_started_event(self):
cm = _make_charge(charging_current_threshold_a=0.1)
events = cm.update(50.0, 0.5)
assert ChargeEvent.CHARGING_STARTED in events
def test_charging_stopped_event(self):
cm = _make_charge(charging_current_threshold_a=0.1)
cm.update(50.0, 0.5) # start charging
events = cm.update(50.0, 0.0) # stop
assert ChargeEvent.CHARGING_STOPPED in events
def test_no_duplicate_charging_started(self):
"""CHARGING_STARTED fires only once per rising edge."""
cm = _make_charge(charging_current_threshold_a=0.1)
cm.update(50.0, 0.5) # fires CHARGING_STARTED
events = cm.update(50.0, 0.5) # still charging — no duplicate
assert ChargeEvent.CHARGING_STARTED not in events
def test_low_threshold_event_falling_edge(self):
"""THRESHOLD_LOW fires when SOC first drops below low_threshold_pct."""
cm = _make_charge(low_threshold_pct=20.0)
events = cm.update(19.0, 0.0)
assert ChargeEvent.THRESHOLD_LOW in events
def test_no_duplicate_low_threshold(self):
cm = _make_charge(low_threshold_pct=20.0)
cm.update(19.0, 0.0) # fires once
events = cm.update(18.0, 0.0) # still below — no repeat
assert ChargeEvent.THRESHOLD_LOW not in events
def test_high_threshold_event_rising_edge(self):
"""THRESHOLD_HIGH fires when SOC rises above high_threshold_pct."""
cm = _make_charge(low_threshold_pct=15.0, high_threshold_pct=80.0)
# Start below high threshold
cm.update(50.0, 0.5) # sets was_high = False (50 < 80)
events = cm.update(81.0, 0.5) # rises above 80
assert ChargeEvent.THRESHOLD_HIGH in events
def test_no_high_event_if_always_above(self):
"""THRESHOLD_HIGH should not fire if SOC starts above threshold."""
cm = _make_charge(high_threshold_pct=80.0)
# Default _was_high=True; remains above → no edge
events = cm.update(90.0, 0.0)
assert ChargeEvent.THRESHOLD_HIGH not in events
def test_is_depleted_property(self):
cm = _make_charge(low_threshold_pct=20.0)
cm.update(15.0, 0.0)
assert cm.is_depleted is True
def test_is_charged_property(self):
cm = _make_charge(high_threshold_pct=80.0)
cm.update(85.0, 0.0)
assert cm.is_charged is True
def test_percentage_tracking(self):
cm = _make_charge()
cm.update(42.5, 0.0)
assert cm.percentage == pytest.approx(42.5, abs=1e-6)
def test_reset_clears_state(self):
cm = _make_charge(charging_current_threshold_a=0.1)
cm.update(10.0, 0.5) # is_charging=True, is_depleted=True
cm.reset()
assert cm.is_charging is False
assert cm.is_depleted is False
assert cm.percentage == pytest.approx(100.0, abs=1e-6)
# ══════════════════════════════════════════════════════════════════════════════
# DockingStateMachine
# ══════════════════════════════════════════════════════════════════════════════
class TestDockingStateMachineBasic:
def test_initial_state_is_idle(self):
fsm = _make_fsm()
assert fsm.state == DockingState.IDLE
def test_idle_stays_idle_on_good_battery(self):
fsm = _make_fsm()
out = fsm.tick(_inp(battery_pct=80.0))
assert fsm.state == DockingState.IDLE
assert out.state_changed is False
def test_battery_low_triggers_detecting(self):
fsm = _make_fsm()
out = fsm.tick(_inp(battery_pct=10.0))
assert fsm.state == DockingState.DETECTING
assert out.mission_interrupted is True
assert out.state_changed is True
def test_dock_request_triggers_detecting(self):
fsm = _make_fsm()
out = fsm.tick(_inp(battery_pct=80.0, dock_requested=True))
assert fsm.state == DockingState.DETECTING
assert out.mission_interrupted is False
def test_dock_request_not_duplicated_mission_interrupted(self):
"""Explicit dock request should not set mission_interrupted."""
fsm = _make_fsm()
out = fsm.tick(_inp(battery_pct=80.0, dock_requested=True))
assert out.mission_interrupted is False
class TestDockingStateMachineDetecting:
def test_no_dock_stays_detecting(self):
fsm = _make_fsm()
fsm.tick(_inp(dock_requested=True))
out = fsm.tick(_inp()) # no dock_pose
assert fsm.state == DockingState.DETECTING
def test_far_dock_goes_to_nav2(self):
fsm = _make_fsm(servo_range_m=1.0)
fsm.tick(_inp(dock_requested=True))
out = fsm.tick(_inp(dock_pose=_far_pose()))
assert fsm.state == DockingState.NAV2_APPROACH
assert out.request_nav2 is True
def test_near_dock_skips_nav2_to_servo(self):
fsm = _make_fsm(servo_range_m=1.0)
fsm.tick(_inp(dock_requested=True))
out = fsm.tick(_inp(dock_pose=_near_pose()))
assert fsm.state == DockingState.VISUAL_SERVO
def test_undock_request_aborts_detecting(self):
fsm = _make_fsm()
fsm.tick(_inp(dock_requested=True))
fsm.tick(_inp(undock_requested=True))
assert fsm.state == DockingState.IDLE
class TestDockingStateMachineNav2Approach:
def _reach_nav2(self, fsm):
fsm.tick(_inp(dock_requested=True))
fsm.tick(_inp(dock_pose=_far_pose()))
assert fsm.state == DockingState.NAV2_APPROACH
def test_nav2_arrived_enters_visual_servo(self):
fsm = _make_fsm()
self._reach_nav2(fsm)
fsm.tick(_inp(nav2_arrived=True))
assert fsm.state == DockingState.VISUAL_SERVO
def test_dock_close_during_nav2_enters_servo(self):
fsm = _make_fsm(servo_range_m=1.0)
self._reach_nav2(fsm)
out = fsm.tick(_inp(dock_pose=_near_pose()))
assert fsm.state == DockingState.VISUAL_SERVO
assert out.cancel_nav2 is True
def test_nav2_failed_back_to_detecting(self):
fsm = _make_fsm()
self._reach_nav2(fsm)
fsm.tick(_inp(nav2_failed=True))
assert fsm.state == DockingState.DETECTING
def test_undock_during_nav2_cancels_and_idles(self):
fsm = _make_fsm()
self._reach_nav2(fsm)
out = fsm.tick(_inp(undock_requested=True))
assert fsm.state == DockingState.IDLE
assert out.cancel_nav2 is True
class TestDockingStateMachineVisualServo:
def _reach_servo(self, fsm):
fsm.tick(_inp(dock_requested=True))
fsm.tick(_inp(dock_pose=_near_pose()))
assert fsm.state == DockingState.VISUAL_SERVO
def test_aligned_enters_contact(self):
fsm = _make_fsm()
self._reach_servo(fsm)
fsm.tick(_inp(dock_pose=_near_pose(), servo_aligned=True))
assert fsm.state == DockingState.CONTACT
def test_not_aligned_stays_in_servo(self):
fsm = _make_fsm()
self._reach_servo(fsm)
fsm.tick(_inp(dock_pose=_near_pose(), servo_aligned=False))
assert fsm.state == DockingState.VISUAL_SERVO
def test_lost_detection_timeout_enters_detecting(self):
fsm = _make_fsm(lost_ticks_max=3)
self._reach_servo(fsm)
for _ in range(3): # dock_pose=None each tick
fsm.tick(_inp())
assert fsm.state == DockingState.DETECTING
def test_lost_counter_resets_on_rediscovery(self):
"""Lost count should reset when pose reappears."""
fsm = _make_fsm(lost_ticks_max=3)
self._reach_servo(fsm)
fsm.tick(_inp()) # 1 lost tick
fsm.tick(_inp(dock_pose=_near_pose())) # rediscovered
# Need 3 more lost ticks to time out
fsm.tick(_inp())
fsm.tick(_inp())
assert fsm.state == DockingState.VISUAL_SERVO
fsm.tick(_inp())
assert fsm.state == DockingState.DETECTING
def test_undock_aborts_visual_servo(self):
fsm = _make_fsm()
self._reach_servo(fsm)
fsm.tick(_inp(undock_requested=True))
assert fsm.state == DockingState.IDLE
class TestDockingStateMachineContact:
def _reach_contact(self, fsm):
fsm.tick(_inp(dock_requested=True))
fsm.tick(_inp(dock_pose=_near_pose()))
fsm.tick(_inp(dock_pose=_near_pose(), servo_aligned=True))
assert fsm.state == DockingState.CONTACT
def test_charging_enters_charging_state(self):
fsm = _make_fsm()
self._reach_contact(fsm)
fsm.tick(_inp(is_charging=True))
assert fsm.state == DockingState.CHARGING
def test_contact_timeout_retries_servo(self):
fsm = _make_fsm(contact_timeout_ticks=3)
self._reach_contact(fsm)
for _ in range(3):
fsm.tick(_inp(is_charging=False))
assert fsm.state == DockingState.VISUAL_SERVO
def test_undock_from_contact_to_undocking(self):
fsm = _make_fsm()
self._reach_contact(fsm)
fsm.tick(_inp(undock_requested=True))
assert fsm.state == DockingState.UNDOCKING
class TestDockingStateMachineCharging:
def _reach_charging(self, fsm):
fsm.tick(_inp(dock_requested=True))
fsm.tick(_inp(dock_pose=_near_pose()))
fsm.tick(_inp(dock_pose=_near_pose(), servo_aligned=True))
fsm.tick(_inp(is_charging=True))
assert fsm.state == DockingState.CHARGING
def test_full_battery_triggers_undocking(self):
fsm = _make_fsm(battery_high_pct=80.0)
self._reach_charging(fsm)
fsm.tick(_inp(is_charging=True, battery_pct=85.0))
assert fsm.state == DockingState.UNDOCKING
def test_undock_request_during_charging(self):
fsm = _make_fsm()
self._reach_charging(fsm)
fsm.tick(_inp(undock_requested=True))
assert fsm.state == DockingState.UNDOCKING
def test_charging_stays_charging_below_high_threshold(self):
fsm = _make_fsm(battery_high_pct=80.0)
self._reach_charging(fsm)
fsm.tick(_inp(is_charging=True, battery_pct=70.0))
assert fsm.state == DockingState.CHARGING
class TestDockingStateMachineUndocking:
def _reach_undocking_via_charge(self, fsm):
fsm.tick(_inp(dock_requested=True))
fsm.tick(_inp(dock_pose=_near_pose()))
fsm.tick(_inp(dock_pose=_near_pose(), servo_aligned=True))
fsm.tick(_inp(is_charging=True))
fsm.tick(_inp(is_charging=True, battery_pct=85.0)) # → UNDOCKING
assert fsm.state == DockingState.UNDOCKING
def test_undocking_backs_away(self):
fsm = _make_fsm(undock_ticks_max=5)
self._reach_undocking_via_charge(fsm)
out = fsm.tick(_inp())
assert out.cmd_linear < 0.0
def test_undocking_completes_back_to_idle(self):
fsm = _make_fsm(undock_ticks_max=5)
self._reach_undocking_via_charge(fsm)
for _ in range(5):
out = fsm.tick(_inp())
assert fsm.state == DockingState.IDLE
def test_mission_resume_after_battery_auto_dock(self):
"""mission_resume must be True when auto-dock cycle completes."""
fsm = _make_fsm(undock_ticks_max=5)
# Trigger via battery low
fsm.tick(_inp(battery_pct=10.0)) # IDLE → DETECTING
fsm.tick(_inp(dock_pose=_near_pose())) # → VISUAL_SERVO
fsm.tick(_inp(dock_pose=_near_pose(), servo_aligned=True)) # → CONTACT
fsm.tick(_inp(is_charging=True)) # → CHARGING
fsm.tick(_inp(is_charging=True, battery_pct=85.0)) # → UNDOCKING
out = None
for _ in range(5):
out = fsm.tick(_inp())
assert out is not None
assert out.mission_resume is True
assert fsm.state == DockingState.IDLE
def test_no_mission_resume_on_explicit_undock(self):
"""Explicit undock should not trigger mission_resume."""
fsm = _make_fsm(undock_ticks_max=5)
fsm.tick(_inp(dock_requested=True)) # explicit dock — no interrupt
fsm.tick(_inp(dock_pose=_near_pose()))
fsm.tick(_inp(dock_pose=_near_pose(), servo_aligned=True))
fsm.tick(_inp(is_charging=True))
fsm.tick(_inp(undock_requested=True)) # explicit undock
last_out = None
for _ in range(5):
last_out = fsm.tick(_inp())
assert last_out is not None
assert last_out.mission_resume is False
class TestDockingStateMachineReset:
def test_reset_returns_to_idle(self):
fsm = _make_fsm()
fsm.tick(_inp(dock_requested=True))
assert fsm.state == DockingState.DETECTING
fsm.reset()
assert fsm.state == DockingState.IDLE
def test_state_changed_flag_on_transition(self):
fsm = _make_fsm()
out = fsm.tick(_inp(dock_requested=True))
assert out.state_changed is True
def test_no_state_changed_when_same_state(self):
fsm = _make_fsm()
out = fsm.tick(_inp()) # stays IDLE
assert out.state_changed is False

View File

@ -0,0 +1,16 @@
cmake_minimum_required(VERSION 3.8)
project(saltybot_docking_msgs)
find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)
find_package(builtin_interfaces REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/DockingStatus.msg"
"srv/Dock.srv"
"srv/Undock.srv"
DEPENDENCIES builtin_interfaces
)
ament_export_dependencies(rosidl_default_runtime)
ament_package()

View File

@ -0,0 +1,21 @@
# DockingStatus.msg — Real-time docking state snapshot (Issue #158)
# Published by: /saltybot/docking_node
# Topic: /saltybot/docking_status
builtin_interfaces/Time stamp
# Current FSM state
# Values: "IDLE" | "DETECTING" | "NAV2_APPROACH" | "VISUAL_SERVO" | "CONTACT" | "CHARGING" | "UNDOCKING"
string state
# Dock detection
bool dock_detected # ArUco marker or IR beacon visible
float32 distance_m # distance to dock (m); NaN if unknown
float32 lateral_error_m # lateral offset to dock centre (m); +ve = dock is right
# Battery / charging
float32 battery_pct # state of charge [0100]
bool charging # charge current detected on dock pins
# Alignment quality (active only during VISUAL_SERVO)
bool aligned # within ±5 mm lateral and contact distance

View File

@ -0,0 +1,22 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_docking_msgs</name>
<version>0.1.0</version>
<description>Docking station message and service definitions for SaltyBot (Issue #158)</description>
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
<license>MIT</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<buildtool_depend>rosidl_default_generators</buildtool_depend>
<depend>builtin_interfaces</depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>

View File

@ -0,0 +1,9 @@
# Dock.srv — Request the robot to autonomously dock (Issue #158)
# Service: /saltybot/dock
# If force=True the current Nav2 mission is cancelled immediately.
# If force=False the dock request is honoured only when the robot is idle.
bool force
---
bool success
string message

View File

@ -0,0 +1,9 @@
# Undock.srv — Request the robot to leave the dock (Issue #158)
# Service: /saltybot/undock
# If resume_mission=True and a mission was interrupted for battery, it is
# re-queued after the robot backs away from the dock.
bool resume_mission
---
bool success
string message