diff --git a/jetson/ros2_ws/src/saltybot_dynamic_obs_msgs/CMakeLists.txt b/jetson/ros2_ws/src/saltybot_dynamic_obs_msgs/CMakeLists.txt
new file mode 100644
index 0000000..737df82
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_dynamic_obs_msgs/CMakeLists.txt
@@ -0,0 +1,16 @@
+cmake_minimum_required(VERSION 3.8)
+project(saltybot_dynamic_obs_msgs)
+
+find_package(ament_cmake REQUIRED)
+find_package(rosidl_default_generators REQUIRED)
+find_package(std_msgs REQUIRED)
+find_package(geometry_msgs REQUIRED)
+
+rosidl_generate_interfaces(${PROJECT_NAME}
+ "msg/TrackedObject.msg"
+ "msg/MovingObjectArray.msg"
+ DEPENDENCIES std_msgs geometry_msgs
+)
+
+ament_export_dependencies(rosidl_default_runtime)
+ament_package()
diff --git a/jetson/ros2_ws/src/saltybot_dynamic_obs_msgs/msg/MovingObjectArray.msg b/jetson/ros2_ws/src/saltybot_dynamic_obs_msgs/msg/MovingObjectArray.msg
new file mode 100644
index 0000000..619d2a5
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_dynamic_obs_msgs/msg/MovingObjectArray.msg
@@ -0,0 +1,12 @@
+# MovingObjectArray — all currently tracked moving obstacles.
+#
+# Published at ~10 Hz on /saltybot/moving_objects.
+# Only confirmed tracks (hits >= confirm_frames) appear here.
+
+std_msgs/Header header
+
+saltybot_dynamic_obs_msgs/TrackedObject[] objects
+
+uint32 active_count # number of confirmed tracks
+uint32 tentative_count # tracks not yet confirmed
+float32 detector_latency_ms # pipeline latency hint
diff --git a/jetson/ros2_ws/src/saltybot_dynamic_obs_msgs/msg/TrackedObject.msg b/jetson/ros2_ws/src/saltybot_dynamic_obs_msgs/msg/TrackedObject.msg
new file mode 100644
index 0000000..d5d3d9b
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_dynamic_obs_msgs/msg/TrackedObject.msg
@@ -0,0 +1,21 @@
+# TrackedObject — a single tracked moving obstacle.
+#
+# predicted_path[i] is the estimated pose at predicted_times[i] seconds from now.
+# All poses are in the same frame as header.frame_id (typically 'odom').
+
+std_msgs/Header header
+
+uint32 object_id # stable ID across frames (monotonically increasing)
+
+geometry_msgs/PoseWithCovariance pose # current best-estimate pose (x, y, yaw)
+geometry_msgs/Vector3 velocity # vx, vy in m/s (vz = 0 for ground objects)
+
+geometry_msgs/Pose[] predicted_path # future positions at predicted_times
+float32[] predicted_times # seconds from header.stamp for each pose
+
+float32 speed_mps # scalar |v|
+float32 confidence # 0.0–1.0 (higher after more confirmed frames)
+
+uint32 age_frames # frames since first detection
+uint32 hits # number of successful associations
+bool is_valid # false if in tentative / just-created state
diff --git a/jetson/ros2_ws/src/saltybot_dynamic_obs_msgs/package.xml b/jetson/ros2_ws/src/saltybot_dynamic_obs_msgs/package.xml
new file mode 100644
index 0000000..a768d97
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_dynamic_obs_msgs/package.xml
@@ -0,0 +1,23 @@
+
+
+
+ saltybot_dynamic_obs_msgs
+ 0.1.0
+ Custom message types for dynamic obstacle tracking.
+ SaltyLab
+ MIT
+
+ ament_cmake
+ rosidl_default_generators
+
+ std_msgs
+ geometry_msgs
+
+ rosidl_default_runtime
+
+ rosidl_interface_packages
+
+
+ ament_cmake
+
+
diff --git a/jetson/ros2_ws/src/saltybot_dynamic_obstacles/config/dynamic_obstacles_params.yaml b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/config/dynamic_obstacles_params.yaml
new file mode 100644
index 0000000..1bb113f
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/config/dynamic_obstacles_params.yaml
@@ -0,0 +1,52 @@
+# saltybot_dynamic_obstacles — runtime parameters
+#
+# Requires:
+# /scan (sensor_msgs/LaserScan) — RPLIDAR A1M8 at ~5.5 Hz
+#
+# LIDAR scan is published by rplidar_ros node.
+# Make sure RPLIDAR is running before starting this stack.
+
+dynamic_obs_tracker:
+ ros__parameters:
+ max_tracks: 20 # max simultaneous tracked objects
+ confirm_frames: 3 # hits before a track is published
+ max_missed_frames: 6 # missed frames before track deletion
+ assoc_dist_m: 1.5 # max assignment distance (Hungarian)
+ prediction_hz: 10.0 # tracker update + publish rate
+ horizon_s: 2.5 # prediction look-ahead
+ pred_step_s: 0.5 # time between predicted waypoints
+ odom_frame: 'odom'
+ min_speed_mps: 0.05 # suppress near-stationary tracks
+ max_range_m: 8.0 # ignore detections beyond this
+
+dynamic_obs_costmap:
+ ros__parameters:
+ inflation_radius_m: 0.35 # safety bubble around each predicted point
+ ring_points: 8 # polygon points for inflation circle
+ clear_on_empty: true # push empty cloud to clear stale Nav2 markings
+
+
+# ── Nav2 costmap integration ───────────────────────────────────────────────────
+# In your nav2_params.yaml, under local_costmap or global_costmap > plugins, add
+# an ObstacleLayer with:
+#
+# obstacle_layer:
+# plugin: "nav2_costmap_2d::ObstacleLayer"
+# enabled: true
+# observation_sources: static_scan dynamic_obs
+# static_scan:
+# topic: /scan
+# data_type: LaserScan
+# ...
+# dynamic_obs:
+# topic: /saltybot/dynamic_obs_cloud
+# data_type: PointCloud2
+# sensor_frame: odom
+# obstacle_max_range: 10.0
+# raytrace_max_range: 10.0
+# marking: true
+# clearing: false
+#
+# This feeds the predicted trajectory smear directly into Nav2's obstacle
+# inflation, forcing the planner to route around the predicted future path
+# of every tracked moving object.
diff --git a/jetson/ros2_ws/src/saltybot_dynamic_obstacles/launch/dynamic_obstacles.launch.py b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/launch/dynamic_obstacles.launch.py
new file mode 100644
index 0000000..1c1cc57
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/launch/dynamic_obstacles.launch.py
@@ -0,0 +1,75 @@
+"""
+dynamic_obstacles.launch.py — Dynamic obstacle tracking + Nav2 costmap layer.
+
+Starts:
+ dynamic_obs_tracker — LIDAR motion detection + Kalman tracking @10 Hz
+ dynamic_obs_costmap — Predicted-trajectory → PointCloud2 for Nav2
+
+Launch args:
+ max_tracks int '20'
+ assoc_dist_m float '1.5'
+ horizon_s float '2.5'
+ inflation_radius_m float '0.35'
+
+Verify:
+ ros2 topic hz /saltybot/moving_objects # ~10 Hz
+ ros2 topic echo /saltybot/moving_objects # TrackedObject list
+ ros2 topic hz /saltybot/dynamic_obs_cloud # ~10 Hz (when objects present)
+ rviz2 → add MarkerArray → /saltybot/moving_objects_viz
+
+Nav2 costmap integration:
+ In your costmap_params.yaml ObstacleLayer observation_sources, add:
+ dynamic_obs:
+ topic: /saltybot/dynamic_obs_cloud
+ data_type: PointCloud2
+ marking: true
+ clearing: false
+"""
+
+from launch import LaunchDescription
+from launch.actions import DeclareLaunchArgument
+from launch.substitutions import LaunchConfiguration
+from launch_ros.actions import Node
+
+
+def generate_launch_description():
+ args = [
+ DeclareLaunchArgument('max_tracks', default_value='20'),
+ DeclareLaunchArgument('assoc_dist_m', default_value='1.5'),
+ DeclareLaunchArgument('horizon_s', default_value='2.5'),
+ DeclareLaunchArgument('inflation_radius_m', default_value='0.35'),
+ DeclareLaunchArgument('min_speed_mps', default_value='0.05'),
+ ]
+
+ tracker = Node(
+ package='saltybot_dynamic_obstacles',
+ executable='dynamic_obs_tracker',
+ name='dynamic_obs_tracker',
+ output='screen',
+ parameters=[{
+ 'max_tracks': LaunchConfiguration('max_tracks'),
+ 'assoc_dist_m': LaunchConfiguration('assoc_dist_m'),
+ 'horizon_s': LaunchConfiguration('horizon_s'),
+ 'min_speed_mps': LaunchConfiguration('min_speed_mps'),
+ 'prediction_hz': 10.0,
+ 'confirm_frames': 3,
+ 'max_missed_frames': 6,
+ 'pred_step_s': 0.5,
+ 'odom_frame': 'odom',
+ 'max_range_m': 8.0,
+ }],
+ )
+
+ costmap = Node(
+ package='saltybot_dynamic_obstacles',
+ executable='dynamic_obs_costmap',
+ name='dynamic_obs_costmap',
+ output='screen',
+ parameters=[{
+ 'inflation_radius_m': LaunchConfiguration('inflation_radius_m'),
+ 'ring_points': 8,
+ 'clear_on_empty': True,
+ }],
+ )
+
+ return LaunchDescription(args + [tracker, costmap])
diff --git a/jetson/ros2_ws/src/saltybot_dynamic_obstacles/package.xml b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/package.xml
new file mode 100644
index 0000000..ebdc91c
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/package.xml
@@ -0,0 +1,29 @@
+
+
+
+ saltybot_dynamic_obstacles
+ 0.1.0
+
+ Dynamic obstacle detection, multi-object Kalman tracking, trajectory
+ prediction, and Nav2 costmap layer integration for SaltyBot.
+
+ SaltyLab
+ MIT
+
+ rclpy
+ std_msgs
+ sensor_msgs
+ geometry_msgs
+ nav_msgs
+ visualization_msgs
+ saltybot_dynamic_obs_msgs
+
+ python3-numpy
+ python3-scipy
+
+ pytest
+
+
+ ament_python
+
+
diff --git a/jetson/ros2_ws/src/saltybot_dynamic_obstacles/resource/saltybot_dynamic_obstacles b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/resource/saltybot_dynamic_obstacles
new file mode 100644
index 0000000..e69de29
diff --git a/jetson/ros2_ws/src/saltybot_dynamic_obstacles/saltybot_dynamic_obstacles/__init__.py b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/saltybot_dynamic_obstacles/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/jetson/ros2_ws/src/saltybot_dynamic_obstacles/saltybot_dynamic_obstacles/costmap_layer_node.py b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/saltybot_dynamic_obstacles/costmap_layer_node.py
new file mode 100644
index 0000000..792ff34
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/saltybot_dynamic_obstacles/costmap_layer_node.py
@@ -0,0 +1,178 @@
+"""
+costmap_layer_node.py — Nav2 costmap integration for dynamic obstacles.
+
+Converts predicted trajectories from /saltybot/moving_objects into a
+PointCloud2 fed into Nav2's ObstacleLayer. Each predicted future position
+is added as a point, creating a "smeared" dynamic obstacle zone that
+covers the full 2-3 s prediction horizon.
+
+Nav2 ObstacleLayer config (in costmap_params.yaml):
+ obstacle_layer:
+ enabled: true
+ observation_sources: dynamic_obs
+ dynamic_obs:
+ topic: /saltybot/dynamic_obs_cloud
+ sensor_frame: odom
+ data_type: PointCloud2
+ obstacle_max_range: 12.0
+ obstacle_min_range: 0.0
+ raytrace_max_range: 12.0
+ marking: true
+ clearing: false # let the tracker handle clearing
+
+The node also clears old obstacle points when tracks are dropped, by
+publishing a clearing cloud to /saltybot/dynamic_obs_clear.
+
+Subscribes:
+ /saltybot/moving_objects saltybot_dynamic_obs_msgs/MovingObjectArray
+
+Publishes:
+ /saltybot/dynamic_obs_cloud sensor_msgs/PointCloud2 marking cloud
+ /saltybot/dynamic_obs_clear sensor_msgs/PointCloud2 clearing cloud
+
+Parameters:
+ inflation_radius_m float 0.35 (each predicted point inflated by this radius)
+ ring_points int 8 (polygon approximation of inflation circle)
+ clear_on_empty bool true (publish clear cloud when no objects tracked)
+"""
+
+from __future__ import annotations
+
+import math
+import struct
+from typing import List
+
+import rclpy
+from rclpy.node import Node
+from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
+
+from sensor_msgs.msg import PointCloud2, PointField
+from std_msgs.msg import Header
+
+try:
+ from saltybot_dynamic_obs_msgs.msg import MovingObjectArray
+ _MSGS_AVAILABLE = True
+except ImportError:
+ _MSGS_AVAILABLE = False
+
+
+_RELIABLE_QOS = QoSProfile(
+ reliability=ReliabilityPolicy.RELIABLE,
+ history=HistoryPolicy.KEEP_LAST,
+ depth=10,
+)
+
+
+def _make_pc2(header: Header, points_xyz: List[tuple]) -> PointCloud2:
+ """Pack a list of (x, y, z) into a PointCloud2 message."""
+ fields = [
+ PointField(name='x', offset=0, datatype=PointField.FLOAT32, count=1),
+ PointField(name='y', offset=4, datatype=PointField.FLOAT32, count=1),
+ PointField(name='z', offset=8, datatype=PointField.FLOAT32, count=1),
+ ]
+ point_step = 12 # 3 × float32
+ data = bytearray(len(points_xyz) * point_step)
+ for i, (x, y, z) in enumerate(points_xyz):
+ struct.pack_into(' None:
+ hdr = msg.header
+ mark_pts: List[tuple] = []
+
+ for obj in msg.objects:
+ if not obj.is_valid:
+ continue
+
+ # Current position
+ self._add_inflated(obj.pose.pose.position.x,
+ obj.pose.pose.position.y, mark_pts)
+
+ # Predicted future positions
+ for pose in obj.predicted_path:
+ self._add_inflated(pose.position.x, pose.position.y, mark_pts)
+
+ if mark_pts:
+ self._mark_pub.publish(_make_pc2(hdr, mark_pts))
+ elif self._clear_empty:
+ # Publish tiny clear cloud so Nav2 clears stale markings
+ self._clear_pub.publish(_make_pc2(hdr, []))
+
+ def _add_inflated(self, cx: float, cy: float, pts: List[tuple]) -> None:
+ """Add the centre + ring of inflation points at height 0.5 m."""
+ pts.append((cx, cy, 0.5))
+ for ox, oy in self._ring_offsets:
+ pts.append((cx + ox, cy + oy, 0.5))
+
+
+def main(args=None):
+ rclpy.init(args=args)
+ node = CostmapLayerNode()
+ try:
+ rclpy.spin(node)
+ finally:
+ node.destroy_node()
+ rclpy.shutdown()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/jetson/ros2_ws/src/saltybot_dynamic_obstacles/saltybot_dynamic_obstacles/dynamic_obs_node.py b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/saltybot_dynamic_obstacles/dynamic_obs_node.py
new file mode 100644
index 0000000..087accc
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/saltybot_dynamic_obstacles/dynamic_obs_node.py
@@ -0,0 +1,319 @@
+"""
+dynamic_obs_node.py — ROS2 node: LIDAR moving-object detection + Kalman tracking.
+
+Pipeline:
+ 1. Subscribe /scan (RPLIDAR LaserScan, ~5.5 Hz).
+ 2. ObjectDetector performs background subtraction → moving blobs.
+ 3. TrackerManager runs Hungarian assignment + Kalman predict/update at 10 Hz.
+ 4. Publish /saltybot/moving_objects (MovingObjectArray).
+ 5. Publish /saltybot/moving_objects_viz (MarkerArray) for RViz.
+
+The 10 Hz timer drives the tracker regardless of scan rate, so prediction
+continues between scans (pure-predict steps).
+
+Subscribes:
+ /scan sensor_msgs/LaserScan RPLIDAR A1M8
+
+Publishes:
+ /saltybot/moving_objects saltybot_dynamic_obs_msgs/MovingObjectArray
+ /saltybot/moving_objects_viz visualization_msgs/MarkerArray
+
+Parameters:
+ max_tracks int 20
+ confirm_frames int 3
+ max_missed_frames int 6
+ assoc_dist_m float 1.5
+ prediction_hz float 10.0 (tracker + publish rate)
+ horizon_s float 2.5
+ pred_step_s float 0.5
+ odom_frame str 'odom'
+ min_speed_mps float 0.05 (suppress near-zero velocity tracks)
+ max_range_m float 8.0
+"""
+
+import time
+import math
+
+import numpy as np
+import rclpy
+from rclpy.node import Node
+from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
+
+from sensor_msgs.msg import LaserScan
+from geometry_msgs.msg import Pose, Point, Quaternion, Vector3
+from std_msgs.msg import Header, ColorRGBA
+from visualization_msgs.msg import Marker, MarkerArray
+
+try:
+ from saltybot_dynamic_obs_msgs.msg import TrackedObject, MovingObjectArray
+ _MSGS_AVAILABLE = True
+except ImportError:
+ _MSGS_AVAILABLE = False
+
+from .object_detector import ObjectDetector
+from .tracker_manager import TrackerManager, Track
+
+
+_SENSOR_QOS = QoSProfile(
+ reliability=ReliabilityPolicy.BEST_EFFORT,
+ history=HistoryPolicy.KEEP_LAST,
+ depth=5,
+)
+
+
+def _yaw_quat(yaw: float) -> Quaternion:
+ q = Quaternion()
+ q.w = math.cos(yaw * 0.5)
+ q.z = math.sin(yaw * 0.5)
+ return q
+
+
+class DynamicObsNode(Node):
+
+ def __init__(self):
+ super().__init__('dynamic_obs_tracker')
+
+ # ── Parameters ──────────────────────────────────────────────────────
+ self.declare_parameter('max_tracks', 20)
+ self.declare_parameter('confirm_frames', 3)
+ self.declare_parameter('max_missed_frames', 6)
+ self.declare_parameter('assoc_dist_m', 1.5)
+ self.declare_parameter('prediction_hz', 10.0)
+ self.declare_parameter('horizon_s', 2.5)
+ self.declare_parameter('pred_step_s', 0.5)
+ self.declare_parameter('odom_frame', 'odom')
+ self.declare_parameter('min_speed_mps', 0.05)
+ self.declare_parameter('max_range_m', 8.0)
+
+ max_tracks = self.get_parameter('max_tracks').value
+ confirm_f = self.get_parameter('confirm_frames').value
+ max_missed = self.get_parameter('max_missed_frames').value
+ assoc_dist = self.get_parameter('assoc_dist_m').value
+ pred_hz = self.get_parameter('prediction_hz').value
+ horizon_s = self.get_parameter('horizon_s').value
+ pred_step = self.get_parameter('pred_step_s').value
+ self._frame = self.get_parameter('odom_frame').value
+ self._min_spd = self.get_parameter('min_speed_mps').value
+ self._max_rng = self.get_parameter('max_range_m').value
+
+ # ── Core modules ────────────────────────────────────────────────────
+ self._detector = ObjectDetector(
+ grid_radius_m=min(self._max_rng + 2.0, 12.0),
+ max_cluster=int((self._max_rng / 0.1) ** 2 * 0.5),
+ )
+ self._tracker = TrackerManager(
+ max_tracks=max_tracks,
+ confirm_frames=confirm_f,
+ max_missed=max_missed,
+ assoc_dist_m=assoc_dist,
+ horizon_s=horizon_s,
+ pred_step_s=pred_step,
+ )
+ self._horizon_s = horizon_s
+ self._pred_step = pred_step
+
+ # ── State ────────────────────────────────────────────────────────────
+ self._latest_scan: LaserScan | None = None
+ self._last_track_t: float = time.monotonic()
+ self._scan_processed_stamp: float | None = None
+
+ # ── Subscriptions ────────────────────────────────────────────────────
+ self.create_subscription(LaserScan, '/scan', self._on_scan, _SENSOR_QOS)
+
+ # ── Publishers ───────────────────────────────────────────────────────
+ if _MSGS_AVAILABLE:
+ self._obj_pub = self.create_publisher(
+ MovingObjectArray, '/saltybot/moving_objects', 10
+ )
+ else:
+ self._obj_pub = None
+ self.get_logger().warning(
+ '[dyn_obs] saltybot_dynamic_obs_msgs not built — '
+ 'MovingObjectArray will not be published'
+ )
+
+ self._viz_pub = self.create_publisher(
+ MarkerArray, '/saltybot/moving_objects_viz', 10
+ )
+
+ # ── Timer ────────────────────────────────────────────────────────────
+ self.create_timer(1.0 / pred_hz, self._track_tick)
+
+ self.get_logger().info(
+ f'dynamic_obs_tracker ready — '
+ f'max_tracks={max_tracks} horizon={horizon_s}s assoc={assoc_dist}m'
+ )
+
+ # ── Scan callback ─────────────────────────────────────────────────────────
+
+ def _on_scan(self, msg: LaserScan) -> None:
+ self._latest_scan = msg
+
+ # ── 10 Hz tracker tick ────────────────────────────────────────────────────
+
+ def _track_tick(self) -> None:
+ t0 = time.monotonic()
+
+ now_mono = t0
+ dt = now_mono - self._last_track_t
+ dt = max(1e-3, min(dt, 0.5))
+ self._last_track_t = now_mono
+
+ scan = self._latest_scan
+ detections = []
+
+ if scan is not None:
+ stamp_sec = scan.header.stamp.sec + scan.header.stamp.nanosec * 1e-9
+ if stamp_sec != self._scan_processed_stamp:
+ self._scan_processed_stamp = stamp_sec
+ ranges = np.asarray(scan.ranges, dtype=np.float32)
+ detections = self._detector.update(
+ ranges,
+ scan.angle_min,
+ scan.angle_increment,
+ min(scan.range_max, self._max_rng),
+ )
+
+ confirmed = self._tracker.update(detections, dt)
+
+ latency_ms = (time.monotonic() - t0) * 1000.0
+ stamp = self.get_clock().now().to_msg()
+
+ if _MSGS_AVAILABLE and self._obj_pub is not None:
+ self._publish_objects(confirmed, stamp, latency_ms)
+ self._publish_viz(confirmed, stamp)
+
+ # ── Publish helpers ───────────────────────────────────────────────────────
+
+ def _publish_objects(self, confirmed: list, stamp, latency_ms: float) -> None:
+ arr = MovingObjectArray()
+ arr.header.stamp = stamp
+ arr.header.frame_id = self._frame
+ arr.active_count = len(confirmed)
+ arr.tentative_count = self._tracker.tentative_count
+ arr.detector_latency_ms = float(latency_ms)
+
+ for tr in confirmed:
+ px, py = tr.kalman.position
+ vx, vy = tr.kalman.velocity
+ speed = tr.kalman.speed
+ if speed < self._min_spd:
+ continue
+
+ obj = TrackedObject()
+ obj.header = arr.header
+ obj.object_id = tr.track_id
+ obj.pose.pose.position.x = px
+ obj.pose.pose.position.y = py
+ obj.pose.pose.orientation = _yaw_quat(math.atan2(vy, vx))
+ cov = tr.kalman.covariance_2x2
+ obj.pose.covariance[0] = float(cov[0, 0])
+ obj.pose.covariance[1] = float(cov[0, 1])
+ obj.pose.covariance[6] = float(cov[1, 0])
+ obj.pose.covariance[7] = float(cov[1, 1])
+ obj.velocity.x = vx
+ obj.velocity.y = vy
+ obj.speed_mps = speed
+ obj.confidence = min(1.0, tr.hits / (self._tracker._confirm_frames * 3))
+ obj.age_frames = tr.age
+ obj.hits = tr.hits
+ obj.is_valid = True
+
+ # Predicted path
+ for px_f, py_f, t_f in tr.kalman.predict_horizon(
+ self._horizon_s, self._pred_step
+ ):
+ p = Pose()
+ p.position.x = px_f
+ p.position.y = py_f
+ p.orientation.w = 1.0
+ obj.predicted_path.append(p)
+ obj.predicted_times.append(float(t_f))
+
+ arr.objects.append(obj)
+
+ self._obj_pub.publish(arr)
+
+ def _publish_viz(self, confirmed: list, stamp) -> None:
+ markers = MarkerArray()
+
+ # Delete old markers
+ del_marker = Marker()
+ del_marker.header.stamp = stamp
+ del_marker.header.frame_id = self._frame
+ del_marker.action = Marker.DELETEALL
+ markers.markers.append(del_marker)
+
+ for tr in confirmed:
+ px, py = tr.kalman.position
+ vx, vy = tr.kalman.velocity
+ speed = tr.kalman.speed
+ if speed < self._min_spd:
+ continue
+
+ # Cylinder at current position
+ m = Marker()
+ m.header.stamp = stamp
+ m.header.frame_id = self._frame
+ m.ns = 'dyn_obs'
+ m.id = tr.track_id
+ m.type = Marker.CYLINDER
+ m.action = Marker.ADD
+ m.pose.position.x = px
+ m.pose.position.y = py
+ m.pose.position.z = 0.5
+ m.pose.orientation.w = 1.0
+ m.scale.x = 0.4
+ m.scale.y = 0.4
+ m.scale.z = 1.0
+ m.color = ColorRGBA(r=1.0, g=0.2, b=0.0, a=0.7)
+ m.lifetime.sec = 1
+ markers.markers.append(m)
+
+ # Arrow for velocity
+ vel_m = Marker()
+ vel_m.header = m.header
+ vel_m.ns = 'dyn_obs_vel'
+ vel_m.id = tr.track_id
+ vel_m.type = Marker.ARROW
+ vel_m.action = Marker.ADD
+ from geometry_msgs.msg import Point as GPoint
+ p_start = GPoint(x=px, y=py, z=1.0)
+ p_end = GPoint(x=px + vx, y=py + vy, z=1.0)
+ vel_m.points = [p_start, p_end]
+ vel_m.scale.x = 0.05
+ vel_m.scale.y = 0.10
+ vel_m.color = ColorRGBA(r=1.0, g=1.0, b=0.0, a=0.9)
+ vel_m.lifetime.sec = 1
+ markers.markers.append(vel_m)
+
+ # Line strip for predicted path
+ path_m = Marker()
+ path_m.header = m.header
+ path_m.ns = 'dyn_obs_path'
+ path_m.id = tr.track_id
+ path_m.type = Marker.LINE_STRIP
+ path_m.action = Marker.ADD
+ path_m.scale.x = 0.04
+ path_m.color = ColorRGBA(r=1.0, g=0.5, b=0.0, a=0.5)
+ path_m.lifetime.sec = 1
+ path_m.points.append(GPoint(x=px, y=py, z=0.5))
+ for fx, fy, _ in tr.kalman.predict_horizon(self._horizon_s, self._pred_step):
+ path_m.points.append(GPoint(x=fx, y=fy, z=0.5))
+ markers.markers.append(path_m)
+
+ self._viz_pub.publish(markers)
+
+
+def main(args=None):
+ rclpy.init(args=args)
+ node = DynamicObsNode()
+ try:
+ rclpy.spin(node)
+ finally:
+ node.destroy_node()
+ rclpy.shutdown()
+
+
+if __name__ == '__main__':
+ main()
diff --git a/jetson/ros2_ws/src/saltybot_dynamic_obstacles/saltybot_dynamic_obstacles/kalman_tracker.py b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/saltybot_dynamic_obstacles/kalman_tracker.py
new file mode 100644
index 0000000..3c9d9a7
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/saltybot_dynamic_obstacles/kalman_tracker.py
@@ -0,0 +1,132 @@
+"""
+kalman_tracker.py — Single-object Kalman filter for 2-D ground-plane tracking.
+
+State vector: x = [px, py, vx, vy] (position + velocity)
+Motion model: constant-velocity (CV) with process noise on acceleration
+
+Predict step:
+ F = | 1 0 dt 0 | x_{k|k-1} = F @ x_{k-1|k-1}
+ | 0 1 0 dt | P_{k|k-1} = F @ P @ F^T + Q
+ | 0 0 1 0 |
+ | 0 0 0 1 |
+
+Update step (position observation only):
+ H = | 1 0 0 0 | y = z - H @ x
+ | 0 1 0 0 | S = H @ P @ H^T + R
+ K = P @ H^T @ inv(S)
+ x = x + K @ y
+ P = (I - K @ H) @ P (Joseph form for stability)
+
+Trajectory prediction: unrolls the CV model forward at fixed time steps.
+"""
+
+from __future__ import annotations
+
+from typing import List, Tuple
+
+import numpy as np
+
+
+# ── Default noise matrices ────────────────────────────────────────────────────
+# Process noise: models uncertainty in acceleration between frames
+_Q_BASE = np.diag([0.02, 0.02, 0.8, 0.8]).astype(np.float64)
+
+# Measurement noise: LIDAR centroid uncertainty (~0.15 m std)
+_R = np.diag([0.025, 0.025]).astype(np.float64) # 0.16 m sigma each axis
+
+# Observation matrix
+_H = np.array([[1, 0, 0, 0],
+ [0, 1, 0, 0]], dtype=np.float64)
+_I4 = np.eye(4, dtype=np.float64)
+
+
+class KalmanTracker:
+ """
+ Kalman filter tracking one object.
+
+ Parameters
+ ----------
+ x0, y0 : initial position (metres, odom frame)
+ process_noise : scalar multiplier on _Q_BASE
+ """
+
+ def __init__(self, x0: float, y0: float, process_noise: float = 1.0):
+ self._x = np.array([x0, y0, 0.0, 0.0], dtype=np.float64)
+ self._P = np.eye(4, dtype=np.float64) * 0.5
+ self._Q = _Q_BASE * process_noise
+
+ # ── Core filter ───────────────────────────────────────────────────────────
+
+ def predict(self, dt: float) -> None:
+ """Propagate state by dt seconds."""
+ F = np.array([
+ [1, 0, dt, 0],
+ [0, 1, 0, dt],
+ [0, 0, 1, 0],
+ [0, 0, 0, 1],
+ ], dtype=np.float64)
+ self._x = F @ self._x
+ self._P = F @ self._P @ F.T + self._Q
+
+ def update(self, z: np.ndarray) -> None:
+ """
+ Incorporate a position measurement z = [x, y].
+ Uses Joseph-form covariance update for numerical stability.
+ """
+ y = z.astype(np.float64) - _H @ self._x
+ S = _H @ self._P @ _H.T + _R
+ K = self._P @ _H.T @ np.linalg.inv(S)
+ self._x = self._x + K @ y
+ IKH = _I4 - K @ _H
+ # Joseph form: (I-KH) P (I-KH)^T + K R K^T
+ self._P = IKH @ self._P @ IKH.T + K @ _R @ K.T
+
+ # ── Prediction horizon ────────────────────────────────────────────────────
+
+ def predict_horizon(
+ self,
+ horizon_s: float = 2.5,
+ step_s: float = 0.5,
+ ) -> List[Tuple[float, float, float]]:
+ """
+ Return [(x, y, t), ...] at equally-spaced future times.
+
+ Does NOT modify internal filter state.
+ """
+ predictions: List[Tuple[float, float, float]] = []
+ state = self._x.copy()
+ t = 0.0
+ F_step = np.array([
+ [1, 0, step_s, 0],
+ [0, 1, 0, step_s],
+ [0, 0, 1, 0],
+ [0, 0, 0, 1],
+ ], dtype=np.float64)
+ while t < horizon_s - 1e-6:
+ state = F_step @ state
+ t += step_s
+ predictions.append((float(state[0]), float(state[1]), t))
+ return predictions
+
+ # ── Properties ────────────────────────────────────────────────────────────
+
+ @property
+ def position(self) -> Tuple[float, float]:
+ return float(self._x[0]), float(self._x[1])
+
+ @property
+ def velocity(self) -> Tuple[float, float]:
+ return float(self._x[2]), float(self._x[3])
+
+ @property
+ def speed(self) -> float:
+ return float(np.hypot(self._x[2], self._x[3]))
+
+ @property
+ def covariance_2x2(self) -> np.ndarray:
+ """Position covariance (top-left 2×2 of P)."""
+ return self._P[:2, :2].copy()
+
+ @property
+ def state(self) -> np.ndarray:
+ return self._x.copy()
diff --git a/jetson/ros2_ws/src/saltybot_dynamic_obstacles/saltybot_dynamic_obstacles/object_detector.py b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/saltybot_dynamic_obstacles/object_detector.py
new file mode 100644
index 0000000..be8c3d8
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/saltybot_dynamic_obstacles/object_detector.py
@@ -0,0 +1,168 @@
+"""
+object_detector.py — LIDAR-based moving object detector.
+
+Algorithm:
+ 1. Convert each LaserScan to a 2-D occupancy grid (robot-centred, fixed size).
+ 2. Maintain a background model via exponential moving average (EMA):
+ bg_t = α * current + (1-α) * bg_{t-1} (only for non-moving cells)
+ 3. Foreground = cells whose occupancy significantly exceeds the background.
+ 4. Cluster foreground cells with scipy connected-components → Detection list.
+
+The grid is robot-relative (origin at robot centre) so it naturally tracks
+the robot's motion without needing TF at this stage. The caller is responsible
+for transforming detections into a stable frame (odom) before passing to the
+tracker.
+"""
+
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from typing import List, Optional
+
+import numpy as np
+from scipy import ndimage
+
+
+@dataclass
+class Detection:
+ """A clustered moving foreground blob from one scan."""
+ x: float # centroid x in sensor frame (m)
+ y: float # centroid y in sensor frame (m)
+ size_m2: float # approximate area of the cluster (m²)
+ range_m: float # distance from robot (m)
+
+
+class ObjectDetector:
+ """
+ Detects moving objects in consecutive 2-D LIDAR scans.
+
+ Parameters
+ ----------
+ grid_radius_m : half-size of the occupancy grid (grid covers ±radius)
+ resolution : metres per cell
+ bg_alpha : EMA update rate for background (small = slow forgetting)
+ motion_thr : occupancy delta above background to count as moving
+ min_cluster : minimum cells to keep a cluster
+ max_cluster : maximum cells before a cluster is considered static wall
+ """
+
+ def __init__(
+ self,
+ grid_radius_m: float = 10.0,
+ resolution: float = 0.10,
+ bg_alpha: float = 0.04,
+ motion_thr: float = 0.45,
+ min_cluster: int = 3,
+ max_cluster: int = 200,
+ ):
+ cells = int(2 * grid_radius_m / resolution)
+ self._cells = cells
+ self._res = resolution
+ self._origin = -grid_radius_m # world x/y at grid index 0
+ self._bg_alpha = bg_alpha
+ self._motion_thr = motion_thr
+ self._min_clust = min_cluster
+ self._max_clust = max_cluster
+
+ self._bg = np.zeros((cells, cells), dtype=np.float32)
+ self._initialized = False
+
+ # ── Public API ────────────────────────────────────────────────────────────
+
+ def update(
+ self,
+ ranges: np.ndarray,
+ angle_min: float,
+ angle_increment: float,
+ range_max: float,
+ ) -> List[Detection]:
+ """
+ Process one LaserScan and return detected moving blobs.
+
+ Parameters
+ ----------
+ ranges : 1-D array of range readings (metres)
+ angle_min : angle of first beam (radians)
+ angle_increment : angular step between beams (radians)
+ range_max : maximum valid range (metres)
+ """
+ curr = self._scan_to_grid(ranges, angle_min, angle_increment, range_max)
+
+ if not self._initialized:
+ self._bg = curr.copy()
+ self._initialized = True
+ return []
+
+ # Foreground mask
+ motion_mask = (curr - self._bg) > self._motion_thr
+
+ # Update background only on non-moving cells
+ static = ~motion_mask
+ self._bg[static] = (
+ self._bg[static] * (1.0 - self._bg_alpha)
+ + curr[static] * self._bg_alpha
+ )
+
+ return self._cluster(motion_mask)
+
+ def reset(self) -> None:
+ self._bg[:] = 0.0
+ self._initialized = False
+
+ # ── Internals ─────────────────────────────────────────────────────────────
+
+ def _scan_to_grid(
+ self,
+ ranges: np.ndarray,
+ angle_min: float,
+ angle_increment: float,
+ range_max: float,
+ ) -> np.ndarray:
+ grid = np.zeros((self._cells, self._cells), dtype=np.float32)
+ n = len(ranges)
+ angles = angle_min + np.arange(n) * angle_increment
+ r = np.asarray(ranges, dtype=np.float32)
+
+ valid = (r > 0.05) & (r < range_max) & np.isfinite(r)
+ r, a = r[valid], angles[valid]
+
+ x = r * np.cos(a)
+ y = r * np.sin(a)
+
+ ix = np.clip(
+ ((x - self._origin) / self._res).astype(np.int32), 0, self._cells - 1
+ )
+ iy = np.clip(
+ ((y - self._origin) / self._res).astype(np.int32), 0, self._cells - 1
+ )
+ grid[iy, ix] = 1.0
+ return grid
+
+ def _cluster(self, mask: np.ndarray) -> List[Detection]:
+ # Dilate slightly to connect nearby hit cells into one blob
+ struct = ndimage.generate_binary_structure(2, 2)
+ dilated = ndimage.binary_dilation(mask, structure=struct, iterations=1)
+
+ labeled, n_labels = ndimage.label(dilated)
+ detections: List[Detection] = []
+
+ for label_id in range(1, n_labels + 1):
+ coords = np.argwhere(labeled == label_id)
+ n_cells = len(coords)
+ if n_cells < self._min_clust or n_cells > self._max_clust:
+ continue
+
+ ys, xs = coords[:, 0], coords[:, 1]
+ cx_grid = float(np.mean(xs))
+ cy_grid = float(np.mean(ys))
+ cx = cx_grid * self._res + self._origin
+ cy = cy_grid * self._res + self._origin
+
+ detections.append(Detection(
+ x=cx,
+ y=cy,
+ size_m2=n_cells * self._res ** 2,
+ range_m=float(np.hypot(cx, cy)),
+ ))
+
+ return detections
diff --git a/jetson/ros2_ws/src/saltybot_dynamic_obstacles/saltybot_dynamic_obstacles/tracker_manager.py b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/saltybot_dynamic_obstacles/tracker_manager.py
new file mode 100644
index 0000000..5862c94
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/saltybot_dynamic_obstacles/tracker_manager.py
@@ -0,0 +1,206 @@
+"""
+tracker_manager.py — Multi-object tracker with Hungarian data association.
+
+Track lifecycle:
+ TENTATIVE → confirmed after `confirm_frames` consecutive hits
+ CONFIRMED → normal tracked state
+ LOST → missed for 1..max_missed frames (still predicts, not published)
+ DEAD → missed > max_missed → removed
+
+Association:
+ Uses scipy.optimize.linear_sum_assignment (Hungarian algorithm) on a cost
+ matrix of Euclidean distances between predicted track positions and new
+ detections. Assignments with cost > assoc_dist_m are rejected.
+
+Up to `max_tracks` simultaneous live tracks (tentative + confirmed).
+"""
+
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from enum import IntEnum
+from typing import Dict, List, Optional, Tuple
+
+import numpy as np
+from scipy.optimize import linear_sum_assignment
+
+from .kalman_tracker import KalmanTracker
+from .object_detector import Detection
+
+
+class TrackState(IntEnum):
+ TENTATIVE = 0
+ CONFIRMED = 1
+ LOST = 2
+
+
+@dataclass
+class Track:
+ track_id: int
+ kalman: KalmanTracker
+ state: TrackState = TrackState.TENTATIVE
+ hits: int = 1
+ age: int = 1 # frames since creation
+ missed: int = 0 # consecutive missed frames
+
+
+class TrackerManager:
+ """
+ Manages a pool of Kalman tracks.
+
+ Parameters
+ ----------
+ max_tracks : hard cap on simultaneously alive tracks
+ confirm_frames : hits needed before a track is CONFIRMED
+ max_missed : consecutive missed frames before a track is DEAD
+ assoc_dist_m : max allowed distance (m) for a valid assignment
+ horizon_s : prediction horizon for trajectory output (seconds)
+ pred_step_s : time step between predicted waypoints
+ process_noise : KalmanTracker process-noise multiplier
+ """
+
+ def __init__(
+ self,
+ max_tracks: int = 20,
+ confirm_frames: int = 3,
+ max_missed: int = 6,
+ assoc_dist_m: float = 1.5,
+ horizon_s: float = 2.5,
+ pred_step_s: float = 0.5,
+ process_noise: float = 1.0,
+ ):
+ self._max_tracks = max_tracks
+ self._confirm_frames = confirm_frames
+ self._max_missed = max_missed
+ self._assoc_dist = assoc_dist_m
+ self._horizon_s = horizon_s
+ self._pred_step = pred_step_s
+ self._proc_noise = process_noise
+
+ self._tracks: Dict[int, Track] = {}
+ self._next_id: int = 1
+
+ # ── Public API ────────────────────────────────────────────────────────────
+
+ def update(self, detections: List[Detection], dt: float) -> List[Track]:
+ """
+ Process one frame of detections.
+
+ 1. Predict all tracks by dt.
+ 2. Hungarian assignment of predictions → detections.
+ 3. Update matched tracks; mark unmatched tracks as LOST.
+ 4. Promote tracks crossing `confirm_frames`.
+ 5. Create new tracks for unmatched detections (if room).
+ 6. Remove DEAD tracks.
+
+ Returns confirmed tracks only.
+ """
+ # 1. Predict
+ for tr in self._tracks.values():
+ tr.kalman.predict(dt)
+ tr.age += 1
+
+ # 2. Assign
+ matched, unmatched_tracks, unmatched_dets = self._assign(detections)
+
+ # 3a. Update matched
+ for tid, did in matched:
+ tr = self._tracks[tid]
+ det = detections[did]
+ tr.kalman.update(np.array([det.x, det.y]))
+ tr.hits += 1
+ tr.missed = 0
+ if tr.state == TrackState.LOST:
+ tr.state = TrackState.CONFIRMED
+ elif tr.state == TrackState.TENTATIVE and tr.hits >= self._confirm_frames:
+ tr.state = TrackState.CONFIRMED
+
+ # 3b. Unmatched tracks
+ for tid in unmatched_tracks:
+ tr = self._tracks[tid]
+ tr.missed += 1
+ if tr.missed > 1:
+ tr.state = TrackState.LOST
+
+ # 4. New tracks for unmatched detections
+ live = sum(1 for t in self._tracks.values() if t.state != TrackState.LOST
+ or t.missed <= self._max_missed)
+ for did in unmatched_dets:
+ if live >= self._max_tracks:
+ break
+ det = detections[did]
+ init_state = (TrackState.CONFIRMED
+ if self._confirm_frames <= 1
+ else TrackState.TENTATIVE)
+ new_tr = Track(
+ track_id=self._next_id,
+ kalman=KalmanTracker(det.x, det.y, self._proc_noise),
+ state=init_state,
+ )
+ self._tracks[self._next_id] = new_tr
+ self._next_id += 1
+ live += 1
+
+ # 5. Prune dead
+ dead = [tid for tid, t in self._tracks.items() if t.missed > self._max_missed]
+ for tid in dead:
+ del self._tracks[tid]
+
+ return [t for t in self._tracks.values() if t.state == TrackState.CONFIRMED]
+
+ @property
+ def all_tracks(self) -> List[Track]:
+ return list(self._tracks.values())
+
+ @property
+ def tentative_count(self) -> int:
+ return sum(1 for t in self._tracks.values()
+ if t.state == TrackState.TENTATIVE)
+
+ def reset(self) -> None:
+ self._tracks.clear()
+ self._next_id = 1
+
+ # ── Hungarian assignment ──────────────────────────────────────────────────
+
+ def _assign(
+ self,
+ detections: List[Detection],
+ ) -> Tuple[List[Tuple[int, int]], List[int], List[int]]:
+ """
+ Returns:
+ matched — list of (track_id, det_index)
+ unmatched_tids — track IDs with no detection assigned
+ unmatched_dids — detection indices with no track assigned
+ """
+ track_ids = list(self._tracks.keys())
+
+ if not track_ids or not detections:
+ return [], track_ids, list(range(len(detections)))
+
+ # Build cost matrix: rows=tracks, cols=detections
+ cost = np.full((len(track_ids), len(detections)), fill_value=np.inf)
+ for r, tid in enumerate(track_ids):
+ tx, ty = self._tracks[tid].kalman.position
+ for c, det in enumerate(detections):
+ cost[r, c] = np.hypot(tx - det.x, ty - det.y)
+
+ row_ind, col_ind = linear_sum_assignment(cost)
+
+ matched: List[Tuple[int, int]] = []
+ matched_track_idx: set = set()
+ matched_det_idx: set = set()
+
+ for r, c in zip(row_ind, col_ind):
+ if cost[r, c] > self._assoc_dist:
+ continue
+ matched.append((track_ids[r], c))
+ matched_track_idx.add(r)
+ matched_det_idx.add(c)
+
+ unmatched_tids = [track_ids[r] for r in range(len(track_ids))
+ if r not in matched_track_idx]
+ unmatched_dids = [c for c in range(len(detections))
+ if c not in matched_det_idx]
+
+ return matched, unmatched_tids, unmatched_dids
diff --git a/jetson/ros2_ws/src/saltybot_dynamic_obstacles/setup.cfg b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/setup.cfg
new file mode 100644
index 0000000..eb006bc
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/setup.cfg
@@ -0,0 +1,4 @@
+[develop]
+script_dir=$base/lib/saltybot_dynamic_obstacles
+[install]
+install_scripts=$base/lib/saltybot_dynamic_obstacles
diff --git a/jetson/ros2_ws/src/saltybot_dynamic_obstacles/setup.py b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/setup.py
new file mode 100644
index 0000000..62bae03
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/setup.py
@@ -0,0 +1,32 @@
+from setuptools import setup, find_packages
+from glob import glob
+
+package_name = 'saltybot_dynamic_obstacles'
+
+setup(
+ name=package_name,
+ version='0.1.0',
+ packages=find_packages(exclude=['test']),
+ data_files=[
+ ('share/ament_index/resource_index/packages',
+ ['resource/' + package_name]),
+ ('share/' + package_name, ['package.xml']),
+ ('share/' + package_name + '/launch',
+ glob('launch/*.launch.py')),
+ ('share/' + package_name + '/config',
+ glob('config/*.yaml')),
+ ],
+ install_requires=['setuptools'],
+ zip_safe=True,
+ maintainer='SaltyLab',
+ maintainer_email='robot@saltylab.local',
+ description='Dynamic obstacle tracking: LIDAR motion detection, Kalman tracking, Nav2 costmap',
+ license='MIT',
+ tests_require=['pytest'],
+ entry_points={
+ 'console_scripts': [
+ 'dynamic_obs_tracker = saltybot_dynamic_obstacles.dynamic_obs_node:main',
+ 'dynamic_obs_costmap = saltybot_dynamic_obstacles.costmap_layer_node:main',
+ ],
+ },
+)
diff --git a/jetson/ros2_ws/src/saltybot_dynamic_obstacles/test/test_dynamic_obstacles.py b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/test/test_dynamic_obstacles.py
new file mode 100644
index 0000000..6b87c79
--- /dev/null
+++ b/jetson/ros2_ws/src/saltybot_dynamic_obstacles/test/test_dynamic_obstacles.py
@@ -0,0 +1,262 @@
+"""
+test_dynamic_obstacles.py — Unit tests for KalmanTracker, TrackerManager,
+and ObjectDetector.
+
+Runs without ROS2 / hardware (no rclpy imports).
+"""
+
+from __future__ import annotations
+
+import math
+import sys
+import os
+
+import numpy as np
+import pytest
+
+sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
+
+from saltybot_dynamic_obstacles.kalman_tracker import KalmanTracker
+from saltybot_dynamic_obstacles.tracker_manager import TrackerManager, TrackState
+from saltybot_dynamic_obstacles.object_detector import ObjectDetector, Detection
+
+
+# ── KalmanTracker ─────────────────────────────────────────────────────────────
+
+class TestKalmanTracker:
+
+ def test_initial_position(self):
+ kt = KalmanTracker(3.0, 4.0)
+ px, py = kt.position
+ assert px == pytest.approx(3.0)
+ assert py == pytest.approx(4.0)
+
+ def test_initial_velocity_zero(self):
+ kt = KalmanTracker(0.0, 0.0)
+ vx, vy = kt.velocity
+ assert vx == pytest.approx(0.0)
+ assert vy == pytest.approx(0.0)
+
+ def test_predict_moves_position(self):
+ kt = KalmanTracker(0.0, 0.0)
+ # Give it some velocity via update sequence
+ kt.update(np.array([0.1, 0.0]))
+ kt.update(np.array([0.2, 0.0]))
+ kt.predict(0.1)
+ px, _ = kt.position
+ assert px > 0.0 # should have moved forward
+
+ def test_pure_predict_constant_velocity(self):
+ """After velocity is established, predict() should move linearly."""
+ kt = KalmanTracker(0.0, 0.0)
+ # Force velocity by repeated updates
+ for i in range(10):
+ kt.update(np.array([i * 0.1, 0.0]))
+ kt.predict(0.1)
+ vx, _ = kt.velocity
+ px0, _ = kt.position
+ kt.predict(1.0)
+ px1, _ = kt.position
+ # Should advance roughly vx * 1.0 metres
+ assert px1 == pytest.approx(px0 + vx * 1.0, abs=0.3)
+
+ def test_update_corrects_position(self):
+ kt = KalmanTracker(0.0, 0.0)
+ # Predict way off
+ kt.predict(10.0)
+ # Then update to ground truth
+ kt.update(np.array([1.0, 2.0]))
+ px, py = kt.position
+ # Should move toward (1, 2)
+ assert px == pytest.approx(1.0, abs=0.5)
+ assert py == pytest.approx(2.0, abs=0.5)
+
+ def test_predict_horizon_length(self):
+ kt = KalmanTracker(0.0, 0.0)
+ preds = kt.predict_horizon(horizon_s=2.5, step_s=0.5)
+ assert len(preds) == 5 # 0.5, 1.0, 1.5, 2.0, 2.5
+
+ def test_predict_horizon_times(self):
+ kt = KalmanTracker(0.0, 0.0)
+ preds = kt.predict_horizon(horizon_s=2.0, step_s=0.5)
+ times = [t for _, _, t in preds]
+ assert times == pytest.approx([0.5, 1.0, 1.5, 2.0], abs=0.01)
+
+ def test_predict_horizon_does_not_mutate_state(self):
+ kt = KalmanTracker(1.0, 2.0)
+ kt.predict_horizon(horizon_s=3.0, step_s=0.5)
+ px, py = kt.position
+ assert px == pytest.approx(1.0)
+ assert py == pytest.approx(2.0)
+
+ def test_speed_zero_at_init(self):
+ kt = KalmanTracker(5.0, 5.0)
+ assert kt.speed == pytest.approx(0.0)
+
+ def test_covariance_shape(self):
+ kt = KalmanTracker(0.0, 0.0)
+ cov = kt.covariance_2x2
+ assert cov.shape == (2, 2)
+
+ def test_covariance_positive_definite(self):
+ kt = KalmanTracker(0.0, 0.0)
+ for _ in range(5):
+ kt.predict(0.1)
+ kt.update(np.array([0.1, 0.0]))
+ eigvals = np.linalg.eigvalsh(kt.covariance_2x2)
+ assert np.all(eigvals > 0)
+
+ def test_joseph_form_stays_symmetric(self):
+ """Covariance should remain symmetric after many updates."""
+ kt = KalmanTracker(0.0, 0.0)
+ for i in range(50):
+ kt.predict(0.1)
+ kt.update(np.array([i * 0.01, 0.0]))
+ P = kt._P
+ assert np.allclose(P, P.T, atol=1e-10)
+
+
+# ── TrackerManager ────────────────────────────────────────────────────────────
+
+class TestTrackerManager:
+
+ def _det(self, x, y):
+ return Detection(x=x, y=y, size_m2=0.1, range_m=math.hypot(x, y))
+
+ def test_empty_detections_no_tracks(self):
+ tm = TrackerManager()
+ confirmed = tm.update([], 0.1)
+ assert confirmed == []
+
+ def test_track_created_on_detection(self):
+ tm = TrackerManager(confirm_frames=1)
+ confirmed = tm.update([self._det(1.0, 0.0)], 0.1)
+ assert len(confirmed) == 1
+
+ def test_track_tentative_before_confirm(self):
+ tm = TrackerManager(confirm_frames=3)
+ tm.update([self._det(1.0, 0.0)], 0.1)
+ # Only 1 hit — should still be tentative
+ assert tm.tentative_count == 1
+
+ def test_track_confirmed_after_N_hits(self):
+ tm = TrackerManager(confirm_frames=3, assoc_dist_m=2.0)
+ for _ in range(4):
+ confirmed = tm.update([self._det(1.0, 0.0)], 0.1)
+ assert len(confirmed) == 1
+
+ def test_track_deleted_after_max_missed(self):
+ tm = TrackerManager(confirm_frames=1, max_missed=3, assoc_dist_m=2.0)
+ tm.update([self._det(1.0, 0.0)], 0.1) # create
+ for _ in range(5):
+ tm.update([], 0.1) # no detections → missed++
+ assert len(tm.all_tracks) == 0
+
+ def test_max_tracks_cap(self):
+ tm = TrackerManager(max_tracks=5, confirm_frames=1)
+ dets = [self._det(float(i), 0.0) for i in range(10)]
+ tm.update(dets, 0.1)
+ assert len(tm.all_tracks) <= 5
+
+ def test_consistent_track_id(self):
+ tm = TrackerManager(confirm_frames=3, assoc_dist_m=1.5)
+ for i in range(5):
+ confirmed = tm.update([self._det(1.0 + i * 0.01, 0.0)], 0.1)
+ assert len(confirmed) == 1
+ track_id = confirmed[0].track_id
+ # One more tick — ID should be stable
+ confirmed2 = tm.update([self._det(1.06, 0.0)], 0.1)
+ assert confirmed2[0].track_id == track_id
+
+ def test_two_independent_tracks(self):
+ tm = TrackerManager(confirm_frames=3, assoc_dist_m=0.8)
+ for _ in range(5):
+ confirmed = tm.update([self._det(1.0, 0.0), self._det(5.0, 0.0)], 0.1)
+ assert len(confirmed) == 2
+
+ def test_reset_clears_all(self):
+ tm = TrackerManager(confirm_frames=1)
+ tm.update([self._det(1.0, 0.0)], 0.1)
+ tm.reset()
+ assert len(tm.all_tracks) == 0
+
+ def test_far_detection_not_assigned(self):
+ tm = TrackerManager(confirm_frames=1, assoc_dist_m=0.5)
+ tm.update([self._det(1.0, 0.0)], 0.1) # create track at (1,0)
+ # Detection 3 m away → new track, not update
+ tm.update([self._det(4.0, 0.0)], 0.1)
+ assert len(tm.all_tracks) == 2
+
+
+# ── ObjectDetector ────────────────────────────────────────────────────────────
+
+class TestObjectDetector:
+
+ def _empty_scan(self, n=360, rmax=8.0) -> tuple:
+ """All readings at max range (static background)."""
+ ranges = np.full(n, rmax - 0.1, dtype=np.float32)
+ return ranges, -math.pi, 2 * math.pi / n, rmax
+
+ def _scan_with_blob(self, blob_r=2.0, blob_theta=0.0, n=360, rmax=8.0) -> tuple:
+ """Background scan + a short-range cluster at blob_theta."""
+ ranges = np.full(n, rmax - 0.1, dtype=np.float32)
+ angle_inc = 2 * math.pi / n
+ angle_min = -math.pi
+ # Put a cluster of ~10 beams at blob_r
+ center_idx = int((blob_theta - angle_min) / angle_inc) % n
+ for di in range(-5, 6):
+ idx = (center_idx + di) % n
+ ranges[idx] = blob_r
+ return ranges, angle_min, angle_inc, rmax
+
+ def test_empty_scan_no_detections_after_warmup(self):
+ od = ObjectDetector()
+ r, a_min, a_inc, rmax = self._empty_scan()
+ od.update(r, a_min, a_inc, rmax) # init background
+ for _ in range(3):
+ dets = od.update(r, a_min, a_inc, rmax)
+ assert len(dets) == 0
+
+ def test_moving_blob_detected(self):
+ od = ObjectDetector()
+ bg_r, a_min, a_inc, rmax = self._empty_scan()
+ od.update(bg_r, a_min, a_inc, rmax) # seed background
+ for _ in range(5):
+ od.update(bg_r, a_min, a_inc, rmax)
+ # Now inject a foreground blob
+ fg_r, _, _, _ = self._scan_with_blob(blob_r=2.0, blob_theta=0.0)
+ dets = od.update(fg_r, a_min, a_inc, rmax)
+ assert len(dets) >= 1
+
+ def test_detection_centroid_approximate(self):
+ od = ObjectDetector()
+ bg_r, a_min, a_inc, rmax = self._empty_scan()
+ for _ in range(8):
+ od.update(bg_r, a_min, a_inc, rmax)
+ fg_r, _, _, _ = self._scan_with_blob(blob_r=3.0, blob_theta=0.0)
+ dets = od.update(fg_r, a_min, a_inc, rmax)
+ assert len(dets) >= 1
+ # Blob is at ~3 m along x-axis (theta=0)
+ cx = dets[0].x
+ cy = dets[0].y
+ assert abs(cx - 3.0) < 0.5
+ assert abs(cy) < 0.5
+
+ def test_reset_clears_background(self):
+ od = ObjectDetector()
+ bg_r, a_min, a_inc, rmax = self._empty_scan()
+ for _ in range(5):
+ od.update(bg_r, a_min, a_inc, rmax)
+ od.reset()
+ assert not od._initialized
+
+ def test_no_inf_nan_ranges(self):
+ od = ObjectDetector()
+ r = np.array([np.inf, np.nan, 5.0, -1.0, 0.0] * 72, dtype=np.float32)
+ a_min = -math.pi
+ a_inc = 2 * math.pi / len(r)
+ od.update(r, a_min, a_inc, 8.0) # should not raise
+
+
+if __name__ == '__main__':
+ pytest.main([__file__, '-v'])