feat(integration): initial Zoom Events backend integration

- Implemented Server-to-Server OAuth2 logic in e_zoom_methods.py.
- Created V3 Action router for Zoom (test_connection, fetch tickets, atomic sync).
- Added sync_zoom_attendees_to_event with mapping to Aether person/badge models.
- Registered /v3/action/e_zoom router.
- Added E2E connection test script.
This commit is contained in:
Scott Idem
2026-02-03 18:23:07 -05:00
parent cc5af1c2e2
commit 2fe783784c
4 changed files with 323 additions and 1 deletions

View File

@@ -0,0 +1,226 @@
import datetime, json, requests, time
from typing import Dict, List, Optional, Set, Union
from app.db_sql import sql_select, sql_update, sql_insert
from app.lib_general import log, logging, logger_reset
# Zoom API Configuration (Defaults)
ZOOM_OAUTH_URL = "https://zoom.us/oauth/token"
ZOOM_API_BASE = "https://api.zoom.us/v2"
# In-memory token cache (Standard Aether pattern)
zoom_api_cache = {
"access_token": None,
"expire_on": None,
"headers": {}
}
@logger_reset
def get_zoom_access_token(force_refresh: bool = False):
"""
Retrieves a Zoom Access Token using Server-to-Server OAuth.
Credentials should be stored in the 'data_store' table with code 'zoom_api_config'.
"""
log.setLevel(logging.INFO)
# 1. Check Cache
if not force_refresh and zoom_api_cache["access_token"] and zoom_api_cache["expire_on"]:
if datetime.datetime.now() < zoom_api_cache["expire_on"]:
log.debug("Using cached Zoom access token.")
return zoom_api_cache
# 2. Load Credentials from Data Store
# Logic: Look for 'zoom_api_config' in data_store
config_rec = sql_select(
table_name='data_store',
field_name='code',
field_value='zoom_api_config'
)
if not config_rec:
log.error("Zoom API credentials not found in data_store (code='zoom_api_config').")
return False
try:
config_data = json.loads(config_rec['text'])
client_id = config_data['client_id']
client_secret = config_data['client_secret']
account_id = config_data['account_id']
except Exception as e:
log.error(f"Failed to parse Zoom credentials from data_store: {e}")
return False
# 3. Request Token
log.info("Requesting new Zoom Access Token...")
params = {
"grant_type": "account_credentials",
"account_id": account_id
}
try:
resp = requests.post(
ZOOM_OAUTH_URL,
params=params,
auth=(client_id, client_secret),
timeout=10
)
if resp.status_code != 200:
log.error(f"Zoom OAuth failure: {resp.status_code} - {resp.text}")
return False
data = resp.json()
zoom_api_cache["access_token"] = data["access_token"]
# Set expiry with a 60s safety buffer
zoom_api_cache["expire_on"] = datetime.datetime.now() + datetime.timedelta(seconds=data["expires_in"] - 60)
zoom_api_cache["headers"] = {
"Authorization": f"Bearer {data['access_token']}",
"Accept": "application/json"
}
log.info("Successfully obtained Zoom access token.")
return zoom_api_cache
except Exception as e:
log.exception(f"Unexpected error during Zoom OAuth: {e}")
return False
@logger_reset
def get_zoom_tickets(event_id: str, page_size: int = 300, next_page_token: str = None):
"""
Retrieves 'Tickets' (Attendees) for a specific Zoom Event.
Endpoint: GET /zoom_events/events/{eventId}/tickets
"""
auth = get_zoom_access_token()
if not auth: return False
url = f"{ZOOM_API_BASE}/zoom_events/events/{event_id}/tickets"
params = {"page_size": page_size}
if next_page_token: params["next_page_token"] = next_page_token
try:
resp = requests.get(url, headers=auth["headers"], params=params, timeout=15)
if resp.status_code != 200:
log.error(f"Zoom API Error: {resp.status_code} - {resp.text}")
return False
return resp.json()
except Exception as e:
log.exception(f"Failed to fetch tickets from Zoom: {e}")
return False
@logger_reset
def sync_zoom_attendees_to_event(event_id_random: str, zoom_event_id: str):
"""
Atomic sync action: Pulls Zoom tickets and upserts Aether event_person records.
Uses Zoom ticket_id as the primary external identifier.
"""
from app.methods.event_person_methods import create_update_event_person_obj_v4
log.info(f"Starting Zoom sync for event {event_id_random} (Zoom ID: {zoom_event_id})")
# 1. Fetch tickets from Zoom
zoom_data = get_zoom_tickets(zoom_event_id)
if not zoom_data:
log.error("Failed to retrieve tickets from Zoom API.")
return False
tickets = zoom_data.get("tickets", [])
log.info(f"Found {len(tickets)} tickets in Zoom.")
# 2. Resolve Local Context
if event_id_int := redis_lookup_id_random(record_id_random=event_id_random, table_name='event'):
pass
else:
log.error(f"Aether Event ID {event_id_random} not found.")
return False
# Get account_id for this event
res = sql_select(sql="SELECT account_id FROM event WHERE id = :id", data={'id': event_id_int})
account_id_int = res.get('account_id') if res else None
if not account_id_int:
log.error("Could not resolve account_id for event.")
return False
sync_results = {
"total": len(tickets),
"created": 0,
"updated": 0,
"failed": 0
}
# 3. Iterate and Upsert
for ticket in tickets:
try:
ticket_id = ticket.get('ticket_id')
email = ticket.get('email')
first_name = ticket.get('first_name', '').strip()
last_name = ticket.get('last_name', '').strip()
# Standard External ID Pattern for Zoom: {zoom_event_id}:{ticket_id}
external_id = f"{zoom_event_id}:{ticket_id}"
# Prepare Aether Data structure (event_person_methods v4 expects this nested shape)
event_person_data = {
"enable": True,
"external_id": external_id,
"external_person_id": ticket.get('registrant_id'), # Zoom User ID
"external_registration_id": ticket_id,
"allow_tracking": True, # Lead retrieval demo default
# Nested Person Profile (Person Table)
"event_person_profile": {
"given_name": first_name,
"family_name": last_name,
"full_name": f"{first_name} {last_name}".strip(),
"email": email,
"enable": True
},
# Nested Badge Data (Event Badge Table)
"event_badge": {
"given_name": first_name,
"family_name": last_name,
"full_name": f"{first_name} {last_name}".strip(),
"email": email,
"badge_type": ticket.get('ticket_type_name', 'Attendee'),
"badge_type_code": ticket.get('ticket_type_id'),
"external_id": external_id,
"enable": True
}
}
# 4. Check for existing Event Person to determine create vs update
# (Matches Impexium pattern of looking up by external_id)
existing = sql_select(
sql="SELECT id, event_badge_id, event_person_profile_id FROM event_person WHERE event_id = :eid AND external_id = :ext",
data={'eid': event_id_int, 'ext': external_id}
)
if existing:
log.info(f"Updating existing record for {email} ({external_id})")
res = create_update_event_person_obj_v4(
event_person_dict_obj = event_person_data,
event_person_id = existing['id'],
account_id = account_id_int,
event_id = event_id_int,
event_badge_id = existing['event_badge_id'],
event_person_profile_id = existing['event_person_profile_id']
)
if res: sync_results["updated"] += 1
else: sync_results["failed"] += 1
else:
log.info(f"Creating new record for {email} ({external_id})")
res = create_update_event_person_obj_v4(
event_person_dict_obj = event_person_data,
account_id = account_id_int,
event_id = event_id_int
)
if res: sync_results["created"] += 1
else: sync_results["failed"] += 1
except Exception as e:
log.exception(f"Failed to process ticket {ticket.get('ticket_id')}: {e}")
sync_results["failed"] += 1
log.info(f"Zoom sync complete: {sync_results}")
return sync_results

View File

@@ -0,0 +1,62 @@
from fastapi import APIRouter, Depends, HTTPException, Query, status
from typing import Optional
import asyncio
from app.lib_general import log, logging
from app.lib_general_v3 import AccountContext, get_account_context, DelayParams
from app.models.response_models import Resp_Body_Base, mk_resp
from app.methods.e_zoom_methods import get_zoom_access_token, get_zoom_tickets, sync_zoom_attendees_to_event
router = APIRouter()
@router.get('/test_connection', response_model=Resp_Body_Base)
async def test_zoom_connection(
account: AccountContext = Depends(get_account_context),
delay: DelayParams = Depends(),
):
"""
Verifies that the Zoom API credentials in data_store are valid.
"""
if delay.sleep_time_s > 0: await asyncio.sleep(delay.sleep_time_s)
auth = get_zoom_access_token()
if auth:
return mk_resp(data={"status": "connected", "expires_on": str(auth["expire_on"])})
else:
return mk_resp(data=False, status_code=401, status_message="Zoom authentication failed. Check data_store credentials.")
@router.get('/events/{zoom_event_id}/tickets', response_model=Resp_Body_Base)
async def get_zoom_event_tickets(
zoom_event_id: str,
page_size: int = Query(300, ge=1, le=300),
next_page_token: Optional[str] = None,
account: AccountContext = Depends(get_account_context),
delay: DelayParams = Depends(),
):
"""
Proxy route to fetch raw ticket data from Zoom.
"""
if delay.sleep_time_s > 0: await asyncio.sleep(delay.sleep_time_s)
data = get_zoom_tickets(zoom_event_id, page_size, next_page_token)
if data:
return mk_resp(data=data)
return mk_resp(data=False, status_code=500, status_message="Failed to fetch data from Zoom API.")
@router.post('/sync/event/{event_id_random}', response_model=Resp_Body_Base)
async def sync_zoom_to_aether(
event_id_random: str,
zoom_event_id: str = Query(...),
account: AccountContext = Depends(get_account_context),
delay: DelayParams = Depends(),
):
"""
Atomic sync action: Pulls Zoom tickets and upserts Aether event_person records.
Returns counts of created and updated records.
"""
if delay.sleep_time_s > 0: await asyncio.sleep(delay.sleep_time_s)
result = sync_zoom_attendees_to_event(event_id_random, zoom_event_id)
if result:
return mk_resp(data=result, status_message="Zoom sync process completed.")
return mk_resp(data=False, status_code=500, status_message="Sync process failed or returned no data.")

View File

@@ -7,7 +7,7 @@ from app.routers import (
event_device, event_exhibit, event_exhibit_tracking, event_file, event_importing, event_device, event_exhibit, event_exhibit_tracking, event_file, event_importing,
event_location, event_person, event_location, event_person,
event_presentation, event_presenter, event_session, event_presentation, event_presenter, event_session,
flask_cfg, hosted_file, api_v3_actions_hosted_file, api_v3_actions_event_file, lookup, flask_cfg, hosted_file, api_v3_actions_hosted_file, api_v3_actions_event_file, api_v3_actions_e_zoom, lookup,
organization, page, person, organization, page, person,
person_user, qr, site, site_domain, user, person_user, qr, site, site_domain, user,
util_email, websockets, websockets_redis, websockets_v3, e_confex, e_cvent, e_impexium, e_stripe util_email, websockets, websockets_redis, websockets_v3, e_confex, e_cvent, e_impexium, e_stripe
@@ -49,6 +49,7 @@ def setup_routers(app: FastAPI):
app.include_router(hosted_file.router, prefix='/hosted_file', tags=['Hosted File']) app.include_router(hosted_file.router, prefix='/hosted_file', tags=['Hosted File'])
app.include_router(api_v3_actions_hosted_file.router, prefix='/v3/action/hosted_file', tags=['Hosted File (V3 Actions)']) app.include_router(api_v3_actions_hosted_file.router, prefix='/v3/action/hosted_file', tags=['Hosted File (V3 Actions)'])
app.include_router(api_v3_actions_event_file.router, prefix='/v3/action/event_file', tags=['Event File (V3 Actions)']) app.include_router(api_v3_actions_event_file.router, prefix='/v3/action/event_file', tags=['Event File (V3 Actions)'])
app.include_router(api_v3_actions_e_zoom.router, prefix='/v3/action/e_zoom', tags=['Zoom Events (V3 Actions)'])
app.include_router(lookup.router, prefix='/lu', tags=['Lookup']) app.include_router(lookup.router, prefix='/lu', tags=['Lookup'])
# app.include_router(organization.router, prefix='/organization', tags=['Organization'], dependencies=[Depends(DeprecationParams)]) # app.include_router(organization.router, prefix='/organization', tags=['Organization'], dependencies=[Depends(DeprecationParams)])

View File

@@ -0,0 +1,33 @@
import requests
import json
# Configuration
BASE_URL = "https://dev-api.oneskyit.com/v3/action/e_zoom"
API_KEY = "PMM4n50teUCaOMMTN8qOJA"
def test_zoom_connection():
print("--- Testing Zoom API Connection (V3 Action) ---")
url = f"{BASE_URL}/test_connection"
headers = {
"X-Aether-API-Key": API_KEY,
"x-no-account-id": "bypass"
}
try:
response = requests.get(url, headers=headers)
print(f"Status: {response.status_code}")
if response.status_code == 200:
print("✅ Success: API is reachable.")
print(f" Response: {json.dumps(response.json()['data'], indent=2)}")
elif response.status_code == 401:
print("⚠️ Note: API reachable, but credentials missing in data_store.")
print(f" Details: {response.json().get('status_message')}")
else:
print(f"❌ Failed: {response.text}")
except Exception as e:
print(f"💥 Exception: {e}")
if __name__ == "__main__":
test_zoom_connection()