Merge pull request 'feat(jetson): CPU/GPU thermal monitor — sysfs + /saltybot/thermal JSON (Issue #205)' (#209) from sl-jetson/issue-205-thermal into main
This commit is contained in:
commit
03e7995e66
@ -0,0 +1,6 @@
|
||||
thermal_node:
|
||||
ros__parameters:
|
||||
publish_rate_hz: 1.0 # Hz — publish rate for /saltybot/thermal
|
||||
warn_temp_c: 75.0 # Log WARN above this temperature (°C)
|
||||
throttle_temp_c: 85.0 # Log ERROR + set throttled=true above this (°C)
|
||||
thermal_root: "/sys/class/thermal" # Sysfs thermal root; override for tests
|
||||
42
jetson/ros2_ws/src/saltybot_thermal/launch/thermal.launch.py
Normal file
42
jetson/ros2_ws/src/saltybot_thermal/launch/thermal.launch.py
Normal file
@ -0,0 +1,42 @@
|
||||
"""thermal.launch.py — Launch the Jetson thermal monitor (Issue #205).
|
||||
|
||||
Usage:
|
||||
ros2 launch saltybot_thermal thermal.launch.py
|
||||
ros2 launch saltybot_thermal thermal.launch.py warn_temp_c:=70.0
|
||||
"""
|
||||
|
||||
import os
|
||||
from ament_index_python.packages import get_package_share_directory
|
||||
from launch import LaunchDescription
|
||||
from launch.actions import DeclareLaunchArgument
|
||||
from launch.substitutions import LaunchConfiguration
|
||||
from launch_ros.actions import Node
|
||||
|
||||
|
||||
def generate_launch_description():
|
||||
pkg = get_package_share_directory("saltybot_thermal")
|
||||
cfg = os.path.join(pkg, "config", "thermal_params.yaml")
|
||||
|
||||
return LaunchDescription([
|
||||
DeclareLaunchArgument("publish_rate_hz", default_value="1.0",
|
||||
description="Publish rate (Hz)"),
|
||||
DeclareLaunchArgument("warn_temp_c", default_value="75.0",
|
||||
description="WARN threshold (°C)"),
|
||||
DeclareLaunchArgument("throttle_temp_c", default_value="85.0",
|
||||
description="THROTTLE threshold (°C)"),
|
||||
|
||||
Node(
|
||||
package="saltybot_thermal",
|
||||
executable="thermal_node",
|
||||
name="thermal_node",
|
||||
output="screen",
|
||||
parameters=[
|
||||
cfg,
|
||||
{
|
||||
"publish_rate_hz": LaunchConfiguration("publish_rate_hz"),
|
||||
"warn_temp_c": LaunchConfiguration("warn_temp_c"),
|
||||
"throttle_temp_c": LaunchConfiguration("throttle_temp_c"),
|
||||
},
|
||||
],
|
||||
),
|
||||
])
|
||||
26
jetson/ros2_ws/src/saltybot_thermal/package.xml
Normal file
26
jetson/ros2_ws/src/saltybot_thermal/package.xml
Normal file
@ -0,0 +1,26 @@
|
||||
<?xml version="1.0"?>
|
||||
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||
<package format="3">
|
||||
<name>saltybot_thermal</name>
|
||||
<version>0.1.0</version>
|
||||
<description>
|
||||
Jetson thermal monitor (Issue #205). Reads /sys/class/thermal/thermal_zone*,
|
||||
publishes /saltybot/thermal JSON at 1 Hz, warns at 75 °C, throttles at 85 °C.
|
||||
</description>
|
||||
<maintainer email="sl-jetson@saltylab.local">sl-jetson</maintainer>
|
||||
<license>MIT</license>
|
||||
|
||||
<depend>rclpy</depend>
|
||||
<depend>std_msgs</depend>
|
||||
|
||||
<buildtool_depend>ament_python</buildtool_depend>
|
||||
|
||||
<test_depend>ament_copyright</test_depend>
|
||||
<test_depend>ament_flake8</test_depend>
|
||||
<test_depend>ament_pep257</test_depend>
|
||||
<test_depend>python3-pytest</test_depend>
|
||||
|
||||
<export>
|
||||
<build_type>ament_python</build_type>
|
||||
</export>
|
||||
</package>
|
||||
@ -0,0 +1,139 @@
|
||||
"""thermal_node.py — Jetson CPU/GPU thermal monitor.
|
||||
Issue #205
|
||||
|
||||
Reads every /sys/class/thermal/thermal_zone* sysfs entry, publishes a JSON
|
||||
blob on /saltybot/thermal at a configurable rate (default 1 Hz), and logs
|
||||
ROS2 WARN / ERROR when zone temperatures exceed configurable thresholds.
|
||||
|
||||
Published topic:
|
||||
/saltybot/thermal (std_msgs/String, JSON)
|
||||
|
||||
JSON schema:
|
||||
{
|
||||
"ts": <float unix seconds>,
|
||||
"zones": [
|
||||
{"zone": "CPU-therm", "index": 0, "temp_c": 42.5},
|
||||
...
|
||||
],
|
||||
"max_temp_c": 55.0,
|
||||
"throttled": false,
|
||||
"warn": false
|
||||
}
|
||||
|
||||
Parameters:
|
||||
publish_rate_hz (float, 1.0) — publish rate
|
||||
warn_temp_c (float, 75.0) — log WARN above this temperature
|
||||
throttle_temp_c (float, 85.0) — log ERROR and set throttled=true above this
|
||||
thermal_root (str, "/sys/class/thermal") — sysfs thermal root (override for tests)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
from typing import List, Optional
|
||||
|
||||
import rclpy
|
||||
from rclpy.node import Node
|
||||
from rclpy.qos import QoSProfile
|
||||
from std_msgs.msg import String
|
||||
|
||||
|
||||
def read_thermal_zones(root: str) -> List[dict]:
|
||||
"""Return a list of {zone, index, temp_c} dicts from sysfs."""
|
||||
zones = []
|
||||
try:
|
||||
entries = sorted(os.listdir(root))
|
||||
except OSError:
|
||||
return zones
|
||||
for entry in entries:
|
||||
if not entry.startswith("thermal_zone"):
|
||||
continue
|
||||
try:
|
||||
idx = int(entry[len("thermal_zone"):])
|
||||
except ValueError:
|
||||
continue
|
||||
zone_dir = os.path.join(root, entry)
|
||||
try:
|
||||
with open(os.path.join(zone_dir, "type")) as f:
|
||||
zone_type = f.read().strip()
|
||||
except OSError:
|
||||
zone_type = entry
|
||||
try:
|
||||
with open(os.path.join(zone_dir, "temp")) as f:
|
||||
temp_mc = int(f.read().strip()) # millidegrees Celsius
|
||||
temp_c = round(temp_mc / 1000.0, 1)
|
||||
except (OSError, ValueError):
|
||||
continue
|
||||
zones.append({"zone": zone_type, "index": idx, "temp_c": temp_c})
|
||||
return zones
|
||||
|
||||
|
||||
class ThermalNode(Node):
|
||||
"""Reads Jetson thermal zones and publishes /saltybot/thermal at 1 Hz."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
super().__init__("thermal_node")
|
||||
|
||||
self.declare_parameter("publish_rate_hz", 1.0)
|
||||
self.declare_parameter("warn_temp_c", 75.0)
|
||||
self.declare_parameter("throttle_temp_c", 85.0)
|
||||
self.declare_parameter("thermal_root", "/sys/class/thermal")
|
||||
|
||||
self._rate = self.get_parameter("publish_rate_hz").value
|
||||
self._warn_t = self.get_parameter("warn_temp_c").value
|
||||
self._throttle_t = self.get_parameter("throttle_temp_c").value
|
||||
self._root = self.get_parameter("thermal_root").value
|
||||
|
||||
qos = QoSProfile(depth=10)
|
||||
self._pub = self.create_publisher(String, "/saltybot/thermal", qos)
|
||||
self._timer = self.create_timer(1.0 / self._rate, self._publish)
|
||||
|
||||
self.get_logger().info(
|
||||
f"ThermalNode ready (rate={self._rate} Hz, "
|
||||
f"warn={self._warn_t}°C, throttle={self._throttle_t}°C, "
|
||||
f"root={self._root})"
|
||||
)
|
||||
|
||||
def _publish(self) -> None:
|
||||
zones = read_thermal_zones(self._root)
|
||||
if not zones:
|
||||
self.get_logger().warn("No thermal zones found — check thermal_root param")
|
||||
return
|
||||
|
||||
max_temp = max(z["temp_c"] for z in zones)
|
||||
throttled = max_temp >= self._throttle_t
|
||||
warn = max_temp >= self._warn_t
|
||||
|
||||
payload = {
|
||||
"ts": time.time(),
|
||||
"zones": zones,
|
||||
"max_temp_c": max_temp,
|
||||
"throttled": throttled,
|
||||
"warn": warn,
|
||||
}
|
||||
msg = String()
|
||||
msg.data = json.dumps(payload)
|
||||
self._pub.publish(msg)
|
||||
|
||||
if throttled:
|
||||
self.get_logger().error(
|
||||
f"THERMAL THROTTLE: {max_temp}°C >= {self._throttle_t}°C"
|
||||
)
|
||||
elif warn:
|
||||
self.get_logger().warn(
|
||||
f"Thermal warning: {max_temp}°C >= {self._warn_t}°C"
|
||||
)
|
||||
|
||||
|
||||
def main(args: Optional[list] = None) -> None:
|
||||
rclpy.init(args=args)
|
||||
node = ThermalNode()
|
||||
try:
|
||||
rclpy.spin(node)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
finally:
|
||||
node.destroy_node()
|
||||
rclpy.shutdown()
|
||||
4
jetson/ros2_ws/src/saltybot_thermal/setup.cfg
Normal file
4
jetson/ros2_ws/src/saltybot_thermal/setup.cfg
Normal file
@ -0,0 +1,4 @@
|
||||
[develop]
|
||||
script_dir=$base/lib/saltybot_thermal
|
||||
[egg_info]
|
||||
tag_date = 0
|
||||
27
jetson/ros2_ws/src/saltybot_thermal/setup.py
Normal file
27
jetson/ros2_ws/src/saltybot_thermal/setup.py
Normal file
@ -0,0 +1,27 @@
|
||||
from setuptools import setup
|
||||
|
||||
package_name = "saltybot_thermal"
|
||||
|
||||
setup(
|
||||
name=package_name,
|
||||
version="0.1.0",
|
||||
packages=[package_name],
|
||||
data_files=[
|
||||
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
|
||||
(f"share/{package_name}", ["package.xml"]),
|
||||
(f"share/{package_name}/launch", ["launch/thermal.launch.py"]),
|
||||
(f"share/{package_name}/config", ["config/thermal_params.yaml"]),
|
||||
],
|
||||
install_requires=["setuptools"],
|
||||
zip_safe=True,
|
||||
maintainer="sl-jetson",
|
||||
maintainer_email="sl-jetson@saltylab.local",
|
||||
description="Jetson thermal monitor — /saltybot/thermal JSON at 1 Hz",
|
||||
license="MIT",
|
||||
tests_require=["pytest"],
|
||||
entry_points={
|
||||
"console_scripts": [
|
||||
"thermal_node = saltybot_thermal.thermal_node:main",
|
||||
],
|
||||
},
|
||||
)
|
||||
303
jetson/ros2_ws/src/saltybot_thermal/test/test_thermal.py
Normal file
303
jetson/ros2_ws/src/saltybot_thermal/test/test_thermal.py
Normal file
@ -0,0 +1,303 @@
|
||||
"""test_thermal.py -- Unit tests for Issue #205 Jetson thermal monitor."""
|
||||
|
||||
from __future__ import annotations
|
||||
import json, os, time
|
||||
import pytest
|
||||
|
||||
|
||||
def _pkg_root():
|
||||
return os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
|
||||
|
||||
def _read_src(rel_path):
|
||||
with open(os.path.join(_pkg_root(), rel_path)) as f:
|
||||
return f.read()
|
||||
|
||||
|
||||
# ── Import the sysfs reader (no ROS required) ─────────────────────────────────
|
||||
|
||||
def _import_reader():
|
||||
import importlib.util, sys, types
|
||||
|
||||
# Build minimal ROS2 stubs so thermal_node.py imports without a ROS install
|
||||
def _stub(name):
|
||||
m = types.ModuleType(name)
|
||||
sys.modules[name] = m
|
||||
return m
|
||||
|
||||
rclpy_mod = _stub("rclpy")
|
||||
rclpy_node_mod = _stub("rclpy.node")
|
||||
rclpy_qos_mod = _stub("rclpy.qos")
|
||||
std_msgs_mod = _stub("std_msgs")
|
||||
std_msg_mod = _stub("std_msgs.msg")
|
||||
|
||||
class _Node:
|
||||
def __init__(self, *a, **kw): pass
|
||||
def declare_parameter(self, *a, **kw): pass
|
||||
def get_parameter(self, name):
|
||||
class _P:
|
||||
value = None
|
||||
return _P()
|
||||
def create_publisher(self, *a, **kw): return None
|
||||
def create_timer(self, *a, **kw): return None
|
||||
def get_logger(self):
|
||||
class _L:
|
||||
def info(self, *a): pass
|
||||
def warn(self, *a): pass
|
||||
def error(self, *a): pass
|
||||
return _L()
|
||||
def destroy_node(self): pass
|
||||
|
||||
class _QoSProfile:
|
||||
def __init__(self, **kw): pass
|
||||
|
||||
class _String:
|
||||
data = ""
|
||||
|
||||
rclpy_node_mod.Node = _Node
|
||||
rclpy_qos_mod.QoSProfile = _QoSProfile
|
||||
std_msg_mod.String = _String
|
||||
rclpy_mod.init = lambda *a, **kw: None
|
||||
rclpy_mod.spin = lambda node: None
|
||||
rclpy_mod.ok = lambda: True
|
||||
rclpy_mod.shutdown = lambda: None
|
||||
|
||||
spec = importlib.util.spec_from_file_location(
|
||||
"thermal_node_testmod",
|
||||
os.path.join(_pkg_root(), "saltybot_thermal", "thermal_node.py"),
|
||||
)
|
||||
mod = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(mod)
|
||||
return mod
|
||||
|
||||
|
||||
# ── Sysfs fixture helpers ─────────────────────────────────────────────────────
|
||||
|
||||
def _make_zone(root, idx, zone_type, temp_mc):
|
||||
"""Create a fake thermal_zone<idx> directory under root."""
|
||||
zdir = os.path.join(str(root), "thermal_zone{}".format(idx))
|
||||
os.makedirs(zdir, exist_ok=True)
|
||||
with open(os.path.join(zdir, "type"), "w") as f:
|
||||
f.write(zone_type + "\n")
|
||||
with open(os.path.join(zdir, "temp"), "w") as f:
|
||||
f.write(str(temp_mc) + "\n")
|
||||
|
||||
|
||||
# ── read_thermal_zones ────────────────────────────────────────────────────────
|
||||
|
||||
class TestReadThermalZones:
|
||||
@pytest.fixture(scope="class")
|
||||
def mod(self):
|
||||
return _import_reader()
|
||||
|
||||
def test_empty_dir(self, mod, tmp_path):
|
||||
assert mod.read_thermal_zones(str(tmp_path)) == []
|
||||
|
||||
def test_missing_dir(self, mod):
|
||||
assert mod.read_thermal_zones("/nonexistent/path/xyz") == []
|
||||
|
||||
def test_single_zone(self, mod, tmp_path):
|
||||
_make_zone(tmp_path, 0, "CPU-therm", 45000)
|
||||
zones = mod.read_thermal_zones(str(tmp_path))
|
||||
assert len(zones) == 1
|
||||
assert zones[0]["zone"] == "CPU-therm"
|
||||
assert zones[0]["temp_c"] == 45.0
|
||||
assert zones[0]["index"] == 0
|
||||
|
||||
def test_temp_millidegrees_conversion(self, mod, tmp_path):
|
||||
_make_zone(tmp_path, 0, "GPU-therm", 72500)
|
||||
zones = mod.read_thermal_zones(str(tmp_path))
|
||||
assert zones[0]["temp_c"] == 72.5
|
||||
|
||||
def test_multiple_zones(self, mod, tmp_path):
|
||||
_make_zone(tmp_path, 0, "CPU-therm", 40000)
|
||||
_make_zone(tmp_path, 1, "GPU-therm", 55000)
|
||||
_make_zone(tmp_path, 2, "PMIC-Die", 38000)
|
||||
zones = mod.read_thermal_zones(str(tmp_path))
|
||||
assert len(zones) == 3
|
||||
|
||||
def test_sorted_by_index(self, mod, tmp_path):
|
||||
_make_zone(tmp_path, 2, "Z2", 20000)
|
||||
_make_zone(tmp_path, 0, "Z0", 10000)
|
||||
_make_zone(tmp_path, 1, "Z1", 15000)
|
||||
zones = mod.read_thermal_zones(str(tmp_path))
|
||||
indices = [z["index"] for z in zones]
|
||||
assert indices == sorted(indices)
|
||||
|
||||
def test_skips_non_zone_entries(self, mod, tmp_path):
|
||||
os.makedirs(os.path.join(str(tmp_path), "cooling_device0"))
|
||||
_make_zone(tmp_path, 0, "CPU-therm", 40000)
|
||||
zones = mod.read_thermal_zones(str(tmp_path))
|
||||
assert len(zones) == 1
|
||||
|
||||
def test_skips_zone_without_temp(self, mod, tmp_path):
|
||||
zdir = os.path.join(str(tmp_path), "thermal_zone0")
|
||||
os.makedirs(zdir)
|
||||
with open(os.path.join(zdir, "type"), "w") as f:
|
||||
f.write("CPU-therm\n")
|
||||
# No temp file — should be skipped
|
||||
zones = mod.read_thermal_zones(str(tmp_path))
|
||||
assert zones == []
|
||||
|
||||
def test_zone_type_fallback(self, mod, tmp_path):
|
||||
"""Zone without type file falls back to directory name."""
|
||||
zdir = os.path.join(str(tmp_path), "thermal_zone0")
|
||||
os.makedirs(zdir)
|
||||
with open(os.path.join(zdir, "temp"), "w") as f:
|
||||
f.write("40000\n")
|
||||
zones = mod.read_thermal_zones(str(tmp_path))
|
||||
assert len(zones) == 1
|
||||
assert zones[0]["zone"] == "thermal_zone0"
|
||||
|
||||
def test_temp_rounding(self, mod, tmp_path):
|
||||
_make_zone(tmp_path, 0, "CPU-therm", 72333)
|
||||
zones = mod.read_thermal_zones(str(tmp_path))
|
||||
assert zones[0]["temp_c"] == 72.3
|
||||
|
||||
|
||||
# ── Threshold logic (pure Python) ────────────────────────────────────────────
|
||||
|
||||
class TestThresholds:
|
||||
def _classify(self, temp_c, warn_t=75.0, throttle_t=85.0):
|
||||
throttled = temp_c >= throttle_t
|
||||
warn = temp_c >= warn_t
|
||||
return throttled, warn
|
||||
|
||||
def test_normal(self):
|
||||
t, w = self._classify(50.0)
|
||||
assert not t and not w
|
||||
|
||||
def test_warn_boundary(self):
|
||||
t, w = self._classify(75.0)
|
||||
assert not t and w
|
||||
|
||||
def test_below_warn(self):
|
||||
t, w = self._classify(74.9)
|
||||
assert not t and not w
|
||||
|
||||
def test_throttle_boundary(self):
|
||||
t, w = self._classify(85.0)
|
||||
assert t and w
|
||||
|
||||
def test_above_throttle(self):
|
||||
t, w = self._classify(90.0)
|
||||
assert t and w
|
||||
|
||||
def test_custom_thresholds(self):
|
||||
t, w = self._classify(70.0, warn_t=70.0, throttle_t=80.0)
|
||||
assert not t and w
|
||||
|
||||
def test_max_temp_drives_status(self):
|
||||
zones = [{"temp_c": 40.0}, {"temp_c": 86.0}, {"temp_c": 55.0}]
|
||||
max_t = max(z["temp_c"] for z in zones)
|
||||
assert max_t == 86.0
|
||||
t, w = self._classify(max_t)
|
||||
assert t and w
|
||||
|
||||
|
||||
# ── JSON payload schema ───────────────────────────────────────────────────────
|
||||
|
||||
class TestJsonPayload:
|
||||
def _make_payload(self, zones, warn_t=75.0, throttle_t=85.0):
|
||||
max_temp = max(z["temp_c"] for z in zones) if zones else 0.0
|
||||
return {
|
||||
"ts": time.time(),
|
||||
"zones": zones,
|
||||
"max_temp_c": max_temp,
|
||||
"throttled": max_temp >= throttle_t,
|
||||
"warn": max_temp >= warn_t,
|
||||
}
|
||||
|
||||
def test_has_ts(self):
|
||||
p = self._make_payload([{"zone": "CPU", "index": 0, "temp_c": 40.0}])
|
||||
assert "ts" in p and isinstance(p["ts"], float)
|
||||
|
||||
def test_has_zones(self):
|
||||
p = self._make_payload([{"zone": "CPU", "index": 0, "temp_c": 40.0}])
|
||||
assert "zones" in p and len(p["zones"]) == 1
|
||||
|
||||
def test_has_max_temp(self):
|
||||
p = self._make_payload([{"zone": "CPU", "index": 0, "temp_c": 55.0}])
|
||||
assert p["max_temp_c"] == 55.0
|
||||
|
||||
def test_throttled_false_below(self):
|
||||
p = self._make_payload([{"zone": "CPU", "index": 0, "temp_c": 60.0}])
|
||||
assert p["throttled"] is False
|
||||
|
||||
def test_warn_true_at_threshold(self):
|
||||
p = self._make_payload([{"zone": "CPU", "index": 0, "temp_c": 75.0}])
|
||||
assert p["warn"] is True and p["throttled"] is False
|
||||
|
||||
def test_throttled_true_above(self):
|
||||
p = self._make_payload([{"zone": "CPU", "index": 0, "temp_c": 90.0}])
|
||||
assert p["throttled"] is True
|
||||
|
||||
def test_json_serializable(self):
|
||||
zones = [{"zone": "CPU", "index": 0, "temp_c": 50.0}]
|
||||
p = self._make_payload(zones)
|
||||
blob = json.dumps(p)
|
||||
parsed = json.loads(blob)
|
||||
assert parsed["max_temp_c"] == 50.0
|
||||
|
||||
def test_multi_zone_max(self):
|
||||
zones = [
|
||||
{"zone": "CPU-therm", "index": 0, "temp_c": 55.0},
|
||||
{"zone": "GPU-therm", "index": 1, "temp_c": 78.0},
|
||||
{"zone": "PMIC-Die", "index": 2, "temp_c": 38.0},
|
||||
]
|
||||
p = self._make_payload(zones)
|
||||
assert p["max_temp_c"] == 78.0
|
||||
assert p["warn"] is True
|
||||
assert p["throttled"] is False
|
||||
|
||||
|
||||
# ── Node source checks ────────────────────────────────────────────────────────
|
||||
|
||||
class TestNodeSrc:
|
||||
@pytest.fixture(scope="class")
|
||||
def src(self):
|
||||
return _read_src("saltybot_thermal/thermal_node.py")
|
||||
|
||||
def test_class_defined(self, src): assert "class ThermalNode" in src
|
||||
def test_publish_rate_param(self, src): assert '"publish_rate_hz"' in src
|
||||
def test_warn_param(self, src): assert '"warn_temp_c"' in src
|
||||
def test_throttle_param(self, src): assert '"throttle_temp_c"' in src
|
||||
def test_thermal_root_param(self, src): assert '"thermal_root"' in src
|
||||
def test_topic(self, src): assert '"/saltybot/thermal"' in src
|
||||
def test_read_fn(self, src): assert "read_thermal_zones" in src
|
||||
def test_warn_log(self, src): assert "warn" in src.lower()
|
||||
def test_error_log(self, src): assert "error" in src.lower()
|
||||
def test_throttled_flag(self, src): assert '"throttled"' in src
|
||||
def test_warn_flag(self, src): assert '"warn"' in src
|
||||
def test_max_temp(self, src): assert '"max_temp_c"' in src
|
||||
def test_millidegrees(self, src): assert "1000" in src
|
||||
def test_json_dumps(self, src): assert "json.dumps" in src
|
||||
def test_issue_tag(self, src): assert "205" in src
|
||||
def test_main(self, src): assert "def main" in src
|
||||
def test_sysfs_path(self, src): assert "/sys/class/thermal" in src
|
||||
|
||||
|
||||
# ── Package metadata ──────────────────────────────────────────────────────────
|
||||
|
||||
class TestPackageMeta:
|
||||
@pytest.fixture(scope="class")
|
||||
def pkg_xml(self):
|
||||
return _read_src("package.xml")
|
||||
|
||||
@pytest.fixture(scope="class")
|
||||
def setup_py(self):
|
||||
return _read_src("setup.py")
|
||||
|
||||
@pytest.fixture(scope="class")
|
||||
def cfg(self):
|
||||
return _read_src("config/thermal_params.yaml")
|
||||
|
||||
def test_pkg_name(self, pkg_xml): assert "saltybot_thermal" in pkg_xml
|
||||
def test_issue_tag(self, pkg_xml): assert "205" in pkg_xml
|
||||
def test_entry_point(self, setup_py): assert "thermal_node = saltybot_thermal.thermal_node:main" in setup_py
|
||||
def test_cfg_node_name(self, cfg): assert "thermal_node:" in cfg
|
||||
def test_cfg_warn(self, cfg): assert "warn_temp_c" in cfg
|
||||
def test_cfg_throttle(self, cfg): assert "throttle_temp_c" in cfg
|
||||
def test_cfg_rate(self, cfg): assert "publish_rate_hz" in cfg
|
||||
def test_cfg_defaults(self, cfg):
|
||||
assert "75.0" in cfg and "85.0" in cfg and "1.0" in cfg
|
||||
Loading…
x
Reference in New Issue
Block a user