Merge pull request 'feat(social): Issue #86 — physical expression + motor attention' (#94) from sl-firmware/social-expression into main

This commit is contained in:
sl-jetson 2026-03-01 23:55:14 -05:00
commit 35b940e1f5
13 changed files with 873 additions and 2 deletions

View File

@ -0,0 +1,206 @@
/*
* social_expression.ino — LED mood display for saltybot social layer.
*
* Hardware
*
* MCU : ESP32-C3 (e.g. Seeed XIAO ESP32-C3 or equivalent)
* LEDs : NeoPixel ring or strip, DATA_PIN GPIO5, power via 5V rail
*
* Serial protocol (from Orin Nano, 115200 8N1)
*
* One JSON line per command, newline-terminated:
* {"mood":"happy","intensity":0.8}\n
*
* mood values : "happy" | "curious" | "annoyed" | "playful" | "idle"
* intensity : 0.0 (off) .. 1.0 (full brightness)
*
* If no command arrives for IDLE_TIMEOUT_MS, the node enters the
* breathing idle animation automatically.
*
* Mood colour mapping
*
* happy green #00FF00
* curious blue #0040FF
* annoyed red #FF1000
* playful rainbow (cycling hue across all pixels)
* idle soft white breathing (sine modulated)
*
* Dependencies (Arduino Library Manager)
*
* Adafruit NeoPixel 1.11.0
* ArduinoJson 7.0.0
*
* Build: Arduino IDE 2 / arduino-cli with board "ESP32C3 Dev Module"
* Board manager URL: https://raw.githubusercontent.com/espressif/arduino-esp32/gh-pages/package_esp32_index.json
*/
#include <Adafruit_NeoPixel.h>
#include <ArduinoJson.h>
// ── Hardware config ──────────────────────────────────────────────────────────
#define LED_PIN 5 // GPIO connected to NeoPixel data line
#define LED_COUNT 16 // pixels in ring / strip — adjust as needed
#define LED_BRIGHTNESS 120 // global cap 0-255 (≈47%), protects PSU
// ── Timing constants ─────────────────────────────────────────────────────────
#define IDLE_TIMEOUT_MS 3000 // fall back to breathing after 3 s silence
#define LOOP_INTERVAL_MS 20 // animation tick ≈ 50 Hz
// ── Colours (R, G, B) ────────────────────────────────────────────────────────
static const uint8_t COL_HAPPY[3] = { 0, 220, 0 }; // green
static const uint8_t COL_CURIOUS[3] = { 0, 64, 255 }; // blue
static const uint8_t COL_ANNOYED[3] = {255, 16, 0 }; // red
// ── State ────────────────────────────────────────────────────────────────────
enum Mood { MOOD_IDLE, MOOD_HAPPY, MOOD_CURIOUS, MOOD_ANNOYED, MOOD_PLAYFUL };
static Mood g_mood = MOOD_IDLE;
static float g_intensity = 1.0f;
static uint32_t g_last_cmd_ms = 0; // millis() of last received command
// Animation counters
static uint16_t g_rainbow_hue = 0; // 0..65535, cycles for playful
static uint32_t g_last_tick = 0;
// Serial receive buffer
static char g_serial_buf[128];
static uint8_t g_buf_pos = 0;
// ── NeoPixel object ──────────────────────────────────────────────────────────
Adafruit_NeoPixel strip(LED_COUNT, LED_PIN, NEO_GRB + NEO_KHZ800);
// ── Helpers ──────────────────────────────────────────────────────────────────
/* Scale an 8-bit colour channel by an intensity factor (0.0–1.0), truncating. */
static uint8_t scale(uint8_t v, float intensity) {
  const float scaled = (float)v * intensity;
  return (uint8_t)scaled;
}
/*
* Breathing: sine envelope over time.
* Returns brightness factor 0.0..1.0, period ~4 s.
*/
static float breath_factor(uint32_t now_ms) {
float phase = (float)(now_ms % 4000) / 4000.0f; // 0..1 per period
return 0.08f + 0.35f * (0.5f + 0.5f * sinf(2.0f * M_PI * phase));
}
/*
 * Fill every pixel with the same colour.
 * strip.Color() is a pure packing function, so compute the packed 32-bit
 * value once instead of once per pixel (loop-invariant hoist).
 */
static void set_all(uint8_t r, uint8_t g, uint8_t b) {
  const uint32_t packed = strip.Color(r, g, b);
  for (int i = 0; i < LED_COUNT; i++) {
    strip.setPixelColor(i, packed);
  }
}
// ── Animation drivers ────────────────────────────────────────────────────────
/* Solid mood colour: each RGB channel dimmed by intensity, then pushed
 * to the whole strip. */
static void animate_solid(const uint8_t col[3], float intensity) {
  const uint8_t r = scale(col[0], intensity);
  const uint8_t g = scale(col[1], intensity);
  const uint8_t b = scale(col[2], intensity);
  set_all(r, g, b);
}
/* Idle breathing: warm white (R255/G200/B120) modulated by the sine
 * envelope from breath_factor(), further attenuated by intensity. */
static void animate_breathing(uint32_t now_ms, float intensity) {
  const float level = breath_factor(now_ms) * intensity;
  set_all(scale(255, level), scale(200, level), scale(120, level));
}
/* Playful rainbow: the full 16-bit hue wheel is spread across the strip and
 * the whole pattern rotates as g_rainbow_hue advances each tick. */
static void animate_rainbow(float intensity) {
  const uint8_t value = (uint8_t)(255.0f * intensity);
  for (int px = 0; px < LED_COUNT; px++) {
    const uint16_t hue =
        g_rainbow_hue + (uint16_t)((float)px / LED_COUNT * 65536.0f);
    strip.setPixelColor(px, strip.ColorHSV(hue, 255, value));
  }
  // Advance hue each tick (full cycle in ~6 s at 50 Hz)
  g_rainbow_hue += 218;
}
// ── Serial parser ────────────────────────────────────────────────────────────
static void parse_command(const char *line) {
StaticJsonDocument<128> doc;
DeserializationError err = deserializeJson(doc, line);
if (err) return;
const char *mood_str = doc["mood"] | "";
float intensity = doc["intensity"] | 1.0f;
if (intensity < 0.0f) intensity = 0.0f;
if (intensity > 1.0f) intensity = 1.0f;
if (strcmp(mood_str, "happy") == 0) g_mood = MOOD_HAPPY;
else if (strcmp(mood_str, "curious") == 0) g_mood = MOOD_CURIOUS;
else if (strcmp(mood_str, "annoyed") == 0) g_mood = MOOD_ANNOYED;
else if (strcmp(mood_str, "playful") == 0) g_mood = MOOD_PLAYFUL;
else g_mood = MOOD_IDLE;
g_intensity = intensity;
g_last_cmd_ms = millis();
}
/*
 * Drain the serial RX buffer, accumulating characters into g_serial_buf and
 * dispatching each complete line to parse_command().
 *
 * Fix: the original reset g_buf_pos on overflow but kept accumulating, so
 * the TAIL of an oversized line was treated as a fresh line and handed to
 * the parser at the next newline. We now discard everything up to and
 * including the terminating newline once a line overflows.
 */
static void read_serial(void) {
  static bool discarding = false; // true while skipping an oversized line

  while (Serial.available()) {
    char c = (char)Serial.read();
    if (c == '\n' || c == '\r') {
      // End of line: parse if we captured a complete, non-discarded line.
      // A bare '\r' in "\r\n" dispatches first; the following '\n' is a no-op
      // because the buffer is already empty.
      if (!discarding && g_buf_pos > 0) {
        g_serial_buf[g_buf_pos] = '\0';
        parse_command(g_serial_buf);
      }
      g_buf_pos = 0;
      discarding = false;
    } else if (discarding) {
      // Swallow the remainder of an overflowed line.
    } else if (g_buf_pos < (sizeof(g_serial_buf) - 1)) {
      g_serial_buf[g_buf_pos++] = c;
    } else {
      // Buffer overflow — drop what we have and ignore until end of line.
      g_buf_pos = 0;
      discarding = true;
    }
  }
}
// ── Arduino entry points ─────────────────────────────────────────────────────
// One-time hardware init. Order matters: the strip driver must be begun
// before brightness/clear/show are issued.
void setup(void) {
  Serial.begin(115200);                 // must match expression_node baud_rate
  strip.begin();                        // init NeoPixel driver state
  strip.setBrightness(LED_BRIGHTNESS);  // global cap — protects the 5V rail
  strip.clear();                        // start with all pixels dark
  strip.show();                         // push the cleared frame immediately
  g_last_tick = millis();               // seed the animation-throttle clock
}
/*
 * Main loop: ingest serial commands every pass, but only advance the LED
 * animation every LOOP_INTERVAL_MS (~50 Hz). If no command has arrived for
 * IDLE_TIMEOUT_MS the mood falls back to the breathing idle animation.
 * Unsigned subtraction keeps both timeout checks correct across millis()
 * rollover.
 */
void loop(void) {
  const uint32_t now = millis();
  read_serial();

  // Silence from the host → drop back to idle breathing.
  if ((now - g_last_cmd_ms) > IDLE_TIMEOUT_MS) {
    g_mood = MOOD_IDLE;
  }

  // Animation tick throttle.
  if ((now - g_last_tick) < LOOP_INTERVAL_MS) {
    return;
  }
  g_last_tick = now;

  if (g_mood == MOOD_HAPPY) {
    animate_solid(COL_HAPPY, g_intensity);
  } else if (g_mood == MOOD_CURIOUS) {
    animate_solid(COL_CURIOUS, g_intensity);
  } else if (g_mood == MOOD_ANNOYED) {
    animate_solid(COL_ANNOYED, g_intensity);
  } else if (g_mood == MOOD_PLAYFUL) {
    animate_rainbow(g_intensity);
  } else {
    // MOOD_IDLE (and any future unhandled mood): breathe. Intensity 0 would
    // make breathing invisible, so fall back to full intensity there.
    animate_breathing(now, g_intensity > 0.0f ? g_intensity : 1.0f);
  }
  strip.show();
}

View File

@ -0,0 +1,30 @@
# attention_params.yaml — Social attention controller configuration.
#
# ── Proportional gain ─────────────────────────────────────────────────────────
# angular_z = clamp(kp_angular * bearing_rad, ±max_angular_vel)
#
# kp_angular : 1.0 → 1.0 rad/s per 1 rad error (57° → full speed).
# Increase for snappier tracking; decrease if robot oscillates.
kp_angular: 1.0 # rad/s per rad
# ── Velocity limits ──────────────────────────────────────────────────────────
# Hard cap on rotation output. In social mode the balance loop must not be
# disturbed by large sudden angular commands; keep this ≤ 1.0 rad/s.
max_angular_vel: 0.8 # rad/s
# ── Dead zone ────────────────────────────────────────────────────────────────
# If |bearing| ≤ dead_zone_rad, rotation is suppressed.
# 0.15 rad ≈ 8.6° — prevents chattering when person is roughly centred.
dead_zone_rad: 0.15 # radians
# ── Control loop ─────────────────────────────────────────────────────────────
control_rate: 20.0 # Hz
# ── Lost-target timeout ───────────────────────────────────────────────────────
# If no /social/persons message arrives for this many seconds, publish zero
# and enter IDLE state. The cmd_vel bridge deadman will also kick in.
lost_timeout_s: 2.0 # seconds
# ── Master enable ─────────────────────────────────────────────────────────────
# Toggle at runtime: ros2 param set /attention_node attention_enabled false
attention_enabled: true

View File

@ -0,0 +1,22 @@
# expression_params.yaml — LED mood display bridge configuration.
#
# serial_port : udev symlink or device path for the ESP32-C3 USB-CDC port.
# Recommended: create a udev rule:
# SUBSYSTEM=="tty", ATTRS{idVendor}=="303a", ATTRS{idProduct}=="1001",
# SYMLINK+="esp32-social"
#
# baud_rate : must match social_expression.ino (default 115200)
#
# idle_timeout_s : if no /social/mood message arrives for this many seconds,
# the node sends {"mood":"idle","intensity":1.0} so the ESP32 breathing
# animation is synchronised with node awareness.
#
# control_rate : how often to check the idle timeout (Hz).
# Does NOT gate the mood messages — those are forwarded immediately.
expression_node:
ros__parameters:
serial_port: /dev/esp32-social
baud_rate: 115200
idle_timeout_s: 3.0
control_rate: 10.0

View File

@ -1,3 +1,18 @@
"""
social.launch.py Launch the full saltybot social stack.
Includes:
person_state_tracker — multi-modal person identity fusion (Issue #82)
expression_node — /social/mood → ESP32-C3 NeoPixel serial (Issue #86)
attention_node — /social/persons → /cmd_vel rotation (Issue #86)
Usage:
ros2 launch saltybot_social social.launch.py
ros2 launch saltybot_social social.launch.py serial_port:=/dev/ttyUSB1
"""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
@ -5,7 +20,12 @@ from launch_ros.actions import Node
def generate_launch_description():
pkg = get_package_share_directory("saltybot_social")
exp_cfg = os.path.join(pkg, "config", "expression_params.yaml")
att_cfg = os.path.join(pkg, "config", "attention_params.yaml")
return LaunchDescription([
# person_state_tracker args (Issue #82)
DeclareLaunchArgument(
'engagement_distance',
default_value='2.0',
@ -21,6 +41,19 @@ def generate_launch_description():
default_value='false',
description='Whether UWB anchor data is available'
),
# expression_node args (Issue #86)
DeclareLaunchArgument("serial_port", default_value="/dev/esp32-social"),
DeclareLaunchArgument("baud_rate", default_value="115200"),
DeclareLaunchArgument("idle_timeout_s", default_value="3.0"),
# attention_node args (Issue #86)
DeclareLaunchArgument("kp_angular", default_value="1.0"),
DeclareLaunchArgument("max_angular_vel", default_value="0.8"),
DeclareLaunchArgument("dead_zone_rad", default_value="0.15"),
DeclareLaunchArgument("lost_timeout_s", default_value="2.0"),
DeclareLaunchArgument("attention_enabled", default_value="true"),
Node(
package='saltybot_social',
executable='person_state_tracker',
@ -32,4 +65,36 @@ def generate_launch_description():
'uwb_enabled': LaunchConfiguration('uwb_enabled'),
}],
),
Node(
package="saltybot_social",
executable="expression_node",
name="expression_node",
output="screen",
parameters=[
exp_cfg,
{
"serial_port": LaunchConfiguration("serial_port"),
"baud_rate": LaunchConfiguration("baud_rate"),
"idle_timeout_s": LaunchConfiguration("idle_timeout_s"),
},
],
),
Node(
package="saltybot_social",
executable="attention_node",
name="attention_node",
output="screen",
parameters=[
att_cfg,
{
"kp_angular": LaunchConfiguration("kp_angular"),
"max_angular_vel": LaunchConfiguration("max_angular_vel"),
"dead_zone_rad": LaunchConfiguration("dead_zone_rad"),
"lost_timeout_s": LaunchConfiguration("lost_timeout_s"),
"attention_enabled": LaunchConfiguration("attention_enabled"),
},
],
),
])

View File

@ -3,7 +3,12 @@
<package format="3">
<name>saltybot_social</name>
<version>0.1.0</version>
<description>Multi-modal person identity fusion and state tracking for saltybot</description> <description>
Social interaction layer for saltybot.
person_state_tracker: multi-modal person identity fusion (Issue #82).
expression_node: bridges /social/mood to ESP32-C3 NeoPixel ring over serial (Issue #86).
attention_node: rotates robot toward active speaker via /social/persons bearing (Issue #86).
</description>
<maintainer email="seb@vayrette.com">seb</maintainer>
<license>MIT</license>
<depend>rclpy</depend>

View File

@ -0,0 +1,205 @@
"""
attention_node.py — Social attention controller for saltybot.
Rotates the robot to face the active speaker or most confident detected person.
Publishes rotation-only cmd_vel (linear.x = 0) so balance is not disturbed.
Subscribes
/social/persons (saltybot_social_msgs/PersonArray)
Contains all tracked persons; active_id marks the current speaker.
bearing_rad: signed bearing in base_link (+ve = left/CCW).
Publishes
/cmd_vel (geometry_msgs/Twist)
angular.z = clamp(kp_angular * bearing, ±max_angular_vel)
linear.x = 0 always (social mode, not follow mode)
Control law
1. Pick target: person with active_id; fall back to highest-confidence person.
2. Dead zone: if |bearing| <= dead_zone_rad, angular_z = 0.
3. Proportional: angular_z = kp_angular * bearing.
4. Clamp to ±max_angular_vel.
5. If no fresh person data for lost_timeout_s, publish zero and stay idle.
State machine
ATTENDING — fresh target; publishing proportional rotation
IDLE — no target; publishing zero
Parameters
kp_angular (float) 1.0 proportional gain (rad/s per rad error)
max_angular_vel (float) 0.8 hard cap (rad/s)
dead_zone_rad (float) 0.15 ~8.6° dead zone around robot heading
control_rate (float) 20.0 Hz
lost_timeout_s (float) 2.0 seconds before going idle
attention_enabled (bool) True runtime kill switch
"""
import math
import time
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
from saltybot_social_msgs.msg import PersonArray
# ── Pure helpers (used by tests) ──────────────────────────────────────────────
def clamp(v: float, lo: float, hi: float) -> float:
    """Limit v to the closed interval [lo, hi] (lower bound wins on conflict)."""
    capped = hi if v > hi else v
    return lo if capped < lo else capped
def select_target(persons, active_id: int):
    """
    Choose the attention target from a PersonArray's persons list.

    Priority:
      1. First person whose track_id == active_id (declared speaker).
      2. Otherwise, the first person holding the highest confidence score.

    Returns the chosen Person message, or None for an empty list.
    """
    if not persons:
        return None
    # Declared active speaker wins outright.
    speakers = [p for p in persons if p.track_id == active_id]
    if speakers:
        return speakers[0]
    # No speaker declared (or not tracked): take the most confident detection,
    # keeping the earliest one on ties.
    best = persons[0]
    for candidate in persons[1:]:
        if candidate.confidence > best.confidence:
            best = candidate
    return best
def compute_attention_cmd(
    bearing_rad: float,
    dead_zone_rad: float,
    kp_angular: float,
    max_angular_vel: float,
) -> float:
    """
    Proportional rotation command toward bearing.

    Returns angular_z in rad/s: zero while |bearing| is inside the dead zone,
    otherwise kp_angular * bearing saturated at ±max_angular_vel.
    """
    if abs(bearing_rad) <= dead_zone_rad:
        return 0.0
    raw = kp_angular * bearing_rad
    lo, hi = -max_angular_vel, max_angular_vel
    return max(lo, min(hi, raw))
# ── ROS2 node ─────────────────────────────────────────────────────────────────
class AttentionNode(Node):
    """Rotation-only attention controller.

    Publishes a Twist on /cmd_vel whose angular.z turns the robot toward the
    selected person; linear.x is never set (stays 0.0) so the balance loop is
    not disturbed.
    """

    def __init__(self):
        super().__init__("attention_node")
        # Declared defaults mirror config/attention_params.yaml.
        self.declare_parameter("kp_angular", 1.0)
        self.declare_parameter("max_angular_vel", 0.8)
        self.declare_parameter("dead_zone_rad", 0.15)
        self.declare_parameter("control_rate", 20.0)
        self.declare_parameter("lost_timeout_s", 2.0)
        self.declare_parameter("attention_enabled", True)
        self._reload_params()
        # State
        self._last_persons_t = 0.0  # monotonic; 0 = never received
        self._persons = []          # latest PersonArray.persons snapshot
        self._active_id = -1        # latest PersonArray.active_id (-1 = none)
        self._state = "idle"        # "idle" | "attending" — used for transition logs
        self.create_subscription(
            PersonArray, "/social/persons", self._persons_cb, 10
        )
        self._cmd_pub = self.create_publisher(Twist, "/cmd_vel", 10)
        rate = self._p["control_rate"]
        self._timer = self.create_timer(1.0 / rate, self._control_cb)
        self.get_logger().info(
            f"AttentionNode ready kp={self._p['kp_angular']} "
            f"dead_zone={math.degrees(self._p['dead_zone_rad']):.1f}° "
            f"max_ω={self._p['max_angular_vel']} rad/s"
        )

    def _reload_params(self):
        # Re-read every ROS parameter into a plain dict. Called at the top of
        # each control tick so `ros2 param set` changes take effect live.
        self._p = {
            "kp_angular": self.get_parameter("kp_angular").value,
            "max_angular_vel": self.get_parameter("max_angular_vel").value,
            "dead_zone_rad": self.get_parameter("dead_zone_rad").value,
            "control_rate": self.get_parameter("control_rate").value,
            "lost_timeout_s": self.get_parameter("lost_timeout_s").value,
            "attention_enabled": self.get_parameter("attention_enabled").value,
        }

    # ── Callbacks ─────────────────────────────────────────────────────────────
    def _persons_cb(self, msg: PersonArray):
        # Cache the latest person snapshot; the control timer consumes it.
        self._persons = msg.persons
        self._active_id = msg.active_id
        self._last_persons_t = time.monotonic()
        if self._state == "idle" and msg.persons:
            self._state = "attending"
            self.get_logger().info("Attention: person detected — attending")

    def _control_cb(self):
        # Fixed-rate control tick. Publishes exactly one Twist per call; that
        # Twist stays all-zero unless the controller is enabled, person data
        # is fresh, and a target can be selected.
        self._reload_params()
        twist = Twist()
        if not self._p["attention_enabled"]:
            # Kill switch: keep publishing zeros so downstream consumers
            # still see a live stream.
            self._cmd_pub.publish(twist)
            return
        now = time.monotonic()
        fresh = (
            self._last_persons_t > 0.0
            and (now - self._last_persons_t) < self._p["lost_timeout_s"]
        )
        if not fresh:
            if self._state == "attending":
                self._state = "idle"
                self.get_logger().info("Attention: no person — idle")
            self._cmd_pub.publish(twist)
            return
        target = select_target(self._persons, self._active_id)
        if target is None:
            self._cmd_pub.publish(twist)
            return
        self._state = "attending"
        twist.angular.z = compute_attention_cmd(
            bearing_rad=target.bearing_rad,
            dead_zone_rad=self._p["dead_zone_rad"],
            kp_angular=self._p["kp_angular"],
            max_angular_vel=self._p["max_angular_vel"],
        )
        self._cmd_pub.publish(twist)
# ── Entry point ───────────────────────────────────────────────────────────────
def main(args=None):
    # Standard rclpy lifecycle: init → construct → spin → clean teardown.
    rclpy.init(args=args)
    node = AttentionNode()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass  # Ctrl-C is the normal stop path, not an error
    finally:
        node.destroy_node()
        rclpy.try_shutdown()  # tolerant of rclpy already being shut down


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,157 @@
"""
expression_node.py — LED mood display bridge for saltybot social layer.
Subscribes
/social/mood (saltybot_social_msgs/Mood)
mood : "happy" | "curious" | "annoyed" | "playful" | "idle"
intensity : 0.0 .. 1.0
Behaviour
* On every /social/mood message, serialises the command as a single-line
JSON frame and writes it over the ESP32-C3 serial link:
{"mood":"happy","intensity":0.80}\n
* If no mood message arrives for idle_timeout_s, sends an explicit
idle command so the ESP32 breathing animation is synchronised with the
node's own awareness of whether the robot is active.
* Re-opens the serial port automatically if the device disconnects.
Parameters
serial_port (str) /dev/ttyUSB0 ESP32-C3 USB-CDC device
baud_rate (int) 115200
idle_timeout_s (float) 3.0 seconds before sending idle
control_rate (float) 10.0 Hz; how often to check idle
"""
import json
import threading
import time
import rclpy
from rclpy.node import Node
from saltybot_social_msgs.msg import Mood
try:
import serial
_SERIAL_AVAILABLE = True
except ImportError:
_SERIAL_AVAILABLE = False
class ExpressionNode(Node):
    """Bridges /social/mood to the ESP32-C3 LED display via JSON-over-serial."""

    def __init__(self):
        super().__init__("expression_node")
        # Declared defaults mirror config/expression_params.yaml.
        self.declare_parameter("serial_port", "/dev/ttyUSB0")
        self.declare_parameter("baud_rate", 115200)
        self.declare_parameter("idle_timeout_s", 3.0)
        self.declare_parameter("control_rate", 10.0)
        self._port = self.get_parameter("serial_port").value
        self._baud = self.get_parameter("baud_rate").value
        self._idle_to = self.get_parameter("idle_timeout_s").value
        rate = self.get_parameter("control_rate").value
        self._last_cmd_t = 0.0  # monotonic, 0 = never received
        # _lock guards _ser: it is touched by ROS callbacks (executor thread)
        # and by the reconnect thread below.
        self._lock = threading.Lock()
        self._ser = None
        self.create_subscription(Mood, "/social/mood", self._mood_cb, 10)
        self._timer = self.create_timer(1.0 / rate, self._tick_cb)
        if _SERIAL_AVAILABLE:
            # Daemon thread so rclpy.spin() is never blocked by serial I/O
            # and process exit is not held up.
            threading.Thread(target=self._open_serial, daemon=True).start()
        else:
            self.get_logger().warn(
                "pyserial not available — serial output disabled (dry-run mode)"
            )
        self.get_logger().info(
            f"ExpressionNode ready port={self._port} baud={self._baud} "
            f"idle_timeout={self._idle_to}s"
        )

    # ── Serial management ─────────────────────────────────────────────────────
    def _open_serial(self):
        """Background thread: keep serial port open, reconnect on error."""
        while rclpy.ok():
            try:
                with self._lock:
                    self._ser = serial.Serial(
                        self._port, self._baud, timeout=1.0
                    )
                self.get_logger().info(f"Opened serial port {self._port}")
                # Hold serial open until error — _send() sets _ser to None on
                # a write failure, which breaks this watch loop and triggers
                # an immediate reopen attempt.
                while rclpy.ok():
                    with self._lock:
                        if self._ser is None or not self._ser.is_open:
                            break
                    time.sleep(0.5)
            except Exception as exc:
                self.get_logger().warn(
                    f"Serial {self._port} error: {exc} — retry in 3 s"
                )
                with self._lock:
                    if self._ser and self._ser.is_open:
                        self._ser.close()
                    self._ser = None
                time.sleep(3.0)

    def _send(self, mood: str, intensity: float):
        """Serialise and write one JSON line to ESP32-C3."""
        line = json.dumps({"mood": mood, "intensity": round(intensity, 3)}) + "\n"
        data = line.encode("ascii")
        with self._lock:
            if self._ser and self._ser.is_open:
                try:
                    self._ser.write(data)
                except Exception as exc:
                    self.get_logger().warn(f"Serial write error: {exc}")
                    # NOTE(review): close() can itself raise if the device
                    # vanished mid-write — verify against pyserial behaviour.
                    self._ser.close()
                    self._ser = None
            else:
                # Dry-run: log the frame
                self.get_logger().debug(f"[dry-run] → {line.strip()}")

    # ── Callbacks ─────────────────────────────────────────────────────────────
    def _mood_cb(self, msg: Mood):
        # Forward every mood message immediately, with intensity clamped to
        # the [0, 1] range the firmware expects.
        intensity = max(0.0, min(1.0, float(msg.intensity)))
        self._send(msg.mood, intensity)
        self._last_cmd_t = time.monotonic()

    def _tick_cb(self):
        """Periodic check — send idle if we've been quiet too long."""
        if self._last_cmd_t == 0.0:
            return  # never received a mood yet; let the ESP32 idle on its own
        if (time.monotonic() - self._last_cmd_t) >= self._idle_to:
            self._send("idle", 1.0)
            # Reset so we don't flood the ESP32 with idle frames
            self._last_cmd_t = time.monotonic()
# ── Entry point ───────────────────────────────────────────────────────────────
def main(args=None):
    # Standard rclpy lifecycle: init → construct → spin → clean teardown.
    # The serial reconnect thread is a daemon, so it dies with the process.
    rclpy.init(args=args)
    node = ExpressionNode()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass  # Ctrl-C is the normal stop path, not an error
    finally:
        node.destroy_node()
        rclpy.try_shutdown()  # tolerant of rclpy already being shut down


if __name__ == "__main__":
    main()

View File

@ -21,12 +21,14 @@ setup(
zip_safe=True,
maintainer='seb',
maintainer_email='seb@vayrette.com',
description='Multi-modal person identity fusion and state tracking for saltybot', description='Social interaction layer — person state tracking, LED expression + attention',
license='MIT',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'person_state_tracker = saltybot_social.person_state_tracker_node:main',
'expression_node = saltybot_social.expression_node:main',
'attention_node = saltybot_social.attention_node:main',
],
},
)

View File

@ -0,0 +1,140 @@
"""
test_attention.py — Unit tests for attention_node helpers.
No ROS2 / serial / GPU dependencies — runs with plain pytest.
"""
import math
import pytest
from saltybot_social.attention_node import (
clamp,
compute_attention_cmd,
select_target,
)
# ── Helpers ───────────────────────────────────────────────────────────────────
class FakePerson:
    """Duck-typed stand-in for saltybot_social_msgs/Person (no ROS needed)."""

    def __init__(self, track_id, bearing_rad, confidence=0.9, is_speaking=False):
        self.__dict__.update(
            track_id=track_id,
            bearing_rad=bearing_rad,
            confidence=confidence,
            is_speaking=is_speaking,
        )
# ── clamp ─────────────────────────────────────────────────────────────────────
class TestClamp:
    """clamp() pins a value into the closed interval [lo, hi]."""

    def _check(self, value, expected):
        # All cases use the unit interval; assert the exact saturated result.
        assert clamp(value, 0.0, 1.0) == expected

    def test_within_range(self):
        self._check(0.5, 0.5)

    def test_below_lo(self):
        self._check(-0.5, 0.0)

    def test_above_hi(self):
        self._check(1.5, 1.0)

    def test_at_boundary(self):
        self._check(0.0, 0.0)
        self._check(1.0, 1.0)
# ── select_target ─────────────────────────────────────────────────────────────
class TestSelectTarget:
    """select_target(): active-speaker priority with confidence fallback."""

    def test_empty_returns_none(self):
        assert select_target([], active_id=-1) is None

    def test_picks_active_id(self):
        crowd = [
            FakePerson(1, 0.1, confidence=0.6),
            FakePerson(2, 0.5, confidence=0.9),
        ]
        # Active speaker beats a higher-confidence bystander.
        assert select_target(crowd, active_id=1).track_id == 1

    def test_falls_back_to_highest_confidence(self):
        crowd = [
            FakePerson(1, 0.1, confidence=0.6),
            FakePerson(2, 0.5, confidence=0.9),
            FakePerson(3, 0.2, confidence=0.4),
        ]
        assert select_target(crowd, active_id=-1).track_id == 2

    def test_single_person_no_active(self):
        assert select_target([FakePerson(5, 0.3)], active_id=-1).track_id == 5

    def test_active_id_not_in_list_falls_back(self):
        crowd = [
            FakePerson(1, 0.0, confidence=0.5),
            FakePerson(2, 0.2, confidence=0.95),
        ]
        # Stale active_id must not prevent a fallback selection.
        assert select_target(crowd, active_id=99).track_id == 2
# ── compute_attention_cmd ─────────────────────────────────────────────────────
class TestComputeAttentionCmd:
    """compute_attention_cmd(): dead zone, sign, clamping, proportionality."""

    @staticmethod
    def _cmd(bearing, dz=0.15, kp=1.0, vmax=0.8):
        # Shared driver with the default tuning from attention_params.yaml.
        return compute_attention_cmd(
            bearing_rad=bearing,
            dead_zone_rad=dz,
            kp_angular=kp,
            max_angular_vel=vmax,
        )

    def test_inside_dead_zone_zero(self):
        assert self._cmd(0.05) == 0.0

    def test_exactly_at_dead_zone_boundary_zero(self):
        # The boundary itself is inclusive (<=), so still zero.
        assert self._cmd(0.15) == 0.0

    def test_positive_bearing_gives_positive_angular_z(self):
        assert self._cmd(0.4) > 0.0

    def test_negative_bearing_gives_negative_angular_z(self):
        assert self._cmd(-0.4) < 0.0

    def test_clamps_to_max_angular_vel(self):
        # 180° off with kp=2 would be ~6.28 rad/s unclamped.
        assert abs(self._cmd(math.pi, kp=2.0)) <= 0.8

    def test_proportional_scaling(self):
        """Double bearing → double cmd (within linear region)."""
        first = self._cmd(0.3, dz=0.05, vmax=10.0)
        second = self._cmd(0.6, dz=0.05, vmax=10.0)
        assert abs(second - 2.0 * first) < 0.01

    def test_zero_bearing_zero_cmd(self):
        assert self._cmd(0.0, dz=0.1) == 0.0

View File

@ -8,6 +8,7 @@ find_package(geometry_msgs REQUIRED)
find_package(builtin_interfaces REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
# Issue #80 face detection + recognition
"msg/FaceDetection.msg" "msg/FaceDetection.msg"
"msg/FaceDetectionArray.msg" "msg/FaceDetectionArray.msg"
"msg/FaceEmbedding.msg" "msg/FaceEmbedding.msg"
@ -18,7 +19,12 @@ rosidl_generate_interfaces(${PROJECT_NAME}
"srv/ListPersons.srv" "srv/ListPersons.srv"
"srv/DeletePerson.srv" "srv/DeletePerson.srv"
"srv/UpdatePerson.srv" "srv/UpdatePerson.srv"
# Issue #86 LED expression + motor attention
"msg/Mood.msg"
"msg/Person.msg"
"msg/PersonArray.msg"
DEPENDENCIES std_msgs geometry_msgs builtin_interfaces
)
ament_export_dependencies(rosidl_default_runtime)
ament_package()

View File

@ -0,0 +1,7 @@
# Mood.msg — social expression command sent to the LED display node.
#
# mood : one of "happy", "curious", "annoyed", "playful", "idle"
# intensity : 0.0 (off) to 1.0 (full brightness)
string mood
float32 intensity

View File

@ -0,0 +1,17 @@
# Person.msg — single tracked person for social attention.
#
# bearing_rad : signed bearing in base_link frame (rad)
# positive = person to the left (CCW), negative = right (CW)
# distance_m : estimated distance in metres; 0 if unknown
# confidence : detection confidence 0..1
# is_speaking : true if mic DOA or VAD identifies this person as speaking
# source : "camera" | "mic_doa"
std_msgs/Header header
int32 track_id
float32 bearing_rad
float32 distance_m
float32 confidence
bool is_speaking
string source

View File

@ -0,0 +1,9 @@
# PersonArray.msg — all detected persons plus the current attention target.
#
# persons : all currently tracked persons
# active_id : track_id of the active/speaking person; -1 if none
std_msgs/Header header
saltybot_social_msgs/Person[] persons
int32 active_id