feat(fleet): multi-robot SLAM — map sharing + cooperative exploration (Issue #134) #146

Merged
sl-jetson merged 1 commits from sl-perception/issue-134-multi-robot-slam into main 2026-03-02 09:51:10 -05:00
26 changed files with 1948 additions and 0 deletions
Showing only changes of commit b9b866854f - Show all commits

View File

@ -0,0 +1,21 @@
cmake_minimum_required(VERSION 3.8)
project(saltybot_fleet)
find_package(ament_cmake REQUIRED)
find_package(ament_cmake_python REQUIRED)
# Install the Python package (registers console_scripts entry points via setup.py)
ament_python_install_package(${PROJECT_NAME})
install(PROGRAMS
scripts/fleet_status.py
DESTINATION lib/${PROJECT_NAME}
)
install(DIRECTORY
launch
config
DESTINATION share/${PROJECT_NAME}/
)
ament_package()

View File

@ -0,0 +1,49 @@
# saltybot_fleet — default parameters
#
# Override per-robot values at launch time:
# ros2 launch saltybot_fleet fleet_robot.launch.py robot_id:=saltybot_2 robot_namespace:=/saltybot_2
#
# Fleet topic layout:
# /fleet/maps MapChunk — robot → coordinator (compressed grids)
# /fleet/robots RobotInfo — heartbeat / status (all robots)
# /fleet/merged_map OccupancyGrid — coordinator → all (merged map)
# /fleet/exploration_frontiers FrontierArray — coordinator → all (consolidated frontiers)
# /fleet/status String (JSON) — coordinator health
#
# Per-robot topics (namespaced):
# /<robot_ns>/rtabmap/map OccupancyGrid — local RTAB-Map output
# /<robot_ns>/rtabmap/odom Odometry — local odometry
# /<robot_ns>/navigate_to_pose Nav2 action — local navigation
# ── Fleet coordinator ──────────────────────────────────────────────────────────
fleet_manager:
ros__parameters:
heartbeat_timeout_sec: 10.0 # seconds before a silent robot is considered lost
merge_hz: 2.0 # merged map publish rate (Hz)
frontier_timeout_sec: 60.0 # re-assign frontier if robot silent this long
map_resolution: 0.05 # all robots MUST match this (RTAB-Map Grid/CellSize)
# ── Frontier detector ──────────────────────────────────────────────────────────
frontier_detector:
ros__parameters:
detection_hz: 1.0 # frontier detection rate
min_frontier_cells: 10 # minimum frontier cluster size (cells)
# ── Per-robot: map broadcaster ────────────────────────────────────────────────
# Override robot_id / robot_namespace per robot at launch.
map_broadcaster:
ros__parameters:
robot_id: 'saltybot_1'
robot_namespace: '/saltybot_1'
broadcast_hz: 1.0
# ── Per-robot: cooperative explorer ──────────────────────────────────────────
fleet_explorer:
ros__parameters:
robot_id: 'saltybot_1'
robot_namespace: '/saltybot_1'
exploration_hz: 0.5 # state machine tick rate
goal_tolerance_m: 0.5 # distance threshold to declare frontier reached
nav_timeout_sec: 120.0 # cancel Nav2 goal after this long
backoff_sec: 5.0 # wait after stall before requesting new frontier
battery_min_pct: 20.0 # stop exploration below this battery level

View File

@ -0,0 +1,78 @@
"""
fleet_coordinator.launch.py Fleet-wide coordinator nodes (run on ONE robot or dedicated node).
Starts:
fleet_manager robot registry, map merging, frontier task allocator
frontier_detector detect frontiers on the merged map
Typically run on the robot that has the most compute headroom, or on the Orin.
Only one instance should run per fleet. Other robots run fleet_robot.launch.py only.
Launch args:
robot_id str "saltybot_1" (which robot's pose to use for scoring)
map_resolution float "0.05"
heartbeat_timeout float "10.0"
merge_hz float "2.0"
min_frontier_cells int "10"
Example:
ros2 launch saltybot_fleet fleet_coordinator.launch.py robot_id:=saltybot_1
"""
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
robot_id_arg = DeclareLaunchArgument(
'robot_id', default_value='saltybot_1',
)
res_arg = DeclareLaunchArgument(
'map_resolution', default_value='0.05',
)
hb_arg = DeclareLaunchArgument(
'heartbeat_timeout', default_value='10.0',
)
hz_arg = DeclareLaunchArgument(
'merge_hz', default_value='2.0',
)
frontier_arg = DeclareLaunchArgument(
'min_frontier_cells', default_value='10',
)
fleet_manager = Node(
package='saltybot_fleet',
executable='fleet_manager',
name='fleet_manager',
output='screen',
parameters=[{
'heartbeat_timeout_sec': LaunchConfiguration('heartbeat_timeout'),
'merge_hz': LaunchConfiguration('merge_hz'),
'frontier_timeout_sec': 60.0,
'map_resolution': LaunchConfiguration('map_resolution'),
}],
)
frontier_detector = Node(
package='saltybot_fleet',
executable='frontier_detector',
name='frontier_detector',
output='screen',
parameters=[{
'robot_id': LaunchConfiguration('robot_id'),
'detection_hz': 1.0,
'min_frontier_cells': LaunchConfiguration('min_frontier_cells'),
}],
)
return LaunchDescription([
robot_id_arg,
res_arg,
hb_arg,
hz_arg,
frontier_arg,
fleet_manager,
frontier_detector,
])

View File

@ -0,0 +1,86 @@
"""
fleet_robot.launch.py Per-robot fleet nodes (run on every robot in the fleet).
Starts:
map_broadcaster publish this robot's compressed map to /fleet/maps
fleet_explorer cooperative frontier exploration state machine
This launch file is namespace-aware: set robot_id and robot_namespace to
isolate each robot's internal topics while sharing /fleet/* topics.
Launch args:
robot_id str "saltybot_1"
robot_namespace str "/saltybot_1"
battery_topic str "" (optional Float32 battery percentage topic)
exploration bool "true" set false to broadcast map only, no exploration
Example (robot 1):
ros2 launch saltybot_fleet fleet_robot.launch.py robot_id:=saltybot_1 robot_namespace:=/saltybot_1
Example (robot 2):
ros2 launch saltybot_fleet fleet_robot.launch.py robot_id:=saltybot_2 robot_namespace:=/saltybot_2
"""
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.conditions import IfCondition
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
robot_id_arg = DeclareLaunchArgument(
'robot_id', default_value='saltybot_1',
description='Unique robot identifier (no slashes)',
)
ns_arg = DeclareLaunchArgument(
'robot_namespace', default_value='/saltybot_1',
description='ROS2 namespace for this robot',
)
battery_arg = DeclareLaunchArgument(
'battery_topic', default_value='',
description='Optional Float32 battery percentage topic',
)
exploration_arg = DeclareLaunchArgument(
'exploration', default_value='true',
description='Enable cooperative exploration state machine',
)
map_broadcaster = Node(
package='saltybot_fleet',
executable='map_broadcaster',
name='map_broadcaster',
output='screen',
parameters=[{
'robot_id': LaunchConfiguration('robot_id'),
'robot_namespace': LaunchConfiguration('robot_namespace'),
'broadcast_hz': 1.0,
'battery_topic': LaunchConfiguration('battery_topic'),
}],
)
fleet_explorer = Node(
package='saltybot_fleet',
executable='fleet_explorer',
name='fleet_explorer',
output='screen',
condition=IfCondition(LaunchConfiguration('exploration')),
parameters=[{
'robot_id': LaunchConfiguration('robot_id'),
'robot_namespace': LaunchConfiguration('robot_namespace'),
'exploration_hz': 0.5,
'goal_tolerance_m': 0.5,
'nav_timeout_sec': 120.0,
'backoff_sec': 5.0,
'battery_min_pct': 20.0,
}],
)
return LaunchDescription([
robot_id_arg,
ns_arg,
battery_arg,
exploration_arg,
map_broadcaster,
fleet_explorer,
])

View File

@ -0,0 +1,40 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_fleet</name>
<version>0.1.0</version>
<description>
Multi-robot SLAM coordination for saltybot — map sharing over ROS2 DDS multicast,
cooperative frontier-based exploration with task allocation, and RTAB-Map
multi-session map merging between fleet members.
</description>
<maintainer email="seb@vayrette.com">seb</maintainer>
<license>MIT</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<buildtool_depend>ament_cmake_python</buildtool_depend>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>nav_msgs</depend>
<depend>geometry_msgs</depend>
<depend>sensor_msgs</depend>
<depend>tf2_ros</depend>
<depend>tf2_geometry_msgs</depend>
<depend>nav2_msgs</depend>
<depend>action_msgs</depend>
<depend>saltybot_fleet_msgs</depend>
<depend>saltybot_mapping</depend>
<exec_depend>python3-numpy</exec_depend>
<exec_depend>python3-scipy</exec_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>

View File

@ -0,0 +1,300 @@
"""
fleet_explorer_node.py Per-robot cooperative exploration state machine.
This node runs on each robot. It requests frontier assignments from the fleet
coordinator, drives the robot towards the assigned frontier using Nav2 NavigateToPose,
and marks the frontier complete once the robot arrives (or times out).
State machine:
IDLE request a frontier from /fleet/request_frontier
NAVIGATING send NavigateToPose goal; poll for completion
ARRIVED update /fleet/robots status; loop back to IDLE
STALLED robot stuck; wait backoff_sec, then request new frontier
DONE all frontiers exhausted; switch to MAPPING status
Subscribes:
/fleet/robots (RobotInfo) monitor other robots
/<ns>/rtabmap/odom (Odometry) current robot pose
Publishes:
/fleet/robots (RobotInfo) heartbeat with status updates
Actions:
/<ns>/navigate_to_pose (Nav2 NavigateToPose)
Services (client):
/fleet/request_frontier (RequestFrontier)
Parameters:
robot_id str "saltybot_1"
robot_namespace str "/saltybot_1"
exploration_hz float 0.5 (state machine tick rate)
goal_tolerance_m float 0.5 (distance to frontier centroid = arrived)
nav_timeout_sec float 120.0 (cancel goal and stall if Nav2 doesn't finish)
backoff_sec float 5.0 (wait after stall before new frontier request)
battery_min_pct float 20.0 (stop exploration below this return to dock)
"""
import math
import time
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from rclpy.qos import (
QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy
)
from nav_msgs.msg import Odometry
from geometry_msgs.msg import PoseStamped, Point, Quaternion
from nav2_msgs.action import NavigateToPose
from saltybot_fleet_msgs.msg import RobotInfo, Frontier
from saltybot_fleet_msgs.srv import RequestFrontier
class _State:
IDLE = 'IDLE'
NAVIGATING = 'NAVIGATING'
ARRIVED = 'ARRIVED'
STALLED = 'STALLED'
DONE = 'DONE'
class FleetExplorerNode(Node):
def __init__(self):
super().__init__('fleet_explorer')
# ── Parameters ──────────────────────────────────────────────────────
self.declare_parameter('robot_id', 'saltybot_1')
self.declare_parameter('robot_namespace', '/saltybot_1')
self.declare_parameter('exploration_hz', 0.5)
self.declare_parameter('goal_tolerance_m', 0.5)
self.declare_parameter('nav_timeout_sec', 120.0)
self.declare_parameter('backoff_sec', 5.0)
self.declare_parameter('battery_min_pct', 20.0)
self._robot_id = self.get_parameter('robot_id').value
self._ns = self.get_parameter('robot_namespace').value
hz = self.get_parameter('exploration_hz').value
self._goal_tol = self.get_parameter('goal_tolerance_m').value
self._nav_timeout = self.get_parameter('nav_timeout_sec').value
self._backoff = self.get_parameter('backoff_sec').value
self._bat_min = self.get_parameter('battery_min_pct').value
# ── State ────────────────────────────────────────────────────────────
self._state = _State.IDLE
self._current_frontier: Frontier | None = None
self._nav_goal_handle = None
self._nav_start_time: float = 0.0
self._stall_until: float = 0.0
self._robot_x: float = 0.0
self._robot_y: float = 0.0
self._battery_pct: float = -1.0
# ── Subscriptions ────────────────────────────────────────────────────
odom_topic = f'{self._ns}/rtabmap/odom'.replace('//', '/')
self.create_subscription(Odometry, odom_topic, self._on_odom, 10)
self.create_subscription(RobotInfo, '/fleet/robots', self._on_fleet_info, 10)
# ── Publishers ───────────────────────────────────────────────────────
self._info_pub = self.create_publisher(RobotInfo, '/fleet/robots', 10)
# ── Service clients ──────────────────────────────────────────────────
self._frontier_client = self.create_client(
RequestFrontier, '/fleet/request_frontier'
)
# ── Nav2 action client ────────────────────────────────────────────────
nav_action = f'{self._ns}/navigate_to_pose'.replace('//', '/')
self._nav_client = ActionClient(self, NavigateToPose, nav_action)
# ── Timer ────────────────────────────────────────────────────────────
self.create_timer(1.0 / hz, self._explore_tick)
self.get_logger().info(
f'[{self._robot_id}] fleet_explorer ready (battery_min={self._bat_min}%)'
)
# ── Callbacks ────────────────────────────────────────────────────────────
def _on_odom(self, msg: Odometry) -> None:
p = msg.pose.pose.position
self._robot_x = p.x
self._robot_y = p.y
def _on_fleet_info(self, msg: RobotInfo) -> None:
# Track own battery from heartbeats of other nodes (e.g. battery_manager)
if msg.robot_id == self._robot_id:
self._battery_pct = msg.battery_pct
# ── State machine ─────────────────────────────────────────────────────────
def _explore_tick(self) -> None:
now = time.monotonic()
# Low battery — abort
if self._battery_pct > 0 and self._battery_pct < self._bat_min:
if self._state != _State.DONE:
self.get_logger().warning(
f'[{self._robot_id}] battery {self._battery_pct:.0f}% < '
f'{self._bat_min:.0f}% — stopping exploration'
)
self._cancel_nav()
self._state = _State.DONE
self._publish_info(RobotInfo.STATUS_RETURNING)
return
if self._state == _State.IDLE:
self._request_frontier()
elif self._state == _State.NAVIGATING:
self._check_navigation(now)
elif self._state == _State.ARRIVED:
self._on_arrived()
elif self._state == _State.STALLED:
if now >= self._stall_until:
self.get_logger().info(f'[{self._robot_id}] backoff over — requesting new frontier')
self._state = _State.IDLE
elif self._state == _State.DONE:
self._publish_info(RobotInfo.STATUS_IDLE)
def _request_frontier(self) -> None:
if not self._frontier_client.wait_for_service(timeout_sec=2.0):
self.get_logger().warning(
f'[{self._robot_id}] /fleet/request_frontier unavailable — retrying'
)
return
req = RequestFrontier.Request()
req.robot_id = self._robot_id
req.robot_pose_xy = Point(x=self._robot_x, y=self._robot_y, z=0.0)
future = self._frontier_client.call_async(req)
future.add_done_callback(self._on_frontier_response)
def _on_frontier_response(self, future) -> None:
try:
resp = future.result()
except Exception as exc:
self.get_logger().error(f'[{self._robot_id}] frontier request failed: {exc}')
return
if not resp.assigned:
self.get_logger().info(f'[{self._robot_id}] {resp.message} — exploration complete')
self._state = _State.DONE
return
self._current_frontier = resp.frontier
self.get_logger().info(
f'[{self._robot_id}] assigned frontier {resp.frontier.frontier_id} '
f'at ({resp.frontier.centroid.x:.2f}, {resp.frontier.centroid.y:.2f})'
)
self._send_nav_goal(resp.frontier)
def _send_nav_goal(self, frontier: Frontier) -> None:
if not self._nav_client.wait_for_server(timeout_sec=5.0):
self.get_logger().error(
f'[{self._robot_id}] Nav2 action server not available'
)
self._stall(reason='nav_server_missing')
return
goal = NavigateToPose.Goal()
goal.pose.header.frame_id = 'fleet_map'
goal.pose.header.stamp = self.get_clock().now().to_msg()
goal.pose.pose.position.x = frontier.centroid.x
goal.pose.pose.position.y = frontier.centroid.y
goal.pose.pose.orientation.w = 1.0
send_fut = self._nav_client.send_goal_async(goal)
send_fut.add_done_callback(self._on_nav_goal_accepted)
self._nav_start_time = time.monotonic()
self._state = _State.NAVIGATING
self._publish_info(RobotInfo.STATUS_EXPLORING)
def _on_nav_goal_accepted(self, future) -> None:
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().warning(f'[{self._robot_id}] Nav2 rejected goal')
self._stall(reason='goal_rejected')
return
self._nav_goal_handle = goal_handle
result_fut = goal_handle.get_result_async()
result_fut.add_done_callback(self._on_nav_result)
def _on_nav_result(self, future) -> None:
try:
result = future.result()
self.get_logger().info(
f'[{self._robot_id}] Nav2 completed frontier '
f'{self._current_frontier.frontier_id if self._current_frontier else "?"}'
)
self._state = _State.ARRIVED
except Exception as exc:
self.get_logger().warning(f'[{self._robot_id}] Nav2 result error: {exc}')
self._stall(reason='nav_error')
def _check_navigation(self, now: float) -> None:
# ── Timeout guard ────────────────────────────────────────────────────
if now - self._nav_start_time > self._nav_timeout:
self.get_logger().warning(
f'[{self._robot_id}] navigation timeout after {self._nav_timeout:.0f}s'
)
self._cancel_nav()
self._stall(reason='timeout')
return
# ── Arrival check (proximity) ────────────────────────────────────────
if self._current_frontier is not None:
dist = math.hypot(
self._current_frontier.centroid.x - self._robot_x,
self._current_frontier.centroid.y - self._robot_y,
)
if dist < self._goal_tol:
self._state = _State.ARRIVED
def _on_arrived(self) -> None:
fid = self._current_frontier.frontier_id if self._current_frontier else '?'
self.get_logger().info(f'[{self._robot_id}] arrived at frontier {fid}')
self._current_frontier = None
self._nav_goal_handle = None
self._state = _State.IDLE
self._publish_info(RobotInfo.STATUS_EXPLORING)
def _stall(self, reason: str = '') -> None:
self.get_logger().warning(f'[{self._robot_id}] stalled ({reason}), backing off {self._backoff}s')
self._state = _State.STALLED
self._stall_until = time.monotonic() + self._backoff
self._current_frontier = None
def _cancel_nav(self) -> None:
if self._nav_goal_handle is not None:
self._nav_goal_handle.cancel_goal_async()
self._nav_goal_handle = None
def _publish_info(self, status: int) -> None:
info = RobotInfo()
info.stamp = self.get_clock().now().to_msg()
info.robot_id = self._robot_id
info.namespace = self._ns
info.status = status
info.battery_pct = self._battery_pct
info.active_frontier = (
self._current_frontier.frontier_id
if self._current_frontier else ''
)
self._info_pub.publish(info)
def main(args=None):
rclpy.init(args=args)
node = FleetExplorerNode()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,302 @@
"""
fleet_manager_node.py Central coordinator for multi-robot fleet SLAM.
Run ONE instance on the "lead" robot (or a dedicated compute node).
Responsibilities:
1. Robot registry track active peers, detect departures via heartbeat timeout.
2. Map merging decompress MapChunks from all robots merged OccupancyGrid.
3. Transform alignment detect shared landmarks and compute map_to_fleet_tf
for each robot to align their local map frames.
4. Frontier aggregation collect FrontierArrays from all robots; de-duplicate;
publish consolidated /fleet/exploration_frontiers.
5. Frontier task allocation service: RequestFrontier assigns highest-utility
unassigned frontier nearest to the requesting robot; re-assigns stale claims.
Subscribes:
/fleet/maps (MapChunk) robots' compressed grids
/fleet/robots (RobotInfo) heartbeat / status
/fleet/exploration_frontiers_raw (FrontierArray) per-robot frontier reports
/<robot_ns>/rtabmap/mapData (rtabmap_msgs/MapData) for landmark extraction
Publishes:
/fleet/merged_map (OccupancyGrid, 2 Hz, TRANSIENT_LOCAL)
/fleet/exploration_frontiers (FrontierArray, 2 Hz)
/fleet/status (std_msgs/String, 1 Hz JSON)
Services:
/fleet/register_robot (RegisterRobot)
/fleet/request_frontier (RequestFrontier)
Parameters:
heartbeat_timeout_sec float 10.0
merge_hz float 2.0
frontier_timeout_sec float 60.0 (re-assign frontier after this if robot silent)
map_resolution float 0.05
"""
import json
import math
import time
from typing import Dict, List, Optional
import rclpy
from rclpy.node import Node
from rclpy.qos import (
QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy
)
from nav_msgs.msg import OccupancyGrid
from std_msgs.msg import String
from saltybot_fleet_msgs.msg import (
MapChunk, RobotInfo, Frontier, FrontierArray
)
from saltybot_fleet_msgs.srv import RegisterRobot, RequestFrontier
from .map_compressor import decompress_grid
from .map_merger import MapMerger
_TRANSIENT_QOS = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST,
depth=1,
durability=DurabilityPolicy.TRANSIENT_LOCAL,
)
class _RobotRecord:
__slots__ = ('info', 'last_seen', 'chunk_seq')
def __init__(self, info: RobotInfo, ts: float):
self.info = info
self.last_seen = ts
self.chunk_seq = -1
class FleetManagerNode(Node):
def __init__(self):
super().__init__('fleet_manager')
# ── Parameters ──────────────────────────────────────────────────────
self.declare_parameter('heartbeat_timeout_sec', 10.0)
self.declare_parameter('merge_hz', 2.0)
self.declare_parameter('frontier_timeout_sec', 60.0)
self.declare_parameter('map_resolution', 0.05)
self._hb_timeout = self.get_parameter('heartbeat_timeout_sec').value
self._merge_hz = self.get_parameter('merge_hz').value
self._frontier_to = self.get_parameter('frontier_timeout_sec').value
self._resolution = self.get_parameter('map_resolution').value
# ── State ────────────────────────────────────────────────────────────
self._robots: Dict[str, _RobotRecord] = {}
self._frontiers: Dict[str, Frontier] = {} # frontier_id → Frontier
self._merger = MapMerger(resolution=self._resolution)
self._merged_grid: Optional[OccupancyGrid] = None
# ── Subscriptions ────────────────────────────────────────────────────
self.create_subscription(MapChunk, '/fleet/maps', self._on_map_chunk, 10)
self.create_subscription(RobotInfo, '/fleet/robots', self._on_robot_info, 10)
self.create_subscription(FrontierArray, '/fleet/exploration_frontiers_raw',
self._on_frontiers_raw, 10)
# ── Publishers ───────────────────────────────────────────────────────
self._merged_pub = self.create_publisher(OccupancyGrid, '/fleet/merged_map', _TRANSIENT_QOS)
self._frontier_pub = self.create_publisher(FrontierArray, '/fleet/exploration_frontiers', 10)
self._status_pub = self.create_publisher(String, '/fleet/status', 10)
# ── Services ─────────────────────────────────────────────────────────
self.create_service(RegisterRobot, '/fleet/register_robot', self._svc_register)
self.create_service(RequestFrontier, '/fleet/request_frontier', self._svc_request_frontier)
# ── Timers ────────────────────────────────────────────────────────────
self.create_timer(1.0 / self._merge_hz, self._merge_tick)
self.create_timer(1.0, self._status_tick)
self.get_logger().info('fleet_manager ready')
# ── Subscription callbacks ────────────────────────────────────────────────
def _on_map_chunk(self, msg: MapChunk) -> None:
rid = msg.robot_id
rec = self._robots.get(rid)
if rec is None:
# Auto-register on first chunk (robot may not have called service)
rec = _RobotRecord(RobotInfo(), time.monotonic())
rec.info.robot_id = rid
rec.info.namespace = f'/{rid}'
self._robots[rid] = rec
self.get_logger().info(f'[fleet] auto-registered robot {rid} from map chunk')
if msg.seq <= rec.chunk_seq:
return # duplicate or stale
rec.chunk_seq = msg.seq
try:
grid = decompress_grid(bytes(msg.compressed_map), frame_id=msg.map_frame)
except Exception as exc:
self.get_logger().warning(f'[fleet] decompress failed for {rid}: {exc}')
return
tf = msg.map_to_fleet_tf if msg.tf_valid else None
ok = self._merger.update(rid, grid, tf_valid=msg.tf_valid, tf=tf)
if not ok:
self.get_logger().warning(
f'[fleet] resolution mismatch from {rid} '
f'({grid.info.resolution:.4f} vs {self._resolution:.4f}) — skipped'
)
def _on_robot_info(self, msg: RobotInfo) -> None:
rid = msg.robot_id
now = time.monotonic()
if rid not in self._robots:
self._robots[rid] = _RobotRecord(msg, now)
self.get_logger().info(f'[fleet] discovered robot {rid}')
else:
self._robots[rid].info = msg
self._robots[rid].last_seen = now
def _on_frontiers_raw(self, msg: FrontierArray) -> None:
"""Merge per-robot frontier reports into the global frontier pool."""
now_stamp = self.get_clock().now().to_msg()
for f in msg.frontiers:
if f.frontier_id not in self._frontiers:
self._frontiers[f.frontier_id] = f
else:
# Update utility (robot may have moved closer/farther)
existing = self._frontiers[f.frontier_id]
if f.assigned_to == '':
existing.utility = f.utility
# ── Timer callbacks ───────────────────────────────────────────────────────
def _merge_tick(self) -> None:
# ── Expire timed-out robots ──────────────────────────────────────────
now = time.monotonic()
departed = [
rid for rid, rec in self._robots.items()
if now - rec.last_seen > self._hb_timeout
]
for rid in departed:
self.get_logger().warning(f'[fleet] robot {rid} timed out — removing')
del self._robots[rid]
self._merger.remove(rid)
# Release frontiers assigned to departed robot
for f in self._frontiers.values():
if f.assigned_to == rid:
f.assigned_to = ''
# ── Re-assign stale frontier claims ──────────────────────────────────
# (robot claimed a frontier but hasn't updated its RobotInfo recently)
for f in self._frontiers.values():
if not f.assigned_to:
continue
rec = self._robots.get(f.assigned_to)
if rec is None or now - rec.last_seen > self._frontier_to:
self.get_logger().info(
f'[fleet] frontier {f.frontier_id} re-assigned '
f'(robot {f.assigned_to} stale)'
)
f.assigned_to = ''
# ── Build merged map ─────────────────────────────────────────────────
merged = self._merger.build_merged_grid()
if merged:
merged.header.stamp = self.get_clock().now().to_msg()
self._merged_grid = merged
self._merged_pub.publish(merged)
# ── Publish frontier list ─────────────────────────────────────────────
if self._frontiers:
fa = FrontierArray()
fa.stamp = self.get_clock().now().to_msg()
fa.source_robot = 'fleet_manager'
fa.frontiers = list(self._frontiers.values())
self._frontier_pub.publish(fa)
def _status_tick(self) -> None:
status = {
'robots': list(self._robots.keys()),
'robot_count': len(self._robots),
'frontier_count': len(self._frontiers),
'assigned_frontiers': sum(
1 for f in self._frontiers.values() if f.assigned_to
),
'map_ready': self._merged_grid is not None,
}
msg = String()
msg.data = json.dumps(status)
self._status_pub.publish(msg)
# ── Service handlers ──────────────────────────────────────────────────────
def _svc_register(
self, req: RegisterRobot.Request, resp: RegisterRobot.Response
) -> RegisterRobot.Response:
rid = req.robot_id
# Detect namespace collision — assign suffix if needed
if rid in self._robots:
# Check if it's the same robot re-registering (same namespace)
if self._robots[rid].info.namespace != req.namespace:
suffix = 2
while f'{rid}_{suffix}' in self._robots:
suffix += 1
rid = f'{rid}_{suffix}'
self._robots[rid] = _RobotRecord(RobotInfo(), time.monotonic())
self._robots[rid].info.robot_id = rid
self._robots[rid].info.namespace = req.namespace
resp.accepted = True
resp.assigned_id = rid
resp.message = f'Registered as {rid}'
self.get_logger().info(f'[fleet] registered robot {rid} ns={req.namespace}')
return resp
def _svc_request_frontier(
self, req: RequestFrontier.Request, resp: RequestFrontier.Response
) -> RequestFrontier.Response:
rid = req.robot_id
rx, ry = req.robot_pose_xy.x, req.robot_pose_xy.y
# Pick highest-utility unassigned frontier (re-score by distance to this robot)
best: Optional[Frontier] = None
best_score = -1.0
for f in self._frontiers.values():
if f.assigned_to:
continue
dist = math.hypot(f.centroid.x - rx, f.centroid.y - ry) + 1.0
score = f.area_m2 / dist
if score > best_score:
best_score = score
best = f
if best is None:
resp.assigned = False
resp.message = 'No unassigned frontiers available'
return resp
best.assigned_to = rid
best.assigned_at = self.get_clock().now().to_msg()
resp.assigned = True
resp.frontier = best
resp.message = f'Assigned frontier {best.frontier_id}'
self.get_logger().info(
f'[fleet] assigned frontier {best.frontier_id} to {rid} '
f'(utility={best.utility:.2f})'
)
return resp
def main(args=None):
rclpy.init(args=args)
node = FleetManagerNode()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,125 @@
"""
frontier_detector.py Detect exploration frontiers in an OccupancyGrid.
A frontier cell is a FREE cell (0) that has at least one UNKNOWN neighbour (-1 / 255).
Connected frontier cells are clustered; each cluster becomes a Frontier message with
a centroid and area estimate.
Algorithm:
1. Convert OccupancyGrid.data to a 2-D numpy array.
2. Find all free cells adjacent to unknown cells (4-connectivity for border check).
3. Label connected components of frontier cells (8-connectivity for clustering).
4. Compute centroid and area per cluster; filter by minimum area.
5. Score each frontier: utility = area_m2 / (distance_to_robot + 1.0)
Typical call rate: ~1 Hz on the merged fleet map.
"""
from __future__ import annotations
import math
from typing import List, Optional, Tuple
import numpy as np
from nav_msgs.msg import OccupancyGrid
from geometry_msgs.msg import Point
from saltybot_fleet_msgs.msg import Frontier
_FREE = 0
_UNKNOWN = -1 # stored as 255 in uint8 arrays; we use int8 semantics
def _grid_to_array(grid: OccupancyGrid) -> np.ndarray:
"""Return (height, width) int8 array from OccupancyGrid.data."""
arr = np.array(grid.data, dtype=np.int8).reshape(grid.info.height, grid.info.width)
return arr
def detect_frontiers(
grid: OccupancyGrid,
robot_xy: Optional[Tuple[float, float]] = None,
min_area_cells: int = 10,
robot_id: str = '',
seq: int = 0,
) -> List[Frontier]:
"""
Detect frontier clusters in *grid* and return a list of Frontier messages.
Args:
grid: Source OccupancyGrid (merged or local).
robot_xy: (x, y) in map frame for distance-weighted utility scoring.
min_area_cells: Discard clusters smaller than this (noise filter).
robot_id: Used to build frontier_id strings.
seq: Monotonic sequence counter for stable IDs.
Returns:
List of Frontier messages sorted by utility descending.
"""
arr = _grid_to_array(grid)
h, w = arr.shape
res = grid.info.resolution
ox = grid.info.origin.position.x
oy = grid.info.origin.position.y
# ── Find frontier cells ────────────────────────────────────────────────
# A cell is a frontier if it is FREE and has at least one UNKNOWN neighbour.
free_mask = (arr == _FREE)
unknown_mask = (arr == _UNKNOWN)
# Shift unknown_mask in 4 directions to find free cells adjacent to unknown
has_unknown_n = np.zeros_like(unknown_mask)
has_unknown_s = np.zeros_like(unknown_mask)
has_unknown_e = np.zeros_like(unknown_mask)
has_unknown_w = np.zeros_like(unknown_mask)
has_unknown_n[1:, :] = unknown_mask[:-1, :]
has_unknown_s[:-1, :] = unknown_mask[1:, :]
has_unknown_e[:, :-1] = unknown_mask[:, 1:]
has_unknown_w[:, 1:] = unknown_mask[:, :-1]
frontier_mask = free_mask & (
has_unknown_n | has_unknown_s | has_unknown_e | has_unknown_w
)
if not frontier_mask.any():
return []
# ── Connected-component labelling (8-connectivity) ─────────────────────
from scipy.ndimage import label as scipy_label
struct = np.ones((3, 3), dtype=int)
labelled, num_features = scipy_label(frontier_mask, structure=struct)
frontiers: List[Frontier] = []
for cluster_id in range(1, num_features + 1):
cells = np.argwhere(labelled == cluster_id) # (row, col) pairs
n_cells = len(cells)
if n_cells < min_area_cells:
continue
# centroid in map frame
rows = cells[:, 0].astype(float)
cols = cells[:, 1].astype(float)
cx = ox + (cols.mean() + 0.5) * res
cy = oy + (rows.mean() + 0.5) * res
area_m2 = n_cells * res * res
# utility: larger + closer = better
if robot_xy is not None:
dist = math.hypot(cx - robot_xy[0], cy - robot_xy[1]) + 1.0
else:
dist = 1.0
utility = area_m2 / dist
f = Frontier()
f.frontier_id = f'{robot_id}_{seq}_{cluster_id}'
f.centroid = Point(x=cx, y=cy, z=0.0)
f.area_m2 = float(area_m2)
f.utility = float(utility)
f.assigned_to = ''
frontiers.append(f)
frontiers.sort(key=lambda x: x.utility, reverse=True)
return frontiers

View File

@ -0,0 +1,110 @@
"""
frontier_detector_node.py Detect exploration frontiers from the merged fleet map.
Subscribes:
/fleet/merged_map (OccupancyGrid, TRANSIENT_LOCAL) merged fleet map
Publishes:
/fleet/exploration_frontiers_raw (FrontierArray) raw detections sent to manager
/fleet/exploration_frontiers (FrontierArray) also publish locally (for viz)
Parameters:
robot_id str "saltybot_1"
detection_hz float 1.0
min_frontier_cells int 10 (filter noise clusters smaller than this)
robot_pose_topic str "" (optional geometry_msgs/PoseStamped for scoring)
"""
import rclpy
from rclpy.node import Node
from rclpy.qos import (
QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy
)
from nav_msgs.msg import OccupancyGrid
from geometry_msgs.msg import PoseStamped
from saltybot_fleet_msgs.msg import FrontierArray
from .frontier_detector import detect_frontiers
_TRANSIENT_QOS = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST,
depth=1,
durability=DurabilityPolicy.TRANSIENT_LOCAL,
)
class FrontierDetectorNode(Node):
def __init__(self):
super().__init__('frontier_detector')
self.declare_parameter('robot_id', 'saltybot_1')
self.declare_parameter('detection_hz', 1.0)
self.declare_parameter('min_frontier_cells', 10)
self.declare_parameter('robot_pose_topic', '')
self._robot_id = self.get_parameter('robot_id').value
hz = self.get_parameter('detection_hz').value
self._min_cells = self.get_parameter('min_frontier_cells').value
pose_topic = self.get_parameter('robot_pose_topic').value
self._grid: OccupancyGrid | None = None
self._robot_xy: tuple | None = None
self._detect_seq: int = 0
self.create_subscription(OccupancyGrid, '/fleet/merged_map', self._on_map, _TRANSIENT_QOS)
if pose_topic:
self.create_subscription(PoseStamped, pose_topic, self._on_pose, 10)
self._raw_pub = self.create_publisher(FrontierArray, '/fleet/exploration_frontiers_raw', 10)
self._local_pub = self.create_publisher(FrontierArray, '/fleet/exploration_frontiers', 10)
self.create_timer(1.0 / hz, self._detect_tick)
self.get_logger().info(f'[{self._robot_id}] frontier_detector ready @ {hz:.1f} Hz')
def _on_map(self, msg: OccupancyGrid) -> None:
self._grid = msg
def _on_pose(self, msg: PoseStamped) -> None:
p = msg.pose.position
self._robot_xy = (p.x, p.y)
def _detect_tick(self) -> None:
if self._grid is None:
return
self._detect_seq += 1
frontiers = detect_frontiers(
self._grid,
robot_xy=self._robot_xy,
min_area_cells=self._min_cells,
robot_id=self._robot_id,
seq=self._detect_seq,
)
if not frontiers:
return
fa = FrontierArray()
fa.stamp = self.get_clock().now().to_msg()
fa.source_robot = self._robot_id
fa.frontiers = frontiers
self._raw_pub.publish(fa)
self._local_pub.publish(fa)
self.get_logger().debug(
f'[{self._robot_id}] detected {len(frontiers)} frontiers'
)
def main(args=None):
rclpy.init(args=args)
node = FrontierDetectorNode()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,155 @@
"""
landmark_aligner.py Estimate SE(2) transforms between robot map frames
using common landmark observations.
Each robot's RTAB-Map publishes loop-closure keyframes with GPS-less
"global" constraints derived from visual features. When two robots
independently observe the same physical landmark (ArUco marker, stable
visual feature cluster), their descriptor vectors will match and we can
compute the rigid transform that maps robot-A's map frame to robot-B's
(or the common fleet_map frame).
Approach used here:
- Robots publish detected ArUco marker poses on /<ns>/landmarks
(geometry_msgs/PoseArray, frame = <ns>/map).
- The LandmarkAligner collects observations per marker_id from all robots.
- When the same marker_id has been seen by 2 robots, solve for the
SE(2) transform between each robot's map frame and the canonical
fleet_map frame (defined as robot_1's map frame by convention).
- Transform is published as a geometry_msgs/TransformStamped and also
returned as a geometry_msgs/Transform for embedding in MapChunk.
Landmark topic convention:
/<robot_ns>/landmarks PoseArray
- poses[i].position.x/y = landmark centroid in robot's map frame
- poses[i].position.z = float-encoded marker_id (cast to int)
SE(2) estimation:
Uses the Umeyama / Horn algorithm on 2-D point correspondences.
Rotation + translation only (no scale).
Module is standalone no ROS node, just math helpers called from
fleet_manager_node.py.
"""
from __future__ import annotations
import math
from typing import Dict, List, Optional, Tuple
import numpy as np
# robot_id → {marker_id → (x, y) in that robot's map frame}
_LandmarkObs = Dict[str, Dict[int, Tuple[float, float]]]
def _se2_from_correspondences(
src_pts: np.ndarray, # (N, 2) points in source frame
dst_pts: np.ndarray, # (N, 2) points in destination frame
) -> Tuple[float, float, float]:
"""
Compute SE(2) transform (tx, ty, yaw) that maps src_pts dst_pts.
Uses the closed-form Horn least-squares solution for 2-D rigid body.
Returns (tx, ty, yaw).
"""
assert src_pts.shape == dst_pts.shape and src_pts.shape[1] == 2
n = len(src_pts)
if n < 2:
raise ValueError('Need ≥ 2 correspondences for SE(2) estimation')
# Centre the point sets
c_src = src_pts.mean(axis=0)
c_dst = dst_pts.mean(axis=0)
s = src_pts - c_src
d = dst_pts - c_dst
# Cross-covariance H = S^T D
H = s.T @ d # (2, 2)
# Rotation via atan2 of the off-diagonal elements of H
yaw = math.atan2(H[0, 1] - H[1, 0], H[0, 0] + H[1, 1])
c, ss = math.cos(yaw), math.sin(yaw)
R = np.array([[c, -ss], [ss, c]])
t = c_dst - R @ c_src
return float(t[0]), float(t[1]), float(yaw)
def _build_transform(tx: float, ty: float, yaw: float):
"""Return geometry_msgs/Transform-like dict."""
from geometry_msgs.msg import Transform, Vector3, Quaternion
tf = Transform()
tf.translation.x = tx
tf.translation.y = ty
tf.translation.z = 0.0
tf.rotation.w = math.cos(yaw * 0.5)
tf.rotation.z = math.sin(yaw * 0.5)
return tf
class LandmarkAligner:
"""
Collects landmark observations from multiple robots and estimates
the per-robot transform into the fleet_map frame.
fleet_map is defined as robot_1's map frame (robot_id with the
lowest registration sequence number in the coordinator).
"""
def __init__(self, reference_robot: str):
"""
Args:
reference_robot: robot_id whose map frame IS fleet_map.
Other robots are aligned to this reference.
"""
self._reference = reference_robot
self._obs: _LandmarkObs = {} # robot_id → {marker_id → (x, y)}
# robot_id → (tx, ty, yaw, valid)
self._transforms: Dict[str, Tuple[float, float, float, bool]] = {}
def update_observations(
self,
robot_id: str,
landmarks: List[Tuple[int, float, float]], # [(marker_id, x, y), ...]
) -> None:
"""Record landmark observations from a robot."""
if robot_id not in self._obs:
self._obs[robot_id] = {}
for mid, x, y in landmarks:
self._obs[robot_id][mid] = (x, y)
self._recompute(robot_id)
def _recompute(self, updated_robot: str) -> None:
"""Recompute transform for updated_robot against the reference."""
if updated_robot == self._reference:
self._transforms[updated_robot] = (0.0, 0.0, 0.0, True)
return
ref_obs = self._obs.get(self._reference)
rob_obs = self._obs.get(updated_robot)
if not ref_obs or not rob_obs:
return
# Find shared marker IDs
common = set(ref_obs.keys()) & set(rob_obs.keys())
if len(common) < 2:
return # insufficient correspondences
src = np.array([rob_obs[mid] for mid in common])
dst = np.array([ref_obs[mid] for mid in common])
try:
tx, ty, yaw = _se2_from_correspondences(src, dst)
except ValueError:
return
self._transforms[updated_robot] = (tx, ty, yaw, True)
def get_transform(self, robot_id: str):
"""
Return geometry_msgs/Transform for robot_id's map → fleet_map.
Returns (transform, valid:bool).
"""
entry = self._transforms.get(robot_id)
if entry is None or not entry[3]:
from geometry_msgs.msg import Transform
return Transform(), False
tx, ty, yaw, _ = entry
return _build_transform(tx, ty, yaw), True
def aligned_robots(self) -> List[str]:
return [rid for rid, t in self._transforms.items() if t[3]]

View File

@ -0,0 +1,157 @@
"""
map_broadcaster_node.py Publish this robot's compressed occupancy grid to the fleet.
Subscribes:
/<ns>/rtabmap/map (nav_msgs/OccupancyGrid, TRANSIENT_LOCAL)
Publishes:
/fleet/maps (saltybot_fleet_msgs/MapChunk, 1 Hz)
/fleet/robots (saltybot_fleet_msgs/RobotInfo, 1 Hz)
Parameters:
robot_id str "saltybot_1"
robot_namespace str "/saltybot_1"
broadcast_hz float 1.0 (map publish rate compress-on-change, rate-limited)
battery_topic str "" (optional: std_msgs/Float32 battery percentage)
Map is only re-compressed and published when it changes (seq advances), but the
RobotInfo heartbeat is always published at broadcast_hz so peers detect presence.
"""
import time
import rclpy
from rclpy.node import Node
from rclpy.qos import (
QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy
)
from std_msgs.msg import Float32
from nav_msgs.msg import OccupancyGrid
from builtin_interfaces.msg import Time as RosTime
from geometry_msgs.msg import PoseStamped, Pose, Point, Quaternion
from saltybot_fleet_msgs.msg import MapChunk, RobotInfo
from .map_compressor import compress_grid
_LATCHED_QOS = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST,
depth=1,
durability=DurabilityPolicy.TRANSIENT_LOCAL,
)
_BEST_EFFORT_QOS = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=5,
)
class MapBroadcasterNode(Node):
def __init__(self):
super().__init__('map_broadcaster')
# ── Parameters ──────────────────────────────────────────────────────
self.declare_parameter('robot_id', 'saltybot_1')
self.declare_parameter('robot_namespace', '/saltybot_1')
self.declare_parameter('broadcast_hz', 1.0)
self.declare_parameter('battery_topic', '')
self._robot_id = self.get_parameter('robot_id').value
self._ns = self.get_parameter('robot_namespace').value
hz = self.get_parameter('broadcast_hz').value
bat_topic = self.get_parameter('battery_topic').value
# ── State ────────────────────────────────────────────────────────────
self._last_grid: OccupancyGrid | None = None
self._last_compressed: bytes = b''
self._last_uncomp_sz: int = 0
self._map_seq: int = 0
self._battery_pct: float = -1.0
self._keyframe_count: int = 0
# ── Subscribers ──────────────────────────────────────────────────────
map_topic = f'{self._ns}/rtabmap/map'.replace('//', '/')
self.create_subscription(
OccupancyGrid, map_topic, self._on_map, _LATCHED_QOS
)
if bat_topic:
self.create_subscription(
Float32, bat_topic, self._on_battery, _BEST_EFFORT_QOS
)
# ── Publishers ───────────────────────────────────────────────────────
self._chunk_pub = self.create_publisher(MapChunk, '/fleet/maps', 10)
self._info_pub = self.create_publisher(RobotInfo, '/fleet/robots', 10)
# ── Timer ────────────────────────────────────────────────────────────
self.create_timer(1.0 / hz, self._broadcast_tick)
self.get_logger().info(
f'[{self._robot_id}] map_broadcaster ready — '
f'subscribing to {map_topic}, publishing to /fleet/maps @ {hz:.1f} Hz'
)
# ── Callbacks ────────────────────────────────────────────────────────────
def _on_map(self, msg: OccupancyGrid) -> None:
self._last_grid = msg
# Re-compress immediately; keep bytes ready for next publish tick
compressed, sz = compress_grid(msg)
self._last_compressed = compressed
self._last_uncomp_sz = sz
self._map_seq += 1
def _on_battery(self, msg: Float32) -> None:
self._battery_pct = float(msg.data)
def _broadcast_tick(self) -> None:
now = self.get_clock().now().to_msg()
# ── RobotInfo heartbeat ──────────────────────────────────────────────
info = RobotInfo()
info.stamp = now
info.robot_id = self._robot_id
info.namespace = self._ns
info.status = RobotInfo.STATUS_MAPPING
info.battery_pct = self._battery_pct
info.keyframe_count = self._keyframe_count
if self._last_grid:
cells = self._last_grid.info.width * self._last_grid.info.height
res = self._last_grid.info.resolution
info.map_coverage_m2 = float(cells * res * res)
self._info_pub.publish(info)
# ── MapChunk — only publish when we have data ────────────────────────
if not self._last_compressed:
return
grid = self._last_grid
chunk = MapChunk()
chunk.stamp = now
chunk.robot_id = self._robot_id
chunk.map_frame = grid.header.frame_id if grid else 'map'
chunk.seq = self._map_seq
chunk.compressed_map = list(self._last_compressed)
chunk.uncompressed_size = self._last_uncomp_sz
chunk.resolution = grid.info.resolution if grid else 0.05
chunk.origin_x = grid.info.origin.position.x if grid else 0.0
chunk.origin_y = grid.info.origin.position.y if grid else 0.0
chunk.width = grid.info.width if grid else 0
chunk.height = grid.info.height if grid else 0
chunk.tf_valid = False # coordinator sets this after alignment
self._chunk_pub.publish(chunk)
def main(args=None):
rclpy.init(args=args)
node = MapBroadcasterNode()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,89 @@
"""
map_compressor.py Serialise / compress / decompress OccupancyGrid for fleet transport.
OccupancyGrid serialisation format (little-endian binary):
4B resolution (float32)
4B origin_x (float32)
4B origin_y (float32)
4B origin_yaw (float32) extracted from quaternion
4B width (uint32)
4B height (uint32)
N data (int8 × width*height)
All compressed with zlib level-6. Decompression restores a nav_msgs/OccupancyGrid
with the same metadata so the receiver can directly publish / merge it.
"""
import struct
import zlib
import math
from typing import Tuple
from nav_msgs.msg import OccupancyGrid, MapMetaData
from geometry_msgs.msg import Pose, Point, Quaternion
_HEADER_FMT = '<ffffII' # resolution, ox, oy, oyaw, width, height
_HEADER_SIZE = struct.calcsize(_HEADER_FMT)
def _quat_to_yaw(q: Quaternion) -> float:
siny = 2.0 * (q.w * q.z + q.x * q.y)
cosy = 1.0 - 2.0 * (q.y * q.y + q.z * q.z)
return math.atan2(siny, cosy)
def _yaw_to_quat(yaw: float) -> Quaternion:
q = Quaternion()
q.w = math.cos(yaw * 0.5)
q.z = math.sin(yaw * 0.5)
return q
def compress_grid(grid: OccupancyGrid) -> Tuple[bytes, int]:
"""
Serialise and zlib-compress an OccupancyGrid.
Returns:
(compressed_bytes, uncompressed_size)
"""
info = grid.info
yaw = _quat_to_yaw(info.origin.orientation)
header = struct.pack(
_HEADER_FMT,
info.resolution,
info.origin.position.x,
info.origin.position.y,
yaw,
info.width,
info.height,
)
data = bytes([v & 0xFF for v in grid.data]) # int8 → uint8 for bytes()
raw = header + data
return zlib.compress(raw, level=6), len(raw)
def decompress_grid(compressed: bytes, frame_id: str = 'map') -> OccupancyGrid:
"""
Decompress and deserialise bytes produced by compress_grid() into an OccupancyGrid.
"""
raw = zlib.decompress(compressed)
resolution, ox, oy, oyaw, width, height = struct.unpack_from(_HEADER_FMT, raw, 0)
data_bytes = raw[_HEADER_SIZE:]
grid = OccupancyGrid()
grid.header.frame_id = frame_id
grid.info.resolution = resolution
grid.info.width = width
grid.info.height = height
grid.info.origin = Pose(
position=Point(x=float(ox), y=float(oy), z=0.0),
orientation=_yaw_to_quat(oyaw),
)
# Re-sign: stored as uint8, OccupancyGrid.data is int8 list
grid.data = [int(b) if b < 128 else int(b) - 256 for b in data_bytes]
return grid
def compression_ratio(compressed: bytes, uncompressed_size: int) -> float:
return len(compressed) / max(uncompressed_size, 1)

View File

@ -0,0 +1,185 @@
"""
map_merger.py Merge multiple OccupancyGrids into a single fleet-wide map.
Each robot publishes its local occupancy grid compressed on /fleet/maps.
The coordinator decompresses them and calls merge_grids() to produce a unified
merged map suitable for frontier detection and visualisation on /fleet/merged_map.
Merge strategy:
- Compute the bounding box that covers all incoming grids (same resolution required).
- Accumulate probability estimates per cell: average known values; unknown (-1) only
fills cells not covered by any robot.
- Occupied (> 65) beats free (065) when two robots disagree on a cell conservative
obstacle inflation to prevent Nav2 from driving into conflicting-report zones.
Transform alignment:
- Each MapChunk carries a map_to_fleet_tf transform (set by the coordinator after
landmark matching). If tf_valid=true the grid is rotated/translated into the
common fleet frame before merging.
- Until tf_valid is set, the grid is assumed already in the common frame (works
when all robots start from the same origin or when only one robot is active).
Resolution constraint:
- All grids MUST share the same resolution. If they differ, the chunk is discarded
and a warning is logged. Re-parameterise RTAB-Map Grid/CellSize to enforce 0.05m.
"""
from __future__ import annotations
import math
from typing import Dict, List, Tuple
import numpy as np
from nav_msgs.msg import OccupancyGrid
from geometry_msgs.msg import Pose, Point, Quaternion, Transform
_FREE = 0
_OCCUPIED = 100
_UNKNOWN = -1
def _transform_to_matrix(tf: Transform) -> np.ndarray:
"""Return 3×3 SE(2) homogeneous matrix from a geometry_msgs/Transform."""
tx = tf.translation.x
ty = tf.translation.y
q = tf.rotation
yaw = math.atan2(
2.0 * (q.w * q.z + q.x * q.y),
1.0 - 2.0 * (q.y * q.y + q.z * q.z),
)
c, s = math.cos(yaw), math.sin(yaw)
return np.array([
[c, -s, tx],
[s, c, ty],
[0, 0, 1],
])
def _grid_bounds(grid: OccupancyGrid, tf_mat: np.ndarray) -> Tuple[float, float, float, float]:
"""Return (min_x, min_y, max_x, max_y) of grid corners after transform."""
info = grid.info
ox, oy = info.origin.position.x, info.origin.position.y
w, h = info.width * info.resolution, info.height * info.resolution
corners = np.array([
[ox, oy, 1],
[ox + w, oy, 1],
[ox + w, oy + h, 1],
[ox, oy + h, 1],
]).T # shape (3, 4)
transformed = tf_mat @ corners # shape (3, 4)
xs, ys = transformed[0], transformed[1]
return float(xs.min()), float(ys.min()), float(xs.max()), float(ys.max())
class MapMerger:
"""
Accumulates OccupancyGrids from multiple robots and produces a merged map.
Usage:
merger = MapMerger(resolution=0.05)
merger.update('robot_1', grid1, tf_valid=False)
merger.update('robot_2', grid2, tf_valid=True, tf=some_transform)
merged = merger.build_merged_grid()
"""
def __init__(self, resolution: float = 0.05):
self._resolution = resolution
# robot_id → (grid: OccupancyGrid, tf_matrix: np.ndarray)
self._grids: Dict[str, Tuple[OccupancyGrid, np.ndarray]] = {}
def update(
self,
robot_id: str,
grid: OccupancyGrid,
tf_valid: bool = False,
tf: Transform | None = None,
) -> bool:
"""
Register or update a robot's latest grid. Returns False if resolution mismatch.
"""
if abs(grid.info.resolution - self._resolution) > 1e-5:
return False
tf_mat = _transform_to_matrix(tf) if (tf_valid and tf is not None) else np.eye(3)
self._grids[robot_id] = (grid, tf_mat)
return True
def remove(self, robot_id: str) -> None:
self._grids.pop(robot_id, None)
def build_merged_grid(self) -> OccupancyGrid | None:
"""
Merge all registered grids into a single OccupancyGrid.
Returns None if no grids are registered.
"""
if not self._grids:
return None
res = self._resolution
# ── Compute merged bounding box ────────────────────────────────────
min_x = min_y = math.inf
max_x = max_y = -math.inf
for grid, tf_mat in self._grids.values():
bx0, by0, bx1, by1 = _grid_bounds(grid, tf_mat)
min_x = min(min_x, bx0)
min_y = min(min_y, by0)
max_x = max(max_x, bx1)
max_y = max(max_y, by1)
W = int(math.ceil((max_x - min_x) / res)) + 2
H = int(math.ceil((max_y - min_y) / res)) + 2
# Accumulators: sum of known values, count of known observations
acc = np.full((H, W), 0.0, dtype=np.float32)
count = np.zeros((H, W), dtype=np.int32)
# ── Paint each robot's grid into the merged canvas ─────────────────
for grid, tf_mat in self._grids.values():
src = np.array(grid.data, dtype=np.int8).reshape(
grid.info.height, grid.info.width
)
ox = grid.info.origin.position.x
oy = grid.info.origin.position.y
rows, cols = np.where(src != _UNKNOWN)
values = src[rows, cols].astype(np.float32)
# Cell centre in source grid frame
px = ox + (cols + 0.5) * res
py = oy + (rows + 0.5) * res
ones = np.ones_like(px)
pts = np.stack([px, py, ones], axis=0) # (3, N)
transformed = tf_mat @ pts # (3, N)
# Target cell indices in merged canvas
tcol = ((transformed[0] - min_x) / res).astype(int)
trow = ((transformed[1] - min_y) / res).astype(int)
valid = (tcol >= 0) & (tcol < W) & (trow >= 0) & (trow < H)
tcol, trow, values = tcol[valid], trow[valid], values[valid]
# Conservative merge: occupied wins over free in conflicts
existing_occ = (acc[trow, tcol] > 65) & (count[trow, tcol] > 0)
incoming_occ = values > 65
# For already-occupied cells don't average down with free
update_mask = ~existing_occ | incoming_occ
acc[trow[update_mask], tcol[update_mask]] += values[update_mask]
count[trow[update_mask], tcol[update_mask]] += 1
# ── Build output OccupancyGrid ─────────────────────────────────────
merged = OccupancyGrid()
merged.header.frame_id = 'fleet_map'
merged.info.resolution = res
merged.info.width = W
merged.info.height = H
merged.info.origin.position.x = min_x
merged.info.origin.position.y = min_y
merged.info.origin.orientation.w = 1.0
merged_arr = np.full((H, W), _UNKNOWN, dtype=np.int8)
known = count > 0
merged_arr[known] = np.clip(
(acc[known] / count[known]).astype(np.int8), 0, 100
)
merged.data = merged_arr.flatten().tolist()
return merged

View File

@ -0,0 +1,2 @@
# Entry points registered in setup.py / CMakeLists via ament_python_install_package.
# This file is a marker only — actual entry points are declared in setup.py below.

View File

@ -0,0 +1,104 @@
#!/usr/bin/env python3
"""
fleet_status.py CLI to display live fleet status.
Usage:
ros2 run saltybot_fleet fleet_status
ros2 run saltybot_fleet fleet_status -- --once
Subscribes to:
/fleet/status (std_msgs/String JSON) coordinator health
/fleet/robots (RobotInfo) per-robot status
/fleet/exploration_frontiers (FrontierArray) open frontiers
"""
import argparse
import json
import sys
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from saltybot_fleet_msgs.msg import RobotInfo, FrontierArray
_STATUS_LABELS = {
RobotInfo.STATUS_IDLE: 'IDLE',
RobotInfo.STATUS_MAPPING: 'MAPPING',
RobotInfo.STATUS_EXPLORING: 'EXPLORING',
RobotInfo.STATUS_RETURNING: 'RETURNING',
RobotInfo.STATUS_LOST: 'LOST',
}
class FleetStatusCLI(Node):
def __init__(self, once: bool):
super().__init__('fleet_status_cli')
self._once = once
self._printed = False
self._fleet_status: dict = {}
self._robots: dict = {}
self._frontier_count: int = 0
self.create_subscription(String, '/fleet/status', self._on_status, 10)
self.create_subscription(RobotInfo, '/fleet/robots', self._on_robot, 10)
self.create_subscription(FrontierArray,'/fleet/exploration_frontiers',self._on_frontiers,10)
self.create_timer(2.0, self._print_tick)
def _on_status(self, msg: String) -> None:
try:
self._fleet_status = json.loads(msg.data)
except json.JSONDecodeError:
pass
def _on_robot(self, msg: RobotInfo) -> None:
self._robots[msg.robot_id] = msg
def _on_frontiers(self, msg: FrontierArray) -> None:
self._frontier_count = len(msg.frontiers)
def _print_tick(self) -> None:
print('\033[H\033[J', end='') # clear screen
print('' * 60)
print(' SaltyBot Fleet Status')
print('' * 60)
print(f' Robots in fleet : {len(self._robots)}')
print(f' Open frontiers : {self._frontier_count}')
print()
for rid, info in sorted(self._robots.items()):
status_str = _STATUS_LABELS.get(info.status, '?')
bat = f'{info.battery_pct:.0f}%' if info.battery_pct >= 0 else '?'
frontier = info.active_frontier or ''
print(f' {rid:<20} {status_str:<12} bat={bat:<6} frontier={frontier}')
if self._fleet_status:
print()
print(f' Coordinator: {json.dumps(self._fleet_status, indent=2)}')
print('' * 60)
if self._once and not self._printed:
self._printed = True
raise SystemExit(0)
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--once', action='store_true', help='Print once and exit')
args, ros_args = parser.parse_known_args()
rclpy.init(args=ros_args)
node = FleetStatusCLI(once=args.once)
try:
rclpy.spin(node)
except SystemExit:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,28 @@
from setuptools import setup, find_packages
package_name = 'saltybot_fleet'
setup(
name=package_name,
version='0.1.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
[f'resource/{package_name}']),
(f'share/{package_name}', ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='seb',
maintainer_email='seb@vayrette.com',
description='Multi-robot SLAM coordination for saltybot',
license='MIT',
entry_points={
'console_scripts': [
'map_broadcaster = saltybot_fleet.map_broadcaster_node:main',
'fleet_manager = saltybot_fleet.fleet_manager_node:main',
'frontier_detector = saltybot_fleet.frontier_detector_node:main',
'fleet_explorer = saltybot_fleet.fleet_explorer_node:main',
],
},
)

View File

@ -0,0 +1,24 @@
cmake_minimum_required(VERSION 3.8)
project(saltybot_fleet_msgs)
find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)
find_package(std_msgs REQUIRED)
find_package(nav_msgs REQUIRED)
find_package(geometry_msgs REQUIRED)
find_package(builtin_interfaces REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
# Messages
"msg/RobotInfo.msg"
"msg/MapChunk.msg"
"msg/Frontier.msg"
"msg/FrontierArray.msg"
# Services
"srv/RegisterRobot.srv"
"srv/RequestFrontier.srv"
DEPENDENCIES std_msgs nav_msgs geometry_msgs builtin_interfaces
)
ament_export_dependencies(rosidl_default_runtime)
ament_package()

View File

@ -0,0 +1,8 @@
# A single exploration frontier cell cluster.
string frontier_id # unique ID: "<robot_id>_<seq>"
geometry_msgs/Point centroid # in fleet common frame
float32 area_m2 # approximate size of frontier cluster
float32 utility # combined score (size × 1/distance)
string assigned_to # robot_id if claimed, "" if open
builtin_interfaces/Time assigned_at

View File

@ -0,0 +1,3 @@
builtin_interfaces/Time stamp
string source_robot # robot that detected these frontiers
Frontier[] frontiers

View File

@ -0,0 +1,25 @@
# Compressed occupancy grid chunk — published to /fleet/maps by each robot.
# Receivers merge this into the fleet-wide map.
builtin_interfaces/Time stamp
string robot_id # originating robot
string map_frame # typically "map" in robot's namespace
uint32 seq # monotonically increasing per robot
# Compressed OccupancyGrid (zlib-deflate of nav_msgs/OccupancyGrid serialisation)
# Full message serialisation preserves header, info (resolution, origin, width, height)
# and data fields. Receivers decompress with zlib.inflate.
uint8[] compressed_map # zlib-compressed serialised OccupancyGrid
uint32 uncompressed_size
# Map metadata (duplicated for quick filtering without decompression)
float32 resolution # metres/cell
float32 origin_x # map origin in robot's map frame
float32 origin_y
uint32 width # cells
uint32 height # cells
# Transform: robot_map_frame → fleet common frame (filled by coordinator)
# identity until landmark matching aligns the frames
geometry_msgs/Transform map_to_fleet_tf
bool tf_valid # false until transform is established

View File

@ -0,0 +1,20 @@
# Robot discovery and status — published to /fleet/robots at 1 Hz by each robot.
# Used for DDS peer discovery without a central broker.
builtin_interfaces/Time stamp
string robot_id # e.g. "saltybot_1"
string namespace # e.g. "/saltybot_1"
geometry_msgs/PoseStamped pose # current best-estimate pose (map frame)
# Status flags
uint8 STATUS_IDLE = 0
uint8 STATUS_MAPPING = 1
uint8 STATUS_EXPLORING = 2
uint8 STATUS_RETURNING = 3
uint8 STATUS_LOST = 4
uint8 status
float32 battery_pct # 0.0100.0; -1 = unknown
float32 map_coverage_m2 # explored area in square metres
uint32 keyframe_count # RTAB-Map keyframes in local DB
string active_frontier # frontier ID currently being explored, "" if none

View File

@ -0,0 +1,23 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_fleet_msgs</name>
<version>0.1.0</version>
<description>ROS2 message and service definitions for saltybot multi-robot fleet coordination</description>
<maintainer email="seb@vayrette.com">seb</maintainer>
<license>MIT</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
<depend>std_msgs</depend>
<depend>nav_msgs</depend>
<depend>geometry_msgs</depend>
<depend>builtin_interfaces</depend>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>

View File

@ -0,0 +1,7 @@
# Called by a robot on first contact with the fleet coordinator.
string robot_id
string namespace
---
bool accepted
string message
string assigned_id # coordinator may reassign numeric suffix to avoid conflicts

View File

@ -0,0 +1,7 @@
# Robot requests its next exploration frontier from the coordinator.
string robot_id
geometry_msgs/Point robot_pose_xy # current x,y for distance-weighted allocation
---
bool assigned
saltybot_fleet_msgs/Frontier frontier # valid only if assigned=true
string message