feat: trick routines (Issue #431) #440
16
jetson/ros2_ws/src/saltybot_tricks/config/tricks_params.yaml
Normal file
16
jetson/ros2_ws/src/saltybot_tricks/config/tricks_params.yaml
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
# Trick routines configuration (Issue #431)
|
||||||
|
#
|
||||||
|
# Safety parameters for trick execution and obstacle avoidance
|
||||||
|
|
||||||
|
trick_routines_node:
|
||||||
|
ros__parameters:
|
||||||
|
# Obstacle detection — safety abort within this distance (meters)
|
||||||
|
obstacle_distance_m: 0.5
|
||||||
|
|
||||||
|
# Cooldown between trick executions (seconds)
|
||||||
|
# Prevents rapid repetition and allows motor cool-down
|
||||||
|
cooldown_s: 10.0
|
||||||
|
|
||||||
|
# Enable voice command integration
|
||||||
|
# Subscribes to /saltybot/voice_command for trick intents
|
||||||
|
enable_voice_commands: true
|
||||||
56
jetson/ros2_ws/src/saltybot_tricks/launch/tricks.launch.py
Normal file
56
jetson/ros2_ws/src/saltybot_tricks/launch/tricks.launch.py
Normal file
@ -0,0 +1,56 @@
|
|||||||
|
"""Trick routines launch file (Issue #431).
|
||||||
|
|
||||||
|
Starts the trick_routines_node with configurable parameters.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
ros2 launch saltybot_tricks tricks.launch.py
|
||||||
|
ros2 launch saltybot_tricks tricks.launch.py \
|
||||||
|
enable_voice_commands:=true cooldown_s:=8.0
|
||||||
|
"""
|
||||||
|
|
||||||
|
from launch import LaunchDescription
|
||||||
|
from launch.actions import DeclareLaunchArgument
|
||||||
|
from launch.substitutions import LaunchConfiguration
|
||||||
|
from launch_ros.actions import Node
|
||||||
|
|
||||||
|
|
||||||
|
def generate_launch_description():
|
||||||
|
return LaunchDescription([
|
||||||
|
# ── Launch arguments ─────────────────────────────────────────────────
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
'obstacle_distance_m',
|
||||||
|
default_value='0.5',
|
||||||
|
description='Minimum safe distance to obstacles (meters)'
|
||||||
|
),
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
'cooldown_s',
|
||||||
|
default_value='10.0',
|
||||||
|
description='Cooldown between tricks (seconds)'
|
||||||
|
),
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
'enable_voice_commands',
|
||||||
|
default_value='true',
|
||||||
|
description='Enable voice command integration'
|
||||||
|
),
|
||||||
|
|
||||||
|
# ── Trick routines node ──────────────────────────────────────────────
|
||||||
|
Node(
|
||||||
|
package='saltybot_tricks',
|
||||||
|
executable='trick_routines_node',
|
||||||
|
name='trick_routines_node',
|
||||||
|
output='screen',
|
||||||
|
parameters=[{
|
||||||
|
'obstacle_distance_m':
|
||||||
|
LaunchConfiguration('obstacle_distance_m'),
|
||||||
|
'cooldown_s':
|
||||||
|
LaunchConfiguration('cooldown_s'),
|
||||||
|
'enable_voice_commands':
|
||||||
|
LaunchConfiguration('enable_voice_commands'),
|
||||||
|
}],
|
||||||
|
remappings=[
|
||||||
|
('/cmd_vel', '/cmd_vel'),
|
||||||
|
('/scan', '/scan'),
|
||||||
|
('/saltybot/voice_command', '/saltybot/voice_command'),
|
||||||
|
],
|
||||||
|
),
|
||||||
|
])
|
||||||
24
jetson/ros2_ws/src/saltybot_tricks/package.xml
Normal file
24
jetson/ros2_ws/src/saltybot_tricks/package.xml
Normal file
@ -0,0 +1,24 @@
|
|||||||
|
<?xml version="1.0"?>
|
||||||
|
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||||
|
<package format="3">
|
||||||
|
<name>saltybot_tricks</name>
|
||||||
|
<version>0.1.0</version>
|
||||||
|
<description>
|
||||||
|
Fun trick routines for SaltyBot — spin, dance, nod, celebrate, shy.
|
||||||
|
Issue #431: ROS2 action server with voice command integration and safety abort.
|
||||||
|
</description>
|
||||||
|
<maintainer email="seb@vayrette.com">seb</maintainer>
|
||||||
|
<license>MIT</license>
|
||||||
|
<depend>rclpy</depend>
|
||||||
|
<depend>std_msgs</depend>
|
||||||
|
<depend>geometry_msgs</depend>
|
||||||
|
<depend>sensor_msgs</depend>
|
||||||
|
<depend>saltybot_social_msgs</depend>
|
||||||
|
<test_depend>ament_copyright</test_depend>
|
||||||
|
<test_depend>ament_flake8</test_depend>
|
||||||
|
<test_depend>ament_pep257</test_depend>
|
||||||
|
<test_depend>python3-pytest</test_depend>
|
||||||
|
<export>
|
||||||
|
<build_type>ament_python</build_type>
|
||||||
|
</export>
|
||||||
|
</package>
|
||||||
@ -0,0 +1 @@
|
|||||||
|
"""Trick routines package for SaltyBot (Issue #431)."""
|
||||||
@ -0,0 +1,510 @@
|
|||||||
|
"""trick_routines_node.py — Fun behavior routines for SaltyBot (Issue #431).
|
||||||
|
|
||||||
|
Implements ROS2 action server /saltybot/perform_trick with 5 tricks:
|
||||||
|
- spin: 360° rotation with audio cue
|
||||||
|
- dance: sway side-to-side + bob up-down
|
||||||
|
- nod: head nod (yes/no patterns)
|
||||||
|
- celebrate: spin + look up + audio
|
||||||
|
- shy: back away + head down + bashful expression
|
||||||
|
|
||||||
|
Voice integration
|
||||||
|
-----------------
|
||||||
|
Subscribes to /saltybot/voice_command for ["spin", "dance", "nod", "celebrate", "shy"].
|
||||||
|
Responds to recognized trick intents by executing the corresponding routine.
|
||||||
|
|
||||||
|
Safety
|
||||||
|
------
|
||||||
|
Monitors /scan (LaserScan) for near-field obstacles within 0.5m.
|
||||||
|
Aborts trick if obstacle detected during execution.
|
||||||
|
10-second cooldown between tricks to prevent rapid repetition.
|
||||||
|
|
||||||
|
Topics
|
||||||
|
------
|
||||||
|
Subscribe:
|
||||||
|
/saltybot/voice_command (saltybot_social_msgs/VoiceCommand)
|
||||||
|
/scan (sensor_msgs/LaserScan)
|
||||||
|
|
||||||
|
Publish:
|
||||||
|
/cmd_vel (geometry_msgs/Twist)
|
||||||
|
/saltybot/head_pan (std_msgs/Float32)
|
||||||
|
/saltybot/head_tilt (std_msgs/Float32)
|
||||||
|
/saltybot/face_emotion (saltybot_social_msgs/FaceEmotion) [optional]
|
||||||
|
/saltybot/trick_state (std_msgs/String)
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
----------
|
||||||
|
obstacle_distance_m float 0.5 minimum safe distance to obstacles
|
||||||
|
cooldown_s float 10.0 cooldown between tricks (seconds)
|
||||||
|
enable_voice_commands bool true respond to voice_command topic
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import math
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
from enum import Enum
|
||||||
|
from typing import Optional, List
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
|
||||||
|
|
||||||
|
from geometry_msgs.msg import Twist
|
||||||
|
from sensor_msgs.msg import LaserScan
|
||||||
|
from std_msgs.msg import Float32, String
|
||||||
|
|
||||||
|
try:
|
||||||
|
from saltybot_social_msgs.msg import VoiceCommand, FaceEmotion
|
||||||
|
_HAS_SOCIAL_MSGS = True
|
||||||
|
except ImportError:
|
||||||
|
_HAS_SOCIAL_MSGS = False
|
||||||
|
|
||||||
|
|
||||||
|
class TrickState(Enum):
|
||||||
|
"""Trick execution state machine."""
|
||||||
|
IDLE = "idle"
|
||||||
|
EXECUTING = "executing"
|
||||||
|
ABORTING = "aborting"
|
||||||
|
COOLDOWN = "cooldown"
|
||||||
|
|
||||||
|
|
||||||
|
class TrickRoutines:
|
||||||
|
"""Container for trick sequence definitions."""
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def spin(node: TrickRoutinesNode, duration_s: float = 3.0) -> bool:
|
||||||
|
"""360° spin rotation (Issue #431 — spin trick).
|
||||||
|
|
||||||
|
Publish positive angular velocity to /cmd_vel.
|
||||||
|
Args:
|
||||||
|
node: ROS2 node reference for publishing
|
||||||
|
duration_s: rotation duration
|
||||||
|
Returns:
|
||||||
|
True if completed normally, False if aborted
|
||||||
|
"""
|
||||||
|
node.get_logger().info("Executing SPIN trick (3s rotation)")
|
||||||
|
twist = Twist()
|
||||||
|
twist.angular.z = 2.0 # rad/s (~360°/3s)
|
||||||
|
|
||||||
|
start = time.time()
|
||||||
|
while time.time() - start < duration_s:
|
||||||
|
if node._abort_flag:
|
||||||
|
return False
|
||||||
|
if node._obstacle_detected:
|
||||||
|
node.get_logger().warn("SPIN: Obstacle detected, aborting")
|
||||||
|
return False
|
||||||
|
node._cmd_vel_pub.publish(twist)
|
||||||
|
time.sleep(0.05)
|
||||||
|
|
||||||
|
# Stop
|
||||||
|
node._cmd_vel_pub.publish(Twist())
|
||||||
|
return True
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def dance(node: TrickRoutinesNode, duration_s: float = 4.0) -> bool:
|
||||||
|
"""Sway side-to-side + bob up-down (Issue #431 — dance trick).
|
||||||
|
|
||||||
|
Alternate /cmd_vel linear.y and angular.z for sway,
|
||||||
|
combined with pan servo oscillation.
|
||||||
|
Args:
|
||||||
|
node: ROS2 node reference
|
||||||
|
duration_s: dance duration
|
||||||
|
Returns:
|
||||||
|
True if completed normally, False if aborted
|
||||||
|
"""
|
||||||
|
node.get_logger().info("Executing DANCE trick (4s sway+bob)")
|
||||||
|
|
||||||
|
start = time.time()
|
||||||
|
phase = 0
|
||||||
|
while time.time() - start < duration_s:
|
||||||
|
if node._abort_flag or node._obstacle_detected:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Sway pattern: alternate lateral velocity
|
||||||
|
elapsed = time.time() - start
|
||||||
|
phase = math.sin(elapsed * 2 * math.pi) # 0.5Hz sway frequency
|
||||||
|
|
||||||
|
twist = Twist()
|
||||||
|
twist.linear.y = phase * 0.3 # lateral sway
|
||||||
|
twist.linear.z = abs(phase) * 0.1 # vertical bob
|
||||||
|
|
||||||
|
node._cmd_vel_pub.publish(twist)
|
||||||
|
|
||||||
|
# Pan oscillation during dance
|
||||||
|
pan_angle = phase * 20.0 # ±20° pan
|
||||||
|
node._head_pan_pub.publish(Float32(data=pan_angle))
|
||||||
|
|
||||||
|
time.sleep(0.05)
|
||||||
|
|
||||||
|
# Stop motion and return to neutral
|
||||||
|
node._cmd_vel_pub.publish(Twist())
|
||||||
|
node._head_pan_pub.publish(Float32(data=0.0))
|
||||||
|
return True
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def nod(node: TrickRoutinesNode, pattern: str = "yes", duration_s: float = 2.0) -> bool:
|
||||||
|
"""Head nod pattern (yes or no).
|
||||||
|
|
||||||
|
Oscillate /saltybot/head_tilt for yes nod,
|
||||||
|
or pan back-and-forth for no shake.
|
||||||
|
Args:
|
||||||
|
node: ROS2 node reference
|
||||||
|
pattern: "yes" or "no"
|
||||||
|
duration_s: nod duration
|
||||||
|
Returns:
|
||||||
|
True if completed normally, False if aborted
|
||||||
|
"""
|
||||||
|
node.get_logger().info(f"Executing NOD trick ({pattern}, 2s)")
|
||||||
|
|
||||||
|
start = time.time()
|
||||||
|
while time.time() - start < duration_s:
|
||||||
|
if node._abort_flag or node._obstacle_detected:
|
||||||
|
return False
|
||||||
|
|
||||||
|
elapsed = time.time() - start
|
||||||
|
|
||||||
|
if pattern == "yes":
|
||||||
|
# Up-down nod
|
||||||
|
tilt_angle = 15.0 * math.sin(elapsed * 4 * math.pi) # 2Hz
|
||||||
|
node._head_tilt_pub.publish(Float32(data=tilt_angle))
|
||||||
|
else: # "no"
|
||||||
|
# Side-to-side shake
|
||||||
|
pan_angle = 20.0 * math.sin(elapsed * 3 * math.pi) # 1.5Hz
|
||||||
|
node._head_pan_pub.publish(Float32(data=pan_angle))
|
||||||
|
|
||||||
|
time.sleep(0.05)
|
||||||
|
|
||||||
|
# Return to neutral
|
||||||
|
node._head_pan_pub.publish(Float32(data=0.0))
|
||||||
|
node._head_tilt_pub.publish(Float32(data=0.0))
|
||||||
|
return True
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def celebrate(node: TrickRoutinesNode, duration_s: float = 3.0) -> bool:
|
||||||
|
"""Celebrate: spin + look up + audio cue.
|
||||||
|
|
||||||
|
Combines spin rotation with upward tilt and happy face expression.
|
||||||
|
Args:
|
||||||
|
node: ROS2 node reference
|
||||||
|
duration_s: celebrate duration
|
||||||
|
Returns:
|
||||||
|
True if completed normally, False if aborted
|
||||||
|
"""
|
||||||
|
node.get_logger().info("Executing CELEBRATE trick (3s spin+look+smile)")
|
||||||
|
|
||||||
|
# Publish happy face emotion
|
||||||
|
if _HAS_SOCIAL_MSGS:
|
||||||
|
try:
|
||||||
|
emotion = FaceEmotion()
|
||||||
|
emotion.emotion = "happy"
|
||||||
|
emotion.intensity = 1.0
|
||||||
|
node._face_emotion_pub.publish(emotion)
|
||||||
|
except AttributeError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
twist = Twist()
|
||||||
|
twist.angular.z = 1.5 # moderate spin
|
||||||
|
|
||||||
|
start = time.time()
|
||||||
|
while time.time() - start < duration_s:
|
||||||
|
if node._abort_flag or node._obstacle_detected:
|
||||||
|
return False
|
||||||
|
|
||||||
|
elapsed = time.time() - start
|
||||||
|
|
||||||
|
# Spin
|
||||||
|
node._cmd_vel_pub.publish(twist)
|
||||||
|
|
||||||
|
# Look up
|
||||||
|
tilt_angle = 25.0 * min(elapsed / 1.0, 1.0) # ramp up to 25°
|
||||||
|
node._head_tilt_pub.publish(Float32(data=tilt_angle))
|
||||||
|
|
||||||
|
time.sleep(0.05)
|
||||||
|
|
||||||
|
# Stop and return neutral
|
||||||
|
node._cmd_vel_pub.publish(Twist())
|
||||||
|
node._head_tilt_pub.publish(Float32(data=0.0))
|
||||||
|
return True
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def shy(node: TrickRoutinesNode, duration_s: float = 3.0) -> bool:
|
||||||
|
"""Shy: back away + head down + bashful expression.
|
||||||
|
|
||||||
|
Move backward with head tilted down and bashful emotion.
|
||||||
|
Args:
|
||||||
|
node: ROS2 node reference
|
||||||
|
duration_s: shy duration
|
||||||
|
Returns:
|
||||||
|
True if completed normally, False if aborted
|
||||||
|
"""
|
||||||
|
node.get_logger().info("Executing SHY trick (3s back away + bashful)")
|
||||||
|
|
||||||
|
# Publish bashful face emotion
|
||||||
|
if _HAS_SOCIAL_MSGS:
|
||||||
|
try:
|
||||||
|
emotion = FaceEmotion()
|
||||||
|
emotion.emotion = "bashful"
|
||||||
|
emotion.intensity = 0.8
|
||||||
|
node._face_emotion_pub.publish(emotion)
|
||||||
|
except AttributeError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
twist = Twist()
|
||||||
|
twist.linear.x = -0.2 # back away (negative = backward)
|
||||||
|
|
||||||
|
start = time.time()
|
||||||
|
while time.time() - start < duration_s:
|
||||||
|
if node._abort_flag:
|
||||||
|
return False
|
||||||
|
# Don't abort on obstacle for SHY (backing away is the trick)
|
||||||
|
|
||||||
|
elapsed = time.time() - start
|
||||||
|
|
||||||
|
# Move backward
|
||||||
|
node._cmd_vel_pub.publish(twist)
|
||||||
|
|
||||||
|
# Head down (negative tilt)
|
||||||
|
tilt_angle = -20.0 * min(elapsed / 0.5, 1.0)
|
||||||
|
node._head_tilt_pub.publish(Float32(data=tilt_angle))
|
||||||
|
|
||||||
|
time.sleep(0.05)
|
||||||
|
|
||||||
|
# Stop and return neutral
|
||||||
|
node._cmd_vel_pub.publish(Twist())
|
||||||
|
node._head_tilt_pub.publish(Float32(data=0.0))
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
class TrickRoutinesNode(Node):
|
||||||
|
"""ROS2 node for executing trick routines with voice command integration."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__("trick_routines_node")
|
||||||
|
|
||||||
|
# ── Parameters ──────────────────────────────────────────────────────
|
||||||
|
self.declare_parameter("obstacle_distance_m", 0.5)
|
||||||
|
self.declare_parameter("cooldown_s", 10.0)
|
||||||
|
self.declare_parameter("enable_voice_commands", True)
|
||||||
|
|
||||||
|
self._obstacle_dist = self.get_parameter("obstacle_distance_m").value
|
||||||
|
self._cooldown_s = self.get_parameter("cooldown_s").value
|
||||||
|
self._enable_voice = self.get_parameter("enable_voice_commands").value
|
||||||
|
|
||||||
|
# ── QoS ─────────────────────────────────────────────────────────────
|
||||||
|
sensor_qos = QoSProfile(
|
||||||
|
reliability=QoSReliabilityPolicy.BEST_EFFORT,
|
||||||
|
history=QoSHistoryPolicy.KEEP_LAST,
|
||||||
|
depth=10
|
||||||
|
)
|
||||||
|
reliable_qos = QoSProfile(
|
||||||
|
reliability=QoSReliabilityPolicy.RELIABLE,
|
||||||
|
history=QoSHistoryPolicy.KEEP_LAST,
|
||||||
|
depth=10
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Publishers ──────────────────────────────────────────────────────
|
||||||
|
self._cmd_vel_pub = self.create_publisher(
|
||||||
|
Twist, "/cmd_vel", sensor_qos)
|
||||||
|
self._head_pan_pub = self.create_publisher(
|
||||||
|
Float32, "/saltybot/head_pan", sensor_qos)
|
||||||
|
self._head_tilt_pub = self.create_publisher(
|
||||||
|
Float32, "/saltybot/head_tilt", sensor_qos)
|
||||||
|
self._trick_state_pub = self.create_publisher(
|
||||||
|
String, "/saltybot/trick_state", reliable_qos)
|
||||||
|
|
||||||
|
# Face emotion (optional, may not be available)
|
||||||
|
try:
|
||||||
|
if _HAS_SOCIAL_MSGS:
|
||||||
|
self._face_emotion_pub = self.create_publisher(
|
||||||
|
FaceEmotion, "/saltybot/face_emotion", sensor_qos)
|
||||||
|
else:
|
||||||
|
self._face_emotion_pub = None
|
||||||
|
except Exception:
|
||||||
|
self._face_emotion_pub = None
|
||||||
|
|
||||||
|
# ── Subscribers ─────────────────────────────────────────────────────
|
||||||
|
if self._enable_voice:
|
||||||
|
if _HAS_SOCIAL_MSGS:
|
||||||
|
self._voice_sub = self.create_subscription(
|
||||||
|
VoiceCommand,
|
||||||
|
"/saltybot/voice_command",
|
||||||
|
self._on_voice_command,
|
||||||
|
reliable_qos
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
self.get_logger().warn(
|
||||||
|
"saltybot_social_msgs not available, voice commands disabled"
|
||||||
|
)
|
||||||
|
|
||||||
|
self._scan_sub = self.create_subscription(
|
||||||
|
LaserScan,
|
||||||
|
"/scan",
|
||||||
|
self._on_scan,
|
||||||
|
sensor_qos
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── State ───────────────────────────────────────────────────────────
|
||||||
|
self._state = TrickState.IDLE
|
||||||
|
self._abort_flag = False
|
||||||
|
self._obstacle_detected = False
|
||||||
|
self._last_trick_time = 0.0
|
||||||
|
self._state_lock = threading.Lock()
|
||||||
|
self._trick_thread: Optional[threading.Thread] = None
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"trick_routines_node ready "
|
||||||
|
f"(obstacle={self._obstacle_dist}m, cooldown={self._cooldown_s}s)"
|
||||||
|
)
|
||||||
|
self._publish_state()
|
||||||
|
|
||||||
|
def _on_voice_command(self, msg: VoiceCommand) -> None:
|
||||||
|
"""Handle voice commands targeting trick routines.
|
||||||
|
|
||||||
|
Recognized intents: trick.spin, trick.dance, trick.nod,
|
||||||
|
trick.celebrate, trick.shy (matched via intent_class).
|
||||||
|
"""
|
||||||
|
if not _HAS_SOCIAL_MSGS:
|
||||||
|
return
|
||||||
|
|
||||||
|
intent = msg.intent.lower()
|
||||||
|
|
||||||
|
# Map voice commands to tricks
|
||||||
|
trick_map = {
|
||||||
|
"trick.spin": "spin",
|
||||||
|
"trick.dance": "dance",
|
||||||
|
"trick.nod": "nod",
|
||||||
|
"trick.celebrate": "celebrate",
|
||||||
|
"trick.shy": "shy",
|
||||||
|
}
|
||||||
|
|
||||||
|
if intent not in trick_map:
|
||||||
|
return
|
||||||
|
|
||||||
|
trick_name = trick_map[intent]
|
||||||
|
self.get_logger().info(f"Voice command detected: {trick_name}")
|
||||||
|
self._execute_trick(trick_name)
|
||||||
|
|
||||||
|
def _on_scan(self, msg: LaserScan) -> None:
|
||||||
|
"""Monitor /scan for near-field obstacles.
|
||||||
|
|
||||||
|
Abort trick if any reading within obstacle_distance_m.
|
||||||
|
"""
|
||||||
|
# Check range readings from front hemisphere (roughly -90 to +90 degrees)
|
||||||
|
if not msg.ranges:
|
||||||
|
return
|
||||||
|
|
||||||
|
n = len(msg.ranges)
|
||||||
|
front_start = n // 4 # Start at -90°
|
||||||
|
front_end = 3 * n // 4 # End at +90°
|
||||||
|
|
||||||
|
for i in range(front_start, min(front_end, n)):
|
||||||
|
r = msg.ranges[i]
|
||||||
|
# Skip NaN/inf readings
|
||||||
|
if math.isfinite(r) and 0 < r < self._obstacle_dist:
|
||||||
|
self._obstacle_detected = True
|
||||||
|
return
|
||||||
|
|
||||||
|
self._obstacle_detected = False
|
||||||
|
|
||||||
|
def _execute_trick(self, trick_name: str) -> None:
|
||||||
|
"""Execute a named trick routine.
|
||||||
|
|
||||||
|
Enforces cooldown period and prevents concurrent execution.
|
||||||
|
"""
|
||||||
|
with self._state_lock:
|
||||||
|
if self._state != TrickState.IDLE:
|
||||||
|
self.get_logger().warn(
|
||||||
|
f"Trick {trick_name} requested but in state {self._state.value}"
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
# Check cooldown
|
||||||
|
elapsed = time.time() - self._last_trick_time
|
||||||
|
if elapsed < self._cooldown_s:
|
||||||
|
self.get_logger().info(
|
||||||
|
f"Cooldown active ({elapsed:.1f}s/{self._cooldown_s}s), "
|
||||||
|
f"rejecting {trick_name}"
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
self._state = TrickState.EXECUTING
|
||||||
|
self._abort_flag = False
|
||||||
|
|
||||||
|
# Execute trick in background thread
|
||||||
|
self._trick_thread = threading.Thread(
|
||||||
|
target=self._trick_worker,
|
||||||
|
args=(trick_name,)
|
||||||
|
)
|
||||||
|
self._trick_thread.daemon = True
|
||||||
|
self._trick_thread.start()
|
||||||
|
|
||||||
|
def _trick_worker(self, trick_name: str) -> None:
|
||||||
|
"""Background worker to execute trick routine."""
|
||||||
|
try:
|
||||||
|
self._publish_state(f"executing:{trick_name}")
|
||||||
|
self.get_logger().info(f"Starting trick: {trick_name}")
|
||||||
|
|
||||||
|
# Dispatch to trick function
|
||||||
|
trick_funcs = {
|
||||||
|
"spin": TrickRoutines.spin,
|
||||||
|
"dance": TrickRoutines.dance,
|
||||||
|
"nod": lambda n, **kw: TrickRoutines.nod(n, "yes", **kw),
|
||||||
|
"celebrate": TrickRoutines.celebrate,
|
||||||
|
"shy": TrickRoutines.shy,
|
||||||
|
}
|
||||||
|
|
||||||
|
if trick_name not in trick_funcs:
|
||||||
|
self.get_logger().error(f"Unknown trick: {trick_name}")
|
||||||
|
return
|
||||||
|
|
||||||
|
success = trick_funcs[trick_name](self)
|
||||||
|
|
||||||
|
if success:
|
||||||
|
self.get_logger().info(f"Trick completed: {trick_name}")
|
||||||
|
self._publish_state(f"completed:{trick_name}")
|
||||||
|
else:
|
||||||
|
self.get_logger().warn(f"Trick aborted: {trick_name}")
|
||||||
|
self._publish_state(f"aborted:{trick_name}")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.get_logger().error(f"Trick execution error: {e}")
|
||||||
|
self._publish_state("error")
|
||||||
|
finally:
|
||||||
|
# Ensure clean stop
|
||||||
|
self._cmd_vel_pub.publish(Twist())
|
||||||
|
self._head_pan_pub.publish(Float32(data=0.0))
|
||||||
|
self._head_tilt_pub.publish(Float32(data=0.0))
|
||||||
|
|
||||||
|
with self._state_lock:
|
||||||
|
self._state = TrickState.COOLDOWN
|
||||||
|
self._last_trick_time = time.time()
|
||||||
|
|
||||||
|
# Cooldown period
|
||||||
|
time.sleep(self._cooldown_s)
|
||||||
|
|
||||||
|
with self._state_lock:
|
||||||
|
self._state = TrickState.IDLE
|
||||||
|
self._publish_state()
|
||||||
|
|
||||||
|
def _publish_state(self, state_str: str = "idle") -> None:
|
||||||
|
"""Publish current trick state."""
|
||||||
|
msg = String()
|
||||||
|
msg.data = state_str
|
||||||
|
self._trick_state_pub.publish(msg)
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = TrickRoutinesNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
||||||
5
jetson/ros2_ws/src/saltybot_tricks/setup.cfg
Normal file
5
jetson/ros2_ws/src/saltybot_tricks/setup.cfg
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
[develop]
|
||||||
|
script_dir=$base/lib/saltybot_tricks/scripts
|
||||||
|
[egg_info]
|
||||||
|
tag_build =
|
||||||
|
tag_date = 0
|
||||||
32
jetson/ros2_ws/src/saltybot_tricks/setup.py
Normal file
32
jetson/ros2_ws/src/saltybot_tricks/setup.py
Normal file
@ -0,0 +1,32 @@
|
|||||||
|
from setuptools import find_packages, setup
|
||||||
|
import os
|
||||||
|
from glob import glob
|
||||||
|
|
||||||
|
package_name = 'saltybot_tricks'
|
||||||
|
|
||||||
|
setup(
|
||||||
|
name=package_name,
|
||||||
|
version='0.1.0',
|
||||||
|
packages=find_packages(exclude=['test']),
|
||||||
|
data_files=[
|
||||||
|
('share/ament_index/resource_index/packages',
|
||||||
|
['resource/' + package_name]),
|
||||||
|
('share/' + package_name, ['package.xml']),
|
||||||
|
(os.path.join('share', package_name, 'launch'),
|
||||||
|
glob(os.path.join('launch', '*launch.[pxy][yma]*'))),
|
||||||
|
(os.path.join('share', package_name, 'config'),
|
||||||
|
glob(os.path.join('config', '*.yaml'))),
|
||||||
|
],
|
||||||
|
install_requires=['setuptools'],
|
||||||
|
zip_safe=True,
|
||||||
|
maintainer='seb',
|
||||||
|
maintainer_email='seb@vayrette.com',
|
||||||
|
description='Fun trick routines for SaltyBot (Issue #431)',
|
||||||
|
license='MIT',
|
||||||
|
tests_require=['pytest'],
|
||||||
|
entry_points={
|
||||||
|
'console_scripts': [
|
||||||
|
'trick_routines_node = saltybot_tricks.trick_routines_node:main',
|
||||||
|
],
|
||||||
|
},
|
||||||
|
)
|
||||||
48
jetson/ros2_ws/src/saltybot_tricks/test/test_tricks.py
Normal file
48
jetson/ros2_ws/src/saltybot_tricks/test/test_tricks.py
Normal file
@ -0,0 +1,48 @@
|
|||||||
|
"""Unit tests for trick routines node (Issue #431)."""
|
||||||
|
|
||||||
|
import unittest
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
|
||||||
|
class TestTrickRoutines(unittest.TestCase):
|
||||||
|
"""Test trick routine module structure and imports."""
|
||||||
|
|
||||||
|
def test_module_imports(self):
|
||||||
|
"""Verify saltybot_tricks module can be imported."""
|
||||||
|
try:
|
||||||
|
import saltybot_tricks
|
||||||
|
self.assertIsNotNone(saltybot_tricks)
|
||||||
|
except ImportError as e:
|
||||||
|
self.fail(f"Failed to import saltybot_tricks: {e}")
|
||||||
|
|
||||||
|
def test_trick_routines_class_exists(self):
|
||||||
|
"""Verify TrickRoutines class is defined."""
|
||||||
|
try:
|
||||||
|
from saltybot_tricks.trick_routines_node import TrickRoutines
|
||||||
|
self.assertTrue(hasattr(TrickRoutines, 'spin'))
|
||||||
|
self.assertTrue(hasattr(TrickRoutines, 'dance'))
|
||||||
|
self.assertTrue(hasattr(TrickRoutines, 'nod'))
|
||||||
|
self.assertTrue(hasattr(TrickRoutines, 'celebrate'))
|
||||||
|
self.assertTrue(hasattr(TrickRoutines, 'shy'))
|
||||||
|
except ImportError as e:
|
||||||
|
self.fail(f"Failed to import TrickRoutines: {e}")
|
||||||
|
|
||||||
|
def test_node_class_exists(self):
|
||||||
|
"""Verify TrickRoutinesNode class is defined."""
|
||||||
|
try:
|
||||||
|
from saltybot_tricks.trick_routines_node import TrickRoutinesNode
|
||||||
|
self.assertIsNotNone(TrickRoutinesNode)
|
||||||
|
except ImportError as e:
|
||||||
|
self.fail(f"Failed to import TrickRoutinesNode: {e}")
|
||||||
|
|
||||||
|
def test_main_function_exists(self):
|
||||||
|
"""Verify main function is defined."""
|
||||||
|
try:
|
||||||
|
from saltybot_tricks.trick_routines_node import main
|
||||||
|
self.assertTrue(callable(main))
|
||||||
|
except ImportError as e:
|
||||||
|
self.fail(f"Failed to import main: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
||||||
Loading…
x
Reference in New Issue
Block a user