diff --git a/jetson/ros2_ws/src/saltybot_uwb_imu_fusion/config/fusion_params.yaml b/jetson/ros2_ws/src/saltybot_uwb_imu_fusion/config/fusion_params.yaml new file mode 100644 index 0000000..45bc423 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_uwb_imu_fusion/config/fusion_params.yaml @@ -0,0 +1,26 @@ +uwb_imu_fusion: + ros__parameters: + # ── Topics ──────────────────────────────────────────────────────────────── + imu_topic: /imu/data + uwb_bearing_topic: /uwb/bearing + uwb_pose_topic: /saltybot/uwb/pose + use_uwb_pose: true # true = use absolute /saltybot/uwb/pose + # false = use relative /uwb/bearing + + # ── TF ──────────────────────────────────────────────────────────────────── + publish_tf: true + map_frame: map + base_frame: base_link + + # ── EKF noise parameters ────────────────────────────────────────────────── + # Increase sigma_uwb_pos_m if UWB is noisy (multipath, few anchors) + # Increase sigma_imu_* if IMU has high vibration noise + sigma_uwb_pos_m: 0.12 # UWB position std-dev (m) — DW3000 ±10 cm + geometry + sigma_imu_accel: 0.05 # IMU accel noise (m/s²) + sigma_imu_gyro: 0.003 # IMU gyro noise (rad/s) + sigma_vel_drift: 0.10 # velocity drift process noise (m/s) + dropout_vel_damping: 0.95 # velocity decay factor per second during dropout + + # ── Dropout ─────────────────────────────────────────────────────────────── + max_dead_reckoning_s: 10.0 # suppress fused pose output after this many + # seconds without a UWB update diff --git a/jetson/ros2_ws/src/saltybot_uwb_imu_fusion/launch/uwb_imu_fusion.launch.py b/jetson/ros2_ws/src/saltybot_uwb_imu_fusion/launch/uwb_imu_fusion.launch.py new file mode 100644 index 0000000..9524b4d --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_uwb_imu_fusion/launch/uwb_imu_fusion.launch.py @@ -0,0 +1,43 @@ +""" +uwb_imu_fusion.launch.py — Launch UWB-IMU EKF fusion node (Issue #573) + +Usage: + ros2 launch saltybot_uwb_imu_fusion uwb_imu_fusion.launch.py + ros2 launch saltybot_uwb_imu_fusion uwb_imu_fusion.launch.py publish_tf:=false + ros2 launch saltybot_uwb_imu_fusion uwb_imu_fusion.launch.py sigma_uwb_pos_m:=0.20 +""" + +import os +from ament_index_python.packages import get_package_share_directory +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument +from launch.substitutions import LaunchConfiguration +from launch_ros.actions import Node + + +def generate_launch_description(): + pkg = get_package_share_directory("saltybot_uwb_imu_fusion") + cfg = os.path.join(pkg, "config", "fusion_params.yaml") + + return LaunchDescription([ + DeclareLaunchArgument("publish_tf", default_value="true"), + DeclareLaunchArgument("use_uwb_pose", default_value="true"), + DeclareLaunchArgument("sigma_uwb_pos_m", default_value="0.12"), + DeclareLaunchArgument("max_dead_reckoning_s", default_value="10.0"), + + Node( + package="saltybot_uwb_imu_fusion", + executable="uwb_imu_fusion", + name="uwb_imu_fusion", + output="screen", + parameters=[ + cfg, + { + "publish_tf": LaunchConfiguration("publish_tf"), + "use_uwb_pose": LaunchConfiguration("use_uwb_pose"), + "sigma_uwb_pos_m": LaunchConfiguration("sigma_uwb_pos_m"), + "max_dead_reckoning_s": LaunchConfiguration("max_dead_reckoning_s"), + }, + ], + ), + ]) diff --git a/jetson/ros2_ws/src/saltybot_uwb_imu_fusion/package.xml b/jetson/ros2_ws/src/saltybot_uwb_imu_fusion/package.xml new file mode 100644 index 0000000..3aa88f5 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_uwb_imu_fusion/package.xml @@ -0,0 +1,31 @@ + + + + saltybot_uwb_imu_fusion + 0.1.0 + + EKF-based UWB + IMU sensor fusion for SaltyBot indoor localization (Issue #573). + Fuses UWB position at 10 Hz with IMU accel+gyro at 200 Hz. + Handles UWB dropouts via IMU dead-reckoning. + Publishes /saltybot/pose/fused and /saltybot/pose/fused_cov. + + sl-uwb + Apache-2.0 + + rclpy + geometry_msgs + sensor_msgs + tf2_ros + saltybot_uwb_msgs + + python3-numpy + + ament_copyright + ament_flake8 + ament_pep257 + python3-pytest + + + ament_python + + diff --git a/jetson/ros2_ws/src/saltybot_uwb_imu_fusion/resource/saltybot_uwb_imu_fusion b/jetson/ros2_ws/src/saltybot_uwb_imu_fusion/resource/saltybot_uwb_imu_fusion new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_uwb_imu_fusion/saltybot_uwb_imu_fusion/__init__.py b/jetson/ros2_ws/src/saltybot_uwb_imu_fusion/saltybot_uwb_imu_fusion/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_uwb_imu_fusion/saltybot_uwb_imu_fusion/ekf_math.py b/jetson/ros2_ws/src/saltybot_uwb_imu_fusion/saltybot_uwb_imu_fusion/ekf_math.py new file mode 100644 index 0000000..e2b5059 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_uwb_imu_fusion/saltybot_uwb_imu_fusion/ekf_math.py @@ -0,0 +1,266 @@ +""" +ekf_math.py — Extended Kalman Filter for UWB + IMU fusion (Issue #573) + +No ROS2 dependencies — pure Python + numpy, fully unit-testable. + +State vector (6-DOF ground robot): +─────────────────────────────────── + x = [x, y, θ, vx, vy, ω]ᵀ (world frame) + x — forward position (m) + y — lateral position (m) + θ — heading (rad, CCW positive) + vx — longitudinal velocity (m/s, world frame) + vy — lateral velocity (m/s, world frame) + ω — yaw rate (rad/s) + +Process model — IMU as control input u = [ax_body, ay_body, ω_imu] +─────────────────────────────────────────────────────────────────── + θ_k+1 = θ_k + ω_imu * dt + vx_k+1 = vx_k + (ax_body * cos(θ) - ay_body * sin(θ)) * dt + vy_k+1 = vy_k + (ax_body * sin(θ) + ay_body * cos(θ)) * dt + x_k+1 = x_k + vx_k * dt + y_k+1 = y_k + vy_k * dt + ω_k+1 = ω_imu (direct observation, no integration) + +UWB measurement model (position observation): +────────────────────────────────────────────── + z = [x, y]ᵀ H = [[1,0,0,0,0,0], + [0,1,0,0,0,0]] + +IMU bias handling: +────────────────── + Simple first-order high-pass on accel to reduce drift. + For longer deployments extend state to include accel bias. +""" + +from __future__ import annotations + +import math +import numpy as np +from typing import Tuple + +# State dimension +N = 6 # [x, y, θ, vx, vy, ω] + +# Indices +IX, IY, IT, IVX, IVY, IW = range(N) + + +class UwbImuEKF: + """ + EKF fusing UWB position (10 Hz) with IMU accel+gyro (200 Hz). + + Parameters + ---------- + sigma_uwb_pos_m : UWB position std-dev (m) default 0.12 + sigma_imu_accel : IMU accelerometer noise (m/s²) default 0.05 + sigma_imu_gyro : IMU gyroscope noise (rad/s) default 0.003 + sigma_vel_drift : velocity drift process noise (m/s) default 0.1 + dropout_vel_damping : velocity damping per second during UWB dropout + """ + + def __init__( + self, + sigma_uwb_pos_m: float = 0.12, + sigma_imu_accel: float = 0.05, + sigma_imu_gyro: float = 0.003, + sigma_vel_drift: float = 0.10, + dropout_vel_damping: float = 0.95, + ) -> None: + self._R_uwb = sigma_uwb_pos_m ** 2 # UWB position variance + self._q_a = sigma_imu_accel ** 2 # accel noise variance + self._q_g = sigma_imu_gyro ** 2 # gyro noise variance + self._q_v = sigma_vel_drift ** 2 # velocity drift variance + self._damp = dropout_vel_damping + + # State and covariance + self._x = np.zeros(N, dtype=float) + self._P = np.eye(N, dtype=float) * 9.0 # large initial uncertainty + + # Accel bias (simple running mean for high-pass) + self._accel_bias = np.zeros(2, dtype=float) + self._bias_alpha = 0.005 # low-pass coefficient for bias estimation + + self._initialised = False + self._uwb_dropout_s = 0.0 # time since last UWB update + + # ── Initialise ─────────────────────────────────────────────────────────── + + def initialise(self, x: float, y: float, heading_rad: float = 0.0) -> None: + """Seed the filter with a known position.""" + self._x[:] = 0.0 + self._x[IX] = x + self._x[IY] = y + self._x[IT] = heading_rad + self._P = np.diag([0.25, 0.25, 0.1, 1.0, 1.0, 0.1]) + self._initialised = True + + # ── IMU predict ────────────────────────────────────────────────────────── + + def predict(self, ax_body: float, ay_body: float, omega: float, dt: float) -> None: + """ + EKF predict step driven by IMU measurement. + + Parameters + ---------- + ax_body : forward acceleration in body frame (m/s²) + ay_body : lateral acceleration in body frame (m/s², left positive) + omega : yaw rate (rad/s, CCW positive) + dt : time step (s) + """ + if not self._initialised: + return + + # Subtract estimated accel bias + ax_c = ax_body - self._accel_bias[0] + ay_c = ay_body - self._accel_bias[1] + + x = self._x + th = x[IT] + ct = math.cos(th) + st = math.sin(th) + + # World-frame acceleration + ax_w = ax_c * ct - ay_c * st + ay_w = ax_c * st + ay_c * ct + + # State prediction + xp = x.copy() + xp[IX] = x[IX] + x[IVX] * dt + xp[IY] = x[IY] + x[IVY] * dt + xp[IT] = _wrap_angle(x[IT] + omega * dt) + xp[IVX] = x[IVX] + ax_w * dt + xp[IVY] = x[IVY] + ay_w * dt + xp[IW] = omega # direct observation from gyro + + # Jacobian F = ∂f/∂x + F = np.eye(N, dtype=float) + # ∂x / ∂vx, ∂x / ∂vy + F[IX, IVX] = dt + F[IY, IVY] = dt + # ∂vx / ∂θ = (-ax_c·sin(θ) - ay_c·cos(θ)) * dt + F[IVX, IT] = (-ax_c * st - ay_c * ct) * dt + # ∂vy / ∂θ = ( ax_c·cos(θ) - ay_c·sin(θ)) * dt + F[IVY, IT] = ( ax_c * ct - ay_c * st) * dt + + # Process noise Q + dt2 = dt * dt + Q = np.diag([ + 0.5 * self._q_a * dt2 * dt2, # x: from accel double-integrated + 0.5 * self._q_a * dt2 * dt2, # y + self._q_g * dt2, # θ: from gyro integrated + self._q_v * dt + self._q_a * dt2, # vx + self._q_v * dt + self._q_a * dt2, # vy + self._q_g, # ω: direct gyro noise + ]) + + self._x = xp + self._P = F @ self._P @ F.T + Q + self._uwb_dropout_s += dt + + # Velocity damping during extended UWB dropout (dead-reckoning coasting) + if self._uwb_dropout_s > 2.0: + damping = self._damp ** dt + self._x[IVX] *= damping + self._x[IVY] *= damping + + # Update accel bias estimate + self._accel_bias[0] += self._bias_alpha * (ax_body - self._accel_bias[0]) + self._accel_bias[1] += self._bias_alpha * (ay_body - self._accel_bias[1]) + + # ── UWB update ─────────────────────────────────────────────────────────── + + def update_uwb(self, x_meas: float, y_meas: float, + sigma_m: float | None = None) -> None: + """ + EKF update step with a UWB position measurement. + + Parameters + ---------- + x_meas, y_meas : measured position (m, world frame) + sigma_m : override measurement std-dev (m); uses node default if None + """ + if not self._initialised: + self.initialise(x_meas, y_meas) + return + + R_val = (sigma_m ** 2) if sigma_m is not None else self._R_uwb + R = np.eye(2) * R_val + + # H: position rows only + H = np.zeros((2, N)) + H[0, IX] = 1.0 + H[1, IY] = 1.0 + + innov = np.array([x_meas - self._x[IX], y_meas - self._x[IY]]) + S = H @ self._P @ H.T + R + K = self._P @ H.T @ np.linalg.inv(S) + + self._x = self._x + K @ innov + self._x[IT] = _wrap_angle(self._x[IT]) + self._P = (np.eye(N) - K @ H) @ self._P + + self._uwb_dropout_s = 0.0 + + # ── Update heading from magnetometer / other source ─────────────────────── + + def update_heading(self, heading_rad: float, sigma_rad: float = 0.1) -> None: + """Optional: update θ directly from compass or visual odometry.""" + if not self._initialised: + return + H = np.zeros((1, N)) + H[0, IT] = 1.0 + innov = np.array([_wrap_angle(heading_rad - self._x[IT])]) + S = H @ self._P @ H.T + np.array([[sigma_rad ** 2]]) + K = self._P @ H.T @ np.linalg.inv(S) + self._x = self._x + K.flatten() * innov[0] + self._x[IT] = _wrap_angle(self._x[IT]) + self._P = (np.eye(N) - K @ H) @ self._P + + # ── Accessors ──────────────────────────────────────────────────────────── + + @property + def position(self) -> Tuple[float, float]: + return float(self._x[IX]), float(self._x[IY]) + + @property + def heading(self) -> float: + return float(self._x[IT]) + + @property + def velocity(self) -> Tuple[float, float]: + return float(self._x[IVX]), float(self._x[IVY]) + + @property + def yaw_rate(self) -> float: + return float(self._x[IW]) + + @property + def covariance_position(self) -> np.ndarray: + """2×2 position covariance sub-matrix.""" + return self._P[:2, :2].copy() + + @property + def covariance_full(self) -> np.ndarray: + """Full 6×6 state covariance.""" + return self._P.copy() + + @property + def is_initialised(self) -> bool: + return self._initialised + + @property + def uwb_dropout_s(self) -> float: + """Seconds since last UWB update.""" + return self._uwb_dropout_s + + def position_uncertainty_m(self) -> float: + """Approximate 1-σ position uncertainty (m).""" + return float(math.sqrt((self._P[IX, IX] + self._P[IY, IY]) / 2.0)) + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + +def _wrap_angle(a: float) -> float: + """Wrap angle to [-π, π].""" + return float((a + math.pi) % (2 * math.pi) - math.pi) diff --git a/jetson/ros2_ws/src/saltybot_uwb_imu_fusion/saltybot_uwb_imu_fusion/ekf_node.py b/jetson/ros2_ws/src/saltybot_uwb_imu_fusion/saltybot_uwb_imu_fusion/ekf_node.py new file mode 100644 index 0000000..b4fe3f7 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_uwb_imu_fusion/saltybot_uwb_imu_fusion/ekf_node.py @@ -0,0 +1,268 @@ +""" +ekf_node.py — ROS2 node: UWB + IMU EKF fusion (Issue #573) + +Fuses: + /imu/data (sensor_msgs/Imu, 200 Hz) — predict step + /uwb/bearing (UwbBearing, 10 Hz) — update step (bearing+range → x,y) + /saltybot/uwb/pose (geometry_msgs/PoseStamped, 10 Hz) — update step (absolute position) + +Publishes: + /saltybot/pose/fused (geometry_msgs/PoseStamped) — fused pose at IMU rate + /saltybot/pose/fused_cov (geometry_msgs/PoseWithCovarianceStamped) — with covariance + +TF2: + Broadcasts base_link → map from fused pose + +UWB dropout: + IMU dead-reckoning continues for up to `max_dead_reckoning_s` seconds. + After that the velocity estimate is zeroed; position uncertainty grows. + +Parameters (see config/fusion_params.yaml): + imu_topic /imu/data + uwb_bearing_topic /uwb/bearing + uwb_pose_topic /saltybot/uwb/pose (preferred if available) + use_uwb_pose true — use absolute pose; false = use bearing only + publish_tf true + map_frame map + base_frame base_link + sigma_uwb_pos_m 0.12 + sigma_imu_accel 0.05 + sigma_imu_gyro 0.003 + sigma_vel_drift 0.10 + max_dead_reckoning_s 10.0 +""" + +import math +import time + +import rclpy +from rclpy.node import Node +from rclpy.qos import ( + QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy +) + +import tf2_ros +from geometry_msgs.msg import ( + PoseStamped, PoseWithCovarianceStamped, TransformStamped +) +from sensor_msgs.msg import Imu +from std_msgs.msg import Header + +from saltybot_uwb_msgs.msg import UwbBearing +from saltybot_uwb_imu_fusion.ekf_math import UwbImuEKF + + +_SENSOR_QOS = QoSProfile( + reliability=ReliabilityPolicy.BEST_EFFORT, + history=HistoryPolicy.KEEP_LAST, + depth=5, +) + + +class EkfFusionNode(Node): + + def __init__(self) -> None: + super().__init__("uwb_imu_fusion") + + # ── Parameters ──────────────────────────────────────────────────────── + self.declare_parameter("imu_topic", "/imu/data") + self.declare_parameter("uwb_bearing_topic", "/uwb/bearing") + self.declare_parameter("uwb_pose_topic", "/saltybot/uwb/pose") + self.declare_parameter("use_uwb_pose", True) + self.declare_parameter("publish_tf", True) + self.declare_parameter("map_frame", "map") + self.declare_parameter("base_frame", "base_link") + self.declare_parameter("sigma_uwb_pos_m", 0.12) + self.declare_parameter("sigma_imu_accel", 0.05) + self.declare_parameter("sigma_imu_gyro", 0.003) + self.declare_parameter("sigma_vel_drift", 0.10) + self.declare_parameter("dropout_vel_damping", 0.95) + self.declare_parameter("max_dead_reckoning_s", 10.0) + + self._map_frame = self.get_parameter("map_frame").value + self._base_frame = self.get_parameter("base_frame").value + self._publish_tf = self.get_parameter("publish_tf").value + self._use_uwb_pose = self.get_parameter("use_uwb_pose").value + self._max_dr = self.get_parameter("max_dead_reckoning_s").value + + # ── EKF ─────────────────────────────────────────────────────────────── + self._ekf = UwbImuEKF( + sigma_uwb_pos_m = self.get_parameter("sigma_uwb_pos_m").value, + sigma_imu_accel = self.get_parameter("sigma_imu_accel").value, + sigma_imu_gyro = self.get_parameter("sigma_imu_gyro").value, + sigma_vel_drift = self.get_parameter("sigma_vel_drift").value, + dropout_vel_damping = self.get_parameter("dropout_vel_damping").value, + ) + + self._last_imu_t: float | None = None + + # ── Publishers ──────────────────────────────────────────────────────── + self._pose_pub = self.create_publisher(PoseStamped, "/saltybot/pose/fused", 10) + self._pose_cov_pub = self.create_publisher(PoseWithCovarianceStamped, "/saltybot/pose/fused_cov", 10) + + if self._publish_tf: + self._tf_br = tf2_ros.TransformBroadcaster(self) + else: + self._tf_br = None + + # ── Subscriptions ───────────────────────────────────────────────────── + self.create_subscription( + Imu, self.get_parameter("imu_topic").value, + self._imu_cb, _SENSOR_QOS + ) + + if self._use_uwb_pose: + self.create_subscription( + PoseStamped, + self.get_parameter("uwb_pose_topic").value, + self._uwb_pose_cb, 10 + ) + else: + self.create_subscription( + UwbBearing, + self.get_parameter("uwb_bearing_topic").value, + self._uwb_bearing_cb, 10 + ) + + self.get_logger().info( + f"EKF fusion ready — " + f"IMU:{self.get_parameter('imu_topic').value} " + f"UWB:{'pose' if self._use_uwb_pose else 'bearing'} " + f"tf={self._publish_tf} " + f"dr={self._max_dr}s" + ) + + # ── IMU callback (200 Hz) — predict step ────────────────────────────────── + + def _imu_cb(self, msg: Imu) -> None: + now = self.get_clock().now().nanoseconds * 1e-9 + if self._last_imu_t is None: + self._last_imu_t = now + return + + dt = now - self._last_imu_t + self._last_imu_t = now + + if dt <= 0.0 or dt > 0.5: + return # stale or implausible + + # Extract IMU data (body frame, ROS convention: x=forward, y=left, z=up) + ax = msg.linear_acceleration.x + ay = msg.linear_acceleration.y + # az = msg.linear_acceleration.z # gravity along z, ignored for ground robot + omega = msg.angular_velocity.z # yaw rate + + self._ekf.predict(ax_body=ax, ay_body=ay, omega=omega, dt=dt) + + # Publish fused pose at IMU rate if filter is alive + if self._ekf.is_initialised: + if self._ekf.uwb_dropout_s < self._max_dr: + self._publish_pose(msg.header.stamp) + else: + # Dropout too long — stop publishing, log warning + self.get_logger().warn( + f"UWB dropout {self._ekf.uwb_dropout_s:.1f}s — " + "pose unreliable, output suppressed", + throttle_duration_sec=5.0, + ) + + # ── UWB pose callback (absolute position, 10 Hz) — update step ─────────── + + def _uwb_pose_cb(self, msg: PoseStamped) -> None: + x = msg.pose.position.x + y = msg.pose.position.y + self._ekf.update_uwb(x, y) + + if not self._ekf.is_initialised: + return + + # Extract heading from quaternion if available + q = msg.pose.orientation + if abs(q.w) > 0.01: + yaw = 2.0 * math.atan2(q.z, q.w) + self._ekf.update_heading(yaw, sigma_rad=0.2) + + # ── UWB bearing callback (relative, 10 Hz) — update step ───────────────── + + def _uwb_bearing_cb(self, msg: UwbBearing) -> None: + r = float(msg.range_m) + brg = float(msg.bearing_rad) + if r < 0.1: + return + # Convert polar (bearing from base_link) to absolute position estimate + # This is relative to robot, so add to current robot position to get world coords. + # Note: if we don't have absolute anchors, this stays in base_link frame. + # Use it as a position measurement in base_link (x = forward, y = left). + x_rel = r * math.cos(brg) + y_rel = r * math.sin(brg) + # Inflate sigma for single-anchor degraded mode + sigma = 0.15 if msg.confidence >= 1.0 else 0.30 + self._ekf.update_uwb(x_rel, y_rel, sigma_m=sigma) + + # ── Publish fused pose ──────────────────────────────────────────────────── + + def _publish_pose(self, stamp) -> None: + x, y = self._ekf.position + heading = self._ekf.heading + cov_pos = self._ekf.covariance_position + cov_full = self._ekf.covariance_full + + half_yaw = heading / 2.0 + qz = math.sin(half_yaw) + qw = math.cos(half_yaw) + + # PoseStamped + pose = PoseStamped() + pose.header.stamp = stamp + pose.header.frame_id = self._map_frame + pose.pose.position.x = x + pose.pose.position.y = y + pose.pose.position.z = 0.0 + pose.pose.orientation.z = qz + pose.pose.orientation.w = qw + self._pose_pub.publish(pose) + + # PoseWithCovarianceStamped + pose_cov = PoseWithCovarianceStamped() + pose_cov.header = pose.header + pose_cov.pose.pose = pose.pose + cov36 = [0.0] * 36 + cov36[0] = cov_pos[0, 0] # x,x + cov36[1] = cov_pos[0, 1] # x,y + cov36[6] = cov_pos[1, 0] # y,x + cov36[7] = cov_pos[1, 1] # y,y + cov36[14] = 1e-4 # z (not estimated) + cov36[21] = 1e-4 # roll + cov36[28] = 1e-4 # pitch + cov36[35] = float(cov_full[2, 2]) # yaw + pose_cov.pose.covariance = cov36 + self._pose_cov_pub.publish(pose_cov) + + # TF2 + if self._tf_br is not None: + tf = TransformStamped() + tf.header.stamp = stamp + tf.header.frame_id = self._map_frame + tf.child_frame_id = self._base_frame + tf.transform.translation.x = x + tf.transform.translation.y = y + tf.transform.translation.z = 0.0 + tf.transform.rotation.z = qz + tf.transform.rotation.w = qw + self._tf_br.sendTransform(tf) + + +def main(args=None) -> None: + rclpy.init(args=args) + node = EkfFusionNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.try_shutdown() + + +if __name__ == "__main__": + main() diff --git a/jetson/ros2_ws/src/saltybot_uwb_imu_fusion/setup.cfg b/jetson/ros2_ws/src/saltybot_uwb_imu_fusion/setup.cfg new file mode 100644 index 0000000..ffbee02 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_uwb_imu_fusion/setup.cfg @@ -0,0 +1,4 @@ +[develop] +script_dir=$base/lib/saltybot_uwb_imu_fusion +[install] +install_scripts=$base/lib/saltybot_uwb_imu_fusion diff --git a/jetson/ros2_ws/src/saltybot_uwb_imu_fusion/setup.py b/jetson/ros2_ws/src/saltybot_uwb_imu_fusion/setup.py new file mode 100644 index 0000000..1dac3c3 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_uwb_imu_fusion/setup.py @@ -0,0 +1,29 @@ +import os +from glob import glob +from setuptools import setup + +package_name = "saltybot_uwb_imu_fusion" + +setup( + name=package_name, + version="0.1.0", + packages=[package_name], + data_files=[ + ("share/ament_index/resource_index/packages", + [f"resource/{package_name}"]), + (f"share/{package_name}", ["package.xml"]), + (os.path.join("share", package_name, "launch"), glob("launch/*.py")), + (os.path.join("share", package_name, "config"), glob("config/*.yaml")), + ], + install_requires=["setuptools"], + zip_safe=True, + maintainer="sl-uwb", + maintainer_email="sl-uwb@saltylab.local", + description="EKF UWB+IMU fusion for SaltyBot localization", + license="Apache-2.0", + entry_points={ + "console_scripts": [ + "uwb_imu_fusion = saltybot_uwb_imu_fusion.ekf_node:main", + ], + }, +) diff --git a/jetson/ros2_ws/src/saltybot_uwb_imu_fusion/test/test_ekf_math.py b/jetson/ros2_ws/src/saltybot_uwb_imu_fusion/test/test_ekf_math.py new file mode 100644 index 0000000..6d40bc6 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_uwb_imu_fusion/test/test_ekf_math.py @@ -0,0 +1,152 @@ +""" +Unit tests for saltybot_uwb_imu_fusion.ekf_math (Issue #573). +No ROS2 or hardware required. +""" + +import math +import sys +import os + +import numpy as np +import pytest + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) +from saltybot_uwb_imu_fusion.ekf_math import UwbImuEKF, _wrap_angle + + +class TestWrapAngle: + def test_within_range(self): + assert abs(_wrap_angle(1.0) - 1.0) < 1e-9 + + def test_positive_overflow(self): + assert abs(_wrap_angle(math.pi + 0.1) - (-math.pi + 0.1)) < 1e-9 + + def test_negative_overflow(self): + assert abs(_wrap_angle(-math.pi - 0.1) - (math.pi - 0.1)) < 1e-9 + + +class TestEkfInitialise: + def test_not_initialised_by_default(self): + ekf = UwbImuEKF() + assert not ekf.is_initialised + + def test_initialise_sets_position(self): + ekf = UwbImuEKF() + ekf.initialise(3.0, 4.0, heading_rad=0.5) + assert ekf.is_initialised + x, y = ekf.position + assert abs(x - 3.0) < 1e-9 + assert abs(y - 4.0) < 1e-9 + assert abs(ekf.heading - 0.5) < 1e-9 + + +class TestEkfPredict: + def test_stationary_predict_no_drift(self): + """Stationary robot with zero IMU input stays near origin.""" + ekf = UwbImuEKF(sigma_vel_drift=0.0) + ekf.initialise(0.0, 0.0) + for _ in range(10): + ekf.predict(ax_body=0.0, ay_body=0.0, omega=0.0, dt=0.01) + x, y = ekf.position + assert abs(x) < 0.01 + assert abs(y) < 0.01 + + def test_forward_acceleration(self): + """1 m/s² forward for 0.5 s → ~0.125 m forward displacement.""" + ekf = UwbImuEKF(sigma_imu_accel=0.0, sigma_vel_drift=0.0) + ekf.initialise(0.0, 0.0, heading_rad=0.0) + dt = 0.005 + for _ in range(100): # 0.5 s + ekf.predict(ax_body=1.0, ay_body=0.0, omega=0.0, dt=dt) + x, y = ekf.position + # x ≈ 0.5 * 1 * 0.5² = 0.125 m + assert 0.10 < x < 0.16, f"x={x}" + assert abs(y) < 0.01 + + def test_yaw_rate(self): + """π/4 rad/s for 1 s → heading ≈ π/4.""" + ekf = UwbImuEKF(sigma_imu_gyro=0.0) + ekf.initialise(0.0, 0.0, heading_rad=0.0) + dt = 0.005 + for _ in range(200): + ekf.predict(ax_body=0.0, ay_body=0.0, omega=math.pi / 4, dt=dt) + assert abs(ekf.heading - math.pi / 4) < 0.05 + + def test_covariance_grows_with_time(self): + """Covariance must grow during predict (no updates).""" + ekf = UwbImuEKF() + ekf.initialise(0.0, 0.0) + p0 = float(ekf.covariance_position[0, 0]) + for _ in range(50): + ekf.predict(ax_body=0.0, ay_body=0.0, omega=0.0, dt=0.01) + p1 = float(ekf.covariance_position[0, 0]) + assert p1 > p0, f"covariance did not grow: p0={p0}, p1={p1}" + + +class TestEkfUpdate: + def test_uwb_update_reduces_covariance(self): + """UWB update must reduce position covariance.""" + ekf = UwbImuEKF() + ekf.initialise(0.0, 0.0) + # Grow uncertainty first + for _ in range(20): + ekf.predict(ax_body=0.0, ay_body=0.0, omega=0.0, dt=0.05) + p_before = float(ekf.covariance_position[0, 0]) + ekf.update_uwb(0.1, 0.1) + p_after = float(ekf.covariance_position[0, 0]) + assert p_after < p_before + + def test_uwb_corrects_position(self): + """Large position error corrected by UWB measurement.""" + ekf = UwbImuEKF(sigma_uwb_pos_m=0.1) + ekf.initialise(5.0, 5.0) + # Multiple UWB updates at true position (0,0) + for _ in range(20): + ekf.update_uwb(0.0, 0.0) + x, y = ekf.position + assert abs(x) < 0.5, f"x not corrected: {x}" + assert abs(y) < 0.5, f"y not corrected: {y}" + + def test_uninitialised_update_initialises(self): + """Calling update_uwb before initialise should initialise filter.""" + ekf = UwbImuEKF() + assert not ekf.is_initialised + ekf.update_uwb(2.0, 3.0) + assert ekf.is_initialised + x, y = ekf.position + assert abs(x - 2.0) < 1e-9 + assert abs(y - 3.0) < 1e-9 + + +class TestDropout: + def test_velocity_damping_during_dropout(self): + """Velocity decays during extended UWB dropout.""" + ekf = UwbImuEKF(dropout_vel_damping=0.9) + ekf.initialise(0.0, 0.0) + # Give it some velocity + for _ in range(50): + ekf.predict(ax_body=2.0, ay_body=0.0, omega=0.0, dt=0.01) + vx_before, _ = ekf.velocity + # Let dropout accumulate (> 2 s threshold) + for _ in range(300): + ekf.predict(ax_body=0.0, ay_body=0.0, omega=0.0, dt=0.01) + vx_after, _ = ekf.velocity + assert abs(vx_after) < abs(vx_before), \ + f"velocity not damped: before={vx_before:.3f}, after={vx_after:.3f}" + + +class TestCovariance: + def test_covariance_positive_definite(self): + """Full covariance matrix must be positive definite at all times.""" + ekf = UwbImuEKF() + ekf.initialise(1.0, 2.0) + for _ in range(20): + ekf.predict(ax_body=0.5, ay_body=0.1, omega=0.05, dt=0.01) + if _ % 5 == 0: + ekf.update_uwb(1.0, 2.0) + eigvals = np.linalg.eigvalsh(ekf.covariance_full) + assert all(ev > -1e-9 for ev in eigvals), f"Non-PD covariance: {eigvals}" + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])