From 9d6c72bd24c3e768f1f50232bda76b4dac59bb93 Mon Sep 17 00:00:00 2001 From: sl-perception Date: Fri, 17 Apr 2026 21:49:00 -0400 Subject: [PATCH] feat: Here4 GPS DroneCAN integration via CANable2 (bd-p47c) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements saltybot_dronecan_gps ROS2 package — DroneCAN/UAVCAN v0 bridge that publishes Here4 GPS, IMU, magnetometer, and barometer data to ROS2. CANable2 freed from ESP32 BALANCE comms (bd-wim1) now runs Here4 at 1 Mbps DroneCAN. Key features: - dronecan_parser.py: pure-Python DSDL converters (Fix2, RawIMU, Mag, StaticPressure, StaticTemperature, NodeStatus, RTCMStream chunks), testable without dronecan library or CAN hardware - here4_node.py: ROS2 node, auto-discovers Here4 node ID on first Fix2, publishes /gps/fix + /gps/velocity for navsat_transform EKF fusion, HDOP-based NavSatStatus upgrade (RTK/SBAS), RTCM injection via /rtcm (ByteMultiArray) or /rtcm_hex (String hex fallback) - 39 unit tests, all passing - bring_up_can:=true parameter to configure SocketCAN at launch Resolves bd-p47c Co-Authored-By: Claude Sonnet 4.6 --- .../config/here4_params.yaml | 22 + .../launch/here4.launch.py | 109 ++++ .../src/saltybot_dronecan_gps/package.xml | 37 ++ .../resource/saltybot_dronecan_gps | 0 .../saltybot_dronecan_gps/__init__.py | 1 + .../saltybot_dronecan_gps/dronecan_parser.py | 333 +++++++++++ .../saltybot_dronecan_gps/here4_node.py | 532 ++++++++++++++++++ .../src/saltybot_dronecan_gps/setup.cfg | 5 + .../src/saltybot_dronecan_gps/setup.py | 27 + .../saltybot_dronecan_gps/test/__init__.py | 0 .../test/test_dronecan_parser.py | 367 ++++++++++++ 11 files changed, 1433 insertions(+) create mode 100644 jetson/ros2_ws/src/saltybot_dronecan_gps/config/here4_params.yaml create mode 100644 jetson/ros2_ws/src/saltybot_dronecan_gps/launch/here4.launch.py create mode 100644 jetson/ros2_ws/src/saltybot_dronecan_gps/package.xml create mode 100644 jetson/ros2_ws/src/saltybot_dronecan_gps/resource/saltybot_dronecan_gps create mode 100644 jetson/ros2_ws/src/saltybot_dronecan_gps/saltybot_dronecan_gps/__init__.py create mode 100644 jetson/ros2_ws/src/saltybot_dronecan_gps/saltybot_dronecan_gps/dronecan_parser.py create mode 100644 jetson/ros2_ws/src/saltybot_dronecan_gps/saltybot_dronecan_gps/here4_node.py create mode 100644 jetson/ros2_ws/src/saltybot_dronecan_gps/setup.cfg create mode 100644 jetson/ros2_ws/src/saltybot_dronecan_gps/setup.py create mode 100644 jetson/ros2_ws/src/saltybot_dronecan_gps/test/__init__.py create mode 100644 jetson/ros2_ws/src/saltybot_dronecan_gps/test/test_dronecan_parser.py diff --git a/jetson/ros2_ws/src/saltybot_dronecan_gps/config/here4_params.yaml b/jetson/ros2_ws/src/saltybot_dronecan_gps/config/here4_params.yaml new file mode 100644 index 0000000..378001e --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_dronecan_gps/config/here4_params.yaml @@ -0,0 +1,22 @@ +here4_can_node: + ros__parameters: + # SocketCAN interface — freed from ESP32 BALANCE comms by bd-wim1 + can_interface: can0 + + # DroneCAN bitrate — Here4 runs at 1 Mbps (not 500 kbps used previously) + bitrate: 1000000 + + # Local DroneCAN node ID for this Orin instance (1-127, must be unique on bus) + local_node_id: 126 + + # Set true to run: ip link set can0 type can bitrate 1000000 && ip link set can0 up + # Requires: sudo setcap cap_net_admin+eip $(which python3) or run as root + # Default false — manage interface externally (systemd-networkd or udev) + bring_up_can: false + + # Here4 DroneCAN node ID — 0 means auto-detect from first Fix2 message + node_id_filter: 0 + + # TF frame IDs + fix_frame_id: gps + imu_frame_id: here4_imu diff --git a/jetson/ros2_ws/src/saltybot_dronecan_gps/launch/here4.launch.py b/jetson/ros2_ws/src/saltybot_dronecan_gps/launch/here4.launch.py new file mode 100644 index 0000000..757c8f2 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_dronecan_gps/launch/here4.launch.py @@ -0,0 +1,109 @@ +"""here4.launch.py — Launch the Here4 GPS DroneCAN bridge on CANable2. + +bd-p47c: CANable2 freed by bd-wim1 (ESP32 moved to UART/USB) now used for +Here4 GPS at 1 Mbps DroneCAN/UAVCAN v0. + +CAN setup (one-time, survives reboot via systemd-networkd or udev) +------------------------------------------------------------------ + # Option A — manual (quick test): + sudo ip link set can0 down + sudo ip link set can0 type can bitrate 1000000 + sudo ip link set can0 up + + # Option B — let the node do it (requires CAP_NET_ADMIN): + ros2 launch saltybot_dronecan_gps here4.launch.py bring_up_can:=true + + # Option C — systemd-networkd (/etc/systemd/network/80-can0.network): + [Match] + Name=can0 + [CAN] + BitRate=1M + +Published topics +---------------- + /gps/fix sensor_msgs/NavSatFix → navsat_transform_node (EKF) + /gps/velocity geometry_msgs/TwistWithCovarianceStamped + /here4/fix sensor_msgs/NavSatFix (alias) + /here4/imu sensor_msgs/Imu + /here4/mag sensor_msgs/MagneticField + /here4/baro sensor_msgs/FluidPressure + /here4/status std_msgs/String JSON + /here4/node_id std_msgs/Int32 + +Subscribed topics +----------------- + /rtcm std_msgs/ByteMultiArray RTCM corrections (NTRIP client) + /rtcm_hex std_msgs/String hex-encoded RTCM (fallback) + +Usage +----- + # Default (interface already up): + ros2 launch saltybot_dronecan_gps here4.launch.py + + # Bring up interface automatically (CAP_NET_ADMIN required): + ros2 launch saltybot_dronecan_gps here4.launch.py bring_up_can:=true + + # Override interface (e.g. second CAN adapter): + ros2 launch saltybot_dronecan_gps here4.launch.py can_interface:=can1 + + # Pin to specific Here4 node ID (skip auto-detect): + ros2 launch saltybot_dronecan_gps here4.launch.py node_id_filter:=10 + +System dependency +----------------- + pip install dronecan # DroneCAN/UAVCAN v0 Python library + apt install can-utils # optional: candump, cansend for debugging +""" + +import os +from ament_index_python.packages import get_package_share_directory +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument +from launch.substitutions import LaunchConfiguration +from launch_ros.actions import Node + + +def generate_launch_description() -> LaunchDescription: + pkg_share = get_package_share_directory("saltybot_dronecan_gps") + params_file = os.path.join(pkg_share, "config", "here4_params.yaml") + + args = [ + DeclareLaunchArgument( + "can_interface", + default_value="can0", + description="SocketCAN interface (CANable2 @ 1 Mbps)", + ), + DeclareLaunchArgument( + "bring_up_can", + default_value="false", + description="Bring up can_interface via ip link (requires CAP_NET_ADMIN)", + ), + DeclareLaunchArgument( + "node_id_filter", + default_value="0", + description="DroneCAN node ID of Here4 (0=auto-detect from first Fix2)", + ), + DeclareLaunchArgument( + "local_node_id", + default_value="126", + description="DroneCAN node ID for this Orin (must be unique on bus)", + ), + ] + + node = Node( + package="saltybot_dronecan_gps", + executable="here4_node", + name="here4_can_node", + output="screen", + parameters=[ + params_file, + { + "can_interface": LaunchConfiguration("can_interface"), + "bring_up_can": LaunchConfiguration("bring_up_can"), + "node_id_filter": LaunchConfiguration("node_id_filter"), + "local_node_id": LaunchConfiguration("local_node_id"), + }, + ], + ) + + return LaunchDescription([*args, node]) diff --git a/jetson/ros2_ws/src/saltybot_dronecan_gps/package.xml b/jetson/ros2_ws/src/saltybot_dronecan_gps/package.xml new file mode 100644 index 0000000..719535c --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_dronecan_gps/package.xml @@ -0,0 +1,37 @@ + + + + saltybot_dronecan_gps + 0.1.0 + + DroneCAN/UAVCAN v0 bridge for Here4 GPS on the CANable2 SocketCAN adapter. + Publishes GPS fix, IMU, magnetometer, and barometer data to ROS2. + Injects RTCM corrections for RTK. + + bd-p47c: CANable2 freed from ESP32 BALANCE comms by bd-wim1. + Here4 operates at 1 Mbps on can0 (was 500 kbps for VESC comms). + + System dependency: dronecan Python library (pip install dronecan) + + sl-perception + MIT + + rclpy + std_msgs + sensor_msgs + geometry_msgs + + + + + ament_python + + ament_copyright + ament_flake8 + ament_pep257 + python3-pytest + + + ament_python + + diff --git a/jetson/ros2_ws/src/saltybot_dronecan_gps/resource/saltybot_dronecan_gps b/jetson/ros2_ws/src/saltybot_dronecan_gps/resource/saltybot_dronecan_gps new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_dronecan_gps/saltybot_dronecan_gps/__init__.py b/jetson/ros2_ws/src/saltybot_dronecan_gps/saltybot_dronecan_gps/__init__.py new file mode 100644 index 0000000..3c97be2 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_dronecan_gps/saltybot_dronecan_gps/__init__.py @@ -0,0 +1 @@ +# saltybot_dronecan_gps — Here4 GPS DroneCAN bridge for CANable2 (bd-p47c) diff --git a/jetson/ros2_ws/src/saltybot_dronecan_gps/saltybot_dronecan_gps/dronecan_parser.py b/jetson/ros2_ws/src/saltybot_dronecan_gps/saltybot_dronecan_gps/dronecan_parser.py new file mode 100644 index 0000000..e630b7e --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_dronecan_gps/saltybot_dronecan_gps/dronecan_parser.py @@ -0,0 +1,333 @@ +"""dronecan_parser.py — Pure-Python conversion of DroneCAN/UAVCAN v0 message +objects to plain Python dicts, independent of ROS2. + +Keeps the dronecan library dependency isolated so that unit tests can run +without the dronecan package or CAN hardware. + +DroneCAN message types handled +------------------------------- + uavcan.equipment.gnss.Fix2 DTID 1063 + uavcan.equipment.gnss.Auxiliary DTID 1060 + uavcan.equipment.ahrs.RawIMU DTID 1003 + uavcan.equipment.ahrs.MagneticFieldStrength2 DTID 1001 + uavcan.equipment.air_data.StaticPressure DTID 1028 + uavcan.equipment.air_data.StaticTemperature DTID 1029 + uavcan.protocol.NodeStatus DTID 341 + uavcan.equipment.gnss.RTCMStream DTID 1062 (TX: RTCM injection) + +Fix2 status values +------------------ + 0 NO_FIX + 1 TIME_ONLY + 2 2D_FIX + 3 3D_FIX + +NodeStatus health / mode +------------------------- + health 0=OK 1=WARNING 2=ERROR 3=CRITICAL + mode 0=OPERATIONAL 1=INITIALIZATION 2=MAINTENANCE 3=SOFTWARE_UPDATE 7=OFFLINE +""" + +from __future__ import annotations + +from typing import Any, Dict, List, Optional + +# ── DSDL constants ──────────────────────────────────────────────────────────── + +DTID_FIX2 = 1063 +DTID_AUXILIARY = 1060 +DTID_RAW_IMU = 1003 +DTID_MAG_FIELD = 1001 +DTID_STATIC_PRESSURE = 1028 +DTID_STATIC_TEMPERATURE = 1029 +DTID_NODE_STATUS = 341 +DTID_RTCM_STREAM = 1062 + +# Fix2 status +FIX_NO_FIX = 0 +FIX_TIME_ONLY = 1 +FIX_2D = 2 +FIX_3D = 3 + +FIX_LABEL = {FIX_NO_FIX: "NO_FIX", FIX_TIME_ONLY: "TIME_ONLY", + FIX_2D: "2D_FIX", FIX_3D: "3D_FIX"} + +# NodeStatus health +HEALTH_OK = 0 +HEALTH_WARNING = 1 +HEALTH_ERROR = 2 +HEALTH_CRITICAL = 3 + +HEALTH_LABEL = {0: "OK", 1: "WARNING", 2: "ERROR", 3: "CRITICAL"} +MODE_LABEL = {0: "OPERATIONAL", 1: "INITIALIZATION", 2: "MAINTENANCE", + 3: "SOFTWARE_UPDATE", 7: "OFFLINE"} + +# RTCM stream: max bytes per DroneCAN frame +RTCM_MAX_CHUNK = 128 + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + +def _getf(obj: Any, *attrs: str, default: Any = None) -> Any: + """Get nested attribute with fallback, tolerant of dronecan proxy objects.""" + for attr in attrs: + try: + obj = getattr(obj, attr) + except AttributeError: + return default + return obj + + +def _list3(vec: Any, default: float = 0.0) -> List[float]: + """Convert a dronecan 3-element array to a plain list of floats.""" + try: + return [float(vec[0]), float(vec[1]), float(vec[2])] + except (TypeError, IndexError): + return [default, default, default] + + +def _list9(mat: Any, default: float = 0.0) -> List[float]: + """Convert a dronecan 9-element array (row-major 3×3) to a plain list.""" + try: + return [float(mat[i]) for i in range(9)] + except (TypeError, IndexError): + return [default] * 9 + + +# ── Fix2 ───────────────────────────────────────────────────────────────────── + +def parse_fix2(msg: Any) -> Optional[Dict[str, Any]]: + """Parse a uavcan.equipment.gnss.Fix2 message. + + Parameters + ---------- + msg : dronecan message object (event.message / event.transfer.payload) + + Returns + ------- + dict with keys: + lat_deg float degrees (WGS84) + lon_deg float degrees (WGS84) + alt_msl_m float metres above MSL + alt_ellipsoid_m float metres above WGS84 ellipsoid + ned_vel list[3] [north, east, down] m/s + fix_type int 0..3 + fix_label str + sats_used int + pdop float + hdop float + vdop float + pos_cov list[9] row-major 3×3 position covariance, m² + undulation_m float WGS84 geoid undulation (m) + gnss_timestamp_us int microseconds (GPS epoch) + """ + if msg is None: + return None + try: + lat = float(_getf(msg, "latitude_deg_1e8", default=0)) * 1e-8 + lon = float(_getf(msg, "longitude_deg_1e8", default=0)) * 1e-8 + alt_msl = float(_getf(msg, "height_msl_mm", default=0)) * 1e-3 + alt_ell = float(_getf(msg, "height_ellipsoid_mm", default=0)) * 1e-3 + + ned = _list3(_getf(msg, "ned_velocity")) + + fix_type = int(_getf(msg, "status", default=FIX_NO_FIX)) + + # sats_used is uint7, may be named differently across DSDL revisions + sats = int(_getf(msg, "sats_used", default=0)) + + pdop = float(_getf(msg, "pdop", default=99.0)) + hdop = float(_getf(msg, "hdop", default=99.0)) + vdop = float(_getf(msg, "vdop", default=99.0)) + undulation = float(_getf(msg, "undulation", default=0.0)) + + # position_covariance: optional, may be absent in older DSDL + pos_cov_raw = _getf(msg, "position_covariance") + pos_cov = _list9(pos_cov_raw) if pos_cov_raw is not None else [0.0] * 9 + + gnss_ts = int(_getf(msg, "gnss_timestamp", default=0)) + + return { + "lat_deg": lat, + "lon_deg": lon, + "alt_msl_m": alt_msl, + "alt_ellipsoid_m": alt_ell, + "ned_vel": ned, + "fix_type": fix_type, + "fix_label": FIX_LABEL.get(fix_type, f"UNKNOWN({fix_type})"), + "sats_used": sats, + "pdop": pdop, + "hdop": hdop, + "vdop": vdop, + "pos_cov": pos_cov, + "undulation_m": undulation, + "gnss_timestamp_us": gnss_ts, + } + except Exception: + return None + + +# ── RawIMU ──────────────────────────────────────────────────────────────────── + +def parse_raw_imu(msg: Any) -> Optional[Dict[str, Any]]: + """Parse a uavcan.equipment.ahrs.RawIMU message. + + Returns + ------- + dict with keys: + gyro_xyz list[3] rad/s + accel_xyz list[3] m/s² + dt_s float integration interval (seconds) + gyro_cov list[9] row-major 3×3 covariance + accel_cov list[9] + """ + if msg is None: + return None + try: + gyro = _list3(_getf(msg, "rate_gyro_latest")) + accel = _list3(_getf(msg, "accelerometer_latest")) + dt = float(_getf(msg, "integration_interval", default=0.0)) + + gyro_cov = _list9(_getf(msg, "rate_gyro_covariance")) + accel_cov = _list9(_getf(msg, "accelerometer_covariance")) + + return { + "gyro_xyz": gyro, + "accel_xyz": accel, + "dt_s": dt, + "gyro_cov": gyro_cov, + "accel_cov": accel_cov, + } + except Exception: + return None + + +# ── MagneticFieldStrength2 ──────────────────────────────────────────────────── + +def parse_mag(msg: Any) -> Optional[Dict[str, Any]]: + """Parse a uavcan.equipment.ahrs.MagneticFieldStrength2 message. + + Returns + ------- + dict with keys: + sensor_id int + field_ga list[3] [x, y, z] Gauss + field_cov list[9] row-major 3×3 covariance (Gauss²) + """ + if msg is None: + return None + try: + sensor_id = int(_getf(msg, "sensor_id", default=0)) + field_ga = _list3(_getf(msg, "magnetic_field_ga")) + field_cov = _list9(_getf(msg, "magnetic_field_covariance")) + return { + "sensor_id": sensor_id, + "field_ga": field_ga, + "field_cov": field_cov, + } + except Exception: + return None + + +# ── StaticPressure / StaticTemperature ─────────────────────────────────────── + +def parse_static_pressure(msg: Any) -> Optional[Dict[str, Any]]: + """Parse a uavcan.equipment.air_data.StaticPressure message. + + Returns + ------- + dict with keys: + pressure_pa float Pascals + pressure_variance float Pa² + """ + if msg is None: + return None + try: + return { + "pressure_pa": float(_getf(msg, "static_pressure", default=0.0)), + "pressure_variance": float(_getf(msg, "static_pressure_variance", default=0.0)), + } + except Exception: + return None + + +def parse_static_temperature(msg: Any) -> Optional[Dict[str, Any]]: + """Parse a uavcan.equipment.air_data.StaticTemperature message. + + Returns + ------- + dict with keys: + temperature_k float Kelvin + """ + if msg is None: + return None + try: + return { + "temperature_k": float(_getf(msg, "static_temperature", default=273.15)), + } + except Exception: + return None + + +# ── NodeStatus ──────────────────────────────────────────────────────────────── + +def parse_node_status(msg: Any, source_node_id: int) -> Optional[Dict[str, Any]]: + """Parse a uavcan.protocol.NodeStatus message. + + Returns + ------- + dict with keys: + node_id int + uptime_sec int + health int 0=OK .. 3=CRITICAL + health_label str + mode int + mode_label str + vendor_code int + """ + if msg is None: + return None + try: + health = int(_getf(msg, "health", default=0)) + mode = int(_getf(msg, "mode", default=0)) + uptime = int(_getf(msg, "uptime_sec", default=0)) + vendor_code = int(_getf(msg, "vendor_specific_status_code", default=0)) + return { + "node_id": source_node_id, + "uptime_sec": uptime, + "health": health, + "health_label": HEALTH_LABEL.get(health, f"UNKNOWN({health})"), + "mode": mode, + "mode_label": MODE_LABEL.get(mode, f"UNKNOWN({mode})"), + "vendor_code": vendor_code, + } + except Exception: + return None + + +# ── RTCM stream chunking ────────────────────────────────────────────────────── + +def chunk_rtcm(data: bytes, sequence_id: int) -> List[Dict[str, Any]]: + """Split a raw RTCM byte string into DroneCAN RTCMStream message dicts. + + Each chunk carries up to RTCM_MAX_CHUNK bytes. The caller must convert + these dicts into actual dronecan.uavcan.equipment.gnss.RTCMStream objects + and broadcast them on the bus. + + Parameters + ---------- + data : raw RTCM correction bytes + sequence_id : monotonically increasing counter (uint16, wraps at 65535) + + Returns + ------- + List of dicts: {"sequence_id": int, "data": bytes} + """ + chunks = [] + for i in range(0, len(data), RTCM_MAX_CHUNK): + chunks.append({ + "sequence_id": sequence_id & 0xFFFF, + "data": data[i:i + RTCM_MAX_CHUNK], + }) + sequence_id += 1 + return chunks diff --git a/jetson/ros2_ws/src/saltybot_dronecan_gps/saltybot_dronecan_gps/here4_node.py b/jetson/ros2_ws/src/saltybot_dronecan_gps/saltybot_dronecan_gps/here4_node.py new file mode 100644 index 0000000..627734c --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_dronecan_gps/saltybot_dronecan_gps/here4_node.py @@ -0,0 +1,532 @@ +"""here4_node.py — ROS2 node for Here4 GPS via DroneCAN/UAVCAN v0 on CANable2. + +bd-p47c: CANable2 (freed from ESP32 BALANCE comms by bd-wim1) is now used +for Here4 GPS at 1 Mbps DroneCAN. + +CAN setup (run once before launching, or use bring_up_can:=true arg) +----------------------------------------------------------------------- + sudo ip link set can0 down + sudo ip link set can0 type can bitrate 1000000 + sudo ip link set can0 up + +DroneCAN messages received from Here4 +-------------------------------------- + uavcan.equipment.gnss.Fix2 DTID 1063 ~5 Hz + uavcan.equipment.gnss.Auxiliary DTID 1060 ~5 Hz + uavcan.equipment.ahrs.RawIMU DTID 1003 ~50 Hz + uavcan.equipment.ahrs.MagneticFieldStrength2 DTID 1001 ~10 Hz + uavcan.equipment.air_data.StaticPressure DTID 1028 ~10 Hz + uavcan.equipment.air_data.StaticTemperature DTID 1029 ~10 Hz + uavcan.protocol.NodeStatus DTID 341 ~1 Hz + +DroneCAN messages transmitted to Here4 +---------------------------------------- + uavcan.equipment.gnss.RTCMStream DTID 1062 on /rtcm input + +ROS2 topics published +---------------------- + /gps/fix sensor_msgs/NavSatFix 5 Hz — feeds navsat_transform_node + /gps/velocity geometry_msgs/TwistWithCovarianceStamped 5 Hz + /here4/fix sensor_msgs/NavSatFix alias of /gps/fix + /here4/imu sensor_msgs/Imu 50 Hz + /here4/mag sensor_msgs/MagneticField 10 Hz + /here4/baro sensor_msgs/FluidPressure 10 Hz + /here4/status std_msgs/String JSON 1 Hz (node ID, health, fix quality) + /here4/node_id std_msgs/Int32 node ID once discovered + +ROS2 topics subscribed +----------------------- + /rtcm std_msgs/ByteMultiArray — RTCM corrections (from NTRIP client) + or std_msgs/String (hex-encoded, fallback) + +Parameters +---------- + can_interface str SocketCAN interface name default: can0 + local_node_id int This node's DroneCAN ID (1-127) default: 126 + bitrate int CAN bitrate, bps default: 1000000 + bring_up_can bool Bring up can_interface via ip default: false + node_id_filter int Specific Here4 node ID (0=auto) default: 0 + fix_frame_id str Frame ID for NavSatFix default: gps + imu_frame_id str Frame ID for Imu default: here4_imu +""" + +from __future__ import annotations + +import json +import math +import subprocess +import threading +import time +from typing import Any, Optional + +import rclpy +from rclpy.node import Node +from geometry_msgs.msg import TwistWithCovarianceStamped +from sensor_msgs.msg import FluidPressure, Imu, MagneticField, NavSatFix, NavSatStatus +from std_msgs.msg import ByteMultiArray, Int32, String + +from .dronecan_parser import ( + FIX_3D, + FIX_NO_FIX, + chunk_rtcm, + parse_fix2, + parse_mag, + parse_raw_imu, + parse_static_pressure, + parse_static_temperature, + parse_node_status, +) + +# Gauss → Tesla conversion +_GA_TO_T = 1e-4 + +# NavSatStatus mapping from DroneCAN fix type +_FIX_TO_NAV_STATUS = { + 0: NavSatStatus.STATUS_NO_FIX, # NO_FIX + 1: NavSatStatus.STATUS_NO_FIX, # TIME_ONLY + 2: NavSatStatus.STATUS_FIX, # 2D_FIX + 3: NavSatStatus.STATUS_FIX, # 3D_FIX (upgraded to SBAS/RTK when applicable) +} + +# Here4 supports GPS+GLONASS+BeiDou+Galileo +_SERVICE_FLAGS = (NavSatStatus.SERVICE_GPS | NavSatStatus.SERVICE_GLONASS | + NavSatStatus.SERVICE_GALILEO | NavSatStatus.SERVICE_COMPASS) + +# RTCM injection sequence counter (per-session, wraps at 65535) +_rtcm_seq = 0 +_rtcm_lock = threading.Lock() + + +def _bring_up_can(iface: str, bitrate: int, logger) -> bool: + """Bring up the SocketCAN interface at the requested bitrate.""" + try: + subprocess.run(["ip", "link", "set", iface, "down"], check=False) + subprocess.run(["ip", "link", "set", iface, "type", "can", + "bitrate", str(bitrate)], check=True) + subprocess.run(["ip", "link", "set", iface, "up"], check=True) + logger.info(f"SocketCAN {iface} up at {bitrate} bps") + return True + except subprocess.CalledProcessError as exc: + logger.error(f"Failed to bring up {iface}: {exc} — check sudo/udev rules") + return False + + +class Here4CanNode(Node): + """DroneCAN bridge publishing Here4 GPS, IMU, mag, and baro to ROS2.""" + + def __init__(self) -> None: + super().__init__("here4_can_node") + + # ── Parameters ──────────────────────────────────────────────────── + self.declare_parameter("can_interface", "can0") + self.declare_parameter("local_node_id", 126) + self.declare_parameter("bitrate", 1_000_000) + self.declare_parameter("bring_up_can", False) + self.declare_parameter("node_id_filter", 0) # 0 = auto-detect + self.declare_parameter("fix_frame_id", "gps") + self.declare_parameter("imu_frame_id", "here4_imu") + + self._iface = self.get_parameter("can_interface").value + self._local_nid = self.get_parameter("local_node_id").value + self._bitrate = self.get_parameter("bitrate").value + self._bring_up = self.get_parameter("bring_up_can").value + self._nid_filter = self.get_parameter("node_id_filter").value + self._fix_frame = self.get_parameter("fix_frame_id").value + self._imu_frame = self.get_parameter("imu_frame_id").value + + # ── State ───────────────────────────────────────────────────────── + self._here4_nid: Optional[int] = self._nid_filter if self._nid_filter else None + self._last_fix: Optional[dict] = None + self._last_temp_k: float = 293.15 # 20 °C default until first reading + self._dc_node = None # dronecan node, set up after CAN is ready + + # ── Publishers ──────────────────────────────────────────────────── + self._pub_fix = self.create_publisher(NavSatFix, + "/gps/fix", 10) + self._pub_fix_h4 = self.create_publisher(NavSatFix, + "/here4/fix", 10) + self._pub_vel = self.create_publisher(TwistWithCovarianceStamped, + "/gps/velocity", 10) + self._pub_imu = self.create_publisher(Imu, + "/here4/imu", 10) + self._pub_mag = self.create_publisher(MagneticField, + "/here4/mag", 10) + self._pub_baro = self.create_publisher(FluidPressure, + "/here4/baro", 10) + self._pub_status = self.create_publisher(String, + "/here4/status", 10) + self._pub_node_id = self.create_publisher(Int32, + "/here4/node_id", 10) + + # ── Subscriptions ───────────────────────────────────────────────── + self.create_subscription(ByteMultiArray, "/rtcm", + self._on_rtcm_bytes, 10) + self.create_subscription(String, "/rtcm_hex", + self._on_rtcm_hex, 10) + + # ── Bring up CAN if requested ───────────────────────────────────── + if self._bring_up: + _bring_up_can(self._iface, self._bitrate, self.get_logger()) + + # ── Initialise DroneCAN node ────────────────────────────────────── + self._init_dronecan() + + # ── Timers ──────────────────────────────────────────────────────── + # Poll DroneCAN at 200 Hz (5 ms) to drain the RX queue in real-time + self.create_timer(0.005, self._spin_dronecan) + # Status publish at 1 Hz + self.create_timer(1.0, self._publish_status) + + # ── DroneCAN initialisation ─────────────────────────────────────────── + + def _init_dronecan(self) -> None: + try: + import dronecan # type: ignore[import] + except ImportError: + self.get_logger().error( + "dronecan package not found — install with: pip install dronecan\n" + "Here4 node will not receive GPS data until dronecan is installed." + ) + return + + try: + self._dc_node = dronecan.make_node( + f"socketcan:{self._iface}", + node_id=self._local_nid, + bitrate=self._bitrate, + ) + except Exception as exc: + self.get_logger().error( + f"Failed to open DroneCAN on {self._iface}: {exc}\n" + f"Verify: ip link show {self._iface} and try bring_up_can:=true" + ) + return + + # ── Register message handlers ────────────────────────────────────── + dc = dronecan + self._dc_node.add_handler(dc.uavcan.equipment.gnss.Fix2, + self._on_fix2) + self._dc_node.add_handler(dc.uavcan.equipment.ahrs.RawIMU, + self._on_raw_imu) + self._dc_node.add_handler(dc.uavcan.equipment.ahrs.MagneticFieldStrength2, + self._on_mag) + self._dc_node.add_handler(dc.uavcan.equipment.air_data.StaticPressure, + self._on_pressure) + self._dc_node.add_handler(dc.uavcan.equipment.air_data.StaticTemperature, + self._on_temperature) + self._dc_node.add_handler(dc.uavcan.protocol.NodeStatus, + self._on_node_status) + + self.get_logger().info( + f"DroneCAN node ready: {self._iface} @ {self._bitrate} bps " + f"local_id={self._local_nid}" + ) + + # ── DroneCAN spin (called from 5 ms timer) ──────────────────────────── + + def _spin_dronecan(self) -> None: + if self._dc_node is None: + return + try: + self._dc_node.spin(timeout=0) + except Exception as exc: + self.get_logger().warning( + f"DroneCAN spin error: {exc}", + throttle_duration_sec=5.0, + ) + + # ── Source node filter ──────────────────────────────────────────────── + + def _accept(self, event: Any) -> bool: # type: ignore[valid-type] + """Return True if this event comes from the tracked Here4 node.""" + src = getattr(getattr(event, "transfer", None), "source_node_id", None) + if src is None: + return True # unknown source — accept + if self._here4_nid is None: + return True # auto-detect mode — accept any + return src == self._here4_nid + + # ── DroneCAN message handlers ───────────────────────────────────────── + + def _on_fix2(self, event) -> None: + src = getattr(getattr(event, "transfer", None), "source_node_id", None) + + # Auto-latch: first node that sends Fix2 becomes our Here4 + if self._here4_nid is None and src is not None: + self._here4_nid = src + self.get_logger().info( + f"Here4 auto-discovered: DroneCAN node ID {src}" + ) + nid_msg = Int32() + nid_msg.data = src + self._pub_node_id.publish(nid_msg) + + if not self._accept(event): + return + d = parse_fix2(event.message) + if d is None: + return + self._last_fix = d + stamp = self.get_clock().now().to_msg() + self._publish_navsatfix(d, stamp) + self._publish_velocity(d, stamp) + + def _on_raw_imu(self, event) -> None: + if not self._accept(event): + return + d = parse_raw_imu(event.message) + if d is None: + return + stamp = self.get_clock().now().to_msg() + msg = Imu() + msg.header.stamp = stamp + msg.header.frame_id = self._imu_frame + + # Orientation: not provided by DroneCAN RawIMU — signal unknown + msg.orientation_covariance[0] = -1.0 + + gx, gy, gz = d["gyro_xyz"] + msg.angular_velocity.x = gx + msg.angular_velocity.y = gy + msg.angular_velocity.z = gz + _fill_cov9(msg.angular_velocity_covariance, d["gyro_cov"]) + + ax, ay, az = d["accel_xyz"] + msg.linear_acceleration.x = ax + msg.linear_acceleration.y = ay + msg.linear_acceleration.z = az + _fill_cov9(msg.linear_acceleration_covariance, d["accel_cov"]) + + self._pub_imu.publish(msg) + + def _on_mag(self, event) -> None: + if not self._accept(event): + return + d = parse_mag(event.message) + if d is None: + return + stamp = self.get_clock().now().to_msg() + msg = MagneticField() + msg.header.stamp = stamp + msg.header.frame_id = self._imu_frame + gx, gy, gz = d["field_ga"] + msg.magnetic_field.x = gx * _GA_TO_T + msg.magnetic_field.y = gy * _GA_TO_T + msg.magnetic_field.z = gz * _GA_TO_T + _fill_cov9(msg.magnetic_field_covariance, + [v * _GA_TO_T * _GA_TO_T for v in d["field_cov"]]) + self._pub_mag.publish(msg) + + def _on_pressure(self, event) -> None: + if not self._accept(event): + return + d = parse_static_pressure(event.message) + if d is None: + return + stamp = self.get_clock().now().to_msg() + msg = FluidPressure() + msg.header.stamp = stamp + msg.header.frame_id = self._imu_frame + msg.fluid_pressure = d["pressure_pa"] + msg.variance = d["pressure_variance"] + self._pub_baro.publish(msg) + + def _on_temperature(self, event) -> None: + if not self._accept(event): + return + d = parse_static_temperature(event.message) + if d is None: + return + self._last_temp_k = d["temperature_k"] + + def _on_node_status(self, event) -> None: + src = getattr(getattr(event, "transfer", None), "source_node_id", None) + if src is None: + return + d = parse_node_status(event.message, src) + if d is None: + return + + # Auto-discovery: latch first node that sends GPS-relevant data + # NodeStatus alone is not enough — we track whichever source_node_id + # first delivered a Fix2 message; if none yet, log all nodes seen. + if self._here4_nid is None: + # Any node could be Here4 — we'll latch on first Fix2 source + self.get_logger().debug( + f"DroneCAN node {src} alive: health={d['health_label']} " + f"mode={d['mode_label']} uptime={d['uptime_sec']}s" + ) + elif src == self._here4_nid: + self._last_node_status = d + + # ── NavSatFix publishing ───────────────────────────────────────────── + + def _publish_navsatfix(self, d: dict, stamp) -> None: + fix_type = d["fix_type"] + + status = NavSatStatus() + status.service = _SERVICE_FLAGS + + # Upgrade status for RTK: hdop < 0.5 and 3D fix → treat as SBAS/RTK + if fix_type == FIX_3D: + if d["hdop"] < 0.5: + status.status = NavSatStatus.STATUS_GBAS_FIX # RTK fixed + elif d["hdop"] < 1.5: + status.status = NavSatStatus.STATUS_SBAS_FIX # SBAS/DGPS + else: + status.status = NavSatStatus.STATUS_FIX + else: + status.status = _FIX_TO_NAV_STATUS.get(fix_type, NavSatStatus.STATUS_NO_FIX) + + msg = NavSatFix() + msg.header.stamp = stamp + msg.header.frame_id = self._fix_frame + msg.status = status + msg.latitude = d["lat_deg"] + msg.longitude = d["lon_deg"] + msg.altitude = d["alt_msl_m"] + + # Position covariance (diagonal from DroneCAN; full matrix if available) + pos_cov = d["pos_cov"] + msg.position_covariance_type = ( + NavSatFix.COVARIANCE_TYPE_FULL if any(pos_cov) else + NavSatFix.COVARIANCE_TYPE_APPROXIMATED + ) + if any(pos_cov): + for i, v in enumerate(pos_cov): + msg.position_covariance[i] = v + else: + # Approximate from HDOP/VDOP: σ_h ≈ hdop × 3m, σ_v ≈ vdop × 5m + sh2 = (d["hdop"] * 3.0) ** 2 + sv2 = (d["vdop"] * 5.0) ** 2 + msg.position_covariance[0] = sh2 # east + msg.position_covariance[4] = sh2 # north + msg.position_covariance[8] = sv2 # up + + self._pub_fix.publish(msg) + self._pub_fix_h4.publish(msg) + + def _publish_velocity(self, d: dict, stamp) -> None: + vn, ve, vd = d["ned_vel"] + msg = TwistWithCovarianceStamped() + msg.header.stamp = stamp + msg.header.frame_id = self._fix_frame + # NED → ENU: x=east=ve, y=north=vn, z=up=-vd + msg.twist.twist.linear.x = ve + msg.twist.twist.linear.y = vn + msg.twist.twist.linear.z = -vd + # Velocity covariance: approximate from HDOP + vel_var = (d["hdop"] * 0.2) ** 2 + msg.twist.covariance[0] = vel_var # xx (east) + msg.twist.covariance[7] = vel_var # yy (north) + msg.twist.covariance[14] = (d["vdop"] * 0.3) ** 2 # zz (up) + self._pub_vel.publish(msg) + + # ── Status publish (1 Hz) ──────────────────────────────────────────── + + def _publish_status(self) -> None: + fix = self._last_fix + ns = getattr(self, "_last_node_status", None) + + payload: dict = { + "here4_node_id": self._here4_nid, + "can_interface": self._iface, + "dc_node_ready": self._dc_node is not None, + "fix_type": fix["fix_type"] if fix else FIX_NO_FIX, + "fix_label": fix["fix_label"] if fix else "NO_FIX", + "sats_used": fix["sats_used"] if fix else 0, + "hdop": round(fix["hdop"], 2) if fix else 99.0, + "lat_deg": round(fix["lat_deg"], 7) if fix else 0.0, + "lon_deg": round(fix["lon_deg"], 7) if fix else 0.0, + "alt_msl_m": round(fix["alt_msl_m"], 2) if fix else 0.0, + "temp_c": round(self._last_temp_k - 273.15, 1), + "node_health": ns["health_label"] if ns else "UNKNOWN", + "node_uptime_s": ns["uptime_sec"] if ns else 0, + } + msg = String() + msg.data = json.dumps(payload) + self._pub_status.publish(msg) + + # Also publish discovered node ID + if self._here4_nid is not None: + nid_msg = Int32() + nid_msg.data = self._here4_nid + self._pub_node_id.publish(nid_msg) + + # ── RTCM injection ──────────────────────────────────────────────────── + + def _on_rtcm_bytes(self, msg: ByteMultiArray) -> None: + """Receive RTCM bytes from /rtcm and inject via DroneCAN RTCMStream.""" + raw = bytes(msg.data) + self._inject_rtcm(raw) + + def _on_rtcm_hex(self, msg: String) -> None: + """Receive hex-encoded RTCM from /rtcm_hex (fallback topic).""" + try: + raw = bytes.fromhex(msg.data.strip()) + except ValueError as exc: + self.get_logger().error(f"Bad /rtcm_hex: {exc}") + return + self._inject_rtcm(raw) + + def _inject_rtcm(self, raw: bytes) -> None: + if self._dc_node is None or not raw: + return + + global _rtcm_seq + try: + import dronecan # type: ignore[import] + except ImportError: + return + + with _rtcm_lock: + chunks = chunk_rtcm(raw, _rtcm_seq) + _rtcm_seq = (_rtcm_seq + len(chunks)) & 0xFFFF + + for chunk in chunks: + try: + rtcm_msg = dronecan.uavcan.equipment.gnss.RTCMStream() + rtcm_msg.sequence_id = chunk["sequence_id"] + rtcm_msg.data = list(chunk["data"]) + self._dc_node.broadcast(rtcm_msg) + except Exception as exc: + self.get_logger().warning( + f"RTCM inject error (seq {chunk['sequence_id']}): {exc}", + throttle_duration_sec=2.0, + ) + + # ── Shutdown ────────────────────────────────────────────────────────── + + def destroy_node(self) -> None: + if self._dc_node is not None: + try: + self._dc_node.close() + except Exception: + pass + super().destroy_node() + + +# ── Covariance helper ───────────────────────────────────────────────────────── + +def _fill_cov9(ros_cov, values) -> None: + """Fill a 9-element ROS2 covariance array from a list.""" + for i, v in enumerate(values[:9]): + ros_cov[i] = float(v) + + +# ── Entry point ─────────────────────────────────────────────────────────────── + +def main(args=None) -> None: + rclpy.init(args=args) + node = Here4CanNode() + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == "__main__": + main() diff --git a/jetson/ros2_ws/src/saltybot_dronecan_gps/setup.cfg b/jetson/ros2_ws/src/saltybot_dronecan_gps/setup.cfg new file mode 100644 index 0000000..36a667b --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_dronecan_gps/setup.cfg @@ -0,0 +1,5 @@ +[develop] +script_dir=$base/lib/saltybot_dronecan_gps + +[install] +install_scripts=$base/lib/saltybot_dronecan_gps diff --git a/jetson/ros2_ws/src/saltybot_dronecan_gps/setup.py b/jetson/ros2_ws/src/saltybot_dronecan_gps/setup.py new file mode 100644 index 0000000..7af2f5f --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_dronecan_gps/setup.py @@ -0,0 +1,27 @@ +from setuptools import setup + +package_name = "saltybot_dronecan_gps" + +setup( + name=package_name, + version="0.1.0", + packages=[package_name], + data_files=[ + ("share/ament_index/resource_index/packages", [f"resource/{package_name}"]), + (f"share/{package_name}", ["package.xml"]), + (f"share/{package_name}/config", ["config/here4_params.yaml"]), + (f"share/{package_name}/launch", ["launch/here4.launch.py"]), + ], + install_requires=["setuptools"], + zip_safe=True, + maintainer="sl-perception", + maintainer_email="sl-perception@saltylab.local", + description="DroneCAN/UAVCAN v0 bridge for Here4 GPS via CANable2 (bd-p47c)", + license="MIT", + tests_require=["pytest"], + entry_points={ + "console_scripts": [ + "here4_node = saltybot_dronecan_gps.here4_node:main", + ], + }, +) diff --git a/jetson/ros2_ws/src/saltybot_dronecan_gps/test/__init__.py b/jetson/ros2_ws/src/saltybot_dronecan_gps/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_dronecan_gps/test/test_dronecan_parser.py b/jetson/ros2_ws/src/saltybot_dronecan_gps/test/test_dronecan_parser.py new file mode 100644 index 0000000..fac260f --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_dronecan_gps/test/test_dronecan_parser.py @@ -0,0 +1,367 @@ +#!/usr/bin/env python3 +"""Unit tests for saltybot_dronecan_gps.dronecan_parser. + +No dronecan library, ROS2, or CAN hardware required. +Uses simple Python objects (namespaces) to simulate dronecan message objects. + +Run with: pytest test/ -v +""" + +import math +import types +import unittest + +from saltybot_dronecan_gps.dronecan_parser import ( + DTID_FIX2, + DTID_RAW_IMU, + DTID_MAG_FIELD, + DTID_STATIC_PRESSURE, + DTID_STATIC_TEMPERATURE, + DTID_NODE_STATUS, + DTID_RTCM_STREAM, + FIX_NO_FIX, + FIX_TIME_ONLY, + FIX_2D, + FIX_3D, + RTCM_MAX_CHUNK, + chunk_rtcm, + parse_fix2, + parse_mag, + parse_node_status, + parse_raw_imu, + parse_static_pressure, + parse_static_temperature, + _getf, + _list3, + _list9, +) + + +# ── Helpers ─────────────────────────────────────────────────────────────────── + +def _ns(**kwargs) -> types.SimpleNamespace: + """Build a simple namespace to mimic a dronecan message object.""" + return types.SimpleNamespace(**kwargs) + + +def _ns_vec(*values): + """Return an object that is indexable like a dronecan vector field.""" + return list(values) + + +# ── DTID constants ──────────────────────────────────────────────────────────── + +class TestDtidConstants(unittest.TestCase): + def test_known_values(self): + self.assertEqual(DTID_FIX2, 1063) + self.assertEqual(DTID_RAW_IMU, 1003) + self.assertEqual(DTID_MAG_FIELD, 1001) + self.assertEqual(DTID_STATIC_PRESSURE, 1028) + self.assertEqual(DTID_STATIC_TEMPERATURE, 1029) + self.assertEqual(DTID_NODE_STATUS, 341) + self.assertEqual(DTID_RTCM_STREAM, 1062) + + def test_rtcm_chunk_size(self): + self.assertEqual(RTCM_MAX_CHUNK, 128) + + +# ── _getf / _list3 / _list9 ────────────────────────────────────────────────── + +class TestHelpers(unittest.TestCase): + def test_getf_simple(self): + obj = _ns(x=42) + self.assertEqual(_getf(obj, "x"), 42) + + def test_getf_missing(self): + obj = _ns(x=1) + self.assertIsNone(_getf(obj, "y")) + + def test_getf_default(self): + obj = _ns() + self.assertEqual(_getf(obj, "missing", default=99), 99) + + def test_list3_ok(self): + self.assertEqual(_list3([1.0, 2.0, 3.0]), [1.0, 2.0, 3.0]) + + def test_list3_bad(self): + self.assertEqual(_list3(None), [0.0, 0.0, 0.0]) + + def test_list9_ok(self): + vals = list(range(9)) + self.assertEqual(_list9(vals), [float(v) for v in vals]) + + def test_list9_short(self): + self.assertEqual(_list9(None), [0.0] * 9) + + +# ── parse_fix2 ──────────────────────────────────────────────────────────────── + +class TestParseFix2(unittest.TestCase): + + def _make_fix(self, lat_1e8=0, lon_1e8=0, alt_msl_mm=0, alt_ell_mm=0, + ned_vel=None, status=FIX_3D, sats=12, + pdop=1.2, hdop=0.9, vdop=1.5, + pos_cov=None, undulation=5.0, gnss_ts=0): + return _ns( + latitude_deg_1e8 = lat_1e8, + longitude_deg_1e8 = lon_1e8, + height_msl_mm = alt_msl_mm, + height_ellipsoid_mm = alt_ell_mm, + ned_velocity = ned_vel if ned_vel is not None else _ns_vec(0.0, 0.0, 0.0), + status = status, + sats_used = sats, + pdop = pdop, + hdop = hdop, + vdop = vdop, + position_covariance = pos_cov, + undulation = undulation, + gnss_timestamp = gnss_ts, + ) + + def test_basic_3d_fix(self): + msg = self._make_fix( + lat_1e8 = 4_576_543_210, # 45.7654321° + lon_1e8 = -7_345_678_901, # -73.45678901° + alt_msl_mm = 50_000, # 50 m + alt_ell_mm = 52_000, # 52 m + ned_vel = _ns_vec(1.5, -0.3, 0.0), + sats = 14, + hdop = 0.8, + ) + d = parse_fix2(msg) + self.assertIsNotNone(d) + self.assertAlmostEqual(d["lat_deg"], 45.7654321, places=5) + self.assertAlmostEqual(d["lon_deg"], -73.45678901, places=5) + self.assertAlmostEqual(d["alt_msl_m"], 50.0, places=3) + self.assertAlmostEqual(d["alt_ellipsoid_m"], 52.0, places=3) + self.assertEqual(d["ned_vel"], [1.5, -0.3, 0.0]) + self.assertEqual(d["fix_type"], FIX_3D) + self.assertEqual(d["fix_label"], "3D_FIX") + self.assertEqual(d["sats_used"], 14) + self.assertAlmostEqual(d["hdop"], 0.8, places=5) + + def test_no_fix(self): + msg = self._make_fix(status=FIX_NO_FIX, sats=0) + d = parse_fix2(msg) + self.assertIsNotNone(d) + self.assertEqual(d["fix_type"], FIX_NO_FIX) + self.assertEqual(d["fix_label"], "NO_FIX") + self.assertEqual(d["sats_used"], 0) + + def test_time_only(self): + msg = self._make_fix(status=FIX_TIME_ONLY) + d = parse_fix2(msg) + self.assertEqual(d["fix_label"], "TIME_ONLY") + + def test_2d_fix(self): + msg = self._make_fix(status=FIX_2D) + d = parse_fix2(msg) + self.assertEqual(d["fix_label"], "2D_FIX") + + def test_position_covariance(self): + cov = [float(i) for i in range(9)] + msg = self._make_fix(pos_cov=cov) + d = parse_fix2(msg) + self.assertEqual(d["pos_cov"], cov) + + def test_no_covariance_returns_zeros(self): + msg = self._make_fix(pos_cov=None) + d = parse_fix2(msg) + self.assertEqual(d["pos_cov"], [0.0] * 9) + + def test_zero_lat_lon(self): + msg = self._make_fix(lat_1e8=0, lon_1e8=0) + d = parse_fix2(msg) + self.assertAlmostEqual(d["lat_deg"], 0.0, places=8) + self.assertAlmostEqual(d["lon_deg"], 0.0, places=8) + + def test_returns_none_on_bad_input(self): + self.assertIsNone(parse_fix2(None)) + + def test_gnss_timestamp(self): + msg = self._make_fix(gnss_ts=1_000_000_000) + d = parse_fix2(msg) + self.assertEqual(d["gnss_timestamp_us"], 1_000_000_000) + + +# ── parse_raw_imu ───────────────────────────────────────────────────────────── + +class TestParseRawImu(unittest.TestCase): + + def _make_imu(self, gyro=None, accel=None, dt=0.02, + gyro_cov=None, accel_cov=None): + return _ns( + rate_gyro_latest = gyro if gyro else _ns_vec(0.0, 0.0, 0.0), + accelerometer_latest = accel if accel else _ns_vec(0.0, 0.0, 9.81), + integration_interval = dt, + rate_gyro_covariance = gyro_cov if gyro_cov else [0.0] * 9, + accelerometer_covariance = accel_cov if accel_cov else [0.0] * 9, + ) + + def test_basic(self): + msg = self._make_imu( + gyro = _ns_vec(0.01, -0.02, 0.03), + accel = _ns_vec(0.1, -0.2, 9.8), + dt = 0.02, + ) + d = parse_raw_imu(msg) + self.assertIsNotNone(d) + self.assertEqual(d["gyro_xyz"], [0.01, -0.02, 0.03]) + self.assertEqual(d["accel_xyz"], [0.1, -0.2, 9.8]) + self.assertAlmostEqual(d["dt_s"], 0.02) + + def test_covariances_returned(self): + cov = [float(i * 0.001) for i in range(9)] + msg = self._make_imu(gyro_cov=cov, accel_cov=cov) + d = parse_raw_imu(msg) + self.assertEqual(d["gyro_cov"], cov) + self.assertEqual(d["accel_cov"], cov) + + def test_none_input(self): + self.assertIsNone(parse_raw_imu(None)) + + +# ── parse_mag ───────────────────────────────────────────────────────────────── + +class TestParseMag(unittest.TestCase): + + def _make_mag(self, sensor_id=0, field=None, cov=None): + return _ns( + sensor_id = sensor_id, + magnetic_field_ga = field if field else _ns_vec(0.1, 0.2, -0.5), + magnetic_field_covariance = cov if cov else [0.0] * 9, + ) + + def test_basic(self): + msg = self._make_mag(sensor_id=1, field=_ns_vec(0.1, 0.2, -0.5)) + d = parse_mag(msg) + self.assertIsNotNone(d) + self.assertEqual(d["sensor_id"], 1) + self.assertEqual(d["field_ga"], [0.1, 0.2, -0.5]) + + def test_none_input(self): + self.assertIsNone(parse_mag(None)) + + +# ── parse_static_pressure ──────────────────────────────────────────────────── + +class TestParseStaticPressure(unittest.TestCase): + + def test_basic(self): + msg = _ns(static_pressure=101325.0, static_pressure_variance=100.0) + d = parse_static_pressure(msg) + self.assertIsNotNone(d) + self.assertAlmostEqual(d["pressure_pa"], 101325.0) + self.assertAlmostEqual(d["pressure_variance"], 100.0) + + def test_none_input(self): + self.assertIsNone(parse_static_pressure(None)) + + +# ── parse_static_temperature ───────────────────────────────────────────────── + +class TestParseStaticTemperature(unittest.TestCase): + + def test_celsius_to_kelvin(self): + msg = _ns(static_temperature=293.15) # 20 °C + d = parse_static_temperature(msg) + self.assertIsNotNone(d) + self.assertAlmostEqual(d["temperature_k"], 293.15) + + def test_none_input(self): + self.assertIsNone(parse_static_temperature(None)) + + +# ── parse_node_status ──────────────────────────────────────────────────────── + +class TestParseNodeStatus(unittest.TestCase): + + def _make_status(self, health=0, mode=0, uptime=120, vendor_code=0): + return _ns( + health = health, + mode = mode, + uptime_sec = uptime, + vendor_specific_status_code = vendor_code, + ) + + def test_ok_operational(self): + msg = self._make_status(health=0, mode=0, uptime=300) + d = parse_node_status(msg, source_node_id=10) + self.assertIsNotNone(d) + self.assertEqual(d["node_id"], 10) + self.assertEqual(d["health"], 0) + self.assertEqual(d["health_label"], "OK") + self.assertEqual(d["mode_label"], "OPERATIONAL") + self.assertEqual(d["uptime_sec"], 300) + + def test_error_state(self): + msg = self._make_status(health=2, mode=0) + d = parse_node_status(msg, source_node_id=5) + self.assertEqual(d["health_label"], "ERROR") + + def test_critical_state(self): + msg = self._make_status(health=3) + d = parse_node_status(msg, 7) + self.assertEqual(d["health_label"], "CRITICAL") + + def test_none_input(self): + self.assertIsNone(parse_node_status(None, 1)) + + +# ── chunk_rtcm ──────────────────────────────────────────────────────────────── + +class TestChunkRtcm(unittest.TestCase): + + def test_empty(self): + chunks = chunk_rtcm(b"", 0) + self.assertEqual(chunks, []) + + def test_single_chunk(self): + data = bytes(range(64)) + chunks = chunk_rtcm(data, 5) + self.assertEqual(len(chunks), 1) + self.assertEqual(chunks[0]["sequence_id"], 5) + self.assertEqual(chunks[0]["data"], data) + + def test_exact_chunk_boundary(self): + data = bytes(RTCM_MAX_CHUNK) + chunks = chunk_rtcm(data, 0) + self.assertEqual(len(chunks), 1) + self.assertEqual(len(chunks[0]["data"]), RTCM_MAX_CHUNK) + + def test_two_chunks(self): + data = bytes(RTCM_MAX_CHUNK + 1) + chunks = chunk_rtcm(data, 0) + self.assertEqual(len(chunks), 2) + self.assertEqual(len(chunks[0]["data"]), RTCM_MAX_CHUNK) + self.assertEqual(len(chunks[1]["data"]), 1) + + def test_sequence_id_increments(self): + data = bytes(RTCM_MAX_CHUNK * 3) + chunks = chunk_rtcm(data, 10) + for i, chunk in enumerate(chunks): + self.assertEqual(chunk["sequence_id"], 10 + i) + + def test_sequence_id_wraps_at_16bit(self): + data = bytes(RTCM_MAX_CHUNK * 2) + chunks = chunk_rtcm(data, 0xFFFF) + self.assertEqual(chunks[0]["sequence_id"], 0xFFFF) + self.assertEqual(chunks[1]["sequence_id"], 0x0000) # wraps + + def test_data_round_trip(self): + original = bytes(range(200)) + chunks = chunk_rtcm(original, 0) + reassembled = b"".join(c["data"] for c in chunks) + self.assertEqual(reassembled, original) + + def test_large_message(self): + data = bytes(1024) + chunks = chunk_rtcm(data, 0) + total_bytes = sum(len(c["data"]) for c in chunks) + self.assertEqual(total_bytes, 1024) + for c in chunks: + self.assertLessEqual(len(c["data"]), RTCM_MAX_CHUNK) + + +if __name__ == "__main__": + unittest.main()