test: add Cortex test suite (77 tests, no LLM calls)
Tests cover: - Smoke: /health, /auth/status, /distill/status (test_health.py) - Persona validation: path traversal, bad names, list_personas (test_persona.py) - Chat API: persona routing, session persistence, error handling (test_api_chat.py) - Files API: ALLOWED set enforcement, read/write, missing files (test_api_files.py) - Webhooks: NC Talk HMAC accept/reject, Google Chat JWT (test_webhooks.py) - Tools: scratch read/write/append/clear, tasks CRUD, cron parser + tools (test_tools.py) - Security: path traversal, replay attack, known gaps documented (test_security.py) All LLM calls mocked — suite runs in ~1.4s. Run: cd cortex && .venv/bin/pytest Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
4
cortex/pytest.ini
Normal file
4
cortex/pytest.ini
Normal file
@@ -0,0 +1,4 @@
|
|||||||
|
[pytest]
|
||||||
|
asyncio_mode = auto
|
||||||
|
testpaths = tests
|
||||||
|
pythonpath = .
|
||||||
100
cortex/tests/conftest.py
Normal file
100
cortex/tests/conftest.py
Normal file
@@ -0,0 +1,100 @@
|
|||||||
|
"""
|
||||||
|
Shared fixtures for Cortex test suite.
|
||||||
|
|
||||||
|
Key design choices:
|
||||||
|
- All file I/O goes to a tmp_path, never touching personas/inara/ or real sessions.
|
||||||
|
- LLM calls are mocked by default — tests are fast and deterministic.
|
||||||
|
- The 'app' fixture patches settings before importing main, so all modules
|
||||||
|
see the temp directory.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
import pytest
|
||||||
|
import pytest_asyncio
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest.mock import AsyncMock, patch
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
from httpx import ASGITransport
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Temp persona directory
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@pytest.fixture(scope="session")
|
||||||
|
def personas_root(tmp_path_factory) -> Path:
|
||||||
|
"""A temp personas/ dir with a minimal 'inara' persona for testing."""
|
||||||
|
root = tmp_path_factory.mktemp("personas")
|
||||||
|
_make_persona(root, "inara", "Inara", "Scott")
|
||||||
|
return root
|
||||||
|
|
||||||
|
|
||||||
|
def _make_persona(root: Path, name: str, agent: str, user: str) -> Path:
|
||||||
|
p = root / name
|
||||||
|
p.mkdir(parents=True, exist_ok=True)
|
||||||
|
(p / "IDENTITY.md").write_text(f"# {agent}\nTest identity for {agent}.")
|
||||||
|
(p / "SOUL.md").write_text(f"# Soul\nTest soul for {agent}.")
|
||||||
|
(p / "PROTOCOLS.md").write_text("# Protocols\nBe helpful.")
|
||||||
|
(p / "USER.md").write_text(f"# {user}\nTest user profile.")
|
||||||
|
(p / "HELP.md").write_text("# Help\nTest help content.")
|
||||||
|
(p / "MEMORY_LONG.md").write_text("Not yet populated.")
|
||||||
|
(p / "MEMORY_MID.md").write_text("Not yet populated.")
|
||||||
|
(p / "MEMORY_SHORT.md").write_text("Not yet populated.")
|
||||||
|
(p / "TASKS.json").write_text("[]")
|
||||||
|
(p / "CRONS.json").write_text("[]")
|
||||||
|
(p / "SCRATCH.md").write_text("")
|
||||||
|
(p / "REMINDERS.md").write_text("")
|
||||||
|
(p / "sessions").mkdir()
|
||||||
|
return p
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# App fixture — patches settings before the ASGI app is started
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@pytest_asyncio.fixture
|
||||||
|
async def client(personas_root, tmp_path):
|
||||||
|
"""HTTPX async test client against the live ASGI app with patched paths."""
|
||||||
|
import config
|
||||||
|
import persona as persona_mod
|
||||||
|
|
||||||
|
sessions_dir = tmp_path / "sessions"
|
||||||
|
sessions_dir.mkdir()
|
||||||
|
|
||||||
|
with (
|
||||||
|
patch.object(config.settings, "personas_dir", personas_root),
|
||||||
|
patch.object(config.settings, "sessions_dir", sessions_dir),
|
||||||
|
patch("scheduler.start"), # don't run APScheduler in tests
|
||||||
|
patch("scheduler.stop"),
|
||||||
|
):
|
||||||
|
# Force persona module to re-read patched settings
|
||||||
|
persona_mod._current.set("inara")
|
||||||
|
|
||||||
|
from main import app
|
||||||
|
async with httpx.AsyncClient(
|
||||||
|
transport=ASGITransport(app=app),
|
||||||
|
base_url="http://test",
|
||||||
|
) as c:
|
||||||
|
yield c
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# LLM mock
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_llm():
|
||||||
|
"""
|
||||||
|
Patch complete() at every import site so no real LLM calls are made.
|
||||||
|
Each router does `from llm_client import complete`, creating a local reference.
|
||||||
|
Patching llm_client.complete alone won't intercept those — patch each site.
|
||||||
|
"""
|
||||||
|
ret = ("Hello, I am a test response.", "claude")
|
||||||
|
with (
|
||||||
|
patch("routers.chat.complete", new_callable=AsyncMock, return_value=ret),
|
||||||
|
patch("routers.nextcloud_talk.complete", new_callable=AsyncMock, return_value=ret),
|
||||||
|
patch("routers.google_chat.complete", new_callable=AsyncMock, return_value=ret),
|
||||||
|
patch("llm_client.complete", new_callable=AsyncMock, return_value=ret),
|
||||||
|
):
|
||||||
|
yield
|
||||||
122
cortex/tests/test_api_chat.py
Normal file
122
cortex/tests/test_api_chat.py
Normal file
@@ -0,0 +1,122 @@
|
|||||||
|
"""
|
||||||
|
Tests for POST /chat — persona routing, session handling, LLM mocking.
|
||||||
|
"""
|
||||||
|
import json
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_sse(text: str) -> list[dict]:
|
||||||
|
"""Extract JSON payloads from an SSE response body."""
|
||||||
|
events = []
|
||||||
|
for line in text.splitlines():
|
||||||
|
if line.startswith("data: "):
|
||||||
|
try:
|
||||||
|
events.append(json.loads(line[6:]))
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
pass
|
||||||
|
return events
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_chat_basic(client, mock_llm):
|
||||||
|
r = await client.post("/chat", json={"message": "Hello", "persona": "inara"})
|
||||||
|
assert r.status_code == 200
|
||||||
|
events = _parse_sse(r.text)
|
||||||
|
responses = [e for e in events if e.get("type") == "response"]
|
||||||
|
assert len(responses) == 1
|
||||||
|
assert responses[0]["response"] == "Hello, I am a test response."
|
||||||
|
assert responses[0]["backend"] == "claude"
|
||||||
|
assert "session_id" in responses[0]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_chat_default_persona(client, mock_llm):
|
||||||
|
"""persona defaults to 'inara' when not specified."""
|
||||||
|
r = await client.post("/chat", json={"message": "Hi"})
|
||||||
|
assert r.status_code == 200
|
||||||
|
events = _parse_sse(r.text)
|
||||||
|
assert any(e.get("type") == "response" for e in events)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_chat_unknown_persona(client, mock_llm):
|
||||||
|
r = await client.post("/chat", json={"message": "Hi", "persona": "nobody"})
|
||||||
|
assert r.status_code == 200
|
||||||
|
events = _parse_sse(r.text)
|
||||||
|
errors = [e for e in events if e.get("type") == "error"]
|
||||||
|
assert len(errors) == 1
|
||||||
|
assert "nobody" in errors[0]["message"]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_chat_path_traversal_persona(client, mock_llm):
|
||||||
|
r = await client.post("/chat", json={"message": "Hi", "persona": "../../etc"})
|
||||||
|
assert r.status_code == 200
|
||||||
|
events = _parse_sse(r.text)
|
||||||
|
errors = [e for e in events if e.get("type") == "error"]
|
||||||
|
assert len(errors) == 1
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_chat_session_persists(client, mock_llm):
|
||||||
|
"""Same session_id reuses history."""
|
||||||
|
r1 = await client.post("/chat", json={"message": "First message", "session_id": "test-sess-1"})
|
||||||
|
r2 = await client.post("/chat", json={"message": "Second message", "session_id": "test-sess-1"})
|
||||||
|
assert r1.status_code == 200
|
||||||
|
assert r2.status_code == 200
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_sessions_list(client, mock_llm):
|
||||||
|
"""After a chat, the session appears in /sessions."""
|
||||||
|
await client.post("/chat", json={"message": "Hello", "session_id": "test-list-sess"})
|
||||||
|
r = await client.get("/sessions")
|
||||||
|
assert r.status_code == 200
|
||||||
|
sessions = r.json()["sessions"]
|
||||||
|
ids = [s["session_id"] for s in sessions]
|
||||||
|
assert "test-list-sess" in ids
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_session_history(client, mock_llm):
|
||||||
|
await client.post("/chat", json={"message": "Hello", "session_id": "test-hist-sess"})
|
||||||
|
r = await client.get("/history/test-hist-sess")
|
||||||
|
assert r.status_code == 200
|
||||||
|
msgs = r.json()["messages"]
|
||||||
|
assert any(m["role"] == "user" and "Hello" in m["content"] for m in msgs)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_session_delete(client, mock_llm):
|
||||||
|
await client.post("/chat", json={"message": "Hello", "session_id": "test-del-sess"})
|
||||||
|
r = await client.delete("/sessions/test-del-sess")
|
||||||
|
assert r.status_code == 200
|
||||||
|
assert r.json()["ok"] is True
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_session_delete_unknown(client):
|
||||||
|
r = await client.delete("/sessions/does-not-exist")
|
||||||
|
assert r.status_code == 404
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_backend_get(client):
|
||||||
|
r = await client.get("/backend")
|
||||||
|
assert r.status_code == 200
|
||||||
|
assert r.json()["primary"] in ("claude", "gemini")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_backend_set(client):
|
||||||
|
r = await client.post("/backend", json={"primary": "gemini"})
|
||||||
|
assert r.status_code == 200
|
||||||
|
assert r.json()["primary"] == "gemini"
|
||||||
|
# Reset
|
||||||
|
await client.post("/backend", json={"primary": "claude"})
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_backend_set_invalid(client):
|
||||||
|
r = await client.post("/backend", json={"primary": "gpt-4"})
|
||||||
|
assert r.status_code == 400
|
||||||
66
cortex/tests/test_api_files.py
Normal file
66
cortex/tests/test_api_files.py
Normal file
@@ -0,0 +1,66 @@
|
|||||||
|
"""
|
||||||
|
Tests for GET/PUT /files/* — allowed set enforcement, read/write, IDENTITY.md.
|
||||||
|
"""
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_files_list(client):
|
||||||
|
r = await client.get("/files")
|
||||||
|
assert r.status_code == 200
|
||||||
|
files = r.json()["files"]
|
||||||
|
names = [f["name"] for f in files]
|
||||||
|
assert "SOUL.md" in names
|
||||||
|
assert "IDENTITY.md" in names
|
||||||
|
assert "USER.md" in names
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_files_get_allowed(client):
|
||||||
|
r = await client.get("/files/IDENTITY.md")
|
||||||
|
assert r.status_code == 200
|
||||||
|
assert "content" in r.json()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_files_get_not_in_allowed(client):
|
||||||
|
"""Files outside the ALLOWED set should return 404, not the file content."""
|
||||||
|
for name in ("TASKS.json", "CRONS.json", "SCRATCH.md", "../config.py", ".env"):
|
||||||
|
r = await client.get(f"/files/{name}")
|
||||||
|
assert r.status_code == 404, f"Expected 404 for {name}, got {r.status_code}"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_files_put_and_get(client):
|
||||||
|
"""Write a new value and read it back."""
|
||||||
|
content = "# Updated PROTOCOLS\nTest content."
|
||||||
|
r = await client.put("/files/PROTOCOLS.md", json={"content": content})
|
||||||
|
assert r.status_code == 200
|
||||||
|
assert r.json()["ok"] is True
|
||||||
|
|
||||||
|
r2 = await client.get("/files/PROTOCOLS.md")
|
||||||
|
assert r2.status_code == 200
|
||||||
|
assert r2.json()["content"] == content
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_files_put_not_allowed(client):
|
||||||
|
r = await client.put("/files/../../etc/passwd", json={"content": "pwned"})
|
||||||
|
assert r.status_code == 404
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_files_get_missing_but_allowed(client, personas_root):
|
||||||
|
"""An allowed file that doesn't exist yet returns 404."""
|
||||||
|
# Temporarily remove MEMORY_MID.md
|
||||||
|
f = personas_root / "inara" / "MEMORY_MID.md"
|
||||||
|
existed = f.exists()
|
||||||
|
if existed:
|
||||||
|
backup = f.read_text()
|
||||||
|
f.unlink()
|
||||||
|
try:
|
||||||
|
r = await client.get("/files/MEMORY_MID.md")
|
||||||
|
assert r.status_code == 404
|
||||||
|
finally:
|
||||||
|
if existed:
|
||||||
|
f.write_text(backup)
|
||||||
34
cortex/tests/test_health.py
Normal file
34
cortex/tests/test_health.py
Normal file
@@ -0,0 +1,34 @@
|
|||||||
|
"""
|
||||||
|
Basic smoke tests — if these fail, nothing else matters.
|
||||||
|
"""
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_health(client):
|
||||||
|
r = await client.get("/health")
|
||||||
|
assert r.status_code == 200
|
||||||
|
assert r.json()["status"] == "ok"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_auth_status(client):
|
||||||
|
r = await client.get("/auth/status")
|
||||||
|
assert r.status_code == 200
|
||||||
|
data = r.json()
|
||||||
|
assert "claude" in data
|
||||||
|
assert "gemini" in data
|
||||||
|
assert "ok" in data["claude"]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_distill_status(client):
|
||||||
|
r = await client.get("/distill/status")
|
||||||
|
assert r.status_code == 200
|
||||||
|
assert "enabled" in r.json()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_unknown_route_404(client):
|
||||||
|
r = await client.get("/does-not-exist")
|
||||||
|
assert r.status_code == 404
|
||||||
94
cortex/tests/test_persona.py
Normal file
94
cortex/tests/test_persona.py
Normal file
@@ -0,0 +1,94 @@
|
|||||||
|
"""
|
||||||
|
Unit tests for persona.py — validation, routing, path traversal.
|
||||||
|
No HTTP involved.
|
||||||
|
"""
|
||||||
|
import pytest
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
|
||||||
|
def _make_temp_personas(tmp_path: Path) -> Path:
|
||||||
|
root = tmp_path / "personas"
|
||||||
|
for name in ("alice", "bob"):
|
||||||
|
p = root / name
|
||||||
|
p.mkdir(parents=True)
|
||||||
|
(p / "IDENTITY.md").write_text(f"# {name}")
|
||||||
|
# A directory WITHOUT IDENTITY.md — should not appear in list_personas()
|
||||||
|
(root / "incomplete").mkdir()
|
||||||
|
return root
|
||||||
|
|
||||||
|
|
||||||
|
def test_validate_good(tmp_path):
|
||||||
|
root = _make_temp_personas(tmp_path)
|
||||||
|
import config, persona
|
||||||
|
with patch.object(config.settings, "personas_dir", root):
|
||||||
|
assert persona.validate("alice") == "alice"
|
||||||
|
assert persona.validate("bob") == "bob"
|
||||||
|
|
||||||
|
|
||||||
|
def test_validate_unknown(tmp_path):
|
||||||
|
root = _make_temp_personas(tmp_path)
|
||||||
|
import config, persona
|
||||||
|
with patch.object(config.settings, "personas_dir", root):
|
||||||
|
with pytest.raises(ValueError, match="Unknown persona"):
|
||||||
|
persona.validate("charlie")
|
||||||
|
|
||||||
|
|
||||||
|
def test_validate_path_traversal(tmp_path):
|
||||||
|
root = _make_temp_personas(tmp_path)
|
||||||
|
import config, persona
|
||||||
|
with patch.object(config.settings, "personas_dir", root):
|
||||||
|
with pytest.raises(ValueError, match="Invalid persona name"):
|
||||||
|
persona.validate("../../etc/passwd")
|
||||||
|
with pytest.raises(ValueError, match="Invalid persona name"):
|
||||||
|
persona.validate("../alice")
|
||||||
|
with pytest.raises(ValueError, match="Invalid persona name"):
|
||||||
|
persona.validate("alice/../../etc")
|
||||||
|
|
||||||
|
|
||||||
|
def test_validate_special_chars(tmp_path):
|
||||||
|
root = _make_temp_personas(tmp_path)
|
||||||
|
import config, persona
|
||||||
|
with patch.object(config.settings, "personas_dir", root):
|
||||||
|
for bad in ("alice bob", "alice;bob", "alice\x00bob", "A" * 33, ""):
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
persona.validate(bad)
|
||||||
|
|
||||||
|
|
||||||
|
def test_validate_allows_hyphen_underscore(tmp_path):
|
||||||
|
root = _make_temp_personas(tmp_path)
|
||||||
|
# Create a persona with hyphen and underscore in name
|
||||||
|
p = root / "my_ai-agent"
|
||||||
|
p.mkdir(parents=True)
|
||||||
|
(p / "IDENTITY.md").write_text("# My Agent")
|
||||||
|
import config, persona
|
||||||
|
with patch.object(config.settings, "personas_dir", root):
|
||||||
|
assert persona.validate("my_ai-agent") == "my_ai-agent"
|
||||||
|
|
||||||
|
|
||||||
|
def test_list_personas(tmp_path):
|
||||||
|
root = _make_temp_personas(tmp_path)
|
||||||
|
import config, persona
|
||||||
|
with patch.object(config.settings, "personas_dir", root):
|
||||||
|
names = persona.list_personas()
|
||||||
|
assert "alice" in names
|
||||||
|
assert "bob" in names
|
||||||
|
assert "incomplete" not in names # no IDENTITY.md
|
||||||
|
|
||||||
|
|
||||||
|
def test_persona_path_uses_contextvar(tmp_path):
|
||||||
|
root = _make_temp_personas(tmp_path)
|
||||||
|
import config, persona
|
||||||
|
with patch.object(config.settings, "personas_dir", root):
|
||||||
|
persona.set_persona("alice")
|
||||||
|
assert persona.persona_path() == root / "alice"
|
||||||
|
persona.set_persona("bob")
|
||||||
|
assert persona.persona_path() == root / "bob"
|
||||||
|
|
||||||
|
|
||||||
|
def test_persona_path_explicit_name(tmp_path):
|
||||||
|
root = _make_temp_personas(tmp_path)
|
||||||
|
import config, persona
|
||||||
|
with patch.object(config.settings, "personas_dir", root):
|
||||||
|
persona.set_persona("alice")
|
||||||
|
assert persona.persona_path("bob") == root / "bob"
|
||||||
126
cortex/tests/test_security.py
Normal file
126
cortex/tests/test_security.py
Normal file
@@ -0,0 +1,126 @@
|
|||||||
|
"""
|
||||||
|
Security-focused tests — what should be blocked or rejected.
|
||||||
|
|
||||||
|
These document the current security posture and will catch regressions.
|
||||||
|
Tests marked 'known_gap' document real issues that are not yet fixed;
|
||||||
|
they assert the current (insecure) behaviour so we notice when it changes.
|
||||||
|
"""
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Path traversal
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_files_no_path_traversal_in_filename(client):
|
||||||
|
"""File endpoint must not serve files outside the ALLOWED set."""
|
||||||
|
dangerous = [
|
||||||
|
"../config.py",
|
||||||
|
"../../etc/passwd",
|
||||||
|
"SOUL.md/../config.py",
|
||||||
|
".env",
|
||||||
|
"TASKS.json",
|
||||||
|
"CRONS.json",
|
||||||
|
]
|
||||||
|
for name in dangerous:
|
||||||
|
r = await client.get(f"/files/{name}")
|
||||||
|
assert r.status_code in (404, 422), \
|
||||||
|
f"Expected 404/422 for {name!r}, got {r.status_code}"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_persona_traversal_blocked_in_chat(client, mock_llm):
|
||||||
|
"""Path traversal in persona name must be rejected before any file access."""
|
||||||
|
for bad in ("../inara", "../../etc", "inara/../inara", "inara\x00extra"):
|
||||||
|
r = await client.post("/chat", json={"message": "hi", "persona": bad})
|
||||||
|
assert r.status_code == 200 # SSE stream, not HTTP error
|
||||||
|
import json
|
||||||
|
for line in r.text.splitlines():
|
||||||
|
if line.startswith("data: "):
|
||||||
|
event = json.loads(line[6:])
|
||||||
|
if event.get("type") == "error":
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
pytest.fail(f"Expected error event for persona={bad!r}, got: {r.text[:200]}")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_orchestrate_path_traversal(client, mock_llm):
|
||||||
|
r = await client.post("/orchestrate", json={"task": "hi", "persona": "../../etc"})
|
||||||
|
assert r.status_code == 400
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Signature verification
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_nct_replayed_request_rejected(client):
|
||||||
|
"""A request with correct format but wrong HMAC should always be rejected."""
|
||||||
|
import json, hashlib, hmac as hmac_lib
|
||||||
|
payload = json.dumps({"type": "Create", "actor": {}, "object": {}, "target": {}}).encode()
|
||||||
|
# Use wrong secret to generate sig
|
||||||
|
wrong_sig = hmac_lib.new(b"wrong-secret", b"abc123" + payload, hashlib.sha256).hexdigest()
|
||||||
|
from unittest.mock import patch
|
||||||
|
with patch("config.settings.nextcloud_talk_bot_secret", "correct-secret"):
|
||||||
|
r = await client.post(
|
||||||
|
"/inara-nextcloud-talk-webhook",
|
||||||
|
content=payload,
|
||||||
|
headers={
|
||||||
|
"Content-Type": "application/json",
|
||||||
|
"X-Nextcloud-Talk-Random": "abc123",
|
||||||
|
"X-Nextcloud-Talk-Signature": wrong_sig,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert r.status_code == 401
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Known gaps — document current behaviour, alert when it changes
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_known_gap__distill_no_app_auth(client):
|
||||||
|
"""
|
||||||
|
KNOWN GAP: /distill/* has no app-layer auth.
|
||||||
|
Anyone reaching port 8000 directly can trigger LLM calls and overwrite memory.
|
||||||
|
Protection is currently nginx-only.
|
||||||
|
This test documents the current state — update when app-layer auth is added.
|
||||||
|
"""
|
||||||
|
r = await client.get("/distill/status")
|
||||||
|
assert r.status_code == 200 # currently open
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_known_gap__files_put_no_app_auth(client):
|
||||||
|
"""
|
||||||
|
KNOWN GAP: PUT /files/{filename} has no app-layer auth.
|
||||||
|
Overwriting SOUL.md or IDENTITY.md changes agent behavior.
|
||||||
|
Protection is currently nginx-only.
|
||||||
|
"""
|
||||||
|
r = await client.put("/files/PROTOCOLS.md", json={"content": "# Modified"})
|
||||||
|
assert r.status_code == 200 # currently open
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_known_gap__gchat_no_audience_bypass(client, mock_llm):
|
||||||
|
"""
|
||||||
|
KNOWN GAP: Google Chat JWT verification is silently skipped when
|
||||||
|
GOOGLE_CHAT_AUDIENCE is empty (the default). Anyone can POST and get
|
||||||
|
LLM responses without a valid token.
|
||||||
|
Fix: make audience required; fail loudly if not set.
|
||||||
|
"""
|
||||||
|
from unittest.mock import patch
|
||||||
|
with patch("config.settings.google_chat_audience", ""):
|
||||||
|
r = await client.post("/channels/google-chat", json={
|
||||||
|
"chat": {
|
||||||
|
"messagePayload": {
|
||||||
|
"message": {"text": "Exploit"},
|
||||||
|
"space": {"name": "spaces/x", "type": "DM"},
|
||||||
|
},
|
||||||
|
"user": {"displayName": "Attacker"},
|
||||||
|
}
|
||||||
|
})
|
||||||
|
# This currently succeeds — it should not when audience is unconfigured
|
||||||
|
assert r.status_code == 200 # documents the gap
|
||||||
271
cortex/tests/test_tools.py
Normal file
271
cortex/tests/test_tools.py
Normal file
@@ -0,0 +1,271 @@
|
|||||||
|
"""
|
||||||
|
Unit tests for Inara's internal tools — scratch, tasks, cron.
|
||||||
|
|
||||||
|
These test the sync implementation functions directly, using a temp directory.
|
||||||
|
No HTTP, no LLM calls.
|
||||||
|
"""
|
||||||
|
import json
|
||||||
|
import pytest
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def persona_dir(tmp_path) -> Path:
|
||||||
|
"""A temp persona directory pre-populated with empty tool files."""
|
||||||
|
d = tmp_path / "inara"
|
||||||
|
d.mkdir()
|
||||||
|
(d / "SCRATCH.md").write_text("")
|
||||||
|
(d / "TASKS.json").write_text("[]")
|
||||||
|
(d / "CRONS.json").write_text("[]")
|
||||||
|
(d / "REMINDERS.md").write_text("")
|
||||||
|
return d
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def patch_persona_path(persona_dir):
|
||||||
|
"""
|
||||||
|
Route all persona_path() calls to the temp dir.
|
||||||
|
Each tool does `from persona import persona_path`, so we must patch
|
||||||
|
the name in each module's namespace, not just in persona itself.
|
||||||
|
"""
|
||||||
|
with (
|
||||||
|
patch("tools.scratch.persona_path", return_value=persona_dir),
|
||||||
|
patch("tools.tasks.persona_path", return_value=persona_dir),
|
||||||
|
patch("tools.cron.persona_path", return_value=persona_dir),
|
||||||
|
patch("cron_runner._persona_path", return_value=persona_dir),
|
||||||
|
):
|
||||||
|
yield
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Scratch
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestScratch:
|
||||||
|
def test_read_empty(self):
|
||||||
|
from tools.scratch import _scratch_read
|
||||||
|
result = _scratch_read()
|
||||||
|
assert "empty" in result.lower()
|
||||||
|
|
||||||
|
def test_write_and_read(self, persona_dir):
|
||||||
|
from tools.scratch import _scratch_write, _scratch_read
|
||||||
|
_scratch_write("# My notes\n\nSome content here.")
|
||||||
|
content = _scratch_read()
|
||||||
|
assert "My notes" in content
|
||||||
|
assert "Some content here" in content
|
||||||
|
|
||||||
|
def test_append_adds_section(self, persona_dir):
|
||||||
|
from tools.scratch import _scratch_write, _scratch_append, _scratch_read
|
||||||
|
_scratch_write("# Existing")
|
||||||
|
_scratch_append("New section content", heading="Section A")
|
||||||
|
content = _scratch_read()
|
||||||
|
assert "Existing" in content
|
||||||
|
assert "Section A" in content
|
||||||
|
assert "New section content" in content
|
||||||
|
|
||||||
|
def test_append_auto_heading(self, persona_dir):
|
||||||
|
from tools.scratch import _scratch_append, _scratch_read
|
||||||
|
_scratch_append("Content without explicit heading")
|
||||||
|
content = _scratch_read()
|
||||||
|
assert "UTC" in content # auto heading includes timestamp
|
||||||
|
|
||||||
|
def test_clear(self, persona_dir):
|
||||||
|
from tools.scratch import _scratch_write, _scratch_clear, _scratch_read
|
||||||
|
_scratch_write("Some content")
|
||||||
|
_scratch_clear()
|
||||||
|
assert "empty" in _scratch_read().lower()
|
||||||
|
|
||||||
|
def test_write_strips_trailing_whitespace(self, persona_dir):
|
||||||
|
from tools.scratch import _scratch_write, _scratch_read
|
||||||
|
_scratch_write("Content \n\n\n")
|
||||||
|
content = (persona_dir / "SCRATCH.md").read_text()
|
||||||
|
assert content.endswith("\n")
|
||||||
|
assert not content.endswith("\n\n")
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Tasks
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestTasks:
|
||||||
|
def _mk(self, title, description=None, priority="normal"):
|
||||||
|
from tools.tasks import _task_create
|
||||||
|
return _task_create(title, description, priority)
|
||||||
|
|
||||||
|
def _id(self, result: str) -> str:
|
||||||
|
import re
|
||||||
|
m = re.search(r't_\w+', result)
|
||||||
|
assert m, f"No task ID in: {result}"
|
||||||
|
return m.group()
|
||||||
|
|
||||||
|
def test_list_empty(self):
|
||||||
|
from tools.tasks import _task_list
|
||||||
|
assert "No tasks" in _task_list(status=None)
|
||||||
|
|
||||||
|
def test_create_and_list(self):
|
||||||
|
from tools.tasks import _task_list
|
||||||
|
self._mk("Buy coffee", description="Dark roast", priority="high")
|
||||||
|
result = _task_list(status=None)
|
||||||
|
assert "Buy coffee" in result
|
||||||
|
assert "[high]" in result
|
||||||
|
|
||||||
|
def test_create_bad_priority_defaults_to_normal(self):
|
||||||
|
from tools.tasks import _task_list
|
||||||
|
self._mk("Test task", priority="urgent") # invalid — becomes "normal"
|
||||||
|
result = _task_list(status=None)
|
||||||
|
assert "Test task" in result
|
||||||
|
assert "[normal]" not in result # normal priority not shown in brackets
|
||||||
|
|
||||||
|
def test_update_status(self):
|
||||||
|
from tools.tasks import _task_update, _task_list
|
||||||
|
tid = self._id(self._mk("Work item"))
|
||||||
|
_task_update(tid, status="in_progress", title=None, description=None, priority=None)
|
||||||
|
assert "Work item" in _task_list(status="in_progress")
|
||||||
|
|
||||||
|
def test_complete(self):
|
||||||
|
from tools.tasks import _task_complete, _task_list
|
||||||
|
tid = self._id(self._mk("Finish this"))
|
||||||
|
_task_complete(tid)
|
||||||
|
assert "Finish this" in _task_list(status="done")
|
||||||
|
assert "Finish this" not in _task_list(status="todo")
|
||||||
|
|
||||||
|
def test_filter_by_status(self):
|
||||||
|
from tools.tasks import _task_list
|
||||||
|
self._mk("A task")
|
||||||
|
assert "A task" in _task_list(status="todo")
|
||||||
|
assert "A task" not in _task_list(status="done")
|
||||||
|
|
||||||
|
def test_update_unknown_id(self):
|
||||||
|
from tools.tasks import _task_update
|
||||||
|
result = _task_update("t_doesnotexist", status="done",
|
||||||
|
title=None, description=None, priority=None)
|
||||||
|
assert "not found" in result.lower()
|
||||||
|
|
||||||
|
def test_persistence(self, persona_dir):
|
||||||
|
"""Tasks survive between _load() calls (written to TASKS.json)."""
|
||||||
|
self._mk("Persistent task")
|
||||||
|
data = json.loads((persona_dir / "TASKS.json").read_text())
|
||||||
|
assert any(t["title"] == "Persistent task" for t in data)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Cron
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestCronRunner:
|
||||||
|
def test_parse_hourly(self):
|
||||||
|
from cron_runner import parse_schedule
|
||||||
|
assert parse_schedule("hourly") == {"minute": 0}
|
||||||
|
|
||||||
|
def test_parse_daily_default(self):
|
||||||
|
from cron_runner import parse_schedule
|
||||||
|
r = parse_schedule("daily")
|
||||||
|
assert r["hour"] == 9
|
||||||
|
assert r["minute"] == 0
|
||||||
|
|
||||||
|
def test_parse_daily_custom_time(self):
|
||||||
|
from cron_runner import parse_schedule
|
||||||
|
r = parse_schedule("daily:14:30")
|
||||||
|
assert r["hour"] == 14
|
||||||
|
assert r["minute"] == 30
|
||||||
|
|
||||||
|
def test_parse_weekly(self):
|
||||||
|
from cron_runner import parse_schedule
|
||||||
|
r = parse_schedule("weekly:mon")
|
||||||
|
assert r["day_of_week"] == "mon"
|
||||||
|
assert r["hour"] == 9
|
||||||
|
|
||||||
|
def test_parse_weekly_with_time(self):
|
||||||
|
from cron_runner import parse_schedule
|
||||||
|
r = parse_schedule("weekly:fri:17:00")
|
||||||
|
assert r["day_of_week"] == "fri"
|
||||||
|
assert r["hour"] == 17
|
||||||
|
assert r["minute"] == 0
|
||||||
|
|
||||||
|
def test_parse_full_day_names(self):
|
||||||
|
from cron_runner import parse_schedule
|
||||||
|
assert parse_schedule("weekly:monday")["day_of_week"] == "mon"
|
||||||
|
assert parse_schedule("weekly:friday")["day_of_week"] == "fri"
|
||||||
|
|
||||||
|
def test_parse_unknown_schedule(self):
|
||||||
|
from cron_runner import parse_schedule
|
||||||
|
with pytest.raises(ValueError, match="Unrecognised"):
|
||||||
|
parse_schedule("every-tuesday")
|
||||||
|
|
||||||
|
def test_parse_bad_dow(self):
|
||||||
|
from cron_runner import parse_schedule
|
||||||
|
with pytest.raises(ValueError, match="day of week"):
|
||||||
|
parse_schedule("weekly:funday")
|
||||||
|
|
||||||
|
def test_parse_bad_time_non_integer(self):
|
||||||
|
from cron_runner import parse_schedule
|
||||||
|
with pytest.raises((ValueError, Exception)):
|
||||||
|
parse_schedule("daily:noon") # non-integer — parse error
|
||||||
|
|
||||||
|
|
||||||
|
class TestCronTools:
|
||||||
|
def test_list_empty(self):
|
||||||
|
from tools.cron import _cron_list
|
||||||
|
result = _cron_list()
|
||||||
|
assert "No crons" in result
|
||||||
|
|
||||||
|
def test_add_and_list(self):
|
||||||
|
from tools.cron import _cron_add, _cron_list
|
||||||
|
with patch("tools.cron._scheduler_add"):
|
||||||
|
r = _cron_add("Morning reminder", "daily:09:00", "remind", "Check in.")
|
||||||
|
assert "c_" in r
|
||||||
|
listing = _cron_list()
|
||||||
|
assert "Morning reminder" in listing
|
||||||
|
assert "daily:09:00" in listing
|
||||||
|
|
||||||
|
def test_add_bad_schedule(self):
|
||||||
|
from tools.cron import _cron_add
|
||||||
|
r = _cron_add("Bad job", "every-day", "remind", "Hello")
|
||||||
|
assert "Bad schedule" in r
|
||||||
|
|
||||||
|
def test_add_bad_type(self):
|
||||||
|
from tools.cron import _cron_add
|
||||||
|
r = _cron_add("Bad job", "daily", "email", "Hello")
|
||||||
|
assert "Bad type" in r
|
||||||
|
|
||||||
|
def _extract_id(self, result: str) -> str:
|
||||||
|
import re
|
||||||
|
m = re.search(r'c_\w+', result)
|
||||||
|
assert m, f"No cron ID in: {result}"
|
||||||
|
return m.group()
|
||||||
|
|
||||||
|
def test_remove(self):
|
||||||
|
from tools.cron import _cron_add, _cron_remove, _cron_list
|
||||||
|
with patch("tools.cron._scheduler_add"), patch("tools.cron._scheduler_remove"):
|
||||||
|
r = _cron_add("To remove", "hourly", "note", "Tick")
|
||||||
|
cron_id = self._extract_id(r)
|
||||||
|
result = _cron_remove(cron_id)
|
||||||
|
assert "Removed" in result
|
||||||
|
assert "To remove" not in _cron_list()
|
||||||
|
|
||||||
|
def test_remove_unknown(self):
|
||||||
|
from tools.cron import _cron_remove
|
||||||
|
result = _cron_remove("c_doesnotexist")
|
||||||
|
assert "Not found" in result
|
||||||
|
|
||||||
|
def test_toggle_pause_resume(self):
|
||||||
|
from tools.cron import _cron_add, _cron_toggle, _cron_list
|
||||||
|
with patch("tools.cron._scheduler_add"), patch("tools.cron._scheduler_pause"), patch("tools.cron._scheduler_resume"):
|
||||||
|
r = _cron_add("Toggleable", "daily", "note", "Content")
|
||||||
|
cron_id = self._extract_id(r)
|
||||||
|
|
||||||
|
result = _cron_toggle(cron_id)
|
||||||
|
assert "Paused" in result
|
||||||
|
assert "PAUSED" in _cron_list()
|
||||||
|
|
||||||
|
result = _cron_toggle(cron_id)
|
||||||
|
assert "Resumed" in result
|
||||||
|
assert "enabled" in _cron_list()
|
||||||
|
|
||||||
|
def test_reminders_clear(self, persona_dir):
|
||||||
|
from tools.cron import _reminders_clear
|
||||||
|
(persona_dir / "REMINDERS.md").write_text("## Some reminder\n\nContent")
|
||||||
|
result = _reminders_clear()
|
||||||
|
assert "cleared" in result.lower()
|
||||||
|
assert (persona_dir / "REMINDERS.md").read_text() == ""
|
||||||
164
cortex/tests/test_webhooks.py
Normal file
164
cortex/tests/test_webhooks.py
Normal file
@@ -0,0 +1,164 @@
|
|||||||
|
"""
|
||||||
|
Webhook auth tests — NC Talk HMAC, Google Chat JWT.
|
||||||
|
|
||||||
|
These tests verify that auth is enforced, not that full LLM responses work.
|
||||||
|
"""
|
||||||
|
import hashlib
|
||||||
|
import hmac
|
||||||
|
import json
|
||||||
|
import pytest
|
||||||
|
from unittest.mock import AsyncMock, patch
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Nextcloud Talk
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
_NC_SECRET = "test-bot-secret-12345"
|
||||||
|
|
||||||
|
_VALID_NC_PAYLOAD = {
|
||||||
|
"type": "Create",
|
||||||
|
"actor": {"type": "users", "id": "testuser", "name": "Test User"},
|
||||||
|
"object": {
|
||||||
|
"type": "Note",
|
||||||
|
"content": json.dumps({"message": "Hello Inara"}),
|
||||||
|
},
|
||||||
|
"target": {"id": "abc123token"},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def _nc_headers(body: bytes, secret: str) -> dict:
|
||||||
|
random_str = "abc123"
|
||||||
|
sig = hmac.new(
|
||||||
|
secret.encode(),
|
||||||
|
(random_str + body.decode("utf-8")).encode(),
|
||||||
|
hashlib.sha256,
|
||||||
|
).hexdigest()
|
||||||
|
return {
|
||||||
|
"X-Nextcloud-Talk-Random": random_str,
|
||||||
|
"X-Nextcloud-Talk-Signature": sig,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_nct_valid_signature(client, mock_llm):
|
||||||
|
body = json.dumps(_VALID_NC_PAYLOAD).encode()
|
||||||
|
with patch("config.settings.nextcloud_talk_bot_secret", _NC_SECRET):
|
||||||
|
with patch("routers.nextcloud_talk._send_reply", new_callable=AsyncMock):
|
||||||
|
headers = _nc_headers(body, _NC_SECRET)
|
||||||
|
r = await client.post(
|
||||||
|
"/inara-nextcloud-talk-webhook",
|
||||||
|
content=body,
|
||||||
|
headers={**headers, "Content-Type": "application/json"},
|
||||||
|
)
|
||||||
|
assert r.status_code == 200
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_nct_wrong_signature(client):
|
||||||
|
body = json.dumps(_VALID_NC_PAYLOAD).encode()
|
||||||
|
with patch("config.settings.nextcloud_talk_bot_secret", _NC_SECRET):
|
||||||
|
r = await client.post(
|
||||||
|
"/inara-nextcloud-talk-webhook",
|
||||||
|
content=body,
|
||||||
|
headers={
|
||||||
|
"Content-Type": "application/json",
|
||||||
|
"X-Nextcloud-Talk-Random": "abc123",
|
||||||
|
"X-Nextcloud-Talk-Signature": "badsignature",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert r.status_code == 401
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_nct_missing_signature(client):
|
||||||
|
body = json.dumps(_VALID_NC_PAYLOAD).encode()
|
||||||
|
with patch("config.settings.nextcloud_talk_bot_secret", _NC_SECRET):
|
||||||
|
r = await client.post(
|
||||||
|
"/inara-nextcloud-talk-webhook",
|
||||||
|
content=body,
|
||||||
|
headers={"Content-Type": "application/json"},
|
||||||
|
)
|
||||||
|
assert r.status_code == 401
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_nct_no_secret_configured(client):
|
||||||
|
"""Service should return 500 if secret is not set, not process the message."""
|
||||||
|
body = json.dumps(_VALID_NC_PAYLOAD).encode()
|
||||||
|
with patch("config.settings.nextcloud_talk_bot_secret", ""):
|
||||||
|
r = await client.post(
|
||||||
|
"/inara-nextcloud-talk-webhook",
|
||||||
|
content=body,
|
||||||
|
headers={"Content-Type": "application/json"},
|
||||||
|
)
|
||||||
|
assert r.status_code == 500
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_nct_bot_message_ignored(client):
|
||||||
|
"""Messages from other bots should be silently ignored (not processed)."""
|
||||||
|
payload = {**_VALID_NC_PAYLOAD, "actor": {"type": "bots", "id": "otherbot", "name": "Bot"}}
|
||||||
|
body = json.dumps(payload).encode()
|
||||||
|
with patch("config.settings.nextcloud_talk_bot_secret", _NC_SECRET):
|
||||||
|
headers = _nc_headers(body, _NC_SECRET)
|
||||||
|
r = await client.post(
|
||||||
|
"/inara-nextcloud-talk-webhook",
|
||||||
|
content=body,
|
||||||
|
headers={**headers, "Content-Type": "application/json"},
|
||||||
|
)
|
||||||
|
assert r.status_code == 200
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Google Chat
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
_GCHAT_PAYLOAD = {
|
||||||
|
"chat": {
|
||||||
|
"messagePayload": {
|
||||||
|
"message": {"text": "Hello", "argumentText": "Hello"},
|
||||||
|
"space": {"name": "spaces/test123", "type": "ROOM"},
|
||||||
|
},
|
||||||
|
"user": {"displayName": "Test User"},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_gchat_no_audience_configured(client, mock_llm):
|
||||||
|
"""When audience is not set, JWT check is skipped (current behaviour — documented bypass)."""
|
||||||
|
with patch("config.settings.google_chat_audience", ""):
|
||||||
|
r = await client.post("/channels/google-chat", json=_GCHAT_PAYLOAD)
|
||||||
|
# Should process the message (no auth enforcement when audience is empty)
|
||||||
|
assert r.status_code == 200
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_gchat_missing_token_with_audience(client):
|
||||||
|
"""When audience IS configured, requests without a token must be rejected."""
|
||||||
|
with patch("config.settings.google_chat_audience", "123456789"):
|
||||||
|
r = await client.post("/channels/google-chat", json=_GCHAT_PAYLOAD)
|
||||||
|
assert r.status_code == 401
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_gchat_invalid_token_with_audience(client):
|
||||||
|
"""A fake token should fail JWT verification."""
|
||||||
|
payload_with_token = {
|
||||||
|
**_GCHAT_PAYLOAD,
|
||||||
|
"authorizationEventObject": {"systemIdToken": "not.a.valid.jwt"},
|
||||||
|
}
|
||||||
|
with patch("config.settings.google_chat_audience", "123456789"):
|
||||||
|
r = await client.post("/channels/google-chat", json=payload_with_token)
|
||||||
|
assert r.status_code == 401
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_gchat_added_to_space(client, mock_llm):
|
||||||
|
"""Bot added to a space — should return a greeting, no auth when audience empty."""
|
||||||
|
payload = {"chat": {"addedToSpacePayload": {"space": {"type": "ROOM"}}}}
|
||||||
|
with patch("config.settings.google_chat_audience", ""):
|
||||||
|
r = await client.post("/channels/google-chat", json=payload)
|
||||||
|
assert r.status_code == 200
|
||||||
|
assert "hostAppDataAction" in r.json()
|
||||||
Reference in New Issue
Block a user