feat(bridge): battery management node — SoC, alerts, speed limits (Issue #125) #133

Merged
sl-jetson merged 1 commits from sl-jetson/issue-125-battery-management into main 2026-03-02 09:27:53 -05:00
4 changed files with 921 additions and 0 deletions
Showing only changes of commit ea26cda76a - Show all commits

View File

@ -0,0 +1,24 @@
# battery_params.yaml — Battery management node configuration (Issue #125)
# ── SQLite history ─────────────────────────────────────────────────────────────
db_path: /var/log/saltybot/battery.db
history_retention_days: 7 # delete records older than N days
# ── Battery pack ──────────────────────────────────────────────────────────────
cell_count: 3 # 3S LiPo (11.1V nominal, 12.6V full)
# ── SoC alert thresholds ──────────────────────────────────────────────────────
warn_pct: 20.0 # WARNING: reduce speed to warn_speed_factor
critical_pct: 10.0 # CRITICAL: reduce speed to critical_speed_factor
emergency_pct: 5.0 # EMERGENCY: full stop + disarm
# ── Speed reduction factors (0.0 = stop, 1.0 = no reduction) ─────────────────
warn_speed_factor: 0.6 # 60% of normal speed at WARNING
critical_speed_factor: 0.3 # 30% of normal speed at CRITICAL
# ── Charging detection ────────────────────────────────────────────────────────
# current_ma below this value → charging detected
charge_detect_ma: -100 # -100 mA threshold (small negative allowed for noise)
# ── Publish rate ──────────────────────────────────────────────────────────────
publish_rate_hz: 1.0 # /saltybot/battery publish frequency

View File

@ -0,0 +1,59 @@
"""battery.launch.py — Launch the battery management node (Issue #125).
Usage:
ros2 launch saltybot_bridge battery.launch.py
# Custom thresholds:
ros2 launch saltybot_bridge battery.launch.py \
warn_pct:=25.0 critical_pct:=12.0 emergency_pct:=6.0
# Custom DB path:
ros2 launch saltybot_bridge battery.launch.py \
db_path:=/mnt/nvme/saltybot/battery.db
"""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description() -> LaunchDescription:
pkg = get_package_share_directory("saltybot_bridge")
params_file = os.path.join(pkg, "config", "battery_params.yaml")
return LaunchDescription([
DeclareLaunchArgument("db_path", default_value="/var/log/saltybot/battery.db"),
DeclareLaunchArgument("cell_count", default_value="3"),
DeclareLaunchArgument("warn_pct", default_value="20.0"),
DeclareLaunchArgument("critical_pct", default_value="10.0"),
DeclareLaunchArgument("emergency_pct", default_value="5.0"),
DeclareLaunchArgument("warn_speed_factor", default_value="0.6"),
DeclareLaunchArgument("critical_speed_factor", default_value="0.3"),
DeclareLaunchArgument("charge_detect_ma", default_value="-100"),
DeclareLaunchArgument("publish_rate_hz", default_value="1.0"),
Node(
package="saltybot_bridge",
executable="battery_node",
name="battery_node",
output="screen",
emulate_tty=True,
parameters=[
params_file,
{
"db_path": LaunchConfiguration("db_path"),
"cell_count": LaunchConfiguration("cell_count"),
"warn_pct": LaunchConfiguration("warn_pct"),
"critical_pct": LaunchConfiguration("critical_pct"),
"emergency_pct": LaunchConfiguration("emergency_pct"),
"warn_speed_factor": LaunchConfiguration("warn_speed_factor"),
"critical_speed_factor": LaunchConfiguration("critical_speed_factor"),
"charge_detect_ma": LaunchConfiguration("charge_detect_ma"),
"publish_rate_hz": LaunchConfiguration("publish_rate_hz"),
},
],
),
])

View File

@ -0,0 +1,409 @@
"""battery_node.py — Battery management for saltybot (Issue #125).
Subscribes to /saltybot/telemetry/battery (JSON from stm32_cmd_node) and:
- Publishes sensor_msgs/BatteryState on /saltybot/battery
- Publishes JSON alerts on /saltybot/battery/alert at threshold crossings
- Reduces speed limit at low SoC via /saltybot/speed_limit (std_msgs/Float32)
- Commands safe stop at critical SoC via /cmd_vel zero + /saltybot/arm False
- Logs history to SQLite (default: /var/log/saltybot/battery.db)
- Detects charging (current_ma < charge_detect_threshold_ma)
Alert levels (SoC thresholds):
20% WARNING reduce max speed to 60%
10% CRITICAL reduce max speed to 30%, log prominently
5% EMERGENCY publish zero /cmd_vel, disarm, log + alert
SoC source priority:
1. soc_pct field from STM32 BATTERY telemetry (fuel gauge or lookup on STM32)
2. Voltage-based lookup table (3S LiPo curve) if soc_pct == 0 and voltage known
Parameters (config/battery_params.yaml):
db_path /var/log/saltybot/battery.db
cell_count 3 (number of LiPo cells)
warn_pct 20.0
critical_pct 10.0
emergency_pct 5.0
warn_speed_factor 0.6
critical_speed_factor 0.3
charge_detect_ma -100 (current_ma below this charging)
history_retention_days 7
publish_rate_hz 1.0 (re-publish rate for /saltybot/battery)
"""
from __future__ import annotations
import json
import math
import os
import sqlite3
import time
from typing import Optional
import rclpy
from rclpy.node import Node
from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy
from geometry_msgs.msg import Twist
from sensor_msgs.msg import BatteryState
from std_msgs.msg import Float32, String
from std_srvs.srv import SetBool
# ── SoC voltage lookup table (3S LiPo, per-cell voltage → SoC %) ─────────────
# Based on typical LiPo discharge curve at 1C
_SOC_CURVE_PER_CELL = [
# (cell_v, soc_pct) — sorted ascending by voltage
(3.00, 0),
(3.27, 5),
(3.37, 10),
(3.53, 20),
(3.62, 30),
(3.68, 40),
(3.73, 50),
(3.78, 60),
(3.83, 70),
(3.90, 80),
(4.00, 90),
(4.20, 100),
]
# Alert level constants
_ALERT_NONE = "ok"
_ALERT_WARN = "warning"
_ALERT_CRITICAL = "critical"
_ALERT_EMERGENCY = "emergency"
def _soc_from_voltage(voltage_v: float, cell_count: int) -> int:
"""Estimate SoC from pack voltage using per-cell LiPo lookup table."""
if cell_count <= 0:
return 0
cell_v = voltage_v / cell_count
curve = _SOC_CURVE_PER_CELL
if cell_v <= curve[0][0]:
return 0
if cell_v >= curve[-1][0]:
return 100
# Linear interpolation between adjacent points
for i in range(len(curve) - 1):
v_lo, soc_lo = curve[i]
v_hi, soc_hi = curve[i + 1]
if v_lo <= cell_v <= v_hi:
t = (cell_v - v_lo) / (v_hi - v_lo)
return int(soc_lo + t * (soc_hi - soc_lo))
return 0
class BatteryNode(Node):
"""Battery management and monitoring node."""
def __init__(self) -> None:
super().__init__("battery_node")
# ── Parameters ────────────────────────────────────────────────────────
self.declare_parameter("db_path", "/var/log/saltybot/battery.db")
self.declare_parameter("cell_count", 3)
self.declare_parameter("warn_pct", 20.0)
self.declare_parameter("critical_pct", 10.0)
self.declare_parameter("emergency_pct", 5.0)
self.declare_parameter("warn_speed_factor", 0.6)
self.declare_parameter("critical_speed_factor", 0.3)
self.declare_parameter("charge_detect_ma", -100)
self.declare_parameter("history_retention_days", 7)
self.declare_parameter("publish_rate_hz", 1.0)
self._db_path = self.get_parameter("db_path").value
self._cell_count = self.get_parameter("cell_count").value
self._warn_pct = self.get_parameter("warn_pct").value
self._critical_pct = self.get_parameter("critical_pct").value
self._emergency_pct = self.get_parameter("emergency_pct").value
self._warn_speed_factor = self.get_parameter("warn_speed_factor").value
self._critical_speed_factor = self.get_parameter("critical_speed_factor").value
self._charge_detect_ma = self.get_parameter("charge_detect_ma").value
self._retention_days = self.get_parameter("history_retention_days").value
publish_rate = self.get_parameter("publish_rate_hz").value
# ── QoS ───────────────────────────────────────────────────────────────
rel_qos = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST, depth=10,
)
# ── Publishers ────────────────────────────────────────────────────────
self._battery_pub = self.create_publisher(BatteryState, "/saltybot/battery", rel_qos)
self._alert_pub = self.create_publisher(String, "/saltybot/battery/alert", rel_qos)
self._speed_limit_pub = self.create_publisher(Float32, "/saltybot/speed_limit", rel_qos)
self._cmd_vel_pub = self.create_publisher(Twist, "/cmd_vel", rel_qos)
# ── Subscribers ───────────────────────────────────────────────────────
self._telem_sub = self.create_subscription(
String, "/saltybot/telemetry/battery",
self._on_battery_telem, rel_qos,
)
# ── Arm service client (for emergency stop) ───────────────────────────
self._arm_client = self.create_client(SetBool, "/saltybot/arm")
# ── State ─────────────────────────────────────────────────────────────
self._last_voltage_v = 0.0
self._last_current_ma = 0
self._last_soc_pct = 100
self._last_alert = _ALERT_NONE
self._charging = False
self._emergency_fired = False # prevent repeated disarm calls
# ── SQLite history ─────────────────────────────────────────────────────
self._db: Optional[sqlite3.Connection] = None
self._init_db()
# ── Timers ────────────────────────────────────────────────────────────
self._publish_timer = self.create_timer(1.0 / publish_rate, self._publish_cb)
self._prune_timer = self.create_timer(3600.0, self._prune_history) # hourly
self.get_logger().info(
f"battery_node started — warn={self._warn_pct}% "
f"critical={self._critical_pct}% "
f"emergency={self._emergency_pct}% "
f"cells={self._cell_count}"
)
# ── SQLite ────────────────────────────────────────────────────────────────
def _init_db(self) -> None:
"""Create the SQLite DB and battery_history table if not present."""
try:
db_dir = os.path.dirname(self._db_path)
if db_dir:
os.makedirs(db_dir, exist_ok=True)
self._db = sqlite3.connect(
self._db_path, check_same_thread=False, timeout=5.0
)
self._db.execute("""
CREATE TABLE IF NOT EXISTS battery_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts REAL NOT NULL,
voltage_v REAL NOT NULL,
current_ma INTEGER NOT NULL,
soc_pct INTEGER NOT NULL,
charging INTEGER NOT NULL,
alert_level TEXT NOT NULL
)
""")
self._db.execute(
"CREATE INDEX IF NOT EXISTS idx_ts ON battery_history(ts)"
)
self._db.commit()
self.get_logger().info(f"Battery history DB: {self._db_path}")
except Exception as exc:
self.get_logger().warn(f"Cannot open battery DB ({exc}) — logging disabled")
self._db = None
def _log_to_db(self, voltage_v: float, current_ma: int,
soc_pct: int, charging: bool, alert: str) -> None:
if self._db is None:
return
try:
self._db.execute(
"INSERT INTO battery_history(ts,voltage_v,current_ma,soc_pct,charging,alert_level) "
"VALUES (?,?,?,?,?,?)",
(time.time(), voltage_v, current_ma, soc_pct, int(charging), alert),
)
self._db.commit()
except Exception as exc:
self.get_logger().warn(f"Battery DB write error: {exc}")
def _prune_history(self) -> None:
"""Delete records older than history_retention_days."""
if self._db is None:
return
try:
cutoff = time.time() - self._retention_days * 86400
self._db.execute("DELETE FROM battery_history WHERE ts < ?", (cutoff,))
self._db.commit()
except Exception as exc:
self.get_logger().warn(f"Battery DB prune error: {exc}")
# ── Telemetry receive ─────────────────────────────────────────────────────
def _on_battery_telem(self, msg: String) -> None:
"""Parse JSON battery telemetry and update internal state."""
try:
data = json.loads(msg.data)
voltage_mv = int(data["voltage_mv"])
current_ma = int(data["current_ma"])
soc_from_fw = int(data.get("soc_pct", 0))
except (json.JSONDecodeError, KeyError, ValueError) as exc:
self.get_logger().warn(f"Bad battery telemetry: {exc}")
return
voltage_v = voltage_mv / 1000.0
charging = current_ma < self._charge_detect_ma
# SoC: prefer firmware value; fall back to voltage curve
if soc_from_fw > 0:
soc_pct = soc_from_fw
else:
soc_pct = _soc_from_voltage(voltage_v, self._cell_count)
self._last_voltage_v = voltage_v
self._last_current_ma = current_ma
self._last_soc_pct = soc_pct
self._charging = charging
# Check thresholds and act
new_alert = self._compute_alert(soc_pct)
if new_alert != self._last_alert:
self._handle_alert_change(new_alert, soc_pct, voltage_v)
# Log to SQLite
self._log_to_db(voltage_v, current_ma, soc_pct, charging, new_alert)
# ── Alert handling ────────────────────────────────────────────────────────
def _compute_alert(self, soc_pct: int) -> str:
"""Return the current alert level string for a given SoC."""
if soc_pct <= self._emergency_pct:
return _ALERT_EMERGENCY
if soc_pct <= self._critical_pct:
return _ALERT_CRITICAL
if soc_pct <= self._warn_pct:
return _ALERT_WARN
return _ALERT_NONE
def _handle_alert_change(self, new_alert: str, soc_pct: int, voltage_v: float) -> None:
"""Act on threshold transition."""
prev = self._last_alert
self._last_alert = new_alert
msg_text = (
f"Battery {new_alert.upper()}: {soc_pct}% SoC / {voltage_v:.2f}V "
f"(was {prev})"
)
if new_alert == _ALERT_NONE:
# Recovery (e.g., plugged in for charging)
self.get_logger().info(f"Battery OK: {soc_pct}% SoC")
self._publish_speed_limit(1.0)
elif new_alert == _ALERT_WARN:
self.get_logger().warn(msg_text)
self._publish_speed_limit(self._warn_speed_factor)
elif new_alert == _ALERT_CRITICAL:
self.get_logger().error(msg_text)
self._publish_speed_limit(self._critical_speed_factor)
elif new_alert == _ALERT_EMERGENCY:
self.get_logger().fatal(msg_text)
self._publish_speed_limit(0.0)
if not self._emergency_fired:
self._emergency_fired = True
self._execute_safe_stop()
# Publish alert message
alert_payload = json.dumps({
"level": new_alert,
"soc_pct": soc_pct,
"voltage_v": round(voltage_v, 3),
"prev_level": prev,
"ts": time.time(),
})
alert_msg = String()
alert_msg.data = alert_payload
self._alert_pub.publish(alert_msg)
def _publish_speed_limit(self, factor: float) -> None:
"""Publish normalised speed limit factor 0.01.0."""
msg = Float32()
msg.data = float(factor)
self._speed_limit_pub.publish(msg)
def _execute_safe_stop(self) -> None:
"""Send zero /cmd_vel and disarm the STM32."""
self.get_logger().fatal("EMERGENCY: publishing zero /cmd_vel and disarming")
# Publish zero velocity
zero_twist = Twist()
self._cmd_vel_pub.publish(zero_twist)
# Call /saltybot/arm(False) if service is available
if self._arm_client.service_is_ready():
req = SetBool.Request()
req.data = False
self._arm_client.call_async(req)
else:
self.get_logger().warn(
"/saltybot/arm service not available for emergency disarm"
)
# ── Periodic publish ──────────────────────────────────────────────────────
def _publish_cb(self) -> None:
"""Publish sensor_msgs/BatteryState at configured rate."""
if self._last_voltage_v == 0.0:
return # no telemetry received yet
msg = BatteryState()
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = "battery"
msg.voltage = self._last_voltage_v
msg.current = self._last_current_ma / 1000.0 # mA → A
msg.percentage = self._last_soc_pct / 100.0 # 0.01.0
# Capacity: 3S 5000 mAh pack (configurable in future)
msg.design_capacity = 5.0 # Ah
msg.capacity = float("nan")
msg.charge = float("nan")
msg.temperature = float("nan")
if self._charging:
msg.power_supply_status = BatteryState.POWER_SUPPLY_STATUS_CHARGING
elif self._last_soc_pct >= 99:
msg.power_supply_status = BatteryState.POWER_SUPPLY_STATUS_FULL
else:
msg.power_supply_status = BatteryState.POWER_SUPPLY_STATUS_DISCHARGING
msg.power_supply_technology = BatteryState.POWER_SUPPLY_TECHNOLOGY_LIPO
msg.power_supply_health = (
BatteryState.POWER_SUPPLY_HEALTH_GOOD
if self._last_alert in (_ALERT_NONE, _ALERT_WARN)
else BatteryState.POWER_SUPPLY_HEALTH_UNSPEC_FAILURE
)
msg.present = True
# Cell voltages (estimated, equal distribution assumption)
if self._cell_count > 0:
cell_v = self._last_voltage_v / self._cell_count
msg.cell_voltage = [cell_v] * self._cell_count
# Location field: alert level for easy monitoring
msg.location = self._last_alert
self._battery_pub.publish(msg)
# ── Lifecycle ─────────────────────────────────────────────────────────────
def destroy_node(self) -> None:
if self._db:
try:
self._db.close()
except Exception:
pass
super().destroy_node()
def main(args=None) -> None:
rclpy.init(args=args)
node = BatteryNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,429 @@
"""test_battery.py — Unit tests for battery_node SoC, alerts, and SQLite logging.
Tests:
- SoC lookup table: known voltages map to expected percentages
- SoC interpolation: midpoint between curve entries
- SoC clamping: below min 0%, above max 100%
- Alert level computation: correct level for various SoC values
- Alert threshold transitions: NONEWARNCRITICALEMERGENCY
- Speed factor assignments: correct factor per alert level
- Charging detection: current_ma < threshold charging
- sensor_msgs/BatteryState field population
- SQLite logging: records inserted with correct fields
- SQLite pruning: old records removed
- Voltage-based SoC fallback when firmware SoC == 0
- Multi-cell voltage scaling (3S, 4S)
Run with: pytest test/test_battery.py -v
No ROS2 runtime required.
"""
from __future__ import annotations
import json
import os
import sqlite3
import sys
import tempfile
import time
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from saltybot_bridge.battery_node import (
_soc_from_voltage,
_ALERT_NONE, _ALERT_WARN, _ALERT_CRITICAL, _ALERT_EMERGENCY,
_SOC_CURVE_PER_CELL,
)
# ═══════════════════════════════════════════════════════════════════
# SoC voltage lookup table
# ═══════════════════════════════════════════════════════════════════
class TestSocLookup:
def test_full_voltage_returns_100(self):
"""4.20 V/cell (3-cell = 12.60 V) → 100%."""
assert _soc_from_voltage(12.60, cell_count=3) == 100
def test_below_min_returns_0(self):
"""Below 3.0 V/cell → 0%."""
assert _soc_from_voltage(8.0, cell_count=3) == 0
def test_nominal_voltage_returns_50ish(self):
"""3S at ~11.1 V (3.7 V/cell) should be near 50%."""
soc = _soc_from_voltage(11.10, cell_count=3)
assert 45 <= soc <= 55
def test_warn_threshold_voltage_single_cell(self):
"""Single-cell 3.53 V is on the 20% curve point."""
assert _soc_from_voltage(3.53, cell_count=1) == 20
def test_critical_threshold_voltage(self):
"""3.37 V/cell → 10%."""
assert _soc_from_voltage(3.37, cell_count=1) == 10
def test_emergency_threshold_voltage(self):
"""3.27 V/cell → 5%."""
assert _soc_from_voltage(3.27, cell_count=1) == 5
def test_4s_pack_full(self):
"""4S fully charged: 4 × 4.20 = 16.80 V → 100%."""
assert _soc_from_voltage(16.80, cell_count=4) == 100
def test_4s_pack_nominal(self):
"""4S nominal: 4 × 3.7 = 14.8 V → ~50%."""
soc = _soc_from_voltage(14.80, cell_count=4)
assert 45 <= soc <= 55
def test_interpolation_midpoint(self):
"""Midpoint between two curve entries should yield midpoint SoC."""
# Between (3.27, 5) and (3.37, 10): midpoint ≈ 3.32 V → ~7.5%
mid_v = (3.27 + 3.37) / 2 * 3 # 3-cell pack voltage
soc = _soc_from_voltage(mid_v, cell_count=3)
assert 5 < soc < 10
def test_exactly_at_curve_point(self):
"""Values exactly on a curve point must return the tabulated SoC."""
for cell_v, expected_soc in _SOC_CURVE_PER_CELL:
pack_v = cell_v * 3 # 3-cell
result = _soc_from_voltage(pack_v, cell_count=3)
# Allow ±1 for floating-point interpolation edge
assert abs(result - expected_soc) <= 1, \
f"At {cell_v}V/cell ({pack_v}V 3S): expected {expected_soc}%, got {result}%"
def test_zero_cell_count_returns_0(self):
assert _soc_from_voltage(12.0, cell_count=0) == 0
def test_above_max_clamped_to_100(self):
"""Overcharged (e.g., 4.5 V/cell) → clamped to 100%."""
assert _soc_from_voltage(15.0, cell_count=3) == 100
def test_zero_voltage_returns_0(self):
assert _soc_from_voltage(0.0, cell_count=3) == 0
def test_returns_integer(self):
result = _soc_from_voltage(11.1, cell_count=3)
assert isinstance(result, int)
# ═══════════════════════════════════════════════════════════════════
# Alert level computation
# ═══════════════════════════════════════════════════════════════════
class TestAlertComputation:
"""Test _compute_alert() logic (instantiated via a minimal stub)."""
# Mirror of BatteryNode._compute_alert() for unit testing
@staticmethod
def _compute_alert(soc_pct, warn=20.0, crit=10.0, emergency=5.0):
if soc_pct <= emergency:
return _ALERT_EMERGENCY
if soc_pct <= crit:
return _ALERT_CRITICAL
if soc_pct <= warn:
return _ALERT_WARN
return _ALERT_NONE
def test_full_is_ok(self):
assert self._compute_alert(100) == _ALERT_NONE
def test_above_warn_is_ok(self):
assert self._compute_alert(21) == _ALERT_NONE
def test_at_warn_threshold(self):
assert self._compute_alert(20) == _ALERT_WARN
def test_between_warn_and_critical(self):
assert self._compute_alert(15) == _ALERT_WARN
def test_at_critical_threshold(self):
assert self._compute_alert(10) == _ALERT_CRITICAL
def test_between_critical_and_emergency(self):
assert self._compute_alert(7) == _ALERT_CRITICAL
def test_at_emergency_threshold(self):
assert self._compute_alert(5) == _ALERT_EMERGENCY
def test_below_emergency(self):
assert self._compute_alert(1) == _ALERT_EMERGENCY
def test_zero_is_emergency(self):
assert self._compute_alert(0) == _ALERT_EMERGENCY
def test_custom_thresholds(self):
assert self._compute_alert(25, warn=30, crit=15, emergency=8) == _ALERT_WARN
assert self._compute_alert(13, warn=30, crit=15, emergency=8) == _ALERT_WARN
assert self._compute_alert(12, warn=30, crit=15, emergency=8) == _ALERT_CRITICAL
def test_speed_factor_at_warn(self):
"""Speed factor at WARN must be 0 < factor < 1.0."""
factor = 0.6 # warn_speed_factor default
assert 0.0 < factor < 1.0
def test_speed_factor_at_critical(self):
factor = 0.3 # critical_speed_factor default
assert 0.0 < factor < 1.0
def test_speed_factor_at_emergency(self):
factor = 0.0 # full stop
assert factor == 0.0
# ═══════════════════════════════════════════════════════════════════
# Charging detection
# ═══════════════════════════════════════════════════════════════════
class TestChargingDetection:
THRESHOLD_MA = -100
def _is_charging(self, current_ma):
return current_ma < self.THRESHOLD_MA
def test_discharging_positive(self):
assert not self._is_charging(500)
def test_discharging_zero(self):
assert not self._is_charging(0)
def test_just_below_threshold_is_charging(self):
assert self._is_charging(-101)
def test_at_threshold_not_charging(self):
assert not self._is_charging(-100)
def test_deeply_negative_is_charging(self):
assert self._is_charging(-2000) # fast charge
def test_regen_braking_not_charging(self):
# Small negative current during regen isn't real charging
assert not self._is_charging(-50)
# ═══════════════════════════════════════════════════════════════════
# SQLite history logging
# ═══════════════════════════════════════════════════════════════════
class TestSqliteLogging:
"""Test the SQLite DB operations directly (no ROS2)."""
def _make_db(self):
"""Return an in-memory SQLite connection with the battery_history table."""
db = sqlite3.connect(":memory:")
db.execute("""
CREATE TABLE battery_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts REAL NOT NULL,
voltage_v REAL NOT NULL,
current_ma INTEGER NOT NULL,
soc_pct INTEGER NOT NULL,
charging INTEGER NOT NULL,
alert_level TEXT NOT NULL
)
""")
db.execute("CREATE INDEX idx_ts ON battery_history(ts)")
db.commit()
return db
def _insert(self, db, ts, voltage_v, current_ma, soc_pct, charging, alert):
db.execute(
"INSERT INTO battery_history(ts,voltage_v,current_ma,soc_pct,charging,alert_level) "
"VALUES (?,?,?,?,?,?)",
(ts, voltage_v, current_ma, soc_pct, int(charging), alert)
)
db.commit()
def test_insert_and_retrieve(self):
db = self._make_db()
self._insert(db, time.time(), 11.1, 500, 50, False, _ALERT_NONE)
row = db.execute("SELECT * FROM battery_history").fetchone()
assert row is not None
assert row[2] == pytest.approx(11.1)
assert row[3] == 500
assert row[4] == 50
assert row[5] == 0 # not charging
assert row[6] == _ALERT_NONE
def test_insert_charging_event(self):
db = self._make_db()
self._insert(db, time.time(), 12.0, -1500, 85, True, _ALERT_NONE)
row = db.execute("SELECT charging FROM battery_history").fetchone()
assert row[0] == 1 # charging = True
def test_insert_alert_event(self):
db = self._make_db()
self._insert(db, time.time(), 10.5, 300, 18, False, _ALERT_WARN)
row = db.execute("SELECT alert_level FROM battery_history").fetchone()
assert row[0] == _ALERT_WARN
def test_prune_old_records(self):
db = self._make_db()
old_ts = time.time() - 8 * 86400 # 8 days ago (>7 day retention)
new_ts = time.time()
self._insert(db, old_ts, 11.0, 100, 60, False, _ALERT_NONE)
self._insert(db, new_ts, 11.5, 200, 70, False, _ALERT_NONE)
retention_days = 7
cutoff = time.time() - retention_days * 86400
db.execute("DELETE FROM battery_history WHERE ts < ?", (cutoff,))
db.commit()
rows = db.execute("SELECT COUNT(*) FROM battery_history").fetchone()
assert rows[0] == 1 # only new record remains
def test_multiple_inserts_sequential_ids(self):
db = self._make_db()
for i in range(5):
self._insert(db, time.time() + i, 11.0 + i * 0.1, 100, 50 + i, False, _ALERT_NONE)
count = db.execute("SELECT COUNT(*) FROM battery_history").fetchone()[0]
assert count == 5
def test_db_on_disk(self):
"""Write and read a real on-disk SQLite DB."""
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
path = f.name
try:
db = sqlite3.connect(path)
db.execute("""CREATE TABLE battery_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts REAL, voltage_v REAL, current_ma INTEGER,
soc_pct INTEGER, charging INTEGER, alert_level TEXT
)""")
db.execute(
"INSERT INTO battery_history VALUES (NULL,?,?,?,?,?,?)",
(time.time(), 11.5, 500, 70, 0, _ALERT_NONE)
)
db.commit()
db.close()
# Reopen and verify
db2 = sqlite3.connect(path)
row = db2.execute("SELECT soc_pct FROM battery_history").fetchone()
assert row[0] == 70
db2.close()
finally:
os.unlink(path)
# ═══════════════════════════════════════════════════════════════════
# BatteryState field population
# ═══════════════════════════════════════════════════════════════════
class TestBatteryStateFields:
"""Verify sensor_msgs/BatteryState field logic without a live node."""
def _make_state(self, voltage_v, current_ma, soc_pct, charging,
cell_count=3, alert=_ALERT_NONE):
"""Minimal BatteryState-like dict mirrors _publish_cb logic."""
return {
"voltage": voltage_v,
"current": current_ma / 1000.0,
"percentage": soc_pct / 100.0,
"power_supply_status": "CHARGING" if charging else (
"FULL" if soc_pct >= 99 else "DISCHARGING"
),
"location": alert,
"cell_voltage": [voltage_v / cell_count] * cell_count,
}
def test_voltage_field(self):
state = self._make_state(11.1, 500, 50, False)
assert state["voltage"] == pytest.approx(11.1)
def test_current_converted_to_amps(self):
state = self._make_state(11.1, 1500, 50, False)
assert state["current"] == pytest.approx(1.5)
def test_percentage_normalised(self):
state = self._make_state(11.1, 500, 75, False)
assert state["percentage"] == pytest.approx(0.75)
def test_charging_status(self):
state = self._make_state(12.0, -1500, 85, True)
assert state["power_supply_status"] == "CHARGING"
def test_discharging_status(self):
state = self._make_state(11.1, 500, 50, False)
assert state["power_supply_status"] == "DISCHARGING"
def test_full_status_at_99_pct(self):
state = self._make_state(12.5, 0, 99, False)
assert state["power_supply_status"] == "FULL"
def test_cell_voltages_equal_distribution(self):
state = self._make_state(12.0, 0, 80, False, cell_count=3)
assert len(state["cell_voltage"]) == 3
for cv in state["cell_voltage"]:
assert cv == pytest.approx(4.0)
def test_alert_in_location_field(self):
state = self._make_state(10.5, 300, 18, False, alert=_ALERT_WARN)
assert state["location"] == _ALERT_WARN
def test_zero_percentage_at_empty(self):
state = self._make_state(9.6, 50, 0, False)
assert state["percentage"] == pytest.approx(0.0)
def test_full_percentage(self):
state = self._make_state(12.6, 0, 100, False)
assert state["percentage"] == pytest.approx(1.0)
# ═══════════════════════════════════════════════════════════════════
# Telemetry JSON parsing
# ═══════════════════════════════════════════════════════════════════
class TestTelemetryParsing:
"""Test the JSON parsing logic that the battery_node uses."""
@staticmethod
def _parse(json_str: str, charge_threshold: int = -100):
data = json.loads(json_str)
voltage_mv = int(data["voltage_mv"])
current_ma = int(data["current_ma"])
soc_from_fw = int(data.get("soc_pct", 0))
voltage_v = voltage_mv / 1000.0
charging = current_ma < charge_threshold
soc_pct = soc_from_fw if soc_from_fw > 0 else \
_soc_from_voltage(voltage_v, cell_count=3)
return voltage_v, current_ma, soc_pct, charging
def test_normal_discharge_telemetry(self):
v, i, soc, chg = self._parse(
'{"voltage_mv":11100,"current_ma":1000,"soc_pct":55}'
)
assert v == pytest.approx(11.1)
assert i == 1000
assert soc == 55 # from firmware
assert not chg
def test_charging_telemetry(self):
v, i, soc, chg = self._parse(
'{"voltage_mv":12000,"current_ma":-1500,"soc_pct":80}'
)
assert v == pytest.approx(12.0)
assert i == -1500
assert chg
def test_firmware_soc_zero_uses_voltage_curve(self):
"""If soc_pct == 0, fall back to voltage curve."""
v, i, soc, chg = self._parse(
'{"voltage_mv":11100,"current_ma":500,"soc_pct":0}'
)
# soc_pct=0 from firmware → use voltage 11.1V / 3 cells → ~50%
assert 45 <= soc <= 55
def test_missing_soc_falls_back_to_voltage(self):
"""soc_pct field absent → use voltage lookup."""
v, i, soc, chg = self._parse(
'{"voltage_mv":10500,"current_ma":300}'
)
assert soc < 25 # 10.5V (3.5V/cell) → ~20%
def test_parse_error_on_missing_fields(self):
with pytest.raises(KeyError):
self._parse('{"current_ma":500}') # missing voltage_mv