- push_utils.py: subscription storage + send helper (auto-prunes 410 endpoints)
- routers/push.py: GET /api/push/vapid-key (public), POST/DELETE /api/push/subscribe
- sw.js: push event listener shows notification; notificationclick focuses/opens tab
- app.js: subscribe/unsubscribe flow + "Enable notifications" toggle in settings dropdown
- tools/notify.py: web_push orchestrator tool (user-level, no admin required)
- VAPID keys in .env; pywebpush added to requirements.txt

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
61 lines
1.9 KiB
Python
61 lines
1.9 KiB
Python
"""
|
|
Web Push endpoints.
|
|
|
|
GET /api/push/vapid-key → public VAPID key for browser PushManager.subscribe()
|
|
POST /api/push/subscribe → save a push subscription for the logged-in user
|
|
DELETE /api/push/subscribe → remove a subscription by endpoint
|
|
"""
|
|
import jwt
|
|
from fastapi import APIRouter, HTTPException, Request
|
|
from pydantic import BaseModel
|
|
|
|
from auth_utils import COOKIE_NAME, decode_token
|
|
from config import settings
|
|
import push_utils
|
|
|
|
router = APIRouter(prefix="/api/push")
|
|
|
|
|
|
def _require_user(request: Request) -> str:
    """Resolve the authenticated user from the request's session cookie.

    Args:
        request: Incoming request whose cookies are inspected for
            ``COOKIE_NAME``.

    Returns:
        Whatever ``decode_token`` returns for the cookie value; callers in
        this module treat it as the username.

    Raises:
        HTTPException: 401 if the cookie is absent or empty, or if the token
            fails to decode with ``jwt.InvalidTokenError``.
    """
    token = request.cookies.get(COOKIE_NAME)
    if not token:
        raise HTTPException(status_code=401, detail="Not authenticated")
    try:
        return decode_token(token)
    except jwt.InvalidTokenError as exc:
        # Chain explicitly so the traceback shows the JWT failure as the
        # cause rather than as an unrelated error raised during handling.
        raise HTTPException(status_code=401, detail="Invalid session") from exc
|
|
|
|
|
|
@router.get("/vapid-key")
async def get_vapid_key() -> dict:
    """Expose the configured VAPID public key.

    No authentication is performed here: the browser needs the key in order
    to create a push subscription in the first place.

    Returns:
        ``{"public_key": <key>}`` when a key is configured.

    Raises:
        HTTPException: 503 when ``settings.vapid_public_key`` is empty/unset.
    """
    public_key = settings.vapid_public_key
    if public_key:
        return {"public_key": public_key}
    raise HTTPException(status_code=503, detail="Push notifications not configured")
|
|
|
|
|
|
class SubscribeRequest(BaseModel):
    """Request body for ``POST /api/push/subscribe``."""

    # The complete PushSubscription JSON object as produced by the browser.
    subscription: dict
|
|
|
|
|
|
class UnsubscribeRequest(BaseModel):
    """Request body for ``DELETE /api/push/subscribe``."""

    # Endpoint URL identifying which stored subscription to remove.
    endpoint: str
|
|
|
|
|
|
@router.post("/subscribe")
async def subscribe(req: SubscribeRequest, request: Request) -> dict:
    """Save a browser push subscription for the logged-in user.

    Args:
        req: Body wrapping the PushSubscription JSON sent by the browser.
        request: Incoming request; its session cookie identifies the user.

    Returns:
        ``{"ok": True}`` after the subscription has been passed to
        ``push_utils.add_subscription``.

    Raises:
        HTTPException: 401 if the caller is not authenticated (raised by
            ``_require_user``); 400 if ``subscription.endpoint`` is missing,
            empty, or not a string.
    """
    username = _require_user(request)
    sub = req.subscription
    endpoint = sub.get("endpoint")
    if not endpoint:
        raise HTTPException(status_code=400, detail="subscription.endpoint is required")
    # ``subscription`` is an untyped dict supplied by the client, so a truthy
    # non-string endpoint (number, list, object) would otherwise be stored.
    if not isinstance(endpoint, str):
        raise HTTPException(status_code=400, detail="subscription.endpoint must be a string")
    push_utils.add_subscription(username, sub)
    return {"ok": True}
|
|
|
|
|
|
@router.delete("/subscribe")
async def unsubscribe(req: UnsubscribeRequest, request: Request) -> dict:
    """Remove one of the logged-in user's push subscriptions by endpoint.

    Args:
        req: Body carrying the endpoint of the subscription to drop.
        request: Incoming request; its session cookie identifies the user.

    Returns:
        ``{"ok": True, "found": <result>}`` where ``found`` is the value
        returned by ``push_utils.remove_subscription``.

    Raises:
        HTTPException: 401 if the caller is not authenticated (raised by
            ``_require_user``).
    """
    owner = _require_user(request)
    return {
        "ok": True,
        "found": push_utils.remove_subscription(owner, req.endpoint),
    }
|