pywebpush 2.x routes string keys through Vapid.from_string() which only handles raw/DER base64 — not PEM. Pre-build the Vapid object so the key deserializes correctly. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
118 lines
3.5 KiB
Python
118 lines
3.5 KiB
Python
"""
|
|
Web Push (VAPID) helpers.
|
|
|
|
Subscriptions are stored per-user at:
|
|
home/{user}/push_subscriptions.json → list of {endpoint, keys:{p256dh, auth}}
|
|
|
|
send_push(username, title, body, url) iterates all stored subscriptions for that
|
|
user and fires a push. Stale endpoints (410 Gone) are pruned automatically.
|
|
"""
|
|
import asyncio
|
|
import base64
|
|
import json
|
|
import logging
|
|
from pathlib import Path
|
|
|
|
from config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _subs_path(username: str) -> Path:
|
|
return settings.home_root() / username / "push_subscriptions.json"
|
|
|
|
|
|
def load_subscriptions(username: str) -> list[dict]:
|
|
path = _subs_path(username)
|
|
if not path.exists():
|
|
return []
|
|
try:
|
|
return json.loads(path.read_text())
|
|
except Exception:
|
|
return []
|
|
|
|
|
|
def _save_subscriptions(username: str, subs: list[dict]) -> None:
|
|
path = _subs_path(username)
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
path.write_text(json.dumps(subs, indent=2))
|
|
|
|
|
|
def add_subscription(username: str, sub: dict) -> None:
|
|
"""Upsert a subscription by endpoint URL."""
|
|
subs = load_subscriptions(username)
|
|
endpoint = sub.get("endpoint", "")
|
|
subs = [s for s in subs if s.get("endpoint") != endpoint]
|
|
subs.append(sub)
|
|
_save_subscriptions(username, subs)
|
|
|
|
|
|
def remove_subscription(username: str, endpoint: str) -> bool:
|
|
subs = load_subscriptions(username)
|
|
new_subs = [s for s in subs if s.get("endpoint") != endpoint]
|
|
if len(new_subs) == len(subs):
|
|
return False
|
|
_save_subscriptions(username, new_subs)
|
|
return True
|
|
|
|
|
|
def _get_private_key_pem() -> str:
|
|
"""Decode the base64-encoded PEM private key from settings."""
|
|
raw = settings.vapid_private_key_b64.strip()
|
|
if not raw:
|
|
raise RuntimeError("VAPID_PRIVATE_KEY_B64 is not set in .env")
|
|
return base64.b64decode(raw).decode()
|
|
|
|
|
|
def _send_one(sub: dict, payload: dict) -> bool:
|
|
"""Send a push to a single subscription. Returns False if the endpoint is stale (410)."""
|
|
from pywebpush import webpush, WebPushException
|
|
from py_vapid import Vapid
|
|
|
|
try:
|
|
vapid = Vapid.from_pem(_get_private_key_pem().encode())
|
|
webpush(
|
|
subscription_info=sub,
|
|
data=json.dumps(payload),
|
|
vapid_private_key=vapid,
|
|
vapid_claims={"sub": settings.vapid_contact},
|
|
)
|
|
return True
|
|
except WebPushException as e:
|
|
if e.response is not None and e.response.status_code == 410:
|
|
logger.info("push endpoint gone (410), pruning: %s", sub.get("endpoint", "")[:60])
|
|
return False
|
|
logger.warning("push failed: %s", e)
|
|
return True # keep the sub; might be transient
|
|
|
|
|
|
async def send_push(username: str, title: str, body: str, url: str = "") -> dict:
|
|
"""
|
|
Send a push notification to all subscriptions for username.
|
|
Returns {"sent": n, "pruned": m}.
|
|
"""
|
|
if not settings.vapid_public_key or not settings.vapid_private_key_b64:
|
|
return {"error": "VAPID keys not configured"}
|
|
|
|
subs = load_subscriptions(username)
|
|
if not subs:
|
|
return {"error": f"No push subscriptions for {username}"}
|
|
|
|
payload = {"title": title, "body": body, "url": url}
|
|
keep = []
|
|
sent = 0
|
|
pruned = 0
|
|
|
|
for sub in subs:
|
|
alive = await asyncio.to_thread(_send_one, sub, payload)
|
|
if alive:
|
|
keep.append(sub)
|
|
sent += 1
|
|
else:
|
|
pruned += 1
|
|
|
|
if pruned:
|
|
_save_subscriptions(username, keep)
|
|
|
|
return {"sent": sent, "pruned": pruned}
|