"""test_thermal.py -- Unit tests for Issue #205 Jetson thermal monitor.""" from __future__ import annotations import json, os, time import pytest def _pkg_root(): return os.path.dirname(os.path.dirname(os.path.abspath(__file__))) def _read_src(rel_path): with open(os.path.join(_pkg_root(), rel_path)) as f: return f.read() # ── Import the sysfs reader (no ROS required) ───────────────────────────────── def _import_reader(): import importlib.util, sys, types # Build minimal ROS2 stubs so thermal_node.py imports without a ROS install def _stub(name): m = types.ModuleType(name) sys.modules[name] = m return m rclpy_mod = _stub("rclpy") rclpy_node_mod = _stub("rclpy.node") rclpy_qos_mod = _stub("rclpy.qos") std_msgs_mod = _stub("std_msgs") std_msg_mod = _stub("std_msgs.msg") class _Node: def __init__(self, *a, **kw): pass def declare_parameter(self, *a, **kw): pass def get_parameter(self, name): class _P: value = None return _P() def create_publisher(self, *a, **kw): return None def create_timer(self, *a, **kw): return None def get_logger(self): class _L: def info(self, *a): pass def warn(self, *a): pass def error(self, *a): pass return _L() def destroy_node(self): pass class _QoSProfile: def __init__(self, **kw): pass class _String: data = "" rclpy_node_mod.Node = _Node rclpy_qos_mod.QoSProfile = _QoSProfile std_msg_mod.String = _String rclpy_mod.init = lambda *a, **kw: None rclpy_mod.spin = lambda node: None rclpy_mod.ok = lambda: True rclpy_mod.shutdown = lambda: None spec = importlib.util.spec_from_file_location( "thermal_node_testmod", os.path.join(_pkg_root(), "saltybot_thermal", "thermal_node.py"), ) mod = importlib.util.module_from_spec(spec) spec.loader.exec_module(mod) return mod # ── Sysfs fixture helpers ───────────────────────────────────────────────────── def _make_zone(root, idx, zone_type, temp_mc): """Create a fake thermal_zone directory under root.""" zdir = os.path.join(str(root), "thermal_zone{}".format(idx)) os.makedirs(zdir, exist_ok=True) with open(os.path.join(zdir, "type"), "w") as f: f.write(zone_type + "\n") with open(os.path.join(zdir, "temp"), "w") as f: f.write(str(temp_mc) + "\n") # ── read_thermal_zones ──────────────────────────────────────────────────────── class TestReadThermalZones: @pytest.fixture(scope="class") def mod(self): return _import_reader() def test_empty_dir(self, mod, tmp_path): assert mod.read_thermal_zones(str(tmp_path)) == [] def test_missing_dir(self, mod): assert mod.read_thermal_zones("/nonexistent/path/xyz") == [] def test_single_zone(self, mod, tmp_path): _make_zone(tmp_path, 0, "CPU-therm", 45000) zones = mod.read_thermal_zones(str(tmp_path)) assert len(zones) == 1 assert zones[0]["zone"] == "CPU-therm" assert zones[0]["temp_c"] == 45.0 assert zones[0]["index"] == 0 def test_temp_millidegrees_conversion(self, mod, tmp_path): _make_zone(tmp_path, 0, "GPU-therm", 72500) zones = mod.read_thermal_zones(str(tmp_path)) assert zones[0]["temp_c"] == 72.5 def test_multiple_zones(self, mod, tmp_path): _make_zone(tmp_path, 0, "CPU-therm", 40000) _make_zone(tmp_path, 1, "GPU-therm", 55000) _make_zone(tmp_path, 2, "PMIC-Die", 38000) zones = mod.read_thermal_zones(str(tmp_path)) assert len(zones) == 3 def test_sorted_by_index(self, mod, tmp_path): _make_zone(tmp_path, 2, "Z2", 20000) _make_zone(tmp_path, 0, "Z0", 10000) _make_zone(tmp_path, 1, "Z1", 15000) zones = mod.read_thermal_zones(str(tmp_path)) indices = [z["index"] for z in zones] assert indices == sorted(indices) def test_skips_non_zone_entries(self, mod, tmp_path): os.makedirs(os.path.join(str(tmp_path), "cooling_device0")) _make_zone(tmp_path, 0, "CPU-therm", 40000) zones = mod.read_thermal_zones(str(tmp_path)) assert len(zones) == 1 def test_skips_zone_without_temp(self, mod, tmp_path): zdir = os.path.join(str(tmp_path), "thermal_zone0") os.makedirs(zdir) with open(os.path.join(zdir, "type"), "w") as f: f.write("CPU-therm\n") # No temp file — should be skipped zones = mod.read_thermal_zones(str(tmp_path)) assert zones == [] def test_zone_type_fallback(self, mod, tmp_path): """Zone without type file falls back to directory name.""" zdir = os.path.join(str(tmp_path), "thermal_zone0") os.makedirs(zdir) with open(os.path.join(zdir, "temp"), "w") as f: f.write("40000\n") zones = mod.read_thermal_zones(str(tmp_path)) assert len(zones) == 1 assert zones[0]["zone"] == "thermal_zone0" def test_temp_rounding(self, mod, tmp_path): _make_zone(tmp_path, 0, "CPU-therm", 72333) zones = mod.read_thermal_zones(str(tmp_path)) assert zones[0]["temp_c"] == 72.3 # ── Threshold logic (pure Python) ──────────────────────────────────────────── class TestThresholds: def _classify(self, temp_c, warn_t=75.0, throttle_t=85.0): throttled = temp_c >= throttle_t warn = temp_c >= warn_t return throttled, warn def test_normal(self): t, w = self._classify(50.0) assert not t and not w def test_warn_boundary(self): t, w = self._classify(75.0) assert not t and w def test_below_warn(self): t, w = self._classify(74.9) assert not t and not w def test_throttle_boundary(self): t, w = self._classify(85.0) assert t and w def test_above_throttle(self): t, w = self._classify(90.0) assert t and w def test_custom_thresholds(self): t, w = self._classify(70.0, warn_t=70.0, throttle_t=80.0) assert not t and w def test_max_temp_drives_status(self): zones = [{"temp_c": 40.0}, {"temp_c": 86.0}, {"temp_c": 55.0}] max_t = max(z["temp_c"] for z in zones) assert max_t == 86.0 t, w = self._classify(max_t) assert t and w # ── JSON payload schema ─────────────────────────────────────────────────────── class TestJsonPayload: def _make_payload(self, zones, warn_t=75.0, throttle_t=85.0): max_temp = max(z["temp_c"] for z in zones) if zones else 0.0 return { "ts": time.time(), "zones": zones, "max_temp_c": max_temp, "throttled": max_temp >= throttle_t, "warn": max_temp >= warn_t, } def test_has_ts(self): p = self._make_payload([{"zone": "CPU", "index": 0, "temp_c": 40.0}]) assert "ts" in p and isinstance(p["ts"], float) def test_has_zones(self): p = self._make_payload([{"zone": "CPU", "index": 0, "temp_c": 40.0}]) assert "zones" in p and len(p["zones"]) == 1 def test_has_max_temp(self): p = self._make_payload([{"zone": "CPU", "index": 0, "temp_c": 55.0}]) assert p["max_temp_c"] == 55.0 def test_throttled_false_below(self): p = self._make_payload([{"zone": "CPU", "index": 0, "temp_c": 60.0}]) assert p["throttled"] is False def test_warn_true_at_threshold(self): p = self._make_payload([{"zone": "CPU", "index": 0, "temp_c": 75.0}]) assert p["warn"] is True and p["throttled"] is False def test_throttled_true_above(self): p = self._make_payload([{"zone": "CPU", "index": 0, "temp_c": 90.0}]) assert p["throttled"] is True def test_json_serializable(self): zones = [{"zone": "CPU", "index": 0, "temp_c": 50.0}] p = self._make_payload(zones) blob = json.dumps(p) parsed = json.loads(blob) assert parsed["max_temp_c"] == 50.0 def test_multi_zone_max(self): zones = [ {"zone": "CPU-therm", "index": 0, "temp_c": 55.0}, {"zone": "GPU-therm", "index": 1, "temp_c": 78.0}, {"zone": "PMIC-Die", "index": 2, "temp_c": 38.0}, ] p = self._make_payload(zones) assert p["max_temp_c"] == 78.0 assert p["warn"] is True assert p["throttled"] is False # ── Node source checks ──────────────────────────────────────────────────────── class TestNodeSrc: @pytest.fixture(scope="class") def src(self): return _read_src("saltybot_thermal/thermal_node.py") def test_class_defined(self, src): assert "class ThermalNode" in src def test_publish_rate_param(self, src): assert '"publish_rate_hz"' in src def test_warn_param(self, src): assert '"warn_temp_c"' in src def test_throttle_param(self, src): assert '"throttle_temp_c"' in src def test_thermal_root_param(self, src): assert '"thermal_root"' in src def test_topic(self, src): assert '"/saltybot/thermal"' in src def test_read_fn(self, src): assert "read_thermal_zones" in src def test_warn_log(self, src): assert "warn" in src.lower() def test_error_log(self, src): assert "error" in src.lower() def test_throttled_flag(self, src): assert '"throttled"' in src def test_warn_flag(self, src): assert '"warn"' in src def test_max_temp(self, src): assert '"max_temp_c"' in src def test_millidegrees(self, src): assert "1000" in src def test_json_dumps(self, src): assert "json.dumps" in src def test_issue_tag(self, src): assert "205" in src def test_main(self, src): assert "def main" in src def test_sysfs_path(self, src): assert "/sys/class/thermal" in src # ── Package metadata ────────────────────────────────────────────────────────── class TestPackageMeta: @pytest.fixture(scope="class") def pkg_xml(self): return _read_src("package.xml") @pytest.fixture(scope="class") def setup_py(self): return _read_src("setup.py") @pytest.fixture(scope="class") def cfg(self): return _read_src("config/thermal_params.yaml") def test_pkg_name(self, pkg_xml): assert "saltybot_thermal" in pkg_xml def test_issue_tag(self, pkg_xml): assert "205" in pkg_xml def test_entry_point(self, setup_py): assert "thermal_node = saltybot_thermal.thermal_node:main" in setup_py def test_cfg_node_name(self, cfg): assert "thermal_node:" in cfg def test_cfg_warn(self, cfg): assert "warn_temp_c" in cfg def test_cfg_throttle(self, cfg): assert "throttle_temp_c" in cfg def test_cfg_rate(self, cfg): assert "publish_rate_hz" in cfg def test_cfg_defaults(self, cfg): assert "75.0" in cfg and "85.0" in cfg and "1.0" in cfg