Files
Cortex-Inara/cortex/tests/test_agent_manager.py
Scott Idem 0c1cf3989a feat: aider multi-provider credentials + test suite green (182/182)
aider_run multi-provider credentials (tools/aider.py):
- _resolve_credentials() — general credential resolver; replaces the previous
  OpenRouter-only injection; resolution priority: Anthropic model hint → explicit
  host_label → model prefix (openrouter/*, groq/*, deepseek/*, …) → OpenRouter
  default → Anthropic API key → any keyed cloud host → local/generic host
- _host_flags() — generates --api-key slug=key for known cloud providers (OpenRouter,
  OpenAI, Groq, Together, Fireworks, X.ai, DeepSeek, Mistral); generates
  --openai-api-base + --openai-api-key for generic/local hosts (Open WebUI, Ollama);
  appends /api suffix for openwebui host_type; auto-prefixes model with 'openai/'
  for generic endpoints when model has no / prefix
- Anthropic API keys from providers.anthropic.credentials (not a host entry)
- host_label param added to aider_run and FunctionDeclaration — pick a configured
  host by partial label match (e.g. 'OpenRouter', 'Local', 'scott-lt-i7-rtx')
- 16 unit tests for _resolve_credentials covering all resolution paths

main.py: move @app.get("/health") before app.include_router(ui.router) — the
/{username} catch-all in ui.router was swallowing the /health path

Test suite: 37 pre-existing failures → 182/182 passing
- test_tools.py: _task_list() missing priority arg (6 callsites); cron ID regex
  c_\w+ → c_[\w-]+ (token_urlsafe includes '-', causing intermittent truncation)
- test_webhooks.py: rewritten for per-user channel config architecture —
  patch routers.nextcloud_talk/google_chat.get_user_channels instead of removed
  settings fields; corrected endpoints /webhook/nextcloud/scott and
  /channels/google-chat/scott; non-empty cfg dicts so falsy-guard passes
- test_health.py: test_unknown_route_404 now uses 3-segment path (/{u}/{p}/x)
  since single-segment paths hit the /{username} UI catch-all
- test_api_files.py: removed '../config.py' from not-in-allowed test (ASGI
  normalizes it to /config.py which hits /{username} catch-all, not files router)
- test_security.py: same webhook patch target fix; per-user endpoint URLs

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-03 23:00:45 -04:00

877 lines
34 KiB
Python

"""
Tests for agent_manager.py and the spawn_agent / aider_run background paths.
Run with:
cd cortex && .venv/bin/python -m pytest tests/test_agent_manager.py -v
No browser, no LLM calls, no Cortex service needed. All LLM interactions are mocked.
The core agent_manager tests need no mocks at all — the module is pure asyncio; only the notification-hook tests patch notification.notify.
"""
import asyncio
import pytest
import pytest_asyncio
from datetime import datetime, timedelta
from unittest.mock import AsyncMock, MagicMock, patch
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_mock_result(response: str = "Agent done."):
"""Build a mock OrchestratorResult returned by openai_orchestrator.run."""
r = MagicMock()
r.checkpoint = None
r.response = response
return r
def _mock_spawn_deps(
model_type: str = "local_openai",
user_role: str = "admin",
tool_policy: dict | None = None,
role_tools: list | None = None,
):
"""Return a context-manager stack that patches all spawn_agent external deps."""
if tool_policy is None:
tool_policy = {"allow": [], "deny": []}
model_cfg = {
"type": model_type,
"api_url": "http://localhost:3000",
"model_name": "test-model",
"api_key": "x",
}
role_cfg = {
"tools": role_tools,
"system_append": "",
"inject_datetime": True,
"inject_mode": True,
}
class _Stack:
def __enter__(self_):
self_._patches = [
patch("model_registry.get_role_config", return_value=role_cfg),
patch("model_registry.get_model_for_role", return_value=model_cfg),
patch("model_registry.get_registry", return_value={"hosts": []}),
patch("context_loader.load_context", return_value="Test system prompt"),
patch("auth_utils.get_user_role", return_value=user_role),
patch("auth_utils.get_tool_policy", return_value=tool_policy),
patch("persona.get_user", return_value="scott"),
]
for p in self_._patches:
p.start()
return self_
def __exit__(self_, *args):
for p in self_._patches:
p.stop()
return _Stack()
# ---------------------------------------------------------------------------
# Fixture — reset agent_manager state between tests
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def clear_agent_registry():
    """Empty agent_manager's in-process registry before and after every test."""
    import agent_manager

    def _reset():
        # Look the dict up on the module each time rather than caching it.
        agent_manager._agents.clear()

    _reset()  # setup: drop whatever a previous test left behind
    yield
    _reset()  # teardown: leave nothing for the next test
# ---------------------------------------------------------------------------
# agent_manager — core CRUD
# ---------------------------------------------------------------------------
class TestAgentManagerCore:
    """Registry-level behaviour of agent_manager: register, finish, cancel, list, prune."""

    @pytest.mark.asyncio
    async def test_register_creates_record(self):
        """A new registration is stored in the registry and starts out running."""
        import agent_manager

        record = await agent_manager.register(
            user="scott", role="research", task="Investigate topic X", level=2
        )

        assert record.agent_id in agent_manager._agents
        expected_fields = {
            "status": "running",
            "level": 2,
            "role": "research",
            "task": "Investigate topic X",
            "user": "scott",
        }
        for field, value in expected_fields.items():
            assert getattr(record, field) == value
        assert record.finished is None

    @pytest.mark.asyncio
    async def test_register_truncates_long_task(self):
        """A 500-character task description is stored as 200 characters."""
        import agent_manager

        record = await agent_manager.register(
            user="scott", role="chat", task="x" * 500, level=2
        )

        assert len(record.task) == 200

    @pytest.mark.asyncio
    async def test_finish_updates_record(self):
        """finish() records the status, the result text and a finish time."""
        import agent_manager

        record = await agent_manager.register(user="scott", role="chat", task="t", level=2)
        await agent_manager.finish(record.agent_id, "All done!", "done")

        stored = agent_manager.get(record.agent_id)
        assert stored.status == "done"
        assert stored.result == "All done!"
        assert stored.finished is not None

    @pytest.mark.asyncio
    async def test_finish_truncates_result(self):
        """A 2000-character result is cut down to at most _RESULT_PREVIEW_CHARS."""
        import agent_manager

        record = await agent_manager.register(user="scott", role="chat", task="t", level=2)
        await agent_manager.finish(record.agent_id, "y" * 2000)

        stored = agent_manager.get(record.agent_id)
        limit = agent_manager._RESULT_PREVIEW_CHARS
        assert len(stored.result) <= limit

    @pytest.mark.asyncio
    async def test_finish_failed_status(self):
        """finish() stores the 'failed' status when asked to."""
        import agent_manager

        record = await agent_manager.register(user="scott", role="chat", task="t", level=2)
        await agent_manager.finish(record.agent_id, "Boom", "failed")

        stored = agent_manager.get(record.agent_id)
        assert stored.status == "failed"

    @pytest.mark.asyncio
    async def test_cancel_own_agent(self):
        """The owning user can cancel; the record ends up 'cancelled'."""
        import agent_manager

        record = await agent_manager.register(user="scott", role="chat", task="t", level=2)
        reply = await agent_manager.cancel_agent(record.agent_id, "scott")

        assert "cancelled" in reply
        assert agent_manager.get(record.agent_id).status == "cancelled"

    @pytest.mark.asyncio
    async def test_cancel_wrong_user_denied(self):
        """A different user is refused and the agent keeps running."""
        import agent_manager

        record = await agent_manager.register(user="scott", role="chat", task="t", level=2)
        reply = await agent_manager.cancel_agent(record.agent_id, "holly")

        assert "denied" in reply.lower()
        assert agent_manager.get(record.agent_id).status == "running"

    @pytest.mark.asyncio
    async def test_cancel_nonexistent_agent(self):
        """Cancelling an unknown id reports that no agent was found."""
        import agent_manager

        reply = await agent_manager.cancel_agent("does-not-exist", "scott")

        assert "No agent found" in reply

    @pytest.mark.asyncio
    async def test_cancel_already_done(self):
        """Cancelling a finished agent mentions that it is already done."""
        import agent_manager

        record = await agent_manager.register(user="scott", role="chat", task="t", level=2)
        await agent_manager.finish(record.agent_id, "done", "done")
        reply = await agent_manager.cancel_agent(record.agent_id, "scott")

        assert any(word in reply for word in ("already", "done"))

    @pytest.mark.asyncio
    async def test_cancel_kills_real_task(self):
        """cancel_agent() cancels the asyncio task attached via set_task_ref()."""
        import agent_manager

        record = await agent_manager.register(user="scott", role="chat", task="t", level=2)
        sleeper = asyncio.create_task(asyncio.sleep(60))
        agent_manager.set_task_ref(record.agent_id, sleeper)

        await agent_manager.cancel_agent(record.agent_id, "scott")
        # Yield once so the event loop can deliver the cancellation.
        await asyncio.sleep(0)

        assert sleeper.cancelled() or sleeper.done()

    def test_list_agents_returns_users_agents(self):
        """list_agents() returns only the records owned by the requested user."""
        import agent_manager

        # Seed the registry directly, bypassing register().
        seeds = (
            ("a1", "scott", "running"),
            ("a2", "scott", "done"),
            ("a3", "holly", "running"),
        )
        for agent_id, owner, status in seeds:
            agent_manager._agents[agent_id] = _make_record(agent_id, owner, status)

        listed_ids = {entry.agent_id for entry in agent_manager.list_agents("scott")}

        assert "a1" in listed_ids
        assert "a2" in listed_ids
        assert "a3" not in listed_ids

    def test_list_agents_filters_by_status(self):
        """The status= filter keeps only records in that state."""
        import agent_manager

        for agent_id, status in (("a1", "running"), ("a2", "done")):
            agent_manager._agents[agent_id] = _make_record(agent_id, "scott", status)

        still_running = agent_manager.list_agents("scott", status="running")

        assert len(still_running) == 1
        assert still_running[0].agent_id == "a1"

    def test_list_agents_respects_limit(self):
        """With 20 matching records, limit=5 returns exactly five."""
        import agent_manager

        for index in range(20):
            key = f"a{index}"
            agent_manager._agents[key] = _make_record(key, "scott", "done")

        listed = agent_manager.list_agents("scott", limit=5)

        assert len(listed) == 5

    @pytest.mark.asyncio
    async def test_prune_removes_old_completed(self):
        """A finished record older than _PRUNE_AFTER is gone after the next register()."""
        import agent_manager

        record = await agent_manager.register(user="scott", role="chat", task="t", level=2)
        await agent_manager.finish(record.agent_id, "done")

        # Backdate the finish time to just past the prune threshold.
        stale_age = agent_manager._PRUNE_AFTER + timedelta(seconds=1)
        agent_manager._agents[record.agent_id].finished = datetime.now() - stale_age

        # A new registration is what triggers pruning.
        await agent_manager.register(user="scott", role="chat", task="t2", level=2)

        assert agent_manager.get(record.agent_id) is None

    @pytest.mark.asyncio
    async def test_prune_keeps_running_agents(self):
        """A still-running record (finished is None) survives a later register()."""
        import agent_manager

        record = await agent_manager.register(user="scott", role="chat", task="t", level=2)
        assert record.agent_id in agent_manager._agents

        await agent_manager.register(user="scott", role="chat", task="t2", level=2)

        assert agent_manager.get(record.agent_id) is not None

    @pytest.mark.asyncio
    async def test_finish_unknown_agent_is_noop(self):
        """finish() on an unknown id must simply not raise."""
        import agent_manager

        await agent_manager.finish("ghost-id", "result", "done")
# ---------------------------------------------------------------------------
# agent_manager — notification hook
# ---------------------------------------------------------------------------
class TestAgentManagerNotify:
    """When agent_manager.finish() does and does not call notification.notify."""

    @staticmethod
    async def _registered_agent_id(notify: bool) -> str:
        """Register a throw-away agent for 'scott' and return its id."""
        import agent_manager

        record = await agent_manager.register(
            user="scott", role="chat", task="t", level=2, notify=notify
        )
        return record.agent_id

    @pytest.mark.asyncio
    async def test_notify_called_on_done(self):
        """Finishing as 'done' with notify=True sends one notification to the owner."""
        import agent_manager

        agent_id = await self._registered_agent_id(notify=True)
        with patch("notification.notify", new_callable=AsyncMock) as notify_mock:
            await agent_manager.finish(agent_id, "All good", "done")
            notify_mock.assert_called_once()
            positional = notify_mock.call_args[0]
            assert positional[0] == "scott"  # first positional arg is the user
            # NOTE(review): the expected success emoji appears to have been lost
            # from this literal; with "" the check below is always true.
            # Restore the real character from the notification message.
            assert "" in positional[1]

    @pytest.mark.asyncio
    async def test_notify_called_on_failed(self):
        """Finishing as 'failed' sends one notification carrying the warning emoji."""
        import agent_manager

        agent_id = await self._registered_agent_id(notify=True)
        with patch("notification.notify", new_callable=AsyncMock) as notify_mock:
            await agent_manager.finish(agent_id, "Oops", "failed")
            notify_mock.assert_called_once()
            message = notify_mock.call_args[0][1]
            assert "⚠️" in message

    @pytest.mark.asyncio
    async def test_no_notify_when_cancelled(self):
        """A 'cancelled' finish sends nothing even when notify=True."""
        import agent_manager

        agent_id = await self._registered_agent_id(notify=True)
        with patch("notification.notify", new_callable=AsyncMock) as notify_mock:
            await agent_manager.finish(agent_id, "Cancelled.", "cancelled")
            notify_mock.assert_not_called()

    @pytest.mark.asyncio
    async def test_no_notify_when_flag_false(self):
        """With notify=False a successful finish sends nothing."""
        import agent_manager

        agent_id = await self._registered_agent_id(notify=False)
        with patch("notification.notify", new_callable=AsyncMock) as notify_mock:
            await agent_manager.finish(agent_id, "Done", "done")
            notify_mock.assert_not_called()
# ---------------------------------------------------------------------------
# spawn_agent — background mode
# ---------------------------------------------------------------------------
class TestSpawnAgentBackground:
    """spawn_agent(background=True): immediate return, registration and final states.

    All external dependencies are patched via _mock_spawn_deps() and
    openai_orchestrator.run is replaced, so no LLM call is made.
    """

    @staticmethod
    def _agent_id_from(result: str) -> str:
        """Pull the agent id out of the ``ID: <id>`` line of spawn_agent's reply."""
        return result.split("ID: ")[1].split("\n")[0].strip()

    @staticmethod
    async def _wait_until_settled(agent_id: str, attempts: int):
        """Poll every 50 ms, at most *attempts* times, until the agent leaves 'running'.

        Returns the record as it is in the registry after polling (may be None).
        Call this while the patches are still active so the background task
        runs against the mocked dependencies.
        """
        import agent_manager

        for _ in range(attempts):
            rec = agent_manager.get(agent_id)
            if rec and rec.status != "running":
                break
            await asyncio.sleep(0.05)
        return agent_manager.get(agent_id)

    @pytest.mark.asyncio
    async def test_background_returns_agent_id_immediately(self):
        """The background call returns a start message containing an ID."""
        from tools.agents import spawn_agent

        mock_result = _make_mock_result("Research complete.")
        with _mock_spawn_deps():
            with patch("openai_orchestrator.run", new_callable=AsyncMock, return_value=mock_result):
                result = await spawn_agent(
                    task="Test background research",
                    role="research",
                    background=True,
                )
        assert "Agent started in background" in result
        assert "ID:" in result

    @pytest.mark.asyncio
    async def test_background_registers_agent(self):
        """A background spawn leaves at least one record for the patched user."""
        import agent_manager
        from tools.agents import spawn_agent

        mock_result = _make_mock_result()
        with _mock_spawn_deps():
            with patch("openai_orchestrator.run", new_callable=AsyncMock, return_value=mock_result):
                await spawn_agent(task="Background task", background=True)
                agents = agent_manager.list_agents("scott")
        assert len(agents) >= 1

    @pytest.mark.asyncio
    async def test_background_agent_eventually_completes(self):
        """The background agent reaches 'done' and stores the orchestrator response."""
        from tools.agents import spawn_agent

        mock_result = _make_mock_result("Task done!")
        with _mock_spawn_deps():
            with patch("openai_orchestrator.run", new_callable=AsyncMock, return_value=mock_result):
                result = await spawn_agent(task="Quick task", background=True)
                agent_id = self._agent_id_from(result)
                rec = await self._wait_until_settled(agent_id, attempts=40)
        assert rec is not None
        assert rec.status == "done"
        assert "Task done!" in (rec.result or "")

    @pytest.mark.asyncio
    async def test_background_sync_path_unchanged(self):
        """Verify that background=False still blocks and returns the result string."""
        from tools.agents import spawn_agent

        mock_result = _make_mock_result("Sync result here.")
        with _mock_spawn_deps():
            with patch("openai_orchestrator.run", new_callable=AsyncMock, return_value=mock_result):
                result = await spawn_agent(task="Sync task", background=False)
        assert result == "Sync result here."

    @pytest.mark.asyncio
    async def test_background_agent_timeout(self):
        """An orchestrator that outlives timeout=1 leaves the agent in 'timeout'."""
        from tools.agents import spawn_agent

        async def _slow(*args, **kwargs):
            await asyncio.sleep(60)
            return _make_mock_result()

        with _mock_spawn_deps():
            with patch("openai_orchestrator.run", side_effect=_slow):
                result = await spawn_agent(task="Slow task", background=True, timeout=1)
                agent_id = self._agent_id_from(result)
                # 60 polls x 50 ms = 3 s, comfortably above the 1 s timeout.
                rec = await self._wait_until_settled(agent_id, attempts=60)
        # Fail with a clear assertion rather than AttributeError on None.
        assert rec is not None
        assert rec.status == "timeout"

    @pytest.mark.asyncio
    async def test_background_agent_failure(self):
        """An orchestrator exception leaves the agent in 'failed'."""
        from tools.agents import spawn_agent

        with _mock_spawn_deps():
            with patch("openai_orchestrator.run", new_callable=AsyncMock, side_effect=RuntimeError("Boom")):
                result = await spawn_agent(task="Failing task", background=True)
                agent_id = self._agent_id_from(result)
                rec = await self._wait_until_settled(agent_id, attempts=20)
        # Fail with a clear assertion rather than AttributeError on None.
        assert rec is not None
        assert rec.status == "failed"
# ---------------------------------------------------------------------------
# spawn_agent — level enforcement
# ---------------------------------------------------------------------------
class TestLevelEnforcement:
    """Which tools spawn_agent withholds from a child, depending on the parent's level."""

    @staticmethod
    async def _spawn_and_capture(task: str, agent_level: int, role_tools=None) -> dict:
        """Run a synchronous spawn_agent and return the kwargs the orchestrator received."""
        from tools.agents import spawn_agent

        seen_kwargs = {}

        async def _record_kwargs(**kwargs):
            seen_kwargs.update(kwargs)
            return _make_mock_result()

        with _mock_spawn_deps(role_tools=role_tools):
            with patch("openai_orchestrator.run", side_effect=_record_kwargs):
                await spawn_agent(
                    task=task,
                    background=False,
                    _agent_level=agent_level,
                )
        return seen_kwargs

    @pytest.mark.asyncio
    async def test_l2_parent_denies_spawn_in_l3_child(self):
        """Level 2 agent spawning a child: spawn_agent and aider_run must be denied."""
        # The caller is Level 2, so the child it spawns would be Level 3.
        seen_kwargs = await self._spawn_and_capture("Test L3 enforcement", agent_level=2)

        # Both tools are expected in the orchestrator's confirm_deny set.
        confirm_deny = seen_kwargs.get("confirm_deny", set())
        assert "spawn_agent" in confirm_deny, "spawn_agent must be blocked for L3 children"
        assert "aider_run" in confirm_deny, "aider_run must be blocked for L3 children"

    @pytest.mark.asyncio
    async def test_l1_parent_does_not_deny_spawn(self):
        """Level 1 agent (persona) spawning a Level 2 child: no extra denies."""
        # The persona is Level 1, so the child would be Level 2.
        seen_kwargs = await self._spawn_and_capture("Test L2 spawn", agent_level=1)

        confirm_deny = seen_kwargs.get("confirm_deny", set())
        assert "spawn_agent" not in confirm_deny, "L2 agents must be allowed to spawn"

    @pytest.mark.asyncio
    async def test_l2_deny_intersected_with_tool_list(self):
        """When the role has an explicit tool_list, L3 deny removes from list directly."""
        # The role's explicit tool list includes both restricted tools.
        seen_kwargs = await self._spawn_and_capture(
            "Test",
            agent_level=2,
            role_tools=["web_search", "spawn_agent", "aider_run"],
        )

        # The restricted tools must be gone; unrelated tools must survive.
        tool_list = seen_kwargs.get("tool_list", [])
        assert "spawn_agent" not in tool_list
        assert "aider_run" not in tool_list
        assert "web_search" in tool_list
# ---------------------------------------------------------------------------
# Agent lifecycle tools — output formatting
# ---------------------------------------------------------------------------
class TestAgentLifecycleTools:
    """Text output of the agent_status / agent_list / agent_cancel tools.

    persona.get_user is patched in every test to choose the calling user.
    """

    @pytest.mark.asyncio
    async def test_agent_status_running(self):
        """Status output shows the state, the role and the first 8 id characters."""
        import agent_manager

        record = await agent_manager.register(
            user="scott", role="research", task="Do research", level=2
        )
        with patch("persona.get_user", return_value="scott"):
            from tools.agents import agent_status

            output = await agent_status(record.agent_id)

        for fragment in ("running", "research", record.agent_id[:8]):
            assert fragment in output

    @pytest.mark.asyncio
    async def test_agent_status_done(self):
        """Status output of a finished agent includes its result text."""
        import agent_manager

        record = await agent_manager.register(user="scott", role="chat", task="Task", level=2)
        await agent_manager.finish(record.agent_id, "The result text", "done")
        with patch("persona.get_user", return_value="scott"):
            from tools.agents import agent_status

            output = await agent_status(record.agent_id)

        assert "done" in output
        assert "The result text" in output

    @pytest.mark.asyncio
    async def test_agent_status_wrong_user(self):
        """Another user asking for the status gets a 'denied' reply."""
        import agent_manager

        record = await agent_manager.register(user="scott", role="chat", task="t", level=2)
        with patch("persona.get_user", return_value="holly"):
            from tools.agents import agent_status

            output = await agent_status(record.agent_id)

        assert "denied" in output.lower()

    @pytest.mark.asyncio
    async def test_agent_status_not_found(self):
        """An unknown id yields the 'No agent found' message."""
        with patch("persona.get_user", return_value="scott"):
            from tools.agents import agent_status

            output = await agent_status("nonexistent-id")

        assert "No agent found" in output

    @pytest.mark.asyncio
    async def test_agent_list_shows_running(self):
        """Two registered agents are counted and both roles are listed."""
        import agent_manager

        for role, task in (("research", "Research X"), ("coder", "Fix bug")):
            await agent_manager.register(user="scott", role=role, task=task, level=2)
        with patch("persona.get_user", return_value="scott"):
            from tools.agents import agent_list

            output = await agent_list()

        assert "2 agent(s)" in output
        assert "research" in output
        assert "coder" in output

    @pytest.mark.asyncio
    async def test_agent_list_status_filter(self):
        """status='running' counts only the agent that has not finished."""
        import agent_manager

        finished = await agent_manager.register(user="scott", role="chat", task="t", level=2)
        await agent_manager.finish(finished.agent_id, "done", "done")
        await agent_manager.register(user="scott", role="chat", task="t2", level=2)
        with patch("persona.get_user", return_value="scott"):
            from tools.agents import agent_list

            output = await agent_list(status="running")

        assert "1 agent(s)" in output

    @pytest.mark.asyncio
    async def test_agent_list_empty(self):
        """With an empty registry the tool reports 'No agents found'."""
        with patch("persona.get_user", return_value="scott"):
            from tools.agents import agent_list

            output = await agent_list()

        assert "No agents found" in output

    @pytest.mark.asyncio
    async def test_agent_cancel_tool(self):
        """The cancel tool reports the cancellation and updates the record."""
        import agent_manager

        record = await agent_manager.register(user="scott", role="chat", task="t", level=2)
        with patch("persona.get_user", return_value="scott"):
            from tools.agents import agent_cancel

            output = await agent_cancel(record.agent_id)

        assert "cancelled" in output
        assert agent_manager.get(record.agent_id).status == "cancelled"
# ---------------------------------------------------------------------------
# aider_run — background mode
# ---------------------------------------------------------------------------
class TestAiderRunBackground:
    """aider_run in background and synchronous mode, with the subprocess faked.

    asyncio.create_subprocess_exec is patched, so no aider process is started.
    """

    @staticmethod
    def _fake_subprocess(stdout: bytes):
        """Build a create_subprocess_exec stand-in that exits 0 with *stdout*."""

        async def _spawn(*args, **kwargs):
            proc = MagicMock()
            proc.communicate = AsyncMock(return_value=(stdout, b""))
            proc.returncode = 0
            return proc

        return _spawn

    @pytest.mark.asyncio
    async def test_background_returns_agent_id(self):
        """The background call returns a start message containing an ID."""
        with (
            patch("persona.get_user", return_value="scott"),
            patch("model_registry.get_registry", return_value={"hosts": []}),
            patch(
                "asyncio.create_subprocess_exec",
                side_effect=self._fake_subprocess(b"All changes applied."),
            ),
        ):
            from tools.aider import aider_run

            result = await aider_run(
                project=str(_CORTEX_DIR.parent),  # use actual project root (exists)
                task="Test background task",
                background=True,
            )
        assert "Aider task started in background" in result
        assert "ID:" in result

    @pytest.mark.asyncio
    async def test_background_agent_completes(self):
        """The background aider agent reaches 'done' and stores the process output."""
        import agent_manager
        from tools.aider import aider_run

        with (
            patch("persona.get_user", return_value="scott"),
            patch("model_registry.get_registry", return_value={"hosts": []}),
            patch(
                "asyncio.create_subprocess_exec",
                side_effect=self._fake_subprocess(b"Edits applied."),
            ),
        ):
            result = await aider_run(
                project=str(_CORTEX_DIR.parent),
                task="Test",
                background=True,
            )
            agent_id = result.split("ID: ")[1].split("\n")[0].strip()
            # Poll while the patches are still active.
            for _ in range(40):
                rec = agent_manager.get(agent_id)
                if rec and rec.status != "running":
                    break
                await asyncio.sleep(0.05)
            rec = agent_manager.get(agent_id)
        # Fail with a clear assertion rather than AttributeError on None.
        assert rec is not None
        assert rec.status == "done"
        assert "Edits applied" in (rec.result or "")

    @pytest.mark.asyncio
    async def test_invalid_project_directory(self):
        """A non-existent project path is reported in the returned text."""
        from tools.aider import aider_run

        result = await aider_run(project="/this/does/not/exist", task="Test")
        assert "does not exist" in result

    @pytest.mark.asyncio
    async def test_sync_path_still_works(self):
        """background=False returns the process output directly."""
        with (
            patch("persona.get_user", return_value="scott"),
            patch("model_registry.get_registry", return_value={"hosts": []}),
            patch(
                "asyncio.create_subprocess_exec",
                side_effect=self._fake_subprocess(b"Done."),
            ),
        ):
            from tools.aider import aider_run

            result = await aider_run(
                project=str(_CORTEX_DIR.parent),
                task="Sync test",
                background=False,
            )
        assert "Done." in result
# ---------------------------------------------------------------------------
# aider_run — credential resolver (_resolve_credentials)
# ---------------------------------------------------------------------------
class TestAiderCredentialResolver:
    """Pure unit tests for _resolve_credentials — no subprocess, no registry I/O."""

    def _registry(self, hosts=None, anthropic_key=None):
        """Build a registry dict with the given hosts and optional Anthropic key."""
        providers = {}
        if anthropic_key:
            providers["anthropic"] = {"credentials": [{"api_key": anthropic_key}]}
        return {"hosts": hosts or [], "providers": providers}

    def _host(self, label, api_url, api_key="sk-test", host_type="openai"):
        """Build a single host entry for a registry's "hosts" list."""
        return dict(
            id="x",
            label=label,
            api_url=api_url,
            api_key=api_key,
            host_type=host_type,
        )

    @staticmethod
    def _value_after(flags, flag):
        """Return the element that directly follows *flag* in the flag list."""
        return flags[flags.index(flag) + 1]

    # --- Provider detection ---

    def test_openrouter_host_gets_api_key_flag(self):
        from tools.aider import _resolve_credentials

        openrouter = self._host("OpenRouter", "https://openrouter.ai/api/v1", "or-key")
        flags, _model = _resolve_credentials(self._registry(hosts=[openrouter]), None, None)

        assert "--api-key" in flags
        assert "openrouter=or-key" in flags

    def test_anthropic_model_hint_uses_provider_key(self):
        from tools.aider import _resolve_credentials

        registry = self._registry(
            hosts=[self._host("OpenRouter", "https://openrouter.ai/api/v1")],
            anthropic_key="ant-key",
        )
        flags, model = _resolve_credentials(registry, "claude-3-5-sonnet-20241022", None)

        assert "anthropic=ant-key" in flags
        assert model == "claude-3-5-sonnet-20241022"

    def test_anthropic_slash_prefix_hint(self):
        from tools.aider import _resolve_credentials

        registry = self._registry(anthropic_key="ant-key")
        flags, _model = _resolve_credentials(registry, "anthropic/claude-opus-4", None)

        assert "anthropic=ant-key" in flags

    def test_local_openwebui_host_gets_base_url(self):
        from tools.aider import _resolve_credentials

        local = self._host(
            "Local", "http://192.168.32.19:3000", "localkey", host_type="openwebui"
        )
        flags, _model = _resolve_credentials(self._registry(hosts=[local]), None, None)

        assert "--openai-api-base" in flags
        assert self._value_after(flags, "--openai-api-base") == "http://192.168.32.19:3000/api"
        assert "--openai-api-key" in flags

    def test_local_host_appends_api_suffix_for_openwebui(self):
        from tools.aider import _resolve_credentials

        webui = self._host("OpenWebUI", "http://localhost:3000", host_type="openwebui")
        flags, _model = _resolve_credentials(self._registry(hosts=[webui]), None, None)

        assert self._value_after(flags, "--openai-api-base").endswith("/api")

    def test_generic_openai_host_no_api_suffix(self):
        from tools.aider import _resolve_credentials

        custom = self._host("Custom", "http://localhost:8080/v1", host_type="openai")
        flags, _model = _resolve_credentials(self._registry(hosts=[custom]), None, None)

        base_url = self._value_after(flags, "--openai-api-base")
        assert not base_url.endswith("/api")
        assert base_url == "http://localhost:8080/v1"

    # --- Model name adjustment ---

    def test_local_host_prefixes_model_without_slash(self):
        from tools.aider import _resolve_credentials

        local = self._host("Local", "http://localhost:3000", host_type="openwebui")
        _flags, model = _resolve_credentials(
            self._registry(hosts=[local]), "gemma-4-27b-it", None
        )

        assert model == "openai/gemma-4-27b-it"

    def test_local_host_leaves_model_with_slash(self):
        from tools.aider import _resolve_credentials

        local = self._host("Local", "http://localhost:3000", host_type="openwebui")
        _flags, model = _resolve_credentials(
            self._registry(hosts=[local]), "ollama/gemma4", None
        )

        # Already carries a provider prefix, so it must come back untouched.
        assert model == "ollama/gemma4"

    def test_cloud_provider_does_not_prefix_model(self):
        from tools.aider import _resolve_credentials

        openrouter = self._host("OpenRouter", "https://openrouter.ai/api/v1")
        _flags, model = _resolve_credentials(
            self._registry(hosts=[openrouter]), "google/gemma-3-27b-it", None
        )

        assert model == "google/gemma-3-27b-it"

    # --- Host label override ---

    def test_host_label_selects_local_over_openrouter(self):
        from tools.aider import _resolve_credentials

        registry = self._registry(hosts=[
            self._host("OpenRouter", "https://openrouter.ai/api/v1", "or-key"),
            self._host(
                "Local RTX", "http://192.168.32.19:3000", "local-key", host_type="openwebui"
            ),
        ])
        flags, _model = _resolve_credentials(registry, None, "Local")

        assert "--openai-api-base" in flags
        assert "--api-key" not in flags

    def test_host_label_case_insensitive(self):
        from tools.aider import _resolve_credentials

        openrouter = self._host("OpenRouter", "https://openrouter.ai/api/v1", "or-key")
        flags, _model = _resolve_credentials(
            self._registry(hosts=[openrouter]), None, "openrouter"
        )

        assert "openrouter=or-key" in flags

    # --- Model prefix routing ---

    def test_model_openrouter_prefix_routes_to_openrouter(self):
        from tools.aider import _resolve_credentials

        registry = self._registry(hosts=[
            self._host("Local", "http://localhost:3000", host_type="openwebui"),
            self._host("OpenRouter", "https://openrouter.ai/api/v1", "or-key"),
        ])
        flags, model = _resolve_credentials(
            registry, "openrouter/google/gemma-3-27b-it", None
        )

        assert "openrouter=or-key" in flags
        assert model == "openrouter/google/gemma-3-27b-it"

    def test_model_groq_prefix_routes_to_groq_host(self):
        from tools.aider import _resolve_credentials

        groq = self._host("Groq", "https://api.groq.com/openai/v1", "groq-key")
        flags, _model = _resolve_credentials(
            self._registry(hosts=[groq]), "groq/llama-3.3-70b", None
        )

        assert "groq=groq-key" in flags

    # --- Default fallback priority ---

    def test_prefers_openrouter_over_local_when_no_hint(self):
        from tools.aider import _resolve_credentials

        registry = self._registry(hosts=[
            self._host("Local", "http://localhost:3000", host_type="openwebui"),
            self._host("OpenRouter", "https://openrouter.ai/api/v1", "or-key"),
        ])
        flags, _model = _resolve_credentials(registry, None, None)

        assert "openrouter=or-key" in flags

    def test_prefers_anthropic_over_local_when_no_openrouter(self):
        from tools.aider import _resolve_credentials

        registry = self._registry(
            hosts=[self._host("Local", "http://localhost:3000", host_type="openwebui")],
            anthropic_key="ant-key",
        )
        flags, _model = _resolve_credentials(registry, None, None)

        assert "anthropic=ant-key" in flags

    def test_empty_registry_returns_no_flags(self):
        from tools.aider import _resolve_credentials

        flags, model = _resolve_credentials({}, "gemma-4", None)

        assert flags == []
        assert model == "gemma-4"
# ---------------------------------------------------------------------------
# Helpers for manual test record creation (used in list tests above)
# ---------------------------------------------------------------------------
import agent_manager as _am
from pathlib import Path as _Path

# Directory that contains agent_manager.py. The aider_run tests pass its parent
# as a project directory that is known to exist. None when the module exposes
# no __file__.
_CORTEX_DIR = _Path(_am.__file__).parent if _am.__file__ else None
def _make_record(agent_id: str, user: str, status: str) -> "_am.AgentRecord":
    """Build an AgentRecord directly, without going through agent_manager.register().

    The record is a level-2 "chat" agent with a fixed task text. ``finished`` is
    set to the current time unless *status* is "running", in which case it is None.
    """
    from datetime import datetime
    import agent_manager

    fields = {
        "agent_id": agent_id,
        "level": 2,
        "role": "chat",
        "task": "test task",
        "status": status,
        "started": datetime.now(),
        "user": user,
    }
    if status == "running":
        fields["finished"] = None
    else:
        fields["finished"] = datetime.now()
    return agent_manager.AgentRecord(**fields)