""" Orchestrator router — POST /orchestrate, GET /orchestrate/{job_id} Accepts a task description, runs it through the orchestrator engine (Gemini tool loop → Claude response), and returns the result. Designed to be triggered from: - The Cortex web UI (future "Agent mode" toggle) - Cron jobs: curl -X POST http://localhost:8000/orchestrate -d '{"task":"..."}' - Webhooks: Gitea, Aether events, etc. """ import asyncio import logging import uuid from datetime import datetime, timezone from fastapi import APIRouter from pydantic import BaseModel from config import settings from context_loader import load_context from persona import set_context, validate as validate_persona import orchestrator_engine logger = logging.getLogger(__name__) router = APIRouter(prefix="/orchestrate", tags=["orchestrator"]) # --------------------------------------------------------------------------- # In-memory job store # Jobs are keyed by UUID. For this phase, memory is fine — jobs are short-lived. # --------------------------------------------------------------------------- _jobs: dict[str, dict] = {} _jobs_lock = asyncio.Lock() # --------------------------------------------------------------------------- # Request / response models # --------------------------------------------------------------------------- class OrchestrateRequest(BaseModel): task: str session_id: str | None = None # include session history in context tier: int | None = None # Inara context tier (default from settings) respond_with_claude: bool = True # False = return Gemini summary only (faster, for cron) include_long: bool = True include_mid: bool = True include_short: bool = True user: str = "scott" persona: str = "inara" class OrchestrateResponse(BaseModel): job_id: str status: str # "queued" | "running" | "complete" | "error" class JobStatusResponse(BaseModel): job_id: str status: str task: str created_at: str completed_at: str | None = None session_id: str | None = None response: str | None = None tool_calls: list[dict] | None = None backend: str | None = None gemini_summary: str | None = None error: str | None = None # --------------------------------------------------------------------------- # Endpoints # --------------------------------------------------------------------------- @router.post("", response_model=OrchestrateResponse) async def orchestrate(req: OrchestrateRequest) -> OrchestrateResponse: """Submit a task to the orchestrator. Returns a job_id to poll.""" try: user, persona = validate_persona(req.user, req.persona) set_context(user, persona) except ValueError as e: from fastapi import HTTPException raise HTTPException(status_code=400, detail=str(e)) job_id = str(uuid.uuid4()) now = datetime.now(timezone.utc).isoformat() job: dict = { "job_id": job_id, "status": "queued", "task": req.task, "created_at": now, "completed_at": None, "response": None, "tool_calls": None, "backend": None, "gemini_summary": None, "error": None, } async with _jobs_lock: _jobs[job_id] = job # Run in background — caller polls GET /orchestrate/{job_id} asyncio.create_task(_run_job(job_id, req)) logger.info("Orchestrator job queued: %s — %.80s", job_id, req.task) return OrchestrateResponse(job_id=job_id, status="queued") @router.get("/{job_id}", response_model=JobStatusResponse) async def job_status(job_id: str) -> JobStatusResponse: """Poll the status of an orchestrator job.""" async with _jobs_lock: job = _jobs.get(job_id) if job is None: from fastapi import HTTPException raise HTTPException(status_code=404, detail=f"Job {job_id} not found") return JobStatusResponse(**job) @router.get("", response_model=list[JobStatusResponse]) async def list_jobs() -> list[JobStatusResponse]: """List all jobs (most recent first). Useful for debugging.""" async with _jobs_lock: jobs = sorted(_jobs.values(), key=lambda j: j["created_at"], reverse=True) return [JobStatusResponse(**j) for j in jobs] # --------------------------------------------------------------------------- # Background runner # --------------------------------------------------------------------------- async def _run_job(job_id: str, req: OrchestrateRequest) -> None: """Execute the orchestration job and update the job store.""" async with _jobs_lock: _jobs[job_id]["status"] = "running" try: from session_store import load as load_session, save as save_session, generate_session_id # Load Inara's system prompt (same as the chat router does) tier = req.tier or settings.default_tier system_prompt = load_context( tier, include_long=req.include_long, include_mid=req.include_mid, include_short=req.include_short, ) # Load session history if a session_id was provided session_id = req.session_id or generate_session_id() history = load_session(session_id) session_messages = history or None result = await orchestrator_engine.run( task=req.task, system_prompt=system_prompt, session_messages=session_messages, respond_with_claude=req.respond_with_claude, ) # Save the turn to the session store so it survives a page refresh history.append({"role": "user", "content": req.task}) history.append({"role": "assistant", "content": result.response}) save_session(session_id, history) from session_logger import log_turn log_turn(session_id, req.task, result.response) now = datetime.now(timezone.utc).isoformat() async with _jobs_lock: _jobs[job_id].update({ "status": "complete", "completed_at": now, "session_id": session_id, "response": result.response, "tool_calls": result.tool_calls, "backend": result.backend, "gemini_summary": result.gemini_summary, }) logger.info("Orchestrator job complete: %s (%d tool calls)", job_id, len(result.tool_calls)) except Exception as e: logger.exception("Orchestrator job failed: %s", job_id) now = datetime.now(timezone.utc).isoformat() async with _jobs_lock: _jobs[job_id].update({ "status": "error", "completed_at": now, "error": str(e), })