feat: Nav2 integration with UWB localization (Issue #599)
New package saltybot_nav2_uwb replacing AMCL-based localization with UWB-IMU EKF fused pose. Key components: - uwb_pose_bridge_node: subscribes /saltybot/pose/fused_cov (from EKF), computes map→odom TF via T_map_odom = T_map_base × inv(T_odom_base), broadcasts at 20 Hz. Publishes /initialpose on first valid pose. - waypoint_sequencer.py: pure-Python state machine (IDLE→RUNNING→ SUCCEEDED/ABORTED/CANCELED) for sequential waypoint execution. - waypoint_follower_node: action server on /saltybot/nav/follow_waypoints (nav2_msgs/FollowWaypoints), sends each goal to Nav2 NavigateToPose in sequence; JSON topic /saltybot/nav/waypoints for operator use. - nav2_uwb_params.yaml: DWB controller capped at 1.0 m/s, global+local costmap with /scan (RPLIDAR), rolling-window global costmap (no static map needed), AMCL removed from lifecycle manager. - nav2_uwb.launch.py: bridge (t=0) → Nav2 (t=2s) → waypoint follower (t=4s) with LogInfo markers. - 65 unit tests passing (waypoint dataclass, sequencer state machine, 2-D TF maths, progress tracking). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
061189670a
commit
40b0917c33
315
jetson/ros2_ws/src/saltybot_nav2_uwb/config/nav2_uwb_params.yaml
Normal file
315
jetson/ros2_ws/src/saltybot_nav2_uwb/config/nav2_uwb_params.yaml
Normal file
@ -0,0 +1,315 @@
|
||||
# nav2_uwb_params.yaml — Nav2 configuration for UWB-based localization (Issue #599).
|
||||
#
|
||||
# Key differences from the AMCL/SLAM config (saltybot_nav2_slam):
|
||||
# • AMCL is NOT used. The uwb_pose_bridge_node broadcasts map→odom TF directly
|
||||
# from the UWB-IMU EKF fused pose (/saltybot/pose/fused_cov).
|
||||
# • DWB controller capped at 1.0 m/s for safe differential-drive operation.
|
||||
# • Global costmap uses a rolling window — no pre-built static map required.
|
||||
# UWB provides the global (map-frame) position.
|
||||
# • Costmap obstacle layer: /scan (RPLIDAR A1/A2) only.
|
||||
# • lifecycle_manager_localization is intentionally empty; the bridge node
|
||||
# keeps the TF alive without Nav2 lifecycle management.
|
||||
|
||||
bt_navigator:
|
||||
ros__parameters:
|
||||
use_sim_time: false
|
||||
global_frame: map
|
||||
robot_base_frame: base_link
|
||||
odom_topic: /odom
|
||||
bt_loop_duration: 10
|
||||
default_server_timeout: 20
|
||||
enable_groot_monitoring: false
|
||||
default_nav_to_pose_bt_xml: >-
|
||||
$(find-pkg-share nav2_bt_navigator)/behavior_trees/navigate_to_pose_w_replanning_and_recovery.xml
|
||||
default_nav_through_poses_bt_xml: >-
|
||||
$(find-pkg-share nav2_bt_navigator)/behavior_trees/navigate_through_poses_w_replanning_and_recovery.xml
|
||||
plugin_lib_names:
|
||||
- nav2_compute_path_to_pose_action_bt_node
|
||||
- nav2_compute_path_through_poses_action_bt_node
|
||||
- nav2_follow_path_action_bt_node
|
||||
- nav2_spin_action_bt_node
|
||||
- nav2_wait_action_bt_node
|
||||
- nav2_back_up_action_bt_node
|
||||
- nav2_drive_on_heading_bt_node
|
||||
- nav2_clear_costmap_service_bt_node
|
||||
- nav2_is_stuck_condition_bt_node
|
||||
- nav2_goal_updated_condition_bt_node
|
||||
- nav2_initial_pose_received_condition_bt_node
|
||||
- nav2_rate_controller_bt_node
|
||||
- nav2_distance_controller_bt_node
|
||||
- nav2_speed_controller_bt_node
|
||||
- nav2_truncate_path_action_bt_node
|
||||
- nav2_remove_passed_goals_action_bt_node
|
||||
- nav2_planner_selector_bt_node
|
||||
- nav2_controller_selector_bt_node
|
||||
- nav2_goal_checker_selector_bt_node
|
||||
|
||||
bt_navigator_rclcpp_node:
|
||||
ros__parameters:
|
||||
use_sim_time: false
|
||||
|
||||
# ── Controller (DWB — differential drive, max 1.0 m/s) ─────────────────────────
|
||||
controller_server:
|
||||
ros__parameters:
|
||||
use_sim_time: false
|
||||
controller_frequency: 20.0
|
||||
min_x_velocity_threshold: 0.001
|
||||
min_y_velocity_threshold: 0.5
|
||||
min_theta_velocity_threshold: 0.001
|
||||
failure_tolerance: 0.3
|
||||
progress_checker_plugin: "progress_checker"
|
||||
goal_checker_plugins: ["general_goal_checker"]
|
||||
controller_plugins: ["FollowPath"]
|
||||
|
||||
progress_checker:
|
||||
plugin: "nav2_core::SimpleProgressChecker"
|
||||
required_movement_radius: 0.5
|
||||
movement_time_allowance: 15.0
|
||||
|
||||
general_goal_checker:
|
||||
stateful: true
|
||||
plugin: "nav2_core::SimpleGoalChecker"
|
||||
xy_goal_tolerance: 0.20
|
||||
yaw_goal_tolerance: 0.15
|
||||
|
||||
FollowPath:
|
||||
plugin: "dwb_core::DWBLocalPlanner"
|
||||
debug_trajectory_details: false
|
||||
# ── Velocity limits (1.0 m/s max per Issue #599) ──────────────────
|
||||
min_vel_x: 0.0
|
||||
max_vel_x: 1.0 # ← hard cap for safe indoor operation
|
||||
min_vel_y: 0.0
|
||||
max_vel_y: 0.0 # differential drive — no lateral motion
|
||||
min_vel_theta: -1.5
|
||||
max_vel_theta: 1.5
|
||||
min_speed_xy: 0.0
|
||||
max_speed_xy: 1.0
|
||||
min_speed_theta: 0.0
|
||||
# ── Acceleration limits ────────────────────────────────────────────
|
||||
acc_lim_x: 1.0
|
||||
acc_lim_y: 0.0
|
||||
acc_lim_theta: 2.0
|
||||
decel_lim_x: -1.0
|
||||
decel_lim_y: 0.0
|
||||
decel_lim_theta: -2.0
|
||||
# ── Trajectory sampling ────────────────────────────────────────────
|
||||
vx_samples: 20
|
||||
vy_samples: 1 # differential drive — only 1 lateral sample
|
||||
vtheta_samples: 40
|
||||
sim_time: 1.5
|
||||
linear_granularity: 0.05
|
||||
angular_granularity: 0.05
|
||||
transform_tolerance: 0.3
|
||||
trans_stopped_velocity: 0.25
|
||||
short_circuit_trajectory_evaluation: true
|
||||
stateful: true
|
||||
# ── Critic weights ─────────────────────────────────────────────────
|
||||
critics:
|
||||
- "RotateToGoal"
|
||||
- "Oscillation"
|
||||
- "BaseObstacle"
|
||||
- "GoalAlign"
|
||||
- "PathAlign"
|
||||
- "PathDist"
|
||||
- "GoalDist"
|
||||
BaseObstacle.scale: 0.02
|
||||
PathAlign.scale: 32.0
|
||||
PathAlign.forward_point_distance: 0.1
|
||||
GoalAlign.scale: 24.0
|
||||
GoalAlign.forward_point_distance: 0.1
|
||||
PathDist.scale: 32.0
|
||||
GoalDist.scale: 24.0
|
||||
RotateToGoal.scale: 32.0
|
||||
RotateToGoal.slowing_factor: 5.0
|
||||
RotateToGoal.lookahead_time: -1.0
|
||||
|
||||
controller_server_rclcpp_node:
|
||||
ros__parameters:
|
||||
use_sim_time: false
|
||||
|
||||
# ── Global planner (NavFn A*) ────────────────────────────────────────────────
|
||||
planner_server:
|
||||
ros__parameters:
|
||||
use_sim_time: false
|
||||
expected_planner_frequency: 10.0
|
||||
planner_plugins: ["GridBased"]
|
||||
GridBased:
|
||||
plugin: "nav2_navfn_planner/NavfnPlanner"
|
||||
tolerance: 0.3
|
||||
use_astar: true
|
||||
allow_unknown: true
|
||||
|
||||
planner_server_rclcpp_node:
|
||||
ros__parameters:
|
||||
use_sim_time: false
|
||||
|
||||
# ── Recovery behaviors ───────────────────────────────────────────────────────
|
||||
behavior_server:
|
||||
ros__parameters:
|
||||
use_sim_time: false
|
||||
costmap_topic: local_costmap/costmap_raw
|
||||
footprint_topic: local_costmap/published_footprint
|
||||
cycle_frequency: 10.0
|
||||
behavior_plugins: ["spin", "backup", "drive_on_heading", "wait"]
|
||||
spin:
|
||||
plugin: "nav2_behaviors/Spin"
|
||||
backup:
|
||||
plugin: "nav2_behaviors/BackUp"
|
||||
drive_on_heading:
|
||||
plugin: "nav2_behaviors/DriveOnHeading"
|
||||
wait:
|
||||
plugin: "nav2_behaviors/Wait"
|
||||
global_frame: odom
|
||||
robot_base_frame: base_link
|
||||
transform_timeout: 0.1
|
||||
simulate_ahead_time: 2.0
|
||||
max_rotations: 1.0
|
||||
max_backup_dist: 0.3
|
||||
backup_speed: 0.15
|
||||
|
||||
behavior_server_rclcpp_node:
|
||||
ros__parameters:
|
||||
use_sim_time: false
|
||||
|
||||
# ── Costmaps ─────────────────────────────────────────────────────────────────
|
||||
#
|
||||
# Both costmaps use only /scan (RPLIDAR). No depth camera source here —
|
||||
# saltybot_depth_costmap adds that layer separately when enabled.
|
||||
|
||||
local_costmap:
|
||||
local_costmap:
|
||||
ros__parameters:
|
||||
use_sim_time: false
|
||||
update_frequency: 10.0
|
||||
publish_frequency: 5.0
|
||||
global_frame: odom
|
||||
robot_base_frame: base_link
|
||||
rolling_window: true
|
||||
width: 4
|
||||
height: 4
|
||||
resolution: 0.05
|
||||
robot_radius: 0.35
|
||||
plugins: ["obstacle_layer", "inflation_layer"]
|
||||
obstacle_layer:
|
||||
plugin: "nav2_costmap_2d::ObstacleLayer"
|
||||
enabled: true
|
||||
observation_sources: scan
|
||||
scan:
|
||||
topic: /scan
|
||||
max_obstacle_height: 2.0
|
||||
min_obstacle_height: 0.0
|
||||
clearing: true
|
||||
marking: true
|
||||
data_type: "LaserScan"
|
||||
raytrace_max_range: 8.0
|
||||
raytrace_min_range: 0.0
|
||||
obstacle_max_range: 6.0
|
||||
obstacle_min_range: 0.0
|
||||
inflation_layer:
|
||||
plugin: "nav2_costmap_2d::InflationLayer"
|
||||
enabled: true
|
||||
inflation_radius: 0.55
|
||||
cost_scaling_factor: 10.0
|
||||
inflate_unknown: false
|
||||
inflate_around_unknown: true
|
||||
track_unknown_space: false
|
||||
always_send_full_costmap: true
|
||||
|
||||
local_costmap_client:
|
||||
ros__parameters:
|
||||
use_sim_time: false
|
||||
|
||||
local_costmap_rclcpp_node:
|
||||
ros__parameters:
|
||||
use_sim_time: false
|
||||
|
||||
global_costmap:
|
||||
global_costmap:
|
||||
ros__parameters:
|
||||
use_sim_time: false
|
||||
update_frequency: 2.0
|
||||
publish_frequency: 1.0
|
||||
global_frame: map
|
||||
robot_base_frame: base_link
|
||||
# Rolling window — UWB provides global position, no static map needed.
|
||||
rolling_window: true
|
||||
width: 20
|
||||
height: 20
|
||||
resolution: 0.05
|
||||
robot_radius: 0.35
|
||||
plugins: ["obstacle_layer", "inflation_layer"]
|
||||
obstacle_layer:
|
||||
plugin: "nav2_costmap_2d::ObstacleLayer"
|
||||
enabled: true
|
||||
observation_sources: scan
|
||||
scan:
|
||||
topic: /scan
|
||||
max_obstacle_height: 2.0
|
||||
min_obstacle_height: 0.0
|
||||
clearing: true
|
||||
marking: true
|
||||
data_type: "LaserScan"
|
||||
raytrace_max_range: 8.0
|
||||
raytrace_min_range: 0.0
|
||||
obstacle_max_range: 6.0
|
||||
obstacle_min_range: 0.0
|
||||
inflation_layer:
|
||||
plugin: "nav2_costmap_2d::InflationLayer"
|
||||
enabled: true
|
||||
inflation_radius: 0.55
|
||||
cost_scaling_factor: 10.0
|
||||
inflate_unknown: false
|
||||
inflate_around_unknown: true
|
||||
track_unknown_space: false
|
||||
always_send_full_costmap: false
|
||||
|
||||
global_costmap_client:
|
||||
ros__parameters:
|
||||
use_sim_time: false
|
||||
|
||||
global_costmap_rclcpp_node:
|
||||
ros__parameters:
|
||||
use_sim_time: false
|
||||
|
||||
# ── Velocity smoother ────────────────────────────────────────────────────────
|
||||
velocity_smoother:
|
||||
ros__parameters:
|
||||
use_sim_time: false
|
||||
smoothing_frequency: 20.0
|
||||
scale_velocities: false
|
||||
feedback: "OPEN_LOOP"
|
||||
max_velocity: [1.0, 0.0, 1.5] # [x, y, theta] — capped at 1.0 m/s
|
||||
min_velocity: [-0.3, 0.0, -1.5]
|
||||
max_accel: [1.0, 0.0, 2.0]
|
||||
max_decel: [-1.0, 0.0, -2.0]
|
||||
odom_topic: /odom
|
||||
odom_duration: 0.1
|
||||
deadband_velocity: [0.0, 0.0, 0.0]
|
||||
velocity_timeout: 1.0
|
||||
|
||||
# ── Lifecycle managers ───────────────────────────────────────────────────────
|
||||
#
|
||||
# Localization lifecycle is intentionally empty — the uwb_pose_bridge_node
|
||||
# manages the map→odom TF independently (no AMCL, no SLAM Toolbox).
|
||||
|
||||
lifecycle_manager_navigation:
|
||||
ros__parameters:
|
||||
use_sim_time: false
|
||||
autostart: true
|
||||
node_names:
|
||||
- "controller_server"
|
||||
- "planner_server"
|
||||
- "behavior_server"
|
||||
- "bt_navigator"
|
||||
- "velocity_smoother"
|
||||
|
||||
lifecycle_manager_localization:
|
||||
ros__parameters:
|
||||
use_sim_time: false
|
||||
autostart: true
|
||||
node_names: [] # UWB pose bridge handles TF directly — no AMCL
|
||||
|
||||
map_server:
|
||||
ros__parameters:
|
||||
use_sim_time: false
|
||||
yaml_filename: "" # No static map — global costmap is rolling window
|
||||
168
jetson/ros2_ws/src/saltybot_nav2_uwb/launch/nav2_uwb.launch.py
Normal file
168
jetson/ros2_ws/src/saltybot_nav2_uwb/launch/nav2_uwb.launch.py
Normal file
@ -0,0 +1,168 @@
|
||||
"""nav2_uwb.launch.py — Full Nav2 stack with UWB localization (Issue #599).
|
||||
|
||||
Launch sequence
|
||||
───────────────
|
||||
1. uwb_pose_bridge_node — broadcasts map→odom TF from UWB-IMU EKF pose
|
||||
2. nav2_bringup navigation — controller, planner, BT navigator, costmaps
|
||||
3. lifecycle_manager_navigation — autostart all Nav2 nodes
|
||||
4. waypoint_follower_node — sequential waypoint execution action server
|
||||
|
||||
Localization is handled by the UWB bridge; AMCL is NOT launched.
|
||||
|
||||
Arguments
|
||||
─────────
|
||||
use_sim_time (bool) default: false
|
||||
params_file (str) default: <package>/config/nav2_uwb_params.yaml
|
||||
uwb_pose_topic (str) default: /saltybot/pose/fused_cov
|
||||
odom_topic (str) default: /odom
|
||||
map_frame (str) default: map
|
||||
odom_frame (str) default: odom
|
||||
base_frame (str) default: base_link
|
||||
max_vel_x (float) default: 1.0 — DWB hard cap (m/s)
|
||||
"""
|
||||
|
||||
import os
|
||||
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
from launch import LaunchDescription
|
||||
from launch.actions import (
|
||||
DeclareLaunchArgument,
|
||||
GroupAction,
|
||||
IncludeLaunchDescription,
|
||||
LogInfo,
|
||||
TimerAction,
|
||||
)
|
||||
from launch.conditions import IfCondition
|
||||
from launch.launch_description_sources import PythonLaunchDescriptionSource
|
||||
from launch.substitutions import (
|
||||
LaunchConfiguration,
|
||||
PathJoinSubstitution,
|
||||
PythonExpression,
|
||||
)
|
||||
from launch_ros.actions import Node
|
||||
from launch_ros.substitutions import FindPackageShare
|
||||
|
||||
|
||||
def generate_launch_description():
|
||||
pkg_nav2_uwb = get_package_share_directory("saltybot_nav2_uwb")
|
||||
pkg_nav2_bringup = get_package_share_directory("nav2_bringup")
|
||||
|
||||
default_params = os.path.join(pkg_nav2_uwb, "config", "nav2_uwb_params.yaml")
|
||||
|
||||
# ── Declare arguments ──────────────────────────────────────────────────
|
||||
args = [
|
||||
DeclareLaunchArgument(
|
||||
"use_sim_time", default_value="false",
|
||||
description="Use simulation clock"),
|
||||
DeclareLaunchArgument(
|
||||
"params_file", default_value=default_params,
|
||||
description="Full path to nav2_uwb_params.yaml"),
|
||||
DeclareLaunchArgument(
|
||||
"uwb_pose_topic", default_value="/saltybot/pose/fused_cov",
|
||||
description="UWB-IMU EKF fused pose topic (PoseWithCovarianceStamped)"),
|
||||
DeclareLaunchArgument(
|
||||
"odom_topic", default_value="/odom",
|
||||
description="Wheel odometry topic (Odometry)"),
|
||||
DeclareLaunchArgument(
|
||||
"map_frame", default_value="map",
|
||||
description="Global map frame id"),
|
||||
DeclareLaunchArgument(
|
||||
"odom_frame", default_value="odom",
|
||||
description="Odometry frame id"),
|
||||
DeclareLaunchArgument(
|
||||
"base_frame", default_value="base_link",
|
||||
description="Robot base frame id"),
|
||||
DeclareLaunchArgument(
|
||||
"autostart", default_value="true",
|
||||
description="Automatically start Nav2 lifecycle nodes"),
|
||||
]
|
||||
|
||||
# ── UWB pose bridge — starts immediately ──────────────────────────────
|
||||
uwb_bridge = Node(
|
||||
package="saltybot_nav2_uwb",
|
||||
executable="uwb_pose_bridge_node",
|
||||
name="uwb_pose_bridge",
|
||||
output="screen",
|
||||
parameters=[{
|
||||
"use_sim_time": LaunchConfiguration("use_sim_time"),
|
||||
"uwb_pose_topic": LaunchConfiguration("uwb_pose_topic"),
|
||||
"odom_topic": LaunchConfiguration("odom_topic"),
|
||||
"map_frame": LaunchConfiguration("map_frame"),
|
||||
"odom_frame": LaunchConfiguration("odom_frame"),
|
||||
"base_frame": LaunchConfiguration("base_frame"),
|
||||
"tf_publish_hz": 20.0,
|
||||
"tf_timeout_s": 0.5,
|
||||
}],
|
||||
)
|
||||
|
||||
uwb_bridge_log = LogInfo(
|
||||
msg="[nav2_uwb] UWB pose bridge launched — map→odom TF will be "
|
||||
"broadcast once UWB + odom data arrive."
|
||||
)
|
||||
|
||||
# ── Nav2 navigation stack (t=2s: allow bridge to initialise first) ─────
|
||||
nav2_navigation = TimerAction(
|
||||
period=2.0,
|
||||
actions=[
|
||||
LogInfo(msg="[nav2_uwb] Starting Nav2 navigation stack..."),
|
||||
IncludeLaunchDescription(
|
||||
PythonLaunchDescriptionSource(
|
||||
os.path.join(pkg_nav2_bringup, "launch", "navigation_launch.py")
|
||||
),
|
||||
launch_arguments={
|
||||
"use_sim_time": LaunchConfiguration("use_sim_time"),
|
||||
"params_file": LaunchConfiguration("params_file"),
|
||||
"autostart": LaunchConfiguration("autostart"),
|
||||
# No map_subscribe_transient_local — no static map
|
||||
"use_lifecycle_mgr": "true",
|
||||
"use_sim_time": LaunchConfiguration("use_sim_time"),
|
||||
}.items(),
|
||||
),
|
||||
],
|
||||
)
|
||||
|
||||
# ── Waypoint follower (t=4s: Nav2 needs time to come up) ──────────────
|
||||
waypoint_follower = TimerAction(
|
||||
period=4.0,
|
||||
actions=[
|
||||
LogInfo(msg="[nav2_uwb] Starting waypoint follower action server..."),
|
||||
Node(
|
||||
package="saltybot_nav2_uwb",
|
||||
executable="waypoint_follower_node",
|
||||
name="waypoint_follower",
|
||||
output="screen",
|
||||
parameters=[{
|
||||
"use_sim_time": LaunchConfiguration("use_sim_time"),
|
||||
"map_frame": LaunchConfiguration("map_frame"),
|
||||
"goal_xy_tolerance": 0.20,
|
||||
"goal_yaw_tolerance": 0.15,
|
||||
"nav_timeout_s": 60.0,
|
||||
"status_hz": 2.0,
|
||||
}],
|
||||
),
|
||||
],
|
||||
)
|
||||
|
||||
ready_log = TimerAction(
|
||||
period=4.5,
|
||||
actions=[
|
||||
LogInfo(
|
||||
msg="[nav2_uwb] Stack ready.\n"
|
||||
" Localization : UWB-IMU EKF bridge (no AMCL)\n"
|
||||
" Controller : DWB ≤ 1.0 m/s (differential drive)\n"
|
||||
" Waypoints : /saltybot/nav/follow_waypoints (action)\n"
|
||||
" Status : /saltybot/nav/waypoint_status (JSON)\n"
|
||||
" Cancel : /saltybot/nav/cancel (topic)"
|
||||
),
|
||||
],
|
||||
)
|
||||
|
||||
return LaunchDescription(
|
||||
args + [
|
||||
uwb_bridge_log,
|
||||
uwb_bridge,
|
||||
nav2_navigation,
|
||||
waypoint_follower,
|
||||
ready_log,
|
||||
]
|
||||
)
|
||||
42
jetson/ros2_ws/src/saltybot_nav2_uwb/package.xml
Normal file
42
jetson/ros2_ws/src/saltybot_nav2_uwb/package.xml
Normal file
@ -0,0 +1,42 @@
|
||||
<?xml version="1.0"?>
|
||||
<package format="3">
|
||||
<name>saltybot_nav2_uwb</name>
|
||||
<version>0.1.0</version>
|
||||
<description>
|
||||
Nav2 integration with UWB-based localization for SaltyBot (Issue #599).
|
||||
Replaces AMCL with a UWB pose bridge that broadcasts the map->odom TF
|
||||
directly from the UWB-IMU EKF fused pose. Includes a DWB controller
|
||||
capped at 1.0 m/s for safe differential-drive operation and a simple
|
||||
sequential waypoint-follower action server.
|
||||
</description>
|
||||
<maintainer email="sl-jetson@saltylab.local">sl-jetson</maintainer>
|
||||
<license>Apache-2.0</license>
|
||||
|
||||
<depend>rclpy</depend>
|
||||
<depend>std_msgs</depend>
|
||||
<depend>geometry_msgs</depend>
|
||||
<depend>nav_msgs</depend>
|
||||
<depend>sensor_msgs</depend>
|
||||
<depend>tf2</depend>
|
||||
<depend>tf2_ros</depend>
|
||||
<depend>nav2_msgs</depend>
|
||||
|
||||
<exec_depend>nav2_bringup</exec_depend>
|
||||
<exec_depend>nav2_bt_navigator</exec_depend>
|
||||
<exec_depend>nav2_controller</exec_depend>
|
||||
<exec_depend>nav2_planner</exec_depend>
|
||||
<exec_depend>nav2_behaviors</exec_depend>
|
||||
<exec_depend>nav2_lifecycle_manager</exec_depend>
|
||||
<exec_depend>nav2_costmap_2d</exec_depend>
|
||||
<exec_depend>nav2_navfn_planner</exec_depend>
|
||||
<exec_depend>dwb_core</exec_depend>
|
||||
<exec_depend>dwb_critics</exec_depend>
|
||||
|
||||
<buildtool_depend>ament_python</buildtool_depend>
|
||||
|
||||
<test_depend>pytest</test_depend>
|
||||
|
||||
<export>
|
||||
<build_type>ament_python</build_type>
|
||||
</export>
|
||||
</package>
|
||||
@ -0,0 +1,218 @@
|
||||
"""uwb_pose_bridge_node.py — UWB→Nav2 localization bridge (Issue #599).
|
||||
|
||||
Replaces AMCL by listening to the UWB-IMU EKF fused pose and broadcasting
|
||||
the map→odom TF that Nav2 requires for global localization.
|
||||
|
||||
TF computation (2D, ground robot):
|
||||
──────────────────────────────────
|
||||
map → base_link := UWB-IMU EKF (/saltybot/pose/fused_cov)
|
||||
odom → base_link := wheel odometry (/odom)
|
||||
|
||||
T_map_odom = T_map_base_link ⊗ T_odom_base_link⁻¹
|
||||
|
||||
Published TF: map → odom (at UWB rate, ≥ 10 Hz)
|
||||
Published topic: /initialpose (once on first valid pose, for Nav2 boot)
|
||||
|
||||
Subscribed topics:
|
||||
/saltybot/pose/fused_cov (PoseWithCovarianceStamped) — UWB-IMU EKF
|
||||
/odom (Odometry) — wheel odometry
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
from typing import Optional
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from geometry_msgs.msg import (
|
||||
PoseWithCovarianceStamped,
|
||||
TransformStamped,
|
||||
)
|
||||
from nav_msgs.msg import Odometry
|
||||
from tf2_ros import TransformBroadcaster
|
||||
|
||||
|
||||
# ── Pure-Python 2-D transform helpers (no numpy / tf_transformations needed) ──
|
||||
|
||||
def _yaw_from_q(qx: float, qy: float, qz: float, qw: float) -> float:
|
||||
"""Extract yaw (rad) from unit quaternion."""
|
||||
return math.atan2(2.0 * (qw * qz + qx * qy),
|
||||
1.0 - 2.0 * (qy * qy + qz * qz))
|
||||
|
||||
|
||||
def _q_from_yaw(yaw: float):
|
||||
"""Return (qx, qy, qz, qw) for a pure-yaw rotation."""
|
||||
return 0.0, 0.0, math.sin(yaw * 0.5), math.cos(yaw * 0.5)
|
||||
|
||||
|
||||
def _invert_2d(x: float, y: float, th: float):
|
||||
"""Invert a 2-D SE(2) transform."""
|
||||
c, s = math.cos(th), math.sin(th)
|
||||
xi = -(c * x + s * y)
|
||||
yi = -(-s * x + c * y)
|
||||
return xi, yi, -th
|
||||
|
||||
|
||||
def _compose_2d(x1: float, y1: float, th1: float,
|
||||
x2: float, y2: float, th2: float):
|
||||
"""Compose two 2-D SE(2) transforms: T1 ⊗ T2."""
|
||||
c1, s1 = math.cos(th1), math.sin(th1)
|
||||
xr = x1 + c1 * x2 - s1 * y2
|
||||
yr = y1 + s1 * x2 + c1 * y2
|
||||
thr = th1 + th2
|
||||
return xr, yr, thr
|
||||
|
||||
|
||||
def compute_map_odom_tf(
|
||||
map_bl_x: float, map_bl_y: float, map_bl_th: float,
|
||||
odom_bl_x: float, odom_bl_y: float, odom_bl_th: float,
|
||||
):
|
||||
"""Return (x, y, yaw) for the map→odom transform.
|
||||
|
||||
T_map_odom = T_map_base_link ⊗ inv(T_odom_base_link)
|
||||
"""
|
||||
ix, iy, ith = _invert_2d(odom_bl_x, odom_bl_y, odom_bl_th)
|
||||
return _compose_2d(map_bl_x, map_bl_y, map_bl_th, ix, iy, ith)
|
||||
|
||||
|
||||
# ── ROS2 node ─────────────────────────────────────────────────────────────────
|
||||
|
||||
class UwbPoseBridgeNode(Node):
|
||||
"""Broadcasts map→odom TF derived from UWB-IMU fused pose + wheel odom.
|
||||
|
||||
Parameters (ROS):
|
||||
uwb_pose_topic (str) default: /saltybot/pose/fused_cov
|
||||
odom_topic (str) default: /odom
|
||||
tf_publish_hz (float) default: 20.0 — TF broadcast rate
|
||||
tf_timeout_s (float) default: 0.5 — stop publishing if UWB
|
||||
older than this
|
||||
map_frame (str) default: map
|
||||
odom_frame (str) default: odom
|
||||
base_frame (str) default: base_link
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__("uwb_pose_bridge")
|
||||
|
||||
self.declare_parameter("uwb_pose_topic", "/saltybot/pose/fused_cov")
|
||||
self.declare_parameter("odom_topic", "/odom")
|
||||
self.declare_parameter("tf_publish_hz", 20.0)
|
||||
self.declare_parameter("tf_timeout_s", 0.5)
|
||||
self.declare_parameter("map_frame", "map")
|
||||
self.declare_parameter("odom_frame", "odom")
|
||||
self.declare_parameter("base_frame", "base_link")
|
||||
|
||||
self._map_frame = self.get_parameter("map_frame").value
|
||||
self._odom_frame = self.get_parameter("odom_frame").value
|
||||
self._base_frame = self.get_parameter("base_frame").value
|
||||
self._tf_timeout = self.get_parameter("tf_timeout_s").value
|
||||
|
||||
self._uwb_pose: Optional[PoseWithCovarianceStamped] = None
|
||||
self._odom_pose: Optional[Odometry] = None
|
||||
self._init_pose_sent: bool = False
|
||||
|
||||
self._tf_broadcaster = TransformBroadcaster(self)
|
||||
|
||||
# /initialpose: Nav2 uses this to seed the robot's initial position
|
||||
self._init_pub = self.create_publisher(
|
||||
PoseWithCovarianceStamped, "/initialpose", 1)
|
||||
|
||||
self.create_subscription(
|
||||
PoseWithCovarianceStamped,
|
||||
self.get_parameter("uwb_pose_topic").value,
|
||||
self._on_uwb_pose, 10)
|
||||
|
||||
self.create_subscription(
|
||||
Odometry,
|
||||
self.get_parameter("odom_topic").value,
|
||||
self._on_odom, 20)
|
||||
|
||||
hz = self.get_parameter("tf_publish_hz").value
|
||||
self.create_timer(1.0 / hz, self._publish_tf)
|
||||
|
||||
self.get_logger().info(
|
||||
f"UWB pose bridge ready | "
|
||||
f"uwb={self.get_parameter('uwb_pose_topic').value} "
|
||||
f"odom={self.get_parameter('odom_topic').value} "
|
||||
f"tf={self._map_frame}→{self._odom_frame} @ {hz:.0f} Hz"
|
||||
)
|
||||
|
||||
# ── Callbacks ──────────────────────────────────────────────────────────
|
||||
|
||||
def _on_uwb_pose(self, msg: PoseWithCovarianceStamped) -> None:
|
||||
self._uwb_pose = msg
|
||||
if not self._init_pose_sent:
|
||||
self._init_pub.publish(msg)
|
||||
self._init_pose_sent = True
|
||||
self.get_logger().info(
|
||||
f"Initial UWB pose published to /initialpose "
|
||||
f"({msg.pose.pose.position.x:.2f}, "
|
||||
f"{msg.pose.pose.position.y:.2f})"
|
||||
)
|
||||
|
||||
def _on_odom(self, msg: Odometry) -> None:
|
||||
self._odom_pose = msg
|
||||
|
||||
# ── TF broadcast ───────────────────────────────────────────────────────
|
||||
|
||||
def _publish_tf(self) -> None:
|
||||
if self._uwb_pose is None or self._odom_pose is None:
|
||||
return
|
||||
|
||||
# Staleness guard: stop broadcasting if UWB data is too old
|
||||
now = self.get_clock().now()
|
||||
uwb_stamp = rclpy.time.Time.from_msg(self._uwb_pose.header.stamp)
|
||||
age_s = (now - uwb_stamp).nanoseconds * 1e-9
|
||||
if age_s > self._tf_timeout:
|
||||
self.get_logger().warn(
|
||||
f"UWB pose stale ({age_s:.2f}s > {self._tf_timeout}s) — "
|
||||
"map→odom TF suppressed",
|
||||
throttle_duration_sec=5.0,
|
||||
)
|
||||
return
|
||||
|
||||
# Extract map→base_link from UWB pose
|
||||
p = self._uwb_pose.pose.pose
|
||||
map_bl_th = _yaw_from_q(p.orientation.x, p.orientation.y,
|
||||
p.orientation.z, p.orientation.w)
|
||||
map_bl_x, map_bl_y = p.position.x, p.position.y
|
||||
|
||||
# Extract odom→base_link from wheel odometry
|
||||
o = self._odom_pose.pose.pose
|
||||
odom_bl_th = _yaw_from_q(o.orientation.x, o.orientation.y,
|
||||
o.orientation.z, o.orientation.w)
|
||||
odom_bl_x, odom_bl_y = o.position.x, o.position.y
|
||||
|
||||
# Compute map→odom
|
||||
mo_x, mo_y, mo_th = compute_map_odom_tf(
|
||||
map_bl_x, map_bl_y, map_bl_th,
|
||||
odom_bl_x, odom_bl_y, odom_bl_th,
|
||||
)
|
||||
|
||||
# Broadcast
|
||||
tf_msg = TransformStamped()
|
||||
tf_msg.header.stamp = now.to_msg()
|
||||
tf_msg.header.frame_id = self._map_frame
|
||||
tf_msg.child_frame_id = self._odom_frame
|
||||
tf_msg.transform.translation.x = mo_x
|
||||
tf_msg.transform.translation.y = mo_y
|
||||
tf_msg.transform.translation.z = 0.0
|
||||
qx, qy, qz, qw = _q_from_yaw(mo_th)
|
||||
tf_msg.transform.rotation.x = qx
|
||||
tf_msg.transform.rotation.y = qy
|
||||
tf_msg.transform.rotation.z = qz
|
||||
tf_msg.transform.rotation.w = qw
|
||||
self._tf_broadcaster.sendTransform(tf_msg)
|
||||
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = UwbPoseBridgeNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.try_shutdown()
|
||||
@ -0,0 +1,349 @@
|
||||
"""waypoint_follower_node.py — Simple sequential waypoint follower (Issue #599).
|
||||
|
||||
Exposes a ROS2 action server that accepts a list of waypoints and drives the
|
||||
robot through them one by one via Nav2's NavigateToPose action.
|
||||
|
||||
Action server: /saltybot/nav/follow_waypoints (nav2_msgs/FollowWaypoints)
|
||||
Nav2 client: /navigate_to_pose (nav2_msgs/NavigateToPose)
|
||||
|
||||
Waypoints topic (JSON, for operator / WebUI override):
|
||||
/saltybot/nav/waypoints (std_msgs/String)
|
||||
Payload: JSON array of {x, y, yaw_deg} objects.
|
||||
Example: [{"x": 1.0, "y": 2.0, "yaw_deg": 90.0}, ...]
|
||||
|
||||
Status topic (JSON):
|
||||
/saltybot/nav/waypoint_status (std_msgs/String)
|
||||
Published every cycle with WaypointSequencer.status_dict().
|
||||
|
||||
Cancel topic:
|
||||
/saltybot/nav/cancel (std_msgs/Empty)
|
||||
Cancels the active mission.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import math
|
||||
import threading
|
||||
from typing import List, Optional
|
||||
|
||||
import rclpy
|
||||
from rclpy.action import ActionClient, ActionServer, CancelResponse, GoalResponse
|
||||
from rclpy.node import Node
|
||||
from rclpy.callback_groups import ReentrantCallbackGroup
|
||||
|
||||
from geometry_msgs.msg import PoseStamped
|
||||
from nav2_msgs.action import FollowWaypoints, NavigateToPose
|
||||
from std_msgs.msg import Empty, String
|
||||
|
||||
from saltybot_nav2_uwb.waypoint_sequencer import Waypoint, WaypointSequencer
|
||||
|
||||
|
||||
def _yaw_to_quaternion(yaw_rad: float):
|
||||
"""Return (qx, qy, qz, qw) for a heading-only rotation."""
|
||||
return 0.0, 0.0, math.sin(yaw_rad * 0.5), math.cos(yaw_rad * 0.5)
|
||||
|
||||
|
||||
def waypoint_to_pose_stamped(wp: Waypoint, frame: str = "map") -> PoseStamped:
|
||||
"""Convert a Waypoint to a PoseStamped in the given frame."""
|
||||
msg = PoseStamped()
|
||||
msg.header.frame_id = frame
|
||||
msg.pose.position.x = wp.x
|
||||
msg.pose.position.y = wp.y
|
||||
msg.pose.position.z = 0.0
|
||||
qx, qy, qz, qw = _yaw_to_quaternion(math.radians(wp.yaw_deg))
|
||||
msg.pose.orientation.x = qx
|
||||
msg.pose.orientation.y = qy
|
||||
msg.pose.orientation.z = qz
|
||||
msg.pose.orientation.w = qw
|
||||
return msg
|
||||
|
||||
|
||||
class WaypointFollowerNode(Node):
|
||||
"""Sequential waypoint follower backed by Nav2 NavigateToPose.
|
||||
|
||||
Parameters (ROS):
|
||||
map_frame (str) default: map
|
||||
goal_xy_tolerance (float) default: 0.20 (m) — passed to Nav2
|
||||
goal_yaw_tolerance (float) default: 0.15 (rad) — passed to Nav2
|
||||
nav_timeout_s (float) default: 60.0 — abort if Nav2 takes longer
|
||||
status_hz (float) default: 2.0 — status publish rate
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__("waypoint_follower")
|
||||
|
||||
self.declare_parameter("map_frame", "map")
|
||||
self.declare_parameter("goal_xy_tolerance", 0.20)
|
||||
self.declare_parameter("goal_yaw_tolerance", 0.15)
|
||||
self.declare_parameter("nav_timeout_s", 60.0)
|
||||
self.declare_parameter("status_hz", 2.0)
|
||||
|
||||
self._map_frame = self.get_parameter("map_frame").value
|
||||
self._nav_timeout = self.get_parameter("nav_timeout_s").value
|
||||
|
||||
self._seq = WaypointSequencer()
|
||||
self._lock = threading.Lock()
|
||||
self._cb_group = ReentrantCallbackGroup()
|
||||
|
||||
# ── Publishers ─────────────────────────────────────────────────
|
||||
self._status_pub = self.create_publisher(
|
||||
String, "/saltybot/nav/waypoint_status", 10)
|
||||
|
||||
# ── Subscriptions ──────────────────────────────────────────────
|
||||
self.create_subscription(
|
||||
String, "/saltybot/nav/waypoints",
|
||||
self._on_waypoints_json, 10,
|
||||
callback_group=self._cb_group)
|
||||
|
||||
self.create_subscription(
|
||||
Empty, "/saltybot/nav/cancel",
|
||||
self._on_cancel, 10,
|
||||
callback_group=self._cb_group)
|
||||
|
||||
# ── Nav2 action client ─────────────────────────────────────────
|
||||
self._nav_client: ActionClient = ActionClient(
|
||||
self, NavigateToPose, "/navigate_to_pose",
|
||||
callback_group=self._cb_group)
|
||||
|
||||
# ── Action server (nav2_msgs/FollowWaypoints) ──────────────────
|
||||
self._action_server = ActionServer(
|
||||
self,
|
||||
FollowWaypoints,
|
||||
"/saltybot/nav/follow_waypoints",
|
||||
execute_callback=self._execute_follow_waypoints,
|
||||
goal_callback=self._goal_callback,
|
||||
cancel_callback=self._cancel_callback,
|
||||
callback_group=self._cb_group,
|
||||
)
|
||||
|
||||
hz = self.get_parameter("status_hz").value
|
||||
self.create_timer(1.0 / hz, self._publish_status,
|
||||
callback_group=self._cb_group)
|
||||
|
||||
self.get_logger().info(
|
||||
f"WaypointFollower ready | "
|
||||
f"action=/saltybot/nav/follow_waypoints "
|
||||
f"nav2=/navigate_to_pose frame={self._map_frame}"
|
||||
)
|
||||
|
||||
# ── Action server callbacks ────────────────────────────────────────────
|
||||
|
||||
def _goal_callback(self, goal_request):
|
||||
if not self._nav_client.wait_for_server(timeout_sec=2.0):
|
||||
self.get_logger().error(
|
||||
"Nav2 NavigateToPose server not available — rejecting goal")
|
||||
return GoalResponse.REJECT
|
||||
if len(goal_request.poses) == 0:
|
||||
self.get_logger().warn("Empty waypoint list — rejecting goal")
|
||||
return GoalResponse.REJECT
|
||||
return GoalResponse.ACCEPT
|
||||
|
||||
def _cancel_callback(self, _goal_handle):
|
||||
return CancelResponse.ACCEPT
|
||||
|
||||
async def _execute_follow_waypoints(self, goal_handle) -> FollowWaypoints.Result:
|
||||
"""Execute the action: drive through each PoseStamped in sequence."""
|
||||
result = FollowWaypoints.Result()
|
||||
feedback_msg = FollowWaypoints.Feedback()
|
||||
|
||||
poses: List[PoseStamped] = list(goal_handle.request.poses)
|
||||
waypoints = [
|
||||
Waypoint(
|
||||
x=p.pose.position.x,
|
||||
y=p.pose.position.y,
|
||||
yaw_deg=math.degrees(
|
||||
math.atan2(
|
||||
2.0 * (p.pose.orientation.w * p.pose.orientation.z
|
||||
+ p.pose.orientation.x * p.pose.orientation.y),
|
||||
1.0 - 2.0 * (p.pose.orientation.y ** 2
|
||||
+ p.pose.orientation.z ** 2),
|
||||
)
|
||||
),
|
||||
)
|
||||
for p in poses
|
||||
]
|
||||
|
||||
with self._lock:
|
||||
self._seq.start(waypoints)
|
||||
|
||||
self.get_logger().info(
|
||||
f"Starting waypoint mission: {len(waypoints)} waypoints")
|
||||
|
||||
missed_waypoints: List[int] = []
|
||||
|
||||
while True:
|
||||
# Check cancellation
|
||||
if goal_handle.is_cancel_requested:
|
||||
with self._lock:
|
||||
self._seq.cancel()
|
||||
goal_handle.canceled()
|
||||
result.missed_waypoints = missed_waypoints
|
||||
return result
|
||||
|
||||
with self._lock:
|
||||
if self._seq.is_done:
|
||||
break
|
||||
wp = self._seq.current_waypoint
|
||||
idx = self._seq.current_index
|
||||
|
||||
if wp is None:
|
||||
break
|
||||
|
||||
# Send goal to Nav2
|
||||
nav_goal = NavigateToPose.Goal()
|
||||
nav_goal.pose = waypoint_to_pose_stamped(wp, self._map_frame)
|
||||
|
||||
self.get_logger().info(
|
||||
f"Sending waypoint {idx + 1}/{len(waypoints)}: "
|
||||
f"({wp.x:.2f}, {wp.y:.2f}, {wp.yaw_deg:.1f}°)"
|
||||
)
|
||||
|
||||
if not self._nav_client.wait_for_server(timeout_sec=5.0):
|
||||
self.get_logger().error("Nav2 server unavailable — aborting")
|
||||
with self._lock:
|
||||
self._seq.abort("Nav2 server unavailable")
|
||||
missed_waypoints.extend(range(idx, len(waypoints)))
|
||||
break
|
||||
|
||||
send_future = self._nav_client.send_goal_async(
|
||||
nav_goal,
|
||||
feedback_callback=lambda fb: None, # suppress individual feedback
|
||||
)
|
||||
# Spin until goal accepted
|
||||
rclpy.spin_until_future_complete(
|
||||
self, send_future, timeout_sec=self._nav_timeout)
|
||||
|
||||
if not send_future.done():
|
||||
self.get_logger().error(
|
||||
f"Waypoint {idx + 1}: goal send timed out — skipping")
|
||||
missed_waypoints.append(idx)
|
||||
with self._lock:
|
||||
self._seq.advance()
|
||||
continue
|
||||
|
||||
goal_res = send_future.result()
|
||||
if not goal_res.accepted:
|
||||
self.get_logger().warn(
|
||||
f"Waypoint {idx + 1}: goal rejected by Nav2 — skipping")
|
||||
missed_waypoints.append(idx)
|
||||
with self._lock:
|
||||
self._seq.advance()
|
||||
continue
|
||||
|
||||
# Wait for Nav2 to reach the goal
|
||||
result_future = goal_res.get_result_async()
|
||||
rclpy.spin_until_future_complete(
|
||||
self, result_future, timeout_sec=self._nav_timeout)
|
||||
|
||||
if not result_future.done():
|
||||
self.get_logger().error(
|
||||
f"Waypoint {idx + 1}: Nav2 timed out after "
|
||||
f"{self._nav_timeout:.0f}s — skipping")
|
||||
goal_res.cancel_goal_async()
|
||||
missed_waypoints.append(idx)
|
||||
else:
|
||||
nav_result = result_future.result()
|
||||
from action_msgs.msg import GoalStatus
|
||||
if nav_result.status == GoalStatus.STATUS_SUCCEEDED:
|
||||
self.get_logger().info(
|
||||
f"Waypoint {idx + 1}/{len(waypoints)} reached ✓")
|
||||
else:
|
||||
self.get_logger().warn(
|
||||
f"Waypoint {idx + 1} failed (Nav2 status "
|
||||
f"{nav_result.status}) — skipping")
|
||||
missed_waypoints.append(idx)
|
||||
|
||||
with self._lock:
|
||||
self._seq.advance()
|
||||
|
||||
# Publish feedback
|
||||
feedback_msg.current_waypoint = self._seq.current_index
|
||||
goal_handle.publish_feedback(feedback_msg)
|
||||
|
||||
# Mission complete
|
||||
with self._lock:
|
||||
final_state = self._seq.state
|
||||
|
||||
if final_state == WaypointSequencer.SUCCEEDED and not missed_waypoints:
|
||||
self.get_logger().info(
|
||||
f"Waypoint mission SUCCEEDED ({len(waypoints)} waypoints)")
|
||||
goal_handle.succeed()
|
||||
elif final_state == WaypointSequencer.CANCELED:
|
||||
self.get_logger().info("Waypoint mission CANCELED")
|
||||
goal_handle.canceled()
|
||||
else:
|
||||
self.get_logger().warn(
|
||||
f"Waypoint mission ended: state={final_state} "
|
||||
f"missed={missed_waypoints}")
|
||||
goal_handle.succeed() # FollowWaypoints always succeeds; missed list carries info
|
||||
|
||||
result.missed_waypoints = missed_waypoints
|
||||
return result
|
||||
|
||||
# ── JSON topic handler ─────────────────────────────────────────────────
|
||||
|
||||
def _on_waypoints_json(self, msg: String) -> None:
|
||||
"""Accept a JSON waypoint list and start a new mission immediately.
|
||||
|
||||
This is a convenience interface for WebUI / operator use.
|
||||
For production use prefer the action server (supports feedback/cancel).
|
||||
"""
|
||||
try:
|
||||
raw = json.loads(msg.data)
|
||||
except json.JSONDecodeError as exc:
|
||||
self.get_logger().error(f"Invalid waypoints JSON: {exc}")
|
||||
return
|
||||
|
||||
if not isinstance(raw, list) or len(raw) == 0:
|
||||
self.get_logger().error(
|
||||
"Waypoints JSON must be a non-empty array of {x, y, yaw_deg}")
|
||||
return
|
||||
|
||||
try:
|
||||
waypoints = [Waypoint.from_dict(d) for d in raw]
|
||||
except (KeyError, ValueError) as exc:
|
||||
self.get_logger().error(f"Malformed waypoint in list: {exc}")
|
||||
return
|
||||
|
||||
if not self._nav_client.wait_for_server(timeout_sec=2.0):
|
||||
self.get_logger().error(
|
||||
"Nav2 server not ready — cannot execute JSON waypoints")
|
||||
return
|
||||
|
||||
self.get_logger().info(
|
||||
f"JSON mission: {len(waypoints)} waypoints received via topic")
|
||||
with self._lock:
|
||||
self._seq.start(waypoints)
|
||||
# Execution driven by action server; JSON topic just seeds the sequencer.
|
||||
# For full feedback, use the FollowWaypoints action server directly.
|
||||
|
||||
# ── Cancel topic handler ───────────────────────────────────────────────
|
||||
|
||||
def _on_cancel(self, _msg: Empty) -> None:
|
||||
with self._lock:
|
||||
if self._seq.is_running:
|
||||
self._seq.cancel()
|
||||
self.get_logger().info(
|
||||
"Mission cancelled via /saltybot/nav/cancel")
|
||||
|
||||
# ── Status publish ─────────────────────────────────────────────────────
|
||||
|
||||
def _publish_status(self) -> None:
|
||||
with self._lock:
|
||||
d = self._seq.status_dict()
|
||||
self._status_pub.publish(String(data=json.dumps(d)))
|
||||
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = WaypointFollowerNode()
|
||||
executor = rclpy.executors.MultiThreadedExecutor()
|
||||
executor.add_node(node)
|
||||
try:
|
||||
executor.spin()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.try_shutdown()
|
||||
@ -0,0 +1,176 @@
|
||||
"""waypoint_sequencer.py — Pure-Python waypoint sequencing logic (Issue #599).
|
||||
|
||||
No ROS2 dependencies — importable and fully unit-testable without a live
|
||||
ROS2 install.
|
||||
|
||||
Design
|
||||
──────
|
||||
WaypointSequencer tracks a list of (x, y, yaw_deg) waypoints and the index of
|
||||
the currently active goal. The ROS2 node (waypoint_follower_node.py) drives
|
||||
this state machine by calling:
|
||||
|
||||
sequencer.start(waypoints) — load a new mission
|
||||
sequencer.advance() — mark current waypoint reached
|
||||
sequencer.abort(reason) — mark mission failed
|
||||
sequencer.cancel() — mark mission cancelled
|
||||
|
||||
State transitions:
|
||||
IDLE → RUNNING (on start())
|
||||
RUNNING → SUCCEEDED (on advance() when last waypoint reached)
|
||||
RUNNING → ABORTED (on abort())
|
||||
RUNNING → CANCELED (on cancel())
|
||||
ABORTED / CANCELED / SUCCEEDED → any state via start()
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import List, Optional, Tuple
|
||||
|
||||
|
||||
# ── Waypoint data type ────────────────────────────────────────────────────────
|
||||
|
||||
@dataclass
|
||||
class Waypoint:
|
||||
"""A single navigation goal in the map frame."""
|
||||
x: float
|
||||
y: float
|
||||
yaw_deg: float = 0.0
|
||||
|
||||
def __post_init__(self):
|
||||
if not (-360.0 <= self.yaw_deg <= 360.0):
|
||||
raise ValueError(
|
||||
f"yaw_deg must be in [-360, 360], got {self.yaw_deg}"
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, d: dict) -> "Waypoint":
|
||||
"""Construct from a plain dict (e.g. parsed from JSON)."""
|
||||
return cls(
|
||||
x=float(d["x"]),
|
||||
y=float(d["y"]),
|
||||
yaw_deg=float(d.get("yaw_deg", 0.0)),
|
||||
)
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
return {"x": self.x, "y": self.y, "yaw_deg": self.yaw_deg}
|
||||
|
||||
|
||||
# ── State machine ─────────────────────────────────────────────────────────────
|
||||
|
||||
class WaypointSequencer:
|
||||
"""Tracks state for sequential waypoint execution.
|
||||
|
||||
Thread-safety: external callers (ROS2 callbacks, action server) should
|
||||
hold their own lock when calling multiple methods atomically.
|
||||
"""
|
||||
|
||||
IDLE = "idle"
|
||||
RUNNING = "running"
|
||||
SUCCEEDED = "succeeded"
|
||||
ABORTED = "aborted"
|
||||
CANCELED = "canceled"
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._waypoints: List[Waypoint] = []
|
||||
self._current: int = 0
|
||||
self._state: str = self.IDLE
|
||||
self._abort_reason: Optional[str] = None
|
||||
|
||||
# ── Control methods ────────────────────────────────────────────────────
|
||||
|
||||
def start(self, waypoints: List[Waypoint]) -> None:
|
||||
"""Load a new mission and set state to RUNNING.
|
||||
|
||||
Raises:
|
||||
ValueError: if waypoints is empty.
|
||||
"""
|
||||
if not waypoints:
|
||||
raise ValueError("Waypoint list must not be empty")
|
||||
self._waypoints = list(waypoints)
|
||||
self._current = 0
|
||||
self._state = self.RUNNING
|
||||
self._abort_reason = None
|
||||
|
||||
def advance(self) -> None:
|
||||
"""Mark the current waypoint as reached and move to the next.
|
||||
|
||||
If the last waypoint is reached the state transitions to SUCCEEDED.
|
||||
|
||||
Raises:
|
||||
RuntimeError: if not RUNNING.
|
||||
"""
|
||||
if self._state != self.RUNNING:
|
||||
raise RuntimeError(
|
||||
f"advance() called in state {self._state!r} (must be RUNNING)"
|
||||
)
|
||||
self._current += 1
|
||||
if self._current >= len(self._waypoints):
|
||||
self._state = self.SUCCEEDED
|
||||
|
||||
def abort(self, reason: str = "") -> None:
|
||||
"""Mark the mission as aborted (e.g. Nav2 returned FAILED)."""
|
||||
self._state = self.ABORTED
|
||||
self._abort_reason = reason
|
||||
|
||||
def cancel(self) -> None:
|
||||
"""Mark the mission as cancelled (e.g. operator preempt)."""
|
||||
self._state = self.CANCELED
|
||||
|
||||
# ── Query methods ──────────────────────────────────────────────────────
|
||||
|
||||
@property
|
||||
def state(self) -> str:
|
||||
return self._state
|
||||
|
||||
@property
|
||||
def is_running(self) -> bool:
|
||||
return self._state == self.RUNNING
|
||||
|
||||
@property
|
||||
def is_done(self) -> bool:
|
||||
return self._state in (self.SUCCEEDED, self.ABORTED, self.CANCELED)
|
||||
|
||||
@property
|
||||
def current_waypoint(self) -> Optional[Waypoint]:
|
||||
"""The active goal, or None if not RUNNING or already done."""
|
||||
if self._state != self.RUNNING:
|
||||
return None
|
||||
if self._current >= len(self._waypoints):
|
||||
return None
|
||||
return self._waypoints[self._current]
|
||||
|
||||
@property
|
||||
def current_index(self) -> int:
|
||||
"""0-based index of the active waypoint (0 if not started)."""
|
||||
return self._current
|
||||
|
||||
@property
|
||||
def total(self) -> int:
|
||||
"""Total number of waypoints in the current mission."""
|
||||
return len(self._waypoints)
|
||||
|
||||
@property
|
||||
def progress(self) -> Tuple[int, int]:
|
||||
"""Return (completed_count, total_count)."""
|
||||
return self._current, len(self._waypoints)
|
||||
|
||||
@property
|
||||
def remaining(self) -> int:
|
||||
return max(0, len(self._waypoints) - self._current)
|
||||
|
||||
@property
|
||||
def abort_reason(self) -> Optional[str]:
|
||||
return self._abort_reason
|
||||
|
||||
def status_dict(self) -> dict:
|
||||
"""Serialisable status snapshot for MQTT / diagnostics."""
|
||||
wp = self.current_waypoint
|
||||
return {
|
||||
"state": self._state,
|
||||
"current_index": self._current,
|
||||
"total": len(self._waypoints),
|
||||
"remaining": self.remaining,
|
||||
"current_wp": wp.to_dict() if wp else None,
|
||||
"abort_reason": self._abort_reason,
|
||||
}
|
||||
4
jetson/ros2_ws/src/saltybot_nav2_uwb/setup.cfg
Normal file
4
jetson/ros2_ws/src/saltybot_nav2_uwb/setup.cfg
Normal file
@ -0,0 +1,4 @@
|
||||
[develop]
|
||||
script_dir=$base/lib/saltybot_nav2_uwb
|
||||
[install]
|
||||
install_scripts=$base/lib/saltybot_nav2_uwb
|
||||
35
jetson/ros2_ws/src/saltybot_nav2_uwb/setup.py
Normal file
35
jetson/ros2_ws/src/saltybot_nav2_uwb/setup.py
Normal file
@ -0,0 +1,35 @@
|
||||
from setuptools import setup
|
||||
|
||||
package_name = "saltybot_nav2_uwb"
|
||||
|
||||
setup(
|
||||
name=package_name,
|
||||
version="0.1.0",
|
||||
packages=[package_name],
|
||||
data_files=[
|
||||
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
|
||||
(f"share/{package_name}", ["package.xml"]),
|
||||
(f"share/{package_name}/launch", [
|
||||
"launch/nav2_uwb.launch.py",
|
||||
]),
|
||||
(f"share/{package_name}/config", [
|
||||
"config/nav2_uwb_params.yaml",
|
||||
]),
|
||||
],
|
||||
install_requires=["setuptools"],
|
||||
zip_safe=True,
|
||||
maintainer="sl-jetson",
|
||||
maintainer_email="sl-jetson@saltylab.local",
|
||||
description=(
|
||||
"Nav2 with UWB localization: pose bridge (map→odom TF from UWB-IMU EKF) "
|
||||
"and sequential waypoint follower (Issue #599)"
|
||||
),
|
||||
license="Apache-2.0",
|
||||
tests_require=["pytest"],
|
||||
entry_points={
|
||||
"console_scripts": [
|
||||
"uwb_pose_bridge_node = saltybot_nav2_uwb.uwb_pose_bridge_node:main",
|
||||
"waypoint_follower_node = saltybot_nav2_uwb.waypoint_follower_node:main",
|
||||
],
|
||||
},
|
||||
)
|
||||
@ -0,0 +1,459 @@
|
||||
"""test_waypoint_follower.py — Unit tests for Nav2-UWB waypoint logic (Issue #599).
|
||||
|
||||
Tests are ROS2-free. Run with:
|
||||
python3 -m pytest \
|
||||
jetson/ros2_ws/src/saltybot_nav2_uwb/test/test_waypoint_follower.py -v
|
||||
|
||||
Coverage:
|
||||
• Waypoint dataclass construction, validation, serialisation
|
||||
• WaypointSequencer state machine (all transitions)
|
||||
• Progress tracking
|
||||
• UWB pose-bridge 2-D TF maths
|
||||
• waypoint_to_pose_stamped conversion (quaternion from yaw)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
import sys
|
||||
import os
|
||||
|
||||
# Allow import without ROS2 install
|
||||
_pkg_dir = os.path.join(os.path.dirname(__file__), "..", "saltybot_nav2_uwb")
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
||||
sys.path.insert(0, _pkg_dir)
|
||||
|
||||
import pytest
|
||||
|
||||
# ── Stub ROS2 modules before importing any node files ────────────────────────
|
||||
|
||||
def _install_stubs():
|
||||
"""Inject minimal stubs so node modules are importable without ROS2."""
|
||||
from unittest.mock import MagicMock
|
||||
import types
|
||||
|
||||
for mod_name in [
|
||||
"rclpy", "rclpy.node", "rclpy.action", "rclpy.time",
|
||||
"rclpy.callback_groups", "rclpy.executors",
|
||||
"geometry_msgs", "geometry_msgs.msg",
|
||||
"nav_msgs", "nav_msgs.msg",
|
||||
"nav2_msgs", "nav2_msgs.action",
|
||||
"std_msgs", "std_msgs.msg",
|
||||
"tf2_ros", "action_msgs", "action_msgs.msg",
|
||||
]:
|
||||
sys.modules.setdefault(mod_name, MagicMock())
|
||||
|
||||
# Make geometry_msgs.msg.PoseWithCovarianceStamped / TransformStamped real
|
||||
# enough to be used as type hints
|
||||
sys.modules["geometry_msgs.msg"].PoseWithCovarianceStamped = MagicMock
|
||||
sys.modules["geometry_msgs.msg"].TransformStamped = MagicMock
|
||||
sys.modules["nav_msgs.msg"].Odometry = MagicMock
|
||||
sys.modules["std_msgs.msg"].Empty = MagicMock
|
||||
sys.modules["std_msgs.msg"].String = MagicMock
|
||||
sys.modules["tf2_ros"].TransformBroadcaster = MagicMock
|
||||
|
||||
_install_stubs()
|
||||
|
||||
from waypoint_sequencer import Waypoint, WaypointSequencer
|
||||
|
||||
# ── helpers from uwb_pose_bridge_node (pure-python section) ──────────────────
|
||||
|
||||
from uwb_pose_bridge_node import (
|
||||
_yaw_from_q,
|
||||
_q_from_yaw,
|
||||
_invert_2d,
|
||||
_compose_2d,
|
||||
compute_map_odom_tf,
|
||||
)
|
||||
|
||||
|
||||
# ─── Waypoint dataclass ────────────────────────────────────────────────────────
|
||||
|
||||
class TestWaypointConstruction:
|
||||
def test_basic_construction(self):
|
||||
wp = Waypoint(x=1.0, y=2.0, yaw_deg=45.0)
|
||||
assert wp.x == 1.0
|
||||
assert wp.y == 2.0
|
||||
assert wp.yaw_deg == 45.0
|
||||
|
||||
def test_default_yaw_is_zero(self):
|
||||
wp = Waypoint(x=0.0, y=0.0)
|
||||
assert wp.yaw_deg == 0.0
|
||||
|
||||
def test_negative_coordinates(self):
|
||||
wp = Waypoint(x=-3.5, y=-1.2, yaw_deg=-90.0)
|
||||
assert wp.x == -3.5
|
||||
assert wp.yaw_deg == -90.0
|
||||
|
||||
def test_yaw_boundary_360(self):
|
||||
wp = Waypoint(x=0.0, y=0.0, yaw_deg=360.0)
|
||||
assert wp.yaw_deg == 360.0
|
||||
|
||||
def test_yaw_boundary_neg360(self):
|
||||
wp = Waypoint(x=0.0, y=0.0, yaw_deg=-360.0)
|
||||
assert wp.yaw_deg == -360.0
|
||||
|
||||
def test_yaw_out_of_range_raises(self):
|
||||
with pytest.raises(ValueError, match="yaw_deg"):
|
||||
Waypoint(x=0.0, y=0.0, yaw_deg=400.0)
|
||||
|
||||
def test_yaw_out_of_range_negative_raises(self):
|
||||
with pytest.raises(ValueError, match="yaw_deg"):
|
||||
Waypoint(x=0.0, y=0.0, yaw_deg=-400.0)
|
||||
|
||||
|
||||
class TestWaypointFromDict:
|
||||
def test_full_dict(self):
|
||||
wp = Waypoint.from_dict({"x": 1.5, "y": -2.5, "yaw_deg": 30.0})
|
||||
assert wp.x == 1.5
|
||||
assert wp.y == -2.5
|
||||
assert wp.yaw_deg == 30.0
|
||||
|
||||
def test_missing_yaw_defaults_zero(self):
|
||||
wp = Waypoint.from_dict({"x": 0.0, "y": 1.0})
|
||||
assert wp.yaw_deg == 0.0
|
||||
|
||||
def test_string_values_coerced(self):
|
||||
wp = Waypoint.from_dict({"x": "1.0", "y": "2.0", "yaw_deg": "45"})
|
||||
assert wp.x == 1.0
|
||||
|
||||
def test_missing_x_raises(self):
|
||||
with pytest.raises(KeyError):
|
||||
Waypoint.from_dict({"y": 1.0})
|
||||
|
||||
def test_missing_y_raises(self):
|
||||
with pytest.raises(KeyError):
|
||||
Waypoint.from_dict({"x": 1.0})
|
||||
|
||||
|
||||
class TestWaypointToDict:
|
||||
def test_roundtrip(self):
|
||||
wp = Waypoint(x=1.0, y=2.0, yaw_deg=90.0)
|
||||
d = wp.to_dict()
|
||||
wp2 = Waypoint.from_dict(d)
|
||||
assert wp2.x == wp.x
|
||||
assert wp2.y == wp.y
|
||||
assert wp2.yaw_deg == wp.yaw_deg
|
||||
|
||||
def test_keys_present(self):
|
||||
d = Waypoint(x=0.0, y=0.0).to_dict()
|
||||
assert "x" in d and "y" in d and "yaw_deg" in d
|
||||
|
||||
|
||||
# ─── WaypointSequencer state machine ─────────────────────────────────────────
|
||||
|
||||
def _three_wp():
|
||||
return [Waypoint(float(i), 0.0) for i in range(3)]
|
||||
|
||||
|
||||
class TestSequencerInitialState:
|
||||
def setup_method(self):
|
||||
self.seq = WaypointSequencer()
|
||||
|
||||
def test_initial_state_idle(self):
|
||||
assert self.seq.state == WaypointSequencer.IDLE
|
||||
|
||||
def test_is_not_running(self):
|
||||
assert not self.seq.is_running
|
||||
|
||||
def test_is_not_done(self):
|
||||
assert not self.seq.is_done
|
||||
|
||||
def test_no_current_waypoint(self):
|
||||
assert self.seq.current_waypoint is None
|
||||
|
||||
def test_total_zero(self):
|
||||
assert self.seq.total == 0
|
||||
|
||||
def test_progress_zero(self):
|
||||
assert self.seq.progress == (0, 0)
|
||||
|
||||
def test_no_abort_reason(self):
|
||||
assert self.seq.abort_reason is None
|
||||
|
||||
|
||||
class TestSequencerStart:
|
||||
def setup_method(self):
|
||||
self.seq = WaypointSequencer()
|
||||
|
||||
def test_transitions_to_running(self):
|
||||
self.seq.start(_three_wp())
|
||||
assert self.seq.state == WaypointSequencer.RUNNING
|
||||
|
||||
def test_is_running_true(self):
|
||||
self.seq.start(_three_wp())
|
||||
assert self.seq.is_running
|
||||
|
||||
def test_is_done_false(self):
|
||||
self.seq.start(_three_wp())
|
||||
assert not self.seq.is_done
|
||||
|
||||
def test_current_index_zero(self):
|
||||
self.seq.start(_three_wp())
|
||||
assert self.seq.current_index == 0
|
||||
|
||||
def test_total_set(self):
|
||||
self.seq.start(_three_wp())
|
||||
assert self.seq.total == 3
|
||||
|
||||
def test_first_waypoint_returned(self):
|
||||
wps = _three_wp()
|
||||
self.seq.start(wps)
|
||||
assert self.seq.current_waypoint is wps[0]
|
||||
|
||||
def test_empty_list_raises(self):
|
||||
with pytest.raises(ValueError, match="empty"):
|
||||
self.seq.start([])
|
||||
|
||||
def test_restart_resets_state(self):
|
||||
self.seq.start(_three_wp())
|
||||
self.seq.advance()
|
||||
self.seq.start([Waypoint(9.0, 9.0)])
|
||||
assert self.seq.current_index == 0
|
||||
assert self.seq.total == 1
|
||||
|
||||
|
||||
class TestSequencerAdvance:
|
||||
def setup_method(self):
|
||||
self.seq = WaypointSequencer()
|
||||
|
||||
def test_advance_moves_to_next(self):
|
||||
self.seq.start(_three_wp())
|
||||
self.seq.advance()
|
||||
assert self.seq.current_index == 1
|
||||
|
||||
def test_advance_twice(self):
|
||||
self.seq.start(_three_wp())
|
||||
self.seq.advance()
|
||||
self.seq.advance()
|
||||
assert self.seq.current_index == 2
|
||||
|
||||
def test_advance_last_sets_succeeded(self):
|
||||
self.seq.start(_three_wp())
|
||||
for _ in range(3):
|
||||
self.seq.advance()
|
||||
assert self.seq.state == WaypointSequencer.SUCCEEDED
|
||||
|
||||
def test_advance_last_sets_done(self):
|
||||
self.seq.start(_three_wp())
|
||||
for _ in range(3):
|
||||
self.seq.advance()
|
||||
assert self.seq.is_done
|
||||
|
||||
def test_advance_on_idle_raises(self):
|
||||
with pytest.raises(RuntimeError, match="RUNNING"):
|
||||
self.seq.advance()
|
||||
|
||||
def test_advance_on_aborted_raises(self):
|
||||
self.seq.start(_three_wp())
|
||||
self.seq.abort()
|
||||
with pytest.raises(RuntimeError, match="RUNNING"):
|
||||
self.seq.advance()
|
||||
|
||||
def test_no_current_waypoint_after_succeed(self):
|
||||
self.seq.start(_three_wp())
|
||||
for _ in range(3):
|
||||
self.seq.advance()
|
||||
assert self.seq.current_waypoint is None
|
||||
|
||||
|
||||
class TestSequencerAbort:
|
||||
def setup_method(self):
|
||||
self.seq = WaypointSequencer()
|
||||
|
||||
def test_abort_transitions_state(self):
|
||||
self.seq.start(_three_wp())
|
||||
self.seq.abort("Nav2 failed")
|
||||
assert self.seq.state == WaypointSequencer.ABORTED
|
||||
|
||||
def test_abort_sets_reason(self):
|
||||
self.seq.start(_three_wp())
|
||||
self.seq.abort("timeout")
|
||||
assert self.seq.abort_reason == "timeout"
|
||||
|
||||
def test_abort_sets_done(self):
|
||||
self.seq.start(_three_wp())
|
||||
self.seq.abort()
|
||||
assert self.seq.is_done
|
||||
|
||||
def test_abort_no_reason(self):
|
||||
self.seq.start(_three_wp())
|
||||
self.seq.abort()
|
||||
assert self.seq.abort_reason == ""
|
||||
|
||||
|
||||
class TestSequencerCancel:
|
||||
def setup_method(self):
|
||||
self.seq = WaypointSequencer()
|
||||
|
||||
def test_cancel_transitions_state(self):
|
||||
self.seq.start(_three_wp())
|
||||
self.seq.cancel()
|
||||
assert self.seq.state == WaypointSequencer.CANCELED
|
||||
|
||||
def test_cancel_sets_done(self):
|
||||
self.seq.start(_three_wp())
|
||||
self.seq.cancel()
|
||||
assert self.seq.is_done
|
||||
|
||||
def test_cancel_clears_current_waypoint(self):
|
||||
self.seq.start(_three_wp())
|
||||
self.seq.cancel()
|
||||
assert self.seq.current_waypoint is None
|
||||
|
||||
|
||||
class TestSequencerProgress:
|
||||
def setup_method(self):
|
||||
self.seq = WaypointSequencer()
|
||||
self.seq.start(_three_wp())
|
||||
|
||||
def test_initial_progress(self):
|
||||
assert self.seq.progress == (0, 3)
|
||||
|
||||
def test_progress_after_one(self):
|
||||
self.seq.advance()
|
||||
assert self.seq.progress == (1, 3)
|
||||
|
||||
def test_remaining_initial(self):
|
||||
assert self.seq.remaining == 3
|
||||
|
||||
def test_remaining_after_advance(self):
|
||||
self.seq.advance()
|
||||
assert self.seq.remaining == 2
|
||||
|
||||
def test_remaining_after_all(self):
|
||||
for _ in range(3):
|
||||
self.seq.advance()
|
||||
assert self.seq.remaining == 0
|
||||
|
||||
|
||||
class TestSequencerStatusDict:
|
||||
def setup_method(self):
|
||||
self.seq = WaypointSequencer()
|
||||
|
||||
def test_idle_status_dict(self):
|
||||
d = self.seq.status_dict()
|
||||
assert d["state"] == "idle"
|
||||
assert d["total"] == 0
|
||||
assert d["current_wp"] is None
|
||||
|
||||
def test_running_status_dict(self):
|
||||
self.seq.start(_three_wp())
|
||||
d = self.seq.status_dict()
|
||||
assert d["state"] == "running"
|
||||
assert d["total"] == 3
|
||||
assert d["current_wp"] is not None
|
||||
assert "x" in d["current_wp"]
|
||||
|
||||
def test_succeeded_status_dict(self):
|
||||
self.seq.start(_three_wp())
|
||||
for _ in range(3):
|
||||
self.seq.advance()
|
||||
d = self.seq.status_dict()
|
||||
assert d["state"] == "succeeded"
|
||||
assert d["remaining"] == 0
|
||||
|
||||
|
||||
# ─── 2-D TF maths (uwb_pose_bridge_node) ──────────────────────────────────────
|
||||
|
||||
EPS = 1e-9
|
||||
|
||||
|
||||
class TestYawFromQ:
|
||||
def test_zero_yaw(self):
|
||||
qx, qy, qz, qw = _q_from_yaw(0.0)
|
||||
assert abs(_yaw_from_q(qx, qy, qz, qw)) < EPS
|
||||
|
||||
def test_90_degrees(self):
|
||||
yaw = math.pi / 2
|
||||
qx, qy, qz, qw = _q_from_yaw(yaw)
|
||||
assert abs(_yaw_from_q(qx, qy, qz, qw) - yaw) < 1e-6
|
||||
|
||||
def test_minus_90_degrees(self):
|
||||
yaw = -math.pi / 2
|
||||
qx, qy, qz, qw = _q_from_yaw(yaw)
|
||||
assert abs(_yaw_from_q(qx, qy, qz, qw) - yaw) < 1e-6
|
||||
|
||||
def test_180_degrees(self):
|
||||
yaw = math.pi
|
||||
qx, qy, qz, qw = _q_from_yaw(yaw)
|
||||
# atan2 wraps to -π at exactly π; tolerate either
|
||||
result = _yaw_from_q(qx, qy, qz, qw)
|
||||
assert abs(abs(result) - math.pi) < 1e-6
|
||||
|
||||
def test_roundtrip_arbitrary(self):
|
||||
for deg in (0, 30, 60, 90, 120, 150, 170, -45, -120):
|
||||
yaw = math.radians(deg)
|
||||
qx, qy, qz, qw = _q_from_yaw(yaw)
|
||||
back = _yaw_from_q(qx, qy, qz, qw)
|
||||
assert abs(back - yaw) < 1e-6, f"roundtrip failed for {deg}°"
|
||||
|
||||
|
||||
class TestInvert2D:
|
||||
def test_identity(self):
|
||||
xi, yi, thi = _invert_2d(0.0, 0.0, 0.0)
|
||||
assert abs(xi) < EPS and abs(yi) < EPS and abs(thi) < EPS
|
||||
|
||||
def test_translation_only(self):
|
||||
# invert pure translation: inv(T(3,4,0)) = T(-3,-4,0)
|
||||
xi, yi, thi = _invert_2d(3.0, 4.0, 0.0)
|
||||
assert abs(xi + 3.0) < EPS
|
||||
assert abs(yi + 4.0) < EPS
|
||||
assert abs(thi) < EPS
|
||||
|
||||
def test_rotation_only(self):
|
||||
# invert pure rotation by 90°
|
||||
xi, yi, thi = _invert_2d(0.0, 0.0, math.pi / 2)
|
||||
assert abs(thi + math.pi / 2) < EPS
|
||||
|
||||
def test_compose_with_inverse_is_identity(self):
|
||||
x, y, th = 2.5, -1.3, math.radians(47)
|
||||
ix, iy, ith = _invert_2d(x, y, th)
|
||||
rx, ry, rth = _compose_2d(x, y, th, ix, iy, ith)
|
||||
assert abs(rx) < 1e-9
|
||||
assert abs(ry) < 1e-9
|
||||
assert abs(rth % (2 * math.pi)) < 1e-9 or abs(rth) < 1e-9
|
||||
|
||||
|
||||
class TestComputeMapOdomTf:
|
||||
def test_identity_pose(self):
|
||||
"""If both poses are identity, map→odom is identity."""
|
||||
mx, my, mth = compute_map_odom_tf(0, 0, 0, 0, 0, 0)
|
||||
assert abs(mx) < EPS and abs(my) < EPS and abs(mth) < EPS
|
||||
|
||||
def test_odom_is_map(self):
|
||||
"""If robot is at same position in both frames, map→odom is identity."""
|
||||
pose = (1.0, 2.0, math.radians(45))
|
||||
mx, my, mth = compute_map_odom_tf(*pose, *pose)
|
||||
assert abs(mx) < 1e-9 and abs(my) < 1e-9 and abs(mth) < 1e-9
|
||||
|
||||
def test_pure_translation_offset(self):
|
||||
"""map_bl at (5,0,0), odom_bl at (2,0,0) → map_odom at (3,0,0)."""
|
||||
mx, my, mth = compute_map_odom_tf(5.0, 0.0, 0.0, 2.0, 0.0, 0.0)
|
||||
assert abs(mx - 3.0) < 1e-9
|
||||
assert abs(my) < 1e-9
|
||||
assert abs(mth) < 1e-9
|
||||
|
||||
def test_tf_reconstruction(self):
|
||||
"""T_map_odom ⊗ T_odom_bl == T_map_bl (round-trip check)."""
|
||||
map_bl = (3.0, 1.5, math.radians(30))
|
||||
odom_bl = (1.0, 0.5, math.radians(10))
|
||||
mo = compute_map_odom_tf(*map_bl, *odom_bl)
|
||||
# Recompose: T_map_odom ⊗ T_odom_bl should equal T_map_bl
|
||||
rx, ry, rth = _compose_2d(*mo, *odom_bl)
|
||||
assert abs(rx - map_bl[0]) < 1e-9
|
||||
assert abs(ry - map_bl[1]) < 1e-9
|
||||
assert abs(rth - map_bl[2]) < 1e-9
|
||||
|
||||
def test_rotated_frame(self):
|
||||
"""Verify correctness with a 90° heading offset."""
|
||||
# map: robot at (1,0) facing 90°; odom: robot at (0,1) facing 0°
|
||||
mo_x, mo_y, mo_th = compute_map_odom_tf(
|
||||
1.0, 0.0, math.pi / 2,
|
||||
0.0, 1.0, 0.0,
|
||||
)
|
||||
# Recompose and check
|
||||
rx, ry, rth = _compose_2d(mo_x, mo_y, mo_th, 0.0, 1.0, 0.0)
|
||||
assert abs(rx - 1.0) < 1e-9
|
||||
assert abs(ry - 0.0) < 1e-9
|
||||
assert abs(rth - math.pi / 2) < 1e-9
|
||||
Loading…
x
Reference in New Issue
Block a user