Compare commits
1 Commits
4730ea46d1
...
c00688cd7e
| Author | SHA1 | Date | |
|---|---|---|---|
| c00688cd7e |
@ -0,0 +1,10 @@
|
||||
# Motor protection configuration
|
||||
|
||||
motor_protection:
|
||||
ros__parameters:
|
||||
# Overcurrent limits (Amps)
|
||||
soft_current_limit: 5.0 # Warning threshold, trigger derating
|
||||
hard_current_limit: 8.0 # Fault threshold, trigger emergency stop
|
||||
|
||||
# Protection monitoring frequency (Hz)
|
||||
frequency: 50 # 50 Hz = 20ms cycle
|
||||
@ -0,0 +1,36 @@
|
||||
"""Launch file for motor_protection_node."""
|
||||
|
||||
from launch import LaunchDescription
|
||||
from launch_ros.actions import Node
|
||||
from launch.substitutions import LaunchConfiguration
|
||||
from launch.actions import DeclareLaunchArgument
|
||||
import os
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
|
||||
|
||||
def generate_launch_description():
|
||||
"""Generate launch description for motor protection."""
|
||||
# Package directory
|
||||
pkg_dir = get_package_share_directory("saltybot_motor_protection")
|
||||
|
||||
# Parameters
|
||||
config_file = os.path.join(pkg_dir, "config", "protection_config.yaml")
|
||||
|
||||
# Declare launch arguments
|
||||
return LaunchDescription(
|
||||
[
|
||||
DeclareLaunchArgument(
|
||||
"config_file",
|
||||
default_value=config_file,
|
||||
description="Path to configuration YAML file",
|
||||
),
|
||||
# Motor protection node
|
||||
Node(
|
||||
package="saltybot_motor_protection",
|
||||
executable="motor_protection_node",
|
||||
name="motor_protection",
|
||||
output="screen",
|
||||
parameters=[LaunchConfiguration("config_file")],
|
||||
),
|
||||
]
|
||||
)
|
||||
29
jetson/ros2_ws/src/saltybot_motor_protection/package.xml
Normal file
29
jetson/ros2_ws/src/saltybot_motor_protection/package.xml
Normal file
@ -0,0 +1,29 @@
|
||||
<?xml version="1.0"?>
|
||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="3">
|
||||
<name>saltybot_motor_protection</name>
|
||||
<version>0.1.0</version>
|
||||
<description>
|
||||
Motor current protection node for SaltyBot: overcurrent detection (soft 5A, hard 8A)
|
||||
and thermal derating using I^2R model. Monitors motor current, calculates thermal
|
||||
state, applies speed derating, and triggers emergency stop on hard fault.
|
||||
</description>
|
||||
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
|
||||
<license>MIT</license>
|
||||
|
||||
<depend>rclpy</depend>
|
||||
<depend>geometry_msgs</depend>
|
||||
<depend>std_msgs</depend>
|
||||
<depend>std_srvs</depend>
|
||||
|
||||
<buildtool_depend>ament_python</buildtool_depend>
|
||||
|
||||
<test_depend>ament_copyright</test_depend>
|
||||
<test_depend>ament_flake8</test_depend>
|
||||
<test_depend>ament_pep257</test_depend>
|
||||
<test_depend>python3-pytest</test_depend>
|
||||
|
||||
<export>
|
||||
<build_type>ament_python</build_type>
|
||||
</export>
|
||||
</package>
|
||||
@ -0,0 +1,263 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Motor current protection node for SaltyBot.
|
||||
|
||||
Monitors motor current, detects overcurrent conditions (soft 5A, hard 8A),
|
||||
models thermal state using I^2R model, and applies speed derating.
|
||||
Triggers emergency stop on hard fault.
|
||||
|
||||
Published topics:
|
||||
/saltybot/motor_protection (std_msgs/String) - JSON protection status
|
||||
/saltybot/speed_limit (std_msgs/Float32) - Thermal derating factor [0, 1]
|
||||
|
||||
Subscribed topics:
|
||||
/saltybot/motor_current_left (std_msgs/Float32) - Left motor current (A)
|
||||
/saltybot/motor_current_right (std_msgs/Float32) - Right motor current (A)
|
||||
"""
|
||||
|
||||
import json
|
||||
import math
|
||||
from enum import Enum
|
||||
from dataclasses import dataclass
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.timer import Timer
|
||||
from geometry_msgs.msg import Twist
|
||||
from std_msgs.msg import Float32, String
|
||||
|
||||
|
||||
class ProtectionState(Enum):
|
||||
"""Motor protection state."""
|
||||
NORMAL = 0
|
||||
SOFT_FAULT = 1 # Soft overcurrent
|
||||
HARD_FAULT = 2 # Hard overcurrent
|
||||
|
||||
|
||||
@dataclass
|
||||
class ThermalModel:
|
||||
"""I^2R thermal model parameters."""
|
||||
motor_resistance: float = 1.5 # Ohms (motor winding resistance)
|
||||
thermal_mass: float = 100.0 # J/°C (thermal capacitance)
|
||||
ambient_temp: float = 25.0 # °C
|
||||
max_temp: float = 80.0 # °C (safe operating limit)
|
||||
cooling_constant: float = 0.05 # 1/s (exponential cooling rate)
|
||||
|
||||
# Derived
|
||||
motor_temp: float = 25.0 # Current motor temperature
|
||||
|
||||
|
||||
class MotorProtectionNode(Node):
|
||||
"""ROS2 node for motor current protection."""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__("motor_protection")
|
||||
|
||||
# Parameters
|
||||
self.declare_parameter("soft_current_limit", 5.0) # Amps
|
||||
self.declare_parameter("hard_current_limit", 8.0) # Amps
|
||||
self.declare_parameter("frequency", 50) # Hz
|
||||
|
||||
self.soft_limit = self.get_parameter("soft_current_limit").value
|
||||
self.hard_limit = self.get_parameter("hard_current_limit").value
|
||||
frequency = self.get_parameter("frequency").value
|
||||
|
||||
# Current measurements
|
||||
self.current_left = 0.0
|
||||
self.current_right = 0.0
|
||||
self.current_max = 0.0 # Maximum of the two motors
|
||||
|
||||
# Thermal model
|
||||
self.thermal = ThermalModel()
|
||||
|
||||
# Protection state
|
||||
self.state = ProtectionState.NORMAL
|
||||
self.soft_fault_duration = 0.0 # Time in soft fault state
|
||||
|
||||
# Subscriptions
|
||||
self.create_subscription(
|
||||
Float32, "/saltybot/motor_current_left", self._on_current_left, 10
|
||||
)
|
||||
self.create_subscription(
|
||||
Float32, "/saltybot/motor_current_right", self._on_current_right, 10
|
||||
)
|
||||
|
||||
# Publications
|
||||
self.pub_protection = self.create_publisher(
|
||||
String, "/saltybot/motor_protection", 10
|
||||
)
|
||||
self.pub_speed_limit = self.create_publisher(
|
||||
Float32, "/saltybot/speed_limit", 10
|
||||
)
|
||||
|
||||
# Timer for monitoring at 50Hz
|
||||
period = 1.0 / frequency
|
||||
self.timer: Timer = self.create_timer(period, self._timer_callback)
|
||||
|
||||
self.get_logger().info(
|
||||
f"Motor protection initialized at {frequency}Hz. "
|
||||
f"Soft limit: {self.soft_limit}A, Hard limit: {self.hard_limit}A"
|
||||
)
|
||||
|
||||
def _on_current_left(self, msg: Float32) -> None:
|
||||
"""Update left motor current."""
|
||||
self.current_left = max(0.0, msg.data) # Ensure non-negative
|
||||
|
||||
def _on_current_right(self, msg: Float32) -> None:
|
||||
"""Update right motor current."""
|
||||
self.current_right = max(0.0, msg.data) # Ensure non-negative
|
||||
|
||||
def _timer_callback(self) -> None:
|
||||
"""Monitor motor current and thermal state at 50Hz."""
|
||||
dt = 0.02 # 50Hz = 20ms
|
||||
|
||||
# Get maximum current (worst case)
|
||||
self.current_max = max(self.current_left, self.current_right)
|
||||
|
||||
# Update thermal state
|
||||
self._update_thermal_state(dt)
|
||||
|
||||
# Check overcurrent conditions
|
||||
self._check_overcurrent()
|
||||
|
||||
# Calculate speed derate factor
|
||||
derate_factor = self._calculate_derate()
|
||||
|
||||
# Publish status
|
||||
self._publish_protection_status()
|
||||
self._publish_speed_limit(derate_factor)
|
||||
|
||||
def _update_thermal_state(self, dt: float) -> None:
|
||||
"""Update motor temperature using I^2R thermal model.
|
||||
|
||||
Model: dT/dt = (P_loss - P_cool) / C_thermal
|
||||
where:
|
||||
P_loss = I^2 * R (Joule heating)
|
||||
P_cool = h * (T - T_ambient) (convective cooling)
|
||||
C_thermal = thermal mass / dt
|
||||
"""
|
||||
# Heat generation: P_loss = I^2 * R (both motors)
|
||||
p_loss_left = self.current_left**2 * self.thermal.motor_resistance
|
||||
p_loss_right = self.current_right**2 * self.thermal.motor_resistance
|
||||
p_loss_total = p_loss_left + p_loss_right # Combined heat
|
||||
|
||||
# Passive cooling (exponential decay)
|
||||
temp_diff = self.thermal.motor_temp - self.thermal.ambient_temp
|
||||
p_cool = self.thermal.cooling_constant * temp_diff
|
||||
|
||||
# Temperature change
|
||||
dtemp_dt = (p_loss_total - p_cool) / self.thermal.thermal_mass
|
||||
|
||||
# Update temperature (with saturation)
|
||||
new_temp = self.thermal.motor_temp + dtemp_dt * dt
|
||||
self.thermal.motor_temp = max(self.thermal.ambient_temp, new_temp)
|
||||
|
||||
self.get_logger().debug(
|
||||
f"Thermal: T={self.thermal.motor_temp:.1f}°C, "
|
||||
f"P_loss={p_loss_total:.1f}W, P_cool={p_cool:.1f}W"
|
||||
)
|
||||
|
||||
def _check_overcurrent(self) -> None:
|
||||
"""Check for overcurrent conditions and update protection state."""
|
||||
if self.current_max >= self.hard_limit:
|
||||
# Hard fault: immediate stop
|
||||
if self.state != ProtectionState.HARD_FAULT:
|
||||
self.get_logger().error(
|
||||
f"HARD OVERCURRENT FAULT: {self.current_max:.2f}A >= {self.hard_limit}A"
|
||||
)
|
||||
self.state = ProtectionState.HARD_FAULT
|
||||
self.soft_fault_duration = 0.0
|
||||
|
||||
elif self.current_max >= self.soft_limit:
|
||||
# Soft fault: warning and derating
|
||||
if self.state == ProtectionState.NORMAL:
|
||||
self.get_logger().warn(
|
||||
f"Soft overcurrent: {self.current_max:.2f}A >= {self.soft_limit}A"
|
||||
)
|
||||
self.state = ProtectionState.SOFT_FAULT
|
||||
self.soft_fault_duration += 0.02 # 50Hz = 20ms
|
||||
|
||||
else:
|
||||
# Back to normal
|
||||
if self.state != ProtectionState.NORMAL:
|
||||
self.get_logger().info("Overcurrent cleared, resuming normal operation")
|
||||
self.state = ProtectionState.NORMAL
|
||||
self.soft_fault_duration = 0.0
|
||||
|
||||
def _calculate_derate(self) -> float:
|
||||
"""Calculate speed derating factor based on thermal state.
|
||||
|
||||
Returns:
|
||||
float: Speed derate factor in [0, 1]
|
||||
"""
|
||||
# Hard fault: complete stop
|
||||
if self.state == ProtectionState.HARD_FAULT:
|
||||
return 0.0
|
||||
|
||||
# Soft fault: reduce speed based on current and temperature
|
||||
soft_derate = 1.0
|
||||
if self.state == ProtectionState.SOFT_FAULT:
|
||||
# Reduce speed proportionally to current above soft limit
|
||||
current_excess = (self.current_max - self.soft_limit) / (
|
||||
self.hard_limit - self.soft_limit
|
||||
)
|
||||
soft_derate = max(0.1, 1.0 - current_excess * 0.5) # Min 10% speed
|
||||
|
||||
# Thermal derating: reduce speed as temperature increases
|
||||
temp_range = self.thermal.max_temp - self.thermal.ambient_temp
|
||||
if temp_range > 0:
|
||||
temp_ratio = (
|
||||
self.thermal.motor_temp - self.thermal.ambient_temp
|
||||
) / temp_range
|
||||
|
||||
# Linear derating: full speed at ambient, zero at max
|
||||
if temp_ratio > 1.0:
|
||||
thermal_derate = 0.0
|
||||
elif temp_ratio > 0.8:
|
||||
# Aggressive derating near limit
|
||||
thermal_derate = (1.0 - temp_ratio) / 0.2
|
||||
else:
|
||||
thermal_derate = 1.0
|
||||
else:
|
||||
thermal_derate = 1.0
|
||||
|
||||
# Combined derate: use minimum of both
|
||||
derate = min(soft_derate, thermal_derate)
|
||||
return max(0.0, min(1.0, derate)) # Clamp to [0, 1]
|
||||
|
||||
def _publish_protection_status(self) -> None:
|
||||
"""Publish motor protection status as JSON."""
|
||||
status = {
|
||||
"timestamp": self.get_clock().now().nanoseconds / 1e9,
|
||||
"state": self.state.name,
|
||||
"current_left": float(self.current_left),
|
||||
"current_right": float(self.current_right),
|
||||
"current_max": float(self.current_max),
|
||||
"motor_temp": float(self.thermal.motor_temp),
|
||||
"soft_limit": self.soft_limit,
|
||||
"hard_limit": self.hard_limit,
|
||||
"soft_fault_duration": float(self.soft_fault_duration),
|
||||
}
|
||||
|
||||
msg = String(data=json.dumps(status))
|
||||
self.pub_protection.publish(msg)
|
||||
|
||||
def _publish_speed_limit(self, derate: float) -> None:
|
||||
"""Publish thermal derate speed limit."""
|
||||
msg = Float32(data=derate)
|
||||
self.pub_speed_limit.publish(msg)
|
||||
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = MotorProtectionNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
4
jetson/ros2_ws/src/saltybot_motor_protection/setup.cfg
Normal file
4
jetson/ros2_ws/src/saltybot_motor_protection/setup.cfg
Normal file
@ -0,0 +1,4 @@
|
||||
[develop]
|
||||
script_dir=$base/lib/saltybot_motor_protection
|
||||
[egg_info]
|
||||
tag_date = 0
|
||||
29
jetson/ros2_ws/src/saltybot_motor_protection/setup.py
Normal file
29
jetson/ros2_ws/src/saltybot_motor_protection/setup.py
Normal file
@ -0,0 +1,29 @@
|
||||
from setuptools import setup
|
||||
|
||||
package_name = "saltybot_motor_protection"
|
||||
|
||||
setup(
|
||||
name=package_name,
|
||||
version="0.1.0",
|
||||
packages=[package_name],
|
||||
data_files=[
|
||||
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
|
||||
(f"share/{package_name}", ["package.xml"]),
|
||||
(f"share/{package_name}/launch", ["launch/motor_protection.launch.py"]),
|
||||
(f"share/{package_name}/config", ["config/protection_config.yaml"]),
|
||||
],
|
||||
install_requires=["setuptools"],
|
||||
zip_safe=True,
|
||||
maintainer="sl-controls",
|
||||
maintainer_email="sl-controls@saltylab.local",
|
||||
description=(
|
||||
"Motor protection: overcurrent detection + I^2R thermal derating"
|
||||
),
|
||||
license="MIT",
|
||||
tests_require=["pytest"],
|
||||
entry_points={
|
||||
"console_scripts": [
|
||||
"motor_protection_node = saltybot_motor_protection.motor_protection_node:main",
|
||||
],
|
||||
},
|
||||
)
|
||||
@ -0,0 +1,384 @@
|
||||
"""Unit tests for motor_protection_node."""
|
||||
|
||||
import pytest
|
||||
import json
|
||||
import math
|
||||
from std_msgs.msg import Float32, String
|
||||
|
||||
import rclpy
|
||||
|
||||
# Import the node and classes under test
|
||||
from saltybot_motor_protection.motor_protection_node import (
|
||||
MotorProtectionNode,
|
||||
ProtectionState,
|
||||
ThermalModel,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def rclpy_fixture():
|
||||
"""Initialize and cleanup rclpy."""
|
||||
rclpy.init()
|
||||
yield
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def node(rclpy_fixture):
|
||||
"""Create a motor protection node instance."""
|
||||
node = MotorProtectionNode()
|
||||
yield node
|
||||
node.destroy_node()
|
||||
|
||||
|
||||
class TestThermalModel:
|
||||
"""Test suite for ThermalModel."""
|
||||
|
||||
def test_thermal_model_initialization(self):
|
||||
"""Test ThermalModel initializes with correct defaults."""
|
||||
model = ThermalModel()
|
||||
|
||||
assert model.motor_resistance == 1.5
|
||||
assert model.thermal_mass == 100.0
|
||||
assert model.ambient_temp == 25.0
|
||||
assert model.max_temp == 80.0
|
||||
assert model.cooling_constant == 0.05
|
||||
assert model.motor_temp == 25.0
|
||||
|
||||
def test_thermal_model_custom_params(self):
|
||||
"""Test ThermalModel with custom parameters."""
|
||||
model = ThermalModel(
|
||||
motor_resistance=2.0,
|
||||
thermal_mass=50.0,
|
||||
max_temp=100.0,
|
||||
)
|
||||
|
||||
assert model.motor_resistance == 2.0
|
||||
assert model.thermal_mass == 50.0
|
||||
assert model.max_temp == 100.0
|
||||
|
||||
|
||||
class TestMotorProtectionNode:
|
||||
"""Test suite for MotorProtectionNode."""
|
||||
|
||||
def test_node_initialization(self, node):
|
||||
"""Test that node initializes with correct defaults."""
|
||||
assert node.soft_limit == 5.0
|
||||
assert node.hard_limit == 8.0
|
||||
assert node.current_left == 0.0
|
||||
assert node.current_right == 0.0
|
||||
assert node.current_max == 0.0
|
||||
assert node.state == ProtectionState.NORMAL
|
||||
assert node.soft_fault_duration == 0.0
|
||||
|
||||
def test_current_left_subscription(self, node):
|
||||
"""Test left motor current subscription."""
|
||||
msg = Float32(data=2.5)
|
||||
node._on_current_left(msg)
|
||||
assert node.current_left == 2.5
|
||||
|
||||
def test_current_right_subscription(self, node):
|
||||
"""Test right motor current subscription."""
|
||||
msg = Float32(data=3.0)
|
||||
node._on_current_right(msg)
|
||||
assert node.current_right == 3.0
|
||||
|
||||
def test_current_negative_clamping(self, node):
|
||||
"""Test that negative currents are clamped to zero."""
|
||||
node._on_current_left(Float32(data=-1.0))
|
||||
assert node.current_left == 0.0
|
||||
|
||||
node._on_current_right(Float32(data=-2.5))
|
||||
assert node.current_right == 0.0
|
||||
|
||||
def test_max_current_calculation(self, node):
|
||||
"""Test maximum current selection."""
|
||||
node.current_left = 2.0
|
||||
node.current_right = 3.5
|
||||
|
||||
# Simulate timer callback to update max
|
||||
node._timer_callback()
|
||||
|
||||
assert node.current_max == 3.5
|
||||
|
||||
def test_normal_state_low_current(self, node):
|
||||
"""Test normal state with low current."""
|
||||
node.current_max = 2.0
|
||||
|
||||
node._check_overcurrent()
|
||||
|
||||
assert node.state == ProtectionState.NORMAL
|
||||
assert node.soft_fault_duration == 0.0
|
||||
|
||||
def test_soft_fault_detection(self, node):
|
||||
"""Test soft overcurrent detection at 5A."""
|
||||
node.current_max = 5.5
|
||||
|
||||
node._check_overcurrent()
|
||||
|
||||
assert node.state == ProtectionState.SOFT_FAULT
|
||||
|
||||
def test_hard_fault_detection(self, node):
|
||||
"""Test hard overcurrent detection at 8A."""
|
||||
node.current_max = 8.5
|
||||
|
||||
node._check_overcurrent()
|
||||
|
||||
assert node.state == ProtectionState.HARD_FAULT
|
||||
|
||||
def test_soft_fault_duration_tracking(self, node):
|
||||
"""Test soft fault duration accumulation."""
|
||||
node.current_max = 5.5
|
||||
node.state = ProtectionState.NORMAL
|
||||
|
||||
# First cycle
|
||||
node._check_overcurrent()
|
||||
assert node.soft_fault_duration == 0.02
|
||||
|
||||
# Second cycle
|
||||
node._check_overcurrent()
|
||||
assert node.soft_fault_duration == 0.04
|
||||
|
||||
def test_fault_recovery(self, node):
|
||||
"""Test recovery from soft fault."""
|
||||
# Start in soft fault
|
||||
node.state = ProtectionState.SOFT_FAULT
|
||||
node.current_max = 5.5
|
||||
|
||||
# Current drops below threshold
|
||||
node.current_max = 4.0
|
||||
node._check_overcurrent()
|
||||
|
||||
assert node.state == ProtectionState.NORMAL
|
||||
assert node.soft_fault_duration == 0.0
|
||||
|
||||
def test_joule_heating_calculation(self, node):
|
||||
"""Test I^2R heating calculation."""
|
||||
# Current = 5A, Resistance = 1.5 Ohm
|
||||
# P = I^2 * R = 25 * 1.5 = 37.5W per motor
|
||||
node.current_left = 5.0
|
||||
node.current_right = 5.0
|
||||
|
||||
# Simulate thermal update
|
||||
node._update_thermal_state(1.0)
|
||||
|
||||
# Temperature should increase
|
||||
assert node.thermal.motor_temp > 25.0
|
||||
|
||||
def test_temperature_increase(self, node):
|
||||
"""Test motor temperature increases with current."""
|
||||
initial_temp = node.thermal.motor_temp
|
||||
|
||||
node.current_left = 4.0
|
||||
node.current_right = 4.0
|
||||
|
||||
# Run multiple iterations
|
||||
for _ in range(10):
|
||||
node._update_thermal_state(0.1)
|
||||
|
||||
# Temperature should be higher
|
||||
assert node.thermal.motor_temp > initial_temp
|
||||
|
||||
def test_temperature_saturation(self, node):
|
||||
"""Test temperature doesn't exceed ambient with zero current."""
|
||||
node.thermal.motor_temp = 50.0
|
||||
node.current_left = 0.0
|
||||
node.current_right = 0.0
|
||||
|
||||
# Cool down for many iterations
|
||||
for _ in range(100):
|
||||
node._update_thermal_state(0.1)
|
||||
|
||||
# Temperature should cool toward ambient
|
||||
assert node.thermal.motor_temp < 50.0
|
||||
assert node.thermal.motor_temp >= 25.0 # Don't go below ambient
|
||||
|
||||
def test_derate_hard_fault(self, node):
|
||||
"""Test derate returns 0 on hard fault."""
|
||||
node.state = ProtectionState.HARD_FAULT
|
||||
|
||||
derate = node._calculate_derate()
|
||||
|
||||
assert derate == 0.0
|
||||
|
||||
def test_derate_normal(self, node):
|
||||
"""Test derate returns 1.0 in normal state."""
|
||||
node.state = ProtectionState.NORMAL
|
||||
node.thermal.motor_temp = 25.0 # Ambient
|
||||
|
||||
derate = node._calculate_derate()
|
||||
|
||||
assert derate == 1.0
|
||||
|
||||
def test_derate_soft_fault(self, node):
|
||||
"""Test derate reduces in soft fault state."""
|
||||
node.state = ProtectionState.SOFT_FAULT
|
||||
node.current_max = 6.0 # Midway between soft and hard
|
||||
node.thermal.motor_temp = 25.0
|
||||
|
||||
derate = node._calculate_derate()
|
||||
|
||||
# Should be reduced but not zero
|
||||
assert 0.1 <= derate < 1.0
|
||||
|
||||
def test_derate_high_temperature(self, node):
|
||||
"""Test thermal derating at high temperature."""
|
||||
node.state = ProtectionState.NORMAL
|
||||
node.thermal.motor_temp = 75.0 # Near limit (80°C)
|
||||
|
||||
derate = node._calculate_derate()
|
||||
|
||||
# Should be significantly reduced
|
||||
assert derate < 0.5
|
||||
|
||||
def test_derate_maximum_temperature(self, node):
|
||||
"""Test derate approaches zero at max temperature."""
|
||||
node.state = ProtectionState.NORMAL
|
||||
node.thermal.motor_temp = 80.0 # At limit
|
||||
|
||||
derate = node._calculate_derate()
|
||||
|
||||
assert derate == 0.0
|
||||
|
||||
def test_derate_clipping(self, node):
|
||||
"""Test derate is clipped to [0, 1]."""
|
||||
# Even with extreme calculations, should be in range
|
||||
node.state = ProtectionState.NORMAL
|
||||
node.thermal.motor_temp = -50.0 # Invalid, but test robustness
|
||||
node.current_max = 20.0 # Extreme
|
||||
|
||||
derate = node._calculate_derate()
|
||||
|
||||
assert 0.0 <= derate <= 1.0
|
||||
|
||||
def test_protection_status_json(self, node):
|
||||
"""Test protection status JSON format."""
|
||||
node.current_left = 2.5
|
||||
node.current_right = 3.0
|
||||
node.state = ProtectionState.NORMAL
|
||||
node.thermal.motor_temp = 45.0
|
||||
|
||||
# Publish would be called, but test JSON creation
|
||||
status = {
|
||||
"state": node.state.name,
|
||||
"current_left": float(node.current_left),
|
||||
"current_right": float(node.current_right),
|
||||
"current_max": float(node.current_max),
|
||||
"motor_temp": float(node.thermal.motor_temp),
|
||||
"soft_limit": node.soft_limit,
|
||||
"hard_limit": node.hard_limit,
|
||||
}
|
||||
|
||||
json_str = json.dumps(status)
|
||||
parsed = json.loads(json_str)
|
||||
|
||||
assert parsed["state"] == "NORMAL"
|
||||
assert parsed["current_left"] == 2.5
|
||||
assert parsed["motor_temp"] == 45.0
|
||||
|
||||
|
||||
class TestMotorProtectionScenarios:
|
||||
"""Integration-style tests for realistic scenarios."""
|
||||
|
||||
def test_scenario_normal_operation(self, node):
|
||||
"""Scenario: normal motor operation."""
|
||||
node.current_left = 2.0
|
||||
node.current_right = 2.5
|
||||
|
||||
node._timer_callback()
|
||||
|
||||
assert node.state == ProtectionState.NORMAL
|
||||
derate = node._calculate_derate()
|
||||
assert derate > 0.9
|
||||
|
||||
def test_scenario_slow_thermal_rise(self, node):
|
||||
"""Scenario: gradual temperature increase from sustained current."""
|
||||
node.current_left = 3.0
|
||||
node.current_right = 3.0
|
||||
|
||||
# Simulate 10 seconds at 50Hz
|
||||
for _ in range(500):
|
||||
node._update_thermal_state(0.02)
|
||||
|
||||
# Temperature should have risen significantly
|
||||
assert node.thermal.motor_temp > 40.0
|
||||
assert node.thermal.motor_temp < 80.0
|
||||
|
||||
def test_scenario_soft_overcurrent_trip(self, node):
|
||||
"""Scenario: soft overcurrent detection and derating."""
|
||||
node.current_max = 5.5
|
||||
|
||||
node._check_overcurrent()
|
||||
assert node.state == ProtectionState.SOFT_FAULT
|
||||
|
||||
derate = node._calculate_derate()
|
||||
assert 0.0 < derate < 1.0
|
||||
|
||||
def test_scenario_hard_overcurrent_emergency(self, node):
|
||||
"""Scenario: hard overcurrent triggers immediate shutdown."""
|
||||
node.current_max = 9.0
|
||||
|
||||
node._check_overcurrent()
|
||||
assert node.state == ProtectionState.HARD_FAULT
|
||||
|
||||
derate = node._calculate_derate()
|
||||
assert derate == 0.0
|
||||
|
||||
def test_scenario_combined_thermal_current_derating(self, node):
|
||||
"""Scenario: both current and thermal derating apply."""
|
||||
# Soft overcurrent
|
||||
node.current_max = 5.5
|
||||
node.state = ProtectionState.SOFT_FAULT
|
||||
|
||||
# Also high temperature
|
||||
node.thermal.motor_temp = 70.0
|
||||
|
||||
derate = node._calculate_derate()
|
||||
|
||||
# Should be reduced by both factors
|
||||
assert derate < 0.5
|
||||
|
||||
def test_scenario_current_spike_recovery(self, node):
|
||||
"""Scenario: brief current spike then recovery."""
|
||||
# Spike
|
||||
node.current_max = 8.5
|
||||
node._check_overcurrent()
|
||||
assert node.state == ProtectionState.HARD_FAULT
|
||||
|
||||
derate = node._calculate_derate()
|
||||
assert derate == 0.0
|
||||
|
||||
# Recovery
|
||||
node.current_max = 2.0
|
||||
node._check_overcurrent()
|
||||
assert node.state == ProtectionState.NORMAL
|
||||
|
||||
derate = node._calculate_derate()
|
||||
assert derate > 0.9
|
||||
|
||||
def test_scenario_thermal_limit_shutdown(self, node):
|
||||
"""Scenario: motor heats up and speed is derated to zero."""
|
||||
node.state = ProtectionState.NORMAL
|
||||
|
||||
# Simulate sustained high current heating
|
||||
node.current_left = 5.0
|
||||
node.current_right = 5.0
|
||||
|
||||
# Heat up to max temp
|
||||
while node.thermal.motor_temp < 80.0:
|
||||
node._update_thermal_state(0.1)
|
||||
|
||||
# At max temp, derate should be zero
|
||||
derate = node._calculate_derate()
|
||||
assert derate == 0.0
|
||||
|
||||
def test_scenario_asymmetric_motor_loading(self, node):
|
||||
"""Scenario: one motor draws more current than the other."""
|
||||
node.current_left = 2.0
|
||||
node.current_right = 6.5
|
||||
|
||||
node._timer_callback()
|
||||
|
||||
# Should use worst case (right motor)
|
||||
assert node.current_max == 6.5
|
||||
assert node.state == ProtectionState.SOFT_FAULT
|
||||
Loading…
x
Reference in New Issue
Block a user