feat: live progress updates during orchestrator tool loop

The thinking bubble now shows real-time status instead of a static spinner:
   Round 1 — thinking…
   Round 1 — web_search
   Round 2 — thinking…
   Generating response…

Implementation: async on_progress callback passed from _run_job into both
orchestrators (_run_from_messages / _run_from_contents). Callback writes to
job["progress"] under the jobs lock; poll responses include the field;
app.js displays it in the thinking bubble when present.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Scott Idem
2026-06-16 22:55:14 -04:00
parent 4fdb9fd0c7
commit c31eba111f
4 changed files with 37 additions and 4 deletions

View File

@@ -52,6 +52,7 @@ async def run(
max_risk: str | None = None,
risk_whitelist: list[str] | None = None,
risk_blacklist: list[str] | None = None,
on_progress=None, # async (str) -> None; called with live status updates
) -> OrchestratorResult:
"""
Run a tool-enabled task using an OpenAI-compatible API.
@@ -117,6 +118,7 @@ async def run(
confirm_allow=_confirm_allow,
confirm_deny=_confirm_deny,
starting_round=0,
on_progress=on_progress,
)
if checkpoint:
@@ -307,6 +309,7 @@ async def _run_from_messages(
confirm_deny: frozenset,
starting_round: int = 0,
tool_list: list[str] | None = None,
on_progress=None,
) -> tuple[str, OrchestrateCheckpoint | None]:
"""
Run the OpenAI ReAct loop from the current messages state.
@@ -323,6 +326,8 @@ async def _run_from_messages(
est = _estimate_tokens(messages)
logger.info("OpenAI orchestrator round %d / %d model=%s ~%d tokens",
round_num + 1, effective_limit, model_name, est)
if on_progress:
await on_progress(f"Round {round_num + 1} — thinking…")
call_kwargs: dict = {"model": model_name, "messages": messages}
if active_tools:
@@ -373,6 +378,8 @@ async def _run_from_messages(
pending_tools.append({"name": name, "args": args_parsed, "tool_call_id": tc.id})
logger.info("Tool %s blocked — confirmation required", name)
else:
if on_progress:
await on_progress(f"Round {round_num + 1}{name}")
result_str = await _execute_tool(name, tc.function.arguments, user_role, tool_list)
logger.info("Tool %s%d chars", name, len(result_str))
executed_results.append({"name": name, "args": args_parsed, "result": result_str, "tool_call_id": tc.id})
@@ -415,6 +422,8 @@ async def _run_from_messages(
return final_response, checkpoint
else:
if on_progress:
await on_progress("Generating response…")
final_response = msg.content or ""
logger.info(
"OpenAI orchestrator done after %d round(s). Tools used: %d",

View File

@@ -120,6 +120,7 @@ async def run(
max_risk: str | None = None,
risk_whitelist: list[str] | None = None,
risk_blacklist: list[str] | None = None,
on_progress=None, # async (str) -> None; called with live status updates
) -> OrchestratorResult:
"""
Run the full orchestration loop for a task.
@@ -183,6 +184,7 @@ async def run(
starting_round=0,
gemini_api_key=api_key,
max_rounds=max_rounds,
on_progress=on_progress,
)
if checkpoint:
@@ -194,6 +196,9 @@ async def run(
checkpoint=checkpoint,
)
if on_progress:
await on_progress("Generating response…")
return await _claude_handoff(
task=task,
tool_call_log=tool_call_log,
@@ -306,6 +311,7 @@ async def _run_from_contents(
gemini_api_key: str | None = None,
tool_list: list[str] | None = None,
max_rounds: int | None = None,
on_progress=None,
) -> tuple[str, OrchestrateCheckpoint | None]:
"""
Run the ReAct loop from the current contents state.
@@ -317,6 +323,8 @@ async def _run_from_contents(
for round_num in range(starting_round, effective_limit):
logger.info("Orchestrator round %d for task: %.80s", round_num + 1, task)
if on_progress:
await on_progress(f"Round {round_num + 1} — thinking…")
response = await asyncio.to_thread(
client.models.generate_content,
@@ -363,6 +371,8 @@ async def _run_from_contents(
pending_tools.append({"name": name, "args": args})
logger.info("Tool %s blocked — confirmation required", name)
else:
if on_progress:
await on_progress(f"Round {round_num + 1}{name}")
result_str = await _execute_tool(name, args, tool_callables)
logger.info("Tool %s%d chars", name, len(result_str))
executed_results.append({"name": name, "args": args, "result": result_str})

View File

@@ -81,6 +81,7 @@ class JobStatusResponse(BaseModel):
gemini_summary: str | None = None
error: str | None = None
pending_confirmation: dict | None = None # {tools: [{name, args}], message: str}
progress: str | None = None # live status text shown in UI during run
# ---------------------------------------------------------------------------
@@ -112,6 +113,7 @@ async def orchestrate(req: OrchestrateRequest) -> OrchestrateResponse:
"gemini_summary": None,
"error": None,
"pending_confirmation": None,
"progress": None,
"_user": user,
"_off_record": req.off_record,
}
@@ -197,6 +199,11 @@ async def _run_job(job_id: str, req: OrchestrateRequest, user: str) -> None:
async with _jobs_lock:
_jobs[job_id]["status"] = "running"
async def _on_progress(msg: str) -> None:
async with _jobs_lock:
if job_id in _jobs:
_jobs[job_id]["progress"] = msg
try:
from session_store import load as load_session, save as save_session, generate_session_id
@@ -240,6 +247,7 @@ async def _run_job(job_id: str, req: OrchestrateRequest, user: str) -> None:
max_risk=max_risk,
risk_whitelist=risk_wl,
risk_blacklist=risk_bl,
on_progress=_on_progress,
)
else:
gemini_key = (
@@ -262,6 +270,7 @@ async def _run_job(job_id: str, req: OrchestrateRequest, user: str) -> None:
max_risk=max_risk,
risk_whitelist=risk_wl,
risk_blacklist=risk_bl,
on_progress=_on_progress,
)
if result.checkpoint:

View File

@@ -1490,11 +1490,16 @@
if (!pollRes.ok) throw new Error(`Poll failed: HTTP ${pollRes.status}`);
job = await pollRes.json();
const n = job.tool_calls?.length || 0;
if (job.status === 'queued' || job.status === 'running') {
const prog = job.progress;
const n = job.tool_calls?.length || 0;
if (prog) {
thinkingDiv.textContent = `${prog}`;
} else {
thinkingDiv.textContent = n
? `⚡ working… (${n} tool${n !== 1 ? 's' : ''} used)`
: '⚡ working…';
}
continue;
}