feat: UWB-IMU EKF fusion for robust indoor localization (Issue #573) #581
@ -0,0 +1,26 @@
|
|||||||
|
uwb_imu_fusion:
|
||||||
|
ros__parameters:
|
||||||
|
# ── Topics ────────────────────────────────────────────────────────────────
|
||||||
|
imu_topic: /imu/data
|
||||||
|
uwb_bearing_topic: /uwb/bearing
|
||||||
|
uwb_pose_topic: /saltybot/uwb/pose
|
||||||
|
use_uwb_pose: true # true = use absolute /saltybot/uwb/pose
|
||||||
|
# false = use relative /uwb/bearing
|
||||||
|
|
||||||
|
# ── TF ────────────────────────────────────────────────────────────────────
|
||||||
|
publish_tf: true
|
||||||
|
map_frame: map
|
||||||
|
base_frame: base_link
|
||||||
|
|
||||||
|
# ── EKF noise parameters ──────────────────────────────────────────────────
|
||||||
|
# Increase sigma_uwb_pos_m if UWB is noisy (multipath, few anchors)
|
||||||
|
# Increase sigma_imu_* if IMU has high vibration noise
|
||||||
|
sigma_uwb_pos_m: 0.12 # UWB position std-dev (m) — DW3000 ±10 cm + geometry
|
||||||
|
sigma_imu_accel: 0.05 # IMU accel noise (m/s²)
|
||||||
|
sigma_imu_gyro: 0.003 # IMU gyro noise (rad/s)
|
||||||
|
sigma_vel_drift: 0.10 # velocity drift process noise (m/s)
|
||||||
|
dropout_vel_damping: 0.95 # velocity decay factor per second during dropout
|
||||||
|
|
||||||
|
# ── Dropout ───────────────────────────────────────────────────────────────
|
||||||
|
max_dead_reckoning_s: 10.0 # suppress fused pose output after this many
|
||||||
|
# seconds without a UWB update
|
||||||
@ -0,0 +1,43 @@
|
|||||||
|
"""
|
||||||
|
uwb_imu_fusion.launch.py — Launch UWB-IMU EKF fusion node (Issue #573)
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
ros2 launch saltybot_uwb_imu_fusion uwb_imu_fusion.launch.py
|
||||||
|
ros2 launch saltybot_uwb_imu_fusion uwb_imu_fusion.launch.py publish_tf:=false
|
||||||
|
ros2 launch saltybot_uwb_imu_fusion uwb_imu_fusion.launch.py sigma_uwb_pos_m:=0.20
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
from ament_index_python.packages import get_package_share_directory
|
||||||
|
from launch import LaunchDescription
|
||||||
|
from launch.actions import DeclareLaunchArgument
|
||||||
|
from launch.substitutions import LaunchConfiguration
|
||||||
|
from launch_ros.actions import Node
|
||||||
|
|
||||||
|
|
||||||
|
def generate_launch_description():
|
||||||
|
pkg = get_package_share_directory("saltybot_uwb_imu_fusion")
|
||||||
|
cfg = os.path.join(pkg, "config", "fusion_params.yaml")
|
||||||
|
|
||||||
|
return LaunchDescription([
|
||||||
|
DeclareLaunchArgument("publish_tf", default_value="true"),
|
||||||
|
DeclareLaunchArgument("use_uwb_pose", default_value="true"),
|
||||||
|
DeclareLaunchArgument("sigma_uwb_pos_m", default_value="0.12"),
|
||||||
|
DeclareLaunchArgument("max_dead_reckoning_s", default_value="10.0"),
|
||||||
|
|
||||||
|
Node(
|
||||||
|
package="saltybot_uwb_imu_fusion",
|
||||||
|
executable="uwb_imu_fusion",
|
||||||
|
name="uwb_imu_fusion",
|
||||||
|
output="screen",
|
||||||
|
parameters=[
|
||||||
|
cfg,
|
||||||
|
{
|
||||||
|
"publish_tf": LaunchConfiguration("publish_tf"),
|
||||||
|
"use_uwb_pose": LaunchConfiguration("use_uwb_pose"),
|
||||||
|
"sigma_uwb_pos_m": LaunchConfiguration("sigma_uwb_pos_m"),
|
||||||
|
"max_dead_reckoning_s": LaunchConfiguration("max_dead_reckoning_s"),
|
||||||
|
},
|
||||||
|
],
|
||||||
|
),
|
||||||
|
])
|
||||||
31
jetson/ros2_ws/src/saltybot_uwb_imu_fusion/package.xml
Normal file
31
jetson/ros2_ws/src/saltybot_uwb_imu_fusion/package.xml
Normal file
@ -0,0 +1,31 @@
|
|||||||
|
<?xml version="1.0"?>
|
||||||
|
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||||
|
<package format="3">
|
||||||
|
<name>saltybot_uwb_imu_fusion</name>
|
||||||
|
<version>0.1.0</version>
|
||||||
|
<description>
|
||||||
|
EKF-based UWB + IMU sensor fusion for SaltyBot indoor localization (Issue #573).
|
||||||
|
Fuses UWB position at 10 Hz with IMU accel+gyro at 200 Hz.
|
||||||
|
Handles UWB dropouts via IMU dead-reckoning.
|
||||||
|
Publishes /saltybot/pose/fused and /saltybot/pose/fused_cov.
|
||||||
|
</description>
|
||||||
|
<maintainer email="sl-uwb@saltylab.local">sl-uwb</maintainer>
|
||||||
|
<license>Apache-2.0</license>
|
||||||
|
|
||||||
|
<depend>rclpy</depend>
|
||||||
|
<depend>geometry_msgs</depend>
|
||||||
|
<depend>sensor_msgs</depend>
|
||||||
|
<depend>tf2_ros</depend>
|
||||||
|
<depend>saltybot_uwb_msgs</depend>
|
||||||
|
|
||||||
|
<exec_depend>python3-numpy</exec_depend>
|
||||||
|
|
||||||
|
<test_depend>ament_copyright</test_depend>
|
||||||
|
<test_depend>ament_flake8</test_depend>
|
||||||
|
<test_depend>ament_pep257</test_depend>
|
||||||
|
<test_depend>python3-pytest</test_depend>
|
||||||
|
|
||||||
|
<export>
|
||||||
|
<build_type>ament_python</build_type>
|
||||||
|
</export>
|
||||||
|
</package>
|
||||||
@ -0,0 +1,266 @@
|
|||||||
|
"""
|
||||||
|
ekf_math.py — Extended Kalman Filter for UWB + IMU fusion (Issue #573)
|
||||||
|
|
||||||
|
No ROS2 dependencies — pure Python + numpy, fully unit-testable.
|
||||||
|
|
||||||
|
State vector (6-DOF ground robot):
|
||||||
|
───────────────────────────────────
|
||||||
|
x = [x, y, θ, vx, vy, ω]ᵀ (world frame)
|
||||||
|
x — forward position (m)
|
||||||
|
y — lateral position (m)
|
||||||
|
θ — heading (rad, CCW positive)
|
||||||
|
vx — longitudinal velocity (m/s, world frame)
|
||||||
|
vy — lateral velocity (m/s, world frame)
|
||||||
|
ω — yaw rate (rad/s)
|
||||||
|
|
||||||
|
Process model — IMU as control input u = [ax_body, ay_body, ω_imu]
|
||||||
|
───────────────────────────────────────────────────────────────────
|
||||||
|
θ_k+1 = θ_k + ω_imu * dt
|
||||||
|
vx_k+1 = vx_k + (ax_body * cos(θ) - ay_body * sin(θ)) * dt
|
||||||
|
vy_k+1 = vy_k + (ax_body * sin(θ) + ay_body * cos(θ)) * dt
|
||||||
|
x_k+1 = x_k + vx_k * dt
|
||||||
|
y_k+1 = y_k + vy_k * dt
|
||||||
|
ω_k+1 = ω_imu (direct observation, no integration)
|
||||||
|
|
||||||
|
UWB measurement model (position observation):
|
||||||
|
──────────────────────────────────────────────
|
||||||
|
z = [x, y]ᵀ H = [[1,0,0,0,0,0],
|
||||||
|
[0,1,0,0,0,0]]
|
||||||
|
|
||||||
|
IMU bias handling:
|
||||||
|
──────────────────
|
||||||
|
Simple first-order high-pass on accel to reduce drift.
|
||||||
|
For longer deployments extend state to include accel bias.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import math
|
||||||
|
import numpy as np
|
||||||
|
from typing import Tuple
|
||||||
|
|
||||||
|
# State dimension
|
||||||
|
N = 6 # [x, y, θ, vx, vy, ω]
|
||||||
|
|
||||||
|
# Indices
|
||||||
|
IX, IY, IT, IVX, IVY, IW = range(N)
|
||||||
|
|
||||||
|
|
||||||
|
class UwbImuEKF:
|
||||||
|
"""
|
||||||
|
EKF fusing UWB position (10 Hz) with IMU accel+gyro (200 Hz).
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
sigma_uwb_pos_m : UWB position std-dev (m) default 0.12
|
||||||
|
sigma_imu_accel : IMU accelerometer noise (m/s²) default 0.05
|
||||||
|
sigma_imu_gyro : IMU gyroscope noise (rad/s) default 0.003
|
||||||
|
sigma_vel_drift : velocity drift process noise (m/s) default 0.1
|
||||||
|
dropout_vel_damping : velocity damping per second during UWB dropout
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
sigma_uwb_pos_m: float = 0.12,
|
||||||
|
sigma_imu_accel: float = 0.05,
|
||||||
|
sigma_imu_gyro: float = 0.003,
|
||||||
|
sigma_vel_drift: float = 0.10,
|
||||||
|
dropout_vel_damping: float = 0.95,
|
||||||
|
) -> None:
|
||||||
|
self._R_uwb = sigma_uwb_pos_m ** 2 # UWB position variance
|
||||||
|
self._q_a = sigma_imu_accel ** 2 # accel noise variance
|
||||||
|
self._q_g = sigma_imu_gyro ** 2 # gyro noise variance
|
||||||
|
self._q_v = sigma_vel_drift ** 2 # velocity drift variance
|
||||||
|
self._damp = dropout_vel_damping
|
||||||
|
|
||||||
|
# State and covariance
|
||||||
|
self._x = np.zeros(N, dtype=float)
|
||||||
|
self._P = np.eye(N, dtype=float) * 9.0 # large initial uncertainty
|
||||||
|
|
||||||
|
# Accel bias (simple running mean for high-pass)
|
||||||
|
self._accel_bias = np.zeros(2, dtype=float)
|
||||||
|
self._bias_alpha = 0.005 # low-pass coefficient for bias estimation
|
||||||
|
|
||||||
|
self._initialised = False
|
||||||
|
self._uwb_dropout_s = 0.0 # time since last UWB update
|
||||||
|
|
||||||
|
# ── Initialise ───────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def initialise(self, x: float, y: float, heading_rad: float = 0.0) -> None:
|
||||||
|
"""Seed the filter with a known position."""
|
||||||
|
self._x[:] = 0.0
|
||||||
|
self._x[IX] = x
|
||||||
|
self._x[IY] = y
|
||||||
|
self._x[IT] = heading_rad
|
||||||
|
self._P = np.diag([0.25, 0.25, 0.1, 1.0, 1.0, 0.1])
|
||||||
|
self._initialised = True
|
||||||
|
|
||||||
|
# ── IMU predict ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def predict(self, ax_body: float, ay_body: float, omega: float, dt: float) -> None:
|
||||||
|
"""
|
||||||
|
EKF predict step driven by IMU measurement.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
ax_body : forward acceleration in body frame (m/s²)
|
||||||
|
ay_body : lateral acceleration in body frame (m/s², left positive)
|
||||||
|
omega : yaw rate (rad/s, CCW positive)
|
||||||
|
dt : time step (s)
|
||||||
|
"""
|
||||||
|
if not self._initialised:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Subtract estimated accel bias
|
||||||
|
ax_c = ax_body - self._accel_bias[0]
|
||||||
|
ay_c = ay_body - self._accel_bias[1]
|
||||||
|
|
||||||
|
x = self._x
|
||||||
|
th = x[IT]
|
||||||
|
ct = math.cos(th)
|
||||||
|
st = math.sin(th)
|
||||||
|
|
||||||
|
# World-frame acceleration
|
||||||
|
ax_w = ax_c * ct - ay_c * st
|
||||||
|
ay_w = ax_c * st + ay_c * ct
|
||||||
|
|
||||||
|
# State prediction
|
||||||
|
xp = x.copy()
|
||||||
|
xp[IX] = x[IX] + x[IVX] * dt
|
||||||
|
xp[IY] = x[IY] + x[IVY] * dt
|
||||||
|
xp[IT] = _wrap_angle(x[IT] + omega * dt)
|
||||||
|
xp[IVX] = x[IVX] + ax_w * dt
|
||||||
|
xp[IVY] = x[IVY] + ay_w * dt
|
||||||
|
xp[IW] = omega # direct observation from gyro
|
||||||
|
|
||||||
|
# Jacobian F = ∂f/∂x
|
||||||
|
F = np.eye(N, dtype=float)
|
||||||
|
# ∂x / ∂vx, ∂x / ∂vy
|
||||||
|
F[IX, IVX] = dt
|
||||||
|
F[IY, IVY] = dt
|
||||||
|
# ∂vx / ∂θ = (-ax_c·sin(θ) - ay_c·cos(θ)) * dt
|
||||||
|
F[IVX, IT] = (-ax_c * st - ay_c * ct) * dt
|
||||||
|
# ∂vy / ∂θ = ( ax_c·cos(θ) - ay_c·sin(θ)) * dt
|
||||||
|
F[IVY, IT] = ( ax_c * ct - ay_c * st) * dt
|
||||||
|
|
||||||
|
# Process noise Q
|
||||||
|
dt2 = dt * dt
|
||||||
|
Q = np.diag([
|
||||||
|
0.5 * self._q_a * dt2 * dt2, # x: from accel double-integrated
|
||||||
|
0.5 * self._q_a * dt2 * dt2, # y
|
||||||
|
self._q_g * dt2, # θ: from gyro integrated
|
||||||
|
self._q_v * dt + self._q_a * dt2, # vx
|
||||||
|
self._q_v * dt + self._q_a * dt2, # vy
|
||||||
|
self._q_g, # ω: direct gyro noise
|
||||||
|
])
|
||||||
|
|
||||||
|
self._x = xp
|
||||||
|
self._P = F @ self._P @ F.T + Q
|
||||||
|
self._uwb_dropout_s += dt
|
||||||
|
|
||||||
|
# Velocity damping during extended UWB dropout (dead-reckoning coasting)
|
||||||
|
if self._uwb_dropout_s > 2.0:
|
||||||
|
damping = self._damp ** dt
|
||||||
|
self._x[IVX] *= damping
|
||||||
|
self._x[IVY] *= damping
|
||||||
|
|
||||||
|
# Update accel bias estimate
|
||||||
|
self._accel_bias[0] += self._bias_alpha * (ax_body - self._accel_bias[0])
|
||||||
|
self._accel_bias[1] += self._bias_alpha * (ay_body - self._accel_bias[1])
|
||||||
|
|
||||||
|
# ── UWB update ───────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def update_uwb(self, x_meas: float, y_meas: float,
|
||||||
|
sigma_m: float | None = None) -> None:
|
||||||
|
"""
|
||||||
|
EKF update step with a UWB position measurement.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
x_meas, y_meas : measured position (m, world frame)
|
||||||
|
sigma_m : override measurement std-dev (m); uses node default if None
|
||||||
|
"""
|
||||||
|
if not self._initialised:
|
||||||
|
self.initialise(x_meas, y_meas)
|
||||||
|
return
|
||||||
|
|
||||||
|
R_val = (sigma_m ** 2) if sigma_m is not None else self._R_uwb
|
||||||
|
R = np.eye(2) * R_val
|
||||||
|
|
||||||
|
# H: position rows only
|
||||||
|
H = np.zeros((2, N))
|
||||||
|
H[0, IX] = 1.0
|
||||||
|
H[1, IY] = 1.0
|
||||||
|
|
||||||
|
innov = np.array([x_meas - self._x[IX], y_meas - self._x[IY]])
|
||||||
|
S = H @ self._P @ H.T + R
|
||||||
|
K = self._P @ H.T @ np.linalg.inv(S)
|
||||||
|
|
||||||
|
self._x = self._x + K @ innov
|
||||||
|
self._x[IT] = _wrap_angle(self._x[IT])
|
||||||
|
self._P = (np.eye(N) - K @ H) @ self._P
|
||||||
|
|
||||||
|
self._uwb_dropout_s = 0.0
|
||||||
|
|
||||||
|
# ── Update heading from magnetometer / other source ───────────────────────
|
||||||
|
|
||||||
|
def update_heading(self, heading_rad: float, sigma_rad: float = 0.1) -> None:
|
||||||
|
"""Optional: update θ directly from compass or visual odometry."""
|
||||||
|
if not self._initialised:
|
||||||
|
return
|
||||||
|
H = np.zeros((1, N))
|
||||||
|
H[0, IT] = 1.0
|
||||||
|
innov = np.array([_wrap_angle(heading_rad - self._x[IT])])
|
||||||
|
S = H @ self._P @ H.T + np.array([[sigma_rad ** 2]])
|
||||||
|
K = self._P @ H.T @ np.linalg.inv(S)
|
||||||
|
self._x = self._x + K.flatten() * innov[0]
|
||||||
|
self._x[IT] = _wrap_angle(self._x[IT])
|
||||||
|
self._P = (np.eye(N) - K @ H) @ self._P
|
||||||
|
|
||||||
|
# ── Accessors ────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
@property
|
||||||
|
def position(self) -> Tuple[float, float]:
|
||||||
|
return float(self._x[IX]), float(self._x[IY])
|
||||||
|
|
||||||
|
@property
|
||||||
|
def heading(self) -> float:
|
||||||
|
return float(self._x[IT])
|
||||||
|
|
||||||
|
@property
|
||||||
|
def velocity(self) -> Tuple[float, float]:
|
||||||
|
return float(self._x[IVX]), float(self._x[IVY])
|
||||||
|
|
||||||
|
@property
|
||||||
|
def yaw_rate(self) -> float:
|
||||||
|
return float(self._x[IW])
|
||||||
|
|
||||||
|
@property
|
||||||
|
def covariance_position(self) -> np.ndarray:
|
||||||
|
"""2×2 position covariance sub-matrix."""
|
||||||
|
return self._P[:2, :2].copy()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def covariance_full(self) -> np.ndarray:
|
||||||
|
"""Full 6×6 state covariance."""
|
||||||
|
return self._P.copy()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def is_initialised(self) -> bool:
|
||||||
|
return self._initialised
|
||||||
|
|
||||||
|
@property
|
||||||
|
def uwb_dropout_s(self) -> float:
|
||||||
|
"""Seconds since last UWB update."""
|
||||||
|
return self._uwb_dropout_s
|
||||||
|
|
||||||
|
def position_uncertainty_m(self) -> float:
|
||||||
|
"""Approximate 1-σ position uncertainty (m)."""
|
||||||
|
return float(math.sqrt((self._P[IX, IX] + self._P[IY, IY]) / 2.0))
|
||||||
|
|
||||||
|
|
||||||
|
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _wrap_angle(a: float) -> float:
|
||||||
|
"""Wrap angle to [-π, π]."""
|
||||||
|
return float((a + math.pi) % (2 * math.pi) - math.pi)
|
||||||
@ -0,0 +1,268 @@
|
|||||||
|
"""
|
||||||
|
ekf_node.py — ROS2 node: UWB + IMU EKF fusion (Issue #573)
|
||||||
|
|
||||||
|
Fuses:
|
||||||
|
/imu/data (sensor_msgs/Imu, 200 Hz) — predict step
|
||||||
|
/uwb/bearing (UwbBearing, 10 Hz) — update step (bearing+range → x,y)
|
||||||
|
/saltybot/uwb/pose (geometry_msgs/PoseStamped, 10 Hz) — update step (absolute position)
|
||||||
|
|
||||||
|
Publishes:
|
||||||
|
/saltybot/pose/fused (geometry_msgs/PoseStamped) — fused pose at IMU rate
|
||||||
|
/saltybot/pose/fused_cov (geometry_msgs/PoseWithCovarianceStamped) — with covariance
|
||||||
|
|
||||||
|
TF2:
|
||||||
|
Broadcasts base_link → map from fused pose
|
||||||
|
|
||||||
|
UWB dropout:
|
||||||
|
IMU dead-reckoning continues for up to `max_dead_reckoning_s` seconds.
|
||||||
|
After that the velocity estimate is zeroed; position uncertainty grows.
|
||||||
|
|
||||||
|
Parameters (see config/fusion_params.yaml):
|
||||||
|
imu_topic /imu/data
|
||||||
|
uwb_bearing_topic /uwb/bearing
|
||||||
|
uwb_pose_topic /saltybot/uwb/pose (preferred if available)
|
||||||
|
use_uwb_pose true — use absolute pose; false = use bearing only
|
||||||
|
publish_tf true
|
||||||
|
map_frame map
|
||||||
|
base_frame base_link
|
||||||
|
sigma_uwb_pos_m 0.12
|
||||||
|
sigma_imu_accel 0.05
|
||||||
|
sigma_imu_gyro 0.003
|
||||||
|
sigma_vel_drift 0.10
|
||||||
|
max_dead_reckoning_s 10.0
|
||||||
|
"""
|
||||||
|
|
||||||
|
import math
|
||||||
|
import time
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from rclpy.qos import (
|
||||||
|
QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy
|
||||||
|
)
|
||||||
|
|
||||||
|
import tf2_ros
|
||||||
|
from geometry_msgs.msg import (
|
||||||
|
PoseStamped, PoseWithCovarianceStamped, TransformStamped
|
||||||
|
)
|
||||||
|
from sensor_msgs.msg import Imu
|
||||||
|
from std_msgs.msg import Header
|
||||||
|
|
||||||
|
from saltybot_uwb_msgs.msg import UwbBearing
|
||||||
|
from saltybot_uwb_imu_fusion.ekf_math import UwbImuEKF
|
||||||
|
|
||||||
|
|
||||||
|
_SENSOR_QOS = QoSProfile(
|
||||||
|
reliability=ReliabilityPolicy.BEST_EFFORT,
|
||||||
|
history=HistoryPolicy.KEEP_LAST,
|
||||||
|
depth=5,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class EkfFusionNode(Node):
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
super().__init__("uwb_imu_fusion")
|
||||||
|
|
||||||
|
# ── Parameters ────────────────────────────────────────────────────────
|
||||||
|
self.declare_parameter("imu_topic", "/imu/data")
|
||||||
|
self.declare_parameter("uwb_bearing_topic", "/uwb/bearing")
|
||||||
|
self.declare_parameter("uwb_pose_topic", "/saltybot/uwb/pose")
|
||||||
|
self.declare_parameter("use_uwb_pose", True)
|
||||||
|
self.declare_parameter("publish_tf", True)
|
||||||
|
self.declare_parameter("map_frame", "map")
|
||||||
|
self.declare_parameter("base_frame", "base_link")
|
||||||
|
self.declare_parameter("sigma_uwb_pos_m", 0.12)
|
||||||
|
self.declare_parameter("sigma_imu_accel", 0.05)
|
||||||
|
self.declare_parameter("sigma_imu_gyro", 0.003)
|
||||||
|
self.declare_parameter("sigma_vel_drift", 0.10)
|
||||||
|
self.declare_parameter("dropout_vel_damping", 0.95)
|
||||||
|
self.declare_parameter("max_dead_reckoning_s", 10.0)
|
||||||
|
|
||||||
|
self._map_frame = self.get_parameter("map_frame").value
|
||||||
|
self._base_frame = self.get_parameter("base_frame").value
|
||||||
|
self._publish_tf = self.get_parameter("publish_tf").value
|
||||||
|
self._use_uwb_pose = self.get_parameter("use_uwb_pose").value
|
||||||
|
self._max_dr = self.get_parameter("max_dead_reckoning_s").value
|
||||||
|
|
||||||
|
# ── EKF ───────────────────────────────────────────────────────────────
|
||||||
|
self._ekf = UwbImuEKF(
|
||||||
|
sigma_uwb_pos_m = self.get_parameter("sigma_uwb_pos_m").value,
|
||||||
|
sigma_imu_accel = self.get_parameter("sigma_imu_accel").value,
|
||||||
|
sigma_imu_gyro = self.get_parameter("sigma_imu_gyro").value,
|
||||||
|
sigma_vel_drift = self.get_parameter("sigma_vel_drift").value,
|
||||||
|
dropout_vel_damping = self.get_parameter("dropout_vel_damping").value,
|
||||||
|
)
|
||||||
|
|
||||||
|
self._last_imu_t: float | None = None
|
||||||
|
|
||||||
|
# ── Publishers ────────────────────────────────────────────────────────
|
||||||
|
self._pose_pub = self.create_publisher(PoseStamped, "/saltybot/pose/fused", 10)
|
||||||
|
self._pose_cov_pub = self.create_publisher(PoseWithCovarianceStamped, "/saltybot/pose/fused_cov", 10)
|
||||||
|
|
||||||
|
if self._publish_tf:
|
||||||
|
self._tf_br = tf2_ros.TransformBroadcaster(self)
|
||||||
|
else:
|
||||||
|
self._tf_br = None
|
||||||
|
|
||||||
|
# ── Subscriptions ─────────────────────────────────────────────────────
|
||||||
|
self.create_subscription(
|
||||||
|
Imu, self.get_parameter("imu_topic").value,
|
||||||
|
self._imu_cb, _SENSOR_QOS
|
||||||
|
)
|
||||||
|
|
||||||
|
if self._use_uwb_pose:
|
||||||
|
self.create_subscription(
|
||||||
|
PoseStamped,
|
||||||
|
self.get_parameter("uwb_pose_topic").value,
|
||||||
|
self._uwb_pose_cb, 10
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
self.create_subscription(
|
||||||
|
UwbBearing,
|
||||||
|
self.get_parameter("uwb_bearing_topic").value,
|
||||||
|
self._uwb_bearing_cb, 10
|
||||||
|
)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"EKF fusion ready — "
|
||||||
|
f"IMU:{self.get_parameter('imu_topic').value} "
|
||||||
|
f"UWB:{'pose' if self._use_uwb_pose else 'bearing'} "
|
||||||
|
f"tf={self._publish_tf} "
|
||||||
|
f"dr={self._max_dr}s"
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── IMU callback (200 Hz) — predict step ──────────────────────────────────
|
||||||
|
|
||||||
|
def _imu_cb(self, msg: Imu) -> None:
|
||||||
|
now = self.get_clock().now().nanoseconds * 1e-9
|
||||||
|
if self._last_imu_t is None:
|
||||||
|
self._last_imu_t = now
|
||||||
|
return
|
||||||
|
|
||||||
|
dt = now - self._last_imu_t
|
||||||
|
self._last_imu_t = now
|
||||||
|
|
||||||
|
if dt <= 0.0 or dt > 0.5:
|
||||||
|
return # stale or implausible
|
||||||
|
|
||||||
|
# Extract IMU data (body frame, ROS convention: x=forward, y=left, z=up)
|
||||||
|
ax = msg.linear_acceleration.x
|
||||||
|
ay = msg.linear_acceleration.y
|
||||||
|
# az = msg.linear_acceleration.z # gravity along z, ignored for ground robot
|
||||||
|
omega = msg.angular_velocity.z # yaw rate
|
||||||
|
|
||||||
|
self._ekf.predict(ax_body=ax, ay_body=ay, omega=omega, dt=dt)
|
||||||
|
|
||||||
|
# Publish fused pose at IMU rate if filter is alive
|
||||||
|
if self._ekf.is_initialised:
|
||||||
|
if self._ekf.uwb_dropout_s < self._max_dr:
|
||||||
|
self._publish_pose(msg.header.stamp)
|
||||||
|
else:
|
||||||
|
# Dropout too long — stop publishing, log warning
|
||||||
|
self.get_logger().warn(
|
||||||
|
f"UWB dropout {self._ekf.uwb_dropout_s:.1f}s — "
|
||||||
|
"pose unreliable, output suppressed",
|
||||||
|
throttle_duration_sec=5.0,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── UWB pose callback (absolute position, 10 Hz) — update step ───────────
|
||||||
|
|
||||||
|
def _uwb_pose_cb(self, msg: PoseStamped) -> None:
|
||||||
|
x = msg.pose.position.x
|
||||||
|
y = msg.pose.position.y
|
||||||
|
self._ekf.update_uwb(x, y)
|
||||||
|
|
||||||
|
if not self._ekf.is_initialised:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Extract heading from quaternion if available
|
||||||
|
q = msg.pose.orientation
|
||||||
|
if abs(q.w) > 0.01:
|
||||||
|
yaw = 2.0 * math.atan2(q.z, q.w)
|
||||||
|
self._ekf.update_heading(yaw, sigma_rad=0.2)
|
||||||
|
|
||||||
|
# ── UWB bearing callback (relative, 10 Hz) — update step ─────────────────
|
||||||
|
|
||||||
|
def _uwb_bearing_cb(self, msg: UwbBearing) -> None:
|
||||||
|
r = float(msg.range_m)
|
||||||
|
brg = float(msg.bearing_rad)
|
||||||
|
if r < 0.1:
|
||||||
|
return
|
||||||
|
# Convert polar (bearing from base_link) to absolute position estimate
|
||||||
|
# This is relative to robot, so add to current robot position to get world coords.
|
||||||
|
# Note: if we don't have absolute anchors, this stays in base_link frame.
|
||||||
|
# Use it as a position measurement in base_link (x = forward, y = left).
|
||||||
|
x_rel = r * math.cos(brg)
|
||||||
|
y_rel = r * math.sin(brg)
|
||||||
|
# Inflate sigma for single-anchor degraded mode
|
||||||
|
sigma = 0.15 if msg.confidence >= 1.0 else 0.30
|
||||||
|
self._ekf.update_uwb(x_rel, y_rel, sigma_m=sigma)
|
||||||
|
|
||||||
|
# ── Publish fused pose ────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _publish_pose(self, stamp) -> None:
|
||||||
|
x, y = self._ekf.position
|
||||||
|
heading = self._ekf.heading
|
||||||
|
cov_pos = self._ekf.covariance_position
|
||||||
|
cov_full = self._ekf.covariance_full
|
||||||
|
|
||||||
|
half_yaw = heading / 2.0
|
||||||
|
qz = math.sin(half_yaw)
|
||||||
|
qw = math.cos(half_yaw)
|
||||||
|
|
||||||
|
# PoseStamped
|
||||||
|
pose = PoseStamped()
|
||||||
|
pose.header.stamp = stamp
|
||||||
|
pose.header.frame_id = self._map_frame
|
||||||
|
pose.pose.position.x = x
|
||||||
|
pose.pose.position.y = y
|
||||||
|
pose.pose.position.z = 0.0
|
||||||
|
pose.pose.orientation.z = qz
|
||||||
|
pose.pose.orientation.w = qw
|
||||||
|
self._pose_pub.publish(pose)
|
||||||
|
|
||||||
|
# PoseWithCovarianceStamped
|
||||||
|
pose_cov = PoseWithCovarianceStamped()
|
||||||
|
pose_cov.header = pose.header
|
||||||
|
pose_cov.pose.pose = pose.pose
|
||||||
|
cov36 = [0.0] * 36
|
||||||
|
cov36[0] = cov_pos[0, 0] # x,x
|
||||||
|
cov36[1] = cov_pos[0, 1] # x,y
|
||||||
|
cov36[6] = cov_pos[1, 0] # y,x
|
||||||
|
cov36[7] = cov_pos[1, 1] # y,y
|
||||||
|
cov36[14] = 1e-4 # z (not estimated)
|
||||||
|
cov36[21] = 1e-4 # roll
|
||||||
|
cov36[28] = 1e-4 # pitch
|
||||||
|
cov36[35] = float(cov_full[2, 2]) # yaw
|
||||||
|
pose_cov.pose.covariance = cov36
|
||||||
|
self._pose_cov_pub.publish(pose_cov)
|
||||||
|
|
||||||
|
# TF2
|
||||||
|
if self._tf_br is not None:
|
||||||
|
tf = TransformStamped()
|
||||||
|
tf.header.stamp = stamp
|
||||||
|
tf.header.frame_id = self._map_frame
|
||||||
|
tf.child_frame_id = self._base_frame
|
||||||
|
tf.transform.translation.x = x
|
||||||
|
tf.transform.translation.y = y
|
||||||
|
tf.transform.translation.z = 0.0
|
||||||
|
tf.transform.rotation.z = qz
|
||||||
|
tf.transform.rotation.w = qw
|
||||||
|
self._tf_br.sendTransform(tf)
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None) -> None:
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = EkfFusionNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.try_shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
4
jetson/ros2_ws/src/saltybot_uwb_imu_fusion/setup.cfg
Normal file
4
jetson/ros2_ws/src/saltybot_uwb_imu_fusion/setup.cfg
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
[develop]
|
||||||
|
script_dir=$base/lib/saltybot_uwb_imu_fusion
|
||||||
|
[install]
|
||||||
|
install_scripts=$base/lib/saltybot_uwb_imu_fusion
|
||||||
29
jetson/ros2_ws/src/saltybot_uwb_imu_fusion/setup.py
Normal file
29
jetson/ros2_ws/src/saltybot_uwb_imu_fusion/setup.py
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
import os
|
||||||
|
from glob import glob
|
||||||
|
from setuptools import setup
|
||||||
|
|
||||||
|
package_name = "saltybot_uwb_imu_fusion"
|
||||||
|
|
||||||
|
setup(
|
||||||
|
name=package_name,
|
||||||
|
version="0.1.0",
|
||||||
|
packages=[package_name],
|
||||||
|
data_files=[
|
||||||
|
("share/ament_index/resource_index/packages",
|
||||||
|
[f"resource/{package_name}"]),
|
||||||
|
(f"share/{package_name}", ["package.xml"]),
|
||||||
|
(os.path.join("share", package_name, "launch"), glob("launch/*.py")),
|
||||||
|
(os.path.join("share", package_name, "config"), glob("config/*.yaml")),
|
||||||
|
],
|
||||||
|
install_requires=["setuptools"],
|
||||||
|
zip_safe=True,
|
||||||
|
maintainer="sl-uwb",
|
||||||
|
maintainer_email="sl-uwb@saltylab.local",
|
||||||
|
description="EKF UWB+IMU fusion for SaltyBot localization",
|
||||||
|
license="Apache-2.0",
|
||||||
|
entry_points={
|
||||||
|
"console_scripts": [
|
||||||
|
"uwb_imu_fusion = saltybot_uwb_imu_fusion.ekf_node:main",
|
||||||
|
],
|
||||||
|
},
|
||||||
|
)
|
||||||
152
jetson/ros2_ws/src/saltybot_uwb_imu_fusion/test/test_ekf_math.py
Normal file
152
jetson/ros2_ws/src/saltybot_uwb_imu_fusion/test/test_ekf_math.py
Normal file
@ -0,0 +1,152 @@
|
|||||||
|
"""
|
||||||
|
Unit tests for saltybot_uwb_imu_fusion.ekf_math (Issue #573).
|
||||||
|
No ROS2 or hardware required.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import math
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
||||||
|
from saltybot_uwb_imu_fusion.ekf_math import UwbImuEKF, _wrap_angle
|
||||||
|
|
||||||
|
|
||||||
|
class TestWrapAngle:
|
||||||
|
def test_within_range(self):
|
||||||
|
assert abs(_wrap_angle(1.0) - 1.0) < 1e-9
|
||||||
|
|
||||||
|
def test_positive_overflow(self):
|
||||||
|
assert abs(_wrap_angle(math.pi + 0.1) - (-math.pi + 0.1)) < 1e-9
|
||||||
|
|
||||||
|
def test_negative_overflow(self):
|
||||||
|
assert abs(_wrap_angle(-math.pi - 0.1) - (math.pi - 0.1)) < 1e-9
|
||||||
|
|
||||||
|
|
||||||
|
class TestEkfInitialise:
|
||||||
|
def test_not_initialised_by_default(self):
|
||||||
|
ekf = UwbImuEKF()
|
||||||
|
assert not ekf.is_initialised
|
||||||
|
|
||||||
|
def test_initialise_sets_position(self):
|
||||||
|
ekf = UwbImuEKF()
|
||||||
|
ekf.initialise(3.0, 4.0, heading_rad=0.5)
|
||||||
|
assert ekf.is_initialised
|
||||||
|
x, y = ekf.position
|
||||||
|
assert abs(x - 3.0) < 1e-9
|
||||||
|
assert abs(y - 4.0) < 1e-9
|
||||||
|
assert abs(ekf.heading - 0.5) < 1e-9
|
||||||
|
|
||||||
|
|
||||||
|
class TestEkfPredict:
|
||||||
|
def test_stationary_predict_no_drift(self):
|
||||||
|
"""Stationary robot with zero IMU input stays near origin."""
|
||||||
|
ekf = UwbImuEKF(sigma_vel_drift=0.0)
|
||||||
|
ekf.initialise(0.0, 0.0)
|
||||||
|
for _ in range(10):
|
||||||
|
ekf.predict(ax_body=0.0, ay_body=0.0, omega=0.0, dt=0.01)
|
||||||
|
x, y = ekf.position
|
||||||
|
assert abs(x) < 0.01
|
||||||
|
assert abs(y) < 0.01
|
||||||
|
|
||||||
|
def test_forward_acceleration(self):
|
||||||
|
"""1 m/s² forward for 0.5 s → ~0.125 m forward displacement."""
|
||||||
|
ekf = UwbImuEKF(sigma_imu_accel=0.0, sigma_vel_drift=0.0)
|
||||||
|
ekf.initialise(0.0, 0.0, heading_rad=0.0)
|
||||||
|
dt = 0.005
|
||||||
|
for _ in range(100): # 0.5 s
|
||||||
|
ekf.predict(ax_body=1.0, ay_body=0.0, omega=0.0, dt=dt)
|
||||||
|
x, y = ekf.position
|
||||||
|
# x ≈ 0.5 * 1 * 0.5² = 0.125 m
|
||||||
|
assert 0.10 < x < 0.16, f"x={x}"
|
||||||
|
assert abs(y) < 0.01
|
||||||
|
|
||||||
|
def test_yaw_rate(self):
|
||||||
|
"""π/4 rad/s for 1 s → heading ≈ π/4."""
|
||||||
|
ekf = UwbImuEKF(sigma_imu_gyro=0.0)
|
||||||
|
ekf.initialise(0.0, 0.0, heading_rad=0.0)
|
||||||
|
dt = 0.005
|
||||||
|
for _ in range(200):
|
||||||
|
ekf.predict(ax_body=0.0, ay_body=0.0, omega=math.pi / 4, dt=dt)
|
||||||
|
assert abs(ekf.heading - math.pi / 4) < 0.05
|
||||||
|
|
||||||
|
def test_covariance_grows_with_time(self):
|
||||||
|
"""Covariance must grow during predict (no updates)."""
|
||||||
|
ekf = UwbImuEKF()
|
||||||
|
ekf.initialise(0.0, 0.0)
|
||||||
|
p0 = float(ekf.covariance_position[0, 0])
|
||||||
|
for _ in range(50):
|
||||||
|
ekf.predict(ax_body=0.0, ay_body=0.0, omega=0.0, dt=0.01)
|
||||||
|
p1 = float(ekf.covariance_position[0, 0])
|
||||||
|
assert p1 > p0, f"covariance did not grow: p0={p0}, p1={p1}"
|
||||||
|
|
||||||
|
|
||||||
|
class TestEkfUpdate:
|
||||||
|
def test_uwb_update_reduces_covariance(self):
|
||||||
|
"""UWB update must reduce position covariance."""
|
||||||
|
ekf = UwbImuEKF()
|
||||||
|
ekf.initialise(0.0, 0.0)
|
||||||
|
# Grow uncertainty first
|
||||||
|
for _ in range(20):
|
||||||
|
ekf.predict(ax_body=0.0, ay_body=0.0, omega=0.0, dt=0.05)
|
||||||
|
p_before = float(ekf.covariance_position[0, 0])
|
||||||
|
ekf.update_uwb(0.1, 0.1)
|
||||||
|
p_after = float(ekf.covariance_position[0, 0])
|
||||||
|
assert p_after < p_before
|
||||||
|
|
||||||
|
def test_uwb_corrects_position(self):
|
||||||
|
"""Large position error corrected by UWB measurement."""
|
||||||
|
ekf = UwbImuEKF(sigma_uwb_pos_m=0.1)
|
||||||
|
ekf.initialise(5.0, 5.0)
|
||||||
|
# Multiple UWB updates at true position (0,0)
|
||||||
|
for _ in range(20):
|
||||||
|
ekf.update_uwb(0.0, 0.0)
|
||||||
|
x, y = ekf.position
|
||||||
|
assert abs(x) < 0.5, f"x not corrected: {x}"
|
||||||
|
assert abs(y) < 0.5, f"y not corrected: {y}"
|
||||||
|
|
||||||
|
def test_uninitialised_update_initialises(self):
|
||||||
|
"""Calling update_uwb before initialise should initialise filter."""
|
||||||
|
ekf = UwbImuEKF()
|
||||||
|
assert not ekf.is_initialised
|
||||||
|
ekf.update_uwb(2.0, 3.0)
|
||||||
|
assert ekf.is_initialised
|
||||||
|
x, y = ekf.position
|
||||||
|
assert abs(x - 2.0) < 1e-9
|
||||||
|
assert abs(y - 3.0) < 1e-9
|
||||||
|
|
||||||
|
|
||||||
|
class TestDropout:
|
||||||
|
def test_velocity_damping_during_dropout(self):
|
||||||
|
"""Velocity decays during extended UWB dropout."""
|
||||||
|
ekf = UwbImuEKF(dropout_vel_damping=0.9)
|
||||||
|
ekf.initialise(0.0, 0.0)
|
||||||
|
# Give it some velocity
|
||||||
|
for _ in range(50):
|
||||||
|
ekf.predict(ax_body=2.0, ay_body=0.0, omega=0.0, dt=0.01)
|
||||||
|
vx_before, _ = ekf.velocity
|
||||||
|
# Let dropout accumulate (> 2 s threshold)
|
||||||
|
for _ in range(300):
|
||||||
|
ekf.predict(ax_body=0.0, ay_body=0.0, omega=0.0, dt=0.01)
|
||||||
|
vx_after, _ = ekf.velocity
|
||||||
|
assert abs(vx_after) < abs(vx_before), \
|
||||||
|
f"velocity not damped: before={vx_before:.3f}, after={vx_after:.3f}"
|
||||||
|
|
||||||
|
|
||||||
|
class TestCovariance:
|
||||||
|
def test_covariance_positive_definite(self):
|
||||||
|
"""Full covariance matrix must be positive definite at all times."""
|
||||||
|
ekf = UwbImuEKF()
|
||||||
|
ekf.initialise(1.0, 2.0)
|
||||||
|
for _ in range(20):
|
||||||
|
ekf.predict(ax_body=0.5, ay_body=0.1, omega=0.05, dt=0.01)
|
||||||
|
if _ % 5 == 0:
|
||||||
|
ekf.update_uwb(1.0, 2.0)
|
||||||
|
eigvals = np.linalg.eigvalsh(ekf.covariance_full)
|
||||||
|
assert all(ev > -1e-9 for ev in eigvals), f"Non-PD covariance: {eigvals}"
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
pytest.main([__file__, "-v"])
|
||||||
Loading…
x
Reference in New Issue
Block a user