feat(jetson): add saltybot_night_vision package (issue #168)

Implements ambient-light-aware night vision mode for the D435i + IMX219
stack on the Jetson Orin Nano Super:

• light_analyser.py       — histogram-based intensity FSM with hysteresis:
                            day → twilight → night → ir_only
• camera_controller.py   — D435i IR emitter via realsense2_camera param
                            service + IMX219 gain/exposure via v4l2-ctl
• gpio_headlight.py      — physical pin 33 headlight; Jetson.GPIO PWM
                            primary, sysfs on/off fallback, sim mode
• light_monitor_node.py  — subscribes IMX219/IR1, publishes
                            /saltybot/ambient_light + /saltybot/vision_mode
• night_vision_controller_node.py — reacts to mode changes; drives
                            D435i emitter, IMX219 gain, headlight
• ir_slam_bridge_node.py — mono8 IR1 → rgb8 republish so RTAB-Map
                            keeps loop-closing in darkness
• launch/night_vision.launch.py + config/night_vision_params.yaml
• test/test_night_vision.py — 18 unit tests (18/18 pass, no ROS2 needed)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
sl-perception 2026-03-02 10:36:18 -05:00
parent 566cfc8811
commit 7dcdd2088a
14 changed files with 1226 additions and 0 deletions

View File

@ -0,0 +1,49 @@
# saltybot_night_vision — runtime parameters
#
# Ambient light detection requires ONE of:
# /camera/color/image_raw — IMX219 front camera (primary)
# /camera/infra1/image_rect_raw — D435i IR fallback
#
# D435i IR emitter is controlled via the realsense2_camera parameter service.
# Make sure the node is running as /camera/camera (default).
#
# GPIO headlight on physical pin 33 of the Jetson Orin Nano Super 40-pin header.
# Requires Jetson.GPIO (jetson-gpio pip package) or /sys/class/gpio write access.
light_monitor:
ros__parameters:
primary_camera_topic: '/camera/color/image_raw'
fallback_ir_topic: '/camera/infra1/image_rect_raw'
publish_hz: 2.0 # ambient light evaluation rate
initial_mode: 'day'
history_len: 6 # frames to median-filter intensity
night_vision_controller:
ros__parameters:
headlight_pin: 33 # physical board pin (Jetson 40-pin J26)
day_headlight: 0.0 # off
twilight_headlight: 0.25 # 25 % — subtle fill
night_headlight: 0.50 # 50 % — normal dark room
ir_only_headlight: 1.0 # 100 % — pitch black
ir_slam_bridge:
ros__parameters:
active_modes: 'night,ir_only' # publish IR→RGB only in these modes
output_encoding: 'rgb8' # RTAB-Map expects rgb8 or bgr8
# ── RTAB-Map integration note ──────────────────────────────────────────────────
# When the bridge is active (/saltybot/ir_bridge/image_raw), redirect RTAB-Map's
# RGB input by adding to slam_rtabmap.launch.py Node remappings:
#
# remappings=[
# ('rgb/image', '/saltybot/ir_bridge/image_raw'),
# ('rgb/camera_info', '/saltybot/ir_bridge/camera_info'),
# ]
#
# Or set the topic at launch:
# ros2 launch saltybot_bringup slam_rtabmap.launch.py \
# rgb_image_topic:=/saltybot/ir_bridge/image_raw
#
# This ensures loop-closure detection continues using the IR feature stream
# instead of a dark / blank colour image.

View File

@ -0,0 +1,68 @@
"""
night_vision.launch.py SaltyBot night vision stack.
Starts:
light_monitor histogram ambient-light analysis /saltybot/vision_mode
night_vision_ctrl D435i emitter + IMX219 gain + GPIO headlight
ir_slam_bridge IRRGB republish for RTAB-Map (active in dark modes)
Launch args:
primary_camera_topic str '/camera/color/image_raw'
headlight_pin int '33'
publish_hz float '2.0'
active_ir_modes str 'night,ir_only'
Verify:
ros2 topic echo /saltybot/vision_mode
ros2 topic echo /saltybot/ambient_light
ros2 topic echo /saltybot/night_vision_status
ros2 topic hz /saltybot/ir_bridge/image_raw # should be ~30 Hz in dark
"""
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
args = [
DeclareLaunchArgument('primary_camera_topic',
default_value='/camera/color/image_raw'),
DeclareLaunchArgument('headlight_pin', default_value='33'),
DeclareLaunchArgument('publish_hz', default_value='2.0'),
DeclareLaunchArgument('active_ir_modes', default_value='night,ir_only'),
]
light_monitor = Node(
package='saltybot_night_vision',
executable='light_monitor',
name='light_monitor',
output='screen',
parameters=[{
'primary_camera_topic': LaunchConfiguration('primary_camera_topic'),
'publish_hz': LaunchConfiguration('publish_hz'),
}],
)
nv_ctrl = Node(
package='saltybot_night_vision',
executable='night_vision_ctrl',
name='night_vision_controller',
output='screen',
parameters=[{
'headlight_pin': LaunchConfiguration('headlight_pin'),
}],
)
ir_bridge = Node(
package='saltybot_night_vision',
executable='ir_slam_bridge',
name='ir_slam_bridge',
output='screen',
parameters=[{
'active_modes': LaunchConfiguration('active_ir_modes'),
}],
)
return LaunchDescription(args + [light_monitor, nv_ctrl, ir_bridge])

View File

@ -0,0 +1,27 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_night_vision</name>
<version>0.1.0</version>
<description>
Night vision mode for SaltyBot: ambient-light histogram detection,
D435i IR emitter control, IMX219 auto-gain/exposure, GPIO headlight,
and IR-only SLAM bridging.
</description>
<maintainer email="robot@saltylab.local">SaltyLab</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>sensor_msgs</depend>
<depend>cv_bridge</depend>
<exec_depend>python3-numpy</exec_depend>
<exec_depend>python3-opencv</exec_depend>
<test_depend>pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,158 @@
"""
camera_controller.py D435i parameter service client + IMX219 v4l2 control.
D435i (realsense2_camera node):
depth_module.emitter_enabled int 0=off 1=on 2=auto
depth_module.emitter_always_on bool keep emitter on between frames
IMX219 (CSI cameras via v4l2):
exposure_auto 0=manual 1=auto
exposure microseconds (manual)
analogue_gain raw sensor gain (16170 typical range)
The IMX219 controls are exercised via subprocess `v4l2-ctl`, which is
always available on JetPack 6.
"""
from __future__ import annotations
import subprocess
import logging
from dataclasses import dataclass
from typing import Optional
import rclpy
from rclpy.node import Node
from rcl_interfaces.msg import Parameter, ParameterValue, ParameterType
from rcl_interfaces.srv import SetParameters
_LOG = logging.getLogger(__name__)
# D435i camera node (realsense2_camera default namespace)
_RS_NODE = '/camera/camera'
# IMX219 v4l2 device paths for each camera position
# Front camera is the primary source for ambient-light detection
_IMX219_DEVICES: dict[str, str] = {
'front': '/dev/video0',
'right': '/dev/video2',
'rear': '/dev/video4',
'left': '/dev/video6',
}
# Auto-exposure values for IMX219
_IMX219_AE_AUTO = 1
_IMX219_AE_MANUAL = 0
@dataclass
class CameraProfile:
"""Parameter set for a given vision mode."""
emitter_enabled: int # 0=off, 1=on, 2=auto
imx_ae_mode: int # 0=manual, 1=auto
imx_exposure_us: Optional[int] # None → leave AE to control it
imx_gain: Optional[int] # None → leave AE to control it
# Per-mode camera profiles
PROFILES: dict[str, CameraProfile] = {
'day': CameraProfile(
emitter_enabled=2, # RS depth auto
imx_ae_mode=_IMX219_AE_AUTO,
imx_exposure_us=None,
imx_gain=None,
),
'twilight': CameraProfile(
emitter_enabled=1, # emitter on for better depth
imx_ae_mode=_IMX219_AE_MANUAL,
imx_exposure_us=20_000, # 20 ms
imx_gain=64,
),
'night': CameraProfile(
emitter_enabled=1,
imx_ae_mode=_IMX219_AE_MANUAL,
imx_exposure_us=50_000, # 50 ms (sacrifice motion sharpness)
imx_gain=128,
),
'ir_only': CameraProfile(
emitter_enabled=1, # emitter critical for IR stereo in dark
imx_ae_mode=_IMX219_AE_MANUAL,
imx_exposure_us=80_000,
imx_gain=170, # near-max analogue gain
),
}
class CameraController:
"""
Non-blocking camera controller.
set_profile() sends async ROS2 parameter requests and v4l2 commands.
It should be called from within a ROS2 node (pass the node reference).
"""
def __init__(self, node: Node):
self._node = node
self._cli: Optional[rclpy.client.Client] = None
self._pending_mode: Optional[str] = None
self._current_mode: Optional[str] = None
self._init_rs_client()
# ── Public API ────────────────────────────────────────────────────────────
def set_profile(self, mode: str) -> None:
"""Apply the camera profile for `mode`. Fire-and-forget async."""
if mode not in PROFILES:
self._node.get_logger().warning(f'[camera_ctrl] Unknown mode: {mode}')
return
if mode == self._current_mode:
return
profile = PROFILES[mode]
self._set_rs_emitter(profile.emitter_enabled)
self._set_imx219(profile)
self._current_mode = mode
self._node.get_logger().info(
f'[camera_ctrl] Applied profile={mode} '
f'emitter={profile.emitter_enabled} '
f'gain={profile.imx_gain} exp={profile.imx_exposure_us}'
)
# ── D435i (RealSense) ─────────────────────────────────────────────────────
def _init_rs_client(self) -> None:
self._cli = self._node.create_client(
SetParameters, f'{_RS_NODE}/set_parameters'
)
def _set_rs_emitter(self, value: int) -> None:
if not self._cli.service_is_ready():
self._node.get_logger().debug(
'[camera_ctrl] RS parameter service not ready — skipping emitter set'
)
return
pval = ParameterValue(type=ParameterType.PARAMETER_INTEGER, integer_value=value)
param = Parameter(name='depth_module.emitter_enabled', value=pval)
req = SetParameters.Request(parameters=[param])
self._cli.call_async(req) # fire-and-forget
# ── IMX219 via v4l2-ctl ──────────────────────────────────────────────────
def _set_imx219(self, profile: CameraProfile) -> None:
for dev in _IMX219_DEVICES.values():
self._v4l2_set(dev, 'exposure_auto', profile.imx_ae_mode)
if profile.imx_exposure_us is not None:
self._v4l2_set(dev, 'exposure', profile.imx_exposure_us)
if profile.imx_gain is not None:
self._v4l2_set(dev, 'analogue_gain', profile.imx_gain)
def _v4l2_set(self, dev: str, ctrl: str, value: int) -> None:
cmd = ['v4l2-ctl', '-d', dev, f'--set-ctrl={ctrl}={value}']
try:
result = subprocess.run(cmd, capture_output=True, timeout=1.0)
if result.returncode != 0:
self._node.get_logger().debug(
f'[camera_ctrl] v4l2 {dev} {ctrl}={value}: '
+ result.stderr.decode().strip()
)
except (subprocess.TimeoutExpired, FileNotFoundError) as exc:
self._node.get_logger().debug(f'[camera_ctrl] v4l2-ctl unavailable: {exc}')

View File

@ -0,0 +1,192 @@
"""
gpio_headlight.py Jetson Orin Nano Super 40-pin GPIO headlight control.
Physical pin 33 on the J26 40-pin header is used for the headlight.
On JetPack 6 this corresponds to sysfs GPIO chip gpio-tegra234 / offset 12.
Brightness levels (0.01.0):
0.0 headlight off
0.25 dim (e.g. indicator, twilight assist)
0.50 medium (normal dark indoor)
1.0 full (outdoor night, ir_only mode)
Implementation:
1. Tries Jetson.GPIO PWM (RPi-compatible API, available if jetson-gpio installed)
2. Falls back to sysfs binary (on/off only) via /sys/class/gpio
3. If sysfs export fails (permissions / virtual env), logs a warning and no-ops.
Sysfs GPIO path on Orin Nano Super (JetPack 6):
/sys/class/gpio/gpiochip<N>/ use the tegra234 chip
Or the legacy /sys/class/gpio/gpio<number>/ path after export.
Note: udev rule or `sudo` may be needed in production.
Add to /etc/udev/rules.d/99-jetson-gpio.rules:
SUBSYSTEM=="gpio", ACTION=="add", RUN+="/bin/chmod 777 /sys/class/gpio"
"""
from __future__ import annotations
import logging
import os
from typing import Optional
_LOG = logging.getLogger(__name__)
# Physical board pin for the headlight
_BOARD_PIN = 33
# Brightness → duty cycle string mapping for sysfs PWM (when available)
_LEVEL_DUTY: dict[str, float] = {
'off': 0.0,
'dim': 0.25,
'medium': 0.50,
'full': 1.0,
}
class GPIOHeadlight:
"""Controls the GPIO headlight with graceful fallbacks."""
def __init__(self, pin: int = _BOARD_PIN):
self._pin = pin
self._brightness: float = 0.0
self._backend: str = 'none'
self._pwm = None
self._gpio_path: Optional[str] = None
self._init()
# ── Public API ────────────────────────────────────────────────────────────
def set_brightness(self, level: float) -> None:
"""Set headlight brightness in [0.0, 1.0]."""
level = max(0.0, min(1.0, level))
if level == self._brightness:
return
self._brightness = level
self._apply(level)
def set_level(self, name: str) -> None:
"""Set headlight to a named level: 'off', 'dim', 'medium', 'full'."""
self.set_brightness(_LEVEL_DUTY.get(name, 0.0))
def off(self) -> None:
self.set_brightness(0.0)
def cleanup(self) -> None:
self.off()
if self._pwm is not None:
try:
self._pwm.stop()
import Jetson.GPIO as GPIO
GPIO.cleanup()
except Exception:
pass
elif self._gpio_path:
self._gpio_write('0')
# ── Init backends ─────────────────────────────────────────────────────────
def _init(self) -> None:
# Try Jetson.GPIO PWM first
try:
import Jetson.GPIO as GPIO
GPIO.setmode(GPIO.BOARD)
GPIO.setup(self._pin, GPIO.OUT, initial=GPIO.LOW)
self._pwm = GPIO.PWM(self._pin, 1000) # 1 kHz PWM
self._pwm.start(0)
self._backend = 'jetson_gpio_pwm'
_LOG.info(f'[headlight] Jetson.GPIO PWM on pin {self._pin}')
return
except ImportError:
_LOG.debug('[headlight] Jetson.GPIO not available')
except Exception as exc:
_LOG.warning(f'[headlight] Jetson.GPIO init failed: {exc}')
# Fall back to sysfs binary
gpio_num = self._board_pin_to_sysfs(self._pin)
if gpio_num is not None:
path = f'/sys/class/gpio/gpio{gpio_num}'
if self._sysfs_export(gpio_num):
self._gpio_path = path
self._backend = 'sysfs'
_LOG.info(f'[headlight] sysfs GPIO /sys/class/gpio/gpio{gpio_num}')
return
_LOG.warning('[headlight] No GPIO backend available — headlight is simulated')
self._backend = 'sim'
# ── Apply ─────────────────────────────────────────────────────────────────
def _apply(self, level: float) -> None:
if self._backend == 'jetson_gpio_pwm':
try:
self._pwm.ChangeDutyCycle(level * 100.0)
except Exception as exc:
_LOG.warning(f'[headlight] PWM error: {exc}')
elif self._backend == 'sysfs':
self._gpio_write('1' if level > 0 else '0')
# sim: no-op
# ── Sysfs helpers ─────────────────────────────────────────────────────────
@staticmethod
def _board_pin_to_sysfs(board_pin: int) -> Optional[int]:
"""
Map physical 40-pin board pin to sysfs GPIO number on Orin Nano Super.
Only a subset is mapped here; extend as needed.
"""
# Partial map of Orin Nano Super 40-pin header (physical → sysfs offset)
_ORIN_MAP: dict[int, int] = {
7: 106,
11: 112,
12: 50,
13: 108,
15: 109,
16: 85,
18: 110,
19: 130,
21: 128,
22: 111,
23: 129,
24: 131,
26: 132,
29: 118,
31: 120,
32: 122,
33: 200, # headlight pin
35: 116,
36: 54,
37: 121,
38: 117,
40: 56,
}
return _ORIN_MAP.get(board_pin)
def _sysfs_export(self, gpio_num: int) -> bool:
export_path = f'/sys/class/gpio/gpio{gpio_num}'
if not os.path.exists(export_path):
try:
with open('/sys/class/gpio/export', 'w') as f:
f.write(str(gpio_num))
except PermissionError:
_LOG.warning('[headlight] No permission for /sys/class/gpio/export')
return False
except OSError as exc:
_LOG.warning(f'[headlight] sysfs export error: {exc}')
return False
try:
with open(f'{export_path}/direction', 'w') as f:
f.write('out')
return True
except OSError as exc:
_LOG.warning(f'[headlight] sysfs direction error: {exc}')
return False
def _gpio_write(self, value: str) -> None:
if not self._gpio_path:
return
try:
with open(f'{self._gpio_path}/value', 'w') as f:
f.write(value)
except OSError as exc:
_LOG.warning(f'[headlight] sysfs write error: {exc}')

View File

@ -0,0 +1,157 @@
"""
ir_slam_bridge_node.py Bridge D435i IR1 stream to RGB-style topics for SLAM.
When the vision mode is 'night' or 'ir_only', visible-light cameras are
unusable. RTAB-Map (and other SLAM front-ends) expect a colour image. This
node re-publishes the D435i infra1 (mono8) stream as fake RGB (rgb8) so that
RTAB-Map's visual front-end continues to operate without reconfiguration.
Also re-publishes the camera_info with the same frame_id so that the pipeline
stays consistent.
When mode is 'day' or 'twilight' the bridge is inactive (no republish) to
avoid double-feeding RTAB-Map.
Subscribes:
/saltybot/vision_mode std_msgs/String mode gate
/camera/infra1/image_rect_raw sensor_msgs/Image IR source (mono8)
/camera/infra1/camera_info sensor_msgs/CameraInfo IR intrinsics
Publishes (when active):
/saltybot/ir_bridge/image_raw sensor_msgs/Image grey-as-rgb8
/saltybot/ir_bridge/camera_info sensor_msgs/CameraInfo passthrough
Parameters:
active_modes str 'night,ir_only' (comma-separated)
output_encoding str 'rgb8'
"""
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy
import numpy as np
import cv2
from cv_bridge import CvBridge
from sensor_msgs.msg import Image, CameraInfo
from std_msgs.msg import String
_SENSOR_QOS = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=5,
)
_LATCHED_QOS = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST,
depth=1,
durability=DurabilityPolicy.TRANSIENT_LOCAL,
)
_RELIABLE_QOS = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST,
depth=10,
)
class IRSlamBridgeNode(Node):
def __init__(self):
super().__init__('ir_slam_bridge')
self.declare_parameter('active_modes', 'night,ir_only')
self.declare_parameter('output_encoding', 'rgb8')
active_str = self.get_parameter('active_modes').value
self._active_modes: set[str] = {m.strip() for m in active_str.split(',')}
self._out_enc: str = self.get_parameter('output_encoding').value
self._bridge = CvBridge()
self._active = False
self._latest_info: CameraInfo | None = None
# Mode gate
self.create_subscription(
String, '/saltybot/vision_mode', self._on_mode, _RELIABLE_QOS
)
# IR source
self.create_subscription(
Image, '/camera/infra1/image_rect_raw', self._on_ir_image, _SENSOR_QOS
)
self.create_subscription(
CameraInfo, '/camera/infra1/camera_info', self._on_camera_info, _LATCHED_QOS
)
# Bridge output
self._img_pub = self.create_publisher(
Image, '/saltybot/ir_bridge/image_raw', 10
)
self._info_pub = self.create_publisher(
CameraInfo, '/saltybot/ir_bridge/camera_info', 10
)
self.get_logger().info(
f'ir_slam_bridge ready — active in modes: {sorted(self._active_modes)}'
)
# ── Subscriptions ─────────────────────────────────────────────────────────
def _on_mode(self, msg: String) -> None:
was_active = self._active
self._active = msg.data in self._active_modes
if self._active != was_active:
state = 'ACTIVE' if self._active else 'idle'
self.get_logger().info(
f'[ir_bridge] mode={msg.data} → bridge {state}'
)
def _on_camera_info(self, msg: CameraInfo) -> None:
self._latest_info = msg
def _on_ir_image(self, msg: Image) -> None:
if not self._active:
return
try:
mono = self._bridge.imgmsg_to_cv2(msg, desired_encoding='mono8')
except Exception as exc:
self.get_logger().warning(f'[ir_bridge] decode error: {exc}')
return
# Convert mono8 → target encoding
if self._out_enc == 'rgb8':
colour = cv2.cvtColor(mono, cv2.COLOR_GRAY2RGB)
elif self._out_enc == 'bgr8':
colour = cv2.cvtColor(mono, cv2.COLOR_GRAY2BGR)
else:
colour = mono # passthrough
try:
out_msg = self._bridge.cv2_to_imgmsg(colour, encoding=self._out_enc)
except Exception as exc:
self.get_logger().warning(f'[ir_bridge] encode error: {exc}')
return
out_msg.header = msg.header
self._img_pub.publish(out_msg)
if self._latest_info is not None:
self._latest_info.header = msg.header
self._info_pub.publish(self._latest_info)
def main(args=None):
rclpy.init(args=args)
node = IRSlamBridgeNode()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,107 @@
"""
light_analyser.py Histogram-based ambient light analysis and vision mode FSM.
Vision modes (string keys used on /saltybot/vision_mode):
'day' full colour, all cameras nominal
'twilight' dim light, start boosting IMX219 gain
'night' dark, max gain + headlight on, IR emitter off
'ir_only' effectively no ambient light, switch to D435i IR stereo only
Transitions use hysteresis to avoid rapid toggling:
Promotion (to darker mode): threshold - HYST
Demotion (to lighter mode): threshold + HYST
"""
from __future__ import annotations
import numpy as np
from typing import Tuple
# ── Thresholds (0255 mean grey intensity) ───────────────────────────────────
# Modes: ir_only < NIGHT_THR <= night < TWILIGHT_THR <= twilight < DAY_THR <= day
_TWILIGHT_THR = 110 # below → twilight
_NIGHT_THR = 45 # below → night
_IR_ONLY_THR = 15 # below → ir_only
_HYST = 8 # hysteresis band (half-width)
# Ordered list of modes darkest → brightest
_MODES = ('ir_only', 'night', 'twilight', 'day')
# Per-mode thresholds: (promote_below, demote_above)
# promote_below — if median < this, enter the next darker mode
# demote_above — if median > this, enter the next lighter mode
#
# Hysteresis: promote threshold is thr - HYST; demote is thr + HYST
# where thr is the boundary between THIS mode and the adjacent lighter mode.
_THRESHOLDS = {
# (promote_below, demote_above)
'day': (_TWILIGHT_THR - _HYST, None),
'twilight': (_NIGHT_THR - _HYST, _TWILIGHT_THR + _HYST),
'night': (_IR_ONLY_THR - _HYST, _NIGHT_THR + _HYST),
'ir_only': (None, _IR_ONLY_THR + _HYST),
}
class LightAnalyser:
"""Stateful ambient-light classifier with hysteresis."""
def __init__(
self,
initial_mode: str = 'day',
history_len: int = 6,
):
if initial_mode not in _MODES:
raise ValueError(f'Unknown mode: {initial_mode}')
self._mode = initial_mode
self._history: list[float] = []
self._history_len = history_len
# ── Public API ────────────────────────────────────────────────────────────
@property
def mode(self) -> str:
return self._mode
def update(self, frame: np.ndarray) -> Tuple[str, float]:
"""
Analyse a new image frame (any colour space; converted to grey internally).
Returns:
(mode, mean_intensity) mode string + 0255 grey mean
"""
intensity = self._frame_intensity(frame)
self._history.append(intensity)
if len(self._history) > self._history_len:
self._history.pop(0)
smooth = float(np.median(self._history))
self._mode = self._next_mode(self._mode, smooth)
return self._mode, smooth
# ── Internal helpers ──────────────────────────────────────────────────────
@staticmethod
def _frame_intensity(frame: np.ndarray) -> float:
"""Return mean pixel intensity of the frame as float in [0, 255]."""
if frame.ndim == 3:
# RGB / BGR → greyscale
grey = np.mean(frame, axis=2).astype(np.float32)
else:
grey = frame.astype(np.float32)
return float(grey.mean())
@staticmethod
def _next_mode(current: str, intensity: float) -> str:
"""Apply hysteresis FSM to decide next mode."""
idx = _MODES.index(current)
# Try promotion (→ darker mode) first
lo, hi = _THRESHOLDS[current]
if lo is not None and intensity < lo:
return _MODES[max(0, idx - 1)]
# Try demotion (→ lighter mode)
if hi is not None and intensity > hi:
return _MODES[min(len(_MODES) - 1, idx + 1)]
return current

View File

@ -0,0 +1,141 @@
"""
light_monitor_node.py Ambient light monitor for SaltyBot night vision.
Uses the front IMX219 RGB stream (or D435i infra1 as fallback) to estimate
ambient brightness via histogram analysis. Publishes the current vision mode
and raw intensity on a 2 Hz digest timer.
Subscribes (any ONE active at a time):
/camera/color/image_raw sensor_msgs/Image RGB from IMX219 front (primary)
/camera/infra1/image_rect_raw sensor_msgs/Image IR fallback (mono8)
Publishes:
/saltybot/ambient_light std_msgs/Float32 mean grey intensity 0255
/saltybot/vision_mode std_msgs/String 'day'|'twilight'|'night'|'ir_only'
Parameters:
primary_camera_topic str '/camera/color/image_raw'
fallback_ir_topic str '/camera/infra1/image_rect_raw'
publish_hz float 2.0
initial_mode str 'day'
history_len int 6
"""
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
import numpy as np
import cv2
from cv_bridge import CvBridge
from sensor_msgs.msg import Image
from std_msgs.msg import Float32, String
from .light_analyser import LightAnalyser
_SENSOR_QOS = QoSProfile(
reliability=ReliabilityPolicy.BEST_EFFORT,
history=HistoryPolicy.KEEP_LAST,
depth=3,
)
class LightMonitorNode(Node):
def __init__(self):
super().__init__('light_monitor')
self.declare_parameter('primary_camera_topic', '/camera/color/image_raw')
self.declare_parameter('fallback_ir_topic', '/camera/infra1/image_rect_raw')
self.declare_parameter('publish_hz', 2.0)
self.declare_parameter('initial_mode', 'day')
self.declare_parameter('history_len', 6)
primary_topic = self.get_parameter('primary_camera_topic').value
fallback_topic = self.get_parameter('fallback_ir_topic').value
pub_hz = self.get_parameter('publish_hz').value
initial_mode = self.get_parameter('initial_mode').value
history_len = self.get_parameter('history_len').value
self._bridge = CvBridge()
self._analyser = LightAnalyser(initial_mode=initial_mode, history_len=history_len)
# Latest decoded frame (thread-safe in single-threaded executor)
self._latest_frame: np.ndarray | None = None
self._using_fallback = False
# Subscriptions — primary preferred; fallback used when primary silent
self.create_subscription(Image, primary_topic, self._on_primary, _SENSOR_QOS)
self.create_subscription(Image, fallback_topic, self._on_fallback, _SENSOR_QOS)
self._mode_pub = self.create_publisher(String, '/saltybot/vision_mode', 10)
self._light_pub = self.create_publisher(Float32, '/saltybot/ambient_light', 10)
self.create_timer(1.0 / pub_hz, self._tick)
self.get_logger().info(
f'light_monitor ready — primary={primary_topic} fallback={fallback_topic}'
)
# ── Subscriptions ─────────────────────────────────────────────────────────
def _on_primary(self, msg: Image) -> None:
try:
frame = self._bridge.imgmsg_to_cv2(msg, desired_encoding='passthrough')
self._latest_frame = frame
self._using_fallback = False
except Exception as exc:
self.get_logger().warning(f'[light_monitor] primary decode: {exc}')
def _on_fallback(self, msg: Image) -> None:
# Only use fallback if no primary frame recently arrived
if self._latest_frame is not None and not self._using_fallback:
return # primary is active
try:
frame = self._bridge.imgmsg_to_cv2(msg, desired_encoding='mono8')
self._latest_frame = frame
self._using_fallback = True
except Exception as exc:
self.get_logger().warning(f'[light_monitor] fallback decode: {exc}')
# ── Publish timer ─────────────────────────────────────────────────────────
def _tick(self) -> None:
if self._latest_frame is None:
return # no data yet
# Subsample for speed (analyse quarter-resolution)
frame = self._latest_frame
if frame.shape[0] > 120:
frame = frame[::4, ::4]
mode, intensity = self._analyser.update(frame)
msg_mode = String()
msg_mode.data = mode
self._mode_pub.publish(msg_mode)
msg_light = Float32()
msg_light.data = float(intensity)
self._light_pub.publish(msg_light)
if self._using_fallback:
self.get_logger().debug(
f'[light_monitor] IR fallback — mode={mode} intensity={intensity:.1f}'
)
def main(args=None):
rclpy.init(args=args)
node = LightMonitorNode()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,137 @@
"""
night_vision_controller_node.py Hardware adaptation on vision mode changes.
Subscribes to /saltybot/vision_mode and drives:
D435i IR emitter (via realsense2_camera parameter service)
IMX219 gain / exposure (via v4l2-ctl subprocess)
GPIO headlight brightness
Mode hardware mapping:
'day' emitter=auto, IMX219 AE, headlight off
'twilight' emitter=on, IMX219 boosted, headlight dim
'night' emitter=on, IMX219 max gain, headlight medium
'ir_only' emitter=on, IMX219 max gain, headlight full
Publishes:
/saltybot/visual_odom_status std_msgs/String (JSON reuses VO status topic)
Parameters:
headlight_pin int 33 (physical 40-pin board pin)
day_headlight float 0.0
twilight_headlight float 0.25
night_headlight float 0.50
ir_only_headlight float 1.0
"""
import json
import time
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from std_msgs.msg import String
from .camera_controller import CameraController
from .gpio_headlight import GPIOHeadlight
_RELIABLE_QOS = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE,
history=HistoryPolicy.KEEP_LAST,
depth=10,
)
class NightVisionControllerNode(Node):
def __init__(self):
super().__init__('night_vision_controller')
self.declare_parameter('headlight_pin', 33)
self.declare_parameter('day_headlight', 0.0)
self.declare_parameter('twilight_headlight', 0.25)
self.declare_parameter('night_headlight', 0.50)
self.declare_parameter('ir_only_headlight', 1.0)
pin = self.get_parameter('headlight_pin').value
self._brightness_map = {
'day': self.get_parameter('day_headlight').value,
'twilight': self.get_parameter('twilight_headlight').value,
'night': self.get_parameter('night_headlight').value,
'ir_only': self.get_parameter('ir_only_headlight').value,
}
self._cam_ctrl = CameraController(self)
self._headlight = GPIOHeadlight(pin=pin)
self._cur_mode: str | None = None
self._mode_change_t: float = 0.0
self.create_subscription(
String, '/saltybot/vision_mode', self._on_mode, _RELIABLE_QOS
)
self._status_pub = self.create_publisher(
String, '/saltybot/night_vision_status', 10
)
self.create_timer(5.0, self._status_tick)
self.get_logger().info(
f'night_vision_controller ready — headlight pin={pin}'
)
# ── Subscription ──────────────────────────────────────────────────────────
def _on_mode(self, msg: String) -> None:
mode = msg.data
if mode == self._cur_mode:
return
prev = self._cur_mode
self._cur_mode = mode
self._mode_change_t = time.monotonic()
# Apply camera profile (D435i emitter + IMX219 gain/exposure)
self._cam_ctrl.set_profile(mode)
# Apply headlight
brightness = self._brightness_map.get(mode, 0.0)
self._headlight.set_brightness(brightness)
self.get_logger().info(
f'[nv_ctrl] {prev}{mode} | '
f'headlight={brightness:.2f}'
)
# ── Status heartbeat ──────────────────────────────────────────────────────
def _status_tick(self) -> None:
status = {
'mode': self._cur_mode or 'unknown',
'headlight': round(self._headlight._brightness, 2),
'mode_age_s': round(time.monotonic() - self._mode_change_t, 1)
if self._cur_mode else None,
}
msg = String()
msg.data = json.dumps(status)
self._status_pub.publish(msg)
# ── Cleanup ───────────────────────────────────────────────────────────────
def destroy_node(self) -> None:
self._headlight.cleanup()
super().destroy_node()
def main(args=None):
rclpy.init(args=args)
node = NightVisionControllerNode()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/saltybot_night_vision
[install]
install_scripts=$base/lib/saltybot_night_vision

View File

@ -0,0 +1,34 @@
from setuptools import setup, find_packages
import os
from glob import glob
package_name = 'saltybot_night_vision'
setup(
name=package_name,
version='0.1.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
('share/' + package_name + '/launch',
glob('launch/*.launch.py')),
('share/' + package_name + '/config',
glob('config/*.yaml')),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='SaltyLab',
maintainer_email='robot@saltylab.local',
description='Night vision mode: IR emitter, headlight, ambient-light FSM, IR SLAM bridge',
license='MIT',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'light_monitor = saltybot_night_vision.light_monitor_node:main',
'night_vision_ctrl = saltybot_night_vision.night_vision_controller_node:main',
'ir_slam_bridge = saltybot_night_vision.ir_slam_bridge_node:main',
],
},
)

View File

@ -0,0 +1,152 @@
"""
test_night_vision.py Unit tests for LightAnalyser and GPIOHeadlight.
Runs without ROS2 / Jetson hardware (no rclpy, no GPIO).
"""
import sys
import os
import math
import numpy as np
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from saltybot_night_vision.light_analyser import LightAnalyser
from saltybot_night_vision.gpio_headlight import GPIOHeadlight
# ── LightAnalyser tests ───────────────────────────────────────────────────────
class TestLightAnalyser:
def _bright_frame(self, value: int = 180) -> np.ndarray:
return np.full((480, 640), value, dtype=np.uint8)
def _dark_frame(self, value: int = 5) -> np.ndarray:
return np.full((480, 640), value, dtype=np.uint8)
def test_initial_mode_default(self):
la = LightAnalyser()
assert la.mode == 'day'
def test_bright_frame_stays_day(self):
la = LightAnalyser()
for _ in range(10):
mode, _ = la.update(self._bright_frame(180))
assert mode == 'day'
def test_dark_frame_transitions_to_ir_only(self):
la = LightAnalyser()
for _ in range(10):
mode, _ = la.update(self._dark_frame(3))
assert mode == 'ir_only'
def test_medium_intensity_twilight(self):
la = LightAnalyser()
frame = np.full((480, 640), 75, dtype=np.uint8)
for _ in range(10):
mode, _ = la.update(frame)
assert mode == 'twilight'
def test_night_range(self):
la = LightAnalyser()
frame = np.full((480, 640), 25, dtype=np.uint8)
for _ in range(10):
mode, _ = la.update(frame)
assert mode == 'night'
def test_intensity_value_correct(self):
la = LightAnalyser()
frame = np.full((60, 80), 128, dtype=np.uint8)
_, intensity = la.update(frame)
assert intensity == pytest.approx(128.0, abs=0.5)
def test_rgb_frame_accepted(self):
la = LightAnalyser()
frame = np.full((480, 640, 3), 60, dtype=np.uint8)
mode, intensity = la.update(frame)
assert isinstance(mode, str)
assert 0.0 <= intensity <= 255.0
def test_hysteresis_prevents_oscillation(self):
"""Frame right at threshold should not cause mode flip on each tick."""
la = LightAnalyser()
# Start from day, push to just below twilight threshold
frame = np.full((480, 640), 100, dtype=np.uint8)
modes: list[str] = []
for _ in range(20):
mode, _ = la.update(frame)
modes.append(mode)
# Should not oscillate — all same mode
assert len(set(modes[-10:])) == 1
def test_recovery_to_day(self):
la = LightAnalyser(initial_mode='ir_only')
bright = np.full((480, 640), 200, dtype=np.uint8)
for _ in range(20):
mode, _ = la.update(bright)
assert mode == 'day'
# ── GPIOHeadlight tests (simulated — no hardware) ─────────────────────────────
class TestGPIOHeadlightSim:
"""
Tests that GPIOHeadlight correctly tracks brightness internally
even when running in simulation mode (no hardware GPIO).
"""
def test_default_brightness_zero(self):
hl = GPIOHeadlight()
assert hl._brightness == pytest.approx(0.0)
def test_set_brightness_clamps_high(self):
hl = GPIOHeadlight()
hl.set_brightness(2.5)
assert hl._brightness == pytest.approx(1.0)
def test_set_brightness_clamps_low(self):
hl = GPIOHeadlight()
hl.set_brightness(-0.5)
assert hl._brightness == pytest.approx(0.0)
def test_set_level_off(self):
hl = GPIOHeadlight()
hl.set_brightness(0.8)
hl.set_level('off')
assert hl._brightness == pytest.approx(0.0)
def test_set_level_full(self):
hl = GPIOHeadlight()
hl.set_level('full')
assert hl._brightness == pytest.approx(1.0)
def test_set_level_dim(self):
hl = GPIOHeadlight()
hl.set_level('dim')
assert hl._brightness == pytest.approx(0.25)
def test_off_method(self):
hl = GPIOHeadlight()
hl.set_brightness(0.75)
hl.off()
assert hl._brightness == pytest.approx(0.0)
def test_cleanup_sets_zero(self):
hl = GPIOHeadlight()
hl.set_brightness(0.9)
hl.cleanup()
assert hl._brightness == pytest.approx(0.0)
def test_no_op_on_same_brightness(self):
"""Repeated set of same value should not raise."""
hl = GPIOHeadlight()
hl.set_brightness(0.5)
hl.set_brightness(0.5) # should be no-op, no error
assert hl._brightness == pytest.approx(0.5)
if __name__ == '__main__':
pytest.main([__file__, '-v'])