Compare commits
No commits in common. "64d411b48af7f9512cca074cdf7b515a6e8549d1" and "528034fe696c19379f65a4aed6b55f14bfc542cf" have entirely different histories.
64d411b48a
...
528034fe69
@ -1,42 +1,30 @@
|
|||||||
"""
|
"""
|
||||||
person_follower_node.py -- Proportional person-following control loop.
|
person_follower_node.py — Proportional person-following control loop.
|
||||||
|
|
||||||
Subscribes:
|
Subscribes:
|
||||||
/uwb/target (geometry_msgs/PoseStamped) -- UWB-triangulated position (primary)
|
/person/target (geometry_msgs/PoseStamped) — person pose in base_link frame
|
||||||
/person/target (geometry_msgs/PoseStamped) -- camera-detected position (secondary)
|
/local_costmap/costmap (nav_msgs/OccupancyGrid) — Nav2 local obstacle map
|
||||||
/local_costmap/costmap (nav_msgs/OccupancyGrid) -- Nav2 local obstacle map
|
|
||||||
|
|
||||||
Publishes:
|
Publishes:
|
||||||
/cmd_vel (geometry_msgs/Twist) -- velocity commands
|
/cmd_vel (geometry_msgs/Twist) — velocity commands
|
||||||
|
|
||||||
Sensor fusion
|
|
||||||
-------------
|
|
||||||
When both UWB and camera sources are fresh:
|
|
||||||
fused = uwb_weight * uwb + (1 - uwb_weight) * camera
|
|
||||||
When only UWB is fresh: use UWB directly.
|
|
||||||
When only camera is fresh: use camera directly.
|
|
||||||
When neither is fresh: STOPPING -> SEARCHING.
|
|
||||||
|
|
||||||
UWB is declared stale after uwb_timeout seconds (default 1.0s).
|
|
||||||
Camera is declared stale after lost_timeout seconds (default 2.0s).
|
|
||||||
|
|
||||||
State machine
|
State machine
|
||||||
-------------
|
─────────────
|
||||||
FOLLOWING -- at least one source fresh (last detection < timeout ago)
|
FOLLOWING — person visible (last detection < lost_timeout ago)
|
||||||
STOPPING -- all sources lost, publishing zero until search_timeout
|
STOPPING — person lost, publishing zero until search_timeout
|
||||||
SEARCHING -- all sources lost > search_timeout, slow rotation to re-acquire
|
SEARCHING — person lost > search_timeout, slow rotation to re-acquire
|
||||||
|
|
||||||
Safety wiring
|
Safety wiring
|
||||||
-------------
|
─────────────
|
||||||
* cmd_vel bridge (PR #46) applies ramp + deadman + STM32 AUTONOMOUS mode gate --
|
• cmd_vel bridge (PR #46) applies ramp + deadman + STM32 AUTONOMOUS mode gate —
|
||||||
this node publishes raw /cmd_vel, the bridge handles hardware safety.
|
this node publishes raw /cmd_vel, the bridge handles hardware safety.
|
||||||
* follow_enabled param (default True) lets the operator disable the controller
|
• follow_enabled param (default True) lets the operator disable the controller
|
||||||
at runtime: ros2 param set /person_follower follow_enabled false
|
at runtime: ros2 param set /person_follower follow_enabled false
|
||||||
* Obstacle override: if Nav2 local costmap shows a lethal cell in the forward
|
• Obstacle override: if Nav2 local costmap shows a lethal cell in the forward
|
||||||
corridor, forward cmd_vel is zeroed; turning is still allowed.
|
corridor, forward cmd_vel is zeroed; turning is still allowed.
|
||||||
|
|
||||||
Usage
|
Usage
|
||||||
-----
|
─────
|
||||||
ros2 launch saltybot_follower person_follower.launch.py
|
ros2 launch saltybot_follower person_follower.launch.py
|
||||||
ros2 launch saltybot_follower person_follower.launch.py follow_distance:=1.2
|
ros2 launch saltybot_follower person_follower.launch.py follow_distance:=1.2
|
||||||
"""
|
"""
|
||||||
@ -49,13 +37,13 @@ from rclpy.node import Node
|
|||||||
from geometry_msgs.msg import PoseStamped, Twist
|
from geometry_msgs.msg import PoseStamped, Twist
|
||||||
from nav_msgs.msg import OccupancyGrid
|
from nav_msgs.msg import OccupancyGrid
|
||||||
|
|
||||||
# -- State constants -----------------------------------------------------------
|
# ── State constants ────────────────────────────────────────────────────────────
|
||||||
_FOLLOWING = "following"
|
_FOLLOWING = "following"
|
||||||
_STOPPING = "stopping" # all sources lost < search_timeout, publishing zero
|
_STOPPING = "stopping" # person lost < search_timeout, publishing zero
|
||||||
_SEARCHING = "searching" # all sources lost > search_timeout, slow rotation
|
_SEARCHING = "searching" # person lost > search_timeout, slow rotation
|
||||||
|
|
||||||
|
|
||||||
# -- Pure helper functions (also exercised by unit tests) ----------------------
|
# ── Pure helper functions (also exercised by unit tests) ──────────────────────
|
||||||
|
|
||||||
def clamp(v: float, lo: float, hi: float) -> float:
|
def clamp(v: float, lo: float, hi: float) -> float:
|
||||||
return max(lo, min(hi, v))
|
return max(lo, min(hi, v))
|
||||||
@ -70,40 +58,6 @@ def compute_bearing(px: float, py: float) -> float:
|
|||||||
return math.atan2(py, px)
|
return math.atan2(py, px)
|
||||||
|
|
||||||
|
|
||||||
def fuse_targets(
|
|
||||||
uwb_x: float,
|
|
||||||
uwb_y: float,
|
|
||||||
uwb_fresh: bool,
|
|
||||||
cam_x: float,
|
|
||||||
cam_y: float,
|
|
||||||
cam_fresh: bool,
|
|
||||||
uwb_weight: float,
|
|
||||||
) -> tuple:
|
|
||||||
"""
|
|
||||||
Fuse UWB (primary) and camera (secondary) position estimates.
|
|
||||||
|
|
||||||
Returns (fused_x, fused_y, any_fresh).
|
|
||||||
|
|
||||||
Fusion rules:
|
|
||||||
Both fresh -> weighted blend: uwb_weight * uwb + (1-uwb_weight) * camera
|
|
||||||
UWB only -> UWB position, full weight
|
|
||||||
Camera only -> camera position, full weight
|
|
||||||
Neither -> (0, 0, False)
|
|
||||||
"""
|
|
||||||
if uwb_fresh and cam_fresh:
|
|
||||||
w = clamp(uwb_weight, 0.0, 1.0)
|
|
||||||
return (
|
|
||||||
w * uwb_x + (1.0 - w) * cam_x,
|
|
||||||
w * uwb_y + (1.0 - w) * cam_y,
|
|
||||||
True,
|
|
||||||
)
|
|
||||||
if uwb_fresh:
|
|
||||||
return uwb_x, uwb_y, True
|
|
||||||
if cam_fresh:
|
|
||||||
return cam_x, cam_y, True
|
|
||||||
return 0.0, 0.0, False
|
|
||||||
|
|
||||||
|
|
||||||
def compute_follow_cmd(
|
def compute_follow_cmd(
|
||||||
px: float,
|
px: float,
|
||||||
py: float,
|
py: float,
|
||||||
@ -121,27 +75,30 @@ def compute_follow_cmd(
|
|||||||
Returns (linear_x, angular_z) as floats.
|
Returns (linear_x, angular_z) as floats.
|
||||||
|
|
||||||
linear_x:
|
linear_x:
|
||||||
Proportional to signed distance error (distance - follow_distance).
|
Proportional to signed distance error (distance − follow_distance).
|
||||||
Zero inside the dead zone. Clamped to +-max_linear_vel.
|
Zero inside the dead zone. Clamped to ±max_linear_vel.
|
||||||
Zeroed (not reversed) if obstacle_ahead and moving forward.
|
Zeroed (not reversed) if obstacle_ahead and moving forward.
|
||||||
|
|
||||||
angular_z:
|
angular_z:
|
||||||
Proportional to bearing. Always corrects even when inside the dead zone,
|
Proportional to bearing. Always corrects even when inside the dead zone,
|
||||||
so the robot stays pointed at the person while holding distance.
|
so the robot stays pointed at the person while holding distance.
|
||||||
Clamped to +-max_angular_vel.
|
Clamped to ±max_angular_vel.
|
||||||
"""
|
"""
|
||||||
distance = compute_distance(px, py)
|
distance = compute_distance(px, py)
|
||||||
bearing = compute_bearing(px, py)
|
bearing = compute_bearing(px, py)
|
||||||
|
|
||||||
|
# Linear — dead zone suppresses jitter at target distance
|
||||||
dist_error = distance - follow_distance
|
dist_error = distance - follow_distance
|
||||||
if abs(dist_error) > dead_zone:
|
if abs(dist_error) > dead_zone:
|
||||||
linear_x = clamp(kp_linear * dist_error, -max_linear_vel, max_linear_vel)
|
linear_x = clamp(kp_linear * dist_error, -max_linear_vel, max_linear_vel)
|
||||||
else:
|
else:
|
||||||
linear_x = 0.0
|
linear_x = 0.0
|
||||||
|
|
||||||
|
# Obstacle override: suppress forward motion, allow turning
|
||||||
if obstacle_ahead and linear_x > 0.0:
|
if obstacle_ahead and linear_x > 0.0:
|
||||||
linear_x = 0.0
|
linear_x = 0.0
|
||||||
|
|
||||||
|
# Angular — always correct bearing
|
||||||
angular_z = clamp(kp_angular * bearing, -max_angular_vel, max_angular_vel)
|
angular_z = clamp(kp_angular * bearing, -max_angular_vel, max_angular_vel)
|
||||||
|
|
||||||
return linear_x, angular_z
|
return linear_x, angular_z
|
||||||
@ -159,7 +116,7 @@ def check_obstacle_in_costmap(
|
|||||||
"""
|
"""
|
||||||
Check for obstacle cells in the robot's forward corridor.
|
Check for obstacle cells in the robot's forward corridor.
|
||||||
|
|
||||||
Assumes a Nav2 rolling-window local costmap (robot ~= centre cell).
|
Assumes a Nav2 rolling-window local costmap (robot ≈ centre cell).
|
||||||
Forward = +col direction (robot +X).
|
Forward = +col direction (robot +X).
|
||||||
|
|
||||||
costmap_data : flat row-major int8 list (from OccupancyGrid.data)
|
costmap_data : flat row-major int8 list (from OccupancyGrid.data)
|
||||||
@ -167,7 +124,7 @@ def check_obstacle_in_costmap(
|
|||||||
resolution : metres per cell
|
resolution : metres per cell
|
||||||
check_dist : how far ahead to look (metres)
|
check_dist : how far ahead to look (metres)
|
||||||
robot_half_width: corridor half-width (metres)
|
robot_half_width: corridor half-width (metres)
|
||||||
threshold : costmap value >= this is treated as obstacle (Nav2 lethal=99)
|
threshold : costmap value ≥ this is treated as obstacle (Nav2 lethal=99)
|
||||||
"""
|
"""
|
||||||
if resolution <= 0.0:
|
if resolution <= 0.0:
|
||||||
return False
|
return False
|
||||||
@ -175,8 +132,8 @@ def check_obstacle_in_costmap(
|
|||||||
cx = width // 2
|
cx = width // 2
|
||||||
cy = height // 2
|
cy = height // 2
|
||||||
|
|
||||||
n_fwd = int(check_dist / resolution)
|
n_fwd = int(check_dist / resolution)
|
||||||
n_side = int(robot_half_width / resolution)
|
n_side = int(robot_half_width / resolution)
|
||||||
|
|
||||||
for dx in range(1, n_fwd + 1):
|
for dx in range(1, n_fwd + 1):
|
||||||
col = cx + dx
|
col = cx + dx
|
||||||
@ -190,14 +147,14 @@ def check_obstacle_in_costmap(
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
# -- ROS2 Node ----------------------------------------------------------------
|
# ── ROS2 Node ─────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
class PersonFollowerNode(Node):
|
class PersonFollowerNode(Node):
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super().__init__("person_follower")
|
super().__init__("person_follower")
|
||||||
|
|
||||||
# -- Parameters --------------------------------------------------------
|
# ── Parameters ────────────────────────────────────────────────────────
|
||||||
self.declare_parameter("follow_distance", 1.5)
|
self.declare_parameter("follow_distance", 1.5)
|
||||||
self.declare_parameter("dead_zone", 0.3)
|
self.declare_parameter("dead_zone", 0.3)
|
||||||
self.declare_parameter("kp_linear", 0.6)
|
self.declare_parameter("kp_linear", 0.6)
|
||||||
@ -212,48 +169,38 @@ class PersonFollowerNode(Node):
|
|||||||
self.declare_parameter("robot_width", 0.3)
|
self.declare_parameter("robot_width", 0.3)
|
||||||
self.declare_parameter("control_rate", 20.0)
|
self.declare_parameter("control_rate", 20.0)
|
||||||
self.declare_parameter("follow_enabled", True)
|
self.declare_parameter("follow_enabled", True)
|
||||||
# UWB fusion
|
|
||||||
self.declare_parameter("uwb_weight", 0.7)
|
|
||||||
self.declare_parameter("uwb_timeout", 1.0)
|
|
||||||
|
|
||||||
self._p = {}
|
self._p = {}
|
||||||
self._reload_params()
|
self._reload_params()
|
||||||
|
|
||||||
# -- State -------------------------------------------------------------
|
# ── State ─────────────────────────────────────────────────────────────
|
||||||
self._state = _STOPPING
|
self._state = _STOPPING
|
||||||
self._last_target_t = 0.0 # camera; monotonic; 0 = never received
|
self._last_target_t = 0.0 # monotonic clock; 0 = never received
|
||||||
self._person_x = 0.0
|
self._person_x = 0.0
|
||||||
self._person_y = 0.0
|
self._person_y = 0.0
|
||||||
self._last_uwb_t = 0.0 # UWB; monotonic; 0 = never received
|
|
||||||
self._uwb_x = 0.0
|
|
||||||
self._uwb_y = 0.0
|
|
||||||
self._obstacle_ahead = False
|
self._obstacle_ahead = False
|
||||||
|
|
||||||
# -- Subscriptions -----------------------------------------------------
|
# ── Subscriptions ─────────────────────────────────────────────────────
|
||||||
self.create_subscription(
|
|
||||||
PoseStamped, "/uwb/target", self._uwb_cb, 10)
|
|
||||||
self.create_subscription(
|
self.create_subscription(
|
||||||
PoseStamped, "/person/target", self._target_cb, 10)
|
PoseStamped, "/person/target", self._target_cb, 10)
|
||||||
self.create_subscription(
|
self.create_subscription(
|
||||||
OccupancyGrid, "/local_costmap/costmap", self._costmap_cb, 1)
|
OccupancyGrid, "/local_costmap/costmap", self._costmap_cb, 1)
|
||||||
|
|
||||||
# -- Publisher ---------------------------------------------------------
|
# ── Publisher ─────────────────────────────────────────────────────────
|
||||||
self._cmd_pub = self.create_publisher(Twist, "/cmd_vel", 10)
|
self._cmd_pub = self.create_publisher(Twist, "/cmd_vel", 10)
|
||||||
|
|
||||||
# -- Control timer -----------------------------------------------------
|
# ── Control timer ─────────────────────────────────────────────────────
|
||||||
self._timer = self.create_timer(
|
self._timer = self.create_timer(
|
||||||
1.0 / self._p["control_rate"], self._control_cb)
|
1.0 / self._p["control_rate"], self._control_cb)
|
||||||
|
|
||||||
self.get_logger().info(
|
self.get_logger().info(
|
||||||
"PersonFollower ready "
|
f"PersonFollower ready follow_dist={self._p['follow_distance']}m "
|
||||||
f"follow_dist={self._p['follow_distance']}m "
|
f"dead_zone=±{self._p['dead_zone']}m "
|
||||||
f"dead_zone=+/-{self._p['dead_zone']}m "
|
|
||||||
f"max_vel={self._p['max_linear_vel']}m/s "
|
f"max_vel={self._p['max_linear_vel']}m/s "
|
||||||
f"uwb_weight={self._p['uwb_weight']} "
|
|
||||||
f"lost_timeout={self._p['lost_timeout']}s"
|
f"lost_timeout={self._p['lost_timeout']}s"
|
||||||
)
|
)
|
||||||
|
|
||||||
# -- Parameter helper ------------------------------------------------------
|
# ── Parameter helper ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
def _reload_params(self):
|
def _reload_params(self):
|
||||||
self._p = {
|
self._p = {
|
||||||
@ -271,21 +218,12 @@ class PersonFollowerNode(Node):
|
|||||||
"robot_width": self.get_parameter("robot_width").value,
|
"robot_width": self.get_parameter("robot_width").value,
|
||||||
"control_rate": self.get_parameter("control_rate").value,
|
"control_rate": self.get_parameter("control_rate").value,
|
||||||
"follow_enabled": self.get_parameter("follow_enabled").value,
|
"follow_enabled": self.get_parameter("follow_enabled").value,
|
||||||
"uwb_weight": self.get_parameter("uwb_weight").value,
|
|
||||||
"uwb_timeout": self.get_parameter("uwb_timeout").value,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
# -- Callbacks -------------------------------------------------------------
|
# ── Callbacks ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
def _uwb_cb(self, msg):
|
|
||||||
"""UWB-triangulated person position (primary source)."""
|
|
||||||
self._uwb_x = msg.pose.position.x
|
|
||||||
self._uwb_y = msg.pose.position.y
|
|
||||||
self._last_uwb_t = time.monotonic()
|
|
||||||
self._state = _FOLLOWING
|
|
||||||
|
|
||||||
def _target_cb(self, msg):
|
def _target_cb(self, msg):
|
||||||
"""Camera-detected person position (secondary source)."""
|
"""Person position in base_link frame."""
|
||||||
self._person_x = msg.pose.position.x
|
self._person_x = msg.pose.position.x
|
||||||
self._person_y = msg.pose.position.y
|
self._person_y = msg.pose.position.y
|
||||||
self._last_target_t = time.monotonic()
|
self._last_target_t = time.monotonic()
|
||||||
@ -302,9 +240,10 @@ class PersonFollowerNode(Node):
|
|||||||
threshold=self._p["obstacle_threshold"],
|
threshold=self._p["obstacle_threshold"],
|
||||||
)
|
)
|
||||||
|
|
||||||
# -- Control loop ----------------------------------------------------------
|
# ── Control loop ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
def _control_cb(self):
|
def _control_cb(self):
|
||||||
|
# Re-read mutable params every tick (cheap dict lookup)
|
||||||
self._reload_params()
|
self._reload_params()
|
||||||
|
|
||||||
twist = Twist()
|
twist = Twist()
|
||||||
@ -313,69 +252,51 @@ class PersonFollowerNode(Node):
|
|||||||
self._cmd_pub.publish(twist)
|
self._cmd_pub.publish(twist)
|
||||||
return
|
return
|
||||||
|
|
||||||
now = time.monotonic()
|
now = time.monotonic()
|
||||||
|
dt_lost = (now - self._last_target_t) if self._last_target_t > 0.0 else 1e9
|
||||||
|
|
||||||
uwb_fresh = (
|
# State transitions (don't transition back from STOPPING/SEARCHING
|
||||||
self._last_uwb_t > 0.0 and
|
# here — _target_cb resets to FOLLOWING on new detection)
|
||||||
(now - self._last_uwb_t) <= self._p["uwb_timeout"]
|
|
||||||
)
|
|
||||||
cam_fresh = (
|
|
||||||
self._last_target_t > 0.0 and
|
|
||||||
(now - self._last_target_t) <= self._p["lost_timeout"]
|
|
||||||
)
|
|
||||||
|
|
||||||
any_fresh = uwb_fresh or cam_fresh
|
|
||||||
if self._state == _FOLLOWING:
|
if self._state == _FOLLOWING:
|
||||||
if not any_fresh:
|
if dt_lost > self._p["search_timeout"]:
|
||||||
uwb_stale = (now - self._last_uwb_t) if self._last_uwb_t > 0.0 else 1e9
|
self._state = _SEARCHING
|
||||||
cam_stale = (now - self._last_target_t) if self._last_target_t > 0.0 else 1e9
|
self.get_logger().warn("Person lost — entering SEARCHING state")
|
||||||
dt_lost = min(uwb_stale, cam_stale)
|
elif dt_lost > self._p["lost_timeout"]:
|
||||||
if dt_lost > self._p["search_timeout"]:
|
self._state = _STOPPING
|
||||||
self._state = _SEARCHING
|
self.get_logger().warn("Person lost — stopping")
|
||||||
self.get_logger().warn("Person lost -- entering SEARCHING state")
|
|
||||||
else:
|
|
||||||
self._state = _STOPPING
|
|
||||||
self.get_logger().warn("Person lost -- stopping")
|
|
||||||
|
|
||||||
linear_x, angular_z = 0.0, 0.0
|
linear_x, angular_z = 0.0, 0.0
|
||||||
|
|
||||||
if self._state == _FOLLOWING:
|
if self._state == _FOLLOWING:
|
||||||
fx, fy, fresh = fuse_targets(
|
linear_x, angular_z = compute_follow_cmd(
|
||||||
uwb_x=self._uwb_x,
|
px=self._person_x,
|
||||||
uwb_y=self._uwb_y,
|
py=self._person_y,
|
||||||
uwb_fresh=uwb_fresh,
|
follow_distance=self._p["follow_distance"],
|
||||||
cam_x=self._person_x,
|
dead_zone=self._p["dead_zone"],
|
||||||
cam_y=self._person_y,
|
kp_linear=self._p["kp_linear"],
|
||||||
cam_fresh=cam_fresh,
|
kp_angular=self._p["kp_angular"],
|
||||||
uwb_weight=self._p["uwb_weight"],
|
max_linear_vel=self._p["max_linear_vel"],
|
||||||
|
max_angular_vel=self._p["max_angular_vel"],
|
||||||
|
obstacle_ahead=self._obstacle_ahead,
|
||||||
)
|
)
|
||||||
if fresh:
|
if self._obstacle_ahead and linear_x == 0.0:
|
||||||
linear_x, angular_z = compute_follow_cmd(
|
self.get_logger().warn(
|
||||||
px=fx,
|
"Obstacle in path — forward motion suppressed",
|
||||||
py=fy,
|
throttle_duration_sec=2.0,
|
||||||
follow_distance=self._p["follow_distance"],
|
|
||||||
dead_zone=self._p["dead_zone"],
|
|
||||||
kp_linear=self._p["kp_linear"],
|
|
||||||
kp_angular=self._p["kp_angular"],
|
|
||||||
max_linear_vel=self._p["max_linear_vel"],
|
|
||||||
max_angular_vel=self._p["max_angular_vel"],
|
|
||||||
obstacle_ahead=self._obstacle_ahead,
|
|
||||||
)
|
)
|
||||||
if self._obstacle_ahead and linear_x == 0.0:
|
|
||||||
self.get_logger().warn(
|
|
||||||
"Obstacle in path -- forward motion suppressed",
|
|
||||||
throttle_duration_sec=2.0,
|
|
||||||
)
|
|
||||||
|
|
||||||
elif self._state == _SEARCHING:
|
elif self._state == _SEARCHING:
|
||||||
angular_z = float(self._p["search_angular_vel"])
|
angular_z = float(self._p["search_angular_vel"])
|
||||||
|
# linear_x stays 0 — rotate in place to search
|
||||||
|
|
||||||
|
# _STOPPING: both stay 0
|
||||||
|
|
||||||
twist.linear.x = linear_x
|
twist.linear.x = linear_x
|
||||||
twist.angular.z = angular_z
|
twist.angular.z = angular_z
|
||||||
self._cmd_pub.publish(twist)
|
self._cmd_pub.publish(twist)
|
||||||
|
|
||||||
|
|
||||||
# -- Entry point ---------------------------------------------------------------
|
# ── Entry point ───────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
def main(args=None):
|
def main(args=None):
|
||||||
rclpy.init(args=args)
|
rclpy.init(args=args)
|
||||||
|
|||||||
@ -388,98 +388,3 @@ class TestFollowEnabled:
|
|||||||
lin, ang = (0.0, 0.0) if not follow_enabled else \
|
lin, ang = (0.0, 0.0) if not follow_enabled else \
|
||||||
_compute_follow_cmd(3.0, 0.0, **_DEF)
|
_compute_follow_cmd(3.0, 0.0, **_DEF)
|
||||||
assert lin != pytest.approx(0.0)
|
assert lin != pytest.approx(0.0)
|
||||||
|
|
||||||
|
|
||||||
# ── fuse_targets — UWB + camera fusion ──────────────────────────────────────
|
|
||||||
|
|
||||||
def _fuse_targets(uwb_x, uwb_y, uwb_fresh, cam_x, cam_y, cam_fresh, uwb_weight):
|
|
||||||
"""Mirror of person_follower_node.fuse_targets (no ROS2)."""
|
|
||||||
def clamp(v, lo, hi): return max(lo, min(hi, v))
|
|
||||||
if uwb_fresh and cam_fresh:
|
|
||||||
w = clamp(uwb_weight, 0.0, 1.0)
|
|
||||||
return w * uwb_x + (1.0 - w) * cam_x, w * uwb_y + (1.0 - w) * cam_y, True
|
|
||||||
if uwb_fresh:
|
|
||||||
return uwb_x, uwb_y, True
|
|
||||||
if cam_fresh:
|
|
||||||
return cam_x, cam_y, True
|
|
||||||
return 0.0, 0.0, False
|
|
||||||
|
|
||||||
|
|
||||||
class TestFuseTargets:
|
|
||||||
|
|
||||||
def test_both_fresh_blends(self):
|
|
||||||
"""Both sources fresh → weighted blend."""
|
|
||||||
fx, fy, ok = _fuse_targets(
|
|
||||||
uwb_x=4.0, uwb_y=0.0, uwb_fresh=True,
|
|
||||||
cam_x=2.0, cam_y=0.0, cam_fresh=True,
|
|
||||||
uwb_weight=0.7,
|
|
||||||
)
|
|
||||||
assert ok is True
|
|
||||||
assert fx == pytest.approx(0.7 * 4.0 + 0.3 * 2.0)
|
|
||||||
assert fy == pytest.approx(0.0)
|
|
||||||
|
|
||||||
def test_uwb_only(self):
|
|
||||||
"""Only UWB fresh → UWB position returned as-is."""
|
|
||||||
fx, fy, ok = _fuse_targets(
|
|
||||||
uwb_x=3.0, uwb_y=1.0, uwb_fresh=True,
|
|
||||||
cam_x=0.0, cam_y=0.0, cam_fresh=False,
|
|
||||||
uwb_weight=0.7,
|
|
||||||
)
|
|
||||||
assert ok is True
|
|
||||||
assert fx == pytest.approx(3.0)
|
|
||||||
assert fy == pytest.approx(1.0)
|
|
||||||
|
|
||||||
def test_camera_only(self):
|
|
||||||
"""Only camera fresh → camera position returned as-is."""
|
|
||||||
fx, fy, ok = _fuse_targets(
|
|
||||||
uwb_x=0.0, uwb_y=0.0, uwb_fresh=False,
|
|
||||||
cam_x=2.5, cam_y=-0.5, cam_fresh=True,
|
|
||||||
uwb_weight=0.7,
|
|
||||||
)
|
|
||||||
assert ok is True
|
|
||||||
assert fx == pytest.approx(2.5)
|
|
||||||
assert fy == pytest.approx(-0.5)
|
|
||||||
|
|
||||||
def test_neither_fresh(self):
|
|
||||||
"""Both stale → (0, 0, False)."""
|
|
||||||
fx, fy, ok = _fuse_targets(
|
|
||||||
uwb_x=3.0, uwb_y=1.0, uwb_fresh=False,
|
|
||||||
cam_x=2.0, cam_y=0.0, cam_fresh=False,
|
|
||||||
uwb_weight=0.7,
|
|
||||||
)
|
|
||||||
assert ok is False
|
|
||||||
assert fx == pytest.approx(0.0)
|
|
||||||
assert fy == pytest.approx(0.0)
|
|
||||||
|
|
||||||
def test_uwb_weight_zero_gives_camera(self):
|
|
||||||
"""uwb_weight=0.0 → pure camera even when both fresh."""
|
|
||||||
fx, fy, ok = _fuse_targets(
|
|
||||||
uwb_x=4.0, uwb_y=2.0, uwb_fresh=True,
|
|
||||||
cam_x=1.0, cam_y=0.5, cam_fresh=True,
|
|
||||||
uwb_weight=0.0,
|
|
||||||
)
|
|
||||||
assert ok is True
|
|
||||||
assert fx == pytest.approx(1.0)
|
|
||||||
assert fy == pytest.approx(0.5)
|
|
||||||
|
|
||||||
def test_uwb_weight_one_gives_uwb(self):
|
|
||||||
"""uwb_weight=1.0 → pure UWB even when both fresh."""
|
|
||||||
fx, fy, ok = _fuse_targets(
|
|
||||||
uwb_x=4.0, uwb_y=2.0, uwb_fresh=True,
|
|
||||||
cam_x=1.0, cam_y=0.5, cam_fresh=True,
|
|
||||||
uwb_weight=1.0,
|
|
||||||
)
|
|
||||||
assert ok is True
|
|
||||||
assert fx == pytest.approx(4.0)
|
|
||||||
assert fy == pytest.approx(2.0)
|
|
||||||
|
|
||||||
def test_lateral_blend(self):
|
|
||||||
"""Lateral (Y) component is also blended correctly."""
|
|
||||||
fx, fy, ok = _fuse_targets(
|
|
||||||
uwb_x=2.0, uwb_y=1.0, uwb_fresh=True,
|
|
||||||
cam_x=2.0, cam_y=-1.0, cam_fresh=True,
|
|
||||||
uwb_weight=0.5,
|
|
||||||
)
|
|
||||||
assert ok is True
|
|
||||||
assert fx == pytest.approx(2.0)
|
|
||||||
assert fy == pytest.approx(0.0) # 0.5*1.0 + 0.5*(-1.0) = 0
|
|
||||||
|
|||||||
@ -1,57 +0,0 @@
|
|||||||
# uwb_config.yaml — MaUWB ESP32-S3 DW3000 UWB follow-me system
|
|
||||||
#
|
|
||||||
# Hardware layout:
|
|
||||||
# Anchor-0 (port side) → USB port_a, y = +anchor_separation/2
|
|
||||||
# Anchor-1 (starboard side) → USB port_b, y = -anchor_separation/2
|
|
||||||
# Tag on person → belt clip, ~0.9m above ground
|
|
||||||
#
|
|
||||||
# Run with:
|
|
||||||
# ros2 launch saltybot_uwb uwb.launch.py
|
|
||||||
# Override at launch:
|
|
||||||
# ros2 launch saltybot_uwb uwb.launch.py port_a:=/dev/ttyUSB2
|
|
||||||
|
|
||||||
# ── Serial ports ──────────────────────────────────────────────────────────────
|
|
||||||
# Set udev rules to get stable symlinks:
|
|
||||||
# /dev/uwb-anchor0 → port_a
|
|
||||||
# /dev/uwb-anchor1 → port_b
|
|
||||||
# (See jetson/docs/pinout.md for udev setup)
|
|
||||||
port_a: /dev/uwb-anchor0 # Anchor-0 (port)
|
|
||||||
port_b: /dev/uwb-anchor1 # Anchor-1 (starboard)
|
|
||||||
baudrate: 115200 # MaUWB default — do not change
|
|
||||||
|
|
||||||
# ── Anchor geometry ────────────────────────────────────────────────────────────
|
|
||||||
# anchor_separation: centre-to-centre distance between anchors (metres)
|
|
||||||
# Must match physical mounting. Larger = more accurate lateral resolution.
|
|
||||||
anchor_separation: 0.25 # metres (25cm)
|
|
||||||
|
|
||||||
# anchor_height: height of anchors above ground (metres)
|
|
||||||
# Orin stem mount ≈ 0.80m on the saltybot platform
|
|
||||||
anchor_height: 0.80 # metres
|
|
||||||
|
|
||||||
# tag_height: height of person's belt-clip tag above ground (metres)
|
|
||||||
tag_height: 0.90 # metres (adjust per user)
|
|
||||||
|
|
||||||
# ── Range validity ─────────────────────────────────────────────────────────────
|
|
||||||
# range_timeout_s: stale anchor — excluded from triangulation after this gap
|
|
||||||
range_timeout_s: 1.0 # seconds
|
|
||||||
|
|
||||||
# max_range_m: discard ranges beyond this (DW3000 indoor practical limit ≈8m)
|
|
||||||
max_range_m: 8.0 # metres
|
|
||||||
|
|
||||||
# min_range_m: discard ranges below this (likely multipath artefacts)
|
|
||||||
min_range_m: 0.05 # metres
|
|
||||||
|
|
||||||
# ── Kalman filter ──────────────────────────────────────────────────────────────
|
|
||||||
# kf_process_noise: Q scalar — how dynamic the person's motion is
|
|
||||||
# Higher → faster response, more jitter
|
|
||||||
kf_process_noise: 0.1
|
|
||||||
|
|
||||||
# kf_meas_noise: R scalar — how noisy the UWB ranging is
|
|
||||||
# DW3000 indoor accuracy ≈ 10cm RMS → 0.1m → R ≈ 0.01
|
|
||||||
# Use 0.3 to be conservative on first deployment
|
|
||||||
kf_meas_noise: 0.3
|
|
||||||
|
|
||||||
# ── Publish rate ──────────────────────────────────────────────────────────────
|
|
||||||
# Should match or exceed the AT+RANGE? poll rate from both anchors.
|
|
||||||
# 20Hz means 50ms per cycle; each anchor query takes ~10ms → headroom ok.
|
|
||||||
publish_rate: 20.0 # Hz
|
|
||||||
@ -1,59 +0,0 @@
|
|||||||
"""
|
|
||||||
uwb.launch.py — Launch UWB driver node for MaUWB ESP32-S3 follow-me.
|
|
||||||
|
|
||||||
Usage:
|
|
||||||
ros2 launch saltybot_uwb uwb.launch.py
|
|
||||||
ros2 launch saltybot_uwb uwb.launch.py port_a:=/dev/ttyUSB2 port_b:=/dev/ttyUSB3
|
|
||||||
ros2 launch saltybot_uwb uwb.launch.py anchor_separation:=0.30 publish_rate:=10.0
|
|
||||||
"""
|
|
||||||
|
|
||||||
import os
|
|
||||||
from ament_index_python.packages import get_package_share_directory
|
|
||||||
from launch import LaunchDescription
|
|
||||||
from launch.actions import DeclareLaunchArgument
|
|
||||||
from launch.substitutions import LaunchConfiguration
|
|
||||||
from launch_ros.actions import Node
|
|
||||||
|
|
||||||
|
|
||||||
def generate_launch_description():
|
|
||||||
pkg = get_package_share_directory("saltybot_uwb")
|
|
||||||
cfg = os.path.join(pkg, "config", "uwb_config.yaml")
|
|
||||||
|
|
||||||
return LaunchDescription([
|
|
||||||
DeclareLaunchArgument("port_a", default_value="/dev/uwb-anchor0"),
|
|
||||||
DeclareLaunchArgument("port_b", default_value="/dev/uwb-anchor1"),
|
|
||||||
DeclareLaunchArgument("baudrate", default_value="115200"),
|
|
||||||
DeclareLaunchArgument("anchor_separation", default_value="0.25"),
|
|
||||||
DeclareLaunchArgument("anchor_height", default_value="0.80"),
|
|
||||||
DeclareLaunchArgument("tag_height", default_value="0.90"),
|
|
||||||
DeclareLaunchArgument("range_timeout_s", default_value="1.0"),
|
|
||||||
DeclareLaunchArgument("max_range_m", default_value="8.0"),
|
|
||||||
DeclareLaunchArgument("min_range_m", default_value="0.05"),
|
|
||||||
DeclareLaunchArgument("kf_process_noise", default_value="0.1"),
|
|
||||||
DeclareLaunchArgument("kf_meas_noise", default_value="0.3"),
|
|
||||||
DeclareLaunchArgument("publish_rate", default_value="20.0"),
|
|
||||||
|
|
||||||
Node(
|
|
||||||
package="saltybot_uwb",
|
|
||||||
executable="uwb_driver",
|
|
||||||
name="uwb_driver",
|
|
||||||
output="screen",
|
|
||||||
parameters=[
|
|
||||||
cfg,
|
|
||||||
{
|
|
||||||
"port_a": LaunchConfiguration("port_a"),
|
|
||||||
"port_b": LaunchConfiguration("port_b"),
|
|
||||||
"baudrate": LaunchConfiguration("baudrate"),
|
|
||||||
"anchor_separation": LaunchConfiguration("anchor_separation"),
|
|
||||||
"anchor_height": LaunchConfiguration("anchor_height"),
|
|
||||||
"tag_height": LaunchConfiguration("tag_height"),
|
|
||||||
"range_timeout_s": LaunchConfiguration("range_timeout_s"),
|
|
||||||
"max_range_m": LaunchConfiguration("max_range_m"),
|
|
||||||
"min_range_m": LaunchConfiguration("min_range_m"),
|
|
||||||
"kf_process_noise": LaunchConfiguration("kf_process_noise"),
|
|
||||||
"kf_meas_noise": LaunchConfiguration("kf_meas_noise"),
|
|
||||||
"publish_rate": LaunchConfiguration("publish_rate"),
|
|
||||||
},
|
|
||||||
],
|
|
||||||
),
|
|
||||||
])
|
|
||||||
@ -1,28 +0,0 @@
|
|||||||
<?xml version="1.0"?>
|
|
||||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
|
||||||
<package format="3">
|
|
||||||
<name>saltybot_uwb</name>
|
|
||||||
<version>0.1.0</version>
|
|
||||||
<description>
|
|
||||||
UWB follow-me driver for saltybot — MaUWB ESP32-S3 DW3000 two-anchor
|
|
||||||
ranging, triangulation, Kalman filtering, and /uwb/target PoseStamped publisher.
|
|
||||||
</description>
|
|
||||||
<maintainer email="seb@vayrette.com">seb</maintainer>
|
|
||||||
<license>MIT</license>
|
|
||||||
|
|
||||||
<buildtool_depend>ament_python</buildtool_depend>
|
|
||||||
|
|
||||||
<depend>rclpy</depend>
|
|
||||||
<depend>geometry_msgs</depend>
|
|
||||||
<depend>std_msgs</depend>
|
|
||||||
<depend>saltybot_uwb_msgs</depend>
|
|
||||||
|
|
||||||
<test_depend>ament_copyright</test_depend>
|
|
||||||
<test_depend>ament_flake8</test_depend>
|
|
||||||
<test_depend>ament_pep257</test_depend>
|
|
||||||
<test_depend>pytest</test_depend>
|
|
||||||
|
|
||||||
<export>
|
|
||||||
<build_type>ament_python</build_type>
|
|
||||||
</export>
|
|
||||||
</package>
|
|
||||||
@ -1,278 +0,0 @@
|
|||||||
"""
|
|
||||||
ranging_math.py — Pure triangulation and Kalman filter helpers for UWB follow-me.
|
|
||||||
|
|
||||||
No ROS2 dependencies — importable in unit tests without a ROS2 install.
|
|
||||||
|
|
||||||
2-Anchor geometry
|
|
||||||
─────────────────
|
|
||||||
Anchors are mounted on the robot stem, separated by `sep` metres along the Y-axis:
|
|
||||||
|
|
||||||
A0 (0, +sep/2, anchor_z) ← port side
|
|
||||||
A1 (0, -sep/2, anchor_z) ← starboard side
|
|
||||||
|
|
||||||
The person tag is at (x_t, y_t, tag_z) in base_link.
|
|
||||||
|
|
||||||
Step 1 — height compensation:
|
|
||||||
dz = anchor_z - tag_z
|
|
||||||
r0_h = sqrt(max(0, r0² - dz²))
|
|
||||||
r1_h = sqrt(max(0, r1² - dz²))
|
|
||||||
|
|
||||||
Step 2 — lateral offset (Y):
|
|
||||||
y_t = (r1_h² - r0_h²) / (2 * sep)
|
|
||||||
|
|
||||||
Step 3 — forward distance (X):
|
|
||||||
x_t = sqrt(max(0, r0_h² - (y_t - sep/2)²))
|
|
||||||
|
|
||||||
Returns (x_t, y_t); caller should treat negative x_t as 0.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import math
|
|
||||||
|
|
||||||
|
|
||||||
# ── Triangulation ─────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def triangulate_2anchor(
|
|
||||||
r0: float,
|
|
||||||
r1: float,
|
|
||||||
sep: float,
|
|
||||||
anchor_z: float = 0.0,
|
|
||||||
tag_z: float = 0.0,
|
|
||||||
) -> tuple:
|
|
||||||
"""
|
|
||||||
Triangulate person position in base_link from two anchor ranges.
|
|
||||||
|
|
||||||
Parameters
|
|
||||||
----------
|
|
||||||
r0 : range to anchor-0 (port, y = +sep/2) in metres
|
|
||||||
r1 : range to anchor-1 (starboard, y = -sep/2) in metres
|
|
||||||
sep : anchor separation in metres (centre-to-centre)
|
|
||||||
anchor_z : height of anchors above ground (metres, default 0)
|
|
||||||
tag_z : height of person tag above ground (metres, default 0)
|
|
||||||
|
|
||||||
Returns
|
|
||||||
-------
|
|
||||||
(x_t, y_t) : person position in metres (base_link X = forward, Y = left)
|
|
||||||
x_t is clamped to ≥ 0 (can't be behind anchors geometrically).
|
|
||||||
"""
|
|
||||||
if sep <= 0.0:
|
|
||||||
raise ValueError(f"sep must be > 0, got {sep}")
|
|
||||||
if r0 < 0.0 or r1 < 0.0:
|
|
||||||
raise ValueError(f"ranges must be ≥ 0, got r0={r0}, r1={r1}")
|
|
||||||
|
|
||||||
# Height correction — bring to horizontal plane
|
|
||||||
dz = anchor_z - tag_z
|
|
||||||
dz2 = dz * dz
|
|
||||||
r0_h2 = max(0.0, r0 * r0 - dz2)
|
|
||||||
r1_h2 = max(0.0, r1 * r1 - dz2)
|
|
||||||
r0_h = math.sqrt(r0_h2)
|
|
||||||
|
|
||||||
# Lateral (Y) offset — from circle-circle intersection formula
|
|
||||||
y_t = (r1_h2 - r0_h2) / (2.0 * sep)
|
|
||||||
|
|
||||||
# Forward (X) distance
|
|
||||||
dx2 = r0_h2 - (y_t - sep / 2.0) ** 2
|
|
||||||
x_t = math.sqrt(max(0.0, dx2))
|
|
||||||
|
|
||||||
return x_t, y_t
|
|
||||||
|
|
||||||
|
|
||||||
# ── Simple 2-D Kalman filter ──────────────────────────────────────────────────
|
|
||||||
|
|
||||||
class KalmanFilter2D:
|
|
||||||
"""
|
|
||||||
Constant-velocity Kalman filter for a 2-D position estimate.
|
|
||||||
|
|
||||||
State vector: [x, y, vx, vy]ᵀ
|
|
||||||
|
|
||||||
Parameters
|
|
||||||
----------
|
|
||||||
process_noise : Q scalar — how much we trust the motion model
|
|
||||||
measurement_noise : R scalar — how noisy the UWB measurement is
|
|
||||||
dt : expected update interval in seconds (used for F matrix)
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
process_noise: float = 0.1,
|
|
||||||
measurement_noise: float = 0.3,
|
|
||||||
dt: float = 0.05,
|
|
||||||
):
|
|
||||||
self._q = process_noise
|
|
||||||
self._r = measurement_noise
|
|
||||||
self._dt = dt
|
|
||||||
|
|
||||||
# State: [x, y, vx, vy]
|
|
||||||
self._x = [0.0, 0.0, 0.0, 0.0]
|
|
||||||
|
|
||||||
# Covariance matrix P (4×4, stored as flat list, row-major)
|
|
||||||
init_pos_var = 1.0
|
|
||||||
init_vel_var = 0.5
|
|
||||||
self._P = [
|
|
||||||
init_pos_var, 0.0, 0.0, 0.0,
|
|
||||||
0.0, init_pos_var, 0.0, 0.0,
|
|
||||||
0.0, 0.0, init_vel_var, 0.0,
|
|
||||||
0.0, 0.0, 0.0, init_vel_var,
|
|
||||||
]
|
|
||||||
self._initialised = False
|
|
||||||
|
|
||||||
# ── Matrix helpers (minimal, no numpy — importable without GPU deps) ──────
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _mat_add(A, B):
|
|
||||||
return [a + b for a, b in zip(A, B)]
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _mat_sub(A, B):
|
|
||||||
return [a - b for a, b in zip(A, B)]
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _mat_mul(A, B, n):
|
|
||||||
"""n×n matrix multiply."""
|
|
||||||
C = [0.0] * (n * n)
|
|
||||||
for i in range(n):
|
|
||||||
for k in range(n):
|
|
||||||
for j in range(n):
|
|
||||||
C[i * n + j] += A[i * n + k] * B[k * n + j]
|
|
||||||
return C
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _mat_T(A, n):
|
|
||||||
"""n×n transpose."""
|
|
||||||
T = [0.0] * (n * n)
|
|
||||||
for i in range(n):
|
|
||||||
for j in range(n):
|
|
||||||
T[j * n + i] = A[i * n + j]
|
|
||||||
return T
|
|
||||||
|
|
||||||
def _build_F(self):
|
|
||||||
dt = self._dt
|
|
||||||
return [
|
|
||||||
1.0, 0.0, dt, 0.0,
|
|
||||||
0.0, 1.0, 0.0, dt,
|
|
||||||
0.0, 0.0, 1.0, 0.0,
|
|
||||||
0.0, 0.0, 0.0, 1.0,
|
|
||||||
]
|
|
||||||
|
|
||||||
def _build_Q(self):
|
|
||||||
q = self._q
|
|
||||||
dt = self._dt
|
|
||||||
dt2 = dt * dt
|
|
||||||
dt3 = dt2 * dt
|
|
||||||
dt4 = dt3 * dt
|
|
||||||
# Continuous white-noise model discretised for constant-velocity
|
|
||||||
return [
|
|
||||||
q * dt4 / 4, 0.0, q * dt3 / 2, 0.0,
|
|
||||||
0.0, q * dt4 / 4, 0.0, q * dt3 / 2,
|
|
||||||
q * dt3 / 2, 0.0, q * dt2, 0.0,
|
|
||||||
0.0, q * dt3 / 2, 0.0, q * dt2,
|
|
||||||
]
|
|
||||||
|
|
||||||
def predict(self, dt: float = None):
|
|
||||||
"""Predict step — advances state by dt (defaults to self._dt)."""
|
|
||||||
if dt is not None:
|
|
||||||
self._dt = dt
|
|
||||||
F = self._build_F()
|
|
||||||
Q = self._build_Q()
|
|
||||||
n = 4
|
|
||||||
x = self._x
|
|
||||||
# x = F @ x
|
|
||||||
new_x = [
|
|
||||||
F[0] * x[0] + F[1] * x[1] + F[2] * x[2] + F[3] * x[3],
|
|
||||||
F[4] * x[0] + F[5] * x[1] + F[6] * x[2] + F[7] * x[3],
|
|
||||||
F[8] * x[0] + F[9] * x[1] + F[10] * x[2] + F[11] * x[3],
|
|
||||||
F[12] * x[0] + F[13] * x[1] + F[14] * x[2] + F[15] * x[3],
|
|
||||||
]
|
|
||||||
# P = F @ P @ F.T + Q
|
|
||||||
FP = self._mat_mul(F, self._P, n)
|
|
||||||
FT = self._mat_T(F, n)
|
|
||||||
FPF = self._mat_mul(FP, FT, n)
|
|
||||||
self._P = self._mat_add(FPF, Q)
|
|
||||||
self._x = new_x
|
|
||||||
|
|
||||||
def update(self, x_meas: float, y_meas: float):
|
|
||||||
"""
|
|
||||||
Update step with a 2-D position measurement.
|
|
||||||
|
|
||||||
H = [[1,0,0,0],[0,1,0,0]] (observe position only)
|
|
||||||
"""
|
|
||||||
if not self._initialised:
|
|
||||||
self._x[0] = x_meas
|
|
||||||
self._x[1] = y_meas
|
|
||||||
self._initialised = True
|
|
||||||
return
|
|
||||||
|
|
||||||
# Innovation
|
|
||||||
innov_x = x_meas - self._x[0]
|
|
||||||
innov_y = y_meas - self._x[1]
|
|
||||||
|
|
||||||
# S = H @ P @ H.T + R (2×2, position rows/cols only)
|
|
||||||
P = self._P
|
|
||||||
r = self._r
|
|
||||||
# H picks rows 0,1 and cols 0,1 from P
|
|
||||||
S00 = P[0] + r
|
|
||||||
S01 = P[1]
|
|
||||||
S10 = P[4]
|
|
||||||
S11 = P[5] + r
|
|
||||||
|
|
||||||
# K = P @ H.T @ S⁻¹ (4×2 Kalman gain)
|
|
||||||
det = S00 * S11 - S01 * S10
|
|
||||||
if abs(det) < 1e-12:
|
|
||||||
return
|
|
||||||
inv_det = 1.0 / det
|
|
||||||
# S⁻¹
|
|
||||||
Si00 = S11 * inv_det
|
|
||||||
Si01 = -S01 * inv_det
|
|
||||||
Si10 = -S10 * inv_det
|
|
||||||
Si11 = S00 * inv_det
|
|
||||||
|
|
||||||
# P @ H.T = first two columns of P (H.T selects cols 0,1)
|
|
||||||
PH = [P[0], P[1], P[4], P[5], P[8], P[9], P[12], P[13]] # 4×2
|
|
||||||
|
|
||||||
# K (4×2) = PH @ Si
|
|
||||||
K = [
|
|
||||||
PH[0] * Si00 + PH[1] * Si10, PH[0] * Si01 + PH[1] * Si11,
|
|
||||||
PH[2] * Si00 + PH[3] * Si10, PH[2] * Si01 + PH[3] * Si11,
|
|
||||||
PH[4] * Si00 + PH[5] * Si10, PH[4] * Si01 + PH[5] * Si11,
|
|
||||||
PH[6] * Si00 + PH[7] * Si10, PH[6] * Si01 + PH[7] * Si11,
|
|
||||||
]
|
|
||||||
|
|
||||||
# x = x + K @ innov
|
|
||||||
self._x[0] += K[0] * innov_x + K[1] * innov_y
|
|
||||||
self._x[1] += K[2] * innov_x + K[3] * innov_y
|
|
||||||
self._x[2] += K[4] * innov_x + K[5] * innov_y
|
|
||||||
self._x[3] += K[6] * innov_x + K[7] * innov_y
|
|
||||||
|
|
||||||
# P = (I - K @ H) @ P (Joseph form for numerical stability skipped —
|
|
||||||
# this is a low-latency embedded context where simplicity wins)
|
|
||||||
n = 4
|
|
||||||
# K @ H selects first two columns → rows of P
|
|
||||||
KH = [0.0] * 16
|
|
||||||
for row in range(4):
|
|
||||||
KH[row * 4 + 0] = K[row * 2 + 0]
|
|
||||||
KH[row * 4 + 1] = K[row * 2 + 1]
|
|
||||||
# I - KH
|
|
||||||
IKH = [-KH[i] for i in range(16)]
|
|
||||||
for i in range(4):
|
|
||||||
IKH[i * 4 + i] += 1.0
|
|
||||||
self._P = self._mat_mul(IKH, self._P, n)
|
|
||||||
|
|
||||||
def position(self) -> tuple:
|
|
||||||
"""Return current (x, y) position estimate."""
|
|
||||||
return self._x[0], self._x[1]
|
|
||||||
|
|
||||||
def velocity(self) -> tuple:
|
|
||||||
"""Return current (vx, vy) velocity estimate."""
|
|
||||||
return self._x[2], self._x[3]
|
|
||||||
|
|
||||||
def reset(self):
|
|
||||||
"""Reset filter to uninitialised state."""
|
|
||||||
self._x = [0.0, 0.0, 0.0, 0.0]
|
|
||||||
init_pos_var = 1.0
|
|
||||||
init_vel_var = 0.5
|
|
||||||
self._P = [
|
|
||||||
init_pos_var, 0.0, 0.0, 0.0,
|
|
||||||
0.0, init_pos_var, 0.0, 0.0,
|
|
||||||
0.0, 0.0, init_vel_var, 0.0,
|
|
||||||
0.0, 0.0, 0.0, init_vel_var,
|
|
||||||
]
|
|
||||||
self._initialised = False
|
|
||||||
@ -1,306 +0,0 @@
|
|||||||
"""
|
|
||||||
uwb_driver_node.py — ROS2 node for MaUWB ESP32-S3 DW3000 follow-me system.
|
|
||||||
|
|
||||||
Hardware
|
|
||||||
────────
|
|
||||||
• 2× MaUWB ESP32-S3 DW3000 anchors on robot stem (USB → Orin Nano)
|
|
||||||
- Anchor-0: port side (y = +sep/2)
|
|
||||||
- Anchor-1: starboard (y = -sep/2)
|
|
||||||
• 1× MaUWB tag on person (belt clip)
|
|
||||||
|
|
||||||
AT command interface (115200 8N1)
|
|
||||||
──────────────────────────────────
|
|
||||||
Query: AT+RANGE?\r\n
|
|
||||||
Response (from anchors):
|
|
||||||
+RANGE:<anchor_id>,<range_mm>[,<rssi>]\r\n
|
|
||||||
|
|
||||||
Config:
|
|
||||||
AT+anchor_tag=ANCHOR\r\n — set module as anchor
|
|
||||||
AT+anchor_tag=TAG\r\n — set module as tag
|
|
||||||
|
|
||||||
Publishes
|
|
||||||
─────────
|
|
||||||
/uwb/target (geometry_msgs/PoseStamped) — triangulated person position in base_link
|
|
||||||
/uwb/ranges (saltybot_uwb_msgs/UwbRangeArray) — raw ranges from both anchors
|
|
||||||
|
|
||||||
Safety
|
|
||||||
──────
|
|
||||||
If a range is stale (> range_timeout_s), that anchor is excluded from
|
|
||||||
triangulation. If both anchors are stale, /uwb/target is not published.
|
|
||||||
|
|
||||||
Usage
|
|
||||||
─────
|
|
||||||
ros2 launch saltybot_uwb uwb.launch.py
|
|
||||||
ros2 launch saltybot_uwb uwb.launch.py port_a:=/dev/ttyUSB2 port_b:=/dev/ttyUSB3
|
|
||||||
"""
|
|
||||||
|
|
||||||
import math
|
|
||||||
import re
|
|
||||||
import threading
|
|
||||||
import time
|
|
||||||
|
|
||||||
import rclpy
|
|
||||||
from rclpy.node import Node
|
|
||||||
from geometry_msgs.msg import PoseStamped
|
|
||||||
from std_msgs.msg import Header
|
|
||||||
|
|
||||||
from saltybot_uwb_msgs.msg import UwbRange, UwbRangeArray
|
|
||||||
from saltybot_uwb.ranging_math import triangulate_2anchor, KalmanFilter2D
|
|
||||||
|
|
||||||
try:
|
|
||||||
import serial
|
|
||||||
_SERIAL_AVAILABLE = True
|
|
||||||
except ImportError:
|
|
||||||
_SERIAL_AVAILABLE = False
|
|
||||||
|
|
||||||
|
|
||||||
# Regex: +RANGE:<id>,<mm> or +RANGE:<id>,<mm>,<rssi>
|
|
||||||
_RANGE_RE = re.compile(
|
|
||||||
r"\+RANGE\s*:\s*(\d+)\s*,\s*(\d+)(?:\s*,\s*(-?[\d.]+))?",
|
|
||||||
re.IGNORECASE,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class SerialReader(threading.Thread):
|
|
||||||
"""
|
|
||||||
Background thread: polls an anchor's UART, fires callback on every
|
|
||||||
valid +RANGE response.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, port, baudrate, anchor_id, callback, logger):
|
|
||||||
super().__init__(daemon=True)
|
|
||||||
self._port = port
|
|
||||||
self._baudrate = baudrate
|
|
||||||
self._anchor_id = anchor_id
|
|
||||||
self._callback = callback
|
|
||||||
self._logger = logger
|
|
||||||
self._running = False
|
|
||||||
self._ser = None
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
self._running = True
|
|
||||||
while self._running:
|
|
||||||
try:
|
|
||||||
self._ser = serial.Serial(
|
|
||||||
self._port, self._baudrate, timeout=0.5
|
|
||||||
)
|
|
||||||
self._logger.info(
|
|
||||||
f"Anchor-{self._anchor_id}: opened {self._port}"
|
|
||||||
)
|
|
||||||
self._read_loop()
|
|
||||||
except Exception as exc:
|
|
||||||
self._logger.warn(
|
|
||||||
f"Anchor-{self._anchor_id} serial error: {exc} — retrying in 2s"
|
|
||||||
)
|
|
||||||
if self._ser and self._ser.is_open:
|
|
||||||
self._ser.close()
|
|
||||||
time.sleep(2.0)
|
|
||||||
|
|
||||||
def _read_loop(self):
|
|
||||||
while self._running:
|
|
||||||
try:
|
|
||||||
# Query the anchor
|
|
||||||
self._ser.write(b"AT+RANGE?\r\n")
|
|
||||||
# Read up to 10 lines waiting for a +RANGE response
|
|
||||||
for _ in range(10):
|
|
||||||
raw = self._ser.readline()
|
|
||||||
if not raw:
|
|
||||||
break
|
|
||||||
line = raw.decode("ascii", errors="replace").strip()
|
|
||||||
m = _RANGE_RE.search(line)
|
|
||||||
if m:
|
|
||||||
range_mm = int(m.group(2))
|
|
||||||
rssi = float(m.group(3)) if m.group(3) else 0.0
|
|
||||||
self._callback(self._anchor_id, range_mm, rssi)
|
|
||||||
break
|
|
||||||
except Exception as exc:
|
|
||||||
self._logger.warn(
|
|
||||||
f"Anchor-{self._anchor_id} read error: {exc}"
|
|
||||||
)
|
|
||||||
break # trigger reconnect
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
self._running = False
|
|
||||||
if self._ser and self._ser.is_open:
|
|
||||||
self._ser.close()
|
|
||||||
|
|
||||||
|
|
||||||
class UwbDriverNode(Node):
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
super().__init__("uwb_driver")
|
|
||||||
|
|
||||||
# ── Parameters ────────────────────────────────────────────────────────
|
|
||||||
self.declare_parameter("port_a", "/dev/ttyUSB0")
|
|
||||||
self.declare_parameter("port_b", "/dev/ttyUSB1")
|
|
||||||
self.declare_parameter("baudrate", 115200)
|
|
||||||
self.declare_parameter("anchor_separation", 0.25)
|
|
||||||
self.declare_parameter("anchor_height", 0.80)
|
|
||||||
self.declare_parameter("tag_height", 0.90)
|
|
||||||
self.declare_parameter("range_timeout_s", 1.0)
|
|
||||||
self.declare_parameter("max_range_m", 8.0)
|
|
||||||
self.declare_parameter("min_range_m", 0.05)
|
|
||||||
self.declare_parameter("kf_process_noise", 0.1)
|
|
||||||
self.declare_parameter("kf_meas_noise", 0.3)
|
|
||||||
self.declare_parameter("publish_rate", 20.0)
|
|
||||||
|
|
||||||
self._p = self._load_params()
|
|
||||||
|
|
||||||
# ── State (protected by lock) ──────────────────────────────────────
|
|
||||||
self._lock = threading.Lock()
|
|
||||||
self._ranges = {} # anchor_id → (range_m, rssi, timestamp)
|
|
||||||
self._kf = KalmanFilter2D(
|
|
||||||
process_noise=self._p["kf_process_noise"],
|
|
||||||
measurement_noise=self._p["kf_meas_noise"],
|
|
||||||
dt=1.0 / self._p["publish_rate"],
|
|
||||||
)
|
|
||||||
|
|
||||||
# ── Publishers ────────────────────────────────────────────────────
|
|
||||||
self._target_pub = self.create_publisher(
|
|
||||||
PoseStamped, "/uwb/target", 10)
|
|
||||||
self._ranges_pub = self.create_publisher(
|
|
||||||
UwbRangeArray, "/uwb/ranges", 10)
|
|
||||||
|
|
||||||
# ── Serial readers ────────────────────────────────────────────────
|
|
||||||
if _SERIAL_AVAILABLE:
|
|
||||||
self._reader_a = SerialReader(
|
|
||||||
self._p["port_a"], self._p["baudrate"],
|
|
||||||
anchor_id=0, callback=self._range_cb,
|
|
||||||
logger=self.get_logger(),
|
|
||||||
)
|
|
||||||
self._reader_b = SerialReader(
|
|
||||||
self._p["port_b"], self._p["baudrate"],
|
|
||||||
anchor_id=1, callback=self._range_cb,
|
|
||||||
logger=self.get_logger(),
|
|
||||||
)
|
|
||||||
self._reader_a.start()
|
|
||||||
self._reader_b.start()
|
|
||||||
else:
|
|
||||||
self.get_logger().warn(
|
|
||||||
"pyserial not installed — running in simulation mode (no serial I/O)"
|
|
||||||
)
|
|
||||||
|
|
||||||
# ── Publish timer ─────────────────────────────────────────────────
|
|
||||||
self._timer = self.create_timer(
|
|
||||||
1.0 / self._p["publish_rate"], self._publish_cb
|
|
||||||
)
|
|
||||||
|
|
||||||
self.get_logger().info(
|
|
||||||
f"UWB driver ready sep={self._p['anchor_separation']}m "
|
|
||||||
f"ports={self._p['port_a']},{self._p['port_b']} "
|
|
||||||
f"rate={self._p['publish_rate']}Hz"
|
|
||||||
)
|
|
||||||
|
|
||||||
# ── Helpers ───────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def _load_params(self):
|
|
||||||
return {
|
|
||||||
"port_a": self.get_parameter("port_a").value,
|
|
||||||
"port_b": self.get_parameter("port_b").value,
|
|
||||||
"baudrate": self.get_parameter("baudrate").value,
|
|
||||||
"anchor_separation": self.get_parameter("anchor_separation").value,
|
|
||||||
"anchor_height": self.get_parameter("anchor_height").value,
|
|
||||||
"tag_height": self.get_parameter("tag_height").value,
|
|
||||||
"range_timeout_s": self.get_parameter("range_timeout_s").value,
|
|
||||||
"max_range_m": self.get_parameter("max_range_m").value,
|
|
||||||
"min_range_m": self.get_parameter("min_range_m").value,
|
|
||||||
"kf_process_noise": self.get_parameter("kf_process_noise").value,
|
|
||||||
"kf_meas_noise": self.get_parameter("kf_meas_noise").value,
|
|
||||||
"publish_rate": self.get_parameter("publish_rate").value,
|
|
||||||
}
|
|
||||||
|
|
||||||
# ── Callbacks ─────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def _range_cb(self, anchor_id: int, range_mm: int, rssi: float):
|
|
||||||
"""Called from serial reader threads — thread-safe update."""
|
|
||||||
range_m = range_mm / 1000.0
|
|
||||||
p = self._p
|
|
||||||
if range_m < p["min_range_m"] or range_m > p["max_range_m"]:
|
|
||||||
return
|
|
||||||
with self._lock:
|
|
||||||
self._ranges[anchor_id] = (range_m, rssi, time.monotonic())
|
|
||||||
|
|
||||||
def _publish_cb(self):
|
|
||||||
now = time.monotonic()
|
|
||||||
timeout = self._p["range_timeout_s"]
|
|
||||||
sep = self._p["anchor_separation"]
|
|
||||||
|
|
||||||
with self._lock:
|
|
||||||
# Collect valid (non-stale) ranges
|
|
||||||
valid = {}
|
|
||||||
for aid, (r, rssi, t) in self._ranges.items():
|
|
||||||
if now - t <= timeout:
|
|
||||||
valid[aid] = (r, rssi, t)
|
|
||||||
|
|
||||||
# Build and publish UwbRangeArray regardless (even if partial)
|
|
||||||
hdr = Header()
|
|
||||||
hdr.stamp = self.get_clock().now().to_msg()
|
|
||||||
hdr.frame_id = "base_link"
|
|
||||||
|
|
||||||
arr = UwbRangeArray()
|
|
||||||
arr.header = hdr
|
|
||||||
for aid, (r, rssi, _) in valid.items():
|
|
||||||
entry = UwbRange()
|
|
||||||
entry.header = hdr
|
|
||||||
entry.anchor_id = aid
|
|
||||||
entry.range_m = float(r)
|
|
||||||
entry.raw_mm = int(round(r * 1000.0))
|
|
||||||
entry.rssi = float(rssi)
|
|
||||||
arr.ranges.append(entry)
|
|
||||||
self._ranges_pub.publish(arr)
|
|
||||||
|
|
||||||
# Need both anchors to triangulate
|
|
||||||
if 0 not in valid or 1 not in valid:
|
|
||||||
return
|
|
||||||
|
|
||||||
r0 = valid[0][0]
|
|
||||||
r1 = valid[1][0]
|
|
||||||
|
|
||||||
try:
|
|
||||||
x_t, y_t = triangulate_2anchor(
|
|
||||||
r0=r0,
|
|
||||||
r1=r1,
|
|
||||||
sep=sep,
|
|
||||||
anchor_z=self._p["anchor_height"],
|
|
||||||
tag_z=self._p["tag_height"],
|
|
||||||
)
|
|
||||||
except (ValueError, ZeroDivisionError) as exc:
|
|
||||||
self.get_logger().warn(f"Triangulation error: {exc}")
|
|
||||||
return
|
|
||||||
|
|
||||||
# Kalman filter update
|
|
||||||
dt = 1.0 / self._p["publish_rate"]
|
|
||||||
self._kf.predict(dt=dt)
|
|
||||||
self._kf.update(x_t, y_t)
|
|
||||||
kx, ky = self._kf.position()
|
|
||||||
|
|
||||||
# Publish PoseStamped in base_link
|
|
||||||
pose = PoseStamped()
|
|
||||||
pose.header = hdr
|
|
||||||
pose.pose.position.x = kx
|
|
||||||
pose.pose.position.y = ky
|
|
||||||
pose.pose.position.z = 0.0
|
|
||||||
# Orientation: face the person (yaw = atan2(y, x))
|
|
||||||
yaw = math.atan2(ky, kx)
|
|
||||||
pose.pose.orientation.z = math.sin(yaw / 2.0)
|
|
||||||
pose.pose.orientation.w = math.cos(yaw / 2.0)
|
|
||||||
|
|
||||||
self._target_pub.publish(pose)
|
|
||||||
|
|
||||||
|
|
||||||
# ── Entry point ───────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def main(args=None):
|
|
||||||
rclpy.init(args=args)
|
|
||||||
node = UwbDriverNode()
|
|
||||||
try:
|
|
||||||
rclpy.spin(node)
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
pass
|
|
||||||
finally:
|
|
||||||
node.destroy_node()
|
|
||||||
rclpy.try_shutdown()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
||||||
@ -1,4 +0,0 @@
|
|||||||
[develop]
|
|
||||||
script_dir=$base/lib/saltybot_uwb
|
|
||||||
[install]
|
|
||||||
install_scripts=$base/lib/saltybot_uwb
|
|
||||||
@ -1,32 +0,0 @@
|
|||||||
from setuptools import setup
|
|
||||||
import os
|
|
||||||
from glob import glob
|
|
||||||
|
|
||||||
package_name = "saltybot_uwb"
|
|
||||||
|
|
||||||
setup(
|
|
||||||
name=package_name,
|
|
||||||
version="0.1.0",
|
|
||||||
packages=[package_name],
|
|
||||||
data_files=[
|
|
||||||
("share/ament_index/resource_index/packages",
|
|
||||||
["resource/" + package_name]),
|
|
||||||
("share/" + package_name, ["package.xml"]),
|
|
||||||
(os.path.join("share", package_name, "launch"),
|
|
||||||
glob("launch/*.py")),
|
|
||||||
(os.path.join("share", package_name, "config"),
|
|
||||||
glob("config/*.yaml")),
|
|
||||||
],
|
|
||||||
install_requires=["setuptools"],
|
|
||||||
zip_safe=True,
|
|
||||||
maintainer="seb",
|
|
||||||
maintainer_email="seb@vayrette.com",
|
|
||||||
description="UWB follow-me driver for saltybot (MaUWB ESP32-S3 DW3000)",
|
|
||||||
license="MIT",
|
|
||||||
tests_require=["pytest"],
|
|
||||||
entry_points={
|
|
||||||
"console_scripts": [
|
|
||||||
"uwb_driver = saltybot_uwb.uwb_driver_node:main",
|
|
||||||
],
|
|
||||||
},
|
|
||||||
)
|
|
||||||
@ -1,174 +0,0 @@
|
|||||||
"""
|
|
||||||
test_ranging_math.py — Unit tests for UWB triangulation and Kalman filter.
|
|
||||||
|
|
||||||
No ROS2 / serial / GPU dependencies — runs with plain pytest.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import math
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
from saltybot_uwb.ranging_math import triangulate_2anchor, KalmanFilter2D
|
|
||||||
|
|
||||||
|
|
||||||
# ── triangulate_2anchor ───────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
class TestTriangulate2Anchor:
|
|
||||||
|
|
||||||
def test_person_directly_ahead(self):
|
|
||||||
"""Person 2m ahead, centred: x≈2, y≈0."""
|
|
||||||
sep = 0.25
|
|
||||||
# A0 at y=+0.125, A1 at y=-0.125 → symmetric → y_t ≈ 0
|
|
||||||
r0 = math.sqrt(2.0**2 + 0.125**2)
|
|
||||||
r1 = r0 # symmetric
|
|
||||||
x, y = triangulate_2anchor(r0, r1, sep)
|
|
||||||
assert abs(x - 2.0) < 0.02
|
|
||||||
assert abs(y) < 0.01
|
|
||||||
|
|
||||||
def test_person_offset_left(self):
|
|
||||||
"""Person 2m ahead, 0.5m to the left: y_t ≈ +0.5."""
|
|
||||||
sep = 0.25
|
|
||||||
# A0 at y=+0.125, A1 at y=-0.125
|
|
||||||
px, py = 2.0, 0.5
|
|
||||||
r0 = math.sqrt(px**2 + (py - 0.125)**2)
|
|
||||||
r1 = math.sqrt(px**2 + (py + 0.125)**2)
|
|
||||||
x, y = triangulate_2anchor(r0, r1, sep)
|
|
||||||
assert abs(x - px) < 0.05
|
|
||||||
assert abs(y - py) < 0.05
|
|
||||||
|
|
||||||
def test_person_offset_right(self):
|
|
||||||
"""Person 2m ahead, 0.5m to the right: y_t ≈ -0.5."""
|
|
||||||
sep = 0.25
|
|
||||||
px, py = 2.0, -0.5
|
|
||||||
r0 = math.sqrt(px**2 + (py - 0.125)**2)
|
|
||||||
r1 = math.sqrt(px**2 + (py + 0.125)**2)
|
|
||||||
x, y = triangulate_2anchor(r0, r1, sep)
|
|
||||||
assert abs(x - px) < 0.05
|
|
||||||
assert abs(y - py) < 0.05
|
|
||||||
|
|
||||||
def test_height_compensation_reduces_horizontal_range(self):
|
|
||||||
"""With height difference, horizontal range < slant range → x smaller."""
|
|
||||||
sep = 0.25
|
|
||||||
# No height: x ≈ 2.0
|
|
||||||
x_flat, _ = triangulate_2anchor(2.0, 2.0, sep, anchor_z=0.0, tag_z=0.0)
|
|
||||||
# Same slant range but 0.5m height difference
|
|
||||||
slant = math.sqrt(2.0**2 + 0.5**2)
|
|
||||||
x_tall, _ = triangulate_2anchor(slant, slant, sep, anchor_z=0.8, tag_z=0.3)
|
|
||||||
# x_tall should be close to 2.0 (height-compensated back to horizontal)
|
|
||||||
assert abs(x_tall - x_flat) < 0.05
|
|
||||||
|
|
||||||
def test_returns_nonnegative_x(self):
|
|
||||||
"""x_t is always ≥ 0 (person can't be geometrically behind anchors)."""
|
|
||||||
sep = 0.25
|
|
||||||
# Pathological: extremely mismatched ranges → sqrt clamped to 0
|
|
||||||
x, _ = triangulate_2anchor(0.1, 0.1, sep)
|
|
||||||
assert x >= 0.0
|
|
||||||
|
|
||||||
def test_invalid_sep_raises(self):
|
|
||||||
with pytest.raises(ValueError):
|
|
||||||
triangulate_2anchor(1.0, 1.0, sep=0.0)
|
|
||||||
|
|
||||||
def test_negative_range_raises(self):
|
|
||||||
with pytest.raises(ValueError):
|
|
||||||
triangulate_2anchor(-1.0, 1.0, sep=0.25)
|
|
||||||
|
|
||||||
def test_close_range(self):
|
|
||||||
"""Person very close (0.3m) — should not crash."""
|
|
||||||
sep = 0.25
|
|
||||||
px, py = 0.3, 0.0
|
|
||||||
r0 = math.sqrt(px**2 + 0.125**2)
|
|
||||||
r1 = r0
|
|
||||||
x, y = triangulate_2anchor(r0, r1, sep)
|
|
||||||
assert x >= 0.0
|
|
||||||
|
|
||||||
def test_large_range(self):
|
|
||||||
"""Person 7m ahead — within DW3000 practical range."""
|
|
||||||
sep = 0.25
|
|
||||||
r0 = math.sqrt(7.0**2 + 0.125**2)
|
|
||||||
r1 = r0
|
|
||||||
x, y = triangulate_2anchor(r0, r1, sep)
|
|
||||||
assert abs(x - 7.0) < 0.1
|
|
||||||
assert abs(y) < 0.05
|
|
||||||
|
|
||||||
|
|
||||||
# ── KalmanFilter2D ────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
class TestKalmanFilter2D:
|
|
||||||
|
|
||||||
def test_initial_position_after_first_update(self):
|
|
||||||
"""First update initialises position to measurement."""
|
|
||||||
kf = KalmanFilter2D()
|
|
||||||
kf.predict()
|
|
||||||
kf.update(3.0, 1.0)
|
|
||||||
x, y = kf.position()
|
|
||||||
assert abs(x - 3.0) < 0.01
|
|
||||||
assert abs(y - 1.0) < 0.01
|
|
||||||
|
|
||||||
def test_converges_to_stationary_target(self):
|
|
||||||
"""Repeated updates to same position → Kalman converges to it."""
|
|
||||||
kf = KalmanFilter2D(process_noise=0.1, measurement_noise=0.3)
|
|
||||||
for _ in range(50):
|
|
||||||
kf.predict(dt=0.05)
|
|
||||||
kf.update(2.0, 0.5)
|
|
||||||
x, y = kf.position()
|
|
||||||
assert abs(x - 2.0) < 0.1
|
|
||||||
assert abs(y - 0.5) < 0.1
|
|
||||||
|
|
||||||
def test_smooths_noisy_measurements(self):
|
|
||||||
"""Filter output should be smoother than raw measurements."""
|
|
||||||
import random
|
|
||||||
rng = random.Random(42)
|
|
||||||
kf = KalmanFilter2D(process_noise=0.05, measurement_noise=0.5)
|
|
||||||
raw_errors = []
|
|
||||||
kf_errors = []
|
|
||||||
true_x, true_y = 2.0, 1.0
|
|
||||||
for _ in range(30):
|
|
||||||
noisy_x = true_x + rng.gauss(0, 0.3)
|
|
||||||
noisy_y = true_y + rng.gauss(0, 0.3)
|
|
||||||
kf.predict(dt=0.05)
|
|
||||||
kf.update(noisy_x, noisy_y)
|
|
||||||
kx, ky = kf.position()
|
|
||||||
raw_errors.append((noisy_x - true_x)**2 + (noisy_y - true_y)**2)
|
|
||||||
kf_errors.append((kx - true_x)**2 + (ky - true_y)**2)
|
|
||||||
avg_raw = sum(raw_errors[-10:]) / 10
|
|
||||||
avg_kf = sum(kf_errors[-10:]) / 10
|
|
||||||
assert avg_kf < avg_raw # filter is smoother
|
|
||||||
|
|
||||||
def test_reset_clears_state(self):
|
|
||||||
"""After reset(), position returns to ~0."""
|
|
||||||
kf = KalmanFilter2D()
|
|
||||||
for _ in range(10):
|
|
||||||
kf.predict()
|
|
||||||
kf.update(5.0, 3.0)
|
|
||||||
kf.reset()
|
|
||||||
kf.predict()
|
|
||||||
kf.update(0.0, 0.0)
|
|
||||||
x, y = kf.position()
|
|
||||||
assert abs(x) < 0.1
|
|
||||||
assert abs(y) < 0.1
|
|
||||||
|
|
||||||
def test_velocity_nonzero_for_moving_target(self):
|
|
||||||
"""For a target moving at constant velocity, vx should be detected."""
|
|
||||||
kf = KalmanFilter2D(process_noise=1.0, measurement_noise=0.1, dt=0.1)
|
|
||||||
# Target moving at 0.5 m/s in X
|
|
||||||
for i in range(40):
|
|
||||||
kf.predict(dt=0.1)
|
|
||||||
kf.update(0.5 * i * 0.1, 0.0)
|
|
||||||
vx, vy = kf.velocity()
|
|
||||||
assert abs(vx - 0.5) < 0.15
|
|
||||||
|
|
||||||
def test_custom_dt(self):
|
|
||||||
"""predict() accepts custom dt without raising."""
|
|
||||||
kf = KalmanFilter2D()
|
|
||||||
kf.predict(dt=0.02)
|
|
||||||
kf.update(1.0, 1.0)
|
|
||||||
x, y = kf.position()
|
|
||||||
assert abs(x - 1.0) < 0.01
|
|
||||||
|
|
||||||
def test_no_nan_on_zero_measurement(self):
|
|
||||||
"""Zero measurement should not produce NaN."""
|
|
||||||
kf = KalmanFilter2D()
|
|
||||||
kf.predict()
|
|
||||||
kf.update(0.0, 0.0)
|
|
||||||
x, y = kf.position()
|
|
||||||
assert not math.isnan(x)
|
|
||||||
assert not math.isnan(y)
|
|
||||||
@ -1,15 +0,0 @@
|
|||||||
cmake_minimum_required(VERSION 3.8)
|
|
||||||
project(saltybot_uwb_msgs)
|
|
||||||
|
|
||||||
find_package(ament_cmake REQUIRED)
|
|
||||||
find_package(std_msgs REQUIRED)
|
|
||||||
find_package(rosidl_default_generators REQUIRED)
|
|
||||||
|
|
||||||
rosidl_generate_interfaces(${PROJECT_NAME}
|
|
||||||
"msg/UwbRange.msg"
|
|
||||||
"msg/UwbRangeArray.msg"
|
|
||||||
DEPENDENCIES std_msgs
|
|
||||||
)
|
|
||||||
|
|
||||||
ament_export_dependencies(rosidl_default_runtime)
|
|
||||||
ament_package()
|
|
||||||
@ -1,13 +0,0 @@
|
|||||||
# UwbRange.msg — range measurement from a single UWB anchor
|
|
||||||
#
|
|
||||||
# anchor_id : 0 = anchor-0 (port side), 1 = anchor-1 (starboard side)
|
|
||||||
# range_m : measured horizontal range in metres (after height correction)
|
|
||||||
# raw_mm : raw TWR range from AT+RANGE? response, millimetres
|
|
||||||
# rssi : received signal strength (dBm), 0 if not reported by module
|
|
||||||
|
|
||||||
std_msgs/Header header
|
|
||||||
|
|
||||||
uint8 anchor_id
|
|
||||||
float32 range_m
|
|
||||||
uint32 raw_mm
|
|
||||||
float32 rssi
|
|
||||||
@ -1,8 +0,0 @@
|
|||||||
# UwbRangeArray.msg — all UWB anchor ranges in one message
|
|
||||||
#
|
|
||||||
# Published by uwb_driver_node on /uwb/ranges.
|
|
||||||
# Contains one UwbRange entry per active anchor.
|
|
||||||
|
|
||||||
std_msgs/Header header
|
|
||||||
|
|
||||||
saltybot_uwb_msgs/UwbRange[] ranges
|
|
||||||
@ -1,22 +0,0 @@
|
|||||||
<?xml version="1.0"?>
|
|
||||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
|
||||||
<package format="3">
|
|
||||||
<name>saltybot_uwb_msgs</name>
|
|
||||||
<version>0.1.0</version>
|
|
||||||
<description>Custom ROS2 message definitions for UWB ranging (saltybot).</description>
|
|
||||||
<maintainer email="seb@vayrette.com">seb</maintainer>
|
|
||||||
<license>MIT</license>
|
|
||||||
|
|
||||||
<buildtool_depend>ament_cmake</buildtool_depend>
|
|
||||||
<buildtool_depend>rosidl_default_generators</buildtool_depend>
|
|
||||||
|
|
||||||
<depend>std_msgs</depend>
|
|
||||||
|
|
||||||
<exec_depend>rosidl_default_runtime</exec_depend>
|
|
||||||
|
|
||||||
<member_of_group>rosidl_interface_packages</member_of_group>
|
|
||||||
|
|
||||||
<export>
|
|
||||||
<build_type>ament_cmake</build_type>
|
|
||||||
</export>
|
|
||||||
</package>
|
|
||||||
Loading…
x
Reference in New Issue
Block a user