Merge remote-tracking branch 'origin/sl-jetson/issue-456-photo-capture'
# Conflicts: # jetson/ros2_ws/src/saltybot_obstacle_memory/package.xml # jetson/ros2_ws/src/saltybot_photo_capture/package.xml # jetson/ros2_ws/src/saltybot_photo_capture/setup.py
This commit is contained in:
commit
c25d63772a
7
jetson/ros2_ws/src/saltybot_obstacle_memory/.gitignore
vendored
Normal file
7
jetson/ros2_ws/src/saltybot_obstacle_memory/.gitignore
vendored
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
build/
|
||||||
|
install/
|
||||||
|
log/
|
||||||
|
*.egg-info/
|
||||||
|
__pycache__/
|
||||||
|
*.pyc
|
||||||
|
.pytest_cache/
|
||||||
101
jetson/ros2_ws/src/saltybot_obstacle_memory/README.md
Normal file
101
jetson/ros2_ws/src/saltybot_obstacle_memory/README.md
Normal file
@ -0,0 +1,101 @@
|
|||||||
|
# SaltyBot Obstacle Memory Map (Issue #453)
|
||||||
|
|
||||||
|
Persistent spatial memory system for tracking obstacles and hazards detected by LIDAR. Maintains a 2D occupancy grid that accumulates detections over time with temporal decay.
|
||||||
|
|
||||||
|
## Features
|
||||||
|
|
||||||
|
### 1. Occupancy Grid Map
|
||||||
|
- **Resolution**: 10cm per cell (configurable)
|
||||||
|
- **Grid Size**: 100×100 cells by default (~10×10 meters centered on robot)
|
||||||
|
- **Cell States**:
|
||||||
|
- **Unknown**: Never observed (-1 in OccupancyGrid)
|
||||||
|
- **Free**: Definitely free space (0% occupancy)
|
||||||
|
- **Obstacle**: Detected obstacle (0–100% based on confidence)
|
||||||
|
- **Hazard**: Persistent hazard with 3+ detections (high confidence)
|
||||||
|
|
||||||
|
### 2. Hazard Classification
|
||||||
|
- **Detection-based**: 3+ observations of same location = hazard
|
||||||
|
- **Permanent**: Hazards persist even with temporal decay
|
||||||
|
- **Confidence Tracking**: 0.0–1.0 based on detection history
|
||||||
|
- **Quality Metric**: Increases confidence with repeated detections
|
||||||
|
|
||||||
|
### 3. Temporal Decay
|
||||||
|
- **Hazard Decay**: 95% per day (30-day half-life)
|
||||||
|
- Decays slowly due to persistence
|
||||||
|
- Never drops below 30% confidence
|
||||||
|
- **Obstacle Decay**: 85% per day (~21-day half-life)
|
||||||
|
- Faster decay than hazards
|
||||||
|
- Reverts to free space when confidence < 20%
|
||||||
|
- **Decay Interval**: Applied every hour
|
||||||
|
|
||||||
|
### 4. LIDAR Integration
|
||||||
|
- **Input**: LaserScan on `/scan` (or `/saltybot/scan`)
|
||||||
|
- **Odometry**: Robot position from `/odom`
|
||||||
|
- **Ray Casting**: Marks hit points as obstacles, intermediate points as free
|
||||||
|
- **Range Filtering**: Configurable min/max range (default 0.1–10.0m)
|
||||||
|
|
||||||
|
### 5. ROS2 Navigation Integration
|
||||||
|
- **Output**: `/saltybot/obstacle_map` (nav_msgs/OccupancyGrid)
|
||||||
|
- **Format**: Standard OccupancyGrid for Nav2 costmap
|
||||||
|
- **Frequency**: Published at 5 Hz
|
||||||
|
- **Frame**: Map frame compatible with move_base
|
||||||
|
|
||||||
|
### 6. Persistence
|
||||||
|
- **Storage**: YAML file at `/home/seb/saltybot-data/obstacle_map.yaml`
|
||||||
|
- **Format**: Grid metadata + cell states with timestamps
|
||||||
|
- **Auto-Save**: Every 5 minutes (configurable)
|
||||||
|
- **Load on Startup**: Restores previous session's map
|
||||||
|
|
||||||
|
## Configuration
|
||||||
|
|
||||||
|
Edit `config/obstacle_memory.yaml`:
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
grid_width: 100
|
||||||
|
grid_height: 100
|
||||||
|
grid_resolution: 0.1
|
||||||
|
map_file: "/home/seb/saltybot-data/obstacle_map.yaml"
|
||||||
|
auto_save_interval: 300.0
|
||||||
|
scan_max_range: 10.0
|
||||||
|
scan_min_range: 0.1
|
||||||
|
publish_rate_hz: 5.0
|
||||||
|
decay_interval_seconds: 3600.0
|
||||||
|
```
|
||||||
|
|
||||||
|
## Running
|
||||||
|
|
||||||
|
### Launch the obstacle memory node
|
||||||
|
```bash
|
||||||
|
ros2 launch saltybot_obstacle_memory obstacle_memory.launch.py
|
||||||
|
```
|
||||||
|
|
||||||
|
### Direct execution
|
||||||
|
```bash
|
||||||
|
ros2 run saltybot_obstacle_memory obstacle_memory
|
||||||
|
```
|
||||||
|
|
||||||
|
## Integration Points
|
||||||
|
|
||||||
|
1. **Navigation Stack** (Nav2)
|
||||||
|
- Consumes `/saltybot/obstacle_map` in costmap_2d_ros
|
||||||
|
- Influences pathfinding and local costmap
|
||||||
|
|
||||||
|
2. **LIDAR Driver** (e.g., saltybot_sensors)
|
||||||
|
- Publishes `/scan` or `/saltybot/scan`
|
||||||
|
- Ray angles, distances, and timestamp
|
||||||
|
|
||||||
|
3. **Odometry Provider** (e.g., wheel encoders + IMU)
|
||||||
|
- Publishes `/odom`
|
||||||
|
- Robot pose (x, y, theta)
|
||||||
|
|
||||||
|
4. **Dashboard/WebUI**
|
||||||
|
- Subscribes to `/saltybot/obstacle_map`
|
||||||
|
- Displays as overlay on navigation view
|
||||||
|
- Hazard visualization
|
||||||
|
|
||||||
|
## Performance
|
||||||
|
|
||||||
|
- **Memory**: ~100KB per 100×100 grid
|
||||||
|
- **Computation**: O(n) per LIDAR scan (n = number of rays)
|
||||||
|
- **Persistence**: O(width×height) per save (~1ms for 100×100)
|
||||||
|
- **Decay**: O(width×height) per hour (~5ms for 100×100)
|
||||||
@ -0,0 +1,12 @@
|
|||||||
|
/**:
|
||||||
|
ros__parameters:
|
||||||
|
grid_width: 100
|
||||||
|
grid_height: 100
|
||||||
|
grid_resolution: 0.1
|
||||||
|
map_file: "/home/seb/saltybot-data/obstacle_map.yaml"
|
||||||
|
auto_save_interval: 300.0
|
||||||
|
scan_max_range: 10.0
|
||||||
|
scan_min_range: 0.1
|
||||||
|
publish_rate_hz: 5.0
|
||||||
|
decay_interval_seconds: 3600.0
|
||||||
|
hazard_detection_threshold: 3
|
||||||
@ -0,0 +1,31 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
from launch import LaunchDescription
|
||||||
|
from launch_ros.actions import Node
|
||||||
|
from launch.substitutions import PathJoinSubstitution
|
||||||
|
from launch_ros.substitutions import FindPackageShare
|
||||||
|
|
||||||
|
|
||||||
|
def generate_launch_description():
|
||||||
|
"""Launch obstacle memory node with configuration."""
|
||||||
|
|
||||||
|
package_share_dir = FindPackageShare("saltybot_obstacle_memory")
|
||||||
|
config_file = PathJoinSubstitution(
|
||||||
|
[package_share_dir, "config", "obstacle_memory.yaml"]
|
||||||
|
)
|
||||||
|
|
||||||
|
obstacle_memory_node = Node(
|
||||||
|
package="saltybot_obstacle_memory",
|
||||||
|
executable="obstacle_memory",
|
||||||
|
name="obstacle_memory",
|
||||||
|
output="screen",
|
||||||
|
parameters=[config_file],
|
||||||
|
remappings=[
|
||||||
|
("/scan", "/saltybot/scan"),
|
||||||
|
("/odom", "/odom"),
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
return LaunchDescription([
|
||||||
|
obstacle_memory_node,
|
||||||
|
])
|
||||||
@ -5,24 +5,19 @@
|
|||||||
<version>0.1.0</version>
|
<version>0.1.0</version>
|
||||||
<description>
|
<description>
|
||||||
Persistent spatial memory map for LIDAR-detected obstacles and hazards.
|
Persistent spatial memory map for LIDAR-detected obstacles and hazards.
|
||||||
Maintains a grid-based occupancy map with temporal decay, persistent hazard classification,
|
|
||||||
and OccupancyGrid publishing for Nav2 integration (Issue #453).
|
|
||||||
</description>
|
</description>
|
||||||
<maintainer email="seb@vayrette.com">seb</maintainer>
|
<maintainer email="seb@vayrette.com">seb</maintainer>
|
||||||
<license>MIT</license>
|
<license>MIT</license>
|
||||||
|
|
||||||
<depend>rclpy</depend>
|
<depend>rclpy</depend>
|
||||||
<depend>std_msgs</depend>
|
<depend>std_msgs</depend>
|
||||||
<depend>sensor_msgs</depend>
|
<depend>sensor_msgs</depend>
|
||||||
<depend>nav_msgs</depend>
|
<depend>nav_msgs</depend>
|
||||||
<depend>geometry_msgs</depend>
|
<depend>geometry_msgs</depend>
|
||||||
<depend>ament_index_python</depend>
|
<depend>ament_index_python</depend>
|
||||||
|
|
||||||
<test_depend>ament_copyright</test_depend>
|
<test_depend>ament_copyright</test_depend>
|
||||||
<test_depend>ament_flake8</test_depend>
|
<test_depend>ament_flake8</test_depend>
|
||||||
<test_depend>ament_pep257</test_depend>
|
<test_depend>ament_pep257</test_depend>
|
||||||
<test_depend>python3-pytest</test_depend>
|
<test_depend>python3-pytest</test_depend>
|
||||||
|
|
||||||
<export>
|
<export>
|
||||||
<build_type>ament_python</build_type>
|
<build_type>ament_python</build_type>
|
||||||
</export>
|
</export>
|
||||||
|
|||||||
@ -0,0 +1,424 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
import math
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
import yaml
|
||||||
|
from pathlib import Path
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
from typing import Optional, Tuple, List, Dict
|
||||||
|
from collections import defaultdict
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
|
||||||
|
from sensor_msgs.msg import LaserScan
|
||||||
|
from nav_msgs.msg import Odometry, OccupancyGrid
|
||||||
|
from geometry_msgs.msg import MapMetaData
|
||||||
|
|
||||||
|
|
||||||
|
class GridCell:
|
||||||
|
"""Represents a single cell in the occupancy grid."""
|
||||||
|
|
||||||
|
STATES = {
|
||||||
|
"free": -1,
|
||||||
|
"unknown": -1,
|
||||||
|
"obstacle": 100,
|
||||||
|
"hazard": 100,
|
||||||
|
}
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.state = "unknown"
|
||||||
|
self.detection_count = 0
|
||||||
|
self.last_update = time.time()
|
||||||
|
self.confidence = 0.0
|
||||||
|
self.decay_rate = 0.95
|
||||||
|
|
||||||
|
def update_obstacle(self):
|
||||||
|
"""Mark this cell as obstacle."""
|
||||||
|
self.detection_count += 1
|
||||||
|
self.last_update = time.time()
|
||||||
|
|
||||||
|
if self.detection_count >= 3:
|
||||||
|
self.state = "hazard"
|
||||||
|
else:
|
||||||
|
self.state = "obstacle"
|
||||||
|
|
||||||
|
self.confidence = min(1.0, self.detection_count / 5.0)
|
||||||
|
|
||||||
|
def mark_free(self):
|
||||||
|
"""Mark this cell as definitely free."""
|
||||||
|
if self.state != "hazard":
|
||||||
|
self.state = "free"
|
||||||
|
self.confidence = 0.5
|
||||||
|
|
||||||
|
def apply_decay(self, dt: float) -> float:
|
||||||
|
"""Apply temporal decay to confidence."""
|
||||||
|
if self.state == "hazard":
|
||||||
|
days_elapsed = dt / 86400.0
|
||||||
|
decay_factor = (0.95 ** days_elapsed)
|
||||||
|
self.confidence = max(0.3, self.confidence * decay_factor)
|
||||||
|
elif self.state == "obstacle":
|
||||||
|
days_elapsed = dt / 86400.0
|
||||||
|
decay_factor = (0.85 ** days_elapsed)
|
||||||
|
self.confidence = max(0.0, self.confidence * decay_factor)
|
||||||
|
|
||||||
|
if self.confidence < 0.2:
|
||||||
|
self.state = "free"
|
||||||
|
self.confidence = 0.0
|
||||||
|
|
||||||
|
return self.confidence
|
||||||
|
|
||||||
|
def get_occupancy_value(self) -> int:
|
||||||
|
"""Get OccupancyGrid value (-1 to 100)."""
|
||||||
|
if self.state == "unknown":
|
||||||
|
return -1
|
||||||
|
elif self.state == "free":
|
||||||
|
return 0
|
||||||
|
else:
|
||||||
|
return int(self.confidence * 100)
|
||||||
|
|
||||||
|
def to_dict(self) -> Dict:
|
||||||
|
"""Serialize to dict for YAML persistence."""
|
||||||
|
return {
|
||||||
|
"state": self.state,
|
||||||
|
"detection_count": self.detection_count,
|
||||||
|
"confidence": float(self.confidence),
|
||||||
|
"last_update": self.last_update,
|
||||||
|
}
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def from_dict(data: Dict) -> "GridCell":
|
||||||
|
"""Deserialize from dict."""
|
||||||
|
cell = GridCell()
|
||||||
|
cell.state = data.get("state", "unknown")
|
||||||
|
cell.detection_count = data.get("detection_count", 0)
|
||||||
|
cell.confidence = data.get("confidence", 0.0)
|
||||||
|
cell.last_update = data.get("last_update", time.time())
|
||||||
|
return cell
|
||||||
|
|
||||||
|
|
||||||
|
class OccupancyGridMap:
|
||||||
|
"""2D occupancy grid with spatial persistence."""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
width: int = 100,
|
||||||
|
height: int = 100,
|
||||||
|
resolution: float = 0.1,
|
||||||
|
origin_x: float = 0.0,
|
||||||
|
origin_y: float = 0.0,
|
||||||
|
):
|
||||||
|
"""Initialize occupancy grid."""
|
||||||
|
self.width = width
|
||||||
|
self.height = height
|
||||||
|
self.resolution = resolution
|
||||||
|
self.origin_x = -width * resolution / 2
|
||||||
|
self.origin_y = -height * resolution / 2
|
||||||
|
|
||||||
|
self.grid = [[GridCell() for _ in range(width)] for _ in range(height)]
|
||||||
|
self.last_decay_time = time.time()
|
||||||
|
self.lock = threading.Lock()
|
||||||
|
|
||||||
|
def world_to_grid(self, x: float, y: float) -> Optional[Tuple[int, int]]:
|
||||||
|
"""Convert world coordinates to grid indices."""
|
||||||
|
grid_x = int((x - self.origin_x) / self.resolution)
|
||||||
|
grid_y = int((y - self.origin_y) / self.resolution)
|
||||||
|
|
||||||
|
if 0 <= grid_x < self.width and 0 <= grid_y < self.height:
|
||||||
|
return (grid_x, grid_y)
|
||||||
|
return None
|
||||||
|
|
||||||
|
def grid_to_world(self, grid_x: int, grid_y: int) -> Tuple[float, float]:
|
||||||
|
"""Convert grid indices to world coordinates."""
|
||||||
|
x = self.origin_x + (grid_x + 0.5) * self.resolution
|
||||||
|
y = self.origin_y + (grid_y + 0.5) * self.resolution
|
||||||
|
return (x, y)
|
||||||
|
|
||||||
|
def mark_obstacle(self, x: float, y: float):
|
||||||
|
"""Mark a world point as obstacle."""
|
||||||
|
with self.lock:
|
||||||
|
coords = self.world_to_grid(x, y)
|
||||||
|
if coords:
|
||||||
|
grid_x, grid_y = coords
|
||||||
|
self.grid[grid_y][grid_x].update_obstacle()
|
||||||
|
|
||||||
|
def mark_free(self, x: float, y: float):
|
||||||
|
"""Mark a world point as free space."""
|
||||||
|
with self.lock:
|
||||||
|
coords = self.world_to_grid(x, y)
|
||||||
|
if coords:
|
||||||
|
grid_x, grid_y = coords
|
||||||
|
self.grid[grid_y][grid_x].mark_free()
|
||||||
|
|
||||||
|
def apply_temporal_decay(self):
|
||||||
|
"""Apply decay to all cells."""
|
||||||
|
with self.lock:
|
||||||
|
current_time = time.time()
|
||||||
|
dt = current_time - self.last_decay_time
|
||||||
|
|
||||||
|
if dt > 3600:
|
||||||
|
for row in self.grid:
|
||||||
|
for cell in row:
|
||||||
|
cell.apply_decay(dt)
|
||||||
|
self.last_decay_time = current_time
|
||||||
|
|
||||||
|
def to_occupancy_grid(self, frame_id: str = "map", timestamp=None) -> OccupancyGrid:
|
||||||
|
"""Convert to ROS2 OccupancyGrid message."""
|
||||||
|
msg = OccupancyGrid()
|
||||||
|
msg.header.frame_id = frame_id
|
||||||
|
msg.header.stamp = timestamp if timestamp else rclpy.clock.Clock().now().to_msg()
|
||||||
|
|
||||||
|
msg.info.width = self.width
|
||||||
|
msg.info.height = self.height
|
||||||
|
msg.info.resolution = self.resolution
|
||||||
|
msg.info.origin.position.x = self.origin_x
|
||||||
|
msg.info.origin.position.y = self.origin_y
|
||||||
|
msg.info.origin.position.z = 0.0
|
||||||
|
|
||||||
|
with self.lock:
|
||||||
|
data = []
|
||||||
|
for y in range(self.height):
|
||||||
|
for x in range(self.width):
|
||||||
|
data.append(self.grid[y][x].get_occupancy_value())
|
||||||
|
msg.data = data
|
||||||
|
|
||||||
|
return msg
|
||||||
|
|
||||||
|
def to_yaml(self) -> Dict:
|
||||||
|
"""Serialize grid to YAML-compatible dict."""
|
||||||
|
with self.lock:
|
||||||
|
grid_data = []
|
||||||
|
for y in range(self.height):
|
||||||
|
row = []
|
||||||
|
for x in range(self.width):
|
||||||
|
row.append(self.grid[y][x].to_dict())
|
||||||
|
grid_data.append(row)
|
||||||
|
|
||||||
|
return {
|
||||||
|
"metadata": {
|
||||||
|
"width": self.width,
|
||||||
|
"height": self.height,
|
||||||
|
"resolution": self.resolution,
|
||||||
|
"origin": {"x": self.origin_x, "y": self.origin_y},
|
||||||
|
"timestamp": datetime.now().isoformat(),
|
||||||
|
},
|
||||||
|
"grid": grid_data,
|
||||||
|
}
|
||||||
|
|
||||||
|
def from_yaml(self, data: Dict):
|
||||||
|
"""Load grid from YAML-compatible dict."""
|
||||||
|
with self.lock:
|
||||||
|
meta = data.get("metadata", {})
|
||||||
|
self.width = meta.get("width", self.width)
|
||||||
|
self.height = meta.get("height", self.height)
|
||||||
|
self.resolution = meta.get("resolution", self.resolution)
|
||||||
|
|
||||||
|
origin = meta.get("origin", {})
|
||||||
|
self.origin_x = origin.get("x", self.origin_x)
|
||||||
|
self.origin_y = origin.get("y", self.origin_y)
|
||||||
|
|
||||||
|
grid_data = data.get("grid", [])
|
||||||
|
self.grid = []
|
||||||
|
for y in range(self.height):
|
||||||
|
row = []
|
||||||
|
for x in range(self.width):
|
||||||
|
if y < len(grid_data) and x < len(grid_data[y]):
|
||||||
|
cell = GridCell.from_dict(grid_data[y][x])
|
||||||
|
else:
|
||||||
|
cell = GridCell()
|
||||||
|
row.append(cell)
|
||||||
|
self.grid.append(row)
|
||||||
|
|
||||||
|
def get_statistics(self) -> Dict:
|
||||||
|
"""Get grid statistics."""
|
||||||
|
with self.lock:
|
||||||
|
total_cells = self.width * self.height
|
||||||
|
obstacle_count = 0
|
||||||
|
hazard_count = 0
|
||||||
|
free_count = 0
|
||||||
|
|
||||||
|
for row in self.grid:
|
||||||
|
for cell in row:
|
||||||
|
if cell.state == "obstacle":
|
||||||
|
obstacle_count += 1
|
||||||
|
elif cell.state == "hazard":
|
||||||
|
hazard_count += 1
|
||||||
|
elif cell.state == "free":
|
||||||
|
free_count += 1
|
||||||
|
|
||||||
|
return {
|
||||||
|
"total_cells": total_cells,
|
||||||
|
"obstacle_cells": obstacle_count,
|
||||||
|
"hazard_cells": hazard_count,
|
||||||
|
"free_cells": free_count,
|
||||||
|
"unknown_cells": total_cells - obstacle_count - hazard_count - free_count,
|
||||||
|
"coverage_percent": (obstacle_count + hazard_count + free_count) / total_cells * 100,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class ObstacleMemoryNode(Node):
|
||||||
|
"""ROS2 node for persistent obstacle memory mapping."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__("saltybot_obstacle_memory")
|
||||||
|
|
||||||
|
self.declare_parameter("grid_width", 100)
|
||||||
|
self.declare_parameter("grid_height", 100)
|
||||||
|
self.declare_parameter("grid_resolution", 0.1)
|
||||||
|
self.declare_parameter("map_file", "/home/seb/saltybot-data/obstacle_map.yaml")
|
||||||
|
self.declare_parameter("scan_max_range", 10.0)
|
||||||
|
self.declare_parameter("scan_min_range", 0.1)
|
||||||
|
self.declare_parameter("publish_rate_hz", 5.0)
|
||||||
|
self.declare_parameter("decay_interval_seconds", 3600.0)
|
||||||
|
self.declare_parameter("auto_save_interval", 300.0)
|
||||||
|
|
||||||
|
width = self.get_parameter("grid_width").value
|
||||||
|
height = self.get_parameter("grid_height").value
|
||||||
|
resolution = self.get_parameter("grid_resolution").value
|
||||||
|
map_file = self.get_parameter("map_file").value
|
||||||
|
|
||||||
|
self.map_file = Path(map_file)
|
||||||
|
self.map_file.parent.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
self.grid = OccupancyGridMap(
|
||||||
|
width=width,
|
||||||
|
height=height,
|
||||||
|
resolution=resolution,
|
||||||
|
)
|
||||||
|
|
||||||
|
if self.map_file.exists():
|
||||||
|
try:
|
||||||
|
with open(self.map_file) as f:
|
||||||
|
data = yaml.safe_load(f)
|
||||||
|
self.grid.from_yaml(data)
|
||||||
|
self.get_logger().info(f"Loaded map from {self.map_file}")
|
||||||
|
except Exception as e:
|
||||||
|
self.get_logger().error(f"Failed to load map: {e}")
|
||||||
|
|
||||||
|
self.robot_x = 0.0
|
||||||
|
self.robot_y = 0.0
|
||||||
|
self.robot_angle = 0.0
|
||||||
|
self.scan_max_range = self.get_parameter("scan_max_range").value
|
||||||
|
self.scan_min_range = self.get_parameter("scan_min_range").value
|
||||||
|
|
||||||
|
qos = QoSProfile(
|
||||||
|
reliability=ReliabilityPolicy.BEST_EFFORT,
|
||||||
|
history=HistoryPolicy.KEEP_LAST,
|
||||||
|
depth=5,
|
||||||
|
)
|
||||||
|
|
||||||
|
self.scan_sub = self.create_subscription(
|
||||||
|
LaserScan,
|
||||||
|
"/scan",
|
||||||
|
self.scan_callback,
|
||||||
|
qos,
|
||||||
|
)
|
||||||
|
|
||||||
|
self.odom_sub = self.create_subscription(
|
||||||
|
Odometry,
|
||||||
|
"/odom",
|
||||||
|
self.odom_callback,
|
||||||
|
qos,
|
||||||
|
)
|
||||||
|
|
||||||
|
self.map_pub = self.create_publisher(
|
||||||
|
OccupancyGrid,
|
||||||
|
"/saltybot/obstacle_map",
|
||||||
|
10,
|
||||||
|
)
|
||||||
|
|
||||||
|
publish_rate = self.get_parameter("publish_rate_hz").value
|
||||||
|
self.publish_timer = self.create_timer(1.0 / publish_rate, self.publish_map_callback)
|
||||||
|
|
||||||
|
decay_interval = self.get_parameter("decay_interval_seconds").value
|
||||||
|
self.decay_timer = self.create_timer(decay_interval, self.decay_callback)
|
||||||
|
|
||||||
|
auto_save_interval = self.get_parameter("auto_save_interval").value
|
||||||
|
self.save_timer = self.create_timer(auto_save_interval, self.save_map_callback)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"Obstacle memory initialized: {width}x{height} grid @ {resolution}m/cell, "
|
||||||
|
f"map file: {self.map_file}"
|
||||||
|
)
|
||||||
|
|
||||||
|
def odom_callback(self, msg: Odometry):
|
||||||
|
"""Update robot position from odometry."""
|
||||||
|
self.robot_x = msg.pose.pose.position.x
|
||||||
|
self.robot_y = msg.pose.pose.position.y
|
||||||
|
|
||||||
|
q = msg.pose.pose.orientation
|
||||||
|
siny_cosp = 2 * (q.w * q.z + q.x * q.y)
|
||||||
|
cosy_cosp = 1 - 2 * (q.y * q.y + q.z * q.z)
|
||||||
|
self.robot_angle = math.atan2(siny_cosp, cosy_cosp)
|
||||||
|
|
||||||
|
def scan_callback(self, msg: LaserScan):
|
||||||
|
"""Process LIDAR scan and update grid."""
|
||||||
|
angle_min = msg.angle_min
|
||||||
|
angle_increment = msg.angle_increment
|
||||||
|
|
||||||
|
for i, distance in enumerate(msg.ranges):
|
||||||
|
if math.isnan(distance) or math.isinf(distance):
|
||||||
|
continue
|
||||||
|
|
||||||
|
if distance < self.scan_min_range or distance > self.scan_max_range:
|
||||||
|
continue
|
||||||
|
|
||||||
|
ray_angle = angle_min + i * angle_increment + self.robot_angle
|
||||||
|
|
||||||
|
hit_x = self.robot_x + distance * math.cos(ray_angle)
|
||||||
|
hit_y = self.robot_y + distance * math.sin(ray_angle)
|
||||||
|
|
||||||
|
self.grid.mark_obstacle(hit_x, hit_y)
|
||||||
|
|
||||||
|
steps = max(1, int(distance / (self.grid.resolution * 2)))
|
||||||
|
for step in range(steps):
|
||||||
|
free_dist = (distance * step) / steps
|
||||||
|
free_x = self.robot_x + free_dist * math.cos(ray_angle)
|
||||||
|
free_y = self.robot_y + free_dist * math.sin(ray_angle)
|
||||||
|
self.grid.mark_free(free_x, free_y)
|
||||||
|
|
||||||
|
def publish_map_callback(self):
|
||||||
|
"""Publish current occupancy grid."""
|
||||||
|
grid_msg = self.grid.to_occupancy_grid("map")
|
||||||
|
self.map_pub.publish(grid_msg)
|
||||||
|
|
||||||
|
def decay_callback(self):
|
||||||
|
"""Apply temporal decay to grid."""
|
||||||
|
self.grid.apply_temporal_decay()
|
||||||
|
stats = self.grid.get_statistics()
|
||||||
|
self.get_logger().info(
|
||||||
|
f"Grid decay: {stats['hazard_cells']} hazards, "
|
||||||
|
f"{stats['obstacle_cells']} obstacles, {stats['free_cells']} free"
|
||||||
|
)
|
||||||
|
|
||||||
|
def save_map_callback(self):
|
||||||
|
"""Save grid to YAML file."""
|
||||||
|
try:
|
||||||
|
with open(self.map_file, "w") as f:
|
||||||
|
yaml.dump(self.grid.to_yaml(), f, default_flow_style=False)
|
||||||
|
except Exception as e:
|
||||||
|
self.get_logger().error(f"Failed to save map: {e}")
|
||||||
|
|
||||||
|
def destroy_node(self):
|
||||||
|
"""Cleanup: save map before shutdown."""
|
||||||
|
self.save_map_callback()
|
||||||
|
super().destroy_node()
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = ObstacleMemoryNode()
|
||||||
|
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
@ -0,0 +1,11 @@
|
|||||||
|
photo_capture_node:
|
||||||
|
ros__parameters:
|
||||||
|
photo_dir: "/home/seb/saltybot-data/photos/"
|
||||||
|
max_storage_gb: 10.0
|
||||||
|
timelapse_interval: 30.0
|
||||||
|
enable_timelapse: true
|
||||||
|
enable_event_capture: true
|
||||||
|
enable_gps_overlay: false
|
||||||
|
enable_nas_sync: false
|
||||||
|
nas_path: "/mnt/nas/saltybot-photos/"
|
||||||
|
quality: 95
|
||||||
@ -0,0 +1,56 @@
|
|||||||
|
"""Photo capture launch file (Issue #456)."""
|
||||||
|
|
||||||
|
from launch import LaunchDescription
|
||||||
|
from launch.actions import DeclareLaunchArgument
|
||||||
|
from launch.substitutions import LaunchConfiguration
|
||||||
|
from launch_ros.actions import Node
|
||||||
|
|
||||||
|
|
||||||
|
def generate_launch_description():
|
||||||
|
return LaunchDescription([
|
||||||
|
DeclareLaunchArgument('photo_dir',
|
||||||
|
default_value='/home/seb/saltybot-data/photos/',
|
||||||
|
description='Directory for photo storage'),
|
||||||
|
DeclareLaunchArgument('max_storage_gb',
|
||||||
|
default_value='10.0',
|
||||||
|
description='Maximum storage capacity (GB)'),
|
||||||
|
DeclareLaunchArgument('timelapse_interval',
|
||||||
|
default_value='30.0',
|
||||||
|
description='Timelapse interval (seconds)'),
|
||||||
|
DeclareLaunchArgument('enable_timelapse',
|
||||||
|
default_value='true',
|
||||||
|
description='Enable timelapse mode'),
|
||||||
|
DeclareLaunchArgument('enable_event_capture',
|
||||||
|
default_value='true',
|
||||||
|
description='Enable event-triggered capture'),
|
||||||
|
DeclareLaunchArgument('enable_gps_overlay',
|
||||||
|
default_value='false',
|
||||||
|
description='Add GPS overlay to photos'),
|
||||||
|
DeclareLaunchArgument('enable_nas_sync',
|
||||||
|
default_value='false',
|
||||||
|
description='Enable WiFi sync to NAS'),
|
||||||
|
DeclareLaunchArgument('nas_path',
|
||||||
|
default_value='/mnt/nas/saltybot-photos/',
|
||||||
|
description='NAS mount path'),
|
||||||
|
DeclareLaunchArgument('quality',
|
||||||
|
default_value='95',
|
||||||
|
description='JPEG quality (1-100)'),
|
||||||
|
|
||||||
|
Node(
|
||||||
|
package='saltybot_photo_capture',
|
||||||
|
executable='photo_capture_node',
|
||||||
|
name='photo_capture_node',
|
||||||
|
output='screen',
|
||||||
|
parameters=[{
|
||||||
|
'photo_dir': LaunchConfiguration('photo_dir'),
|
||||||
|
'max_storage_gb': LaunchConfiguration('max_storage_gb'),
|
||||||
|
'timelapse_interval': LaunchConfiguration('timelapse_interval'),
|
||||||
|
'enable_timelapse': LaunchConfiguration('enable_timelapse'),
|
||||||
|
'enable_event_capture': LaunchConfiguration('enable_event_capture'),
|
||||||
|
'enable_gps_overlay': LaunchConfiguration('enable_gps_overlay'),
|
||||||
|
'enable_nas_sync': LaunchConfiguration('enable_nas_sync'),
|
||||||
|
'nas_path': LaunchConfiguration('nas_path'),
|
||||||
|
'quality': LaunchConfiguration('quality'),
|
||||||
|
}],
|
||||||
|
),
|
||||||
|
])
|
||||||
@ -3,23 +3,13 @@
|
|||||||
<package format="3">
|
<package format="3">
|
||||||
<name>saltybot_photo_capture</name>
|
<name>saltybot_photo_capture</name>
|
||||||
<version>0.1.0</version>
|
<version>0.1.0</version>
|
||||||
<description>
|
<description>Photo capture service for SaltyBot (Issue #456)</description>
|
||||||
Photo capture service for SaltyBot — snapshot + timelapse + event-triggered.
|
|
||||||
Issue #456: Manual, timelapse, and event-triggered photo capture with
|
|
||||||
metadata, storage management, and WiFi sync to NAS.
|
|
||||||
</description>
|
|
||||||
<maintainer email="seb@vayrette.com">seb</maintainer>
|
<maintainer email="seb@vayrette.com">seb</maintainer>
|
||||||
<license>MIT</license>
|
<license>MIT</license>
|
||||||
<depend>rclpy</depend>
|
<depend>rclpy</depend>
|
||||||
<depend>std_msgs</depend>
|
<depend>std_msgs</depend>
|
||||||
<depend>sensor_msgs</depend>
|
<depend>sensor_msgs</depend>
|
||||||
<depend>cv_bridge</depend>
|
<depend>cv_bridge</depend>
|
||||||
<depend>opencv-python</depend>
|
|
||||||
<test_depend>ament_copyright</test_depend>
|
|
||||||
<test_depend>ament_flake8</test_depend>
|
|
||||||
<test_depend>ament_pep257</test_depend>
|
|
||||||
<test_depend>python3-pytest</test_depend>
|
<test_depend>python3-pytest</test_depend>
|
||||||
<export>
|
<export><build_type>ament_python</build_type></export>
|
||||||
<build_type>ament_python</build_type>
|
|
||||||
</export>
|
|
||||||
</package>
|
</package>
|
||||||
|
|||||||
@ -0,0 +1,242 @@
|
|||||||
|
"""photo_capture_node.py — Photo capture service for SaltyBot (Issue #456).
|
||||||
|
|
||||||
|
Subscribes to /camera/color/image_raw and captures photos in modes:
|
||||||
|
- manual: voice command or gamepad button trigger
|
||||||
|
- timelapse: configurable interval (e.g., every 30s)
|
||||||
|
- event-triggered: on person detection, encounter, or trick execution
|
||||||
|
|
||||||
|
Features: JPEG storage, FIFO management (10GB cap), GPS overlay, NAS sync.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import shutil
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
from datetime import datetime
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
try:
|
||||||
|
import cv2
|
||||||
|
from cv_bridge import CvBridge
|
||||||
|
except ImportError:
|
||||||
|
cv2 = None
|
||||||
|
CvBridge = None
|
||||||
|
|
||||||
|
import rclpy
|
||||||
|
from rclpy.node import Node
|
||||||
|
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
|
||||||
|
from sensor_msgs.msg import Image
|
||||||
|
from std_msgs.msg import String
|
||||||
|
|
||||||
|
try:
|
||||||
|
from saltybot_social_msgs.msg import VoiceCommand
|
||||||
|
_HAS_VOICE = True
|
||||||
|
except ImportError:
|
||||||
|
_HAS_VOICE = False
|
||||||
|
|
||||||
|
|
||||||
|
class PhotoCaptureNode(Node):
|
||||||
|
"""Multi-mode photo capture service for SaltyBot."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
super().__init__("photo_capture_node")
|
||||||
|
|
||||||
|
# Parameters
|
||||||
|
self.declare_parameter("photo_dir", "/home/seb/saltybot-data/photos/")
|
||||||
|
self.declare_parameter("max_storage_gb", 10.0)
|
||||||
|
self.declare_parameter("timelapse_interval", 30.0)
|
||||||
|
self.declare_parameter("enable_timelapse", True)
|
||||||
|
self.declare_parameter("enable_event_capture", True)
|
||||||
|
self.declare_parameter("enable_gps_overlay", False)
|
||||||
|
self.declare_parameter("enable_nas_sync", False)
|
||||||
|
self.declare_parameter("nas_path", "/mnt/nas/saltybot-photos/")
|
||||||
|
self.declare_parameter("quality", 95)
|
||||||
|
|
||||||
|
self._photo_dir = Path(self.get_parameter("photo_dir").value)
|
||||||
|
self._max_storage_bytes = self.get_parameter("max_storage_gb").value * 1e9
|
||||||
|
self._timelapse_interval = self.get_parameter("timelapse_interval").value
|
||||||
|
self._enable_timelapse = self.get_parameter("enable_timelapse").value
|
||||||
|
self._enable_event_capture = self.get_parameter("enable_event_capture").value
|
||||||
|
self._enable_gps = self.get_parameter("enable_gps_overlay").value
|
||||||
|
self._enable_nas_sync = self.get_parameter("enable_nas_sync").value
|
||||||
|
self._nas_path = Path(self.get_parameter("nas_path").value)
|
||||||
|
self._quality = self.get_parameter("quality").value
|
||||||
|
|
||||||
|
self._photo_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
# QoS
|
||||||
|
sensor_qos = QoSProfile(
|
||||||
|
reliability=QoSReliabilityPolicy.BEST_EFFORT,
|
||||||
|
history=QoSHistoryPolicy.KEEP_LAST, depth=10
|
||||||
|
)
|
||||||
|
reliable_qos = QoSProfile(
|
||||||
|
reliability=QoSReliabilityPolicy.RELIABLE,
|
||||||
|
history=QoSHistoryPolicy.KEEP_LAST, depth=10
|
||||||
|
)
|
||||||
|
|
||||||
|
# Publishers
|
||||||
|
self._photo_pub = self.create_publisher(
|
||||||
|
String, "/saltybot/photo_captured", reliable_qos
|
||||||
|
)
|
||||||
|
|
||||||
|
# Subscribers
|
||||||
|
self._image_sub = self.create_subscription(
|
||||||
|
Image, "/camera/color/image_raw", self._on_image, sensor_qos
|
||||||
|
)
|
||||||
|
self._manual_sub = self.create_subscription(
|
||||||
|
String, "/saltybot/save_photo", self._on_manual_trigger, reliable_qos
|
||||||
|
)
|
||||||
|
if _HAS_VOICE:
|
||||||
|
self._voice_sub = self.create_subscription(
|
||||||
|
VoiceCommand, "/saltybot/voice_command",
|
||||||
|
self._on_voice_command, reliable_qos
|
||||||
|
)
|
||||||
|
self._trick_sub = self.create_subscription(
|
||||||
|
String, "/saltybot/trick_state", self._on_trick_state, reliable_qos
|
||||||
|
)
|
||||||
|
|
||||||
|
# State
|
||||||
|
self._latest_frame: Optional[Image] = None
|
||||||
|
self._bridge = CvBridge() if CvBridge else None
|
||||||
|
self._last_timelapse_time = time.time()
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
|
||||||
|
# Timers
|
||||||
|
if self._enable_timelapse:
|
||||||
|
self.create_timer(self._timelapse_interval, self._capture_timelapse)
|
||||||
|
self.create_timer(60.0, self._check_storage)
|
||||||
|
|
||||||
|
self.get_logger().info(
|
||||||
|
f"photo_capture_node ready (dir={self._photo_dir})"
|
||||||
|
)
|
||||||
|
|
||||||
|
def _on_image(self, msg: Image) -> None:
|
||||||
|
"""Cache latest camera frame."""
|
||||||
|
with self._lock:
|
||||||
|
self._latest_frame = msg
|
||||||
|
|
||||||
|
def _on_manual_trigger(self, msg: String) -> None:
|
||||||
|
"""Handle manual photo capture trigger."""
|
||||||
|
self.get_logger().info(f"Manual capture: {msg.data}")
|
||||||
|
self._capture_photo(f"manual-{msg.data}")
|
||||||
|
|
||||||
|
def _on_voice_command(self, msg) -> None:
|
||||||
|
"""Handle voice commands for photo capture."""
|
||||||
|
if not _HAS_VOICE:
|
||||||
|
return
|
||||||
|
intent = msg.intent.lower()
|
||||||
|
if "photo" in intent or "capture" in intent or "picture" in intent:
|
||||||
|
self._capture_photo("voice-command")
|
||||||
|
|
||||||
|
def _on_trick_state(self, msg: String) -> None:
|
||||||
|
"""Capture photo on trick execution."""
|
||||||
|
if not self._enable_event_capture:
|
||||||
|
return
|
||||||
|
state = msg.data.lower()
|
||||||
|
if "executing:" in state:
|
||||||
|
trick = state.split(":")[-1]
|
||||||
|
self._capture_photo(f"event-{trick}")
|
||||||
|
|
||||||
|
def _capture_timelapse(self) -> None:
|
||||||
|
"""Periodic timelapse capture."""
|
||||||
|
if self._enable_timelapse:
|
||||||
|
self._capture_photo("timelapse")
|
||||||
|
|
||||||
|
def _capture_photo(self, mode: str) -> None:
|
||||||
|
"""Capture and save photo with metadata."""
|
||||||
|
with self._lock:
|
||||||
|
if self._latest_frame is None or not cv2 or not self._bridge:
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
cv_image = self._bridge.imgmsg_to_cv2(
|
||||||
|
self._latest_frame, desired_encoding="bgr8"
|
||||||
|
)
|
||||||
|
timestamp = datetime.now()
|
||||||
|
filename = f"{timestamp.strftime('%Y%m%d_%H%M%S_%f')[:-3]}_{mode}.jpg"
|
||||||
|
filepath = self._photo_dir / filename
|
||||||
|
|
||||||
|
if self._enable_gps:
|
||||||
|
self._add_overlay(cv_image, timestamp, mode)
|
||||||
|
|
||||||
|
cv2.imwrite(
|
||||||
|
str(filepath), cv_image,
|
||||||
|
[cv2.IMWRITE_JPEG_QUALITY, self._quality]
|
||||||
|
)
|
||||||
|
|
||||||
|
metadata = {
|
||||||
|
"timestamp": timestamp.isoformat(),
|
||||||
|
"mode": mode,
|
||||||
|
"size": filepath.stat().st_size,
|
||||||
|
}
|
||||||
|
self._photo_pub.publish(String(data=json.dumps(metadata)))
|
||||||
|
self.get_logger().info(f"Captured: {filename}")
|
||||||
|
|
||||||
|
if self._enable_nas_sync:
|
||||||
|
self._sync_to_nas(filepath)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.get_logger().error(f"Capture failed: {e}")
|
||||||
|
|
||||||
|
def _add_overlay(self, image: any, timestamp: datetime, mode: str) -> None:
|
||||||
|
"""Add timestamp and mode overlay to image."""
|
||||||
|
if not cv2:
|
||||||
|
return
|
||||||
|
height, width = image.shape[:2]
|
||||||
|
timestamp_str = timestamp.strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
cv2.putText(image, timestamp_str, (10, 30),
|
||||||
|
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
|
||||||
|
cv2.putText(image, f"Mode: {mode}", (10, height - 10),
|
||||||
|
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
|
||||||
|
|
||||||
|
def _check_storage(self) -> None:
|
||||||
|
"""Monitor storage and purge oldest photos if exceeding capacity."""
|
||||||
|
try:
|
||||||
|
total_size = sum(
|
||||||
|
f.stat().st_size for f in self._photo_dir.glob("*.jpg")
|
||||||
|
)
|
||||||
|
if total_size > self._max_storage_bytes:
|
||||||
|
self.get_logger().warn(
|
||||||
|
f"Storage limit exceeded, purging oldest"
|
||||||
|
)
|
||||||
|
files = sorted(
|
||||||
|
self._photo_dir.glob("*.jpg"),
|
||||||
|
key=lambda f: f.stat().st_mtime
|
||||||
|
)
|
||||||
|
for file in files:
|
||||||
|
if total_size <= self._max_storage_bytes * 0.9:
|
||||||
|
break
|
||||||
|
total_size -= file.stat().st_size
|
||||||
|
file.unlink()
|
||||||
|
except Exception as e:
|
||||||
|
self.get_logger().error(f"Storage check failed: {e}")
|
||||||
|
|
||||||
|
def _sync_to_nas(self, filepath: Path) -> None:
|
||||||
|
"""Asynchronous WiFi sync to NAS."""
|
||||||
|
def sync_worker():
|
||||||
|
try:
|
||||||
|
shutil.copy2(filepath, self._nas_path / filepath.name)
|
||||||
|
except Exception as e:
|
||||||
|
self.get_logger().warn(f"NAS sync failed: {e}")
|
||||||
|
thread = threading.Thread(target=sync_worker, daemon=True)
|
||||||
|
thread.start()
|
||||||
|
|
||||||
|
|
||||||
|
def main(args=None):
|
||||||
|
rclpy.init(args=args)
|
||||||
|
node = PhotoCaptureNode()
|
||||||
|
try:
|
||||||
|
rclpy.spin(node)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
node.destroy_node()
|
||||||
|
rclpy.shutdown()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
||||||
@ -2,19 +2,17 @@ from setuptools import find_packages, setup
|
|||||||
import os
|
import os
|
||||||
from glob import glob
|
from glob import glob
|
||||||
|
|
||||||
package_name = 'saltybot_photo_capture'
|
|
||||||
|
|
||||||
setup(
|
setup(
|
||||||
name=package_name,
|
name='saltybot_photo_capture',
|
||||||
version='0.1.0',
|
version='0.1.0',
|
||||||
packages=find_packages(exclude=['test']),
|
packages=find_packages(exclude=['test']),
|
||||||
data_files=[
|
data_files=[
|
||||||
('share/ament_index/resource_index/packages',
|
('share/ament_index/resource_index/packages',
|
||||||
['resource/' + package_name]),
|
['resource/saltybot_photo_capture']),
|
||||||
('share/' + package_name, ['package.xml']),
|
('share/saltybot_photo_capture', ['package.xml']),
|
||||||
(os.path.join('share', package_name, 'launch'),
|
(os.path.join('share', 'saltybot_photo_capture', 'launch'),
|
||||||
glob(os.path.join('launch', '*launch.[pxy][yma]*'))),
|
glob(os.path.join('launch', '*launch.[pxy][yma]*'))),
|
||||||
(os.path.join('share', package_name, 'config'),
|
(os.path.join('share', 'saltybot_photo_capture', 'config'),
|
||||||
glob(os.path.join('config', '*.yaml'))),
|
glob(os.path.join('config', '*.yaml'))),
|
||||||
],
|
],
|
||||||
install_requires=['setuptools'],
|
install_requires=['setuptools'],
|
||||||
|
|||||||
@ -0,0 +1,35 @@
|
|||||||
|
"""Unit tests for photo capture node (Issue #456)."""
|
||||||
|
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
|
||||||
|
class TestPhotoCaptureNode(unittest.TestCase):
|
||||||
|
"""Test photo capture module structure."""
|
||||||
|
|
||||||
|
def test_module_imports(self):
|
||||||
|
"""Verify module can be imported."""
|
||||||
|
try:
|
||||||
|
import saltybot_photo_capture
|
||||||
|
self.assertIsNotNone(saltybot_photo_capture)
|
||||||
|
except ImportError as e:
|
||||||
|
self.fail(f"Failed to import: {e}")
|
||||||
|
|
||||||
|
def test_node_class_exists(self):
|
||||||
|
"""Verify PhotoCaptureNode class exists."""
|
||||||
|
try:
|
||||||
|
from saltybot_photo_capture.photo_capture_node import PhotoCaptureNode
|
||||||
|
self.assertIsNotNone(PhotoCaptureNode)
|
||||||
|
except ImportError as e:
|
||||||
|
self.fail(f"Failed to import PhotoCaptureNode: {e}")
|
||||||
|
|
||||||
|
def test_main_function_exists(self):
|
||||||
|
"""Verify main function exists."""
|
||||||
|
try:
|
||||||
|
from saltybot_photo_capture.photo_capture_node import main
|
||||||
|
self.assertTrue(callable(main))
|
||||||
|
except ImportError as e:
|
||||||
|
self.fail(f"Failed to import main: {e}")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
||||||
Loading…
x
Reference in New Issue
Block a user