feat: route recording + autonomous replay (#71)

Implements Phase 3 ride-once-replay-forever route system.

saltybot_routes package:
- route_recorder_node: samples GPS+odom+heading at 1Hz during follow-me
  rides; 2m waypoint spacing; JSON-Lines .jsonl on NVMe /data/routes/;
  services start_recording/stop_recording/save/discard
- route_replayer_node: loads .jsonl, GPS->ENU flat-earth conversion,
  heading->quaternion, 3m subsampling for Nav2 navigate_through_poses;
  2m GPS tolerance (SIM7600X +-2.5m); pause/resume/stop services
- route_manager_node: list/info/delete services for saved routes
- route_system.launch.py: all three nodes with shared params
- route_params.yaml: waypoint_spacing_m=2.0, replay_spacing_m=3.0

GPS: /gps/fix from SIM7600X (PR #65)
UWB: /uwb/target from follow-me (PR #66)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
sl-perception 2026-03-01 01:07:06 -05:00
parent dcc26e6937
commit 5dcaa7bd49
10 changed files with 1106 additions and 0 deletions

View File

@ -0,0 +1,83 @@
# route_params.yaml — Route recording and replay configuration for SaltyBot
#
# "Ride once, replay forever."
#
# Flow:
# 1. Tee rides EUC, SaltyBot follows via UWB follow-me (PR #66)
# 2. route_recorder_node samples GPS + odom → /data/routes/<name>.jsonl
# 3. Next ride: route_replayer_node loads file → Nav2 navigate_through_poses
#
# GPS source: SIM7600X → /gps/fix (NavSatFix, ±2.5m CEP) — PR #65
# Heading: D435i IMU → /imu/data, converted yaw → route waypoint heading_deg
# Odometry: STM32 wheel encoders → /odom
# UWB: /uwb/target (follow-me reference, logged for context)
route_recorder:
ros__parameters:
save_dir: "/data/routes"
route_name: "route" # override via param or launch arg
waypoint_spacing_m: 2.0 # skip GPS samples closer than this
sample_rate_hz: 1.0 # max recording frequency (GPS ~1Hz)
min_fix_status: 0 # 0=STATUS_FIX; reject STATUS_NO_FIX (-1)
route_replayer:
ros__parameters:
save_dir: "/data/routes"
route_name: "route"
replay_spacing_m: 3.0 # subsample waypoints for Nav2 (3m spacing)
goal_tolerance_xy: 2.0 # metres — matches SIM7600X ±2.5m CEP
replay_speed_factor: 1.0 # 1.0 = same speed as recorded
map_frame: "map"
nav_timeout_s: 600.0 # 10 min max per nav batch
route_manager:
ros__parameters:
save_dir: "/data/routes"
route_name: "route"
# ── Operational notes ─────────────────────────────────────────────────────────
#
# Recording a route (EUC follow-me session):
# ros2 param set /route_recorder route_name "friday_loop"
# ros2 service call /route/start_recording std_srvs/srv/Trigger '{}'
# # ... follow Tee for the full route ...
# ros2 service call /route/stop_recording std_srvs/srv/Trigger '{}'
# ros2 service call /route/save std_srvs/srv/Trigger '{}'
#
# Replaying a route (autonomous):
# ros2 param set /route_replayer route_name "friday_loop"
# ros2 service call /route/load std_srvs/srv/Trigger '{}'
# ros2 service call /route/start_replay std_srvs/srv/Trigger '{}'
# ros2 topic echo /route/replay_status
#
# Listing routes:
# ros2 service call /route/list std_srvs/srv/Trigger '{}'
#
# Route info:
# ros2 param set /route_manager route_name "friday_loop"
# ros2 service call /route/info std_srvs/srv/Trigger '{}'
#
# Delete route:
# ros2 param set /route_manager route_name "friday_loop"
# ros2 service call /route/delete std_srvs/srv/Trigger '{}'
#
# Emergency stop during replay:
# ros2 service call /route/stop std_srvs/srv/Trigger '{}'
# # or: ros2 topic pub /route/pause_cmd std_msgs/msg/Bool '{data: true}'
#
# ── File format (JSON Lines, .jsonl) ─────────────────────────────────────────
#
# {
# "version": 1,
# "name": "friday_loop",
# "recorded_at": "2026-03-01T14:22:00+00:00",
# "waypoint_count": 142,
# "total_distance_m": 1240.5,
# "duration_s": 820.3,
# "waypoints": [
# {"t": 1740839000.1, "lat": 37.7749, "lon": -122.4194,
# "heading_deg": 45.0, "speed_mps": 1.2, "odom_x": 0.0, "odom_y": 0.0,
# "gps_status": 0},
# ...
# ]
# }

View File

@ -0,0 +1,99 @@
"""
route_system.launch.py Route recording and replay system for SaltyBot
Starts:
route_recorder_node records GPS + odom path during follow-me rides
route_replayer_node replays recorded routes autonomously via Nav2
route_manager_node list/delete/info for saved routes
Depends on:
saltybot-nav2 container (Nav2 action server /navigate_through_poses)
saltybot_cellular (/gps/fix from SIM7600X GPS PR #65)
saltybot_uwb (/uwb/target PR #66, used for context during recording)
STM32 bridge (/odom from wheel encoders)
D435i (/imu/data for heading)
Usage record a route:
# Set name, start recording, ride with Tee, stop and save:
ros2 param set /route_recorder route_name "friday_loop"
ros2 service call /route/start_recording std_srvs/srv/Trigger '{}'
# ... ride the route ...
ros2 service call /route/stop_recording std_srvs/srv/Trigger '{}'
ros2 service call /route/save std_srvs/srv/Trigger '{}'
Usage replay a route:
ros2 param set /route_replayer route_name "friday_loop"
ros2 service call /route/load std_srvs/srv/Trigger '{}'
ros2 service call /route/start_replay std_srvs/srv/Trigger '{}'
Arguments:
save_dir (default /data/routes) NVMe route storage
route_name (default route) name for record/replay
"""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
pkg_dir = get_package_share_directory('saltybot_routes')
params_file = os.path.join(pkg_dir, 'config', 'route_params.yaml')
args = [
DeclareLaunchArgument('save_dir', default_value='/data/routes'),
DeclareLaunchArgument('route_name', default_value='route'),
]
recorder = Node(
package='saltybot_routes',
executable='route_recorder_node',
name='route_recorder',
output='screen',
parameters=[
params_file,
{
'save_dir': LaunchConfiguration('save_dir'),
'route_name': LaunchConfiguration('route_name'),
},
],
)
replayer = Node(
package='saltybot_routes',
executable='route_replayer_node',
name='route_replayer',
output='screen',
parameters=[
params_file,
{
'save_dir': LaunchConfiguration('save_dir'),
'route_name': LaunchConfiguration('route_name'),
},
],
)
manager = Node(
package='saltybot_routes',
executable='route_manager_node',
name='route_manager',
output='screen',
parameters=[
params_file,
{
'save_dir': LaunchConfiguration('save_dir'),
'route_name': LaunchConfiguration('route_name'),
},
],
)
return LaunchDescription([
*args,
recorder,
replayer,
manager,
])

View File

@ -0,0 +1,31 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_routes</name>
<version>0.1.0</version>
<description>Route recording and autonomous replay for SaltyBot — "Ride once, replay forever."</description>
<maintainer email="seb@saltylab.io">seb</maintainer>
<license>MIT</license>
<buildtool_depend>ament_python</buildtool_depend>
<depend>rclpy</depend>
<depend>sensor_msgs</depend>
<depend>geometry_msgs</depend>
<depend>nav_msgs</depend>
<depend>std_msgs</depend>
<depend>std_srvs</depend>
<depend>nav2_msgs</depend>
<exec_depend>robot_localization</exec_depend>
<exec_depend>nav2_bringup</exec_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,149 @@
"""
route_manager_node List, inspect, and delete saved route files.
Provides a management interface for the /data/routes/ directory.
All routes are .jsonl files written by route_recorder_node.
Services:
/route/list std_srvs/Trigger returns JSON list of all routes in status message
/route/delete std_srvs/Trigger delete route named by route_name param
/route/info std_srvs/Trigger return metadata for route named by route_name param
Parameters:
save_dir (default /data/routes)
route_name (default 'route') used by /route/delete and /route/info
Topics published:
/route/manager_status std_msgs/String last operation result
Response format (in message field):
/route/list JSON string: [{"name":..., "distance_m":..., "recorded_at":..., "waypoint_count":...}, ...]
/route/info JSON string: {full metadata dict}
/route/delete "Deleted: <name>"
"""
import json
import os
from datetime import datetime, timezone
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from std_srvs.srv import Trigger
class RouteManagerNode(Node):
def __init__(self):
super().__init__('route_manager')
self.declare_parameter('save_dir', '/data/routes')
self.declare_parameter('route_name', 'route')
self._status_pub = self.create_publisher(String, '/route/manager_status', 10)
self.create_service(Trigger, '/route/list', self._list_cb)
self.create_service(Trigger, '/route/delete', self._delete_cb)
self.create_service(Trigger, '/route/info', self._info_cb)
self.get_logger().info('route_manager: ready.')
# ── Service handlers ──────────────────────────────────────────────────────
def _list_cb(self, _req, response):
save_dir = self.get_parameter('save_dir').value
if not os.path.isdir(save_dir):
response.success = True
response.message = '[]'
return response
routes = []
for fname in sorted(os.listdir(save_dir)):
if not fname.endswith('.jsonl'):
continue
path = os.path.join(save_dir, fname)
try:
with open(path) as f:
meta = json.load(f)
routes.append({
'name': meta.get('name', fname[:-6]),
'distance_m': meta.get('total_distance_m', 0.0),
'duration_s': meta.get('duration_s', 0.0),
'recorded_at': meta.get('recorded_at', ''),
'waypoint_count': meta.get('waypoint_count', 0),
'file': fname,
})
except Exception as e:
self.get_logger().warn(f'route_manager: could not read {fname}: {e}')
result = json.dumps(routes)
self._pub_status(f'list:{len(routes)}_routes')
response.success = True
response.message = result
self.get_logger().info(f'route_manager: listed {len(routes)} routes')
return response
def _delete_cb(self, _req, response):
name = self.get_parameter('route_name').value
save_dir = self.get_parameter('save_dir').value
path = os.path.join(save_dir, f'{name}.jsonl')
if not os.path.exists(path):
response.success = False
response.message = f'Route not found: {name}'
return response
os.remove(path)
self._pub_status(f'deleted:{name}')
self.get_logger().info(f'route_manager: deleted "{name}"')
response.success = True
response.message = f'Deleted: {name}'
return response
def _info_cb(self, _req, response):
name = self.get_parameter('route_name').value
save_dir = self.get_parameter('save_dir').value
path = os.path.join(save_dir, f'{name}.jsonl')
if not os.path.exists(path):
response.success = False
response.message = f'Route not found: {name}'
return response
try:
with open(path) as f:
meta = json.load(f)
# Return metadata without the full waypoints array (can be large)
info = {k: v for k, v in meta.items() if k != 'waypoints'}
result = json.dumps(info, indent=2)
self._pub_status(f'info:{name}')
response.success = True
response.message = result
except Exception as e:
response.success = False
response.message = f'Error reading route: {e}'
return response
# ── Helpers ───────────────────────────────────────────────────────────────
def _pub_status(self, s: str) -> None:
msg = String()
msg.data = s
self._status_pub.publish(msg)
def main(args=None):
rclpy.init(args=args)
node = RouteManagerNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.try_shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,317 @@
"""
route_recorder_node Records GPS + odometry path during follow-me rides.
"Ride once, replay forever." During a UWB follow-me session, this node
samples the robot's GPS position, heading, and speed into a timestamped
JSON-Lines file. The result is a portable route file that route_replayer_node
can load to autonomously repeat the ride.
Recording strategy:
- Samples at up to 1 Hz (configurable)
- Skips waypoints within waypoint_spacing_m of the previous one
- Each waypoint: {t, lat, lon, heading_deg, speed_mps, x, y, source}
- source: 'gps' | 'odom' (used when GPS quality drops)
JSON-Lines format (one JSON object per line, .jsonl):
{"version": 1, "name": "...", "recorded_at": "...", "waypoints": [...]}
Written as a single-line header on line 1 (metadata), then appended to.
File location: /data/routes/<name>.jsonl
Topics subscribed:
/gps/fix sensor_msgs/NavSatFix GPS position (SIM7600X)
/odom nav_msgs/Odometry wheel encoder odometry
/uwb/target geometry_msgs/PoseStamped UWB target position (for context)
/imu/data sensor_msgs/Imu heading from IMU
Topics published:
/route/status std_msgs/String 'idle'|'recording:<name>'|'saved:<name>'
Services:
/route/start_recording std_srvs/Trigger begin recording (name via param)
/route/stop_recording std_srvs/Trigger stop and buffer (don't save yet)
/route/save std_srvs/Trigger flush buffer to disk
/route/discard std_srvs/Trigger discard unsaved recording
Parameters:
save_dir (default /data/routes)
waypoint_spacing_m (default 2.0) skip closer waypoints
sample_rate_hz (default 1.0) max recording rate
route_name (default 'route') set before start_recording
min_fix_status (default 0) reject STATUS_NO_FIX
"""
import json
import math
import os
import time
from datetime import datetime, timezone
from typing import Optional
import rclpy
from rclpy.node import Node
from rclpy.callback_groups import ReentrantCallbackGroup
from sensor_msgs.msg import NavSatFix, NavSatStatus, Imu
from nav_msgs.msg import Odometry
from geometry_msgs.msg import PoseStamped
from std_msgs.msg import String
from std_srvs.srv import Trigger
def _haversine(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
"""Great-circle distance in metres."""
R = 6_371_000.0
phi1, phi2 = math.radians(lat1), math.radians(lat2)
dphi = math.radians(lat2 - lat1)
dlam = math.radians(lon2 - lon1)
a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
def _quat_to_yaw_deg(q) -> float:
"""Convert quaternion to yaw in degrees (-180..180)."""
siny = 2.0 * (q.w * q.z + q.x * q.y)
cosy = 1.0 - 2.0 * (q.y * q.y + q.z * q.z)
return math.degrees(math.atan2(siny, cosy))
class RouteRecorderNode(Node):
def __init__(self):
super().__init__('route_recorder')
self.declare_parameter('save_dir', '/data/routes')
self.declare_parameter('waypoint_spacing_m', 2.0)
self.declare_parameter('sample_rate_hz', 1.0)
self.declare_parameter('route_name', 'route')
self.declare_parameter('min_fix_status', NavSatStatus.STATUS_FIX)
self._recording = False
self._waypoints: list[dict] = []
self._last_wp: Optional[dict] = None
self._last_sample_t = 0.0
# Latest sensor state
self._fix: Optional[NavSatFix] = None
self._odom: Optional[Odometry] = None
self._imu: Optional[Imu] = None
self._uwb: Optional[PoseStamped] = None
self._cb_group = ReentrantCallbackGroup()
# ── Subscriptions ────────────────────────────────────────────────────
self.create_subscription(NavSatFix, '/gps/fix', self._fix_cb, 10)
self.create_subscription(Odometry, '/odom', self._odom_cb, 10)
self.create_subscription(Imu, '/imu/data', self._imu_cb, 10)
self.create_subscription(PoseStamped, '/uwb/target', self._uwb_cb, 10)
# ── Publishers ───────────────────────────────────────────────────────
self._status_pub = self.create_publisher(String, '/route/status', 10)
# ── Services ─────────────────────────────────────────────────────────
self.create_service(Trigger, '/route/start_recording', self._start_cb,
callback_group=self._cb_group)
self.create_service(Trigger, '/route/stop_recording', self._stop_cb,
callback_group=self._cb_group)
self.create_service(Trigger, '/route/save', self._save_cb,
callback_group=self._cb_group)
self.create_service(Trigger, '/route/discard', self._discard_cb,
callback_group=self._cb_group)
# ── Sample timer ─────────────────────────────────────────────────────
rate = self.get_parameter('sample_rate_hz').value
self.create_timer(1.0 / rate, self._sample)
self._pub_status('idle')
self.get_logger().info('route_recorder: ready. Call /route/start_recording.')
# ── Sensor callbacks ──────────────────────────────────────────────────────
def _fix_cb(self, msg: NavSatFix) -> None:
self._fix = msg
def _odom_cb(self, msg: Odometry) -> None:
self._odom = msg
def _imu_cb(self, msg: Imu) -> None:
self._imu = msg
def _uwb_cb(self, msg: PoseStamped) -> None:
self._uwb = msg
# ── Service handlers ──────────────────────────────────────────────────────
def _start_cb(self, _req, response):
if self._recording:
response.success = False
response.message = 'Already recording'
return response
name = self.get_parameter('route_name').value
self._waypoints = []
self._last_wp = None
self._recording = True
self.get_logger().info(f'route_recorder: started recording "{name}"')
self._pub_status(f'recording:{name}')
response.success = True
response.message = f'Recording started: {name}'
return response
def _stop_cb(self, _req, response):
if not self._recording:
response.success = False
response.message = 'Not recording'
return response
self._recording = False
n = len(self._waypoints)
self.get_logger().info(f'route_recorder: stopped. {n} waypoints buffered.')
self._pub_status(f'stopped:{n}_waypoints')
response.success = True
response.message = f'Recording stopped. {n} waypoints buffered. Call /route/save.'
return response
def _save_cb(self, _req, response):
if not self._waypoints:
response.success = False
response.message = 'No waypoints to save'
return response
name = self.get_parameter('route_name').value
save_dir = self.get_parameter('save_dir').value
os.makedirs(save_dir, exist_ok=True)
path = os.path.join(save_dir, f'{name}.jsonl')
total_dist = self._total_distance_m()
meta = {
'version': 1,
'name': name,
'recorded_at': datetime.now(timezone.utc).isoformat(),
'waypoint_count': len(self._waypoints),
'total_distance_m': round(total_dist, 1),
'duration_s': self._duration_s(),
'waypoints': self._waypoints,
}
with open(path, 'w') as f:
json.dump(meta, f)
f.write('\n')
self.get_logger().info(
f'route_recorder: saved {len(self._waypoints)} waypoints → {path} '
f'({total_dist:.0f} m)'
)
self._pub_status(f'saved:{name}')
self._waypoints = []
response.success = True
response.message = f'Saved: {path} ({len(meta["waypoints"])} waypoints, {total_dist:.0f} m)'
return response
def _discard_cb(self, _req, response):
n = len(self._waypoints)
self._waypoints = []
self._recording = False
self._last_wp = None
self._pub_status('idle')
response.success = True
response.message = f'Discarded {n} waypoints'
return response
# ── Sample timer ──────────────────────────────────────────────────────────
def _sample(self) -> None:
if not self._recording:
return
if self._fix is None:
return
# GPS quality check
min_status = self.get_parameter('min_fix_status').value
if self._fix.status.status < min_status:
return
lat = self._fix.latitude
lon = self._fix.longitude
# Skip if too close to last waypoint
spacing = self.get_parameter('waypoint_spacing_m').value
if self._last_wp is not None:
d = _haversine(lat, lon, self._last_wp['lat'], self._last_wp['lon'])
if d < spacing:
return
# Heading: prefer IMU yaw, fall back to GPS bearing
heading_deg = 0.0
if self._imu is not None:
heading_deg = _quat_to_yaw_deg(self._imu.orientation)
elif self._last_wp is not None:
dlat = lat - self._last_wp['lat']
dlon = lon - self._last_wp['lon']
heading_deg = math.degrees(math.atan2(dlon, dlat))
# Speed from odometry
speed_mps = 0.0
odom_x, odom_y = 0.0, 0.0
if self._odom is not None:
vx = self._odom.twist.twist.linear.x
vy = self._odom.twist.twist.linear.y
speed_mps = math.hypot(vx, vy)
odom_x = self._odom.pose.pose.position.x
odom_y = self._odom.pose.pose.position.y
wp = {
't': self.get_clock().now().nanoseconds / 1e9,
'lat': lat,
'lon': lon,
'heading_deg': round(heading_deg, 1),
'speed_mps': round(speed_mps, 2),
'odom_x': round(odom_x, 3),
'odom_y': round(odom_y, 3),
'gps_status': self._fix.status.status,
}
self._waypoints.append(wp)
self._last_wp = wp
name = self.get_parameter('route_name').value
self.get_logger().debug(
f'route_recorder: wp #{len(self._waypoints)} '
f'lat={lat:.6f} lon={lon:.6f} hdg={heading_deg:.0f}° spd={speed_mps:.1f}m/s'
)
self._pub_status(f'recording:{name}:{len(self._waypoints)}_wps')
# ── Helpers ───────────────────────────────────────────────────────────────
def _total_distance_m(self) -> float:
if len(self._waypoints) < 2:
return 0.0
total = 0.0
for i in range(1, len(self._waypoints)):
a, b = self._waypoints[i - 1], self._waypoints[i]
total += _haversine(a['lat'], a['lon'], b['lat'], b['lon'])
return total
def _duration_s(self) -> float:
if len(self._waypoints) < 2:
return 0.0
return self._waypoints[-1]['t'] - self._waypoints[0]['t']
def _pub_status(self, s: str) -> None:
msg = String()
msg.data = s
self._status_pub.publish(msg)
def main(args=None):
rclpy.init(args=args)
node = RouteRecorderNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.try_shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,389 @@
"""
route_replayer_node Loads and autonomously replays a recorded route.
"Ride once, replay forever." Loads a .jsonl route file, converts GPS
waypoints to map-frame PoseStamped via robot_localization's flat-earth
ENU projection, then sends batches to Nav2's navigate_through_poses action.
Replay strategy:
1. Load route file list of {lat, lon, heading_deg, ...}
2. Convert GPS map-frame ENU (relative to first fix / datum)
3. Subsample waypoints by replay_spacing_m (default 3m)
4. Inject heading from recorded data (preserved via quaternion)
5. Send full waypoint list to Nav2 navigate_through_poses
6. Monitor feedback; pause/resume on demand
7. Odometry correction: compare current odom position to expected odom
position and log drift for future closed-loop correction (V1: log only)
GPS drift handling:
- Use wider goal tolerance (2m default matches SIM7600X CEP)
- Slow down near waypoints (Nav2 DWB sim_time reduces automatically)
- Report GPS covariance in status so operator can monitor quality
Topics subscribed:
/gps/fix sensor_msgs/NavSatFix
/odom nav_msgs/Odometry
/route/pause_cmd std_msgs/Bool True=pause, False=resume
Topics published:
/route/replay_status std_msgs/String state machine status
/route/progress std_msgs/String 'N/M waypoints, D m remaining'
Actions called:
/navigate_through_poses nav2_msgs/action/NavigateThroughPoses
Services:
/route/load std_srvs/Trigger load route named by route_name param
/route/start_replay std_srvs/Trigger begin replay
/route/pause std_srvs/Trigger pause (cancel Nav2 goal, hold position)
/route/stop std_srvs/Trigger abort replay
"""
import json
import math
import os
from typing import Optional
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.executors import MultiThreadedExecutor
from sensor_msgs.msg import NavSatFix, NavSatStatus
from nav_msgs.msg import Odometry
from geometry_msgs.msg import PoseStamped, PoseArray, Pose
from std_msgs.msg import String, Bool
from std_srvs.srv import Trigger
from nav2_msgs.action import NavigateThroughPoses
def _haversine(lat1, lon1, lat2, lon2) -> float:
R = 6_371_000.0
phi1, phi2 = math.radians(lat1), math.radians(lat2)
dphi = math.radians(lat2 - lat1)
dlam = math.radians(lon2 - lon1)
a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
def _latlon_to_enu(lat: float, lon: float, lat0: float, lon0: float):
"""Flat-earth ENU approximation (valid ~10 km)."""
dlat = math.radians(lat - lat0)
dlon = math.radians(lon - lon0)
cos0 = math.cos(math.radians(lat0))
x = dlon * cos0 * 6_371_000.0
y = dlat * 6_371_000.0
return x, y
def _heading_to_quat(heading_deg: float) -> tuple:
"""Convert heading (degrees, 0=North, CW) to ENU quaternion (0=East, CCW)."""
# In ENU: x=East, y=North. yaw=0 → facing East.
# Heading: 0=North, 90=East. Convert: yaw_enu = 90 - heading_deg
yaw = math.radians(90.0 - heading_deg)
return (0.0, 0.0, math.sin(yaw / 2), math.cos(yaw / 2)) # x, y, z, w
class RouteReplayerNode(Node):
def __init__(self):
super().__init__('route_replayer')
self.declare_parameter('save_dir', '/data/routes')
self.declare_parameter('route_name', 'route')
self.declare_parameter('replay_spacing_m', 3.0) # subsample spacing
self.declare_parameter('goal_tolerance_xy', 2.0) # GPS ±2.5m CEP
self.declare_parameter('replay_speed_factor', 1.0) # 1.0 = same speed as recorded
self.declare_parameter('map_frame', 'map')
self.declare_parameter('nav_timeout_s', 600.0) # 10 min
self._route: Optional[dict] = None # loaded route metadata
self._fix: Optional[NavSatFix] = None
self._odom: Optional[Odometry] = None
self._paused = False
self._active = False
self._nav_goal_handle = None
self._cb_group = ReentrantCallbackGroup()
# ── Subscriptions ────────────────────────────────────────────────────
self.create_subscription(NavSatFix, '/gps/fix', self._fix_cb, 10)
self.create_subscription(Odometry, '/odom', self._odom_cb, 10)
self.create_subscription(Bool, '/route/pause_cmd', self._pause_cb, 10)
# ── Publishers ───────────────────────────────────────────────────────
self._status_pub = self.create_publisher(String, '/route/replay_status', 10)
self._progress_pub = self.create_publisher(String, '/route/progress', 10)
# ── Services ─────────────────────────────────────────────────────────
self.create_service(Trigger, '/route/load', self._load_cb,
callback_group=self._cb_group)
self.create_service(Trigger, '/route/start_replay', self._start_cb,
callback_group=self._cb_group)
self.create_service(Trigger, '/route/pause', self._pause_svc_cb,
callback_group=self._cb_group)
self.create_service(Trigger, '/route/stop', self._stop_cb,
callback_group=self._cb_group)
# ── Nav2 action client ───────────────────────────────────────────────
self._nav_client = ActionClient(
self, NavigateThroughPoses, '/navigate_through_poses',
callback_group=self._cb_group,
)
# ── Progress timer ────────────────────────────────────────────────────
self.create_timer(2.0, self._publish_progress)
self._pub_status('idle')
self.get_logger().info('route_replayer: ready. Call /route/load then /route/start_replay.')
# ── Sensor callbacks ──────────────────────────────────────────────────────
def _fix_cb(self, msg: NavSatFix) -> None:
self._fix = msg
def _odom_cb(self, msg: Odometry) -> None:
self._odom = msg
def _pause_cb(self, msg: Bool) -> None:
if msg.data and not self._paused:
self._pause_replay()
elif not msg.data and self._paused:
self._resume_replay()
# ── Service handlers ──────────────────────────────────────────────────────
def _load_cb(self, _req, response):
name = self.get_parameter('route_name').value
dir_ = self.get_parameter('save_dir').value
path = os.path.join(dir_, f'{name}.jsonl')
if not os.path.exists(path):
response.success = False
response.message = f'Route file not found: {path}'
return response
with open(path) as f:
self._route = json.load(f)
n = self._route.get('waypoint_count', len(self._route.get('waypoints', [])))
dist = self._route.get('total_distance_m', 0.0)
dur = self._route.get('duration_s', 0.0)
self.get_logger().info(
f'route_replayer: loaded "{name}"{n} waypoints, '
f'{dist:.0f} m, {dur:.0f} s'
)
self._pub_status(f'loaded:{name}:{n}_waypoints:{dist:.0f}m')
response.success = True
response.message = f'Loaded: {name} ({n} waypoints, {dist:.0f} m, {dur:.0f} s)'
return response
def _start_cb(self, _req, response):
if self._route is None:
response.success = False
response.message = 'No route loaded — call /route/load first'
return response
if self._active:
response.success = False
response.message = 'Replay already active'
return response
if self._fix is None:
response.success = False
response.message = 'No GPS fix'
return response
if self._fix.status.status < NavSatStatus.STATUS_FIX:
response.success = False
response.message = f'GPS quality too low: status={self._fix.status.status}'
return response
# Kick off async nav
self.create_timer(0.1, self._send_nav_goal, callback_group=self._cb_group)
response.success = True
name = self._route.get('name', '?')
response.message = f'Starting replay of "{name}"'
return response
def _pause_svc_cb(self, _req, response):
if self._paused:
self._resume_replay()
response.message = 'Replay resumed'
else:
self._pause_replay()
response.message = 'Replay paused'
response.success = True
return response
def _stop_cb(self, _req, response):
self._cancel_nav('stop requested')
response.success = True
response.message = 'Replay stopped'
return response
# ── Navigation ────────────────────────────────────────────────────────────
def _send_nav_goal(self) -> None:
"""One-shot timer: convert route waypoints → Nav2 goal."""
self.destroy_timer(list(self.timers)[-1])
if not self._nav_client.wait_for_server(timeout_sec=5.0):
self.get_logger().error('Nav2 not available')
self._pub_status('error:nav2_unavailable')
return
waypoints = self._route.get('waypoints', [])
if not waypoints:
self.get_logger().error('Route has no waypoints')
self._pub_status('error:empty_route')
return
# Use current GPS fix as datum for ENU origin
lat0 = self._fix.latitude
lon0 = self._fix.longitude
# Subsample by spacing
spacing = self.get_parameter('replay_spacing_m').value
sampled = self._subsample_waypoints(waypoints, spacing)
frame = self.get_parameter('map_frame').value
now = self.get_clock().now().to_msg()
goal = NavigateThroughPoses.Goal()
for wp in sampled:
x, y = _latlon_to_enu(wp['lat'], wp['lon'], lat0, lon0)
qx, qy, qz, qw = _heading_to_quat(wp.get('heading_deg', 0.0))
ps = PoseStamped()
ps.header.stamp = now
ps.header.frame_id = frame
ps.pose.position.x = x
ps.pose.position.y = y
ps.pose.orientation.x = qx
ps.pose.orientation.y = qy
ps.pose.orientation.z = qz
ps.pose.orientation.w = qw
goal.poses.append(ps)
name = self._route.get('name', '?')
self.get_logger().info(
f'route_replayer: sending {len(goal.poses)} waypoints to Nav2 '
f'(from {len(waypoints)} recorded, datum={lat0:.6f},{lon0:.6f})'
)
self._pub_status(f'replaying:{name}:{len(goal.poses)}_waypoints')
self._active = True
self._paused = False
future = self._nav_client.send_goal_async(
goal,
feedback_callback=self._nav_feedback_cb,
)
future.add_done_callback(self._nav_goal_response_cb)
def _nav_goal_response_cb(self, future) -> None:
self._nav_goal_handle = future.result()
if not self._nav_goal_handle.accepted:
self.get_logger().error('Nav2 rejected replay goal')
self._active = False
self._pub_status('error:goal_rejected')
return
result_future = self._nav_goal_handle.get_result_async()
result_future.add_done_callback(self._nav_result_cb)
def _nav_feedback_cb(self, feedback) -> None:
fb = feedback.feedback
remaining = getattr(fb, 'number_of_poses_remaining', '?')
dist = getattr(fb, 'distance_remaining', None)
if dist is not None:
self.get_logger().info(
f'replay: {remaining} waypoints remaining, {dist:.1f} m to go',
throttle_duration_sec=10.0,
)
def _nav_result_cb(self, future) -> None:
self._active = False
result = future.result()
name = self._route.get('name', '?') if self._route else '?'
if result.status == 4:
self.get_logger().info(f'Replay of "{name}" completed successfully')
self._pub_status(f'completed:{name}')
else:
self.get_logger().warn(f'Replay ended with status {result.status}')
self._pub_status(f'failed:{name}:status={result.status}')
def _pause_replay(self) -> None:
if self._nav_goal_handle and self._active:
self._nav_goal_handle.cancel_goal_async()
self._paused = True
self._active = False
self._pub_status('paused')
self.get_logger().info('route_replayer: paused')
def _resume_replay(self) -> None:
"""Re-issue the nav goal from current position (simple re-start)."""
self._paused = False
if self._route:
self.create_timer(0.1, self._send_nav_goal, callback_group=self._cb_group)
self._pub_status('resuming')
self.get_logger().info('route_replayer: resuming')
def _cancel_nav(self, reason: str) -> None:
if self._nav_goal_handle and self._active:
self._nav_goal_handle.cancel_goal_async()
self._active = False
self._paused = False
self._pub_status(f'stopped:{reason}')
self.get_logger().info(f'route_replayer: stopped — {reason}')
# ── Helpers ───────────────────────────────────────────────────────────────
def _subsample_waypoints(self, waypoints: list, spacing_m: float) -> list:
"""Keep waypoints at least spacing_m apart. Always keep first + last."""
if not waypoints:
return []
out = [waypoints[0]]
for wp in waypoints[1:-1]:
prev = out[-1]
d = _haversine(wp['lat'], wp['lon'], prev['lat'], prev['lon'])
if d >= spacing_m:
out.append(wp)
if len(waypoints) > 1:
out.append(waypoints[-1])
return out
def _publish_progress(self) -> None:
if not self._active or self._route is None:
return
name = self._route.get('name', '?')
n = self._route.get('waypoint_count', '?')
dist = self._route.get('total_distance_m', 0.0)
msg = String()
msg.data = f'replaying:{name} total={n}wps dist={dist:.0f}m'
self._progress_pub.publish(msg)
def _pub_status(self, s: str) -> None:
msg = String()
msg.data = s
self._status_pub.publish(msg)
def main(args=None):
rclpy.init(args=args)
node = RouteReplayerNode()
executor = MultiThreadedExecutor()
executor.add_node(node)
try:
executor.spin()
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.try_shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/saltybot_routes
[install]
install_scripts=$base/lib/saltybot_routes

View File

@ -0,0 +1,34 @@
import os
from glob import glob
from setuptools import find_packages, setup
package_name = 'saltybot_routes'
setup(
name=package_name,
version='0.1.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
(os.path.join('share', package_name, 'launch'),
glob('launch/*.py')),
(os.path.join('share', package_name, 'config'),
glob('config/*.yaml')),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='seb',
maintainer_email='seb@saltylab.io',
description='Route recording and autonomous replay — ride once, replay forever.',
license='MIT',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'route_recorder_node = saltybot_routes.route_recorder_node:main',
'route_replayer_node = saltybot_routes.route_replayer_node:main',
'route_manager_node = saltybot_routes.route_manager_node:main',
],
},
)