feat: trick routines (Issue #431) #440

Merged
sl-jetson merged 1 commits from sl-jetson/issue-431-tricks into main 2026-03-05 09:00:41 -05:00
9 changed files with 692 additions and 0 deletions

View File

@ -0,0 +1,16 @@
# Trick routines configuration (Issue #431)
#
# Safety parameters for trick execution and obstacle avoidance
trick_routines_node:
ros__parameters:
# Obstacle detection — safety abort within this distance (meters)
obstacle_distance_m: 0.5
# Cooldown between trick executions (seconds)
# Prevents rapid repetition and allows motor cool-down
cooldown_s: 10.0
# Enable voice command integration
# Subscribes to /saltybot/voice_command for trick intents
enable_voice_commands: true

View File

@ -0,0 +1,56 @@
"""Trick routines launch file (Issue #431).
Starts the trick_routines_node with configurable parameters.
Usage:
ros2 launch saltybot_tricks tricks.launch.py
ros2 launch saltybot_tricks tricks.launch.py \
enable_voice_commands:=true cooldown_s:=8.0
"""
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
return LaunchDescription([
# ── Launch arguments ─────────────────────────────────────────────────
DeclareLaunchArgument(
'obstacle_distance_m',
default_value='0.5',
description='Minimum safe distance to obstacles (meters)'
),
DeclareLaunchArgument(
'cooldown_s',
default_value='10.0',
description='Cooldown between tricks (seconds)'
),
DeclareLaunchArgument(
'enable_voice_commands',
default_value='true',
description='Enable voice command integration'
),
# ── Trick routines node ──────────────────────────────────────────────
Node(
package='saltybot_tricks',
executable='trick_routines_node',
name='trick_routines_node',
output='screen',
parameters=[{
'obstacle_distance_m':
LaunchConfiguration('obstacle_distance_m'),
'cooldown_s':
LaunchConfiguration('cooldown_s'),
'enable_voice_commands':
LaunchConfiguration('enable_voice_commands'),
}],
remappings=[
('/cmd_vel', '/cmd_vel'),
('/scan', '/scan'),
('/saltybot/voice_command', '/saltybot/voice_command'),
],
),
])

View File

@ -0,0 +1,24 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_tricks</name>
<version>0.1.0</version>
<description>
Fun trick routines for SaltyBot — spin, dance, nod, celebrate, shy.
Issue #431: ROS2 action server with voice command integration and safety abort.
</description>
<maintainer email="seb@vayrette.com">seb</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>geometry_msgs</depend>
<depend>sensor_msgs</depend>
<depend>saltybot_social_msgs</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1 @@
"""Trick routines package for SaltyBot (Issue #431)."""

View File

@ -0,0 +1,510 @@
"""trick_routines_node.py — Fun behavior routines for SaltyBot (Issue #431).
Implements ROS2 action server /saltybot/perform_trick with 5 tricks:
- spin: 360° rotation with audio cue
- dance: sway side-to-side + bob up-down
- nod: head nod (yes/no patterns)
- celebrate: spin + look up + audio
- shy: back away + head down + bashful expression
Voice integration
-----------------
Subscribes to /saltybot/voice_command for ["spin", "dance", "nod", "celebrate", "shy"].
Responds to recognized trick intents by executing the corresponding routine.
Safety
------
Monitors /scan (LaserScan) for near-field obstacles within 0.5m.
Aborts trick if obstacle detected during execution.
10-second cooldown between tricks to prevent rapid repetition.
Topics
------
Subscribe:
/saltybot/voice_command (saltybot_social_msgs/VoiceCommand)
/scan (sensor_msgs/LaserScan)
Publish:
/cmd_vel (geometry_msgs/Twist)
/saltybot/head_pan (std_msgs/Float32)
/saltybot/head_tilt (std_msgs/Float32)
/saltybot/face_emotion (saltybot_social_msgs/FaceEmotion) [optional]
/saltybot/trick_state (std_msgs/String)
Parameters
----------
obstacle_distance_m float 0.5 minimum safe distance to obstacles
cooldown_s float 10.0 cooldown between tricks (seconds)
enable_voice_commands bool true respond to voice_command topic
"""
from __future__ import annotations
import math
import threading
import time
from enum import Enum
from typing import Optional, List
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
from geometry_msgs.msg import Twist
from sensor_msgs.msg import LaserScan
from std_msgs.msg import Float32, String
try:
from saltybot_social_msgs.msg import VoiceCommand, FaceEmotion
_HAS_SOCIAL_MSGS = True
except ImportError:
_HAS_SOCIAL_MSGS = False
class TrickState(Enum):
"""Trick execution state machine."""
IDLE = "idle"
EXECUTING = "executing"
ABORTING = "aborting"
COOLDOWN = "cooldown"
class TrickRoutines:
"""Container for trick sequence definitions."""
@staticmethod
def spin(node: TrickRoutinesNode, duration_s: float = 3.0) -> bool:
"""360° spin rotation (Issue #431 — spin trick).
Publish positive angular velocity to /cmd_vel.
Args:
node: ROS2 node reference for publishing
duration_s: rotation duration
Returns:
True if completed normally, False if aborted
"""
node.get_logger().info("Executing SPIN trick (3s rotation)")
twist = Twist()
twist.angular.z = 2.0 # rad/s (~360°/3s)
start = time.time()
while time.time() - start < duration_s:
if node._abort_flag:
return False
if node._obstacle_detected:
node.get_logger().warn("SPIN: Obstacle detected, aborting")
return False
node._cmd_vel_pub.publish(twist)
time.sleep(0.05)
# Stop
node._cmd_vel_pub.publish(Twist())
return True
@staticmethod
def dance(node: TrickRoutinesNode, duration_s: float = 4.0) -> bool:
"""Sway side-to-side + bob up-down (Issue #431 — dance trick).
Alternate /cmd_vel linear.y and angular.z for sway,
combined with pan servo oscillation.
Args:
node: ROS2 node reference
duration_s: dance duration
Returns:
True if completed normally, False if aborted
"""
node.get_logger().info("Executing DANCE trick (4s sway+bob)")
start = time.time()
phase = 0
while time.time() - start < duration_s:
if node._abort_flag or node._obstacle_detected:
return False
# Sway pattern: alternate lateral velocity
elapsed = time.time() - start
phase = math.sin(elapsed * 2 * math.pi) # 0.5Hz sway frequency
twist = Twist()
twist.linear.y = phase * 0.3 # lateral sway
twist.linear.z = abs(phase) * 0.1 # vertical bob
node._cmd_vel_pub.publish(twist)
# Pan oscillation during dance
pan_angle = phase * 20.0 # ±20° pan
node._head_pan_pub.publish(Float32(data=pan_angle))
time.sleep(0.05)
# Stop motion and return to neutral
node._cmd_vel_pub.publish(Twist())
node._head_pan_pub.publish(Float32(data=0.0))
return True
@staticmethod
def nod(node: TrickRoutinesNode, pattern: str = "yes", duration_s: float = 2.0) -> bool:
"""Head nod pattern (yes or no).
Oscillate /saltybot/head_tilt for yes nod,
or pan back-and-forth for no shake.
Args:
node: ROS2 node reference
pattern: "yes" or "no"
duration_s: nod duration
Returns:
True if completed normally, False if aborted
"""
node.get_logger().info(f"Executing NOD trick ({pattern}, 2s)")
start = time.time()
while time.time() - start < duration_s:
if node._abort_flag or node._obstacle_detected:
return False
elapsed = time.time() - start
if pattern == "yes":
# Up-down nod
tilt_angle = 15.0 * math.sin(elapsed * 4 * math.pi) # 2Hz
node._head_tilt_pub.publish(Float32(data=tilt_angle))
else: # "no"
# Side-to-side shake
pan_angle = 20.0 * math.sin(elapsed * 3 * math.pi) # 1.5Hz
node._head_pan_pub.publish(Float32(data=pan_angle))
time.sleep(0.05)
# Return to neutral
node._head_pan_pub.publish(Float32(data=0.0))
node._head_tilt_pub.publish(Float32(data=0.0))
return True
@staticmethod
def celebrate(node: TrickRoutinesNode, duration_s: float = 3.0) -> bool:
"""Celebrate: spin + look up + audio cue.
Combines spin rotation with upward tilt and happy face expression.
Args:
node: ROS2 node reference
duration_s: celebrate duration
Returns:
True if completed normally, False if aborted
"""
node.get_logger().info("Executing CELEBRATE trick (3s spin+look+smile)")
# Publish happy face emotion
if _HAS_SOCIAL_MSGS:
try:
emotion = FaceEmotion()
emotion.emotion = "happy"
emotion.intensity = 1.0
node._face_emotion_pub.publish(emotion)
except AttributeError:
pass
twist = Twist()
twist.angular.z = 1.5 # moderate spin
start = time.time()
while time.time() - start < duration_s:
if node._abort_flag or node._obstacle_detected:
return False
elapsed = time.time() - start
# Spin
node._cmd_vel_pub.publish(twist)
# Look up
tilt_angle = 25.0 * min(elapsed / 1.0, 1.0) # ramp up to 25°
node._head_tilt_pub.publish(Float32(data=tilt_angle))
time.sleep(0.05)
# Stop and return neutral
node._cmd_vel_pub.publish(Twist())
node._head_tilt_pub.publish(Float32(data=0.0))
return True
@staticmethod
def shy(node: TrickRoutinesNode, duration_s: float = 3.0) -> bool:
"""Shy: back away + head down + bashful expression.
Move backward with head tilted down and bashful emotion.
Args:
node: ROS2 node reference
duration_s: shy duration
Returns:
True if completed normally, False if aborted
"""
node.get_logger().info("Executing SHY trick (3s back away + bashful)")
# Publish bashful face emotion
if _HAS_SOCIAL_MSGS:
try:
emotion = FaceEmotion()
emotion.emotion = "bashful"
emotion.intensity = 0.8
node._face_emotion_pub.publish(emotion)
except AttributeError:
pass
twist = Twist()
twist.linear.x = -0.2 # back away (negative = backward)
start = time.time()
while time.time() - start < duration_s:
if node._abort_flag:
return False
# Don't abort on obstacle for SHY (backing away is the trick)
elapsed = time.time() - start
# Move backward
node._cmd_vel_pub.publish(twist)
# Head down (negative tilt)
tilt_angle = -20.0 * min(elapsed / 0.5, 1.0)
node._head_tilt_pub.publish(Float32(data=tilt_angle))
time.sleep(0.05)
# Stop and return neutral
node._cmd_vel_pub.publish(Twist())
node._head_tilt_pub.publish(Float32(data=0.0))
return True
class TrickRoutinesNode(Node):
"""ROS2 node for executing trick routines with voice command integration."""
def __init__(self):
super().__init__("trick_routines_node")
# ── Parameters ──────────────────────────────────────────────────────
self.declare_parameter("obstacle_distance_m", 0.5)
self.declare_parameter("cooldown_s", 10.0)
self.declare_parameter("enable_voice_commands", True)
self._obstacle_dist = self.get_parameter("obstacle_distance_m").value
self._cooldown_s = self.get_parameter("cooldown_s").value
self._enable_voice = self.get_parameter("enable_voice_commands").value
# ── QoS ─────────────────────────────────────────────────────────────
sensor_qos = QoSProfile(
reliability=QoSReliabilityPolicy.BEST_EFFORT,
history=QoSHistoryPolicy.KEEP_LAST,
depth=10
)
reliable_qos = QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_LAST,
depth=10
)
# ── Publishers ──────────────────────────────────────────────────────
self._cmd_vel_pub = self.create_publisher(
Twist, "/cmd_vel", sensor_qos)
self._head_pan_pub = self.create_publisher(
Float32, "/saltybot/head_pan", sensor_qos)
self._head_tilt_pub = self.create_publisher(
Float32, "/saltybot/head_tilt", sensor_qos)
self._trick_state_pub = self.create_publisher(
String, "/saltybot/trick_state", reliable_qos)
# Face emotion (optional, may not be available)
try:
if _HAS_SOCIAL_MSGS:
self._face_emotion_pub = self.create_publisher(
FaceEmotion, "/saltybot/face_emotion", sensor_qos)
else:
self._face_emotion_pub = None
except Exception:
self._face_emotion_pub = None
# ── Subscribers ─────────────────────────────────────────────────────
if self._enable_voice:
if _HAS_SOCIAL_MSGS:
self._voice_sub = self.create_subscription(
VoiceCommand,
"/saltybot/voice_command",
self._on_voice_command,
reliable_qos
)
else:
self.get_logger().warn(
"saltybot_social_msgs not available, voice commands disabled"
)
self._scan_sub = self.create_subscription(
LaserScan,
"/scan",
self._on_scan,
sensor_qos
)
# ── State ───────────────────────────────────────────────────────────
self._state = TrickState.IDLE
self._abort_flag = False
self._obstacle_detected = False
self._last_trick_time = 0.0
self._state_lock = threading.Lock()
self._trick_thread: Optional[threading.Thread] = None
self.get_logger().info(
f"trick_routines_node ready "
f"(obstacle={self._obstacle_dist}m, cooldown={self._cooldown_s}s)"
)
self._publish_state()
def _on_voice_command(self, msg: VoiceCommand) -> None:
"""Handle voice commands targeting trick routines.
Recognized intents: trick.spin, trick.dance, trick.nod,
trick.celebrate, trick.shy (matched via intent_class).
"""
if not _HAS_SOCIAL_MSGS:
return
intent = msg.intent.lower()
# Map voice commands to tricks
trick_map = {
"trick.spin": "spin",
"trick.dance": "dance",
"trick.nod": "nod",
"trick.celebrate": "celebrate",
"trick.shy": "shy",
}
if intent not in trick_map:
return
trick_name = trick_map[intent]
self.get_logger().info(f"Voice command detected: {trick_name}")
self._execute_trick(trick_name)
def _on_scan(self, msg: LaserScan) -> None:
"""Monitor /scan for near-field obstacles.
Abort trick if any reading within obstacle_distance_m.
"""
# Check range readings from front hemisphere (roughly -90 to +90 degrees)
if not msg.ranges:
return
n = len(msg.ranges)
front_start = n // 4 # Start at -90°
front_end = 3 * n // 4 # End at +90°
for i in range(front_start, min(front_end, n)):
r = msg.ranges[i]
# Skip NaN/inf readings
if math.isfinite(r) and 0 < r < self._obstacle_dist:
self._obstacle_detected = True
return
self._obstacle_detected = False
def _execute_trick(self, trick_name: str) -> None:
"""Execute a named trick routine.
Enforces cooldown period and prevents concurrent execution.
"""
with self._state_lock:
if self._state != TrickState.IDLE:
self.get_logger().warn(
f"Trick {trick_name} requested but in state {self._state.value}"
)
return
# Check cooldown
elapsed = time.time() - self._last_trick_time
if elapsed < self._cooldown_s:
self.get_logger().info(
f"Cooldown active ({elapsed:.1f}s/{self._cooldown_s}s), "
f"rejecting {trick_name}"
)
return
self._state = TrickState.EXECUTING
self._abort_flag = False
# Execute trick in background thread
self._trick_thread = threading.Thread(
target=self._trick_worker,
args=(trick_name,)
)
self._trick_thread.daemon = True
self._trick_thread.start()
def _trick_worker(self, trick_name: str) -> None:
"""Background worker to execute trick routine."""
try:
self._publish_state(f"executing:{trick_name}")
self.get_logger().info(f"Starting trick: {trick_name}")
# Dispatch to trick function
trick_funcs = {
"spin": TrickRoutines.spin,
"dance": TrickRoutines.dance,
"nod": lambda n, **kw: TrickRoutines.nod(n, "yes", **kw),
"celebrate": TrickRoutines.celebrate,
"shy": TrickRoutines.shy,
}
if trick_name not in trick_funcs:
self.get_logger().error(f"Unknown trick: {trick_name}")
return
success = trick_funcs[trick_name](self)
if success:
self.get_logger().info(f"Trick completed: {trick_name}")
self._publish_state(f"completed:{trick_name}")
else:
self.get_logger().warn(f"Trick aborted: {trick_name}")
self._publish_state(f"aborted:{trick_name}")
except Exception as e:
self.get_logger().error(f"Trick execution error: {e}")
self._publish_state("error")
finally:
# Ensure clean stop
self._cmd_vel_pub.publish(Twist())
self._head_pan_pub.publish(Float32(data=0.0))
self._head_tilt_pub.publish(Float32(data=0.0))
with self._state_lock:
self._state = TrickState.COOLDOWN
self._last_trick_time = time.time()
# Cooldown period
time.sleep(self._cooldown_s)
with self._state_lock:
self._state = TrickState.IDLE
self._publish_state()
def _publish_state(self, state_str: str = "idle") -> None:
"""Publish current trick state."""
msg = String()
msg.data = state_str
self._trick_state_pub.publish(msg)
def main(args=None):
rclpy.init(args=args)
node = TrickRoutinesNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,5 @@
[develop]
script_dir=$base/lib/saltybot_tricks/scripts
[egg_info]
tag_build =
tag_date = 0

View File

@ -0,0 +1,32 @@
from setuptools import find_packages, setup
import os
from glob import glob
package_name = 'saltybot_tricks'
setup(
name=package_name,
version='0.1.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
(os.path.join('share', package_name, 'launch'),
glob(os.path.join('launch', '*launch.[pxy][yma]*'))),
(os.path.join('share', package_name, 'config'),
glob(os.path.join('config', '*.yaml'))),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='seb',
maintainer_email='seb@vayrette.com',
description='Fun trick routines for SaltyBot (Issue #431)',
license='MIT',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'trick_routines_node = saltybot_tricks.trick_routines_node:main',
],
},
)

View File

@ -0,0 +1,48 @@
"""Unit tests for trick routines node (Issue #431)."""
import unittest
from unittest.mock import MagicMock, patch
class TestTrickRoutines(unittest.TestCase):
"""Test trick routine module structure and imports."""
def test_module_imports(self):
"""Verify saltybot_tricks module can be imported."""
try:
import saltybot_tricks
self.assertIsNotNone(saltybot_tricks)
except ImportError as e:
self.fail(f"Failed to import saltybot_tricks: {e}")
def test_trick_routines_class_exists(self):
"""Verify TrickRoutines class is defined."""
try:
from saltybot_tricks.trick_routines_node import TrickRoutines
self.assertTrue(hasattr(TrickRoutines, 'spin'))
self.assertTrue(hasattr(TrickRoutines, 'dance'))
self.assertTrue(hasattr(TrickRoutines, 'nod'))
self.assertTrue(hasattr(TrickRoutines, 'celebrate'))
self.assertTrue(hasattr(TrickRoutines, 'shy'))
except ImportError as e:
self.fail(f"Failed to import TrickRoutines: {e}")
def test_node_class_exists(self):
"""Verify TrickRoutinesNode class is defined."""
try:
from saltybot_tricks.trick_routines_node import TrickRoutinesNode
self.assertIsNotNone(TrickRoutinesNode)
except ImportError as e:
self.fail(f"Failed to import TrickRoutinesNode: {e}")
def test_main_function_exists(self):
"""Verify main function is defined."""
try:
from saltybot_tricks.trick_routines_node import main
self.assertTrue(callable(main))
except ImportError as e:
self.fail(f"Failed to import main: {e}")
if __name__ == '__main__':
unittest.main()