"""Session persistence: one JSON file per session under ``persona_path()/session_data``."""

import json
import random
import uuid
from datetime import datetime
from pathlib import Path

from config import settings
from persona import persona_path

_ADJECTIVES = [
    "amber", "azure", "bold", "bright", "calm", "cedar", "cobalt", "coral",
    "crisp", "dusk", "ember", "fern", "frost", "gold", "hazy", "indigo",
    "jade", "keen", "lark", "lunar", "maple", "misty", "noble", "north",
    "oak", "onyx", "opal", "pine", "quiet", "raven", "ridge", "river",
    "sage", "silver", "slate", "solar", "steel", "stone", "swift", "teal",
    "timber", "vale", "velvet", "violet", "warm", "wild", "winter", "wren",
]

_NOUNS = [
    "bay", "bloom", "brook", "canyon", "cave", "cliff", "cloud", "coast",
    "creek", "dale", "dawn", "delta", "dune", "echo", "falls", "field",
    "fjord", "flare", "glade", "glen", "grove", "harbor", "haven", "hill",
    "hollow", "isle", "lake", "ledge", "marsh", "meadow", "mist", "moon",
    "moor", "peak", "plain", "pond", "reef", "ridge", "rise", "rock",
    "shore", "sky", "slope", "spring", "star", "storm", "trail", "vale",
    "wave", "wood",
]


def generate_session_id() -> str:
    """Return a readable slug like 'amber-lake' or 'amber-lake-03'.

    Up to 30 random adjective-noun pairs are tried against the ids returned
    by ``list_all()``. If every attempt (plain and suffixed) collides, the
    first 8 characters of a random UUID are returned instead; that fallback
    is not checked against existing ids.
    """
    existing = {s["session_id"] for s in list_all()}
    for _ in range(30):
        base = f"{random.choice(_ADJECTIVES)}-{random.choice(_NOUNS)}"
        if base not in existing:
            return base
        # Try with a numeric suffix
        candidate = f"{base}-{random.randint(2, 99):02d}"
        if candidate not in existing:
            return candidate
    # Statistically unreachable for a personal tool
    return str(uuid.uuid4())[:8]


def _path(session_id: str) -> Path:
    """Return the JSON file path for *session_id*.

    Side effect: creates the ``session_data`` directory if it is missing, so
    every caller (including read-only ones) may create that directory.
    """
    d = persona_path() / "session_data"
    d.mkdir(parents=True, exist_ok=True)
    return d / f"{session_id}.json"


def load(session_id: str) -> list[dict]:
    """Return the stored messages for a session.

    Returns ``[]`` when the session file does not exist or has no
    ``"messages"`` key. Malformed JSON propagates as an exception.
    """
    path = _path(session_id)
    if not path.exists():
        return []
    return json.loads(path.read_text()).get("messages", [])


def save(session_id: str, messages: list[dict]) -> None:
    """Write *messages* to the session file, keeping only the newest ones.

    The ``created`` timestamp and optional ``name`` of an existing file are
    carried over; ``updated`` is always set to the current time.
    """
    path = _path(session_id)
    existing = json.loads(path.read_text()) if path.exists() else {}

    # Enforce rolling window.
    # NOTE(review): assumes settings.max_history_messages is a positive int;
    # with 0 the slice ``messages[-0:]`` keeps every message - confirm intent.
    windowed = messages[-settings.max_history_messages:]

    data = {
        "session_id": session_id,
        "created": existing.get("created", datetime.now().isoformat()),
        "updated": datetime.now().isoformat(),
        "messages": windowed,
    }
    if "name" in existing:
        data["name"] = existing["name"]
    path.write_text(json.dumps(data, indent=2))


def get_name(session_id: str) -> str:
    """Return the friendly name for a session, or '' if none set."""
    path = _path(session_id)
    if not path.exists():
        return ""
    try:
        return json.loads(path.read_text()).get("name", "")
    except Exception:
        # Best-effort lookup: an unreadable or malformed file reads as unnamed.
        return ""


def rename(session_id: str, name: str) -> bool:
    """Set (or clear) the friendly name on a session. Returns False if not found."""
    path = _path(session_id)
    if not path.exists():
        return False
    data = json.loads(path.read_text())
    if name:
        data["name"] = name
    else:
        # An empty name removes the key entirely rather than storing "".
        data.pop("name", None)
    path.write_text(json.dumps(data, indent=2))
    return True


def delete(session_id: str) -> bool:
    """Delete a session file. Returns True if it existed and was deleted."""
    path = _path(session_id)
    if not path.exists():
        return False
    path.unlink()
    return True


def _sort_timestamp(updated: object, file: Path) -> float:
    """Return a float epoch timestamp used to order a session in listings.

    Parses *updated* as an ISO-8601 string when possible; otherwise falls
    back to the file's modification time. Always returning a float keeps the
    sort keys mutually comparable.
    """
    if isinstance(updated, str) and updated:
        try:
            return datetime.fromisoformat(updated).timestamp()
        except (ValueError, OverflowError, OSError):
            # Unparseable or out-of-range timestamp: use the file mtime below.
            pass
    return file.stat().st_mtime


def list_all() -> list[dict]:
    """Return a summary dict for every readable session, newest first.

    Each summary has ``session_id``, ``name``, ``updated`` and
    ``message_count``. Files that cannot be read or parsed, or that lack a
    ``session_id``, are skipped. Returns ``[]`` when the ``session_data``
    directory does not exist.
    """
    d = persona_path() / "session_data"
    if not d.exists():
        return []

    keyed: list[tuple[float, dict]] = []
    for f in d.glob("*.json"):
        try:
            data = json.loads(f.read_text())
            summary = {
                "session_id": data["session_id"],
                "name": data.get("name", ""),
                "updated": data.get("updated"),
                "message_count": len(data.get("messages", [])),
            }
            # Sort keys are normalised to floats: mixing ISO strings with
            # mtime floats would make the sort raise TypeError.
            keyed.append((_sort_timestamp(data.get("updated"), f), summary))
        except Exception:
            # Best-effort listing: one bad file must not hide the others.
            continue

    keyed.sort(key=lambda pair: pair[0], reverse=True)
    return [summary for _, summary in keyed]