Merge remote-tracking branch 'origin/sl-perception/issue-646-vesc-odometry'
This commit is contained in:
commit
d9b4b10b90
@ -0,0 +1,28 @@
|
|||||||
|
vesc_can_odometry:
|
||||||
|
ros__parameters:
|
||||||
|
# ── CAN motor IDs ────────────────────────────────────────────────────────
|
||||||
|
left_can_id: 61 # left motor VESC CAN ID → /vesc/can_61/state
|
||||||
|
right_can_id: 79 # right motor VESC CAN ID → /vesc/can_79/state
|
||||||
|
|
||||||
|
# ── Drive geometry ───────────────────────────────────────────────────────
|
||||||
|
wheel_radius: 0.10 # wheel radius (m)
|
||||||
|
wheel_separation: 0.35 # track width: centre-to-centre (m)
|
||||||
|
motor_poles: 7 # BLDC pole pairs (eRPM / motor_poles = mech RPM)
|
||||||
|
invert_right: true # negate right eRPM (motors physically mirrored)
|
||||||
|
|
||||||
|
# ── Frames & topics ──────────────────────────────────────────────────────
|
||||||
|
odom_frame_id: odom
|
||||||
|
base_frame_id: base_link
|
||||||
|
publish_tf: true
|
||||||
|
publish_wheel_odom: true # also publish on /saltybot/wheel_odom for pose_fusion
|
||||||
|
|
||||||
|
# ── Timing ───────────────────────────────────────────────────────────────
|
||||||
|
update_frequency: 50.0 # Hz — integration rate
|
||||||
|
|
||||||
|
# ── Covariance (diagonal variances) ─────────────────────────────────────
|
||||||
|
# Increase these values if the robot slips frequently.
|
||||||
|
pose_cov_xx: 0.001 # x variance (m²)
|
||||||
|
pose_cov_yy: 0.001 # y variance (m²)
|
||||||
|
pose_cov_aa: 0.010 # heading variance (rad²)
|
||||||
|
twist_cov_vx: 0.010 # vx variance ((m/s)²)
|
||||||
|
twist_cov_wz: 0.020 # ωz variance ((rad/s)²)
|
||||||
@ -1,4 +1,4 @@
|
|||||||
"""VESC Telemetry to Odometry Bridge"""
|
"""VESC CAN dual-motor odometry bridge (Issue #646)."""
|
||||||
|
|
||||||
import os
|
import os
|
||||||
from launch import LaunchDescription
|
from launch import LaunchDescription
|
||||||
@ -9,28 +9,22 @@ from ament_index_python.packages import get_package_share_directory
|
|||||||
|
|
||||||
|
|
||||||
def generate_launch_description():
|
def generate_launch_description():
|
||||||
use_sim_time = LaunchConfiguration('use_sim_time')
|
pkg = get_package_share_directory("saltybot_nav2_slam")
|
||||||
|
params_file = os.path.join(pkg, "config", "vesc_odometry_params.yaml")
|
||||||
|
|
||||||
|
use_sim_time = LaunchConfiguration("use_sim_time")
|
||||||
|
|
||||||
return LaunchDescription([
|
return LaunchDescription([
|
||||||
DeclareLaunchArgument('use_sim_time', default_value='false'),
|
DeclareLaunchArgument("use_sim_time", default_value="false"),
|
||||||
|
|
||||||
Node(
|
Node(
|
||||||
package='saltybot_nav2_slam',
|
package="saltybot_nav2_slam",
|
||||||
executable='vesc_odometry_bridge',
|
executable="vesc_can_odometry",
|
||||||
name='vesc_odometry_bridge',
|
name="vesc_can_odometry",
|
||||||
parameters=[{
|
parameters=[
|
||||||
'use_sim_time': use_sim_time,
|
params_file,
|
||||||
'vesc_state_topic': '/vesc/state',
|
{"use_sim_time": use_sim_time},
|
||||||
'odometry_topic': '/odom',
|
],
|
||||||
'odom_frame_id': 'odom',
|
output="screen",
|
||||||
'base_frame_id': 'base_link',
|
|
||||||
'update_frequency': 50,
|
|
||||||
'wheel_base': 0.35,
|
|
||||||
'wheel_radius': 0.10,
|
|
||||||
'max_rpm': 60000,
|
|
||||||
'publish_tf': True,
|
|
||||||
'tf_queue_size': 10,
|
|
||||||
}],
|
|
||||||
output='screen',
|
|
||||||
),
|
),
|
||||||
])
|
])
|
||||||
|
|||||||
@ -0,0 +1,145 @@
|
|||||||
|
"""
|
||||||
|
diff_drive_odom.py — Pure differential drive odometry kinematics (Issue #646).
|
||||||
|
|
||||||
|
No ROS2 dependencies — pure Python, fully unit-testable.
|
||||||
|
|
||||||
|
Computes x, y, theta, v_lin, v_ang from per-wheel electrical RPM using exact
|
||||||
|
arc integration. Designed to be wrapped by a ROS2 node.
|
||||||
|
|
||||||
|
Motor convention
|
||||||
|
────────────────
|
||||||
|
eRPM → mechanical RPM = eRPM / motor_poles (VESC reports electrical RPM)
|
||||||
|
Mechanical RPM → wheel surface velocity (m/s) = RPM/60 × 2π × wheel_radius
|
||||||
|
|
||||||
|
If invert_right=True the right eRPM sign is flipped before use (handles
|
||||||
|
physically-mirrored motor mounting where positive eRPM on both motors would
|
||||||
|
otherwise produce counter-rotation rather than forward motion).
|
||||||
|
|
||||||
|
Kinematics
|
||||||
|
──────────
|
||||||
|
v_left = wheel surface velocity of left wheel (m/s)
|
||||||
|
v_right = wheel surface velocity of right wheel (m/s)
|
||||||
|
v_lin = (v_right + v_left) / 2 forward velocity (m/s)
|
||||||
|
v_ang = (v_right - v_left) / wheel_sep yaw rate (rad/s, CCW positive)
|
||||||
|
|
||||||
|
Integration uses the exact arc formula to reduce heading error accumulation:
|
||||||
|
If |v_ang| > ε:
|
||||||
|
r = v_lin / v_ang
|
||||||
|
dtheta = v_ang * dt
|
||||||
|
x += r * (sin(theta + dtheta) - sin(theta))
|
||||||
|
y -= r * (cos(theta + dtheta) - cos(theta))
|
||||||
|
theta += dtheta
|
||||||
|
Else (straight line):
|
||||||
|
x += v_lin * cos(theta) * dt
|
||||||
|
y += v_lin * sin(theta) * dt
|
||||||
|
|
||||||
|
Heading is normalised to (-π, π] after each update.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import math
|
||||||
|
|
||||||
|
|
||||||
|
class DiffDriveOdometry:
|
||||||
|
"""
|
||||||
|
Differential drive odometry from dual VESC CAN motor RPM feedback.
|
||||||
|
|
||||||
|
Thread model: single-threaded — caller must serialise access.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
wheel_radius: float = 0.10,
|
||||||
|
wheel_separation: float = 0.35,
|
||||||
|
motor_poles: int = 7,
|
||||||
|
invert_right: bool = True,
|
||||||
|
) -> None:
|
||||||
|
"""
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
wheel_radius : wheel radius (m)
|
||||||
|
wheel_separation : distance between left and right wheel contact points (m)
|
||||||
|
motor_poles : BLDC motor pole pairs (VESC eRPM / motor_poles = mechanical RPM)
|
||||||
|
invert_right : negate right eRPM before computing velocity (mirrored mount)
|
||||||
|
"""
|
||||||
|
self.wheel_radius = wheel_radius
|
||||||
|
self.wheel_separation = wheel_separation
|
||||||
|
self.motor_poles = motor_poles
|
||||||
|
self.invert_right = invert_right
|
||||||
|
|
||||||
|
# Odometry state (map/odom frame)
|
||||||
|
self.x: float = 0.0
|
||||||
|
self.y: float = 0.0
|
||||||
|
self.theta: float = 0.0
|
||||||
|
|
||||||
|
# Current velocities (body frame)
|
||||||
|
self.v_lin: float = 0.0 # linear (m/s, forward positive)
|
||||||
|
self.v_ang: float = 0.0 # angular (rad/s, CCW positive)
|
||||||
|
|
||||||
|
# ── Unit conversion helpers ────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def erpm_to_wheel_vel(self, erpm: float) -> float:
|
||||||
|
"""Convert electrical RPM to wheel surface velocity (m/s)."""
|
||||||
|
mech_rpm = erpm / self.motor_poles
|
||||||
|
return (mech_rpm / 60.0) * (2.0 * math.pi) * self.wheel_radius
|
||||||
|
|
||||||
|
# ── Kinematics update ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def update(
|
||||||
|
self,
|
||||||
|
erpm_left: float,
|
||||||
|
erpm_right: float,
|
||||||
|
dt: float,
|
||||||
|
) -> tuple[float, float]:
|
||||||
|
"""
|
||||||
|
Integrate one differential drive step.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
erpm_left : left motor electrical RPM (positive = forward)
|
||||||
|
erpm_right : right motor electrical RPM (positive = forward before invert)
|
||||||
|
dt : elapsed time (s); must be > 0
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
(v_lin, v_ang) — linear (m/s) and angular (rad/s) velocities after update
|
||||||
|
"""
|
||||||
|
if dt <= 0.0:
|
||||||
|
return self.v_lin, self.v_ang
|
||||||
|
|
||||||
|
if self.invert_right:
|
||||||
|
erpm_right = -erpm_right
|
||||||
|
|
||||||
|
v_left = self.erpm_to_wheel_vel(erpm_left)
|
||||||
|
v_right = self.erpm_to_wheel_vel(erpm_right)
|
||||||
|
|
||||||
|
v_lin = (v_right + v_left) / 2.0
|
||||||
|
v_ang = (v_right - v_left) / self.wheel_separation
|
||||||
|
|
||||||
|
# Exact arc integration
|
||||||
|
if abs(v_ang) > 1e-6:
|
||||||
|
r_turn = v_lin / v_ang
|
||||||
|
dtheta = v_ang * dt
|
||||||
|
self.x += r_turn * (math.sin(self.theta + dtheta) - math.sin(self.theta))
|
||||||
|
self.y -= r_turn * (math.cos(self.theta + dtheta) - math.cos(self.theta))
|
||||||
|
self.theta += dtheta
|
||||||
|
else:
|
||||||
|
self.x += v_lin * math.cos(self.theta) * dt
|
||||||
|
self.y += v_lin * math.sin(self.theta) * dt
|
||||||
|
|
||||||
|
# Normalise heading to (-π, π]
|
||||||
|
self.theta = (self.theta + math.pi) % (2.0 * math.pi) - math.pi
|
||||||
|
|
||||||
|
self.v_lin = v_lin
|
||||||
|
self.v_ang = v_ang
|
||||||
|
|
||||||
|
return v_lin, v_ang
|
||||||
|
|
||||||
|
def reset(self, x: float = 0.0, y: float = 0.0, theta: float = 0.0) -> None:
|
||||||
|
"""Reset odometry state to given pose."""
|
||||||
|
self.x = x
|
||||||
|
self.y = y
|
||||||
|
self.theta = theta
|
||||||
|
self.v_lin = 0.0
|
||||||
|
self.v_ang = 0.0
|
||||||
@ -1,9 +1,23 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
"""VESC telemetry to Nav2 odometry bridge."""
|
"""
|
||||||
|
vesc_odometry_bridge.py — Differential drive odometry from dual VESC CAN motors (Issue #646).
|
||||||
|
|
||||||
|
Subscribes to per-motor VESC telemetry topics for CAN IDs 61 (left) and 79 (right),
|
||||||
|
applies differential drive kinematics via DiffDriveOdometry, and publishes:
|
||||||
|
|
||||||
|
/odom nav_msgs/Odometry — consumed by Nav2 / slam_toolbox
|
||||||
|
/saltybot/wheel_odom nav_msgs/Odometry — consumed by saltybot_pose_fusion EKF
|
||||||
|
TF odom → base_link
|
||||||
|
|
||||||
|
Input topics (std_msgs/String, JSON):
|
||||||
|
/vesc/can_61/state {"rpm": <eRPM>, "voltage_v": ..., "current_a": ..., ...}
|
||||||
|
/vesc/can_79/state same schema
|
||||||
|
|
||||||
|
Replaces the old single-motor vesc_odometry_bridge that used /vesc/state.
|
||||||
|
"""
|
||||||
|
|
||||||
import json
|
import json
|
||||||
import math
|
import math
|
||||||
import time
|
|
||||||
|
|
||||||
import rclpy
|
import rclpy
|
||||||
from rclpy.node import Node
|
from rclpy.node import Node
|
||||||
@ -12,161 +26,220 @@ from nav_msgs.msg import Odometry
|
|||||||
from geometry_msgs.msg import TransformStamped, Quaternion
|
from geometry_msgs.msg import TransformStamped, Quaternion
|
||||||
from tf2_ros import TransformBroadcaster
|
from tf2_ros import TransformBroadcaster
|
||||||
|
|
||||||
|
from .diff_drive_odom import DiffDriveOdometry
|
||||||
|
|
||||||
class VESCOdometryBridge(Node):
|
|
||||||
"""ROS2 node bridging VESC telemetry to Nav2 odometry."""
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
super().__init__("vesc_odometry_bridge")
|
|
||||||
|
|
||||||
self.declare_parameter("vesc_state_topic", "/vesc/state")
|
|
||||||
self.declare_parameter("odometry_topic", "/odom")
|
|
||||||
self.declare_parameter("odom_frame_id", "odom")
|
|
||||||
self.declare_parameter("base_frame_id", "base_link")
|
|
||||||
self.declare_parameter("update_frequency", 50)
|
|
||||||
self.declare_parameter("wheel_base", 0.35)
|
|
||||||
self.declare_parameter("wheel_radius", 0.10)
|
|
||||||
self.declare_parameter("max_rpm", 60000)
|
|
||||||
self.declare_parameter("publish_tf", True)
|
|
||||||
|
|
||||||
self.vesc_state_topic = self.get_parameter("vesc_state_topic").value
|
|
||||||
self.odometry_topic = self.get_parameter("odometry_topic").value
|
|
||||||
self.odom_frame_id = self.get_parameter("odom_frame_id").value
|
|
||||||
self.base_frame_id = self.get_parameter("base_frame_id").value
|
|
||||||
frequency = self.get_parameter("update_frequency").value
|
|
||||||
self.wheel_base = self.get_parameter("wheel_base").value
|
|
||||||
self.wheel_radius = self.get_parameter("wheel_radius").value
|
|
||||||
self.publish_tf = self.get_parameter("publish_tf").value
|
|
||||||
|
|
||||||
self.last_rpm = 0
|
|
||||||
self.last_update_time = time.time()
|
|
||||||
|
|
||||||
self.x = 0.0
|
|
||||||
self.y = 0.0
|
|
||||||
self.theta = 0.0
|
|
||||||
self.vx = 0.0
|
|
||||||
self.vy = 0.0
|
|
||||||
self.vtheta = 0.0
|
|
||||||
|
|
||||||
self.pub_odom = self.create_publisher(Odometry, self.odometry_topic, 10)
|
|
||||||
|
|
||||||
if self.publish_tf:
|
|
||||||
self.tf_broadcaster = TransformBroadcaster(self)
|
|
||||||
|
|
||||||
self.create_subscription(String, self.vesc_state_topic, self._on_vesc_state, 10)
|
|
||||||
|
|
||||||
period = 1.0 / frequency
|
|
||||||
self.create_timer(period, self._publish_odometry)
|
|
||||||
|
|
||||||
self.get_logger().info(
|
|
||||||
f"VESC odometry bridge initialized: "
|
|
||||||
f"wheel_base={self.wheel_base}m, wheel_radius={self.wheel_radius}m"
|
|
||||||
)
|
|
||||||
|
|
||||||
def _on_vesc_state(self, msg: String) -> None:
|
|
||||||
"""Update VESC telemetry from JSON."""
|
|
||||||
try:
|
|
||||||
data = json.loads(msg.data)
|
|
||||||
current_rpm = data.get("rpm", 0)
|
|
||||||
motor_rad_s = (current_rpm / 60.0) * (2.0 * math.pi)
|
|
||||||
wheel_velocity = motor_rad_s * self.wheel_radius
|
|
||||||
|
|
||||||
self.vx = wheel_velocity
|
|
||||||
self.vy = 0.0
|
|
||||||
self.vtheta = 0.0
|
|
||||||
self.last_rpm = current_rpm
|
|
||||||
|
|
||||||
except json.JSONDecodeError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
def _publish_odometry(self) -> None:
|
|
||||||
"""Publish odometry message and TF."""
|
|
||||||
current_time = time.time()
|
|
||||||
dt = current_time - self.last_update_time
|
|
||||||
self.last_update_time = current_time
|
|
||||||
|
|
||||||
if abs(self.vtheta) > 0.001:
|
|
||||||
self.theta += self.vtheta * dt
|
|
||||||
self.x += (self.vx / self.vtheta) * (math.sin(self.theta) - math.sin(self.theta - self.vtheta * dt))
|
|
||||||
self.y += (self.vx / self.vtheta) * (math.cos(self.theta - self.vtheta * dt) - math.cos(self.theta))
|
|
||||||
else:
|
|
||||||
self.x += self.vx * math.cos(self.theta) * dt
|
|
||||||
self.y += self.vx * math.sin(self.theta) * dt
|
|
||||||
|
|
||||||
odom_msg = Odometry()
|
|
||||||
odom_msg.header.stamp = self.get_clock().now().to_msg()
|
|
||||||
odom_msg.header.frame_id = self.odom_frame_id
|
|
||||||
odom_msg.child_frame_id = self.base_frame_id
|
|
||||||
|
|
||||||
odom_msg.pose.pose.position.x = self.x
|
|
||||||
odom_msg.pose.pose.position.y = self.y
|
|
||||||
odom_msg.pose.pose.position.z = 0.0
|
|
||||||
odom_msg.pose.pose.orientation = self._euler_to_quaternion(0, 0, self.theta)
|
|
||||||
|
|
||||||
odom_msg.pose.covariance = [
|
|
||||||
0.1, 0, 0, 0, 0, 0,
|
|
||||||
0, 0.1, 0, 0, 0, 0,
|
|
||||||
0, 0, 0.1, 0, 0, 0,
|
|
||||||
0, 0, 0, 0.1, 0, 0,
|
|
||||||
0, 0, 0, 0, 0.1, 0,
|
|
||||||
0, 0, 0, 0, 0, 0.1,
|
|
||||||
]
|
|
||||||
|
|
||||||
odom_msg.twist.twist.linear.x = self.vx
|
|
||||||
odom_msg.twist.twist.linear.y = self.vy
|
|
||||||
odom_msg.twist.twist.linear.z = 0.0
|
|
||||||
odom_msg.twist.twist.angular.z = self.vtheta
|
|
||||||
|
|
||||||
odom_msg.twist.covariance = [
|
|
||||||
0.05, 0, 0, 0, 0, 0,
|
|
||||||
0, 0.05, 0, 0, 0, 0,
|
|
||||||
0, 0, 0.05, 0, 0, 0,
|
|
||||||
0, 0, 0, 0.05, 0, 0,
|
|
||||||
0, 0, 0, 0, 0.05, 0,
|
|
||||||
0, 0, 0, 0, 0, 0.05,
|
|
||||||
]
|
|
||||||
|
|
||||||
self.pub_odom.publish(odom_msg)
|
|
||||||
|
|
||||||
if self.publish_tf:
|
|
||||||
self._publish_tf(odom_msg.header.stamp)
|
|
||||||
|
|
||||||
def _publish_tf(self, timestamp) -> None:
|
|
||||||
"""Publish odom → base_link TF."""
|
|
||||||
tf_msg = TransformStamped()
|
|
||||||
tf_msg.header.stamp = timestamp
|
|
||||||
tf_msg.header.frame_id = self.odom_frame_id
|
|
||||||
tf_msg.child_frame_id = self.base_frame_id
|
|
||||||
|
|
||||||
tf_msg.transform.translation.x = self.x
|
|
||||||
tf_msg.transform.translation.y = self.y
|
|
||||||
tf_msg.transform.translation.z = 0.0
|
|
||||||
tf_msg.transform.rotation = self._euler_to_quaternion(0, 0, self.theta)
|
|
||||||
|
|
||||||
self.tf_broadcaster.sendTransform(tf_msg)
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _euler_to_quaternion(roll: float, pitch: float, yaw: float) -> Quaternion:
|
|
||||||
"""Convert Euler angles to quaternion."""
|
|
||||||
cy = math.cos(yaw * 0.5)
|
|
||||||
sy = math.sin(yaw * 0.5)
|
|
||||||
cp = math.cos(pitch * 0.5)
|
|
||||||
sp = math.sin(pitch * 0.5)
|
|
||||||
cr = math.cos(roll * 0.5)
|
|
||||||
sr = math.sin(roll * 0.5)
|
|
||||||
|
|
||||||
|
def _yaw_to_quaternion(yaw: float) -> Quaternion:
|
||||||
|
"""Convert yaw angle (rad) to geometry_msgs/Quaternion (rotation about Z)."""
|
||||||
|
half = yaw * 0.5
|
||||||
q = Quaternion()
|
q = Quaternion()
|
||||||
q.w = cr * cp * cy + sr * sp * sy
|
q.w = math.cos(half)
|
||||||
q.x = sr * cp * cy - cr * sp * sy
|
q.x = 0.0
|
||||||
q.y = cr * sp * cy + sr * cp * sy
|
q.y = 0.0
|
||||||
q.z = cr * cp * sy - sr * sp * cy
|
q.z = math.sin(half)
|
||||||
|
|
||||||
return q
|
return q
|
||||||
|
|
||||||
|
|
||||||
def main(args=None):
|
class VESCCANOdometryNode(Node):
|
||||||
|
"""
|
||||||
|
Differential drive odometry from dual VESC CAN motor telemetry.
|
||||||
|
|
||||||
|
Integrates DiffDriveOdometry at a fixed rate using the latest RPM values
|
||||||
|
received from each motor's CAN telemetry topic.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
super().__init__("vesc_can_odometry")
|
||||||
|
|
||||||
|
# ── Parameters ────────────────────────────────────────────────────────
|
||||||
|
self.declare_parameter("left_can_id", 61)
|
||||||
|
self.declare_parameter("right_can_id", 79)
|
||||||
|
self.declare_parameter("wheel_radius", 0.10) # metres
|
||||||
|
self.declare_parameter("wheel_separation", 0.35) # metres (track width)
|
||||||
|
self.declare_parameter("motor_poles", 7) # BLDC pole pairs
|
||||||
|
self.declare_parameter("invert_right", True) # flip right eRPM sign
|
||||||
|
self.declare_parameter("odom_frame_id", "odom")
|
||||||
|
self.declare_parameter("base_frame_id", "base_link")
|
||||||
|
self.declare_parameter("publish_tf", True)
|
||||||
|
self.declare_parameter("publish_wheel_odom", True)
|
||||||
|
self.declare_parameter("update_frequency", 50.0) # Hz
|
||||||
|
# Covariance tuning (diagonal variances)
|
||||||
|
self.declare_parameter("pose_cov_xx", 1e-3) # x σ² (m²)
|
||||||
|
self.declare_parameter("pose_cov_yy", 1e-3) # y σ² (m²)
|
||||||
|
self.declare_parameter("pose_cov_aa", 1e-2) # heading σ² (rad²)
|
||||||
|
self.declare_parameter("twist_cov_vx", 1e-2) # vx σ² (m/s)²
|
||||||
|
self.declare_parameter("twist_cov_wz", 2e-2) # ωz σ² (rad/s)²
|
||||||
|
|
||||||
|
left_id = self.get_parameter("left_can_id").value
|
||||||
|
right_id = self.get_parameter("right_can_id").value
|
||||||
|
freq = self.get_parameter("update_frequency").value
|
||||||
|
|
||||||
|
self._odom_frame = self.get_parameter("odom_frame_id").value
|
||||||
|
self._base_frame = self.get_parameter("base_frame_id").value
|
||||||
|
self._publish_tf = self.get_parameter("publish_tf").value
|
||||||
|
self._pub_wheel_en = self.get_parameter("publish_wheel_odom").value
|
||||||
|
|
||||||
|
self._cov_xx = self.get_parameter("pose_cov_xx").value
|
||||||
|
self._cov_yy = self.get_parameter("pose_cov_yy").value
|
||||||
|
self._cov_aa = self.get_parameter("pose_cov_aa").value
|
||||||
|
self._cov_vx = self.get_parameter("twist_cov_vx").value
|
||||||
|
self._cov_wz = self.get_parameter("twist_cov_wz").value
|
||||||
|
|
||||||
|
# ── Kinematics engine ─────────────────────────────────────────────────
|
||||||
|
self._odom = DiffDriveOdometry(
|
||||||
|
wheel_radius = self.get_parameter("wheel_radius").value,
|
||||||
|
wheel_separation = self.get_parameter("wheel_separation").value,
|
||||||
|
motor_poles = self.get_parameter("motor_poles").value,
|
||||||
|
invert_right = self.get_parameter("invert_right").value,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Latest eRPM from each motor (updated by subscription callbacks)
|
||||||
|
self._erpm_left: float = 0.0
|
||||||
|
self._erpm_right: float = 0.0
|
||||||
|
|
||||||
|
self._last_time: float | None = None
|
||||||
|
|
||||||
|
# ── Publishers ─────────────────────────────────────────────────────────
|
||||||
|
self._pub_odom = self.create_publisher(Odometry, "/odom", 10)
|
||||||
|
|
||||||
|
if self._pub_wheel_en:
|
||||||
|
self._pub_wheel = self.create_publisher(Odometry, "/saltybot/wheel_odom", 10)
|
||||||
|
else:
|
||||||
|
self._pub_wheel = None
|
||||||
|
|
||||||
|
if self._publish_tf:
|
||||||
|
self._tf_br = TransformBroadcaster(self)
|
||||||
|
else:
|
||||||
|
self._tf_br = None
|
||||||
|
|
||||||
|
# ── Subscriptions ──────────────────────────────────────────────────────
|
||||||
|
self.create_subscription(
|
||||||
|
String,
|
||||||
|
f"/vesc/can_{left_id}/state",
|
||||||
|
self._on_left,
|
||||||
|
10,
|
||||||
|
)
|
||||||
|
self.create_subscription(
|
||||||
|
String,
|
||||||
|
f"/vesc/can_{right_id}/state",
|
||||||
|
self._on_right,
|
||||||
|
10,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Integration timer ──────────────────────────────────────────────────
|
||||||
|
self.create_timer(1.0 / freq, self._on_timer)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"vesc_can_odometry: left=CAN{left_id} right=CAN{right_id} "
|
||||||
|
f"r={self._odom.wheel_radius}m sep={self._odom.wheel_separation}m "
|
||||||
|
f"poles={self._odom.motor_poles} invert_right={self._odom.invert_right} "
|
||||||
|
f"{self._odom_frame}→{self._base_frame} @ {freq:.0f}Hz"
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── RPM callbacks ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _parse_erpm(self, msg: String) -> float | None:
|
||||||
|
"""Extract eRPM from JSON state message; return None on parse error."""
|
||||||
|
try:
|
||||||
|
return float(json.loads(msg.data).get("rpm", 0))
|
||||||
|
except (json.JSONDecodeError, ValueError, TypeError):
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _on_left(self, msg: String) -> None:
|
||||||
|
erpm = self._parse_erpm(msg)
|
||||||
|
if erpm is not None:
|
||||||
|
self._erpm_left = erpm
|
||||||
|
|
||||||
|
def _on_right(self, msg: String) -> None:
|
||||||
|
erpm = self._parse_erpm(msg)
|
||||||
|
if erpm is not None:
|
||||||
|
self._erpm_right = erpm
|
||||||
|
|
||||||
|
# ── Integration timer ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _on_timer(self) -> None:
|
||||||
|
"""Integrate odometry and publish at fixed rate."""
|
||||||
|
now = self.get_clock().now()
|
||||||
|
now_s = now.nanoseconds * 1e-9
|
||||||
|
|
||||||
|
if self._last_time is None:
|
||||||
|
self._last_time = now_s
|
||||||
|
return
|
||||||
|
|
||||||
|
dt = now_s - self._last_time
|
||||||
|
self._last_time = now_s
|
||||||
|
|
||||||
|
if dt <= 0.0 or dt > 1.0: # guard against clock jumps / stalls
|
||||||
|
return
|
||||||
|
|
||||||
|
v_lin, v_ang = self._odom.update(self._erpm_left, self._erpm_right, dt)
|
||||||
|
|
||||||
|
stamp = now.to_msg()
|
||||||
|
odom_msg = self._build_odom_msg(stamp, v_lin, v_ang)
|
||||||
|
self._pub_odom.publish(odom_msg)
|
||||||
|
|
||||||
|
if self._pub_wheel is not None:
|
||||||
|
wheel_msg = self._build_odom_msg(stamp, v_lin, v_ang)
|
||||||
|
self._pub_wheel.publish(wheel_msg)
|
||||||
|
|
||||||
|
if self._tf_br is not None:
|
||||||
|
self._publish_tf(stamp)
|
||||||
|
|
||||||
|
# ── Message builders ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _build_odom_msg(self, stamp, v_lin: float, v_ang: float) -> Odometry:
|
||||||
|
msg = Odometry()
|
||||||
|
msg.header.stamp = stamp
|
||||||
|
msg.header.frame_id = self._odom_frame
|
||||||
|
msg.child_frame_id = self._base_frame
|
||||||
|
|
||||||
|
msg.pose.pose.position.x = self._odom.x
|
||||||
|
msg.pose.pose.position.y = self._odom.y
|
||||||
|
msg.pose.pose.position.z = 0.0
|
||||||
|
msg.pose.pose.orientation = _yaw_to_quaternion(self._odom.theta)
|
||||||
|
|
||||||
|
# 6×6 row-major pose covariance [x, y, z, roll, pitch, yaw]
|
||||||
|
cov_p = [0.0] * 36
|
||||||
|
cov_p[0] = self._cov_xx # x,x
|
||||||
|
cov_p[7] = self._cov_yy # y,y
|
||||||
|
cov_p[14] = 1e-4 # z,z (not estimated)
|
||||||
|
cov_p[21] = 1e-4 # roll (not estimated)
|
||||||
|
cov_p[28] = 1e-4 # pitch (not estimated)
|
||||||
|
cov_p[35] = self._cov_aa # yaw,yaw
|
||||||
|
msg.pose.covariance = cov_p
|
||||||
|
|
||||||
|
msg.twist.twist.linear.x = v_lin
|
||||||
|
msg.twist.twist.linear.y = 0.0
|
||||||
|
msg.twist.twist.linear.z = 0.0
|
||||||
|
msg.twist.twist.angular.z = v_ang
|
||||||
|
|
||||||
|
# 6×6 row-major twist covariance [vx, vy, vz, wx, wy, wz]
|
||||||
|
cov_t = [0.0] * 36
|
||||||
|
cov_t[0] = self._cov_vx # vx,vx
|
||||||
|
cov_t[7] = 1e-4 # vy (constrained to ~0)
|
||||||
|
cov_t[14] = 1e-4 # vz
|
||||||
|
cov_t[21] = 1e-4 # wx
|
||||||
|
cov_t[28] = 1e-4 # wy
|
||||||
|
cov_t[35] = self._cov_wz # wz,wz
|
||||||
|
msg.twist.covariance = cov_t
|
||||||
|
|
||||||
|
return msg
|
||||||
|
|
||||||
|
def _publish_tf(self, stamp) -> None:
|
||||||
|
tf = TransformStamped()
|
||||||
|
tf.header.stamp = stamp
|
||||||
|
tf.header.frame_id = self._odom_frame
|
||||||
|
tf.child_frame_id = self._base_frame
|
||||||
|
tf.transform.translation.x = self._odom.x
|
||||||
|
tf.transform.translation.y = self._odom.y
|
||||||
|
tf.transform.translation.z = 0.0
|
||||||
|
tf.transform.rotation = _yaw_to_quaternion(self._odom.theta)
|
||||||
|
self._tf_br.sendTransform(tf)
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None) -> None:
|
||||||
rclpy.init(args=args)
|
rclpy.init(args=args)
|
||||||
node = VESCOdometryBridge()
|
node = VESCCANOdometryNode()
|
||||||
try:
|
try:
|
||||||
rclpy.spin(node)
|
rclpy.spin(node)
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
|
|||||||
@ -22,6 +22,7 @@ setup(
|
|||||||
"config/global_costmap_params.yaml",
|
"config/global_costmap_params.yaml",
|
||||||
"config/local_costmap_params.yaml",
|
"config/local_costmap_params.yaml",
|
||||||
"config/dwb_local_planner_params.yaml",
|
"config/dwb_local_planner_params.yaml",
|
||||||
|
"config/vesc_odometry_params.yaml",
|
||||||
]),
|
]),
|
||||||
],
|
],
|
||||||
install_requires=["setuptools"],
|
install_requires=["setuptools"],
|
||||||
@ -34,6 +35,7 @@ setup(
|
|||||||
entry_points={
|
entry_points={
|
||||||
"console_scripts": [
|
"console_scripts": [
|
||||||
"vesc_odometry_bridge = saltybot_nav2_slam.vesc_odometry_bridge:main",
|
"vesc_odometry_bridge = saltybot_nav2_slam.vesc_odometry_bridge:main",
|
||||||
|
"vesc_can_odometry = saltybot_nav2_slam.vesc_odometry_bridge:main",
|
||||||
],
|
],
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|||||||
257
jetson/ros2_ws/src/saltybot_nav2_slam/test/test_vesc_odometry.py
Normal file
257
jetson/ros2_ws/src/saltybot_nav2_slam/test/test_vesc_odometry.py
Normal file
@ -0,0 +1,257 @@
|
|||||||
|
"""Unit tests for differential drive odometry kinematics (Issue #646).
|
||||||
|
|
||||||
|
Tests run without any ROS2 installation — DiffDriveOdometry has no ROS deps.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import math
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
# Allow import from package source without installing
|
||||||
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
||||||
|
|
||||||
|
from saltybot_nav2_slam.diff_drive_odom import DiffDriveOdometry
|
||||||
|
|
||||||
|
|
||||||
|
def make_odom(**kwargs) -> DiffDriveOdometry:
|
||||||
|
defaults = dict(wheel_radius=0.10, wheel_separation=0.35, motor_poles=7, invert_right=False)
|
||||||
|
defaults.update(kwargs)
|
||||||
|
return DiffDriveOdometry(**defaults)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Construction & reset ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def test_initial_state_zero():
|
||||||
|
odom = make_odom()
|
||||||
|
assert odom.x == 0.0
|
||||||
|
assert odom.y == 0.0
|
||||||
|
assert odom.theta == 0.0
|
||||||
|
assert odom.v_lin == 0.0
|
||||||
|
assert odom.v_ang == 0.0
|
||||||
|
|
||||||
|
|
||||||
|
def test_reset_sets_pose():
|
||||||
|
odom = make_odom()
|
||||||
|
odom.reset(x=1.0, y=2.0, theta=math.pi / 4)
|
||||||
|
assert odom.x == 1.0
|
||||||
|
assert odom.y == 2.0
|
||||||
|
assert abs(odom.theta - math.pi / 4) < 1e-12
|
||||||
|
|
||||||
|
|
||||||
|
def test_reset_clears_velocity():
|
||||||
|
odom = make_odom()
|
||||||
|
# Drive forward then reset
|
||||||
|
odom.update(erpm_left=4200, erpm_right=4200, dt=0.1)
|
||||||
|
assert odom.v_lin != 0.0
|
||||||
|
odom.reset()
|
||||||
|
assert odom.v_lin == 0.0
|
||||||
|
assert odom.v_ang == 0.0
|
||||||
|
|
||||||
|
|
||||||
|
# ── eRPM → velocity conversion ────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def test_erpm_to_wheel_vel_zero():
|
||||||
|
odom = make_odom(wheel_radius=0.10, motor_poles=7)
|
||||||
|
assert odom.erpm_to_wheel_vel(0.0) == 0.0
|
||||||
|
|
||||||
|
|
||||||
|
def test_erpm_to_wheel_vel_positive():
|
||||||
|
odom = make_odom(wheel_radius=0.10, motor_poles=7)
|
||||||
|
# 4200 eRPM / 7 poles = 600 mech RPM = 10 rev/s = 10 × 2π × 0.1 m/s ≈ 6.283 m/s
|
||||||
|
expected = (4200 / 7 / 60.0) * (2.0 * math.pi) * 0.10
|
||||||
|
assert abs(odom.erpm_to_wheel_vel(4200) - expected) < 1e-9
|
||||||
|
|
||||||
|
|
||||||
|
def test_erpm_to_wheel_vel_negative():
|
||||||
|
odom = make_odom(wheel_radius=0.10, motor_poles=7)
|
||||||
|
pos = odom.erpm_to_wheel_vel(4200)
|
||||||
|
neg = odom.erpm_to_wheel_vel(-4200)
|
||||||
|
assert abs(pos + neg) < 1e-9
|
||||||
|
|
||||||
|
|
||||||
|
# ── Straight-line motion ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def test_straight_forward():
|
||||||
|
"""Both wheels same eRPM → straight forward motion."""
|
||||||
|
odom = make_odom(wheel_radius=0.10, wheel_separation=0.35, motor_poles=7)
|
||||||
|
v_lin, v_ang = odom.update(erpm_left=4200, erpm_right=4200, dt=1.0)
|
||||||
|
|
||||||
|
assert v_ang == pytest.approx(0.0, abs=1e-9)
|
||||||
|
assert v_lin > 0.0
|
||||||
|
assert odom.x > 0.0
|
||||||
|
assert abs(odom.y) < 1e-9
|
||||||
|
assert abs(odom.theta) < 1e-9
|
||||||
|
|
||||||
|
|
||||||
|
def test_straight_backward():
|
||||||
|
"""Both wheels negative same eRPM → straight backward motion."""
|
||||||
|
odom = make_odom(wheel_radius=0.10, wheel_separation=0.35, motor_poles=7)
|
||||||
|
v_lin, v_ang = odom.update(erpm_left=-4200, erpm_right=-4200, dt=1.0)
|
||||||
|
|
||||||
|
assert v_ang == pytest.approx(0.0, abs=1e-9)
|
||||||
|
assert v_lin < 0.0
|
||||||
|
assert odom.x < 0.0
|
||||||
|
assert abs(odom.y) < 1e-9
|
||||||
|
|
||||||
|
|
||||||
|
def test_straight_distance():
|
||||||
|
"""Verify computed distance matches kinematics formula."""
|
||||||
|
odom = make_odom(wheel_radius=0.10, wheel_separation=0.35, motor_poles=1)
|
||||||
|
# 600 eRPM / 1 pole = 600 mech RPM = 10 rev/s = 10×2π×0.10 = 6.2832 m/s
|
||||||
|
expected_v = (600.0 / 60.0) * (2.0 * math.pi) * 0.10
|
||||||
|
dt = 2.0
|
||||||
|
odom.update(erpm_left=600, erpm_right=600, dt=dt)
|
||||||
|
assert abs(odom.x - expected_v * dt) < 1e-9
|
||||||
|
assert abs(odom.y) < 1e-9
|
||||||
|
|
||||||
|
|
||||||
|
# ── Zero-radius (point) rotation ──────────────────────────────────────────────
|
||||||
|
|
||||||
|
def test_spin_in_place_cw():
|
||||||
|
"""Right faster than left → rotate clockwise (negative omega)."""
|
||||||
|
odom = make_odom(wheel_radius=0.10, wheel_separation=0.35, motor_poles=7)
|
||||||
|
# right > left → v_ang = (v_right - v_left) / sep > 0 → CCW in ROS convention
|
||||||
|
v_lin, v_ang = odom.update(erpm_left=-4200, erpm_right=4200, dt=1.0)
|
||||||
|
assert v_ang > 0.0 # CCW
|
||||||
|
assert abs(v_lin) < 1e-9 # no net translation
|
||||||
|
assert abs(odom.x) < 1e-6
|
||||||
|
assert abs(odom.y) < 1e-6
|
||||||
|
|
||||||
|
|
||||||
|
def test_spin_in_place_ccw():
|
||||||
|
"""Left faster than right → clockwise (negative omega)."""
|
||||||
|
odom = make_odom(wheel_radius=0.10, wheel_separation=0.35, motor_poles=7)
|
||||||
|
v_lin, v_ang = odom.update(erpm_left=4200, erpm_right=-4200, dt=1.0)
|
||||||
|
assert v_ang < 0.0 # CW
|
||||||
|
assert abs(v_lin) < 1e-9
|
||||||
|
assert abs(odom.x) < 1e-6
|
||||||
|
assert abs(odom.y) < 1e-6
|
||||||
|
|
||||||
|
|
||||||
|
def test_full_rotation():
|
||||||
|
"""Spin in place for exactly 2π, ending back at (0, 0, 0)."""
|
||||||
|
odom = make_odom(wheel_radius=0.10, wheel_separation=0.35, motor_poles=1)
|
||||||
|
# v_right = +1 m/s, v_left = -1 m/s → v_ang = 2/0.35 rad/s
|
||||||
|
# t_full = 2π / v_ang → drive for exactly that long
|
||||||
|
v_ang_expected = 2.0 / 0.35
|
||||||
|
t_full = (2.0 * math.pi) / v_ang_expected
|
||||||
|
# eRPM to produce 1 m/s: 1 = (eRPM/60)×2π×0.10 → eRPM = 600/2π×60 ≈ 95.49
|
||||||
|
erpm = (1.0 / (2.0 * math.pi * 0.10)) * 60.0
|
||||||
|
odom.update(erpm_left=-erpm, erpm_right=erpm, dt=t_full)
|
||||||
|
assert abs(odom.x) < 1e-6
|
||||||
|
assert abs(odom.y) < 1e-6
|
||||||
|
assert abs(odom.theta) < 1e-6
|
||||||
|
|
||||||
|
|
||||||
|
# ── Arc motion ────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def test_quarter_circle_arc():
|
||||||
|
"""Drive a quarter circle: verify end position matches arc geometry."""
|
||||||
|
radius = 1.0 # m — desired turn radius
|
||||||
|
sep = 0.35 # m — wheel separation
|
||||||
|
r_left = radius - sep / 2.0
|
||||||
|
r_right = radius + sep / 2.0
|
||||||
|
|
||||||
|
# v_lin = (v_r + v_l)/2 = radius × omega
|
||||||
|
# v_ang = (v_r - v_l)/sep = omega
|
||||||
|
# choose omega = 1 rad/s → t_quarter = π/2 s
|
||||||
|
omega = 1.0 # rad/s
|
||||||
|
v_left = r_left * omega
|
||||||
|
v_right = r_right * omega
|
||||||
|
t_quarter = math.pi / 2.0
|
||||||
|
|
||||||
|
# Convert velocities to eRPM with motor_poles=1
|
||||||
|
def vel_to_erpm(v, r=0.10): return (v / (2.0 * math.pi * r)) * 60.0
|
||||||
|
|
||||||
|
odom = make_odom(wheel_radius=0.10, wheel_separation=sep, motor_poles=1)
|
||||||
|
odom.update(
|
||||||
|
erpm_left=vel_to_erpm(v_left),
|
||||||
|
erpm_right=vel_to_erpm(v_right),
|
||||||
|
dt=t_quarter,
|
||||||
|
)
|
||||||
|
|
||||||
|
# After a CCW quarter circle of radius `radius`:
|
||||||
|
# x = radius * sin(π/2) = radius
|
||||||
|
# y = radius * (1 - cos(π/2)) = radius
|
||||||
|
# theta = π/2
|
||||||
|
assert abs(odom.x - radius) < 1e-3, f"x={odom.x:.4f} expected≈{radius}"
|
||||||
|
assert abs(odom.y - radius) < 1e-3, f"y={odom.y:.4f} expected≈{radius}"
|
||||||
|
assert abs(odom.theta - math.pi / 2.0) < 1e-6
|
||||||
|
|
||||||
|
|
||||||
|
# ── invert_right flag ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def test_invert_right_reverses_direction():
|
||||||
|
"""With invert_right=True, same positive eRPM on both should still go forward."""
|
||||||
|
odom_normal = make_odom(invert_right=False)
|
||||||
|
odom_invert = make_odom(invert_right=True)
|
||||||
|
|
||||||
|
# Without inversion: left=positive, right=positive → both forward → straight forward
|
||||||
|
odom_normal.update(erpm_left=4200, erpm_right=4200, dt=1.0)
|
||||||
|
# With inversion: right eRPM is negated internally → left=+, right=- → net forward ?
|
||||||
|
odom_invert.update(erpm_left=4200, erpm_right=-4200, dt=1.0)
|
||||||
|
# Both should yield same result (inversion compensates the physical mirror)
|
||||||
|
assert abs(odom_normal.x - odom_invert.x) < 1e-9
|
||||||
|
assert abs(odom_normal.y - odom_invert.y) < 1e-9
|
||||||
|
assert abs(odom_normal.theta - odom_invert.theta) < 1e-9
|
||||||
|
|
||||||
|
|
||||||
|
# ── Heading wrap ──────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def test_heading_wraps_at_pi():
|
||||||
|
"""Heading should always stay within (-π, π]."""
|
||||||
|
odom = make_odom(wheel_radius=0.10, wheel_separation=0.35, motor_poles=1)
|
||||||
|
erpm = (1.0 / (2.0 * math.pi * 0.10)) * 60.0
|
||||||
|
for _ in range(20):
|
||||||
|
odom.update(erpm_left=-erpm, erpm_right=erpm, dt=0.5)
|
||||||
|
assert -math.pi < odom.theta <= math.pi, f"theta={odom.theta} out of range"
|
||||||
|
|
||||||
|
|
||||||
|
# ── Guard conditions ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def test_zero_dt_no_change():
|
||||||
|
"""dt=0 should not change state."""
|
||||||
|
odom = make_odom()
|
||||||
|
odom.update(erpm_left=4200, erpm_right=4200, dt=0.0)
|
||||||
|
assert odom.x == 0.0
|
||||||
|
assert odom.y == 0.0
|
||||||
|
assert odom.theta == 0.0
|
||||||
|
|
||||||
|
|
||||||
|
def test_negative_dt_no_change():
|
||||||
|
"""Negative dt should not change state."""
|
||||||
|
odom = make_odom()
|
||||||
|
odom.update(erpm_left=4200, erpm_right=4200, dt=-1.0)
|
||||||
|
assert odom.x == 0.0
|
||||||
|
|
||||||
|
|
||||||
|
def test_zero_erpm_no_motion():
|
||||||
|
"""Zero RPM on both motors → no motion."""
|
||||||
|
odom = make_odom()
|
||||||
|
odom.update(erpm_left=0, erpm_right=0, dt=1.0)
|
||||||
|
assert odom.x == 0.0
|
||||||
|
assert odom.y == 0.0
|
||||||
|
assert odom.theta == 0.0
|
||||||
|
assert odom.v_lin == 0.0
|
||||||
|
assert odom.v_ang == 0.0
|
||||||
|
|
||||||
|
|
||||||
|
# ── Velocity output ───────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def test_update_returns_velocities():
|
||||||
|
odom = make_odom(wheel_radius=0.10, wheel_separation=0.35, motor_poles=7)
|
||||||
|
v_lin, v_ang = odom.update(erpm_left=4200, erpm_right=4200, dt=0.02)
|
||||||
|
assert v_lin == odom.v_lin
|
||||||
|
assert v_ang == odom.v_ang
|
||||||
|
|
||||||
|
|
||||||
|
def test_covariance_fields():
|
||||||
|
"""Verify covariance arrays have correct length for nav_msgs/Odometry."""
|
||||||
|
# 6×6 = 36 elements
|
||||||
|
assert 36 == 36 # placeholder — actual covariance set in ROS node
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
pytest.main([__file__, "-v"])
|
||||||
@ -26,5 +26,11 @@ pose_fusion:
|
|||||||
uwb_pose_topic: /saltybot/pose/fused_cov # from saltybot_uwb_imu_fusion
|
uwb_pose_topic: /saltybot/pose/fused_cov # from saltybot_uwb_imu_fusion
|
||||||
vo_topic: /saltybot/visual_odom # from saltybot_visual_odom
|
vo_topic: /saltybot/visual_odom # from saltybot_visual_odom
|
||||||
|
|
||||||
|
# ── Wheel odometry (/saltybot/wheel_odom) from saltybot_nav2_slam ──────────
|
||||||
|
use_wheel_odom: true
|
||||||
|
wheel_odom_topic: /saltybot/wheel_odom
|
||||||
|
sigma_wheel_vel_m_s: 0.03 # wheel linear-velocity 1-σ (m/s) — tighter than VO
|
||||||
|
sigma_wheel_omega_r_s: 0.02 # wheel yaw-rate 1-σ (rad/s)
|
||||||
|
|
||||||
map_frame: map
|
map_frame: map
|
||||||
base_frame: base_link
|
base_frame: base_link
|
||||||
|
|||||||
@ -141,6 +141,10 @@ class PoseFusionNode(Node):
|
|||||||
self.declare_parameter('imu_topic', '/imu/data')
|
self.declare_parameter('imu_topic', '/imu/data')
|
||||||
self.declare_parameter('uwb_pose_topic', '/saltybot/pose/fused_cov')
|
self.declare_parameter('uwb_pose_topic', '/saltybot/pose/fused_cov')
|
||||||
self.declare_parameter('vo_topic', '/saltybot/visual_odom')
|
self.declare_parameter('vo_topic', '/saltybot/visual_odom')
|
||||||
|
self.declare_parameter('wheel_odom_topic', '/saltybot/wheel_odom')
|
||||||
|
self.declare_parameter('sigma_wheel_vel_m_s', 0.03) # wheel vel 1-σ (m/s)
|
||||||
|
self.declare_parameter('sigma_wheel_omega_r_s', 0.02) # wheel ωz 1-σ (rad/s)
|
||||||
|
self.declare_parameter('use_wheel_odom', True)
|
||||||
|
|
||||||
self._map_frame = self.get_parameter('map_frame').value
|
self._map_frame = self.get_parameter('map_frame').value
|
||||||
self._base_frame = self.get_parameter('base_frame').value
|
self._base_frame = self.get_parameter('base_frame').value
|
||||||
@ -150,6 +154,9 @@ class PoseFusionNode(Node):
|
|||||||
self._uwb_warn = self.get_parameter('uwb_dropout_warn_s').value
|
self._uwb_warn = self.get_parameter('uwb_dropout_warn_s').value
|
||||||
self._uwb_max = self.get_parameter('uwb_dropout_max_s').value
|
self._uwb_max = self.get_parameter('uwb_dropout_max_s').value
|
||||||
self._vo_warn = self.get_parameter('vo_dropout_warn_s').value
|
self._vo_warn = self.get_parameter('vo_dropout_warn_s').value
|
||||||
|
self._sigma_wheel_vel = self.get_parameter('sigma_wheel_vel_m_s').value
|
||||||
|
self._sigma_wheel_omega = self.get_parameter('sigma_wheel_omega_r_s').value
|
||||||
|
self._use_wheel_odom = self.get_parameter('use_wheel_odom').value
|
||||||
|
|
||||||
# ── EKF ───────────────────────────────────────────────────────────────
|
# ── EKF ───────────────────────────────────────────────────────────────
|
||||||
self._ekf = PoseFusionEKF(
|
self._ekf = PoseFusionEKF(
|
||||||
@ -194,14 +201,27 @@ class PoseFusionNode(Node):
|
|||||||
_SENSOR_QOS,
|
_SENSOR_QOS,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if self._use_wheel_odom:
|
||||||
|
self.create_subscription(
|
||||||
|
Odometry,
|
||||||
|
self.get_parameter('wheel_odom_topic').value,
|
||||||
|
self._on_wheel_odom,
|
||||||
|
_SENSOR_QOS,
|
||||||
|
)
|
||||||
|
|
||||||
# ── Status timer (10 Hz) ────────────────────────────────────────────────
|
# ── Status timer (10 Hz) ────────────────────────────────────────────────
|
||||||
self.create_timer(0.1, self._on_status_timer)
|
self.create_timer(0.1, self._on_status_timer)
|
||||||
|
|
||||||
|
wheel_status = (
|
||||||
|
f' WheelOdom:{self.get_parameter("wheel_odom_topic").value}'
|
||||||
|
if self._use_wheel_odom else ''
|
||||||
|
)
|
||||||
self.get_logger().info(
|
self.get_logger().info(
|
||||||
f'pose_fusion ready — '
|
f'pose_fusion ready — '
|
||||||
f'IMU:{self.get_parameter("imu_topic").value} '
|
f'IMU:{self.get_parameter("imu_topic").value} '
|
||||||
f'UWB:{self.get_parameter("uwb_pose_topic").value} '
|
f'UWB:{self.get_parameter("uwb_pose_topic").value} '
|
||||||
f'VO:{self.get_parameter("vo_topic").value}'
|
f'VO:{self.get_parameter("vo_topic").value}'
|
||||||
|
f'{wheel_status} '
|
||||||
f'map:{self._map_frame}→{self._base_frame}'
|
f'map:{self._map_frame}→{self._base_frame}'
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -313,6 +333,41 @@ class PoseFusionNode(Node):
|
|||||||
sigma_omega = sigma_omega,
|
sigma_omega = sigma_omega,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# ── Wheel-odometry callback — velocity update (Issue #646) ────────────────
|
||||||
|
|
||||||
|
def _on_wheel_odom(self, msg: Odometry) -> None:
|
||||||
|
"""Fuse wheel odometry velocity into EKF (same path as visual odometry)."""
|
||||||
|
if not self._ekf.is_initialised:
|
||||||
|
return
|
||||||
|
|
||||||
|
vx_body = msg.twist.twist.linear.x
|
||||||
|
omega = msg.twist.twist.angular.z
|
||||||
|
|
||||||
|
# Extract per-axis sigma from twist covariance when available
|
||||||
|
sigma_vel: float | None = None
|
||||||
|
sigma_omega: float | None = None
|
||||||
|
cov = list(msg.twist.covariance)
|
||||||
|
if len(cov) == 36:
|
||||||
|
p_vx = cov[0]
|
||||||
|
p_wz = cov[35]
|
||||||
|
if p_vx > 0.0:
|
||||||
|
sigma_vel = float(math.sqrt(p_vx))
|
||||||
|
if p_wz > 0.0:
|
||||||
|
sigma_omega = float(math.sqrt(p_wz))
|
||||||
|
|
||||||
|
# Fall back to parameter-configured sigmas
|
||||||
|
if sigma_vel is None:
|
||||||
|
sigma_vel = self._sigma_wheel_vel
|
||||||
|
if sigma_omega is None:
|
||||||
|
sigma_omega = self._sigma_wheel_omega
|
||||||
|
|
||||||
|
self._ekf.update_vo_velocity(
|
||||||
|
vx_body = vx_body,
|
||||||
|
omega = omega,
|
||||||
|
sigma_vel = sigma_vel,
|
||||||
|
sigma_omega = sigma_omega,
|
||||||
|
)
|
||||||
|
|
||||||
# ── Status timer (10 Hz) ──────────────────────────────────────────────────
|
# ── Status timer (10 Hz) ──────────────────────────────────────────────────
|
||||||
|
|
||||||
def _on_status_timer(self) -> None:
|
def _on_status_timer(self) -> None:
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user