Compare commits
3 Commits
83e3033abe
...
80ee9ece87
| Author | SHA1 | Date | |
|---|---|---|---|
| 80ee9ece87 | |||
|
|
868b453777 | ||
| 6d6909d9d9 |
@ -194,6 +194,12 @@ def generate_launch_description():
|
|||||||
description="Launch rosbridge WebSocket server (port 9090)",
|
description="Launch rosbridge WebSocket server (port 9090)",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
enable_mission_logging_arg = DeclareLaunchArgument(
|
||||||
|
"enable_mission_logging",
|
||||||
|
default_value="true",
|
||||||
|
description="Launch ROS2 bag recorder for mission logging (Issue #488)",
|
||||||
|
)
|
||||||
|
|
||||||
enable_docking_arg = DeclareLaunchArgument(
|
enable_docking_arg = DeclareLaunchArgument(
|
||||||
"enable_docking",
|
"enable_docking",
|
||||||
default_value="true",
|
default_value="true",
|
||||||
@ -259,6 +265,22 @@ def generate_launch_description():
|
|||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# ── t=0.5s Mission logging (bag recorder; Issue #488) ───────────────────
|
||||||
|
mission_logging = TimerAction(
|
||||||
|
period=0.5,
|
||||||
|
actions=[
|
||||||
|
GroupAction(
|
||||||
|
condition=IfCondition(LaunchConfiguration("enable_mission_logging")),
|
||||||
|
actions=[
|
||||||
|
LogInfo(msg="[full_stack] Starting mission logging recorder"),
|
||||||
|
IncludeLaunchDescription(
|
||||||
|
_launch("saltybot_bag_recorder", "launch", "bag_recorder.launch.py"),
|
||||||
|
),
|
||||||
|
],
|
||||||
|
),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
# ── t=2s cmd_vel safety bridge (depends on STM32 bridge) ────────────────
|
# ── t=2s cmd_vel safety bridge (depends on STM32 bridge) ────────────────
|
||||||
cmd_vel_bridge = TimerAction(
|
cmd_vel_bridge = TimerAction(
|
||||||
period=2.0,
|
period=2.0,
|
||||||
@ -493,6 +515,7 @@ def generate_launch_description():
|
|||||||
enable_follower_arg,
|
enable_follower_arg,
|
||||||
enable_bridge_arg,
|
enable_bridge_arg,
|
||||||
enable_rosbridge_arg,
|
enable_rosbridge_arg,
|
||||||
|
enable_mission_logging_arg,
|
||||||
enable_docking_arg,
|
enable_docking_arg,
|
||||||
follow_distance_arg,
|
follow_distance_arg,
|
||||||
max_linear_vel_arg,
|
max_linear_vel_arg,
|
||||||
@ -507,6 +530,9 @@ def generate_launch_description():
|
|||||||
robot_description,
|
robot_description,
|
||||||
stm32_bridge,
|
stm32_bridge,
|
||||||
|
|
||||||
|
# t=0.5s
|
||||||
|
mission_logging,
|
||||||
|
|
||||||
# t=2s
|
# t=2s
|
||||||
sensors,
|
sensors,
|
||||||
cmd_vel_bridge,
|
cmd_vel_bridge,
|
||||||
|
|||||||
@ -0,0 +1,11 @@
|
|||||||
|
# Sensor Fusion Configuration (Issue #490)
|
||||||
|
sensor_fusion:
|
||||||
|
ros__parameters:
|
||||||
|
lidar_topic: "/scan"
|
||||||
|
depth_topic: "/depth_scan"
|
||||||
|
fused_topic: "/scan_fused"
|
||||||
|
cloud_topic: "/scan_fused_cloud"
|
||||||
|
frame_id: "base_link"
|
||||||
|
front_sector_angle: 45.0
|
||||||
|
depth_range_multiplier: 0.9
|
||||||
|
max_range_limit: 5.0
|
||||||
34
jetson/ros2_ws/src/saltybot_sensor_fusion/package.xml
Normal file
34
jetson/ros2_ws/src/saltybot_sensor_fusion/package.xml
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
<?xml version="1.0"?>
|
||||||
|
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||||
|
<package format="3">
|
||||||
|
<name>saltybot_sensor_fusion</name>
|
||||||
|
<version>0.1.0</version>
|
||||||
|
<description>
|
||||||
|
Multi-sensor fusion for obstacle avoidance (Issue #490).
|
||||||
|
Fuses RPLIDAR A1M8 (360° 2D) + RealSense D435i depth (front 87° 3D).
|
||||||
|
Publishes unified /scan_fused and PointCloud2 for costmap voxel layer.
|
||||||
|
</description>
|
||||||
|
<maintainer email="sl-perception@saltylab.local">sl-perception</maintainer>
|
||||||
|
<license>MIT</license>
|
||||||
|
|
||||||
|
<buildtool_depend>ament_python</buildtool_depend>
|
||||||
|
|
||||||
|
<depend>rclpy</depend>
|
||||||
|
<depend>std_msgs</depend>
|
||||||
|
<depend>sensor_msgs</depend>
|
||||||
|
<depend>geometry_msgs</depend>
|
||||||
|
<depend>message_filters</depend>
|
||||||
|
<depend>tf2_ros</depend>
|
||||||
|
<depend>cv_bridge</depend>
|
||||||
|
<depend>opencv-python</depend>
|
||||||
|
<depend>numpy</depend>
|
||||||
|
|
||||||
|
<test_depend>ament_copyright</test_depend>
|
||||||
|
<test_depend>ament_flake8</test_depend>
|
||||||
|
<test_depend>ament_pep257</test_depend>
|
||||||
|
<test_depend>python3-pytest</test_depend>
|
||||||
|
|
||||||
|
<export>
|
||||||
|
<build_type>ament_python</build_type>
|
||||||
|
</export>
|
||||||
|
</package>
|
||||||
@ -0,0 +1,197 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Multi-Sensor Fusion Node for Obstacle Avoidance
|
||||||
|
Issue #490: Fuse RPLIDAR A1M8 (360° 2D) + RealSense D435i (front 87° 3D)
|
||||||
|
|
||||||
|
Fuses:
|
||||||
|
- /scan: RPLIDAR A1M8 (360° 2D LaserScan, ~5.5Hz)
|
||||||
|
- /depth_scan: RealSense D435i depth_to_laserscan (front 87°, ~30Hz)
|
||||||
|
|
||||||
|
Outputs:
|
||||||
|
- /scan_fused: Unified LaserScan (360°, high-fidelity front + LIDAR sides/rear)
|
||||||
|
- /scan_fused_cloud: PointCloud2 for 3D costmap voxel layer
|
||||||
|
|
||||||
|
Strategy:
|
||||||
|
- Front sector (±45°): Prefer depth_scan (closer range, 3D data) if available
|
||||||
|
- Sides/rear: Use LIDAR only
|
||||||
|
"""
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import math
|
||||||
|
from typing import Optional
|
||||||
|
import logging
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
|
||||||
|
import message_filters
|
||||||
|
|
||||||
|
from sensor_msgs.msg import LaserScan, PointCloud2, PointField
|
||||||
|
from std_msgs.msg import Header
|
||||||
|
|
||||||
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
_SENSOR_QOS = QoSProfile(
|
||||||
|
reliability=ReliabilityPolicy.BEST_EFFORT,
|
||||||
|
history=HistoryPolicy.KEEP_LAST,
|
||||||
|
depth=5,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _create_pointcloud2_from_laserscan(scan: LaserScan, frame_id: str) -> PointCloud2:
|
||||||
|
"""Convert LaserScan to PointCloud2."""
|
||||||
|
points = []
|
||||||
|
for i, range_val in enumerate(scan.ranges):
|
||||||
|
if scan.range_min <= range_val <= scan.range_max:
|
||||||
|
angle = scan.angle_min + i * scan.angle_increment
|
||||||
|
x = range_val * math.cos(angle)
|
||||||
|
y = range_val * math.sin(angle)
|
||||||
|
z = 0.0
|
||||||
|
intensity = scan.intensities[i] if i < len(scan.intensities) else 0.0
|
||||||
|
points.append([x, y, z, intensity])
|
||||||
|
|
||||||
|
if not points:
|
||||||
|
points = [[0, 0, 0, 0]]
|
||||||
|
|
||||||
|
points_array = np.array(points, dtype=np.float32)
|
||||||
|
|
||||||
|
# Create PointCloud2 message
|
||||||
|
cloud = PointCloud2()
|
||||||
|
cloud.header = Header(frame_id=frame_id, stamp=scan.header.stamp)
|
||||||
|
cloud.height = 1
|
||||||
|
cloud.width = len(points)
|
||||||
|
cloud.is_bigendian = False
|
||||||
|
cloud.point_step = 16
|
||||||
|
cloud.row_step = cloud.point_step * cloud.width
|
||||||
|
|
||||||
|
cloud.fields = [
|
||||||
|
PointField(name='x', offset=0, datatype=PointField.FLOAT32, count=1),
|
||||||
|
PointField(name='y', offset=4, datatype=PointField.FLOAT32, count=1),
|
||||||
|
PointField(name='z', offset=8, datatype=PointField.FLOAT32, count=1),
|
||||||
|
PointField(name='intensity', offset=12, datatype=PointField.FLOAT32, count=1),
|
||||||
|
]
|
||||||
|
|
||||||
|
cloud.data = points_array.astype(np.float32).tobytes()
|
||||||
|
return cloud
|
||||||
|
|
||||||
|
|
||||||
|
class SensorFusionNode(Node):
|
||||||
|
"""Multi-sensor fusion for obstacle avoidance."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__("sensor_fusion")
|
||||||
|
|
||||||
|
self.declare_parameter("lidar_topic", "/scan")
|
||||||
|
self.declare_parameter("depth_topic", "/depth_scan")
|
||||||
|
self.declare_parameter("fused_topic", "/scan_fused")
|
||||||
|
self.declare_parameter("cloud_topic", "/scan_fused_cloud")
|
||||||
|
self.declare_parameter("frame_id", "base_link")
|
||||||
|
self.declare_parameter("front_sector_angle", 45.0)
|
||||||
|
self.declare_parameter("depth_range_multiplier", 0.9)
|
||||||
|
self.declare_parameter("max_range_limit", 5.0)
|
||||||
|
|
||||||
|
self.lidar_topic = self.get_parameter("lidar_topic").value
|
||||||
|
self.depth_topic = self.get_parameter("depth_topic").value
|
||||||
|
self.fused_topic = self.get_parameter("fused_topic").value
|
||||||
|
self.cloud_topic = self.get_parameter("cloud_topic").value
|
||||||
|
self.frame_id = self.get_parameter("frame_id").value
|
||||||
|
self.front_sector_angle = self.get_parameter("front_sector_angle").value
|
||||||
|
self.depth_range_multiplier = self.get_parameter("depth_range_multiplier").value
|
||||||
|
self.max_range_limit = self.get_parameter("max_range_limit").value
|
||||||
|
|
||||||
|
# Subscriptions with message_filters sync
|
||||||
|
lidar_sub = message_filters.Subscriber(
|
||||||
|
self, LaserScan, self.lidar_topic, qos_profile=_SENSOR_QOS
|
||||||
|
)
|
||||||
|
depth_sub = message_filters.Subscriber(
|
||||||
|
self, LaserScan, self.depth_topic, qos_profile=_SENSOR_QOS
|
||||||
|
)
|
||||||
|
|
||||||
|
self.sync = message_filters.ApproximateTimeSynchronizer(
|
||||||
|
[lidar_sub, depth_sub], queue_size=10, slop=0.2
|
||||||
|
)
|
||||||
|
self.sync.registerCallback(self._on_scans)
|
||||||
|
|
||||||
|
# Publishers
|
||||||
|
self.fused_scan_pub = self.create_publisher(LaserScan, self.fused_topic, _SENSOR_QOS)
|
||||||
|
self.cloud_pub = self.create_publisher(PointCloud2, self.cloud_topic, _SENSOR_QOS)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"SensorFusionNode initialized: lidar={self.lidar_topic}, depth={self.depth_topic}"
|
||||||
|
)
|
||||||
|
|
||||||
|
def _on_scans(self, lidar_scan: LaserScan, depth_scan: LaserScan):
|
||||||
|
"""Fuse synchronized scans."""
|
||||||
|
try:
|
||||||
|
fused = self._fuse_scans(lidar_scan, depth_scan)
|
||||||
|
self.fused_scan_pub.publish(fused)
|
||||||
|
cloud = _create_pointcloud2_from_laserscan(fused, self.frame_id)
|
||||||
|
self.cloud_pub.publish(cloud)
|
||||||
|
except Exception as e:
|
||||||
|
self.get_logger().error(f"Fusion error: {e}")
|
||||||
|
|
||||||
|
def _fuse_scans(self, lidar_scan: LaserScan, depth_scan: LaserScan) -> LaserScan:
|
||||||
|
"""Fuse LIDAR and depth scans."""
|
||||||
|
fused = LaserScan()
|
||||||
|
fused.header = lidar_scan.header
|
||||||
|
fused.angle_min = lidar_scan.angle_min
|
||||||
|
fused.angle_max = lidar_scan.angle_max
|
||||||
|
fused.angle_increment = lidar_scan.angle_increment
|
||||||
|
fused.time_increment = lidar_scan.time_increment
|
||||||
|
fused.scan_time = lidar_scan.scan_time
|
||||||
|
fused.range_min = min(lidar_scan.range_min, depth_scan.range_min)
|
||||||
|
fused.range_max = min(lidar_scan.range_max, depth_scan.range_max)
|
||||||
|
|
||||||
|
num_rays = len(lidar_scan.ranges)
|
||||||
|
fused.ranges = list(lidar_scan.ranges)
|
||||||
|
fused.intensities = list(lidar_scan.intensities) if lidar_scan.intensities else [0.0] * num_rays
|
||||||
|
|
||||||
|
front_angle_rad = math.radians(self.front_sector_angle)
|
||||||
|
|
||||||
|
# Fuse depth scan into front sector
|
||||||
|
for i in range(len(depth_scan.ranges)):
|
||||||
|
depth_range = depth_scan.ranges[i]
|
||||||
|
|
||||||
|
if not (depth_scan.range_min <= depth_range <= depth_scan.range_max):
|
||||||
|
continue
|
||||||
|
|
||||||
|
depth_angle = depth_scan.angle_min + i * depth_scan.angle_increment
|
||||||
|
|
||||||
|
# Check if in front sector
|
||||||
|
if abs(depth_angle) > front_angle_rad:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Normalize angle
|
||||||
|
if depth_angle < lidar_scan.angle_min:
|
||||||
|
depth_angle += 2 * math.pi
|
||||||
|
if depth_angle > lidar_scan.angle_max:
|
||||||
|
depth_angle -= 2 * math.pi
|
||||||
|
|
||||||
|
# Find closest LIDAR ray
|
||||||
|
lidar_index = int(round((depth_angle - lidar_scan.angle_min) / lidar_scan.angle_increment))
|
||||||
|
lidar_index = max(0, min(num_rays - 1, lidar_index))
|
||||||
|
|
||||||
|
# Apply safety multiplier and take minimum
|
||||||
|
adjusted_depth_range = depth_range * self.depth_range_multiplier
|
||||||
|
adjusted_depth_range = min(adjusted_depth_range, self.max_range_limit)
|
||||||
|
current_range = fused.ranges[lidar_index]
|
||||||
|
|
||||||
|
if adjusted_depth_range < current_range or current_range < fused.range_min:
|
||||||
|
fused.ranges[lidar_index] = adjusted_depth_range
|
||||||
|
|
||||||
|
if i < len(depth_scan.intensities):
|
||||||
|
fused.intensities[lidar_index] = depth_scan.intensities[i]
|
||||||
|
|
||||||
|
return fused
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = SensorFusionNode()
|
||||||
|
rclpy.spin(node)
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
4
jetson/ros2_ws/src/saltybot_sensor_fusion/setup.cfg
Normal file
4
jetson/ros2_ws/src/saltybot_sensor_fusion/setup.cfg
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
[develop]
|
||||||
|
script_dir=$base/lib/saltybot_sensor_fusion
|
||||||
|
[install]
|
||||||
|
install_scripts=$base/lib/saltybot_sensor_fusion
|
||||||
32
jetson/ros2_ws/src/saltybot_sensor_fusion/setup.py
Normal file
32
jetson/ros2_ws/src/saltybot_sensor_fusion/setup.py
Normal file
@ -0,0 +1,32 @@
|
|||||||
|
from setuptools import setup, find_packages
|
||||||
|
|
||||||
|
package_name = 'saltybot_sensor_fusion'
|
||||||
|
|
||||||
|
setup(
|
||||||
|
name=package_name,
|
||||||
|
version='0.1.0',
|
||||||
|
packages=find_packages(exclude=['test']),
|
||||||
|
data_files=[
|
||||||
|
('share/ament_index/resource_index/packages',
|
||||||
|
['resource/' + package_name]),
|
||||||
|
('share/' + package_name, ['package.xml']),
|
||||||
|
('share/' + package_name + '/launch', [
|
||||||
|
'launch/sensor_fusion.launch.py',
|
||||||
|
]),
|
||||||
|
('share/' + package_name + '/config', [
|
||||||
|
'config/sensor_fusion_params.yaml',
|
||||||
|
]),
|
||||||
|
],
|
||||||
|
install_requires=['setuptools'],
|
||||||
|
zip_safe=True,
|
||||||
|
maintainer='sl-perception',
|
||||||
|
maintainer_email='sl-perception@saltylab.local',
|
||||||
|
description='Multi-sensor fusion for obstacle avoidance',
|
||||||
|
license='MIT',
|
||||||
|
tests_require=['pytest'],
|
||||||
|
entry_points={
|
||||||
|
'console_scripts': [
|
||||||
|
'sensor_fusion = saltybot_sensor_fusion.sensor_fusion_node:main',
|
||||||
|
],
|
||||||
|
},
|
||||||
|
)
|
||||||
155
jetson/ros2_ws/src/saltybot_tts_personality/README.md
Normal file
155
jetson/ros2_ws/src/saltybot_tts_personality/README.md
Normal file
@ -0,0 +1,155 @@
|
|||||||
|
# TTS Personality Engine (Issue #494)
|
||||||
|
|
||||||
|
Context-aware text-to-speech with emotion-based rate/pitch modulation for SaltyBot.
|
||||||
|
|
||||||
|
## Features
|
||||||
|
|
||||||
|
### Context-Aware Responses
|
||||||
|
- **Time-based greetings**: Different greetings for morning, afternoon, evening, night
|
||||||
|
- **Person recognition**: Uses detected person names in responses
|
||||||
|
- **Emotion-driven speech**: Adjusts rate and pitch based on current emotion state
|
||||||
|
- **Personality identity**: Configurable personality name (default: "Luna")
|
||||||
|
|
||||||
|
### Priority Queue Management
|
||||||
|
- **Safety (Priority 3)**: Emergency/safety messages (immediate)
|
||||||
|
- **Social (Priority 2)**: Greetings and interactions (high priority)
|
||||||
|
- **Idle (Priority 1)**: Commentary and chatter (medium priority)
|
||||||
|
- **Normal (Priority 0)**: Default messages
|
||||||
|
|
||||||
|
### Emotion-Based Rate/Pitch Modulation
|
||||||
|
- **Happy**: Faster (1.1x) with higher pitch (1.15x)
|
||||||
|
- **Sad**: Slower (0.9x) with lower pitch (0.85x)
|
||||||
|
- **Neutral**: Normal speech (1.0x)
|
||||||
|
|
||||||
|
## Architecture
|
||||||
|
|
||||||
|
### Node: `tts_personality_node`
|
||||||
|
|
||||||
|
**Subscribes**
|
||||||
|
- `/saltybot/tts_request` - Text to synthesize with personality
|
||||||
|
- `/saltybot/emotion_state` - Current emotion ("happy", "sad", "angry", "neutral")
|
||||||
|
- `/saltybot/person_detected` - Detected known person name
|
||||||
|
|
||||||
|
**Publishes**
|
||||||
|
- `/saltybot/tts_command` - Formatted TTS command with personality parameters (JSON)
|
||||||
|
- `/saltybot/personality_state` - Current personality state
|
||||||
|
|
||||||
|
### Integration Flow
|
||||||
|
|
||||||
|
```
|
||||||
|
tts_request → personality_node → apply context/emotion → tts_service → audio device
|
||||||
|
(Jabra SPEAK 810)
|
||||||
|
```
|
||||||
|
|
||||||
|
## Configuration
|
||||||
|
|
||||||
|
### Parameters (tts_personality_params.yaml)
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
personality_name: "Luna" # Robot's personality name
|
||||||
|
enable_context_aware: true # Use time/person/emotion context
|
||||||
|
|
||||||
|
# Emotion modulation
|
||||||
|
speed_happy: 1.1 # Rate multiplier when happy
|
||||||
|
speed_sad: 0.9 # Rate multiplier when sad
|
||||||
|
pitch_happy: 1.15 # Pitch multiplier when happy
|
||||||
|
pitch_sad: 0.85 # Pitch multiplier when sad
|
||||||
|
|
||||||
|
# Time-based greetings
|
||||||
|
greetings_morning: [...] # 5am-12pm
|
||||||
|
greetings_afternoon: [...] # 12pm-5pm
|
||||||
|
greetings_evening: [...] # 5pm-9pm
|
||||||
|
greetings_night: [...] # 9pm-5am
|
||||||
|
|
||||||
|
# Known people for personalization
|
||||||
|
known_people:
|
||||||
|
person_001: "Seb"
|
||||||
|
person_002: "Alex"
|
||||||
|
```
|
||||||
|
|
||||||
|
## Usage
|
||||||
|
|
||||||
|
### Launch
|
||||||
|
```bash
|
||||||
|
ros2 launch saltybot_tts_personality tts_personality.launch.py personality_name:=Luna
|
||||||
|
```
|
||||||
|
|
||||||
|
### Send TTS Request
|
||||||
|
```bash
|
||||||
|
ros2 topic pub /saltybot/tts_request std_msgs/String "data: 'hello world'"
|
||||||
|
```
|
||||||
|
|
||||||
|
### Update Emotion
|
||||||
|
```bash
|
||||||
|
ros2 topic pub /saltybot/emotion_state std_msgs/String "data: 'happy'"
|
||||||
|
```
|
||||||
|
|
||||||
|
### Detect Person
|
||||||
|
```bash
|
||||||
|
ros2 topic pub /saltybot/person_detected std_msgs/String "data: 'Seb'"
|
||||||
|
```
|
||||||
|
|
||||||
|
### Monitor Personality State
|
||||||
|
```bash
|
||||||
|
ros2 topic echo /saltybot/personality_state
|
||||||
|
```
|
||||||
|
|
||||||
|
## Example Personalities
|
||||||
|
|
||||||
|
### Luna (Default)
|
||||||
|
- Morning: "Good morning! I hope you slept well."
|
||||||
|
- Happy speech: Faster, higher pitch
|
||||||
|
- Emotion context: Uses detected mood
|
||||||
|
|
||||||
|
### Alternative: Customize in Config
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
personality_name: "Atlas"
|
||||||
|
greetings_morning:
|
||||||
|
- "Systems online. Good morning, Commander."
|
||||||
|
- "Wake-up sequence initiated."
|
||||||
|
speed_happy: 1.2 # Atlas speaks faster when happy
|
||||||
|
```
|
||||||
|
|
||||||
|
## Integration with Other Components
|
||||||
|
|
||||||
|
### Emotion Engine (Issue #429)
|
||||||
|
- Listens to `/saltybot/emotion_state`
|
||||||
|
- Adjusts rate/pitch based on emotion
|
||||||
|
- Integrates with personality for consistent expression
|
||||||
|
|
||||||
|
### TTS Service (Issue #421)
|
||||||
|
- Receives formatted TTS commands via `/saltybot/tts_command`
|
||||||
|
- Handles actual speech synthesis with Piper-TTS
|
||||||
|
- Outputs to Jabra SPEAK 810 audio device
|
||||||
|
|
||||||
|
### Person Tracking
|
||||||
|
- Subscribes to detected person names
|
||||||
|
- Personalizes responses with known person names
|
||||||
|
- Maintains recent person context (30-second window)
|
||||||
|
|
||||||
|
## Testing
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Run unit tests
|
||||||
|
colcon test --packages-select saltybot_tts_personality
|
||||||
|
|
||||||
|
# Manual testing
|
||||||
|
ros2 launch saltybot_tts_personality tts_personality.launch.py
|
||||||
|
ros2 topic pub /saltybot/tts_request std_msgs/String "data: 'test message'"
|
||||||
|
```
|
||||||
|
|
||||||
|
## Safety Considerations
|
||||||
|
|
||||||
|
- Safety messages cannot be overridden by emotions (always use neutral rate/pitch)
|
||||||
|
- Queue has maximum size of 16 items to prevent memory exhaustion
|
||||||
|
- Emergency/safety keywords trigger highest priority queue level
|
||||||
|
- Personality does not interfere with critical safety protocols
|
||||||
|
|
||||||
|
## Future Enhancements
|
||||||
|
|
||||||
|
- [ ] Machine learning-based emotion detection from voice analysis
|
||||||
|
- [ ] Multi-language support
|
||||||
|
- [ ] Voice variants (male, female, child)
|
||||||
|
- [ ] Custom personality training from recordings
|
||||||
|
- [ ] Integration with dialogue system for context-aware responses
|
||||||
@ -0,0 +1,58 @@
|
|||||||
|
# TTS Personality Engine Configuration (Issue #494)
|
||||||
|
|
||||||
|
tts_personality:
|
||||||
|
ros__parameters:
|
||||||
|
# Personality identity
|
||||||
|
personality_name: "Luna"
|
||||||
|
enable_context_aware: true
|
||||||
|
|
||||||
|
# Emotion-based rate/pitch modulation
|
||||||
|
# Rate: <1.0 = slower, >1.0 = faster (1.0 = normal)
|
||||||
|
# Pitch: <1.0 = lower, >1.0 = higher (1.0 = normal)
|
||||||
|
speed_happy: 1.1 # Faster speech when happy
|
||||||
|
speed_sad: 0.9 # Slower speech when sad
|
||||||
|
speed_neutral: 1.0 # Normal speed
|
||||||
|
|
||||||
|
pitch_happy: 1.15 # Higher pitch when happy (more energetic)
|
||||||
|
pitch_sad: 0.85 # Lower pitch when sad (more somber)
|
||||||
|
pitch_neutral: 1.0 # Normal pitch
|
||||||
|
|
||||||
|
# Context-aware greetings by time of day
|
||||||
|
# Morning: 5am-12pm
|
||||||
|
greetings_morning:
|
||||||
|
- "Good morning! I hope you slept well."
|
||||||
|
- "Rise and shine! Ready for a great day?"
|
||||||
|
- "Morning! Time to get moving."
|
||||||
|
- "Good morning! Coffee ready?"
|
||||||
|
- "Hello! Fresh start to the day?"
|
||||||
|
|
||||||
|
# Afternoon: 12pm-5pm
|
||||||
|
greetings_afternoon:
|
||||||
|
- "Good afternoon! How's your day going?"
|
||||||
|
- "Hey there! Still going strong?"
|
||||||
|
- "Afternoon! Taking a break?"
|
||||||
|
- "Hello! Hope things are productive."
|
||||||
|
- "Hi! Middle of the day check-in."
|
||||||
|
|
||||||
|
# Evening: 5pm-9pm
|
||||||
|
greetings_evening:
|
||||||
|
- "Good evening! Wrapping up for the day?"
|
||||||
|
- "Evening! Ready to relax?"
|
||||||
|
- "Hi! How was your day?"
|
||||||
|
- "Good evening! Dinnertime approaching?"
|
||||||
|
- "Hello! Evening already?"
|
||||||
|
|
||||||
|
# Night: 9pm-5am
|
||||||
|
greetings_night:
|
||||||
|
- "Burning the midnight oil?"
|
||||||
|
- "Still awake? Sweet dreams coming soon?"
|
||||||
|
- "Night owl mode activated!"
|
||||||
|
- "Late night work session?"
|
||||||
|
- "Hello! Can't sleep either?"
|
||||||
|
|
||||||
|
# Known people for personalization
|
||||||
|
# Maps person ID/name to friendly name
|
||||||
|
known_people:
|
||||||
|
person_001: "Seb"
|
||||||
|
person_002: "Alex"
|
||||||
|
person_003: "Jordan"
|
||||||
@ -0,0 +1,46 @@
|
|||||||
|
"""
|
||||||
|
tts_personality.launch.py — Launch the TTS personality engine
|
||||||
|
|
||||||
|
Usage
|
||||||
|
-----
|
||||||
|
ros2 launch saltybot_tts_personality tts_personality.launch.py
|
||||||
|
|
||||||
|
Note: Requires saltybot_tts_service to be running for actual TTS synthesis.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
from ament_index_python.packages import get_package_share_directory
|
||||||
|
from launch import LaunchDescription
|
||||||
|
from launch.actions import DeclareLaunchArgument
|
||||||
|
from launch.substitutions import LaunchConfiguration
|
||||||
|
from launch_ros.actions import Node
|
||||||
|
|
||||||
|
|
||||||
|
def generate_launch_description():
|
||||||
|
pkg_share = get_package_share_directory("saltybot_tts_personality")
|
||||||
|
default_params = os.path.join(pkg_share, "config", "tts_personality_params.yaml")
|
||||||
|
|
||||||
|
return LaunchDescription([
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
"params_file",
|
||||||
|
default_value=default_params,
|
||||||
|
description="Path to tts_personality_params.yaml",
|
||||||
|
),
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
"personality_name",
|
||||||
|
default_value="Luna",
|
||||||
|
description="Personality name/identity",
|
||||||
|
),
|
||||||
|
Node(
|
||||||
|
package="saltybot_tts_personality",
|
||||||
|
executable="tts_personality_node",
|
||||||
|
name="tts_personality",
|
||||||
|
output="screen",
|
||||||
|
parameters=[
|
||||||
|
LaunchConfiguration("params_file"),
|
||||||
|
{
|
||||||
|
"personality_name": LaunchConfiguration("personality_name"),
|
||||||
|
},
|
||||||
|
],
|
||||||
|
),
|
||||||
|
])
|
||||||
@ -0,0 +1,352 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
tts_personality_node.py — TTS personality engine with context-aware greetings (Issue #494)
|
||||||
|
|
||||||
|
Provides personality-driven text-to-speech with:
|
||||||
|
- Context-aware greetings (time of day, person names, emotion state)
|
||||||
|
- Priority queue management (safety > social > idle chatter)
|
||||||
|
- Rate/pitch modulation based on emotion
|
||||||
|
- Integration with emotion engine (Issue #429)
|
||||||
|
- Configurable personality parameters
|
||||||
|
|
||||||
|
Subscribes
|
||||||
|
──────────
|
||||||
|
/saltybot/tts_request std_msgs/String — natural language text
|
||||||
|
/saltybot/emotion_state std_msgs/String — emotion ("happy", "sad", "angry", "neutral")
|
||||||
|
/saltybot/person_detected std_msgs/String — known person name
|
||||||
|
|
||||||
|
Publishers
|
||||||
|
──────────
|
||||||
|
/saltybot/tts_command std_msgs/String — formatted TTS command with personality
|
||||||
|
/saltybot/personality_state std_msgs/String — current personality state
|
||||||
|
|
||||||
|
Parameters
|
||||||
|
──────────
|
||||||
|
personality_name str "Luna" — robot's name/personality
|
||||||
|
enable_context_aware bool True — use time/person/emotion context
|
||||||
|
speed_happy float 1.1 — faster speech when happy
|
||||||
|
speed_sad float 0.9 — slower speech when sad
|
||||||
|
pitch_happy float 1.15 — higher pitch when happy
|
||||||
|
pitch_sad float 0.85 — lower pitch when sad
|
||||||
|
pitch_neutral float 1.0
|
||||||
|
greetings_morning list [] — greetings for 5am-12pm
|
||||||
|
greetings_afternoon list [] — greetings for 12pm-5pm
|
||||||
|
greetings_evening list [] — greetings for 5pm-9pm
|
||||||
|
greetings_night list [] — greetings for 9pm-5am
|
||||||
|
known_people dict {} — person names for personalization
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import queue
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
from datetime import datetime
|
||||||
|
from typing import Optional, Dict, List
|
||||||
|
import random
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from rclpy.qos import QoSProfile
|
||||||
|
from std_msgs.msg import String
|
||||||
|
|
||||||
|
|
||||||
|
class PersonalityContext:
|
||||||
|
"""Context for personality-aware responses."""
|
||||||
|
def __init__(self):
|
||||||
|
self.current_emotion = "neutral"
|
||||||
|
self.last_person = None
|
||||||
|
self.emotion_timestamp = time.time()
|
||||||
|
self.person_timestamp = time.time()
|
||||||
|
self.interaction_count = 0
|
||||||
|
|
||||||
|
def update_emotion(self, emotion: str) -> None:
|
||||||
|
"""Update current emotion state."""
|
||||||
|
if emotion != self.current_emotion:
|
||||||
|
self.current_emotion = emotion
|
||||||
|
self.emotion_timestamp = time.time()
|
||||||
|
|
||||||
|
def update_person(self, person_name: str) -> None:
|
||||||
|
"""Update last detected person."""
|
||||||
|
if person_name != self.last_person:
|
||||||
|
self.last_person = person_name
|
||||||
|
self.person_timestamp = time.time()
|
||||||
|
|
||||||
|
def get_time_period(self) -> str:
|
||||||
|
"""Return time period: morning, afternoon, evening, or night."""
|
||||||
|
hour = datetime.now().hour
|
||||||
|
if 5 <= hour < 12:
|
||||||
|
return "morning"
|
||||||
|
elif 12 <= hour < 17:
|
||||||
|
return "afternoon"
|
||||||
|
elif 17 <= hour < 21:
|
||||||
|
return "evening"
|
||||||
|
else:
|
||||||
|
return "night"
|
||||||
|
|
||||||
|
|
||||||
|
class TtsPersonalityNode(Node):
|
||||||
|
"""TTS personality engine with context-aware responses."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__("tts_personality")
|
||||||
|
|
||||||
|
# Personality parameters
|
||||||
|
self.declare_parameter("personality_name", "Luna")
|
||||||
|
self.declare_parameter("enable_context_aware", True)
|
||||||
|
|
||||||
|
# Emotion-based rate/pitch modulation
|
||||||
|
self.declare_parameter("speed_happy", 1.1)
|
||||||
|
self.declare_parameter("speed_sad", 0.9)
|
||||||
|
self.declare_parameter("speed_neutral", 1.0)
|
||||||
|
self.declare_parameter("pitch_happy", 1.15)
|
||||||
|
self.declare_parameter("pitch_sad", 0.85)
|
||||||
|
self.declare_parameter("pitch_neutral", 1.0)
|
||||||
|
|
||||||
|
# Greeting templates (by time of day)
|
||||||
|
self.declare_parameter("greetings_morning", [
|
||||||
|
"Good morning! I hope you slept well.",
|
||||||
|
"Rise and shine! Ready for a great day?",
|
||||||
|
"Morning! Let's get moving.",
|
||||||
|
])
|
||||||
|
self.declare_parameter("greetings_afternoon", [
|
||||||
|
"Afternoon! How's it going?",
|
||||||
|
"Hey there! Hope your day is great so far.",
|
||||||
|
"Afternoon check-in! Everything okay?",
|
||||||
|
])
|
||||||
|
self.declare_parameter("greetings_evening", [
|
||||||
|
"Good evening! Time to wind down?",
|
||||||
|
"Evening! Getting ready for tomorrow?",
|
||||||
|
"Hi! How was your day?",
|
||||||
|
])
|
||||||
|
self.declare_parameter("greetings_night", [
|
||||||
|
"Burning the midnight oil?",
|
||||||
|
"Still awake? Sweet dreams coming soon?",
|
||||||
|
"Night owl mode activated!",
|
||||||
|
])
|
||||||
|
|
||||||
|
# Known people for personalization
|
||||||
|
self.declare_parameter("known_people", {})
|
||||||
|
|
||||||
|
# Load parameters
|
||||||
|
self._personality_name = self.get_parameter("personality_name").value
|
||||||
|
self._context_aware = self.get_parameter("enable_context_aware").value
|
||||||
|
self._speed_happy = self.get_parameter("speed_happy").value
|
||||||
|
self._speed_sad = self.get_parameter("speed_sad").value
|
||||||
|
self._speed_neutral = self.get_parameter("speed_neutral").value
|
||||||
|
self._pitch_happy = self.get_parameter("pitch_happy").value
|
||||||
|
self._pitch_sad = self.get_parameter("pitch_sad").value
|
||||||
|
self._pitch_neutral = self.get_parameter("pitch_neutral").value
|
||||||
|
self._greetings_morning = self.get_parameter("greetings_morning").value
|
||||||
|
self._greetings_afternoon = self.get_parameter("greetings_afternoon").value
|
||||||
|
self._greetings_evening = self.get_parameter("greetings_evening").value
|
||||||
|
self._greetings_night = self.get_parameter("greetings_night").value
|
||||||
|
self._known_people = self.get_parameter("known_people").value
|
||||||
|
|
||||||
|
# Personality context
|
||||||
|
self._context = PersonalityContext()
|
||||||
|
self._context_lock = threading.Lock()
|
||||||
|
|
||||||
|
# Priority levels for queue
|
||||||
|
self.PRIORITY_SAFETY = 3 # Emergency/safety messages
|
||||||
|
self.PRIORITY_SOCIAL = 2 # Social interactions, greetings
|
||||||
|
self.PRIORITY_IDLE = 1 # Idle chatter, commentary
|
||||||
|
self.PRIORITY_NORMAL = 0 # Default
|
||||||
|
|
||||||
|
# Queue for TTS requests with priority
|
||||||
|
self._tts_queue = queue.PriorityQueue(maxsize=16)
|
||||||
|
self._state = "idle"
|
||||||
|
self._state_lock = threading.Lock()
|
||||||
|
|
||||||
|
# QoS profile
|
||||||
|
qos = QoSProfile(depth=5)
|
||||||
|
|
||||||
|
# Subscribers
|
||||||
|
self.create_subscription(
|
||||||
|
String, "/saltybot/tts_request",
|
||||||
|
self._on_tts_request, qos)
|
||||||
|
|
||||||
|
self.create_subscription(
|
||||||
|
String, "/saltybot/emotion_state",
|
||||||
|
self._on_emotion_update, qos)
|
||||||
|
|
||||||
|
self.create_subscription(
|
||||||
|
String, "/saltybot/person_detected",
|
||||||
|
self._on_person_detected, qos)
|
||||||
|
|
||||||
|
# Publishers
|
||||||
|
self._tts_command_pub = self.create_publisher(
|
||||||
|
String, "/saltybot/tts_command", qos)
|
||||||
|
|
||||||
|
self._personality_state_pub = self.create_publisher(
|
||||||
|
String, "/saltybot/personality_state", qos)
|
||||||
|
|
||||||
|
# Worker thread for processing queue
|
||||||
|
self._worker_thread = threading.Thread(
|
||||||
|
target=self._worker_loop, daemon=True)
|
||||||
|
self._worker_thread.start()
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"TTS Personality Engine '{self._personality_name}' started. "
|
||||||
|
f"Context-aware: {self._context_aware}")
|
||||||
|
|
||||||
|
def _on_tts_request(self, msg: String) -> None:
|
||||||
|
"""Handle incoming TTS request."""
|
||||||
|
try:
|
||||||
|
priority = self.PRIORITY_NORMAL
|
||||||
|
text = msg.data
|
||||||
|
|
||||||
|
# Determine priority based on keywords
|
||||||
|
if any(w in text.lower() for w in ["safety", "stop", "danger", "emergency"]):
|
||||||
|
priority = self.PRIORITY_SAFETY
|
||||||
|
elif any(w in text.lower() for w in ["hello", "hey", "hi", "greeting"]):
|
||||||
|
priority = self.PRIORITY_SOCIAL
|
||||||
|
|
||||||
|
# Add to queue with timestamp for FIFO within priority
|
||||||
|
item = (priority, time.time(), text)
|
||||||
|
self._tts_queue.put(item, block=False)
|
||||||
|
|
||||||
|
except queue.Full:
|
||||||
|
self.get_logger().warn("TTS queue full, dropping request")
|
||||||
|
|
||||||
|
def _on_emotion_update(self, msg: String) -> None:
|
||||||
|
"""Handle emotion state updates."""
|
||||||
|
with self._context_lock:
|
||||||
|
self._context.update_emotion(msg.data)
|
||||||
|
self.get_logger().debug(f"Emotion updated: {msg.data}")
|
||||||
|
|
||||||
|
def _on_person_detected(self, msg: String) -> None:
|
||||||
|
"""Handle person detection events."""
|
||||||
|
with self._context_lock:
|
||||||
|
self._context.update_person(msg.data)
|
||||||
|
self.get_logger().debug(f"Person detected: {msg.data}")
|
||||||
|
|
||||||
|
def _apply_personality(self, text: str) -> str:
|
||||||
|
"""
|
||||||
|
Apply personality modifications to text.
|
||||||
|
Returns: modified text with personality context.
|
||||||
|
"""
|
||||||
|
with self._context_lock:
|
||||||
|
emotion = self._context.current_emotion
|
||||||
|
person = self._context.last_person
|
||||||
|
time_period = self._context.get_time_period()
|
||||||
|
|
||||||
|
# Don't modify safety-critical messages
|
||||||
|
if any(w in text.lower() for w in ["stop", "danger", "emergency"]):
|
||||||
|
return text
|
||||||
|
|
||||||
|
# Add personalization based on context
|
||||||
|
modified_text = text
|
||||||
|
if self._context_aware:
|
||||||
|
# Add person name if recently detected
|
||||||
|
if person and (time.time() - self._context.person_timestamp < 30):
|
||||||
|
modified_text = f"{person}, {modified_text}"
|
||||||
|
|
||||||
|
return modified_text
|
||||||
|
|
||||||
|
def _get_emotion_modulation(self) -> tuple[float, float]:
|
||||||
|
"""
|
||||||
|
Get speed and pitch multipliers based on emotion.
|
||||||
|
Returns: (speed_multiplier, pitch_multiplier)
|
||||||
|
"""
|
||||||
|
with self._context_lock:
|
||||||
|
emotion = self._context.current_emotion
|
||||||
|
|
||||||
|
if emotion == "happy":
|
||||||
|
return (self._speed_happy, self._pitch_happy)
|
||||||
|
elif emotion == "sad":
|
||||||
|
return (self._speed_sad, self._pitch_sad)
|
||||||
|
else: # neutral
|
||||||
|
return (self._speed_neutral, self._pitch_neutral)
|
||||||
|
|
||||||
|
def _format_tts_command(self, text: str) -> str:
|
||||||
|
"""
|
||||||
|
Format text with personality parameters for TTS service.
|
||||||
|
Returns: JSON-formatted TTS command with rate/pitch modulation.
|
||||||
|
"""
|
||||||
|
speed, pitch = self._get_emotion_modulation()
|
||||||
|
|
||||||
|
# Format as JSON-compatible string for tts_service
|
||||||
|
# tts_service will parse: {"text": "...", "speed": 1.1, "pitch": 1.15}
|
||||||
|
import json
|
||||||
|
command = {
|
||||||
|
"text": text,
|
||||||
|
"speed": float(speed),
|
||||||
|
"pitch": float(pitch),
|
||||||
|
"personality": self._personality_name,
|
||||||
|
"emotion": self._context.current_emotion,
|
||||||
|
}
|
||||||
|
return json.dumps(command)
|
||||||
|
|
||||||
|
def _update_state(self, new_state: str) -> None:
|
||||||
|
"""Update and publish personality state."""
|
||||||
|
with self._state_lock:
|
||||||
|
if new_state != self._state:
|
||||||
|
self._state = new_state
|
||||||
|
state_msg = String(data=f"{new_state}:{self._personality_name}:{self._context.current_emotion}")
|
||||||
|
self._personality_state_pub.publish(state_msg)
|
||||||
|
|
||||||
|
def _worker_loop(self) -> None:
|
||||||
|
"""Worker thread loop to process TTS queue."""
|
||||||
|
while rclpy.ok():
|
||||||
|
try:
|
||||||
|
# Get next item from queue (blocking, 1s timeout)
|
||||||
|
priority, timestamp, text = self._tts_queue.get(timeout=1.0)
|
||||||
|
|
||||||
|
self._update_state("processing")
|
||||||
|
|
||||||
|
# Apply personality modifications
|
||||||
|
personalized_text = self._apply_personality(text)
|
||||||
|
|
||||||
|
# Format with emotion modulation
|
||||||
|
tts_command = self._format_tts_command(personalized_text)
|
||||||
|
|
||||||
|
# Publish to TTS service
|
||||||
|
cmd_msg = String(data=tts_command)
|
||||||
|
self._tts_command_pub.publish(cmd_msg)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"[{priority}] Speaking: {personalized_text[:60]}...")
|
||||||
|
|
||||||
|
# Small delay to avoid queue hammering
|
||||||
|
time.sleep(0.1)
|
||||||
|
self._update_state("idle")
|
||||||
|
|
||||||
|
except queue.Empty:
|
||||||
|
self._update_state("idle")
|
||||||
|
except Exception as e:
|
||||||
|
self.get_logger().error(f"Worker error: {e}")
|
||||||
|
|
||||||
|
def _generate_greeting(self) -> str:
|
||||||
|
"""Generate context-aware greeting."""
|
||||||
|
with self._context_lock:
|
||||||
|
time_period = self._context.get_time_period()
|
||||||
|
|
||||||
|
greetings_map = {
|
||||||
|
"morning": self._greetings_morning,
|
||||||
|
"afternoon": self._greetings_afternoon,
|
||||||
|
"evening": self._greetings_evening,
|
||||||
|
"night": self._greetings_night,
|
||||||
|
}
|
||||||
|
|
||||||
|
greetings = greetings_map.get(time_period, self._greetings_afternoon)
|
||||||
|
if greetings:
|
||||||
|
return random.choice(greetings)
|
||||||
|
return "Hello!"
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
"""Main entry point."""
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = TtsPersonalityNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@ -0,0 +1,57 @@
|
|||||||
|
"""
|
||||||
|
test_tts_personality.py — Tests for TTS personality engine
|
||||||
|
"""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from saltybot_tts_personality.tts_personality_node import PersonalityContext
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
|
||||||
|
def test_personality_context_emotion():
|
||||||
|
"""Test emotion state management."""
|
||||||
|
context = PersonalityContext()
|
||||||
|
assert context.current_emotion == "neutral"
|
||||||
|
|
||||||
|
context.update_emotion("happy")
|
||||||
|
assert context.current_emotion == "happy"
|
||||||
|
|
||||||
|
context.update_emotion("sad")
|
||||||
|
assert context.current_emotion == "sad"
|
||||||
|
|
||||||
|
|
||||||
|
def test_personality_context_person():
|
||||||
|
"""Test person tracking."""
|
||||||
|
context = PersonalityContext()
|
||||||
|
assert context.last_person is None
|
||||||
|
|
||||||
|
context.update_person("Alice")
|
||||||
|
assert context.last_person == "Alice"
|
||||||
|
|
||||||
|
context.update_person("Bob")
|
||||||
|
assert context.last_person == "Bob"
|
||||||
|
|
||||||
|
|
||||||
|
def test_time_period_classification():
|
||||||
|
"""Test time period classification."""
|
||||||
|
context = PersonalityContext()
|
||||||
|
|
||||||
|
# These tests depend on current time, so we just verify the function works
|
||||||
|
period = context.get_time_period()
|
||||||
|
assert period in ["morning", "afternoon", "evening", "night"]
|
||||||
|
|
||||||
|
|
||||||
|
def test_personality_context_timestamps():
|
||||||
|
"""Test timestamp tracking for context validity."""
|
||||||
|
context = PersonalityContext()
|
||||||
|
import time
|
||||||
|
|
||||||
|
ts_before = context.emotion_timestamp
|
||||||
|
time.sleep(0.01)
|
||||||
|
context.update_emotion("happy")
|
||||||
|
ts_after = context.emotion_timestamp
|
||||||
|
|
||||||
|
assert ts_after > ts_before
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
pytest.main([__file__, "-v"])
|
||||||
Loading…
x
Reference in New Issue
Block a user