"""
CLI auth status for the Claude and Gemini backends, plus local-model reachability.

GET /auth/status — returns per-backend auth info and warning flags.

Claude: warns when the OAuth access token is close to expiry. The window is
        WARN_HOURS when no refresh token is stored (the user has to re-run
        `claude` to refresh via the browser flow) and WARN_HOURS_REFRESH when
        a refresh token is present.
Gemini: warns only when oauth_creds.json is missing or has no refresh_token
        (access token rotates automatically every ~1h).
Local:  reports whether the configured local model host answers an HTTP GET.
"""

import json
import logging
from datetime import datetime, timezone
from pathlib import Path

from fastapi import APIRouter

from config import settings

logger = logging.getLogger(__name__)
router = APIRouter(prefix="/auth")

CLAUDE_CREDS = Path.home() / ".claude" / ".credentials.json"
GEMINI_CREDS = Path.home() / ".gemini" / "oauth_creds.json"
GEMINI_ACCTS = Path.home() / ".gemini" / "google_accounts.json"

WARN_HOURS = 24  # no refresh token — warn a day ahead
WARN_HOURS_REFRESH = 1  # refresh token present — only warn if CLI hasn't rotated in time


def _claude_status() -> dict:
    """Report the state of the Claude CLI OAuth credentials.

    Reads ``CLAUDE_CREDS`` and inspects its ``claudeAiOauth`` entry.

    Returns:
        On success, a dict with ``ok=True``, ``has_refresh_token``,
        ``access_token_expires_at`` (ISO-8601, UTC),
        ``access_token_hours_remaining`` (rounded to one decimal),
        ``warning`` and ``expired``.
        On any failure (missing file, bad JSON, missing keys, ...), a dict
        with ``ok=False``, ``error``, ``warning=True`` and ``expired=False``.
        This function never raises.
    """
    try:
        data = json.loads(CLAUDE_CREDS.read_text(encoding="utf-8"))
        oauth = data["claudeAiOauth"]
        has_refresh = bool(oauth.get("refreshToken"))
        # expiresAt is divided by 1000 before conversion, i.e. it is treated
        # as a Unix timestamp in milliseconds.
        expires_dt = datetime.fromtimestamp(oauth["expiresAt"] / 1000, tz=timezone.utc)
        now = datetime.now(tz=timezone.utc)
        hours_remaining = (expires_dt - now).total_seconds() / 3600

        # When a refresh token is present the CLI *should* auto-rotate the access
        # token, but sometimes it doesn't. Use a tight 1-hour window so a fresh
        # 8-hour token doesn't immediately trigger a warning, but a stale token
        # that the CLI missed will still surface before it expires.
        expired = hours_remaining <= 0
        threshold = WARN_HOURS_REFRESH if has_refresh else WARN_HOURS
        warning = expired or hours_remaining < threshold
        return {
            "ok": True,
            "has_refresh_token": has_refresh,
            "access_token_expires_at": expires_dt.isoformat(),
            "access_token_hours_remaining": round(hours_remaining, 1),
            "warning": warning,
            "expired": expired,
        }
    except Exception as e:
        # Status endpoint boundary: report the failure instead of raising.
        logger.warning("claude auth check failed: %s", e)
        return {"ok": False, "error": str(e), "warning": True, "expired": False}


def _gemini_active_account():
    """Return the ``active`` entry of ``GEMINI_ACCTS``, or None if unavailable.

    The lookup is best-effort: it only decorates the status payload, so a
    missing or unreadable accounts file must never fail the auth check.
    """
    try:
        accts = json.loads(GEMINI_ACCTS.read_text(encoding="utf-8"))
        return accts.get("active")
    except FileNotFoundError:
        # No accounts file is treated as "no account to report", not an error.
        return None
    except Exception as e:
        # Still best-effort, but leave a trace instead of swallowing silently.
        logger.debug("gemini account lookup failed: %s", e)
        return None


def _gemini_status() -> dict:
    """Report the state of the Gemini CLI OAuth credentials.

    Returns:
        A dict that always carries ``ok``, ``authenticated``, ``warning`` and
        ``account``:

        * credentials file missing, or no ``refresh_token`` in it:
          ``ok=True, authenticated=False, warning=True, account=None``
        * refresh token present:
          ``ok=True, authenticated=True, warning=False`` and ``account`` set
          to the active account if it could be read (else None)
        * any other failure: ``ok=False, authenticated=False, warning=True,
          account=None`` plus an ``error`` message

        This function never raises.
    """
    try:
        creds = json.loads(GEMINI_CREDS.read_text(encoding="utf-8"))
        if not creds.get("refresh_token"):
            return {"ok": True, "authenticated": False, "warning": True, "account": None}
        return {
            "ok": True,
            "authenticated": True,
            "warning": False,
            "account": _gemini_active_account(),
        }
    except FileNotFoundError:
        return {"ok": True, "authenticated": False, "warning": True, "account": None}
    except Exception as e:
        logger.warning("gemini auth check failed: %s", e)
        # "account" is included so every branch returns the same key set.
        return {
            "ok": False,
            "error": str(e),
            "warning": True,
            "authenticated": False,
            "account": None,
        }


async def _local_status(username: str = "scott") -> dict:
    """Check reachability of the user's configured local model host.

    Args:
        username: user whose local model configuration is looked up via
            ``model_registry.get_best_local_model``.

    Returns:
        ``{"configured": False}`` when there is no configuration or it has no
        ``api_url``. Otherwise a dict with ``configured=True``, ``reachable``,
        ``model`` and ``label``, plus ``error`` when the probe raised.
    """
    import model_registry  # imported at call time, as in the original code

    cfg = model_registry.get_best_local_model(username)
    if not cfg:
        return {"configured": False}
    api_url = cfg.get("api_url", "")
    if not api_url:
        return {"configured": False}

    model = cfg.get("model_name", "")
    label = cfg.get("label", "")
    try:
        import httpx

        url = api_url.rstrip("/") + "/api/models"
        headers = {}
        api_key = cfg.get("api_key", "")
        if api_key:
            headers["Authorization"] = f"Bearer {api_key}"
        async with httpx.AsyncClient(timeout=5) as client:
            resp = await client.get(url, headers=headers)
        # Any non-error HTTP status (< 400) counts as reachable.
        reachable = resp.status_code < 400
        return {
            "configured": True,
            "reachable": reachable,
            "model": model,
            "label": label,
        }
    except Exception as e:
        return {
            "configured": True,
            "reachable": False,
            # Some exceptions stringify to an empty message; fall back to the
            # class name so the payload always says what went wrong.
            "error": str(e) or type(e).__name__,
            "model": model,
            # "label" is included so both configured branches share one schema.
            "label": label,
        }


@router.get("/status")
async def auth_status() -> dict:
    """Return the combined auth/reachability status for all backends."""
    return {
        "claude": _claude_status(),
        "gemini": _gemini_status(),
        "local": await _local_status(),
    }