diff --git a/jetson/ros2_ws/src/saltybot_docking/config/docking_params.yaml b/jetson/ros2_ws/src/saltybot_docking/config/docking_params.yaml new file mode 100644 index 0000000..20d15fe --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_docking/config/docking_params.yaml @@ -0,0 +1,43 @@ +/**: + ros__parameters: + # Control loop rate (Hz) + control_rate: 20.0 + + # Topic names + image_topic: "/camera/image_raw" + camera_info_topic: "/camera/camera_info" + ir_topic: "/saltybot/ir_beacon" + battery_topic: "/saltybot/battery_state" + + # ── ArUco detection ──────────────────────────────────────────────────────── + aruco_marker_id: 42 # marker ID on the dock face + aruco_marker_size_m: 0.10 # printed marker side length (m) + + # ── IR beacon ───────────────────────────────────────────────────────────── + ir_threshold: 0.50 # amplitude threshold for beacon detection + + # ── Battery thresholds ──────────────────────────────────────────────────── + battery_low_pct: 15.0 # SOC triggering auto-dock (%) + battery_high_pct: 80.0 # SOC declaring charge complete (%) + + # ── Visual servo ────────────────────────────────────────────────────────── + servo_range_m: 1.00 # switch from Nav2 to IBVS at this distance (m) + k_linear: 0.30 # distance P-gain + k_angular: 0.80 # yaw P-gain + lateral_tol_m: 0.005 # ±5 mm alignment tolerance + contact_distance_m: 0.05 # distance below which contact is assumed (m) + max_linear_ms: 0.10 # forward speed ceiling during servo (m/s) + max_angular_rads: 0.30 # yaw rate ceiling (rad/s) + + # ── Undocking maneuver ──────────────────────────────────────────────────── + undock_speed_ms: -0.20 # reverse speed (m/s; must be negative) + undock_ticks_max: 50 # ticks at undock speed (2.5 s at 20 Hz) + + # ── FSM timeouts ────────────────────────────────────────────────────────── + lost_ticks_max: 40 # ticks of lost detection before retry (2 s) + contact_timeout_ticks: 20 # ticks at CONTACT without charging before retry + + # ── Nav2 waypoints [x, y, z, qx, qy, qz, qw] in map frame ─────────────── + # Set these to the actual dock and pre-dock poses in your environment. + dock_pose: [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0] + pre_dock_pose: [0.0, -1.0, 0.0, 0.0, 0.0, 0.0, 1.0] # 1 m in front of dock diff --git a/jetson/ros2_ws/src/saltybot_docking/launch/docking.launch.py b/jetson/ros2_ws/src/saltybot_docking/launch/docking.launch.py new file mode 100644 index 0000000..15b7e5a --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_docking/launch/docking.launch.py @@ -0,0 +1,53 @@ +""" +docking.launch.py — Launch the autonomous docking node (Issue #158). + +Usage +----- + ros2 launch saltybot_docking docking.launch.py + ros2 launch saltybot_docking docking.launch.py \ + battery_low_pct:=15.0 servo_range_m:=1.0 +""" + +import os + +from ament_index_python.packages import get_package_share_directory +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument +from launch.substitutions import LaunchConfiguration +from launch_ros.actions import Node + + +def generate_launch_description(): + pkg_share = get_package_share_directory("saltybot_docking") + default_params = os.path.join(pkg_share, "config", "docking_params.yaml") + + return LaunchDescription([ + DeclareLaunchArgument( + "params_file", + default_value=default_params, + description="Path to docking_params.yaml", + ), + DeclareLaunchArgument( + "battery_low_pct", + default_value="15.0", + description="SOC threshold to trigger auto-dock (%)", + ), + DeclareLaunchArgument( + "servo_range_m", + default_value="1.0", + description="Switch to visual servo within this distance (m)", + ), + Node( + package="saltybot_docking", + executable="docking_node", + name="docking", + output="screen", + parameters=[ + LaunchConfiguration("params_file"), + { + "battery_low_pct": LaunchConfiguration("battery_low_pct"), + "servo_range_m": LaunchConfiguration("servo_range_m"), + }, + ], + ), + ]) diff --git a/jetson/ros2_ws/src/saltybot_docking/package.xml b/jetson/ros2_ws/src/saltybot_docking/package.xml new file mode 100644 index 0000000..02ff45d --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_docking/package.xml @@ -0,0 +1,24 @@ + + + + saltybot_docking + 0.1.0 + Docking station auto-return with ArUco/IR detection and visual servo (Issue #158) + sl-controls + MIT + + rclpy + sensor_msgs + geometry_msgs + std_msgs + nav2_msgs + + ament_copyright + ament_flake8 + ament_pep257 + pytest + + + ament_python + + diff --git a/jetson/ros2_ws/src/saltybot_docking/resource/saltybot_docking b/jetson/ros2_ws/src/saltybot_docking/resource/saltybot_docking new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_docking/saltybot_docking/__init__.py b/jetson/ros2_ws/src/saltybot_docking/saltybot_docking/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_docking/saltybot_docking/charge_monitor.py b/jetson/ros2_ws/src/saltybot_docking/saltybot_docking/charge_monitor.py new file mode 100644 index 0000000..7874b13 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_docking/saltybot_docking/charge_monitor.py @@ -0,0 +1,137 @@ +""" +charge_monitor.py — Battery charge monitoring for auto-dock return (Issue #158). + +Consumes BatteryState data (voltage, current, percentage) each control tick +and emits discrete ChargeEvents when meaningful thresholds are crossed. + +Events +────── + CHARGING_STARTED : charge current rises above charging_current_threshold_a + (docking pins making contact and charger active) + CHARGING_STOPPED : charge current drops back to near-zero (unplugged or + charge complete at hardware level) + THRESHOLD_LOW : SOC dropped below low_threshold_pct → trigger auto-dock + THRESHOLD_HIGH : SOC rose above high_threshold_pct → resume mission + +Each event is emitted at most once per crossing (edge-triggered). + +Pure module — no ROS2 dependency. +""" + +from __future__ import annotations + +from enum import Enum, auto +from typing import List + + +# ── Events ──────────────────────────────────────────────────────────────────── + +class ChargeEvent(Enum): + CHARGING_STARTED = auto() + CHARGING_STOPPED = auto() + THRESHOLD_LOW = auto() + THRESHOLD_HIGH = auto() + + +# ── ChargeMonitor ───────────────────────────────────────────────────────────── + +class ChargeMonitor: + """ + Edge-triggered monitor for battery charging state and SOC thresholds. + + Parameters + ---------- + low_threshold_pct : SOC below which THRESHOLD_LOW fires (default 15 %) + high_threshold_pct : SOC above which THRESHOLD_HIGH fires (default 80 %) + charging_current_threshold_a : minimum current (A) to declare charging + (positive = charging convention) + """ + + def __init__( + self, + low_threshold_pct: float = 15.0, + high_threshold_pct: float = 80.0, + charging_current_threshold_a: float = 0.10, + ): + self._low_pct = float(low_threshold_pct) + self._high_pct = float(high_threshold_pct) + self._i_thresh = float(charging_current_threshold_a) + + # Internal state (edge-detection flags) + self._is_charging: bool = False + self._was_low: bool = False # True while SOC is in low zone + self._was_high: bool = True # True while SOC is in high zone + self._percentage: float = 100.0 + + # ── Properties ──────────────────────────────────────────────────────────── + + @property + def is_charging(self) -> bool: + """True when charge current has been detected.""" + return self._is_charging + + @property + def percentage(self) -> float: + """Most recently observed state of charge (0–100 %).""" + return self._percentage + + @property + def is_depleted(self) -> bool: + """True when SOC is currently below the low threshold.""" + return self._percentage < self._low_pct + + @property + def is_charged(self) -> bool: + """True when SOC is currently above the high threshold.""" + return self._percentage >= self._high_pct + + # ── Update ──────────────────────────────────────────────────────────────── + + def update( + self, + percentage: float, + current_a: float, + ) -> List[ChargeEvent]: + """ + Process one sample from BatteryState. + + Parameters + ---------- + percentage : state of charge 0–100 (from BatteryState.percentage × 100) + current_a : charge/discharge current (A); positive = charging + + Returns + ------- + List of ChargeEvents (may be empty; never duplicates per edge). + """ + events: List[ChargeEvent] = [] + self._percentage = float(percentage) + now_charging = current_a >= self._i_thresh + + # ── Charging edge detection ─────────────────────────────────────────── + if now_charging and not self._is_charging: + events.append(ChargeEvent.CHARGING_STARTED) + elif not now_charging and self._is_charging: + events.append(ChargeEvent.CHARGING_STOPPED) + self._is_charging = now_charging + + # ── Low threshold — edge on falling through ─────────────────────────── + in_low_zone = self._percentage < self._low_pct + if in_low_zone and not self._was_low: + events.append(ChargeEvent.THRESHOLD_LOW) + self._was_low = in_low_zone + + # ── High threshold — edge on rising through ─────────────────────────── + in_high_zone = self._percentage >= self._high_pct + if in_high_zone and not self._was_high: + events.append(ChargeEvent.THRESHOLD_HIGH) + self._was_high = in_high_zone + + return events + + def reset(self) -> None: + """Clear all state (use when node restarts).""" + self._is_charging = False + self._was_low = False + self._was_high = True + self._percentage = 100.0 diff --git a/jetson/ros2_ws/src/saltybot_docking/saltybot_docking/dock_detector.py b/jetson/ros2_ws/src/saltybot_docking/saltybot_docking/dock_detector.py new file mode 100644 index 0000000..22f080e --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_docking/saltybot_docking/dock_detector.py @@ -0,0 +1,210 @@ +""" +dock_detector.py — ArUco marker + IR beacon detection for docking (Issue #158). + +Detection sources +───────────────── + ArUco marker : cv2.aruco marker on the dock face; PnP solve gives full 3-D + pose relative to the robot camera. Marker ID and physical + size are configurable. + + IR beacon : single ADC channel (normalised 0–1) with envelope follower + and amplitude threshold. Provides presence detection only + (no pose), used as a fallback or range-alert. + +Outputs +─────── + DockPose(distance_m, yaw_rad, lateral_m, source) + distance_m — straight-line distance to dock marker centre (m) + yaw_rad — bearing error (rad); +ve = dock is right of camera axis (tx > 0) + lateral_m — lateral offset (m) in camera frame; +ve = dock is right + source — "aruco" | "ir" + + ArucoDetector.detect() returns Optional[DockPose]; None if marker absent or + OpenCV / cv2.aruco are unavailable at runtime. + + IRBeaconDetector.update(sample) returns bool (True = beacon detected). + +Pure module — no ROS2 dependency. +""" + +from __future__ import annotations + +import math +from dataclasses import dataclass +from typing import Optional + + +# ── DockPose ───────────────────────────────────────────────────────────────── + +@dataclass +class DockPose: + """Dock pose in the robot's camera frame.""" + distance_m: float # straight-line distance to marker (m) + yaw_rad: float # bearing error (rad); +ve = dock is right of camera axis (tx > 0) + lateral_m: float # lateral offset (m) in camera X; +ve = dock is right + source: str # "aruco" | "ir" + + +# ── ArucoDetector ───────────────────────────────────────────────────────────── + +class ArucoDetector: + """ + Detect a single ArUco marker and solve its 3-D pose with PnP. + + Parameters + ---------- + marker_id : ArUco marker ID placed on the dock face (default 42) + marker_size_m : physical side length of the printed marker (m) + aruco_dict_id : cv2.aruco dictionary constant (default DICT_4X4_50 = 0) + """ + + MARKER_ID_DEFAULT = 42 + MARKER_SIZE_DEFAULT = 0.10 # 10 cm + + def __init__( + self, + marker_id: int = MARKER_ID_DEFAULT, + marker_size_m: float = MARKER_SIZE_DEFAULT, + aruco_dict_id: int = 0, # cv2.aruco.DICT_4X4_50 + ): + self._marker_id = marker_id + self._marker_size = marker_size_m + self._dict_id = aruco_dict_id + self._ready: Optional[bool] = None # None = not yet attempted + + # Set by _init(); None when OpenCV unavailable + self._cv2 = None + self._detector = None + self._obj_pts = None + + # ── Initialisation ──────────────────────────────────────────────────────── + + def _init(self) -> bool: + """Lazy-initialise cv2.aruco. Returns False if unavailable.""" + if self._ready is not None: + return self._ready + try: + import cv2 + import numpy as np + aruco_dict = cv2.aruco.getPredefinedDictionary(self._dict_id) + aruco_params = cv2.aruco.DetectorParameters() + self._detector = cv2.aruco.ArucoDetector(aruco_dict, aruco_params) + half = self._marker_size / 2.0 + self._obj_pts = np.array([ + [-half, half, 0.0], + [ half, half, 0.0], + [ half, -half, 0.0], + [-half, -half, 0.0], + ], dtype=np.float64) + self._cv2 = cv2 + self._ready = True + except Exception: + self._ready = False + return self._ready + + # ── Public API ──────────────────────────────────────────────────────────── + + @property + def available(self) -> bool: + """True if cv2.aruco is importable and initialised.""" + return self._init() + + def detect( + self, + image, # numpy.ndarray H×W×3 BGR (or greyscale) + camera_matrix, # numpy.ndarray 3×3 + dist_coeffs, # numpy.ndarray 1×4 or 1×5 + ) -> Optional[DockPose]: + """ + Detect the dock marker in *image* and return its pose. + + Returns None if the target marker is absent, PnP fails, or OpenCV is + not available. + """ + if not self._init(): + return None + + try: + corners, ids, _ = self._detector.detectMarkers(image) + except Exception: + return None + + if ids is None or len(ids) == 0: + return None + + import numpy as np + for i, mid in enumerate(ids.flatten()): + if int(mid) != self._marker_id: + continue + img_pts = corners[i].reshape(4, 2).astype(np.float64) + ok, rvec, tvec = self._cv2.solvePnP( + self._obj_pts, img_pts, + camera_matrix, dist_coeffs, + flags=self._cv2.SOLVEPNP_IPPE_SQUARE, + ) + if not ok: + continue + tx = float(tvec[0]) + ty = float(tvec[1]) + tz = float(tvec[2]) + distance = math.sqrt(tx * tx + ty * ty + tz * tz) + yaw = math.atan2(tx, tz) # camera-x deviation from optical axis + return DockPose( + distance_m=distance, + yaw_rad=yaw, + lateral_m=tx, + source="aruco", + ) + return None + + +# ── IRBeaconDetector ────────────────────────────────────────────────────────── + +class IRBeaconDetector: + """ + IR beacon presence detector from a single normalised ADC channel. + + Maintains an exponential envelope follower. When the smoothed amplitude + exceeds *threshold* the beacon is considered detected. + + Parameters + ---------- + threshold : amplitude above which beacon is 'detected' (default 0.5) + smoothing_alpha : EMA coefficient ∈ (0, 1]; 1 = no smoothing (default 0.3) + """ + + def __init__( + self, + threshold: float = 0.5, + smoothing_alpha: float = 0.3, + ): + self._threshold = max(1e-9, float(threshold)) + self._alpha = max(1e-9, min(1.0, smoothing_alpha)) + self._envelope = 0.0 + + # ── Properties ──────────────────────────────────────────────────────────── + + @property + def envelope(self) -> float: + """Current EMA envelope value.""" + return self._envelope + + @property + def detected(self) -> bool: + """True when smoothed amplitude ≥ threshold.""" + return self._envelope >= self._threshold + + # ── Update ──────────────────────────────────────────────────────────────── + + def update(self, sample: float) -> bool: + """ + Submit one ADC sample (normalised 0–1 or raw voltage). + Returns True if the beacon is currently detected. + """ + amp = abs(sample) + self._envelope = (1.0 - self._alpha) * self._envelope + self._alpha * amp + return self.detected + + def reset(self) -> None: + """Clear envelope to zero (e.g. when node restarts).""" + self._envelope = 0.0 diff --git a/jetson/ros2_ws/src/saltybot_docking/saltybot_docking/docking_node.py b/jetson/ros2_ws/src/saltybot_docking/saltybot_docking/docking_node.py new file mode 100644 index 0000000..f60151d --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_docking/saltybot_docking/docking_node.py @@ -0,0 +1,482 @@ +""" +docking_node.py — Autonomous docking station return (Issue #158). + +Overview +──────── + Orchestrates dock detection, Nav2 corridor approach, visual-servo final + alignment, and charge monitoring. Interrupts the active Nav2 mission when + battery drops to 15 % and resumes when charged to 80 %. + +Pipeline (20 Hz) +──────────────── + 1. Camera callback → ArucoDetector → DockPose (if marker visible) + 2. IR callback → IRBeaconDetector → presence flag (fallback) + 3. Battery callback→ ChargeMonitor → ChargeEvents + 4. 20 Hz timer → DockingStateMachine.tick() + → publish cmd_vel (VISUAL_SERVO + UNDOCKING) + → Nav2 action calls (NAV2_APPROACH) + → publish DockingStatus + +Subscribes +────────── + sensor_msgs/Image — dock camera + sensor_msgs/CameraInfo — intrinsics for PnP + std_msgs/Float32 — IR beacon ADC sample + sensor_msgs/BatteryState — SOC + current + +Publishes +───────── + /saltybot/docking_status saltybot_docking_msgs/DockingStatus + /saltybot/docking_cmd_vel geometry_msgs/Twist — servo + undock commands + /saltybot/resume_mission std_msgs/Bool — trigger mission resume + +Services +──────── + /saltybot/dock saltybot_docking_msgs/Dock + /saltybot/undock saltybot_docking_msgs/Undock + +Action client +───────────── + /navigate_to_pose nav2_msgs/action/NavigateToPose (optional) + +Parameters +────────── + control_rate 20.0 Hz + image_topic /camera/image_raw + camera_info_topic /camera/camera_info + ir_topic /saltybot/ir_beacon + battery_topic /saltybot/battery_state + aruco_marker_id 42 + aruco_marker_size_m 0.10 + ir_threshold 0.50 + battery_low_pct 15.0 + battery_high_pct 80.0 + servo_range_m 1.00 m (switch to IBVS within this distance) + k_linear 0.30 + k_angular 0.80 + lateral_tol_m 0.005 m (±5 mm alignment tolerance) + contact_distance_m 0.05 m + max_linear_ms 0.10 m/s + max_angular_rads 0.30 rad/s + undock_speed_ms -0.20 m/s + undock_ticks_max 50 (2.5 s at 20 Hz) + lost_ticks_max 40 (2.0 s at 20 Hz) + contact_timeout_ticks 20 (1.0 s at 20 Hz) + dock_pose [0,0,0,0,0,0,1] [x,y,z,qx,qy,qz,qw] map frame + pre_dock_pose [0,0,0,0,0,0,1] 1 m in front of dock +""" + +import json +import math + +import rclpy +from rclpy.action import ActionClient +from rclpy.node import Node +from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy + +from geometry_msgs.msg import PoseStamped, Twist +from sensor_msgs.msg import BatteryState, CameraInfo, Image +from std_msgs.msg import Bool, Float32 + +from saltybot_docking.charge_monitor import ChargeEvent, ChargeMonitor +from saltybot_docking.dock_detector import ArucoDetector, DockPose, IRBeaconDetector +from saltybot_docking.docking_state_machine import ( + DockingInputs, + DockingState, + DockingStateMachine, +) +from saltybot_docking.visual_servo import VisualServo + +try: + from saltybot_docking_msgs.msg import DockingStatus + from saltybot_docking_msgs.srv import Dock, Undock + _MSGS_OK = True +except ImportError: + _MSGS_OK = False + +try: + from nav2_msgs.action import NavigateToPose + _NAV2_OK = True +except ImportError: + _NAV2_OK = False + +try: + import cv2 + import numpy as np + _CV2_OK = True +except ImportError: + _CV2_OK = False + + +# ── Helper ──────────────────────────────────────────────────────────────────── + +def _pose_param_to_stamped(vals: list, frame_id: str, stamp) -> PoseStamped: + """Convert [x,y,z,qx,qy,qz,qw] list to PoseStamped.""" + ps = PoseStamped() + ps.header.frame_id = frame_id + ps.header.stamp = stamp + if len(vals) >= 7: + ps.pose.position.x = float(vals[0]) + ps.pose.position.y = float(vals[1]) + ps.pose.position.z = float(vals[2]) + ps.pose.orientation.x = float(vals[3]) + ps.pose.orientation.y = float(vals[4]) + ps.pose.orientation.z = float(vals[5]) + ps.pose.orientation.w = float(vals[6]) + return ps + + +# ── Node ────────────────────────────────────────────────────────────────────── + +class DockingNode(Node): + + def __init__(self): + super().__init__("docking") + + # ── Parameters ─────────────────────────────────────────────────────── + self._declare_params() + p = self._load_params() + + # ── Sub-systems ────────────────────────────────────────────────────── + self._aruco = ArucoDetector( + marker_id=p["aruco_marker_id"], + marker_size_m=p["aruco_marker_size_m"], + ) + self._ir = IRBeaconDetector( + threshold=p["ir_threshold"], + ) + self._servo = VisualServo( + k_linear=p["k_linear"], + k_angular=p["k_angular"], + lateral_tol_m=p["lateral_tol_m"], + contact_distance_m=p["contact_distance_m"], + max_linear_ms=p["max_linear_ms"], + max_angular_rads=p["max_angular_rads"], + ) + self._charge = ChargeMonitor( + low_threshold_pct=p["battery_low_pct"], + high_threshold_pct=p["battery_high_pct"], + ) + self._fsm = DockingStateMachine( + battery_low_pct=p["battery_low_pct"], + battery_high_pct=p["battery_high_pct"], + servo_range_m=p["servo_range_m"], + undock_speed_ms=p["undock_speed_ms"], + undock_ticks_max=p["undock_ticks_max"], + lost_ticks_max=p["lost_ticks_max"], + contact_timeout_ticks=p["contact_timeout_ticks"], + ) + + # ── Mutable state ──────────────────────────────────────────────────── + self._latest_dock_pose: DockPose | None = None + self._latest_servo_aligned: bool = False + self._nav2_arrived: bool = False + self._nav2_failed: bool = False + self._dock_requested: bool = False + self._undock_requested: bool = False + self._camera_matrix = None + self._dist_coeffs = None + self._nav2_goal_handle = None + + # ── QoS ────────────────────────────────────────────────────────────── + reliable = QoSProfile( + reliability=ReliabilityPolicy.RELIABLE, + history=HistoryPolicy.KEEP_LAST, + depth=10, + ) + best_effort = QoSProfile( + reliability=ReliabilityPolicy.BEST_EFFORT, + history=HistoryPolicy.KEEP_LAST, + depth=1, + ) + + # ── Subscriptions ──────────────────────────────────────────────────── + self.create_subscription(Image, p["image_topic"], self._image_cb, best_effort) + self.create_subscription(CameraInfo, p["camera_info_topic"], self._cam_info_cb, reliable) + self.create_subscription(Float32, p["ir_topic"], self._ir_cb, best_effort) + self.create_subscription(BatteryState, p["battery_topic"], self._battery_cb, reliable) + + # ── Publishers ─────────────────────────────────────────────────────── + self._cmd_vel_pub = self.create_publisher( + Twist, "/saltybot/docking_cmd_vel", reliable + ) + self._resume_pub = self.create_publisher( + Bool, "/saltybot/resume_mission", reliable + ) + self._status_pub = None + if _MSGS_OK: + self._status_pub = self.create_publisher( + DockingStatus, "/saltybot/docking_status", reliable + ) + + # ── Services ───────────────────────────────────────────────────────── + if _MSGS_OK: + self.create_service(Dock, "/saltybot/dock", self._dock_srv) + self.create_service(Undock, "/saltybot/undock", self._undock_srv) + + # ── Nav2 action client (optional) ──────────────────────────────────── + self._nav2_client = None + if _NAV2_OK: + self._nav2_client = ActionClient(self, NavigateToPose, "navigate_to_pose") + + # ── Timer ──────────────────────────────────────────────────────────── + rate = p["control_rate"] + self._timer = self.create_timer(1.0 / rate, self._control_cb) + + self.get_logger().info( + f"DockingNode ready rate={rate}Hz " + f"nav2={'yes' if _NAV2_OK else 'no'} " + f"aruco={'yes' if _CV2_OK else 'no'}" + ) + + # ── Parameters ──────────────────────────────────────────────────────────── + + def _declare_params(self) -> None: + self.declare_parameter("control_rate", 20.0) + self.declare_parameter("image_topic", "/camera/image_raw") + self.declare_parameter("camera_info_topic", "/camera/camera_info") + self.declare_parameter("ir_topic", "/saltybot/ir_beacon") + self.declare_parameter("battery_topic", "/saltybot/battery_state") + self.declare_parameter("aruco_marker_id", 42) + self.declare_parameter("aruco_marker_size_m", 0.10) + self.declare_parameter("ir_threshold", 0.50) + self.declare_parameter("battery_low_pct", 15.0) + self.declare_parameter("battery_high_pct", 80.0) + self.declare_parameter("servo_range_m", 1.00) + self.declare_parameter("k_linear", 0.30) + self.declare_parameter("k_angular", 0.80) + self.declare_parameter("lateral_tol_m", 0.005) + self.declare_parameter("contact_distance_m", 0.05) + self.declare_parameter("max_linear_ms", 0.10) + self.declare_parameter("max_angular_rads", 0.30) + self.declare_parameter("undock_speed_ms", -0.20) + self.declare_parameter("undock_ticks_max", 50) + self.declare_parameter("lost_ticks_max", 40) + self.declare_parameter("contact_timeout_ticks", 20) + self.declare_parameter("dock_pose", [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0]) + self.declare_parameter("pre_dock_pose", [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0]) + + def _load_params(self) -> dict: + g = self.get_parameter + keys = [ + "control_rate", + "image_topic", "camera_info_topic", "ir_topic", "battery_topic", + "aruco_marker_id", "aruco_marker_size_m", "ir_threshold", + "battery_low_pct", "battery_high_pct", + "servo_range_m", "k_linear", "k_angular", + "lateral_tol_m", "contact_distance_m", "max_linear_ms", "max_angular_rads", + "undock_speed_ms", "undock_ticks_max", "lost_ticks_max", "contact_timeout_ticks", + "dock_pose", "pre_dock_pose", + ] + return {k: g(k).value for k in keys} + + # ── Subscriptions ───────────────────────────────────────────────────────── + + def _cam_info_cb(self, msg: CameraInfo) -> None: + if _CV2_OK and self._camera_matrix is None: + self._camera_matrix = np.array(msg.k, dtype=np.float64).reshape(3, 3) + self._dist_coeffs = np.array(msg.d, dtype=np.float64) + + def _image_cb(self, msg: Image) -> None: + if not _CV2_OK or self._camera_matrix is None: + return + try: + img = self._ros_image_to_bgr(msg) + except Exception: + return + pose = self._aruco.detect(img, self._camera_matrix, self._dist_coeffs) + if pose is not None: + self._latest_dock_pose = pose + # Update visual servo for aligned flag + servo_result = self._servo.update(pose) + self._latest_servo_aligned = servo_result.aligned + + def _ir_cb(self, msg: Float32) -> None: + detected = self._ir.update(msg.data) + # IR gives presence only — supply a synthetic far DockPose if no ArUco + if detected and self._latest_dock_pose is None: + # Beacon says dock is ahead, distance unknown; use sentinel 5.0 m + self._latest_dock_pose = DockPose( + distance_m=5.0, yaw_rad=0.0, lateral_m=0.0, source="ir" + ) + + def _battery_cb(self, msg: BatteryState) -> None: + pct = float(msg.percentage) * 100.0 if msg.percentage <= 1.0 else float(msg.percentage) + current = float(msg.current) + events = self._charge.update(pct, current) + for ev in events: + if ev == ChargeEvent.THRESHOLD_LOW: + self.get_logger().warn( + f"Battery low ({pct:.1f}%) — initiating auto-dock" + ) + elif ev == ChargeEvent.CHARGING_STARTED: + self.get_logger().info("Charging started") + elif ev == ChargeEvent.THRESHOLD_HIGH: + self.get_logger().info( + f"Battery charged ({pct:.1f}%) — ready to undock" + ) + + # ── 20 Hz control loop ──────────────────────────────────────────────────── + + def _control_cb(self) -> None: + inp = DockingInputs( + battery_pct=self._charge.percentage, + is_charging=self._charge.is_charging, + dock_pose=self._latest_dock_pose, + servo_aligned=self._latest_servo_aligned, + dock_requested=self._dock_requested, + undock_requested=self._undock_requested, + nav2_arrived=self._nav2_arrived, + nav2_failed=self._nav2_failed, + ) + # Consume one-shot flags + self._dock_requested = False + self._undock_requested = False + self._nav2_arrived = False + self._nav2_failed = False + # Clear dock pose so stale detections don't persist past one tick + self._latest_dock_pose = None + self._latest_servo_aligned = False + + out = self._fsm.tick(inp) + + # ── State-entry side effects ────────────────────────────────────────── + if out.state_changed: + self.get_logger().info(f"Docking FSM → {out.state.value}") + + if out.request_nav2: + self._send_nav2_goal() + if out.cancel_nav2: + self._cancel_nav2_goal() + + if out.mission_interrupted: + self.get_logger().warn("Mission interrupted for auto-dock (battery low)") + + if out.mission_resume: + msg = Bool() + msg.data = True + self._resume_pub.publish(msg) + self.get_logger().info("Auto-dock complete — resuming mission") + + # ── Velocity commands ───────────────────────────────────────────────── + if out.state in (DockingState.VISUAL_SERVO, DockingState.UNDOCKING): + twist = Twist() + twist.linear.x = out.cmd_linear + twist.angular.z = out.cmd_angular + self._cmd_vel_pub.publish(twist) + + # ── Status ──────────────────────────────────────────────────────────── + if self._status_pub is not None: + self._publish_status(inp, out) + + # ── Nav2 integration ────────────────────────────────────────────────────── + + def _send_nav2_goal(self) -> None: + if not _NAV2_OK or self._nav2_client is None: + self.get_logger().warn("Nav2 unavailable — skipping pre-dock approach") + self._nav2_arrived = True # skip straight to VISUAL_SERVO next tick + return + if not self._nav2_client.wait_for_server(timeout_sec=1.0): + self.get_logger().warn("Nav2 action server not ready — skipping approach") + self._nav2_arrived = True + return + p = self._load_params() + goal = NavigateToPose.Goal() + goal.pose = _pose_param_to_stamped( + p["pre_dock_pose"], "map", self.get_clock().now().to_msg() + ) + send_future = self._nav2_client.send_goal_async(goal) + send_future.add_done_callback(self._nav2_goal_response_cb) + + def _nav2_goal_response_cb(self, future) -> None: + goal_handle = future.result() + if not goal_handle or not goal_handle.accepted: + self.get_logger().warn("Nav2 goal rejected") + self._nav2_failed = True + return + self._nav2_goal_handle = goal_handle + result_future = goal_handle.get_result_async() + result_future.add_done_callback(self._nav2_result_cb) + + def _nav2_result_cb(self, future) -> None: + result = future.result() + if result and result.status == 4: # SUCCEEDED + self._nav2_arrived = True + else: + self._nav2_failed = True + + def _cancel_nav2_goal(self) -> None: + if self._nav2_goal_handle is not None: + self._nav2_goal_handle.cancel_goal_async() + self._nav2_goal_handle = None + + # ── Services ────────────────────────────────────────────────────────────── + + def _dock_srv(self, request, response): + if self._fsm.state not in (DockingState.IDLE,) and not request.force: + response.success = False + response.message = f"Already in state {self._fsm.state.value}; use force=True to override" + return response + self._dock_requested = True + response.success = True + response.message = "Docking initiated" + return response + + def _undock_srv(self, request, response): + if self._fsm.state == DockingState.IDLE: + response.success = False + response.message = "Already idle" + return response + self._undock_requested = True + response.success = True + response.message = "Undocking initiated" + return response + + # ── Publishing ──────────────────────────────────────────────────────────── + + def _publish_status(self, inp: DockingInputs, out) -> None: + msg = DockingStatus() + msg.stamp = self.get_clock().now().to_msg() + msg.state = out.state.value + msg.dock_detected = inp.dock_pose is not None + msg.distance_m = float(inp.dock_pose.distance_m) if inp.dock_pose else float("nan") + msg.lateral_error_m = float(inp.dock_pose.lateral_m) if inp.dock_pose else float("nan") + msg.battery_pct = float(self._charge.percentage) + msg.charging = self._charge.is_charging + msg.aligned = inp.servo_aligned + self._status_pub.publish(msg) + + # ── Utilities ───────────────────────────────────────────────────────────── + + @staticmethod + def _ros_image_to_bgr(msg: Image): + """Convert sensor_msgs/Image to cv2 BGR numpy array.""" + import numpy as np + data = np.frombuffer(msg.data, dtype=np.uint8) + if msg.encoding in ("bgr8", "rgb8"): + img = data.reshape(msg.height, msg.width, 3) + if msg.encoding == "rgb8": + img = img[:, :, ::-1].copy() + elif msg.encoding == "mono8": + img = data.reshape(msg.height, msg.width) + else: + img = data.reshape(msg.height, msg.width, -1) + return img + + +# ── Entry point ─────────────────────────────────────────────────────────────── + +def main(args=None): + rclpy.init(args=args) + node = DockingNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.try_shutdown() + + +if __name__ == "__main__": + main() diff --git a/jetson/ros2_ws/src/saltybot_docking/saltybot_docking/docking_state_machine.py b/jetson/ros2_ws/src/saltybot_docking/saltybot_docking/docking_state_machine.py new file mode 100644 index 0000000..d131d11 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_docking/saltybot_docking/docking_state_machine.py @@ -0,0 +1,257 @@ +""" +docking_state_machine.py — FSM for autonomous docking / undocking (Issue #158). + +States +────── + IDLE : normal operation; watching battery. + DETECTING : scanning for dock marker or IR beacon. + NAV2_APPROACH : Nav2 navigating to pre-dock waypoint (~1 m from dock face). + VISUAL_SERVO : IBVS final approach (<= servo_range_m to dock). + CONTACT : dock pins touched; waiting for charge current. + CHARGING : charging in progress. + UNDOCKING : backing away from dock at fixed reverse speed. + +Transitions +─────────── + IDLE → DETECTING : battery_pct < battery_low_pct OR dock_requested + DETECTING → NAV2_APPROACH : dock_pose found AND distance > servo_range_m + DETECTING → VISUAL_SERVO : dock_pose found AND distance ≤ servo_range_m + NAV2_APPROACH → VISUAL_SERVO : dock_pose found AND distance ≤ servo_range_m + OR nav2_arrived + NAV2_APPROACH → DETECTING : nav2_failed (retry detection) + VISUAL_SERVO → CONTACT : servo_aligned + VISUAL_SERVO → DETECTING : dock lost for > lost_ticks_max ticks + CONTACT → CHARGING : is_charging + CONTACT → VISUAL_SERVO : contact_timeout_ticks exceeded (retry) + CHARGING → UNDOCKING : battery_pct >= battery_high_pct + OR undock_requested + UNDOCKING → IDLE : undock_ticks >= undock_ticks_max + +Any state → IDLE : undock_requested (except CHARGING → UNDOCKING) +DETECTING → IDLE : undock_requested (abort scan) + +Output signals +────────────── + cmd_linear, cmd_angular : velocity during VISUAL_SERVO and UNDOCKING + request_nav2 : send Nav2 goal (emitted once on ENTRY to NAV2_APPROACH) + cancel_nav2 : cancel Nav2 goal + mission_interrupted : battery forced dock (for mission planner) + mission_resume : auto-resume after charge cycle completes + state_changed : FSM state changed this tick + +Pure module — no ROS2 dependency. +""" + +from __future__ import annotations + +import math +from dataclasses import dataclass, field +from enum import Enum +from typing import Optional + +from saltybot_docking.dock_detector import DockPose + + +# ── States ──────────────────────────────────────────────────────────────────── + +class DockingState(Enum): + IDLE = "IDLE" + DETECTING = "DETECTING" + NAV2_APPROACH = "NAV2_APPROACH" + VISUAL_SERVO = "VISUAL_SERVO" + CONTACT = "CONTACT" + CHARGING = "CHARGING" + UNDOCKING = "UNDOCKING" + + +# ── I/O dataclasses ─────────────────────────────────────────────────────────── + +@dataclass +class DockingInputs: + """Sensor snapshot fed to the FSM each tick.""" + battery_pct: float = 100.0 + is_charging: bool = False + dock_pose: Optional[DockPose] = None # None = dock not visible + servo_aligned: bool = False # visual servo declared aligned + dock_requested: bool = False # explicit /saltybot/dock call + undock_requested: bool = False # explicit /saltybot/undock call + nav2_arrived: bool = False # Nav2 action succeeded + nav2_failed: bool = False # Nav2 action failed/cancelled + + +@dataclass +class DockingOutputs: + """Commands and signals produced by the FSM each tick.""" + state: DockingState = DockingState.IDLE + cmd_linear: float = 0.0 # m/s (VISUAL_SERVO + UNDOCKING) + cmd_angular: float = 0.0 # rad/s (VISUAL_SERVO) + request_nav2: bool = False # send Nav2 goal + cancel_nav2: bool = False # cancel Nav2 goal + mission_interrupted: bool = False # battery dock forced + mission_resume: bool = False # charge cycle complete, resume + state_changed: bool = False # FSM state changed this tick + + +# ── DockingStateMachine ─────────────────────────────────────────────────────── + +class DockingStateMachine: + """ + Deterministic docking FSM. + + Parameters + ---------- + battery_low_pct : SOC that triggers auto-dock (default 15 %) + battery_high_pct : SOC at which charging completes (default 80 %) + servo_range_m : switch from Nav2 to visual servo (default 1.0 m) + undock_speed_ms : reverse speed during undocking (m/s, must be ≤ 0) + undock_ticks_max : ticks at undock_speed before returning to IDLE + lost_ticks_max : allowed VISUAL_SERVO ticks with no detection + contact_timeout_ticks : CONTACT ticks before retry alignment + """ + + def __init__( + self, + battery_low_pct: float = 15.0, + battery_high_pct: float = 80.0, + servo_range_m: float = 1.0, + undock_speed_ms: float = -0.20, + undock_ticks_max: int = 50, # 2.5 s at 20 Hz + lost_ticks_max: int = 40, # 2.0 s at 20 Hz + contact_timeout_ticks: int = 20, # 1.0 s at 20 Hz + ): + self._bat_low = float(battery_low_pct) + self._bat_high = float(battery_high_pct) + self._servo_range = float(servo_range_m) + self._undock_speed = min(0.0, float(undock_speed_ms)) + self._undock_max = max(1, int(undock_ticks_max)) + self._lost_max = max(1, int(lost_ticks_max)) + self._contact_max = max(1, int(contact_timeout_ticks)) + + self._state = DockingState.IDLE + self._undock_ticks = 0 + self._lost_ticks = 0 + self._contact_ticks = 0 + self._mission_interrupted = False + self._resume_on_undock = False + + # ── Public API ──────────────────────────────────────────────────────────── + + @property + def state(self) -> DockingState: + return self._state + + def reset(self) -> None: + """Return to IDLE and clear all counters (use for unit tests).""" + self._state = DockingState.IDLE + self._undock_ticks = 0 + self._lost_ticks = 0 + self._contact_ticks = 0 + self._mission_interrupted = False + self._resume_on_undock = False + + def tick(self, inputs: DockingInputs) -> DockingOutputs: + """Advance the FSM by one tick and return output commands.""" + prev_state = self._state + out = self._step(inputs) + out.state = self._state + out.state_changed = (self._state != prev_state) + if out.state_changed: + self._on_enter(self._state) + return out + + # ── Internal step ───────────────────────────────────────────────────────── + + def _step(self, inp: DockingInputs) -> DockingOutputs: + out = DockingOutputs(state=self._state) + + # ── IDLE ────────────────────────────────────────────────────────────── + if self._state == DockingState.IDLE: + if inp.dock_requested: + self._state = DockingState.DETECTING + elif inp.battery_pct < self._bat_low: + self._state = DockingState.DETECTING + out.mission_interrupted = True + self._mission_interrupted = True + + # ── DETECTING ───────────────────────────────────────────────────────── + elif self._state == DockingState.DETECTING: + if inp.undock_requested: + self._state = DockingState.IDLE + self._mission_interrupted = False + elif inp.dock_pose is not None: + if inp.dock_pose.distance_m <= self._servo_range: + self._state = DockingState.VISUAL_SERVO + else: + self._state = DockingState.NAV2_APPROACH + out.request_nav2 = True + + # ── NAV2_APPROACH ───────────────────────────────────────────────────── + elif self._state == DockingState.NAV2_APPROACH: + if inp.undock_requested: + self._state = DockingState.IDLE + out.cancel_nav2 = True + self._mission_interrupted = False + elif inp.dock_pose is not None and inp.dock_pose.distance_m <= self._servo_range: + self._state = DockingState.VISUAL_SERVO + out.cancel_nav2 = True + elif inp.nav2_arrived: + self._state = DockingState.VISUAL_SERVO + elif inp.nav2_failed: + self._state = DockingState.DETECTING # retry scan + + # ── VISUAL_SERVO ────────────────────────────────────────────────────── + elif self._state == DockingState.VISUAL_SERVO: + if inp.undock_requested: + self._state = DockingState.IDLE + elif inp.dock_pose is None: + self._lost_ticks += 1 + if self._lost_ticks >= self._lost_max: + self._state = DockingState.DETECTING + else: + self._lost_ticks = 0 + if inp.servo_aligned: + self._state = DockingState.CONTACT + + # ── CONTACT ─────────────────────────────────────────────────────────── + elif self._state == DockingState.CONTACT: + if inp.undock_requested: + self._state = DockingState.UNDOCKING + self._resume_on_undock = False + elif inp.is_charging: + self._state = DockingState.CHARGING + else: + self._contact_ticks += 1 + if self._contact_ticks >= self._contact_max: + self._state = DockingState.VISUAL_SERVO # realign + + # ── CHARGING ────────────────────────────────────────────────────────── + elif self._state == DockingState.CHARGING: + if inp.undock_requested: + self._state = DockingState.UNDOCKING + self._resume_on_undock = False + elif inp.battery_pct >= self._bat_high: + self._state = DockingState.UNDOCKING + self._resume_on_undock = self._mission_interrupted + + # ── UNDOCKING ───────────────────────────────────────────────────────── + elif self._state == DockingState.UNDOCKING: + out.cmd_linear = self._undock_speed + self._undock_ticks += 1 + if self._undock_ticks >= self._undock_max: + out.mission_resume = self._resume_on_undock + self._mission_interrupted = False + self._resume_on_undock = False + self._state = DockingState.IDLE + + return out + + # ── Entry actions ───────────────────────────────────────────────────────── + + def _on_enter(self, state: DockingState) -> None: + """Reset per-state counters when entering a new state.""" + if state == DockingState.VISUAL_SERVO: + self._lost_ticks = 0 + elif state == DockingState.CONTACT: + self._contact_ticks = 0 + elif state == DockingState.UNDOCKING: + self._undock_ticks = 0 diff --git a/jetson/ros2_ws/src/saltybot_docking/saltybot_docking/visual_servo.py b/jetson/ros2_ws/src/saltybot_docking/saltybot_docking/visual_servo.py new file mode 100644 index 0000000..369c5d4 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_docking/saltybot_docking/visual_servo.py @@ -0,0 +1,130 @@ +""" +visual_servo.py — Image-based visual servo for final dock approach (Issue #158). + +Control law +─────────── + Operates once the robot is within *servo_range_m* of the dock. + + Linear velocity (forward): + v = clip( k_linear × (distance_m − target_distance_m), 0, max_linear_ms ) + + Angular velocity (yaw correction): + ω = clip( −k_angular × yaw_rad, −max_angular_rads, +max_angular_rads ) + + The linear term drives the robot toward the dock until it reaches + target_distance_m. The clip to [0, …] prevents reversing. + + The angular term zeroes the bearing error; sign convention: + yaw_rad > 0 → dock is to the robot's left → turn left (ω > 0). + +Alignment declaration +───────────────────── + `aligned` is True when both: + |lateral_m| < lateral_tol_m (default ±5 mm) + distance_m < contact_distance_m (default 50 mm) + +Pure module — no ROS2 dependency. +""" + +from __future__ import annotations + +import math +from dataclasses import dataclass +from typing import Optional + +from saltybot_docking.dock_detector import DockPose + + +# ── Result ──────────────────────────────────────────────────────────────────── + +@dataclass +class VisualServoResult: + linear_x: float # forward velocity (m/s); ≥ 0 + angular_z: float # yaw velocity (rad/s) + distance_m: float # current distance to dock (m) + lateral_error_m: float # current lateral offset (m); +ve = dock is right + aligned: bool # True when within contact tolerances + + +# ── VisualServo ─────────────────────────────────────────────────────────────── + +class VisualServo: + """ + Proportional visual servo controller for dock final approach. + + Parameters + ---------- + k_linear : distance gain (m/s per metre of error) + k_angular : yaw gain (rad/s per radian of bearing error) + target_distance_m : desired stop distance from dock face (m) + lateral_tol_m : lateral alignment tolerance (m) [spec ±5 mm → 0.005] + contact_distance_m: distance below which contact is assumed (m) + max_linear_ms : velocity ceiling (m/s) + max_angular_rads : yaw-rate ceiling (rad/s) + """ + + def __init__( + self, + k_linear: float = 0.30, + k_angular: float = 0.80, + target_distance_m: float = 0.02, # 20 mm — at the dock face + lateral_tol_m: float = 0.005, # ±5 mm + contact_distance_m: float = 0.05, # 50 mm — within contact zone + max_linear_ms: float = 0.10, # slow for precision approach + max_angular_rads: float = 0.30, + ): + self._k_lin = float(k_linear) + self._k_ang = float(k_angular) + self._target = float(target_distance_m) + self._lat_tol = max(1e-6, float(lateral_tol_m)) + self._contact = max(self._target, float(contact_distance_m)) + self._v_max = max(0.0, float(max_linear_ms)) + self._w_max = max(0.0, float(max_angular_rads)) + + # ── Control step ───────────────────────────────────────────────────────── + + def update(self, dock_pose: DockPose) -> VisualServoResult: + """ + Compute one velocity command from the current dock pose. + + Parameters + ---------- + dock_pose : DockPose from ArucoDetector or IRBeaconDetector + + Returns + ------- + VisualServoResult with cmd velocities and alignment flag + """ + dist = dock_pose.distance_m + yaw = dock_pose.yaw_rad + lat = dock_pose.lateral_m + + # Linear: approach until target_distance; no reversing + v_raw = self._k_lin * (dist - self._target) + v = max(0.0, min(self._v_max, v_raw)) + + # Angular: null yaw error; positive yaw → turn left (+ω) + w_raw = -self._k_ang * yaw + w = max(-self._w_max, min(self._w_max, w_raw)) + + aligned = (abs(lat) < self._lat_tol) and (dist < self._contact) + + return VisualServoResult( + linear_x=v, + angular_z=w, + distance_m=dist, + lateral_error_m=lat, + aligned=aligned, + ) + + def stop(self) -> VisualServoResult: + """ + Zero-velocity command — used when dock pose is momentarily lost. + """ + return VisualServoResult( + linear_x=0.0, + angular_z=0.0, + distance_m=float("nan"), + lateral_error_m=float("nan"), + aligned=False, + ) diff --git a/jetson/ros2_ws/src/saltybot_docking/setup.cfg b/jetson/ros2_ws/src/saltybot_docking/setup.cfg new file mode 100644 index 0000000..676a9f8 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_docking/setup.cfg @@ -0,0 +1,5 @@ +[develop] +script_dir=$base/lib/saltybot_docking + +[install] +install_scripts=$base/lib/saltybot_docking diff --git a/jetson/ros2_ws/src/saltybot_docking/setup.py b/jetson/ros2_ws/src/saltybot_docking/setup.py new file mode 100644 index 0000000..4973a51 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_docking/setup.py @@ -0,0 +1,32 @@ +from setuptools import setup, find_packages +import os +from glob import glob + +package_name = "saltybot_docking" + +setup( + name=package_name, + version="0.1.0", + packages=find_packages(exclude=["test"]), + data_files=[ + ("share/ament_index/resource_index/packages", + [f"resource/{package_name}"]), + (f"share/{package_name}", ["package.xml"]), + (os.path.join("share", package_name, "config"), + glob("config/*.yaml")), + (os.path.join("share", package_name, "launch"), + glob("launch/*.py")), + ], + install_requires=["setuptools"], + zip_safe=True, + maintainer="sl-controls", + maintainer_email="sl-controls@saltylab.local", + description="Docking station auto-return with ArUco/IR detection and visual servo", + license="MIT", + tests_require=["pytest"], + entry_points={ + "console_scripts": [ + f"docking_node = {package_name}.docking_node:main", + ], + }, +) diff --git a/jetson/ros2_ws/src/saltybot_docking/test/test_docking.py b/jetson/ros2_ws/src/saltybot_docking/test/test_docking.py new file mode 100644 index 0000000..21b0774 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_docking/test/test_docking.py @@ -0,0 +1,580 @@ +""" +test_docking.py — Unit tests for Issue #158 docking modules. + +Covers: + IRBeaconDetector — envelope follower, threshold, reset + VisualServo — velocity commands, clamping, alignment declaration + ChargeMonitor — event detection, edge-triggering, threshold crossing + DockingStateMachine — all state transitions and guard conditions +""" + +import math +import sys +import os + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +import pytest + +from saltybot_docking.dock_detector import DockPose, IRBeaconDetector +from saltybot_docking.visual_servo import VisualServo, VisualServoResult +from saltybot_docking.charge_monitor import ChargeEvent, ChargeMonitor +from saltybot_docking.docking_state_machine import ( + DockingInputs, + DockingState, + DockingStateMachine, +) + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + +def _make_ir(**kw): + defaults = dict(threshold=0.5, smoothing_alpha=1.0) + defaults.update(kw) + return IRBeaconDetector(**defaults) + + +def _make_servo(**kw): + defaults = dict( + k_linear=1.0, + k_angular=1.0, + target_distance_m=0.02, + lateral_tol_m=0.005, + contact_distance_m=0.05, + max_linear_ms=0.5, + max_angular_rads=1.0, + ) + defaults.update(kw) + return VisualServo(**defaults) + + +def _make_charge(**kw): + defaults = dict( + low_threshold_pct=15.0, + high_threshold_pct=80.0, + charging_current_threshold_a=0.10, + ) + defaults.update(kw) + return ChargeMonitor(**defaults) + + +def _make_fsm(**kw): + defaults = dict( + battery_low_pct=15.0, + battery_high_pct=80.0, + servo_range_m=1.0, + undock_speed_ms=-0.20, + undock_ticks_max=5, + lost_ticks_max=3, + contact_timeout_ticks=3, + ) + defaults.update(kw) + return DockingStateMachine(**defaults) + + +def _inp(**kw): + return DockingInputs(**kw) + + +def _near_pose(dist=0.5): + """Dock pose within servo range (default 0.5 m).""" + return DockPose(distance_m=dist, yaw_rad=0.0, lateral_m=0.0, source="aruco") + + +def _far_pose(dist=3.0): + """Dock pose outside servo range.""" + return DockPose(distance_m=dist, yaw_rad=0.0, lateral_m=0.0, source="aruco") + + +# ══════════════════════════════════════════════════════════════════════════════ +# IRBeaconDetector +# ══════════════════════════════════════════════════════════════════════════════ + +class TestIRBeaconDetector: + + def test_zero_signal_not_detected(self): + ir = _make_ir() + assert ir.update(0.0) is False + assert ir.detected is False + + def test_above_threshold_detected(self): + ir = _make_ir(threshold=0.5, smoothing_alpha=1.0) + assert ir.update(0.6) is True + assert ir.detected is True + + def test_below_threshold_not_detected(self): + ir = _make_ir(threshold=0.5, smoothing_alpha=1.0) + assert ir.update(0.4) is False + + def test_exactly_at_threshold_detected(self): + ir = _make_ir(threshold=0.5, smoothing_alpha=1.0) + assert ir.update(0.5) is True + + def test_envelope_smoothing_alpha_one(self): + """With alpha=1, envelope equals |sample| immediately.""" + ir = _make_ir(smoothing_alpha=1.0) + ir.update(0.7) + assert ir.envelope == pytest.approx(0.7, abs=1e-9) + + def test_envelope_smoothing_below_one(self): + """With alpha=0.5, envelope is a running average.""" + ir = _make_ir(threshold=999.0, smoothing_alpha=0.5) + ir.update(1.0) # envelope = 0.5 + assert ir.envelope == pytest.approx(0.5, abs=1e-6) + ir.update(1.0) # envelope = 0.75 + assert ir.envelope == pytest.approx(0.75, abs=1e-6) + + def test_reset_clears_envelope(self): + ir = _make_ir(smoothing_alpha=1.0) + ir.update(0.8) + ir.reset() + assert ir.envelope == pytest.approx(0.0, abs=1e-9) + assert ir.detected is False + + def test_negative_sample_uses_abs(self): + """update() should use absolute value of sample.""" + ir = _make_ir(threshold=0.5, smoothing_alpha=1.0) + assert ir.update(-0.7) is True + + +# ══════════════════════════════════════════════════════════════════════════════ +# VisualServo +# ══════════════════════════════════════════════════════════════════════════════ + +class TestVisualServo: + + def test_far_dock_gives_forward_velocity(self): + servo = _make_servo(k_linear=1.0, target_distance_m=0.02) + pose = DockPose(distance_m=0.5, yaw_rad=0.0, lateral_m=0.0, source="aruco") + result = servo.update(pose) + assert result.linear_x > 0.0 + + def test_at_target_distance_zero_linear(self): + servo = _make_servo(k_linear=1.0, target_distance_m=0.02) + pose = DockPose(distance_m=0.02, yaw_rad=0.0, lateral_m=0.0, source="aruco") + result = servo.update(pose) + assert result.linear_x == pytest.approx(0.0, abs=1e-9) + + def test_behind_target_no_reverse(self): + """When distance < target, linear velocity must be 0 (no reversing).""" + servo = _make_servo(k_linear=1.0, target_distance_m=0.10) + pose = DockPose(distance_m=0.01, yaw_rad=0.0, lateral_m=0.0, source="aruco") + result = servo.update(pose) + assert result.linear_x == pytest.approx(0.0, abs=1e-9) + + def test_yaw_right_gives_negative_angular(self): + """yaw_rad = atan2(tx, tz); tx > 0 → dock is right in camera → turn right (ω < 0).""" + servo = _make_servo(k_angular=1.0) + pose = DockPose(distance_m=0.5, yaw_rad=0.3, lateral_m=0.0, source="aruco") + result = servo.update(pose) + assert result.angular_z < 0.0 + + def test_yaw_left_gives_positive_angular(self): + """tx < 0 → dock is left in camera → turn left (ω > 0).""" + servo = _make_servo(k_angular=1.0) + pose = DockPose(distance_m=0.5, yaw_rad=-0.3, lateral_m=0.0, source="aruco") + result = servo.update(pose) + assert result.angular_z > 0.0 + + def test_zero_yaw_zero_angular(self): + servo = _make_servo() + pose = DockPose(distance_m=0.5, yaw_rad=0.0, lateral_m=0.0, source="aruco") + result = servo.update(pose) + assert result.angular_z == pytest.approx(0.0, abs=1e-9) + + def test_max_linear_clamped(self): + servo = _make_servo(k_linear=100.0, max_linear_ms=0.1) + pose = DockPose(distance_m=5.0, yaw_rad=0.0, lateral_m=0.0, source="aruco") + result = servo.update(pose) + assert result.linear_x <= 0.1 + 1e-9 + + def test_max_angular_clamped(self): + servo = _make_servo(k_angular=100.0, max_angular_rads=0.3) + pose = DockPose(distance_m=0.5, yaw_rad=5.0, lateral_m=0.0, source="aruco") + result = servo.update(pose) + assert abs(result.angular_z) <= 0.3 + 1e-9 + + def test_aligned_when_within_tolerances(self): + servo = _make_servo(lateral_tol_m=0.005, contact_distance_m=0.05) + pose = DockPose(distance_m=0.03, yaw_rad=0.0, lateral_m=0.002, source="aruco") + result = servo.update(pose) + assert result.aligned is True + + def test_not_aligned_lateral_too_large(self): + servo = _make_servo(lateral_tol_m=0.005, contact_distance_m=0.05) + pose = DockPose(distance_m=0.03, yaw_rad=0.0, lateral_m=0.010, source="aruco") + result = servo.update(pose) + assert result.aligned is False + + def test_not_aligned_too_far(self): + servo = _make_servo(lateral_tol_m=0.005, contact_distance_m=0.05) + pose = DockPose(distance_m=0.10, yaw_rad=0.0, lateral_m=0.001, source="aruco") + result = servo.update(pose) + assert result.aligned is False + + def test_stop_returns_zero_velocity(self): + servo = _make_servo() + result = servo.stop() + assert result.linear_x == pytest.approx(0.0, abs=1e-9) + assert result.angular_z == pytest.approx(0.0, abs=1e-9) + assert result.aligned is False + + def test_proportional_linear(self): + """Linear velocity scales with (distance − target).""" + servo = _make_servo(k_linear=2.0, target_distance_m=0.0, max_linear_ms=999.0) + pose = DockPose(distance_m=0.3, yaw_rad=0.0, lateral_m=0.0, source="aruco") + result = servo.update(pose) + assert result.linear_x == pytest.approx(2.0 * 0.3, rel=1e-6) + + +# ══════════════════════════════════════════════════════════════════════════════ +# ChargeMonitor +# ══════════════════════════════════════════════════════════════════════════════ + +class TestChargeMonitor: + + def test_initially_not_charging(self): + cm = _make_charge() + assert cm.is_charging is False + + def test_charging_started_event(self): + cm = _make_charge(charging_current_threshold_a=0.1) + events = cm.update(50.0, 0.5) + assert ChargeEvent.CHARGING_STARTED in events + + def test_charging_stopped_event(self): + cm = _make_charge(charging_current_threshold_a=0.1) + cm.update(50.0, 0.5) # start charging + events = cm.update(50.0, 0.0) # stop + assert ChargeEvent.CHARGING_STOPPED in events + + def test_no_duplicate_charging_started(self): + """CHARGING_STARTED fires only once per rising edge.""" + cm = _make_charge(charging_current_threshold_a=0.1) + cm.update(50.0, 0.5) # fires CHARGING_STARTED + events = cm.update(50.0, 0.5) # still charging — no duplicate + assert ChargeEvent.CHARGING_STARTED not in events + + def test_low_threshold_event_falling_edge(self): + """THRESHOLD_LOW fires when SOC first drops below low_threshold_pct.""" + cm = _make_charge(low_threshold_pct=20.0) + events = cm.update(19.0, 0.0) + assert ChargeEvent.THRESHOLD_LOW in events + + def test_no_duplicate_low_threshold(self): + cm = _make_charge(low_threshold_pct=20.0) + cm.update(19.0, 0.0) # fires once + events = cm.update(18.0, 0.0) # still below — no repeat + assert ChargeEvent.THRESHOLD_LOW not in events + + def test_high_threshold_event_rising_edge(self): + """THRESHOLD_HIGH fires when SOC rises above high_threshold_pct.""" + cm = _make_charge(low_threshold_pct=15.0, high_threshold_pct=80.0) + # Start below high threshold + cm.update(50.0, 0.5) # sets was_high = False (50 < 80) + events = cm.update(81.0, 0.5) # rises above 80 + assert ChargeEvent.THRESHOLD_HIGH in events + + def test_no_high_event_if_always_above(self): + """THRESHOLD_HIGH should not fire if SOC starts above threshold.""" + cm = _make_charge(high_threshold_pct=80.0) + # Default _was_high=True; remains above → no edge + events = cm.update(90.0, 0.0) + assert ChargeEvent.THRESHOLD_HIGH not in events + + def test_is_depleted_property(self): + cm = _make_charge(low_threshold_pct=20.0) + cm.update(15.0, 0.0) + assert cm.is_depleted is True + + def test_is_charged_property(self): + cm = _make_charge(high_threshold_pct=80.0) + cm.update(85.0, 0.0) + assert cm.is_charged is True + + def test_percentage_tracking(self): + cm = _make_charge() + cm.update(42.5, 0.0) + assert cm.percentage == pytest.approx(42.5, abs=1e-6) + + def test_reset_clears_state(self): + cm = _make_charge(charging_current_threshold_a=0.1) + cm.update(10.0, 0.5) # is_charging=True, is_depleted=True + cm.reset() + assert cm.is_charging is False + assert cm.is_depleted is False + assert cm.percentage == pytest.approx(100.0, abs=1e-6) + + +# ══════════════════════════════════════════════════════════════════════════════ +# DockingStateMachine +# ══════════════════════════════════════════════════════════════════════════════ + +class TestDockingStateMachineBasic: + + def test_initial_state_is_idle(self): + fsm = _make_fsm() + assert fsm.state == DockingState.IDLE + + def test_idle_stays_idle_on_good_battery(self): + fsm = _make_fsm() + out = fsm.tick(_inp(battery_pct=80.0)) + assert fsm.state == DockingState.IDLE + assert out.state_changed is False + + def test_battery_low_triggers_detecting(self): + fsm = _make_fsm() + out = fsm.tick(_inp(battery_pct=10.0)) + assert fsm.state == DockingState.DETECTING + assert out.mission_interrupted is True + assert out.state_changed is True + + def test_dock_request_triggers_detecting(self): + fsm = _make_fsm() + out = fsm.tick(_inp(battery_pct=80.0, dock_requested=True)) + assert fsm.state == DockingState.DETECTING + assert out.mission_interrupted is False + + def test_dock_request_not_duplicated_mission_interrupted(self): + """Explicit dock request should not set mission_interrupted.""" + fsm = _make_fsm() + out = fsm.tick(_inp(battery_pct=80.0, dock_requested=True)) + assert out.mission_interrupted is False + + +class TestDockingStateMachineDetecting: + + def test_no_dock_stays_detecting(self): + fsm = _make_fsm() + fsm.tick(_inp(dock_requested=True)) + out = fsm.tick(_inp()) # no dock_pose + assert fsm.state == DockingState.DETECTING + + def test_far_dock_goes_to_nav2(self): + fsm = _make_fsm(servo_range_m=1.0) + fsm.tick(_inp(dock_requested=True)) + out = fsm.tick(_inp(dock_pose=_far_pose())) + assert fsm.state == DockingState.NAV2_APPROACH + assert out.request_nav2 is True + + def test_near_dock_skips_nav2_to_servo(self): + fsm = _make_fsm(servo_range_m=1.0) + fsm.tick(_inp(dock_requested=True)) + out = fsm.tick(_inp(dock_pose=_near_pose())) + assert fsm.state == DockingState.VISUAL_SERVO + + def test_undock_request_aborts_detecting(self): + fsm = _make_fsm() + fsm.tick(_inp(dock_requested=True)) + fsm.tick(_inp(undock_requested=True)) + assert fsm.state == DockingState.IDLE + + +class TestDockingStateMachineNav2Approach: + + def _reach_nav2(self, fsm): + fsm.tick(_inp(dock_requested=True)) + fsm.tick(_inp(dock_pose=_far_pose())) + assert fsm.state == DockingState.NAV2_APPROACH + + def test_nav2_arrived_enters_visual_servo(self): + fsm = _make_fsm() + self._reach_nav2(fsm) + fsm.tick(_inp(nav2_arrived=True)) + assert fsm.state == DockingState.VISUAL_SERVO + + def test_dock_close_during_nav2_enters_servo(self): + fsm = _make_fsm(servo_range_m=1.0) + self._reach_nav2(fsm) + out = fsm.tick(_inp(dock_pose=_near_pose())) + assert fsm.state == DockingState.VISUAL_SERVO + assert out.cancel_nav2 is True + + def test_nav2_failed_back_to_detecting(self): + fsm = _make_fsm() + self._reach_nav2(fsm) + fsm.tick(_inp(nav2_failed=True)) + assert fsm.state == DockingState.DETECTING + + def test_undock_during_nav2_cancels_and_idles(self): + fsm = _make_fsm() + self._reach_nav2(fsm) + out = fsm.tick(_inp(undock_requested=True)) + assert fsm.state == DockingState.IDLE + assert out.cancel_nav2 is True + + +class TestDockingStateMachineVisualServo: + + def _reach_servo(self, fsm): + fsm.tick(_inp(dock_requested=True)) + fsm.tick(_inp(dock_pose=_near_pose())) + assert fsm.state == DockingState.VISUAL_SERVO + + def test_aligned_enters_contact(self): + fsm = _make_fsm() + self._reach_servo(fsm) + fsm.tick(_inp(dock_pose=_near_pose(), servo_aligned=True)) + assert fsm.state == DockingState.CONTACT + + def test_not_aligned_stays_in_servo(self): + fsm = _make_fsm() + self._reach_servo(fsm) + fsm.tick(_inp(dock_pose=_near_pose(), servo_aligned=False)) + assert fsm.state == DockingState.VISUAL_SERVO + + def test_lost_detection_timeout_enters_detecting(self): + fsm = _make_fsm(lost_ticks_max=3) + self._reach_servo(fsm) + for _ in range(3): # dock_pose=None each tick + fsm.tick(_inp()) + assert fsm.state == DockingState.DETECTING + + def test_lost_counter_resets_on_rediscovery(self): + """Lost count should reset when pose reappears.""" + fsm = _make_fsm(lost_ticks_max=3) + self._reach_servo(fsm) + fsm.tick(_inp()) # 1 lost tick + fsm.tick(_inp(dock_pose=_near_pose())) # rediscovered + # Need 3 more lost ticks to time out + fsm.tick(_inp()) + fsm.tick(_inp()) + assert fsm.state == DockingState.VISUAL_SERVO + fsm.tick(_inp()) + assert fsm.state == DockingState.DETECTING + + def test_undock_aborts_visual_servo(self): + fsm = _make_fsm() + self._reach_servo(fsm) + fsm.tick(_inp(undock_requested=True)) + assert fsm.state == DockingState.IDLE + + +class TestDockingStateMachineContact: + + def _reach_contact(self, fsm): + fsm.tick(_inp(dock_requested=True)) + fsm.tick(_inp(dock_pose=_near_pose())) + fsm.tick(_inp(dock_pose=_near_pose(), servo_aligned=True)) + assert fsm.state == DockingState.CONTACT + + def test_charging_enters_charging_state(self): + fsm = _make_fsm() + self._reach_contact(fsm) + fsm.tick(_inp(is_charging=True)) + assert fsm.state == DockingState.CHARGING + + def test_contact_timeout_retries_servo(self): + fsm = _make_fsm(contact_timeout_ticks=3) + self._reach_contact(fsm) + for _ in range(3): + fsm.tick(_inp(is_charging=False)) + assert fsm.state == DockingState.VISUAL_SERVO + + def test_undock_from_contact_to_undocking(self): + fsm = _make_fsm() + self._reach_contact(fsm) + fsm.tick(_inp(undock_requested=True)) + assert fsm.state == DockingState.UNDOCKING + + +class TestDockingStateMachineCharging: + + def _reach_charging(self, fsm): + fsm.tick(_inp(dock_requested=True)) + fsm.tick(_inp(dock_pose=_near_pose())) + fsm.tick(_inp(dock_pose=_near_pose(), servo_aligned=True)) + fsm.tick(_inp(is_charging=True)) + assert fsm.state == DockingState.CHARGING + + def test_full_battery_triggers_undocking(self): + fsm = _make_fsm(battery_high_pct=80.0) + self._reach_charging(fsm) + fsm.tick(_inp(is_charging=True, battery_pct=85.0)) + assert fsm.state == DockingState.UNDOCKING + + def test_undock_request_during_charging(self): + fsm = _make_fsm() + self._reach_charging(fsm) + fsm.tick(_inp(undock_requested=True)) + assert fsm.state == DockingState.UNDOCKING + + def test_charging_stays_charging_below_high_threshold(self): + fsm = _make_fsm(battery_high_pct=80.0) + self._reach_charging(fsm) + fsm.tick(_inp(is_charging=True, battery_pct=70.0)) + assert fsm.state == DockingState.CHARGING + + +class TestDockingStateMachineUndocking: + + def _reach_undocking_via_charge(self, fsm): + fsm.tick(_inp(dock_requested=True)) + fsm.tick(_inp(dock_pose=_near_pose())) + fsm.tick(_inp(dock_pose=_near_pose(), servo_aligned=True)) + fsm.tick(_inp(is_charging=True)) + fsm.tick(_inp(is_charging=True, battery_pct=85.0)) # → UNDOCKING + assert fsm.state == DockingState.UNDOCKING + + def test_undocking_backs_away(self): + fsm = _make_fsm(undock_ticks_max=5) + self._reach_undocking_via_charge(fsm) + out = fsm.tick(_inp()) + assert out.cmd_linear < 0.0 + + def test_undocking_completes_back_to_idle(self): + fsm = _make_fsm(undock_ticks_max=5) + self._reach_undocking_via_charge(fsm) + for _ in range(5): + out = fsm.tick(_inp()) + assert fsm.state == DockingState.IDLE + + def test_mission_resume_after_battery_auto_dock(self): + """mission_resume must be True when auto-dock cycle completes.""" + fsm = _make_fsm(undock_ticks_max=5) + # Trigger via battery low + fsm.tick(_inp(battery_pct=10.0)) # IDLE → DETECTING + fsm.tick(_inp(dock_pose=_near_pose())) # → VISUAL_SERVO + fsm.tick(_inp(dock_pose=_near_pose(), servo_aligned=True)) # → CONTACT + fsm.tick(_inp(is_charging=True)) # → CHARGING + fsm.tick(_inp(is_charging=True, battery_pct=85.0)) # → UNDOCKING + out = None + for _ in range(5): + out = fsm.tick(_inp()) + assert out is not None + assert out.mission_resume is True + assert fsm.state == DockingState.IDLE + + def test_no_mission_resume_on_explicit_undock(self): + """Explicit undock should not trigger mission_resume.""" + fsm = _make_fsm(undock_ticks_max=5) + fsm.tick(_inp(dock_requested=True)) # explicit dock — no interrupt + fsm.tick(_inp(dock_pose=_near_pose())) + fsm.tick(_inp(dock_pose=_near_pose(), servo_aligned=True)) + fsm.tick(_inp(is_charging=True)) + fsm.tick(_inp(undock_requested=True)) # explicit undock + last_out = None + for _ in range(5): + last_out = fsm.tick(_inp()) + assert last_out is not None + assert last_out.mission_resume is False + + +class TestDockingStateMachineReset: + + def test_reset_returns_to_idle(self): + fsm = _make_fsm() + fsm.tick(_inp(dock_requested=True)) + assert fsm.state == DockingState.DETECTING + fsm.reset() + assert fsm.state == DockingState.IDLE + + def test_state_changed_flag_on_transition(self): + fsm = _make_fsm() + out = fsm.tick(_inp(dock_requested=True)) + assert out.state_changed is True + + def test_no_state_changed_when_same_state(self): + fsm = _make_fsm() + out = fsm.tick(_inp()) # stays IDLE + assert out.state_changed is False diff --git a/jetson/ros2_ws/src/saltybot_docking_msgs/CMakeLists.txt b/jetson/ros2_ws/src/saltybot_docking_msgs/CMakeLists.txt new file mode 100644 index 0000000..60048ea --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_docking_msgs/CMakeLists.txt @@ -0,0 +1,16 @@ +cmake_minimum_required(VERSION 3.8) +project(saltybot_docking_msgs) + +find_package(ament_cmake REQUIRED) +find_package(rosidl_default_generators REQUIRED) +find_package(builtin_interfaces REQUIRED) + +rosidl_generate_interfaces(${PROJECT_NAME} + "msg/DockingStatus.msg" + "srv/Dock.srv" + "srv/Undock.srv" + DEPENDENCIES builtin_interfaces +) + +ament_export_dependencies(rosidl_default_runtime) +ament_package() diff --git a/jetson/ros2_ws/src/saltybot_docking_msgs/msg/DockingStatus.msg b/jetson/ros2_ws/src/saltybot_docking_msgs/msg/DockingStatus.msg new file mode 100644 index 0000000..a0b8172 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_docking_msgs/msg/DockingStatus.msg @@ -0,0 +1,21 @@ +# DockingStatus.msg — Real-time docking state snapshot (Issue #158) +# Published by: /saltybot/docking_node +# Topic: /saltybot/docking_status + +builtin_interfaces/Time stamp + +# Current FSM state +# Values: "IDLE" | "DETECTING" | "NAV2_APPROACH" | "VISUAL_SERVO" | "CONTACT" | "CHARGING" | "UNDOCKING" +string state + +# Dock detection +bool dock_detected # ArUco marker or IR beacon visible +float32 distance_m # distance to dock (m); NaN if unknown +float32 lateral_error_m # lateral offset to dock centre (m); +ve = dock is right + +# Battery / charging +float32 battery_pct # state of charge [0–100] +bool charging # charge current detected on dock pins + +# Alignment quality (active only during VISUAL_SERVO) +bool aligned # within ±5 mm lateral and contact distance diff --git a/jetson/ros2_ws/src/saltybot_docking_msgs/package.xml b/jetson/ros2_ws/src/saltybot_docking_msgs/package.xml new file mode 100644 index 0000000..c96a38c --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_docking_msgs/package.xml @@ -0,0 +1,22 @@ + + + + saltybot_docking_msgs + 0.1.0 + Docking station message and service definitions for SaltyBot (Issue #158) + sl-controls + MIT + + ament_cmake + rosidl_default_generators + + builtin_interfaces + + rosidl_default_runtime + + rosidl_interface_packages + + + ament_cmake + + diff --git a/jetson/ros2_ws/src/saltybot_docking_msgs/srv/Dock.srv b/jetson/ros2_ws/src/saltybot_docking_msgs/srv/Dock.srv new file mode 100644 index 0000000..74c4c90 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_docking_msgs/srv/Dock.srv @@ -0,0 +1,9 @@ +# Dock.srv — Request the robot to autonomously dock (Issue #158) +# Service: /saltybot/dock + +# If force=True the current Nav2 mission is cancelled immediately. +# If force=False the dock request is honoured only when the robot is idle. +bool force +--- +bool success +string message diff --git a/jetson/ros2_ws/src/saltybot_docking_msgs/srv/Undock.srv b/jetson/ros2_ws/src/saltybot_docking_msgs/srv/Undock.srv new file mode 100644 index 0000000..361f5f9 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_docking_msgs/srv/Undock.srv @@ -0,0 +1,9 @@ +# Undock.srv — Request the robot to leave the dock (Issue #158) +# Service: /saltybot/undock + +# If resume_mission=True and a mission was interrupted for battery, it is +# re-queued after the robot backs away from the dock. +bool resume_mission +--- +bool success +string message