Compare commits

...

4 Commits

Author SHA1 Message Date
6c00d6a321 Merge pull request 'feat: UWB anchor auto-calibration via inter-anchor ranging + MDS (Issue #602)' (#608) from sl-uwb/issue-602-anchor-calibration into main 2026-03-14 15:54:56 -04:00
sl-uwb
82cc223fb8 feat: Add AT+PEER_RANGE= command for inter-anchor calibration (Issue #602)
- peer_range_once(): DS-TWR initiator role toward a peer anchor
  (POLL → RESP → FINAL, one-sided range estimate Ra - Da/2)
- AT+PEER_RANGE=<id>: returns +PEER_RANGE:<my>,<peer>,<mm>,<rssi>
  or +PEER_RANGE:ERR,<peer>,TIMEOUT

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-14 15:06:29 -04:00
5f03e4cbef feat: Tilt compensation for slopes (Issue #600)
Adds a slow-adapting terrain slope estimator (IIR tau=5s) that decouples
the robot's balance offset from genuine ground incline.  The balance
controller subtracts the slope estimate from measured pitch so the PID
balances around the slope surface rather than absolute vertical.

- include/slope_estimator.h + src/slope_estimator.c: first-order IIR
  filter clamped to ±15°; JLINK_TLM_SLOPE (0x88) telemetry at 1 Hz
- include/jlink.h + src/jlink.c: add JLINK_TLM_SLOPE (0x88),
  jlink_tlm_slope_t (4 bytes), jlink_send_slope_tlm()
- include/balance.h + src/balance.c: integrate slope_estimator into
  balance_t; update, reset on tilt-fault and disarm
- test/test_slope_estimator.c: 35 unit tests, all passing

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-14 15:04:58 -04:00
sl-uwb
587ca8a98e feat: UWB anchor auto-calibration via inter-anchor ranging + MDS (Issue #602)
Anchor firmware (esp32/uwb_anchor/src/main.cpp):
- Add peer_range_once(peer_id) — DS-TWR initiator role toward a peer anchor
- Add AT+PEER_RANGE=<id> command: triggers inter-anchor ranging and returns
  +PEER_RANGE:<my_id>,<peer_id>,<range_mm>,<rssi_dbm> (or ERR,TIMEOUT)

ROS2 package saltybot_uwb_calibration_msgs:
- CalibrateAnchors.srv: request (anchor_ids[], n_samples) →
  response (positions_x/y/z[], residual_rms_m, anchor_positions_json)

ROS2 package saltybot_uwb_calibration:
- mds_math.py: classical MDS (double-centred D², eigendecomposition),
  anchor_frame_align() to fix anchor-0 at origin / anchor-1 on +X
- calibration_node.py: /saltybot/uwb/calibrate_anchors service —
  opens anchor serial ports, rounds-robin AT+PEER_RANGE= for all pairs,
  builds N×N distance matrix, runs MDS, returns JSON anchor positions
- 12/12 unit tests passing (test/test_mds_math.py)
- Supports ≥ 4 anchors; 5× averaged ranging per pair by default

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-14 15:03:53 -04:00
21 changed files with 1518 additions and 2 deletions

View File

@ -27,6 +27,8 @@
* AT+RANGE_ADDR=<hex_addr> pair with specific tag (filter others) * AT+RANGE_ADDR=<hex_addr> pair with specific tag (filter others)
* AT+RANGE_ADDR= clear pairing (accept all tags) * AT+RANGE_ADDR= clear pairing (accept all tags)
* AT+ID? returns +ID:<anchor_id> * AT+ID? returns +ID:<anchor_id>
* AT+PEER_RANGE=<id> inter-anchor DS-TWR (for auto-calibration)
* +PEER_RANGE:<my>,<peer>,<mm>,<rssi>
* *
* Pin mapping Makerfabs ESP32 UWB Pro * Pin mapping Makerfabs ESP32 UWB Pro
* *
@ -188,6 +190,116 @@ static float rx_power_dbm(void) {
return 10.0f * log10f((f * f) / (n * n)) - 121.74f; return 10.0f * log10f((f * f) / (n * n)) - 121.74f;
} }
/* ── Peer-anchor ranging (initiator role, for auto-calibration) ─── */
/* Timeout waiting for peer's RESP during inter-anchor ranging */
#define PEER_RX_RESP_TIMEOUT_US 3500
/* Block up to this many ms waiting for the interrupt flags */
#define PEER_WAIT_MS 20
/* Initiate a single DS-TWR exchange toward peer anchor `peer_id`.
* Returns range in mm (>=0) on success, or -1 on timeout/error.
* RSSI is stored in *rssi_out if non-null.
*
* Exchange:
* This anchor peer POLL
* peer This RESP (carries T_poll_rx, T_resp_tx)
* This anchor peer FINAL (carries Ra, Da)
* This side computes its own range estimate from Ra/Da.
*/
static int32_t peer_range_once(uint8_t peer_id, float *rssi_out) {
/* ── Reset interrupt flags ── */
g_tx_done = g_rx_ok = g_rx_err = g_rx_to = false;
/* ── Build POLL frame ── */
uint8_t poll_buf[FRAME_HDR + FCS_LEN] = {
FTYPE_POLL,
(uint8_t)ANCHOR_ID,
peer_id,
};
dwt_writetxdata(sizeof(poll_buf), poll_buf, 0);
dwt_writetxfctrl(sizeof(poll_buf), 0, 1);
dwt_setrxtimeout(PEER_RX_RESP_TIMEOUT_US);
dwt_rxenable(DWT_START_RX_IMMEDIATE);
if (dwt_starttx(DWT_START_TX_IMMEDIATE | DWT_RESPONSE_EXPECTED) != DWT_SUCCESS)
return -1;
/* Wait for TX done */
uint32_t t0 = millis();
while (!g_tx_done && !g_rx_err && !g_rx_to) {
if (millis() - t0 > (uint32_t)PEER_WAIT_MS) return -1;
}
if (g_rx_err || g_rx_to) return -1;
g_tx_done = false;
/* Capture T_poll_tx */
uint64_t T_poll_tx;
dwt_readtxtimestamp((uint8_t *)&T_poll_tx);
T_poll_tx &= DWT_TS_MASK;
/* Wait for RESP */
t0 = millis();
while (!g_rx_ok && !g_rx_err && !g_rx_to) {
if (millis() - t0 > (uint32_t)PEER_WAIT_MS) return -1;
}
if (!g_rx_ok) return -1;
/* Validate RESP */
if (g_rx_len < (uint32_t)(FRAME_HDR + RESP_PAYLOAD + FCS_LEN)) return -1;
if (g_rx_buf[0] != FTYPE_RESP) return -1;
if (g_rx_buf[1] != peer_id) return -1;
if (g_rx_buf[2] != (uint8_t)ANCHOR_ID) return -1;
uint64_t T_resp_rx;
dwt_readrxtimestamp((uint8_t *)&T_resp_rx);
T_resp_rx &= DWT_TS_MASK;
/* Extract peer timestamps from RESP payload */
const uint8_t *pl = g_rx_buf + FRAME_HDR;
uint64_t T_poll_rx_peer = ts_read(pl);
uint64_t T_resp_tx_peer = ts_read(pl + 5);
/* DS-TWR Ra, Da */
uint64_t Ra = ts_diff(T_resp_rx, T_poll_tx);
uint64_t Da = ts_diff(T_resp_tx_peer, T_poll_rx_peer);
g_rx_ok = g_rx_err = g_rx_to = false;
/* ── Build FINAL frame ── */
uint8_t final_buf[FRAME_HDR + FINAL_PAYLOAD + FCS_LEN];
final_buf[0] = FTYPE_FINAL;
final_buf[1] = (uint8_t)ANCHOR_ID;
final_buf[2] = peer_id;
ts_write(final_buf + FRAME_HDR, Ra);
ts_write(final_buf + FRAME_HDR + 5, Da);
ts_write(final_buf + FRAME_HDR + 10, (uint64_t)0); /* Db placeholder */
dwt_setrxtimeout(0);
dwt_writetxdata(sizeof(final_buf), final_buf, 0);
dwt_writetxfctrl(sizeof(final_buf), 0, 1);
if (dwt_starttx(DWT_START_TX_IMMEDIATE) != DWT_SUCCESS) return -1;
t0 = millis();
while (!g_tx_done && !g_rx_err) {
if (millis() - t0 > (uint32_t)PEER_WAIT_MS) return -1;
}
g_tx_done = false;
/* Simplified one-sided range estimate: tof = Ra - Da/2 */
double tof_s = ticks_to_s(Ra) - ticks_to_s(Da) / 2.0;
if (tof_s < 0.0) tof_s = 0.0;
int32_t range_mm = (int32_t)(tof_s * SPEED_OF_LIGHT * 1000.0);
if (rssi_out) *rssi_out = rx_power_dbm();
/* Re-enable normal RX for tag ranging */
dwt_setrxtimeout(0);
dwt_rxenable(DWT_START_RX_IMMEDIATE);
return range_mm;
}
/* ── AT command handler ─────────────────────────────────────────── */ /* ── AT command handler ─────────────────────────────────────────── */
static char g_at_buf[64]; static char g_at_buf[64];
@ -212,6 +324,23 @@ static void at_dispatch(const char *cmd) {
g_paired_tag_id = (uint8_t)strtoul(v, nullptr, 0); g_paired_tag_id = (uint8_t)strtoul(v, nullptr, 0);
Serial.printf("+OK:PAIRED=0x%02X\r\n", g_paired_tag_id); Serial.printf("+OK:PAIRED=0x%02X\r\n", g_paired_tag_id);
} }
} else if (strncmp(cmd, "AT+PEER_RANGE=", 14) == 0) {
/* Inter-anchor ranging for calibration.
* Usage: AT+PEER_RANGE=<peer_anchor_id>
* Response: +PEER_RANGE:<my_id>,<peer_id>,<range_mm>,<rssi_dbm>
* or: +PEER_RANGE:ERR,<peer_id>,TIMEOUT
*/
uint8_t peer_id = (uint8_t)strtoul(cmd + 14, nullptr, 0);
float rssi = 0.0f;
int32_t mm = peer_range_once(peer_id, &rssi);
if (mm >= 0) {
Serial.printf("+PEER_RANGE:%d,%d,%ld,%.1f\r\n",
ANCHOR_ID, peer_id, mm, (double)rssi);
} else {
Serial.printf("+PEER_RANGE:ERR,%d,TIMEOUT\r\n", peer_id);
}
} else { } else {
Serial.println("+ERR:UNKNOWN_CMD"); Serial.println("+ERR:UNKNOWN_CMD");
} }

View File

@ -3,6 +3,7 @@
#include <stdint.h> #include <stdint.h>
#include "mpu6000.h" #include "mpu6000.h"
#include "slope_estimator.h"
/* /*
* SaltyLab Balance Controller * SaltyLab Balance Controller
@ -36,6 +37,9 @@ typedef struct {
/* Safety */ /* Safety */
float max_tilt; /* Cutoff angle (degrees) */ float max_tilt; /* Cutoff angle (degrees) */
int16_t max_speed; /* Speed limit */ int16_t max_speed; /* Speed limit */
/* Slope compensation (Issue #600) */
slope_estimator_t slope;
} balance_t; } balance_t;
void balance_init(balance_t *b); void balance_init(balance_t *b);

View File

@ -87,6 +87,7 @@
#define JLINK_TLM_SCHED 0x85u /* jlink_tlm_sched_t (1+N*16 bytes, Issue #550) */ #define JLINK_TLM_SCHED 0x85u /* jlink_tlm_sched_t (1+N*16 bytes, Issue #550) */
#define JLINK_TLM_MOTOR_CURRENT 0x86u /* jlink_tlm_motor_current_t (8 bytes, Issue #584) */ #define JLINK_TLM_MOTOR_CURRENT 0x86u /* jlink_tlm_motor_current_t (8 bytes, Issue #584) */
#define JLINK_TLM_FAULT_LOG 0x87u /* jlink_tlm_fault_log_t (20 bytes, Issue #565) */ #define JLINK_TLM_FAULT_LOG 0x87u /* jlink_tlm_fault_log_t (20 bytes, Issue #565) */
#define JLINK_TLM_SLOPE 0x88u /* jlink_tlm_slope_t (4 bytes, Issue #600) */
/* ---- Telemetry STATUS payload (20 bytes, packed) ---- */ /* ---- Telemetry STATUS payload (20 bytes, packed) ---- */
typedef struct __attribute__((packed)) { typedef struct __attribute__((packed)) {
@ -164,6 +165,14 @@ typedef struct __attribute__((packed)) {
uint8_t _pad; /* reserved */ uint8_t _pad; /* reserved */
} jlink_tlm_motor_current_t; /* 8 bytes */ } jlink_tlm_motor_current_t; /* 8 bytes */
/* ---- Telemetry SLOPE payload (4 bytes, packed) Issue #600 ---- */
/* Published at SLOPE_TLM_HZ (1 Hz); reports slow-adapting terrain slope estimate. */
typedef struct __attribute__((packed)) {
int16_t slope_x100; /* slope estimate degrees x100 (0.01° resolution) */
uint8_t active; /* 1 = slope compensation enabled */
uint8_t _pad; /* reserved */
} jlink_tlm_slope_t; /* 4 bytes */
/* ---- Telemetry FAULT_LOG payload (20 bytes, packed) Issue #565 ---- */ /* ---- Telemetry FAULT_LOG payload (20 bytes, packed) Issue #565 ---- */
/* Sent on boot (if last fault != NONE) and in response to FAULT_LOG_GET. */ /* Sent on boot (if last fault != NONE) and in response to FAULT_LOG_GET. */
typedef struct __attribute__((packed)) { typedef struct __attribute__((packed)) {
@ -272,6 +281,13 @@ JLinkSchedSetBuf *jlink_get_sched_set(void);
*/ */
void jlink_send_motor_current_tlm(const jlink_tlm_motor_current_t *tlm); void jlink_send_motor_current_tlm(const jlink_tlm_motor_current_t *tlm);
/*
* jlink_send_slope_tlm(tlm) - transmit JLINK_TLM_SLOPE (0x88) frame
* (10 bytes total) to Jetson. Issue #600.
* Rate-limiting is handled by slope_estimator_send_tlm(); call from there only.
*/
void jlink_send_slope_tlm(const jlink_tlm_slope_t *tlm);
/* /*
* jlink_send_fault_log(fl) - transmit JLINK_TLM_FAULT_LOG (0x87) frame * jlink_send_fault_log(fl) - transmit JLINK_TLM_FAULT_LOG (0x87) frame
* (26 bytes) on boot (if fault log non-empty) and in response to * (26 bytes) on boot (if fault log non-empty) and in response to

101
include/slope_estimator.h Normal file
View File

@ -0,0 +1,101 @@
#ifndef SLOPE_ESTIMATOR_H
#define SLOPE_ESTIMATOR_H
#include <stdint.h>
#include <stdbool.h>
/*
* slope_estimator slow-adapting terrain slope estimator for Issue #600.
*
* On a slope the robot must lean slightly into the hill to stay balanced.
* The IMU pitch reading therefore includes both the robot's balance offset
* and the ground incline. This module decouples the two by tracking the
* slowly-changing DC component of the pitch signal with a first-order IIR
* low-pass filter (time constant SLOPE_TAU_S = 5 s).
*
* HOW IT WORKS:
* Every call to slope_estimator_update(pitch_deg, dt):
*
* alpha = dt / (SLOPE_TAU_S + dt) // ≈ 0.0002 at 1 kHz
* raw = slope * (1 - alpha) + pitch * alpha
* slope = clamp(raw, -SLOPE_MAX_DEG, +SLOPE_MAX_DEG)
*
* The IIR converges to the steady-state pitch in ~5 s. Fast tilt
* transients (balance corrections, steps, bumps) are attenuated by
* the long time constant and do not corrupt the estimate.
*
* INTEGRATION IN BALANCE PID:
* Subtract slope_estimate from the measured pitch before computing
* the PID error so the controller balances around the slope surface
* rather than absolute vertical:
*
* tilt_corrected = pitch_deg - slope_estimate_deg
* error = setpoint - tilt_corrected
*
* This is equivalent to continuously adjusting the balance setpoint
* to track the incline.
*
* SAFETY:
* - Estimate is clamped to ±SLOPE_MAX_DEG (15°) to prevent drift from
* extreme falls being mistaken for genuine slopes.
* - slope_estimator_reset() zeroes the state; call on disarm or after
* a tilt fault so re-arming starts fresh.
*
* TELEMETRY:
* JLINK_TLM_SLOPE (0x88) published at SLOPE_TLM_HZ (1 Hz):
* jlink_tlm_slope_t { int16 slope_x100, uint8 active, uint8 _pad }
*/
/* ---- Configuration ---- */
#define SLOPE_TAU_S 5.0f /* IIR time constant (seconds) */
#define SLOPE_MAX_DEG 15.0f /* Maximum estimate magnitude (degrees) */
#define SLOPE_TLM_HZ 1u /* JLINK_TLM_SLOPE publish rate (Hz) */
/* ---- State ---- */
typedef struct {
float estimate_deg; /* current slope estimate (degrees, + = nose-up) */
bool enabled; /* compensation on/off; off = estimate frozen */
uint32_t last_tlm_ms; /* timestamp of last TLM transmission */
} slope_estimator_t;
/* ---- API ---- */
/*
* slope_estimator_init(se) zero state, enable estimation.
* Call once during system init.
*/
void slope_estimator_init(slope_estimator_t *se);
/*
* slope_estimator_reset(se) zero estimate without changing enabled flag.
* Call on disarm or after BALANCE_TILT_FAULT to avoid stale state on rearm.
*/
void slope_estimator_reset(slope_estimator_t *se);
/*
* slope_estimator_update(se, pitch_deg, dt) advance the IIR filter.
* pitch_deg : current fused pitch angle from IMU (degrees)
* dt : loop interval (seconds)
* No-op if se->enabled == false or dt <= 0.
*/
void slope_estimator_update(slope_estimator_t *se, float pitch_deg, float dt);
/*
* slope_estimator_get_deg(se) return current estimate (degrees).
* Returns 0 if disabled.
*/
float slope_estimator_get_deg(const slope_estimator_t *se);
/*
* slope_estimator_set_enabled(se, en) enable or disable compensation.
* Disabling freezes the estimate at its current value.
*/
void slope_estimator_set_enabled(slope_estimator_t *se, bool en);
/*
* slope_estimator_send_tlm(se, now_ms) transmit JLINK_TLM_SLOPE (0x88)
* frame to Jetson. Rate-limited to SLOPE_TLM_HZ; safe to call every tick.
*/
void slope_estimator_send_tlm(const slope_estimator_t *se, uint32_t now_ms);
#endif /* SLOPE_ESTIMATOR_H */

View File

@ -0,0 +1,17 @@
uwb_anchor_calibration:
ros__parameters:
# Serial ports for each anchor, indexed by anchor ID
# udev rules create stable symlinks in /dev/uwb-anchor<N>
anchor_serials:
- /dev/uwb-anchor0
- /dev/uwb-anchor1
- /dev/uwb-anchor2
- /dev/uwb-anchor3
baud_rate: 115200
# Seconds to wait for a +PEER_RANGE response per attempt
peer_range_timeout_s: 2.0
# Range measurements to average per anchor pair (higher = more accurate)
default_n_samples: 5

View File

@ -0,0 +1,32 @@
"""
uwb_calibration.launch.py Launch the UWB anchor auto-calibration service.
Usage:
ros2 launch saltybot_uwb_calibration uwb_calibration.launch.py
Then trigger from command line:
ros2 service call /saltybot/uwb/calibrate_anchors \\
saltybot_uwb_calibration_msgs/srv/CalibrateAnchors \\
"{anchor_ids: [], n_samples: 5}"
"""
from launch import LaunchDescription
from launch_ros.actions import Node
from ament_index_python.packages import get_package_share_directory
import os
def generate_launch_description() -> LaunchDescription:
pkg_dir = get_package_share_directory("saltybot_uwb_calibration")
params_file = os.path.join(pkg_dir, "config", "calibration_params.yaml")
calibration_node = Node(
package="saltybot_uwb_calibration",
executable="uwb_calibration",
name="uwb_anchor_calibration",
parameters=[params_file],
output="screen",
emulate_tty=True,
)
return LaunchDescription([calibration_node])

View File

@ -0,0 +1,29 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_uwb_calibration</name>
<version>0.1.0</version>
<description>
UWB anchor auto-calibration for SaltyBot (Issue #602).
Commands anchors to range against each other via AT+PEER_RANGE=,
builds a pairwise distance matrix, and uses classical MDS to recover
2-D anchor positions. Exposes /saltybot/uwb/calibrate_anchors service.
</description>
<maintainer email="sl-uwb@saltylab.local">sl-uwb</maintainer>
<license>Apache-2.0</license>
<depend>rclpy</depend>
<depend>saltybot_uwb_calibration_msgs</depend>
<exec_depend>python3-numpy</exec_depend>
<exec_depend>python3-serial</exec_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,272 @@
"""
calibration_node.py UWB anchor auto-calibration ROS2 service (Issue #602)
Service: /saltybot/uwb/calibrate_anchors (CalibrateAnchors)
Workflow
1. For every ordered pair (i, j) with i < j, send AT+PEER_RANGE=<j> over
anchor i's serial port, collect `n_samples` measurements, average them.
2. Build the symmetric N×N distance matrix D.
3. Run classical MDS (mds_math.mds_2d) to recover (x, y) positions.
4. Align the frame: anchor-0 at origin, anchor-1 on +X axis.
5. Return positions + residual RMS + JSON config string.
Parameters (ROS2, see config/calibration_params.yaml)
anchor_serials list of serial port paths, index = anchor ID
e.g. ["/dev/uwb-anchor0", "/dev/uwb-anchor1", ...]
baud_rate 115200
peer_range_timeout_s 2.0 (per AT+PEER_RANGE= attempt)
default_n_samples 5
"""
from __future__ import annotations
import json
import re
import time
from typing import Optional
import numpy as np
import serial
import rclpy
from rclpy.node import Node
from saltybot_uwb_calibration_msgs.srv import CalibrateAnchors
from saltybot_uwb_calibration.mds_math import mds_2d, anchor_frame_align
_PEER_RANGE_RE = re.compile(
r"\+PEER_RANGE:(\d+),(\d+),(\d+),([-\d.]+)"
)
_PEER_ERR_RE = re.compile(
r"\+PEER_RANGE:ERR,(\d+),(\w+)"
)
class CalibrationNode(Node):
def __init__(self) -> None:
super().__init__("uwb_anchor_calibration")
self.declare_parameter("anchor_serials", [
"/dev/uwb-anchor0",
"/dev/uwb-anchor1",
"/dev/uwb-anchor2",
"/dev/uwb-anchor3",
])
self.declare_parameter("baud_rate", 115200)
self.declare_parameter("peer_range_timeout_s", 2.0)
self.declare_parameter("default_n_samples", 5)
self._serials: list[str] = (
self.get_parameter("anchor_serials").value
)
self._baud: int = self.get_parameter("baud_rate").value
self._timeout: float = self.get_parameter("peer_range_timeout_s").value
self._default_n: int = self.get_parameter("default_n_samples").value
self._srv = self.create_service(
CalibrateAnchors,
"/saltybot/uwb/calibrate_anchors",
self._handle_calibrate,
)
self.get_logger().info(
f"UWB calibration service ready — "
f"{len(self._serials)} configured anchors"
)
# ── Service handler ────────────────────────────────────────────────────
def _handle_calibrate(
self,
request: CalibrateAnchors.Request,
response: CalibrateAnchors.Response,
) -> CalibrateAnchors.Response:
# Determine anchor IDs to calibrate
if request.anchor_ids:
ids = list(request.anchor_ids)
else:
ids = list(range(len(self._serials)))
n_anchors = len(ids)
if n_anchors < 4:
response.success = False
response.message = (
f"Need at least 4 anchors, got {n_anchors}"
)
return response
n_samples = int(request.n_samples) if request.n_samples > 0 \
else self._default_n
self.get_logger().info(
f"Starting calibration: anchors={ids}, n_samples={n_samples}"
)
# Open serial ports
ports: dict[int, serial.Serial] = {}
try:
for anchor_id in ids:
if anchor_id >= len(self._serials):
raise RuntimeError(
f"No serial port configured for anchor {anchor_id}"
)
port_path = self._serials[anchor_id]
ports[anchor_id] = serial.Serial(
port_path,
baudrate=self._baud,
timeout=self._timeout,
)
# Flush any stale data
ports[anchor_id].reset_input_buffer()
time.sleep(0.05)
self.get_logger().info(
f" Opened anchor {anchor_id}{port_path}"
)
except Exception as exc:
_close_all(ports)
response.success = False
response.message = f"Serial open failed: {exc}"
return response
# Measure all unique pairs
D = np.zeros((n_anchors, n_anchors), dtype=float)
id_to_idx = {aid: idx for idx, aid in enumerate(ids)}
try:
for i_idx, i_id in enumerate(ids):
for j_idx, j_id in enumerate(ids):
if j_idx <= i_idx:
continue
dist_m = self._measure_pair(
ports[i_id], i_id, j_id, n_samples
)
if dist_m is None:
raise RuntimeError(
f"No valid range between anchor {i_id} "
f"and anchor {j_id}"
)
D[i_idx, j_idx] = dist_m
D[j_idx, i_idx] = dist_m
self.get_logger().info(
f" [{i_id}{j_id}] = {dist_m:.3f} m"
)
except Exception as exc:
_close_all(ports)
response.success = False
response.message = str(exc)
return response
_close_all(ports)
# MDS
try:
positions, residual = mds_2d(D)
positions = anchor_frame_align(positions)
except Exception as exc:
response.success = False
response.message = f"MDS failed: {exc}"
return response
# Build JSON config
pos_dict = {}
for idx, aid in enumerate(ids):
pos_dict[str(aid)] = [
round(float(positions[idx, 0]), 4),
round(float(positions[idx, 1]), 4),
0.0,
]
json_str = json.dumps(pos_dict, indent=2)
self.get_logger().info(
f"Calibration complete — residual RMS {residual:.4f} m\n{json_str}"
)
# Fill response
response.success = True
response.message = (
f"OK — residual RMS {residual*1000:.1f} mm"
)
response.anchor_ids = [int(a) for a in ids]
response.positions_x = [float(positions[i, 0]) for i in range(n_anchors)]
response.positions_y = [float(positions[i, 1]) for i in range(n_anchors)]
response.positions_z = [0.0] * n_anchors
response.residual_rms_m = float(residual)
response.anchor_positions_json = json_str
return response
# ── Per-pair measurement ───────────────────────────────────────────────
def _measure_pair(
self,
port: serial.Serial,
my_id: int,
peer_id: int,
n_samples: int,
) -> Optional[float]:
"""
Send AT+PEER_RANGE=<peer_id> `n_samples` times over `port`.
Returns the mean range in metres, or None if all attempts fail.
"""
measurements: list[float] = []
for _ in range(n_samples):
cmd = f"AT+PEER_RANGE={peer_id}\r\n"
port.write(cmd.encode())
port.flush()
deadline = time.monotonic() + self._timeout
while time.monotonic() < deadline:
line = port.readline().decode(errors="replace").strip()
if not line:
continue
m = _PEER_RANGE_RE.match(line)
if m:
# +PEER_RANGE:<my_id>,<peer_id>,<mm>,<rssi>
if int(m.group(1)) == my_id and int(m.group(2)) == peer_id:
mm = float(m.group(3))
measurements.append(mm / 1000.0) # mm → m
break
if _PEER_ERR_RE.match(line):
break # timeout from firmware — try next sample
time.sleep(0.05) # inter-command gap
if not measurements:
return None
return float(np.mean(measurements))
# ── Helpers ────────────────────────────────────────────────────────────────
def _close_all(ports: dict) -> None:
for p in ports.values():
try:
p.close()
except Exception:
pass
def main(args=None) -> None:
rclpy.init(args=args)
node = CalibrationNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.try_shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,140 @@
"""
mds_math.py Classical Multi-Dimensional Scaling for UWB anchor calibration
(Issue #602)
Given an N×N symmetric distance matrix D (in metres), recover the 2-D
Cartesian positions of N anchors using classical (metric) MDS.
Algorithm
1. Compute the double-centred squared-distance matrix B = -½ H H
where H = I - (1/N) 11 (centering matrix).
2. Eigen-decompose B = V Λ Vᵀ (symmetric).
3. Keep the 2 largest positive eigenvalues (λ₁, λ₂).
4. Positions X = V[:, :2] · diag(λ₁, λ₂).
The recovered configuration is up to a global rigid transformation
(rotation, reflection, translation). The caller is responsible for
anchoring the coordinate frame (e.g. fix anchor-0 at origin, anchor-1
on the positive-X axis).
Usage
from saltybot_uwb_calibration.mds_math import mds_2d, anchor_frame_align
D = np.array([[0, d01, d02, d03],
[d01, 0, d12, d13],
...])
positions, residual = mds_2d(D) # (N,2) array, float RMS (m)
positions = anchor_frame_align(positions) # align frame: anchor-0 at origin
"""
from __future__ import annotations
import numpy as np
def mds_2d(D: np.ndarray) -> tuple[np.ndarray, float]:
"""
Recover 2-D anchor positions from pairwise distance matrix D.
Parameters
----------
D : (N, N) symmetric ndarray of float
Pairwise distances in metres. Diagonal must be 0.
N must be >= 3 (4+ recommended for overdetermined system).
Returns
-------
positions : (N, 2) ndarray
Recovered x, y coordinates in metres.
residual_rms : float
RMS of (D_measured - D_recovered) in metres across all unique pairs.
Raises
------
ValueError
If D is not square, not symmetric, or N < 3.
"""
D = np.asarray(D, dtype=float)
n = D.shape[0]
if D.ndim != 2 or D.shape[1] != n:
raise ValueError(f"D must be square, got shape {D.shape}")
if n < 3:
raise ValueError(f"Need at least 3 anchors, got {n}")
if not np.allclose(D, D.T, atol=1e-3):
raise ValueError("Distance matrix D is not symmetric")
# ── Step 1: double-centre D² ──────────────────────────────────────────
D2 = D ** 2
H = np.eye(n) - np.ones((n, n)) / n
B = -0.5 * H @ D2 @ H
# ── Step 2: eigendecomposition ────────────────────────────────────────
eigvals, eigvecs = np.linalg.eigh(B) # eigvals in ascending order
# ── Step 3: keep top-2 positive eigenvalues ───────────────────────────
idx = np.argsort(eigvals)[::-1] # descending
top2_idx = idx[:2]
lam = eigvals[top2_idx]
lam = np.maximum(lam, 0.0) # clamp numerical negatives
# ── Step 4: recover positions ─────────────────────────────────────────
V = eigvecs[:, top2_idx]
positions = V * np.sqrt(lam) # (N, 2)
# ── Residual RMS ──────────────────────────────────────────────────────
residual_rms = _residual(D, positions)
return positions, residual_rms
def anchor_frame_align(positions: np.ndarray) -> np.ndarray:
"""
Align recovered positions so that:
- anchor-0 is at the origin (0, 0)
- anchor-1 lies on the positive-X axis (y = 0)
This removes the MDS ambiguity (translation + rotation + reflection).
Parameters
----------
positions : (N, 2) ndarray
Returns
-------
(N, 2) ndarray aligned positions
"""
positions = np.array(positions, dtype=float)
if positions.shape[0] < 2:
return positions
# Translate so anchor-0 is at origin
positions -= positions[0]
# Rotate so anchor-1 is on positive-X axis
dx, dy = positions[1, 0], positions[1, 1]
angle = np.arctan2(dy, dx)
cos_a, sin_a = np.cos(-angle), np.sin(-angle)
R = np.array([[cos_a, -sin_a],
[sin_a, cos_a]])
positions = (R @ positions.T).T
# Ensure anchor-1 has positive X (handle reflection)
if positions[1, 0] < 0:
positions[:, 0] *= -1.0
return positions
def _residual(D_meas: np.ndarray, positions: np.ndarray) -> float:
"""Compute RMS of (measured - recovered) distances over all unique pairs."""
n = D_meas.shape[0]
errors = []
for i in range(n):
for j in range(i + 1, n):
d_rec = float(np.linalg.norm(positions[i] - positions[j]))
errors.append(D_meas[i, j] - d_rec)
if not errors:
return 0.0
return float(np.sqrt(np.mean(np.array(errors) ** 2)))

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/saltybot_uwb_calibration
[install]
install_scripts=$base/lib/saltybot_uwb_calibration

View File

@ -0,0 +1,29 @@
import os
from glob import glob
from setuptools import setup
package_name = "saltybot_uwb_calibration"
setup(
name=package_name,
version="0.1.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages",
[f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(os.path.join("share", package_name, "launch"), glob("launch/*.py")),
(os.path.join("share", package_name, "config"), glob("config/*.yaml")),
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="sl-uwb",
maintainer_email="sl-uwb@saltylab.local",
description="UWB anchor auto-calibration via MDS (Issue #602)",
license="Apache-2.0",
entry_points={
"console_scripts": [
"uwb_calibration = saltybot_uwb_calibration.calibration_node:main",
],
},
)

View File

@ -0,0 +1,142 @@
"""
Unit tests for saltybot_uwb_calibration.mds_math (Issue #602).
No ROS2 or hardware required pure numpy.
"""
import math
import sys
import os
import numpy as np
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from saltybot_uwb_calibration.mds_math import mds_2d, anchor_frame_align, _residual
def _dist_matrix(positions: np.ndarray) -> np.ndarray:
"""Build exact pairwise distance matrix from ground-truth positions."""
n = len(positions)
D = np.zeros((n, n))
for i in range(n):
for j in range(n):
D[i, j] = np.linalg.norm(positions[i] - positions[j])
return D
# ── Square configuration (4 anchors) ──────────────────────────────────────
GT_SQUARE = np.array([
[0.0, 0.0],
[4.0, 0.0],
[4.0, 3.0],
[0.0, 3.0],
])
class TestMdsSquare:
def test_residual_near_zero(self):
D = _dist_matrix(GT_SQUARE)
positions, rms = mds_2d(D)
assert rms < 1e-6, f"Residual too large: {rms}"
def test_correct_distances_preserved(self):
D = _dist_matrix(GT_SQUARE)
positions, _ = mds_2d(D)
positions = anchor_frame_align(positions)
D_rec = _dist_matrix(positions)
# All pairwise distances should match to < 1 mm
assert np.max(np.abs(D - D_rec)) < 1e-3
def test_anchor_0_at_origin(self):
D = _dist_matrix(GT_SQUARE)
positions, _ = mds_2d(D)
positions = anchor_frame_align(positions)
assert abs(positions[0, 0]) < 1e-9
assert abs(positions[0, 1]) < 1e-9
def test_anchor_1_on_positive_x(self):
D = _dist_matrix(GT_SQUARE)
positions, _ = mds_2d(D)
positions = anchor_frame_align(positions)
assert positions[1, 0] > 0
assert abs(positions[1, 1]) < 1e-9
# ── Non-uniform configuration (5 anchors) ─────────────────────────────────
GT_5 = np.array([
[0.0, 0.0],
[5.0, 0.0],
[5.0, 4.0],
[2.5, 6.0],
[0.0, 4.0],
])
class TestMds5Anchors:
def test_residual_near_zero(self):
D = _dist_matrix(GT_5)
positions, rms = mds_2d(D)
assert rms < 1e-6
def test_shape(self):
D = _dist_matrix(GT_5)
positions, _ = mds_2d(D)
assert positions.shape == (5, 2)
# ── Noisy distances (simulate UWB measurement noise) ──────────────────────
class TestMdsNoisyInput:
def test_residual_acceptable_with_5cm_noise(self):
rng = np.random.default_rng(42)
D = _dist_matrix(GT_SQUARE)
noise = rng.normal(0, 0.05, D.shape)
noise = (noise + noise.T) / 2 # keep symmetric
np.fill_diagonal(noise, 0)
D_noisy = np.maximum(D + noise, 0)
positions, rms = mds_2d(D_noisy)
# With 5 cm noise the residual should be sub-10 cm
assert rms < 0.10, f"Residual {rms:.4f} m too large"
# ── Error handling ─────────────────────────────────────────────────────────
class TestMdsErrors:
def test_non_square_raises(self):
with pytest.raises(ValueError):
mds_2d(np.ones((3, 4)))
def test_too_few_anchors_raises(self):
with pytest.raises(ValueError):
mds_2d(np.array([[0.0, 1.0], [1.0, 0.0]]))
def test_asymmetric_raises(self):
D = _dist_matrix(GT_SQUARE)
D[0, 1] += 0.5 # break symmetry
with pytest.raises(ValueError):
mds_2d(D)
# ── anchor_frame_align ─────────────────────────────────────────────────────
class TestFrameAlign:
def test_translated_positions_aligned(self):
pts = GT_SQUARE.copy() + np.array([10.0, -5.0])
aligned = anchor_frame_align(pts)
assert abs(aligned[0, 0]) < 1e-9
assert abs(aligned[0, 1]) < 1e-9
def test_rotated_positions_aligned(self):
angle = math.pi / 3
R = np.array([[math.cos(angle), -math.sin(angle)],
[math.sin(angle), math.cos(angle)]])
pts = (R @ GT_SQUARE.T).T
aligned = anchor_frame_align(pts)
assert aligned[1, 0] > 0
assert abs(aligned[1, 1]) < 1e-9
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@ -0,0 +1,12 @@
cmake_minimum_required(VERSION 3.8)
project(saltybot_uwb_calibration_msgs)
find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"srv/CalibrateAnchors.srv"
)
ament_export_dependencies(rosidl_default_runtime)
ament_package()

View File

@ -0,0 +1,23 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_uwb_calibration_msgs</name>
<version>0.1.0</version>
<description>
Service definitions for UWB anchor auto-calibration (Issue #602).
Provides CalibrateAnchors.srv used by saltybot_uwb_calibration node.
</description>
<maintainer email="sl-uwb@saltylab.local">sl-uwb</maintainer>
<license>Apache-2.0</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<buildtool_depend>rosidl_default_generators</buildtool_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>

View File

@ -0,0 +1,29 @@
# CalibrateAnchors.srv — UWB anchor auto-calibration service (Issue #602)
#
# Request: list of anchor IDs to calibrate (minimum 4).
# If anchor_ids is empty, the node uses all discovered anchors.
#
# Each anchor is commanded via AT+PEER_RANGE=<peer_id> to range against every
# other anchor. The resulting N×N distance matrix is passed through classical
# MDS to recover 2-D positions. Results are written to anchor_positions_json.
#
# Response: success flag, per-anchor [x, y, z] positions, residual RMS error,
# and JSON string suitable for dropping into fusion_params.yaml.
# ── Request ────────────────────────────────────────────────────────────────
uint8[] anchor_ids # IDs of anchors to calibrate (empty = auto-discover)
uint8 n_samples # range measurements to average per pair (default 5)
---
# ── Response ───────────────────────────────────────────────────────────────
bool success
string message # human-readable status / error description
uint8[] anchor_ids # same order as positions below
float64[] positions_x # metres, MDS-recovered X (2-D plane)
float64[] positions_y # metres, MDS-recovered Y
float64[] positions_z # metres, always 0.0 (flat-floor assumption)
float64 residual_rms_m # RMS of |D_measured - D_recovered| across all pairs
string anchor_positions_json # JSON: {"0": [x,y,z], "1": [x,y,z], ...}

View File

@ -1,4 +1,5 @@
#include "balance.h" #include "balance.h"
#include "slope_estimator.h"
#include "config.h" #include "config.h"
#include <math.h> #include <math.h>
@ -19,6 +20,9 @@ void balance_init(balance_t *b) {
/* Safety defaults from config */ /* Safety defaults from config */
b->max_tilt = MAX_TILT_DEG; b->max_tilt = MAX_TILT_DEG;
b->max_speed = MAX_SPEED_LIMIT; b->max_speed = MAX_SPEED_LIMIT;
/* Slope estimator */
slope_estimator_init(&b->slope);
} }
void balance_update(balance_t *b, const IMUData *imu, float dt) { void balance_update(balance_t *b, const IMUData *imu, float dt) {
@ -28,12 +32,16 @@ void balance_update(balance_t *b, const IMUData *imu, float dt) {
b->pitch_deg = imu->pitch; b->pitch_deg = imu->pitch;
b->pitch_rate = imu->pitch_rate; b->pitch_rate = imu->pitch_rate;
/* Advance slope estimator (no-op when not armed) */
slope_estimator_update(&b->slope, b->pitch_deg, dt);
/* Safety: tilt cutoff */ /* Safety: tilt cutoff */
if (b->state == BALANCE_ARMED) { if (b->state == BALANCE_ARMED) {
if (fabsf(b->pitch_deg) > b->max_tilt) { if (fabsf(b->pitch_deg) > b->max_tilt) {
b->state = BALANCE_TILT_FAULT; b->state = BALANCE_TILT_FAULT;
b->motor_cmd = 0; b->motor_cmd = 0;
b->integral = 0.0f; b->integral = 0.0f;
slope_estimator_reset(&b->slope);
return; return;
} }
} }
@ -45,8 +53,9 @@ void balance_update(balance_t *b, const IMUData *imu, float dt) {
return; return;
} }
/* PID */ /* PID — subtract slope estimate so controller balances on incline */
float error = b->setpoint - b->pitch_deg; float tilt_corrected = b->pitch_deg - slope_estimator_get_deg(&b->slope);
float error = b->setpoint - tilt_corrected;
/* Proportional */ /* Proportional */
float p_term = b->kp * error; float p_term = b->kp * error;
@ -85,4 +94,5 @@ void balance_disarm(balance_t *b) {
b->state = BALANCE_DISARMED; b->state = BALANCE_DISARMED;
b->motor_cmd = 0; b->motor_cmd = 0;
b->integral = 0.0f; b->integral = 0.0f;
slope_estimator_reset(&b->slope);
} }

View File

@ -541,6 +541,30 @@ void jlink_send_sched_telemetry(const jlink_tlm_sched_t *tlm)
jlink_tx_locked(frame, (uint16_t)(3u + plen + 3u)); jlink_tx_locked(frame, (uint16_t)(3u + plen + 3u));
} }
/* ---- jlink_send_slope_tlm() -- Issue #600 ---- */
void jlink_send_slope_tlm(const jlink_tlm_slope_t *tlm)
{
/*
* Frame: [STX][LEN][0x88][4 bytes SLOPE][CRC_hi][CRC_lo][ETX]
* Total: 1+1+1+4+2+1 = 10 bytes
*/
static uint8_t frame[10];
const uint8_t plen = (uint8_t)sizeof(jlink_tlm_slope_t); /* 4 */
const uint8_t len = 1u + plen; /* CMD byte + payload */
frame[0] = JLINK_STX;
frame[1] = len;
frame[2] = JLINK_TLM_SLOPE;
memcpy(&frame[3], tlm, plen);
uint16_t crc = crc16_xmodem(&frame[2], len);
frame[3 + plen] = (uint8_t)(crc >> 8);
frame[3 + plen + 1] = (uint8_t)(crc & 0xFFu);
frame[3 + plen + 2] = JLINK_ETX;
jlink_tx_locked(frame, sizeof(frame));
}
/* ---- jlink_send_fault_log() -- Issue #565 ---- */ /* ---- jlink_send_fault_log() -- Issue #565 ---- */
void jlink_send_fault_log(const jlink_tlm_fault_log_t *fl) void jlink_send_fault_log(const jlink_tlm_fault_log_t *fl)
{ {

82
src/slope_estimator.c Normal file
View File

@ -0,0 +1,82 @@
/*
* slope_estimator.c slow-adapting terrain slope estimator (Issue #600).
*
* First-order IIR low-pass filter on fused IMU pitch with tau = SLOPE_TAU_S (5 s).
* The long time constant means fast balance corrections and transients are
* ignored; only sustained pitch bias (i.e., genuine slope) accumulates.
*
* Estimate is clamped to ±SLOPE_MAX_DEG (15°) per the safety requirement.
*/
#include "slope_estimator.h"
#include "jlink.h"
/* ---- slope_estimator_init() ---- */
void slope_estimator_init(slope_estimator_t *se)
{
se->estimate_deg = 0.0f;
se->enabled = true;
/* Initialize so first send_tlm call fires immediately */
se->last_tlm_ms = (uint32_t)(-(uint32_t)(1000u / (SLOPE_TLM_HZ > 0u ? SLOPE_TLM_HZ : 1u)));
}
/* ---- slope_estimator_reset() ---- */
void slope_estimator_reset(slope_estimator_t *se)
{
se->estimate_deg = 0.0f;
}
/* ---- slope_estimator_update() ---- */
void slope_estimator_update(slope_estimator_t *se, float pitch_deg, float dt)
{
if (!se->enabled || dt <= 0.0f) return;
/*
* First-order IIR: alpha = dt / (tau + dt)
* At 1 kHz (dt=0.001): alpha 0.0002 time to ~63% 5 s
* At 100 Hz (dt=0.01): alpha 0.002 time to ~63% 5 s (same tau)
*/
float alpha = dt / (SLOPE_TAU_S + dt);
float raw = se->estimate_deg * (1.0f - alpha) + pitch_deg * alpha;
/* Clamp to ±SLOPE_MAX_DEG */
if (raw > SLOPE_MAX_DEG) raw = SLOPE_MAX_DEG;
if (raw < -SLOPE_MAX_DEG) raw = -SLOPE_MAX_DEG;
se->estimate_deg = raw;
}
/* ---- slope_estimator_get_deg() ---- */
float slope_estimator_get_deg(const slope_estimator_t *se)
{
if (!se->enabled) return 0.0f;
return se->estimate_deg;
}
/* ---- slope_estimator_set_enabled() ---- */
void slope_estimator_set_enabled(slope_estimator_t *se, bool en)
{
se->enabled = en;
}
/* ---- slope_estimator_send_tlm() ---- */
void slope_estimator_send_tlm(const slope_estimator_t *se, uint32_t now_ms)
{
if (SLOPE_TLM_HZ == 0u) return;
uint32_t interval_ms = 1000u / SLOPE_TLM_HZ;
if ((now_ms - se->last_tlm_ms) < interval_ms) return;
/* Cast away const for timestamp update -- only mutable field */
((slope_estimator_t *)se)->last_tlm_ms = now_ms;
jlink_tlm_slope_t tlm;
/* Scale to ×100 for 0.01° resolution, clamped to int16 range */
float scaled = se->estimate_deg * 100.0f;
if (scaled > 32767.0f) scaled = 32767.0f;
if (scaled < -32768.0f) scaled = -32768.0f;
tlm.slope_x100 = (int16_t)scaled;
tlm.active = se->enabled ? 1u : 0u;
tlm._pad = 0u;
jlink_send_slope_tlm(&tlm);
}

421
test/test_slope_estimator.c Normal file
View File

@ -0,0 +1,421 @@
/*
* test_slope_estimator.c Unit tests for slope estimator (Issue #600).
*
* Build (host, no hardware):
* gcc -I include -I test/stubs -DTEST_HOST -lm \
* -o /tmp/test_slope src/slope_estimator.c test/test_slope_estimator.c
*
* All tests are self-contained; no HAL, no ADC, no UART required.
* slope_estimator.c calls jlink_send_slope_tlm() which is stubbed below.
*/
/* ---- Stubs ---- */
/* Prevent jlink.h from pulling in HAL / pid_flash types */
#define JLINK_H
#include <stdint.h>
#include <stdbool.h>
/* Minimal jlink_tlm_slope_t matching the real definition */
typedef struct __attribute__((packed)) {
int16_t slope_x100;
uint8_t active;
uint8_t _pad;
} jlink_tlm_slope_t;
/* Capture last transmitted tlm for inspection */
static jlink_tlm_slope_t g_last_tlm;
static int g_tlm_count = 0;
void jlink_send_slope_tlm(const jlink_tlm_slope_t *tlm)
{
g_last_tlm = *tlm;
g_tlm_count++;
}
/* ---- Include implementation directly ---- */
#include "../src/slope_estimator.c"
/* ---- Test framework ---- */
#include <stdio.h>
#include <math.h>
#include <string.h>
static int g_pass = 0;
static int g_fail = 0;
#define ASSERT(cond, msg) do { \
if (cond) { g_pass++; } \
else { g_fail++; printf("FAIL [%s:%d] %s\n", __FILE__, __LINE__, msg); } \
} while(0)
#define ASSERT_NEAR(a, b, tol, msg) \
ASSERT(fabsf((a)-(b)) <= (tol), msg)
/* ---- Tests ---- */
static void test_init_zeroes_state(void)
{
slope_estimator_t se;
/* dirty the memory first */
memset(&se, 0xAB, sizeof(se));
slope_estimator_init(&se);
ASSERT(se.estimate_deg == 0.0f, "init: estimate_deg == 0");
ASSERT(se.enabled == true, "init: enabled == true");
}
static void test_get_when_disabled_returns_zero(void)
{
slope_estimator_t se;
slope_estimator_init(&se);
/* Force a non-zero estimate by running a few ticks, then disable */
slope_estimator_update(&se, 10.0f, 0.1f);
slope_estimator_set_enabled(&se, false);
ASSERT(slope_estimator_get_deg(&se) == 0.0f,
"get while disabled must return 0");
}
static void test_update_when_disabled_no_change(void)
{
slope_estimator_t se;
slope_estimator_init(&se);
slope_estimator_set_enabled(&se, false);
slope_estimator_update(&se, 10.0f, 0.01f);
ASSERT(se.estimate_deg == 0.0f,
"update while disabled must not change estimate");
}
static void test_update_zero_dt_no_change(void)
{
slope_estimator_t se;
slope_estimator_init(&se);
slope_estimator_update(&se, 5.0f, 0.0f);
ASSERT(se.estimate_deg == 0.0f,
"update with dt=0 must not change estimate");
}
static void test_update_negative_dt_no_change(void)
{
slope_estimator_t se;
slope_estimator_init(&se);
slope_estimator_update(&se, 5.0f, -0.001f);
ASSERT(se.estimate_deg == 0.0f,
"update with dt<0 must not change estimate");
}
static void test_single_step_converges_toward_input(void)
{
slope_estimator_t se;
slope_estimator_init(&se);
/* One update with dt=0.01s, pitch=10 deg */
slope_estimator_update(&se, 10.0f, 0.01f);
/* alpha = 0.01 / (5.0 + 0.01) ≈ 0.002; new estimate ≈ 0 + 0.002*10 = 0.02 */
ASSERT(se.estimate_deg > 0.0f,
"estimate should move toward positive pitch");
ASSERT(se.estimate_deg < 10.0f,
"estimate should not jump to full pitch in one step");
}
static void test_iir_reaches_63pct_in_one_tau(void)
{
/* Run estimator at 1 kHz for exactly SLOPE_TAU_S seconds */
slope_estimator_t se;
slope_estimator_init(&se);
const float pitch = 10.0f;
const float dt = 0.001f;
const int steps = (int)(SLOPE_TAU_S / dt); /* 5000 steps */
for (int i = 0; i < steps; i++) {
slope_estimator_update(&se, pitch, dt);
}
/* IIR should be at ≈63.2% of the step */
float expected_lo = pitch * 0.60f;
float expected_hi = pitch * 0.66f;
ASSERT(se.estimate_deg >= expected_lo && se.estimate_deg <= expected_hi,
"IIR should reach ~63% of step at t=tau");
}
static void test_iir_reaches_63pct_at_100hz(void)
{
/* Same tau, but at 100 Hz */
slope_estimator_t se;
slope_estimator_init(&se);
const float pitch = 8.0f;
const float dt = 0.01f;
const int steps = (int)(SLOPE_TAU_S / dt); /* 500 steps */
for (int i = 0; i < steps; i++) {
slope_estimator_update(&se, pitch, dt);
}
float expected_lo = pitch * 0.60f;
float expected_hi = pitch * 0.66f;
ASSERT(se.estimate_deg >= expected_lo && se.estimate_deg <= expected_hi,
"IIR 100 Hz: should reach ~63% at t=tau");
}
static void test_positive_clamp(void)
{
slope_estimator_t se;
slope_estimator_init(&se);
/* Drive with a huge pitch for a long time — estimate must clamp at SLOPE_MAX_DEG */
for (int i = 0; i < 100000; i++) {
slope_estimator_update(&se, 90.0f, 0.001f);
}
ASSERT_NEAR(se.estimate_deg, SLOPE_MAX_DEG, 0.001f,
"estimate must clamp at +SLOPE_MAX_DEG");
}
static void test_negative_clamp(void)
{
slope_estimator_t se;
slope_estimator_init(&se);
for (int i = 0; i < 100000; i++) {
slope_estimator_update(&se, -90.0f, 0.001f);
}
ASSERT_NEAR(se.estimate_deg, -SLOPE_MAX_DEG, 0.001f,
"estimate must clamp at -SLOPE_MAX_DEG");
}
static void test_reset_zeroes_estimate(void)
{
slope_estimator_t se;
slope_estimator_init(&se);
/* Build up some estimate */
for (int i = 0; i < 1000; i++) {
slope_estimator_update(&se, 12.0f, 0.001f);
}
ASSERT(se.estimate_deg > 0.1f, "pre-reset: estimate should be non-zero");
slope_estimator_reset(&se);
ASSERT(se.estimate_deg == 0.0f, "post-reset: estimate should be zero");
/* enabled state must be preserved */
ASSERT(se.enabled == true, "reset must not change enabled flag");
}
static void test_reset_preserves_disabled_state(void)
{
slope_estimator_t se;
slope_estimator_init(&se);
slope_estimator_set_enabled(&se, false);
slope_estimator_reset(&se);
ASSERT(se.enabled == false, "reset must not re-enable disabled estimator");
}
static void test_balance_setpoint_compensation(void)
{
/*
* Simulate: robot on 5-deg slope, pitch = 5 deg, slope estimate converges.
* After convergence, tilt_corrected = pitch - slope 0.
* Verify that the effective PID error (setpoint=0, tilt_corrected0) is near 0.
*/
slope_estimator_t se;
slope_estimator_init(&se);
const float slope_deg = 5.0f;
const float dt = 0.001f;
const int steps = (int)(3.0f * SLOPE_TAU_S / dt); /* 3 tau = ~95% converged */
for (int i = 0; i < steps; i++) {
/* Robot holds steady on slope — pitch stays constant */
slope_estimator_update(&se, slope_deg, dt);
}
float est = slope_estimator_get_deg(&se);
float tilt_corrected = slope_deg - est;
/* After 3 tau, estimate should be within 5% of slope */
ASSERT(est >= slope_deg * 0.90f, "3-tau estimate >= 90% of slope");
/* tilt_corrected should be close to zero — minimal PID error */
ASSERT(fabsf(tilt_corrected) < slope_deg * 0.10f,
"corrected tilt error < 10% of slope after 3 tau");
}
static void test_fast_tilt_not_tracked(void)
{
/*
* Simulate: robot starts balanced (pitch0), then tips to 20 deg suddenly
* for 100 ms (balance intervention), then returns to 0.
* The slope estimate should not move significantly.
*/
slope_estimator_t se;
slope_estimator_init(&se);
const float dt = 0.001f;
/* 2 s of stable balancing at pitch ≈ 0 */
for (int i = 0; i < 2000; i++) {
slope_estimator_update(&se, 0.0f, dt);
}
float before = se.estimate_deg;
/* 100 ms spike to 20 deg (fast tilt) */
for (int i = 0; i < 100; i++) {
slope_estimator_update(&se, 20.0f, dt);
}
/* Return to 0 for another 100 ms */
for (int i = 0; i < 100; i++) {
slope_estimator_update(&se, 0.0f, dt);
}
float after = se.estimate_deg;
/* 200ms / 5000ms = 4% of tau — estimate should move < 1 deg */
ASSERT(fabsf(after - before) < 1.0f,
"fast tilt transient should not significantly move slope estimate");
}
static void test_slope_change_tracks_slowly(void)
{
/* Robot transitions from flat (0°) to 10° slope at t=0.
* After 1 tau, estimate should be ~63% of 10 = ~6.3 deg. */
slope_estimator_t se;
slope_estimator_init(&se);
const float dt = 0.001f;
const int steps = (int)(SLOPE_TAU_S / dt);
for (int i = 0; i < steps; i++) {
slope_estimator_update(&se, 10.0f, dt);
}
ASSERT(se.estimate_deg >= 6.0f && se.estimate_deg <= 6.5f,
"slope change: 1-tau estimate should be 6.0-6.5 deg");
}
static void test_symmetry_negative_slope(void)
{
slope_estimator_t se;
slope_estimator_init(&se);
const float dt = 0.001f;
const int steps = (int)(SLOPE_TAU_S / dt);
for (int i = 0; i < steps; i++) {
slope_estimator_update(&se, -8.0f, dt);
}
/* Should be ~63% of -8 */
ASSERT(se.estimate_deg <= -4.0f && se.estimate_deg >= -5.5f,
"negative slope: estimate should converge symmetrically");
}
static void test_enable_disable_toggle(void)
{
slope_estimator_t se;
slope_estimator_init(&se);
/* Build estimate */
for (int i = 0; i < 2000; i++) {
slope_estimator_update(&se, 10.0f, 0.001f);
}
float val = se.estimate_deg;
ASSERT(val > 0.1f, "estimate built before disable");
/* Disable: estimate frozen */
slope_estimator_set_enabled(&se, false);
slope_estimator_update(&se, 10.0f, 0.001f);
ASSERT(se.estimate_deg == val, "estimate frozen while disabled");
/* get returns 0 when disabled */
ASSERT(slope_estimator_get_deg(&se) == 0.0f, "get returns 0 while disabled");
/* Re-enable: estimate resumes */
slope_estimator_set_enabled(&se, true);
slope_estimator_update(&se, 10.0f, 0.001f);
ASSERT(se.estimate_deg > val, "estimate resumes after re-enable");
}
static void test_tlm_rate_limiting(void)
{
slope_estimator_t se;
slope_estimator_init(&se);
g_tlm_count = 0;
/* Call at 1000 Hz for 3 seconds → should send 3 TLMs (at 1 Hz rate limit) */
for (uint32_t ms = 0; ms < 3000; ms++) {
slope_estimator_send_tlm(&se, ms);
}
/* First call at ms=0 sends, then 1000ms, 2000ms → 3 total */
ASSERT(g_tlm_count == 3, "TLM rate-limited to SLOPE_TLM_HZ (1 Hz)");
}
static void test_tlm_payload_encoding(void)
{
slope_estimator_t se;
slope_estimator_init(&se);
/* Force a known estimate */
se.estimate_deg = 7.5f;
se.enabled = true;
g_tlm_count = 0;
g_last_tlm.slope_x100 = 0;
/* Trigger a TLM by advancing time enough */
slope_estimator_send_tlm(&se, 0u);
ASSERT(g_tlm_count == 1, "TLM sent");
ASSERT(g_last_tlm.slope_x100 == 750, "7.5 deg -> slope_x100 = 750");
ASSERT(g_last_tlm.active == 1u, "active flag set when enabled");
}
static void test_tlm_disabled_active_flag(void)
{
slope_estimator_t se;
slope_estimator_init(&se);
slope_estimator_set_enabled(&se, false);
se.estimate_deg = 3.0f;
g_tlm_count = 0;
slope_estimator_send_tlm(&se, 0u);
ASSERT(g_tlm_count == 1, "TLM sent when disabled");
ASSERT(g_last_tlm.active == 0u, "active flag clear when disabled");
/* slope_x100 reflects stored estimate but get() returns 0 */
ASSERT(g_last_tlm.slope_x100 == 300, "slope_x100 encodes stored estimate");
}
static void test_tlm_clamped_to_int16(void)
{
slope_estimator_t se;
slope_estimator_init(&se);
/* Manually set beyond int16 range for test */
se.estimate_deg = 400.0f; /* 400 * 100 = 40000, within int16 */
se.enabled = true;
g_tlm_count = 0;
slope_estimator_send_tlm(&se, 0u);
ASSERT(g_last_tlm.slope_x100 == 32767, "large estimate clamped to INT16_MAX");
}
static void test_multiple_resets_idempotent(void)
{
slope_estimator_t se;
slope_estimator_init(&se);
slope_estimator_reset(&se);
slope_estimator_reset(&se);
ASSERT(se.estimate_deg == 0.0f, "multiple resets: still zero");
ASSERT(se.enabled == true, "multiple resets: still enabled");
}
/* ---- main ---- */
int main(void)
{
printf("=== test_slope_estimator ===\n");
test_init_zeroes_state();
test_get_when_disabled_returns_zero();
test_update_when_disabled_no_change();
test_update_zero_dt_no_change();
test_update_negative_dt_no_change();
test_single_step_converges_toward_input();
test_iir_reaches_63pct_in_one_tau();
test_iir_reaches_63pct_at_100hz();
test_positive_clamp();
test_negative_clamp();
test_reset_zeroes_estimate();
test_reset_preserves_disabled_state();
test_balance_setpoint_compensation();
test_fast_tilt_not_tracked();
test_slope_change_tracks_slowly();
test_symmetry_negative_slope();
test_enable_disable_toggle();
test_tlm_rate_limiting();
test_tlm_payload_encoding();
test_tlm_disabled_active_flag();
test_tlm_clamped_to_int16();
test_multiple_resets_idempotent();
printf("\nResults: %d passed, %d failed\n", g_pass, g_fail);
return g_fail ? 1 : 0;
}