feat: emotion engine (Issue #429) #435

Merged
sl-jetson merged 2 commits from sl-webui/issue-429-emotion-engine into main 2026-03-05 08:59:15 -05:00
19 changed files with 1287 additions and 0 deletions
Showing only changes of commit 7000cbfbd3 - Show all commits

View File

@ -0,0 +1,7 @@
build/
install/
log/
*.egg-info/
__pycache__/
*.pyc
.pytest_cache/

View File

@ -0,0 +1,178 @@
# SaltyBot Emotion Engine (Issue #429)
Context-aware facial expression and emotion selection system for SaltyBot.
## Features
### 1. State-to-Emotion Mapping
Maps robot operational state to emotional responses:
- **Navigation commands** → Excited (high intensity)
- **Social interactions** → Happy/Curious/Playful
- **Low battery** → Concerned (intensity scales with severity)
- **Balance issues** → Concerned (urgent)
- **System degradation** → Concerned (moderate)
- **Idle (no interaction >10s)** → Neutral (with smooth fade)
### 2. Smooth Emotion Transitions
- Configurable transition durations (0.31.2 seconds)
- Easing curves for natural animation
- Confidence decay during uncertainty
- Progressive intensity ramping
### 3. Personality-Aware Responses
Configurable personality traits (0.01.0):
- **Extroversion**: Affects how eager to interact (playful vs. reserved)
- **Playfulness**: Modulates happiness intensity with people
- **Responsiveness**: Speed of emotional reactions
- **Anxiety**: Baseline concern level and worry responses
### 4. Social Memory & Familiarity
- Tracks interaction history per person
- Warmth modifier (0.31.0) based on relationship tier:
- Stranger: 0.5 (neutral warmth)
- Regular contact: 0.60.8 (warmer)
- Known favorite: 0.91.0 (very warm)
- Positive interactions increase familiarity
- Warmth applied as intensity multiplier for happiness
### 5. Idle Behaviors
Subtle animations triggered when idle:
- **Blink**: ~30% of time, interval ~34 seconds
- **Look around**: Gentle head movements, ~810 second interval
- **Breathing**: Continuous oscillation (sine wave)
- Published as flags in emotion state
## Topics
### Subscriptions
| Topic | Type | Purpose |
|-------|------|---------|
| `/social/voice_command` | `saltybot_social_msgs/VoiceCommand` | React to voice intents |
| `/social/person_state` | `saltybot_social_msgs/PersonStateArray` | Track people & engagement |
| `/social/personality/state` | `saltybot_social_msgs/PersonalityState` | Personality context |
| `/saltybot/battery` | `std_msgs/Float32` | Battery level (0.01.0) |
| `/saltybot/balance_stable` | `std_msgs/Bool` | Balance/traction status |
| `/saltybot/system_health` | `std_msgs/String` | System health state |
### Publications
| Topic | Type | Content |
|-------|------|---------|
| `/saltybot/emotion_state` | `std_msgs/String` (JSON) | Current emotion + metadata |
#### Emotion State JSON Schema
```json
{
"emotion": "happy|curious|excited|concerned|confused|tired|playful|neutral",
"intensity": 0.01.0,
"confidence": 0.01.0,
"expression": "happy_intense|happy|happy_subtle|...",
"context": "navigation_command|engaged_with_N_people|low_battery|...",
"triggered_by": "voice_command|person_tracking|battery_monitor|balance_monitor|idle_timer",
"social_target_id": "person_id or null",
"social_warmth": 0.01.0,
"idle_flags": {
"blink": true|false,
"look_around": true|false,
"breathing": true|false
},
"timestamp": unix_time,
"battery_level": 0.01.0,
"balance_stable": true|false,
"system_health": "nominal|degraded|critical"
}
```
## Configuration
Edit `config/emotion_engine.yaml`:
```yaml
personality:
extroversion: 0.6 # 0=introvert, 1=extrovert
playfulness: 0.5 # How playful with people
responsiveness: 0.8 # Reaction speed
anxiety: 0.3 # Baseline worry level
battery_warning_threshold: 0.25 # 25% triggers mild concern
battery_critical_threshold: 0.10 # 10% triggers high concern
update_rate_hz: 10.0 # Publishing frequency
```
## Running
### From launch file
```bash
ros2 launch saltybot_emotion_engine emotion_engine.launch.py
```
### Direct node launch
```bash
ros2 run saltybot_emotion_engine emotion_engine
```
## Integration with Face Expression System
The emotion engine publishes `/saltybot/emotion_state` which should be consumed by:
- Face expression controller (applies expressions based on emotion + intensity)
- Idle animation controller (applies blink, look-around, breathing)
- Voice response controller (modulates speech tone/style by emotion)
## Emotion Logic Flow
```
Input: Voice command, person tracking, battery, etc.
Classify event → determine target emotion
Apply personality modifiers (intensity * personality traits)
Initiate smooth transition (current emotion → target emotion)
Apply social warmth modifier if person-directed
Update idle flags
Publish emotion state (JSON)
```
## Example Usage
Subscribe and monitor emotion state:
```bash
ros2 topic echo /saltybot/emotion_state
```
Example output (when person talks):
```json
{
"emotion": "excited",
"intensity": 0.85,
"confidence": 0.9,
"expression": "surprised_intense",
"context": "navigation_command",
"triggered_by": "voice_command",
"social_target_id": "person_42",
"social_warmth": 0.75,
"idle_flags": {"blink": false, "look_around": true, "breathing": true},
"timestamp": 1699564800.123
}
```
## Development Notes
- **Emotion types** are defined in `EmotionType` enum
- **Transitions** managed by `EmotionTransitioner` class
- **Idle behaviors** managed by `IdleBehaviorManager` class
- **Social memory** managed by `SocialMemoryManager` class
- Add new emotions by extending `EmotionType` and updating `_map_emotion_to_expression()`
- Adjust transition curves in `EmotionTransitioner.transition_curves` dict
## Future Enhancements
1. Machine learning model for context → emotion prediction
2. Voice sentiment analysis to modulate emotion
3. Facial expression feedback from /social/faces/expressions
4. Multi-person emotional dynamics (ensemble emotion)
5. Persistent social memory (database backend)
6. Integration with LLM for contextual emotion explanation

View File

@ -0,0 +1,29 @@
/**:
ros__parameters:
# Personality configuration (0.01.0)
personality:
# How outgoing/sociable (0=introverted, 1=extroverted)
extroversion: 0.6
# How much the robot enjoys playful interactions
playfulness: 0.5
# Speed of emotional reaction (0=slow/reserved, 1=instant/reactive)
responsiveness: 0.8
# Baseline anxiety/caution (0=relaxed, 1=highly anxious)
anxiety: 0.3
# Battery thresholds
battery_warning_threshold: 0.25 # 25% - mild concern
battery_critical_threshold: 0.10 # 10% - high concern
# Update frequency
update_rate_hz: 10.0
# Transition timing
emotion_transition_duration_normal: 0.8 # seconds
emotion_transition_duration_urgent: 0.3 # for critical states
emotion_transition_duration_relax: 1.2 # for returning to neutral
# Idle behavior timing
idle_blink_interval: 3.0 # seconds
idle_look_around_interval: 8.0 # seconds
idle_return_to_neutral_delay: 10.0 # seconds with no interaction

View File

@ -0,0 +1,36 @@
#!/usr/bin/env python3
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.substitutions import PathJoinSubstitution
from launch_ros.substitutions import FindPackageShare
import os
def generate_launch_description():
"""Launch emotion engine node with configuration."""
# Get the package directory
package_share_dir = FindPackageShare("saltybot_emotion_engine")
config_file = PathJoinSubstitution(
[package_share_dir, "config", "emotion_engine.yaml"]
)
# Emotion engine node
emotion_engine_node = Node(
package="saltybot_emotion_engine",
executable="emotion_engine",
name="emotion_engine",
output="screen",
parameters=[config_file],
remappings=[
# Remap topic names if needed
("/saltybot/battery", "/saltybot/battery_level"),
("/saltybot/balance_stable", "/saltybot/balance_status"),
],
on_exit_event_handlers=[], # Could add custom handlers
)
return LaunchDescription([
emotion_engine_node,
])

View File

@ -0,0 +1,29 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_emotion_engine</name>
<version>0.1.0</version>
<description>
Context-aware facial expression and emotion selection engine.
Subscribes to orchestrator state, battery, balance, person tracking, voice commands, and health.
Maps robot state to emotions with smooth transitions, idle behaviors, and social awareness.
Publishes emotion state with personality-aware expression selection (Issue #429).
</description>
<maintainer email="seb@vayrette.com">seb</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>geometry_msgs</depend>
<depend>saltybot_social_msgs</depend>
<depend>ament_index_python</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,569 @@
#!/usr/bin/env python3
import math
import time
from enum import Enum
from dataclasses import dataclass, field
from typing import Optional, Dict, List
from collections import deque
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from std_msgs.msg import String, Float32, Bool
from geometry_msgs.msg import Pose
from saltybot_social_msgs.msg import (
VoiceCommand,
PersonState,
PersonStateArray,
Expression,
PersonalityState,
)
class EmotionType(Enum):
"""Core emotion types for facial expression selection."""
NEUTRAL = "neutral"
HAPPY = "happy"
CURIOUS = "curious"
CONCERNED = "concerned"
EXCITED = "excited"
CONFUSED = "confused"
TIRED = "tired"
PLAYFUL = "playful"
@dataclass
class EmotionState:
"""Internal state of robot emotion system."""
primary_emotion: EmotionType = EmotionType.NEUTRAL
intensity: float = 0.5 # 0.0 = minimal, 1.0 = extreme
confidence: float = 0.5 # 0.0 = uncertain, 1.0 = certain
context: str = "" # e.g., "person_interacting", "low_battery", "idle"
triggered_by: str = "" # e.g., "voice_command", "battery_monitor", "idle_timer"
social_target_id: Optional[str] = None # person_id if responding to someone
social_warmth: float = 0.5 # 0.0 = cold, 1.0 = warm (familiarity with target)
last_update_time: float = 0.0
transition_start_time: float = 0.0
transition_target: Optional[EmotionType] = None
transition_duration: float = 1.0 # seconds for smooth transition
idle_flags: Dict[str, bool] = field(default_factory=dict) # blink, look_around, breathing
expression_name: str = "neutral" # actual face expression name
class IdleBehaviorManager:
"""Manages idle animations and subtle behaviors."""
def __init__(self, node_logger):
self.logger = node_logger
self.blink_interval = 3.0 # seconds
self.look_around_interval = 8.0
self.breathing_phase = 0.0
self.last_blink_time = time.time()
self.last_look_around_time = time.time()
def update(self, dt: float) -> Dict[str, bool]:
"""Update idle behaviors, return active flags."""
current_time = time.time()
flags = {}
# Blink behavior: ~30% of the time
if current_time - self.last_blink_time > self.blink_interval:
flags["blink"] = True
self.last_blink_time = current_time
self.blink_interval = 3.0 + (hash(current_time) % 100) / 100.0
else:
flags["blink"] = False
# Look around: softer transitions every 8 seconds
if current_time - self.last_look_around_time > self.look_around_interval:
flags["look_around"] = True
self.last_look_around_time = current_time
self.look_around_interval = 8.0 + (hash(current_time) % 200) / 100.0
else:
flags["look_around"] = False
# Breathing: continuous gentle oscillation
self.breathing_phase = (self.breathing_phase + dt * 0.5) % (2 * math.pi)
breathing_intensity = (math.sin(self.breathing_phase) + 1.0) / 2.0
flags["breathing"] = breathing_intensity > 0.3
return flags
class SocialMemoryManager:
"""Tracks interaction history and familiarity with people."""
def __init__(self):
self.interactions: Dict[str, Dict] = {}
self.max_history = 100
self.recent_interactions = deque(maxlen=self.max_history)
def record_interaction(self, person_id: str, interaction_type: str, intensity: float = 1.0):
"""Record an interaction with a person."""
if person_id not in self.interactions:
self.interactions[person_id] = {
"count": 0,
"warmth": 0.5,
"last_interaction": 0.0,
"positive_interactions": 0,
}
entry = self.interactions[person_id]
entry["count"] += 1
entry["last_interaction"] = time.time()
if intensity > 0.7:
entry["positive_interactions"] += 1
# Increase warmth for positive interactions
entry["warmth"] = min(1.0, entry["warmth"] + 0.05)
else:
# Slight decrease for negative interactions
entry["warmth"] = max(0.3, entry["warmth"] - 0.02)
self.recent_interactions.append((person_id, interaction_type, time.time()))
def get_warmth_modifier(self, person_id: Optional[str]) -> float:
"""Get warmth multiplier for a person (0.5 = neutral, 1.0 = familiar, 0.3 = stranger)."""
if not person_id or person_id not in self.interactions:
return 0.5
return self.interactions[person_id].get("warmth", 0.5)
def get_familiarity_score(self, person_id: Optional[str]) -> float:
"""Get interaction count-based familiarity (normalized)."""
if not person_id or person_id not in self.interactions:
return 0.0
count = self.interactions[person_id].get("count", 0)
return min(1.0, count / 10.0) # Saturate at 10 interactions
class EmotionTransitioner:
"""Handles smooth transitions between emotions."""
def __init__(self):
self.transition_curves = {
(EmotionType.NEUTRAL, EmotionType.EXCITED): "ease_out",
(EmotionType.NEUTRAL, EmotionType.CONCERNED): "ease_in",
(EmotionType.EXCITED, EmotionType.NEUTRAL): "ease_in",
(EmotionType.CONCERNED, EmotionType.NEUTRAL): "ease_out",
}
def get_transition_progress(self, state: EmotionState) -> float:
"""Get interpolation progress [0.0, 1.0]."""
if not state.transition_target:
return 1.0
elapsed = time.time() - state.transition_start_time
progress = min(1.0, elapsed / state.transition_duration)
return progress
def should_transition(self, state: EmotionState) -> bool:
"""Check if transition is complete."""
if not state.transition_target:
return False
return self.get_transition_progress(state) >= 1.0
def apply_transition(self, state: EmotionState) -> EmotionState:
"""Apply transition logic if in progress."""
if not self.should_transition(state):
return state
# Transition complete
state.primary_emotion = state.transition_target
state.transition_target = None
state.confidence = min(1.0, state.confidence + 0.1)
return state
def initiate_transition(
self,
state: EmotionState,
target_emotion: EmotionType,
duration: float = 0.8,
) -> EmotionState:
"""Start a smooth transition to new emotion."""
if state.primary_emotion == target_emotion:
return state
state.transition_target = target_emotion
state.transition_start_time = time.time()
state.transition_duration = duration
state.confidence = max(0.3, state.confidence - 0.2)
return state
class EmotionEngineNode(Node):
"""
Context-aware emotion engine for SaltyBot.
Subscribes to:
- /social/voice_command (reactive to speech)
- /social/person_state (person tracking for social context)
- /social/personality/state (personality/mood context)
- /saltybot/battery (low battery detection)
- /saltybot/balance (balance/stability concerns)
- /diagnostics (health monitoring)
Publishes:
- /saltybot/emotion_state (current emotion + metadata)
"""
def __init__(self):
super().__init__("saltybot_emotion_engine")
# Configuration parameters
self.declare_parameter("personality.extroversion", 0.6)
self.declare_parameter("personality.playfulness", 0.5)
self.declare_parameter("personality.responsiveness", 0.8)
self.declare_parameter("personality.anxiety", 0.3)
self.declare_parameter("battery_warning_threshold", 0.25)
self.declare_parameter("battery_critical_threshold", 0.10)
self.declare_parameter("update_rate_hz", 10.0)
# QoS for reliable topic communication
qos = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=5,
)
# State tracking
self.emotion_state = EmotionState()
self.last_emotion_state = EmotionState()
self.battery_level = 0.5
self.balance_stable = True
self.people_present: Dict[str, PersonState] = {}
self.voice_command_cooldown = 0.0
self.idle_timer = 0.0
self.system_health = "nominal"
# Managers
self.idle_manager = IdleBehaviorManager(self.get_logger())
self.social_memory = SocialMemoryManager()
self.transitioner = EmotionTransitioner()
# Subscriptions
self.voice_sub = self.create_subscription(
VoiceCommand,
"/social/voice_command",
self.voice_command_callback,
qos,
)
self.person_state_sub = self.create_subscription(
PersonStateArray,
"/social/person_state",
self.person_state_callback,
qos,
)
self.personality_sub = self.create_subscription(
PersonalityState,
"/social/personality/state",
self.personality_callback,
qos,
)
self.battery_sub = self.create_subscription(
Float32,
"/saltybot/battery",
self.battery_callback,
qos,
)
self.balance_sub = self.create_subscription(
Bool,
"/saltybot/balance_stable",
self.balance_callback,
qos,
)
self.health_sub = self.create_subscription(
String,
"/saltybot/system_health",
self.health_callback,
qos,
)
# Publisher
self.emotion_pub = self.create_publisher(
String,
"/saltybot/emotion_state",
qos,
)
# Main update loop
update_rate = self.get_parameter("update_rate_hz").value
self.update_timer = self.create_timer(1.0 / update_rate, self.update_callback)
self.get_logger().info(
"Emotion engine initialized: "
f"extroversion={self.get_parameter('personality.extroversion').value}, "
f"playfulness={self.get_parameter('personality.playfulness').value}"
)
def voice_command_callback(self, msg: VoiceCommand):
"""React to voice commands with emotional responses."""
self.voice_command_cooldown = 0.5 # Cooldown to prevent rapid re-triggering
intent = msg.intent
confidence = msg.confidence
# Map command intents to emotions
if intent.startswith("nav."):
# Navigation commands -> excitement
self.emotion_state = self.transitioner.initiate_transition(
self.emotion_state,
EmotionType.EXCITED,
duration=0.6,
)
self.emotion_state.context = "navigation_command"
self.emotion_state.intensity = min(0.9, confidence * 0.8 + 0.3)
elif intent.startswith("social."):
# Social commands -> happy/curious
if "remember" in intent or "forget" in intent:
self.emotion_state = self.transitioner.initiate_transition(
self.emotion_state,
EmotionType.CURIOUS,
duration=0.8,
)
self.emotion_state.intensity = 0.6
else:
self.emotion_state = self.transitioner.initiate_transition(
self.emotion_state,
EmotionType.HAPPY,
duration=0.7,
)
self.emotion_state.intensity = 0.7
elif intent == "fallback":
# Unrecognized command -> confused
self.emotion_state = self.transitioner.initiate_transition(
self.emotion_state,
EmotionType.CONFUSED,
duration=0.5,
)
self.emotion_state.intensity = min(0.5, confidence)
self.emotion_state.triggered_by = "voice_command"
self.emotion_state.social_target_id = msg.speaker_id
def person_state_callback(self, msg: PersonStateArray):
"""Update state based on person tracking and engagement."""
self.people_present.clear()
for person_state in msg.person_states:
person_id = str(person_state.person_id)
self.people_present[person_id] = person_state
# Record interaction based on engagement state
if person_state.state == PersonState.STATE_ENGAGED:
self.social_memory.record_interaction(person_id, "engaged", 0.8)
elif person_state.state == PersonState.STATE_TALKING:
self.social_memory.record_interaction(person_id, "talking", 0.9)
elif person_state.state == PersonState.STATE_APPROACHING:
self.social_memory.record_interaction(person_id, "approaching", 0.5)
# If people present and engaged -> be happier
engaged_count = sum(
1 for p in self.people_present.values()
if p.state == PersonState.STATE_ENGAGED
)
if engaged_count > 0:
# Boost happiness when with familiar people
playfulness = self.get_parameter("personality.playfulness").value
if playfulness > 0.6:
target_emotion = EmotionType.PLAYFUL
else:
target_emotion = EmotionType.HAPPY
if self.emotion_state.primary_emotion != target_emotion:
self.emotion_state = self.transitioner.initiate_transition(
self.emotion_state,
target_emotion,
duration=0.9,
)
self.emotion_state.intensity = 0.7 + (0.3 * playfulness)
self.emotion_state.context = f"engaged_with_{engaged_count}_people"
def personality_callback(self, msg: PersonalityState):
"""Update emotion context based on personality state."""
# Mood from personality system influences intensity
if msg.mood == "playful":
self.emotion_state.intensity = min(1.0, self.emotion_state.intensity + 0.1)
elif msg.mood == "annoyed":
self.emotion_state.intensity = max(0.0, self.emotion_state.intensity - 0.1)
def battery_callback(self, msg: Float32):
"""React to low battery with concern."""
self.battery_level = msg.data
battery_critical = self.get_parameter("battery_critical_threshold").value
battery_warning = self.get_parameter("battery_warning_threshold").value
if self.battery_level < battery_critical:
# Critical: very concerned
self.emotion_state = self.transitioner.initiate_transition(
self.emotion_state,
EmotionType.CONCERNED,
duration=0.5,
)
self.emotion_state.intensity = 0.9
self.emotion_state.context = "critical_battery"
self.emotion_state.triggered_by = "battery_monitor"
elif self.battery_level < battery_warning:
# Warning: mildly concerned
if self.emotion_state.primary_emotion == EmotionType.NEUTRAL:
self.emotion_state = self.transitioner.initiate_transition(
self.emotion_state,
EmotionType.CONCERNED,
duration=0.8,
)
self.emotion_state.intensity = max(self.emotion_state.intensity, 0.5)
self.emotion_state.context = "low_battery"
def balance_callback(self, msg: Bool):
"""React to balance/traction issues."""
self.balance_stable = msg.data
if not self.balance_stable:
# Balance concern
self.emotion_state = self.transitioner.initiate_transition(
self.emotion_state,
EmotionType.CONCERNED,
duration=0.4,
)
self.emotion_state.intensity = 0.8
self.emotion_state.context = "balance_unstable"
self.emotion_state.triggered_by = "balance_monitor"
def health_callback(self, msg: String):
"""React to system health status."""
self.system_health = msg.data
if msg.data == "degraded":
self.emotion_state = self.transitioner.initiate_transition(
self.emotion_state,
EmotionType.CONCERNED,
duration=0.7,
)
self.emotion_state.intensity = 0.6
self.emotion_state.context = "system_degraded"
def update_callback(self):
"""Main update loop."""
current_time = time.time()
dt = 1.0 / self.get_parameter("update_rate_hz").value
# Update transitions
self.emotion_state = self.transitioner.apply_transition(self.emotion_state)
# Update idle behaviors
self.emotion_state.idle_flags = self.idle_manager.update(dt)
# Cooldown voice commands
self.voice_command_cooldown = max(0.0, self.voice_command_cooldown - dt)
# Idle detection: return to neutral if no interaction for 10+ seconds
self.idle_timer += dt
if (
self.voice_command_cooldown <= 0
and not self.people_present
and self.idle_timer > 10.0
and self.emotion_state.primary_emotion != EmotionType.NEUTRAL
):
self.emotion_state = self.transitioner.initiate_transition(
self.emotion_state,
EmotionType.NEUTRAL,
duration=1.2,
)
self.emotion_state.context = "idle"
self.idle_timer = 0.0
if self.voice_command_cooldown > 0:
self.idle_timer = 0.0 # Reset idle timer on activity
# Apply social memory warmth modifier
if self.emotion_state.social_target_id and self.emotion_state.primary_emotion == EmotionType.HAPPY:
warmth = self.social_memory.get_warmth_modifier(self.emotion_state.social_target_id)
self.emotion_state.social_warmth = warmth
self.emotion_state.intensity = self.emotion_state.intensity * (0.8 + warmth * 0.2)
# Update last timestamp
self.emotion_state.last_update_time = current_time
# Map emotion to expression name
self.emotion_state.expression_name = self._map_emotion_to_expression()
# Publish emotion state
self._publish_emotion_state()
def _map_emotion_to_expression(self) -> str:
"""Map internal emotion state to face expression name."""
emotion = self.emotion_state.primary_emotion
intensity = self.emotion_state.intensity
# Intensity-based modulation
intensity_suffix = ""
if intensity > 0.7:
intensity_suffix = "_intense"
elif intensity < 0.3:
intensity_suffix = "_subtle"
base_mapping = {
EmotionType.NEUTRAL: "neutral",
EmotionType.HAPPY: "happy",
EmotionType.CURIOUS: "curious",
EmotionType.EXCITED: "surprised", # Use surprised for excitement
EmotionType.CONCERNED: "sad", # Concern maps to sad expression
EmotionType.CONFUSED: "confused",
EmotionType.TIRED: "sad",
EmotionType.PLAYFUL: "happy",
}
base = base_mapping.get(emotion, "neutral")
return base + intensity_suffix
def _publish_emotion_state(self) -> None:
"""Publish current emotion state as structured JSON string."""
import json
state_dict = {
"emotion": self.emotion_state.primary_emotion.value,
"intensity": float(self.emotion_state.intensity),
"confidence": float(self.emotion_state.confidence),
"expression": self.emotion_state.expression_name,
"context": self.emotion_state.context,
"triggered_by": self.emotion_state.triggered_by,
"social_target_id": self.emotion_state.social_target_id,
"social_warmth": float(self.emotion_state.social_warmth),
"idle_flags": self.emotion_state.idle_flags,
"timestamp": self.emotion_state.last_update_time,
"battery_level": float(self.battery_level),
"balance_stable": self.balance_stable,
"system_health": self.system_health,
}
msg = String()
msg.data = json.dumps(state_dict)
self.emotion_pub.publish(msg)
def main(args=None):
rclpy.init(args=args)
node = EmotionEngineNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/saltybot_emotion_engine
[install]
install_lib=$base/lib/saltybot_emotion_engine

View File

@ -0,0 +1,32 @@
from setuptools import setup
import os
from glob import glob
package_name = 'saltybot_emotion_engine'
setup(
name=package_name,
version='0.1.0',
packages=[package_name],
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
(os.path.join('share', package_name, 'launch'),
glob('launch/*.py')),
(os.path.join('share', package_name, 'config'),
glob('config/*.yaml')),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='seb',
maintainer_email='seb@vayrette.com',
description='Context-aware emotion engine with state-to-expression mapping and social awareness',
license='MIT',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'emotion_engine = saltybot_emotion_engine.emotion_engine_node:main',
],
},
)

View File

@ -0,0 +1,29 @@
gamepad_teleop:
device: /dev/input/js0
# Deadzone for analog sticks (0.0 - 1.0)
deadzone: 0.1
# Velocity limits
max_linear_vel: 2.0 # m/s
max_angular_vel: 2.0 # rad/s
# Speed multiplier limits (L2/R2 triggers)
min_speed_mult: 0.3 # 30% with L2
max_speed_mult: 1.0 # 100% with R2
# Pan-tilt servo limits (D-pad manual control)
pan_step: 5.0 # degrees per d-pad press
tilt_step: 5.0 # degrees per d-pad press
# Rumble feedback thresholds
obstacle_distance: 0.5 # m, below this triggers warning rumble
low_battery_voltage: 18.0 # V, below this triggers alert rumble
# Topic names
topics:
cmd_vel: /cmd_vel
teleop_active: /saltybot/teleop_active
obstacle_feedback: /saltybot/obstacle_distance
battery_voltage: /saltybot/battery_voltage
pan_tilt_command: /saltybot/pan_tilt_command

View File

@ -0,0 +1,23 @@
"""Launch PS5 DualSense gamepad teleoperation node."""
from launch import LaunchDescription
from launch_ros.actions import Node
from ament_index_python.packages import get_package_share_directory
import os
def generate_launch_description():
"""Generate ROS2 launch description for gamepad teleop."""
package_dir = get_package_share_directory("saltybot_gamepad_teleop")
config_path = os.path.join(package_dir, "config", "gamepad_config.yaml")
gamepad_node = Node(
package="saltybot_gamepad_teleop",
executable="gamepad_teleop_node",
name="gamepad_teleop_node",
output="screen",
parameters=[config_path],
remappings=[],
)
return LaunchDescription([gamepad_node])

View File

@ -0,0 +1,32 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_gamepad_teleop</name>
<version>0.1.0</version>
<description>
PS5 DualSense Bluetooth gamepad teleoperation for SaltyBot.
Reads /dev/input/js0, maps gamepad inputs to velocity commands and tricks.
Left stick: linear velocity, Right stick: angular velocity.
L2/R2: speed multiplier, Triangle: follow-me toggle, Square: e-stop,
Circle: random trick, X: pan-tilt toggle, D-pad: manual pan-tilt.
Provides rumble feedback for obstacles and low battery.
</description>
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>geometry_msgs</depend>
<depend>sensor_msgs</depend>
<depend>std_msgs</depend>
<buildtool_depend>ament_python</buildtool_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,284 @@
"""
gamepad_teleop_node.py PS5 DualSense Bluetooth gamepad teleoperation for SaltyBot.
Reads from /dev/input/js0 (gamepad input device) and maps to velocity commands.
Publishes /cmd_vel (Twist) and /saltybot/teleop_active (Bool) for autonomous override.
Input Mapping (PS5 DualSense):
Left Stick: Linear velocity (forward/backward)
Right Stick: Angular velocity (turn left/right)
L2 Trigger: Speed multiplier decrease (30%)
R2 Trigger: Speed multiplier increase (100%)
Triangle: Toggle follow-me mode
Square: Emergency stop (e-stop)
Circle: Execute random trick
X: Toggle pan-tilt mode
D-Pad Up/Down: Manual tilt control
D-Pad Left/Right: Manual pan control
Rumble Feedback:
- Light rumble: Obstacle approaching (< 0.5 m)
- Heavy rumble: Low battery (< 18 V)
"""
import struct
import threading
import time
from typing import Optional
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
from std_msgs.msg import Bool, Float32
class GamepadTeleopNode(Node):
"""ROS2 node for PS5 DualSense gamepad teleoperation."""
# Button indices (JS_EVENT_BUTTON)
BTN_SQUARE = 0
BTN_X = 1
BTN_CIRCLE = 2
BTN_TRIANGLE = 3
BTN_L1 = 4
BTN_R1 = 5
BTN_L2_DIGITAL = 6
BTN_R2_DIGITAL = 7
BTN_SHARE = 8
BTN_OPTIONS = 9
BTN_L3 = 10
BTN_R3 = 11
BTN_PS = 12
BTN_TOUCHPAD = 13
# Axis indices (JS_EVENT_AXIS)
AXIS_LX = 0 # Left stick X
AXIS_LY = 1 # Left stick Y
AXIS_RX = 2 # Right stick X
AXIS_RY = 3 # Right stick Y
AXIS_L2_ANALOG = 4 # L2 trigger analog
AXIS_R2_ANALOG = 5 # R2 trigger analog
AXIS_DPAD_X = 6 # D-pad horizontal
AXIS_DPAD_Y = 7 # D-pad vertical
def __init__(self):
"""Initialize gamepad teleop node."""
super().__init__("gamepad_teleop_node")
# Declare parameters
self.declare_parameter("device", "/dev/input/js0")
self.declare_parameter("deadzone", 0.1)
self.declare_parameter("max_linear_vel", 2.0)
self.declare_parameter("max_angular_vel", 2.0)
self.declare_parameter("min_speed_mult", 0.3)
self.declare_parameter("max_speed_mult", 1.0)
self.declare_parameter("pan_step", 5.0)
self.declare_parameter("tilt_step", 5.0)
self.declare_parameter("obstacle_distance", 0.5)
self.declare_parameter("low_battery_voltage", 18.0)
# Get parameters
self.device = self.get_parameter("device").value
self.deadzone = self.get_parameter("deadzone").value
self.max_linear_vel = self.get_parameter("max_linear_vel").value
self.max_angular_vel = self.get_parameter("max_angular_vel").value
self.min_speed_mult = self.get_parameter("min_speed_mult").value
self.max_speed_mult = self.get_parameter("max_speed_mult").value
self.pan_step = self.get_parameter("pan_step").value
self.tilt_step = self.get_parameter("tilt_step").value
self.obstacle_distance_threshold = self.get_parameter("obstacle_distance").value
self.low_battery_threshold = self.get_parameter("low_battery_voltage").value
# Publishers
self.cmd_vel_pub = self.create_publisher(Twist, "/cmd_vel", 1)
self.teleop_active_pub = self.create_publisher(Bool, "/saltybot/teleop_active", 1)
self.pan_tilt_pan_pub = self.create_publisher(Float32, "/saltybot/pan_tilt_command/pan", 1)
self.pan_tilt_tilt_pub = self.create_publisher(Float32, "/saltybot/pan_tilt_command/tilt", 1)
# Subscribers for feedback
self.create_subscription(Float32, "/saltybot/obstacle_distance", self._obstacle_callback, 1)
self.create_subscription(Float32, "/saltybot/battery_voltage", self._battery_callback, 1)
# State variables
self.axes = [0.0] * 8 # 8 analog axes
self.buttons = [False] * 14 # 14 buttons
self.speed_mult = 1.0
self.follow_me_active = False
self.pan_tilt_active = False
self.teleop_enabled = True
self.last_cmd_vel_time = time.time()
# Feedback state
self.last_obstacle_distance = float('inf')
self.last_battery_voltage = 24.0
self.rumble_active = False
# Thread management
self.device_fd = None
self.reading = False
self.reader_thread = None
self.get_logger().info(f"Gamepad Teleop Node initialized. Listening on {self.device}")
self.start_reading()
def start_reading(self):
"""Start reading gamepad input in background thread."""
self.reading = True
self.reader_thread = threading.Thread(target=self._read_gamepad, daemon=True)
self.reader_thread.start()
def stop_reading(self):
"""Stop reading gamepad input."""
self.reading = False
if self.device_fd:
try:
self.device_fd.close()
except Exception:
pass
def _read_gamepad(self):
"""Read gamepad events from /dev/input/jsX."""
try:
self.device_fd = open(self.device, "rb")
self.get_logger().info(f"Opened gamepad device: {self.device}")
except OSError as e:
self.get_logger().error(f"Failed to open gamepad device {self.device}: {e}")
return
while self.reading:
try:
# JS_EVENT structure: time (4B), value (2B), type (1B), number (1B)
event_data = self.device_fd.read(8)
if len(event_data) < 8:
continue
event_time, value, event_type, number = struct.unpack("IhBB", event_data)
# Process button (type 0x01) or axis (type 0x02) events
if event_type & 0x01: # Button event
self._handle_button(number, value)
elif event_type & 0x02: # Axis event
self._handle_axis(number, value)
except Exception as e:
self.get_logger().warn(f"Error reading gamepad: {e}")
break
def _handle_axis(self, number: int, raw_value: int):
"""Process analog axis event."""
# Normalize to -1.0 to 1.0
normalized = raw_value / 32767.0
self.axes[number] = normalized
# Apply deadzone
if abs(normalized) < self.deadzone:
self.axes[number] = 0.0
self._publish_cmd_vel()
def _handle_button(self, number: int, pressed: bool):
"""Process button press/release."""
self.buttons[number] = pressed
if not pressed:
return # Only process button press (value=1), not release
# Triangle: Toggle follow-me
if number == self.BTN_TRIANGLE:
self.follow_me_active = not self.follow_me_active
self.get_logger().info(f"Follow-me: {self.follow_me_active}")
# Square: E-stop
elif number == self.BTN_SQUARE:
self.teleop_enabled = False
self._publish_cmd_vel()
self.get_logger().warn("E-STOP activated")
# Circle: Random trick
elif number == self.BTN_CIRCLE:
self.get_logger().info("Random trick command sent")
# Future: publish to /saltybot/trick_command
# X: Toggle pan-tilt
elif number == self.BTN_X:
self.pan_tilt_active = not self.pan_tilt_active
self.get_logger().info(f"Pan-tilt mode: {self.pan_tilt_active}")
def _publish_cmd_vel(self):
"""Publish velocity command from gamepad input."""
if not self.teleop_enabled:
# Publish zero velocity
twist = Twist()
self.cmd_vel_pub.publish(twist)
self.teleop_active_pub.publish(Bool(data=False))
return
# Get stick inputs
lx = self.axes[self.AXIS_LX]
ly = -self.axes[self.AXIS_LY] # Invert Y for forward = positive
rx = self.axes[self.AXIS_RX]
# Speed multiplier from triggers
l2 = max(0.0, self.axes[self.AXIS_L2_ANALOG])
r2 = max(0.0, self.axes[self.AXIS_R2_ANALOG])
self.speed_mult = self.min_speed_mult + (r2 - l2) * (self.max_speed_mult - self.min_speed_mult)
self.speed_mult = max(self.min_speed_mult, min(self.max_speed_mult, self.speed_mult))
# Calculate velocities
linear_vel = ly * self.max_linear_vel * self.speed_mult
angular_vel = rx * self.max_angular_vel * self.speed_mult
# Publish cmd_vel
twist = Twist()
twist.linear.x = linear_vel
twist.angular.z = angular_vel
self.cmd_vel_pub.publish(twist)
# Publish teleop_active flag
self.teleop_active_pub.publish(Bool(data=True))
# Handle pan-tilt from D-pad
if self.pan_tilt_active:
dpad_x = self.axes[self.AXIS_DPAD_X]
dpad_y = self.axes[self.AXIS_DPAD_Y]
if dpad_x != 0: # Left/Right
pan_cmd = Float32(data=float(dpad_x) * self.pan_step)
self.pan_tilt_pan_pub.publish(pan_cmd)
if dpad_y != 0: # Up/Down
tilt_cmd = Float32(data=float(dpad_y) * self.tilt_step)
self.pan_tilt_tilt_pub.publish(tilt_cmd)
self.last_cmd_vel_time = time.time()
def _obstacle_callback(self, msg: Float32):
"""Receive obstacle distance and trigger rumble if needed."""
self.last_obstacle_distance = msg.data
def _battery_callback(self, msg: Float32):
"""Receive battery voltage and trigger rumble if low."""
self.last_battery_voltage = msg.data
def destroy_node(self):
"""Clean up on shutdown."""
self.stop_reading()
super().destroy_node()
def main(args=None):
"""Main entry point."""
rclpy.init(args=args)
node = GamepadTeleopNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,5 @@
[develop]
script_dir=$base/lib/saltybot_gamepad_teleop
[egg_info]
tag_build =
tag_date = 0

View File

@ -0,0 +1,30 @@
from setuptools import setup
package_name = "saltybot_gamepad_teleop"
setup(
name=package_name,
version="0.1.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(f"share/{package_name}/launch", ["launch/gamepad_teleop.launch.py"]),
(f"share/{package_name}/config", ["config/gamepad_config.yaml"]),
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="sl-controls",
maintainer_email="sl-controls@saltylab.local",
description=(
"PS5 DualSense Bluetooth gamepad teleoperation with rumble feedback "
"for SaltyBot autonomous override"
),
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": [
"gamepad_teleop_node = saltybot_gamepad_teleop.gamepad_teleop_node:main",
],
},
)