Compare commits
No commits in common. "9281f3bc44d7ebca94026ed9da6d0ff7e0e1b9ee" and "6da4ae885db94ec73fdf236d9a7647e7720f1bda" have entirely different histories.
9281f3bc44
...
6da4ae885d
@ -1,24 +0,0 @@
|
|||||||
# battery_params.yaml — Battery management node configuration (Issue #125)
|
|
||||||
|
|
||||||
# ── SQLite history ─────────────────────────────────────────────────────────────
|
|
||||||
db_path: /var/log/saltybot/battery.db
|
|
||||||
history_retention_days: 7 # delete records older than N days
|
|
||||||
|
|
||||||
# ── Battery pack ──────────────────────────────────────────────────────────────
|
|
||||||
cell_count: 3 # 3S LiPo (11.1V nominal, 12.6V full)
|
|
||||||
|
|
||||||
# ── SoC alert thresholds ──────────────────────────────────────────────────────
|
|
||||||
warn_pct: 20.0 # WARNING: reduce speed to warn_speed_factor
|
|
||||||
critical_pct: 10.0 # CRITICAL: reduce speed to critical_speed_factor
|
|
||||||
emergency_pct: 5.0 # EMERGENCY: full stop + disarm
|
|
||||||
|
|
||||||
# ── Speed reduction factors (0.0 = stop, 1.0 = no reduction) ─────────────────
|
|
||||||
warn_speed_factor: 0.6 # 60% of normal speed at WARNING
|
|
||||||
critical_speed_factor: 0.3 # 30% of normal speed at CRITICAL
|
|
||||||
|
|
||||||
# ── Charging detection ────────────────────────────────────────────────────────
|
|
||||||
# current_ma below this value → charging detected
|
|
||||||
charge_detect_ma: -100 # -100 mA threshold (small negative allowed for noise)
|
|
||||||
|
|
||||||
# ── Publish rate ──────────────────────────────────────────────────────────────
|
|
||||||
publish_rate_hz: 1.0 # /saltybot/battery publish frequency
|
|
||||||
@ -1,59 +0,0 @@
|
|||||||
"""battery.launch.py — Launch the battery management node (Issue #125).
|
|
||||||
|
|
||||||
Usage:
|
|
||||||
ros2 launch saltybot_bridge battery.launch.py
|
|
||||||
|
|
||||||
# Custom thresholds:
|
|
||||||
ros2 launch saltybot_bridge battery.launch.py \
|
|
||||||
warn_pct:=25.0 critical_pct:=12.0 emergency_pct:=6.0
|
|
||||||
|
|
||||||
# Custom DB path:
|
|
||||||
ros2 launch saltybot_bridge battery.launch.py \
|
|
||||||
db_path:=/mnt/nvme/saltybot/battery.db
|
|
||||||
"""
|
|
||||||
|
|
||||||
import os
|
|
||||||
from ament_index_python.packages import get_package_share_directory
|
|
||||||
from launch import LaunchDescription
|
|
||||||
from launch.actions import DeclareLaunchArgument
|
|
||||||
from launch.substitutions import LaunchConfiguration
|
|
||||||
from launch_ros.actions import Node
|
|
||||||
|
|
||||||
|
|
||||||
def generate_launch_description() -> LaunchDescription:
|
|
||||||
pkg = get_package_share_directory("saltybot_bridge")
|
|
||||||
params_file = os.path.join(pkg, "config", "battery_params.yaml")
|
|
||||||
|
|
||||||
return LaunchDescription([
|
|
||||||
DeclareLaunchArgument("db_path", default_value="/var/log/saltybot/battery.db"),
|
|
||||||
DeclareLaunchArgument("cell_count", default_value="3"),
|
|
||||||
DeclareLaunchArgument("warn_pct", default_value="20.0"),
|
|
||||||
DeclareLaunchArgument("critical_pct", default_value="10.0"),
|
|
||||||
DeclareLaunchArgument("emergency_pct", default_value="5.0"),
|
|
||||||
DeclareLaunchArgument("warn_speed_factor", default_value="0.6"),
|
|
||||||
DeclareLaunchArgument("critical_speed_factor", default_value="0.3"),
|
|
||||||
DeclareLaunchArgument("charge_detect_ma", default_value="-100"),
|
|
||||||
DeclareLaunchArgument("publish_rate_hz", default_value="1.0"),
|
|
||||||
|
|
||||||
Node(
|
|
||||||
package="saltybot_bridge",
|
|
||||||
executable="battery_node",
|
|
||||||
name="battery_node",
|
|
||||||
output="screen",
|
|
||||||
emulate_tty=True,
|
|
||||||
parameters=[
|
|
||||||
params_file,
|
|
||||||
{
|
|
||||||
"db_path": LaunchConfiguration("db_path"),
|
|
||||||
"cell_count": LaunchConfiguration("cell_count"),
|
|
||||||
"warn_pct": LaunchConfiguration("warn_pct"),
|
|
||||||
"critical_pct": LaunchConfiguration("critical_pct"),
|
|
||||||
"emergency_pct": LaunchConfiguration("emergency_pct"),
|
|
||||||
"warn_speed_factor": LaunchConfiguration("warn_speed_factor"),
|
|
||||||
"critical_speed_factor": LaunchConfiguration("critical_speed_factor"),
|
|
||||||
"charge_detect_ma": LaunchConfiguration("charge_detect_ma"),
|
|
||||||
"publish_rate_hz": LaunchConfiguration("publish_rate_hz"),
|
|
||||||
},
|
|
||||||
],
|
|
||||||
),
|
|
||||||
])
|
|
||||||
@ -1,409 +0,0 @@
|
|||||||
"""battery_node.py — Battery management for saltybot (Issue #125).
|
|
||||||
|
|
||||||
Subscribes to /saltybot/telemetry/battery (JSON from stm32_cmd_node) and:
|
|
||||||
- Publishes sensor_msgs/BatteryState on /saltybot/battery
|
|
||||||
- Publishes JSON alerts on /saltybot/battery/alert at threshold crossings
|
|
||||||
- Reduces speed limit at low SoC via /saltybot/speed_limit (std_msgs/Float32)
|
|
||||||
- Commands safe stop at critical SoC via /cmd_vel zero + /saltybot/arm False
|
|
||||||
- Logs history to SQLite (default: /var/log/saltybot/battery.db)
|
|
||||||
- Detects charging (current_ma < charge_detect_threshold_ma)
|
|
||||||
|
|
||||||
Alert levels (SoC thresholds):
|
|
||||||
20% WARNING — reduce max speed to 60%
|
|
||||||
10% CRITICAL — reduce max speed to 30%, log prominently
|
|
||||||
5% EMERGENCY — publish zero /cmd_vel, disarm, log + alert
|
|
||||||
|
|
||||||
SoC source priority:
|
|
||||||
1. soc_pct field from STM32 BATTERY telemetry (fuel gauge or lookup on STM32)
|
|
||||||
2. Voltage-based lookup table (3S LiPo curve) if soc_pct == 0 and voltage known
|
|
||||||
|
|
||||||
Parameters (config/battery_params.yaml):
|
|
||||||
db_path /var/log/saltybot/battery.db
|
|
||||||
cell_count 3 (number of LiPo cells)
|
|
||||||
warn_pct 20.0
|
|
||||||
critical_pct 10.0
|
|
||||||
emergency_pct 5.0
|
|
||||||
warn_speed_factor 0.6
|
|
||||||
critical_speed_factor 0.3
|
|
||||||
charge_detect_ma -100 (current_ma below this → charging)
|
|
||||||
history_retention_days 7
|
|
||||||
publish_rate_hz 1.0 (re-publish rate for /saltybot/battery)
|
|
||||||
"""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import json
|
|
||||||
import math
|
|
||||||
import os
|
|
||||||
import sqlite3
|
|
||||||
import time
|
|
||||||
from typing import Optional
|
|
||||||
|
|
||||||
import rclpy
|
|
||||||
from rclpy.node import Node
|
|
||||||
from rclpy.qos import HistoryPolicy, QoSProfile, ReliabilityPolicy
|
|
||||||
|
|
||||||
from geometry_msgs.msg import Twist
|
|
||||||
from sensor_msgs.msg import BatteryState
|
|
||||||
from std_msgs.msg import Float32, String
|
|
||||||
from std_srvs.srv import SetBool
|
|
||||||
|
|
||||||
# ── SoC voltage lookup table (3S LiPo, per-cell voltage → SoC %) ─────────────
|
|
||||||
# Based on typical LiPo discharge curve at 1C
|
|
||||||
_SOC_CURVE_PER_CELL = [
|
|
||||||
# (cell_v, soc_pct) — sorted ascending by voltage
|
|
||||||
(3.00, 0),
|
|
||||||
(3.27, 5),
|
|
||||||
(3.37, 10),
|
|
||||||
(3.53, 20),
|
|
||||||
(3.62, 30),
|
|
||||||
(3.68, 40),
|
|
||||||
(3.73, 50),
|
|
||||||
(3.78, 60),
|
|
||||||
(3.83, 70),
|
|
||||||
(3.90, 80),
|
|
||||||
(4.00, 90),
|
|
||||||
(4.20, 100),
|
|
||||||
]
|
|
||||||
|
|
||||||
# Alert level constants
|
|
||||||
_ALERT_NONE = "ok"
|
|
||||||
_ALERT_WARN = "warning"
|
|
||||||
_ALERT_CRITICAL = "critical"
|
|
||||||
_ALERT_EMERGENCY = "emergency"
|
|
||||||
|
|
||||||
|
|
||||||
def _soc_from_voltage(voltage_v: float, cell_count: int) -> int:
|
|
||||||
"""Estimate SoC from pack voltage using per-cell LiPo lookup table."""
|
|
||||||
if cell_count <= 0:
|
|
||||||
return 0
|
|
||||||
cell_v = voltage_v / cell_count
|
|
||||||
curve = _SOC_CURVE_PER_CELL
|
|
||||||
|
|
||||||
if cell_v <= curve[0][0]:
|
|
||||||
return 0
|
|
||||||
if cell_v >= curve[-1][0]:
|
|
||||||
return 100
|
|
||||||
|
|
||||||
# Linear interpolation between adjacent points
|
|
||||||
for i in range(len(curve) - 1):
|
|
||||||
v_lo, soc_lo = curve[i]
|
|
||||||
v_hi, soc_hi = curve[i + 1]
|
|
||||||
if v_lo <= cell_v <= v_hi:
|
|
||||||
t = (cell_v - v_lo) / (v_hi - v_lo)
|
|
||||||
return int(soc_lo + t * (soc_hi - soc_lo))
|
|
||||||
return 0
|
|
||||||
|
|
||||||
|
|
||||||
class BatteryNode(Node):
|
|
||||||
"""Battery management and monitoring node."""
|
|
||||||
|
|
||||||
def __init__(self) -> None:
|
|
||||||
super().__init__("battery_node")
|
|
||||||
|
|
||||||
# ── Parameters ────────────────────────────────────────────────────────
|
|
||||||
self.declare_parameter("db_path", "/var/log/saltybot/battery.db")
|
|
||||||
self.declare_parameter("cell_count", 3)
|
|
||||||
self.declare_parameter("warn_pct", 20.0)
|
|
||||||
self.declare_parameter("critical_pct", 10.0)
|
|
||||||
self.declare_parameter("emergency_pct", 5.0)
|
|
||||||
self.declare_parameter("warn_speed_factor", 0.6)
|
|
||||||
self.declare_parameter("critical_speed_factor", 0.3)
|
|
||||||
self.declare_parameter("charge_detect_ma", -100)
|
|
||||||
self.declare_parameter("history_retention_days", 7)
|
|
||||||
self.declare_parameter("publish_rate_hz", 1.0)
|
|
||||||
|
|
||||||
self._db_path = self.get_parameter("db_path").value
|
|
||||||
self._cell_count = self.get_parameter("cell_count").value
|
|
||||||
self._warn_pct = self.get_parameter("warn_pct").value
|
|
||||||
self._critical_pct = self.get_parameter("critical_pct").value
|
|
||||||
self._emergency_pct = self.get_parameter("emergency_pct").value
|
|
||||||
self._warn_speed_factor = self.get_parameter("warn_speed_factor").value
|
|
||||||
self._critical_speed_factor = self.get_parameter("critical_speed_factor").value
|
|
||||||
self._charge_detect_ma = self.get_parameter("charge_detect_ma").value
|
|
||||||
self._retention_days = self.get_parameter("history_retention_days").value
|
|
||||||
publish_rate = self.get_parameter("publish_rate_hz").value
|
|
||||||
|
|
||||||
# ── QoS ───────────────────────────────────────────────────────────────
|
|
||||||
rel_qos = QoSProfile(
|
|
||||||
reliability=ReliabilityPolicy.RELIABLE,
|
|
||||||
history=HistoryPolicy.KEEP_LAST, depth=10,
|
|
||||||
)
|
|
||||||
|
|
||||||
# ── Publishers ────────────────────────────────────────────────────────
|
|
||||||
self._battery_pub = self.create_publisher(BatteryState, "/saltybot/battery", rel_qos)
|
|
||||||
self._alert_pub = self.create_publisher(String, "/saltybot/battery/alert", rel_qos)
|
|
||||||
self._speed_limit_pub = self.create_publisher(Float32, "/saltybot/speed_limit", rel_qos)
|
|
||||||
self._cmd_vel_pub = self.create_publisher(Twist, "/cmd_vel", rel_qos)
|
|
||||||
|
|
||||||
# ── Subscribers ───────────────────────────────────────────────────────
|
|
||||||
self._telem_sub = self.create_subscription(
|
|
||||||
String, "/saltybot/telemetry/battery",
|
|
||||||
self._on_battery_telem, rel_qos,
|
|
||||||
)
|
|
||||||
|
|
||||||
# ── Arm service client (for emergency stop) ───────────────────────────
|
|
||||||
self._arm_client = self.create_client(SetBool, "/saltybot/arm")
|
|
||||||
|
|
||||||
# ── State ─────────────────────────────────────────────────────────────
|
|
||||||
self._last_voltage_v = 0.0
|
|
||||||
self._last_current_ma = 0
|
|
||||||
self._last_soc_pct = 100
|
|
||||||
self._last_alert = _ALERT_NONE
|
|
||||||
self._charging = False
|
|
||||||
self._emergency_fired = False # prevent repeated disarm calls
|
|
||||||
|
|
||||||
# ── SQLite history ─────────────────────────────────────────────────────
|
|
||||||
self._db: Optional[sqlite3.Connection] = None
|
|
||||||
self._init_db()
|
|
||||||
|
|
||||||
# ── Timers ────────────────────────────────────────────────────────────
|
|
||||||
self._publish_timer = self.create_timer(1.0 / publish_rate, self._publish_cb)
|
|
||||||
self._prune_timer = self.create_timer(3600.0, self._prune_history) # hourly
|
|
||||||
|
|
||||||
self.get_logger().info(
|
|
||||||
f"battery_node started — warn={self._warn_pct}% "
|
|
||||||
f"critical={self._critical_pct}% "
|
|
||||||
f"emergency={self._emergency_pct}% "
|
|
||||||
f"cells={self._cell_count}"
|
|
||||||
)
|
|
||||||
|
|
||||||
# ── SQLite ────────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def _init_db(self) -> None:
|
|
||||||
"""Create the SQLite DB and battery_history table if not present."""
|
|
||||||
try:
|
|
||||||
db_dir = os.path.dirname(self._db_path)
|
|
||||||
if db_dir:
|
|
||||||
os.makedirs(db_dir, exist_ok=True)
|
|
||||||
self._db = sqlite3.connect(
|
|
||||||
self._db_path, check_same_thread=False, timeout=5.0
|
|
||||||
)
|
|
||||||
self._db.execute("""
|
|
||||||
CREATE TABLE IF NOT EXISTS battery_history (
|
|
||||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
||||||
ts REAL NOT NULL,
|
|
||||||
voltage_v REAL NOT NULL,
|
|
||||||
current_ma INTEGER NOT NULL,
|
|
||||||
soc_pct INTEGER NOT NULL,
|
|
||||||
charging INTEGER NOT NULL,
|
|
||||||
alert_level TEXT NOT NULL
|
|
||||||
)
|
|
||||||
""")
|
|
||||||
self._db.execute(
|
|
||||||
"CREATE INDEX IF NOT EXISTS idx_ts ON battery_history(ts)"
|
|
||||||
)
|
|
||||||
self._db.commit()
|
|
||||||
self.get_logger().info(f"Battery history DB: {self._db_path}")
|
|
||||||
except Exception as exc:
|
|
||||||
self.get_logger().warn(f"Cannot open battery DB ({exc}) — logging disabled")
|
|
||||||
self._db = None
|
|
||||||
|
|
||||||
def _log_to_db(self, voltage_v: float, current_ma: int,
|
|
||||||
soc_pct: int, charging: bool, alert: str) -> None:
|
|
||||||
if self._db is None:
|
|
||||||
return
|
|
||||||
try:
|
|
||||||
self._db.execute(
|
|
||||||
"INSERT INTO battery_history(ts,voltage_v,current_ma,soc_pct,charging,alert_level) "
|
|
||||||
"VALUES (?,?,?,?,?,?)",
|
|
||||||
(time.time(), voltage_v, current_ma, soc_pct, int(charging), alert),
|
|
||||||
)
|
|
||||||
self._db.commit()
|
|
||||||
except Exception as exc:
|
|
||||||
self.get_logger().warn(f"Battery DB write error: {exc}")
|
|
||||||
|
|
||||||
def _prune_history(self) -> None:
|
|
||||||
"""Delete records older than history_retention_days."""
|
|
||||||
if self._db is None:
|
|
||||||
return
|
|
||||||
try:
|
|
||||||
cutoff = time.time() - self._retention_days * 86400
|
|
||||||
self._db.execute("DELETE FROM battery_history WHERE ts < ?", (cutoff,))
|
|
||||||
self._db.commit()
|
|
||||||
except Exception as exc:
|
|
||||||
self.get_logger().warn(f"Battery DB prune error: {exc}")
|
|
||||||
|
|
||||||
# ── Telemetry receive ─────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def _on_battery_telem(self, msg: String) -> None:
|
|
||||||
"""Parse JSON battery telemetry and update internal state."""
|
|
||||||
try:
|
|
||||||
data = json.loads(msg.data)
|
|
||||||
voltage_mv = int(data["voltage_mv"])
|
|
||||||
current_ma = int(data["current_ma"])
|
|
||||||
soc_from_fw = int(data.get("soc_pct", 0))
|
|
||||||
except (json.JSONDecodeError, KeyError, ValueError) as exc:
|
|
||||||
self.get_logger().warn(f"Bad battery telemetry: {exc}")
|
|
||||||
return
|
|
||||||
|
|
||||||
voltage_v = voltage_mv / 1000.0
|
|
||||||
charging = current_ma < self._charge_detect_ma
|
|
||||||
|
|
||||||
# SoC: prefer firmware value; fall back to voltage curve
|
|
||||||
if soc_from_fw > 0:
|
|
||||||
soc_pct = soc_from_fw
|
|
||||||
else:
|
|
||||||
soc_pct = _soc_from_voltage(voltage_v, self._cell_count)
|
|
||||||
|
|
||||||
self._last_voltage_v = voltage_v
|
|
||||||
self._last_current_ma = current_ma
|
|
||||||
self._last_soc_pct = soc_pct
|
|
||||||
self._charging = charging
|
|
||||||
|
|
||||||
# Check thresholds and act
|
|
||||||
new_alert = self._compute_alert(soc_pct)
|
|
||||||
if new_alert != self._last_alert:
|
|
||||||
self._handle_alert_change(new_alert, soc_pct, voltage_v)
|
|
||||||
|
|
||||||
# Log to SQLite
|
|
||||||
self._log_to_db(voltage_v, current_ma, soc_pct, charging, new_alert)
|
|
||||||
|
|
||||||
# ── Alert handling ────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def _compute_alert(self, soc_pct: int) -> str:
|
|
||||||
"""Return the current alert level string for a given SoC."""
|
|
||||||
if soc_pct <= self._emergency_pct:
|
|
||||||
return _ALERT_EMERGENCY
|
|
||||||
if soc_pct <= self._critical_pct:
|
|
||||||
return _ALERT_CRITICAL
|
|
||||||
if soc_pct <= self._warn_pct:
|
|
||||||
return _ALERT_WARN
|
|
||||||
return _ALERT_NONE
|
|
||||||
|
|
||||||
def _handle_alert_change(self, new_alert: str, soc_pct: int, voltage_v: float) -> None:
|
|
||||||
"""Act on threshold transition."""
|
|
||||||
prev = self._last_alert
|
|
||||||
self._last_alert = new_alert
|
|
||||||
|
|
||||||
msg_text = (
|
|
||||||
f"Battery {new_alert.upper()}: {soc_pct}% SoC / {voltage_v:.2f}V "
|
|
||||||
f"(was {prev})"
|
|
||||||
)
|
|
||||||
|
|
||||||
if new_alert == _ALERT_NONE:
|
|
||||||
# Recovery (e.g., plugged in for charging)
|
|
||||||
self.get_logger().info(f"Battery OK: {soc_pct}% SoC")
|
|
||||||
self._publish_speed_limit(1.0)
|
|
||||||
|
|
||||||
elif new_alert == _ALERT_WARN:
|
|
||||||
self.get_logger().warn(msg_text)
|
|
||||||
self._publish_speed_limit(self._warn_speed_factor)
|
|
||||||
|
|
||||||
elif new_alert == _ALERT_CRITICAL:
|
|
||||||
self.get_logger().error(msg_text)
|
|
||||||
self._publish_speed_limit(self._critical_speed_factor)
|
|
||||||
|
|
||||||
elif new_alert == _ALERT_EMERGENCY:
|
|
||||||
self.get_logger().fatal(msg_text)
|
|
||||||
self._publish_speed_limit(0.0)
|
|
||||||
if not self._emergency_fired:
|
|
||||||
self._emergency_fired = True
|
|
||||||
self._execute_safe_stop()
|
|
||||||
|
|
||||||
# Publish alert message
|
|
||||||
alert_payload = json.dumps({
|
|
||||||
"level": new_alert,
|
|
||||||
"soc_pct": soc_pct,
|
|
||||||
"voltage_v": round(voltage_v, 3),
|
|
||||||
"prev_level": prev,
|
|
||||||
"ts": time.time(),
|
|
||||||
})
|
|
||||||
alert_msg = String()
|
|
||||||
alert_msg.data = alert_payload
|
|
||||||
self._alert_pub.publish(alert_msg)
|
|
||||||
|
|
||||||
def _publish_speed_limit(self, factor: float) -> None:
|
|
||||||
"""Publish normalised speed limit factor 0.0–1.0."""
|
|
||||||
msg = Float32()
|
|
||||||
msg.data = float(factor)
|
|
||||||
self._speed_limit_pub.publish(msg)
|
|
||||||
|
|
||||||
def _execute_safe_stop(self) -> None:
|
|
||||||
"""Send zero /cmd_vel and disarm the STM32."""
|
|
||||||
self.get_logger().fatal("EMERGENCY: publishing zero /cmd_vel and disarming")
|
|
||||||
# Publish zero velocity
|
|
||||||
zero_twist = Twist()
|
|
||||||
self._cmd_vel_pub.publish(zero_twist)
|
|
||||||
|
|
||||||
# Call /saltybot/arm(False) if service is available
|
|
||||||
if self._arm_client.service_is_ready():
|
|
||||||
req = SetBool.Request()
|
|
||||||
req.data = False
|
|
||||||
self._arm_client.call_async(req)
|
|
||||||
else:
|
|
||||||
self.get_logger().warn(
|
|
||||||
"/saltybot/arm service not available for emergency disarm"
|
|
||||||
)
|
|
||||||
|
|
||||||
# ── Periodic publish ──────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def _publish_cb(self) -> None:
|
|
||||||
"""Publish sensor_msgs/BatteryState at configured rate."""
|
|
||||||
if self._last_voltage_v == 0.0:
|
|
||||||
return # no telemetry received yet
|
|
||||||
|
|
||||||
msg = BatteryState()
|
|
||||||
msg.header.stamp = self.get_clock().now().to_msg()
|
|
||||||
msg.header.frame_id = "battery"
|
|
||||||
|
|
||||||
msg.voltage = self._last_voltage_v
|
|
||||||
msg.current = self._last_current_ma / 1000.0 # mA → A
|
|
||||||
msg.percentage = self._last_soc_pct / 100.0 # 0.0–1.0
|
|
||||||
|
|
||||||
# Capacity: 3S 5000 mAh pack (configurable in future)
|
|
||||||
msg.design_capacity = 5.0 # Ah
|
|
||||||
msg.capacity = float("nan")
|
|
||||||
msg.charge = float("nan")
|
|
||||||
msg.temperature = float("nan")
|
|
||||||
|
|
||||||
if self._charging:
|
|
||||||
msg.power_supply_status = BatteryState.POWER_SUPPLY_STATUS_CHARGING
|
|
||||||
elif self._last_soc_pct >= 99:
|
|
||||||
msg.power_supply_status = BatteryState.POWER_SUPPLY_STATUS_FULL
|
|
||||||
else:
|
|
||||||
msg.power_supply_status = BatteryState.POWER_SUPPLY_STATUS_DISCHARGING
|
|
||||||
|
|
||||||
msg.power_supply_technology = BatteryState.POWER_SUPPLY_TECHNOLOGY_LIPO
|
|
||||||
msg.power_supply_health = (
|
|
||||||
BatteryState.POWER_SUPPLY_HEALTH_GOOD
|
|
||||||
if self._last_alert in (_ALERT_NONE, _ALERT_WARN)
|
|
||||||
else BatteryState.POWER_SUPPLY_HEALTH_UNSPEC_FAILURE
|
|
||||||
)
|
|
||||||
msg.present = True
|
|
||||||
|
|
||||||
# Cell voltages (estimated, equal distribution assumption)
|
|
||||||
if self._cell_count > 0:
|
|
||||||
cell_v = self._last_voltage_v / self._cell_count
|
|
||||||
msg.cell_voltage = [cell_v] * self._cell_count
|
|
||||||
|
|
||||||
# Location field: alert level for easy monitoring
|
|
||||||
msg.location = self._last_alert
|
|
||||||
|
|
||||||
self._battery_pub.publish(msg)
|
|
||||||
|
|
||||||
# ── Lifecycle ─────────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
def destroy_node(self) -> None:
|
|
||||||
if self._db:
|
|
||||||
try:
|
|
||||||
self._db.close()
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
super().destroy_node()
|
|
||||||
|
|
||||||
|
|
||||||
def main(args=None) -> None:
|
|
||||||
rclpy.init(args=args)
|
|
||||||
node = BatteryNode()
|
|
||||||
try:
|
|
||||||
rclpy.spin(node)
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
pass
|
|
||||||
finally:
|
|
||||||
node.destroy_node()
|
|
||||||
rclpy.shutdown()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
||||||
@ -1,429 +0,0 @@
|
|||||||
"""test_battery.py — Unit tests for battery_node SoC, alerts, and SQLite logging.
|
|
||||||
|
|
||||||
Tests:
|
|
||||||
- SoC lookup table: known voltages map to expected percentages
|
|
||||||
- SoC interpolation: midpoint between curve entries
|
|
||||||
- SoC clamping: below min → 0%, above max → 100%
|
|
||||||
- Alert level computation: correct level for various SoC values
|
|
||||||
- Alert threshold transitions: NONE→WARN→CRITICAL→EMERGENCY
|
|
||||||
- Speed factor assignments: correct factor per alert level
|
|
||||||
- Charging detection: current_ma < threshold → charging
|
|
||||||
- sensor_msgs/BatteryState field population
|
|
||||||
- SQLite logging: records inserted with correct fields
|
|
||||||
- SQLite pruning: old records removed
|
|
||||||
- Voltage-based SoC fallback when firmware SoC == 0
|
|
||||||
- Multi-cell voltage scaling (3S, 4S)
|
|
||||||
|
|
||||||
Run with: pytest test/test_battery.py -v
|
|
||||||
No ROS2 runtime required.
|
|
||||||
"""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import json
|
|
||||||
import os
|
|
||||||
import sqlite3
|
|
||||||
import sys
|
|
||||||
import tempfile
|
|
||||||
import time
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
|
|
||||||
|
|
||||||
from saltybot_bridge.battery_node import (
|
|
||||||
_soc_from_voltage,
|
|
||||||
_ALERT_NONE, _ALERT_WARN, _ALERT_CRITICAL, _ALERT_EMERGENCY,
|
|
||||||
_SOC_CURVE_PER_CELL,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
# SoC voltage lookup table
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
|
|
||||||
class TestSocLookup:
|
|
||||||
|
|
||||||
def test_full_voltage_returns_100(self):
|
|
||||||
"""4.20 V/cell (3-cell = 12.60 V) → 100%."""
|
|
||||||
assert _soc_from_voltage(12.60, cell_count=3) == 100
|
|
||||||
|
|
||||||
def test_below_min_returns_0(self):
|
|
||||||
"""Below 3.0 V/cell → 0%."""
|
|
||||||
assert _soc_from_voltage(8.0, cell_count=3) == 0
|
|
||||||
|
|
||||||
def test_nominal_voltage_returns_50ish(self):
|
|
||||||
"""3S at ~11.1 V (3.7 V/cell) should be near 50%."""
|
|
||||||
soc = _soc_from_voltage(11.10, cell_count=3)
|
|
||||||
assert 45 <= soc <= 55
|
|
||||||
|
|
||||||
def test_warn_threshold_voltage_single_cell(self):
|
|
||||||
"""Single-cell 3.53 V is on the 20% curve point."""
|
|
||||||
assert _soc_from_voltage(3.53, cell_count=1) == 20
|
|
||||||
|
|
||||||
def test_critical_threshold_voltage(self):
|
|
||||||
"""3.37 V/cell → 10%."""
|
|
||||||
assert _soc_from_voltage(3.37, cell_count=1) == 10
|
|
||||||
|
|
||||||
def test_emergency_threshold_voltage(self):
|
|
||||||
"""3.27 V/cell → 5%."""
|
|
||||||
assert _soc_from_voltage(3.27, cell_count=1) == 5
|
|
||||||
|
|
||||||
def test_4s_pack_full(self):
|
|
||||||
"""4S fully charged: 4 × 4.20 = 16.80 V → 100%."""
|
|
||||||
assert _soc_from_voltage(16.80, cell_count=4) == 100
|
|
||||||
|
|
||||||
def test_4s_pack_nominal(self):
|
|
||||||
"""4S nominal: 4 × 3.7 = 14.8 V → ~50%."""
|
|
||||||
soc = _soc_from_voltage(14.80, cell_count=4)
|
|
||||||
assert 45 <= soc <= 55
|
|
||||||
|
|
||||||
def test_interpolation_midpoint(self):
|
|
||||||
"""Midpoint between two curve entries should yield midpoint SoC."""
|
|
||||||
# Between (3.27, 5) and (3.37, 10): midpoint ≈ 3.32 V → ~7.5%
|
|
||||||
mid_v = (3.27 + 3.37) / 2 * 3 # 3-cell pack voltage
|
|
||||||
soc = _soc_from_voltage(mid_v, cell_count=3)
|
|
||||||
assert 5 < soc < 10
|
|
||||||
|
|
||||||
def test_exactly_at_curve_point(self):
|
|
||||||
"""Values exactly on a curve point must return the tabulated SoC."""
|
|
||||||
for cell_v, expected_soc in _SOC_CURVE_PER_CELL:
|
|
||||||
pack_v = cell_v * 3 # 3-cell
|
|
||||||
result = _soc_from_voltage(pack_v, cell_count=3)
|
|
||||||
# Allow ±1 for floating-point interpolation edge
|
|
||||||
assert abs(result - expected_soc) <= 1, \
|
|
||||||
f"At {cell_v}V/cell ({pack_v}V 3S): expected {expected_soc}%, got {result}%"
|
|
||||||
|
|
||||||
def test_zero_cell_count_returns_0(self):
|
|
||||||
assert _soc_from_voltage(12.0, cell_count=0) == 0
|
|
||||||
|
|
||||||
def test_above_max_clamped_to_100(self):
|
|
||||||
"""Overcharged (e.g., 4.5 V/cell) → clamped to 100%."""
|
|
||||||
assert _soc_from_voltage(15.0, cell_count=3) == 100
|
|
||||||
|
|
||||||
def test_zero_voltage_returns_0(self):
|
|
||||||
assert _soc_from_voltage(0.0, cell_count=3) == 0
|
|
||||||
|
|
||||||
def test_returns_integer(self):
|
|
||||||
result = _soc_from_voltage(11.1, cell_count=3)
|
|
||||||
assert isinstance(result, int)
|
|
||||||
|
|
||||||
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
# Alert level computation
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
|
|
||||||
class TestAlertComputation:
|
|
||||||
"""Test _compute_alert() logic (instantiated via a minimal stub)."""
|
|
||||||
|
|
||||||
# Mirror of BatteryNode._compute_alert() for unit testing
|
|
||||||
@staticmethod
|
|
||||||
def _compute_alert(soc_pct, warn=20.0, crit=10.0, emergency=5.0):
|
|
||||||
if soc_pct <= emergency:
|
|
||||||
return _ALERT_EMERGENCY
|
|
||||||
if soc_pct <= crit:
|
|
||||||
return _ALERT_CRITICAL
|
|
||||||
if soc_pct <= warn:
|
|
||||||
return _ALERT_WARN
|
|
||||||
return _ALERT_NONE
|
|
||||||
|
|
||||||
def test_full_is_ok(self):
|
|
||||||
assert self._compute_alert(100) == _ALERT_NONE
|
|
||||||
|
|
||||||
def test_above_warn_is_ok(self):
|
|
||||||
assert self._compute_alert(21) == _ALERT_NONE
|
|
||||||
|
|
||||||
def test_at_warn_threshold(self):
|
|
||||||
assert self._compute_alert(20) == _ALERT_WARN
|
|
||||||
|
|
||||||
def test_between_warn_and_critical(self):
|
|
||||||
assert self._compute_alert(15) == _ALERT_WARN
|
|
||||||
|
|
||||||
def test_at_critical_threshold(self):
|
|
||||||
assert self._compute_alert(10) == _ALERT_CRITICAL
|
|
||||||
|
|
||||||
def test_between_critical_and_emergency(self):
|
|
||||||
assert self._compute_alert(7) == _ALERT_CRITICAL
|
|
||||||
|
|
||||||
def test_at_emergency_threshold(self):
|
|
||||||
assert self._compute_alert(5) == _ALERT_EMERGENCY
|
|
||||||
|
|
||||||
def test_below_emergency(self):
|
|
||||||
assert self._compute_alert(1) == _ALERT_EMERGENCY
|
|
||||||
|
|
||||||
def test_zero_is_emergency(self):
|
|
||||||
assert self._compute_alert(0) == _ALERT_EMERGENCY
|
|
||||||
|
|
||||||
def test_custom_thresholds(self):
|
|
||||||
assert self._compute_alert(25, warn=30, crit=15, emergency=8) == _ALERT_WARN
|
|
||||||
assert self._compute_alert(13, warn=30, crit=15, emergency=8) == _ALERT_WARN
|
|
||||||
assert self._compute_alert(12, warn=30, crit=15, emergency=8) == _ALERT_CRITICAL
|
|
||||||
|
|
||||||
def test_speed_factor_at_warn(self):
|
|
||||||
"""Speed factor at WARN must be 0 < factor < 1.0."""
|
|
||||||
factor = 0.6 # warn_speed_factor default
|
|
||||||
assert 0.0 < factor < 1.0
|
|
||||||
|
|
||||||
def test_speed_factor_at_critical(self):
|
|
||||||
factor = 0.3 # critical_speed_factor default
|
|
||||||
assert 0.0 < factor < 1.0
|
|
||||||
|
|
||||||
def test_speed_factor_at_emergency(self):
|
|
||||||
factor = 0.0 # full stop
|
|
||||||
assert factor == 0.0
|
|
||||||
|
|
||||||
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
# Charging detection
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
|
|
||||||
class TestChargingDetection:
|
|
||||||
|
|
||||||
THRESHOLD_MA = -100
|
|
||||||
|
|
||||||
def _is_charging(self, current_ma):
|
|
||||||
return current_ma < self.THRESHOLD_MA
|
|
||||||
|
|
||||||
def test_discharging_positive(self):
|
|
||||||
assert not self._is_charging(500)
|
|
||||||
|
|
||||||
def test_discharging_zero(self):
|
|
||||||
assert not self._is_charging(0)
|
|
||||||
|
|
||||||
def test_just_below_threshold_is_charging(self):
|
|
||||||
assert self._is_charging(-101)
|
|
||||||
|
|
||||||
def test_at_threshold_not_charging(self):
|
|
||||||
assert not self._is_charging(-100)
|
|
||||||
|
|
||||||
def test_deeply_negative_is_charging(self):
|
|
||||||
assert self._is_charging(-2000) # fast charge
|
|
||||||
|
|
||||||
def test_regen_braking_not_charging(self):
|
|
||||||
# Small negative current during regen isn't real charging
|
|
||||||
assert not self._is_charging(-50)
|
|
||||||
|
|
||||||
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
# SQLite history logging
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
|
|
||||||
class TestSqliteLogging:
|
|
||||||
"""Test the SQLite DB operations directly (no ROS2)."""
|
|
||||||
|
|
||||||
def _make_db(self):
|
|
||||||
"""Return an in-memory SQLite connection with the battery_history table."""
|
|
||||||
db = sqlite3.connect(":memory:")
|
|
||||||
db.execute("""
|
|
||||||
CREATE TABLE battery_history (
|
|
||||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
||||||
ts REAL NOT NULL,
|
|
||||||
voltage_v REAL NOT NULL,
|
|
||||||
current_ma INTEGER NOT NULL,
|
|
||||||
soc_pct INTEGER NOT NULL,
|
|
||||||
charging INTEGER NOT NULL,
|
|
||||||
alert_level TEXT NOT NULL
|
|
||||||
)
|
|
||||||
""")
|
|
||||||
db.execute("CREATE INDEX idx_ts ON battery_history(ts)")
|
|
||||||
db.commit()
|
|
||||||
return db
|
|
||||||
|
|
||||||
def _insert(self, db, ts, voltage_v, current_ma, soc_pct, charging, alert):
|
|
||||||
db.execute(
|
|
||||||
"INSERT INTO battery_history(ts,voltage_v,current_ma,soc_pct,charging,alert_level) "
|
|
||||||
"VALUES (?,?,?,?,?,?)",
|
|
||||||
(ts, voltage_v, current_ma, soc_pct, int(charging), alert)
|
|
||||||
)
|
|
||||||
db.commit()
|
|
||||||
|
|
||||||
def test_insert_and_retrieve(self):
|
|
||||||
db = self._make_db()
|
|
||||||
self._insert(db, time.time(), 11.1, 500, 50, False, _ALERT_NONE)
|
|
||||||
row = db.execute("SELECT * FROM battery_history").fetchone()
|
|
||||||
assert row is not None
|
|
||||||
assert row[2] == pytest.approx(11.1)
|
|
||||||
assert row[3] == 500
|
|
||||||
assert row[4] == 50
|
|
||||||
assert row[5] == 0 # not charging
|
|
||||||
assert row[6] == _ALERT_NONE
|
|
||||||
|
|
||||||
def test_insert_charging_event(self):
|
|
||||||
db = self._make_db()
|
|
||||||
self._insert(db, time.time(), 12.0, -1500, 85, True, _ALERT_NONE)
|
|
||||||
row = db.execute("SELECT charging FROM battery_history").fetchone()
|
|
||||||
assert row[0] == 1 # charging = True
|
|
||||||
|
|
||||||
def test_insert_alert_event(self):
|
|
||||||
db = self._make_db()
|
|
||||||
self._insert(db, time.time(), 10.5, 300, 18, False, _ALERT_WARN)
|
|
||||||
row = db.execute("SELECT alert_level FROM battery_history").fetchone()
|
|
||||||
assert row[0] == _ALERT_WARN
|
|
||||||
|
|
||||||
def test_prune_old_records(self):
|
|
||||||
db = self._make_db()
|
|
||||||
old_ts = time.time() - 8 * 86400 # 8 days ago (>7 day retention)
|
|
||||||
new_ts = time.time()
|
|
||||||
self._insert(db, old_ts, 11.0, 100, 60, False, _ALERT_NONE)
|
|
||||||
self._insert(db, new_ts, 11.5, 200, 70, False, _ALERT_NONE)
|
|
||||||
|
|
||||||
retention_days = 7
|
|
||||||
cutoff = time.time() - retention_days * 86400
|
|
||||||
db.execute("DELETE FROM battery_history WHERE ts < ?", (cutoff,))
|
|
||||||
db.commit()
|
|
||||||
|
|
||||||
rows = db.execute("SELECT COUNT(*) FROM battery_history").fetchone()
|
|
||||||
assert rows[0] == 1 # only new record remains
|
|
||||||
|
|
||||||
def test_multiple_inserts_sequential_ids(self):
|
|
||||||
db = self._make_db()
|
|
||||||
for i in range(5):
|
|
||||||
self._insert(db, time.time() + i, 11.0 + i * 0.1, 100, 50 + i, False, _ALERT_NONE)
|
|
||||||
count = db.execute("SELECT COUNT(*) FROM battery_history").fetchone()[0]
|
|
||||||
assert count == 5
|
|
||||||
|
|
||||||
def test_db_on_disk(self):
|
|
||||||
"""Write and read a real on-disk SQLite DB."""
|
|
||||||
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as f:
|
|
||||||
path = f.name
|
|
||||||
try:
|
|
||||||
db = sqlite3.connect(path)
|
|
||||||
db.execute("""CREATE TABLE battery_history (
|
|
||||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
||||||
ts REAL, voltage_v REAL, current_ma INTEGER,
|
|
||||||
soc_pct INTEGER, charging INTEGER, alert_level TEXT
|
|
||||||
)""")
|
|
||||||
db.execute(
|
|
||||||
"INSERT INTO battery_history VALUES (NULL,?,?,?,?,?,?)",
|
|
||||||
(time.time(), 11.5, 500, 70, 0, _ALERT_NONE)
|
|
||||||
)
|
|
||||||
db.commit()
|
|
||||||
db.close()
|
|
||||||
|
|
||||||
# Reopen and verify
|
|
||||||
db2 = sqlite3.connect(path)
|
|
||||||
row = db2.execute("SELECT soc_pct FROM battery_history").fetchone()
|
|
||||||
assert row[0] == 70
|
|
||||||
db2.close()
|
|
||||||
finally:
|
|
||||||
os.unlink(path)
|
|
||||||
|
|
||||||
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
# BatteryState field population
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
|
|
||||||
class TestBatteryStateFields:
|
|
||||||
"""Verify sensor_msgs/BatteryState field logic without a live node."""
|
|
||||||
|
|
||||||
def _make_state(self, voltage_v, current_ma, soc_pct, charging,
|
|
||||||
cell_count=3, alert=_ALERT_NONE):
|
|
||||||
"""Minimal BatteryState-like dict mirrors _publish_cb logic."""
|
|
||||||
return {
|
|
||||||
"voltage": voltage_v,
|
|
||||||
"current": current_ma / 1000.0,
|
|
||||||
"percentage": soc_pct / 100.0,
|
|
||||||
"power_supply_status": "CHARGING" if charging else (
|
|
||||||
"FULL" if soc_pct >= 99 else "DISCHARGING"
|
|
||||||
),
|
|
||||||
"location": alert,
|
|
||||||
"cell_voltage": [voltage_v / cell_count] * cell_count,
|
|
||||||
}
|
|
||||||
|
|
||||||
def test_voltage_field(self):
|
|
||||||
state = self._make_state(11.1, 500, 50, False)
|
|
||||||
assert state["voltage"] == pytest.approx(11.1)
|
|
||||||
|
|
||||||
def test_current_converted_to_amps(self):
|
|
||||||
state = self._make_state(11.1, 1500, 50, False)
|
|
||||||
assert state["current"] == pytest.approx(1.5)
|
|
||||||
|
|
||||||
def test_percentage_normalised(self):
|
|
||||||
state = self._make_state(11.1, 500, 75, False)
|
|
||||||
assert state["percentage"] == pytest.approx(0.75)
|
|
||||||
|
|
||||||
def test_charging_status(self):
|
|
||||||
state = self._make_state(12.0, -1500, 85, True)
|
|
||||||
assert state["power_supply_status"] == "CHARGING"
|
|
||||||
|
|
||||||
def test_discharging_status(self):
|
|
||||||
state = self._make_state(11.1, 500, 50, False)
|
|
||||||
assert state["power_supply_status"] == "DISCHARGING"
|
|
||||||
|
|
||||||
def test_full_status_at_99_pct(self):
|
|
||||||
state = self._make_state(12.5, 0, 99, False)
|
|
||||||
assert state["power_supply_status"] == "FULL"
|
|
||||||
|
|
||||||
def test_cell_voltages_equal_distribution(self):
|
|
||||||
state = self._make_state(12.0, 0, 80, False, cell_count=3)
|
|
||||||
assert len(state["cell_voltage"]) == 3
|
|
||||||
for cv in state["cell_voltage"]:
|
|
||||||
assert cv == pytest.approx(4.0)
|
|
||||||
|
|
||||||
def test_alert_in_location_field(self):
|
|
||||||
state = self._make_state(10.5, 300, 18, False, alert=_ALERT_WARN)
|
|
||||||
assert state["location"] == _ALERT_WARN
|
|
||||||
|
|
||||||
def test_zero_percentage_at_empty(self):
|
|
||||||
state = self._make_state(9.6, 50, 0, False)
|
|
||||||
assert state["percentage"] == pytest.approx(0.0)
|
|
||||||
|
|
||||||
def test_full_percentage(self):
|
|
||||||
state = self._make_state(12.6, 0, 100, False)
|
|
||||||
assert state["percentage"] == pytest.approx(1.0)
|
|
||||||
|
|
||||||
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
# Telemetry JSON parsing
|
|
||||||
# ═══════════════════════════════════════════════════════════════════
|
|
||||||
|
|
||||||
class TestTelemetryParsing:
|
|
||||||
"""Test the JSON parsing logic that the battery_node uses."""
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _parse(json_str: str, charge_threshold: int = -100):
|
|
||||||
data = json.loads(json_str)
|
|
||||||
voltage_mv = int(data["voltage_mv"])
|
|
||||||
current_ma = int(data["current_ma"])
|
|
||||||
soc_from_fw = int(data.get("soc_pct", 0))
|
|
||||||
voltage_v = voltage_mv / 1000.0
|
|
||||||
charging = current_ma < charge_threshold
|
|
||||||
soc_pct = soc_from_fw if soc_from_fw > 0 else \
|
|
||||||
_soc_from_voltage(voltage_v, cell_count=3)
|
|
||||||
return voltage_v, current_ma, soc_pct, charging
|
|
||||||
|
|
||||||
def test_normal_discharge_telemetry(self):
|
|
||||||
v, i, soc, chg = self._parse(
|
|
||||||
'{"voltage_mv":11100,"current_ma":1000,"soc_pct":55}'
|
|
||||||
)
|
|
||||||
assert v == pytest.approx(11.1)
|
|
||||||
assert i == 1000
|
|
||||||
assert soc == 55 # from firmware
|
|
||||||
assert not chg
|
|
||||||
|
|
||||||
def test_charging_telemetry(self):
|
|
||||||
v, i, soc, chg = self._parse(
|
|
||||||
'{"voltage_mv":12000,"current_ma":-1500,"soc_pct":80}'
|
|
||||||
)
|
|
||||||
assert v == pytest.approx(12.0)
|
|
||||||
assert i == -1500
|
|
||||||
assert chg
|
|
||||||
|
|
||||||
def test_firmware_soc_zero_uses_voltage_curve(self):
|
|
||||||
"""If soc_pct == 0, fall back to voltage curve."""
|
|
||||||
v, i, soc, chg = self._parse(
|
|
||||||
'{"voltage_mv":11100,"current_ma":500,"soc_pct":0}'
|
|
||||||
)
|
|
||||||
# soc_pct=0 from firmware → use voltage 11.1V / 3 cells → ~50%
|
|
||||||
assert 45 <= soc <= 55
|
|
||||||
|
|
||||||
def test_missing_soc_falls_back_to_voltage(self):
|
|
||||||
"""soc_pct field absent → use voltage lookup."""
|
|
||||||
v, i, soc, chg = self._parse(
|
|
||||||
'{"voltage_mv":10500,"current_ma":300}'
|
|
||||||
)
|
|
||||||
assert soc < 25 # 10.5V (3.5V/cell) → ~20%
|
|
||||||
|
|
||||||
def test_parse_error_on_missing_fields(self):
|
|
||||||
with pytest.raises(KeyError):
|
|
||||||
self._parse('{"current_ma":500}') # missing voltage_mv
|
|
||||||
Loading…
x
Reference in New Issue
Block a user