Merge pull request 'feat(jetson): dynamic obstacle tracking — LIDAR motion detection, Kalman tracking, trajectory prediction, Nav2 costmap (#176)' (#181) from sl-perception/issue-176-dynamic-obstacles into main
This commit is contained in:
commit
54668536c1
16
jetson/ros2_ws/src/saltybot_dynamic_obs_msgs/CMakeLists.txt
Normal file
16
jetson/ros2_ws/src/saltybot_dynamic_obs_msgs/CMakeLists.txt
Normal file
@ -0,0 +1,16 @@
|
||||
cmake_minimum_required(VERSION 3.8)
|
||||
project(saltybot_dynamic_obs_msgs)
|
||||
|
||||
find_package(ament_cmake REQUIRED)
|
||||
find_package(rosidl_default_generators REQUIRED)
|
||||
find_package(std_msgs REQUIRED)
|
||||
find_package(geometry_msgs REQUIRED)
|
||||
|
||||
rosidl_generate_interfaces(${PROJECT_NAME}
|
||||
"msg/TrackedObject.msg"
|
||||
"msg/MovingObjectArray.msg"
|
||||
DEPENDENCIES std_msgs geometry_msgs
|
||||
)
|
||||
|
||||
ament_export_dependencies(rosidl_default_runtime)
|
||||
ament_package()
|
||||
@ -0,0 +1,12 @@
|
||||
# MovingObjectArray — all currently tracked moving obstacles.
|
||||
#
|
||||
# Published at ~10 Hz on /saltybot/moving_objects.
|
||||
# Only confirmed tracks (hits >= confirm_frames) appear here.
|
||||
|
||||
std_msgs/Header header
|
||||
|
||||
saltybot_dynamic_obs_msgs/TrackedObject[] objects
|
||||
|
||||
uint32 active_count # number of confirmed tracks
|
||||
uint32 tentative_count # tracks not yet confirmed
|
||||
float32 detector_latency_ms # pipeline latency hint
|
||||
@ -0,0 +1,21 @@
|
||||
# TrackedObject — a single tracked moving obstacle.
|
||||
#
|
||||
# predicted_path[i] is the estimated pose at predicted_times[i] seconds from now.
|
||||
# All poses are in the same frame as header.frame_id (typically 'odom').
|
||||
|
||||
std_msgs/Header header
|
||||
|
||||
uint32 object_id # stable ID across frames (monotonically increasing)
|
||||
|
||||
geometry_msgs/PoseWithCovariance pose # current best-estimate pose (x, y, yaw)
|
||||
geometry_msgs/Vector3 velocity # vx, vy in m/s (vz = 0 for ground objects)
|
||||
|
||||
geometry_msgs/Pose[] predicted_path # future positions at predicted_times
|
||||
float32[] predicted_times # seconds from header.stamp for each pose
|
||||
|
||||
float32 speed_mps # scalar |v|
|
||||
float32 confidence # 0.0–1.0 (higher after more confirmed frames)
|
||||
|
||||
uint32 age_frames # frames since first detection
|
||||
uint32 hits # number of successful associations
|
||||
bool is_valid # false if in tentative / just-created state
|
||||
23
jetson/ros2_ws/src/saltybot_dynamic_obs_msgs/package.xml
Normal file
23
jetson/ros2_ws/src/saltybot_dynamic_obs_msgs/package.xml
Normal file
@ -0,0 +1,23 @@
|
||||
<?xml version="1.0"?>
|
||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="3">
|
||||
<name>saltybot_dynamic_obs_msgs</name>
|
||||
<version>0.1.0</version>
|
||||
<description>Custom message types for dynamic obstacle tracking.</description>
|
||||
<maintainer email="robot@saltylab.local">SaltyLab</maintainer>
|
||||
<license>MIT</license>
|
||||
|
||||
<buildtool_depend>ament_cmake</buildtool_depend>
|
||||
<buildtool_depend>rosidl_default_generators</buildtool_depend>
|
||||
|
||||
<depend>std_msgs</depend>
|
||||
<depend>geometry_msgs</depend>
|
||||
|
||||
<exec_depend>rosidl_default_runtime</exec_depend>
|
||||
|
||||
<member_of_group>rosidl_interface_packages</member_of_group>
|
||||
|
||||
<export>
|
||||
<build_type>ament_cmake</build_type>
|
||||
</export>
|
||||
</package>
|
||||
@ -0,0 +1,52 @@
|
||||
# saltybot_dynamic_obstacles — runtime parameters
|
||||
#
|
||||
# Requires:
|
||||
# /scan (sensor_msgs/LaserScan) — RPLIDAR A1M8 at ~5.5 Hz
|
||||
#
|
||||
# LIDAR scan is published by rplidar_ros node.
|
||||
# Make sure RPLIDAR is running before starting this stack.
|
||||
|
||||
dynamic_obs_tracker:
|
||||
ros__parameters:
|
||||
max_tracks: 20 # max simultaneous tracked objects
|
||||
confirm_frames: 3 # hits before a track is published
|
||||
max_missed_frames: 6 # missed frames before track deletion
|
||||
assoc_dist_m: 1.5 # max assignment distance (Hungarian)
|
||||
prediction_hz: 10.0 # tracker update + publish rate
|
||||
horizon_s: 2.5 # prediction look-ahead
|
||||
pred_step_s: 0.5 # time between predicted waypoints
|
||||
odom_frame: 'odom'
|
||||
min_speed_mps: 0.05 # suppress near-stationary tracks
|
||||
max_range_m: 8.0 # ignore detections beyond this
|
||||
|
||||
dynamic_obs_costmap:
|
||||
ros__parameters:
|
||||
inflation_radius_m: 0.35 # safety bubble around each predicted point
|
||||
ring_points: 8 # polygon points for inflation circle
|
||||
clear_on_empty: true # push empty cloud to clear stale Nav2 markings
|
||||
|
||||
|
||||
# ── Nav2 costmap integration ───────────────────────────────────────────────────
|
||||
# In your nav2_params.yaml, under local_costmap or global_costmap > plugins, add
|
||||
# an ObstacleLayer with:
|
||||
#
|
||||
# obstacle_layer:
|
||||
# plugin: "nav2_costmap_2d::ObstacleLayer"
|
||||
# enabled: true
|
||||
# observation_sources: static_scan dynamic_obs
|
||||
# static_scan:
|
||||
# topic: /scan
|
||||
# data_type: LaserScan
|
||||
# ...
|
||||
# dynamic_obs:
|
||||
# topic: /saltybot/dynamic_obs_cloud
|
||||
# data_type: PointCloud2
|
||||
# sensor_frame: odom
|
||||
# obstacle_max_range: 10.0
|
||||
# raytrace_max_range: 10.0
|
||||
# marking: true
|
||||
# clearing: false
|
||||
#
|
||||
# This feeds the predicted trajectory smear directly into Nav2's obstacle
|
||||
# inflation, forcing the planner to route around the predicted future path
|
||||
# of every tracked moving object.
|
||||
@ -0,0 +1,75 @@
|
||||
"""
|
||||
dynamic_obstacles.launch.py — Dynamic obstacle tracking + Nav2 costmap layer.
|
||||
|
||||
Starts:
|
||||
dynamic_obs_tracker — LIDAR motion detection + Kalman tracking @10 Hz
|
||||
dynamic_obs_costmap — Predicted-trajectory → PointCloud2 for Nav2
|
||||
|
||||
Launch args:
|
||||
max_tracks int '20'
|
||||
assoc_dist_m float '1.5'
|
||||
horizon_s float '2.5'
|
||||
inflation_radius_m float '0.35'
|
||||
|
||||
Verify:
|
||||
ros2 topic hz /saltybot/moving_objects # ~10 Hz
|
||||
ros2 topic echo /saltybot/moving_objects # TrackedObject list
|
||||
ros2 topic hz /saltybot/dynamic_obs_cloud # ~10 Hz (when objects present)
|
||||
rviz2 → add MarkerArray → /saltybot/moving_objects_viz
|
||||
|
||||
Nav2 costmap integration:
|
||||
In your costmap_params.yaml ObstacleLayer observation_sources, add:
|
||||
dynamic_obs:
|
||||
topic: /saltybot/dynamic_obs_cloud
|
||||
data_type: PointCloud2
|
||||
marking: true
|
||||
clearing: false
|
||||
"""
|
||||
|
||||
from launch import LaunchDescription
|
||||
from launch.actions import DeclareLaunchArgument
|
||||
from launch.substitutions import LaunchConfiguration
|
||||
from launch_ros.actions import Node
|
||||
|
||||
|
||||
def generate_launch_description():
|
||||
args = [
|
||||
DeclareLaunchArgument('max_tracks', default_value='20'),
|
||||
DeclareLaunchArgument('assoc_dist_m', default_value='1.5'),
|
||||
DeclareLaunchArgument('horizon_s', default_value='2.5'),
|
||||
DeclareLaunchArgument('inflation_radius_m', default_value='0.35'),
|
||||
DeclareLaunchArgument('min_speed_mps', default_value='0.05'),
|
||||
]
|
||||
|
||||
tracker = Node(
|
||||
package='saltybot_dynamic_obstacles',
|
||||
executable='dynamic_obs_tracker',
|
||||
name='dynamic_obs_tracker',
|
||||
output='screen',
|
||||
parameters=[{
|
||||
'max_tracks': LaunchConfiguration('max_tracks'),
|
||||
'assoc_dist_m': LaunchConfiguration('assoc_dist_m'),
|
||||
'horizon_s': LaunchConfiguration('horizon_s'),
|
||||
'min_speed_mps': LaunchConfiguration('min_speed_mps'),
|
||||
'prediction_hz': 10.0,
|
||||
'confirm_frames': 3,
|
||||
'max_missed_frames': 6,
|
||||
'pred_step_s': 0.5,
|
||||
'odom_frame': 'odom',
|
||||
'max_range_m': 8.0,
|
||||
}],
|
||||
)
|
||||
|
||||
costmap = Node(
|
||||
package='saltybot_dynamic_obstacles',
|
||||
executable='dynamic_obs_costmap',
|
||||
name='dynamic_obs_costmap',
|
||||
output='screen',
|
||||
parameters=[{
|
||||
'inflation_radius_m': LaunchConfiguration('inflation_radius_m'),
|
||||
'ring_points': 8,
|
||||
'clear_on_empty': True,
|
||||
}],
|
||||
)
|
||||
|
||||
return LaunchDescription(args + [tracker, costmap])
|
||||
29
jetson/ros2_ws/src/saltybot_dynamic_obstacles/package.xml
Normal file
29
jetson/ros2_ws/src/saltybot_dynamic_obstacles/package.xml
Normal file
@ -0,0 +1,29 @@
|
||||
<?xml version="1.0"?>
|
||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="3">
|
||||
<name>saltybot_dynamic_obstacles</name>
|
||||
<version>0.1.0</version>
|
||||
<description>
|
||||
Dynamic obstacle detection, multi-object Kalman tracking, trajectory
|
||||
prediction, and Nav2 costmap layer integration for SaltyBot.
|
||||
</description>
|
||||
<maintainer email="robot@saltylab.local">SaltyLab</maintainer>
|
||||
<license>MIT</license>
|
||||
|
||||
<depend>rclpy</depend>
|
||||
<depend>std_msgs</depend>
|
||||
<depend>sensor_msgs</depend>
|
||||
<depend>geometry_msgs</depend>
|
||||
<depend>nav_msgs</depend>
|
||||
<depend>visualization_msgs</depend>
|
||||
<depend>saltybot_dynamic_obs_msgs</depend>
|
||||
|
||||
<exec_depend>python3-numpy</exec_depend>
|
||||
<exec_depend>python3-scipy</exec_depend>
|
||||
|
||||
<test_depend>pytest</test_depend>
|
||||
|
||||
<export>
|
||||
<build_type>ament_python</build_type>
|
||||
</export>
|
||||
</package>
|
||||
@ -0,0 +1,178 @@
|
||||
"""
|
||||
costmap_layer_node.py — Nav2 costmap integration for dynamic obstacles.
|
||||
|
||||
Converts predicted trajectories from /saltybot/moving_objects into a
|
||||
PointCloud2 fed into Nav2's ObstacleLayer. Each predicted future position
|
||||
is added as a point, creating a "smeared" dynamic obstacle zone that
|
||||
covers the full 2-3 s prediction horizon.
|
||||
|
||||
Nav2 ObstacleLayer config (in costmap_params.yaml):
|
||||
obstacle_layer:
|
||||
enabled: true
|
||||
observation_sources: dynamic_obs
|
||||
dynamic_obs:
|
||||
topic: /saltybot/dynamic_obs_cloud
|
||||
sensor_frame: odom
|
||||
data_type: PointCloud2
|
||||
obstacle_max_range: 12.0
|
||||
obstacle_min_range: 0.0
|
||||
raytrace_max_range: 12.0
|
||||
marking: true
|
||||
clearing: false # let the tracker handle clearing
|
||||
|
||||
The node also clears old obstacle points when tracks are dropped, by
|
||||
publishing a clearing cloud to /saltybot/dynamic_obs_clear.
|
||||
|
||||
Subscribes:
|
||||
/saltybot/moving_objects saltybot_dynamic_obs_msgs/MovingObjectArray
|
||||
|
||||
Publishes:
|
||||
/saltybot/dynamic_obs_cloud sensor_msgs/PointCloud2 marking cloud
|
||||
/saltybot/dynamic_obs_clear sensor_msgs/PointCloud2 clearing cloud
|
||||
|
||||
Parameters:
|
||||
inflation_radius_m float 0.35 (each predicted point inflated by this radius)
|
||||
ring_points int 8 (polygon approximation of inflation circle)
|
||||
clear_on_empty bool true (publish clear cloud when no objects tracked)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
import struct
|
||||
from typing import List
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
|
||||
|
||||
from sensor_msgs.msg import PointCloud2, PointField
|
||||
from std_msgs.msg import Header
|
||||
|
||||
try:
|
||||
from saltybot_dynamic_obs_msgs.msg import MovingObjectArray
|
||||
_MSGS_AVAILABLE = True
|
||||
except ImportError:
|
||||
_MSGS_AVAILABLE = False
|
||||
|
||||
|
||||
_RELIABLE_QOS = QoSProfile(
|
||||
reliability=ReliabilityPolicy.RELIABLE,
|
||||
history=HistoryPolicy.KEEP_LAST,
|
||||
depth=10,
|
||||
)
|
||||
|
||||
|
||||
def _make_pc2(header: Header, points_xyz: List[tuple]) -> PointCloud2:
|
||||
"""Pack a list of (x, y, z) into a PointCloud2 message."""
|
||||
fields = [
|
||||
PointField(name='x', offset=0, datatype=PointField.FLOAT32, count=1),
|
||||
PointField(name='y', offset=4, datatype=PointField.FLOAT32, count=1),
|
||||
PointField(name='z', offset=8, datatype=PointField.FLOAT32, count=1),
|
||||
]
|
||||
point_step = 12 # 3 × float32
|
||||
data = bytearray(len(points_xyz) * point_step)
|
||||
for i, (x, y, z) in enumerate(points_xyz):
|
||||
struct.pack_into('<fff', data, i * point_step, x, y, z)
|
||||
|
||||
pc = PointCloud2()
|
||||
pc.header = header
|
||||
pc.height = 1
|
||||
pc.width = len(points_xyz)
|
||||
pc.fields = fields
|
||||
pc.is_bigendian = False
|
||||
pc.point_step = point_step
|
||||
pc.row_step = point_step * len(points_xyz)
|
||||
pc.data = bytes(data)
|
||||
pc.is_dense = True
|
||||
return pc
|
||||
|
||||
|
||||
class CostmapLayerNode(Node):
|
||||
|
||||
def __init__(self):
|
||||
super().__init__('dynamic_obs_costmap')
|
||||
|
||||
self.declare_parameter('inflation_radius_m', 0.35)
|
||||
self.declare_parameter('ring_points', 8)
|
||||
self.declare_parameter('clear_on_empty', True)
|
||||
|
||||
self._infl_r = self.get_parameter('inflation_radius_m').value
|
||||
self._ring_n = self.get_parameter('ring_points').value
|
||||
self._clear_empty = self.get_parameter('clear_on_empty').value
|
||||
|
||||
# Pre-compute ring offsets for inflation
|
||||
self._ring_offsets = [
|
||||
(self._infl_r * math.cos(2 * math.pi * i / self._ring_n),
|
||||
self._infl_r * math.sin(2 * math.pi * i / self._ring_n))
|
||||
for i in range(self._ring_n)
|
||||
]
|
||||
|
||||
if _MSGS_AVAILABLE:
|
||||
self.create_subscription(
|
||||
MovingObjectArray,
|
||||
'/saltybot/moving_objects',
|
||||
self._on_objects,
|
||||
_RELIABLE_QOS,
|
||||
)
|
||||
else:
|
||||
self.get_logger().warning(
|
||||
'[costmap_layer] saltybot_dynamic_obs_msgs not built — '
|
||||
'will not subscribe to MovingObjectArray'
|
||||
)
|
||||
|
||||
self._mark_pub = self.create_publisher(
|
||||
PointCloud2, '/saltybot/dynamic_obs_cloud', 10
|
||||
)
|
||||
self._clear_pub = self.create_publisher(
|
||||
PointCloud2, '/saltybot/dynamic_obs_clear', 10
|
||||
)
|
||||
|
||||
self.get_logger().info(
|
||||
f'dynamic_obs_costmap ready — '
|
||||
f'inflation={self._infl_r}m ring_pts={self._ring_n}'
|
||||
)
|
||||
|
||||
# ── Callback ──────────────────────────────────────────────────────────────
|
||||
|
||||
def _on_objects(self, msg: 'MovingObjectArray') -> None:
|
||||
hdr = msg.header
|
||||
mark_pts: List[tuple] = []
|
||||
|
||||
for obj in msg.objects:
|
||||
if not obj.is_valid:
|
||||
continue
|
||||
|
||||
# Current position
|
||||
self._add_inflated(obj.pose.pose.position.x,
|
||||
obj.pose.pose.position.y, mark_pts)
|
||||
|
||||
# Predicted future positions
|
||||
for pose in obj.predicted_path:
|
||||
self._add_inflated(pose.position.x, pose.position.y, mark_pts)
|
||||
|
||||
if mark_pts:
|
||||
self._mark_pub.publish(_make_pc2(hdr, mark_pts))
|
||||
elif self._clear_empty:
|
||||
# Publish tiny clear cloud so Nav2 clears stale markings
|
||||
self._clear_pub.publish(_make_pc2(hdr, []))
|
||||
|
||||
def _add_inflated(self, cx: float, cy: float, pts: List[tuple]) -> None:
|
||||
"""Add the centre + ring of inflation points at height 0.5 m."""
|
||||
pts.append((cx, cy, 0.5))
|
||||
for ox, oy in self._ring_offsets:
|
||||
pts.append((cx + ox, cy + oy, 0.5))
|
||||
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = CostmapLayerNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@ -0,0 +1,319 @@
|
||||
"""
|
||||
dynamic_obs_node.py — ROS2 node: LIDAR moving-object detection + Kalman tracking.
|
||||
|
||||
Pipeline:
|
||||
1. Subscribe /scan (RPLIDAR LaserScan, ~5.5 Hz).
|
||||
2. ObjectDetector performs background subtraction → moving blobs.
|
||||
3. TrackerManager runs Hungarian assignment + Kalman predict/update at 10 Hz.
|
||||
4. Publish /saltybot/moving_objects (MovingObjectArray).
|
||||
5. Publish /saltybot/moving_objects_viz (MarkerArray) for RViz.
|
||||
|
||||
The 10 Hz timer drives the tracker regardless of scan rate, so prediction
|
||||
continues between scans (pure-predict steps).
|
||||
|
||||
Subscribes:
|
||||
/scan sensor_msgs/LaserScan RPLIDAR A1M8
|
||||
|
||||
Publishes:
|
||||
/saltybot/moving_objects saltybot_dynamic_obs_msgs/MovingObjectArray
|
||||
/saltybot/moving_objects_viz visualization_msgs/MarkerArray
|
||||
|
||||
Parameters:
|
||||
max_tracks int 20
|
||||
confirm_frames int 3
|
||||
max_missed_frames int 6
|
||||
assoc_dist_m float 1.5
|
||||
prediction_hz float 10.0 (tracker + publish rate)
|
||||
horizon_s float 2.5
|
||||
pred_step_s float 0.5
|
||||
odom_frame str 'odom'
|
||||
min_speed_mps float 0.05 (suppress near-zero velocity tracks)
|
||||
max_range_m float 8.0
|
||||
"""
|
||||
|
||||
import time
|
||||
import math
|
||||
|
||||
import numpy as np
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
|
||||
|
||||
from sensor_msgs.msg import LaserScan
|
||||
from geometry_msgs.msg import Pose, Point, Quaternion, Vector3
|
||||
from std_msgs.msg import Header, ColorRGBA
|
||||
from visualization_msgs.msg import Marker, MarkerArray
|
||||
|
||||
try:
|
||||
from saltybot_dynamic_obs_msgs.msg import TrackedObject, MovingObjectArray
|
||||
_MSGS_AVAILABLE = True
|
||||
except ImportError:
|
||||
_MSGS_AVAILABLE = False
|
||||
|
||||
from .object_detector import ObjectDetector
|
||||
from .tracker_manager import TrackerManager, Track
|
||||
|
||||
|
||||
_SENSOR_QOS = QoSProfile(
|
||||
reliability=ReliabilityPolicy.BEST_EFFORT,
|
||||
history=HistoryPolicy.KEEP_LAST,
|
||||
depth=5,
|
||||
)
|
||||
|
||||
|
||||
def _yaw_quat(yaw: float) -> Quaternion:
|
||||
q = Quaternion()
|
||||
q.w = math.cos(yaw * 0.5)
|
||||
q.z = math.sin(yaw * 0.5)
|
||||
return q
|
||||
|
||||
|
||||
class DynamicObsNode(Node):
|
||||
|
||||
def __init__(self):
|
||||
super().__init__('dynamic_obs_tracker')
|
||||
|
||||
# ── Parameters ──────────────────────────────────────────────────────
|
||||
self.declare_parameter('max_tracks', 20)
|
||||
self.declare_parameter('confirm_frames', 3)
|
||||
self.declare_parameter('max_missed_frames', 6)
|
||||
self.declare_parameter('assoc_dist_m', 1.5)
|
||||
self.declare_parameter('prediction_hz', 10.0)
|
||||
self.declare_parameter('horizon_s', 2.5)
|
||||
self.declare_parameter('pred_step_s', 0.5)
|
||||
self.declare_parameter('odom_frame', 'odom')
|
||||
self.declare_parameter('min_speed_mps', 0.05)
|
||||
self.declare_parameter('max_range_m', 8.0)
|
||||
|
||||
max_tracks = self.get_parameter('max_tracks').value
|
||||
confirm_f = self.get_parameter('confirm_frames').value
|
||||
max_missed = self.get_parameter('max_missed_frames').value
|
||||
assoc_dist = self.get_parameter('assoc_dist_m').value
|
||||
pred_hz = self.get_parameter('prediction_hz').value
|
||||
horizon_s = self.get_parameter('horizon_s').value
|
||||
pred_step = self.get_parameter('pred_step_s').value
|
||||
self._frame = self.get_parameter('odom_frame').value
|
||||
self._min_spd = self.get_parameter('min_speed_mps').value
|
||||
self._max_rng = self.get_parameter('max_range_m').value
|
||||
|
||||
# ── Core modules ────────────────────────────────────────────────────
|
||||
self._detector = ObjectDetector(
|
||||
grid_radius_m=min(self._max_rng + 2.0, 12.0),
|
||||
max_cluster=int((self._max_rng / 0.1) ** 2 * 0.5),
|
||||
)
|
||||
self._tracker = TrackerManager(
|
||||
max_tracks=max_tracks,
|
||||
confirm_frames=confirm_f,
|
||||
max_missed=max_missed,
|
||||
assoc_dist_m=assoc_dist,
|
||||
horizon_s=horizon_s,
|
||||
pred_step_s=pred_step,
|
||||
)
|
||||
self._horizon_s = horizon_s
|
||||
self._pred_step = pred_step
|
||||
|
||||
# ── State ────────────────────────────────────────────────────────────
|
||||
self._latest_scan: LaserScan | None = None
|
||||
self._last_track_t: float = time.monotonic()
|
||||
self._scan_processed_stamp: float | None = None
|
||||
|
||||
# ── Subscriptions ────────────────────────────────────────────────────
|
||||
self.create_subscription(LaserScan, '/scan', self._on_scan, _SENSOR_QOS)
|
||||
|
||||
# ── Publishers ───────────────────────────────────────────────────────
|
||||
if _MSGS_AVAILABLE:
|
||||
self._obj_pub = self.create_publisher(
|
||||
MovingObjectArray, '/saltybot/moving_objects', 10
|
||||
)
|
||||
else:
|
||||
self._obj_pub = None
|
||||
self.get_logger().warning(
|
||||
'[dyn_obs] saltybot_dynamic_obs_msgs not built — '
|
||||
'MovingObjectArray will not be published'
|
||||
)
|
||||
|
||||
self._viz_pub = self.create_publisher(
|
||||
MarkerArray, '/saltybot/moving_objects_viz', 10
|
||||
)
|
||||
|
||||
# ── Timer ────────────────────────────────────────────────────────────
|
||||
self.create_timer(1.0 / pred_hz, self._track_tick)
|
||||
|
||||
self.get_logger().info(
|
||||
f'dynamic_obs_tracker ready — '
|
||||
f'max_tracks={max_tracks} horizon={horizon_s}s assoc={assoc_dist}m'
|
||||
)
|
||||
|
||||
# ── Scan callback ─────────────────────────────────────────────────────────
|
||||
|
||||
def _on_scan(self, msg: LaserScan) -> None:
|
||||
self._latest_scan = msg
|
||||
|
||||
# ── 10 Hz tracker tick ────────────────────────────────────────────────────
|
||||
|
||||
def _track_tick(self) -> None:
|
||||
t0 = time.monotonic()
|
||||
|
||||
now_mono = t0
|
||||
dt = now_mono - self._last_track_t
|
||||
dt = max(1e-3, min(dt, 0.5))
|
||||
self._last_track_t = now_mono
|
||||
|
||||
scan = self._latest_scan
|
||||
detections = []
|
||||
|
||||
if scan is not None:
|
||||
stamp_sec = scan.header.stamp.sec + scan.header.stamp.nanosec * 1e-9
|
||||
if stamp_sec != self._scan_processed_stamp:
|
||||
self._scan_processed_stamp = stamp_sec
|
||||
ranges = np.asarray(scan.ranges, dtype=np.float32)
|
||||
detections = self._detector.update(
|
||||
ranges,
|
||||
scan.angle_min,
|
||||
scan.angle_increment,
|
||||
min(scan.range_max, self._max_rng),
|
||||
)
|
||||
|
||||
confirmed = self._tracker.update(detections, dt)
|
||||
|
||||
latency_ms = (time.monotonic() - t0) * 1000.0
|
||||
stamp = self.get_clock().now().to_msg()
|
||||
|
||||
if _MSGS_AVAILABLE and self._obj_pub is not None:
|
||||
self._publish_objects(confirmed, stamp, latency_ms)
|
||||
self._publish_viz(confirmed, stamp)
|
||||
|
||||
# ── Publish helpers ───────────────────────────────────────────────────────
|
||||
|
||||
def _publish_objects(self, confirmed: list, stamp, latency_ms: float) -> None:
|
||||
arr = MovingObjectArray()
|
||||
arr.header.stamp = stamp
|
||||
arr.header.frame_id = self._frame
|
||||
arr.active_count = len(confirmed)
|
||||
arr.tentative_count = self._tracker.tentative_count
|
||||
arr.detector_latency_ms = float(latency_ms)
|
||||
|
||||
for tr in confirmed:
|
||||
px, py = tr.kalman.position
|
||||
vx, vy = tr.kalman.velocity
|
||||
speed = tr.kalman.speed
|
||||
if speed < self._min_spd:
|
||||
continue
|
||||
|
||||
obj = TrackedObject()
|
||||
obj.header = arr.header
|
||||
obj.object_id = tr.track_id
|
||||
obj.pose.pose.position.x = px
|
||||
obj.pose.pose.position.y = py
|
||||
obj.pose.pose.orientation = _yaw_quat(math.atan2(vy, vx))
|
||||
cov = tr.kalman.covariance_2x2
|
||||
obj.pose.covariance[0] = float(cov[0, 0])
|
||||
obj.pose.covariance[1] = float(cov[0, 1])
|
||||
obj.pose.covariance[6] = float(cov[1, 0])
|
||||
obj.pose.covariance[7] = float(cov[1, 1])
|
||||
obj.velocity.x = vx
|
||||
obj.velocity.y = vy
|
||||
obj.speed_mps = speed
|
||||
obj.confidence = min(1.0, tr.hits / (self._tracker._confirm_frames * 3))
|
||||
obj.age_frames = tr.age
|
||||
obj.hits = tr.hits
|
||||
obj.is_valid = True
|
||||
|
||||
# Predicted path
|
||||
for px_f, py_f, t_f in tr.kalman.predict_horizon(
|
||||
self._horizon_s, self._pred_step
|
||||
):
|
||||
p = Pose()
|
||||
p.position.x = px_f
|
||||
p.position.y = py_f
|
||||
p.orientation.w = 1.0
|
||||
obj.predicted_path.append(p)
|
||||
obj.predicted_times.append(float(t_f))
|
||||
|
||||
arr.objects.append(obj)
|
||||
|
||||
self._obj_pub.publish(arr)
|
||||
|
||||
def _publish_viz(self, confirmed: list, stamp) -> None:
|
||||
markers = MarkerArray()
|
||||
|
||||
# Delete old markers
|
||||
del_marker = Marker()
|
||||
del_marker.header.stamp = stamp
|
||||
del_marker.header.frame_id = self._frame
|
||||
del_marker.action = Marker.DELETEALL
|
||||
markers.markers.append(del_marker)
|
||||
|
||||
for tr in confirmed:
|
||||
px, py = tr.kalman.position
|
||||
vx, vy = tr.kalman.velocity
|
||||
speed = tr.kalman.speed
|
||||
if speed < self._min_spd:
|
||||
continue
|
||||
|
||||
# Cylinder at current position
|
||||
m = Marker()
|
||||
m.header.stamp = stamp
|
||||
m.header.frame_id = self._frame
|
||||
m.ns = 'dyn_obs'
|
||||
m.id = tr.track_id
|
||||
m.type = Marker.CYLINDER
|
||||
m.action = Marker.ADD
|
||||
m.pose.position.x = px
|
||||
m.pose.position.y = py
|
||||
m.pose.position.z = 0.5
|
||||
m.pose.orientation.w = 1.0
|
||||
m.scale.x = 0.4
|
||||
m.scale.y = 0.4
|
||||
m.scale.z = 1.0
|
||||
m.color = ColorRGBA(r=1.0, g=0.2, b=0.0, a=0.7)
|
||||
m.lifetime.sec = 1
|
||||
markers.markers.append(m)
|
||||
|
||||
# Arrow for velocity
|
||||
vel_m = Marker()
|
||||
vel_m.header = m.header
|
||||
vel_m.ns = 'dyn_obs_vel'
|
||||
vel_m.id = tr.track_id
|
||||
vel_m.type = Marker.ARROW
|
||||
vel_m.action = Marker.ADD
|
||||
from geometry_msgs.msg import Point as GPoint
|
||||
p_start = GPoint(x=px, y=py, z=1.0)
|
||||
p_end = GPoint(x=px + vx, y=py + vy, z=1.0)
|
||||
vel_m.points = [p_start, p_end]
|
||||
vel_m.scale.x = 0.05
|
||||
vel_m.scale.y = 0.10
|
||||
vel_m.color = ColorRGBA(r=1.0, g=1.0, b=0.0, a=0.9)
|
||||
vel_m.lifetime.sec = 1
|
||||
markers.markers.append(vel_m)
|
||||
|
||||
# Line strip for predicted path
|
||||
path_m = Marker()
|
||||
path_m.header = m.header
|
||||
path_m.ns = 'dyn_obs_path'
|
||||
path_m.id = tr.track_id
|
||||
path_m.type = Marker.LINE_STRIP
|
||||
path_m.action = Marker.ADD
|
||||
path_m.scale.x = 0.04
|
||||
path_m.color = ColorRGBA(r=1.0, g=0.5, b=0.0, a=0.5)
|
||||
path_m.lifetime.sec = 1
|
||||
path_m.points.append(GPoint(x=px, y=py, z=0.5))
|
||||
for fx, fy, _ in tr.kalman.predict_horizon(self._horizon_s, self._pred_step):
|
||||
path_m.points.append(GPoint(x=fx, y=fy, z=0.5))
|
||||
markers.markers.append(path_m)
|
||||
|
||||
self._viz_pub.publish(markers)
|
||||
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = DynamicObsNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@ -0,0 +1,132 @@
|
||||
"""
|
||||
kalman_tracker.py — Single-object Kalman filter for 2-D ground-plane tracking.
|
||||
|
||||
State vector: x = [px, py, vx, vy] (position + velocity)
|
||||
Motion model: constant-velocity (CV) with process noise on acceleration
|
||||
|
||||
Predict step:
|
||||
F = | 1 0 dt 0 | x_{k|k-1} = F @ x_{k-1|k-1}
|
||||
| 0 1 0 dt | P_{k|k-1} = F @ P @ F^T + Q
|
||||
| 0 0 1 0 |
|
||||
| 0 0 0 1 |
|
||||
|
||||
Update step (position observation only):
|
||||
H = | 1 0 0 0 | y = z - H @ x
|
||||
| 0 1 0 0 | S = H @ P @ H^T + R
|
||||
K = P @ H^T @ inv(S)
|
||||
x = x + K @ y
|
||||
P = (I - K @ H) @ P (Joseph form for stability)
|
||||
|
||||
Trajectory prediction: unrolls the CV model forward at fixed time steps.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import List, Tuple
|
||||
|
||||
import numpy as np
|
||||
|
||||
|
||||
# ── Default noise matrices ────────────────────────────────────────────────────
|
||||
# Process noise: models uncertainty in acceleration between frames
|
||||
_Q_BASE = np.diag([0.02, 0.02, 0.8, 0.8]).astype(np.float64)
|
||||
|
||||
# Measurement noise: LIDAR centroid uncertainty (~0.15 m std)
|
||||
_R = np.diag([0.025, 0.025]).astype(np.float64) # 0.16 m sigma each axis
|
||||
|
||||
# Observation matrix
|
||||
_H = np.array([[1, 0, 0, 0],
|
||||
[0, 1, 0, 0]], dtype=np.float64)
|
||||
_I4 = np.eye(4, dtype=np.float64)
|
||||
|
||||
|
||||
class KalmanTracker:
|
||||
"""
|
||||
Kalman filter tracking one object.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
x0, y0 : initial position (metres, odom frame)
|
||||
process_noise : scalar multiplier on _Q_BASE
|
||||
"""
|
||||
|
||||
def __init__(self, x0: float, y0: float, process_noise: float = 1.0):
|
||||
self._x = np.array([x0, y0, 0.0, 0.0], dtype=np.float64)
|
||||
self._P = np.eye(4, dtype=np.float64) * 0.5
|
||||
self._Q = _Q_BASE * process_noise
|
||||
|
||||
# ── Core filter ───────────────────────────────────────────────────────────
|
||||
|
||||
def predict(self, dt: float) -> None:
|
||||
"""Propagate state by dt seconds."""
|
||||
F = np.array([
|
||||
[1, 0, dt, 0],
|
||||
[0, 1, 0, dt],
|
||||
[0, 0, 1, 0],
|
||||
[0, 0, 0, 1],
|
||||
], dtype=np.float64)
|
||||
self._x = F @ self._x
|
||||
self._P = F @ self._P @ F.T + self._Q
|
||||
|
||||
def update(self, z: np.ndarray) -> None:
|
||||
"""
|
||||
Incorporate a position measurement z = [x, y].
|
||||
Uses Joseph-form covariance update for numerical stability.
|
||||
"""
|
||||
y = z.astype(np.float64) - _H @ self._x
|
||||
S = _H @ self._P @ _H.T + _R
|
||||
K = self._P @ _H.T @ np.linalg.inv(S)
|
||||
self._x = self._x + K @ y
|
||||
IKH = _I4 - K @ _H
|
||||
# Joseph form: (I-KH) P (I-KH)^T + K R K^T
|
||||
self._P = IKH @ self._P @ IKH.T + K @ _R @ K.T
|
||||
|
||||
# ── Prediction horizon ────────────────────────────────────────────────────
|
||||
|
||||
def predict_horizon(
|
||||
self,
|
||||
horizon_s: float = 2.5,
|
||||
step_s: float = 0.5,
|
||||
) -> List[Tuple[float, float, float]]:
|
||||
"""
|
||||
Return [(x, y, t), ...] at equally-spaced future times.
|
||||
|
||||
Does NOT modify internal filter state.
|
||||
"""
|
||||
predictions: List[Tuple[float, float, float]] = []
|
||||
state = self._x.copy()
|
||||
t = 0.0
|
||||
F_step = np.array([
|
||||
[1, 0, step_s, 0],
|
||||
[0, 1, 0, step_s],
|
||||
[0, 0, 1, 0],
|
||||
[0, 0, 0, 1],
|
||||
], dtype=np.float64)
|
||||
while t < horizon_s - 1e-6:
|
||||
state = F_step @ state
|
||||
t += step_s
|
||||
predictions.append((float(state[0]), float(state[1]), t))
|
||||
return predictions
|
||||
|
||||
# ── Properties ────────────────────────────────────────────────────────────
|
||||
|
||||
@property
|
||||
def position(self) -> Tuple[float, float]:
|
||||
return float(self._x[0]), float(self._x[1])
|
||||
|
||||
@property
|
||||
def velocity(self) -> Tuple[float, float]:
|
||||
return float(self._x[2]), float(self._x[3])
|
||||
|
||||
@property
|
||||
def speed(self) -> float:
|
||||
return float(np.hypot(self._x[2], self._x[3]))
|
||||
|
||||
@property
|
||||
def covariance_2x2(self) -> np.ndarray:
|
||||
"""Position covariance (top-left 2×2 of P)."""
|
||||
return self._P[:2, :2].copy()
|
||||
|
||||
@property
|
||||
def state(self) -> np.ndarray:
|
||||
return self._x.copy()
|
||||
@ -0,0 +1,168 @@
|
||||
"""
|
||||
object_detector.py — LIDAR-based moving object detector.
|
||||
|
||||
Algorithm:
|
||||
1. Convert each LaserScan to a 2-D occupancy grid (robot-centred, fixed size).
|
||||
2. Maintain a background model via exponential moving average (EMA):
|
||||
bg_t = α * current + (1-α) * bg_{t-1} (only for non-moving cells)
|
||||
3. Foreground = cells whose occupancy significantly exceeds the background.
|
||||
4. Cluster foreground cells with scipy connected-components → Detection list.
|
||||
|
||||
The grid is robot-relative (origin at robot centre) so it naturally tracks
|
||||
the robot's motion without needing TF at this stage. The caller is responsible
|
||||
for transforming detections into a stable frame (odom) before passing to the
|
||||
tracker.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from typing import List, Optional
|
||||
|
||||
import numpy as np
|
||||
from scipy import ndimage
|
||||
|
||||
|
||||
@dataclass
|
||||
class Detection:
|
||||
"""A clustered moving foreground blob from one scan."""
|
||||
x: float # centroid x in sensor frame (m)
|
||||
y: float # centroid y in sensor frame (m)
|
||||
size_m2: float # approximate area of the cluster (m²)
|
||||
range_m: float # distance from robot (m)
|
||||
|
||||
|
||||
class ObjectDetector:
|
||||
"""
|
||||
Detects moving objects in consecutive 2-D LIDAR scans.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
grid_radius_m : half-size of the occupancy grid (grid covers ±radius)
|
||||
resolution : metres per cell
|
||||
bg_alpha : EMA update rate for background (small = slow forgetting)
|
||||
motion_thr : occupancy delta above background to count as moving
|
||||
min_cluster : minimum cells to keep a cluster
|
||||
max_cluster : maximum cells before a cluster is considered static wall
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
grid_radius_m: float = 10.0,
|
||||
resolution: float = 0.10,
|
||||
bg_alpha: float = 0.04,
|
||||
motion_thr: float = 0.45,
|
||||
min_cluster: int = 3,
|
||||
max_cluster: int = 200,
|
||||
):
|
||||
cells = int(2 * grid_radius_m / resolution)
|
||||
self._cells = cells
|
||||
self._res = resolution
|
||||
self._origin = -grid_radius_m # world x/y at grid index 0
|
||||
self._bg_alpha = bg_alpha
|
||||
self._motion_thr = motion_thr
|
||||
self._min_clust = min_cluster
|
||||
self._max_clust = max_cluster
|
||||
|
||||
self._bg = np.zeros((cells, cells), dtype=np.float32)
|
||||
self._initialized = False
|
||||
|
||||
# ── Public API ────────────────────────────────────────────────────────────
|
||||
|
||||
def update(
|
||||
self,
|
||||
ranges: np.ndarray,
|
||||
angle_min: float,
|
||||
angle_increment: float,
|
||||
range_max: float,
|
||||
) -> List[Detection]:
|
||||
"""
|
||||
Process one LaserScan and return detected moving blobs.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
ranges : 1-D array of range readings (metres)
|
||||
angle_min : angle of first beam (radians)
|
||||
angle_increment : angular step between beams (radians)
|
||||
range_max : maximum valid range (metres)
|
||||
"""
|
||||
curr = self._scan_to_grid(ranges, angle_min, angle_increment, range_max)
|
||||
|
||||
if not self._initialized:
|
||||
self._bg = curr.copy()
|
||||
self._initialized = True
|
||||
return []
|
||||
|
||||
# Foreground mask
|
||||
motion_mask = (curr - self._bg) > self._motion_thr
|
||||
|
||||
# Update background only on non-moving cells
|
||||
static = ~motion_mask
|
||||
self._bg[static] = (
|
||||
self._bg[static] * (1.0 - self._bg_alpha)
|
||||
+ curr[static] * self._bg_alpha
|
||||
)
|
||||
|
||||
return self._cluster(motion_mask)
|
||||
|
||||
def reset(self) -> None:
|
||||
self._bg[:] = 0.0
|
||||
self._initialized = False
|
||||
|
||||
# ── Internals ─────────────────────────────────────────────────────────────
|
||||
|
||||
def _scan_to_grid(
|
||||
self,
|
||||
ranges: np.ndarray,
|
||||
angle_min: float,
|
||||
angle_increment: float,
|
||||
range_max: float,
|
||||
) -> np.ndarray:
|
||||
grid = np.zeros((self._cells, self._cells), dtype=np.float32)
|
||||
n = len(ranges)
|
||||
angles = angle_min + np.arange(n) * angle_increment
|
||||
r = np.asarray(ranges, dtype=np.float32)
|
||||
|
||||
valid = (r > 0.05) & (r < range_max) & np.isfinite(r)
|
||||
r, a = r[valid], angles[valid]
|
||||
|
||||
x = r * np.cos(a)
|
||||
y = r * np.sin(a)
|
||||
|
||||
ix = np.clip(
|
||||
((x - self._origin) / self._res).astype(np.int32), 0, self._cells - 1
|
||||
)
|
||||
iy = np.clip(
|
||||
((y - self._origin) / self._res).astype(np.int32), 0, self._cells - 1
|
||||
)
|
||||
grid[iy, ix] = 1.0
|
||||
return grid
|
||||
|
||||
def _cluster(self, mask: np.ndarray) -> List[Detection]:
|
||||
# Dilate slightly to connect nearby hit cells into one blob
|
||||
struct = ndimage.generate_binary_structure(2, 2)
|
||||
dilated = ndimage.binary_dilation(mask, structure=struct, iterations=1)
|
||||
|
||||
labeled, n_labels = ndimage.label(dilated)
|
||||
detections: List[Detection] = []
|
||||
|
||||
for label_id in range(1, n_labels + 1):
|
||||
coords = np.argwhere(labeled == label_id)
|
||||
n_cells = len(coords)
|
||||
if n_cells < self._min_clust or n_cells > self._max_clust:
|
||||
continue
|
||||
|
||||
ys, xs = coords[:, 0], coords[:, 1]
|
||||
cx_grid = float(np.mean(xs))
|
||||
cy_grid = float(np.mean(ys))
|
||||
cx = cx_grid * self._res + self._origin
|
||||
cy = cy_grid * self._res + self._origin
|
||||
|
||||
detections.append(Detection(
|
||||
x=cx,
|
||||
y=cy,
|
||||
size_m2=n_cells * self._res ** 2,
|
||||
range_m=float(np.hypot(cx, cy)),
|
||||
))
|
||||
|
||||
return detections
|
||||
@ -0,0 +1,206 @@
|
||||
"""
|
||||
tracker_manager.py — Multi-object tracker with Hungarian data association.
|
||||
|
||||
Track lifecycle:
|
||||
TENTATIVE → confirmed after `confirm_frames` consecutive hits
|
||||
CONFIRMED → normal tracked state
|
||||
LOST → missed for 1..max_missed frames (still predicts, not published)
|
||||
DEAD → missed > max_missed → removed
|
||||
|
||||
Association:
|
||||
Uses scipy.optimize.linear_sum_assignment (Hungarian algorithm) on a cost
|
||||
matrix of Euclidean distances between predicted track positions and new
|
||||
detections. Assignments with cost > assoc_dist_m are rejected.
|
||||
|
||||
Up to `max_tracks` simultaneous live tracks (tentative + confirmed).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from enum import IntEnum
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
import numpy as np
|
||||
from scipy.optimize import linear_sum_assignment
|
||||
|
||||
from .kalman_tracker import KalmanTracker
|
||||
from .object_detector import Detection
|
||||
|
||||
|
||||
class TrackState(IntEnum):
|
||||
TENTATIVE = 0
|
||||
CONFIRMED = 1
|
||||
LOST = 2
|
||||
|
||||
|
||||
@dataclass
|
||||
class Track:
|
||||
track_id: int
|
||||
kalman: KalmanTracker
|
||||
state: TrackState = TrackState.TENTATIVE
|
||||
hits: int = 1
|
||||
age: int = 1 # frames since creation
|
||||
missed: int = 0 # consecutive missed frames
|
||||
|
||||
|
||||
class TrackerManager:
|
||||
"""
|
||||
Manages a pool of Kalman tracks.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
max_tracks : hard cap on simultaneously alive tracks
|
||||
confirm_frames : hits needed before a track is CONFIRMED
|
||||
max_missed : consecutive missed frames before a track is DEAD
|
||||
assoc_dist_m : max allowed distance (m) for a valid assignment
|
||||
horizon_s : prediction horizon for trajectory output (seconds)
|
||||
pred_step_s : time step between predicted waypoints
|
||||
process_noise : KalmanTracker process-noise multiplier
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
max_tracks: int = 20,
|
||||
confirm_frames: int = 3,
|
||||
max_missed: int = 6,
|
||||
assoc_dist_m: float = 1.5,
|
||||
horizon_s: float = 2.5,
|
||||
pred_step_s: float = 0.5,
|
||||
process_noise: float = 1.0,
|
||||
):
|
||||
self._max_tracks = max_tracks
|
||||
self._confirm_frames = confirm_frames
|
||||
self._max_missed = max_missed
|
||||
self._assoc_dist = assoc_dist_m
|
||||
self._horizon_s = horizon_s
|
||||
self._pred_step = pred_step_s
|
||||
self._proc_noise = process_noise
|
||||
|
||||
self._tracks: Dict[int, Track] = {}
|
||||
self._next_id: int = 1
|
||||
|
||||
# ── Public API ────────────────────────────────────────────────────────────
|
||||
|
||||
def update(self, detections: List[Detection], dt: float) -> List[Track]:
|
||||
"""
|
||||
Process one frame of detections.
|
||||
|
||||
1. Predict all tracks by dt.
|
||||
2. Hungarian assignment of predictions → detections.
|
||||
3. Update matched tracks; mark unmatched tracks as LOST.
|
||||
4. Promote tracks crossing `confirm_frames`.
|
||||
5. Create new tracks for unmatched detections (if room).
|
||||
6. Remove DEAD tracks.
|
||||
|
||||
Returns confirmed tracks only.
|
||||
"""
|
||||
# 1. Predict
|
||||
for tr in self._tracks.values():
|
||||
tr.kalman.predict(dt)
|
||||
tr.age += 1
|
||||
|
||||
# 2. Assign
|
||||
matched, unmatched_tracks, unmatched_dets = self._assign(detections)
|
||||
|
||||
# 3a. Update matched
|
||||
for tid, did in matched:
|
||||
tr = self._tracks[tid]
|
||||
det = detections[did]
|
||||
tr.kalman.update(np.array([det.x, det.y]))
|
||||
tr.hits += 1
|
||||
tr.missed = 0
|
||||
if tr.state == TrackState.LOST:
|
||||
tr.state = TrackState.CONFIRMED
|
||||
elif tr.state == TrackState.TENTATIVE and tr.hits >= self._confirm_frames:
|
||||
tr.state = TrackState.CONFIRMED
|
||||
|
||||
# 3b. Unmatched tracks
|
||||
for tid in unmatched_tracks:
|
||||
tr = self._tracks[tid]
|
||||
tr.missed += 1
|
||||
if tr.missed > 1:
|
||||
tr.state = TrackState.LOST
|
||||
|
||||
# 4. New tracks for unmatched detections
|
||||
live = sum(1 for t in self._tracks.values() if t.state != TrackState.LOST
|
||||
or t.missed <= self._max_missed)
|
||||
for did in unmatched_dets:
|
||||
if live >= self._max_tracks:
|
||||
break
|
||||
det = detections[did]
|
||||
init_state = (TrackState.CONFIRMED
|
||||
if self._confirm_frames <= 1
|
||||
else TrackState.TENTATIVE)
|
||||
new_tr = Track(
|
||||
track_id=self._next_id,
|
||||
kalman=KalmanTracker(det.x, det.y, self._proc_noise),
|
||||
state=init_state,
|
||||
)
|
||||
self._tracks[self._next_id] = new_tr
|
||||
self._next_id += 1
|
||||
live += 1
|
||||
|
||||
# 5. Prune dead
|
||||
dead = [tid for tid, t in self._tracks.items() if t.missed > self._max_missed]
|
||||
for tid in dead:
|
||||
del self._tracks[tid]
|
||||
|
||||
return [t for t in self._tracks.values() if t.state == TrackState.CONFIRMED]
|
||||
|
||||
@property
|
||||
def all_tracks(self) -> List[Track]:
|
||||
return list(self._tracks.values())
|
||||
|
||||
@property
|
||||
def tentative_count(self) -> int:
|
||||
return sum(1 for t in self._tracks.values()
|
||||
if t.state == TrackState.TENTATIVE)
|
||||
|
||||
def reset(self) -> None:
|
||||
self._tracks.clear()
|
||||
self._next_id = 1
|
||||
|
||||
# ── Hungarian assignment ──────────────────────────────────────────────────
|
||||
|
||||
def _assign(
|
||||
self,
|
||||
detections: List[Detection],
|
||||
) -> Tuple[List[Tuple[int, int]], List[int], List[int]]:
|
||||
"""
|
||||
Returns:
|
||||
matched — list of (track_id, det_index)
|
||||
unmatched_tids — track IDs with no detection assigned
|
||||
unmatched_dids — detection indices with no track assigned
|
||||
"""
|
||||
track_ids = list(self._tracks.keys())
|
||||
|
||||
if not track_ids or not detections:
|
||||
return [], track_ids, list(range(len(detections)))
|
||||
|
||||
# Build cost matrix: rows=tracks, cols=detections
|
||||
cost = np.full((len(track_ids), len(detections)), fill_value=np.inf)
|
||||
for r, tid in enumerate(track_ids):
|
||||
tx, ty = self._tracks[tid].kalman.position
|
||||
for c, det in enumerate(detections):
|
||||
cost[r, c] = np.hypot(tx - det.x, ty - det.y)
|
||||
|
||||
row_ind, col_ind = linear_sum_assignment(cost)
|
||||
|
||||
matched: List[Tuple[int, int]] = []
|
||||
matched_track_idx: set = set()
|
||||
matched_det_idx: set = set()
|
||||
|
||||
for r, c in zip(row_ind, col_ind):
|
||||
if cost[r, c] > self._assoc_dist:
|
||||
continue
|
||||
matched.append((track_ids[r], c))
|
||||
matched_track_idx.add(r)
|
||||
matched_det_idx.add(c)
|
||||
|
||||
unmatched_tids = [track_ids[r] for r in range(len(track_ids))
|
||||
if r not in matched_track_idx]
|
||||
unmatched_dids = [c for c in range(len(detections))
|
||||
if c not in matched_det_idx]
|
||||
|
||||
return matched, unmatched_tids, unmatched_dids
|
||||
4
jetson/ros2_ws/src/saltybot_dynamic_obstacles/setup.cfg
Normal file
4
jetson/ros2_ws/src/saltybot_dynamic_obstacles/setup.cfg
Normal file
@ -0,0 +1,4 @@
|
||||
[develop]
|
||||
script_dir=$base/lib/saltybot_dynamic_obstacles
|
||||
[install]
|
||||
install_scripts=$base/lib/saltybot_dynamic_obstacles
|
||||
32
jetson/ros2_ws/src/saltybot_dynamic_obstacles/setup.py
Normal file
32
jetson/ros2_ws/src/saltybot_dynamic_obstacles/setup.py
Normal file
@ -0,0 +1,32 @@
|
||||
from setuptools import setup, find_packages
|
||||
from glob import glob
|
||||
|
||||
package_name = 'saltybot_dynamic_obstacles'
|
||||
|
||||
setup(
|
||||
name=package_name,
|
||||
version='0.1.0',
|
||||
packages=find_packages(exclude=['test']),
|
||||
data_files=[
|
||||
('share/ament_index/resource_index/packages',
|
||||
['resource/' + package_name]),
|
||||
('share/' + package_name, ['package.xml']),
|
||||
('share/' + package_name + '/launch',
|
||||
glob('launch/*.launch.py')),
|
||||
('share/' + package_name + '/config',
|
||||
glob('config/*.yaml')),
|
||||
],
|
||||
install_requires=['setuptools'],
|
||||
zip_safe=True,
|
||||
maintainer='SaltyLab',
|
||||
maintainer_email='robot@saltylab.local',
|
||||
description='Dynamic obstacle tracking: LIDAR motion detection, Kalman tracking, Nav2 costmap',
|
||||
license='MIT',
|
||||
tests_require=['pytest'],
|
||||
entry_points={
|
||||
'console_scripts': [
|
||||
'dynamic_obs_tracker = saltybot_dynamic_obstacles.dynamic_obs_node:main',
|
||||
'dynamic_obs_costmap = saltybot_dynamic_obstacles.costmap_layer_node:main',
|
||||
],
|
||||
},
|
||||
)
|
||||
@ -0,0 +1,262 @@
|
||||
"""
|
||||
test_dynamic_obstacles.py — Unit tests for KalmanTracker, TrackerManager,
|
||||
and ObjectDetector.
|
||||
|
||||
Runs without ROS2 / hardware (no rclpy imports).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
import sys
|
||||
import os
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
||||
|
||||
from saltybot_dynamic_obstacles.kalman_tracker import KalmanTracker
|
||||
from saltybot_dynamic_obstacles.tracker_manager import TrackerManager, TrackState
|
||||
from saltybot_dynamic_obstacles.object_detector import ObjectDetector, Detection
|
||||
|
||||
|
||||
# ── KalmanTracker ─────────────────────────────────────────────────────────────
|
||||
|
||||
class TestKalmanTracker:
|
||||
|
||||
def test_initial_position(self):
|
||||
kt = KalmanTracker(3.0, 4.0)
|
||||
px, py = kt.position
|
||||
assert px == pytest.approx(3.0)
|
||||
assert py == pytest.approx(4.0)
|
||||
|
||||
def test_initial_velocity_zero(self):
|
||||
kt = KalmanTracker(0.0, 0.0)
|
||||
vx, vy = kt.velocity
|
||||
assert vx == pytest.approx(0.0)
|
||||
assert vy == pytest.approx(0.0)
|
||||
|
||||
def test_predict_moves_position(self):
|
||||
kt = KalmanTracker(0.0, 0.0)
|
||||
# Give it some velocity via update sequence
|
||||
kt.update(np.array([0.1, 0.0]))
|
||||
kt.update(np.array([0.2, 0.0]))
|
||||
kt.predict(0.1)
|
||||
px, _ = kt.position
|
||||
assert px > 0.0 # should have moved forward
|
||||
|
||||
def test_pure_predict_constant_velocity(self):
|
||||
"""After velocity is established, predict() should move linearly."""
|
||||
kt = KalmanTracker(0.0, 0.0)
|
||||
# Force velocity by repeated updates
|
||||
for i in range(10):
|
||||
kt.update(np.array([i * 0.1, 0.0]))
|
||||
kt.predict(0.1)
|
||||
vx, _ = kt.velocity
|
||||
px0, _ = kt.position
|
||||
kt.predict(1.0)
|
||||
px1, _ = kt.position
|
||||
# Should advance roughly vx * 1.0 metres
|
||||
assert px1 == pytest.approx(px0 + vx * 1.0, abs=0.3)
|
||||
|
||||
def test_update_corrects_position(self):
|
||||
kt = KalmanTracker(0.0, 0.0)
|
||||
# Predict way off
|
||||
kt.predict(10.0)
|
||||
# Then update to ground truth
|
||||
kt.update(np.array([1.0, 2.0]))
|
||||
px, py = kt.position
|
||||
# Should move toward (1, 2)
|
||||
assert px == pytest.approx(1.0, abs=0.5)
|
||||
assert py == pytest.approx(2.0, abs=0.5)
|
||||
|
||||
def test_predict_horizon_length(self):
|
||||
kt = KalmanTracker(0.0, 0.0)
|
||||
preds = kt.predict_horizon(horizon_s=2.5, step_s=0.5)
|
||||
assert len(preds) == 5 # 0.5, 1.0, 1.5, 2.0, 2.5
|
||||
|
||||
def test_predict_horizon_times(self):
|
||||
kt = KalmanTracker(0.0, 0.0)
|
||||
preds = kt.predict_horizon(horizon_s=2.0, step_s=0.5)
|
||||
times = [t for _, _, t in preds]
|
||||
assert times == pytest.approx([0.5, 1.0, 1.5, 2.0], abs=0.01)
|
||||
|
||||
def test_predict_horizon_does_not_mutate_state(self):
|
||||
kt = KalmanTracker(1.0, 2.0)
|
||||
kt.predict_horizon(horizon_s=3.0, step_s=0.5)
|
||||
px, py = kt.position
|
||||
assert px == pytest.approx(1.0)
|
||||
assert py == pytest.approx(2.0)
|
||||
|
||||
def test_speed_zero_at_init(self):
|
||||
kt = KalmanTracker(5.0, 5.0)
|
||||
assert kt.speed == pytest.approx(0.0)
|
||||
|
||||
def test_covariance_shape(self):
|
||||
kt = KalmanTracker(0.0, 0.0)
|
||||
cov = kt.covariance_2x2
|
||||
assert cov.shape == (2, 2)
|
||||
|
||||
def test_covariance_positive_definite(self):
|
||||
kt = KalmanTracker(0.0, 0.0)
|
||||
for _ in range(5):
|
||||
kt.predict(0.1)
|
||||
kt.update(np.array([0.1, 0.0]))
|
||||
eigvals = np.linalg.eigvalsh(kt.covariance_2x2)
|
||||
assert np.all(eigvals > 0)
|
||||
|
||||
def test_joseph_form_stays_symmetric(self):
|
||||
"""Covariance should remain symmetric after many updates."""
|
||||
kt = KalmanTracker(0.0, 0.0)
|
||||
for i in range(50):
|
||||
kt.predict(0.1)
|
||||
kt.update(np.array([i * 0.01, 0.0]))
|
||||
P = kt._P
|
||||
assert np.allclose(P, P.T, atol=1e-10)
|
||||
|
||||
|
||||
# ── TrackerManager ────────────────────────────────────────────────────────────
|
||||
|
||||
class TestTrackerManager:
|
||||
|
||||
def _det(self, x, y):
|
||||
return Detection(x=x, y=y, size_m2=0.1, range_m=math.hypot(x, y))
|
||||
|
||||
def test_empty_detections_no_tracks(self):
|
||||
tm = TrackerManager()
|
||||
confirmed = tm.update([], 0.1)
|
||||
assert confirmed == []
|
||||
|
||||
def test_track_created_on_detection(self):
|
||||
tm = TrackerManager(confirm_frames=1)
|
||||
confirmed = tm.update([self._det(1.0, 0.0)], 0.1)
|
||||
assert len(confirmed) == 1
|
||||
|
||||
def test_track_tentative_before_confirm(self):
|
||||
tm = TrackerManager(confirm_frames=3)
|
||||
tm.update([self._det(1.0, 0.0)], 0.1)
|
||||
# Only 1 hit — should still be tentative
|
||||
assert tm.tentative_count == 1
|
||||
|
||||
def test_track_confirmed_after_N_hits(self):
|
||||
tm = TrackerManager(confirm_frames=3, assoc_dist_m=2.0)
|
||||
for _ in range(4):
|
||||
confirmed = tm.update([self._det(1.0, 0.0)], 0.1)
|
||||
assert len(confirmed) == 1
|
||||
|
||||
def test_track_deleted_after_max_missed(self):
|
||||
tm = TrackerManager(confirm_frames=1, max_missed=3, assoc_dist_m=2.0)
|
||||
tm.update([self._det(1.0, 0.0)], 0.1) # create
|
||||
for _ in range(5):
|
||||
tm.update([], 0.1) # no detections → missed++
|
||||
assert len(tm.all_tracks) == 0
|
||||
|
||||
def test_max_tracks_cap(self):
|
||||
tm = TrackerManager(max_tracks=5, confirm_frames=1)
|
||||
dets = [self._det(float(i), 0.0) for i in range(10)]
|
||||
tm.update(dets, 0.1)
|
||||
assert len(tm.all_tracks) <= 5
|
||||
|
||||
def test_consistent_track_id(self):
|
||||
tm = TrackerManager(confirm_frames=3, assoc_dist_m=1.5)
|
||||
for i in range(5):
|
||||
confirmed = tm.update([self._det(1.0 + i * 0.01, 0.0)], 0.1)
|
||||
assert len(confirmed) == 1
|
||||
track_id = confirmed[0].track_id
|
||||
# One more tick — ID should be stable
|
||||
confirmed2 = tm.update([self._det(1.06, 0.0)], 0.1)
|
||||
assert confirmed2[0].track_id == track_id
|
||||
|
||||
def test_two_independent_tracks(self):
|
||||
tm = TrackerManager(confirm_frames=3, assoc_dist_m=0.8)
|
||||
for _ in range(5):
|
||||
confirmed = tm.update([self._det(1.0, 0.0), self._det(5.0, 0.0)], 0.1)
|
||||
assert len(confirmed) == 2
|
||||
|
||||
def test_reset_clears_all(self):
|
||||
tm = TrackerManager(confirm_frames=1)
|
||||
tm.update([self._det(1.0, 0.0)], 0.1)
|
||||
tm.reset()
|
||||
assert len(tm.all_tracks) == 0
|
||||
|
||||
def test_far_detection_not_assigned(self):
|
||||
tm = TrackerManager(confirm_frames=1, assoc_dist_m=0.5)
|
||||
tm.update([self._det(1.0, 0.0)], 0.1) # create track at (1,0)
|
||||
# Detection 3 m away → new track, not update
|
||||
tm.update([self._det(4.0, 0.0)], 0.1)
|
||||
assert len(tm.all_tracks) == 2
|
||||
|
||||
|
||||
# ── ObjectDetector ────────────────────────────────────────────────────────────
|
||||
|
||||
class TestObjectDetector:
|
||||
|
||||
def _empty_scan(self, n=360, rmax=8.0) -> tuple:
|
||||
"""All readings at max range (static background)."""
|
||||
ranges = np.full(n, rmax - 0.1, dtype=np.float32)
|
||||
return ranges, -math.pi, 2 * math.pi / n, rmax
|
||||
|
||||
def _scan_with_blob(self, blob_r=2.0, blob_theta=0.0, n=360, rmax=8.0) -> tuple:
|
||||
"""Background scan + a short-range cluster at blob_theta."""
|
||||
ranges = np.full(n, rmax - 0.1, dtype=np.float32)
|
||||
angle_inc = 2 * math.pi / n
|
||||
angle_min = -math.pi
|
||||
# Put a cluster of ~10 beams at blob_r
|
||||
center_idx = int((blob_theta - angle_min) / angle_inc) % n
|
||||
for di in range(-5, 6):
|
||||
idx = (center_idx + di) % n
|
||||
ranges[idx] = blob_r
|
||||
return ranges, angle_min, angle_inc, rmax
|
||||
|
||||
def test_empty_scan_no_detections_after_warmup(self):
|
||||
od = ObjectDetector()
|
||||
r, a_min, a_inc, rmax = self._empty_scan()
|
||||
od.update(r, a_min, a_inc, rmax) # init background
|
||||
for _ in range(3):
|
||||
dets = od.update(r, a_min, a_inc, rmax)
|
||||
assert len(dets) == 0
|
||||
|
||||
def test_moving_blob_detected(self):
|
||||
od = ObjectDetector()
|
||||
bg_r, a_min, a_inc, rmax = self._empty_scan()
|
||||
od.update(bg_r, a_min, a_inc, rmax) # seed background
|
||||
for _ in range(5):
|
||||
od.update(bg_r, a_min, a_inc, rmax)
|
||||
# Now inject a foreground blob
|
||||
fg_r, _, _, _ = self._scan_with_blob(blob_r=2.0, blob_theta=0.0)
|
||||
dets = od.update(fg_r, a_min, a_inc, rmax)
|
||||
assert len(dets) >= 1
|
||||
|
||||
def test_detection_centroid_approximate(self):
|
||||
od = ObjectDetector()
|
||||
bg_r, a_min, a_inc, rmax = self._empty_scan()
|
||||
for _ in range(8):
|
||||
od.update(bg_r, a_min, a_inc, rmax)
|
||||
fg_r, _, _, _ = self._scan_with_blob(blob_r=3.0, blob_theta=0.0)
|
||||
dets = od.update(fg_r, a_min, a_inc, rmax)
|
||||
assert len(dets) >= 1
|
||||
# Blob is at ~3 m along x-axis (theta=0)
|
||||
cx = dets[0].x
|
||||
cy = dets[0].y
|
||||
assert abs(cx - 3.0) < 0.5
|
||||
assert abs(cy) < 0.5
|
||||
|
||||
def test_reset_clears_background(self):
|
||||
od = ObjectDetector()
|
||||
bg_r, a_min, a_inc, rmax = self._empty_scan()
|
||||
for _ in range(5):
|
||||
od.update(bg_r, a_min, a_inc, rmax)
|
||||
od.reset()
|
||||
assert not od._initialized
|
||||
|
||||
def test_no_inf_nan_ranges(self):
|
||||
od = ObjectDetector()
|
||||
r = np.array([np.inf, np.nan, 5.0, -1.0, 0.0] * 72, dtype=np.float32)
|
||||
a_min = -math.pi
|
||||
a_inc = 2 * math.pi / len(r)
|
||||
od.update(r, a_min, a_inc, 8.0) # should not raise
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
pytest.main([__file__, '-v'])
|
||||
Loading…
x
Reference in New Issue
Block a user