import datetime
import json
import time
from typing import Dict, List, Optional, Set, Union

import requests

from app.db_sql import sql_select, sql_update, sql_insert
from app.lib_general import log, logging, logger_reset

# Zoom API Configuration (Defaults)
ZOOM_OAUTH_URL = "https://zoom.us/oauth/token"
ZOOM_API_BASE = "https://api.zoom.us/v2"

# In-memory token cache (Standard Aether pattern).
# Shared by every caller in this process; get_zoom_access_token() returns this
# very dict, so callers must treat it as read-only.
zoom_api_cache = {
    "access_token": None,
    "expire_on": None,
    "headers": {}
}


@logger_reset
def get_zoom_access_token(force_refresh: bool = False) -> Union[Dict, bool]:
    """
    Retrieve a Zoom access token using Server-to-Server OAuth.

    Credentials are read from the 'data_store' row whose code is
    'zoom_api_config'; its 'text' field must be a JSON object containing
    'client_id', 'client_secret' and 'account_id'.

    Args:
        force_refresh: When True, skip the cached token and request a new one.

    Returns:
        The module-level ``zoom_api_cache`` dict (keys 'access_token',
        'expire_on', 'headers') on success, or False when the credentials are
        missing/unparseable or the OAuth request fails.
    """
    log.setLevel(logging.INFO)

    # 1. Check Cache
    if not force_refresh and zoom_api_cache["access_token"] and zoom_api_cache["expire_on"]:
        if datetime.datetime.now() < zoom_api_cache["expire_on"]:
            log.debug("Using cached Zoom access token.")
            return zoom_api_cache

    # 2. Load Credentials from Data Store
    config_rec = sql_select(
        table_name='data_store',
        field_name='code',
        field_value='zoom_api_config'
    )

    if not config_rec:
        log.error("Zoom API credentials not found in data_store (code='zoom_api_config').")
        return False

    try:
        config_data = json.loads(config_rec['text'])
        client_id = config_data['client_id']
        client_secret = config_data['client_secret']
        account_id = config_data['account_id']
    except Exception as e:
        log.error(f"Failed to parse Zoom credentials from data_store: {e}")
        return False

    # 3. Request Token
    log.info("Requesting new Zoom Access Token...")
    params = {
        "grant_type": "account_credentials",
        "account_id": account_id
    }

    try:
        resp = requests.post(
            ZOOM_OAUTH_URL,
            params=params,
            auth=(client_id, client_secret),
            timeout=10
        )

        if resp.status_code != 200:
            log.error(f"Zoom OAuth failure: {resp.status_code} - {resp.text}")
            return False

        data = resp.json()

        # Compute every derived value BEFORE touching the cache, so that a
        # malformed response (e.g. missing 'expires_in') cannot leave the
        # cache holding a new token paired with a stale expiry.
        access_token = data["access_token"]
        # Set expiry with a 60s safety buffer
        expire_on = datetime.datetime.now() + datetime.timedelta(seconds=data["expires_in"] - 60)
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Accept": "application/json"
        }

        zoom_api_cache["access_token"] = access_token
        zoom_api_cache["expire_on"] = expire_on
        zoom_api_cache["headers"] = headers

        log.info("Successfully obtained Zoom access token.")
        return zoom_api_cache

    except Exception as e:
        log.exception(f"Unexpected error during Zoom OAuth: {e}")
        return False


@logger_reset
def get_zoom_tickets(event_id: str, page_size: int = 300, next_page_token: Optional[str] = None):
    """
    Retrieve one page of 'Tickets' (Attendees) for a specific Zoom Event.

    Endpoint: GET /zoom_events/events/{eventId}/tickets

    Args:
        event_id: Zoom event identifier used in the URL path.
        page_size: Value sent as the 'page_size' query parameter.
        next_page_token: Pagination token from a previous response; omitted
            from the request when falsy.

    Returns:
        The decoded JSON response on HTTP 200, or False when no access token
        is available, the API returns a non-200 status, or the request raises.
    """
    auth = get_zoom_access_token()
    if not auth:
        return False

    url = f"{ZOOM_API_BASE}/zoom_events/events/{event_id}/tickets"
    params = {"page_size": page_size}
    if next_page_token:
        params["next_page_token"] = next_page_token

    try:
        resp = requests.get(url, headers=auth["headers"], params=params, timeout=15)
        if resp.status_code != 200:
            log.error(f"Zoom API Error: {resp.status_code} - {resp.text}")
            return False

        return resp.json()
    except Exception as e:
        log.exception(f"Failed to fetch tickets from Zoom: {e}")
        return False


def _fetch_all_zoom_tickets(zoom_event_id: str) -> Union[List[Dict], bool]:
    """
    Collect the tickets from every page of a Zoom event's ticket listing.

    Follows 'next_page_token' until the API stops returning one.

    Returns:
        A list of ticket dicts (possibly empty), or False if any page could
        not be fetched or the API hands back a token it already issued.
    """
    tickets: List[Dict] = []
    seen_tokens: Set[str] = set()
    next_page_token: Optional[str] = None

    while True:
        page = get_zoom_tickets(zoom_event_id, next_page_token=next_page_token)
        # get_zoom_tickets returns False on failure; anything that is not a
        # JSON object is unusable as a page.
        if not isinstance(page, dict):
            return False

        tickets.extend(page.get("tickets") or [])

        next_page_token = page.get("next_page_token")
        if not next_page_token:
            return tickets

        # Guard against an endless loop if the same token is returned twice.
        if next_page_token in seen_tokens:
            log.error(f"Zoom returned a repeated next_page_token for event {zoom_event_id}; aborting.")
            return False
        seen_tokens.add(next_page_token)


def _build_event_person_data(ticket: Dict, ticket_id, external_id: str) -> Dict:
    """Map one Zoom ticket onto the nested dict passed to create_update_event_person_obj_v4."""
    # `or ''` (rather than a .get default) also covers keys present with a
    # null value, which would otherwise break .strip().
    first_name = (ticket.get('first_name') or '').strip()
    last_name = (ticket.get('last_name') or '').strip()
    full_name = f"{first_name} {last_name}".strip()
    email = ticket.get('email')

    return {
        "enable": True,
        "external_id": external_id,
        "external_person_id": ticket.get('registrant_id'),  # Zoom User ID
        "external_registration_id": ticket_id,
        "allow_tracking": True,  # Lead retrieval demo default

        # Nested Person Profile (Person Table)
        "event_person_profile": {
            "given_name": first_name,
            "family_name": last_name,
            "full_name": full_name,
            "email": email,
            "enable": True
        },

        # Nested Badge Data (Event Badge Table)
        "event_badge": {
            "given_name": first_name,
            "family_name": last_name,
            "full_name": full_name,
            "email": email,
            "badge_type": ticket.get('ticket_type_name') or 'Attendee',
            "badge_type_code": ticket.get('ticket_type_id'),
            "external_id": external_id,
            "enable": True
        }
    }


@logger_reset
def sync_zoom_attendees_to_event(event_id_random: str, zoom_event_id: str):
    """
    Pull all Zoom tickets for an event and upsert Aether event_person records.

    Each ticket is keyed by the external ID '{zoom_event_id}:{ticket_id}'.
    An event_person row with that external ID for the event is updated if it
    exists, otherwise a new one is created. Tickets are processed
    independently: a failure on one ticket is counted and logged, and the
    remaining tickets are still processed.

    Args:
        event_id_random: Random (public) ID of the Aether event.
        zoom_event_id: Zoom event ID whose tickets are synced.

    Returns:
        A dict with integer counters 'total', 'created', 'updated' and
        'failed', or False when the tickets cannot be fetched or the local
        event / account cannot be resolved.
    """
    from app.methods.event_person_methods import create_update_event_person_obj_v4

    log.info(f"Starting Zoom sync for event {event_id_random} (Zoom ID: {zoom_event_id})")

    # 1. Fetch tickets from Zoom (all pages, before anything is written)
    tickets = _fetch_all_zoom_tickets(zoom_event_id)
    if tickets is False:
        log.error("Failed to retrieve tickets from Zoom API.")
        return False

    log.info(f"Found {len(tickets)} tickets in Zoom.")

    # 2. Resolve Local Context
    # NOTE(review): redis_lookup_id_random is not imported anywhere in the
    # visible part of this module - confirm it is in scope, otherwise this
    # line raises NameError.
    event_id_int = redis_lookup_id_random(record_id_random=event_id_random, table_name='event')
    if not event_id_int:
        log.error(f"Aether Event ID {event_id_random} not found.")
        return False

    # Get account_id for this event
    res = sql_select(sql="SELECT account_id FROM event WHERE id = :id", data={'id': event_id_int})
    account_id_int = res.get('account_id') if res else None

    if not account_id_int:
        log.error("Could not resolve account_id for event.")
        return False

    sync_results = {
        "total": len(tickets),
        "created": 0,
        "updated": 0,
        "failed": 0
    }

    # 3. Iterate and Upsert
    for ticket in tickets:
        # Captured outside the try so the except handler never has to touch
        # a possibly malformed ticket again.
        ticket_id = None
        try:
            ticket_id = ticket.get('ticket_id')
            if ticket_id is None or ticket_id == '':
                # Without a ticket ID every such ticket would share the
                # external ID '<zoom_event_id>:None' and overwrite each other.
                log.error(f"Skipping Zoom ticket without ticket_id (email: {ticket.get('email')})")
                sync_results["failed"] += 1
                continue

            email = ticket.get('email')

            # Standard External ID Pattern for Zoom: {zoom_event_id}:{ticket_id}
            external_id = f"{zoom_event_id}:{ticket_id}"

            # event_person_methods v4 expects this nested shape
            event_person_data = _build_event_person_data(ticket, ticket_id, external_id)

            # 4. Check for existing Event Person to determine create vs update
            # (Matches Impexium pattern of looking up by external_id)
            existing = sql_select(
                sql="SELECT id, event_badge_id, event_person_profile_id FROM event_person WHERE event_id = :eid AND external_id = :ext",
                data={'eid': event_id_int, 'ext': external_id}
            )

            upsert_kwargs = {
                "event_person_dict_obj": event_person_data,
                "account_id": account_id_int,
                "event_id": event_id_int,
            }

            if existing:
                log.info(f"Updating existing record for {email} ({external_id})")
                upsert_kwargs["event_person_id"] = existing['id']
                upsert_kwargs["event_badge_id"] = existing['event_badge_id']
                upsert_kwargs["event_person_profile_id"] = existing['event_person_profile_id']
                outcome = "updated"
            else:
                log.info(f"Creating new record for {email} ({external_id})")
                outcome = "created"

            res = create_update_event_person_obj_v4(**upsert_kwargs)
            sync_results[outcome if res else "failed"] += 1

        except Exception as e:
            log.exception(f"Failed to process ticket {ticket_id}: {e}")
            sync_results["failed"] += 1

    log.info(f"Zoom sync complete: {sync_results}")
    return sync_results