diff --git a/jetson/ros2_ws/src/saltybot_dynamic_obs_msgs/CMakeLists.txt b/jetson/ros2_ws/src/saltybot_dynamic_obs_msgs/CMakeLists.txt new file mode 100644 index 0000000..737df82 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_dynamic_obs_msgs/CMakeLists.txt @@ -0,0 +1,16 @@ +cmake_minimum_required(VERSION 3.8) +project(saltybot_dynamic_obs_msgs) + +find_package(ament_cmake REQUIRED) +find_package(rosidl_default_generators REQUIRED) +find_package(std_msgs REQUIRED) +find_package(geometry_msgs REQUIRED) + +rosidl_generate_interfaces(${PROJECT_NAME} + "msg/TrackedObject.msg" + "msg/MovingObjectArray.msg" + DEPENDENCIES std_msgs geometry_msgs +) + +ament_export_dependencies(rosidl_default_runtime) +ament_package() diff --git a/jetson/ros2_ws/src/saltybot_dynamic_obs_msgs/msg/MovingObjectArray.msg b/jetson/ros2_ws/src/saltybot_dynamic_obs_msgs/msg/MovingObjectArray.msg new file mode 100644 index 0000000..619d2a5 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_dynamic_obs_msgs/msg/MovingObjectArray.msg @@ -0,0 +1,12 @@ +# MovingObjectArray — all currently tracked moving obstacles. +# +# Published at ~10 Hz on /saltybot/moving_objects. +# Only confirmed tracks (hits >= confirm_frames) appear here. + +std_msgs/Header header + +saltybot_dynamic_obs_msgs/TrackedObject[] objects + +uint32 active_count # number of confirmed tracks +uint32 tentative_count # tracks not yet confirmed +float32 detector_latency_ms # pipeline latency hint diff --git a/jetson/ros2_ws/src/saltybot_dynamic_obs_msgs/msg/TrackedObject.msg b/jetson/ros2_ws/src/saltybot_dynamic_obs_msgs/msg/TrackedObject.msg new file mode 100644 index 0000000..d5d3d9b --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_dynamic_obs_msgs/msg/TrackedObject.msg @@ -0,0 +1,21 @@ +# TrackedObject — a single tracked moving obstacle. +# +# predicted_path[i] is the estimated pose at predicted_times[i] seconds from now. +# All poses are in the same frame as header.frame_id (typically 'odom'). + +std_msgs/Header header + +uint32 object_id # stable ID across frames (monotonically increasing) + +geometry_msgs/PoseWithCovariance pose # current best-estimate pose (x, y, yaw) +geometry_msgs/Vector3 velocity # vx, vy in m/s (vz = 0 for ground objects) + +geometry_msgs/Pose[] predicted_path # future positions at predicted_times +float32[] predicted_times # seconds from header.stamp for each pose + +float32 speed_mps # scalar |v| +float32 confidence # 0.0–1.0 (higher after more confirmed frames) + +uint32 age_frames # frames since first detection +uint32 hits # number of successful associations +bool is_valid # false if in tentative / just-created state diff --git a/jetson/ros2_ws/src/saltybot_dynamic_obs_msgs/package.xml b/jetson/ros2_ws/src/saltybot_dynamic_obs_msgs/package.xml new file mode 100644 index 0000000..a768d97 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_dynamic_obs_msgs/package.xml @@ -0,0 +1,23 @@ + + + + saltybot_dynamic_obs_msgs + 0.1.0 + Custom message types for dynamic obstacle tracking. + SaltyLab + MIT + + ament_cmake + rosidl_default_generators + + std_msgs + geometry_msgs + + rosidl_default_runtime + + rosidl_interface_packages + + + ament_cmake + + diff --git a/jetson/ros2_ws/src/saltybot_dynamic_obstacles/config/dynamic_obstacles_params.yaml b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/config/dynamic_obstacles_params.yaml new file mode 100644 index 0000000..1bb113f --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/config/dynamic_obstacles_params.yaml @@ -0,0 +1,52 @@ +# saltybot_dynamic_obstacles — runtime parameters +# +# Requires: +# /scan (sensor_msgs/LaserScan) — RPLIDAR A1M8 at ~5.5 Hz +# +# LIDAR scan is published by rplidar_ros node. +# Make sure RPLIDAR is running before starting this stack. + +dynamic_obs_tracker: + ros__parameters: + max_tracks: 20 # max simultaneous tracked objects + confirm_frames: 3 # hits before a track is published + max_missed_frames: 6 # missed frames before track deletion + assoc_dist_m: 1.5 # max assignment distance (Hungarian) + prediction_hz: 10.0 # tracker update + publish rate + horizon_s: 2.5 # prediction look-ahead + pred_step_s: 0.5 # time between predicted waypoints + odom_frame: 'odom' + min_speed_mps: 0.05 # suppress near-stationary tracks + max_range_m: 8.0 # ignore detections beyond this + +dynamic_obs_costmap: + ros__parameters: + inflation_radius_m: 0.35 # safety bubble around each predicted point + ring_points: 8 # polygon points for inflation circle + clear_on_empty: true # push empty cloud to clear stale Nav2 markings + + +# ── Nav2 costmap integration ─────────────────────────────────────────────────── +# In your nav2_params.yaml, under local_costmap or global_costmap > plugins, add +# an ObstacleLayer with: +# +# obstacle_layer: +# plugin: "nav2_costmap_2d::ObstacleLayer" +# enabled: true +# observation_sources: static_scan dynamic_obs +# static_scan: +# topic: /scan +# data_type: LaserScan +# ... +# dynamic_obs: +# topic: /saltybot/dynamic_obs_cloud +# data_type: PointCloud2 +# sensor_frame: odom +# obstacle_max_range: 10.0 +# raytrace_max_range: 10.0 +# marking: true +# clearing: false +# +# This feeds the predicted trajectory smear directly into Nav2's obstacle +# inflation, forcing the planner to route around the predicted future path +# of every tracked moving object. diff --git a/jetson/ros2_ws/src/saltybot_dynamic_obstacles/launch/dynamic_obstacles.launch.py b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/launch/dynamic_obstacles.launch.py new file mode 100644 index 0000000..1c1cc57 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/launch/dynamic_obstacles.launch.py @@ -0,0 +1,75 @@ +""" +dynamic_obstacles.launch.py — Dynamic obstacle tracking + Nav2 costmap layer. + +Starts: + dynamic_obs_tracker — LIDAR motion detection + Kalman tracking @10 Hz + dynamic_obs_costmap — Predicted-trajectory → PointCloud2 for Nav2 + +Launch args: + max_tracks int '20' + assoc_dist_m float '1.5' + horizon_s float '2.5' + inflation_radius_m float '0.35' + +Verify: + ros2 topic hz /saltybot/moving_objects # ~10 Hz + ros2 topic echo /saltybot/moving_objects # TrackedObject list + ros2 topic hz /saltybot/dynamic_obs_cloud # ~10 Hz (when objects present) + rviz2 → add MarkerArray → /saltybot/moving_objects_viz + +Nav2 costmap integration: + In your costmap_params.yaml ObstacleLayer observation_sources, add: + dynamic_obs: + topic: /saltybot/dynamic_obs_cloud + data_type: PointCloud2 + marking: true + clearing: false +""" + +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument +from launch.substitutions import LaunchConfiguration +from launch_ros.actions import Node + + +def generate_launch_description(): + args = [ + DeclareLaunchArgument('max_tracks', default_value='20'), + DeclareLaunchArgument('assoc_dist_m', default_value='1.5'), + DeclareLaunchArgument('horizon_s', default_value='2.5'), + DeclareLaunchArgument('inflation_radius_m', default_value='0.35'), + DeclareLaunchArgument('min_speed_mps', default_value='0.05'), + ] + + tracker = Node( + package='saltybot_dynamic_obstacles', + executable='dynamic_obs_tracker', + name='dynamic_obs_tracker', + output='screen', + parameters=[{ + 'max_tracks': LaunchConfiguration('max_tracks'), + 'assoc_dist_m': LaunchConfiguration('assoc_dist_m'), + 'horizon_s': LaunchConfiguration('horizon_s'), + 'min_speed_mps': LaunchConfiguration('min_speed_mps'), + 'prediction_hz': 10.0, + 'confirm_frames': 3, + 'max_missed_frames': 6, + 'pred_step_s': 0.5, + 'odom_frame': 'odom', + 'max_range_m': 8.0, + }], + ) + + costmap = Node( + package='saltybot_dynamic_obstacles', + executable='dynamic_obs_costmap', + name='dynamic_obs_costmap', + output='screen', + parameters=[{ + 'inflation_radius_m': LaunchConfiguration('inflation_radius_m'), + 'ring_points': 8, + 'clear_on_empty': True, + }], + ) + + return LaunchDescription(args + [tracker, costmap]) diff --git a/jetson/ros2_ws/src/saltybot_dynamic_obstacles/package.xml b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/package.xml new file mode 100644 index 0000000..ebdc91c --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/package.xml @@ -0,0 +1,29 @@ + + + + saltybot_dynamic_obstacles + 0.1.0 + + Dynamic obstacle detection, multi-object Kalman tracking, trajectory + prediction, and Nav2 costmap layer integration for SaltyBot. + + SaltyLab + MIT + + rclpy + std_msgs + sensor_msgs + geometry_msgs + nav_msgs + visualization_msgs + saltybot_dynamic_obs_msgs + + python3-numpy + python3-scipy + + pytest + + + ament_python + + diff --git a/jetson/ros2_ws/src/saltybot_dynamic_obstacles/resource/saltybot_dynamic_obstacles b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/resource/saltybot_dynamic_obstacles new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_dynamic_obstacles/saltybot_dynamic_obstacles/__init__.py b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/saltybot_dynamic_obstacles/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_dynamic_obstacles/saltybot_dynamic_obstacles/costmap_layer_node.py b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/saltybot_dynamic_obstacles/costmap_layer_node.py new file mode 100644 index 0000000..792ff34 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/saltybot_dynamic_obstacles/costmap_layer_node.py @@ -0,0 +1,178 @@ +""" +costmap_layer_node.py — Nav2 costmap integration for dynamic obstacles. + +Converts predicted trajectories from /saltybot/moving_objects into a +PointCloud2 fed into Nav2's ObstacleLayer. Each predicted future position +is added as a point, creating a "smeared" dynamic obstacle zone that +covers the full 2-3 s prediction horizon. + +Nav2 ObstacleLayer config (in costmap_params.yaml): + obstacle_layer: + enabled: true + observation_sources: dynamic_obs + dynamic_obs: + topic: /saltybot/dynamic_obs_cloud + sensor_frame: odom + data_type: PointCloud2 + obstacle_max_range: 12.0 + obstacle_min_range: 0.0 + raytrace_max_range: 12.0 + marking: true + clearing: false # let the tracker handle clearing + +The node also clears old obstacle points when tracks are dropped, by +publishing a clearing cloud to /saltybot/dynamic_obs_clear. + +Subscribes: + /saltybot/moving_objects saltybot_dynamic_obs_msgs/MovingObjectArray + +Publishes: + /saltybot/dynamic_obs_cloud sensor_msgs/PointCloud2 marking cloud + /saltybot/dynamic_obs_clear sensor_msgs/PointCloud2 clearing cloud + +Parameters: + inflation_radius_m float 0.35 (each predicted point inflated by this radius) + ring_points int 8 (polygon approximation of inflation circle) + clear_on_empty bool true (publish clear cloud when no objects tracked) +""" + +from __future__ import annotations + +import math +import struct +from typing import List + +import rclpy +from rclpy.node import Node +from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy + +from sensor_msgs.msg import PointCloud2, PointField +from std_msgs.msg import Header + +try: + from saltybot_dynamic_obs_msgs.msg import MovingObjectArray + _MSGS_AVAILABLE = True +except ImportError: + _MSGS_AVAILABLE = False + + +_RELIABLE_QOS = QoSProfile( + reliability=ReliabilityPolicy.RELIABLE, + history=HistoryPolicy.KEEP_LAST, + depth=10, +) + + +def _make_pc2(header: Header, points_xyz: List[tuple]) -> PointCloud2: + """Pack a list of (x, y, z) into a PointCloud2 message.""" + fields = [ + PointField(name='x', offset=0, datatype=PointField.FLOAT32, count=1), + PointField(name='y', offset=4, datatype=PointField.FLOAT32, count=1), + PointField(name='z', offset=8, datatype=PointField.FLOAT32, count=1), + ] + point_step = 12 # 3 × float32 + data = bytearray(len(points_xyz) * point_step) + for i, (x, y, z) in enumerate(points_xyz): + struct.pack_into(' None: + hdr = msg.header + mark_pts: List[tuple] = [] + + for obj in msg.objects: + if not obj.is_valid: + continue + + # Current position + self._add_inflated(obj.pose.pose.position.x, + obj.pose.pose.position.y, mark_pts) + + # Predicted future positions + for pose in obj.predicted_path: + self._add_inflated(pose.position.x, pose.position.y, mark_pts) + + if mark_pts: + self._mark_pub.publish(_make_pc2(hdr, mark_pts)) + elif self._clear_empty: + # Publish tiny clear cloud so Nav2 clears stale markings + self._clear_pub.publish(_make_pc2(hdr, [])) + + def _add_inflated(self, cx: float, cy: float, pts: List[tuple]) -> None: + """Add the centre + ring of inflation points at height 0.5 m.""" + pts.append((cx, cy, 0.5)) + for ox, oy in self._ring_offsets: + pts.append((cx + ox, cy + oy, 0.5)) + + +def main(args=None): + rclpy.init(args=args) + node = CostmapLayerNode() + try: + rclpy.spin(node) + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main() diff --git a/jetson/ros2_ws/src/saltybot_dynamic_obstacles/saltybot_dynamic_obstacles/dynamic_obs_node.py b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/saltybot_dynamic_obstacles/dynamic_obs_node.py new file mode 100644 index 0000000..087accc --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/saltybot_dynamic_obstacles/dynamic_obs_node.py @@ -0,0 +1,319 @@ +""" +dynamic_obs_node.py — ROS2 node: LIDAR moving-object detection + Kalman tracking. + +Pipeline: + 1. Subscribe /scan (RPLIDAR LaserScan, ~5.5 Hz). + 2. ObjectDetector performs background subtraction → moving blobs. + 3. TrackerManager runs Hungarian assignment + Kalman predict/update at 10 Hz. + 4. Publish /saltybot/moving_objects (MovingObjectArray). + 5. Publish /saltybot/moving_objects_viz (MarkerArray) for RViz. + +The 10 Hz timer drives the tracker regardless of scan rate, so prediction +continues between scans (pure-predict steps). + +Subscribes: + /scan sensor_msgs/LaserScan RPLIDAR A1M8 + +Publishes: + /saltybot/moving_objects saltybot_dynamic_obs_msgs/MovingObjectArray + /saltybot/moving_objects_viz visualization_msgs/MarkerArray + +Parameters: + max_tracks int 20 + confirm_frames int 3 + max_missed_frames int 6 + assoc_dist_m float 1.5 + prediction_hz float 10.0 (tracker + publish rate) + horizon_s float 2.5 + pred_step_s float 0.5 + odom_frame str 'odom' + min_speed_mps float 0.05 (suppress near-zero velocity tracks) + max_range_m float 8.0 +""" + +import time +import math + +import numpy as np +import rclpy +from rclpy.node import Node +from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy + +from sensor_msgs.msg import LaserScan +from geometry_msgs.msg import Pose, Point, Quaternion, Vector3 +from std_msgs.msg import Header, ColorRGBA +from visualization_msgs.msg import Marker, MarkerArray + +try: + from saltybot_dynamic_obs_msgs.msg import TrackedObject, MovingObjectArray + _MSGS_AVAILABLE = True +except ImportError: + _MSGS_AVAILABLE = False + +from .object_detector import ObjectDetector +from .tracker_manager import TrackerManager, Track + + +_SENSOR_QOS = QoSProfile( + reliability=ReliabilityPolicy.BEST_EFFORT, + history=HistoryPolicy.KEEP_LAST, + depth=5, +) + + +def _yaw_quat(yaw: float) -> Quaternion: + q = Quaternion() + q.w = math.cos(yaw * 0.5) + q.z = math.sin(yaw * 0.5) + return q + + +class DynamicObsNode(Node): + + def __init__(self): + super().__init__('dynamic_obs_tracker') + + # ── Parameters ────────────────────────────────────────────────────── + self.declare_parameter('max_tracks', 20) + self.declare_parameter('confirm_frames', 3) + self.declare_parameter('max_missed_frames', 6) + self.declare_parameter('assoc_dist_m', 1.5) + self.declare_parameter('prediction_hz', 10.0) + self.declare_parameter('horizon_s', 2.5) + self.declare_parameter('pred_step_s', 0.5) + self.declare_parameter('odom_frame', 'odom') + self.declare_parameter('min_speed_mps', 0.05) + self.declare_parameter('max_range_m', 8.0) + + max_tracks = self.get_parameter('max_tracks').value + confirm_f = self.get_parameter('confirm_frames').value + max_missed = self.get_parameter('max_missed_frames').value + assoc_dist = self.get_parameter('assoc_dist_m').value + pred_hz = self.get_parameter('prediction_hz').value + horizon_s = self.get_parameter('horizon_s').value + pred_step = self.get_parameter('pred_step_s').value + self._frame = self.get_parameter('odom_frame').value + self._min_spd = self.get_parameter('min_speed_mps').value + self._max_rng = self.get_parameter('max_range_m').value + + # ── Core modules ──────────────────────────────────────────────────── + self._detector = ObjectDetector( + grid_radius_m=min(self._max_rng + 2.0, 12.0), + max_cluster=int((self._max_rng / 0.1) ** 2 * 0.5), + ) + self._tracker = TrackerManager( + max_tracks=max_tracks, + confirm_frames=confirm_f, + max_missed=max_missed, + assoc_dist_m=assoc_dist, + horizon_s=horizon_s, + pred_step_s=pred_step, + ) + self._horizon_s = horizon_s + self._pred_step = pred_step + + # ── State ──────────────────────────────────────────────────────────── + self._latest_scan: LaserScan | None = None + self._last_track_t: float = time.monotonic() + self._scan_processed_stamp: float | None = None + + # ── Subscriptions ──────────────────────────────────────────────────── + self.create_subscription(LaserScan, '/scan', self._on_scan, _SENSOR_QOS) + + # ── Publishers ─────────────────────────────────────────────────────── + if _MSGS_AVAILABLE: + self._obj_pub = self.create_publisher( + MovingObjectArray, '/saltybot/moving_objects', 10 + ) + else: + self._obj_pub = None + self.get_logger().warning( + '[dyn_obs] saltybot_dynamic_obs_msgs not built — ' + 'MovingObjectArray will not be published' + ) + + self._viz_pub = self.create_publisher( + MarkerArray, '/saltybot/moving_objects_viz', 10 + ) + + # ── Timer ──────────────────────────────────────────────────────────── + self.create_timer(1.0 / pred_hz, self._track_tick) + + self.get_logger().info( + f'dynamic_obs_tracker ready — ' + f'max_tracks={max_tracks} horizon={horizon_s}s assoc={assoc_dist}m' + ) + + # ── Scan callback ───────────────────────────────────────────────────────── + + def _on_scan(self, msg: LaserScan) -> None: + self._latest_scan = msg + + # ── 10 Hz tracker tick ──────────────────────────────────────────────────── + + def _track_tick(self) -> None: + t0 = time.monotonic() + + now_mono = t0 + dt = now_mono - self._last_track_t + dt = max(1e-3, min(dt, 0.5)) + self._last_track_t = now_mono + + scan = self._latest_scan + detections = [] + + if scan is not None: + stamp_sec = scan.header.stamp.sec + scan.header.stamp.nanosec * 1e-9 + if stamp_sec != self._scan_processed_stamp: + self._scan_processed_stamp = stamp_sec + ranges = np.asarray(scan.ranges, dtype=np.float32) + detections = self._detector.update( + ranges, + scan.angle_min, + scan.angle_increment, + min(scan.range_max, self._max_rng), + ) + + confirmed = self._tracker.update(detections, dt) + + latency_ms = (time.monotonic() - t0) * 1000.0 + stamp = self.get_clock().now().to_msg() + + if _MSGS_AVAILABLE and self._obj_pub is not None: + self._publish_objects(confirmed, stamp, latency_ms) + self._publish_viz(confirmed, stamp) + + # ── Publish helpers ─────────────────────────────────────────────────────── + + def _publish_objects(self, confirmed: list, stamp, latency_ms: float) -> None: + arr = MovingObjectArray() + arr.header.stamp = stamp + arr.header.frame_id = self._frame + arr.active_count = len(confirmed) + arr.tentative_count = self._tracker.tentative_count + arr.detector_latency_ms = float(latency_ms) + + for tr in confirmed: + px, py = tr.kalman.position + vx, vy = tr.kalman.velocity + speed = tr.kalman.speed + if speed < self._min_spd: + continue + + obj = TrackedObject() + obj.header = arr.header + obj.object_id = tr.track_id + obj.pose.pose.position.x = px + obj.pose.pose.position.y = py + obj.pose.pose.orientation = _yaw_quat(math.atan2(vy, vx)) + cov = tr.kalman.covariance_2x2 + obj.pose.covariance[0] = float(cov[0, 0]) + obj.pose.covariance[1] = float(cov[0, 1]) + obj.pose.covariance[6] = float(cov[1, 0]) + obj.pose.covariance[7] = float(cov[1, 1]) + obj.velocity.x = vx + obj.velocity.y = vy + obj.speed_mps = speed + obj.confidence = min(1.0, tr.hits / (self._tracker._confirm_frames * 3)) + obj.age_frames = tr.age + obj.hits = tr.hits + obj.is_valid = True + + # Predicted path + for px_f, py_f, t_f in tr.kalman.predict_horizon( + self._horizon_s, self._pred_step + ): + p = Pose() + p.position.x = px_f + p.position.y = py_f + p.orientation.w = 1.0 + obj.predicted_path.append(p) + obj.predicted_times.append(float(t_f)) + + arr.objects.append(obj) + + self._obj_pub.publish(arr) + + def _publish_viz(self, confirmed: list, stamp) -> None: + markers = MarkerArray() + + # Delete old markers + del_marker = Marker() + del_marker.header.stamp = stamp + del_marker.header.frame_id = self._frame + del_marker.action = Marker.DELETEALL + markers.markers.append(del_marker) + + for tr in confirmed: + px, py = tr.kalman.position + vx, vy = tr.kalman.velocity + speed = tr.kalman.speed + if speed < self._min_spd: + continue + + # Cylinder at current position + m = Marker() + m.header.stamp = stamp + m.header.frame_id = self._frame + m.ns = 'dyn_obs' + m.id = tr.track_id + m.type = Marker.CYLINDER + m.action = Marker.ADD + m.pose.position.x = px + m.pose.position.y = py + m.pose.position.z = 0.5 + m.pose.orientation.w = 1.0 + m.scale.x = 0.4 + m.scale.y = 0.4 + m.scale.z = 1.0 + m.color = ColorRGBA(r=1.0, g=0.2, b=0.0, a=0.7) + m.lifetime.sec = 1 + markers.markers.append(m) + + # Arrow for velocity + vel_m = Marker() + vel_m.header = m.header + vel_m.ns = 'dyn_obs_vel' + vel_m.id = tr.track_id + vel_m.type = Marker.ARROW + vel_m.action = Marker.ADD + from geometry_msgs.msg import Point as GPoint + p_start = GPoint(x=px, y=py, z=1.0) + p_end = GPoint(x=px + vx, y=py + vy, z=1.0) + vel_m.points = [p_start, p_end] + vel_m.scale.x = 0.05 + vel_m.scale.y = 0.10 + vel_m.color = ColorRGBA(r=1.0, g=1.0, b=0.0, a=0.9) + vel_m.lifetime.sec = 1 + markers.markers.append(vel_m) + + # Line strip for predicted path + path_m = Marker() + path_m.header = m.header + path_m.ns = 'dyn_obs_path' + path_m.id = tr.track_id + path_m.type = Marker.LINE_STRIP + path_m.action = Marker.ADD + path_m.scale.x = 0.04 + path_m.color = ColorRGBA(r=1.0, g=0.5, b=0.0, a=0.5) + path_m.lifetime.sec = 1 + path_m.points.append(GPoint(x=px, y=py, z=0.5)) + for fx, fy, _ in tr.kalman.predict_horizon(self._horizon_s, self._pred_step): + path_m.points.append(GPoint(x=fx, y=fy, z=0.5)) + markers.markers.append(path_m) + + self._viz_pub.publish(markers) + + +def main(args=None): + rclpy.init(args=args) + node = DynamicObsNode() + try: + rclpy.spin(node) + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main() diff --git a/jetson/ros2_ws/src/saltybot_dynamic_obstacles/saltybot_dynamic_obstacles/kalman_tracker.py b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/saltybot_dynamic_obstacles/kalman_tracker.py new file mode 100644 index 0000000..3c9d9a7 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/saltybot_dynamic_obstacles/kalman_tracker.py @@ -0,0 +1,132 @@ +""" +kalman_tracker.py — Single-object Kalman filter for 2-D ground-plane tracking. + +State vector: x = [px, py, vx, vy] (position + velocity) +Motion model: constant-velocity (CV) with process noise on acceleration + +Predict step: + F = | 1 0 dt 0 | x_{k|k-1} = F @ x_{k-1|k-1} + | 0 1 0 dt | P_{k|k-1} = F @ P @ F^T + Q + | 0 0 1 0 | + | 0 0 0 1 | + +Update step (position observation only): + H = | 1 0 0 0 | y = z - H @ x + | 0 1 0 0 | S = H @ P @ H^T + R + K = P @ H^T @ inv(S) + x = x + K @ y + P = (I - K @ H) @ P (Joseph form for stability) + +Trajectory prediction: unrolls the CV model forward at fixed time steps. +""" + +from __future__ import annotations + +from typing import List, Tuple + +import numpy as np + + +# ── Default noise matrices ──────────────────────────────────────────────────── +# Process noise: models uncertainty in acceleration between frames +_Q_BASE = np.diag([0.02, 0.02, 0.8, 0.8]).astype(np.float64) + +# Measurement noise: LIDAR centroid uncertainty (~0.15 m std) +_R = np.diag([0.025, 0.025]).astype(np.float64) # 0.16 m sigma each axis + +# Observation matrix +_H = np.array([[1, 0, 0, 0], + [0, 1, 0, 0]], dtype=np.float64) +_I4 = np.eye(4, dtype=np.float64) + + +class KalmanTracker: + """ + Kalman filter tracking one object. + + Parameters + ---------- + x0, y0 : initial position (metres, odom frame) + process_noise : scalar multiplier on _Q_BASE + """ + + def __init__(self, x0: float, y0: float, process_noise: float = 1.0): + self._x = np.array([x0, y0, 0.0, 0.0], dtype=np.float64) + self._P = np.eye(4, dtype=np.float64) * 0.5 + self._Q = _Q_BASE * process_noise + + # ── Core filter ─────────────────────────────────────────────────────────── + + def predict(self, dt: float) -> None: + """Propagate state by dt seconds.""" + F = np.array([ + [1, 0, dt, 0], + [0, 1, 0, dt], + [0, 0, 1, 0], + [0, 0, 0, 1], + ], dtype=np.float64) + self._x = F @ self._x + self._P = F @ self._P @ F.T + self._Q + + def update(self, z: np.ndarray) -> None: + """ + Incorporate a position measurement z = [x, y]. + Uses Joseph-form covariance update for numerical stability. + """ + y = z.astype(np.float64) - _H @ self._x + S = _H @ self._P @ _H.T + _R + K = self._P @ _H.T @ np.linalg.inv(S) + self._x = self._x + K @ y + IKH = _I4 - K @ _H + # Joseph form: (I-KH) P (I-KH)^T + K R K^T + self._P = IKH @ self._P @ IKH.T + K @ _R @ K.T + + # ── Prediction horizon ──────────────────────────────────────────────────── + + def predict_horizon( + self, + horizon_s: float = 2.5, + step_s: float = 0.5, + ) -> List[Tuple[float, float, float]]: + """ + Return [(x, y, t), ...] at equally-spaced future times. + + Does NOT modify internal filter state. + """ + predictions: List[Tuple[float, float, float]] = [] + state = self._x.copy() + t = 0.0 + F_step = np.array([ + [1, 0, step_s, 0], + [0, 1, 0, step_s], + [0, 0, 1, 0], + [0, 0, 0, 1], + ], dtype=np.float64) + while t < horizon_s - 1e-6: + state = F_step @ state + t += step_s + predictions.append((float(state[0]), float(state[1]), t)) + return predictions + + # ── Properties ──────────────────────────────────────────────────────────── + + @property + def position(self) -> Tuple[float, float]: + return float(self._x[0]), float(self._x[1]) + + @property + def velocity(self) -> Tuple[float, float]: + return float(self._x[2]), float(self._x[3]) + + @property + def speed(self) -> float: + return float(np.hypot(self._x[2], self._x[3])) + + @property + def covariance_2x2(self) -> np.ndarray: + """Position covariance (top-left 2×2 of P).""" + return self._P[:2, :2].copy() + + @property + def state(self) -> np.ndarray: + return self._x.copy() diff --git a/jetson/ros2_ws/src/saltybot_dynamic_obstacles/saltybot_dynamic_obstacles/object_detector.py b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/saltybot_dynamic_obstacles/object_detector.py new file mode 100644 index 0000000..be8c3d8 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/saltybot_dynamic_obstacles/object_detector.py @@ -0,0 +1,168 @@ +""" +object_detector.py — LIDAR-based moving object detector. + +Algorithm: + 1. Convert each LaserScan to a 2-D occupancy grid (robot-centred, fixed size). + 2. Maintain a background model via exponential moving average (EMA): + bg_t = α * current + (1-α) * bg_{t-1} (only for non-moving cells) + 3. Foreground = cells whose occupancy significantly exceeds the background. + 4. Cluster foreground cells with scipy connected-components → Detection list. + +The grid is robot-relative (origin at robot centre) so it naturally tracks +the robot's motion without needing TF at this stage. The caller is responsible +for transforming detections into a stable frame (odom) before passing to the +tracker. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import List, Optional + +import numpy as np +from scipy import ndimage + + +@dataclass +class Detection: + """A clustered moving foreground blob from one scan.""" + x: float # centroid x in sensor frame (m) + y: float # centroid y in sensor frame (m) + size_m2: float # approximate area of the cluster (m²) + range_m: float # distance from robot (m) + + +class ObjectDetector: + """ + Detects moving objects in consecutive 2-D LIDAR scans. + + Parameters + ---------- + grid_radius_m : half-size of the occupancy grid (grid covers ±radius) + resolution : metres per cell + bg_alpha : EMA update rate for background (small = slow forgetting) + motion_thr : occupancy delta above background to count as moving + min_cluster : minimum cells to keep a cluster + max_cluster : maximum cells before a cluster is considered static wall + """ + + def __init__( + self, + grid_radius_m: float = 10.0, + resolution: float = 0.10, + bg_alpha: float = 0.04, + motion_thr: float = 0.45, + min_cluster: int = 3, + max_cluster: int = 200, + ): + cells = int(2 * grid_radius_m / resolution) + self._cells = cells + self._res = resolution + self._origin = -grid_radius_m # world x/y at grid index 0 + self._bg_alpha = bg_alpha + self._motion_thr = motion_thr + self._min_clust = min_cluster + self._max_clust = max_cluster + + self._bg = np.zeros((cells, cells), dtype=np.float32) + self._initialized = False + + # ── Public API ──────────────────────────────────────────────────────────── + + def update( + self, + ranges: np.ndarray, + angle_min: float, + angle_increment: float, + range_max: float, + ) -> List[Detection]: + """ + Process one LaserScan and return detected moving blobs. + + Parameters + ---------- + ranges : 1-D array of range readings (metres) + angle_min : angle of first beam (radians) + angle_increment : angular step between beams (radians) + range_max : maximum valid range (metres) + """ + curr = self._scan_to_grid(ranges, angle_min, angle_increment, range_max) + + if not self._initialized: + self._bg = curr.copy() + self._initialized = True + return [] + + # Foreground mask + motion_mask = (curr - self._bg) > self._motion_thr + + # Update background only on non-moving cells + static = ~motion_mask + self._bg[static] = ( + self._bg[static] * (1.0 - self._bg_alpha) + + curr[static] * self._bg_alpha + ) + + return self._cluster(motion_mask) + + def reset(self) -> None: + self._bg[:] = 0.0 + self._initialized = False + + # ── Internals ───────────────────────────────────────────────────────────── + + def _scan_to_grid( + self, + ranges: np.ndarray, + angle_min: float, + angle_increment: float, + range_max: float, + ) -> np.ndarray: + grid = np.zeros((self._cells, self._cells), dtype=np.float32) + n = len(ranges) + angles = angle_min + np.arange(n) * angle_increment + r = np.asarray(ranges, dtype=np.float32) + + valid = (r > 0.05) & (r < range_max) & np.isfinite(r) + r, a = r[valid], angles[valid] + + x = r * np.cos(a) + y = r * np.sin(a) + + ix = np.clip( + ((x - self._origin) / self._res).astype(np.int32), 0, self._cells - 1 + ) + iy = np.clip( + ((y - self._origin) / self._res).astype(np.int32), 0, self._cells - 1 + ) + grid[iy, ix] = 1.0 + return grid + + def _cluster(self, mask: np.ndarray) -> List[Detection]: + # Dilate slightly to connect nearby hit cells into one blob + struct = ndimage.generate_binary_structure(2, 2) + dilated = ndimage.binary_dilation(mask, structure=struct, iterations=1) + + labeled, n_labels = ndimage.label(dilated) + detections: List[Detection] = [] + + for label_id in range(1, n_labels + 1): + coords = np.argwhere(labeled == label_id) + n_cells = len(coords) + if n_cells < self._min_clust or n_cells > self._max_clust: + continue + + ys, xs = coords[:, 0], coords[:, 1] + cx_grid = float(np.mean(xs)) + cy_grid = float(np.mean(ys)) + cx = cx_grid * self._res + self._origin + cy = cy_grid * self._res + self._origin + + detections.append(Detection( + x=cx, + y=cy, + size_m2=n_cells * self._res ** 2, + range_m=float(np.hypot(cx, cy)), + )) + + return detections diff --git a/jetson/ros2_ws/src/saltybot_dynamic_obstacles/saltybot_dynamic_obstacles/tracker_manager.py b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/saltybot_dynamic_obstacles/tracker_manager.py new file mode 100644 index 0000000..5862c94 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/saltybot_dynamic_obstacles/tracker_manager.py @@ -0,0 +1,206 @@ +""" +tracker_manager.py — Multi-object tracker with Hungarian data association. + +Track lifecycle: + TENTATIVE → confirmed after `confirm_frames` consecutive hits + CONFIRMED → normal tracked state + LOST → missed for 1..max_missed frames (still predicts, not published) + DEAD → missed > max_missed → removed + +Association: + Uses scipy.optimize.linear_sum_assignment (Hungarian algorithm) on a cost + matrix of Euclidean distances between predicted track positions and new + detections. Assignments with cost > assoc_dist_m are rejected. + +Up to `max_tracks` simultaneous live tracks (tentative + confirmed). +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from enum import IntEnum +from typing import Dict, List, Optional, Tuple + +import numpy as np +from scipy.optimize import linear_sum_assignment + +from .kalman_tracker import KalmanTracker +from .object_detector import Detection + + +class TrackState(IntEnum): + TENTATIVE = 0 + CONFIRMED = 1 + LOST = 2 + + +@dataclass +class Track: + track_id: int + kalman: KalmanTracker + state: TrackState = TrackState.TENTATIVE + hits: int = 1 + age: int = 1 # frames since creation + missed: int = 0 # consecutive missed frames + + +class TrackerManager: + """ + Manages a pool of Kalman tracks. + + Parameters + ---------- + max_tracks : hard cap on simultaneously alive tracks + confirm_frames : hits needed before a track is CONFIRMED + max_missed : consecutive missed frames before a track is DEAD + assoc_dist_m : max allowed distance (m) for a valid assignment + horizon_s : prediction horizon for trajectory output (seconds) + pred_step_s : time step between predicted waypoints + process_noise : KalmanTracker process-noise multiplier + """ + + def __init__( + self, + max_tracks: int = 20, + confirm_frames: int = 3, + max_missed: int = 6, + assoc_dist_m: float = 1.5, + horizon_s: float = 2.5, + pred_step_s: float = 0.5, + process_noise: float = 1.0, + ): + self._max_tracks = max_tracks + self._confirm_frames = confirm_frames + self._max_missed = max_missed + self._assoc_dist = assoc_dist_m + self._horizon_s = horizon_s + self._pred_step = pred_step_s + self._proc_noise = process_noise + + self._tracks: Dict[int, Track] = {} + self._next_id: int = 1 + + # ── Public API ──────────────────────────────────────────────────────────── + + def update(self, detections: List[Detection], dt: float) -> List[Track]: + """ + Process one frame of detections. + + 1. Predict all tracks by dt. + 2. Hungarian assignment of predictions → detections. + 3. Update matched tracks; mark unmatched tracks as LOST. + 4. Promote tracks crossing `confirm_frames`. + 5. Create new tracks for unmatched detections (if room). + 6. Remove DEAD tracks. + + Returns confirmed tracks only. + """ + # 1. Predict + for tr in self._tracks.values(): + tr.kalman.predict(dt) + tr.age += 1 + + # 2. Assign + matched, unmatched_tracks, unmatched_dets = self._assign(detections) + + # 3a. Update matched + for tid, did in matched: + tr = self._tracks[tid] + det = detections[did] + tr.kalman.update(np.array([det.x, det.y])) + tr.hits += 1 + tr.missed = 0 + if tr.state == TrackState.LOST: + tr.state = TrackState.CONFIRMED + elif tr.state == TrackState.TENTATIVE and tr.hits >= self._confirm_frames: + tr.state = TrackState.CONFIRMED + + # 3b. Unmatched tracks + for tid in unmatched_tracks: + tr = self._tracks[tid] + tr.missed += 1 + if tr.missed > 1: + tr.state = TrackState.LOST + + # 4. New tracks for unmatched detections + live = sum(1 for t in self._tracks.values() if t.state != TrackState.LOST + or t.missed <= self._max_missed) + for did in unmatched_dets: + if live >= self._max_tracks: + break + det = detections[did] + init_state = (TrackState.CONFIRMED + if self._confirm_frames <= 1 + else TrackState.TENTATIVE) + new_tr = Track( + track_id=self._next_id, + kalman=KalmanTracker(det.x, det.y, self._proc_noise), + state=init_state, + ) + self._tracks[self._next_id] = new_tr + self._next_id += 1 + live += 1 + + # 5. Prune dead + dead = [tid for tid, t in self._tracks.items() if t.missed > self._max_missed] + for tid in dead: + del self._tracks[tid] + + return [t for t in self._tracks.values() if t.state == TrackState.CONFIRMED] + + @property + def all_tracks(self) -> List[Track]: + return list(self._tracks.values()) + + @property + def tentative_count(self) -> int: + return sum(1 for t in self._tracks.values() + if t.state == TrackState.TENTATIVE) + + def reset(self) -> None: + self._tracks.clear() + self._next_id = 1 + + # ── Hungarian assignment ────────────────────────────────────────────────── + + def _assign( + self, + detections: List[Detection], + ) -> Tuple[List[Tuple[int, int]], List[int], List[int]]: + """ + Returns: + matched — list of (track_id, det_index) + unmatched_tids — track IDs with no detection assigned + unmatched_dids — detection indices with no track assigned + """ + track_ids = list(self._tracks.keys()) + + if not track_ids or not detections: + return [], track_ids, list(range(len(detections))) + + # Build cost matrix: rows=tracks, cols=detections + cost = np.full((len(track_ids), len(detections)), fill_value=np.inf) + for r, tid in enumerate(track_ids): + tx, ty = self._tracks[tid].kalman.position + for c, det in enumerate(detections): + cost[r, c] = np.hypot(tx - det.x, ty - det.y) + + row_ind, col_ind = linear_sum_assignment(cost) + + matched: List[Tuple[int, int]] = [] + matched_track_idx: set = set() + matched_det_idx: set = set() + + for r, c in zip(row_ind, col_ind): + if cost[r, c] > self._assoc_dist: + continue + matched.append((track_ids[r], c)) + matched_track_idx.add(r) + matched_det_idx.add(c) + + unmatched_tids = [track_ids[r] for r in range(len(track_ids)) + if r not in matched_track_idx] + unmatched_dids = [c for c in range(len(detections)) + if c not in matched_det_idx] + + return matched, unmatched_tids, unmatched_dids diff --git a/jetson/ros2_ws/src/saltybot_dynamic_obstacles/setup.cfg b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/setup.cfg new file mode 100644 index 0000000..eb006bc --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/setup.cfg @@ -0,0 +1,4 @@ +[develop] +script_dir=$base/lib/saltybot_dynamic_obstacles +[install] +install_scripts=$base/lib/saltybot_dynamic_obstacles diff --git a/jetson/ros2_ws/src/saltybot_dynamic_obstacles/setup.py b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/setup.py new file mode 100644 index 0000000..62bae03 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/setup.py @@ -0,0 +1,32 @@ +from setuptools import setup, find_packages +from glob import glob + +package_name = 'saltybot_dynamic_obstacles' + +setup( + name=package_name, + version='0.1.0', + packages=find_packages(exclude=['test']), + data_files=[ + ('share/ament_index/resource_index/packages', + ['resource/' + package_name]), + ('share/' + package_name, ['package.xml']), + ('share/' + package_name + '/launch', + glob('launch/*.launch.py')), + ('share/' + package_name + '/config', + glob('config/*.yaml')), + ], + install_requires=['setuptools'], + zip_safe=True, + maintainer='SaltyLab', + maintainer_email='robot@saltylab.local', + description='Dynamic obstacle tracking: LIDAR motion detection, Kalman tracking, Nav2 costmap', + license='MIT', + tests_require=['pytest'], + entry_points={ + 'console_scripts': [ + 'dynamic_obs_tracker = saltybot_dynamic_obstacles.dynamic_obs_node:main', + 'dynamic_obs_costmap = saltybot_dynamic_obstacles.costmap_layer_node:main', + ], + }, +) diff --git a/jetson/ros2_ws/src/saltybot_dynamic_obstacles/test/test_dynamic_obstacles.py b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/test/test_dynamic_obstacles.py new file mode 100644 index 0000000..6b87c79 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/test/test_dynamic_obstacles.py @@ -0,0 +1,262 @@ +""" +test_dynamic_obstacles.py — Unit tests for KalmanTracker, TrackerManager, +and ObjectDetector. + +Runs without ROS2 / hardware (no rclpy imports). +""" + +from __future__ import annotations + +import math +import sys +import os + +import numpy as np +import pytest + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) + +from saltybot_dynamic_obstacles.kalman_tracker import KalmanTracker +from saltybot_dynamic_obstacles.tracker_manager import TrackerManager, TrackState +from saltybot_dynamic_obstacles.object_detector import ObjectDetector, Detection + + +# ── KalmanTracker ───────────────────────────────────────────────────────────── + +class TestKalmanTracker: + + def test_initial_position(self): + kt = KalmanTracker(3.0, 4.0) + px, py = kt.position + assert px == pytest.approx(3.0) + assert py == pytest.approx(4.0) + + def test_initial_velocity_zero(self): + kt = KalmanTracker(0.0, 0.0) + vx, vy = kt.velocity + assert vx == pytest.approx(0.0) + assert vy == pytest.approx(0.0) + + def test_predict_moves_position(self): + kt = KalmanTracker(0.0, 0.0) + # Give it some velocity via update sequence + kt.update(np.array([0.1, 0.0])) + kt.update(np.array([0.2, 0.0])) + kt.predict(0.1) + px, _ = kt.position + assert px > 0.0 # should have moved forward + + def test_pure_predict_constant_velocity(self): + """After velocity is established, predict() should move linearly.""" + kt = KalmanTracker(0.0, 0.0) + # Force velocity by repeated updates + for i in range(10): + kt.update(np.array([i * 0.1, 0.0])) + kt.predict(0.1) + vx, _ = kt.velocity + px0, _ = kt.position + kt.predict(1.0) + px1, _ = kt.position + # Should advance roughly vx * 1.0 metres + assert px1 == pytest.approx(px0 + vx * 1.0, abs=0.3) + + def test_update_corrects_position(self): + kt = KalmanTracker(0.0, 0.0) + # Predict way off + kt.predict(10.0) + # Then update to ground truth + kt.update(np.array([1.0, 2.0])) + px, py = kt.position + # Should move toward (1, 2) + assert px == pytest.approx(1.0, abs=0.5) + assert py == pytest.approx(2.0, abs=0.5) + + def test_predict_horizon_length(self): + kt = KalmanTracker(0.0, 0.0) + preds = kt.predict_horizon(horizon_s=2.5, step_s=0.5) + assert len(preds) == 5 # 0.5, 1.0, 1.5, 2.0, 2.5 + + def test_predict_horizon_times(self): + kt = KalmanTracker(0.0, 0.0) + preds = kt.predict_horizon(horizon_s=2.0, step_s=0.5) + times = [t for _, _, t in preds] + assert times == pytest.approx([0.5, 1.0, 1.5, 2.0], abs=0.01) + + def test_predict_horizon_does_not_mutate_state(self): + kt = KalmanTracker(1.0, 2.0) + kt.predict_horizon(horizon_s=3.0, step_s=0.5) + px, py = kt.position + assert px == pytest.approx(1.0) + assert py == pytest.approx(2.0) + + def test_speed_zero_at_init(self): + kt = KalmanTracker(5.0, 5.0) + assert kt.speed == pytest.approx(0.0) + + def test_covariance_shape(self): + kt = KalmanTracker(0.0, 0.0) + cov = kt.covariance_2x2 + assert cov.shape == (2, 2) + + def test_covariance_positive_definite(self): + kt = KalmanTracker(0.0, 0.0) + for _ in range(5): + kt.predict(0.1) + kt.update(np.array([0.1, 0.0])) + eigvals = np.linalg.eigvalsh(kt.covariance_2x2) + assert np.all(eigvals > 0) + + def test_joseph_form_stays_symmetric(self): + """Covariance should remain symmetric after many updates.""" + kt = KalmanTracker(0.0, 0.0) + for i in range(50): + kt.predict(0.1) + kt.update(np.array([i * 0.01, 0.0])) + P = kt._P + assert np.allclose(P, P.T, atol=1e-10) + + +# ── TrackerManager ──────────────────────────────────────────────────────────── + +class TestTrackerManager: + + def _det(self, x, y): + return Detection(x=x, y=y, size_m2=0.1, range_m=math.hypot(x, y)) + + def test_empty_detections_no_tracks(self): + tm = TrackerManager() + confirmed = tm.update([], 0.1) + assert confirmed == [] + + def test_track_created_on_detection(self): + tm = TrackerManager(confirm_frames=1) + confirmed = tm.update([self._det(1.0, 0.0)], 0.1) + assert len(confirmed) == 1 + + def test_track_tentative_before_confirm(self): + tm = TrackerManager(confirm_frames=3) + tm.update([self._det(1.0, 0.0)], 0.1) + # Only 1 hit — should still be tentative + assert tm.tentative_count == 1 + + def test_track_confirmed_after_N_hits(self): + tm = TrackerManager(confirm_frames=3, assoc_dist_m=2.0) + for _ in range(4): + confirmed = tm.update([self._det(1.0, 0.0)], 0.1) + assert len(confirmed) == 1 + + def test_track_deleted_after_max_missed(self): + tm = TrackerManager(confirm_frames=1, max_missed=3, assoc_dist_m=2.0) + tm.update([self._det(1.0, 0.0)], 0.1) # create + for _ in range(5): + tm.update([], 0.1) # no detections → missed++ + assert len(tm.all_tracks) == 0 + + def test_max_tracks_cap(self): + tm = TrackerManager(max_tracks=5, confirm_frames=1) + dets = [self._det(float(i), 0.0) for i in range(10)] + tm.update(dets, 0.1) + assert len(tm.all_tracks) <= 5 + + def test_consistent_track_id(self): + tm = TrackerManager(confirm_frames=3, assoc_dist_m=1.5) + for i in range(5): + confirmed = tm.update([self._det(1.0 + i * 0.01, 0.0)], 0.1) + assert len(confirmed) == 1 + track_id = confirmed[0].track_id + # One more tick — ID should be stable + confirmed2 = tm.update([self._det(1.06, 0.0)], 0.1) + assert confirmed2[0].track_id == track_id + + def test_two_independent_tracks(self): + tm = TrackerManager(confirm_frames=3, assoc_dist_m=0.8) + for _ in range(5): + confirmed = tm.update([self._det(1.0, 0.0), self._det(5.0, 0.0)], 0.1) + assert len(confirmed) == 2 + + def test_reset_clears_all(self): + tm = TrackerManager(confirm_frames=1) + tm.update([self._det(1.0, 0.0)], 0.1) + tm.reset() + assert len(tm.all_tracks) == 0 + + def test_far_detection_not_assigned(self): + tm = TrackerManager(confirm_frames=1, assoc_dist_m=0.5) + tm.update([self._det(1.0, 0.0)], 0.1) # create track at (1,0) + # Detection 3 m away → new track, not update + tm.update([self._det(4.0, 0.0)], 0.1) + assert len(tm.all_tracks) == 2 + + +# ── ObjectDetector ──────────────────────────────────────────────────────────── + +class TestObjectDetector: + + def _empty_scan(self, n=360, rmax=8.0) -> tuple: + """All readings at max range (static background).""" + ranges = np.full(n, rmax - 0.1, dtype=np.float32) + return ranges, -math.pi, 2 * math.pi / n, rmax + + def _scan_with_blob(self, blob_r=2.0, blob_theta=0.0, n=360, rmax=8.0) -> tuple: + """Background scan + a short-range cluster at blob_theta.""" + ranges = np.full(n, rmax - 0.1, dtype=np.float32) + angle_inc = 2 * math.pi / n + angle_min = -math.pi + # Put a cluster of ~10 beams at blob_r + center_idx = int((blob_theta - angle_min) / angle_inc) % n + for di in range(-5, 6): + idx = (center_idx + di) % n + ranges[idx] = blob_r + return ranges, angle_min, angle_inc, rmax + + def test_empty_scan_no_detections_after_warmup(self): + od = ObjectDetector() + r, a_min, a_inc, rmax = self._empty_scan() + od.update(r, a_min, a_inc, rmax) # init background + for _ in range(3): + dets = od.update(r, a_min, a_inc, rmax) + assert len(dets) == 0 + + def test_moving_blob_detected(self): + od = ObjectDetector() + bg_r, a_min, a_inc, rmax = self._empty_scan() + od.update(bg_r, a_min, a_inc, rmax) # seed background + for _ in range(5): + od.update(bg_r, a_min, a_inc, rmax) + # Now inject a foreground blob + fg_r, _, _, _ = self._scan_with_blob(blob_r=2.0, blob_theta=0.0) + dets = od.update(fg_r, a_min, a_inc, rmax) + assert len(dets) >= 1 + + def test_detection_centroid_approximate(self): + od = ObjectDetector() + bg_r, a_min, a_inc, rmax = self._empty_scan() + for _ in range(8): + od.update(bg_r, a_min, a_inc, rmax) + fg_r, _, _, _ = self._scan_with_blob(blob_r=3.0, blob_theta=0.0) + dets = od.update(fg_r, a_min, a_inc, rmax) + assert len(dets) >= 1 + # Blob is at ~3 m along x-axis (theta=0) + cx = dets[0].x + cy = dets[0].y + assert abs(cx - 3.0) < 0.5 + assert abs(cy) < 0.5 + + def test_reset_clears_background(self): + od = ObjectDetector() + bg_r, a_min, a_inc, rmax = self._empty_scan() + for _ in range(5): + od.update(bg_r, a_min, a_inc, rmax) + od.reset() + assert not od._initialized + + def test_no_inf_nan_ranges(self): + od = ObjectDetector() + r = np.array([np.inf, np.nan, 5.0, -1.0, 0.0] * 72, dtype=np.float32) + a_min = -math.pi + a_inc = 2 * math.pi / len(r) + od.update(r, a_min, a_inc, 8.0) # should not raise + + +if __name__ == '__main__': + pytest.main([__file__, '-v'])