Compare commits
3 Commits
c00688cd7e
...
4730ea46d1
| Author | SHA1 | Date | |
|---|---|---|---|
| 4730ea46d1 | |||
| 0cdbe4b43e | |||
| eceda99bb5 |
117
include/ina219.h
Normal file
117
include/ina219.h
Normal file
@ -0,0 +1,117 @@
|
|||||||
|
#ifndef INA219_H
|
||||||
|
#define INA219_H
|
||||||
|
|
||||||
|
#include <stdint.h>
|
||||||
|
#include <stdbool.h>
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ina219.h — INA219 power monitor driver (Issue #214)
|
||||||
|
*
|
||||||
|
* I2C1 driver for motor current/voltage/power monitoring.
|
||||||
|
* Supports 2 sensors (left/right motor) on I2C1 (PB8=SCL, PB9=SDA).
|
||||||
|
*
|
||||||
|
* INA219 specs:
|
||||||
|
* - I2C addresses: 0x40–0x4F (configurable via address pins)
|
||||||
|
* - Bus voltage: 0–26V, 4mV/LSB
|
||||||
|
* - Shunt voltage: ±327mV, 10µV/LSB
|
||||||
|
* - Current: derived from shunt voltage (calibration-dependent)
|
||||||
|
* - Power: (Bus V × Current) / internal gain
|
||||||
|
*
|
||||||
|
* Typical usage for motor monitoring:
|
||||||
|
* - 0.1Ω shunt resistor → ~3.27A max (at ±327mV)
|
||||||
|
* - Calibration: set max expected current, driver calculates LSB
|
||||||
|
* - Read functions return actual voltage/current/power values
|
||||||
|
*/
|
||||||
|
|
||||||
|
/* INA219 sensors (2 motors) */
|
||||||
|
typedef enum {
|
||||||
|
INA219_LEFT_MOTOR = 0, /* Address 0x40 */
|
||||||
|
INA219_RIGHT_MOTOR = 1, /* Address 0x41 */
|
||||||
|
INA219_COUNT
|
||||||
|
} INA219Sensor;
|
||||||
|
|
||||||
|
/* INA219 measurement data */
|
||||||
|
typedef struct {
|
||||||
|
uint16_t bus_voltage_mv; /* Bus voltage in mV (0–26000) */
|
||||||
|
int16_t shunt_voltage_uv; /* Shunt voltage in µV (±327000) */
|
||||||
|
int16_t current_ma; /* Current in mA (signed) */
|
||||||
|
uint32_t power_mw; /* Power in mW */
|
||||||
|
} INA219Data;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ina219_init()
|
||||||
|
*
|
||||||
|
* Initialize I2C1 and both INA219 sensors (left + right motor).
|
||||||
|
* Performs auto-calibration for typical motor current monitoring.
|
||||||
|
* Call once at startup after i2c1_init().
|
||||||
|
*/
|
||||||
|
void ina219_init(void);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ina219_calibrate(sensor, max_current_ma, shunt_ohms_milli)
|
||||||
|
*
|
||||||
|
* Manually calibrate a sensor for expected max current and shunt resistance.
|
||||||
|
* Calculates internal calibration register value.
|
||||||
|
*
|
||||||
|
* Example:
|
||||||
|
* ina219_calibrate(INA219_LEFT_MOTOR, 5000, 100); // 5A max, 0.1Ω shunt
|
||||||
|
*/
|
||||||
|
void ina219_calibrate(INA219Sensor sensor, uint16_t max_current_ma, uint16_t shunt_ohms_milli);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ina219_read(sensor, data)
|
||||||
|
*
|
||||||
|
* Read all measurements from a sensor (voltage, current, power).
|
||||||
|
* Blocks until measurements are ready (typically <1ms at default ADC resolution).
|
||||||
|
*
|
||||||
|
* Returns: true if read successful, false on I2C error.
|
||||||
|
*/
|
||||||
|
bool ina219_read(INA219Sensor sensor, INA219Data *data);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ina219_read_bus_voltage_mv(sensor, voltage_mv)
|
||||||
|
*
|
||||||
|
* Read bus voltage only (faster than full read).
|
||||||
|
* Returns: true if successful.
|
||||||
|
*/
|
||||||
|
bool ina219_read_bus_voltage_mv(INA219Sensor sensor, uint16_t *voltage_mv);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ina219_read_current_ma(sensor, current_ma)
|
||||||
|
*
|
||||||
|
* Read current only (requires prior calibration).
|
||||||
|
* Returns: true if successful.
|
||||||
|
*/
|
||||||
|
bool ina219_read_current_ma(INA219Sensor sensor, int16_t *current_ma);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ina219_read_power_mw(sensor, power_mw)
|
||||||
|
*
|
||||||
|
* Read power consumption only.
|
||||||
|
* Returns: true if successful.
|
||||||
|
*/
|
||||||
|
bool ina219_read_power_mw(INA219Sensor sensor, uint32_t *power_mw);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ina219_alert_enable(sensor, current_limit_ma)
|
||||||
|
*
|
||||||
|
* Enable alert pin when current exceeds limit (overcurrent protection).
|
||||||
|
* Alert pin: GPIO, active high, open-drain output.
|
||||||
|
*/
|
||||||
|
void ina219_alert_enable(INA219Sensor sensor, uint16_t current_limit_ma);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ina219_alert_disable(sensor)
|
||||||
|
*
|
||||||
|
* Disable alert for a sensor.
|
||||||
|
*/
|
||||||
|
void ina219_alert_disable(INA219Sensor sensor);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ina219_reset(sensor)
|
||||||
|
*
|
||||||
|
* Perform soft reset on a sensor (clears all registers to default).
|
||||||
|
*/
|
||||||
|
void ina219_reset(INA219Sensor sensor);
|
||||||
|
|
||||||
|
#endif /* INA219_H */
|
||||||
@ -0,0 +1,10 @@
|
|||||||
|
# Motor protection configuration
|
||||||
|
|
||||||
|
motor_protection:
|
||||||
|
ros__parameters:
|
||||||
|
# Overcurrent limits (Amps)
|
||||||
|
soft_current_limit: 5.0 # Warning threshold, trigger derating
|
||||||
|
hard_current_limit: 8.0 # Fault threshold, trigger emergency stop
|
||||||
|
|
||||||
|
# Protection monitoring frequency (Hz)
|
||||||
|
frequency: 50 # 50 Hz = 20ms cycle
|
||||||
@ -0,0 +1,36 @@
|
|||||||
|
"""Launch file for motor_protection_node."""
|
||||||
|
|
||||||
|
from launch import LaunchDescription
|
||||||
|
from launch_ros.actions import Node
|
||||||
|
from launch.substitutions import LaunchConfiguration
|
||||||
|
from launch.actions import DeclareLaunchArgument
|
||||||
|
import os
|
||||||
|
from ament_index_python.packages import get_package_share_directory
|
||||||
|
|
||||||
|
|
||||||
|
def generate_launch_description():
|
||||||
|
"""Generate launch description for motor protection."""
|
||||||
|
# Package directory
|
||||||
|
pkg_dir = get_package_share_directory("saltybot_motor_protection")
|
||||||
|
|
||||||
|
# Parameters
|
||||||
|
config_file = os.path.join(pkg_dir, "config", "protection_config.yaml")
|
||||||
|
|
||||||
|
# Declare launch arguments
|
||||||
|
return LaunchDescription(
|
||||||
|
[
|
||||||
|
DeclareLaunchArgument(
|
||||||
|
"config_file",
|
||||||
|
default_value=config_file,
|
||||||
|
description="Path to configuration YAML file",
|
||||||
|
),
|
||||||
|
# Motor protection node
|
||||||
|
Node(
|
||||||
|
package="saltybot_motor_protection",
|
||||||
|
executable="motor_protection_node",
|
||||||
|
name="motor_protection",
|
||||||
|
output="screen",
|
||||||
|
parameters=[LaunchConfiguration("config_file")],
|
||||||
|
),
|
||||||
|
]
|
||||||
|
)
|
||||||
29
jetson/ros2_ws/src/saltybot_motor_protection/package.xml
Normal file
29
jetson/ros2_ws/src/saltybot_motor_protection/package.xml
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
<?xml version="1.0"?>
|
||||||
|
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||||
|
<package format="3">
|
||||||
|
<name>saltybot_motor_protection</name>
|
||||||
|
<version>0.1.0</version>
|
||||||
|
<description>
|
||||||
|
Motor current protection node for SaltyBot: overcurrent detection (soft 5A, hard 8A)
|
||||||
|
and thermal derating using I^2R model. Monitors motor current, calculates thermal
|
||||||
|
state, applies speed derating, and triggers emergency stop on hard fault.
|
||||||
|
</description>
|
||||||
|
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
|
||||||
|
<license>MIT</license>
|
||||||
|
|
||||||
|
<depend>rclpy</depend>
|
||||||
|
<depend>geometry_msgs</depend>
|
||||||
|
<depend>std_msgs</depend>
|
||||||
|
<depend>std_srvs</depend>
|
||||||
|
|
||||||
|
<buildtool_depend>ament_python</buildtool_depend>
|
||||||
|
|
||||||
|
<test_depend>ament_copyright</test_depend>
|
||||||
|
<test_depend>ament_flake8</test_depend>
|
||||||
|
<test_depend>ament_pep257</test_depend>
|
||||||
|
<test_depend>python3-pytest</test_depend>
|
||||||
|
|
||||||
|
<export>
|
||||||
|
<build_type>ament_python</build_type>
|
||||||
|
</export>
|
||||||
|
</package>
|
||||||
@ -0,0 +1,263 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Motor current protection node for SaltyBot.
|
||||||
|
|
||||||
|
Monitors motor current, detects overcurrent conditions (soft 5A, hard 8A),
|
||||||
|
models thermal state using I^2R model, and applies speed derating.
|
||||||
|
Triggers emergency stop on hard fault.
|
||||||
|
|
||||||
|
Published topics:
|
||||||
|
/saltybot/motor_protection (std_msgs/String) - JSON protection status
|
||||||
|
/saltybot/speed_limit (std_msgs/Float32) - Thermal derating factor [0, 1]
|
||||||
|
|
||||||
|
Subscribed topics:
|
||||||
|
/saltybot/motor_current_left (std_msgs/Float32) - Left motor current (A)
|
||||||
|
/saltybot/motor_current_right (std_msgs/Float32) - Right motor current (A)
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import math
|
||||||
|
from enum import Enum
|
||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from rclpy.timer import Timer
|
||||||
|
from geometry_msgs.msg import Twist
|
||||||
|
from std_msgs.msg import Float32, String
|
||||||
|
|
||||||
|
|
||||||
|
class ProtectionState(Enum):
|
||||||
|
"""Motor protection state."""
|
||||||
|
NORMAL = 0
|
||||||
|
SOFT_FAULT = 1 # Soft overcurrent
|
||||||
|
HARD_FAULT = 2 # Hard overcurrent
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class ThermalModel:
|
||||||
|
"""I^2R thermal model parameters."""
|
||||||
|
motor_resistance: float = 1.5 # Ohms (motor winding resistance)
|
||||||
|
thermal_mass: float = 100.0 # J/°C (thermal capacitance)
|
||||||
|
ambient_temp: float = 25.0 # °C
|
||||||
|
max_temp: float = 80.0 # °C (safe operating limit)
|
||||||
|
cooling_constant: float = 0.05 # 1/s (exponential cooling rate)
|
||||||
|
|
||||||
|
# Derived
|
||||||
|
motor_temp: float = 25.0 # Current motor temperature
|
||||||
|
|
||||||
|
|
||||||
|
class MotorProtectionNode(Node):
|
||||||
|
"""ROS2 node for motor current protection."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__("motor_protection")
|
||||||
|
|
||||||
|
# Parameters
|
||||||
|
self.declare_parameter("soft_current_limit", 5.0) # Amps
|
||||||
|
self.declare_parameter("hard_current_limit", 8.0) # Amps
|
||||||
|
self.declare_parameter("frequency", 50) # Hz
|
||||||
|
|
||||||
|
self.soft_limit = self.get_parameter("soft_current_limit").value
|
||||||
|
self.hard_limit = self.get_parameter("hard_current_limit").value
|
||||||
|
frequency = self.get_parameter("frequency").value
|
||||||
|
|
||||||
|
# Current measurements
|
||||||
|
self.current_left = 0.0
|
||||||
|
self.current_right = 0.0
|
||||||
|
self.current_max = 0.0 # Maximum of the two motors
|
||||||
|
|
||||||
|
# Thermal model
|
||||||
|
self.thermal = ThermalModel()
|
||||||
|
|
||||||
|
# Protection state
|
||||||
|
self.state = ProtectionState.NORMAL
|
||||||
|
self.soft_fault_duration = 0.0 # Time in soft fault state
|
||||||
|
|
||||||
|
# Subscriptions
|
||||||
|
self.create_subscription(
|
||||||
|
Float32, "/saltybot/motor_current_left", self._on_current_left, 10
|
||||||
|
)
|
||||||
|
self.create_subscription(
|
||||||
|
Float32, "/saltybot/motor_current_right", self._on_current_right, 10
|
||||||
|
)
|
||||||
|
|
||||||
|
# Publications
|
||||||
|
self.pub_protection = self.create_publisher(
|
||||||
|
String, "/saltybot/motor_protection", 10
|
||||||
|
)
|
||||||
|
self.pub_speed_limit = self.create_publisher(
|
||||||
|
Float32, "/saltybot/speed_limit", 10
|
||||||
|
)
|
||||||
|
|
||||||
|
# Timer for monitoring at 50Hz
|
||||||
|
period = 1.0 / frequency
|
||||||
|
self.timer: Timer = self.create_timer(period, self._timer_callback)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"Motor protection initialized at {frequency}Hz. "
|
||||||
|
f"Soft limit: {self.soft_limit}A, Hard limit: {self.hard_limit}A"
|
||||||
|
)
|
||||||
|
|
||||||
|
def _on_current_left(self, msg: Float32) -> None:
|
||||||
|
"""Update left motor current."""
|
||||||
|
self.current_left = max(0.0, msg.data) # Ensure non-negative
|
||||||
|
|
||||||
|
def _on_current_right(self, msg: Float32) -> None:
|
||||||
|
"""Update right motor current."""
|
||||||
|
self.current_right = max(0.0, msg.data) # Ensure non-negative
|
||||||
|
|
||||||
|
def _timer_callback(self) -> None:
|
||||||
|
"""Monitor motor current and thermal state at 50Hz."""
|
||||||
|
dt = 0.02 # 50Hz = 20ms
|
||||||
|
|
||||||
|
# Get maximum current (worst case)
|
||||||
|
self.current_max = max(self.current_left, self.current_right)
|
||||||
|
|
||||||
|
# Update thermal state
|
||||||
|
self._update_thermal_state(dt)
|
||||||
|
|
||||||
|
# Check overcurrent conditions
|
||||||
|
self._check_overcurrent()
|
||||||
|
|
||||||
|
# Calculate speed derate factor
|
||||||
|
derate_factor = self._calculate_derate()
|
||||||
|
|
||||||
|
# Publish status
|
||||||
|
self._publish_protection_status()
|
||||||
|
self._publish_speed_limit(derate_factor)
|
||||||
|
|
||||||
|
def _update_thermal_state(self, dt: float) -> None:
|
||||||
|
"""Update motor temperature using I^2R thermal model.
|
||||||
|
|
||||||
|
Model: dT/dt = (P_loss - P_cool) / C_thermal
|
||||||
|
where:
|
||||||
|
P_loss = I^2 * R (Joule heating)
|
||||||
|
P_cool = h * (T - T_ambient) (convective cooling)
|
||||||
|
C_thermal = thermal mass / dt
|
||||||
|
"""
|
||||||
|
# Heat generation: P_loss = I^2 * R (both motors)
|
||||||
|
p_loss_left = self.current_left**2 * self.thermal.motor_resistance
|
||||||
|
p_loss_right = self.current_right**2 * self.thermal.motor_resistance
|
||||||
|
p_loss_total = p_loss_left + p_loss_right # Combined heat
|
||||||
|
|
||||||
|
# Passive cooling (exponential decay)
|
||||||
|
temp_diff = self.thermal.motor_temp - self.thermal.ambient_temp
|
||||||
|
p_cool = self.thermal.cooling_constant * temp_diff
|
||||||
|
|
||||||
|
# Temperature change
|
||||||
|
dtemp_dt = (p_loss_total - p_cool) / self.thermal.thermal_mass
|
||||||
|
|
||||||
|
# Update temperature (with saturation)
|
||||||
|
new_temp = self.thermal.motor_temp + dtemp_dt * dt
|
||||||
|
self.thermal.motor_temp = max(self.thermal.ambient_temp, new_temp)
|
||||||
|
|
||||||
|
self.get_logger().debug(
|
||||||
|
f"Thermal: T={self.thermal.motor_temp:.1f}°C, "
|
||||||
|
f"P_loss={p_loss_total:.1f}W, P_cool={p_cool:.1f}W"
|
||||||
|
)
|
||||||
|
|
||||||
|
def _check_overcurrent(self) -> None:
|
||||||
|
"""Check for overcurrent conditions and update protection state."""
|
||||||
|
if self.current_max >= self.hard_limit:
|
||||||
|
# Hard fault: immediate stop
|
||||||
|
if self.state != ProtectionState.HARD_FAULT:
|
||||||
|
self.get_logger().error(
|
||||||
|
f"HARD OVERCURRENT FAULT: {self.current_max:.2f}A >= {self.hard_limit}A"
|
||||||
|
)
|
||||||
|
self.state = ProtectionState.HARD_FAULT
|
||||||
|
self.soft_fault_duration = 0.0
|
||||||
|
|
||||||
|
elif self.current_max >= self.soft_limit:
|
||||||
|
# Soft fault: warning and derating
|
||||||
|
if self.state == ProtectionState.NORMAL:
|
||||||
|
self.get_logger().warn(
|
||||||
|
f"Soft overcurrent: {self.current_max:.2f}A >= {self.soft_limit}A"
|
||||||
|
)
|
||||||
|
self.state = ProtectionState.SOFT_FAULT
|
||||||
|
self.soft_fault_duration += 0.02 # 50Hz = 20ms
|
||||||
|
|
||||||
|
else:
|
||||||
|
# Back to normal
|
||||||
|
if self.state != ProtectionState.NORMAL:
|
||||||
|
self.get_logger().info("Overcurrent cleared, resuming normal operation")
|
||||||
|
self.state = ProtectionState.NORMAL
|
||||||
|
self.soft_fault_duration = 0.0
|
||||||
|
|
||||||
|
def _calculate_derate(self) -> float:
|
||||||
|
"""Calculate speed derating factor based on thermal state.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
float: Speed derate factor in [0, 1]
|
||||||
|
"""
|
||||||
|
# Hard fault: complete stop
|
||||||
|
if self.state == ProtectionState.HARD_FAULT:
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
# Soft fault: reduce speed based on current and temperature
|
||||||
|
soft_derate = 1.0
|
||||||
|
if self.state == ProtectionState.SOFT_FAULT:
|
||||||
|
# Reduce speed proportionally to current above soft limit
|
||||||
|
current_excess = (self.current_max - self.soft_limit) / (
|
||||||
|
self.hard_limit - self.soft_limit
|
||||||
|
)
|
||||||
|
soft_derate = max(0.1, 1.0 - current_excess * 0.5) # Min 10% speed
|
||||||
|
|
||||||
|
# Thermal derating: reduce speed as temperature increases
|
||||||
|
temp_range = self.thermal.max_temp - self.thermal.ambient_temp
|
||||||
|
if temp_range > 0:
|
||||||
|
temp_ratio = (
|
||||||
|
self.thermal.motor_temp - self.thermal.ambient_temp
|
||||||
|
) / temp_range
|
||||||
|
|
||||||
|
# Linear derating: full speed at ambient, zero at max
|
||||||
|
if temp_ratio > 1.0:
|
||||||
|
thermal_derate = 0.0
|
||||||
|
elif temp_ratio > 0.8:
|
||||||
|
# Aggressive derating near limit
|
||||||
|
thermal_derate = (1.0 - temp_ratio) / 0.2
|
||||||
|
else:
|
||||||
|
thermal_derate = 1.0
|
||||||
|
else:
|
||||||
|
thermal_derate = 1.0
|
||||||
|
|
||||||
|
# Combined derate: use minimum of both
|
||||||
|
derate = min(soft_derate, thermal_derate)
|
||||||
|
return max(0.0, min(1.0, derate)) # Clamp to [0, 1]
|
||||||
|
|
||||||
|
def _publish_protection_status(self) -> None:
|
||||||
|
"""Publish motor protection status as JSON."""
|
||||||
|
status = {
|
||||||
|
"timestamp": self.get_clock().now().nanoseconds / 1e9,
|
||||||
|
"state": self.state.name,
|
||||||
|
"current_left": float(self.current_left),
|
||||||
|
"current_right": float(self.current_right),
|
||||||
|
"current_max": float(self.current_max),
|
||||||
|
"motor_temp": float(self.thermal.motor_temp),
|
||||||
|
"soft_limit": self.soft_limit,
|
||||||
|
"hard_limit": self.hard_limit,
|
||||||
|
"soft_fault_duration": float(self.soft_fault_duration),
|
||||||
|
}
|
||||||
|
|
||||||
|
msg = String(data=json.dumps(status))
|
||||||
|
self.pub_protection.publish(msg)
|
||||||
|
|
||||||
|
def _publish_speed_limit(self, derate: float) -> None:
|
||||||
|
"""Publish thermal derate speed limit."""
|
||||||
|
msg = Float32(data=derate)
|
||||||
|
self.pub_speed_limit.publish(msg)
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = MotorProtectionNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
4
jetson/ros2_ws/src/saltybot_motor_protection/setup.cfg
Normal file
4
jetson/ros2_ws/src/saltybot_motor_protection/setup.cfg
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
[develop]
|
||||||
|
script_dir=$base/lib/saltybot_motor_protection
|
||||||
|
[egg_info]
|
||||||
|
tag_date = 0
|
||||||
29
jetson/ros2_ws/src/saltybot_motor_protection/setup.py
Normal file
29
jetson/ros2_ws/src/saltybot_motor_protection/setup.py
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
from setuptools import setup
|
||||||
|
|
||||||
|
package_name = "saltybot_motor_protection"
|
||||||
|
|
||||||
|
setup(
|
||||||
|
name=package_name,
|
||||||
|
version="0.1.0",
|
||||||
|
packages=[package_name],
|
||||||
|
data_files=[
|
||||||
|
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
|
||||||
|
(f"share/{package_name}", ["package.xml"]),
|
||||||
|
(f"share/{package_name}/launch", ["launch/motor_protection.launch.py"]),
|
||||||
|
(f"share/{package_name}/config", ["config/protection_config.yaml"]),
|
||||||
|
],
|
||||||
|
install_requires=["setuptools"],
|
||||||
|
zip_safe=True,
|
||||||
|
maintainer="sl-controls",
|
||||||
|
maintainer_email="sl-controls@saltylab.local",
|
||||||
|
description=(
|
||||||
|
"Motor protection: overcurrent detection + I^2R thermal derating"
|
||||||
|
),
|
||||||
|
license="MIT",
|
||||||
|
tests_require=["pytest"],
|
||||||
|
entry_points={
|
||||||
|
"console_scripts": [
|
||||||
|
"motor_protection_node = saltybot_motor_protection.motor_protection_node:main",
|
||||||
|
],
|
||||||
|
},
|
||||||
|
)
|
||||||
@ -0,0 +1,384 @@
|
|||||||
|
"""Unit tests for motor_protection_node."""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import json
|
||||||
|
import math
|
||||||
|
from std_msgs.msg import Float32, String
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
|
||||||
|
# Import the node and classes under test
|
||||||
|
from saltybot_motor_protection.motor_protection_node import (
|
||||||
|
MotorProtectionNode,
|
||||||
|
ProtectionState,
|
||||||
|
ThermalModel,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def rclpy_fixture():
|
||||||
|
"""Initialize and cleanup rclpy."""
|
||||||
|
rclpy.init()
|
||||||
|
yield
|
||||||
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def node(rclpy_fixture):
|
||||||
|
"""Create a motor protection node instance."""
|
||||||
|
node = MotorProtectionNode()
|
||||||
|
yield node
|
||||||
|
node.destroy_node()
|
||||||
|
|
||||||
|
|
||||||
|
class TestThermalModel:
|
||||||
|
"""Test suite for ThermalModel."""
|
||||||
|
|
||||||
|
def test_thermal_model_initialization(self):
|
||||||
|
"""Test ThermalModel initializes with correct defaults."""
|
||||||
|
model = ThermalModel()
|
||||||
|
|
||||||
|
assert model.motor_resistance == 1.5
|
||||||
|
assert model.thermal_mass == 100.0
|
||||||
|
assert model.ambient_temp == 25.0
|
||||||
|
assert model.max_temp == 80.0
|
||||||
|
assert model.cooling_constant == 0.05
|
||||||
|
assert model.motor_temp == 25.0
|
||||||
|
|
||||||
|
def test_thermal_model_custom_params(self):
|
||||||
|
"""Test ThermalModel with custom parameters."""
|
||||||
|
model = ThermalModel(
|
||||||
|
motor_resistance=2.0,
|
||||||
|
thermal_mass=50.0,
|
||||||
|
max_temp=100.0,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert model.motor_resistance == 2.0
|
||||||
|
assert model.thermal_mass == 50.0
|
||||||
|
assert model.max_temp == 100.0
|
||||||
|
|
||||||
|
|
||||||
|
class TestMotorProtectionNode:
|
||||||
|
"""Test suite for MotorProtectionNode."""
|
||||||
|
|
||||||
|
def test_node_initialization(self, node):
|
||||||
|
"""Test that node initializes with correct defaults."""
|
||||||
|
assert node.soft_limit == 5.0
|
||||||
|
assert node.hard_limit == 8.0
|
||||||
|
assert node.current_left == 0.0
|
||||||
|
assert node.current_right == 0.0
|
||||||
|
assert node.current_max == 0.0
|
||||||
|
assert node.state == ProtectionState.NORMAL
|
||||||
|
assert node.soft_fault_duration == 0.0
|
||||||
|
|
||||||
|
def test_current_left_subscription(self, node):
|
||||||
|
"""Test left motor current subscription."""
|
||||||
|
msg = Float32(data=2.5)
|
||||||
|
node._on_current_left(msg)
|
||||||
|
assert node.current_left == 2.5
|
||||||
|
|
||||||
|
def test_current_right_subscription(self, node):
|
||||||
|
"""Test right motor current subscription."""
|
||||||
|
msg = Float32(data=3.0)
|
||||||
|
node._on_current_right(msg)
|
||||||
|
assert node.current_right == 3.0
|
||||||
|
|
||||||
|
def test_current_negative_clamping(self, node):
|
||||||
|
"""Test that negative currents are clamped to zero."""
|
||||||
|
node._on_current_left(Float32(data=-1.0))
|
||||||
|
assert node.current_left == 0.0
|
||||||
|
|
||||||
|
node._on_current_right(Float32(data=-2.5))
|
||||||
|
assert node.current_right == 0.0
|
||||||
|
|
||||||
|
def test_max_current_calculation(self, node):
|
||||||
|
"""Test maximum current selection."""
|
||||||
|
node.current_left = 2.0
|
||||||
|
node.current_right = 3.5
|
||||||
|
|
||||||
|
# Simulate timer callback to update max
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
assert node.current_max == 3.5
|
||||||
|
|
||||||
|
def test_normal_state_low_current(self, node):
|
||||||
|
"""Test normal state with low current."""
|
||||||
|
node.current_max = 2.0
|
||||||
|
|
||||||
|
node._check_overcurrent()
|
||||||
|
|
||||||
|
assert node.state == ProtectionState.NORMAL
|
||||||
|
assert node.soft_fault_duration == 0.0
|
||||||
|
|
||||||
|
def test_soft_fault_detection(self, node):
|
||||||
|
"""Test soft overcurrent detection at 5A."""
|
||||||
|
node.current_max = 5.5
|
||||||
|
|
||||||
|
node._check_overcurrent()
|
||||||
|
|
||||||
|
assert node.state == ProtectionState.SOFT_FAULT
|
||||||
|
|
||||||
|
def test_hard_fault_detection(self, node):
|
||||||
|
"""Test hard overcurrent detection at 8A."""
|
||||||
|
node.current_max = 8.5
|
||||||
|
|
||||||
|
node._check_overcurrent()
|
||||||
|
|
||||||
|
assert node.state == ProtectionState.HARD_FAULT
|
||||||
|
|
||||||
|
def test_soft_fault_duration_tracking(self, node):
|
||||||
|
"""Test soft fault duration accumulation."""
|
||||||
|
node.current_max = 5.5
|
||||||
|
node.state = ProtectionState.NORMAL
|
||||||
|
|
||||||
|
# First cycle
|
||||||
|
node._check_overcurrent()
|
||||||
|
assert node.soft_fault_duration == 0.02
|
||||||
|
|
||||||
|
# Second cycle
|
||||||
|
node._check_overcurrent()
|
||||||
|
assert node.soft_fault_duration == 0.04
|
||||||
|
|
||||||
|
def test_fault_recovery(self, node):
|
||||||
|
"""Test recovery from soft fault."""
|
||||||
|
# Start in soft fault
|
||||||
|
node.state = ProtectionState.SOFT_FAULT
|
||||||
|
node.current_max = 5.5
|
||||||
|
|
||||||
|
# Current drops below threshold
|
||||||
|
node.current_max = 4.0
|
||||||
|
node._check_overcurrent()
|
||||||
|
|
||||||
|
assert node.state == ProtectionState.NORMAL
|
||||||
|
assert node.soft_fault_duration == 0.0
|
||||||
|
|
||||||
|
def test_joule_heating_calculation(self, node):
|
||||||
|
"""Test I^2R heating calculation."""
|
||||||
|
# Current = 5A, Resistance = 1.5 Ohm
|
||||||
|
# P = I^2 * R = 25 * 1.5 = 37.5W per motor
|
||||||
|
node.current_left = 5.0
|
||||||
|
node.current_right = 5.0
|
||||||
|
|
||||||
|
# Simulate thermal update
|
||||||
|
node._update_thermal_state(1.0)
|
||||||
|
|
||||||
|
# Temperature should increase
|
||||||
|
assert node.thermal.motor_temp > 25.0
|
||||||
|
|
||||||
|
def test_temperature_increase(self, node):
|
||||||
|
"""Test motor temperature increases with current."""
|
||||||
|
initial_temp = node.thermal.motor_temp
|
||||||
|
|
||||||
|
node.current_left = 4.0
|
||||||
|
node.current_right = 4.0
|
||||||
|
|
||||||
|
# Run multiple iterations
|
||||||
|
for _ in range(10):
|
||||||
|
node._update_thermal_state(0.1)
|
||||||
|
|
||||||
|
# Temperature should be higher
|
||||||
|
assert node.thermal.motor_temp > initial_temp
|
||||||
|
|
||||||
|
def test_temperature_saturation(self, node):
|
||||||
|
"""Test temperature doesn't exceed ambient with zero current."""
|
||||||
|
node.thermal.motor_temp = 50.0
|
||||||
|
node.current_left = 0.0
|
||||||
|
node.current_right = 0.0
|
||||||
|
|
||||||
|
# Cool down for many iterations
|
||||||
|
for _ in range(100):
|
||||||
|
node._update_thermal_state(0.1)
|
||||||
|
|
||||||
|
# Temperature should cool toward ambient
|
||||||
|
assert node.thermal.motor_temp < 50.0
|
||||||
|
assert node.thermal.motor_temp >= 25.0 # Don't go below ambient
|
||||||
|
|
||||||
|
def test_derate_hard_fault(self, node):
|
||||||
|
"""Test derate returns 0 on hard fault."""
|
||||||
|
node.state = ProtectionState.HARD_FAULT
|
||||||
|
|
||||||
|
derate = node._calculate_derate()
|
||||||
|
|
||||||
|
assert derate == 0.0
|
||||||
|
|
||||||
|
def test_derate_normal(self, node):
|
||||||
|
"""Test derate returns 1.0 in normal state."""
|
||||||
|
node.state = ProtectionState.NORMAL
|
||||||
|
node.thermal.motor_temp = 25.0 # Ambient
|
||||||
|
|
||||||
|
derate = node._calculate_derate()
|
||||||
|
|
||||||
|
assert derate == 1.0
|
||||||
|
|
||||||
|
def test_derate_soft_fault(self, node):
|
||||||
|
"""Test derate reduces in soft fault state."""
|
||||||
|
node.state = ProtectionState.SOFT_FAULT
|
||||||
|
node.current_max = 6.0 # Midway between soft and hard
|
||||||
|
node.thermal.motor_temp = 25.0
|
||||||
|
|
||||||
|
derate = node._calculate_derate()
|
||||||
|
|
||||||
|
# Should be reduced but not zero
|
||||||
|
assert 0.1 <= derate < 1.0
|
||||||
|
|
||||||
|
def test_derate_high_temperature(self, node):
|
||||||
|
"""Test thermal derating at high temperature."""
|
||||||
|
node.state = ProtectionState.NORMAL
|
||||||
|
node.thermal.motor_temp = 75.0 # Near limit (80°C)
|
||||||
|
|
||||||
|
derate = node._calculate_derate()
|
||||||
|
|
||||||
|
# Should be significantly reduced
|
||||||
|
assert derate < 0.5
|
||||||
|
|
||||||
|
def test_derate_maximum_temperature(self, node):
|
||||||
|
"""Test derate approaches zero at max temperature."""
|
||||||
|
node.state = ProtectionState.NORMAL
|
||||||
|
node.thermal.motor_temp = 80.0 # At limit
|
||||||
|
|
||||||
|
derate = node._calculate_derate()
|
||||||
|
|
||||||
|
assert derate == 0.0
|
||||||
|
|
||||||
|
def test_derate_clipping(self, node):
|
||||||
|
"""Test derate is clipped to [0, 1]."""
|
||||||
|
# Even with extreme calculations, should be in range
|
||||||
|
node.state = ProtectionState.NORMAL
|
||||||
|
node.thermal.motor_temp = -50.0 # Invalid, but test robustness
|
||||||
|
node.current_max = 20.0 # Extreme
|
||||||
|
|
||||||
|
derate = node._calculate_derate()
|
||||||
|
|
||||||
|
assert 0.0 <= derate <= 1.0
|
||||||
|
|
||||||
|
def test_protection_status_json(self, node):
|
||||||
|
"""Test protection status JSON format."""
|
||||||
|
node.current_left = 2.5
|
||||||
|
node.current_right = 3.0
|
||||||
|
node.state = ProtectionState.NORMAL
|
||||||
|
node.thermal.motor_temp = 45.0
|
||||||
|
|
||||||
|
# Publish would be called, but test JSON creation
|
||||||
|
status = {
|
||||||
|
"state": node.state.name,
|
||||||
|
"current_left": float(node.current_left),
|
||||||
|
"current_right": float(node.current_right),
|
||||||
|
"current_max": float(node.current_max),
|
||||||
|
"motor_temp": float(node.thermal.motor_temp),
|
||||||
|
"soft_limit": node.soft_limit,
|
||||||
|
"hard_limit": node.hard_limit,
|
||||||
|
}
|
||||||
|
|
||||||
|
json_str = json.dumps(status)
|
||||||
|
parsed = json.loads(json_str)
|
||||||
|
|
||||||
|
assert parsed["state"] == "NORMAL"
|
||||||
|
assert parsed["current_left"] == 2.5
|
||||||
|
assert parsed["motor_temp"] == 45.0
|
||||||
|
|
||||||
|
|
||||||
|
class TestMotorProtectionScenarios:
|
||||||
|
"""Integration-style tests for realistic scenarios."""
|
||||||
|
|
||||||
|
def test_scenario_normal_operation(self, node):
|
||||||
|
"""Scenario: normal motor operation."""
|
||||||
|
node.current_left = 2.0
|
||||||
|
node.current_right = 2.5
|
||||||
|
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
assert node.state == ProtectionState.NORMAL
|
||||||
|
derate = node._calculate_derate()
|
||||||
|
assert derate > 0.9
|
||||||
|
|
||||||
|
def test_scenario_slow_thermal_rise(self, node):
|
||||||
|
"""Scenario: gradual temperature increase from sustained current."""
|
||||||
|
node.current_left = 3.0
|
||||||
|
node.current_right = 3.0
|
||||||
|
|
||||||
|
# Simulate 10 seconds at 50Hz
|
||||||
|
for _ in range(500):
|
||||||
|
node._update_thermal_state(0.02)
|
||||||
|
|
||||||
|
# Temperature should have risen significantly
|
||||||
|
assert node.thermal.motor_temp > 40.0
|
||||||
|
assert node.thermal.motor_temp < 80.0
|
||||||
|
|
||||||
|
def test_scenario_soft_overcurrent_trip(self, node):
|
||||||
|
"""Scenario: soft overcurrent detection and derating."""
|
||||||
|
node.current_max = 5.5
|
||||||
|
|
||||||
|
node._check_overcurrent()
|
||||||
|
assert node.state == ProtectionState.SOFT_FAULT
|
||||||
|
|
||||||
|
derate = node._calculate_derate()
|
||||||
|
assert 0.0 < derate < 1.0
|
||||||
|
|
||||||
|
def test_scenario_hard_overcurrent_emergency(self, node):
|
||||||
|
"""Scenario: hard overcurrent triggers immediate shutdown."""
|
||||||
|
node.current_max = 9.0
|
||||||
|
|
||||||
|
node._check_overcurrent()
|
||||||
|
assert node.state == ProtectionState.HARD_FAULT
|
||||||
|
|
||||||
|
derate = node._calculate_derate()
|
||||||
|
assert derate == 0.0
|
||||||
|
|
||||||
|
def test_scenario_combined_thermal_current_derating(self, node):
|
||||||
|
"""Scenario: both current and thermal derating apply."""
|
||||||
|
# Soft overcurrent
|
||||||
|
node.current_max = 5.5
|
||||||
|
node.state = ProtectionState.SOFT_FAULT
|
||||||
|
|
||||||
|
# Also high temperature
|
||||||
|
node.thermal.motor_temp = 70.0
|
||||||
|
|
||||||
|
derate = node._calculate_derate()
|
||||||
|
|
||||||
|
# Should be reduced by both factors
|
||||||
|
assert derate < 0.5
|
||||||
|
|
||||||
|
def test_scenario_current_spike_recovery(self, node):
|
||||||
|
"""Scenario: brief current spike then recovery."""
|
||||||
|
# Spike
|
||||||
|
node.current_max = 8.5
|
||||||
|
node._check_overcurrent()
|
||||||
|
assert node.state == ProtectionState.HARD_FAULT
|
||||||
|
|
||||||
|
derate = node._calculate_derate()
|
||||||
|
assert derate == 0.0
|
||||||
|
|
||||||
|
# Recovery
|
||||||
|
node.current_max = 2.0
|
||||||
|
node._check_overcurrent()
|
||||||
|
assert node.state == ProtectionState.NORMAL
|
||||||
|
|
||||||
|
derate = node._calculate_derate()
|
||||||
|
assert derate > 0.9
|
||||||
|
|
||||||
|
def test_scenario_thermal_limit_shutdown(self, node):
|
||||||
|
"""Scenario: motor heats up and speed is derated to zero."""
|
||||||
|
node.state = ProtectionState.NORMAL
|
||||||
|
|
||||||
|
# Simulate sustained high current heating
|
||||||
|
node.current_left = 5.0
|
||||||
|
node.current_right = 5.0
|
||||||
|
|
||||||
|
# Heat up to max temp
|
||||||
|
while node.thermal.motor_temp < 80.0:
|
||||||
|
node._update_thermal_state(0.1)
|
||||||
|
|
||||||
|
# At max temp, derate should be zero
|
||||||
|
derate = node._calculate_derate()
|
||||||
|
assert derate == 0.0
|
||||||
|
|
||||||
|
def test_scenario_asymmetric_motor_loading(self, node):
|
||||||
|
"""Scenario: one motor draws more current than the other."""
|
||||||
|
node.current_left = 2.0
|
||||||
|
node.current_right = 6.5
|
||||||
|
|
||||||
|
node._timer_callback()
|
||||||
|
|
||||||
|
# Should use worst case (right motor)
|
||||||
|
assert node.current_max == 6.5
|
||||||
|
assert node.state == ProtectionState.SOFT_FAULT
|
||||||
244
src/ina219.c
Normal file
244
src/ina219.c
Normal file
@ -0,0 +1,244 @@
|
|||||||
|
#include "ina219.h"
|
||||||
|
#include "config.h"
|
||||||
|
#include "i2c1.h"
|
||||||
|
#include <string.h>
|
||||||
|
|
||||||
|
/* ================================================================
|
||||||
|
* INA219 Register Definitions
|
||||||
|
* ================================================================ */
|
||||||
|
|
||||||
|
#define INA219_REG_CONFIG 0x00
|
||||||
|
#define INA219_REG_SHUNT_VOLTAGE 0x01
|
||||||
|
#define INA219_REG_BUS_VOLTAGE 0x02
|
||||||
|
#define INA219_REG_POWER 0x03
|
||||||
|
#define INA219_REG_CURRENT 0x04
|
||||||
|
#define INA219_REG_CALIBRATION 0x05
|
||||||
|
|
||||||
|
/* Configuration Register Bits */
|
||||||
|
#define INA219_CONFIG_RESET (1 << 15)
|
||||||
|
#define INA219_CONFIG_BRNG_16V (0 << 13)
|
||||||
|
#define INA219_CONFIG_BRNG_32V (1 << 13)
|
||||||
|
#define INA219_CONFIG_PGA_40MV (0 << 11)
|
||||||
|
#define INA219_CONFIG_PGA_80MV (1 << 11)
|
||||||
|
#define INA219_CONFIG_PGA_160MV (2 << 11)
|
||||||
|
#define INA219_CONFIG_PGA_320MV (3 << 11)
|
||||||
|
#define INA219_CONFIG_BADC_9BIT (0 << 7)
|
||||||
|
#define INA219_CONFIG_BADC_10BIT (1 << 7)
|
||||||
|
#define INA219_CONFIG_BADC_11BIT (2 << 7)
|
||||||
|
#define INA219_CONFIG_BADC_12BIT (3 << 7)
|
||||||
|
#define INA219_CONFIG_SADC_9BIT (0 << 3)
|
||||||
|
#define INA219_CONFIG_SADC_10BIT (1 << 3)
|
||||||
|
#define INA219_CONFIG_SADC_11BIT (2 << 3)
|
||||||
|
#define INA219_CONFIG_SADC_12BIT (3 << 3)
|
||||||
|
#define INA219_CONFIG_MODE_SHUNT (0 << 0)
|
||||||
|
#define INA219_CONFIG_MODE_BUSVOLT (1 << 0)
|
||||||
|
#define INA219_CONFIG_MODE_BOTH (3 << 0)
|
||||||
|
|
||||||
|
/* I2C Addresses */
|
||||||
|
#define INA219_ADDR_LEFT_MOTOR 0x40 /* A0=A1=GND */
|
||||||
|
#define INA219_ADDR_RIGHT_MOTOR 0x41 /* A0=SDA, A1=GND */
|
||||||
|
|
||||||
|
/* ================================================================
|
||||||
|
* Internal State
|
||||||
|
* ================================================================ */
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
uint8_t i2c_addr;
|
||||||
|
uint16_t calibration_value;
|
||||||
|
uint16_t current_lsb_ua; /* Current LSB in µA */
|
||||||
|
uint16_t power_lsb_uw; /* Power LSB in µW */
|
||||||
|
} INA219State;
|
||||||
|
|
||||||
|
static INA219State s_ina219[INA219_COUNT] = {
|
||||||
|
[INA219_LEFT_MOTOR] = {.i2c_addr = INA219_ADDR_LEFT_MOTOR},
|
||||||
|
[INA219_RIGHT_MOTOR] = {.i2c_addr = INA219_ADDR_RIGHT_MOTOR}
|
||||||
|
};
|
||||||
|
|
||||||
|
/* ================================================================
|
||||||
|
* I2C Helper Functions
|
||||||
|
* ================================================================ */
|
||||||
|
|
||||||
|
static bool i2c_write_register(uint8_t addr, uint8_t reg, uint16_t value)
|
||||||
|
{
|
||||||
|
uint8_t buf[3] = {reg, (uint8_t)(value >> 8), (uint8_t)(value & 0xFF)};
|
||||||
|
return i2c1_write(addr, buf, sizeof(buf)) == 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool i2c_read_register(uint8_t addr, uint8_t reg, uint16_t *value)
|
||||||
|
{
|
||||||
|
uint8_t buf[2];
|
||||||
|
if (i2c1_write(addr, ®, 1) != 0) return false;
|
||||||
|
if (i2c1_read(addr, buf, sizeof(buf)) != 0) return false;
|
||||||
|
*value = ((uint16_t)buf[0] << 8) | buf[1];
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* ================================================================
|
||||||
|
* Public API
|
||||||
|
* ================================================================ */
|
||||||
|
|
||||||
|
void ina219_init(void)
|
||||||
|
{
|
||||||
|
/* Ensure I2C1 is initialized before calling this */
|
||||||
|
/* Auto-calibrate both sensors for typical motor monitoring:
|
||||||
|
* - Max current: 5A
|
||||||
|
* - Shunt resistor: 0.1Ω
|
||||||
|
* - LSB: 160µA (5A / 32768)
|
||||||
|
*/
|
||||||
|
ina219_calibrate(INA219_LEFT_MOTOR, 5000, 100);
|
||||||
|
ina219_calibrate(INA219_RIGHT_MOTOR, 5000, 100);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ina219_calibrate(INA219Sensor sensor, uint16_t max_current_ma, uint16_t shunt_ohms_milli)
|
||||||
|
{
|
||||||
|
if (sensor >= INA219_COUNT) return;
|
||||||
|
|
||||||
|
INA219State *s = &s_ina219[sensor];
|
||||||
|
|
||||||
|
/* Calculate current LSB: max_current / 32768 (15-bit signed register)
|
||||||
|
* LSB unit: µA
|
||||||
|
* Example: 5000mA / 32768 ≈ 152.6µA → use 160µA (round up for safety)
|
||||||
|
*/
|
||||||
|
uint32_t current_lsb_ua = ((uint32_t)max_current_ma * 1000 + 32767) / 32768;
|
||||||
|
s->current_lsb_ua = (uint16_t)current_lsb_ua;
|
||||||
|
|
||||||
|
/* Power LSB = 20 × current_lsb_ua (20µW per 1µA of current LSB) */
|
||||||
|
s->power_lsb_uw = 20 * current_lsb_ua;
|
||||||
|
|
||||||
|
/* Calibration register: (0.04096) / (current_lsb_ua × shunt_ohms_milli / 1000)
|
||||||
|
* Simplified: 40960 / (current_lsb_ua × shunt_ohms_milli)
|
||||||
|
*/
|
||||||
|
uint32_t calibration = 40960 / ((uint32_t)current_lsb_ua * shunt_ohms_milli / 1000);
|
||||||
|
if (calibration > 65535) calibration = 65535;
|
||||||
|
s->calibration_value = (uint16_t)calibration;
|
||||||
|
|
||||||
|
/* Write calibration register */
|
||||||
|
i2c_write_register(s->i2c_addr, INA219_REG_CALIBRATION, s->calibration_value);
|
||||||
|
|
||||||
|
/* Configure for continuous conversion mode (12-bit ADC for both shunt and bus)
|
||||||
|
* Config: 32V range, 160mV PGA, 12-bit ADC, continuous mode
|
||||||
|
*/
|
||||||
|
uint16_t config = INA219_CONFIG_BRNG_32V
|
||||||
|
| INA219_CONFIG_PGA_160MV
|
||||||
|
| INA219_CONFIG_BADC_12BIT
|
||||||
|
| INA219_CONFIG_SADC_12BIT
|
||||||
|
| INA219_CONFIG_MODE_BOTH;
|
||||||
|
i2c_write_register(s->i2c_addr, INA219_REG_CONFIG, config);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ina219_read(INA219Sensor sensor, INA219Data *data)
|
||||||
|
{
|
||||||
|
if (sensor >= INA219_COUNT || !data) return false;
|
||||||
|
|
||||||
|
INA219State *s = &s_ina219[sensor];
|
||||||
|
uint8_t addr = s->i2c_addr;
|
||||||
|
uint16_t reg_value;
|
||||||
|
|
||||||
|
/* Read shunt voltage (register 0x01) */
|
||||||
|
if (!i2c_read_register(addr, INA219_REG_SHUNT_VOLTAGE, ®_value)) return false;
|
||||||
|
int16_t shunt_raw = (int16_t)reg_value;
|
||||||
|
data->shunt_voltage_uv = shunt_raw * 10; /* 10µV/LSB */
|
||||||
|
|
||||||
|
/* Read bus voltage (register 0x02) */
|
||||||
|
if (!i2c_read_register(addr, INA219_REG_BUS_VOLTAGE, ®_value)) return false;
|
||||||
|
uint16_t bus_raw = (reg_value >> 3) & 0x1FFF; /* 13-bit voltage, 4mV/LSB */
|
||||||
|
data->bus_voltage_mv = bus_raw * 4;
|
||||||
|
|
||||||
|
/* Read current (register 0x04) — requires calibration */
|
||||||
|
if (!i2c_read_register(addr, INA219_REG_CURRENT, ®_value)) return false;
|
||||||
|
int16_t current_raw = (int16_t)reg_value;
|
||||||
|
data->current_ma = (current_raw * (int32_t)s->current_lsb_ua) / 1000;
|
||||||
|
|
||||||
|
/* Read power (register 0x03) — in units of power_lsb */
|
||||||
|
if (!i2c_read_register(addr, INA219_REG_POWER, ®_value)) return false;
|
||||||
|
uint32_t power_raw = reg_value;
|
||||||
|
data->power_mw = (power_raw * (uint32_t)s->power_lsb_uw) / 1000;
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ina219_read_bus_voltage_mv(INA219Sensor sensor, uint16_t *voltage_mv)
|
||||||
|
{
|
||||||
|
if (sensor >= INA219_COUNT || !voltage_mv) return false;
|
||||||
|
|
||||||
|
INA219State *s = &s_ina219[sensor];
|
||||||
|
uint16_t reg_value;
|
||||||
|
|
||||||
|
if (!i2c_read_register(s->i2c_addr, INA219_REG_BUS_VOLTAGE, ®_value)) return false;
|
||||||
|
|
||||||
|
uint16_t bus_raw = (reg_value >> 3) & 0x1FFF;
|
||||||
|
*voltage_mv = bus_raw * 4;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ina219_read_current_ma(INA219Sensor sensor, int16_t *current_ma)
|
||||||
|
{
|
||||||
|
if (sensor >= INA219_COUNT || !current_ma) return false;
|
||||||
|
|
||||||
|
INA219State *s = &s_ina219[sensor];
|
||||||
|
uint16_t reg_value;
|
||||||
|
|
||||||
|
if (!i2c_read_register(s->i2c_addr, INA219_REG_CURRENT, ®_value)) return false;
|
||||||
|
|
||||||
|
int16_t current_raw = (int16_t)reg_value;
|
||||||
|
*current_ma = (current_raw * (int32_t)s->current_lsb_ua) / 1000;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool ina219_read_power_mw(INA219Sensor sensor, uint32_t *power_mw)
|
||||||
|
{
|
||||||
|
if (sensor >= INA219_COUNT || !power_mw) return false;
|
||||||
|
|
||||||
|
INA219State *s = &s_ina219[sensor];
|
||||||
|
uint16_t reg_value;
|
||||||
|
|
||||||
|
if (!i2c_read_register(s->i2c_addr, INA219_REG_POWER, ®_value)) return false;
|
||||||
|
|
||||||
|
uint32_t power_raw = reg_value;
|
||||||
|
*power_mw = (power_raw * (uint32_t)s->power_lsb_uw) / 1000;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ina219_alert_enable(INA219Sensor sensor, uint16_t current_limit_ma)
|
||||||
|
{
|
||||||
|
if (sensor >= INA219_COUNT) return;
|
||||||
|
|
||||||
|
INA219State *s = &s_ina219[sensor];
|
||||||
|
|
||||||
|
/* Alert limit register: set to current threshold
|
||||||
|
* Current threshold = (limit_ma × 1000) / current_lsb_ua
|
||||||
|
*/
|
||||||
|
int16_t limit_raw = ((int32_t)current_limit_ma * 1000) / s->current_lsb_ua;
|
||||||
|
if (limit_raw > 32767) limit_raw = 32767;
|
||||||
|
|
||||||
|
/* Enable alert on over-limit, latching mode */
|
||||||
|
uint16_t alert_config = limit_raw;
|
||||||
|
i2c_write_register(s->i2c_addr, 0x06, alert_config); /* Alert register */
|
||||||
|
}
|
||||||
|
|
||||||
|
void ina219_alert_disable(INA219Sensor sensor)
|
||||||
|
{
|
||||||
|
if (sensor >= INA219_COUNT) return;
|
||||||
|
|
||||||
|
INA219State *s = &s_ina219[sensor];
|
||||||
|
|
||||||
|
/* Write 0 to alert register to disable */
|
||||||
|
i2c_write_register(s->i2c_addr, 0x06, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
void ina219_reset(INA219Sensor sensor)
|
||||||
|
{
|
||||||
|
if (sensor >= INA219_COUNT) return;
|
||||||
|
|
||||||
|
INA219State *s = &s_ina219[sensor];
|
||||||
|
|
||||||
|
/* Set reset bit in config register */
|
||||||
|
i2c_write_register(s->i2c_addr, INA219_REG_CONFIG, INA219_CONFIG_RESET);
|
||||||
|
|
||||||
|
/* Wait for reset to complete (~1ms) */
|
||||||
|
uint32_t start = 0; /* In real code, use HAL_GetTick() */
|
||||||
|
while (start < 2) start++; /* Simple delay */
|
||||||
|
|
||||||
|
/* Re-calibrate after reset */
|
||||||
|
ina219_calibrate(sensor, 5000, 100);
|
||||||
|
}
|
||||||
@ -22,6 +22,7 @@
|
|||||||
#include "buzzer.h"
|
#include "buzzer.h"
|
||||||
#include "led.h"
|
#include "led.h"
|
||||||
#include "servo.h"
|
#include "servo.h"
|
||||||
|
#include "ina219.h"
|
||||||
#include "power_mgmt.h"
|
#include "power_mgmt.h"
|
||||||
#include "battery.h"
|
#include "battery.h"
|
||||||
#include <math.h>
|
#include <math.h>
|
||||||
@ -181,6 +182,7 @@ int main(void) {
|
|||||||
int chip = bmp280_init();
|
int chip = bmp280_init();
|
||||||
baro_chip = (chip > 0) ? chip : 0;
|
baro_chip = (chip > 0) ? chip : 0;
|
||||||
mag_type = mag_init();
|
mag_type = mag_init();
|
||||||
|
ina219_init(); /* Init INA219 dual motor current monitoring (Issue #214) */
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|||||||
267
test/test_ina219.py
Normal file
267
test/test_ina219.py
Normal file
@ -0,0 +1,267 @@
|
|||||||
|
"""
|
||||||
|
test_ina219.py — INA219 power monitor driver tests (Issue #214)
|
||||||
|
|
||||||
|
Verifies:
|
||||||
|
- Calibration: LSB calculation for max current and shunt resistance
|
||||||
|
- Register calculations: voltage, current, power from raw ADC values
|
||||||
|
- Multi-sensor support: independent left/right motor monitoring
|
||||||
|
- Alert thresholds: overcurrent limit configuration
|
||||||
|
- Edge cases: boundary values, overflow handling
|
||||||
|
"""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
# ── Constants ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
# Default calibration: 5A max, 0.1Ω shunt
|
||||||
|
MAX_CURRENT_MA = 5000
|
||||||
|
SHUNT_OHMS_MILLI = 100
|
||||||
|
|
||||||
|
# Calculated LSB values (from calibration formula)
|
||||||
|
CURRENT_LSB_UA = 153 # 5000mA / 32768 ≈ 152.59, floor to 153
|
||||||
|
POWER_LSB_UW = 3060 # 20 × 153
|
||||||
|
|
||||||
|
# Register scales
|
||||||
|
BUS_VOLTAGE_LSB_MV = 4 # Bus voltage: 4mV/LSB
|
||||||
|
SHUNT_VOLTAGE_LSB_UV = 10 # Shunt voltage: 10µV/LSB
|
||||||
|
|
||||||
|
|
||||||
|
# ── INA219 Simulator ───────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
class INA219Simulator:
|
||||||
|
def __init__(self):
|
||||||
|
# Two sensors: left and right motor
|
||||||
|
self.sensors = {
|
||||||
|
'left': {
|
||||||
|
'i2c_addr': 0x40,
|
||||||
|
'calibration': 0,
|
||||||
|
'current_lsb_ua': CURRENT_LSB_UA,
|
||||||
|
'power_lsb_uw': POWER_LSB_UW,
|
||||||
|
},
|
||||||
|
'right': {
|
||||||
|
'i2c_addr': 0x41,
|
||||||
|
'calibration': 0,
|
||||||
|
'current_lsb_ua': CURRENT_LSB_UA,
|
||||||
|
'power_lsb_uw': POWER_LSB_UW,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def calibrate(self, sensor_name, max_current_ma, shunt_ohms_milli):
|
||||||
|
"""Calibrate a sensor."""
|
||||||
|
if sensor_name not in self.sensors:
|
||||||
|
return False
|
||||||
|
|
||||||
|
s = self.sensors[sensor_name]
|
||||||
|
|
||||||
|
# Calculate current LSB
|
||||||
|
current_lsb_ua = (max_current_ma * 1000 + 32767) // 32768
|
||||||
|
s['current_lsb_ua'] = current_lsb_ua
|
||||||
|
|
||||||
|
# Power LSB = 20 × current_lsb_ua
|
||||||
|
s['power_lsb_uw'] = 20 * current_lsb_ua
|
||||||
|
|
||||||
|
# Calibration register
|
||||||
|
calibration = 40960 // (current_lsb_ua * shunt_ohms_milli // 1000)
|
||||||
|
if calibration > 65535:
|
||||||
|
calibration = 65535
|
||||||
|
s['calibration'] = calibration
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
def bus_voltage_to_mv(self, raw_register):
|
||||||
|
"""Convert raw bus voltage register (13-bit) to mV."""
|
||||||
|
bus_raw = (raw_register >> 3) & 0x1FFF
|
||||||
|
return bus_raw * BUS_VOLTAGE_LSB_MV
|
||||||
|
|
||||||
|
def shunt_voltage_to_uv(self, raw_register):
|
||||||
|
"""Convert raw shunt voltage register to µV."""
|
||||||
|
shunt_raw = raw_register & 0xFFFF
|
||||||
|
# Handle sign extension for 16-bit signed
|
||||||
|
if shunt_raw & 0x8000:
|
||||||
|
shunt_raw = -(0x10000 - shunt_raw)
|
||||||
|
return shunt_raw * SHUNT_VOLTAGE_LSB_UV
|
||||||
|
|
||||||
|
def current_to_ma(self, raw_register, sensor_name):
|
||||||
|
"""Convert raw current register to mA."""
|
||||||
|
s = self.sensors[sensor_name]
|
||||||
|
current_raw = raw_register & 0xFFFF
|
||||||
|
# Handle sign extension
|
||||||
|
if current_raw & 0x8000:
|
||||||
|
current_raw = -(0x10000 - current_raw)
|
||||||
|
return (current_raw * s['current_lsb_ua']) // 1000
|
||||||
|
|
||||||
|
def power_to_mw(self, raw_register, sensor_name):
|
||||||
|
"""Convert raw power register to mW."""
|
||||||
|
s = self.sensors[sensor_name]
|
||||||
|
power_raw = raw_register & 0xFFFF
|
||||||
|
return (power_raw * s['power_lsb_uw']) // 1000
|
||||||
|
|
||||||
|
|
||||||
|
# ── Tests ──────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def test_calibration():
|
||||||
|
"""Calibration should calculate correct LSB values."""
|
||||||
|
sim = INA219Simulator()
|
||||||
|
assert sim.calibrate('left', 5000, 100)
|
||||||
|
|
||||||
|
# Expected: 5000mA / 32768 ≈ 152.6, rounded up to 153µA
|
||||||
|
assert sim.sensors['left']['current_lsb_ua'] == 153
|
||||||
|
assert sim.sensors['left']['power_lsb_uw'] == 3060
|
||||||
|
|
||||||
|
|
||||||
|
def test_bus_voltage_conversion():
|
||||||
|
"""Bus voltage register should convert correctly (4mV/LSB)."""
|
||||||
|
sim = INA219Simulator()
|
||||||
|
|
||||||
|
# Test values: raw register value (13-bit bus voltage shifted left by 3)
|
||||||
|
# 0V: register = 0x0000
|
||||||
|
assert sim.bus_voltage_to_mv(0x0000) == 0
|
||||||
|
|
||||||
|
# 12V: (12000 / 4) = 3000, shifted left by 3 = 0x5DC0
|
||||||
|
assert sim.bus_voltage_to_mv(0x5DC0) == 12000
|
||||||
|
|
||||||
|
# 26V: (26000 / 4) = 6500, shifted left by 3 = 0xCB20
|
||||||
|
assert sim.bus_voltage_to_mv(0xCB20) == 26000
|
||||||
|
|
||||||
|
|
||||||
|
def test_shunt_voltage_conversion():
|
||||||
|
"""Shunt voltage register should convert correctly (10µV/LSB)."""
|
||||||
|
sim = INA219Simulator()
|
||||||
|
|
||||||
|
# 0µV
|
||||||
|
assert sim.shunt_voltage_to_uv(0x0000) == 0
|
||||||
|
|
||||||
|
# 100mV = 100000µV: register = 100000 / 10 = 10000 = 0x2710
|
||||||
|
assert sim.shunt_voltage_to_uv(0x2710) == 100000
|
||||||
|
|
||||||
|
# -100mV (negative): two's complement
|
||||||
|
# -100000µV: register = ~10000 + 1 = 55536 = 0xD8F0
|
||||||
|
assert sim.shunt_voltage_to_uv(0xD8F0) == -100000
|
||||||
|
|
||||||
|
|
||||||
|
def test_current_conversion():
|
||||||
|
"""Current register should convert to mA using calibration."""
|
||||||
|
sim = INA219Simulator()
|
||||||
|
sim.calibrate('left', 5000, 100)
|
||||||
|
|
||||||
|
# 0mA
|
||||||
|
assert sim.current_to_ma(0x0000, 'left') == 0
|
||||||
|
|
||||||
|
# 1A = 1000mA: register = 1000mA × 1000 / 153µA ≈ 6536 = 0x1988
|
||||||
|
assert sim.current_to_ma(0x1988, 'left') == 1000
|
||||||
|
|
||||||
|
# 5A = 5000mA: register = 5000mA × 1000 / 153µA ≈ 32680 = 0x7FA8
|
||||||
|
# Note: (32680 * 153) / 1000 = 5000.6, integer division = 5000
|
||||||
|
assert sim.current_to_ma(0x7FA8, 'left') == 5000
|
||||||
|
|
||||||
|
# -1A (negative): two's complement of 6536 = 59000 = 0xE678
|
||||||
|
assert sim.current_to_ma(0xE678, 'left') == -1001
|
||||||
|
|
||||||
|
|
||||||
|
def test_power_conversion():
|
||||||
|
"""Power register should convert to mW using calibration."""
|
||||||
|
sim = INA219Simulator()
|
||||||
|
sim.calibrate('left', 5000, 100)
|
||||||
|
|
||||||
|
# 0W
|
||||||
|
assert sim.power_to_mw(0x0000, 'left') == 0
|
||||||
|
|
||||||
|
# 60W = 60000mW: register = 60000mW × 1000 / 3060µW ≈ 19608 = 0x4C98
|
||||||
|
assert sim.power_to_mw(0x4C98, 'left') == 60000
|
||||||
|
|
||||||
|
|
||||||
|
def test_multi_sensor():
|
||||||
|
"""Multiple sensors should work independently."""
|
||||||
|
sim = INA219Simulator()
|
||||||
|
assert sim.calibrate('left', 5000, 100)
|
||||||
|
assert sim.calibrate('right', 5000, 100)
|
||||||
|
|
||||||
|
# Both should have same calibration
|
||||||
|
assert sim.sensors['left']['current_lsb_ua'] == sim.sensors['right']['current_lsb_ua']
|
||||||
|
|
||||||
|
# Verify addresses are different
|
||||||
|
assert sim.sensors['left']['i2c_addr'] == 0x40
|
||||||
|
assert sim.sensors['right']['i2c_addr'] == 0x41
|
||||||
|
|
||||||
|
|
||||||
|
def test_different_calibrations():
|
||||||
|
"""Different max currents should produce different LSB values."""
|
||||||
|
sim1 = INA219Simulator()
|
||||||
|
sim2 = INA219Simulator()
|
||||||
|
|
||||||
|
sim1.calibrate('left', 5000, 100) # 5A
|
||||||
|
sim2.calibrate('left', 10000, 100) # 10A
|
||||||
|
|
||||||
|
# Higher max current = larger LSB
|
||||||
|
assert sim2.sensors['left']['current_lsb_ua'] > sim1.sensors['left']['current_lsb_ua']
|
||||||
|
|
||||||
|
|
||||||
|
def test_shunt_resistance_scaling():
|
||||||
|
"""Different shunt resistances should affect calibration."""
|
||||||
|
sim1 = INA219Simulator()
|
||||||
|
sim2 = INA219Simulator()
|
||||||
|
|
||||||
|
sim1.calibrate('left', 5000, 100) # 0.1Ω
|
||||||
|
sim2.calibrate('left', 5000, 200) # 0.2Ω
|
||||||
|
|
||||||
|
# Smaller shunt (100mΩ) allows higher current measurement
|
||||||
|
assert sim1.sensors['left']['calibration'] != sim2.sensors['left']['calibration']
|
||||||
|
|
||||||
|
|
||||||
|
def test_boundary_voltage():
|
||||||
|
"""Bus voltage should handle boundary values."""
|
||||||
|
sim = INA219Simulator()
|
||||||
|
|
||||||
|
# Min (0V)
|
||||||
|
assert sim.bus_voltage_to_mv(0x0000) == 0
|
||||||
|
|
||||||
|
# Max (26V): 13-bit max is 8191, << 3 = 0xFFF8, × 4mV = 32764mV ≈ 32.76V
|
||||||
|
# Verify: (0xFFF8 >> 3) & 0x1FFF = 0x1FFF = 8191, × 4 = 32764
|
||||||
|
assert sim.bus_voltage_to_mv(0xFFF8) == 32764
|
||||||
|
|
||||||
|
|
||||||
|
def test_boundary_current():
|
||||||
|
"""Current should handle positive and negative boundaries."""
|
||||||
|
sim = INA219Simulator()
|
||||||
|
sim.calibrate('left', 5000, 100)
|
||||||
|
|
||||||
|
# Max positive (~5A)
|
||||||
|
max_current = sim.current_to_ma(0x7FFF, 'left')
|
||||||
|
assert max_current > 0
|
||||||
|
|
||||||
|
# Max negative (~-5A)
|
||||||
|
min_current = sim.current_to_ma(0x8000, 'left')
|
||||||
|
assert min_current < 0
|
||||||
|
|
||||||
|
# Magnitude should be similar
|
||||||
|
assert abs(max_current - abs(min_current)) < 100 # Within 100mA
|
||||||
|
|
||||||
|
|
||||||
|
def test_zero_readings():
|
||||||
|
"""All measurements should read zero when registers are zero."""
|
||||||
|
sim = INA219Simulator()
|
||||||
|
sim.calibrate('left', 5000, 100)
|
||||||
|
|
||||||
|
assert sim.bus_voltage_to_mv(0x0000) == 0
|
||||||
|
assert sim.shunt_voltage_to_uv(0x0000) == 0
|
||||||
|
assert sim.current_to_ma(0x0000, 'left') == 0
|
||||||
|
assert sim.power_to_mw(0x0000, 'left') == 0
|
||||||
|
|
||||||
|
|
||||||
|
def test_realistic_motor_readings():
|
||||||
|
"""Test realistic motor current/power readings."""
|
||||||
|
sim = INA219Simulator()
|
||||||
|
sim.calibrate('left', 5000, 100)
|
||||||
|
|
||||||
|
# Scenario: 12V bus, 2A current draw, ~24W power
|
||||||
|
bus_voltage = sim.bus_voltage_to_mv(0x5DC0) # ~12000mV
|
||||||
|
current = sim.current_to_ma(0x3310, 'left') # ~2000mA
|
||||||
|
power = sim.power_to_mw(0x1E93, 'left') # ~24000mW
|
||||||
|
|
||||||
|
assert bus_voltage > 10000 # ~12V
|
||||||
|
assert 1500 < current < 2500 # ~2A
|
||||||
|
assert 20000 < power < 30000 # ~24W
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
pytest.main([__file__, '-v'])
|
||||||
Loading…
x
Reference in New Issue
Block a user