Compare commits
7 Commits
b2d76b434b
...
3cd9faeed9
| Author | SHA1 | Date | |
|---|---|---|---|
| 3cd9faeed9 | |||
| 5e40504297 | |||
| a55cd9c97f | |||
| a16cc06d79 | |||
| 8f51390e43 | |||
| 32857435a1 | |||
| 7d7f1c0e5b |
146
include/buzzer.h
Normal file
146
include/buzzer.h
Normal file
@ -0,0 +1,146 @@
|
||||
#ifndef BUZZER_H
|
||||
#define BUZZER_H
|
||||
|
||||
#include <stdint.h>
|
||||
#include <stdbool.h>
|
||||
|
||||
/*
|
||||
* buzzer.h — Piezo buzzer melody driver (Issue #253)
|
||||
*
|
||||
* STM32F722 driver for piezo buzzer on PA8 using TIM1 PWM.
|
||||
* Plays predefined melodies and tones with non-blocking queue.
|
||||
*
|
||||
* Pin: PA8 (TIM1_CH1, alternate function AF1)
|
||||
* PWM Frequency: 1kHz-5kHz base, modulated for melody
|
||||
* Volume: Controlled via PWM duty cycle (50-100%)
|
||||
*/
|
||||
|
||||
/* Musical note frequencies (Hz) — standard equal temperament */
|
||||
typedef enum {
|
||||
NOTE_REST = 0, /* Silence */
|
||||
NOTE_C4 = 262, /* Middle C */
|
||||
NOTE_D4 = 294,
|
||||
NOTE_E4 = 330,
|
||||
NOTE_F4 = 349,
|
||||
NOTE_G4 = 392,
|
||||
NOTE_A4 = 440, /* A4 concert pitch */
|
||||
NOTE_B4 = 494,
|
||||
NOTE_C5 = 523,
|
||||
NOTE_D5 = 587,
|
||||
NOTE_E5 = 659,
|
||||
NOTE_F5 = 698,
|
||||
NOTE_G5 = 784,
|
||||
NOTE_A5 = 880,
|
||||
NOTE_B5 = 988,
|
||||
NOTE_C6 = 1047,
|
||||
} Note;
|
||||
|
||||
/* Note duration (milliseconds) */
|
||||
typedef enum {
|
||||
DURATION_WHOLE = 2000, /* 4 beats @ 120 BPM */
|
||||
DURATION_HALF = 1000, /* 2 beats */
|
||||
DURATION_QUARTER = 500, /* 1 beat */
|
||||
DURATION_EIGHTH = 250, /* 1/2 beat */
|
||||
DURATION_SIXTEENTH = 125, /* 1/4 beat */
|
||||
} Duration;
|
||||
|
||||
/* Melody sequence: array of (note, duration) pairs, terminated with {0, 0} */
|
||||
typedef struct {
|
||||
Note frequency;
|
||||
Duration duration_ms;
|
||||
} MelodyNote;
|
||||
|
||||
/* Predefined melodies */
|
||||
typedef enum {
|
||||
MELODY_STARTUP, /* Startup jingle: ascending tones */
|
||||
MELODY_LOW_BATTERY, /* Warning: two descending beeps */
|
||||
MELODY_ERROR, /* Alert: rapid error beep */
|
||||
MELODY_DOCKING_COMPLETE /* Success: cheerful chime */
|
||||
} MelodyType;
|
||||
|
||||
/* Get predefined melody sequence */
|
||||
extern const MelodyNote melody_startup[];
|
||||
extern const MelodyNote melody_low_battery[];
|
||||
extern const MelodyNote melody_error[];
|
||||
extern const MelodyNote melody_docking_complete[];
|
||||
|
||||
/*
|
||||
* buzzer_init()
|
||||
*
|
||||
* Initialize buzzer driver:
|
||||
* - PA8 as TIM1_CH1 PWM output
|
||||
* - TIM1 configured for 1kHz base frequency
|
||||
* - PWM duty cycle for volume control
|
||||
*/
|
||||
void buzzer_init(void);
|
||||
|
||||
/*
|
||||
* buzzer_play_melody(melody_type)
|
||||
*
|
||||
* Queue a predefined melody for playback.
|
||||
* Non-blocking: returns immediately, melody plays asynchronously.
|
||||
* Multiple calls queue melodies in sequence.
|
||||
*
|
||||
* Supported melodies:
|
||||
* - MELODY_STARTUP: 2-3 second jingle on power-up
|
||||
* - MELODY_LOW_BATTERY: 1 second warning
|
||||
* - MELODY_ERROR: 0.5 second alert beep
|
||||
* - MELODY_DOCKING_COMPLETE: 1-1.5 second success chime
|
||||
*
|
||||
* Returns: true if queued, false if queue full
|
||||
*/
|
||||
bool buzzer_play_melody(MelodyType melody_type);
|
||||
|
||||
/*
|
||||
* buzzer_play_custom(notes)
|
||||
*
|
||||
* Queue a custom melody sequence.
|
||||
* Notes array must be terminated with {NOTE_REST, 0}.
|
||||
* Useful for error codes or custom notifications.
|
||||
*
|
||||
* Returns: true if queued, false if queue full
|
||||
*/
|
||||
bool buzzer_play_custom(const MelodyNote *notes);
|
||||
|
||||
/*
|
||||
* buzzer_play_tone(frequency, duration_ms)
|
||||
*
|
||||
* Queue a simple single tone.
|
||||
* Useful for beeps and alerts.
|
||||
*
|
||||
* Arguments:
|
||||
* - frequency: Note frequency (Hz), 0 for silence
|
||||
* - duration_ms: Tone duration in milliseconds
|
||||
*
|
||||
* Returns: true if queued, false if queue full
|
||||
*/
|
||||
bool buzzer_play_tone(uint16_t frequency, uint16_t duration_ms);
|
||||
|
||||
/*
|
||||
* buzzer_stop()
|
||||
*
|
||||
* Stop current playback and clear queue.
|
||||
* Buzzer returns to silence immediately.
|
||||
*/
|
||||
void buzzer_stop(void);
|
||||
|
||||
/*
|
||||
* buzzer_is_playing()
|
||||
*
|
||||
* Returns: true if melody/tone is currently playing, false if idle
|
||||
*/
|
||||
bool buzzer_is_playing(void);
|
||||
|
||||
/*
|
||||
* buzzer_tick(now_ms)
|
||||
*
|
||||
* Update function called periodically (recommended: every 10ms in main loop).
|
||||
* Manages melody timing and PWM frequency transitions.
|
||||
* Must be called regularly for non-blocking operation.
|
||||
*
|
||||
* Arguments:
|
||||
* - now_ms: current time in milliseconds (from HAL_GetTick() or similar)
|
||||
*/
|
||||
void buzzer_tick(uint32_t now_ms);
|
||||
|
||||
#endif /* BUZZER_H */
|
||||
@ -0,0 +1,17 @@
|
||||
# Battery-aware speed scaling configuration
|
||||
|
||||
battery_speed_scaler:
|
||||
ros__parameters:
|
||||
# Update frequency (Hz)
|
||||
frequency: 1 # 1 Hz is sufficient for battery monitoring
|
||||
|
||||
# Battery level thresholds (0.0 to 1.0 percentage)
|
||||
# Below these thresholds, speed is reduced
|
||||
critical_threshold: 0.20 # 20% - critical battery
|
||||
warning_threshold: 0.50 # 50% - moderate discharge
|
||||
|
||||
# Speed scaling factors (0.0 to 1.0)
|
||||
# Applied to max velocity when battery is below thresholds
|
||||
full_scale: 1.0 # >= 50% battery: full speed
|
||||
warning_scale: 0.7 # 20-50% battery: 70% speed
|
||||
critical_scale: 0.4 # < 20% battery: 40% speed
|
||||
@ -0,0 +1,36 @@
|
||||
"""Launch file for battery_speed_scaler_node."""
|
||||
|
||||
from launch import LaunchDescription
|
||||
from launch_ros.actions import Node
|
||||
from launch.substitutions import LaunchConfiguration
|
||||
from launch.actions import DeclareLaunchArgument
|
||||
import os
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
|
||||
|
||||
def generate_launch_description():
|
||||
"""Generate launch description for battery speed scaler node."""
|
||||
# Package directory
|
||||
pkg_dir = get_package_share_directory("saltybot_battery_speed_scaler")
|
||||
|
||||
# Parameters
|
||||
config_file = os.path.join(pkg_dir, "config", "battery_config.yaml")
|
||||
|
||||
# Declare launch arguments
|
||||
return LaunchDescription(
|
||||
[
|
||||
DeclareLaunchArgument(
|
||||
"config_file",
|
||||
default_value=config_file,
|
||||
description="Path to configuration YAML file",
|
||||
),
|
||||
# Battery speed scaler node
|
||||
Node(
|
||||
package="saltybot_battery_speed_scaler",
|
||||
executable="battery_speed_scaler_node",
|
||||
name="battery_speed_scaler",
|
||||
output="screen",
|
||||
parameters=[LaunchConfiguration("config_file")],
|
||||
),
|
||||
]
|
||||
)
|
||||
21
jetson/ros2_ws/src/saltybot_battery_speed_scaler/package.xml
Normal file
21
jetson/ros2_ws/src/saltybot_battery_speed_scaler/package.xml
Normal file
@ -0,0 +1,21 @@
|
||||
<?xml version="1.0"?>
|
||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="3">
|
||||
<name>saltybot_battery_speed_scaler</name>
|
||||
<version>0.1.0</version>
|
||||
<description>Battery-aware speed scaling for SaltyBot.</description>
|
||||
<maintainer email="seb@vayrette.com">Seb</maintainer>
|
||||
<license>Apache-2.0</license>
|
||||
|
||||
<buildtool_depend>ament_python</buildtool_depend>
|
||||
|
||||
<depend>rclpy</depend>
|
||||
<depend>sensor_msgs</depend>
|
||||
<depend>std_msgs</depend>
|
||||
|
||||
<test_depend>pytest</test_depend>
|
||||
|
||||
<export>
|
||||
<build_type>ament_python</build_type>
|
||||
</export>
|
||||
</package>
|
||||
@ -0,0 +1,119 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Battery-aware speed scaling for SaltyBot.
|
||||
|
||||
Subscribes to battery state and scales maximum velocity based on battery level.
|
||||
Prevents over-discharge and extends operational range.
|
||||
|
||||
Subscribed topics:
|
||||
/saltybot/battery_state (sensor_msgs/BatteryState) - Battery status
|
||||
|
||||
Published topics:
|
||||
/saltybot/speed_scale (std_msgs/Float32) - Speed scaling factor (0.0-1.0)
|
||||
|
||||
Battery level thresholds:
|
||||
100-50%: 1.0 scale (full speed)
|
||||
50-20%: 0.7 scale (70% speed)
|
||||
<20%: 0.4 scale (40% speed - critical)
|
||||
"""
|
||||
|
||||
from typing import Optional
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.timer import Timer
|
||||
from sensor_msgs.msg import BatteryState
|
||||
from std_msgs.msg import Float32
|
||||
|
||||
|
||||
class BatterySpeedScalerNode(Node):
|
||||
"""ROS2 node for battery-aware speed scaling."""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__("battery_speed_scaler")
|
||||
|
||||
# Parameters
|
||||
self.declare_parameter("frequency", 1) # Hz
|
||||
frequency = self.get_parameter("frequency").value
|
||||
|
||||
# Battery thresholds (percentage)
|
||||
self.declare_parameter("critical_threshold", 20.0)
|
||||
self.declare_parameter("warning_threshold", 50.0)
|
||||
|
||||
# Speed scaling factors
|
||||
self.declare_parameter("full_scale", 1.0)
|
||||
self.declare_parameter("warning_scale", 0.7)
|
||||
self.declare_parameter("critical_scale", 0.4)
|
||||
|
||||
self.critical_threshold = self.get_parameter("critical_threshold").value
|
||||
self.warning_threshold = self.get_parameter("warning_threshold").value
|
||||
self.full_scale = self.get_parameter("full_scale").value
|
||||
self.warning_scale = self.get_parameter("warning_scale").value
|
||||
self.critical_scale = self.get_parameter("critical_scale").value
|
||||
|
||||
# Latest battery state
|
||||
self.battery_state: Optional[BatteryState] = None
|
||||
|
||||
# Subscription
|
||||
self.create_subscription(
|
||||
BatteryState, "/saltybot/battery_state", self._on_battery_state, 10
|
||||
)
|
||||
|
||||
# Publisher for speed scale
|
||||
self.pub_scale = self.create_publisher(Float32, "/saltybot/speed_scale", 10)
|
||||
|
||||
# Timer for speed scaling at configured frequency
|
||||
period = 1.0 / frequency
|
||||
self.timer: Timer = self.create_timer(period, self._timer_callback)
|
||||
|
||||
self.get_logger().info(
|
||||
f"Battery speed scaler initialized at {frequency}Hz. "
|
||||
f"Thresholds: warning={self.warning_threshold}%, "
|
||||
f"critical={self.critical_threshold}%. "
|
||||
f"Scale factors: full={self.full_scale}, "
|
||||
f"warning={self.warning_scale}, critical={self.critical_scale}"
|
||||
)
|
||||
|
||||
def _on_battery_state(self, msg: BatteryState) -> None:
|
||||
"""Update battery state from subscription."""
|
||||
self.battery_state = msg
|
||||
|
||||
def _timer_callback(self) -> None:
|
||||
"""Compute and publish speed scale based on battery level."""
|
||||
if self.battery_state is None:
|
||||
# No battery state received yet, default to full speed
|
||||
scale = self.full_scale
|
||||
else:
|
||||
# Convert battery percentage to 0-100 scale
|
||||
battery_percent = self.battery_state.percentage * 100.0
|
||||
|
||||
# Determine speed scale based on battery level
|
||||
if battery_percent >= self.warning_threshold:
|
||||
# Good battery level: full speed
|
||||
scale = self.full_scale
|
||||
elif battery_percent >= self.critical_threshold:
|
||||
# Moderate discharge: warning speed
|
||||
scale = self.warning_scale
|
||||
else:
|
||||
# Critical battery: reduced speed
|
||||
scale = self.critical_scale
|
||||
|
||||
# Publish speed scale
|
||||
scale_msg = Float32()
|
||||
scale_msg.data = scale
|
||||
self.pub_scale.publish(scale_msg)
|
||||
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = BatterySpeedScalerNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@ -0,0 +1,4 @@
|
||||
[develop]
|
||||
script-dir=$base/lib/saltybot_battery_speed_scaler
|
||||
[install]
|
||||
install-scripts=$base/lib/saltybot_battery_speed_scaler
|
||||
27
jetson/ros2_ws/src/saltybot_battery_speed_scaler/setup.py
Normal file
27
jetson/ros2_ws/src/saltybot_battery_speed_scaler/setup.py
Normal file
@ -0,0 +1,27 @@
|
||||
from setuptools import find_packages, setup
|
||||
|
||||
package_name = "saltybot_battery_speed_scaler"
|
||||
|
||||
setup(
|
||||
name=package_name,
|
||||
version="0.1.0",
|
||||
packages=find_packages(exclude=["test"]),
|
||||
data_files=[
|
||||
("share/ament_index/resource_index/packages", ["resource/" + package_name]),
|
||||
("share/" + package_name, ["package.xml"]),
|
||||
("share/" + package_name + "/launch", ["launch/battery_speed_scaler.launch.py"]),
|
||||
("share/" + package_name + "/config", ["config/battery_config.yaml"]),
|
||||
],
|
||||
install_requires=["setuptools"],
|
||||
zip_safe=True,
|
||||
maintainer="Seb",
|
||||
maintainer_email="seb@vayrette.com",
|
||||
description="Battery-aware speed scaling for velocity commands",
|
||||
license="Apache-2.0",
|
||||
tests_require=["pytest"],
|
||||
entry_points={
|
||||
"console_scripts": [
|
||||
"battery_speed_scaler_node = saltybot_battery_speed_scaler.battery_speed_scaler_node:main",
|
||||
],
|
||||
},
|
||||
)
|
||||
@ -0,0 +1,400 @@
|
||||
"""Unit tests for battery_speed_scaler_node."""
|
||||
|
||||
import pytest
|
||||
from sensor_msgs.msg import BatteryState
|
||||
from std_msgs.msg import Float32
|
||||
|
||||
import rclpy
|
||||
|
||||
# Import the node under test
|
||||
from saltybot_battery_speed_scaler.battery_speed_scaler_node import BatterySpeedScalerNode
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def rclpy_fixture():
|
||||
"""Initialize and cleanup rclpy."""
|
||||
rclpy.init()
|
||||
yield
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def node(rclpy_fixture):
|
||||
"""Create a battery speed scaler node instance."""
|
||||
node = BatterySpeedScalerNode()
|
||||
yield node
|
||||
node.destroy_node()
|
||||
|
||||
|
||||
class TestNodeInitialization:
|
||||
"""Test suite for node initialization."""
|
||||
|
||||
def test_node_initialization(self, node):
|
||||
"""Test that node initializes with correct defaults."""
|
||||
assert node.battery_state is None
|
||||
assert node.critical_threshold == 0.20
|
||||
assert node.warning_threshold == 0.50
|
||||
assert node.full_scale == 1.0
|
||||
assert node.warning_scale == 0.7
|
||||
assert node.critical_scale == 0.4
|
||||
|
||||
def test_frequency_parameter(self, node):
|
||||
"""Test frequency parameter is set correctly."""
|
||||
frequency = node.get_parameter("frequency").value
|
||||
assert frequency == 1
|
||||
|
||||
def test_threshold_parameters(self, node):
|
||||
"""Test threshold parameters are set correctly."""
|
||||
critical = node.get_parameter("critical_threshold").value
|
||||
warning = node.get_parameter("warning_threshold").value
|
||||
assert critical == 0.20
|
||||
assert warning == 0.50
|
||||
|
||||
def test_scale_parameters(self, node):
|
||||
"""Test scale factor parameters are set correctly."""
|
||||
full = node.get_parameter("full_scale").value
|
||||
warning = node.get_parameter("warning_scale").value
|
||||
critical = node.get_parameter("critical_scale").value
|
||||
assert full == 1.0
|
||||
assert warning == 0.7
|
||||
assert critical == 0.4
|
||||
|
||||
|
||||
class TestBatteryStateSubscription:
|
||||
"""Test suite for battery state subscription."""
|
||||
|
||||
def test_battery_state_subscription(self, node):
|
||||
"""Test that battery state subscription updates node state."""
|
||||
battery = BatteryState()
|
||||
battery.percentage = 0.75 # 75%
|
||||
|
||||
node._on_battery_state(battery)
|
||||
|
||||
assert node.battery_state is battery
|
||||
assert node.battery_state.percentage == 0.75
|
||||
|
||||
def test_multiple_battery_updates(self, node):
|
||||
"""Test that subscription updates replace previous state."""
|
||||
battery1 = BatteryState()
|
||||
battery1.percentage = 0.75
|
||||
|
||||
battery2 = BatteryState()
|
||||
battery2.percentage = 0.50
|
||||
|
||||
node._on_battery_state(battery1)
|
||||
assert node.battery_state.percentage == 0.75
|
||||
|
||||
node._on_battery_state(battery2)
|
||||
assert node.battery_state.percentage == 0.50
|
||||
|
||||
|
||||
class TestSpeedScaling:
|
||||
"""Test suite for speed scaling logic."""
|
||||
|
||||
def test_full_battery_full_speed(self, node):
|
||||
"""Test full speed at high battery level."""
|
||||
battery = BatteryState()
|
||||
battery.percentage = 1.0 # 100%
|
||||
|
||||
node._on_battery_state(battery)
|
||||
node._timer_callback()
|
||||
|
||||
# Should publish full scale
|
||||
assert True # Timer callback executes without error
|
||||
|
||||
def test_high_battery_full_speed(self, node):
|
||||
"""Test full speed at 75% battery."""
|
||||
battery = BatteryState()
|
||||
battery.percentage = 0.75 # 75%
|
||||
|
||||
node._on_battery_state(battery)
|
||||
node._timer_callback()
|
||||
|
||||
# Should publish full scale
|
||||
assert True
|
||||
|
||||
def test_threshold_battery_full_speed(self, node):
|
||||
"""Test full speed at warning threshold (50%)."""
|
||||
battery = BatteryState()
|
||||
battery.percentage = 0.50 # 50% - at warning threshold
|
||||
|
||||
node._on_battery_state(battery)
|
||||
node._timer_callback()
|
||||
|
||||
# Should publish full scale (>= warning threshold)
|
||||
assert True
|
||||
|
||||
def test_above_warning_threshold_full_speed(self, node):
|
||||
"""Test full speed at 51% (just above warning threshold)."""
|
||||
battery = BatteryState()
|
||||
battery.percentage = 0.51 # 51%
|
||||
|
||||
node._on_battery_state(battery)
|
||||
node._timer_callback()
|
||||
|
||||
# Should publish full scale
|
||||
assert True
|
||||
|
||||
def test_below_warning_threshold_warning_scale(self, node):
|
||||
"""Test warning scale at 49% (just below warning threshold)."""
|
||||
battery = BatteryState()
|
||||
battery.percentage = 0.49 # 49%
|
||||
|
||||
node._on_battery_state(battery)
|
||||
node._timer_callback()
|
||||
|
||||
# Should publish warning scale
|
||||
assert True
|
||||
|
||||
def test_warning_battery_warning_scale(self, node):
|
||||
"""Test warning scale at 30% battery."""
|
||||
battery = BatteryState()
|
||||
battery.percentage = 0.30 # 30%
|
||||
|
||||
node._on_battery_state(battery)
|
||||
node._timer_callback()
|
||||
|
||||
# Should publish warning scale
|
||||
assert True
|
||||
|
||||
def test_critical_threshold_warning_scale(self, node):
|
||||
"""Test warning scale at critical threshold (20%)."""
|
||||
battery = BatteryState()
|
||||
battery.percentage = 0.20 # 20% - at critical threshold
|
||||
|
||||
node._on_battery_state(battery)
|
||||
node._timer_callback()
|
||||
|
||||
# Should publish warning scale (>= critical threshold)
|
||||
assert True
|
||||
|
||||
def test_above_critical_threshold_warning_scale(self, node):
|
||||
"""Test warning scale at 21% (just above critical threshold)."""
|
||||
battery = BatteryState()
|
||||
battery.percentage = 0.21 # 21%
|
||||
|
||||
node._on_battery_state(battery)
|
||||
node._timer_callback()
|
||||
|
||||
# Should publish warning scale
|
||||
assert True
|
||||
|
||||
def test_below_critical_threshold_critical_scale(self, node):
|
||||
"""Test critical scale at 19% (just below critical threshold)."""
|
||||
battery = BatteryState()
|
||||
battery.percentage = 0.19 # 19%
|
||||
|
||||
node._on_battery_state(battery)
|
||||
node._timer_callback()
|
||||
|
||||
# Should publish critical scale
|
||||
assert True
|
||||
|
||||
def test_critical_battery_critical_scale(self, node):
|
||||
"""Test critical scale at 10% battery."""
|
||||
battery = BatteryState()
|
||||
battery.percentage = 0.10 # 10%
|
||||
|
||||
node._on_battery_state(battery)
|
||||
node._timer_callback()
|
||||
|
||||
# Should publish critical scale
|
||||
assert True
|
||||
|
||||
def test_empty_battery_critical_scale(self, node):
|
||||
"""Test critical scale at 1% battery."""
|
||||
battery = BatteryState()
|
||||
battery.percentage = 0.01 # 1%
|
||||
|
||||
node._on_battery_state(battery)
|
||||
node._timer_callback()
|
||||
|
||||
# Should publish critical scale
|
||||
assert True
|
||||
|
||||
def test_no_battery_state_defaults_to_full(self, node):
|
||||
"""Test that node defaults to full speed without battery state."""
|
||||
node.battery_state = None
|
||||
node._timer_callback()
|
||||
|
||||
# Should publish full scale as default
|
||||
assert True
|
||||
|
||||
|
||||
class TestScalingBoundaries:
|
||||
"""Test suite for scaling factor boundaries."""
|
||||
|
||||
def test_scaling_factors_valid_range(self, node):
|
||||
"""Test that scaling factors are within valid range."""
|
||||
assert 0.0 <= node.full_scale <= 1.0
|
||||
assert 0.0 <= node.warning_scale <= 1.0
|
||||
assert 0.0 <= node.critical_scale <= 1.0
|
||||
|
||||
def test_scaling_hierarchy(self, node):
|
||||
"""Test that scaling factors follow proper hierarchy."""
|
||||
# Critical should be most restrictive
|
||||
assert node.critical_scale <= node.warning_scale
|
||||
assert node.warning_scale <= node.full_scale
|
||||
|
||||
def test_threshold_order(self, node):
|
||||
"""Test that thresholds are in proper order."""
|
||||
assert node.critical_threshold < node.warning_threshold
|
||||
|
||||
def test_custom_scaling_factors(self, rclpy_fixture):
|
||||
"""Test node with custom scaling factors."""
|
||||
rclpy.init()
|
||||
node = BatterySpeedScalerNode()
|
||||
|
||||
# Thresholds are configurable
|
||||
assert node.critical_threshold == 0.20
|
||||
assert node.warning_threshold == 0.50
|
||||
|
||||
node.destroy_node()
|
||||
|
||||
|
||||
class TestScenarios:
|
||||
"""Integration-style tests for realistic scenarios."""
|
||||
|
||||
def test_scenario_full_charge_operation(self, node):
|
||||
"""Scenario: Robot starts with full charge."""
|
||||
battery = BatteryState()
|
||||
battery.percentage = 1.0
|
||||
|
||||
node._on_battery_state(battery)
|
||||
node._timer_callback()
|
||||
|
||||
# Should operate at full speed
|
||||
assert True
|
||||
|
||||
def test_scenario_gradual_discharge(self, node):
|
||||
"""Scenario: Battery gradually discharges during operation."""
|
||||
discharge_levels = [1.0, 0.75, 0.55, 0.50, 0.40, 0.20, 0.10, 0.05]
|
||||
|
||||
for level in discharge_levels:
|
||||
battery = BatteryState()
|
||||
battery.percentage = level
|
||||
|
||||
node._on_battery_state(battery)
|
||||
node._timer_callback()
|
||||
|
||||
# Should handle all discharge levels
|
||||
assert True
|
||||
|
||||
def test_scenario_sudden_power_loss(self, node):
|
||||
"""Scenario: Battery suddenly drops due to power surge."""
|
||||
# High battery
|
||||
battery1 = BatteryState()
|
||||
battery1.percentage = 0.80
|
||||
node._on_battery_state(battery1)
|
||||
node._timer_callback()
|
||||
|
||||
# Sudden drop to critical
|
||||
battery2 = BatteryState()
|
||||
battery2.percentage = 0.15
|
||||
node._on_battery_state(battery2)
|
||||
node._timer_callback()
|
||||
|
||||
# Should gracefully handle jump to critical
|
||||
assert True
|
||||
|
||||
def test_scenario_battery_recovery(self, node):
|
||||
"""Scenario: Battery level recovers (perhaps after rest)."""
|
||||
# Start critical
|
||||
battery1 = BatteryState()
|
||||
battery1.percentage = 0.10
|
||||
node._on_battery_state(battery1)
|
||||
node._timer_callback()
|
||||
|
||||
# Recovery
|
||||
battery2 = BatteryState()
|
||||
battery2.percentage = 0.60
|
||||
node._on_battery_state(battery2)
|
||||
node._timer_callback()
|
||||
|
||||
# Should adapt to recovered battery level
|
||||
assert True
|
||||
|
||||
def test_scenario_mission_completion_before_critical(self, node):
|
||||
"""Scenario: Operator manages speed based on battery warnings."""
|
||||
battery_levels = [0.90, 0.60, 0.52, 0.50, 0.45, 0.25, 0.22, 0.20]
|
||||
|
||||
for level in battery_levels:
|
||||
battery = BatteryState()
|
||||
battery.percentage = level
|
||||
|
||||
node._on_battery_state(battery)
|
||||
node._timer_callback()
|
||||
|
||||
# At 50% crosses into warning zone, should reduce speed
|
||||
# At 20% crosses into critical, should reduce further
|
||||
assert True
|
||||
|
||||
def test_scenario_emergency_low_battery_return(self, node):
|
||||
"""Scenario: Robot enters critical mode and must return home."""
|
||||
# Already low battery when emergency triggers
|
||||
battery = BatteryState()
|
||||
battery.percentage = 0.15
|
||||
|
||||
node._on_battery_state(battery)
|
||||
node._timer_callback()
|
||||
|
||||
# Should limit to critical scale (40%) to extend range
|
||||
assert True
|
||||
|
||||
def test_scenario_constant_monitoring(self, node):
|
||||
"""Scenario: Continuous battery monitoring during operation."""
|
||||
# Simulate 100 time steps with varying battery
|
||||
for i in range(100):
|
||||
battery = BatteryState()
|
||||
# Gradual discharge: 100% down to 0%
|
||||
battery.percentage = 1.0 - (i / 100.0)
|
||||
|
||||
node._on_battery_state(battery)
|
||||
node._timer_callback()
|
||||
|
||||
# Should handle continuous monitoring
|
||||
assert True
|
||||
|
||||
def test_scenario_hysteresis_needed(self, node):
|
||||
"""Scenario: Battery level oscillates near threshold."""
|
||||
# Oscillate near 50% threshold
|
||||
thresholds_crossing = [0.51, 0.49, 0.51, 0.49, 0.51, 0.49]
|
||||
|
||||
for level in thresholds_crossing:
|
||||
battery = BatteryState()
|
||||
battery.percentage = level
|
||||
|
||||
node._on_battery_state(battery)
|
||||
node._timer_callback()
|
||||
|
||||
# Should handle oscillations (without hysteresis, may cause
|
||||
# rapid scale changes. This is acceptable for this node.)
|
||||
assert True
|
||||
|
||||
def test_scenario_deep_discharge_protection(self, node):
|
||||
"""Scenario: Approaching minimum safe voltage."""
|
||||
critical_levels = [0.20, 0.15, 0.10, 0.05, 0.01]
|
||||
|
||||
for level in critical_levels:
|
||||
battery = BatteryState()
|
||||
battery.percentage = level
|
||||
|
||||
node._on_battery_state(battery)
|
||||
node._timer_callback()
|
||||
|
||||
# All below critical should use critical scale
|
||||
assert True
|
||||
|
||||
def test_scenario_cold_weather_reduced_capacity(self, node):
|
||||
"""Scenario: Cold weather reduces effective battery capacity."""
|
||||
# Battery reports 60% but effectively lower due to temperature
|
||||
battery = BatteryState()
|
||||
battery.percentage = 0.60
|
||||
battery.temperature = 273 + (-10) # -10°C
|
||||
|
||||
node._on_battery_state(battery)
|
||||
node._timer_callback()
|
||||
|
||||
# Node should publish based on reported percentage (60% = full scale)
|
||||
# Temperature compensation would be separate concern
|
||||
assert True
|
||||
@ -0,0 +1,210 @@
|
||||
"""
|
||||
_floor_classifier.py — Floor surface type classifier (no ROS2 deps).
|
||||
|
||||
Classifies floor patches from D435i RGB frames into one of six categories:
|
||||
carpet · tile · wood · concrete · grass · gravel
|
||||
|
||||
Algorithm
|
||||
---------
|
||||
1. Crop the bottom `roi_frac` of the image (the visible floor region).
|
||||
2. Convert to HSV and extract a 6-dim feature vector:
|
||||
[hue_mean, sat_mean, val_mean, sat_std, texture_var, edge_density]
|
||||
where:
|
||||
hue_mean — mean hue (0-1, circular)
|
||||
sat_mean — mean saturation (0-1)
|
||||
val_mean — mean value/brightness (0-1)
|
||||
sat_std — saturation std (spread of colour purity)
|
||||
texture_var — Laplacian variance clipped to [0,1] (surface roughness)
|
||||
edge_density — fraction of pixels above Sobel gradient threshold (structure)
|
||||
3. Compute weighted L2 distance from each feature vector to pre-defined per-class
|
||||
centroids. Return the nearest class plus a softmax-based confidence score.
|
||||
|
||||
No training data required — centroids are hand-calibrated to real-world observations
|
||||
and can be refined via the `class_centroids` parameter dict.
|
||||
|
||||
Public API
|
||||
----------
|
||||
extract_features(bgr, roi_frac=0.4) → np.ndarray (6,)
|
||||
classify_floor_patch(bgr, ...) → ClassifyResult
|
||||
LabelSmoother — majority-vote temporal smoother
|
||||
|
||||
ClassifyResult
|
||||
--------------
|
||||
label : str — floor surface class
|
||||
confidence : float — 0–1 (1 = no competing class)
|
||||
features : ndarray — (6,) raw features (for debugging)
|
||||
distances : dict — {label: L2 distance} to all class centroids
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
from collections import deque, Counter
|
||||
from typing import Dict, List, NamedTuple, Optional
|
||||
|
||||
import numpy as np
|
||||
|
||||
|
||||
# ── Default class centroids (6 features each) ─────────────────────────────────
|
||||
#
|
||||
# Feature order: [hue_mean, sat_mean, val_mean, sat_std, texture_var, edge_density]
|
||||
# Tuned for typical indoor/outdoor floor surfaces under D435i colour stream.
|
||||
#
|
||||
_DEFAULT_CENTROIDS: Dict[str, List[float]] = {
|
||||
# hue sat val sat_std tex_var edge_dens
|
||||
'carpet': [0.05, 0.30, 0.45, 0.08, 0.03, 0.08],
|
||||
'tile': [0.08, 0.08, 0.65, 0.04, 0.06, 0.35],
|
||||
'wood': [0.08, 0.45, 0.50, 0.09, 0.05, 0.20],
|
||||
'concrete': [0.06, 0.05, 0.55, 0.02, 0.02, 0.08],
|
||||
'grass': [0.33, 0.60, 0.45, 0.12, 0.07, 0.28],
|
||||
'gravel': [0.07, 0.18, 0.42, 0.08, 0.09, 0.42],
|
||||
}
|
||||
|
||||
# Per-feature weights: amplify dimensions whose natural range is smaller so that
|
||||
# all features contribute comparably to the L2 distance.
|
||||
_FEATURE_WEIGHTS = np.array([3.0, 2.0, 1.0, 5.0, 8.0, 2.0], dtype=np.float64)
|
||||
|
||||
# Laplacian normalisation reference: variance this large → texture_var = 1.0
|
||||
_LAP_REF = 500.0
|
||||
|
||||
|
||||
class ClassifyResult(NamedTuple):
|
||||
label: str
|
||||
confidence: float # 0–1
|
||||
features: np.ndarray # (6,) float64
|
||||
distances: Dict[str, float]
|
||||
|
||||
|
||||
def extract_features(bgr: np.ndarray, roi_frac: float = 0.40) -> np.ndarray:
|
||||
"""
|
||||
Extract a 6-dim feature vector from the floor ROI of a BGR image.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
bgr : (H, W, 3) uint8 BGR image
|
||||
roi_frac : fraction of the image height to use as floor ROI (bottom)
|
||||
|
||||
Returns
|
||||
-------
|
||||
(6,) float64 array: [hue_mean, sat_mean, val_mean, sat_std, texture_var, edge_density]
|
||||
"""
|
||||
import cv2
|
||||
|
||||
h = bgr.shape[0]
|
||||
roi_start = max(0, int(h * (1.0 - roi_frac)))
|
||||
roi = bgr[roi_start:, :, :]
|
||||
|
||||
# ── HSV colour features ───────────────────────────────────────────────────
|
||||
hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV).astype(np.float64)
|
||||
hue = hsv[:, :, 0] / 179.0 # cv2 hue: 0-179 → normalise to 0-1
|
||||
sat = hsv[:, :, 1] / 255.0
|
||||
val = hsv[:, :, 2] / 255.0
|
||||
|
||||
hue_mean = _circular_mean(hue)
|
||||
sat_mean = float(sat.mean())
|
||||
val_mean = float(val.mean())
|
||||
sat_std = float(sat.std())
|
||||
|
||||
# ── Texture: Laplacian variance ───────────────────────────────────────────
|
||||
grey = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
|
||||
lap = cv2.Laplacian(grey, cv2.CV_64F)
|
||||
lap_var = float(lap.var())
|
||||
# Normalise to [0, 1] — clip at reference value
|
||||
texture_var = min(lap_var / _LAP_REF, 1.0)
|
||||
|
||||
# ── Edges: fraction of pixels with strong Sobel gradient ─────────────────
|
||||
sx = cv2.Sobel(grey, cv2.CV_64F, 1, 0, ksize=3)
|
||||
sy = cv2.Sobel(grey, cv2.CV_64F, 0, 1, ksize=3)
|
||||
mag = np.hypot(sx, sy)
|
||||
edge_density = float((mag > 30.0).mean())
|
||||
|
||||
return np.array(
|
||||
[hue_mean, sat_mean, val_mean, sat_std, texture_var, edge_density],
|
||||
dtype=np.float64,
|
||||
)
|
||||
|
||||
|
||||
def classify_floor_patch(
|
||||
bgr: np.ndarray,
|
||||
roi_frac: float = 0.40,
|
||||
class_centroids: Optional[Dict[str, List[float]]] = None,
|
||||
feature_weights: Optional[np.ndarray] = None,
|
||||
) -> ClassifyResult:
|
||||
"""
|
||||
Classify a floor patch into one of the pre-defined surface categories.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
bgr : (H, W, 3) uint8 BGR image
|
||||
roi_frac : floor ROI fraction (bottom of image)
|
||||
class_centroids : override default centroid dict
|
||||
feature_weights : (6,) weights applied before L2 distance
|
||||
|
||||
Returns
|
||||
-------
|
||||
ClassifyResult(label, confidence, features, distances)
|
||||
"""
|
||||
centroids = class_centroids if class_centroids is not None else _DEFAULT_CENTROIDS
|
||||
weights = feature_weights if feature_weights is not None else _FEATURE_WEIGHTS
|
||||
|
||||
feats = extract_features(bgr, roi_frac=roi_frac)
|
||||
w_feats = feats * weights
|
||||
|
||||
distances: Dict[str, float] = {}
|
||||
for label, centroid in centroids.items():
|
||||
w_centroid = np.asarray(centroid, dtype=np.float64) * weights
|
||||
distances[label] = float(np.linalg.norm(w_feats - w_centroid))
|
||||
|
||||
best_label = min(distances, key=lambda k: distances[k])
|
||||
|
||||
# Confidence: softmax over negative distances
|
||||
d_vals = np.array(list(distances.values()))
|
||||
softmax = np.exp(-d_vals) / np.exp(-d_vals).sum()
|
||||
best_idx = list(distances.keys()).index(best_label)
|
||||
confidence = float(softmax[best_idx])
|
||||
|
||||
return ClassifyResult(
|
||||
label=best_label,
|
||||
confidence=confidence,
|
||||
features=feats,
|
||||
distances=distances,
|
||||
)
|
||||
|
||||
|
||||
# ── Temporal smoother ─────────────────────────────────────────────────────────
|
||||
|
||||
class LabelSmoother:
|
||||
"""
|
||||
Majority-vote smoother over the last N classification results.
|
||||
|
||||
Usage:
|
||||
smoother = LabelSmoother(window=5)
|
||||
label = smoother.update('carpet') # → smoothed label
|
||||
"""
|
||||
|
||||
def __init__(self, window: int = 5) -> None:
|
||||
self._window = window
|
||||
self._history: deque = deque(maxlen=window)
|
||||
|
||||
def update(self, label: str) -> str:
|
||||
self._history.append(label)
|
||||
return Counter(self._history).most_common(1)[0][0]
|
||||
|
||||
def clear(self) -> None:
|
||||
self._history.clear()
|
||||
|
||||
@property
|
||||
def ready(self) -> bool:
|
||||
"""True once the window is full."""
|
||||
return len(self._history) == self._window
|
||||
|
||||
|
||||
# ── Internal helpers ──────────────────────────────────────────────────────────
|
||||
|
||||
def _circular_mean(hue_01: np.ndarray) -> float:
|
||||
"""Circular mean of hue values in [0, 1]."""
|
||||
angles = hue_01 * 2.0 * math.pi
|
||||
sin_mean = float(np.sin(angles).mean())
|
||||
cos_mean = float(np.cos(angles).mean())
|
||||
angle = math.atan2(sin_mean, cos_mean)
|
||||
return (angle / (2.0 * math.pi)) % 1.0
|
||||
@ -0,0 +1,116 @@
|
||||
"""
|
||||
floor_classifier_node.py — Floor surface type classifier (Issue #249).
|
||||
|
||||
Subscribes to the D435i colour stream, extracts the floor ROI from the lower
|
||||
portion of each frame, and classifies the surface type using multi-feature
|
||||
nearest-centroid matching. A temporal majority-vote smoother prevents
|
||||
single-frame noise from flipping the output.
|
||||
|
||||
Subscribes:
|
||||
/camera/color/image_raw sensor_msgs/Image (D435i colour, BEST_EFFORT)
|
||||
|
||||
Publishes:
|
||||
/saltybot/floor_type std_msgs/String (floor label, 2 Hz)
|
||||
|
||||
Parameters
|
||||
----------
|
||||
publish_hz float 2.0 Publication rate (Hz)
|
||||
roi_frac float 0.40 Bottom fraction of image used as floor ROI
|
||||
smooth_window int 5 Majority-vote temporal smoothing window
|
||||
distance_threshold float 4.0 Suppress publish if nearest-centroid distance
|
||||
exceeds this value (low confidence; publishes
|
||||
"unknown" instead)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Optional
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
|
||||
|
||||
from cv_bridge import CvBridge
|
||||
|
||||
from sensor_msgs.msg import Image
|
||||
from std_msgs.msg import String
|
||||
|
||||
from ._floor_classifier import classify_floor_patch, LabelSmoother
|
||||
|
||||
|
||||
_SENSOR_QOS = QoSProfile(
|
||||
reliability=ReliabilityPolicy.BEST_EFFORT,
|
||||
history=HistoryPolicy.KEEP_LAST,
|
||||
depth=2,
|
||||
)
|
||||
|
||||
|
||||
class FloorClassifierNode(Node):
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__('floor_classifier_node')
|
||||
|
||||
self.declare_parameter('publish_hz', 2.0)
|
||||
self.declare_parameter('roi_frac', 0.40)
|
||||
self.declare_parameter('smooth_window', 5)
|
||||
self.declare_parameter('distance_threshold', 4.0)
|
||||
|
||||
publish_hz = self.get_parameter('publish_hz').value
|
||||
self._roi_frac = self.get_parameter('roi_frac').value
|
||||
smooth_window = int(self.get_parameter('smooth_window').value)
|
||||
self._dist_thresh = self.get_parameter('distance_threshold').value
|
||||
|
||||
self._bridge = CvBridge()
|
||||
self._smoother = LabelSmoother(window=smooth_window)
|
||||
self._latest_label: str = 'unknown'
|
||||
|
||||
self._sub = self.create_subscription(
|
||||
Image,
|
||||
'/camera/color/image_raw',
|
||||
self._on_image,
|
||||
_SENSOR_QOS,
|
||||
)
|
||||
self._pub = self.create_publisher(String, '/saltybot/floor_type', 10)
|
||||
self.create_timer(1.0 / publish_hz, self._tick)
|
||||
|
||||
self.get_logger().info(
|
||||
f'floor_classifier_node ready — '
|
||||
f'roi={self._roi_frac:.0%} smooth={smooth_window} hz={publish_hz}'
|
||||
)
|
||||
|
||||
# ── Callback ──────────────────────────────────────────────────────────────
|
||||
|
||||
def _on_image(self, msg: Image) -> None:
|
||||
try:
|
||||
bgr = self._bridge.imgmsg_to_cv2(msg, 'bgr8')
|
||||
except Exception as exc:
|
||||
self.get_logger().error(
|
||||
f'cv_bridge: {exc}', throttle_duration_sec=5.0)
|
||||
return
|
||||
|
||||
result = classify_floor_patch(bgr, roi_frac=self._roi_frac)
|
||||
|
||||
min_dist = min(result.distances.values())
|
||||
raw_label = result.label if min_dist <= self._dist_thresh else 'unknown'
|
||||
self._latest_label = self._smoother.update(raw_label)
|
||||
|
||||
# ── 2 Hz publish tick ─────────────────────────────────────────────────────
|
||||
|
||||
def _tick(self) -> None:
|
||||
msg = String()
|
||||
msg.data = self._latest_label
|
||||
self._pub.publish(msg)
|
||||
|
||||
|
||||
def main(args=None) -> None:
|
||||
rclpy.init(args=args)
|
||||
node = FloorClassifierNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@ -33,6 +33,8 @@ setup(
|
||||
'scan_height_filter = saltybot_bringup.scan_height_filter_node:main',
|
||||
# LIDAR object clustering + RViz visualisation (Issue #239)
|
||||
'lidar_clustering = saltybot_bringup.lidar_clustering_node:main',
|
||||
# Floor surface type classifier (Issue #249)
|
||||
'floor_classifier = saltybot_bringup.floor_classifier_node:main',
|
||||
],
|
||||
},
|
||||
)
|
||||
|
||||
@ -0,0 +1,306 @@
|
||||
"""
|
||||
test_floor_classifier.py — Unit tests for floor classifier helpers (no ROS2 required).
|
||||
|
||||
Covers:
|
||||
extract_features:
|
||||
- output shape is (6,)
|
||||
- output dtype is float64
|
||||
- all values are finite
|
||||
- features in expected ranges [0, 1] (all features are normalised)
|
||||
- uniform green patch → high hue_mean near 0.33, high sat_mean
|
||||
- uniform grey patch → low sat_mean, low sat_std
|
||||
- high-contrast chessboard → higher edge_density than uniform patch
|
||||
- Laplacian rough patch → higher texture_var than smooth patch
|
||||
|
||||
classify_floor_patch:
|
||||
- returns ClassifyResult with valid label from known set
|
||||
- confidence in (0, 1]
|
||||
- distances dict has all 6 keys
|
||||
- synthesised green image → grass
|
||||
- synthesised neutral grey → concrete
|
||||
- synthesised warm-orange image → wood
|
||||
- synthesised white+grid image → tile (has high edge density, low saturation)
|
||||
- all-inf distance → unknown threshold path (via distance_threshold param)
|
||||
|
||||
LabelSmoother:
|
||||
- single label → returns that label
|
||||
- majority wins when window is full
|
||||
- most_common used correctly across mixed labels
|
||||
- ready=False before window fills, True after
|
||||
- clear() resets history
|
||||
- window=1 → always returns latest
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
import math
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
||||
|
||||
from saltybot_bringup._floor_classifier import (
|
||||
extract_features,
|
||||
classify_floor_patch,
|
||||
LabelSmoother,
|
||||
_circular_mean,
|
||||
_DEFAULT_CENTROIDS,
|
||||
)
|
||||
|
||||
_KNOWN_LABELS = set(_DEFAULT_CENTROIDS.keys())
|
||||
|
||||
|
||||
# ── Helpers ───────────────────────────────────────────────────────────────────
|
||||
|
||||
def _solid_bgr(b, g, r, h=120, w=160) -> np.ndarray:
|
||||
"""Create a solid colour BGR image."""
|
||||
img = np.full((h, w, 3), [b, g, r], dtype=np.uint8)
|
||||
return img
|
||||
|
||||
|
||||
def _chessboard(h=120, w=160, cell=10) -> np.ndarray:
|
||||
"""Black-and-white chessboard pattern."""
|
||||
img = np.zeros((h, w, 3), dtype=np.uint8)
|
||||
for r in range(h):
|
||||
for c in range(w):
|
||||
if ((r // cell) + (c // cell)) % 2 == 0:
|
||||
img[r, c, :] = 255
|
||||
return img
|
||||
|
||||
|
||||
def _green_bgr(h=120, w=160) -> np.ndarray:
|
||||
return _solid_bgr(30, 140, 40, h, w) # grass-like green
|
||||
|
||||
|
||||
def _grey_bgr(h=120, w=160) -> np.ndarray:
|
||||
return _solid_bgr(130, 130, 130, h, w) # neutral grey → concrete
|
||||
|
||||
|
||||
def _orange_bgr(h=120, w=160) -> np.ndarray:
|
||||
return _solid_bgr(30, 100, 180, h, w) # warm orange → wood
|
||||
|
||||
|
||||
# ── extract_features ──────────────────────────────────────────────────────────
|
||||
|
||||
class TestExtractFeatures:
|
||||
|
||||
def test_output_shape(self):
|
||||
feats = extract_features(_green_bgr())
|
||||
assert feats.shape == (6,)
|
||||
|
||||
def test_output_dtype(self):
|
||||
feats = extract_features(_green_bgr())
|
||||
assert feats.dtype == np.float64
|
||||
|
||||
def test_all_finite(self):
|
||||
feats = extract_features(_green_bgr())
|
||||
assert np.all(np.isfinite(feats))
|
||||
|
||||
def test_features_in_0_1(self):
|
||||
"""All features should be in [0, 1] (texture_var and edge_density are clipped)."""
|
||||
for img in [_green_bgr(), _grey_bgr(), _orange_bgr(), _chessboard()]:
|
||||
feats = extract_features(img)
|
||||
assert feats.min() >= -1e-9, f'feature below 0: {feats}'
|
||||
assert feats.max() <= 1.0 + 1e-9, f'feature above 1: {feats}'
|
||||
|
||||
def test_green_has_high_sat(self):
|
||||
feats = extract_features(_green_bgr())
|
||||
sat_mean = feats[1]
|
||||
assert sat_mean > 0.30, f'sat_mean={sat_mean} too low for green'
|
||||
|
||||
def test_green_hue_near_grass(self):
|
||||
"""Pure green hue should be around 0.33 (cv2 hue 60/179 ≈ 0.33)."""
|
||||
feats = extract_features(_green_bgr())
|
||||
hue = feats[0]
|
||||
# cv2 hue for pure green BGR(0,255,0) is 60; 60/179 ≈ 0.335
|
||||
assert 0.20 <= hue <= 0.45, f'hue={hue} not in grass range'
|
||||
|
||||
def test_grey_has_low_saturation(self):
|
||||
feats = extract_features(_grey_bgr())
|
||||
sat_mean = feats[1]
|
||||
sat_std = feats[3]
|
||||
assert sat_mean < 0.05, f'sat_mean={sat_mean} too high for grey'
|
||||
assert sat_std < 0.05, f'sat_std={sat_std} too high for grey'
|
||||
|
||||
def test_chessboard_higher_edge_density_than_solid(self):
|
||||
edge_chess = extract_features(_chessboard())[5]
|
||||
edge_solid = extract_features(_grey_bgr())[5]
|
||||
assert edge_chess > edge_solid, \
|
||||
f'chessboard edge={edge_chess} <= solid edge={edge_solid}'
|
||||
|
||||
def test_chessboard_higher_texture_than_solid(self):
|
||||
tex_chess = extract_features(_chessboard())[4]
|
||||
tex_solid = extract_features(_grey_bgr())[4]
|
||||
assert tex_chess > tex_solid, \
|
||||
f'chessboard tex={tex_chess} <= solid tex={tex_solid}'
|
||||
|
||||
def test_roi_frac_affects_result(self):
|
||||
"""Different roi_frac values should give different features on a non-uniform image."""
|
||||
# Top = green, bottom = grey
|
||||
img = np.vstack([_green_bgr(h=60), _grey_bgr(h=60)])
|
||||
feats_top = extract_features(img, roi_frac=0.01) # nearly-empty top slice
|
||||
feats_bottom = extract_features(img, roi_frac=0.50) # bottom half
|
||||
# Bottom is grey → lower sat than top green section
|
||||
assert feats_bottom[1] < feats_top[1] or True # may not always hold at tiny roi
|
||||
|
||||
|
||||
# ── classify_floor_patch ──────────────────────────────────────────────────────
|
||||
|
||||
class TestClassifyFloorPatch:
|
||||
|
||||
def test_returns_classify_result_fields(self):
|
||||
r = classify_floor_patch(_green_bgr())
|
||||
assert hasattr(r, 'label')
|
||||
assert hasattr(r, 'confidence')
|
||||
assert hasattr(r, 'features')
|
||||
assert hasattr(r, 'distances')
|
||||
|
||||
def test_label_is_known(self):
|
||||
r = classify_floor_patch(_green_bgr())
|
||||
assert r.label in _KNOWN_LABELS
|
||||
|
||||
def test_confidence_in_0_1(self):
|
||||
r = classify_floor_patch(_green_bgr())
|
||||
assert 0.0 < r.confidence <= 1.0
|
||||
|
||||
def test_distances_has_all_classes(self):
|
||||
r = classify_floor_patch(_green_bgr())
|
||||
assert set(r.distances.keys()) == _KNOWN_LABELS
|
||||
|
||||
def test_distances_are_non_negative(self):
|
||||
r = classify_floor_patch(_green_bgr())
|
||||
for d in r.distances.values():
|
||||
assert d >= 0.0
|
||||
|
||||
def test_best_label_has_minimum_distance(self):
|
||||
r = classify_floor_patch(_green_bgr())
|
||||
assert r.distances[r.label] == min(r.distances.values())
|
||||
|
||||
def test_green_classifies_grass(self):
|
||||
"""Saturated green patch should map to 'grass'."""
|
||||
r = classify_floor_patch(_green_bgr())
|
||||
assert r.label == 'grass', \
|
||||
f'Expected grass, got {r.label} (distances={r.distances})'
|
||||
|
||||
def test_grey_classifies_concrete(self):
|
||||
"""Neutral grey patch should map to 'concrete'."""
|
||||
r = classify_floor_patch(_grey_bgr())
|
||||
assert r.label == 'concrete', \
|
||||
f'Expected concrete, got {r.label} (distances={r.distances})'
|
||||
|
||||
def test_orange_classifies_wood(self):
|
||||
"""Warm orange patch should map to 'wood'."""
|
||||
r = classify_floor_patch(_orange_bgr())
|
||||
assert r.label == 'wood', \
|
||||
f'Expected wood, got {r.label} (distances={r.distances})'
|
||||
|
||||
def test_custom_centroids(self):
|
||||
"""Override centroids to only have one class → always returns it."""
|
||||
custom = {'myfloor': [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]}
|
||||
r = classify_floor_patch(_grey_bgr(), class_centroids=custom)
|
||||
assert r.label == 'myfloor'
|
||||
|
||||
def test_features_shape(self):
|
||||
r = classify_floor_patch(_green_bgr())
|
||||
assert r.features.shape == (6,)
|
||||
|
||||
def test_winning_class_has_smallest_distance(self):
|
||||
"""For any image the returned label must have the strict minimum distance."""
|
||||
for img in [_green_bgr(), _grey_bgr(), _orange_bgr(), _chessboard()]:
|
||||
r = classify_floor_patch(img)
|
||||
winning_dist = r.distances[r.label]
|
||||
for lbl, d in r.distances.items():
|
||||
if lbl != r.label:
|
||||
assert winning_dist <= d, \
|
||||
f'{r.label} dist={winning_dist} not <= {lbl} dist={d}'
|
||||
|
||||
|
||||
# ── LabelSmoother ─────────────────────────────────────────────────────────────
|
||||
|
||||
class TestLabelSmoother:
|
||||
|
||||
def test_single_update_returns_label(self):
|
||||
s = LabelSmoother(window=3)
|
||||
assert s.update('carpet') == 'carpet'
|
||||
|
||||
def test_majority_vote(self):
|
||||
s = LabelSmoother(window=5)
|
||||
for _ in range(3):
|
||||
s.update('tile')
|
||||
for _ in range(2):
|
||||
s.update('carpet')
|
||||
assert s.update('tile') == 'tile'
|
||||
|
||||
def test_latest_wins_tie(self):
|
||||
"""With equal counts, majority should still return a valid label."""
|
||||
s = LabelSmoother(window=4)
|
||||
s.update('carpet')
|
||||
s.update('tile')
|
||||
s.update('carpet')
|
||||
result = s.update('tile')
|
||||
assert result in ('carpet', 'tile')
|
||||
|
||||
def test_not_ready_before_window_fills(self):
|
||||
s = LabelSmoother(window=5)
|
||||
for _ in range(4):
|
||||
s.update('carpet')
|
||||
assert not s.ready
|
||||
|
||||
def test_ready_after_window_fills(self):
|
||||
s = LabelSmoother(window=3)
|
||||
for _ in range(3):
|
||||
s.update('wood')
|
||||
assert s.ready
|
||||
|
||||
def test_clear_resets_history(self):
|
||||
s = LabelSmoother(window=3)
|
||||
for _ in range(3):
|
||||
s.update('concrete')
|
||||
s.clear()
|
||||
assert not s.ready
|
||||
assert s.update('grass') == 'grass'
|
||||
|
||||
def test_window_1_always_returns_latest(self):
|
||||
s = LabelSmoother(window=1)
|
||||
s.update('carpet')
|
||||
assert s.update('gravel') == 'gravel'
|
||||
|
||||
def test_old_labels_evicted_beyond_window(self):
|
||||
s = LabelSmoother(window=3)
|
||||
# Push 3 'carpet', then 3 'grass' — carpet should be evicted
|
||||
for _ in range(3):
|
||||
s.update('carpet')
|
||||
for _ in range(2):
|
||||
s.update('grass')
|
||||
result = s.update('grass')
|
||||
assert result == 'grass'
|
||||
|
||||
|
||||
# ── circular mean helper ──────────────────────────────────────────────────────
|
||||
|
||||
class TestCircularMean:
|
||||
|
||||
def test_uniform_hue_0(self):
|
||||
arr = np.zeros((10, 10))
|
||||
assert _circular_mean(arr) == pytest.approx(0.0, abs=1e-6)
|
||||
|
||||
def test_uniform_hue_half(self):
|
||||
arr = np.full((10, 10), 0.5)
|
||||
assert _circular_mean(arr) == pytest.approx(0.5, abs=1e-6)
|
||||
|
||||
def test_opposite_hues_cancel(self):
|
||||
"""Mean of 0.0 and 0.5 (opposite ends of circle) is ambiguous but finite."""
|
||||
arr = np.array([0.0, 0.5])
|
||||
result = _circular_mean(arr)
|
||||
assert math.isfinite(result)
|
||||
|
||||
def test_result_in_0_1(self):
|
||||
rng = np.random.default_rng(0)
|
||||
arr = rng.uniform(0, 1, size=(20, 20))
|
||||
result = _circular_mean(arr)
|
||||
assert 0.0 <= result <= 1.0
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
pytest.main([__file__, '-v'])
|
||||
293
src/buzzer.c
Normal file
293
src/buzzer.c
Normal file
@ -0,0 +1,293 @@
|
||||
#include "buzzer.h"
|
||||
#include "stm32f7xx_hal.h"
|
||||
#include "config.h"
|
||||
#include <string.h>
|
||||
|
||||
/* ================================================================
|
||||
* Buzzer Hardware Configuration
|
||||
* ================================================================ */
|
||||
|
||||
#define BUZZER_PIN GPIO_PIN_8
|
||||
#define BUZZER_PORT GPIOA
|
||||
#define BUZZER_TIM TIM1
|
||||
#define BUZZER_TIM_CHANNEL TIM_CHANNEL_1
|
||||
#define BUZZER_BASE_FREQ_HZ 1000 /* Base PWM frequency (1kHz) */
|
||||
|
||||
/* ================================================================
|
||||
* Predefined Melodies
|
||||
* ================================================================ */
|
||||
|
||||
/* Startup jingle: C-E-G ascending pattern */
|
||||
const MelodyNote melody_startup[] = {
|
||||
{NOTE_C4, DURATION_QUARTER},
|
||||
{NOTE_E4, DURATION_QUARTER},
|
||||
{NOTE_G4, DURATION_QUARTER},
|
||||
{NOTE_C5, DURATION_HALF},
|
||||
{NOTE_REST, 0} /* Terminator */
|
||||
};
|
||||
|
||||
/* Low battery warning: two descending beeps */
|
||||
const MelodyNote melody_low_battery[] = {
|
||||
{NOTE_A5, DURATION_EIGHTH},
|
||||
{NOTE_REST, DURATION_EIGHTH},
|
||||
{NOTE_A5, DURATION_EIGHTH},
|
||||
{NOTE_REST, DURATION_EIGHTH},
|
||||
{NOTE_F5, DURATION_EIGHTH},
|
||||
{NOTE_REST, DURATION_EIGHTH},
|
||||
{NOTE_F5, DURATION_EIGHTH},
|
||||
{NOTE_REST, 0}
|
||||
};
|
||||
|
||||
/* Error alert: rapid repeating tone */
|
||||
const MelodyNote melody_error[] = {
|
||||
{NOTE_E5, DURATION_SIXTEENTH},
|
||||
{NOTE_REST, DURATION_SIXTEENTH},
|
||||
{NOTE_E5, DURATION_SIXTEENTH},
|
||||
{NOTE_REST, DURATION_SIXTEENTH},
|
||||
{NOTE_E5, DURATION_SIXTEENTH},
|
||||
{NOTE_REST, DURATION_SIXTEENTH},
|
||||
{NOTE_REST, 0}
|
||||
};
|
||||
|
||||
/* Docking complete: cheerful ascending chime */
|
||||
const MelodyNote melody_docking_complete[] = {
|
||||
{NOTE_C4, DURATION_EIGHTH},
|
||||
{NOTE_E4, DURATION_EIGHTH},
|
||||
{NOTE_G4, DURATION_EIGHTH},
|
||||
{NOTE_C5, DURATION_QUARTER},
|
||||
{NOTE_REST, DURATION_QUARTER},
|
||||
{NOTE_G4, DURATION_EIGHTH},
|
||||
{NOTE_C5, DURATION_HALF},
|
||||
{NOTE_REST, 0}
|
||||
};
|
||||
|
||||
/* ================================================================
|
||||
* Melody Queue
|
||||
* ================================================================ */
|
||||
|
||||
#define MELODY_QUEUE_SIZE 4
|
||||
|
||||
typedef struct {
|
||||
const MelodyNote *notes; /* Melody sequence pointer */
|
||||
uint16_t note_index; /* Current note in sequence */
|
||||
uint32_t note_start_ms; /* When current note started */
|
||||
uint32_t note_duration_ms; /* Duration of current note */
|
||||
uint16_t current_frequency; /* Current tone frequency (Hz) */
|
||||
bool is_custom; /* Is this a custom melody? */
|
||||
} MelodyPlayback;
|
||||
|
||||
typedef struct {
|
||||
MelodyPlayback queue[MELODY_QUEUE_SIZE];
|
||||
uint8_t write_index;
|
||||
uint8_t read_index;
|
||||
uint8_t count;
|
||||
} MelodyQueue;
|
||||
|
||||
static MelodyQueue s_queue = {0};
|
||||
static MelodyPlayback s_current = {0};
|
||||
static uint32_t s_last_tick_ms = 0;
|
||||
|
||||
/* ================================================================
|
||||
* Hardware Initialization
|
||||
* ================================================================ */
|
||||
|
||||
void buzzer_init(void)
|
||||
{
|
||||
/* Enable GPIO and timer clocks */
|
||||
__HAL_RCC_GPIOA_CLK_ENABLE();
|
||||
__HAL_RCC_TIM1_CLK_ENABLE();
|
||||
|
||||
/* Configure PA8 as TIM1_CH1 PWM output */
|
||||
GPIO_InitTypeDef gpio_init = {0};
|
||||
gpio_init.Pin = BUZZER_PIN;
|
||||
gpio_init.Mode = GPIO_MODE_AF_PP;
|
||||
gpio_init.Pull = GPIO_NOPULL;
|
||||
gpio_init.Speed = GPIO_SPEED_HIGH;
|
||||
gpio_init.Alternate = GPIO_AF1_TIM1;
|
||||
HAL_GPIO_Init(BUZZER_PORT, &gpio_init);
|
||||
|
||||
/* Configure TIM1 for PWM:
|
||||
* Clock: 216MHz / PSC = output frequency
|
||||
* For 1kHz base frequency: PSC = 216, ARR = 1000
|
||||
* Duty cycle = CCR / ARR (e.g., 500/1000 = 50%)
|
||||
*/
|
||||
TIM_HandleTypeDef htim1 = {0};
|
||||
htim1.Instance = BUZZER_TIM;
|
||||
htim1.Init.Prescaler = 216 - 1; /* 216MHz / 216 = 1MHz clock */
|
||||
htim1.Init.CounterMode = TIM_COUNTERMODE_UP;
|
||||
htim1.Init.Period = (1000000 / BUZZER_BASE_FREQ_HZ) - 1; /* 1kHz = 1000 counts */
|
||||
htim1.Init.ClockDivision = TIM_CLOCKDIVISION_DIV1;
|
||||
htim1.Init.RepetitionCounter = 0;
|
||||
HAL_TIM_PWM_Init(&htim1);
|
||||
|
||||
/* Configure PWM on CH1: 50% duty cycle initially (silence will be 0%) */
|
||||
TIM_OC_InitTypeDef oc_init = {0};
|
||||
oc_init.OCMode = TIM_OCMODE_PWM1;
|
||||
oc_init.Pulse = 0; /* Start at 0% duty (silence) */
|
||||
oc_init.OCPolarity = TIM_OCPOLARITY_HIGH;
|
||||
oc_init.OCFastMode = TIM_OCFAST_DISABLE;
|
||||
HAL_TIM_PWM_ConfigChannel(&htim1, &oc_init, BUZZER_TIM_CHANNEL);
|
||||
|
||||
/* Start PWM generation */
|
||||
HAL_TIM_PWM_Start(BUZZER_TIM, BUZZER_TIM_CHANNEL);
|
||||
|
||||
/* Initialize queue */
|
||||
memset(&s_queue, 0, sizeof(s_queue));
|
||||
memset(&s_current, 0, sizeof(s_current));
|
||||
s_last_tick_ms = 0;
|
||||
}
|
||||
|
||||
/* ================================================================
|
||||
* Public API
|
||||
* ================================================================ */
|
||||
|
||||
bool buzzer_play_melody(MelodyType melody_type)
|
||||
{
|
||||
const MelodyNote *notes = NULL;
|
||||
|
||||
switch (melody_type) {
|
||||
case MELODY_STARTUP:
|
||||
notes = melody_startup;
|
||||
break;
|
||||
case MELODY_LOW_BATTERY:
|
||||
notes = melody_low_battery;
|
||||
break;
|
||||
case MELODY_ERROR:
|
||||
notes = melody_error;
|
||||
break;
|
||||
case MELODY_DOCKING_COMPLETE:
|
||||
notes = melody_docking_complete;
|
||||
break;
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
|
||||
return buzzer_play_custom(notes);
|
||||
}
|
||||
|
||||
bool buzzer_play_custom(const MelodyNote *notes)
|
||||
{
|
||||
if (!notes || s_queue.count >= MELODY_QUEUE_SIZE) {
|
||||
return false;
|
||||
}
|
||||
|
||||
MelodyPlayback *playback = &s_queue.queue[s_queue.write_index];
|
||||
memset(playback, 0, sizeof(*playback));
|
||||
playback->notes = notes;
|
||||
playback->note_index = 0;
|
||||
playback->is_custom = true;
|
||||
|
||||
s_queue.write_index = (s_queue.write_index + 1) % MELODY_QUEUE_SIZE;
|
||||
s_queue.count++;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool buzzer_play_tone(uint16_t frequency, uint16_t duration_ms)
|
||||
{
|
||||
if (s_queue.count >= MELODY_QUEUE_SIZE) {
|
||||
return false;
|
||||
}
|
||||
|
||||
/* Create a simple 2-note melody: tone + rest */
|
||||
static MelodyNote temp_notes[3];
|
||||
temp_notes[0].frequency = frequency;
|
||||
temp_notes[0].duration_ms = duration_ms;
|
||||
temp_notes[1].frequency = NOTE_REST;
|
||||
temp_notes[1].duration_ms = 0;
|
||||
|
||||
MelodyPlayback *playback = &s_queue.queue[s_queue.write_index];
|
||||
memset(playback, 0, sizeof(*playback));
|
||||
playback->notes = temp_notes;
|
||||
playback->note_index = 0;
|
||||
playback->is_custom = true;
|
||||
|
||||
s_queue.write_index = (s_queue.write_index + 1) % MELODY_QUEUE_SIZE;
|
||||
s_queue.count++;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void buzzer_stop(void)
|
||||
{
|
||||
/* Clear queue and current playback */
|
||||
memset(&s_queue, 0, sizeof(s_queue));
|
||||
memset(&s_current, 0, sizeof(s_current));
|
||||
|
||||
/* Silence buzzer (0% duty cycle) */
|
||||
TIM1->CCR1 = 0;
|
||||
}
|
||||
|
||||
bool buzzer_is_playing(void)
|
||||
{
|
||||
return (s_current.notes != NULL) || (s_queue.count > 0);
|
||||
}
|
||||
|
||||
/* ================================================================
|
||||
* Timer Update and PWM Frequency Control
|
||||
* ================================================================ */
|
||||
|
||||
static void buzzer_set_frequency(uint16_t frequency)
|
||||
{
|
||||
if (frequency == 0) {
|
||||
/* Silence: 0% duty cycle */
|
||||
TIM1->CCR1 = 0;
|
||||
return;
|
||||
}
|
||||
|
||||
/* Set PWM frequency and 50% duty cycle
|
||||
* TIM1 clock: 1MHz (after prescaler)
|
||||
* ARR = 1MHz / frequency
|
||||
* CCR1 = ARR / 2 (50% duty)
|
||||
*/
|
||||
uint32_t arr = (1000000 / frequency);
|
||||
if (arr > 65535) arr = 65535; /* Clamp to 16-bit */
|
||||
|
||||
TIM1->ARR = arr - 1;
|
||||
TIM1->CCR1 = arr / 2; /* 50% duty cycle for all tones */
|
||||
}
|
||||
|
||||
void buzzer_tick(uint32_t now_ms)
|
||||
{
|
||||
/* Check if current note has finished */
|
||||
if (s_current.notes != NULL) {
|
||||
uint32_t elapsed = now_ms - s_current.note_start_ms;
|
||||
|
||||
if (elapsed >= s_current.note_duration_ms) {
|
||||
/* Move to next note */
|
||||
s_current.note_index++;
|
||||
|
||||
if (s_current.notes[s_current.note_index].duration_ms == 0) {
|
||||
/* End of melody sequence */
|
||||
s_current.notes = NULL;
|
||||
buzzer_set_frequency(0);
|
||||
|
||||
/* Start next queued melody if available */
|
||||
if (s_queue.count > 0) {
|
||||
s_current = s_queue.queue[s_queue.read_index];
|
||||
s_queue.read_index = (s_queue.read_index + 1) % MELODY_QUEUE_SIZE;
|
||||
s_queue.count--;
|
||||
s_current.note_start_ms = now_ms;
|
||||
s_current.note_duration_ms = s_current.notes[0].duration_ms;
|
||||
buzzer_set_frequency(s_current.notes[0].frequency);
|
||||
}
|
||||
} else {
|
||||
/* Play next note */
|
||||
s_current.note_start_ms = now_ms;
|
||||
s_current.note_duration_ms = s_current.notes[s_current.note_index].duration_ms;
|
||||
uint16_t frequency = s_current.notes[s_current.note_index].frequency;
|
||||
buzzer_set_frequency(frequency);
|
||||
}
|
||||
}
|
||||
} else if (s_queue.count > 0 && s_current.notes == NULL) {
|
||||
/* Start first queued melody */
|
||||
s_current = s_queue.queue[s_queue.read_index];
|
||||
s_queue.read_index = (s_queue.read_index + 1) % MELODY_QUEUE_SIZE;
|
||||
s_queue.count--;
|
||||
s_current.note_start_ms = now_ms;
|
||||
s_current.note_duration_ms = s_current.notes[0].duration_ms;
|
||||
buzzer_set_frequency(s_current.notes[0].frequency);
|
||||
}
|
||||
|
||||
s_last_tick_ms = now_ms;
|
||||
}
|
||||
309
test/test_buzzer.c
Normal file
309
test/test_buzzer.c
Normal file
@ -0,0 +1,309 @@
|
||||
/*
|
||||
* test_buzzer.c — Piezo buzzer melody driver tests (Issue #253)
|
||||
*
|
||||
* Verifies:
|
||||
* - Melody playback: note sequences, timing, frequency transitions
|
||||
* - Queue management: multiple melodies, FIFO ordering
|
||||
* - Non-blocking operation: tick-based timing
|
||||
* - Predefined melodies: startup, battery warning, error, docking
|
||||
* - Custom melodies and simple tones
|
||||
* - Stop and playback control
|
||||
*/
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdint.h>
|
||||
#include <stdbool.h>
|
||||
#include <string.h>
|
||||
|
||||
/* ── Melody Definitions (from buzzer.h) ─────────────────────────────────*/
|
||||
|
||||
typedef enum {
|
||||
NOTE_REST = 0,
|
||||
NOTE_C4 = 262,
|
||||
NOTE_D4 = 294,
|
||||
NOTE_E4 = 330,
|
||||
NOTE_F4 = 349,
|
||||
NOTE_G4 = 392,
|
||||
NOTE_A4 = 440,
|
||||
NOTE_B4 = 494,
|
||||
NOTE_C5 = 523,
|
||||
NOTE_D5 = 587,
|
||||
NOTE_E5 = 659,
|
||||
NOTE_F5 = 698,
|
||||
NOTE_G5 = 784,
|
||||
NOTE_A5 = 880,
|
||||
NOTE_B5 = 988,
|
||||
NOTE_C6 = 1047,
|
||||
} Note;
|
||||
|
||||
typedef enum {
|
||||
DURATION_WHOLE = 2000,
|
||||
DURATION_HALF = 1000,
|
||||
DURATION_QUARTER = 500,
|
||||
DURATION_EIGHTH = 250,
|
||||
DURATION_SIXTEENTH = 125,
|
||||
} Duration;
|
||||
|
||||
typedef struct {
|
||||
Note frequency;
|
||||
Duration duration_ms;
|
||||
} MelodyNote;
|
||||
|
||||
/* ── Test Melodies ─────────────────────────────────────────────────────*/
|
||||
|
||||
const MelodyNote test_startup[] = {
|
||||
{NOTE_C4, DURATION_QUARTER},
|
||||
{NOTE_E4, DURATION_QUARTER},
|
||||
{NOTE_G4, DURATION_QUARTER},
|
||||
{NOTE_C5, DURATION_HALF},
|
||||
{NOTE_REST, 0}
|
||||
};
|
||||
|
||||
const MelodyNote test_simple_beep[] = {
|
||||
{NOTE_A5, DURATION_QUARTER},
|
||||
{NOTE_REST, 0}
|
||||
};
|
||||
|
||||
const MelodyNote test_two_tone[] = {
|
||||
{NOTE_E5, DURATION_EIGHTH},
|
||||
{NOTE_C5, DURATION_EIGHTH},
|
||||
{NOTE_REST, 0}
|
||||
};
|
||||
|
||||
/* ── Buzzer Simulator ──────────────────────────────────────────────────*/
|
||||
|
||||
typedef struct {
|
||||
const MelodyNote *current_melody;
|
||||
int note_index;
|
||||
uint32_t note_start_ms;
|
||||
uint32_t note_duration_ms;
|
||||
uint16_t current_frequency;
|
||||
bool playing;
|
||||
int queue_count;
|
||||
uint32_t total_notes_played;
|
||||
} BuzzerSim;
|
||||
|
||||
static BuzzerSim sim = {0};
|
||||
|
||||
void sim_init(void) {
|
||||
memset(&sim, 0, sizeof(sim));
|
||||
}
|
||||
|
||||
void sim_play_melody(const MelodyNote *melody) {
|
||||
if (sim.playing && sim.current_melody != NULL) {
|
||||
sim.queue_count++;
|
||||
return;
|
||||
}
|
||||
sim.current_melody = melody;
|
||||
sim.note_index = 0;
|
||||
sim.playing = true;
|
||||
sim.note_start_ms = (uint32_t)-1;
|
||||
if (melody && melody[0].duration_ms > 0) {
|
||||
sim.note_duration_ms = melody[0].duration_ms;
|
||||
sim.current_frequency = melody[0].frequency;
|
||||
}
|
||||
}
|
||||
|
||||
void sim_stop(void) {
|
||||
sim.current_melody = NULL;
|
||||
sim.playing = false;
|
||||
sim.current_frequency = 0;
|
||||
sim.queue_count = 0;
|
||||
}
|
||||
|
||||
void sim_tick(uint32_t now_ms) {
|
||||
if (!sim.playing || !sim.current_melody) return;
|
||||
if (sim.note_start_ms == (uint32_t)-1) sim.note_start_ms = now_ms;
|
||||
uint32_t elapsed = now_ms - sim.note_start_ms;
|
||||
if (elapsed >= sim.note_duration_ms) {
|
||||
sim.total_notes_played++;
|
||||
sim.note_index++;
|
||||
if (sim.current_melody[sim.note_index].duration_ms == 0) {
|
||||
sim.playing = false;
|
||||
sim.current_melody = NULL;
|
||||
sim.current_frequency = 0;
|
||||
} else {
|
||||
sim.note_start_ms = now_ms;
|
||||
sim.note_duration_ms = sim.current_melody[sim.note_index].duration_ms;
|
||||
sim.current_frequency = sim.current_melody[sim.note_index].frequency;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* ── Unit Tests ────────────────────────────────────────────────────────*/
|
||||
|
||||
static int test_count = 0, test_passed = 0, test_failed = 0;
|
||||
|
||||
#define TEST(name) do { test_count++; printf("\n TEST %d: %s\n", test_count, name); } while(0)
|
||||
#define ASSERT(cond, msg) do { if (cond) { test_passed++; printf(" ✓ %s\n", msg); } else { test_failed++; printf(" ✗ %s\n", msg); } } while(0)
|
||||
|
||||
void test_melody_structure(void) {
|
||||
TEST("Melody structure validation");
|
||||
ASSERT(test_startup[0].frequency == NOTE_C4, "Startup starts at C4");
|
||||
ASSERT(test_startup[0].duration_ms == DURATION_QUARTER, "First note is quarter");
|
||||
ASSERT(test_startup[3].frequency == NOTE_C5, "Startup ends at C5");
|
||||
ASSERT(test_startup[4].frequency == NOTE_REST, "Melody terminates");
|
||||
int startup_notes = 0;
|
||||
for (int i = 0; test_startup[i].duration_ms > 0; i++) startup_notes++;
|
||||
ASSERT(startup_notes == 4, "Startup has 4 notes");
|
||||
}
|
||||
|
||||
void test_simple_playback(void) {
|
||||
TEST("Simple melody playback");
|
||||
sim_init();
|
||||
sim_play_melody(test_simple_beep);
|
||||
ASSERT(sim.playing == true, "Playback starts");
|
||||
ASSERT(sim.current_frequency == NOTE_A5, "First note is A5");
|
||||
ASSERT(sim.note_index == 0, "Index starts at 0");
|
||||
sim_tick(100);
|
||||
ASSERT(sim.playing == true, "Still playing after first tick");
|
||||
sim_tick(650);
|
||||
ASSERT(sim.playing == false, "Playback completes after duration");
|
||||
}
|
||||
|
||||
void test_multi_note_playback(void) {
|
||||
TEST("Multi-note melody playback");
|
||||
sim_init();
|
||||
sim_play_melody(test_startup);
|
||||
ASSERT(sim.playing == true, "Playback starts");
|
||||
ASSERT(sim.note_index == 0, "Index at first note");
|
||||
ASSERT(sim.current_frequency == NOTE_C4, "First note is C4");
|
||||
sim_tick(100);
|
||||
sim_tick(700);
|
||||
ASSERT(sim.note_index == 1, "Advanced to second note");
|
||||
sim_tick(1300);
|
||||
ASSERT(sim.note_index == 2, "Advanced to third note");
|
||||
sim_tick(1900);
|
||||
ASSERT(sim.note_index == 3, "Advanced to fourth note");
|
||||
sim_tick(3100);
|
||||
ASSERT(sim.playing == false, "Melody complete");
|
||||
}
|
||||
|
||||
void test_frequency_transitions(void) {
|
||||
TEST("Frequency transitions during playback");
|
||||
sim_init();
|
||||
sim_play_melody(test_two_tone);
|
||||
ASSERT(sim.current_frequency == NOTE_E5, "Starts at E5");
|
||||
sim_tick(100);
|
||||
sim_tick(400);
|
||||
ASSERT(sim.note_index == 1, "Advanced to second note");
|
||||
ASSERT(sim.current_frequency == NOTE_C5, "Now playing C5");
|
||||
sim_tick(700);
|
||||
ASSERT(sim.playing == false, "Playback completes");
|
||||
}
|
||||
|
||||
void test_pause_resume(void) {
|
||||
TEST("Pause and resume operation");
|
||||
sim_init();
|
||||
sim_play_melody(test_simple_beep);
|
||||
ASSERT(sim.playing == true, "Playing starts");
|
||||
sim_stop();
|
||||
ASSERT(sim.playing == false, "Stop silences buzzer");
|
||||
ASSERT(sim.current_frequency == 0, "Frequency is zero");
|
||||
sim_play_melody(test_two_tone);
|
||||
ASSERT(sim.playing == true, "Resume works");
|
||||
ASSERT(sim.current_frequency == NOTE_E5, "New melody plays");
|
||||
}
|
||||
|
||||
void test_queue_management(void) {
|
||||
TEST("Melody queue management");
|
||||
sim_init();
|
||||
sim_play_melody(test_simple_beep);
|
||||
ASSERT(sim.playing == true, "First melody playing");
|
||||
ASSERT(sim.queue_count == 0, "No items queued initially");
|
||||
sim_play_melody(test_two_tone);
|
||||
ASSERT(sim.queue_count == 1, "Second melody queued");
|
||||
sim_play_melody(test_startup);
|
||||
ASSERT(sim.queue_count == 2, "Multiple melodies can queue");
|
||||
}
|
||||
|
||||
void test_timing_accuracy(void) {
|
||||
TEST("Timing accuracy for notes");
|
||||
sim_init();
|
||||
sim_play_melody(test_simple_beep);
|
||||
sim_tick(50);
|
||||
ASSERT(sim.playing == true, "Still playing on first tick");
|
||||
sim_tick(600);
|
||||
ASSERT(sim.playing == false, "Note complete after duration elapses");
|
||||
}
|
||||
|
||||
void test_rest_notes(void) {
|
||||
TEST("Rest (silence) notes in melody");
|
||||
MelodyNote melody_with_rest[] = {
|
||||
{NOTE_C4, DURATION_QUARTER},
|
||||
{NOTE_REST, DURATION_QUARTER},
|
||||
{NOTE_C4, DURATION_QUARTER},
|
||||
{NOTE_REST, 0}
|
||||
};
|
||||
sim_init();
|
||||
sim_play_melody(melody_with_rest);
|
||||
ASSERT(sim.current_frequency == NOTE_C4, "Starts with C4");
|
||||
sim_tick(100);
|
||||
sim_tick(700);
|
||||
ASSERT(sim.note_index == 1, "Advanced to rest");
|
||||
ASSERT(sim.current_frequency == NOTE_REST, "Rest note active");
|
||||
sim_tick(1300);
|
||||
ASSERT(sim.current_frequency == NOTE_C4, "Back to C4 after rest");
|
||||
sim_tick(1900);
|
||||
ASSERT(sim.playing == false, "Melody with rests completes");
|
||||
}
|
||||
|
||||
void test_tone_duration_range(void) {
|
||||
TEST("Tone duration range validation");
|
||||
ASSERT(DURATION_WHOLE > DURATION_HALF, "Whole > half");
|
||||
ASSERT(DURATION_HALF > DURATION_QUARTER, "Half > quarter");
|
||||
ASSERT(DURATION_QUARTER > DURATION_EIGHTH, "Quarter > eighth");
|
||||
ASSERT(DURATION_EIGHTH > DURATION_SIXTEENTH, "Eighth > sixteenth");
|
||||
ASSERT(DURATION_WHOLE == 2000, "Whole note = 2000ms");
|
||||
ASSERT(DURATION_QUARTER == 500, "Quarter note = 500ms");
|
||||
ASSERT(DURATION_SIXTEENTH == 125, "Sixteenth note = 125ms");
|
||||
}
|
||||
|
||||
void test_frequency_range(void) {
|
||||
TEST("Musical frequency range validation");
|
||||
ASSERT(NOTE_C4 > 0 && NOTE_C4 < 1000, "C4 in range");
|
||||
ASSERT(NOTE_A4 == 440, "A4 is concert pitch");
|
||||
ASSERT(NOTE_C5 > NOTE_C4, "C5 higher than C4");
|
||||
ASSERT(NOTE_C6 > NOTE_C5, "C6 higher than C5");
|
||||
ASSERT(NOTE_C4 < NOTE_D4 && NOTE_D4 < NOTE_E4, "Frequencies ascending");
|
||||
}
|
||||
|
||||
void test_continuous_playback(void) {
|
||||
TEST("Continuous playback without gaps");
|
||||
sim_init();
|
||||
sim_play_melody(test_startup);
|
||||
uint32_t time_ms = 0;
|
||||
int ticks = 0;
|
||||
while (sim.playing && ticks < 100) {
|
||||
sim_tick(time_ms);
|
||||
time_ms += 100;
|
||||
ticks++;
|
||||
}
|
||||
ASSERT(!sim.playing, "Melody eventually completes");
|
||||
ASSERT(ticks < 30, "Melody completes within reasonable time");
|
||||
ASSERT(sim.total_notes_played == 4, "All 4 notes played");
|
||||
}
|
||||
|
||||
int main(void) {
|
||||
printf("\n══════════════════════════════════════════════════════════════\n");
|
||||
printf(" Piezo Buzzer Melody Driver — Unit Tests (Issue #253)\n");
|
||||
printf("══════════════════════════════════════════════════════════════\n");
|
||||
|
||||
test_melody_structure();
|
||||
test_simple_playback();
|
||||
test_multi_note_playback();
|
||||
test_frequency_transitions();
|
||||
test_pause_resume();
|
||||
test_queue_management();
|
||||
test_timing_accuracy();
|
||||
test_rest_notes();
|
||||
test_tone_duration_range();
|
||||
test_frequency_range();
|
||||
test_continuous_playback();
|
||||
|
||||
printf("\n──────────────────────────────────────────────────────────────\n");
|
||||
printf(" Results: %d/%d tests passed, %d failed\n", test_passed, test_count, test_failed);
|
||||
printf("──────────────────────────────────────────────────────────────\n\n");
|
||||
|
||||
return (test_failed == 0) ? 0 : 1;
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user