Compare commits

...

2 Commits

Author SHA1 Message Date
3e60d7552a feat: Battery voltage telemetry and LVC (Issue #613)
- Add include/lvc.h + src/lvc.c: 3-stage low voltage cutoff state machine
  WARNING  21.0V: MELODY_LOW_BATTERY buzzer, full motor power
  CRITICAL 19.8V: double-beep every 10s, 50% motor power scaling
  CUTOFF   18.6V: MELODY_ERROR one-shot, motors disabled + latched
  200mV hysteresis on recovery; CUTOFF latched until reboot
- Add JLINK_TLM_LVC (0x8B, 4 bytes): voltage_mv, percent, protection_state
  jlink_send_lvc_tlm() frame encoder in jlink.c
- Wire into main.c:
  lvc_init() at startup; lvc_tick() each 1kHz loop tick
  lvc_is_cutoff() triggers safety_arm_cancel + balance_disarm + motor_driver_estop
  lvc_get_power_scale() applied to ESC speed command (100/50/0%)
  1Hz JLINK_TLM_LVC telemetry with fuel-gauge percent field
- Add LVC thresholds to config.h (LVC_WARNING/CRITICAL/CUTOFF/HYSTERESIS_MV)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-15 10:14:57 -04:00
a4da93de7e feat: ROS2 bag recording manager (Issue #615)
Upgrades saltybot_bag_recorder (Issue #488) with:

- Motion-triggered auto-record: subscribes /cmd_vel, starts on non-zero
  velocity, stops after 30s idle timeout (configurable)
- Auto-split at 1 GB or 10 min via subprocess restart
- USB/NVMe storage selection: ordered priority list, picks first path
  with >= 2 GB free (/media/usb0 -> /media/usb1 -> /mnt/nvme -> ~/bags)
- Disk monitoring: warns at 70%, triggers cleanup of bags >7 days at 80%
- JSON status on /saltybot/bag_recorder/status at 1 Hz
- Services: /saltybot/bag_recorder/{start,stop,split}
  (legacy /saltybot/{start,stop}_recording kept for compatibility)
- bag_policy.py: pure-Python MotionState, DiskInfo, StorageSelector,
  BagPolicy — ROS2-free, fully unit-testable
- 76 unit tests passing

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-15 10:12:17 -04:00
10 changed files with 1319 additions and 256 deletions

View File

@ -262,4 +262,13 @@
#define CAN_RPM_SCALE 10 // motor_cmd to RPM: 1 cmd count = 10 RPM #define CAN_RPM_SCALE 10 // motor_cmd to RPM: 1 cmd count = 10 RPM
#define CAN_TLM_HZ 1u // JLINK_TLM_CAN_STATS transmit rate (Hz) #define CAN_TLM_HZ 1u // JLINK_TLM_CAN_STATS transmit rate (Hz)
// --- LVC: Low Voltage Cutoff (Issue #613) ---
// 3-stage undervoltage protection; voltages in mV
#define LVC_WARNING_MV 21000u // 21.0 V -- buzzer alert, full power
#define LVC_CRITICAL_MV 19800u // 19.8 V -- 50% motor power reduction
#define LVC_CUTOFF_MV 18600u // 18.6 V -- motors disabled, latch until reboot
#define LVC_HYSTERESIS_MV 200u // recovery hysteresis to prevent threshold chatter
#define LVC_TLM_HZ 1u // JLINK_TLM_LVC transmit rate (Hz)
#endif // CONFIG_H #endif // CONFIG_H

View File

@ -50,6 +50,7 @@
* 0x87 FAULT_LOG - jlink_tlm_fault_log_t (20 bytes), sent on boot + FAULT_LOG_GET (Issue #565) * 0x87 FAULT_LOG - jlink_tlm_fault_log_t (20 bytes), sent on boot + FAULT_LOG_GET (Issue #565)
* 0x88 SLOPE - jlink_tlm_slope_t (4 bytes), sent at SLOPE_TLM_HZ (Issue #600) * 0x88 SLOPE - jlink_tlm_slope_t (4 bytes), sent at SLOPE_TLM_HZ (Issue #600)
* 0x89 CAN_STATS - jlink_tlm_can_stats_t (16 bytes), sent at CAN_TLM_HZ + CAN_STATS_GET (Issue #597) * 0x89 CAN_STATS - jlink_tlm_can_stats_t (16 bytes), sent at CAN_TLM_HZ + CAN_STATS_GET (Issue #597)
* 0x8B LVC - jlink_tlm_lvc_t (4 bytes), sent at LVC_TLM_HZ (Issue #613)
* *
* Priority: CRSF RC always takes precedence. Jetson steer/speed only applied * Priority: CRSF RC always takes precedence. Jetson steer/speed only applied
* when mode_manager_active() == MODE_AUTONOMOUS (CH6 high). In RC_MANUAL and * when mode_manager_active() == MODE_AUTONOMOUS (CH6 high). In RC_MANUAL and
@ -93,6 +94,7 @@
#define JLINK_TLM_FAULT_LOG 0x87u /* jlink_tlm_fault_log_t (20 bytes, Issue #565) */ #define JLINK_TLM_FAULT_LOG 0x87u /* jlink_tlm_fault_log_t (20 bytes, Issue #565) */
#define JLINK_TLM_SLOPE 0x88u /* jlink_tlm_slope_t (4 bytes, Issue #600) */ #define JLINK_TLM_SLOPE 0x88u /* jlink_tlm_slope_t (4 bytes, Issue #600) */
#define JLINK_TLM_CAN_STATS 0x89u /* jlink_tlm_can_stats_t (16 bytes, Issue #597) */ #define JLINK_TLM_CAN_STATS 0x89u /* jlink_tlm_can_stats_t (16 bytes, Issue #597) */
#define JLINK_TLM_LVC 0x8Bu /* jlink_tlm_lvc_t (4 bytes, Issue #613) */
/* ---- Telemetry STATUS payload (20 bytes, packed) ---- */ /* ---- Telemetry STATUS payload (20 bytes, packed) ---- */
typedef struct __attribute__((packed)) { typedef struct __attribute__((packed)) {
@ -204,6 +206,14 @@ typedef struct __attribute__((packed)) {
int16_t vel1_rpm; /* node 1 current velocity (RPM) */ int16_t vel1_rpm; /* node 1 current velocity (RPM) */
} jlink_tlm_can_stats_t; /* 16 bytes */ } jlink_tlm_can_stats_t; /* 16 bytes */
/* ---- Telemetry LVC payload (4 bytes, packed) Issue #613 ---- */
/* Sent at LVC_TLM_HZ (1 Hz); reports battery voltage and LVC protection state. */
typedef struct __attribute__((packed)) {
uint16_t voltage_mv; /* battery voltage (mV) */
uint8_t percent; /* 0-100: fuel gauge within CUTOFF..WARNING; 255=unknown */
uint8_t protection_state; /* LvcState: 0=NORMAL,1=WARNING,2=CRITICAL,3=CUTOFF */
} jlink_tlm_lvc_t; /* 4 bytes */
/* ---- Volatile state (read from main loop) ---- */ /* ---- Volatile state (read from main loop) ---- */
typedef struct { typedef struct {
/* Drive command - updated on JLINK_CMD_DRIVE */ /* Drive command - updated on JLINK_CMD_DRIVE */
@ -322,4 +332,10 @@ void jlink_send_slope_tlm(const jlink_tlm_slope_t *tlm);
*/ */
void jlink_send_can_stats(const jlink_tlm_can_stats_t *tlm); void jlink_send_can_stats(const jlink_tlm_can_stats_t *tlm);
/*
* jlink_send_lvc_tlm(tlm) - transmit JLINK_TLM_LVC (0x8B) frame
* (10 bytes total) at LVC_TLM_HZ (1 Hz). Issue #613.
*/
void jlink_send_lvc_tlm(const jlink_tlm_lvc_t *tlm);
#endif /* JLINK_H */ #endif /* JLINK_H */

39
include/lvc.h Normal file
View File

@ -0,0 +1,39 @@
#ifndef LVC_H
#define LVC_H
#include <stdint.h>
#include <stdbool.h>
/*
* lvc.h -- Low Voltage Cutoff (LVC) protection (Issue #613)
*
* 3-stage battery voltage protection using battery_read_mv():
*
* LVC_WARNING (21.0 V) -- periodic buzzer alert; full power maintained
* LVC_CRITICAL (19.8 V) -- faster buzzer; motor commands scaled to 50%
* LVC_CUTOFF (18.6 V) -- error buzzer; motors disabled; latched until reboot
*
* Recovery uses LVC_HYSTERESIS_MV to prevent threshold chatter.
* CUTOFF is one-way: once latched, only a power-cycle clears it.
*
* Integration:
* lvc_init() -- call once during system init
* lvc_tick(now_ms, vbat_mv) -- call each main loop tick (1 kHz)
* lvc_get_power_scale() -- returns 0/50/100; apply to motor speed
* lvc_is_cutoff() -- true when motors must be disabled
*/
typedef enum {
LVC_NORMAL = 0, /* Vbat >= WARNING threshold */
LVC_WARNING = 1, /* Vbat < 21.0 V -- alert only */
LVC_CRITICAL = 2, /* Vbat < 19.8 V -- 50% power */
LVC_CUTOFF = 3, /* Vbat < 18.6 V -- motors off */
} LvcState;
void lvc_init(void);
void lvc_tick(uint32_t now_ms, uint32_t vbat_mv);
LvcState lvc_get_state(void);
uint8_t lvc_get_power_scale(void); /* 100 = full, 50 = critical, 0 = cutoff */
bool lvc_is_cutoff(void);
#endif /* LVC_H */

View File

@ -1,28 +1,33 @@
# bag_recorder.yaml — Bag recording manager config (Issue #615).
bag_recorder: bag_recorder:
ros__parameters: ros__parameters:
# Path where bags are stored (Issue #488: mission logging) storage_paths:
bag_dir: '~/saltybot-data/bags' - '/media/usb0'
- '/media/usb1'
# Topics to record for mission logging (Issue #488) - '/mnt/nvme/saltybot-bags'
- '~/saltybot-data/bags'
topics: topics:
- '/scan' - '/scan'
- '/cmd_vel' - '/cmd_vel'
- '/odom' - '/odom'
- '/tf' - '/tf'
- '/camera/color/image_raw/compressed' - '/tf_static'
- '/saltybot/pose/fused_cov'
- '/saltybot/diagnostics' - '/saltybot/diagnostics'
- '/saltybot/sensor_health'
# Rotation interval: save new bag every N minutes (Issue #488: 30 min) - '/saltybot/battery'
buffer_duration_minutes: 30 - '/saltybot/motor_daemon/status'
- '/camera/color/image_raw/compressed'
# Storage management (Issue #488: FIFO 20GB limit) - '/camera/depth/image_rect_raw/compressed'
storage_ttl_days: 7 # Remove bags older than 7 days - '/imu/data'
max_storage_gb: 20 # Enforce 20GB FIFO quota motion_trigger: true
idle_timeout_s: 30.0
# Storage format (Issue #488: prefer MCAP) split_size_gb: 1.0
storage_format: 'mcap' # Options: mcap, sqlite3 split_duration_min: 10.0
storage_format: 'mcap'
# NAS sync (optional) warn_disk_pct: 70.0
enable_rsync: false cleanup_disk_pct: 80.0
rsync_destination: '' cleanup_age_days: 7
# rsync_destination: 'user@nas:/path/to/backups/' min_free_gb: 2.0
status_hz: 1.0
check_hz: 0.033

View File

@ -0,0 +1,293 @@
"""bag_policy.py — Pure-Python recording policy logic (Issue #615).
No ROS2 dependencies importable and fully unit-testable without a live
ROS2 install.
Classes
MotionState tracks /cmd_vel activity and idle timeout
DiskInfo disk-usage snapshot with threshold helpers
StorageSelector picks the best storage path from a priority list
BagPolicy decides when to start/stop/split/clean up
"""
from __future__ import annotations
import math
import shutil
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional, Tuple
# ── Motion tracking ───────────────────────────────────────────────────────────
class MotionState:
"""Tracks /cmd_vel activity and determines idle timeout.
Usage::
ms = MotionState(idle_timeout_s=30.0)
ms.update(linear_x=0.5, angular_z=0.0, now=time.monotonic())
if ms.should_start_recording(now):
...
if ms.should_stop_recording(now):
...
"""
def __init__(self, idle_timeout_s: float = 30.0) -> None:
if idle_timeout_s <= 0:
raise ValueError(f"idle_timeout_s must be > 0, got {idle_timeout_s!r}")
self._idle_timeout = idle_timeout_s
self._last_motion_t: Optional[float] = None
self._ever_moved: bool = False
# ── Mutators ───────────────────────────────────────────────────────────
def update(self, linear_x: float, angular_z: float, now: float) -> None:
"""Feed the latest cmd_vel values. Non-zero → reset idle timer."""
if abs(linear_x) > 1e-3 or abs(angular_z) > 1e-3:
self._last_motion_t = now
self._ever_moved = True
def reset(self) -> None:
"""Forget all motion history (e.g. after a recording session ends)."""
self._last_motion_t = None
self._ever_moved = False
# ── Queries ────────────────────────────────────────────────────────────
def idle_seconds(self, now: float) -> float:
"""Seconds since last non-zero cmd_vel (inf if never moved)."""
if self._last_motion_t is None:
return math.inf
return now - self._last_motion_t
def is_idle(self, now: float) -> bool:
"""True if idle_timeout has elapsed since last non-zero cmd_vel."""
return self.idle_seconds(now) >= self._idle_timeout
def is_moving(self, now: float) -> bool:
"""True if non-zero cmd_vel was received within the idle window."""
return self._ever_moved and not self.is_idle(now)
def should_start_recording(self, now: float) -> bool:
"""True when motion is detected (used to trigger auto-record start)."""
return self._ever_moved and not self.is_idle(now)
def should_stop_recording(self, now: float) -> bool:
"""True when idle timeout has elapsed after motion (trigger auto-stop)."""
return self._ever_moved and self.is_idle(now)
@property
def ever_moved(self) -> bool:
return self._ever_moved
@property
def idle_timeout_s(self) -> float:
return self._idle_timeout
# ── Disk information ──────────────────────────────────────────────────────────
@dataclass
class DiskInfo:
"""Snapshot of disk usage for a single mount point.
Construct with :meth:`DiskInfo.from_path` for live data or build
directly from known values for unit tests.
"""
path: str
total_bytes: int
used_bytes: int
free_bytes: int
@classmethod
def from_path(cls, path: str) -> "DiskInfo":
"""Read live disk usage from the OS."""
u = shutil.disk_usage(path)
return cls(path=path, total_bytes=u.total,
used_bytes=u.used, free_bytes=u.free)
# ── Derived properties ─────────────────────────────────────────────────
@property
def used_pct(self) -> float:
"""Percentage of disk space used (0100)."""
if self.total_bytes == 0:
return 100.0
return self.used_bytes / self.total_bytes * 100.0
@property
def free_pct(self) -> float:
return 100.0 - self.used_pct
@property
def free_gb(self) -> float:
return self.free_bytes / (1024 ** 3)
@property
def used_gb(self) -> float:
return self.used_bytes / (1024 ** 3)
@property
def total_gb(self) -> float:
return self.total_bytes / (1024 ** 3)
# ── Threshold helpers ──────────────────────────────────────────────────
def exceeds_pct(self, threshold_pct: float) -> bool:
return self.used_pct >= threshold_pct
def has_min_free(self, min_free_gb: float) -> bool:
return self.free_gb >= min_free_gb
def summary(self) -> str:
return (f"{self.path}: {self.used_gb:.1f}/{self.total_gb:.1f} GB "
f"({self.used_pct:.1f}% used, {self.free_gb:.1f} GB free)")
# ── Storage path selection ────────────────────────────────────────────────────
class StorageSelector:
"""Selects the best storage path from a priority-ordered list.
Paths are tried in order; the first accessible path with at least
``min_free_gb`` of free space is returned. Typical priority order:
USB NVMe internal home directory.
Example::
sel = StorageSelector(['/media/usb0', '/mnt/nvme', '~/bags'])
path = sel.select() # e.g. '/media/usb0'
"""
DEFAULT_MIN_FREE_GB = 2.0
def __init__(self,
paths: List[str],
min_free_gb: float = DEFAULT_MIN_FREE_GB) -> None:
if not paths:
raise ValueError("paths list must not be empty")
self._paths = [str(Path(p).expanduser()) for p in paths]
self._min_free_gb = min_free_gb
def select(self) -> Optional[str]:
"""Return the best available storage path, or None if none qualify."""
for path in self._paths:
p = Path(path)
if not p.exists():
continue
try:
info = DiskInfo.from_path(path)
if info.has_min_free(self._min_free_gb):
return path
except OSError:
continue
return None
def select_or_default(self, default: str) -> str:
"""Return the best path, falling back to ``default`` if none qualify."""
return self.select() or default
def disk_infos(self) -> List[DiskInfo]:
"""Return DiskInfo for all accessible paths (for status publishing)."""
result = []
for path in self._paths:
if Path(path).exists():
try:
result.append(DiskInfo.from_path(path))
except OSError:
pass
return result
# ── Recording policy ──────────────────────────────────────────────────────────
class BagPolicy:
"""Encapsulates all recording policy thresholds (Issue #615).
Separates business logic from ROS2 so it can be unit-tested without a
live ROS2 install.
Parameters
split_size_gb float Split bag when it reaches this size. Default 1.0
split_duration_min float Split bag after this many minutes. Default 10.0
cleanup_age_days int Bags older than this are cleanup candidates. Default 7
cleanup_disk_pct float Trigger cleanup when disk exceeds this %. Default 80.0
warn_disk_pct float Log a warning when disk exceeds this %. Default 70.0
min_free_gb float Minimum free space to start recording. Default 2.0
"""
def __init__(
self,
split_size_gb: float = 1.0,
split_duration_min: float = 10.0,
cleanup_age_days: int = 7,
cleanup_disk_pct: float = 80.0,
warn_disk_pct: float = 70.0,
min_free_gb: float = 2.0,
) -> None:
if split_size_gb <= 0:
raise ValueError(f"split_size_gb must be > 0, got {split_size_gb!r}")
if split_duration_min <= 0:
raise ValueError(f"split_duration_min must be > 0, got {split_duration_min!r}")
if cleanup_age_days < 1:
raise ValueError(f"cleanup_age_days must be >= 1, got {cleanup_age_days!r}")
if not (0 < warn_disk_pct < cleanup_disk_pct <= 100):
raise ValueError(
f"Must have 0 < warn_disk_pct({warn_disk_pct}) "
f"< cleanup_disk_pct({cleanup_disk_pct}) <= 100"
)
self.split_size_bytes = int(split_size_gb * 1024 ** 3)
self.split_duration_s = split_duration_min * 60.0
self.cleanup_age_days = cleanup_age_days
self.cleanup_disk_pct = cleanup_disk_pct
self.warn_disk_pct = warn_disk_pct
self.min_free_gb = min_free_gb
# ── Split decision ─────────────────────────────────────────────────────
def needs_split(
self,
bag_size_bytes: int,
bag_duration_s: float,
) -> Tuple[bool, str]:
"""Return (True, reason) if the active bag segment should be split."""
if bag_size_bytes >= self.split_size_bytes:
size_gb = bag_size_bytes / 1024 ** 3
lim_gb = self.split_size_bytes / 1024 ** 3
return True, f"size {size_gb:.2f} GB ≥ {lim_gb:.1f} GB"
if bag_duration_s >= self.split_duration_s:
return True, (
f"duration {bag_duration_s / 60:.1f} min "
f"{self.split_duration_s / 60:.0f} min"
)
return False, ""
# ── Disk thresholds ────────────────────────────────────────────────────
def needs_cleanup(self, disk: DiskInfo) -> bool:
"""True if disk is full enough to warrant deleting old bags."""
return disk.exceeds_pct(self.cleanup_disk_pct)
def should_warn_disk(self, disk: DiskInfo) -> bool:
"""True if disk usage deserves a warning log."""
return disk.exceeds_pct(self.warn_disk_pct)
def can_record(self, disk: DiskInfo) -> bool:
"""True if there is enough free space to start/continue recording."""
return disk.has_min_free(self.min_free_gb)
# ── Age-based cleanup ──────────────────────────────────────────────────
def bag_is_expired(self, bag_age_days: float) -> bool:
"""True if a bag is old enough to be a cleanup candidate."""
return bag_age_days >= self.cleanup_age_days
def bag_age_days(self, bag_mtime: float, now: float) -> float:
"""Compute bag age in fractional days from mtime and current time."""
return (now - bag_mtime) / 86400.0

View File

@ -1,265 +1,383 @@
#!/usr/bin/env python3 """bag_recorder_node.py — ROS2 bag recording manager (Issue #615).
import os Upgrades the Issue #488 basic recorder with:
import signal Motion-triggered auto-record (/cmd_vel non-zero start, 30s idle stop)
import shutil Auto-split at 1 GB or 10 min
import subprocess USB/NVMe storage path selection with disk monitoring
import threading Disk-usage-triggered cleanup (>80% delete bags >7 days old)
import time JSON status topic at 1 Hz
Services (Trigger):
/saltybot/bag_recorder/start /saltybot/bag_recorder/stop
/saltybot/bag_recorder/split (plus legacy /saltybot/start_recording etc.)
Topics:
SUB /cmd_vel (geometry_msgs/Twist)
PUB /saltybot/bag_recorder/status (std_msgs/String, JSON)
"""
from __future__ import annotations
import json, os, shutil, signal, subprocess, threading, time
from datetime import datetime
from pathlib import Path from pathlib import Path
from datetime import datetime, timedelta
from typing import List, Optional from typing import List, Optional
import rclpy import rclpy
from rclpy.node import Node from rclpy.node import Node
from std_srvs.srv import Trigger from geometry_msgs.msg import Twist
from std_msgs.msg import String from std_msgs.msg import String
from std_srvs.srv import Trigger
from saltybot_bag_recorder.bag_policy import BagPolicy, DiskInfo, MotionState, StorageSelector
class _BagSession:
"""Tracks one recording session (possibly multiple split segments)."""
def __init__(self, bag_dir: Path, session_name: str) -> None:
self.bag_dir = bag_dir
self.session_name = session_name
self.start_time = time.monotonic()
self.split_count = 0
self.proc: Optional[subprocess.Popen] = None
self.segment_start = time.monotonic()
self._lock = threading.Lock()
def segment_name(self) -> str:
return f"{self.session_name}_s{self.split_count:03d}"
def segment_path(self) -> Path:
return self.bag_dir / self.segment_name()
def segment_size_bytes(self) -> int:
p = self.segment_path()
if not p.exists():
return 0
return sum(f.stat().st_size for f in p.rglob("*") if f.is_file())
def segment_duration_s(self) -> float:
return time.monotonic() - self.segment_start
def session_duration_s(self) -> float:
return time.monotonic() - self.start_time
def is_alive(self) -> bool:
with self._lock:
return self.proc is not None and self.proc.poll() is None
def stop_proc(self, timeout: float = 5.0) -> None:
with self._lock:
if self.proc is None:
return
try:
self.proc.send_signal(signal.SIGINT)
self.proc.wait(timeout=timeout)
except subprocess.TimeoutExpired:
self.proc.kill()
self.proc.wait()
except OSError:
pass
self.proc = None
class BagRecorderNode(Node): class BagRecorderNode(Node):
"""ROS2 bag recording service for mission logging (Issue #488).""" """ROS2 bag recording manager with motion trigger and disk monitoring (Issue #615)."""
def __init__(self): DEFAULT_TOPICS: List[str] = [
super().__init__('saltybot_bag_recorder') "/scan", "/cmd_vel", "/odom", "/tf", "/tf_static",
"/saltybot/pose/fused_cov", "/saltybot/diagnostics",
"/saltybot/sensor_health", "/saltybot/battery",
"/saltybot/motor_daemon/status",
"/camera/color/image_raw/compressed",
"/camera/depth/image_rect_raw/compressed",
"/imu/data",
]
# Configuration (Issue #488: mission logging) DEFAULT_STORAGE_PATHS: List[str] = [
default_bag_dir = str(Path.home() / 'saltybot-data' / 'bags') "/media/usb0", "/media/usb1",
self.declare_parameter('bag_dir', default_bag_dir) "/mnt/nvme/saltybot-bags",
self.declare_parameter('topics', [ "~/saltybot-data/bags",
'/scan', ]
'/cmd_vel',
'/odom',
'/tf',
'/camera/color/image_raw/compressed',
'/saltybot/diagnostics'
])
self.declare_parameter('buffer_duration_minutes', 30)
self.declare_parameter('storage_ttl_days', 7)
self.declare_parameter('max_storage_gb', 20)
self.declare_parameter('enable_rsync', False)
self.declare_parameter('rsync_destination', '')
self.declare_parameter('storage_format', 'mcap')
self.bag_dir = Path(self.get_parameter('bag_dir').value) def __init__(self) -> None:
self.topics = self.get_parameter('topics').value super().__init__("bag_recorder")
self.buffer_duration = self.get_parameter('buffer_duration_minutes').value * 60
self.storage_ttl_days = self.get_parameter('storage_ttl_days').value
self.max_storage_gb = self.get_parameter('max_storage_gb').value
self.enable_rsync = self.get_parameter('enable_rsync').value
self.rsync_destination = self.get_parameter('rsync_destination').value
self.storage_format = self.get_parameter('storage_format').value
self.bag_dir.mkdir(parents=True, exist_ok=True) self.declare_parameter("storage_paths", self.DEFAULT_STORAGE_PATHS)
self.declare_parameter("topics", self.DEFAULT_TOPICS)
self.declare_parameter("motion_trigger", True)
self.declare_parameter("idle_timeout_s", 30.0)
self.declare_parameter("split_size_gb", 1.0)
self.declare_parameter("split_duration_min", 10.0)
self.declare_parameter("storage_format", "mcap")
self.declare_parameter("warn_disk_pct", 70.0)
self.declare_parameter("cleanup_disk_pct", 80.0)
self.declare_parameter("cleanup_age_days", 7)
self.declare_parameter("min_free_gb", 2.0)
self.declare_parameter("status_hz", 1.0)
self.declare_parameter("check_hz", 0.033)
# Recording state self._topics = list(self.get_parameter("topics").value)
self.is_recording = False self._motion_trigger = self.get_parameter("motion_trigger").value
self.current_bag_process = None self._storage_fmt = self.get_parameter("storage_format").value
self.current_bag_name = None
self.buffer_bags: List[str] = []
self.recording_lock = threading.Lock()
# Services self._policy = BagPolicy(
self.save_service = self.create_service(Trigger, '/saltybot/save_bag', self.save_bag_callback) split_size_gb = self.get_parameter("split_size_gb").value,
self.start_service = self.create_service(Trigger, '/saltybot/start_recording', self.start_recording_callback) split_duration_min = self.get_parameter("split_duration_min").value,
self.stop_service = self.create_service(Trigger, '/saltybot/stop_recording', self.stop_recording_callback) cleanup_age_days = int(self.get_parameter("cleanup_age_days").value),
cleanup_disk_pct = self.get_parameter("cleanup_disk_pct").value,
# Watchdog to handle crash recovery warn_disk_pct = self.get_parameter("warn_disk_pct").value,
self.setup_signal_handlers() min_free_gb = self.get_parameter("min_free_gb").value,
)
# Start recording self._motion = MotionState(
self.start_recording() idle_timeout_s = self.get_parameter("idle_timeout_s").value,
)
# Periodic maintenance (cleanup old bags, manage storage) self._selector = StorageSelector(
self.maintenance_timer = self.create_timer(300.0, self.maintenance_callback) paths = list(self.get_parameter("storage_paths").value),
min_free_gb = self.get_parameter("min_free_gb").value,
self.get_logger().info(
f'Bag recorder initialized: {self.bag_dir}, format={self.storage_format}, '
f'buffer={self.buffer_duration}s, max={self.max_storage_gb}GB, topics={len(self.topics)}'
) )
def setup_signal_handlers(self): self._bag_dir: Optional[Path] = None
"""Setup signal handlers for graceful shutdown and crash recovery.""" self._session: Optional[_BagSession] = None
def signal_handler(sig, frame): self._lock: threading.Lock = threading.Lock()
self.get_logger().warn(f'Signal {sig} received, saving current bag') self._recording: bool = False
self.stop_recording(save=True)
self.cleanup()
rclpy.shutdown()
signal.signal(signal.SIGINT, signal_handler) self._status_pub = self.create_publisher(
signal.signal(signal.SIGTERM, signal_handler) String, "/saltybot/bag_recorder/status", 10)
def start_recording(self): self.create_service(Trigger, "/saltybot/bag_recorder/start", self._svc_start)
"""Start bag recording in the background.""" self.create_service(Trigger, "/saltybot/bag_recorder/stop", self._svc_stop)
with self.recording_lock: self.create_service(Trigger, "/saltybot/bag_recorder/split", self._svc_split)
if self.is_recording: # Legacy (Issue #488 compatibility)
self.create_service(Trigger, "/saltybot/start_recording", self._svc_start)
self.create_service(Trigger, "/saltybot/stop_recording", self._svc_stop)
self.create_service(Trigger, "/saltybot/save_bag", self._svc_split)
if self._motion_trigger:
self.create_subscription(Twist, "/cmd_vel", self._on_cmd_vel, 10)
hz = self.get_parameter("status_hz").value
self.create_timer(1.0 / hz, self._publish_status)
chz = self.get_parameter("check_hz").value
self.create_timer(1.0 / chz, self._periodic_check)
self.create_timer(60.0, self._disk_maintenance)
self.get_logger().info(
f"BagRecorder ready | motion={self._motion_trigger} "
f"idle={self._motion.idle_timeout_s:.0f}s "
f"split={self._policy.split_size_bytes//1024**3}GB/"
f"{int(self._policy.split_duration_s//60)}min "
f"fmt={self._storage_fmt} topics={len(self._topics)}"
)
def _on_cmd_vel(self, msg: Twist) -> None:
now = time.monotonic()
self._motion.update(msg.linear.x, msg.angular.z, now)
with self._lock:
if not self._recording and self._motion.should_start_recording(now):
self.get_logger().info("Motion detected — auto-starting bag recording")
self._start_recording_locked()
def _svc_start(self, _req, resp):
with self._lock:
if self._recording:
resp.success = False
resp.message = f"Already recording: {self._session.session_name}"
return resp
ok, msg = self._start_recording_locked()
resp.success = ok
resp.message = msg
return resp
def _svc_stop(self, _req, resp):
with self._lock:
if not self._recording:
resp.success = False
resp.message = "Not recording"
return resp
_, ok, msg = self._stop_recording_locked()
resp.success = ok
resp.message = msg
return resp
def _svc_split(self, _req, resp):
with self._lock:
if not self._recording:
resp.success = False
resp.message = "Not recording — nothing to split"
return resp
ok, msg = self._split_locked()
resp.success = ok
resp.message = msg
return resp
def _periodic_check(self) -> None:
now = time.monotonic()
with self._lock:
if self._recording and self._motion_trigger:
if self._motion.should_stop_recording(now):
self.get_logger().info(
f"Idle {self._motion.idle_seconds(now):.0f}s — auto-stopping")
self._stop_recording_locked()
return
if self._recording and self._session is not None:
size = self._session.segment_size_bytes()
dur = self._session.segment_duration_s()
split, reason = self._policy.needs_split(size, dur)
if split:
self.get_logger().info(f"Auto-split: {reason}")
self._split_locked()
def _disk_maintenance(self) -> None:
if self._bag_dir is None:
sel = self._selector.select()
if sel is None:
return return
self._bag_dir = Path(sel)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
self.current_bag_name = f'saltybot_{timestamp}'
bag_path = self.bag_dir / self.current_bag_name
try:
# Build rosbag2 record command
cmd = ['ros2', 'bag', 'record', '--output', str(bag_path), '--storage', self.storage_format]
# Add compression for sqlite3 format
if self.storage_format == 'sqlite3':
cmd.extend(['--compression-format', 'zstd', '--compression-mode', 'file'])
# Add topics (required for mission logging)
if self.topics and self.topics[0]:
cmd.extend(self.topics)
else:
cmd.extend(['/scan', '/cmd_vel', '/odom', '/tf', '/camera/color/image_raw/compressed', '/saltybot/diagnostics'])
self.current_bag_process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
self.is_recording = True
self.buffer_bags.append(self.current_bag_name)
self.get_logger().info(f'Started recording: {self.current_bag_name}')
except Exception as e:
self.get_logger().error(f'Failed to start recording: {e}')
def save_bag_callback(self, request, response):
"""Service callback to manually trigger bag save."""
try: try:
self.stop_recording(save=True) disk = DiskInfo.from_path(str(self._bag_dir))
self.start_recording() except OSError as exc:
response.success = True self.get_logger().warn(f"Disk check failed: {exc}")
response.message = f'Saved: {self.current_bag_name}'
self.get_logger().info(response.message)
except Exception as e:
response.success = False
response.message = f'Failed to save bag: {e}'
self.get_logger().error(response.message)
return response
def start_recording_callback(self, request, response):
"""Service callback to start recording."""
if self.is_recording:
response.success = False
response.message = 'Recording already in progress'
return response
try:
self.start_recording()
response.success = True
response.message = f'Recording started: {self.current_bag_name}'
self.get_logger().info(response.message)
except Exception as e:
response.success = False
response.message = f'Failed to start recording: {e}'
self.get_logger().error(response.message)
return response
def stop_recording_callback(self, request, response):
"""Service callback to stop recording."""
if not self.is_recording:
response.success = False
response.message = 'No recording in progress'
return response
try:
self.stop_recording(save=True)
response.success = True
response.message = f'Recording stopped and saved: {self.current_bag_name}'
self.get_logger().info(response.message)
except Exception as e:
response.success = False
response.message = f'Failed to stop recording: {e}'
self.get_logger().error(response.message)
return response
def stop_recording(self, save: bool = False):
"""Stop the current bag recording."""
with self.recording_lock:
if not self.is_recording or not self.current_bag_process:
return
try:
self.current_bag_process.send_signal(signal.SIGINT)
self.current_bag_process.wait(timeout=5.0)
except subprocess.TimeoutExpired:
self.get_logger().warn(f'Force terminating {self.current_bag_name}')
self.current_bag_process.kill()
self.current_bag_process.wait()
except Exception as e:
self.get_logger().error(f'Error stopping recording: {e}')
self.is_recording = False
self.get_logger().info(f'Stopped recording: {self.current_bag_name}')
if save:
self.apply_compression()
def apply_compression(self):
"""Compress the current bag using zstd."""
if not self.current_bag_name:
return return
bag_path = self.bag_dir / self.current_bag_name if self._policy.should_warn_disk(disk):
try: self.get_logger().warn(
tar_path = f'{bag_path}.tar.zst' f"Disk {disk.used_pct:.1f}% > {self._policy.warn_disk_pct:.0f}%")
if bag_path.exists(): if self._policy.needs_cleanup(disk):
cmd = ['tar', '-I', 'zstd', '-cf', tar_path, '-C', str(self.bag_dir), self.current_bag_name] self.get_logger().warn(
subprocess.run(cmd, check=True, capture_output=True, timeout=60) f"Disk {disk.used_pct:.1f}% >= {self._policy.cleanup_disk_pct:.0f}% — cleaning")
self.get_logger().info(f'Compressed: {tar_path}') self._cleanup_expired_bags()
except Exception as e:
self.get_logger().warn(f'Compression skipped: {e}')
def maintenance_callback(self): def _start_recording_locked(self):
"""Periodic maintenance: cleanup old bags and manage storage.""" sel = self._selector.select()
self.cleanup_expired_bags() if sel is None:
self.enforce_storage_quota() msg = f"No storage path with >={self._policy.min_free_gb:.1f}GB free"
if self.enable_rsync and self.rsync_destination: self.get_logger().error(msg)
self.rsync_bags() return False, msg
self._bag_dir = Path(sel)
self._bag_dir.mkdir(parents=True, exist_ok=True)
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
self._session = _BagSession(self._bag_dir, f"saltybot_{ts}")
ok, msg = self._launch_ros2bag_locked()
if ok:
self._recording = True
self.get_logger().info(
f"Recording: {self._session.segment_name()} -> {self._bag_dir}")
return ok, msg
def cleanup_expired_bags(self): def _stop_recording_locked(self):
"""Remove bags older than TTL.""" if self._session is None:
return None, False, "No active session"
name = self._session.session_name
self._session.stop_proc()
self._recording = False
self.get_logger().info(f"Stopped: {name}")
return name, True, f"Stopped: {name}"
def _split_locked(self):
if self._session is None:
return False, "No active session"
old = self._session.segment_name()
self._session.stop_proc()
self._session.split_count += 1
self._session.segment_start = time.monotonic()
ok, msg = self._launch_ros2bag_locked()
if ok:
self.get_logger().info(f"Split: {old} -> {self._session.segment_name()}")
return True, f"Split: {old} -> {self._session.segment_name()}"
return ok, msg
def _launch_ros2bag_locked(self):
seg_path = self._session.segment_path()
cmd = ["ros2", "bag", "record",
"--output", str(seg_path),
"--storage", self._storage_fmt]
if self._storage_fmt == "sqlite3":
cmd += ["--compression-format", "zstd", "--compression-mode", "file"]
cmd += self._topics
try: try:
cutoff_time = datetime.now() - timedelta(days=self.storage_ttl_days) self._session.proc = subprocess.Popen(
for item in self.bag_dir.iterdir(): cmd, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE,
if item.is_dir() and item.name.startswith('saltybot_'): preexec_fn=os.setsid)
return True, f"Recording: {self._session.segment_name()}"
except Exception as exc:
msg = f"Failed to launch ros2 bag record: {exc}"
self.get_logger().error(msg)
return False, msg
def _cleanup_expired_bags(self) -> None:
if self._bag_dir is None or not self._bag_dir.exists():
return
now = time.time()
for item in sorted(self._bag_dir.iterdir(),
key=lambda p: p.stat().st_mtime):
if not item.is_dir() or not item.name.startswith("saltybot_"):
continue
if (self._session and
item.name.startswith(self._session.session_name)):
continue
age = self._policy.bag_age_days(item.stat().st_mtime, now)
if self._policy.bag_is_expired(age):
try:
shutil.rmtree(item)
self.get_logger().info(
f"Cleanup: removed {item.name} (age {age:.1f}d)")
try: try:
timestamp_str = item.name.replace('saltybot_', '') disk = DiskInfo.from_path(str(self._bag_dir))
item_time = datetime.strptime(timestamp_str, '%Y%m%d_%H%M%S') if not self._policy.needs_cleanup(disk):
if item_time < cutoff_time: break
shutil.rmtree(item, ignore_errors=True) except OSError:
self.get_logger().info(f'Removed expired bag: {item.name}')
except (ValueError, OSError) as e:
self.get_logger().warn(f'Error processing {item.name}: {e}')
except Exception as e:
self.get_logger().error(f'Cleanup failed: {e}')
def enforce_storage_quota(self):
"""Remove oldest bags if total size exceeds quota (FIFO)."""
try:
total_size = sum(f.stat().st_size for f in self.bag_dir.rglob('*') if f.is_file()) / (1024 ** 3)
if total_size > self.max_storage_gb:
self.get_logger().warn(f'Storage quota exceeded: {total_size:.2f}GB > {self.max_storage_gb}GB')
bags = sorted([d for d in self.bag_dir.iterdir() if d.is_dir() and d.name.startswith('saltybot_')], key=lambda x: x.stat().st_mtime)
for bag in bags:
if total_size <= self.max_storage_gb:
break break
bag_size = sum(f.stat().st_size for f in bag.rglob('*') if f.is_file()) / (1024 ** 3) except OSError as exc:
shutil.rmtree(bag, ignore_errors=True) self.get_logger().warn(f"Cleanup failed {item.name}: {exc}")
total_size -= bag_size
self.get_logger().info(f'Removed bag to enforce quota: {bag.name}')
except Exception as e:
self.get_logger().error(f'Storage quota enforcement failed: {e}')
def rsync_bags(self): def _publish_status(self) -> None:
"""Optional: rsync bags to NAS.""" with self._lock:
try: s_name = self._session.session_name if self._session else None
cmd = ['rsync', '-avz', '--delete', f'{self.bag_dir}/', self.rsync_destination] seg = self._session.segment_name() if self._session else None
subprocess.run(cmd, check=False, timeout=300) splits = self._session.split_count if self._session else 0
self.get_logger().info(f'Synced bags to NAS: {self.rsync_destination}') seg_sz = self._session.segment_size_bytes() if self._session else 0
except subprocess.TimeoutExpired: s_dur = self._session.session_duration_s() if self._session else 0.0
self.get_logger().warn('Rsync timed out') seg_dur = self._session.segment_duration_s() if self._session else 0.0
except Exception as e: rec = self._recording
self.get_logger().error(f'Rsync failed: {e}') bdir = str(self._bag_dir) if self._bag_dir else None
now = time.monotonic()
idle = self._motion.idle_seconds(now)
disk_status = {}
if self._bag_dir and self._bag_dir.exists():
try:
di = DiskInfo.from_path(str(self._bag_dir))
disk_status = {
"path": bdir,
"used_pct": round(di.used_pct, 1),
"free_gb": round(di.free_gb, 2),
"total_gb": round(di.total_gb, 2),
"warn": self._policy.should_warn_disk(di),
"cleanup": self._policy.needs_cleanup(di),
}
except OSError:
pass
payload = {
"recording": rec,
"session": s_name,
"segment": seg,
"split_count": splits,
"segment_size_mb": round(seg_sz / 1e6, 1),
"session_dur_s": round(s_dur, 1),
"segment_dur_s": round(seg_dur, 1),
"motion_idle_s": round(idle, 1) if idle != float("inf") else None,
"motion_trigger": self._motion_trigger,
"bag_dir": bdir,
"disk": disk_status,
}
self._status_pub.publish(String(data=json.dumps(payload)))
def cleanup(self): def _shutdown(self) -> None:
"""Cleanup resources.""" with self._lock:
self.stop_recording(save=True) if self._recording:
self.get_logger().info('Bag recorder shutdown complete') self._stop_recording_locked()
def main(args=None): def main(args=None) -> None:
rclpy.init(args=args) rclpy.init(args=args)
node = BagRecorderNode() node = BagRecorderNode()
try: try:
@ -267,10 +385,6 @@ def main(args=None):
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
finally: finally:
node.cleanup() node._shutdown()
node.destroy_node() node.destroy_node()
rclpy.shutdown() rclpy.try_shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,379 @@
"""test_bag_policy.py — Unit tests for bag recording policy (Issue #615).
ROS2-free. Run with:
python3 -m pytest \
jetson/ros2_ws/src/saltybot_bag_recorder/test/test_bag_policy.py -v
"""
from __future__ import annotations
import math
import os
import sys
from unittest.mock import MagicMock, patch
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..",
"saltybot_bag_recorder"))
import pytest
from bag_policy import BagPolicy, DiskInfo, MotionState, StorageSelector
# ─── MotionState ──────────────────────────────────────────────────────────────
class TestMotionStateInit:
def test_default_idle_timeout(self):
assert MotionState().idle_timeout_s == 30.0
def test_custom_idle_timeout(self):
assert MotionState(idle_timeout_s=60.0).idle_timeout_s == 60.0
def test_zero_timeout_raises(self):
with pytest.raises(ValueError, match="idle_timeout_s"):
MotionState(idle_timeout_s=0.0)
def test_negative_timeout_raises(self):
with pytest.raises(ValueError):
MotionState(idle_timeout_s=-5.0)
def test_initial_not_ever_moved(self):
assert not MotionState().ever_moved
def test_initial_idle_seconds_inf(self):
assert MotionState().idle_seconds(1000.0) == math.inf
def test_initial_is_idle(self):
assert MotionState().is_idle(1000.0)
def test_initial_not_moving(self):
assert not MotionState().is_moving(1000.0)
class TestMotionStateUpdate:
def setup_method(self):
self.ms = MotionState(idle_timeout_s=30.0)
def test_nonzero_linear_sets_moved(self):
self.ms.update(0.5, 0.0, 100.0)
assert self.ms.ever_moved
def test_nonzero_angular_sets_moved(self):
self.ms.update(0.0, 0.3, 100.0)
assert self.ms.ever_moved
def test_zero_cmd_vel_does_not_set_moved(self):
self.ms.update(0.0, 0.0, 100.0)
assert not self.ms.ever_moved
def test_noise_below_threshold_ignored(self):
self.ms.update(0.0005, 0.0005, 100.0)
assert not self.ms.ever_moved
def test_idle_seconds_after_motion(self):
self.ms.update(0.5, 0.0, 100.0)
assert abs(self.ms.idle_seconds(105.0) - 5.0) < 1e-9
def test_not_idle_immediately(self):
self.ms.update(0.5, 0.0, 100.0)
assert not self.ms.is_idle(100.0)
def test_not_idle_within_timeout(self):
self.ms.update(0.5, 0.0, 100.0)
assert not self.ms.is_idle(129.9)
def test_idle_at_boundary(self):
self.ms.update(0.5, 0.0, 100.0)
assert self.ms.is_idle(130.0)
def test_idle_after_timeout(self):
self.ms.update(0.5, 0.0, 100.0)
assert self.ms.is_idle(200.0)
def test_moving_within_window(self):
self.ms.update(0.5, 0.0, 100.0)
assert self.ms.is_moving(115.0)
def test_not_moving_after_timeout(self):
self.ms.update(0.5, 0.0, 100.0)
assert not self.ms.is_moving(200.0)
def test_second_motion_resets_timer(self):
self.ms.update(0.5, 0.0, 100.0)
self.ms.update(0.5, 0.0, 125.0)
assert not self.ms.is_idle(150.0) # 25s since last motion
assert self.ms.is_idle(160.0) # 35s since last motion
class TestMotionStateAutoDecisions:
def setup_method(self):
self.ms = MotionState(idle_timeout_s=30.0)
def test_no_start_before_motion(self):
assert not self.ms.should_start_recording(1000.0)
def test_start_after_motion(self):
self.ms.update(0.5, 0.0, 1000.0)
assert self.ms.should_start_recording(1001.0)
def test_no_start_after_idle(self):
self.ms.update(0.5, 0.0, 1000.0)
assert not self.ms.should_start_recording(1040.0)
def test_no_stop_before_motion(self):
assert not self.ms.should_stop_recording(1000.0)
def test_no_stop_while_active(self):
self.ms.update(0.5, 0.0, 1000.0)
assert not self.ms.should_stop_recording(1010.0)
def test_stop_after_idle(self):
self.ms.update(0.5, 0.0, 1000.0)
assert self.ms.should_stop_recording(1035.0)
class TestMotionStateReset:
def test_reset_clears_motion(self):
ms = MotionState()
ms.update(1.0, 0.0, 100.0)
ms.reset()
assert not ms.ever_moved
assert ms.idle_seconds(200.0) == math.inf
# ─── DiskInfo ─────────────────────────────────────────────────────────────────
def _disk(total_gb=100.0, free_gb=50.0, path="/fake"):
total = int(total_gb * 1024**3)
free = int(free_gb * 1024**3)
used = total - free
return DiskInfo(path=path, total_bytes=total, used_bytes=used, free_bytes=free)
class TestDiskInfo:
def test_used_pct_50(self):
assert abs(_disk(100, 50).used_pct - 50.0) < 0.01
def test_used_pct_100(self):
assert _disk(100, 0).used_pct == 100.0
def test_used_pct_0(self):
assert abs(_disk(100, 100).used_pct) < 0.01
def test_free_pct(self):
assert abs(_disk(100, 30).free_pct - 30.0) < 0.01
def test_free_gb(self):
assert abs(_disk(100, 20).free_gb - 20.0) < 0.01
def test_total_gb(self):
assert abs(_disk(250, 100).total_gb - 250.0) < 0.01
def test_used_gb(self):
assert abs(_disk(100, 30).used_gb - 70.0) < 0.01
def test_exceeds_pct_true(self):
assert _disk(100, 10).exceeds_pct(80.0) # 90% used
def test_exceeds_pct_false(self):
assert not _disk(100, 50).exceeds_pct(80.0) # 50% used
def test_exceeds_pct_boundary(self):
assert _disk(100, 20).exceeds_pct(80.0) # exactly 80%
def test_has_min_free_true(self):
assert _disk(100, 10).has_min_free(2.0)
def test_has_min_free_false(self):
assert not _disk(100, 1).has_min_free(2.0)
def test_zero_total_is_100pct(self):
di = DiskInfo(path="/", total_bytes=0, used_bytes=0, free_bytes=0)
assert di.used_pct == 100.0
def test_summary_has_path(self):
di = _disk(path="/media/usb0")
assert "/media/usb0" in di.summary()
# ─── StorageSelector ─────────────────────────────────────────────────────────
class TestStorageSelector:
def test_empty_paths_raises(self):
with pytest.raises(ValueError, match="paths"):
StorageSelector([])
def test_select_none_when_no_paths_exist(self):
sel = StorageSelector(["/nonexistent/a", "/nonexistent/b"])
assert sel.select() is None
def test_select_first_accessible(self, tmp_path):
p1 = tmp_path / "p1"; p1.mkdir()
p2 = tmp_path / "p2"; p2.mkdir()
sel = StorageSelector([str(p1), str(p2)], min_free_gb=0.0)
assert sel.select() == str(p1)
def test_select_skips_insufficient_free(self, tmp_path):
small = tmp_path / "small"; small.mkdir()
large = tmp_path / "large"; large.mkdir()
def fake_from(path):
if "small" in path:
return DiskInfo(path=path, total_bytes=10*1024**3,
used_bytes=9*1024**3, free_bytes=1*1024**3)
return DiskInfo(path=path, total_bytes=100*1024**3,
used_bytes=10*1024**3, free_bytes=90*1024**3)
with patch("bag_policy.DiskInfo.from_path", side_effect=fake_from):
sel = StorageSelector([str(small), str(large)], min_free_gb=2.0)
assert sel.select() == str(large)
def test_select_or_default_fallback(self):
sel = StorageSelector(["/nonexistent"], min_free_gb=1.0)
assert sel.select_or_default("/fb") == "/fb"
def test_disk_infos_empty_nonexistent(self):
sel = StorageSelector(["/nonexistent"])
assert sel.disk_infos() == []
# ─── BagPolicy constructor ────────────────────────────────────────────────────
class TestBagPolicyConstructor:
def test_defaults(self):
p = BagPolicy()
assert p.split_size_bytes == 1 * 1024**3
assert p.split_duration_s == 600.0
assert p.cleanup_age_days == 7
assert p.cleanup_disk_pct == 80.0
assert p.warn_disk_pct == 70.0
def test_size_stored_as_bytes(self):
assert BagPolicy(split_size_gb=2.0).split_size_bytes == 2 * 1024**3
def test_duration_stored_as_seconds(self):
assert BagPolicy(split_duration_min=5.0).split_duration_s == 300.0
def test_zero_size_raises(self):
with pytest.raises(ValueError, match="split_size_gb"):
BagPolicy(split_size_gb=0.0)
def test_negative_size_raises(self):
with pytest.raises(ValueError):
BagPolicy(split_size_gb=-1.0)
def test_zero_duration_raises(self):
with pytest.raises(ValueError, match="split_duration_min"):
BagPolicy(split_duration_min=0.0)
def test_cleanup_age_zero_raises(self):
with pytest.raises(ValueError, match="cleanup_age_days"):
BagPolicy(cleanup_age_days=0)
def test_warn_above_cleanup_raises(self):
with pytest.raises(ValueError):
BagPolicy(warn_disk_pct=85.0, cleanup_disk_pct=80.0)
def test_equal_warn_cleanup_raises(self):
with pytest.raises(ValueError):
BagPolicy(warn_disk_pct=80.0, cleanup_disk_pct=80.0)
# ─── BagPolicy split ─────────────────────────────────────────────────────────
class TestBagPolicySplit:
def setup_method(self):
self.p = BagPolicy(split_size_gb=1.0, split_duration_min=10.0)
def test_no_split_small_short(self):
split, reason = self.p.needs_split(100*1024**2, 60)
assert not split and reason == ""
def test_split_at_size_boundary(self):
split, reason = self.p.needs_split(1024**3, 60)
assert split
assert "size" in reason.lower() or "gb" in reason.lower()
def test_split_over_size(self):
split, _ = self.p.needs_split(int(1.5*1024**3), 60)
assert split
def test_split_at_duration_boundary(self):
split, reason = self.p.needs_split(100*1024**2, 600)
assert split
assert "min" in reason.lower() or "duration" in reason.lower()
def test_split_over_duration(self):
split, _ = self.p.needs_split(100*1024**2, 700)
assert split
def test_just_below_both(self):
split, _ = self.p.needs_split(1024**3 - 1, 599)
assert not split
def test_both_exceeded_still_splits(self):
split, _ = self.p.needs_split(2*1024**3, 700)
assert split
# ─── BagPolicy disk thresholds ────────────────────────────────────────────────
class TestBagPolicyDisk:
def setup_method(self):
self.p = BagPolicy(warn_disk_pct=70.0, cleanup_disk_pct=80.0,
min_free_gb=2.0)
def test_no_warn_below(self):
assert not self.p.should_warn_disk(_disk(100, 40)) # 60% used
def test_warn_at_boundary(self):
assert self.p.should_warn_disk(_disk(100, 30)) # 70% used
def test_warn_above(self):
assert self.p.should_warn_disk(_disk(100, 25)) # 75% used
def test_no_cleanup_at_warn(self):
assert not self.p.needs_cleanup(_disk(100, 30)) # 70% used
def test_cleanup_at_boundary(self):
assert self.p.needs_cleanup(_disk(100, 20)) # 80% used
def test_cleanup_above(self):
assert self.p.needs_cleanup(_disk(100, 5)) # 95% used
def test_can_record_exact_min_free(self):
total = 100 * 1024**3
free = int(2.0 * 1024**3)
di = DiskInfo(path="/", total_bytes=total,
used_bytes=total-free, free_bytes=free)
assert self.p.can_record(di)
def test_cannot_record_below_min_free(self):
total = 100 * 1024**3
free = int(1.0 * 1024**3)
di = DiskInfo(path="/", total_bytes=total,
used_bytes=total-free, free_bytes=free)
assert not self.p.can_record(di)
# ─── BagPolicy age ────────────────────────────────────────────────────────────
class TestBagPolicyAge:
def setup_method(self):
self.p = BagPolicy(cleanup_age_days=7)
def test_fresh_not_expired(self):
assert not self.p.bag_is_expired(1.0)
def test_at_boundary_expired(self):
assert self.p.bag_is_expired(7.0)
def test_old_expired(self):
assert self.p.bag_is_expired(30.0)
def test_age_computation(self):
mtime = 1000.0
age = self.p.bag_age_days(mtime, mtime + 8 * 86400)
assert abs(age - 8.0) < 1e-9
def test_age_zero(self):
assert self.p.bag_age_days(5000.0, 5000.0) == 0.0

View File

@ -618,3 +618,27 @@ void jlink_send_can_stats(const jlink_tlm_can_stats_t *tlm)
jlink_tx_locked(frame, sizeof(frame)); jlink_tx_locked(frame, sizeof(frame));
} }
/* ---- jlink_send_lvc_tlm() -- Issue #613 ---- */
void jlink_send_lvc_tlm(const jlink_tlm_lvc_t *tlm)
{
/*
* Frame: [STX][LEN][0x8B][4 bytes LVC][CRC_hi][CRC_lo][ETX]
* LEN = 1 + 4 = 5; total = 10 bytes
*/
static uint8_t frame[10];
const uint8_t plen = (uint8_t)sizeof(jlink_tlm_lvc_t); /* 4 */
const uint8_t len = 1u + plen; /* 5 */
frame[0] = JLINK_STX;
frame[1] = len;
frame[2] = JLINK_TLM_LVC;
memcpy(&frame[3], tlm, plen);
uint16_t crc = crc16_xmodem(&frame[2], len);
frame[3 + plen] = (uint8_t)(crc >> 8);
frame[3 + plen + 1] = (uint8_t)(crc & 0xFFu);
frame[3 + plen + 2] = JLINK_ETX;
jlink_tx_locked(frame, sizeof(frame));
}

144
src/lvc.c Normal file
View File

@ -0,0 +1,144 @@
/*
* lvc.c -- Low Voltage Cutoff (LVC) protection (Issue #613)
*
* State machine:
* NORMAL -> WARNING when vbat < LVC_WARNING_MV
* WARNING -> CRITICAL when vbat < LVC_CRITICAL_MV
* CRITICAL -> CUTOFF when vbat < LVC_CUTOFF_MV
*
* Recovery: step down severity only when voltage exceeds the threshold
* by LVC_HYSTERESIS_MV. CUTOFF is latched for the remainder of the session.
*
* Buzzer alerts:
* WARNING -- MELODY_LOW_BATTERY once, then every 30 s
* CRITICAL -- MELODY_LOW_BATTERY x2, then every 10 s
* CUTOFF -- MELODY_ERROR (one-shot; motor disable handled by main.c)
*/
#include "lvc.h"
#include "buzzer.h"
#include "config.h"
/* Periodic buzzer reminder intervals */
#define LVC_WARN_INTERVAL_MS 30000u /* 30 s */
#define LVC_CRIT_INTERVAL_MS 10000u /* 10 s */
static LvcState s_state = LVC_NORMAL;
static bool s_cutoff_latched = false;
static uint32_t s_buzzer_tick = 0u;
/* ---- lvc_init() ---- */
void lvc_init(void)
{
s_state = LVC_NORMAL;
s_cutoff_latched = false;
s_buzzer_tick = 0u;
}
/* ---- lvc_tick() ---- */
void lvc_tick(uint32_t now_ms, uint32_t vbat_mv)
{
if (vbat_mv == 0u) {
return; /* ADC not ready; hold current state */
}
/* Determine new state from raw voltage */
LvcState new_state;
if (vbat_mv < LVC_CUTOFF_MV) {
new_state = LVC_CUTOFF;
} else if (vbat_mv < LVC_CRITICAL_MV) {
new_state = LVC_CRITICAL;
} else if (vbat_mv < LVC_WARNING_MV) {
new_state = LVC_WARNING;
} else {
new_state = LVC_NORMAL;
}
/* Hysteresis on recovery: only decrease severity when voltage exceeds
* the threshold by LVC_HYSTERESIS_MV to prevent rapid toggling. */
if (new_state < s_state) {
LvcState recovered;
if (vbat_mv >= LVC_CUTOFF_MV + LVC_HYSTERESIS_MV) {
recovered = LVC_CRITICAL;
} else if (vbat_mv >= LVC_CRITICAL_MV + LVC_HYSTERESIS_MV) {
recovered = LVC_WARNING;
} else if (vbat_mv >= LVC_WARNING_MV + LVC_HYSTERESIS_MV) {
recovered = LVC_NORMAL;
} else {
recovered = s_state; /* insufficient margin; stay at current level */
}
new_state = recovered;
}
/* CUTOFF latch: once triggered, only a reboot clears it */
if (s_cutoff_latched) {
new_state = LVC_CUTOFF;
}
if (new_state == LVC_CUTOFF) {
s_cutoff_latched = true;
}
/* Buzzer alerts */
bool state_changed = (new_state != s_state);
s_state = new_state;
switch (s_state) {
case LVC_WARNING:
if (state_changed ||
(now_ms - s_buzzer_tick) >= LVC_WARN_INTERVAL_MS) {
s_buzzer_tick = now_ms;
buzzer_play_melody(MELODY_LOW_BATTERY);
}
break;
case LVC_CRITICAL:
if (state_changed ||
(now_ms - s_buzzer_tick) >= LVC_CRIT_INTERVAL_MS) {
s_buzzer_tick = now_ms;
/* Double alert to distinguish critical from warning */
buzzer_play_melody(MELODY_LOW_BATTERY);
buzzer_play_melody(MELODY_LOW_BATTERY);
}
break;
case LVC_CUTOFF:
if (state_changed) {
/* One-shot alarm; motors disabled by main.c */
buzzer_play_melody(MELODY_ERROR);
}
break;
default:
break;
}
}
/* ---- lvc_get_state() ---- */
LvcState lvc_get_state(void)
{
return s_state;
}
/* ---- lvc_get_power_scale() ---- */
/*
* Returns the motor power scale factor (0-100):
* NORMAL / WARNING : 100% -- no reduction
* CRITICAL : 50% -- halve motor commands
* CUTOFF : 0% -- all commands zeroed
*/
uint8_t lvc_get_power_scale(void)
{
switch (s_state) {
case LVC_NORMAL: /* fall-through */
case LVC_WARNING: return 100u;
case LVC_CRITICAL: return 50u;
case LVC_CUTOFF: return 0u;
default: return 100u;
}
}
/* ---- lvc_is_cutoff() ---- */
bool lvc_is_cutoff(void)
{
return s_cutoff_latched;
}

View File

@ -35,6 +35,7 @@
#include "can_driver.h" #include "can_driver.h"
#include "servo_bus.h" #include "servo_bus.h"
#include "gimbal.h" #include "gimbal.h"
#include "lvc.h"
#include <math.h> #include <math.h>
#include <string.h> #include <string.h>
#include <stdio.h> #include <stdio.h>
@ -257,6 +258,9 @@ int main(void) {
/* Init battery ADC (PC1/ADC3 — Vbat divider 11:1) for CRSF telemetry */ /* Init battery ADC (PC1/ADC3 — Vbat divider 11:1) for CRSF telemetry */
battery_init(); battery_init();
/* Init LVC: low voltage cutoff state machine (Issue #613) */
lvc_init();
/* Probe I2C1 for optional sensors — skip gracefully if not found */ /* Probe I2C1 for optional sensors — skip gracefully if not found */
int baro_chip = 0; /* chip_id: 0x58=BMP280, 0x60=BME280, 0=absent */ int baro_chip = 0; /* chip_id: 0x58=BMP280, 0x60=BME280, 0=absent */
mag_type_t mag_type = MAG_NONE; mag_type_t mag_type = MAG_NONE;
@ -288,6 +292,7 @@ int main(void) {
uint32_t pm_tlm_tick = 0; /* JLINK_TLM_POWER transmit timer */ uint32_t pm_tlm_tick = 0; /* JLINK_TLM_POWER transmit timer */
uint32_t can_cmd_tick = 0; /* CAN velocity command TX timer (Issue #597) */ uint32_t can_cmd_tick = 0; /* CAN velocity command TX timer (Issue #597) */
uint32_t can_tlm_tick = 0; /* JLINK_TLM_CAN_STATS transmit timer (Issue #597) */ uint32_t can_tlm_tick = 0; /* JLINK_TLM_CAN_STATS transmit timer (Issue #597) */
uint32_t lvc_tlm_tick = 0; /* JLINK_TLM_LVC transmit timer (Issue #613) */
uint8_t pm_pwm_phase = 0; /* Software PWM counter for sleep LED */ uint8_t pm_pwm_phase = 0; /* Software PWM counter for sleep LED */
const float dt = 1.0f / PID_LOOP_HZ; /* 1ms at 1kHz */ const float dt = 1.0f / PID_LOOP_HZ; /* 1ms at 1kHz */
@ -320,6 +325,9 @@ int main(void) {
/* Accumulate coulombs for battery state-of-charge estimation (Issue #325) */ /* Accumulate coulombs for battery state-of-charge estimation (Issue #325) */
battery_accumulate_coulombs(); battery_accumulate_coulombs();
/* LVC: update low-voltage protection state machine (Issue #613) */
lvc_tick(now, battery_read_mv());
/* Sleep LED: software PWM on LED1 (active-low PC15) driven by PM brightness. /* Sleep LED: software PWM on LED1 (active-low PC15) driven by PM brightness.
* pm_pwm_phase rolls over each ms; brightness sets duty cycle 0-255. */ * pm_pwm_phase rolls over each ms; brightness sets duty cycle 0-255. */
pm_pwm_phase++; pm_pwm_phase++;
@ -600,6 +608,13 @@ int main(void) {
motor_driver_estop_clear(&motors); motor_driver_estop_clear(&motors);
} }
/* LVC cutoff: disarm and estop on undervoltage latch (Issue #613) */
if (lvc_is_cutoff() && bal.state == BALANCE_ARMED) {
safety_arm_cancel();
balance_disarm(&bal);
motor_driver_estop(&motors);
}
/* Feed autonomous steer from Jetson into mode manager. /* Feed autonomous steer from Jetson into mode manager.
* jlink takes priority over legacy CDC jetson_cmd. * jlink takes priority over legacy CDC jetson_cmd.
* mode_manager_get_steer() blends it with RC steer per active mode. */ * mode_manager_get_steer() blends it with RC steer per active mode. */
@ -616,6 +631,11 @@ int main(void) {
int16_t steer = mode_manager_get_steer(&mode); int16_t steer = mode_manager_get_steer(&mode);
int16_t spd_bias = mode_manager_get_speed_bias(&mode); int16_t spd_bias = mode_manager_get_speed_bias(&mode);
int32_t speed = (int32_t)bal.motor_cmd + spd_bias; int32_t speed = (int32_t)bal.motor_cmd + spd_bias;
/* LVC power scaling: 100% normal, 50% critical, 0% cutoff (Issue #613) */
uint8_t lvc_scale = lvc_get_power_scale();
if (lvc_scale < 100u) {
speed = (speed * (int32_t)lvc_scale) / 100;
}
if (speed > 1000) speed = 1000; if (speed > 1000) speed = 1000;
if (speed < -1000) speed = -1000; if (speed < -1000) speed = -1000;
motor_driver_update(&motors, (int16_t)speed, steer, now); motor_driver_update(&motors, (int16_t)speed, steer, now);
@ -728,6 +748,26 @@ int main(void) {
jlink_send_power_telemetry(&pow); jlink_send_power_telemetry(&pow);
} }
/* JLINK_TLM_LVC telemetry at LVC_TLM_HZ (1 Hz) -- battery voltage + protection state (Issue #613) */
if (now - lvc_tlm_tick >= (1000u / LVC_TLM_HZ)) {
lvc_tlm_tick = now;
uint32_t lvc_vbat = battery_read_mv();
jlink_tlm_lvc_t ltlm;
ltlm.voltage_mv = (lvc_vbat > 65535u) ? 65535u : (uint16_t)lvc_vbat;
ltlm.protection_state = (uint8_t)lvc_get_state();
if (lvc_vbat == 0u) {
ltlm.percent = 255u;
} else if (lvc_vbat <= LVC_CUTOFF_MV) {
ltlm.percent = 0u;
} else if (lvc_vbat >= LVC_WARNING_MV) {
ltlm.percent = 100u;
} else {
ltlm.percent = (uint8_t)(((lvc_vbat - LVC_CUTOFF_MV) * 100u) /
(LVC_WARNING_MV - LVC_CUTOFF_MV));
}
jlink_send_lvc_tlm(&ltlm);
}
/* USB telemetry at 50Hz (only when streaming enabled and calibration done) */ /* USB telemetry at 50Hz (only when streaming enabled and calibration done) */
if (cdc_streaming && mpu6000_is_calibrated() && now - send_tick >= 20) { if (cdc_streaming && mpu6000_is_calibrated() && now - send_tick >= 20) {
send_tick = now; send_tick = now;