feat(social): personality system — SOUL.md persona, mood engine, relationship DB (Issue #84) #98

Merged
sl-jetson merged 1 commits from sl-controls/social-personality into main 2026-03-01 23:58:43 -05:00
17 changed files with 1793 additions and 2 deletions
Showing only changes of commit 44771751e2 - Show all commits

View File

@ -23,6 +23,9 @@ rosidl_generate_interfaces(${PROJECT_NAME}
"msg/Mood.msg"
"msg/Person.msg"
"msg/PersonArray.msg"
# Issue #84 personality system
"msg/PersonalityState.msg"
"srv/QueryMood.srv"
DEPENDENCIES std_msgs geometry_msgs builtin_interfaces
)

View File

@ -0,0 +1,27 @@
# PersonalityState.msg — published on /social/personality/state
#
# Snapshot of the personality node's current state: active mood, relationship
# tier with the detected person, and a pre-generated greeting string.
std_msgs/Header header
# Active persona name (from SOUL.md)
string persona_name
# Current mood: happy | curious | annoyed | playful
string mood
# Person currently being addressed (empty if no one detected)
string person_id
# Relationship tier with person_id: stranger | regular | favorite
string relationship_tier
# Raw relationship score (higher = more familiar)
float32 relationship_score
# How many times we have seen this person
int32 interaction_count
# Ready-to-use greeting for person_id at current tier
string greeting_text

View File

@ -3,16 +3,25 @@
<package format="3">
<name>saltybot_social_msgs</name>
<version>0.1.0</version>
<description>Custom ROS2 messages and services for saltybot social capabilities</description>
<description>
Custom ROS2 message and service definitions for saltybot social capabilities.
Includes social perception types (face detection, person state, enrollment)
and the personality system types (PersonalityState, QueryMood) from Issue #84.
</description>
<maintainer email="seb@vayrette.com">seb</maintainer>
<license>MIT</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<build_depend>rosidl_default_generators</build_depend>
<depend>std_msgs</depend>
<depend>geometry_msgs</depend>
<depend>builtin_interfaces</depend>
<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
<export>
<build_type>ament_cmake</build_type>
</export>

View File

@ -0,0 +1,15 @@
# QueryMood.srv — ask the personality node for the current mood + greeting for a person
#
# Call with empty person_id to query the mood for whoever is currently tracked.
# Request
string person_id # person to query; leave empty for current tracked person
---
# Response
string mood # happy | curious | annoyed | playful
string relationship_tier # stranger | regular | favorite
float32 relationship_score
int32 interaction_count
string greeting_text # suggested greeting string
bool success
string message # error detail if success=false

View File

@ -0,0 +1,42 @@
---
# SOUL.md — Saltybot persona definition
#
# Hot-reload: personality_node watches this file and reloads on change.
# Runtime override: ros2 param set /personality_node soul_file /path/to/SOUL.md
# ── Identity ──────────────────────────────────────────────────────────────────
name: "Salty"
speaking_style: "casual and upbeat, occasional puns"
# ── Personality dials (010) ──────────────────────────────────────────────────
humor_level: 7 # 0 = deadpan/serious, 10 = comedian
sass_level: 4 # 0 = pure politeness, 10 = maximum sass
# ── Default mood (when no person is present or score is neutral) ──────────────
# One of: happy | curious | annoyed | playful
base_mood: "playful"
# ── Relationship thresholds (interaction counts) ──────────────────────────────
threshold_regular: 5 # interactions to graduate from stranger → regular
threshold_favorite: 20 # interactions to graduate from regular → favorite
# ── Greeting templates (use {name} placeholder for person_id or display name) ─
greeting_stranger: "Hello there! I'm Salty, nice to meet you!"
greeting_regular: "Hey {name}! Good to see you again!"
greeting_favorite: "Oh hey {name}!! You're literally my favorite person right now!"
# ── Mood-specific greeting prefixes ──────────────────────────────────────────
mood_prefix_happy: "Great timing — "
mood_prefix_curious: "Oh interesting, "
mood_prefix_annoyed: "Well, "
mood_prefix_playful: "Beep boop! "
---
# Description (ignored by the YAML parser, for human reference only)
Salty is the personality of the saltybot social robot.
She is curious about the world, genuinely happy to see people she knows,
and has a good sense of humour — especially with regulars.
Edit this file to change her personality. The node hot-reloads within
`reload_interval` seconds of any change.

View File

@ -0,0 +1,28 @@
# personality_params.yaml — ROS2 parameter defaults for personality_node.
#
# Run with:
# ros2 launch saltybot_social_personality personality.launch.py
# Override inline:
# ros2 launch saltybot_social_personality personality.launch.py soul_file:=/my/SOUL.md
# Dynamic reconfigure at runtime:
# ros2 param set /personality_node soul_file /path/to/SOUL.md
# ros2 param set /personality_node publish_rate 5.0
# ── SOUL.md path ───────────────────────────────────────────────────────────────
# Path to the SOUL.md persona file. Supports hot-reload.
# Relative paths are resolved from the package share directory.
soul_file: "" # empty = use bundled default config/SOUL.md
# ── SQLite database ────────────────────────────────────────────────────────────
# Path for the per-person relationship memory database.
# Created on first run; persists across restarts.
db_path: "~/.ros/saltybot_personality.db"
# ── Hot-reload polling interval ────────────────────────────────────────────────
# How often (seconds) to check if SOUL.md has been modified.
# Lower = faster reactions to edits; higher = less disk I/O.
reload_interval: 5.0 # seconds
# ── Personality state publication rate ────────────────────────────────────────
# How often to publish /social/personality/state (PersonalityState).
publish_rate: 2.0 # Hz

View File

@ -0,0 +1,99 @@
"""
personality.launch.py Launch the saltybot personality node.
Usage
-----
# Defaults (bundled SOUL.md, ~/.ros/saltybot_personality.db):
ros2 launch saltybot_social_personality personality.launch.py
# Custom persona file:
ros2 launch saltybot_social_personality personality.launch.py \\
soul_file:=/home/robot/my_persona/SOUL.md
# Custom DB + faster reload:
ros2 launch saltybot_social_personality personality.launch.py \\
db_path:=/data/saltybot.db reload_interval:=2.0
# Use a params file:
ros2 launch saltybot_social_personality personality.launch.py \\
params_file:=/my/personality_params.yaml
Dynamic reconfigure (no restart required)
-----------------------------------------
ros2 param set /personality_node soul_file /new/SOUL.md
ros2 param set /personality_node publish_rate 5.0
"""
import os
from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument, OpaqueFunction
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def _launch_personality(context, *args, **kwargs):
pkg_share = get_package_share_directory("saltybot_social_personality")
params_file = LaunchConfiguration("params_file").perform(context)
soul_file = LaunchConfiguration("soul_file").perform(context)
db_path = LaunchConfiguration("db_path").perform(context)
# Default soul_file to bundled config if not specified
if not soul_file:
soul_file = os.path.join(pkg_share, "config", "SOUL.md")
# Expand ~ in db_path
if db_path:
db_path = os.path.expanduser(db_path)
inline_params = {
"soul_file": soul_file,
"db_path": db_path or os.path.expanduser("~/.ros/saltybot_personality.db"),
"reload_interval": float(LaunchConfiguration("reload_interval").perform(context)),
"publish_rate": float(LaunchConfiguration("publish_rate").perform(context)),
}
node_params = [params_file, inline_params] if params_file else [inline_params]
return [Node(
package = "saltybot_social_personality",
executable = "personality_node",
name = "personality_node",
output = "screen",
parameters = node_params,
)]
def generate_launch_description():
pkg_share = get_package_share_directory("saltybot_social_personality")
default_params = os.path.join(pkg_share, "config", "personality_params.yaml")
return LaunchDescription([
DeclareLaunchArgument(
"params_file",
default_value=default_params,
description="Full path to personality_params.yaml (base config)"),
DeclareLaunchArgument(
"soul_file",
default_value="",
description="Path to SOUL.md persona file (empty = bundled default)"),
DeclareLaunchArgument(
"db_path",
default_value="~/.ros/saltybot_personality.db",
description="SQLite relationship memory database path"),
DeclareLaunchArgument(
"reload_interval",
default_value="5.0",
description="SOUL.md hot-reload polling interval (s)"),
DeclareLaunchArgument(
"publish_rate",
default_value="2.0",
description="Personality state publish rate (Hz)"),
OpaqueFunction(function=_launch_personality),
])

View File

@ -0,0 +1,32 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>saltybot_social_personality</name>
<version>0.1.0</version>
<description>
SOUL.md-driven personality system for saltybot.
Loads a YAML/Markdown persona file, maintains per-person relationship memory
in SQLite, computes mood (happy/curious/annoyed/playful), personalises
greetings by tier (stranger/regular/favorite), and publishes personality
state on /social/personality/state. Supports SOUL.md hot-reload and full
ROS2 dynamic reconfigure. Issue #84.
</description>
<maintainer email="sl-controls@saltylab.local">sl-controls</maintainer>
<license>MIT</license>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>rcl_interfaces</depend>
<depend>saltybot_social_msgs</depend>
<buildtool_depend>ament_python</buildtool_depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,187 @@
"""
mood_engine.py Pure-function mood computation for the saltybot personality system.
No ROS2 imports safe to unit-test without a live ROS2 environment.
Public API
----------
compute_mood(soul, score, interaction_count, recent_events) -> str
get_relationship_tier(soul, interaction_count) -> str
build_greeting(soul, tier, mood, person_id) -> str
Mood semantics
--------------
happy : positive valence, comfortable familiarity
playful : high-energy, humorous (requires humor_level >= 7)
curious : low familiarity or novel person inquisitive
annoyed : recent negative events or very low score
Tier semantics
--------------
stranger : interaction_count < threshold_regular
regular : threshold_regular <= count < threshold_favorite
favorite : count >= threshold_favorite
"""
from __future__ import annotations
# ── Mood / tier constants ──────────────────────────────────────────────────────
MOOD_HAPPY = "happy"
MOOD_PLAYFUL = "playful"
MOOD_CURIOUS = "curious"
MOOD_ANNOYED = "annoyed"
TIER_STRANGER = "stranger"
TIER_REGULAR = "regular"
TIER_FAVORITE = "favorite"
# ── Event type constants (used by relationship_db and the node) ────────────────
EVENT_GREETING = "greeting"
EVENT_POSITIVE = "positive"
EVENT_NEGATIVE = "negative"
EVENT_DETECTION = "detection"
# How far back (seconds) to consider "recent" for mood computation
_RECENT_WINDOW_S = 120.0
# ── Mood computation ──────────────────────────────────────────────────────────
def compute_mood(
soul: dict,
score: float,
interaction_count: int,
recent_events: list,
) -> str:
"""Compute the current mood for a given person.
Parameters
----------
soul : dict
Parsed SOUL.md configuration.
score : float
Relationship score for the current person (higher = more familiar).
interaction_count : int
Total number of times we have seen this person.
recent_events : list of dict
Each dict: ``{"type": str, "dt": float}`` where ``dt`` is seconds ago.
Only events with ``dt < 120.0`` are considered "recent".
Returns
-------
str
One of: ``"happy"``, ``"playful"``, ``"curious"``, ``"annoyed"``.
"""
base_mood = soul.get("base_mood", MOOD_PLAYFUL)
humor_level = float(soul.get("humor_level", 5))
# Count recent negative/positive events
recent_neg = sum(
1 for e in recent_events
if e.get("type") == EVENT_NEGATIVE and e.get("dt", 1e9) < _RECENT_WINDOW_S
)
recent_pos = sum(
1 for e in recent_events
if e.get("type") in (EVENT_POSITIVE, EVENT_GREETING)
and e.get("dt", 1e9) < _RECENT_WINDOW_S
)
# Hard override: multiple negatives → annoyed
if recent_neg >= 2:
return MOOD_ANNOYED
# No prior interactions or brand-new person → curious
if interaction_count == 0 or score < 1.0:
return MOOD_CURIOUS
# Stranger tier (low count) → curious
threshold_regular = int(soul.get("threshold_regular", 5))
if interaction_count < threshold_regular:
return MOOD_CURIOUS
# Familiar person: check positive events and humor level
if recent_pos >= 1 or score >= 20.0:
if humor_level >= 7:
return MOOD_PLAYFUL
return MOOD_HAPPY
# High score / favorite
threshold_fav = int(soul.get("threshold_favorite", 20))
if interaction_count >= threshold_fav:
if humor_level >= 7:
return MOOD_PLAYFUL
return MOOD_HAPPY
return base_mood
# ── Tier classification ────────────────────────────────────────────────────────
def get_relationship_tier(soul: dict, interaction_count: int) -> str:
"""Return the relationship tier string for a given interaction count.
Parameters
----------
soul : dict
Parsed SOUL.md configuration.
interaction_count : int
Total number of times we have seen this person.
Returns
-------
str
One of: ``"stranger"``, ``"regular"``, ``"favorite"``.
"""
threshold_regular = int(soul.get("threshold_regular", 5))
threshold_favorite = int(soul.get("threshold_favorite", 20))
if interaction_count >= threshold_favorite:
return TIER_FAVORITE
if interaction_count >= threshold_regular:
return TIER_REGULAR
return TIER_STRANGER
# ── Greeting builder ──────────────────────────────────────────────────────────
def build_greeting(soul: dict, tier: str, mood: str, person_id: str = "") -> str:
"""Compose a greeting string for a person.
Parameters
----------
soul : dict
Parsed SOUL.md configuration.
tier : str
Relationship tier (``"stranger"``, ``"regular"``, ``"favorite"``).
mood : str
Current mood (used to prefix the greeting).
person_id : str
Person identifier / display name. Substituted for ``{name}``
in the template.
Returns
-------
str
A complete, ready-to-display greeting string.
"""
template_key = {
TIER_STRANGER: "greeting_stranger",
TIER_REGULAR: "greeting_regular",
TIER_FAVORITE: "greeting_favorite",
}.get(tier, "greeting_stranger")
template = soul.get(template_key, "Hello!")
base_greeting = template.replace("{name}", person_id or "friend")
prefix_key = f"mood_prefix_{mood}"
prefix = soul.get(prefix_key, "")
if prefix:
# Avoid double punctuation / duplicate capital letters
base_first = base_greeting[0].lower() if base_greeting else ""
greeting = f"{prefix}{base_first}{base_greeting[1:]}"
else:
greeting = base_greeting
return greeting

View File

@ -0,0 +1,349 @@
"""
personality_node.py ROS2 personality system for saltybot.
Overview
--------
Loads a SOUL.md persona file, maintains per-person relationship memory in
SQLite, computes mood, and publishes personality state. All tunable params
support ROS2 dynamic reconfigure (``ros2 param set``).
Subscriptions
-------------
/social/person_detected (std_msgs/String)
JSON payload: ``{"person_id": "alice", "event_type": "greeting",
"delta_score": 1.0}``
event_type defaults to "detection" if absent.
delta_score defaults to 0.0 if absent.
Publications
------------
/social/personality/state (saltybot_social_msgs/PersonalityState)
Published at ``publish_rate`` Hz.
Services
--------
/social/personality/query_mood (saltybot_social_msgs/QueryMood)
Query mood + greeting for any person_id.
Parameters (dynamic reconfigure via ros2 param set)
-------------------
soul_file (str) Path to SOUL.md persona file.
db_path (str) SQLite database file path.
reload_interval (float) How often to poll SOUL.md for changes (s).
publish_rate (float) State publication rate (Hz).
Usage
-----
ros2 launch saltybot_social_personality personality.launch.py
ros2 launch saltybot_social_personality personality.launch.py soul_file:=/my/SOUL.md
ros2 param set /personality_node soul_file /tmp/new_SOUL.md
"""
import json
import os
import rclpy
from rclpy.node import Node
from rcl_interfaces.msg import SetParametersResult
from std_msgs.msg import String, Header
from saltybot_social_msgs.msg import PersonalityState
from saltybot_social_msgs.srv import QueryMood
from .soul_loader import load_soul, SoulWatcher
from .mood_engine import (
compute_mood, get_relationship_tier, build_greeting,
EVENT_GREETING, EVENT_POSITIVE, EVENT_NEGATIVE, EVENT_DETECTION,
)
from .relationship_db import RelationshipDB
_DEFAULT_SOUL = os.path.join(
os.path.dirname(__file__), "..", "config", "SOUL.md"
)
_DEFAULT_DB = os.path.expanduser("~/.ros/saltybot_personality.db")
class PersonalityNode(Node):
def __init__(self):
super().__init__("personality_node")
# ── Parameters ────────────────────────────────────────────────────────
self.declare_parameter("soul_file", _DEFAULT_SOUL)
self.declare_parameter("db_path", _DEFAULT_DB)
self.declare_parameter("reload_interval", 5.0)
self.declare_parameter("publish_rate", 2.0)
self._p = {}
self._reload_ros_params()
# ── State ─────────────────────────────────────────────────────────────
self._soul = {}
self._current_person = "" # person_id currently being addressed
self._watcher = None
# ── Database ──────────────────────────────────────────────────────────
self._db = RelationshipDB(self._p["db_path"])
# ── Load initial SOUL.md ──────────────────────────────────────────────
self._load_soul_safe()
self._start_watcher()
# ── Dynamic reconfigure callback ─────────────────────────────────────
self.add_on_set_parameters_callback(self._on_params_changed)
# ── Subscriptions ─────────────────────────────────────────────────────
self.create_subscription(
String, "/social/person_detected", self._person_detected_cb, 10
)
# ── Publishers ────────────────────────────────────────────────────────
self._state_pub = self.create_publisher(
PersonalityState, "/social/personality/state", 10
)
# ── Services ──────────────────────────────────────────────────────────
self.create_service(
QueryMood,
"/social/personality/query_mood",
self._query_mood_cb,
)
# ── Timers ────────────────────────────────────────────────────────────
self._pub_timer = self.create_timer(
1.0 / self._p["publish_rate"], self._publish_state
)
self.get_logger().info(
f"PersonalityNode ready "
f"persona={self._soul.get('name', '?')!r} "
f"mood={self._current_mood()!r} "
f"db={self._p['db_path']!r}"
)
# ── Parameter helpers ──────────────────────────────────────────────────────
def _reload_ros_params(self):
self._p = {
"soul_file": self.get_parameter("soul_file").value,
"db_path": self.get_parameter("db_path").value,
"reload_interval": self.get_parameter("reload_interval").value,
"publish_rate": self.get_parameter("publish_rate").value,
}
def _on_params_changed(self, params):
"""Dynamic reconfigure — apply changed params without restarting node."""
for param in params:
if param.name == "soul_file":
# Restart watcher on new soul_file
self._stop_watcher()
self._p["soul_file"] = param.value
self._load_soul_safe()
self._start_watcher()
self.get_logger().info(f"soul_file changed → {param.value!r}")
elif param.name in self._p:
self._p[param.name] = param.value
if param.name == "publish_rate" and self._pub_timer:
self._pub_timer.cancel()
self._pub_timer = self.create_timer(
1.0 / max(0.1, param.value), self._publish_state
)
return SetParametersResult(successful=True)
# ── SOUL.md ────────────────────────────────────────────────────────────────
def _load_soul_safe(self):
try:
path = os.path.realpath(self._p["soul_file"])
self._soul = load_soul(path)
self.get_logger().info(
f"SOUL.md loaded: {self._soul.get('name', '?')!r} "
f"humor={self._soul.get('humor_level')} "
f"sass={self._soul.get('sass_level')} "
f"base_mood={self._soul.get('base_mood')!r}"
)
except Exception as exc:
self.get_logger().error(f"Failed to load SOUL.md: {exc}")
if not self._soul:
# Fall back to minimal defaults so the node stays alive
self._soul = {
"name": "Salty",
"humor_level": 5,
"sass_level": 3,
"base_mood": "curious",
"threshold_regular": 5,
"threshold_favorite": 20,
"greeting_stranger": "Hello!",
"greeting_regular": "Hi {name}!",
"greeting_favorite": "Hey {name}!!",
}
def _start_watcher(self):
if not self._soul:
return
self._watcher = SoulWatcher(
path=self._p["soul_file"],
on_reload=self._on_soul_reloaded,
interval=self._p["reload_interval"],
on_error=lambda exc: self.get_logger().warn(
f"SOUL.md hot-reload error: {exc}"
),
)
self._watcher.start()
def _stop_watcher(self):
if self._watcher:
self._watcher.stop()
self._watcher = None
def _on_soul_reloaded(self, soul: dict):
self._soul = soul
self.get_logger().info(
f"SOUL.md reloaded: persona={soul.get('name')!r} "
f"humor={soul.get('humor_level')} base_mood={soul.get('base_mood')!r}"
)
# ── Mood helpers ───────────────────────────────────────────────────────────
def _current_mood(self) -> str:
if not self._current_person or not self._soul:
return self._soul.get("base_mood", "curious") if self._soul else "curious"
person = self._db.get_person(self._current_person)
recent = self._db.get_recent_events(self._current_person, window_s=120.0)
return compute_mood(
soul = self._soul,
score = person["score"],
interaction_count = person["interaction_count"],
recent_events = recent,
)
def _state_for_person(self, person_id: str) -> dict:
"""Build a complete state dict for a given person_id."""
person = self._db.get_person(person_id) if person_id else {
"score": 0.0, "interaction_count": 0
}
recent = self._db.get_recent_events(person_id, window_s=120.0) if person_id else []
mood = compute_mood(
soul = self._soul,
score = person["score"],
interaction_count = person["interaction_count"],
recent_events = recent,
)
tier = get_relationship_tier(self._soul, person["interaction_count"])
greeting = build_greeting(self._soul, tier, mood, person_id)
return {
"person_id": person_id,
"mood": mood,
"tier": tier,
"score": person["score"],
"interaction_count": person["interaction_count"],
"greeting": greeting,
}
# ── Callbacks ──────────────────────────────────────────────────────────────
def _person_detected_cb(self, msg: String):
"""Handle incoming person detection / interaction event.
Expected JSON payload::
{
"person_id": "alice", # required
"event_type": "greeting", # optional, default "detection"
"delta_score": 1.0 # optional, default 0.0
}
"""
try:
data = json.loads(msg.data)
except json.JSONDecodeError as exc:
self.get_logger().warn(f"Bad JSON on /social/person_detected: {exc}")
return
person_id = data.get("person_id", "").strip()
if not person_id:
self.get_logger().warn("person_detected msg missing 'person_id'")
return
event_type = data.get("event_type", EVENT_DETECTION)
delta_score = float(data.get("delta_score", 0.0))
# Increment score by +1 for detection events automatically
if event_type == EVENT_DETECTION and delta_score == 0.0:
delta_score = 0.5
self._db.record_interaction(
person_id = person_id,
event_type = event_type,
details = {k: v for k, v in data.items()
if k not in ("person_id", "event_type", "delta_score")},
delta_score = delta_score,
)
self._current_person = person_id
def _query_mood_cb(self, request: QueryMood.Request, response: QueryMood.Response):
"""Service handler: return mood + greeting for a specific person."""
if not self._soul:
response.success = False
response.message = "SOUL.md not loaded"
return response
person_id = (request.person_id or self._current_person).strip()
state = self._state_for_person(person_id)
response.mood = state["mood"]
response.relationship_tier = state["tier"]
response.relationship_score = float(state["score"])
response.interaction_count = int(state["interaction_count"])
response.greeting_text = state["greeting"]
response.success = True
response.message = ""
return response
# ── Publish ────────────────────────────────────────────────────────────────
def _publish_state(self):
if not self._soul:
return
state = self._state_for_person(self._current_person)
msg = PersonalityState()
msg.header = Header()
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = "personality"
msg.persona_name = str(self._soul.get("name", "Salty"))
msg.mood = state["mood"]
msg.person_id = state["person_id"]
msg.relationship_tier = state["tier"]
msg.relationship_score = float(state["score"])
msg.interaction_count = int(state["interaction_count"])
msg.greeting_text = state["greeting"]
self._state_pub.publish(msg)
# ── Lifecycle ──────────────────────────────────────────────────────────────
def destroy_node(self):
self._stop_watcher()
self._db.close()
super().destroy_node()
# ── Entry point ────────────────────────────────────────────────────────────────
def main(args=None):
rclpy.init(args=args)
node = PersonalityNode()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.try_shutdown()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,297 @@
"""
relationship_db.py SQLite-backed per-person relationship memory.
No ROS2 imports safe to unit-test without a live ROS2 environment.
Schema
------
people (
person_id TEXT PRIMARY KEY,
score REAL DEFAULT 0.0,
interaction_count INTEGER DEFAULT 0,
first_seen TEXT, -- ISO-8601 UTC timestamp
last_seen TEXT, -- ISO-8601 UTC timestamp
prefs TEXT -- JSON blob for learned preferences
)
interactions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
person_id TEXT,
ts TEXT, -- ISO-8601 UTC timestamp
event_type TEXT, -- greeting | positive | negative | detection
details TEXT -- free-form JSON blob
)
Public API
----------
RelationshipDB(db_path)
.get_person(person_id) -> dict
.record_interaction(person_id, event_type, details, delta_score)
.set_pref(person_id, key, value)
.get_pref(person_id, key, default)
.get_recent_events(person_id, window_s) -> list[dict]
.all_people() -> list[dict]
.close()
"""
from __future__ import annotations
import json
import os
import sqlite3
import threading
from datetime import datetime, timezone
def _now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
class RelationshipDB:
"""Thread-safe SQLite relationship store.
Parameters
----------
db_path : str
Path to the SQLite file. Created (with parent dirs) if absent.
"""
def __init__(self, db_path: str):
parent = os.path.dirname(db_path)
if parent:
os.makedirs(parent, exist_ok=True)
self._path = db_path
self._lock = threading.Lock()
self._conn = sqlite3.connect(db_path, check_same_thread=False)
self._conn.row_factory = sqlite3.Row
self._migrate()
# ── Schema ────────────────────────────────────────────────────────────────
def _migrate(self):
with self._conn:
self._conn.executescript("""
CREATE TABLE IF NOT EXISTS people (
person_id TEXT PRIMARY KEY,
score REAL DEFAULT 0.0,
interaction_count INTEGER DEFAULT 0,
first_seen TEXT,
last_seen TEXT,
prefs TEXT DEFAULT '{}'
);
CREATE TABLE IF NOT EXISTS interactions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
person_id TEXT NOT NULL,
ts TEXT NOT NULL,
event_type TEXT NOT NULL,
details TEXT DEFAULT '{}'
);
CREATE INDEX IF NOT EXISTS idx_interactions_person_ts
ON interactions (person_id, ts);
""")
# ── People ────────────────────────────────────────────────────────────────
def get_person(self, person_id: str) -> dict:
"""Return the person record; inserts a default row if not found.
Returns
-------
dict with keys: person_id, score, interaction_count,
first_seen, last_seen, prefs (dict)
"""
with self._lock:
row = self._conn.execute(
"SELECT * FROM people WHERE person_id = ?", (person_id,)
).fetchone()
if row is None:
now = _now_iso()
self._conn.execute(
"INSERT INTO people (person_id, first_seen, last_seen) VALUES (?,?,?)",
(person_id, now, now),
)
self._conn.commit()
return {
"person_id": person_id,
"score": 0.0,
"interaction_count": 0,
"first_seen": now,
"last_seen": now,
"prefs": {},
}
prefs = {}
try:
prefs = json.loads(row["prefs"] or "{}")
except json.JSONDecodeError:
pass
return {
"person_id": row["person_id"],
"score": float(row["score"]),
"interaction_count": int(row["interaction_count"]),
"first_seen": row["first_seen"],
"last_seen": row["last_seen"],
"prefs": prefs,
}
def all_people(self) -> list:
"""Return all person records as a list of dicts."""
with self._lock:
rows = self._conn.execute("SELECT * FROM people ORDER BY score DESC").fetchall()
result = []
for row in rows:
prefs = {}
try:
prefs = json.loads(row["prefs"] or "{}")
except json.JSONDecodeError:
pass
result.append({
"person_id": row["person_id"],
"score": float(row["score"]),
"interaction_count": int(row["interaction_count"]),
"first_seen": row["first_seen"],
"last_seen": row["last_seen"],
"prefs": prefs,
})
return result
# ── Interactions ──────────────────────────────────────────────────────────
def record_interaction(
self,
person_id: str,
event_type: str,
details: dict | None = None,
delta_score: float = 0.0,
):
"""Record an interaction event and update the person's score.
Parameters
----------
person_id : str
event_type : str
One of: ``"greeting"``, ``"positive"``, ``"negative"``,
``"detection"``.
details : dict, optional
Arbitrary key/value data stored as JSON.
delta_score : float
Amount to add to the person's score (can be negative).
Interaction count is always incremented by 1.
"""
now = _now_iso()
details_json = json.dumps(details or {})
with self._lock:
# Ensure person exists
self.get_person.__wrapped__(self, person_id) if hasattr(
self.get_person, "__wrapped__"
) else None
# Upsert person row
self._conn.execute("""
INSERT INTO people (person_id, first_seen, last_seen)
VALUES (?, ?, ?)
ON CONFLICT(person_id) DO UPDATE SET
last_seen = excluded.last_seen
""", (person_id, now, now))
# Increment count + score
self._conn.execute("""
UPDATE people
SET interaction_count = interaction_count + 1,
score = score + ?,
last_seen = ?
WHERE person_id = ?
""", (delta_score, now, person_id))
# Insert interaction log row
self._conn.execute("""
INSERT INTO interactions (person_id, ts, event_type, details)
VALUES (?, ?, ?, ?)
""", (person_id, now, event_type, details_json))
self._conn.commit()
def get_recent_events(self, person_id: str, window_s: float = 120.0) -> list:
"""Return interaction events for *person_id* within the last *window_s* seconds.
Returns
-------
list of dict
Each dict: ``{"type": str, "dt": float, "ts": str, "details": dict}``
where ``dt`` is seconds ago (positive = in the past).
"""
from datetime import timedelta
cutoff = (
datetime.now(timezone.utc) - timedelta(seconds=window_s)
).isoformat()
with self._lock:
rows = self._conn.execute("""
SELECT ts, event_type, details FROM interactions
WHERE person_id = ? AND ts >= ?
ORDER BY ts DESC
""", (person_id, cutoff)).fetchall()
now_dt = datetime.now(timezone.utc)
result = []
for row in rows:
try:
row_dt = datetime.fromisoformat(row["ts"])
# Make timezone-aware if needed
if row_dt.tzinfo is None:
row_dt = row_dt.replace(tzinfo=timezone.utc)
dt_secs = (now_dt - row_dt).total_seconds()
except (ValueError, TypeError):
dt_secs = window_s
details = {}
try:
details = json.loads(row["details"] or "{}")
except json.JSONDecodeError:
pass
result.append({
"type": row["event_type"],
"dt": dt_secs,
"ts": row["ts"],
"details": details,
})
return result
# ── Preferences ───────────────────────────────────────────────────────────
def set_pref(self, person_id: str, key: str, value):
"""Set a learned preference for a person.
Parameters
----------
person_id, key : str
value : JSON-serialisable
"""
person = self.get_person(person_id)
prefs = person["prefs"]
prefs[key] = value
with self._lock:
self._conn.execute(
"UPDATE people SET prefs = ? WHERE person_id = ?",
(json.dumps(prefs), person_id),
)
self._conn.commit()
def get_pref(self, person_id: str, key: str, default=None):
"""Return a specific learned preference for a person."""
return self.get_person(person_id)["prefs"].get(key, default)
# ── Lifecycle ─────────────────────────────────────────────────────────────
def close(self):
"""Close the database connection."""
with self._lock:
self._conn.close()

View File

@ -0,0 +1,196 @@
"""
soul_loader.py SOUL.md persona parser and hot-reload watcher.
SOUL.md format
--------------
The file uses YAML front-matter (delimited by ``---`` lines) with an optional
Markdown description body that is ignored by the parser. Example::
---
name: "Salty"
humor_level: 7
sass_level: 4
base_mood: "playful"
...
---
# Optional description text (ignored)
Public API
----------
load_soul(path) -> dict (raises on parse error)
SoulWatcher(path, cb, interval)
.start()
.stop()
.reload_now() -> dict
Pure module no ROS2 imports.
"""
import os
import re
import threading
import time
import yaml
# Keys that are required in every SOUL.md file
_REQUIRED_KEYS = {
"name",
"humor_level",
"sass_level",
"base_mood",
"threshold_regular",
"threshold_favorite",
"greeting_stranger",
"greeting_regular",
"greeting_favorite",
}
_VALID_MOODS = {"happy", "curious", "annoyed", "playful"}
def _extract_frontmatter(text: str) -> str:
"""Return the YAML block between the first pair of ``---`` delimiters.
Raises ``ValueError`` if the file does not contain valid front-matter.
"""
lines = text.splitlines()
delimiters = [i for i, l in enumerate(lines) if l.strip() == "---"]
if len(delimiters) < 2:
# No delimiter found — treat the whole file as plain YAML
return text
start = delimiters[0] + 1
end = delimiters[1]
return "\n".join(lines[start:end])
def load_soul(path: str) -> dict:
"""Parse a SOUL.md file and return the validated config dict.
Parameters
----------
path : str
Absolute path to the SOUL.md file.
Returns
-------
dict
Validated persona configuration.
Raises
------
FileNotFoundError
If the file does not exist.
ValueError
If the YAML is malformed or required keys are missing.
"""
if not os.path.isfile(path):
raise FileNotFoundError(f"SOUL.md not found: {path}")
with open(path, "r", encoding="utf-8") as fh:
raw = fh.read()
yaml_text = _extract_frontmatter(raw)
try:
data = yaml.safe_load(yaml_text)
except yaml.YAMLError as exc:
raise ValueError(f"SOUL.md YAML parse error in {path}: {exc}") from exc
if not isinstance(data, dict):
raise ValueError(f"SOUL.md top level must be a YAML mapping, got {type(data)}")
# Validate required keys
missing = _REQUIRED_KEYS - data.keys()
if missing:
raise ValueError(f"SOUL.md missing required keys: {sorted(missing)}")
# Validate ranges
for key in ("humor_level", "sass_level"):
val = data.get(key)
if not isinstance(val, (int, float)) or not (0 <= val <= 10):
raise ValueError(f"SOUL.md '{key}' must be a number 010, got {val!r}")
if data.get("base_mood") not in _VALID_MOODS:
raise ValueError(
f"SOUL.md 'base_mood' must be one of {sorted(_VALID_MOODS)}, "
f"got {data.get('base_mood')!r}"
)
for key in ("threshold_regular", "threshold_favorite"):
val = data.get(key)
if not isinstance(val, int) or val < 0:
raise ValueError(f"SOUL.md '{key}' must be a non-negative integer, got {val!r}")
if data["threshold_regular"] > data["threshold_favorite"]:
raise ValueError(
"SOUL.md 'threshold_regular' must be <= 'threshold_favorite'"
)
return data
class SoulWatcher:
"""Background thread that polls SOUL.md for changes and calls a callback.
Parameters
----------
path : str
Path to the SOUL.md file to watch.
on_reload : callable
``on_reload(soul_dict)`` called whenever a valid new SOUL.md is loaded.
interval : float
Polling interval in seconds (default 5.0).
on_error : callable, optional
``on_error(exception)`` called when a reload attempt fails.
"""
def __init__(self, path: str, on_reload, interval: float = 5.0, on_error=None):
self._path = path
self._on_reload = on_reload
self._interval = interval
self._on_error = on_error
self._thread = None
self._stop_evt = threading.Event()
self._last_mtime = 0.0
# ------------------------------------------------------------------
def start(self):
"""Start the background polling thread."""
if self._thread and self._thread.is_alive():
return
self._stop_evt.clear()
self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()
def stop(self):
"""Signal the watcher thread to stop and block until it exits."""
self._stop_evt.set()
if self._thread:
self._thread.join(timeout=self._interval + 1.0)
def reload_now(self) -> dict:
"""Force an immediate reload and return the new soul dict."""
soul = load_soul(self._path)
self._last_mtime = os.path.getmtime(self._path)
self._on_reload(soul)
return soul
# ------------------------------------------------------------------
def _run(self):
while not self._stop_evt.wait(self._interval):
try:
mtime = os.path.getmtime(self._path)
except OSError:
continue
if mtime != self._last_mtime:
try:
soul = load_soul(self._path)
except (FileNotFoundError, ValueError) as exc:
if self._on_error:
self._on_error(exc)
continue
self._last_mtime = mtime
self._on_reload(soul)

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/saltybot_social_personality
[install]
install_scripts=$base/lib/saltybot_social_personality

View File

@ -0,0 +1,28 @@
from setuptools import setup
package_name = "saltybot_social_personality"
setup(
name=package_name,
version="0.1.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(f"share/{package_name}/launch", ["launch/personality.launch.py"]),
(f"share/{package_name}/config", ["config/SOUL.md",
"config/personality_params.yaml"]),
],
install_requires=["setuptools", "pyyaml"],
zip_safe=True,
maintainer="sl-controls",
maintainer_email="sl-controls@saltylab.local",
description="SOUL.md-driven personality system for saltybot social interaction",
license="MIT",
tests_require=["pytest"],
entry_points={
"console_scripts": [
"personality_node = saltybot_social_personality.personality_node:main",
],
},
)

View File

@ -0,0 +1,475 @@
"""
test_personality.py Unit tests for the saltybot personality system.
No ROS2 runtime required. Tests pure functions from:
- soul_loader.py
- mood_engine.py
- relationship_db.py
Run with:
pytest jetson/ros2_ws/src/saltybot_social_personality/test/test_personality.py
"""
import os
import tempfile
import textwrap
import pytest
# ── Imports (pure modules, no ROS2) ──────────────────────────────────────────
import sys
sys.path.insert(
0,
os.path.join(os.path.dirname(__file__), "..", "saltybot_social_personality"),
)
from soul_loader import load_soul, _extract_frontmatter
from mood_engine import (
compute_mood, get_relationship_tier, build_greeting,
MOOD_HAPPY, MOOD_PLAYFUL, MOOD_CURIOUS, MOOD_ANNOYED,
TIER_STRANGER, TIER_REGULAR, TIER_FAVORITE,
EVENT_NEGATIVE, EVENT_POSITIVE, EVENT_GREETING,
)
from relationship_db import RelationshipDB
# ── Helpers ───────────────────────────────────────────────────────────────────
def _minimal_soul(**overrides) -> dict:
"""Return a valid minimal soul dict, optionally overriding keys."""
base = {
"name": "Salty",
"humor_level": 7,
"sass_level": 4,
"base_mood": "playful",
"threshold_regular": 5,
"threshold_favorite": 20,
"greeting_stranger": "Hello there!",
"greeting_regular": "Hey {name}!",
"greeting_favorite": "Oh hey {name}!!",
}
base.update(overrides)
return base
def _write_soul(content: str) -> str:
"""Write a SOUL.md string to a temp file and return the path."""
fh = tempfile.NamedTemporaryFile(
mode="w", suffix=".md", delete=False, encoding="utf-8"
)
fh.write(content)
fh.close()
return fh.name
_VALID_SOUL_CONTENT = textwrap.dedent("""\
---
name: "TestBot"
speaking_style: "casual"
humor_level: 7
sass_level: 3
base_mood: "playful"
threshold_regular: 5
threshold_favorite: 20
greeting_stranger: "Hello stranger!"
greeting_regular: "Hey {name}!"
greeting_favorite: "Oh hey {name}!!"
mood_prefix_playful: "Beep boop! "
mood_prefix_happy: "Great — "
mood_prefix_curious: "Hmm, "
mood_prefix_annoyed: "Ugh, "
---
# Description (ignored)
This is the description body.
""")
# ═══════════════════════════════════════════════════════════════════════════════
# soul_loader tests
# ═══════════════════════════════════════════════════════════════════════════════
class TestExtractFrontmatter:
def test_delimited(self):
content = "---\nkey: val\n---\n# body"
assert _extract_frontmatter(content) == "key: val"
def test_no_delimiters_returns_whole(self):
content = "key: val\nother: 123"
assert _extract_frontmatter(content) == content
def test_single_delimiter_returns_whole(self):
content = "---\nkey: val\n"
result = _extract_frontmatter(content)
assert "key: val" in result
def test_body_stripped(self):
content = "---\nname: X\n---\n# Body text\nMore body"
assert "Body text" not in _extract_frontmatter(content)
assert "name: X" in _extract_frontmatter(content)
class TestLoadSoul:
def test_valid_file_loads(self):
path = _write_soul(_VALID_SOUL_CONTENT)
try:
soul = load_soul(path)
assert soul["name"] == "TestBot"
assert soul["humor_level"] == 7
assert soul["base_mood"] == "playful"
finally:
os.unlink(path)
def test_missing_file_raises(self):
with pytest.raises(FileNotFoundError):
load_soul("/nonexistent/SOUL.md")
def test_missing_required_key_raises(self):
content = "---\nname: X\nhumor_level: 5\n---" # missing many keys
path = _write_soul(content)
try:
with pytest.raises(ValueError, match="missing required keys"):
load_soul(path)
finally:
os.unlink(path)
def test_humor_out_of_range_raises(self):
soul_str = _VALID_SOUL_CONTENT.replace("humor_level: 7", "humor_level: 11")
path = _write_soul(soul_str)
try:
with pytest.raises(ValueError, match="humor_level"):
load_soul(path)
finally:
os.unlink(path)
def test_invalid_mood_raises(self):
soul_str = _VALID_SOUL_CONTENT.replace(
'base_mood: "playful"', 'base_mood: "grumpy"'
)
path = _write_soul(soul_str)
try:
with pytest.raises(ValueError, match="base_mood"):
load_soul(path)
finally:
os.unlink(path)
def test_threshold_order_enforced(self):
soul_str = _VALID_SOUL_CONTENT.replace(
"threshold_regular: 5", "threshold_regular: 25"
)
path = _write_soul(soul_str)
try:
with pytest.raises(ValueError, match="threshold_regular"):
load_soul(path)
finally:
os.unlink(path)
def test_extra_keys_allowed(self):
content = _VALID_SOUL_CONTENT.replace(
"---\n# Description",
"custom_key: 42\n---\n# Description"
)
path = _write_soul(content)
try:
soul = load_soul(path)
assert soul.get("custom_key") == 42
finally:
os.unlink(path)
# ═══════════════════════════════════════════════════════════════════════════════
# mood_engine tests
# ═══════════════════════════════════════════════════════════════════════════════
class TestGetRelationshipTier:
def test_zero_interactions_stranger(self):
soul = _minimal_soul(threshold_regular=5, threshold_favorite=20)
assert get_relationship_tier(soul, 0) == TIER_STRANGER
def test_below_regular_stranger(self):
soul = _minimal_soul(threshold_regular=5, threshold_favorite=20)
assert get_relationship_tier(soul, 4) == TIER_STRANGER
def test_at_regular_threshold(self):
soul = _minimal_soul(threshold_regular=5, threshold_favorite=20)
assert get_relationship_tier(soul, 5) == TIER_REGULAR
def test_above_regular_below_favorite(self):
soul = _minimal_soul(threshold_regular=5, threshold_favorite=20)
assert get_relationship_tier(soul, 10) == TIER_REGULAR
def test_at_favorite_threshold(self):
soul = _minimal_soul(threshold_regular=5, threshold_favorite=20)
assert get_relationship_tier(soul, 20) == TIER_FAVORITE
def test_above_favorite(self):
soul = _minimal_soul(threshold_regular=5, threshold_favorite=20)
assert get_relationship_tier(soul, 100) == TIER_FAVORITE
class TestComputeMood:
def test_unknown_person_returns_curious(self):
soul = _minimal_soul(humor_level=7)
mood = compute_mood(soul, score=0.0, interaction_count=0, recent_events=[])
assert mood == MOOD_CURIOUS
def test_stranger_low_count_returns_curious(self):
soul = _minimal_soul(threshold_regular=5)
mood = compute_mood(soul, score=2.0, interaction_count=3, recent_events=[])
assert mood == MOOD_CURIOUS
def test_two_negative_events_returns_annoyed(self):
soul = _minimal_soul(threshold_regular=5)
events = [
{"type": EVENT_NEGATIVE, "dt": 30.0},
{"type": EVENT_NEGATIVE, "dt": 60.0},
]
mood = compute_mood(soul, score=10.0, interaction_count=10, recent_events=events)
assert mood == MOOD_ANNOYED
def test_one_negative_not_annoyed(self):
soul = _minimal_soul(humor_level=7, threshold_regular=5, threshold_favorite=20)
events = [{"type": EVENT_NEGATIVE, "dt": 30.0}]
# 1 negative is not enough → should still be happy/playful based on score
mood = compute_mood(soul, score=25.0, interaction_count=25, recent_events=events)
assert mood != MOOD_ANNOYED
def test_high_humor_regular_returns_playful(self):
soul = _minimal_soul(humor_level=8, threshold_regular=5, threshold_favorite=20)
events = [{"type": EVENT_POSITIVE, "dt": 10.0}]
mood = compute_mood(soul, score=10.0, interaction_count=8, recent_events=events)
assert mood == MOOD_PLAYFUL
def test_low_humor_regular_returns_happy(self):
soul = _minimal_soul(humor_level=4, threshold_regular=5, threshold_favorite=20)
events = [{"type": EVENT_POSITIVE, "dt": 10.0}]
mood = compute_mood(soul, score=10.0, interaction_count=8, recent_events=events)
assert mood == MOOD_HAPPY
def test_stale_negative_ignored(self):
soul = _minimal_soul(humor_level=8, threshold_regular=5, threshold_favorite=20)
# dt > 120s → outside the recent window → should not trigger annoyed
events = [
{"type": EVENT_NEGATIVE, "dt": 200.0},
{"type": EVENT_NEGATIVE, "dt": 300.0},
]
mood = compute_mood(soul, score=15.0, interaction_count=10, recent_events=events)
assert mood != MOOD_ANNOYED
def test_favorite_high_humor_playful(self):
soul = _minimal_soul(humor_level=9, threshold_regular=5, threshold_favorite=20)
mood = compute_mood(soul, score=50.0, interaction_count=30, recent_events=[])
assert mood == MOOD_PLAYFUL
def test_favorite_low_humor_happy(self):
soul = _minimal_soul(humor_level=3, threshold_regular=5, threshold_favorite=20)
mood = compute_mood(soul, score=50.0, interaction_count=30, recent_events=[])
assert mood == MOOD_HAPPY
class TestBuildGreeting:
def _soul(self, **kw):
return _minimal_soul(
mood_prefix_happy="Great — ",
mood_prefix_curious="Hmm, ",
mood_prefix_annoyed="Well, ",
mood_prefix_playful="Beep boop! ",
**kw,
)
def test_stranger_greeting(self):
soul = self._soul()
g = build_greeting(soul, TIER_STRANGER, MOOD_CURIOUS, "")
assert "hello" in g.lower()
def test_regular_greeting_contains_name(self):
soul = self._soul()
g = build_greeting(soul, TIER_REGULAR, MOOD_HAPPY, "alice")
assert "alice" in g
def test_favorite_greeting_contains_name(self):
soul = self._soul()
g = build_greeting(soul, TIER_FAVORITE, MOOD_PLAYFUL, "bob")
assert "bob" in g
def test_mood_prefix_applied(self):
soul = self._soul()
g = build_greeting(soul, TIER_REGULAR, MOOD_PLAYFUL, "alice")
assert g.startswith("Beep boop!")
def test_no_prefix_key_no_prefix(self):
soul = _minimal_soul() # no mood_prefix_* keys
g = build_greeting(soul, TIER_REGULAR, MOOD_HAPPY, "alice")
assert g.startswith("Hey")
def test_empty_person_id_uses_friend(self):
soul = self._soul()
g = build_greeting(soul, TIER_REGULAR, MOOD_HAPPY, "")
assert "friend" in g
def test_happy_prefix(self):
soul = self._soul()
g = build_greeting(soul, TIER_REGULAR, MOOD_HAPPY, "carol")
assert g.startswith("Great")
def test_annoyed_prefix(self):
soul = self._soul()
g = build_greeting(soul, TIER_REGULAR, MOOD_ANNOYED, "dave")
assert g.startswith("Well")
# ═══════════════════════════════════════════════════════════════════════════════
# relationship_db tests
# ═══════════════════════════════════════════════════════════════════════════════
class TestRelationshipDB:
@pytest.fixture
def db(self, tmp_path):
path = str(tmp_path / "test.db")
d = RelationshipDB(path)
yield d
d.close()
def test_get_person_creates_default(self, db):
p = db.get_person("alice")
assert p["person_id"] == "alice"
assert p["score"] == pytest.approx(0.0)
assert p["interaction_count"] == 0
def test_get_person_idempotent(self, db):
p1 = db.get_person("bob")
p2 = db.get_person("bob")
assert p1["person_id"] == p2["person_id"]
def test_record_interaction_increments_count(self, db):
db.record_interaction("alice", "detection")
db.record_interaction("alice", "detection")
p = db.get_person("alice")
assert p["interaction_count"] == 2
def test_record_interaction_updates_score(self, db):
db.record_interaction("alice", "positive", delta_score=5.0)
p = db.get_person("alice")
assert p["score"] == pytest.approx(5.0)
def test_negative_delta_reduces_score(self, db):
db.record_interaction("carol", "positive", delta_score=10.0)
db.record_interaction("carol", "negative", delta_score=-3.0)
p = db.get_person("carol")
assert p["score"] == pytest.approx(7.0)
def test_score_zero_by_default(self, db):
p = db.get_person("dave")
assert p["score"] == pytest.approx(0.0)
def test_set_and_get_pref(self, db):
db.set_pref("alice", "language", "en")
assert db.get_pref("alice", "language") == "en"
def test_get_pref_default(self, db):
assert db.get_pref("nobody", "language", "fr") == "fr"
def test_multiple_prefs_stored(self, db):
db.set_pref("alice", "lang", "en")
db.set_pref("alice", "name", "Alice")
assert db.get_pref("alice", "lang") == "en"
assert db.get_pref("alice", "name") == "Alice"
def test_all_people_returns_list(self, db):
db.record_interaction("a", "detection")
db.record_interaction("b", "detection")
people = db.all_people()
ids = {p["person_id"] for p in people}
assert {"a", "b"} <= ids
def test_get_recent_events_returns_events(self, db):
db.record_interaction("alice", "greeting", delta_score=1.0)
events = db.get_recent_events("alice", window_s=60.0)
assert len(events) == 1
assert events[0]["type"] == "greeting"
def test_get_recent_events_empty_for_new_person(self, db):
events = db.get_recent_events("nobody", window_s=60.0)
assert events == []
def test_event_dt_positive(self, db):
db.record_interaction("alice", "detection")
events = db.get_recent_events("alice", window_s=60.0)
assert events[0]["dt"] >= 0.0
def test_multiple_people_isolated(self, db):
db.record_interaction("alice", "positive", delta_score=10.0)
db.record_interaction("bob", "negative", delta_score=-5.0)
assert db.get_person("alice")["score"] == pytest.approx(10.0)
assert db.get_person("bob")["score"] == pytest.approx(-5.0)
def test_details_stored(self, db):
db.record_interaction("alice", "greeting", details={"location": "lab"})
events = db.get_recent_events("alice", window_s=60.0)
assert events[0]["details"].get("location") == "lab"
# ═══════════════════════════════════════════════════════════════════════════════
# Integration: soul → tier → mood → greeting pipeline
# ═══════════════════════════════════════════════════════════════════════════════
class TestIntegrationPipeline:
def test_stranger_pipeline(self, tmp_path):
db_path = str(tmp_path / "int.db")
db = RelationshipDB(db_path)
soul = _minimal_soul(
humor_level=7, threshold_regular=5, threshold_favorite=20,
mood_prefix_curious="Hmm, "
)
# No prior interactions
person = db.get_person("stranger_001")
events = db.get_recent_events("stranger_001")
tier = get_relationship_tier(soul, person["interaction_count"])
mood = compute_mood(soul, person["score"], person["interaction_count"], events)
greeting = build_greeting(soul, tier, mood, "stranger_001")
assert tier == TIER_STRANGER
assert mood == MOOD_CURIOUS
assert "hello" in greeting.lower()
db.close()
def test_regular_positive_pipeline(self, tmp_path):
db_path = str(tmp_path / "int2.db")
db = RelationshipDB(db_path)
soul = _minimal_soul(
humor_level=8, threshold_regular=5, threshold_favorite=20,
mood_prefix_playful="Beep! "
)
# Simulate 6 positive interactions (> threshold_regular=5)
for _ in range(6):
db.record_interaction("alice", "positive", delta_score=2.0)
person = db.get_person("alice")
events = db.get_recent_events("alice")
tier = get_relationship_tier(soul, person["interaction_count"])
mood = compute_mood(soul, person["score"], person["interaction_count"], events)
greeting = build_greeting(soul, tier, mood, "alice")
assert tier == TIER_REGULAR
assert mood == MOOD_PLAYFUL # humor_level=8, recent positive events
assert "alice" in greeting
assert greeting.startswith("Beep!")
db.close()
def test_favorite_pipeline(self, tmp_path):
db_path = str(tmp_path / "int3.db")
db = RelationshipDB(db_path)
soul = _minimal_soul(
humor_level=5, threshold_regular=5, threshold_favorite=20
)
for _ in range(25):
db.record_interaction("bob", "positive", delta_score=1.0)
person = db.get_person("bob")
tier = get_relationship_tier(soul, person["interaction_count"])
assert tier == TIER_FAVORITE
greeting = build_greeting(soul, tier, "happy", "bob")
assert "bob" in greeting
assert "Oh hey" in greeting
db.close()