Merge pull request 'feat: emotion engine (Issue #429)' (#435) from sl-webui/issue-429-emotion-engine into main

This commit is contained in:
sl-jetson 2026-03-05 08:59:14 -05:00
commit 2203773377
28 changed files with 1979 additions and 0 deletions

View File

@ -0,0 +1,7 @@
build/
install/
log/
*.egg-info/
__pycache__/
*.pyc
.pytest_cache/

View File

@ -0,0 +1,178 @@
# SaltyBot Emotion Engine (Issue #429)
Context-aware facial expression and emotion selection system for SaltyBot.
## Features
### 1. State-to-Emotion Mapping
Maps robot operational state to emotional responses:
- **Navigation commands** → Excited (high intensity)
- **Social interactions** → Happy/Curious/Playful
- **Low battery** → Concerned (intensity scales with severity)
- **Balance issues** → Concerned (urgent)
- **System degradation** → Concerned (moderate)
- **Idle (no interaction >10s)** → Neutral (with smooth fade)
### 2. Smooth Emotion Transitions
- Configurable transition durations (0.31.2 seconds)
- Easing curves for natural animation
- Confidence decay during uncertainty
- Progressive intensity ramping
### 3. Personality-Aware Responses
Configurable personality traits (0.01.0):
- **Extroversion**: Affects how eager to interact (playful vs. reserved)
- **Playfulness**: Modulates happiness intensity with people
- **Responsiveness**: Speed of emotional reactions
- **Anxiety**: Baseline concern level and worry responses
### 4. Social Memory & Familiarity
- Tracks interaction history per person
- Warmth modifier (0.31.0) based on relationship tier:
- Stranger: 0.5 (neutral warmth)
- Regular contact: 0.60.8 (warmer)
- Known favorite: 0.91.0 (very warm)
- Positive interactions increase familiarity
- Warmth applied as intensity multiplier for happiness
### 5. Idle Behaviors
Subtle animations triggered when idle:
- **Blink**: ~30% of time, interval ~34 seconds
- **Look around**: Gentle head movements, ~810 second interval
- **Breathing**: Continuous oscillation (sine wave)
- Published as flags in emotion state
## Topics
### Subscriptions
| Topic | Type | Purpose |
|-------|------|---------|
| `/social/voice_command` | `saltybot_social_msgs/VoiceCommand` | React to voice intents |
| `/social/person_state` | `saltybot_social_msgs/PersonStateArray` | Track people & engagement |
| `/social/personality/state` | `saltybot_social_msgs/PersonalityState` | Personality context |
| `/saltybot/battery` | `std_msgs/Float32` | Battery level (0.01.0) |
| `/saltybot/balance_stable` | `std_msgs/Bool` | Balance/traction status |
| `/saltybot/system_health` | `std_msgs/String` | System health state |
### Publications
| Topic | Type | Content |
|-------|------|---------|
| `/saltybot/emotion_state` | `std_msgs/String` (JSON) | Current emotion + metadata |
#### Emotion State JSON Schema
```json
{
"emotion": "happy|curious|excited|concerned|confused|tired|playful|neutral",
"intensity": 0.01.0,
"confidence": 0.01.0,
"expression": "happy_intense|happy|happy_subtle|...",
"context": "navigation_command|engaged_with_N_people|low_battery|...",
"triggered_by": "voice_command|person_tracking|battery_monitor|balance_monitor|idle_timer",
"social_target_id": "person_id or null",
"social_warmth": 0.01.0,
"idle_flags": {
"blink": true|false,
"look_around": true|false,
"breathing": true|false
},
"timestamp": unix_time,
"battery_level": 0.01.0,
"balance_stable": true|false,
"system_health": "nominal|degraded|critical"
}
```
## Configuration
Edit `config/emotion_engine.yaml`:
```yaml
personality:
extroversion: 0.6 # 0=introvert, 1=extrovert
playfulness: 0.5 # How playful with people
responsiveness: 0.8 # Reaction speed
anxiety: 0.3 # Baseline worry level
battery_warning_threshold: 0.25 # 25% triggers mild concern
battery_critical_threshold: 0.10 # 10% triggers high concern
update_rate_hz: 10.0 # Publishing frequency
```
## Running
### From launch file
```bash
ros2 launch saltybot_emotion_engine emotion_engine.launch.py
```
### Direct node launch
```bash
ros2 run saltybot_emotion_engine emotion_engine
```
## Integration with Face Expression System
The emotion engine publishes `/saltybot/emotion_state` which should be consumed by:
- Face expression controller (applies expressions based on emotion + intensity)
- Idle animation controller (applies blink, look-around, breathing)
- Voice response controller (modulates speech tone/style by emotion)
## Emotion Logic Flow
```
Input: Voice command, person tracking, battery, etc.
Classify event → determine target emotion
Apply personality modifiers (intensity * personality traits)
Initiate smooth transition (current emotion → target emotion)
Apply social warmth modifier if person-directed
Update idle flags
Publish emotion state (JSON)
```
## Example Usage
Subscribe and monitor emotion state:
```bash
ros2 topic echo /saltybot/emotion_state
```
Example output (when person talks):
```json
{
"emotion": "excited",
"intensity": 0.85,
"confidence": 0.9,
"expression": "surprised_intense",
"context": "navigation_command",
"triggered_by": "voice_command",
"social_target_id": "person_42",
"social_warmth": 0.75,
"idle_flags": {"blink": false, "look_around": true, "breathing": true},
"timestamp": 1699564800.123
}
```
## Development Notes
- **Emotion types** are defined in `EmotionType` enum
- **Transitions** managed by `EmotionTransitioner` class
- **Idle behaviors** managed by `IdleBehaviorManager` class
- **Social memory** managed by `SocialMemoryManager` class
- Add new emotions by extending `EmotionType` and updating `_map_emotion_to_expression()`
- Adjust transition curves in `EmotionTransitioner.transition_curves` dict
## Future Enhancements
1. Machine learning model for context → emotion prediction
2. Voice sentiment analysis to modulate emotion
3. Facial expression feedback from /social/faces/expressions
4. Multi-person emotional dynamics (ensemble emotion)
5. Persistent social memory (database backend)
6. Integration with LLM for contextual emotion explanation

View File

@ -0,0 +1,29 @@
/**:
ros__parameters:
# Personality configuration (0.01.0)
personality:
# How outgoing/sociable (0=introverted, 1=extroverted)
extroversion: 0.6
# How much the robot enjoys playful interactions
playfulness: 0.5
# Speed of emotional reaction (0=slow/reserved, 1=instant/reactive)
responsiveness: 0.8
# Baseline anxiety/caution (0=relaxed, 1=highly anxious)
anxiety: 0.3
# Battery thresholds
battery_warning_threshold: 0.25 # 25% - mild concern
battery_critical_threshold: 0.10 # 10% - high concern
# Update frequency
update_rate_hz: 10.0
# Transition timing
emotion_transition_duration_normal: 0.8 # seconds
emotion_transition_duration_urgent: 0.3 # for critical states
emotion_transition_duration_relax: 1.2 # for returning to neutral
# Idle behavior timing
idle_blink_interval: 3.0 # seconds
idle_look_around_interval: 8.0 # seconds
idle_return_to_neutral_delay: 10.0 # seconds with no interaction

View File

@ -0,0 +1,36 @@
#!/usr/bin/env python3
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.substitutions import PathJoinSubstitution
from launch_ros.substitutions import FindPackageShare
import os
def generate_launch_description():
"""Launch emotion engine node with configuration."""
# Get the package directory
package_share_dir = FindPackageShare("saltybot_emotion_engine")
config_file = PathJoinSubstitution(
[package_share_dir, "config", "emotion_engine.yaml"]
)
# Emotion engine node
emotion_engine_node = Node(
package="saltybot_emotion_engine",
executable="emotion_engine",
name="emotion_engine",
output="screen",
parameters=[config_file],
remappings=[
# Remap topic names if needed
("/saltybot/battery", "/saltybot/battery_level"),
("/saltybot/balance_stable", "/saltybot/balance_status"),
],
on_exit_event_handlers=[], # Could add custom handlers
)
return LaunchDescription([
emotion_engine_node,
])

View File

@ -0,0 +1,29 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_emotion_engine</name>
<version>0.1.0</version>
<description>
Context-aware facial expression and emotion selection engine.
Subscribes to orchestrator state, battery, balance, person tracking, voice commands, and health.
Maps robot state to emotions with smooth transitions, idle behaviors, and social awareness.
Publishes emotion state with personality-aware expression selection (Issue #429).
</description>
<maintainer email="seb@vayrette.com">seb</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>geometry_msgs</depend>
<depend>saltybot_social_msgs</depend>
<depend>ament_index_python</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,569 @@
#!/usr/bin/env python3
import math
import time
from enum import Enum
from dataclasses import dataclass, field
from typing import Optional, Dict, List
from collections import deque
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from std_msgs.msg import String, Float32, Bool
from geometry_msgs.msg import Pose
from saltybot_social_msgs.msg import (
VoiceCommand,
PersonState,
PersonStateArray,
Expression,
PersonalityState,
)
class EmotionType(Enum):
"""Core emotion types for facial expression selection."""
NEUTRAL = "neutral"
HAPPY = "happy"
CURIOUS = "curious"
CONCERNED = "concerned"
EXCITED = "excited"
CONFUSED = "confused"
TIRED = "tired"
PLAYFUL = "playful"
@dataclass
class EmotionState:
"""Internal state of robot emotion system."""
primary_emotion: EmotionType = EmotionType.NEUTRAL
intensity: float = 0.5 # 0.0 = minimal, 1.0 = extreme
confidence: float = 0.5 # 0.0 = uncertain, 1.0 = certain
context: str = "" # e.g., "person_interacting", "low_battery", "idle"
triggered_by: str = "" # e.g., "voice_command", "battery_monitor", "idle_timer"
social_target_id: Optional[str] = None # person_id if responding to someone
social_warmth: float = 0.5 # 0.0 = cold, 1.0 = warm (familiarity with target)
last_update_time: float = 0.0
transition_start_time: float = 0.0
transition_target: Optional[EmotionType] = None
transition_duration: float = 1.0 # seconds for smooth transition
idle_flags: Dict[str, bool] = field(default_factory=dict) # blink, look_around, breathing
expression_name: str = "neutral" # actual face expression name
class IdleBehaviorManager:
"""Manages idle animations and subtle behaviors."""
def __init__(self, node_logger):
self.logger = node_logger
self.blink_interval = 3.0 # seconds
self.look_around_interval = 8.0
self.breathing_phase = 0.0
self.last_blink_time = time.time()
self.last_look_around_time = time.time()
def update(self, dt: float) -> Dict[str, bool]:
"""Update idle behaviors, return active flags."""
current_time = time.time()
flags = {}
# Blink behavior: ~30% of the time
if current_time - self.last_blink_time > self.blink_interval:
flags["blink"] = True
self.last_blink_time = current_time
self.blink_interval = 3.0 + (hash(current_time) % 100) / 100.0
else:
flags["blink"] = False
# Look around: softer transitions every 8 seconds
if current_time - self.last_look_around_time > self.look_around_interval:
flags["look_around"] = True
self.last_look_around_time = current_time
self.look_around_interval = 8.0 + (hash(current_time) % 200) / 100.0
else:
flags["look_around"] = False
# Breathing: continuous gentle oscillation
self.breathing_phase = (self.breathing_phase + dt * 0.5) % (2 * math.pi)
breathing_intensity = (math.sin(self.breathing_phase) + 1.0) / 2.0
flags["breathing"] = breathing_intensity > 0.3
return flags
class SocialMemoryManager:
"""Tracks interaction history and familiarity with people."""
def __init__(self):
self.interactions: Dict[str, Dict] = {}
self.max_history = 100
self.recent_interactions = deque(maxlen=self.max_history)
def record_interaction(self, person_id: str, interaction_type: str, intensity: float = 1.0):
"""Record an interaction with a person."""
if person_id not in self.interactions:
self.interactions[person_id] = {
"count": 0,
"warmth": 0.5,
"last_interaction": 0.0,
"positive_interactions": 0,
}
entry = self.interactions[person_id]
entry["count"] += 1
entry["last_interaction"] = time.time()
if intensity > 0.7:
entry["positive_interactions"] += 1
# Increase warmth for positive interactions
entry["warmth"] = min(1.0, entry["warmth"] + 0.05)
else:
# Slight decrease for negative interactions
entry["warmth"] = max(0.3, entry["warmth"] - 0.02)
self.recent_interactions.append((person_id, interaction_type, time.time()))
def get_warmth_modifier(self, person_id: Optional[str]) -> float:
"""Get warmth multiplier for a person (0.5 = neutral, 1.0 = familiar, 0.3 = stranger)."""
if not person_id or person_id not in self.interactions:
return 0.5
return self.interactions[person_id].get("warmth", 0.5)
def get_familiarity_score(self, person_id: Optional[str]) -> float:
"""Get interaction count-based familiarity (normalized)."""
if not person_id or person_id not in self.interactions:
return 0.0
count = self.interactions[person_id].get("count", 0)
return min(1.0, count / 10.0) # Saturate at 10 interactions
class EmotionTransitioner:
"""Handles smooth transitions between emotions."""
def __init__(self):
self.transition_curves = {
(EmotionType.NEUTRAL, EmotionType.EXCITED): "ease_out",
(EmotionType.NEUTRAL, EmotionType.CONCERNED): "ease_in",
(EmotionType.EXCITED, EmotionType.NEUTRAL): "ease_in",
(EmotionType.CONCERNED, EmotionType.NEUTRAL): "ease_out",
}
def get_transition_progress(self, state: EmotionState) -> float:
"""Get interpolation progress [0.0, 1.0]."""
if not state.transition_target:
return 1.0
elapsed = time.time() - state.transition_start_time
progress = min(1.0, elapsed / state.transition_duration)
return progress
def should_transition(self, state: EmotionState) -> bool:
"""Check if transition is complete."""
if not state.transition_target:
return False
return self.get_transition_progress(state) >= 1.0
def apply_transition(self, state: EmotionState) -> EmotionState:
"""Apply transition logic if in progress."""
if not self.should_transition(state):
return state
# Transition complete
state.primary_emotion = state.transition_target
state.transition_target = None
state.confidence = min(1.0, state.confidence + 0.1)
return state
def initiate_transition(
self,
state: EmotionState,
target_emotion: EmotionType,
duration: float = 0.8,
) -> EmotionState:
"""Start a smooth transition to new emotion."""
if state.primary_emotion == target_emotion:
return state
state.transition_target = target_emotion
state.transition_start_time = time.time()
state.transition_duration = duration
state.confidence = max(0.3, state.confidence - 0.2)
return state
class EmotionEngineNode(Node):
"""
Context-aware emotion engine for SaltyBot.
Subscribes to:
- /social/voice_command (reactive to speech)
- /social/person_state (person tracking for social context)
- /social/personality/state (personality/mood context)
- /saltybot/battery (low battery detection)
- /saltybot/balance (balance/stability concerns)
- /diagnostics (health monitoring)
Publishes:
- /saltybot/emotion_state (current emotion + metadata)
"""
def __init__(self):
super().__init__("saltybot_emotion_engine")
# Configuration parameters
self.declare_parameter("personality.extroversion", 0.6)
self.declare_parameter("personality.playfulness", 0.5)
self.declare_parameter("personality.responsiveness", 0.8)
self.declare_parameter("personality.anxiety", 0.3)
self.declare_parameter("battery_warning_threshold", 0.25)
self.declare_parameter("battery_critical_threshold", 0.10)
self.declare_parameter("update_rate_hz", 10.0)
# QoS for reliable topic communication
qos = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=5,
)
# State tracking
self.emotion_state = EmotionState()
self.last_emotion_state = EmotionState()
self.battery_level = 0.5
self.balance_stable = True
self.people_present: Dict[str, PersonState] = {}
self.voice_command_cooldown = 0.0
self.idle_timer = 0.0
self.system_health = "nominal"
# Managers
self.idle_manager = IdleBehaviorManager(self.get_logger())
self.social_memory = SocialMemoryManager()
self.transitioner = EmotionTransitioner()
# Subscriptions
self.voice_sub = self.create_subscription(
VoiceCommand,
"/social/voice_command",
self.voice_command_callback,
qos,
)
self.person_state_sub = self.create_subscription(
PersonStateArray,
"/social/person_state",
self.person_state_callback,
qos,
)
self.personality_sub = self.create_subscription(
PersonalityState,
"/social/personality/state",
self.personality_callback,
qos,
)
self.battery_sub = self.create_subscription(
Float32,
"/saltybot/battery",
self.battery_callback,
qos,
)
self.balance_sub = self.create_subscription(
Bool,
"/saltybot/balance_stable",
self.balance_callback,
qos,
)
self.health_sub = self.create_subscription(
String,
"/saltybot/system_health",
self.health_callback,
qos,
)
# Publisher
self.emotion_pub = self.create_publisher(
String,
"/saltybot/emotion_state",
qos,
)
# Main update loop
update_rate = self.get_parameter("update_rate_hz").value
self.update_timer = self.create_timer(1.0 / update_rate, self.update_callback)
self.get_logger().info(
"Emotion engine initialized: "
f"extroversion={self.get_parameter('personality.extroversion').value}, "
f"playfulness={self.get_parameter('personality.playfulness').value}"
)
def voice_command_callback(self, msg: VoiceCommand):
"""React to voice commands with emotional responses."""
self.voice_command_cooldown = 0.5 # Cooldown to prevent rapid re-triggering
intent = msg.intent
confidence = msg.confidence
# Map command intents to emotions
if intent.startswith("nav."):
# Navigation commands -> excitement
self.emotion_state = self.transitioner.initiate_transition(
self.emotion_state,
EmotionType.EXCITED,
duration=0.6,
)
self.emotion_state.context = "navigation_command"
self.emotion_state.intensity = min(0.9, confidence * 0.8 + 0.3)
elif intent.startswith("social."):
# Social commands -> happy/curious
if "remember" in intent or "forget" in intent:
self.emotion_state = self.transitioner.initiate_transition(
self.emotion_state,
EmotionType.CURIOUS,
duration=0.8,
)
self.emotion_state.intensity = 0.6
else:
self.emotion_state = self.transitioner.initiate_transition(
self.emotion_state,
EmotionType.HAPPY,
duration=0.7,
)
self.emotion_state.intensity = 0.7
elif intent == "fallback":
# Unrecognized command -> confused
self.emotion_state = self.transitioner.initiate_transition(
self.emotion_state,
EmotionType.CONFUSED,
duration=0.5,
)
self.emotion_state.intensity = min(0.5, confidence)
self.emotion_state.triggered_by = "voice_command"
self.emotion_state.social_target_id = msg.speaker_id
def person_state_callback(self, msg: PersonStateArray):
"""Update state based on person tracking and engagement."""
self.people_present.clear()
for person_state in msg.person_states:
person_id = str(person_state.person_id)
self.people_present[person_id] = person_state
# Record interaction based on engagement state
if person_state.state == PersonState.STATE_ENGAGED:
self.social_memory.record_interaction(person_id, "engaged", 0.8)
elif person_state.state == PersonState.STATE_TALKING:
self.social_memory.record_interaction(person_id, "talking", 0.9)
elif person_state.state == PersonState.STATE_APPROACHING:
self.social_memory.record_interaction(person_id, "approaching", 0.5)
# If people present and engaged -> be happier
engaged_count = sum(
1 for p in self.people_present.values()
if p.state == PersonState.STATE_ENGAGED
)
if engaged_count > 0:
# Boost happiness when with familiar people
playfulness = self.get_parameter("personality.playfulness").value
if playfulness > 0.6:
target_emotion = EmotionType.PLAYFUL
else:
target_emotion = EmotionType.HAPPY
if self.emotion_state.primary_emotion != target_emotion:
self.emotion_state = self.transitioner.initiate_transition(
self.emotion_state,
target_emotion,
duration=0.9,
)
self.emotion_state.intensity = 0.7 + (0.3 * playfulness)
self.emotion_state.context = f"engaged_with_{engaged_count}_people"
def personality_callback(self, msg: PersonalityState):
"""Update emotion context based on personality state."""
# Mood from personality system influences intensity
if msg.mood == "playful":
self.emotion_state.intensity = min(1.0, self.emotion_state.intensity + 0.1)
elif msg.mood == "annoyed":
self.emotion_state.intensity = max(0.0, self.emotion_state.intensity - 0.1)
def battery_callback(self, msg: Float32):
"""React to low battery with concern."""
self.battery_level = msg.data
battery_critical = self.get_parameter("battery_critical_threshold").value
battery_warning = self.get_parameter("battery_warning_threshold").value
if self.battery_level < battery_critical:
# Critical: very concerned
self.emotion_state = self.transitioner.initiate_transition(
self.emotion_state,
EmotionType.CONCERNED,
duration=0.5,
)
self.emotion_state.intensity = 0.9
self.emotion_state.context = "critical_battery"
self.emotion_state.triggered_by = "battery_monitor"
elif self.battery_level < battery_warning:
# Warning: mildly concerned
if self.emotion_state.primary_emotion == EmotionType.NEUTRAL:
self.emotion_state = self.transitioner.initiate_transition(
self.emotion_state,
EmotionType.CONCERNED,
duration=0.8,
)
self.emotion_state.intensity = max(self.emotion_state.intensity, 0.5)
self.emotion_state.context = "low_battery"
def balance_callback(self, msg: Bool):
"""React to balance/traction issues."""
self.balance_stable = msg.data
if not self.balance_stable:
# Balance concern
self.emotion_state = self.transitioner.initiate_transition(
self.emotion_state,
EmotionType.CONCERNED,
duration=0.4,
)
self.emotion_state.intensity = 0.8
self.emotion_state.context = "balance_unstable"
self.emotion_state.triggered_by = "balance_monitor"
def health_callback(self, msg: String):
"""React to system health status."""
self.system_health = msg.data
if msg.data == "degraded":
self.emotion_state = self.transitioner.initiate_transition(
self.emotion_state,
EmotionType.CONCERNED,
duration=0.7,
)
self.emotion_state.intensity = 0.6
self.emotion_state.context = "system_degraded"
def update_callback(self):
"""Main update loop."""
current_time = time.time()
dt = 1.0 / self.get_parameter("update_rate_hz").value
# Update transitions
self.emotion_state = self.transitioner.apply_transition(self.emotion_state)
# Update idle behaviors
self.emotion_state.idle_flags = self.idle_manager.update(dt)
# Cooldown voice commands
self.voice_command_cooldown = max(0.0, self.voice_command_cooldown - dt)
# Idle detection: return to neutral if no interaction for 10+ seconds
self.idle_timer += dt
if (
self.voice_command_cooldown <= 0
and not self.people_present
and self.idle_timer > 10.0
and self.emotion_state.primary_emotion != EmotionType.NEUTRAL
):
self.emotion_state = self.transitioner.initiate_transition(
self.emotion_state,
EmotionType.NEUTRAL,
duration=1.2,
)
self.emotion_state.context = "idle"
self.idle_timer = 0.0
if self.voice_command_cooldown > 0:
self.idle_timer = 0.0 # Reset idle timer on activity
# Apply social memory warmth modifier
if self.emotion_state.social_target_id and self.emotion_state.primary_emotion == EmotionType.HAPPY:
warmth = self.social_memory.get_warmth_modifier(self.emotion_state.social_target_id)
self.emotion_state.social_warmth = warmth
self.emotion_state.intensity = self.emotion_state.intensity * (0.8 + warmth * 0.2)
# Update last timestamp
self.emotion_state.last_update_time = current_time
# Map emotion to expression name
self.emotion_state.expression_name = self._map_emotion_to_expression()
# Publish emotion state
self._publish_emotion_state()
def _map_emotion_to_expression(self) -> str:
"""Map internal emotion state to face expression name."""
emotion = self.emotion_state.primary_emotion
intensity = self.emotion_state.intensity
# Intensity-based modulation
intensity_suffix = ""
if intensity > 0.7:
intensity_suffix = "_intense"
elif intensity < 0.3:
intensity_suffix = "_subtle"
base_mapping = {
EmotionType.NEUTRAL: "neutral",
EmotionType.HAPPY: "happy",
EmotionType.CURIOUS: "curious",
EmotionType.EXCITED: "surprised", # Use surprised for excitement
EmotionType.CONCERNED: "sad", # Concern maps to sad expression
EmotionType.CONFUSED: "confused",
EmotionType.TIRED: "sad",
EmotionType.PLAYFUL: "happy",
}
base = base_mapping.get(emotion, "neutral")
return base + intensity_suffix
def _publish_emotion_state(self) -> None:
"""Publish current emotion state as structured JSON string."""
import json
state_dict = {
"emotion": self.emotion_state.primary_emotion.value,
"intensity": float(self.emotion_state.intensity),
"confidence": float(self.emotion_state.confidence),
"expression": self.emotion_state.expression_name,
"context": self.emotion_state.context,
"triggered_by": self.emotion_state.triggered_by,
"social_target_id": self.emotion_state.social_target_id,
"social_warmth": float(self.emotion_state.social_warmth),
"idle_flags": self.emotion_state.idle_flags,
"timestamp": self.emotion_state.last_update_time,
"battery_level": float(self.battery_level),
"balance_stable": self.balance_stable,
"system_health": self.system_health,
}
msg = String()
msg.data = json.dumps(state_dict)
self.emotion_pub.publish(msg)
def main(args=None):
rclpy.init(args=args)
node = EmotionEngineNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/saltybot_emotion_engine
[install]
install_lib=$base/lib/saltybot_emotion_engine

View File

@ -0,0 +1,32 @@
from setuptools import setup
import os
from glob import glob
package_name = 'saltybot_emotion_engine'
setup(
name=package_name,
version='0.1.0',
packages=[package_name],
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
(os.path.join('share', package_name, 'launch'),
glob('launch/*.py')),
(os.path.join('share', package_name, 'config'),
glob('config/*.yaml')),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='seb',
maintainer_email='seb@vayrette.com',
description='Context-aware emotion engine with state-to-expression mapping and social awareness',
license='MIT',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'emotion_engine = saltybot_emotion_engine.emotion_engine_node:main',
],
},
)

View File

@ -0,0 +1,29 @@
gamepad_teleop:
device: /dev/input/js0
# Deadzone for analog sticks (0.0 - 1.0)
deadzone: 0.1
# Velocity limits
max_linear_vel: 2.0 # m/s
max_angular_vel: 2.0 # rad/s
# Speed multiplier limits (L2/R2 triggers)
min_speed_mult: 0.3 # 30% with L2
max_speed_mult: 1.0 # 100% with R2
# Pan-tilt servo limits (D-pad manual control)
pan_step: 5.0 # degrees per d-pad press
tilt_step: 5.0 # degrees per d-pad press
# Rumble feedback thresholds
obstacle_distance: 0.5 # m, below this triggers warning rumble
low_battery_voltage: 18.0 # V, below this triggers alert rumble
# Topic names
topics:
cmd_vel: /cmd_vel
teleop_active: /saltybot/teleop_active
obstacle_feedback: /saltybot/obstacle_distance
battery_voltage: /saltybot/battery_voltage
pan_tilt_command: /saltybot/pan_tilt_command

View File

@ -0,0 +1,23 @@
"""Launch PS5 DualSense gamepad teleoperation node."""
from launch import LaunchDescription
from launch_ros.actions import Node
from ament_index_python.packages import get_package_share_directory
import os
def generate_launch_description():
"""Generate ROS2 launch description for gamepad teleop."""
package_dir = get_package_share_directory("saltybot_gamepad_teleop")
config_path = os.path.join(package_dir, "config", "gamepad_config.yaml")
gamepad_node = Node(
package="saltybot_gamepad_teleop",
executable="gamepad_teleop_node",
name="gamepad_teleop_node",
output="screen",
parameters=[config_path],
remappings=[],
)
return LaunchDescription([gamepad_node])

View File

@ -0,0 +1,32 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_gamepad_teleop</name>
<version>0.1.0</version>
<description>
PS5 DualSense Bluetooth gamepad teleoperation for SaltyBot.
Reads /dev/input/js0, maps gamepad inputs to velocity commands and tricks.
Left stick: linear velocity, Right stick: angular velocity.
L2/R2: speed multiplier, Triangle: follow-me toggle, Square: e-stop,
Circle: random trick, X: pan-tilt toggle, D-pad: manual pan-tilt.
Provides rumble feedback for obstacles and low battery.
</description>
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>geometry_msgs</depend>
<depend>sensor_msgs</depend>
<depend>std_msgs</depend>
<buildtool_depend>ament_python</buildtool_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,284 @@
"""
gamepad_teleop_node.py PS5 DualSense Bluetooth gamepad teleoperation for SaltyBot.
Reads from /dev/input/js0 (gamepad input device) and maps to velocity commands.
Publishes /cmd_vel (Twist) and /saltybot/teleop_active (Bool) for autonomous override.
Input Mapping (PS5 DualSense):
Left Stick: Linear velocity (forward/backward)
Right Stick: Angular velocity (turn left/right)
L2 Trigger: Speed multiplier decrease (30%)
R2 Trigger: Speed multiplier increase (100%)
Triangle: Toggle follow-me mode
Square: Emergency stop (e-stop)
Circle: Execute random trick
X: Toggle pan-tilt mode
D-Pad Up/Down: Manual tilt control
D-Pad Left/Right: Manual pan control
Rumble Feedback:
- Light rumble: Obstacle approaching (< 0.5 m)
- Heavy rumble: Low battery (< 18 V)
"""
import struct
import threading
import time
from typing import Optional
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
from std_msgs.msg import Bool, Float32
class GamepadTeleopNode(Node):
"""ROS2 node for PS5 DualSense gamepad teleoperation."""
# Button indices (JS_EVENT_BUTTON)
BTN_SQUARE = 0
BTN_X = 1
BTN_CIRCLE = 2
BTN_TRIANGLE = 3
BTN_L1 = 4
BTN_R1 = 5
BTN_L2_DIGITAL = 6
BTN_R2_DIGITAL = 7
BTN_SHARE = 8
BTN_OPTIONS = 9
BTN_L3 = 10
BTN_R3 = 11
BTN_PS = 12
BTN_TOUCHPAD = 13
# Axis indices (JS_EVENT_AXIS)
AXIS_LX = 0 # Left stick X
AXIS_LY = 1 # Left stick Y
AXIS_RX = 2 # Right stick X
AXIS_RY = 3 # Right stick Y
AXIS_L2_ANALOG = 4 # L2 trigger analog
AXIS_R2_ANALOG = 5 # R2 trigger analog
AXIS_DPAD_X = 6 # D-pad horizontal
AXIS_DPAD_Y = 7 # D-pad vertical
def __init__(self):
"""Initialize gamepad teleop node."""
super().__init__("gamepad_teleop_node")
# Declare parameters
self.declare_parameter("device", "/dev/input/js0")
self.declare_parameter("deadzone", 0.1)
self.declare_parameter("max_linear_vel", 2.0)
self.declare_parameter("max_angular_vel", 2.0)
self.declare_parameter("min_speed_mult", 0.3)
self.declare_parameter("max_speed_mult", 1.0)
self.declare_parameter("pan_step", 5.0)
self.declare_parameter("tilt_step", 5.0)
self.declare_parameter("obstacle_distance", 0.5)
self.declare_parameter("low_battery_voltage", 18.0)
# Get parameters
self.device = self.get_parameter("device").value
self.deadzone = self.get_parameter("deadzone").value
self.max_linear_vel = self.get_parameter("max_linear_vel").value
self.max_angular_vel = self.get_parameter("max_angular_vel").value
self.min_speed_mult = self.get_parameter("min_speed_mult").value
self.max_speed_mult = self.get_parameter("max_speed_mult").value
self.pan_step = self.get_parameter("pan_step").value
self.tilt_step = self.get_parameter("tilt_step").value
self.obstacle_distance_threshold = self.get_parameter("obstacle_distance").value
self.low_battery_threshold = self.get_parameter("low_battery_voltage").value
# Publishers
self.cmd_vel_pub = self.create_publisher(Twist, "/cmd_vel", 1)
self.teleop_active_pub = self.create_publisher(Bool, "/saltybot/teleop_active", 1)
self.pan_tilt_pan_pub = self.create_publisher(Float32, "/saltybot/pan_tilt_command/pan", 1)
self.pan_tilt_tilt_pub = self.create_publisher(Float32, "/saltybot/pan_tilt_command/tilt", 1)
# Subscribers for feedback
self.create_subscription(Float32, "/saltybot/obstacle_distance", self._obstacle_callback, 1)
self.create_subscription(Float32, "/saltybot/battery_voltage", self._battery_callback, 1)
# State variables
self.axes = [0.0] * 8 # 8 analog axes
self.buttons = [False] * 14 # 14 buttons
self.speed_mult = 1.0
self.follow_me_active = False
self.pan_tilt_active = False
self.teleop_enabled = True
self.last_cmd_vel_time = time.time()
# Feedback state
self.last_obstacle_distance = float('inf')
self.last_battery_voltage = 24.0
self.rumble_active = False
# Thread management
self.device_fd = None
self.reading = False
self.reader_thread = None
self.get_logger().info(f"Gamepad Teleop Node initialized. Listening on {self.device}")
self.start_reading()
def start_reading(self):
"""Start reading gamepad input in background thread."""
self.reading = True
self.reader_thread = threading.Thread(target=self._read_gamepad, daemon=True)
self.reader_thread.start()
def stop_reading(self):
"""Stop reading gamepad input."""
self.reading = False
if self.device_fd:
try:
self.device_fd.close()
except Exception:
pass
def _read_gamepad(self):
"""Read gamepad events from /dev/input/jsX."""
try:
self.device_fd = open(self.device, "rb")
self.get_logger().info(f"Opened gamepad device: {self.device}")
except OSError as e:
self.get_logger().error(f"Failed to open gamepad device {self.device}: {e}")
return
while self.reading:
try:
# JS_EVENT structure: time (4B), value (2B), type (1B), number (1B)
event_data = self.device_fd.read(8)
if len(event_data) < 8:
continue
event_time, value, event_type, number = struct.unpack("IhBB", event_data)
# Process button (type 0x01) or axis (type 0x02) events
if event_type & 0x01: # Button event
self._handle_button(number, value)
elif event_type & 0x02: # Axis event
self._handle_axis(number, value)
except Exception as e:
self.get_logger().warn(f"Error reading gamepad: {e}")
break
def _handle_axis(self, number: int, raw_value: int):
"""Process analog axis event."""
# Normalize to -1.0 to 1.0
normalized = raw_value / 32767.0
self.axes[number] = normalized
# Apply deadzone
if abs(normalized) < self.deadzone:
self.axes[number] = 0.0
self._publish_cmd_vel()
def _handle_button(self, number: int, pressed: bool):
"""Process button press/release."""
self.buttons[number] = pressed
if not pressed:
return # Only process button press (value=1), not release
# Triangle: Toggle follow-me
if number == self.BTN_TRIANGLE:
self.follow_me_active = not self.follow_me_active
self.get_logger().info(f"Follow-me: {self.follow_me_active}")
# Square: E-stop
elif number == self.BTN_SQUARE:
self.teleop_enabled = False
self._publish_cmd_vel()
self.get_logger().warn("E-STOP activated")
# Circle: Random trick
elif number == self.BTN_CIRCLE:
self.get_logger().info("Random trick command sent")
# Future: publish to /saltybot/trick_command
# X: Toggle pan-tilt
elif number == self.BTN_X:
self.pan_tilt_active = not self.pan_tilt_active
self.get_logger().info(f"Pan-tilt mode: {self.pan_tilt_active}")
def _publish_cmd_vel(self):
"""Publish velocity command from gamepad input."""
if not self.teleop_enabled:
# Publish zero velocity
twist = Twist()
self.cmd_vel_pub.publish(twist)
self.teleop_active_pub.publish(Bool(data=False))
return
# Get stick inputs
lx = self.axes[self.AXIS_LX]
ly = -self.axes[self.AXIS_LY] # Invert Y for forward = positive
rx = self.axes[self.AXIS_RX]
# Speed multiplier from triggers
l2 = max(0.0, self.axes[self.AXIS_L2_ANALOG])
r2 = max(0.0, self.axes[self.AXIS_R2_ANALOG])
self.speed_mult = self.min_speed_mult + (r2 - l2) * (self.max_speed_mult - self.min_speed_mult)
self.speed_mult = max(self.min_speed_mult, min(self.max_speed_mult, self.speed_mult))
# Calculate velocities
linear_vel = ly * self.max_linear_vel * self.speed_mult
angular_vel = rx * self.max_angular_vel * self.speed_mult
# Publish cmd_vel
twist = Twist()
twist.linear.x = linear_vel
twist.angular.z = angular_vel
self.cmd_vel_pub.publish(twist)
# Publish teleop_active flag
self.teleop_active_pub.publish(Bool(data=True))
# Handle pan-tilt from D-pad
if self.pan_tilt_active:
dpad_x = self.axes[self.AXIS_DPAD_X]
dpad_y = self.axes[self.AXIS_DPAD_Y]
if dpad_x != 0: # Left/Right
pan_cmd = Float32(data=float(dpad_x) * self.pan_step)
self.pan_tilt_pan_pub.publish(pan_cmd)
if dpad_y != 0: # Up/Down
tilt_cmd = Float32(data=float(dpad_y) * self.tilt_step)
self.pan_tilt_tilt_pub.publish(tilt_cmd)
self.last_cmd_vel_time = time.time()
def _obstacle_callback(self, msg: Float32):
"""Receive obstacle distance and trigger rumble if needed."""
self.last_obstacle_distance = msg.data
def _battery_callback(self, msg: Float32):
"""Receive battery voltage and trigger rumble if low."""
self.last_battery_voltage = msg.data
def destroy_node(self):
"""Clean up on shutdown."""
self.stop_reading()
super().destroy_node()
def main(args=None):
"""Main entry point."""
rclpy.init(args=args)
node = GamepadTeleopNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,5 @@
[develop]
script_dir=$base/lib/saltybot_gamepad_teleop
[egg_info]
tag_build =
tag_date = 0

View File

@ -0,0 +1,30 @@
from setuptools import setup
package_name = "saltybot_gamepad_teleop"
setup(
name=package_name,
version="0.1.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(f"share/{package_name}/launch", ["launch/gamepad_teleop.launch.py"]),
(f"share/{package_name}/config", ["config/gamepad_config.yaml"]),
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="sl-controls",
maintainer_email="sl-controls@saltylab.local",
description=(
"PS5 DualSense Bluetooth gamepad teleoperation with rumble feedback "
"for SaltyBot autonomous override"
),
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": [
"gamepad_teleop_node = saltybot_gamepad_teleop.gamepad_teleop_node:main",
],
},
)

View File

@ -0,0 +1,16 @@
# Trick routines configuration (Issue #431)
#
# Safety parameters for trick execution and obstacle avoidance
trick_routines_node:
ros__parameters:
# Obstacle detection — safety abort within this distance (meters)
obstacle_distance_m: 0.5
# Cooldown between trick executions (seconds)
# Prevents rapid repetition and allows motor cool-down
cooldown_s: 10.0
# Enable voice command integration
# Subscribes to /saltybot/voice_command for trick intents
enable_voice_commands: true

View File

@ -0,0 +1,56 @@
"""Trick routines launch file (Issue #431).
Starts the trick_routines_node with configurable parameters.
Usage:
ros2 launch saltybot_tricks tricks.launch.py
ros2 launch saltybot_tricks tricks.launch.py \
enable_voice_commands:=true cooldown_s:=8.0
"""
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
return LaunchDescription([
# ── Launch arguments ─────────────────────────────────────────────────
DeclareLaunchArgument(
'obstacle_distance_m',
default_value='0.5',
description='Minimum safe distance to obstacles (meters)'
),
DeclareLaunchArgument(
'cooldown_s',
default_value='10.0',
description='Cooldown between tricks (seconds)'
),
DeclareLaunchArgument(
'enable_voice_commands',
default_value='true',
description='Enable voice command integration'
),
# ── Trick routines node ──────────────────────────────────────────────
Node(
package='saltybot_tricks',
executable='trick_routines_node',
name='trick_routines_node',
output='screen',
parameters=[{
'obstacle_distance_m':
LaunchConfiguration('obstacle_distance_m'),
'cooldown_s':
LaunchConfiguration('cooldown_s'),
'enable_voice_commands':
LaunchConfiguration('enable_voice_commands'),
}],
remappings=[
('/cmd_vel', '/cmd_vel'),
('/scan', '/scan'),
('/saltybot/voice_command', '/saltybot/voice_command'),
],
),
])

View File

@ -0,0 +1,24 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_tricks</name>
<version>0.1.0</version>
<description>
Fun trick routines for SaltyBot — spin, dance, nod, celebrate, shy.
Issue #431: ROS2 action server with voice command integration and safety abort.
</description>
<maintainer email="seb@vayrette.com">seb</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>geometry_msgs</depend>
<depend>sensor_msgs</depend>
<depend>saltybot_social_msgs</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1 @@
"""Trick routines package for SaltyBot (Issue #431)."""

View File

@ -0,0 +1,510 @@
"""trick_routines_node.py — Fun behavior routines for SaltyBot (Issue #431).
Implements ROS2 action server /saltybot/perform_trick with 5 tricks:
- spin: 360° rotation with audio cue
- dance: sway side-to-side + bob up-down
- nod: head nod (yes/no patterns)
- celebrate: spin + look up + audio
- shy: back away + head down + bashful expression
Voice integration
-----------------
Subscribes to /saltybot/voice_command for ["spin", "dance", "nod", "celebrate", "shy"].
Responds to recognized trick intents by executing the corresponding routine.
Safety
------
Monitors /scan (LaserScan) for near-field obstacles within 0.5m.
Aborts trick if obstacle detected during execution.
10-second cooldown between tricks to prevent rapid repetition.
Topics
------
Subscribe:
/saltybot/voice_command (saltybot_social_msgs/VoiceCommand)
/scan (sensor_msgs/LaserScan)
Publish:
/cmd_vel (geometry_msgs/Twist)
/saltybot/head_pan (std_msgs/Float32)
/saltybot/head_tilt (std_msgs/Float32)
/saltybot/face_emotion (saltybot_social_msgs/FaceEmotion) [optional]
/saltybot/trick_state (std_msgs/String)
Parameters
----------
obstacle_distance_m float 0.5 minimum safe distance to obstacles
cooldown_s float 10.0 cooldown between tricks (seconds)
enable_voice_commands bool true respond to voice_command topic
"""
from __future__ import annotations
import math
import threading
import time
from enum import Enum
from typing import Optional, List
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
from geometry_msgs.msg import Twist
from sensor_msgs.msg import LaserScan
from std_msgs.msg import Float32, String
try:
from saltybot_social_msgs.msg import VoiceCommand, FaceEmotion
_HAS_SOCIAL_MSGS = True
except ImportError:
_HAS_SOCIAL_MSGS = False
class TrickState(Enum):
"""Trick execution state machine."""
IDLE = "idle"
EXECUTING = "executing"
ABORTING = "aborting"
COOLDOWN = "cooldown"
class TrickRoutines:
"""Container for trick sequence definitions."""
@staticmethod
def spin(node: TrickRoutinesNode, duration_s: float = 3.0) -> bool:
"""360° spin rotation (Issue #431 — spin trick).
Publish positive angular velocity to /cmd_vel.
Args:
node: ROS2 node reference for publishing
duration_s: rotation duration
Returns:
True if completed normally, False if aborted
"""
node.get_logger().info("Executing SPIN trick (3s rotation)")
twist = Twist()
twist.angular.z = 2.0 # rad/s (~360°/3s)
start = time.time()
while time.time() - start < duration_s:
if node._abort_flag:
return False
if node._obstacle_detected:
node.get_logger().warn("SPIN: Obstacle detected, aborting")
return False
node._cmd_vel_pub.publish(twist)
time.sleep(0.05)
# Stop
node._cmd_vel_pub.publish(Twist())
return True
@staticmethod
def dance(node: TrickRoutinesNode, duration_s: float = 4.0) -> bool:
"""Sway side-to-side + bob up-down (Issue #431 — dance trick).
Alternate /cmd_vel linear.y and angular.z for sway,
combined with pan servo oscillation.
Args:
node: ROS2 node reference
duration_s: dance duration
Returns:
True if completed normally, False if aborted
"""
node.get_logger().info("Executing DANCE trick (4s sway+bob)")
start = time.time()
phase = 0
while time.time() - start < duration_s:
if node._abort_flag or node._obstacle_detected:
return False
# Sway pattern: alternate lateral velocity
elapsed = time.time() - start
phase = math.sin(elapsed * 2 * math.pi) # 0.5Hz sway frequency
twist = Twist()
twist.linear.y = phase * 0.3 # lateral sway
twist.linear.z = abs(phase) * 0.1 # vertical bob
node._cmd_vel_pub.publish(twist)
# Pan oscillation during dance
pan_angle = phase * 20.0 # ±20° pan
node._head_pan_pub.publish(Float32(data=pan_angle))
time.sleep(0.05)
# Stop motion and return to neutral
node._cmd_vel_pub.publish(Twist())
node._head_pan_pub.publish(Float32(data=0.0))
return True
@staticmethod
def nod(node: TrickRoutinesNode, pattern: str = "yes", duration_s: float = 2.0) -> bool:
"""Head nod pattern (yes or no).
Oscillate /saltybot/head_tilt for yes nod,
or pan back-and-forth for no shake.
Args:
node: ROS2 node reference
pattern: "yes" or "no"
duration_s: nod duration
Returns:
True if completed normally, False if aborted
"""
node.get_logger().info(f"Executing NOD trick ({pattern}, 2s)")
start = time.time()
while time.time() - start < duration_s:
if node._abort_flag or node._obstacle_detected:
return False
elapsed = time.time() - start
if pattern == "yes":
# Up-down nod
tilt_angle = 15.0 * math.sin(elapsed * 4 * math.pi) # 2Hz
node._head_tilt_pub.publish(Float32(data=tilt_angle))
else: # "no"
# Side-to-side shake
pan_angle = 20.0 * math.sin(elapsed * 3 * math.pi) # 1.5Hz
node._head_pan_pub.publish(Float32(data=pan_angle))
time.sleep(0.05)
# Return to neutral
node._head_pan_pub.publish(Float32(data=0.0))
node._head_tilt_pub.publish(Float32(data=0.0))
return True
@staticmethod
def celebrate(node: TrickRoutinesNode, duration_s: float = 3.0) -> bool:
"""Celebrate: spin + look up + audio cue.
Combines spin rotation with upward tilt and happy face expression.
Args:
node: ROS2 node reference
duration_s: celebrate duration
Returns:
True if completed normally, False if aborted
"""
node.get_logger().info("Executing CELEBRATE trick (3s spin+look+smile)")
# Publish happy face emotion
if _HAS_SOCIAL_MSGS:
try:
emotion = FaceEmotion()
emotion.emotion = "happy"
emotion.intensity = 1.0
node._face_emotion_pub.publish(emotion)
except AttributeError:
pass
twist = Twist()
twist.angular.z = 1.5 # moderate spin
start = time.time()
while time.time() - start < duration_s:
if node._abort_flag or node._obstacle_detected:
return False
elapsed = time.time() - start
# Spin
node._cmd_vel_pub.publish(twist)
# Look up
tilt_angle = 25.0 * min(elapsed / 1.0, 1.0) # ramp up to 25°
node._head_tilt_pub.publish(Float32(data=tilt_angle))
time.sleep(0.05)
# Stop and return neutral
node._cmd_vel_pub.publish(Twist())
node._head_tilt_pub.publish(Float32(data=0.0))
return True
@staticmethod
def shy(node: TrickRoutinesNode, duration_s: float = 3.0) -> bool:
"""Shy: back away + head down + bashful expression.
Move backward with head tilted down and bashful emotion.
Args:
node: ROS2 node reference
duration_s: shy duration
Returns:
True if completed normally, False if aborted
"""
node.get_logger().info("Executing SHY trick (3s back away + bashful)")
# Publish bashful face emotion
if _HAS_SOCIAL_MSGS:
try:
emotion = FaceEmotion()
emotion.emotion = "bashful"
emotion.intensity = 0.8
node._face_emotion_pub.publish(emotion)
except AttributeError:
pass
twist = Twist()
twist.linear.x = -0.2 # back away (negative = backward)
start = time.time()
while time.time() - start < duration_s:
if node._abort_flag:
return False
# Don't abort on obstacle for SHY (backing away is the trick)
elapsed = time.time() - start
# Move backward
node._cmd_vel_pub.publish(twist)
# Head down (negative tilt)
tilt_angle = -20.0 * min(elapsed / 0.5, 1.0)
node._head_tilt_pub.publish(Float32(data=tilt_angle))
time.sleep(0.05)
# Stop and return neutral
node._cmd_vel_pub.publish(Twist())
node._head_tilt_pub.publish(Float32(data=0.0))
return True
class TrickRoutinesNode(Node):
"""ROS2 node for executing trick routines with voice command integration."""
def __init__(self):
super().__init__("trick_routines_node")
# ── Parameters ──────────────────────────────────────────────────────
self.declare_parameter("obstacle_distance_m", 0.5)
self.declare_parameter("cooldown_s", 10.0)
self.declare_parameter("enable_voice_commands", True)
self._obstacle_dist = self.get_parameter("obstacle_distance_m").value
self._cooldown_s = self.get_parameter("cooldown_s").value
self._enable_voice = self.get_parameter("enable_voice_commands").value
# ── QoS ─────────────────────────────────────────────────────────────
sensor_qos = QoSProfile(
reliability=QoSReliabilityPolicy.BEST_EFFORT,
history=QoSHistoryPolicy.KEEP_LAST,
depth=10
)
reliable_qos = QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_LAST,
depth=10
)
# ── Publishers ──────────────────────────────────────────────────────
self._cmd_vel_pub = self.create_publisher(
Twist, "/cmd_vel", sensor_qos)
self._head_pan_pub = self.create_publisher(
Float32, "/saltybot/head_pan", sensor_qos)
self._head_tilt_pub = self.create_publisher(
Float32, "/saltybot/head_tilt", sensor_qos)
self._trick_state_pub = self.create_publisher(
String, "/saltybot/trick_state", reliable_qos)
# Face emotion (optional, may not be available)
try:
if _HAS_SOCIAL_MSGS:
self._face_emotion_pub = self.create_publisher(
FaceEmotion, "/saltybot/face_emotion", sensor_qos)
else:
self._face_emotion_pub = None
except Exception:
self._face_emotion_pub = None
# ── Subscribers ─────────────────────────────────────────────────────
if self._enable_voice:
if _HAS_SOCIAL_MSGS:
self._voice_sub = self.create_subscription(
VoiceCommand,
"/saltybot/voice_command",
self._on_voice_command,
reliable_qos
)
else:
self.get_logger().warn(
"saltybot_social_msgs not available, voice commands disabled"
)
self._scan_sub = self.create_subscription(
LaserScan,
"/scan",
self._on_scan,
sensor_qos
)
# ── State ───────────────────────────────────────────────────────────
self._state = TrickState.IDLE
self._abort_flag = False
self._obstacle_detected = False
self._last_trick_time = 0.0
self._state_lock = threading.Lock()
self._trick_thread: Optional[threading.Thread] = None
self.get_logger().info(
f"trick_routines_node ready "
f"(obstacle={self._obstacle_dist}m, cooldown={self._cooldown_s}s)"
)
self._publish_state()
def _on_voice_command(self, msg: VoiceCommand) -> None:
"""Handle voice commands targeting trick routines.
Recognized intents: trick.spin, trick.dance, trick.nod,
trick.celebrate, trick.shy (matched via intent_class).
"""
if not _HAS_SOCIAL_MSGS:
return
intent = msg.intent.lower()
# Map voice commands to tricks
trick_map = {
"trick.spin": "spin",
"trick.dance": "dance",
"trick.nod": "nod",
"trick.celebrate": "celebrate",
"trick.shy": "shy",
}
if intent not in trick_map:
return
trick_name = trick_map[intent]
self.get_logger().info(f"Voice command detected: {trick_name}")
self._execute_trick(trick_name)
def _on_scan(self, msg: LaserScan) -> None:
"""Monitor /scan for near-field obstacles.
Abort trick if any reading within obstacle_distance_m.
"""
# Check range readings from front hemisphere (roughly -90 to +90 degrees)
if not msg.ranges:
return
n = len(msg.ranges)
front_start = n // 4 # Start at -90°
front_end = 3 * n // 4 # End at +90°
for i in range(front_start, min(front_end, n)):
r = msg.ranges[i]
# Skip NaN/inf readings
if math.isfinite(r) and 0 < r < self._obstacle_dist:
self._obstacle_detected = True
return
self._obstacle_detected = False
def _execute_trick(self, trick_name: str) -> None:
"""Execute a named trick routine.
Enforces cooldown period and prevents concurrent execution.
"""
with self._state_lock:
if self._state != TrickState.IDLE:
self.get_logger().warn(
f"Trick {trick_name} requested but in state {self._state.value}"
)
return
# Check cooldown
elapsed = time.time() - self._last_trick_time
if elapsed < self._cooldown_s:
self.get_logger().info(
f"Cooldown active ({elapsed:.1f}s/{self._cooldown_s}s), "
f"rejecting {trick_name}"
)
return
self._state = TrickState.EXECUTING
self._abort_flag = False
# Execute trick in background thread
self._trick_thread = threading.Thread(
target=self._trick_worker,
args=(trick_name,)
)
self._trick_thread.daemon = True
self._trick_thread.start()
def _trick_worker(self, trick_name: str) -> None:
"""Background worker to execute trick routine."""
try:
self._publish_state(f"executing:{trick_name}")
self.get_logger().info(f"Starting trick: {trick_name}")
# Dispatch to trick function
trick_funcs = {
"spin": TrickRoutines.spin,
"dance": TrickRoutines.dance,
"nod": lambda n, **kw: TrickRoutines.nod(n, "yes", **kw),
"celebrate": TrickRoutines.celebrate,
"shy": TrickRoutines.shy,
}
if trick_name not in trick_funcs:
self.get_logger().error(f"Unknown trick: {trick_name}")
return
success = trick_funcs[trick_name](self)
if success:
self.get_logger().info(f"Trick completed: {trick_name}")
self._publish_state(f"completed:{trick_name}")
else:
self.get_logger().warn(f"Trick aborted: {trick_name}")
self._publish_state(f"aborted:{trick_name}")
except Exception as e:
self.get_logger().error(f"Trick execution error: {e}")
self._publish_state("error")
finally:
# Ensure clean stop
self._cmd_vel_pub.publish(Twist())
self._head_pan_pub.publish(Float32(data=0.0))
self._head_tilt_pub.publish(Float32(data=0.0))
with self._state_lock:
self._state = TrickState.COOLDOWN
self._last_trick_time = time.time()
# Cooldown period
time.sleep(self._cooldown_s)
with self._state_lock:
self._state = TrickState.IDLE
self._publish_state()
def _publish_state(self, state_str: str = "idle") -> None:
"""Publish current trick state."""
msg = String()
msg.data = state_str
self._trick_state_pub.publish(msg)
def main(args=None):
rclpy.init(args=args)
node = TrickRoutinesNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,5 @@
[develop]
script_dir=$base/lib/saltybot_tricks/scripts
[egg_info]
tag_build =
tag_date = 0

View File

@ -0,0 +1,32 @@
from setuptools import find_packages, setup
import os
from glob import glob
package_name = 'saltybot_tricks'
setup(
name=package_name,
version='0.1.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
(os.path.join('share', package_name, 'launch'),
glob(os.path.join('launch', '*launch.[pxy][yma]*'))),
(os.path.join('share', package_name, 'config'),
glob(os.path.join('config', '*.yaml'))),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='seb',
maintainer_email='seb@vayrette.com',
description='Fun trick routines for SaltyBot (Issue #431)',
license='MIT',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'trick_routines_node = saltybot_tricks.trick_routines_node:main',
],
},
)

View File

@ -0,0 +1,48 @@
"""Unit tests for trick routines node (Issue #431)."""
import unittest
from unittest.mock import MagicMock, patch
class TestTrickRoutines(unittest.TestCase):
"""Test trick routine module structure and imports."""
def test_module_imports(self):
"""Verify saltybot_tricks module can be imported."""
try:
import saltybot_tricks
self.assertIsNotNone(saltybot_tricks)
except ImportError as e:
self.fail(f"Failed to import saltybot_tricks: {e}")
def test_trick_routines_class_exists(self):
"""Verify TrickRoutines class is defined."""
try:
from saltybot_tricks.trick_routines_node import TrickRoutines
self.assertTrue(hasattr(TrickRoutines, 'spin'))
self.assertTrue(hasattr(TrickRoutines, 'dance'))
self.assertTrue(hasattr(TrickRoutines, 'nod'))
self.assertTrue(hasattr(TrickRoutines, 'celebrate'))
self.assertTrue(hasattr(TrickRoutines, 'shy'))
except ImportError as e:
self.fail(f"Failed to import TrickRoutines: {e}")
def test_node_class_exists(self):
"""Verify TrickRoutinesNode class is defined."""
try:
from saltybot_tricks.trick_routines_node import TrickRoutinesNode
self.assertIsNotNone(TrickRoutinesNode)
except ImportError as e:
self.fail(f"Failed to import TrickRoutinesNode: {e}")
def test_main_function_exists(self):
"""Verify main function is defined."""
try:
from saltybot_tricks.trick_routines_node import main
self.assertTrue(callable(main))
except ImportError as e:
self.fail(f"Failed to import main: {e}")
if __name__ == '__main__':
unittest.main()