feat(social): system resource monitor for Jetson Orin (Issue #355) #357
13
jetson/ros2_ws/src/saltybot_social/config/sysmon_params.yaml
Normal file
13
jetson/ros2_ws/src/saltybot_social/config/sysmon_params.yaml
Normal file
@ -0,0 +1,13 @@
|
||||
sysmon_node:
|
||||
ros__parameters:
|
||||
publish_rate: 1.0 # resource publish frequency (Hz)
|
||||
disk_path: "/" # filesystem path for disk usage
|
||||
|
||||
# Jetson Orin GPU load sysfs path (per-mille or percent depending on kernel)
|
||||
gpu_sysfs_path: "/sys/devices/gpu.0/load"
|
||||
|
||||
# Glob patterns for thermal zone discovery
|
||||
thermal_glob: "/sys/devices/virtual/thermal/thermal_zone*/temp"
|
||||
thermal_type_glob: "/sys/devices/virtual/thermal/thermal_zone*/type"
|
||||
|
||||
output_topic: "/saltybot/system_resources"
|
||||
40
jetson/ros2_ws/src/saltybot_social/launch/sysmon.launch.py
Normal file
40
jetson/ros2_ws/src/saltybot_social/launch/sysmon.launch.py
Normal file
@ -0,0 +1,40 @@
|
||||
"""sysmon.launch.py — Launch system resource monitor (Issue #355).
|
||||
|
||||
Usage:
|
||||
ros2 launch saltybot_social sysmon.launch.py
|
||||
ros2 launch saltybot_social sysmon.launch.py publish_rate:=2.0
|
||||
ros2 launch saltybot_social sysmon.launch.py disk_path:=/data
|
||||
"""
|
||||
|
||||
import os
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
from launch import LaunchDescription
|
||||
from launch.actions import DeclareLaunchArgument
|
||||
from launch.substitutions import LaunchConfiguration
|
||||
from launch_ros.actions import Node
|
||||
|
||||
|
||||
def generate_launch_description():
|
||||
pkg = get_package_share_directory("saltybot_social")
|
||||
cfg = os.path.join(pkg, "config", "sysmon_params.yaml")
|
||||
|
||||
return LaunchDescription([
|
||||
DeclareLaunchArgument("publish_rate", default_value="1.0",
|
||||
description="Resource publish frequency (Hz)"),
|
||||
DeclareLaunchArgument("disk_path", default_value="/",
|
||||
description="Filesystem path for disk usage"),
|
||||
|
||||
Node(
|
||||
package="saltybot_social",
|
||||
executable="sysmon_node",
|
||||
name="sysmon_node",
|
||||
output="screen",
|
||||
parameters=[
|
||||
cfg,
|
||||
{
|
||||
"publish_rate": LaunchConfiguration("publish_rate"),
|
||||
"disk_path": LaunchConfiguration("disk_path"),
|
||||
},
|
||||
],
|
||||
),
|
||||
])
|
||||
@ -0,0 +1,337 @@
|
||||
"""sysmon_node.py — System resource monitor for Jetson Orin.
|
||||
Issue #355
|
||||
|
||||
Reads CPU, GPU, RAM, disk usage, and thermal temperatures, then
|
||||
publishes a JSON string to /saltybot/system_resources at a configurable
|
||||
rate (default 1 Hz).
|
||||
|
||||
All reads use /proc and /sys where available; GPU load falls back to -1.0
|
||||
if the sysfs path is absent (non-Jetson host).
|
||||
|
||||
JSON payload
|
||||
────────────
|
||||
{
|
||||
"ts": 1234567890.123, // epoch seconds (float)
|
||||
"cpu_percent": [45.2, 32.1], // per-core %; index 0 = aggregate
|
||||
"cpu_avg_percent": 38.6, // mean of per-core values
|
||||
"ram_total_mb": 16384.0,
|
||||
"ram_used_mb": 4096.0,
|
||||
"ram_percent": 25.0,
|
||||
"disk_total_gb": 64.0,
|
||||
"disk_used_gb": 12.5,
|
||||
"disk_percent": 19.5,
|
||||
"gpu_percent": 42.0, // -1.0 if unavailable
|
||||
"thermal": {"CPU-therm": 47.5, "GPU-therm": 43.2}
|
||||
}
|
||||
|
||||
Publications
|
||||
────────────
|
||||
/saltybot/system_resources std_msgs/String JSON payload
|
||||
|
||||
Parameters
|
||||
──────────
|
||||
publish_rate (float, 1.0) publish frequency (Hz)
|
||||
disk_path (str, "/") path for statvfs disk usage
|
||||
gpu_sysfs_path (str, "/sys/devices/gpu.0/load")
|
||||
thermal_glob (str, "/sys/devices/virtual/thermal/thermal_zone*/temp")
|
||||
thermal_type_glob (str, "/sys/devices/virtual/thermal/thermal_zone*/type")
|
||||
output_topic (str, "/saltybot/system_resources")
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import glob as _glob_mod
|
||||
import json
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.qos import QoSProfile
|
||||
from std_msgs.msg import String
|
||||
|
||||
|
||||
# ── Pure helpers ────────────────────────────────────────────────────────────────
|
||||
|
||||
def parse_proc_stat(content: str) -> List[List[int]]:
|
||||
"""Parse /proc/stat and return per-cpu jiffies lists.
|
||||
|
||||
Returns a list where index 0 = "cpu" (aggregate) and index 1+ = cpu0, cpu1, …
|
||||
Each entry is [user, nice, system, idle, iowait, irq, softirq, steal].
|
||||
"""
|
||||
result: List[List[int]] = []
|
||||
for line in content.splitlines():
|
||||
if not line.startswith("cpu"):
|
||||
break
|
||||
parts = line.split()
|
||||
# parts[0] is "cpu" or "cpu0", parts[1:] are jiffie fields
|
||||
fields = [int(x) for x in parts[1:9]] # take up to 8 fields
|
||||
while len(fields) < 8:
|
||||
fields.append(0)
|
||||
result.append(fields)
|
||||
return result
|
||||
|
||||
|
||||
def cpu_percent_from_stats(
|
||||
prev: List[List[int]],
|
||||
curr: List[List[int]],
|
||||
) -> List[float]:
|
||||
"""Compute per-cpu busy percentage from two /proc/stat snapshots.
|
||||
|
||||
Index 0 = aggregate (from "cpu" line), index 1+ = per-core.
|
||||
Returns empty list if inputs are incompatible.
|
||||
"""
|
||||
if not prev or not curr or len(prev) != len(curr):
|
||||
return []
|
||||
|
||||
result: List[float] = []
|
||||
for p, c in zip(prev, curr):
|
||||
idle_p = p[3] + p[4] # idle + iowait
|
||||
idle_c = c[3] + c[4]
|
||||
total_p = sum(p)
|
||||
total_c = sum(c)
|
||||
d_total = total_c - total_p
|
||||
d_idle = idle_c - idle_p
|
||||
if d_total <= 0:
|
||||
result.append(0.0)
|
||||
else:
|
||||
pct = 100.0 * (d_total - d_idle) / d_total
|
||||
result.append(round(max(0.0, min(100.0, pct)), 2))
|
||||
return result
|
||||
|
||||
|
||||
def parse_meminfo(content: str) -> Dict[str, int]:
|
||||
"""Parse /proc/meminfo, return mapping of key → value in kB."""
|
||||
info: Dict[str, int] = {}
|
||||
for line in content.splitlines():
|
||||
if ":" not in line:
|
||||
continue
|
||||
key, _, rest = line.partition(":")
|
||||
value_str = rest.strip().split()[0] if rest.strip() else "0"
|
||||
try:
|
||||
info[key.strip()] = int(value_str)
|
||||
except ValueError:
|
||||
pass
|
||||
return info
|
||||
|
||||
|
||||
def compute_ram_stats(meminfo: Dict[str, int]) -> Tuple[float, float, float]:
|
||||
"""Return (total_mb, used_mb, percent) from parsed /proc/meminfo.
|
||||
|
||||
Uses MemAvailable when present; falls back to MemFree.
|
||||
"""
|
||||
total_kb = meminfo.get("MemTotal", 0)
|
||||
avail_kb = meminfo.get("MemAvailable", meminfo.get("MemFree", 0))
|
||||
used_kb = max(0, total_kb - avail_kb)
|
||||
total_mb = round(total_kb / 1024.0, 2)
|
||||
used_mb = round(used_kb / 1024.0, 2)
|
||||
if total_kb > 0:
|
||||
percent = round(100.0 * used_kb / total_kb, 2)
|
||||
else:
|
||||
percent = 0.0
|
||||
return total_mb, used_mb, percent
|
||||
|
||||
|
||||
def read_disk_usage(path: str) -> Tuple[float, float, float]:
|
||||
"""Return (total_gb, used_gb, percent) for the filesystem at *path*.
|
||||
|
||||
Returns (-1.0, -1.0, -1.0) on error.
|
||||
"""
|
||||
try:
|
||||
st = os.statvfs(path)
|
||||
total = st.f_frsize * st.f_blocks
|
||||
free = st.f_frsize * st.f_bavail
|
||||
used = total - free
|
||||
total_gb = round(total / (1024 ** 3), 3)
|
||||
used_gb = round(used / (1024 ** 3), 3)
|
||||
pct = round(100.0 * used / total, 2) if total > 0 else 0.0
|
||||
return total_gb, used_gb, pct
|
||||
except Exception:
|
||||
return -1.0, -1.0, -1.0
|
||||
|
||||
|
||||
def read_gpu_load(path: str) -> float:
|
||||
"""Read Jetson GPU load from sysfs (0–100) or -1.0 if unavailable."""
|
||||
try:
|
||||
with open(path, "r") as fh:
|
||||
raw = fh.read().strip()
|
||||
# Some Jetson kernels report 0–1000 (per-mille), others 0–100
|
||||
val = int(raw)
|
||||
if val > 100:
|
||||
val = round(val / 10.0, 1)
|
||||
return float(max(0.0, min(100.0, val)))
|
||||
except Exception:
|
||||
return -1.0
|
||||
|
||||
|
||||
def read_thermal_zones(
|
||||
temp_glob: str,
|
||||
type_glob: str,
|
||||
) -> Dict[str, float]:
|
||||
"""Glob thermal zone sysfs paths and return {zone_type: temp_C}.
|
||||
|
||||
Falls back to numeric zone names ("zone0", "zone1", …) when type
|
||||
files are absent.
|
||||
"""
|
||||
temp_paths = sorted(_glob_mod.glob(temp_glob))
|
||||
result: Dict[str, float] = {}
|
||||
|
||||
for tp in temp_paths:
|
||||
# Derive zone directory from temp path: …/thermal_zoneN/temp
|
||||
zone_dir = os.path.dirname(tp)
|
||||
zone_index = os.path.basename(zone_dir).replace("thermal_zone", "zone")
|
||||
|
||||
# Try to read the human-readable type
|
||||
type_path = os.path.join(zone_dir, "type")
|
||||
try:
|
||||
with open(type_path, "r") as fh:
|
||||
zone_name = fh.read().strip()
|
||||
except Exception:
|
||||
zone_name = zone_index
|
||||
|
||||
try:
|
||||
with open(tp, "r") as fh:
|
||||
milli_c = int(fh.read().strip())
|
||||
result[zone_name] = round(milli_c / 1000.0, 1)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return result
|
||||
|
||||
|
||||
# ── ROS2 node ───────────────────────────────────────────────────────────────────
|
||||
|
||||
class SysmonNode(Node):
|
||||
"""Publishes Jetson system resource usage as a JSON String at a fixed rate."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__("sysmon_node")
|
||||
|
||||
self.declare_parameter("publish_rate", 1.0)
|
||||
self.declare_parameter("disk_path", "/")
|
||||
self.declare_parameter("gpu_sysfs_path", "/sys/devices/gpu.0/load")
|
||||
self.declare_parameter(
|
||||
"thermal_glob",
|
||||
"/sys/devices/virtual/thermal/thermal_zone*/temp"
|
||||
)
|
||||
self.declare_parameter(
|
||||
"thermal_type_glob",
|
||||
"/sys/devices/virtual/thermal/thermal_zone*/type"
|
||||
)
|
||||
self.declare_parameter("output_topic", "/saltybot/system_resources")
|
||||
|
||||
rate = float(self.get_parameter("publish_rate").value)
|
||||
self._disk = self.get_parameter("disk_path").value
|
||||
self._gpu_path = self.get_parameter("gpu_sysfs_path").value
|
||||
self._th_temp = self.get_parameter("thermal_glob").value
|
||||
self._th_type = self.get_parameter("thermal_type_glob").value
|
||||
topic = self.get_parameter("output_topic").value
|
||||
|
||||
# Injectable I/O functions for offline testing
|
||||
self._read_proc_stat = self._default_read_proc_stat
|
||||
self._read_meminfo = self._default_read_meminfo
|
||||
self._read_disk_usage = read_disk_usage
|
||||
self._read_gpu_load = read_gpu_load
|
||||
self._read_thermal = read_thermal_zones
|
||||
|
||||
# CPU stats require two samples; prime with first read
|
||||
self._lock = threading.Lock()
|
||||
self._prev_stat: Optional[List[List[int]]] = self._read_proc_stat()
|
||||
|
||||
qos = QoSProfile(depth=10)
|
||||
self._pub = self.create_publisher(String, topic, qos)
|
||||
self._timer = self.create_timer(1.0 / rate, self._tick)
|
||||
|
||||
self.get_logger().info(
|
||||
f"SysmonNode ready — publishing {topic} at {rate:.1f} Hz"
|
||||
)
|
||||
|
||||
# ── Default I/O readers ──────────────────────────────────────────────
|
||||
|
||||
@staticmethod
|
||||
def _default_read_proc_stat() -> Optional[List[List[int]]]:
|
||||
try:
|
||||
with open("/proc/stat", "r") as fh:
|
||||
return parse_proc_stat(fh.read())
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _default_read_meminfo() -> str:
|
||||
try:
|
||||
with open("/proc/meminfo", "r") as fh:
|
||||
return fh.read()
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
# ── Timer callback ───────────────────────────────────────────────────
|
||||
|
||||
def _tick(self) -> None:
|
||||
ts = time.time()
|
||||
|
||||
# CPU
|
||||
curr_stat = self._read_proc_stat()
|
||||
with self._lock:
|
||||
prev_stat = self._prev_stat
|
||||
self._prev_stat = curr_stat
|
||||
|
||||
if prev_stat is not None and curr_stat is not None:
|
||||
cpu_pcts = cpu_percent_from_stats(prev_stat, curr_stat)
|
||||
else:
|
||||
cpu_pcts = []
|
||||
|
||||
# Use per-core values (skip index 0 = aggregate) for avg
|
||||
per_core = cpu_pcts[1:] if len(cpu_pcts) > 1 else cpu_pcts
|
||||
cpu_avg = round(sum(per_core) / len(per_core), 2) if per_core else 0.0
|
||||
|
||||
# RAM
|
||||
meminfo = parse_meminfo(self._read_meminfo())
|
||||
ram_total, ram_used, ram_pct = compute_ram_stats(meminfo)
|
||||
|
||||
# Disk
|
||||
disk_total, disk_used, disk_pct = self._read_disk_usage(self._disk)
|
||||
|
||||
# GPU
|
||||
gpu_pct = self._read_gpu_load(self._gpu_path)
|
||||
|
||||
# Thermal
|
||||
thermal = self._read_thermal(self._th_temp, self._th_type)
|
||||
|
||||
payload = {
|
||||
"ts": ts,
|
||||
"cpu_percent": cpu_pcts,
|
||||
"cpu_avg_percent": cpu_avg,
|
||||
"ram_total_mb": ram_total,
|
||||
"ram_used_mb": ram_used,
|
||||
"ram_percent": ram_pct,
|
||||
"disk_total_gb": disk_total,
|
||||
"disk_used_gb": disk_used,
|
||||
"disk_percent": disk_pct,
|
||||
"gpu_percent": gpu_pct,
|
||||
"thermal": thermal,
|
||||
}
|
||||
|
||||
msg = String()
|
||||
msg.data = json.dumps(payload)
|
||||
self._pub.publish(msg)
|
||||
|
||||
# ── Public accessor ──────────────────────────────────────────────────
|
||||
|
||||
@property
|
||||
def prev_stat(self) -> Optional[List[List[int]]]:
|
||||
with self._lock:
|
||||
return self._prev_stat
|
||||
|
||||
|
||||
def main(args=None) -> None:
|
||||
rclpy.init(args=args)
|
||||
node = SysmonNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
@ -63,6 +63,8 @@ setup(
|
||||
'camera_hotplug_node = saltybot_social.camera_hotplug_node:main',
|
||||
# Trigger-based ROS2 bag recorder (Issue #332)
|
||||
'rosbag_recorder_node = saltybot_social.rosbag_recorder_node:main',
|
||||
# System resource monitor for Jetson Orin (Issue #355)
|
||||
'sysmon_node = saltybot_social.sysmon_node:main',
|
||||
],
|
||||
},
|
||||
)
|
||||
|
||||
791
jetson/ros2_ws/src/saltybot_social/test/test_sysmon.py
Normal file
791
jetson/ros2_ws/src/saltybot_social/test/test_sysmon.py
Normal file
@ -0,0 +1,791 @@
|
||||
"""test_sysmon.py — Offline tests for sysmon_node (Issue #355).
|
||||
|
||||
All tests run without ROS, /proc, or /sys via:
|
||||
- pure-function unit tests
|
||||
- ROS2 stub pattern (_make_node + injectable I/O functions)
|
||||
|
||||
Coverage
|
||||
────────
|
||||
parse_proc_stat — single / multi cpu / malformed
|
||||
cpu_percent_from_stats — busy, idle, edge cases
|
||||
parse_meminfo — standard / missing keys
|
||||
compute_ram_stats — normal / zero total
|
||||
read_disk_usage — statvfs mock / error path
|
||||
read_gpu_load — normal / per-mille / unavailable
|
||||
read_thermal_zones — normal / missing type file / bad temp
|
||||
SysmonNode init — parameter wiring
|
||||
SysmonNode._tick — full payload published, JSON valid
|
||||
SysmonNode CPU delta — prev_stat used correctly
|
||||
SysmonNode injectable I/O — all readers swappable
|
||||
source / entry-point — file exists, main importable
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import importlib
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import types
|
||||
import unittest
|
||||
from typing import Dict, List, Optional, Tuple
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
|
||||
# ── ROS2 / rclpy stub ───────────────────────────────────────────────────────────
|
||||
|
||||
def _make_rclpy_stub():
|
||||
rclpy = types.ModuleType("rclpy")
|
||||
rclpy_node = types.ModuleType("rclpy.node")
|
||||
rclpy_qos = types.ModuleType("rclpy.qos")
|
||||
std_msgs = types.ModuleType("std_msgs")
|
||||
std_msgs_msg = types.ModuleType("std_msgs.msg")
|
||||
|
||||
class _QoSProfile:
|
||||
def __init__(self, **kw): pass
|
||||
|
||||
class _String:
|
||||
def __init__(self): self.data = ""
|
||||
|
||||
class _Node:
|
||||
def __init__(self, name, **kw):
|
||||
if not hasattr(self, "_params"):
|
||||
self._params = {}
|
||||
if not hasattr(self, "_pubs"):
|
||||
self._pubs = {}
|
||||
self._timers = []
|
||||
self._logger = MagicMock()
|
||||
|
||||
def declare_parameter(self, name, default=None):
|
||||
if name not in self._params:
|
||||
self._params[name] = default
|
||||
|
||||
def get_parameter(self, name):
|
||||
m = MagicMock()
|
||||
m.value = self._params.get(name)
|
||||
return m
|
||||
|
||||
def create_publisher(self, msg_type, topic, qos):
|
||||
pub = MagicMock()
|
||||
pub.msgs = []
|
||||
pub.publish = lambda msg: pub.msgs.append(msg)
|
||||
self._pubs[topic] = pub
|
||||
return pub
|
||||
|
||||
def create_timer(self, period, cb):
|
||||
t = MagicMock()
|
||||
t._cb = cb
|
||||
self._timers.append(t)
|
||||
return t
|
||||
|
||||
def get_logger(self):
|
||||
return self._logger
|
||||
|
||||
def destroy_node(self): pass
|
||||
|
||||
rclpy_node.Node = _Node
|
||||
rclpy_qos.QoSProfile = _QoSProfile
|
||||
std_msgs_msg.String = _String
|
||||
|
||||
rclpy.init = lambda *a, **kw: None
|
||||
rclpy.spin = lambda n: None
|
||||
rclpy.shutdown = lambda: None
|
||||
|
||||
return rclpy, rclpy_node, rclpy_qos, std_msgs, std_msgs_msg
|
||||
|
||||
|
||||
def _load_mod():
|
||||
"""Load sysmon_node with ROS2 stubs, return the module."""
|
||||
rclpy, rclpy_node, rclpy_qos, std_msgs, std_msgs_msg = _make_rclpy_stub()
|
||||
sys.modules.setdefault("rclpy", rclpy)
|
||||
sys.modules.setdefault("rclpy.node", rclpy_node)
|
||||
sys.modules.setdefault("rclpy.qos", rclpy_qos)
|
||||
sys.modules.setdefault("std_msgs", std_msgs)
|
||||
sys.modules.setdefault("std_msgs.msg", std_msgs_msg)
|
||||
|
||||
mod_name = "saltybot_social.sysmon_node"
|
||||
if mod_name in sys.modules:
|
||||
del sys.modules[mod_name]
|
||||
mod = importlib.import_module(mod_name)
|
||||
return mod
|
||||
|
||||
|
||||
def _make_node(mod, **params) -> "mod.SysmonNode":
|
||||
"""Create a SysmonNode with default params overridden by *params*."""
|
||||
defaults = {
|
||||
"publish_rate": 1.0,
|
||||
"disk_path": "/",
|
||||
"gpu_sysfs_path": "/sys/devices/gpu.0/load",
|
||||
"thermal_glob": "/sys/devices/virtual/thermal/thermal_zone*/temp",
|
||||
"thermal_type_glob": "/sys/devices/virtual/thermal/thermal_zone*/type",
|
||||
"output_topic": "/saltybot/system_resources",
|
||||
}
|
||||
defaults.update(params)
|
||||
|
||||
node = mod.SysmonNode.__new__(mod.SysmonNode)
|
||||
node._params = defaults
|
||||
# Stub I/O before __init__ to avoid real /proc reads
|
||||
node._read_proc_stat = lambda: None
|
||||
mod.SysmonNode.__init__(node)
|
||||
return node
|
||||
|
||||
|
||||
# ── Sample /proc/stat content ────────────────────────────────────────────────────
|
||||
|
||||
_STAT_2CORE = """\
|
||||
cpu 100 0 50 850 0 0 0 0 0 0
|
||||
cpu0 60 0 30 410 0 0 0 0 0 0
|
||||
cpu1 40 0 20 440 0 0 0 0 0 0
|
||||
intr 12345 ...
|
||||
"""
|
||||
|
||||
_STAT_2CORE_V2 = """\
|
||||
cpu 250 0 100 1000 0 0 0 0 0 0
|
||||
cpu0 140 0 60 510 0 0 0 0 0 0
|
||||
cpu1 110 0 40 490 0 0 0 0 0 0
|
||||
intr 99999 ...
|
||||
"""
|
||||
|
||||
_MEMINFO = """\
|
||||
MemTotal: 16384000 kB
|
||||
MemFree: 4096000 kB
|
||||
MemAvailable: 8192000 kB
|
||||
Buffers: 512000 kB
|
||||
Cached: 2048000 kB
|
||||
"""
|
||||
|
||||
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
# parse_proc_stat
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
class TestParseProcStat(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.mod = _load_mod()
|
||||
|
||||
def test_aggregate_line_parsed(self):
|
||||
result = self.mod.parse_proc_stat(_STAT_2CORE)
|
||||
# index 0 = aggregate "cpu" line
|
||||
self.assertEqual(result[0], [100, 0, 50, 850, 0, 0, 0, 0])
|
||||
|
||||
def test_per_core_lines_parsed(self):
|
||||
result = self.mod.parse_proc_stat(_STAT_2CORE)
|
||||
self.assertEqual(len(result), 3) # agg + cpu0 + cpu1
|
||||
self.assertEqual(result[1], [60, 0, 30, 410, 0, 0, 0, 0])
|
||||
self.assertEqual(result[2], [40, 0, 20, 440, 0, 0, 0, 0])
|
||||
|
||||
def test_stops_at_non_cpu_line(self):
|
||||
result = self.mod.parse_proc_stat(_STAT_2CORE)
|
||||
# should not include "intr" line
|
||||
self.assertEqual(len(result), 3)
|
||||
|
||||
def test_short_fields_padded(self):
|
||||
content = "cpu 1 2 3\n"
|
||||
result = self.mod.parse_proc_stat(content)
|
||||
self.assertEqual(len(result[0]), 8)
|
||||
self.assertEqual(result[0][:3], [1, 2, 3])
|
||||
self.assertEqual(result[0][3:], [0, 0, 0, 0, 0])
|
||||
|
||||
def test_empty_content(self):
|
||||
result = self.mod.parse_proc_stat("")
|
||||
self.assertEqual(result, [])
|
||||
|
||||
def test_no_cpu_lines(self):
|
||||
result = self.mod.parse_proc_stat("intr 12345\nmem 0\n")
|
||||
self.assertEqual(result, [])
|
||||
|
||||
def test_single_cpu_no_cores(self):
|
||||
content = "cpu 200 0 100 700 0 0 0 0\n"
|
||||
result = self.mod.parse_proc_stat(content)
|
||||
self.assertEqual(len(result), 1)
|
||||
self.assertEqual(result[0][0], 200)
|
||||
|
||||
def test_extra_fields_truncated_to_8(self):
|
||||
content = "cpu 1 2 3 4 5 6 7 8 9 10\n"
|
||||
result = self.mod.parse_proc_stat(content)
|
||||
self.assertEqual(len(result[0]), 8)
|
||||
|
||||
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
# cpu_percent_from_stats
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
class TestCpuPercentFromStats(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.mod = _load_mod()
|
||||
|
||||
def _parse(self, content):
|
||||
return self.mod.parse_proc_stat(content)
|
||||
|
||||
def test_basic_cpu_usage(self):
|
||||
prev = self._parse(_STAT_2CORE)
|
||||
curr = self._parse(_STAT_2CORE_V2)
|
||||
pcts = self.mod.cpu_percent_from_stats(prev, curr)
|
||||
# aggregate: delta = 300, idle delta = -150 → 50%
|
||||
self.assertEqual(len(pcts), 3)
|
||||
self.assertGreater(pcts[0], 0)
|
||||
self.assertLessEqual(pcts[0], 100)
|
||||
|
||||
def test_all_idle(self):
|
||||
prev = [[0, 0, 0, 1000, 0, 0, 0, 0]]
|
||||
curr = [[0, 0, 0, 2000, 0, 0, 0, 0]]
|
||||
pcts = self.mod.cpu_percent_from_stats(prev, curr)
|
||||
self.assertEqual(pcts[0], 0.0)
|
||||
|
||||
def test_fully_busy(self):
|
||||
prev = [[0, 0, 0, 0, 0, 0, 0, 0]]
|
||||
curr = [[1000, 0, 0, 0, 0, 0, 0, 0]]
|
||||
pcts = self.mod.cpu_percent_from_stats(prev, curr)
|
||||
self.assertEqual(pcts[0], 100.0)
|
||||
|
||||
def test_empty_prev(self):
|
||||
curr = self._parse(_STAT_2CORE)
|
||||
result = self.mod.cpu_percent_from_stats([], curr)
|
||||
self.assertEqual(result, [])
|
||||
|
||||
def test_mismatched_length(self):
|
||||
prev = self._parse(_STAT_2CORE)
|
||||
curr = self._parse(_STAT_2CORE)[:1]
|
||||
result = self.mod.cpu_percent_from_stats(prev, curr)
|
||||
self.assertEqual(result, [])
|
||||
|
||||
def test_zero_delta_total(self):
|
||||
stat = [[100, 0, 50, 850, 0, 0, 0, 0]]
|
||||
pcts = self.mod.cpu_percent_from_stats(stat, stat)
|
||||
self.assertEqual(pcts[0], 0.0)
|
||||
|
||||
def test_values_clamped_0_to_100(self):
|
||||
# Simulate counter wrap-around or bogus data
|
||||
prev = [[9999, 0, 0, 0, 0, 0, 0, 0]]
|
||||
curr = [[0, 0, 0, 1000, 0, 0, 0, 0]]
|
||||
pcts = self.mod.cpu_percent_from_stats(prev, curr)
|
||||
self.assertGreaterEqual(pcts[0], 0.0)
|
||||
self.assertLessEqual(pcts[0], 100.0)
|
||||
|
||||
def test_multi_core_per_core_values(self):
|
||||
prev = self._parse(_STAT_2CORE)
|
||||
curr = self._parse(_STAT_2CORE_V2)
|
||||
pcts = self.mod.cpu_percent_from_stats(prev, curr)
|
||||
self.assertEqual(len(pcts), 3)
|
||||
for p in pcts:
|
||||
self.assertGreaterEqual(p, 0.0)
|
||||
self.assertLessEqual(p, 100.0)
|
||||
|
||||
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
# parse_meminfo
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
class TestParseMeminfo(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.mod = _load_mod()
|
||||
|
||||
def test_total_parsed(self):
|
||||
info = self.mod.parse_meminfo(_MEMINFO)
|
||||
self.assertEqual(info["MemTotal"], 16384000)
|
||||
|
||||
def test_available_parsed(self):
|
||||
info = self.mod.parse_meminfo(_MEMINFO)
|
||||
self.assertEqual(info["MemAvailable"], 8192000)
|
||||
|
||||
def test_empty_returns_empty(self):
|
||||
info = self.mod.parse_meminfo("")
|
||||
self.assertEqual(info, {})
|
||||
|
||||
def test_lines_without_colon_ignored(self):
|
||||
info = self.mod.parse_meminfo("NoColon 1234\nKey: 42 kB\n")
|
||||
self.assertNotIn("NoColon 1234", info)
|
||||
self.assertEqual(info["Key"], 42)
|
||||
|
||||
def test_malformed_value_skipped(self):
|
||||
info = self.mod.parse_meminfo("Bad: abc kB\nGood: 100 kB\n")
|
||||
self.assertNotIn("Bad", info)
|
||||
self.assertEqual(info["Good"], 100)
|
||||
|
||||
def test_multiple_keys(self):
|
||||
info = self.mod.parse_meminfo(_MEMINFO)
|
||||
self.assertIn("MemFree", info)
|
||||
self.assertIn("Buffers", info)
|
||||
self.assertIn("Cached", info)
|
||||
|
||||
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
# compute_ram_stats
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
class TestComputeRamStats(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.mod = _load_mod()
|
||||
|
||||
def test_uses_mem_available(self):
|
||||
info = self.mod.parse_meminfo(_MEMINFO)
|
||||
total, used, pct = self.mod.compute_ram_stats(info)
|
||||
# total = 16384000 kB = 16000 MB
|
||||
self.assertAlmostEqual(total, 16000.0, delta=1)
|
||||
# used = total - available = 16384000 - 8192000 = 8192000 kB = 8000 MB
|
||||
self.assertAlmostEqual(used, 8000.0, delta=1)
|
||||
|
||||
def test_fallback_to_mem_free(self):
|
||||
info = {"MemTotal": 1024, "MemFree": 512}
|
||||
total, used, pct = self.mod.compute_ram_stats(info)
|
||||
self.assertAlmostEqual(used, 0.5, delta=0.01) # 512 kB = 0.5 MB
|
||||
|
||||
def test_zero_total(self):
|
||||
total, used, pct = self.mod.compute_ram_stats({})
|
||||
self.assertEqual(total, 0.0)
|
||||
self.assertEqual(pct, 0.0)
|
||||
|
||||
def test_percent_correct(self):
|
||||
info = {"MemTotal": 1000, "MemAvailable": 750}
|
||||
_, _, pct = self.mod.compute_ram_stats(info)
|
||||
self.assertAlmostEqual(pct, 25.0, delta=0.1)
|
||||
|
||||
def test_fully_used(self):
|
||||
info = {"MemTotal": 1000, "MemAvailable": 0}
|
||||
_, _, pct = self.mod.compute_ram_stats(info)
|
||||
self.assertEqual(pct, 100.0)
|
||||
|
||||
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
# read_disk_usage
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
class TestReadDiskUsage(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.mod = _load_mod()
|
||||
|
||||
def test_root_filesystem_succeeds(self):
|
||||
"""On any real POSIX host / should be readable."""
|
||||
total, used, pct = self.mod.read_disk_usage("/")
|
||||
self.assertGreater(total, 0.0)
|
||||
self.assertGreaterEqual(used, 0.0)
|
||||
self.assertGreaterEqual(pct, 0.0)
|
||||
self.assertLessEqual(pct, 100.0)
|
||||
|
||||
def test_nonexistent_path_returns_minus_one(self):
|
||||
total, used, pct = self.mod.read_disk_usage("/nonexistent_xyz_12345")
|
||||
self.assertEqual(total, -1.0)
|
||||
self.assertEqual(used, -1.0)
|
||||
self.assertEqual(pct, -1.0)
|
||||
|
||||
def test_statvfs_mock(self):
|
||||
fake = MagicMock()
|
||||
fake.f_frsize = 4096
|
||||
fake.f_blocks = 1000 # total = 4096000 bytes ~ 3.8 MB
|
||||
fake.f_bavail = 250 # free = 1024000 bytes, used = 3072000
|
||||
with patch("os.statvfs", return_value=fake):
|
||||
total, used, pct = self.mod.read_disk_usage("/any")
|
||||
total_b = 4096 * 1000
|
||||
used_b = 4096 * 750
|
||||
self.assertAlmostEqual(total, total_b / (1024**3), delta=0.001)
|
||||
self.assertAlmostEqual(used, used_b / (1024**3), delta=0.001)
|
||||
self.assertAlmostEqual(pct, 75.0, delta=0.1)
|
||||
|
||||
def test_zero_blocks_returns_zero_percent(self):
|
||||
fake = MagicMock()
|
||||
fake.f_frsize = 4096
|
||||
fake.f_blocks = 0
|
||||
fake.f_bavail = 0
|
||||
with patch("os.statvfs", return_value=fake):
|
||||
total, used, pct = self.mod.read_disk_usage("/any")
|
||||
self.assertEqual(pct, 0.0)
|
||||
|
||||
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
# read_gpu_load
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
class TestReadGpuLoad(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.mod = _load_mod()
|
||||
|
||||
def _make_tmpfile(self, content: str) -> str:
|
||||
import tempfile
|
||||
f = tempfile.NamedTemporaryFile("w", delete=False, suffix=".txt")
|
||||
f.write(content)
|
||||
f.flush()
|
||||
f.close()
|
||||
return f.name
|
||||
|
||||
def test_normal_percent(self):
|
||||
p = self._make_tmpfile("42\n")
|
||||
try:
|
||||
self.assertEqual(self.mod.read_gpu_load(p), 42.0)
|
||||
finally:
|
||||
os.unlink(p)
|
||||
|
||||
def test_per_mille_converted(self):
|
||||
p = self._make_tmpfile("750\n")
|
||||
try:
|
||||
val = self.mod.read_gpu_load(p)
|
||||
self.assertEqual(val, 75.0)
|
||||
finally:
|
||||
os.unlink(p)
|
||||
|
||||
def test_missing_file_returns_minus_one(self):
|
||||
val = self.mod.read_gpu_load("/nonexistent_gpu_sysfs_xyz")
|
||||
self.assertEqual(val, -1.0)
|
||||
|
||||
def test_non_numeric_returns_minus_one(self):
|
||||
p = self._make_tmpfile("N/A\n")
|
||||
try:
|
||||
val = self.mod.read_gpu_load(p)
|
||||
self.assertEqual(val, -1.0)
|
||||
finally:
|
||||
os.unlink(p)
|
||||
|
||||
def test_clamped_to_100(self):
|
||||
p = self._make_tmpfile("1001\n") # > 1000 → not per-mille → clamp
|
||||
try:
|
||||
val = self.mod.read_gpu_load(p)
|
||||
self.assertLessEqual(val, 100.0)
|
||||
finally:
|
||||
os.unlink(p)
|
||||
|
||||
def test_zero(self):
|
||||
p = self._make_tmpfile("0\n")
|
||||
try:
|
||||
self.assertEqual(self.mod.read_gpu_load(p), 0.0)
|
||||
finally:
|
||||
os.unlink(p)
|
||||
|
||||
def test_100_percent(self):
|
||||
p = self._make_tmpfile("100\n")
|
||||
try:
|
||||
self.assertEqual(self.mod.read_gpu_load(p), 100.0)
|
||||
finally:
|
||||
os.unlink(p)
|
||||
|
||||
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
# read_thermal_zones
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
class TestReadThermalZones(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.mod = _load_mod()
|
||||
import tempfile
|
||||
self._tmpdir = tempfile.mkdtemp()
|
||||
|
||||
def tearDown(self):
|
||||
import shutil
|
||||
shutil.rmtree(self._tmpdir, ignore_errors=True)
|
||||
|
||||
def _make_zone(self, name: str, zone_id: int, milli_c: int) -> str:
|
||||
zone_dir = os.path.join(self._tmpdir, f"thermal_zone{zone_id}")
|
||||
os.makedirs(zone_dir, exist_ok=True)
|
||||
with open(os.path.join(zone_dir, "temp"), "w") as f:
|
||||
f.write(f"{milli_c}\n")
|
||||
with open(os.path.join(zone_dir, "type"), "w") as f:
|
||||
f.write(f"{name}\n")
|
||||
return zone_dir
|
||||
|
||||
def test_reads_zones_with_type(self):
|
||||
self._make_zone("CPU-therm", 0, 47500)
|
||||
self._make_zone("GPU-therm", 1, 43200)
|
||||
temp_glob = os.path.join(self._tmpdir, "thermal_zone*/temp")
|
||||
type_glob = os.path.join(self._tmpdir, "thermal_zone*/type")
|
||||
result = self.mod.read_thermal_zones(temp_glob, type_glob)
|
||||
self.assertAlmostEqual(result["CPU-therm"], 47.5, delta=0.1)
|
||||
self.assertAlmostEqual(result["GPU-therm"], 43.2, delta=0.1)
|
||||
|
||||
def test_fallback_to_zone_index_when_no_type(self):
|
||||
zone_dir = os.path.join(self._tmpdir, "thermal_zone0")
|
||||
os.makedirs(zone_dir, exist_ok=True)
|
||||
with open(os.path.join(zone_dir, "temp"), "w") as f:
|
||||
f.write("35000\n")
|
||||
# No type file
|
||||
temp_glob = os.path.join(self._tmpdir, "thermal_zone*/temp")
|
||||
type_glob = os.path.join(self._tmpdir, "thermal_zone*/type")
|
||||
result = self.mod.read_thermal_zones(temp_glob, type_glob)
|
||||
self.assertIn("zone0", result)
|
||||
self.assertAlmostEqual(result["zone0"], 35.0, delta=0.1)
|
||||
|
||||
def test_empty_glob_returns_empty(self):
|
||||
result = self.mod.read_thermal_zones(
|
||||
"/nonexistent_path/*/temp",
|
||||
"/nonexistent_path/*/type",
|
||||
)
|
||||
self.assertEqual(result, {})
|
||||
|
||||
def test_bad_temp_file_skipped(self):
|
||||
zone_dir = os.path.join(self._tmpdir, "thermal_zone0")
|
||||
os.makedirs(zone_dir, exist_ok=True)
|
||||
with open(os.path.join(zone_dir, "temp"), "w") as f:
|
||||
f.write("INVALID\n")
|
||||
with open(os.path.join(zone_dir, "type"), "w") as f:
|
||||
f.write("CPU-therm\n")
|
||||
temp_glob = os.path.join(self._tmpdir, "thermal_zone*/temp")
|
||||
type_glob = os.path.join(self._tmpdir, "thermal_zone*/type")
|
||||
result = self.mod.read_thermal_zones(temp_glob, type_glob)
|
||||
self.assertEqual(result, {})
|
||||
|
||||
def test_temperature_conversion(self):
|
||||
self._make_zone("SOC-therm", 0, 55000)
|
||||
temp_glob = os.path.join(self._tmpdir, "thermal_zone*/temp")
|
||||
type_glob = os.path.join(self._tmpdir, "thermal_zone*/type")
|
||||
result = self.mod.read_thermal_zones(temp_glob, type_glob)
|
||||
self.assertAlmostEqual(result["SOC-therm"], 55.0, delta=0.1)
|
||||
|
||||
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
# SysmonNode init
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
class TestSysmonNodeInit(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.mod = _load_mod()
|
||||
|
||||
def test_node_creates_publisher(self):
|
||||
node = _make_node(self.mod)
|
||||
self.assertIn("/saltybot/system_resources", node._pubs)
|
||||
|
||||
def test_node_creates_timer(self):
|
||||
node = _make_node(self.mod)
|
||||
self.assertEqual(len(node._timers), 1)
|
||||
|
||||
def test_custom_output_topic(self):
|
||||
node = _make_node(self.mod, output_topic="/custom/resources")
|
||||
self.assertIn("/custom/resources", node._pubs)
|
||||
|
||||
def test_timer_period_from_rate(self):
|
||||
# Rate=2 Hz → period=0.5s — timer is created; period verified via mock
|
||||
node = _make_node(self.mod, publish_rate=2.0)
|
||||
self.assertEqual(len(node._timers), 1)
|
||||
|
||||
def test_injectable_read_proc_stat(self):
|
||||
node = _make_node(self.mod)
|
||||
called = []
|
||||
node._read_proc_stat = lambda: (called.append(1) or None)
|
||||
node._tick()
|
||||
self.assertTrue(len(called) >= 1)
|
||||
|
||||
def test_injectable_read_meminfo(self):
|
||||
node = _make_node(self.mod)
|
||||
node._read_meminfo = lambda: _MEMINFO
|
||||
node._read_disk_usage = lambda p: (1.0, 0.5, 50.0)
|
||||
node._read_gpu_load = lambda p: 0.0
|
||||
node._read_thermal = lambda g, t: {}
|
||||
node._tick() # should not raise
|
||||
|
||||
def test_prev_stat_initialised(self):
|
||||
node = _make_node(self.mod)
|
||||
# _read_proc_stat was stubbed to return None during _make_node
|
||||
# so prev_stat may be None — just confirm the property exists
|
||||
_ = node.prev_stat
|
||||
|
||||
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
# SysmonNode._tick — JSON payload
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
class TestSysmonNodeTick(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.mod = _load_mod()
|
||||
|
||||
def _wired_node(self, **kwargs):
|
||||
"""Return a node with fully stubbed I/O."""
|
||||
stat_v1 = self.mod.parse_proc_stat(_STAT_2CORE)
|
||||
stat_v2 = self.mod.parse_proc_stat(_STAT_2CORE_V2)
|
||||
call_count = [0]
|
||||
|
||||
def fake_stat():
|
||||
call_count[0] += 1
|
||||
return stat_v2 if call_count[0] > 1 else stat_v1
|
||||
|
||||
node = _make_node(self.mod, **kwargs)
|
||||
node._read_proc_stat = fake_stat
|
||||
node._prev_stat = stat_v1
|
||||
node._read_meminfo = lambda: _MEMINFO
|
||||
node._read_disk_usage = lambda p: (64.0, 12.5, 19.5)
|
||||
node._read_gpu_load = lambda p: 42.0
|
||||
node._read_thermal = lambda g, t: {"CPU-therm": 47.5, "GPU-therm": 43.2}
|
||||
return node
|
||||
|
||||
def _get_payload(self, node) -> dict:
|
||||
node._tick()
|
||||
pub = node._pubs["/saltybot/system_resources"]
|
||||
self.assertTrue(len(pub.msgs) >= 1)
|
||||
return json.loads(pub.msgs[-1].data)
|
||||
|
||||
def test_payload_is_valid_json(self):
|
||||
node = self._wired_node()
|
||||
node._tick()
|
||||
pub = node._pubs["/saltybot/system_resources"]
|
||||
msg = pub.msgs[-1]
|
||||
data = json.loads(msg.data)
|
||||
self.assertIsInstance(data, dict)
|
||||
|
||||
def test_payload_has_required_keys(self):
|
||||
node = self._wired_node()
|
||||
payload = self._get_payload(node)
|
||||
for key in (
|
||||
"ts", "cpu_percent", "cpu_avg_percent",
|
||||
"ram_total_mb", "ram_used_mb", "ram_percent",
|
||||
"disk_total_gb", "disk_used_gb", "disk_percent",
|
||||
"gpu_percent", "thermal",
|
||||
):
|
||||
self.assertIn(key, payload, f"Missing key: {key}")
|
||||
|
||||
def test_ts_is_float(self):
|
||||
node = self._wired_node()
|
||||
payload = self._get_payload(node)
|
||||
self.assertIsInstance(payload["ts"], float)
|
||||
|
||||
def test_cpu_percent_is_list(self):
|
||||
node = self._wired_node()
|
||||
payload = self._get_payload(node)
|
||||
self.assertIsInstance(payload["cpu_percent"], list)
|
||||
|
||||
def test_gpu_percent_forwarded(self):
|
||||
node = self._wired_node()
|
||||
payload = self._get_payload(node)
|
||||
self.assertAlmostEqual(payload["gpu_percent"], 42.0, delta=0.1)
|
||||
|
||||
def test_disk_stats_forwarded(self):
|
||||
node = self._wired_node()
|
||||
payload = self._get_payload(node)
|
||||
self.assertAlmostEqual(payload["disk_total_gb"], 64.0, delta=0.1)
|
||||
self.assertAlmostEqual(payload["disk_used_gb"], 12.5, delta=0.1)
|
||||
self.assertAlmostEqual(payload["disk_percent"], 19.5, delta=0.1)
|
||||
|
||||
def test_ram_stats_forwarded(self):
|
||||
node = self._wired_node()
|
||||
payload = self._get_payload(node)
|
||||
self.assertGreater(payload["ram_total_mb"], 0)
|
||||
self.assertGreaterEqual(payload["ram_used_mb"], 0)
|
||||
|
||||
def test_thermal_is_dict(self):
|
||||
node = self._wired_node()
|
||||
payload = self._get_payload(node)
|
||||
self.assertIsInstance(payload["thermal"], dict)
|
||||
self.assertIn("CPU-therm", payload["thermal"])
|
||||
|
||||
def test_cpu_avg_matches_per_core(self):
|
||||
node = self._wired_node()
|
||||
payload = self._get_payload(node)
|
||||
per_core = payload["cpu_percent"][1:]
|
||||
if per_core:
|
||||
expected_avg = sum(per_core) / len(per_core)
|
||||
self.assertAlmostEqual(payload["cpu_avg_percent"], expected_avg, delta=0.1)
|
||||
|
||||
def test_publishes_once_per_tick(self):
|
||||
node = self._wired_node()
|
||||
node._tick()
|
||||
node._tick()
|
||||
pub = node._pubs["/saltybot/system_resources"]
|
||||
self.assertEqual(len(pub.msgs), 2)
|
||||
|
||||
def test_no_prev_stat_gives_empty_cpu(self):
|
||||
node = self._wired_node()
|
||||
node._prev_stat = None
|
||||
node._read_proc_stat = lambda: None
|
||||
payload = self._get_payload(node)
|
||||
self.assertEqual(payload["cpu_percent"], [])
|
||||
self.assertEqual(payload["cpu_avg_percent"], 0.0)
|
||||
|
||||
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
# CPU delta tracking across ticks
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
class TestSysmonCpuDelta(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.mod = _load_mod()
|
||||
|
||||
def test_prev_stat_updated_after_tick(self):
|
||||
stat_v1 = self.mod.parse_proc_stat(_STAT_2CORE)
|
||||
stat_v2 = self.mod.parse_proc_stat(_STAT_2CORE_V2)
|
||||
calls = [0]
|
||||
|
||||
def fake_stat():
|
||||
calls[0] += 1
|
||||
return stat_v2
|
||||
|
||||
node = _make_node(self.mod)
|
||||
node._prev_stat = stat_v1
|
||||
node._read_proc_stat = fake_stat
|
||||
node._read_meminfo = lambda: ""
|
||||
node._read_disk_usage = lambda p: (1.0, 0.5, 50.0)
|
||||
node._read_gpu_load = lambda p: 0.0
|
||||
node._read_thermal = lambda g, t: {}
|
||||
|
||||
node._tick()
|
||||
self.assertEqual(node.prev_stat, stat_v2)
|
||||
|
||||
def test_second_tick_uses_updated_prev(self):
|
||||
stat_v1 = self.mod.parse_proc_stat(_STAT_2CORE)
|
||||
stat_v2 = self.mod.parse_proc_stat(_STAT_2CORE_V2)
|
||||
seq = [stat_v1, stat_v2, stat_v1]
|
||||
idx = [0]
|
||||
|
||||
def fake_stat():
|
||||
v = seq[idx[0] % len(seq)]
|
||||
idx[0] += 1
|
||||
return v
|
||||
|
||||
node = _make_node(self.mod)
|
||||
node._prev_stat = None
|
||||
node._read_proc_stat = fake_stat
|
||||
node._read_meminfo = lambda: ""
|
||||
node._read_disk_usage = lambda p: (1.0, 0.5, 50.0)
|
||||
node._read_gpu_load = lambda p: 0.0
|
||||
node._read_thermal = lambda g, t: {}
|
||||
|
||||
node._tick()
|
||||
node._tick()
|
||||
pub = node._pubs["/saltybot/system_resources"]
|
||||
self.assertEqual(len(pub.msgs), 2)
|
||||
|
||||
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
# Source / entry-point sanity
|
||||
# ══════════════════════════════════════════════════════════════════════════════
|
||||
|
||||
class TestSysmonSource(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.mod = _load_mod()
|
||||
|
||||
def test_file_exists(self):
|
||||
src = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"..", "saltybot_social", "sysmon_node.py"
|
||||
)
|
||||
self.assertTrue(os.path.isfile(os.path.normpath(src)))
|
||||
|
||||
def test_main_callable(self):
|
||||
self.assertTrue(callable(self.mod.main))
|
||||
|
||||
def test_issue_tag_in_source(self):
|
||||
src = os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"..", "saltybot_social", "sysmon_node.py"
|
||||
)
|
||||
with open(src) as fh:
|
||||
content = fh.read()
|
||||
self.assertIn("355", content)
|
||||
|
||||
def test_status_constants_present(self):
|
||||
# Confirm key pure functions exported
|
||||
self.assertTrue(callable(self.mod.parse_proc_stat))
|
||||
self.assertTrue(callable(self.mod.cpu_percent_from_stats))
|
||||
self.assertTrue(callable(self.mod.parse_meminfo))
|
||||
self.assertTrue(callable(self.mod.compute_ram_stats))
|
||||
self.assertTrue(callable(self.mod.read_disk_usage))
|
||||
self.assertTrue(callable(self.mod.read_gpu_load))
|
||||
self.assertTrue(callable(self.mod.read_thermal_zones))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Loading…
x
Reference in New Issue
Block a user