feat(social): system resource monitor for Jetson Orin (Issue #355) #357

Merged
sl-jetson merged 1 commits from sl-jetson/issue-355-sysmon into main 2026-03-03 14:32:49 -05:00
5 changed files with 1183 additions and 0 deletions

View File

@ -0,0 +1,13 @@
sysmon_node:
ros__parameters:
publish_rate: 1.0 # resource publish frequency (Hz)
disk_path: "/" # filesystem path for disk usage
# Jetson Orin GPU load sysfs path (per-mille or percent depending on kernel)
gpu_sysfs_path: "/sys/devices/gpu.0/load"
# Glob patterns for thermal zone discovery
thermal_glob: "/sys/devices/virtual/thermal/thermal_zone*/temp"
thermal_type_glob: "/sys/devices/virtual/thermal/thermal_zone*/type"
output_topic: "/saltybot/system_resources"

View File

@ -0,0 +1,40 @@
"""sysmon.launch.py — Launch system resource monitor (Issue #355).
Usage:
ros2 launch saltybot_social sysmon.launch.py
ros2 launch saltybot_social sysmon.launch.py publish_rate:=2.0
ros2 launch saltybot_social sysmon.launch.py disk_path:=/data
"""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
pkg = get_package_share_directory("saltybot_social")
cfg = os.path.join(pkg, "config", "sysmon_params.yaml")
return LaunchDescription([
DeclareLaunchArgument("publish_rate", default_value="1.0",
description="Resource publish frequency (Hz)"),
DeclareLaunchArgument("disk_path", default_value="/",
description="Filesystem path for disk usage"),
Node(
package="saltybot_social",
executable="sysmon_node",
name="sysmon_node",
output="screen",
parameters=[
cfg,
{
"publish_rate": LaunchConfiguration("publish_rate"),
"disk_path": LaunchConfiguration("disk_path"),
},
],
),
])

View File

@ -0,0 +1,337 @@
"""sysmon_node.py — System resource monitor for Jetson Orin.
Issue #355
Reads CPU, GPU, RAM, disk usage, and thermal temperatures, then
publishes a JSON string to /saltybot/system_resources at a configurable
rate (default 1 Hz).
All reads use /proc and /sys where available; GPU load falls back to -1.0
if the sysfs path is absent (non-Jetson host).
JSON payload
{
"ts": 1234567890.123, // epoch seconds (float)
"cpu_percent": [45.2, 32.1], // per-core %; index 0 = aggregate
"cpu_avg_percent": 38.6, // mean of per-core values
"ram_total_mb": 16384.0,
"ram_used_mb": 4096.0,
"ram_percent": 25.0,
"disk_total_gb": 64.0,
"disk_used_gb": 12.5,
"disk_percent": 19.5,
"gpu_percent": 42.0, // -1.0 if unavailable
"thermal": {"CPU-therm": 47.5, "GPU-therm": 43.2}
}
Publications
/saltybot/system_resources std_msgs/String JSON payload
Parameters
publish_rate (float, 1.0) publish frequency (Hz)
disk_path (str, "/") path for statvfs disk usage
gpu_sysfs_path (str, "/sys/devices/gpu.0/load")
thermal_glob (str, "/sys/devices/virtual/thermal/thermal_zone*/temp")
thermal_type_glob (str, "/sys/devices/virtual/thermal/thermal_zone*/type")
output_topic (str, "/saltybot/system_resources")
"""
from __future__ import annotations
import glob as _glob_mod
import json
import os
import threading
import time
from typing import Dict, List, Optional, Tuple
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
from std_msgs.msg import String
# ── Pure helpers ────────────────────────────────────────────────────────────────
def parse_proc_stat(content: str) -> List[List[int]]:
"""Parse /proc/stat and return per-cpu jiffies lists.
Returns a list where index 0 = "cpu" (aggregate) and index 1+ = cpu0, cpu1,
Each entry is [user, nice, system, idle, iowait, irq, softirq, steal].
"""
result: List[List[int]] = []
for line in content.splitlines():
if not line.startswith("cpu"):
break
parts = line.split()
# parts[0] is "cpu" or "cpu0", parts[1:] are jiffie fields
fields = [int(x) for x in parts[1:9]] # take up to 8 fields
while len(fields) < 8:
fields.append(0)
result.append(fields)
return result
def cpu_percent_from_stats(
prev: List[List[int]],
curr: List[List[int]],
) -> List[float]:
"""Compute per-cpu busy percentage from two /proc/stat snapshots.
Index 0 = aggregate (from "cpu" line), index 1+ = per-core.
Returns empty list if inputs are incompatible.
"""
if not prev or not curr or len(prev) != len(curr):
return []
result: List[float] = []
for p, c in zip(prev, curr):
idle_p = p[3] + p[4] # idle + iowait
idle_c = c[3] + c[4]
total_p = sum(p)
total_c = sum(c)
d_total = total_c - total_p
d_idle = idle_c - idle_p
if d_total <= 0:
result.append(0.0)
else:
pct = 100.0 * (d_total - d_idle) / d_total
result.append(round(max(0.0, min(100.0, pct)), 2))
return result
def parse_meminfo(content: str) -> Dict[str, int]:
"""Parse /proc/meminfo, return mapping of key → value in kB."""
info: Dict[str, int] = {}
for line in content.splitlines():
if ":" not in line:
continue
key, _, rest = line.partition(":")
value_str = rest.strip().split()[0] if rest.strip() else "0"
try:
info[key.strip()] = int(value_str)
except ValueError:
pass
return info
def compute_ram_stats(meminfo: Dict[str, int]) -> Tuple[float, float, float]:
"""Return (total_mb, used_mb, percent) from parsed /proc/meminfo.
Uses MemAvailable when present; falls back to MemFree.
"""
total_kb = meminfo.get("MemTotal", 0)
avail_kb = meminfo.get("MemAvailable", meminfo.get("MemFree", 0))
used_kb = max(0, total_kb - avail_kb)
total_mb = round(total_kb / 1024.0, 2)
used_mb = round(used_kb / 1024.0, 2)
if total_kb > 0:
percent = round(100.0 * used_kb / total_kb, 2)
else:
percent = 0.0
return total_mb, used_mb, percent
def read_disk_usage(path: str) -> Tuple[float, float, float]:
"""Return (total_gb, used_gb, percent) for the filesystem at *path*.
Returns (-1.0, -1.0, -1.0) on error.
"""
try:
st = os.statvfs(path)
total = st.f_frsize * st.f_blocks
free = st.f_frsize * st.f_bavail
used = total - free
total_gb = round(total / (1024 ** 3), 3)
used_gb = round(used / (1024 ** 3), 3)
pct = round(100.0 * used / total, 2) if total > 0 else 0.0
return total_gb, used_gb, pct
except Exception:
return -1.0, -1.0, -1.0
def read_gpu_load(path: str) -> float:
"""Read Jetson GPU load from sysfs (0100) or -1.0 if unavailable."""
try:
with open(path, "r") as fh:
raw = fh.read().strip()
# Some Jetson kernels report 01000 (per-mille), others 0100
val = int(raw)
if val > 100:
val = round(val / 10.0, 1)
return float(max(0.0, min(100.0, val)))
except Exception:
return -1.0
def read_thermal_zones(
temp_glob: str,
type_glob: str,
) -> Dict[str, float]:
"""Glob thermal zone sysfs paths and return {zone_type: temp_C}.
Falls back to numeric zone names ("zone0", "zone1", ) when type
files are absent.
"""
temp_paths = sorted(_glob_mod.glob(temp_glob))
result: Dict[str, float] = {}
for tp in temp_paths:
# Derive zone directory from temp path: …/thermal_zoneN/temp
zone_dir = os.path.dirname(tp)
zone_index = os.path.basename(zone_dir).replace("thermal_zone", "zone")
# Try to read the human-readable type
type_path = os.path.join(zone_dir, "type")
try:
with open(type_path, "r") as fh:
zone_name = fh.read().strip()
except Exception:
zone_name = zone_index
try:
with open(tp, "r") as fh:
milli_c = int(fh.read().strip())
result[zone_name] = round(milli_c / 1000.0, 1)
except Exception:
pass
return result
# ── ROS2 node ───────────────────────────────────────────────────────────────────
class SysmonNode(Node):
"""Publishes Jetson system resource usage as a JSON String at a fixed rate."""
def __init__(self) -> None:
super().__init__("sysmon_node")
self.declare_parameter("publish_rate", 1.0)
self.declare_parameter("disk_path", "/")
self.declare_parameter("gpu_sysfs_path", "/sys/devices/gpu.0/load")
self.declare_parameter(
"thermal_glob",
"/sys/devices/virtual/thermal/thermal_zone*/temp"
)
self.declare_parameter(
"thermal_type_glob",
"/sys/devices/virtual/thermal/thermal_zone*/type"
)
self.declare_parameter("output_topic", "/saltybot/system_resources")
rate = float(self.get_parameter("publish_rate").value)
self._disk = self.get_parameter("disk_path").value
self._gpu_path = self.get_parameter("gpu_sysfs_path").value
self._th_temp = self.get_parameter("thermal_glob").value
self._th_type = self.get_parameter("thermal_type_glob").value
topic = self.get_parameter("output_topic").value
# Injectable I/O functions for offline testing
self._read_proc_stat = self._default_read_proc_stat
self._read_meminfo = self._default_read_meminfo
self._read_disk_usage = read_disk_usage
self._read_gpu_load = read_gpu_load
self._read_thermal = read_thermal_zones
# CPU stats require two samples; prime with first read
self._lock = threading.Lock()
self._prev_stat: Optional[List[List[int]]] = self._read_proc_stat()
qos = QoSProfile(depth=10)
self._pub = self.create_publisher(String, topic, qos)
self._timer = self.create_timer(1.0 / rate, self._tick)
self.get_logger().info(
f"SysmonNode ready — publishing {topic} at {rate:.1f} Hz"
)
# ── Default I/O readers ──────────────────────────────────────────────
@staticmethod
def _default_read_proc_stat() -> Optional[List[List[int]]]:
try:
with open("/proc/stat", "r") as fh:
return parse_proc_stat(fh.read())
except Exception:
return None
@staticmethod
def _default_read_meminfo() -> str:
try:
with open("/proc/meminfo", "r") as fh:
return fh.read()
except Exception:
return ""
# ── Timer callback ───────────────────────────────────────────────────
def _tick(self) -> None:
ts = time.time()
# CPU
curr_stat = self._read_proc_stat()
with self._lock:
prev_stat = self._prev_stat
self._prev_stat = curr_stat
if prev_stat is not None and curr_stat is not None:
cpu_pcts = cpu_percent_from_stats(prev_stat, curr_stat)
else:
cpu_pcts = []
# Use per-core values (skip index 0 = aggregate) for avg
per_core = cpu_pcts[1:] if len(cpu_pcts) > 1 else cpu_pcts
cpu_avg = round(sum(per_core) / len(per_core), 2) if per_core else 0.0
# RAM
meminfo = parse_meminfo(self._read_meminfo())
ram_total, ram_used, ram_pct = compute_ram_stats(meminfo)
# Disk
disk_total, disk_used, disk_pct = self._read_disk_usage(self._disk)
# GPU
gpu_pct = self._read_gpu_load(self._gpu_path)
# Thermal
thermal = self._read_thermal(self._th_temp, self._th_type)
payload = {
"ts": ts,
"cpu_percent": cpu_pcts,
"cpu_avg_percent": cpu_avg,
"ram_total_mb": ram_total,
"ram_used_mb": ram_used,
"ram_percent": ram_pct,
"disk_total_gb": disk_total,
"disk_used_gb": disk_used,
"disk_percent": disk_pct,
"gpu_percent": gpu_pct,
"thermal": thermal,
}
msg = String()
msg.data = json.dumps(payload)
self._pub.publish(msg)
# ── Public accessor ──────────────────────────────────────────────────
@property
def prev_stat(self) -> Optional[List[List[int]]]:
with self._lock:
return self._prev_stat
def main(args=None) -> None:
rclpy.init(args=args)
node = SysmonNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()

View File

@ -63,6 +63,8 @@ setup(
'camera_hotplug_node = saltybot_social.camera_hotplug_node:main',
# Trigger-based ROS2 bag recorder (Issue #332)
'rosbag_recorder_node = saltybot_social.rosbag_recorder_node:main',
# System resource monitor for Jetson Orin (Issue #355)
'sysmon_node = saltybot_social.sysmon_node:main',
],
},
)

View File

@ -0,0 +1,791 @@
"""test_sysmon.py — Offline tests for sysmon_node (Issue #355).
All tests run without ROS, /proc, or /sys via:
- pure-function unit tests
- ROS2 stub pattern (_make_node + injectable I/O functions)
Coverage
parse_proc_stat single / multi cpu / malformed
cpu_percent_from_stats busy, idle, edge cases
parse_meminfo standard / missing keys
compute_ram_stats normal / zero total
read_disk_usage statvfs mock / error path
read_gpu_load normal / per-mille / unavailable
read_thermal_zones normal / missing type file / bad temp
SysmonNode init parameter wiring
SysmonNode._tick full payload published, JSON valid
SysmonNode CPU delta prev_stat used correctly
SysmonNode injectable I/O all readers swappable
source / entry-point file exists, main importable
"""
from __future__ import annotations
import importlib
import json
import os
import sys
import types
import unittest
from typing import Dict, List, Optional, Tuple
from unittest.mock import MagicMock, patch
# ── ROS2 / rclpy stub ───────────────────────────────────────────────────────────
def _make_rclpy_stub():
rclpy = types.ModuleType("rclpy")
rclpy_node = types.ModuleType("rclpy.node")
rclpy_qos = types.ModuleType("rclpy.qos")
std_msgs = types.ModuleType("std_msgs")
std_msgs_msg = types.ModuleType("std_msgs.msg")
class _QoSProfile:
def __init__(self, **kw): pass
class _String:
def __init__(self): self.data = ""
class _Node:
def __init__(self, name, **kw):
if not hasattr(self, "_params"):
self._params = {}
if not hasattr(self, "_pubs"):
self._pubs = {}
self._timers = []
self._logger = MagicMock()
def declare_parameter(self, name, default=None):
if name not in self._params:
self._params[name] = default
def get_parameter(self, name):
m = MagicMock()
m.value = self._params.get(name)
return m
def create_publisher(self, msg_type, topic, qos):
pub = MagicMock()
pub.msgs = []
pub.publish = lambda msg: pub.msgs.append(msg)
self._pubs[topic] = pub
return pub
def create_timer(self, period, cb):
t = MagicMock()
t._cb = cb
self._timers.append(t)
return t
def get_logger(self):
return self._logger
def destroy_node(self): pass
rclpy_node.Node = _Node
rclpy_qos.QoSProfile = _QoSProfile
std_msgs_msg.String = _String
rclpy.init = lambda *a, **kw: None
rclpy.spin = lambda n: None
rclpy.shutdown = lambda: None
return rclpy, rclpy_node, rclpy_qos, std_msgs, std_msgs_msg
def _load_mod():
"""Load sysmon_node with ROS2 stubs, return the module."""
rclpy, rclpy_node, rclpy_qos, std_msgs, std_msgs_msg = _make_rclpy_stub()
sys.modules.setdefault("rclpy", rclpy)
sys.modules.setdefault("rclpy.node", rclpy_node)
sys.modules.setdefault("rclpy.qos", rclpy_qos)
sys.modules.setdefault("std_msgs", std_msgs)
sys.modules.setdefault("std_msgs.msg", std_msgs_msg)
mod_name = "saltybot_social.sysmon_node"
if mod_name in sys.modules:
del sys.modules[mod_name]
mod = importlib.import_module(mod_name)
return mod
def _make_node(mod, **params) -> "mod.SysmonNode":
"""Create a SysmonNode with default params overridden by *params*."""
defaults = {
"publish_rate": 1.0,
"disk_path": "/",
"gpu_sysfs_path": "/sys/devices/gpu.0/load",
"thermal_glob": "/sys/devices/virtual/thermal/thermal_zone*/temp",
"thermal_type_glob": "/sys/devices/virtual/thermal/thermal_zone*/type",
"output_topic": "/saltybot/system_resources",
}
defaults.update(params)
node = mod.SysmonNode.__new__(mod.SysmonNode)
node._params = defaults
# Stub I/O before __init__ to avoid real /proc reads
node._read_proc_stat = lambda: None
mod.SysmonNode.__init__(node)
return node
# ── Sample /proc/stat content ────────────────────────────────────────────────────
_STAT_2CORE = """\
cpu 100 0 50 850 0 0 0 0 0 0
cpu0 60 0 30 410 0 0 0 0 0 0
cpu1 40 0 20 440 0 0 0 0 0 0
intr 12345 ...
"""
_STAT_2CORE_V2 = """\
cpu 250 0 100 1000 0 0 0 0 0 0
cpu0 140 0 60 510 0 0 0 0 0 0
cpu1 110 0 40 490 0 0 0 0 0 0
intr 99999 ...
"""
_MEMINFO = """\
MemTotal: 16384000 kB
MemFree: 4096000 kB
MemAvailable: 8192000 kB
Buffers: 512000 kB
Cached: 2048000 kB
"""
# ══════════════════════════════════════════════════════════════════════════════
# parse_proc_stat
# ══════════════════════════════════════════════════════════════════════════════
class TestParseProcStat(unittest.TestCase):
def setUp(self):
self.mod = _load_mod()
def test_aggregate_line_parsed(self):
result = self.mod.parse_proc_stat(_STAT_2CORE)
# index 0 = aggregate "cpu" line
self.assertEqual(result[0], [100, 0, 50, 850, 0, 0, 0, 0])
def test_per_core_lines_parsed(self):
result = self.mod.parse_proc_stat(_STAT_2CORE)
self.assertEqual(len(result), 3) # agg + cpu0 + cpu1
self.assertEqual(result[1], [60, 0, 30, 410, 0, 0, 0, 0])
self.assertEqual(result[2], [40, 0, 20, 440, 0, 0, 0, 0])
def test_stops_at_non_cpu_line(self):
result = self.mod.parse_proc_stat(_STAT_2CORE)
# should not include "intr" line
self.assertEqual(len(result), 3)
def test_short_fields_padded(self):
content = "cpu 1 2 3\n"
result = self.mod.parse_proc_stat(content)
self.assertEqual(len(result[0]), 8)
self.assertEqual(result[0][:3], [1, 2, 3])
self.assertEqual(result[0][3:], [0, 0, 0, 0, 0])
def test_empty_content(self):
result = self.mod.parse_proc_stat("")
self.assertEqual(result, [])
def test_no_cpu_lines(self):
result = self.mod.parse_proc_stat("intr 12345\nmem 0\n")
self.assertEqual(result, [])
def test_single_cpu_no_cores(self):
content = "cpu 200 0 100 700 0 0 0 0\n"
result = self.mod.parse_proc_stat(content)
self.assertEqual(len(result), 1)
self.assertEqual(result[0][0], 200)
def test_extra_fields_truncated_to_8(self):
content = "cpu 1 2 3 4 5 6 7 8 9 10\n"
result = self.mod.parse_proc_stat(content)
self.assertEqual(len(result[0]), 8)
# ══════════════════════════════════════════════════════════════════════════════
# cpu_percent_from_stats
# ══════════════════════════════════════════════════════════════════════════════
class TestCpuPercentFromStats(unittest.TestCase):
def setUp(self):
self.mod = _load_mod()
def _parse(self, content):
return self.mod.parse_proc_stat(content)
def test_basic_cpu_usage(self):
prev = self._parse(_STAT_2CORE)
curr = self._parse(_STAT_2CORE_V2)
pcts = self.mod.cpu_percent_from_stats(prev, curr)
# aggregate: delta = 300, idle delta = -150 → 50%
self.assertEqual(len(pcts), 3)
self.assertGreater(pcts[0], 0)
self.assertLessEqual(pcts[0], 100)
def test_all_idle(self):
prev = [[0, 0, 0, 1000, 0, 0, 0, 0]]
curr = [[0, 0, 0, 2000, 0, 0, 0, 0]]
pcts = self.mod.cpu_percent_from_stats(prev, curr)
self.assertEqual(pcts[0], 0.0)
def test_fully_busy(self):
prev = [[0, 0, 0, 0, 0, 0, 0, 0]]
curr = [[1000, 0, 0, 0, 0, 0, 0, 0]]
pcts = self.mod.cpu_percent_from_stats(prev, curr)
self.assertEqual(pcts[0], 100.0)
def test_empty_prev(self):
curr = self._parse(_STAT_2CORE)
result = self.mod.cpu_percent_from_stats([], curr)
self.assertEqual(result, [])
def test_mismatched_length(self):
prev = self._parse(_STAT_2CORE)
curr = self._parse(_STAT_2CORE)[:1]
result = self.mod.cpu_percent_from_stats(prev, curr)
self.assertEqual(result, [])
def test_zero_delta_total(self):
stat = [[100, 0, 50, 850, 0, 0, 0, 0]]
pcts = self.mod.cpu_percent_from_stats(stat, stat)
self.assertEqual(pcts[0], 0.0)
def test_values_clamped_0_to_100(self):
# Simulate counter wrap-around or bogus data
prev = [[9999, 0, 0, 0, 0, 0, 0, 0]]
curr = [[0, 0, 0, 1000, 0, 0, 0, 0]]
pcts = self.mod.cpu_percent_from_stats(prev, curr)
self.assertGreaterEqual(pcts[0], 0.0)
self.assertLessEqual(pcts[0], 100.0)
def test_multi_core_per_core_values(self):
prev = self._parse(_STAT_2CORE)
curr = self._parse(_STAT_2CORE_V2)
pcts = self.mod.cpu_percent_from_stats(prev, curr)
self.assertEqual(len(pcts), 3)
for p in pcts:
self.assertGreaterEqual(p, 0.0)
self.assertLessEqual(p, 100.0)
# ══════════════════════════════════════════════════════════════════════════════
# parse_meminfo
# ══════════════════════════════════════════════════════════════════════════════
class TestParseMeminfo(unittest.TestCase):
def setUp(self):
self.mod = _load_mod()
def test_total_parsed(self):
info = self.mod.parse_meminfo(_MEMINFO)
self.assertEqual(info["MemTotal"], 16384000)
def test_available_parsed(self):
info = self.mod.parse_meminfo(_MEMINFO)
self.assertEqual(info["MemAvailable"], 8192000)
def test_empty_returns_empty(self):
info = self.mod.parse_meminfo("")
self.assertEqual(info, {})
def test_lines_without_colon_ignored(self):
info = self.mod.parse_meminfo("NoColon 1234\nKey: 42 kB\n")
self.assertNotIn("NoColon 1234", info)
self.assertEqual(info["Key"], 42)
def test_malformed_value_skipped(self):
info = self.mod.parse_meminfo("Bad: abc kB\nGood: 100 kB\n")
self.assertNotIn("Bad", info)
self.assertEqual(info["Good"], 100)
def test_multiple_keys(self):
info = self.mod.parse_meminfo(_MEMINFO)
self.assertIn("MemFree", info)
self.assertIn("Buffers", info)
self.assertIn("Cached", info)
# ══════════════════════════════════════════════════════════════════════════════
# compute_ram_stats
# ══════════════════════════════════════════════════════════════════════════════
class TestComputeRamStats(unittest.TestCase):
def setUp(self):
self.mod = _load_mod()
def test_uses_mem_available(self):
info = self.mod.parse_meminfo(_MEMINFO)
total, used, pct = self.mod.compute_ram_stats(info)
# total = 16384000 kB = 16000 MB
self.assertAlmostEqual(total, 16000.0, delta=1)
# used = total - available = 16384000 - 8192000 = 8192000 kB = 8000 MB
self.assertAlmostEqual(used, 8000.0, delta=1)
def test_fallback_to_mem_free(self):
info = {"MemTotal": 1024, "MemFree": 512}
total, used, pct = self.mod.compute_ram_stats(info)
self.assertAlmostEqual(used, 0.5, delta=0.01) # 512 kB = 0.5 MB
def test_zero_total(self):
total, used, pct = self.mod.compute_ram_stats({})
self.assertEqual(total, 0.0)
self.assertEqual(pct, 0.0)
def test_percent_correct(self):
info = {"MemTotal": 1000, "MemAvailable": 750}
_, _, pct = self.mod.compute_ram_stats(info)
self.assertAlmostEqual(pct, 25.0, delta=0.1)
def test_fully_used(self):
info = {"MemTotal": 1000, "MemAvailable": 0}
_, _, pct = self.mod.compute_ram_stats(info)
self.assertEqual(pct, 100.0)
# ══════════════════════════════════════════════════════════════════════════════
# read_disk_usage
# ══════════════════════════════════════════════════════════════════════════════
class TestReadDiskUsage(unittest.TestCase):
def setUp(self):
self.mod = _load_mod()
def test_root_filesystem_succeeds(self):
"""On any real POSIX host / should be readable."""
total, used, pct = self.mod.read_disk_usage("/")
self.assertGreater(total, 0.0)
self.assertGreaterEqual(used, 0.0)
self.assertGreaterEqual(pct, 0.0)
self.assertLessEqual(pct, 100.0)
def test_nonexistent_path_returns_minus_one(self):
total, used, pct = self.mod.read_disk_usage("/nonexistent_xyz_12345")
self.assertEqual(total, -1.0)
self.assertEqual(used, -1.0)
self.assertEqual(pct, -1.0)
def test_statvfs_mock(self):
fake = MagicMock()
fake.f_frsize = 4096
fake.f_blocks = 1000 # total = 4096000 bytes ~ 3.8 MB
fake.f_bavail = 250 # free = 1024000 bytes, used = 3072000
with patch("os.statvfs", return_value=fake):
total, used, pct = self.mod.read_disk_usage("/any")
total_b = 4096 * 1000
used_b = 4096 * 750
self.assertAlmostEqual(total, total_b / (1024**3), delta=0.001)
self.assertAlmostEqual(used, used_b / (1024**3), delta=0.001)
self.assertAlmostEqual(pct, 75.0, delta=0.1)
def test_zero_blocks_returns_zero_percent(self):
fake = MagicMock()
fake.f_frsize = 4096
fake.f_blocks = 0
fake.f_bavail = 0
with patch("os.statvfs", return_value=fake):
total, used, pct = self.mod.read_disk_usage("/any")
self.assertEqual(pct, 0.0)
# ══════════════════════════════════════════════════════════════════════════════
# read_gpu_load
# ══════════════════════════════════════════════════════════════════════════════
class TestReadGpuLoad(unittest.TestCase):
def setUp(self):
self.mod = _load_mod()
def _make_tmpfile(self, content: str) -> str:
import tempfile
f = tempfile.NamedTemporaryFile("w", delete=False, suffix=".txt")
f.write(content)
f.flush()
f.close()
return f.name
def test_normal_percent(self):
p = self._make_tmpfile("42\n")
try:
self.assertEqual(self.mod.read_gpu_load(p), 42.0)
finally:
os.unlink(p)
def test_per_mille_converted(self):
p = self._make_tmpfile("750\n")
try:
val = self.mod.read_gpu_load(p)
self.assertEqual(val, 75.0)
finally:
os.unlink(p)
def test_missing_file_returns_minus_one(self):
val = self.mod.read_gpu_load("/nonexistent_gpu_sysfs_xyz")
self.assertEqual(val, -1.0)
def test_non_numeric_returns_minus_one(self):
p = self._make_tmpfile("N/A\n")
try:
val = self.mod.read_gpu_load(p)
self.assertEqual(val, -1.0)
finally:
os.unlink(p)
def test_clamped_to_100(self):
p = self._make_tmpfile("1001\n") # > 1000 → not per-mille → clamp
try:
val = self.mod.read_gpu_load(p)
self.assertLessEqual(val, 100.0)
finally:
os.unlink(p)
def test_zero(self):
p = self._make_tmpfile("0\n")
try:
self.assertEqual(self.mod.read_gpu_load(p), 0.0)
finally:
os.unlink(p)
def test_100_percent(self):
p = self._make_tmpfile("100\n")
try:
self.assertEqual(self.mod.read_gpu_load(p), 100.0)
finally:
os.unlink(p)
# ══════════════════════════════════════════════════════════════════════════════
# read_thermal_zones
# ══════════════════════════════════════════════════════════════════════════════
class TestReadThermalZones(unittest.TestCase):
def setUp(self):
self.mod = _load_mod()
import tempfile
self._tmpdir = tempfile.mkdtemp()
def tearDown(self):
import shutil
shutil.rmtree(self._tmpdir, ignore_errors=True)
def _make_zone(self, name: str, zone_id: int, milli_c: int) -> str:
zone_dir = os.path.join(self._tmpdir, f"thermal_zone{zone_id}")
os.makedirs(zone_dir, exist_ok=True)
with open(os.path.join(zone_dir, "temp"), "w") as f:
f.write(f"{milli_c}\n")
with open(os.path.join(zone_dir, "type"), "w") as f:
f.write(f"{name}\n")
return zone_dir
def test_reads_zones_with_type(self):
self._make_zone("CPU-therm", 0, 47500)
self._make_zone("GPU-therm", 1, 43200)
temp_glob = os.path.join(self._tmpdir, "thermal_zone*/temp")
type_glob = os.path.join(self._tmpdir, "thermal_zone*/type")
result = self.mod.read_thermal_zones(temp_glob, type_glob)
self.assertAlmostEqual(result["CPU-therm"], 47.5, delta=0.1)
self.assertAlmostEqual(result["GPU-therm"], 43.2, delta=0.1)
def test_fallback_to_zone_index_when_no_type(self):
zone_dir = os.path.join(self._tmpdir, "thermal_zone0")
os.makedirs(zone_dir, exist_ok=True)
with open(os.path.join(zone_dir, "temp"), "w") as f:
f.write("35000\n")
# No type file
temp_glob = os.path.join(self._tmpdir, "thermal_zone*/temp")
type_glob = os.path.join(self._tmpdir, "thermal_zone*/type")
result = self.mod.read_thermal_zones(temp_glob, type_glob)
self.assertIn("zone0", result)
self.assertAlmostEqual(result["zone0"], 35.0, delta=0.1)
def test_empty_glob_returns_empty(self):
result = self.mod.read_thermal_zones(
"/nonexistent_path/*/temp",
"/nonexistent_path/*/type",
)
self.assertEqual(result, {})
def test_bad_temp_file_skipped(self):
zone_dir = os.path.join(self._tmpdir, "thermal_zone0")
os.makedirs(zone_dir, exist_ok=True)
with open(os.path.join(zone_dir, "temp"), "w") as f:
f.write("INVALID\n")
with open(os.path.join(zone_dir, "type"), "w") as f:
f.write("CPU-therm\n")
temp_glob = os.path.join(self._tmpdir, "thermal_zone*/temp")
type_glob = os.path.join(self._tmpdir, "thermal_zone*/type")
result = self.mod.read_thermal_zones(temp_glob, type_glob)
self.assertEqual(result, {})
def test_temperature_conversion(self):
self._make_zone("SOC-therm", 0, 55000)
temp_glob = os.path.join(self._tmpdir, "thermal_zone*/temp")
type_glob = os.path.join(self._tmpdir, "thermal_zone*/type")
result = self.mod.read_thermal_zones(temp_glob, type_glob)
self.assertAlmostEqual(result["SOC-therm"], 55.0, delta=0.1)
# ══════════════════════════════════════════════════════════════════════════════
# SysmonNode init
# ══════════════════════════════════════════════════════════════════════════════
class TestSysmonNodeInit(unittest.TestCase):
def setUp(self):
self.mod = _load_mod()
def test_node_creates_publisher(self):
node = _make_node(self.mod)
self.assertIn("/saltybot/system_resources", node._pubs)
def test_node_creates_timer(self):
node = _make_node(self.mod)
self.assertEqual(len(node._timers), 1)
def test_custom_output_topic(self):
node = _make_node(self.mod, output_topic="/custom/resources")
self.assertIn("/custom/resources", node._pubs)
def test_timer_period_from_rate(self):
# Rate=2 Hz → period=0.5s — timer is created; period verified via mock
node = _make_node(self.mod, publish_rate=2.0)
self.assertEqual(len(node._timers), 1)
def test_injectable_read_proc_stat(self):
node = _make_node(self.mod)
called = []
node._read_proc_stat = lambda: (called.append(1) or None)
node._tick()
self.assertTrue(len(called) >= 1)
def test_injectable_read_meminfo(self):
node = _make_node(self.mod)
node._read_meminfo = lambda: _MEMINFO
node._read_disk_usage = lambda p: (1.0, 0.5, 50.0)
node._read_gpu_load = lambda p: 0.0
node._read_thermal = lambda g, t: {}
node._tick() # should not raise
def test_prev_stat_initialised(self):
node = _make_node(self.mod)
# _read_proc_stat was stubbed to return None during _make_node
# so prev_stat may be None — just confirm the property exists
_ = node.prev_stat
# ══════════════════════════════════════════════════════════════════════════════
# SysmonNode._tick — JSON payload
# ══════════════════════════════════════════════════════════════════════════════
class TestSysmonNodeTick(unittest.TestCase):
def setUp(self):
self.mod = _load_mod()
def _wired_node(self, **kwargs):
"""Return a node with fully stubbed I/O."""
stat_v1 = self.mod.parse_proc_stat(_STAT_2CORE)
stat_v2 = self.mod.parse_proc_stat(_STAT_2CORE_V2)
call_count = [0]
def fake_stat():
call_count[0] += 1
return stat_v2 if call_count[0] > 1 else stat_v1
node = _make_node(self.mod, **kwargs)
node._read_proc_stat = fake_stat
node._prev_stat = stat_v1
node._read_meminfo = lambda: _MEMINFO
node._read_disk_usage = lambda p: (64.0, 12.5, 19.5)
node._read_gpu_load = lambda p: 42.0
node._read_thermal = lambda g, t: {"CPU-therm": 47.5, "GPU-therm": 43.2}
return node
def _get_payload(self, node) -> dict:
node._tick()
pub = node._pubs["/saltybot/system_resources"]
self.assertTrue(len(pub.msgs) >= 1)
return json.loads(pub.msgs[-1].data)
def test_payload_is_valid_json(self):
node = self._wired_node()
node._tick()
pub = node._pubs["/saltybot/system_resources"]
msg = pub.msgs[-1]
data = json.loads(msg.data)
self.assertIsInstance(data, dict)
def test_payload_has_required_keys(self):
node = self._wired_node()
payload = self._get_payload(node)
for key in (
"ts", "cpu_percent", "cpu_avg_percent",
"ram_total_mb", "ram_used_mb", "ram_percent",
"disk_total_gb", "disk_used_gb", "disk_percent",
"gpu_percent", "thermal",
):
self.assertIn(key, payload, f"Missing key: {key}")
def test_ts_is_float(self):
node = self._wired_node()
payload = self._get_payload(node)
self.assertIsInstance(payload["ts"], float)
def test_cpu_percent_is_list(self):
node = self._wired_node()
payload = self._get_payload(node)
self.assertIsInstance(payload["cpu_percent"], list)
def test_gpu_percent_forwarded(self):
node = self._wired_node()
payload = self._get_payload(node)
self.assertAlmostEqual(payload["gpu_percent"], 42.0, delta=0.1)
def test_disk_stats_forwarded(self):
node = self._wired_node()
payload = self._get_payload(node)
self.assertAlmostEqual(payload["disk_total_gb"], 64.0, delta=0.1)
self.assertAlmostEqual(payload["disk_used_gb"], 12.5, delta=0.1)
self.assertAlmostEqual(payload["disk_percent"], 19.5, delta=0.1)
def test_ram_stats_forwarded(self):
node = self._wired_node()
payload = self._get_payload(node)
self.assertGreater(payload["ram_total_mb"], 0)
self.assertGreaterEqual(payload["ram_used_mb"], 0)
def test_thermal_is_dict(self):
node = self._wired_node()
payload = self._get_payload(node)
self.assertIsInstance(payload["thermal"], dict)
self.assertIn("CPU-therm", payload["thermal"])
def test_cpu_avg_matches_per_core(self):
node = self._wired_node()
payload = self._get_payload(node)
per_core = payload["cpu_percent"][1:]
if per_core:
expected_avg = sum(per_core) / len(per_core)
self.assertAlmostEqual(payload["cpu_avg_percent"], expected_avg, delta=0.1)
def test_publishes_once_per_tick(self):
node = self._wired_node()
node._tick()
node._tick()
pub = node._pubs["/saltybot/system_resources"]
self.assertEqual(len(pub.msgs), 2)
def test_no_prev_stat_gives_empty_cpu(self):
node = self._wired_node()
node._prev_stat = None
node._read_proc_stat = lambda: None
payload = self._get_payload(node)
self.assertEqual(payload["cpu_percent"], [])
self.assertEqual(payload["cpu_avg_percent"], 0.0)
# ══════════════════════════════════════════════════════════════════════════════
# CPU delta tracking across ticks
# ══════════════════════════════════════════════════════════════════════════════
class TestSysmonCpuDelta(unittest.TestCase):
def setUp(self):
self.mod = _load_mod()
def test_prev_stat_updated_after_tick(self):
stat_v1 = self.mod.parse_proc_stat(_STAT_2CORE)
stat_v2 = self.mod.parse_proc_stat(_STAT_2CORE_V2)
calls = [0]
def fake_stat():
calls[0] += 1
return stat_v2
node = _make_node(self.mod)
node._prev_stat = stat_v1
node._read_proc_stat = fake_stat
node._read_meminfo = lambda: ""
node._read_disk_usage = lambda p: (1.0, 0.5, 50.0)
node._read_gpu_load = lambda p: 0.0
node._read_thermal = lambda g, t: {}
node._tick()
self.assertEqual(node.prev_stat, stat_v2)
def test_second_tick_uses_updated_prev(self):
stat_v1 = self.mod.parse_proc_stat(_STAT_2CORE)
stat_v2 = self.mod.parse_proc_stat(_STAT_2CORE_V2)
seq = [stat_v1, stat_v2, stat_v1]
idx = [0]
def fake_stat():
v = seq[idx[0] % len(seq)]
idx[0] += 1
return v
node = _make_node(self.mod)
node._prev_stat = None
node._read_proc_stat = fake_stat
node._read_meminfo = lambda: ""
node._read_disk_usage = lambda p: (1.0, 0.5, 50.0)
node._read_gpu_load = lambda p: 0.0
node._read_thermal = lambda g, t: {}
node._tick()
node._tick()
pub = node._pubs["/saltybot/system_resources"]
self.assertEqual(len(pub.msgs), 2)
# ══════════════════════════════════════════════════════════════════════════════
# Source / entry-point sanity
# ══════════════════════════════════════════════════════════════════════════════
class TestSysmonSource(unittest.TestCase):
def setUp(self):
self.mod = _load_mod()
def test_file_exists(self):
src = os.path.join(
os.path.dirname(__file__),
"..", "saltybot_social", "sysmon_node.py"
)
self.assertTrue(os.path.isfile(os.path.normpath(src)))
def test_main_callable(self):
self.assertTrue(callable(self.mod.main))
def test_issue_tag_in_source(self):
src = os.path.join(
os.path.dirname(__file__),
"..", "saltybot_social", "sysmon_node.py"
)
with open(src) as fh:
content = fh.read()
self.assertIn("355", content)
def test_status_constants_present(self):
# Confirm key pure functions exported
self.assertTrue(callable(self.mod.parse_proc_stat))
self.assertTrue(callable(self.mod.cpu_percent_from_stats))
self.assertTrue(callable(self.mod.parse_meminfo))
self.assertTrue(callable(self.mod.compute_ram_stats))
self.assertTrue(callable(self.mod.read_disk_usage))
self.assertTrue(callable(self.mod.read_gpu_load))
self.assertTrue(callable(self.mod.read_thermal_zones))
if __name__ == "__main__":
unittest.main()