From 40b0917c33815e13d5569c6c83656c553a1cc237 Mon Sep 17 00:00:00 2001 From: sl-jetson Date: Sat, 14 Mar 2026 15:02:26 -0400 Subject: [PATCH] feat: Nav2 integration with UWB localization (Issue #599) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New package saltybot_nav2_uwb replacing AMCL-based localization with UWB-IMU EKF fused pose. Key components: - uwb_pose_bridge_node: subscribes /saltybot/pose/fused_cov (from EKF), computes map→odom TF via T_map_odom = T_map_base × inv(T_odom_base), broadcasts at 20 Hz. Publishes /initialpose on first valid pose. - waypoint_sequencer.py: pure-Python state machine (IDLE→RUNNING→ SUCCEEDED/ABORTED/CANCELED) for sequential waypoint execution. - waypoint_follower_node: action server on /saltybot/nav/follow_waypoints (nav2_msgs/FollowWaypoints), sends each goal to Nav2 NavigateToPose in sequence; JSON topic /saltybot/nav/waypoints for operator use. - nav2_uwb_params.yaml: DWB controller capped at 1.0 m/s, global+local costmap with /scan (RPLIDAR), rolling-window global costmap (no static map needed), AMCL removed from lifecycle manager. - nav2_uwb.launch.py: bridge (t=0) → Nav2 (t=2s) → waypoint follower (t=4s) with LogInfo markers. - 65 unit tests passing (waypoint dataclass, sequencer state machine, 2-D TF maths, progress tracking). Co-Authored-By: Claude Sonnet 4.6 --- .../config/nav2_uwb_params.yaml | 315 ++++++++++++ .../launch/nav2_uwb.launch.py | 168 +++++++ .../ros2_ws/src/saltybot_nav2_uwb/package.xml | 42 ++ .../resource/saltybot_nav2_uwb | 0 .../saltybot_nav2_uwb/__init__.py | 0 .../saltybot_nav2_uwb/uwb_pose_bridge_node.py | 218 +++++++++ .../waypoint_follower_node.py | 349 +++++++++++++ .../saltybot_nav2_uwb/waypoint_sequencer.py | 176 +++++++ .../ros2_ws/src/saltybot_nav2_uwb/setup.cfg | 4 + jetson/ros2_ws/src/saltybot_nav2_uwb/setup.py | 35 ++ .../test/test_waypoint_follower.py | 459 ++++++++++++++++++ 11 files changed, 1766 insertions(+) create mode 100644 jetson/ros2_ws/src/saltybot_nav2_uwb/config/nav2_uwb_params.yaml create mode 100644 jetson/ros2_ws/src/saltybot_nav2_uwb/launch/nav2_uwb.launch.py create mode 100644 jetson/ros2_ws/src/saltybot_nav2_uwb/package.xml create mode 100644 jetson/ros2_ws/src/saltybot_nav2_uwb/resource/saltybot_nav2_uwb create mode 100644 jetson/ros2_ws/src/saltybot_nav2_uwb/saltybot_nav2_uwb/__init__.py create mode 100644 jetson/ros2_ws/src/saltybot_nav2_uwb/saltybot_nav2_uwb/uwb_pose_bridge_node.py create mode 100644 jetson/ros2_ws/src/saltybot_nav2_uwb/saltybot_nav2_uwb/waypoint_follower_node.py create mode 100644 jetson/ros2_ws/src/saltybot_nav2_uwb/saltybot_nav2_uwb/waypoint_sequencer.py create mode 100644 jetson/ros2_ws/src/saltybot_nav2_uwb/setup.cfg create mode 100644 jetson/ros2_ws/src/saltybot_nav2_uwb/setup.py create mode 100644 jetson/ros2_ws/src/saltybot_nav2_uwb/test/test_waypoint_follower.py diff --git a/jetson/ros2_ws/src/saltybot_nav2_uwb/config/nav2_uwb_params.yaml b/jetson/ros2_ws/src/saltybot_nav2_uwb/config/nav2_uwb_params.yaml new file mode 100644 index 0000000..c1065c0 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_nav2_uwb/config/nav2_uwb_params.yaml @@ -0,0 +1,315 @@ +# nav2_uwb_params.yaml — Nav2 configuration for UWB-based localization (Issue #599). +# +# Key differences from the AMCL/SLAM config (saltybot_nav2_slam): +# • AMCL is NOT used. The uwb_pose_bridge_node broadcasts map→odom TF directly +# from the UWB-IMU EKF fused pose (/saltybot/pose/fused_cov). +# • DWB controller capped at 1.0 m/s for safe differential-drive operation. +# • Global costmap uses a rolling window — no pre-built static map required. +# UWB provides the global (map-frame) position. +# • Costmap obstacle layer: /scan (RPLIDAR A1/A2) only. +# • lifecycle_manager_localization is intentionally empty; the bridge node +# keeps the TF alive without Nav2 lifecycle management. + +bt_navigator: + ros__parameters: + use_sim_time: false + global_frame: map + robot_base_frame: base_link + odom_topic: /odom + bt_loop_duration: 10 + default_server_timeout: 20 + enable_groot_monitoring: false + default_nav_to_pose_bt_xml: >- + $(find-pkg-share nav2_bt_navigator)/behavior_trees/navigate_to_pose_w_replanning_and_recovery.xml + default_nav_through_poses_bt_xml: >- + $(find-pkg-share nav2_bt_navigator)/behavior_trees/navigate_through_poses_w_replanning_and_recovery.xml + plugin_lib_names: + - nav2_compute_path_to_pose_action_bt_node + - nav2_compute_path_through_poses_action_bt_node + - nav2_follow_path_action_bt_node + - nav2_spin_action_bt_node + - nav2_wait_action_bt_node + - nav2_back_up_action_bt_node + - nav2_drive_on_heading_bt_node + - nav2_clear_costmap_service_bt_node + - nav2_is_stuck_condition_bt_node + - nav2_goal_updated_condition_bt_node + - nav2_initial_pose_received_condition_bt_node + - nav2_rate_controller_bt_node + - nav2_distance_controller_bt_node + - nav2_speed_controller_bt_node + - nav2_truncate_path_action_bt_node + - nav2_remove_passed_goals_action_bt_node + - nav2_planner_selector_bt_node + - nav2_controller_selector_bt_node + - nav2_goal_checker_selector_bt_node + +bt_navigator_rclcpp_node: + ros__parameters: + use_sim_time: false + +# ── Controller (DWB — differential drive, max 1.0 m/s) ───────────────────────── +controller_server: + ros__parameters: + use_sim_time: false + controller_frequency: 20.0 + min_x_velocity_threshold: 0.001 + min_y_velocity_threshold: 0.5 + min_theta_velocity_threshold: 0.001 + failure_tolerance: 0.3 + progress_checker_plugin: "progress_checker" + goal_checker_plugins: ["general_goal_checker"] + controller_plugins: ["FollowPath"] + + progress_checker: + plugin: "nav2_core::SimpleProgressChecker" + required_movement_radius: 0.5 + movement_time_allowance: 15.0 + + general_goal_checker: + stateful: true + plugin: "nav2_core::SimpleGoalChecker" + xy_goal_tolerance: 0.20 + yaw_goal_tolerance: 0.15 + + FollowPath: + plugin: "dwb_core::DWBLocalPlanner" + debug_trajectory_details: false + # ── Velocity limits (1.0 m/s max per Issue #599) ────────────────── + min_vel_x: 0.0 + max_vel_x: 1.0 # ← hard cap for safe indoor operation + min_vel_y: 0.0 + max_vel_y: 0.0 # differential drive — no lateral motion + min_vel_theta: -1.5 + max_vel_theta: 1.5 + min_speed_xy: 0.0 + max_speed_xy: 1.0 + min_speed_theta: 0.0 + # ── Acceleration limits ──────────────────────────────────────────── + acc_lim_x: 1.0 + acc_lim_y: 0.0 + acc_lim_theta: 2.0 + decel_lim_x: -1.0 + decel_lim_y: 0.0 + decel_lim_theta: -2.0 + # ── Trajectory sampling ──────────────────────────────────────────── + vx_samples: 20 + vy_samples: 1 # differential drive — only 1 lateral sample + vtheta_samples: 40 + sim_time: 1.5 + linear_granularity: 0.05 + angular_granularity: 0.05 + transform_tolerance: 0.3 + trans_stopped_velocity: 0.25 + short_circuit_trajectory_evaluation: true + stateful: true + # ── Critic weights ───────────────────────────────────────────────── + critics: + - "RotateToGoal" + - "Oscillation" + - "BaseObstacle" + - "GoalAlign" + - "PathAlign" + - "PathDist" + - "GoalDist" + BaseObstacle.scale: 0.02 + PathAlign.scale: 32.0 + PathAlign.forward_point_distance: 0.1 + GoalAlign.scale: 24.0 + GoalAlign.forward_point_distance: 0.1 + PathDist.scale: 32.0 + GoalDist.scale: 24.0 + RotateToGoal.scale: 32.0 + RotateToGoal.slowing_factor: 5.0 + RotateToGoal.lookahead_time: -1.0 + +controller_server_rclcpp_node: + ros__parameters: + use_sim_time: false + +# ── Global planner (NavFn A*) ──────────────────────────────────────────────── +planner_server: + ros__parameters: + use_sim_time: false + expected_planner_frequency: 10.0 + planner_plugins: ["GridBased"] + GridBased: + plugin: "nav2_navfn_planner/NavfnPlanner" + tolerance: 0.3 + use_astar: true + allow_unknown: true + +planner_server_rclcpp_node: + ros__parameters: + use_sim_time: false + +# ── Recovery behaviors ─────────────────────────────────────────────────────── +behavior_server: + ros__parameters: + use_sim_time: false + costmap_topic: local_costmap/costmap_raw + footprint_topic: local_costmap/published_footprint + cycle_frequency: 10.0 + behavior_plugins: ["spin", "backup", "drive_on_heading", "wait"] + spin: + plugin: "nav2_behaviors/Spin" + backup: + plugin: "nav2_behaviors/BackUp" + drive_on_heading: + plugin: "nav2_behaviors/DriveOnHeading" + wait: + plugin: "nav2_behaviors/Wait" + global_frame: odom + robot_base_frame: base_link + transform_timeout: 0.1 + simulate_ahead_time: 2.0 + max_rotations: 1.0 + max_backup_dist: 0.3 + backup_speed: 0.15 + +behavior_server_rclcpp_node: + ros__parameters: + use_sim_time: false + +# ── Costmaps ───────────────────────────────────────────────────────────────── +# +# Both costmaps use only /scan (RPLIDAR). No depth camera source here — +# saltybot_depth_costmap adds that layer separately when enabled. + +local_costmap: + local_costmap: + ros__parameters: + use_sim_time: false + update_frequency: 10.0 + publish_frequency: 5.0 + global_frame: odom + robot_base_frame: base_link + rolling_window: true + width: 4 + height: 4 + resolution: 0.05 + robot_radius: 0.35 + plugins: ["obstacle_layer", "inflation_layer"] + obstacle_layer: + plugin: "nav2_costmap_2d::ObstacleLayer" + enabled: true + observation_sources: scan + scan: + topic: /scan + max_obstacle_height: 2.0 + min_obstacle_height: 0.0 + clearing: true + marking: true + data_type: "LaserScan" + raytrace_max_range: 8.0 + raytrace_min_range: 0.0 + obstacle_max_range: 6.0 + obstacle_min_range: 0.0 + inflation_layer: + plugin: "nav2_costmap_2d::InflationLayer" + enabled: true + inflation_radius: 0.55 + cost_scaling_factor: 10.0 + inflate_unknown: false + inflate_around_unknown: true + track_unknown_space: false + always_send_full_costmap: true + + local_costmap_client: + ros__parameters: + use_sim_time: false + + local_costmap_rclcpp_node: + ros__parameters: + use_sim_time: false + +global_costmap: + global_costmap: + ros__parameters: + use_sim_time: false + update_frequency: 2.0 + publish_frequency: 1.0 + global_frame: map + robot_base_frame: base_link + # Rolling window — UWB provides global position, no static map needed. + rolling_window: true + width: 20 + height: 20 + resolution: 0.05 + robot_radius: 0.35 + plugins: ["obstacle_layer", "inflation_layer"] + obstacle_layer: + plugin: "nav2_costmap_2d::ObstacleLayer" + enabled: true + observation_sources: scan + scan: + topic: /scan + max_obstacle_height: 2.0 + min_obstacle_height: 0.0 + clearing: true + marking: true + data_type: "LaserScan" + raytrace_max_range: 8.0 + raytrace_min_range: 0.0 + obstacle_max_range: 6.0 + obstacle_min_range: 0.0 + inflation_layer: + plugin: "nav2_costmap_2d::InflationLayer" + enabled: true + inflation_radius: 0.55 + cost_scaling_factor: 10.0 + inflate_unknown: false + inflate_around_unknown: true + track_unknown_space: false + always_send_full_costmap: false + + global_costmap_client: + ros__parameters: + use_sim_time: false + + global_costmap_rclcpp_node: + ros__parameters: + use_sim_time: false + +# ── Velocity smoother ──────────────────────────────────────────────────────── +velocity_smoother: + ros__parameters: + use_sim_time: false + smoothing_frequency: 20.0 + scale_velocities: false + feedback: "OPEN_LOOP" + max_velocity: [1.0, 0.0, 1.5] # [x, y, theta] — capped at 1.0 m/s + min_velocity: [-0.3, 0.0, -1.5] + max_accel: [1.0, 0.0, 2.0] + max_decel: [-1.0, 0.0, -2.0] + odom_topic: /odom + odom_duration: 0.1 + deadband_velocity: [0.0, 0.0, 0.0] + velocity_timeout: 1.0 + +# ── Lifecycle managers ─────────────────────────────────────────────────────── +# +# Localization lifecycle is intentionally empty — the uwb_pose_bridge_node +# manages the map→odom TF independently (no AMCL, no SLAM Toolbox). + +lifecycle_manager_navigation: + ros__parameters: + use_sim_time: false + autostart: true + node_names: + - "controller_server" + - "planner_server" + - "behavior_server" + - "bt_navigator" + - "velocity_smoother" + +lifecycle_manager_localization: + ros__parameters: + use_sim_time: false + autostart: true + node_names: [] # UWB pose bridge handles TF directly — no AMCL + +map_server: + ros__parameters: + use_sim_time: false + yaml_filename: "" # No static map — global costmap is rolling window diff --git a/jetson/ros2_ws/src/saltybot_nav2_uwb/launch/nav2_uwb.launch.py b/jetson/ros2_ws/src/saltybot_nav2_uwb/launch/nav2_uwb.launch.py new file mode 100644 index 0000000..7b37831 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_nav2_uwb/launch/nav2_uwb.launch.py @@ -0,0 +1,168 @@ +"""nav2_uwb.launch.py — Full Nav2 stack with UWB localization (Issue #599). + +Launch sequence +─────────────── + 1. uwb_pose_bridge_node — broadcasts map→odom TF from UWB-IMU EKF pose + 2. nav2_bringup navigation — controller, planner, BT navigator, costmaps + 3. lifecycle_manager_navigation — autostart all Nav2 nodes + 4. waypoint_follower_node — sequential waypoint execution action server + +Localization is handled by the UWB bridge; AMCL is NOT launched. + +Arguments +───────── + use_sim_time (bool) default: false + params_file (str) default: /config/nav2_uwb_params.yaml + uwb_pose_topic (str) default: /saltybot/pose/fused_cov + odom_topic (str) default: /odom + map_frame (str) default: map + odom_frame (str) default: odom + base_frame (str) default: base_link + max_vel_x (float) default: 1.0 — DWB hard cap (m/s) +""" + +import os + +from ament_index_python.packages import get_package_share_directory +from launch import LaunchDescription +from launch.actions import ( + DeclareLaunchArgument, + GroupAction, + IncludeLaunchDescription, + LogInfo, + TimerAction, +) +from launch.conditions import IfCondition +from launch.launch_description_sources import PythonLaunchDescriptionSource +from launch.substitutions import ( + LaunchConfiguration, + PathJoinSubstitution, + PythonExpression, +) +from launch_ros.actions import Node +from launch_ros.substitutions import FindPackageShare + + +def generate_launch_description(): + pkg_nav2_uwb = get_package_share_directory("saltybot_nav2_uwb") + pkg_nav2_bringup = get_package_share_directory("nav2_bringup") + + default_params = os.path.join(pkg_nav2_uwb, "config", "nav2_uwb_params.yaml") + + # ── Declare arguments ────────────────────────────────────────────────── + args = [ + DeclareLaunchArgument( + "use_sim_time", default_value="false", + description="Use simulation clock"), + DeclareLaunchArgument( + "params_file", default_value=default_params, + description="Full path to nav2_uwb_params.yaml"), + DeclareLaunchArgument( + "uwb_pose_topic", default_value="/saltybot/pose/fused_cov", + description="UWB-IMU EKF fused pose topic (PoseWithCovarianceStamped)"), + DeclareLaunchArgument( + "odom_topic", default_value="/odom", + description="Wheel odometry topic (Odometry)"), + DeclareLaunchArgument( + "map_frame", default_value="map", + description="Global map frame id"), + DeclareLaunchArgument( + "odom_frame", default_value="odom", + description="Odometry frame id"), + DeclareLaunchArgument( + "base_frame", default_value="base_link", + description="Robot base frame id"), + DeclareLaunchArgument( + "autostart", default_value="true", + description="Automatically start Nav2 lifecycle nodes"), + ] + + # ── UWB pose bridge — starts immediately ────────────────────────────── + uwb_bridge = Node( + package="saltybot_nav2_uwb", + executable="uwb_pose_bridge_node", + name="uwb_pose_bridge", + output="screen", + parameters=[{ + "use_sim_time": LaunchConfiguration("use_sim_time"), + "uwb_pose_topic": LaunchConfiguration("uwb_pose_topic"), + "odom_topic": LaunchConfiguration("odom_topic"), + "map_frame": LaunchConfiguration("map_frame"), + "odom_frame": LaunchConfiguration("odom_frame"), + "base_frame": LaunchConfiguration("base_frame"), + "tf_publish_hz": 20.0, + "tf_timeout_s": 0.5, + }], + ) + + uwb_bridge_log = LogInfo( + msg="[nav2_uwb] UWB pose bridge launched — map→odom TF will be " + "broadcast once UWB + odom data arrive." + ) + + # ── Nav2 navigation stack (t=2s: allow bridge to initialise first) ───── + nav2_navigation = TimerAction( + period=2.0, + actions=[ + LogInfo(msg="[nav2_uwb] Starting Nav2 navigation stack..."), + IncludeLaunchDescription( + PythonLaunchDescriptionSource( + os.path.join(pkg_nav2_bringup, "launch", "navigation_launch.py") + ), + launch_arguments={ + "use_sim_time": LaunchConfiguration("use_sim_time"), + "params_file": LaunchConfiguration("params_file"), + "autostart": LaunchConfiguration("autostart"), + # No map_subscribe_transient_local — no static map + "use_lifecycle_mgr": "true", + "use_sim_time": LaunchConfiguration("use_sim_time"), + }.items(), + ), + ], + ) + + # ── Waypoint follower (t=4s: Nav2 needs time to come up) ────────────── + waypoint_follower = TimerAction( + period=4.0, + actions=[ + LogInfo(msg="[nav2_uwb] Starting waypoint follower action server..."), + Node( + package="saltybot_nav2_uwb", + executable="waypoint_follower_node", + name="waypoint_follower", + output="screen", + parameters=[{ + "use_sim_time": LaunchConfiguration("use_sim_time"), + "map_frame": LaunchConfiguration("map_frame"), + "goal_xy_tolerance": 0.20, + "goal_yaw_tolerance": 0.15, + "nav_timeout_s": 60.0, + "status_hz": 2.0, + }], + ), + ], + ) + + ready_log = TimerAction( + period=4.5, + actions=[ + LogInfo( + msg="[nav2_uwb] Stack ready.\n" + " Localization : UWB-IMU EKF bridge (no AMCL)\n" + " Controller : DWB ≤ 1.0 m/s (differential drive)\n" + " Waypoints : /saltybot/nav/follow_waypoints (action)\n" + " Status : /saltybot/nav/waypoint_status (JSON)\n" + " Cancel : /saltybot/nav/cancel (topic)" + ), + ], + ) + + return LaunchDescription( + args + [ + uwb_bridge_log, + uwb_bridge, + nav2_navigation, + waypoint_follower, + ready_log, + ] + ) diff --git a/jetson/ros2_ws/src/saltybot_nav2_uwb/package.xml b/jetson/ros2_ws/src/saltybot_nav2_uwb/package.xml new file mode 100644 index 0000000..4efeb9c --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_nav2_uwb/package.xml @@ -0,0 +1,42 @@ + + + saltybot_nav2_uwb + 0.1.0 + + Nav2 integration with UWB-based localization for SaltyBot (Issue #599). + Replaces AMCL with a UWB pose bridge that broadcasts the map->odom TF + directly from the UWB-IMU EKF fused pose. Includes a DWB controller + capped at 1.0 m/s for safe differential-drive operation and a simple + sequential waypoint-follower action server. + + sl-jetson + Apache-2.0 + + rclpy + std_msgs + geometry_msgs + nav_msgs + sensor_msgs + tf2 + tf2_ros + nav2_msgs + + nav2_bringup + nav2_bt_navigator + nav2_controller + nav2_planner + nav2_behaviors + nav2_lifecycle_manager + nav2_costmap_2d + nav2_navfn_planner + dwb_core + dwb_critics + + ament_python + + pytest + + + ament_python + + diff --git a/jetson/ros2_ws/src/saltybot_nav2_uwb/resource/saltybot_nav2_uwb b/jetson/ros2_ws/src/saltybot_nav2_uwb/resource/saltybot_nav2_uwb new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_nav2_uwb/saltybot_nav2_uwb/__init__.py b/jetson/ros2_ws/src/saltybot_nav2_uwb/saltybot_nav2_uwb/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_nav2_uwb/saltybot_nav2_uwb/uwb_pose_bridge_node.py b/jetson/ros2_ws/src/saltybot_nav2_uwb/saltybot_nav2_uwb/uwb_pose_bridge_node.py new file mode 100644 index 0000000..38b5eba --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_nav2_uwb/saltybot_nav2_uwb/uwb_pose_bridge_node.py @@ -0,0 +1,218 @@ +"""uwb_pose_bridge_node.py — UWB→Nav2 localization bridge (Issue #599). + +Replaces AMCL by listening to the UWB-IMU EKF fused pose and broadcasting +the map→odom TF that Nav2 requires for global localization. + +TF computation (2D, ground robot): +────────────────────────────────── + map → base_link := UWB-IMU EKF (/saltybot/pose/fused_cov) + odom → base_link := wheel odometry (/odom) + + T_map_odom = T_map_base_link ⊗ T_odom_base_link⁻¹ + +Published TF: map → odom (at UWB rate, ≥ 10 Hz) +Published topic: /initialpose (once on first valid pose, for Nav2 boot) + +Subscribed topics: + /saltybot/pose/fused_cov (PoseWithCovarianceStamped) — UWB-IMU EKF + /odom (Odometry) — wheel odometry +""" + +from __future__ import annotations + +import math +from typing import Optional + +import rclpy +from rclpy.node import Node +from geometry_msgs.msg import ( + PoseWithCovarianceStamped, + TransformStamped, +) +from nav_msgs.msg import Odometry +from tf2_ros import TransformBroadcaster + + +# ── Pure-Python 2-D transform helpers (no numpy / tf_transformations needed) ── + +def _yaw_from_q(qx: float, qy: float, qz: float, qw: float) -> float: + """Extract yaw (rad) from unit quaternion.""" + return math.atan2(2.0 * (qw * qz + qx * qy), + 1.0 - 2.0 * (qy * qy + qz * qz)) + + +def _q_from_yaw(yaw: float): + """Return (qx, qy, qz, qw) for a pure-yaw rotation.""" + return 0.0, 0.0, math.sin(yaw * 0.5), math.cos(yaw * 0.5) + + +def _invert_2d(x: float, y: float, th: float): + """Invert a 2-D SE(2) transform.""" + c, s = math.cos(th), math.sin(th) + xi = -(c * x + s * y) + yi = -(-s * x + c * y) + return xi, yi, -th + + +def _compose_2d(x1: float, y1: float, th1: float, + x2: float, y2: float, th2: float): + """Compose two 2-D SE(2) transforms: T1 ⊗ T2.""" + c1, s1 = math.cos(th1), math.sin(th1) + xr = x1 + c1 * x2 - s1 * y2 + yr = y1 + s1 * x2 + c1 * y2 + thr = th1 + th2 + return xr, yr, thr + + +def compute_map_odom_tf( + map_bl_x: float, map_bl_y: float, map_bl_th: float, + odom_bl_x: float, odom_bl_y: float, odom_bl_th: float, +): + """Return (x, y, yaw) for the map→odom transform. + + T_map_odom = T_map_base_link ⊗ inv(T_odom_base_link) + """ + ix, iy, ith = _invert_2d(odom_bl_x, odom_bl_y, odom_bl_th) + return _compose_2d(map_bl_x, map_bl_y, map_bl_th, ix, iy, ith) + + +# ── ROS2 node ───────────────────────────────────────────────────────────────── + +class UwbPoseBridgeNode(Node): + """Broadcasts map→odom TF derived from UWB-IMU fused pose + wheel odom. + + Parameters (ROS): + uwb_pose_topic (str) default: /saltybot/pose/fused_cov + odom_topic (str) default: /odom + tf_publish_hz (float) default: 20.0 — TF broadcast rate + tf_timeout_s (float) default: 0.5 — stop publishing if UWB + older than this + map_frame (str) default: map + odom_frame (str) default: odom + base_frame (str) default: base_link + """ + + def __init__(self): + super().__init__("uwb_pose_bridge") + + self.declare_parameter("uwb_pose_topic", "/saltybot/pose/fused_cov") + self.declare_parameter("odom_topic", "/odom") + self.declare_parameter("tf_publish_hz", 20.0) + self.declare_parameter("tf_timeout_s", 0.5) + self.declare_parameter("map_frame", "map") + self.declare_parameter("odom_frame", "odom") + self.declare_parameter("base_frame", "base_link") + + self._map_frame = self.get_parameter("map_frame").value + self._odom_frame = self.get_parameter("odom_frame").value + self._base_frame = self.get_parameter("base_frame").value + self._tf_timeout = self.get_parameter("tf_timeout_s").value + + self._uwb_pose: Optional[PoseWithCovarianceStamped] = None + self._odom_pose: Optional[Odometry] = None + self._init_pose_sent: bool = False + + self._tf_broadcaster = TransformBroadcaster(self) + + # /initialpose: Nav2 uses this to seed the robot's initial position + self._init_pub = self.create_publisher( + PoseWithCovarianceStamped, "/initialpose", 1) + + self.create_subscription( + PoseWithCovarianceStamped, + self.get_parameter("uwb_pose_topic").value, + self._on_uwb_pose, 10) + + self.create_subscription( + Odometry, + self.get_parameter("odom_topic").value, + self._on_odom, 20) + + hz = self.get_parameter("tf_publish_hz").value + self.create_timer(1.0 / hz, self._publish_tf) + + self.get_logger().info( + f"UWB pose bridge ready | " + f"uwb={self.get_parameter('uwb_pose_topic').value} " + f"odom={self.get_parameter('odom_topic').value} " + f"tf={self._map_frame}→{self._odom_frame} @ {hz:.0f} Hz" + ) + + # ── Callbacks ────────────────────────────────────────────────────────── + + def _on_uwb_pose(self, msg: PoseWithCovarianceStamped) -> None: + self._uwb_pose = msg + if not self._init_pose_sent: + self._init_pub.publish(msg) + self._init_pose_sent = True + self.get_logger().info( + f"Initial UWB pose published to /initialpose " + f"({msg.pose.pose.position.x:.2f}, " + f"{msg.pose.pose.position.y:.2f})" + ) + + def _on_odom(self, msg: Odometry) -> None: + self._odom_pose = msg + + # ── TF broadcast ─────────────────────────────────────────────────────── + + def _publish_tf(self) -> None: + if self._uwb_pose is None or self._odom_pose is None: + return + + # Staleness guard: stop broadcasting if UWB data is too old + now = self.get_clock().now() + uwb_stamp = rclpy.time.Time.from_msg(self._uwb_pose.header.stamp) + age_s = (now - uwb_stamp).nanoseconds * 1e-9 + if age_s > self._tf_timeout: + self.get_logger().warn( + f"UWB pose stale ({age_s:.2f}s > {self._tf_timeout}s) — " + "map→odom TF suppressed", + throttle_duration_sec=5.0, + ) + return + + # Extract map→base_link from UWB pose + p = self._uwb_pose.pose.pose + map_bl_th = _yaw_from_q(p.orientation.x, p.orientation.y, + p.orientation.z, p.orientation.w) + map_bl_x, map_bl_y = p.position.x, p.position.y + + # Extract odom→base_link from wheel odometry + o = self._odom_pose.pose.pose + odom_bl_th = _yaw_from_q(o.orientation.x, o.orientation.y, + o.orientation.z, o.orientation.w) + odom_bl_x, odom_bl_y = o.position.x, o.position.y + + # Compute map→odom + mo_x, mo_y, mo_th = compute_map_odom_tf( + map_bl_x, map_bl_y, map_bl_th, + odom_bl_x, odom_bl_y, odom_bl_th, + ) + + # Broadcast + tf_msg = TransformStamped() + tf_msg.header.stamp = now.to_msg() + tf_msg.header.frame_id = self._map_frame + tf_msg.child_frame_id = self._odom_frame + tf_msg.transform.translation.x = mo_x + tf_msg.transform.translation.y = mo_y + tf_msg.transform.translation.z = 0.0 + qx, qy, qz, qw = _q_from_yaw(mo_th) + tf_msg.transform.rotation.x = qx + tf_msg.transform.rotation.y = qy + tf_msg.transform.rotation.z = qz + tf_msg.transform.rotation.w = qw + self._tf_broadcaster.sendTransform(tf_msg) + + +def main(args=None): + rclpy.init(args=args) + node = UwbPoseBridgeNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.try_shutdown() diff --git a/jetson/ros2_ws/src/saltybot_nav2_uwb/saltybot_nav2_uwb/waypoint_follower_node.py b/jetson/ros2_ws/src/saltybot_nav2_uwb/saltybot_nav2_uwb/waypoint_follower_node.py new file mode 100644 index 0000000..075df6c --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_nav2_uwb/saltybot_nav2_uwb/waypoint_follower_node.py @@ -0,0 +1,349 @@ +"""waypoint_follower_node.py — Simple sequential waypoint follower (Issue #599). + +Exposes a ROS2 action server that accepts a list of waypoints and drives the +robot through them one by one via Nav2's NavigateToPose action. + +Action server: /saltybot/nav/follow_waypoints (nav2_msgs/FollowWaypoints) +Nav2 client: /navigate_to_pose (nav2_msgs/NavigateToPose) + +Waypoints topic (JSON, for operator / WebUI override): + /saltybot/nav/waypoints (std_msgs/String) + Payload: JSON array of {x, y, yaw_deg} objects. + Example: [{"x": 1.0, "y": 2.0, "yaw_deg": 90.0}, ...] + +Status topic (JSON): + /saltybot/nav/waypoint_status (std_msgs/String) + Published every cycle with WaypointSequencer.status_dict(). + +Cancel topic: + /saltybot/nav/cancel (std_msgs/Empty) + Cancels the active mission. +""" + +from __future__ import annotations + +import json +import math +import threading +from typing import List, Optional + +import rclpy +from rclpy.action import ActionClient, ActionServer, CancelResponse, GoalResponse +from rclpy.node import Node +from rclpy.callback_groups import ReentrantCallbackGroup + +from geometry_msgs.msg import PoseStamped +from nav2_msgs.action import FollowWaypoints, NavigateToPose +from std_msgs.msg import Empty, String + +from saltybot_nav2_uwb.waypoint_sequencer import Waypoint, WaypointSequencer + + +def _yaw_to_quaternion(yaw_rad: float): + """Return (qx, qy, qz, qw) for a heading-only rotation.""" + return 0.0, 0.0, math.sin(yaw_rad * 0.5), math.cos(yaw_rad * 0.5) + + +def waypoint_to_pose_stamped(wp: Waypoint, frame: str = "map") -> PoseStamped: + """Convert a Waypoint to a PoseStamped in the given frame.""" + msg = PoseStamped() + msg.header.frame_id = frame + msg.pose.position.x = wp.x + msg.pose.position.y = wp.y + msg.pose.position.z = 0.0 + qx, qy, qz, qw = _yaw_to_quaternion(math.radians(wp.yaw_deg)) + msg.pose.orientation.x = qx + msg.pose.orientation.y = qy + msg.pose.orientation.z = qz + msg.pose.orientation.w = qw + return msg + + +class WaypointFollowerNode(Node): + """Sequential waypoint follower backed by Nav2 NavigateToPose. + + Parameters (ROS): + map_frame (str) default: map + goal_xy_tolerance (float) default: 0.20 (m) — passed to Nav2 + goal_yaw_tolerance (float) default: 0.15 (rad) — passed to Nav2 + nav_timeout_s (float) default: 60.0 — abort if Nav2 takes longer + status_hz (float) default: 2.0 — status publish rate + """ + + def __init__(self): + super().__init__("waypoint_follower") + + self.declare_parameter("map_frame", "map") + self.declare_parameter("goal_xy_tolerance", 0.20) + self.declare_parameter("goal_yaw_tolerance", 0.15) + self.declare_parameter("nav_timeout_s", 60.0) + self.declare_parameter("status_hz", 2.0) + + self._map_frame = self.get_parameter("map_frame").value + self._nav_timeout = self.get_parameter("nav_timeout_s").value + + self._seq = WaypointSequencer() + self._lock = threading.Lock() + self._cb_group = ReentrantCallbackGroup() + + # ── Publishers ───────────────────────────────────────────────── + self._status_pub = self.create_publisher( + String, "/saltybot/nav/waypoint_status", 10) + + # ── Subscriptions ────────────────────────────────────────────── + self.create_subscription( + String, "/saltybot/nav/waypoints", + self._on_waypoints_json, 10, + callback_group=self._cb_group) + + self.create_subscription( + Empty, "/saltybot/nav/cancel", + self._on_cancel, 10, + callback_group=self._cb_group) + + # ── Nav2 action client ───────────────────────────────────────── + self._nav_client: ActionClient = ActionClient( + self, NavigateToPose, "/navigate_to_pose", + callback_group=self._cb_group) + + # ── Action server (nav2_msgs/FollowWaypoints) ────────────────── + self._action_server = ActionServer( + self, + FollowWaypoints, + "/saltybot/nav/follow_waypoints", + execute_callback=self._execute_follow_waypoints, + goal_callback=self._goal_callback, + cancel_callback=self._cancel_callback, + callback_group=self._cb_group, + ) + + hz = self.get_parameter("status_hz").value + self.create_timer(1.0 / hz, self._publish_status, + callback_group=self._cb_group) + + self.get_logger().info( + f"WaypointFollower ready | " + f"action=/saltybot/nav/follow_waypoints " + f"nav2=/navigate_to_pose frame={self._map_frame}" + ) + + # ── Action server callbacks ──────────────────────────────────────────── + + def _goal_callback(self, goal_request): + if not self._nav_client.wait_for_server(timeout_sec=2.0): + self.get_logger().error( + "Nav2 NavigateToPose server not available — rejecting goal") + return GoalResponse.REJECT + if len(goal_request.poses) == 0: + self.get_logger().warn("Empty waypoint list — rejecting goal") + return GoalResponse.REJECT + return GoalResponse.ACCEPT + + def _cancel_callback(self, _goal_handle): + return CancelResponse.ACCEPT + + async def _execute_follow_waypoints(self, goal_handle) -> FollowWaypoints.Result: + """Execute the action: drive through each PoseStamped in sequence.""" + result = FollowWaypoints.Result() + feedback_msg = FollowWaypoints.Feedback() + + poses: List[PoseStamped] = list(goal_handle.request.poses) + waypoints = [ + Waypoint( + x=p.pose.position.x, + y=p.pose.position.y, + yaw_deg=math.degrees( + math.atan2( + 2.0 * (p.pose.orientation.w * p.pose.orientation.z + + p.pose.orientation.x * p.pose.orientation.y), + 1.0 - 2.0 * (p.pose.orientation.y ** 2 + + p.pose.orientation.z ** 2), + ) + ), + ) + for p in poses + ] + + with self._lock: + self._seq.start(waypoints) + + self.get_logger().info( + f"Starting waypoint mission: {len(waypoints)} waypoints") + + missed_waypoints: List[int] = [] + + while True: + # Check cancellation + if goal_handle.is_cancel_requested: + with self._lock: + self._seq.cancel() + goal_handle.canceled() + result.missed_waypoints = missed_waypoints + return result + + with self._lock: + if self._seq.is_done: + break + wp = self._seq.current_waypoint + idx = self._seq.current_index + + if wp is None: + break + + # Send goal to Nav2 + nav_goal = NavigateToPose.Goal() + nav_goal.pose = waypoint_to_pose_stamped(wp, self._map_frame) + + self.get_logger().info( + f"Sending waypoint {idx + 1}/{len(waypoints)}: " + f"({wp.x:.2f}, {wp.y:.2f}, {wp.yaw_deg:.1f}°)" + ) + + if not self._nav_client.wait_for_server(timeout_sec=5.0): + self.get_logger().error("Nav2 server unavailable — aborting") + with self._lock: + self._seq.abort("Nav2 server unavailable") + missed_waypoints.extend(range(idx, len(waypoints))) + break + + send_future = self._nav_client.send_goal_async( + nav_goal, + feedback_callback=lambda fb: None, # suppress individual feedback + ) + # Spin until goal accepted + rclpy.spin_until_future_complete( + self, send_future, timeout_sec=self._nav_timeout) + + if not send_future.done(): + self.get_logger().error( + f"Waypoint {idx + 1}: goal send timed out — skipping") + missed_waypoints.append(idx) + with self._lock: + self._seq.advance() + continue + + goal_res = send_future.result() + if not goal_res.accepted: + self.get_logger().warn( + f"Waypoint {idx + 1}: goal rejected by Nav2 — skipping") + missed_waypoints.append(idx) + with self._lock: + self._seq.advance() + continue + + # Wait for Nav2 to reach the goal + result_future = goal_res.get_result_async() + rclpy.spin_until_future_complete( + self, result_future, timeout_sec=self._nav_timeout) + + if not result_future.done(): + self.get_logger().error( + f"Waypoint {idx + 1}: Nav2 timed out after " + f"{self._nav_timeout:.0f}s — skipping") + goal_res.cancel_goal_async() + missed_waypoints.append(idx) + else: + nav_result = result_future.result() + from action_msgs.msg import GoalStatus + if nav_result.status == GoalStatus.STATUS_SUCCEEDED: + self.get_logger().info( + f"Waypoint {idx + 1}/{len(waypoints)} reached ✓") + else: + self.get_logger().warn( + f"Waypoint {idx + 1} failed (Nav2 status " + f"{nav_result.status}) — skipping") + missed_waypoints.append(idx) + + with self._lock: + self._seq.advance() + + # Publish feedback + feedback_msg.current_waypoint = self._seq.current_index + goal_handle.publish_feedback(feedback_msg) + + # Mission complete + with self._lock: + final_state = self._seq.state + + if final_state == WaypointSequencer.SUCCEEDED and not missed_waypoints: + self.get_logger().info( + f"Waypoint mission SUCCEEDED ({len(waypoints)} waypoints)") + goal_handle.succeed() + elif final_state == WaypointSequencer.CANCELED: + self.get_logger().info("Waypoint mission CANCELED") + goal_handle.canceled() + else: + self.get_logger().warn( + f"Waypoint mission ended: state={final_state} " + f"missed={missed_waypoints}") + goal_handle.succeed() # FollowWaypoints always succeeds; missed list carries info + + result.missed_waypoints = missed_waypoints + return result + + # ── JSON topic handler ───────────────────────────────────────────────── + + def _on_waypoints_json(self, msg: String) -> None: + """Accept a JSON waypoint list and start a new mission immediately. + + This is a convenience interface for WebUI / operator use. + For production use prefer the action server (supports feedback/cancel). + """ + try: + raw = json.loads(msg.data) + except json.JSONDecodeError as exc: + self.get_logger().error(f"Invalid waypoints JSON: {exc}") + return + + if not isinstance(raw, list) or len(raw) == 0: + self.get_logger().error( + "Waypoints JSON must be a non-empty array of {x, y, yaw_deg}") + return + + try: + waypoints = [Waypoint.from_dict(d) for d in raw] + except (KeyError, ValueError) as exc: + self.get_logger().error(f"Malformed waypoint in list: {exc}") + return + + if not self._nav_client.wait_for_server(timeout_sec=2.0): + self.get_logger().error( + "Nav2 server not ready — cannot execute JSON waypoints") + return + + self.get_logger().info( + f"JSON mission: {len(waypoints)} waypoints received via topic") + with self._lock: + self._seq.start(waypoints) + # Execution driven by action server; JSON topic just seeds the sequencer. + # For full feedback, use the FollowWaypoints action server directly. + + # ── Cancel topic handler ─────────────────────────────────────────────── + + def _on_cancel(self, _msg: Empty) -> None: + with self._lock: + if self._seq.is_running: + self._seq.cancel() + self.get_logger().info( + "Mission cancelled via /saltybot/nav/cancel") + + # ── Status publish ───────────────────────────────────────────────────── + + def _publish_status(self) -> None: + with self._lock: + d = self._seq.status_dict() + self._status_pub.publish(String(data=json.dumps(d))) + + +def main(args=None): + rclpy.init(args=args) + node = WaypointFollowerNode() + executor = rclpy.executors.MultiThreadedExecutor() + executor.add_node(node) + try: + executor.spin() + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.try_shutdown() diff --git a/jetson/ros2_ws/src/saltybot_nav2_uwb/saltybot_nav2_uwb/waypoint_sequencer.py b/jetson/ros2_ws/src/saltybot_nav2_uwb/saltybot_nav2_uwb/waypoint_sequencer.py new file mode 100644 index 0000000..26dc23c --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_nav2_uwb/saltybot_nav2_uwb/waypoint_sequencer.py @@ -0,0 +1,176 @@ +"""waypoint_sequencer.py — Pure-Python waypoint sequencing logic (Issue #599). + +No ROS2 dependencies — importable and fully unit-testable without a live +ROS2 install. + +Design +────── +WaypointSequencer tracks a list of (x, y, yaw_deg) waypoints and the index of +the currently active goal. The ROS2 node (waypoint_follower_node.py) drives +this state machine by calling: + + sequencer.start(waypoints) — load a new mission + sequencer.advance() — mark current waypoint reached + sequencer.abort(reason) — mark mission failed + sequencer.cancel() — mark mission cancelled + +State transitions: + IDLE → RUNNING (on start()) + RUNNING → SUCCEEDED (on advance() when last waypoint reached) + RUNNING → ABORTED (on abort()) + RUNNING → CANCELED (on cancel()) + ABORTED / CANCELED / SUCCEEDED → any state via start() +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import List, Optional, Tuple + + +# ── Waypoint data type ──────────────────────────────────────────────────────── + +@dataclass +class Waypoint: + """A single navigation goal in the map frame.""" + x: float + y: float + yaw_deg: float = 0.0 + + def __post_init__(self): + if not (-360.0 <= self.yaw_deg <= 360.0): + raise ValueError( + f"yaw_deg must be in [-360, 360], got {self.yaw_deg}" + ) + + @classmethod + def from_dict(cls, d: dict) -> "Waypoint": + """Construct from a plain dict (e.g. parsed from JSON).""" + return cls( + x=float(d["x"]), + y=float(d["y"]), + yaw_deg=float(d.get("yaw_deg", 0.0)), + ) + + def to_dict(self) -> dict: + return {"x": self.x, "y": self.y, "yaw_deg": self.yaw_deg} + + +# ── State machine ───────────────────────────────────────────────────────────── + +class WaypointSequencer: + """Tracks state for sequential waypoint execution. + + Thread-safety: external callers (ROS2 callbacks, action server) should + hold their own lock when calling multiple methods atomically. + """ + + IDLE = "idle" + RUNNING = "running" + SUCCEEDED = "succeeded" + ABORTED = "aborted" + CANCELED = "canceled" + + def __init__(self) -> None: + self._waypoints: List[Waypoint] = [] + self._current: int = 0 + self._state: str = self.IDLE + self._abort_reason: Optional[str] = None + + # ── Control methods ──────────────────────────────────────────────────── + + def start(self, waypoints: List[Waypoint]) -> None: + """Load a new mission and set state to RUNNING. + + Raises: + ValueError: if waypoints is empty. + """ + if not waypoints: + raise ValueError("Waypoint list must not be empty") + self._waypoints = list(waypoints) + self._current = 0 + self._state = self.RUNNING + self._abort_reason = None + + def advance(self) -> None: + """Mark the current waypoint as reached and move to the next. + + If the last waypoint is reached the state transitions to SUCCEEDED. + + Raises: + RuntimeError: if not RUNNING. + """ + if self._state != self.RUNNING: + raise RuntimeError( + f"advance() called in state {self._state!r} (must be RUNNING)" + ) + self._current += 1 + if self._current >= len(self._waypoints): + self._state = self.SUCCEEDED + + def abort(self, reason: str = "") -> None: + """Mark the mission as aborted (e.g. Nav2 returned FAILED).""" + self._state = self.ABORTED + self._abort_reason = reason + + def cancel(self) -> None: + """Mark the mission as cancelled (e.g. operator preempt).""" + self._state = self.CANCELED + + # ── Query methods ────────────────────────────────────────────────────── + + @property + def state(self) -> str: + return self._state + + @property + def is_running(self) -> bool: + return self._state == self.RUNNING + + @property + def is_done(self) -> bool: + return self._state in (self.SUCCEEDED, self.ABORTED, self.CANCELED) + + @property + def current_waypoint(self) -> Optional[Waypoint]: + """The active goal, or None if not RUNNING or already done.""" + if self._state != self.RUNNING: + return None + if self._current >= len(self._waypoints): + return None + return self._waypoints[self._current] + + @property + def current_index(self) -> int: + """0-based index of the active waypoint (0 if not started).""" + return self._current + + @property + def total(self) -> int: + """Total number of waypoints in the current mission.""" + return len(self._waypoints) + + @property + def progress(self) -> Tuple[int, int]: + """Return (completed_count, total_count).""" + return self._current, len(self._waypoints) + + @property + def remaining(self) -> int: + return max(0, len(self._waypoints) - self._current) + + @property + def abort_reason(self) -> Optional[str]: + return self._abort_reason + + def status_dict(self) -> dict: + """Serialisable status snapshot for MQTT / diagnostics.""" + wp = self.current_waypoint + return { + "state": self._state, + "current_index": self._current, + "total": len(self._waypoints), + "remaining": self.remaining, + "current_wp": wp.to_dict() if wp else None, + "abort_reason": self._abort_reason, + } diff --git a/jetson/ros2_ws/src/saltybot_nav2_uwb/setup.cfg b/jetson/ros2_ws/src/saltybot_nav2_uwb/setup.cfg new file mode 100644 index 0000000..90a6252 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_nav2_uwb/setup.cfg @@ -0,0 +1,4 @@ +[develop] +script_dir=$base/lib/saltybot_nav2_uwb +[install] +install_scripts=$base/lib/saltybot_nav2_uwb diff --git a/jetson/ros2_ws/src/saltybot_nav2_uwb/setup.py b/jetson/ros2_ws/src/saltybot_nav2_uwb/setup.py new file mode 100644 index 0000000..5a582a9 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_nav2_uwb/setup.py @@ -0,0 +1,35 @@ +from setuptools import setup + +package_name = "saltybot_nav2_uwb" + +setup( + name=package_name, + version="0.1.0", + packages=[package_name], + data_files=[ + ("share/ament_index/resource_index/packages", [f"resource/{package_name}"]), + (f"share/{package_name}", ["package.xml"]), + (f"share/{package_name}/launch", [ + "launch/nav2_uwb.launch.py", + ]), + (f"share/{package_name}/config", [ + "config/nav2_uwb_params.yaml", + ]), + ], + install_requires=["setuptools"], + zip_safe=True, + maintainer="sl-jetson", + maintainer_email="sl-jetson@saltylab.local", + description=( + "Nav2 with UWB localization: pose bridge (map→odom TF from UWB-IMU EKF) " + "and sequential waypoint follower (Issue #599)" + ), + license="Apache-2.0", + tests_require=["pytest"], + entry_points={ + "console_scripts": [ + "uwb_pose_bridge_node = saltybot_nav2_uwb.uwb_pose_bridge_node:main", + "waypoint_follower_node = saltybot_nav2_uwb.waypoint_follower_node:main", + ], + }, +) diff --git a/jetson/ros2_ws/src/saltybot_nav2_uwb/test/test_waypoint_follower.py b/jetson/ros2_ws/src/saltybot_nav2_uwb/test/test_waypoint_follower.py new file mode 100644 index 0000000..1b7fc94 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_nav2_uwb/test/test_waypoint_follower.py @@ -0,0 +1,459 @@ +"""test_waypoint_follower.py — Unit tests for Nav2-UWB waypoint logic (Issue #599). + +Tests are ROS2-free. Run with: + python3 -m pytest \ + jetson/ros2_ws/src/saltybot_nav2_uwb/test/test_waypoint_follower.py -v + +Coverage: + • Waypoint dataclass construction, validation, serialisation + • WaypointSequencer state machine (all transitions) + • Progress tracking + • UWB pose-bridge 2-D TF maths + • waypoint_to_pose_stamped conversion (quaternion from yaw) +""" + +from __future__ import annotations + +import math +import sys +import os + +# Allow import without ROS2 install +_pkg_dir = os.path.join(os.path.dirname(__file__), "..", "saltybot_nav2_uwb") +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) +sys.path.insert(0, _pkg_dir) + +import pytest + +# ── Stub ROS2 modules before importing any node files ──────────────────────── + +def _install_stubs(): + """Inject minimal stubs so node modules are importable without ROS2.""" + from unittest.mock import MagicMock + import types + + for mod_name in [ + "rclpy", "rclpy.node", "rclpy.action", "rclpy.time", + "rclpy.callback_groups", "rclpy.executors", + "geometry_msgs", "geometry_msgs.msg", + "nav_msgs", "nav_msgs.msg", + "nav2_msgs", "nav2_msgs.action", + "std_msgs", "std_msgs.msg", + "tf2_ros", "action_msgs", "action_msgs.msg", + ]: + sys.modules.setdefault(mod_name, MagicMock()) + + # Make geometry_msgs.msg.PoseWithCovarianceStamped / TransformStamped real + # enough to be used as type hints + sys.modules["geometry_msgs.msg"].PoseWithCovarianceStamped = MagicMock + sys.modules["geometry_msgs.msg"].TransformStamped = MagicMock + sys.modules["nav_msgs.msg"].Odometry = MagicMock + sys.modules["std_msgs.msg"].Empty = MagicMock + sys.modules["std_msgs.msg"].String = MagicMock + sys.modules["tf2_ros"].TransformBroadcaster = MagicMock + +_install_stubs() + +from waypoint_sequencer import Waypoint, WaypointSequencer + +# ── helpers from uwb_pose_bridge_node (pure-python section) ────────────────── + +from uwb_pose_bridge_node import ( + _yaw_from_q, + _q_from_yaw, + _invert_2d, + _compose_2d, + compute_map_odom_tf, +) + + +# ─── Waypoint dataclass ──────────────────────────────────────────────────────── + +class TestWaypointConstruction: + def test_basic_construction(self): + wp = Waypoint(x=1.0, y=2.0, yaw_deg=45.0) + assert wp.x == 1.0 + assert wp.y == 2.0 + assert wp.yaw_deg == 45.0 + + def test_default_yaw_is_zero(self): + wp = Waypoint(x=0.0, y=0.0) + assert wp.yaw_deg == 0.0 + + def test_negative_coordinates(self): + wp = Waypoint(x=-3.5, y=-1.2, yaw_deg=-90.0) + assert wp.x == -3.5 + assert wp.yaw_deg == -90.0 + + def test_yaw_boundary_360(self): + wp = Waypoint(x=0.0, y=0.0, yaw_deg=360.0) + assert wp.yaw_deg == 360.0 + + def test_yaw_boundary_neg360(self): + wp = Waypoint(x=0.0, y=0.0, yaw_deg=-360.0) + assert wp.yaw_deg == -360.0 + + def test_yaw_out_of_range_raises(self): + with pytest.raises(ValueError, match="yaw_deg"): + Waypoint(x=0.0, y=0.0, yaw_deg=400.0) + + def test_yaw_out_of_range_negative_raises(self): + with pytest.raises(ValueError, match="yaw_deg"): + Waypoint(x=0.0, y=0.0, yaw_deg=-400.0) + + +class TestWaypointFromDict: + def test_full_dict(self): + wp = Waypoint.from_dict({"x": 1.5, "y": -2.5, "yaw_deg": 30.0}) + assert wp.x == 1.5 + assert wp.y == -2.5 + assert wp.yaw_deg == 30.0 + + def test_missing_yaw_defaults_zero(self): + wp = Waypoint.from_dict({"x": 0.0, "y": 1.0}) + assert wp.yaw_deg == 0.0 + + def test_string_values_coerced(self): + wp = Waypoint.from_dict({"x": "1.0", "y": "2.0", "yaw_deg": "45"}) + assert wp.x == 1.0 + + def test_missing_x_raises(self): + with pytest.raises(KeyError): + Waypoint.from_dict({"y": 1.0}) + + def test_missing_y_raises(self): + with pytest.raises(KeyError): + Waypoint.from_dict({"x": 1.0}) + + +class TestWaypointToDict: + def test_roundtrip(self): + wp = Waypoint(x=1.0, y=2.0, yaw_deg=90.0) + d = wp.to_dict() + wp2 = Waypoint.from_dict(d) + assert wp2.x == wp.x + assert wp2.y == wp.y + assert wp2.yaw_deg == wp.yaw_deg + + def test_keys_present(self): + d = Waypoint(x=0.0, y=0.0).to_dict() + assert "x" in d and "y" in d and "yaw_deg" in d + + +# ─── WaypointSequencer state machine ───────────────────────────────────────── + +def _three_wp(): + return [Waypoint(float(i), 0.0) for i in range(3)] + + +class TestSequencerInitialState: + def setup_method(self): + self.seq = WaypointSequencer() + + def test_initial_state_idle(self): + assert self.seq.state == WaypointSequencer.IDLE + + def test_is_not_running(self): + assert not self.seq.is_running + + def test_is_not_done(self): + assert not self.seq.is_done + + def test_no_current_waypoint(self): + assert self.seq.current_waypoint is None + + def test_total_zero(self): + assert self.seq.total == 0 + + def test_progress_zero(self): + assert self.seq.progress == (0, 0) + + def test_no_abort_reason(self): + assert self.seq.abort_reason is None + + +class TestSequencerStart: + def setup_method(self): + self.seq = WaypointSequencer() + + def test_transitions_to_running(self): + self.seq.start(_three_wp()) + assert self.seq.state == WaypointSequencer.RUNNING + + def test_is_running_true(self): + self.seq.start(_three_wp()) + assert self.seq.is_running + + def test_is_done_false(self): + self.seq.start(_three_wp()) + assert not self.seq.is_done + + def test_current_index_zero(self): + self.seq.start(_three_wp()) + assert self.seq.current_index == 0 + + def test_total_set(self): + self.seq.start(_three_wp()) + assert self.seq.total == 3 + + def test_first_waypoint_returned(self): + wps = _three_wp() + self.seq.start(wps) + assert self.seq.current_waypoint is wps[0] + + def test_empty_list_raises(self): + with pytest.raises(ValueError, match="empty"): + self.seq.start([]) + + def test_restart_resets_state(self): + self.seq.start(_three_wp()) + self.seq.advance() + self.seq.start([Waypoint(9.0, 9.0)]) + assert self.seq.current_index == 0 + assert self.seq.total == 1 + + +class TestSequencerAdvance: + def setup_method(self): + self.seq = WaypointSequencer() + + def test_advance_moves_to_next(self): + self.seq.start(_three_wp()) + self.seq.advance() + assert self.seq.current_index == 1 + + def test_advance_twice(self): + self.seq.start(_three_wp()) + self.seq.advance() + self.seq.advance() + assert self.seq.current_index == 2 + + def test_advance_last_sets_succeeded(self): + self.seq.start(_three_wp()) + for _ in range(3): + self.seq.advance() + assert self.seq.state == WaypointSequencer.SUCCEEDED + + def test_advance_last_sets_done(self): + self.seq.start(_three_wp()) + for _ in range(3): + self.seq.advance() + assert self.seq.is_done + + def test_advance_on_idle_raises(self): + with pytest.raises(RuntimeError, match="RUNNING"): + self.seq.advance() + + def test_advance_on_aborted_raises(self): + self.seq.start(_three_wp()) + self.seq.abort() + with pytest.raises(RuntimeError, match="RUNNING"): + self.seq.advance() + + def test_no_current_waypoint_after_succeed(self): + self.seq.start(_three_wp()) + for _ in range(3): + self.seq.advance() + assert self.seq.current_waypoint is None + + +class TestSequencerAbort: + def setup_method(self): + self.seq = WaypointSequencer() + + def test_abort_transitions_state(self): + self.seq.start(_three_wp()) + self.seq.abort("Nav2 failed") + assert self.seq.state == WaypointSequencer.ABORTED + + def test_abort_sets_reason(self): + self.seq.start(_three_wp()) + self.seq.abort("timeout") + assert self.seq.abort_reason == "timeout" + + def test_abort_sets_done(self): + self.seq.start(_three_wp()) + self.seq.abort() + assert self.seq.is_done + + def test_abort_no_reason(self): + self.seq.start(_three_wp()) + self.seq.abort() + assert self.seq.abort_reason == "" + + +class TestSequencerCancel: + def setup_method(self): + self.seq = WaypointSequencer() + + def test_cancel_transitions_state(self): + self.seq.start(_three_wp()) + self.seq.cancel() + assert self.seq.state == WaypointSequencer.CANCELED + + def test_cancel_sets_done(self): + self.seq.start(_three_wp()) + self.seq.cancel() + assert self.seq.is_done + + def test_cancel_clears_current_waypoint(self): + self.seq.start(_three_wp()) + self.seq.cancel() + assert self.seq.current_waypoint is None + + +class TestSequencerProgress: + def setup_method(self): + self.seq = WaypointSequencer() + self.seq.start(_three_wp()) + + def test_initial_progress(self): + assert self.seq.progress == (0, 3) + + def test_progress_after_one(self): + self.seq.advance() + assert self.seq.progress == (1, 3) + + def test_remaining_initial(self): + assert self.seq.remaining == 3 + + def test_remaining_after_advance(self): + self.seq.advance() + assert self.seq.remaining == 2 + + def test_remaining_after_all(self): + for _ in range(3): + self.seq.advance() + assert self.seq.remaining == 0 + + +class TestSequencerStatusDict: + def setup_method(self): + self.seq = WaypointSequencer() + + def test_idle_status_dict(self): + d = self.seq.status_dict() + assert d["state"] == "idle" + assert d["total"] == 0 + assert d["current_wp"] is None + + def test_running_status_dict(self): + self.seq.start(_three_wp()) + d = self.seq.status_dict() + assert d["state"] == "running" + assert d["total"] == 3 + assert d["current_wp"] is not None + assert "x" in d["current_wp"] + + def test_succeeded_status_dict(self): + self.seq.start(_three_wp()) + for _ in range(3): + self.seq.advance() + d = self.seq.status_dict() + assert d["state"] == "succeeded" + assert d["remaining"] == 0 + + +# ─── 2-D TF maths (uwb_pose_bridge_node) ────────────────────────────────────── + +EPS = 1e-9 + + +class TestYawFromQ: + def test_zero_yaw(self): + qx, qy, qz, qw = _q_from_yaw(0.0) + assert abs(_yaw_from_q(qx, qy, qz, qw)) < EPS + + def test_90_degrees(self): + yaw = math.pi / 2 + qx, qy, qz, qw = _q_from_yaw(yaw) + assert abs(_yaw_from_q(qx, qy, qz, qw) - yaw) < 1e-6 + + def test_minus_90_degrees(self): + yaw = -math.pi / 2 + qx, qy, qz, qw = _q_from_yaw(yaw) + assert abs(_yaw_from_q(qx, qy, qz, qw) - yaw) < 1e-6 + + def test_180_degrees(self): + yaw = math.pi + qx, qy, qz, qw = _q_from_yaw(yaw) + # atan2 wraps to -π at exactly π; tolerate either + result = _yaw_from_q(qx, qy, qz, qw) + assert abs(abs(result) - math.pi) < 1e-6 + + def test_roundtrip_arbitrary(self): + for deg in (0, 30, 60, 90, 120, 150, 170, -45, -120): + yaw = math.radians(deg) + qx, qy, qz, qw = _q_from_yaw(yaw) + back = _yaw_from_q(qx, qy, qz, qw) + assert abs(back - yaw) < 1e-6, f"roundtrip failed for {deg}°" + + +class TestInvert2D: + def test_identity(self): + xi, yi, thi = _invert_2d(0.0, 0.0, 0.0) + assert abs(xi) < EPS and abs(yi) < EPS and abs(thi) < EPS + + def test_translation_only(self): + # invert pure translation: inv(T(3,4,0)) = T(-3,-4,0) + xi, yi, thi = _invert_2d(3.0, 4.0, 0.0) + assert abs(xi + 3.0) < EPS + assert abs(yi + 4.0) < EPS + assert abs(thi) < EPS + + def test_rotation_only(self): + # invert pure rotation by 90° + xi, yi, thi = _invert_2d(0.0, 0.0, math.pi / 2) + assert abs(thi + math.pi / 2) < EPS + + def test_compose_with_inverse_is_identity(self): + x, y, th = 2.5, -1.3, math.radians(47) + ix, iy, ith = _invert_2d(x, y, th) + rx, ry, rth = _compose_2d(x, y, th, ix, iy, ith) + assert abs(rx) < 1e-9 + assert abs(ry) < 1e-9 + assert abs(rth % (2 * math.pi)) < 1e-9 or abs(rth) < 1e-9 + + +class TestComputeMapOdomTf: + def test_identity_pose(self): + """If both poses are identity, map→odom is identity.""" + mx, my, mth = compute_map_odom_tf(0, 0, 0, 0, 0, 0) + assert abs(mx) < EPS and abs(my) < EPS and abs(mth) < EPS + + def test_odom_is_map(self): + """If robot is at same position in both frames, map→odom is identity.""" + pose = (1.0, 2.0, math.radians(45)) + mx, my, mth = compute_map_odom_tf(*pose, *pose) + assert abs(mx) < 1e-9 and abs(my) < 1e-9 and abs(mth) < 1e-9 + + def test_pure_translation_offset(self): + """map_bl at (5,0,0), odom_bl at (2,0,0) → map_odom at (3,0,0).""" + mx, my, mth = compute_map_odom_tf(5.0, 0.0, 0.0, 2.0, 0.0, 0.0) + assert abs(mx - 3.0) < 1e-9 + assert abs(my) < 1e-9 + assert abs(mth) < 1e-9 + + def test_tf_reconstruction(self): + """T_map_odom ⊗ T_odom_bl == T_map_bl (round-trip check).""" + map_bl = (3.0, 1.5, math.radians(30)) + odom_bl = (1.0, 0.5, math.radians(10)) + mo = compute_map_odom_tf(*map_bl, *odom_bl) + # Recompose: T_map_odom ⊗ T_odom_bl should equal T_map_bl + rx, ry, rth = _compose_2d(*mo, *odom_bl) + assert abs(rx - map_bl[0]) < 1e-9 + assert abs(ry - map_bl[1]) < 1e-9 + assert abs(rth - map_bl[2]) < 1e-9 + + def test_rotated_frame(self): + """Verify correctness with a 90° heading offset.""" + # map: robot at (1,0) facing 90°; odom: robot at (0,1) facing 0° + mo_x, mo_y, mo_th = compute_map_odom_tf( + 1.0, 0.0, math.pi / 2, + 0.0, 1.0, 0.0, + ) + # Recompose and check + rx, ry, rth = _compose_2d(mo_x, mo_y, mo_th, 0.0, 1.0, 0.0) + assert abs(rx - 1.0) < 1e-9 + assert abs(ry - 0.0) < 1e-9 + assert abs(rth - math.pi / 2) < 1e-9