diff --git a/jetson/ros2_ws/src/saltybot_social/config/personal_space_params.yaml b/jetson/ros2_ws/src/saltybot_social/config/personal_space_params.yaml new file mode 100644 index 0000000..f8010a8 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_social/config/personal_space_params.yaml @@ -0,0 +1,10 @@ +personal_space_node: + ros__parameters: + personal_space_m: 0.8 # Minimum comfortable distance (m) + backup_speed: 0.1 # Retreat linear speed (m/s, applied as -x) + hysteresis_m: 0.1 # Hysteresis band above threshold (m) + unknown_distance_m: 99.0 # Distance assumed for faces without a PersonState + lost_timeout_s: 1.5 # Face freshness window (s) + control_rate: 10.0 # Control loop / publish rate (Hz) + faces_topic: "/social/faces/detected" + states_topic: "/social/person_states" diff --git a/jetson/ros2_ws/src/saltybot_social/launch/personal_space.launch.py b/jetson/ros2_ws/src/saltybot_social/launch/personal_space.launch.py new file mode 100644 index 0000000..4598808 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_social/launch/personal_space.launch.py @@ -0,0 +1,39 @@ +"""personal_space.launch.py -- Launch personal space respector (Issue #310). + +Usage: + ros2 launch saltybot_social personal_space.launch.py + ros2 launch saltybot_social personal_space.launch.py personal_space_m:=1.0 backup_speed:=0.15 +""" + +import os +from ament_index_python.packages import get_package_share_directory +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument +from launch.substitutions import LaunchConfiguration +from launch_ros.actions import Node + + +def generate_launch_description(): + pkg = get_package_share_directory("saltybot_social") + cfg = os.path.join(pkg, "config", "personal_space_params.yaml") + + return LaunchDescription([ + DeclareLaunchArgument("personal_space_m", default_value="0.8", + description="Minimum comfortable distance (m)"), + DeclareLaunchArgument("backup_speed", default_value="0.1", + description="Retreat linear speed (m/s)"), + + Node( + package="saltybot_social", + executable="personal_space_node", + name="personal_space_node", + output="screen", + parameters=[ + cfg, + { + "personal_space_m": LaunchConfiguration("personal_space_m"), + "backup_speed": LaunchConfiguration("backup_speed"), + }, + ], + ), + ]) diff --git a/jetson/ros2_ws/src/saltybot_social/saltybot_social/personal_space_node.py b/jetson/ros2_ws/src/saltybot_social/saltybot_social/personal_space_node.py new file mode 100644 index 0000000..3301066 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_social/saltybot_social/personal_space_node.py @@ -0,0 +1,211 @@ +"""personal_space_node.py — Personal space respector. +Issue #310 + +Monitors the distance to detected faces. When the closest person +enters within ``personal_space_m`` metres (default 0.8 m) the robot +slowly backs up (``Twist.linear.x = -backup_speed``) and latches +/saltybot/too_close to True. Once the person retreats beyond +``personal_space_m + hysteresis_m`` the robot stops and the latch clears. + +Distance is read from /social/person_states (PersonStateArray, face_id → +distance). When a face has no matching PersonState entry the distance is +treated as ``unknown_distance_m`` (default: very large → safe, no backup). + +Subscriptions +───────────── + /social/faces/detected saltybot_social_msgs/FaceDetectionArray + /social/person_states saltybot_social_msgs/PersonStateArray + +Publications +──────────── + /cmd_vel geometry_msgs/Twist — linear.x only; angular.z = 0 + /saltybot/too_close std_msgs/Bool — True while backing up + +State machine +───────────── + CLEAR — no face within personal_space_m; publish zero Twist + False + TOO_CLOSE — face within personal_space_m; publish backup Twist + True + clears only when distance > personal_space_m + hysteresis_m + +Parameters +────────── + personal_space_m (float, 0.8) minimum comfortable distance (m) + backup_speed (float, 0.1) retreat linear speed (m/s, applied as -x) + hysteresis_m (float, 0.1) hysteresis band above threshold (m) + unknown_distance_m (float, 99.0) distance assumed for faces without state + lost_timeout_s (float, 1.5) face freshness window (s) + control_rate (float, 10.0) publish rate (Hz) + faces_topic (str, "/social/faces/detected") + states_topic (str, "/social/person_states") +""" + +from __future__ import annotations + +import time +import threading +from typing import Dict + +import rclpy +from rclpy.node import Node +from rclpy.qos import QoSProfile +from std_msgs.msg import Bool +from geometry_msgs.msg import Twist + +try: + from saltybot_social_msgs.msg import FaceDetectionArray, PersonStateArray + _MSGS = True +except ImportError: + _MSGS = False + + +# ── Pure helpers ─────────────────────────────────────────────────────────────── + +def closest_face_distance(face_ids, distance_cache: Dict[int, float], + unknown_distance_m: float) -> float: + """Return the minimum distance across all visible face_ids. + + Falls back to *unknown_distance_m* for any face_id not in the cache. + Returns *unknown_distance_m* when *face_ids* is empty. + """ + if not face_ids: + return unknown_distance_m + return min(distance_cache.get(fid, unknown_distance_m) for fid in face_ids) + + +def should_backup(distance: float, personal_space_m: float) -> bool: + return distance <= personal_space_m + + +def should_clear(distance: float, personal_space_m: float, + hysteresis_m: float) -> bool: + return distance > personal_space_m + hysteresis_m + + +# ── ROS2 node ────────────────────────────────────────────────────────────────── + +class PersonalSpaceNode(Node): + """Backs the robot away when a person enters personal space.""" + + def __init__(self) -> None: + super().__init__("personal_space_node") + + self.declare_parameter("personal_space_m", 0.8) + self.declare_parameter("backup_speed", 0.1) + self.declare_parameter("hysteresis_m", 0.1) + self.declare_parameter("unknown_distance_m", 99.0) + self.declare_parameter("lost_timeout_s", 1.5) + self.declare_parameter("control_rate", 10.0) + self.declare_parameter("faces_topic", "/social/faces/detected") + self.declare_parameter("states_topic", "/social/person_states") + + self._space = self.get_parameter("personal_space_m").value + self._backup_speed = self.get_parameter("backup_speed").value + self._hysteresis = self.get_parameter("hysteresis_m").value + self._unknown_dist = self.get_parameter("unknown_distance_m").value + self._lost_t = self.get_parameter("lost_timeout_s").value + rate = self.get_parameter("control_rate").value + faces_topic = self.get_parameter("faces_topic").value + states_topic = self.get_parameter("states_topic").value + + # State + self._too_close: bool = False + self._distance_cache: Dict[int, float] = {} # face_id → distance (m) + self._active_face_ids: list = [] # face_ids in last msg + self._last_face_t: float = 0.0 + self._lock = threading.Lock() + + qos = QoSProfile(depth=10) + self._cmd_pub = self.create_publisher(Twist, "/cmd_vel", qos) + self._too_close_pub = self.create_publisher(Bool, "/saltybot/too_close", qos) + + if _MSGS: + self._states_sub = self.create_subscription( + PersonStateArray, states_topic, self._on_person_states, qos + ) + self._faces_sub = self.create_subscription( + FaceDetectionArray, faces_topic, self._on_faces, qos + ) + else: + self.get_logger().warn( + "saltybot_social_msgs not available — node passive" + ) + + self._timer = self.create_timer(1.0 / rate, self._control_cb) + + self.get_logger().info( + f"PersonalSpaceNode ready " + f"(space={self._space}m, backup={self._backup_speed}m/s, " + f"hysteresis={self._hysteresis}m)" + ) + + # ── Subscription callbacks ───────────────────────────────────────────── + + def _on_person_states(self, msg) -> None: + with self._lock: + for ps in msg.persons: + if ps.face_id >= 0: + self._distance_cache[ps.face_id] = float(ps.distance) + + def _on_faces(self, msg) -> None: + face_ids = [int(f.face_id) for f in msg.faces] + with self._lock: + self._active_face_ids = face_ids + if face_ids: + self._last_face_t = time.monotonic() + + # ── Control loop ─────────────────────────────────────────────────────── + + def _control_cb(self) -> None: + now = time.monotonic() + with self._lock: + face_ids = list(self._active_face_ids) + last_face_t = self._last_face_t + cache = dict(self._distance_cache) + + face_fresh = last_face_t > 0.0 and (now - last_face_t) < self._lost_t + + if not face_fresh: + # No fresh face data → clear state, stop + if self._too_close: + self._too_close = False + self.get_logger().info("PersonalSpace: face lost — clearing") + self._publish(backup=False) + return + + dist = closest_face_distance(face_ids, cache, self._unknown_dist) + + if not self._too_close and should_backup(dist, self._space): + self._too_close = True + self.get_logger().warn( + f"PersonalSpace: TOO CLOSE ({dist:.2f}m <= {self._space}m) — backing up" + ) + elif self._too_close and should_clear(dist, self._space, self._hysteresis): + self._too_close = False + self.get_logger().info( + f"PersonalSpace: CLEAR ({dist:.2f}m > {self._space + self._hysteresis:.2f}m)" + ) + + self._publish(backup=self._too_close) + + def _publish(self, backup: bool) -> None: + twist = Twist() + if backup: + twist.linear.x = -self._backup_speed + + bool_msg = Bool() + bool_msg.data = backup + + self._cmd_pub.publish(twist) + self._too_close_pub.publish(bool_msg) + + +def main(args=None) -> None: + rclpy.init(args=args) + node = PersonalSpaceNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.shutdown() diff --git a/jetson/ros2_ws/src/saltybot_social/setup.py b/jetson/ros2_ws/src/saltybot_social/setup.py index feafc7f..b2ca025 100644 --- a/jetson/ros2_ws/src/saltybot_social/setup.py +++ b/jetson/ros2_ws/src/saltybot_social/setup.py @@ -55,6 +55,8 @@ setup( 'volume_adjust_node = saltybot_social.volume_adjust_node:main', # Conversation topic memory (Issue #299) 'topic_memory_node = saltybot_social.topic_memory_node:main', + # Personal space respector (Issue #310) + 'personal_space_node = saltybot_social.personal_space_node:main', ], }, ) diff --git a/jetson/ros2_ws/src/saltybot_social/test/test_personal_space.py b/jetson/ros2_ws/src/saltybot_social/test/test_personal_space.py new file mode 100644 index 0000000..22b048b --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_social/test/test_personal_space.py @@ -0,0 +1,666 @@ +"""test_personal_space.py — Offline tests for personal_space_node (Issue #310). + +Stubs out rclpy and ROS message types so tests run without a ROS install. +""" + +import importlib.util +import sys +import time +import types +import unittest + + +# ── ROS2 stubs ──────────────────────────────────────────────────────────────── + +def _make_ros_stubs(): + for mod_name in ("rclpy", "rclpy.node", "rclpy.qos", + "std_msgs", "std_msgs.msg", + "geometry_msgs", "geometry_msgs.msg", + "saltybot_social_msgs", "saltybot_social_msgs.msg"): + if mod_name not in sys.modules: + sys.modules[mod_name] = types.ModuleType(mod_name) + + class _Node: + def __init__(self, name="node"): + self._name = name + if not hasattr(self, "_params"): + self._params = {} + self._pubs = {} + self._subs = {} + self._logs = [] + self._timers = [] + + def declare_parameter(self, name, default): + if name not in self._params: + self._params[name] = default + + def get_parameter(self, name): + class _P: + def __init__(self, v): self.value = v + return _P(self._params.get(name)) + + def create_publisher(self, msg_type, topic, qos): + pub = _FakePub() + self._pubs[topic] = pub + return pub + + def create_subscription(self, msg_type, topic, cb, qos): + self._subs[topic] = cb + return object() + + def create_timer(self, period, cb): + self._timers.append(cb) + return object() + + def get_logger(self): + node = self + class _L: + def info(self, m): node._logs.append(("INFO", m)) + def warn(self, m): node._logs.append(("WARN", m)) + def error(self, m): node._logs.append(("ERROR", m)) + return _L() + + def destroy_node(self): pass + + class _FakePub: + def __init__(self): + self.msgs = [] + def publish(self, msg): + self.msgs.append(msg) + + class _QoSProfile: + def __init__(self, depth=10): self.depth = depth + + class _Bool: + def __init__(self): self.data = False + + class _TwistLinear: + def __init__(self): self.x = 0.0 + + class _TwistAngular: + def __init__(self): self.z = 0.0 + + class _Twist: + def __init__(self): + self.linear = _TwistLinear() + self.angular = _TwistAngular() + + class _FaceDetection: + def __init__(self, face_id=0): + self.face_id = face_id + + class _PersonState: + def __init__(self, face_id=0, distance=1.0): + self.face_id = face_id + self.distance = distance + + class _FaceDetectionArray: + def __init__(self, faces=None): + self.faces = faces or [] + + class _PersonStateArray: + def __init__(self, persons=None): + self.persons = persons or [] + + rclpy_mod = sys.modules["rclpy"] + rclpy_mod.init = lambda args=None: None + rclpy_mod.spin = lambda node: None + rclpy_mod.shutdown = lambda: None + + sys.modules["rclpy.node"].Node = _Node + sys.modules["rclpy.qos"].QoSProfile = _QoSProfile + sys.modules["std_msgs.msg"].Bool = _Bool + sys.modules["geometry_msgs.msg"].Twist = _Twist + sys.modules["saltybot_social_msgs.msg"].FaceDetectionArray = _FaceDetectionArray + sys.modules["saltybot_social_msgs.msg"].PersonStateArray = _PersonStateArray + + return (_Node, _FakePub, _Bool, _Twist, + _FaceDetection, _PersonState, + _FaceDetectionArray, _PersonStateArray) + + +(_Node, _FakePub, _Bool, _Twist, + _FaceDetection, _PersonState, + _FaceDetectionArray, _PersonStateArray) = _make_ros_stubs() + + +# ── Module loader ───────────────────────────────────────────────────────────── + +_SRC = ( + "/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/" + "saltybot_social/saltybot_social/personal_space_node.py" +) + + +def _load_mod(): + spec = importlib.util.spec_from_file_location("personal_space_testmod", _SRC) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + return mod + + +def _make_node(mod, **kwargs): + node = mod.PersonalSpaceNode.__new__(mod.PersonalSpaceNode) + defaults = { + "personal_space_m": 0.8, + "backup_speed": 0.1, + "hysteresis_m": 0.1, + "unknown_distance_m": 99.0, + "lost_timeout_s": 1.5, + "control_rate": 10.0, + "faces_topic": "/social/faces/detected", + "states_topic": "/social/person_states", + } + defaults.update(kwargs) + node._params = dict(defaults) + mod.PersonalSpaceNode.__init__(node) + return node + + +def _inject_faces(node, face_ids): + """Push a FaceDetectionArray message with the given face_ids.""" + msg = _FaceDetectionArray(faces=[_FaceDetection(fid) for fid in face_ids]) + node._subs["/social/faces/detected"](msg) + + +def _inject_states(node, id_dist_pairs): + """Push a PersonStateArray message with (face_id, distance) pairs.""" + persons = [_PersonState(fid, dist) for fid, dist in id_dist_pairs] + msg = _PersonStateArray(persons=persons) + node._subs["/social/person_states"](msg) + + +# ── Tests: pure helpers ──────────────────────────────────────────────────────── + +class TestClosestFaceDistance(unittest.TestCase): + @classmethod + def setUpClass(cls): cls.mod = _load_mod() + + def _cfd(self, face_ids, cache, unknown=99.0): + return self.mod.closest_face_distance(face_ids, cache, unknown) + + def test_empty_face_ids_returns_unknown(self): + self.assertEqual(self._cfd([], {}), 99.0) + + def test_single_face_known(self): + self.assertAlmostEqual(self._cfd([1], {1: 0.5}), 0.5) + + def test_single_face_unknown(self): + self.assertEqual(self._cfd([1], {}), 99.0) + + def test_multiple_faces_min(self): + dist = self._cfd([1, 2, 3], {1: 2.0, 2: 0.5, 3: 1.5}) + self.assertAlmostEqual(dist, 0.5) + + def test_mixed_known_unknown(self): + # unknown face treated as unknown_distance_m = 99, so min is the known one + dist = self._cfd([1, 2], {1: 0.6}, 99.0) + self.assertAlmostEqual(dist, 0.6) + + def test_all_unknown_returns_unknown(self): + dist = self._cfd([1, 2], {}, 99.0) + self.assertEqual(dist, 99.0) + + def test_custom_unknown_value(self): + dist = self._cfd([], {}, unknown=5.0) + self.assertEqual(dist, 5.0) + + +class TestShouldBackup(unittest.TestCase): + @classmethod + def setUpClass(cls): cls.mod = _load_mod() + + def test_at_threshold_triggers(self): + self.assertTrue(self.mod.should_backup(0.8, 0.8)) + + def test_below_threshold_triggers(self): + self.assertTrue(self.mod.should_backup(0.5, 0.8)) + + def test_above_threshold_no_trigger(self): + self.assertFalse(self.mod.should_backup(1.0, 0.8)) + + def test_just_above_no_trigger(self): + self.assertFalse(self.mod.should_backup(0.81, 0.8)) + + +class TestShouldClear(unittest.TestCase): + @classmethod + def setUpClass(cls): cls.mod = _load_mod() + + def test_above_band_clears(self): + # 0.8 + 0.1 = 0.9; 1.0 > 0.9 → clear + self.assertTrue(self.mod.should_clear(1.0, 0.8, 0.1)) + + def test_at_band_edge_does_not_clear(self): + self.assertFalse(self.mod.should_clear(0.9, 0.8, 0.1)) + + def test_within_band_does_not_clear(self): + self.assertFalse(self.mod.should_clear(0.85, 0.8, 0.1)) + + def test_below_threshold_does_not_clear(self): + self.assertFalse(self.mod.should_clear(0.5, 0.8, 0.1)) + + def test_just_above_band_clears(self): + self.assertTrue(self.mod.should_clear(0.91, 0.8, 0.1)) + + +# ── Tests: node init ────────────────────────────────────────────────────────── + +class TestNodeInit(unittest.TestCase): + @classmethod + def setUpClass(cls): cls.mod = _load_mod() + + def test_instantiates(self): + self.assertIsNotNone(_make_node(self.mod)) + + def test_cmd_vel_publisher(self): + node = _make_node(self.mod) + self.assertIn("/cmd_vel", node._pubs) + + def test_too_close_publisher(self): + node = _make_node(self.mod) + self.assertIn("/saltybot/too_close", node._pubs) + + def test_faces_subscriber(self): + node = _make_node(self.mod) + self.assertIn("/social/faces/detected", node._subs) + + def test_states_subscriber(self): + node = _make_node(self.mod) + self.assertIn("/social/person_states", node._subs) + + def test_timer_registered(self): + node = _make_node(self.mod) + self.assertGreater(len(node._timers), 0) + + def test_too_close_initially_false(self): + node = _make_node(self.mod) + self.assertFalse(node._too_close) + + def test_distance_cache_empty(self): + node = _make_node(self.mod) + self.assertEqual(node._distance_cache, {}) + + def test_active_face_ids_empty(self): + node = _make_node(self.mod) + self.assertEqual(node._active_face_ids, []) + + def test_custom_topics(self): + node = _make_node(self.mod, + faces_topic="/my/faces", + states_topic="/my/states") + self.assertIn("/my/faces", node._subs) + self.assertIn("/my/states", node._subs) + + def test_parameters_stored(self): + node = _make_node(self.mod, personal_space_m=1.2, backup_speed=0.2) + self.assertAlmostEqual(node._space, 1.2) + self.assertAlmostEqual(node._backup_speed, 0.2) + + +# ── Tests: _on_person_states ────────────────────────────────────────────────── + +class TestOnPersonStates(unittest.TestCase): + @classmethod + def setUpClass(cls): cls.mod = _load_mod() + + def setUp(self): + self.node = _make_node(self.mod) + + def test_single_state_cached(self): + _inject_states(self.node, [(1, 1.5)]) + self.assertAlmostEqual(self.node._distance_cache[1], 1.5) + + def test_multiple_states_cached(self): + _inject_states(self.node, [(1, 1.5), (2, 0.6), (3, 2.0)]) + self.assertAlmostEqual(self.node._distance_cache[1], 1.5) + self.assertAlmostEqual(self.node._distance_cache[2], 0.6) + self.assertAlmostEqual(self.node._distance_cache[3], 2.0) + + def test_state_updated_on_second_msg(self): + _inject_states(self.node, [(1, 1.5)]) + _inject_states(self.node, [(1, 0.4)]) + self.assertAlmostEqual(self.node._distance_cache[1], 0.4) + + def test_negative_face_id_ignored(self): + _inject_states(self.node, [(-1, 0.5)]) + self.assertNotIn(-1, self.node._distance_cache) + + def test_zero_face_id_stored(self): + _inject_states(self.node, [(0, 0.7)]) + self.assertIn(0, self.node._distance_cache) + + def test_distance_cast_to_float(self): + _inject_states(self.node, [(1, 1)]) # integer distance + self.assertIsInstance(self.node._distance_cache[1], float) + + +# ── Tests: _on_faces ────────────────────────────────────────────────────────── + +class TestOnFaces(unittest.TestCase): + @classmethod + def setUpClass(cls): cls.mod = _load_mod() + + def setUp(self): + self.node = _make_node(self.mod) + + def test_face_ids_stored(self): + _inject_faces(self.node, [1, 2, 3]) + self.assertEqual(sorted(self.node._active_face_ids), [1, 2, 3]) + + def test_empty_faces_stored(self): + _inject_faces(self.node, []) + self.assertEqual(self.node._active_face_ids, []) + + def test_timestamp_updated_on_faces(self): + before = time.monotonic() + _inject_faces(self.node, [1]) + self.assertGreaterEqual(self.node._last_face_t, before) + + def test_timestamp_not_updated_on_empty(self): + self.node._last_face_t = 0.0 + _inject_faces(self.node, []) + self.assertEqual(self.node._last_face_t, 0.0) + + def test_faces_replaced_not_merged(self): + _inject_faces(self.node, [1, 2]) + _inject_faces(self.node, [3]) + self.assertEqual(sorted(self.node._active_face_ids), [3]) + + +# ── Tests: control loop ─────────────────────────────────────────────────────── + +class TestControlLoop(unittest.TestCase): + @classmethod + def setUpClass(cls): cls.mod = _load_mod() + + def setUp(self): + self.node = _make_node(self.mod, personal_space_m=0.8, backup_speed=0.1, + hysteresis_m=0.1, lost_timeout_s=1.5) + self.cmd_pub = self.node._pubs["/cmd_vel"] + self.close_pub = self.node._pubs["/saltybot/too_close"] + + def _fresh_face(self, face_id=1, distance=1.0): + """Inject a face + state so the node sees a fresh, close/far person.""" + _inject_states(self.node, [(face_id, distance)]) + _inject_faces(self.node, [face_id]) + + def test_no_face_publishes_zero_twist(self): + self.node._control_cb() + self.assertEqual(len(self.cmd_pub.msgs), 1) + self.assertAlmostEqual(self.cmd_pub.msgs[-1].linear.x, 0.0) + + def test_no_face_publishes_false(self): + self.node._control_cb() + self.assertFalse(self.close_pub.msgs[-1].data) + + def test_person_far_no_backup(self): + self._fresh_face(distance=1.5) + self.node._control_cb() + self.assertAlmostEqual(self.cmd_pub.msgs[-1].linear.x, 0.0) + self.assertFalse(self.close_pub.msgs[-1].data) + + def test_person_at_threshold_triggers_backup(self): + self._fresh_face(distance=0.8) + self.node._control_cb() + self.assertAlmostEqual(self.cmd_pub.msgs[-1].linear.x, -0.1) + self.assertTrue(self.close_pub.msgs[-1].data) + + def test_person_too_close_triggers_backup(self): + self._fresh_face(distance=0.5) + self.node._control_cb() + self.assertAlmostEqual(self.cmd_pub.msgs[-1].linear.x, -0.1) + self.assertTrue(self.close_pub.msgs[-1].data) + + def test_too_close_state_latches(self): + """State stays TOO_CLOSE even if distance jumps to mid-band.""" + self._fresh_face(distance=0.5) + self.node._control_cb() # enter TOO_CLOSE + # Now person is at 0.85 — inside hysteresis band (0.8 < 0.85 < 0.9) + _inject_states(self.node, [(1, 0.85)]) + self.node._control_cb() + self.assertTrue(self.node._too_close) + self.assertTrue(self.close_pub.msgs[-1].data) + + def test_hysteresis_clears_state(self): + """State clears when distance exceeds personal_space_m + hysteresis_m.""" + self._fresh_face(distance=0.5) + self.node._control_cb() # enter TOO_CLOSE + # Person retreats past hysteresis band (> 0.9 m) + _inject_states(self.node, [(1, 1.0)]) + self.node._control_cb() + self.assertFalse(self.node._too_close) + self.assertFalse(self.close_pub.msgs[-1].data) + + def test_backup_speed_magnitude(self): + node = _make_node(self.mod, backup_speed=0.25) + _inject_states(node, [(1, 0.3)]) + _inject_faces(node, [1]) + node._control_cb() + self.assertAlmostEqual(node._pubs["/cmd_vel"].msgs[-1].linear.x, -0.25) + + def test_state_machine_full_cycle(self): + """CLEAR → TOO_CLOSE → CLEAR.""" + # Start clear + self._fresh_face(distance=1.5) + self.node._control_cb() + self.assertFalse(self.node._too_close) + + # Enter TOO_CLOSE + _inject_states(self.node, [(1, 0.5)]) + self.node._control_cb() + self.assertTrue(self.node._too_close) + + # Return to CLEAR + _inject_states(self.node, [(1, 1.0)]) + self.node._control_cb() + self.assertFalse(self.node._too_close) + + def test_multiple_faces_closest_used(self): + """Closest face determines the backup decision.""" + _inject_states(self.node, [(1, 2.0), (2, 0.5)]) + _inject_faces(self.node, [1, 2]) + self.node._control_cb() + self.assertTrue(self.node._too_close) + + def test_both_far_no_backup(self): + _inject_states(self.node, [(1, 1.5), (2, 2.0)]) + _inject_faces(self.node, [1, 2]) + self.node._control_cb() + self.assertFalse(self.node._too_close) + + def test_unknown_face_uses_unknown_distance(self): + """Face without PersonState gets unknown_distance_m (default=99 → safe).""" + # No PersonState injected, so face_id=5 has no cache entry + _inject_faces(self.node, [5]) + self.node._last_face_t = time.monotonic() # mark fresh + self.node._control_cb() + self.assertFalse(self.node._too_close) + + def test_publishes_on_every_tick(self): + self.node._control_cb() + self.node._control_cb() + self.node._control_cb() + self.assertEqual(len(self.cmd_pub.msgs), 3) + self.assertEqual(len(self.close_pub.msgs), 3) + + +# ── Tests: face freshness / lost timeout ────────────────────────────────────── + +class TestFaceFreshness(unittest.TestCase): + @classmethod + def setUpClass(cls): cls.mod = _load_mod() + + def setUp(self): + self.node = _make_node(self.mod, lost_timeout_s=1.5) + self.cmd_pub = self.node._pubs["/cmd_vel"] + self.close_pub = self.node._pubs["/saltybot/too_close"] + + def test_stale_face_clears_state(self): + """If no fresh face data, state clears and robot stops.""" + # Manually set backed-up state + self.node._too_close = True + self.node._last_face_t = time.monotonic() - 10.0 # stale + self.node._control_cb() + self.assertFalse(self.node._too_close) + self.assertAlmostEqual(self.cmd_pub.msgs[-1].linear.x, 0.0) + self.assertFalse(self.close_pub.msgs[-1].data) + + def test_fresh_face_active(self): + """Recently received face is treated as fresh.""" + _inject_states(self.node, [(1, 0.5)]) + _inject_faces(self.node, [1]) # updates _last_face_t + self.node._control_cb() + self.assertTrue(self.node._too_close) + + def test_no_face_ever_is_stale(self): + """Never received a face → stale, no backup.""" + # _last_face_t = 0.0 (default) → not fresh + self.node._control_cb() + self.assertFalse(self.node._too_close) + + def test_stale_logs_info(self): + self.node._too_close = True + self.node._last_face_t = time.monotonic() - 100.0 + self.node._control_cb() + infos = [l for l in self.node._logs if l[0] == "INFO"] + self.assertTrue(any("face lost" in m.lower() or "clearing" in m.lower() + for _, m in infos)) + + def test_too_close_entry_logs_warn(self): + _inject_states(self.node, [(1, 0.3)]) + _inject_faces(self.node, [1]) + self.node._control_cb() + warns = [l for l in self.node._logs if l[0] == "WARN"] + self.assertTrue(any("too close" in m.lower() or "backing" in m.lower() + for _, m in warns)) + + def test_clear_logs_info(self): + _inject_states(self.node, [(1, 0.3)]) + _inject_faces(self.node, [1]) + self.node._control_cb() + _inject_states(self.node, [(1, 1.5)]) + self.node._control_cb() + infos = [l for l in self.node._logs if l[0] == "INFO"] + self.assertTrue(any("clear" in m.lower() for _, m in infos)) + + +# ── Tests: _publish ─────────────────────────────────────────────────────────── + +class TestPublish(unittest.TestCase): + @classmethod + def setUpClass(cls): cls.mod = _load_mod() + + def setUp(self): + self.node = _make_node(self.mod, backup_speed=0.15) + self.cmd_pub = self.node._pubs["/cmd_vel"] + self.bool_pub = self.node._pubs["/saltybot/too_close"] + + def test_backup_false_zero_linear_x(self): + self.node._publish(backup=False) + self.assertAlmostEqual(self.cmd_pub.msgs[-1].linear.x, 0.0) + + def test_backup_true_negative_linear_x(self): + self.node._publish(backup=True) + self.assertAlmostEqual(self.cmd_pub.msgs[-1].linear.x, -0.15) + + def test_backup_false_bool_false(self): + self.node._publish(backup=False) + self.assertFalse(self.bool_pub.msgs[-1].data) + + def test_backup_true_bool_true(self): + self.node._publish(backup=True) + self.assertTrue(self.bool_pub.msgs[-1].data) + + def test_angular_z_always_zero(self): + self.node._publish(backup=True) + self.assertAlmostEqual(self.cmd_pub.msgs[-1].angular.z, 0.0) + + def test_both_topics_published(self): + self.node._publish(backup=False) + self.assertEqual(len(self.cmd_pub.msgs), 1) + self.assertEqual(len(self.bool_pub.msgs), 1) + + +# ── Tests: source content ───────────────────────────────────────────────────── + +class TestNodeSrc(unittest.TestCase): + @classmethod + def setUpClass(cls): + with open(_SRC) as f: cls.src = f.read() + + def test_issue_tag(self): self.assertIn("#310", self.src) + def test_cmd_vel_topic(self): self.assertIn("/cmd_vel", self.src) + def test_too_close_topic(self): self.assertIn("/saltybot/too_close", self.src) + def test_faces_topic(self): self.assertIn("/social/faces/detected", self.src) + def test_states_topic(self): self.assertIn("/social/person_states", self.src) + def test_closest_face_fn(self): self.assertIn("closest_face_distance", self.src) + def test_should_backup_fn(self): self.assertIn("should_backup", self.src) + def test_should_clear_fn(self): self.assertIn("should_clear", self.src) + def test_hysteresis_param(self): self.assertIn("hysteresis_m", self.src) + def test_backup_speed_param(self): self.assertIn("backup_speed", self.src) + def test_personal_space_param(self): self.assertIn("personal_space_m", self.src) + def test_lost_timeout_param(self): self.assertIn("lost_timeout_s", self.src) + def test_control_rate_param(self): self.assertIn("control_rate", self.src) + def test_threading_lock(self): self.assertIn("threading.Lock", self.src) + def test_linear_x(self): self.assertIn("linear.x", self.src) + def test_twist_published(self): self.assertIn("Twist", self.src) + def test_bool_published(self): self.assertIn("Bool", self.src) + def test_main_defined(self): self.assertIn("def main", self.src) + def test_face_detection_array(self): self.assertIn("FaceDetectionArray", self.src) + def test_person_state_array(self): self.assertIn("PersonStateArray", self.src) + + +# ── Tests: config / launch / setup ──────────────────────────────────────────── + +class TestConfig(unittest.TestCase): + _CONFIG = ( + "/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/" + "saltybot_social/config/personal_space_params.yaml" + ) + _LAUNCH = ( + "/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/" + "saltybot_social/launch/personal_space.launch.py" + ) + _SETUP = ( + "/Users/seb/AI/saltylab-firmware/jetson/ros2_ws/src/" + "saltybot_social/setup.py" + ) + + def test_config_exists(self): + import os; self.assertTrue(os.path.exists(self._CONFIG)) + + def test_config_personal_space_m(self): + with open(self._CONFIG) as f: c = f.read() + self.assertIn("personal_space_m", c) + + def test_config_backup_speed(self): + with open(self._CONFIG) as f: c = f.read() + self.assertIn("backup_speed", c) + + def test_config_hysteresis(self): + with open(self._CONFIG) as f: c = f.read() + self.assertIn("hysteresis_m", c) + + def test_config_lost_timeout(self): + with open(self._CONFIG) as f: c = f.read() + self.assertIn("lost_timeout_s", c) + + def test_launch_exists(self): + import os; self.assertTrue(os.path.exists(self._LAUNCH)) + + def test_launch_has_personal_space_arg(self): + with open(self._LAUNCH) as f: c = f.read() + self.assertIn("personal_space_m", c) + + def test_launch_has_backup_speed_arg(self): + with open(self._LAUNCH) as f: c = f.read() + self.assertIn("backup_speed", c) + + def test_entry_point_in_setup(self): + with open(self._SETUP) as f: c = f.read() + self.assertIn("personal_space_node", c) + + +if __name__ == "__main__": + unittest.main()