- Implemented Server-to-Server OAuth2 logic in e_zoom_methods.py.
- Created V3 Action router for Zoom (test_connection, fetch tickets, atomic sync).
- Added sync_zoom_attendees_to_event with mapping to Aether person/badge models.
- Registered /v3/action/e_zoom router.
- Added E2E connection test script.
227 lines
8.4 KiB
Python
227 lines
8.4 KiB
Python
import datetime, json, requests, time
|
|
from typing import Dict, List, Optional, Set, Union
|
|
from app.db_sql import sql_select, sql_update, sql_insert
|
|
from app.lib_general import log, logging, logger_reset
|
|
|
|
# Zoom endpoints: the OAuth token URL and the base URL for REST API calls.
ZOOM_OAUTH_URL = "https://zoom.us/oauth/token"
ZOOM_API_BASE = "https://api.zoom.us/v2"

# Module-level token cache shared by every caller in this process.
#   access_token: the last bearer token obtained (None until first fetch)
#   expire_on:    datetime after which the token must be refreshed
#   headers:      ready-to-use HTTP headers carrying the bearer token
zoom_api_cache = dict(
    access_token=None,
    expire_on=None,
    headers={},
)
|
|
|
|
@logger_reset
def get_zoom_access_token(force_refresh: bool = False):
    """
    Retrieve a Zoom access token using Server-to-Server OAuth.

    Credentials are read from the 'data_store' table (record with code
    'zoom_api_config'). The record's 'text' field must hold JSON with the
    keys 'client_id', 'client_secret' and 'account_id'.

    Args:
        force_refresh: When True, skip the cache check and always request a
            new token from Zoom.

    Returns:
        The module-level ``zoom_api_cache`` dict (keys 'access_token',
        'expire_on', 'headers') on success. Note this is the shared cache
        object itself, not a copy. Returns False when credentials are
        missing or unparsable, when Zoom answers with a non-200 status, or
        when the request/response handling raises.
    """
    log.setLevel(logging.INFO)

    # 1. Check Cache
    cached_expiry = zoom_api_cache["expire_on"]
    if not force_refresh and zoom_api_cache["access_token"] and cached_expiry:
        if datetime.datetime.now() < cached_expiry:
            log.debug("Using cached Zoom access token.")
            return zoom_api_cache

    # 2. Load Credentials from Data Store
    config_rec = sql_select(
        table_name='data_store',
        field_name='code',
        field_value='zoom_api_config'
    )

    if not config_rec:
        log.error("Zoom API credentials not found in data_store (code='zoom_api_config').")
        return False

    try:
        config_data = json.loads(config_rec['text'])
        client_id = config_data['client_id']
        client_secret = config_data['client_secret']
        account_id = config_data['account_id']
    except Exception as e:
        log.error(f"Failed to parse Zoom credentials from data_store: {e}")
        return False

    # 3. Request Token
    log.info("Requesting new Zoom Access Token...")
    params = {
        "grant_type": "account_credentials",
        "account_id": account_id
    }

    try:
        resp = requests.post(
            ZOOM_OAUTH_URL,
            params=params,
            auth=(client_id, client_secret),
            timeout=10
        )

        if resp.status_code != 200:
            log.error(f"Zoom OAuth failure: {resp.status_code} - {resp.text}")
            return False

        data = resp.json()

        # Extract and compute everything BEFORE touching the cache, so a
        # malformed response (missing/non-numeric field) cannot leave the
        # cache holding a new token next to a stale expiry or stale headers.
        access_token = data["access_token"]
        # Set expiry with a 60s safety buffer
        expire_on = datetime.datetime.now() + datetime.timedelta(seconds=data["expires_in"] - 60)
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Accept": "application/json"
        }

        # Publish all three fields together; mutate in place so any other
        # holder of the cache dict sees the refreshed values.
        zoom_api_cache.update(
            access_token=access_token,
            expire_on=expire_on,
            headers=headers,
        )

        log.info("Successfully obtained Zoom access token.")
        return zoom_api_cache

    except Exception as e:
        log.exception(f"Unexpected error during Zoom OAuth: {e}")
        return False
|
|
|
|
@logger_reset
def get_zoom_tickets(event_id: str, page_size: int = 300, next_page_token: str = None):
    """
    Fetch one page of 'Tickets' (attendees) for a Zoom Event.

    Endpoint: GET /zoom_events/events/{eventId}/tickets

    Args:
        event_id: Zoom event identifier, interpolated into the request path.
        page_size: Value sent as the 'page_size' query parameter.
        next_page_token: When truthy, sent as the 'next_page_token' query
            parameter to request a subsequent page.

    Returns:
        The decoded JSON response body on HTTP 200; False when no access
        token could be obtained, on any non-200 status, or when the request
        or JSON decoding raises.
    """
    token_info = get_zoom_access_token()
    if not token_info:
        return False

    query = {"page_size": page_size}
    if next_page_token:
        query["next_page_token"] = next_page_token

    endpoint = f"{ZOOM_API_BASE}/zoom_events/events/{event_id}/tickets"

    try:
        response = requests.get(endpoint, headers=token_info["headers"], params=query, timeout=15)
        if response.status_code == 200:
            return response.json()

        # Any other status is reported and treated as a failed fetch.
        log.error(f"Zoom API Error: {response.status_code} - {response.text}")
        return False
    except Exception as e:
        log.exception(f"Failed to fetch tickets from Zoom: {e}")
        return False
|
|
|
|
def _fetch_all_zoom_tickets(zoom_event_id):
    """
    Collect every ticket for a Zoom event by following 'next_page_token'.

    Returns:
        A list of ticket dicts across all pages, or False if any page fails
        to load or the API hands back a page token that was already used
        (which would otherwise loop forever).
    """
    tickets = []
    seen_tokens = set()
    page_token = None

    while True:
        zoom_data = get_zoom_tickets(zoom_event_id, next_page_token=page_token)
        if not zoom_data:
            log.error("Failed to retrieve tickets from Zoom API.")
            return False

        # 'tickets' may be absent or null on an empty page.
        tickets.extend(zoom_data.get("tickets") or [])

        page_token = zoom_data.get("next_page_token")
        if not page_token:
            return tickets

        if page_token in seen_tokens:
            log.error(f"Zoom API returned a repeated next_page_token for event {zoom_event_id}; aborting sync.")
            return False
        seen_tokens.add(page_token)


def _zoom_ticket_to_event_person(ticket, zoom_event_id, ticket_id):
    """
    Map one Zoom ticket dict onto the nested dict passed to
    create_update_event_person_obj_v4 (event person + profile + badge).
    """
    # `or ''` covers keys that are present but null, which a plain
    # .get(key, '') default does not.
    first_name = (ticket.get('first_name') or '').strip()
    last_name = (ticket.get('last_name') or '').strip()
    full_name = f"{first_name} {last_name}".strip()
    email = ticket.get('email')

    # Standard External ID Pattern for Zoom: {zoom_event_id}:{ticket_id}
    external_id = f"{zoom_event_id}:{ticket_id}"

    return {
        "enable": True,
        "external_id": external_id,
        "external_person_id": ticket.get('registrant_id'),  # Zoom User ID
        "external_registration_id": ticket_id,
        "allow_tracking": True,  # Lead retrieval demo default

        # Nested Person Profile (Person Table)
        "event_person_profile": {
            "given_name": first_name,
            "family_name": last_name,
            "full_name": full_name,
            "email": email,
            "enable": True
        },

        # Nested Badge Data (Event Badge Table)
        "event_badge": {
            "given_name": first_name,
            "family_name": last_name,
            "full_name": full_name,
            "email": email,
            "badge_type": ticket.get('ticket_type_name', 'Attendee'),
            "badge_type_code": ticket.get('ticket_type_id'),
            "external_id": external_id,
            "enable": True
        }
    }


@logger_reset
def sync_zoom_attendees_to_event(event_id_random: str, zoom_event_id: str):
    """
    Pull all Zoom tickets for an event and upsert Aether event_person records.

    Each ticket is keyed by the external id '{zoom_event_id}:{ticket_id}'.
    An existing event_person row with that external id (for the resolved
    event) is updated; otherwise a new one is created.

    Args:
        event_id_random: Random/public id of the Aether event; resolved to
            the internal integer id via redis_lookup_id_random.
        zoom_event_id: Zoom event identifier whose tickets are synced.

    Returns:
        A dict with counters 'total', 'created', 'updated' and 'failed', or
        False when the tickets could not be fetched (any page), the Aether
        event could not be resolved, or its account_id could not be found.
        Tickets that lack a ticket_id, or whose processing raises, are
        counted under 'failed' and do not abort the run.
    """
    from app.methods.event_person_methods import create_update_event_person_obj_v4

    log.info(f"Starting Zoom sync for event {event_id_random} (Zoom ID: {zoom_event_id})")

    # 1. Fetch tickets from Zoom (all pages, not just the first)
    tickets = _fetch_all_zoom_tickets(zoom_event_id)
    if tickets is False:
        return False

    log.info(f"Found {len(tickets)} tickets in Zoom.")

    # 2. Resolve Local Context
    # NOTE(review): redis_lookup_id_random is not among the imports visible at
    # the top of this file - confirm it is in scope, otherwise this line
    # raises NameError at runtime.
    event_id_int = redis_lookup_id_random(record_id_random=event_id_random, table_name='event')
    if not event_id_int:
        log.error(f"Aether Event ID {event_id_random} not found.")
        return False

    # Get account_id for this event
    event_row = sql_select(sql="SELECT account_id FROM event WHERE id = :id", data={'id': event_id_int})
    account_id_int = event_row.get('account_id') if event_row else None
    if not account_id_int:
        log.error("Could not resolve account_id for event.")
        return False

    sync_results = {
        "total": len(tickets),
        "created": 0,
        "updated": 0,
        "failed": 0
    }

    # 3. Iterate and Upsert
    for ticket in tickets:
        try:
            ticket_id = ticket.get('ticket_id')
            if ticket_id in (None, ''):
                # Without a ticket id every such ticket would share the
                # external id '<event>:None' and overwrite one another.
                log.error(f"Skipping Zoom ticket without ticket_id (email: {ticket.get('email')})")
                sync_results["failed"] += 1
                continue

            # Prepare Aether Data structure (event_person_methods v4 expects this nested shape)
            event_person_data = _zoom_ticket_to_event_person(ticket, zoom_event_id, ticket_id)
            external_id = event_person_data["external_id"]
            email = event_person_data["event_person_profile"]["email"]

            # 4. Check for existing Event Person to determine create vs update
            # (Matches Impexium pattern of looking up by external_id)
            existing = sql_select(
                sql="SELECT id, event_badge_id, event_person_profile_id FROM event_person WHERE event_id = :eid AND external_id = :ext",
                data={'eid': event_id_int, 'ext': external_id}
            )

            if existing:
                log.info(f"Updating existing record for {email} ({external_id})")
                saved = create_update_event_person_obj_v4(
                    event_person_dict_obj=event_person_data,
                    event_person_id=existing['id'],
                    account_id=account_id_int,
                    event_id=event_id_int,
                    event_badge_id=existing['event_badge_id'],
                    event_person_profile_id=existing['event_person_profile_id']
                )
                outcome = "updated"
            else:
                log.info(f"Creating new record for {email} ({external_id})")
                saved = create_update_event_person_obj_v4(
                    event_person_dict_obj=event_person_data,
                    account_id=account_id_int,
                    event_id=event_id_int
                )
                outcome = "created"

            # A falsy return from the upsert helper is counted as a failure.
            sync_results[outcome if saved else "failed"] += 1

        except Exception as e:
            # Guard the lookup: a non-dict ticket must not raise again here.
            bad_ticket_id = ticket.get('ticket_id') if isinstance(ticket, dict) else None
            log.exception(f"Failed to process ticket {bad_ticket_id}: {e}")
            sync_results["failed"] += 1

    log.info(f"Zoom sync complete: {sync_results}")
    return sync_results
|