Merge pull request 'feat(bridge): battery management node — SoC, alerts, speed limits (Issue #125)' (#133) from sl-jetson/issue-125-battery-management into main
This commit is contained in:
commit
9281f3bc44
@ -0,0 +1,24 @@
|
|||||||
|
# battery_params.yaml — Battery management node configuration (Issue #125)
|
||||||
|
|
||||||
|
# ── SQLite history ─────────────────────────────────────────────────────────────
|
||||||
|
db_path: /var/log/saltybot/battery.db
|
||||||
|
history_retention_days: 7 # delete records older than N days
|
||||||
|
|
||||||
|
# ── Battery pack ──────────────────────────────────────────────────────────────
|
||||||
|
cell_count: 3 # 3S LiPo (11.1V nominal, 12.6V full)
|
||||||
|
|
||||||
|
# ── SoC alert thresholds ──────────────────────────────────────────────────────
|
||||||
|
warn_pct: 20.0 # WARNING: reduce speed to warn_speed_factor
|
||||||
|
critical_pct: 10.0 # CRITICAL: reduce speed to critical_speed_factor
|
||||||
|
emergency_pct: 5.0 # EMERGENCY: full stop + disarm
|
||||||
|
|
||||||
|
# ── Speed reduction factors (0.0 = stop, 1.0 = no reduction) ─────────────────
|
||||||
|
warn_speed_factor: 0.6 # 60% of normal speed at WARNING
|
||||||
|
critical_speed_factor: 0.3 # 30% of normal speed at CRITICAL
|
||||||
|
|
||||||
|
# ── Charging detection ────────────────────────────────────────────────────────
|
||||||
|
# current_ma below this value → charging detected
|
||||||
|
charge_detect_ma: -100 # -100 mA threshold (small negative allowed for noise)
|
||||||
|
|
||||||
|
# ── Publish rate ──────────────────────────────────────────────────────────────
|
||||||
|
publish_rate_hz: 1.0 # /saltybot/battery publish frequency
|
||||||
59
jetson/ros2_ws/src/saltybot_bridge/launch/battery.launch.py
Normal file
59
jetson/ros2_ws/src/saltybot_bridge/launch/battery.launch.py
Normal file
@ -0,0 +1,59 @@
|
|||||||
|
"""battery.launch.py — Launch the battery management node (Issue #125).
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
ros2 launch saltybot_bridge battery.launch.py
|
||||||
|
|
||||||
|
# Custom thresholds:
|
||||||
|
ros2 launch saltybot_bridge battery.launch.py \
|
||||||
|
warn_pct:=25.0 critical_pct:=12.0 emergency_pct:=6.0
|
||||||
|
|
||||||
|
# Custom DB path:
|
||||||
|
ros2 launch saltybot_bridge battery.launch.py \
|
||||||
|
db_path:=/mnt/nvme/saltybot/battery.db
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
from ament_index_python.packages import get_package_share_directory
|
||||||
|
from launch import LaunchDescription
|
||||||
|
from launch.actions import DeclareLaunchArgument
|
||||||
|
from launch.substitutions import LaunchConfiguration
|
||||||
|
from launch_ros.actions import Node
|
||||||
|
|
||||||
|
|
||||||
|
def generate_launch_description() -> LaunchDescription:
|
||||||
|
pkg = get_package_share_directory("saltybot_bridge")
|
||||||
|
params_file = os.path.join(pkg, "config", "battery_params.yaml")
|
||||||
|
|
||||||
|
return LaunchDescription([
|
||||||
|
DeclareLaunchArgument("db_path", default_value="/var/log/saltybot/battery.db"),
|
||||||
|
DeclareLaunchArgument("cell_count", default_value="3"),
|
||||||
|
DeclareLaunchArgument("warn_pct", default_value="20.0"),
|
||||||
|
DeclareLaunchArgument("critical_pct", default_value="10.0"),
|
||||||
|
DeclareLaunchArgument("emergency_pct", default_value="5.0"),
|
||||||
|
DeclareLaunchArgument("warn_speed_factor", default_value="0.6"),
|
||||||
|
DeclareLaunchArgument("critical_speed_factor", default_value="0.3"),
|
||||||
|
DeclareLaunchArgument("charge_detect_ma", default_value="-100"),
|
||||||
|
DeclareLaunchArgument("publish_rate_hz", default_value="1.0"),
|
||||||
|
|
||||||
|
Node(
|
||||||
|
package="saltybot_bridge",
|
||||||
|
executable="battery_node",
|
||||||
|
name="battery_node",
|
||||||
|
output="screen",
|
||||||
|
emulate_tty=True,
|
||||||
|
parameters=[
|
||||||
|
params_file,
|
||||||
|
{
|
||||||
|
"db_path": LaunchConfiguration("db_path"),
|
||||||
|
"cell_count": LaunchConfiguration("cell_count"),
|
||||||
|
"warn_pct": LaunchConfiguration("warn_pct"),
|
||||||
|
"critical_pct": LaunchConfiguration("critical_pct"),
|
||||||
|
"emergency_pct": LaunchConfiguration("emergency_pct"),
|
||||||
|
"warn_speed_factor": LaunchConfiguration("warn_speed_factor"),
|
||||||
|
"critical_speed_factor": LaunchConfiguration("critical_speed_factor"),
|
||||||
|
"charge_detect_ma": LaunchConfiguration("charge_detect_ma"),
|
||||||
|
"publish_rate_hz": LaunchConfiguration("publish_rate_hz"),
|
||||||
|
},
|
||||||
|
],
|
||||||
|
),
|
||||||
|
])
|
||||||
@ -0,0 +1,409 @@
|
|||||||
|
"""battery_node.py — Battery management for saltybot (Issue #125).
|
||||||
|
|
||||||
|
Subscribes to /saltybot/telemetry/battery (JSON from stm32_cmd_node) and:
|
||||||
|
- Publishes sensor_msgs/BatteryState on /saltybot/battery
|
||||||
|
- Publishes JSON alerts on /saltybot/battery/alert at threshold crossings
|
||||||
|
- Reduces speed limit at low SoC via /saltybot/speed_limit (std_msgs/Float32)
|
||||||
|
- Commands safe stop at critical SoC via /cmd_vel zero + /saltybot/arm False
|
||||||
|
- Logs history to SQLite (default: /var/log/saltybot/battery.db)
|
||||||
|
- Detects charging (current_ma < charge_detect_threshold_ma)
|
||||||
|
|
||||||
|
Alert levels (SoC thresholds):
|
||||||
|
20% WARNING — reduce max speed to 60%
|
||||||
|
10% CRITICAL — reduce max speed to 30%, log prominently
|
||||||
|
5% EMERGENCY — publish zero /cmd_vel, disarm, log + alert
|
||||||
|
|
||||||
|
SoC source priority:
|
||||||
|
1. soc_pct field from STM32 BATTERY telemetry (fuel gauge or lookup on STM32)
|
||||||
|
2. Voltage-based lookup table (3S LiPo curve) if soc_pct == 0 and voltage known
|
||||||
|
|
||||||
|
Parameters (config/battery_params.yaml):
|
||||||
|
db_path /var/log/saltybot/battery.db
|
||||||
|
cell_count 3 (number of LiPo cells)
|
||||||
|
warn_pct 20.0
|
||||||
|
critical_pct 10.0
|
||||||
|
emergency_pct 5.0
|
||||||
|
warn_speed_factor 0.6
|
||||||
|
critical_speed_factor 0.3
|
||||||
|
charge_detect_ma -100 (current_ma below this → charging)
|
||||||
|
history_retention_days 7
|
||||||
|
publish_rate_hz 1.0 (re-publish rate for /saltybot/battery)
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import math
|
||||||
|
import os
|
||||||
|
import sqlite3
|
||||||
|
import time
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy
|
||||||
|
|
||||||
|
from geometry_msgs.msg import Twist
|
||||||
|
from sensor_msgs.msg import BatteryState
|
||||||
|
from std_msgs.msg import Float32, String
|
||||||
|
from std_srvs.srv import SetBool
|
||||||
|
|
||||||
|
# ── SoC voltage lookup table (3S LiPo, per-cell voltage → SoC %) ─────────────
|
||||||
|
# Based on typical LiPo discharge curve at 1C
|
||||||
|
_SOC_CURVE_PER_CELL = [
|
||||||
|
# (cell_v, soc_pct) — sorted ascending by voltage
|
||||||
|
(3.00, 0),
|
||||||
|
(3.27, 5),
|
||||||
|
(3.37, 10),
|
||||||
|
(3.53, 20),
|
||||||
|
(3.62, 30),
|
||||||
|
(3.68, 40),
|
||||||
|
(3.73, 50),
|
||||||
|
(3.78, 60),
|
||||||
|
(3.83, 70),
|
||||||
|
(3.90, 80),
|
||||||
|
(4.00, 90),
|
||||||
|
(4.20, 100),
|
||||||
|
]
|
||||||
|
|
||||||
|
# Alert level constants
|
||||||
|
_ALERT_NONE = "ok"
|
||||||
|
_ALERT_WARN = "warning"
|
||||||
|
_ALERT_CRITICAL = "critical"
|
||||||
|
_ALERT_EMERGENCY = "emergency"
|
||||||
|
|
||||||
|
|
||||||
|
def _soc_from_voltage(voltage_v: float, cell_count: int) -> int:
|
||||||
|
"""Estimate SoC from pack voltage using per-cell LiPo lookup table."""
|
||||||
|
if cell_count <= 0:
|
||||||
|
return 0
|
||||||
|
cell_v = voltage_v / cell_count
|
||||||
|
curve = _SOC_CURVE_PER_CELL
|
||||||
|
|
||||||
|
if cell_v <= curve[0][0]:
|
||||||
|
return 0
|
||||||
|
if cell_v >= curve[-1][0]:
|
||||||
|
return 100
|
||||||
|
|
||||||
|
# Linear interpolation between adjacent points
|
||||||
|
for i in range(len(curve) - 1):
|
||||||
|
v_lo, soc_lo = curve[i]
|
||||||
|
v_hi, soc_hi = curve[i + 1]
|
||||||
|
if v_lo <= cell_v <= v_hi:
|
||||||
|
t = (cell_v - v_lo) / (v_hi - v_lo)
|
||||||
|
return int(soc_lo + t * (soc_hi - soc_lo))
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
class BatteryNode(Node):
|
||||||
|
"""Battery management and monitoring node."""
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
super().__init__("battery_node")
|
||||||
|
|
||||||
|
# ── Parameters ────────────────────────────────────────────────────────
|
||||||
|
self.declare_parameter("db_path", "/var/log/saltybot/battery.db")
|
||||||
|
self.declare_parameter("cell_count", 3)
|
||||||
|
self.declare_parameter("warn_pct", 20.0)
|
||||||
|
self.declare_parameter("critical_pct", 10.0)
|
||||||
|
self.declare_parameter("emergency_pct", 5.0)
|
||||||
|
self.declare_parameter("warn_speed_factor", 0.6)
|
||||||
|
self.declare_parameter("critical_speed_factor", 0.3)
|
||||||
|
self.declare_parameter("charge_detect_ma", -100)
|
||||||
|
self.declare_parameter("history_retention_days", 7)
|
||||||
|
self.declare_parameter("publish_rate_hz", 1.0)
|
||||||
|
|
||||||
|
self._db_path = self.get_parameter("db_path").value
|
||||||
|
self._cell_count = self.get_parameter("cell_count").value
|
||||||
|
self._warn_pct = self.get_parameter("warn_pct").value
|
||||||
|
self._critical_pct = self.get_parameter("critical_pct").value
|
||||||
|
self._emergency_pct = self.get_parameter("emergency_pct").value
|
||||||
|
self._warn_speed_factor = self.get_parameter("warn_speed_factor").value
|
||||||
|
self._critical_speed_factor = self.get_parameter("critical_speed_factor").value
|
||||||
|
self._charge_detect_ma = self.get_parameter("charge_detect_ma").value
|
||||||
|
self._retention_days = self.get_parameter("history_retention_days").value
|
||||||
|
publish_rate = self.get_parameter("publish_rate_hz").value
|
||||||
|
|
||||||
|
# ── QoS ───────────────────────────────────────────────────────────────
|
||||||
|
rel_qos = QoSProfile(
|
||||||
|
reliability=ReliabilityPolicy.RELIABLE,
|
||||||
|
history=HistoryPolicy.KEEP_LAST, depth=10,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Publishers ────────────────────────────────────────────────────────
|
||||||
|
self._battery_pub = self.create_publisher(BatteryState, "/saltybot/battery", rel_qos)
|
||||||
|
self._alert_pub = self.create_publisher(String, "/saltybot/battery/alert", rel_qos)
|
||||||
|
self._speed_limit_pub = self.create_publisher(Float32, "/saltybot/speed_limit", rel_qos)
|
||||||
|
self._cmd_vel_pub = self.create_publisher(Twist, "/cmd_vel", rel_qos)
|
||||||
|
|
||||||
|
# ── Subscribers ───────────────────────────────────────────────────────
|
||||||
|
self._telem_sub = self.create_subscription(
|
||||||
|
String, "/saltybot/telemetry/battery",
|
||||||
|
self._on_battery_telem, rel_qos,
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Arm service client (for emergency stop) ───────────────────────────
|
||||||
|
self._arm_client = self.create_client(SetBool, "/saltybot/arm")
|
||||||
|
|
||||||
|
# ── State ─────────────────────────────────────────────────────────────
|
||||||
|
self._last_voltage_v = 0.0
|
||||||
|
self._last_current_ma = 0
|
||||||
|
self._last_soc_pct = 100
|
||||||
|
self._last_alert = _ALERT_NONE
|
||||||
|
self._charging = False
|
||||||
|
self._emergency_fired = False # prevent repeated disarm calls
|
||||||
|
|
||||||
|
# ── SQLite history ─────────────────────────────────────────────────────
|
||||||
|
self._db: Optional[sqlite3.Connection] = None
|
||||||
|
self._init_db()
|
||||||
|
|
||||||
|
# ── Timers ────────────────────────────────────────────────────────────
|
||||||
|
self._publish_timer = self.create_timer(1.0 / publish_rate, self._publish_cb)
|
||||||
|
self._prune_timer = self.create_timer(3600.0, self._prune_history) # hourly
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"battery_node started — warn={self._warn_pct}% "
|
||||||
|
f"critical={self._critical_pct}% "
|
||||||
|
f"emergency={self._emergency_pct}% "
|
||||||
|
f"cells={self._cell_count}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── SQLite ────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _init_db(self) -> None:
|
||||||
|
"""Create the SQLite DB and battery_history table if not present."""
|
||||||
|
try:
|
||||||
|
db_dir = os.path.dirname(self._db_path)
|
||||||
|
if db_dir:
|
||||||
|
os.makedirs(db_dir, exist_ok=True)
|
||||||
|
self._db = sqlite3.connect(
|
||||||
|
self._db_path, check_same_thread=False, timeout=5.0
|
||||||
|
)
|
||||||
|
self._db.execute("""
|
||||||
|
CREATE TABLE IF NOT EXISTS battery_history (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
ts REAL NOT NULL,
|
||||||
|
voltage_v REAL NOT NULL,
|
||||||
|
current_ma INTEGER NOT NULL,
|
||||||
|
soc_pct INTEGER NOT NULL,
|
||||||
|
charging INTEGER NOT NULL,
|
||||||
|
alert_level TEXT NOT NULL
|
||||||
|
)
|
||||||
|
""")
|
||||||
|
self._db.execute(
|
||||||
|
"CREATE INDEX IF NOT EXISTS idx_ts ON battery_history(ts)"
|
||||||
|
)
|
||||||
|
self._db.commit()
|
||||||
|
self.get_logger().info(f"Battery history DB: {self._db_path}")
|
||||||
|
except Exception as exc:
|
||||||
|
self.get_logger().warn(f"Cannot open battery DB ({exc}) — logging disabled")
|
||||||
|
self._db = None
|
||||||
|
|
||||||
|
def _log_to_db(self, voltage_v: float, current_ma: int,
|
||||||
|
soc_pct: int, charging: bool, alert: str) -> None:
|
||||||
|
if self._db is None:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
self._db.execute(
|
||||||
|
"INSERT INTO battery_history(ts,voltage_v,current_ma,soc_pct,charging,alert_level) "
|
||||||
|
"VALUES (?,?,?,?,?,?)",
|
||||||
|
(time.time(), voltage_v, current_ma, soc_pct, int(charging), alert),
|
||||||
|
)
|
||||||
|
self._db.commit()
|
||||||
|
except Exception as exc:
|
||||||
|
self.get_logger().warn(f"Battery DB write error: {exc}")
|
||||||
|
|
||||||
|
def _prune_history(self) -> None:
|
||||||
|
"""Delete records older than history_retention_days."""
|
||||||
|
if self._db is None:
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
cutoff = time.time() - self._retention_days * 86400
|
||||||
|
self._db.execute("DELETE FROM battery_history WHERE ts < ?", (cutoff,))
|
||||||
|
self._db.commit()
|
||||||
|
except Exception as exc:
|
||||||
|
self.get_logger().warn(f"Battery DB prune error: {exc}")
|
||||||
|
|
||||||
|
# ── Telemetry receive ─────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _on_battery_telem(self, msg: String) -> None:
|
||||||
|
"""Parse JSON battery telemetry and update internal state."""
|
||||||
|
try:
|
||||||
|
data = json.loads(msg.data)
|
||||||
|
voltage_mv = int(data["voltage_mv"])
|
||||||
|
current_ma = int(data["current_ma"])
|
||||||
|
soc_from_fw = int(data.get("soc_pct", 0))
|
||||||
|
except (json.JSONDecodeError, KeyError, ValueError) as exc:
|
||||||
|
self.get_logger().warn(f"Bad battery telemetry: {exc}")
|
||||||
|
return
|
||||||
|
|
||||||
|
voltage_v = voltage_mv / 1000.0
|
||||||
|
charging = current_ma < self._charge_detect_ma
|
||||||
|
|
||||||
|
# SoC: prefer firmware value; fall back to voltage curve
|
||||||
|
if soc_from_fw > 0:
|
||||||
|
soc_pct = soc_from_fw
|
||||||
|
else:
|
||||||
|
soc_pct = _soc_from_voltage(voltage_v, self._cell_count)
|
||||||
|
|
||||||
|
self._last_voltage_v = voltage_v
|
||||||
|
self._last_current_ma = current_ma
|
||||||
|
self._last_soc_pct = soc_pct
|
||||||
|
self._charging = charging
|
||||||
|
|
||||||
|
# Check thresholds and act
|
||||||
|
new_alert = self._compute_alert(soc_pct)
|
||||||
|
if new_alert != self._last_alert:
|
||||||
|
self._handle_alert_change(new_alert, soc_pct, voltage_v)
|
||||||
|
|
||||||
|
# Log to SQLite
|
||||||
|
self._log_to_db(voltage_v, current_ma, soc_pct, charging, new_alert)
|
||||||
|
|
||||||
|
# ── Alert handling ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _compute_alert(self, soc_pct: int) -> str:
|
||||||
|
"""Return the current alert level string for a given SoC."""
|
||||||
|
if soc_pct <= self._emergency_pct:
|
||||||
|
return _ALERT_EMERGENCY
|
||||||
|
if soc_pct <= self._critical_pct:
|
||||||
|
return _ALERT_CRITICAL
|
||||||
|
if soc_pct <= self._warn_pct:
|
||||||
|
return _ALERT_WARN
|
||||||
|
return _ALERT_NONE
|
||||||
|
|
||||||
|
def _handle_alert_change(self, new_alert: str, soc_pct: int, voltage_v: float) -> None:
|
||||||
|
"""Act on threshold transition."""
|
||||||
|
prev = self._last_alert
|
||||||
|
self._last_alert = new_alert
|
||||||
|
|
||||||
|
msg_text = (
|
||||||
|
f"Battery {new_alert.upper()}: {soc_pct}% SoC / {voltage_v:.2f}V "
|
||||||
|
f"(was {prev})"
|
||||||
|
)
|
||||||
|
|
||||||
|
if new_alert == _ALERT_NONE:
|
||||||
|
# Recovery (e.g., plugged in for charging)
|
||||||
|
self.get_logger().info(f"Battery OK: {soc_pct}% SoC")
|
||||||
|
self._publish_speed_limit(1.0)
|
||||||
|
|
||||||
|
elif new_alert == _ALERT_WARN:
|
||||||
|
self.get_logger().warn(msg_text)
|
||||||
|
self._publish_speed_limit(self._warn_speed_factor)
|
||||||
|
|
||||||
|
elif new_alert == _ALERT_CRITICAL:
|
||||||
|
self.get_logger().error(msg_text)
|
||||||
|
self._publish_speed_limit(self._critical_speed_factor)
|
||||||
|
|
||||||
|
elif new_alert == _ALERT_EMERGENCY:
|
||||||
|
self.get_logger().fatal(msg_text)
|
||||||
|
self._publish_speed_limit(0.0)
|
||||||
|
if not self._emergency_fired:
|
||||||
|
self._emergency_fired = True
|
||||||
|
self._execute_safe_stop()
|
||||||
|
|
||||||
|
# Publish alert message
|
||||||
|
alert_payload = json.dumps({
|
||||||
|
"level": new_alert,
|
||||||
|
"soc_pct": soc_pct,
|
||||||
|
"voltage_v": round(voltage_v, 3),
|
||||||
|
"prev_level": prev,
|
||||||
|
"ts": time.time(),
|
||||||
|
})
|
||||||
|
alert_msg = String()
|
||||||
|
alert_msg.data = alert_payload
|
||||||
|
self._alert_pub.publish(alert_msg)
|
||||||
|
|
||||||
|
def _publish_speed_limit(self, factor: float) -> None:
|
||||||
|
"""Publish normalised speed limit factor 0.0–1.0."""
|
||||||
|
msg = Float32()
|
||||||
|
msg.data = float(factor)
|
||||||
|
self._speed_limit_pub.publish(msg)
|
||||||
|
|
||||||
|
def _execute_safe_stop(self) -> None:
|
||||||
|
"""Send zero /cmd_vel and disarm the STM32."""
|
||||||
|
self.get_logger().fatal("EMERGENCY: publishing zero /cmd_vel and disarming")
|
||||||
|
# Publish zero velocity
|
||||||
|
zero_twist = Twist()
|
||||||
|
self._cmd_vel_pub.publish(zero_twist)
|
||||||
|
|
||||||
|
# Call /saltybot/arm(False) if service is available
|
||||||
|
if self._arm_client.service_is_ready():
|
||||||
|
req = SetBool.Request()
|
||||||
|
req.data = False
|
||||||
|
self._arm_client.call_async(req)
|
||||||
|
else:
|
||||||
|
self.get_logger().warn(
|
||||||
|
"/saltybot/arm service not available for emergency disarm"
|
||||||
|
)
|
||||||
|
|
||||||
|
# ── Periodic publish ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def _publish_cb(self) -> None:
|
||||||
|
"""Publish sensor_msgs/BatteryState at configured rate."""
|
||||||
|
if self._last_voltage_v == 0.0:
|
||||||
|
return # no telemetry received yet
|
||||||
|
|
||||||
|
msg = BatteryState()
|
||||||
|
msg.header.stamp = self.get_clock().now().to_msg()
|
||||||
|
msg.header.frame_id = "battery"
|
||||||
|
|
||||||
|
msg.voltage = self._last_voltage_v
|
||||||
|
msg.current = self._last_current_ma / 1000.0 # mA → A
|
||||||
|
msg.percentage = self._last_soc_pct / 100.0 # 0.0–1.0
|
||||||
|
|
||||||
|
# Capacity: 3S 5000 mAh pack (configurable in future)
|
||||||
|
msg.design_capacity = 5.0 # Ah
|
||||||
|
msg.capacity = float("nan")
|
||||||
|
msg.charge = float("nan")
|
||||||
|
msg.temperature = float("nan")
|
||||||
|
|
||||||
|
if self._charging:
|
||||||
|
msg.power_supply_status = BatteryState.POWER_SUPPLY_STATUS_CHARGING
|
||||||
|
elif self._last_soc_pct >= 99:
|
||||||
|
msg.power_supply_status = BatteryState.POWER_SUPPLY_STATUS_FULL
|
||||||
|
else:
|
||||||
|
msg.power_supply_status = BatteryState.POWER_SUPPLY_STATUS_DISCHARGING
|
||||||
|
|
||||||
|
msg.power_supply_technology = BatteryState.POWER_SUPPLY_TECHNOLOGY_LIPO
|
||||||
|
msg.power_supply_health = (
|
||||||
|
BatteryState.POWER_SUPPLY_HEALTH_GOOD
|
||||||
|
if self._last_alert in (_ALERT_NONE, _ALERT_WARN)
|
||||||
|
else BatteryState.POWER_SUPPLY_HEALTH_UNSPEC_FAILURE
|
||||||
|
)
|
||||||
|
msg.present = True
|
||||||
|
|
||||||
|
# Cell voltages (estimated, equal distribution assumption)
|
||||||
|
if self._cell_count > 0:
|
||||||
|
cell_v = self._last_voltage_v / self._cell_count
|
||||||
|
msg.cell_voltage = [cell_v] * self._cell_count
|
||||||
|
|
||||||
|
# Location field: alert level for easy monitoring
|
||||||
|
msg.location = self._last_alert
|
||||||
|
|
||||||
|
self._battery_pub.publish(msg)
|
||||||
|
|
||||||
|
# ── Lifecycle ─────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
def destroy_node(self) -> None:
|
||||||
|
if self._db:
|
||||||
|
try:
|
||||||
|
self._db.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
super().destroy_node()
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None) -> None:
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = BatteryNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
429
jetson/ros2_ws/src/saltybot_bridge/test/test_battery.py
Normal file
429
jetson/ros2_ws/src/saltybot_bridge/test/test_battery.py
Normal file
@ -0,0 +1,429 @@
|
|||||||
|
"""test_battery.py — Unit tests for battery_node SoC, alerts, and SQLite logging.
|
||||||
|
|
||||||
|
Tests:
|
||||||
|
- SoC lookup table: known voltages map to expected percentages
|
||||||
|
- SoC interpolation: midpoint between curve entries
|
||||||
|
- SoC clamping: below min → 0%, above max → 100%
|
||||||
|
- Alert level computation: correct level for various SoC values
|
||||||
|
- Alert threshold transitions: NONE→WARN→CRITICAL→EMERGENCY
|
||||||
|
- Speed factor assignments: correct factor per alert level
|
||||||
|
- Charging detection: current_ma < threshold → charging
|
||||||
|
- sensor_msgs/BatteryState field population
|
||||||
|
- SQLite logging: records inserted with correct fields
|
||||||
|
- SQLite pruning: old records removed
|
||||||
|
- Voltage-based SoC fallback when firmware SoC == 0
|
||||||
|
- Multi-cell voltage scaling (3S, 4S)
|
||||||
|
|
||||||
|
Run with: pytest test/test_battery.py -v
|
||||||
|
No ROS2 runtime required.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import sqlite3
|
||||||
|
import sys
|
||||||
|
import tempfile
|
||||||
|
import time
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
||||||
|
|
||||||
|
from saltybot_bridge.battery_node import (
|
||||||
|
_soc_from_voltage,
|
||||||
|
_ALERT_NONE, _ALERT_WARN, _ALERT_CRITICAL, _ALERT_EMERGENCY,
|
||||||
|
_SOC_CURVE_PER_CELL,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
# SoC voltage lookup table
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
|
||||||
|
class TestSocLookup:
|
||||||
|
|
||||||
|
def test_full_voltage_returns_100(self):
|
||||||
|
"""4.20 V/cell (3-cell = 12.60 V) → 100%."""
|
||||||
|
assert _soc_from_voltage(12.60, cell_count=3) == 100
|
||||||
|
|
||||||
|
def test_below_min_returns_0(self):
|
||||||
|
"""Below 3.0 V/cell → 0%."""
|
||||||
|
assert _soc_from_voltage(8.0, cell_count=3) == 0
|
||||||
|
|
||||||
|
def test_nominal_voltage_returns_50ish(self):
|
||||||
|
"""3S at ~11.1 V (3.7 V/cell) should be near 50%."""
|
||||||
|
soc = _soc_from_voltage(11.10, cell_count=3)
|
||||||
|
assert 45 <= soc <= 55
|
||||||
|
|
||||||
|
def test_warn_threshold_voltage_single_cell(self):
|
||||||
|
"""Single-cell 3.53 V is on the 20% curve point."""
|
||||||
|
assert _soc_from_voltage(3.53, cell_count=1) == 20
|
||||||
|
|
||||||
|
def test_critical_threshold_voltage(self):
|
||||||
|
"""3.37 V/cell → 10%."""
|
||||||
|
assert _soc_from_voltage(3.37, cell_count=1) == 10
|
||||||
|
|
||||||
|
def test_emergency_threshold_voltage(self):
|
||||||
|
"""3.27 V/cell → 5%."""
|
||||||
|
assert _soc_from_voltage(3.27, cell_count=1) == 5
|
||||||
|
|
||||||
|
def test_4s_pack_full(self):
|
||||||
|
"""4S fully charged: 4 × 4.20 = 16.80 V → 100%."""
|
||||||
|
assert _soc_from_voltage(16.80, cell_count=4) == 100
|
||||||
|
|
||||||
|
def test_4s_pack_nominal(self):
|
||||||
|
"""4S nominal: 4 × 3.7 = 14.8 V → ~50%."""
|
||||||
|
soc = _soc_from_voltage(14.80, cell_count=4)
|
||||||
|
assert 45 <= soc <= 55
|
||||||
|
|
||||||
|
def test_interpolation_midpoint(self):
|
||||||
|
"""Midpoint between two curve entries should yield midpoint SoC."""
|
||||||
|
# Between (3.27, 5) and (3.37, 10): midpoint ≈ 3.32 V → ~7.5%
|
||||||
|
mid_v = (3.27 + 3.37) / 2 * 3 # 3-cell pack voltage
|
||||||
|
soc = _soc_from_voltage(mid_v, cell_count=3)
|
||||||
|
assert 5 < soc < 10
|
||||||
|
|
||||||
|
def test_exactly_at_curve_point(self):
|
||||||
|
"""Values exactly on a curve point must return the tabulated SoC."""
|
||||||
|
for cell_v, expected_soc in _SOC_CURVE_PER_CELL:
|
||||||
|
pack_v = cell_v * 3 # 3-cell
|
||||||
|
result = _soc_from_voltage(pack_v, cell_count=3)
|
||||||
|
# Allow ±1 for floating-point interpolation edge
|
||||||
|
assert abs(result - expected_soc) <= 1, \
|
||||||
|
f"At {cell_v}V/cell ({pack_v}V 3S): expected {expected_soc}%, got {result}%"
|
||||||
|
|
||||||
|
def test_zero_cell_count_returns_0(self):
|
||||||
|
assert _soc_from_voltage(12.0, cell_count=0) == 0
|
||||||
|
|
||||||
|
def test_above_max_clamped_to_100(self):
|
||||||
|
"""Overcharged (e.g., 4.5 V/cell) → clamped to 100%."""
|
||||||
|
assert _soc_from_voltage(15.0, cell_count=3) == 100
|
||||||
|
|
||||||
|
def test_zero_voltage_returns_0(self):
|
||||||
|
assert _soc_from_voltage(0.0, cell_count=3) == 0
|
||||||
|
|
||||||
|
def test_returns_integer(self):
|
||||||
|
result = _soc_from_voltage(11.1, cell_count=3)
|
||||||
|
assert isinstance(result, int)
|
||||||
|
|
||||||
|
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
# Alert level computation
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
|
||||||
|
class TestAlertComputation:
|
||||||
|
"""Test _compute_alert() logic (instantiated via a minimal stub)."""
|
||||||
|
|
||||||
|
# Mirror of BatteryNode._compute_alert() for unit testing
|
||||||
|
@staticmethod
|
||||||
|
def _compute_alert(soc_pct, warn=20.0, crit=10.0, emergency=5.0):
|
||||||
|
if soc_pct <= emergency:
|
||||||
|
return _ALERT_EMERGENCY
|
||||||
|
if soc_pct <= crit:
|
||||||
|
return _ALERT_CRITICAL
|
||||||
|
if soc_pct <= warn:
|
||||||
|
return _ALERT_WARN
|
||||||
|
return _ALERT_NONE
|
||||||
|
|
||||||
|
def test_full_is_ok(self):
|
||||||
|
assert self._compute_alert(100) == _ALERT_NONE
|
||||||
|
|
||||||
|
def test_above_warn_is_ok(self):
|
||||||
|
assert self._compute_alert(21) == _ALERT_NONE
|
||||||
|
|
||||||
|
def test_at_warn_threshold(self):
|
||||||
|
assert self._compute_alert(20) == _ALERT_WARN
|
||||||
|
|
||||||
|
def test_between_warn_and_critical(self):
|
||||||
|
assert self._compute_alert(15) == _ALERT_WARN
|
||||||
|
|
||||||
|
def test_at_critical_threshold(self):
|
||||||
|
assert self._compute_alert(10) == _ALERT_CRITICAL
|
||||||
|
|
||||||
|
def test_between_critical_and_emergency(self):
|
||||||
|
assert self._compute_alert(7) == _ALERT_CRITICAL
|
||||||
|
|
||||||
|
def test_at_emergency_threshold(self):
|
||||||
|
assert self._compute_alert(5) == _ALERT_EMERGENCY
|
||||||
|
|
||||||
|
def test_below_emergency(self):
|
||||||
|
assert self._compute_alert(1) == _ALERT_EMERGENCY
|
||||||
|
|
||||||
|
def test_zero_is_emergency(self):
|
||||||
|
assert self._compute_alert(0) == _ALERT_EMERGENCY
|
||||||
|
|
||||||
|
def test_custom_thresholds(self):
|
||||||
|
assert self._compute_alert(25, warn=30, crit=15, emergency=8) == _ALERT_WARN
|
||||||
|
assert self._compute_alert(13, warn=30, crit=15, emergency=8) == _ALERT_WARN
|
||||||
|
assert self._compute_alert(12, warn=30, crit=15, emergency=8) == _ALERT_CRITICAL
|
||||||
|
|
||||||
|
def test_speed_factor_at_warn(self):
|
||||||
|
"""Speed factor at WARN must be 0 < factor < 1.0."""
|
||||||
|
factor = 0.6 # warn_speed_factor default
|
||||||
|
assert 0.0 < factor < 1.0
|
||||||
|
|
||||||
|
def test_speed_factor_at_critical(self):
|
||||||
|
factor = 0.3 # critical_speed_factor default
|
||||||
|
assert 0.0 < factor < 1.0
|
||||||
|
|
||||||
|
def test_speed_factor_at_emergency(self):
|
||||||
|
factor = 0.0 # full stop
|
||||||
|
assert factor == 0.0
|
||||||
|
|
||||||
|
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
# Charging detection
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
|
||||||
|
class TestChargingDetection:
|
||||||
|
|
||||||
|
THRESHOLD_MA = -100
|
||||||
|
|
||||||
|
def _is_charging(self, current_ma):
|
||||||
|
return current_ma < self.THRESHOLD_MA
|
||||||
|
|
||||||
|
def test_discharging_positive(self):
|
||||||
|
assert not self._is_charging(500)
|
||||||
|
|
||||||
|
def test_discharging_zero(self):
|
||||||
|
assert not self._is_charging(0)
|
||||||
|
|
||||||
|
def test_just_below_threshold_is_charging(self):
|
||||||
|
assert self._is_charging(-101)
|
||||||
|
|
||||||
|
def test_at_threshold_not_charging(self):
|
||||||
|
assert not self._is_charging(-100)
|
||||||
|
|
||||||
|
def test_deeply_negative_is_charging(self):
|
||||||
|
assert self._is_charging(-2000) # fast charge
|
||||||
|
|
||||||
|
def test_regen_braking_not_charging(self):
|
||||||
|
# Small negative current during regen isn't real charging
|
||||||
|
assert not self._is_charging(-50)
|
||||||
|
|
||||||
|
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
# SQLite history logging
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
|
||||||
|
class TestSqliteLogging:
|
||||||
|
"""Test the SQLite DB operations directly (no ROS2)."""
|
||||||
|
|
||||||
|
def _make_db(self):
|
||||||
|
"""Return an in-memory SQLite connection with the battery_history table."""
|
||||||
|
db = sqlite3.connect(":memory:")
|
||||||
|
db.execute("""
|
||||||
|
CREATE TABLE battery_history (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
ts REAL NOT NULL,
|
||||||
|
voltage_v REAL NOT NULL,
|
||||||
|
current_ma INTEGER NOT NULL,
|
||||||
|
soc_pct INTEGER NOT NULL,
|
||||||
|
charging INTEGER NOT NULL,
|
||||||
|
alert_level TEXT NOT NULL
|
||||||
|
)
|
||||||
|
""")
|
||||||
|
db.execute("CREATE INDEX idx_ts ON battery_history(ts)")
|
||||||
|
db.commit()
|
||||||
|
return db
|
||||||
|
|
||||||
|
def _insert(self, db, ts, voltage_v, current_ma, soc_pct, charging, alert):
|
||||||
|
db.execute(
|
||||||
|
"INSERT INTO battery_history(ts,voltage_v,current_ma,soc_pct,charging,alert_level) "
|
||||||
|
"VALUES (?,?,?,?,?,?)",
|
||||||
|
(ts, voltage_v, current_ma, soc_pct, int(charging), alert)
|
||||||
|
)
|
||||||
|
db.commit()
|
||||||
|
|
||||||
|
def test_insert_and_retrieve(self):
|
||||||
|
db = self._make_db()
|
||||||
|
self._insert(db, time.time(), 11.1, 500, 50, False, _ALERT_NONE)
|
||||||
|
row = db.execute("SELECT * FROM battery_history").fetchone()
|
||||||
|
assert row is not None
|
||||||
|
assert row[2] == pytest.approx(11.1)
|
||||||
|
assert row[3] == 500
|
||||||
|
assert row[4] == 50
|
||||||
|
assert row[5] == 0 # not charging
|
||||||
|
assert row[6] == _ALERT_NONE
|
||||||
|
|
||||||
|
def test_insert_charging_event(self):
|
||||||
|
db = self._make_db()
|
||||||
|
self._insert(db, time.time(), 12.0, -1500, 85, True, _ALERT_NONE)
|
||||||
|
row = db.execute("SELECT charging FROM battery_history").fetchone()
|
||||||
|
assert row[0] == 1 # charging = True
|
||||||
|
|
||||||
|
def test_insert_alert_event(self):
|
||||||
|
db = self._make_db()
|
||||||
|
self._insert(db, time.time(), 10.5, 300, 18, False, _ALERT_WARN)
|
||||||
|
row = db.execute("SELECT alert_level FROM battery_history").fetchone()
|
||||||
|
assert row[0] == _ALERT_WARN
|
||||||
|
|
||||||
|
def test_prune_old_records(self):
|
||||||
|
db = self._make_db()
|
||||||
|
old_ts = time.time() - 8 * 86400 # 8 days ago (>7 day retention)
|
||||||
|
new_ts = time.time()
|
||||||
|
self._insert(db, old_ts, 11.0, 100, 60, False, _ALERT_NONE)
|
||||||
|
self._insert(db, new_ts, 11.5, 200, 70, False, _ALERT_NONE)
|
||||||
|
|
||||||
|
retention_days = 7
|
||||||
|
cutoff = time.time() - retention_days * 86400
|
||||||
|
db.execute("DELETE FROM battery_history WHERE ts < ?", (cutoff,))
|
||||||
|
db.commit()
|
||||||
|
|
||||||
|
rows = db.execute("SELECT COUNT(*) FROM battery_history").fetchone()
|
||||||
|
assert rows[0] == 1 # only new record remains
|
||||||
|
|
||||||
|
def test_multiple_inserts_sequential_ids(self):
|
||||||
|
db = self._make_db()
|
||||||
|
for i in range(5):
|
||||||
|
self._insert(db, time.time() + i, 11.0 + i * 0.1, 100, 50 + i, False, _ALERT_NONE)
|
||||||
|
count = db.execute("SELECT COUNT(*) FROM battery_history").fetchone()[0]
|
||||||
|
assert count == 5
|
||||||
|
|
||||||
|
def test_db_on_disk(self):
|
||||||
|
"""Write and read a real on-disk SQLite DB."""
|
||||||
|
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
|
||||||
|
path = f.name
|
||||||
|
try:
|
||||||
|
db = sqlite3.connect(path)
|
||||||
|
db.execute("""CREATE TABLE battery_history (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
ts REAL, voltage_v REAL, current_ma INTEGER,
|
||||||
|
soc_pct INTEGER, charging INTEGER, alert_level TEXT
|
||||||
|
)""")
|
||||||
|
db.execute(
|
||||||
|
"INSERT INTO battery_history VALUES (NULL,?,?,?,?,?,?)",
|
||||||
|
(time.time(), 11.5, 500, 70, 0, _ALERT_NONE)
|
||||||
|
)
|
||||||
|
db.commit()
|
||||||
|
db.close()
|
||||||
|
|
||||||
|
# Reopen and verify
|
||||||
|
db2 = sqlite3.connect(path)
|
||||||
|
row = db2.execute("SELECT soc_pct FROM battery_history").fetchone()
|
||||||
|
assert row[0] == 70
|
||||||
|
db2.close()
|
||||||
|
finally:
|
||||||
|
os.unlink(path)
|
||||||
|
|
||||||
|
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
# BatteryState field population
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
|
||||||
|
class TestBatteryStateFields:
|
||||||
|
"""Verify sensor_msgs/BatteryState field logic without a live node."""
|
||||||
|
|
||||||
|
def _make_state(self, voltage_v, current_ma, soc_pct, charging,
|
||||||
|
cell_count=3, alert=_ALERT_NONE):
|
||||||
|
"""Minimal BatteryState-like dict mirrors _publish_cb logic."""
|
||||||
|
return {
|
||||||
|
"voltage": voltage_v,
|
||||||
|
"current": current_ma / 1000.0,
|
||||||
|
"percentage": soc_pct / 100.0,
|
||||||
|
"power_supply_status": "CHARGING" if charging else (
|
||||||
|
"FULL" if soc_pct >= 99 else "DISCHARGING"
|
||||||
|
),
|
||||||
|
"location": alert,
|
||||||
|
"cell_voltage": [voltage_v / cell_count] * cell_count,
|
||||||
|
}
|
||||||
|
|
||||||
|
def test_voltage_field(self):
|
||||||
|
state = self._make_state(11.1, 500, 50, False)
|
||||||
|
assert state["voltage"] == pytest.approx(11.1)
|
||||||
|
|
||||||
|
def test_current_converted_to_amps(self):
|
||||||
|
state = self._make_state(11.1, 1500, 50, False)
|
||||||
|
assert state["current"] == pytest.approx(1.5)
|
||||||
|
|
||||||
|
def test_percentage_normalised(self):
|
||||||
|
state = self._make_state(11.1, 500, 75, False)
|
||||||
|
assert state["percentage"] == pytest.approx(0.75)
|
||||||
|
|
||||||
|
def test_charging_status(self):
|
||||||
|
state = self._make_state(12.0, -1500, 85, True)
|
||||||
|
assert state["power_supply_status"] == "CHARGING"
|
||||||
|
|
||||||
|
def test_discharging_status(self):
|
||||||
|
state = self._make_state(11.1, 500, 50, False)
|
||||||
|
assert state["power_supply_status"] == "DISCHARGING"
|
||||||
|
|
||||||
|
def test_full_status_at_99_pct(self):
|
||||||
|
state = self._make_state(12.5, 0, 99, False)
|
||||||
|
assert state["power_supply_status"] == "FULL"
|
||||||
|
|
||||||
|
def test_cell_voltages_equal_distribution(self):
|
||||||
|
state = self._make_state(12.0, 0, 80, False, cell_count=3)
|
||||||
|
assert len(state["cell_voltage"]) == 3
|
||||||
|
for cv in state["cell_voltage"]:
|
||||||
|
assert cv == pytest.approx(4.0)
|
||||||
|
|
||||||
|
def test_alert_in_location_field(self):
|
||||||
|
state = self._make_state(10.5, 300, 18, False, alert=_ALERT_WARN)
|
||||||
|
assert state["location"] == _ALERT_WARN
|
||||||
|
|
||||||
|
def test_zero_percentage_at_empty(self):
|
||||||
|
state = self._make_state(9.6, 50, 0, False)
|
||||||
|
assert state["percentage"] == pytest.approx(0.0)
|
||||||
|
|
||||||
|
def test_full_percentage(self):
|
||||||
|
state = self._make_state(12.6, 0, 100, False)
|
||||||
|
assert state["percentage"] == pytest.approx(1.0)
|
||||||
|
|
||||||
|
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
# Telemetry JSON parsing
|
||||||
|
# ═══════════════════════════════════════════════════════════════════
|
||||||
|
|
||||||
|
class TestTelemetryParsing:
|
||||||
|
"""Test the JSON parsing logic that the battery_node uses."""
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _parse(json_str: str, charge_threshold: int = -100):
|
||||||
|
data = json.loads(json_str)
|
||||||
|
voltage_mv = int(data["voltage_mv"])
|
||||||
|
current_ma = int(data["current_ma"])
|
||||||
|
soc_from_fw = int(data.get("soc_pct", 0))
|
||||||
|
voltage_v = voltage_mv / 1000.0
|
||||||
|
charging = current_ma < charge_threshold
|
||||||
|
soc_pct = soc_from_fw if soc_from_fw > 0 else \
|
||||||
|
_soc_from_voltage(voltage_v, cell_count=3)
|
||||||
|
return voltage_v, current_ma, soc_pct, charging
|
||||||
|
|
||||||
|
def test_normal_discharge_telemetry(self):
|
||||||
|
v, i, soc, chg = self._parse(
|
||||||
|
'{"voltage_mv":11100,"current_ma":1000,"soc_pct":55}'
|
||||||
|
)
|
||||||
|
assert v == pytest.approx(11.1)
|
||||||
|
assert i == 1000
|
||||||
|
assert soc == 55 # from firmware
|
||||||
|
assert not chg
|
||||||
|
|
||||||
|
def test_charging_telemetry(self):
|
||||||
|
v, i, soc, chg = self._parse(
|
||||||
|
'{"voltage_mv":12000,"current_ma":-1500,"soc_pct":80}'
|
||||||
|
)
|
||||||
|
assert v == pytest.approx(12.0)
|
||||||
|
assert i == -1500
|
||||||
|
assert chg
|
||||||
|
|
||||||
|
def test_firmware_soc_zero_uses_voltage_curve(self):
|
||||||
|
"""If soc_pct == 0, fall back to voltage curve."""
|
||||||
|
v, i, soc, chg = self._parse(
|
||||||
|
'{"voltage_mv":11100,"current_ma":500,"soc_pct":0}'
|
||||||
|
)
|
||||||
|
# soc_pct=0 from firmware → use voltage 11.1V / 3 cells → ~50%
|
||||||
|
assert 45 <= soc <= 55
|
||||||
|
|
||||||
|
def test_missing_soc_falls_back_to_voltage(self):
|
||||||
|
"""soc_pct field absent → use voltage lookup."""
|
||||||
|
v, i, soc, chg = self._parse(
|
||||||
|
'{"voltage_mv":10500,"current_ma":300}'
|
||||||
|
)
|
||||||
|
assert soc < 25 # 10.5V (3.5V/cell) → ~20%
|
||||||
|
|
||||||
|
def test_parse_error_on_missing_fields(self):
|
||||||
|
with pytest.raises(KeyError):
|
||||||
|
self._parse('{"current_ma":500}') # missing voltage_mv
|
||||||
Loading…
x
Reference in New Issue
Block a user