""" Google OAuth 2.0 sign-in. Flow: 1. GET /auth/google → redirect to Google's consent page 2. GET /auth/google/callback → exchange code, look up user, set JWT cookie Users must be pre-registered by Scott before they can sign in: cd cortex && .venv/bin/python manage_passwords.py google-add Routes are public (added to _PUBLIC_PREFIXES in auth_middleware.py). """ import json import logging import secrets import urllib.parse import urllib.request from fastapi import APIRouter, Request from fastapi.responses import HTMLResponse, RedirectResponse, Response from auth_utils import COOKIE_NAME, create_token, find_user_by_google, link_google from config import settings from persona import list_user_personas logger = logging.getLogger(__name__) router = APIRouter() _GOOGLE_AUTH_URL = "https://accounts.google.com/o/oauth2/v2/auth" _GOOGLE_TOKEN_URL = "https://oauth2.googleapis.com/token" _GOOGLE_USERINFO = "https://openidconnect.googleapis.com/v1/userinfo" _STATE_COOKIE = "oauth_state" _STATE_MAX_AGE = 600 # 10 minutes — plenty of time to complete the flow @router.get("/auth/google", include_in_schema=False) async def google_login(): if not settings.google_client_id: return HTMLResponse("Google sign-in is not configured on this server.", status_code=503) state = secrets.token_urlsafe(16) params = urllib.parse.urlencode({ "client_id": settings.google_client_id, "redirect_uri": f"{settings.cortex_base_url}/auth/google/callback", "response_type": "code", "scope": "openid email profile", "state": state, "access_type": "online", "prompt": "select_account", }) resp = RedirectResponse(f"{_GOOGLE_AUTH_URL}?{params}", status_code=302) resp.set_cookie(_STATE_COOKIE, state, max_age=_STATE_MAX_AGE, httponly=True, samesite="lax") return resp @router.get("/auth/google/callback", include_in_schema=False) async def google_callback( request: Request, code: str = "", state: str = "", error: str = "", ): if error: return _error_page(f"Google sign-in was cancelled or denied: {error}") if not code: return _error_page("No authorisation code returned by Google.") # CSRF check — state must match what we stored in the cookie stored_state = request.cookies.get(_STATE_COOKIE) if not stored_state or stored_state != state: return _error_page("State mismatch — please try signing in again.") # Exchange authorisation code for tokens try: token_data = _exchange_code(code) except Exception as e: logger.error("Google token exchange failed: %s", e) return _error_page("Could not complete sign-in with Google. Please try again.") access_token = token_data.get("access_token") if not access_token: return _error_page("No access token returned by Google.") # Fetch the user's profile try: userinfo = _get_userinfo(access_token) except Exception as e: logger.error("Google userinfo fetch failed: %s", e) return _error_page("Could not retrieve your Google profile. Please try again.") google_sub = userinfo.get("sub", "") google_email = userinfo.get("email", "") if not google_sub or not google_email: return _error_page("Your Google account didn't return a usable email address.") # Match to a Cortex user username = find_user_by_google(google_sub, google_email) if not username: logger.warning("Google sign-in rejected: no account for %s (%s)", google_sub, google_email) return _error_page( f"Your Google account ({google_email}) isn't registered with Cortex.

" "Contact Scott to get access." ) # Persist the stable sub so future lookups use it (not just email) link_google(username, google_sub, google_email) personas = list_user_personas(username) if not personas: return _error_page("No personas are configured for your account yet. Contact Scott.") logger.info("Google sign-in: %s (%s)", username, google_email) resp = RedirectResponse(f"/{username}/{personas[0]}", status_code=302) _set_session_cookie(resp, username) resp.delete_cookie(_STATE_COOKIE) return resp # --------------------------------------------------------------------------- # Private helpers # --------------------------------------------------------------------------- def _exchange_code(code: str) -> dict: body = urllib.parse.urlencode({ "code": code, "client_id": settings.google_client_id, "client_secret": settings.google_client_secret, "redirect_uri": f"{settings.cortex_base_url}/auth/google/callback", "grant_type": "authorization_code", }).encode() req = urllib.request.Request( _GOOGLE_TOKEN_URL, data=body, headers={"Content-Type": "application/x-www-form-urlencoded"}, method="POST", ) with urllib.request.urlopen(req, timeout=10) as resp: return json.loads(resp.read()) def _get_userinfo(access_token: str) -> dict: req = urllib.request.Request( _GOOGLE_USERINFO, headers={"Authorization": f"Bearer {access_token}"}, ) with urllib.request.urlopen(req, timeout=10) as resp: return json.loads(resp.read()) def _set_session_cookie(response: Response, username: str) -> None: token = create_token(username) response.set_cookie( COOKIE_NAME, token, max_age=settings.jwt_expire_days * 86400, httponly=True, samesite="lax", secure=False, # set True if terminating TLS at the app layer (not behind a proxy) ) def _error_page(message: str) -> HTMLResponse: html = f""" Cortex — Sign In Failed

Sign In Failed

{message}

← Back to Sign In
""" return HTMLResponse(html, status_code=403)