feat(social): personal space respector node (Issue #310) #314
@ -0,0 +1,10 @@
|
||||
personal_space_node:
|
||||
ros__parameters:
|
||||
personal_space_m: 0.8 # Minimum comfortable distance (m)
|
||||
backup_speed: 0.1 # Retreat linear speed (m/s, applied as -x)
|
||||
hysteresis_m: 0.1 # Hysteresis band above threshold (m)
|
||||
unknown_distance_m: 99.0 # Distance assumed for faces without a PersonState
|
||||
lost_timeout_s: 1.5 # Face freshness window (s)
|
||||
control_rate: 10.0 # Control loop / publish rate (Hz)
|
||||
faces_topic: "/social/faces/detected"
|
||||
states_topic: "/social/person_states"
|
||||
@ -0,0 +1,39 @@
|
||||
"""personal_space.launch.py -- Launch personal space respector (Issue #310).
|
||||
|
||||
Usage:
|
||||
ros2 launch saltybot_social personal_space.launch.py
|
||||
ros2 launch saltybot_social personal_space.launch.py personal_space_m:=1.0 backup_speed:=0.15
|
||||
"""
|
||||
|
||||
import os
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
from launch import LaunchDescription
|
||||
from launch.actions import DeclareLaunchArgument
|
||||
from launch.substitutions import LaunchConfiguration
|
||||
from launch_ros.actions import Node
|
||||
|
||||
|
||||
def generate_launch_description():
|
||||
pkg = get_package_share_directory("saltybot_social")
|
||||
cfg = os.path.join(pkg, "config", "personal_space_params.yaml")
|
||||
|
||||
return LaunchDescription([
|
||||
DeclareLaunchArgument("personal_space_m", default_value="0.8",
|
||||
description="Minimum comfortable distance (m)"),
|
||||
DeclareLaunchArgument("backup_speed", default_value="0.1",
|
||||
description="Retreat linear speed (m/s)"),
|
||||
|
||||
Node(
|
||||
package="saltybot_social",
|
||||
executable="personal_space_node",
|
||||
name="personal_space_node",
|
||||
output="screen",
|
||||
parameters=[
|
||||
cfg,
|
||||
{
|
||||
"personal_space_m": LaunchConfiguration("personal_space_m"),
|
||||
"backup_speed": LaunchConfiguration("backup_speed"),
|
||||
},
|
||||
],
|
||||
),
|
||||
])
|
||||
@ -0,0 +1,211 @@
|
||||
"""personal_space_node.py — Personal space respector.
|
||||
Issue #310
|
||||
|
||||
Monitors the distance to detected faces. When the closest person
|
||||
enters within ``personal_space_m`` metres (default 0.8 m) the robot
|
||||
slowly backs up (``Twist.linear.x = -backup_speed``) and latches
|
||||
/saltybot/too_close to True. Once the person retreats beyond
|
||||
``personal_space_m + hysteresis_m`` the robot stops and the latch clears.
|
||||
|
||||
Distance is read from /social/person_states (PersonStateArray, face_id →
|
||||
distance). When a face has no matching PersonState entry the distance is
|
||||
treated as ``unknown_distance_m`` (default: very large → safe, no backup).
|
||||
|
||||
Subscriptions
|
||||
─────────────
|
||||
/social/faces/detected saltybot_social_msgs/FaceDetectionArray
|
||||
/social/person_states saltybot_social_msgs/PersonStateArray
|
||||
|
||||
Publications
|
||||
────────────
|
||||
/cmd_vel geometry_msgs/Twist — linear.x only; angular.z = 0
|
||||
/saltybot/too_close std_msgs/Bool — True while backing up
|
||||
|
||||
State machine
|
||||
─────────────
|
||||
CLEAR — no face within personal_space_m; publish zero Twist + False
|
||||
TOO_CLOSE — face within personal_space_m; publish backup Twist + True
|
||||
clears only when distance > personal_space_m + hysteresis_m
|
||||
|
||||
Parameters
|
||||
──────────
|
||||
personal_space_m (float, 0.8) minimum comfortable distance (m)
|
||||
backup_speed (float, 0.1) retreat linear speed (m/s, applied as -x)
|
||||
hysteresis_m (float, 0.1) hysteresis band above threshold (m)
|
||||
unknown_distance_m (float, 99.0) distance assumed for faces without state
|
||||
lost_timeout_s (float, 1.5) face freshness window (s)
|
||||
control_rate (float, 10.0) publish rate (Hz)
|
||||
faces_topic (str, "/social/faces/detected")
|
||||
states_topic (str, "/social/person_states")
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
import threading
|
||||
from typing import Dict
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.qos import QoSProfile
|
||||
from std_msgs.msg import Bool
|
||||
from geometry_msgs.msg import Twist
|
||||
|
||||
try:
|
||||
from saltybot_social_msgs.msg import FaceDetectionArray, PersonStateArray
|
||||
_MSGS = True
|
||||
except ImportError:
|
||||
_MSGS = False
|
||||
|
||||
|
||||
# ── Pure helpers ───────────────────────────────────────────────────────────────
|
||||
|
||||
def closest_face_distance(face_ids, distance_cache: Dict[int, float],
|
||||
unknown_distance_m: float) -> float:
|
||||
"""Return the minimum distance across all visible face_ids.
|
||||
|
||||
Falls back to *unknown_distance_m* for any face_id not in the cache.
|
||||
Returns *unknown_distance_m* when *face_ids* is empty.
|
||||
"""
|
||||
if not face_ids:
|
||||
return unknown_distance_m
|
||||
return min(distance_cache.get(fid, unknown_distance_m) for fid in face_ids)
|
||||
|
||||
|
||||
def should_backup(distance: float, personal_space_m: float) -> bool:
|
||||
return distance <= personal_space_m
|
||||
|
||||
|
||||
def should_clear(distance: float, personal_space_m: float,
|
||||
hysteresis_m: float) -> bool:
|
||||
return distance > personal_space_m + hysteresis_m
|
||||
|
||||
|
||||
# ── ROS2 node ──────────────────────────────────────────────────────────────────
|
||||
|
||||
class PersonalSpaceNode(Node):
|
||||
"""Backs the robot away when a person enters personal space."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__("personal_space_node")
|
||||
|
||||
self.declare_parameter("personal_space_m", 0.8)
|
||||
self.declare_parameter("backup_speed", 0.1)
|
||||
self.declare_parameter("hysteresis_m", 0.1)
|
||||
self.declare_parameter("unknown_distance_m", 99.0)
|
||||
self.declare_parameter("lost_timeout_s", 1.5)
|
||||
self.declare_parameter("control_rate", 10.0)
|
||||
self.declare_parameter("faces_topic", "/social/faces/detected")
|
||||
self.declare_parameter("states_topic", "/social/person_states")
|
||||
|
||||
self._space = self.get_parameter("personal_space_m").value
|
||||
self._backup_speed = self.get_parameter("backup_speed").value
|
||||
self._hysteresis = self.get_parameter("hysteresis_m").value
|
||||
self._unknown_dist = self.get_parameter("unknown_distance_m").value
|
||||
self._lost_t = self.get_parameter("lost_timeout_s").value
|
||||
rate = self.get_parameter("control_rate").value
|
||||
faces_topic = self.get_parameter("faces_topic").value
|
||||
states_topic = self.get_parameter("states_topic").value
|
||||
|
||||
# State
|
||||
self._too_close: bool = False
|
||||
self._distance_cache: Dict[int, float] = {} # face_id → distance (m)
|
||||
self._active_face_ids: list = [] # face_ids in last msg
|
||||
self._last_face_t: float = 0.0
|
||||
self._lock = threading.Lock()
|
||||
|
||||
qos = QoSProfile(depth=10)
|
||||
self._cmd_pub = self.create_publisher(Twist, "/cmd_vel", qos)
|
||||
self._too_close_pub = self.create_publisher(Bool, "/saltybot/too_close", qos)
|
||||
|
||||
if _MSGS:
|
||||
self._states_sub = self.create_subscription(
|
||||
PersonStateArray, states_topic, self._on_person_states, qos
|
||||
)
|
||||
self._faces_sub = self.create_subscription(
|
||||
FaceDetectionArray, faces_topic, self._on_faces, qos
|
||||
)
|
||||
else:
|
||||
self.get_logger().warn(
|
||||
"saltybot_social_msgs not available — node passive"
|
||||
)
|
||||
|
||||
self._timer = self.create_timer(1.0 / rate, self._control_cb)
|
||||
|
||||
self.get_logger().info(
|
||||
f"PersonalSpaceNode ready "
|
||||
f"(space={self._space}m, backup={self._backup_speed}m/s, "
|
||||
f"hysteresis={self._hysteresis}m)"
|
||||
)
|
||||
|
||||
# ── Subscription callbacks ─────────────────────────────────────────────
|
||||
|
||||
def _on_person_states(self, msg) -> None:
|
||||
with self._lock:
|
||||
for ps in msg.persons:
|
||||
if ps.face_id >= 0:
|
||||
self._distance_cache[ps.face_id] = float(ps.distance)
|
||||
|
||||
def _on_faces(self, msg) -> None:
|
||||
face_ids = [int(f.face_id) for f in msg.faces]
|
||||
with self._lock:
|
||||
self._active_face_ids = face_ids
|
||||
if face_ids:
|
||||
self._last_face_t = time.monotonic()
|
||||
|
||||
# ── Control loop ───────────────────────────────────────────────────────
|
||||
|
||||
def _control_cb(self) -> None:
|
||||
now = time.monotonic()
|
||||
with self._lock:
|
||||
face_ids = list(self._active_face_ids)
|
||||
last_face_t = self._last_face_t
|
||||
cache = dict(self._distance_cache)
|
||||
|
||||
face_fresh = last_face_t > 0.0 and (now - last_face_t) < self._lost_t
|
||||
|
||||
if not face_fresh:
|
||||
# No fresh face data → clear state, stop
|
||||
if self._too_close:
|
||||
self._too_close = False
|
||||
self.get_logger().info("PersonalSpace: face lost — clearing")
|
||||
self._publish(backup=False)
|
||||
return
|
||||
|
||||
dist = closest_face_distance(face_ids, cache, self._unknown_dist)
|
||||
|
||||
if not self._too_close and should_backup(dist, self._space):
|
||||
self._too_close = True
|
||||
self.get_logger().warn(
|
||||
f"PersonalSpace: TOO CLOSE ({dist:.2f}m <= {self._space}m) — backing up"
|
||||
)
|
||||
elif self._too_close and should_clear(dist, self._space, self._hysteresis):
|
||||
self._too_close = False
|
||||
self.get_logger().info(
|
||||
f"PersonalSpace: CLEAR ({dist:.2f}m > {self._space + self._hysteresis:.2f}m)"
|
||||
)
|
||||
|
||||
self._publish(backup=self._too_close)
|
||||
|
||||
def _publish(self, backup: bool) -> None:
|
||||
twist = Twist()
|
||||
if backup:
|
||||
twist.linear.x = -self._backup_speed
|
||||
|
||||
bool_msg = Bool()
|
||||
bool_msg.data = backup
|
||||
|
||||
self._cmd_pub.publish(twist)
|
||||
self._too_close_pub.publish(bool_msg)
|
||||
|
||||
|
||||
def main(args=None) -> None:
|
||||
rclpy.init(args=args)
|
||||
node = PersonalSpaceNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
@ -55,6 +55,8 @@ setup(
|
||||
'volume_adjust_node = saltybot_social.volume_adjust_node:main',
|
||||
# Conversation topic memory (Issue #299)
|
||||
'topic_memory_node = saltybot_social.topic_memory_node:main',
|
||||
# Personal space respector (Issue #310)
|
||||
'personal_space_node = saltybot_social.personal_space_node:main',
|
||||
],
|
||||
},
|
||||
)
|
||||
|
||||
666
jetson/ros2_ws/src/saltybot_social/test/test_personal_space.py
Normal file
666
jetson/ros2_ws/src/saltybot_social/test/test_personal_space.py
Normal file
@ -0,0 +1,666 @@
|
||||
"""test_personal_space.py — Offline tests for personal_space_node (Issue #310).
|
||||
|
||||
Stubs out rclpy and ROS message types so tests run without a ROS install.
|
||||
"""
|
||||
|
||||
import importlib.util
|
||||
import sys
|
||||
import time
|
||||
import types
|
||||
import unittest
|
||||
|
||||
|
||||
# ── ROS2 stubs ────────────────────────────────────────────────────────────────
|
||||
|
||||
def _make_ros_stubs():
|
||||
for mod_name in ("rclpy", "rclpy.node", "rclpy.qos",
|
||||
"std_msgs", "std_msgs.msg",
|
||||
"geometry_msgs", "geometry_msgs.msg",
|
||||
"saltybot_social_msgs", "saltybot_social_msgs.msg"):
|
||||
if mod_name not in sys.modules:
|
||||
sys.modules[mod_name] = types.ModuleType(mod_name)
|
||||
|
||||
class _Node:
|
||||
def __init__(self, name="node"):
|
||||
self._name = name
|
||||
if not hasattr(self, "_params"):
|
||||
self._params = {}
|
||||
self._pubs = {}
|
||||
self._subs = {}
|
||||
self._logs = []
|
||||
self._timers = []
|
||||
|
||||
def declare_parameter(self, name, default):
|
||||
if name not in self._params:
|
||||
self._params[name] = default
|
||||
|
||||
def get_parameter(self, name):
|
||||
class _P:
|
||||
def __init__(self, v): self.value = v
|
||||
return _P(self._params.get(name))
|
||||
|
||||
def create_publisher(self, msg_type, topic, qos):
|
||||
pub = _FakePub()
|
||||
self._pubs[topic] = pub
|
||||
return pub
|
||||
|
||||
def create_subscription(self, msg_type, topic, cb, qos):
|
||||
self._subs[topic] = cb
|
||||
return object()
|
||||
|
||||
def create_timer(self, period, cb):
|
||||
self._timers.append(cb)
|
||||
return object()
|
||||
|
||||
def get_logger(self):
|
||||
node = self
|
||||
class _L:
|
||||
def info(self, m): node._logs.append(("INFO", m))
|
||||
def warn(self, m): node._logs.append(("WARN", m))
|
||||
def error(self, m): node._logs.append(("ERROR", m))
|
||||
return _L()
|
||||
|
||||
def destroy_node(self): pass
|
||||
|
||||
class _FakePub:
|
||||
def __init__(self):
|
||||
self.msgs = []
|
||||
def publish(self, msg):
|
||||
self.msgs.append(msg)
|
||||
|
||||
class _QoSProfile:
|
||||
def __init__(self, depth=10): self.depth = depth
|
||||
|
||||
class _Bool:
|
||||
def __init__(self): self.data = False
|
||||
|
||||
class _TwistLinear:
|
||||
def __init__(self): self.x = 0.0
|
||||
|
||||
class _TwistAngular:
|
||||
def __init__(self): self.z = 0.0
|
||||
|
||||
class _Twist:
|
||||
def __init__(self):
|
||||
self.linear = _TwistLinear()
|
||||
self.angular = _TwistAngular()
|
||||
|
||||
class _FaceDetection:
|
||||
def __init__(self, face_id=0):
|
||||
self.face_id = face_id
|
||||
|
||||
class _PersonState:
|
||||
def __init__(self, face_id=0, distance=1.0):
|
||||
self.face_id = face_id
|
||||
self.distance = distance
|
||||
|
||||
class _FaceDetectionArray:
|
||||
def __init__(self, faces=None):
|
||||
self.faces = faces or []
|
||||
|
||||
class _PersonStateArray:
|
||||
def __init__(self, persons=None):
|
||||
self.persons = persons or []
|
||||
|
||||
rclpy_mod = sys.modules["rclpy"]
|
||||
rclpy_mod.init = lambda args=None: None
|
||||
rclpy_mod.spin = lambda node: None
|
||||
rclpy_mod.shutdown = lambda: None
|
||||
|
||||
sys.modules["rclpy.node"].Node = _Node
|
||||
sys.modules["rclpy.qos"].QoSProfile = _QoSProfile
|
||||
sys.modules["std_msgs.msg"].Bool = _Bool
|
||||
sys.modules["geometry_msgs.msg"].Twist = _Twist
|
||||
sys.modules["saltybot_social_msgs.msg"].FaceDetectionArray = _FaceDetectionArray
|
||||
sys.modules["saltybot_social_msgs.msg"].PersonStateArray = _PersonStateArray
|
||||
|
||||
return (_Node, _FakePub, _Bool, _Twist,
|
||||
_FaceDetection, _PersonState,
|
||||
_FaceDetectionArray, _PersonStateArray)
|
||||
|
||||
|
||||
(_Node, _FakePub, _Bool, _Twist,
|
||||
_FaceDetection, _PersonState,
|
||||
_FaceDetectionArray, _PersonStateArray) = _make_ros_stubs()
|
||||
|
||||
|
||||
# ── Module loader ─────────────────────────────────────────────────────────────
|
||||
|
||||
_SRC = (
|
||||
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
|
||||
"saltybot_social/saltybot_social/personal_space_node.py"
|
||||
)
|
||||
|
||||
|
||||
def _load_mod():
|
||||
spec = importlib.util.spec_from_file_location("personal_space_testmod", _SRC)
|
||||
mod = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(mod)
|
||||
return mod
|
||||
|
||||
|
||||
def _make_node(mod, **kwargs):
|
||||
node = mod.PersonalSpaceNode.__new__(mod.PersonalSpaceNode)
|
||||
defaults = {
|
||||
"personal_space_m": 0.8,
|
||||
"backup_speed": 0.1,
|
||||
"hysteresis_m": 0.1,
|
||||
"unknown_distance_m": 99.0,
|
||||
"lost_timeout_s": 1.5,
|
||||
"control_rate": 10.0,
|
||||
"faces_topic": "/social/faces/detected",
|
||||
"states_topic": "/social/person_states",
|
||||
}
|
||||
defaults.update(kwargs)
|
||||
node._params = dict(defaults)
|
||||
mod.PersonalSpaceNode.__init__(node)
|
||||
return node
|
||||
|
||||
|
||||
def _inject_faces(node, face_ids):
|
||||
"""Push a FaceDetectionArray message with the given face_ids."""
|
||||
msg = _FaceDetectionArray(faces=[_FaceDetection(fid) for fid in face_ids])
|
||||
node._subs["/social/faces/detected"](msg)
|
||||
|
||||
|
||||
def _inject_states(node, id_dist_pairs):
|
||||
"""Push a PersonStateArray message with (face_id, distance) pairs."""
|
||||
persons = [_PersonState(fid, dist) for fid, dist in id_dist_pairs]
|
||||
msg = _PersonStateArray(persons=persons)
|
||||
node._subs["/social/person_states"](msg)
|
||||
|
||||
|
||||
# ── Tests: pure helpers ────────────────────────────────────────────────────────
|
||||
|
||||
class TestClosestFaceDistance(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls): cls.mod = _load_mod()
|
||||
|
||||
def _cfd(self, face_ids, cache, unknown=99.0):
|
||||
return self.mod.closest_face_distance(face_ids, cache, unknown)
|
||||
|
||||
def test_empty_face_ids_returns_unknown(self):
|
||||
self.assertEqual(self._cfd([], {}), 99.0)
|
||||
|
||||
def test_single_face_known(self):
|
||||
self.assertAlmostEqual(self._cfd([1], {1: 0.5}), 0.5)
|
||||
|
||||
def test_single_face_unknown(self):
|
||||
self.assertEqual(self._cfd([1], {}), 99.0)
|
||||
|
||||
def test_multiple_faces_min(self):
|
||||
dist = self._cfd([1, 2, 3], {1: 2.0, 2: 0.5, 3: 1.5})
|
||||
self.assertAlmostEqual(dist, 0.5)
|
||||
|
||||
def test_mixed_known_unknown(self):
|
||||
# unknown face treated as unknown_distance_m = 99, so min is the known one
|
||||
dist = self._cfd([1, 2], {1: 0.6}, 99.0)
|
||||
self.assertAlmostEqual(dist, 0.6)
|
||||
|
||||
def test_all_unknown_returns_unknown(self):
|
||||
dist = self._cfd([1, 2], {}, 99.0)
|
||||
self.assertEqual(dist, 99.0)
|
||||
|
||||
def test_custom_unknown_value(self):
|
||||
dist = self._cfd([], {}, unknown=5.0)
|
||||
self.assertEqual(dist, 5.0)
|
||||
|
||||
|
||||
class TestShouldBackup(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls): cls.mod = _load_mod()
|
||||
|
||||
def test_at_threshold_triggers(self):
|
||||
self.assertTrue(self.mod.should_backup(0.8, 0.8))
|
||||
|
||||
def test_below_threshold_triggers(self):
|
||||
self.assertTrue(self.mod.should_backup(0.5, 0.8))
|
||||
|
||||
def test_above_threshold_no_trigger(self):
|
||||
self.assertFalse(self.mod.should_backup(1.0, 0.8))
|
||||
|
||||
def test_just_above_no_trigger(self):
|
||||
self.assertFalse(self.mod.should_backup(0.81, 0.8))
|
||||
|
||||
|
||||
class TestShouldClear(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls): cls.mod = _load_mod()
|
||||
|
||||
def test_above_band_clears(self):
|
||||
# 0.8 + 0.1 = 0.9; 1.0 > 0.9 → clear
|
||||
self.assertTrue(self.mod.should_clear(1.0, 0.8, 0.1))
|
||||
|
||||
def test_at_band_edge_does_not_clear(self):
|
||||
self.assertFalse(self.mod.should_clear(0.9, 0.8, 0.1))
|
||||
|
||||
def test_within_band_does_not_clear(self):
|
||||
self.assertFalse(self.mod.should_clear(0.85, 0.8, 0.1))
|
||||
|
||||
def test_below_threshold_does_not_clear(self):
|
||||
self.assertFalse(self.mod.should_clear(0.5, 0.8, 0.1))
|
||||
|
||||
def test_just_above_band_clears(self):
|
||||
self.assertTrue(self.mod.should_clear(0.91, 0.8, 0.1))
|
||||
|
||||
|
||||
# ── Tests: node init ──────────────────────────────────────────────────────────
|
||||
|
||||
class TestNodeInit(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls): cls.mod = _load_mod()
|
||||
|
||||
def test_instantiates(self):
|
||||
self.assertIsNotNone(_make_node(self.mod))
|
||||
|
||||
def test_cmd_vel_publisher(self):
|
||||
node = _make_node(self.mod)
|
||||
self.assertIn("/cmd_vel", node._pubs)
|
||||
|
||||
def test_too_close_publisher(self):
|
||||
node = _make_node(self.mod)
|
||||
self.assertIn("/saltybot/too_close", node._pubs)
|
||||
|
||||
def test_faces_subscriber(self):
|
||||
node = _make_node(self.mod)
|
||||
self.assertIn("/social/faces/detected", node._subs)
|
||||
|
||||
def test_states_subscriber(self):
|
||||
node = _make_node(self.mod)
|
||||
self.assertIn("/social/person_states", node._subs)
|
||||
|
||||
def test_timer_registered(self):
|
||||
node = _make_node(self.mod)
|
||||
self.assertGreater(len(node._timers), 0)
|
||||
|
||||
def test_too_close_initially_false(self):
|
||||
node = _make_node(self.mod)
|
||||
self.assertFalse(node._too_close)
|
||||
|
||||
def test_distance_cache_empty(self):
|
||||
node = _make_node(self.mod)
|
||||
self.assertEqual(node._distance_cache, {})
|
||||
|
||||
def test_active_face_ids_empty(self):
|
||||
node = _make_node(self.mod)
|
||||
self.assertEqual(node._active_face_ids, [])
|
||||
|
||||
def test_custom_topics(self):
|
||||
node = _make_node(self.mod,
|
||||
faces_topic="/my/faces",
|
||||
states_topic="/my/states")
|
||||
self.assertIn("/my/faces", node._subs)
|
||||
self.assertIn("/my/states", node._subs)
|
||||
|
||||
def test_parameters_stored(self):
|
||||
node = _make_node(self.mod, personal_space_m=1.2, backup_speed=0.2)
|
||||
self.assertAlmostEqual(node._space, 1.2)
|
||||
self.assertAlmostEqual(node._backup_speed, 0.2)
|
||||
|
||||
|
||||
# ── Tests: _on_person_states ──────────────────────────────────────────────────
|
||||
|
||||
class TestOnPersonStates(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls): cls.mod = _load_mod()
|
||||
|
||||
def setUp(self):
|
||||
self.node = _make_node(self.mod)
|
||||
|
||||
def test_single_state_cached(self):
|
||||
_inject_states(self.node, [(1, 1.5)])
|
||||
self.assertAlmostEqual(self.node._distance_cache[1], 1.5)
|
||||
|
||||
def test_multiple_states_cached(self):
|
||||
_inject_states(self.node, [(1, 1.5), (2, 0.6), (3, 2.0)])
|
||||
self.assertAlmostEqual(self.node._distance_cache[1], 1.5)
|
||||
self.assertAlmostEqual(self.node._distance_cache[2], 0.6)
|
||||
self.assertAlmostEqual(self.node._distance_cache[3], 2.0)
|
||||
|
||||
def test_state_updated_on_second_msg(self):
|
||||
_inject_states(self.node, [(1, 1.5)])
|
||||
_inject_states(self.node, [(1, 0.4)])
|
||||
self.assertAlmostEqual(self.node._distance_cache[1], 0.4)
|
||||
|
||||
def test_negative_face_id_ignored(self):
|
||||
_inject_states(self.node, [(-1, 0.5)])
|
||||
self.assertNotIn(-1, self.node._distance_cache)
|
||||
|
||||
def test_zero_face_id_stored(self):
|
||||
_inject_states(self.node, [(0, 0.7)])
|
||||
self.assertIn(0, self.node._distance_cache)
|
||||
|
||||
def test_distance_cast_to_float(self):
|
||||
_inject_states(self.node, [(1, 1)]) # integer distance
|
||||
self.assertIsInstance(self.node._distance_cache[1], float)
|
||||
|
||||
|
||||
# ── Tests: _on_faces ──────────────────────────────────────────────────────────
|
||||
|
||||
class TestOnFaces(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls): cls.mod = _load_mod()
|
||||
|
||||
def setUp(self):
|
||||
self.node = _make_node(self.mod)
|
||||
|
||||
def test_face_ids_stored(self):
|
||||
_inject_faces(self.node, [1, 2, 3])
|
||||
self.assertEqual(sorted(self.node._active_face_ids), [1, 2, 3])
|
||||
|
||||
def test_empty_faces_stored(self):
|
||||
_inject_faces(self.node, [])
|
||||
self.assertEqual(self.node._active_face_ids, [])
|
||||
|
||||
def test_timestamp_updated_on_faces(self):
|
||||
before = time.monotonic()
|
||||
_inject_faces(self.node, [1])
|
||||
self.assertGreaterEqual(self.node._last_face_t, before)
|
||||
|
||||
def test_timestamp_not_updated_on_empty(self):
|
||||
self.node._last_face_t = 0.0
|
||||
_inject_faces(self.node, [])
|
||||
self.assertEqual(self.node._last_face_t, 0.0)
|
||||
|
||||
def test_faces_replaced_not_merged(self):
|
||||
_inject_faces(self.node, [1, 2])
|
||||
_inject_faces(self.node, [3])
|
||||
self.assertEqual(sorted(self.node._active_face_ids), [3])
|
||||
|
||||
|
||||
# ── Tests: control loop ───────────────────────────────────────────────────────
|
||||
|
||||
class TestControlLoop(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls): cls.mod = _load_mod()
|
||||
|
||||
def setUp(self):
|
||||
self.node = _make_node(self.mod, personal_space_m=0.8, backup_speed=0.1,
|
||||
hysteresis_m=0.1, lost_timeout_s=1.5)
|
||||
self.cmd_pub = self.node._pubs["/cmd_vel"]
|
||||
self.close_pub = self.node._pubs["/saltybot/too_close"]
|
||||
|
||||
def _fresh_face(self, face_id=1, distance=1.0):
|
||||
"""Inject a face + state so the node sees a fresh, close/far person."""
|
||||
_inject_states(self.node, [(face_id, distance)])
|
||||
_inject_faces(self.node, [face_id])
|
||||
|
||||
def test_no_face_publishes_zero_twist(self):
|
||||
self.node._control_cb()
|
||||
self.assertEqual(len(self.cmd_pub.msgs), 1)
|
||||
self.assertAlmostEqual(self.cmd_pub.msgs[-1].linear.x, 0.0)
|
||||
|
||||
def test_no_face_publishes_false(self):
|
||||
self.node._control_cb()
|
||||
self.assertFalse(self.close_pub.msgs[-1].data)
|
||||
|
||||
def test_person_far_no_backup(self):
|
||||
self._fresh_face(distance=1.5)
|
||||
self.node._control_cb()
|
||||
self.assertAlmostEqual(self.cmd_pub.msgs[-1].linear.x, 0.0)
|
||||
self.assertFalse(self.close_pub.msgs[-1].data)
|
||||
|
||||
def test_person_at_threshold_triggers_backup(self):
|
||||
self._fresh_face(distance=0.8)
|
||||
self.node._control_cb()
|
||||
self.assertAlmostEqual(self.cmd_pub.msgs[-1].linear.x, -0.1)
|
||||
self.assertTrue(self.close_pub.msgs[-1].data)
|
||||
|
||||
def test_person_too_close_triggers_backup(self):
|
||||
self._fresh_face(distance=0.5)
|
||||
self.node._control_cb()
|
||||
self.assertAlmostEqual(self.cmd_pub.msgs[-1].linear.x, -0.1)
|
||||
self.assertTrue(self.close_pub.msgs[-1].data)
|
||||
|
||||
def test_too_close_state_latches(self):
|
||||
"""State stays TOO_CLOSE even if distance jumps to mid-band."""
|
||||
self._fresh_face(distance=0.5)
|
||||
self.node._control_cb() # enter TOO_CLOSE
|
||||
# Now person is at 0.85 — inside hysteresis band (0.8 < 0.85 < 0.9)
|
||||
_inject_states(self.node, [(1, 0.85)])
|
||||
self.node._control_cb()
|
||||
self.assertTrue(self.node._too_close)
|
||||
self.assertTrue(self.close_pub.msgs[-1].data)
|
||||
|
||||
def test_hysteresis_clears_state(self):
|
||||
"""State clears when distance exceeds personal_space_m + hysteresis_m."""
|
||||
self._fresh_face(distance=0.5)
|
||||
self.node._control_cb() # enter TOO_CLOSE
|
||||
# Person retreats past hysteresis band (> 0.9 m)
|
||||
_inject_states(self.node, [(1, 1.0)])
|
||||
self.node._control_cb()
|
||||
self.assertFalse(self.node._too_close)
|
||||
self.assertFalse(self.close_pub.msgs[-1].data)
|
||||
|
||||
def test_backup_speed_magnitude(self):
|
||||
node = _make_node(self.mod, backup_speed=0.25)
|
||||
_inject_states(node, [(1, 0.3)])
|
||||
_inject_faces(node, [1])
|
||||
node._control_cb()
|
||||
self.assertAlmostEqual(node._pubs["/cmd_vel"].msgs[-1].linear.x, -0.25)
|
||||
|
||||
def test_state_machine_full_cycle(self):
|
||||
"""CLEAR → TOO_CLOSE → CLEAR."""
|
||||
# Start clear
|
||||
self._fresh_face(distance=1.5)
|
||||
self.node._control_cb()
|
||||
self.assertFalse(self.node._too_close)
|
||||
|
||||
# Enter TOO_CLOSE
|
||||
_inject_states(self.node, [(1, 0.5)])
|
||||
self.node._control_cb()
|
||||
self.assertTrue(self.node._too_close)
|
||||
|
||||
# Return to CLEAR
|
||||
_inject_states(self.node, [(1, 1.0)])
|
||||
self.node._control_cb()
|
||||
self.assertFalse(self.node._too_close)
|
||||
|
||||
def test_multiple_faces_closest_used(self):
|
||||
"""Closest face determines the backup decision."""
|
||||
_inject_states(self.node, [(1, 2.0), (2, 0.5)])
|
||||
_inject_faces(self.node, [1, 2])
|
||||
self.node._control_cb()
|
||||
self.assertTrue(self.node._too_close)
|
||||
|
||||
def test_both_far_no_backup(self):
|
||||
_inject_states(self.node, [(1, 1.5), (2, 2.0)])
|
||||
_inject_faces(self.node, [1, 2])
|
||||
self.node._control_cb()
|
||||
self.assertFalse(self.node._too_close)
|
||||
|
||||
def test_unknown_face_uses_unknown_distance(self):
|
||||
"""Face without PersonState gets unknown_distance_m (default=99 → safe)."""
|
||||
# No PersonState injected, so face_id=5 has no cache entry
|
||||
_inject_faces(self.node, [5])
|
||||
self.node._last_face_t = time.monotonic() # mark fresh
|
||||
self.node._control_cb()
|
||||
self.assertFalse(self.node._too_close)
|
||||
|
||||
def test_publishes_on_every_tick(self):
|
||||
self.node._control_cb()
|
||||
self.node._control_cb()
|
||||
self.node._control_cb()
|
||||
self.assertEqual(len(self.cmd_pub.msgs), 3)
|
||||
self.assertEqual(len(self.close_pub.msgs), 3)
|
||||
|
||||
|
||||
# ── Tests: face freshness / lost timeout ──────────────────────────────────────
|
||||
|
||||
class TestFaceFreshness(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls): cls.mod = _load_mod()
|
||||
|
||||
def setUp(self):
|
||||
self.node = _make_node(self.mod, lost_timeout_s=1.5)
|
||||
self.cmd_pub = self.node._pubs["/cmd_vel"]
|
||||
self.close_pub = self.node._pubs["/saltybot/too_close"]
|
||||
|
||||
def test_stale_face_clears_state(self):
|
||||
"""If no fresh face data, state clears and robot stops."""
|
||||
# Manually set backed-up state
|
||||
self.node._too_close = True
|
||||
self.node._last_face_t = time.monotonic() - 10.0 # stale
|
||||
self.node._control_cb()
|
||||
self.assertFalse(self.node._too_close)
|
||||
self.assertAlmostEqual(self.cmd_pub.msgs[-1].linear.x, 0.0)
|
||||
self.assertFalse(self.close_pub.msgs[-1].data)
|
||||
|
||||
def test_fresh_face_active(self):
|
||||
"""Recently received face is treated as fresh."""
|
||||
_inject_states(self.node, [(1, 0.5)])
|
||||
_inject_faces(self.node, [1]) # updates _last_face_t
|
||||
self.node._control_cb()
|
||||
self.assertTrue(self.node._too_close)
|
||||
|
||||
def test_no_face_ever_is_stale(self):
|
||||
"""Never received a face → stale, no backup."""
|
||||
# _last_face_t = 0.0 (default) → not fresh
|
||||
self.node._control_cb()
|
||||
self.assertFalse(self.node._too_close)
|
||||
|
||||
def test_stale_logs_info(self):
|
||||
self.node._too_close = True
|
||||
self.node._last_face_t = time.monotonic() - 100.0
|
||||
self.node._control_cb()
|
||||
infos = [l for l in self.node._logs if l[0] == "INFO"]
|
||||
self.assertTrue(any("face lost" in m.lower() or "clearing" in m.lower()
|
||||
for _, m in infos))
|
||||
|
||||
def test_too_close_entry_logs_warn(self):
|
||||
_inject_states(self.node, [(1, 0.3)])
|
||||
_inject_faces(self.node, [1])
|
||||
self.node._control_cb()
|
||||
warns = [l for l in self.node._logs if l[0] == "WARN"]
|
||||
self.assertTrue(any("too close" in m.lower() or "backing" in m.lower()
|
||||
for _, m in warns))
|
||||
|
||||
def test_clear_logs_info(self):
|
||||
_inject_states(self.node, [(1, 0.3)])
|
||||
_inject_faces(self.node, [1])
|
||||
self.node._control_cb()
|
||||
_inject_states(self.node, [(1, 1.5)])
|
||||
self.node._control_cb()
|
||||
infos = [l for l in self.node._logs if l[0] == "INFO"]
|
||||
self.assertTrue(any("clear" in m.lower() for _, m in infos))
|
||||
|
||||
|
||||
# ── Tests: _publish ───────────────────────────────────────────────────────────
|
||||
|
||||
class TestPublish(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls): cls.mod = _load_mod()
|
||||
|
||||
def setUp(self):
|
||||
self.node = _make_node(self.mod, backup_speed=0.15)
|
||||
self.cmd_pub = self.node._pubs["/cmd_vel"]
|
||||
self.bool_pub = self.node._pubs["/saltybot/too_close"]
|
||||
|
||||
def test_backup_false_zero_linear_x(self):
|
||||
self.node._publish(backup=False)
|
||||
self.assertAlmostEqual(self.cmd_pub.msgs[-1].linear.x, 0.0)
|
||||
|
||||
def test_backup_true_negative_linear_x(self):
|
||||
self.node._publish(backup=True)
|
||||
self.assertAlmostEqual(self.cmd_pub.msgs[-1].linear.x, -0.15)
|
||||
|
||||
def test_backup_false_bool_false(self):
|
||||
self.node._publish(backup=False)
|
||||
self.assertFalse(self.bool_pub.msgs[-1].data)
|
||||
|
||||
def test_backup_true_bool_true(self):
|
||||
self.node._publish(backup=True)
|
||||
self.assertTrue(self.bool_pub.msgs[-1].data)
|
||||
|
||||
def test_angular_z_always_zero(self):
|
||||
self.node._publish(backup=True)
|
||||
self.assertAlmostEqual(self.cmd_pub.msgs[-1].angular.z, 0.0)
|
||||
|
||||
def test_both_topics_published(self):
|
||||
self.node._publish(backup=False)
|
||||
self.assertEqual(len(self.cmd_pub.msgs), 1)
|
||||
self.assertEqual(len(self.bool_pub.msgs), 1)
|
||||
|
||||
|
||||
# ── Tests: source content ─────────────────────────────────────────────────────
|
||||
|
||||
class TestNodeSrc(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
with open(_SRC) as f: cls.src = f.read()
|
||||
|
||||
def test_issue_tag(self): self.assertIn("#310", self.src)
|
||||
def test_cmd_vel_topic(self): self.assertIn("/cmd_vel", self.src)
|
||||
def test_too_close_topic(self): self.assertIn("/saltybot/too_close", self.src)
|
||||
def test_faces_topic(self): self.assertIn("/social/faces/detected", self.src)
|
||||
def test_states_topic(self): self.assertIn("/social/person_states", self.src)
|
||||
def test_closest_face_fn(self): self.assertIn("closest_face_distance", self.src)
|
||||
def test_should_backup_fn(self): self.assertIn("should_backup", self.src)
|
||||
def test_should_clear_fn(self): self.assertIn("should_clear", self.src)
|
||||
def test_hysteresis_param(self): self.assertIn("hysteresis_m", self.src)
|
||||
def test_backup_speed_param(self): self.assertIn("backup_speed", self.src)
|
||||
def test_personal_space_param(self): self.assertIn("personal_space_m", self.src)
|
||||
def test_lost_timeout_param(self): self.assertIn("lost_timeout_s", self.src)
|
||||
def test_control_rate_param(self): self.assertIn("control_rate", self.src)
|
||||
def test_threading_lock(self): self.assertIn("threading.Lock", self.src)
|
||||
def test_linear_x(self): self.assertIn("linear.x", self.src)
|
||||
def test_twist_published(self): self.assertIn("Twist", self.src)
|
||||
def test_bool_published(self): self.assertIn("Bool", self.src)
|
||||
def test_main_defined(self): self.assertIn("def main", self.src)
|
||||
def test_face_detection_array(self): self.assertIn("FaceDetectionArray", self.src)
|
||||
def test_person_state_array(self): self.assertIn("PersonStateArray", self.src)
|
||||
|
||||
|
||||
# ── Tests: config / launch / setup ────────────────────────────────────────────
|
||||
|
||||
class TestConfig(unittest.TestCase):
|
||||
_CONFIG = (
|
||||
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
|
||||
"saltybot_social/config/personal_space_params.yaml"
|
||||
)
|
||||
_LAUNCH = (
|
||||
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
|
||||
"saltybot_social/launch/personal_space.launch.py"
|
||||
)
|
||||
_SETUP = (
|
||||
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
|
||||
"saltybot_social/setup.py"
|
||||
)
|
||||
|
||||
def test_config_exists(self):
|
||||
import os; self.assertTrue(os.path.exists(self._CONFIG))
|
||||
|
||||
def test_config_personal_space_m(self):
|
||||
with open(self._CONFIG) as f: c = f.read()
|
||||
self.assertIn("personal_space_m", c)
|
||||
|
||||
def test_config_backup_speed(self):
|
||||
with open(self._CONFIG) as f: c = f.read()
|
||||
self.assertIn("backup_speed", c)
|
||||
|
||||
def test_config_hysteresis(self):
|
||||
with open(self._CONFIG) as f: c = f.read()
|
||||
self.assertIn("hysteresis_m", c)
|
||||
|
||||
def test_config_lost_timeout(self):
|
||||
with open(self._CONFIG) as f: c = f.read()
|
||||
self.assertIn("lost_timeout_s", c)
|
||||
|
||||
def test_launch_exists(self):
|
||||
import os; self.assertTrue(os.path.exists(self._LAUNCH))
|
||||
|
||||
def test_launch_has_personal_space_arg(self):
|
||||
with open(self._LAUNCH) as f: c = f.read()
|
||||
self.assertIn("personal_space_m", c)
|
||||
|
||||
def test_launch_has_backup_speed_arg(self):
|
||||
with open(self._LAUNCH) as f: c = f.read()
|
||||
self.assertIn("backup_speed", c)
|
||||
|
||||
def test_entry_point_in_setup(self):
|
||||
with open(self._SETUP) as f: c = f.read()
|
||||
self.assertIn("personal_space_node", c)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Loading…
x
Reference in New Issue
Block a user