feat(jetson): dynamic obstacle tracking — LIDAR motion detection, Kalman tracking, trajectory prediction, Nav2 costmap (#176) #181

Merged
sl-jetson merged 1 commits from sl-perception/issue-176-dynamic-obstacles into main 2026-03-02 10:45:23 -05:00
17 changed files with 1529 additions and 0 deletions

View File

@ -0,0 +1,16 @@
cmake_minimum_required(VERSION 3.8)
project(saltybot_dynamic_obs_msgs)
find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)
find_package(std_msgs REQUIRED)
find_package(geometry_msgs REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/TrackedObject.msg"
"msg/MovingObjectArray.msg"
DEPENDENCIES std_msgs geometry_msgs
)
ament_export_dependencies(rosidl_default_runtime)
ament_package()

View File

@ -0,0 +1,12 @@
# MovingObjectArray — all currently tracked moving obstacles.
#
# Published at ~10 Hz on /saltybot/moving_objects.
# Only confirmed tracks (hits >= confirm_frames) appear here.
std_msgs/Header header
saltybot_dynamic_obs_msgs/TrackedObject[] objects
uint32 active_count # number of confirmed tracks
uint32 tentative_count # tracks not yet confirmed
float32 detector_latency_ms # pipeline latency hint

View File

@ -0,0 +1,21 @@
# TrackedObject — a single tracked moving obstacle.
#
# predicted_path[i] is the estimated pose at predicted_times[i] seconds from now.
# All poses are in the same frame as header.frame_id (typically 'odom').
std_msgs/Header header
uint32 object_id # stable ID across frames (monotonically increasing)
geometry_msgs/PoseWithCovariance pose # current best-estimate pose (x, y, yaw)
geometry_msgs/Vector3 velocity # vx, vy in m/s (vz = 0 for ground objects)
geometry_msgs/Pose[] predicted_path # future positions at predicted_times
float32[] predicted_times # seconds from header.stamp for each pose
float32 speed_mps # scalar |v|
float32 confidence # 0.01.0 (higher after more confirmed frames)
uint32 age_frames # frames since first detection
uint32 hits # number of successful associations
bool is_valid # false if in tentative / just-created state

View File

@ -0,0 +1,23 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_dynamic_obs_msgs</name>
<version>0.1.0</version>
<description>Custom message types for dynamic obstacle tracking.</description>
<maintainer email="robot@saltylab.local">SaltyLab</maintainer>
<license>MIT</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<buildtool_depend>rosidl_default_generators</buildtool_depend>
<depend>std_msgs</depend>
<depend>geometry_msgs</depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>

View File

@ -0,0 +1,52 @@
# saltybot_dynamic_obstacles — runtime parameters
#
# Requires:
# /scan (sensor_msgs/LaserScan) — RPLIDAR A1M8 at ~5.5 Hz
#
# LIDAR scan is published by rplidar_ros node.
# Make sure RPLIDAR is running before starting this stack.
dynamic_obs_tracker:
ros__parameters:
max_tracks: 20 # max simultaneous tracked objects
confirm_frames: 3 # hits before a track is published
max_missed_frames: 6 # missed frames before track deletion
assoc_dist_m: 1.5 # max assignment distance (Hungarian)
prediction_hz: 10.0 # tracker update + publish rate
horizon_s: 2.5 # prediction look-ahead
pred_step_s: 0.5 # time between predicted waypoints
odom_frame: 'odom'
min_speed_mps: 0.05 # suppress near-stationary tracks
max_range_m: 8.0 # ignore detections beyond this
dynamic_obs_costmap:
ros__parameters:
inflation_radius_m: 0.35 # safety bubble around each predicted point
ring_points: 8 # polygon points for inflation circle
clear_on_empty: true # push empty cloud to clear stale Nav2 markings
# ── Nav2 costmap integration ───────────────────────────────────────────────────
# In your nav2_params.yaml, under local_costmap or global_costmap > plugins, add
# an ObstacleLayer with:
#
# obstacle_layer:
# plugin: "nav2_costmap_2d::ObstacleLayer"
# enabled: true
# observation_sources: static_scan dynamic_obs
# static_scan:
# topic: /scan
# data_type: LaserScan
# ...
# dynamic_obs:
# topic: /saltybot/dynamic_obs_cloud
# data_type: PointCloud2
# sensor_frame: odom
# obstacle_max_range: 10.0
# raytrace_max_range: 10.0
# marking: true
# clearing: false
#
# This feeds the predicted trajectory smear directly into Nav2's obstacle
# inflation, forcing the planner to route around the predicted future path
# of every tracked moving object.

View File

@ -0,0 +1,75 @@
"""
dynamic_obstacles.launch.py Dynamic obstacle tracking + Nav2 costmap layer.
Starts:
dynamic_obs_tracker LIDAR motion detection + Kalman tracking @10 Hz
dynamic_obs_costmap Predicted-trajectory PointCloud2 for Nav2
Launch args:
max_tracks int '20'
assoc_dist_m float '1.5'
horizon_s float '2.5'
inflation_radius_m float '0.35'
Verify:
ros2 topic hz /saltybot/moving_objects # ~10 Hz
ros2 topic echo /saltybot/moving_objects # TrackedObject list
ros2 topic hz /saltybot/dynamic_obs_cloud # ~10 Hz (when objects present)
rviz2 add MarkerArray /saltybot/moving_objects_viz
Nav2 costmap integration:
In your costmap_params.yaml ObstacleLayer observation_sources, add:
dynamic_obs:
topic: /saltybot/dynamic_obs_cloud
data_type: PointCloud2
marking: true
clearing: false
"""
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
args = [
DeclareLaunchArgument('max_tracks', default_value='20'),
DeclareLaunchArgument('assoc_dist_m', default_value='1.5'),
DeclareLaunchArgument('horizon_s', default_value='2.5'),
DeclareLaunchArgument('inflation_radius_m', default_value='0.35'),
DeclareLaunchArgument('min_speed_mps', default_value='0.05'),
]
tracker = Node(
package='saltybot_dynamic_obstacles',
executable='dynamic_obs_tracker',
name='dynamic_obs_tracker',
output='screen',
parameters=[{
'max_tracks': LaunchConfiguration('max_tracks'),
'assoc_dist_m': LaunchConfiguration('assoc_dist_m'),
'horizon_s': LaunchConfiguration('horizon_s'),
'min_speed_mps': LaunchConfiguration('min_speed_mps'),
'prediction_hz': 10.0,
'confirm_frames': 3,
'max_missed_frames': 6,
'pred_step_s': 0.5,
'odom_frame': 'odom',
'max_range_m': 8.0,
}],
)
costmap = Node(
package='saltybot_dynamic_obstacles',
executable='dynamic_obs_costmap',
name='dynamic_obs_costmap',
output='screen',
parameters=[{
'inflation_radius_m': LaunchConfiguration('inflation_radius_m'),
'ring_points': 8,
'clear_on_empty': True,
}],
)
return LaunchDescription(args + [tracker, costmap])

View File

@ -0,0 +1,29 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_dynamic_obstacles</name>
<version>0.1.0</version>
<description>
Dynamic obstacle detection, multi-object Kalman tracking, trajectory
prediction, and Nav2 costmap layer integration for SaltyBot.
</description>
<maintainer email="robot@saltylab.local">SaltyLab</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>sensor_msgs</depend>
<depend>geometry_msgs</depend>
<depend>nav_msgs</depend>
<depend>visualization_msgs</depend>
<depend>saltybot_dynamic_obs_msgs</depend>
<exec_depend>python3-numpy</exec_depend>
<exec_depend>python3-scipy</exec_depend>
<test_depend>pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,178 @@
"""
costmap_layer_node.py Nav2 costmap integration for dynamic obstacles.
Converts predicted trajectories from /saltybot/moving_objects into a
PointCloud2 fed into Nav2's ObstacleLayer. Each predicted future position
is added as a point, creating a "smeared" dynamic obstacle zone that
covers the full 2-3 s prediction horizon.
Nav2 ObstacleLayer config (in costmap_params.yaml):
obstacle_layer:
enabled: true
observation_sources: dynamic_obs
dynamic_obs:
topic: /saltybot/dynamic_obs_cloud
sensor_frame: odom
data_type: PointCloud2
obstacle_max_range: 12.0
obstacle_min_range: 0.0
raytrace_max_range: 12.0
marking: true
clearing: false # let the tracker handle clearing
The node also clears old obstacle points when tracks are dropped, by
publishing a clearing cloud to /saltybot/dynamic_obs_clear.
Subscribes:
/saltybot/moving_objects saltybot_dynamic_obs_msgs/MovingObjectArray
Publishes:
/saltybot/dynamic_obs_cloud sensor_msgs/PointCloud2 marking cloud
/saltybot/dynamic_obs_clear sensor_msgs/PointCloud2 clearing cloud
Parameters:
inflation_radius_m float 0.35 (each predicted point inflated by this radius)
ring_points int 8 (polygon approximation of inflation circle)
clear_on_empty bool true (publish clear cloud when no objects tracked)
"""
from __future__ import annotations
import math
import struct
from typing import List
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from sensor_msgs.msg import PointCloud2, PointField
from std_msgs.msg import Header
try:
from saltybot_dynamic_obs_msgs.msg import MovingObjectArray
_MSGS_AVAILABLE = True
except ImportError:
_MSGS_AVAILABLE = False
_RELIABLE_QOS = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST,
depth=10,
)
def _make_pc2(header: Header, points_xyz: List[tuple]) -> PointCloud2:
"""Pack a list of (x, y, z) into a PointCloud2 message."""
fields = [
PointField(name='x', offset=0, datatype=PointField.FLOAT32, count=1),
PointField(name='y', offset=4, datatype=PointField.FLOAT32, count=1),
PointField(name='z', offset=8, datatype=PointField.FLOAT32, count=1),
]
point_step = 12 # 3 × float32
data = bytearray(len(points_xyz) * point_step)
for i, (x, y, z) in enumerate(points_xyz):
struct.pack_into('<fff', data, i * point_step, x, y, z)
pc = PointCloud2()
pc.header = header
pc.height = 1
pc.width = len(points_xyz)
pc.fields = fields
pc.is_bigendian = False
pc.point_step = point_step
pc.row_step = point_step * len(points_xyz)
pc.data = bytes(data)
pc.is_dense = True
return pc
class CostmapLayerNode(Node):
def __init__(self):
super().__init__('dynamic_obs_costmap')
self.declare_parameter('inflation_radius_m', 0.35)
self.declare_parameter('ring_points', 8)
self.declare_parameter('clear_on_empty', True)
self._infl_r = self.get_parameter('inflation_radius_m').value
self._ring_n = self.get_parameter('ring_points').value
self._clear_empty = self.get_parameter('clear_on_empty').value
# Pre-compute ring offsets for inflation
self._ring_offsets = [
(self._infl_r * math.cos(2 * math.pi * i / self._ring_n),
self._infl_r * math.sin(2 * math.pi * i / self._ring_n))
for i in range(self._ring_n)
]
if _MSGS_AVAILABLE:
self.create_subscription(
MovingObjectArray,
'/saltybot/moving_objects',
self._on_objects,
_RELIABLE_QOS,
)
else:
self.get_logger().warning(
'[costmap_layer] saltybot_dynamic_obs_msgs not built — '
'will not subscribe to MovingObjectArray'
)
self._mark_pub = self.create_publisher(
PointCloud2, '/saltybot/dynamic_obs_cloud', 10
)
self._clear_pub = self.create_publisher(
PointCloud2, '/saltybot/dynamic_obs_clear', 10
)
self.get_logger().info(
f'dynamic_obs_costmap ready — '
f'inflation={self._infl_r}m ring_pts={self._ring_n}'
)
# ── Callback ──────────────────────────────────────────────────────────────
def _on_objects(self, msg: 'MovingObjectArray') -> None:
hdr = msg.header
mark_pts: List[tuple] = []
for obj in msg.objects:
if not obj.is_valid:
continue
# Current position
self._add_inflated(obj.pose.pose.position.x,
obj.pose.pose.position.y, mark_pts)
# Predicted future positions
for pose in obj.predicted_path:
self._add_inflated(pose.position.x, pose.position.y, mark_pts)
if mark_pts:
self._mark_pub.publish(_make_pc2(hdr, mark_pts))
elif self._clear_empty:
# Publish tiny clear cloud so Nav2 clears stale markings
self._clear_pub.publish(_make_pc2(hdr, []))
def _add_inflated(self, cx: float, cy: float, pts: List[tuple]) -> None:
"""Add the centre + ring of inflation points at height 0.5 m."""
pts.append((cx, cy, 0.5))
for ox, oy in self._ring_offsets:
pts.append((cx + ox, cy + oy, 0.5))
def main(args=None):
rclpy.init(args=args)
node = CostmapLayerNode()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,319 @@
"""
dynamic_obs_node.py ROS2 node: LIDAR moving-object detection + Kalman tracking.
Pipeline:
1. Subscribe /scan (RPLIDAR LaserScan, ~5.5 Hz).
2. ObjectDetector performs background subtraction moving blobs.
3. TrackerManager runs Hungarian assignment + Kalman predict/update at 10 Hz.
4. Publish /saltybot/moving_objects (MovingObjectArray).
5. Publish /saltybot/moving_objects_viz (MarkerArray) for RViz.
The 10 Hz timer drives the tracker regardless of scan rate, so prediction
continues between scans (pure-predict steps).
Subscribes:
/scan sensor_msgs/LaserScan RPLIDAR A1M8
Publishes:
/saltybot/moving_objects saltybot_dynamic_obs_msgs/MovingObjectArray
/saltybot/moving_objects_viz visualization_msgs/MarkerArray
Parameters:
max_tracks int 20
confirm_frames int 3
max_missed_frames int 6
assoc_dist_m float 1.5
prediction_hz float 10.0 (tracker + publish rate)
horizon_s float 2.5
pred_step_s float 0.5
odom_frame str 'odom'
min_speed_mps float 0.05 (suppress near-zero velocity tracks)
max_range_m float 8.0
"""
import time
import math
import numpy as np
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from sensor_msgs.msg import LaserScan
from geometry_msgs.msg import Pose, Point, Quaternion, Vector3
from std_msgs.msg import Header, ColorRGBA
from visualization_msgs.msg import Marker, MarkerArray
try:
from saltybot_dynamic_obs_msgs.msg import TrackedObject, MovingObjectArray
_MSGS_AVAILABLE = True
except ImportError:
_MSGS_AVAILABLE = False
from .object_detector import ObjectDetector
from .tracker_manager import TrackerManager, Track
_SENSOR_QOS = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=5,
)
def _yaw_quat(yaw: float) -> Quaternion:
q = Quaternion()
q.w = math.cos(yaw * 0.5)
q.z = math.sin(yaw * 0.5)
return q
class DynamicObsNode(Node):
def __init__(self):
super().__init__('dynamic_obs_tracker')
# ── Parameters ──────────────────────────────────────────────────────
self.declare_parameter('max_tracks', 20)
self.declare_parameter('confirm_frames', 3)
self.declare_parameter('max_missed_frames', 6)
self.declare_parameter('assoc_dist_m', 1.5)
self.declare_parameter('prediction_hz', 10.0)
self.declare_parameter('horizon_s', 2.5)
self.declare_parameter('pred_step_s', 0.5)
self.declare_parameter('odom_frame', 'odom')
self.declare_parameter('min_speed_mps', 0.05)
self.declare_parameter('max_range_m', 8.0)
max_tracks = self.get_parameter('max_tracks').value
confirm_f = self.get_parameter('confirm_frames').value
max_missed = self.get_parameter('max_missed_frames').value
assoc_dist = self.get_parameter('assoc_dist_m').value
pred_hz = self.get_parameter('prediction_hz').value
horizon_s = self.get_parameter('horizon_s').value
pred_step = self.get_parameter('pred_step_s').value
self._frame = self.get_parameter('odom_frame').value
self._min_spd = self.get_parameter('min_speed_mps').value
self._max_rng = self.get_parameter('max_range_m').value
# ── Core modules ────────────────────────────────────────────────────
self._detector = ObjectDetector(
grid_radius_m=min(self._max_rng + 2.0, 12.0),
max_cluster=int((self._max_rng / 0.1) ** 2 * 0.5),
)
self._tracker = TrackerManager(
max_tracks=max_tracks,
confirm_frames=confirm_f,
max_missed=max_missed,
assoc_dist_m=assoc_dist,
horizon_s=horizon_s,
pred_step_s=pred_step,
)
self._horizon_s = horizon_s
self._pred_step = pred_step
# ── State ────────────────────────────────────────────────────────────
self._latest_scan: LaserScan | None = None
self._last_track_t: float = time.monotonic()
self._scan_processed_stamp: float | None = None
# ── Subscriptions ────────────────────────────────────────────────────
self.create_subscription(LaserScan, '/scan', self._on_scan, _SENSOR_QOS)
# ── Publishers ───────────────────────────────────────────────────────
if _MSGS_AVAILABLE:
self._obj_pub = self.create_publisher(
MovingObjectArray, '/saltybot/moving_objects', 10
)
else:
self._obj_pub = None
self.get_logger().warning(
'[dyn_obs] saltybot_dynamic_obs_msgs not built — '
'MovingObjectArray will not be published'
)
self._viz_pub = self.create_publisher(
MarkerArray, '/saltybot/moving_objects_viz', 10
)
# ── Timer ────────────────────────────────────────────────────────────
self.create_timer(1.0 / pred_hz, self._track_tick)
self.get_logger().info(
f'dynamic_obs_tracker ready — '
f'max_tracks={max_tracks} horizon={horizon_s}s assoc={assoc_dist}m'
)
# ── Scan callback ─────────────────────────────────────────────────────────
def _on_scan(self, msg: LaserScan) -> None:
self._latest_scan = msg
# ── 10 Hz tracker tick ────────────────────────────────────────────────────
def _track_tick(self) -> None:
t0 = time.monotonic()
now_mono = t0
dt = now_mono - self._last_track_t
dt = max(1e-3, min(dt, 0.5))
self._last_track_t = now_mono
scan = self._latest_scan
detections = []
if scan is not None:
stamp_sec = scan.header.stamp.sec + scan.header.stamp.nanosec * 1e-9
if stamp_sec != self._scan_processed_stamp:
self._scan_processed_stamp = stamp_sec
ranges = np.asarray(scan.ranges, dtype=np.float32)
detections = self._detector.update(
ranges,
scan.angle_min,
scan.angle_increment,
min(scan.range_max, self._max_rng),
)
confirmed = self._tracker.update(detections, dt)
latency_ms = (time.monotonic() - t0) * 1000.0
stamp = self.get_clock().now().to_msg()
if _MSGS_AVAILABLE and self._obj_pub is not None:
self._publish_objects(confirmed, stamp, latency_ms)
self._publish_viz(confirmed, stamp)
# ── Publish helpers ───────────────────────────────────────────────────────
def _publish_objects(self, confirmed: list, stamp, latency_ms: float) -> None:
arr = MovingObjectArray()
arr.header.stamp = stamp
arr.header.frame_id = self._frame
arr.active_count = len(confirmed)
arr.tentative_count = self._tracker.tentative_count
arr.detector_latency_ms = float(latency_ms)
for tr in confirmed:
px, py = tr.kalman.position
vx, vy = tr.kalman.velocity
speed = tr.kalman.speed
if speed < self._min_spd:
continue
obj = TrackedObject()
obj.header = arr.header
obj.object_id = tr.track_id
obj.pose.pose.position.x = px
obj.pose.pose.position.y = py
obj.pose.pose.orientation = _yaw_quat(math.atan2(vy, vx))
cov = tr.kalman.covariance_2x2
obj.pose.covariance[0] = float(cov[0, 0])
obj.pose.covariance[1] = float(cov[0, 1])
obj.pose.covariance[6] = float(cov[1, 0])
obj.pose.covariance[7] = float(cov[1, 1])
obj.velocity.x = vx
obj.velocity.y = vy
obj.speed_mps = speed
obj.confidence = min(1.0, tr.hits / (self._tracker._confirm_frames * 3))
obj.age_frames = tr.age
obj.hits = tr.hits
obj.is_valid = True
# Predicted path
for px_f, py_f, t_f in tr.kalman.predict_horizon(
self._horizon_s, self._pred_step
):
p = Pose()
p.position.x = px_f
p.position.y = py_f
p.orientation.w = 1.0
obj.predicted_path.append(p)
obj.predicted_times.append(float(t_f))
arr.objects.append(obj)
self._obj_pub.publish(arr)
def _publish_viz(self, confirmed: list, stamp) -> None:
markers = MarkerArray()
# Delete old markers
del_marker = Marker()
del_marker.header.stamp = stamp
del_marker.header.frame_id = self._frame
del_marker.action = Marker.DELETEALL
markers.markers.append(del_marker)
for tr in confirmed:
px, py = tr.kalman.position
vx, vy = tr.kalman.velocity
speed = tr.kalman.speed
if speed < self._min_spd:
continue
# Cylinder at current position
m = Marker()
m.header.stamp = stamp
m.header.frame_id = self._frame
m.ns = 'dyn_obs'
m.id = tr.track_id
m.type = Marker.CYLINDER
m.action = Marker.ADD
m.pose.position.x = px
m.pose.position.y = py
m.pose.position.z = 0.5
m.pose.orientation.w = 1.0
m.scale.x = 0.4
m.scale.y = 0.4
m.scale.z = 1.0
m.color = ColorRGBA(r=1.0, g=0.2, b=0.0, a=0.7)
m.lifetime.sec = 1
markers.markers.append(m)
# Arrow for velocity
vel_m = Marker()
vel_m.header = m.header
vel_m.ns = 'dyn_obs_vel'
vel_m.id = tr.track_id
vel_m.type = Marker.ARROW
vel_m.action = Marker.ADD
from geometry_msgs.msg import Point as GPoint
p_start = GPoint(x=px, y=py, z=1.0)
p_end = GPoint(x=px + vx, y=py + vy, z=1.0)
vel_m.points = [p_start, p_end]
vel_m.scale.x = 0.05
vel_m.scale.y = 0.10
vel_m.color = ColorRGBA(r=1.0, g=1.0, b=0.0, a=0.9)
vel_m.lifetime.sec = 1
markers.markers.append(vel_m)
# Line strip for predicted path
path_m = Marker()
path_m.header = m.header
path_m.ns = 'dyn_obs_path'
path_m.id = tr.track_id
path_m.type = Marker.LINE_STRIP
path_m.action = Marker.ADD
path_m.scale.x = 0.04
path_m.color = ColorRGBA(r=1.0, g=0.5, b=0.0, a=0.5)
path_m.lifetime.sec = 1
path_m.points.append(GPoint(x=px, y=py, z=0.5))
for fx, fy, _ in tr.kalman.predict_horizon(self._horizon_s, self._pred_step):
path_m.points.append(GPoint(x=fx, y=fy, z=0.5))
markers.markers.append(path_m)
self._viz_pub.publish(markers)
def main(args=None):
rclpy.init(args=args)
node = DynamicObsNode()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,132 @@
"""
kalman_tracker.py Single-object Kalman filter for 2-D ground-plane tracking.
State vector: x = [px, py, vx, vy] (position + velocity)
Motion model: constant-velocity (CV) with process noise on acceleration
Predict step:
F = | 1 0 dt 0 | x_{k|k-1} = F @ x_{k-1|k-1}
| 0 1 0 dt | P_{k|k-1} = F @ P @ F^T + Q
| 0 0 1 0 |
| 0 0 0 1 |
Update step (position observation only):
H = | 1 0 0 0 | y = z - H @ x
| 0 1 0 0 | S = H @ P @ H^T + R
K = P @ H^T @ inv(S)
x = x + K @ y
P = (I - K @ H) @ P (Joseph form for stability)
Trajectory prediction: unrolls the CV model forward at fixed time steps.
"""
from __future__ import annotations
from typing import List, Tuple
import numpy as np
# ── Default noise matrices ────────────────────────────────────────────────────
# Process noise: models uncertainty in acceleration between frames
_Q_BASE = np.diag([0.02, 0.02, 0.8, 0.8]).astype(np.float64)
# Measurement noise: LIDAR centroid uncertainty (~0.15 m std)
_R = np.diag([0.025, 0.025]).astype(np.float64) # 0.16 m sigma each axis
# Observation matrix
_H = np.array([[1, 0, 0, 0],
[0, 1, 0, 0]], dtype=np.float64)
_I4 = np.eye(4, dtype=np.float64)
class KalmanTracker:
"""
Kalman filter tracking one object.
Parameters
----------
x0, y0 : initial position (metres, odom frame)
process_noise : scalar multiplier on _Q_BASE
"""
def __init__(self, x0: float, y0: float, process_noise: float = 1.0):
self._x = np.array([x0, y0, 0.0, 0.0], dtype=np.float64)
self._P = np.eye(4, dtype=np.float64) * 0.5
self._Q = _Q_BASE * process_noise
# ── Core filter ───────────────────────────────────────────────────────────
def predict(self, dt: float) -> None:
"""Propagate state by dt seconds."""
F = np.array([
[1, 0, dt, 0],
[0, 1, 0, dt],
[0, 0, 1, 0],
[0, 0, 0, 1],
], dtype=np.float64)
self._x = F @ self._x
self._P = F @ self._P @ F.T + self._Q
def update(self, z: np.ndarray) -> None:
"""
Incorporate a position measurement z = [x, y].
Uses Joseph-form covariance update for numerical stability.
"""
y = z.astype(np.float64) - _H @ self._x
S = _H @ self._P @ _H.T + _R
K = self._P @ _H.T @ np.linalg.inv(S)
self._x = self._x + K @ y
IKH = _I4 - K @ _H
# Joseph form: (I-KH) P (I-KH)^T + K R K^T
self._P = IKH @ self._P @ IKH.T + K @ _R @ K.T
# ── Prediction horizon ────────────────────────────────────────────────────
def predict_horizon(
self,
horizon_s: float = 2.5,
step_s: float = 0.5,
) -> List[Tuple[float, float, float]]:
"""
Return [(x, y, t), ...] at equally-spaced future times.
Does NOT modify internal filter state.
"""
predictions: List[Tuple[float, float, float]] = []
state = self._x.copy()
t = 0.0
F_step = np.array([
[1, 0, step_s, 0],
[0, 1, 0, step_s],
[0, 0, 1, 0],
[0, 0, 0, 1],
], dtype=np.float64)
while t < horizon_s - 1e-6:
state = F_step @ state
t += step_s
predictions.append((float(state[0]), float(state[1]), t))
return predictions
# ── Properties ────────────────────────────────────────────────────────────
@property
def position(self) -> Tuple[float, float]:
return float(self._x[0]), float(self._x[1])
@property
def velocity(self) -> Tuple[float, float]:
return float(self._x[2]), float(self._x[3])
@property
def speed(self) -> float:
return float(np.hypot(self._x[2], self._x[3]))
@property
def covariance_2x2(self) -> np.ndarray:
"""Position covariance (top-left 2×2 of P)."""
return self._P[:2, :2].copy()
@property
def state(self) -> np.ndarray:
return self._x.copy()

View File

@ -0,0 +1,168 @@
"""
object_detector.py LIDAR-based moving object detector.
Algorithm:
1. Convert each LaserScan to a 2-D occupancy grid (robot-centred, fixed size).
2. Maintain a background model via exponential moving average (EMA):
bg_t = α * current + (1-α) * bg_{t-1} (only for non-moving cells)
3. Foreground = cells whose occupancy significantly exceeds the background.
4. Cluster foreground cells with scipy connected-components Detection list.
The grid is robot-relative (origin at robot centre) so it naturally tracks
the robot's motion without needing TF at this stage. The caller is responsible
for transforming detections into a stable frame (odom) before passing to the
tracker.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import List, Optional
import numpy as np
from scipy import ndimage
@dataclass
class Detection:
"""A clustered moving foreground blob from one scan."""
x: float # centroid x in sensor frame (m)
y: float # centroid y in sensor frame (m)
size_m2: float # approximate area of the cluster (m²)
range_m: float # distance from robot (m)
class ObjectDetector:
"""
Detects moving objects in consecutive 2-D LIDAR scans.
Parameters
----------
grid_radius_m : half-size of the occupancy grid (grid covers ±radius)
resolution : metres per cell
bg_alpha : EMA update rate for background (small = slow forgetting)
motion_thr : occupancy delta above background to count as moving
min_cluster : minimum cells to keep a cluster
max_cluster : maximum cells before a cluster is considered static wall
"""
def __init__(
self,
grid_radius_m: float = 10.0,
resolution: float = 0.10,
bg_alpha: float = 0.04,
motion_thr: float = 0.45,
min_cluster: int = 3,
max_cluster: int = 200,
):
cells = int(2 * grid_radius_m / resolution)
self._cells = cells
self._res = resolution
self._origin = -grid_radius_m # world x/y at grid index 0
self._bg_alpha = bg_alpha
self._motion_thr = motion_thr
self._min_clust = min_cluster
self._max_clust = max_cluster
self._bg = np.zeros((cells, cells), dtype=np.float32)
self._initialized = False
# ── Public API ────────────────────────────────────────────────────────────
def update(
self,
ranges: np.ndarray,
angle_min: float,
angle_increment: float,
range_max: float,
) -> List[Detection]:
"""
Process one LaserScan and return detected moving blobs.
Parameters
----------
ranges : 1-D array of range readings (metres)
angle_min : angle of first beam (radians)
angle_increment : angular step between beams (radians)
range_max : maximum valid range (metres)
"""
curr = self._scan_to_grid(ranges, angle_min, angle_increment, range_max)
if not self._initialized:
self._bg = curr.copy()
self._initialized = True
return []
# Foreground mask
motion_mask = (curr - self._bg) > self._motion_thr
# Update background only on non-moving cells
static = ~motion_mask
self._bg[static] = (
self._bg[static] * (1.0 - self._bg_alpha)
+ curr[static] * self._bg_alpha
)
return self._cluster(motion_mask)
def reset(self) -> None:
self._bg[:] = 0.0
self._initialized = False
# ── Internals ─────────────────────────────────────────────────────────────
def _scan_to_grid(
self,
ranges: np.ndarray,
angle_min: float,
angle_increment: float,
range_max: float,
) -> np.ndarray:
grid = np.zeros((self._cells, self._cells), dtype=np.float32)
n = len(ranges)
angles = angle_min + np.arange(n) * angle_increment
r = np.asarray(ranges, dtype=np.float32)
valid = (r > 0.05) & (r < range_max) & np.isfinite(r)
r, a = r[valid], angles[valid]
x = r * np.cos(a)
y = r * np.sin(a)
ix = np.clip(
((x - self._origin) / self._res).astype(np.int32), 0, self._cells - 1
)
iy = np.clip(
((y - self._origin) / self._res).astype(np.int32), 0, self._cells - 1
)
grid[iy, ix] = 1.0
return grid
def _cluster(self, mask: np.ndarray) -> List[Detection]:
# Dilate slightly to connect nearby hit cells into one blob
struct = ndimage.generate_binary_structure(2, 2)
dilated = ndimage.binary_dilation(mask, structure=struct, iterations=1)
labeled, n_labels = ndimage.label(dilated)
detections: List[Detection] = []
for label_id in range(1, n_labels + 1):
coords = np.argwhere(labeled == label_id)
n_cells = len(coords)
if n_cells < self._min_clust or n_cells > self._max_clust:
continue
ys, xs = coords[:, 0], coords[:, 1]
cx_grid = float(np.mean(xs))
cy_grid = float(np.mean(ys))
cx = cx_grid * self._res + self._origin
cy = cy_grid * self._res + self._origin
detections.append(Detection(
x=cx,
y=cy,
size_m2=n_cells * self._res ** 2,
range_m=float(np.hypot(cx, cy)),
))
return detections

View File

@ -0,0 +1,206 @@
"""
tracker_manager.py Multi-object tracker with Hungarian data association.
Track lifecycle:
TENTATIVE confirmed after `confirm_frames` consecutive hits
CONFIRMED normal tracked state
LOST missed for 1..max_missed frames (still predicts, not published)
DEAD missed > max_missed removed
Association:
Uses scipy.optimize.linear_sum_assignment (Hungarian algorithm) on a cost
matrix of Euclidean distances between predicted track positions and new
detections. Assignments with cost > assoc_dist_m are rejected.
Up to `max_tracks` simultaneous live tracks (tentative + confirmed).
"""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import IntEnum
from typing import Dict, List, Optional, Tuple
import numpy as np
from scipy.optimize import linear_sum_assignment
from .kalman_tracker import KalmanTracker
from .object_detector import Detection
class TrackState(IntEnum):
TENTATIVE = 0
CONFIRMED = 1
LOST = 2
@dataclass
class Track:
track_id: int
kalman: KalmanTracker
state: TrackState = TrackState.TENTATIVE
hits: int = 1
age: int = 1 # frames since creation
missed: int = 0 # consecutive missed frames
class TrackerManager:
"""
Manages a pool of Kalman tracks.
Parameters
----------
max_tracks : hard cap on simultaneously alive tracks
confirm_frames : hits needed before a track is CONFIRMED
max_missed : consecutive missed frames before a track is DEAD
assoc_dist_m : max allowed distance (m) for a valid assignment
horizon_s : prediction horizon for trajectory output (seconds)
pred_step_s : time step between predicted waypoints
process_noise : KalmanTracker process-noise multiplier
"""
def __init__(
self,
max_tracks: int = 20,
confirm_frames: int = 3,
max_missed: int = 6,
assoc_dist_m: float = 1.5,
horizon_s: float = 2.5,
pred_step_s: float = 0.5,
process_noise: float = 1.0,
):
self._max_tracks = max_tracks
self._confirm_frames = confirm_frames
self._max_missed = max_missed
self._assoc_dist = assoc_dist_m
self._horizon_s = horizon_s
self._pred_step = pred_step_s
self._proc_noise = process_noise
self._tracks: Dict[int, Track] = {}
self._next_id: int = 1
# ── Public API ────────────────────────────────────────────────────────────
def update(self, detections: List[Detection], dt: float) -> List[Track]:
"""
Process one frame of detections.
1. Predict all tracks by dt.
2. Hungarian assignment of predictions detections.
3. Update matched tracks; mark unmatched tracks as LOST.
4. Promote tracks crossing `confirm_frames`.
5. Create new tracks for unmatched detections (if room).
6. Remove DEAD tracks.
Returns confirmed tracks only.
"""
# 1. Predict
for tr in self._tracks.values():
tr.kalman.predict(dt)
tr.age += 1
# 2. Assign
matched, unmatched_tracks, unmatched_dets = self._assign(detections)
# 3a. Update matched
for tid, did in matched:
tr = self._tracks[tid]
det = detections[did]
tr.kalman.update(np.array([det.x, det.y]))
tr.hits += 1
tr.missed = 0
if tr.state == TrackState.LOST:
tr.state = TrackState.CONFIRMED
elif tr.state == TrackState.TENTATIVE and tr.hits >= self._confirm_frames:
tr.state = TrackState.CONFIRMED
# 3b. Unmatched tracks
for tid in unmatched_tracks:
tr = self._tracks[tid]
tr.missed += 1
if tr.missed > 1:
tr.state = TrackState.LOST
# 4. New tracks for unmatched detections
live = sum(1 for t in self._tracks.values() if t.state != TrackState.LOST
or t.missed <= self._max_missed)
for did in unmatched_dets:
if live >= self._max_tracks:
break
det = detections[did]
init_state = (TrackState.CONFIRMED
if self._confirm_frames <= 1
else TrackState.TENTATIVE)
new_tr = Track(
track_id=self._next_id,
kalman=KalmanTracker(det.x, det.y, self._proc_noise),
state=init_state,
)
self._tracks[self._next_id] = new_tr
self._next_id += 1
live += 1
# 5. Prune dead
dead = [tid for tid, t in self._tracks.items() if t.missed > self._max_missed]
for tid in dead:
del self._tracks[tid]
return [t for t in self._tracks.values() if t.state == TrackState.CONFIRMED]
@property
def all_tracks(self) -> List[Track]:
return list(self._tracks.values())
@property
def tentative_count(self) -> int:
return sum(1 for t in self._tracks.values()
if t.state == TrackState.TENTATIVE)
def reset(self) -> None:
self._tracks.clear()
self._next_id = 1
# ── Hungarian assignment ──────────────────────────────────────────────────
def _assign(
self,
detections: List[Detection],
) -> Tuple[List[Tuple[int, int]], List[int], List[int]]:
"""
Returns:
matched list of (track_id, det_index)
unmatched_tids track IDs with no detection assigned
unmatched_dids detection indices with no track assigned
"""
track_ids = list(self._tracks.keys())
if not track_ids or not detections:
return [], track_ids, list(range(len(detections)))
# Build cost matrix: rows=tracks, cols=detections
cost = np.full((len(track_ids), len(detections)), fill_value=np.inf)
for r, tid in enumerate(track_ids):
tx, ty = self._tracks[tid].kalman.position
for c, det in enumerate(detections):
cost[r, c] = np.hypot(tx - det.x, ty - det.y)
row_ind, col_ind = linear_sum_assignment(cost)
matched: List[Tuple[int, int]] = []
matched_track_idx: set = set()
matched_det_idx: set = set()
for r, c in zip(row_ind, col_ind):
if cost[r, c] > self._assoc_dist:
continue
matched.append((track_ids[r], c))
matched_track_idx.add(r)
matched_det_idx.add(c)
unmatched_tids = [track_ids[r] for r in range(len(track_ids))
if r not in matched_track_idx]
unmatched_dids = [c for c in range(len(detections))
if c not in matched_det_idx]
return matched, unmatched_tids, unmatched_dids

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/saltybot_dynamic_obstacles
[install]
install_scripts=$base/lib/saltybot_dynamic_obstacles

View File

@ -0,0 +1,32 @@
from setuptools import setup, find_packages
from glob import glob
package_name = 'saltybot_dynamic_obstacles'
setup(
name=package_name,
version='0.1.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
('share/' + package_name + '/launch',
glob('launch/*.launch.py')),
('share/' + package_name + '/config',
glob('config/*.yaml')),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='SaltyLab',
maintainer_email='robot@saltylab.local',
description='Dynamic obstacle tracking: LIDAR motion detection, Kalman tracking, Nav2 costmap',
license='MIT',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'dynamic_obs_tracker = saltybot_dynamic_obstacles.dynamic_obs_node:main',
'dynamic_obs_costmap = saltybot_dynamic_obstacles.costmap_layer_node:main',
],
},
)

View File

@ -0,0 +1,262 @@
"""
test_dynamic_obstacles.py Unit tests for KalmanTracker, TrackerManager,
and ObjectDetector.
Runs without ROS2 / hardware (no rclpy imports).
"""
from __future__ import annotations
import math
import sys
import os
import numpy as np
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from saltybot_dynamic_obstacles.kalman_tracker import KalmanTracker
from saltybot_dynamic_obstacles.tracker_manager import TrackerManager, TrackState
from saltybot_dynamic_obstacles.object_detector import ObjectDetector, Detection
# ── KalmanTracker ─────────────────────────────────────────────────────────────
class TestKalmanTracker:
def test_initial_position(self):
kt = KalmanTracker(3.0, 4.0)
px, py = kt.position
assert px == pytest.approx(3.0)
assert py == pytest.approx(4.0)
def test_initial_velocity_zero(self):
kt = KalmanTracker(0.0, 0.0)
vx, vy = kt.velocity
assert vx == pytest.approx(0.0)
assert vy == pytest.approx(0.0)
def test_predict_moves_position(self):
kt = KalmanTracker(0.0, 0.0)
# Give it some velocity via update sequence
kt.update(np.array([0.1, 0.0]))
kt.update(np.array([0.2, 0.0]))
kt.predict(0.1)
px, _ = kt.position
assert px > 0.0 # should have moved forward
def test_pure_predict_constant_velocity(self):
"""After velocity is established, predict() should move linearly."""
kt = KalmanTracker(0.0, 0.0)
# Force velocity by repeated updates
for i in range(10):
kt.update(np.array([i * 0.1, 0.0]))
kt.predict(0.1)
vx, _ = kt.velocity
px0, _ = kt.position
kt.predict(1.0)
px1, _ = kt.position
# Should advance roughly vx * 1.0 metres
assert px1 == pytest.approx(px0 + vx * 1.0, abs=0.3)
def test_update_corrects_position(self):
kt = KalmanTracker(0.0, 0.0)
# Predict way off
kt.predict(10.0)
# Then update to ground truth
kt.update(np.array([1.0, 2.0]))
px, py = kt.position
# Should move toward (1, 2)
assert px == pytest.approx(1.0, abs=0.5)
assert py == pytest.approx(2.0, abs=0.5)
def test_predict_horizon_length(self):
kt = KalmanTracker(0.0, 0.0)
preds = kt.predict_horizon(horizon_s=2.5, step_s=0.5)
assert len(preds) == 5 # 0.5, 1.0, 1.5, 2.0, 2.5
def test_predict_horizon_times(self):
kt = KalmanTracker(0.0, 0.0)
preds = kt.predict_horizon(horizon_s=2.0, step_s=0.5)
times = [t for _, _, t in preds]
assert times == pytest.approx([0.5, 1.0, 1.5, 2.0], abs=0.01)
def test_predict_horizon_does_not_mutate_state(self):
kt = KalmanTracker(1.0, 2.0)
kt.predict_horizon(horizon_s=3.0, step_s=0.5)
px, py = kt.position
assert px == pytest.approx(1.0)
assert py == pytest.approx(2.0)
def test_speed_zero_at_init(self):
kt = KalmanTracker(5.0, 5.0)
assert kt.speed == pytest.approx(0.0)
def test_covariance_shape(self):
kt = KalmanTracker(0.0, 0.0)
cov = kt.covariance_2x2
assert cov.shape == (2, 2)
def test_covariance_positive_definite(self):
kt = KalmanTracker(0.0, 0.0)
for _ in range(5):
kt.predict(0.1)
kt.update(np.array([0.1, 0.0]))
eigvals = np.linalg.eigvalsh(kt.covariance_2x2)
assert np.all(eigvals > 0)
def test_joseph_form_stays_symmetric(self):
"""Covariance should remain symmetric after many updates."""
kt = KalmanTracker(0.0, 0.0)
for i in range(50):
kt.predict(0.1)
kt.update(np.array([i * 0.01, 0.0]))
P = kt._P
assert np.allclose(P, P.T, atol=1e-10)
# ── TrackerManager ────────────────────────────────────────────────────────────
class TestTrackerManager:
def _det(self, x, y):
return Detection(x=x, y=y, size_m2=0.1, range_m=math.hypot(x, y))
def test_empty_detections_no_tracks(self):
tm = TrackerManager()
confirmed = tm.update([], 0.1)
assert confirmed == []
def test_track_created_on_detection(self):
tm = TrackerManager(confirm_frames=1)
confirmed = tm.update([self._det(1.0, 0.0)], 0.1)
assert len(confirmed) == 1
def test_track_tentative_before_confirm(self):
tm = TrackerManager(confirm_frames=3)
tm.update([self._det(1.0, 0.0)], 0.1)
# Only 1 hit — should still be tentative
assert tm.tentative_count == 1
def test_track_confirmed_after_N_hits(self):
tm = TrackerManager(confirm_frames=3, assoc_dist_m=2.0)
for _ in range(4):
confirmed = tm.update([self._det(1.0, 0.0)], 0.1)
assert len(confirmed) == 1
def test_track_deleted_after_max_missed(self):
tm = TrackerManager(confirm_frames=1, max_missed=3, assoc_dist_m=2.0)
tm.update([self._det(1.0, 0.0)], 0.1) # create
for _ in range(5):
tm.update([], 0.1) # no detections → missed++
assert len(tm.all_tracks) == 0
def test_max_tracks_cap(self):
tm = TrackerManager(max_tracks=5, confirm_frames=1)
dets = [self._det(float(i), 0.0) for i in range(10)]
tm.update(dets, 0.1)
assert len(tm.all_tracks) <= 5
def test_consistent_track_id(self):
tm = TrackerManager(confirm_frames=3, assoc_dist_m=1.5)
for i in range(5):
confirmed = tm.update([self._det(1.0 + i * 0.01, 0.0)], 0.1)
assert len(confirmed) == 1
track_id = confirmed[0].track_id
# One more tick — ID should be stable
confirmed2 = tm.update([self._det(1.06, 0.0)], 0.1)
assert confirmed2[0].track_id == track_id
def test_two_independent_tracks(self):
tm = TrackerManager(confirm_frames=3, assoc_dist_m=0.8)
for _ in range(5):
confirmed = tm.update([self._det(1.0, 0.0), self._det(5.0, 0.0)], 0.1)
assert len(confirmed) == 2
def test_reset_clears_all(self):
tm = TrackerManager(confirm_frames=1)
tm.update([self._det(1.0, 0.0)], 0.1)
tm.reset()
assert len(tm.all_tracks) == 0
def test_far_detection_not_assigned(self):
tm = TrackerManager(confirm_frames=1, assoc_dist_m=0.5)
tm.update([self._det(1.0, 0.0)], 0.1) # create track at (1,0)
# Detection 3 m away → new track, not update
tm.update([self._det(4.0, 0.0)], 0.1)
assert len(tm.all_tracks) == 2
# ── ObjectDetector ────────────────────────────────────────────────────────────
class TestObjectDetector:
def _empty_scan(self, n=360, rmax=8.0) -> tuple:
"""All readings at max range (static background)."""
ranges = np.full(n, rmax - 0.1, dtype=np.float32)
return ranges, -math.pi, 2 * math.pi / n, rmax
def _scan_with_blob(self, blob_r=2.0, blob_theta=0.0, n=360, rmax=8.0) -> tuple:
"""Background scan + a short-range cluster at blob_theta."""
ranges = np.full(n, rmax - 0.1, dtype=np.float32)
angle_inc = 2 * math.pi / n
angle_min = -math.pi
# Put a cluster of ~10 beams at blob_r
center_idx = int((blob_theta - angle_min) / angle_inc) % n
for di in range(-5, 6):
idx = (center_idx + di) % n
ranges[idx] = blob_r
return ranges, angle_min, angle_inc, rmax
def test_empty_scan_no_detections_after_warmup(self):
od = ObjectDetector()
r, a_min, a_inc, rmax = self._empty_scan()
od.update(r, a_min, a_inc, rmax) # init background
for _ in range(3):
dets = od.update(r, a_min, a_inc, rmax)
assert len(dets) == 0
def test_moving_blob_detected(self):
od = ObjectDetector()
bg_r, a_min, a_inc, rmax = self._empty_scan()
od.update(bg_r, a_min, a_inc, rmax) # seed background
for _ in range(5):
od.update(bg_r, a_min, a_inc, rmax)
# Now inject a foreground blob
fg_r, _, _, _ = self._scan_with_blob(blob_r=2.0, blob_theta=0.0)
dets = od.update(fg_r, a_min, a_inc, rmax)
assert len(dets) >= 1
def test_detection_centroid_approximate(self):
od = ObjectDetector()
bg_r, a_min, a_inc, rmax = self._empty_scan()
for _ in range(8):
od.update(bg_r, a_min, a_inc, rmax)
fg_r, _, _, _ = self._scan_with_blob(blob_r=3.0, blob_theta=0.0)
dets = od.update(fg_r, a_min, a_inc, rmax)
assert len(dets) >= 1
# Blob is at ~3 m along x-axis (theta=0)
cx = dets[0].x
cy = dets[0].y
assert abs(cx - 3.0) < 0.5
assert abs(cy) < 0.5
def test_reset_clears_background(self):
od = ObjectDetector()
bg_r, a_min, a_inc, rmax = self._empty_scan()
for _ in range(5):
od.update(bg_r, a_min, a_inc, rmax)
od.reset()
assert not od._initialized
def test_no_inf_nan_ranges(self):
od = ObjectDetector()
r = np.array([np.inf, np.nan, 5.0, -1.0, 0.0] * 72, dtype=np.float32)
a_min = -math.pi
a_inc = 2 * math.pi / len(r)
od.update(r, a_min, a_inc, 8.0) # should not raise
if __name__ == '__main__':
pytest.main([__file__, '-v'])