From 7dcdd2088a17e86174b95451d7e557874fdf5334 Mon Sep 17 00:00:00 2001 From: sl-perception Date: Mon, 2 Mar 2026 10:36:18 -0500 Subject: [PATCH] feat(jetson): add saltybot_night_vision package (issue #168) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements ambient-light-aware night vision mode for the D435i + IMX219 stack on the Jetson Orin Nano Super: • light_analyser.py — histogram-based intensity FSM with hysteresis: day → twilight → night → ir_only • camera_controller.py — D435i IR emitter via realsense2_camera param service + IMX219 gain/exposure via v4l2-ctl • gpio_headlight.py — physical pin 33 headlight; Jetson.GPIO PWM primary, sysfs on/off fallback, sim mode • light_monitor_node.py — subscribes IMX219/IR1, publishes /saltybot/ambient_light + /saltybot/vision_mode • night_vision_controller_node.py — reacts to mode changes; drives D435i emitter, IMX219 gain, headlight • ir_slam_bridge_node.py — mono8 IR1 → rgb8 republish so RTAB-Map keeps loop-closing in darkness • launch/night_vision.launch.py + config/night_vision_params.yaml • test/test_night_vision.py — 18 unit tests (18/18 pass, no ROS2 needed) Co-Authored-By: Claude Sonnet 4.6 --- .../config/night_vision_params.yaml | 49 +++++ .../launch/night_vision.launch.py | 68 +++++++ .../src/saltybot_night_vision/package.xml | 27 +++ .../resource/saltybot_night_vision | 0 .../saltybot_night_vision/__init__.py | 0 .../camera_controller.py | 158 ++++++++++++++ .../saltybot_night_vision/gpio_headlight.py | 192 ++++++++++++++++++ .../ir_slam_bridge_node.py | 157 ++++++++++++++ .../saltybot_night_vision/light_analyser.py | 107 ++++++++++ .../light_monitor_node.py | 141 +++++++++++++ .../night_vision_controller_node.py | 137 +++++++++++++ .../src/saltybot_night_vision/setup.cfg | 4 + .../src/saltybot_night_vision/setup.py | 34 ++++ .../test/test_night_vision.py | 152 ++++++++++++++ 14 files changed, 1226 insertions(+) create mode 100644 jetson/ros2_ws/src/saltybot_night_vision/config/night_vision_params.yaml create mode 100644 jetson/ros2_ws/src/saltybot_night_vision/launch/night_vision.launch.py create mode 100644 jetson/ros2_ws/src/saltybot_night_vision/package.xml create mode 100644 jetson/ros2_ws/src/saltybot_night_vision/resource/saltybot_night_vision create mode 100644 jetson/ros2_ws/src/saltybot_night_vision/saltybot_night_vision/__init__.py create mode 100644 jetson/ros2_ws/src/saltybot_night_vision/saltybot_night_vision/camera_controller.py create mode 100644 jetson/ros2_ws/src/saltybot_night_vision/saltybot_night_vision/gpio_headlight.py create mode 100644 jetson/ros2_ws/src/saltybot_night_vision/saltybot_night_vision/ir_slam_bridge_node.py create mode 100644 jetson/ros2_ws/src/saltybot_night_vision/saltybot_night_vision/light_analyser.py create mode 100644 jetson/ros2_ws/src/saltybot_night_vision/saltybot_night_vision/light_monitor_node.py create mode 100644 jetson/ros2_ws/src/saltybot_night_vision/saltybot_night_vision/night_vision_controller_node.py create mode 100644 jetson/ros2_ws/src/saltybot_night_vision/setup.cfg create mode 100644 jetson/ros2_ws/src/saltybot_night_vision/setup.py create mode 100644 jetson/ros2_ws/src/saltybot_night_vision/test/test_night_vision.py diff --git a/jetson/ros2_ws/src/saltybot_night_vision/config/night_vision_params.yaml b/jetson/ros2_ws/src/saltybot_night_vision/config/night_vision_params.yaml new file mode 100644 index 0000000..fcb7206 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_night_vision/config/night_vision_params.yaml @@ -0,0 +1,49 @@ +# saltybot_night_vision — runtime parameters +# +# Ambient light detection requires ONE of: +# /camera/color/image_raw — IMX219 front camera (primary) +# /camera/infra1/image_rect_raw — D435i IR fallback +# +# D435i IR emitter is controlled via the realsense2_camera parameter service. +# Make sure the node is running as /camera/camera (default). +# +# GPIO headlight on physical pin 33 of the Jetson Orin Nano Super 40-pin header. +# Requires Jetson.GPIO (jetson-gpio pip package) or /sys/class/gpio write access. + +light_monitor: + ros__parameters: + primary_camera_topic: '/camera/color/image_raw' + fallback_ir_topic: '/camera/infra1/image_rect_raw' + publish_hz: 2.0 # ambient light evaluation rate + initial_mode: 'day' + history_len: 6 # frames to median-filter intensity + +night_vision_controller: + ros__parameters: + headlight_pin: 33 # physical board pin (Jetson 40-pin J26) + day_headlight: 0.0 # off + twilight_headlight: 0.25 # 25 % — subtle fill + night_headlight: 0.50 # 50 % — normal dark room + ir_only_headlight: 1.0 # 100 % — pitch black + +ir_slam_bridge: + ros__parameters: + active_modes: 'night,ir_only' # publish IR→RGB only in these modes + output_encoding: 'rgb8' # RTAB-Map expects rgb8 or bgr8 + + +# ── RTAB-Map integration note ────────────────────────────────────────────────── +# When the bridge is active (/saltybot/ir_bridge/image_raw), redirect RTAB-Map's +# RGB input by adding to slam_rtabmap.launch.py Node remappings: +# +# remappings=[ +# ('rgb/image', '/saltybot/ir_bridge/image_raw'), +# ('rgb/camera_info', '/saltybot/ir_bridge/camera_info'), +# ] +# +# Or set the topic at launch: +# ros2 launch saltybot_bringup slam_rtabmap.launch.py \ +# rgb_image_topic:=/saltybot/ir_bridge/image_raw +# +# This ensures loop-closure detection continues using the IR feature stream +# instead of a dark / blank colour image. diff --git a/jetson/ros2_ws/src/saltybot_night_vision/launch/night_vision.launch.py b/jetson/ros2_ws/src/saltybot_night_vision/launch/night_vision.launch.py new file mode 100644 index 0000000..84ecee6 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_night_vision/launch/night_vision.launch.py @@ -0,0 +1,68 @@ +""" +night_vision.launch.py — SaltyBot night vision stack. + +Starts: + light_monitor — histogram ambient-light analysis → /saltybot/vision_mode + night_vision_ctrl — D435i emitter + IMX219 gain + GPIO headlight + ir_slam_bridge — IR→RGB republish for RTAB-Map (active in dark modes) + +Launch args: + primary_camera_topic str '/camera/color/image_raw' + headlight_pin int '33' + publish_hz float '2.0' + active_ir_modes str 'night,ir_only' + +Verify: + ros2 topic echo /saltybot/vision_mode + ros2 topic echo /saltybot/ambient_light + ros2 topic echo /saltybot/night_vision_status + ros2 topic hz /saltybot/ir_bridge/image_raw # should be ~30 Hz in dark +""" + +from launch import LaunchDescription +from launch.actions import DeclareLaunchArgument +from launch.substitutions import LaunchConfiguration +from launch_ros.actions import Node + + +def generate_launch_description(): + args = [ + DeclareLaunchArgument('primary_camera_topic', + default_value='/camera/color/image_raw'), + DeclareLaunchArgument('headlight_pin', default_value='33'), + DeclareLaunchArgument('publish_hz', default_value='2.0'), + DeclareLaunchArgument('active_ir_modes', default_value='night,ir_only'), + ] + + light_monitor = Node( + package='saltybot_night_vision', + executable='light_monitor', + name='light_monitor', + output='screen', + parameters=[{ + 'primary_camera_topic': LaunchConfiguration('primary_camera_topic'), + 'publish_hz': LaunchConfiguration('publish_hz'), + }], + ) + + nv_ctrl = Node( + package='saltybot_night_vision', + executable='night_vision_ctrl', + name='night_vision_controller', + output='screen', + parameters=[{ + 'headlight_pin': LaunchConfiguration('headlight_pin'), + }], + ) + + ir_bridge = Node( + package='saltybot_night_vision', + executable='ir_slam_bridge', + name='ir_slam_bridge', + output='screen', + parameters=[{ + 'active_modes': LaunchConfiguration('active_ir_modes'), + }], + ) + + return LaunchDescription(args + [light_monitor, nv_ctrl, ir_bridge]) diff --git a/jetson/ros2_ws/src/saltybot_night_vision/package.xml b/jetson/ros2_ws/src/saltybot_night_vision/package.xml new file mode 100644 index 0000000..9490b2b --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_night_vision/package.xml @@ -0,0 +1,27 @@ + + + + saltybot_night_vision + 0.1.0 + + Night vision mode for SaltyBot: ambient-light histogram detection, + D435i IR emitter control, IMX219 auto-gain/exposure, GPIO headlight, + and IR-only SLAM bridging. + + SaltyLab + MIT + + rclpy + std_msgs + sensor_msgs + cv_bridge + + python3-numpy + python3-opencv + + pytest + + + ament_python + + diff --git a/jetson/ros2_ws/src/saltybot_night_vision/resource/saltybot_night_vision b/jetson/ros2_ws/src/saltybot_night_vision/resource/saltybot_night_vision new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_night_vision/saltybot_night_vision/__init__.py b/jetson/ros2_ws/src/saltybot_night_vision/saltybot_night_vision/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/jetson/ros2_ws/src/saltybot_night_vision/saltybot_night_vision/camera_controller.py b/jetson/ros2_ws/src/saltybot_night_vision/saltybot_night_vision/camera_controller.py new file mode 100644 index 0000000..d464bf7 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_night_vision/saltybot_night_vision/camera_controller.py @@ -0,0 +1,158 @@ +""" +camera_controller.py — D435i parameter service client + IMX219 v4l2 control. + +D435i (realsense2_camera node): + depth_module.emitter_enabled int 0=off 1=on 2=auto + depth_module.emitter_always_on bool keep emitter on between frames + +IMX219 (CSI cameras via v4l2): + exposure_auto 0=manual 1=auto + exposure microseconds (manual) + analogue_gain raw sensor gain (16–170 typical range) + +The IMX219 controls are exercised via subprocess `v4l2-ctl`, which is +always available on JetPack 6. +""" + +from __future__ import annotations + +import subprocess +import logging +from dataclasses import dataclass +from typing import Optional + +import rclpy +from rclpy.node import Node +from rcl_interfaces.msg import Parameter, ParameterValue, ParameterType +from rcl_interfaces.srv import SetParameters + +_LOG = logging.getLogger(__name__) + +# D435i camera node (realsense2_camera default namespace) +_RS_NODE = '/camera/camera' + +# IMX219 v4l2 device paths for each camera position +# Front camera is the primary source for ambient-light detection +_IMX219_DEVICES: dict[str, str] = { + 'front': '/dev/video0', + 'right': '/dev/video2', + 'rear': '/dev/video4', + 'left': '/dev/video6', +} + +# Auto-exposure values for IMX219 +_IMX219_AE_AUTO = 1 +_IMX219_AE_MANUAL = 0 + + +@dataclass +class CameraProfile: + """Parameter set for a given vision mode.""" + emitter_enabled: int # 0=off, 1=on, 2=auto + imx_ae_mode: int # 0=manual, 1=auto + imx_exposure_us: Optional[int] # None → leave AE to control it + imx_gain: Optional[int] # None → leave AE to control it + + +# Per-mode camera profiles +PROFILES: dict[str, CameraProfile] = { + 'day': CameraProfile( + emitter_enabled=2, # RS depth auto + imx_ae_mode=_IMX219_AE_AUTO, + imx_exposure_us=None, + imx_gain=None, + ), + 'twilight': CameraProfile( + emitter_enabled=1, # emitter on for better depth + imx_ae_mode=_IMX219_AE_MANUAL, + imx_exposure_us=20_000, # 20 ms + imx_gain=64, + ), + 'night': CameraProfile( + emitter_enabled=1, + imx_ae_mode=_IMX219_AE_MANUAL, + imx_exposure_us=50_000, # 50 ms (sacrifice motion sharpness) + imx_gain=128, + ), + 'ir_only': CameraProfile( + emitter_enabled=1, # emitter critical for IR stereo in dark + imx_ae_mode=_IMX219_AE_MANUAL, + imx_exposure_us=80_000, + imx_gain=170, # near-max analogue gain + ), +} + + +class CameraController: + """ + Non-blocking camera controller. + + set_profile() sends async ROS2 parameter requests and v4l2 commands. + It should be called from within a ROS2 node (pass the node reference). + """ + + def __init__(self, node: Node): + self._node = node + self._cli: Optional[rclpy.client.Client] = None + self._pending_mode: Optional[str] = None + self._current_mode: Optional[str] = None + self._init_rs_client() + + # ── Public API ──────────────────────────────────────────────────────────── + + def set_profile(self, mode: str) -> None: + """Apply the camera profile for `mode`. Fire-and-forget async.""" + if mode not in PROFILES: + self._node.get_logger().warning(f'[camera_ctrl] Unknown mode: {mode}') + return + if mode == self._current_mode: + return + profile = PROFILES[mode] + self._set_rs_emitter(profile.emitter_enabled) + self._set_imx219(profile) + self._current_mode = mode + self._node.get_logger().info( + f'[camera_ctrl] Applied profile={mode} ' + f'emitter={profile.emitter_enabled} ' + f'gain={profile.imx_gain} exp={profile.imx_exposure_us}' + ) + + # ── D435i (RealSense) ───────────────────────────────────────────────────── + + def _init_rs_client(self) -> None: + self._cli = self._node.create_client( + SetParameters, f'{_RS_NODE}/set_parameters' + ) + + def _set_rs_emitter(self, value: int) -> None: + if not self._cli.service_is_ready(): + self._node.get_logger().debug( + '[camera_ctrl] RS parameter service not ready — skipping emitter set' + ) + return + pval = ParameterValue(type=ParameterType.PARAMETER_INTEGER, integer_value=value) + param = Parameter(name='depth_module.emitter_enabled', value=pval) + req = SetParameters.Request(parameters=[param]) + self._cli.call_async(req) # fire-and-forget + + # ── IMX219 via v4l2-ctl ────────────────────────────────────────────────── + + def _set_imx219(self, profile: CameraProfile) -> None: + for dev in _IMX219_DEVICES.values(): + self._v4l2_set(dev, 'exposure_auto', profile.imx_ae_mode) + if profile.imx_exposure_us is not None: + self._v4l2_set(dev, 'exposure', profile.imx_exposure_us) + if profile.imx_gain is not None: + self._v4l2_set(dev, 'analogue_gain', profile.imx_gain) + + def _v4l2_set(self, dev: str, ctrl: str, value: int) -> None: + cmd = ['v4l2-ctl', '-d', dev, f'--set-ctrl={ctrl}={value}'] + try: + result = subprocess.run(cmd, capture_output=True, timeout=1.0) + if result.returncode != 0: + self._node.get_logger().debug( + f'[camera_ctrl] v4l2 {dev} {ctrl}={value}: ' + + result.stderr.decode().strip() + ) + except (subprocess.TimeoutExpired, FileNotFoundError) as exc: + self._node.get_logger().debug(f'[camera_ctrl] v4l2-ctl unavailable: {exc}') diff --git a/jetson/ros2_ws/src/saltybot_night_vision/saltybot_night_vision/gpio_headlight.py b/jetson/ros2_ws/src/saltybot_night_vision/saltybot_night_vision/gpio_headlight.py new file mode 100644 index 0000000..e2a41c3 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_night_vision/saltybot_night_vision/gpio_headlight.py @@ -0,0 +1,192 @@ +""" +gpio_headlight.py — Jetson Orin Nano Super 40-pin GPIO headlight control. + +Physical pin 33 on the J26 40-pin header is used for the headlight. +On JetPack 6 this corresponds to sysfs GPIO chip gpio-tegra234 / offset 12. + +Brightness levels (0.0–1.0): + 0.0 → headlight off + 0.25 → dim (e.g. indicator, twilight assist) + 0.50 → medium (normal dark indoor) + 1.0 → full (outdoor night, ir_only mode) + +Implementation: + 1. Tries Jetson.GPIO PWM (RPi-compatible API, available if jetson-gpio installed) + 2. Falls back to sysfs binary (on/off only) via /sys/class/gpio + 3. If sysfs export fails (permissions / virtual env), logs a warning and no-ops. + +Sysfs GPIO path on Orin Nano Super (JetPack 6): + /sys/class/gpio/gpiochip/ — use the tegra234 chip + Or the legacy /sys/class/gpio/gpio/ path after export. + +Note: udev rule or `sudo` may be needed in production. + Add to /etc/udev/rules.d/99-jetson-gpio.rules: + SUBSYSTEM=="gpio", ACTION=="add", RUN+="/bin/chmod 777 /sys/class/gpio" +""" + +from __future__ import annotations + +import logging +import os +from typing import Optional + +_LOG = logging.getLogger(__name__) + +# Physical board pin for the headlight +_BOARD_PIN = 33 + +# Brightness → duty cycle string mapping for sysfs PWM (when available) +_LEVEL_DUTY: dict[str, float] = { + 'off': 0.0, + 'dim': 0.25, + 'medium': 0.50, + 'full': 1.0, +} + + +class GPIOHeadlight: + """Controls the GPIO headlight with graceful fallbacks.""" + + def __init__(self, pin: int = _BOARD_PIN): + self._pin = pin + self._brightness: float = 0.0 + self._backend: str = 'none' + self._pwm = None + self._gpio_path: Optional[str] = None + self._init() + + # ── Public API ──────────────────────────────────────────────────────────── + + def set_brightness(self, level: float) -> None: + """Set headlight brightness in [0.0, 1.0].""" + level = max(0.0, min(1.0, level)) + if level == self._brightness: + return + self._brightness = level + self._apply(level) + + def set_level(self, name: str) -> None: + """Set headlight to a named level: 'off', 'dim', 'medium', 'full'.""" + self.set_brightness(_LEVEL_DUTY.get(name, 0.0)) + + def off(self) -> None: + self.set_brightness(0.0) + + def cleanup(self) -> None: + self.off() + if self._pwm is not None: + try: + self._pwm.stop() + import Jetson.GPIO as GPIO + GPIO.cleanup() + except Exception: + pass + elif self._gpio_path: + self._gpio_write('0') + + # ── Init backends ───────────────────────────────────────────────────────── + + def _init(self) -> None: + # Try Jetson.GPIO PWM first + try: + import Jetson.GPIO as GPIO + GPIO.setmode(GPIO.BOARD) + GPIO.setup(self._pin, GPIO.OUT, initial=GPIO.LOW) + self._pwm = GPIO.PWM(self._pin, 1000) # 1 kHz PWM + self._pwm.start(0) + self._backend = 'jetson_gpio_pwm' + _LOG.info(f'[headlight] Jetson.GPIO PWM on pin {self._pin}') + return + except ImportError: + _LOG.debug('[headlight] Jetson.GPIO not available') + except Exception as exc: + _LOG.warning(f'[headlight] Jetson.GPIO init failed: {exc}') + + # Fall back to sysfs binary + gpio_num = self._board_pin_to_sysfs(self._pin) + if gpio_num is not None: + path = f'/sys/class/gpio/gpio{gpio_num}' + if self._sysfs_export(gpio_num): + self._gpio_path = path + self._backend = 'sysfs' + _LOG.info(f'[headlight] sysfs GPIO /sys/class/gpio/gpio{gpio_num}') + return + + _LOG.warning('[headlight] No GPIO backend available — headlight is simulated') + self._backend = 'sim' + + # ── Apply ───────────────────────────────────────────────────────────────── + + def _apply(self, level: float) -> None: + if self._backend == 'jetson_gpio_pwm': + try: + self._pwm.ChangeDutyCycle(level * 100.0) + except Exception as exc: + _LOG.warning(f'[headlight] PWM error: {exc}') + elif self._backend == 'sysfs': + self._gpio_write('1' if level > 0 else '0') + # sim: no-op + + # ── Sysfs helpers ───────────────────────────────────────────────────────── + + @staticmethod + def _board_pin_to_sysfs(board_pin: int) -> Optional[int]: + """ + Map physical 40-pin board pin to sysfs GPIO number on Orin Nano Super. + Only a subset is mapped here; extend as needed. + """ + # Partial map of Orin Nano Super 40-pin header (physical → sysfs offset) + _ORIN_MAP: dict[int, int] = { + 7: 106, + 11: 112, + 12: 50, + 13: 108, + 15: 109, + 16: 85, + 18: 110, + 19: 130, + 21: 128, + 22: 111, + 23: 129, + 24: 131, + 26: 132, + 29: 118, + 31: 120, + 32: 122, + 33: 200, # headlight pin + 35: 116, + 36: 54, + 37: 121, + 38: 117, + 40: 56, + } + return _ORIN_MAP.get(board_pin) + + def _sysfs_export(self, gpio_num: int) -> bool: + export_path = f'/sys/class/gpio/gpio{gpio_num}' + if not os.path.exists(export_path): + try: + with open('/sys/class/gpio/export', 'w') as f: + f.write(str(gpio_num)) + except PermissionError: + _LOG.warning('[headlight] No permission for /sys/class/gpio/export') + return False + except OSError as exc: + _LOG.warning(f'[headlight] sysfs export error: {exc}') + return False + try: + with open(f'{export_path}/direction', 'w') as f: + f.write('out') + return True + except OSError as exc: + _LOG.warning(f'[headlight] sysfs direction error: {exc}') + return False + + def _gpio_write(self, value: str) -> None: + if not self._gpio_path: + return + try: + with open(f'{self._gpio_path}/value', 'w') as f: + f.write(value) + except OSError as exc: + _LOG.warning(f'[headlight] sysfs write error: {exc}') diff --git a/jetson/ros2_ws/src/saltybot_night_vision/saltybot_night_vision/ir_slam_bridge_node.py b/jetson/ros2_ws/src/saltybot_night_vision/saltybot_night_vision/ir_slam_bridge_node.py new file mode 100644 index 0000000..f7a659e --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_night_vision/saltybot_night_vision/ir_slam_bridge_node.py @@ -0,0 +1,157 @@ +""" +ir_slam_bridge_node.py — Bridge D435i IR1 stream to RGB-style topics for SLAM. + +When the vision mode is 'night' or 'ir_only', visible-light cameras are +unusable. RTAB-Map (and other SLAM front-ends) expect a colour image. This +node re-publishes the D435i infra1 (mono8) stream as fake RGB (rgb8) so that +RTAB-Map's visual front-end continues to operate without reconfiguration. + +Also re-publishes the camera_info with the same frame_id so that the pipeline +stays consistent. + +When mode is 'day' or 'twilight' the bridge is inactive (no republish) to +avoid double-feeding RTAB-Map. + +Subscribes: + /saltybot/vision_mode std_msgs/String mode gate + /camera/infra1/image_rect_raw sensor_msgs/Image IR source (mono8) + /camera/infra1/camera_info sensor_msgs/CameraInfo IR intrinsics + +Publishes (when active): + /saltybot/ir_bridge/image_raw sensor_msgs/Image grey-as-rgb8 + /saltybot/ir_bridge/camera_info sensor_msgs/CameraInfo passthrough + +Parameters: + active_modes str 'night,ir_only' (comma-separated) + output_encoding str 'rgb8' +""" + +import rclpy +from rclpy.node import Node +from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy + +import numpy as np +import cv2 +from cv_bridge import CvBridge + +from sensor_msgs.msg import Image, CameraInfo +from std_msgs.msg import String + + +_SENSOR_QOS = QoSProfile( + reliability=ReliabilityPolicy.BEST_EFFORT, + history=HistoryPolicy.KEEP_LAST, + depth=5, +) +_LATCHED_QOS = QoSProfile( + reliability=ReliabilityPolicy.RELIABLE, + history=HistoryPolicy.KEEP_LAST, + depth=1, + durability=DurabilityPolicy.TRANSIENT_LOCAL, +) +_RELIABLE_QOS = QoSProfile( + reliability=ReliabilityPolicy.RELIABLE, + history=HistoryPolicy.KEEP_LAST, + depth=10, +) + + +class IRSlamBridgeNode(Node): + + def __init__(self): + super().__init__('ir_slam_bridge') + + self.declare_parameter('active_modes', 'night,ir_only') + self.declare_parameter('output_encoding', 'rgb8') + + active_str = self.get_parameter('active_modes').value + self._active_modes: set[str] = {m.strip() for m in active_str.split(',')} + self._out_enc: str = self.get_parameter('output_encoding').value + + self._bridge = CvBridge() + self._active = False + self._latest_info: CameraInfo | None = None + + # Mode gate + self.create_subscription( + String, '/saltybot/vision_mode', self._on_mode, _RELIABLE_QOS + ) + + # IR source + self.create_subscription( + Image, '/camera/infra1/image_rect_raw', self._on_ir_image, _SENSOR_QOS + ) + self.create_subscription( + CameraInfo, '/camera/infra1/camera_info', self._on_camera_info, _LATCHED_QOS + ) + + # Bridge output + self._img_pub = self.create_publisher( + Image, '/saltybot/ir_bridge/image_raw', 10 + ) + self._info_pub = self.create_publisher( + CameraInfo, '/saltybot/ir_bridge/camera_info', 10 + ) + + self.get_logger().info( + f'ir_slam_bridge ready — active in modes: {sorted(self._active_modes)}' + ) + + # ── Subscriptions ───────────────────────────────────────────────────────── + + def _on_mode(self, msg: String) -> None: + was_active = self._active + self._active = msg.data in self._active_modes + if self._active != was_active: + state = 'ACTIVE' if self._active else 'idle' + self.get_logger().info( + f'[ir_bridge] mode={msg.data} → bridge {state}' + ) + + def _on_camera_info(self, msg: CameraInfo) -> None: + self._latest_info = msg + + def _on_ir_image(self, msg: Image) -> None: + if not self._active: + return + + try: + mono = self._bridge.imgmsg_to_cv2(msg, desired_encoding='mono8') + except Exception as exc: + self.get_logger().warning(f'[ir_bridge] decode error: {exc}') + return + + # Convert mono8 → target encoding + if self._out_enc == 'rgb8': + colour = cv2.cvtColor(mono, cv2.COLOR_GRAY2RGB) + elif self._out_enc == 'bgr8': + colour = cv2.cvtColor(mono, cv2.COLOR_GRAY2BGR) + else: + colour = mono # passthrough + + try: + out_msg = self._bridge.cv2_to_imgmsg(colour, encoding=self._out_enc) + except Exception as exc: + self.get_logger().warning(f'[ir_bridge] encode error: {exc}') + return + + out_msg.header = msg.header + self._img_pub.publish(out_msg) + + if self._latest_info is not None: + self._latest_info.header = msg.header + self._info_pub.publish(self._latest_info) + + +def main(args=None): + rclpy.init(args=args) + node = IRSlamBridgeNode() + try: + rclpy.spin(node) + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main() diff --git a/jetson/ros2_ws/src/saltybot_night_vision/saltybot_night_vision/light_analyser.py b/jetson/ros2_ws/src/saltybot_night_vision/saltybot_night_vision/light_analyser.py new file mode 100644 index 0000000..082876e --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_night_vision/saltybot_night_vision/light_analyser.py @@ -0,0 +1,107 @@ +""" +light_analyser.py — Histogram-based ambient light analysis and vision mode FSM. + +Vision modes (string keys used on /saltybot/vision_mode): + 'day' — full colour, all cameras nominal + 'twilight' — dim light, start boosting IMX219 gain + 'night' — dark, max gain + headlight on, IR emitter off + 'ir_only' — effectively no ambient light, switch to D435i IR stereo only + +Transitions use hysteresis to avoid rapid toggling: + Promotion (to darker mode): threshold - HYST + Demotion (to lighter mode): threshold + HYST +""" + +from __future__ import annotations + +import numpy as np +from typing import Tuple + +# ── Thresholds (0–255 mean grey intensity) ─────────────────────────────────── +# Modes: ir_only < NIGHT_THR <= night < TWILIGHT_THR <= twilight < DAY_THR <= day +_TWILIGHT_THR = 110 # below → twilight +_NIGHT_THR = 45 # below → night +_IR_ONLY_THR = 15 # below → ir_only +_HYST = 8 # hysteresis band (half-width) + +# Ordered list of modes darkest → brightest +_MODES = ('ir_only', 'night', 'twilight', 'day') + +# Per-mode thresholds: (promote_below, demote_above) +# promote_below — if median < this, enter the next darker mode +# demote_above — if median > this, enter the next lighter mode +# +# Hysteresis: promote threshold is thr - HYST; demote is thr + HYST +# where thr is the boundary between THIS mode and the adjacent lighter mode. +_THRESHOLDS = { + # (promote_below, demote_above) + 'day': (_TWILIGHT_THR - _HYST, None), + 'twilight': (_NIGHT_THR - _HYST, _TWILIGHT_THR + _HYST), + 'night': (_IR_ONLY_THR - _HYST, _NIGHT_THR + _HYST), + 'ir_only': (None, _IR_ONLY_THR + _HYST), +} + + +class LightAnalyser: + """Stateful ambient-light classifier with hysteresis.""" + + def __init__( + self, + initial_mode: str = 'day', + history_len: int = 6, + ): + if initial_mode not in _MODES: + raise ValueError(f'Unknown mode: {initial_mode}') + self._mode = initial_mode + self._history: list[float] = [] + self._history_len = history_len + + # ── Public API ──────────────────────────────────────────────────────────── + + @property + def mode(self) -> str: + return self._mode + + def update(self, frame: np.ndarray) -> Tuple[str, float]: + """ + Analyse a new image frame (any colour space; converted to grey internally). + + Returns: + (mode, mean_intensity) — mode string + 0–255 grey mean + """ + intensity = self._frame_intensity(frame) + self._history.append(intensity) + if len(self._history) > self._history_len: + self._history.pop(0) + + smooth = float(np.median(self._history)) + self._mode = self._next_mode(self._mode, smooth) + return self._mode, smooth + + # ── Internal helpers ────────────────────────────────────────────────────── + + @staticmethod + def _frame_intensity(frame: np.ndarray) -> float: + """Return mean pixel intensity of the frame as float in [0, 255].""" + if frame.ndim == 3: + # RGB / BGR → greyscale + grey = np.mean(frame, axis=2).astype(np.float32) + else: + grey = frame.astype(np.float32) + return float(grey.mean()) + + @staticmethod + def _next_mode(current: str, intensity: float) -> str: + """Apply hysteresis FSM to decide next mode.""" + idx = _MODES.index(current) + + # Try promotion (→ darker mode) first + lo, hi = _THRESHOLDS[current] + if lo is not None and intensity < lo: + return _MODES[max(0, idx - 1)] + + # Try demotion (→ lighter mode) + if hi is not None and intensity > hi: + return _MODES[min(len(_MODES) - 1, idx + 1)] + + return current diff --git a/jetson/ros2_ws/src/saltybot_night_vision/saltybot_night_vision/light_monitor_node.py b/jetson/ros2_ws/src/saltybot_night_vision/saltybot_night_vision/light_monitor_node.py new file mode 100644 index 0000000..4d75364 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_night_vision/saltybot_night_vision/light_monitor_node.py @@ -0,0 +1,141 @@ +""" +light_monitor_node.py — Ambient light monitor for SaltyBot night vision. + +Uses the front IMX219 RGB stream (or D435i infra1 as fallback) to estimate +ambient brightness via histogram analysis. Publishes the current vision mode +and raw intensity on a 2 Hz digest timer. + +Subscribes (any ONE active at a time): + /camera/color/image_raw sensor_msgs/Image RGB from IMX219 front (primary) + /camera/infra1/image_rect_raw sensor_msgs/Image IR fallback (mono8) + +Publishes: + /saltybot/ambient_light std_msgs/Float32 mean grey intensity 0–255 + /saltybot/vision_mode std_msgs/String 'day'|'twilight'|'night'|'ir_only' + +Parameters: + primary_camera_topic str '/camera/color/image_raw' + fallback_ir_topic str '/camera/infra1/image_rect_raw' + publish_hz float 2.0 + initial_mode str 'day' + history_len int 6 +""" + +import rclpy +from rclpy.node import Node +from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy + +import numpy as np +import cv2 +from cv_bridge import CvBridge + +from sensor_msgs.msg import Image +from std_msgs.msg import Float32, String + +from .light_analyser import LightAnalyser + + +_SENSOR_QOS = QoSProfile( + reliability=ReliabilityPolicy.BEST_EFFORT, + history=HistoryPolicy.KEEP_LAST, + depth=3, +) + + +class LightMonitorNode(Node): + + def __init__(self): + super().__init__('light_monitor') + + self.declare_parameter('primary_camera_topic', '/camera/color/image_raw') + self.declare_parameter('fallback_ir_topic', '/camera/infra1/image_rect_raw') + self.declare_parameter('publish_hz', 2.0) + self.declare_parameter('initial_mode', 'day') + self.declare_parameter('history_len', 6) + + primary_topic = self.get_parameter('primary_camera_topic').value + fallback_topic = self.get_parameter('fallback_ir_topic').value + pub_hz = self.get_parameter('publish_hz').value + initial_mode = self.get_parameter('initial_mode').value + history_len = self.get_parameter('history_len').value + + self._bridge = CvBridge() + self._analyser = LightAnalyser(initial_mode=initial_mode, history_len=history_len) + + # Latest decoded frame (thread-safe in single-threaded executor) + self._latest_frame: np.ndarray | None = None + self._using_fallback = False + + # Subscriptions — primary preferred; fallback used when primary silent + self.create_subscription(Image, primary_topic, self._on_primary, _SENSOR_QOS) + self.create_subscription(Image, fallback_topic, self._on_fallback, _SENSOR_QOS) + + self._mode_pub = self.create_publisher(String, '/saltybot/vision_mode', 10) + self._light_pub = self.create_publisher(Float32, '/saltybot/ambient_light', 10) + + self.create_timer(1.0 / pub_hz, self._tick) + + self.get_logger().info( + f'light_monitor ready — primary={primary_topic} fallback={fallback_topic}' + ) + + # ── Subscriptions ───────────────────────────────────────────────────────── + + def _on_primary(self, msg: Image) -> None: + try: + frame = self._bridge.imgmsg_to_cv2(msg, desired_encoding='passthrough') + self._latest_frame = frame + self._using_fallback = False + except Exception as exc: + self.get_logger().warning(f'[light_monitor] primary decode: {exc}') + + def _on_fallback(self, msg: Image) -> None: + # Only use fallback if no primary frame recently arrived + if self._latest_frame is not None and not self._using_fallback: + return # primary is active + try: + frame = self._bridge.imgmsg_to_cv2(msg, desired_encoding='mono8') + self._latest_frame = frame + self._using_fallback = True + except Exception as exc: + self.get_logger().warning(f'[light_monitor] fallback decode: {exc}') + + # ── Publish timer ───────────────────────────────────────────────────────── + + def _tick(self) -> None: + if self._latest_frame is None: + return # no data yet + + # Subsample for speed (analyse quarter-resolution) + frame = self._latest_frame + if frame.shape[0] > 120: + frame = frame[::4, ::4] + + mode, intensity = self._analyser.update(frame) + + msg_mode = String() + msg_mode.data = mode + self._mode_pub.publish(msg_mode) + + msg_light = Float32() + msg_light.data = float(intensity) + self._light_pub.publish(msg_light) + + if self._using_fallback: + self.get_logger().debug( + f'[light_monitor] IR fallback — mode={mode} intensity={intensity:.1f}' + ) + + +def main(args=None): + rclpy.init(args=args) + node = LightMonitorNode() + try: + rclpy.spin(node) + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main() diff --git a/jetson/ros2_ws/src/saltybot_night_vision/saltybot_night_vision/night_vision_controller_node.py b/jetson/ros2_ws/src/saltybot_night_vision/saltybot_night_vision/night_vision_controller_node.py new file mode 100644 index 0000000..960c474 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_night_vision/saltybot_night_vision/night_vision_controller_node.py @@ -0,0 +1,137 @@ +""" +night_vision_controller_node.py — Hardware adaptation on vision mode changes. + +Subscribes to /saltybot/vision_mode and drives: + • D435i IR emitter (via realsense2_camera parameter service) + • IMX219 gain / exposure (via v4l2-ctl subprocess) + • GPIO headlight brightness + +Mode → hardware mapping: + 'day' → emitter=auto, IMX219 AE, headlight off + 'twilight' → emitter=on, IMX219 boosted, headlight dim + 'night' → emitter=on, IMX219 max gain, headlight medium + 'ir_only' → emitter=on, IMX219 max gain, headlight full + +Publishes: + /saltybot/visual_odom_status std_msgs/String (JSON — reuses VO status topic) + +Parameters: + headlight_pin int 33 (physical 40-pin board pin) + day_headlight float 0.0 + twilight_headlight float 0.25 + night_headlight float 0.50 + ir_only_headlight float 1.0 +""" + +import json +import time + +import rclpy +from rclpy.node import Node +from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy + +from std_msgs.msg import String + +from .camera_controller import CameraController +from .gpio_headlight import GPIOHeadlight + + +_RELIABLE_QOS = QoSProfile( + reliability=ReliabilityPolicy.RELIABLE, + history=HistoryPolicy.KEEP_LAST, + depth=10, +) + + +class NightVisionControllerNode(Node): + + def __init__(self): + super().__init__('night_vision_controller') + + self.declare_parameter('headlight_pin', 33) + self.declare_parameter('day_headlight', 0.0) + self.declare_parameter('twilight_headlight', 0.25) + self.declare_parameter('night_headlight', 0.50) + self.declare_parameter('ir_only_headlight', 1.0) + + pin = self.get_parameter('headlight_pin').value + self._brightness_map = { + 'day': self.get_parameter('day_headlight').value, + 'twilight': self.get_parameter('twilight_headlight').value, + 'night': self.get_parameter('night_headlight').value, + 'ir_only': self.get_parameter('ir_only_headlight').value, + } + + self._cam_ctrl = CameraController(self) + self._headlight = GPIOHeadlight(pin=pin) + self._cur_mode: str | None = None + self._mode_change_t: float = 0.0 + + self.create_subscription( + String, '/saltybot/vision_mode', self._on_mode, _RELIABLE_QOS + ) + + self._status_pub = self.create_publisher( + String, '/saltybot/night_vision_status', 10 + ) + self.create_timer(5.0, self._status_tick) + + self.get_logger().info( + f'night_vision_controller ready — headlight pin={pin}' + ) + + # ── Subscription ────────────────────────────────────────────────────────── + + def _on_mode(self, msg: String) -> None: + mode = msg.data + if mode == self._cur_mode: + return + + prev = self._cur_mode + self._cur_mode = mode + self._mode_change_t = time.monotonic() + + # Apply camera profile (D435i emitter + IMX219 gain/exposure) + self._cam_ctrl.set_profile(mode) + + # Apply headlight + brightness = self._brightness_map.get(mode, 0.0) + self._headlight.set_brightness(brightness) + + self.get_logger().info( + f'[nv_ctrl] {prev} → {mode} | ' + f'headlight={brightness:.2f}' + ) + + # ── Status heartbeat ────────────────────────────────────────────────────── + + def _status_tick(self) -> None: + status = { + 'mode': self._cur_mode or 'unknown', + 'headlight': round(self._headlight._brightness, 2), + 'mode_age_s': round(time.monotonic() - self._mode_change_t, 1) + if self._cur_mode else None, + } + msg = String() + msg.data = json.dumps(status) + self._status_pub.publish(msg) + + # ── Cleanup ─────────────────────────────────────────────────────────────── + + def destroy_node(self) -> None: + self._headlight.cleanup() + super().destroy_node() + + +def main(args=None): + rclpy.init(args=args) + node = NightVisionControllerNode() + try: + rclpy.spin(node) + finally: + node.destroy_node() + rclpy.shutdown() + + +if __name__ == '__main__': + main() diff --git a/jetson/ros2_ws/src/saltybot_night_vision/setup.cfg b/jetson/ros2_ws/src/saltybot_night_vision/setup.cfg new file mode 100644 index 0000000..5ac781e --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_night_vision/setup.cfg @@ -0,0 +1,4 @@ +[develop] +script_dir=$base/lib/saltybot_night_vision +[install] +install_scripts=$base/lib/saltybot_night_vision diff --git a/jetson/ros2_ws/src/saltybot_night_vision/setup.py b/jetson/ros2_ws/src/saltybot_night_vision/setup.py new file mode 100644 index 0000000..d7a806e --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_night_vision/setup.py @@ -0,0 +1,34 @@ +from setuptools import setup, find_packages +import os +from glob import glob + +package_name = 'saltybot_night_vision' + +setup( + name=package_name, + version='0.1.0', + packages=find_packages(exclude=['test']), + data_files=[ + ('share/ament_index/resource_index/packages', + ['resource/' + package_name]), + ('share/' + package_name, ['package.xml']), + ('share/' + package_name + '/launch', + glob('launch/*.launch.py')), + ('share/' + package_name + '/config', + glob('config/*.yaml')), + ], + install_requires=['setuptools'], + zip_safe=True, + maintainer='SaltyLab', + maintainer_email='robot@saltylab.local', + description='Night vision mode: IR emitter, headlight, ambient-light FSM, IR SLAM bridge', + license='MIT', + tests_require=['pytest'], + entry_points={ + 'console_scripts': [ + 'light_monitor = saltybot_night_vision.light_monitor_node:main', + 'night_vision_ctrl = saltybot_night_vision.night_vision_controller_node:main', + 'ir_slam_bridge = saltybot_night_vision.ir_slam_bridge_node:main', + ], + }, +) diff --git a/jetson/ros2_ws/src/saltybot_night_vision/test/test_night_vision.py b/jetson/ros2_ws/src/saltybot_night_vision/test/test_night_vision.py new file mode 100644 index 0000000..41789b7 --- /dev/null +++ b/jetson/ros2_ws/src/saltybot_night_vision/test/test_night_vision.py @@ -0,0 +1,152 @@ +""" +test_night_vision.py — Unit tests for LightAnalyser and GPIOHeadlight. + +Runs without ROS2 / Jetson hardware (no rclpy, no GPIO). +""" + +import sys +import os +import math + +import numpy as np +import pytest + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) + +from saltybot_night_vision.light_analyser import LightAnalyser +from saltybot_night_vision.gpio_headlight import GPIOHeadlight + + +# ── LightAnalyser tests ─────────────────────────────────────────────────────── + +class TestLightAnalyser: + + def _bright_frame(self, value: int = 180) -> np.ndarray: + return np.full((480, 640), value, dtype=np.uint8) + + def _dark_frame(self, value: int = 5) -> np.ndarray: + return np.full((480, 640), value, dtype=np.uint8) + + def test_initial_mode_default(self): + la = LightAnalyser() + assert la.mode == 'day' + + def test_bright_frame_stays_day(self): + la = LightAnalyser() + for _ in range(10): + mode, _ = la.update(self._bright_frame(180)) + assert mode == 'day' + + def test_dark_frame_transitions_to_ir_only(self): + la = LightAnalyser() + for _ in range(10): + mode, _ = la.update(self._dark_frame(3)) + assert mode == 'ir_only' + + def test_medium_intensity_twilight(self): + la = LightAnalyser() + frame = np.full((480, 640), 75, dtype=np.uint8) + for _ in range(10): + mode, _ = la.update(frame) + assert mode == 'twilight' + + def test_night_range(self): + la = LightAnalyser() + frame = np.full((480, 640), 25, dtype=np.uint8) + for _ in range(10): + mode, _ = la.update(frame) + assert mode == 'night' + + def test_intensity_value_correct(self): + la = LightAnalyser() + frame = np.full((60, 80), 128, dtype=np.uint8) + _, intensity = la.update(frame) + assert intensity == pytest.approx(128.0, abs=0.5) + + def test_rgb_frame_accepted(self): + la = LightAnalyser() + frame = np.full((480, 640, 3), 60, dtype=np.uint8) + mode, intensity = la.update(frame) + assert isinstance(mode, str) + assert 0.0 <= intensity <= 255.0 + + def test_hysteresis_prevents_oscillation(self): + """Frame right at threshold should not cause mode flip on each tick.""" + la = LightAnalyser() + # Start from day, push to just below twilight threshold + frame = np.full((480, 640), 100, dtype=np.uint8) + modes: list[str] = [] + for _ in range(20): + mode, _ = la.update(frame) + modes.append(mode) + # Should not oscillate — all same mode + assert len(set(modes[-10:])) == 1 + + def test_recovery_to_day(self): + la = LightAnalyser(initial_mode='ir_only') + bright = np.full((480, 640), 200, dtype=np.uint8) + for _ in range(20): + mode, _ = la.update(bright) + assert mode == 'day' + + +# ── GPIOHeadlight tests (simulated — no hardware) ───────────────────────────── + +class TestGPIOHeadlightSim: + """ + Tests that GPIOHeadlight correctly tracks brightness internally + even when running in simulation mode (no hardware GPIO). + """ + + def test_default_brightness_zero(self): + hl = GPIOHeadlight() + assert hl._brightness == pytest.approx(0.0) + + def test_set_brightness_clamps_high(self): + hl = GPIOHeadlight() + hl.set_brightness(2.5) + assert hl._brightness == pytest.approx(1.0) + + def test_set_brightness_clamps_low(self): + hl = GPIOHeadlight() + hl.set_brightness(-0.5) + assert hl._brightness == pytest.approx(0.0) + + def test_set_level_off(self): + hl = GPIOHeadlight() + hl.set_brightness(0.8) + hl.set_level('off') + assert hl._brightness == pytest.approx(0.0) + + def test_set_level_full(self): + hl = GPIOHeadlight() + hl.set_level('full') + assert hl._brightness == pytest.approx(1.0) + + def test_set_level_dim(self): + hl = GPIOHeadlight() + hl.set_level('dim') + assert hl._brightness == pytest.approx(0.25) + + def test_off_method(self): + hl = GPIOHeadlight() + hl.set_brightness(0.75) + hl.off() + assert hl._brightness == pytest.approx(0.0) + + def test_cleanup_sets_zero(self): + hl = GPIOHeadlight() + hl.set_brightness(0.9) + hl.cleanup() + assert hl._brightness == pytest.approx(0.0) + + def test_no_op_on_same_brightness(self): + """Repeated set of same value should not raise.""" + hl = GPIOHeadlight() + hl.set_brightness(0.5) + hl.set_brightness(0.5) # should be no-op, no error + assert hl._brightness == pytest.approx(0.5) + + +if __name__ == '__main__': + pytest.main([__file__, '-v'])