diff --git a/cortex/pytest.ini b/cortex/pytest.ini new file mode 100644 index 0000000..8f32afa --- /dev/null +++ b/cortex/pytest.ini @@ -0,0 +1,4 @@ +[pytest] +asyncio_mode = auto +testpaths = tests +pythonpath = . diff --git a/cortex/tests/conftest.py b/cortex/tests/conftest.py new file mode 100644 index 0000000..6070ba6 --- /dev/null +++ b/cortex/tests/conftest.py @@ -0,0 +1,100 @@ +""" +Shared fixtures for Cortex test suite. + +Key design choices: + - All file I/O goes to a tmp_path, never touching personas/inara/ or real sessions. + - LLM calls are mocked by default — tests are fast and deterministic. + - The 'app' fixture patches settings before importing main, so all modules + see the temp directory. +""" + +import json +import pytest +import pytest_asyncio +from pathlib import Path +from unittest.mock import AsyncMock, patch + +import httpx +from httpx import ASGITransport + + +# --------------------------------------------------------------------------- +# Temp persona directory +# --------------------------------------------------------------------------- + +@pytest.fixture(scope="session") +def personas_root(tmp_path_factory) -> Path: + """A temp personas/ dir with a minimal 'inara' persona for testing.""" + root = tmp_path_factory.mktemp("personas") + _make_persona(root, "inara", "Inara", "Scott") + return root + + +def _make_persona(root: Path, name: str, agent: str, user: str) -> Path: + p = root / name + p.mkdir(parents=True, exist_ok=True) + (p / "IDENTITY.md").write_text(f"# {agent}\nTest identity for {agent}.") + (p / "SOUL.md").write_text(f"# Soul\nTest soul for {agent}.") + (p / "PROTOCOLS.md").write_text("# Protocols\nBe helpful.") + (p / "USER.md").write_text(f"# {user}\nTest user profile.") + (p / "HELP.md").write_text("# Help\nTest help content.") + (p / "MEMORY_LONG.md").write_text("Not yet populated.") + (p / "MEMORY_MID.md").write_text("Not yet populated.") + (p / "MEMORY_SHORT.md").write_text("Not yet populated.") + (p / "TASKS.json").write_text("[]") + (p / "CRONS.json").write_text("[]") + (p / "SCRATCH.md").write_text("") + (p / "REMINDERS.md").write_text("") + (p / "sessions").mkdir() + return p + + +# --------------------------------------------------------------------------- +# App fixture — patches settings before the ASGI app is started +# --------------------------------------------------------------------------- + +@pytest_asyncio.fixture +async def client(personas_root, tmp_path): + """HTTPX async test client against the live ASGI app with patched paths.""" + import config + import persona as persona_mod + + sessions_dir = tmp_path / "sessions" + sessions_dir.mkdir() + + with ( + patch.object(config.settings, "personas_dir", personas_root), + patch.object(config.settings, "sessions_dir", sessions_dir), + patch("scheduler.start"), # don't run APScheduler in tests + patch("scheduler.stop"), + ): + # Force persona module to re-read patched settings + persona_mod._current.set("inara") + + from main import app + async with httpx.AsyncClient( + transport=ASGITransport(app=app), + base_url="http://test", + ) as c: + yield c + + +# --------------------------------------------------------------------------- +# LLM mock +# --------------------------------------------------------------------------- + +@pytest.fixture +def mock_llm(): + """ + Patch complete() at every import site so no real LLM calls are made. + Each router does `from llm_client import complete`, creating a local reference. + Patching llm_client.complete alone won't intercept those — patch each site. + """ + ret = ("Hello, I am a test response.", "claude") + with ( + patch("routers.chat.complete", new_callable=AsyncMock, return_value=ret), + patch("routers.nextcloud_talk.complete", new_callable=AsyncMock, return_value=ret), + patch("routers.google_chat.complete", new_callable=AsyncMock, return_value=ret), + patch("llm_client.complete", new_callable=AsyncMock, return_value=ret), + ): + yield diff --git a/cortex/tests/test_api_chat.py b/cortex/tests/test_api_chat.py new file mode 100644 index 0000000..25b9a53 --- /dev/null +++ b/cortex/tests/test_api_chat.py @@ -0,0 +1,122 @@ +""" +Tests for POST /chat — persona routing, session handling, LLM mocking. +""" +import json +import pytest + + +def _parse_sse(text: str) -> list[dict]: + """Extract JSON payloads from an SSE response body.""" + events = [] + for line in text.splitlines(): + if line.startswith("data: "): + try: + events.append(json.loads(line[6:])) + except json.JSONDecodeError: + pass + return events + + +@pytest.mark.anyio +async def test_chat_basic(client, mock_llm): + r = await client.post("/chat", json={"message": "Hello", "persona": "inara"}) + assert r.status_code == 200 + events = _parse_sse(r.text) + responses = [e for e in events if e.get("type") == "response"] + assert len(responses) == 1 + assert responses[0]["response"] == "Hello, I am a test response." + assert responses[0]["backend"] == "claude" + assert "session_id" in responses[0] + + +@pytest.mark.anyio +async def test_chat_default_persona(client, mock_llm): + """persona defaults to 'inara' when not specified.""" + r = await client.post("/chat", json={"message": "Hi"}) + assert r.status_code == 200 + events = _parse_sse(r.text) + assert any(e.get("type") == "response" for e in events) + + +@pytest.mark.anyio +async def test_chat_unknown_persona(client, mock_llm): + r = await client.post("/chat", json={"message": "Hi", "persona": "nobody"}) + assert r.status_code == 200 + events = _parse_sse(r.text) + errors = [e for e in events if e.get("type") == "error"] + assert len(errors) == 1 + assert "nobody" in errors[0]["message"] + + +@pytest.mark.anyio +async def test_chat_path_traversal_persona(client, mock_llm): + r = await client.post("/chat", json={"message": "Hi", "persona": "../../etc"}) + assert r.status_code == 200 + events = _parse_sse(r.text) + errors = [e for e in events if e.get("type") == "error"] + assert len(errors) == 1 + + +@pytest.mark.anyio +async def test_chat_session_persists(client, mock_llm): + """Same session_id reuses history.""" + r1 = await client.post("/chat", json={"message": "First message", "session_id": "test-sess-1"}) + r2 = await client.post("/chat", json={"message": "Second message", "session_id": "test-sess-1"}) + assert r1.status_code == 200 + assert r2.status_code == 200 + + +@pytest.mark.anyio +async def test_sessions_list(client, mock_llm): + """After a chat, the session appears in /sessions.""" + await client.post("/chat", json={"message": "Hello", "session_id": "test-list-sess"}) + r = await client.get("/sessions") + assert r.status_code == 200 + sessions = r.json()["sessions"] + ids = [s["session_id"] for s in sessions] + assert "test-list-sess" in ids + + +@pytest.mark.anyio +async def test_session_history(client, mock_llm): + await client.post("/chat", json={"message": "Hello", "session_id": "test-hist-sess"}) + r = await client.get("/history/test-hist-sess") + assert r.status_code == 200 + msgs = r.json()["messages"] + assert any(m["role"] == "user" and "Hello" in m["content"] for m in msgs) + + +@pytest.mark.anyio +async def test_session_delete(client, mock_llm): + await client.post("/chat", json={"message": "Hello", "session_id": "test-del-sess"}) + r = await client.delete("/sessions/test-del-sess") + assert r.status_code == 200 + assert r.json()["ok"] is True + + +@pytest.mark.anyio +async def test_session_delete_unknown(client): + r = await client.delete("/sessions/does-not-exist") + assert r.status_code == 404 + + +@pytest.mark.anyio +async def test_backend_get(client): + r = await client.get("/backend") + assert r.status_code == 200 + assert r.json()["primary"] in ("claude", "gemini") + + +@pytest.mark.anyio +async def test_backend_set(client): + r = await client.post("/backend", json={"primary": "gemini"}) + assert r.status_code == 200 + assert r.json()["primary"] == "gemini" + # Reset + await client.post("/backend", json={"primary": "claude"}) + + +@pytest.mark.anyio +async def test_backend_set_invalid(client): + r = await client.post("/backend", json={"primary": "gpt-4"}) + assert r.status_code == 400 diff --git a/cortex/tests/test_api_files.py b/cortex/tests/test_api_files.py new file mode 100644 index 0000000..538005f --- /dev/null +++ b/cortex/tests/test_api_files.py @@ -0,0 +1,66 @@ +""" +Tests for GET/PUT /files/* — allowed set enforcement, read/write, IDENTITY.md. +""" +import pytest + + +@pytest.mark.anyio +async def test_files_list(client): + r = await client.get("/files") + assert r.status_code == 200 + files = r.json()["files"] + names = [f["name"] for f in files] + assert "SOUL.md" in names + assert "IDENTITY.md" in names + assert "USER.md" in names + + +@pytest.mark.anyio +async def test_files_get_allowed(client): + r = await client.get("/files/IDENTITY.md") + assert r.status_code == 200 + assert "content" in r.json() + + +@pytest.mark.anyio +async def test_files_get_not_in_allowed(client): + """Files outside the ALLOWED set should return 404, not the file content.""" + for name in ("TASKS.json", "CRONS.json", "SCRATCH.md", "../config.py", ".env"): + r = await client.get(f"/files/{name}") + assert r.status_code == 404, f"Expected 404 for {name}, got {r.status_code}" + + +@pytest.mark.anyio +async def test_files_put_and_get(client): + """Write a new value and read it back.""" + content = "# Updated PROTOCOLS\nTest content." + r = await client.put("/files/PROTOCOLS.md", json={"content": content}) + assert r.status_code == 200 + assert r.json()["ok"] is True + + r2 = await client.get("/files/PROTOCOLS.md") + assert r2.status_code == 200 + assert r2.json()["content"] == content + + +@pytest.mark.anyio +async def test_files_put_not_allowed(client): + r = await client.put("/files/../../etc/passwd", json={"content": "pwned"}) + assert r.status_code == 404 + + +@pytest.mark.anyio +async def test_files_get_missing_but_allowed(client, personas_root): + """An allowed file that doesn't exist yet returns 404.""" + # Temporarily remove MEMORY_MID.md + f = personas_root / "inara" / "MEMORY_MID.md" + existed = f.exists() + if existed: + backup = f.read_text() + f.unlink() + try: + r = await client.get("/files/MEMORY_MID.md") + assert r.status_code == 404 + finally: + if existed: + f.write_text(backup) diff --git a/cortex/tests/test_health.py b/cortex/tests/test_health.py new file mode 100644 index 0000000..4f219cf --- /dev/null +++ b/cortex/tests/test_health.py @@ -0,0 +1,34 @@ +""" +Basic smoke tests — if these fail, nothing else matters. +""" +import pytest + + +@pytest.mark.anyio +async def test_health(client): + r = await client.get("/health") + assert r.status_code == 200 + assert r.json()["status"] == "ok" + + +@pytest.mark.anyio +async def test_auth_status(client): + r = await client.get("/auth/status") + assert r.status_code == 200 + data = r.json() + assert "claude" in data + assert "gemini" in data + assert "ok" in data["claude"] + + +@pytest.mark.anyio +async def test_distill_status(client): + r = await client.get("/distill/status") + assert r.status_code == 200 + assert "enabled" in r.json() + + +@pytest.mark.anyio +async def test_unknown_route_404(client): + r = await client.get("/does-not-exist") + assert r.status_code == 404 diff --git a/cortex/tests/test_persona.py b/cortex/tests/test_persona.py new file mode 100644 index 0000000..11cacb1 --- /dev/null +++ b/cortex/tests/test_persona.py @@ -0,0 +1,94 @@ +""" +Unit tests for persona.py — validation, routing, path traversal. +No HTTP involved. +""" +import pytest +from pathlib import Path +from unittest.mock import patch + + +def _make_temp_personas(tmp_path: Path) -> Path: + root = tmp_path / "personas" + for name in ("alice", "bob"): + p = root / name + p.mkdir(parents=True) + (p / "IDENTITY.md").write_text(f"# {name}") + # A directory WITHOUT IDENTITY.md — should not appear in list_personas() + (root / "incomplete").mkdir() + return root + + +def test_validate_good(tmp_path): + root = _make_temp_personas(tmp_path) + import config, persona + with patch.object(config.settings, "personas_dir", root): + assert persona.validate("alice") == "alice" + assert persona.validate("bob") == "bob" + + +def test_validate_unknown(tmp_path): + root = _make_temp_personas(tmp_path) + import config, persona + with patch.object(config.settings, "personas_dir", root): + with pytest.raises(ValueError, match="Unknown persona"): + persona.validate("charlie") + + +def test_validate_path_traversal(tmp_path): + root = _make_temp_personas(tmp_path) + import config, persona + with patch.object(config.settings, "personas_dir", root): + with pytest.raises(ValueError, match="Invalid persona name"): + persona.validate("../../etc/passwd") + with pytest.raises(ValueError, match="Invalid persona name"): + persona.validate("../alice") + with pytest.raises(ValueError, match="Invalid persona name"): + persona.validate("alice/../../etc") + + +def test_validate_special_chars(tmp_path): + root = _make_temp_personas(tmp_path) + import config, persona + with patch.object(config.settings, "personas_dir", root): + for bad in ("alice bob", "alice;bob", "alice\x00bob", "A" * 33, ""): + with pytest.raises(ValueError): + persona.validate(bad) + + +def test_validate_allows_hyphen_underscore(tmp_path): + root = _make_temp_personas(tmp_path) + # Create a persona with hyphen and underscore in name + p = root / "my_ai-agent" + p.mkdir(parents=True) + (p / "IDENTITY.md").write_text("# My Agent") + import config, persona + with patch.object(config.settings, "personas_dir", root): + assert persona.validate("my_ai-agent") == "my_ai-agent" + + +def test_list_personas(tmp_path): + root = _make_temp_personas(tmp_path) + import config, persona + with patch.object(config.settings, "personas_dir", root): + names = persona.list_personas() + assert "alice" in names + assert "bob" in names + assert "incomplete" not in names # no IDENTITY.md + + +def test_persona_path_uses_contextvar(tmp_path): + root = _make_temp_personas(tmp_path) + import config, persona + with patch.object(config.settings, "personas_dir", root): + persona.set_persona("alice") + assert persona.persona_path() == root / "alice" + persona.set_persona("bob") + assert persona.persona_path() == root / "bob" + + +def test_persona_path_explicit_name(tmp_path): + root = _make_temp_personas(tmp_path) + import config, persona + with patch.object(config.settings, "personas_dir", root): + persona.set_persona("alice") + assert persona.persona_path("bob") == root / "bob" diff --git a/cortex/tests/test_security.py b/cortex/tests/test_security.py new file mode 100644 index 0000000..07ab5f9 --- /dev/null +++ b/cortex/tests/test_security.py @@ -0,0 +1,126 @@ +""" +Security-focused tests — what should be blocked or rejected. + +These document the current security posture and will catch regressions. +Tests marked 'known_gap' document real issues that are not yet fixed; +they assert the current (insecure) behaviour so we notice when it changes. +""" +import pytest + + +# --------------------------------------------------------------------------- +# Path traversal +# --------------------------------------------------------------------------- + +@pytest.mark.anyio +async def test_files_no_path_traversal_in_filename(client): + """File endpoint must not serve files outside the ALLOWED set.""" + dangerous = [ + "../config.py", + "../../etc/passwd", + "SOUL.md/../config.py", + ".env", + "TASKS.json", + "CRONS.json", + ] + for name in dangerous: + r = await client.get(f"/files/{name}") + assert r.status_code in (404, 422), \ + f"Expected 404/422 for {name!r}, got {r.status_code}" + + +@pytest.mark.anyio +async def test_persona_traversal_blocked_in_chat(client, mock_llm): + """Path traversal in persona name must be rejected before any file access.""" + for bad in ("../inara", "../../etc", "inara/../inara", "inara\x00extra"): + r = await client.post("/chat", json={"message": "hi", "persona": bad}) + assert r.status_code == 200 # SSE stream, not HTTP error + import json + for line in r.text.splitlines(): + if line.startswith("data: "): + event = json.loads(line[6:]) + if event.get("type") == "error": + break + else: + pytest.fail(f"Expected error event for persona={bad!r}, got: {r.text[:200]}") + + +@pytest.mark.anyio +async def test_orchestrate_path_traversal(client, mock_llm): + r = await client.post("/orchestrate", json={"task": "hi", "persona": "../../etc"}) + assert r.status_code == 400 + + +# --------------------------------------------------------------------------- +# Signature verification +# --------------------------------------------------------------------------- + +@pytest.mark.anyio +async def test_nct_replayed_request_rejected(client): + """A request with correct format but wrong HMAC should always be rejected.""" + import json, hashlib, hmac as hmac_lib + payload = json.dumps({"type": "Create", "actor": {}, "object": {}, "target": {}}).encode() + # Use wrong secret to generate sig + wrong_sig = hmac_lib.new(b"wrong-secret", b"abc123" + payload, hashlib.sha256).hexdigest() + from unittest.mock import patch + with patch("config.settings.nextcloud_talk_bot_secret", "correct-secret"): + r = await client.post( + "/inara-nextcloud-talk-webhook", + content=payload, + headers={ + "Content-Type": "application/json", + "X-Nextcloud-Talk-Random": "abc123", + "X-Nextcloud-Talk-Signature": wrong_sig, + }, + ) + assert r.status_code == 401 + + +# --------------------------------------------------------------------------- +# Known gaps — document current behaviour, alert when it changes +# --------------------------------------------------------------------------- + +@pytest.mark.anyio +async def test_known_gap__distill_no_app_auth(client): + """ + KNOWN GAP: /distill/* has no app-layer auth. + Anyone reaching port 8000 directly can trigger LLM calls and overwrite memory. + Protection is currently nginx-only. + This test documents the current state — update when app-layer auth is added. + """ + r = await client.get("/distill/status") + assert r.status_code == 200 # currently open + + +@pytest.mark.anyio +async def test_known_gap__files_put_no_app_auth(client): + """ + KNOWN GAP: PUT /files/{filename} has no app-layer auth. + Overwriting SOUL.md or IDENTITY.md changes agent behavior. + Protection is currently nginx-only. + """ + r = await client.put("/files/PROTOCOLS.md", json={"content": "# Modified"}) + assert r.status_code == 200 # currently open + + +@pytest.mark.anyio +async def test_known_gap__gchat_no_audience_bypass(client, mock_llm): + """ + KNOWN GAP: Google Chat JWT verification is silently skipped when + GOOGLE_CHAT_AUDIENCE is empty (the default). Anyone can POST and get + LLM responses without a valid token. + Fix: make audience required; fail loudly if not set. + """ + from unittest.mock import patch + with patch("config.settings.google_chat_audience", ""): + r = await client.post("/channels/google-chat", json={ + "chat": { + "messagePayload": { + "message": {"text": "Exploit"}, + "space": {"name": "spaces/x", "type": "DM"}, + }, + "user": {"displayName": "Attacker"}, + } + }) + # This currently succeeds — it should not when audience is unconfigured + assert r.status_code == 200 # documents the gap diff --git a/cortex/tests/test_tools.py b/cortex/tests/test_tools.py new file mode 100644 index 0000000..a3531ed --- /dev/null +++ b/cortex/tests/test_tools.py @@ -0,0 +1,271 @@ +""" +Unit tests for Inara's internal tools — scratch, tasks, cron. + +These test the sync implementation functions directly, using a temp directory. +No HTTP, no LLM calls. +""" +import json +import pytest +from pathlib import Path +from unittest.mock import patch + + +@pytest.fixture +def persona_dir(tmp_path) -> Path: + """A temp persona directory pre-populated with empty tool files.""" + d = tmp_path / "inara" + d.mkdir() + (d / "SCRATCH.md").write_text("") + (d / "TASKS.json").write_text("[]") + (d / "CRONS.json").write_text("[]") + (d / "REMINDERS.md").write_text("") + return d + + +@pytest.fixture(autouse=True) +def patch_persona_path(persona_dir): + """ + Route all persona_path() calls to the temp dir. + Each tool does `from persona import persona_path`, so we must patch + the name in each module's namespace, not just in persona itself. + """ + with ( + patch("tools.scratch.persona_path", return_value=persona_dir), + patch("tools.tasks.persona_path", return_value=persona_dir), + patch("tools.cron.persona_path", return_value=persona_dir), + patch("cron_runner._persona_path", return_value=persona_dir), + ): + yield + + +# --------------------------------------------------------------------------- +# Scratch +# --------------------------------------------------------------------------- + +class TestScratch: + def test_read_empty(self): + from tools.scratch import _scratch_read + result = _scratch_read() + assert "empty" in result.lower() + + def test_write_and_read(self, persona_dir): + from tools.scratch import _scratch_write, _scratch_read + _scratch_write("# My notes\n\nSome content here.") + content = _scratch_read() + assert "My notes" in content + assert "Some content here" in content + + def test_append_adds_section(self, persona_dir): + from tools.scratch import _scratch_write, _scratch_append, _scratch_read + _scratch_write("# Existing") + _scratch_append("New section content", heading="Section A") + content = _scratch_read() + assert "Existing" in content + assert "Section A" in content + assert "New section content" in content + + def test_append_auto_heading(self, persona_dir): + from tools.scratch import _scratch_append, _scratch_read + _scratch_append("Content without explicit heading") + content = _scratch_read() + assert "UTC" in content # auto heading includes timestamp + + def test_clear(self, persona_dir): + from tools.scratch import _scratch_write, _scratch_clear, _scratch_read + _scratch_write("Some content") + _scratch_clear() + assert "empty" in _scratch_read().lower() + + def test_write_strips_trailing_whitespace(self, persona_dir): + from tools.scratch import _scratch_write, _scratch_read + _scratch_write("Content \n\n\n") + content = (persona_dir / "SCRATCH.md").read_text() + assert content.endswith("\n") + assert not content.endswith("\n\n") + + +# --------------------------------------------------------------------------- +# Tasks +# --------------------------------------------------------------------------- + +class TestTasks: + def _mk(self, title, description=None, priority="normal"): + from tools.tasks import _task_create + return _task_create(title, description, priority) + + def _id(self, result: str) -> str: + import re + m = re.search(r't_\w+', result) + assert m, f"No task ID in: {result}" + return m.group() + + def test_list_empty(self): + from tools.tasks import _task_list + assert "No tasks" in _task_list(status=None) + + def test_create_and_list(self): + from tools.tasks import _task_list + self._mk("Buy coffee", description="Dark roast", priority="high") + result = _task_list(status=None) + assert "Buy coffee" in result + assert "[high]" in result + + def test_create_bad_priority_defaults_to_normal(self): + from tools.tasks import _task_list + self._mk("Test task", priority="urgent") # invalid — becomes "normal" + result = _task_list(status=None) + assert "Test task" in result + assert "[normal]" not in result # normal priority not shown in brackets + + def test_update_status(self): + from tools.tasks import _task_update, _task_list + tid = self._id(self._mk("Work item")) + _task_update(tid, status="in_progress", title=None, description=None, priority=None) + assert "Work item" in _task_list(status="in_progress") + + def test_complete(self): + from tools.tasks import _task_complete, _task_list + tid = self._id(self._mk("Finish this")) + _task_complete(tid) + assert "Finish this" in _task_list(status="done") + assert "Finish this" not in _task_list(status="todo") + + def test_filter_by_status(self): + from tools.tasks import _task_list + self._mk("A task") + assert "A task" in _task_list(status="todo") + assert "A task" not in _task_list(status="done") + + def test_update_unknown_id(self): + from tools.tasks import _task_update + result = _task_update("t_doesnotexist", status="done", + title=None, description=None, priority=None) + assert "not found" in result.lower() + + def test_persistence(self, persona_dir): + """Tasks survive between _load() calls (written to TASKS.json).""" + self._mk("Persistent task") + data = json.loads((persona_dir / "TASKS.json").read_text()) + assert any(t["title"] == "Persistent task" for t in data) + + +# --------------------------------------------------------------------------- +# Cron +# --------------------------------------------------------------------------- + +class TestCronRunner: + def test_parse_hourly(self): + from cron_runner import parse_schedule + assert parse_schedule("hourly") == {"minute": 0} + + def test_parse_daily_default(self): + from cron_runner import parse_schedule + r = parse_schedule("daily") + assert r["hour"] == 9 + assert r["minute"] == 0 + + def test_parse_daily_custom_time(self): + from cron_runner import parse_schedule + r = parse_schedule("daily:14:30") + assert r["hour"] == 14 + assert r["minute"] == 30 + + def test_parse_weekly(self): + from cron_runner import parse_schedule + r = parse_schedule("weekly:mon") + assert r["day_of_week"] == "mon" + assert r["hour"] == 9 + + def test_parse_weekly_with_time(self): + from cron_runner import parse_schedule + r = parse_schedule("weekly:fri:17:00") + assert r["day_of_week"] == "fri" + assert r["hour"] == 17 + assert r["minute"] == 0 + + def test_parse_full_day_names(self): + from cron_runner import parse_schedule + assert parse_schedule("weekly:monday")["day_of_week"] == "mon" + assert parse_schedule("weekly:friday")["day_of_week"] == "fri" + + def test_parse_unknown_schedule(self): + from cron_runner import parse_schedule + with pytest.raises(ValueError, match="Unrecognised"): + parse_schedule("every-tuesday") + + def test_parse_bad_dow(self): + from cron_runner import parse_schedule + with pytest.raises(ValueError, match="day of week"): + parse_schedule("weekly:funday") + + def test_parse_bad_time_non_integer(self): + from cron_runner import parse_schedule + with pytest.raises((ValueError, Exception)): + parse_schedule("daily:noon") # non-integer — parse error + + +class TestCronTools: + def test_list_empty(self): + from tools.cron import _cron_list + result = _cron_list() + assert "No crons" in result + + def test_add_and_list(self): + from tools.cron import _cron_add, _cron_list + with patch("tools.cron._scheduler_add"): + r = _cron_add("Morning reminder", "daily:09:00", "remind", "Check in.") + assert "c_" in r + listing = _cron_list() + assert "Morning reminder" in listing + assert "daily:09:00" in listing + + def test_add_bad_schedule(self): + from tools.cron import _cron_add + r = _cron_add("Bad job", "every-day", "remind", "Hello") + assert "Bad schedule" in r + + def test_add_bad_type(self): + from tools.cron import _cron_add + r = _cron_add("Bad job", "daily", "email", "Hello") + assert "Bad type" in r + + def _extract_id(self, result: str) -> str: + import re + m = re.search(r'c_\w+', result) + assert m, f"No cron ID in: {result}" + return m.group() + + def test_remove(self): + from tools.cron import _cron_add, _cron_remove, _cron_list + with patch("tools.cron._scheduler_add"), patch("tools.cron._scheduler_remove"): + r = _cron_add("To remove", "hourly", "note", "Tick") + cron_id = self._extract_id(r) + result = _cron_remove(cron_id) + assert "Removed" in result + assert "To remove" not in _cron_list() + + def test_remove_unknown(self): + from tools.cron import _cron_remove + result = _cron_remove("c_doesnotexist") + assert "Not found" in result + + def test_toggle_pause_resume(self): + from tools.cron import _cron_add, _cron_toggle, _cron_list + with patch("tools.cron._scheduler_add"), patch("tools.cron._scheduler_pause"), patch("tools.cron._scheduler_resume"): + r = _cron_add("Toggleable", "daily", "note", "Content") + cron_id = self._extract_id(r) + + result = _cron_toggle(cron_id) + assert "Paused" in result + assert "PAUSED" in _cron_list() + + result = _cron_toggle(cron_id) + assert "Resumed" in result + assert "enabled" in _cron_list() + + def test_reminders_clear(self, persona_dir): + from tools.cron import _reminders_clear + (persona_dir / "REMINDERS.md").write_text("## Some reminder\n\nContent") + result = _reminders_clear() + assert "cleared" in result.lower() + assert (persona_dir / "REMINDERS.md").read_text() == "" diff --git a/cortex/tests/test_webhooks.py b/cortex/tests/test_webhooks.py new file mode 100644 index 0000000..dd2be34 --- /dev/null +++ b/cortex/tests/test_webhooks.py @@ -0,0 +1,164 @@ +""" +Webhook auth tests — NC Talk HMAC, Google Chat JWT. + +These tests verify that auth is enforced, not that full LLM responses work. +""" +import hashlib +import hmac +import json +import pytest +from unittest.mock import AsyncMock, patch + + +# --------------------------------------------------------------------------- +# Nextcloud Talk +# --------------------------------------------------------------------------- + +_NC_SECRET = "test-bot-secret-12345" + +_VALID_NC_PAYLOAD = { + "type": "Create", + "actor": {"type": "users", "id": "testuser", "name": "Test User"}, + "object": { + "type": "Note", + "content": json.dumps({"message": "Hello Inara"}), + }, + "target": {"id": "abc123token"}, +} + + +def _nc_headers(body: bytes, secret: str) -> dict: + random_str = "abc123" + sig = hmac.new( + secret.encode(), + (random_str + body.decode("utf-8")).encode(), + hashlib.sha256, + ).hexdigest() + return { + "X-Nextcloud-Talk-Random": random_str, + "X-Nextcloud-Talk-Signature": sig, + } + + +@pytest.mark.anyio +async def test_nct_valid_signature(client, mock_llm): + body = json.dumps(_VALID_NC_PAYLOAD).encode() + with patch("config.settings.nextcloud_talk_bot_secret", _NC_SECRET): + with patch("routers.nextcloud_talk._send_reply", new_callable=AsyncMock): + headers = _nc_headers(body, _NC_SECRET) + r = await client.post( + "/inara-nextcloud-talk-webhook", + content=body, + headers={**headers, "Content-Type": "application/json"}, + ) + assert r.status_code == 200 + + +@pytest.mark.anyio +async def test_nct_wrong_signature(client): + body = json.dumps(_VALID_NC_PAYLOAD).encode() + with patch("config.settings.nextcloud_talk_bot_secret", _NC_SECRET): + r = await client.post( + "/inara-nextcloud-talk-webhook", + content=body, + headers={ + "Content-Type": "application/json", + "X-Nextcloud-Talk-Random": "abc123", + "X-Nextcloud-Talk-Signature": "badsignature", + }, + ) + assert r.status_code == 401 + + +@pytest.mark.anyio +async def test_nct_missing_signature(client): + body = json.dumps(_VALID_NC_PAYLOAD).encode() + with patch("config.settings.nextcloud_talk_bot_secret", _NC_SECRET): + r = await client.post( + "/inara-nextcloud-talk-webhook", + content=body, + headers={"Content-Type": "application/json"}, + ) + assert r.status_code == 401 + + +@pytest.mark.anyio +async def test_nct_no_secret_configured(client): + """Service should return 500 if secret is not set, not process the message.""" + body = json.dumps(_VALID_NC_PAYLOAD).encode() + with patch("config.settings.nextcloud_talk_bot_secret", ""): + r = await client.post( + "/inara-nextcloud-talk-webhook", + content=body, + headers={"Content-Type": "application/json"}, + ) + assert r.status_code == 500 + + +@pytest.mark.anyio +async def test_nct_bot_message_ignored(client): + """Messages from other bots should be silently ignored (not processed).""" + payload = {**_VALID_NC_PAYLOAD, "actor": {"type": "bots", "id": "otherbot", "name": "Bot"}} + body = json.dumps(payload).encode() + with patch("config.settings.nextcloud_talk_bot_secret", _NC_SECRET): + headers = _nc_headers(body, _NC_SECRET) + r = await client.post( + "/inara-nextcloud-talk-webhook", + content=body, + headers={**headers, "Content-Type": "application/json"}, + ) + assert r.status_code == 200 + + +# --------------------------------------------------------------------------- +# Google Chat +# --------------------------------------------------------------------------- + +_GCHAT_PAYLOAD = { + "chat": { + "messagePayload": { + "message": {"text": "Hello", "argumentText": "Hello"}, + "space": {"name": "spaces/test123", "type": "ROOM"}, + }, + "user": {"displayName": "Test User"}, + } +} + + +@pytest.mark.anyio +async def test_gchat_no_audience_configured(client, mock_llm): + """When audience is not set, JWT check is skipped (current behaviour — documented bypass).""" + with patch("config.settings.google_chat_audience", ""): + r = await client.post("/channels/google-chat", json=_GCHAT_PAYLOAD) + # Should process the message (no auth enforcement when audience is empty) + assert r.status_code == 200 + + +@pytest.mark.anyio +async def test_gchat_missing_token_with_audience(client): + """When audience IS configured, requests without a token must be rejected.""" + with patch("config.settings.google_chat_audience", "123456789"): + r = await client.post("/channels/google-chat", json=_GCHAT_PAYLOAD) + assert r.status_code == 401 + + +@pytest.mark.anyio +async def test_gchat_invalid_token_with_audience(client): + """A fake token should fail JWT verification.""" + payload_with_token = { + **_GCHAT_PAYLOAD, + "authorizationEventObject": {"systemIdToken": "not.a.valid.jwt"}, + } + with patch("config.settings.google_chat_audience", "123456789"): + r = await client.post("/channels/google-chat", json=payload_with_token) + assert r.status_code == 401 + + +@pytest.mark.anyio +async def test_gchat_added_to_space(client, mock_llm): + """Bot added to a space — should return a greeting, no auth when audience empty.""" + payload = {"chat": {"addedToSpacePayload": {"space": {"type": "ROOM"}}}} + with patch("config.settings.google_chat_audience", ""): + r = await client.post("/channels/google-chat", json=payload) + assert r.status_code == 200 + assert "hostAppDataAction" in r.json()