feat(social): personal space respector node (Issue #310) #314

Merged
sl-jetson merged 1 commits from sl-jetson/issue-310-personal-space into main 2026-03-03 00:20:10 -05:00
5 changed files with 928 additions and 0 deletions

View File

@ -0,0 +1,10 @@
personal_space_node:
ros__parameters:
personal_space_m: 0.8 # Minimum comfortable distance (m)
backup_speed: 0.1 # Retreat linear speed (m/s, applied as -x)
hysteresis_m: 0.1 # Hysteresis band above threshold (m)
unknown_distance_m: 99.0 # Distance assumed for faces without a PersonState
lost_timeout_s: 1.5 # Face freshness window (s)
control_rate: 10.0 # Control loop / publish rate (Hz)
faces_topic: "/social/faces/detected"
states_topic: "/social/person_states"

View File

@ -0,0 +1,39 @@
"""personal_space.launch.py -- Launch personal space respector (Issue #310).
Usage:
ros2 launch saltybot_social personal_space.launch.py
ros2 launch saltybot_social personal_space.launch.py personal_space_m:=1.0 backup_speed:=0.15
"""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
pkg = get_package_share_directory("saltybot_social")
cfg = os.path.join(pkg, "config", "personal_space_params.yaml")
return LaunchDescription([
DeclareLaunchArgument("personal_space_m", default_value="0.8",
description="Minimum comfortable distance (m)"),
DeclareLaunchArgument("backup_speed", default_value="0.1",
description="Retreat linear speed (m/s)"),
Node(
package="saltybot_social",
executable="personal_space_node",
name="personal_space_node",
output="screen",
parameters=[
cfg,
{
"personal_space_m": LaunchConfiguration("personal_space_m"),
"backup_speed": LaunchConfiguration("backup_speed"),
},
],
),
])

View File

@ -0,0 +1,211 @@
"""personal_space_node.py — Personal space respector.
Issue #310
Monitors the distance to detected faces. When the closest person
enters within ``personal_space_m`` metres (default 0.8 m) the robot
slowly backs up (``Twist.linear.x = -backup_speed``) and latches
/saltybot/too_close to True. Once the person retreats beyond
``personal_space_m + hysteresis_m`` the robot stops and the latch clears.
Distance is read from /social/person_states (PersonStateArray, face_id
distance). When a face has no matching PersonState entry the distance is
treated as ``unknown_distance_m`` (default: very large safe, no backup).
Subscriptions
/social/faces/detected saltybot_social_msgs/FaceDetectionArray
/social/person_states saltybot_social_msgs/PersonStateArray
Publications
/cmd_vel geometry_msgs/Twist linear.x only; angular.z = 0
/saltybot/too_close std_msgs/Bool True while backing up
State machine
CLEAR no face within personal_space_m; publish zero Twist + False
TOO_CLOSE face within personal_space_m; publish backup Twist + True
clears only when distance > personal_space_m + hysteresis_m
Parameters
personal_space_m (float, 0.8) minimum comfortable distance (m)
backup_speed (float, 0.1) retreat linear speed (m/s, applied as -x)
hysteresis_m (float, 0.1) hysteresis band above threshold (m)
unknown_distance_m (float, 99.0) distance assumed for faces without state
lost_timeout_s (float, 1.5) face freshness window (s)
control_rate (float, 10.0) publish rate (Hz)
faces_topic (str, "/social/faces/detected")
states_topic (str, "/social/person_states")
"""
from __future__ import annotations
import time
import threading
from typing import Dict
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
from std_msgs.msg import Bool
from geometry_msgs.msg import Twist
try:
from saltybot_social_msgs.msg import FaceDetectionArray, PersonStateArray
_MSGS = True
except ImportError:
_MSGS = False
# ── Pure helpers ───────────────────────────────────────────────────────────────
def closest_face_distance(face_ids, distance_cache: Dict[int, float],
unknown_distance_m: float) -> float:
"""Return the minimum distance across all visible face_ids.
Falls back to *unknown_distance_m* for any face_id not in the cache.
Returns *unknown_distance_m* when *face_ids* is empty.
"""
if not face_ids:
return unknown_distance_m
return min(distance_cache.get(fid, unknown_distance_m) for fid in face_ids)
def should_backup(distance: float, personal_space_m: float) -> bool:
return distance <= personal_space_m
def should_clear(distance: float, personal_space_m: float,
hysteresis_m: float) -> bool:
return distance > personal_space_m + hysteresis_m
# ── ROS2 node ──────────────────────────────────────────────────────────────────
class PersonalSpaceNode(Node):
"""Backs the robot away when a person enters personal space."""
def __init__(self) -> None:
super().__init__("personal_space_node")
self.declare_parameter("personal_space_m", 0.8)
self.declare_parameter("backup_speed", 0.1)
self.declare_parameter("hysteresis_m", 0.1)
self.declare_parameter("unknown_distance_m", 99.0)
self.declare_parameter("lost_timeout_s", 1.5)
self.declare_parameter("control_rate", 10.0)
self.declare_parameter("faces_topic", "/social/faces/detected")
self.declare_parameter("states_topic", "/social/person_states")
self._space = self.get_parameter("personal_space_m").value
self._backup_speed = self.get_parameter("backup_speed").value
self._hysteresis = self.get_parameter("hysteresis_m").value
self._unknown_dist = self.get_parameter("unknown_distance_m").value
self._lost_t = self.get_parameter("lost_timeout_s").value
rate = self.get_parameter("control_rate").value
faces_topic = self.get_parameter("faces_topic").value
states_topic = self.get_parameter("states_topic").value
# State
self._too_close: bool = False
self._distance_cache: Dict[int, float] = {} # face_id → distance (m)
self._active_face_ids: list = [] # face_ids in last msg
self._last_face_t: float = 0.0
self._lock = threading.Lock()
qos = QoSProfile(depth=10)
self._cmd_pub = self.create_publisher(Twist, "/cmd_vel", qos)
self._too_close_pub = self.create_publisher(Bool, "/saltybot/too_close", qos)
if _MSGS:
self._states_sub = self.create_subscription(
PersonStateArray, states_topic, self._on_person_states, qos
)
self._faces_sub = self.create_subscription(
FaceDetectionArray, faces_topic, self._on_faces, qos
)
else:
self.get_logger().warn(
"saltybot_social_msgs not available — node passive"
)
self._timer = self.create_timer(1.0 / rate, self._control_cb)
self.get_logger().info(
f"PersonalSpaceNode ready "
f"(space={self._space}m, backup={self._backup_speed}m/s, "
f"hysteresis={self._hysteresis}m)"
)
# ── Subscription callbacks ─────────────────────────────────────────────
def _on_person_states(self, msg) -> None:
with self._lock:
for ps in msg.persons:
if ps.face_id >= 0:
self._distance_cache[ps.face_id] = float(ps.distance)
def _on_faces(self, msg) -> None:
face_ids = [int(f.face_id) for f in msg.faces]
with self._lock:
self._active_face_ids = face_ids
if face_ids:
self._last_face_t = time.monotonic()
# ── Control loop ───────────────────────────────────────────────────────
def _control_cb(self) -> None:
now = time.monotonic()
with self._lock:
face_ids = list(self._active_face_ids)
last_face_t = self._last_face_t
cache = dict(self._distance_cache)
face_fresh = last_face_t > 0.0 and (now - last_face_t) < self._lost_t
if not face_fresh:
# No fresh face data → clear state, stop
if self._too_close:
self._too_close = False
self.get_logger().info("PersonalSpace: face lost — clearing")
self._publish(backup=False)
return
dist = closest_face_distance(face_ids, cache, self._unknown_dist)
if not self._too_close and should_backup(dist, self._space):
self._too_close = True
self.get_logger().warn(
f"PersonalSpace: TOO CLOSE ({dist:.2f}m <= {self._space}m) — backing up"
)
elif self._too_close and should_clear(dist, self._space, self._hysteresis):
self._too_close = False
self.get_logger().info(
f"PersonalSpace: CLEAR ({dist:.2f}m > {self._space + self._hysteresis:.2f}m)"
)
self._publish(backup=self._too_close)
def _publish(self, backup: bool) -> None:
twist = Twist()
if backup:
twist.linear.x = -self._backup_speed
bool_msg = Bool()
bool_msg.data = backup
self._cmd_pub.publish(twist)
self._too_close_pub.publish(bool_msg)
def main(args=None) -> None:
rclpy.init(args=args)
node = PersonalSpaceNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()

View File

@ -55,6 +55,8 @@ setup(
'volume_adjust_node = saltybot_social.volume_adjust_node:main', 'volume_adjust_node = saltybot_social.volume_adjust_node:main',
# Conversation topic memory (Issue #299) # Conversation topic memory (Issue #299)
'topic_memory_node = saltybot_social.topic_memory_node:main', 'topic_memory_node = saltybot_social.topic_memory_node:main',
# Personal space respector (Issue #310)
'personal_space_node = saltybot_social.personal_space_node:main',
], ],
}, },
) )

View File

@ -0,0 +1,666 @@
"""test_personal_space.py — Offline tests for personal_space_node (Issue #310).
Stubs out rclpy and ROS message types so tests run without a ROS install.
"""
import importlib.util
import sys
import time
import types
import unittest
# ── ROS2 stubs ────────────────────────────────────────────────────────────────
def _make_ros_stubs():
for mod_name in ("rclpy", "rclpy.node", "rclpy.qos",
"std_msgs", "std_msgs.msg",
"geometry_msgs", "geometry_msgs.msg",
"saltybot_social_msgs", "saltybot_social_msgs.msg"):
if mod_name not in sys.modules:
sys.modules[mod_name] = types.ModuleType(mod_name)
class _Node:
def __init__(self, name="node"):
self._name = name
if not hasattr(self, "_params"):
self._params = {}
self._pubs = {}
self._subs = {}
self._logs = []
self._timers = []
def declare_parameter(self, name, default):
if name not in self._params:
self._params[name] = default
def get_parameter(self, name):
class _P:
def __init__(self, v): self.value = v
return _P(self._params.get(name))
def create_publisher(self, msg_type, topic, qos):
pub = _FakePub()
self._pubs[topic] = pub
return pub
def create_subscription(self, msg_type, topic, cb, qos):
self._subs[topic] = cb
return object()
def create_timer(self, period, cb):
self._timers.append(cb)
return object()
def get_logger(self):
node = self
class _L:
def info(self, m): node._logs.append(("INFO", m))
def warn(self, m): node._logs.append(("WARN", m))
def error(self, m): node._logs.append(("ERROR", m))
return _L()
def destroy_node(self): pass
class _FakePub:
def __init__(self):
self.msgs = []
def publish(self, msg):
self.msgs.append(msg)
class _QoSProfile:
def __init__(self, depth=10): self.depth = depth
class _Bool:
def __init__(self): self.data = False
class _TwistLinear:
def __init__(self): self.x = 0.0
class _TwistAngular:
def __init__(self): self.z = 0.0
class _Twist:
def __init__(self):
self.linear = _TwistLinear()
self.angular = _TwistAngular()
class _FaceDetection:
def __init__(self, face_id=0):
self.face_id = face_id
class _PersonState:
def __init__(self, face_id=0, distance=1.0):
self.face_id = face_id
self.distance = distance
class _FaceDetectionArray:
def __init__(self, faces=None):
self.faces = faces or []
class _PersonStateArray:
def __init__(self, persons=None):
self.persons = persons or []
rclpy_mod = sys.modules["rclpy"]
rclpy_mod.init = lambda args=None: None
rclpy_mod.spin = lambda node: None
rclpy_mod.shutdown = lambda: None
sys.modules["rclpy.node"].Node = _Node
sys.modules["rclpy.qos"].QoSProfile = _QoSProfile
sys.modules["std_msgs.msg"].Bool = _Bool
sys.modules["geometry_msgs.msg"].Twist = _Twist
sys.modules["saltybot_social_msgs.msg"].FaceDetectionArray = _FaceDetectionArray
sys.modules["saltybot_social_msgs.msg"].PersonStateArray = _PersonStateArray
return (_Node, _FakePub, _Bool, _Twist,
_FaceDetection, _PersonState,
_FaceDetectionArray, _PersonStateArray)
(_Node, _FakePub, _Bool, _Twist,
_FaceDetection, _PersonState,
_FaceDetectionArray, _PersonStateArray) = _make_ros_stubs()
# ── Module loader ─────────────────────────────────────────────────────────────
_SRC = (
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
"saltybot_social/saltybot_social/personal_space_node.py"
)
def _load_mod():
spec = importlib.util.spec_from_file_location("personal_space_testmod", _SRC)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
def _make_node(mod, **kwargs):
node = mod.PersonalSpaceNode.__new__(mod.PersonalSpaceNode)
defaults = {
"personal_space_m": 0.8,
"backup_speed": 0.1,
"hysteresis_m": 0.1,
"unknown_distance_m": 99.0,
"lost_timeout_s": 1.5,
"control_rate": 10.0,
"faces_topic": "/social/faces/detected",
"states_topic": "/social/person_states",
}
defaults.update(kwargs)
node._params = dict(defaults)
mod.PersonalSpaceNode.__init__(node)
return node
def _inject_faces(node, face_ids):
"""Push a FaceDetectionArray message with the given face_ids."""
msg = _FaceDetectionArray(faces=[_FaceDetection(fid) for fid in face_ids])
node._subs["/social/faces/detected"](msg)
def _inject_states(node, id_dist_pairs):
"""Push a PersonStateArray message with (face_id, distance) pairs."""
persons = [_PersonState(fid, dist) for fid, dist in id_dist_pairs]
msg = _PersonStateArray(persons=persons)
node._subs["/social/person_states"](msg)
# ── Tests: pure helpers ────────────────────────────────────────────────────────
class TestClosestFaceDistance(unittest.TestCase):
@classmethod
def setUpClass(cls): cls.mod = _load_mod()
def _cfd(self, face_ids, cache, unknown=99.0):
return self.mod.closest_face_distance(face_ids, cache, unknown)
def test_empty_face_ids_returns_unknown(self):
self.assertEqual(self._cfd([], {}), 99.0)
def test_single_face_known(self):
self.assertAlmostEqual(self._cfd([1], {1: 0.5}), 0.5)
def test_single_face_unknown(self):
self.assertEqual(self._cfd([1], {}), 99.0)
def test_multiple_faces_min(self):
dist = self._cfd([1, 2, 3], {1: 2.0, 2: 0.5, 3: 1.5})
self.assertAlmostEqual(dist, 0.5)
def test_mixed_known_unknown(self):
# unknown face treated as unknown_distance_m = 99, so min is the known one
dist = self._cfd([1, 2], {1: 0.6}, 99.0)
self.assertAlmostEqual(dist, 0.6)
def test_all_unknown_returns_unknown(self):
dist = self._cfd([1, 2], {}, 99.0)
self.assertEqual(dist, 99.0)
def test_custom_unknown_value(self):
dist = self._cfd([], {}, unknown=5.0)
self.assertEqual(dist, 5.0)
class TestShouldBackup(unittest.TestCase):
@classmethod
def setUpClass(cls): cls.mod = _load_mod()
def test_at_threshold_triggers(self):
self.assertTrue(self.mod.should_backup(0.8, 0.8))
def test_below_threshold_triggers(self):
self.assertTrue(self.mod.should_backup(0.5, 0.8))
def test_above_threshold_no_trigger(self):
self.assertFalse(self.mod.should_backup(1.0, 0.8))
def test_just_above_no_trigger(self):
self.assertFalse(self.mod.should_backup(0.81, 0.8))
class TestShouldClear(unittest.TestCase):
@classmethod
def setUpClass(cls): cls.mod = _load_mod()
def test_above_band_clears(self):
# 0.8 + 0.1 = 0.9; 1.0 > 0.9 → clear
self.assertTrue(self.mod.should_clear(1.0, 0.8, 0.1))
def test_at_band_edge_does_not_clear(self):
self.assertFalse(self.mod.should_clear(0.9, 0.8, 0.1))
def test_within_band_does_not_clear(self):
self.assertFalse(self.mod.should_clear(0.85, 0.8, 0.1))
def test_below_threshold_does_not_clear(self):
self.assertFalse(self.mod.should_clear(0.5, 0.8, 0.1))
def test_just_above_band_clears(self):
self.assertTrue(self.mod.should_clear(0.91, 0.8, 0.1))
# ── Tests: node init ──────────────────────────────────────────────────────────
class TestNodeInit(unittest.TestCase):
@classmethod
def setUpClass(cls): cls.mod = _load_mod()
def test_instantiates(self):
self.assertIsNotNone(_make_node(self.mod))
def test_cmd_vel_publisher(self):
node = _make_node(self.mod)
self.assertIn("/cmd_vel", node._pubs)
def test_too_close_publisher(self):
node = _make_node(self.mod)
self.assertIn("/saltybot/too_close", node._pubs)
def test_faces_subscriber(self):
node = _make_node(self.mod)
self.assertIn("/social/faces/detected", node._subs)
def test_states_subscriber(self):
node = _make_node(self.mod)
self.assertIn("/social/person_states", node._subs)
def test_timer_registered(self):
node = _make_node(self.mod)
self.assertGreater(len(node._timers), 0)
def test_too_close_initially_false(self):
node = _make_node(self.mod)
self.assertFalse(node._too_close)
def test_distance_cache_empty(self):
node = _make_node(self.mod)
self.assertEqual(node._distance_cache, {})
def test_active_face_ids_empty(self):
node = _make_node(self.mod)
self.assertEqual(node._active_face_ids, [])
def test_custom_topics(self):
node = _make_node(self.mod,
faces_topic="/my/faces",
states_topic="/my/states")
self.assertIn("/my/faces", node._subs)
self.assertIn("/my/states", node._subs)
def test_parameters_stored(self):
node = _make_node(self.mod, personal_space_m=1.2, backup_speed=0.2)
self.assertAlmostEqual(node._space, 1.2)
self.assertAlmostEqual(node._backup_speed, 0.2)
# ── Tests: _on_person_states ──────────────────────────────────────────────────
class TestOnPersonStates(unittest.TestCase):
@classmethod
def setUpClass(cls): cls.mod = _load_mod()
def setUp(self):
self.node = _make_node(self.mod)
def test_single_state_cached(self):
_inject_states(self.node, [(1, 1.5)])
self.assertAlmostEqual(self.node._distance_cache[1], 1.5)
def test_multiple_states_cached(self):
_inject_states(self.node, [(1, 1.5), (2, 0.6), (3, 2.0)])
self.assertAlmostEqual(self.node._distance_cache[1], 1.5)
self.assertAlmostEqual(self.node._distance_cache[2], 0.6)
self.assertAlmostEqual(self.node._distance_cache[3], 2.0)
def test_state_updated_on_second_msg(self):
_inject_states(self.node, [(1, 1.5)])
_inject_states(self.node, [(1, 0.4)])
self.assertAlmostEqual(self.node._distance_cache[1], 0.4)
def test_negative_face_id_ignored(self):
_inject_states(self.node, [(-1, 0.5)])
self.assertNotIn(-1, self.node._distance_cache)
def test_zero_face_id_stored(self):
_inject_states(self.node, [(0, 0.7)])
self.assertIn(0, self.node._distance_cache)
def test_distance_cast_to_float(self):
_inject_states(self.node, [(1, 1)]) # integer distance
self.assertIsInstance(self.node._distance_cache[1], float)
# ── Tests: _on_faces ──────────────────────────────────────────────────────────
class TestOnFaces(unittest.TestCase):
@classmethod
def setUpClass(cls): cls.mod = _load_mod()
def setUp(self):
self.node = _make_node(self.mod)
def test_face_ids_stored(self):
_inject_faces(self.node, [1, 2, 3])
self.assertEqual(sorted(self.node._active_face_ids), [1, 2, 3])
def test_empty_faces_stored(self):
_inject_faces(self.node, [])
self.assertEqual(self.node._active_face_ids, [])
def test_timestamp_updated_on_faces(self):
before = time.monotonic()
_inject_faces(self.node, [1])
self.assertGreaterEqual(self.node._last_face_t, before)
def test_timestamp_not_updated_on_empty(self):
self.node._last_face_t = 0.0
_inject_faces(self.node, [])
self.assertEqual(self.node._last_face_t, 0.0)
def test_faces_replaced_not_merged(self):
_inject_faces(self.node, [1, 2])
_inject_faces(self.node, [3])
self.assertEqual(sorted(self.node._active_face_ids), [3])
# ── Tests: control loop ───────────────────────────────────────────────────────
class TestControlLoop(unittest.TestCase):
@classmethod
def setUpClass(cls): cls.mod = _load_mod()
def setUp(self):
self.node = _make_node(self.mod, personal_space_m=0.8, backup_speed=0.1,
hysteresis_m=0.1, lost_timeout_s=1.5)
self.cmd_pub = self.node._pubs["/cmd_vel"]
self.close_pub = self.node._pubs["/saltybot/too_close"]
def _fresh_face(self, face_id=1, distance=1.0):
"""Inject a face + state so the node sees a fresh, close/far person."""
_inject_states(self.node, [(face_id, distance)])
_inject_faces(self.node, [face_id])
def test_no_face_publishes_zero_twist(self):
self.node._control_cb()
self.assertEqual(len(self.cmd_pub.msgs), 1)
self.assertAlmostEqual(self.cmd_pub.msgs[-1].linear.x, 0.0)
def test_no_face_publishes_false(self):
self.node._control_cb()
self.assertFalse(self.close_pub.msgs[-1].data)
def test_person_far_no_backup(self):
self._fresh_face(distance=1.5)
self.node._control_cb()
self.assertAlmostEqual(self.cmd_pub.msgs[-1].linear.x, 0.0)
self.assertFalse(self.close_pub.msgs[-1].data)
def test_person_at_threshold_triggers_backup(self):
self._fresh_face(distance=0.8)
self.node._control_cb()
self.assertAlmostEqual(self.cmd_pub.msgs[-1].linear.x, -0.1)
self.assertTrue(self.close_pub.msgs[-1].data)
def test_person_too_close_triggers_backup(self):
self._fresh_face(distance=0.5)
self.node._control_cb()
self.assertAlmostEqual(self.cmd_pub.msgs[-1].linear.x, -0.1)
self.assertTrue(self.close_pub.msgs[-1].data)
def test_too_close_state_latches(self):
"""State stays TOO_CLOSE even if distance jumps to mid-band."""
self._fresh_face(distance=0.5)
self.node._control_cb() # enter TOO_CLOSE
# Now person is at 0.85 — inside hysteresis band (0.8 < 0.85 < 0.9)
_inject_states(self.node, [(1, 0.85)])
self.node._control_cb()
self.assertTrue(self.node._too_close)
self.assertTrue(self.close_pub.msgs[-1].data)
def test_hysteresis_clears_state(self):
"""State clears when distance exceeds personal_space_m + hysteresis_m."""
self._fresh_face(distance=0.5)
self.node._control_cb() # enter TOO_CLOSE
# Person retreats past hysteresis band (> 0.9 m)
_inject_states(self.node, [(1, 1.0)])
self.node._control_cb()
self.assertFalse(self.node._too_close)
self.assertFalse(self.close_pub.msgs[-1].data)
def test_backup_speed_magnitude(self):
node = _make_node(self.mod, backup_speed=0.25)
_inject_states(node, [(1, 0.3)])
_inject_faces(node, [1])
node._control_cb()
self.assertAlmostEqual(node._pubs["/cmd_vel"].msgs[-1].linear.x, -0.25)
def test_state_machine_full_cycle(self):
"""CLEAR → TOO_CLOSE → CLEAR."""
# Start clear
self._fresh_face(distance=1.5)
self.node._control_cb()
self.assertFalse(self.node._too_close)
# Enter TOO_CLOSE
_inject_states(self.node, [(1, 0.5)])
self.node._control_cb()
self.assertTrue(self.node._too_close)
# Return to CLEAR
_inject_states(self.node, [(1, 1.0)])
self.node._control_cb()
self.assertFalse(self.node._too_close)
def test_multiple_faces_closest_used(self):
"""Closest face determines the backup decision."""
_inject_states(self.node, [(1, 2.0), (2, 0.5)])
_inject_faces(self.node, [1, 2])
self.node._control_cb()
self.assertTrue(self.node._too_close)
def test_both_far_no_backup(self):
_inject_states(self.node, [(1, 1.5), (2, 2.0)])
_inject_faces(self.node, [1, 2])
self.node._control_cb()
self.assertFalse(self.node._too_close)
def test_unknown_face_uses_unknown_distance(self):
"""Face without PersonState gets unknown_distance_m (default=99 → safe)."""
# No PersonState injected, so face_id=5 has no cache entry
_inject_faces(self.node, [5])
self.node._last_face_t = time.monotonic() # mark fresh
self.node._control_cb()
self.assertFalse(self.node._too_close)
def test_publishes_on_every_tick(self):
self.node._control_cb()
self.node._control_cb()
self.node._control_cb()
self.assertEqual(len(self.cmd_pub.msgs), 3)
self.assertEqual(len(self.close_pub.msgs), 3)
# ── Tests: face freshness / lost timeout ──────────────────────────────────────
class TestFaceFreshness(unittest.TestCase):
@classmethod
def setUpClass(cls): cls.mod = _load_mod()
def setUp(self):
self.node = _make_node(self.mod, lost_timeout_s=1.5)
self.cmd_pub = self.node._pubs["/cmd_vel"]
self.close_pub = self.node._pubs["/saltybot/too_close"]
def test_stale_face_clears_state(self):
"""If no fresh face data, state clears and robot stops."""
# Manually set backed-up state
self.node._too_close = True
self.node._last_face_t = time.monotonic() - 10.0 # stale
self.node._control_cb()
self.assertFalse(self.node._too_close)
self.assertAlmostEqual(self.cmd_pub.msgs[-1].linear.x, 0.0)
self.assertFalse(self.close_pub.msgs[-1].data)
def test_fresh_face_active(self):
"""Recently received face is treated as fresh."""
_inject_states(self.node, [(1, 0.5)])
_inject_faces(self.node, [1]) # updates _last_face_t
self.node._control_cb()
self.assertTrue(self.node._too_close)
def test_no_face_ever_is_stale(self):
"""Never received a face → stale, no backup."""
# _last_face_t = 0.0 (default) → not fresh
self.node._control_cb()
self.assertFalse(self.node._too_close)
def test_stale_logs_info(self):
self.node._too_close = True
self.node._last_face_t = time.monotonic() - 100.0
self.node._control_cb()
infos = [l for l in self.node._logs if l[0] == "INFO"]
self.assertTrue(any("face lost" in m.lower() or "clearing" in m.lower()
for _, m in infos))
def test_too_close_entry_logs_warn(self):
_inject_states(self.node, [(1, 0.3)])
_inject_faces(self.node, [1])
self.node._control_cb()
warns = [l for l in self.node._logs if l[0] == "WARN"]
self.assertTrue(any("too close" in m.lower() or "backing" in m.lower()
for _, m in warns))
def test_clear_logs_info(self):
_inject_states(self.node, [(1, 0.3)])
_inject_faces(self.node, [1])
self.node._control_cb()
_inject_states(self.node, [(1, 1.5)])
self.node._control_cb()
infos = [l for l in self.node._logs if l[0] == "INFO"]
self.assertTrue(any("clear" in m.lower() for _, m in infos))
# ── Tests: _publish ───────────────────────────────────────────────────────────
class TestPublish(unittest.TestCase):
@classmethod
def setUpClass(cls): cls.mod = _load_mod()
def setUp(self):
self.node = _make_node(self.mod, backup_speed=0.15)
self.cmd_pub = self.node._pubs["/cmd_vel"]
self.bool_pub = self.node._pubs["/saltybot/too_close"]
def test_backup_false_zero_linear_x(self):
self.node._publish(backup=False)
self.assertAlmostEqual(self.cmd_pub.msgs[-1].linear.x, 0.0)
def test_backup_true_negative_linear_x(self):
self.node._publish(backup=True)
self.assertAlmostEqual(self.cmd_pub.msgs[-1].linear.x, -0.15)
def test_backup_false_bool_false(self):
self.node._publish(backup=False)
self.assertFalse(self.bool_pub.msgs[-1].data)
def test_backup_true_bool_true(self):
self.node._publish(backup=True)
self.assertTrue(self.bool_pub.msgs[-1].data)
def test_angular_z_always_zero(self):
self.node._publish(backup=True)
self.assertAlmostEqual(self.cmd_pub.msgs[-1].angular.z, 0.0)
def test_both_topics_published(self):
self.node._publish(backup=False)
self.assertEqual(len(self.cmd_pub.msgs), 1)
self.assertEqual(len(self.bool_pub.msgs), 1)
# ── Tests: source content ─────────────────────────────────────────────────────
class TestNodeSrc(unittest.TestCase):
@classmethod
def setUpClass(cls):
with open(_SRC) as f: cls.src = f.read()
def test_issue_tag(self): self.assertIn("#310", self.src)
def test_cmd_vel_topic(self): self.assertIn("/cmd_vel", self.src)
def test_too_close_topic(self): self.assertIn("/saltybot/too_close", self.src)
def test_faces_topic(self): self.assertIn("/social/faces/detected", self.src)
def test_states_topic(self): self.assertIn("/social/person_states", self.src)
def test_closest_face_fn(self): self.assertIn("closest_face_distance", self.src)
def test_should_backup_fn(self): self.assertIn("should_backup", self.src)
def test_should_clear_fn(self): self.assertIn("should_clear", self.src)
def test_hysteresis_param(self): self.assertIn("hysteresis_m", self.src)
def test_backup_speed_param(self): self.assertIn("backup_speed", self.src)
def test_personal_space_param(self): self.assertIn("personal_space_m", self.src)
def test_lost_timeout_param(self): self.assertIn("lost_timeout_s", self.src)
def test_control_rate_param(self): self.assertIn("control_rate", self.src)
def test_threading_lock(self): self.assertIn("threading.Lock", self.src)
def test_linear_x(self): self.assertIn("linear.x", self.src)
def test_twist_published(self): self.assertIn("Twist", self.src)
def test_bool_published(self): self.assertIn("Bool", self.src)
def test_main_defined(self): self.assertIn("def main", self.src)
def test_face_detection_array(self): self.assertIn("FaceDetectionArray", self.src)
def test_person_state_array(self): self.assertIn("PersonStateArray", self.src)
# ── Tests: config / launch / setup ────────────────────────────────────────────
class TestConfig(unittest.TestCase):
_CONFIG = (
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
"saltybot_social/config/personal_space_params.yaml"
)
_LAUNCH = (
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
"saltybot_social/launch/personal_space.launch.py"
)
_SETUP = (
"/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/"
"saltybot_social/setup.py"
)
def test_config_exists(self):
import os; self.assertTrue(os.path.exists(self._CONFIG))
def test_config_personal_space_m(self):
with open(self._CONFIG) as f: c = f.read()
self.assertIn("personal_space_m", c)
def test_config_backup_speed(self):
with open(self._CONFIG) as f: c = f.read()
self.assertIn("backup_speed", c)
def test_config_hysteresis(self):
with open(self._CONFIG) as f: c = f.read()
self.assertIn("hysteresis_m", c)
def test_config_lost_timeout(self):
with open(self._CONFIG) as f: c = f.read()
self.assertIn("lost_timeout_s", c)
def test_launch_exists(self):
import os; self.assertTrue(os.path.exists(self._LAUNCH))
def test_launch_has_personal_space_arg(self):
with open(self._LAUNCH) as f: c = f.read()
self.assertIn("personal_space_m", c)
def test_launch_has_backup_speed_arg(self):
with open(self._LAUNCH) as f: c = f.read()
self.assertIn("backup_speed", c)
def test_entry_point_in_setup(self):
with open(self._SETUP) as f: c = f.read()
self.assertIn("personal_space_node", c)
if __name__ == "__main__":
unittest.main()