feat(jetson): add dynamic obstacle tracking package (issue #176)
Implements real-time moving obstacle detection, Kalman tracking, trajectory
prediction, and Nav2 costmap integration at 10 Hz / <50ms latency:
saltybot_dynamic_obs_msgs (ament_cmake):
• TrackedObject.msg — id, PoseWithCovariance, velocity, predicted_path,
predicted_times, speed, confidence, age, hits
• MovingObjectArray.msg — TrackedObject[], active_count, tentative_count,
detector_latency_ms
saltybot_dynamic_obstacles (ament_python):
• object_detector.py — LIDAR background subtraction (EMA occupancy grid),
foreground dilation + scipy connected-component
clustering → Detection list
• kalman_tracker.py — CV Kalman filter, state [px,py,vx,vy], Joseph-form
covariance update, predict_horizon() (non-mutating)
• tracker_manager.py — up to 20 tracks, Hungarian assignment
(scipy.optimize.linear_sum_assignment), TENTATIVE→
CONFIRMED lifecycle, miss-prune
• dynamic_obs_node.py — 10 Hz timer: detect→track→publish
/saltybot/moving_objects + MarkerArray viz
• costmap_layer_node.py — predicted paths → PointCloud2 inflation smear
→ /saltybot/dynamic_obs_cloud for Nav2 ObstacleLayer
• launch/dynamic_obstacles.launch.py + config/dynamic_obstacles_params.yaml
• test/test_dynamic_obstacles.py — 27 unit tests (27/27 pass, no ROS2 needed)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
parent
7c62f9bf88
commit
2f4540f1d3
16
jetson/ros2_ws/src/saltybot_dynamic_obs_msgs/CMakeLists.txt
Normal file
16
jetson/ros2_ws/src/saltybot_dynamic_obs_msgs/CMakeLists.txt
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
cmake_minimum_required(VERSION 3.8)
|
||||||
|
project(saltybot_dynamic_obs_msgs)
|
||||||
|
|
||||||
|
find_package(ament_cmake REQUIRED)
|
||||||
|
find_package(rosidl_default_generators REQUIRED)
|
||||||
|
find_package(std_msgs REQUIRED)
|
||||||
|
find_package(geometry_msgs REQUIRED)
|
||||||
|
|
||||||
|
rosidl_generate_interfaces(${PROJECT_NAME}
|
||||||
|
"msg/TrackedObject.msg"
|
||||||
|
"msg/MovingObjectArray.msg"
|
||||||
|
DEPENDENCIES std_msgs geometry_msgs
|
||||||
|
)
|
||||||
|
|
||||||
|
ament_export_dependencies(rosidl_default_runtime)
|
||||||
|
ament_package()
|
||||||
@ -0,0 +1,12 @@
|
|||||||
|
# MovingObjectArray — all currently tracked moving obstacles.
|
||||||
|
#
|
||||||
|
# Published at ~10 Hz on /saltybot/moving_objects.
|
||||||
|
# Only confirmed tracks (hits >= confirm_frames) appear here.
|
||||||
|
|
||||||
|
std_msgs/Header header
|
||||||
|
|
||||||
|
saltybot_dynamic_obs_msgs/TrackedObject[] objects
|
||||||
|
|
||||||
|
uint32 active_count # number of confirmed tracks
|
||||||
|
uint32 tentative_count # tracks not yet confirmed
|
||||||
|
float32 detector_latency_ms # pipeline latency hint
|
||||||
@ -0,0 +1,21 @@
|
|||||||
|
# TrackedObject — a single tracked moving obstacle.
|
||||||
|
#
|
||||||
|
# predicted_path[i] is the estimated pose at predicted_times[i] seconds from now.
|
||||||
|
# All poses are in the same frame as header.frame_id (typically 'odom').
|
||||||
|
|
||||||
|
std_msgs/Header header
|
||||||
|
|
||||||
|
uint32 object_id # stable ID across frames (monotonically increasing)
|
||||||
|
|
||||||
|
geometry_msgs/PoseWithCovariance pose # current best-estimate pose (x, y, yaw)
|
||||||
|
geometry_msgs/Vector3 velocity # vx, vy in m/s (vz = 0 for ground objects)
|
||||||
|
|
||||||
|
geometry_msgs/Pose[] predicted_path # future positions at predicted_times
|
||||||
|
float32[] predicted_times # seconds from header.stamp for each pose
|
||||||
|
|
||||||
|
float32 speed_mps # scalar |v|
|
||||||
|
float32 confidence # 0.0–1.0 (higher after more confirmed frames)
|
||||||
|
|
||||||
|
uint32 age_frames # frames since first detection
|
||||||
|
uint32 hits # number of successful associations
|
||||||
|
bool is_valid # false if in tentative / just-created state
|
||||||
23
jetson/ros2_ws/src/saltybot_dynamic_obs_msgs/package.xml
Normal file
23
jetson/ros2_ws/src/saltybot_dynamic_obs_msgs/package.xml
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
<?xml version="1.0"?>
|
||||||
|
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||||
|
<package format="3">
|
||||||
|
<name>saltybot_dynamic_obs_msgs</name>
|
||||||
|
<version>0.1.0</version>
|
||||||
|
<description>Custom message types for dynamic obstacle tracking.</description>
|
||||||
|
<maintainer email="robot@saltylab.local">SaltyLab</maintainer>
|
||||||
|
<license>MIT</license>
|
||||||
|
|
||||||
|
<buildtool_depend>ament_cmake</buildtool_depend>
|
||||||
|
<buildtool_depend>rosidl_default_generators</buildtool_depend>
|
||||||
|
|
||||||
|
<depend>std_msgs</depend>
|
||||||
|
<depend>geometry_msgs</depend>
|
||||||
|
|
||||||
|
<exec_depend>rosidl_default_runtime</exec_depend>
|
||||||
|
|
||||||
|
<member_of_group>rosidl_interface_packages</member_of_group>
|
||||||
|
|
||||||
|
<export>
|
||||||
|
<build_type>ament_cmake</build_type>
|
||||||
|
</export>
|
||||||
|
</package>
|
||||||
@ -0,0 +1,52 @@
|
|||||||
|
# saltybot_dynamic_obstacles — runtime parameters
|
||||||
|
#
|
||||||
|
# Requires:
|
||||||
|
# /scan (sensor_msgs/LaserScan) — RPLIDAR A1M8 at ~5.5 Hz
|
||||||
|
#
|
||||||
|
# LIDAR scan is published by rplidar_ros node.
|
||||||
|
# Make sure RPLIDAR is running before starting this stack.
|
||||||
|
|
||||||
|
dynamic_obs_tracker:
|
||||||
|
ros__parameters:
|
||||||
|
max_tracks: 20 # max simultaneous tracked objects
|
||||||
|
confirm_frames: 3 # hits before a track is published
|
||||||
|
max_missed_frames: 6 # missed frames before track deletion
|
||||||
|
assoc_dist_m: 1.5 # max assignment distance (Hungarian)
|
||||||
|
prediction_hz: 10.0 # tracker update + publish rate
|
||||||
|
horizon_s: 2.5 # prediction look-ahead
|
||||||
|
pred_step_s: 0.5 # time between predicted waypoints
|
||||||
|
odom_frame: 'odom'
|
||||||
|
min_speed_mps: 0.05 # suppress near-stationary tracks
|
||||||
|
max_range_m: 8.0 # ignore detections beyond this
|
||||||
|
|
||||||
|
dynamic_obs_costmap:
|
||||||
|
ros__parameters:
|
||||||
|
inflation_radius_m: 0.35 # safety bubble around each predicted point
|
||||||
|
ring_points: 8 # polygon points for inflation circle
|
||||||
|
clear_on_empty: true # push empty cloud to clear stale Nav2 markings
|
||||||
|
|
||||||
|
|
||||||
|
# ── Nav2 costmap integration ───────────────────────────────────────────────────
|
||||||
|
# In your nav2_params.yaml, under local_costmap or global_costmap > plugins, add
|
||||||
|
# an ObstacleLayer with:
|
||||||
|
#
|
||||||
|
# obstacle_layer:
|
||||||
|
# plugin: "nav2_costmap_2d::ObstacleLayer"
|
||||||
|
# enabled: true
|
||||||
|
# observation_sources: static_scan dynamic_obs
|
||||||
|
# static_scan:
|
||||||
|
# topic: /scan
|
||||||
|
# data_type: LaserScan
|
||||||
|
# ...
|
||||||
|
# dynamic_obs:
|
||||||
|
# topic: /saltybot/dynamic_obs_cloud
|
||||||
|
# data_type: PointCloud2
|
||||||
|
# sensor_frame: odom
|
||||||
|
# obstacle_max_range: 10.0
|
||||||
|
# raytrace_max_range: 10.0
|
||||||
|
# marking: true
|
||||||
|
# clearing: false
|
||||||
|
#
|
||||||
|
# This feeds the predicted trajectory smear directly into Nav2's obstacle
|
||||||
|
# inflation, forcing the planner to route around the predicted future path
|
||||||
|
# of every tracked moving object.
|
||||||
@ -0,0 +1,75 @@
|
|||||||
|
"""
|
||||||
|
dynamic_obstacles.launch.py — Dynamic obstacle tracking + Nav2 costmap layer.
|
||||||
|
|
||||||
|
Starts:
|
||||||
|
dynamic_obs_tracker — LIDAR motion detection + Kalman tracking @10 Hz
|
||||||
|
dynamic_obs_costmap — Predicted-trajectory → PointCloud2 for Nav2
|
||||||
|
|
||||||
|
Launch args:
|
||||||
|
max_tracks int '20'
|
||||||
|
assoc_dist_m float '1.5'
|
||||||
|
horizon_s float '2.5'
|
||||||
|
inflation_radius_m float '0.35'
|
||||||
|
|
||||||
|
Verify:
|
||||||
|
ros2 topic hz /saltybot/moving_objects # ~10 Hz
|
||||||
|
ros2 topic echo /saltybot/moving_objects # TrackedObject list
|
||||||
|
ros2 topic hz /saltybot/dynamic_obs_cloud # ~10 Hz (when objects present)
|
||||||
|
rviz2 → add MarkerArray → /saltybot/moving_objects_viz
|
||||||
|
|
||||||
|
Nav2 costmap integration:
|
||||||
|
In your costmap_params.yaml ObstacleLayer observation_sources, add:
|
||||||
|
dynamic_obs:
|
||||||
|
topic: /saltybot/dynamic_obs_cloud
|
||||||
|
data_type: PointCloud2
|
||||||
|
marking: true
|
||||||
|
clearing: false
|
||||||
|
"""
|
||||||
|
|
||||||
|
from launch import LaunchDescription
|
||||||
|
from launch.actions import DeclareLaunchArgument
|
||||||
|
from launch.substitutions import LaunchConfiguration
|
||||||
|
from launch_ros.actions import Node
|
||||||
|
|
||||||
|
|
||||||
|
def generate_launch_description():
|
||||||
|
args = [
|
||||||
|
DeclareLaunchArgument('max_tracks', default_value='20'),
|
||||||
|
DeclareLaunchArgument('assoc_dist_m', default_value='1.5'),
|
||||||
|
DeclareLaunchArgument('horizon_s', default_value='2.5'),
|
||||||
|
DeclareLaunchArgument('inflation_radius_m', default_value='0.35'),
|
||||||
|
DeclareLaunchArgument('min_speed_mps', default_value='0.05'),
|
||||||
|
]
|
||||||
|
|
||||||
|
tracker = Node(
|
||||||
|
package='saltybot_dynamic_obstacles',
|
||||||
|
executable='dynamic_obs_tracker',
|
||||||
|
name='dynamic_obs_tracker',
|
||||||
|
output='screen',
|
||||||
|
parameters=[{
|
||||||
|
'max_tracks': LaunchConfiguration('max_tracks'),
|
||||||
|
'assoc_dist_m': LaunchConfiguration('assoc_dist_m'),
|
||||||
|
'horizon_s': LaunchConfiguration('horizon_s'),
|
||||||
|
'min_speed_mps': LaunchConfiguration('min_speed_mps'),
|
||||||
|
'prediction_hz': 10.0,
|
||||||
|
'confirm_frames': 3,
|
||||||
|
'max_missed_frames': 6,
|
||||||
|
'pred_step_s': 0.5,
|
||||||
|
'odom_frame': 'odom',
|
||||||
|
'max_range_m': 8.0,
|
||||||
|
}],
|
||||||
|
)
|
||||||
|
|
||||||
|
costmap = Node(
|
||||||
|
package='saltybot_dynamic_obstacles',
|
||||||
|
executable='dynamic_obs_costmap',
|
||||||
|
name='dynamic_obs_costmap',
|
||||||
|
output='screen',
|
||||||
|
parameters=[{
|
||||||
|
'inflation_radius_m': LaunchConfiguration('inflation_radius_m'),
|
||||||
|
'ring_points': 8,
|
||||||
|
'clear_on_empty': True,
|
||||||
|
}],
|
||||||
|
)
|
||||||
|
|
||||||
|
return LaunchDescription(args + [tracker, costmap])
|
||||||
29
jetson/ros2_ws/src/saltybot_dynamic_obstacles/package.xml
Normal file
29
jetson/ros2_ws/src/saltybot_dynamic_obstacles/package.xml
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
<?xml version="1.0"?>
|
||||||
|
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||||
|
<package format="3">
|
||||||
|
<name>saltybot_dynamic_obstacles</name>
|
||||||
|
<version>0.1.0</version>
|
||||||
|
<description>
|
||||||
|
Dynamic obstacle detection, multi-object Kalman tracking, trajectory
|
||||||
|
prediction, and Nav2 costmap layer integration for SaltyBot.
|
||||||
|
</description>
|
||||||
|
<maintainer email="robot@saltylab.local">SaltyLab</maintainer>
|
||||||
|
<license>MIT</license>
|
||||||
|
|
||||||
|
<depend>rclpy</depend>
|
||||||
|
<depend>std_msgs</depend>
|
||||||
|
<depend>sensor_msgs</depend>
|
||||||
|
<depend>geometry_msgs</depend>
|
||||||
|
<depend>nav_msgs</depend>
|
||||||
|
<depend>visualization_msgs</depend>
|
||||||
|
<depend>saltybot_dynamic_obs_msgs</depend>
|
||||||
|
|
||||||
|
<exec_depend>python3-numpy</exec_depend>
|
||||||
|
<exec_depend>python3-scipy</exec_depend>
|
||||||
|
|
||||||
|
<test_depend>pytest</test_depend>
|
||||||
|
|
||||||
|
<export>
|
||||||
|
<build_type>ament_python</build_type>
|
||||||
|
</export>
|
||||||
|
</package>
|
||||||
@ -0,0 +1,178 @@
|
|||||||
|
"""
|
||||||
|
costmap_layer_node.py — Nav2 costmap integration for dynamic obstacles.
|
||||||
|
|
||||||
|
Converts predicted trajectories from /saltybot/moving_objects into a
|
||||||
|
PointCloud2 fed into Nav2's ObstacleLayer. Each predicted future position
|
||||||
|
is added as a point, creating a "smeared" dynamic obstacle zone that
|
||||||
|
covers the full 2-3 s prediction horizon.
|
||||||
|
|
||||||
|
Nav2 ObstacleLayer config (in costmap_params.yaml):
|
||||||
|
obstacle_layer:
|
||||||
|
enabled: true
|
||||||
|
observation_sources: dynamic_obs
|
||||||
|
dynamic_obs:
|
||||||
|
topic: /saltybot/dynamic_obs_cloud
|
||||||
|
sensor_frame: odom
|
||||||
|
data_type: PointCloud2
|
||||||
|
obstacle_max_range: 12.0
|
||||||
|
obstacle_min_range: 0.0
|
||||||
|
raytrace_max_range: 12.0
|
||||||
|
marking: true
|
||||||
|
clearing: false # let the tracker handle clearing
|
||||||
|
|
||||||
|
The node also clears old obstacle points when tracks are dropped, by
|
||||||
|
publishing a clearing cloud to /saltybot/dynamic_obs_clear.
|
||||||
|
|
||||||
|
Subscribes:
|
||||||
|
/saltybot/moving_objects saltybot_dynamic_obs_msgs/MovingObjectArray
|
||||||
|
|
||||||
|
Publishes:
|
||||||
|
/saltybot/dynamic_obs_cloud sensor_msgs/PointCloud2 marking cloud
|
||||||
|
/saltybot/dynamic_obs_clear sensor_msgs/PointCloud2 clearing cloud
|
||||||
|
|
||||||
|
Parameters:
|
||||||
|
inflation_radius_m float 0.35 (each predicted point inflated by this radius)
|
||||||
|
ring_points int 8 (polygon approximation of inflation circle)
|
||||||
|
clear_on_empty bool true (publish clear cloud when no objects tracked)
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import math
|
||||||
|
import struct
|
||||||
|
from typing import List
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
|
||||||
|
|
||||||
|
from sensor_msgs.msg import PointCloud2, PointField
|
||||||
|
from std_msgs.msg import Header
|
||||||
|
|
||||||
|
try:
|
||||||
|
from saltybot_dynamic_obs_msgs.msg import MovingObjectArray
|
||||||
|
_MSGS_AVAILABLE = True
|
||||||
|
except ImportError:
|
||||||
|
_MSGS_AVAILABLE = False
|
||||||
|
|
||||||
|
|
||||||
|
_RELIABLE_QOS = QoSProfile(
|
||||||
|
reliability=ReliabilityPolicy.RELIABLE,
|
||||||
|
history=HistoryPolicy.KEEP_LAST,
|
||||||
|
depth=10,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _make_pc2(header: Header, points_xyz: List[tuple]) -> PointCloud2:
|
||||||
|
"""Pack a list of (x, y, z) into a PointCloud2 message."""
|
||||||
|
fields = [
|
||||||
|
PointField(name='x', offset=0, datatype=PointField.FLOAT32, count=1),
|
||||||
|
PointField(name='y', offset=4, datatype=PointField.FLOAT32, count=1),
|
||||||
|
PointField(name='z', offset=8, datatype=PointField.FLOAT32, count=1),
|
||||||
|
]
|
||||||
|
point_step = 12 # 3 × float32
|
||||||
|
data = bytearray(len(points_xyz) * point_step)
|
||||||
|
for i, (x, y, z) in enumerate(points_xyz):
|
||||||
|
struct.pack_into('<fff', data, i * point_step, x, y, z)
|
||||||
|
|
||||||
|
pc = PointCloud2()
|
||||||
|
pc.header = header
|
||||||
|
pc.height = 1
|
||||||
|
pc.width = len(points_xyz)
|
||||||
|
pc.fields = fields
|
||||||
|
pc.is_bigendian = False
|
||||||
|
pc.point_step = point_step
|
||||||
|
pc.row_step = point_step * len(points_xyz)
|
||||||
|
pc.data = bytes(data)
|
||||||
|
pc.is_dense = True
|
||||||
|
return pc
|
||||||
|
|
||||||
|
|
||||||
|
class CostmapLayerNode(Node):
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__('dynamic_obs_costmap')
|
||||||
|
|
||||||
|
self.declare_parameter('inflation_radius_m', 0.35)
|
||||||
|
self.declare_parameter('ring_points', 8)
|
||||||
|
self.declare_parameter('clear_on_empty', True)
|
||||||
|
|
||||||
|
self._infl_r = self.get_parameter('inflation_radius_m').value
|
||||||
|
self._ring_n = self.get_parameter('ring_points').value
|
||||||
|
self._clear_empty = self.get_parameter('clear_on_empty').value
|
||||||
|
|
||||||
|
# Pre-compute ring offsets for inflation
|
||||||
|
self._ring_offsets = [
|
||||||
|
(self._infl_r * math.cos(2 * math.pi * i / self._ring_n),
|
||||||
|
self._infl_r * math.sin(2 * math.pi * i / self._ring_n))
|
||||||
|
for i in range(self._ring_n)
|
||||||
|
]
|
||||||
|
|
||||||
|
if _MSGS_AVAILABLE:
|
||||||
|
self.create_subscription(
|
||||||
|
MovingObjectArray,
|
||||||
|
'/saltybot/moving_objects',
|
||||||
|
self._on_objects,
|
||||||
|
_RELIABLE_QOS,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
self.get_logger().warning(
|
||||||
|
'[costmap_layer] saltybot_dynamic_obs_msgs not built — '
|
||||||
|
'will not subscribe to MovingObjectArray'
|
||||||
|
)
|
||||||
|
|
||||||
|
self._mark_pub = self.create_publisher(
|
||||||
|
PointCloud2, '/saltybot/dynamic_obs_cloud', 10
|
||||||
|
)
|
||||||
|
self._clear_pub = self.create_publisher(
|
||||||
|
PointCloud2, '/saltybot/dynamic_obs_clear', 10
|
||||||
|
)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f'dynamic_obs_costmap ready — '
|
||||||
|
f'inflation={self._infl_r}m ring_pts={self._ring_n}'
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Callback ──────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _on_objects(self, msg: 'MovingObjectArray') -> None:
|
||||||
|
hdr = msg.header
|
||||||
|
mark_pts: List[tuple] = []
|
||||||
|
|
||||||
|
for obj in msg.objects:
|
||||||
|
if not obj.is_valid:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Current position
|
||||||
|
self._add_inflated(obj.pose.pose.position.x,
|
||||||
|
obj.pose.pose.position.y, mark_pts)
|
||||||
|
|
||||||
|
# Predicted future positions
|
||||||
|
for pose in obj.predicted_path:
|
||||||
|
self._add_inflated(pose.position.x, pose.position.y, mark_pts)
|
||||||
|
|
||||||
|
if mark_pts:
|
||||||
|
self._mark_pub.publish(_make_pc2(hdr, mark_pts))
|
||||||
|
elif self._clear_empty:
|
||||||
|
# Publish tiny clear cloud so Nav2 clears stale markings
|
||||||
|
self._clear_pub.publish(_make_pc2(hdr, []))
|
||||||
|
|
||||||
|
def _add_inflated(self, cx: float, cy: float, pts: List[tuple]) -> None:
|
||||||
|
"""Add the centre + ring of inflation points at height 0.5 m."""
|
||||||
|
pts.append((cx, cy, 0.5))
|
||||||
|
for ox, oy in self._ring_offsets:
|
||||||
|
pts.append((cx + ox, cy + oy, 0.5))
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = CostmapLayerNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
||||||
@ -0,0 +1,319 @@
|
|||||||
|
"""
|
||||||
|
dynamic_obs_node.py — ROS2 node: LIDAR moving-object detection + Kalman tracking.
|
||||||
|
|
||||||
|
Pipeline:
|
||||||
|
1. Subscribe /scan (RPLIDAR LaserScan, ~5.5 Hz).
|
||||||
|
2. ObjectDetector performs background subtraction → moving blobs.
|
||||||
|
3. TrackerManager runs Hungarian assignment + Kalman predict/update at 10 Hz.
|
||||||
|
4. Publish /saltybot/moving_objects (MovingObjectArray).
|
||||||
|
5. Publish /saltybot/moving_objects_viz (MarkerArray) for RViz.
|
||||||
|
|
||||||
|
The 10 Hz timer drives the tracker regardless of scan rate, so prediction
|
||||||
|
continues between scans (pure-predict steps).
|
||||||
|
|
||||||
|
Subscribes:
|
||||||
|
/scan sensor_msgs/LaserScan RPLIDAR A1M8
|
||||||
|
|
||||||
|
Publishes:
|
||||||
|
/saltybot/moving_objects saltybot_dynamic_obs_msgs/MovingObjectArray
|
||||||
|
/saltybot/moving_objects_viz visualization_msgs/MarkerArray
|
||||||
|
|
||||||
|
Parameters:
|
||||||
|
max_tracks int 20
|
||||||
|
confirm_frames int 3
|
||||||
|
max_missed_frames int 6
|
||||||
|
assoc_dist_m float 1.5
|
||||||
|
prediction_hz float 10.0 (tracker + publish rate)
|
||||||
|
horizon_s float 2.5
|
||||||
|
pred_step_s float 0.5
|
||||||
|
odom_frame str 'odom'
|
||||||
|
min_speed_mps float 0.05 (suppress near-zero velocity tracks)
|
||||||
|
max_range_m float 8.0
|
||||||
|
"""
|
||||||
|
|
||||||
|
import time
|
||||||
|
import math
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
|
||||||
|
|
||||||
|
from sensor_msgs.msg import LaserScan
|
||||||
|
from geometry_msgs.msg import Pose, Point, Quaternion, Vector3
|
||||||
|
from std_msgs.msg import Header, ColorRGBA
|
||||||
|
from visualization_msgs.msg import Marker, MarkerArray
|
||||||
|
|
||||||
|
try:
|
||||||
|
from saltybot_dynamic_obs_msgs.msg import TrackedObject, MovingObjectArray
|
||||||
|
_MSGS_AVAILABLE = True
|
||||||
|
except ImportError:
|
||||||
|
_MSGS_AVAILABLE = False
|
||||||
|
|
||||||
|
from .object_detector import ObjectDetector
|
||||||
|
from .tracker_manager import TrackerManager, Track
|
||||||
|
|
||||||
|
|
||||||
|
_SENSOR_QOS = QoSProfile(
|
||||||
|
reliability=ReliabilityPolicy.BEST_EFFORT,
|
||||||
|
history=HistoryPolicy.KEEP_LAST,
|
||||||
|
depth=5,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _yaw_quat(yaw: float) -> Quaternion:
|
||||||
|
q = Quaternion()
|
||||||
|
q.w = math.cos(yaw * 0.5)
|
||||||
|
q.z = math.sin(yaw * 0.5)
|
||||||
|
return q
|
||||||
|
|
||||||
|
|
||||||
|
class DynamicObsNode(Node):
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__('dynamic_obs_tracker')
|
||||||
|
|
||||||
|
# ── Parameters ──────────────────────────────────────────────────────
|
||||||
|
self.declare_parameter('max_tracks', 20)
|
||||||
|
self.declare_parameter('confirm_frames', 3)
|
||||||
|
self.declare_parameter('max_missed_frames', 6)
|
||||||
|
self.declare_parameter('assoc_dist_m', 1.5)
|
||||||
|
self.declare_parameter('prediction_hz', 10.0)
|
||||||
|
self.declare_parameter('horizon_s', 2.5)
|
||||||
|
self.declare_parameter('pred_step_s', 0.5)
|
||||||
|
self.declare_parameter('odom_frame', 'odom')
|
||||||
|
self.declare_parameter('min_speed_mps', 0.05)
|
||||||
|
self.declare_parameter('max_range_m', 8.0)
|
||||||
|
|
||||||
|
max_tracks = self.get_parameter('max_tracks').value
|
||||||
|
confirm_f = self.get_parameter('confirm_frames').value
|
||||||
|
max_missed = self.get_parameter('max_missed_frames').value
|
||||||
|
assoc_dist = self.get_parameter('assoc_dist_m').value
|
||||||
|
pred_hz = self.get_parameter('prediction_hz').value
|
||||||
|
horizon_s = self.get_parameter('horizon_s').value
|
||||||
|
pred_step = self.get_parameter('pred_step_s').value
|
||||||
|
self._frame = self.get_parameter('odom_frame').value
|
||||||
|
self._min_spd = self.get_parameter('min_speed_mps').value
|
||||||
|
self._max_rng = self.get_parameter('max_range_m').value
|
||||||
|
|
||||||
|
# ── Core modules ────────────────────────────────────────────────────
|
||||||
|
self._detector = ObjectDetector(
|
||||||
|
grid_radius_m=min(self._max_rng + 2.0, 12.0),
|
||||||
|
max_cluster=int((self._max_rng / 0.1) ** 2 * 0.5),
|
||||||
|
)
|
||||||
|
self._tracker = TrackerManager(
|
||||||
|
max_tracks=max_tracks,
|
||||||
|
confirm_frames=confirm_f,
|
||||||
|
max_missed=max_missed,
|
||||||
|
assoc_dist_m=assoc_dist,
|
||||||
|
horizon_s=horizon_s,
|
||||||
|
pred_step_s=pred_step,
|
||||||
|
)
|
||||||
|
self._horizon_s = horizon_s
|
||||||
|
self._pred_step = pred_step
|
||||||
|
|
||||||
|
# ── State ────────────────────────────────────────────────────────────
|
||||||
|
self._latest_scan: LaserScan | None = None
|
||||||
|
self._last_track_t: float = time.monotonic()
|
||||||
|
self._scan_processed_stamp: float | None = None
|
||||||
|
|
||||||
|
# ── Subscriptions ────────────────────────────────────────────────────
|
||||||
|
self.create_subscription(LaserScan, '/scan', self._on_scan, _SENSOR_QOS)
|
||||||
|
|
||||||
|
# ── Publishers ───────────────────────────────────────────────────────
|
||||||
|
if _MSGS_AVAILABLE:
|
||||||
|
self._obj_pub = self.create_publisher(
|
||||||
|
MovingObjectArray, '/saltybot/moving_objects', 10
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
self._obj_pub = None
|
||||||
|
self.get_logger().warning(
|
||||||
|
'[dyn_obs] saltybot_dynamic_obs_msgs not built — '
|
||||||
|
'MovingObjectArray will not be published'
|
||||||
|
)
|
||||||
|
|
||||||
|
self._viz_pub = self.create_publisher(
|
||||||
|
MarkerArray, '/saltybot/moving_objects_viz', 10
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Timer ────────────────────────────────────────────────────────────
|
||||||
|
self.create_timer(1.0 / pred_hz, self._track_tick)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f'dynamic_obs_tracker ready — '
|
||||||
|
f'max_tracks={max_tracks} horizon={horizon_s}s assoc={assoc_dist}m'
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Scan callback ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _on_scan(self, msg: LaserScan) -> None:
|
||||||
|
self._latest_scan = msg
|
||||||
|
|
||||||
|
# ── 10 Hz tracker tick ────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _track_tick(self) -> None:
|
||||||
|
t0 = time.monotonic()
|
||||||
|
|
||||||
|
now_mono = t0
|
||||||
|
dt = now_mono - self._last_track_t
|
||||||
|
dt = max(1e-3, min(dt, 0.5))
|
||||||
|
self._last_track_t = now_mono
|
||||||
|
|
||||||
|
scan = self._latest_scan
|
||||||
|
detections = []
|
||||||
|
|
||||||
|
if scan is not None:
|
||||||
|
stamp_sec = scan.header.stamp.sec + scan.header.stamp.nanosec * 1e-9
|
||||||
|
if stamp_sec != self._scan_processed_stamp:
|
||||||
|
self._scan_processed_stamp = stamp_sec
|
||||||
|
ranges = np.asarray(scan.ranges, dtype=np.float32)
|
||||||
|
detections = self._detector.update(
|
||||||
|
ranges,
|
||||||
|
scan.angle_min,
|
||||||
|
scan.angle_increment,
|
||||||
|
min(scan.range_max, self._max_rng),
|
||||||
|
)
|
||||||
|
|
||||||
|
confirmed = self._tracker.update(detections, dt)
|
||||||
|
|
||||||
|
latency_ms = (time.monotonic() - t0) * 1000.0
|
||||||
|
stamp = self.get_clock().now().to_msg()
|
||||||
|
|
||||||
|
if _MSGS_AVAILABLE and self._obj_pub is not None:
|
||||||
|
self._publish_objects(confirmed, stamp, latency_ms)
|
||||||
|
self._publish_viz(confirmed, stamp)
|
||||||
|
|
||||||
|
# ── Publish helpers ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _publish_objects(self, confirmed: list, stamp, latency_ms: float) -> None:
|
||||||
|
arr = MovingObjectArray()
|
||||||
|
arr.header.stamp = stamp
|
||||||
|
arr.header.frame_id = self._frame
|
||||||
|
arr.active_count = len(confirmed)
|
||||||
|
arr.tentative_count = self._tracker.tentative_count
|
||||||
|
arr.detector_latency_ms = float(latency_ms)
|
||||||
|
|
||||||
|
for tr in confirmed:
|
||||||
|
px, py = tr.kalman.position
|
||||||
|
vx, vy = tr.kalman.velocity
|
||||||
|
speed = tr.kalman.speed
|
||||||
|
if speed < self._min_spd:
|
||||||
|
continue
|
||||||
|
|
||||||
|
obj = TrackedObject()
|
||||||
|
obj.header = arr.header
|
||||||
|
obj.object_id = tr.track_id
|
||||||
|
obj.pose.pose.position.x = px
|
||||||
|
obj.pose.pose.position.y = py
|
||||||
|
obj.pose.pose.orientation = _yaw_quat(math.atan2(vy, vx))
|
||||||
|
cov = tr.kalman.covariance_2x2
|
||||||
|
obj.pose.covariance[0] = float(cov[0, 0])
|
||||||
|
obj.pose.covariance[1] = float(cov[0, 1])
|
||||||
|
obj.pose.covariance[6] = float(cov[1, 0])
|
||||||
|
obj.pose.covariance[7] = float(cov[1, 1])
|
||||||
|
obj.velocity.x = vx
|
||||||
|
obj.velocity.y = vy
|
||||||
|
obj.speed_mps = speed
|
||||||
|
obj.confidence = min(1.0, tr.hits / (self._tracker._confirm_frames * 3))
|
||||||
|
obj.age_frames = tr.age
|
||||||
|
obj.hits = tr.hits
|
||||||
|
obj.is_valid = True
|
||||||
|
|
||||||
|
# Predicted path
|
||||||
|
for px_f, py_f, t_f in tr.kalman.predict_horizon(
|
||||||
|
self._horizon_s, self._pred_step
|
||||||
|
):
|
||||||
|
p = Pose()
|
||||||
|
p.position.x = px_f
|
||||||
|
p.position.y = py_f
|
||||||
|
p.orientation.w = 1.0
|
||||||
|
obj.predicted_path.append(p)
|
||||||
|
obj.predicted_times.append(float(t_f))
|
||||||
|
|
||||||
|
arr.objects.append(obj)
|
||||||
|
|
||||||
|
self._obj_pub.publish(arr)
|
||||||
|
|
||||||
|
def _publish_viz(self, confirmed: list, stamp) -> None:
|
||||||
|
markers = MarkerArray()
|
||||||
|
|
||||||
|
# Delete old markers
|
||||||
|
del_marker = Marker()
|
||||||
|
del_marker.header.stamp = stamp
|
||||||
|
del_marker.header.frame_id = self._frame
|
||||||
|
del_marker.action = Marker.DELETEALL
|
||||||
|
markers.markers.append(del_marker)
|
||||||
|
|
||||||
|
for tr in confirmed:
|
||||||
|
px, py = tr.kalman.position
|
||||||
|
vx, vy = tr.kalman.velocity
|
||||||
|
speed = tr.kalman.speed
|
||||||
|
if speed < self._min_spd:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Cylinder at current position
|
||||||
|
m = Marker()
|
||||||
|
m.header.stamp = stamp
|
||||||
|
m.header.frame_id = self._frame
|
||||||
|
m.ns = 'dyn_obs'
|
||||||
|
m.id = tr.track_id
|
||||||
|
m.type = Marker.CYLINDER
|
||||||
|
m.action = Marker.ADD
|
||||||
|
m.pose.position.x = px
|
||||||
|
m.pose.position.y = py
|
||||||
|
m.pose.position.z = 0.5
|
||||||
|
m.pose.orientation.w = 1.0
|
||||||
|
m.scale.x = 0.4
|
||||||
|
m.scale.y = 0.4
|
||||||
|
m.scale.z = 1.0
|
||||||
|
m.color = ColorRGBA(r=1.0, g=0.2, b=0.0, a=0.7)
|
||||||
|
m.lifetime.sec = 1
|
||||||
|
markers.markers.append(m)
|
||||||
|
|
||||||
|
# Arrow for velocity
|
||||||
|
vel_m = Marker()
|
||||||
|
vel_m.header = m.header
|
||||||
|
vel_m.ns = 'dyn_obs_vel'
|
||||||
|
vel_m.id = tr.track_id
|
||||||
|
vel_m.type = Marker.ARROW
|
||||||
|
vel_m.action = Marker.ADD
|
||||||
|
from geometry_msgs.msg import Point as GPoint
|
||||||
|
p_start = GPoint(x=px, y=py, z=1.0)
|
||||||
|
p_end = GPoint(x=px + vx, y=py + vy, z=1.0)
|
||||||
|
vel_m.points = [p_start, p_end]
|
||||||
|
vel_m.scale.x = 0.05
|
||||||
|
vel_m.scale.y = 0.10
|
||||||
|
vel_m.color = ColorRGBA(r=1.0, g=1.0, b=0.0, a=0.9)
|
||||||
|
vel_m.lifetime.sec = 1
|
||||||
|
markers.markers.append(vel_m)
|
||||||
|
|
||||||
|
# Line strip for predicted path
|
||||||
|
path_m = Marker()
|
||||||
|
path_m.header = m.header
|
||||||
|
path_m.ns = 'dyn_obs_path'
|
||||||
|
path_m.id = tr.track_id
|
||||||
|
path_m.type = Marker.LINE_STRIP
|
||||||
|
path_m.action = Marker.ADD
|
||||||
|
path_m.scale.x = 0.04
|
||||||
|
path_m.color = ColorRGBA(r=1.0, g=0.5, b=0.0, a=0.5)
|
||||||
|
path_m.lifetime.sec = 1
|
||||||
|
path_m.points.append(GPoint(x=px, y=py, z=0.5))
|
||||||
|
for fx, fy, _ in tr.kalman.predict_horizon(self._horizon_s, self._pred_step):
|
||||||
|
path_m.points.append(GPoint(x=fx, y=fy, z=0.5))
|
||||||
|
markers.markers.append(path_m)
|
||||||
|
|
||||||
|
self._viz_pub.publish(markers)
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = DynamicObsNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
||||||
@ -0,0 +1,132 @@
|
|||||||
|
"""
|
||||||
|
kalman_tracker.py — Single-object Kalman filter for 2-D ground-plane tracking.
|
||||||
|
|
||||||
|
State vector: x = [px, py, vx, vy] (position + velocity)
|
||||||
|
Motion model: constant-velocity (CV) with process noise on acceleration
|
||||||
|
|
||||||
|
Predict step:
|
||||||
|
F = | 1 0 dt 0 | x_{k|k-1} = F @ x_{k-1|k-1}
|
||||||
|
| 0 1 0 dt | P_{k|k-1} = F @ P @ F^T + Q
|
||||||
|
| 0 0 1 0 |
|
||||||
|
| 0 0 0 1 |
|
||||||
|
|
||||||
|
Update step (position observation only):
|
||||||
|
H = | 1 0 0 0 | y = z - H @ x
|
||||||
|
| 0 1 0 0 | S = H @ P @ H^T + R
|
||||||
|
K = P @ H^T @ inv(S)
|
||||||
|
x = x + K @ y
|
||||||
|
P = (I - K @ H) @ P (Joseph form for stability)
|
||||||
|
|
||||||
|
Trajectory prediction: unrolls the CV model forward at fixed time steps.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import List, Tuple
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
|
||||||
|
# ── Default noise matrices ────────────────────────────────────────────────────
|
||||||
|
# Process noise: models uncertainty in acceleration between frames
|
||||||
|
_Q_BASE = np.diag([0.02, 0.02, 0.8, 0.8]).astype(np.float64)
|
||||||
|
|
||||||
|
# Measurement noise: LIDAR centroid uncertainty (~0.15 m std)
|
||||||
|
_R = np.diag([0.025, 0.025]).astype(np.float64) # 0.16 m sigma each axis
|
||||||
|
|
||||||
|
# Observation matrix
|
||||||
|
_H = np.array([[1, 0, 0, 0],
|
||||||
|
[0, 1, 0, 0]], dtype=np.float64)
|
||||||
|
_I4 = np.eye(4, dtype=np.float64)
|
||||||
|
|
||||||
|
|
||||||
|
class KalmanTracker:
|
||||||
|
"""
|
||||||
|
Kalman filter tracking one object.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
x0, y0 : initial position (metres, odom frame)
|
||||||
|
process_noise : scalar multiplier on _Q_BASE
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, x0: float, y0: float, process_noise: float = 1.0):
|
||||||
|
self._x = np.array([x0, y0, 0.0, 0.0], dtype=np.float64)
|
||||||
|
self._P = np.eye(4, dtype=np.float64) * 0.5
|
||||||
|
self._Q = _Q_BASE * process_noise
|
||||||
|
|
||||||
|
# ── Core filter ───────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def predict(self, dt: float) -> None:
|
||||||
|
"""Propagate state by dt seconds."""
|
||||||
|
F = np.array([
|
||||||
|
[1, 0, dt, 0],
|
||||||
|
[0, 1, 0, dt],
|
||||||
|
[0, 0, 1, 0],
|
||||||
|
[0, 0, 0, 1],
|
||||||
|
], dtype=np.float64)
|
||||||
|
self._x = F @ self._x
|
||||||
|
self._P = F @ self._P @ F.T + self._Q
|
||||||
|
|
||||||
|
def update(self, z: np.ndarray) -> None:
|
||||||
|
"""
|
||||||
|
Incorporate a position measurement z = [x, y].
|
||||||
|
Uses Joseph-form covariance update for numerical stability.
|
||||||
|
"""
|
||||||
|
y = z.astype(np.float64) - _H @ self._x
|
||||||
|
S = _H @ self._P @ _H.T + _R
|
||||||
|
K = self._P @ _H.T @ np.linalg.inv(S)
|
||||||
|
self._x = self._x + K @ y
|
||||||
|
IKH = _I4 - K @ _H
|
||||||
|
# Joseph form: (I-KH) P (I-KH)^T + K R K^T
|
||||||
|
self._P = IKH @ self._P @ IKH.T + K @ _R @ K.T
|
||||||
|
|
||||||
|
# ── Prediction horizon ────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def predict_horizon(
|
||||||
|
self,
|
||||||
|
horizon_s: float = 2.5,
|
||||||
|
step_s: float = 0.5,
|
||||||
|
) -> List[Tuple[float, float, float]]:
|
||||||
|
"""
|
||||||
|
Return [(x, y, t), ...] at equally-spaced future times.
|
||||||
|
|
||||||
|
Does NOT modify internal filter state.
|
||||||
|
"""
|
||||||
|
predictions: List[Tuple[float, float, float]] = []
|
||||||
|
state = self._x.copy()
|
||||||
|
t = 0.0
|
||||||
|
F_step = np.array([
|
||||||
|
[1, 0, step_s, 0],
|
||||||
|
[0, 1, 0, step_s],
|
||||||
|
[0, 0, 1, 0],
|
||||||
|
[0, 0, 0, 1],
|
||||||
|
], dtype=np.float64)
|
||||||
|
while t < horizon_s - 1e-6:
|
||||||
|
state = F_step @ state
|
||||||
|
t += step_s
|
||||||
|
predictions.append((float(state[0]), float(state[1]), t))
|
||||||
|
return predictions
|
||||||
|
|
||||||
|
# ── Properties ────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
@property
|
||||||
|
def position(self) -> Tuple[float, float]:
|
||||||
|
return float(self._x[0]), float(self._x[1])
|
||||||
|
|
||||||
|
@property
|
||||||
|
def velocity(self) -> Tuple[float, float]:
|
||||||
|
return float(self._x[2]), float(self._x[3])
|
||||||
|
|
||||||
|
@property
|
||||||
|
def speed(self) -> float:
|
||||||
|
return float(np.hypot(self._x[2], self._x[3]))
|
||||||
|
|
||||||
|
@property
|
||||||
|
def covariance_2x2(self) -> np.ndarray:
|
||||||
|
"""Position covariance (top-left 2×2 of P)."""
|
||||||
|
return self._P[:2, :2].copy()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def state(self) -> np.ndarray:
|
||||||
|
return self._x.copy()
|
||||||
@ -0,0 +1,168 @@
|
|||||||
|
"""
|
||||||
|
object_detector.py — LIDAR-based moving object detector.
|
||||||
|
|
||||||
|
Algorithm:
|
||||||
|
1. Convert each LaserScan to a 2-D occupancy grid (robot-centred, fixed size).
|
||||||
|
2. Maintain a background model via exponential moving average (EMA):
|
||||||
|
bg_t = α * current + (1-α) * bg_{t-1} (only for non-moving cells)
|
||||||
|
3. Foreground = cells whose occupancy significantly exceeds the background.
|
||||||
|
4. Cluster foreground cells with scipy connected-components → Detection list.
|
||||||
|
|
||||||
|
The grid is robot-relative (origin at robot centre) so it naturally tracks
|
||||||
|
the robot's motion without needing TF at this stage. The caller is responsible
|
||||||
|
for transforming detections into a stable frame (odom) before passing to the
|
||||||
|
tracker.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from typing import List, Optional
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
from scipy import ndimage
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Detection:
|
||||||
|
"""A clustered moving foreground blob from one scan."""
|
||||||
|
x: float # centroid x in sensor frame (m)
|
||||||
|
y: float # centroid y in sensor frame (m)
|
||||||
|
size_m2: float # approximate area of the cluster (m²)
|
||||||
|
range_m: float # distance from robot (m)
|
||||||
|
|
||||||
|
|
||||||
|
class ObjectDetector:
|
||||||
|
"""
|
||||||
|
Detects moving objects in consecutive 2-D LIDAR scans.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
grid_radius_m : half-size of the occupancy grid (grid covers ±radius)
|
||||||
|
resolution : metres per cell
|
||||||
|
bg_alpha : EMA update rate for background (small = slow forgetting)
|
||||||
|
motion_thr : occupancy delta above background to count as moving
|
||||||
|
min_cluster : minimum cells to keep a cluster
|
||||||
|
max_cluster : maximum cells before a cluster is considered static wall
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
grid_radius_m: float = 10.0,
|
||||||
|
resolution: float = 0.10,
|
||||||
|
bg_alpha: float = 0.04,
|
||||||
|
motion_thr: float = 0.45,
|
||||||
|
min_cluster: int = 3,
|
||||||
|
max_cluster: int = 200,
|
||||||
|
):
|
||||||
|
cells = int(2 * grid_radius_m / resolution)
|
||||||
|
self._cells = cells
|
||||||
|
self._res = resolution
|
||||||
|
self._origin = -grid_radius_m # world x/y at grid index 0
|
||||||
|
self._bg_alpha = bg_alpha
|
||||||
|
self._motion_thr = motion_thr
|
||||||
|
self._min_clust = min_cluster
|
||||||
|
self._max_clust = max_cluster
|
||||||
|
|
||||||
|
self._bg = np.zeros((cells, cells), dtype=np.float32)
|
||||||
|
self._initialized = False
|
||||||
|
|
||||||
|
# ── Public API ────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def update(
|
||||||
|
self,
|
||||||
|
ranges: np.ndarray,
|
||||||
|
angle_min: float,
|
||||||
|
angle_increment: float,
|
||||||
|
range_max: float,
|
||||||
|
) -> List[Detection]:
|
||||||
|
"""
|
||||||
|
Process one LaserScan and return detected moving blobs.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
ranges : 1-D array of range readings (metres)
|
||||||
|
angle_min : angle of first beam (radians)
|
||||||
|
angle_increment : angular step between beams (radians)
|
||||||
|
range_max : maximum valid range (metres)
|
||||||
|
"""
|
||||||
|
curr = self._scan_to_grid(ranges, angle_min, angle_increment, range_max)
|
||||||
|
|
||||||
|
if not self._initialized:
|
||||||
|
self._bg = curr.copy()
|
||||||
|
self._initialized = True
|
||||||
|
return []
|
||||||
|
|
||||||
|
# Foreground mask
|
||||||
|
motion_mask = (curr - self._bg) > self._motion_thr
|
||||||
|
|
||||||
|
# Update background only on non-moving cells
|
||||||
|
static = ~motion_mask
|
||||||
|
self._bg[static] = (
|
||||||
|
self._bg[static] * (1.0 - self._bg_alpha)
|
||||||
|
+ curr[static] * self._bg_alpha
|
||||||
|
)
|
||||||
|
|
||||||
|
return self._cluster(motion_mask)
|
||||||
|
|
||||||
|
def reset(self) -> None:
|
||||||
|
self._bg[:] = 0.0
|
||||||
|
self._initialized = False
|
||||||
|
|
||||||
|
# ── Internals ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _scan_to_grid(
|
||||||
|
self,
|
||||||
|
ranges: np.ndarray,
|
||||||
|
angle_min: float,
|
||||||
|
angle_increment: float,
|
||||||
|
range_max: float,
|
||||||
|
) -> np.ndarray:
|
||||||
|
grid = np.zeros((self._cells, self._cells), dtype=np.float32)
|
||||||
|
n = len(ranges)
|
||||||
|
angles = angle_min + np.arange(n) * angle_increment
|
||||||
|
r = np.asarray(ranges, dtype=np.float32)
|
||||||
|
|
||||||
|
valid = (r > 0.05) & (r < range_max) & np.isfinite(r)
|
||||||
|
r, a = r[valid], angles[valid]
|
||||||
|
|
||||||
|
x = r * np.cos(a)
|
||||||
|
y = r * np.sin(a)
|
||||||
|
|
||||||
|
ix = np.clip(
|
||||||
|
((x - self._origin) / self._res).astype(np.int32), 0, self._cells - 1
|
||||||
|
)
|
||||||
|
iy = np.clip(
|
||||||
|
((y - self._origin) / self._res).astype(np.int32), 0, self._cells - 1
|
||||||
|
)
|
||||||
|
grid[iy, ix] = 1.0
|
||||||
|
return grid
|
||||||
|
|
||||||
|
def _cluster(self, mask: np.ndarray) -> List[Detection]:
|
||||||
|
# Dilate slightly to connect nearby hit cells into one blob
|
||||||
|
struct = ndimage.generate_binary_structure(2, 2)
|
||||||
|
dilated = ndimage.binary_dilation(mask, structure=struct, iterations=1)
|
||||||
|
|
||||||
|
labeled, n_labels = ndimage.label(dilated)
|
||||||
|
detections: List[Detection] = []
|
||||||
|
|
||||||
|
for label_id in range(1, n_labels + 1):
|
||||||
|
coords = np.argwhere(labeled == label_id)
|
||||||
|
n_cells = len(coords)
|
||||||
|
if n_cells < self._min_clust or n_cells > self._max_clust:
|
||||||
|
continue
|
||||||
|
|
||||||
|
ys, xs = coords[:, 0], coords[:, 1]
|
||||||
|
cx_grid = float(np.mean(xs))
|
||||||
|
cy_grid = float(np.mean(ys))
|
||||||
|
cx = cx_grid * self._res + self._origin
|
||||||
|
cy = cy_grid * self._res + self._origin
|
||||||
|
|
||||||
|
detections.append(Detection(
|
||||||
|
x=cx,
|
||||||
|
y=cy,
|
||||||
|
size_m2=n_cells * self._res ** 2,
|
||||||
|
range_m=float(np.hypot(cx, cy)),
|
||||||
|
))
|
||||||
|
|
||||||
|
return detections
|
||||||
@ -0,0 +1,206 @@
|
|||||||
|
"""
|
||||||
|
tracker_manager.py — Multi-object tracker with Hungarian data association.
|
||||||
|
|
||||||
|
Track lifecycle:
|
||||||
|
TENTATIVE → confirmed after `confirm_frames` consecutive hits
|
||||||
|
CONFIRMED → normal tracked state
|
||||||
|
LOST → missed for 1..max_missed frames (still predicts, not published)
|
||||||
|
DEAD → missed > max_missed → removed
|
||||||
|
|
||||||
|
Association:
|
||||||
|
Uses scipy.optimize.linear_sum_assignment (Hungarian algorithm) on a cost
|
||||||
|
matrix of Euclidean distances between predicted track positions and new
|
||||||
|
detections. Assignments with cost > assoc_dist_m are rejected.
|
||||||
|
|
||||||
|
Up to `max_tracks` simultaneous live tracks (tentative + confirmed).
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from enum import IntEnum
|
||||||
|
from typing import Dict, List, Optional, Tuple
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
from scipy.optimize import linear_sum_assignment
|
||||||
|
|
||||||
|
from .kalman_tracker import KalmanTracker
|
||||||
|
from .object_detector import Detection
|
||||||
|
|
||||||
|
|
||||||
|
class TrackState(IntEnum):
|
||||||
|
TENTATIVE = 0
|
||||||
|
CONFIRMED = 1
|
||||||
|
LOST = 2
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Track:
|
||||||
|
track_id: int
|
||||||
|
kalman: KalmanTracker
|
||||||
|
state: TrackState = TrackState.TENTATIVE
|
||||||
|
hits: int = 1
|
||||||
|
age: int = 1 # frames since creation
|
||||||
|
missed: int = 0 # consecutive missed frames
|
||||||
|
|
||||||
|
|
||||||
|
class TrackerManager:
|
||||||
|
"""
|
||||||
|
Manages a pool of Kalman tracks.
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
max_tracks : hard cap on simultaneously alive tracks
|
||||||
|
confirm_frames : hits needed before a track is CONFIRMED
|
||||||
|
max_missed : consecutive missed frames before a track is DEAD
|
||||||
|
assoc_dist_m : max allowed distance (m) for a valid assignment
|
||||||
|
horizon_s : prediction horizon for trajectory output (seconds)
|
||||||
|
pred_step_s : time step between predicted waypoints
|
||||||
|
process_noise : KalmanTracker process-noise multiplier
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
max_tracks: int = 20,
|
||||||
|
confirm_frames: int = 3,
|
||||||
|
max_missed: int = 6,
|
||||||
|
assoc_dist_m: float = 1.5,
|
||||||
|
horizon_s: float = 2.5,
|
||||||
|
pred_step_s: float = 0.5,
|
||||||
|
process_noise: float = 1.0,
|
||||||
|
):
|
||||||
|
self._max_tracks = max_tracks
|
||||||
|
self._confirm_frames = confirm_frames
|
||||||
|
self._max_missed = max_missed
|
||||||
|
self._assoc_dist = assoc_dist_m
|
||||||
|
self._horizon_s = horizon_s
|
||||||
|
self._pred_step = pred_step_s
|
||||||
|
self._proc_noise = process_noise
|
||||||
|
|
||||||
|
self._tracks: Dict[int, Track] = {}
|
||||||
|
self._next_id: int = 1
|
||||||
|
|
||||||
|
# ── Public API ────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def update(self, detections: List[Detection], dt: float) -> List[Track]:
|
||||||
|
"""
|
||||||
|
Process one frame of detections.
|
||||||
|
|
||||||
|
1. Predict all tracks by dt.
|
||||||
|
2. Hungarian assignment of predictions → detections.
|
||||||
|
3. Update matched tracks; mark unmatched tracks as LOST.
|
||||||
|
4. Promote tracks crossing `confirm_frames`.
|
||||||
|
5. Create new tracks for unmatched detections (if room).
|
||||||
|
6. Remove DEAD tracks.
|
||||||
|
|
||||||
|
Returns confirmed tracks only.
|
||||||
|
"""
|
||||||
|
# 1. Predict
|
||||||
|
for tr in self._tracks.values():
|
||||||
|
tr.kalman.predict(dt)
|
||||||
|
tr.age += 1
|
||||||
|
|
||||||
|
# 2. Assign
|
||||||
|
matched, unmatched_tracks, unmatched_dets = self._assign(detections)
|
||||||
|
|
||||||
|
# 3a. Update matched
|
||||||
|
for tid, did in matched:
|
||||||
|
tr = self._tracks[tid]
|
||||||
|
det = detections[did]
|
||||||
|
tr.kalman.update(np.array([det.x, det.y]))
|
||||||
|
tr.hits += 1
|
||||||
|
tr.missed = 0
|
||||||
|
if tr.state == TrackState.LOST:
|
||||||
|
tr.state = TrackState.CONFIRMED
|
||||||
|
elif tr.state == TrackState.TENTATIVE and tr.hits >= self._confirm_frames:
|
||||||
|
tr.state = TrackState.CONFIRMED
|
||||||
|
|
||||||
|
# 3b. Unmatched tracks
|
||||||
|
for tid in unmatched_tracks:
|
||||||
|
tr = self._tracks[tid]
|
||||||
|
tr.missed += 1
|
||||||
|
if tr.missed > 1:
|
||||||
|
tr.state = TrackState.LOST
|
||||||
|
|
||||||
|
# 4. New tracks for unmatched detections
|
||||||
|
live = sum(1 for t in self._tracks.values() if t.state != TrackState.LOST
|
||||||
|
or t.missed <= self._max_missed)
|
||||||
|
for did in unmatched_dets:
|
||||||
|
if live >= self._max_tracks:
|
||||||
|
break
|
||||||
|
det = detections[did]
|
||||||
|
init_state = (TrackState.CONFIRMED
|
||||||
|
if self._confirm_frames <= 1
|
||||||
|
else TrackState.TENTATIVE)
|
||||||
|
new_tr = Track(
|
||||||
|
track_id=self._next_id,
|
||||||
|
kalman=KalmanTracker(det.x, det.y, self._proc_noise),
|
||||||
|
state=init_state,
|
||||||
|
)
|
||||||
|
self._tracks[self._next_id] = new_tr
|
||||||
|
self._next_id += 1
|
||||||
|
live += 1
|
||||||
|
|
||||||
|
# 5. Prune dead
|
||||||
|
dead = [tid for tid, t in self._tracks.items() if t.missed > self._max_missed]
|
||||||
|
for tid in dead:
|
||||||
|
del self._tracks[tid]
|
||||||
|
|
||||||
|
return [t for t in self._tracks.values() if t.state == TrackState.CONFIRMED]
|
||||||
|
|
||||||
|
@property
|
||||||
|
def all_tracks(self) -> List[Track]:
|
||||||
|
return list(self._tracks.values())
|
||||||
|
|
||||||
|
@property
|
||||||
|
def tentative_count(self) -> int:
|
||||||
|
return sum(1 for t in self._tracks.values()
|
||||||
|
if t.state == TrackState.TENTATIVE)
|
||||||
|
|
||||||
|
def reset(self) -> None:
|
||||||
|
self._tracks.clear()
|
||||||
|
self._next_id = 1
|
||||||
|
|
||||||
|
# ── Hungarian assignment ──────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _assign(
|
||||||
|
self,
|
||||||
|
detections: List[Detection],
|
||||||
|
) -> Tuple[List[Tuple[int, int]], List[int], List[int]]:
|
||||||
|
"""
|
||||||
|
Returns:
|
||||||
|
matched — list of (track_id, det_index)
|
||||||
|
unmatched_tids — track IDs with no detection assigned
|
||||||
|
unmatched_dids — detection indices with no track assigned
|
||||||
|
"""
|
||||||
|
track_ids = list(self._tracks.keys())
|
||||||
|
|
||||||
|
if not track_ids or not detections:
|
||||||
|
return [], track_ids, list(range(len(detections)))
|
||||||
|
|
||||||
|
# Build cost matrix: rows=tracks, cols=detections
|
||||||
|
cost = np.full((len(track_ids), len(detections)), fill_value=np.inf)
|
||||||
|
for r, tid in enumerate(track_ids):
|
||||||
|
tx, ty = self._tracks[tid].kalman.position
|
||||||
|
for c, det in enumerate(detections):
|
||||||
|
cost[r, c] = np.hypot(tx - det.x, ty - det.y)
|
||||||
|
|
||||||
|
row_ind, col_ind = linear_sum_assignment(cost)
|
||||||
|
|
||||||
|
matched: List[Tuple[int, int]] = []
|
||||||
|
matched_track_idx: set = set()
|
||||||
|
matched_det_idx: set = set()
|
||||||
|
|
||||||
|
for r, c in zip(row_ind, col_ind):
|
||||||
|
if cost[r, c] > self._assoc_dist:
|
||||||
|
continue
|
||||||
|
matched.append((track_ids[r], c))
|
||||||
|
matched_track_idx.add(r)
|
||||||
|
matched_det_idx.add(c)
|
||||||
|
|
||||||
|
unmatched_tids = [track_ids[r] for r in range(len(track_ids))
|
||||||
|
if r not in matched_track_idx]
|
||||||
|
unmatched_dids = [c for c in range(len(detections))
|
||||||
|
if c not in matched_det_idx]
|
||||||
|
|
||||||
|
return matched, unmatched_tids, unmatched_dids
|
||||||
4
jetson/ros2_ws/src/saltybot_dynamic_obstacles/setup.cfg
Normal file
4
jetson/ros2_ws/src/saltybot_dynamic_obstacles/setup.cfg
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
[develop]
|
||||||
|
script_dir=$base/lib/saltybot_dynamic_obstacles
|
||||||
|
[install]
|
||||||
|
install_scripts=$base/lib/saltybot_dynamic_obstacles
|
||||||
32
jetson/ros2_ws/src/saltybot_dynamic_obstacles/setup.py
Normal file
32
jetson/ros2_ws/src/saltybot_dynamic_obstacles/setup.py
Normal file
@ -0,0 +1,32 @@
|
|||||||
|
from setuptools import setup, find_packages
|
||||||
|
from glob import glob
|
||||||
|
|
||||||
|
package_name = 'saltybot_dynamic_obstacles'
|
||||||
|
|
||||||
|
setup(
|
||||||
|
name=package_name,
|
||||||
|
version='0.1.0',
|
||||||
|
packages=find_packages(exclude=['test']),
|
||||||
|
data_files=[
|
||||||
|
('share/ament_index/resource_index/packages',
|
||||||
|
['resource/' + package_name]),
|
||||||
|
('share/' + package_name, ['package.xml']),
|
||||||
|
('share/' + package_name + '/launch',
|
||||||
|
glob('launch/*.launch.py')),
|
||||||
|
('share/' + package_name + '/config',
|
||||||
|
glob('config/*.yaml')),
|
||||||
|
],
|
||||||
|
install_requires=['setuptools'],
|
||||||
|
zip_safe=True,
|
||||||
|
maintainer='SaltyLab',
|
||||||
|
maintainer_email='robot@saltylab.local',
|
||||||
|
description='Dynamic obstacle tracking: LIDAR motion detection, Kalman tracking, Nav2 costmap',
|
||||||
|
license='MIT',
|
||||||
|
tests_require=['pytest'],
|
||||||
|
entry_points={
|
||||||
|
'console_scripts': [
|
||||||
|
'dynamic_obs_tracker = saltybot_dynamic_obstacles.dynamic_obs_node:main',
|
||||||
|
'dynamic_obs_costmap = saltybot_dynamic_obstacles.costmap_layer_node:main',
|
||||||
|
],
|
||||||
|
},
|
||||||
|
)
|
||||||
@ -0,0 +1,262 @@
|
|||||||
|
"""
|
||||||
|
test_dynamic_obstacles.py — Unit tests for KalmanTracker, TrackerManager,
|
||||||
|
and ObjectDetector.
|
||||||
|
|
||||||
|
Runs without ROS2 / hardware (no rclpy imports).
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import math
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
||||||
|
|
||||||
|
from saltybot_dynamic_obstacles.kalman_tracker import KalmanTracker
|
||||||
|
from saltybot_dynamic_obstacles.tracker_manager import TrackerManager, TrackState
|
||||||
|
from saltybot_dynamic_obstacles.object_detector import ObjectDetector, Detection
|
||||||
|
|
||||||
|
|
||||||
|
# ── KalmanTracker ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestKalmanTracker:
|
||||||
|
|
||||||
|
def test_initial_position(self):
|
||||||
|
kt = KalmanTracker(3.0, 4.0)
|
||||||
|
px, py = kt.position
|
||||||
|
assert px == pytest.approx(3.0)
|
||||||
|
assert py == pytest.approx(4.0)
|
||||||
|
|
||||||
|
def test_initial_velocity_zero(self):
|
||||||
|
kt = KalmanTracker(0.0, 0.0)
|
||||||
|
vx, vy = kt.velocity
|
||||||
|
assert vx == pytest.approx(0.0)
|
||||||
|
assert vy == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_predict_moves_position(self):
|
||||||
|
kt = KalmanTracker(0.0, 0.0)
|
||||||
|
# Give it some velocity via update sequence
|
||||||
|
kt.update(np.array([0.1, 0.0]))
|
||||||
|
kt.update(np.array([0.2, 0.0]))
|
||||||
|
kt.predict(0.1)
|
||||||
|
px, _ = kt.position
|
||||||
|
assert px > 0.0 # should have moved forward
|
||||||
|
|
||||||
|
def test_pure_predict_constant_velocity(self):
|
||||||
|
"""After velocity is established, predict() should move linearly."""
|
||||||
|
kt = KalmanTracker(0.0, 0.0)
|
||||||
|
# Force velocity by repeated updates
|
||||||
|
for i in range(10):
|
||||||
|
kt.update(np.array([i * 0.1, 0.0]))
|
||||||
|
kt.predict(0.1)
|
||||||
|
vx, _ = kt.velocity
|
||||||
|
px0, _ = kt.position
|
||||||
|
kt.predict(1.0)
|
||||||
|
px1, _ = kt.position
|
||||||
|
# Should advance roughly vx * 1.0 metres
|
||||||
|
assert px1 == pytest.approx(px0 + vx * 1.0, abs=0.3)
|
||||||
|
|
||||||
|
def test_update_corrects_position(self):
|
||||||
|
kt = KalmanTracker(0.0, 0.0)
|
||||||
|
# Predict way off
|
||||||
|
kt.predict(10.0)
|
||||||
|
# Then update to ground truth
|
||||||
|
kt.update(np.array([1.0, 2.0]))
|
||||||
|
px, py = kt.position
|
||||||
|
# Should move toward (1, 2)
|
||||||
|
assert px == pytest.approx(1.0, abs=0.5)
|
||||||
|
assert py == pytest.approx(2.0, abs=0.5)
|
||||||
|
|
||||||
|
def test_predict_horizon_length(self):
|
||||||
|
kt = KalmanTracker(0.0, 0.0)
|
||||||
|
preds = kt.predict_horizon(horizon_s=2.5, step_s=0.5)
|
||||||
|
assert len(preds) == 5 # 0.5, 1.0, 1.5, 2.0, 2.5
|
||||||
|
|
||||||
|
def test_predict_horizon_times(self):
|
||||||
|
kt = KalmanTracker(0.0, 0.0)
|
||||||
|
preds = kt.predict_horizon(horizon_s=2.0, step_s=0.5)
|
||||||
|
times = [t for _, _, t in preds]
|
||||||
|
assert times == pytest.approx([0.5, 1.0, 1.5, 2.0], abs=0.01)
|
||||||
|
|
||||||
|
def test_predict_horizon_does_not_mutate_state(self):
|
||||||
|
kt = KalmanTracker(1.0, 2.0)
|
||||||
|
kt.predict_horizon(horizon_s=3.0, step_s=0.5)
|
||||||
|
px, py = kt.position
|
||||||
|
assert px == pytest.approx(1.0)
|
||||||
|
assert py == pytest.approx(2.0)
|
||||||
|
|
||||||
|
def test_speed_zero_at_init(self):
|
||||||
|
kt = KalmanTracker(5.0, 5.0)
|
||||||
|
assert kt.speed == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_covariance_shape(self):
|
||||||
|
kt = KalmanTracker(0.0, 0.0)
|
||||||
|
cov = kt.covariance_2x2
|
||||||
|
assert cov.shape == (2, 2)
|
||||||
|
|
||||||
|
def test_covariance_positive_definite(self):
|
||||||
|
kt = KalmanTracker(0.0, 0.0)
|
||||||
|
for _ in range(5):
|
||||||
|
kt.predict(0.1)
|
||||||
|
kt.update(np.array([0.1, 0.0]))
|
||||||
|
eigvals = np.linalg.eigvalsh(kt.covariance_2x2)
|
||||||
|
assert np.all(eigvals > 0)
|
||||||
|
|
||||||
|
def test_joseph_form_stays_symmetric(self):
|
||||||
|
"""Covariance should remain symmetric after many updates."""
|
||||||
|
kt = KalmanTracker(0.0, 0.0)
|
||||||
|
for i in range(50):
|
||||||
|
kt.predict(0.1)
|
||||||
|
kt.update(np.array([i * 0.01, 0.0]))
|
||||||
|
P = kt._P
|
||||||
|
assert np.allclose(P, P.T, atol=1e-10)
|
||||||
|
|
||||||
|
|
||||||
|
# ── TrackerManager ────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestTrackerManager:
|
||||||
|
|
||||||
|
def _det(self, x, y):
|
||||||
|
return Detection(x=x, y=y, size_m2=0.1, range_m=math.hypot(x, y))
|
||||||
|
|
||||||
|
def test_empty_detections_no_tracks(self):
|
||||||
|
tm = TrackerManager()
|
||||||
|
confirmed = tm.update([], 0.1)
|
||||||
|
assert confirmed == []
|
||||||
|
|
||||||
|
def test_track_created_on_detection(self):
|
||||||
|
tm = TrackerManager(confirm_frames=1)
|
||||||
|
confirmed = tm.update([self._det(1.0, 0.0)], 0.1)
|
||||||
|
assert len(confirmed) == 1
|
||||||
|
|
||||||
|
def test_track_tentative_before_confirm(self):
|
||||||
|
tm = TrackerManager(confirm_frames=3)
|
||||||
|
tm.update([self._det(1.0, 0.0)], 0.1)
|
||||||
|
# Only 1 hit — should still be tentative
|
||||||
|
assert tm.tentative_count == 1
|
||||||
|
|
||||||
|
def test_track_confirmed_after_N_hits(self):
|
||||||
|
tm = TrackerManager(confirm_frames=3, assoc_dist_m=2.0)
|
||||||
|
for _ in range(4):
|
||||||
|
confirmed = tm.update([self._det(1.0, 0.0)], 0.1)
|
||||||
|
assert len(confirmed) == 1
|
||||||
|
|
||||||
|
def test_track_deleted_after_max_missed(self):
|
||||||
|
tm = TrackerManager(confirm_frames=1, max_missed=3, assoc_dist_m=2.0)
|
||||||
|
tm.update([self._det(1.0, 0.0)], 0.1) # create
|
||||||
|
for _ in range(5):
|
||||||
|
tm.update([], 0.1) # no detections → missed++
|
||||||
|
assert len(tm.all_tracks) == 0
|
||||||
|
|
||||||
|
def test_max_tracks_cap(self):
|
||||||
|
tm = TrackerManager(max_tracks=5, confirm_frames=1)
|
||||||
|
dets = [self._det(float(i), 0.0) for i in range(10)]
|
||||||
|
tm.update(dets, 0.1)
|
||||||
|
assert len(tm.all_tracks) <= 5
|
||||||
|
|
||||||
|
def test_consistent_track_id(self):
|
||||||
|
tm = TrackerManager(confirm_frames=3, assoc_dist_m=1.5)
|
||||||
|
for i in range(5):
|
||||||
|
confirmed = tm.update([self._det(1.0 + i * 0.01, 0.0)], 0.1)
|
||||||
|
assert len(confirmed) == 1
|
||||||
|
track_id = confirmed[0].track_id
|
||||||
|
# One more tick — ID should be stable
|
||||||
|
confirmed2 = tm.update([self._det(1.06, 0.0)], 0.1)
|
||||||
|
assert confirmed2[0].track_id == track_id
|
||||||
|
|
||||||
|
def test_two_independent_tracks(self):
|
||||||
|
tm = TrackerManager(confirm_frames=3, assoc_dist_m=0.8)
|
||||||
|
for _ in range(5):
|
||||||
|
confirmed = tm.update([self._det(1.0, 0.0), self._det(5.0, 0.0)], 0.1)
|
||||||
|
assert len(confirmed) == 2
|
||||||
|
|
||||||
|
def test_reset_clears_all(self):
|
||||||
|
tm = TrackerManager(confirm_frames=1)
|
||||||
|
tm.update([self._det(1.0, 0.0)], 0.1)
|
||||||
|
tm.reset()
|
||||||
|
assert len(tm.all_tracks) == 0
|
||||||
|
|
||||||
|
def test_far_detection_not_assigned(self):
|
||||||
|
tm = TrackerManager(confirm_frames=1, assoc_dist_m=0.5)
|
||||||
|
tm.update([self._det(1.0, 0.0)], 0.1) # create track at (1,0)
|
||||||
|
# Detection 3 m away → new track, not update
|
||||||
|
tm.update([self._det(4.0, 0.0)], 0.1)
|
||||||
|
assert len(tm.all_tracks) == 2
|
||||||
|
|
||||||
|
|
||||||
|
# ── ObjectDetector ────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class TestObjectDetector:
|
||||||
|
|
||||||
|
def _empty_scan(self, n=360, rmax=8.0) -> tuple:
|
||||||
|
"""All readings at max range (static background)."""
|
||||||
|
ranges = np.full(n, rmax - 0.1, dtype=np.float32)
|
||||||
|
return ranges, -math.pi, 2 * math.pi / n, rmax
|
||||||
|
|
||||||
|
def _scan_with_blob(self, blob_r=2.0, blob_theta=0.0, n=360, rmax=8.0) -> tuple:
|
||||||
|
"""Background scan + a short-range cluster at blob_theta."""
|
||||||
|
ranges = np.full(n, rmax - 0.1, dtype=np.float32)
|
||||||
|
angle_inc = 2 * math.pi / n
|
||||||
|
angle_min = -math.pi
|
||||||
|
# Put a cluster of ~10 beams at blob_r
|
||||||
|
center_idx = int((blob_theta - angle_min) / angle_inc) % n
|
||||||
|
for di in range(-5, 6):
|
||||||
|
idx = (center_idx + di) % n
|
||||||
|
ranges[idx] = blob_r
|
||||||
|
return ranges, angle_min, angle_inc, rmax
|
||||||
|
|
||||||
|
def test_empty_scan_no_detections_after_warmup(self):
|
||||||
|
od = ObjectDetector()
|
||||||
|
r, a_min, a_inc, rmax = self._empty_scan()
|
||||||
|
od.update(r, a_min, a_inc, rmax) # init background
|
||||||
|
for _ in range(3):
|
||||||
|
dets = od.update(r, a_min, a_inc, rmax)
|
||||||
|
assert len(dets) == 0
|
||||||
|
|
||||||
|
def test_moving_blob_detected(self):
|
||||||
|
od = ObjectDetector()
|
||||||
|
bg_r, a_min, a_inc, rmax = self._empty_scan()
|
||||||
|
od.update(bg_r, a_min, a_inc, rmax) # seed background
|
||||||
|
for _ in range(5):
|
||||||
|
od.update(bg_r, a_min, a_inc, rmax)
|
||||||
|
# Now inject a foreground blob
|
||||||
|
fg_r, _, _, _ = self._scan_with_blob(blob_r=2.0, blob_theta=0.0)
|
||||||
|
dets = od.update(fg_r, a_min, a_inc, rmax)
|
||||||
|
assert len(dets) >= 1
|
||||||
|
|
||||||
|
def test_detection_centroid_approximate(self):
|
||||||
|
od = ObjectDetector()
|
||||||
|
bg_r, a_min, a_inc, rmax = self._empty_scan()
|
||||||
|
for _ in range(8):
|
||||||
|
od.update(bg_r, a_min, a_inc, rmax)
|
||||||
|
fg_r, _, _, _ = self._scan_with_blob(blob_r=3.0, blob_theta=0.0)
|
||||||
|
dets = od.update(fg_r, a_min, a_inc, rmax)
|
||||||
|
assert len(dets) >= 1
|
||||||
|
# Blob is at ~3 m along x-axis (theta=0)
|
||||||
|
cx = dets[0].x
|
||||||
|
cy = dets[0].y
|
||||||
|
assert abs(cx - 3.0) < 0.5
|
||||||
|
assert abs(cy) < 0.5
|
||||||
|
|
||||||
|
def test_reset_clears_background(self):
|
||||||
|
od = ObjectDetector()
|
||||||
|
bg_r, a_min, a_inc, rmax = self._empty_scan()
|
||||||
|
for _ in range(5):
|
||||||
|
od.update(bg_r, a_min, a_inc, rmax)
|
||||||
|
od.reset()
|
||||||
|
assert not od._initialized
|
||||||
|
|
||||||
|
def test_no_inf_nan_ranges(self):
|
||||||
|
od = ObjectDetector()
|
||||||
|
r = np.array([np.inf, np.nan, 5.0, -1.0, 0.0] * 72, dtype=np.float32)
|
||||||
|
a_min = -math.pi
|
||||||
|
a_inc = 2 * math.pi / len(r)
|
||||||
|
od.update(r, a_min, a_inc, 8.0) # should not raise
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
pytest.main([__file__, '-v'])
|
||||||
Loading…
x
Reference in New Issue
Block a user