Files
OSIT-AE-API-FastAPI/app/routers/api_v3_actions_e_zoom.py
Scott Idem 2fe783784c feat(integration): initial Zoom Events backend integration
- Implemented Server-to-Server OAuth2 logic in e_zoom_methods.py.
- Created V3 Action router for Zoom (test_connection, fetch tickets, atomic sync).
- Added sync_zoom_attendees_to_event with mapping to Aether person/badge models.
- Registered /v3/action/e_zoom router.
- Added E2E connection test script.
2026-02-03 18:23:07 -05:00

62 lines
2.5 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, Query, status
from typing import Optional
import asyncio
from app.lib_general import log, logging
from app.lib_general_v3 import AccountContext, get_account_context, DelayParams
from app.models.response_models import Resp_Body_Base, mk_resp
from app.methods.e_zoom_methods import get_zoom_access_token, get_zoom_tickets, sync_zoom_attendees_to_event
router = APIRouter()
@router.get('/test_connection', response_model=Resp_Body_Base)
async def test_zoom_connection(
account: AccountContext = Depends(get_account_context),
delay: DelayParams = Depends(),
):
"""
Verifies that the Zoom API credentials in data_store are valid.
"""
if delay.sleep_time_s > 0: await asyncio.sleep(delay.sleep_time_s)
auth = get_zoom_access_token()
if auth:
return mk_resp(data={"status": "connected", "expires_on": str(auth["expire_on"])})
else:
return mk_resp(data=False, status_code=401, status_message="Zoom authentication failed. Check data_store credentials.")
@router.get('/events/{zoom_event_id}/tickets', response_model=Resp_Body_Base)
async def get_zoom_event_tickets(
zoom_event_id: str,
page_size: int = Query(300, ge=1, le=300),
next_page_token: Optional[str] = None,
account: AccountContext = Depends(get_account_context),
delay: DelayParams = Depends(),
):
"""
Proxy route to fetch raw ticket data from Zoom.
"""
if delay.sleep_time_s > 0: await asyncio.sleep(delay.sleep_time_s)
data = get_zoom_tickets(zoom_event_id, page_size, next_page_token)
if data:
return mk_resp(data=data)
return mk_resp(data=False, status_code=500, status_message="Failed to fetch data from Zoom API.")
@router.post('/sync/event/{event_id_random}', response_model=Resp_Body_Base)
async def sync_zoom_to_aether(
event_id_random: str,
zoom_event_id: str = Query(...),
account: AccountContext = Depends(get_account_context),
delay: DelayParams = Depends(),
):
"""
Atomic sync action: Pulls Zoom tickets and upserts Aether event_person records.
Returns counts of created and updated records.
"""
if delay.sleep_time_s > 0: await asyncio.sleep(delay.sleep_time_s)
result = sync_zoom_attendees_to_event(event_id_random, zoom_event_id)
if result:
return mk_resp(data=result, status_message="Zoom sync process completed.")
return mk_resp(data=False, status_code=500, status_message="Sync process failed or returned no data.")