Merge pull request 'feat: Here4 GPS DroneCAN on Orin via CANable2 (bd-p47c)' (#728) from sl-perception/bd-p47c-here4-can-gps into main

This commit is contained in:
sl-jetson 2026-04-17 23:10:05 -04:00
commit c02faf3ac2
11 changed files with 1433 additions and 0 deletions

View File

@ -0,0 +1,22 @@
here4_can_node:
ros__parameters:
# SocketCAN interface — freed from ESP32 BALANCE comms by bd-wim1
can_interface: can0
# DroneCAN bitrate — Here4 runs at 1 Mbps (not 500 kbps used previously)
bitrate: 1000000
# Local DroneCAN node ID for this Orin instance (1-127, must be unique on bus)
local_node_id: 126
# Set true to run: ip link set can0 type can bitrate 1000000 && ip link set can0 up
# Requires: sudo setcap cap_net_admin+eip $(which python3) or run as root
# Default false — manage interface externally (systemd-networkd or udev)
bring_up_can: false
# Here4 DroneCAN node ID — 0 means auto-detect from first Fix2 message
node_id_filter: 0
# TF frame IDs
fix_frame_id: gps
imu_frame_id: here4_imu

View File

@ -0,0 +1,109 @@
"""here4.launch.py — Launch the Here4 GPS DroneCAN bridge on CANable2.
bd-p47c: CANable2 freed by bd-wim1 (ESP32 moved to UART/USB) now used for
Here4 GPS at 1 Mbps DroneCAN/UAVCAN v0.
CAN setup (one-time, survives reboot via systemd-networkd or udev)
------------------------------------------------------------------
# Option A — manual (quick test):
sudo ip link set can0 down
sudo ip link set can0 type can bitrate 1000000
sudo ip link set can0 up
# Option B — let the node do it (requires CAP_NET_ADMIN):
ros2 launch saltybot_dronecan_gps here4.launch.py bring_up_can:=true
# Option C — systemd-networkd (/etc/systemd/network/80-can0.network):
[Match]
Name=can0
[CAN]
BitRate=1M
Published topics
----------------
/gps/fix sensor_msgs/NavSatFix navsat_transform_node (EKF)
/gps/velocity geometry_msgs/TwistWithCovarianceStamped
/here4/fix sensor_msgs/NavSatFix (alias)
/here4/imu sensor_msgs/Imu
/here4/mag sensor_msgs/MagneticField
/here4/baro sensor_msgs/FluidPressure
/here4/status std_msgs/String JSON
/here4/node_id std_msgs/Int32
Subscribed topics
-----------------
/rtcm std_msgs/ByteMultiArray RTCM corrections (NTRIP client)
/rtcm_hex std_msgs/String hex-encoded RTCM (fallback)
Usage
-----
# Default (interface already up):
ros2 launch saltybot_dronecan_gps here4.launch.py
# Bring up interface automatically (CAP_NET_ADMIN required):
ros2 launch saltybot_dronecan_gps here4.launch.py bring_up_can:=true
# Override interface (e.g. second CAN adapter):
ros2 launch saltybot_dronecan_gps here4.launch.py can_interface:=can1
# Pin to specific Here4 node ID (skip auto-detect):
ros2 launch saltybot_dronecan_gps here4.launch.py node_id_filter:=10
System dependency
-----------------
pip install dronecan # DroneCAN/UAVCAN v0 Python library
apt install can-utils # optional: candump, cansend for debugging
"""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description() -> LaunchDescription:
pkg_share = get_package_share_directory("saltybot_dronecan_gps")
params_file = os.path.join(pkg_share, "config", "here4_params.yaml")
args = [
DeclareLaunchArgument(
"can_interface",
default_value="can0",
description="SocketCAN interface (CANable2 @ 1 Mbps)",
),
DeclareLaunchArgument(
"bring_up_can",
default_value="false",
description="Bring up can_interface via ip link (requires CAP_NET_ADMIN)",
),
DeclareLaunchArgument(
"node_id_filter",
default_value="0",
description="DroneCAN node ID of Here4 (0=auto-detect from first Fix2)",
),
DeclareLaunchArgument(
"local_node_id",
default_value="126",
description="DroneCAN node ID for this Orin (must be unique on bus)",
),
]
node = Node(
package="saltybot_dronecan_gps",
executable="here4_node",
name="here4_can_node",
output="screen",
parameters=[
params_file,
{
"can_interface": LaunchConfiguration("can_interface"),
"bring_up_can": LaunchConfiguration("bring_up_can"),
"node_id_filter": LaunchConfiguration("node_id_filter"),
"local_node_id": LaunchConfiguration("local_node_id"),
},
],
)
return LaunchDescription([*args, node])

View File

@ -0,0 +1,37 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_dronecan_gps</name>
<version>0.1.0</version>
<description>
DroneCAN/UAVCAN v0 bridge for Here4 GPS on the CANable2 SocketCAN adapter.
Publishes GPS fix, IMU, magnetometer, and barometer data to ROS2.
Injects RTCM corrections for RTK.
bd-p47c: CANable2 freed from ESP32 BALANCE comms by bd-wim1.
Here4 operates at 1 Mbps on can0 (was 500 kbps for VESC comms).
System dependency: dronecan Python library (pip install dronecan)
</description>
<maintainer email="sl-perception@saltylab.local">sl-perception</maintainer>
<license>MIT</license>
<exec_depend>rclpy</exec_depend>
<exec_depend>std_msgs</exec_depend>
<exec_depend>sensor_msgs</exec_depend>
<exec_depend>geometry_msgs</exec_depend>
<!-- dronecan: pip install dronecan (DroneCAN/UAVCAN v0 Python library) -->
<!-- python3-can is a transitive dependency of dronecan -->
<buildtool_depend>ament_python</buildtool_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1 @@
# saltybot_dronecan_gps — Here4 GPS DroneCAN bridge for CANable2 (bd-p47c)

View File

@ -0,0 +1,333 @@
"""dronecan_parser.py — Pure-Python conversion of DroneCAN/UAVCAN v0 message
objects to plain Python dicts, independent of ROS2.
Keeps the dronecan library dependency isolated so that unit tests can run
without the dronecan package or CAN hardware.
DroneCAN message types handled
-------------------------------
uavcan.equipment.gnss.Fix2 DTID 1063
uavcan.equipment.gnss.Auxiliary DTID 1060
uavcan.equipment.ahrs.RawIMU DTID 1003
uavcan.equipment.ahrs.MagneticFieldStrength2 DTID 1001
uavcan.equipment.air_data.StaticPressure DTID 1028
uavcan.equipment.air_data.StaticTemperature DTID 1029
uavcan.protocol.NodeStatus DTID 341
uavcan.equipment.gnss.RTCMStream DTID 1062 (TX: RTCM injection)
Fix2 status values
------------------
0 NO_FIX
1 TIME_ONLY
2 2D_FIX
3 3D_FIX
NodeStatus health / mode
-------------------------
health 0=OK 1=WARNING 2=ERROR 3=CRITICAL
mode 0=OPERATIONAL 1=INITIALIZATION 2=MAINTENANCE 3=SOFTWARE_UPDATE 7=OFFLINE
"""
from __future__ import annotations
from typing import Any, Dict, List, Optional
# ── DSDL constants ────────────────────────────────────────────────────────────
DTID_FIX2 = 1063
DTID_AUXILIARY = 1060
DTID_RAW_IMU = 1003
DTID_MAG_FIELD = 1001
DTID_STATIC_PRESSURE = 1028
DTID_STATIC_TEMPERATURE = 1029
DTID_NODE_STATUS = 341
DTID_RTCM_STREAM = 1062
# Fix2 status
FIX_NO_FIX = 0
FIX_TIME_ONLY = 1
FIX_2D = 2
FIX_3D = 3
FIX_LABEL = {FIX_NO_FIX: "NO_FIX", FIX_TIME_ONLY: "TIME_ONLY",
FIX_2D: "2D_FIX", FIX_3D: "3D_FIX"}
# NodeStatus health
HEALTH_OK = 0
HEALTH_WARNING = 1
HEALTH_ERROR = 2
HEALTH_CRITICAL = 3
HEALTH_LABEL = {0: "OK", 1: "WARNING", 2: "ERROR", 3: "CRITICAL"}
MODE_LABEL = {0: "OPERATIONAL", 1: "INITIALIZATION", 2: "MAINTENANCE",
3: "SOFTWARE_UPDATE", 7: "OFFLINE"}
# RTCM stream: max bytes per DroneCAN frame
RTCM_MAX_CHUNK = 128
# ── Helpers ───────────────────────────────────────────────────────────────────
def _getf(obj: Any, *attrs: str, default: Any = None) -> Any:
"""Get nested attribute with fallback, tolerant of dronecan proxy objects."""
for attr in attrs:
try:
obj = getattr(obj, attr)
except AttributeError:
return default
return obj
def _list3(vec: Any, default: float = 0.0) -> List[float]:
"""Convert a dronecan 3-element array to a plain list of floats."""
try:
return [float(vec[0]), float(vec[1]), float(vec[2])]
except (TypeError, IndexError):
return [default, default, default]
def _list9(mat: Any, default: float = 0.0) -> List[float]:
"""Convert a dronecan 9-element array (row-major 3×3) to a plain list."""
try:
return [float(mat[i]) for i in range(9)]
except (TypeError, IndexError):
return [default] * 9
# ── Fix2 ─────────────────────────────────────────────────────────────────────
def parse_fix2(msg: Any) -> Optional[Dict[str, Any]]:
"""Parse a uavcan.equipment.gnss.Fix2 message.
Parameters
----------
msg : dronecan message object (event.message / event.transfer.payload)
Returns
-------
dict with keys:
lat_deg float degrees (WGS84)
lon_deg float degrees (WGS84)
alt_msl_m float metres above MSL
alt_ellipsoid_m float metres above WGS84 ellipsoid
ned_vel list[3] [north, east, down] m/s
fix_type int 0..3
fix_label str
sats_used int
pdop float
hdop float
vdop float
pos_cov list[9] row-major 3×3 position covariance,
undulation_m float WGS84 geoid undulation (m)
gnss_timestamp_us int microseconds (GPS epoch)
"""
if msg is None:
return None
try:
lat = float(_getf(msg, "latitude_deg_1e8", default=0)) * 1e-8
lon = float(_getf(msg, "longitude_deg_1e8", default=0)) * 1e-8
alt_msl = float(_getf(msg, "height_msl_mm", default=0)) * 1e-3
alt_ell = float(_getf(msg, "height_ellipsoid_mm", default=0)) * 1e-3
ned = _list3(_getf(msg, "ned_velocity"))
fix_type = int(_getf(msg, "status", default=FIX_NO_FIX))
# sats_used is uint7, may be named differently across DSDL revisions
sats = int(_getf(msg, "sats_used", default=0))
pdop = float(_getf(msg, "pdop", default=99.0))
hdop = float(_getf(msg, "hdop", default=99.0))
vdop = float(_getf(msg, "vdop", default=99.0))
undulation = float(_getf(msg, "undulation", default=0.0))
# position_covariance: optional, may be absent in older DSDL
pos_cov_raw = _getf(msg, "position_covariance")
pos_cov = _list9(pos_cov_raw) if pos_cov_raw is not None else [0.0] * 9
gnss_ts = int(_getf(msg, "gnss_timestamp", default=0))
return {
"lat_deg": lat,
"lon_deg": lon,
"alt_msl_m": alt_msl,
"alt_ellipsoid_m": alt_ell,
"ned_vel": ned,
"fix_type": fix_type,
"fix_label": FIX_LABEL.get(fix_type, f"UNKNOWN({fix_type})"),
"sats_used": sats,
"pdop": pdop,
"hdop": hdop,
"vdop": vdop,
"pos_cov": pos_cov,
"undulation_m": undulation,
"gnss_timestamp_us": gnss_ts,
}
except Exception:
return None
# ── RawIMU ────────────────────────────────────────────────────────────────────
def parse_raw_imu(msg: Any) -> Optional[Dict[str, Any]]:
"""Parse a uavcan.equipment.ahrs.RawIMU message.
Returns
-------
dict with keys:
gyro_xyz list[3] rad/s
accel_xyz list[3] m/
dt_s float integration interval (seconds)
gyro_cov list[9] row-major 3×3 covariance
accel_cov list[9]
"""
if msg is None:
return None
try:
gyro = _list3(_getf(msg, "rate_gyro_latest"))
accel = _list3(_getf(msg, "accelerometer_latest"))
dt = float(_getf(msg, "integration_interval", default=0.0))
gyro_cov = _list9(_getf(msg, "rate_gyro_covariance"))
accel_cov = _list9(_getf(msg, "accelerometer_covariance"))
return {
"gyro_xyz": gyro,
"accel_xyz": accel,
"dt_s": dt,
"gyro_cov": gyro_cov,
"accel_cov": accel_cov,
}
except Exception:
return None
# ── MagneticFieldStrength2 ────────────────────────────────────────────────────
def parse_mag(msg: Any) -> Optional[Dict[str, Any]]:
"""Parse a uavcan.equipment.ahrs.MagneticFieldStrength2 message.
Returns
-------
dict with keys:
sensor_id int
field_ga list[3] [x, y, z] Gauss
field_cov list[9] row-major 3×3 covariance (Gauss²)
"""
if msg is None:
return None
try:
sensor_id = int(_getf(msg, "sensor_id", default=0))
field_ga = _list3(_getf(msg, "magnetic_field_ga"))
field_cov = _list9(_getf(msg, "magnetic_field_covariance"))
return {
"sensor_id": sensor_id,
"field_ga": field_ga,
"field_cov": field_cov,
}
except Exception:
return None
# ── StaticPressure / StaticTemperature ───────────────────────────────────────
def parse_static_pressure(msg: Any) -> Optional[Dict[str, Any]]:
"""Parse a uavcan.equipment.air_data.StaticPressure message.
Returns
-------
dict with keys:
pressure_pa float Pascals
pressure_variance float Pa²
"""
if msg is None:
return None
try:
return {
"pressure_pa": float(_getf(msg, "static_pressure", default=0.0)),
"pressure_variance": float(_getf(msg, "static_pressure_variance", default=0.0)),
}
except Exception:
return None
def parse_static_temperature(msg: Any) -> Optional[Dict[str, Any]]:
"""Parse a uavcan.equipment.air_data.StaticTemperature message.
Returns
-------
dict with keys:
temperature_k float Kelvin
"""
if msg is None:
return None
try:
return {
"temperature_k": float(_getf(msg, "static_temperature", default=273.15)),
}
except Exception:
return None
# ── NodeStatus ────────────────────────────────────────────────────────────────
def parse_node_status(msg: Any, source_node_id: int) -> Optional[Dict[str, Any]]:
"""Parse a uavcan.protocol.NodeStatus message.
Returns
-------
dict with keys:
node_id int
uptime_sec int
health int 0=OK .. 3=CRITICAL
health_label str
mode int
mode_label str
vendor_code int
"""
if msg is None:
return None
try:
health = int(_getf(msg, "health", default=0))
mode = int(_getf(msg, "mode", default=0))
uptime = int(_getf(msg, "uptime_sec", default=0))
vendor_code = int(_getf(msg, "vendor_specific_status_code", default=0))
return {
"node_id": source_node_id,
"uptime_sec": uptime,
"health": health,
"health_label": HEALTH_LABEL.get(health, f"UNKNOWN({health})"),
"mode": mode,
"mode_label": MODE_LABEL.get(mode, f"UNKNOWN({mode})"),
"vendor_code": vendor_code,
}
except Exception:
return None
# ── RTCM stream chunking ──────────────────────────────────────────────────────
def chunk_rtcm(data: bytes, sequence_id: int) -> List[Dict[str, Any]]:
"""Split a raw RTCM byte string into DroneCAN RTCMStream message dicts.
Each chunk carries up to RTCM_MAX_CHUNK bytes. The caller must convert
these dicts into actual dronecan.uavcan.equipment.gnss.RTCMStream objects
and broadcast them on the bus.
Parameters
----------
data : raw RTCM correction bytes
sequence_id : monotonically increasing counter (uint16, wraps at 65535)
Returns
-------
List of dicts: {"sequence_id": int, "data": bytes}
"""
chunks = []
for i in range(0, len(data), RTCM_MAX_CHUNK):
chunks.append({
"sequence_id": sequence_id & 0xFFFF,
"data": data[i:i + RTCM_MAX_CHUNK],
})
sequence_id += 1
return chunks

View File

@ -0,0 +1,532 @@
"""here4_node.py — ROS2 node for Here4 GPS via DroneCAN/UAVCAN v0 on CANable2.
bd-p47c: CANable2 (freed from ESP32 BALANCE comms by bd-wim1) is now used
for Here4 GPS at 1 Mbps DroneCAN.
CAN setup (run once before launching, or use bring_up_can:=true arg)
-----------------------------------------------------------------------
sudo ip link set can0 down
sudo ip link set can0 type can bitrate 1000000
sudo ip link set can0 up
DroneCAN messages received from Here4
--------------------------------------
uavcan.equipment.gnss.Fix2 DTID 1063 ~5 Hz
uavcan.equipment.gnss.Auxiliary DTID 1060 ~5 Hz
uavcan.equipment.ahrs.RawIMU DTID 1003 ~50 Hz
uavcan.equipment.ahrs.MagneticFieldStrength2 DTID 1001 ~10 Hz
uavcan.equipment.air_data.StaticPressure DTID 1028 ~10 Hz
uavcan.equipment.air_data.StaticTemperature DTID 1029 ~10 Hz
uavcan.protocol.NodeStatus DTID 341 ~1 Hz
DroneCAN messages transmitted to Here4
----------------------------------------
uavcan.equipment.gnss.RTCMStream DTID 1062 on /rtcm input
ROS2 topics published
----------------------
/gps/fix sensor_msgs/NavSatFix 5 Hz feeds navsat_transform_node
/gps/velocity geometry_msgs/TwistWithCovarianceStamped 5 Hz
/here4/fix sensor_msgs/NavSatFix alias of /gps/fix
/here4/imu sensor_msgs/Imu 50 Hz
/here4/mag sensor_msgs/MagneticField 10 Hz
/here4/baro sensor_msgs/FluidPressure 10 Hz
/here4/status std_msgs/String JSON 1 Hz (node ID, health, fix quality)
/here4/node_id std_msgs/Int32 node ID once discovered
ROS2 topics subscribed
-----------------------
/rtcm std_msgs/ByteMultiArray RTCM corrections (from NTRIP client)
or std_msgs/String (hex-encoded, fallback)
Parameters
----------
can_interface str SocketCAN interface name default: can0
local_node_id int This node's DroneCAN ID (1-127) default: 126
bitrate int CAN bitrate, bps default: 1000000
bring_up_can bool Bring up can_interface via ip default: false
node_id_filter int Specific Here4 node ID (0=auto) default: 0
fix_frame_id str Frame ID for NavSatFix default: gps
imu_frame_id str Frame ID for Imu default: here4_imu
"""
from __future__ import annotations
import json
import math
import subprocess
import threading
import time
from typing import Any, Optional
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import TwistWithCovarianceStamped
from sensor_msgs.msg import FluidPressure, Imu, MagneticField, NavSatFix, NavSatStatus
from std_msgs.msg import ByteMultiArray, Int32, String
from .dronecan_parser import (
FIX_3D,
FIX_NO_FIX,
chunk_rtcm,
parse_fix2,
parse_mag,
parse_raw_imu,
parse_static_pressure,
parse_static_temperature,
parse_node_status,
)
# Gauss → Tesla conversion
_GA_TO_T = 1e-4
# NavSatStatus mapping from DroneCAN fix type
_FIX_TO_NAV_STATUS = {
0: NavSatStatus.STATUS_NO_FIX, # NO_FIX
1: NavSatStatus.STATUS_NO_FIX, # TIME_ONLY
2: NavSatStatus.STATUS_FIX, # 2D_FIX
3: NavSatStatus.STATUS_FIX, # 3D_FIX (upgraded to SBAS/RTK when applicable)
}
# Here4 supports GPS+GLONASS+BeiDou+Galileo
_SERVICE_FLAGS = (NavSatStatus.SERVICE_GPS | NavSatStatus.SERVICE_GLONASS |
NavSatStatus.SERVICE_GALILEO | NavSatStatus.SERVICE_COMPASS)
# RTCM injection sequence counter (per-session, wraps at 65535)
_rtcm_seq = 0
_rtcm_lock = threading.Lock()
def _bring_up_can(iface: str, bitrate: int, logger) -> bool:
"""Bring up the SocketCAN interface at the requested bitrate."""
try:
subprocess.run(["ip", "link", "set", iface, "down"], check=False)
subprocess.run(["ip", "link", "set", iface, "type", "can",
"bitrate", str(bitrate)], check=True)
subprocess.run(["ip", "link", "set", iface, "up"], check=True)
logger.info(f"SocketCAN {iface} up at {bitrate} bps")
return True
except subprocess.CalledProcessError as exc:
logger.error(f"Failed to bring up {iface}: {exc} — check sudo/udev rules")
return False
class Here4CanNode(Node):
"""DroneCAN bridge publishing Here4 GPS, IMU, mag, and baro to ROS2."""
def __init__(self) -> None:
super().__init__("here4_can_node")
# ── Parameters ────────────────────────────────────────────────────
self.declare_parameter("can_interface", "can0")
self.declare_parameter("local_node_id", 126)
self.declare_parameter("bitrate", 1_000_000)
self.declare_parameter("bring_up_can", False)
self.declare_parameter("node_id_filter", 0) # 0 = auto-detect
self.declare_parameter("fix_frame_id", "gps")
self.declare_parameter("imu_frame_id", "here4_imu")
self._iface = self.get_parameter("can_interface").value
self._local_nid = self.get_parameter("local_node_id").value
self._bitrate = self.get_parameter("bitrate").value
self._bring_up = self.get_parameter("bring_up_can").value
self._nid_filter = self.get_parameter("node_id_filter").value
self._fix_frame = self.get_parameter("fix_frame_id").value
self._imu_frame = self.get_parameter("imu_frame_id").value
# ── State ─────────────────────────────────────────────────────────
self._here4_nid: Optional[int] = self._nid_filter if self._nid_filter else None
self._last_fix: Optional[dict] = None
self._last_temp_k: float = 293.15 # 20 °C default until first reading
self._dc_node = None # dronecan node, set up after CAN is ready
# ── Publishers ────────────────────────────────────────────────────
self._pub_fix = self.create_publisher(NavSatFix,
"/gps/fix", 10)
self._pub_fix_h4 = self.create_publisher(NavSatFix,
"/here4/fix", 10)
self._pub_vel = self.create_publisher(TwistWithCovarianceStamped,
"/gps/velocity", 10)
self._pub_imu = self.create_publisher(Imu,
"/here4/imu", 10)
self._pub_mag = self.create_publisher(MagneticField,
"/here4/mag", 10)
self._pub_baro = self.create_publisher(FluidPressure,
"/here4/baro", 10)
self._pub_status = self.create_publisher(String,
"/here4/status", 10)
self._pub_node_id = self.create_publisher(Int32,
"/here4/node_id", 10)
# ── Subscriptions ─────────────────────────────────────────────────
self.create_subscription(ByteMultiArray, "/rtcm",
self._on_rtcm_bytes, 10)
self.create_subscription(String, "/rtcm_hex",
self._on_rtcm_hex, 10)
# ── Bring up CAN if requested ─────────────────────────────────────
if self._bring_up:
_bring_up_can(self._iface, self._bitrate, self.get_logger())
# ── Initialise DroneCAN node ──────────────────────────────────────
self._init_dronecan()
# ── Timers ────────────────────────────────────────────────────────
# Poll DroneCAN at 200 Hz (5 ms) to drain the RX queue in real-time
self.create_timer(0.005, self._spin_dronecan)
# Status publish at 1 Hz
self.create_timer(1.0, self._publish_status)
# ── DroneCAN initialisation ───────────────────────────────────────────
def _init_dronecan(self) -> None:
try:
import dronecan # type: ignore[import]
except ImportError:
self.get_logger().error(
"dronecan package not found — install with: pip install dronecan\n"
"Here4 node will not receive GPS data until dronecan is installed."
)
return
try:
self._dc_node = dronecan.make_node(
f"socketcan:{self._iface}",
node_id=self._local_nid,
bitrate=self._bitrate,
)
except Exception as exc:
self.get_logger().error(
f"Failed to open DroneCAN on {self._iface}: {exc}\n"
f"Verify: ip link show {self._iface} and try bring_up_can:=true"
)
return
# ── Register message handlers ──────────────────────────────────────
dc = dronecan
self._dc_node.add_handler(dc.uavcan.equipment.gnss.Fix2,
self._on_fix2)
self._dc_node.add_handler(dc.uavcan.equipment.ahrs.RawIMU,
self._on_raw_imu)
self._dc_node.add_handler(dc.uavcan.equipment.ahrs.MagneticFieldStrength2,
self._on_mag)
self._dc_node.add_handler(dc.uavcan.equipment.air_data.StaticPressure,
self._on_pressure)
self._dc_node.add_handler(dc.uavcan.equipment.air_data.StaticTemperature,
self._on_temperature)
self._dc_node.add_handler(dc.uavcan.protocol.NodeStatus,
self._on_node_status)
self.get_logger().info(
f"DroneCAN node ready: {self._iface} @ {self._bitrate} bps "
f"local_id={self._local_nid}"
)
# ── DroneCAN spin (called from 5 ms timer) ────────────────────────────
def _spin_dronecan(self) -> None:
if self._dc_node is None:
return
try:
self._dc_node.spin(timeout=0)
except Exception as exc:
self.get_logger().warning(
f"DroneCAN spin error: {exc}",
throttle_duration_sec=5.0,
)
# ── Source node filter ────────────────────────────────────────────────
def _accept(self, event: Any) -> bool: # type: ignore[valid-type]
"""Return True if this event comes from the tracked Here4 node."""
src = getattr(getattr(event, "transfer", None), "source_node_id", None)
if src is None:
return True # unknown source — accept
if self._here4_nid is None:
return True # auto-detect mode — accept any
return src == self._here4_nid
# ── DroneCAN message handlers ─────────────────────────────────────────
def _on_fix2(self, event) -> None:
src = getattr(getattr(event, "transfer", None), "source_node_id", None)
# Auto-latch: first node that sends Fix2 becomes our Here4
if self._here4_nid is None and src is not None:
self._here4_nid = src
self.get_logger().info(
f"Here4 auto-discovered: DroneCAN node ID {src}"
)
nid_msg = Int32()
nid_msg.data = src
self._pub_node_id.publish(nid_msg)
if not self._accept(event):
return
d = parse_fix2(event.message)
if d is None:
return
self._last_fix = d
stamp = self.get_clock().now().to_msg()
self._publish_navsatfix(d, stamp)
self._publish_velocity(d, stamp)
def _on_raw_imu(self, event) -> None:
if not self._accept(event):
return
d = parse_raw_imu(event.message)
if d is None:
return
stamp = self.get_clock().now().to_msg()
msg = Imu()
msg.header.stamp = stamp
msg.header.frame_id = self._imu_frame
# Orientation: not provided by DroneCAN RawIMU — signal unknown
msg.orientation_covariance[0] = -1.0
gx, gy, gz = d["gyro_xyz"]
msg.angular_velocity.x = gx
msg.angular_velocity.y = gy
msg.angular_velocity.z = gz
_fill_cov9(msg.angular_velocity_covariance, d["gyro_cov"])
ax, ay, az = d["accel_xyz"]
msg.linear_acceleration.x = ax
msg.linear_acceleration.y = ay
msg.linear_acceleration.z = az
_fill_cov9(msg.linear_acceleration_covariance, d["accel_cov"])
self._pub_imu.publish(msg)
def _on_mag(self, event) -> None:
if not self._accept(event):
return
d = parse_mag(event.message)
if d is None:
return
stamp = self.get_clock().now().to_msg()
msg = MagneticField()
msg.header.stamp = stamp
msg.header.frame_id = self._imu_frame
gx, gy, gz = d["field_ga"]
msg.magnetic_field.x = gx * _GA_TO_T
msg.magnetic_field.y = gy * _GA_TO_T
msg.magnetic_field.z = gz * _GA_TO_T
_fill_cov9(msg.magnetic_field_covariance,
[v * _GA_TO_T * _GA_TO_T for v in d["field_cov"]])
self._pub_mag.publish(msg)
def _on_pressure(self, event) -> None:
if not self._accept(event):
return
d = parse_static_pressure(event.message)
if d is None:
return
stamp = self.get_clock().now().to_msg()
msg = FluidPressure()
msg.header.stamp = stamp
msg.header.frame_id = self._imu_frame
msg.fluid_pressure = d["pressure_pa"]
msg.variance = d["pressure_variance"]
self._pub_baro.publish(msg)
def _on_temperature(self, event) -> None:
if not self._accept(event):
return
d = parse_static_temperature(event.message)
if d is None:
return
self._last_temp_k = d["temperature_k"]
def _on_node_status(self, event) -> None:
src = getattr(getattr(event, "transfer", None), "source_node_id", None)
if src is None:
return
d = parse_node_status(event.message, src)
if d is None:
return
# Auto-discovery: latch first node that sends GPS-relevant data
# NodeStatus alone is not enough — we track whichever source_node_id
# first delivered a Fix2 message; if none yet, log all nodes seen.
if self._here4_nid is None:
# Any node could be Here4 — we'll latch on first Fix2 source
self.get_logger().debug(
f"DroneCAN node {src} alive: health={d['health_label']} "
f"mode={d['mode_label']} uptime={d['uptime_sec']}s"
)
elif src == self._here4_nid:
self._last_node_status = d
# ── NavSatFix publishing ─────────────────────────────────────────────
def _publish_navsatfix(self, d: dict, stamp) -> None:
fix_type = d["fix_type"]
status = NavSatStatus()
status.service = _SERVICE_FLAGS
# Upgrade status for RTK: hdop < 0.5 and 3D fix → treat as SBAS/RTK
if fix_type == FIX_3D:
if d["hdop"] < 0.5:
status.status = NavSatStatus.STATUS_GBAS_FIX # RTK fixed
elif d["hdop"] < 1.5:
status.status = NavSatStatus.STATUS_SBAS_FIX # SBAS/DGPS
else:
status.status = NavSatStatus.STATUS_FIX
else:
status.status = _FIX_TO_NAV_STATUS.get(fix_type, NavSatStatus.STATUS_NO_FIX)
msg = NavSatFix()
msg.header.stamp = stamp
msg.header.frame_id = self._fix_frame
msg.status = status
msg.latitude = d["lat_deg"]
msg.longitude = d["lon_deg"]
msg.altitude = d["alt_msl_m"]
# Position covariance (diagonal from DroneCAN; full matrix if available)
pos_cov = d["pos_cov"]
msg.position_covariance_type = (
NavSatFix.COVARIANCE_TYPE_FULL if any(pos_cov) else
NavSatFix.COVARIANCE_TYPE_APPROXIMATED
)
if any(pos_cov):
for i, v in enumerate(pos_cov):
msg.position_covariance[i] = v
else:
# Approximate from HDOP/VDOP: σ_h ≈ hdop × 3m, σ_v ≈ vdop × 5m
sh2 = (d["hdop"] * 3.0) ** 2
sv2 = (d["vdop"] * 5.0) ** 2
msg.position_covariance[0] = sh2 # east
msg.position_covariance[4] = sh2 # north
msg.position_covariance[8] = sv2 # up
self._pub_fix.publish(msg)
self._pub_fix_h4.publish(msg)
def _publish_velocity(self, d: dict, stamp) -> None:
vn, ve, vd = d["ned_vel"]
msg = TwistWithCovarianceStamped()
msg.header.stamp = stamp
msg.header.frame_id = self._fix_frame
# NED → ENU: x=east=ve, y=north=vn, z=up=-vd
msg.twist.twist.linear.x = ve
msg.twist.twist.linear.y = vn
msg.twist.twist.linear.z = -vd
# Velocity covariance: approximate from HDOP
vel_var = (d["hdop"] * 0.2) ** 2
msg.twist.covariance[0] = vel_var # xx (east)
msg.twist.covariance[7] = vel_var # yy (north)
msg.twist.covariance[14] = (d["vdop"] * 0.3) ** 2 # zz (up)
self._pub_vel.publish(msg)
# ── Status publish (1 Hz) ────────────────────────────────────────────
def _publish_status(self) -> None:
fix = self._last_fix
ns = getattr(self, "_last_node_status", None)
payload: dict = {
"here4_node_id": self._here4_nid,
"can_interface": self._iface,
"dc_node_ready": self._dc_node is not None,
"fix_type": fix["fix_type"] if fix else FIX_NO_FIX,
"fix_label": fix["fix_label"] if fix else "NO_FIX",
"sats_used": fix["sats_used"] if fix else 0,
"hdop": round(fix["hdop"], 2) if fix else 99.0,
"lat_deg": round(fix["lat_deg"], 7) if fix else 0.0,
"lon_deg": round(fix["lon_deg"], 7) if fix else 0.0,
"alt_msl_m": round(fix["alt_msl_m"], 2) if fix else 0.0,
"temp_c": round(self._last_temp_k - 273.15, 1),
"node_health": ns["health_label"] if ns else "UNKNOWN",
"node_uptime_s": ns["uptime_sec"] if ns else 0,
}
msg = String()
msg.data = json.dumps(payload)
self._pub_status.publish(msg)
# Also publish discovered node ID
if self._here4_nid is not None:
nid_msg = Int32()
nid_msg.data = self._here4_nid
self._pub_node_id.publish(nid_msg)
# ── RTCM injection ────────────────────────────────────────────────────
def _on_rtcm_bytes(self, msg: ByteMultiArray) -> None:
"""Receive RTCM bytes from /rtcm and inject via DroneCAN RTCMStream."""
raw = bytes(msg.data)
self._inject_rtcm(raw)
def _on_rtcm_hex(self, msg: String) -> None:
"""Receive hex-encoded RTCM from /rtcm_hex (fallback topic)."""
try:
raw = bytes.fromhex(msg.data.strip())
except ValueError as exc:
self.get_logger().error(f"Bad /rtcm_hex: {exc}")
return
self._inject_rtcm(raw)
def _inject_rtcm(self, raw: bytes) -> None:
if self._dc_node is None or not raw:
return
global _rtcm_seq
try:
import dronecan # type: ignore[import]
except ImportError:
return
with _rtcm_lock:
chunks = chunk_rtcm(raw, _rtcm_seq)
_rtcm_seq = (_rtcm_seq + len(chunks)) & 0xFFFF
for chunk in chunks:
try:
rtcm_msg = dronecan.uavcan.equipment.gnss.RTCMStream()
rtcm_msg.sequence_id = chunk["sequence_id"]
rtcm_msg.data = list(chunk["data"])
self._dc_node.broadcast(rtcm_msg)
except Exception as exc:
self.get_logger().warning(
f"RTCM inject error (seq {chunk['sequence_id']}): {exc}",
throttle_duration_sec=2.0,
)
# ── Shutdown ──────────────────────────────────────────────────────────
def destroy_node(self) -> None:
if self._dc_node is not None:
try:
self._dc_node.close()
except Exception:
pass
super().destroy_node()
# ── Covariance helper ─────────────────────────────────────────────────────────
def _fill_cov9(ros_cov, values) -> None:
"""Fill a 9-element ROS2 covariance array from a list."""
for i, v in enumerate(values[:9]):
ros_cov[i] = float(v)
# ── Entry point ───────────────────────────────────────────────────────────────
def main(args=None) -> None:
rclpy.init(args=args)
node = Here4CanNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,5 @@
[develop]
script_dir=$base/lib/saltybot_dronecan_gps
[install]
install_scripts=$base/lib/saltybot_dronecan_gps

View File

@ -0,0 +1,27 @@
from setuptools import setup
package_name = "saltybot_dronecan_gps"
setup(
name=package_name,
version="0.1.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(f"share/{package_name}/config", ["config/here4_params.yaml"]),
(f"share/{package_name}/launch", ["launch/here4.launch.py"]),
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="sl-perception",
maintainer_email="sl-perception@saltylab.local",
description="DroneCAN/UAVCAN v0 bridge for Here4 GPS via CANable2 (bd-p47c)",
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": [
"here4_node = saltybot_dronecan_gps.here4_node:main",
],
},
)

View File

@ -0,0 +1,367 @@
#!/usr/bin/env python3
"""Unit tests for saltybot_dronecan_gps.dronecan_parser.
No dronecan library, ROS2, or CAN hardware required.
Uses simple Python objects (namespaces) to simulate dronecan message objects.
Run with: pytest test/ -v
"""
import math
import types
import unittest
from saltybot_dronecan_gps.dronecan_parser import (
DTID_FIX2,
DTID_RAW_IMU,
DTID_MAG_FIELD,
DTID_STATIC_PRESSURE,
DTID_STATIC_TEMPERATURE,
DTID_NODE_STATUS,
DTID_RTCM_STREAM,
FIX_NO_FIX,
FIX_TIME_ONLY,
FIX_2D,
FIX_3D,
RTCM_MAX_CHUNK,
chunk_rtcm,
parse_fix2,
parse_mag,
parse_node_status,
parse_raw_imu,
parse_static_pressure,
parse_static_temperature,
_getf,
_list3,
_list9,
)
# ── Helpers ───────────────────────────────────────────────────────────────────
def _ns(**kwargs) -> types.SimpleNamespace:
"""Build a simple namespace to mimic a dronecan message object."""
return types.SimpleNamespace(**kwargs)
def _ns_vec(*values):
"""Return an object that is indexable like a dronecan vector field."""
return list(values)
# ── DTID constants ────────────────────────────────────────────────────────────
class TestDtidConstants(unittest.TestCase):
def test_known_values(self):
self.assertEqual(DTID_FIX2, 1063)
self.assertEqual(DTID_RAW_IMU, 1003)
self.assertEqual(DTID_MAG_FIELD, 1001)
self.assertEqual(DTID_STATIC_PRESSURE, 1028)
self.assertEqual(DTID_STATIC_TEMPERATURE, 1029)
self.assertEqual(DTID_NODE_STATUS, 341)
self.assertEqual(DTID_RTCM_STREAM, 1062)
def test_rtcm_chunk_size(self):
self.assertEqual(RTCM_MAX_CHUNK, 128)
# ── _getf / _list3 / _list9 ──────────────────────────────────────────────────
class TestHelpers(unittest.TestCase):
def test_getf_simple(self):
obj = _ns(x=42)
self.assertEqual(_getf(obj, "x"), 42)
def test_getf_missing(self):
obj = _ns(x=1)
self.assertIsNone(_getf(obj, "y"))
def test_getf_default(self):
obj = _ns()
self.assertEqual(_getf(obj, "missing", default=99), 99)
def test_list3_ok(self):
self.assertEqual(_list3([1.0, 2.0, 3.0]), [1.0, 2.0, 3.0])
def test_list3_bad(self):
self.assertEqual(_list3(None), [0.0, 0.0, 0.0])
def test_list9_ok(self):
vals = list(range(9))
self.assertEqual(_list9(vals), [float(v) for v in vals])
def test_list9_short(self):
self.assertEqual(_list9(None), [0.0] * 9)
# ── parse_fix2 ────────────────────────────────────────────────────────────────
class TestParseFix2(unittest.TestCase):
def _make_fix(self, lat_1e8=0, lon_1e8=0, alt_msl_mm=0, alt_ell_mm=0,
ned_vel=None, status=FIX_3D, sats=12,
pdop=1.2, hdop=0.9, vdop=1.5,
pos_cov=None, undulation=5.0, gnss_ts=0):
return _ns(
latitude_deg_1e8 = lat_1e8,
longitude_deg_1e8 = lon_1e8,
height_msl_mm = alt_msl_mm,
height_ellipsoid_mm = alt_ell_mm,
ned_velocity = ned_vel if ned_vel is not None else _ns_vec(0.0, 0.0, 0.0),
status = status,
sats_used = sats,
pdop = pdop,
hdop = hdop,
vdop = vdop,
position_covariance = pos_cov,
undulation = undulation,
gnss_timestamp = gnss_ts,
)
def test_basic_3d_fix(self):
msg = self._make_fix(
lat_1e8 = 4_576_543_210, # 45.7654321°
lon_1e8 = -7_345_678_901, # -73.45678901°
alt_msl_mm = 50_000, # 50 m
alt_ell_mm = 52_000, # 52 m
ned_vel = _ns_vec(1.5, -0.3, 0.0),
sats = 14,
hdop = 0.8,
)
d = parse_fix2(msg)
self.assertIsNotNone(d)
self.assertAlmostEqual(d["lat_deg"], 45.7654321, places=5)
self.assertAlmostEqual(d["lon_deg"], -73.45678901, places=5)
self.assertAlmostEqual(d["alt_msl_m"], 50.0, places=3)
self.assertAlmostEqual(d["alt_ellipsoid_m"], 52.0, places=3)
self.assertEqual(d["ned_vel"], [1.5, -0.3, 0.0])
self.assertEqual(d["fix_type"], FIX_3D)
self.assertEqual(d["fix_label"], "3D_FIX")
self.assertEqual(d["sats_used"], 14)
self.assertAlmostEqual(d["hdop"], 0.8, places=5)
def test_no_fix(self):
msg = self._make_fix(status=FIX_NO_FIX, sats=0)
d = parse_fix2(msg)
self.assertIsNotNone(d)
self.assertEqual(d["fix_type"], FIX_NO_FIX)
self.assertEqual(d["fix_label"], "NO_FIX")
self.assertEqual(d["sats_used"], 0)
def test_time_only(self):
msg = self._make_fix(status=FIX_TIME_ONLY)
d = parse_fix2(msg)
self.assertEqual(d["fix_label"], "TIME_ONLY")
def test_2d_fix(self):
msg = self._make_fix(status=FIX_2D)
d = parse_fix2(msg)
self.assertEqual(d["fix_label"], "2D_FIX")
def test_position_covariance(self):
cov = [float(i) for i in range(9)]
msg = self._make_fix(pos_cov=cov)
d = parse_fix2(msg)
self.assertEqual(d["pos_cov"], cov)
def test_no_covariance_returns_zeros(self):
msg = self._make_fix(pos_cov=None)
d = parse_fix2(msg)
self.assertEqual(d["pos_cov"], [0.0] * 9)
def test_zero_lat_lon(self):
msg = self._make_fix(lat_1e8=0, lon_1e8=0)
d = parse_fix2(msg)
self.assertAlmostEqual(d["lat_deg"], 0.0, places=8)
self.assertAlmostEqual(d["lon_deg"], 0.0, places=8)
def test_returns_none_on_bad_input(self):
self.assertIsNone(parse_fix2(None))
def test_gnss_timestamp(self):
msg = self._make_fix(gnss_ts=1_000_000_000)
d = parse_fix2(msg)
self.assertEqual(d["gnss_timestamp_us"], 1_000_000_000)
# ── parse_raw_imu ─────────────────────────────────────────────────────────────
class TestParseRawImu(unittest.TestCase):
def _make_imu(self, gyro=None, accel=None, dt=0.02,
gyro_cov=None, accel_cov=None):
return _ns(
rate_gyro_latest = gyro if gyro else _ns_vec(0.0, 0.0, 0.0),
accelerometer_latest = accel if accel else _ns_vec(0.0, 0.0, 9.81),
integration_interval = dt,
rate_gyro_covariance = gyro_cov if gyro_cov else [0.0] * 9,
accelerometer_covariance = accel_cov if accel_cov else [0.0] * 9,
)
def test_basic(self):
msg = self._make_imu(
gyro = _ns_vec(0.01, -0.02, 0.03),
accel = _ns_vec(0.1, -0.2, 9.8),
dt = 0.02,
)
d = parse_raw_imu(msg)
self.assertIsNotNone(d)
self.assertEqual(d["gyro_xyz"], [0.01, -0.02, 0.03])
self.assertEqual(d["accel_xyz"], [0.1, -0.2, 9.8])
self.assertAlmostEqual(d["dt_s"], 0.02)
def test_covariances_returned(self):
cov = [float(i * 0.001) for i in range(9)]
msg = self._make_imu(gyro_cov=cov, accel_cov=cov)
d = parse_raw_imu(msg)
self.assertEqual(d["gyro_cov"], cov)
self.assertEqual(d["accel_cov"], cov)
def test_none_input(self):
self.assertIsNone(parse_raw_imu(None))
# ── parse_mag ─────────────────────────────────────────────────────────────────
class TestParseMag(unittest.TestCase):
def _make_mag(self, sensor_id=0, field=None, cov=None):
return _ns(
sensor_id = sensor_id,
magnetic_field_ga = field if field else _ns_vec(0.1, 0.2, -0.5),
magnetic_field_covariance = cov if cov else [0.0] * 9,
)
def test_basic(self):
msg = self._make_mag(sensor_id=1, field=_ns_vec(0.1, 0.2, -0.5))
d = parse_mag(msg)
self.assertIsNotNone(d)
self.assertEqual(d["sensor_id"], 1)
self.assertEqual(d["field_ga"], [0.1, 0.2, -0.5])
def test_none_input(self):
self.assertIsNone(parse_mag(None))
# ── parse_static_pressure ────────────────────────────────────────────────────
class TestParseStaticPressure(unittest.TestCase):
def test_basic(self):
msg = _ns(static_pressure=101325.0, static_pressure_variance=100.0)
d = parse_static_pressure(msg)
self.assertIsNotNone(d)
self.assertAlmostEqual(d["pressure_pa"], 101325.0)
self.assertAlmostEqual(d["pressure_variance"], 100.0)
def test_none_input(self):
self.assertIsNone(parse_static_pressure(None))
# ── parse_static_temperature ─────────────────────────────────────────────────
class TestParseStaticTemperature(unittest.TestCase):
def test_celsius_to_kelvin(self):
msg = _ns(static_temperature=293.15) # 20 °C
d = parse_static_temperature(msg)
self.assertIsNotNone(d)
self.assertAlmostEqual(d["temperature_k"], 293.15)
def test_none_input(self):
self.assertIsNone(parse_static_temperature(None))
# ── parse_node_status ────────────────────────────────────────────────────────
class TestParseNodeStatus(unittest.TestCase):
def _make_status(self, health=0, mode=0, uptime=120, vendor_code=0):
return _ns(
health = health,
mode = mode,
uptime_sec = uptime,
vendor_specific_status_code = vendor_code,
)
def test_ok_operational(self):
msg = self._make_status(health=0, mode=0, uptime=300)
d = parse_node_status(msg, source_node_id=10)
self.assertIsNotNone(d)
self.assertEqual(d["node_id"], 10)
self.assertEqual(d["health"], 0)
self.assertEqual(d["health_label"], "OK")
self.assertEqual(d["mode_label"], "OPERATIONAL")
self.assertEqual(d["uptime_sec"], 300)
def test_error_state(self):
msg = self._make_status(health=2, mode=0)
d = parse_node_status(msg, source_node_id=5)
self.assertEqual(d["health_label"], "ERROR")
def test_critical_state(self):
msg = self._make_status(health=3)
d = parse_node_status(msg, 7)
self.assertEqual(d["health_label"], "CRITICAL")
def test_none_input(self):
self.assertIsNone(parse_node_status(None, 1))
# ── chunk_rtcm ────────────────────────────────────────────────────────────────
class TestChunkRtcm(unittest.TestCase):
def test_empty(self):
chunks = chunk_rtcm(b"", 0)
self.assertEqual(chunks, [])
def test_single_chunk(self):
data = bytes(range(64))
chunks = chunk_rtcm(data, 5)
self.assertEqual(len(chunks), 1)
self.assertEqual(chunks[0]["sequence_id"], 5)
self.assertEqual(chunks[0]["data"], data)
def test_exact_chunk_boundary(self):
data = bytes(RTCM_MAX_CHUNK)
chunks = chunk_rtcm(data, 0)
self.assertEqual(len(chunks), 1)
self.assertEqual(len(chunks[0]["data"]), RTCM_MAX_CHUNK)
def test_two_chunks(self):
data = bytes(RTCM_MAX_CHUNK + 1)
chunks = chunk_rtcm(data, 0)
self.assertEqual(len(chunks), 2)
self.assertEqual(len(chunks[0]["data"]), RTCM_MAX_CHUNK)
self.assertEqual(len(chunks[1]["data"]), 1)
def test_sequence_id_increments(self):
data = bytes(RTCM_MAX_CHUNK * 3)
chunks = chunk_rtcm(data, 10)
for i, chunk in enumerate(chunks):
self.assertEqual(chunk["sequence_id"], 10 + i)
def test_sequence_id_wraps_at_16bit(self):
data = bytes(RTCM_MAX_CHUNK * 2)
chunks = chunk_rtcm(data, 0xFFFF)
self.assertEqual(chunks[0]["sequence_id"], 0xFFFF)
self.assertEqual(chunks[1]["sequence_id"], 0x0000) # wraps
def test_data_round_trip(self):
original = bytes(range(200))
chunks = chunk_rtcm(original, 0)
reassembled = b"".join(c["data"] for c in chunks)
self.assertEqual(reassembled, original)
def test_large_message(self):
data = bytes(1024)
chunks = chunk_rtcm(data, 0)
total_bytes = sum(len(c["data"]) for c in chunks)
self.assertEqual(total_bytes, 1024)
for c in chunks:
self.assertLessEqual(len(c["data"]), RTCM_MAX_CHUNK)
if __name__ == "__main__":
unittest.main()