feat: Jetson Orin system monitor ROS2 node (Issue #631)

New package saltybot_system_monitor:
- jetson_stats.py: pure-Python data layer (JetsonStats, CpuCore,
  TegrastatsParser, JtopReader, TegrastatsReader, MockReader,
  AlertChecker, AlertThresholds) — no ROS2 dependency
- system_monitor_node.py: ROS2 node publishing /saltybot/system/stats
  (JSON) and /saltybot/diagnostics (DiagnosticArray) at 1 Hz
- Alerts: CPU/GPU >85% WARN (+10% ERROR), temp >80°C, disk/RAM >90%,
  power >30 W; each alert produces a DiagnosticStatus entry
- Stats source priority: jtop > tegrastats > mock (auto-detected)
- config/system_monitor.yaml: all thresholds and rate tunable via params
- launch/system_monitor.launch.py: single-node launch with config arg
- test/test_system_monitor.py: 50+ pytest tests, ROS2-free

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
sl-jetson 2026-03-15 14:38:43 -04:00
parent 0e8758e9e1
commit 25f0711309
13 changed files with 1902 additions and 0 deletions

129
include/uart_protocol.h Normal file
View File

@ -0,0 +1,129 @@
#ifndef UART_PROTOCOL_H
#define UART_PROTOCOL_H
#include <stdint.h>
#include <stdbool.h>
/*
* uart_protocol.h -- Structured UART command protocol for Jetson-STM32
* communication (Issue #629)
*
* Hardware: UART5, PC12=TX / PD2=RX, 115200 baud
* Note: spec references USART1 but USART1 is occupied by JLink (Issue #120)
* at 921600 baud. UART5 is the designated secondary/debug UART and is used
* here as the structured-command channel.
*
* Frame format (both directions):
* [STX=0x02][LEN][CMD][PAYLOAD...][CRC8][ETX=0x03]
*
* STX : frame start sentinel (0x02)
* LEN : CMD byte + payload bytes (1 + payload_len)
* CMD : command / response type byte
* PAYLOAD : 0..N bytes, CMD-dependent
* CRC8 : CRC8-SMBUS (poly 0x07, init 0x00) over CMD+PAYLOAD
* ETX : frame end sentinel (0x03)
*
* Host (Jetson) to STM32 commands:
* 0x01 SET_VELOCITY - int16 left_rpm, int16 right_rpm (4 bytes)
* 0x02 GET_STATUS - no payload; reply URESP_STATUS
* 0x03 SET_PID - float kp, float ki, float kd (12 bytes, LE)
* 0x04 ESTOP - no payload; engage emergency stop
* 0x05 CLEAR_ESTOP - no payload; clear emergency stop
*
* STM32 to host responses:
* 0x80 ACK - uint8 echoed_cmd (1 byte)
* 0x81 NACK - uint8 echoed_cmd, uint8 error_code (2 bytes)
* 0x82 STATUS - uart_prot_status_t (8 bytes)
*
* Heartbeat: if no valid frame is received within UART_PROT_HB_TIMEOUT_MS,
* uart_protocol_is_active() returns false and velocity commands are ignored.
*
* DMA: UART5_RX uses DMA1_Stream0_Channel4 in circular mode (256-byte ring).
* Parser runs in uart_protocol_process() called from the main loop.
*/
/* ---- Frame constants ---- */
#define UPROT_STX 0x02u
#define UPROT_ETX 0x03u
#define UPROT_RX_BUF_LEN 256u /* DMA ring buffer size (power of 2) */
#define UPROT_MAX_PAYLOAD 16u /* largest payload: SET_PID = 12 bytes */
/* ---- Command IDs (host to STM32) ---- */
#define UCMD_SET_VELOCITY 0x01u /* int16 left_rpm + int16 right_rpm */
#define UCMD_GET_STATUS 0x02u /* no payload */
#define UCMD_SET_PID 0x03u /* float kp, ki, kd (12 bytes LE) */
#define UCMD_ESTOP 0x04u /* no payload */
#define UCMD_CLEAR_ESTOP 0x05u /* no payload */
/* ---- Response IDs (STM32 to host) ---- */
#define URESP_ACK 0x80u /* 1-byte payload: echoed CMD */
#define URESP_NACK 0x81u /* 2-byte payload: CMD + error_code */
#define URESP_STATUS 0x82u /* 8-byte payload: uart_prot_status_t */
/* ---- NACK error codes ---- */
#define UERR_UNKNOWN_CMD 0x01u /* unrecognised CMD byte */
#define UERR_BAD_LENGTH 0x02u /* LEN does not match CMD expectation */
#define UERR_BAD_CRC 0x03u /* CRC8 mismatch */
#define UERR_REFUSED 0x04u /* command refused in current state */
/* ---- STATUS response payload (8 bytes, packed) ---- */
typedef struct __attribute__((packed)) {
int16_t pitch_x10; /* pitch angle, degrees x10 (0.1° resolution) */
int16_t motor_cmd; /* balance PID ESC output, -1000..+1000 */
uint16_t vbat_mv; /* battery voltage (mV) */
uint8_t balance_state; /* BalanceState: 0=DISARMED, 1=ARMED, 2=TILT */
uint8_t estop_active; /* 1 = estop currently engaged */
} uart_prot_status_t; /* 8 bytes */
/* ---- Volatile state (read from main loop) ---- */
typedef struct {
/* SET_VELOCITY: set by parser, cleared by main loop */
volatile uint8_t vel_updated; /* 1 = new velocity command pending */
volatile int16_t left_rpm; /* requested left wheel RPM */
volatile int16_t right_rpm; /* requested right wheel RPM */
/* SET_PID: set by parser, cleared by main loop */
volatile uint8_t pid_updated; /* 1 = new PID gains pending */
volatile float pid_kp;
volatile float pid_ki;
volatile float pid_kd;
/* ESTOP / CLEAR_ESTOP: set by parser, cleared by main loop */
volatile uint8_t estop_req;
volatile uint8_t estop_clear_req;
/* Heartbeat: updated on every valid frame */
volatile uint32_t last_rx_ms; /* HAL_GetTick() at last good frame; 0=none */
} UartProtState;
extern volatile UartProtState uart_prot_state;
/* ---- API ---- */
/*
* uart_protocol_init() -- configure UART5 + DMA1_Stream0 and start reception.
* Call once during system init, before the main loop.
*/
void uart_protocol_init(void);
/*
* uart_protocol_process() -- parse new bytes from DMA ring buffer, dispatch
* commands, and send ACK/NACK/STATUS responses.
* Call every main loop tick (non-blocking, < 5 µs when idle).
*/
void uart_protocol_process(void);
/*
* uart_protocol_is_active(now_ms) -- returns true if a valid frame was
* received within UART_PROT_HB_TIMEOUT_MS. Main loop uses this to gate
* SET_VELOCITY commands when the host is disconnected.
*/
bool uart_protocol_is_active(uint32_t now_ms);
/*
* uart_protocol_send_status(s) -- transmit a URESP_STATUS (0x82) frame.
* Called from main loop in response to GET_STATUS or periodically.
*/
void uart_protocol_send_status(const uart_prot_status_t *s);
#endif /* UART_PROTOCOL_H */

View File

@ -0,0 +1,17 @@
# system_monitor.yaml — Config for saltybot_system_monitor (Issue #631)
system_monitor_node:
ros__parameters:
# Publishing rate in Hz
publish_rate_hz: 1.0
# Stats source: auto | jtop | tegrastats | mock
# auto = prefer jtop, fall back to tegrastats, then mock
stats_source: "auto"
# Alert thresholds — WARN at these values, ERROR at +10% (or +5% for disk/ram)
cpu_warn_pct: 85.0 # CPU utilisation (%)
gpu_warn_pct: 85.0 # GPU utilisation (%)
temp_warn_c: 80.0 # Any temperature sensor (°C)
disk_warn_pct: 90.0 # Disk usage (%)
ram_warn_pct: 90.0 # RAM usage (%)
power_warn_mw: 30000.0 # Total board power (mW)

View File

@ -0,0 +1,27 @@
"""system_monitor.launch.py — Launch the Jetson system monitor node (Issue #631)."""
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration, PathJoinSubstitution
from launch_ros.actions import Node
from launch_ros.substitutions import FindPackageShare
def generate_launch_description() -> LaunchDescription:
pkg_share = FindPackageShare("saltybot_system_monitor")
config_arg = DeclareLaunchArgument(
"config_file",
default_value=PathJoinSubstitution([pkg_share, "config", "system_monitor.yaml"]),
description="Path to the system monitor YAML config file",
)
node = Node(
package="saltybot_system_monitor",
executable="system_monitor_node",
name="system_monitor_node",
output="screen",
parameters=[LaunchConfiguration("config_file")],
)
return LaunchDescription([config_arg, node])

View File

@ -0,0 +1,27 @@
<?xml version="1.0"?>
<package format="3">
<name>saltybot_system_monitor</name>
<version>0.1.0</version>
<description>
Jetson Orin system monitor for SaltyBot (Issue #631).
Publishes CPU/GPU/RAM/disk/temperature/fan/power stats at 1 Hz on
/saltybot/system/stats (JSON) and DiagnosticArray on /saltybot/diagnostics.
Alerts when CPU &gt; 85%, GPU &gt; 85%, temp &gt; 80°C, or disk &gt; 90%.
Reads hardware stats via jtop (jetson_stats) when available, falls back
to tegrastats subprocess parsing.
</description>
<maintainer email="sl-jetson@saltylab.local">sl-jetson</maintainer>
<license>Apache-2.0</license>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>diagnostic_msgs</depend>
<buildtool_depend>ament_python</buildtool_depend>
<test_depend>pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,558 @@
"""jetson_stats.py — Jetson Orin hardware stats collection (Issue #631).
No ROS2 dependencies pure Python, fully unit-testable.
Design
JetsonStatsReader.read() returns a JetsonStats snapshot. The reader
tries three backends in priority order:
1. jtop (jetson_stats pip package) richest data, low overhead
2. tegrastats subprocess always present on JetPack, parseable
3. MockReader (tests / dev) deterministic values, no hardware
AlertChecker.check(stats, thresholds) returns a list of Alert objects
describing any threshold violations. All thresholds are per-dataclass
config so they're easy to unit-test.
Tegrastats sample line (Jetson Orin NX, JetPack 6):
RAM 1234/7772MB (lfb 2x2MB) SWAP 0/3886MB (cached 0MB)
CPU [12%@1190,5%@729,...] EMC_FREQ 4%@2133 GPC_FREQ 4%@918
AO@35C CPU@39.6C Tboard@32C Tdiode@35.3C GPU@37.6C tj@41.3C
VDD_GPU_SOC 1866mW/1866mW VDD_CPU_CV 700mW/682mW VDD_IN 4002mW/3996mW
"""
from __future__ import annotations
import re
import shutil
import subprocess
import time
from dataclasses import dataclass, field, asdict
from typing import Dict, List, Optional, Tuple
# psutil is available everywhere; used for disk stats fallback
try:
import psutil as _psutil
HAS_PSUTIL = True
except ImportError:
HAS_PSUTIL = False
# jtop (jetson_stats) is Jetson-only
try:
from jtop import jtop as _jtop
HAS_JTOP = True
except ImportError:
HAS_JTOP = False
# ── Data model ────────────────────────────────────────────────────────────────
@dataclass
class CpuCore:
"""Stats for one CPU core."""
index: int
usage_pct: float # 0100
freq_mhz: int = 0
@dataclass
class JetsonStats:
"""Snapshot of all monitored Jetson Orin metrics."""
timestamp: float = field(default_factory=time.time)
# CPU
cpu_cores: List[CpuCore] = field(default_factory=list)
cpu_avg_pct: float = 0.0 # mean across online cores
# GPU
gpu_pct: float = 0.0
gpu_freq_mhz: int = 0
# RAM
ram_used_mb: int = 0
ram_total_mb: int = 0
ram_pct: float = 0.0
# SWAP
swap_used_mb: int = 0
swap_total_mb: int = 0
# Temperatures (name → °C)
temperatures: Dict[str, float] = field(default_factory=dict)
temp_max_c: float = 0.0 # hottest sensor
# Fan
fan_pct: float = 0.0 # 0100 (0 if unavailable)
# Power (mW)
power_total_mw: int = 0
power_cpu_mw: int = 0
power_gpu_mw: int = 0
# Disk (/, primary partition)
disk_used_gb: float = 0.0
disk_total_gb: float = 0.0
disk_pct: float = 0.0
# Source identification
source: str = "unknown"
# ── Derived helpers ────────────────────────────────────────────────────
def to_dict(self) -> dict:
"""Return JSON-serialisable dict (cpu_cores as plain dicts)."""
d = asdict(self)
# dataclasses.asdict already recurses into nested dataclasses
return d
@property
def cpu_count(self) -> int:
return len(self.cpu_cores)
@property
def ram_free_mb(self) -> int:
return max(0, self.ram_total_mb - self.ram_used_mb)
@property
def disk_free_gb(self) -> float:
return max(0.0, self.disk_total_gb - self.disk_used_gb)
# ── Alert model ───────────────────────────────────────────────────────────────
@dataclass
class AlertThresholds:
"""Configurable alert thresholds for Jetson stats (Issue #631 spec)."""
cpu_pct: float = 85.0 # alert when CPU average exceeds this
gpu_pct: float = 85.0 # alert when GPU usage exceeds this
temp_c: float = 80.0 # alert when any sensor exceeds this
disk_pct: float = 90.0 # alert when disk usage exceeds this
ram_pct: float = 90.0 # alert when RAM usage exceeds this
power_mw: float = 30_000 # alert when total power exceeds 30 W
@dataclass
class Alert:
"""A single threshold-violation alert."""
component: str # e.g. "cpu", "gpu", "temp_CPU", "disk"
message: str
level: int # 1=WARN, 2=ERROR (matches DiagnosticStatus levels)
value: float = 0.0 # observed value
threshold: float = 0.0 # configured threshold
WARN = 1
ERROR = 2
def is_error(self) -> bool:
return self.level == self.ERROR
def is_warn(self) -> bool:
return self.level == self.WARN
class AlertChecker:
"""Check a JetsonStats snapshot against AlertThresholds.
Returns a list of Alert objects (empty = all OK).
Levels:
WARN value is between threshold and threshold+10% (or single step)
ERROR value is above threshold+10% (or temp > threshold)
"""
def check(
self,
stats: JetsonStats,
thresholds: AlertThresholds,
) -> List[Alert]:
alerts: List[Alert] = []
# CPU
if stats.cpu_avg_pct >= thresholds.cpu_pct:
level = (Alert.ERROR if stats.cpu_avg_pct >= thresholds.cpu_pct + 10
else Alert.WARN)
alerts.append(Alert(
component = "cpu",
message = (f"CPU avg {stats.cpu_avg_pct:.1f}% "
f">= {thresholds.cpu_pct:.0f}%"),
level = level,
value = stats.cpu_avg_pct,
threshold = thresholds.cpu_pct,
))
# GPU
if stats.gpu_pct >= thresholds.gpu_pct:
level = (Alert.ERROR if stats.gpu_pct >= thresholds.gpu_pct + 10
else Alert.WARN)
alerts.append(Alert(
component = "gpu",
message = (f"GPU {stats.gpu_pct:.1f}% "
f">= {thresholds.gpu_pct:.0f}%"),
level = level,
value = stats.gpu_pct,
threshold = thresholds.gpu_pct,
))
# Temperatures — check every sensor
for sensor, temp_c in stats.temperatures.items():
if temp_c >= thresholds.temp_c:
level = (Alert.ERROR if temp_c >= thresholds.temp_c + 10
else Alert.WARN)
alerts.append(Alert(
component = f"temp_{sensor}",
message = (f"{sensor} {temp_c:.1f}°C "
f">= {thresholds.temp_c:.0f}°C"),
level = level,
value = temp_c,
threshold = thresholds.temp_c,
))
# Disk
if stats.disk_pct >= thresholds.disk_pct:
level = (Alert.ERROR if stats.disk_pct >= thresholds.disk_pct + 5
else Alert.WARN)
alerts.append(Alert(
component = "disk",
message = (f"Disk {stats.disk_pct:.1f}% "
f">= {thresholds.disk_pct:.0f}%"),
level = level,
value = stats.disk_pct,
threshold = thresholds.disk_pct,
))
# RAM
if stats.ram_pct >= thresholds.ram_pct:
level = (Alert.ERROR if stats.ram_pct >= thresholds.ram_pct + 5
else Alert.WARN)
alerts.append(Alert(
component = "ram",
message = (f"RAM {stats.ram_pct:.1f}% "
f">= {thresholds.ram_pct:.0f}%"),
level = level,
value = stats.ram_pct,
threshold = thresholds.ram_pct,
))
# Power
if stats.power_total_mw >= thresholds.power_mw:
alerts.append(Alert(
component = "power",
message = (f"Power {stats.power_total_mw/1000:.1f}W "
f">= {thresholds.power_mw/1000:.0f}W"),
level = Alert.WARN,
value = stats.power_total_mw,
threshold = thresholds.power_mw,
))
return alerts
# ── Tegrastats parser ─────────────────────────────────────────────────────────
class TegrastatsParser:
"""Parse a single tegrastats output line into a JetsonStats snapshot."""
# RAM 1234/7772MB (lfb …)
_RE_RAM = re.compile(r"RAM\s+(\d+)/(\d+)MB")
# SWAP 0/3886MB
_RE_SWAP = re.compile(r"SWAP\s+(\d+)/(\d+)MB")
# CPU [12%@1190,5%@729,…]
_RE_CPU = re.compile(r"CPU\s+\[([^\]]+)\]")
# GPC_FREQ 4%@918 (GPU usage + freq)
_RE_GPU = re.compile(r"GPC_FREQ\s+(\d+)%@(\d+)")
# SensorName@Temp e.g. CPU@39.6C GPU@37.6C tj@41.3C
_RE_TEMP = re.compile(r"(\w+)@(\d+\.?\d*)C")
# VDD_IN 4002mW/4002mW (total board power: instant/average)
_RE_VDD = re.compile(r"VDD_IN\s+(\d+)mW/(\d+)mW")
# VDD_CPU_CV 700mW
_RE_VCPU = re.compile(r"VDD_CPU_CV\s+(\d+)mW/(\d+)mW")
# VDD_GPU_SOC 1866mW
_RE_VGPU = re.compile(r"VDD_GPU_SOC\s+(\d+)mW/(\d+)mW")
def parse(self, line: str) -> JetsonStats:
"""Parse one tegrastats text line. Returns JetsonStats."""
stats = JetsonStats(source="tegrastats")
# RAM
m = self._RE_RAM.search(line)
if m:
stats.ram_used_mb = int(m.group(1))
stats.ram_total_mb = int(m.group(2))
if stats.ram_total_mb > 0:
stats.ram_pct = stats.ram_used_mb / stats.ram_total_mb * 100.0
# SWAP
m = self._RE_SWAP.search(line)
if m:
stats.swap_used_mb = int(m.group(1))
stats.swap_total_mb = int(m.group(2))
# CPU cores
m = self._RE_CPU.search(line)
if m:
cores: List[CpuCore] = []
for idx, token in enumerate(m.group(1).split(",")):
parts = token.split("@")
pct_str = parts[0].rstrip("%")
freq = int(parts[1]) if len(parts) > 1 else 0
try:
cores.append(CpuCore(index=idx,
usage_pct=float(pct_str),
freq_mhz=freq))
except ValueError:
pass
stats.cpu_cores = cores
if cores:
stats.cpu_avg_pct = sum(c.usage_pct for c in cores) / len(cores)
# GPU
m = self._RE_GPU.search(line)
if m:
stats.gpu_pct = float(m.group(1))
stats.gpu_freq_mhz = int(m.group(2))
# Temperatures
temps: Dict[str, float] = {}
for m in self._RE_TEMP.finditer(line):
name, val = m.group(1), float(m.group(2))
# Skip tokens that are part of CPU/freq strings like "1190"
if name.isdigit():
continue
temps[name] = val
stats.temperatures = temps
stats.temp_max_c = max(temps.values(), default=0.0)
# Power
m = self._RE_VDD.search(line)
if m:
stats.power_total_mw = int(m.group(1))
m = self._RE_VCPU.search(line)
if m:
stats.power_cpu_mw = int(m.group(1))
m = self._RE_VGPU.search(line)
if m:
stats.power_gpu_mw = int(m.group(1))
# Disk — tegrastats does not include disk; use psutil fallback
stats.disk_used_gb, stats.disk_total_gb, stats.disk_pct = \
_read_disk_psutil()
stats.timestamp = time.time()
return stats
# ── Disk helper ───────────────────────────────────────────────────────────────
def _read_disk_psutil(path: str = "/") -> Tuple[float, float, float]:
"""Return (used_gb, total_gb, pct) for the given path."""
if HAS_PSUTIL:
try:
u = _psutil.disk_usage(path)
used_gb = u.used / 1024**3
total_gb = u.total / 1024**3
pct = u.percent
return used_gb, total_gb, pct
except OSError:
pass
# shutil fallback
try:
u = shutil.disk_usage(path)
total_gb = u.total / 1024**3
used_gb = (u.total - u.free) / 1024**3
pct = used_gb / total_gb * 100.0 if total_gb else 0.0
return used_gb, total_gb, pct
except OSError:
return 0.0, 0.0, 0.0
# ── Stats readers ─────────────────────────────────────────────────────────────
class TegrastatsReader:
"""Reads one snapshot from tegrastats subprocess."""
_TEGRASTATS_BIN = "tegrastats"
def __init__(self, interval_ms: int = 200) -> None:
self._interval_ms = interval_ms
self._parser = TegrastatsParser()
def is_available(self) -> bool:
return shutil.which(self._TEGRASTATS_BIN) is not None
def read(self) -> Optional[JetsonStats]:
"""Launch tegrastats, capture one line, parse it."""
if not self.is_available():
return None
try:
# --interval N: emit every N ms; capture 1 line then kill
result = subprocess.run(
[self._TEGRASTATS_BIN, f"--interval", str(self._interval_ms)],
capture_output=True,
text=True,
timeout=3.0,
)
# tegrastats may not exit on its own; take first non-empty line
for line in result.stdout.splitlines():
line = line.strip()
if line:
return self._parser.parse(line)
except (subprocess.TimeoutExpired, subprocess.SubprocessError,
FileNotFoundError, OSError):
pass
return None
class JtopReader:
"""Reads one snapshot via the jtop Python library (jetson_stats)."""
def is_available(self) -> bool:
return HAS_JTOP
def read(self) -> Optional[JetsonStats]:
if not HAS_JTOP:
return None
try:
with _jtop() as jetson:
stats = JetsonStats(source="jtop")
# CPU
if hasattr(jetson, "cpu"):
cores = []
cpu_data = jetson.cpu
# jetson_stats 4.x: cpu is a dict with keys 'CPU1', 'CPU2', ...
if isinstance(cpu_data, dict):
for i, (k, v) in enumerate(sorted(cpu_data.items())):
if isinstance(v, dict):
pct = float(v.get("val", 0) or 0)
freq = int(v.get("frq", 0) or 0)
else:
pct, freq = 0.0, 0
cores.append(CpuCore(index=i, usage_pct=pct,
freq_mhz=freq))
stats.cpu_cores = cores
stats.cpu_avg_pct = (sum(c.usage_pct for c in cores)
/ len(cores)) if cores else 0.0
# GPU
if hasattr(jetson, "gpu"):
gpu_d = jetson.gpu
if isinstance(gpu_d, dict):
stats.gpu_pct = float(gpu_d.get("val", 0) or 0)
stats.gpu_freq_mhz = int(gpu_d.get("frq", 0) or 0)
# RAM
if hasattr(jetson, "memory"):
mem = jetson.memory
if isinstance(mem, dict):
ram = mem.get("RAM", {})
if isinstance(ram, dict):
stats.ram_used_mb = int(ram.get("used", 0) or 0)
stats.ram_total_mb = int(ram.get("tot", 0) or 0)
swap = mem.get("SWAP", {})
if isinstance(swap, dict):
stats.swap_used_mb = int(swap.get("used", 0) or 0)
stats.swap_total_mb = int(swap.get("tot", 0) or 0)
if stats.ram_total_mb > 0:
stats.ram_pct = (stats.ram_used_mb /
stats.ram_total_mb * 100.0)
# Temperatures
if hasattr(jetson, "temperature"):
temps = {}
for k, v in jetson.temperature.items():
if isinstance(v, dict):
val = v.get("temp", None)
else:
val = v
if val is not None:
try:
temps[str(k)] = float(val)
except (TypeError, ValueError):
pass
stats.temperatures = temps
stats.temp_max_c = max(temps.values(), default=0.0)
# Fan
if hasattr(jetson, "fan"):
fan_d = jetson.fan
if isinstance(fan_d, dict):
stats.fan_pct = float(fan_d.get("speed", 0) or 0)
# Power
if hasattr(jetson, "power"):
pwr = jetson.power
if isinstance(pwr, dict):
tot = pwr.get("tot", {})
if isinstance(tot, dict):
stats.power_total_mw = int(
(tot.get("power", 0) or 0) * 1000)
# Disk
stats.disk_used_gb, stats.disk_total_gb, stats.disk_pct = \
_read_disk_psutil()
stats.timestamp = time.time()
return stats
except Exception:
return None
class MockReader:
"""Returns deterministic stats for testing and development."""
def __init__(self, **overrides) -> None:
self._overrides = overrides
def is_available(self) -> bool:
return True
def read(self) -> JetsonStats:
cores = [CpuCore(index=i, usage_pct=20.0 + i * 2, freq_mhz=1200)
for i in range(4)]
s = JetsonStats(
cpu_cores = cores,
cpu_avg_pct = sum(c.usage_pct for c in cores) / len(cores),
gpu_pct = 30.0,
gpu_freq_mhz = 918,
ram_used_mb = 3000,
ram_total_mb = 7772,
ram_pct = 3000 / 7772 * 100.0,
swap_used_mb = 0,
swap_total_mb= 3886,
temperatures = {"CPU": 40.0, "GPU": 38.0, "AO": 35.0, "tj": 42.0},
temp_max_c = 42.0,
fan_pct = 30.0,
power_total_mw = 8000,
power_cpu_mw = 2000,
power_gpu_mw = 1500,
disk_used_gb = 50.0,
disk_total_gb = 500.0,
disk_pct = 10.0,
source = "mock",
)
for k, v in self._overrides.items():
if hasattr(s, k):
setattr(s, k, v)
return s
# ── Auto-selecting reader factory ─────────────────────────────────────────────
def make_reader(prefer: str = "auto"):
"""Return the best available reader.
prefer: "jtop" | "tegrastats" | "mock" | "auto"
"""
if prefer == "mock":
return MockReader()
if prefer == "jtop" or (prefer == "auto" and HAS_JTOP):
r = JtopReader()
if r.is_available():
return r
if prefer == "tegrastats" or prefer == "auto":
r = TegrastatsReader()
if r.is_available():
return r
# Fallback: mock (dev machine without Jetson hw)
return MockReader()

View File

@ -0,0 +1,215 @@
"""system_monitor_node.py — ROS2 node for Jetson Orin system monitoring (Issue #631).
Publishes at 1 Hz:
/saltybot/system/stats std_msgs/String JSON stats payload
/saltybot/diagnostics diagnostic_msgs/DiagnosticArray health status
Stats source priority: jtop > tegrastats > mock (for testing).
"""
from __future__ import annotations
import json
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus, KeyValue
from saltybot_system_monitor.jetson_stats import (
AlertChecker,
AlertThresholds,
JetsonStats,
Alert,
make_reader,
)
class SystemMonitorNode(Node):
"""Publishes Jetson Orin hardware stats at 1 Hz."""
def __init__(self) -> None:
super().__init__("system_monitor_node")
# ── Parameters ─────────────────────────────────────────────────────
self.declare_parameter("publish_rate_hz", 1.0)
self.declare_parameter("stats_source", "auto") # auto | jtop | tegrastats | mock
self.declare_parameter("cpu_warn_pct", 85.0)
self.declare_parameter("gpu_warn_pct", 85.0)
self.declare_parameter("temp_warn_c", 80.0)
self.declare_parameter("disk_warn_pct", 90.0)
self.declare_parameter("ram_warn_pct", 90.0)
self.declare_parameter("power_warn_mw", 30_000.0)
rate = self.get_parameter("publish_rate_hz").value
source = self.get_parameter("stats_source").value
self._thresholds = AlertThresholds(
cpu_pct = float(self.get_parameter("cpu_warn_pct").value),
gpu_pct = float(self.get_parameter("gpu_warn_pct").value),
temp_c = float(self.get_parameter("temp_warn_c").value),
disk_pct = float(self.get_parameter("disk_warn_pct").value),
ram_pct = float(self.get_parameter("ram_warn_pct").value),
power_mw = float(self.get_parameter("power_warn_mw").value),
)
self._reader = make_reader(prefer=source)
self._checker = AlertChecker()
# ── Publishers ─────────────────────────────────────────────────────
self._stats_pub = self.create_publisher(String, "/saltybot/system/stats", 10)
self._diag_pub = self.create_publisher(
DiagnosticArray, "/saltybot/diagnostics", 10
)
# ── Timer ──────────────────────────────────────────────────────────
period = 1.0 / max(rate, 0.1)
self._timer = self.create_timer(period, self._on_timer)
self.get_logger().info(
f"SystemMonitorNode started (source={source}, rate={rate} Hz)"
)
# ── Timer callback ─────────────────────────────────────────────────────
def _on_timer(self) -> None:
stats = self._reader.read()
if stats is None:
self.get_logger().warning("Stats reader returned None — skipping cycle")
return
alerts = self._checker.check(stats, self._thresholds)
self._publish_stats(stats, alerts)
self._publish_diagnostics(stats, alerts)
# ── Stats publisher ────────────────────────────────────────────────────
def _publish_stats(self, stats: JetsonStats, alerts: list) -> None:
payload = {
"timestamp": stats.timestamp,
"source": stats.source,
"cpu": {
"avg_pct": stats.cpu_avg_pct,
"cores": [
{"index": c.index, "usage_pct": c.usage_pct, "freq_mhz": c.freq_mhz}
for c in stats.cpu_cores
],
},
"gpu": {
"pct": stats.gpu_pct,
"freq_mhz": stats.gpu_freq_mhz,
},
"ram": {
"used_mb": stats.ram_used_mb,
"total_mb": stats.ram_total_mb,
"pct": stats.ram_pct,
},
"swap": {
"used_mb": stats.swap_used_mb,
"total_mb": stats.swap_total_mb,
},
"temperatures": stats.temperatures,
"temp_max_c": stats.temp_max_c,
"fan_pct": stats.fan_pct,
"power": {
"total_mw": stats.power_total_mw,
"cpu_mw": stats.power_cpu_mw,
"gpu_mw": stats.power_gpu_mw,
},
"disk": {
"used_gb": stats.disk_used_gb,
"total_gb": stats.disk_total_gb,
"pct": stats.disk_pct,
},
"alerts": [
{
"component": a.component,
"message": a.message,
"level": a.level,
"value": a.value,
"threshold": a.threshold,
}
for a in alerts
],
}
msg = String()
msg.data = json.dumps(payload)
self._stats_pub.publish(msg)
# ── DiagnosticArray publisher ──────────────────────────────────────────
def _publish_diagnostics(self, stats: JetsonStats, alerts: list) -> None:
now = self.get_clock().now().to_msg()
diag = DiagnosticArray()
diag.header.stamp = now
# Overall system status — worst level among alerts
worst = 0
for a in alerts:
if a.level > worst:
worst = a.level
overall = DiagnosticStatus()
overall.name = "saltybot/system"
overall.hardware_id = "jetson_orin"
overall.level = worst
overall.message = "OK" if worst == 0 else (
"WARN" if worst == Alert.WARN else "ERROR"
)
overall.values = self._stats_to_kv(stats)
diag.status.append(overall)
# One status per alert
for a in alerts:
s = DiagnosticStatus()
s.name = f"saltybot/system/{a.component}"
s.hardware_id = "jetson_orin"
s.level = a.level
s.message = a.message
s.values = [
KeyValue(key="value", value=str(a.value)),
KeyValue(key="threshold", value=str(a.threshold)),
]
diag.status.append(s)
self._diag_pub.publish(diag)
# ── Helper: stats → KeyValue list ──────────────────────────────────────
@staticmethod
def _stats_to_kv(stats: JetsonStats) -> list:
pairs = [
("source", stats.source),
("cpu_avg_pct", f"{stats.cpu_avg_pct:.1f}"),
("gpu_pct", f"{stats.gpu_pct:.1f}"),
("gpu_freq_mhz", str(stats.gpu_freq_mhz)),
("ram_pct", f"{stats.ram_pct:.1f}"),
("ram_used_mb", str(stats.ram_used_mb)),
("ram_total_mb", str(stats.ram_total_mb)),
("temp_max_c", f"{stats.temp_max_c:.1f}"),
("fan_pct", f"{stats.fan_pct:.1f}"),
("power_total_mw", str(stats.power_total_mw)),
("disk_pct", f"{stats.disk_pct:.1f}"),
("disk_used_gb", f"{stats.disk_used_gb:.1f}"),
("disk_total_gb", f"{stats.disk_total_gb:.1f}"),
]
for name, temp in stats.temperatures.items():
pairs.append((f"temp_{name}_c", f"{temp:.1f}"))
return [KeyValue(key=k, value=v) for k, v in pairs]
def main(args=None) -> None:
rclpy.init(args=args)
node = SystemMonitorNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/saltybot_system_monitor
[install]
install_scripts=$base/lib/saltybot_system_monitor

View File

@ -0,0 +1,30 @@
from setuptools import setup
package_name = "saltybot_system_monitor"
setup(
name=package_name,
version="0.1.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(f"share/{package_name}/launch", ["launch/system_monitor.launch.py"]),
(f"share/{package_name}/config", ["config/system_monitor.yaml"]),
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="sl-jetson",
maintainer_email="sl-jetson@saltylab.local",
description=(
"Jetson Orin system monitor: CPU/GPU/RAM/disk/temp/fan/power → "
"/saltybot/system/stats + DiagnosticArray (Issue #631)"
),
license="Apache-2.0",
tests_require=["pytest"],
entry_points={
"console_scripts": [
"system_monitor_node = saltybot_system_monitor.system_monitor_node:main",
],
},
)

View File

@ -0,0 +1,507 @@
"""test_system_monitor.py — Unit tests for saltybot_system_monitor (Issue #631).
All tests are ROS2-free: they only import pure-Python modules from
saltybot_system_monitor.jetson_stats.
"""
from __future__ import annotations
import math
import sys
import time
from unittest.mock import patch, MagicMock
import pytest
# ---------------------------------------------------------------------------
# Ensure the package under test is importable (no ROS2 required)
# ---------------------------------------------------------------------------
import os
import importlib
_PKG_DIR = os.path.join(
os.path.dirname(__file__), "..", "saltybot_system_monitor"
)
sys.path.insert(0, os.path.join(_PKG_DIR, "..")) # adds src/ to path
from saltybot_system_monitor.jetson_stats import (
Alert,
AlertChecker,
AlertThresholds,
CpuCore,
DiskStats,
JetsonStats,
JtopReader,
MockReader,
TegrastatsParser,
TegrastatsReader,
make_reader,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_stats(**kw) -> JetsonStats:
"""Build a minimal JetsonStats with safe defaults, overriding with kw."""
defaults = dict(
timestamp=time.monotonic(),
cpu_cores=[CpuCore(index=0, usage_pct=10.0, freq_mhz=1200)],
cpu_avg_pct=10.0,
gpu_pct=5.0,
gpu_freq_mhz=500,
ram_used_mb=4096,
ram_total_mb=16384,
ram_pct=25.0,
swap_used_mb=0,
swap_total_mb=8192,
temperatures={"CPU": 45.0, "GPU": 42.0},
temp_max_c=45.0,
fan_pct=20.0,
power_total_mw=8000,
power_cpu_mw=3000,
power_gpu_mw=2000,
disk_used_gb=50.0,
disk_total_gb=500.0,
disk_pct=10.0,
source="test",
)
defaults.update(kw)
return JetsonStats(**defaults)
_DEFAULT_THRESH = AlertThresholds()
_CHECKER = AlertChecker()
def _alerts(**kw) -> list[Alert]:
return _CHECKER.check(_make_stats(**kw), _DEFAULT_THRESH)
def _alert_components(**kw) -> set[str]:
return {a.component for a in _alerts(**kw)}
# ===========================================================================
# CpuCore
# ===========================================================================
class TestCpuCore:
def test_basic(self):
c = CpuCore(index=2, usage_pct=55.5, freq_mhz=1800)
assert c.index == 2
assert c.usage_pct == pytest.approx(55.5)
assert c.freq_mhz == 1800
def test_default_freq(self):
c = CpuCore(index=0, usage_pct=0.0)
assert c.freq_mhz == 0
# ===========================================================================
# JetsonStats
# ===========================================================================
class TestJetsonStats:
def test_fields_present(self):
s = _make_stats()
assert hasattr(s, "cpu_avg_pct")
assert hasattr(s, "gpu_pct")
assert hasattr(s, "ram_pct")
assert hasattr(s, "temp_max_c")
assert hasattr(s, "fan_pct")
assert hasattr(s, "power_total_mw")
assert hasattr(s, "disk_pct")
assert hasattr(s, "source")
def test_source_default(self):
s = _make_stats(source="jtop")
assert s.source == "jtop"
# ===========================================================================
# AlertThresholds
# ===========================================================================
class TestAlertThresholds:
def test_defaults(self):
t = AlertThresholds()
assert t.cpu_pct == pytest.approx(85.0)
assert t.gpu_pct == pytest.approx(85.0)
assert t.temp_c == pytest.approx(80.0)
assert t.disk_pct == pytest.approx(90.0)
assert t.ram_pct == pytest.approx(90.0)
assert t.power_mw == pytest.approx(30_000.0)
def test_custom(self):
t = AlertThresholds(cpu_pct=70.0, temp_c=75.0)
assert t.cpu_pct == pytest.approx(70.0)
assert t.temp_c == pytest.approx(75.0)
# ===========================================================================
# AlertChecker — no alerts in normal range
# ===========================================================================
class TestAlertCheckerNoAlerts:
def test_no_alert_all_ok(self):
assert _alerts() == []
def test_cpu_just_below_warn(self):
# 84.9 < 85.0 — no alert
assert "cpu" not in _alert_components(cpu_avg_pct=84.9)
def test_gpu_just_below_warn(self):
assert "gpu" not in _alert_components(gpu_pct=84.9)
def test_temp_just_below_warn(self):
s = _make_stats(temperatures={"CPU": 79.9}, temp_max_c=79.9)
assert _CHECKER.check(s, _DEFAULT_THRESH) == []
def test_disk_just_below_warn(self):
assert "disk" not in _alert_components(disk_pct=89.9)
def test_ram_just_below_warn(self):
assert "ram" not in _alert_components(ram_pct=89.9)
# ===========================================================================
# AlertChecker — WARN level
# ===========================================================================
class TestAlertCheckerWarn:
def test_cpu_warn_at_threshold(self):
comps = _alert_components(cpu_avg_pct=85.0)
assert "cpu" in comps
alerts = _alerts(cpu_avg_pct=85.0)
cpu_alert = next(a for a in alerts if a.component == "cpu")
assert cpu_alert.level == Alert.WARN
def test_gpu_warn(self):
alerts = _alerts(gpu_pct=90.0)
gpu_alert = next(a for a in alerts if a.component == "gpu")
assert gpu_alert.level == Alert.WARN
def test_temp_warn(self):
s = _make_stats(temperatures={"AO": 80.0}, temp_max_c=80.0)
alerts = _CHECKER.check(s, _DEFAULT_THRESH)
temp_alerts = [a for a in alerts if "temp" in a.component.lower()]
assert any(a.level == Alert.WARN for a in temp_alerts)
def test_disk_warn(self):
alerts = _alerts(disk_pct=90.0)
disk_alert = next(a for a in alerts if a.component == "disk")
assert disk_alert.level == Alert.WARN
def test_ram_warn(self):
alerts = _alerts(ram_pct=90.0)
ram_alert = next(a for a in alerts if a.component == "ram")
assert ram_alert.level == Alert.WARN
def test_power_warn(self):
alerts = _alerts(power_total_mw=30_000)
power_alert = next(a for a in alerts if a.component == "power")
assert power_alert.level == Alert.WARN
def test_alert_carries_value_and_threshold(self):
alerts = _alerts(cpu_avg_pct=87.0)
a = next(x for x in alerts if x.component == "cpu")
assert a.value == pytest.approx(87.0)
assert a.threshold == pytest.approx(85.0)
# ===========================================================================
# AlertChecker — ERROR level (threshold + escalation)
# ===========================================================================
class TestAlertCheckerError:
def test_cpu_error_at_95(self):
# 85 + 10% = 95 → ERROR
alerts = _alerts(cpu_avg_pct=95.0)
a = next(x for x in alerts if x.component == "cpu")
assert a.level == Alert.ERROR
def test_gpu_error_at_95(self):
alerts = _alerts(gpu_pct=95.0)
a = next(x for x in alerts if x.component == "gpu")
assert a.level == Alert.ERROR
def test_temp_error_at_90(self):
# 80 + 10 = 90 → ERROR
s = _make_stats(temperatures={"CPU": 90.0}, temp_max_c=90.0)
alerts = _CHECKER.check(s, _DEFAULT_THRESH)
a = next(x for x in alerts if "temp" in x.component.lower())
assert a.level == Alert.ERROR
def test_disk_error_at_95(self):
# 90 + 5 = 95 → ERROR
alerts = _alerts(disk_pct=95.0)
a = next(x for x in alerts if x.component == "disk")
assert a.level == Alert.ERROR
def test_ram_error_at_95(self):
alerts = _alerts(ram_pct=95.0)
a = next(x for x in alerts if x.component == "ram")
assert a.level == Alert.ERROR
def test_multiple_errors(self):
alerts = _alerts(cpu_avg_pct=99.0, gpu_pct=99.0, disk_pct=99.0)
comps = {a.component for a in alerts if a.level == Alert.ERROR}
assert "cpu" in comps
assert "gpu" in comps
assert "disk" in comps
# ===========================================================================
# TegrastatsParser
# ===========================================================================
class TestTegrastatsParser:
P = TegrastatsParser()
# ── RAM ────────────────────────────────────────────────────────────────
def test_ram(self):
s = self.P.parse("RAM 2048/16384MB (lfb 512x4MB) SWAP 256/8192MB")
assert s.ram_used_mb == 2048
assert s.ram_total_mb == 16384
assert s.ram_pct == pytest.approx(2048 / 16384 * 100, rel=1e-3)
def test_ram_zero_total_doesnt_crash(self):
# Shouldn't happen in practice, but parser must not divide-by-zero
s = self.P.parse("RAM 0/0MB SWAP 0/0MB CPU [] GPC_FREQ 0%@0")
assert s.ram_pct == pytest.approx(0.0)
# ── SWAP ───────────────────────────────────────────────────────────────
def test_swap(self):
s = self.P.parse("RAM 1024/8192MB (lfb 4x4MB) SWAP 512/4096MB")
assert s.swap_used_mb == 512
assert s.swap_total_mb == 4096
# ── CPU ────────────────────────────────────────────────────────────────
def test_cpu_single_core(self):
s = self.P.parse("RAM 1000/8000MB SWAP 0/0MB CPU [50%@1200]")
assert len(s.cpu_cores) == 1
assert s.cpu_cores[0].usage_pct == pytest.approx(50.0)
assert s.cpu_cores[0].freq_mhz == 1200
assert s.cpu_avg_pct == pytest.approx(50.0)
def test_cpu_four_cores(self):
s = self.P.parse(
"RAM 4096/16384MB SWAP 0/8192MB "
"CPU [10%@1000,20%@1200,30%@1400,40%@1600]"
)
assert len(s.cpu_cores) == 4
assert s.cpu_cores[0].usage_pct == pytest.approx(10.0)
assert s.cpu_cores[3].usage_pct == pytest.approx(40.0)
assert s.cpu_avg_pct == pytest.approx(25.0)
def test_cpu_idle_core(self):
# off/0 means core is offline — should be parsed as 0% usage
s = self.P.parse(
"RAM 4096/16384MB SWAP 0/8192MB CPU [10%@1200,off,20%@1400,off]"
)
# off cores are skipped or treated as 0
assert len(s.cpu_cores) >= 2
# ── GPU ────────────────────────────────────────────────────────────────
def test_gpu(self):
s = self.P.parse(
"RAM 4096/16384MB SWAP 0/8192MB CPU [50%@1200] GPC_FREQ 75%@850"
)
assert s.gpu_pct == pytest.approx(75.0)
assert s.gpu_freq_mhz == 850
def test_gpu_absent(self):
s = self.P.parse("RAM 4096/16384MB SWAP 0/8192MB CPU [50%@1200]")
assert s.gpu_pct == pytest.approx(0.0)
assert s.gpu_freq_mhz == 0
# ── Temperatures ───────────────────────────────────────────────────────
def test_temperatures(self):
s = self.P.parse(
"RAM 4096/16384MB SWAP 0/8192MB CPU [50%@1200] "
"CPU@45.5C GPU@42.0C AO@38.0C"
)
assert "CPU" in s.temperatures
assert s.temperatures["CPU"] == pytest.approx(45.5)
assert s.temperatures["GPU"] == pytest.approx(42.0)
assert s.temp_max_c == pytest.approx(45.5)
def test_temp_max_computed(self):
s = self.P.parse(
"RAM 4096/16384MB SWAP 0/8192MB CPU [50%@1200] "
"CPU@55.0C GPU@72.0C"
)
assert s.temp_max_c == pytest.approx(72.0)
# ── Power ──────────────────────────────────────────────────────────────
def test_power_vdd_in(self):
s = self.P.parse(
"RAM 4096/16384MB SWAP 0/8192MB CPU [50%@1200] "
"VDD_IN 8000mW/9000mW VDD_CPU_CV 3000mW/3500mW VDD_GPU_SOC 2000mW/2500mW"
)
assert s.power_total_mw == 8000
assert s.power_cpu_mw == 3000
assert s.power_gpu_mw == 2000
def test_power_absent(self):
s = self.P.parse("RAM 4096/16384MB SWAP 0/8192MB CPU [50%@1200]")
assert s.power_total_mw == 0
# ── Full realistic line ────────────────────────────────────────────────
def test_full_tegrastats_line(self):
line = (
"RAM 5012/15625MB (lfb 1x2MB) SWAP 0/7812MB "
"CPU [12%@1190,8%@1190,15%@1190,9%@1190,5%@1190,7%@1190] "
"GPC_FREQ 0%@318 CPU@47.218C GPU@44.125C Tdiode@45.25C AO@47.5C "
"thermal@46.7C VDD_IN 6391mW/6391mW VDD_CPU_CV 2042mW/2042mW "
"VDD_GPU_SOC 1499mW/1499mW"
)
s = self.P.parse(line)
assert len(s.cpu_cores) == 6
assert s.gpu_pct == pytest.approx(0.0)
assert s.ram_used_mb == 5012
assert s.ram_total_mb == 15625
assert s.temperatures["CPU"] == pytest.approx(47.218)
assert s.power_total_mw == 6391
assert s.source == "tegrastats"
# ===========================================================================
# MockReader
# ===========================================================================
class TestMockReader:
def test_returns_jetson_stats(self):
r = MockReader()
s = r.read()
assert isinstance(s, JetsonStats)
def test_source_is_mock(self):
s = MockReader().read()
assert s.source == "mock"
def test_deterministic(self):
r = MockReader()
s1 = r.read()
s2 = r.read()
assert s1.cpu_avg_pct == s2.cpu_avg_pct
assert s1.gpu_pct == s2.gpu_pct
def test_overrides(self):
r = MockReader(cpu_avg_pct=99.9, gpu_pct=88.8)
s = r.read()
assert s.cpu_avg_pct == pytest.approx(99.9)
assert s.gpu_pct == pytest.approx(88.8)
def test_override_temperatures(self):
r = MockReader(temperatures={"CPU": 75.0}, temp_max_c=75.0)
s = r.read()
assert s.temperatures == {"CPU": 75.0}
assert s.temp_max_c == pytest.approx(75.0)
# ===========================================================================
# TegrastatsReader availability check
# ===========================================================================
class TestTegrastatsReader:
def test_is_available_true_when_found(self):
r = TegrastatsReader()
with patch("shutil.which", return_value="/usr/bin/tegrastats"):
assert r.is_available() is True
def test_is_available_false_when_missing(self):
r = TegrastatsReader()
with patch("shutil.which", return_value=None):
assert r.is_available() is False
# ===========================================================================
# JtopReader availability check
# ===========================================================================
class TestJtopReader:
def test_is_available_false_without_jtop(self):
r = JtopReader()
with patch.dict(sys.modules, {"jtop": None}):
# If jtop is not importable, should return False
# (actual behaviour depends on HAS_JTOP at import time)
result = r.is_available()
assert isinstance(result, bool)
# ===========================================================================
# make_reader factory
# ===========================================================================
class TestMakeReader:
def test_mock_returns_mock_reader(self):
r = make_reader(prefer="mock")
assert isinstance(r, MockReader)
def test_auto_with_no_backends_falls_back_to_mock(self):
with (
patch("shutil.which", return_value=None),
patch("saltybot_system_monitor.jetson_stats.HAS_JTOP", False),
):
r = make_reader(prefer="auto")
assert isinstance(r, MockReader)
def test_tegrastats_prefer(self):
with patch("shutil.which", return_value="/usr/bin/tegrastats"):
r = make_reader(prefer="tegrastats")
assert isinstance(r, TegrastatsReader)
def test_auto_prefers_tegrastats_over_mock(self):
with (
patch("shutil.which", return_value="/usr/bin/tegrastats"),
patch("saltybot_system_monitor.jetson_stats.HAS_JTOP", False),
):
r = make_reader(prefer="auto")
assert isinstance(r, TegrastatsReader)
# ===========================================================================
# Edge cases
# ===========================================================================
class TestEdgeCases:
P = TegrastatsParser()
def test_parse_empty_string(self):
s = self.P.parse("")
assert isinstance(s, JetsonStats)
assert s.ram_used_mb == 0
assert s.cpu_cores == []
def test_parse_partial_line(self):
s = self.P.parse("RAM 1024/8192MB")
assert s.ram_used_mb == 1024
assert s.cpu_cores == []
def test_alert_checker_empty_temps(self):
s = _make_stats(temperatures={}, temp_max_c=0.0)
alerts = _CHECKER.check(s, _DEFAULT_THRESH)
temp_alerts = [a for a in alerts if "temp" in a.component.lower()]
assert temp_alerts == []
def test_alert_checker_multiple_temp_sensors(self):
s = _make_stats(
temperatures={"CPU": 82.0, "GPU": 50.0, "AO": 40.0},
temp_max_c=82.0,
)
alerts = _CHECKER.check(s, _DEFAULT_THRESH)
temp_alerts = [a for a in alerts if "temp" in a.component.lower()]
assert len(temp_alerts) >= 1
# Only CPU temp should fire
assert all("CPU" in a.component or a.value >= 80.0 for a in temp_alerts)

View File

@ -36,6 +36,7 @@
#include "servo_bus.h"
#include "gimbal.h"
#include "lvc.h"
#include "uart_protocol.h"
#include <math.h>
#include <string.h>
#include <stdio.h>
@ -261,6 +262,9 @@ int main(void) {
/* Init LVC: low voltage cutoff state machine (Issue #613) */
lvc_init();
/* Init UART command protocol for Jetson (UART5 PC12/PD2, 115200 baud, Issue #629) */
uart_protocol_init();
/* Probe I2C1 for optional sensors — skip gracefully if not found */
int baro_chip = 0; /* chip_id: 0x58=BMP280, 0x60=BME280, 0=absent */
mag_type_t mag_type = MAG_NONE;
@ -293,6 +297,7 @@ int main(void) {
uint32_t can_cmd_tick = 0; /* CAN velocity command TX timer (Issue #597) */
uint32_t can_tlm_tick = 0; /* JLINK_TLM_CAN_STATS transmit timer (Issue #597) */
uint32_t lvc_tlm_tick = 0; /* JLINK_TLM_LVC transmit timer (Issue #613) */
uint32_t uart_status_tick = 0; /* UART protocol STATUS frame timer (Issue #629) */
uint8_t pm_pwm_phase = 0; /* Software PWM counter for sleep LED */
const float dt = 1.0f / PID_LOOP_HZ; /* 1ms at 1kHz */
@ -368,6 +373,9 @@ int main(void) {
/* CAN bus RX: drain FIFO0 and parse feedback frames (Issue #597) */
can_driver_process();
/* UART command protocol RX: parse Jetson frames (Issue #629) */
uart_protocol_process();
/* Handle JLink one-shot flags from Jetson binary protocol */
if (jlink_state.estop_req) {
jlink_state.estop_req = 0u;
@ -430,6 +438,39 @@ int main(void) {
(double)bal.kp, (double)bal.ki, (double)bal.kd);
}
/* UART protocol: handle commands from Jetson (Issue #629) */
{
extern UartProtState uart_prot_state;
if (uart_prot_state.vel_updated) {
uart_prot_state.vel_updated = 0u;
if (bal.state == BALANCE_ARMED && !lvc_is_cutoff()) {
can_cmd_t ucmd_l = { uart_prot_state.left_rpm, 0 };
can_cmd_t ucmd_r = { uart_prot_state.right_rpm, 0 };
can_driver_send_cmd(CAN_NODE_LEFT, &ucmd_l);
can_driver_send_cmd(CAN_NODE_RIGHT, &ucmd_r);
}
}
if (uart_prot_state.pid_updated) {
uart_prot_state.pid_updated = 0u;
bal.kp = uart_prot_state.pid_kp;
bal.ki = uart_prot_state.pid_ki;
bal.kd = uart_prot_state.pid_kd;
}
if (uart_prot_state.estop_req) {
uart_prot_state.estop_req = 0u;
safety_remote_estop(ESTOP_REMOTE);
safety_arm_cancel();
balance_disarm(&bal);
motor_driver_estop(&motors);
}
if (uart_prot_state.estop_clear_req) {
uart_prot_state.estop_clear_req = 0u;
if (safety_remote_estop_active() && bal.state == BALANCE_DISARMED)
safety_remote_estop_clear();
}
}
/* FAULT_LOG_GET: send fault log telemetry to Jetson (Issue #565) */
if (jlink_state.fault_log_req) {
jlink_state.fault_log_req = 0u;
@ -768,6 +809,20 @@ int main(void) {
jlink_send_lvc_tlm(&ltlm);
}
/* UART protocol: send STATUS to Jetson at 10 Hz (Issue #629) */
if (now - uart_status_tick >= 100u) {
uart_status_tick = now;
uart_prot_status_t ups;
ups.pitch_x10 = (int16_t)(bal.pitch_deg * 10.0f);
ups.motor_cmd = bal.motor_cmd;
uint32_t _uv = battery_read_mv();
ups.vbat_mv = (_uv > 65535u) ? 65535u : (uint16_t)_uv;
ups.balance_state = (uint8_t)bal.state;
ups.estop_active = safety_remote_estop_active() ? 1u : 0u;
uart_protocol_send_status(&ups);
}
/* USB telemetry at 50Hz (only when streaming enabled and calibration done) */
if (cdc_streaming && mpu6000_is_calibrated() && now - send_tick >= 20) {
send_tick = now;

333
src/uart_protocol.c Normal file
View File

@ -0,0 +1,333 @@
/*
* uart_protocol.c -- Structured UART command protocol (Issue #629)
*
* UART5 (PC12=TX / PD2=RX) at 115200 baud.
* DMA1_Stream0_Channel4 runs in circular mode to fill a 256-byte ring buffer.
* uart_protocol_process() is called each main loop tick to drain new bytes
* through a lightweight state-machine parser.
*
* Frame: [STX=0x02][LEN][CMD][PAYLOAD...][CRC8][ETX=0x03]
* LEN = 1 (CMD) + payload_len
* CRC8 = SMBUS (poly 0x07, init 0x00) computed over CMD+PAYLOAD bytes
*/
#include "uart_protocol.h"
#include "config.h"
#include "stm32f7xx_hal.h"
#include <string.h>
/* ---- DMA ring buffer ---- */
static volatile uint8_t s_rx_buf[UPROT_RX_BUF_LEN];
static uint16_t s_rx_head = 0; /* last byte we processed */
/* ---- HAL handles ---- */
static UART_HandleTypeDef s_huart;
static DMA_HandleTypeDef s_hdma_rx;
/* ---- TX buffer (largest TX frame = URESP_STATUS: 13 bytes) ---- */
#define TX_BUF_LEN 16u
static uint8_t s_tx_buf[TX_BUF_LEN];
/* ---- Public state ---- */
volatile UartProtState uart_prot_state;
/* ---- Parser state ---- */
typedef enum {
PS_IDLE = 0,
PS_LEN,
PS_CMD,
PS_PAYLOAD,
PS_CRC,
PS_ETX,
} ParseState;
static ParseState s_ps = PS_IDLE;
static uint8_t s_frame_len = 0; /* LEN byte value */
static uint8_t s_frame_cmd = 0;
static uint8_t s_payload_buf[UPROT_MAX_PAYLOAD];
static uint8_t s_payload_idx = 0;
static uint8_t s_frame_crc = 0;
/* ================================================================
* CRC8-SMBUS: poly=0x07, init=0x00
* ================================================================ */
static uint8_t crc8_smbus(const uint8_t *data, uint8_t len)
{
uint8_t crc = 0x00u;
for (uint8_t i = 0u; i < len; i++) {
crc ^= data[i];
for (uint8_t b = 0u; b < 8u; b++) {
crc = (crc & 0x80u) ? (uint8_t)((crc << 1u) ^ 0x07u)
: (uint8_t)(crc << 1u);
}
}
return crc;
}
/* ================================================================
* TX helpers
* ================================================================ */
static void tx_send(const uint8_t *buf, uint8_t len)
{
/* Blocking transmit — frames are short (<=13 bytes @ 115200 = <1.2 ms) */
HAL_UART_Transmit(&s_huart, (uint8_t *)buf, len, 5u);
}
static void send_ack(uint8_t cmd)
{
/* [STX][LEN=2][URESP_ACK][cmd][CRC8][ETX] = 6 bytes */
uint8_t payload[2] = { URESP_ACK, cmd };
uint8_t crc = crc8_smbus(payload, 2u);
uint8_t frame[6] = { UPROT_STX, 0x02u, URESP_ACK, cmd, crc, UPROT_ETX };
tx_send(frame, 6u);
}
static void send_nack(uint8_t cmd, uint8_t err)
{
/* [STX][LEN=3][URESP_NACK][cmd][err][CRC8][ETX] = 7 bytes */
uint8_t payload[3] = { URESP_NACK, cmd, err };
uint8_t crc = crc8_smbus(payload, 3u);
uint8_t frame[7] = { UPROT_STX, 0x03u, URESP_NACK, cmd, err, crc, UPROT_ETX };
tx_send(frame, 7u);
}
/* ================================================================
* Command dispatcher (called when a valid frame is fully received)
* ================================================================ */
static void dispatch(uint8_t cmd, const uint8_t *payload, uint8_t plen)
{
switch (cmd) {
case UCMD_SET_VELOCITY:
if (plen != 4u) { send_nack(cmd, UERR_BAD_LENGTH); return; }
{
int16_t lr, rr;
memcpy(&lr, &payload[0], 2u);
memcpy(&rr, &payload[2], 2u);
uart_prot_state.left_rpm = lr;
uart_prot_state.right_rpm = rr;
uart_prot_state.vel_updated = 1u;
}
send_ack(cmd);
break;
case UCMD_GET_STATUS:
if (plen != 0u) { send_nack(cmd, UERR_BAD_LENGTH); return; }
/* ACK not sent; STATUS response IS the reply */
uart_prot_state.vel_updated = uart_prot_state.vel_updated; /* no-op flag; main sends status */
/* Signal main loop to send status — reuse vel_updated channel not appropriate;
* Instead send ACK here and let main loop send STATUS immediately after.
* To keep ISR-safe, just set a flag and let main loop build the payload. */
/* Simple approach: ack first, status built by main.c using uart_protocol_send_status() */
send_ack(cmd); /* main.c checks for vel_updated; GET_STATUS is handled by main */
break;
case UCMD_SET_PID:
if (plen != 12u) { send_nack(cmd, UERR_BAD_LENGTH); return; }
{
float kp, ki, kd;
memcpy(&kp, &payload[0], 4u);
memcpy(&ki, &payload[4], 4u);
memcpy(&kd, &payload[8], 4u);
uart_prot_state.pid_kp = kp;
uart_prot_state.pid_ki = ki;
uart_prot_state.pid_kd = kd;
uart_prot_state.pid_updated = 1u;
}
send_ack(cmd);
break;
case UCMD_ESTOP:
if (plen != 0u) { send_nack(cmd, UERR_BAD_LENGTH); return; }
uart_prot_state.estop_req = 1u;
send_ack(cmd);
break;
case UCMD_CLEAR_ESTOP:
if (plen != 0u) { send_nack(cmd, UERR_BAD_LENGTH); return; }
uart_prot_state.estop_clear_req = 1u;
send_ack(cmd);
break;
default:
send_nack(cmd, UERR_UNKNOWN_CMD);
break;
}
}
/* ================================================================
* uart_protocol_init()
* ================================================================ */
void uart_protocol_init(void)
{
/* ---- GPIO: PC12 = TX, PD2 = RX ---- */
__HAL_RCC_GPIOC_CLK_ENABLE();
__HAL_RCC_GPIOD_CLK_ENABLE();
GPIO_InitTypeDef gpio = {0};
gpio.Mode = GPIO_MODE_AF_PP;
gpio.Pull = GPIO_PULLUP;
gpio.Speed = GPIO_SPEED_FREQ_HIGH;
gpio.Alternate = GPIO_AF8_UART5;
gpio.Pin = GPIO_PIN_12;
HAL_GPIO_Init(GPIOC, &gpio);
gpio.Pin = GPIO_PIN_2;
HAL_GPIO_Init(GPIOD, &gpio);
/* ---- DMA1_Stream0 Channel4 → UART5_RX (circular) ---- */
__HAL_RCC_DMA1_CLK_ENABLE();
s_hdma_rx.Instance = DMA1_Stream0;
s_hdma_rx.Init.Channel = DMA_CHANNEL_4;
s_hdma_rx.Init.Direction = DMA_PERIPH_TO_MEMORY;
s_hdma_rx.Init.PeriphInc = DMA_PINC_DISABLE;
s_hdma_rx.Init.MemInc = DMA_MINC_ENABLE;
s_hdma_rx.Init.PeriphDataAlignment = DMA_PDATAALIGN_BYTE;
s_hdma_rx.Init.MemDataAlignment = DMA_MDATAALIGN_BYTE;
s_hdma_rx.Init.Mode = DMA_CIRCULAR;
s_hdma_rx.Init.Priority = DMA_PRIORITY_LOW;
s_hdma_rx.Init.FIFOMode = DMA_FIFOMODE_DISABLE;
if (HAL_DMA_Init(&s_hdma_rx) != HAL_OK) { return; }
/* ---- UART5 at UART_PROT_BAUD ---- */
__HAL_RCC_UART5_CLK_ENABLE();
s_huart.Instance = UART5;
s_huart.Init.BaudRate = UART_PROT_BAUD;
s_huart.Init.WordLength = UART_WORDLENGTH_8B;
s_huart.Init.StopBits = UART_STOPBITS_1;
s_huart.Init.Parity = UART_PARITY_NONE;
s_huart.Init.Mode = UART_MODE_TX_RX;
s_huart.Init.HwFlowCtl = UART_HWCONTROL_NONE;
s_huart.Init.OverSampling = UART_OVERSAMPLING_16;
__HAL_LINKDMA(&s_huart, hdmarx, s_hdma_rx);
if (HAL_UART_Init(&s_huart) != HAL_OK) { return; }
/* ---- Start circular DMA reception ---- */
HAL_UART_Receive_DMA(&s_huart, (uint8_t *)s_rx_buf, UPROT_RX_BUF_LEN);
memset((void *)&uart_prot_state, 0, sizeof(uart_prot_state));
s_rx_head = 0u;
s_ps = PS_IDLE;
}
/* ================================================================
* uart_protocol_process() -- parse DMA ring buffer, one byte at a time
* ================================================================ */
void uart_protocol_process(void)
{
/* DMA write position: bytes not yet consumed by the parser */
uint16_t dma_tail = (uint16_t)(UPROT_RX_BUF_LEN -
__HAL_DMA_GET_COUNTER(&s_hdma_rx));
while (s_rx_head != dma_tail) {
uint8_t b = s_rx_buf[s_rx_head];
s_rx_head = (s_rx_head + 1u) & (UPROT_RX_BUF_LEN - 1u);
switch (s_ps) {
case PS_IDLE:
if (b == UPROT_STX) { s_ps = PS_LEN; }
break;
case PS_LEN:
/* LEN must be 1..UPROT_MAX_PAYLOAD+1 */
if (b == 0u || b > (UPROT_MAX_PAYLOAD + 1u)) {
s_ps = PS_IDLE; /* invalid length */
} else {
s_frame_len = b;
s_payload_idx = 0u;
s_ps = PS_CMD;
}
break;
case PS_CMD:
s_frame_cmd = b;
if (s_frame_len == 1u) {
/* No payload bytes: go straight to CRC */
s_ps = PS_CRC;
} else {
s_ps = PS_PAYLOAD;
}
break;
case PS_PAYLOAD:
if (s_payload_idx < UPROT_MAX_PAYLOAD) {
s_payload_buf[s_payload_idx] = b;
}
s_payload_idx++;
/* LEN-1 payload bytes collected (LEN includes CMD byte) */
if (s_payload_idx >= (uint8_t)(s_frame_len - 1u)) {
s_ps = PS_CRC;
}
break;
case PS_CRC:
s_frame_crc = b;
s_ps = PS_ETX;
break;
case PS_ETX:
if (b == UPROT_ETX) {
/* Validate CRC over CMD+PAYLOAD */
uint8_t crc_data[UPROT_MAX_PAYLOAD + 1u];
uint8_t crc_len = (uint8_t)(s_frame_len); /* CMD + payload bytes */
crc_data[0] = s_frame_cmd;
uint8_t plen = (uint8_t)(s_frame_len - 1u);
if (plen > 0u) {
memcpy(&crc_data[1], s_payload_buf, plen);
}
uint8_t calc = crc8_smbus(crc_data, crc_len);
if (calc == s_frame_crc) {
/* Valid frame: update heartbeat and dispatch */
uart_prot_state.last_rx_ms = HAL_GetTick();
dispatch(s_frame_cmd, s_payload_buf, plen);
} else {
send_nack(s_frame_cmd, UERR_BAD_CRC);
}
}
/* Always return to IDLE after ETX (or wrong ETX byte) */
s_ps = PS_IDLE;
break;
default:
s_ps = PS_IDLE;
break;
}
}
}
/* ================================================================
* uart_protocol_is_active()
* ================================================================ */
bool uart_protocol_is_active(uint32_t now_ms)
{
if (uart_prot_state.last_rx_ms == 0u) { return false; }
return (now_ms - uart_prot_state.last_rx_ms) < UART_PROT_HB_TIMEOUT_MS;
}
/* ================================================================
* uart_protocol_send_status()
* ================================================================ */
void uart_protocol_send_status(const uart_prot_status_t *s)
{
/*
* Frame: [STX][LEN=9][0x82][8 bytes status][CRC8][ETX] = 13 bytes
*/
const uint8_t plen = (uint8_t)sizeof(uart_prot_status_t); /* 8 */
const uint8_t len = 1u + plen; /* 9 */
uint8_t frame[13];
frame[0] = UPROT_STX;
frame[1] = len;
frame[2] = URESP_STATUS;
memcpy(&frame[3], s, plen);
/* CRC over CMD + payload */
uint8_t crc_buf[9];
crc_buf[0] = URESP_STATUS;
memcpy(&crc_buf[1], s, plen);
frame[3 + plen] = crc8_smbus(crc_buf, len);
frame[3 + plen + 1] = UPROT_ETX;
tx_send(frame, (uint8_t)(3u + plen + 2u)); /* 13 bytes total */
}