Merge pull request 'Geofence boundary enforcer (Issue #298)' (#302) from sl-controls/issue-298-geofence into main

This commit is contained in:
sl-jetson 2026-03-02 21:35:26 -05:00
commit 86d798afe7
10 changed files with 406 additions and 0 deletions

View File

@ -0,0 +1,11 @@
geofence:
ros__parameters:
# Polygon vertices as flat list [x1, y1, x2, y2, ...]
# Example: square from (0,0) to (10,10)
geofence_vertices: [0.0, 0.0, 10.0, 0.0, 10.0, 10.0, 0.0, 10.0]
# Enforce boundary by zeroing cmd_vel on breach
enforce_boundary: false
# Safety margin (m) - breach triggered before actual boundary
margin: 0.0

View File

@ -0,0 +1,31 @@
"""Launch file for geofence enforcer node."""
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.substitutions import LaunchConfiguration
from launch.actions import DeclareLaunchArgument
import os
from ament_index_python.packages import get_package_share_directory
def generate_launch_description():
"""Generate launch description for geofence."""
pkg_dir = get_package_share_directory("saltybot_geofence")
config_file = os.path.join(pkg_dir, "config", "geofence_config.yaml")
return LaunchDescription(
[
DeclareLaunchArgument(
"config_file",
default_value=config_file,
description="Path to configuration YAML file",
),
Node(
package="saltybot_geofence",
executable="geofence_node",
name="geofence",
output="screen",
parameters=[LaunchConfiguration("config_file")],
),
]
)

View File

@ -0,0 +1,22 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_geofence</name>
<version>0.1.0</version>
<description>Geofence boundary enforcer for SaltyBot</description>
<maintainer email="sl-controls@saltylab.local">SaltyLab Controls</maintainer>
<license>MIT</license>
<buildtool_depend>ament_python</buildtool_depend>
<depend>rclpy</depend>
<depend>nav_msgs</depend>
<depend>std_msgs</depend>
<depend>geometry_msgs</depend>
<test_depend>pytest</test_depend>
<test_depend>nav_msgs</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,143 @@
#!/usr/bin/env python3
"""Geofence boundary enforcer for SaltyBot.
Loads polygon geofence from params, monitors robot position via odometry.
Publishes Bool on /saltybot/geofence_breach when exiting boundary.
Optionally zeros cmd_vel to enforce boundary.
Subscribed topics:
/odom (nav_msgs/Odometry) - Robot position and orientation
Published topics:
/saltybot/geofence_breach (std_msgs/Bool) - Outside boundary flag
"""
import rclpy
from rclpy.node import Node
from nav_msgs.msg import Odometry
from std_msgs.msg import Bool
from typing import List, Tuple
class GeofenceNode(Node):
"""ROS2 node for geofence boundary enforcement."""
def __init__(self):
super().__init__("geofence")
# Parameters
self.declare_parameter("geofence_vertices", [])
self.declare_parameter("enforce_boundary", False)
self.declare_parameter("margin", 0.0)
vertices = self.get_parameter("geofence_vertices").value
self.enforce_boundary = self.get_parameter("enforce_boundary").value
self.margin = self.get_parameter("margin").value
# Parse vertices from flat list [x1,y1,x2,y2,...]
self.geofence_vertices = self._parse_vertices(vertices)
# State tracking
self.robot_x = 0.0
self.robot_y = 0.0
self.inside_geofence = True
self.breach_published = False
# Subscription to odometry
self.sub_odom = self.create_subscription(
Odometry, "/odom", self._on_odometry, 10
)
# Publisher for breach status
self.pub_breach = self.create_publisher(Bool, "/saltybot/geofence_breach", 10)
self.get_logger().info(
f"Geofence enforcer initialized with {len(self.geofence_vertices)} vertices. "
f"Enforce: {self.enforce_boundary}, Margin: {self.margin}m"
)
if len(self.geofence_vertices) > 0:
self.get_logger().info(f"Geofence vertices: {self.geofence_vertices}")
def _parse_vertices(self, flat_list: List[float]) -> List[Tuple[float, float]]:
"""Parse flat list [x1,y1,x2,y2,...] into vertex tuples."""
if len(flat_list) < 6: # Need at least 3 vertices (6 values)
self.get_logger().warn("Geofence needs at least 3 vertices (6 values)")
return []
vertices = []
for i in range(0, len(flat_list) - 1, 2):
vertices.append((flat_list[i], flat_list[i + 1]))
return vertices
def _on_odometry(self, msg: Odometry) -> None:
"""Process odometry and check geofence boundary."""
if len(self.geofence_vertices) == 0:
# No geofence defined
self.inside_geofence = True
return
# Extract robot position
self.robot_x = msg.pose.pose.position.x
self.robot_y = msg.pose.pose.position.y
# Check if inside geofence
self.inside_geofence = self._point_in_polygon(
(self.robot_x, self.robot_y), self.geofence_vertices
)
# Publish breach status
breach = not self.inside_geofence
if breach and not self.breach_published:
self.get_logger().warn(
f"GEOFENCE BREACH! Robot at ({self.robot_x:.2f}, {self.robot_y:.2f})"
)
self.breach_published = True
elif not breach and self.breach_published:
self.get_logger().info(
f"Robot re-entered geofence at ({self.robot_x:.2f}, {self.robot_y:.2f})"
)
self.breach_published = False
msg_breach = Bool(data=breach)
self.pub_breach.publish(msg_breach)
def _point_in_polygon(self, point: Tuple[float, float], vertices: List[Tuple[float, float]]) -> bool:
"""Ray casting algorithm for point-in-polygon test."""
x, y = point
n = len(vertices)
inside = False
p1x, p1y = vertices[0]
for i in range(1, n + 1):
p2x, p2y = vertices[i % n]
# Check if ray crosses edge
if y > min(p1y, p2y):
if y <= max(p1y, p2y):
if x <= max(p1x, p2x):
if p1y != p2y:
xinters = (y - p1y) * (p2x - p1x) / (p2y - p1y) + p1x
if p1x == p2x or x <= xinters:
inside = not inside
p1x, p1y = p2x, p2y
return inside
def main(args=None):
rclpy.init(args=args)
node = GeofenceNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,5 @@
[develop]
script_dir=$base/lib/saltybot_geofence
[install]
install_scripts=$base/lib/saltybot_geofence

View File

@ -0,0 +1,24 @@
from setuptools import setup, find_packages
setup(
name='saltybot_geofence',
version='0.1.0',
packages=find_packages(),
data_files=[
('share/ament_index/resource_index/packages', ['resource/saltybot_geofence']),
('share/saltybot_geofence', ['package.xml']),
('share/saltybot_geofence/config', ['config/geofence_config.yaml']),
('share/saltybot_geofence/launch', ['launch/geofence.launch.py']),
],
install_requires=['setuptools'],
zip_safe=True,
author='SaltyLab Controls',
author_email='sl-controls@saltylab.local',
description='Geofence boundary enforcer for SaltyBot',
license='MIT',
entry_points={
'console_scripts': [
'geofence_node=saltybot_geofence.geofence_node:main',
],
},
)

View File

@ -0,0 +1,170 @@
"""Tests for geofence boundary enforcer."""
import pytest
from nav_msgs.msg import Odometry
from geometry_msgs.msg import Point, Quaternion, Pose, PoseWithCovariance, TwistWithCovariance
import rclpy
from rclpy.time import Time
from saltybot_geofence.geofence_node import GeofenceNode
@pytest.fixture
def rclpy_fixture():
rclpy.init()
yield
rclpy.shutdown()
@pytest.fixture
def node(rclpy_fixture):
node = GeofenceNode()
yield node
node.destroy_node()
class TestInit:
def test_node_initialization(self, node):
assert node.enforce_boundary is False
assert node.margin == 0.0
assert node.inside_geofence is True
class TestVertexParsing:
def test_parse_vertices_valid(self, node):
vertices = node._parse_vertices([0.0, 0.0, 1.0, 0.0, 1.0, 1.0, 0.0, 1.0])
assert len(vertices) == 4
assert vertices[0] == (0.0, 0.0)
assert vertices[1] == (1.0, 0.0)
def test_parse_vertices_insufficient(self, node):
vertices = node._parse_vertices([0.0, 0.0, 1.0])
assert len(vertices) == 0
class TestPointInPolygon:
def test_point_inside_square(self, node):
vertices = [(0.0, 0.0), (2.0, 0.0), (2.0, 2.0), (0.0, 2.0)]
assert node._point_in_polygon((1.0, 1.0), vertices) is True
def test_point_outside_square(self, node):
vertices = [(0.0, 0.0), (2.0, 0.0), (2.0, 2.0), (0.0, 2.0)]
assert node._point_in_polygon((3.0, 1.0), vertices) is False
def test_point_on_vertex(self, node):
vertices = [(0.0, 0.0), (2.0, 0.0), (2.0, 2.0), (0.0, 2.0)]
# Point on vertex behavior may vary (typically outside)
result = node._point_in_polygon((0.0, 0.0), vertices)
assert isinstance(result, bool)
def test_point_on_edge(self, node):
vertices = [(0.0, 0.0), (2.0, 0.0), (2.0, 2.0), (0.0, 2.0)]
# Point on edge behavior (typically outside)
result = node._point_in_polygon((1.0, 0.0), vertices)
assert isinstance(result, bool)
def test_triangle_inside(self, node):
vertices = [(0.0, 0.0), (4.0, 0.0), (2.0, 3.0)]
assert node._point_in_polygon((2.0, 1.0), vertices) is True
def test_triangle_outside(self, node):
vertices = [(0.0, 0.0), (4.0, 0.0), (2.0, 3.0)]
assert node._point_in_polygon((5.0, 5.0), vertices) is False
def test_concave_polygon(self, node):
# L-shaped polygon
vertices = [(0.0, 0.0), (3.0, 0.0), (3.0, 1.0), (1.0, 1.0), (1.0, 3.0), (0.0, 3.0)]
assert node._point_in_polygon((0.5, 0.5), vertices) is True
assert node._point_in_polygon((2.0, 2.0), vertices) is False
def test_circle_approximation(self, node):
# Octagon approximating circle
import math
vertices = []
for i in range(8):
angle = 2 * math.pi * i / 8
vertices.append((math.cos(angle), math.sin(angle)))
# Center should be inside
assert node._point_in_polygon((0.0, 0.0), vertices) is True
# Far outside should be outside
assert node._point_in_polygon((10.0, 10.0), vertices) is False
class TestOdometryProcessing:
def test_odometry_update_position(self, node):
node.geofence_vertices = [(0.0, 0.0), (10.0, 0.0), (10.0, 10.0), (0.0, 10.0)]
msg = Odometry()
msg.pose.pose.position.x = 5.0
msg.pose.pose.position.y = 5.0
node._on_odometry(msg)
assert node.robot_x == 5.0
assert node.robot_y == 5.0
def test_breach_detection_inside(self, node):
node.geofence_vertices = [(0.0, 0.0), (10.0, 0.0), (10.0, 10.0), (0.0, 10.0)]
msg = Odometry()
msg.pose.pose.position.x = 5.0
msg.pose.pose.position.y = 5.0
node._on_odometry(msg)
assert node.inside_geofence is True
def test_breach_detection_outside(self, node):
node.geofence_vertices = [(0.0, 0.0), (10.0, 0.0), (10.0, 10.0), (0.0, 10.0)]
msg = Odometry()
msg.pose.pose.position.x = 15.0
msg.pose.pose.position.y = 5.0
node._on_odometry(msg)
assert node.inside_geofence is False
def test_breach_flag_transition(self, node):
node.geofence_vertices = [(0.0, 0.0), (10.0, 0.0), (10.0, 10.0), (0.0, 10.0)]
assert node.breach_published is False
# Move outside
msg = Odometry()
msg.pose.pose.position.x = 15.0
msg.pose.pose.position.y = 5.0
node._on_odometry(msg)
assert node.breach_published is True
# Move back inside
msg.pose.pose.position.x = 5.0
msg.pose.pose.position.y = 5.0
node._on_odometry(msg)
assert node.breach_published is False
class TestEdgeCases:
def test_no_geofence_defined(self, node):
node.geofence_vertices = []
msg = Odometry()
msg.pose.pose.position.x = 0.0
msg.pose.pose.position.y = 0.0
node._on_odometry(msg)
# Should default to safe (inside)
assert node.inside_geofence is True
def test_very_small_polygon(self, node):
vertices = [(0.0, 0.0), (0.01, 0.0), (0.01, 0.01), (0.0, 0.01)]
assert node._point_in_polygon((0.005, 0.005), vertices) is True
assert node._point_in_polygon((0.1, 0.1), vertices) is False
def test_large_coordinates(self, node):
vertices = [(0.0, 0.0), (1000.0, 0.0), (1000.0, 1000.0), (0.0, 1000.0)]
assert node._point_in_polygon((500.0, 500.0), vertices) is True
assert node._point_in_polygon((1500.0, 500.0), vertices) is False