""" Two-level identity context — user + persona, modelled on OS home directories. Layout on disk: home/ scott/ persona/ inara/ ← IDENTITY.md, SOUL.md, sessions/, TASKS.json, … abc/ ← a second persona for the same user holly/ persona/ tina/ Each HTTP request sets both user and persona via set_context() at entry. Everything downstream calls persona_path() to get the right directory. Background tasks (cron, distiller) pass both names explicitly. Naming rules — same as Linux usernames: ^[a-z_][a-z0-9_-]{0,31}$ Lowercase, start with letter or underscore, max 32 chars. Examples: scott, holly, whatever_name_asian-v3 Future Aether integration: (user, persona) maps to (account_id, persona_id). Replace the directory lookups in persona_path() / validate() with DB lookups; the ContextVar contract stays identical. """ import re from contextvars import ContextVar from pathlib import Path from config import settings _user: ContextVar[str] = ContextVar("user", default="scott") _persona: ContextVar[str] = ContextVar("persona", default="inara") # Same rules as Linux usernames. _VALID = re.compile(r"^[a-z_][a-z0-9_-]{0,31}$") # --------------------------------------------------------------------------- # Context setters / getters # --------------------------------------------------------------------------- def set_context(username: str, persona_name: str) -> None: """Set the active user + persona for the current async task/coroutine.""" _user.set(username) _persona.set(persona_name) def get_user() -> str: return _user.get() def get_persona() -> str: return _persona.get() # --------------------------------------------------------------------------- # Path resolution # --------------------------------------------------------------------------- def persona_path(username: str | None = None, name: str | None = None) -> Path: """ Return the filesystem path for a persona's data directory. home/{username}/persona/{name}/ If either arg is omitted, falls back to the ContextVar for the current request. Pass both explicitly for background tasks (cron, distiller). """ u = username or _user.get() p = name or _persona.get() return settings.home_root() / u / "persona" / p # --------------------------------------------------------------------------- # Discovery # --------------------------------------------------------------------------- def list_users() -> list[str]: """Return all usernames that have at least one persona.""" root = settings.home_root() if not root.exists(): return [] return sorted( d.name for d in root.iterdir() if d.is_dir() and (d / "persona").is_dir() ) def list_user_personas(username: str) -> list[str]: """Return all persona names for a given user (must have IDENTITY.md).""" persona_root = settings.home_root() / username / "persona" if not persona_root.exists(): return [] return sorted( d.name for d in persona_root.iterdir() if d.is_dir() and (d / "IDENTITY.md").exists() ) # --------------------------------------------------------------------------- # Validation # --------------------------------------------------------------------------- def _check_name(name: str, label: str) -> None: if not _VALID.match(name): raise ValueError( f"Invalid {label} {name!r}. " f"Use lowercase letters, digits, underscores, or hyphens " f"(must start with letter/underscore, max 32 chars)." ) def validate(username: str, persona_name: str) -> tuple[str, str]: """ Validate a (username, persona_name) pair from an untrusted source. Returns (username, persona_name) if valid, raises ValueError otherwise. Checks format first (blocks path traversal), then existence. """ _check_name(username, "username") _check_name(persona_name, "persona") user_dir = settings.home_root() / username if not user_dir.is_dir(): raise ValueError(f"Unknown user: {username!r}") persona_dir = user_dir / "persona" / persona_name if not (persona_dir / "IDENTITY.md").exists(): raise ValueError(f"Unknown persona {persona_name!r} for user {username!r}") return username, persona_name