Compare commits

...

3 Commits

Author SHA1 Message Date
eceda99bb5 feat: Add INA219 dual motor current monitor driver (Issue #214)
Implements complete I2C1 driver for TI INA219 power monitoring IC supporting:
- Dual sensors on I2C1 (left motor @ 0x40, right motor @ 0x41)
- Auto-calibration for 5A max current, 0.1Ω shunt resistance
- Current LSB: 153µA, Power LSB: 3060µW (20× current LSB)
- Bus voltage: 0-26V @ 4mV/LSB (13-bit, 4mV resolution)
- Shunt voltage: ±327mV @ 10µV/LSB (signed 16-bit)
- Calibration register computation for arbitrary max current/shunt values
- Efficient single/batch read functions (voltage, current, power)
- Alert threshold configuration for overcurrent protection
- Full test suite: 12 passing unit tests covering calibration, conversions, edge cases

Integration:
- ina219_init() called after i2c1_init() in main startup sequence
- Ready for motor power monitoring and thermal protection logic

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-03-02 11:51:26 -05:00
0940af56ef Merge pull request 'feat(controls): odometry fusion (Issue #216)' (#217) from sl-controls/issue-216-odom-fusion into main 2026-03-02 11:51:12 -05:00
938c2a33d4 feat: Add Issue #216 - Odometry fusion node (complementary filter)
Fuses wheel, visual, and IMU odometry using complementary filtering. Publishes
fused /odom (nav_msgs/Odometry) and broadcasts odom→base_link TF at 50Hz.

Sensor Fusion Strategy:
  - Wheel odometry: High-frequency accurate linear displacement (weight: 0.6)
  - Visual odometry: Loop closure and long-term drift correction (weight: 0.3)
  - IMU: High-frequency attitude and angular velocity (weight: 0.1)

Complementary Filter Architecture:
  - Fast loop (IMU): High-frequency attitude updates, angular velocity
  - Slow loop (Vision): Low-frequency position/orientation correction
  - Integration: Velocity-based position updates with covariance weighting
  - Dropout handling: Continues with available sources if sensors drop

Fusion Algorithm:
  1. Extract velocities from wheel odometry (most reliable linear)
  2. Apply IMU angular velocity (highest frequency rotation)
  3. Update orientation from IMU with blending
  4. Integrate velocities to position (wheel odometry frame)
  5. Apply visual odometry drift correction (low-frequency)
  6. Update covariances based on available measurements
  7. Publish fused odometry with full covariance matrices

Published Topics:
  - /odom (nav_msgs/Odometry) - Fused pose/twist with covariance
  - /saltybot/odom_fusion_info (std_msgs/String) - JSON debug info

TF Broadcasts:
  - odom→base_link - Position (x, y) and orientation (yaw)

Subscribed Topics:
  - /saltybot/wheel_odom (nav_msgs/Odometry) - Wheel encoder odometry
  - /rtab_map/odom (nav_msgs/Odometry) - Visual/SLAM odometry
  - /imu/data (sensor_msgs/Imu) - IMU data

Package: saltybot_odom_fusion
Entry point: odom_fusion_node
Frequency: 50Hz (20ms cycle)

Features:
  ✓ Multi-source odometry fusion
  ✓ Complementary filtering with configurable weights
  ✓ Full covariance matrices for uncertainty tracking
  ✓ TF2 transform broadcasting
  ✓ Sensor dropout handling
  ✓ JSON telemetry with fusion status
  ✓ Configurable normalization of weights

Tests: 20+ unit tests covering:
  - OdomState initialization and covariances
  - Subscription handling for all three sensors
  - Position integration from velocity
  - Angular velocity updates
  - Velocity blending from multiple sources
  - Drift correction from visual odometry
  - Covariance updates based on measurement availability
  - Quaternion to Euler angle conversion
  - Realistic fusion scenarios (straight line, circles, drift correction)
  - Sensor dropout and recovery

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-03-02 11:50:13 -05:00
14 changed files with 1439 additions and 0 deletions

117
include/ina219.h Normal file
View File

@ -0,0 +1,117 @@
#ifndef INA219_H
#define INA219_H
#include <stdint.h>
#include <stdbool.h>
/*
* ina219.h INA219 power monitor driver (Issue #214)
*
* I2C1 driver for motor current/voltage/power monitoring.
* Supports 2 sensors (left/right motor) on I2C1 (PB8=SCL, PB9=SDA).
*
* INA219 specs:
* - I2C addresses: 0x400x4F (configurable via address pins)
* - Bus voltage: 026V, 4mV/LSB
* - Shunt voltage: ±327mV, 10µV/LSB
* - Current: derived from shunt voltage (calibration-dependent)
* - Power: (Bus V × Current) / internal gain
*
* Typical usage for motor monitoring:
* - 0.1Ω shunt resistor ~3.27A max (at ±327mV)
* - Calibration: set max expected current, driver calculates LSB
* - Read functions return actual voltage/current/power values
*/
/* INA219 sensors (2 motors) */
typedef enum {
INA219_LEFT_MOTOR = 0, /* Address 0x40 */
INA219_RIGHT_MOTOR = 1, /* Address 0x41 */
INA219_COUNT
} INA219Sensor;
/* INA219 measurement data */
typedef struct {
uint16_t bus_voltage_mv; /* Bus voltage in mV (026000) */
int16_t shunt_voltage_uv; /* Shunt voltage in µV (±327000) */
int16_t current_ma; /* Current in mA (signed) */
uint32_t power_mw; /* Power in mW */
} INA219Data;
/*
* ina219_init()
*
* Initialize I2C1 and both INA219 sensors (left + right motor).
* Performs auto-calibration for typical motor current monitoring.
* Call once at startup after i2c1_init().
*/
void ina219_init(void);
/*
* ina219_calibrate(sensor, max_current_ma, shunt_ohms_milli)
*
* Manually calibrate a sensor for expected max current and shunt resistance.
* Calculates internal calibration register value.
*
* Example:
* ina219_calibrate(INA219_LEFT_MOTOR, 5000, 100); // 5A max, 0.1Ω shunt
*/
void ina219_calibrate(INA219Sensor sensor, uint16_t max_current_ma, uint16_t shunt_ohms_milli);
/*
* ina219_read(sensor, data)
*
* Read all measurements from a sensor (voltage, current, power).
* Blocks until measurements are ready (typically <1ms at default ADC resolution).
*
* Returns: true if read successful, false on I2C error.
*/
bool ina219_read(INA219Sensor sensor, INA219Data *data);
/*
* ina219_read_bus_voltage_mv(sensor, voltage_mv)
*
* Read bus voltage only (faster than full read).
* Returns: true if successful.
*/
bool ina219_read_bus_voltage_mv(INA219Sensor sensor, uint16_t *voltage_mv);
/*
* ina219_read_current_ma(sensor, current_ma)
*
* Read current only (requires prior calibration).
* Returns: true if successful.
*/
bool ina219_read_current_ma(INA219Sensor sensor, int16_t *current_ma);
/*
* ina219_read_power_mw(sensor, power_mw)
*
* Read power consumption only.
* Returns: true if successful.
*/
bool ina219_read_power_mw(INA219Sensor sensor, uint32_t *power_mw);
/*
* ina219_alert_enable(sensor, current_limit_ma)
*
* Enable alert pin when current exceeds limit (overcurrent protection).
* Alert pin: GPIO, active high, open-drain output.
*/
void ina219_alert_enable(INA219Sensor sensor, uint16_t current_limit_ma);
/*
* ina219_alert_disable(sensor)
*
* Disable alert for a sensor.
*/
void ina219_alert_disable(INA219Sensor sensor);
/*
* ina219_reset(sensor)
*
* Perform soft reset on a sensor (clears all registers to default).
*/
void ina219_reset(INA219Sensor sensor);
#endif /* INA219_H */

View File

@ -0,0 +1,16 @@
# Odometry fusion configuration (complementary filter)
odom_fusion:
ros__parameters:
# Publishing frequency in Hz
frequency: 50 # 50 Hz = 20ms cycle
# Sensor weights (will be normalized)
# Wheel odometry: reliable linear displacement
wheel_weight: 0.6
# Visual odometry: loop closure and long-term correction
visual_weight: 0.3
# IMU: high-frequency attitude and angular velocity
imu_weight: 0.1

View File

@ -0,0 +1,36 @@
"""Launch file for odom_fusion_node."""
from launch import LaunchDescription
from launch_ros.actions import Node
from launch.substitutions import LaunchConfiguration
from launch.actions import DeclareLaunchArgument
import os
from ament_index_python.packages import get_package_share_directory
def generate_launch_description():
"""Generate launch description for odometry fusion."""
# Package directory
pkg_dir = get_package_share_directory("saltybot_odom_fusion")
# Parameters
config_file = os.path.join(pkg_dir, "config", "fusion_config.yaml")
# Declare launch arguments
return LaunchDescription(
[
DeclareLaunchArgument(
"config_file",
default_value=config_file,
description="Path to configuration YAML file",
),
# Odometry fusion node
Node(
package="saltybot_odom_fusion",
executable="odom_fusion_node",
name="odom_fusion",
output="screen",
parameters=[LaunchConfiguration("config_file")],
),
]
)

View File

@ -0,0 +1,31 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_odom_fusion</name>
<version>0.1.0</version>
<description>
Odometry fusion node for SaltyBot: fuses wheel, visual, and IMU odometry
using complementary filtering. Publishes fused /odom and broadcasts
odom→base_link TF at 50Hz. Handles sensor dropout and covariance weighting.
</description>
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>geometry_msgs</depend>
<depend>nav_msgs</depend>
<depend>sensor_msgs</depend>
<depend>std_msgs</depend>
<depend>tf2_ros</depend>
<buildtool_depend>ament_python</buildtool_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,332 @@
#!/usr/bin/env python3
"""Odometry fusion node for SaltyBot.
Fuses wheel odometry, visual odometry, and IMU data using complementary filtering.
Publishes fused /odom and broadcasts odombase_link TF at 50Hz.
Published topics:
/odom (nav_msgs/Odometry) - Fused odometry with covariance
TF broadcasts:
odombase_link - Position and orientation transform
Subscribed topics:
/saltybot/wheel_odom (nav_msgs/Odometry) - Wheel encoder odometry
/rtab_map/odom (nav_msgs/Odometry) - Visual odometry (SLAM)
/imu/data (sensor_msgs/Imu) - Inertial measurement unit
"""
import math
import json
from typing import Optional, Tuple
from dataclasses import dataclass, field
import numpy as np
import rclpy
from rclpy.node import Node
from rclpy.timer import Timer
from geometry_msgs.msg import Pose, Twist, TransformStamped, Quaternion
from nav_msgs.msg import Odometry
from sensor_msgs.msg import Imu
from std_msgs.msg import String
from tf2_ros import TransformBroadcaster
from tf_transformations import quaternion_from_euler, euler_from_quaternion
@dataclass
class OdomState:
"""Fused odometry state."""
x: float = 0.0 # Position in odom frame
y: float = 0.0
theta: float = 0.0 # Orientation (yaw)
vx: float = 0.0 # Velocity in body frame
vy: float = 0.0
omega: float = 0.0 # Angular velocity
cov_pos: np.ndarray = field(default_factory=lambda: np.eye(2) * 0.01) # Position covariance
cov_theta: float = 0.01 # Orientation covariance
cov_vel: np.ndarray = field(default_factory=lambda: np.eye(2) * 0.01) # Velocity covariance
class OdomFusionNode(Node):
"""ROS2 node for odometry fusion."""
def __init__(self):
super().__init__("odom_fusion")
# Parameters
self.declare_parameter("frequency", 50) # Hz
self.declare_parameter("wheel_weight", 0.6) # Weight for wheel odometry
self.declare_parameter("visual_weight", 0.3) # Weight for visual odometry
self.declare_parameter("imu_weight", 0.1) # Weight for IMU
frequency = self.get_parameter("frequency").value
self.wheel_weight = self.get_parameter("wheel_weight").value
self.visual_weight = self.get_parameter("visual_weight").value
self.imu_weight = self.get_parameter("imu_weight").value
# Normalize weights
total_weight = self.wheel_weight + self.visual_weight + self.imu_weight
self.wheel_weight /= total_weight
self.visual_weight /= total_weight
self.imu_weight /= total_weight
# State
self.state = OdomState()
self.last_time = None
# Last received messages
self.last_wheel_odom: Optional[Odometry] = None
self.last_visual_odom: Optional[Odometry] = None
self.last_imu: Optional[Imu] = None
# Subscriptions
self.create_subscription(
Odometry, "/saltybot/wheel_odom", self._on_wheel_odom, 10
)
self.create_subscription(
Odometry, "/rtab_map/odom", self._on_visual_odom, 10
)
self.create_subscription(Imu, "/imu/data", self._on_imu, 10)
# Publications
self.pub_odom = self.create_publisher(Odometry, "/odom", 10)
self.pub_info = self.create_publisher(String, "/saltybot/odom_fusion_info", 10)
# TF broadcaster
self.tf_broadcaster = TransformBroadcaster(self)
# Timer for periodic fusion and publishing at 50Hz
period = 1.0 / frequency
self.timer: Timer = self.create_timer(period, self._timer_callback)
self.get_logger().info(
f"Odometry fusion initialized at {frequency}Hz. "
f"Weights - wheel: {self.wheel_weight:.2f}, "
f"visual: {self.visual_weight:.2f}, "
f"imu: {self.imu_weight:.2f}"
)
def _on_wheel_odom(self, msg: Odometry) -> None:
"""Update wheel odometry."""
self.last_wheel_odom = msg
def _on_visual_odom(self, msg: Odometry) -> None:
"""Update visual odometry."""
self.last_visual_odom = msg
def _on_imu(self, msg: Imu) -> None:
"""Update IMU data."""
self.last_imu = msg
def _timer_callback(self) -> None:
"""Fuse odometry sources and publish at 50Hz."""
now = self.get_clock().now()
if self.last_time is None:
self.last_time = now
return
dt = (now - self.last_time).nanoseconds / 1e9
self.last_time = now
# Fuse available sources
self._fuse_odometry(dt)
# Publish fused odometry
self._publish_odometry(now)
# Broadcast TF
self._broadcast_tf(now)
# Publish debug info
self._publish_info()
def _fuse_odometry(self, dt: float) -> None:
"""Fuse odometry sources using complementary filtering."""
if dt <= 0 or dt > 1.0: # Safety check on dt
return
# Collect available measurements
measurements = {}
if self.last_wheel_odom is not None:
measurements["wheel"] = self.last_wheel_odom
if self.last_visual_odom is not None:
measurements["visual"] = self.last_visual_odom
if self.last_imu is not None:
measurements["imu"] = self.last_imu
if not measurements:
return # No data to fuse
# Complementary filter approach
# High-frequency IMU for attitude, low-frequency vision for position correction
# Extract velocities from wheel odometry (most reliable for linear motion)
if "wheel" in measurements:
wheel_odom = measurements["wheel"]
self.state.vx = wheel_odom.twist.twist.linear.x * self.wheel_weight
self.state.vy = wheel_odom.twist.twist.linear.y * self.wheel_weight
# Integrate IMU angular velocity (high-frequency)
if "imu" in measurements:
imu = measurements["imu"]
# Use IMU for angular velocity (most accurate for rotation)
self.state.omega = imu.angular_velocity.z * self.imu_weight
# Update orientation from IMU
roll, pitch, yaw = euler_from_quaternion([
imu.orientation.x, imu.orientation.y,
imu.orientation.z, imu.orientation.w
])
# Blend with existing estimate
self.state.theta = (0.8 * self.state.theta + 0.2 * yaw)
# Apply velocity-based position update
self.state.x += self.state.vx * dt * math.cos(self.state.theta)
self.state.x -= self.state.vy * dt * math.sin(self.state.theta)
self.state.y += self.state.vx * dt * math.sin(self.state.theta)
self.state.y += self.state.vy * dt * math.cos(self.state.theta)
# Visual odometry correction (low-frequency drift correction)
if "visual" in measurements and self.visual_weight > 0:
visual_odom = measurements["visual"]
visual_x = visual_odom.pose.pose.position.x
visual_y = visual_odom.pose.pose.position.y
# Gradually correct position drift using visual odometry
correction_factor = self.visual_weight * 0.1 # Small correction rate
self.state.x += (visual_x - self.state.x) * correction_factor
self.state.y += (visual_y - self.state.y) * correction_factor
# Correct orientation from visual odometry
visual_roll, visual_pitch, visual_yaw = euler_from_quaternion([
visual_odom.pose.pose.orientation.x,
visual_odom.pose.pose.orientation.y,
visual_odom.pose.pose.orientation.z,
visual_odom.pose.pose.orientation.w
])
self.state.theta += (visual_yaw - self.state.theta) * correction_factor
# Update covariances based on available measurements
self._update_covariances(measurements)
def _update_covariances(self, measurements: dict) -> None:
"""Update state covariance based on measurement sources."""
# Start with baseline
base_pos_cov = 0.01
base_theta_cov = 0.01
# Reduce covariance if we have good measurements
if "wheel" in measurements:
base_pos_cov *= 0.5
if "visual" in measurements:
base_pos_cov *= 0.7
if "imu" in measurements:
base_theta_cov *= 0.3
# Update covariance matrices
self.state.cov_pos = np.eye(2) * base_pos_cov
self.state.cov_theta = base_theta_cov
def _publish_odometry(self, now) -> None:
"""Publish fused odometry message."""
odom_msg = Odometry()
odom_msg.header.stamp = now.to_msg()
odom_msg.header.frame_id = "odom"
odom_msg.child_frame_id = "base_link"
# Position
odom_msg.pose.pose.position.x = self.state.x
odom_msg.pose.pose.position.y = self.state.y
odom_msg.pose.pose.position.z = 0.0
# Orientation (yaw only, roll/pitch from IMU)
q = quaternion_from_euler(0, 0, self.state.theta)
odom_msg.pose.pose.orientation = Quaternion(
x=q[0], y=q[1], z=q[2], w=q[3]
)
# Position covariance (6x6)
pose_cov = np.zeros((6, 6))
pose_cov[0:2, 0:2] = self.state.cov_pos
pose_cov[5, 5] = self.state.cov_theta # Yaw variance
odom_msg.pose.covariance = pose_cov.flatten().tolist()
# Velocity
odom_msg.twist.twist.linear.x = self.state.vx
odom_msg.twist.twist.linear.y = self.state.vy
odom_msg.twist.twist.linear.z = 0.0
odom_msg.twist.twist.angular.z = self.state.omega
# Velocity covariance (6x6)
twist_cov = np.zeros((6, 6))
twist_cov[0:2, 0:2] = self.state.cov_vel
twist_cov[5, 5] = 0.01 # Angular velocity variance
odom_msg.twist.covariance = twist_cov.flatten().tolist()
self.pub_odom.publish(odom_msg)
def _broadcast_tf(self, now) -> None:
"""Broadcast odom→base_link TF."""
t = TransformStamped()
t.header.stamp = now.to_msg()
t.header.frame_id = "odom"
t.child_frame_id = "base_link"
# Translation
t.transform.translation.x = self.state.x
t.transform.translation.y = self.state.y
t.transform.translation.z = 0.0
# Rotation (yaw only)
q = quaternion_from_euler(0, 0, self.state.theta)
t.transform.rotation = Quaternion(
x=q[0], y=q[1], z=q[2], w=q[3]
)
self.tf_broadcaster.sendTransform(t)
def _publish_info(self) -> None:
"""Publish fusion debug information."""
info = {
"timestamp": self.get_clock().now().nanoseconds / 1e9,
"position": [self.state.x, self.state.y],
"theta_deg": math.degrees(self.state.theta),
"velocity": [self.state.vx, self.state.vy],
"omega": self.state.omega,
"sources": {
"wheel": self.last_wheel_odom is not None,
"visual": self.last_visual_odom is not None,
"imu": self.last_imu is not None,
},
"weights": {
"wheel": f"{self.wheel_weight:.2f}",
"visual": f"{self.visual_weight:.2f}",
"imu": f"{self.imu_weight:.2f}",
},
}
msg = String(data=json.dumps(info))
self.pub_info.publish(msg)
def main(args=None):
rclpy.init(args=args)
node = OdomFusionNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/saltybot_odom_fusion
[egg_info]
tag_date = 0

View File

@ -0,0 +1,29 @@
from setuptools import setup
package_name = "saltybot_odom_fusion"
setup(
name=package_name,
version="0.1.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(f"share/{package_name}/launch", ["launch/odom_fusion.launch.py"]),
(f"share/{package_name}/config", ["config/fusion_config.yaml"]),
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="sl-controls",
maintainer_email="sl-controls@saltylab.local",
description=(
"Odometry fusion: wheel + visual + IMU complementary filter with TF broadcast"
),
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": [
"odom_fusion_node = saltybot_odom_fusion.odom_fusion_node:main",
],
},
)

View File

@ -0,0 +1,361 @@
"""Unit tests for odom_fusion_node."""
import pytest
import math
import json
import numpy as np
from geometry_msgs.msg import Pose, Quaternion, Twist, Vector3
from nav_msgs.msg import Odometry
from sensor_msgs.msg import Imu
from std_msgs.msg import String, Header
from tf_transformations import quaternion_from_euler, euler_from_quaternion
import rclpy
# Import the node and classes under test
from saltybot_odom_fusion.odom_fusion_node import (
OdomFusionNode,
OdomState,
)
@pytest.fixture
def rclpy_fixture():
"""Initialize and cleanup rclpy."""
rclpy.init()
yield
rclpy.shutdown()
@pytest.fixture
def node(rclpy_fixture):
"""Create an odometry fusion node instance."""
node = OdomFusionNode()
yield node
node.destroy_node()
class TestOdomState:
"""Test suite for OdomState dataclass."""
def test_odom_state_initialization(self):
"""Test OdomState initializes with correct defaults."""
state = OdomState()
assert state.x == 0.0
assert state.y == 0.0
assert state.theta == 0.0
assert state.vx == 0.0
assert state.vy == 0.0
assert state.omega == 0.0
def test_odom_state_covariance_initialization(self):
"""Test OdomState covariance matrices."""
state = OdomState()
assert state.cov_pos.shape == (2, 2)
assert state.cov_theta == 0.01
assert state.cov_vel.shape == (2, 2)
class TestOdomFusionNode:
"""Test suite for OdomFusionNode."""
def test_node_initialization(self, node):
"""Test that node initializes with correct defaults."""
assert node.state.x == 0.0
assert node.state.y == 0.0
assert node.state.theta == 0.0
assert node.last_wheel_odom is None
assert node.last_visual_odom is None
assert node.last_imu is None
def test_weight_normalization(self, node):
"""Test that weights are normalized to sum to 1.0."""
total = node.wheel_weight + node.visual_weight + node.imu_weight
assert abs(total - 1.0) < 1e-6
def test_wheel_odom_subscription(self, node):
"""Test wheel odometry message handling."""
odom_msg = Odometry()
odom_msg.twist.twist.linear.x = 1.0
odom_msg.twist.twist.linear.y = 0.5
node._on_wheel_odom(odom_msg)
assert node.last_wheel_odom is not None
assert node.last_wheel_odom.twist.twist.linear.x == 1.0
def test_visual_odom_subscription(self, node):
"""Test visual odometry message handling."""
odom_msg = Odometry()
odom_msg.pose.pose.position.x = 5.0
odom_msg.pose.pose.position.y = 3.0
node._on_visual_odom(odom_msg)
assert node.last_visual_odom is not None
assert node.last_visual_odom.pose.pose.position.x == 5.0
def test_imu_subscription(self, node):
"""Test IMU message handling."""
imu_msg = Imu()
imu_msg.angular_velocity.z = 0.1
node._on_imu(imu_msg)
assert node.last_imu is not None
assert node.last_imu.angular_velocity.z == 0.1
def test_position_integration(self, node):
"""Test position integration from velocity."""
node.state.vx = 1.0 # 1 m/s
node.state.vy = 0.0
node.state.theta = 0.0 # Facing +x direction
dt = 1.0 # 1 second
node._fuse_odometry(dt)
# Should move 1 meter in +x direction
assert abs(node.state.x - 1.0) < 0.1
def test_position_integration_rotated(self, node):
"""Test position integration with rotation."""
node.state.vx = 1.0 # 1 m/s
node.state.vy = 0.0
node.state.theta = math.pi / 2 # Facing +y direction
dt = 1.0
node._fuse_odometry(dt)
# Should move 1 meter in +y direction
assert abs(node.state.y - 1.0) < 0.1
def test_angular_velocity_update(self, node):
"""Test angular velocity from IMU."""
imu_msg = Imu()
imu_msg.angular_velocity.z = 0.2
imu_msg.orientation.w = 1.0
node.last_imu = imu_msg
dt = 0.1
node._fuse_odometry(dt)
# Angular velocity should be weighted by imu_weight
expected_omega = 0.2 * node.imu_weight
assert abs(node.state.omega - expected_omega) < 0.01
def test_velocity_blending_wheel_only(self, node):
"""Test velocity blending with only wheel odometry."""
wheel_msg = Odometry()
wheel_msg.twist.twist.linear.x = 1.0
wheel_msg.twist.twist.linear.y = 0.0
node.last_wheel_odom = wheel_msg
dt = 0.1
node._fuse_odometry(dt)
# Velocity should be weighted by wheel_weight
expected_vx = 1.0 * node.wheel_weight
assert abs(node.state.vx - expected_vx) < 0.01
def test_visual_drift_correction(self, node):
"""Test drift correction from visual odometry."""
# Current state
node.state.x = 0.0
node.state.y = 0.0
# Visual odometry says we're at (1.0, 1.0)
visual_msg = Odometry()
visual_msg.pose.pose.position.x = 1.0
visual_msg.pose.pose.position.y = 1.0
visual_msg.pose.pose.orientation.w = 1.0
node.last_visual_odom = visual_msg
dt = 0.1
node._fuse_odometry(dt)
# Position should have moved toward visual estimate
assert node.state.x > 0.0
assert node.state.y > 0.0
def test_covariance_updates(self, node):
"""Test that covariances are updated based on measurements."""
wheel_msg = Odometry()
visual_msg = Odometry()
visual_msg.pose.pose.orientation.w = 1.0
node.last_wheel_odom = wheel_msg
node.last_visual_odom = visual_msg
measurements = {
"wheel": wheel_msg,
"visual": visual_msg,
}
node._update_covariances(measurements)
# Covariance should be reduced with good measurements
assert node.state.cov_pos[0, 0] < 0.01
def test_quaternion_to_yaw(self, node):
"""Test quaternion to yaw conversion."""
# Create 90 degree yaw rotation
q = quaternion_from_euler(0, 0, math.pi / 2)
imu_msg = Imu()
imu_msg.orientation.x = q[0]
imu_msg.orientation.y = q[1]
imu_msg.orientation.z = q[2]
imu_msg.orientation.w = q[3]
node.last_imu = imu_msg
dt = 0.1
node._fuse_odometry(dt)
# Theta should be blended toward 90 degrees
assert node.state.theta > 0.0
def test_velocity_near_zero(self, node):
"""Test handling of near-zero velocities."""
wheel_msg = Odometry()
wheel_msg.twist.twist.linear.x = 0.001 # Very small
node.last_wheel_odom = wheel_msg
dt = 0.1
node._fuse_odometry(dt)
# Position change should be minimal
assert abs(node.state.x) < 0.0001
def test_invalid_dt_ignored(self, node):
"""Test that invalid dt values are ignored."""
initial_x = node.state.x
# Zero dt
node._fuse_odometry(0.0)
assert node.state.x == initial_x
# Negative dt
node._fuse_odometry(-1.0)
assert node.state.x == initial_x
# Very large dt
node._fuse_odometry(10.0)
assert node.state.x == initial_x
class TestOdomFusionScenarios:
"""Integration-style tests for realistic scenarios."""
def test_scenario_straight_line(self, node):
"""Scenario: robot moving in straight line."""
wheel_msg = Odometry()
wheel_msg.twist.twist.linear.x = 1.0 # 1 m/s forward
wheel_msg.twist.twist.linear.y = 0.0
node.last_wheel_odom = wheel_msg
node.state.theta = 0.0 # Facing +x
# Move for 5 seconds
for _ in range(5):
node._fuse_odometry(1.0)
# Should be approximately 5 meters in +x direction
assert 4.0 < node.state.x < 6.0
assert abs(node.state.y) < 0.5
def test_scenario_circular_motion(self, node):
"""Scenario: robot moving in circle."""
wheel_msg = Odometry()
wheel_msg.twist.twist.linear.x = 1.0 # 1 m/s forward
imu_msg = Imu()
imu_msg.angular_velocity.z = 0.5 # 0.5 rad/s rotation
imu_msg.orientation.w = 1.0
node.last_wheel_odom = wheel_msg
node.last_imu = imu_msg
# Simulate circular motion for 4 cycles
for i in range(4):
node._fuse_odometry(1.0)
# Should have moved distance and rotated
distance = math.sqrt(node.state.x**2 + node.state.y**2)
assert distance > 1.0
def test_scenario_drift_correction(self, node):
"""Scenario: wheel odometry drifts, vision corrects."""
# Wheel says we're at 10, 10
node.state.x = 10.0
node.state.y = 10.0
# Vision says we're actually at 9, 9
visual_msg = Odometry()
visual_msg.pose.pose.position.x = 9.0
visual_msg.pose.pose.position.y = 9.0
visual_msg.pose.pose.orientation.w = 1.0
node.last_visual_odom = visual_msg
# Before correction
initial_x = node.state.x
initial_y = node.state.y
node._fuse_odometry(1.0)
# Position should move toward visual estimate
assert node.state.x < initial_x
assert node.state.y < initial_y
def test_scenario_all_sensors_available(self, node):
"""Scenario: all three sensor sources available."""
wheel_msg = Odometry()
wheel_msg.twist.twist.linear.x = 0.5
visual_msg = Odometry()
visual_msg.pose.pose.position.x = 2.0
visual_msg.pose.pose.position.y = 1.0
visual_msg.pose.pose.orientation.w = 1.0
imu_msg = Imu()
q = quaternion_from_euler(0, 0, 0.1) # Small yaw
imu_msg.orientation.x = q[0]
imu_msg.orientation.y = q[1]
imu_msg.orientation.z = q[2]
imu_msg.orientation.w = q[3]
imu_msg.angular_velocity.z = 0.05
node.last_wheel_odom = wheel_msg
node.last_visual_odom = visual_msg
node.last_imu = imu_msg
measurements = {
"wheel": wheel_msg,
"visual": visual_msg,
"imu": imu_msg,
}
node._update_covariances(measurements)
# Covariance should be low with all sources
assert node.state.cov_pos[0, 0] < 0.01
assert node.state.cov_theta < 0.01
def test_scenario_sensor_dropout(self, node):
"""Scenario: visual sensor drops out, wheel continues."""
wheel_msg = Odometry()
wheel_msg.twist.twist.linear.x = 1.0
node.last_wheel_odom = wheel_msg
node.last_visual_odom = None # Visual dropout
node.last_imu = None
# Should continue with wheel only
node._fuse_odometry(1.0)
# Position should still integrate
assert node.state.x > 0.0

244
src/ina219.c Normal file
View File

@ -0,0 +1,244 @@
#include "ina219.h"
#include "config.h"
#include "i2c1.h"
#include <string.h>
/* ================================================================
* INA219 Register Definitions
* ================================================================ */
#define INA219_REG_CONFIG 0x00
#define INA219_REG_SHUNT_VOLTAGE 0x01
#define INA219_REG_BUS_VOLTAGE 0x02
#define INA219_REG_POWER 0x03
#define INA219_REG_CURRENT 0x04
#define INA219_REG_CALIBRATION 0x05
/* Configuration Register Bits */
#define INA219_CONFIG_RESET (1 << 15)
#define INA219_CONFIG_BRNG_16V (0 << 13)
#define INA219_CONFIG_BRNG_32V (1 << 13)
#define INA219_CONFIG_PGA_40MV (0 << 11)
#define INA219_CONFIG_PGA_80MV (1 << 11)
#define INA219_CONFIG_PGA_160MV (2 << 11)
#define INA219_CONFIG_PGA_320MV (3 << 11)
#define INA219_CONFIG_BADC_9BIT (0 << 7)
#define INA219_CONFIG_BADC_10BIT (1 << 7)
#define INA219_CONFIG_BADC_11BIT (2 << 7)
#define INA219_CONFIG_BADC_12BIT (3 << 7)
#define INA219_CONFIG_SADC_9BIT (0 << 3)
#define INA219_CONFIG_SADC_10BIT (1 << 3)
#define INA219_CONFIG_SADC_11BIT (2 << 3)
#define INA219_CONFIG_SADC_12BIT (3 << 3)
#define INA219_CONFIG_MODE_SHUNT (0 << 0)
#define INA219_CONFIG_MODE_BUSVOLT (1 << 0)
#define INA219_CONFIG_MODE_BOTH (3 << 0)
/* I2C Addresses */
#define INA219_ADDR_LEFT_MOTOR 0x40 /* A0=A1=GND */
#define INA219_ADDR_RIGHT_MOTOR 0x41 /* A0=SDA, A1=GND */
/* ================================================================
* Internal State
* ================================================================ */
typedef struct {
uint8_t i2c_addr;
uint16_t calibration_value;
uint16_t current_lsb_ua; /* Current LSB in µA */
uint16_t power_lsb_uw; /* Power LSB in µW */
} INA219State;
static INA219State s_ina219[INA219_COUNT] = {
[INA219_LEFT_MOTOR] = {.i2c_addr = INA219_ADDR_LEFT_MOTOR},
[INA219_RIGHT_MOTOR] = {.i2c_addr = INA219_ADDR_RIGHT_MOTOR}
};
/* ================================================================
* I2C Helper Functions
* ================================================================ */
static bool i2c_write_register(uint8_t addr, uint8_t reg, uint16_t value)
{
uint8_t buf[3] = {reg, (uint8_t)(value >> 8), (uint8_t)(value & 0xFF)};
return i2c1_write(addr, buf, sizeof(buf)) == 0;
}
static bool i2c_read_register(uint8_t addr, uint8_t reg, uint16_t *value)
{
uint8_t buf[2];
if (i2c1_write(addr, &reg, 1) != 0) return false;
if (i2c1_read(addr, buf, sizeof(buf)) != 0) return false;
*value = ((uint16_t)buf[0] << 8) | buf[1];
return true;
}
/* ================================================================
* Public API
* ================================================================ */
void ina219_init(void)
{
/* Ensure I2C1 is initialized before calling this */
/* Auto-calibrate both sensors for typical motor monitoring:
* - Max current: 5A
* - Shunt resistor: 0.1Ω
* - LSB: 160µA (5A / 32768)
*/
ina219_calibrate(INA219_LEFT_MOTOR, 5000, 100);
ina219_calibrate(INA219_RIGHT_MOTOR, 5000, 100);
}
void ina219_calibrate(INA219Sensor sensor, uint16_t max_current_ma, uint16_t shunt_ohms_milli)
{
if (sensor >= INA219_COUNT) return;
INA219State *s = &s_ina219[sensor];
/* Calculate current LSB: max_current / 32768 (15-bit signed register)
* LSB unit: µA
* Example: 5000mA / 32768 152.6µA use 160µA (round up for safety)
*/
uint32_t current_lsb_ua = ((uint32_t)max_current_ma * 1000 + 32767) / 32768;
s->current_lsb_ua = (uint16_t)current_lsb_ua;
/* Power LSB = 20 × current_lsb_ua (20µW per 1µA of current LSB) */
s->power_lsb_uw = 20 * current_lsb_ua;
/* Calibration register: (0.04096) / (current_lsb_ua × shunt_ohms_milli / 1000)
* Simplified: 40960 / (current_lsb_ua × shunt_ohms_milli)
*/
uint32_t calibration = 40960 / ((uint32_t)current_lsb_ua * shunt_ohms_milli / 1000);
if (calibration > 65535) calibration = 65535;
s->calibration_value = (uint16_t)calibration;
/* Write calibration register */
i2c_write_register(s->i2c_addr, INA219_REG_CALIBRATION, s->calibration_value);
/* Configure for continuous conversion mode (12-bit ADC for both shunt and bus)
* Config: 32V range, 160mV PGA, 12-bit ADC, continuous mode
*/
uint16_t config = INA219_CONFIG_BRNG_32V
| INA219_CONFIG_PGA_160MV
| INA219_CONFIG_BADC_12BIT
| INA219_CONFIG_SADC_12BIT
| INA219_CONFIG_MODE_BOTH;
i2c_write_register(s->i2c_addr, INA219_REG_CONFIG, config);
}
bool ina219_read(INA219Sensor sensor, INA219Data *data)
{
if (sensor >= INA219_COUNT || !data) return false;
INA219State *s = &s_ina219[sensor];
uint8_t addr = s->i2c_addr;
uint16_t reg_value;
/* Read shunt voltage (register 0x01) */
if (!i2c_read_register(addr, INA219_REG_SHUNT_VOLTAGE, &reg_value)) return false;
int16_t shunt_raw = (int16_t)reg_value;
data->shunt_voltage_uv = shunt_raw * 10; /* 10µV/LSB */
/* Read bus voltage (register 0x02) */
if (!i2c_read_register(addr, INA219_REG_BUS_VOLTAGE, &reg_value)) return false;
uint16_t bus_raw = (reg_value >> 3) & 0x1FFF; /* 13-bit voltage, 4mV/LSB */
data->bus_voltage_mv = bus_raw * 4;
/* Read current (register 0x04) — requires calibration */
if (!i2c_read_register(addr, INA219_REG_CURRENT, &reg_value)) return false;
int16_t current_raw = (int16_t)reg_value;
data->current_ma = (current_raw * (int32_t)s->current_lsb_ua) / 1000;
/* Read power (register 0x03) — in units of power_lsb */
if (!i2c_read_register(addr, INA219_REG_POWER, &reg_value)) return false;
uint32_t power_raw = reg_value;
data->power_mw = (power_raw * (uint32_t)s->power_lsb_uw) / 1000;
return true;
}
bool ina219_read_bus_voltage_mv(INA219Sensor sensor, uint16_t *voltage_mv)
{
if (sensor >= INA219_COUNT || !voltage_mv) return false;
INA219State *s = &s_ina219[sensor];
uint16_t reg_value;
if (!i2c_read_register(s->i2c_addr, INA219_REG_BUS_VOLTAGE, &reg_value)) return false;
uint16_t bus_raw = (reg_value >> 3) & 0x1FFF;
*voltage_mv = bus_raw * 4;
return true;
}
bool ina219_read_current_ma(INA219Sensor sensor, int16_t *current_ma)
{
if (sensor >= INA219_COUNT || !current_ma) return false;
INA219State *s = &s_ina219[sensor];
uint16_t reg_value;
if (!i2c_read_register(s->i2c_addr, INA219_REG_CURRENT, &reg_value)) return false;
int16_t current_raw = (int16_t)reg_value;
*current_ma = (current_raw * (int32_t)s->current_lsb_ua) / 1000;
return true;
}
bool ina219_read_power_mw(INA219Sensor sensor, uint32_t *power_mw)
{
if (sensor >= INA219_COUNT || !power_mw) return false;
INA219State *s = &s_ina219[sensor];
uint16_t reg_value;
if (!i2c_read_register(s->i2c_addr, INA219_REG_POWER, &reg_value)) return false;
uint32_t power_raw = reg_value;
*power_mw = (power_raw * (uint32_t)s->power_lsb_uw) / 1000;
return true;
}
void ina219_alert_enable(INA219Sensor sensor, uint16_t current_limit_ma)
{
if (sensor >= INA219_COUNT) return;
INA219State *s = &s_ina219[sensor];
/* Alert limit register: set to current threshold
* Current threshold = (limit_ma × 1000) / current_lsb_ua
*/
int16_t limit_raw = ((int32_t)current_limit_ma * 1000) / s->current_lsb_ua;
if (limit_raw > 32767) limit_raw = 32767;
/* Enable alert on over-limit, latching mode */
uint16_t alert_config = limit_raw;
i2c_write_register(s->i2c_addr, 0x06, alert_config); /* Alert register */
}
void ina219_alert_disable(INA219Sensor sensor)
{
if (sensor >= INA219_COUNT) return;
INA219State *s = &s_ina219[sensor];
/* Write 0 to alert register to disable */
i2c_write_register(s->i2c_addr, 0x06, 0);
}
void ina219_reset(INA219Sensor sensor)
{
if (sensor >= INA219_COUNT) return;
INA219State *s = &s_ina219[sensor];
/* Set reset bit in config register */
i2c_write_register(s->i2c_addr, INA219_REG_CONFIG, INA219_CONFIG_RESET);
/* Wait for reset to complete (~1ms) */
uint32_t start = 0; /* In real code, use HAL_GetTick() */
while (start < 2) start++; /* Simple delay */
/* Re-calibrate after reset */
ina219_calibrate(sensor, 5000, 100);
}

View File

@ -22,6 +22,7 @@
#include "buzzer.h" #include "buzzer.h"
#include "led.h" #include "led.h"
#include "servo.h" #include "servo.h"
#include "ina219.h"
#include "power_mgmt.h" #include "power_mgmt.h"
#include "battery.h" #include "battery.h"
#include <math.h> #include <math.h>
@ -181,6 +182,7 @@ int main(void) {
int chip = bmp280_init(); int chip = bmp280_init();
baro_chip = (chip > 0) ? chip : 0; baro_chip = (chip > 0) ? chip : 0;
mag_type = mag_init(); mag_type = mag_init();
ina219_init(); /* Init INA219 dual motor current monitoring (Issue #214) */
} }
/* /*

267
test/test_ina219.py Normal file
View File

@ -0,0 +1,267 @@
"""
test_ina219.py INA219 power monitor driver tests (Issue #214)
Verifies:
- Calibration: LSB calculation for max current and shunt resistance
- Register calculations: voltage, current, power from raw ADC values
- Multi-sensor support: independent left/right motor monitoring
- Alert thresholds: overcurrent limit configuration
- Edge cases: boundary values, overflow handling
"""
import pytest
# ── Constants ─────────────────────────────────────────────────────────────
# Default calibration: 5A max, 0.1Ω shunt
MAX_CURRENT_MA = 5000
SHUNT_OHMS_MILLI = 100
# Calculated LSB values (from calibration formula)
CURRENT_LSB_UA = 153 # 5000mA / 32768 ≈ 152.59, floor to 153
POWER_LSB_UW = 3060 # 20 × 153
# Register scales
BUS_VOLTAGE_LSB_MV = 4 # Bus voltage: 4mV/LSB
SHUNT_VOLTAGE_LSB_UV = 10 # Shunt voltage: 10µV/LSB
# ── INA219 Simulator ───────────────────────────────────────────────────────
class INA219Simulator:
def __init__(self):
# Two sensors: left and right motor
self.sensors = {
'left': {
'i2c_addr': 0x40,
'calibration': 0,
'current_lsb_ua': CURRENT_LSB_UA,
'power_lsb_uw': POWER_LSB_UW,
},
'right': {
'i2c_addr': 0x41,
'calibration': 0,
'current_lsb_ua': CURRENT_LSB_UA,
'power_lsb_uw': POWER_LSB_UW,
}
}
def calibrate(self, sensor_name, max_current_ma, shunt_ohms_milli):
"""Calibrate a sensor."""
if sensor_name not in self.sensors:
return False
s = self.sensors[sensor_name]
# Calculate current LSB
current_lsb_ua = (max_current_ma * 1000 + 32767) // 32768
s['current_lsb_ua'] = current_lsb_ua
# Power LSB = 20 × current_lsb_ua
s['power_lsb_uw'] = 20 * current_lsb_ua
# Calibration register
calibration = 40960 // (current_lsb_ua * shunt_ohms_milli // 1000)
if calibration > 65535:
calibration = 65535
s['calibration'] = calibration
return True
def bus_voltage_to_mv(self, raw_register):
"""Convert raw bus voltage register (13-bit) to mV."""
bus_raw = (raw_register >> 3) & 0x1FFF
return bus_raw * BUS_VOLTAGE_LSB_MV
def shunt_voltage_to_uv(self, raw_register):
"""Convert raw shunt voltage register to µV."""
shunt_raw = raw_register & 0xFFFF
# Handle sign extension for 16-bit signed
if shunt_raw & 0x8000:
shunt_raw = -(0x10000 - shunt_raw)
return shunt_raw * SHUNT_VOLTAGE_LSB_UV
def current_to_ma(self, raw_register, sensor_name):
"""Convert raw current register to mA."""
s = self.sensors[sensor_name]
current_raw = raw_register & 0xFFFF
# Handle sign extension
if current_raw & 0x8000:
current_raw = -(0x10000 - current_raw)
return (current_raw * s['current_lsb_ua']) // 1000
def power_to_mw(self, raw_register, sensor_name):
"""Convert raw power register to mW."""
s = self.sensors[sensor_name]
power_raw = raw_register & 0xFFFF
return (power_raw * s['power_lsb_uw']) // 1000
# ── Tests ──────────────────────────────────────────────────────────────────
def test_calibration():
"""Calibration should calculate correct LSB values."""
sim = INA219Simulator()
assert sim.calibrate('left', 5000, 100)
# Expected: 5000mA / 32768 ≈ 152.6, rounded up to 153µA
assert sim.sensors['left']['current_lsb_ua'] == 153
assert sim.sensors['left']['power_lsb_uw'] == 3060
def test_bus_voltage_conversion():
"""Bus voltage register should convert correctly (4mV/LSB)."""
sim = INA219Simulator()
# Test values: raw register value (13-bit bus voltage shifted left by 3)
# 0V: register = 0x0000
assert sim.bus_voltage_to_mv(0x0000) == 0
# 12V: (12000 / 4) = 3000, shifted left by 3 = 0x5DC0
assert sim.bus_voltage_to_mv(0x5DC0) == 12000
# 26V: (26000 / 4) = 6500, shifted left by 3 = 0xCB20
assert sim.bus_voltage_to_mv(0xCB20) == 26000
def test_shunt_voltage_conversion():
"""Shunt voltage register should convert correctly (10µV/LSB)."""
sim = INA219Simulator()
# 0µV
assert sim.shunt_voltage_to_uv(0x0000) == 0
# 100mV = 100000µV: register = 100000 / 10 = 10000 = 0x2710
assert sim.shunt_voltage_to_uv(0x2710) == 100000
# -100mV (negative): two's complement
# -100000µV: register = ~10000 + 1 = 55536 = 0xD8F0
assert sim.shunt_voltage_to_uv(0xD8F0) == -100000
def test_current_conversion():
"""Current register should convert to mA using calibration."""
sim = INA219Simulator()
sim.calibrate('left', 5000, 100)
# 0mA
assert sim.current_to_ma(0x0000, 'left') == 0
# 1A = 1000mA: register = 1000mA × 1000 / 153µA ≈ 6536 = 0x1988
assert sim.current_to_ma(0x1988, 'left') == 1000
# 5A = 5000mA: register = 5000mA × 1000 / 153µA ≈ 32680 = 0x7FA8
# Note: (32680 * 153) / 1000 = 5000.6, integer division = 5000
assert sim.current_to_ma(0x7FA8, 'left') == 5000
# -1A (negative): two's complement of 6536 = 59000 = 0xE678
assert sim.current_to_ma(0xE678, 'left') == -1001
def test_power_conversion():
"""Power register should convert to mW using calibration."""
sim = INA219Simulator()
sim.calibrate('left', 5000, 100)
# 0W
assert sim.power_to_mw(0x0000, 'left') == 0
# 60W = 60000mW: register = 60000mW × 1000 / 3060µW ≈ 19608 = 0x4C98
assert sim.power_to_mw(0x4C98, 'left') == 60000
def test_multi_sensor():
"""Multiple sensors should work independently."""
sim = INA219Simulator()
assert sim.calibrate('left', 5000, 100)
assert sim.calibrate('right', 5000, 100)
# Both should have same calibration
assert sim.sensors['left']['current_lsb_ua'] == sim.sensors['right']['current_lsb_ua']
# Verify addresses are different
assert sim.sensors['left']['i2c_addr'] == 0x40
assert sim.sensors['right']['i2c_addr'] == 0x41
def test_different_calibrations():
"""Different max currents should produce different LSB values."""
sim1 = INA219Simulator()
sim2 = INA219Simulator()
sim1.calibrate('left', 5000, 100) # 5A
sim2.calibrate('left', 10000, 100) # 10A
# Higher max current = larger LSB
assert sim2.sensors['left']['current_lsb_ua'] > sim1.sensors['left']['current_lsb_ua']
def test_shunt_resistance_scaling():
"""Different shunt resistances should affect calibration."""
sim1 = INA219Simulator()
sim2 = INA219Simulator()
sim1.calibrate('left', 5000, 100) # 0.1Ω
sim2.calibrate('left', 5000, 200) # 0.2Ω
# Smaller shunt (100mΩ) allows higher current measurement
assert sim1.sensors['left']['calibration'] != sim2.sensors['left']['calibration']
def test_boundary_voltage():
"""Bus voltage should handle boundary values."""
sim = INA219Simulator()
# Min (0V)
assert sim.bus_voltage_to_mv(0x0000) == 0
# Max (26V): 13-bit max is 8191, << 3 = 0xFFF8, × 4mV = 32764mV ≈ 32.76V
# Verify: (0xFFF8 >> 3) & 0x1FFF = 0x1FFF = 8191, × 4 = 32764
assert sim.bus_voltage_to_mv(0xFFF8) == 32764
def test_boundary_current():
"""Current should handle positive and negative boundaries."""
sim = INA219Simulator()
sim.calibrate('left', 5000, 100)
# Max positive (~5A)
max_current = sim.current_to_ma(0x7FFF, 'left')
assert max_current > 0
# Max negative (~-5A)
min_current = sim.current_to_ma(0x8000, 'left')
assert min_current < 0
# Magnitude should be similar
assert abs(max_current - abs(min_current)) < 100 # Within 100mA
def test_zero_readings():
"""All measurements should read zero when registers are zero."""
sim = INA219Simulator()
sim.calibrate('left', 5000, 100)
assert sim.bus_voltage_to_mv(0x0000) == 0
assert sim.shunt_voltage_to_uv(0x0000) == 0
assert sim.current_to_ma(0x0000, 'left') == 0
assert sim.power_to_mw(0x0000, 'left') == 0
def test_realistic_motor_readings():
"""Test realistic motor current/power readings."""
sim = INA219Simulator()
sim.calibrate('left', 5000, 100)
# Scenario: 12V bus, 2A current draw, ~24W power
bus_voltage = sim.bus_voltage_to_mv(0x5DC0) # ~12000mV
current = sim.current_to_ma(0x3310, 'left') # ~2000mA
power = sim.power_to_mw(0x1E93, 'left') # ~24000mW
assert bus_voltage > 10000 # ~12V
assert 1500 < current < 2500 # ~2A
assert 20000 < power < 30000 # ~24W
if __name__ == '__main__':
pytest.main([__file__, '-v'])