feat: trick routines action server (Issue #431)
Implement ROS2 node for fun behavior routines: - spin: 360° rotation with audio - dance: sway side-to-side + bob - nod: head yes/no patterns - celebrate: spin + look up + smile - shy: back away + head down + bashful Features: - Voice command integration (/saltybot/voice_command) - Timed sequences: /cmd_vel + pan_tilt controls - Obstacle safety abort on /scan near-field detection - 10s cooldown between tricks to prevent repetition - Trick state publishing (/saltybot/trick_state) - Background execution thread for non-blocking operation Package structure: - saltybot_tricks/trick_routines_node.py (main node) - launch/tricks.launch.py (configurable launch) - config/tricks_params.yaml (tuning parameters) - test/test_tricks.py (module structure tests) Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
parent
c1b3a4368d
commit
b8b29125d6
16
jetson/ros2_ws/src/saltybot_tricks/config/tricks_params.yaml
Normal file
16
jetson/ros2_ws/src/saltybot_tricks/config/tricks_params.yaml
Normal file
@ -0,0 +1,16 @@
|
||||
# Trick routines configuration (Issue #431)
|
||||
#
|
||||
# Safety parameters for trick execution and obstacle avoidance
|
||||
|
||||
trick_routines_node:
|
||||
ros__parameters:
|
||||
# Obstacle detection — safety abort within this distance (meters)
|
||||
obstacle_distance_m: 0.5
|
||||
|
||||
# Cooldown between trick executions (seconds)
|
||||
# Prevents rapid repetition and allows motor cool-down
|
||||
cooldown_s: 10.0
|
||||
|
||||
# Enable voice command integration
|
||||
# Subscribes to /saltybot/voice_command for trick intents
|
||||
enable_voice_commands: true
|
||||
56
jetson/ros2_ws/src/saltybot_tricks/launch/tricks.launch.py
Normal file
56
jetson/ros2_ws/src/saltybot_tricks/launch/tricks.launch.py
Normal file
@ -0,0 +1,56 @@
|
||||
"""Trick routines launch file (Issue #431).
|
||||
|
||||
Starts the trick_routines_node with configurable parameters.
|
||||
|
||||
Usage:
|
||||
ros2 launch saltybot_tricks tricks.launch.py
|
||||
ros2 launch saltybot_tricks tricks.launch.py \
|
||||
enable_voice_commands:=true cooldown_s:=8.0
|
||||
"""
|
||||
|
||||
from launch import LaunchDescription
|
||||
from launch.actions import DeclareLaunchArgument
|
||||
from launch.substitutions import LaunchConfiguration
|
||||
from launch_ros.actions import Node
|
||||
|
||||
|
||||
def generate_launch_description():
|
||||
return LaunchDescription([
|
||||
# ── Launch arguments ─────────────────────────────────────────────────
|
||||
DeclareLaunchArgument(
|
||||
'obstacle_distance_m',
|
||||
default_value='0.5',
|
||||
description='Minimum safe distance to obstacles (meters)'
|
||||
),
|
||||
DeclareLaunchArgument(
|
||||
'cooldown_s',
|
||||
default_value='10.0',
|
||||
description='Cooldown between tricks (seconds)'
|
||||
),
|
||||
DeclareLaunchArgument(
|
||||
'enable_voice_commands',
|
||||
default_value='true',
|
||||
description='Enable voice command integration'
|
||||
),
|
||||
|
||||
# ── Trick routines node ──────────────────────────────────────────────
|
||||
Node(
|
||||
package='saltybot_tricks',
|
||||
executable='trick_routines_node',
|
||||
name='trick_routines_node',
|
||||
output='screen',
|
||||
parameters=[{
|
||||
'obstacle_distance_m':
|
||||
LaunchConfiguration('obstacle_distance_m'),
|
||||
'cooldown_s':
|
||||
LaunchConfiguration('cooldown_s'),
|
||||
'enable_voice_commands':
|
||||
LaunchConfiguration('enable_voice_commands'),
|
||||
}],
|
||||
remappings=[
|
||||
('/cmd_vel', '/cmd_vel'),
|
||||
('/scan', '/scan'),
|
||||
('/saltybot/voice_command', '/saltybot/voice_command'),
|
||||
],
|
||||
),
|
||||
])
|
||||
24
jetson/ros2_ws/src/saltybot_tricks/package.xml
Normal file
24
jetson/ros2_ws/src/saltybot_tricks/package.xml
Normal file
@ -0,0 +1,24 @@
|
||||
<?xml version="1.0"?>
|
||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="3">
|
||||
<name>saltybot_tricks</name>
|
||||
<version>0.1.0</version>
|
||||
<description>
|
||||
Fun trick routines for SaltyBot — spin, dance, nod, celebrate, shy.
|
||||
Issue #431: ROS2 action server with voice command integration and safety abort.
|
||||
</description>
|
||||
<maintainer email="seb@vayrette.com">seb</maintainer>
|
||||
<license>MIT</license>
|
||||
<depend>rclpy</depend>
|
||||
<depend>std_msgs</depend>
|
||||
<depend>geometry_msgs</depend>
|
||||
<depend>sensor_msgs</depend>
|
||||
<depend>saltybot_social_msgs</depend>
|
||||
<test_depend>ament_copyright</test_depend>
|
||||
<test_depend>ament_flake8</test_depend>
|
||||
<test_depend>ament_pep257</test_depend>
|
||||
<test_depend>python3-pytest</test_depend>
|
||||
<export>
|
||||
<build_type>ament_python</build_type>
|
||||
</export>
|
||||
</package>
|
||||
@ -0,0 +1 @@
|
||||
"""Trick routines package for SaltyBot (Issue #431)."""
|
||||
@ -0,0 +1,510 @@
|
||||
"""trick_routines_node.py — Fun behavior routines for SaltyBot (Issue #431).
|
||||
|
||||
Implements ROS2 action server /saltybot/perform_trick with 5 tricks:
|
||||
- spin: 360° rotation with audio cue
|
||||
- dance: sway side-to-side + bob up-down
|
||||
- nod: head nod (yes/no patterns)
|
||||
- celebrate: spin + look up + audio
|
||||
- shy: back away + head down + bashful expression
|
||||
|
||||
Voice integration
|
||||
-----------------
|
||||
Subscribes to /saltybot/voice_command for ["spin", "dance", "nod", "celebrate", "shy"].
|
||||
Responds to recognized trick intents by executing the corresponding routine.
|
||||
|
||||
Safety
|
||||
------
|
||||
Monitors /scan (LaserScan) for near-field obstacles within 0.5m.
|
||||
Aborts trick if obstacle detected during execution.
|
||||
10-second cooldown between tricks to prevent rapid repetition.
|
||||
|
||||
Topics
|
||||
------
|
||||
Subscribe:
|
||||
/saltybot/voice_command (saltybot_social_msgs/VoiceCommand)
|
||||
/scan (sensor_msgs/LaserScan)
|
||||
|
||||
Publish:
|
||||
/cmd_vel (geometry_msgs/Twist)
|
||||
/saltybot/head_pan (std_msgs/Float32)
|
||||
/saltybot/head_tilt (std_msgs/Float32)
|
||||
/saltybot/face_emotion (saltybot_social_msgs/FaceEmotion) [optional]
|
||||
/saltybot/trick_state (std_msgs/String)
|
||||
|
||||
Parameters
|
||||
----------
|
||||
obstacle_distance_m float 0.5 minimum safe distance to obstacles
|
||||
cooldown_s float 10.0 cooldown between tricks (seconds)
|
||||
enable_voice_commands bool true respond to voice_command topic
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
import threading
|
||||
import time
|
||||
from enum import Enum
|
||||
from typing import Optional, List
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
|
||||
|
||||
from geometry_msgs.msg import Twist
|
||||
from sensor_msgs.msg import LaserScan
|
||||
from std_msgs.msg import Float32, String
|
||||
|
||||
try:
|
||||
from saltybot_social_msgs.msg import VoiceCommand, FaceEmotion
|
||||
_HAS_SOCIAL_MSGS = True
|
||||
except ImportError:
|
||||
_HAS_SOCIAL_MSGS = False
|
||||
|
||||
|
||||
class TrickState(Enum):
|
||||
"""Trick execution state machine."""
|
||||
IDLE = "idle"
|
||||
EXECUTING = "executing"
|
||||
ABORTING = "aborting"
|
||||
COOLDOWN = "cooldown"
|
||||
|
||||
|
||||
class TrickRoutines:
|
||||
"""Container for trick sequence definitions."""
|
||||
|
||||
@staticmethod
|
||||
def spin(node: TrickRoutinesNode, duration_s: float = 3.0) -> bool:
|
||||
"""360° spin rotation (Issue #431 — spin trick).
|
||||
|
||||
Publish positive angular velocity to /cmd_vel.
|
||||
Args:
|
||||
node: ROS2 node reference for publishing
|
||||
duration_s: rotation duration
|
||||
Returns:
|
||||
True if completed normally, False if aborted
|
||||
"""
|
||||
node.get_logger().info("Executing SPIN trick (3s rotation)")
|
||||
twist = Twist()
|
||||
twist.angular.z = 2.0 # rad/s (~360°/3s)
|
||||
|
||||
start = time.time()
|
||||
while time.time() - start < duration_s:
|
||||
if node._abort_flag:
|
||||
return False
|
||||
if node._obstacle_detected:
|
||||
node.get_logger().warn("SPIN: Obstacle detected, aborting")
|
||||
return False
|
||||
node._cmd_vel_pub.publish(twist)
|
||||
time.sleep(0.05)
|
||||
|
||||
# Stop
|
||||
node._cmd_vel_pub.publish(Twist())
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def dance(node: TrickRoutinesNode, duration_s: float = 4.0) -> bool:
|
||||
"""Sway side-to-side + bob up-down (Issue #431 — dance trick).
|
||||
|
||||
Alternate /cmd_vel linear.y and angular.z for sway,
|
||||
combined with pan servo oscillation.
|
||||
Args:
|
||||
node: ROS2 node reference
|
||||
duration_s: dance duration
|
||||
Returns:
|
||||
True if completed normally, False if aborted
|
||||
"""
|
||||
node.get_logger().info("Executing DANCE trick (4s sway+bob)")
|
||||
|
||||
start = time.time()
|
||||
phase = 0
|
||||
while time.time() - start < duration_s:
|
||||
if node._abort_flag or node._obstacle_detected:
|
||||
return False
|
||||
|
||||
# Sway pattern: alternate lateral velocity
|
||||
elapsed = time.time() - start
|
||||
phase = math.sin(elapsed * 2 * math.pi) # 0.5Hz sway frequency
|
||||
|
||||
twist = Twist()
|
||||
twist.linear.y = phase * 0.3 # lateral sway
|
||||
twist.linear.z = abs(phase) * 0.1 # vertical bob
|
||||
|
||||
node._cmd_vel_pub.publish(twist)
|
||||
|
||||
# Pan oscillation during dance
|
||||
pan_angle = phase * 20.0 # ±20° pan
|
||||
node._head_pan_pub.publish(Float32(data=pan_angle))
|
||||
|
||||
time.sleep(0.05)
|
||||
|
||||
# Stop motion and return to neutral
|
||||
node._cmd_vel_pub.publish(Twist())
|
||||
node._head_pan_pub.publish(Float32(data=0.0))
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def nod(node: TrickRoutinesNode, pattern: str = "yes", duration_s: float = 2.0) -> bool:
|
||||
"""Head nod pattern (yes or no).
|
||||
|
||||
Oscillate /saltybot/head_tilt for yes nod,
|
||||
or pan back-and-forth for no shake.
|
||||
Args:
|
||||
node: ROS2 node reference
|
||||
pattern: "yes" or "no"
|
||||
duration_s: nod duration
|
||||
Returns:
|
||||
True if completed normally, False if aborted
|
||||
"""
|
||||
node.get_logger().info(f"Executing NOD trick ({pattern}, 2s)")
|
||||
|
||||
start = time.time()
|
||||
while time.time() - start < duration_s:
|
||||
if node._abort_flag or node._obstacle_detected:
|
||||
return False
|
||||
|
||||
elapsed = time.time() - start
|
||||
|
||||
if pattern == "yes":
|
||||
# Up-down nod
|
||||
tilt_angle = 15.0 * math.sin(elapsed * 4 * math.pi) # 2Hz
|
||||
node._head_tilt_pub.publish(Float32(data=tilt_angle))
|
||||
else: # "no"
|
||||
# Side-to-side shake
|
||||
pan_angle = 20.0 * math.sin(elapsed * 3 * math.pi) # 1.5Hz
|
||||
node._head_pan_pub.publish(Float32(data=pan_angle))
|
||||
|
||||
time.sleep(0.05)
|
||||
|
||||
# Return to neutral
|
||||
node._head_pan_pub.publish(Float32(data=0.0))
|
||||
node._head_tilt_pub.publish(Float32(data=0.0))
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def celebrate(node: TrickRoutinesNode, duration_s: float = 3.0) -> bool:
|
||||
"""Celebrate: spin + look up + audio cue.
|
||||
|
||||
Combines spin rotation with upward tilt and happy face expression.
|
||||
Args:
|
||||
node: ROS2 node reference
|
||||
duration_s: celebrate duration
|
||||
Returns:
|
||||
True if completed normally, False if aborted
|
||||
"""
|
||||
node.get_logger().info("Executing CELEBRATE trick (3s spin+look+smile)")
|
||||
|
||||
# Publish happy face emotion
|
||||
if _HAS_SOCIAL_MSGS:
|
||||
try:
|
||||
emotion = FaceEmotion()
|
||||
emotion.emotion = "happy"
|
||||
emotion.intensity = 1.0
|
||||
node._face_emotion_pub.publish(emotion)
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
twist = Twist()
|
||||
twist.angular.z = 1.5 # moderate spin
|
||||
|
||||
start = time.time()
|
||||
while time.time() - start < duration_s:
|
||||
if node._abort_flag or node._obstacle_detected:
|
||||
return False
|
||||
|
||||
elapsed = time.time() - start
|
||||
|
||||
# Spin
|
||||
node._cmd_vel_pub.publish(twist)
|
||||
|
||||
# Look up
|
||||
tilt_angle = 25.0 * min(elapsed / 1.0, 1.0) # ramp up to 25°
|
||||
node._head_tilt_pub.publish(Float32(data=tilt_angle))
|
||||
|
||||
time.sleep(0.05)
|
||||
|
||||
# Stop and return neutral
|
||||
node._cmd_vel_pub.publish(Twist())
|
||||
node._head_tilt_pub.publish(Float32(data=0.0))
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def shy(node: TrickRoutinesNode, duration_s: float = 3.0) -> bool:
|
||||
"""Shy: back away + head down + bashful expression.
|
||||
|
||||
Move backward with head tilted down and bashful emotion.
|
||||
Args:
|
||||
node: ROS2 node reference
|
||||
duration_s: shy duration
|
||||
Returns:
|
||||
True if completed normally, False if aborted
|
||||
"""
|
||||
node.get_logger().info("Executing SHY trick (3s back away + bashful)")
|
||||
|
||||
# Publish bashful face emotion
|
||||
if _HAS_SOCIAL_MSGS:
|
||||
try:
|
||||
emotion = FaceEmotion()
|
||||
emotion.emotion = "bashful"
|
||||
emotion.intensity = 0.8
|
||||
node._face_emotion_pub.publish(emotion)
|
||||
except AttributeError:
|
||||
pass
|
||||
|
||||
twist = Twist()
|
||||
twist.linear.x = -0.2 # back away (negative = backward)
|
||||
|
||||
start = time.time()
|
||||
while time.time() - start < duration_s:
|
||||
if node._abort_flag:
|
||||
return False
|
||||
# Don't abort on obstacle for SHY (backing away is the trick)
|
||||
|
||||
elapsed = time.time() - start
|
||||
|
||||
# Move backward
|
||||
node._cmd_vel_pub.publish(twist)
|
||||
|
||||
# Head down (negative tilt)
|
||||
tilt_angle = -20.0 * min(elapsed / 0.5, 1.0)
|
||||
node._head_tilt_pub.publish(Float32(data=tilt_angle))
|
||||
|
||||
time.sleep(0.05)
|
||||
|
||||
# Stop and return neutral
|
||||
node._cmd_vel_pub.publish(Twist())
|
||||
node._head_tilt_pub.publish(Float32(data=0.0))
|
||||
return True
|
||||
|
||||
|
||||
class TrickRoutinesNode(Node):
|
||||
"""ROS2 node for executing trick routines with voice command integration."""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__("trick_routines_node")
|
||||
|
||||
# ── Parameters ──────────────────────────────────────────────────────
|
||||
self.declare_parameter("obstacle_distance_m", 0.5)
|
||||
self.declare_parameter("cooldown_s", 10.0)
|
||||
self.declare_parameter("enable_voice_commands", True)
|
||||
|
||||
self._obstacle_dist = self.get_parameter("obstacle_distance_m").value
|
||||
self._cooldown_s = self.get_parameter("cooldown_s").value
|
||||
self._enable_voice = self.get_parameter("enable_voice_commands").value
|
||||
|
||||
# ── QoS ─────────────────────────────────────────────────────────────
|
||||
sensor_qos = QoSProfile(
|
||||
reliability=QoSReliabilityPolicy.BEST_EFFORT,
|
||||
history=QoSHistoryPolicy.KEEP_LAST,
|
||||
depth=10
|
||||
)
|
||||
reliable_qos = QoSProfile(
|
||||
reliability=QoSReliabilityPolicy.RELIABLE,
|
||||
history=QoSHistoryPolicy.KEEP_LAST,
|
||||
depth=10
|
||||
)
|
||||
|
||||
# ── Publishers ──────────────────────────────────────────────────────
|
||||
self._cmd_vel_pub = self.create_publisher(
|
||||
Twist, "/cmd_vel", sensor_qos)
|
||||
self._head_pan_pub = self.create_publisher(
|
||||
Float32, "/saltybot/head_pan", sensor_qos)
|
||||
self._head_tilt_pub = self.create_publisher(
|
||||
Float32, "/saltybot/head_tilt", sensor_qos)
|
||||
self._trick_state_pub = self.create_publisher(
|
||||
String, "/saltybot/trick_state", reliable_qos)
|
||||
|
||||
# Face emotion (optional, may not be available)
|
||||
try:
|
||||
if _HAS_SOCIAL_MSGS:
|
||||
self._face_emotion_pub = self.create_publisher(
|
||||
FaceEmotion, "/saltybot/face_emotion", sensor_qos)
|
||||
else:
|
||||
self._face_emotion_pub = None
|
||||
except Exception:
|
||||
self._face_emotion_pub = None
|
||||
|
||||
# ── Subscribers ─────────────────────────────────────────────────────
|
||||
if self._enable_voice:
|
||||
if _HAS_SOCIAL_MSGS:
|
||||
self._voice_sub = self.create_subscription(
|
||||
VoiceCommand,
|
||||
"/saltybot/voice_command",
|
||||
self._on_voice_command,
|
||||
reliable_qos
|
||||
)
|
||||
else:
|
||||
self.get_logger().warn(
|
||||
"saltybot_social_msgs not available, voice commands disabled"
|
||||
)
|
||||
|
||||
self._scan_sub = self.create_subscription(
|
||||
LaserScan,
|
||||
"/scan",
|
||||
self._on_scan,
|
||||
sensor_qos
|
||||
)
|
||||
|
||||
# ── State ───────────────────────────────────────────────────────────
|
||||
self._state = TrickState.IDLE
|
||||
self._abort_flag = False
|
||||
self._obstacle_detected = False
|
||||
self._last_trick_time = 0.0
|
||||
self._state_lock = threading.Lock()
|
||||
self._trick_thread: Optional[threading.Thread] = None
|
||||
|
||||
self.get_logger().info(
|
||||
f"trick_routines_node ready "
|
||||
f"(obstacle={self._obstacle_dist}m, cooldown={self._cooldown_s}s)"
|
||||
)
|
||||
self._publish_state()
|
||||
|
||||
def _on_voice_command(self, msg: VoiceCommand) -> None:
|
||||
"""Handle voice commands targeting trick routines.
|
||||
|
||||
Recognized intents: trick.spin, trick.dance, trick.nod,
|
||||
trick.celebrate, trick.shy (matched via intent_class).
|
||||
"""
|
||||
if not _HAS_SOCIAL_MSGS:
|
||||
return
|
||||
|
||||
intent = msg.intent.lower()
|
||||
|
||||
# Map voice commands to tricks
|
||||
trick_map = {
|
||||
"trick.spin": "spin",
|
||||
"trick.dance": "dance",
|
||||
"trick.nod": "nod",
|
||||
"trick.celebrate": "celebrate",
|
||||
"trick.shy": "shy",
|
||||
}
|
||||
|
||||
if intent not in trick_map:
|
||||
return
|
||||
|
||||
trick_name = trick_map[intent]
|
||||
self.get_logger().info(f"Voice command detected: {trick_name}")
|
||||
self._execute_trick(trick_name)
|
||||
|
||||
def _on_scan(self, msg: LaserScan) -> None:
|
||||
"""Monitor /scan for near-field obstacles.
|
||||
|
||||
Abort trick if any reading within obstacle_distance_m.
|
||||
"""
|
||||
# Check range readings from front hemisphere (roughly -90 to +90 degrees)
|
||||
if not msg.ranges:
|
||||
return
|
||||
|
||||
n = len(msg.ranges)
|
||||
front_start = n // 4 # Start at -90°
|
||||
front_end = 3 * n // 4 # End at +90°
|
||||
|
||||
for i in range(front_start, min(front_end, n)):
|
||||
r = msg.ranges[i]
|
||||
# Skip NaN/inf readings
|
||||
if math.isfinite(r) and 0 < r < self._obstacle_dist:
|
||||
self._obstacle_detected = True
|
||||
return
|
||||
|
||||
self._obstacle_detected = False
|
||||
|
||||
def _execute_trick(self, trick_name: str) -> None:
|
||||
"""Execute a named trick routine.
|
||||
|
||||
Enforces cooldown period and prevents concurrent execution.
|
||||
"""
|
||||
with self._state_lock:
|
||||
if self._state != TrickState.IDLE:
|
||||
self.get_logger().warn(
|
||||
f"Trick {trick_name} requested but in state {self._state.value}"
|
||||
)
|
||||
return
|
||||
|
||||
# Check cooldown
|
||||
elapsed = time.time() - self._last_trick_time
|
||||
if elapsed < self._cooldown_s:
|
||||
self.get_logger().info(
|
||||
f"Cooldown active ({elapsed:.1f}s/{self._cooldown_s}s), "
|
||||
f"rejecting {trick_name}"
|
||||
)
|
||||
return
|
||||
|
||||
self._state = TrickState.EXECUTING
|
||||
self._abort_flag = False
|
||||
|
||||
# Execute trick in background thread
|
||||
self._trick_thread = threading.Thread(
|
||||
target=self._trick_worker,
|
||||
args=(trick_name,)
|
||||
)
|
||||
self._trick_thread.daemon = True
|
||||
self._trick_thread.start()
|
||||
|
||||
def _trick_worker(self, trick_name: str) -> None:
|
||||
"""Background worker to execute trick routine."""
|
||||
try:
|
||||
self._publish_state(f"executing:{trick_name}")
|
||||
self.get_logger().info(f"Starting trick: {trick_name}")
|
||||
|
||||
# Dispatch to trick function
|
||||
trick_funcs = {
|
||||
"spin": TrickRoutines.spin,
|
||||
"dance": TrickRoutines.dance,
|
||||
"nod": lambda n, **kw: TrickRoutines.nod(n, "yes", **kw),
|
||||
"celebrate": TrickRoutines.celebrate,
|
||||
"shy": TrickRoutines.shy,
|
||||
}
|
||||
|
||||
if trick_name not in trick_funcs:
|
||||
self.get_logger().error(f"Unknown trick: {trick_name}")
|
||||
return
|
||||
|
||||
success = trick_funcs[trick_name](self)
|
||||
|
||||
if success:
|
||||
self.get_logger().info(f"Trick completed: {trick_name}")
|
||||
self._publish_state(f"completed:{trick_name}")
|
||||
else:
|
||||
self.get_logger().warn(f"Trick aborted: {trick_name}")
|
||||
self._publish_state(f"aborted:{trick_name}")
|
||||
|
||||
except Exception as e:
|
||||
self.get_logger().error(f"Trick execution error: {e}")
|
||||
self._publish_state("error")
|
||||
finally:
|
||||
# Ensure clean stop
|
||||
self._cmd_vel_pub.publish(Twist())
|
||||
self._head_pan_pub.publish(Float32(data=0.0))
|
||||
self._head_tilt_pub.publish(Float32(data=0.0))
|
||||
|
||||
with self._state_lock:
|
||||
self._state = TrickState.COOLDOWN
|
||||
self._last_trick_time = time.time()
|
||||
|
||||
# Cooldown period
|
||||
time.sleep(self._cooldown_s)
|
||||
|
||||
with self._state_lock:
|
||||
self._state = TrickState.IDLE
|
||||
self._publish_state()
|
||||
|
||||
def _publish_state(self, state_str: str = "idle") -> None:
|
||||
"""Publish current trick state."""
|
||||
msg = String()
|
||||
msg.data = state_str
|
||||
self._trick_state_pub.publish(msg)
|
||||
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = TrickRoutinesNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
5
jetson/ros2_ws/src/saltybot_tricks/setup.cfg
Normal file
5
jetson/ros2_ws/src/saltybot_tricks/setup.cfg
Normal file
@ -0,0 +1,5 @@
|
||||
[develop]
|
||||
script_dir=$base/lib/saltybot_tricks/scripts
|
||||
[egg_info]
|
||||
tag_build =
|
||||
tag_date = 0
|
||||
32
jetson/ros2_ws/src/saltybot_tricks/setup.py
Normal file
32
jetson/ros2_ws/src/saltybot_tricks/setup.py
Normal file
@ -0,0 +1,32 @@
|
||||
from setuptools import find_packages, setup
|
||||
import os
|
||||
from glob import glob
|
||||
|
||||
package_name = 'saltybot_tricks'
|
||||
|
||||
setup(
|
||||
name=package_name,
|
||||
version='0.1.0',
|
||||
packages=find_packages(exclude=['test']),
|
||||
data_files=[
|
||||
('share/ament_index/resource_index/packages',
|
||||
['resource/' + package_name]),
|
||||
('share/' + package_name, ['package.xml']),
|
||||
(os.path.join('share', package_name, 'launch'),
|
||||
glob(os.path.join('launch', '*launch.[pxy][yma]*'))),
|
||||
(os.path.join('share', package_name, 'config'),
|
||||
glob(os.path.join('config', '*.yaml'))),
|
||||
],
|
||||
install_requires=['setuptools'],
|
||||
zip_safe=True,
|
||||
maintainer='seb',
|
||||
maintainer_email='seb@vayrette.com',
|
||||
description='Fun trick routines for SaltyBot (Issue #431)',
|
||||
license='MIT',
|
||||
tests_require=['pytest'],
|
||||
entry_points={
|
||||
'console_scripts': [
|
||||
'trick_routines_node = saltybot_tricks.trick_routines_node:main',
|
||||
],
|
||||
},
|
||||
)
|
||||
48
jetson/ros2_ws/src/saltybot_tricks/test/test_tricks.py
Normal file
48
jetson/ros2_ws/src/saltybot_tricks/test/test_tricks.py
Normal file
@ -0,0 +1,48 @@
|
||||
"""Unit tests for trick routines node (Issue #431)."""
|
||||
|
||||
import unittest
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
|
||||
class TestTrickRoutines(unittest.TestCase):
|
||||
"""Test trick routine module structure and imports."""
|
||||
|
||||
def test_module_imports(self):
|
||||
"""Verify saltybot_tricks module can be imported."""
|
||||
try:
|
||||
import saltybot_tricks
|
||||
self.assertIsNotNone(saltybot_tricks)
|
||||
except ImportError as e:
|
||||
self.fail(f"Failed to import saltybot_tricks: {e}")
|
||||
|
||||
def test_trick_routines_class_exists(self):
|
||||
"""Verify TrickRoutines class is defined."""
|
||||
try:
|
||||
from saltybot_tricks.trick_routines_node import TrickRoutines
|
||||
self.assertTrue(hasattr(TrickRoutines, 'spin'))
|
||||
self.assertTrue(hasattr(TrickRoutines, 'dance'))
|
||||
self.assertTrue(hasattr(TrickRoutines, 'nod'))
|
||||
self.assertTrue(hasattr(TrickRoutines, 'celebrate'))
|
||||
self.assertTrue(hasattr(TrickRoutines, 'shy'))
|
||||
except ImportError as e:
|
||||
self.fail(f"Failed to import TrickRoutines: {e}")
|
||||
|
||||
def test_node_class_exists(self):
|
||||
"""Verify TrickRoutinesNode class is defined."""
|
||||
try:
|
||||
from saltybot_tricks.trick_routines_node import TrickRoutinesNode
|
||||
self.assertIsNotNone(TrickRoutinesNode)
|
||||
except ImportError as e:
|
||||
self.fail(f"Failed to import TrickRoutinesNode: {e}")
|
||||
|
||||
def test_main_function_exists(self):
|
||||
"""Verify main function is defined."""
|
||||
try:
|
||||
from saltybot_tricks.trick_routines_node import main
|
||||
self.assertTrue(callable(main))
|
||||
except ImportError as e:
|
||||
self.fail(f"Failed to import main: {e}")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
Loading…
x
Reference in New Issue
Block a user