feat: VESC CAN odometry for nav2 (Issue #646)

Replace single-motor vesc_odometry_bridge with dual-CAN differential
drive odometry for left (CAN 61) and right (CAN 79) VESC motors.

New files:
- diff_drive_odom.py: pure-Python kinematics (eRPM→wheel vel, exact arc
  integration, heading wrap), no ROS deps, fully unit-tested
- test/test_vesc_odometry.py: 20 unit tests (straight, arc, spin,
  invert_right, guard conditions) — all pass
- config/vesc_odometry_params.yaml: configurable wheel_radius,
  wheel_separation, motor_poles, invert_right, covariance tuning

Updated:
- vesc_odometry_bridge.py: rewritten as VESCCANOdometryNode; subscribes
  to /vesc/can_61/state and /vesc/can_79/state (std_msgs/String JSON);
  publishes /odom and /saltybot/wheel_odom (nav_msgs/Odometry) + TF
  odom→base_link with proper 6×6 covariance matrices
- odometry_bridge.launch.py: updated to launch vesc_can_odometry with
  vesc_odometry_params.yaml
- setup.py: added vesc_can_odometry entry point + config install
- pose_fusion_node.py: added optional wheel_odom_topic subscriber that
  feeds DiffDriveOdometry velocities into EKF via update_vo_velocity
- pose_fusion_params.yaml: added use_wheel_odom, wheel_odom_topic,
  sigma_wheel_vel_m_s, sigma_wheel_omega_r_s parameters

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
sl-perception 2026-03-17 09:54:19 -04:00
parent 0fcad75cb4
commit d8b25bad77
9 changed files with 710 additions and 150 deletions

View File

@ -0,0 +1,28 @@
vesc_can_odometry:
ros__parameters:
# ── CAN motor IDs ────────────────────────────────────────────────────────
left_can_id: 61 # left motor VESC CAN ID → /vesc/can_61/state
right_can_id: 79 # right motor VESC CAN ID → /vesc/can_79/state
# ── Drive geometry ───────────────────────────────────────────────────────
wheel_radius: 0.10 # wheel radius (m)
wheel_separation: 0.35 # track width: centre-to-centre (m)
motor_poles: 7 # BLDC pole pairs (eRPM / motor_poles = mech RPM)
invert_right: true # negate right eRPM (motors physically mirrored)
# ── Frames & topics ──────────────────────────────────────────────────────
odom_frame_id: odom
base_frame_id: base_link
publish_tf: true
publish_wheel_odom: true # also publish on /saltybot/wheel_odom for pose_fusion
# ── Timing ───────────────────────────────────────────────────────────────
update_frequency: 50.0 # Hz — integration rate
# ── Covariance (diagonal variances) ─────────────────────────────────────
# Increase these values if the robot slips frequently.
pose_cov_xx: 0.001 # x variance (m²)
pose_cov_yy: 0.001 # y variance (m²)
pose_cov_aa: 0.010 # heading variance (rad²)
twist_cov_vx: 0.010 # vx variance ((m/s)²)
twist_cov_wz: 0.020 # ωz variance ((rad/s)²)

View File

@ -1,4 +1,4 @@
"""VESC Telemetry to Odometry Bridge""" """VESC CAN dual-motor odometry bridge (Issue #646)."""
import os import os
from launch import LaunchDescription from launch import LaunchDescription
@ -9,28 +9,22 @@ from ament_index_python.packages import get_package_share_directory
def generate_launch_description(): def generate_launch_description():
use_sim_time = LaunchConfiguration('use_sim_time') pkg = get_package_share_directory("saltybot_nav2_slam")
params_file = os.path.join(pkg, "config", "vesc_odometry_params.yaml")
use_sim_time = LaunchConfiguration("use_sim_time")
return LaunchDescription([ return LaunchDescription([
DeclareLaunchArgument('use_sim_time', default_value='false'), DeclareLaunchArgument("use_sim_time", default_value="false"),
Node( Node(
package='saltybot_nav2_slam', package="saltybot_nav2_slam",
executable='vesc_odometry_bridge', executable="vesc_can_odometry",
name='vesc_odometry_bridge', name="vesc_can_odometry",
parameters=[{ parameters=[
'use_sim_time': use_sim_time, params_file,
'vesc_state_topic': '/vesc/state', {"use_sim_time": use_sim_time},
'odometry_topic': '/odom', ],
'odom_frame_id': 'odom', output="screen",
'base_frame_id': 'base_link',
'update_frequency': 50,
'wheel_base': 0.35,
'wheel_radius': 0.10,
'max_rpm': 60000,
'publish_tf': True,
'tf_queue_size': 10,
}],
output='screen',
), ),
]) ])

View File

@ -0,0 +1,145 @@
"""
diff_drive_odom.py Pure differential drive odometry kinematics (Issue #646).
No ROS2 dependencies pure Python, fully unit-testable.
Computes x, y, theta, v_lin, v_ang from per-wheel electrical RPM using exact
arc integration. Designed to be wrapped by a ROS2 node.
Motor convention
eRPM mechanical RPM = eRPM / motor_poles (VESC reports electrical RPM)
Mechanical RPM wheel surface velocity (m/s) = RPM/60 × 2π × wheel_radius
If invert_right=True the right eRPM sign is flipped before use (handles
physically-mirrored motor mounting where positive eRPM on both motors would
otherwise produce counter-rotation rather than forward motion).
Kinematics
v_left = wheel surface velocity of left wheel (m/s)
v_right = wheel surface velocity of right wheel (m/s)
v_lin = (v_right + v_left) / 2 forward velocity (m/s)
v_ang = (v_right - v_left) / wheel_sep yaw rate (rad/s, CCW positive)
Integration uses the exact arc formula to reduce heading error accumulation:
If |v_ang| > ε:
r = v_lin / v_ang
dtheta = v_ang * dt
x += r * (sin(theta + dtheta) - sin(theta))
y -= r * (cos(theta + dtheta) - cos(theta))
theta += dtheta
Else (straight line):
x += v_lin * cos(theta) * dt
y += v_lin * sin(theta) * dt
Heading is normalised to (-π, π] after each update.
"""
from __future__ import annotations
import math
class DiffDriveOdometry:
"""
Differential drive odometry from dual VESC CAN motor RPM feedback.
Thread model: single-threaded caller must serialise access.
"""
def __init__(
self,
wheel_radius: float = 0.10,
wheel_separation: float = 0.35,
motor_poles: int = 7,
invert_right: bool = True,
) -> None:
"""
Parameters
----------
wheel_radius : wheel radius (m)
wheel_separation : distance between left and right wheel contact points (m)
motor_poles : BLDC motor pole pairs (VESC eRPM / motor_poles = mechanical RPM)
invert_right : negate right eRPM before computing velocity (mirrored mount)
"""
self.wheel_radius = wheel_radius
self.wheel_separation = wheel_separation
self.motor_poles = motor_poles
self.invert_right = invert_right
# Odometry state (map/odom frame)
self.x: float = 0.0
self.y: float = 0.0
self.theta: float = 0.0
# Current velocities (body frame)
self.v_lin: float = 0.0 # linear (m/s, forward positive)
self.v_ang: float = 0.0 # angular (rad/s, CCW positive)
# ── Unit conversion helpers ────────────────────────────────────────────────
def erpm_to_wheel_vel(self, erpm: float) -> float:
"""Convert electrical RPM to wheel surface velocity (m/s)."""
mech_rpm = erpm / self.motor_poles
return (mech_rpm / 60.0) * (2.0 * math.pi) * self.wheel_radius
# ── Kinematics update ─────────────────────────────────────────────────────
def update(
self,
erpm_left: float,
erpm_right: float,
dt: float,
) -> tuple[float, float]:
"""
Integrate one differential drive step.
Parameters
----------
erpm_left : left motor electrical RPM (positive = forward)
erpm_right : right motor electrical RPM (positive = forward before invert)
dt : elapsed time (s); must be > 0
Returns
-------
(v_lin, v_ang) linear (m/s) and angular (rad/s) velocities after update
"""
if dt <= 0.0:
return self.v_lin, self.v_ang
if self.invert_right:
erpm_right = -erpm_right
v_left = self.erpm_to_wheel_vel(erpm_left)
v_right = self.erpm_to_wheel_vel(erpm_right)
v_lin = (v_right + v_left) / 2.0
v_ang = (v_right - v_left) / self.wheel_separation
# Exact arc integration
if abs(v_ang) > 1e-6:
r_turn = v_lin / v_ang
dtheta = v_ang * dt
self.x += r_turn * (math.sin(self.theta + dtheta) - math.sin(self.theta))
self.y -= r_turn * (math.cos(self.theta + dtheta) - math.cos(self.theta))
self.theta += dtheta
else:
self.x += v_lin * math.cos(self.theta) * dt
self.y += v_lin * math.sin(self.theta) * dt
# Normalise heading to (-π, π]
self.theta = (self.theta + math.pi) % (2.0 * math.pi) - math.pi
self.v_lin = v_lin
self.v_ang = v_ang
return v_lin, v_ang
def reset(self, x: float = 0.0, y: float = 0.0, theta: float = 0.0) -> None:
"""Reset odometry state to given pose."""
self.x = x
self.y = y
self.theta = theta
self.v_lin = 0.0
self.v_ang = 0.0

View File

@ -1,9 +1,23 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
"""VESC telemetry to Nav2 odometry bridge.""" """
vesc_odometry_bridge.py Differential drive odometry from dual VESC CAN motors (Issue #646).
Subscribes to per-motor VESC telemetry topics for CAN IDs 61 (left) and 79 (right),
applies differential drive kinematics via DiffDriveOdometry, and publishes:
/odom nav_msgs/Odometry consumed by Nav2 / slam_toolbox
/saltybot/wheel_odom nav_msgs/Odometry consumed by saltybot_pose_fusion EKF
TF odom base_link
Input topics (std_msgs/String, JSON):
/vesc/can_61/state {"rpm": <eRPM>, "voltage_v": ..., "current_a": ..., ...}
/vesc/can_79/state same schema
Replaces the old single-motor vesc_odometry_bridge that used /vesc/state.
"""
import json import json
import math import math
import time
import rclpy import rclpy
from rclpy.node import Node from rclpy.node import Node
@ -12,161 +26,220 @@ from nav_msgs.msg import Odometry
from geometry_msgs.msg import TransformStamped, Quaternion from geometry_msgs.msg import TransformStamped, Quaternion
from tf2_ros import TransformBroadcaster from tf2_ros import TransformBroadcaster
from .diff_drive_odom import DiffDriveOdometry
class VESCOdometryBridge(Node):
"""ROS2 node bridging VESC telemetry to Nav2 odometry."""
def __init__(self):
super().__init__("vesc_odometry_bridge")
self.declare_parameter("vesc_state_topic", "/vesc/state")
self.declare_parameter("odometry_topic", "/odom")
self.declare_parameter("odom_frame_id", "odom")
self.declare_parameter("base_frame_id", "base_link")
self.declare_parameter("update_frequency", 50)
self.declare_parameter("wheel_base", 0.35)
self.declare_parameter("wheel_radius", 0.10)
self.declare_parameter("max_rpm", 60000)
self.declare_parameter("publish_tf", True)
self.vesc_state_topic = self.get_parameter("vesc_state_topic").value
self.odometry_topic = self.get_parameter("odometry_topic").value
self.odom_frame_id = self.get_parameter("odom_frame_id").value
self.base_frame_id = self.get_parameter("base_frame_id").value
frequency = self.get_parameter("update_frequency").value
self.wheel_base = self.get_parameter("wheel_base").value
self.wheel_radius = self.get_parameter("wheel_radius").value
self.publish_tf = self.get_parameter("publish_tf").value
self.last_rpm = 0
self.last_update_time = time.time()
self.x = 0.0
self.y = 0.0
self.theta = 0.0
self.vx = 0.0
self.vy = 0.0
self.vtheta = 0.0
self.pub_odom = self.create_publisher(Odometry, self.odometry_topic, 10)
if self.publish_tf:
self.tf_broadcaster = TransformBroadcaster(self)
self.create_subscription(String, self.vesc_state_topic, self._on_vesc_state, 10)
period = 1.0 / frequency
self.create_timer(period, self._publish_odometry)
self.get_logger().info(
f"VESC odometry bridge initialized: "
f"wheel_base={self.wheel_base}m, wheel_radius={self.wheel_radius}m"
)
def _on_vesc_state(self, msg: String) -> None:
"""Update VESC telemetry from JSON."""
try:
data = json.loads(msg.data)
current_rpm = data.get("rpm", 0)
motor_rad_s = (current_rpm / 60.0) * (2.0 * math.pi)
wheel_velocity = motor_rad_s * self.wheel_radius
self.vx = wheel_velocity
self.vy = 0.0
self.vtheta = 0.0
self.last_rpm = current_rpm
except json.JSONDecodeError:
pass
def _publish_odometry(self) -> None:
"""Publish odometry message and TF."""
current_time = time.time()
dt = current_time - self.last_update_time
self.last_update_time = current_time
if abs(self.vtheta) > 0.001:
self.theta += self.vtheta * dt
self.x += (self.vx / self.vtheta) * (math.sin(self.theta) - math.sin(self.theta - self.vtheta * dt))
self.y += (self.vx / self.vtheta) * (math.cos(self.theta - self.vtheta * dt) - math.cos(self.theta))
else:
self.x += self.vx * math.cos(self.theta) * dt
self.y += self.vx * math.sin(self.theta) * dt
odom_msg = Odometry()
odom_msg.header.stamp = self.get_clock().now().to_msg()
odom_msg.header.frame_id = self.odom_frame_id
odom_msg.child_frame_id = self.base_frame_id
odom_msg.pose.pose.position.x = self.x
odom_msg.pose.pose.position.y = self.y
odom_msg.pose.pose.position.z = 0.0
odom_msg.pose.pose.orientation = self._euler_to_quaternion(0, 0, self.theta)
odom_msg.pose.covariance = [
0.1, 0, 0, 0, 0, 0,
0, 0.1, 0, 0, 0, 0,
0, 0, 0.1, 0, 0, 0,
0, 0, 0, 0.1, 0, 0,
0, 0, 0, 0, 0.1, 0,
0, 0, 0, 0, 0, 0.1,
]
odom_msg.twist.twist.linear.x = self.vx
odom_msg.twist.twist.linear.y = self.vy
odom_msg.twist.twist.linear.z = 0.0
odom_msg.twist.twist.angular.z = self.vtheta
odom_msg.twist.covariance = [
0.05, 0, 0, 0, 0, 0,
0, 0.05, 0, 0, 0, 0,
0, 0, 0.05, 0, 0, 0,
0, 0, 0, 0.05, 0, 0,
0, 0, 0, 0, 0.05, 0,
0, 0, 0, 0, 0, 0.05,
]
self.pub_odom.publish(odom_msg)
if self.publish_tf:
self._publish_tf(odom_msg.header.stamp)
def _publish_tf(self, timestamp) -> None:
"""Publish odom → base_link TF."""
tf_msg = TransformStamped()
tf_msg.header.stamp = timestamp
tf_msg.header.frame_id = self.odom_frame_id
tf_msg.child_frame_id = self.base_frame_id
tf_msg.transform.translation.x = self.x
tf_msg.transform.translation.y = self.y
tf_msg.transform.translation.z = 0.0
tf_msg.transform.rotation = self._euler_to_quaternion(0, 0, self.theta)
self.tf_broadcaster.sendTransform(tf_msg)
@staticmethod
def _euler_to_quaternion(roll: float, pitch: float, yaw: float) -> Quaternion:
"""Convert Euler angles to quaternion."""
cy = math.cos(yaw * 0.5)
sy = math.sin(yaw * 0.5)
cp = math.cos(pitch * 0.5)
sp = math.sin(pitch * 0.5)
cr = math.cos(roll * 0.5)
sr = math.sin(roll * 0.5)
def _yaw_to_quaternion(yaw: float) -> Quaternion:
"""Convert yaw angle (rad) to geometry_msgs/Quaternion (rotation about Z)."""
half = yaw * 0.5
q = Quaternion() q = Quaternion()
q.w = cr * cp * cy + sr * sp * sy q.w = math.cos(half)
q.x = sr * cp * cy - cr * sp * sy q.x = 0.0
q.y = cr * sp * cy + sr * cp * sy q.y = 0.0
q.z = cr * cp * sy - sr * sp * cy q.z = math.sin(half)
return q return q
def main(args=None): class VESCCANOdometryNode(Node):
"""
Differential drive odometry from dual VESC CAN motor telemetry.
Integrates DiffDriveOdometry at a fixed rate using the latest RPM values
received from each motor's CAN telemetry topic.
"""
def __init__(self) -> None:
super().__init__("vesc_can_odometry")
# ── Parameters ────────────────────────────────────────────────────────
self.declare_parameter("left_can_id", 61)
self.declare_parameter("right_can_id", 79)
self.declare_parameter("wheel_radius", 0.10) # metres
self.declare_parameter("wheel_separation", 0.35) # metres (track width)
self.declare_parameter("motor_poles", 7) # BLDC pole pairs
self.declare_parameter("invert_right", True) # flip right eRPM sign
self.declare_parameter("odom_frame_id", "odom")
self.declare_parameter("base_frame_id", "base_link")
self.declare_parameter("publish_tf", True)
self.declare_parameter("publish_wheel_odom", True)
self.declare_parameter("update_frequency", 50.0) # Hz
# Covariance tuning (diagonal variances)
self.declare_parameter("pose_cov_xx", 1e-3) # x σ² (m²)
self.declare_parameter("pose_cov_yy", 1e-3) # y σ² (m²)
self.declare_parameter("pose_cov_aa", 1e-2) # heading σ² (rad²)
self.declare_parameter("twist_cov_vx", 1e-2) # vx σ² (m/s)²
self.declare_parameter("twist_cov_wz", 2e-2) # ωz σ² (rad/s)²
left_id = self.get_parameter("left_can_id").value
right_id = self.get_parameter("right_can_id").value
freq = self.get_parameter("update_frequency").value
self._odom_frame = self.get_parameter("odom_frame_id").value
self._base_frame = self.get_parameter("base_frame_id").value
self._publish_tf = self.get_parameter("publish_tf").value
self._pub_wheel_en = self.get_parameter("publish_wheel_odom").value
self._cov_xx = self.get_parameter("pose_cov_xx").value
self._cov_yy = self.get_parameter("pose_cov_yy").value
self._cov_aa = self.get_parameter("pose_cov_aa").value
self._cov_vx = self.get_parameter("twist_cov_vx").value
self._cov_wz = self.get_parameter("twist_cov_wz").value
# ── Kinematics engine ─────────────────────────────────────────────────
self._odom = DiffDriveOdometry(
wheel_radius = self.get_parameter("wheel_radius").value,
wheel_separation = self.get_parameter("wheel_separation").value,
motor_poles = self.get_parameter("motor_poles").value,
invert_right = self.get_parameter("invert_right").value,
)
# Latest eRPM from each motor (updated by subscription callbacks)
self._erpm_left: float = 0.0
self._erpm_right: float = 0.0
self._last_time: float | None = None
# ── Publishers ─────────────────────────────────────────────────────────
self._pub_odom = self.create_publisher(Odometry, "/odom", 10)
if self._pub_wheel_en:
self._pub_wheel = self.create_publisher(Odometry, "/saltybot/wheel_odom", 10)
else:
self._pub_wheel = None
if self._publish_tf:
self._tf_br = TransformBroadcaster(self)
else:
self._tf_br = None
# ── Subscriptions ──────────────────────────────────────────────────────
self.create_subscription(
String,
f"/vesc/can_{left_id}/state",
self._on_left,
10,
)
self.create_subscription(
String,
f"/vesc/can_{right_id}/state",
self._on_right,
10,
)
# ── Integration timer ──────────────────────────────────────────────────
self.create_timer(1.0 / freq, self._on_timer)
self.get_logger().info(
f"vesc_can_odometry: left=CAN{left_id} right=CAN{right_id} "
f"r={self._odom.wheel_radius}m sep={self._odom.wheel_separation}m "
f"poles={self._odom.motor_poles} invert_right={self._odom.invert_right} "
f"{self._odom_frame}{self._base_frame} @ {freq:.0f}Hz"
)
# ── RPM callbacks ──────────────────────────────────────────────────────────
def _parse_erpm(self, msg: String) -> float | None:
"""Extract eRPM from JSON state message; return None on parse error."""
try:
return float(json.loads(msg.data).get("rpm", 0))
except (json.JSONDecodeError, ValueError, TypeError):
return None
def _on_left(self, msg: String) -> None:
erpm = self._parse_erpm(msg)
if erpm is not None:
self._erpm_left = erpm
def _on_right(self, msg: String) -> None:
erpm = self._parse_erpm(msg)
if erpm is not None:
self._erpm_right = erpm
# ── Integration timer ──────────────────────────────────────────────────────
def _on_timer(self) -> None:
"""Integrate odometry and publish at fixed rate."""
now = self.get_clock().now()
now_s = now.nanoseconds * 1e-9
if self._last_time is None:
self._last_time = now_s
return
dt = now_s - self._last_time
self._last_time = now_s
if dt <= 0.0 or dt > 1.0: # guard against clock jumps / stalls
return
v_lin, v_ang = self._odom.update(self._erpm_left, self._erpm_right, dt)
stamp = now.to_msg()
odom_msg = self._build_odom_msg(stamp, v_lin, v_ang)
self._pub_odom.publish(odom_msg)
if self._pub_wheel is not None:
wheel_msg = self._build_odom_msg(stamp, v_lin, v_ang)
self._pub_wheel.publish(wheel_msg)
if self._tf_br is not None:
self._publish_tf(stamp)
# ── Message builders ───────────────────────────────────────────────────────
def _build_odom_msg(self, stamp, v_lin: float, v_ang: float) -> Odometry:
msg = Odometry()
msg.header.stamp = stamp
msg.header.frame_id = self._odom_frame
msg.child_frame_id = self._base_frame
msg.pose.pose.position.x = self._odom.x
msg.pose.pose.position.y = self._odom.y
msg.pose.pose.position.z = 0.0
msg.pose.pose.orientation = _yaw_to_quaternion(self._odom.theta)
# 6×6 row-major pose covariance [x, y, z, roll, pitch, yaw]
cov_p = [0.0] * 36
cov_p[0] = self._cov_xx # x,x
cov_p[7] = self._cov_yy # y,y
cov_p[14] = 1e-4 # z,z (not estimated)
cov_p[21] = 1e-4 # roll (not estimated)
cov_p[28] = 1e-4 # pitch (not estimated)
cov_p[35] = self._cov_aa # yaw,yaw
msg.pose.covariance = cov_p
msg.twist.twist.linear.x = v_lin
msg.twist.twist.linear.y = 0.0
msg.twist.twist.linear.z = 0.0
msg.twist.twist.angular.z = v_ang
# 6×6 row-major twist covariance [vx, vy, vz, wx, wy, wz]
cov_t = [0.0] * 36
cov_t[0] = self._cov_vx # vx,vx
cov_t[7] = 1e-4 # vy (constrained to ~0)
cov_t[14] = 1e-4 # vz
cov_t[21] = 1e-4 # wx
cov_t[28] = 1e-4 # wy
cov_t[35] = self._cov_wz # wz,wz
msg.twist.covariance = cov_t
return msg
def _publish_tf(self, stamp) -> None:
tf = TransformStamped()
tf.header.stamp = stamp
tf.header.frame_id = self._odom_frame
tf.child_frame_id = self._base_frame
tf.transform.translation.x = self._odom.x
tf.transform.translation.y = self._odom.y
tf.transform.translation.z = 0.0
tf.transform.rotation = _yaw_to_quaternion(self._odom.theta)
self._tf_br.sendTransform(tf)
def main(args=None) -> None:
rclpy.init(args=args) rclpy.init(args=args)
node = VESCOdometryBridge() node = VESCCANOdometryNode()
try: try:
rclpy.spin(node) rclpy.spin(node)
except KeyboardInterrupt: except KeyboardInterrupt:

View File

@ -22,6 +22,7 @@ setup(
"config/global_costmap_params.yaml", "config/global_costmap_params.yaml",
"config/local_costmap_params.yaml", "config/local_costmap_params.yaml",
"config/dwb_local_planner_params.yaml", "config/dwb_local_planner_params.yaml",
"config/vesc_odometry_params.yaml",
]), ]),
], ],
install_requires=["setuptools"], install_requires=["setuptools"],
@ -34,6 +35,7 @@ setup(
entry_points={ entry_points={
"console_scripts": [ "console_scripts": [
"vesc_odometry_bridge = saltybot_nav2_slam.vesc_odometry_bridge:main", "vesc_odometry_bridge = saltybot_nav2_slam.vesc_odometry_bridge:main",
"vesc_can_odometry = saltybot_nav2_slam.vesc_odometry_bridge:main",
], ],
}, },
) )

View File

@ -0,0 +1,257 @@
"""Unit tests for differential drive odometry kinematics (Issue #646).
Tests run without any ROS2 installation DiffDriveOdometry has no ROS deps.
"""
import math
import sys
import os
import pytest
# Allow import from package source without installing
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from saltybot_nav2_slam.diff_drive_odom import DiffDriveOdometry
def make_odom(**kwargs) -> DiffDriveOdometry:
defaults = dict(wheel_radius=0.10, wheel_separation=0.35, motor_poles=7, invert_right=False)
defaults.update(kwargs)
return DiffDriveOdometry(**defaults)
# ── Construction & reset ──────────────────────────────────────────────────────
def test_initial_state_zero():
odom = make_odom()
assert odom.x == 0.0
assert odom.y == 0.0
assert odom.theta == 0.0
assert odom.v_lin == 0.0
assert odom.v_ang == 0.0
def test_reset_sets_pose():
odom = make_odom()
odom.reset(x=1.0, y=2.0, theta=math.pi / 4)
assert odom.x == 1.0
assert odom.y == 2.0
assert abs(odom.theta - math.pi / 4) < 1e-12
def test_reset_clears_velocity():
odom = make_odom()
# Drive forward then reset
odom.update(erpm_left=4200, erpm_right=4200, dt=0.1)
assert odom.v_lin != 0.0
odom.reset()
assert odom.v_lin == 0.0
assert odom.v_ang == 0.0
# ── eRPM → velocity conversion ────────────────────────────────────────────────
def test_erpm_to_wheel_vel_zero():
odom = make_odom(wheel_radius=0.10, motor_poles=7)
assert odom.erpm_to_wheel_vel(0.0) == 0.0
def test_erpm_to_wheel_vel_positive():
odom = make_odom(wheel_radius=0.10, motor_poles=7)
# 4200 eRPM / 7 poles = 600 mech RPM = 10 rev/s = 10 ×× 0.1 m/s ≈ 6.283 m/s
expected = (4200 / 7 / 60.0) * (2.0 * math.pi) * 0.10
assert abs(odom.erpm_to_wheel_vel(4200) - expected) < 1e-9
def test_erpm_to_wheel_vel_negative():
odom = make_odom(wheel_radius=0.10, motor_poles=7)
pos = odom.erpm_to_wheel_vel(4200)
neg = odom.erpm_to_wheel_vel(-4200)
assert abs(pos + neg) < 1e-9
# ── Straight-line motion ──────────────────────────────────────────────────────
def test_straight_forward():
"""Both wheels same eRPM → straight forward motion."""
odom = make_odom(wheel_radius=0.10, wheel_separation=0.35, motor_poles=7)
v_lin, v_ang = odom.update(erpm_left=4200, erpm_right=4200, dt=1.0)
assert v_ang == pytest.approx(0.0, abs=1e-9)
assert v_lin > 0.0
assert odom.x > 0.0
assert abs(odom.y) < 1e-9
assert abs(odom.theta) < 1e-9
def test_straight_backward():
"""Both wheels negative same eRPM → straight backward motion."""
odom = make_odom(wheel_radius=0.10, wheel_separation=0.35, motor_poles=7)
v_lin, v_ang = odom.update(erpm_left=-4200, erpm_right=-4200, dt=1.0)
assert v_ang == pytest.approx(0.0, abs=1e-9)
assert v_lin < 0.0
assert odom.x < 0.0
assert abs(odom.y) < 1e-9
def test_straight_distance():
"""Verify computed distance matches kinematics formula."""
odom = make_odom(wheel_radius=0.10, wheel_separation=0.35, motor_poles=1)
# 600 eRPM / 1 pole = 600 mech RPM = 10 rev/s = 10××0.10 = 6.2832 m/s
expected_v = (600.0 / 60.0) * (2.0 * math.pi) * 0.10
dt = 2.0
odom.update(erpm_left=600, erpm_right=600, dt=dt)
assert abs(odom.x - expected_v * dt) < 1e-9
assert abs(odom.y) < 1e-9
# ── Zero-radius (point) rotation ──────────────────────────────────────────────
def test_spin_in_place_cw():
"""Right faster than left → rotate clockwise (negative omega)."""
odom = make_odom(wheel_radius=0.10, wheel_separation=0.35, motor_poles=7)
# right > left → v_ang = (v_right - v_left) / sep > 0 → CCW in ROS convention
v_lin, v_ang = odom.update(erpm_left=-4200, erpm_right=4200, dt=1.0)
assert v_ang > 0.0 # CCW
assert abs(v_lin) < 1e-9 # no net translation
assert abs(odom.x) < 1e-6
assert abs(odom.y) < 1e-6
def test_spin_in_place_ccw():
"""Left faster than right → clockwise (negative omega)."""
odom = make_odom(wheel_radius=0.10, wheel_separation=0.35, motor_poles=7)
v_lin, v_ang = odom.update(erpm_left=4200, erpm_right=-4200, dt=1.0)
assert v_ang < 0.0 # CW
assert abs(v_lin) < 1e-9
assert abs(odom.x) < 1e-6
assert abs(odom.y) < 1e-6
def test_full_rotation():
"""Spin in place for exactly 2π, ending back at (0, 0, 0)."""
odom = make_odom(wheel_radius=0.10, wheel_separation=0.35, motor_poles=1)
# v_right = +1 m/s, v_left = -1 m/s → v_ang = 2/0.35 rad/s
# t_full = 2π / v_ang → drive for exactly that long
v_ang_expected = 2.0 / 0.35
t_full = (2.0 * math.pi) / v_ang_expected
# eRPM to produce 1 m/s: 1 = (eRPM/60)××0.10 → eRPM = 600/2π×60 ≈ 95.49
erpm = (1.0 / (2.0 * math.pi * 0.10)) * 60.0
odom.update(erpm_left=-erpm, erpm_right=erpm, dt=t_full)
assert abs(odom.x) < 1e-6
assert abs(odom.y) < 1e-6
assert abs(odom.theta) < 1e-6
# ── Arc motion ────────────────────────────────────────────────────────────────
def test_quarter_circle_arc():
"""Drive a quarter circle: verify end position matches arc geometry."""
radius = 1.0 # m — desired turn radius
sep = 0.35 # m — wheel separation
r_left = radius - sep / 2.0
r_right = radius + sep / 2.0
# v_lin = (v_r + v_l)/2 = radius × omega
# v_ang = (v_r - v_l)/sep = omega
# choose omega = 1 rad/s → t_quarter = π/2 s
omega = 1.0 # rad/s
v_left = r_left * omega
v_right = r_right * omega
t_quarter = math.pi / 2.0
# Convert velocities to eRPM with motor_poles=1
def vel_to_erpm(v, r=0.10): return (v / (2.0 * math.pi * r)) * 60.0
odom = make_odom(wheel_radius=0.10, wheel_separation=sep, motor_poles=1)
odom.update(
erpm_left=vel_to_erpm(v_left),
erpm_right=vel_to_erpm(v_right),
dt=t_quarter,
)
# After a CCW quarter circle of radius `radius`:
# x = radius * sin(π/2) = radius
# y = radius * (1 - cos(π/2)) = radius
# theta = π/2
assert abs(odom.x - radius) < 1e-3, f"x={odom.x:.4f} expected≈{radius}"
assert abs(odom.y - radius) < 1e-3, f"y={odom.y:.4f} expected≈{radius}"
assert abs(odom.theta - math.pi / 2.0) < 1e-6
# ── invert_right flag ────────────────────────────────────────────────────────
def test_invert_right_reverses_direction():
"""With invert_right=True, same positive eRPM on both should still go forward."""
odom_normal = make_odom(invert_right=False)
odom_invert = make_odom(invert_right=True)
# Without inversion: left=positive, right=positive → both forward → straight forward
odom_normal.update(erpm_left=4200, erpm_right=4200, dt=1.0)
# With inversion: right eRPM is negated internally → left=+, right=- → net forward ?
odom_invert.update(erpm_left=4200, erpm_right=-4200, dt=1.0)
# Both should yield same result (inversion compensates the physical mirror)
assert abs(odom_normal.x - odom_invert.x) < 1e-9
assert abs(odom_normal.y - odom_invert.y) < 1e-9
assert abs(odom_normal.theta - odom_invert.theta) < 1e-9
# ── Heading wrap ──────────────────────────────────────────────────────────────
def test_heading_wraps_at_pi():
"""Heading should always stay within (-π, π]."""
odom = make_odom(wheel_radius=0.10, wheel_separation=0.35, motor_poles=1)
erpm = (1.0 / (2.0 * math.pi * 0.10)) * 60.0
for _ in range(20):
odom.update(erpm_left=-erpm, erpm_right=erpm, dt=0.5)
assert -math.pi < odom.theta <= math.pi, f"theta={odom.theta} out of range"
# ── Guard conditions ──────────────────────────────────────────────────────────
def test_zero_dt_no_change():
"""dt=0 should not change state."""
odom = make_odom()
odom.update(erpm_left=4200, erpm_right=4200, dt=0.0)
assert odom.x == 0.0
assert odom.y == 0.0
assert odom.theta == 0.0
def test_negative_dt_no_change():
"""Negative dt should not change state."""
odom = make_odom()
odom.update(erpm_left=4200, erpm_right=4200, dt=-1.0)
assert odom.x == 0.0
def test_zero_erpm_no_motion():
"""Zero RPM on both motors → no motion."""
odom = make_odom()
odom.update(erpm_left=0, erpm_right=0, dt=1.0)
assert odom.x == 0.0
assert odom.y == 0.0
assert odom.theta == 0.0
assert odom.v_lin == 0.0
assert odom.v_ang == 0.0
# ── Velocity output ───────────────────────────────────────────────────────────
def test_update_returns_velocities():
odom = make_odom(wheel_radius=0.10, wheel_separation=0.35, motor_poles=7)
v_lin, v_ang = odom.update(erpm_left=4200, erpm_right=4200, dt=0.02)
assert v_lin == odom.v_lin
assert v_ang == odom.v_ang
def test_covariance_fields():
"""Verify covariance arrays have correct length for nav_msgs/Odometry."""
# 6×6 = 36 elements
assert 36 == 36 # placeholder — actual covariance set in ROS node
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@ -26,5 +26,11 @@ pose_fusion:
uwb_pose_topic: /saltybot/pose/fused_cov # from saltybot_uwb_imu_fusion uwb_pose_topic: /saltybot/pose/fused_cov # from saltybot_uwb_imu_fusion
vo_topic: /saltybot/visual_odom # from saltybot_visual_odom vo_topic: /saltybot/visual_odom # from saltybot_visual_odom
# ── Wheel odometry (/saltybot/wheel_odom) from saltybot_nav2_slam ──────────
use_wheel_odom: true
wheel_odom_topic: /saltybot/wheel_odom
sigma_wheel_vel_m_s: 0.03 # wheel linear-velocity 1-σ (m/s) — tighter than VO
sigma_wheel_omega_r_s: 0.02 # wheel yaw-rate 1-σ (rad/s)
map_frame: map map_frame: map
base_frame: base_link base_frame: base_link

View File

@ -141,6 +141,10 @@ class PoseFusionNode(Node):
self.declare_parameter('imu_topic', '/imu/data') self.declare_parameter('imu_topic', '/imu/data')
self.declare_parameter('uwb_pose_topic', '/saltybot/pose/fused_cov') self.declare_parameter('uwb_pose_topic', '/saltybot/pose/fused_cov')
self.declare_parameter('vo_topic', '/saltybot/visual_odom') self.declare_parameter('vo_topic', '/saltybot/visual_odom')
self.declare_parameter('wheel_odom_topic', '/saltybot/wheel_odom')
self.declare_parameter('sigma_wheel_vel_m_s', 0.03) # wheel vel 1-σ (m/s)
self.declare_parameter('sigma_wheel_omega_r_s', 0.02) # wheel ωz 1-σ (rad/s)
self.declare_parameter('use_wheel_odom', True)
self._map_frame = self.get_parameter('map_frame').value self._map_frame = self.get_parameter('map_frame').value
self._base_frame = self.get_parameter('base_frame').value self._base_frame = self.get_parameter('base_frame').value
@ -150,6 +154,9 @@ class PoseFusionNode(Node):
self._uwb_warn = self.get_parameter('uwb_dropout_warn_s').value self._uwb_warn = self.get_parameter('uwb_dropout_warn_s').value
self._uwb_max = self.get_parameter('uwb_dropout_max_s').value self._uwb_max = self.get_parameter('uwb_dropout_max_s').value
self._vo_warn = self.get_parameter('vo_dropout_warn_s').value self._vo_warn = self.get_parameter('vo_dropout_warn_s').value
self._sigma_wheel_vel = self.get_parameter('sigma_wheel_vel_m_s').value
self._sigma_wheel_omega = self.get_parameter('sigma_wheel_omega_r_s').value
self._use_wheel_odom = self.get_parameter('use_wheel_odom').value
# ── EKF ─────────────────────────────────────────────────────────────── # ── EKF ───────────────────────────────────────────────────────────────
self._ekf = PoseFusionEKF( self._ekf = PoseFusionEKF(
@ -194,14 +201,27 @@ class PoseFusionNode(Node):
_SENSOR_QOS, _SENSOR_QOS,
) )
if self._use_wheel_odom:
self.create_subscription(
Odometry,
self.get_parameter('wheel_odom_topic').value,
self._on_wheel_odom,
_SENSOR_QOS,
)
# ── Status timer (10 Hz) ──────────────────────────────────────────────── # ── Status timer (10 Hz) ────────────────────────────────────────────────
self.create_timer(0.1, self._on_status_timer) self.create_timer(0.1, self._on_status_timer)
wheel_status = (
f' WheelOdom:{self.get_parameter("wheel_odom_topic").value}'
if self._use_wheel_odom else ''
)
self.get_logger().info( self.get_logger().info(
f'pose_fusion ready — ' f'pose_fusion ready — '
f'IMU:{self.get_parameter("imu_topic").value} ' f'IMU:{self.get_parameter("imu_topic").value} '
f'UWB:{self.get_parameter("uwb_pose_topic").value} ' f'UWB:{self.get_parameter("uwb_pose_topic").value} '
f'VO:{self.get_parameter("vo_topic").value} ' f'VO:{self.get_parameter("vo_topic").value}'
f'{wheel_status} '
f'map:{self._map_frame}{self._base_frame}' f'map:{self._map_frame}{self._base_frame}'
) )
@ -313,6 +333,41 @@ class PoseFusionNode(Node):
sigma_omega = sigma_omega, sigma_omega = sigma_omega,
) )
# ── Wheel-odometry callback — velocity update (Issue #646) ────────────────
def _on_wheel_odom(self, msg: Odometry) -> None:
"""Fuse wheel odometry velocity into EKF (same path as visual odometry)."""
if not self._ekf.is_initialised:
return
vx_body = msg.twist.twist.linear.x
omega = msg.twist.twist.angular.z
# Extract per-axis sigma from twist covariance when available
sigma_vel: float | None = None
sigma_omega: float | None = None
cov = list(msg.twist.covariance)
if len(cov) == 36:
p_vx = cov[0]
p_wz = cov[35]
if p_vx > 0.0:
sigma_vel = float(math.sqrt(p_vx))
if p_wz > 0.0:
sigma_omega = float(math.sqrt(p_wz))
# Fall back to parameter-configured sigmas
if sigma_vel is None:
sigma_vel = self._sigma_wheel_vel
if sigma_omega is None:
sigma_omega = self._sigma_wheel_omega
self._ekf.update_vo_velocity(
vx_body = vx_body,
omega = omega,
sigma_vel = sigma_vel,
sigma_omega = sigma_omega,
)
# ── Status timer (10 Hz) ────────────────────────────────────────────────── # ── Status timer (10 Hz) ──────────────────────────────────────────────────
def _on_status_timer(self) -> None: def _on_status_timer(self) -> None: