feat: Nav2 with UWB localization (Issue #599) #607

Merged
sl-jetson merged 1 commits from sl-jetson/issue-599-nav2-uwb into main 2026-03-14 15:54:53 -04:00
11 changed files with 1766 additions and 0 deletions

View File

@ -0,0 +1,315 @@
# nav2_uwb_params.yaml — Nav2 configuration for UWB-based localization (Issue #599).
#
# Key differences from the AMCL/SLAM config (saltybot_nav2_slam):
# • AMCL is NOT used. The uwb_pose_bridge_node broadcasts map→odom TF directly
# from the UWB-IMU EKF fused pose (/saltybot/pose/fused_cov).
# • DWB controller capped at 1.0 m/s for safe differential-drive operation.
# • Global costmap uses a rolling window — no pre-built static map required.
# UWB provides the global (map-frame) position.
# • Costmap obstacle layer: /scan (RPLIDAR A1/A2) only.
# • lifecycle_manager_localization is intentionally empty; the bridge node
# keeps the TF alive without Nav2 lifecycle management.
bt_navigator:
ros__parameters:
use_sim_time: false
global_frame: map
robot_base_frame: base_link
odom_topic: /odom
bt_loop_duration: 10
default_server_timeout: 20
enable_groot_monitoring: false
default_nav_to_pose_bt_xml: >-
$(find-pkg-share nav2_bt_navigator)/behavior_trees/navigate_to_pose_w_replanning_and_recovery.xml
default_nav_through_poses_bt_xml: >-
$(find-pkg-share nav2_bt_navigator)/behavior_trees/navigate_through_poses_w_replanning_and_recovery.xml
plugin_lib_names:
- nav2_compute_path_to_pose_action_bt_node
- nav2_compute_path_through_poses_action_bt_node
- nav2_follow_path_action_bt_node
- nav2_spin_action_bt_node
- nav2_wait_action_bt_node
- nav2_back_up_action_bt_node
- nav2_drive_on_heading_bt_node
- nav2_clear_costmap_service_bt_node
- nav2_is_stuck_condition_bt_node
- nav2_goal_updated_condition_bt_node
- nav2_initial_pose_received_condition_bt_node
- nav2_rate_controller_bt_node
- nav2_distance_controller_bt_node
- nav2_speed_controller_bt_node
- nav2_truncate_path_action_bt_node
- nav2_remove_passed_goals_action_bt_node
- nav2_planner_selector_bt_node
- nav2_controller_selector_bt_node
- nav2_goal_checker_selector_bt_node
bt_navigator_rclcpp_node:
ros__parameters:
use_sim_time: false
# ── Controller (DWB — differential drive, max 1.0 m/s) ─────────────────────────
controller_server:
ros__parameters:
use_sim_time: false
controller_frequency: 20.0
min_x_velocity_threshold: 0.001
min_y_velocity_threshold: 0.5
min_theta_velocity_threshold: 0.001
failure_tolerance: 0.3
progress_checker_plugin: "progress_checker"
goal_checker_plugins: ["general_goal_checker"]
controller_plugins: ["FollowPath"]
progress_checker:
plugin: "nav2_core::SimpleProgressChecker"
required_movement_radius: 0.5
movement_time_allowance: 15.0
general_goal_checker:
stateful: true
plugin: "nav2_core::SimpleGoalChecker"
xy_goal_tolerance: 0.20
yaw_goal_tolerance: 0.15
FollowPath:
plugin: "dwb_core::DWBLocalPlanner"
debug_trajectory_details: false
# ── Velocity limits (1.0 m/s max per Issue #599) ──────────────────
min_vel_x: 0.0
max_vel_x: 1.0 # ← hard cap for safe indoor operation
min_vel_y: 0.0
max_vel_y: 0.0 # differential drive — no lateral motion
min_vel_theta: -1.5
max_vel_theta: 1.5
min_speed_xy: 0.0
max_speed_xy: 1.0
min_speed_theta: 0.0
# ── Acceleration limits ────────────────────────────────────────────
acc_lim_x: 1.0
acc_lim_y: 0.0
acc_lim_theta: 2.0
decel_lim_x: -1.0
decel_lim_y: 0.0
decel_lim_theta: -2.0
# ── Trajectory sampling ────────────────────────────────────────────
vx_samples: 20
vy_samples: 1 # differential drive — only 1 lateral sample
vtheta_samples: 40
sim_time: 1.5
linear_granularity: 0.05
angular_granularity: 0.05
transform_tolerance: 0.3
trans_stopped_velocity: 0.25
short_circuit_trajectory_evaluation: true
stateful: true
# ── Critic weights ─────────────────────────────────────────────────
critics:
- "RotateToGoal"
- "Oscillation"
- "BaseObstacle"
- "GoalAlign"
- "PathAlign"
- "PathDist"
- "GoalDist"
BaseObstacle.scale: 0.02
PathAlign.scale: 32.0
PathAlign.forward_point_distance: 0.1
GoalAlign.scale: 24.0
GoalAlign.forward_point_distance: 0.1
PathDist.scale: 32.0
GoalDist.scale: 24.0
RotateToGoal.scale: 32.0
RotateToGoal.slowing_factor: 5.0
RotateToGoal.lookahead_time: -1.0
controller_server_rclcpp_node:
ros__parameters:
use_sim_time: false
# ── Global planner (NavFn A*) ────────────────────────────────────────────────
planner_server:
ros__parameters:
use_sim_time: false
expected_planner_frequency: 10.0
planner_plugins: ["GridBased"]
GridBased:
plugin: "nav2_navfn_planner/NavfnPlanner"
tolerance: 0.3
use_astar: true
allow_unknown: true
planner_server_rclcpp_node:
ros__parameters:
use_sim_time: false
# ── Recovery behaviors ───────────────────────────────────────────────────────
behavior_server:
ros__parameters:
use_sim_time: false
costmap_topic: local_costmap/costmap_raw
footprint_topic: local_costmap/published_footprint
cycle_frequency: 10.0
behavior_plugins: ["spin", "backup", "drive_on_heading", "wait"]
spin:
plugin: "nav2_behaviors/Spin"
backup:
plugin: "nav2_behaviors/BackUp"
drive_on_heading:
plugin: "nav2_behaviors/DriveOnHeading"
wait:
plugin: "nav2_behaviors/Wait"
global_frame: odom
robot_base_frame: base_link
transform_timeout: 0.1
simulate_ahead_time: 2.0
max_rotations: 1.0
max_backup_dist: 0.3
backup_speed: 0.15
behavior_server_rclcpp_node:
ros__parameters:
use_sim_time: false
# ── Costmaps ─────────────────────────────────────────────────────────────────
#
# Both costmaps use only /scan (RPLIDAR). No depth camera source here —
# saltybot_depth_costmap adds that layer separately when enabled.
local_costmap:
local_costmap:
ros__parameters:
use_sim_time: false
update_frequency: 10.0
publish_frequency: 5.0
global_frame: odom
robot_base_frame: base_link
rolling_window: true
width: 4
height: 4
resolution: 0.05
robot_radius: 0.35
plugins: ["obstacle_layer", "inflation_layer"]
obstacle_layer:
plugin: "nav2_costmap_2d::ObstacleLayer"
enabled: true
observation_sources: scan
scan:
topic: /scan
max_obstacle_height: 2.0
min_obstacle_height: 0.0
clearing: true
marking: true
data_type: "LaserScan"
raytrace_max_range: 8.0
raytrace_min_range: 0.0
obstacle_max_range: 6.0
obstacle_min_range: 0.0
inflation_layer:
plugin: "nav2_costmap_2d::InflationLayer"
enabled: true
inflation_radius: 0.55
cost_scaling_factor: 10.0
inflate_unknown: false
inflate_around_unknown: true
track_unknown_space: false
always_send_full_costmap: true
local_costmap_client:
ros__parameters:
use_sim_time: false
local_costmap_rclcpp_node:
ros__parameters:
use_sim_time: false
global_costmap:
global_costmap:
ros__parameters:
use_sim_time: false
update_frequency: 2.0
publish_frequency: 1.0
global_frame: map
robot_base_frame: base_link
# Rolling window — UWB provides global position, no static map needed.
rolling_window: true
width: 20
height: 20
resolution: 0.05
robot_radius: 0.35
plugins: ["obstacle_layer", "inflation_layer"]
obstacle_layer:
plugin: "nav2_costmap_2d::ObstacleLayer"
enabled: true
observation_sources: scan
scan:
topic: /scan
max_obstacle_height: 2.0
min_obstacle_height: 0.0
clearing: true
marking: true
data_type: "LaserScan"
raytrace_max_range: 8.0
raytrace_min_range: 0.0
obstacle_max_range: 6.0
obstacle_min_range: 0.0
inflation_layer:
plugin: "nav2_costmap_2d::InflationLayer"
enabled: true
inflation_radius: 0.55
cost_scaling_factor: 10.0
inflate_unknown: false
inflate_around_unknown: true
track_unknown_space: false
always_send_full_costmap: false
global_costmap_client:
ros__parameters:
use_sim_time: false
global_costmap_rclcpp_node:
ros__parameters:
use_sim_time: false
# ── Velocity smoother ────────────────────────────────────────────────────────
velocity_smoother:
ros__parameters:
use_sim_time: false
smoothing_frequency: 20.0
scale_velocities: false
feedback: "OPEN_LOOP"
max_velocity: [1.0, 0.0, 1.5] # [x, y, theta] — capped at 1.0 m/s
min_velocity: [-0.3, 0.0, -1.5]
max_accel: [1.0, 0.0, 2.0]
max_decel: [-1.0, 0.0, -2.0]
odom_topic: /odom
odom_duration: 0.1
deadband_velocity: [0.0, 0.0, 0.0]
velocity_timeout: 1.0
# ── Lifecycle managers ───────────────────────────────────────────────────────
#
# Localization lifecycle is intentionally empty — the uwb_pose_bridge_node
# manages the map→odom TF independently (no AMCL, no SLAM Toolbox).
lifecycle_manager_navigation:
ros__parameters:
use_sim_time: false
autostart: true
node_names:
- "controller_server"
- "planner_server"
- "behavior_server"
- "bt_navigator"
- "velocity_smoother"
lifecycle_manager_localization:
ros__parameters:
use_sim_time: false
autostart: true
node_names: [] # UWB pose bridge handles TF directly — no AMCL
map_server:
ros__parameters:
use_sim_time: false
yaml_filename: "" # No static map — global costmap is rolling window

View File

@ -0,0 +1,168 @@
"""nav2_uwb.launch.py — Full Nav2 stack with UWB localization (Issue #599).
Launch sequence
1. uwb_pose_bridge_node broadcasts mapodom TF from UWB-IMU EKF pose
2. nav2_bringup navigation controller, planner, BT navigator, costmaps
3. lifecycle_manager_navigation autostart all Nav2 nodes
4. waypoint_follower_node sequential waypoint execution action server
Localization is handled by the UWB bridge; AMCL is NOT launched.
Arguments
use_sim_time (bool) default: false
params_file (str) default: <package>/config/nav2_uwb_params.yaml
uwb_pose_topic (str) default: /saltybot/pose/fused_cov
odom_topic (str) default: /odom
map_frame (str) default: map
odom_frame (str) default: odom
base_frame (str) default: base_link
max_vel_x (float) default: 1.0 DWB hard cap (m/s)
"""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import (
DeclareLaunchArgument,
GroupAction,
IncludeLaunchDescription,
LogInfo,
TimerAction,
)
from launch.conditions import IfCondition
from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch.substitutions import (
LaunchConfiguration,
PathJoinSubstitution,
PythonExpression,
)
from launch_ros.actions import Node
from launch_ros.substitutions import FindPackageShare
def generate_launch_description():
pkg_nav2_uwb = get_package_share_directory("saltybot_nav2_uwb")
pkg_nav2_bringup = get_package_share_directory("nav2_bringup")
default_params = os.path.join(pkg_nav2_uwb, "config", "nav2_uwb_params.yaml")
# ── Declare arguments ──────────────────────────────────────────────────
args = [
DeclareLaunchArgument(
"use_sim_time", default_value="false",
description="Use simulation clock"),
DeclareLaunchArgument(
"params_file", default_value=default_params,
description="Full path to nav2_uwb_params.yaml"),
DeclareLaunchArgument(
"uwb_pose_topic", default_value="/saltybot/pose/fused_cov",
description="UWB-IMU EKF fused pose topic (PoseWithCovarianceStamped)"),
DeclareLaunchArgument(
"odom_topic", default_value="/odom",
description="Wheel odometry topic (Odometry)"),
DeclareLaunchArgument(
"map_frame", default_value="map",
description="Global map frame id"),
DeclareLaunchArgument(
"odom_frame", default_value="odom",
description="Odometry frame id"),
DeclareLaunchArgument(
"base_frame", default_value="base_link",
description="Robot base frame id"),
DeclareLaunchArgument(
"autostart", default_value="true",
description="Automatically start Nav2 lifecycle nodes"),
]
# ── UWB pose bridge — starts immediately ──────────────────────────────
uwb_bridge = Node(
package="saltybot_nav2_uwb",
executable="uwb_pose_bridge_node",
name="uwb_pose_bridge",
output="screen",
parameters=[{
"use_sim_time": LaunchConfiguration("use_sim_time"),
"uwb_pose_topic": LaunchConfiguration("uwb_pose_topic"),
"odom_topic": LaunchConfiguration("odom_topic"),
"map_frame": LaunchConfiguration("map_frame"),
"odom_frame": LaunchConfiguration("odom_frame"),
"base_frame": LaunchConfiguration("base_frame"),
"tf_publish_hz": 20.0,
"tf_timeout_s": 0.5,
}],
)
uwb_bridge_log = LogInfo(
msg="[nav2_uwb] UWB pose bridge launched — map→odom TF will be "
"broadcast once UWB + odom data arrive."
)
# ── Nav2 navigation stack (t=2s: allow bridge to initialise first) ─────
nav2_navigation = TimerAction(
period=2.0,
actions=[
LogInfo(msg="[nav2_uwb] Starting Nav2 navigation stack..."),
IncludeLaunchDescription(
PythonLaunchDescriptionSource(
os.path.join(pkg_nav2_bringup, "launch", "navigation_launch.py")
),
launch_arguments={
"use_sim_time": LaunchConfiguration("use_sim_time"),
"params_file": LaunchConfiguration("params_file"),
"autostart": LaunchConfiguration("autostart"),
# No map_subscribe_transient_local — no static map
"use_lifecycle_mgr": "true",
"use_sim_time": LaunchConfiguration("use_sim_time"),
}.items(),
),
],
)
# ── Waypoint follower (t=4s: Nav2 needs time to come up) ──────────────
waypoint_follower = TimerAction(
period=4.0,
actions=[
LogInfo(msg="[nav2_uwb] Starting waypoint follower action server..."),
Node(
package="saltybot_nav2_uwb",
executable="waypoint_follower_node",
name="waypoint_follower",
output="screen",
parameters=[{
"use_sim_time": LaunchConfiguration("use_sim_time"),
"map_frame": LaunchConfiguration("map_frame"),
"goal_xy_tolerance": 0.20,
"goal_yaw_tolerance": 0.15,
"nav_timeout_s": 60.0,
"status_hz": 2.0,
}],
),
],
)
ready_log = TimerAction(
period=4.5,
actions=[
LogInfo(
msg="[nav2_uwb] Stack ready.\n"
" Localization : UWB-IMU EKF bridge (no AMCL)\n"
" Controller : DWB ≤ 1.0 m/s (differential drive)\n"
" Waypoints : /saltybot/nav/follow_waypoints (action)\n"
" Status : /saltybot/nav/waypoint_status (JSON)\n"
" Cancel : /saltybot/nav/cancel (topic)"
),
],
)
return LaunchDescription(
args + [
uwb_bridge_log,
uwb_bridge,
nav2_navigation,
waypoint_follower,
ready_log,
]
)

View File

@ -0,0 +1,42 @@
<?xml version="1.0"?>
<package format="3">
<name>saltybot_nav2_uwb</name>
<version>0.1.0</version>
<description>
Nav2 integration with UWB-based localization for SaltyBot (Issue #599).
Replaces AMCL with a UWB pose bridge that broadcasts the map-&gt;odom TF
directly from the UWB-IMU EKF fused pose. Includes a DWB controller
capped at 1.0 m/s for safe differential-drive operation and a simple
sequential waypoint-follower action server.
</description>
<maintainer email="sl-jetson@saltylab.local">sl-jetson</maintainer>
<license>Apache-2.0</license>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>geometry_msgs</depend>
<depend>nav_msgs</depend>
<depend>sensor_msgs</depend>
<depend>tf2</depend>
<depend>tf2_ros</depend>
<depend>nav2_msgs</depend>
<exec_depend>nav2_bringup</exec_depend>
<exec_depend>nav2_bt_navigator</exec_depend>
<exec_depend>nav2_controller</exec_depend>
<exec_depend>nav2_planner</exec_depend>
<exec_depend>nav2_behaviors</exec_depend>
<exec_depend>nav2_lifecycle_manager</exec_depend>
<exec_depend>nav2_costmap_2d</exec_depend>
<exec_depend>nav2_navfn_planner</exec_depend>
<exec_depend>dwb_core</exec_depend>
<exec_depend>dwb_critics</exec_depend>
<buildtool_depend>ament_python</buildtool_depend>
<test_depend>pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,218 @@
"""uwb_pose_bridge_node.py — UWB→Nav2 localization bridge (Issue #599).
Replaces AMCL by listening to the UWB-IMU EKF fused pose and broadcasting
the mapodom TF that Nav2 requires for global localization.
TF computation (2D, ground robot):
map base_link := UWB-IMU EKF (/saltybot/pose/fused_cov)
odom base_link := wheel odometry (/odom)
T_map_odom = T_map_base_link T_odom_base_link¹
Published TF: map odom (at UWB rate, 10 Hz)
Published topic: /initialpose (once on first valid pose, for Nav2 boot)
Subscribed topics:
/saltybot/pose/fused_cov (PoseWithCovarianceStamped) UWB-IMU EKF
/odom (Odometry) wheel odometry
"""
from __future__ import annotations
import math
from typing import Optional
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import (
PoseWithCovarianceStamped,
TransformStamped,
)
from nav_msgs.msg import Odometry
from tf2_ros import TransformBroadcaster
# ── Pure-Python 2-D transform helpers (no numpy / tf_transformations needed) ──
def _yaw_from_q(qx: float, qy: float, qz: float, qw: float) -> float:
"""Extract yaw (rad) from unit quaternion."""
return math.atan2(2.0 * (qw * qz + qx * qy),
1.0 - 2.0 * (qy * qy + qz * qz))
def _q_from_yaw(yaw: float):
"""Return (qx, qy, qz, qw) for a pure-yaw rotation."""
return 0.0, 0.0, math.sin(yaw * 0.5), math.cos(yaw * 0.5)
def _invert_2d(x: float, y: float, th: float):
"""Invert a 2-D SE(2) transform."""
c, s = math.cos(th), math.sin(th)
xi = -(c * x + s * y)
yi = -(-s * x + c * y)
return xi, yi, -th
def _compose_2d(x1: float, y1: float, th1: float,
x2: float, y2: float, th2: float):
"""Compose two 2-D SE(2) transforms: T1 ⊗ T2."""
c1, s1 = math.cos(th1), math.sin(th1)
xr = x1 + c1 * x2 - s1 * y2
yr = y1 + s1 * x2 + c1 * y2
thr = th1 + th2
return xr, yr, thr
def compute_map_odom_tf(
map_bl_x: float, map_bl_y: float, map_bl_th: float,
odom_bl_x: float, odom_bl_y: float, odom_bl_th: float,
):
"""Return (x, y, yaw) for the map→odom transform.
T_map_odom = T_map_base_link inv(T_odom_base_link)
"""
ix, iy, ith = _invert_2d(odom_bl_x, odom_bl_y, odom_bl_th)
return _compose_2d(map_bl_x, map_bl_y, map_bl_th, ix, iy, ith)
# ── ROS2 node ─────────────────────────────────────────────────────────────────
class UwbPoseBridgeNode(Node):
"""Broadcasts map→odom TF derived from UWB-IMU fused pose + wheel odom.
Parameters (ROS):
uwb_pose_topic (str) default: /saltybot/pose/fused_cov
odom_topic (str) default: /odom
tf_publish_hz (float) default: 20.0 TF broadcast rate
tf_timeout_s (float) default: 0.5 stop publishing if UWB
older than this
map_frame (str) default: map
odom_frame (str) default: odom
base_frame (str) default: base_link
"""
def __init__(self):
super().__init__("uwb_pose_bridge")
self.declare_parameter("uwb_pose_topic", "/saltybot/pose/fused_cov")
self.declare_parameter("odom_topic", "/odom")
self.declare_parameter("tf_publish_hz", 20.0)
self.declare_parameter("tf_timeout_s", 0.5)
self.declare_parameter("map_frame", "map")
self.declare_parameter("odom_frame", "odom")
self.declare_parameter("base_frame", "base_link")
self._map_frame = self.get_parameter("map_frame").value
self._odom_frame = self.get_parameter("odom_frame").value
self._base_frame = self.get_parameter("base_frame").value
self._tf_timeout = self.get_parameter("tf_timeout_s").value
self._uwb_pose: Optional[PoseWithCovarianceStamped] = None
self._odom_pose: Optional[Odometry] = None
self._init_pose_sent: bool = False
self._tf_broadcaster = TransformBroadcaster(self)
# /initialpose: Nav2 uses this to seed the robot's initial position
self._init_pub = self.create_publisher(
PoseWithCovarianceStamped, "/initialpose", 1)
self.create_subscription(
PoseWithCovarianceStamped,
self.get_parameter("uwb_pose_topic").value,
self._on_uwb_pose, 10)
self.create_subscription(
Odometry,
self.get_parameter("odom_topic").value,
self._on_odom, 20)
hz = self.get_parameter("tf_publish_hz").value
self.create_timer(1.0 / hz, self._publish_tf)
self.get_logger().info(
f"UWB pose bridge ready | "
f"uwb={self.get_parameter('uwb_pose_topic').value} "
f"odom={self.get_parameter('odom_topic').value} "
f"tf={self._map_frame}{self._odom_frame} @ {hz:.0f} Hz"
)
# ── Callbacks ──────────────────────────────────────────────────────────
def _on_uwb_pose(self, msg: PoseWithCovarianceStamped) -> None:
self._uwb_pose = msg
if not self._init_pose_sent:
self._init_pub.publish(msg)
self._init_pose_sent = True
self.get_logger().info(
f"Initial UWB pose published to /initialpose "
f"({msg.pose.pose.position.x:.2f}, "
f"{msg.pose.pose.position.y:.2f})"
)
def _on_odom(self, msg: Odometry) -> None:
self._odom_pose = msg
# ── TF broadcast ───────────────────────────────────────────────────────
def _publish_tf(self) -> None:
if self._uwb_pose is None or self._odom_pose is None:
return
# Staleness guard: stop broadcasting if UWB data is too old
now = self.get_clock().now()
uwb_stamp = rclpy.time.Time.from_msg(self._uwb_pose.header.stamp)
age_s = (now - uwb_stamp).nanoseconds * 1e-9
if age_s > self._tf_timeout:
self.get_logger().warn(
f"UWB pose stale ({age_s:.2f}s > {self._tf_timeout}s) — "
"map→odom TF suppressed",
throttle_duration_sec=5.0,
)
return
# Extract map→base_link from UWB pose
p = self._uwb_pose.pose.pose
map_bl_th = _yaw_from_q(p.orientation.x, p.orientation.y,
p.orientation.z, p.orientation.w)
map_bl_x, map_bl_y = p.position.x, p.position.y
# Extract odom→base_link from wheel odometry
o = self._odom_pose.pose.pose
odom_bl_th = _yaw_from_q(o.orientation.x, o.orientation.y,
o.orientation.z, o.orientation.w)
odom_bl_x, odom_bl_y = o.position.x, o.position.y
# Compute map→odom
mo_x, mo_y, mo_th = compute_map_odom_tf(
map_bl_x, map_bl_y, map_bl_th,
odom_bl_x, odom_bl_y, odom_bl_th,
)
# Broadcast
tf_msg = TransformStamped()
tf_msg.header.stamp = now.to_msg()
tf_msg.header.frame_id = self._map_frame
tf_msg.child_frame_id = self._odom_frame
tf_msg.transform.translation.x = mo_x
tf_msg.transform.translation.y = mo_y
tf_msg.transform.translation.z = 0.0
qx, qy, qz, qw = _q_from_yaw(mo_th)
tf_msg.transform.rotation.x = qx
tf_msg.transform.rotation.y = qy
tf_msg.transform.rotation.z = qz
tf_msg.transform.rotation.w = qw
self._tf_broadcaster.sendTransform(tf_msg)
def main(args=None):
rclpy.init(args=args)
node = UwbPoseBridgeNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.try_shutdown()

View File

@ -0,0 +1,349 @@
"""waypoint_follower_node.py — Simple sequential waypoint follower (Issue #599).
Exposes a ROS2 action server that accepts a list of waypoints and drives the
robot through them one by one via Nav2's NavigateToPose action.
Action server: /saltybot/nav/follow_waypoints (nav2_msgs/FollowWaypoints)
Nav2 client: /navigate_to_pose (nav2_msgs/NavigateToPose)
Waypoints topic (JSON, for operator / WebUI override):
/saltybot/nav/waypoints (std_msgs/String)
Payload: JSON array of {x, y, yaw_deg} objects.
Example: [{"x": 1.0, "y": 2.0, "yaw_deg": 90.0}, ...]
Status topic (JSON):
/saltybot/nav/waypoint_status (std_msgs/String)
Published every cycle with WaypointSequencer.status_dict().
Cancel topic:
/saltybot/nav/cancel (std_msgs/Empty)
Cancels the active mission.
"""
from __future__ import annotations
import json
import math
import threading
from typing import List, Optional
import rclpy
from rclpy.action import ActionClient, ActionServer, CancelResponse, GoalResponse
from rclpy.node import Node
from rclpy.callback_groups import ReentrantCallbackGroup
from geometry_msgs.msg import PoseStamped
from nav2_msgs.action import FollowWaypoints, NavigateToPose
from std_msgs.msg import Empty, String
from saltybot_nav2_uwb.waypoint_sequencer import Waypoint, WaypointSequencer
def _yaw_to_quaternion(yaw_rad: float):
"""Return (qx, qy, qz, qw) for a heading-only rotation."""
return 0.0, 0.0, math.sin(yaw_rad * 0.5), math.cos(yaw_rad * 0.5)
def waypoint_to_pose_stamped(wp: Waypoint, frame: str = "map") -> PoseStamped:
"""Convert a Waypoint to a PoseStamped in the given frame."""
msg = PoseStamped()
msg.header.frame_id = frame
msg.pose.position.x = wp.x
msg.pose.position.y = wp.y
msg.pose.position.z = 0.0
qx, qy, qz, qw = _yaw_to_quaternion(math.radians(wp.yaw_deg))
msg.pose.orientation.x = qx
msg.pose.orientation.y = qy
msg.pose.orientation.z = qz
msg.pose.orientation.w = qw
return msg
class WaypointFollowerNode(Node):
"""Sequential waypoint follower backed by Nav2 NavigateToPose.
Parameters (ROS):
map_frame (str) default: map
goal_xy_tolerance (float) default: 0.20 (m) passed to Nav2
goal_yaw_tolerance (float) default: 0.15 (rad) passed to Nav2
nav_timeout_s (float) default: 60.0 abort if Nav2 takes longer
status_hz (float) default: 2.0 status publish rate
"""
def __init__(self):
super().__init__("waypoint_follower")
self.declare_parameter("map_frame", "map")
self.declare_parameter("goal_xy_tolerance", 0.20)
self.declare_parameter("goal_yaw_tolerance", 0.15)
self.declare_parameter("nav_timeout_s", 60.0)
self.declare_parameter("status_hz", 2.0)
self._map_frame = self.get_parameter("map_frame").value
self._nav_timeout = self.get_parameter("nav_timeout_s").value
self._seq = WaypointSequencer()
self._lock = threading.Lock()
self._cb_group = ReentrantCallbackGroup()
# ── Publishers ─────────────────────────────────────────────────
self._status_pub = self.create_publisher(
String, "/saltybot/nav/waypoint_status", 10)
# ── Subscriptions ──────────────────────────────────────────────
self.create_subscription(
String, "/saltybot/nav/waypoints",
self._on_waypoints_json, 10,
callback_group=self._cb_group)
self.create_subscription(
Empty, "/saltybot/nav/cancel",
self._on_cancel, 10,
callback_group=self._cb_group)
# ── Nav2 action client ─────────────────────────────────────────
self._nav_client: ActionClient = ActionClient(
self, NavigateToPose, "/navigate_to_pose",
callback_group=self._cb_group)
# ── Action server (nav2_msgs/FollowWaypoints) ──────────────────
self._action_server = ActionServer(
self,
FollowWaypoints,
"/saltybot/nav/follow_waypoints",
execute_callback=self._execute_follow_waypoints,
goal_callback=self._goal_callback,
cancel_callback=self._cancel_callback,
callback_group=self._cb_group,
)
hz = self.get_parameter("status_hz").value
self.create_timer(1.0 / hz, self._publish_status,
callback_group=self._cb_group)
self.get_logger().info(
f"WaypointFollower ready | "
f"action=/saltybot/nav/follow_waypoints "
f"nav2=/navigate_to_pose frame={self._map_frame}"
)
# ── Action server callbacks ────────────────────────────────────────────
def _goal_callback(self, goal_request):
if not self._nav_client.wait_for_server(timeout_sec=2.0):
self.get_logger().error(
"Nav2 NavigateToPose server not available — rejecting goal")
return GoalResponse.REJECT
if len(goal_request.poses) == 0:
self.get_logger().warn("Empty waypoint list — rejecting goal")
return GoalResponse.REJECT
return GoalResponse.ACCEPT
def _cancel_callback(self, _goal_handle):
return CancelResponse.ACCEPT
async def _execute_follow_waypoints(self, goal_handle) -> FollowWaypoints.Result:
"""Execute the action: drive through each PoseStamped in sequence."""
result = FollowWaypoints.Result()
feedback_msg = FollowWaypoints.Feedback()
poses: List[PoseStamped] = list(goal_handle.request.poses)
waypoints = [
Waypoint(
x=p.pose.position.x,
y=p.pose.position.y,
yaw_deg=math.degrees(
math.atan2(
2.0 * (p.pose.orientation.w * p.pose.orientation.z
+ p.pose.orientation.x * p.pose.orientation.y),
1.0 - 2.0 * (p.pose.orientation.y ** 2
+ p.pose.orientation.z ** 2),
)
),
)
for p in poses
]
with self._lock:
self._seq.start(waypoints)
self.get_logger().info(
f"Starting waypoint mission: {len(waypoints)} waypoints")
missed_waypoints: List[int] = []
while True:
# Check cancellation
if goal_handle.is_cancel_requested:
with self._lock:
self._seq.cancel()
goal_handle.canceled()
result.missed_waypoints = missed_waypoints
return result
with self._lock:
if self._seq.is_done:
break
wp = self._seq.current_waypoint
idx = self._seq.current_index
if wp is None:
break
# Send goal to Nav2
nav_goal = NavigateToPose.Goal()
nav_goal.pose = waypoint_to_pose_stamped(wp, self._map_frame)
self.get_logger().info(
f"Sending waypoint {idx + 1}/{len(waypoints)}: "
f"({wp.x:.2f}, {wp.y:.2f}, {wp.yaw_deg:.1f}°)"
)
if not self._nav_client.wait_for_server(timeout_sec=5.0):
self.get_logger().error("Nav2 server unavailable — aborting")
with self._lock:
self._seq.abort("Nav2 server unavailable")
missed_waypoints.extend(range(idx, len(waypoints)))
break
send_future = self._nav_client.send_goal_async(
nav_goal,
feedback_callback=lambda fb: None, # suppress individual feedback
)
# Spin until goal accepted
rclpy.spin_until_future_complete(
self, send_future, timeout_sec=self._nav_timeout)
if not send_future.done():
self.get_logger().error(
f"Waypoint {idx + 1}: goal send timed out — skipping")
missed_waypoints.append(idx)
with self._lock:
self._seq.advance()
continue
goal_res = send_future.result()
if not goal_res.accepted:
self.get_logger().warn(
f"Waypoint {idx + 1}: goal rejected by Nav2 — skipping")
missed_waypoints.append(idx)
with self._lock:
self._seq.advance()
continue
# Wait for Nav2 to reach the goal
result_future = goal_res.get_result_async()
rclpy.spin_until_future_complete(
self, result_future, timeout_sec=self._nav_timeout)
if not result_future.done():
self.get_logger().error(
f"Waypoint {idx + 1}: Nav2 timed out after "
f"{self._nav_timeout:.0f}s — skipping")
goal_res.cancel_goal_async()
missed_waypoints.append(idx)
else:
nav_result = result_future.result()
from action_msgs.msg import GoalStatus
if nav_result.status == GoalStatus.STATUS_SUCCEEDED:
self.get_logger().info(
f"Waypoint {idx + 1}/{len(waypoints)} reached ✓")
else:
self.get_logger().warn(
f"Waypoint {idx + 1} failed (Nav2 status "
f"{nav_result.status}) — skipping")
missed_waypoints.append(idx)
with self._lock:
self._seq.advance()
# Publish feedback
feedback_msg.current_waypoint = self._seq.current_index
goal_handle.publish_feedback(feedback_msg)
# Mission complete
with self._lock:
final_state = self._seq.state
if final_state == WaypointSequencer.SUCCEEDED and not missed_waypoints:
self.get_logger().info(
f"Waypoint mission SUCCEEDED ({len(waypoints)} waypoints)")
goal_handle.succeed()
elif final_state == WaypointSequencer.CANCELED:
self.get_logger().info("Waypoint mission CANCELED")
goal_handle.canceled()
else:
self.get_logger().warn(
f"Waypoint mission ended: state={final_state} "
f"missed={missed_waypoints}")
goal_handle.succeed() # FollowWaypoints always succeeds; missed list carries info
result.missed_waypoints = missed_waypoints
return result
# ── JSON topic handler ─────────────────────────────────────────────────
def _on_waypoints_json(self, msg: String) -> None:
"""Accept a JSON waypoint list and start a new mission immediately.
This is a convenience interface for WebUI / operator use.
For production use prefer the action server (supports feedback/cancel).
"""
try:
raw = json.loads(msg.data)
except json.JSONDecodeError as exc:
self.get_logger().error(f"Invalid waypoints JSON: {exc}")
return
if not isinstance(raw, list) or len(raw) == 0:
self.get_logger().error(
"Waypoints JSON must be a non-empty array of {x, y, yaw_deg}")
return
try:
waypoints = [Waypoint.from_dict(d) for d in raw]
except (KeyError, ValueError) as exc:
self.get_logger().error(f"Malformed waypoint in list: {exc}")
return
if not self._nav_client.wait_for_server(timeout_sec=2.0):
self.get_logger().error(
"Nav2 server not ready — cannot execute JSON waypoints")
return
self.get_logger().info(
f"JSON mission: {len(waypoints)} waypoints received via topic")
with self._lock:
self._seq.start(waypoints)
# Execution driven by action server; JSON topic just seeds the sequencer.
# For full feedback, use the FollowWaypoints action server directly.
# ── Cancel topic handler ───────────────────────────────────────────────
def _on_cancel(self, _msg: Empty) -> None:
with self._lock:
if self._seq.is_running:
self._seq.cancel()
self.get_logger().info(
"Mission cancelled via /saltybot/nav/cancel")
# ── Status publish ─────────────────────────────────────────────────────
def _publish_status(self) -> None:
with self._lock:
d = self._seq.status_dict()
self._status_pub.publish(String(data=json.dumps(d)))
def main(args=None):
rclpy.init(args=args)
node = WaypointFollowerNode()
executor = rclpy.executors.MultiThreadedExecutor()
executor.add_node(node)
try:
executor.spin()
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.try_shutdown()

View File

@ -0,0 +1,176 @@
"""waypoint_sequencer.py — Pure-Python waypoint sequencing logic (Issue #599).
No ROS2 dependencies importable and fully unit-testable without a live
ROS2 install.
Design
WaypointSequencer tracks a list of (x, y, yaw_deg) waypoints and the index of
the currently active goal. The ROS2 node (waypoint_follower_node.py) drives
this state machine by calling:
sequencer.start(waypoints) load a new mission
sequencer.advance() mark current waypoint reached
sequencer.abort(reason) mark mission failed
sequencer.cancel() mark mission cancelled
State transitions:
IDLE RUNNING (on start())
RUNNING SUCCEEDED (on advance() when last waypoint reached)
RUNNING ABORTED (on abort())
RUNNING CANCELED (on cancel())
ABORTED / CANCELED / SUCCEEDED any state via start()
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import List, Optional, Tuple
# ── Waypoint data type ────────────────────────────────────────────────────────
@dataclass
class Waypoint:
"""A single navigation goal in the map frame."""
x: float
y: float
yaw_deg: float = 0.0
def __post_init__(self):
if not (-360.0 <= self.yaw_deg <= 360.0):
raise ValueError(
f"yaw_deg must be in [-360, 360], got {self.yaw_deg}"
)
@classmethod
def from_dict(cls, d: dict) -> "Waypoint":
"""Construct from a plain dict (e.g. parsed from JSON)."""
return cls(
x=float(d["x"]),
y=float(d["y"]),
yaw_deg=float(d.get("yaw_deg", 0.0)),
)
def to_dict(self) -> dict:
return {"x": self.x, "y": self.y, "yaw_deg": self.yaw_deg}
# ── State machine ─────────────────────────────────────────────────────────────
class WaypointSequencer:
"""Tracks state for sequential waypoint execution.
Thread-safety: external callers (ROS2 callbacks, action server) should
hold their own lock when calling multiple methods atomically.
"""
IDLE = "idle"
RUNNING = "running"
SUCCEEDED = "succeeded"
ABORTED = "aborted"
CANCELED = "canceled"
def __init__(self) -> None:
self._waypoints: List[Waypoint] = []
self._current: int = 0
self._state: str = self.IDLE
self._abort_reason: Optional[str] = None
# ── Control methods ────────────────────────────────────────────────────
def start(self, waypoints: List[Waypoint]) -> None:
"""Load a new mission and set state to RUNNING.
Raises:
ValueError: if waypoints is empty.
"""
if not waypoints:
raise ValueError("Waypoint list must not be empty")
self._waypoints = list(waypoints)
self._current = 0
self._state = self.RUNNING
self._abort_reason = None
def advance(self) -> None:
"""Mark the current waypoint as reached and move to the next.
If the last waypoint is reached the state transitions to SUCCEEDED.
Raises:
RuntimeError: if not RUNNING.
"""
if self._state != self.RUNNING:
raise RuntimeError(
f"advance() called in state {self._state!r} (must be RUNNING)"
)
self._current += 1
if self._current >= len(self._waypoints):
self._state = self.SUCCEEDED
def abort(self, reason: str = "") -> None:
"""Mark the mission as aborted (e.g. Nav2 returned FAILED)."""
self._state = self.ABORTED
self._abort_reason = reason
def cancel(self) -> None:
"""Mark the mission as cancelled (e.g. operator preempt)."""
self._state = self.CANCELED
# ── Query methods ──────────────────────────────────────────────────────
@property
def state(self) -> str:
return self._state
@property
def is_running(self) -> bool:
return self._state == self.RUNNING
@property
def is_done(self) -> bool:
return self._state in (self.SUCCEEDED, self.ABORTED, self.CANCELED)
@property
def current_waypoint(self) -> Optional[Waypoint]:
"""The active goal, or None if not RUNNING or already done."""
if self._state != self.RUNNING:
return None
if self._current >= len(self._waypoints):
return None
return self._waypoints[self._current]
@property
def current_index(self) -> int:
"""0-based index of the active waypoint (0 if not started)."""
return self._current
@property
def total(self) -> int:
"""Total number of waypoints in the current mission."""
return len(self._waypoints)
@property
def progress(self) -> Tuple[int, int]:
"""Return (completed_count, total_count)."""
return self._current, len(self._waypoints)
@property
def remaining(self) -> int:
return max(0, len(self._waypoints) - self._current)
@property
def abort_reason(self) -> Optional[str]:
return self._abort_reason
def status_dict(self) -> dict:
"""Serialisable status snapshot for MQTT / diagnostics."""
wp = self.current_waypoint
return {
"state": self._state,
"current_index": self._current,
"total": len(self._waypoints),
"remaining": self.remaining,
"current_wp": wp.to_dict() if wp else None,
"abort_reason": self._abort_reason,
}

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/saltybot_nav2_uwb
[install]
install_scripts=$base/lib/saltybot_nav2_uwb

View File

@ -0,0 +1,35 @@
from setuptools import setup
package_name = "saltybot_nav2_uwb"
setup(
name=package_name,
version="0.1.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(f"share/{package_name}/launch", [
"launch/nav2_uwb.launch.py",
]),
(f"share/{package_name}/config", [
"config/nav2_uwb_params.yaml",
]),
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="sl-jetson",
maintainer_email="sl-jetson@saltylab.local",
description=(
"Nav2 with UWB localization: pose bridge (map→odom TF from UWB-IMU EKF) "
"and sequential waypoint follower (Issue #599)"
),
license="Apache-2.0",
tests_require=["pytest"],
entry_points={
"console_scripts": [
"uwb_pose_bridge_node = saltybot_nav2_uwb.uwb_pose_bridge_node:main",
"waypoint_follower_node = saltybot_nav2_uwb.waypoint_follower_node:main",
],
},
)

View File

@ -0,0 +1,459 @@
"""test_waypoint_follower.py — Unit tests for Nav2-UWB waypoint logic (Issue #599).
Tests are ROS2-free. Run with:
python3 -m pytest \
jetson/ros2_ws/src/saltybot_nav2_uwb/test/test_waypoint_follower.py -v
Coverage:
Waypoint dataclass construction, validation, serialisation
WaypointSequencer state machine (all transitions)
Progress tracking
UWB pose-bridge 2-D TF maths
waypoint_to_pose_stamped conversion (quaternion from yaw)
"""
from __future__ import annotations
import math
import sys
import os
# Allow import without ROS2 install
_pkg_dir = os.path.join(os.path.dirname(__file__), "..", "saltybot_nav2_uwb")
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
sys.path.insert(0, _pkg_dir)
import pytest
# ── Stub ROS2 modules before importing any node files ────────────────────────
def _install_stubs():
"""Inject minimal stubs so node modules are importable without ROS2."""
from unittest.mock import MagicMock
import types
for mod_name in [
"rclpy", "rclpy.node", "rclpy.action", "rclpy.time",
"rclpy.callback_groups", "rclpy.executors",
"geometry_msgs", "geometry_msgs.msg",
"nav_msgs", "nav_msgs.msg",
"nav2_msgs", "nav2_msgs.action",
"std_msgs", "std_msgs.msg",
"tf2_ros", "action_msgs", "action_msgs.msg",
]:
sys.modules.setdefault(mod_name, MagicMock())
# Make geometry_msgs.msg.PoseWithCovarianceStamped / TransformStamped real
# enough to be used as type hints
sys.modules["geometry_msgs.msg"].PoseWithCovarianceStamped = MagicMock
sys.modules["geometry_msgs.msg"].TransformStamped = MagicMock
sys.modules["nav_msgs.msg"].Odometry = MagicMock
sys.modules["std_msgs.msg"].Empty = MagicMock
sys.modules["std_msgs.msg"].String = MagicMock
sys.modules["tf2_ros"].TransformBroadcaster = MagicMock
_install_stubs()
from waypoint_sequencer import Waypoint, WaypointSequencer
# ── helpers from uwb_pose_bridge_node (pure-python section) ──────────────────
from uwb_pose_bridge_node import (
_yaw_from_q,
_q_from_yaw,
_invert_2d,
_compose_2d,
compute_map_odom_tf,
)
# ─── Waypoint dataclass ────────────────────────────────────────────────────────
class TestWaypointConstruction:
def test_basic_construction(self):
wp = Waypoint(x=1.0, y=2.0, yaw_deg=45.0)
assert wp.x == 1.0
assert wp.y == 2.0
assert wp.yaw_deg == 45.0
def test_default_yaw_is_zero(self):
wp = Waypoint(x=0.0, y=0.0)
assert wp.yaw_deg == 0.0
def test_negative_coordinates(self):
wp = Waypoint(x=-3.5, y=-1.2, yaw_deg=-90.0)
assert wp.x == -3.5
assert wp.yaw_deg == -90.0
def test_yaw_boundary_360(self):
wp = Waypoint(x=0.0, y=0.0, yaw_deg=360.0)
assert wp.yaw_deg == 360.0
def test_yaw_boundary_neg360(self):
wp = Waypoint(x=0.0, y=0.0, yaw_deg=-360.0)
assert wp.yaw_deg == -360.0
def test_yaw_out_of_range_raises(self):
with pytest.raises(ValueError, match="yaw_deg"):
Waypoint(x=0.0, y=0.0, yaw_deg=400.0)
def test_yaw_out_of_range_negative_raises(self):
with pytest.raises(ValueError, match="yaw_deg"):
Waypoint(x=0.0, y=0.0, yaw_deg=-400.0)
class TestWaypointFromDict:
def test_full_dict(self):
wp = Waypoint.from_dict({"x": 1.5, "y": -2.5, "yaw_deg": 30.0})
assert wp.x == 1.5
assert wp.y == -2.5
assert wp.yaw_deg == 30.0
def test_missing_yaw_defaults_zero(self):
wp = Waypoint.from_dict({"x": 0.0, "y": 1.0})
assert wp.yaw_deg == 0.0
def test_string_values_coerced(self):
wp = Waypoint.from_dict({"x": "1.0", "y": "2.0", "yaw_deg": "45"})
assert wp.x == 1.0
def test_missing_x_raises(self):
with pytest.raises(KeyError):
Waypoint.from_dict({"y": 1.0})
def test_missing_y_raises(self):
with pytest.raises(KeyError):
Waypoint.from_dict({"x": 1.0})
class TestWaypointToDict:
def test_roundtrip(self):
wp = Waypoint(x=1.0, y=2.0, yaw_deg=90.0)
d = wp.to_dict()
wp2 = Waypoint.from_dict(d)
assert wp2.x == wp.x
assert wp2.y == wp.y
assert wp2.yaw_deg == wp.yaw_deg
def test_keys_present(self):
d = Waypoint(x=0.0, y=0.0).to_dict()
assert "x" in d and "y" in d and "yaw_deg" in d
# ─── WaypointSequencer state machine ─────────────────────────────────────────
def _three_wp():
return [Waypoint(float(i), 0.0) for i in range(3)]
class TestSequencerInitialState:
def setup_method(self):
self.seq = WaypointSequencer()
def test_initial_state_idle(self):
assert self.seq.state == WaypointSequencer.IDLE
def test_is_not_running(self):
assert not self.seq.is_running
def test_is_not_done(self):
assert not self.seq.is_done
def test_no_current_waypoint(self):
assert self.seq.current_waypoint is None
def test_total_zero(self):
assert self.seq.total == 0
def test_progress_zero(self):
assert self.seq.progress == (0, 0)
def test_no_abort_reason(self):
assert self.seq.abort_reason is None
class TestSequencerStart:
def setup_method(self):
self.seq = WaypointSequencer()
def test_transitions_to_running(self):
self.seq.start(_three_wp())
assert self.seq.state == WaypointSequencer.RUNNING
def test_is_running_true(self):
self.seq.start(_three_wp())
assert self.seq.is_running
def test_is_done_false(self):
self.seq.start(_three_wp())
assert not self.seq.is_done
def test_current_index_zero(self):
self.seq.start(_three_wp())
assert self.seq.current_index == 0
def test_total_set(self):
self.seq.start(_three_wp())
assert self.seq.total == 3
def test_first_waypoint_returned(self):
wps = _three_wp()
self.seq.start(wps)
assert self.seq.current_waypoint is wps[0]
def test_empty_list_raises(self):
with pytest.raises(ValueError, match="empty"):
self.seq.start([])
def test_restart_resets_state(self):
self.seq.start(_three_wp())
self.seq.advance()
self.seq.start([Waypoint(9.0, 9.0)])
assert self.seq.current_index == 0
assert self.seq.total == 1
class TestSequencerAdvance:
def setup_method(self):
self.seq = WaypointSequencer()
def test_advance_moves_to_next(self):
self.seq.start(_three_wp())
self.seq.advance()
assert self.seq.current_index == 1
def test_advance_twice(self):
self.seq.start(_three_wp())
self.seq.advance()
self.seq.advance()
assert self.seq.current_index == 2
def test_advance_last_sets_succeeded(self):
self.seq.start(_three_wp())
for _ in range(3):
self.seq.advance()
assert self.seq.state == WaypointSequencer.SUCCEEDED
def test_advance_last_sets_done(self):
self.seq.start(_three_wp())
for _ in range(3):
self.seq.advance()
assert self.seq.is_done
def test_advance_on_idle_raises(self):
with pytest.raises(RuntimeError, match="RUNNING"):
self.seq.advance()
def test_advance_on_aborted_raises(self):
self.seq.start(_three_wp())
self.seq.abort()
with pytest.raises(RuntimeError, match="RUNNING"):
self.seq.advance()
def test_no_current_waypoint_after_succeed(self):
self.seq.start(_three_wp())
for _ in range(3):
self.seq.advance()
assert self.seq.current_waypoint is None
class TestSequencerAbort:
def setup_method(self):
self.seq = WaypointSequencer()
def test_abort_transitions_state(self):
self.seq.start(_three_wp())
self.seq.abort("Nav2 failed")
assert self.seq.state == WaypointSequencer.ABORTED
def test_abort_sets_reason(self):
self.seq.start(_three_wp())
self.seq.abort("timeout")
assert self.seq.abort_reason == "timeout"
def test_abort_sets_done(self):
self.seq.start(_three_wp())
self.seq.abort()
assert self.seq.is_done
def test_abort_no_reason(self):
self.seq.start(_three_wp())
self.seq.abort()
assert self.seq.abort_reason == ""
class TestSequencerCancel:
def setup_method(self):
self.seq = WaypointSequencer()
def test_cancel_transitions_state(self):
self.seq.start(_three_wp())
self.seq.cancel()
assert self.seq.state == WaypointSequencer.CANCELED
def test_cancel_sets_done(self):
self.seq.start(_three_wp())
self.seq.cancel()
assert self.seq.is_done
def test_cancel_clears_current_waypoint(self):
self.seq.start(_three_wp())
self.seq.cancel()
assert self.seq.current_waypoint is None
class TestSequencerProgress:
def setup_method(self):
self.seq = WaypointSequencer()
self.seq.start(_three_wp())
def test_initial_progress(self):
assert self.seq.progress == (0, 3)
def test_progress_after_one(self):
self.seq.advance()
assert self.seq.progress == (1, 3)
def test_remaining_initial(self):
assert self.seq.remaining == 3
def test_remaining_after_advance(self):
self.seq.advance()
assert self.seq.remaining == 2
def test_remaining_after_all(self):
for _ in range(3):
self.seq.advance()
assert self.seq.remaining == 0
class TestSequencerStatusDict:
def setup_method(self):
self.seq = WaypointSequencer()
def test_idle_status_dict(self):
d = self.seq.status_dict()
assert d["state"] == "idle"
assert d["total"] == 0
assert d["current_wp"] is None
def test_running_status_dict(self):
self.seq.start(_three_wp())
d = self.seq.status_dict()
assert d["state"] == "running"
assert d["total"] == 3
assert d["current_wp"] is not None
assert "x" in d["current_wp"]
def test_succeeded_status_dict(self):
self.seq.start(_three_wp())
for _ in range(3):
self.seq.advance()
d = self.seq.status_dict()
assert d["state"] == "succeeded"
assert d["remaining"] == 0
# ─── 2-D TF maths (uwb_pose_bridge_node) ──────────────────────────────────────
EPS = 1e-9
class TestYawFromQ:
def test_zero_yaw(self):
qx, qy, qz, qw = _q_from_yaw(0.0)
assert abs(_yaw_from_q(qx, qy, qz, qw)) < EPS
def test_90_degrees(self):
yaw = math.pi / 2
qx, qy, qz, qw = _q_from_yaw(yaw)
assert abs(_yaw_from_q(qx, qy, qz, qw) - yaw) < 1e-6
def test_minus_90_degrees(self):
yaw = -math.pi / 2
qx, qy, qz, qw = _q_from_yaw(yaw)
assert abs(_yaw_from_q(qx, qy, qz, qw) - yaw) < 1e-6
def test_180_degrees(self):
yaw = math.pi
qx, qy, qz, qw = _q_from_yaw(yaw)
# atan2 wraps to -π at exactly π; tolerate either
result = _yaw_from_q(qx, qy, qz, qw)
assert abs(abs(result) - math.pi) < 1e-6
def test_roundtrip_arbitrary(self):
for deg in (0, 30, 60, 90, 120, 150, 170, -45, -120):
yaw = math.radians(deg)
qx, qy, qz, qw = _q_from_yaw(yaw)
back = _yaw_from_q(qx, qy, qz, qw)
assert abs(back - yaw) < 1e-6, f"roundtrip failed for {deg}°"
class TestInvert2D:
def test_identity(self):
xi, yi, thi = _invert_2d(0.0, 0.0, 0.0)
assert abs(xi) < EPS and abs(yi) < EPS and abs(thi) < EPS
def test_translation_only(self):
# invert pure translation: inv(T(3,4,0)) = T(-3,-4,0)
xi, yi, thi = _invert_2d(3.0, 4.0, 0.0)
assert abs(xi + 3.0) < EPS
assert abs(yi + 4.0) < EPS
assert abs(thi) < EPS
def test_rotation_only(self):
# invert pure rotation by 90°
xi, yi, thi = _invert_2d(0.0, 0.0, math.pi / 2)
assert abs(thi + math.pi / 2) < EPS
def test_compose_with_inverse_is_identity(self):
x, y, th = 2.5, -1.3, math.radians(47)
ix, iy, ith = _invert_2d(x, y, th)
rx, ry, rth = _compose_2d(x, y, th, ix, iy, ith)
assert abs(rx) < 1e-9
assert abs(ry) < 1e-9
assert abs(rth % (2 * math.pi)) < 1e-9 or abs(rth) < 1e-9
class TestComputeMapOdomTf:
def test_identity_pose(self):
"""If both poses are identity, map→odom is identity."""
mx, my, mth = compute_map_odom_tf(0, 0, 0, 0, 0, 0)
assert abs(mx) < EPS and abs(my) < EPS and abs(mth) < EPS
def test_odom_is_map(self):
"""If robot is at same position in both frames, map→odom is identity."""
pose = (1.0, 2.0, math.radians(45))
mx, my, mth = compute_map_odom_tf(*pose, *pose)
assert abs(mx) < 1e-9 and abs(my) < 1e-9 and abs(mth) < 1e-9
def test_pure_translation_offset(self):
"""map_bl at (5,0,0), odom_bl at (2,0,0) → map_odom at (3,0,0)."""
mx, my, mth = compute_map_odom_tf(5.0, 0.0, 0.0, 2.0, 0.0, 0.0)
assert abs(mx - 3.0) < 1e-9
assert abs(my) < 1e-9
assert abs(mth) < 1e-9
def test_tf_reconstruction(self):
"""T_map_odom ⊗ T_odom_bl == T_map_bl (round-trip check)."""
map_bl = (3.0, 1.5, math.radians(30))
odom_bl = (1.0, 0.5, math.radians(10))
mo = compute_map_odom_tf(*map_bl, *odom_bl)
# Recompose: T_map_odom ⊗ T_odom_bl should equal T_map_bl
rx, ry, rth = _compose_2d(*mo, *odom_bl)
assert abs(rx - map_bl[0]) < 1e-9
assert abs(ry - map_bl[1]) < 1e-9
assert abs(rth - map_bl[2]) < 1e-9
def test_rotated_frame(self):
"""Verify correctness with a 90° heading offset."""
# map: robot at (1,0) facing 90°; odom: robot at (0,1) facing 0°
mo_x, mo_y, mo_th = compute_map_odom_tf(
1.0, 0.0, math.pi / 2,
0.0, 1.0, 0.0,
)
# Recompose and check
rx, ry, rth = _compose_2d(mo_x, mo_y, mo_th, 0.0, 1.0, 0.0)
assert abs(rx - 1.0) < 1e-9
assert abs(ry - 0.0) < 1e-9
assert abs(rth - math.pi / 2) < 1e-9