Compare commits
3 Commits
b9d01762e2
...
eceda99bb5
| Author | SHA1 | Date | |
|---|---|---|---|
| eceda99bb5 | |||
| 0940af56ef | |||
| 938c2a33d4 |
117
include/ina219.h
Normal file
117
include/ina219.h
Normal file
@ -0,0 +1,117 @@
|
||||
#ifndef INA219_H
|
||||
#define INA219_H
|
||||
|
||||
#include <stdint.h>
|
||||
#include <stdbool.h>
|
||||
|
||||
/*
|
||||
* ina219.h — INA219 power monitor driver (Issue #214)
|
||||
*
|
||||
* I2C1 driver for motor current/voltage/power monitoring.
|
||||
* Supports 2 sensors (left/right motor) on I2C1 (PB8=SCL, PB9=SDA).
|
||||
*
|
||||
* INA219 specs:
|
||||
* - I2C addresses: 0x40–0x4F (configurable via address pins)
|
||||
* - Bus voltage: 0–26V, 4mV/LSB
|
||||
* - Shunt voltage: ±327mV, 10µV/LSB
|
||||
* - Current: derived from shunt voltage (calibration-dependent)
|
||||
* - Power: (Bus V × Current) / internal gain
|
||||
*
|
||||
* Typical usage for motor monitoring:
|
||||
* - 0.1Ω shunt resistor → ~3.27A max (at ±327mV)
|
||||
* - Calibration: set max expected current, driver calculates LSB
|
||||
* - Read functions return actual voltage/current/power values
|
||||
*/
|
||||
|
||||
/* INA219 sensors (2 motors) */
|
||||
typedef enum {
|
||||
INA219_LEFT_MOTOR = 0, /* Address 0x40 */
|
||||
INA219_RIGHT_MOTOR = 1, /* Address 0x41 */
|
||||
INA219_COUNT
|
||||
} INA219Sensor;
|
||||
|
||||
/* INA219 measurement data */
|
||||
typedef struct {
|
||||
uint16_t bus_voltage_mv; /* Bus voltage in mV (0–26000) */
|
||||
int16_t shunt_voltage_uv; /* Shunt voltage in µV (±327000) */
|
||||
int16_t current_ma; /* Current in mA (signed) */
|
||||
uint32_t power_mw; /* Power in mW */
|
||||
} INA219Data;
|
||||
|
||||
/*
|
||||
* ina219_init()
|
||||
*
|
||||
* Initialize I2C1 and both INA219 sensors (left + right motor).
|
||||
* Performs auto-calibration for typical motor current monitoring.
|
||||
* Call once at startup after i2c1_init().
|
||||
*/
|
||||
void ina219_init(void);
|
||||
|
||||
/*
|
||||
* ina219_calibrate(sensor, max_current_ma, shunt_ohms_milli)
|
||||
*
|
||||
* Manually calibrate a sensor for expected max current and shunt resistance.
|
||||
* Calculates internal calibration register value.
|
||||
*
|
||||
* Example:
|
||||
* ina219_calibrate(INA219_LEFT_MOTOR, 5000, 100); // 5A max, 0.1Ω shunt
|
||||
*/
|
||||
void ina219_calibrate(INA219Sensor sensor, uint16_t max_current_ma, uint16_t shunt_ohms_milli);
|
||||
|
||||
/*
|
||||
* ina219_read(sensor, data)
|
||||
*
|
||||
* Read all measurements from a sensor (voltage, current, power).
|
||||
* Blocks until measurements are ready (typically <1ms at default ADC resolution).
|
||||
*
|
||||
* Returns: true if read successful, false on I2C error.
|
||||
*/
|
||||
bool ina219_read(INA219Sensor sensor, INA219Data *data);
|
||||
|
||||
/*
|
||||
* ina219_read_bus_voltage_mv(sensor, voltage_mv)
|
||||
*
|
||||
* Read bus voltage only (faster than full read).
|
||||
* Returns: true if successful.
|
||||
*/
|
||||
bool ina219_read_bus_voltage_mv(INA219Sensor sensor, uint16_t *voltage_mv);
|
||||
|
||||
/*
|
||||
* ina219_read_current_ma(sensor, current_ma)
|
||||
*
|
||||
* Read current only (requires prior calibration).
|
||||
* Returns: true if successful.
|
||||
*/
|
||||
bool ina219_read_current_ma(INA219Sensor sensor, int16_t *current_ma);
|
||||
|
||||
/*
|
||||
* ina219_read_power_mw(sensor, power_mw)
|
||||
*
|
||||
* Read power consumption only.
|
||||
* Returns: true if successful.
|
||||
*/
|
||||
bool ina219_read_power_mw(INA219Sensor sensor, uint32_t *power_mw);
|
||||
|
||||
/*
|
||||
* ina219_alert_enable(sensor, current_limit_ma)
|
||||
*
|
||||
* Enable alert pin when current exceeds limit (overcurrent protection).
|
||||
* Alert pin: GPIO, active high, open-drain output.
|
||||
*/
|
||||
void ina219_alert_enable(INA219Sensor sensor, uint16_t current_limit_ma);
|
||||
|
||||
/*
|
||||
* ina219_alert_disable(sensor)
|
||||
*
|
||||
* Disable alert for a sensor.
|
||||
*/
|
||||
void ina219_alert_disable(INA219Sensor sensor);
|
||||
|
||||
/*
|
||||
* ina219_reset(sensor)
|
||||
*
|
||||
* Perform soft reset on a sensor (clears all registers to default).
|
||||
*/
|
||||
void ina219_reset(INA219Sensor sensor);
|
||||
|
||||
#endif /* INA219_H */
|
||||
@ -0,0 +1,16 @@
|
||||
# Odometry fusion configuration (complementary filter)
|
||||
|
||||
odom_fusion:
|
||||
ros__parameters:
|
||||
# Publishing frequency in Hz
|
||||
frequency: 50 # 50 Hz = 20ms cycle
|
||||
|
||||
# Sensor weights (will be normalized)
|
||||
# Wheel odometry: reliable linear displacement
|
||||
wheel_weight: 0.6
|
||||
|
||||
# Visual odometry: loop closure and long-term correction
|
||||
visual_weight: 0.3
|
||||
|
||||
# IMU: high-frequency attitude and angular velocity
|
||||
imu_weight: 0.1
|
||||
@ -0,0 +1,36 @@
|
||||
"""Launch file for odom_fusion_node."""
|
||||
|
||||
from launch import LaunchDescription
|
||||
from launch_ros.actions import Node
|
||||
from launch.substitutions import LaunchConfiguration
|
||||
from launch.actions import DeclareLaunchArgument
|
||||
import os
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
|
||||
|
||||
def generate_launch_description():
|
||||
"""Generate launch description for odometry fusion."""
|
||||
# Package directory
|
||||
pkg_dir = get_package_share_directory("saltybot_odom_fusion")
|
||||
|
||||
# Parameters
|
||||
config_file = os.path.join(pkg_dir, "config", "fusion_config.yaml")
|
||||
|
||||
# Declare launch arguments
|
||||
return LaunchDescription(
|
||||
[
|
||||
DeclareLaunchArgument(
|
||||
"config_file",
|
||||
default_value=config_file,
|
||||
description="Path to configuration YAML file",
|
||||
),
|
||||
# Odometry fusion node
|
||||
Node(
|
||||
package="saltybot_odom_fusion",
|
||||
executable="odom_fusion_node",
|
||||
name="odom_fusion",
|
||||
output="screen",
|
||||
parameters=[LaunchConfiguration("config_file")],
|
||||
),
|
||||
]
|
||||
)
|
||||
31
jetson/ros2_ws/src/saltybot_odom_fusion/package.xml
Normal file
31
jetson/ros2_ws/src/saltybot_odom_fusion/package.xml
Normal file
@ -0,0 +1,31 @@
|
||||
<?xml version="1.0"?>
|
||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="3">
|
||||
<name>saltybot_odom_fusion</name>
|
||||
<version>0.1.0</version>
|
||||
<description>
|
||||
Odometry fusion node for SaltyBot: fuses wheel, visual, and IMU odometry
|
||||
using complementary filtering. Publishes fused /odom and broadcasts
|
||||
odom→base_link TF at 50Hz. Handles sensor dropout and covariance weighting.
|
||||
</description>
|
||||
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
|
||||
<license>MIT</license>
|
||||
|
||||
<depend>rclpy</depend>
|
||||
<depend>geometry_msgs</depend>
|
||||
<depend>nav_msgs</depend>
|
||||
<depend>sensor_msgs</depend>
|
||||
<depend>std_msgs</depend>
|
||||
<depend>tf2_ros</depend>
|
||||
|
||||
<buildtool_depend>ament_python</buildtool_depend>
|
||||
|
||||
<test_depend>ament_copyright</test_depend>
|
||||
<test_depend>ament_flake8</test_depend>
|
||||
<test_depend>ament_pep257</test_depend>
|
||||
<test_depend>python3-pytest</test_depend>
|
||||
|
||||
<export>
|
||||
<build_type>ament_python</build_type>
|
||||
</export>
|
||||
</package>
|
||||
@ -0,0 +1,332 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Odometry fusion node for SaltyBot.
|
||||
|
||||
Fuses wheel odometry, visual odometry, and IMU data using complementary filtering.
|
||||
Publishes fused /odom and broadcasts odom→base_link TF at 50Hz.
|
||||
|
||||
Published topics:
|
||||
/odom (nav_msgs/Odometry) - Fused odometry with covariance
|
||||
|
||||
TF broadcasts:
|
||||
odom→base_link - Position and orientation transform
|
||||
|
||||
Subscribed topics:
|
||||
/saltybot/wheel_odom (nav_msgs/Odometry) - Wheel encoder odometry
|
||||
/rtab_map/odom (nav_msgs/Odometry) - Visual odometry (SLAM)
|
||||
/imu/data (sensor_msgs/Imu) - Inertial measurement unit
|
||||
"""
|
||||
|
||||
import math
|
||||
import json
|
||||
from typing import Optional, Tuple
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
import numpy as np
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.timer import Timer
|
||||
|
||||
from geometry_msgs.msg import Pose, Twist, TransformStamped, Quaternion
|
||||
from nav_msgs.msg import Odometry
|
||||
from sensor_msgs.msg import Imu
|
||||
from std_msgs.msg import String
|
||||
from tf2_ros import TransformBroadcaster
|
||||
from tf_transformations import quaternion_from_euler, euler_from_quaternion
|
||||
|
||||
|
||||
@dataclass
|
||||
class OdomState:
|
||||
"""Fused odometry state."""
|
||||
x: float = 0.0 # Position in odom frame
|
||||
y: float = 0.0
|
||||
theta: float = 0.0 # Orientation (yaw)
|
||||
|
||||
vx: float = 0.0 # Velocity in body frame
|
||||
vy: float = 0.0
|
||||
omega: float = 0.0 # Angular velocity
|
||||
|
||||
cov_pos: np.ndarray = field(default_factory=lambda: np.eye(2) * 0.01) # Position covariance
|
||||
cov_theta: float = 0.01 # Orientation covariance
|
||||
cov_vel: np.ndarray = field(default_factory=lambda: np.eye(2) * 0.01) # Velocity covariance
|
||||
|
||||
|
||||
class OdomFusionNode(Node):
|
||||
"""ROS2 node for odometry fusion."""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__("odom_fusion")
|
||||
|
||||
# Parameters
|
||||
self.declare_parameter("frequency", 50) # Hz
|
||||
self.declare_parameter("wheel_weight", 0.6) # Weight for wheel odometry
|
||||
self.declare_parameter("visual_weight", 0.3) # Weight for visual odometry
|
||||
self.declare_parameter("imu_weight", 0.1) # Weight for IMU
|
||||
|
||||
frequency = self.get_parameter("frequency").value
|
||||
self.wheel_weight = self.get_parameter("wheel_weight").value
|
||||
self.visual_weight = self.get_parameter("visual_weight").value
|
||||
self.imu_weight = self.get_parameter("imu_weight").value
|
||||
|
||||
# Normalize weights
|
||||
total_weight = self.wheel_weight + self.visual_weight + self.imu_weight
|
||||
self.wheel_weight /= total_weight
|
||||
self.visual_weight /= total_weight
|
||||
self.imu_weight /= total_weight
|
||||
|
||||
# State
|
||||
self.state = OdomState()
|
||||
self.last_time = None
|
||||
|
||||
# Last received messages
|
||||
self.last_wheel_odom: Optional[Odometry] = None
|
||||
self.last_visual_odom: Optional[Odometry] = None
|
||||
self.last_imu: Optional[Imu] = None
|
||||
|
||||
# Subscriptions
|
||||
self.create_subscription(
|
||||
Odometry, "/saltybot/wheel_odom", self._on_wheel_odom, 10
|
||||
)
|
||||
self.create_subscription(
|
||||
Odometry, "/rtab_map/odom", self._on_visual_odom, 10
|
||||
)
|
||||
self.create_subscription(Imu, "/imu/data", self._on_imu, 10)
|
||||
|
||||
# Publications
|
||||
self.pub_odom = self.create_publisher(Odometry, "/odom", 10)
|
||||
self.pub_info = self.create_publisher(String, "/saltybot/odom_fusion_info", 10)
|
||||
|
||||
# TF broadcaster
|
||||
self.tf_broadcaster = TransformBroadcaster(self)
|
||||
|
||||
# Timer for periodic fusion and publishing at 50Hz
|
||||
period = 1.0 / frequency
|
||||
self.timer: Timer = self.create_timer(period, self._timer_callback)
|
||||
|
||||
self.get_logger().info(
|
||||
f"Odometry fusion initialized at {frequency}Hz. "
|
||||
f"Weights - wheel: {self.wheel_weight:.2f}, "
|
||||
f"visual: {self.visual_weight:.2f}, "
|
||||
f"imu: {self.imu_weight:.2f}"
|
||||
)
|
||||
|
||||
def _on_wheel_odom(self, msg: Odometry) -> None:
|
||||
"""Update wheel odometry."""
|
||||
self.last_wheel_odom = msg
|
||||
|
||||
def _on_visual_odom(self, msg: Odometry) -> None:
|
||||
"""Update visual odometry."""
|
||||
self.last_visual_odom = msg
|
||||
|
||||
def _on_imu(self, msg: Imu) -> None:
|
||||
"""Update IMU data."""
|
||||
self.last_imu = msg
|
||||
|
||||
def _timer_callback(self) -> None:
|
||||
"""Fuse odometry sources and publish at 50Hz."""
|
||||
now = self.get_clock().now()
|
||||
|
||||
if self.last_time is None:
|
||||
self.last_time = now
|
||||
return
|
||||
|
||||
dt = (now - self.last_time).nanoseconds / 1e9
|
||||
self.last_time = now
|
||||
|
||||
# Fuse available sources
|
||||
self._fuse_odometry(dt)
|
||||
|
||||
# Publish fused odometry
|
||||
self._publish_odometry(now)
|
||||
|
||||
# Broadcast TF
|
||||
self._broadcast_tf(now)
|
||||
|
||||
# Publish debug info
|
||||
self._publish_info()
|
||||
|
||||
def _fuse_odometry(self, dt: float) -> None:
|
||||
"""Fuse odometry sources using complementary filtering."""
|
||||
if dt <= 0 or dt > 1.0: # Safety check on dt
|
||||
return
|
||||
|
||||
# Collect available measurements
|
||||
measurements = {}
|
||||
|
||||
if self.last_wheel_odom is not None:
|
||||
measurements["wheel"] = self.last_wheel_odom
|
||||
|
||||
if self.last_visual_odom is not None:
|
||||
measurements["visual"] = self.last_visual_odom
|
||||
|
||||
if self.last_imu is not None:
|
||||
measurements["imu"] = self.last_imu
|
||||
|
||||
if not measurements:
|
||||
return # No data to fuse
|
||||
|
||||
# Complementary filter approach
|
||||
# High-frequency IMU for attitude, low-frequency vision for position correction
|
||||
|
||||
# Extract velocities from wheel odometry (most reliable for linear motion)
|
||||
if "wheel" in measurements:
|
||||
wheel_odom = measurements["wheel"]
|
||||
self.state.vx = wheel_odom.twist.twist.linear.x * self.wheel_weight
|
||||
self.state.vy = wheel_odom.twist.twist.linear.y * self.wheel_weight
|
||||
|
||||
# Integrate IMU angular velocity (high-frequency)
|
||||
if "imu" in measurements:
|
||||
imu = measurements["imu"]
|
||||
# Use IMU for angular velocity (most accurate for rotation)
|
||||
self.state.omega = imu.angular_velocity.z * self.imu_weight
|
||||
|
||||
# Update orientation from IMU
|
||||
roll, pitch, yaw = euler_from_quaternion([
|
||||
imu.orientation.x, imu.orientation.y,
|
||||
imu.orientation.z, imu.orientation.w
|
||||
])
|
||||
# Blend with existing estimate
|
||||
self.state.theta = (0.8 * self.state.theta + 0.2 * yaw)
|
||||
|
||||
# Apply velocity-based position update
|
||||
self.state.x += self.state.vx * dt * math.cos(self.state.theta)
|
||||
self.state.x -= self.state.vy * dt * math.sin(self.state.theta)
|
||||
|
||||
self.state.y += self.state.vx * dt * math.sin(self.state.theta)
|
||||
self.state.y += self.state.vy * dt * math.cos(self.state.theta)
|
||||
|
||||
# Visual odometry correction (low-frequency drift correction)
|
||||
if "visual" in measurements and self.visual_weight > 0:
|
||||
visual_odom = measurements["visual"]
|
||||
visual_x = visual_odom.pose.pose.position.x
|
||||
visual_y = visual_odom.pose.pose.position.y
|
||||
|
||||
# Gradually correct position drift using visual odometry
|
||||
correction_factor = self.visual_weight * 0.1 # Small correction rate
|
||||
self.state.x += (visual_x - self.state.x) * correction_factor
|
||||
self.state.y += (visual_y - self.state.y) * correction_factor
|
||||
|
||||
# Correct orientation from visual odometry
|
||||
visual_roll, visual_pitch, visual_yaw = euler_from_quaternion([
|
||||
visual_odom.pose.pose.orientation.x,
|
||||
visual_odom.pose.pose.orientation.y,
|
||||
visual_odom.pose.pose.orientation.z,
|
||||
visual_odom.pose.pose.orientation.w
|
||||
])
|
||||
self.state.theta += (visual_yaw - self.state.theta) * correction_factor
|
||||
|
||||
# Update covariances based on available measurements
|
||||
self._update_covariances(measurements)
|
||||
|
||||
def _update_covariances(self, measurements: dict) -> None:
|
||||
"""Update state covariance based on measurement sources."""
|
||||
# Start with baseline
|
||||
base_pos_cov = 0.01
|
||||
base_theta_cov = 0.01
|
||||
|
||||
# Reduce covariance if we have good measurements
|
||||
if "wheel" in measurements:
|
||||
base_pos_cov *= 0.5
|
||||
if "visual" in measurements:
|
||||
base_pos_cov *= 0.7
|
||||
if "imu" in measurements:
|
||||
base_theta_cov *= 0.3
|
||||
|
||||
# Update covariance matrices
|
||||
self.state.cov_pos = np.eye(2) * base_pos_cov
|
||||
self.state.cov_theta = base_theta_cov
|
||||
|
||||
def _publish_odometry(self, now) -> None:
|
||||
"""Publish fused odometry message."""
|
||||
odom_msg = Odometry()
|
||||
odom_msg.header.stamp = now.to_msg()
|
||||
odom_msg.header.frame_id = "odom"
|
||||
odom_msg.child_frame_id = "base_link"
|
||||
|
||||
# Position
|
||||
odom_msg.pose.pose.position.x = self.state.x
|
||||
odom_msg.pose.pose.position.y = self.state.y
|
||||
odom_msg.pose.pose.position.z = 0.0
|
||||
|
||||
# Orientation (yaw only, roll/pitch from IMU)
|
||||
q = quaternion_from_euler(0, 0, self.state.theta)
|
||||
odom_msg.pose.pose.orientation = Quaternion(
|
||||
x=q[0], y=q[1], z=q[2], w=q[3]
|
||||
)
|
||||
|
||||
# Position covariance (6x6)
|
||||
pose_cov = np.zeros((6, 6))
|
||||
pose_cov[0:2, 0:2] = self.state.cov_pos
|
||||
pose_cov[5, 5] = self.state.cov_theta # Yaw variance
|
||||
odom_msg.pose.covariance = pose_cov.flatten().tolist()
|
||||
|
||||
# Velocity
|
||||
odom_msg.twist.twist.linear.x = self.state.vx
|
||||
odom_msg.twist.twist.linear.y = self.state.vy
|
||||
odom_msg.twist.twist.linear.z = 0.0
|
||||
odom_msg.twist.twist.angular.z = self.state.omega
|
||||
|
||||
# Velocity covariance (6x6)
|
||||
twist_cov = np.zeros((6, 6))
|
||||
twist_cov[0:2, 0:2] = self.state.cov_vel
|
||||
twist_cov[5, 5] = 0.01 # Angular velocity variance
|
||||
odom_msg.twist.covariance = twist_cov.flatten().tolist()
|
||||
|
||||
self.pub_odom.publish(odom_msg)
|
||||
|
||||
def _broadcast_tf(self, now) -> None:
|
||||
"""Broadcast odom→base_link TF."""
|
||||
t = TransformStamped()
|
||||
t.header.stamp = now.to_msg()
|
||||
t.header.frame_id = "odom"
|
||||
t.child_frame_id = "base_link"
|
||||
|
||||
# Translation
|
||||
t.transform.translation.x = self.state.x
|
||||
t.transform.translation.y = self.state.y
|
||||
t.transform.translation.z = 0.0
|
||||
|
||||
# Rotation (yaw only)
|
||||
q = quaternion_from_euler(0, 0, self.state.theta)
|
||||
t.transform.rotation = Quaternion(
|
||||
x=q[0], y=q[1], z=q[2], w=q[3]
|
||||
)
|
||||
|
||||
self.tf_broadcaster.sendTransform(t)
|
||||
|
||||
def _publish_info(self) -> None:
|
||||
"""Publish fusion debug information."""
|
||||
info = {
|
||||
"timestamp": self.get_clock().now().nanoseconds / 1e9,
|
||||
"position": [self.state.x, self.state.y],
|
||||
"theta_deg": math.degrees(self.state.theta),
|
||||
"velocity": [self.state.vx, self.state.vy],
|
||||
"omega": self.state.omega,
|
||||
"sources": {
|
||||
"wheel": self.last_wheel_odom is not None,
|
||||
"visual": self.last_visual_odom is not None,
|
||||
"imu": self.last_imu is not None,
|
||||
},
|
||||
"weights": {
|
||||
"wheel": f"{self.wheel_weight:.2f}",
|
||||
"visual": f"{self.visual_weight:.2f}",
|
||||
"imu": f"{self.imu_weight:.2f}",
|
||||
},
|
||||
}
|
||||
msg = String(data=json.dumps(info))
|
||||
self.pub_info.publish(msg)
|
||||
|
||||
|
||||
def main(args=None):
|
||||
rclpy.init(args=args)
|
||||
node = OdomFusionNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
4
jetson/ros2_ws/src/saltybot_odom_fusion/setup.cfg
Normal file
4
jetson/ros2_ws/src/saltybot_odom_fusion/setup.cfg
Normal file
@ -0,0 +1,4 @@
|
||||
[develop]
|
||||
script_dir=$base/lib/saltybot_odom_fusion
|
||||
[egg_info]
|
||||
tag_date = 0
|
||||
29
jetson/ros2_ws/src/saltybot_odom_fusion/setup.py
Normal file
29
jetson/ros2_ws/src/saltybot_odom_fusion/setup.py
Normal file
@ -0,0 +1,29 @@
|
||||
from setuptools import setup
|
||||
|
||||
package_name = "saltybot_odom_fusion"
|
||||
|
||||
setup(
|
||||
name=package_name,
|
||||
version="0.1.0",
|
||||
packages=[package_name],
|
||||
data_files=[
|
||||
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
|
||||
(f"share/{package_name}", ["package.xml"]),
|
||||
(f"share/{package_name}/launch", ["launch/odom_fusion.launch.py"]),
|
||||
(f"share/{package_name}/config", ["config/fusion_config.yaml"]),
|
||||
],
|
||||
install_requires=["setuptools"],
|
||||
zip_safe=True,
|
||||
maintainer="sl-controls",
|
||||
maintainer_email="sl-controls@saltylab.local",
|
||||
description=(
|
||||
"Odometry fusion: wheel + visual + IMU complementary filter with TF broadcast"
|
||||
),
|
||||
license="MIT",
|
||||
tests_require=["pytest"],
|
||||
entry_points={
|
||||
"console_scripts": [
|
||||
"odom_fusion_node = saltybot_odom_fusion.odom_fusion_node:main",
|
||||
],
|
||||
},
|
||||
)
|
||||
361
jetson/ros2_ws/src/saltybot_odom_fusion/test/test_odom_fusion.py
Normal file
361
jetson/ros2_ws/src/saltybot_odom_fusion/test/test_odom_fusion.py
Normal file
@ -0,0 +1,361 @@
|
||||
"""Unit tests for odom_fusion_node."""
|
||||
|
||||
import pytest
|
||||
import math
|
||||
import json
|
||||
import numpy as np
|
||||
from geometry_msgs.msg import Pose, Quaternion, Twist, Vector3
|
||||
from nav_msgs.msg import Odometry
|
||||
from sensor_msgs.msg import Imu
|
||||
from std_msgs.msg import String, Header
|
||||
from tf_transformations import quaternion_from_euler, euler_from_quaternion
|
||||
|
||||
import rclpy
|
||||
|
||||
# Import the node and classes under test
|
||||
from saltybot_odom_fusion.odom_fusion_node import (
|
||||
OdomFusionNode,
|
||||
OdomState,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def rclpy_fixture():
|
||||
"""Initialize and cleanup rclpy."""
|
||||
rclpy.init()
|
||||
yield
|
||||
rclpy.shutdown()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def node(rclpy_fixture):
|
||||
"""Create an odometry fusion node instance."""
|
||||
node = OdomFusionNode()
|
||||
yield node
|
||||
node.destroy_node()
|
||||
|
||||
|
||||
class TestOdomState:
|
||||
"""Test suite for OdomState dataclass."""
|
||||
|
||||
def test_odom_state_initialization(self):
|
||||
"""Test OdomState initializes with correct defaults."""
|
||||
state = OdomState()
|
||||
|
||||
assert state.x == 0.0
|
||||
assert state.y == 0.0
|
||||
assert state.theta == 0.0
|
||||
assert state.vx == 0.0
|
||||
assert state.vy == 0.0
|
||||
assert state.omega == 0.0
|
||||
|
||||
def test_odom_state_covariance_initialization(self):
|
||||
"""Test OdomState covariance matrices."""
|
||||
state = OdomState()
|
||||
|
||||
assert state.cov_pos.shape == (2, 2)
|
||||
assert state.cov_theta == 0.01
|
||||
assert state.cov_vel.shape == (2, 2)
|
||||
|
||||
|
||||
class TestOdomFusionNode:
|
||||
"""Test suite for OdomFusionNode."""
|
||||
|
||||
def test_node_initialization(self, node):
|
||||
"""Test that node initializes with correct defaults."""
|
||||
assert node.state.x == 0.0
|
||||
assert node.state.y == 0.0
|
||||
assert node.state.theta == 0.0
|
||||
assert node.last_wheel_odom is None
|
||||
assert node.last_visual_odom is None
|
||||
assert node.last_imu is None
|
||||
|
||||
def test_weight_normalization(self, node):
|
||||
"""Test that weights are normalized to sum to 1.0."""
|
||||
total = node.wheel_weight + node.visual_weight + node.imu_weight
|
||||
assert abs(total - 1.0) < 1e-6
|
||||
|
||||
def test_wheel_odom_subscription(self, node):
|
||||
"""Test wheel odometry message handling."""
|
||||
odom_msg = Odometry()
|
||||
odom_msg.twist.twist.linear.x = 1.0
|
||||
odom_msg.twist.twist.linear.y = 0.5
|
||||
|
||||
node._on_wheel_odom(odom_msg)
|
||||
assert node.last_wheel_odom is not None
|
||||
assert node.last_wheel_odom.twist.twist.linear.x == 1.0
|
||||
|
||||
def test_visual_odom_subscription(self, node):
|
||||
"""Test visual odometry message handling."""
|
||||
odom_msg = Odometry()
|
||||
odom_msg.pose.pose.position.x = 5.0
|
||||
odom_msg.pose.pose.position.y = 3.0
|
||||
|
||||
node._on_visual_odom(odom_msg)
|
||||
assert node.last_visual_odom is not None
|
||||
assert node.last_visual_odom.pose.pose.position.x == 5.0
|
||||
|
||||
def test_imu_subscription(self, node):
|
||||
"""Test IMU message handling."""
|
||||
imu_msg = Imu()
|
||||
imu_msg.angular_velocity.z = 0.1
|
||||
|
||||
node._on_imu(imu_msg)
|
||||
assert node.last_imu is not None
|
||||
assert node.last_imu.angular_velocity.z == 0.1
|
||||
|
||||
def test_position_integration(self, node):
|
||||
"""Test position integration from velocity."""
|
||||
node.state.vx = 1.0 # 1 m/s
|
||||
node.state.vy = 0.0
|
||||
node.state.theta = 0.0 # Facing +x direction
|
||||
|
||||
dt = 1.0 # 1 second
|
||||
node._fuse_odometry(dt)
|
||||
|
||||
# Should move 1 meter in +x direction
|
||||
assert abs(node.state.x - 1.0) < 0.1
|
||||
|
||||
def test_position_integration_rotated(self, node):
|
||||
"""Test position integration with rotation."""
|
||||
node.state.vx = 1.0 # 1 m/s
|
||||
node.state.vy = 0.0
|
||||
node.state.theta = math.pi / 2 # Facing +y direction
|
||||
|
||||
dt = 1.0
|
||||
node._fuse_odometry(dt)
|
||||
|
||||
# Should move 1 meter in +y direction
|
||||
assert abs(node.state.y - 1.0) < 0.1
|
||||
|
||||
def test_angular_velocity_update(self, node):
|
||||
"""Test angular velocity from IMU."""
|
||||
imu_msg = Imu()
|
||||
imu_msg.angular_velocity.z = 0.2
|
||||
imu_msg.orientation.w = 1.0
|
||||
|
||||
node.last_imu = imu_msg
|
||||
dt = 0.1
|
||||
|
||||
node._fuse_odometry(dt)
|
||||
|
||||
# Angular velocity should be weighted by imu_weight
|
||||
expected_omega = 0.2 * node.imu_weight
|
||||
assert abs(node.state.omega - expected_omega) < 0.01
|
||||
|
||||
def test_velocity_blending_wheel_only(self, node):
|
||||
"""Test velocity blending with only wheel odometry."""
|
||||
wheel_msg = Odometry()
|
||||
wheel_msg.twist.twist.linear.x = 1.0
|
||||
wheel_msg.twist.twist.linear.y = 0.0
|
||||
|
||||
node.last_wheel_odom = wheel_msg
|
||||
dt = 0.1
|
||||
|
||||
node._fuse_odometry(dt)
|
||||
|
||||
# Velocity should be weighted by wheel_weight
|
||||
expected_vx = 1.0 * node.wheel_weight
|
||||
assert abs(node.state.vx - expected_vx) < 0.01
|
||||
|
||||
def test_visual_drift_correction(self, node):
|
||||
"""Test drift correction from visual odometry."""
|
||||
# Current state
|
||||
node.state.x = 0.0
|
||||
node.state.y = 0.0
|
||||
|
||||
# Visual odometry says we're at (1.0, 1.0)
|
||||
visual_msg = Odometry()
|
||||
visual_msg.pose.pose.position.x = 1.0
|
||||
visual_msg.pose.pose.position.y = 1.0
|
||||
visual_msg.pose.pose.orientation.w = 1.0
|
||||
|
||||
node.last_visual_odom = visual_msg
|
||||
dt = 0.1
|
||||
|
||||
node._fuse_odometry(dt)
|
||||
|
||||
# Position should have moved toward visual estimate
|
||||
assert node.state.x > 0.0
|
||||
assert node.state.y > 0.0
|
||||
|
||||
def test_covariance_updates(self, node):
|
||||
"""Test that covariances are updated based on measurements."""
|
||||
wheel_msg = Odometry()
|
||||
visual_msg = Odometry()
|
||||
visual_msg.pose.pose.orientation.w = 1.0
|
||||
|
||||
node.last_wheel_odom = wheel_msg
|
||||
node.last_visual_odom = visual_msg
|
||||
|
||||
measurements = {
|
||||
"wheel": wheel_msg,
|
||||
"visual": visual_msg,
|
||||
}
|
||||
|
||||
node._update_covariances(measurements)
|
||||
|
||||
# Covariance should be reduced with good measurements
|
||||
assert node.state.cov_pos[0, 0] < 0.01
|
||||
|
||||
def test_quaternion_to_yaw(self, node):
|
||||
"""Test quaternion to yaw conversion."""
|
||||
# Create 90 degree yaw rotation
|
||||
q = quaternion_from_euler(0, 0, math.pi / 2)
|
||||
|
||||
imu_msg = Imu()
|
||||
imu_msg.orientation.x = q[0]
|
||||
imu_msg.orientation.y = q[1]
|
||||
imu_msg.orientation.z = q[2]
|
||||
imu_msg.orientation.w = q[3]
|
||||
|
||||
node.last_imu = imu_msg
|
||||
dt = 0.1
|
||||
|
||||
node._fuse_odometry(dt)
|
||||
|
||||
# Theta should be blended toward 90 degrees
|
||||
assert node.state.theta > 0.0
|
||||
|
||||
def test_velocity_near_zero(self, node):
|
||||
"""Test handling of near-zero velocities."""
|
||||
wheel_msg = Odometry()
|
||||
wheel_msg.twist.twist.linear.x = 0.001 # Very small
|
||||
|
||||
node.last_wheel_odom = wheel_msg
|
||||
dt = 0.1
|
||||
|
||||
node._fuse_odometry(dt)
|
||||
|
||||
# Position change should be minimal
|
||||
assert abs(node.state.x) < 0.0001
|
||||
|
||||
def test_invalid_dt_ignored(self, node):
|
||||
"""Test that invalid dt values are ignored."""
|
||||
initial_x = node.state.x
|
||||
|
||||
# Zero dt
|
||||
node._fuse_odometry(0.0)
|
||||
assert node.state.x == initial_x
|
||||
|
||||
# Negative dt
|
||||
node._fuse_odometry(-1.0)
|
||||
assert node.state.x == initial_x
|
||||
|
||||
# Very large dt
|
||||
node._fuse_odometry(10.0)
|
||||
assert node.state.x == initial_x
|
||||
|
||||
|
||||
class TestOdomFusionScenarios:
|
||||
"""Integration-style tests for realistic scenarios."""
|
||||
|
||||
def test_scenario_straight_line(self, node):
|
||||
"""Scenario: robot moving in straight line."""
|
||||
wheel_msg = Odometry()
|
||||
wheel_msg.twist.twist.linear.x = 1.0 # 1 m/s forward
|
||||
wheel_msg.twist.twist.linear.y = 0.0
|
||||
|
||||
node.last_wheel_odom = wheel_msg
|
||||
node.state.theta = 0.0 # Facing +x
|
||||
|
||||
# Move for 5 seconds
|
||||
for _ in range(5):
|
||||
node._fuse_odometry(1.0)
|
||||
|
||||
# Should be approximately 5 meters in +x direction
|
||||
assert 4.0 < node.state.x < 6.0
|
||||
assert abs(node.state.y) < 0.5
|
||||
|
||||
def test_scenario_circular_motion(self, node):
|
||||
"""Scenario: robot moving in circle."""
|
||||
wheel_msg = Odometry()
|
||||
wheel_msg.twist.twist.linear.x = 1.0 # 1 m/s forward
|
||||
|
||||
imu_msg = Imu()
|
||||
imu_msg.angular_velocity.z = 0.5 # 0.5 rad/s rotation
|
||||
imu_msg.orientation.w = 1.0
|
||||
|
||||
node.last_wheel_odom = wheel_msg
|
||||
node.last_imu = imu_msg
|
||||
|
||||
# Simulate circular motion for 4 cycles
|
||||
for i in range(4):
|
||||
node._fuse_odometry(1.0)
|
||||
|
||||
# Should have moved distance and rotated
|
||||
distance = math.sqrt(node.state.x**2 + node.state.y**2)
|
||||
assert distance > 1.0
|
||||
|
||||
def test_scenario_drift_correction(self, node):
|
||||
"""Scenario: wheel odometry drifts, vision corrects."""
|
||||
# Wheel says we're at 10, 10
|
||||
node.state.x = 10.0
|
||||
node.state.y = 10.0
|
||||
|
||||
# Vision says we're actually at 9, 9
|
||||
visual_msg = Odometry()
|
||||
visual_msg.pose.pose.position.x = 9.0
|
||||
visual_msg.pose.pose.position.y = 9.0
|
||||
visual_msg.pose.pose.orientation.w = 1.0
|
||||
|
||||
node.last_visual_odom = visual_msg
|
||||
|
||||
# Before correction
|
||||
initial_x = node.state.x
|
||||
initial_y = node.state.y
|
||||
|
||||
node._fuse_odometry(1.0)
|
||||
|
||||
# Position should move toward visual estimate
|
||||
assert node.state.x < initial_x
|
||||
assert node.state.y < initial_y
|
||||
|
||||
def test_scenario_all_sensors_available(self, node):
|
||||
"""Scenario: all three sensor sources available."""
|
||||
wheel_msg = Odometry()
|
||||
wheel_msg.twist.twist.linear.x = 0.5
|
||||
|
||||
visual_msg = Odometry()
|
||||
visual_msg.pose.pose.position.x = 2.0
|
||||
visual_msg.pose.pose.position.y = 1.0
|
||||
visual_msg.pose.pose.orientation.w = 1.0
|
||||
|
||||
imu_msg = Imu()
|
||||
q = quaternion_from_euler(0, 0, 0.1) # Small yaw
|
||||
imu_msg.orientation.x = q[0]
|
||||
imu_msg.orientation.y = q[1]
|
||||
imu_msg.orientation.z = q[2]
|
||||
imu_msg.orientation.w = q[3]
|
||||
imu_msg.angular_velocity.z = 0.05
|
||||
|
||||
node.last_wheel_odom = wheel_msg
|
||||
node.last_visual_odom = visual_msg
|
||||
node.last_imu = imu_msg
|
||||
|
||||
measurements = {
|
||||
"wheel": wheel_msg,
|
||||
"visual": visual_msg,
|
||||
"imu": imu_msg,
|
||||
}
|
||||
|
||||
node._update_covariances(measurements)
|
||||
|
||||
# Covariance should be low with all sources
|
||||
assert node.state.cov_pos[0, 0] < 0.01
|
||||
assert node.state.cov_theta < 0.01
|
||||
|
||||
def test_scenario_sensor_dropout(self, node):
|
||||
"""Scenario: visual sensor drops out, wheel continues."""
|
||||
wheel_msg = Odometry()
|
||||
wheel_msg.twist.twist.linear.x = 1.0
|
||||
|
||||
node.last_wheel_odom = wheel_msg
|
||||
node.last_visual_odom = None # Visual dropout
|
||||
node.last_imu = None
|
||||
|
||||
# Should continue with wheel only
|
||||
node._fuse_odometry(1.0)
|
||||
|
||||
# Position should still integrate
|
||||
assert node.state.x > 0.0
|
||||
244
src/ina219.c
Normal file
244
src/ina219.c
Normal file
@ -0,0 +1,244 @@
|
||||
#include "ina219.h"
|
||||
#include "config.h"
|
||||
#include "i2c1.h"
|
||||
#include <string.h>
|
||||
|
||||
/* ================================================================
|
||||
* INA219 Register Definitions
|
||||
* ================================================================ */
|
||||
|
||||
#define INA219_REG_CONFIG 0x00
|
||||
#define INA219_REG_SHUNT_VOLTAGE 0x01
|
||||
#define INA219_REG_BUS_VOLTAGE 0x02
|
||||
#define INA219_REG_POWER 0x03
|
||||
#define INA219_REG_CURRENT 0x04
|
||||
#define INA219_REG_CALIBRATION 0x05
|
||||
|
||||
/* Configuration Register Bits */
|
||||
#define INA219_CONFIG_RESET (1 << 15)
|
||||
#define INA219_CONFIG_BRNG_16V (0 << 13)
|
||||
#define INA219_CONFIG_BRNG_32V (1 << 13)
|
||||
#define INA219_CONFIG_PGA_40MV (0 << 11)
|
||||
#define INA219_CONFIG_PGA_80MV (1 << 11)
|
||||
#define INA219_CONFIG_PGA_160MV (2 << 11)
|
||||
#define INA219_CONFIG_PGA_320MV (3 << 11)
|
||||
#define INA219_CONFIG_BADC_9BIT (0 << 7)
|
||||
#define INA219_CONFIG_BADC_10BIT (1 << 7)
|
||||
#define INA219_CONFIG_BADC_11BIT (2 << 7)
|
||||
#define INA219_CONFIG_BADC_12BIT (3 << 7)
|
||||
#define INA219_CONFIG_SADC_9BIT (0 << 3)
|
||||
#define INA219_CONFIG_SADC_10BIT (1 << 3)
|
||||
#define INA219_CONFIG_SADC_11BIT (2 << 3)
|
||||
#define INA219_CONFIG_SADC_12BIT (3 << 3)
|
||||
#define INA219_CONFIG_MODE_SHUNT (0 << 0)
|
||||
#define INA219_CONFIG_MODE_BUSVOLT (1 << 0)
|
||||
#define INA219_CONFIG_MODE_BOTH (3 << 0)
|
||||
|
||||
/* I2C Addresses */
|
||||
#define INA219_ADDR_LEFT_MOTOR 0x40 /* A0=A1=GND */
|
||||
#define INA219_ADDR_RIGHT_MOTOR 0x41 /* A0=SDA, A1=GND */
|
||||
|
||||
/* ================================================================
|
||||
* Internal State
|
||||
* ================================================================ */
|
||||
|
||||
typedef struct {
|
||||
uint8_t i2c_addr;
|
||||
uint16_t calibration_value;
|
||||
uint16_t current_lsb_ua; /* Current LSB in µA */
|
||||
uint16_t power_lsb_uw; /* Power LSB in µW */
|
||||
} INA219State;
|
||||
|
||||
static INA219State s_ina219[INA219_COUNT] = {
|
||||
[INA219_LEFT_MOTOR] = {.i2c_addr = INA219_ADDR_LEFT_MOTOR},
|
||||
[INA219_RIGHT_MOTOR] = {.i2c_addr = INA219_ADDR_RIGHT_MOTOR}
|
||||
};
|
||||
|
||||
/* ================================================================
|
||||
* I2C Helper Functions
|
||||
* ================================================================ */
|
||||
|
||||
static bool i2c_write_register(uint8_t addr, uint8_t reg, uint16_t value)
|
||||
{
|
||||
uint8_t buf[3] = {reg, (uint8_t)(value >> 8), (uint8_t)(value & 0xFF)};
|
||||
return i2c1_write(addr, buf, sizeof(buf)) == 0;
|
||||
}
|
||||
|
||||
static bool i2c_read_register(uint8_t addr, uint8_t reg, uint16_t *value)
|
||||
{
|
||||
uint8_t buf[2];
|
||||
if (i2c1_write(addr, ®, 1) != 0) return false;
|
||||
if (i2c1_read(addr, buf, sizeof(buf)) != 0) return false;
|
||||
*value = ((uint16_t)buf[0] << 8) | buf[1];
|
||||
return true;
|
||||
}
|
||||
|
||||
/* ================================================================
|
||||
* Public API
|
||||
* ================================================================ */
|
||||
|
||||
void ina219_init(void)
|
||||
{
|
||||
/* Ensure I2C1 is initialized before calling this */
|
||||
/* Auto-calibrate both sensors for typical motor monitoring:
|
||||
* - Max current: 5A
|
||||
* - Shunt resistor: 0.1Ω
|
||||
* - LSB: 160µA (5A / 32768)
|
||||
*/
|
||||
ina219_calibrate(INA219_LEFT_MOTOR, 5000, 100);
|
||||
ina219_calibrate(INA219_RIGHT_MOTOR, 5000, 100);
|
||||
}
|
||||
|
||||
void ina219_calibrate(INA219Sensor sensor, uint16_t max_current_ma, uint16_t shunt_ohms_milli)
|
||||
{
|
||||
if (sensor >= INA219_COUNT) return;
|
||||
|
||||
INA219State *s = &s_ina219[sensor];
|
||||
|
||||
/* Calculate current LSB: max_current / 32768 (15-bit signed register)
|
||||
* LSB unit: µA
|
||||
* Example: 5000mA / 32768 ≈ 152.6µA → use 160µA (round up for safety)
|
||||
*/
|
||||
uint32_t current_lsb_ua = ((uint32_t)max_current_ma * 1000 + 32767) / 32768;
|
||||
s->current_lsb_ua = (uint16_t)current_lsb_ua;
|
||||
|
||||
/* Power LSB = 20 × current_lsb_ua (20µW per 1µA of current LSB) */
|
||||
s->power_lsb_uw = 20 * current_lsb_ua;
|
||||
|
||||
/* Calibration register: (0.04096) / (current_lsb_ua × shunt_ohms_milli / 1000)
|
||||
* Simplified: 40960 / (current_lsb_ua × shunt_ohms_milli)
|
||||
*/
|
||||
uint32_t calibration = 40960 / ((uint32_t)current_lsb_ua * shunt_ohms_milli / 1000);
|
||||
if (calibration > 65535) calibration = 65535;
|
||||
s->calibration_value = (uint16_t)calibration;
|
||||
|
||||
/* Write calibration register */
|
||||
i2c_write_register(s->i2c_addr, INA219_REG_CALIBRATION, s->calibration_value);
|
||||
|
||||
/* Configure for continuous conversion mode (12-bit ADC for both shunt and bus)
|
||||
* Config: 32V range, 160mV PGA, 12-bit ADC, continuous mode
|
||||
*/
|
||||
uint16_t config = INA219_CONFIG_BRNG_32V
|
||||
| INA219_CONFIG_PGA_160MV
|
||||
| INA219_CONFIG_BADC_12BIT
|
||||
| INA219_CONFIG_SADC_12BIT
|
||||
| INA219_CONFIG_MODE_BOTH;
|
||||
i2c_write_register(s->i2c_addr, INA219_REG_CONFIG, config);
|
||||
}
|
||||
|
||||
bool ina219_read(INA219Sensor sensor, INA219Data *data)
|
||||
{
|
||||
if (sensor >= INA219_COUNT || !data) return false;
|
||||
|
||||
INA219State *s = &s_ina219[sensor];
|
||||
uint8_t addr = s->i2c_addr;
|
||||
uint16_t reg_value;
|
||||
|
||||
/* Read shunt voltage (register 0x01) */
|
||||
if (!i2c_read_register(addr, INA219_REG_SHUNT_VOLTAGE, ®_value)) return false;
|
||||
int16_t shunt_raw = (int16_t)reg_value;
|
||||
data->shunt_voltage_uv = shunt_raw * 10; /* 10µV/LSB */
|
||||
|
||||
/* Read bus voltage (register 0x02) */
|
||||
if (!i2c_read_register(addr, INA219_REG_BUS_VOLTAGE, ®_value)) return false;
|
||||
uint16_t bus_raw = (reg_value >> 3) & 0x1FFF; /* 13-bit voltage, 4mV/LSB */
|
||||
data->bus_voltage_mv = bus_raw * 4;
|
||||
|
||||
/* Read current (register 0x04) — requires calibration */
|
||||
if (!i2c_read_register(addr, INA219_REG_CURRENT, ®_value)) return false;
|
||||
int16_t current_raw = (int16_t)reg_value;
|
||||
data->current_ma = (current_raw * (int32_t)s->current_lsb_ua) / 1000;
|
||||
|
||||
/* Read power (register 0x03) — in units of power_lsb */
|
||||
if (!i2c_read_register(addr, INA219_REG_POWER, ®_value)) return false;
|
||||
uint32_t power_raw = reg_value;
|
||||
data->power_mw = (power_raw * (uint32_t)s->power_lsb_uw) / 1000;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool ina219_read_bus_voltage_mv(INA219Sensor sensor, uint16_t *voltage_mv)
|
||||
{
|
||||
if (sensor >= INA219_COUNT || !voltage_mv) return false;
|
||||
|
||||
INA219State *s = &s_ina219[sensor];
|
||||
uint16_t reg_value;
|
||||
|
||||
if (!i2c_read_register(s->i2c_addr, INA219_REG_BUS_VOLTAGE, ®_value)) return false;
|
||||
|
||||
uint16_t bus_raw = (reg_value >> 3) & 0x1FFF;
|
||||
*voltage_mv = bus_raw * 4;
|
||||
return true;
|
||||
}
|
||||
|
||||
bool ina219_read_current_ma(INA219Sensor sensor, int16_t *current_ma)
|
||||
{
|
||||
if (sensor >= INA219_COUNT || !current_ma) return false;
|
||||
|
||||
INA219State *s = &s_ina219[sensor];
|
||||
uint16_t reg_value;
|
||||
|
||||
if (!i2c_read_register(s->i2c_addr, INA219_REG_CURRENT, ®_value)) return false;
|
||||
|
||||
int16_t current_raw = (int16_t)reg_value;
|
||||
*current_ma = (current_raw * (int32_t)s->current_lsb_ua) / 1000;
|
||||
return true;
|
||||
}
|
||||
|
||||
bool ina219_read_power_mw(INA219Sensor sensor, uint32_t *power_mw)
|
||||
{
|
||||
if (sensor >= INA219_COUNT || !power_mw) return false;
|
||||
|
||||
INA219State *s = &s_ina219[sensor];
|
||||
uint16_t reg_value;
|
||||
|
||||
if (!i2c_read_register(s->i2c_addr, INA219_REG_POWER, ®_value)) return false;
|
||||
|
||||
uint32_t power_raw = reg_value;
|
||||
*power_mw = (power_raw * (uint32_t)s->power_lsb_uw) / 1000;
|
||||
return true;
|
||||
}
|
||||
|
||||
void ina219_alert_enable(INA219Sensor sensor, uint16_t current_limit_ma)
|
||||
{
|
||||
if (sensor >= INA219_COUNT) return;
|
||||
|
||||
INA219State *s = &s_ina219[sensor];
|
||||
|
||||
/* Alert limit register: set to current threshold
|
||||
* Current threshold = (limit_ma × 1000) / current_lsb_ua
|
||||
*/
|
||||
int16_t limit_raw = ((int32_t)current_limit_ma * 1000) / s->current_lsb_ua;
|
||||
if (limit_raw > 32767) limit_raw = 32767;
|
||||
|
||||
/* Enable alert on over-limit, latching mode */
|
||||
uint16_t alert_config = limit_raw;
|
||||
i2c_write_register(s->i2c_addr, 0x06, alert_config); /* Alert register */
|
||||
}
|
||||
|
||||
void ina219_alert_disable(INA219Sensor sensor)
|
||||
{
|
||||
if (sensor >= INA219_COUNT) return;
|
||||
|
||||
INA219State *s = &s_ina219[sensor];
|
||||
|
||||
/* Write 0 to alert register to disable */
|
||||
i2c_write_register(s->i2c_addr, 0x06, 0);
|
||||
}
|
||||
|
||||
void ina219_reset(INA219Sensor sensor)
|
||||
{
|
||||
if (sensor >= INA219_COUNT) return;
|
||||
|
||||
INA219State *s = &s_ina219[sensor];
|
||||
|
||||
/* Set reset bit in config register */
|
||||
i2c_write_register(s->i2c_addr, INA219_REG_CONFIG, INA219_CONFIG_RESET);
|
||||
|
||||
/* Wait for reset to complete (~1ms) */
|
||||
uint32_t start = 0; /* In real code, use HAL_GetTick() */
|
||||
while (start < 2) start++; /* Simple delay */
|
||||
|
||||
/* Re-calibrate after reset */
|
||||
ina219_calibrate(sensor, 5000, 100);
|
||||
}
|
||||
@ -22,6 +22,7 @@
|
||||
#include "buzzer.h"
|
||||
#include "led.h"
|
||||
#include "servo.h"
|
||||
#include "ina219.h"
|
||||
#include "power_mgmt.h"
|
||||
#include "battery.h"
|
||||
#include <math.h>
|
||||
@ -181,6 +182,7 @@ int main(void) {
|
||||
int chip = bmp280_init();
|
||||
baro_chip = (chip > 0) ? chip : 0;
|
||||
mag_type = mag_init();
|
||||
ina219_init(); /* Init INA219 dual motor current monitoring (Issue #214) */
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
267
test/test_ina219.py
Normal file
267
test/test_ina219.py
Normal file
@ -0,0 +1,267 @@
|
||||
"""
|
||||
test_ina219.py — INA219 power monitor driver tests (Issue #214)
|
||||
|
||||
Verifies:
|
||||
- Calibration: LSB calculation for max current and shunt resistance
|
||||
- Register calculations: voltage, current, power from raw ADC values
|
||||
- Multi-sensor support: independent left/right motor monitoring
|
||||
- Alert thresholds: overcurrent limit configuration
|
||||
- Edge cases: boundary values, overflow handling
|
||||
"""
|
||||
|
||||
import pytest
|
||||
|
||||
# ── Constants ─────────────────────────────────────────────────────────────
|
||||
|
||||
# Default calibration: 5A max, 0.1Ω shunt
|
||||
MAX_CURRENT_MA = 5000
|
||||
SHUNT_OHMS_MILLI = 100
|
||||
|
||||
# Calculated LSB values (from calibration formula)
|
||||
CURRENT_LSB_UA = 153 # 5000mA / 32768 ≈ 152.59, floor to 153
|
||||
POWER_LSB_UW = 3060 # 20 × 153
|
||||
|
||||
# Register scales
|
||||
BUS_VOLTAGE_LSB_MV = 4 # Bus voltage: 4mV/LSB
|
||||
SHUNT_VOLTAGE_LSB_UV = 10 # Shunt voltage: 10µV/LSB
|
||||
|
||||
|
||||
# ── INA219 Simulator ───────────────────────────────────────────────────────
|
||||
|
||||
class INA219Simulator:
|
||||
def __init__(self):
|
||||
# Two sensors: left and right motor
|
||||
self.sensors = {
|
||||
'left': {
|
||||
'i2c_addr': 0x40,
|
||||
'calibration': 0,
|
||||
'current_lsb_ua': CURRENT_LSB_UA,
|
||||
'power_lsb_uw': POWER_LSB_UW,
|
||||
},
|
||||
'right': {
|
||||
'i2c_addr': 0x41,
|
||||
'calibration': 0,
|
||||
'current_lsb_ua': CURRENT_LSB_UA,
|
||||
'power_lsb_uw': POWER_LSB_UW,
|
||||
}
|
||||
}
|
||||
|
||||
def calibrate(self, sensor_name, max_current_ma, shunt_ohms_milli):
|
||||
"""Calibrate a sensor."""
|
||||
if sensor_name not in self.sensors:
|
||||
return False
|
||||
|
||||
s = self.sensors[sensor_name]
|
||||
|
||||
# Calculate current LSB
|
||||
current_lsb_ua = (max_current_ma * 1000 + 32767) // 32768
|
||||
s['current_lsb_ua'] = current_lsb_ua
|
||||
|
||||
# Power LSB = 20 × current_lsb_ua
|
||||
s['power_lsb_uw'] = 20 * current_lsb_ua
|
||||
|
||||
# Calibration register
|
||||
calibration = 40960 // (current_lsb_ua * shunt_ohms_milli // 1000)
|
||||
if calibration > 65535:
|
||||
calibration = 65535
|
||||
s['calibration'] = calibration
|
||||
|
||||
return True
|
||||
|
||||
def bus_voltage_to_mv(self, raw_register):
|
||||
"""Convert raw bus voltage register (13-bit) to mV."""
|
||||
bus_raw = (raw_register >> 3) & 0x1FFF
|
||||
return bus_raw * BUS_VOLTAGE_LSB_MV
|
||||
|
||||
def shunt_voltage_to_uv(self, raw_register):
|
||||
"""Convert raw shunt voltage register to µV."""
|
||||
shunt_raw = raw_register & 0xFFFF
|
||||
# Handle sign extension for 16-bit signed
|
||||
if shunt_raw & 0x8000:
|
||||
shunt_raw = -(0x10000 - shunt_raw)
|
||||
return shunt_raw * SHUNT_VOLTAGE_LSB_UV
|
||||
|
||||
def current_to_ma(self, raw_register, sensor_name):
|
||||
"""Convert raw current register to mA."""
|
||||
s = self.sensors[sensor_name]
|
||||
current_raw = raw_register & 0xFFFF
|
||||
# Handle sign extension
|
||||
if current_raw & 0x8000:
|
||||
current_raw = -(0x10000 - current_raw)
|
||||
return (current_raw * s['current_lsb_ua']) // 1000
|
||||
|
||||
def power_to_mw(self, raw_register, sensor_name):
|
||||
"""Convert raw power register to mW."""
|
||||
s = self.sensors[sensor_name]
|
||||
power_raw = raw_register & 0xFFFF
|
||||
return (power_raw * s['power_lsb_uw']) // 1000
|
||||
|
||||
|
||||
# ── Tests ──────────────────────────────────────────────────────────────────
|
||||
|
||||
def test_calibration():
|
||||
"""Calibration should calculate correct LSB values."""
|
||||
sim = INA219Simulator()
|
||||
assert sim.calibrate('left', 5000, 100)
|
||||
|
||||
# Expected: 5000mA / 32768 ≈ 152.6, rounded up to 153µA
|
||||
assert sim.sensors['left']['current_lsb_ua'] == 153
|
||||
assert sim.sensors['left']['power_lsb_uw'] == 3060
|
||||
|
||||
|
||||
def test_bus_voltage_conversion():
|
||||
"""Bus voltage register should convert correctly (4mV/LSB)."""
|
||||
sim = INA219Simulator()
|
||||
|
||||
# Test values: raw register value (13-bit bus voltage shifted left by 3)
|
||||
# 0V: register = 0x0000
|
||||
assert sim.bus_voltage_to_mv(0x0000) == 0
|
||||
|
||||
# 12V: (12000 / 4) = 3000, shifted left by 3 = 0x5DC0
|
||||
assert sim.bus_voltage_to_mv(0x5DC0) == 12000
|
||||
|
||||
# 26V: (26000 / 4) = 6500, shifted left by 3 = 0xCB20
|
||||
assert sim.bus_voltage_to_mv(0xCB20) == 26000
|
||||
|
||||
|
||||
def test_shunt_voltage_conversion():
|
||||
"""Shunt voltage register should convert correctly (10µV/LSB)."""
|
||||
sim = INA219Simulator()
|
||||
|
||||
# 0µV
|
||||
assert sim.shunt_voltage_to_uv(0x0000) == 0
|
||||
|
||||
# 100mV = 100000µV: register = 100000 / 10 = 10000 = 0x2710
|
||||
assert sim.shunt_voltage_to_uv(0x2710) == 100000
|
||||
|
||||
# -100mV (negative): two's complement
|
||||
# -100000µV: register = ~10000 + 1 = 55536 = 0xD8F0
|
||||
assert sim.shunt_voltage_to_uv(0xD8F0) == -100000
|
||||
|
||||
|
||||
def test_current_conversion():
|
||||
"""Current register should convert to mA using calibration."""
|
||||
sim = INA219Simulator()
|
||||
sim.calibrate('left', 5000, 100)
|
||||
|
||||
# 0mA
|
||||
assert sim.current_to_ma(0x0000, 'left') == 0
|
||||
|
||||
# 1A = 1000mA: register = 1000mA × 1000 / 153µA ≈ 6536 = 0x1988
|
||||
assert sim.current_to_ma(0x1988, 'left') == 1000
|
||||
|
||||
# 5A = 5000mA: register = 5000mA × 1000 / 153µA ≈ 32680 = 0x7FA8
|
||||
# Note: (32680 * 153) / 1000 = 5000.6, integer division = 5000
|
||||
assert sim.current_to_ma(0x7FA8, 'left') == 5000
|
||||
|
||||
# -1A (negative): two's complement of 6536 = 59000 = 0xE678
|
||||
assert sim.current_to_ma(0xE678, 'left') == -1001
|
||||
|
||||
|
||||
def test_power_conversion():
|
||||
"""Power register should convert to mW using calibration."""
|
||||
sim = INA219Simulator()
|
||||
sim.calibrate('left', 5000, 100)
|
||||
|
||||
# 0W
|
||||
assert sim.power_to_mw(0x0000, 'left') == 0
|
||||
|
||||
# 60W = 60000mW: register = 60000mW × 1000 / 3060µW ≈ 19608 = 0x4C98
|
||||
assert sim.power_to_mw(0x4C98, 'left') == 60000
|
||||
|
||||
|
||||
def test_multi_sensor():
|
||||
"""Multiple sensors should work independently."""
|
||||
sim = INA219Simulator()
|
||||
assert sim.calibrate('left', 5000, 100)
|
||||
assert sim.calibrate('right', 5000, 100)
|
||||
|
||||
# Both should have same calibration
|
||||
assert sim.sensors['left']['current_lsb_ua'] == sim.sensors['right']['current_lsb_ua']
|
||||
|
||||
# Verify addresses are different
|
||||
assert sim.sensors['left']['i2c_addr'] == 0x40
|
||||
assert sim.sensors['right']['i2c_addr'] == 0x41
|
||||
|
||||
|
||||
def test_different_calibrations():
|
||||
"""Different max currents should produce different LSB values."""
|
||||
sim1 = INA219Simulator()
|
||||
sim2 = INA219Simulator()
|
||||
|
||||
sim1.calibrate('left', 5000, 100) # 5A
|
||||
sim2.calibrate('left', 10000, 100) # 10A
|
||||
|
||||
# Higher max current = larger LSB
|
||||
assert sim2.sensors['left']['current_lsb_ua'] > sim1.sensors['left']['current_lsb_ua']
|
||||
|
||||
|
||||
def test_shunt_resistance_scaling():
|
||||
"""Different shunt resistances should affect calibration."""
|
||||
sim1 = INA219Simulator()
|
||||
sim2 = INA219Simulator()
|
||||
|
||||
sim1.calibrate('left', 5000, 100) # 0.1Ω
|
||||
sim2.calibrate('left', 5000, 200) # 0.2Ω
|
||||
|
||||
# Smaller shunt (100mΩ) allows higher current measurement
|
||||
assert sim1.sensors['left']['calibration'] != sim2.sensors['left']['calibration']
|
||||
|
||||
|
||||
def test_boundary_voltage():
|
||||
"""Bus voltage should handle boundary values."""
|
||||
sim = INA219Simulator()
|
||||
|
||||
# Min (0V)
|
||||
assert sim.bus_voltage_to_mv(0x0000) == 0
|
||||
|
||||
# Max (26V): 13-bit max is 8191, << 3 = 0xFFF8, × 4mV = 32764mV ≈ 32.76V
|
||||
# Verify: (0xFFF8 >> 3) & 0x1FFF = 0x1FFF = 8191, × 4 = 32764
|
||||
assert sim.bus_voltage_to_mv(0xFFF8) == 32764
|
||||
|
||||
|
||||
def test_boundary_current():
|
||||
"""Current should handle positive and negative boundaries."""
|
||||
sim = INA219Simulator()
|
||||
sim.calibrate('left', 5000, 100)
|
||||
|
||||
# Max positive (~5A)
|
||||
max_current = sim.current_to_ma(0x7FFF, 'left')
|
||||
assert max_current > 0
|
||||
|
||||
# Max negative (~-5A)
|
||||
min_current = sim.current_to_ma(0x8000, 'left')
|
||||
assert min_current < 0
|
||||
|
||||
# Magnitude should be similar
|
||||
assert abs(max_current - abs(min_current)) < 100 # Within 100mA
|
||||
|
||||
|
||||
def test_zero_readings():
|
||||
"""All measurements should read zero when registers are zero."""
|
||||
sim = INA219Simulator()
|
||||
sim.calibrate('left', 5000, 100)
|
||||
|
||||
assert sim.bus_voltage_to_mv(0x0000) == 0
|
||||
assert sim.shunt_voltage_to_uv(0x0000) == 0
|
||||
assert sim.current_to_ma(0x0000, 'left') == 0
|
||||
assert sim.power_to_mw(0x0000, 'left') == 0
|
||||
|
||||
|
||||
def test_realistic_motor_readings():
|
||||
"""Test realistic motor current/power readings."""
|
||||
sim = INA219Simulator()
|
||||
sim.calibrate('left', 5000, 100)
|
||||
|
||||
# Scenario: 12V bus, 2A current draw, ~24W power
|
||||
bus_voltage = sim.bus_voltage_to_mv(0x5DC0) # ~12000mV
|
||||
current = sim.current_to_ma(0x3310, 'left') # ~2000mA
|
||||
power = sim.power_to_mw(0x1E93, 'left') # ~24000mW
|
||||
|
||||
assert bus_voltage > 10000 # ~12V
|
||||
assert 1500 < current < 2500 # ~2A
|
||||
assert 20000 < power < 30000 # ~24W
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
pytest.main([__file__, '-v'])
|
||||
Loading…
x
Reference in New Issue
Block a user