feat: Multi-sensor pose fusion node (Issue #595)

New package saltybot_pose_fusion — EKF fusing UWB+IMU absolute pose,
visual odometry velocity, and raw IMU into a single authoritative pose.

pose_fusion_ekf.py (pure Python, no ROS2 deps):
  PoseFusionEKF — state [x, y, θ, vx, vy, ω], 6-state EKF.
  - predict_imu(ax_body, ay_body, omega, dt): body-frame IMU predict step
    with Jacobian F, bias-compensated accel, process noise Q.
  - update_uwb_position(x, y, sigma_m): absolute position measurement
    (H=[1,0,0,0,0,0; 0,1,0,0,0,0]) from UWB+IMU fused stream.
  - update_uwb_heading(heading_rad, sigma_rad): heading measurement.
  - update_vo_velocity(vx_body, omega, ...): VO velocity measurement —
    body-frame vx rotated to world via cos/sin(θ), updates [vx,vy,ω] state.
  - Joseph-form covariance update for numerical stability.
  - Dual dropout clocks: uwb_dropout_s, vo_dropout_s (reset on each update).
  - Velocity damping when uwb_dropout_s > 2s.
  - Sensor weight parameters: sigma_uwb_pos_m, sigma_uwb_head_rad,
    sigma_vo_vel_m_s, sigma_vo_omega_r_s, sigma_imu_accel/gyro,
    sigma_vel_drift, dropout_vel_damp.

pose_fusion_node.py (ROS2 node 'pose_fusion'):
  - Subscribes: /imu/data (Imu, 200Hz → predict), /saltybot/pose/fused_cov
    (PoseWithCovarianceStamped, 10Hz → position+heading update, σ extracted
    from message covariance when use_uwb_covariance=true), /saltybot/visual_odom
    (Odometry, 30Hz → velocity update, σ from twist covariance).
  - Publishes: /saltybot/pose/authoritative (PoseWithCovarianceStamped),
    /saltybot/pose/status (String JSON, 10Hz).
  - TF2: map→base_link broadcast at IMU rate.
  - Suppresses output when uwb_dropout_s > uwb_dropout_max_s (10s).
  - Warns (throttled) on UWB/VO dropout.

config/pose_fusion_params.yaml: sensor weights + dropout thresholds.
launch/pose_fusion.launch.py: single node launch with params_file arg.
test/test_pose_fusion_ekf.py: 13 unit tests — init, predict, UWB/VO
  updates, dropout reset, covariance shape/convergence, sigma override.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
sl-perception 2026-03-14 15:00:44 -04:00
parent 061189670a
commit c76d5b0dd7
10 changed files with 1036 additions and 0 deletions

View File

@ -0,0 +1,30 @@
pose_fusion:
ros__parameters:
# ── Sensor weights ──────────────────────────────────────────────────────
# UWB+IMU fused pose (/saltybot/pose/fused_cov)
sigma_uwb_pos_m: 0.10 # position 1-σ (m) — used when use_uwb_covariance=false
use_uwb_covariance: true # if true, extract σ from PoseWithCovarianceStamped.covariance
sigma_uwb_head_rad: 0.15 # heading 1-σ (rad) — used when cov unavailable
# Visual odometry (/saltybot/visual_odom)
sigma_vo_vel_m_s: 0.05 # linear-velocity update 1-σ (m/s)
sigma_vo_omega_r_s: 0.03 # yaw-rate update 1-σ (rad/s)
# IMU process noise (/imu/data)
sigma_imu_accel: 0.05 # accelerometer noise (m/s²)
sigma_imu_gyro: 0.003 # gyroscope noise (rad/s)
sigma_vel_drift: 0.10 # velocity random-walk process noise (m/s)
dropout_vel_damp: 0.95 # velocity damping factor per second during UWB dropout
# ── Dropout thresholds ──────────────────────────────────────────────────
uwb_dropout_warn_s: 2.0 # log warning if UWB silent > this (s)
uwb_dropout_max_s: 10.0 # suppress pose output if UWB silent > this (s)
vo_dropout_warn_s: 1.0 # log warning if VO silent > this (s)
# ── Topic / frame configuration ─────────────────────────────────────────
imu_topic: /imu/data
uwb_pose_topic: /saltybot/pose/fused_cov # from saltybot_uwb_imu_fusion
vo_topic: /saltybot/visual_odom # from saltybot_visual_odom
map_frame: map
base_frame: base_link

View File

@ -0,0 +1,29 @@
"""pose_fusion.launch.py — Launch the multi-sensor pose fusion node (Issue #595)."""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
pkg_share = get_package_share_directory('saltybot_pose_fusion')
default_params = os.path.join(pkg_share, 'config', 'pose_fusion_params.yaml')
params_arg = DeclareLaunchArgument(
'params_file',
default_value=default_params,
description='Path to pose_fusion_params.yaml',
)
pose_fusion_node = Node(
package='saltybot_pose_fusion',
executable='pose_fusion',
name='pose_fusion',
output='screen',
parameters=[LaunchConfiguration('params_file')],
)
return LaunchDescription([params_arg, pose_fusion_node])

View File

@ -0,0 +1,35 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_pose_fusion</name>
<version>0.1.0</version>
<description>
Multi-sensor EKF pose fusion node (Issue #595).
Fuses UWB+IMU absolute pose (/saltybot/pose/fused_cov), visual odometry
(/saltybot/visual_odom), and raw IMU (/imu/data) into a single authoritative
map-frame PoseWithCovarianceStamped on /saltybot/pose/authoritative with
TF2 map→base_link broadcast.
</description>
<maintainer email="sl-perception@saltylab.local">sl-perception</maintainer>
<license>MIT</license>
<buildtool_depend>ament_python</buildtool_depend>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>sensor_msgs</depend>
<depend>nav_msgs</depend>
<depend>geometry_msgs</depend>
<depend>tf2_ros</depend>
<exec_depend>python3-numpy</exec_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,377 @@
"""
pose_fusion_ekf.py Extended Kalman Filter for multi-sensor pose fusion (Issue #595).
No ROS2 dependencies pure Python + numpy, fully unit-testable.
Fuses three sensor streams
1. IMU (200 Hz) predict step (body-frame accel + gyro)
2. UWB+IMU fused pose (10 Hz) absolute position and heading update
3. Visual odometry (30 Hz) velocity-domain update (drift-free short-term motion)
State vector [x, y, θ, vx, vy, ω]
x position, forward (m, map frame)
y position, lateral (m, map frame)
θ heading, yaw (rad, CCW positive)
vx velocity x (m/s, map frame)
vy velocity y (m/s, map frame)
ω yaw rate (rad/s)
Process model (IMU as control input):
θ_{k+1} = θ_k + ω_imu * dt
vx_{k+1} = vx_k + (ax_body*cos θ - ay_body*sin θ) * dt
vy_{k+1} = vy_k + (ax_body*sin θ + ay_body*cos θ) * dt
x_{k+1} = x_k + vx_k * dt
y_{k+1} = y_k + vy_k * dt
ω_{k+1} = ω_imu
Measurement models
UWB position: z = [x, y] H[2×6] = [[1,0,0,0,0,0],[0,1,0,0,0,0]]
UWB heading: z = [θ] H[1×6] = [[0,0,1,0,0,0]]
VO velocity: z = [vx_w, vy_w, ω] H[3×6] = diag of [vx,vy,ω] rows in state
VO velocity measurement: VO gives vx_body (forward speed) and ω.
We rotate to world frame: vx_w = vx_body*cos(θ), vy_w = vx_body*sin(θ)
then update [vx, vy, ω] state directly (linear measurement step).
Sensor dropout handling
_uwb_dropout_s: seconds since last UWB update
_vo_dropout_s: seconds since last VO update
Both increment during predict(), reset to 0 on update.
When uwb_dropout_s > uwb_max_s, velocity damping activates and output
is suppressed by the node.
Sensor weights (configurable via constructor)
sigma_uwb_pos_m UWB+IMU fused position σ (m) default 0.10
sigma_uwb_head_rad UWB+IMU fused heading σ (rad) default 0.15
sigma_vo_vel_m_s VO linear velocity σ (m/s) default 0.05
sigma_vo_omega_r_s VO yaw-rate σ (rad/s) default 0.03
sigma_imu_accel IMU accel noise σ (m/) default 0.05
sigma_imu_gyro IMU gyro noise σ (rad/s) default 0.003
sigma_vel_drift velocity process noise σ (m/s) default 0.10
dropout_vel_damp velocity damping/s during UWB drop default 0.95
"""
from __future__ import annotations
import math
from typing import Tuple
import numpy as np
# ── State indices ──────────────────────────────────────────────────────────────
N = 6
IX, IY, IT, IVX, IVY, IW = range(N)
def _wrap(a: float) -> float:
"""Wrap angle to (−π, π]."""
return float((a + math.pi) % (2.0 * math.pi) - math.pi)
class PoseFusionEKF:
"""
EKF fusing UWB+IMU absolute pose (10 Hz), visual-odometry velocity (30 Hz),
and raw IMU (200 Hz) into a single authoritative map-frame pose.
Thread model: single-threaded caller must serialise access.
"""
def __init__(
self,
sigma_uwb_pos_m: float = 0.10,
sigma_uwb_head_rad: float = 0.15,
sigma_vo_vel_m_s: float = 0.05,
sigma_vo_omega_r_s: float = 0.03,
sigma_imu_accel: float = 0.05,
sigma_imu_gyro: float = 0.003,
sigma_vel_drift: float = 0.10,
dropout_vel_damp: float = 0.95,
) -> None:
# Measurement noise variances
self._R_uwb_pos = sigma_uwb_pos_m ** 2
self._R_uwb_head = sigma_uwb_head_rad ** 2
self._R_vo_vel = sigma_vo_vel_m_s ** 2
self._R_vo_omega = sigma_vo_omega_r_s ** 2
# Process noise parameters
self._q_a = sigma_imu_accel ** 2
self._q_g = sigma_imu_gyro ** 2
self._q_v = sigma_vel_drift ** 2
self._damp = dropout_vel_damp
# State & covariance
self._x = np.zeros(N, dtype=np.float64)
self._P = np.eye(N, dtype=np.float64) * 9.0
# Simple accel bias estimator
self._accel_bias = np.zeros(2, dtype=np.float64)
self._bias_alpha = 0.005
self._init = False
self._uwb_dropout_s = 0.0
self._vo_dropout_s = 0.0
# ── Initialisation ─────────────────────────────────────────────────────────
def initialise(self, x: float, y: float, heading: float = 0.0) -> None:
"""Seed filter with known position (called on first UWB measurement)."""
self._x[:] = 0.0
self._x[IX] = x
self._x[IY] = y
self._x[IT] = heading
self._P = np.diag([0.25, 0.25, 0.1, 1.0, 1.0, 0.1])
self._init = True
# ── IMU predict step (200 Hz) ──────────────────────────────────────────────
def predict_imu(
self,
ax_body: float,
ay_body: float,
omega: float,
dt: float,
) -> None:
"""
EKF predict driven by body-frame IMU.
Parameters
----------
ax_body : forward accel (m/, body frame)
ay_body : lateral accel (m/, body frame, left positive)
omega : yaw rate (rad/s, CCW positive)
dt : timestep (s)
"""
if not self._init:
return
# Bias-compensated acceleration
ax_c = ax_body - self._accel_bias[0]
ay_c = ay_body - self._accel_bias[1]
x = self._x
th = x[IT]
ct = math.cos(th)
st = math.sin(th)
# World-frame acceleration
ax_w = ax_c * ct - ay_c * st
ay_w = ax_c * st + ay_c * ct
# Propagate state
xp = x.copy()
xp[IX] = x[IX] + x[IVX] * dt
xp[IY] = x[IY] + x[IVY] * dt
xp[IT] = _wrap(x[IT] + omega * dt)
xp[IVX] = x[IVX] + ax_w * dt
xp[IVY] = x[IVY] + ay_w * dt
xp[IW] = omega
# Jacobian F = ∂f/∂x
F = np.eye(N, dtype=np.float64)
F[IX, IVX] = dt
F[IY, IVY] = dt
F[IVX, IT] = (-ax_c * st - ay_c * ct) * dt
F[IVY, IT] = ( ax_c * ct - ay_c * st) * dt
# Process noise Q
dt2 = dt * dt
Q = np.diag([
0.5 * self._q_a * dt2 * dt2,
0.5 * self._q_a * dt2 * dt2,
self._q_g * dt2,
self._q_v * dt + self._q_a * dt2,
self._q_v * dt + self._q_a * dt2,
self._q_g,
])
self._x = xp
self._P = F @ self._P @ F.T + Q
# Advance dropout clocks
self._uwb_dropout_s += dt
self._vo_dropout_s += dt
# Velocity damping during extended UWB dropout
if self._uwb_dropout_s > 2.0:
d = self._damp ** dt
self._x[IVX] *= d
self._x[IVY] *= d
# Running accel bias estimate
self._accel_bias[0] += self._bias_alpha * (ax_body - self._accel_bias[0])
self._accel_bias[1] += self._bias_alpha * (ay_body - self._accel_bias[1])
# ── UWB+IMU position update (10 Hz) ───────────────────────────────────────
def update_uwb_position(
self,
x_m: float,
y_m: float,
sigma_m: float | None = None,
) -> None:
"""
Update from UWB+IMU fused position measurement [x, y].
Parameters
----------
x_m, y_m : measured position (m, map frame)
sigma_m : std-dev override; uses constructor default if None
"""
if not self._init:
self.initialise(x_m, y_m)
return
R_val = (sigma_m ** 2) if sigma_m is not None else self._R_uwb_pos
R = np.eye(2, dtype=np.float64) * R_val
H = np.zeros((2, N), dtype=np.float64)
H[0, IX] = 1.0
H[1, IY] = 1.0
innov = np.array([x_m - self._x[IX], y_m - self._x[IY]])
_ekf_update(self._x, self._P, H, R, innov)
self._x[IT] = _wrap(self._x[IT])
self._uwb_dropout_s = 0.0
# ── UWB+IMU heading update (10 Hz) ────────────────────────────────────────
def update_uwb_heading(
self,
heading_rad: float,
sigma_rad: float | None = None,
) -> None:
"""Update heading from UWB+IMU fused estimate."""
if not self._init:
return
R_val = (sigma_rad ** 2) if sigma_rad is not None else self._R_uwb_head
R = np.array([[R_val]])
H = np.zeros((1, N), dtype=np.float64)
H[0, IT] = 1.0
innov = np.array([_wrap(heading_rad - self._x[IT])])
_ekf_update(self._x, self._P, H, R, innov)
self._x[IT] = _wrap(self._x[IT])
# ── Visual-odometry velocity update (30 Hz) ───────────────────────────────
def update_vo_velocity(
self,
vx_body: float,
omega: float,
sigma_vel: float | None = None,
sigma_omega: float | None = None,
) -> None:
"""
Update from visual-odometry velocity measurement.
VO gives forward speed (vx_body, m/s) in the robot body frame, and
yaw rate (omega, rad/s). We rotate vx_body to world frame using the
current heading estimate, then update [vx, vy, ω] state.
Parameters
----------
vx_body : VO forward linear speed (m/s)
omega : VO yaw rate (rad/s)
sigma_vel : std-dev for linear velocity component (m/s)
sigma_omega: std-dev for yaw rate (rad/s)
"""
if not self._init:
return
R_vel = (sigma_vel ** 2) if sigma_vel is not None else self._R_vo_vel
R_omega = (sigma_omega ** 2) if sigma_omega is not None else self._R_vo_omega
th = self._x[IT]
ct = math.cos(th)
st = math.sin(th)
# Rotate body-frame vx to world-frame [vx_w, vy_w]
vx_w = vx_body * ct
vy_w = vx_body * st
# Measurement: [vx_w, vy_w, ω]
z = np.array([vx_w, vy_w, omega])
H = np.zeros((3, N), dtype=np.float64)
H[0, IVX] = 1.0
H[1, IVY] = 1.0
H[2, IW] = 1.0
R = np.diag([R_vel, R_vel, R_omega])
innov = z - H @ self._x
_ekf_update(self._x, self._P, H, R, innov)
self._x[IT] = _wrap(self._x[IT])
self._vo_dropout_s = 0.0
# ── Accessors ─────────────────────────────────────────────────────────────
@property
def position(self) -> Tuple[float, float]:
return float(self._x[IX]), float(self._x[IY])
@property
def heading(self) -> float:
return float(self._x[IT])
@property
def velocity(self) -> Tuple[float, float]:
return float(self._x[IVX]), float(self._x[IVY])
@property
def yaw_rate(self) -> float:
return float(self._x[IW])
@property
def covariance_position(self) -> np.ndarray:
"""2×2 position sub-covariance [x, y]."""
return self._P[:2, :2].copy()
@property
def covariance_full(self) -> np.ndarray:
"""Full 6×6 state covariance."""
return self._P.copy()
@property
def is_initialised(self) -> bool:
return self._init
@property
def uwb_dropout_s(self) -> float:
return self._uwb_dropout_s
@property
def vo_dropout_s(self) -> float:
return self._vo_dropout_s
def position_uncertainty_m(self) -> float:
"""Approx 1-σ position uncertainty (m)."""
return float(math.sqrt((self._P[IX, IX] + self._P[IY, IY]) / 2.0))
# ── EKF update helper ──────────────────────────────────────────────────────────
def _ekf_update(
x: np.ndarray,
P: np.ndarray,
H: np.ndarray,
R: np.ndarray,
innov: np.ndarray,
) -> None:
"""Standard linear EKF update step — modifies x and P in place."""
S = H @ P @ H.T + R
K = P @ H.T @ np.linalg.inv(S)
x += K @ innov
I_KH = np.eye(N, dtype=np.float64) - K @ H
# Joseph form for numerical stability
P[:] = I_KH @ P @ I_KH.T + K @ R @ K.T

View File

@ -0,0 +1,396 @@
"""
pose_fusion_node.py Multi-sensor pose fusion node (Issue #595).
Fuses three sensor streams into a single authoritative map-frame pose:
IMU (200 Hz) /imu/data
UWB + IMU fused pose /saltybot/pose/fused_cov (10 Hz)
Visual odometry /saltybot/visual_odom (30 Hz)
The UWB+IMU stream (from saltybot_uwb_imu_fusion) provides absolute position
anchoring every ~100 ms. Visual odometry fills in smooth relative motion
between UWB updates at 30 Hz. Raw IMU drives the predict step at 200 Hz.
Publishes
/saltybot/pose/authoritative geometry_msgs/PoseWithCovarianceStamped
/saltybot/pose/status std_msgs/String (JSON, 10 Hz)
TF2
map base_link (broadcast at IMU rate when filter is healthy)
Parameters (see config/pose_fusion_params.yaml)
Sensor weights:
sigma_uwb_pos_m float 0.10 UWB position 1-σ (m)
use_uwb_covariance bool true extract σ from PoseWithCovarianceStamped
sigma_uwb_head_rad float 0.15 UWB heading 1-σ (rad)
sigma_vo_vel_m_s float 0.05 VO linear-velocity 1-σ (m/s)
sigma_vo_omega_r_s float 0.03 VO yaw-rate 1-σ (rad/s)
sigma_imu_accel float 0.05 IMU accel noise (m/)
sigma_imu_gyro float 0.003 IMU gyro noise (rad/s)
sigma_vel_drift float 0.10 velocity process-noise (m/s)
dropout_vel_damp float 0.95 velocity damping/s during UWB dropout
Dropout thresholds:
uwb_dropout_warn_s float 2.0 log warning after this many s without UWB
uwb_dropout_max_s float 10.0 suppress output after this many s
vo_dropout_warn_s float 1.0 log warning after this many s without VO
Frame IDs:
map_frame str map
base_frame str base_link
Topics:
imu_topic str /imu/data
uwb_pose_topic str /saltybot/pose/fused_cov
vo_topic str /saltybot/visual_odom
"""
from __future__ import annotations
import json
import math
import rclpy
from rclpy.node import Node
from rclpy.qos import (
QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy
)
import tf2_ros
from geometry_msgs.msg import (
PoseWithCovarianceStamped, TransformStamped
)
from nav_msgs.msg import Odometry
from sensor_msgs.msg import Imu
from std_msgs.msg import String
from .pose_fusion_ekf import PoseFusionEKF
# ── QoS ───────────────────────────────────────────────────────────────────────
_SENSOR_QOS = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=5,
)
_RELIABLE_QOS = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST,
depth=5,
)
def _quat_to_yaw(qx: float, qy: float, qz: float, qw: float) -> float:
"""Extract yaw (Z-axis rotation) from quaternion."""
return math.atan2(2.0 * (qw * qz + qx * qy),
1.0 - 2.0 * (qy * qy + qz * qz))
def _yaw_to_qz_qw(yaw: float):
half = yaw * 0.5
return math.sin(half), math.cos(half)
def _cov36_pos_sigma(cov36: list) -> float | None:
"""Extract average xy position sigma from 6×6 covariance row-major list."""
try:
p_xx = cov36[0]
p_yy = cov36[7]
if p_xx > 0.0 and p_yy > 0.0:
return float(math.sqrt((p_xx + p_yy) / 2.0))
except (IndexError, ValueError):
pass
return None
def _cov36_heading_sigma(cov36: list) -> float | None:
"""Extract heading sigma from 6×6 covariance row-major list (index 35 = yaw)."""
try:
p_yaw = cov36[35]
if p_yaw > 0.0:
return float(math.sqrt(p_yaw))
except (IndexError, ValueError):
pass
return None
class PoseFusionNode(Node):
"""Multi-sensor EKF pose fusion — see module docstring for details."""
def __init__(self) -> None:
super().__init__('pose_fusion')
# ── Parameters ────────────────────────────────────────────────────────
self.declare_parameter('sigma_uwb_pos_m', 0.10)
self.declare_parameter('use_uwb_covariance', True)
self.declare_parameter('sigma_uwb_head_rad', 0.15)
self.declare_parameter('sigma_vo_vel_m_s', 0.05)
self.declare_parameter('sigma_vo_omega_r_s', 0.03)
self.declare_parameter('sigma_imu_accel', 0.05)
self.declare_parameter('sigma_imu_gyro', 0.003)
self.declare_parameter('sigma_vel_drift', 0.10)
self.declare_parameter('dropout_vel_damp', 0.95)
self.declare_parameter('uwb_dropout_warn_s', 2.0)
self.declare_parameter('uwb_dropout_max_s', 10.0)
self.declare_parameter('vo_dropout_warn_s', 1.0)
self.declare_parameter('map_frame', 'map')
self.declare_parameter('base_frame', 'base_link')
self.declare_parameter('imu_topic', '/imu/data')
self.declare_parameter('uwb_pose_topic', '/saltybot/pose/fused_cov')
self.declare_parameter('vo_topic', '/saltybot/visual_odom')
self._map_frame = self.get_parameter('map_frame').value
self._base_frame = self.get_parameter('base_frame').value
self._use_uwb_cov = self.get_parameter('use_uwb_covariance').value
self._sigma_uwb_pos = self.get_parameter('sigma_uwb_pos_m').value
self._sigma_uwb_head = self.get_parameter('sigma_uwb_head_rad').value
self._uwb_warn = self.get_parameter('uwb_dropout_warn_s').value
self._uwb_max = self.get_parameter('uwb_dropout_max_s').value
self._vo_warn = self.get_parameter('vo_dropout_warn_s').value
# ── EKF ───────────────────────────────────────────────────────────────
self._ekf = PoseFusionEKF(
sigma_uwb_pos_m = self._sigma_uwb_pos,
sigma_uwb_head_rad = self._sigma_uwb_head,
sigma_vo_vel_m_s = self.get_parameter('sigma_vo_vel_m_s').value,
sigma_vo_omega_r_s = self.get_parameter('sigma_vo_omega_r_s').value,
sigma_imu_accel = self.get_parameter('sigma_imu_accel').value,
sigma_imu_gyro = self.get_parameter('sigma_imu_gyro').value,
sigma_vel_drift = self.get_parameter('sigma_vel_drift').value,
dropout_vel_damp = self.get_parameter('dropout_vel_damp').value,
)
self._last_imu_t: float | None = None
# ── Publishers ─────────────────────────────────────────────────────────
self._pose_pub = self.create_publisher(
PoseWithCovarianceStamped, '/saltybot/pose/authoritative', 10
)
self._status_pub = self.create_publisher(
String, '/saltybot/pose/status', 10
)
self._tf_br = tf2_ros.TransformBroadcaster(self)
# ── Subscriptions ──────────────────────────────────────────────────────
self.create_subscription(
Imu,
self.get_parameter('imu_topic').value,
self._on_imu,
_SENSOR_QOS,
)
self.create_subscription(
PoseWithCovarianceStamped,
self.get_parameter('uwb_pose_topic').value,
self._on_uwb_pose,
_RELIABLE_QOS,
)
self.create_subscription(
Odometry,
self.get_parameter('vo_topic').value,
self._on_vo,
_SENSOR_QOS,
)
# ── Status timer (10 Hz) ────────────────────────────────────────────────
self.create_timer(0.1, self._on_status_timer)
self.get_logger().info(
f'pose_fusion ready — '
f'IMU:{self.get_parameter("imu_topic").value} '
f'UWB:{self.get_parameter("uwb_pose_topic").value} '
f'VO:{self.get_parameter("vo_topic").value} '
f'map:{self._map_frame}{self._base_frame}'
)
# ── IMU callback — predict step ────────────────────────────────────────────
def _on_imu(self, msg: Imu) -> None:
now = self.get_clock().now().nanoseconds * 1e-9
if self._last_imu_t is None:
self._last_imu_t = now
return
dt = now - self._last_imu_t
self._last_imu_t = now
if dt <= 0.0 or dt > 0.5:
return
self._ekf.predict_imu(
ax_body = msg.linear_acceleration.x,
ay_body = msg.linear_acceleration.y,
omega = msg.angular_velocity.z,
dt = dt,
)
if not self._ekf.is_initialised:
return
# Warn / suppress on UWB dropout
if self._ekf.uwb_dropout_s > self._uwb_max:
self.get_logger().warn(
f'UWB dropout {self._ekf.uwb_dropout_s:.1f}s '
'> max — pose output suppressed',
throttle_duration_sec=5.0,
)
return
if self._ekf.uwb_dropout_s > self._uwb_warn:
self.get_logger().warn(
f'UWB dropout {self._ekf.uwb_dropout_s:.1f}s — '
'dead-reckoning only',
throttle_duration_sec=2.0,
)
self._publish_pose(msg.header.stamp)
# ── UWB+IMU fused pose callback — position+heading update ─────────────────
def _on_uwb_pose(self, msg: PoseWithCovarianceStamped) -> None:
x = msg.pose.pose.position.x
y = msg.pose.pose.position.y
# Position sigma: from message covariance or parameter
sigma_pos: float | None = None
if self._use_uwb_cov:
sigma_pos = _cov36_pos_sigma(list(msg.pose.covariance))
if sigma_pos is None or sigma_pos <= 0.0:
sigma_pos = self._sigma_uwb_pos
self._ekf.update_uwb_position(x, y, sigma_m=sigma_pos)
# Heading update from quaternion
q = msg.pose.pose.orientation
yaw = _quat_to_yaw(q.x, q.y, q.z, q.w)
sigma_head: float | None = None
if self._use_uwb_cov:
sigma_head = _cov36_heading_sigma(list(msg.pose.covariance))
if sigma_head is None or sigma_head <= 0.0:
sigma_head = self._sigma_uwb_head
# Only update heading if quaternion is non-trivial (some nodes publish q=(0,0,0,0))
if abs(q.w) > 0.01 or abs(q.z) > 0.01:
self._ekf.update_uwb_heading(yaw, sigma_rad=sigma_head)
# ── Visual-odometry callback — velocity update ─────────────────────────────
def _on_vo(self, msg: Odometry) -> None:
if not self._ekf.is_initialised:
return
vx_body = msg.twist.twist.linear.x
omega = msg.twist.twist.angular.z
# Extract per-sensor sigma from twist covariance if present
# Row-major 6×6: [0]=vx, [7]=vy, [35]=ωz
sigma_vel: float | None = None
sigma_omega: float | None = None
cov = list(msg.twist.covariance)
if len(cov) == 36:
p_vx = cov[0]
p_wz = cov[35]
if p_vx > 0.0:
sigma_vel = float(math.sqrt(p_vx))
if p_wz > 0.0:
sigma_omega = float(math.sqrt(p_wz))
# Warn on VO dropout (but never suppress — VO is secondary)
if self._ekf.vo_dropout_s > self._vo_warn:
self.get_logger().warn(
f'VO dropout {self._ekf.vo_dropout_s:.1f}s',
throttle_duration_sec=2.0,
)
self._ekf.update_vo_velocity(
vx_body = vx_body,
omega = omega,
sigma_vel = sigma_vel,
sigma_omega = sigma_omega,
)
# ── Status timer (10 Hz) ──────────────────────────────────────────────────
def _on_status_timer(self) -> None:
if not self._ekf.is_initialised:
return
x, y = self._ekf.position
status = {
'x': round(x, 4),
'y': round(y, 4),
'heading_deg': round(math.degrees(self._ekf.heading), 2),
'pos_uncertainty_m': round(self._ekf.position_uncertainty_m(), 4),
'uwb_dropout_s': round(self._ekf.uwb_dropout_s, 2),
'vo_dropout_s': round(self._ekf.vo_dropout_s, 2),
'healthy': self._ekf.uwb_dropout_s < self._uwb_max,
}
msg = String()
msg.data = json.dumps(status)
self._status_pub.publish(msg)
# ── Publish authoritative pose ────────────────────────────────────────────
def _publish_pose(self, stamp) -> None:
x, y = self._ekf.position
heading = self._ekf.heading
cov_pos = self._ekf.covariance_position
cov6 = self._ekf.covariance_full
vx, vy = self._ekf.velocity
omega = self._ekf.yaw_rate
qz, qw = _yaw_to_qz_qw(heading)
msg = PoseWithCovarianceStamped()
msg.header.stamp = stamp
msg.header.frame_id = self._map_frame
msg.pose.pose.position.x = x
msg.pose.pose.position.y = y
msg.pose.pose.position.z = 0.0
msg.pose.pose.orientation.z = qz
msg.pose.pose.orientation.w = qw
# Build 6×6 row-major covariance
cov36 = [0.0] * 36
cov36[0] = cov_pos[0, 0] # x,x
cov36[1] = cov_pos[0, 1] # x,y
cov36[6] = cov_pos[1, 0] # y,x
cov36[7] = cov_pos[1, 1] # y,y
cov36[14] = 1e-4 # z (not estimated)
cov36[21] = 1e-4 # roll
cov36[28] = 1e-4 # pitch
cov36[35] = float(cov6[2, 2]) # yaw
msg.pose.covariance = cov36
self._pose_pub.publish(msg)
# TF2: map → base_link
tf = TransformStamped()
tf.header.stamp = stamp
tf.header.frame_id = self._map_frame
tf.child_frame_id = self._base_frame
tf.transform.translation.x = x
tf.transform.translation.y = y
tf.transform.translation.z = 0.0
tf.transform.rotation.z = qz
tf.transform.rotation.w = qw
self._tf_br.sendTransform(tf)
def main(args=None) -> None:
rclpy.init(args=args)
node = PoseFusionNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.try_shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,5 @@
[develop]
script_dir=$base/lib/saltybot_pose_fusion
[install]
install_scripts=$base/lib/saltybot_pose_fusion

View File

@ -0,0 +1,30 @@
import os
from glob import glob
from setuptools import setup
package_name = 'saltybot_pose_fusion'
setup(
name=package_name,
version='0.1.0',
packages=[package_name],
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
(os.path.join('share', package_name, 'launch'), glob('launch/*.py')),
(os.path.join('share', package_name, 'config'), glob('config/*.yaml')),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='sl-perception',
maintainer_email='sl-perception@saltylab.local',
description='Multi-sensor EKF pose fusion — UWB + visual odom + IMU (Issue #595)',
license='MIT',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'pose_fusion = saltybot_pose_fusion.pose_fusion_node:main',
],
},
)

View File

@ -0,0 +1,134 @@
"""Unit tests for PoseFusionEKF (no ROS2 required)."""
import math
import pytest
from saltybot_pose_fusion.pose_fusion_ekf import PoseFusionEKF
def make_ekf(**kwargs):
return PoseFusionEKF(**kwargs)
# ── Initialisation ─────────────────────────────────────────────────────────────
def test_not_initialised_before_uwb():
ekf = make_ekf()
assert not ekf.is_initialised
def test_initialised_after_uwb_update():
ekf = make_ekf()
ekf.update_uwb_position(1.0, 2.0)
assert ekf.is_initialised
x, y = ekf.position
assert abs(x - 1.0) < 1e-9
assert abs(y - 2.0) < 1e-9
# ── IMU predict ───────────────────────────────────────────────────────────────
def test_predict_advances_position():
ekf = make_ekf()
ekf.update_uwb_position(0.0, 0.0)
# Give it a forward velocity by injecting a forward acceleration
ekf.predict_imu(ax_body=1.0, ay_body=0.0, omega=0.0, dt=1.0)
ekf.predict_imu(ax_body=1.0, ay_body=0.0, omega=0.0, dt=1.0)
x, y = ekf.position
assert x > 0.0, f"Expected forward motion, got x={x}"
def test_predict_no_crash_before_init():
ekf = make_ekf()
ekf.predict_imu(ax_body=1.0, ay_body=0.0, omega=0.0, dt=0.01)
assert not ekf.is_initialised
# ── UWB position update ───────────────────────────────────────────────────────
def test_uwb_update_converges():
ekf = make_ekf(sigma_uwb_pos_m=0.10)
ekf.update_uwb_position(5.0, 3.0)
for _ in range(20):
ekf.predict_imu(ax_body=0.0, ay_body=0.0, omega=0.0, dt=0.05)
ekf.update_uwb_position(5.0, 3.0)
x, y = ekf.position
assert abs(x - 5.0) < 0.5
assert abs(y - 3.0) < 0.5
def test_uwb_resets_dropout():
ekf = make_ekf()
ekf.update_uwb_position(0.0, 0.0)
ekf.predict_imu(0.0, 0.0, 0.0, dt=5.0)
assert ekf.uwb_dropout_s > 4.0
ekf.update_uwb_position(0.0, 0.0)
assert ekf.uwb_dropout_s == 0.0
# ── UWB heading update ────────────────────────────────────────────────────────
def test_uwb_heading_update():
ekf = make_ekf(sigma_uwb_head_rad=0.1)
ekf.update_uwb_position(0.0, 0.0)
target_yaw = math.radians(45.0)
for _ in range(10):
ekf.update_uwb_heading(target_yaw)
assert abs(ekf.heading - target_yaw) < 0.2
# ── VO velocity update ────────────────────────────────────────────────────────
def test_vo_velocity_update():
ekf = make_ekf(sigma_vo_vel_m_s=0.05, sigma_vo_omega_r_s=0.03)
ekf.update_uwb_position(0.0, 0.0)
ekf.update_vo_velocity(vx_body=1.0, omega=0.0)
vx, vy = ekf.velocity
assert vx > 0.5, f"Expected vx>0.5 after VO update, got {vx}"
def test_vo_resets_dropout():
ekf = make_ekf()
ekf.update_uwb_position(0.0, 0.0)
ekf.predict_imu(0.0, 0.0, 0.0, dt=3.0)
assert ekf.vo_dropout_s > 2.0
ekf.update_vo_velocity(vx_body=0.0, omega=0.0)
assert ekf.vo_dropout_s == 0.0
# ── Covariance ────────────────────────────────────────────────────────────────
def test_covariance_decreases_with_updates():
ekf = make_ekf()
ekf.update_uwb_position(0.0, 0.0)
p_init = ekf.position_uncertainty_m()
for _ in range(30):
ekf.predict_imu(0.0, 0.0, 0.0, dt=0.05)
ekf.update_uwb_position(0.0, 0.0)
assert ekf.position_uncertainty_m() < p_init
def test_covariance_matrix_shape():
ekf = make_ekf()
ekf.update_uwb_position(1.0, 1.0)
assert ekf.covariance_position.shape == (2, 2)
assert ekf.covariance_full.shape == (6, 6)
# ── Sigma override ────────────────────────────────────────────────────────────
def test_custom_sigma_override():
"""High sigma should produce weaker updates than low sigma."""
ekf_weak = make_ekf(sigma_uwb_pos_m=2.0)
ekf_strong = make_ekf(sigma_uwb_pos_m=0.01)
for ekf in (ekf_weak, ekf_strong):
ekf.update_uwb_position(10.0, 0.0)
ekf.predict_imu(0.0, 0.0, 0.0, dt=0.1)
ekf.update_uwb_position(0.0, 0.0)
x_weak, _ = ekf_weak.position
x_strong, _ = ekf_strong.position
# Strong measurement should pull closer to 0
assert x_strong < x_weak, \
f"Strong sigma should pull x closer to 0: strong={x_strong:.3f} weak={x_weak:.3f}"