Merge pull request 'feat: UWB anchor auto-calibration via inter-anchor ranging + MDS (Issue #602)' (#608) from sl-uwb/issue-602-anchor-calibration into main

This commit is contained in:
sl-jetson 2026-03-14 15:54:56 -04:00
commit 6c00d6a321
21 changed files with 1518 additions and 2 deletions

View File

@ -27,6 +27,8 @@
* AT+RANGE_ADDR=<hex_addr> pair with specific tag (filter others) * AT+RANGE_ADDR=<hex_addr> pair with specific tag (filter others)
* AT+RANGE_ADDR= clear pairing (accept all tags) * AT+RANGE_ADDR= clear pairing (accept all tags)
* AT+ID? returns +ID:<anchor_id> * AT+ID? returns +ID:<anchor_id>
* AT+PEER_RANGE=<id> inter-anchor DS-TWR (for auto-calibration)
* +PEER_RANGE:<my>,<peer>,<mm>,<rssi>
* *
* Pin mapping Makerfabs ESP32 UWB Pro * Pin mapping Makerfabs ESP32 UWB Pro
* *
@ -188,6 +190,116 @@ static float rx_power_dbm(void) {
return 10.0f * log10f((f * f) / (n * n)) - 121.74f; return 10.0f * log10f((f * f) / (n * n)) - 121.74f;
} }
/* ── Peer-anchor ranging (initiator role, for auto-calibration) ─── */
/* Timeout waiting for peer's RESP during inter-anchor ranging */
#define PEER_RX_RESP_TIMEOUT_US 3500
/* Block up to this many ms waiting for the interrupt flags */
#define PEER_WAIT_MS 20
/* Initiate a single DS-TWR exchange toward peer anchor `peer_id`.
* Returns range in mm (>=0) on success, or -1 on timeout/error.
* RSSI is stored in *rssi_out if non-null.
*
* Exchange:
* This anchor peer POLL
* peer This RESP (carries T_poll_rx, T_resp_tx)
* This anchor peer FINAL (carries Ra, Da)
* This side computes its own range estimate from Ra/Da.
*/
static int32_t peer_range_once(uint8_t peer_id, float *rssi_out) {
/* ── Reset interrupt flags ── */
g_tx_done = g_rx_ok = g_rx_err = g_rx_to = false;
/* ── Build POLL frame ── */
uint8_t poll_buf[FRAME_HDR + FCS_LEN] = {
FTYPE_POLL,
(uint8_t)ANCHOR_ID,
peer_id,
};
dwt_writetxdata(sizeof(poll_buf), poll_buf, 0);
dwt_writetxfctrl(sizeof(poll_buf), 0, 1);
dwt_setrxtimeout(PEER_RX_RESP_TIMEOUT_US);
dwt_rxenable(DWT_START_RX_IMMEDIATE);
if (dwt_starttx(DWT_START_TX_IMMEDIATE | DWT_RESPONSE_EXPECTED) != DWT_SUCCESS)
return -1;
/* Wait for TX done */
uint32_t t0 = millis();
while (!g_tx_done && !g_rx_err && !g_rx_to) {
if (millis() - t0 > (uint32_t)PEER_WAIT_MS) return -1;
}
if (g_rx_err || g_rx_to) return -1;
g_tx_done = false;
/* Capture T_poll_tx */
uint64_t T_poll_tx;
dwt_readtxtimestamp((uint8_t *)&T_poll_tx);
T_poll_tx &= DWT_TS_MASK;
/* Wait for RESP */
t0 = millis();
while (!g_rx_ok && !g_rx_err && !g_rx_to) {
if (millis() - t0 > (uint32_t)PEER_WAIT_MS) return -1;
}
if (!g_rx_ok) return -1;
/* Validate RESP */
if (g_rx_len < (uint32_t)(FRAME_HDR + RESP_PAYLOAD + FCS_LEN)) return -1;
if (g_rx_buf[0] != FTYPE_RESP) return -1;
if (g_rx_buf[1] != peer_id) return -1;
if (g_rx_buf[2] != (uint8_t)ANCHOR_ID) return -1;
uint64_t T_resp_rx;
dwt_readrxtimestamp((uint8_t *)&T_resp_rx);
T_resp_rx &= DWT_TS_MASK;
/* Extract peer timestamps from RESP payload */
const uint8_t *pl = g_rx_buf + FRAME_HDR;
uint64_t T_poll_rx_peer = ts_read(pl);
uint64_t T_resp_tx_peer = ts_read(pl + 5);
/* DS-TWR Ra, Da */
uint64_t Ra = ts_diff(T_resp_rx, T_poll_tx);
uint64_t Da = ts_diff(T_resp_tx_peer, T_poll_rx_peer);
g_rx_ok = g_rx_err = g_rx_to = false;
/* ── Build FINAL frame ── */
uint8_t final_buf[FRAME_HDR + FINAL_PAYLOAD + FCS_LEN];
final_buf[0] = FTYPE_FINAL;
final_buf[1] = (uint8_t)ANCHOR_ID;
final_buf[2] = peer_id;
ts_write(final_buf + FRAME_HDR, Ra);
ts_write(final_buf + FRAME_HDR + 5, Da);
ts_write(final_buf + FRAME_HDR + 10, (uint64_t)0); /* Db placeholder */
dwt_setrxtimeout(0);
dwt_writetxdata(sizeof(final_buf), final_buf, 0);
dwt_writetxfctrl(sizeof(final_buf), 0, 1);
if (dwt_starttx(DWT_START_TX_IMMEDIATE) != DWT_SUCCESS) return -1;
t0 = millis();
while (!g_tx_done && !g_rx_err) {
if (millis() - t0 > (uint32_t)PEER_WAIT_MS) return -1;
}
g_tx_done = false;
/* Simplified one-sided range estimate: tof = Ra - Da/2 */
double tof_s = ticks_to_s(Ra) - ticks_to_s(Da) / 2.0;
if (tof_s < 0.0) tof_s = 0.0;
int32_t range_mm = (int32_t)(tof_s * SPEED_OF_LIGHT * 1000.0);
if (rssi_out) *rssi_out = rx_power_dbm();
/* Re-enable normal RX for tag ranging */
dwt_setrxtimeout(0);
dwt_rxenable(DWT_START_RX_IMMEDIATE);
return range_mm;
}
/* ── AT command handler ─────────────────────────────────────────── */ /* ── AT command handler ─────────────────────────────────────────── */
static char g_at_buf[64]; static char g_at_buf[64];
@ -212,6 +324,23 @@ static void at_dispatch(const char *cmd) {
g_paired_tag_id = (uint8_t)strtoul(v, nullptr, 0); g_paired_tag_id = (uint8_t)strtoul(v, nullptr, 0);
Serial.printf("+OK:PAIRED=0x%02X\r\n", g_paired_tag_id); Serial.printf("+OK:PAIRED=0x%02X\r\n", g_paired_tag_id);
} }
} else if (strncmp(cmd, "AT+PEER_RANGE=", 14) == 0) {
/* Inter-anchor ranging for calibration.
* Usage: AT+PEER_RANGE=<peer_anchor_id>
* Response: +PEER_RANGE:<my_id>,<peer_id>,<range_mm>,<rssi_dbm>
* or: +PEER_RANGE:ERR,<peer_id>,TIMEOUT
*/
uint8_t peer_id = (uint8_t)strtoul(cmd + 14, nullptr, 0);
float rssi = 0.0f;
int32_t mm = peer_range_once(peer_id, &rssi);
if (mm >= 0) {
Serial.printf("+PEER_RANGE:%d,%d,%ld,%.1f\r\n",
ANCHOR_ID, peer_id, mm, (double)rssi);
} else {
Serial.printf("+PEER_RANGE:ERR,%d,TIMEOUT\r\n", peer_id);
}
} else { } else {
Serial.println("+ERR:UNKNOWN_CMD"); Serial.println("+ERR:UNKNOWN_CMD");
} }

View File

@ -3,6 +3,7 @@
#include <stdint.h> #include <stdint.h>
#include "mpu6000.h" #include "mpu6000.h"
#include "slope_estimator.h"
/* /*
* SaltyLab Balance Controller * SaltyLab Balance Controller
@ -36,6 +37,9 @@ typedef struct {
/* Safety */ /* Safety */
float max_tilt; /* Cutoff angle (degrees) */ float max_tilt; /* Cutoff angle (degrees) */
int16_t max_speed; /* Speed limit */ int16_t max_speed; /* Speed limit */
/* Slope compensation (Issue #600) */
slope_estimator_t slope;
} balance_t; } balance_t;
void balance_init(balance_t *b); void balance_init(balance_t *b);

View File

@ -87,6 +87,7 @@
#define JLINK_TLM_SCHED 0x85u /* jlink_tlm_sched_t (1+N*16 bytes, Issue #550) */ #define JLINK_TLM_SCHED 0x85u /* jlink_tlm_sched_t (1+N*16 bytes, Issue #550) */
#define JLINK_TLM_MOTOR_CURRENT 0x86u /* jlink_tlm_motor_current_t (8 bytes, Issue #584) */ #define JLINK_TLM_MOTOR_CURRENT 0x86u /* jlink_tlm_motor_current_t (8 bytes, Issue #584) */
#define JLINK_TLM_FAULT_LOG 0x87u /* jlink_tlm_fault_log_t (20 bytes, Issue #565) */ #define JLINK_TLM_FAULT_LOG 0x87u /* jlink_tlm_fault_log_t (20 bytes, Issue #565) */
#define JLINK_TLM_SLOPE 0x88u /* jlink_tlm_slope_t (4 bytes, Issue #600) */
/* ---- Telemetry STATUS payload (20 bytes, packed) ---- */ /* ---- Telemetry STATUS payload (20 bytes, packed) ---- */
typedef struct __attribute__((packed)) { typedef struct __attribute__((packed)) {
@ -164,6 +165,14 @@ typedef struct __attribute__((packed)) {
uint8_t _pad; /* reserved */ uint8_t _pad; /* reserved */
} jlink_tlm_motor_current_t; /* 8 bytes */ } jlink_tlm_motor_current_t; /* 8 bytes */
/* ---- Telemetry SLOPE payload (4 bytes, packed) Issue #600 ---- */
/* Published at SLOPE_TLM_HZ (1 Hz); reports slow-adapting terrain slope estimate. */
typedef struct __attribute__((packed)) {
int16_t slope_x100; /* slope estimate degrees x100 (0.01° resolution) */
uint8_t active; /* 1 = slope compensation enabled */
uint8_t _pad; /* reserved */
} jlink_tlm_slope_t; /* 4 bytes */
/* ---- Telemetry FAULT_LOG payload (20 bytes, packed) Issue #565 ---- */ /* ---- Telemetry FAULT_LOG payload (20 bytes, packed) Issue #565 ---- */
/* Sent on boot (if last fault != NONE) and in response to FAULT_LOG_GET. */ /* Sent on boot (if last fault != NONE) and in response to FAULT_LOG_GET. */
typedef struct __attribute__((packed)) { typedef struct __attribute__((packed)) {
@ -272,6 +281,13 @@ JLinkSchedSetBuf *jlink_get_sched_set(void);
*/ */
void jlink_send_motor_current_tlm(const jlink_tlm_motor_current_t *tlm); void jlink_send_motor_current_tlm(const jlink_tlm_motor_current_t *tlm);
/*
* jlink_send_slope_tlm(tlm) - transmit JLINK_TLM_SLOPE (0x88) frame
* (10 bytes total) to Jetson. Issue #600.
* Rate-limiting is handled by slope_estimator_send_tlm(); call from there only.
*/
void jlink_send_slope_tlm(const jlink_tlm_slope_t *tlm);
/* /*
* jlink_send_fault_log(fl) - transmit JLINK_TLM_FAULT_LOG (0x87) frame * jlink_send_fault_log(fl) - transmit JLINK_TLM_FAULT_LOG (0x87) frame
* (26 bytes) on boot (if fault log non-empty) and in response to * (26 bytes) on boot (if fault log non-empty) and in response to

101
include/slope_estimator.h Normal file
View File

@ -0,0 +1,101 @@
#ifndef SLOPE_ESTIMATOR_H
#define SLOPE_ESTIMATOR_H
#include <stdint.h>
#include <stdbool.h>
/*
* slope_estimator slow-adapting terrain slope estimator for Issue #600.
*
* On a slope the robot must lean slightly into the hill to stay balanced.
* The IMU pitch reading therefore includes both the robot's balance offset
* and the ground incline. This module decouples the two by tracking the
* slowly-changing DC component of the pitch signal with a first-order IIR
* low-pass filter (time constant SLOPE_TAU_S = 5 s).
*
* HOW IT WORKS:
* Every call to slope_estimator_update(pitch_deg, dt):
*
* alpha = dt / (SLOPE_TAU_S + dt) // ≈ 0.0002 at 1 kHz
* raw = slope * (1 - alpha) + pitch * alpha
* slope = clamp(raw, -SLOPE_MAX_DEG, +SLOPE_MAX_DEG)
*
* The IIR converges to the steady-state pitch in ~5 s. Fast tilt
* transients (balance corrections, steps, bumps) are attenuated by
* the long time constant and do not corrupt the estimate.
*
* INTEGRATION IN BALANCE PID:
* Subtract slope_estimate from the measured pitch before computing
* the PID error so the controller balances around the slope surface
* rather than absolute vertical:
*
* tilt_corrected = pitch_deg - slope_estimate_deg
* error = setpoint - tilt_corrected
*
* This is equivalent to continuously adjusting the balance setpoint
* to track the incline.
*
* SAFETY:
* - Estimate is clamped to ±SLOPE_MAX_DEG (15°) to prevent drift from
* extreme falls being mistaken for genuine slopes.
* - slope_estimator_reset() zeroes the state; call on disarm or after
* a tilt fault so re-arming starts fresh.
*
* TELEMETRY:
* JLINK_TLM_SLOPE (0x88) published at SLOPE_TLM_HZ (1 Hz):
* jlink_tlm_slope_t { int16 slope_x100, uint8 active, uint8 _pad }
*/
/* ---- Configuration ---- */
#define SLOPE_TAU_S 5.0f /* IIR time constant (seconds) */
#define SLOPE_MAX_DEG 15.0f /* Maximum estimate magnitude (degrees) */
#define SLOPE_TLM_HZ 1u /* JLINK_TLM_SLOPE publish rate (Hz) */
/* ---- State ---- */
typedef struct {
float estimate_deg; /* current slope estimate (degrees, + = nose-up) */
bool enabled; /* compensation on/off; off = estimate frozen */
uint32_t last_tlm_ms; /* timestamp of last TLM transmission */
} slope_estimator_t;
/* ---- API ---- */
/*
* slope_estimator_init(se) zero state, enable estimation.
* Call once during system init.
*/
void slope_estimator_init(slope_estimator_t *se);
/*
* slope_estimator_reset(se) zero estimate without changing enabled flag.
* Call on disarm or after BALANCE_TILT_FAULT to avoid stale state on rearm.
*/
void slope_estimator_reset(slope_estimator_t *se);
/*
* slope_estimator_update(se, pitch_deg, dt) advance the IIR filter.
* pitch_deg : current fused pitch angle from IMU (degrees)
* dt : loop interval (seconds)
* No-op if se->enabled == false or dt <= 0.
*/
void slope_estimator_update(slope_estimator_t *se, float pitch_deg, float dt);
/*
* slope_estimator_get_deg(se) return current estimate (degrees).
* Returns 0 if disabled.
*/
float slope_estimator_get_deg(const slope_estimator_t *se);
/*
* slope_estimator_set_enabled(se, en) enable or disable compensation.
* Disabling freezes the estimate at its current value.
*/
void slope_estimator_set_enabled(slope_estimator_t *se, bool en);
/*
* slope_estimator_send_tlm(se, now_ms) transmit JLINK_TLM_SLOPE (0x88)
* frame to Jetson. Rate-limited to SLOPE_TLM_HZ; safe to call every tick.
*/
void slope_estimator_send_tlm(const slope_estimator_t *se, uint32_t now_ms);
#endif /* SLOPE_ESTIMATOR_H */

View File

@ -0,0 +1,17 @@
uwb_anchor_calibration:
ros__parameters:
# Serial ports for each anchor, indexed by anchor ID
# udev rules create stable symlinks in /dev/uwb-anchor<N>
anchor_serials:
- /dev/uwb-anchor0
- /dev/uwb-anchor1
- /dev/uwb-anchor2
- /dev/uwb-anchor3
baud_rate: 115200
# Seconds to wait for a +PEER_RANGE response per attempt
peer_range_timeout_s: 2.0
# Range measurements to average per anchor pair (higher = more accurate)
default_n_samples: 5

View File

@ -0,0 +1,32 @@
"""
uwb_calibration.launch.py Launch the UWB anchor auto-calibration service.
Usage:
ros2 launch saltybot_uwb_calibration uwb_calibration.launch.py
Then trigger from command line:
ros2 service call /saltybot/uwb/calibrate_anchors \\
saltybot_uwb_calibration_msgs/srv/CalibrateAnchors \\
"{anchor_ids: [], n_samples: 5}"
"""
from launch import LaunchDescription
from launch_ros.actions import Node
from ament_index_python.packages import get_package_share_directory
import os
def generate_launch_description() -> LaunchDescription:
pkg_dir = get_package_share_directory("saltybot_uwb_calibration")
params_file = os.path.join(pkg_dir, "config", "calibration_params.yaml")
calibration_node = Node(
package="saltybot_uwb_calibration",
executable="uwb_calibration",
name="uwb_anchor_calibration",
parameters=[params_file],
output="screen",
emulate_tty=True,
)
return LaunchDescription([calibration_node])

View File

@ -0,0 +1,29 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_uwb_calibration</name>
<version>0.1.0</version>
<description>
UWB anchor auto-calibration for SaltyBot (Issue #602).
Commands anchors to range against each other via AT+PEER_RANGE=,
builds a pairwise distance matrix, and uses classical MDS to recover
2-D anchor positions. Exposes /saltybot/uwb/calibrate_anchors service.
</description>
<maintainer email="sl-uwb@saltylab.local">sl-uwb</maintainer>
<license>Apache-2.0</license>
<depend>rclpy</depend>
<depend>saltybot_uwb_calibration_msgs</depend>
<exec_depend>python3-numpy</exec_depend>
<exec_depend>python3-serial</exec_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,272 @@
"""
calibration_node.py UWB anchor auto-calibration ROS2 service (Issue #602)
Service: /saltybot/uwb/calibrate_anchors (CalibrateAnchors)
Workflow
1. For every ordered pair (i, j) with i < j, send AT+PEER_RANGE=<j> over
anchor i's serial port, collect `n_samples` measurements, average them.
2. Build the symmetric N×N distance matrix D.
3. Run classical MDS (mds_math.mds_2d) to recover (x, y) positions.
4. Align the frame: anchor-0 at origin, anchor-1 on +X axis.
5. Return positions + residual RMS + JSON config string.
Parameters (ROS2, see config/calibration_params.yaml)
anchor_serials list of serial port paths, index = anchor ID
e.g. ["/dev/uwb-anchor0", "/dev/uwb-anchor1", ...]
baud_rate 115200
peer_range_timeout_s 2.0 (per AT+PEER_RANGE= attempt)
default_n_samples 5
"""
from __future__ import annotations
import json
import re
import time
from typing import Optional
import numpy as np
import serial
import rclpy
from rclpy.node import Node
from saltybot_uwb_calibration_msgs.srv import CalibrateAnchors
from saltybot_uwb_calibration.mds_math import mds_2d, anchor_frame_align
_PEER_RANGE_RE = re.compile(
r"\+PEER_RANGE:(\d+),(\d+),(\d+),([-\d.]+)"
)
_PEER_ERR_RE = re.compile(
r"\+PEER_RANGE:ERR,(\d+),(\w+)"
)
class CalibrationNode(Node):
def __init__(self) -> None:
super().__init__("uwb_anchor_calibration")
self.declare_parameter("anchor_serials", [
"/dev/uwb-anchor0",
"/dev/uwb-anchor1",
"/dev/uwb-anchor2",
"/dev/uwb-anchor3",
])
self.declare_parameter("baud_rate", 115200)
self.declare_parameter("peer_range_timeout_s", 2.0)
self.declare_parameter("default_n_samples", 5)
self._serials: list[str] = (
self.get_parameter("anchor_serials").value
)
self._baud: int = self.get_parameter("baud_rate").value
self._timeout: float = self.get_parameter("peer_range_timeout_s").value
self._default_n: int = self.get_parameter("default_n_samples").value
self._srv = self.create_service(
CalibrateAnchors,
"/saltybot/uwb/calibrate_anchors",
self._handle_calibrate,
)
self.get_logger().info(
f"UWB calibration service ready — "
f"{len(self._serials)} configured anchors"
)
# ── Service handler ────────────────────────────────────────────────────
def _handle_calibrate(
self,
request: CalibrateAnchors.Request,
response: CalibrateAnchors.Response,
) -> CalibrateAnchors.Response:
# Determine anchor IDs to calibrate
if request.anchor_ids:
ids = list(request.anchor_ids)
else:
ids = list(range(len(self._serials)))
n_anchors = len(ids)
if n_anchors < 4:
response.success = False
response.message = (
f"Need at least 4 anchors, got {n_anchors}"
)
return response
n_samples = int(request.n_samples) if request.n_samples > 0 \
else self._default_n
self.get_logger().info(
f"Starting calibration: anchors={ids}, n_samples={n_samples}"
)
# Open serial ports
ports: dict[int, serial.Serial] = {}
try:
for anchor_id in ids:
if anchor_id >= len(self._serials):
raise RuntimeError(
f"No serial port configured for anchor {anchor_id}"
)
port_path = self._serials[anchor_id]
ports[anchor_id] = serial.Serial(
port_path,
baudrate=self._baud,
timeout=self._timeout,
)
# Flush any stale data
ports[anchor_id].reset_input_buffer()
time.sleep(0.05)
self.get_logger().info(
f" Opened anchor {anchor_id}{port_path}"
)
except Exception as exc:
_close_all(ports)
response.success = False
response.message = f"Serial open failed: {exc}"
return response
# Measure all unique pairs
D = np.zeros((n_anchors, n_anchors), dtype=float)
id_to_idx = {aid: idx for idx, aid in enumerate(ids)}
try:
for i_idx, i_id in enumerate(ids):
for j_idx, j_id in enumerate(ids):
if j_idx <= i_idx:
continue
dist_m = self._measure_pair(
ports[i_id], i_id, j_id, n_samples
)
if dist_m is None:
raise RuntimeError(
f"No valid range between anchor {i_id} "
f"and anchor {j_id}"
)
D[i_idx, j_idx] = dist_m
D[j_idx, i_idx] = dist_m
self.get_logger().info(
f" [{i_id}{j_id}] = {dist_m:.3f} m"
)
except Exception as exc:
_close_all(ports)
response.success = False
response.message = str(exc)
return response
_close_all(ports)
# MDS
try:
positions, residual = mds_2d(D)
positions = anchor_frame_align(positions)
except Exception as exc:
response.success = False
response.message = f"MDS failed: {exc}"
return response
# Build JSON config
pos_dict = {}
for idx, aid in enumerate(ids):
pos_dict[str(aid)] = [
round(float(positions[idx, 0]), 4),
round(float(positions[idx, 1]), 4),
0.0,
]
json_str = json.dumps(pos_dict, indent=2)
self.get_logger().info(
f"Calibration complete — residual RMS {residual:.4f} m\n{json_str}"
)
# Fill response
response.success = True
response.message = (
f"OK — residual RMS {residual*1000:.1f} mm"
)
response.anchor_ids = [int(a) for a in ids]
response.positions_x = [float(positions[i, 0]) for i in range(n_anchors)]
response.positions_y = [float(positions[i, 1]) for i in range(n_anchors)]
response.positions_z = [0.0] * n_anchors
response.residual_rms_m = float(residual)
response.anchor_positions_json = json_str
return response
# ── Per-pair measurement ───────────────────────────────────────────────
def _measure_pair(
self,
port: serial.Serial,
my_id: int,
peer_id: int,
n_samples: int,
) -> Optional[float]:
"""
Send AT+PEER_RANGE=<peer_id> `n_samples` times over `port`.
Returns the mean range in metres, or None if all attempts fail.
"""
measurements: list[float] = []
for _ in range(n_samples):
cmd = f"AT+PEER_RANGE={peer_id}\r\n"
port.write(cmd.encode())
port.flush()
deadline = time.monotonic() + self._timeout
while time.monotonic() < deadline:
line = port.readline().decode(errors="replace").strip()
if not line:
continue
m = _PEER_RANGE_RE.match(line)
if m:
# +PEER_RANGE:<my_id>,<peer_id>,<mm>,<rssi>
if int(m.group(1)) == my_id and int(m.group(2)) == peer_id:
mm = float(m.group(3))
measurements.append(mm / 1000.0) # mm → m
break
if _PEER_ERR_RE.match(line):
break # timeout from firmware — try next sample
time.sleep(0.05) # inter-command gap
if not measurements:
return None
return float(np.mean(measurements))
# ── Helpers ────────────────────────────────────────────────────────────────
def _close_all(ports: dict) -> None:
for p in ports.values():
try:
p.close()
except Exception:
pass
def main(args=None) -> None:
rclpy.init(args=args)
node = CalibrationNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.try_shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,140 @@
"""
mds_math.py Classical Multi-Dimensional Scaling for UWB anchor calibration
(Issue #602)
Given an N×N symmetric distance matrix D (in metres), recover the 2-D
Cartesian positions of N anchors using classical (metric) MDS.
Algorithm
1. Compute the double-centred squared-distance matrix B = -½ H H
where H = I - (1/N) 11 (centering matrix).
2. Eigen-decompose B = V Λ Vᵀ (symmetric).
3. Keep the 2 largest positive eigenvalues (λ₁, λ₂).
4. Positions X = V[:, :2] · diag(λ₁, λ₂).
The recovered configuration is up to a global rigid transformation
(rotation, reflection, translation). The caller is responsible for
anchoring the coordinate frame (e.g. fix anchor-0 at origin, anchor-1
on the positive-X axis).
Usage
from saltybot_uwb_calibration.mds_math import mds_2d, anchor_frame_align
D = np.array([[0, d01, d02, d03],
[d01, 0, d12, d13],
...])
positions, residual = mds_2d(D) # (N,2) array, float RMS (m)
positions = anchor_frame_align(positions) # align frame: anchor-0 at origin
"""
from __future__ import annotations
import numpy as np
def mds_2d(D: np.ndarray) -> tuple[np.ndarray, float]:
"""
Recover 2-D anchor positions from pairwise distance matrix D.
Parameters
----------
D : (N, N) symmetric ndarray of float
Pairwise distances in metres. Diagonal must be 0.
N must be >= 3 (4+ recommended for overdetermined system).
Returns
-------
positions : (N, 2) ndarray
Recovered x, y coordinates in metres.
residual_rms : float
RMS of (D_measured - D_recovered) in metres across all unique pairs.
Raises
------
ValueError
If D is not square, not symmetric, or N < 3.
"""
D = np.asarray(D, dtype=float)
n = D.shape[0]
if D.ndim != 2 or D.shape[1] != n:
raise ValueError(f"D must be square, got shape {D.shape}")
if n < 3:
raise ValueError(f"Need at least 3 anchors, got {n}")
if not np.allclose(D, D.T, atol=1e-3):
raise ValueError("Distance matrix D is not symmetric")
# ── Step 1: double-centre D² ──────────────────────────────────────────
D2 = D ** 2
H = np.eye(n) - np.ones((n, n)) / n
B = -0.5 * H @ D2 @ H
# ── Step 2: eigendecomposition ────────────────────────────────────────
eigvals, eigvecs = np.linalg.eigh(B) # eigvals in ascending order
# ── Step 3: keep top-2 positive eigenvalues ───────────────────────────
idx = np.argsort(eigvals)[::-1] # descending
top2_idx = idx[:2]
lam = eigvals[top2_idx]
lam = np.maximum(lam, 0.0) # clamp numerical negatives
# ── Step 4: recover positions ─────────────────────────────────────────
V = eigvecs[:, top2_idx]
positions = V * np.sqrt(lam) # (N, 2)
# ── Residual RMS ──────────────────────────────────────────────────────
residual_rms = _residual(D, positions)
return positions, residual_rms
def anchor_frame_align(positions: np.ndarray) -> np.ndarray:
"""
Align recovered positions so that:
- anchor-0 is at the origin (0, 0)
- anchor-1 lies on the positive-X axis (y = 0)
This removes the MDS ambiguity (translation + rotation + reflection).
Parameters
----------
positions : (N, 2) ndarray
Returns
-------
(N, 2) ndarray aligned positions
"""
positions = np.array(positions, dtype=float)
if positions.shape[0] < 2:
return positions
# Translate so anchor-0 is at origin
positions -= positions[0]
# Rotate so anchor-1 is on positive-X axis
dx, dy = positions[1, 0], positions[1, 1]
angle = np.arctan2(dy, dx)
cos_a, sin_a = np.cos(-angle), np.sin(-angle)
R = np.array([[cos_a, -sin_a],
[sin_a, cos_a]])
positions = (R @ positions.T).T
# Ensure anchor-1 has positive X (handle reflection)
if positions[1, 0] < 0:
positions[:, 0] *= -1.0
return positions
def _residual(D_meas: np.ndarray, positions: np.ndarray) -> float:
"""Compute RMS of (measured - recovered) distances over all unique pairs."""
n = D_meas.shape[0]
errors = []
for i in range(n):
for j in range(i + 1, n):
d_rec = float(np.linalg.norm(positions[i] - positions[j]))
errors.append(D_meas[i, j] - d_rec)
if not errors:
return 0.0
return float(np.sqrt(np.mean(np.array(errors) ** 2)))

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/saltybot_uwb_calibration
[install]
install_scripts=$base/lib/saltybot_uwb_calibration

View File

@ -0,0 +1,29 @@
import os
from glob import glob
from setuptools import setup
package_name = "saltybot_uwb_calibration"
setup(
name=package_name,
version="0.1.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages",
[f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(os.path.join("share", package_name, "launch"), glob("launch/*.py")),
(os.path.join("share", package_name, "config"), glob("config/*.yaml")),
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="sl-uwb",
maintainer_email="sl-uwb@saltylab.local",
description="UWB anchor auto-calibration via MDS (Issue #602)",
license="Apache-2.0",
entry_points={
"console_scripts": [
"uwb_calibration = saltybot_uwb_calibration.calibration_node:main",
],
},
)

View File

@ -0,0 +1,142 @@
"""
Unit tests for saltybot_uwb_calibration.mds_math (Issue #602).
No ROS2 or hardware required pure numpy.
"""
import math
import sys
import os
import numpy as np
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from saltybot_uwb_calibration.mds_math import mds_2d, anchor_frame_align, _residual
def _dist_matrix(positions: np.ndarray) -> np.ndarray:
"""Build exact pairwise distance matrix from ground-truth positions."""
n = len(positions)
D = np.zeros((n, n))
for i in range(n):
for j in range(n):
D[i, j] = np.linalg.norm(positions[i] - positions[j])
return D
# ── Square configuration (4 anchors) ──────────────────────────────────────
GT_SQUARE = np.array([
[0.0, 0.0],
[4.0, 0.0],
[4.0, 3.0],
[0.0, 3.0],
])
class TestMdsSquare:
def test_residual_near_zero(self):
D = _dist_matrix(GT_SQUARE)
positions, rms = mds_2d(D)
assert rms < 1e-6, f"Residual too large: {rms}"
def test_correct_distances_preserved(self):
D = _dist_matrix(GT_SQUARE)
positions, _ = mds_2d(D)
positions = anchor_frame_align(positions)
D_rec = _dist_matrix(positions)
# All pairwise distances should match to < 1 mm
assert np.max(np.abs(D - D_rec)) < 1e-3
def test_anchor_0_at_origin(self):
D = _dist_matrix(GT_SQUARE)
positions, _ = mds_2d(D)
positions = anchor_frame_align(positions)
assert abs(positions[0, 0]) < 1e-9
assert abs(positions[0, 1]) < 1e-9
def test_anchor_1_on_positive_x(self):
D = _dist_matrix(GT_SQUARE)
positions, _ = mds_2d(D)
positions = anchor_frame_align(positions)
assert positions[1, 0] > 0
assert abs(positions[1, 1]) < 1e-9
# ── Non-uniform configuration (5 anchors) ─────────────────────────────────
GT_5 = np.array([
[0.0, 0.0],
[5.0, 0.0],
[5.0, 4.0],
[2.5, 6.0],
[0.0, 4.0],
])
class TestMds5Anchors:
def test_residual_near_zero(self):
D = _dist_matrix(GT_5)
positions, rms = mds_2d(D)
assert rms < 1e-6
def test_shape(self):
D = _dist_matrix(GT_5)
positions, _ = mds_2d(D)
assert positions.shape == (5, 2)
# ── Noisy distances (simulate UWB measurement noise) ──────────────────────
class TestMdsNoisyInput:
def test_residual_acceptable_with_5cm_noise(self):
rng = np.random.default_rng(42)
D = _dist_matrix(GT_SQUARE)
noise = rng.normal(0, 0.05, D.shape)
noise = (noise + noise.T) / 2 # keep symmetric
np.fill_diagonal(noise, 0)
D_noisy = np.maximum(D + noise, 0)
positions, rms = mds_2d(D_noisy)
# With 5 cm noise the residual should be sub-10 cm
assert rms < 0.10, f"Residual {rms:.4f} m too large"
# ── Error handling ─────────────────────────────────────────────────────────
class TestMdsErrors:
def test_non_square_raises(self):
with pytest.raises(ValueError):
mds_2d(np.ones((3, 4)))
def test_too_few_anchors_raises(self):
with pytest.raises(ValueError):
mds_2d(np.array([[0.0, 1.0], [1.0, 0.0]]))
def test_asymmetric_raises(self):
D = _dist_matrix(GT_SQUARE)
D[0, 1] += 0.5 # break symmetry
with pytest.raises(ValueError):
mds_2d(D)
# ── anchor_frame_align ─────────────────────────────────────────────────────
class TestFrameAlign:
def test_translated_positions_aligned(self):
pts = GT_SQUARE.copy() + np.array([10.0, -5.0])
aligned = anchor_frame_align(pts)
assert abs(aligned[0, 0]) < 1e-9
assert abs(aligned[0, 1]) < 1e-9
def test_rotated_positions_aligned(self):
angle = math.pi / 3
R = np.array([[math.cos(angle), -math.sin(angle)],
[math.sin(angle), math.cos(angle)]])
pts = (R @ GT_SQUARE.T).T
aligned = anchor_frame_align(pts)
assert aligned[1, 0] > 0
assert abs(aligned[1, 1]) < 1e-9
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@ -0,0 +1,12 @@
cmake_minimum_required(VERSION 3.8)
project(saltybot_uwb_calibration_msgs)
find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"srv/CalibrateAnchors.srv"
)
ament_export_dependencies(rosidl_default_runtime)
ament_package()

View File

@ -0,0 +1,23 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_uwb_calibration_msgs</name>
<version>0.1.0</version>
<description>
Service definitions for UWB anchor auto-calibration (Issue #602).
Provides CalibrateAnchors.srv used by saltybot_uwb_calibration node.
</description>
<maintainer email="sl-uwb@saltylab.local">sl-uwb</maintainer>
<license>Apache-2.0</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<buildtool_depend>rosidl_default_generators</buildtool_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>

View File

@ -0,0 +1,29 @@
# CalibrateAnchors.srv — UWB anchor auto-calibration service (Issue #602)
#
# Request: list of anchor IDs to calibrate (minimum 4).
# If anchor_ids is empty, the node uses all discovered anchors.
#
# Each anchor is commanded via AT+PEER_RANGE=<peer_id> to range against every
# other anchor. The resulting N×N distance matrix is passed through classical
# MDS to recover 2-D positions. Results are written to anchor_positions_json.
#
# Response: success flag, per-anchor [x, y, z] positions, residual RMS error,
# and JSON string suitable for dropping into fusion_params.yaml.
# ── Request ────────────────────────────────────────────────────────────────
uint8[] anchor_ids # IDs of anchors to calibrate (empty = auto-discover)
uint8 n_samples # range measurements to average per pair (default 5)
---
# ── Response ───────────────────────────────────────────────────────────────
bool success
string message # human-readable status / error description
uint8[] anchor_ids # same order as positions below
float64[] positions_x # metres, MDS-recovered X (2-D plane)
float64[] positions_y # metres, MDS-recovered Y
float64[] positions_z # metres, always 0.0 (flat-floor assumption)
float64 residual_rms_m # RMS of |D_measured - D_recovered| across all pairs
string anchor_positions_json # JSON: {"0": [x,y,z], "1": [x,y,z], ...}

View File

@ -1,4 +1,5 @@
#include "balance.h" #include "balance.h"
#include "slope_estimator.h"
#include "config.h" #include "config.h"
#include <math.h> #include <math.h>
@ -19,6 +20,9 @@ void balance_init(balance_t *b) {
/* Safety defaults from config */ /* Safety defaults from config */
b->max_tilt = MAX_TILT_DEG; b->max_tilt = MAX_TILT_DEG;
b->max_speed = MAX_SPEED_LIMIT; b->max_speed = MAX_SPEED_LIMIT;
/* Slope estimator */
slope_estimator_init(&b->slope);
} }
void balance_update(balance_t *b, const IMUData *imu, float dt) { void balance_update(balance_t *b, const IMUData *imu, float dt) {
@ -28,12 +32,16 @@ void balance_update(balance_t *b, const IMUData *imu, float dt) {
b->pitch_deg = imu->pitch; b->pitch_deg = imu->pitch;
b->pitch_rate = imu->pitch_rate; b->pitch_rate = imu->pitch_rate;
/* Advance slope estimator (no-op when not armed) */
slope_estimator_update(&b->slope, b->pitch_deg, dt);
/* Safety: tilt cutoff */ /* Safety: tilt cutoff */
if (b->state == BALANCE_ARMED) { if (b->state == BALANCE_ARMED) {
if (fabsf(b->pitch_deg) > b->max_tilt) { if (fabsf(b->pitch_deg) > b->max_tilt) {
b->state = BALANCE_TILT_FAULT; b->state = BALANCE_TILT_FAULT;
b->motor_cmd = 0; b->motor_cmd = 0;
b->integral = 0.0f; b->integral = 0.0f;
slope_estimator_reset(&b->slope);
return; return;
} }
} }
@ -45,8 +53,9 @@ void balance_update(balance_t *b, const IMUData *imu, float dt) {
return; return;
} }
/* PID */ /* PID — subtract slope estimate so controller balances on incline */
float error = b->setpoint - b->pitch_deg; float tilt_corrected = b->pitch_deg - slope_estimator_get_deg(&b->slope);
float error = b->setpoint - tilt_corrected;
/* Proportional */ /* Proportional */
float p_term = b->kp * error; float p_term = b->kp * error;
@ -85,4 +94,5 @@ void balance_disarm(balance_t *b) {
b->state = BALANCE_DISARMED; b->state = BALANCE_DISARMED;
b->motor_cmd = 0; b->motor_cmd = 0;
b->integral = 0.0f; b->integral = 0.0f;
slope_estimator_reset(&b->slope);
} }

View File

@ -541,6 +541,30 @@ void jlink_send_sched_telemetry(const jlink_tlm_sched_t *tlm)
jlink_tx_locked(frame, (uint16_t)(3u + plen + 3u)); jlink_tx_locked(frame, (uint16_t)(3u + plen + 3u));
} }
/* ---- jlink_send_slope_tlm() -- Issue #600 ---- */
void jlink_send_slope_tlm(const jlink_tlm_slope_t *tlm)
{
/*
* Frame: [STX][LEN][0x88][4 bytes SLOPE][CRC_hi][CRC_lo][ETX]
* Total: 1+1+1+4+2+1 = 10 bytes
*/
static uint8_t frame[10];
const uint8_t plen = (uint8_t)sizeof(jlink_tlm_slope_t); /* 4 */
const uint8_t len = 1u + plen; /* CMD byte + payload */
frame[0] = JLINK_STX;
frame[1] = len;
frame[2] = JLINK_TLM_SLOPE;
memcpy(&frame[3], tlm, plen);
uint16_t crc = crc16_xmodem(&frame[2], len);
frame[3 + plen] = (uint8_t)(crc >> 8);
frame[3 + plen + 1] = (uint8_t)(crc & 0xFFu);
frame[3 + plen + 2] = JLINK_ETX;
jlink_tx_locked(frame, sizeof(frame));
}
/* ---- jlink_send_fault_log() -- Issue #565 ---- */ /* ---- jlink_send_fault_log() -- Issue #565 ---- */
void jlink_send_fault_log(const jlink_tlm_fault_log_t *fl) void jlink_send_fault_log(const jlink_tlm_fault_log_t *fl)
{ {

82
src/slope_estimator.c Normal file
View File

@ -0,0 +1,82 @@
/*
* slope_estimator.c slow-adapting terrain slope estimator (Issue #600).
*
* First-order IIR low-pass filter on fused IMU pitch with tau = SLOPE_TAU_S (5 s).
* The long time constant means fast balance corrections and transients are
* ignored; only sustained pitch bias (i.e., genuine slope) accumulates.
*
* Estimate is clamped to ±SLOPE_MAX_DEG (15°) per the safety requirement.
*/
#include "slope_estimator.h"
#include "jlink.h"
/* ---- slope_estimator_init() ---- */
void slope_estimator_init(slope_estimator_t *se)
{
se->estimate_deg = 0.0f;
se->enabled = true;
/* Initialize so first send_tlm call fires immediately */
se->last_tlm_ms = (uint32_t)(-(uint32_t)(1000u / (SLOPE_TLM_HZ > 0u ? SLOPE_TLM_HZ : 1u)));
}
/* ---- slope_estimator_reset() ---- */
void slope_estimator_reset(slope_estimator_t *se)
{
se->estimate_deg = 0.0f;
}
/* ---- slope_estimator_update() ---- */
void slope_estimator_update(slope_estimator_t *se, float pitch_deg, float dt)
{
if (!se->enabled || dt <= 0.0f) return;
/*
* First-order IIR: alpha = dt / (tau + dt)
* At 1 kHz (dt=0.001): alpha 0.0002 time to ~63% 5 s
* At 100 Hz (dt=0.01): alpha 0.002 time to ~63% 5 s (same tau)
*/
float alpha = dt / (SLOPE_TAU_S + dt);
float raw = se->estimate_deg * (1.0f - alpha) + pitch_deg * alpha;
/* Clamp to ±SLOPE_MAX_DEG */
if (raw > SLOPE_MAX_DEG) raw = SLOPE_MAX_DEG;
if (raw < -SLOPE_MAX_DEG) raw = -SLOPE_MAX_DEG;
se->estimate_deg = raw;
}
/* ---- slope_estimator_get_deg() ---- */
float slope_estimator_get_deg(const slope_estimator_t *se)
{
if (!se->enabled) return 0.0f;
return se->estimate_deg;
}
/* ---- slope_estimator_set_enabled() ---- */
void slope_estimator_set_enabled(slope_estimator_t *se, bool en)
{
se->enabled = en;
}
/* ---- slope_estimator_send_tlm() ---- */
void slope_estimator_send_tlm(const slope_estimator_t *se, uint32_t now_ms)
{
if (SLOPE_TLM_HZ == 0u) return;
uint32_t interval_ms = 1000u / SLOPE_TLM_HZ;
if ((now_ms - se->last_tlm_ms) < interval_ms) return;
/* Cast away const for timestamp update -- only mutable field */
((slope_estimator_t *)se)->last_tlm_ms = now_ms;
jlink_tlm_slope_t tlm;
/* Scale to ×100 for 0.01° resolution, clamped to int16 range */
float scaled = se->estimate_deg * 100.0f;
if (scaled > 32767.0f) scaled = 32767.0f;
if (scaled < -32768.0f) scaled = -32768.0f;
tlm.slope_x100 = (int16_t)scaled;
tlm.active = se->enabled ? 1u : 0u;
tlm._pad = 0u;
jlink_send_slope_tlm(&tlm);
}

421
test/test_slope_estimator.c Normal file
View File

@ -0,0 +1,421 @@
/*
* test_slope_estimator.c Unit tests for slope estimator (Issue #600).
*
* Build (host, no hardware):
* gcc -I include -I test/stubs -DTEST_HOST -lm \
* -o /tmp/test_slope src/slope_estimator.c test/test_slope_estimator.c
*
* All tests are self-contained; no HAL, no ADC, no UART required.
* slope_estimator.c calls jlink_send_slope_tlm() which is stubbed below.
*/
/* ---- Stubs ---- */
/* Prevent jlink.h from pulling in HAL / pid_flash types */
#define JLINK_H
#include <stdint.h>
#include <stdbool.h>
/* Minimal jlink_tlm_slope_t matching the real definition */
typedef struct __attribute__((packed)) {
int16_t slope_x100;
uint8_t active;
uint8_t _pad;
} jlink_tlm_slope_t;
/* Capture last transmitted tlm for inspection */
static jlink_tlm_slope_t g_last_tlm;
static int g_tlm_count = 0;
void jlink_send_slope_tlm(const jlink_tlm_slope_t *tlm)
{
g_last_tlm = *tlm;
g_tlm_count++;
}
/* ---- Include implementation directly ---- */
#include "../src/slope_estimator.c"
/* ---- Test framework ---- */
#include <stdio.h>
#include <math.h>
#include <string.h>
static int g_pass = 0;
static int g_fail = 0;
#define ASSERT(cond, msg) do { \
if (cond) { g_pass++; } \
else { g_fail++; printf("FAIL [%s:%d] %s\n", __FILE__, __LINE__, msg); } \
} while(0)
#define ASSERT_NEAR(a, b, tol, msg) \
ASSERT(fabsf((a)-(b)) <= (tol), msg)
/* ---- Tests ---- */
static void test_init_zeroes_state(void)
{
slope_estimator_t se;
/* dirty the memory first */
memset(&se, 0xAB, sizeof(se));
slope_estimator_init(&se);
ASSERT(se.estimate_deg == 0.0f, "init: estimate_deg == 0");
ASSERT(se.enabled == true, "init: enabled == true");
}
static void test_get_when_disabled_returns_zero(void)
{
slope_estimator_t se;
slope_estimator_init(&se);
/* Force a non-zero estimate by running a few ticks, then disable */
slope_estimator_update(&se, 10.0f, 0.1f);
slope_estimator_set_enabled(&se, false);
ASSERT(slope_estimator_get_deg(&se) == 0.0f,
"get while disabled must return 0");
}
static void test_update_when_disabled_no_change(void)
{
slope_estimator_t se;
slope_estimator_init(&se);
slope_estimator_set_enabled(&se, false);
slope_estimator_update(&se, 10.0f, 0.01f);
ASSERT(se.estimate_deg == 0.0f,
"update while disabled must not change estimate");
}
static void test_update_zero_dt_no_change(void)
{
slope_estimator_t se;
slope_estimator_init(&se);
slope_estimator_update(&se, 5.0f, 0.0f);
ASSERT(se.estimate_deg == 0.0f,
"update with dt=0 must not change estimate");
}
static void test_update_negative_dt_no_change(void)
{
slope_estimator_t se;
slope_estimator_init(&se);
slope_estimator_update(&se, 5.0f, -0.001f);
ASSERT(se.estimate_deg == 0.0f,
"update with dt<0 must not change estimate");
}
static void test_single_step_converges_toward_input(void)
{
slope_estimator_t se;
slope_estimator_init(&se);
/* One update with dt=0.01s, pitch=10 deg */
slope_estimator_update(&se, 10.0f, 0.01f);
/* alpha = 0.01 / (5.0 + 0.01) ≈ 0.002; new estimate ≈ 0 + 0.002*10 = 0.02 */
ASSERT(se.estimate_deg > 0.0f,
"estimate should move toward positive pitch");
ASSERT(se.estimate_deg < 10.0f,
"estimate should not jump to full pitch in one step");
}
static void test_iir_reaches_63pct_in_one_tau(void)
{
/* Run estimator at 1 kHz for exactly SLOPE_TAU_S seconds */
slope_estimator_t se;
slope_estimator_init(&se);
const float pitch = 10.0f;
const float dt = 0.001f;
const int steps = (int)(SLOPE_TAU_S / dt); /* 5000 steps */
for (int i = 0; i < steps; i++) {
slope_estimator_update(&se, pitch, dt);
}
/* IIR should be at ≈63.2% of the step */
float expected_lo = pitch * 0.60f;
float expected_hi = pitch * 0.66f;
ASSERT(se.estimate_deg >= expected_lo && se.estimate_deg <= expected_hi,
"IIR should reach ~63% of step at t=tau");
}
static void test_iir_reaches_63pct_at_100hz(void)
{
/* Same tau, but at 100 Hz */
slope_estimator_t se;
slope_estimator_init(&se);
const float pitch = 8.0f;
const float dt = 0.01f;
const int steps = (int)(SLOPE_TAU_S / dt); /* 500 steps */
for (int i = 0; i < steps; i++) {
slope_estimator_update(&se, pitch, dt);
}
float expected_lo = pitch * 0.60f;
float expected_hi = pitch * 0.66f;
ASSERT(se.estimate_deg >= expected_lo && se.estimate_deg <= expected_hi,
"IIR 100 Hz: should reach ~63% at t=tau");
}
static void test_positive_clamp(void)
{
slope_estimator_t se;
slope_estimator_init(&se);
/* Drive with a huge pitch for a long time — estimate must clamp at SLOPE_MAX_DEG */
for (int i = 0; i < 100000; i++) {
slope_estimator_update(&se, 90.0f, 0.001f);
}
ASSERT_NEAR(se.estimate_deg, SLOPE_MAX_DEG, 0.001f,
"estimate must clamp at +SLOPE_MAX_DEG");
}
static void test_negative_clamp(void)
{
slope_estimator_t se;
slope_estimator_init(&se);
for (int i = 0; i < 100000; i++) {
slope_estimator_update(&se, -90.0f, 0.001f);
}
ASSERT_NEAR(se.estimate_deg, -SLOPE_MAX_DEG, 0.001f,
"estimate must clamp at -SLOPE_MAX_DEG");
}
static void test_reset_zeroes_estimate(void)
{
slope_estimator_t se;
slope_estimator_init(&se);
/* Build up some estimate */
for (int i = 0; i < 1000; i++) {
slope_estimator_update(&se, 12.0f, 0.001f);
}
ASSERT(se.estimate_deg > 0.1f, "pre-reset: estimate should be non-zero");
slope_estimator_reset(&se);
ASSERT(se.estimate_deg == 0.0f, "post-reset: estimate should be zero");
/* enabled state must be preserved */
ASSERT(se.enabled == true, "reset must not change enabled flag");
}
static void test_reset_preserves_disabled_state(void)
{
slope_estimator_t se;
slope_estimator_init(&se);
slope_estimator_set_enabled(&se, false);
slope_estimator_reset(&se);
ASSERT(se.enabled == false, "reset must not re-enable disabled estimator");
}
static void test_balance_setpoint_compensation(void)
{
/*
* Simulate: robot on 5-deg slope, pitch = 5 deg, slope estimate converges.
* After convergence, tilt_corrected = pitch - slope 0.
* Verify that the effective PID error (setpoint=0, tilt_corrected0) is near 0.
*/
slope_estimator_t se;
slope_estimator_init(&se);
const float slope_deg = 5.0f;
const float dt = 0.001f;
const int steps = (int)(3.0f * SLOPE_TAU_S / dt); /* 3 tau = ~95% converged */
for (int i = 0; i < steps; i++) {
/* Robot holds steady on slope — pitch stays constant */
slope_estimator_update(&se, slope_deg, dt);
}
float est = slope_estimator_get_deg(&se);
float tilt_corrected = slope_deg - est;
/* After 3 tau, estimate should be within 5% of slope */
ASSERT(est >= slope_deg * 0.90f, "3-tau estimate >= 90% of slope");
/* tilt_corrected should be close to zero — minimal PID error */
ASSERT(fabsf(tilt_corrected) < slope_deg * 0.10f,
"corrected tilt error < 10% of slope after 3 tau");
}
static void test_fast_tilt_not_tracked(void)
{
/*
* Simulate: robot starts balanced (pitch0), then tips to 20 deg suddenly
* for 100 ms (balance intervention), then returns to 0.
* The slope estimate should not move significantly.
*/
slope_estimator_t se;
slope_estimator_init(&se);
const float dt = 0.001f;
/* 2 s of stable balancing at pitch ≈ 0 */
for (int i = 0; i < 2000; i++) {
slope_estimator_update(&se, 0.0f, dt);
}
float before = se.estimate_deg;
/* 100 ms spike to 20 deg (fast tilt) */
for (int i = 0; i < 100; i++) {
slope_estimator_update(&se, 20.0f, dt);
}
/* Return to 0 for another 100 ms */
for (int i = 0; i < 100; i++) {
slope_estimator_update(&se, 0.0f, dt);
}
float after = se.estimate_deg;
/* 200ms / 5000ms = 4% of tau — estimate should move < 1 deg */
ASSERT(fabsf(after - before) < 1.0f,
"fast tilt transient should not significantly move slope estimate");
}
static void test_slope_change_tracks_slowly(void)
{
/* Robot transitions from flat (0°) to 10° slope at t=0.
* After 1 tau, estimate should be ~63% of 10 = ~6.3 deg. */
slope_estimator_t se;
slope_estimator_init(&se);
const float dt = 0.001f;
const int steps = (int)(SLOPE_TAU_S / dt);
for (int i = 0; i < steps; i++) {
slope_estimator_update(&se, 10.0f, dt);
}
ASSERT(se.estimate_deg >= 6.0f && se.estimate_deg <= 6.5f,
"slope change: 1-tau estimate should be 6.0-6.5 deg");
}
static void test_symmetry_negative_slope(void)
{
slope_estimator_t se;
slope_estimator_init(&se);
const float dt = 0.001f;
const int steps = (int)(SLOPE_TAU_S / dt);
for (int i = 0; i < steps; i++) {
slope_estimator_update(&se, -8.0f, dt);
}
/* Should be ~63% of -8 */
ASSERT(se.estimate_deg <= -4.0f && se.estimate_deg >= -5.5f,
"negative slope: estimate should converge symmetrically");
}
static void test_enable_disable_toggle(void)
{
slope_estimator_t se;
slope_estimator_init(&se);
/* Build estimate */
for (int i = 0; i < 2000; i++) {
slope_estimator_update(&se, 10.0f, 0.001f);
}
float val = se.estimate_deg;
ASSERT(val > 0.1f, "estimate built before disable");
/* Disable: estimate frozen */
slope_estimator_set_enabled(&se, false);
slope_estimator_update(&se, 10.0f, 0.001f);
ASSERT(se.estimate_deg == val, "estimate frozen while disabled");
/* get returns 0 when disabled */
ASSERT(slope_estimator_get_deg(&se) == 0.0f, "get returns 0 while disabled");
/* Re-enable: estimate resumes */
slope_estimator_set_enabled(&se, true);
slope_estimator_update(&se, 10.0f, 0.001f);
ASSERT(se.estimate_deg > val, "estimate resumes after re-enable");
}
static void test_tlm_rate_limiting(void)
{
slope_estimator_t se;
slope_estimator_init(&se);
g_tlm_count = 0;
/* Call at 1000 Hz for 3 seconds → should send 3 TLMs (at 1 Hz rate limit) */
for (uint32_t ms = 0; ms < 3000; ms++) {
slope_estimator_send_tlm(&se, ms);
}
/* First call at ms=0 sends, then 1000ms, 2000ms → 3 total */
ASSERT(g_tlm_count == 3, "TLM rate-limited to SLOPE_TLM_HZ (1 Hz)");
}
static void test_tlm_payload_encoding(void)
{
slope_estimator_t se;
slope_estimator_init(&se);
/* Force a known estimate */
se.estimate_deg = 7.5f;
se.enabled = true;
g_tlm_count = 0;
g_last_tlm.slope_x100 = 0;
/* Trigger a TLM by advancing time enough */
slope_estimator_send_tlm(&se, 0u);
ASSERT(g_tlm_count == 1, "TLM sent");
ASSERT(g_last_tlm.slope_x100 == 750, "7.5 deg -> slope_x100 = 750");
ASSERT(g_last_tlm.active == 1u, "active flag set when enabled");
}
static void test_tlm_disabled_active_flag(void)
{
slope_estimator_t se;
slope_estimator_init(&se);
slope_estimator_set_enabled(&se, false);
se.estimate_deg = 3.0f;
g_tlm_count = 0;
slope_estimator_send_tlm(&se, 0u);
ASSERT(g_tlm_count == 1, "TLM sent when disabled");
ASSERT(g_last_tlm.active == 0u, "active flag clear when disabled");
/* slope_x100 reflects stored estimate but get() returns 0 */
ASSERT(g_last_tlm.slope_x100 == 300, "slope_x100 encodes stored estimate");
}
static void test_tlm_clamped_to_int16(void)
{
slope_estimator_t se;
slope_estimator_init(&se);
/* Manually set beyond int16 range for test */
se.estimate_deg = 400.0f; /* 400 * 100 = 40000, within int16 */
se.enabled = true;
g_tlm_count = 0;
slope_estimator_send_tlm(&se, 0u);
ASSERT(g_last_tlm.slope_x100 == 32767, "large estimate clamped to INT16_MAX");
}
static void test_multiple_resets_idempotent(void)
{
slope_estimator_t se;
slope_estimator_init(&se);
slope_estimator_reset(&se);
slope_estimator_reset(&se);
ASSERT(se.estimate_deg == 0.0f, "multiple resets: still zero");
ASSERT(se.enabled == true, "multiple resets: still enabled");
}
/* ---- main ---- */
int main(void)
{
printf("=== test_slope_estimator ===\n");
test_init_zeroes_state();
test_get_when_disabled_returns_zero();
test_update_when_disabled_no_change();
test_update_zero_dt_no_change();
test_update_negative_dt_no_change();
test_single_step_converges_toward_input();
test_iir_reaches_63pct_in_one_tau();
test_iir_reaches_63pct_at_100hz();
test_positive_clamp();
test_negative_clamp();
test_reset_zeroes_estimate();
test_reset_preserves_disabled_state();
test_balance_setpoint_compensation();
test_fast_tilt_not_tracked();
test_slope_change_tracks_slowly();
test_symmetry_negative_slope();
test_enable_disable_toggle();
test_tlm_rate_limiting();
test_tlm_payload_encoding();
test_tlm_disabled_active_flag();
test_tlm_clamped_to_int16();
test_multiple_resets_idempotent();
printf("\nResults: %d passed, %d failed\n", g_pass, g_fail);
return g_fail ? 1 : 0;
}