feat: UWB follow-me system (#57) #66
@ -1,30 +1,42 @@
|
|||||||
"""
|
"""
|
||||||
person_follower_node.py — Proportional person-following control loop.
|
person_follower_node.py -- Proportional person-following control loop.
|
||||||
|
|
||||||
Subscribes:
|
Subscribes:
|
||||||
/person/target (geometry_msgs/PoseStamped) — person pose in base_link frame
|
/uwb/target (geometry_msgs/PoseStamped) -- UWB-triangulated position (primary)
|
||||||
/local_costmap/costmap (nav_msgs/OccupancyGrid) — Nav2 local obstacle map
|
/person/target (geometry_msgs/PoseStamped) -- camera-detected position (secondary)
|
||||||
|
/local_costmap/costmap (nav_msgs/OccupancyGrid) -- Nav2 local obstacle map
|
||||||
|
|
||||||
Publishes:
|
Publishes:
|
||||||
/cmd_vel (geometry_msgs/Twist) — velocity commands
|
/cmd_vel (geometry_msgs/Twist) -- velocity commands
|
||||||
|
|
||||||
|
Sensor fusion
|
||||||
|
-------------
|
||||||
|
When both UWB and camera sources are fresh:
|
||||||
|
fused = uwb_weight * uwb + (1 - uwb_weight) * camera
|
||||||
|
When only UWB is fresh: use UWB directly.
|
||||||
|
When only camera is fresh: use camera directly.
|
||||||
|
When neither is fresh: STOPPING -> SEARCHING.
|
||||||
|
|
||||||
|
UWB is declared stale after uwb_timeout seconds (default 1.0s).
|
||||||
|
Camera is declared stale after lost_timeout seconds (default 2.0s).
|
||||||
|
|
||||||
State machine
|
State machine
|
||||||
─────────────
|
-------------
|
||||||
FOLLOWING — person visible (last detection < lost_timeout ago)
|
FOLLOWING -- at least one source fresh (last detection < timeout ago)
|
||||||
STOPPING — person lost, publishing zero until search_timeout
|
STOPPING -- all sources lost, publishing zero until search_timeout
|
||||||
SEARCHING — person lost > search_timeout, slow rotation to re-acquire
|
SEARCHING -- all sources lost > search_timeout, slow rotation to re-acquire
|
||||||
|
|
||||||
Safety wiring
|
Safety wiring
|
||||||
─────────────
|
-------------
|
||||||
• cmd_vel bridge (PR #46) applies ramp + deadman + STM32 AUTONOMOUS mode gate —
|
* cmd_vel bridge (PR #46) applies ramp + deadman + STM32 AUTONOMOUS mode gate --
|
||||||
this node publishes raw /cmd_vel, the bridge handles hardware safety.
|
this node publishes raw /cmd_vel, the bridge handles hardware safety.
|
||||||
• follow_enabled param (default True) lets the operator disable the controller
|
* follow_enabled param (default True) lets the operator disable the controller
|
||||||
at runtime: ros2 param set /person_follower follow_enabled false
|
at runtime: ros2 param set /person_follower follow_enabled false
|
||||||
• Obstacle override: if Nav2 local costmap shows a lethal cell in the forward
|
* Obstacle override: if Nav2 local costmap shows a lethal cell in the forward
|
||||||
corridor, forward cmd_vel is zeroed; turning is still allowed.
|
corridor, forward cmd_vel is zeroed; turning is still allowed.
|
||||||
|
|
||||||
Usage
|
Usage
|
||||||
─────
|
-----
|
||||||
ros2 launch saltybot_follower person_follower.launch.py
|
ros2 launch saltybot_follower person_follower.launch.py
|
||||||
ros2 launch saltybot_follower person_follower.launch.py follow_distance:=1.2
|
ros2 launch saltybot_follower person_follower.launch.py follow_distance:=1.2
|
||||||
"""
|
"""
|
||||||
@ -37,13 +49,13 @@ from rclpy.node import Node
|
|||||||
from geometry_msgs.msg import PoseStamped, Twist
|
from geometry_msgs.msg import PoseStamped, Twist
|
||||||
from nav_msgs.msg import OccupancyGrid
|
from nav_msgs.msg import OccupancyGrid
|
||||||
|
|
||||||
# ── State constants ────────────────────────────────────────────────────────────
|
# -- State constants -----------------------------------------------------------
|
||||||
_FOLLOWING = "following"
|
_FOLLOWING = "following"
|
||||||
_STOPPING = "stopping" # person lost < search_timeout, publishing zero
|
_STOPPING = "stopping" # all sources lost < search_timeout, publishing zero
|
||||||
_SEARCHING = "searching" # person lost > search_timeout, slow rotation
|
_SEARCHING = "searching" # all sources lost > search_timeout, slow rotation
|
||||||
|
|
||||||
|
|
||||||
# ── Pure helper functions (also exercised by unit tests) ──────────────────────
|
# -- Pure helper functions (also exercised by unit tests) ----------------------
|
||||||
|
|
||||||
def clamp(v: float, lo: float, hi: float) -> float:
|
def clamp(v: float, lo: float, hi: float) -> float:
|
||||||
return max(lo, min(hi, v))
|
return max(lo, min(hi, v))
|
||||||
@ -58,6 +70,40 @@ def compute_bearing(px: float, py: float) -> float:
|
|||||||
return math.atan2(py, px)
|
return math.atan2(py, px)
|
||||||
|
|
||||||
|
|
||||||
|
def fuse_targets(
|
||||||
|
uwb_x: float,
|
||||||
|
uwb_y: float,
|
||||||
|
uwb_fresh: bool,
|
||||||
|
cam_x: float,
|
||||||
|
cam_y: float,
|
||||||
|
cam_fresh: bool,
|
||||||
|
uwb_weight: float,
|
||||||
|
) -> tuple:
|
||||||
|
"""
|
||||||
|
Fuse UWB (primary) and camera (secondary) position estimates.
|
||||||
|
|
||||||
|
Returns (fused_x, fused_y, any_fresh).
|
||||||
|
|
||||||
|
Fusion rules:
|
||||||
|
Both fresh -> weighted blend: uwb_weight * uwb + (1-uwb_weight) * camera
|
||||||
|
UWB only -> UWB position, full weight
|
||||||
|
Camera only -> camera position, full weight
|
||||||
|
Neither -> (0, 0, False)
|
||||||
|
"""
|
||||||
|
if uwb_fresh and cam_fresh:
|
||||||
|
w = clamp(uwb_weight, 0.0, 1.0)
|
||||||
|
return (
|
||||||
|
w * uwb_x + (1.0 - w) * cam_x,
|
||||||
|
w * uwb_y + (1.0 - w) * cam_y,
|
||||||
|
True,
|
||||||
|
)
|
||||||
|
if uwb_fresh:
|
||||||
|
return uwb_x, uwb_y, True
|
||||||
|
if cam_fresh:
|
||||||
|
return cam_x, cam_y, True
|
||||||
|
return 0.0, 0.0, False
|
||||||
|
|
||||||
|
|
||||||
def compute_follow_cmd(
|
def compute_follow_cmd(
|
||||||
px: float,
|
px: float,
|
||||||
py: float,
|
py: float,
|
||||||
@ -75,30 +121,27 @@ def compute_follow_cmd(
|
|||||||
Returns (linear_x, angular_z) as floats.
|
Returns (linear_x, angular_z) as floats.
|
||||||
|
|
||||||
linear_x:
|
linear_x:
|
||||||
Proportional to signed distance error (distance − follow_distance).
|
Proportional to signed distance error (distance - follow_distance).
|
||||||
Zero inside the dead zone. Clamped to ±max_linear_vel.
|
Zero inside the dead zone. Clamped to +-max_linear_vel.
|
||||||
Zeroed (not reversed) if obstacle_ahead and moving forward.
|
Zeroed (not reversed) if obstacle_ahead and moving forward.
|
||||||
|
|
||||||
angular_z:
|
angular_z:
|
||||||
Proportional to bearing. Always corrects even when inside the dead zone,
|
Proportional to bearing. Always corrects even when inside the dead zone,
|
||||||
so the robot stays pointed at the person while holding distance.
|
so the robot stays pointed at the person while holding distance.
|
||||||
Clamped to ±max_angular_vel.
|
Clamped to +-max_angular_vel.
|
||||||
"""
|
"""
|
||||||
distance = compute_distance(px, py)
|
distance = compute_distance(px, py)
|
||||||
bearing = compute_bearing(px, py)
|
bearing = compute_bearing(px, py)
|
||||||
|
|
||||||
# Linear — dead zone suppresses jitter at target distance
|
|
||||||
dist_error = distance - follow_distance
|
dist_error = distance - follow_distance
|
||||||
if abs(dist_error) > dead_zone:
|
if abs(dist_error) > dead_zone:
|
||||||
linear_x = clamp(kp_linear * dist_error, -max_linear_vel, max_linear_vel)
|
linear_x = clamp(kp_linear * dist_error, -max_linear_vel, max_linear_vel)
|
||||||
else:
|
else:
|
||||||
linear_x = 0.0
|
linear_x = 0.0
|
||||||
|
|
||||||
# Obstacle override: suppress forward motion, allow turning
|
|
||||||
if obstacle_ahead and linear_x > 0.0:
|
if obstacle_ahead and linear_x > 0.0:
|
||||||
linear_x = 0.0
|
linear_x = 0.0
|
||||||
|
|
||||||
# Angular — always correct bearing
|
|
||||||
angular_z = clamp(kp_angular * bearing, -max_angular_vel, max_angular_vel)
|
angular_z = clamp(kp_angular * bearing, -max_angular_vel, max_angular_vel)
|
||||||
|
|
||||||
return linear_x, angular_z
|
return linear_x, angular_z
|
||||||
@ -116,7 +159,7 @@ def check_obstacle_in_costmap(
|
|||||||
"""
|
"""
|
||||||
Check for obstacle cells in the robot's forward corridor.
|
Check for obstacle cells in the robot's forward corridor.
|
||||||
|
|
||||||
Assumes a Nav2 rolling-window local costmap (robot ≈ centre cell).
|
Assumes a Nav2 rolling-window local costmap (robot ~= centre cell).
|
||||||
Forward = +col direction (robot +X).
|
Forward = +col direction (robot +X).
|
||||||
|
|
||||||
costmap_data : flat row-major int8 list (from OccupancyGrid.data)
|
costmap_data : flat row-major int8 list (from OccupancyGrid.data)
|
||||||
@ -124,7 +167,7 @@ def check_obstacle_in_costmap(
|
|||||||
resolution : metres per cell
|
resolution : metres per cell
|
||||||
check_dist : how far ahead to look (metres)
|
check_dist : how far ahead to look (metres)
|
||||||
robot_half_width: corridor half-width (metres)
|
robot_half_width: corridor half-width (metres)
|
||||||
threshold : costmap value ≥ this is treated as obstacle (Nav2 lethal=99)
|
threshold : costmap value >= this is treated as obstacle (Nav2 lethal=99)
|
||||||
"""
|
"""
|
||||||
if resolution <= 0.0:
|
if resolution <= 0.0:
|
||||||
return False
|
return False
|
||||||
@ -147,14 +190,14 @@ def check_obstacle_in_costmap(
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
# ── ROS2 Node ─────────────────────────────────────────────────────────────────
|
# -- ROS2 Node ----------------------------------------------------------------
|
||||||
|
|
||||||
class PersonFollowerNode(Node):
|
class PersonFollowerNode(Node):
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super().__init__("person_follower")
|
super().__init__("person_follower")
|
||||||
|
|
||||||
# ── Parameters ────────────────────────────────────────────────────────
|
# -- Parameters --------------------------------------------------------
|
||||||
self.declare_parameter("follow_distance", 1.5)
|
self.declare_parameter("follow_distance", 1.5)
|
||||||
self.declare_parameter("dead_zone", 0.3)
|
self.declare_parameter("dead_zone", 0.3)
|
||||||
self.declare_parameter("kp_linear", 0.6)
|
self.declare_parameter("kp_linear", 0.6)
|
||||||
@ -169,38 +212,48 @@ class PersonFollowerNode(Node):
|
|||||||
self.declare_parameter("robot_width", 0.3)
|
self.declare_parameter("robot_width", 0.3)
|
||||||
self.declare_parameter("control_rate", 20.0)
|
self.declare_parameter("control_rate", 20.0)
|
||||||
self.declare_parameter("follow_enabled", True)
|
self.declare_parameter("follow_enabled", True)
|
||||||
|
# UWB fusion
|
||||||
|
self.declare_parameter("uwb_weight", 0.7)
|
||||||
|
self.declare_parameter("uwb_timeout", 1.0)
|
||||||
|
|
||||||
self._p = {}
|
self._p = {}
|
||||||
self._reload_params()
|
self._reload_params()
|
||||||
|
|
||||||
# ── State ─────────────────────────────────────────────────────────────
|
# -- State -------------------------------------------------------------
|
||||||
self._state = _STOPPING
|
self._state = _STOPPING
|
||||||
self._last_target_t = 0.0 # monotonic clock; 0 = never received
|
self._last_target_t = 0.0 # camera; monotonic; 0 = never received
|
||||||
self._person_x = 0.0
|
self._person_x = 0.0
|
||||||
self._person_y = 0.0
|
self._person_y = 0.0
|
||||||
|
self._last_uwb_t = 0.0 # UWB; monotonic; 0 = never received
|
||||||
|
self._uwb_x = 0.0
|
||||||
|
self._uwb_y = 0.0
|
||||||
self._obstacle_ahead = False
|
self._obstacle_ahead = False
|
||||||
|
|
||||||
# ── Subscriptions ─────────────────────────────────────────────────────
|
# -- Subscriptions -----------------------------------------------------
|
||||||
|
self.create_subscription(
|
||||||
|
PoseStamped, "/uwb/target", self._uwb_cb, 10)
|
||||||
self.create_subscription(
|
self.create_subscription(
|
||||||
PoseStamped, "/person/target", self._target_cb, 10)
|
PoseStamped, "/person/target", self._target_cb, 10)
|
||||||
self.create_subscription(
|
self.create_subscription(
|
||||||
OccupancyGrid, "/local_costmap/costmap", self._costmap_cb, 1)
|
OccupancyGrid, "/local_costmap/costmap", self._costmap_cb, 1)
|
||||||
|
|
||||||
# ── Publisher ─────────────────────────────────────────────────────────
|
# -- Publisher ---------------------------------------------------------
|
||||||
self._cmd_pub = self.create_publisher(Twist, "/cmd_vel", 10)
|
self._cmd_pub = self.create_publisher(Twist, "/cmd_vel", 10)
|
||||||
|
|
||||||
# ── Control timer ─────────────────────────────────────────────────────
|
# -- Control timer -----------------------------------------------------
|
||||||
self._timer = self.create_timer(
|
self._timer = self.create_timer(
|
||||||
1.0 / self._p["control_rate"], self._control_cb)
|
1.0 / self._p["control_rate"], self._control_cb)
|
||||||
|
|
||||||
self.get_logger().info(
|
self.get_logger().info(
|
||||||
f"PersonFollower ready follow_dist={self._p['follow_distance']}m "
|
"PersonFollower ready "
|
||||||
f"dead_zone=±{self._p['dead_zone']}m "
|
f"follow_dist={self._p['follow_distance']}m "
|
||||||
|
f"dead_zone=+/-{self._p['dead_zone']}m "
|
||||||
f"max_vel={self._p['max_linear_vel']}m/s "
|
f"max_vel={self._p['max_linear_vel']}m/s "
|
||||||
|
f"uwb_weight={self._p['uwb_weight']} "
|
||||||
f"lost_timeout={self._p['lost_timeout']}s"
|
f"lost_timeout={self._p['lost_timeout']}s"
|
||||||
)
|
)
|
||||||
|
|
||||||
# ── Parameter helper ──────────────────────────────────────────────────────
|
# -- Parameter helper ------------------------------------------------------
|
||||||
|
|
||||||
def _reload_params(self):
|
def _reload_params(self):
|
||||||
self._p = {
|
self._p = {
|
||||||
@ -218,12 +271,21 @@ class PersonFollowerNode(Node):
|
|||||||
"robot_width": self.get_parameter("robot_width").value,
|
"robot_width": self.get_parameter("robot_width").value,
|
||||||
"control_rate": self.get_parameter("control_rate").value,
|
"control_rate": self.get_parameter("control_rate").value,
|
||||||
"follow_enabled": self.get_parameter("follow_enabled").value,
|
"follow_enabled": self.get_parameter("follow_enabled").value,
|
||||||
|
"uwb_weight": self.get_parameter("uwb_weight").value,
|
||||||
|
"uwb_timeout": self.get_parameter("uwb_timeout").value,
|
||||||
}
|
}
|
||||||
|
|
||||||
# ── Callbacks ─────────────────────────────────────────────────────────────
|
# -- Callbacks -------------------------------------------------------------
|
||||||
|
|
||||||
|
def _uwb_cb(self, msg):
|
||||||
|
"""UWB-triangulated person position (primary source)."""
|
||||||
|
self._uwb_x = msg.pose.position.x
|
||||||
|
self._uwb_y = msg.pose.position.y
|
||||||
|
self._last_uwb_t = time.monotonic()
|
||||||
|
self._state = _FOLLOWING
|
||||||
|
|
||||||
def _target_cb(self, msg):
|
def _target_cb(self, msg):
|
||||||
"""Person position in base_link frame."""
|
"""Camera-detected person position (secondary source)."""
|
||||||
self._person_x = msg.pose.position.x
|
self._person_x = msg.pose.position.x
|
||||||
self._person_y = msg.pose.position.y
|
self._person_y = msg.pose.position.y
|
||||||
self._last_target_t = time.monotonic()
|
self._last_target_t = time.monotonic()
|
||||||
@ -240,10 +302,9 @@ class PersonFollowerNode(Node):
|
|||||||
threshold=self._p["obstacle_threshold"],
|
threshold=self._p["obstacle_threshold"],
|
||||||
)
|
)
|
||||||
|
|
||||||
# ── Control loop ──────────────────────────────────────────────────────────
|
# -- Control loop ----------------------------------------------------------
|
||||||
|
|
||||||
def _control_cb(self):
|
def _control_cb(self):
|
||||||
# Re-read mutable params every tick (cheap dict lookup)
|
|
||||||
self._reload_params()
|
self._reload_params()
|
||||||
|
|
||||||
twist = Twist()
|
twist = Twist()
|
||||||
@ -253,24 +314,45 @@ class PersonFollowerNode(Node):
|
|||||||
return
|
return
|
||||||
|
|
||||||
now = time.monotonic()
|
now = time.monotonic()
|
||||||
dt_lost = (now - self._last_target_t) if self._last_target_t > 0.0 else 1e9
|
|
||||||
|
|
||||||
# State transitions (don't transition back from STOPPING/SEARCHING
|
uwb_fresh = (
|
||||||
# here — _target_cb resets to FOLLOWING on new detection)
|
self._last_uwb_t > 0.0 and
|
||||||
|
(now - self._last_uwb_t) <= self._p["uwb_timeout"]
|
||||||
|
)
|
||||||
|
cam_fresh = (
|
||||||
|
self._last_target_t > 0.0 and
|
||||||
|
(now - self._last_target_t) <= self._p["lost_timeout"]
|
||||||
|
)
|
||||||
|
|
||||||
|
any_fresh = uwb_fresh or cam_fresh
|
||||||
if self._state == _FOLLOWING:
|
if self._state == _FOLLOWING:
|
||||||
|
if not any_fresh:
|
||||||
|
uwb_stale = (now - self._last_uwb_t) if self._last_uwb_t > 0.0 else 1e9
|
||||||
|
cam_stale = (now - self._last_target_t) if self._last_target_t > 0.0 else 1e9
|
||||||
|
dt_lost = min(uwb_stale, cam_stale)
|
||||||
if dt_lost > self._p["search_timeout"]:
|
if dt_lost > self._p["search_timeout"]:
|
||||||
self._state = _SEARCHING
|
self._state = _SEARCHING
|
||||||
self.get_logger().warn("Person lost — entering SEARCHING state")
|
self.get_logger().warn("Person lost -- entering SEARCHING state")
|
||||||
elif dt_lost > self._p["lost_timeout"]:
|
else:
|
||||||
self._state = _STOPPING
|
self._state = _STOPPING
|
||||||
self.get_logger().warn("Person lost — stopping")
|
self.get_logger().warn("Person lost -- stopping")
|
||||||
|
|
||||||
linear_x, angular_z = 0.0, 0.0
|
linear_x, angular_z = 0.0, 0.0
|
||||||
|
|
||||||
if self._state == _FOLLOWING:
|
if self._state == _FOLLOWING:
|
||||||
|
fx, fy, fresh = fuse_targets(
|
||||||
|
uwb_x=self._uwb_x,
|
||||||
|
uwb_y=self._uwb_y,
|
||||||
|
uwb_fresh=uwb_fresh,
|
||||||
|
cam_x=self._person_x,
|
||||||
|
cam_y=self._person_y,
|
||||||
|
cam_fresh=cam_fresh,
|
||||||
|
uwb_weight=self._p["uwb_weight"],
|
||||||
|
)
|
||||||
|
if fresh:
|
||||||
linear_x, angular_z = compute_follow_cmd(
|
linear_x, angular_z = compute_follow_cmd(
|
||||||
px=self._person_x,
|
px=fx,
|
||||||
py=self._person_y,
|
py=fy,
|
||||||
follow_distance=self._p["follow_distance"],
|
follow_distance=self._p["follow_distance"],
|
||||||
dead_zone=self._p["dead_zone"],
|
dead_zone=self._p["dead_zone"],
|
||||||
kp_linear=self._p["kp_linear"],
|
kp_linear=self._p["kp_linear"],
|
||||||
@ -281,22 +363,19 @@ class PersonFollowerNode(Node):
|
|||||||
)
|
)
|
||||||
if self._obstacle_ahead and linear_x == 0.0:
|
if self._obstacle_ahead and linear_x == 0.0:
|
||||||
self.get_logger().warn(
|
self.get_logger().warn(
|
||||||
"Obstacle in path — forward motion suppressed",
|
"Obstacle in path -- forward motion suppressed",
|
||||||
throttle_duration_sec=2.0,
|
throttle_duration_sec=2.0,
|
||||||
)
|
)
|
||||||
|
|
||||||
elif self._state == _SEARCHING:
|
elif self._state == _SEARCHING:
|
||||||
angular_z = float(self._p["search_angular_vel"])
|
angular_z = float(self._p["search_angular_vel"])
|
||||||
# linear_x stays 0 — rotate in place to search
|
|
||||||
|
|
||||||
# _STOPPING: both stay 0
|
|
||||||
|
|
||||||
twist.linear.x = linear_x
|
twist.linear.x = linear_x
|
||||||
twist.angular.z = angular_z
|
twist.angular.z = angular_z
|
||||||
self._cmd_pub.publish(twist)
|
self._cmd_pub.publish(twist)
|
||||||
|
|
||||||
|
|
||||||
# ── Entry point ───────────────────────────────────────────────────────────────
|
# -- Entry point ---------------------------------------------------------------
|
||||||
|
|
||||||
def main(args=None):
|
def main(args=None):
|
||||||
rclpy.init(args=args)
|
rclpy.init(args=args)
|
||||||
|
|||||||
@ -388,3 +388,98 @@ class TestFollowEnabled:
|
|||||||
lin, ang = (0.0, 0.0) if not follow_enabled else \
|
lin, ang = (0.0, 0.0) if not follow_enabled else \
|
||||||
_compute_follow_cmd(3.0, 0.0, **_DEF)
|
_compute_follow_cmd(3.0, 0.0, **_DEF)
|
||||||
assert lin != pytest.approx(0.0)
|
assert lin != pytest.approx(0.0)
|
||||||
|
|
||||||
|
|
||||||
|
# ── fuse_targets — UWB + camera fusion ──────────────────────────────────────
|
||||||
|
|
||||||
|
def _fuse_targets(uwb_x, uwb_y, uwb_fresh, cam_x, cam_y, cam_fresh, uwb_weight):
|
||||||
|
"""Mirror of person_follower_node.fuse_targets (no ROS2)."""
|
||||||
|
def clamp(v, lo, hi): return max(lo, min(hi, v))
|
||||||
|
if uwb_fresh and cam_fresh:
|
||||||
|
w = clamp(uwb_weight, 0.0, 1.0)
|
||||||
|
return w * uwb_x + (1.0 - w) * cam_x, w * uwb_y + (1.0 - w) * cam_y, True
|
||||||
|
if uwb_fresh:
|
||||||
|
return uwb_x, uwb_y, True
|
||||||
|
if cam_fresh:
|
||||||
|
return cam_x, cam_y, True
|
||||||
|
return 0.0, 0.0, False
|
||||||
|
|
||||||
|
|
||||||
|
class TestFuseTargets:
|
||||||
|
|
||||||
|
def test_both_fresh_blends(self):
|
||||||
|
"""Both sources fresh → weighted blend."""
|
||||||
|
fx, fy, ok = _fuse_targets(
|
||||||
|
uwb_x=4.0, uwb_y=0.0, uwb_fresh=True,
|
||||||
|
cam_x=2.0, cam_y=0.0, cam_fresh=True,
|
||||||
|
uwb_weight=0.7,
|
||||||
|
)
|
||||||
|
assert ok is True
|
||||||
|
assert fx == pytest.approx(0.7 * 4.0 + 0.3 * 2.0)
|
||||||
|
assert fy == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_uwb_only(self):
|
||||||
|
"""Only UWB fresh → UWB position returned as-is."""
|
||||||
|
fx, fy, ok = _fuse_targets(
|
||||||
|
uwb_x=3.0, uwb_y=1.0, uwb_fresh=True,
|
||||||
|
cam_x=0.0, cam_y=0.0, cam_fresh=False,
|
||||||
|
uwb_weight=0.7,
|
||||||
|
)
|
||||||
|
assert ok is True
|
||||||
|
assert fx == pytest.approx(3.0)
|
||||||
|
assert fy == pytest.approx(1.0)
|
||||||
|
|
||||||
|
def test_camera_only(self):
|
||||||
|
"""Only camera fresh → camera position returned as-is."""
|
||||||
|
fx, fy, ok = _fuse_targets(
|
||||||
|
uwb_x=0.0, uwb_y=0.0, uwb_fresh=False,
|
||||||
|
cam_x=2.5, cam_y=-0.5, cam_fresh=True,
|
||||||
|
uwb_weight=0.7,
|
||||||
|
)
|
||||||
|
assert ok is True
|
||||||
|
assert fx == pytest.approx(2.5)
|
||||||
|
assert fy == pytest.approx(-0.5)
|
||||||
|
|
||||||
|
def test_neither_fresh(self):
|
||||||
|
"""Both stale → (0, 0, False)."""
|
||||||
|
fx, fy, ok = _fuse_targets(
|
||||||
|
uwb_x=3.0, uwb_y=1.0, uwb_fresh=False,
|
||||||
|
cam_x=2.0, cam_y=0.0, cam_fresh=False,
|
||||||
|
uwb_weight=0.7,
|
||||||
|
)
|
||||||
|
assert ok is False
|
||||||
|
assert fx == pytest.approx(0.0)
|
||||||
|
assert fy == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_uwb_weight_zero_gives_camera(self):
|
||||||
|
"""uwb_weight=0.0 → pure camera even when both fresh."""
|
||||||
|
fx, fy, ok = _fuse_targets(
|
||||||
|
uwb_x=4.0, uwb_y=2.0, uwb_fresh=True,
|
||||||
|
cam_x=1.0, cam_y=0.5, cam_fresh=True,
|
||||||
|
uwb_weight=0.0,
|
||||||
|
)
|
||||||
|
assert ok is True
|
||||||
|
assert fx == pytest.approx(1.0)
|
||||||
|
assert fy == pytest.approx(0.5)
|
||||||
|
|
||||||
|
def test_uwb_weight_one_gives_uwb(self):
|
||||||
|
"""uwb_weight=1.0 → pure UWB even when both fresh."""
|
||||||
|
fx, fy, ok = _fuse_targets(
|
||||||
|
uwb_x=4.0, uwb_y=2.0, uwb_fresh=True,
|
||||||
|
cam_x=1.0, cam_y=0.5, cam_fresh=True,
|
||||||
|
uwb_weight=1.0,
|
||||||
|
)
|
||||||
|
assert ok is True
|
||||||
|
assert fx == pytest.approx(4.0)
|
||||||
|
assert fy == pytest.approx(2.0)
|
||||||
|
|
||||||
|
def test_lateral_blend(self):
|
||||||
|
"""Lateral (Y) component is also blended correctly."""
|
||||||
|
fx, fy, ok = _fuse_targets(
|
||||||
|
uwb_x=2.0, uwb_y=1.0, uwb_fresh=True,
|
||||||
|
cam_x=2.0, cam_y=-1.0, cam_fresh=True,
|
||||||
|
uwb_weight=0.5,
|
||||||
|
)
|
||||||
|
assert ok is True
|
||||||
|
assert fx == pytest.approx(2.0)
|
||||||
|
assert fy == pytest.approx(0.0) # 0.5*1.0 + 0.5*(-1.0) = 0
|
||||||
|
|||||||
57
jetson/ros2_ws/src/saltybot_uwb/config/uwb_config.yaml
Normal file
57
jetson/ros2_ws/src/saltybot_uwb/config/uwb_config.yaml
Normal file
@ -0,0 +1,57 @@
|
|||||||
|
# uwb_config.yaml — MaUWB ESP32-S3 DW3000 UWB follow-me system
|
||||||
|
#
|
||||||
|
# Hardware layout:
|
||||||
|
# Anchor-0 (port side) → USB port_a, y = +anchor_separation/2
|
||||||
|
# Anchor-1 (starboard side) → USB port_b, y = -anchor_separation/2
|
||||||
|
# Tag on person → belt clip, ~0.9m above ground
|
||||||
|
#
|
||||||
|
# Run with:
|
||||||
|
# ros2 launch saltybot_uwb uwb.launch.py
|
||||||
|
# Override at launch:
|
||||||
|
# ros2 launch saltybot_uwb uwb.launch.py port_a:=/dev/ttyUSB2
|
||||||
|
|
||||||
|
# ── Serial ports ──────────────────────────────────────────────────────────────
|
||||||
|
# Set udev rules to get stable symlinks:
|
||||||
|
# /dev/uwb-anchor0 → port_a
|
||||||
|
# /dev/uwb-anchor1 → port_b
|
||||||
|
# (See jetson/docs/pinout.md for udev setup)
|
||||||
|
port_a: /dev/uwb-anchor0 # Anchor-0 (port)
|
||||||
|
port_b: /dev/uwb-anchor1 # Anchor-1 (starboard)
|
||||||
|
baudrate: 115200 # MaUWB default — do not change
|
||||||
|
|
||||||
|
# ── Anchor geometry ────────────────────────────────────────────────────────────
|
||||||
|
# anchor_separation: centre-to-centre distance between anchors (metres)
|
||||||
|
# Must match physical mounting. Larger = more accurate lateral resolution.
|
||||||
|
anchor_separation: 0.25 # metres (25cm)
|
||||||
|
|
||||||
|
# anchor_height: height of anchors above ground (metres)
|
||||||
|
# Orin stem mount ≈ 0.80m on the saltybot platform
|
||||||
|
anchor_height: 0.80 # metres
|
||||||
|
|
||||||
|
# tag_height: height of person's belt-clip tag above ground (metres)
|
||||||
|
tag_height: 0.90 # metres (adjust per user)
|
||||||
|
|
||||||
|
# ── Range validity ─────────────────────────────────────────────────────────────
|
||||||
|
# range_timeout_s: stale anchor — excluded from triangulation after this gap
|
||||||
|
range_timeout_s: 1.0 # seconds
|
||||||
|
|
||||||
|
# max_range_m: discard ranges beyond this (DW3000 indoor practical limit ≈8m)
|
||||||
|
max_range_m: 8.0 # metres
|
||||||
|
|
||||||
|
# min_range_m: discard ranges below this (likely multipath artefacts)
|
||||||
|
min_range_m: 0.05 # metres
|
||||||
|
|
||||||
|
# ── Kalman filter ──────────────────────────────────────────────────────────────
|
||||||
|
# kf_process_noise: Q scalar — how dynamic the person's motion is
|
||||||
|
# Higher → faster response, more jitter
|
||||||
|
kf_process_noise: 0.1
|
||||||
|
|
||||||
|
# kf_meas_noise: R scalar — how noisy the UWB ranging is
|
||||||
|
# DW3000 indoor accuracy ≈ 10cm RMS → 0.1m → R ≈ 0.01
|
||||||
|
# Use 0.3 to be conservative on first deployment
|
||||||
|
kf_meas_noise: 0.3
|
||||||
|
|
||||||
|
# ── Publish rate ──────────────────────────────────────────────────────────────
|
||||||
|
# Should match or exceed the AT+RANGE? poll rate from both anchors.
|
||||||
|
# 20Hz means 50ms per cycle; each anchor query takes ~10ms → headroom ok.
|
||||||
|
publish_rate: 20.0 # Hz
|
||||||
59
jetson/ros2_ws/src/saltybot_uwb/launch/uwb.launch.py
Normal file
59
jetson/ros2_ws/src/saltybot_uwb/launch/uwb.launch.py
Normal file
@ -0,0 +1,59 @@
|
|||||||
|
"""
|
||||||
|
uwb.launch.py — Launch UWB driver node for MaUWB ESP32-S3 follow-me.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
ros2 launch saltybot_uwb uwb.launch.py
|
||||||
|
ros2 launch saltybot_uwb uwb.launch.py port_a:=/dev/ttyUSB2 port_b:=/dev/ttyUSB3
|
||||||
|
ros2 launch saltybot_uwb uwb.launch.py anchor_separation:=0.30 publish_rate:=10.0
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
from ament_index_python.packages import get_package_share_directory
|
||||||
|
from launch import LaunchDescription
|
||||||
|
from launch.actions import DeclareLaunchArgument
|
||||||
|
from launch.substitutions import LaunchConfiguration
|
||||||
|
from launch_ros.actions import Node
|
||||||
|
|
||||||
|
|
||||||
|
def generate_launch_description():
|
||||||
|
pkg = get_package_share_directory("saltybot_uwb")
|
||||||
|
cfg = os.path.join(pkg, "config", "uwb_config.yaml")
|
||||||
|
|
||||||
|
return LaunchDescription([
|
||||||
|
DeclareLaunchArgument("port_a", default_value="/dev/uwb-anchor0"),
|
||||||
|
DeclareLaunchArgument("port_b", default_value="/dev/uwb-anchor1"),
|
||||||
|
DeclareLaunchArgument("baudrate", default_value="115200"),
|
||||||
|
DeclareLaunchArgument("anchor_separation", default_value="0.25"),
|
||||||
|
DeclareLaunchArgument("anchor_height", default_value="0.80"),
|
||||||
|
DeclareLaunchArgument("tag_height", default_value="0.90"),
|
||||||
|
DeclareLaunchArgument("range_timeout_s", default_value="1.0"),
|
||||||
|
DeclareLaunchArgument("max_range_m", default_value="8.0"),
|
||||||
|
DeclareLaunchArgument("min_range_m", default_value="0.05"),
|
||||||
|
DeclareLaunchArgument("kf_process_noise", default_value="0.1"),
|
||||||
|
DeclareLaunchArgument("kf_meas_noise", default_value="0.3"),
|
||||||
|
DeclareLaunchArgument("publish_rate", default_value="20.0"),
|
||||||
|
|
||||||
|
Node(
|
||||||
|
package="saltybot_uwb",
|
||||||
|
executable="uwb_driver",
|
||||||
|
name="uwb_driver",
|
||||||
|
output="screen",
|
||||||
|
parameters=[
|
||||||
|
cfg,
|
||||||
|
{
|
||||||
|
"port_a": LaunchConfiguration("port_a"),
|
||||||
|
"port_b": LaunchConfiguration("port_b"),
|
||||||
|
"baudrate": LaunchConfiguration("baudrate"),
|
||||||
|
"anchor_separation": LaunchConfiguration("anchor_separation"),
|
||||||
|
"anchor_height": LaunchConfiguration("anchor_height"),
|
||||||
|
"tag_height": LaunchConfiguration("tag_height"),
|
||||||
|
"range_timeout_s": LaunchConfiguration("range_timeout_s"),
|
||||||
|
"max_range_m": LaunchConfiguration("max_range_m"),
|
||||||
|
"min_range_m": LaunchConfiguration("min_range_m"),
|
||||||
|
"kf_process_noise": LaunchConfiguration("kf_process_noise"),
|
||||||
|
"kf_meas_noise": LaunchConfiguration("kf_meas_noise"),
|
||||||
|
"publish_rate": LaunchConfiguration("publish_rate"),
|
||||||
|
},
|
||||||
|
],
|
||||||
|
),
|
||||||
|
])
|
||||||
28
jetson/ros2_ws/src/saltybot_uwb/package.xml
Normal file
28
jetson/ros2_ws/src/saltybot_uwb/package.xml
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
<?xml version="1.0"?>
|
||||||
|
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||||
|
<package format="3">
|
||||||
|
<name>saltybot_uwb</name>
|
||||||
|
<version>0.1.0</version>
|
||||||
|
<description>
|
||||||
|
UWB follow-me driver for saltybot — MaUWB ESP32-S3 DW3000 two-anchor
|
||||||
|
ranging, triangulation, Kalman filtering, and /uwb/target PoseStamped publisher.
|
||||||
|
</description>
|
||||||
|
<maintainer email="seb@vayrette.com">seb</maintainer>
|
||||||
|
<license>MIT</license>
|
||||||
|
|
||||||
|
<buildtool_depend>ament_python</buildtool_depend>
|
||||||
|
|
||||||
|
<depend>rclpy</depend>
|
||||||
|
<depend>geometry_msgs</depend>
|
||||||
|
<depend>std_msgs</depend>
|
||||||
|
<depend>saltybot_uwb_msgs</depend>
|
||||||
|
|
||||||
|
<test_depend>ament_copyright</test_depend>
|
||||||
|
<test_depend>ament_flake8</test_depend>
|
||||||
|
<test_depend>ament_pep257</test_depend>
|
||||||
|
<test_depend>pytest</test_depend>
|
||||||
|
|
||||||
|
<export>
|
||||||
|
<build_type>ament_python</build_type>
|
||||||
|
</export>
|
||||||
|
</package>
|
||||||
278
jetson/ros2_ws/src/saltybot_uwb/saltybot_uwb/ranging_math.py
Normal file
278
jetson/ros2_ws/src/saltybot_uwb/saltybot_uwb/ranging_math.py
Normal file
@ -0,0 +1,278 @@
|
|||||||
|
"""
|
||||||
|
ranging_math.py — Pure triangulation and Kalman filter helpers for UWB follow-me.
|
||||||
|
|
||||||
|
No ROS2 dependencies — importable in unit tests without a ROS2 install.
|
||||||
|
|
||||||
|
2-Anchor geometry
|
||||||
|
─────────────────
|
||||||
|
Anchors are mounted on the robot stem, separated by `sep` metres along the Y-axis:
|
||||||
|
|
||||||
|
A0 (0, +sep/2, anchor_z) ← port side
|
||||||
|
A1 (0, -sep/2, anchor_z) ← starboard side
|
||||||
|
|
||||||
|
The person tag is at (x_t, y_t, tag_z) in base_link.
|
||||||
|
|
||||||
|
Step 1 — height compensation:
|
||||||
|
dz = anchor_z - tag_z
|
||||||
|
r0_h = sqrt(max(0, r0² - dz²))
|
||||||
|
r1_h = sqrt(max(0, r1² - dz²))
|
||||||
|
|
||||||
|
Step 2 — lateral offset (Y):
|
||||||
|
y_t = (r1_h² - r0_h²) / (2 * sep)
|
||||||
|
|
||||||
|
Step 3 — forward distance (X):
|
||||||
|
x_t = sqrt(max(0, r0_h² - (y_t - sep/2)²))
|
||||||
|
|
||||||
|
Returns (x_t, y_t); caller should treat negative x_t as 0.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import math
|
||||||
|
|
||||||
|
|
||||||
|
# ── Triangulation ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def triangulate_2anchor(
|
||||||
|
r0: float,
|
||||||
|
r1: float,
|
||||||
|
sep: float,
|
||||||
|
anchor_z: float = 0.0,
|
||||||
|
tag_z: float = 0.0,
|
||||||
|
) -> tuple:
|
||||||
|
"""
|
||||||
|
Triangulate person position in base_link from two anchor ranges.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
r0 : range to anchor-0 (port, y = +sep/2) in metres
|
||||||
|
r1 : range to anchor-1 (starboard, y = -sep/2) in metres
|
||||||
|
sep : anchor separation in metres (centre-to-centre)
|
||||||
|
anchor_z : height of anchors above ground (metres, default 0)
|
||||||
|
tag_z : height of person tag above ground (metres, default 0)
|
||||||
|
|
||||||
|
Returns
|
||||||
|
-------
|
||||||
|
(x_t, y_t) : person position in metres (base_link X = forward, Y = left)
|
||||||
|
x_t is clamped to ≥ 0 (can't be behind anchors geometrically).
|
||||||
|
"""
|
||||||
|
if sep <= 0.0:
|
||||||
|
raise ValueError(f"sep must be > 0, got {sep}")
|
||||||
|
if r0 < 0.0 or r1 < 0.0:
|
||||||
|
raise ValueError(f"ranges must be ≥ 0, got r0={r0}, r1={r1}")
|
||||||
|
|
||||||
|
# Height correction — bring to horizontal plane
|
||||||
|
dz = anchor_z - tag_z
|
||||||
|
dz2 = dz * dz
|
||||||
|
r0_h2 = max(0.0, r0 * r0 - dz2)
|
||||||
|
r1_h2 = max(0.0, r1 * r1 - dz2)
|
||||||
|
r0_h = math.sqrt(r0_h2)
|
||||||
|
|
||||||
|
# Lateral (Y) offset — from circle-circle intersection formula
|
||||||
|
y_t = (r1_h2 - r0_h2) / (2.0 * sep)
|
||||||
|
|
||||||
|
# Forward (X) distance
|
||||||
|
dx2 = r0_h2 - (y_t - sep / 2.0) ** 2
|
||||||
|
x_t = math.sqrt(max(0.0, dx2))
|
||||||
|
|
||||||
|
return x_t, y_t
|
||||||
|
|
||||||
|
|
||||||
|
# ── Simple 2-D Kalman filter ──────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class KalmanFilter2D:
|
||||||
|
"""
|
||||||
|
Constant-velocity Kalman filter for a 2-D position estimate.
|
||||||
|
|
||||||
|
State vector: [x, y, vx, vy]ᵀ
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
process_noise : Q scalar — how much we trust the motion model
|
||||||
|
measurement_noise : R scalar — how noisy the UWB measurement is
|
||||||
|
dt : expected update interval in seconds (used for F matrix)
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
process_noise: float = 0.1,
|
||||||
|
measurement_noise: float = 0.3,
|
||||||
|
dt: float = 0.05,
|
||||||
|
):
|
||||||
|
self._q = process_noise
|
||||||
|
self._r = measurement_noise
|
||||||
|
self._dt = dt
|
||||||
|
|
||||||
|
# State: [x, y, vx, vy]
|
||||||
|
self._x = [0.0, 0.0, 0.0, 0.0]
|
||||||
|
|
||||||
|
# Covariance matrix P (4×4, stored as flat list, row-major)
|
||||||
|
init_pos_var = 1.0
|
||||||
|
init_vel_var = 0.5
|
||||||
|
self._P = [
|
||||||
|
init_pos_var, 0.0, 0.0, 0.0,
|
||||||
|
0.0, init_pos_var, 0.0, 0.0,
|
||||||
|
0.0, 0.0, init_vel_var, 0.0,
|
||||||
|
0.0, 0.0, 0.0, init_vel_var,
|
||||||
|
]
|
||||||
|
self._initialised = False
|
||||||
|
|
||||||
|
# ── Matrix helpers (minimal, no numpy — importable without GPU deps) ──────
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _mat_add(A, B):
|
||||||
|
return [a + b for a, b in zip(A, B)]
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _mat_sub(A, B):
|
||||||
|
return [a - b for a, b in zip(A, B)]
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _mat_mul(A, B, n):
|
||||||
|
"""n×n matrix multiply."""
|
||||||
|
C = [0.0] * (n * n)
|
||||||
|
for i in range(n):
|
||||||
|
for k in range(n):
|
||||||
|
for j in range(n):
|
||||||
|
C[i * n + j] += A[i * n + k] * B[k * n + j]
|
||||||
|
return C
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _mat_T(A, n):
|
||||||
|
"""n×n transpose."""
|
||||||
|
T = [0.0] * (n * n)
|
||||||
|
for i in range(n):
|
||||||
|
for j in range(n):
|
||||||
|
T[j * n + i] = A[i * n + j]
|
||||||
|
return T
|
||||||
|
|
||||||
|
def _build_F(self):
|
||||||
|
dt = self._dt
|
||||||
|
return [
|
||||||
|
1.0, 0.0, dt, 0.0,
|
||||||
|
0.0, 1.0, 0.0, dt,
|
||||||
|
0.0, 0.0, 1.0, 0.0,
|
||||||
|
0.0, 0.0, 0.0, 1.0,
|
||||||
|
]
|
||||||
|
|
||||||
|
def _build_Q(self):
|
||||||
|
q = self._q
|
||||||
|
dt = self._dt
|
||||||
|
dt2 = dt * dt
|
||||||
|
dt3 = dt2 * dt
|
||||||
|
dt4 = dt3 * dt
|
||||||
|
# Continuous white-noise model discretised for constant-velocity
|
||||||
|
return [
|
||||||
|
q * dt4 / 4, 0.0, q * dt3 / 2, 0.0,
|
||||||
|
0.0, q * dt4 / 4, 0.0, q * dt3 / 2,
|
||||||
|
q * dt3 / 2, 0.0, q * dt2, 0.0,
|
||||||
|
0.0, q * dt3 / 2, 0.0, q * dt2,
|
||||||
|
]
|
||||||
|
|
||||||
|
def predict(self, dt: float = None):
|
||||||
|
"""Predict step — advances state by dt (defaults to self._dt)."""
|
||||||
|
if dt is not None:
|
||||||
|
self._dt = dt
|
||||||
|
F = self._build_F()
|
||||||
|
Q = self._build_Q()
|
||||||
|
n = 4
|
||||||
|
x = self._x
|
||||||
|
# x = F @ x
|
||||||
|
new_x = [
|
||||||
|
F[0] * x[0] + F[1] * x[1] + F[2] * x[2] + F[3] * x[3],
|
||||||
|
F[4] * x[0] + F[5] * x[1] + F[6] * x[2] + F[7] * x[3],
|
||||||
|
F[8] * x[0] + F[9] * x[1] + F[10] * x[2] + F[11] * x[3],
|
||||||
|
F[12] * x[0] + F[13] * x[1] + F[14] * x[2] + F[15] * x[3],
|
||||||
|
]
|
||||||
|
# P = F @ P @ F.T + Q
|
||||||
|
FP = self._mat_mul(F, self._P, n)
|
||||||
|
FT = self._mat_T(F, n)
|
||||||
|
FPF = self._mat_mul(FP, FT, n)
|
||||||
|
self._P = self._mat_add(FPF, Q)
|
||||||
|
self._x = new_x
|
||||||
|
|
||||||
|
def update(self, x_meas: float, y_meas: float):
|
||||||
|
"""
|
||||||
|
Update step with a 2-D position measurement.
|
||||||
|
|
||||||
|
H = [[1,0,0,0],[0,1,0,0]] (observe position only)
|
||||||
|
"""
|
||||||
|
if not self._initialised:
|
||||||
|
self._x[0] = x_meas
|
||||||
|
self._x[1] = y_meas
|
||||||
|
self._initialised = True
|
||||||
|
return
|
||||||
|
|
||||||
|
# Innovation
|
||||||
|
innov_x = x_meas - self._x[0]
|
||||||
|
innov_y = y_meas - self._x[1]
|
||||||
|
|
||||||
|
# S = H @ P @ H.T + R (2×2, position rows/cols only)
|
||||||
|
P = self._P
|
||||||
|
r = self._r
|
||||||
|
# H picks rows 0,1 and cols 0,1 from P
|
||||||
|
S00 = P[0] + r
|
||||||
|
S01 = P[1]
|
||||||
|
S10 = P[4]
|
||||||
|
S11 = P[5] + r
|
||||||
|
|
||||||
|
# K = P @ H.T @ S⁻¹ (4×2 Kalman gain)
|
||||||
|
det = S00 * S11 - S01 * S10
|
||||||
|
if abs(det) < 1e-12:
|
||||||
|
return
|
||||||
|
inv_det = 1.0 / det
|
||||||
|
# S⁻¹
|
||||||
|
Si00 = S11 * inv_det
|
||||||
|
Si01 = -S01 * inv_det
|
||||||
|
Si10 = -S10 * inv_det
|
||||||
|
Si11 = S00 * inv_det
|
||||||
|
|
||||||
|
# P @ H.T = first two columns of P (H.T selects cols 0,1)
|
||||||
|
PH = [P[0], P[1], P[4], P[5], P[8], P[9], P[12], P[13]] # 4×2
|
||||||
|
|
||||||
|
# K (4×2) = PH @ Si
|
||||||
|
K = [
|
||||||
|
PH[0] * Si00 + PH[1] * Si10, PH[0] * Si01 + PH[1] * Si11,
|
||||||
|
PH[2] * Si00 + PH[3] * Si10, PH[2] * Si01 + PH[3] * Si11,
|
||||||
|
PH[4] * Si00 + PH[5] * Si10, PH[4] * Si01 + PH[5] * Si11,
|
||||||
|
PH[6] * Si00 + PH[7] * Si10, PH[6] * Si01 + PH[7] * Si11,
|
||||||
|
]
|
||||||
|
|
||||||
|
# x = x + K @ innov
|
||||||
|
self._x[0] += K[0] * innov_x + K[1] * innov_y
|
||||||
|
self._x[1] += K[2] * innov_x + K[3] * innov_y
|
||||||
|
self._x[2] += K[4] * innov_x + K[5] * innov_y
|
||||||
|
self._x[3] += K[6] * innov_x + K[7] * innov_y
|
||||||
|
|
||||||
|
# P = (I - K @ H) @ P (Joseph form for numerical stability skipped —
|
||||||
|
# this is a low-latency embedded context where simplicity wins)
|
||||||
|
n = 4
|
||||||
|
# K @ H selects first two columns → rows of P
|
||||||
|
KH = [0.0] * 16
|
||||||
|
for row in range(4):
|
||||||
|
KH[row * 4 + 0] = K[row * 2 + 0]
|
||||||
|
KH[row * 4 + 1] = K[row * 2 + 1]
|
||||||
|
# I - KH
|
||||||
|
IKH = [-KH[i] for i in range(16)]
|
||||||
|
for i in range(4):
|
||||||
|
IKH[i * 4 + i] += 1.0
|
||||||
|
self._P = self._mat_mul(IKH, self._P, n)
|
||||||
|
|
||||||
|
def position(self) -> tuple:
|
||||||
|
"""Return current (x, y) position estimate."""
|
||||||
|
return self._x[0], self._x[1]
|
||||||
|
|
||||||
|
def velocity(self) -> tuple:
|
||||||
|
"""Return current (vx, vy) velocity estimate."""
|
||||||
|
return self._x[2], self._x[3]
|
||||||
|
|
||||||
|
def reset(self):
|
||||||
|
"""Reset filter to uninitialised state."""
|
||||||
|
self._x = [0.0, 0.0, 0.0, 0.0]
|
||||||
|
init_pos_var = 1.0
|
||||||
|
init_vel_var = 0.5
|
||||||
|
self._P = [
|
||||||
|
init_pos_var, 0.0, 0.0, 0.0,
|
||||||
|
0.0, init_pos_var, 0.0, 0.0,
|
||||||
|
0.0, 0.0, init_vel_var, 0.0,
|
||||||
|
0.0, 0.0, 0.0, init_vel_var,
|
||||||
|
]
|
||||||
|
self._initialised = False
|
||||||
306
jetson/ros2_ws/src/saltybot_uwb/saltybot_uwb/uwb_driver_node.py
Normal file
306
jetson/ros2_ws/src/saltybot_uwb/saltybot_uwb/uwb_driver_node.py
Normal file
@ -0,0 +1,306 @@
|
|||||||
|
"""
|
||||||
|
uwb_driver_node.py — ROS2 node for MaUWB ESP32-S3 DW3000 follow-me system.
|
||||||
|
|
||||||
|
Hardware
|
||||||
|
────────
|
||||||
|
• 2× MaUWB ESP32-S3 DW3000 anchors on robot stem (USB → Orin Nano)
|
||||||
|
- Anchor-0: port side (y = +sep/2)
|
||||||
|
- Anchor-1: starboard (y = -sep/2)
|
||||||
|
• 1× MaUWB tag on person (belt clip)
|
||||||
|
|
||||||
|
AT command interface (115200 8N1)
|
||||||
|
──────────────────────────────────
|
||||||
|
Query: AT+RANGE?\r\n
|
||||||
|
Response (from anchors):
|
||||||
|
+RANGE:<anchor_id>,<range_mm>[,<rssi>]\r\n
|
||||||
|
|
||||||
|
Config:
|
||||||
|
AT+anchor_tag=ANCHOR\r\n — set module as anchor
|
||||||
|
AT+anchor_tag=TAG\r\n — set module as tag
|
||||||
|
|
||||||
|
Publishes
|
||||||
|
─────────
|
||||||
|
/uwb/target (geometry_msgs/PoseStamped) — triangulated person position in base_link
|
||||||
|
/uwb/ranges (saltybot_uwb_msgs/UwbRangeArray) — raw ranges from both anchors
|
||||||
|
|
||||||
|
Safety
|
||||||
|
──────
|
||||||
|
If a range is stale (> range_timeout_s), that anchor is excluded from
|
||||||
|
triangulation. If both anchors are stale, /uwb/target is not published.
|
||||||
|
|
||||||
|
Usage
|
||||||
|
─────
|
||||||
|
ros2 launch saltybot_uwb uwb.launch.py
|
||||||
|
ros2 launch saltybot_uwb uwb.launch.py port_a:=/dev/ttyUSB2 port_b:=/dev/ttyUSB3
|
||||||
|
"""
|
||||||
|
|
||||||
|
import math
|
||||||
|
import re
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from geometry_msgs.msg import PoseStamped
|
||||||
|
from std_msgs.msg import Header
|
||||||
|
|
||||||
|
from saltybot_uwb_msgs.msg import UwbRange, UwbRangeArray
|
||||||
|
from saltybot_uwb.ranging_math import triangulate_2anchor, KalmanFilter2D
|
||||||
|
|
||||||
|
try:
|
||||||
|
import serial
|
||||||
|
_SERIAL_AVAILABLE = True
|
||||||
|
except ImportError:
|
||||||
|
_SERIAL_AVAILABLE = False
|
||||||
|
|
||||||
|
|
||||||
|
# Regex: +RANGE:<id>,<mm> or +RANGE:<id>,<mm>,<rssi>
|
||||||
|
_RANGE_RE = re.compile(
|
||||||
|
r"\+RANGE\s*:\s*(\d+)\s*,\s*(\d+)(?:\s*,\s*(-?[\d.]+))?",
|
||||||
|
re.IGNORECASE,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class SerialReader(threading.Thread):
|
||||||
|
"""
|
||||||
|
Background thread: polls an anchor's UART, fires callback on every
|
||||||
|
valid +RANGE response.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, port, baudrate, anchor_id, callback, logger):
|
||||||
|
super().__init__(daemon=True)
|
||||||
|
self._port = port
|
||||||
|
self._baudrate = baudrate
|
||||||
|
self._anchor_id = anchor_id
|
||||||
|
self._callback = callback
|
||||||
|
self._logger = logger
|
||||||
|
self._running = False
|
||||||
|
self._ser = None
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
self._running = True
|
||||||
|
while self._running:
|
||||||
|
try:
|
||||||
|
self._ser = serial.Serial(
|
||||||
|
self._port, self._baudrate, timeout=0.5
|
||||||
|
)
|
||||||
|
self._logger.info(
|
||||||
|
f"Anchor-{self._anchor_id}: opened {self._port}"
|
||||||
|
)
|
||||||
|
self._read_loop()
|
||||||
|
except Exception as exc:
|
||||||
|
self._logger.warn(
|
||||||
|
f"Anchor-{self._anchor_id} serial error: {exc} — retrying in 2s"
|
||||||
|
)
|
||||||
|
if self._ser and self._ser.is_open:
|
||||||
|
self._ser.close()
|
||||||
|
time.sleep(2.0)
|
||||||
|
|
||||||
|
def _read_loop(self):
|
||||||
|
while self._running:
|
||||||
|
try:
|
||||||
|
# Query the anchor
|
||||||
|
self._ser.write(b"AT+RANGE?\r\n")
|
||||||
|
# Read up to 10 lines waiting for a +RANGE response
|
||||||
|
for _ in range(10):
|
||||||
|
raw = self._ser.readline()
|
||||||
|
if not raw:
|
||||||
|
break
|
||||||
|
line = raw.decode("ascii", errors="replace").strip()
|
||||||
|
m = _RANGE_RE.search(line)
|
||||||
|
if m:
|
||||||
|
range_mm = int(m.group(2))
|
||||||
|
rssi = float(m.group(3)) if m.group(3) else 0.0
|
||||||
|
self._callback(self._anchor_id, range_mm, rssi)
|
||||||
|
break
|
||||||
|
except Exception as exc:
|
||||||
|
self._logger.warn(
|
||||||
|
f"Anchor-{self._anchor_id} read error: {exc}"
|
||||||
|
)
|
||||||
|
break # trigger reconnect
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
self._running = False
|
||||||
|
if self._ser and self._ser.is_open:
|
||||||
|
self._ser.close()
|
||||||
|
|
||||||
|
|
||||||
|
class UwbDriverNode(Node):
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__("uwb_driver")
|
||||||
|
|
||||||
|
# ── Parameters ────────────────────────────────────────────────────────
|
||||||
|
self.declare_parameter("port_a", "/dev/ttyUSB0")
|
||||||
|
self.declare_parameter("port_b", "/dev/ttyUSB1")
|
||||||
|
self.declare_parameter("baudrate", 115200)
|
||||||
|
self.declare_parameter("anchor_separation", 0.25)
|
||||||
|
self.declare_parameter("anchor_height", 0.80)
|
||||||
|
self.declare_parameter("tag_height", 0.90)
|
||||||
|
self.declare_parameter("range_timeout_s", 1.0)
|
||||||
|
self.declare_parameter("max_range_m", 8.0)
|
||||||
|
self.declare_parameter("min_range_m", 0.05)
|
||||||
|
self.declare_parameter("kf_process_noise", 0.1)
|
||||||
|
self.declare_parameter("kf_meas_noise", 0.3)
|
||||||
|
self.declare_parameter("publish_rate", 20.0)
|
||||||
|
|
||||||
|
self._p = self._load_params()
|
||||||
|
|
||||||
|
# ── State (protected by lock) ──────────────────────────────────────
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
self._ranges = {} # anchor_id → (range_m, rssi, timestamp)
|
||||||
|
self._kf = KalmanFilter2D(
|
||||||
|
process_noise=self._p["kf_process_noise"],
|
||||||
|
measurement_noise=self._p["kf_meas_noise"],
|
||||||
|
dt=1.0 / self._p["publish_rate"],
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Publishers ────────────────────────────────────────────────────
|
||||||
|
self._target_pub = self.create_publisher(
|
||||||
|
PoseStamped, "/uwb/target", 10)
|
||||||
|
self._ranges_pub = self.create_publisher(
|
||||||
|
UwbRangeArray, "/uwb/ranges", 10)
|
||||||
|
|
||||||
|
# ── Serial readers ────────────────────────────────────────────────
|
||||||
|
if _SERIAL_AVAILABLE:
|
||||||
|
self._reader_a = SerialReader(
|
||||||
|
self._p["port_a"], self._p["baudrate"],
|
||||||
|
anchor_id=0, callback=self._range_cb,
|
||||||
|
logger=self.get_logger(),
|
||||||
|
)
|
||||||
|
self._reader_b = SerialReader(
|
||||||
|
self._p["port_b"], self._p["baudrate"],
|
||||||
|
anchor_id=1, callback=self._range_cb,
|
||||||
|
logger=self.get_logger(),
|
||||||
|
)
|
||||||
|
self._reader_a.start()
|
||||||
|
self._reader_b.start()
|
||||||
|
else:
|
||||||
|
self.get_logger().warn(
|
||||||
|
"pyserial not installed — running in simulation mode (no serial I/O)"
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Publish timer ─────────────────────────────────────────────────
|
||||||
|
self._timer = self.create_timer(
|
||||||
|
1.0 / self._p["publish_rate"], self._publish_cb
|
||||||
|
)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"UWB driver ready sep={self._p['anchor_separation']}m "
|
||||||
|
f"ports={self._p['port_a']},{self._p['port_b']} "
|
||||||
|
f"rate={self._p['publish_rate']}Hz"
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Helpers ───────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _load_params(self):
|
||||||
|
return {
|
||||||
|
"port_a": self.get_parameter("port_a").value,
|
||||||
|
"port_b": self.get_parameter("port_b").value,
|
||||||
|
"baudrate": self.get_parameter("baudrate").value,
|
||||||
|
"anchor_separation": self.get_parameter("anchor_separation").value,
|
||||||
|
"anchor_height": self.get_parameter("anchor_height").value,
|
||||||
|
"tag_height": self.get_parameter("tag_height").value,
|
||||||
|
"range_timeout_s": self.get_parameter("range_timeout_s").value,
|
||||||
|
"max_range_m": self.get_parameter("max_range_m").value,
|
||||||
|
"min_range_m": self.get_parameter("min_range_m").value,
|
||||||
|
"kf_process_noise": self.get_parameter("kf_process_noise").value,
|
||||||
|
"kf_meas_noise": self.get_parameter("kf_meas_noise").value,
|
||||||
|
"publish_rate": self.get_parameter("publish_rate").value,
|
||||||
|
}
|
||||||
|
|
||||||
|
# ── Callbacks ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _range_cb(self, anchor_id: int, range_mm: int, rssi: float):
|
||||||
|
"""Called from serial reader threads — thread-safe update."""
|
||||||
|
range_m = range_mm / 1000.0
|
||||||
|
p = self._p
|
||||||
|
if range_m < p["min_range_m"] or range_m > p["max_range_m"]:
|
||||||
|
return
|
||||||
|
with self._lock:
|
||||||
|
self._ranges[anchor_id] = (range_m, rssi, time.monotonic())
|
||||||
|
|
||||||
|
def _publish_cb(self):
|
||||||
|
now = time.monotonic()
|
||||||
|
timeout = self._p["range_timeout_s"]
|
||||||
|
sep = self._p["anchor_separation"]
|
||||||
|
|
||||||
|
with self._lock:
|
||||||
|
# Collect valid (non-stale) ranges
|
||||||
|
valid = {}
|
||||||
|
for aid, (r, rssi, t) in self._ranges.items():
|
||||||
|
if now - t <= timeout:
|
||||||
|
valid[aid] = (r, rssi, t)
|
||||||
|
|
||||||
|
# Build and publish UwbRangeArray regardless (even if partial)
|
||||||
|
hdr = Header()
|
||||||
|
hdr.stamp = self.get_clock().now().to_msg()
|
||||||
|
hdr.frame_id = "base_link"
|
||||||
|
|
||||||
|
arr = UwbRangeArray()
|
||||||
|
arr.header = hdr
|
||||||
|
for aid, (r, rssi, _) in valid.items():
|
||||||
|
entry = UwbRange()
|
||||||
|
entry.header = hdr
|
||||||
|
entry.anchor_id = aid
|
||||||
|
entry.range_m = float(r)
|
||||||
|
entry.raw_mm = int(round(r * 1000.0))
|
||||||
|
entry.rssi = float(rssi)
|
||||||
|
arr.ranges.append(entry)
|
||||||
|
self._ranges_pub.publish(arr)
|
||||||
|
|
||||||
|
# Need both anchors to triangulate
|
||||||
|
if 0 not in valid or 1 not in valid:
|
||||||
|
return
|
||||||
|
|
||||||
|
r0 = valid[0][0]
|
||||||
|
r1 = valid[1][0]
|
||||||
|
|
||||||
|
try:
|
||||||
|
x_t, y_t = triangulate_2anchor(
|
||||||
|
r0=r0,
|
||||||
|
r1=r1,
|
||||||
|
sep=sep,
|
||||||
|
anchor_z=self._p["anchor_height"],
|
||||||
|
tag_z=self._p["tag_height"],
|
||||||
|
)
|
||||||
|
except (ValueError, ZeroDivisionError) as exc:
|
||||||
|
self.get_logger().warn(f"Triangulation error: {exc}")
|
||||||
|
return
|
||||||
|
|
||||||
|
# Kalman filter update
|
||||||
|
dt = 1.0 / self._p["publish_rate"]
|
||||||
|
self._kf.predict(dt=dt)
|
||||||
|
self._kf.update(x_t, y_t)
|
||||||
|
kx, ky = self._kf.position()
|
||||||
|
|
||||||
|
# Publish PoseStamped in base_link
|
||||||
|
pose = PoseStamped()
|
||||||
|
pose.header = hdr
|
||||||
|
pose.pose.position.x = kx
|
||||||
|
pose.pose.position.y = ky
|
||||||
|
pose.pose.position.z = 0.0
|
||||||
|
# Orientation: face the person (yaw = atan2(y, x))
|
||||||
|
yaw = math.atan2(ky, kx)
|
||||||
|
pose.pose.orientation.z = math.sin(yaw / 2.0)
|
||||||
|
pose.pose.orientation.w = math.cos(yaw / 2.0)
|
||||||
|
|
||||||
|
self._target_pub.publish(pose)
|
||||||
|
|
||||||
|
|
||||||
|
# ── Entry point ───────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = UwbDriverNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.try_shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
4
jetson/ros2_ws/src/saltybot_uwb/setup.cfg
Normal file
4
jetson/ros2_ws/src/saltybot_uwb/setup.cfg
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
[develop]
|
||||||
|
script_dir=$base/lib/saltybot_uwb
|
||||||
|
[install]
|
||||||
|
install_scripts=$base/lib/saltybot_uwb
|
||||||
32
jetson/ros2_ws/src/saltybot_uwb/setup.py
Normal file
32
jetson/ros2_ws/src/saltybot_uwb/setup.py
Normal file
@ -0,0 +1,32 @@
|
|||||||
|
from setuptools import setup
|
||||||
|
import os
|
||||||
|
from glob import glob
|
||||||
|
|
||||||
|
package_name = "saltybot_uwb"
|
||||||
|
|
||||||
|
setup(
|
||||||
|
name=package_name,
|
||||||
|
version="0.1.0",
|
||||||
|
packages=[package_name],
|
||||||
|
data_files=[
|
||||||
|
("share/ament_index/resource_index/packages",
|
||||||
|
["resource/" + package_name]),
|
||||||
|
("share/" + package_name, ["package.xml"]),
|
||||||
|
(os.path.join("share", package_name, "launch"),
|
||||||
|
glob("launch/*.py")),
|
||||||
|
(os.path.join("share", package_name, "config"),
|
||||||
|
glob("config/*.yaml")),
|
||||||
|
],
|
||||||
|
install_requires=["setuptools"],
|
||||||
|
zip_safe=True,
|
||||||
|
maintainer="seb",
|
||||||
|
maintainer_email="seb@vayrette.com",
|
||||||
|
description="UWB follow-me driver for saltybot (MaUWB ESP32-S3 DW3000)",
|
||||||
|
license="MIT",
|
||||||
|
tests_require=["pytest"],
|
||||||
|
entry_points={
|
||||||
|
"console_scripts": [
|
||||||
|
"uwb_driver = saltybot_uwb.uwb_driver_node:main",
|
||||||
|
],
|
||||||
|
},
|
||||||
|
)
|
||||||
174
jetson/ros2_ws/src/saltybot_uwb/test/test_ranging_math.py
Normal file
174
jetson/ros2_ws/src/saltybot_uwb/test/test_ranging_math.py
Normal file
@ -0,0 +1,174 @@
|
|||||||
|
"""
|
||||||
|
test_ranging_math.py — Unit tests for UWB triangulation and Kalman filter.
|
||||||
|
|
||||||
|
No ROS2 / serial / GPU dependencies — runs with plain pytest.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import math
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from saltybot_uwb.ranging_math import triangulate_2anchor, KalmanFilter2D
|
||||||
|
|
||||||
|
|
||||||
|
# ── triangulate_2anchor ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestTriangulate2Anchor:
|
||||||
|
|
||||||
|
def test_person_directly_ahead(self):
|
||||||
|
"""Person 2m ahead, centred: x≈2, y≈0."""
|
||||||
|
sep = 0.25
|
||||||
|
# A0 at y=+0.125, A1 at y=-0.125 → symmetric → y_t ≈ 0
|
||||||
|
r0 = math.sqrt(2.0**2 + 0.125**2)
|
||||||
|
r1 = r0 # symmetric
|
||||||
|
x, y = triangulate_2anchor(r0, r1, sep)
|
||||||
|
assert abs(x - 2.0) < 0.02
|
||||||
|
assert abs(y) < 0.01
|
||||||
|
|
||||||
|
def test_person_offset_left(self):
|
||||||
|
"""Person 2m ahead, 0.5m to the left: y_t ≈ +0.5."""
|
||||||
|
sep = 0.25
|
||||||
|
# A0 at y=+0.125, A1 at y=-0.125
|
||||||
|
px, py = 2.0, 0.5
|
||||||
|
r0 = math.sqrt(px**2 + (py - 0.125)**2)
|
||||||
|
r1 = math.sqrt(px**2 + (py + 0.125)**2)
|
||||||
|
x, y = triangulate_2anchor(r0, r1, sep)
|
||||||
|
assert abs(x - px) < 0.05
|
||||||
|
assert abs(y - py) < 0.05
|
||||||
|
|
||||||
|
def test_person_offset_right(self):
|
||||||
|
"""Person 2m ahead, 0.5m to the right: y_t ≈ -0.5."""
|
||||||
|
sep = 0.25
|
||||||
|
px, py = 2.0, -0.5
|
||||||
|
r0 = math.sqrt(px**2 + (py - 0.125)**2)
|
||||||
|
r1 = math.sqrt(px**2 + (py + 0.125)**2)
|
||||||
|
x, y = triangulate_2anchor(r0, r1, sep)
|
||||||
|
assert abs(x - px) < 0.05
|
||||||
|
assert abs(y - py) < 0.05
|
||||||
|
|
||||||
|
def test_height_compensation_reduces_horizontal_range(self):
|
||||||
|
"""With height difference, horizontal range < slant range → x smaller."""
|
||||||
|
sep = 0.25
|
||||||
|
# No height: x ≈ 2.0
|
||||||
|
x_flat, _ = triangulate_2anchor(2.0, 2.0, sep, anchor_z=0.0, tag_z=0.0)
|
||||||
|
# Same slant range but 0.5m height difference
|
||||||
|
slant = math.sqrt(2.0**2 + 0.5**2)
|
||||||
|
x_tall, _ = triangulate_2anchor(slant, slant, sep, anchor_z=0.8, tag_z=0.3)
|
||||||
|
# x_tall should be close to 2.0 (height-compensated back to horizontal)
|
||||||
|
assert abs(x_tall - x_flat) < 0.05
|
||||||
|
|
||||||
|
def test_returns_nonnegative_x(self):
|
||||||
|
"""x_t is always ≥ 0 (person can't be geometrically behind anchors)."""
|
||||||
|
sep = 0.25
|
||||||
|
# Pathological: extremely mismatched ranges → sqrt clamped to 0
|
||||||
|
x, _ = triangulate_2anchor(0.1, 0.1, sep)
|
||||||
|
assert x >= 0.0
|
||||||
|
|
||||||
|
def test_invalid_sep_raises(self):
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
triangulate_2anchor(1.0, 1.0, sep=0.0)
|
||||||
|
|
||||||
|
def test_negative_range_raises(self):
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
triangulate_2anchor(-1.0, 1.0, sep=0.25)
|
||||||
|
|
||||||
|
def test_close_range(self):
|
||||||
|
"""Person very close (0.3m) — should not crash."""
|
||||||
|
sep = 0.25
|
||||||
|
px, py = 0.3, 0.0
|
||||||
|
r0 = math.sqrt(px**2 + 0.125**2)
|
||||||
|
r1 = r0
|
||||||
|
x, y = triangulate_2anchor(r0, r1, sep)
|
||||||
|
assert x >= 0.0
|
||||||
|
|
||||||
|
def test_large_range(self):
|
||||||
|
"""Person 7m ahead — within DW3000 practical range."""
|
||||||
|
sep = 0.25
|
||||||
|
r0 = math.sqrt(7.0**2 + 0.125**2)
|
||||||
|
r1 = r0
|
||||||
|
x, y = triangulate_2anchor(r0, r1, sep)
|
||||||
|
assert abs(x - 7.0) < 0.1
|
||||||
|
assert abs(y) < 0.05
|
||||||
|
|
||||||
|
|
||||||
|
# ── KalmanFilter2D ────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestKalmanFilter2D:
|
||||||
|
|
||||||
|
def test_initial_position_after_first_update(self):
|
||||||
|
"""First update initialises position to measurement."""
|
||||||
|
kf = KalmanFilter2D()
|
||||||
|
kf.predict()
|
||||||
|
kf.update(3.0, 1.0)
|
||||||
|
x, y = kf.position()
|
||||||
|
assert abs(x - 3.0) < 0.01
|
||||||
|
assert abs(y - 1.0) < 0.01
|
||||||
|
|
||||||
|
def test_converges_to_stationary_target(self):
|
||||||
|
"""Repeated updates to same position → Kalman converges to it."""
|
||||||
|
kf = KalmanFilter2D(process_noise=0.1, measurement_noise=0.3)
|
||||||
|
for _ in range(50):
|
||||||
|
kf.predict(dt=0.05)
|
||||||
|
kf.update(2.0, 0.5)
|
||||||
|
x, y = kf.position()
|
||||||
|
assert abs(x - 2.0) < 0.1
|
||||||
|
assert abs(y - 0.5) < 0.1
|
||||||
|
|
||||||
|
def test_smooths_noisy_measurements(self):
|
||||||
|
"""Filter output should be smoother than raw measurements."""
|
||||||
|
import random
|
||||||
|
rng = random.Random(42)
|
||||||
|
kf = KalmanFilter2D(process_noise=0.05, measurement_noise=0.5)
|
||||||
|
raw_errors = []
|
||||||
|
kf_errors = []
|
||||||
|
true_x, true_y = 2.0, 1.0
|
||||||
|
for _ in range(30):
|
||||||
|
noisy_x = true_x + rng.gauss(0, 0.3)
|
||||||
|
noisy_y = true_y + rng.gauss(0, 0.3)
|
||||||
|
kf.predict(dt=0.05)
|
||||||
|
kf.update(noisy_x, noisy_y)
|
||||||
|
kx, ky = kf.position()
|
||||||
|
raw_errors.append((noisy_x - true_x)**2 + (noisy_y - true_y)**2)
|
||||||
|
kf_errors.append((kx - true_x)**2 + (ky - true_y)**2)
|
||||||
|
avg_raw = sum(raw_errors[-10:]) / 10
|
||||||
|
avg_kf = sum(kf_errors[-10:]) / 10
|
||||||
|
assert avg_kf < avg_raw # filter is smoother
|
||||||
|
|
||||||
|
def test_reset_clears_state(self):
|
||||||
|
"""After reset(), position returns to ~0."""
|
||||||
|
kf = KalmanFilter2D()
|
||||||
|
for _ in range(10):
|
||||||
|
kf.predict()
|
||||||
|
kf.update(5.0, 3.0)
|
||||||
|
kf.reset()
|
||||||
|
kf.predict()
|
||||||
|
kf.update(0.0, 0.0)
|
||||||
|
x, y = kf.position()
|
||||||
|
assert abs(x) < 0.1
|
||||||
|
assert abs(y) < 0.1
|
||||||
|
|
||||||
|
def test_velocity_nonzero_for_moving_target(self):
|
||||||
|
"""For a target moving at constant velocity, vx should be detected."""
|
||||||
|
kf = KalmanFilter2D(process_noise=1.0, measurement_noise=0.1, dt=0.1)
|
||||||
|
# Target moving at 0.5 m/s in X
|
||||||
|
for i in range(40):
|
||||||
|
kf.predict(dt=0.1)
|
||||||
|
kf.update(0.5 * i * 0.1, 0.0)
|
||||||
|
vx, vy = kf.velocity()
|
||||||
|
assert abs(vx - 0.5) < 0.15
|
||||||
|
|
||||||
|
def test_custom_dt(self):
|
||||||
|
"""predict() accepts custom dt without raising."""
|
||||||
|
kf = KalmanFilter2D()
|
||||||
|
kf.predict(dt=0.02)
|
||||||
|
kf.update(1.0, 1.0)
|
||||||
|
x, y = kf.position()
|
||||||
|
assert abs(x - 1.0) < 0.01
|
||||||
|
|
||||||
|
def test_no_nan_on_zero_measurement(self):
|
||||||
|
"""Zero measurement should not produce NaN."""
|
||||||
|
kf = KalmanFilter2D()
|
||||||
|
kf.predict()
|
||||||
|
kf.update(0.0, 0.0)
|
||||||
|
x, y = kf.position()
|
||||||
|
assert not math.isnan(x)
|
||||||
|
assert not math.isnan(y)
|
||||||
15
jetson/ros2_ws/src/saltybot_uwb_msgs/CMakeLists.txt
Normal file
15
jetson/ros2_ws/src/saltybot_uwb_msgs/CMakeLists.txt
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
cmake_minimum_required(VERSION 3.8)
|
||||||
|
project(saltybot_uwb_msgs)
|
||||||
|
|
||||||
|
find_package(ament_cmake REQUIRED)
|
||||||
|
find_package(std_msgs REQUIRED)
|
||||||
|
find_package(rosidl_default_generators REQUIRED)
|
||||||
|
|
||||||
|
rosidl_generate_interfaces(${PROJECT_NAME}
|
||||||
|
"msg/UwbRange.msg"
|
||||||
|
"msg/UwbRangeArray.msg"
|
||||||
|
DEPENDENCIES std_msgs
|
||||||
|
)
|
||||||
|
|
||||||
|
ament_export_dependencies(rosidl_default_runtime)
|
||||||
|
ament_package()
|
||||||
13
jetson/ros2_ws/src/saltybot_uwb_msgs/msg/UwbRange.msg
Normal file
13
jetson/ros2_ws/src/saltybot_uwb_msgs/msg/UwbRange.msg
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
# UwbRange.msg — range measurement from a single UWB anchor
|
||||||
|
#
|
||||||
|
# anchor_id : 0 = anchor-0 (port side), 1 = anchor-1 (starboard side)
|
||||||
|
# range_m : measured horizontal range in metres (after height correction)
|
||||||
|
# raw_mm : raw TWR range from AT+RANGE? response, millimetres
|
||||||
|
# rssi : received signal strength (dBm), 0 if not reported by module
|
||||||
|
|
||||||
|
std_msgs/Header header
|
||||||
|
|
||||||
|
uint8 anchor_id
|
||||||
|
float32 range_m
|
||||||
|
uint32 raw_mm
|
||||||
|
float32 rssi
|
||||||
@ -0,0 +1,8 @@
|
|||||||
|
# UwbRangeArray.msg — all UWB anchor ranges in one message
|
||||||
|
#
|
||||||
|
# Published by uwb_driver_node on /uwb/ranges.
|
||||||
|
# Contains one UwbRange entry per active anchor.
|
||||||
|
|
||||||
|
std_msgs/Header header
|
||||||
|
|
||||||
|
saltybot_uwb_msgs/UwbRange[] ranges
|
||||||
22
jetson/ros2_ws/src/saltybot_uwb_msgs/package.xml
Normal file
22
jetson/ros2_ws/src/saltybot_uwb_msgs/package.xml
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
<?xml version="1.0"?>
|
||||||
|
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||||
|
<package format="3">
|
||||||
|
<name>saltybot_uwb_msgs</name>
|
||||||
|
<version>0.1.0</version>
|
||||||
|
<description>Custom ROS2 message definitions for UWB ranging (saltybot).</description>
|
||||||
|
<maintainer email="seb@vayrette.com">seb</maintainer>
|
||||||
|
<license>MIT</license>
|
||||||
|
|
||||||
|
<buildtool_depend>ament_cmake</buildtool_depend>
|
||||||
|
<buildtool_depend>rosidl_default_generators</buildtool_depend>
|
||||||
|
|
||||||
|
<depend>std_msgs</depend>
|
||||||
|
|
||||||
|
<exec_depend>rosidl_default_runtime</exec_depend>
|
||||||
|
|
||||||
|
<member_of_group>rosidl_interface_packages</member_of_group>
|
||||||
|
|
||||||
|
<export>
|
||||||
|
<build_type>ament_cmake</build_type>
|
||||||
|
</export>
|
||||||
|
</package>
|
||||||
Loading…
x
Reference in New Issue
Block a user