Merge pull request 'feat(#158): docking station auto-return — ArUco/IR detection, visual servo, charge monitoring' (#165) from sl-controls/issue-158-docking into main
This commit is contained in:
commit
77b3d614dc
@ -0,0 +1,43 @@
|
||||
/**:
|
||||
ros__parameters:
|
||||
# Control loop rate (Hz)
|
||||
control_rate: 20.0
|
||||
|
||||
# Topic names
|
||||
image_topic: "/camera/image_raw"
|
||||
camera_info_topic: "/camera/camera_info"
|
||||
ir_topic: "/saltybot/ir_beacon"
|
||||
battery_topic: "/saltybot/battery_state"
|
||||
|
||||
# ── ArUco detection ────────────────────────────────────────────────────────
|
||||
aruco_marker_id: 42 # marker ID on the dock face
|
||||
aruco_marker_size_m: 0.10 # printed marker side length (m)
|
||||
|
||||
# ── IR beacon ─────────────────────────────────────────────────────────────
|
||||
ir_threshold: 0.50 # amplitude threshold for beacon detection
|
||||
|
||||
# ── Battery thresholds ────────────────────────────────────────────────────
|
||||
battery_low_pct: 15.0 # SOC triggering auto-dock (%)
|
||||
battery_high_pct: 80.0 # SOC declaring charge complete (%)
|
||||
|
||||
# ── Visual servo ──────────────────────────────────────────────────────────
|
||||
servo_range_m: 1.00 # switch from Nav2 to IBVS at this distance (m)
|
||||
k_linear: 0.30 # distance P-gain
|
||||
k_angular: 0.80 # yaw P-gain
|
||||
lateral_tol_m: 0.005 # ±5 mm alignment tolerance
|
||||
contact_distance_m: 0.05 # distance below which contact is assumed (m)
|
||||
max_linear_ms: 0.10 # forward speed ceiling during servo (m/s)
|
||||
max_angular_rads: 0.30 # yaw rate ceiling (rad/s)
|
||||
|
||||
# ── Undocking maneuver ────────────────────────────────────────────────────
|
||||
undock_speed_ms: -0.20 # reverse speed (m/s; must be negative)
|
||||
undock_ticks_max: 50 # ticks at undock speed (2.5 s at 20 Hz)
|
||||
|
||||
# ── FSM timeouts ──────────────────────────────────────────────────────────
|
||||
lost_ticks_max: 40 # ticks of lost detection before retry (2 s)
|
||||
contact_timeout_ticks: 20 # ticks at CONTACT without charging before retry
|
||||
|
||||
# ── Nav2 waypoints [x, y, z, qx, qy, qz, qw] in map frame ───────────────
|
||||
# Set these to the actual dock and pre-dock poses in your environment.
|
||||
dock_pose: [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0]
|
||||
pre_dock_pose: [0.0, -1.0, 0.0, 0.0, 0.0, 0.0, 1.0] # 1 m in front of dock
|
||||
53
jetson/ros2_ws/src/saltybot_docking/launch/docking.launch.py
Normal file
53
jetson/ros2_ws/src/saltybot_docking/launch/docking.launch.py
Normal file
@ -0,0 +1,53 @@
|
||||
"""
|
||||
docking.launch.py — Launch the autonomous docking node (Issue #158).
|
||||
|
||||
Usage
|
||||
-----
|
||||
ros2 launch saltybot_docking docking.launch.py
|
||||
ros2 launch saltybot_docking docking.launch.py \
|
||||
battery_low_pct:=15.0 servo_range_m:=1.0
|
||||
"""
|
||||
|
||||
import os
|
||||
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
from launch import LaunchDescription
|
||||
from launch.actions import DeclareLaunchArgument
|
||||
from launch.substitutions import LaunchConfiguration
|
||||
from launch_ros.actions import Node
|
||||
|
||||
|
||||
def generate_launch_description():
|
||||
pkg_share = get_package_share_directory("saltybot_docking")
|
||||
default_params = os.path.join(pkg_share, "config", "docking_params.yaml")
|
||||
|
||||
return LaunchDescription([
|
||||
DeclareLaunchArgument(
|
||||
"params_file",
|
||||
default_value=default_params,
|
||||
description="Path to docking_params.yaml",
|
||||
),
|
||||
DeclareLaunchArgument(
|
||||
"battery_low_pct",
|
||||
default_value="15.0",
|
||||
description="SOC threshold to trigger auto-dock (%)",
|
||||
),
|
||||
DeclareLaunchArgument(
|
||||
"servo_range_m",
|
||||
default_value="1.0",
|
||||
description="Switch to visual servo within this distance (m)",
|
||||
),
|
||||
Node(
|
||||
package="saltybot_docking",
|
||||
executable="docking_node",
|
||||
name="docking",
|
||||
output="screen",
|
||||
parameters=[
|
||||
LaunchConfiguration("params_file"),
|
||||
{
|
||||
"battery_low_pct": LaunchConfiguration("battery_low_pct"),
|
||||
"servo_range_m": LaunchConfiguration("servo_range_m"),
|
||||
},
|
||||
],
|
||||
),
|
||||
])
|
||||
24
jetson/ros2_ws/src/saltybot_docking/package.xml
Normal file
24
jetson/ros2_ws/src/saltybot_docking/package.xml
Normal file
@ -0,0 +1,24 @@
|
||||
<?xml version="1.0"?>
|
||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="3">
|
||||
<name>saltybot_docking</name>
|
||||
<version>0.1.0</version>
|
||||
<description>Docking station auto-return with ArUco/IR detection and visual servo (Issue #158)</description>
|
||||
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
|
||||
<license>MIT</license>
|
||||
|
||||
<depend>rclpy</depend>
|
||||
<depend>sensor_msgs</depend>
|
||||
<depend>geometry_msgs</depend>
|
||||
<depend>std_msgs</depend>
|
||||
<depend>nav2_msgs</depend>
|
||||
|
||||
<test_depend>ament_copyright</test_depend>
|
||||
<test_depend>ament_flake8</test_depend>
|
||||
<test_depend>ament_pep257</test_depend>
|
||||
<test_depend>pytest</test_depend>
|
||||
|
||||
<export>
|
||||
<build_type>ament_python</build_type>
|
||||
</export>
|
||||
</package>
|
||||
@ -0,0 +1,137 @@
|
||||
"""
|
||||
charge_monitor.py — Battery charge monitoring for auto-dock return (Issue #158).
|
||||
|
||||
Consumes BatteryState data (voltage, current, percentage) each control tick
|
||||
and emits discrete ChargeEvents when meaningful thresholds are crossed.
|
||||
|
||||
Events
|
||||
──────
|
||||
CHARGING_STARTED : charge current rises above charging_current_threshold_a
|
||||
(docking pins making contact and charger active)
|
||||
CHARGING_STOPPED : charge current drops back to near-zero (unplugged or
|
||||
charge complete at hardware level)
|
||||
THRESHOLD_LOW : SOC dropped below low_threshold_pct → trigger auto-dock
|
||||
THRESHOLD_HIGH : SOC rose above high_threshold_pct → resume mission
|
||||
|
||||
Each event is emitted at most once per crossing (edge-triggered).
|
||||
|
||||
Pure module — no ROS2 dependency.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from enum import Enum, auto
|
||||
from typing import List
|
||||
|
||||
|
||||
# ── Events ────────────────────────────────────────────────────────────────────
|
||||
|
||||
class ChargeEvent(Enum):
|
||||
CHARGING_STARTED = auto()
|
||||
CHARGING_STOPPED = auto()
|
||||
THRESHOLD_LOW = auto()
|
||||
THRESHOLD_HIGH = auto()
|
||||
|
||||
|
||||
# ── ChargeMonitor ─────────────────────────────────────────────────────────────
|
||||
|
||||
class ChargeMonitor:
|
||||
"""
|
||||
Edge-triggered monitor for battery charging state and SOC thresholds.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
low_threshold_pct : SOC below which THRESHOLD_LOW fires (default 15 %)
|
||||
high_threshold_pct : SOC above which THRESHOLD_HIGH fires (default 80 %)
|
||||
charging_current_threshold_a : minimum current (A) to declare charging
|
||||
(positive = charging convention)
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
low_threshold_pct: float = 15.0,
|
||||
high_threshold_pct: float = 80.0,
|
||||
charging_current_threshold_a: float = 0.10,
|
||||
):
|
||||
self._low_pct = float(low_threshold_pct)
|
||||
self._high_pct = float(high_threshold_pct)
|
||||
self._i_thresh = float(charging_current_threshold_a)
|
||||
|
||||
# Internal state (edge-detection flags)
|
||||
self._is_charging: bool = False
|
||||
self._was_low: bool = False # True while SOC is in low zone
|
||||
self._was_high: bool = True # True while SOC is in high zone
|
||||
self._percentage: float = 100.0
|
||||
|
||||
# ── Properties ────────────────────────────────────────────────────────────
|
||||
|
||||
@property
|
||||
def is_charging(self) -> bool:
|
||||
"""True when charge current has been detected."""
|
||||
return self._is_charging
|
||||
|
||||
@property
|
||||
def percentage(self) -> float:
|
||||
"""Most recently observed state of charge (0–100 %)."""
|
||||
return self._percentage
|
||||
|
||||
@property
|
||||
def is_depleted(self) -> bool:
|
||||
"""True when SOC is currently below the low threshold."""
|
||||
return self._percentage < self._low_pct
|
||||
|
||||
@property
|
||||
def is_charged(self) -> bool:
|
||||
"""True when SOC is currently above the high threshold."""
|
||||
return self._percentage >= self._high_pct
|
||||
|
||||
# ── Update ────────────────────────────────────────────────────────────────
|
||||
|
||||
def update(
|
||||
self,
|
||||
percentage: float,
|
||||
current_a: float,
|
||||
) -> List[ChargeEvent]:
|
||||
"""
|
||||
Process one sample from BatteryState.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
percentage : state of charge 0–100 (from BatteryState.percentage × 100)
|
||||
current_a : charge/discharge current (A); positive = charging
|
||||
|
||||
Returns
|
||||
-------
|
||||
List of ChargeEvents (may be empty; never duplicates per edge).
|
||||
"""
|
||||
events: List[ChargeEvent] = []
|
||||
self._percentage = float(percentage)
|
||||
now_charging = current_a >= self._i_thresh
|
||||
|
||||
# ── Charging edge detection ───────────────────────────────────────────
|
||||
if now_charging and not self._is_charging:
|
||||
events.append(ChargeEvent.CHARGING_STARTED)
|
||||
elif not now_charging and self._is_charging:
|
||||
events.append(ChargeEvent.CHARGING_STOPPED)
|
||||
self._is_charging = now_charging
|
||||
|
||||
# ── Low threshold — edge on falling through ───────────────────────────
|
||||
in_low_zone = self._percentage < self._low_pct
|
||||
if in_low_zone and not self._was_low:
|
||||
events.append(ChargeEvent.THRESHOLD_LOW)
|
||||
self._was_low = in_low_zone
|
||||
|
||||
# ── High threshold — edge on rising through ───────────────────────────
|
||||
in_high_zone = self._percentage >= self._high_pct
|
||||
if in_high_zone and not self._was_high:
|
||||
events.append(ChargeEvent.THRESHOLD_HIGH)
|
||||
self._was_high = in_high_zone
|
||||
|
||||
return events
|
||||
|
||||
def reset(self) -> None:
|
||||
"""Clear all state (use when node restarts)."""
|
||||
self._is_charging = False
|
||||
self._was_low = False
|
||||
self._was_high = True
|
||||
self._percentage = 100.0
|
||||
@ -0,0 +1,210 @@
|
||||
"""
|
||||
dock_detector.py — ArUco marker + IR beacon detection for docking (Issue #158).
|
||||
|
||||
Detection sources
|
||||
─────────────────
|
||||
ArUco marker : cv2.aruco marker on the dock face; PnP solve gives full 3-D
|
||||
pose relative to the robot camera. Marker ID and physical
|
||||
size are configurable.
|
||||
|
||||
IR beacon : single ADC channel (normalised 0–1) with envelope follower
|
||||
and amplitude threshold. Provides presence detection only
|
||||
(no pose), used as a fallback or range-alert.
|
||||
|
||||
Outputs
|
||||
───────
|
||||
DockPose(distance_m, yaw_rad, lateral_m, source)
|
||||
distance_m — straight-line distance to dock marker centre (m)
|
||||
yaw_rad — bearing error (rad); +ve = dock is right of camera axis (tx > 0)
|
||||
lateral_m — lateral offset (m) in camera frame; +ve = dock is right
|
||||
source — "aruco" | "ir"
|
||||
|
||||
ArucoDetector.detect() returns Optional[DockPose]; None if marker absent or
|
||||
OpenCV / cv2.aruco are unavailable at runtime.
|
||||
|
||||
IRBeaconDetector.update(sample) returns bool (True = beacon detected).
|
||||
|
||||
Pure module — no ROS2 dependency.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
|
||||
# ── DockPose ─────────────────────────────────────────────────────────────────
|
||||
|
||||
@dataclass
|
||||
class DockPose:
|
||||
"""Dock pose in the robot's camera frame."""
|
||||
distance_m: float # straight-line distance to marker (m)
|
||||
yaw_rad: float # bearing error (rad); +ve = dock is right of camera axis (tx > 0)
|
||||
lateral_m: float # lateral offset (m) in camera X; +ve = dock is right
|
||||
source: str # "aruco" | "ir"
|
||||
|
||||
|
||||
# ── ArucoDetector ─────────────────────────────────────────────────────────────
|
||||
|
||||
class ArucoDetector:
|
||||
"""
|
||||
Detect a single ArUco marker and solve its 3-D pose with PnP.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
marker_id : ArUco marker ID placed on the dock face (default 42)
|
||||
marker_size_m : physical side length of the printed marker (m)
|
||||
aruco_dict_id : cv2.aruco dictionary constant (default DICT_4X4_50 = 0)
|
||||
"""
|
||||
|
||||
MARKER_ID_DEFAULT = 42
|
||||
MARKER_SIZE_DEFAULT = 0.10 # 10 cm
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
marker_id: int = MARKER_ID_DEFAULT,
|
||||
marker_size_m: float = MARKER_SIZE_DEFAULT,
|
||||
aruco_dict_id: int = 0, # cv2.aruco.DICT_4X4_50
|
||||
):
|
||||
self._marker_id = marker_id
|
||||
self._marker_size = marker_size_m
|
||||
self._dict_id = aruco_dict_id
|
||||
self._ready: Optional[bool] = None # None = not yet attempted
|
||||
|
||||
# Set by _init(); None when OpenCV unavailable
|
||||
self._cv2 = None
|
||||
self._detector = None
|
||||
self._obj_pts = None
|
||||
|
||||
# ── Initialisation ────────────────────────────────────────────────────────
|
||||
|
||||
def _init(self) -> bool:
|
||||
"""Lazy-initialise cv2.aruco. Returns False if unavailable."""
|
||||
if self._ready is not None:
|
||||
return self._ready
|
||||
try:
|
||||
import cv2
|
||||
import numpy as np
|
||||
aruco_dict = cv2.aruco.getPredefinedDictionary(self._dict_id)
|
||||
aruco_params = cv2.aruco.DetectorParameters()
|
||||
self._detector = cv2.aruco.ArucoDetector(aruco_dict, aruco_params)
|
||||
half = self._marker_size / 2.0
|
||||
self._obj_pts = np.array([
|
||||
[-half, half, 0.0],
|
||||
[ half, half, 0.0],
|
||||
[ half, -half, 0.0],
|
||||
[-half, -half, 0.0],
|
||||
], dtype=np.float64)
|
||||
self._cv2 = cv2
|
||||
self._ready = True
|
||||
except Exception:
|
||||
self._ready = False
|
||||
return self._ready
|
||||
|
||||
# ── Public API ────────────────────────────────────────────────────────────
|
||||
|
||||
@property
|
||||
def available(self) -> bool:
|
||||
"""True if cv2.aruco is importable and initialised."""
|
||||
return self._init()
|
||||
|
||||
def detect(
|
||||
self,
|
||||
image, # numpy.ndarray H×W×3 BGR (or greyscale)
|
||||
camera_matrix, # numpy.ndarray 3×3
|
||||
dist_coeffs, # numpy.ndarray 1×4 or 1×5
|
||||
) -> Optional[DockPose]:
|
||||
"""
|
||||
Detect the dock marker in *image* and return its pose.
|
||||
|
||||
Returns None if the target marker is absent, PnP fails, or OpenCV is
|
||||
not available.
|
||||
"""
|
||||
if not self._init():
|
||||
return None
|
||||
|
||||
try:
|
||||
corners, ids, _ = self._detector.detectMarkers(image)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
if ids is None or len(ids) == 0:
|
||||
return None
|
||||
|
||||
import numpy as np
|
||||
for i, mid in enumerate(ids.flatten()):
|
||||
if int(mid) != self._marker_id:
|
||||
continue
|
||||
img_pts = corners[i].reshape(4, 2).astype(np.float64)
|
||||
ok, rvec, tvec = self._cv2.solvePnP(
|
||||
self._obj_pts, img_pts,
|
||||
camera_matrix, dist_coeffs,
|
||||
flags=self._cv2.SOLVEPNP_IPPE_SQUARE,
|
||||
)
|
||||
if not ok:
|
||||
continue
|
||||
tx = float(tvec[0])
|
||||
ty = float(tvec[1])
|
||||
tz = float(tvec[2])
|
||||
distance = math.sqrt(tx * tx + ty * ty + tz * tz)
|
||||
yaw = math.atan2(tx, tz) # camera-x deviation from optical axis
|
||||
return DockPose(
|
||||
distance_m=distance,
|
||||
yaw_rad=yaw,
|
||||
lateral_m=tx,
|
||||
source="aruco",
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
# ── IRBeaconDetector ──────────────────────────────────────────────────────────
|
||||
|
||||
class IRBeaconDetector:
|
||||
"""
|
||||
IR beacon presence detector from a single normalised ADC channel.
|
||||
|
||||
Maintains an exponential envelope follower. When the smoothed amplitude
|
||||
exceeds *threshold* the beacon is considered detected.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
threshold : amplitude above which beacon is 'detected' (default 0.5)
|
||||
smoothing_alpha : EMA coefficient ∈ (0, 1]; 1 = no smoothing (default 0.3)
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
threshold: float = 0.5,
|
||||
smoothing_alpha: float = 0.3,
|
||||
):
|
||||
self._threshold = max(1e-9, float(threshold))
|
||||
self._alpha = max(1e-9, min(1.0, smoothing_alpha))
|
||||
self._envelope = 0.0
|
||||
|
||||
# ── Properties ────────────────────────────────────────────────────────────
|
||||
|
||||
@property
|
||||
def envelope(self) -> float:
|
||||
"""Current EMA envelope value."""
|
||||
return self._envelope
|
||||
|
||||
@property
|
||||
def detected(self) -> bool:
|
||||
"""True when smoothed amplitude ≥ threshold."""
|
||||
return self._envelope >= self._threshold
|
||||
|
||||
# ── Update ────────────────────────────────────────────────────────────────
|
||||
|
||||
def update(self, sample: float) -> bool:
|
||||
"""
|
||||
Submit one ADC sample (normalised 0–1 or raw voltage).
|
||||
Returns True if the beacon is currently detected.
|
||||
"""
|
||||
amp = abs(sample)
|
||||
self._envelope = (1.0 - self._alpha) * self._envelope + self._alpha * amp
|
||||
return self.detected
|
||||
|
||||
def reset(self) -> None:
|
||||
"""Clear envelope to zero (e.g. when node restarts)."""
|
||||
self._envelope = 0.0
|
||||
@ -0,0 +1,482 @@
|
||||
"""
|
||||
docking_node.py — Autonomous docking station return (Issue #158).
|
||||
|
||||
Overview
|
||||
────────
|
||||
Orchestrates dock detection, Nav2 corridor approach, visual-servo final
|
||||
alignment, and charge monitoring. Interrupts the active Nav2 mission when
|
||||
battery drops to 15 % and resumes when charged to 80 %.
|
||||
|
||||
Pipeline (20 Hz)
|
||||
────────────────
|
||||
1. Camera callback → ArucoDetector → DockPose (if marker visible)
|
||||
2. IR callback → IRBeaconDetector → presence flag (fallback)
|
||||
3. Battery callback→ ChargeMonitor → ChargeEvents
|
||||
4. 20 Hz timer → DockingStateMachine.tick()
|
||||
→ publish cmd_vel (VISUAL_SERVO + UNDOCKING)
|
||||
→ Nav2 action calls (NAV2_APPROACH)
|
||||
→ publish DockingStatus
|
||||
|
||||
Subscribes
|
||||
──────────
|
||||
<image_topic> sensor_msgs/Image — dock camera
|
||||
<camera_info_topic> sensor_msgs/CameraInfo — intrinsics for PnP
|
||||
<ir_topic> std_msgs/Float32 — IR beacon ADC sample
|
||||
<battery_topic> sensor_msgs/BatteryState — SOC + current
|
||||
|
||||
Publishes
|
||||
─────────
|
||||
/saltybot/docking_status saltybot_docking_msgs/DockingStatus
|
||||
/saltybot/docking_cmd_vel geometry_msgs/Twist — servo + undock commands
|
||||
/saltybot/resume_mission std_msgs/Bool — trigger mission resume
|
||||
|
||||
Services
|
||||
────────
|
||||
/saltybot/dock saltybot_docking_msgs/Dock
|
||||
/saltybot/undock saltybot_docking_msgs/Undock
|
||||
|
||||
Action client
|
||||
─────────────
|
||||
/navigate_to_pose nav2_msgs/action/NavigateToPose (optional)
|
||||
|
||||
Parameters
|
||||
──────────
|
||||
control_rate 20.0 Hz
|
||||
image_topic /camera/image_raw
|
||||
camera_info_topic /camera/camera_info
|
||||
ir_topic /saltybot/ir_beacon
|
||||
battery_topic /saltybot/battery_state
|
||||
aruco_marker_id 42
|
||||
aruco_marker_size_m 0.10
|
||||
ir_threshold 0.50
|
||||
battery_low_pct 15.0
|
||||
battery_high_pct 80.0
|
||||
servo_range_m 1.00 m (switch to IBVS within this distance)
|
||||
k_linear 0.30
|
||||
k_angular 0.80
|
||||
lateral_tol_m 0.005 m (±5 mm alignment tolerance)
|
||||
contact_distance_m 0.05 m
|
||||
max_linear_ms 0.10 m/s
|
||||
max_angular_rads 0.30 rad/s
|
||||
undock_speed_ms -0.20 m/s
|
||||
undock_ticks_max 50 (2.5 s at 20 Hz)
|
||||
lost_ticks_max 40 (2.0 s at 20 Hz)
|
||||
contact_timeout_ticks 20 (1.0 s at 20 Hz)
|
||||
dock_pose [0,0,0,0,0,0,1] [x,y,z,qx,qy,qz,qw] map frame
|
||||
pre_dock_pose [0,0,0,0,0,0,1] 1 m in front of dock
|
||||
"""
|
||||
|
||||
import json
|
||||
import math
|
||||
|
||||
import rclpy
|
||||
from rclpy.action import ActionClient
|
||||
from rclpy.node import Node
|
||||
from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy
|
||||
|
||||
from geometry_msgs.msg import PoseStamped, Twist
|
||||
from sensor_msgs.msg import BatteryState, CameraInfo, Image
|
||||
from std_msgs.msg import Bool, Float32
|
||||
|
||||
from saltybot_docking.charge_monitor import ChargeEvent, ChargeMonitor
|
||||
from saltybot_docking.dock_detector import ArucoDetector, DockPose, IRBeaconDetector
|
||||
from saltybot_docking.docking_state_machine import (
|
||||
DockingInputs,
|
||||
DockingState,
|
||||
DockingStateMachine,
|
||||
)
|
||||
from saltybot_docking.visual_servo import VisualServo
|
||||
|
||||
try:
|
||||
from saltybot_docking_msgs.msg import DockingStatus
|
||||
from saltybot_docking_msgs.srv import Dock, Undock
|
||||
_MSGS_OK = True
|
||||
except ImportError:
|
||||
_MSGS_OK = False
|
||||
|
||||
try:
|
||||
from nav2_msgs.action import NavigateToPose
|
||||
_NAV2_OK = True
|
||||
except ImportError:
|
||||
_NAV2_OK = False
|
||||
|
||||
try:
|
||||
import cv2
|
||||
import numpy as np
|
||||
_CV2_OK = True
|
||||
except ImportError:
|
||||
_CV2_OK = False
|
||||
|
||||
|
||||
# ── Helper ────────────────────────────────────────────────────────────────────
|
||||
|
||||
def _pose_param_to_stamped(vals: list, frame_id: str, stamp) -> PoseStamped:
|
||||
"""Convert [x,y,z,qx,qy,qz,qw] list to PoseStamped."""
|
||||
ps = PoseStamped()
|
||||
ps.header.frame_id = frame_id
|
||||
ps.header.stamp = stamp
|
||||
if len(vals) >= 7:
|
||||
ps.pose.position.x = float(vals[0])
|
||||
ps.pose.position.y = float(vals[1])
|
||||
ps.pose.position.z = float(vals[2])
|
||||
ps.pose.orientation.x = float(vals[3])
|
||||
ps.pose.orientation.y = float(vals[4])
|
||||
ps.pose.orientation.z = float(vals[5])
|
||||
ps.pose.orientation.w = float(vals[6])
|
||||
return ps
|
||||
|
||||
|
||||
# ── Node ──────────────────────────────────────────────────────────────────────
|
||||
|
||||
class DockingNode(Node):
|
||||
|
||||
def __init__(self):
|
||||
super().__init__("docking")
|
||||
|
||||
# ── Parameters ───────────────────────────────────────────────────────
|
||||
self._declare_params()
|
||||
p = self._load_params()
|
||||
|
||||
# ── Sub-systems ──────────────────────────────────────────────────────
|
||||
self._aruco = ArucoDetector(
|
||||
marker_id=p["aruco_marker_id"],
|
||||
marker_size_m=p["aruco_marker_size_m"],
|
||||
)
|
||||
self._ir = IRBeaconDetector(
|
||||
threshold=p["ir_threshold"],
|
||||
)
|
||||
self._servo = VisualServo(
|
||||
k_linear=p["k_linear"],
|
||||
k_angular=p["k_angular"],
|
||||
lateral_tol_m=p["lateral_tol_m"],
|
||||
contact_distance_m=p["contact_distance_m"],
|
||||
max_linear_ms=p["max_linear_ms"],
|
||||
max_angular_rads=p["max_angular_rads"],
|
||||
)
|
||||
self._charge = ChargeMonitor(
|
||||
low_threshold_pct=p["battery_low_pct"],
|
||||
high_threshold_pct=p["battery_high_pct"],
|
||||
)
|
||||
self._fsm = DockingStateMachine(
|
||||
battery_low_pct=p["battery_low_pct"],
|
||||
battery_high_pct=p["battery_high_pct"],
|
||||
servo_range_m=p["servo_range_m"],
|
||||
undock_speed_ms=p["undock_speed_ms"],
|
||||
undock_ticks_max=p["undock_ticks_max"],
|
||||
lost_ticks_max=p["lost_ticks_max"],
|
||||
contact_timeout_ticks=p["contact_timeout_ticks"],
|
||||
)
|
||||
|
||||
# ── Mutable state ────────────────────────────────────────────────────
|
||||
self._latest_dock_pose: DockPose | None = None
|
||||
self._latest_servo_aligned: bool = False
|
||||
self._nav2_arrived: bool = False
|
||||
self._nav2_failed: bool = False
|
||||
self._dock_requested: bool = False
|
||||
self._undock_requested: bool = False
|
||||
self._camera_matrix = None
|
||||
self._dist_coeffs = None
|
||||
self._nav2_goal_handle = None
|
||||
|
||||
# ── QoS ──────────────────────────────────────────────────────────────
|
||||
reliable = QoSProfile(
|
||||
reliability=ReliabilityPolicy.RELIABLE,
|
||||
history=HistoryPolicy.KEEP_LAST,
|
||||
depth=10,
|
||||
)
|
||||
best_effort = QoSProfile(
|
||||
reliability=ReliabilityPolicy.BEST_EFFORT,
|
||||
history=HistoryPolicy.KEEP_LAST,
|
||||
depth=1,
|
||||
)
|
||||
|
||||
# ── Subscriptions ────────────────────────────────────────────────────
|
||||
self.create_subscription(Image, p["image_topic"], self._image_cb, best_effort)
|
||||
self.create_subscription(CameraInfo, p["camera_info_topic"], self._cam_info_cb, reliable)
|
||||
self.create_subscription(Float32, p["ir_topic"], self._ir_cb, best_effort)
|
||||
self.create_subscription(BatteryState, p["battery_topic"], self._battery_cb, reliable)
|
||||
|
||||
# ── Publishers ───────────────────────────────────────────────────────
|
||||
self._cmd_vel_pub = self.create_publisher(
|
||||
Twist, "/saltybot/docking_cmd_vel", reliable
|
||||
)
|
||||
self._resume_pub = self.create_publisher(
|
||||
Bool, "/saltybot/resume_mission", reliable
|
||||
)
|
||||
self._status_pub = None
|
||||
if _MSGS_OK:
|
||||
self._status_pub = self.create_publisher(
|
||||
DockingStatus, "/saltybot/docking_status", reliable
|
||||
)
|
||||
|
||||
# ── Services ─────────────────────────────────────────────────────────
|
||||
if _MSGS_OK:
|
||||
self.create_service(Dock, "/saltybot/dock", self._dock_srv)
|
||||
self.create_service(Undock, "/saltybot/undock", self._undock_srv)
|
||||
|
||||
# ── Nav2 action client (optional) ────────────────────────────────────
|
||||
self._nav2_client = None
|
||||
if _NAV2_OK:
|
||||
self._nav2_client = ActionClient(self, NavigateToPose, "navigate_to_pose")
|
||||
|
||||
# ── Timer ────────────────────────────────────────────────────────────
|
||||
rate = p["control_rate"]
|
||||
self._timer = self.create_timer(1.0 / rate, self._control_cb)
|
||||
|
||||
self.get_logger().info(
|
||||
f"DockingNode ready rate={rate}Hz "
|
||||
f"nav2={'yes' if _NAV2_OK else 'no'} "
|
||||
f"aruco={'yes' if _CV2_OK else 'no'}"
|
||||
)
|
||||
|
||||
# ── Parameters ────────────────────────────────────────────────────────────
|
||||
|
||||
def _declare_params(self) -> None:
|
||||
self.declare_parameter("control_rate", 20.0)
|
||||
self.declare_parameter("image_topic", "/camera/image_raw")
|
||||
self.declare_parameter("camera_info_topic", "/camera/camera_info")
|
||||
self.declare_parameter("ir_topic", "/saltybot/ir_beacon")
|
||||
self.declare_parameter("battery_topic", "/saltybot/battery_state")
|
||||
self.declare_parameter("aruco_marker_id", 42)
|
||||
self.declare_parameter("aruco_marker_size_m", 0.10)
|
||||
self.declare_parameter("ir_threshold", 0.50)
|
||||
self.declare_parameter("battery_low_pct", 15.0)
|
||||
self.declare_parameter("battery_high_pct", 80.0)
|
||||
self.declare_parameter("servo_range_m", 1.00)
|
||||
self.declare_parameter("k_linear", 0.30)
|
||||
self.declare_parameter("k_angular", 0.80)
|
||||
self.declare_parameter("lateral_tol_m", 0.005)
|
||||
self.declare_parameter("contact_distance_m", 0.05)
|
||||
self.declare_parameter("max_linear_ms", 0.10)
|
||||
self.declare_parameter("max_angular_rads", 0.30)
|
||||
self.declare_parameter("undock_speed_ms", -0.20)
|
||||
self.declare_parameter("undock_ticks_max", 50)
|
||||
self.declare_parameter("lost_ticks_max", 40)
|
||||
self.declare_parameter("contact_timeout_ticks", 20)
|
||||
self.declare_parameter("dock_pose", [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0])
|
||||
self.declare_parameter("pre_dock_pose", [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0])
|
||||
|
||||
def _load_params(self) -> dict:
|
||||
g = self.get_parameter
|
||||
keys = [
|
||||
"control_rate",
|
||||
"image_topic", "camera_info_topic", "ir_topic", "battery_topic",
|
||||
"aruco_marker_id", "aruco_marker_size_m", "ir_threshold",
|
||||
"battery_low_pct", "battery_high_pct",
|
||||
"servo_range_m", "k_linear", "k_angular",
|
||||
"lateral_tol_m", "contact_distance_m", "max_linear_ms", "max_angular_rads",
|
||||
"undock_speed_ms", "undock_ticks_max", "lost_ticks_max", "contact_timeout_ticks",
|
||||
"dock_pose", "pre_dock_pose",
|
||||
]
|
||||
return {k: g(k).value for k in keys}
|
||||
|
||||
# ── Subscriptions ─────────────────────────────────────────────────────────
|
||||
|
||||
def _cam_info_cb(self, msg: CameraInfo) -> None:
|
||||
if _CV2_OK and self._camera_matrix is None:
|
||||
self._camera_matrix = np.array(msg.k, dtype=np.float64).reshape(3, 3)
|
||||
self._dist_coeffs = np.array(msg.d, dtype=np.float64)
|
||||
|
||||
def _image_cb(self, msg: Image) -> None:
|
||||
if not _CV2_OK or self._camera_matrix is None:
|
||||
return
|
||||
try:
|
||||
img = self._ros_image_to_bgr(msg)
|
||||
except Exception:
|
||||
return
|
||||
pose = self._aruco.detect(img, self._camera_matrix, self._dist_coeffs)
|
||||
if pose is not None:
|
||||
self._latest_dock_pose = pose
|
||||
# Update visual servo for aligned flag
|
||||
servo_result = self._servo.update(pose)
|
||||
self._latest_servo_aligned = servo_result.aligned
|
||||
|
||||
def _ir_cb(self, msg: Float32) -> None:
|
||||
detected = self._ir.update(msg.data)
|
||||
# IR gives presence only — supply a synthetic far DockPose if no ArUco
|
||||
if detected and self._latest_dock_pose is None:
|
||||
# Beacon says dock is ahead, distance unknown; use sentinel 5.0 m
|
||||
self._latest_dock_pose = DockPose(
|
||||
distance_m=5.0, yaw_rad=0.0, lateral_m=0.0, source="ir"
|
||||
)
|
||||
|
||||
def _battery_cb(self, msg: BatteryState) -> None:
|
||||
pct = float(msg.percentage) * 100.0 if msg.percentage <= 1.0 else float(msg.percentage)
|
||||
current = float(msg.current)
|
||||
events = self._charge.update(pct, current)
|
||||
for ev in events:
|
||||
if ev == ChargeEvent.THRESHOLD_LOW:
|
||||
self.get_logger().warn(
|
||||
f"Battery low ({pct:.1f}%) — initiating auto-dock"
|
||||
)
|
||||
elif ev == ChargeEvent.CHARGING_STARTED:
|
||||
self.get_logger().info("Charging started")
|
||||
elif ev == ChargeEvent.THRESHOLD_HIGH:
|
||||
self.get_logger().info(
|
||||
f"Battery charged ({pct:.1f}%) — ready to undock"
|
||||
)
|
||||
|
||||
# ── 20 Hz control loop ────────────────────────────────────────────────────
|
||||
|
||||
def _control_cb(self) -> None:
|
||||
inp = DockingInputs(
|
||||
battery_pct=self._charge.percentage,
|
||||
is_charging=self._charge.is_charging,
|
||||
dock_pose=self._latest_dock_pose,
|
||||
servo_aligned=self._latest_servo_aligned,
|
||||
dock_requested=self._dock_requested,
|
||||
undock_requested=self._undock_requested,
|
||||
nav2_arrived=self._nav2_arrived,
|
||||
nav2_failed=self._nav2_failed,
|
||||
)
|
||||
# Consume one-shot flags
|
||||
self._dock_requested = False
|
||||
self._undock_requested = False
|
||||
self._nav2_arrived = False
|
||||
self._nav2_failed = False
|
||||
# Clear dock pose so stale detections don't persist past one tick
|
||||
self._latest_dock_pose = None
|
||||
self._latest_servo_aligned = False
|
||||
|
||||
out = self._fsm.tick(inp)
|
||||
|
||||
# ── State-entry side effects ──────────────────────────────────────────
|
||||
if out.state_changed:
|
||||
self.get_logger().info(f"Docking FSM → {out.state.value}")
|
||||
|
||||
if out.request_nav2:
|
||||
self._send_nav2_goal()
|
||||
if out.cancel_nav2:
|
||||
self._cancel_nav2_goal()
|
||||
|
||||
if out.mission_interrupted:
|
||||
self.get_logger().warn("Mission interrupted for auto-dock (battery low)")
|
||||
|
||||
if out.mission_resume:
|
||||
msg = Bool()
|
||||
msg.data = True
|
||||
self._resume_pub.publish(msg)
|
||||
self.get_logger().info("Auto-dock complete — resuming mission")
|
||||
|
||||
# ── Velocity commands ─────────────────────────────────────────────────
|
||||
if out.state in (DockingState.VISUAL_SERVO, DockingState.UNDOCKING):
|
||||
twist = Twist()
|
||||
twist.linear.x = out.cmd_linear
|
||||
twist.angular.z = out.cmd_angular
|
||||
self._cmd_vel_pub.publish(twist)
|
||||
|
||||
# ── Status ────────────────────────────────────────────────────────────
|
||||
if self._status_pub is not None:
|
||||
self._publish_status(inp, out)
|
||||
|
||||
# ── Nav2 integration ──────────────────────────────────────────────────────
|
||||
|
||||
def _send_nav2_goal(self) -> None:
|
||||
if not _NAV2_OK or self._nav2_client is None:
|
||||
self.get_logger().warn("Nav2 unavailable — skipping pre-dock approach")
|
||||
self._nav2_arrived = True # skip straight to VISUAL_SERVO next tick
|
||||
return
|
||||
if not self._nav2_client.wait_for_server(timeout_sec=1.0):
|
||||
self.get_logger().warn("Nav2 action server not ready — skipping approach")
|
||||
self._nav2_arrived = True
|
||||
return
|
||||
p = self._load_params()
|
||||
goal = NavigateToPose.Goal()
|
||||
goal.pose = _pose_param_to_stamped(
|
||||
p["pre_dock_pose"], "map", self.get_clock().now().to_msg()
|
||||
)
|
||||
send_future = self._nav2_client.send_goal_async(goal)
|
||||
send_future.add_done_callback(self._nav2_goal_response_cb)
|
||||
|
||||
def _nav2_goal_response_cb(self, future) -> None:
|
||||
goal_handle = future.result()
|
||||
if not goal_handle or not goal_handle.accepted:
|
||||
self.get_logger().warn("Nav2 goal rejected")
|
||||
self._nav2_failed = True
|
||||
return
|
||||
self._nav2_goal_handle = goal_handle
|
||||
result_future = goal_handle.get_result_async()
|
||||
result_future.add_done_callback(self._nav2_result_cb)
|
||||
|
||||
def _nav2_result_cb(self, future) -> None:
|
||||
result = future.result()
|
||||
if result and result.status == 4: # SUCCEEDED
|
||||
self._nav2_arrived = True
|
||||
else:
|
||||
self._nav2_failed = True
|
||||
|
||||
def _cancel_nav2_goal(self) -> None:
|
||||
if self._nav2_goal_handle is not None:
|
||||
self._nav2_goal_handle.cancel_goal_async()
|
||||
self._nav2_goal_handle = None
|
||||
|
||||
# ── Services ──────────────────────────────────────────────────────────────
|
||||
|
||||
def _dock_srv(self, request, response):
|
||||
if self._fsm.state not in (DockingState.IDLE,) and not request.force:
|
||||
response.success = False
|
||||
response.message = f"Already in state {self._fsm.state.value}; use force=True to override"
|
||||
return response
|
||||
self._dock_requested = True
|
||||
response.success = True
|
||||
response.message = "Docking initiated"
|
||||
return response
|
||||
|
||||
def _undock_srv(self, request, response):
|
||||
if self._fsm.state == DockingState.IDLE:
|
||||
response.success = False
|
||||
response.message = "Already idle"
|
||||
return response
|
||||
self._undock_requested = True
|
||||
response.success = True
|
||||
response.message = "Undocking initiated"
|
||||
return response
|
||||
|
||||
# ── Publishing ────────────────────────────────────────────────────────────
|
||||
|
||||
def _publish_status(self, inp: DockingInputs, out) -> None:
|
||||
msg = DockingStatus()
|
||||
msg.stamp = self.get_clock().now().to_msg()
|
||||
msg.state = out.state.value
|
||||
msg.dock_detected = inp.dock_pose is not None
|
||||
msg.distance_m = float(inp.dock_pose.distance_m) if inp.dock_pose else float("nan")
|
||||
msg.lateral_error_m = float(inp.dock_pose.lateral_m) if inp.dock_pose else float("nan")
|
||||
msg.battery_pct = float(self._charge.percentage)
|
||||
msg.charging = self._charge.is_charging
|
||||
msg.aligned = inp.servo_aligned
|
||||
self._status_pub.publish(msg)
|
||||
|
||||
# ── Utilities ─────────────────────────────────────────────────────────────
|
||||
|
||||
@staticmethod
|
||||
def _ros_image_to_bgr(msg: Image):
|
||||
"""Convert sensor_msgs/Image to cv2 BGR numpy array."""
|
||||
import numpy as np
|
||||
data = np.frombuffer(msg.data, dtype=np.uint8)
|
||||
if msg.encoding in ("bgr8", "rgb8"):
|
||||
img = data.reshape(msg.height, msg.width, 3)
|
||||
if msg.encoding == "rgb8":
|
||||
img = img[:, :, ::-1].copy()
|
||||
elif msg.encoding == "mono8":
|
||||
img = data.reshape(msg.height, msg.width)
|
||||
else:
|
||||
img = data.reshape(msg.height, msg.width, -1)
|
||||
return img
|
||||
|
||||
|
||||
# ── Entry point ───────────────────────────────────────────────────────────────
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = DockingNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.try_shutdown()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@ -0,0 +1,257 @@
|
||||
"""
|
||||
docking_state_machine.py — FSM for autonomous docking / undocking (Issue #158).
|
||||
|
||||
States
|
||||
──────
|
||||
IDLE : normal operation; watching battery.
|
||||
DETECTING : scanning for dock marker or IR beacon.
|
||||
NAV2_APPROACH : Nav2 navigating to pre-dock waypoint (~1 m from dock face).
|
||||
VISUAL_SERVO : IBVS final approach (<= servo_range_m to dock).
|
||||
CONTACT : dock pins touched; waiting for charge current.
|
||||
CHARGING : charging in progress.
|
||||
UNDOCKING : backing away from dock at fixed reverse speed.
|
||||
|
||||
Transitions
|
||||
───────────
|
||||
IDLE → DETECTING : battery_pct < battery_low_pct OR dock_requested
|
||||
DETECTING → NAV2_APPROACH : dock_pose found AND distance > servo_range_m
|
||||
DETECTING → VISUAL_SERVO : dock_pose found AND distance ≤ servo_range_m
|
||||
NAV2_APPROACH → VISUAL_SERVO : dock_pose found AND distance ≤ servo_range_m
|
||||
OR nav2_arrived
|
||||
NAV2_APPROACH → DETECTING : nav2_failed (retry detection)
|
||||
VISUAL_SERVO → CONTACT : servo_aligned
|
||||
VISUAL_SERVO → DETECTING : dock lost for > lost_ticks_max ticks
|
||||
CONTACT → CHARGING : is_charging
|
||||
CONTACT → VISUAL_SERVO : contact_timeout_ticks exceeded (retry)
|
||||
CHARGING → UNDOCKING : battery_pct >= battery_high_pct
|
||||
OR undock_requested
|
||||
UNDOCKING → IDLE : undock_ticks >= undock_ticks_max
|
||||
|
||||
Any state → IDLE : undock_requested (except CHARGING → UNDOCKING)
|
||||
DETECTING → IDLE : undock_requested (abort scan)
|
||||
|
||||
Output signals
|
||||
──────────────
|
||||
cmd_linear, cmd_angular : velocity during VISUAL_SERVO and UNDOCKING
|
||||
request_nav2 : send Nav2 goal (emitted once on ENTRY to NAV2_APPROACH)
|
||||
cancel_nav2 : cancel Nav2 goal
|
||||
mission_interrupted : battery forced dock (for mission planner)
|
||||
mission_resume : auto-resume after charge cycle completes
|
||||
state_changed : FSM state changed this tick
|
||||
|
||||
Pure module — no ROS2 dependency.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
|
||||
from saltybot_docking.dock_detector import DockPose
|
||||
|
||||
|
||||
# ── States ────────────────────────────────────────────────────────────────────
|
||||
|
||||
class DockingState(Enum):
|
||||
IDLE = "IDLE"
|
||||
DETECTING = "DETECTING"
|
||||
NAV2_APPROACH = "NAV2_APPROACH"
|
||||
VISUAL_SERVO = "VISUAL_SERVO"
|
||||
CONTACT = "CONTACT"
|
||||
CHARGING = "CHARGING"
|
||||
UNDOCKING = "UNDOCKING"
|
||||
|
||||
|
||||
# ── I/O dataclasses ───────────────────────────────────────────────────────────
|
||||
|
||||
@dataclass
|
||||
class DockingInputs:
|
||||
"""Sensor snapshot fed to the FSM each tick."""
|
||||
battery_pct: float = 100.0
|
||||
is_charging: bool = False
|
||||
dock_pose: Optional[DockPose] = None # None = dock not visible
|
||||
servo_aligned: bool = False # visual servo declared aligned
|
||||
dock_requested: bool = False # explicit /saltybot/dock call
|
||||
undock_requested: bool = False # explicit /saltybot/undock call
|
||||
nav2_arrived: bool = False # Nav2 action succeeded
|
||||
nav2_failed: bool = False # Nav2 action failed/cancelled
|
||||
|
||||
|
||||
@dataclass
|
||||
class DockingOutputs:
|
||||
"""Commands and signals produced by the FSM each tick."""
|
||||
state: DockingState = DockingState.IDLE
|
||||
cmd_linear: float = 0.0 # m/s (VISUAL_SERVO + UNDOCKING)
|
||||
cmd_angular: float = 0.0 # rad/s (VISUAL_SERVO)
|
||||
request_nav2: bool = False # send Nav2 goal
|
||||
cancel_nav2: bool = False # cancel Nav2 goal
|
||||
mission_interrupted: bool = False # battery dock forced
|
||||
mission_resume: bool = False # charge cycle complete, resume
|
||||
state_changed: bool = False # FSM state changed this tick
|
||||
|
||||
|
||||
# ── DockingStateMachine ───────────────────────────────────────────────────────
|
||||
|
||||
class DockingStateMachine:
|
||||
"""
|
||||
Deterministic docking FSM.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
battery_low_pct : SOC that triggers auto-dock (default 15 %)
|
||||
battery_high_pct : SOC at which charging completes (default 80 %)
|
||||
servo_range_m : switch from Nav2 to visual servo (default 1.0 m)
|
||||
undock_speed_ms : reverse speed during undocking (m/s, must be ≤ 0)
|
||||
undock_ticks_max : ticks at undock_speed before returning to IDLE
|
||||
lost_ticks_max : allowed VISUAL_SERVO ticks with no detection
|
||||
contact_timeout_ticks : CONTACT ticks before retry alignment
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
battery_low_pct: float = 15.0,
|
||||
battery_high_pct: float = 80.0,
|
||||
servo_range_m: float = 1.0,
|
||||
undock_speed_ms: float = -0.20,
|
||||
undock_ticks_max: int = 50, # 2.5 s at 20 Hz
|
||||
lost_ticks_max: int = 40, # 2.0 s at 20 Hz
|
||||
contact_timeout_ticks: int = 20, # 1.0 s at 20 Hz
|
||||
):
|
||||
self._bat_low = float(battery_low_pct)
|
||||
self._bat_high = float(battery_high_pct)
|
||||
self._servo_range = float(servo_range_m)
|
||||
self._undock_speed = min(0.0, float(undock_speed_ms))
|
||||
self._undock_max = max(1, int(undock_ticks_max))
|
||||
self._lost_max = max(1, int(lost_ticks_max))
|
||||
self._contact_max = max(1, int(contact_timeout_ticks))
|
||||
|
||||
self._state = DockingState.IDLE
|
||||
self._undock_ticks = 0
|
||||
self._lost_ticks = 0
|
||||
self._contact_ticks = 0
|
||||
self._mission_interrupted = False
|
||||
self._resume_on_undock = False
|
||||
|
||||
# ── Public API ────────────────────────────────────────────────────────────
|
||||
|
||||
@property
|
||||
def state(self) -> DockingState:
|
||||
return self._state
|
||||
|
||||
def reset(self) -> None:
|
||||
"""Return to IDLE and clear all counters (use for unit tests)."""
|
||||
self._state = DockingState.IDLE
|
||||
self._undock_ticks = 0
|
||||
self._lost_ticks = 0
|
||||
self._contact_ticks = 0
|
||||
self._mission_interrupted = False
|
||||
self._resume_on_undock = False
|
||||
|
||||
def tick(self, inputs: DockingInputs) -> DockingOutputs:
|
||||
"""Advance the FSM by one tick and return output commands."""
|
||||
prev_state = self._state
|
||||
out = self._step(inputs)
|
||||
out.state = self._state
|
||||
out.state_changed = (self._state != prev_state)
|
||||
if out.state_changed:
|
||||
self._on_enter(self._state)
|
||||
return out
|
||||
|
||||
# ── Internal step ─────────────────────────────────────────────────────────
|
||||
|
||||
def _step(self, inp: DockingInputs) -> DockingOutputs:
|
||||
out = DockingOutputs(state=self._state)
|
||||
|
||||
# ── IDLE ──────────────────────────────────────────────────────────────
|
||||
if self._state == DockingState.IDLE:
|
||||
if inp.dock_requested:
|
||||
self._state = DockingState.DETECTING
|
||||
elif inp.battery_pct < self._bat_low:
|
||||
self._state = DockingState.DETECTING
|
||||
out.mission_interrupted = True
|
||||
self._mission_interrupted = True
|
||||
|
||||
# ── DETECTING ─────────────────────────────────────────────────────────
|
||||
elif self._state == DockingState.DETECTING:
|
||||
if inp.undock_requested:
|
||||
self._state = DockingState.IDLE
|
||||
self._mission_interrupted = False
|
||||
elif inp.dock_pose is not None:
|
||||
if inp.dock_pose.distance_m <= self._servo_range:
|
||||
self._state = DockingState.VISUAL_SERVO
|
||||
else:
|
||||
self._state = DockingState.NAV2_APPROACH
|
||||
out.request_nav2 = True
|
||||
|
||||
# ── NAV2_APPROACH ─────────────────────────────────────────────────────
|
||||
elif self._state == DockingState.NAV2_APPROACH:
|
||||
if inp.undock_requested:
|
||||
self._state = DockingState.IDLE
|
||||
out.cancel_nav2 = True
|
||||
self._mission_interrupted = False
|
||||
elif inp.dock_pose is not None and inp.dock_pose.distance_m <= self._servo_range:
|
||||
self._state = DockingState.VISUAL_SERVO
|
||||
out.cancel_nav2 = True
|
||||
elif inp.nav2_arrived:
|
||||
self._state = DockingState.VISUAL_SERVO
|
||||
elif inp.nav2_failed:
|
||||
self._state = DockingState.DETECTING # retry scan
|
||||
|
||||
# ── VISUAL_SERVO ──────────────────────────────────────────────────────
|
||||
elif self._state == DockingState.VISUAL_SERVO:
|
||||
if inp.undock_requested:
|
||||
self._state = DockingState.IDLE
|
||||
elif inp.dock_pose is None:
|
||||
self._lost_ticks += 1
|
||||
if self._lost_ticks >= self._lost_max:
|
||||
self._state = DockingState.DETECTING
|
||||
else:
|
||||
self._lost_ticks = 0
|
||||
if inp.servo_aligned:
|
||||
self._state = DockingState.CONTACT
|
||||
|
||||
# ── CONTACT ───────────────────────────────────────────────────────────
|
||||
elif self._state == DockingState.CONTACT:
|
||||
if inp.undock_requested:
|
||||
self._state = DockingState.UNDOCKING
|
||||
self._resume_on_undock = False
|
||||
elif inp.is_charging:
|
||||
self._state = DockingState.CHARGING
|
||||
else:
|
||||
self._contact_ticks += 1
|
||||
if self._contact_ticks >= self._contact_max:
|
||||
self._state = DockingState.VISUAL_SERVO # realign
|
||||
|
||||
# ── CHARGING ──────────────────────────────────────────────────────────
|
||||
elif self._state == DockingState.CHARGING:
|
||||
if inp.undock_requested:
|
||||
self._state = DockingState.UNDOCKING
|
||||
self._resume_on_undock = False
|
||||
elif inp.battery_pct >= self._bat_high:
|
||||
self._state = DockingState.UNDOCKING
|
||||
self._resume_on_undock = self._mission_interrupted
|
||||
|
||||
# ── UNDOCKING ─────────────────────────────────────────────────────────
|
||||
elif self._state == DockingState.UNDOCKING:
|
||||
out.cmd_linear = self._undock_speed
|
||||
self._undock_ticks += 1
|
||||
if self._undock_ticks >= self._undock_max:
|
||||
out.mission_resume = self._resume_on_undock
|
||||
self._mission_interrupted = False
|
||||
self._resume_on_undock = False
|
||||
self._state = DockingState.IDLE
|
||||
|
||||
return out
|
||||
|
||||
# ── Entry actions ─────────────────────────────────────────────────────────
|
||||
|
||||
def _on_enter(self, state: DockingState) -> None:
|
||||
"""Reset per-state counters when entering a new state."""
|
||||
if state == DockingState.VISUAL_SERVO:
|
||||
self._lost_ticks = 0
|
||||
elif state == DockingState.CONTACT:
|
||||
self._contact_ticks = 0
|
||||
elif state == DockingState.UNDOCKING:
|
||||
self._undock_ticks = 0
|
||||
@ -0,0 +1,130 @@
|
||||
"""
|
||||
visual_servo.py — Image-based visual servo for final dock approach (Issue #158).
|
||||
|
||||
Control law
|
||||
───────────
|
||||
Operates once the robot is within *servo_range_m* of the dock.
|
||||
|
||||
Linear velocity (forward):
|
||||
v = clip( k_linear × (distance_m − target_distance_m), 0, max_linear_ms )
|
||||
|
||||
Angular velocity (yaw correction):
|
||||
ω = clip( −k_angular × yaw_rad, −max_angular_rads, +max_angular_rads )
|
||||
|
||||
The linear term drives the robot toward the dock until it reaches
|
||||
target_distance_m. The clip to [0, …] prevents reversing.
|
||||
|
||||
The angular term zeroes the bearing error; sign convention:
|
||||
yaw_rad > 0 → dock is to the robot's left → turn left (ω > 0).
|
||||
|
||||
Alignment declaration
|
||||
─────────────────────
|
||||
`aligned` is True when both:
|
||||
|lateral_m| < lateral_tol_m (default ±5 mm)
|
||||
distance_m < contact_distance_m (default 50 mm)
|
||||
|
||||
Pure module — no ROS2 dependency.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
from saltybot_docking.dock_detector import DockPose
|
||||
|
||||
|
||||
# ── Result ────────────────────────────────────────────────────────────────────
|
||||
|
||||
@dataclass
|
||||
class VisualServoResult:
|
||||
linear_x: float # forward velocity (m/s); ≥ 0
|
||||
angular_z: float # yaw velocity (rad/s)
|
||||
distance_m: float # current distance to dock (m)
|
||||
lateral_error_m: float # current lateral offset (m); +ve = dock is right
|
||||
aligned: bool # True when within contact tolerances
|
||||
|
||||
|
||||
# ── VisualServo ───────────────────────────────────────────────────────────────
|
||||
|
||||
class VisualServo:
|
||||
"""
|
||||
Proportional visual servo controller for dock final approach.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
k_linear : distance gain (m/s per metre of error)
|
||||
k_angular : yaw gain (rad/s per radian of bearing error)
|
||||
target_distance_m : desired stop distance from dock face (m)
|
||||
lateral_tol_m : lateral alignment tolerance (m) [spec ±5 mm → 0.005]
|
||||
contact_distance_m: distance below which contact is assumed (m)
|
||||
max_linear_ms : velocity ceiling (m/s)
|
||||
max_angular_rads : yaw-rate ceiling (rad/s)
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
k_linear: float = 0.30,
|
||||
k_angular: float = 0.80,
|
||||
target_distance_m: float = 0.02, # 20 mm — at the dock face
|
||||
lateral_tol_m: float = 0.005, # ±5 mm
|
||||
contact_distance_m: float = 0.05, # 50 mm — within contact zone
|
||||
max_linear_ms: float = 0.10, # slow for precision approach
|
||||
max_angular_rads: float = 0.30,
|
||||
):
|
||||
self._k_lin = float(k_linear)
|
||||
self._k_ang = float(k_angular)
|
||||
self._target = float(target_distance_m)
|
||||
self._lat_tol = max(1e-6, float(lateral_tol_m))
|
||||
self._contact = max(self._target, float(contact_distance_m))
|
||||
self._v_max = max(0.0, float(max_linear_ms))
|
||||
self._w_max = max(0.0, float(max_angular_rads))
|
||||
|
||||
# ── Control step ─────────────────────────────────────────────────────────
|
||||
|
||||
def update(self, dock_pose: DockPose) -> VisualServoResult:
|
||||
"""
|
||||
Compute one velocity command from the current dock pose.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
dock_pose : DockPose from ArucoDetector or IRBeaconDetector
|
||||
|
||||
Returns
|
||||
-------
|
||||
VisualServoResult with cmd velocities and alignment flag
|
||||
"""
|
||||
dist = dock_pose.distance_m
|
||||
yaw = dock_pose.yaw_rad
|
||||
lat = dock_pose.lateral_m
|
||||
|
||||
# Linear: approach until target_distance; no reversing
|
||||
v_raw = self._k_lin * (dist - self._target)
|
||||
v = max(0.0, min(self._v_max, v_raw))
|
||||
|
||||
# Angular: null yaw error; positive yaw → turn left (+ω)
|
||||
w_raw = -self._k_ang * yaw
|
||||
w = max(-self._w_max, min(self._w_max, w_raw))
|
||||
|
||||
aligned = (abs(lat) < self._lat_tol) and (dist < self._contact)
|
||||
|
||||
return VisualServoResult(
|
||||
linear_x=v,
|
||||
angular_z=w,
|
||||
distance_m=dist,
|
||||
lateral_error_m=lat,
|
||||
aligned=aligned,
|
||||
)
|
||||
|
||||
def stop(self) -> VisualServoResult:
|
||||
"""
|
||||
Zero-velocity command — used when dock pose is momentarily lost.
|
||||
"""
|
||||
return VisualServoResult(
|
||||
linear_x=0.0,
|
||||
angular_z=0.0,
|
||||
distance_m=float("nan"),
|
||||
lateral_error_m=float("nan"),
|
||||
aligned=False,
|
||||
)
|
||||
5
jetson/ros2_ws/src/saltybot_docking/setup.cfg
Normal file
5
jetson/ros2_ws/src/saltybot_docking/setup.cfg
Normal file
@ -0,0 +1,5 @@
|
||||
[develop]
|
||||
script_dir=$base/lib/saltybot_docking
|
||||
|
||||
[install]
|
||||
install_scripts=$base/lib/saltybot_docking
|
||||
32
jetson/ros2_ws/src/saltybot_docking/setup.py
Normal file
32
jetson/ros2_ws/src/saltybot_docking/setup.py
Normal file
@ -0,0 +1,32 @@
|
||||
from setuptools import setup, find_packages
|
||||
import os
|
||||
from glob import glob
|
||||
|
||||
package_name = "saltybot_docking"
|
||||
|
||||
setup(
|
||||
name=package_name,
|
||||
version="0.1.0",
|
||||
packages=find_packages(exclude=["test"]),
|
||||
data_files=[
|
||||
("share/ament_index/resource_index/packages",
|
||||
[f"resource/{package_name}"]),
|
||||
(f"share/{package_name}", ["package.xml"]),
|
||||
(os.path.join("share", package_name, "config"),
|
||||
glob("config/*.yaml")),
|
||||
(os.path.join("share", package_name, "launch"),
|
||||
glob("launch/*.py")),
|
||||
],
|
||||
install_requires=["setuptools"],
|
||||
zip_safe=True,
|
||||
maintainer="sl-controls",
|
||||
maintainer_email="sl-controls@saltylab.local",
|
||||
description="Docking station auto-return with ArUco/IR detection and visual servo",
|
||||
license="MIT",
|
||||
tests_require=["pytest"],
|
||||
entry_points={
|
||||
"console_scripts": [
|
||||
f"docking_node = {package_name}.docking_node:main",
|
||||
],
|
||||
},
|
||||
)
|
||||
580
jetson/ros2_ws/src/saltybot_docking/test/test_docking.py
Normal file
580
jetson/ros2_ws/src/saltybot_docking/test/test_docking.py
Normal file
@ -0,0 +1,580 @@
|
||||
"""
|
||||
test_docking.py — Unit tests for Issue #158 docking modules.
|
||||
|
||||
Covers:
|
||||
IRBeaconDetector — envelope follower, threshold, reset
|
||||
VisualServo — velocity commands, clamping, alignment declaration
|
||||
ChargeMonitor — event detection, edge-triggering, threshold crossing
|
||||
DockingStateMachine — all state transitions and guard conditions
|
||||
"""
|
||||
|
||||
import math
|
||||
import sys
|
||||
import os
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
||||
|
||||
import pytest
|
||||
|
||||
from saltybot_docking.dock_detector import DockPose, IRBeaconDetector
|
||||
from saltybot_docking.visual_servo import VisualServo, VisualServoResult
|
||||
from saltybot_docking.charge_monitor import ChargeEvent, ChargeMonitor
|
||||
from saltybot_docking.docking_state_machine import (
|
||||
DockingInputs,
|
||||
DockingState,
|
||||
DockingStateMachine,
|
||||
)
|
||||
|
||||
|
||||
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||||
|
||||
def _make_ir(**kw):
|
||||
defaults = dict(threshold=0.5, smoothing_alpha=1.0)
|
||||
defaults.update(kw)
|
||||
return IRBeaconDetector(**defaults)
|
||||
|
||||
|
||||
def _make_servo(**kw):
|
||||
defaults = dict(
|
||||
k_linear=1.0,
|
||||
k_angular=1.0,
|
||||
target_distance_m=0.02,
|
||||
lateral_tol_m=0.005,
|
||||
contact_distance_m=0.05,
|
||||
max_linear_ms=0.5,
|
||||
max_angular_rads=1.0,
|
||||
)
|
||||
defaults.update(kw)
|
||||
return VisualServo(**defaults)
|
||||
|
||||
|
||||
def _make_charge(**kw):
|
||||
defaults = dict(
|
||||
low_threshold_pct=15.0,
|
||||
high_threshold_pct=80.0,
|
||||
charging_current_threshold_a=0.10,
|
||||
)
|
||||
defaults.update(kw)
|
||||
return ChargeMonitor(**defaults)
|
||||
|
||||
|
||||
def _make_fsm(**kw):
|
||||
defaults = dict(
|
||||
battery_low_pct=15.0,
|
||||
battery_high_pct=80.0,
|
||||
servo_range_m=1.0,
|
||||
undock_speed_ms=-0.20,
|
||||
undock_ticks_max=5,
|
||||
lost_ticks_max=3,
|
||||
contact_timeout_ticks=3,
|
||||
)
|
||||
defaults.update(kw)
|
||||
return DockingStateMachine(**defaults)
|
||||
|
||||
|
||||
def _inp(**kw):
|
||||
return DockingInputs(**kw)
|
||||
|
||||
|
||||
def _near_pose(dist=0.5):
|
||||
"""Dock pose within servo range (default 0.5 m)."""
|
||||
return DockPose(distance_m=dist, yaw_rad=0.0, lateral_m=0.0, source="aruco")
|
||||
|
||||
|
||||
def _far_pose(dist=3.0):
|
||||
"""Dock pose outside servo range."""
|
||||
return DockPose(distance_m=dist, yaw_rad=0.0, lateral_m=0.0, source="aruco")
|
||||
|
||||
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
# IRBeaconDetector
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
class TestIRBeaconDetector:
|
||||
|
||||
def test_zero_signal_not_detected(self):
|
||||
ir = _make_ir()
|
||||
assert ir.update(0.0) is False
|
||||
assert ir.detected is False
|
||||
|
||||
def test_above_threshold_detected(self):
|
||||
ir = _make_ir(threshold=0.5, smoothing_alpha=1.0)
|
||||
assert ir.update(0.6) is True
|
||||
assert ir.detected is True
|
||||
|
||||
def test_below_threshold_not_detected(self):
|
||||
ir = _make_ir(threshold=0.5, smoothing_alpha=1.0)
|
||||
assert ir.update(0.4) is False
|
||||
|
||||
def test_exactly_at_threshold_detected(self):
|
||||
ir = _make_ir(threshold=0.5, smoothing_alpha=1.0)
|
||||
assert ir.update(0.5) is True
|
||||
|
||||
def test_envelope_smoothing_alpha_one(self):
|
||||
"""With alpha=1, envelope equals |sample| immediately."""
|
||||
ir = _make_ir(smoothing_alpha=1.0)
|
||||
ir.update(0.7)
|
||||
assert ir.envelope == pytest.approx(0.7, abs=1e-9)
|
||||
|
||||
def test_envelope_smoothing_below_one(self):
|
||||
"""With alpha=0.5, envelope is a running average."""
|
||||
ir = _make_ir(threshold=999.0, smoothing_alpha=0.5)
|
||||
ir.update(1.0) # envelope = 0.5
|
||||
assert ir.envelope == pytest.approx(0.5, abs=1e-6)
|
||||
ir.update(1.0) # envelope = 0.75
|
||||
assert ir.envelope == pytest.approx(0.75, abs=1e-6)
|
||||
|
||||
def test_reset_clears_envelope(self):
|
||||
ir = _make_ir(smoothing_alpha=1.0)
|
||||
ir.update(0.8)
|
||||
ir.reset()
|
||||
assert ir.envelope == pytest.approx(0.0, abs=1e-9)
|
||||
assert ir.detected is False
|
||||
|
||||
def test_negative_sample_uses_abs(self):
|
||||
"""update() should use absolute value of sample."""
|
||||
ir = _make_ir(threshold=0.5, smoothing_alpha=1.0)
|
||||
assert ir.update(-0.7) is True
|
||||
|
||||
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
# VisualServo
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
class TestVisualServo:
|
||||
|
||||
def test_far_dock_gives_forward_velocity(self):
|
||||
servo = _make_servo(k_linear=1.0, target_distance_m=0.02)
|
||||
pose = DockPose(distance_m=0.5, yaw_rad=0.0, lateral_m=0.0, source="aruco")
|
||||
result = servo.update(pose)
|
||||
assert result.linear_x > 0.0
|
||||
|
||||
def test_at_target_distance_zero_linear(self):
|
||||
servo = _make_servo(k_linear=1.0, target_distance_m=0.02)
|
||||
pose = DockPose(distance_m=0.02, yaw_rad=0.0, lateral_m=0.0, source="aruco")
|
||||
result = servo.update(pose)
|
||||
assert result.linear_x == pytest.approx(0.0, abs=1e-9)
|
||||
|
||||
def test_behind_target_no_reverse(self):
|
||||
"""When distance < target, linear velocity must be 0 (no reversing)."""
|
||||
servo = _make_servo(k_linear=1.0, target_distance_m=0.10)
|
||||
pose = DockPose(distance_m=0.01, yaw_rad=0.0, lateral_m=0.0, source="aruco")
|
||||
result = servo.update(pose)
|
||||
assert result.linear_x == pytest.approx(0.0, abs=1e-9)
|
||||
|
||||
def test_yaw_right_gives_negative_angular(self):
|
||||
"""yaw_rad = atan2(tx, tz); tx > 0 → dock is right in camera → turn right (ω < 0)."""
|
||||
servo = _make_servo(k_angular=1.0)
|
||||
pose = DockPose(distance_m=0.5, yaw_rad=0.3, lateral_m=0.0, source="aruco")
|
||||
result = servo.update(pose)
|
||||
assert result.angular_z < 0.0
|
||||
|
||||
def test_yaw_left_gives_positive_angular(self):
|
||||
"""tx < 0 → dock is left in camera → turn left (ω > 0)."""
|
||||
servo = _make_servo(k_angular=1.0)
|
||||
pose = DockPose(distance_m=0.5, yaw_rad=-0.3, lateral_m=0.0, source="aruco")
|
||||
result = servo.update(pose)
|
||||
assert result.angular_z > 0.0
|
||||
|
||||
def test_zero_yaw_zero_angular(self):
|
||||
servo = _make_servo()
|
||||
pose = DockPose(distance_m=0.5, yaw_rad=0.0, lateral_m=0.0, source="aruco")
|
||||
result = servo.update(pose)
|
||||
assert result.angular_z == pytest.approx(0.0, abs=1e-9)
|
||||
|
||||
def test_max_linear_clamped(self):
|
||||
servo = _make_servo(k_linear=100.0, max_linear_ms=0.1)
|
||||
pose = DockPose(distance_m=5.0, yaw_rad=0.0, lateral_m=0.0, source="aruco")
|
||||
result = servo.update(pose)
|
||||
assert result.linear_x <= 0.1 + 1e-9
|
||||
|
||||
def test_max_angular_clamped(self):
|
||||
servo = _make_servo(k_angular=100.0, max_angular_rads=0.3)
|
||||
pose = DockPose(distance_m=0.5, yaw_rad=5.0, lateral_m=0.0, source="aruco")
|
||||
result = servo.update(pose)
|
||||
assert abs(result.angular_z) <= 0.3 + 1e-9
|
||||
|
||||
def test_aligned_when_within_tolerances(self):
|
||||
servo = _make_servo(lateral_tol_m=0.005, contact_distance_m=0.05)
|
||||
pose = DockPose(distance_m=0.03, yaw_rad=0.0, lateral_m=0.002, source="aruco")
|
||||
result = servo.update(pose)
|
||||
assert result.aligned is True
|
||||
|
||||
def test_not_aligned_lateral_too_large(self):
|
||||
servo = _make_servo(lateral_tol_m=0.005, contact_distance_m=0.05)
|
||||
pose = DockPose(distance_m=0.03, yaw_rad=0.0, lateral_m=0.010, source="aruco")
|
||||
result = servo.update(pose)
|
||||
assert result.aligned is False
|
||||
|
||||
def test_not_aligned_too_far(self):
|
||||
servo = _make_servo(lateral_tol_m=0.005, contact_distance_m=0.05)
|
||||
pose = DockPose(distance_m=0.10, yaw_rad=0.0, lateral_m=0.001, source="aruco")
|
||||
result = servo.update(pose)
|
||||
assert result.aligned is False
|
||||
|
||||
def test_stop_returns_zero_velocity(self):
|
||||
servo = _make_servo()
|
||||
result = servo.stop()
|
||||
assert result.linear_x == pytest.approx(0.0, abs=1e-9)
|
||||
assert result.angular_z == pytest.approx(0.0, abs=1e-9)
|
||||
assert result.aligned is False
|
||||
|
||||
def test_proportional_linear(self):
|
||||
"""Linear velocity scales with (distance − target)."""
|
||||
servo = _make_servo(k_linear=2.0, target_distance_m=0.0, max_linear_ms=999.0)
|
||||
pose = DockPose(distance_m=0.3, yaw_rad=0.0, lateral_m=0.0, source="aruco")
|
||||
result = servo.update(pose)
|
||||
assert result.linear_x == pytest.approx(2.0 * 0.3, rel=1e-6)
|
||||
|
||||
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
# ChargeMonitor
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
class TestChargeMonitor:
|
||||
|
||||
def test_initially_not_charging(self):
|
||||
cm = _make_charge()
|
||||
assert cm.is_charging is False
|
||||
|
||||
def test_charging_started_event(self):
|
||||
cm = _make_charge(charging_current_threshold_a=0.1)
|
||||
events = cm.update(50.0, 0.5)
|
||||
assert ChargeEvent.CHARGING_STARTED in events
|
||||
|
||||
def test_charging_stopped_event(self):
|
||||
cm = _make_charge(charging_current_threshold_a=0.1)
|
||||
cm.update(50.0, 0.5) # start charging
|
||||
events = cm.update(50.0, 0.0) # stop
|
||||
assert ChargeEvent.CHARGING_STOPPED in events
|
||||
|
||||
def test_no_duplicate_charging_started(self):
|
||||
"""CHARGING_STARTED fires only once per rising edge."""
|
||||
cm = _make_charge(charging_current_threshold_a=0.1)
|
||||
cm.update(50.0, 0.5) # fires CHARGING_STARTED
|
||||
events = cm.update(50.0, 0.5) # still charging — no duplicate
|
||||
assert ChargeEvent.CHARGING_STARTED not in events
|
||||
|
||||
def test_low_threshold_event_falling_edge(self):
|
||||
"""THRESHOLD_LOW fires when SOC first drops below low_threshold_pct."""
|
||||
cm = _make_charge(low_threshold_pct=20.0)
|
||||
events = cm.update(19.0, 0.0)
|
||||
assert ChargeEvent.THRESHOLD_LOW in events
|
||||
|
||||
def test_no_duplicate_low_threshold(self):
|
||||
cm = _make_charge(low_threshold_pct=20.0)
|
||||
cm.update(19.0, 0.0) # fires once
|
||||
events = cm.update(18.0, 0.0) # still below — no repeat
|
||||
assert ChargeEvent.THRESHOLD_LOW not in events
|
||||
|
||||
def test_high_threshold_event_rising_edge(self):
|
||||
"""THRESHOLD_HIGH fires when SOC rises above high_threshold_pct."""
|
||||
cm = _make_charge(low_threshold_pct=15.0, high_threshold_pct=80.0)
|
||||
# Start below high threshold
|
||||
cm.update(50.0, 0.5) # sets was_high = False (50 < 80)
|
||||
events = cm.update(81.0, 0.5) # rises above 80
|
||||
assert ChargeEvent.THRESHOLD_HIGH in events
|
||||
|
||||
def test_no_high_event_if_always_above(self):
|
||||
"""THRESHOLD_HIGH should not fire if SOC starts above threshold."""
|
||||
cm = _make_charge(high_threshold_pct=80.0)
|
||||
# Default _was_high=True; remains above → no edge
|
||||
events = cm.update(90.0, 0.0)
|
||||
assert ChargeEvent.THRESHOLD_HIGH not in events
|
||||
|
||||
def test_is_depleted_property(self):
|
||||
cm = _make_charge(low_threshold_pct=20.0)
|
||||
cm.update(15.0, 0.0)
|
||||
assert cm.is_depleted is True
|
||||
|
||||
def test_is_charged_property(self):
|
||||
cm = _make_charge(high_threshold_pct=80.0)
|
||||
cm.update(85.0, 0.0)
|
||||
assert cm.is_charged is True
|
||||
|
||||
def test_percentage_tracking(self):
|
||||
cm = _make_charge()
|
||||
cm.update(42.5, 0.0)
|
||||
assert cm.percentage == pytest.approx(42.5, abs=1e-6)
|
||||
|
||||
def test_reset_clears_state(self):
|
||||
cm = _make_charge(charging_current_threshold_a=0.1)
|
||||
cm.update(10.0, 0.5) # is_charging=True, is_depleted=True
|
||||
cm.reset()
|
||||
assert cm.is_charging is False
|
||||
assert cm.is_depleted is False
|
||||
assert cm.percentage == pytest.approx(100.0, abs=1e-6)
|
||||
|
||||
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
# DockingStateMachine
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
class TestDockingStateMachineBasic:
|
||||
|
||||
def test_initial_state_is_idle(self):
|
||||
fsm = _make_fsm()
|
||||
assert fsm.state == DockingState.IDLE
|
||||
|
||||
def test_idle_stays_idle_on_good_battery(self):
|
||||
fsm = _make_fsm()
|
||||
out = fsm.tick(_inp(battery_pct=80.0))
|
||||
assert fsm.state == DockingState.IDLE
|
||||
assert out.state_changed is False
|
||||
|
||||
def test_battery_low_triggers_detecting(self):
|
||||
fsm = _make_fsm()
|
||||
out = fsm.tick(_inp(battery_pct=10.0))
|
||||
assert fsm.state == DockingState.DETECTING
|
||||
assert out.mission_interrupted is True
|
||||
assert out.state_changed is True
|
||||
|
||||
def test_dock_request_triggers_detecting(self):
|
||||
fsm = _make_fsm()
|
||||
out = fsm.tick(_inp(battery_pct=80.0, dock_requested=True))
|
||||
assert fsm.state == DockingState.DETECTING
|
||||
assert out.mission_interrupted is False
|
||||
|
||||
def test_dock_request_not_duplicated_mission_interrupted(self):
|
||||
"""Explicit dock request should not set mission_interrupted."""
|
||||
fsm = _make_fsm()
|
||||
out = fsm.tick(_inp(battery_pct=80.0, dock_requested=True))
|
||||
assert out.mission_interrupted is False
|
||||
|
||||
|
||||
class TestDockingStateMachineDetecting:
|
||||
|
||||
def test_no_dock_stays_detecting(self):
|
||||
fsm = _make_fsm()
|
||||
fsm.tick(_inp(dock_requested=True))
|
||||
out = fsm.tick(_inp()) # no dock_pose
|
||||
assert fsm.state == DockingState.DETECTING
|
||||
|
||||
def test_far_dock_goes_to_nav2(self):
|
||||
fsm = _make_fsm(servo_range_m=1.0)
|
||||
fsm.tick(_inp(dock_requested=True))
|
||||
out = fsm.tick(_inp(dock_pose=_far_pose()))
|
||||
assert fsm.state == DockingState.NAV2_APPROACH
|
||||
assert out.request_nav2 is True
|
||||
|
||||
def test_near_dock_skips_nav2_to_servo(self):
|
||||
fsm = _make_fsm(servo_range_m=1.0)
|
||||
fsm.tick(_inp(dock_requested=True))
|
||||
out = fsm.tick(_inp(dock_pose=_near_pose()))
|
||||
assert fsm.state == DockingState.VISUAL_SERVO
|
||||
|
||||
def test_undock_request_aborts_detecting(self):
|
||||
fsm = _make_fsm()
|
||||
fsm.tick(_inp(dock_requested=True))
|
||||
fsm.tick(_inp(undock_requested=True))
|
||||
assert fsm.state == DockingState.IDLE
|
||||
|
||||
|
||||
class TestDockingStateMachineNav2Approach:
|
||||
|
||||
def _reach_nav2(self, fsm):
|
||||
fsm.tick(_inp(dock_requested=True))
|
||||
fsm.tick(_inp(dock_pose=_far_pose()))
|
||||
assert fsm.state == DockingState.NAV2_APPROACH
|
||||
|
||||
def test_nav2_arrived_enters_visual_servo(self):
|
||||
fsm = _make_fsm()
|
||||
self._reach_nav2(fsm)
|
||||
fsm.tick(_inp(nav2_arrived=True))
|
||||
assert fsm.state == DockingState.VISUAL_SERVO
|
||||
|
||||
def test_dock_close_during_nav2_enters_servo(self):
|
||||
fsm = _make_fsm(servo_range_m=1.0)
|
||||
self._reach_nav2(fsm)
|
||||
out = fsm.tick(_inp(dock_pose=_near_pose()))
|
||||
assert fsm.state == DockingState.VISUAL_SERVO
|
||||
assert out.cancel_nav2 is True
|
||||
|
||||
def test_nav2_failed_back_to_detecting(self):
|
||||
fsm = _make_fsm()
|
||||
self._reach_nav2(fsm)
|
||||
fsm.tick(_inp(nav2_failed=True))
|
||||
assert fsm.state == DockingState.DETECTING
|
||||
|
||||
def test_undock_during_nav2_cancels_and_idles(self):
|
||||
fsm = _make_fsm()
|
||||
self._reach_nav2(fsm)
|
||||
out = fsm.tick(_inp(undock_requested=True))
|
||||
assert fsm.state == DockingState.IDLE
|
||||
assert out.cancel_nav2 is True
|
||||
|
||||
|
||||
class TestDockingStateMachineVisualServo:
|
||||
|
||||
def _reach_servo(self, fsm):
|
||||
fsm.tick(_inp(dock_requested=True))
|
||||
fsm.tick(_inp(dock_pose=_near_pose()))
|
||||
assert fsm.state == DockingState.VISUAL_SERVO
|
||||
|
||||
def test_aligned_enters_contact(self):
|
||||
fsm = _make_fsm()
|
||||
self._reach_servo(fsm)
|
||||
fsm.tick(_inp(dock_pose=_near_pose(), servo_aligned=True))
|
||||
assert fsm.state == DockingState.CONTACT
|
||||
|
||||
def test_not_aligned_stays_in_servo(self):
|
||||
fsm = _make_fsm()
|
||||
self._reach_servo(fsm)
|
||||
fsm.tick(_inp(dock_pose=_near_pose(), servo_aligned=False))
|
||||
assert fsm.state == DockingState.VISUAL_SERVO
|
||||
|
||||
def test_lost_detection_timeout_enters_detecting(self):
|
||||
fsm = _make_fsm(lost_ticks_max=3)
|
||||
self._reach_servo(fsm)
|
||||
for _ in range(3): # dock_pose=None each tick
|
||||
fsm.tick(_inp())
|
||||
assert fsm.state == DockingState.DETECTING
|
||||
|
||||
def test_lost_counter_resets_on_rediscovery(self):
|
||||
"""Lost count should reset when pose reappears."""
|
||||
fsm = _make_fsm(lost_ticks_max=3)
|
||||
self._reach_servo(fsm)
|
||||
fsm.tick(_inp()) # 1 lost tick
|
||||
fsm.tick(_inp(dock_pose=_near_pose())) # rediscovered
|
||||
# Need 3 more lost ticks to time out
|
||||
fsm.tick(_inp())
|
||||
fsm.tick(_inp())
|
||||
assert fsm.state == DockingState.VISUAL_SERVO
|
||||
fsm.tick(_inp())
|
||||
assert fsm.state == DockingState.DETECTING
|
||||
|
||||
def test_undock_aborts_visual_servo(self):
|
||||
fsm = _make_fsm()
|
||||
self._reach_servo(fsm)
|
||||
fsm.tick(_inp(undock_requested=True))
|
||||
assert fsm.state == DockingState.IDLE
|
||||
|
||||
|
||||
class TestDockingStateMachineContact:
|
||||
|
||||
def _reach_contact(self, fsm):
|
||||
fsm.tick(_inp(dock_requested=True))
|
||||
fsm.tick(_inp(dock_pose=_near_pose()))
|
||||
fsm.tick(_inp(dock_pose=_near_pose(), servo_aligned=True))
|
||||
assert fsm.state == DockingState.CONTACT
|
||||
|
||||
def test_charging_enters_charging_state(self):
|
||||
fsm = _make_fsm()
|
||||
self._reach_contact(fsm)
|
||||
fsm.tick(_inp(is_charging=True))
|
||||
assert fsm.state == DockingState.CHARGING
|
||||
|
||||
def test_contact_timeout_retries_servo(self):
|
||||
fsm = _make_fsm(contact_timeout_ticks=3)
|
||||
self._reach_contact(fsm)
|
||||
for _ in range(3):
|
||||
fsm.tick(_inp(is_charging=False))
|
||||
assert fsm.state == DockingState.VISUAL_SERVO
|
||||
|
||||
def test_undock_from_contact_to_undocking(self):
|
||||
fsm = _make_fsm()
|
||||
self._reach_contact(fsm)
|
||||
fsm.tick(_inp(undock_requested=True))
|
||||
assert fsm.state == DockingState.UNDOCKING
|
||||
|
||||
|
||||
class TestDockingStateMachineCharging:
|
||||
|
||||
def _reach_charging(self, fsm):
|
||||
fsm.tick(_inp(dock_requested=True))
|
||||
fsm.tick(_inp(dock_pose=_near_pose()))
|
||||
fsm.tick(_inp(dock_pose=_near_pose(), servo_aligned=True))
|
||||
fsm.tick(_inp(is_charging=True))
|
||||
assert fsm.state == DockingState.CHARGING
|
||||
|
||||
def test_full_battery_triggers_undocking(self):
|
||||
fsm = _make_fsm(battery_high_pct=80.0)
|
||||
self._reach_charging(fsm)
|
||||
fsm.tick(_inp(is_charging=True, battery_pct=85.0))
|
||||
assert fsm.state == DockingState.UNDOCKING
|
||||
|
||||
def test_undock_request_during_charging(self):
|
||||
fsm = _make_fsm()
|
||||
self._reach_charging(fsm)
|
||||
fsm.tick(_inp(undock_requested=True))
|
||||
assert fsm.state == DockingState.UNDOCKING
|
||||
|
||||
def test_charging_stays_charging_below_high_threshold(self):
|
||||
fsm = _make_fsm(battery_high_pct=80.0)
|
||||
self._reach_charging(fsm)
|
||||
fsm.tick(_inp(is_charging=True, battery_pct=70.0))
|
||||
assert fsm.state == DockingState.CHARGING
|
||||
|
||||
|
||||
class TestDockingStateMachineUndocking:
|
||||
|
||||
def _reach_undocking_via_charge(self, fsm):
|
||||
fsm.tick(_inp(dock_requested=True))
|
||||
fsm.tick(_inp(dock_pose=_near_pose()))
|
||||
fsm.tick(_inp(dock_pose=_near_pose(), servo_aligned=True))
|
||||
fsm.tick(_inp(is_charging=True))
|
||||
fsm.tick(_inp(is_charging=True, battery_pct=85.0)) # → UNDOCKING
|
||||
assert fsm.state == DockingState.UNDOCKING
|
||||
|
||||
def test_undocking_backs_away(self):
|
||||
fsm = _make_fsm(undock_ticks_max=5)
|
||||
self._reach_undocking_via_charge(fsm)
|
||||
out = fsm.tick(_inp())
|
||||
assert out.cmd_linear < 0.0
|
||||
|
||||
def test_undocking_completes_back_to_idle(self):
|
||||
fsm = _make_fsm(undock_ticks_max=5)
|
||||
self._reach_undocking_via_charge(fsm)
|
||||
for _ in range(5):
|
||||
out = fsm.tick(_inp())
|
||||
assert fsm.state == DockingState.IDLE
|
||||
|
||||
def test_mission_resume_after_battery_auto_dock(self):
|
||||
"""mission_resume must be True when auto-dock cycle completes."""
|
||||
fsm = _make_fsm(undock_ticks_max=5)
|
||||
# Trigger via battery low
|
||||
fsm.tick(_inp(battery_pct=10.0)) # IDLE → DETECTING
|
||||
fsm.tick(_inp(dock_pose=_near_pose())) # → VISUAL_SERVO
|
||||
fsm.tick(_inp(dock_pose=_near_pose(), servo_aligned=True)) # → CONTACT
|
||||
fsm.tick(_inp(is_charging=True)) # → CHARGING
|
||||
fsm.tick(_inp(is_charging=True, battery_pct=85.0)) # → UNDOCKING
|
||||
out = None
|
||||
for _ in range(5):
|
||||
out = fsm.tick(_inp())
|
||||
assert out is not None
|
||||
assert out.mission_resume is True
|
||||
assert fsm.state == DockingState.IDLE
|
||||
|
||||
def test_no_mission_resume_on_explicit_undock(self):
|
||||
"""Explicit undock should not trigger mission_resume."""
|
||||
fsm = _make_fsm(undock_ticks_max=5)
|
||||
fsm.tick(_inp(dock_requested=True)) # explicit dock — no interrupt
|
||||
fsm.tick(_inp(dock_pose=_near_pose()))
|
||||
fsm.tick(_inp(dock_pose=_near_pose(), servo_aligned=True))
|
||||
fsm.tick(_inp(is_charging=True))
|
||||
fsm.tick(_inp(undock_requested=True)) # explicit undock
|
||||
last_out = None
|
||||
for _ in range(5):
|
||||
last_out = fsm.tick(_inp())
|
||||
assert last_out is not None
|
||||
assert last_out.mission_resume is False
|
||||
|
||||
|
||||
class TestDockingStateMachineReset:
|
||||
|
||||
def test_reset_returns_to_idle(self):
|
||||
fsm = _make_fsm()
|
||||
fsm.tick(_inp(dock_requested=True))
|
||||
assert fsm.state == DockingState.DETECTING
|
||||
fsm.reset()
|
||||
assert fsm.state == DockingState.IDLE
|
||||
|
||||
def test_state_changed_flag_on_transition(self):
|
||||
fsm = _make_fsm()
|
||||
out = fsm.tick(_inp(dock_requested=True))
|
||||
assert out.state_changed is True
|
||||
|
||||
def test_no_state_changed_when_same_state(self):
|
||||
fsm = _make_fsm()
|
||||
out = fsm.tick(_inp()) # stays IDLE
|
||||
assert out.state_changed is False
|
||||
16
jetson/ros2_ws/src/saltybot_docking_msgs/CMakeLists.txt
Normal file
16
jetson/ros2_ws/src/saltybot_docking_msgs/CMakeLists.txt
Normal file
@ -0,0 +1,16 @@
|
||||
cmake_minimum_required(VERSION 3.8)
|
||||
project(saltybot_docking_msgs)
|
||||
|
||||
find_package(ament_cmake REQUIRED)
|
||||
find_package(rosidl_default_generators REQUIRED)
|
||||
find_package(builtin_interfaces REQUIRED)
|
||||
|
||||
rosidl_generate_interfaces(${PROJECT_NAME}
|
||||
"msg/DockingStatus.msg"
|
||||
"srv/Dock.srv"
|
||||
"srv/Undock.srv"
|
||||
DEPENDENCIES builtin_interfaces
|
||||
)
|
||||
|
||||
ament_export_dependencies(rosidl_default_runtime)
|
||||
ament_package()
|
||||
@ -0,0 +1,21 @@
|
||||
# DockingStatus.msg — Real-time docking state snapshot (Issue #158)
|
||||
# Published by: /saltybot/docking_node
|
||||
# Topic: /saltybot/docking_status
|
||||
|
||||
builtin_interfaces/Time stamp
|
||||
|
||||
# Current FSM state
|
||||
# Values: "IDLE" | "DETECTING" | "NAV2_APPROACH" | "VISUAL_SERVO" | "CONTACT" | "CHARGING" | "UNDOCKING"
|
||||
string state
|
||||
|
||||
# Dock detection
|
||||
bool dock_detected # ArUco marker or IR beacon visible
|
||||
float32 distance_m # distance to dock (m); NaN if unknown
|
||||
float32 lateral_error_m # lateral offset to dock centre (m); +ve = dock is right
|
||||
|
||||
# Battery / charging
|
||||
float32 battery_pct # state of charge [0–100]
|
||||
bool charging # charge current detected on dock pins
|
||||
|
||||
# Alignment quality (active only during VISUAL_SERVO)
|
||||
bool aligned # within ±5 mm lateral and contact distance
|
||||
22
jetson/ros2_ws/src/saltybot_docking_msgs/package.xml
Normal file
22
jetson/ros2_ws/src/saltybot_docking_msgs/package.xml
Normal file
@ -0,0 +1,22 @@
|
||||
<?xml version="1.0"?>
|
||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="3">
|
||||
<name>saltybot_docking_msgs</name>
|
||||
<version>0.1.0</version>
|
||||
<description>Docking station message and service definitions for SaltyBot (Issue #158)</description>
|
||||
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
|
||||
<license>MIT</license>
|
||||
|
||||
<buildtool_depend>ament_cmake</buildtool_depend>
|
||||
<buildtool_depend>rosidl_default_generators</buildtool_depend>
|
||||
|
||||
<depend>builtin_interfaces</depend>
|
||||
|
||||
<exec_depend>rosidl_default_runtime</exec_depend>
|
||||
|
||||
<member_of_group>rosidl_interface_packages</member_of_group>
|
||||
|
||||
<export>
|
||||
<build_type>ament_cmake</build_type>
|
||||
</export>
|
||||
</package>
|
||||
9
jetson/ros2_ws/src/saltybot_docking_msgs/srv/Dock.srv
Normal file
9
jetson/ros2_ws/src/saltybot_docking_msgs/srv/Dock.srv
Normal file
@ -0,0 +1,9 @@
|
||||
# Dock.srv — Request the robot to autonomously dock (Issue #158)
|
||||
# Service: /saltybot/dock
|
||||
|
||||
# If force=True the current Nav2 mission is cancelled immediately.
|
||||
# If force=False the dock request is honoured only when the robot is idle.
|
||||
bool force
|
||||
---
|
||||
bool success
|
||||
string message
|
||||
9
jetson/ros2_ws/src/saltybot_docking_msgs/srv/Undock.srv
Normal file
9
jetson/ros2_ws/src/saltybot_docking_msgs/srv/Undock.srv
Normal file
@ -0,0 +1,9 @@
|
||||
# Undock.srv — Request the robot to leave the dock (Issue #158)
|
||||
# Service: /saltybot/undock
|
||||
|
||||
# If resume_mission=True and a mission was interrupted for battery, it is
|
||||
# re-queued after the robot backs away from the dock.
|
||||
bool resume_mission
|
||||
---
|
||||
bool success
|
||||
string message
|
||||
Loading…
x
Reference in New Issue
Block a user