diff --git a/jetson/ros2_ws/src/saltybot_aruco_detect/config/aruco_detect_params.yaml b/jetson/ros2_ws/src/saltybot_aruco_detect/config/aruco_detect_params.yaml new file mode 100644 index 0000000..385d612 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_aruco_detect/config/aruco_detect_params.yaml @@ -0,0 +1,34 @@ +aruco_detect: + ros__parameters: + # ── Marker specification ───────────────────────────────────────────────── + marker_size_m: 0.10 # physical side length of printed ArUco markers (m) + # measure the black border edge-to-edge + + # ── Dock target filter ─────────────────────────────────────────────────── + # dock_marker_ids: IDs that may serve as dock target. + # - Empty list [] → any detected marker is a dock candidate (closest wins) + # - Non-empty → only listed IDs are candidates; others still appear in + # /saltybot/aruco/markers but not in dock_target + # The saltybot dock uses marker ID 42 on the charging face by convention. + dock_marker_ids: [42] + + # Maximum distance to consider a marker as dock target (m) + max_dock_range_m: 3.0 + + # ── ArUco dictionary ───────────────────────────────────────────────────── + aruco_dict: DICT_4X4_50 # cv2.aruco constant name + + # Corner sub-pixel refinement (improves pose accuracy at close range) + # CORNER_REFINE_NONE — fastest, no refinement + # CORNER_REFINE_SUBPIX — best for high-contrast markers (recommended) + # CORNER_REFINE_CONTOUR — good for blurry or low-res images + corner_refinement: CORNER_REFINE_SUBPIX + + # ── Frame / topic config ───────────────────────────────────────────────── + camera_frame: camera_color_optical_frame + + # ── Debug image output (disabled by default) ───────────────────────────── + # When true, publishes annotated BGR image with markers + axes drawn. + # Useful for tuning but costs ~10 ms/frame encoding overhead. + draw_debug_image: false + debug_image_topic: /saltybot/aruco/debug_image diff --git a/jetson/ros2_ws/src/saltybot_aruco_detect/launch/aruco_detect.launch.py b/jetson/ros2_ws/src/saltybot_aruco_detect/launch/aruco_detect.launch.py new file mode 100644 index 0000000..9745f10 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_aruco_detect/launch/aruco_detect.launch.py @@ -0,0 +1,29 @@ +"""aruco_detect.launch.py — ArUco marker detection for docking (Issue #627).""" + +import os +from ament_index_python.packages import get_package_share_directory +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument +from launch.substitutions import LaunchConfiguration +from launch_ros.actions import Node + + +def generate_launch_description(): + pkg_share = get_package_share_directory('saltybot_aruco_detect') + default_params = os.path.join(pkg_share, 'config', 'aruco_detect_params.yaml') + + params_arg = DeclareLaunchArgument( + 'params_file', + default_value=default_params, + description='Path to aruco_detect_params.yaml', + ) + + aruco_node = Node( + package='saltybot_aruco_detect', + executable='aruco_detect', + name='aruco_detect', + output='screen', + parameters=[LaunchConfiguration('params_file')], + ) + + return LaunchDescription([params_arg, aruco_node]) diff --git a/jetson/ros2_ws/src/saltybot_aruco_detect/package.xml b/jetson/ros2_ws/src/saltybot_aruco_detect/package.xml new file mode 100644 index 0000000..c44f72a --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_aruco_detect/package.xml @@ -0,0 +1,37 @@ + + + + saltybot_aruco_detect + 0.1.0 + + ArUco marker detection for docking alignment (Issue #627). + Detects all DICT_4X4_50 markers from RealSense D435i RGB, estimates 6-DOF + pose with estimatePoseSingleMarkers, publishes PoseArray on + /saltybot/aruco/markers and closest dock-candidate marker on + /saltybot/aruco/dock_target (PoseStamped) with RViz MarkerArray and + JSON status. + + sl-perception + MIT + + ament_python + + rclpy + std_msgs + sensor_msgs + geometry_msgs + visualization_msgs + cv_bridge + + python3-numpy + python3-opencv + + ament_copyright + ament_flake8 + ament_pep257 + python3-pytest + + + ament_python + + diff --git a/jetson/ros2_ws/src/saltybot_aruco_detect/resource/saltybot_aruco_detect b/jetson/ros2_ws/src/saltybot_aruco_detect/resource/saltybot_aruco_detect new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_aruco_detect/saltybot_aruco_detect/__init__.py b/jetson/ros2_ws/src/saltybot_aruco_detect/saltybot_aruco_detect/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_aruco_detect/saltybot_aruco_detect/aruco_detect_node.py b/jetson/ros2_ws/src/saltybot_aruco_detect/saltybot_aruco_detect/aruco_detect_node.py new file mode 100644 index 0000000..7318a41 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_aruco_detect/saltybot_aruco_detect/aruco_detect_node.py @@ -0,0 +1,518 @@ +""" +aruco_detect_node.py — ArUco marker detection for docking alignment (Issue #627). + +Detects all DICT_4X4_50 ArUco markers visible in the RealSense D435i RGB stream, +estimates their 6-DOF poses relative to the camera using estimatePoseSingleMarkers, +and publishes: + + /saltybot/aruco/markers geometry_msgs/PoseArray — all detected markers + /saltybot/aruco/dock_target geometry_msgs/PoseStamped — closest dock marker + /saltybot/aruco/viz visualization_msgs/MarkerArray — RViz axes overlays + /saltybot/aruco/status std_msgs/String — JSON summary (10 Hz) + +Coordinate frame +──────────────── +All poses are expressed in camera_color_optical_frame (ROS optical convention): + +X = right, +Y = down, +Z = forward (into scene) + +The dock_target pose gives the docking controller everything it needs: + position.z — forward distance to dock (m) → throttle target + position.x — lateral offset (m) → steer target + orientation — marker orientation for final align → yaw servo + +Dock target selection +───────────────────── + dock_marker_ids list[int] [] — empty = accept any detected marker + non-empty = only these IDs are candidates + closest_wins bool true — among candidates, prefer the nearest + max_dock_range_m float 3.0 — ignore markers beyond this distance + +estimatePoseSingleMarkers API +────────────────────────────── + Uses cv2.aruco.estimatePoseSingleMarkers (legacy OpenCV API still present in + 4.x with a deprecation notice). Falls back to per-marker cv2.solvePnP with + SOLVEPNP_IPPE_SQUARE if the legacy function is unavailable. + Both paths use the same camera_matrix / dist_coeffs from CameraInfo. + +Subscribes +────────── + /camera/color/image_raw sensor_msgs/Image 30 Hz (BGR8) + /camera/color/camera_info sensor_msgs/CameraInfo (latched, once) + +Publishes +───────── + /saltybot/aruco/markers geometry_msgs/PoseArray + /saltybot/aruco/dock_target geometry_msgs/PoseStamped + /saltybot/aruco/viz visualization_msgs/MarkerArray + /saltybot/aruco/status std_msgs/String (JSON, 10 Hz) + +Parameters (see config/aruco_detect_params.yaml) +──────────────────────────────────────────────── + marker_size_m float 0.10 physical side length of printed markers (m) + dock_marker_ids int[] [] IDs to accept as dock targets (empty = all) + max_dock_range_m float 3.0 ignore candidates beyond this (m) + aruco_dict str DICT_4X4_50 + corner_refinement str CORNER_REFINE_SUBPIX + camera_frame str camera_color_optical_frame + draw_debug_image bool false publish annotated image for debugging + debug_image_topic str /saltybot/aruco/debug_image +""" + +from __future__ import annotations + +import json +import math + +import rclpy +from rclpy.node import Node +from rclpy.qos import ( + QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy +) + +import numpy as np +import cv2 +from cv_bridge import CvBridge + +from geometry_msgs.msg import ( + Pose, PoseArray, PoseStamped, Point, Quaternion, Vector3, TransformStamped +) +from sensor_msgs.msg import Image, CameraInfo +from std_msgs.msg import Header, String, ColorRGBA +from visualization_msgs.msg import Marker, MarkerArray + +from .aruco_math import MarkerPose, rvec_to_quat, tvec_distance + + +# ── QoS ─────────────────────────────────────────────────────────────────────── +_SENSOR_QOS = QoSProfile( + reliability=ReliabilityPolicy.BEST_EFFORT, + history=HistoryPolicy.KEEP_LAST, + depth=5, +) +_LATCHED_QOS = QoSProfile( + reliability=ReliabilityPolicy.RELIABLE, + history=HistoryPolicy.KEEP_LAST, + depth=1, + durability=DurabilityPolicy.TRANSIENT_LOCAL, +) + +# ArUco axis length for viz (fraction of marker size) +_AXIS_LEN_FRAC = 0.5 + + +class ArucoDetectNode(Node): + """ArUco marker detection + pose estimation node — see module docstring.""" + + def __init__(self) -> None: + super().__init__('aruco_detect') + + # ── Parameters ──────────────────────────────────────────────────────── + self.declare_parameter('marker_size_m', 0.10) + self.declare_parameter('dock_marker_ids', []) # empty list = all + self.declare_parameter('max_dock_range_m', 3.0) + self.declare_parameter('aruco_dict', 'DICT_4X4_50') + self.declare_parameter('corner_refinement', 'CORNER_REFINE_SUBPIX') + self.declare_parameter('camera_frame', 'camera_color_optical_frame') + self.declare_parameter('draw_debug_image', False) + self.declare_parameter('debug_image_topic', '/saltybot/aruco/debug_image') + + self._marker_size = self.get_parameter('marker_size_m').value + self._dock_ids = set(self.get_parameter('dock_marker_ids').value) + self._max_range = self.get_parameter('max_dock_range_m').value + self._cam_frame = self.get_parameter('camera_frame').value + self._draw_debug = self.get_parameter('draw_debug_image').value + self._debug_topic = self.get_parameter('debug_image_topic').value + + aruco_dict_name = self.get_parameter('aruco_dict').value + refine_name = self.get_parameter('corner_refinement').value + + # ── ArUco detector setup ────────────────────────────────────────────── + dict_id = getattr(cv2.aruco, aruco_dict_name, cv2.aruco.DICT_4X4_50) + params = cv2.aruco.DetectorParameters() + # Apply corner refinement + refine_id = getattr(cv2.aruco, refine_name, cv2.aruco.CORNER_REFINE_SUBPIX) + params.cornerRefinementMethod = refine_id + + aruco_dict = cv2.aruco.getPredefinedDictionary(dict_id) + self._detector = cv2.aruco.ArucoDetector(aruco_dict, params) + + # Object points for solvePnP fallback (counter-clockwise from top-left) + half = self._marker_size / 2.0 + self._obj_pts = np.array([ + [-half, half, 0.0], + [ half, half, 0.0], + [ half, -half, 0.0], + [-half, -half, 0.0], + ], dtype=np.float64) + + # ── Camera intrinsics (set on first CameraInfo) ─────────────────────── + self._K: np.ndarray | None = None + self._dist: np.ndarray | None = None + self._bridge = CvBridge() + + # ── Publishers ───────────────────────────────────────────────────────── + self._pose_array_pub = self.create_publisher( + PoseArray, '/saltybot/aruco/markers', 10 + ) + self._dock_pub = self.create_publisher( + PoseStamped, '/saltybot/aruco/dock_target', 10 + ) + self._viz_pub = self.create_publisher( + MarkerArray, '/saltybot/aruco/viz', 10 + ) + self._status_pub = self.create_publisher( + String, '/saltybot/aruco/status', 10 + ) + if self._draw_debug: + self._debug_pub = self.create_publisher( + Image, self._debug_topic, 5 + ) + else: + self._debug_pub = None + + # ── Subscriptions ────────────────────────────────────────────────────── + self.create_subscription( + CameraInfo, + '/camera/color/camera_info', + self._on_camera_info, + _LATCHED_QOS, + ) + self.create_subscription( + Image, + '/camera/color/image_raw', + self._on_image, + _SENSOR_QOS, + ) + + # ── Status timer (10 Hz) ──────────────────────────────────────────────── + self._last_detections: list[MarkerPose] = [] + self.create_timer(0.1, self._on_status_timer) + + self.get_logger().info( + f'aruco_detect ready — ' + f'dict={aruco_dict_name} ' + f'marker_size={self._marker_size * 100:.0f}cm ' + f'dock_ids={list(self._dock_ids) if self._dock_ids else "any"} ' + f'max_range={self._max_range}m' + ) + + # ── Camera info ─────────────────────────────────────────────────────────── + + def _on_camera_info(self, msg: CameraInfo) -> None: + if self._K is not None: + return + self._K = np.array(msg.k, dtype=np.float64).reshape(3, 3) + self._dist = np.array(msg.d, dtype=np.float64).reshape(1, -1) + self.get_logger().info( + f'camera_info received — ' + f'fx={self._K[0,0]:.1f} fy={self._K[1,1]:.1f}' + ) + + # ── Image callback ──────────────────────────────────────────────────────── + + def _on_image(self, msg: Image) -> None: + if self._K is None: + return + + # ── Decode ──────────────────────────────────────────────────────────── + try: + bgr = self._bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8') + except Exception as exc: + self.get_logger().warning(f'image decode error: {exc}', + throttle_duration_sec=5.0) + return + + # Convert to greyscale for detection (faster, same accuracy) + gray = cv2.cvtColor(bgr, cv2.COLOR_BGR2GRAY) + + stamp = msg.header.stamp + + # ── Detect markers ──────────────────────────────────────────────────── + corners, ids, _ = self._detector.detectMarkers(gray) + + if ids is None or len(ids) == 0: + self._last_detections = [] + self._publish_empty(stamp) + return + + ids_flat = ids.flatten().tolist() + + # ── Pose estimation ─────────────────────────────────────────────────── + detections = self._estimate_poses(corners, ids_flat) + + # Filter by range + detections = [d for d in detections if d.distance_m <= self._max_range] + + self._last_detections = detections + + # ── Publish ─────────────────────────────────────────────────────────── + self._publish_pose_array(detections, stamp) + self._publish_dock_target(detections, stamp) + self._publish_viz(detections, stamp) + + # Debug image + if self._debug_pub is not None: + self._publish_debug(bgr, corners, ids, detections, stamp) + + # ── Pose estimation ─────────────────────────────────────────────────────── + + def _estimate_poses( + self, + corners: list, + ids: list[int], + ) -> list[MarkerPose]: + """ + Estimate 6-DOF pose for each detected marker. + + Uses estimatePoseSingleMarkers (legacy API, still present in cv2 4.x). + Falls back to per-marker solvePnP(IPPE_SQUARE) if unavailable. + """ + results: list[MarkerPose] = [] + + # ── Try legacy estimatePoseSingleMarkers ────────────────────────────── + if hasattr(cv2.aruco, 'estimatePoseSingleMarkers'): + try: + rvecs, tvecs, _ = cv2.aruco.estimatePoseSingleMarkers( + corners, self._marker_size, self._K, self._dist + ) + for i, mid in enumerate(ids): + rvec = rvecs[i].ravel().astype(np.float64) + tvec = tvecs[i].ravel().astype(np.float64) + results.append(MarkerPose( + marker_id = int(mid), + tvec = tvec, + rvec = rvec, + corners = corners[i].reshape(4, 2).astype(np.float64), + )) + return results + except Exception as exc: + self.get_logger().debug( + f'estimatePoseSingleMarkers failed ({exc}), ' + 'falling back to solvePnP' + ) + + # ── Fallback: per-marker solvePnP ───────────────────────────────────── + for i, mid in enumerate(ids): + img_pts = corners[i].reshape(4, 2).astype(np.float64) + ok, rvec, tvec = cv2.solvePnP( + self._obj_pts, img_pts, + self._K, self._dist, + flags=cv2.SOLVEPNP_IPPE_SQUARE, + ) + if not ok: + continue + results.append(MarkerPose( + marker_id = int(mid), + tvec = tvec.ravel().astype(np.float64), + rvec = rvec.ravel().astype(np.float64), + corners = img_pts, + )) + return results + + # ── Dock target selection ───────────────────────────────────────────────── + + def _select_dock_target( + self, + detections: list[MarkerPose], + ) -> MarkerPose | None: + """Select the closest marker that matches dock_marker_ids filter.""" + candidates = [ + d for d in detections + if not self._dock_ids or d.marker_id in self._dock_ids + ] + if not candidates: + return None + return min(candidates, key=lambda d: d.distance_m) + + # ── Publishers ──────────────────────────────────────────────────────────── + + def _publish_pose_array( + self, + detections: list[MarkerPose], + stamp, + ) -> None: + pa = PoseArray() + pa.header.stamp = stamp + pa.header.frame_id = self._cam_frame + + for d in detections: + qx, qy, qz, qw = d.quat + pose = Pose() + pose.position = Point(x=float(d.tvec[0]), + y=float(d.tvec[1]), + z=float(d.tvec[2])) + pose.orientation = Quaternion(x=qx, y=qy, z=qz, w=qw) + pa.poses.append(pose) + + self._pose_array_pub.publish(pa) + + def _publish_dock_target( + self, + detections: list[MarkerPose], + stamp, + ) -> None: + target = self._select_dock_target(detections) + if target is None: + return + + qx, qy, qz, qw = target.quat + ps = PoseStamped() + ps.header.stamp = stamp + ps.header.frame_id = self._cam_frame + ps.pose.position = Point(x=float(target.tvec[0]), + y=float(target.tvec[1]), + z=float(target.tvec[2])) + ps.pose.orientation = Quaternion(x=qx, y=qy, z=qz, w=qw) + self._dock_pub.publish(ps) + + def _publish_viz( + self, + detections: list[MarkerPose], + stamp, + ) -> None: + """Publish coordinate-axes MarkerArray for each detected marker.""" + ma = MarkerArray() + lifetime = _ros_duration(0.2) + + # Delete-all + dm = Marker() + dm.action = Marker.DELETEALL + dm.header.stamp = stamp + dm.header.frame_id = self._cam_frame + ma.markers.append(dm) + + target = self._select_dock_target(detections) + axis_len = self._marker_size * _AXIS_LEN_FRAC + + for idx, d in enumerate(detections): + is_target = (target is not None and d.marker_id == target.marker_id) + cx, cy, cz = float(d.tvec[0]), float(d.tvec[1]), float(d.tvec[2]) + qx, qy, qz, qw = d.quat + base_id = idx * 10 + + # Sphere at marker centre + sph = Marker() + sph.header.stamp = stamp + sph.header.frame_id = self._cam_frame + sph.ns = 'aruco_centers' + sph.id = base_id + sph.type = Marker.SPHERE + sph.action = Marker.ADD + sph.pose.position = Point(x=cx, y=cy, z=cz) + sph.pose.orientation = Quaternion(x=qx, y=qy, z=qz, w=qw) + r_s = 0.015 if not is_target else 0.025 + sph.scale = Vector3(x=r_s * 2, y=r_s * 2, z=r_s * 2) + sph.color = ( + ColorRGBA(r=1.0, g=0.2, b=0.2, a=1.0) if is_target else + ColorRGBA(r=0.2, g=0.8, b=1.0, a=0.9) + ) + sph.lifetime = lifetime + ma.markers.append(sph) + + # Text label: ID + distance + txt = Marker() + txt.header.stamp = stamp + txt.header.frame_id = self._cam_frame + txt.ns = 'aruco_labels' + txt.id = base_id + 1 + txt.type = Marker.TEXT_VIEW_FACING + txt.action = Marker.ADD + txt.pose.position = Point(x=cx, y=cy - 0.08, z=cz) + txt.pose.orientation.w = 1.0 + txt.scale.z = 0.06 + txt.color = ColorRGBA(r=1.0, g=1.0, b=0.0, a=1.0) + txt.text = f'ID={d.marker_id}\n{d.distance_m:.2f}m' + if is_target: + txt.text += '\n[DOCK]' + txt.lifetime = lifetime + ma.markers.append(txt) + + self._viz_pub.publish(ma) + + def _publish_empty(self, stamp) -> None: + pa = PoseArray() + pa.header.stamp = stamp + pa.header.frame_id = self._cam_frame + self._pose_array_pub.publish(pa) + self._publish_viz([], stamp) + + def _publish_debug(self, bgr, corners, ids, detections, stamp) -> None: + """Annotate and publish debug image with detected markers drawn.""" + debug = bgr.copy() + if ids is not None and len(ids) > 0: + cv2.aruco.drawDetectedMarkers(debug, corners, ids) + + # Draw axes for each detection using the estimated poses + for d in detections: + try: + cv2.drawFrameAxes( + debug, self._K, self._dist, + d.rvec.reshape(3, 1), + d.tvec.reshape(3, 1), + self._marker_size * _AXIS_LEN_FRAC, + ) + except Exception: + pass + + try: + img_msg = self._bridge.cv2_to_imgmsg(debug, encoding='bgr8') + img_msg.header.stamp = stamp + img_msg.header.frame_id = self._cam_frame + self._debug_pub.publish(img_msg) + except Exception: + pass + + # ── Status timer ───────────────────────────────────────────────────────── + + def _on_status_timer(self) -> None: + detections = self._last_detections + target = self._select_dock_target(detections) + + markers_info = [ + { + 'id': d.marker_id, + 'distance_m': round(d.distance_m, 3), + 'yaw_deg': round(math.degrees(d.yaw_rad), 2), + 'lateral_m': round(d.lateral_m, 3), + 'forward_m': round(d.forward_m, 3), + 'is_target': target is not None and d.marker_id == target.marker_id, + } + for d in detections + ] + + status = { + 'detected_count': len(detections), + 'dock_target_id': target.marker_id if target else None, + 'dock_distance_m': round(target.distance_m, 3) if target else None, + 'dock_yaw_deg': round(math.degrees(target.yaw_rad), 2) if target else None, + 'dock_lateral_m': round(target.lateral_m, 3) if target else None, + 'markers': markers_info, + } + msg = String() + msg.data = json.dumps(status) + self._status_pub.publish(msg) + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + +def _ros_duration(seconds: float): + from rclpy.duration import Duration + sec = int(seconds) + nsec = int((seconds - sec) * 1e9) + return Duration(seconds=sec, nanoseconds=nsec).to_msg() + + +def main(args=None) -> None: + rclpy.init(args=args) + node = ArucoDetectNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.try_shutdown() + + +if __name__ == '__main__': + main() diff --git a/jetson/ros2_ws/src/saltybot_aruco_detect/saltybot_aruco_detect/aruco_math.py b/jetson/ros2_ws/src/saltybot_aruco_detect/saltybot_aruco_detect/aruco_math.py new file mode 100644 index 0000000..fd145e2 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_aruco_detect/saltybot_aruco_detect/aruco_math.py @@ -0,0 +1,156 @@ +""" +aruco_math.py — Pure-math helpers for ArUco detection (Issue #627). + +No ROS2 dependencies — importable in unit tests without a ROS2 install. + +Provides: + rot_mat_to_quat(R) → (qx, qy, qz, qw) + rvec_to_quat(rvec) → (qx, qy, qz, qw) [via cv2.Rodrigues] + tvec_distance(tvec) → float (Euclidean norm) + tvec_yaw_rad(tvec) → float (atan2(tx, tz) — lateral bearing error) + MarkerPose dataclass — per-marker detection result +""" + +from __future__ import annotations + +import math +from dataclasses import dataclass, field +from typing import List, Optional, Tuple + +import numpy as np + + +# ── Rotation helpers ────────────────────────────────────────────────────────── + +def rot_mat_to_quat(R: np.ndarray) -> Tuple[float, float, float, float]: + """ + Convert a 3×3 rotation matrix to quaternion (qx, qy, qz, qw). + + Uses Shepperd's method for numerical stability. + R must be a valid rotation matrix (‖R‖₂ = 1, det R = +1). + """ + R = np.asarray(R, dtype=np.float64) + trace = R[0, 0] + R[1, 1] + R[2, 2] + + if trace > 0.0: + s = 0.5 / math.sqrt(trace + 1.0) + w = 0.25 / s + x = (R[2, 1] - R[1, 2]) * s + y = (R[0, 2] - R[2, 0]) * s + z = (R[1, 0] - R[0, 1]) * s + elif R[0, 0] > R[1, 1] and R[0, 0] > R[2, 2]: + s = 2.0 * math.sqrt(1.0 + R[0, 0] - R[1, 1] - R[2, 2]) + w = (R[2, 1] - R[1, 2]) / s + x = 0.25 * s + y = (R[0, 1] + R[1, 0]) / s + z = (R[0, 2] + R[2, 0]) / s + elif R[1, 1] > R[2, 2]: + s = 2.0 * math.sqrt(1.0 + R[1, 1] - R[0, 0] - R[2, 2]) + w = (R[0, 2] - R[2, 0]) / s + x = (R[0, 1] + R[1, 0]) / s + y = 0.25 * s + z = (R[1, 2] + R[2, 1]) / s + else: + s = 2.0 * math.sqrt(1.0 + R[2, 2] - R[0, 0] - R[1, 1]) + w = (R[1, 0] - R[0, 1]) / s + x = (R[0, 2] + R[2, 0]) / s + y = (R[1, 2] + R[2, 1]) / s + z = 0.25 * s + + # Normalise + norm = math.sqrt(x * x + y * y + z * z + w * w) + if norm < 1e-10: + return 0.0, 0.0, 0.0, 1.0 + inv = 1.0 / norm + return x * inv, y * inv, z * inv, w * inv + + +def rvec_to_quat(rvec: np.ndarray) -> Tuple[float, float, float, float]: + """ + Convert a Rodrigues rotation vector (3,) or (1,3) to quaternion. + + Requires cv2 for cv2.Rodrigues(); falls back to manual exponential map + if cv2 is unavailable. + """ + vec = np.asarray(rvec, dtype=np.float64).ravel() + try: + import cv2 + R, _ = cv2.Rodrigues(vec) + except ImportError: + # Manual Rodrigues exponential map + angle = float(np.linalg.norm(vec)) + if angle < 1e-12: + return 0.0, 0.0, 0.0, 1.0 + axis = vec / angle + K = np.array([ + [ 0.0, -axis[2], axis[1]], + [ axis[2], 0.0, -axis[0]], + [-axis[1], axis[0], 0.0], + ]) + R = np.eye(3) + math.sin(angle) * K + (1.0 - math.cos(angle)) * K @ K + + return rot_mat_to_quat(R) + + +# ── Metric helpers ──────────────────────────────────────────────────────────── + +def tvec_distance(tvec: np.ndarray) -> float: + """Euclidean distance from camera to marker (m).""" + t = np.asarray(tvec, dtype=np.float64).ravel() + return float(math.sqrt(t[0] ** 2 + t[1] ** 2 + t[2] ** 2)) + + +def tvec_yaw_rad(tvec: np.ndarray) -> float: + """ + Horizontal bearing error to marker (rad). + + atan2(tx, tz): +ve when marker is to the right of the camera optical axis. + Zero when the marker is directly in front. + """ + t = np.asarray(tvec, dtype=np.float64).ravel() + return float(math.atan2(t[0], t[2])) + + +# ── Per-marker result ───────────────────────────────────────────────────────── + +@dataclass +class MarkerPose: + """Pose result for a single detected ArUco marker.""" + marker_id: int + tvec: np.ndarray # (3,) translation in camera optical frame (m) + rvec: np.ndarray # (3,) Rodrigues rotation vector + corners: np.ndarray # (4, 2) image-plane corner coordinates (px) + + # Derived (computed lazily) + _distance_m: Optional[float] = field(default=None, repr=False) + _yaw_rad: Optional[float] = field(default=None, repr=False) + _quat: Optional[Tuple[float, float, float, float]] = field(default=None, repr=False) + + @property + def distance_m(self) -> float: + if self._distance_m is None: + self._distance_m = tvec_distance(self.tvec) + return self._distance_m + + @property + def yaw_rad(self) -> float: + if self._yaw_rad is None: + self._yaw_rad = tvec_yaw_rad(self.tvec) + return self._yaw_rad + + @property + def lateral_m(self) -> float: + """Lateral offset (m); +ve = marker is to the right.""" + return float(self.tvec[0]) + + @property + def forward_m(self) -> float: + """Forward distance (Z component in camera frame, m).""" + return float(self.tvec[2]) + + @property + def quat(self) -> Tuple[float, float, float, float]: + """Quaternion (qx, qy, qz, qw) from rvec.""" + if self._quat is None: + self._quat = rvec_to_quat(self.rvec) + return self._quat diff --git a/jetson/ros2_ws/src/saltybot_aruco_detect/setup.cfg b/jetson/ros2_ws/src/saltybot_aruco_detect/setup.cfg new file mode 100644 index 0000000..a7c6945 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_aruco_detect/setup.cfg @@ -0,0 +1,5 @@ +[develop] +script_dir=$base/lib/saltybot_aruco_detect + +[install] +install_scripts=$base/lib/saltybot_aruco_detect diff --git a/jetson/ros2_ws/src/saltybot_aruco_detect/setup.py b/jetson/ros2_ws/src/saltybot_aruco_detect/setup.py new file mode 100644 index 0000000..9952b00 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_aruco_detect/setup.py @@ -0,0 +1,30 @@ +import os +from glob import glob +from setuptools import setup + +package_name = 'saltybot_aruco_detect' + +setup( + name=package_name, + version='0.1.0', + packages=[package_name], + data_files=[ + ('share/ament_index/resource_index/packages', + ['resource/' + package_name]), + ('share/' + package_name, ['package.xml']), + (os.path.join('share', package_name, 'launch'), glob('launch/*.py')), + (os.path.join('share', package_name, 'config'), glob('config/*.yaml')), + ], + install_requires=['setuptools'], + zip_safe=True, + maintainer='sl-perception', + maintainer_email='sl-perception@saltylab.local', + description='ArUco marker detection for docking alignment (Issue #627)', + license='MIT', + tests_require=['pytest'], + entry_points={ + 'console_scripts': [ + 'aruco_detect = saltybot_aruco_detect.aruco_detect_node:main', + ], + }, +) diff --git a/jetson/ros2_ws/src/saltybot_aruco_detect/test/test_aruco_math.py b/jetson/ros2_ws/src/saltybot_aruco_detect/test/test_aruco_math.py new file mode 100644 index 0000000..1546dd3 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_aruco_detect/test/test_aruco_math.py @@ -0,0 +1,206 @@ +"""Unit tests for aruco_math.py — no ROS2, no live camera required.""" + +import math +import numpy as np +import pytest + +from saltybot_aruco_detect.aruco_math import ( + rot_mat_to_quat, + rvec_to_quat, + tvec_distance, + tvec_yaw_rad, + MarkerPose, +) + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + +def _quat_norm(q): + return math.sqrt(sum(x * x for x in q)) + + +def _quat_rotate(q, v): + """Rotate vector v by quaternion q (qx, qy, qz, qw).""" + qx, qy, qz, qw = q + # Quaternion product: q * (0,v) * q^-1 + # Using formula: v' = v + 2qw(q × v) + 2(q × (q × v)) + qvec = np.array([qx, qy, qz]) + t = 2.0 * np.cross(qvec, v) + return v + qw * t + np.cross(qvec, t) + + +# ── rot_mat_to_quat ─────────────────────────────────────────────────────────── + +def test_identity_rotation(): + R = np.eye(3) + q = rot_mat_to_quat(R) + assert abs(q[3] - 1.0) < 1e-9 # w = 1 for identity + assert abs(q[0]) < 1e-9 + assert abs(q[1]) < 1e-9 + assert abs(q[2]) < 1e-9 + + +def test_90deg_yaw(): + """90° rotation about Z axis.""" + angle = math.pi / 2 + R = np.array([ + [math.cos(angle), -math.sin(angle), 0], + [math.sin(angle), math.cos(angle), 0], + [0, 0, 1], + ]) + qx, qy, qz, qw = rot_mat_to_quat(R) + # For 90° Z rotation: q = (0, 0, sin45°, cos45°) + assert abs(qx) < 1e-6 + assert abs(qy) < 1e-6 + assert abs(qz - math.sin(angle / 2)) < 1e-6 + assert abs(qw - math.cos(angle / 2)) < 1e-6 + + +def test_unit_norm(): + for yaw in [0, 0.5, 1.0, 2.0, math.pi]: + R = np.array([ + [math.cos(yaw), -math.sin(yaw), 0], + [math.sin(yaw), math.cos(yaw), 0], + [0, 0, 1], + ]) + q = rot_mat_to_quat(R) + assert abs(_quat_norm(q) - 1.0) < 1e-9, f"yaw={yaw}: norm={_quat_norm(q)}" + + +def test_roundtrip_rotation_matrix(): + """Rotating a vector with the matrix and quaternion should give same result.""" + angle = 1.23 + R = np.array([ + [math.cos(angle), -math.sin(angle), 0], + [math.sin(angle), math.cos(angle), 0], + [0, 0, 1], + ]) + q = rot_mat_to_quat(R) + v = np.array([1.0, 0.0, 0.0]) + v_R = R @ v + v_q = _quat_rotate(q, v) + np.testing.assert_allclose(v_R, v_q, atol=1e-9) + + +def test_180deg_pitch(): + """180° rotation about Y axis.""" + R = np.array([[-1, 0, 0], [0, 1, 0], [0, 0, -1]], dtype=float) + q = rot_mat_to_quat(R) + assert abs(_quat_norm(q) - 1.0) < 1e-9 + # For 180° Y: q = (0, 1, 0, 0) or (0, -1, 0, 0) + assert abs(abs(q[1]) - 1.0) < 1e-6 + + +# ── rvec_to_quat ────────────────────────────────────────────────────────────── + +def test_rvec_zero_is_identity(): + q = rvec_to_quat(np.array([0.0, 0.0, 0.0])) + assert abs(_quat_norm(q) - 1.0) < 1e-6 + assert abs(q[3] - 1.0) < 1e-6 # w = 1 + + +def test_rvec_unit_norm(): + for rvec in [ + [0.1, 0.0, 0.0], + [0.0, 0.5, 0.0], + [0.0, 0.0, 1.57], + [0.3, 0.4, 0.5], + ]: + q = rvec_to_quat(np.array(rvec)) + assert abs(_quat_norm(q) - 1.0) < 1e-9, f"rvec={rvec}: norm={_quat_norm(q)}" + + +def test_rvec_z_rotation_matches_rot_mat(): + """rvec=[0,0,π/3] should match 60° Z rotation matrix quaternion.""" + angle = math.pi / 3 + rvec = np.array([0.0, 0.0, angle]) + R = np.array([ + [math.cos(angle), -math.sin(angle), 0], + [math.sin(angle), math.cos(angle), 0], + [0, 0, 1], + ]) + q_rvec = rvec_to_quat(rvec) + q_R = rot_mat_to_quat(R) + # Allow sign flip (q and -q represent the same rotation) + diff = min( + abs(_quat_norm(np.array(q_rvec) - np.array(q_R))), + abs(_quat_norm(np.array(q_rvec) + np.array(q_R))), + ) + assert diff < 1e-6, f"rvec q={q_rvec}, mat q={q_R}" + + +# ── tvec_distance ───────────────────────────────────────────────────────────── + +def test_tvec_distance_basic(): + assert abs(tvec_distance(np.array([3.0, 4.0, 0.0])) - 5.0) < 1e-9 + + +def test_tvec_distance_zero(): + assert tvec_distance(np.array([0.0, 0.0, 0.0])) == 0.0 + + +def test_tvec_distance_forward(): + assert abs(tvec_distance(np.array([0.0, 0.0, 2.5])) - 2.5) < 1e-9 + + +def test_tvec_distance_3d(): + t = np.array([1.0, 2.0, 3.0]) + expected = math.sqrt(14.0) + assert abs(tvec_distance(t) - expected) < 1e-9 + + +# ── tvec_yaw_rad ────────────────────────────────────────────────────────────── + +def test_yaw_zero_when_centred(): + """Marker directly in front: tx=0 → yaw=0.""" + assert abs(tvec_yaw_rad(np.array([0.0, 0.0, 2.0]))) < 1e-9 + + +def test_yaw_positive_right(): + """Marker to the right (tx>0) → positive yaw.""" + assert tvec_yaw_rad(np.array([1.0, 0.0, 2.0])) > 0.0 + + +def test_yaw_negative_left(): + """Marker to the left (tx<0) → negative yaw.""" + assert tvec_yaw_rad(np.array([-1.0, 0.0, 2.0])) < 0.0 + + +def test_yaw_45deg(): + """tx=tz → 45°.""" + assert abs(tvec_yaw_rad(np.array([1.0, 0.0, 1.0])) - math.pi / 4) < 1e-9 + + +# ── MarkerPose ──────────────────────────────────────────────────────────────── + +def _make_marker(mid=42, tx=0.0, ty=0.0, tz=2.0): + rvec = np.array([0.0, 0.0, 0.1]) + tvec = np.array([tx, ty, tz]) + corners = np.zeros((4, 2)) + return MarkerPose(marker_id=mid, tvec=tvec, rvec=rvec, corners=corners) + + +def test_marker_distance_cached(): + m = _make_marker(tz=3.0) + d1 = m.distance_m + d2 = m.distance_m + assert d1 == d2 + + +def test_marker_lateral_and_forward(): + m = _make_marker(tx=0.5, tz=2.0) + assert abs(m.lateral_m - 0.5) < 1e-9 + assert abs(m.forward_m - 2.0) < 1e-9 + + +def test_marker_quat_unit_norm(): + m = _make_marker() + q = m.quat + assert abs(_quat_norm(q) - 1.0) < 1e-9 + + +def test_marker_yaw_sign(): + m_right = _make_marker(tx=+0.3, tz=1.0) + m_left = _make_marker(tx=-0.3, tz=1.0) + assert m_right.yaw_rad > 0 + assert m_left.yaw_rad < 0