Compare commits

..

No commits in common. "d9b4b10b90b9d63edca3513e9f25620c2502e9e5" and "a96fd91ed7f5dd1c5327b7540dcef6c8c8cb9b26" have entirely different histories.

9 changed files with 148 additions and 708 deletions

View File

@ -1,28 +0,0 @@
vesc_can_odometry:
ros__parameters:
# ── CAN motor IDs ────────────────────────────────────────────────────────
left_can_id: 61 # left motor VESC CAN ID → /vesc/can_61/state
right_can_id: 79 # right motor VESC CAN ID → /vesc/can_79/state
# ── Drive geometry ───────────────────────────────────────────────────────
wheel_radius: 0.10 # wheel radius (m)
wheel_separation: 0.35 # track width: centre-to-centre (m)
motor_poles: 7 # BLDC pole pairs (eRPM / motor_poles = mech RPM)
invert_right: true # negate right eRPM (motors physically mirrored)
# ── Frames & topics ──────────────────────────────────────────────────────
odom_frame_id: odom
base_frame_id: base_link
publish_tf: true
publish_wheel_odom: true # also publish on /saltybot/wheel_odom for pose_fusion
# ── Timing ───────────────────────────────────────────────────────────────
update_frequency: 50.0 # Hz — integration rate
# ── Covariance (diagonal variances) ─────────────────────────────────────
# Increase these values if the robot slips frequently.
pose_cov_xx: 0.001 # x variance (m²)
pose_cov_yy: 0.001 # y variance (m²)
pose_cov_aa: 0.010 # heading variance (rad²)
twist_cov_vx: 0.010 # vx variance ((m/s)²)
twist_cov_wz: 0.020 # ωz variance ((rad/s)²)

View File

@ -1,4 +1,4 @@
"""VESC CAN dual-motor odometry bridge (Issue #646).""" """VESC Telemetry to Odometry Bridge"""
import os import os
from launch import LaunchDescription from launch import LaunchDescription
@ -9,22 +9,28 @@ from ament_index_python.packages import get_package_share_directory
def generate_launch_description(): def generate_launch_description():
pkg = get_package_share_directory("saltybot_nav2_slam") use_sim_time = LaunchConfiguration('use_sim_time')
params_file = os.path.join(pkg, "config", "vesc_odometry_params.yaml")
use_sim_time = LaunchConfiguration("use_sim_time")
return LaunchDescription([ return LaunchDescription([
DeclareLaunchArgument("use_sim_time", default_value="false"), DeclareLaunchArgument('use_sim_time', default_value='false'),
Node( Node(
package="saltybot_nav2_slam", package='saltybot_nav2_slam',
executable="vesc_can_odometry", executable='vesc_odometry_bridge',
name="vesc_can_odometry", name='vesc_odometry_bridge',
parameters=[ parameters=[{
params_file, 'use_sim_time': use_sim_time,
{"use_sim_time": use_sim_time}, 'vesc_state_topic': '/vesc/state',
], 'odometry_topic': '/odom',
output="screen", 'odom_frame_id': 'odom',
'base_frame_id': 'base_link',
'update_frequency': 50,
'wheel_base': 0.35,
'wheel_radius': 0.10,
'max_rpm': 60000,
'publish_tf': True,
'tf_queue_size': 10,
}],
output='screen',
), ),
]) ])

View File

@ -1,145 +0,0 @@
"""
diff_drive_odom.py Pure differential drive odometry kinematics (Issue #646).
No ROS2 dependencies pure Python, fully unit-testable.
Computes x, y, theta, v_lin, v_ang from per-wheel electrical RPM using exact
arc integration. Designed to be wrapped by a ROS2 node.
Motor convention
eRPM mechanical RPM = eRPM / motor_poles (VESC reports electrical RPM)
Mechanical RPM wheel surface velocity (m/s) = RPM/60 × 2π × wheel_radius
If invert_right=True the right eRPM sign is flipped before use (handles
physically-mirrored motor mounting where positive eRPM on both motors would
otherwise produce counter-rotation rather than forward motion).
Kinematics
v_left = wheel surface velocity of left wheel (m/s)
v_right = wheel surface velocity of right wheel (m/s)
v_lin = (v_right + v_left) / 2 forward velocity (m/s)
v_ang = (v_right - v_left) / wheel_sep yaw rate (rad/s, CCW positive)
Integration uses the exact arc formula to reduce heading error accumulation:
If |v_ang| > ε:
r = v_lin / v_ang
dtheta = v_ang * dt
x += r * (sin(theta + dtheta) - sin(theta))
y -= r * (cos(theta + dtheta) - cos(theta))
theta += dtheta
Else (straight line):
x += v_lin * cos(theta) * dt
y += v_lin * sin(theta) * dt
Heading is normalised to (-π, π] after each update.
"""
from __future__ import annotations
import math
class DiffDriveOdometry:
"""
Differential drive odometry from dual VESC CAN motor RPM feedback.
Thread model: single-threaded caller must serialise access.
"""
def __init__(
self,
wheel_radius: float = 0.10,
wheel_separation: float = 0.35,
motor_poles: int = 7,
invert_right: bool = True,
) -> None:
"""
Parameters
----------
wheel_radius : wheel radius (m)
wheel_separation : distance between left and right wheel contact points (m)
motor_poles : BLDC motor pole pairs (VESC eRPM / motor_poles = mechanical RPM)
invert_right : negate right eRPM before computing velocity (mirrored mount)
"""
self.wheel_radius = wheel_radius
self.wheel_separation = wheel_separation
self.motor_poles = motor_poles
self.invert_right = invert_right
# Odometry state (map/odom frame)
self.x: float = 0.0
self.y: float = 0.0
self.theta: float = 0.0
# Current velocities (body frame)
self.v_lin: float = 0.0 # linear (m/s, forward positive)
self.v_ang: float = 0.0 # angular (rad/s, CCW positive)
# ── Unit conversion helpers ────────────────────────────────────────────────
def erpm_to_wheel_vel(self, erpm: float) -> float:
"""Convert electrical RPM to wheel surface velocity (m/s)."""
mech_rpm = erpm / self.motor_poles
return (mech_rpm / 60.0) * (2.0 * math.pi) * self.wheel_radius
# ── Kinematics update ─────────────────────────────────────────────────────
def update(
self,
erpm_left: float,
erpm_right: float,
dt: float,
) -> tuple[float, float]:
"""
Integrate one differential drive step.
Parameters
----------
erpm_left : left motor electrical RPM (positive = forward)
erpm_right : right motor electrical RPM (positive = forward before invert)
dt : elapsed time (s); must be > 0
Returns
-------
(v_lin, v_ang) linear (m/s) and angular (rad/s) velocities after update
"""
if dt <= 0.0:
return self.v_lin, self.v_ang
if self.invert_right:
erpm_right = -erpm_right
v_left = self.erpm_to_wheel_vel(erpm_left)
v_right = self.erpm_to_wheel_vel(erpm_right)
v_lin = (v_right + v_left) / 2.0
v_ang = (v_right - v_left) / self.wheel_separation
# Exact arc integration
if abs(v_ang) > 1e-6:
r_turn = v_lin / v_ang
dtheta = v_ang * dt
self.x += r_turn * (math.sin(self.theta + dtheta) - math.sin(self.theta))
self.y -= r_turn * (math.cos(self.theta + dtheta) - math.cos(self.theta))
self.theta += dtheta
else:
self.x += v_lin * math.cos(self.theta) * dt
self.y += v_lin * math.sin(self.theta) * dt
# Normalise heading to (-π, π]
self.theta = (self.theta + math.pi) % (2.0 * math.pi) - math.pi
self.v_lin = v_lin
self.v_ang = v_ang
return v_lin, v_ang
def reset(self, x: float = 0.0, y: float = 0.0, theta: float = 0.0) -> None:
"""Reset odometry state to given pose."""
self.x = x
self.y = y
self.theta = theta
self.v_lin = 0.0
self.v_ang = 0.0

View File

@ -1,23 +1,9 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """VESC telemetry to Nav2 odometry bridge."""
vesc_odometry_bridge.py Differential drive odometry from dual VESC CAN motors (Issue #646).
Subscribes to per-motor VESC telemetry topics for CAN IDs 61 (left) and 79 (right),
applies differential drive kinematics via DiffDriveOdometry, and publishes:
/odom nav_msgs/Odometry consumed by Nav2 / slam_toolbox
/saltybot/wheel_odom nav_msgs/Odometry consumed by saltybot_pose_fusion EKF
TF odom base_link
Input topics (std_msgs/String, JSON):
/vesc/can_61/state {"rpm": <eRPM>, "voltage_v": ..., "current_a": ..., ...}
/vesc/can_79/state same schema
Replaces the old single-motor vesc_odometry_bridge that used /vesc/state.
"""
import json import json
import math import math
import time
import rclpy import rclpy
from rclpy.node import Node from rclpy.node import Node
@ -26,220 +12,161 @@ from nav_msgs.msg import Odometry
from geometry_msgs.msg import TransformStamped, Quaternion from geometry_msgs.msg import TransformStamped, Quaternion
from tf2_ros import TransformBroadcaster from tf2_ros import TransformBroadcaster
from .diff_drive_odom import DiffDriveOdometry
class VESCOdometryBridge(Node):
"""ROS2 node bridging VESC telemetry to Nav2 odometry."""
def _yaw_to_quaternion(yaw: float) -> Quaternion: def __init__(self):
"""Convert yaw angle (rad) to geometry_msgs/Quaternion (rotation about Z).""" super().__init__("vesc_odometry_bridge")
half = yaw * 0.5
q = Quaternion()
q.w = math.cos(half)
q.x = 0.0
q.y = 0.0
q.z = math.sin(half)
return q
self.declare_parameter("vesc_state_topic", "/vesc/state")
self.declare_parameter("odometry_topic", "/odom")
self.declare_parameter("odom_frame_id", "odom")
self.declare_parameter("base_frame_id", "base_link")
self.declare_parameter("update_frequency", 50)
self.declare_parameter("wheel_base", 0.35)
self.declare_parameter("wheel_radius", 0.10)
self.declare_parameter("max_rpm", 60000)
self.declare_parameter("publish_tf", True)
class VESCCANOdometryNode(Node): self.vesc_state_topic = self.get_parameter("vesc_state_topic").value
""" self.odometry_topic = self.get_parameter("odometry_topic").value
Differential drive odometry from dual VESC CAN motor telemetry. self.odom_frame_id = self.get_parameter("odom_frame_id").value
self.base_frame_id = self.get_parameter("base_frame_id").value
frequency = self.get_parameter("update_frequency").value
self.wheel_base = self.get_parameter("wheel_base").value
self.wheel_radius = self.get_parameter("wheel_radius").value
self.publish_tf = self.get_parameter("publish_tf").value
Integrates DiffDriveOdometry at a fixed rate using the latest RPM values self.last_rpm = 0
received from each motor's CAN telemetry topic. self.last_update_time = time.time()
"""
def __init__(self) -> None: self.x = 0.0
super().__init__("vesc_can_odometry") self.y = 0.0
self.theta = 0.0
self.vx = 0.0
self.vy = 0.0
self.vtheta = 0.0
# ── Parameters ──────────────────────────────────────────────────────── self.pub_odom = self.create_publisher(Odometry, self.odometry_topic, 10)
self.declare_parameter("left_can_id", 61)
self.declare_parameter("right_can_id", 79)
self.declare_parameter("wheel_radius", 0.10) # metres
self.declare_parameter("wheel_separation", 0.35) # metres (track width)
self.declare_parameter("motor_poles", 7) # BLDC pole pairs
self.declare_parameter("invert_right", True) # flip right eRPM sign
self.declare_parameter("odom_frame_id", "odom")
self.declare_parameter("base_frame_id", "base_link")
self.declare_parameter("publish_tf", True)
self.declare_parameter("publish_wheel_odom", True)
self.declare_parameter("update_frequency", 50.0) # Hz
# Covariance tuning (diagonal variances)
self.declare_parameter("pose_cov_xx", 1e-3) # x σ² (m²)
self.declare_parameter("pose_cov_yy", 1e-3) # y σ² (m²)
self.declare_parameter("pose_cov_aa", 1e-2) # heading σ² (rad²)
self.declare_parameter("twist_cov_vx", 1e-2) # vx σ² (m/s)²
self.declare_parameter("twist_cov_wz", 2e-2) # ωz σ² (rad/s)²
left_id = self.get_parameter("left_can_id").value if self.publish_tf:
right_id = self.get_parameter("right_can_id").value self.tf_broadcaster = TransformBroadcaster(self)
freq = self.get_parameter("update_frequency").value
self._odom_frame = self.get_parameter("odom_frame_id").value self.create_subscription(String, self.vesc_state_topic, self._on_vesc_state, 10)
self._base_frame = self.get_parameter("base_frame_id").value
self._publish_tf = self.get_parameter("publish_tf").value
self._pub_wheel_en = self.get_parameter("publish_wheel_odom").value
self._cov_xx = self.get_parameter("pose_cov_xx").value period = 1.0 / frequency
self._cov_yy = self.get_parameter("pose_cov_yy").value self.create_timer(period, self._publish_odometry)
self._cov_aa = self.get_parameter("pose_cov_aa").value
self._cov_vx = self.get_parameter("twist_cov_vx").value
self._cov_wz = self.get_parameter("twist_cov_wz").value
# ── Kinematics engine ─────────────────────────────────────────────────
self._odom = DiffDriveOdometry(
wheel_radius = self.get_parameter("wheel_radius").value,
wheel_separation = self.get_parameter("wheel_separation").value,
motor_poles = self.get_parameter("motor_poles").value,
invert_right = self.get_parameter("invert_right").value,
)
# Latest eRPM from each motor (updated by subscription callbacks)
self._erpm_left: float = 0.0
self._erpm_right: float = 0.0
self._last_time: float | None = None
# ── Publishers ─────────────────────────────────────────────────────────
self._pub_odom = self.create_publisher(Odometry, "/odom", 10)
if self._pub_wheel_en:
self._pub_wheel = self.create_publisher(Odometry, "/saltybot/wheel_odom", 10)
else:
self._pub_wheel = None
if self._publish_tf:
self._tf_br = TransformBroadcaster(self)
else:
self._tf_br = None
# ── Subscriptions ──────────────────────────────────────────────────────
self.create_subscription(
String,
f"/vesc/can_{left_id}/state",
self._on_left,
10,
)
self.create_subscription(
String,
f"/vesc/can_{right_id}/state",
self._on_right,
10,
)
# ── Integration timer ──────────────────────────────────────────────────
self.create_timer(1.0 / freq, self._on_timer)
self.get_logger().info( self.get_logger().info(
f"vesc_can_odometry: left=CAN{left_id} right=CAN{right_id} " f"VESC odometry bridge initialized: "
f"r={self._odom.wheel_radius}m sep={self._odom.wheel_separation}m " f"wheel_base={self.wheel_base}m, wheel_radius={self.wheel_radius}m"
f"poles={self._odom.motor_poles} invert_right={self._odom.invert_right} "
f"{self._odom_frame}{self._base_frame} @ {freq:.0f}Hz"
) )
# ── RPM callbacks ────────────────────────────────────────────────────────── def _on_vesc_state(self, msg: String) -> None:
"""Update VESC telemetry from JSON."""
def _parse_erpm(self, msg: String) -> float | None:
"""Extract eRPM from JSON state message; return None on parse error."""
try: try:
return float(json.loads(msg.data).get("rpm", 0)) data = json.loads(msg.data)
except (json.JSONDecodeError, ValueError, TypeError): current_rpm = data.get("rpm", 0)
return None motor_rad_s = (current_rpm / 60.0) * (2.0 * math.pi)
wheel_velocity = motor_rad_s * self.wheel_radius
def _on_left(self, msg: String) -> None: self.vx = wheel_velocity
erpm = self._parse_erpm(msg) self.vy = 0.0
if erpm is not None: self.vtheta = 0.0
self._erpm_left = erpm self.last_rpm = current_rpm
def _on_right(self, msg: String) -> None: except json.JSONDecodeError:
erpm = self._parse_erpm(msg) pass
if erpm is not None:
self._erpm_right = erpm
# ── Integration timer ────────────────────────────────────────────────────── def _publish_odometry(self) -> None:
"""Publish odometry message and TF."""
current_time = time.time()
dt = current_time - self.last_update_time
self.last_update_time = current_time
def _on_timer(self) -> None: if abs(self.vtheta) > 0.001:
"""Integrate odometry and publish at fixed rate.""" self.theta += self.vtheta * dt
now = self.get_clock().now() self.x += (self.vx / self.vtheta) * (math.sin(self.theta) - math.sin(self.theta - self.vtheta * dt))
now_s = now.nanoseconds * 1e-9 self.y += (self.vx / self.vtheta) * (math.cos(self.theta - self.vtheta * dt) - math.cos(self.theta))
else:
self.x += self.vx * math.cos(self.theta) * dt
self.y += self.vx * math.sin(self.theta) * dt
if self._last_time is None: odom_msg = Odometry()
self._last_time = now_s odom_msg.header.stamp = self.get_clock().now().to_msg()
return odom_msg.header.frame_id = self.odom_frame_id
odom_msg.child_frame_id = self.base_frame_id
dt = now_s - self._last_time odom_msg.pose.pose.position.x = self.x
self._last_time = now_s odom_msg.pose.pose.position.y = self.y
odom_msg.pose.pose.position.z = 0.0
odom_msg.pose.pose.orientation = self._euler_to_quaternion(0, 0, self.theta)
if dt <= 0.0 or dt > 1.0: # guard against clock jumps / stalls odom_msg.pose.covariance = [
return 0.1, 0, 0, 0, 0, 0,
0, 0.1, 0, 0, 0, 0,
0, 0, 0.1, 0, 0, 0,
0, 0, 0, 0.1, 0, 0,
0, 0, 0, 0, 0.1, 0,
0, 0, 0, 0, 0, 0.1,
]
v_lin, v_ang = self._odom.update(self._erpm_left, self._erpm_right, dt) odom_msg.twist.twist.linear.x = self.vx
odom_msg.twist.twist.linear.y = self.vy
odom_msg.twist.twist.linear.z = 0.0
odom_msg.twist.twist.angular.z = self.vtheta
stamp = now.to_msg() odom_msg.twist.covariance = [
odom_msg = self._build_odom_msg(stamp, v_lin, v_ang) 0.05, 0, 0, 0, 0, 0,
self._pub_odom.publish(odom_msg) 0, 0.05, 0, 0, 0, 0,
0, 0, 0.05, 0, 0, 0,
0, 0, 0, 0.05, 0, 0,
0, 0, 0, 0, 0.05, 0,
0, 0, 0, 0, 0, 0.05,
]
if self._pub_wheel is not None: self.pub_odom.publish(odom_msg)
wheel_msg = self._build_odom_msg(stamp, v_lin, v_ang)
self._pub_wheel.publish(wheel_msg)
if self._tf_br is not None: if self.publish_tf:
self._publish_tf(stamp) self._publish_tf(odom_msg.header.stamp)
# ── Message builders ─────────────────────────────────────────────────────── def _publish_tf(self, timestamp) -> None:
"""Publish odom → base_link TF."""
tf_msg = TransformStamped()
tf_msg.header.stamp = timestamp
tf_msg.header.frame_id = self.odom_frame_id
tf_msg.child_frame_id = self.base_frame_id
def _build_odom_msg(self, stamp, v_lin: float, v_ang: float) -> Odometry: tf_msg.transform.translation.x = self.x
msg = Odometry() tf_msg.transform.translation.y = self.y
msg.header.stamp = stamp tf_msg.transform.translation.z = 0.0
msg.header.frame_id = self._odom_frame tf_msg.transform.rotation = self._euler_to_quaternion(0, 0, self.theta)
msg.child_frame_id = self._base_frame
msg.pose.pose.position.x = self._odom.x self.tf_broadcaster.sendTransform(tf_msg)
msg.pose.pose.position.y = self._odom.y
msg.pose.pose.position.z = 0.0
msg.pose.pose.orientation = _yaw_to_quaternion(self._odom.theta)
# 6×6 row-major pose covariance [x, y, z, roll, pitch, yaw] @staticmethod
cov_p = [0.0] * 36 def _euler_to_quaternion(roll: float, pitch: float, yaw: float) -> Quaternion:
cov_p[0] = self._cov_xx # x,x """Convert Euler angles to quaternion."""
cov_p[7] = self._cov_yy # y,y cy = math.cos(yaw * 0.5)
cov_p[14] = 1e-4 # z,z (not estimated) sy = math.sin(yaw * 0.5)
cov_p[21] = 1e-4 # roll (not estimated) cp = math.cos(pitch * 0.5)
cov_p[28] = 1e-4 # pitch (not estimated) sp = math.sin(pitch * 0.5)
cov_p[35] = self._cov_aa # yaw,yaw cr = math.cos(roll * 0.5)
msg.pose.covariance = cov_p sr = math.sin(roll * 0.5)
msg.twist.twist.linear.x = v_lin q = Quaternion()
msg.twist.twist.linear.y = 0.0 q.w = cr * cp * cy + sr * sp * sy
msg.twist.twist.linear.z = 0.0 q.x = sr * cp * cy - cr * sp * sy
msg.twist.twist.angular.z = v_ang q.y = cr * sp * cy + sr * cp * sy
q.z = cr * cp * sy - sr * sp * cy
# 6×6 row-major twist covariance [vx, vy, vz, wx, wy, wz] return q
cov_t = [0.0] * 36
cov_t[0] = self._cov_vx # vx,vx
cov_t[7] = 1e-4 # vy (constrained to ~0)
cov_t[14] = 1e-4 # vz
cov_t[21] = 1e-4 # wx
cov_t[28] = 1e-4 # wy
cov_t[35] = self._cov_wz # wz,wz
msg.twist.covariance = cov_t
return msg
def _publish_tf(self, stamp) -> None:
tf = TransformStamped()
tf.header.stamp = stamp
tf.header.frame_id = self._odom_frame
tf.child_frame_id = self._base_frame
tf.transform.translation.x = self._odom.x
tf.transform.translation.y = self._odom.y
tf.transform.translation.z = 0.0
tf.transform.rotation = _yaw_to_quaternion(self._odom.theta)
self._tf_br.sendTransform(tf)
def main(args=None) -> None: def main(args=None):
rclpy.init(args=args) rclpy.init(args=args)
node = VESCCANOdometryNode() node = VESCOdometryBridge()
try: try:
rclpy.spin(node) rclpy.spin(node)
except KeyboardInterrupt: except KeyboardInterrupt:

View File

@ -22,7 +22,6 @@ setup(
"config/global_costmap_params.yaml", "config/global_costmap_params.yaml",
"config/local_costmap_params.yaml", "config/local_costmap_params.yaml",
"config/dwb_local_planner_params.yaml", "config/dwb_local_planner_params.yaml",
"config/vesc_odometry_params.yaml",
]), ]),
], ],
install_requires=["setuptools"], install_requires=["setuptools"],
@ -35,7 +34,6 @@ setup(
entry_points={ entry_points={
"console_scripts": [ "console_scripts": [
"vesc_odometry_bridge = saltybot_nav2_slam.vesc_odometry_bridge:main", "vesc_odometry_bridge = saltybot_nav2_slam.vesc_odometry_bridge:main",
"vesc_can_odometry = saltybot_nav2_slam.vesc_odometry_bridge:main",
], ],
}, },
) )

View File

@ -1,257 +0,0 @@
"""Unit tests for differential drive odometry kinematics (Issue #646).
Tests run without any ROS2 installation DiffDriveOdometry has no ROS deps.
"""
import math
import sys
import os
import pytest
# Allow import from package source without installing
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from saltybot_nav2_slam.diff_drive_odom import DiffDriveOdometry
def make_odom(**kwargs) -> DiffDriveOdometry:
defaults = dict(wheel_radius=0.10, wheel_separation=0.35, motor_poles=7, invert_right=False)
defaults.update(kwargs)
return DiffDriveOdometry(**defaults)
# ── Construction & reset ──────────────────────────────────────────────────────
def test_initial_state_zero():
odom = make_odom()
assert odom.x == 0.0
assert odom.y == 0.0
assert odom.theta == 0.0
assert odom.v_lin == 0.0
assert odom.v_ang == 0.0
def test_reset_sets_pose():
odom = make_odom()
odom.reset(x=1.0, y=2.0, theta=math.pi / 4)
assert odom.x == 1.0
assert odom.y == 2.0
assert abs(odom.theta - math.pi / 4) < 1e-12
def test_reset_clears_velocity():
odom = make_odom()
# Drive forward then reset
odom.update(erpm_left=4200, erpm_right=4200, dt=0.1)
assert odom.v_lin != 0.0
odom.reset()
assert odom.v_lin == 0.0
assert odom.v_ang == 0.0
# ── eRPM → velocity conversion ────────────────────────────────────────────────
def test_erpm_to_wheel_vel_zero():
odom = make_odom(wheel_radius=0.10, motor_poles=7)
assert odom.erpm_to_wheel_vel(0.0) == 0.0
def test_erpm_to_wheel_vel_positive():
odom = make_odom(wheel_radius=0.10, motor_poles=7)
# 4200 eRPM / 7 poles = 600 mech RPM = 10 rev/s = 10 ×× 0.1 m/s ≈ 6.283 m/s
expected = (4200 / 7 / 60.0) * (2.0 * math.pi) * 0.10
assert abs(odom.erpm_to_wheel_vel(4200) - expected) < 1e-9
def test_erpm_to_wheel_vel_negative():
odom = make_odom(wheel_radius=0.10, motor_poles=7)
pos = odom.erpm_to_wheel_vel(4200)
neg = odom.erpm_to_wheel_vel(-4200)
assert abs(pos + neg) < 1e-9
# ── Straight-line motion ──────────────────────────────────────────────────────
def test_straight_forward():
"""Both wheels same eRPM → straight forward motion."""
odom = make_odom(wheel_radius=0.10, wheel_separation=0.35, motor_poles=7)
v_lin, v_ang = odom.update(erpm_left=4200, erpm_right=4200, dt=1.0)
assert v_ang == pytest.approx(0.0, abs=1e-9)
assert v_lin > 0.0
assert odom.x > 0.0
assert abs(odom.y) < 1e-9
assert abs(odom.theta) < 1e-9
def test_straight_backward():
"""Both wheels negative same eRPM → straight backward motion."""
odom = make_odom(wheel_radius=0.10, wheel_separation=0.35, motor_poles=7)
v_lin, v_ang = odom.update(erpm_left=-4200, erpm_right=-4200, dt=1.0)
assert v_ang == pytest.approx(0.0, abs=1e-9)
assert v_lin < 0.0
assert odom.x < 0.0
assert abs(odom.y) < 1e-9
def test_straight_distance():
"""Verify computed distance matches kinematics formula."""
odom = make_odom(wheel_radius=0.10, wheel_separation=0.35, motor_poles=1)
# 600 eRPM / 1 pole = 600 mech RPM = 10 rev/s = 10××0.10 = 6.2832 m/s
expected_v = (600.0 / 60.0) * (2.0 * math.pi) * 0.10
dt = 2.0
odom.update(erpm_left=600, erpm_right=600, dt=dt)
assert abs(odom.x - expected_v * dt) < 1e-9
assert abs(odom.y) < 1e-9
# ── Zero-radius (point) rotation ──────────────────────────────────────────────
def test_spin_in_place_cw():
"""Right faster than left → rotate clockwise (negative omega)."""
odom = make_odom(wheel_radius=0.10, wheel_separation=0.35, motor_poles=7)
# right > left → v_ang = (v_right - v_left) / sep > 0 → CCW in ROS convention
v_lin, v_ang = odom.update(erpm_left=-4200, erpm_right=4200, dt=1.0)
assert v_ang > 0.0 # CCW
assert abs(v_lin) < 1e-9 # no net translation
assert abs(odom.x) < 1e-6
assert abs(odom.y) < 1e-6
def test_spin_in_place_ccw():
"""Left faster than right → clockwise (negative omega)."""
odom = make_odom(wheel_radius=0.10, wheel_separation=0.35, motor_poles=7)
v_lin, v_ang = odom.update(erpm_left=4200, erpm_right=-4200, dt=1.0)
assert v_ang < 0.0 # CW
assert abs(v_lin) < 1e-9
assert abs(odom.x) < 1e-6
assert abs(odom.y) < 1e-6
def test_full_rotation():
"""Spin in place for exactly 2π, ending back at (0, 0, 0)."""
odom = make_odom(wheel_radius=0.10, wheel_separation=0.35, motor_poles=1)
# v_right = +1 m/s, v_left = -1 m/s → v_ang = 2/0.35 rad/s
# t_full = 2π / v_ang → drive for exactly that long
v_ang_expected = 2.0 / 0.35
t_full = (2.0 * math.pi) / v_ang_expected
# eRPM to produce 1 m/s: 1 = (eRPM/60)××0.10 → eRPM = 600/2π×60 ≈ 95.49
erpm = (1.0 / (2.0 * math.pi * 0.10)) * 60.0
odom.update(erpm_left=-erpm, erpm_right=erpm, dt=t_full)
assert abs(odom.x) < 1e-6
assert abs(odom.y) < 1e-6
assert abs(odom.theta) < 1e-6
# ── Arc motion ────────────────────────────────────────────────────────────────
def test_quarter_circle_arc():
"""Drive a quarter circle: verify end position matches arc geometry."""
radius = 1.0 # m — desired turn radius
sep = 0.35 # m — wheel separation
r_left = radius - sep / 2.0
r_right = radius + sep / 2.0
# v_lin = (v_r + v_l)/2 = radius × omega
# v_ang = (v_r - v_l)/sep = omega
# choose omega = 1 rad/s → t_quarter = π/2 s
omega = 1.0 # rad/s
v_left = r_left * omega
v_right = r_right * omega
t_quarter = math.pi / 2.0
# Convert velocities to eRPM with motor_poles=1
def vel_to_erpm(v, r=0.10): return (v / (2.0 * math.pi * r)) * 60.0
odom = make_odom(wheel_radius=0.10, wheel_separation=sep, motor_poles=1)
odom.update(
erpm_left=vel_to_erpm(v_left),
erpm_right=vel_to_erpm(v_right),
dt=t_quarter,
)
# After a CCW quarter circle of radius `radius`:
# x = radius * sin(π/2) = radius
# y = radius * (1 - cos(π/2)) = radius
# theta = π/2
assert abs(odom.x - radius) < 1e-3, f"x={odom.x:.4f} expected≈{radius}"
assert abs(odom.y - radius) < 1e-3, f"y={odom.y:.4f} expected≈{radius}"
assert abs(odom.theta - math.pi / 2.0) < 1e-6
# ── invert_right flag ────────────────────────────────────────────────────────
def test_invert_right_reverses_direction():
"""With invert_right=True, same positive eRPM on both should still go forward."""
odom_normal = make_odom(invert_right=False)
odom_invert = make_odom(invert_right=True)
# Without inversion: left=positive, right=positive → both forward → straight forward
odom_normal.update(erpm_left=4200, erpm_right=4200, dt=1.0)
# With inversion: right eRPM is negated internally → left=+, right=- → net forward ?
odom_invert.update(erpm_left=4200, erpm_right=-4200, dt=1.0)
# Both should yield same result (inversion compensates the physical mirror)
assert abs(odom_normal.x - odom_invert.x) < 1e-9
assert abs(odom_normal.y - odom_invert.y) < 1e-9
assert abs(odom_normal.theta - odom_invert.theta) < 1e-9
# ── Heading wrap ──────────────────────────────────────────────────────────────
def test_heading_wraps_at_pi():
"""Heading should always stay within (-π, π]."""
odom = make_odom(wheel_radius=0.10, wheel_separation=0.35, motor_poles=1)
erpm = (1.0 / (2.0 * math.pi * 0.10)) * 60.0
for _ in range(20):
odom.update(erpm_left=-erpm, erpm_right=erpm, dt=0.5)
assert -math.pi < odom.theta <= math.pi, f"theta={odom.theta} out of range"
# ── Guard conditions ──────────────────────────────────────────────────────────
def test_zero_dt_no_change():
"""dt=0 should not change state."""
odom = make_odom()
odom.update(erpm_left=4200, erpm_right=4200, dt=0.0)
assert odom.x == 0.0
assert odom.y == 0.0
assert odom.theta == 0.0
def test_negative_dt_no_change():
"""Negative dt should not change state."""
odom = make_odom()
odom.update(erpm_left=4200, erpm_right=4200, dt=-1.0)
assert odom.x == 0.0
def test_zero_erpm_no_motion():
"""Zero RPM on both motors → no motion."""
odom = make_odom()
odom.update(erpm_left=0, erpm_right=0, dt=1.0)
assert odom.x == 0.0
assert odom.y == 0.0
assert odom.theta == 0.0
assert odom.v_lin == 0.0
assert odom.v_ang == 0.0
# ── Velocity output ───────────────────────────────────────────────────────────
def test_update_returns_velocities():
odom = make_odom(wheel_radius=0.10, wheel_separation=0.35, motor_poles=7)
v_lin, v_ang = odom.update(erpm_left=4200, erpm_right=4200, dt=0.02)
assert v_lin == odom.v_lin
assert v_ang == odom.v_ang
def test_covariance_fields():
"""Verify covariance arrays have correct length for nav_msgs/Odometry."""
# 6×6 = 36 elements
assert 36 == 36 # placeholder — actual covariance set in ROS node
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@ -26,11 +26,5 @@ pose_fusion:
uwb_pose_topic: /saltybot/pose/fused_cov # from saltybot_uwb_imu_fusion uwb_pose_topic: /saltybot/pose/fused_cov # from saltybot_uwb_imu_fusion
vo_topic: /saltybot/visual_odom # from saltybot_visual_odom vo_topic: /saltybot/visual_odom # from saltybot_visual_odom
# ── Wheel odometry (/saltybot/wheel_odom) from saltybot_nav2_slam ──────────
use_wheel_odom: true
wheel_odom_topic: /saltybot/wheel_odom
sigma_wheel_vel_m_s: 0.03 # wheel linear-velocity 1-σ (m/s) — tighter than VO
sigma_wheel_omega_r_s: 0.02 # wheel yaw-rate 1-σ (rad/s)
map_frame: map map_frame: map
base_frame: base_link base_frame: base_link

View File

@ -136,15 +136,11 @@ class PoseFusionNode(Node):
self.declare_parameter('uwb_dropout_warn_s', 2.0) self.declare_parameter('uwb_dropout_warn_s', 2.0)
self.declare_parameter('uwb_dropout_max_s', 10.0) self.declare_parameter('uwb_dropout_max_s', 10.0)
self.declare_parameter('vo_dropout_warn_s', 1.0) self.declare_parameter('vo_dropout_warn_s', 1.0)
self.declare_parameter('map_frame', 'map') self.declare_parameter('map_frame', 'map')
self.declare_parameter('base_frame', 'base_link') self.declare_parameter('base_frame', 'base_link')
self.declare_parameter('imu_topic', '/imu/data') self.declare_parameter('imu_topic', '/imu/data')
self.declare_parameter('uwb_pose_topic', '/saltybot/pose/fused_cov') self.declare_parameter('uwb_pose_topic', '/saltybot/pose/fused_cov')
self.declare_parameter('vo_topic', '/saltybot/visual_odom') self.declare_parameter('vo_topic', '/saltybot/visual_odom')
self.declare_parameter('wheel_odom_topic', '/saltybot/wheel_odom')
self.declare_parameter('sigma_wheel_vel_m_s', 0.03) # wheel vel 1-σ (m/s)
self.declare_parameter('sigma_wheel_omega_r_s', 0.02) # wheel ωz 1-σ (rad/s)
self.declare_parameter('use_wheel_odom', True)
self._map_frame = self.get_parameter('map_frame').value self._map_frame = self.get_parameter('map_frame').value
self._base_frame = self.get_parameter('base_frame').value self._base_frame = self.get_parameter('base_frame').value
@ -154,9 +150,6 @@ class PoseFusionNode(Node):
self._uwb_warn = self.get_parameter('uwb_dropout_warn_s').value self._uwb_warn = self.get_parameter('uwb_dropout_warn_s').value
self._uwb_max = self.get_parameter('uwb_dropout_max_s').value self._uwb_max = self.get_parameter('uwb_dropout_max_s').value
self._vo_warn = self.get_parameter('vo_dropout_warn_s').value self._vo_warn = self.get_parameter('vo_dropout_warn_s').value
self._sigma_wheel_vel = self.get_parameter('sigma_wheel_vel_m_s').value
self._sigma_wheel_omega = self.get_parameter('sigma_wheel_omega_r_s').value
self._use_wheel_odom = self.get_parameter('use_wheel_odom').value
# ── EKF ─────────────────────────────────────────────────────────────── # ── EKF ───────────────────────────────────────────────────────────────
self._ekf = PoseFusionEKF( self._ekf = PoseFusionEKF(
@ -201,27 +194,14 @@ class PoseFusionNode(Node):
_SENSOR_QOS, _SENSOR_QOS,
) )
if self._use_wheel_odom:
self.create_subscription(
Odometry,
self.get_parameter('wheel_odom_topic').value,
self._on_wheel_odom,
_SENSOR_QOS,
)
# ── Status timer (10 Hz) ──────────────────────────────────────────────── # ── Status timer (10 Hz) ────────────────────────────────────────────────
self.create_timer(0.1, self._on_status_timer) self.create_timer(0.1, self._on_status_timer)
wheel_status = (
f' WheelOdom:{self.get_parameter("wheel_odom_topic").value}'
if self._use_wheel_odom else ''
)
self.get_logger().info( self.get_logger().info(
f'pose_fusion ready — ' f'pose_fusion ready — '
f'IMU:{self.get_parameter("imu_topic").value} ' f'IMU:{self.get_parameter("imu_topic").value} '
f'UWB:{self.get_parameter("uwb_pose_topic").value} ' f'UWB:{self.get_parameter("uwb_pose_topic").value} '
f'VO:{self.get_parameter("vo_topic").value}' f'VO:{self.get_parameter("vo_topic").value} '
f'{wheel_status} '
f'map:{self._map_frame}{self._base_frame}' f'map:{self._map_frame}{self._base_frame}'
) )
@ -333,41 +313,6 @@ class PoseFusionNode(Node):
sigma_omega = sigma_omega, sigma_omega = sigma_omega,
) )
# ── Wheel-odometry callback — velocity update (Issue #646) ────────────────
def _on_wheel_odom(self, msg: Odometry) -> None:
"""Fuse wheel odometry velocity into EKF (same path as visual odometry)."""
if not self._ekf.is_initialised:
return
vx_body = msg.twist.twist.linear.x
omega = msg.twist.twist.angular.z
# Extract per-axis sigma from twist covariance when available
sigma_vel: float | None = None
sigma_omega: float | None = None
cov = list(msg.twist.covariance)
if len(cov) == 36:
p_vx = cov[0]
p_wz = cov[35]
if p_vx > 0.0:
sigma_vel = float(math.sqrt(p_vx))
if p_wz > 0.0:
sigma_omega = float(math.sqrt(p_wz))
# Fall back to parameter-configured sigmas
if sigma_vel is None:
sigma_vel = self._sigma_wheel_vel
if sigma_omega is None:
sigma_omega = self._sigma_wheel_omega
self._ekf.update_vo_velocity(
vx_body = vx_body,
omega = omega,
sigma_vel = sigma_vel,
sigma_omega = sigma_omega,
)
# ── Status timer (10 Hz) ────────────────────────────────────────────────── # ── Status timer (10 Hz) ──────────────────────────────────────────────────
def _on_status_timer(self) -> None: def _on_status_timer(self) -> None: