Each role can now disable the current date/time header injected into the system prompt. The default is true, so all existing roles are unchanged. This is useful for pure processing roles (summarizer, classifier, translator), where temporal context is irrelevant or could cause unexpected model behavior.

Changes:
- model_registry: set_role_config/get_role_config gain an inject_datetime field
- context_loader: load_context gains an inject_datetime param (default True)
- orchestrator router: passes inject_datetime from role_cfg to load_context
- local_llm router: reads inject_datetime from the POST body and passes it to the registry; role_config_data_js includes the field
- local_llm.html: checkbox in the role config panel; populated on open, saved on submit

Session logs still timestamp every turn (HH:MM header in YYYY-MM-DD.md files) regardless of this setting — the toggle only affects the system prompt header.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
367 lines
14 KiB
Python
367 lines
14 KiB
Python
"""
|
|
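Orchestrator router — queue tasks for background orchestration over HTTP.

Endpoints:
    POST /orchestrate                   queue a task; returns a job_id
    GET  /orchestrate                   list all jobs (most recent first)
    GET  /orchestrate/{job_id}          poll a job's status and result
    POST /orchestrate/{job_id}/confirm  approve a tool call gated for confirmation
    POST /orchestrate/{job_id}/deny     reject a tool call gated for confirmation

Each task runs through a tool loop on the user's "orchestrator" model. Gemini
models go via orchestrator_engine. Models of type "local_openai" go via
openai_orchestrator. A final response from the chat role's model (e.g. Claude)
then follows, unless respond_with_claude is False.

Designed to be triggered from:
- The Cortex web UI (future "Agent mode" toggle)
- Cron jobs: curl -X POST http://localhost:8000/orchestrate -H 'Content-Type: application/json' -d '{"task":"..."}'
- Webhooks: Gitea, Aether events, etc.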
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import uuid
|
|
from datetime import datetime, timezone
|
|
|
|
from fastapi import APIRouter, HTTPException
|
|
from pydantic import BaseModel
|
|
|
|
from auth_utils import get_user_gemini_key, get_user_role, get_tool_policy
|
|
from config import settings
|
|
from context_loader import load_context
|
|
from persona import set_context, validate as validate_persona
|
|
import model_registry
|
|
import orchestrator_engine
|
|
import openai_orchestrator
|
|
|
|
logger: logging.Logger = logging.getLogger(__name__)
router = APIRouter(prefix="/orchestrate", tags=["orchestrator"])

# ---------------------------------------------------------------------------
# In-memory job store
# ---------------------------------------------------------------------------

# Job records keyed by job_id. Values are plain dicts mirroring
# JobStatusResponse; keys that start with "_" are internal bookkeeping and are
# filtered out before a record is returned to a client.
# NOTE(review): nothing in this module removes entries, so the store grows for
# the lifetime of the process.
_jobs: dict[str, dict] = {}
_jobs_lock: asyncio.Lock = asyncio.Lock()

# Paused-job checkpoints keyed by job_id. Kept apart from _jobs because they
# carry live Python objects (types.Content and similar) that are not
# JSON-serializable and so cannot live in the job dict.
_checkpoints: dict[str, orchestrator_engine.OrchestrateCheckpoint] = {}
_checkpoints_lock: asyncio.Lock = asyncio.Lock()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Request / response models
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class OrchestrateRequest(BaseModel):
    # Request body for POST /orchestrate.
    # (No class docstring on purpose: Pydantic would publish it as the schema
    # description and change the generated OpenAPI document.)

    # Natural-language description of the work to do.
    task: str
    # Existing session whose history is loaded into context; a new id is
    # generated when omitted.
    session_id: str | None = None
    # Inara context tier; falls back to settings.default_tier when unset.
    tier: int | None = None
    # When False, skip the final chat-model response and return only the
    # tool-loop summary (faster, suited to cron jobs).
    respond_with_claude: bool = True
    # Which memory layers load_context should include.
    include_long: bool = True
    include_mid: bool = True
    include_short: bool = True
    # Identity checked by persona.validate before the job is queued.
    user: str = "scott"
    persona: str = "inara"
    # Role whose config supplies system_append / inject_datetime / tools and
    # which produces the final response (decoupled from the tool-loop model).
    chat_role: str = "chat"
|
|
|
|
|
|
class OrchestrateResponse(BaseModel):
    # Acknowledgement returned by the submit, confirm and deny endpoints.

    # Identifier to poll via GET /orchestrate/{job_id}.
    job_id: str
    # "queued" | "running" | "complete" | "error" | "awaiting_confirmation"
    status: str
|
|
|
|
|
|
class JobStatusResponse(BaseModel):
    # Public view of a job record (internal "_"-prefixed keys are excluded).

    job_id: str
    # Same vocabulary as OrchestrateResponse.status.
    status: str
    task: str
    # ISO-8601 UTC timestamps.
    created_at: str
    completed_at: str | None = None
    # Session the turn was (or will be) saved under.
    session_id: str | None = None
    # Final text produced by the orchestrator run.
    response: str | None = None
    # Tool calls made during the run, as reported by the engine.
    tool_calls: list[dict] | None = None
    # Engine-reported backend identifier.
    backend: str | None = None
    # Summary text from the tool loop, as reported by the engine.
    gemini_summary: str | None = None
    # Error message when status == "error".
    error: str | None = None
    # Set while awaiting confirmation: {tools: [{name, args}], message: str}
    pending_confirmation: dict | None = None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Endpoints
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@router.post("", response_model=OrchestrateResponse)
async def orchestrate(req: OrchestrateRequest) -> OrchestrateResponse:
    """Submit a task to the orchestrator. Returns a job_id to poll.

    Validates the user/persona pair, applies it via set_context, records a
    "queued" job in the in-memory store, and starts _run_job as a background
    asyncio task.

    Raises:
        HTTPException: 400 if persona validation (or set_context) raises
            ValueError.
    """
    try:
        user, persona = validate_persona(req.user, req.persona)
        set_context(user, persona)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e

    job_id = str(uuid.uuid4())
    now = datetime.now(timezone.utc).isoformat()

    job: dict = {
        "job_id": job_id,
        "status": "queued",
        "task": req.task,
        "created_at": now,
        "completed_at": None,
        "session_id": None,
        "response": None,
        "tool_calls": None,
        "backend": None,
        "gemini_summary": None,
        "error": None,
        "pending_confirmation": None,
        "_user": user,  # internal: needed to resume after confirm/deny
    }

    async with _jobs_lock:
        _jobs[job_id] = job

    # The event loop keeps only weak references to tasks, so an unreferenced
    # task can be garbage-collected mid-run. Hold a strong reference on the
    # job record (the "_" prefix keeps it out of API responses) until it ends.
    task = asyncio.create_task(_run_job(job_id, req, user))
    job["_task"] = task

    def _release(done: asyncio.Task) -> None:
        # Only drop our own reference; a later resume may have replaced it.
        if job.get("_task") is done:
            del job["_task"]

    task.add_done_callback(_release)

    logger.info("Orchestrator job queued: %s — %.80s", job_id, req.task)
    return OrchestrateResponse(job_id=job_id, status="queued")
|
|
|
|
|
|
@router.get("/{job_id}", response_model=JobStatusResponse)
async def job_status(job_id: str) -> JobStatusResponse:
    """Poll the status of an orchestrator job.

    Raises:
        HTTPException: 404 when no job with *job_id* exists.
    """
    async with _jobs_lock:
        record = _jobs.get(job_id)

    if record is None:
        raise HTTPException(status_code=404, detail=f"Job {job_id} not found")

    # Internal bookkeeping keys are "_"-prefixed and never exposed.
    visible = {}
    for key, value in record.items():
        if key.startswith("_"):
            continue
        visible[key] = value
    return JobStatusResponse(**visible)
|
|
|
|
|
|
@router.get("", response_model=list[JobStatusResponse])
async def list_jobs() -> list[JobStatusResponse]:
    """List all jobs, newest first (by created_at). Useful for debugging."""
    async with _jobs_lock:
        snapshot = list(_jobs.values())

    # created_at values are ISO-8601 UTC strings in one format, so string
    # order matches chronological order.
    snapshot.sort(key=lambda record: record["created_at"], reverse=True)

    results: list[JobStatusResponse] = []
    for record in snapshot:
        public = {k: v for k, v in record.items() if not k.startswith("_")}
        results.append(JobStatusResponse(**public))
    return results
|
|
|
|
|
|
@router.post("/{job_id}/confirm", response_model=OrchestrateResponse)
async def confirm_job(job_id: str) -> OrchestrateResponse:
    """Confirm a pending tool call — the blocked tool will execute and the job continues.

    The stored checkpoint is handed to _resume_job with confirmed=True, which
    runs in a background asyncio task.

    Raises:
        HTTPException: 404 if no checkpoint is stored for *job_id*; 409 if the
            job is not in the "awaiting_confirmation" state. In the 409 case
            the checkpoint is put back instead of being discarded.
    """
    async with _checkpoints_lock:
        checkpoint = _checkpoints.pop(job_id, None)

    if checkpoint is None:
        raise HTTPException(status_code=404, detail="No pending confirmation for this job")

    async with _jobs_lock:
        job = _jobs.get(job_id)
        accepted = job is not None and job["status"] == "awaiting_confirmation"
        if accepted:
            job["status"] = "running"
            job["pending_confirmation"] = None

    if not accepted:
        if job is not None:
            # A popped checkpoint cannot be recovered; dropping it here would
            # leave the job unable to resume. Restore it (without clobbering
            # a newer one) so a retry can still succeed.
            async with _checkpoints_lock:
                _checkpoints.setdefault(job_id, checkpoint)
        raise HTTPException(status_code=409, detail="Job is not awaiting confirmation")

    user = job.get("_user", "scott")

    # The event loop keeps only weak references to tasks; hold a strong one
    # on the job record (the "_" prefix hides it from responses) until done.
    task = asyncio.create_task(_resume_job(job_id, checkpoint, confirmed=True, user=user))
    job["_task"] = task

    def _release(done: asyncio.Task) -> None:
        # Only drop our own reference; a later resume may have replaced it.
        if job.get("_task") is done:
            del job["_task"]

    task.add_done_callback(_release)

    logger.info("Orchestrator job %s confirmed — resuming", job_id)
    return OrchestrateResponse(job_id=job_id, status="running")
|
|
|
|
|
|
@router.post("/{job_id}/deny", response_model=OrchestrateResponse)
async def deny_job(job_id: str) -> OrchestrateResponse:
    """Deny a pending tool call — the tool is skipped and the job produces a final response.

    The stored checkpoint is handed to _resume_job with confirmed=False, which
    runs in a background asyncio task.

    Raises:
        HTTPException: 404 if no checkpoint is stored for *job_id*; 409 if the
            job is not in the "awaiting_confirmation" state. In the 409 case
            the checkpoint is put back instead of being discarded.
    """
    async with _checkpoints_lock:
        checkpoint = _checkpoints.pop(job_id, None)

    if checkpoint is None:
        raise HTTPException(status_code=404, detail="No pending confirmation for this job")

    async with _jobs_lock:
        job = _jobs.get(job_id)
        accepted = job is not None and job["status"] == "awaiting_confirmation"
        if accepted:
            job["status"] = "running"
            job["pending_confirmation"] = None

    if not accepted:
        if job is not None:
            # A popped checkpoint cannot be recovered; dropping it here would
            # leave the job unable to resume. Restore it (without clobbering
            # a newer one) so a retry can still succeed.
            async with _checkpoints_lock:
                _checkpoints.setdefault(job_id, checkpoint)
        raise HTTPException(status_code=409, detail="Job is not awaiting confirmation")

    user = job.get("_user", "scott")

    # The event loop keeps only weak references to tasks; hold a strong one
    # on the job record (the "_" prefix hides it from responses) until done.
    task = asyncio.create_task(_resume_job(job_id, checkpoint, confirmed=False, user=user))
    job["_task"] = task

    def _release(done: asyncio.Task) -> None:
        # Only drop our own reference; a later resume may have replaced it.
        if job.get("_task") is done:
            del job["_task"]

    task.add_done_callback(_release)

    logger.info("Orchestrator job %s denied — resuming with skip", job_id)
    return OrchestrateResponse(job_id=job_id, status="running")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Background runners
|
|
# ---------------------------------------------------------------------------
|
|
|
|
async def _run_job(job_id: str, req: OrchestrateRequest, user: str) -> None:
    """Execute the orchestration job and update the job store.

    Builds the system prompt from the chat role's config and loads prior
    session history. It then runs the tool loop on the user's "orchestrator"
    model: openai_orchestrator when that model's type is "local_openai",
    otherwise the Gemini orchestrator_engine.

    Outcomes recorded in _jobs[job_id]:
      * a tool call needs confirmation -> checkpoint stored in _checkpoints,
        status "awaiting_confirmation";
      * success -> handed to _finalize_job (status "complete");
      * any exception -> status "error" with the message in "error".

    Args:
        job_id: Key of the record created by orchestrate().
        req: The original request payload.
        user: User name returned by validate_persona.
    """
    async with _jobs_lock:
        _jobs[job_id]["status"] = "running"

    try:
        from session_store import load as load_session, generate_session_id

        tier = req.tier or settings.default_tier
        # The chat role's config supplies extra system text, whether to inject
        # the date/time header, and the tool list for this run.
        role_cfg = model_registry.get_role_config(user, req.chat_role)
        system_prompt = load_context(
            tier,
            include_long=req.include_long,
            include_mid=req.include_mid,
            include_short=req.include_short,
            role_append=role_cfg.get("system_append", ""),
            inject_datetime=role_cfg.get("inject_datetime", True),
        )

        session_id = req.session_id or generate_session_id()
        # NOTE(review): normalised to a list in case load_session returns None
        # for an unknown session — _finalize_job appends to this list.
        history = load_session(session_id) or []
        session_messages = history or None

        orch_model = model_registry.get_model_for_role(user, "orchestrator")
        user_role = get_user_role(user)
        tool_list = role_cfg.get("tools")

        # Per-user allow/deny sets, passed to the engine as confirm_allow /
        # confirm_deny for tool-call gating.
        policy = get_tool_policy(user)
        confirm_allow = set(policy.get("allow", []))
        confirm_deny = set(policy.get("deny", []))

        if orch_model and orch_model.get("type") == "local_openai":
            result = await openai_orchestrator.run(
                task=req.task,
                system_prompt=system_prompt,
                session_messages=session_messages,
                model_cfg=orch_model,
                respond_with_final=req.respond_with_claude,
                user_role=user_role,
                tool_list=tool_list,
                confirm_allow=confirm_allow,
                confirm_deny=confirm_deny,
            )
        else:
            # Prefer a key configured on the orchestrator model, else the user's own.
            gemini_key = (
                (orch_model.get("api_key") if orch_model else None)
                or get_user_gemini_key(user)
            )
            result = await orchestrator_engine.run(
                task=req.task,
                system_prompt=system_prompt,
                session_messages=session_messages,
                respond_with_claude=req.respond_with_claude,
                gemini_api_key=gemini_key,
                model_name=orch_model.get("model_name") if orch_model else None,
                response_role=req.chat_role,
                user_role=user_role,
                tool_list=tool_list,
                confirm_allow=confirm_allow,
                confirm_deny=confirm_deny,
            )

        if result.checkpoint:
            # A gated tool call paused the run; park the checkpoint until
            # /confirm or /deny is called.
            async with _checkpoints_lock:
                _checkpoints[job_id] = result.checkpoint
            async with _jobs_lock:
                _jobs[job_id].update({
                    "status": "awaiting_confirmation",
                    "response": result.response,
                    "tool_calls": result.tool_calls,
                    "backend": result.backend,
                    "gemini_summary": result.gemini_summary,
                    "session_id": session_id,
                    "pending_confirmation": {
                        "tools": result.checkpoint.pending_tools,
                        "message": result.response,
                    },
                })
            logger.info("Orchestrator job %s awaiting confirmation — %d tool(s) blocked",
                        job_id, len(result.checkpoint.pending_tools))
            return

        await _finalize_job(job_id, result, session_id, req.task, history)

    except Exception as e:
        # Top-level boundary of a background task: record the failure on the
        # job instead of letting it vanish with the task.
        logger.exception("Orchestrator job failed: %s", job_id)
        now = datetime.now(timezone.utc).isoformat()
        async with _jobs_lock:
            _jobs[job_id].update({
                "status": "error",
                "completed_at": now,
                "error": str(e),
            })
|
|
|
|
|
|
async def _resume_job(
    job_id: str,
    checkpoint: orchestrator_engine.OrchestrateCheckpoint,
    confirmed: bool,
    user: str,
) -> None:
    """Resume a job after the user confirms or denies a pending tool call.

    The checkpoint goes back to the engine that produced it:
    checkpoint.engine == "gemini" -> orchestrator_engine, otherwise
    openai_orchestrator. There are three outcomes:
      * another gate is hit -> the new checkpoint is stored and the job
        returns to "awaiting_confirmation";
      * otherwise -> the turn is finalized;
      * any exception -> the job is marked "error".

    Args:
        job_id: Job to resume; its record must already exist in _jobs.
        checkpoint: Saved engine state, already popped from _checkpoints by
            the confirm/deny endpoint.
        confirmed: True to run the blocked tool(s), False to skip them.
        user: Owner of the job. NOTE(review): unused here, and set_context()
            is not called on the confirm/deny path — confirm whether resumed
            tool calls rely on the persona context.
    """
    try:
        engine = orchestrator_engine if checkpoint.engine == "gemini" else openai_orchestrator
        result = await engine.resume(checkpoint, confirmed)

        if result.checkpoint:
            # Another confirmation needed (chained gates)
            async with _checkpoints_lock:
                _checkpoints[job_id] = result.checkpoint
            async with _jobs_lock:
                _jobs[job_id].update({
                    "status": "awaiting_confirmation",
                    "response": result.response,
                    "tool_calls": result.tool_calls,
                    "backend": result.backend,
                    "gemini_summary": result.gemini_summary,
                    "pending_confirmation": {
                        "tools": result.checkpoint.pending_tools,
                        "message": result.response,
                    },
                })
            logger.info("Orchestrator job %s awaiting another confirmation", job_id)
            return

        async with _jobs_lock:
            session_id = _jobs[job_id].get("session_id") or ""
            task = _jobs[job_id].get("task", "")

        from session_store import load as load_session

        # NOTE(review): normalised to a list in case load_session returns None
        # for an unknown session — _finalize_job appends to this list.
        history = (load_session(session_id) if session_id else None) or []
        await _finalize_job(job_id, result, session_id, task, history)

    except Exception as e:
        # Top-level boundary of a background task: record the failure on the job.
        logger.exception("Orchestrator resume failed: %s", job_id)
        now = datetime.now(timezone.utc).isoformat()
        async with _jobs_lock:
            _jobs[job_id].update({
                "status": "error",
                "completed_at": now,
                "error": str(e),
            })
|
|
|
|
|
|
async def _finalize_job(
    job_id: str,
    result: orchestrator_engine.OrchestratorResult,
    session_id: str,
    task: str,
    history: list | None,
) -> None:
    """Save session, log the turn, and mark the job complete.

    The user task and the assistant response are appended to the session
    history and saved. The turn is logged via session_logger.log_turn, and
    the job record is updated with the final result.

    Args:
        job_id: Job to mark complete.
        result: Engine result carrying response / tool_calls / backend /
            gemini_summary.
        session_id: Session to save under; a new id is generated if empty.
        task: The user's task text, stored as the user turn.
        history: Prior session messages. Not mutated; None is treated as empty.
    """
    from session_store import save as save_session, generate_session_id
    from session_logger import log_turn

    if not session_id:
        session_id = generate_session_id()

    # Work on a copy so the caller's list is untouched, and tolerate None.
    turns = list(history or [])
    turns.append({"role": "user", "content": task})
    turns.append({"role": "assistant", "content": result.response})
    save_session(session_id, turns)
    log_turn(session_id, task, result.response)

    now = datetime.now(timezone.utc).isoformat()
    async with _jobs_lock:
        _jobs[job_id].update({
            "status": "complete",
            "completed_at": now,
            "session_id": session_id,
            "response": result.response,
            "tool_calls": result.tool_calls,
            "backend": result.backend,
            "gemini_summary": result.gemini_summary,
        })
    # tool_calls may be None (JobStatusResponse allows it). len(None) would
    # raise after the job was already marked complete, and the caller's error
    # handler would then overwrite the status with "error".
    logger.info("Orchestrator job complete: %s (%d tool calls)", job_id, len(result.tool_calls or []))
|