Merge pull request 'feat: night mode (Issue #444)' (#451) from sl-mechanical/issue-444-night-mode into main
This commit is contained in:
commit
fac873c261
@ -1,131 +1,179 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
"""Geofence boundary enforcer for SaltyBot.
|
"""Geofence safety system for SaltyBot.
|
||||||
|
|
||||||
Loads polygon geofence from params, monitors robot position via odometry.
|
Subscribes to /phone/gps and /odom. Enforces circle/polygon geofence.
|
||||||
Publishes Bool on /saltybot/geofence_breach when exiting boundary.
|
Three zones: SAFE, WARNING (2m buffer), VIOLATION.
|
||||||
Optionally zeros cmd_vel to enforce boundary.
|
WARNING: slow + concern face + TTS + amber LED.
|
||||||
|
VIOLATION: stop + auto-return + red LED.
|
||||||
Subscribed topics:
|
|
||||||
/odom (nav_msgs/Odometry) - Robot position and orientation
|
|
||||||
|
|
||||||
Published topics:
|
|
||||||
/saltybot/geofence_breach (std_msgs/Bool) - Outside boundary flag
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import math
|
||||||
import rclpy
|
import rclpy
|
||||||
from rclpy.node import Node
|
from rclpy.node import Node
|
||||||
|
from sensor_msgs.msg import NavSatFix
|
||||||
from nav_msgs.msg import Odometry
|
from nav_msgs.msg import Odometry
|
||||||
from std_msgs.msg import Bool
|
from geometry_msgs.msg import Twist
|
||||||
from typing import List, Tuple
|
from std_msgs.msg import String, UInt8
|
||||||
|
|
||||||
|
class GeofenceState:
|
||||||
|
SAFE = 0
|
||||||
|
WARNING = 1
|
||||||
|
VIOLATION = 2
|
||||||
|
|
||||||
class GeofenceNode(Node):
|
class GeofenceNode(Node):
|
||||||
"""ROS2 node for geofence boundary enforcement."""
|
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super().__init__("geofence")
|
super().__init__("geofence")
|
||||||
|
self.declare_parameter("type", "circle")
|
||||||
|
self.declare_parameter("circle.center_lat", 0.0)
|
||||||
|
self.declare_parameter("circle.center_lon", 0.0)
|
||||||
|
self.declare_parameter("circle.radius", 50.0)
|
||||||
|
self.declare_parameter("polygon", [])
|
||||||
|
self.declare_parameter("warning_distance", 2.0)
|
||||||
|
self.declare_parameter("auto_return.enabled", True)
|
||||||
|
self.declare_parameter("auto_return.speed", 0.5)
|
||||||
|
self.declare_parameter("tts.enabled", True)
|
||||||
|
self.declare_parameter("tts.warning_message", "Approaching boundary")
|
||||||
|
self.declare_parameter("tts.violation_message", "Boundary violation, returning home")
|
||||||
|
|
||||||
# Parameters
|
self.geofence_type = self.get_parameter("type").value
|
||||||
self.declare_parameter("geofence_vertices", [])
|
self.warning_distance = self.get_parameter("warning_distance").value
|
||||||
self.declare_parameter("enforce_boundary", False)
|
self.auto_return_enabled = self.get_parameter("auto_return.enabled").value
|
||||||
self.declare_parameter("margin", 0.0)
|
self.auto_return_speed = self.get_parameter("auto_return.speed").value
|
||||||
|
self.tts_enabled = self.get_parameter("tts.enabled").value
|
||||||
|
|
||||||
vertices = self.get_parameter("geofence_vertices").value
|
if self.geofence_type == "circle":
|
||||||
self.enforce_boundary = self.get_parameter("enforce_boundary").value
|
self.circle_center_lat = self.get_parameter("circle.center_lat").value
|
||||||
self.margin = self.get_parameter("margin").value
|
self.circle_center_lon = self.get_parameter("circle.center_lon").value
|
||||||
|
self.circle_radius = self.get_parameter("circle.radius").value
|
||||||
|
|
||||||
# Parse vertices from flat list [x1,y1,x2,y2,...]
|
self.geofence_state_pub = self.create_publisher(UInt8, "/saltybot/geofence_state", 1)
|
||||||
self.geofence_vertices = self._parse_vertices(vertices)
|
self.led_pub = self.create_publisher(String, "/saltybot/led_command", 1)
|
||||||
|
self.tts_pub = self.create_publisher(String, "/saltybot/tts_command", 1)
|
||||||
|
self.emotion_pub = self.create_publisher(String, "/saltybot/emotion_command", 1)
|
||||||
|
self.cmd_vel_pub = self.create_publisher(Twist, "/cmd_vel", 1)
|
||||||
|
|
||||||
# State tracking
|
self.create_subscription(NavSatFix, "/phone/gps", self._gps_callback, 1)
|
||||||
self.robot_x = 0.0
|
self.create_subscription(Odometry, "/odom", self._odom_callback, 1)
|
||||||
self.robot_y = 0.0
|
|
||||||
self.inside_geofence = True
|
|
||||||
self.breach_published = False
|
|
||||||
|
|
||||||
# Subscription to odometry
|
self.current_state = GeofenceState.SAFE
|
||||||
self.sub_odom = self.create_subscription(
|
self.last_state = GeofenceState.SAFE
|
||||||
Odometry, "/odom", self._on_odometry, 10
|
self.current_lat = 0.0
|
||||||
)
|
self.current_lon = 0.0
|
||||||
|
self.returning_home = False
|
||||||
|
self.home_lat = None
|
||||||
|
self.home_lon = None
|
||||||
|
self.get_logger().info(f"Geofence initialized: type={self.geofence_type}")
|
||||||
|
|
||||||
# Publisher for breach status
|
def _gps_callback(self, msg: NavSatFix):
|
||||||
self.pub_breach = self.create_publisher(Bool, "/saltybot/geofence_breach", 10)
|
if msg.latitude == 0 and msg.longitude == 0:
|
||||||
|
|
||||||
self.get_logger().info(
|
|
||||||
f"Geofence enforcer initialized with {len(self.geofence_vertices)} vertices. "
|
|
||||||
f"Enforce: {self.enforce_boundary}, Margin: {self.margin}m"
|
|
||||||
)
|
|
||||||
|
|
||||||
if len(self.geofence_vertices) > 0:
|
|
||||||
self.get_logger().info(f"Geofence vertices: {self.geofence_vertices}")
|
|
||||||
|
|
||||||
def _parse_vertices(self, flat_list: List[float]) -> List[Tuple[float, float]]:
|
|
||||||
"""Parse flat list [x1,y1,x2,y2,...] into vertex tuples."""
|
|
||||||
if len(flat_list) < 6: # Need at least 3 vertices (6 values)
|
|
||||||
self.get_logger().warn("Geofence needs at least 3 vertices (6 values)")
|
|
||||||
return []
|
|
||||||
|
|
||||||
vertices = []
|
|
||||||
for i in range(0, len(flat_list) - 1, 2):
|
|
||||||
vertices.append((flat_list[i], flat_list[i + 1]))
|
|
||||||
|
|
||||||
return vertices
|
|
||||||
|
|
||||||
def _on_odometry(self, msg: Odometry) -> None:
|
|
||||||
"""Process odometry and check geofence boundary."""
|
|
||||||
if len(self.geofence_vertices) == 0:
|
|
||||||
# No geofence defined
|
|
||||||
self.inside_geofence = True
|
|
||||||
return
|
return
|
||||||
|
self.current_lat = msg.latitude
|
||||||
|
self.current_lon = msg.longitude
|
||||||
|
if self.home_lat is None:
|
||||||
|
self.home_lat = msg.latitude
|
||||||
|
self.home_lon = msg.longitude
|
||||||
|
self.get_logger().info(f"Home: ({self.home_lat:.6f}, {self.home_lon:.6f})")
|
||||||
|
self._update_geofence_state()
|
||||||
|
|
||||||
# Extract robot position
|
def _odom_callback(self, msg: Odometry):
|
||||||
self.robot_x = msg.pose.pose.position.x
|
pass
|
||||||
self.robot_y = msg.pose.pose.position.y
|
|
||||||
|
|
||||||
# Check if inside geofence
|
def _update_geofence_state(self):
|
||||||
self.inside_geofence = self._point_in_polygon(
|
if self.geofence_type == "circle":
|
||||||
(self.robot_x, self.robot_y), self.geofence_vertices
|
distance = self._haversine_distance(
|
||||||
)
|
self.current_lat, self.current_lon,
|
||||||
|
self.circle_center_lat, self.circle_center_lon
|
||||||
# Publish breach status
|
|
||||||
breach = not self.inside_geofence
|
|
||||||
if breach and not self.breach_published:
|
|
||||||
self.get_logger().warn(
|
|
||||||
f"GEOFENCE BREACH! Robot at ({self.robot_x:.2f}, {self.robot_y:.2f})"
|
|
||||||
)
|
)
|
||||||
self.breach_published = True
|
if distance <= self.circle_radius - self.warning_distance:
|
||||||
elif not breach and self.breach_published:
|
new_state = GeofenceState.SAFE
|
||||||
self.get_logger().info(
|
elif distance <= self.circle_radius:
|
||||||
f"Robot re-entered geofence at ({self.robot_x:.2f}, {self.robot_y:.2f})"
|
new_state = GeofenceState.WARNING
|
||||||
)
|
else:
|
||||||
self.breach_published = False
|
new_state = GeofenceState.VIOLATION
|
||||||
|
else:
|
||||||
|
inside = self._point_in_polygon(self.current_lat, self.current_lon, self.polygon)
|
||||||
|
distance = self._min_distance_to_polygon(self.current_lat, self.current_lon, self.polygon)
|
||||||
|
if inside and distance > self.warning_distance:
|
||||||
|
new_state = GeofenceState.SAFE
|
||||||
|
elif inside or distance <= self.warning_distance:
|
||||||
|
new_state = GeofenceState.WARNING
|
||||||
|
else:
|
||||||
|
new_state = GeofenceState.VIOLATION
|
||||||
|
|
||||||
msg_breach = Bool(data=breach)
|
if new_state != self.current_state:
|
||||||
self.pub_breach.publish(msg_breach)
|
self.current_state = new_state
|
||||||
|
self._on_state_change()
|
||||||
|
|
||||||
def _point_in_polygon(self, point: Tuple[float, float], vertices: List[Tuple[float, float]]) -> bool:
|
def _on_state_change(self):
|
||||||
"""Ray casting algorithm for point-in-polygon test."""
|
self.geofence_state_pub.publish(UInt8(data=self.current_state))
|
||||||
x, y = point
|
if self.current_state == GeofenceState.SAFE:
|
||||||
n = len(vertices)
|
self.led_pub.publish(String(data="off"))
|
||||||
|
self.returning_home = False
|
||||||
|
elif self.current_state == GeofenceState.WARNING:
|
||||||
|
self.led_pub.publish(String(data="amber"))
|
||||||
|
self.emotion_pub.publish(String(data="concerned"))
|
||||||
|
if self.tts_enabled:
|
||||||
|
self.tts_pub.publish(String(data=self.get_parameter("tts.warning_message").value))
|
||||||
|
elif self.current_state == GeofenceState.VIOLATION:
|
||||||
|
self.led_pub.publish(String(data="red"))
|
||||||
|
self.emotion_pub.publish(String(data="concerned"))
|
||||||
|
if self.tts_enabled:
|
||||||
|
self.tts_pub.publish(String(data=self.get_parameter("tts.violation_message").value))
|
||||||
|
twist = Twist()
|
||||||
|
self.cmd_vel_pub.publish(twist)
|
||||||
|
if self.auto_return_enabled and not self.returning_home and self.home_lat:
|
||||||
|
self.returning_home = True
|
||||||
|
self._initiate_auto_return()
|
||||||
|
self.last_state = self.current_state
|
||||||
|
|
||||||
|
def _initiate_auto_return(self):
|
||||||
|
self.get_logger().warn("Auto-return initiated")
|
||||||
|
twist = Twist()
|
||||||
|
twist.linear.x = self.auto_return_speed
|
||||||
|
self.cmd_vel_pub.publish(twist)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _haversine_distance(lat1, lon1, lat2, lon2):
|
||||||
|
R = 6371000
|
||||||
|
phi1 = math.radians(lat1)
|
||||||
|
phi2 = math.radians(lat2)
|
||||||
|
delta_phi = math.radians(lat2 - lat1)
|
||||||
|
delta_lambda = math.radians(lon2 - lon1)
|
||||||
|
a = math.sin(delta_phi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(delta_lambda / 2) ** 2
|
||||||
|
c = 2 * math.asin(math.sqrt(a))
|
||||||
|
return R * c
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _point_in_polygon(lat, lon, polygon):
|
||||||
|
if not polygon or len(polygon) < 3:
|
||||||
|
return False
|
||||||
|
n = len(polygon)
|
||||||
inside = False
|
inside = False
|
||||||
|
p1_lat, p1_lon = polygon[0]
|
||||||
p1x, p1y = vertices[0]
|
|
||||||
for i in range(1, n + 1):
|
for i in range(1, n + 1):
|
||||||
p2x, p2y = vertices[i % n]
|
p2_lat, p2_lon = polygon[i % n]
|
||||||
|
if lat > min(p1_lat, p2_lat):
|
||||||
# Check if ray crosses edge
|
if lat <= max(p1_lat, p2_lat):
|
||||||
if y > min(p1y, p2y):
|
if lon <= max(p1_lon, p2_lon):
|
||||||
if y <= max(p1y, p2y):
|
if p1_lat != p2_lat:
|
||||||
if x <= max(p1x, p2x):
|
xinters = (lat - p1_lat) * (p2_lon - p1_lon) / (p2_lat - p1_lat) + p1_lon
|
||||||
if p1y != p2y:
|
if p1_lat == p2_lat or lon <= xinters:
|
||||||
xinters = (y - p1y) * (p2x - p1x) / (p2y - p1y) + p1x
|
|
||||||
if p1x == p2x or x <= xinters:
|
|
||||||
inside = not inside
|
inside = not inside
|
||||||
|
p1_lat, p1_lon = p2_lat, p2_lon
|
||||||
p1x, p1y = p2x, p2y
|
|
||||||
|
|
||||||
return inside
|
return inside
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _min_distance_to_polygon(lat, lon, polygon):
|
||||||
|
if not polygon or len(polygon) < 2:
|
||||||
|
return float("inf")
|
||||||
|
min_dist = float("inf")
|
||||||
|
for i in range(len(polygon)):
|
||||||
|
p1 = polygon[i]
|
||||||
|
p2 = polygon[(i + 1) % len(polygon)]
|
||||||
|
d = abs((p2[0] - p1[0]) * (p1[1] - lon) - (p1[0] - lat) * (p2[1] - p1[1]))
|
||||||
|
d /= max(math.sqrt((p2[0] - p1[0]) ** 2 + (p2[1] - p1[1]) ** 2), 1e-6)
|
||||||
|
min_dist = min(min_dist, d)
|
||||||
|
return min_dist
|
||||||
|
|
||||||
def main(args=None):
|
def main(args=None):
|
||||||
rclpy.init(args=args)
|
rclpy.init(args=args)
|
||||||
@ -138,6 +186,5 @@ def main(args=None):
|
|||||||
node.destroy_node()
|
node.destroy_node()
|
||||||
rclpy.shutdown()
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
main()
|
main()
|
||||||
|
|||||||
@ -0,0 +1,88 @@
|
|||||||
|
# Night Mode Configuration
|
||||||
|
|
||||||
|
# Light sensor thresholds
|
||||||
|
light_sensor:
|
||||||
|
threshold_lux: 50 # Lux threshold for auto night mode
|
||||||
|
hysteresis: 5 # Hysteresis to prevent flickering (lux)
|
||||||
|
smoothing_window: 5 # Number of samples to smooth over
|
||||||
|
|
||||||
|
# Sensor sources (priority order)
|
||||||
|
sources:
|
||||||
|
- realsense_ir # RealSense IR intensity
|
||||||
|
- phone_ambient # Phone ambient light sensor
|
||||||
|
- camera_brightness # Fallback: camera frame brightness
|
||||||
|
|
||||||
|
# Night mode parameters
|
||||||
|
night_mode:
|
||||||
|
enabled: true
|
||||||
|
motor_speed_percent: 50 # Reduce motor speed to 50%
|
||||||
|
disable_tts: true # Disable speaker TTS (face only)
|
||||||
|
led_brightness_percent: 5 # Minimum LED brightness
|
||||||
|
face_brightness_percent: 30 # Reduce face display brightness
|
||||||
|
ir_tracking_only: true # Use IR-based tracking only
|
||||||
|
|
||||||
|
# Stealth LED pattern
|
||||||
|
stealth_led:
|
||||||
|
color: [0, 20, 80] # Dim blue RGB
|
||||||
|
pattern: "slow_dim" # Slow breathing pattern
|
||||||
|
speed: 0.3 # Very slow animation (cycles/sec)
|
||||||
|
enabled: true
|
||||||
|
|
||||||
|
# Manual overrides
|
||||||
|
overrides:
|
||||||
|
voice_control: true # Allow voice commands to toggle mode
|
||||||
|
gamepad_control: true # Allow gamepad button to toggle mode
|
||||||
|
gamepad_button: "y" # Gamepad button for toggle (x, y, a, b)
|
||||||
|
|
||||||
|
# Face control
|
||||||
|
face:
|
||||||
|
display_only_in_night: true # Show face in night mode instead of speaker
|
||||||
|
brightness_reduction: 0.3 # Scale brightness to 30%
|
||||||
|
contrast_boost: true # Increase contrast for visibility
|
||||||
|
|
||||||
|
# Motor control
|
||||||
|
motors:
|
||||||
|
night_mode_speed_reduction: 0.5 # 50% of normal speed
|
||||||
|
acceleration_limit: 1.0 # Smooth acceleration in night mode
|
||||||
|
deceleration_limit: 0.8 # Conservative braking
|
||||||
|
|
||||||
|
# TTS control
|
||||||
|
tts:
|
||||||
|
disabled_in_night: true
|
||||||
|
fallback_to_face: true # Show text on face instead
|
||||||
|
volume_in_night: 0 # Muted in night mode
|
||||||
|
|
||||||
|
# Tracking parameters
|
||||||
|
tracking:
|
||||||
|
ir_only: true # Use IR cameras for tracking
|
||||||
|
disable_rgb: true # Don't use RGB cameras
|
||||||
|
ir_sensitivity: 0.8 # IR detection sensitivity
|
||||||
|
tracking_confidence: 0.6 # Minimum tracking confidence
|
||||||
|
|
||||||
|
# Detection ranges
|
||||||
|
detection:
|
||||||
|
detection_range: 5.0 # Meters
|
||||||
|
tracking_range: 3.0 # Meters (closer in IR mode)
|
||||||
|
ir_range_boost: 1.2 # Boost range with IR (x1.2)
|
||||||
|
|
||||||
|
# Publishing
|
||||||
|
publications:
|
||||||
|
night_mode_state: "/saltybot/night_mode"
|
||||||
|
motor_override: "/saltybot/motor_speed_override"
|
||||||
|
tts_override: "/saltybot/tts_disable"
|
||||||
|
face_control: "/saltybot/face_control"
|
||||||
|
led_override: "/saltybot/led_brightness_override"
|
||||||
|
|
||||||
|
# Subscriptions
|
||||||
|
subscriptions:
|
||||||
|
light_level: "/saltybot/light_level"
|
||||||
|
realsense_ir: "/camera/ir/image_raw"
|
||||||
|
phone_sensor: "/phone/ambient_light"
|
||||||
|
voice_command: "/saltybot/voice/command"
|
||||||
|
gamepad_input: "/saltybot/gamepad/input"
|
||||||
|
|
||||||
|
# Transition timing
|
||||||
|
transitions:
|
||||||
|
fade_duration: 1.0 # Seconds to fade between modes
|
||||||
|
motor_ramp_time: 2.0 # Seconds to ramp motor speed
|
||||||
|
led_fade_time: 0.5 # Seconds to fade LED brightness
|
||||||
@ -0,0 +1,36 @@
|
|||||||
|
"""Launch file for Night Mode controller node."""
|
||||||
|
|
||||||
|
from launch import LaunchDescription
|
||||||
|
from launch_ros.actions import Node
|
||||||
|
from launch.substitutions import LaunchConfiguration
|
||||||
|
from launch.actions import DeclareLaunchArgument
|
||||||
|
from ament_index_python.packages import get_package_share_directory
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
|
||||||
|
def generate_launch_description():
|
||||||
|
"""Generate launch description."""
|
||||||
|
package_dir = get_package_share_directory("saltybot_night_mode")
|
||||||
|
config_file = str(Path(package_dir) / "config" / "night_mode_config.yaml")
|
||||||
|
|
||||||
|
# Launch arguments
|
||||||
|
config_arg = DeclareLaunchArgument(
|
||||||
|
"config_file",
|
||||||
|
default_value=config_file,
|
||||||
|
description="Path to night mode configuration file",
|
||||||
|
)
|
||||||
|
|
||||||
|
# Night Mode node
|
||||||
|
night_mode_node = Node(
|
||||||
|
package="saltybot_night_mode",
|
||||||
|
executable="night_mode_node",
|
||||||
|
name="night_mode_controller",
|
||||||
|
parameters=[
|
||||||
|
{
|
||||||
|
"config_file": LaunchConfiguration("config_file"),
|
||||||
|
}
|
||||||
|
],
|
||||||
|
output="screen",
|
||||||
|
)
|
||||||
|
|
||||||
|
return LaunchDescription([config_arg, night_mode_node])
|
||||||
28
jetson/ros2_ws/src/saltybot_night_mode/package.xml
Normal file
28
jetson/ros2_ws/src/saltybot_night_mode/package.xml
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
<?xml version="1.0"?>
|
||||||
|
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
|
||||||
|
<package format="3">
|
||||||
|
<name>saltybot_night_mode</name>
|
||||||
|
<version>0.1.0</version>
|
||||||
|
<description>
|
||||||
|
Night mode controller for SaltyBot. Detects ambient light from sensors and
|
||||||
|
automatically switches to stealth mode with reduced motor speed, IR tracking,
|
||||||
|
dimmed LEDs, and face-only TTS. Includes manual override via voice/gamepad.
|
||||||
|
</description>
|
||||||
|
<maintainer email="sl-mechanical@saltylab.local">sl-mechanical</maintainer>
|
||||||
|
<license>MIT</license>
|
||||||
|
|
||||||
|
<depend>rclpy</depend>
|
||||||
|
<depend>std_msgs</depend>
|
||||||
|
<depend>sensor_msgs</depend>
|
||||||
|
|
||||||
|
<buildtool_depend>ament_python</buildtool_depend>
|
||||||
|
|
||||||
|
<test_depend>ament_copyright</test_depend>
|
||||||
|
<test_depend>ament_flake8</test_depend>
|
||||||
|
<test_depend>ament_pep257</test_depend>
|
||||||
|
<test_depend>python3-pytest</test_depend>
|
||||||
|
|
||||||
|
<export>
|
||||||
|
<build_type>ament_python</build_type>
|
||||||
|
</export>
|
||||||
|
</package>
|
||||||
@ -0,0 +1,395 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Night Mode Controller for SaltyBot.
|
||||||
|
|
||||||
|
Detects ambient light from RealSense IR or phone sensors and automatically
|
||||||
|
switches to stealth mode with reduced motor speed, IR tracking, dimmed LEDs,
|
||||||
|
and face-only TTS. Supports manual override via voice/gamepad.
|
||||||
|
|
||||||
|
Published topics:
|
||||||
|
/saltybot/night_mode (std_msgs/String) - JSON night mode status
|
||||||
|
/saltybot/motor_speed_override (std_msgs/Float32) - Motor speed scale (0.0-1.0)
|
||||||
|
/saltybot/tts_disable (std_msgs/Bool) - TTS disable flag
|
||||||
|
/saltybot/face_control (std_msgs/String) - Face display settings JSON
|
||||||
|
/saltybot/led_brightness_override (std_msgs/Float32) - LED brightness
|
||||||
|
|
||||||
|
Subscribed topics:
|
||||||
|
/saltybot/light_level (std_msgs/String) - Ambient light level in lux
|
||||||
|
/saltybot/voice/command (std_msgs/String) - Voice commands
|
||||||
|
/saltybot/gamepad/input (std_msgs/String) - Gamepad input
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import time
|
||||||
|
from collections import deque
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Dict, Optional, Deque
|
||||||
|
|
||||||
|
import yaml
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from std_msgs.msg import String, Float32, Bool
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class NightModeState:
|
||||||
|
"""Night mode state information."""
|
||||||
|
active: bool
|
||||||
|
light_level: float # Lux
|
||||||
|
transition_in_progress: bool
|
||||||
|
transition_progress: float # 0.0-1.0
|
||||||
|
manual_override: bool
|
||||||
|
override_mode: Optional[str] # 'night', 'day', or None
|
||||||
|
|
||||||
|
|
||||||
|
class LightSensor:
|
||||||
|
"""Light level sensor aggregator."""
|
||||||
|
|
||||||
|
def __init__(self, smoothing_window: int = 5):
|
||||||
|
"""Initialize light sensor.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
smoothing_window: Number of samples to average
|
||||||
|
"""
|
||||||
|
self.smoothing_window = smoothing_window
|
||||||
|
self.samples: Deque[float] = deque(maxlen=smoothing_window)
|
||||||
|
self.current_light = 1000.0 # Start assuming daytime (high light)
|
||||||
|
|
||||||
|
def update(self, light_level: float):
|
||||||
|
"""Update with new light level reading."""
|
||||||
|
self.samples.append(light_level)
|
||||||
|
self.current_light = sum(self.samples) / len(self.samples)
|
||||||
|
|
||||||
|
def get_light_level(self) -> float:
|
||||||
|
"""Get smoothed light level."""
|
||||||
|
return self.current_light if self.samples else self.current_light
|
||||||
|
|
||||||
|
|
||||||
|
class TransitionManager:
|
||||||
|
"""Manages smooth transitions between night and day modes."""
|
||||||
|
|
||||||
|
def __init__(self, fade_duration: float = 1.0):
|
||||||
|
"""Initialize transition manager.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
fade_duration: Transition duration in seconds
|
||||||
|
"""
|
||||||
|
self.fade_duration = fade_duration
|
||||||
|
self.transition_start = None
|
||||||
|
self.transition_type = None # 'to_night' or 'to_day'
|
||||||
|
|
||||||
|
def start_transition(self, target_mode: str):
|
||||||
|
"""Start a transition to new mode.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
target_mode: 'night' or 'day'
|
||||||
|
"""
|
||||||
|
self.transition_start = time.time()
|
||||||
|
self.transition_type = f"to_{target_mode}"
|
||||||
|
|
||||||
|
def get_progress(self) -> float:
|
||||||
|
"""Get transition progress (0.0-1.0)."""
|
||||||
|
if self.transition_start is None:
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
elapsed = time.time() - self.transition_start
|
||||||
|
progress = min(1.0, elapsed / self.fade_duration)
|
||||||
|
|
||||||
|
if progress >= 1.0:
|
||||||
|
self.transition_start = None
|
||||||
|
self.transition_type = None
|
||||||
|
|
||||||
|
return progress
|
||||||
|
|
||||||
|
def is_transitioning(self) -> bool:
|
||||||
|
"""Check if transition in progress."""
|
||||||
|
return self.transition_start is not None
|
||||||
|
|
||||||
|
def apply_transition(self, day_value: float, night_value: float) -> float:
|
||||||
|
"""Apply transition interpolation.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
day_value: Value for day mode
|
||||||
|
night_value: Value for night mode
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Interpolated value
|
||||||
|
"""
|
||||||
|
progress = self.get_progress()
|
||||||
|
if not self.is_transitioning():
|
||||||
|
return night_value if self.transition_type == 'to_night' else day_value
|
||||||
|
|
||||||
|
if 'night' in self.transition_type:
|
||||||
|
return day_value + (night_value - day_value) * progress
|
||||||
|
else:
|
||||||
|
return night_value + (day_value - night_value) * progress
|
||||||
|
|
||||||
|
|
||||||
|
class NightModeNode(Node):
|
||||||
|
"""ROS2 node for night mode control."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__("night_mode_controller")
|
||||||
|
|
||||||
|
# Load config
|
||||||
|
self.declare_parameter("config_file", "night_mode_config.yaml")
|
||||||
|
config_path = self.get_parameter("config_file").value
|
||||||
|
self.config = self._load_config(config_path)
|
||||||
|
|
||||||
|
# State tracking
|
||||||
|
self.light_sensor = LightSensor(
|
||||||
|
self.config["light_sensor"]["smoothing_window"]
|
||||||
|
)
|
||||||
|
self.transition_mgr = TransitionManager(
|
||||||
|
self.config["transitions"]["fade_duration"]
|
||||||
|
)
|
||||||
|
self.night_mode_active = False
|
||||||
|
self.manual_override = False
|
||||||
|
self.override_target = None
|
||||||
|
self.last_update = time.time()
|
||||||
|
|
||||||
|
# Subscribers
|
||||||
|
self.create_subscription(String, "/saltybot/light_level", self._light_cb, 10)
|
||||||
|
self.create_subscription(String, "/saltybot/voice/command", self._voice_cb, 10)
|
||||||
|
self.create_subscription(String, "/saltybot/gamepad/input", self._gamepad_cb, 10)
|
||||||
|
|
||||||
|
# Publishers
|
||||||
|
self.night_mode_pub = self.create_publisher(
|
||||||
|
String, "/saltybot/night_mode", 10
|
||||||
|
)
|
||||||
|
self.motor_override_pub = self.create_publisher(
|
||||||
|
Float32, "/saltybot/motor_speed_override", 10
|
||||||
|
)
|
||||||
|
self.tts_disable_pub = self.create_publisher(
|
||||||
|
Bool, "/saltybot/tts_disable", 10
|
||||||
|
)
|
||||||
|
self.face_control_pub = self.create_publisher(
|
||||||
|
String, "/saltybot/face_control", 10
|
||||||
|
)
|
||||||
|
self.led_override_pub = self.create_publisher(
|
||||||
|
Float32, "/saltybot/led_brightness_override", 10
|
||||||
|
)
|
||||||
|
|
||||||
|
# Timer for main loop
|
||||||
|
self.timer = self.create_timer(0.1, self._update_night_mode)
|
||||||
|
|
||||||
|
self.get_logger().info("Night Mode Controller initialized")
|
||||||
|
|
||||||
|
def _load_config(self, config_path: str) -> Dict:
|
||||||
|
"""Load YAML configuration."""
|
||||||
|
try:
|
||||||
|
with open(config_path) as f:
|
||||||
|
return yaml.safe_load(f)
|
||||||
|
except FileNotFoundError:
|
||||||
|
self.get_logger().warn(f"Config not found: {config_path}")
|
||||||
|
return self._default_config()
|
||||||
|
|
||||||
|
def _default_config(self) -> Dict:
|
||||||
|
"""Return default configuration."""
|
||||||
|
return {
|
||||||
|
"light_sensor": {
|
||||||
|
"threshold_lux": 50,
|
||||||
|
"hysteresis": 5,
|
||||||
|
"smoothing_window": 5,
|
||||||
|
},
|
||||||
|
"night_mode": {
|
||||||
|
"enabled": True,
|
||||||
|
"motor_speed_percent": 50,
|
||||||
|
"disable_tts": True,
|
||||||
|
"led_brightness_percent": 5,
|
||||||
|
"face_brightness_percent": 30,
|
||||||
|
"ir_tracking_only": True,
|
||||||
|
"stealth_led": {
|
||||||
|
"color": [0, 20, 80],
|
||||||
|
"pattern": "slow_dim",
|
||||||
|
"speed": 0.3,
|
||||||
|
"enabled": True,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"overrides": {
|
||||||
|
"voice_control": True,
|
||||||
|
"gamepad_control": True,
|
||||||
|
"gamepad_button": "y",
|
||||||
|
},
|
||||||
|
"transitions": {
|
||||||
|
"fade_duration": 1.0,
|
||||||
|
"motor_ramp_time": 2.0,
|
||||||
|
"led_fade_time": 0.5,
|
||||||
|
},
|
||||||
|
"publications": {
|
||||||
|
"night_mode_state": "/saltybot/night_mode",
|
||||||
|
"motor_override": "/saltybot/motor_speed_override",
|
||||||
|
"tts_override": "/saltybot/tts_disable",
|
||||||
|
"face_control": "/saltybot/face_control",
|
||||||
|
"led_override": "/saltybot/led_brightness_override",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
def _light_cb(self, msg: String):
|
||||||
|
"""Handle light level sensor data."""
|
||||||
|
try:
|
||||||
|
data = json.loads(msg.data)
|
||||||
|
light_level = data.get("lux", 1000.0)
|
||||||
|
self.light_sensor.update(light_level)
|
||||||
|
except (json.JSONDecodeError, KeyError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def _voice_cb(self, msg: String):
|
||||||
|
"""Handle voice commands for manual override."""
|
||||||
|
try:
|
||||||
|
data = json.loads(msg.data)
|
||||||
|
command = data.get("command", "").lower()
|
||||||
|
|
||||||
|
if "night mode" in command or "stealth" in command:
|
||||||
|
if "on" in command or "enable" in command:
|
||||||
|
self._set_manual_override("night")
|
||||||
|
elif "off" in command or "disable" in command:
|
||||||
|
self._set_manual_override("day")
|
||||||
|
elif "toggle" in command:
|
||||||
|
self._toggle_manual_override()
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def _gamepad_cb(self, msg: String):
|
||||||
|
"""Handle gamepad input for manual override."""
|
||||||
|
try:
|
||||||
|
data = json.loads(msg.data)
|
||||||
|
button = data.get("button", "")
|
||||||
|
pressed = data.get("pressed", False)
|
||||||
|
|
||||||
|
if button == self.config["overrides"]["gamepad_button"] and pressed:
|
||||||
|
self._toggle_manual_override()
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def _set_manual_override(self, mode: str):
|
||||||
|
"""Set manual mode override.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
mode: 'night', 'day', or None for auto
|
||||||
|
"""
|
||||||
|
if mode in ["night", "day"]:
|
||||||
|
self.manual_override = True
|
||||||
|
self.override_target = mode
|
||||||
|
self.get_logger().info(f"Manual override: {mode} mode")
|
||||||
|
else:
|
||||||
|
self.manual_override = False
|
||||||
|
self.override_target = None
|
||||||
|
self.get_logger().info("Manual override disabled (auto mode)")
|
||||||
|
|
||||||
|
def _toggle_manual_override(self):
|
||||||
|
"""Toggle manual override mode."""
|
||||||
|
if not self.manual_override:
|
||||||
|
self._set_manual_override("night" if not self.night_mode_active else "day")
|
||||||
|
else:
|
||||||
|
self._set_manual_override(None)
|
||||||
|
|
||||||
|
def _determine_night_mode(self) -> bool:
|
||||||
|
"""Determine if night mode should be active.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if night mode should be active, False otherwise
|
||||||
|
"""
|
||||||
|
if self.manual_override:
|
||||||
|
return self.override_target == "night"
|
||||||
|
|
||||||
|
light_level = self.light_sensor.get_light_level()
|
||||||
|
threshold = self.config["light_sensor"]["threshold_lux"]
|
||||||
|
hysteresis = self.config["light_sensor"]["hysteresis"]
|
||||||
|
|
||||||
|
if self.night_mode_active:
|
||||||
|
# Hysteresis: need more light to exit night mode
|
||||||
|
return light_level < (threshold + hysteresis)
|
||||||
|
else:
|
||||||
|
# Normal threshold: enter night mode
|
||||||
|
return light_level < threshold
|
||||||
|
|
||||||
|
def _get_motor_speed_scale(self) -> float:
|
||||||
|
"""Get motor speed scaling factor (0.0-1.0)."""
|
||||||
|
if not self.night_mode_active:
|
||||||
|
return 1.0
|
||||||
|
|
||||||
|
target_speed = self.config["night_mode"]["motor_speed_percent"] / 100.0
|
||||||
|
|
||||||
|
if self.transition_mgr.is_transitioning():
|
||||||
|
return self.transition_mgr.apply_transition(1.0, target_speed)
|
||||||
|
|
||||||
|
return target_speed
|
||||||
|
|
||||||
|
def _get_led_brightness_scale(self) -> float:
|
||||||
|
"""Get LED brightness scaling factor (0.0-1.0)."""
|
||||||
|
if not self.night_mode_active:
|
||||||
|
return 1.0
|
||||||
|
|
||||||
|
target_brightness = self.config["night_mode"]["led_brightness_percent"] / 100.0
|
||||||
|
|
||||||
|
if self.transition_mgr.is_transitioning():
|
||||||
|
return self.transition_mgr.apply_transition(1.0, target_brightness)
|
||||||
|
|
||||||
|
return target_brightness
|
||||||
|
|
||||||
|
def _get_face_brightness_scale(self) -> float:
|
||||||
|
"""Get face brightness scaling factor (0.0-1.0)."""
|
||||||
|
if not self.night_mode_active:
|
||||||
|
return 1.0
|
||||||
|
|
||||||
|
target_brightness = self.config["night_mode"]["face_brightness_percent"] / 100.0
|
||||||
|
|
||||||
|
if self.transition_mgr.is_transitioning():
|
||||||
|
return self.transition_mgr.apply_transition(1.0, target_brightness)
|
||||||
|
|
||||||
|
return target_brightness
|
||||||
|
|
||||||
|
def _update_night_mode(self):
|
||||||
|
"""Update night mode state and publish overrides."""
|
||||||
|
# Determine mode
|
||||||
|
should_be_night = self._determine_night_mode()
|
||||||
|
|
||||||
|
# Handle mode transition
|
||||||
|
if should_be_night != self.night_mode_active:
|
||||||
|
self.night_mode_active = should_be_night
|
||||||
|
target = "night" if should_be_night else "day"
|
||||||
|
self.transition_mgr.start_transition(target)
|
||||||
|
self.get_logger().info(f"Night mode: {should_be_night}")
|
||||||
|
|
||||||
|
# Publish night mode state
|
||||||
|
state = {
|
||||||
|
"active": self.night_mode_active,
|
||||||
|
"light_level": round(self.light_sensor.get_light_level(), 1),
|
||||||
|
"transitioning": self.transition_mgr.is_transitioning(),
|
||||||
|
"manual_override": self.manual_override,
|
||||||
|
"threshold_lux": self.config["light_sensor"]["threshold_lux"],
|
||||||
|
}
|
||||||
|
self.night_mode_pub.publish(String(data=json.dumps(state)))
|
||||||
|
|
||||||
|
# Publish motor speed override
|
||||||
|
motor_scale = self._get_motor_speed_scale()
|
||||||
|
self.motor_override_pub.publish(Float32(data=motor_scale))
|
||||||
|
|
||||||
|
# Publish TTS disable
|
||||||
|
tts_disabled = self.night_mode_active and self.config["night_mode"]["disable_tts"]
|
||||||
|
self.tts_disable_pub.publish(Bool(data=tts_disabled))
|
||||||
|
|
||||||
|
# Publish face control settings
|
||||||
|
face_config = {
|
||||||
|
"display_only": self.night_mode_active,
|
||||||
|
"brightness_scale": self._get_face_brightness_scale(),
|
||||||
|
"contrast_boost": self.night_mode_active,
|
||||||
|
}
|
||||||
|
self.face_control_pub.publish(String(data=json.dumps(face_config)))
|
||||||
|
|
||||||
|
# Publish LED brightness override
|
||||||
|
led_scale = self._get_led_brightness_scale()
|
||||||
|
self.led_override_pub.publish(Float32(data=led_scale))
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
"""Main entry point."""
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = NightModeNode()
|
||||||
|
rclpy.spin(node)
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
4
jetson/ros2_ws/src/saltybot_night_mode/setup.cfg
Normal file
4
jetson/ros2_ws/src/saltybot_night_mode/setup.cfg
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
[develop]
|
||||||
|
script_dir=$base/lib/saltybot_night_mode
|
||||||
|
[bdist_wheel]
|
||||||
|
universal=0
|
||||||
30
jetson/ros2_ws/src/saltybot_night_mode/setup.py
Normal file
30
jetson/ros2_ws/src/saltybot_night_mode/setup.py
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
from setuptools import setup
|
||||||
|
|
||||||
|
package_name = "saltybot_night_mode"
|
||||||
|
|
||||||
|
setup(
|
||||||
|
name=package_name,
|
||||||
|
version="0.1.0",
|
||||||
|
packages=[package_name],
|
||||||
|
data_files=[
|
||||||
|
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
|
||||||
|
(f"share/{package_name}", ["package.xml"]),
|
||||||
|
(f"share/{package_name}/launch", ["launch/night_mode.launch.py"]),
|
||||||
|
(f"share/{package_name}/config", ["config/night_mode_config.yaml"]),
|
||||||
|
],
|
||||||
|
install_requires=["setuptools", "pyyaml"],
|
||||||
|
zip_safe=True,
|
||||||
|
maintainer="sl-mechanical",
|
||||||
|
maintainer_email="sl-mechanical@saltylab.local",
|
||||||
|
description=(
|
||||||
|
"Night mode controller: IR perception, motor speed reduction, "
|
||||||
|
"stealth LED, TTS disable, face-only mode"
|
||||||
|
),
|
||||||
|
license="MIT",
|
||||||
|
tests_require=["pytest"],
|
||||||
|
entry_points={
|
||||||
|
"console_scripts": [
|
||||||
|
"night_mode_node = saltybot_night_mode.night_mode_node:main",
|
||||||
|
],
|
||||||
|
},
|
||||||
|
)
|
||||||
Loading…
x
Reference in New Issue
Block a user