feat: Multi-sensor pose fusion (Issue #595) #606
@ -0,0 +1,30 @@
|
||||
pose_fusion:
|
||||
ros__parameters:
|
||||
# ── Sensor weights ──────────────────────────────────────────────────────
|
||||
# UWB+IMU fused pose (/saltybot/pose/fused_cov)
|
||||
sigma_uwb_pos_m: 0.10 # position 1-σ (m) — used when use_uwb_covariance=false
|
||||
use_uwb_covariance: true # if true, extract σ from PoseWithCovarianceStamped.covariance
|
||||
sigma_uwb_head_rad: 0.15 # heading 1-σ (rad) — used when cov unavailable
|
||||
|
||||
# Visual odometry (/saltybot/visual_odom)
|
||||
sigma_vo_vel_m_s: 0.05 # linear-velocity update 1-σ (m/s)
|
||||
sigma_vo_omega_r_s: 0.03 # yaw-rate update 1-σ (rad/s)
|
||||
|
||||
# IMU process noise (/imu/data)
|
||||
sigma_imu_accel: 0.05 # accelerometer noise (m/s²)
|
||||
sigma_imu_gyro: 0.003 # gyroscope noise (rad/s)
|
||||
sigma_vel_drift: 0.10 # velocity random-walk process noise (m/s)
|
||||
dropout_vel_damp: 0.95 # velocity damping factor per second during UWB dropout
|
||||
|
||||
# ── Dropout thresholds ──────────────────────────────────────────────────
|
||||
uwb_dropout_warn_s: 2.0 # log warning if UWB silent > this (s)
|
||||
uwb_dropout_max_s: 10.0 # suppress pose output if UWB silent > this (s)
|
||||
vo_dropout_warn_s: 1.0 # log warning if VO silent > this (s)
|
||||
|
||||
# ── Topic / frame configuration ─────────────────────────────────────────
|
||||
imu_topic: /imu/data
|
||||
uwb_pose_topic: /saltybot/pose/fused_cov # from saltybot_uwb_imu_fusion
|
||||
vo_topic: /saltybot/visual_odom # from saltybot_visual_odom
|
||||
|
||||
map_frame: map
|
||||
base_frame: base_link
|
||||
@ -0,0 +1,29 @@
|
||||
"""pose_fusion.launch.py — Launch the multi-sensor pose fusion node (Issue #595)."""
|
||||
|
||||
import os
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
from launch import LaunchDescription
|
||||
from launch.actions import DeclareLaunchArgument
|
||||
from launch.substitutions import LaunchConfiguration
|
||||
from launch_ros.actions import Node
|
||||
|
||||
|
||||
def generate_launch_description():
|
||||
pkg_share = get_package_share_directory('saltybot_pose_fusion')
|
||||
default_params = os.path.join(pkg_share, 'config', 'pose_fusion_params.yaml')
|
||||
|
||||
params_arg = DeclareLaunchArgument(
|
||||
'params_file',
|
||||
default_value=default_params,
|
||||
description='Path to pose_fusion_params.yaml',
|
||||
)
|
||||
|
||||
pose_fusion_node = Node(
|
||||
package='saltybot_pose_fusion',
|
||||
executable='pose_fusion',
|
||||
name='pose_fusion',
|
||||
output='screen',
|
||||
parameters=[LaunchConfiguration('params_file')],
|
||||
)
|
||||
|
||||
return LaunchDescription([params_arg, pose_fusion_node])
|
||||
35
jetson/ros2_ws/src/saltybot_pose_fusion/package.xml
Normal file
35
jetson/ros2_ws/src/saltybot_pose_fusion/package.xml
Normal file
@ -0,0 +1,35 @@
|
||||
<?xml version="1.0"?>
|
||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="3">
|
||||
<name>saltybot_pose_fusion</name>
|
||||
<version>0.1.0</version>
|
||||
<description>
|
||||
Multi-sensor EKF pose fusion node (Issue #595).
|
||||
Fuses UWB+IMU absolute pose (/saltybot/pose/fused_cov), visual odometry
|
||||
(/saltybot/visual_odom), and raw IMU (/imu/data) into a single authoritative
|
||||
map-frame PoseWithCovarianceStamped on /saltybot/pose/authoritative with
|
||||
TF2 map→base_link broadcast.
|
||||
</description>
|
||||
<maintainer email="sl-perception@saltylab.local">sl-perception</maintainer>
|
||||
<license>MIT</license>
|
||||
|
||||
<buildtool_depend>ament_python</buildtool_depend>
|
||||
|
||||
<depend>rclpy</depend>
|
||||
<depend>std_msgs</depend>
|
||||
<depend>sensor_msgs</depend>
|
||||
<depend>nav_msgs</depend>
|
||||
<depend>geometry_msgs</depend>
|
||||
<depend>tf2_ros</depend>
|
||||
|
||||
<exec_depend>python3-numpy</exec_depend>
|
||||
|
||||
<test_depend>ament_copyright</test_depend>
|
||||
<test_depend>ament_flake8</test_depend>
|
||||
<test_depend>ament_pep257</test_depend>
|
||||
<test_depend>python3-pytest</test_depend>
|
||||
|
||||
<export>
|
||||
<build_type>ament_python</build_type>
|
||||
</export>
|
||||
</package>
|
||||
@ -0,0 +1,377 @@
|
||||
"""
|
||||
pose_fusion_ekf.py — Extended Kalman Filter for multi-sensor pose fusion (Issue #595).
|
||||
|
||||
No ROS2 dependencies — pure Python + numpy, fully unit-testable.
|
||||
|
||||
Fuses three sensor streams
|
||||
──────────────────────────
|
||||
1. IMU (200 Hz) — predict step (body-frame accel + gyro)
|
||||
2. UWB+IMU fused pose (10 Hz) — absolute position and heading update
|
||||
3. Visual odometry (30 Hz) — velocity-domain update (drift-free short-term motion)
|
||||
|
||||
State vector [x, y, θ, vx, vy, ω]ᵀ
|
||||
────────────────────────────────────
|
||||
x — position, forward (m, map frame)
|
||||
y — position, lateral (m, map frame)
|
||||
θ — heading, yaw (rad, CCW positive)
|
||||
vx — velocity x (m/s, map frame)
|
||||
vy — velocity y (m/s, map frame)
|
||||
ω — yaw rate (rad/s)
|
||||
|
||||
Process model (IMU as control input):
|
||||
θ_{k+1} = θ_k + ω_imu * dt
|
||||
vx_{k+1} = vx_k + (ax_body*cos θ - ay_body*sin θ) * dt
|
||||
vy_{k+1} = vy_k + (ax_body*sin θ + ay_body*cos θ) * dt
|
||||
x_{k+1} = x_k + vx_k * dt
|
||||
y_{k+1} = y_k + vy_k * dt
|
||||
ω_{k+1} = ω_imu
|
||||
|
||||
Measurement models
|
||||
──────────────────
|
||||
UWB position: z = [x, y] H[2×6] = [[1,0,0,0,0,0],[0,1,0,0,0,0]]
|
||||
UWB heading: z = [θ] H[1×6] = [[0,0,1,0,0,0]]
|
||||
VO velocity: z = [vx_w, vy_w, ω] H[3×6] = diag of [vx,vy,ω] rows in state
|
||||
|
||||
VO velocity measurement: VO gives vx_body (forward speed) and ω.
|
||||
We rotate to world frame: vx_w = vx_body*cos(θ), vy_w = vx_body*sin(θ)
|
||||
then update [vx, vy, ω] state directly (linear measurement step).
|
||||
|
||||
Sensor dropout handling
|
||||
───────────────────────
|
||||
_uwb_dropout_s: seconds since last UWB update
|
||||
_vo_dropout_s: seconds since last VO update
|
||||
Both increment during predict(), reset to 0 on update.
|
||||
When uwb_dropout_s > uwb_max_s, velocity damping activates and output
|
||||
is suppressed by the node.
|
||||
|
||||
Sensor weights (configurable via constructor)
|
||||
─────────────────────────────────────────────
|
||||
sigma_uwb_pos_m — UWB+IMU fused position σ (m) default 0.10
|
||||
sigma_uwb_head_rad — UWB+IMU fused heading σ (rad) default 0.15
|
||||
sigma_vo_vel_m_s — VO linear velocity σ (m/s) default 0.05
|
||||
sigma_vo_omega_r_s — VO yaw-rate σ (rad/s) default 0.03
|
||||
sigma_imu_accel — IMU accel noise σ (m/s²) default 0.05
|
||||
sigma_imu_gyro — IMU gyro noise σ (rad/s) default 0.003
|
||||
sigma_vel_drift — velocity process noise σ (m/s) default 0.10
|
||||
dropout_vel_damp — velocity damping/s during UWB drop default 0.95
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
from typing import Tuple
|
||||
|
||||
import numpy as np
|
||||
|
||||
|
||||
# ── State indices ──────────────────────────────────────────────────────────────
|
||||
N = 6
|
||||
IX, IY, IT, IVX, IVY, IW = range(N)
|
||||
|
||||
|
||||
def _wrap(a: float) -> float:
|
||||
"""Wrap angle to (−π, π]."""
|
||||
return float((a + math.pi) % (2.0 * math.pi) - math.pi)
|
||||
|
||||
|
||||
class PoseFusionEKF:
|
||||
"""
|
||||
EKF fusing UWB+IMU absolute pose (10 Hz), visual-odometry velocity (30 Hz),
|
||||
and raw IMU (200 Hz) into a single authoritative map-frame pose.
|
||||
|
||||
Thread model: single-threaded — caller must serialise access.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
sigma_uwb_pos_m: float = 0.10,
|
||||
sigma_uwb_head_rad: float = 0.15,
|
||||
sigma_vo_vel_m_s: float = 0.05,
|
||||
sigma_vo_omega_r_s: float = 0.03,
|
||||
sigma_imu_accel: float = 0.05,
|
||||
sigma_imu_gyro: float = 0.003,
|
||||
sigma_vel_drift: float = 0.10,
|
||||
dropout_vel_damp: float = 0.95,
|
||||
) -> None:
|
||||
# Measurement noise variances
|
||||
self._R_uwb_pos = sigma_uwb_pos_m ** 2
|
||||
self._R_uwb_head = sigma_uwb_head_rad ** 2
|
||||
self._R_vo_vel = sigma_vo_vel_m_s ** 2
|
||||
self._R_vo_omega = sigma_vo_omega_r_s ** 2
|
||||
|
||||
# Process noise parameters
|
||||
self._q_a = sigma_imu_accel ** 2
|
||||
self._q_g = sigma_imu_gyro ** 2
|
||||
self._q_v = sigma_vel_drift ** 2
|
||||
self._damp = dropout_vel_damp
|
||||
|
||||
# State & covariance
|
||||
self._x = np.zeros(N, dtype=np.float64)
|
||||
self._P = np.eye(N, dtype=np.float64) * 9.0
|
||||
|
||||
# Simple accel bias estimator
|
||||
self._accel_bias = np.zeros(2, dtype=np.float64)
|
||||
self._bias_alpha = 0.005
|
||||
|
||||
self._init = False
|
||||
self._uwb_dropout_s = 0.0
|
||||
self._vo_dropout_s = 0.0
|
||||
|
||||
# ── Initialisation ─────────────────────────────────────────────────────────
|
||||
|
||||
def initialise(self, x: float, y: float, heading: float = 0.0) -> None:
|
||||
"""Seed filter with known position (called on first UWB measurement)."""
|
||||
self._x[:] = 0.0
|
||||
self._x[IX] = x
|
||||
self._x[IY] = y
|
||||
self._x[IT] = heading
|
||||
self._P = np.diag([0.25, 0.25, 0.1, 1.0, 1.0, 0.1])
|
||||
self._init = True
|
||||
|
||||
# ── IMU predict step (200 Hz) ──────────────────────────────────────────────
|
||||
|
||||
def predict_imu(
|
||||
self,
|
||||
ax_body: float,
|
||||
ay_body: float,
|
||||
omega: float,
|
||||
dt: float,
|
||||
) -> None:
|
||||
"""
|
||||
EKF predict driven by body-frame IMU.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
ax_body : forward accel (m/s², body frame)
|
||||
ay_body : lateral accel (m/s², body frame, left positive)
|
||||
omega : yaw rate (rad/s, CCW positive)
|
||||
dt : timestep (s)
|
||||
"""
|
||||
if not self._init:
|
||||
return
|
||||
|
||||
# Bias-compensated acceleration
|
||||
ax_c = ax_body - self._accel_bias[0]
|
||||
ay_c = ay_body - self._accel_bias[1]
|
||||
|
||||
x = self._x
|
||||
th = x[IT]
|
||||
ct = math.cos(th)
|
||||
st = math.sin(th)
|
||||
|
||||
# World-frame acceleration
|
||||
ax_w = ax_c * ct - ay_c * st
|
||||
ay_w = ax_c * st + ay_c * ct
|
||||
|
||||
# Propagate state
|
||||
xp = x.copy()
|
||||
xp[IX] = x[IX] + x[IVX] * dt
|
||||
xp[IY] = x[IY] + x[IVY] * dt
|
||||
xp[IT] = _wrap(x[IT] + omega * dt)
|
||||
xp[IVX] = x[IVX] + ax_w * dt
|
||||
xp[IVY] = x[IVY] + ay_w * dt
|
||||
xp[IW] = omega
|
||||
|
||||
# Jacobian F = ∂f/∂x
|
||||
F = np.eye(N, dtype=np.float64)
|
||||
F[IX, IVX] = dt
|
||||
F[IY, IVY] = dt
|
||||
F[IVX, IT] = (-ax_c * st - ay_c * ct) * dt
|
||||
F[IVY, IT] = ( ax_c * ct - ay_c * st) * dt
|
||||
|
||||
# Process noise Q
|
||||
dt2 = dt * dt
|
||||
Q = np.diag([
|
||||
0.5 * self._q_a * dt2 * dt2,
|
||||
0.5 * self._q_a * dt2 * dt2,
|
||||
self._q_g * dt2,
|
||||
self._q_v * dt + self._q_a * dt2,
|
||||
self._q_v * dt + self._q_a * dt2,
|
||||
self._q_g,
|
||||
])
|
||||
|
||||
self._x = xp
|
||||
self._P = F @ self._P @ F.T + Q
|
||||
|
||||
# Advance dropout clocks
|
||||
self._uwb_dropout_s += dt
|
||||
self._vo_dropout_s += dt
|
||||
|
||||
# Velocity damping during extended UWB dropout
|
||||
if self._uwb_dropout_s > 2.0:
|
||||
d = self._damp ** dt
|
||||
self._x[IVX] *= d
|
||||
self._x[IVY] *= d
|
||||
|
||||
# Running accel bias estimate
|
||||
self._accel_bias[0] += self._bias_alpha * (ax_body - self._accel_bias[0])
|
||||
self._accel_bias[1] += self._bias_alpha * (ay_body - self._accel_bias[1])
|
||||
|
||||
# ── UWB+IMU position update (10 Hz) ───────────────────────────────────────
|
||||
|
||||
def update_uwb_position(
|
||||
self,
|
||||
x_m: float,
|
||||
y_m: float,
|
||||
sigma_m: float | None = None,
|
||||
) -> None:
|
||||
"""
|
||||
Update from UWB+IMU fused position measurement [x, y].
|
||||
|
||||
Parameters
|
||||
----------
|
||||
x_m, y_m : measured position (m, map frame)
|
||||
sigma_m : std-dev override; uses constructor default if None
|
||||
"""
|
||||
if not self._init:
|
||||
self.initialise(x_m, y_m)
|
||||
return
|
||||
|
||||
R_val = (sigma_m ** 2) if sigma_m is not None else self._R_uwb_pos
|
||||
R = np.eye(2, dtype=np.float64) * R_val
|
||||
|
||||
H = np.zeros((2, N), dtype=np.float64)
|
||||
H[0, IX] = 1.0
|
||||
H[1, IY] = 1.0
|
||||
|
||||
innov = np.array([x_m - self._x[IX], y_m - self._x[IY]])
|
||||
_ekf_update(self._x, self._P, H, R, innov)
|
||||
self._x[IT] = _wrap(self._x[IT])
|
||||
|
||||
self._uwb_dropout_s = 0.0
|
||||
|
||||
# ── UWB+IMU heading update (10 Hz) ────────────────────────────────────────
|
||||
|
||||
def update_uwb_heading(
|
||||
self,
|
||||
heading_rad: float,
|
||||
sigma_rad: float | None = None,
|
||||
) -> None:
|
||||
"""Update heading from UWB+IMU fused estimate."""
|
||||
if not self._init:
|
||||
return
|
||||
|
||||
R_val = (sigma_rad ** 2) if sigma_rad is not None else self._R_uwb_head
|
||||
R = np.array([[R_val]])
|
||||
|
||||
H = np.zeros((1, N), dtype=np.float64)
|
||||
H[0, IT] = 1.0
|
||||
|
||||
innov = np.array([_wrap(heading_rad - self._x[IT])])
|
||||
_ekf_update(self._x, self._P, H, R, innov)
|
||||
self._x[IT] = _wrap(self._x[IT])
|
||||
|
||||
# ── Visual-odometry velocity update (30 Hz) ───────────────────────────────
|
||||
|
||||
def update_vo_velocity(
|
||||
self,
|
||||
vx_body: float,
|
||||
omega: float,
|
||||
sigma_vel: float | None = None,
|
||||
sigma_omega: float | None = None,
|
||||
) -> None:
|
||||
"""
|
||||
Update from visual-odometry velocity measurement.
|
||||
|
||||
VO gives forward speed (vx_body, m/s) in the robot body frame, and
|
||||
yaw rate (omega, rad/s). We rotate vx_body to world frame using the
|
||||
current heading estimate, then update [vx, vy, ω] state.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
vx_body : VO forward linear speed (m/s)
|
||||
omega : VO yaw rate (rad/s)
|
||||
sigma_vel : std-dev for linear velocity component (m/s)
|
||||
sigma_omega: std-dev for yaw rate (rad/s)
|
||||
"""
|
||||
if not self._init:
|
||||
return
|
||||
|
||||
R_vel = (sigma_vel ** 2) if sigma_vel is not None else self._R_vo_vel
|
||||
R_omega = (sigma_omega ** 2) if sigma_omega is not None else self._R_vo_omega
|
||||
|
||||
th = self._x[IT]
|
||||
ct = math.cos(th)
|
||||
st = math.sin(th)
|
||||
|
||||
# Rotate body-frame vx to world-frame [vx_w, vy_w]
|
||||
vx_w = vx_body * ct
|
||||
vy_w = vx_body * st
|
||||
|
||||
# Measurement: [vx_w, vy_w, ω]
|
||||
z = np.array([vx_w, vy_w, omega])
|
||||
|
||||
H = np.zeros((3, N), dtype=np.float64)
|
||||
H[0, IVX] = 1.0
|
||||
H[1, IVY] = 1.0
|
||||
H[2, IW] = 1.0
|
||||
|
||||
R = np.diag([R_vel, R_vel, R_omega])
|
||||
|
||||
innov = z - H @ self._x
|
||||
_ekf_update(self._x, self._P, H, R, innov)
|
||||
self._x[IT] = _wrap(self._x[IT])
|
||||
|
||||
self._vo_dropout_s = 0.0
|
||||
|
||||
# ── Accessors ─────────────────────────────────────────────────────────────
|
||||
|
||||
@property
|
||||
def position(self) -> Tuple[float, float]:
|
||||
return float(self._x[IX]), float(self._x[IY])
|
||||
|
||||
@property
|
||||
def heading(self) -> float:
|
||||
return float(self._x[IT])
|
||||
|
||||
@property
|
||||
def velocity(self) -> Tuple[float, float]:
|
||||
return float(self._x[IVX]), float(self._x[IVY])
|
||||
|
||||
@property
|
||||
def yaw_rate(self) -> float:
|
||||
return float(self._x[IW])
|
||||
|
||||
@property
|
||||
def covariance_position(self) -> np.ndarray:
|
||||
"""2×2 position sub-covariance [x, y]."""
|
||||
return self._P[:2, :2].copy()
|
||||
|
||||
@property
|
||||
def covariance_full(self) -> np.ndarray:
|
||||
"""Full 6×6 state covariance."""
|
||||
return self._P.copy()
|
||||
|
||||
@property
|
||||
def is_initialised(self) -> bool:
|
||||
return self._init
|
||||
|
||||
@property
|
||||
def uwb_dropout_s(self) -> float:
|
||||
return self._uwb_dropout_s
|
||||
|
||||
@property
|
||||
def vo_dropout_s(self) -> float:
|
||||
return self._vo_dropout_s
|
||||
|
||||
def position_uncertainty_m(self) -> float:
|
||||
"""Approx 1-σ position uncertainty (m)."""
|
||||
return float(math.sqrt((self._P[IX, IX] + self._P[IY, IY]) / 2.0))
|
||||
|
||||
|
||||
# ── EKF update helper ──────────────────────────────────────────────────────────
|
||||
|
||||
def _ekf_update(
|
||||
x: np.ndarray,
|
||||
P: np.ndarray,
|
||||
H: np.ndarray,
|
||||
R: np.ndarray,
|
||||
innov: np.ndarray,
|
||||
) -> None:
|
||||
"""Standard linear EKF update step — modifies x and P in place."""
|
||||
S = H @ P @ H.T + R
|
||||
K = P @ H.T @ np.linalg.inv(S)
|
||||
x += K @ innov
|
||||
I_KH = np.eye(N, dtype=np.float64) - K @ H
|
||||
# Joseph form for numerical stability
|
||||
P[:] = I_KH @ P @ I_KH.T + K @ R @ K.T
|
||||
@ -0,0 +1,396 @@
|
||||
"""
|
||||
pose_fusion_node.py — Multi-sensor pose fusion node (Issue #595).
|
||||
|
||||
Fuses three sensor streams into a single authoritative map-frame pose:
|
||||
|
||||
IMU (200 Hz) /imu/data
|
||||
UWB + IMU fused pose /saltybot/pose/fused_cov (10 Hz)
|
||||
Visual odometry /saltybot/visual_odom (30 Hz)
|
||||
|
||||
The UWB+IMU stream (from saltybot_uwb_imu_fusion) provides absolute position
|
||||
anchoring every ~100 ms. Visual odometry fills in smooth relative motion
|
||||
between UWB updates at 30 Hz. Raw IMU drives the predict step at 200 Hz.
|
||||
|
||||
Publishes
|
||||
─────────
|
||||
/saltybot/pose/authoritative geometry_msgs/PoseWithCovarianceStamped
|
||||
/saltybot/pose/status std_msgs/String (JSON, 10 Hz)
|
||||
|
||||
TF2
|
||||
───
|
||||
map → base_link (broadcast at IMU rate when filter is healthy)
|
||||
|
||||
Parameters (see config/pose_fusion_params.yaml)
|
||||
───────────────────────────────────────────────
|
||||
Sensor weights:
|
||||
sigma_uwb_pos_m float 0.10 UWB position 1-σ (m)
|
||||
use_uwb_covariance bool true extract σ from PoseWithCovarianceStamped
|
||||
sigma_uwb_head_rad float 0.15 UWB heading 1-σ (rad)
|
||||
sigma_vo_vel_m_s float 0.05 VO linear-velocity 1-σ (m/s)
|
||||
sigma_vo_omega_r_s float 0.03 VO yaw-rate 1-σ (rad/s)
|
||||
sigma_imu_accel float 0.05 IMU accel noise (m/s²)
|
||||
sigma_imu_gyro float 0.003 IMU gyro noise (rad/s)
|
||||
sigma_vel_drift float 0.10 velocity process-noise (m/s)
|
||||
dropout_vel_damp float 0.95 velocity damping/s during UWB dropout
|
||||
|
||||
Dropout thresholds:
|
||||
uwb_dropout_warn_s float 2.0 log warning after this many s without UWB
|
||||
uwb_dropout_max_s float 10.0 suppress output after this many s
|
||||
vo_dropout_warn_s float 1.0 log warning after this many s without VO
|
||||
|
||||
Frame IDs:
|
||||
map_frame str map
|
||||
base_frame str base_link
|
||||
|
||||
Topics:
|
||||
imu_topic str /imu/data
|
||||
uwb_pose_topic str /saltybot/pose/fused_cov
|
||||
vo_topic str /saltybot/visual_odom
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import math
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.qos import (
|
||||
QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy
|
||||
)
|
||||
|
||||
import tf2_ros
|
||||
from geometry_msgs.msg import (
|
||||
PoseWithCovarianceStamped, TransformStamped
|
||||
)
|
||||
from nav_msgs.msg import Odometry
|
||||
from sensor_msgs.msg import Imu
|
||||
from std_msgs.msg import String
|
||||
|
||||
from .pose_fusion_ekf import PoseFusionEKF
|
||||
|
||||
|
||||
# ── QoS ───────────────────────────────────────────────────────────────────────
|
||||
_SENSOR_QOS = QoSProfile(
|
||||
reliability=ReliabilityPolicy.BEST_EFFORT,
|
||||
history=HistoryPolicy.KEEP_LAST,
|
||||
depth=5,
|
||||
)
|
||||
_RELIABLE_QOS = QoSProfile(
|
||||
reliability=ReliabilityPolicy.RELIABLE,
|
||||
history=HistoryPolicy.KEEP_LAST,
|
||||
depth=5,
|
||||
)
|
||||
|
||||
|
||||
def _quat_to_yaw(qx: float, qy: float, qz: float, qw: float) -> float:
|
||||
"""Extract yaw (Z-axis rotation) from quaternion."""
|
||||
return math.atan2(2.0 * (qw * qz + qx * qy),
|
||||
1.0 - 2.0 * (qy * qy + qz * qz))
|
||||
|
||||
|
||||
def _yaw_to_qz_qw(yaw: float):
|
||||
half = yaw * 0.5
|
||||
return math.sin(half), math.cos(half)
|
||||
|
||||
|
||||
def _cov36_pos_sigma(cov36: list) -> float | None:
|
||||
"""Extract average xy position sigma from 6×6 covariance row-major list."""
|
||||
try:
|
||||
p_xx = cov36[0]
|
||||
p_yy = cov36[7]
|
||||
if p_xx > 0.0 and p_yy > 0.0:
|
||||
return float(math.sqrt((p_xx + p_yy) / 2.0))
|
||||
except (IndexError, ValueError):
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
def _cov36_heading_sigma(cov36: list) -> float | None:
|
||||
"""Extract heading sigma from 6×6 covariance row-major list (index 35 = yaw)."""
|
||||
try:
|
||||
p_yaw = cov36[35]
|
||||
if p_yaw > 0.0:
|
||||
return float(math.sqrt(p_yaw))
|
||||
except (IndexError, ValueError):
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
class PoseFusionNode(Node):
|
||||
"""Multi-sensor EKF pose fusion — see module docstring for details."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__('pose_fusion')
|
||||
|
||||
# ── Parameters ────────────────────────────────────────────────────────
|
||||
self.declare_parameter('sigma_uwb_pos_m', 0.10)
|
||||
self.declare_parameter('use_uwb_covariance', True)
|
||||
self.declare_parameter('sigma_uwb_head_rad', 0.15)
|
||||
self.declare_parameter('sigma_vo_vel_m_s', 0.05)
|
||||
self.declare_parameter('sigma_vo_omega_r_s', 0.03)
|
||||
self.declare_parameter('sigma_imu_accel', 0.05)
|
||||
self.declare_parameter('sigma_imu_gyro', 0.003)
|
||||
self.declare_parameter('sigma_vel_drift', 0.10)
|
||||
self.declare_parameter('dropout_vel_damp', 0.95)
|
||||
self.declare_parameter('uwb_dropout_warn_s', 2.0)
|
||||
self.declare_parameter('uwb_dropout_max_s', 10.0)
|
||||
self.declare_parameter('vo_dropout_warn_s', 1.0)
|
||||
self.declare_parameter('map_frame', 'map')
|
||||
self.declare_parameter('base_frame', 'base_link')
|
||||
self.declare_parameter('imu_topic', '/imu/data')
|
||||
self.declare_parameter('uwb_pose_topic', '/saltybot/pose/fused_cov')
|
||||
self.declare_parameter('vo_topic', '/saltybot/visual_odom')
|
||||
|
||||
self._map_frame = self.get_parameter('map_frame').value
|
||||
self._base_frame = self.get_parameter('base_frame').value
|
||||
self._use_uwb_cov = self.get_parameter('use_uwb_covariance').value
|
||||
self._sigma_uwb_pos = self.get_parameter('sigma_uwb_pos_m').value
|
||||
self._sigma_uwb_head = self.get_parameter('sigma_uwb_head_rad').value
|
||||
self._uwb_warn = self.get_parameter('uwb_dropout_warn_s').value
|
||||
self._uwb_max = self.get_parameter('uwb_dropout_max_s').value
|
||||
self._vo_warn = self.get_parameter('vo_dropout_warn_s').value
|
||||
|
||||
# ── EKF ───────────────────────────────────────────────────────────────
|
||||
self._ekf = PoseFusionEKF(
|
||||
sigma_uwb_pos_m = self._sigma_uwb_pos,
|
||||
sigma_uwb_head_rad = self._sigma_uwb_head,
|
||||
sigma_vo_vel_m_s = self.get_parameter('sigma_vo_vel_m_s').value,
|
||||
sigma_vo_omega_r_s = self.get_parameter('sigma_vo_omega_r_s').value,
|
||||
sigma_imu_accel = self.get_parameter('sigma_imu_accel').value,
|
||||
sigma_imu_gyro = self.get_parameter('sigma_imu_gyro').value,
|
||||
sigma_vel_drift = self.get_parameter('sigma_vel_drift').value,
|
||||
dropout_vel_damp = self.get_parameter('dropout_vel_damp').value,
|
||||
)
|
||||
|
||||
self._last_imu_t: float | None = None
|
||||
|
||||
# ── Publishers ─────────────────────────────────────────────────────────
|
||||
self._pose_pub = self.create_publisher(
|
||||
PoseWithCovarianceStamped, '/saltybot/pose/authoritative', 10
|
||||
)
|
||||
self._status_pub = self.create_publisher(
|
||||
String, '/saltybot/pose/status', 10
|
||||
)
|
||||
self._tf_br = tf2_ros.TransformBroadcaster(self)
|
||||
|
||||
# ── Subscriptions ──────────────────────────────────────────────────────
|
||||
self.create_subscription(
|
||||
Imu,
|
||||
self.get_parameter('imu_topic').value,
|
||||
self._on_imu,
|
||||
_SENSOR_QOS,
|
||||
)
|
||||
self.create_subscription(
|
||||
PoseWithCovarianceStamped,
|
||||
self.get_parameter('uwb_pose_topic').value,
|
||||
self._on_uwb_pose,
|
||||
_RELIABLE_QOS,
|
||||
)
|
||||
self.create_subscription(
|
||||
Odometry,
|
||||
self.get_parameter('vo_topic').value,
|
||||
self._on_vo,
|
||||
_SENSOR_QOS,
|
||||
)
|
||||
|
||||
# ── Status timer (10 Hz) ────────────────────────────────────────────────
|
||||
self.create_timer(0.1, self._on_status_timer)
|
||||
|
||||
self.get_logger().info(
|
||||
f'pose_fusion ready — '
|
||||
f'IMU:{self.get_parameter("imu_topic").value} '
|
||||
f'UWB:{self.get_parameter("uwb_pose_topic").value} '
|
||||
f'VO:{self.get_parameter("vo_topic").value} '
|
||||
f'map:{self._map_frame}→{self._base_frame}'
|
||||
)
|
||||
|
||||
# ── IMU callback — predict step ────────────────────────────────────────────
|
||||
|
||||
def _on_imu(self, msg: Imu) -> None:
|
||||
now = self.get_clock().now().nanoseconds * 1e-9
|
||||
|
||||
if self._last_imu_t is None:
|
||||
self._last_imu_t = now
|
||||
return
|
||||
|
||||
dt = now - self._last_imu_t
|
||||
self._last_imu_t = now
|
||||
|
||||
if dt <= 0.0 or dt > 0.5:
|
||||
return
|
||||
|
||||
self._ekf.predict_imu(
|
||||
ax_body = msg.linear_acceleration.x,
|
||||
ay_body = msg.linear_acceleration.y,
|
||||
omega = msg.angular_velocity.z,
|
||||
dt = dt,
|
||||
)
|
||||
|
||||
if not self._ekf.is_initialised:
|
||||
return
|
||||
|
||||
# Warn / suppress on UWB dropout
|
||||
if self._ekf.uwb_dropout_s > self._uwb_max:
|
||||
self.get_logger().warn(
|
||||
f'UWB dropout {self._ekf.uwb_dropout_s:.1f}s '
|
||||
'> max — pose output suppressed',
|
||||
throttle_duration_sec=5.0,
|
||||
)
|
||||
return
|
||||
|
||||
if self._ekf.uwb_dropout_s > self._uwb_warn:
|
||||
self.get_logger().warn(
|
||||
f'UWB dropout {self._ekf.uwb_dropout_s:.1f}s — '
|
||||
'dead-reckoning only',
|
||||
throttle_duration_sec=2.0,
|
||||
)
|
||||
|
||||
self._publish_pose(msg.header.stamp)
|
||||
|
||||
# ── UWB+IMU fused pose callback — position+heading update ─────────────────
|
||||
|
||||
def _on_uwb_pose(self, msg: PoseWithCovarianceStamped) -> None:
|
||||
x = msg.pose.pose.position.x
|
||||
y = msg.pose.pose.position.y
|
||||
|
||||
# Position sigma: from message covariance or parameter
|
||||
sigma_pos: float | None = None
|
||||
if self._use_uwb_cov:
|
||||
sigma_pos = _cov36_pos_sigma(list(msg.pose.covariance))
|
||||
if sigma_pos is None or sigma_pos <= 0.0:
|
||||
sigma_pos = self._sigma_uwb_pos
|
||||
|
||||
self._ekf.update_uwb_position(x, y, sigma_m=sigma_pos)
|
||||
|
||||
# Heading update from quaternion
|
||||
q = msg.pose.pose.orientation
|
||||
yaw = _quat_to_yaw(q.x, q.y, q.z, q.w)
|
||||
|
||||
sigma_head: float | None = None
|
||||
if self._use_uwb_cov:
|
||||
sigma_head = _cov36_heading_sigma(list(msg.pose.covariance))
|
||||
if sigma_head is None or sigma_head <= 0.0:
|
||||
sigma_head = self._sigma_uwb_head
|
||||
|
||||
# Only update heading if quaternion is non-trivial (some nodes publish q=(0,0,0,0))
|
||||
if abs(q.w) > 0.01 or abs(q.z) > 0.01:
|
||||
self._ekf.update_uwb_heading(yaw, sigma_rad=sigma_head)
|
||||
|
||||
# ── Visual-odometry callback — velocity update ─────────────────────────────
|
||||
|
||||
def _on_vo(self, msg: Odometry) -> None:
|
||||
if not self._ekf.is_initialised:
|
||||
return
|
||||
|
||||
vx_body = msg.twist.twist.linear.x
|
||||
omega = msg.twist.twist.angular.z
|
||||
|
||||
# Extract per-sensor sigma from twist covariance if present
|
||||
# Row-major 6×6: [0]=vx, [7]=vy, [35]=ωz
|
||||
sigma_vel: float | None = None
|
||||
sigma_omega: float | None = None
|
||||
cov = list(msg.twist.covariance)
|
||||
if len(cov) == 36:
|
||||
p_vx = cov[0]
|
||||
p_wz = cov[35]
|
||||
if p_vx > 0.0:
|
||||
sigma_vel = float(math.sqrt(p_vx))
|
||||
if p_wz > 0.0:
|
||||
sigma_omega = float(math.sqrt(p_wz))
|
||||
|
||||
# Warn on VO dropout (but never suppress — VO is secondary)
|
||||
if self._ekf.vo_dropout_s > self._vo_warn:
|
||||
self.get_logger().warn(
|
||||
f'VO dropout {self._ekf.vo_dropout_s:.1f}s',
|
||||
throttle_duration_sec=2.0,
|
||||
)
|
||||
|
||||
self._ekf.update_vo_velocity(
|
||||
vx_body = vx_body,
|
||||
omega = omega,
|
||||
sigma_vel = sigma_vel,
|
||||
sigma_omega = sigma_omega,
|
||||
)
|
||||
|
||||
# ── Status timer (10 Hz) ──────────────────────────────────────────────────
|
||||
|
||||
def _on_status_timer(self) -> None:
|
||||
if not self._ekf.is_initialised:
|
||||
return
|
||||
x, y = self._ekf.position
|
||||
status = {
|
||||
'x': round(x, 4),
|
||||
'y': round(y, 4),
|
||||
'heading_deg': round(math.degrees(self._ekf.heading), 2),
|
||||
'pos_uncertainty_m': round(self._ekf.position_uncertainty_m(), 4),
|
||||
'uwb_dropout_s': round(self._ekf.uwb_dropout_s, 2),
|
||||
'vo_dropout_s': round(self._ekf.vo_dropout_s, 2),
|
||||
'healthy': self._ekf.uwb_dropout_s < self._uwb_max,
|
||||
}
|
||||
msg = String()
|
||||
msg.data = json.dumps(status)
|
||||
self._status_pub.publish(msg)
|
||||
|
||||
# ── Publish authoritative pose ────────────────────────────────────────────
|
||||
|
||||
def _publish_pose(self, stamp) -> None:
|
||||
x, y = self._ekf.position
|
||||
heading = self._ekf.heading
|
||||
cov_pos = self._ekf.covariance_position
|
||||
cov6 = self._ekf.covariance_full
|
||||
vx, vy = self._ekf.velocity
|
||||
omega = self._ekf.yaw_rate
|
||||
|
||||
qz, qw = _yaw_to_qz_qw(heading)
|
||||
|
||||
msg = PoseWithCovarianceStamped()
|
||||
msg.header.stamp = stamp
|
||||
msg.header.frame_id = self._map_frame
|
||||
msg.pose.pose.position.x = x
|
||||
msg.pose.pose.position.y = y
|
||||
msg.pose.pose.position.z = 0.0
|
||||
msg.pose.pose.orientation.z = qz
|
||||
msg.pose.pose.orientation.w = qw
|
||||
|
||||
# Build 6×6 row-major covariance
|
||||
cov36 = [0.0] * 36
|
||||
cov36[0] = cov_pos[0, 0] # x,x
|
||||
cov36[1] = cov_pos[0, 1] # x,y
|
||||
cov36[6] = cov_pos[1, 0] # y,x
|
||||
cov36[7] = cov_pos[1, 1] # y,y
|
||||
cov36[14] = 1e-4 # z (not estimated)
|
||||
cov36[21] = 1e-4 # roll
|
||||
cov36[28] = 1e-4 # pitch
|
||||
cov36[35] = float(cov6[2, 2]) # yaw
|
||||
msg.pose.covariance = cov36
|
||||
|
||||
self._pose_pub.publish(msg)
|
||||
|
||||
# TF2: map → base_link
|
||||
tf = TransformStamped()
|
||||
tf.header.stamp = stamp
|
||||
tf.header.frame_id = self._map_frame
|
||||
tf.child_frame_id = self._base_frame
|
||||
tf.transform.translation.x = x
|
||||
tf.transform.translation.y = y
|
||||
tf.transform.translation.z = 0.0
|
||||
tf.transform.rotation.z = qz
|
||||
tf.transform.rotation.w = qw
|
||||
self._tf_br.sendTransform(tf)
|
||||
|
||||
|
||||
def main(args=None) -> None:
|
||||
rclpy.init(args=args)
|
||||
node = PoseFusionNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.try_shutdown()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
5
jetson/ros2_ws/src/saltybot_pose_fusion/setup.cfg
Normal file
5
jetson/ros2_ws/src/saltybot_pose_fusion/setup.cfg
Normal file
@ -0,0 +1,5 @@
|
||||
[develop]
|
||||
script_dir=$base/lib/saltybot_pose_fusion
|
||||
|
||||
[install]
|
||||
install_scripts=$base/lib/saltybot_pose_fusion
|
||||
30
jetson/ros2_ws/src/saltybot_pose_fusion/setup.py
Normal file
30
jetson/ros2_ws/src/saltybot_pose_fusion/setup.py
Normal file
@ -0,0 +1,30 @@
|
||||
import os
|
||||
from glob import glob
|
||||
from setuptools import setup
|
||||
|
||||
package_name = 'saltybot_pose_fusion'
|
||||
|
||||
setup(
|
||||
name=package_name,
|
||||
version='0.1.0',
|
||||
packages=[package_name],
|
||||
data_files=[
|
||||
('share/ament_index/resource_index/packages',
|
||||
['resource/' + package_name]),
|
||||
('share/' + package_name, ['package.xml']),
|
||||
(os.path.join('share', package_name, 'launch'), glob('launch/*.py')),
|
||||
(os.path.join('share', package_name, 'config'), glob('config/*.yaml')),
|
||||
],
|
||||
install_requires=['setuptools'],
|
||||
zip_safe=True,
|
||||
maintainer='sl-perception',
|
||||
maintainer_email='sl-perception@saltylab.local',
|
||||
description='Multi-sensor EKF pose fusion — UWB + visual odom + IMU (Issue #595)',
|
||||
license='MIT',
|
||||
tests_require=['pytest'],
|
||||
entry_points={
|
||||
'console_scripts': [
|
||||
'pose_fusion = saltybot_pose_fusion.pose_fusion_node:main',
|
||||
],
|
||||
},
|
||||
)
|
||||
@ -0,0 +1,134 @@
|
||||
"""Unit tests for PoseFusionEKF (no ROS2 required)."""
|
||||
|
||||
import math
|
||||
import pytest
|
||||
from saltybot_pose_fusion.pose_fusion_ekf import PoseFusionEKF
|
||||
|
||||
|
||||
def make_ekf(**kwargs):
|
||||
return PoseFusionEKF(**kwargs)
|
||||
|
||||
|
||||
# ── Initialisation ─────────────────────────────────────────────────────────────
|
||||
|
||||
def test_not_initialised_before_uwb():
|
||||
ekf = make_ekf()
|
||||
assert not ekf.is_initialised
|
||||
|
||||
|
||||
def test_initialised_after_uwb_update():
|
||||
ekf = make_ekf()
|
||||
ekf.update_uwb_position(1.0, 2.0)
|
||||
assert ekf.is_initialised
|
||||
x, y = ekf.position
|
||||
assert abs(x - 1.0) < 1e-9
|
||||
assert abs(y - 2.0) < 1e-9
|
||||
|
||||
|
||||
# ── IMU predict ───────────────────────────────────────────────────────────────
|
||||
|
||||
def test_predict_advances_position():
|
||||
ekf = make_ekf()
|
||||
ekf.update_uwb_position(0.0, 0.0)
|
||||
# Give it a forward velocity by injecting a forward acceleration
|
||||
ekf.predict_imu(ax_body=1.0, ay_body=0.0, omega=0.0, dt=1.0)
|
||||
ekf.predict_imu(ax_body=1.0, ay_body=0.0, omega=0.0, dt=1.0)
|
||||
x, y = ekf.position
|
||||
assert x > 0.0, f"Expected forward motion, got x={x}"
|
||||
|
||||
|
||||
def test_predict_no_crash_before_init():
|
||||
ekf = make_ekf()
|
||||
ekf.predict_imu(ax_body=1.0, ay_body=0.0, omega=0.0, dt=0.01)
|
||||
assert not ekf.is_initialised
|
||||
|
||||
|
||||
# ── UWB position update ───────────────────────────────────────────────────────
|
||||
|
||||
def test_uwb_update_converges():
|
||||
ekf = make_ekf(sigma_uwb_pos_m=0.10)
|
||||
ekf.update_uwb_position(5.0, 3.0)
|
||||
for _ in range(20):
|
||||
ekf.predict_imu(ax_body=0.0, ay_body=0.0, omega=0.0, dt=0.05)
|
||||
ekf.update_uwb_position(5.0, 3.0)
|
||||
x, y = ekf.position
|
||||
assert abs(x - 5.0) < 0.5
|
||||
assert abs(y - 3.0) < 0.5
|
||||
|
||||
|
||||
def test_uwb_resets_dropout():
|
||||
ekf = make_ekf()
|
||||
ekf.update_uwb_position(0.0, 0.0)
|
||||
ekf.predict_imu(0.0, 0.0, 0.0, dt=5.0)
|
||||
assert ekf.uwb_dropout_s > 4.0
|
||||
ekf.update_uwb_position(0.0, 0.0)
|
||||
assert ekf.uwb_dropout_s == 0.0
|
||||
|
||||
|
||||
# ── UWB heading update ────────────────────────────────────────────────────────
|
||||
|
||||
def test_uwb_heading_update():
|
||||
ekf = make_ekf(sigma_uwb_head_rad=0.1)
|
||||
ekf.update_uwb_position(0.0, 0.0)
|
||||
target_yaw = math.radians(45.0)
|
||||
for _ in range(10):
|
||||
ekf.update_uwb_heading(target_yaw)
|
||||
assert abs(ekf.heading - target_yaw) < 0.2
|
||||
|
||||
|
||||
# ── VO velocity update ────────────────────────────────────────────────────────
|
||||
|
||||
def test_vo_velocity_update():
|
||||
ekf = make_ekf(sigma_vo_vel_m_s=0.05, sigma_vo_omega_r_s=0.03)
|
||||
ekf.update_uwb_position(0.0, 0.0)
|
||||
ekf.update_vo_velocity(vx_body=1.0, omega=0.0)
|
||||
vx, vy = ekf.velocity
|
||||
assert vx > 0.5, f"Expected vx>0.5 after VO update, got {vx}"
|
||||
|
||||
|
||||
def test_vo_resets_dropout():
|
||||
ekf = make_ekf()
|
||||
ekf.update_uwb_position(0.0, 0.0)
|
||||
ekf.predict_imu(0.0, 0.0, 0.0, dt=3.0)
|
||||
assert ekf.vo_dropout_s > 2.0
|
||||
ekf.update_vo_velocity(vx_body=0.0, omega=0.0)
|
||||
assert ekf.vo_dropout_s == 0.0
|
||||
|
||||
|
||||
# ── Covariance ────────────────────────────────────────────────────────────────
|
||||
|
||||
def test_covariance_decreases_with_updates():
|
||||
ekf = make_ekf()
|
||||
ekf.update_uwb_position(0.0, 0.0)
|
||||
p_init = ekf.position_uncertainty_m()
|
||||
for _ in range(30):
|
||||
ekf.predict_imu(0.0, 0.0, 0.0, dt=0.05)
|
||||
ekf.update_uwb_position(0.0, 0.0)
|
||||
assert ekf.position_uncertainty_m() < p_init
|
||||
|
||||
|
||||
def test_covariance_matrix_shape():
|
||||
ekf = make_ekf()
|
||||
ekf.update_uwb_position(1.0, 1.0)
|
||||
assert ekf.covariance_position.shape == (2, 2)
|
||||
assert ekf.covariance_full.shape == (6, 6)
|
||||
|
||||
|
||||
# ── Sigma override ────────────────────────────────────────────────────────────
|
||||
|
||||
def test_custom_sigma_override():
|
||||
"""High sigma should produce weaker updates than low sigma."""
|
||||
ekf_weak = make_ekf(sigma_uwb_pos_m=2.0)
|
||||
ekf_strong = make_ekf(sigma_uwb_pos_m=0.01)
|
||||
|
||||
for ekf in (ekf_weak, ekf_strong):
|
||||
ekf.update_uwb_position(10.0, 0.0)
|
||||
ekf.predict_imu(0.0, 0.0, 0.0, dt=0.1)
|
||||
ekf.update_uwb_position(0.0, 0.0)
|
||||
|
||||
x_weak, _ = ekf_weak.position
|
||||
x_strong, _ = ekf_strong.position
|
||||
|
||||
# Strong measurement should pull closer to 0
|
||||
assert x_strong < x_weak, \
|
||||
f"Strong sigma should pull x closer to 0: strong={x_strong:.3f} weak={x_weak:.3f}"
|
||||
Loading…
x
Reference in New Issue
Block a user