from __future__ import annotations import datetime, json, pprint, pytz, random, requests, secrets, string, time from requests.auth import HTTPBasicAuth from typing import Dict, List, Optional, Set, Union from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator from app.db_sql import redis_lookup_id_random, sql_insert, sql_select, sql_update from app.lib_general import log, logging, logger_reset, secure_hash_string, verify_secure_hash_string from app.methods.person_methods import create_person_kiss, get_person_rec_list, get_person_rec_w_external_id, load_person_obj, update_person_kiss from app.methods.membership_person_methods import create_membership_person_obj, update_membership_person_obj app = {} app['client_id'] = '0oalt6dz82oSbN9ok1t7' # From Cvent Developer Portal app['secret'] = 'gQY96qffZAuB_44k73C_hn_MHeByBS8LXHj1vPRm' # From Cvent Developer api = {} api['base_url'] = 'https://api-platform.cvent.com/ea' # Including /ea as the Cvent version. EA = Early Access api['headers'] = {} # { 'Content-Type': content_type, 'Authorization': 'Basic '+str(cvent_authorization_base64) } # Updated 2022-02-01 @logger_reset def get_access_token(): log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) log.debug(f'API data:\n{api}') # api['access_token'] = '' if 'access_token' in api: access_token = api.get('access_token') log.debug(f'Cvent Access Token found: {access_token}') else: log.info(f'Cvent Access Token was not found. Will request one.') if 'expire_on' in api and datetime.datetime.now() < api['expire_on']: expire_on = api.get('expire_on') log.debug(f'Cvent Access Token is current: {expire_on}') return api else: expire_on = api.get('expire_on') log.info(f'Cvent Access Token is not current. Requesting a new one. Expired On: {expire_on}') endpoint = '/oauth2/token' uri = api['base_url']+endpoint data = { 'grant_type': 'client_credentials', 'client_id': app['client_id'] } log.debug(f'Oauth Token Request Data:\n{api}') resp = requests.post(url=uri, data=data, auth=HTTPBasicAuth(app['client_id'], app['secret'])) # Sending as HTML form data log.debug(f'Status Code: {resp.status_code}') log.debug(f'Headers: {resp.headers}') log.debug(f'Encoding: {resp.encoding}') log.debug(f'JSON: {resp.json()}') # log.debug('Text:', resp.text) response_data = resp.json() api['access_token'] = response_data['access_token'] api['expires_in'] = response_data['expires_in'] api['token_type'] = response_data['token_type'] api['expire_on'] = datetime.datetime.now() + datetime.timedelta(seconds=response_data['expires_in']) api['headers']['Accept'] = 'application/json' api['headers']['x-api-key'] = app['client_id'] api['headers']['Authorization'] = 'Bearer '+api['access_token'] log.debug(api) log.warning('Sleeping for 1 second to avoid Cvent rate limit...') time.sleep(1) return api # Updated 2022-01-31 @logger_reset def get_contact_custom_field_list(api): log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) endpoint = '/custom-fields' uri = api['base_url']+endpoint params = { 'filter': "category eq 'Contact'" } resp = requests.get(url=uri, headers=api['headers'], params=params) log.debug('Status Code:', resp.status_code) log.debug('Headers:', resp.headers) log.debug('Encoding:', resp.encoding) log.debug('JSON:', resp.json()) # log.debug('Text:', resp.text) response_data = resp.json() # f = open('contact_custom_field_list.txt', 'w') # f.write(pprint.pformat(response_data, indent=2, width=160)) return response_data # Updated 2022-01-31 @logger_reset def get_contact_list(external_id: str=None, email: str=None): log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) get_access_token() endpoint = '/contacts' uri = api['base_url']+endpoint # External ID for IDAA: UUID = '609ab766-7d79-4a9d-a72c-f126412659ee' # customField.609ab766-7d79-4a9d-a72c-f126412659ee eq external_id params = {} if external_id: params['filter'] = f"deleted eq 'False' and customField.609ab766-7d79-4a9d-a72c-f126412659ee eq '{external_id}'" elif email: params['filter'] = f"deleted eq 'False' and email eq '{email}'" elif external_id is None: params['filter'] = f"deleted eq 'False' and customField.609ab766-7d79-4a9d-a72c-f126412659ee lt '1'" else: params['filter'] = f"deleted eq 'False'" resp = requests.get(url=uri, headers=api['headers'], params=params) response_data = resp.json() log.debug(json.dumps(response_data, indent=2, default=str)) if 'message' in response_data and response_data['message'] == 'Too Many Requests': return False return response_data # ### BEGIN ### API Cvent Methods ### get_recent_contact_list() ### # Updated 2022-02-02 @logger_reset def get_recent_contact_list( from_created_on = datetime.datetime.utcnow() + datetime.timedelta(minutes=-60), to_created_on = datetime.datetime.utcnow(), from_updated_on = datetime.datetime.utcnow() + datetime.timedelta(minutes=-60), to_updated_on = None, ): log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) get_access_token() endpoint = f'/contacts' uri = api['base_url']+endpoint params = {} from_created_on_str = from_created_on.isoformat() log.info(f'From Created On: {from_created_on_str}') if to_created_on: to_created_on_str = to_created_on.isoformat() log.info(f'To Created On: {to_created_on_str}') from_updated_on_str = from_updated_on.isoformat() log.info(f'From Updated On: {from_updated_on_str}') if to_updated_on: to_updated_on_str = to_updated_on.isoformat() log.info(f'To Updated On: {to_updated_on_str}') if from_created_on and from_updated_on and to_created_on and to_updated_on: params['filter'] = f"deleted eq 'False' and (created gt '{from_created_on_str}Z' or lastModified gt '{from_updated_on_str}Z') and (created lt '{to_created_on_str}Z' or lastModified lt '{to_updated_on_str}Z')" elif from_created_on and to_created_on: params['filter'] = f"deleted eq 'False' and created gt '{from_created_on_str}Z' and created lt '{to_created_on_str}Z'" elif updated_on: params['filter'] = f"deleted eq 'False' and lastModified gt '{from_updated_on_str}Z'" else: log.warning('Created On or Updated On is requried. Returing False') return False resp = requests.get(url=uri, headers=api['headers'], params=params) response_data = resp.json() # log.debug(json.dumps(response_data, indent=2, default=str)) cvent_contact_list = response_data.get('data') next_token = None if 'paging' in response_data: next_token = response_data.get('paging').get('nextToken') log.debug(json.dumps(response_data.get('paging'), indent=2, default=str)) limit = 0 while next_token and limit < 100: limit = limit + 1 params = {} params['token'] = next_token resp = requests.get(url=uri, headers=api['headers'], params=params) response_data = resp.json() # log.debug(json.dumps(response_data, indent=2, default=str)) if 'message' in response_data and response_data['message'] == 'Too Many Requests': log.warning('Hit Cvent rate limit. Sleeping for .5 seconds...') time.sleep(.5) else: cvent_contact_list = cvent_contact_list + response_data.get('data') if 'paging' in response_data: next_token = response_data.get('paging').get('nextToken') log.debug(json.dumps(response_data.get('paging'), indent=2, default=str)) log.warning('Sleeping for .25 seconds to avoid Cvent rate limit...') time.sleep(.25) else: next_token = None # log.debug(json.dumps(cvent_contact_list, indent=2, default=str)) # log.debug(json.dumps(response_data, indent=2, default=str)) if 'message' in response_data and response_data['message'] == 'Too Many Requests': return False return cvent_contact_list # ### END ### API Cvent Methods ### get_recent_contact_list() ### # ### BEGIN ### API Cvent Methods ### get_contact_id() ### # Updated 2022-02-01 @logger_reset def get_contact_id(contact_id: str): log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) get_access_token() endpoint = f'/contacts/{contact_id}' uri = api['base_url']+endpoint params = {} resp = requests.get(url=uri, headers=api['headers'], params=params) response_data = resp.json() log.debug(json.dumps(response_data, indent=2, default=str)) if 'message' in response_data and response_data['message'] == 'Too Many Requests': return False return response_data # ### END ### API Cvent Methods ### get_contact_id() ### # ### BEGIN ### API Cvent Methods ### modify_contact_id() ### # Updated 2022-02-01 @logger_reset def modify_contact_id(contact_id: str, field_list: list=[], custom_field_id: str=None, custom_field_value: str=None): log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) get_access_token() if custom_field_id: log.info(f'Updating Cvent Custom Field ID: {custom_field_id}') endpoint = f'/contacts/{contact_id}/custom-fields/{custom_field_id}/answers' elif field_list: log.info(f'Updating Cvent Field List: {field_list}') endpoint = f'/contacts/{contact_id}' else: log.error('Missing required parameters') return False uri = api['base_url']+endpoint params = {} data = {} if custom_field_id: data['id'] = custom_field_id data['value'] = custom_field_value else: data = field_list data['id'] = contact_id if custom_field_id: resp = requests.put(url=uri, headers=api['headers'], params=params, json=data) else: resp = requests.patch(url=uri, headers=api['headers'], params=params, json=data) log.info('Updated Cvent Contact ID: {contact_id}') response_data = resp.json() log.debug(json.dumps(response_data, indent=2, default=str)) if 'message' in response_data and response_data['message'] == 'Too Many Requests': return False return response_data # ### END ### API Cvent Methods ### modify_contact_id() ### # ### BEGIN ### API External Cvent ### create_update_aether_person() ### # Updated 2022-02-02 @logger_reset def create_update_aether_person( cvent_contact_id: str, cvent_contact_obj: dict, account_id: str, person_id: str=None, ): log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) # Important variables used more than once. person_external_id = None if custom_field_list := cvent_contact_obj.get('customFields'): # List for custom_field in custom_field_list: # Get the External ID created by OSIT if custom_field.get('id') == '609ab766-7d79-4a9d-a72c-f126412659ee': # person_data['external_id'] = custom_field.get('value')[0] person_external_id = custom_field.get('value')[0] email = cvent_contact_obj.get('email') membership_person_id = None membership_person_type_id = None person_data = {} person_data['external_id'] = person_external_id # person_data['pronouns'] = ??? person_data['informal_name'] = cvent_contact_obj.get('nickname') person_data['title_names'] = cvent_contact_obj.get('prefix') person_data['given_name'] = cvent_contact_obj.get('firstName') person_data['middle_name'] = cvent_contact_obj.get('middleName') person_data['family_name'] = cvent_contact_obj.get('lastName') person_data['designations'] = cvent_contact_obj.get('designation') person_data['professional_title'] = cvent_contact_obj.get('title') person_data['affiliations'] = cvent_contact_obj.get('company') person_data['enable'] = True log.debug(person_data) contact_data = {} contact_data['email'] = cvent_contact_obj.get('email') contact_data['cc_email'] = cvent_contact_obj.get('ccEmail') contact_data['phone_mobile'] = cvent_contact_obj.get('mobilePhone') contact_data['phone_home'] = cvent_contact_obj.get('homePhone') contact_data['phone_office'] = cvent_contact_obj.get('workPhone') log.debug(contact_data) address_data = {} if cvent_contact_obj.get('homeAddress'): address_data['line_1'] = cvent_contact_obj.get('homeAddress').get('address1') address_data['line_2'] = cvent_contact_obj.get('homeAddress').get('address2') address_data['line_3'] = cvent_contact_obj.get('homeAddress').get('address3') address_data['city'] = cvent_contact_obj.get('homeAddress').get('city') address_data['state_province'] = cvent_contact_obj.get('homeAddress').get('region') if cvent_contact_obj.get('homeAddress').get('countryCode') and cvent_contact_obj.get('homeAddress').get('regionCode'): # NOTE: There needs to be a better fix for this. Why does Cvent include the 3 character country code only sometimes??? cvent_region_code = cvent_contact_obj.get('homeAddress').get('regionCode').replace('(AUS)', '') country_subdivision_code = cvent_contact_obj.get('homeAddress').get('countryCode') +'-'+ cvent_region_code address_data['country_subdivision_code'] = country_subdivision_code address_data['postal_code'] = cvent_contact_obj.get('homeAddress').get('postalCode') address_data['country_alpha_2_code'] = cvent_contact_obj.get('homeAddress').get('countryCode') address_data['country'] = cvent_contact_obj.get('homeAddress').get('country') address_data['country_alpha_2_code'] = cvent_contact_obj.get('homeAddress').get('countryCode') log.debug(address_data) contact_data['address'] = address_data person_data['contact'] = contact_data user_data = {} # user_data['name'] = user_data['username'] = cvent_contact_obj.get('email') user_data['email'] = cvent_contact_obj.get('email') # random_password_string = secrets.token_urlsafe(8) # user_data['password'] = secure_hash_string(string=random_password_string) user_data['email_verified'] = True # Assuming this is True because they came from another system where the email address is required. user_data['enable'] = True user_data['enable_from'] = datetime.datetime.now(datetime.timezone.utc) user_data['enable_to'] = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=365) user_data['super'] = False # This is just not allowed user_data['manager'] = False # This is just not allowed # user_data['administrator'] = False user_data['public'] = False user_data['verified'] = True person_data['user'] = user_data if cvent_contact_obj.get('type'): # NOTE: Should this be renamed to contact_type_id and contact_type_name??? cvent_contact_type_id = cvent_contact_obj.get('type').get('id') cvent_contact_type_name = cvent_contact_obj.get('type').get('name') log.info(f'Found Cvent Contact Type Named: {cvent_contact_type_name}') else: cvent_contact_type_id = None cvent_contact_type_name = None # 'id': '5EB898D8-C253-482C-A93A-0B6667C26E04', 'name': 'Al-Anon Member' # 'id': 'A20358C5-0F6C-47AF-9843-BA9483A9D767', 'name': 'Al-Anon Non-Member' # 'id': 'A01900AB-496A-48A1-9B04-C2874651227E', 'name': 'Member' # 'id': '65437A15-39C2-4EB5-9AFE-67AF6FE41C27', 'name': 'Non-Member' # 'id': '03622AEE-F586-4AE5-A191-B8372543A8C8', 'name': 'Student Member' # 'id': 'A69FAF20-BF2A-4222-B15B-7B0C7EFBEAA7', 'name': 'Student Non-member' # 'id': '54127B4D-E531-4046-AF5C-0F0D71DC39D2', 'name': 'Adult Guest Registration' # 'id': 'C9FA7E47-A925-44AB-B94A-9B3003CA2AC4', 'name': 'Attendee' # 'id': '6F06D6B6-2C23-4EF8-986F-73BF0DB2B229', 'name': "Children's Program with Jerry Moe (7-12 years)" # 'id': 'AADABEF0-3C84-45A2-9D9B-E2CF585D4AE5', 'name': 'General Attendee' # 'id': '96D5B3CC-FD4E-4957-BA71-9CEF388095EF', 'name': 'Guest' # 'id': '71D07118-C24D-4B2E-888D-56AC1B941495', 'name': "IDAA 20's Guest Registration" # 'id': 'DA17F721-9924-43E3-A31F-C567BA96DC64', 'name': 'IDAA Teen (13-19 years)' # 'id': 'C49439B3-5AE6-496F-A0AD-4CCB1A9000E3', 'name': 'Spouse/SO Guest Registration' # Currently in use Member Type names for IDAA: # Member Type = Member Contact Type # Al-Anon Member(s) = "Al-Anon Member" and "Al-Anon Non-Member" # Annual Contribution(s) = "Attendee" and "Non-Member" # Doctoral Qualifying Member(s) = "Member" and "Non-Member" # Student Member(s) = "Student Member" and "Student Non-member" # NOTE: "Student Non-member" is NOT "Student Non-Member" (the lowercase m) membership_person_data = {} membership_person_type_data = {} # if cvent_contact_type_name: if cvent_contact_obj.get('membership'): if cvent_contact_type_name == 'Al-Anon Member' or cvent_contact_type_name == 'Al-Anon Members': membership_person_type_data['membership_type_id'] = 6 membership_person_type_data['product_id'] = 13 membership_person_type_data['level'] = 1 elif cvent_contact_type_name == 'Annual Contribution' or cvent_contact_type_name == 'Annual Contributions' or cvent_contact_type_name == 'Attendee': # Unsure... making affiliate membership_person_type_data['membership_type_id'] = 8 membership_person_type_data['product_id'] = 13 membership_person_type_data['level'] = 3 elif cvent_contact_type_name == 'Doctoral Qualifying Member' or cvent_contact_type_name == 'Doctoral Qualifying Members' or cvent_contact_type_name == 'Member': membership_person_type_data['membership_type_id'] = 5 membership_person_type_data['product_id'] = 4 membership_person_type_data['level'] = 1 elif cvent_contact_type_name == 'Student Member' or cvent_contact_type_name == 'Student Members': membership_person_type_data['membership_type_id'] = 7 membership_person_type_data['product_id'] = 14 membership_person_type_data['level'] = 1 elif cvent_contact_type_name in ['Al-Anon Non-Member', 'Non-Member', 'Student Non-member', 'Student Non-Member']: membership_person_type_data['membership_type_id'] = 8 membership_person_type_data['product_id'] = 13 membership_person_type_data['level'] = 3 else: log.error(f'Unknown Contact Type Name from Cvent: {cvent_contact_type_name}') cvent_contact_type_id = None cvent_contact_type_name = None membership_person_type_data['first_start_on'] = datetime.datetime.strptime(cvent_contact_obj.get('membership').get('joined'), '%Y-%m-%d') if cvent_contact_obj.get('membership').get('lastRenewal'): membership_person_type_data['start_on'] = datetime.datetime.strptime(cvent_contact_obj.get('membership').get('lastRenewal'), '%Y-%m-%d') else: membership_person_type_data['start_on'] = datetime.datetime.strptime(cvent_contact_obj.get('membership').get('joined'), '%Y-%m-%d') membership_person_type_data['end_on'] = datetime.datetime.strptime(cvent_contact_obj.get('membership').get('expiration'), '%Y-%m-%d') membership_person_type_data['last_end_on'] = datetime.datetime.strptime(cvent_contact_obj.get('membership').get('expiration'), '%Y-%m-%d') current_datetime = datetime.datetime.now() if membership_person_type_data['end_on'] >= current_datetime: membership_person_type_data['lu_membership_type_status_id'] = 5 # 5 = active; expiration is > now else: membership_person_type_data['lu_membership_type_status_id'] = 7 # 7 = inactive; expiration is < now membership_person_data['enable'] = True membership_person_type_data['enable'] = True membership_person_data['membership_person_type'] = membership_person_type_data # log.debug(json.dumps(membership_person_data, indent=2, default=str)) # Try to look up using external ID if not person_id and person_external_id: log.info(f'Looking up person with External ID: {person_external_id}') if result := get_person_rec_w_external_id(account_id=account_id, external_id=person_external_id): log.debug(result) person_id = result.get('person_id') log.info(f'Person ID {person_id} found using external ID.') else: pass # If that fails then try to look up using email address if not person_id and email: log.info(f'Looking up person with Email Address: {email}') if result := get_person_rec_list(for_obj_type='account', for_obj_id=account_id, email=email): log.debug(result[0]) person_id = result[0].get('person_id') log.info(f'Person ID {person_id} found using email address.') else: pass log.debug(f'Person ID: {person_id}; External ID: {person_external_id}; Email: {email}') # ### TESTING BREAK POINT ### # person_data['membership_person'] = membership_person_data # return False # ### TESTING BREAK POINT ### # If there is not an external ID for an existing or new person then one needs to be created. if not person_external_id: N = 8 random_string = ''.join(random.choices(string.ascii_uppercase + string.digits, k=N)) random_string = ''.join(random.SystemRandom().choices(string.ascii_uppercase + string.digits, k=N)) # random_string = ''.join(random.SystemRandom().choice(string.ascii_uppercase + string.digits) for _ in range(N)) log.info(f'Created random string: {random_string}') email = cvent_contact_obj.get('email') person_external_id = f'{random_string}~{email}' log.info(f'Created new Person External ID: {person_external_id}') person_data['external_id'] = person_external_id # NOTE: Update Cvent Contact custom_field_id = '609ab766-7d79-4a9d-a72c-f126412659ee' # IDAA Cvent External ID UUID custom_field_value = [person_external_id] log.warning('Sleeping for .75 second to avoid Cvent rate limit...') time.sleep(.75) contact_mod_result = modify_contact_id( contact_id = cvent_contact_id, # This is the Cvent Contact UUID for a person # field_list = field_list, custom_field_id = custom_field_id, custom_field_value = custom_field_value, ) if person_id: # ### SECTION ### Load person object to populate the person_data and membership_person_data dicts. person_obj = load_person_obj(person_id=person_id, inc_address=True, inc_contact=True, inc_membership_person=True, inc_membership_person_type=True, inc_user=True) log.debug(person_obj) person_data['id'] = person_id person_data['account_id'] = account_id person_data['contact']['id'] = person_obj.contact.id person_data['contact']['address']['id'] = person_obj.contact.address.id if person_obj.user: person_data['user']['id'] = person_obj.user.id if person_obj.membership_person: membership_person_id = person_obj.membership_person.id if person_obj.membership_person.membership_person_type: membership_person_type_id = person_obj.membership_person.membership_person_type.id else: membership_person_type_id = None if not person_obj.external_id: person_data['external_id'] = person_external_id log.debug(person_data) if update_person_kiss(person_id=person_id, person_dict_obj=person_data): log.info(f'Updated Person ID: {person_id}') else: log.info(f'Did not update Person ID: {person_id}') else: # ### SECTION ### Create new person and related with the person_data and membership_person_data dicts. person_data['account_id'] = account_id person_data['notes'] = 'Created using data from Cvent' random_password_string = secrets.token_urlsafe(8) person_data['user']['password'] = secure_hash_string(string=random_password_string) user_data['notes'] = 'Created using data from Cvent' # other_data = {} # other_data['temp_password'] = random_password_string # person_data['user']['other_json'] = json.dumps(other_data, indent=4) log.debug(person_data) if create_person_obj_result := create_person_kiss(account_id=account_id, person_dict_obj=person_data): person_id = create_person_obj_result log.info(f'Created Person ID: {person_id}') else: log.info(f'Did not create Person') # ### TESTING BREAK POINT ### # person_data['membership_person'] = membership_person_data # return False # ### TESTING BREAK POINT ### if cvent_contact_obj.get('membership'): if cvent_contact_type_id and membership_person_id: # membership_person_id = person_obj.membership_person.id membership_person_data['id'] = membership_person_id membership_person_data['membership_person_type']['id'] = membership_person_type_id if update_membership_person_obj(membership_person_id=membership_person_id, membership_person_dict_obj=membership_person_data): log.info(f'Updated Membership Person ID: {membership_person_id}') else: log.info(f'Did not update Membership Person ID: {membership_person_id}') elif cvent_contact_type_id: if create_membership_person_obj_result := create_membership_person_obj(account_id=account_id, person_id=person_id, membership_person_dict_obj=membership_person_data): membership_person_id = create_membership_person_obj_result log.info(f'Created Membership Person ID: {membership_person_id}') else: log.info(f'Did not create Membership Person') else: membership_person_data = {} else: membership_person_data = {} person_data['membership_person'] = membership_person_data log.debug(json.dumps(person_data, indent=2, default=str)) # person_obj = load_person_obj(person_id=person_id, inc_address=True, inc_contact=True, inc_membership_person=True, inc_membership_person_type=True, inc_user=True) # log.debug(person_obj) return person_id # True # ### END ### API External Cvent ### create_update_aether_person() ###