Files
OSIT-AE-API-FastAPI/app/methods/e_cvent_methods.py

539 lines
24 KiB
Python

from __future__ import annotations
import datetime, json, pprint, pytz, random, requests, secrets, time
from requests.auth import HTTPBasicAuth
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator
from app.db_sql import redis_lookup_id_random, sql_insert, sql_select, sql_update
from app.lib_general import log, logging, logger_reset, secure_hash_string, verify_secure_hash_string
from app.methods.person_methods import get_person_rec_w_external_id, load_person_obj, update_person_kiss
app = {}
app['client_id'] = '0oalt6dz82oSbN9ok1t7' # From Cvent Developer Portal
app['secret'] = 'gQY96qffZAuB_44k73C_hn_MHeByBS8LXHj1vPRm' # From Cvent Developer
api = {}
api['base_url'] = 'https://api-platform.cvent.com/ea' # Including /ea as the Cvent version. EA = Early Access
api['headers'] = {} # { 'Content-Type': content_type, 'Authorization': 'Basic '+str(cvent_authorization_base64) }
# Updated 2022-02-01
@logger_reset
def get_access_token():
log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
log.debug(f'API data:\n{api}')
# api['access_token'] = 'eyJraWQiOiJXdWtMTUFGNFkxM1ZUQmFnV1I4WU94ZVdPU3dIYXM5RTRyaFhqc1p5X2JVIiwiYWxnIjoiUlMyNTYifQ.eyJ2ZXIiOjEsImp0aSI6IkFULmltZGJlbnJIbnE1Rjd4blhucEN2Y0ExTzdCWUtCUWhUTU1fQUdxd1Z0RUUiLCJpc3MiOiJodHRwczovL3Nzby5jdmVudC5jb20vb2F1dGgyL2F1c2kzbXowZjNvRENNMlRpMXQ3IiwiYXVkIjoiYXBpLXBsYXRmb3JtIiwiaWF0IjoxNjQzNjYxMTc1LCJleHAiOjE2NDM2NjQ3NzUsImNpZCI6IjBvYWx0NmR6ODJvU2JOOW9rMXQ3Iiwic2NwIjpbImV2ZW50L2NvbnRhY3QtZ3JvdXBzOnJlYWQiLCJldmVudC9jdXN0b20tZmllbGRzOnJlYWQiLCJldmVudC9jb250YWN0czpyZWFkLXNlbnNpdGl2ZSIsImV2ZW50L2NvbnRhY3RzOndyaXRlIiwiZXZlbnQvY29udGFjdHM6cmVhZCJdLCJzdWIiOiIwb2FsdDZkejgyb1NiTjlvazF0NyJ9.1BGae5F97OpRVlW_z7JFwhFuY5xSj-CTCdph4dy3mSW1fjSb2rXoTTMNqdBwssG8S5XD62MYabx1WpM9xHB1WPw4ydP3xDqpMDO_h1Im1wfdlkami8Xvm1vX293IibEG8sZwjmD7x1UoWE7svwLLKJ8yukpJXaQbrd3qhFpfCyyi-eFYLYYjRjkMaGSBDMQKUv9VV62afGNekkC3ARNJzUqe0Il6Wz7aj109q_gvFYr6XybYdMvXanWxoY9C2-b7g1AtmN7iGRqz2znIHLr7Vav8xvoXYXaWzaq1gbfd4QwrksBCaw4lpZWKJdM0bhCaiOiPGfKktGVQN2r0gaQ0nA'
if 'access_token' in api:
access_token = api.get('access_token')
log.info(f'Cvent Access Token found: {access_token}')
if 'expire_on' in api and datetime.datetime.now() < api['expire_on']:
expire_on = api.get('expire_on')
log.info(f'Cvent Access Token is current: {expire_on}')
return api
endpoint = '/oauth2/token'
uri = api['base_url']+endpoint
data = { 'grant_type': 'client_credentials', 'client_id': app['client_id'] }
log.debug(f'Oauth Token Request Data:\n{api}')
resp = requests.post(url=uri, data=data, auth=HTTPBasicAuth(app['client_id'], app['secret'])) # Sending as HTML form data
log.debug(f'Status Code: {resp.status_code}')
log.debug(f'Headers: {resp.headers}')
log.debug(f'Encoding: {resp.encoding}')
log.debug(f'JSON: {resp.json()}')
# log.debug('Text:', resp.text)
response_data = resp.json()
api['access_token'] = response_data['access_token']
api['expires_in'] = response_data['expires_in']
api['token_type'] = response_data['token_type']
# current_datetime = datetime.datetime.now()
# log.debug(type(current_datetime))
# log.debug(current_datetime)
# expires_in = response_data['expires_in']
# log.debug(type(expires_in))
# log.debug(expires_in)
api['expire_on'] = datetime.datetime.now() + datetime.timedelta(seconds=response_data['expires_in'])
# log.debug(type(api['expire_on']))
# log.debug(api['expire_on'])
api['headers']['Accept'] = 'application/json'
api['headers']['x-api-key'] = app['client_id']
api['headers']['Authorization'] = 'Bearer '+api['access_token']
log.debug(api)
# f = open("api_access.txt", "wb")
# f.write(pprint.pformat(api, indent=2, width=160))
# pickle.dump(api, f)
# f.close()
return api
# Updated 2022-01-31
@logger_reset
def get_contact_custom_field_list(api):
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
endpoint = '/custom-fields'
uri = api['base_url']+endpoint
params = { 'filter': "category eq 'Contact'" }
resp = requests.get(url=uri, headers=api['headers'], params=params)
log.debug('Status Code:', resp.status_code)
log.debug('Headers:', resp.headers)
log.debug('Encoding:', resp.encoding)
log.debug('JSON:', resp.json())
# log.debug('Text:', resp.text)
response_data = resp.json()
# f = open('contact_custom_field_list.txt', 'w')
# f.write(pprint.pformat(response_data, indent=2, width=160))
return response_data
# Updated 2022-01-31
@logger_reset
def get_contact_list(api, external_id: str=False):
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
endpoint = '/contacts'
uri = api['base_url']+endpoint
# External ID for IDAA: UUID = '609ab766-7d79-4a9d-a72c-f126412659ee'
# customField.609ab766-7d79-4a9d-a72c-f126412659ee eq external_id
params = {}
if external_id: params['filter'] = f"customField.609ab766-7d79-4a9d-a72c-f126412659ee eq '{external_id}'"
elif external_id is None:
params['filter'] = f"deleted eq 'False' and customField.609ab766-7d79-4a9d-a72c-f126412659ee lt '1'"
else: params['filter'] = f"deleted eq 'False'"
resp = requests.get(url=uri, headers=api['headers'], params=params)
response_data = resp.json()
# if external_id: filename = f'contact_{external_id}.txt'
# elif external_id is None: filename = f'contact_list_no_external_id.txt'
# else: filename = 'contact_list.txt'
# f = open(filename, 'w')
# f.write(pprint.pformat(response_data, indent=2, width=160))
return response_data
# Updated 2022-02-02
@logger_reset
def get_recent_contact_list(created_on=None, updated_on=None):
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
get_access_token()
log.warning('Sleeping for 1 second to avoid Cvent rate limit...')
time.sleep(1)
endpoint = f'/contacts'
uri = api['base_url']+endpoint
params = {}
if created_on:
created_on_str = created_on.isoformat()
print(f'Created On: {created_on_str}')
params['filter'] = f"deleted eq 'False' and created gt '{created_on_str}Z'"
elif updated_on:
print(f'Updated On: {created_on_str}')
params['filter'] = f"deleted eq 'False' and lastModified gt '{updated_on_str}Z'"
else:
log.warning('Created On or Updated On is requried. Returing False')
return False
resp = requests.get(url=uri, headers=api['headers'], params=params)
response_data = resp.json()
log.debug(response_data)
if 'message' in response_data and response_data['message'] == 'Too Many Requests': return False
return response_data
# Updated 2022-02-01
@logger_reset
def get_contact_id(contact_id: str):
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
# if 'access_token' in api: pass
# else: get_access_token()
get_access_token()
log.warning('Sleeping for 1 second to avoid Cvent rate limit...')
time.sleep(1)
endpoint = f'/contacts/{contact_id}'
uri = api['base_url']+endpoint
params = {}
resp = requests.get(url=uri, headers=api['headers'], params=params)
response_data = resp.json()
log.debug(response_data)
if 'message' in response_data and response_data['message'] == 'Too Many Requests': return False
return response_data
# Updated 2022-02-01
@logger_reset
def modify_contact_id(contact_id: str, field_list: list=[], custom_field_id: str=None, custom_field_value: str=None):
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
get_access_token()
if custom_field_id:
endpoint = f'/contacts/{contact_id}/custom-fields/{custom_field_id}/answers'
elif field_list:
endpoint = f'/contacts/{contact_id}'
else:
log.error('Missing required parameters')
return False
uri = api['base_url']+endpoint
params = {}
data = {}
if custom_field_id:
data['id'] = custom_field_id
data['value'] = custom_field_value
else:
data = field_list
data['id'] = contact_id
if custom_field_id:
resp = requests.put(url=uri, headers=api['headers'], params=params, json=data)
else:
resp = requests.patch(url=uri, headers=api['headers'], params=params, json=data)
response_data = resp.json()
log.debug(response_data)
if 'message' in response_data and response_data['message'] == 'Too Many Requests': return False
return response_data
# ### BEGIN ### API External Cvent ### create_update_aether_person() ###
# Updated 2022-02-02
@logger_reset
def create_update_aether_person(
cvent_contact_id: str,
cvent_contact_obj: dict,
account_id: str,
person_id: str=None,
):
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
person_external_id = None
if custom_field_list := cvent_contact_obj.get('customFields'): # List
for custom_field in custom_field_list:
# Get the External ID created by OSIT
if custom_field.get('id') == '609ab766-7d79-4a9d-a72c-f126412659ee':
# person_data['external_id'] = custom_field.get('value')[0]
person_external_id = custom_field.get('value')[0]
email = cvent_contact_obj.get('email')
membership_person_id = None
membership_person_type_id = None
person_data = {}
person_data['external_id'] = person_external_id
# person_data['pronouns'] = ???
person_data['informal_name'] = cvent_contact_obj.get('nickname')
person_data['title_names'] = cvent_contact_obj.get('prefix')
person_data['given_name'] = cvent_contact_obj.get('firstName')
person_data['middle_name'] = cvent_contact_obj.get('middleName')
person_data['family_name'] = cvent_contact_obj.get('lastName')
person_data['designations'] = cvent_contact_obj.get('designation')
person_data['professional_title'] = cvent_contact_obj.get('title')
person_data['affiliations'] = cvent_contact_obj.get('company')
person_data['enable'] = True
log.debug(person_data)
contact_data = {}
contact_data['email'] = cvent_contact_obj.get('email')
contact_data['cc_email'] = cvent_contact_obj.get('ccEmail')
contact_data['phone_mobile'] = cvent_contact_obj.get('mobilePhone')
contact_data['phone_home'] = cvent_contact_obj.get('homePhone')
contact_data['phone_office'] = cvent_contact_obj.get('workPhone')
log.debug(contact_data)
address_data = {}
if cvent_contact_obj.get('homeAddress'):
address_data['line_1'] = cvent_contact_obj.get('homeAddress').get('address1')
address_data['line_2'] = cvent_contact_obj.get('homeAddress').get('address2')
address_data['line_3'] = cvent_contact_obj.get('homeAddress').get('address3')
address_data['city'] = cvent_contact_obj.get('homeAddress').get('city')
address_data['state_province'] = cvent_contact_obj.get('homeAddress').get('region')
if cvent_contact_obj.get('homeAddress').get('countryCode') and cvent_contact_obj.get('homeAddress').get('regionCode'):
country_subdivision_code = cvent_contact_obj.get('homeAddress').get('countryCode') +'-'+ cvent_contact_obj.get('homeAddress').get('regionCode')
address_data['country_subdivision_code'] = country_subdivision_code
address_data['postal_code'] = cvent_contact_obj.get('homeAddress').get('postalCode')
address_data['country_alpha_2_code'] = cvent_contact_obj.get('homeAddress').get('countryCode')
address_data['country'] = cvent_contact_obj.get('homeAddress').get('country')
address_data['country_alpha_2_code'] = cvent_contact_obj.get('homeAddress').get('countryCode')
log.debug(address_data)
contact_data['address'] = address_data
person_data['contact'] = contact_data
user_data = {}
# user_data['name'] =
user_data['username'] = cvent_contact_obj.get('email')
user_data['email'] = cvent_contact_obj.get('email')
random_password_string = secrets.token_urlsafe(8)
user_data['password'] = secure_hash_string(string=random_password_string)
user_data['email_verified'] = True # Assuming this is True because they came from another system where the email address is required.
user_data['enable'] = True
user_data['enable_from'] = datetime.datetime.now(datetime.timezone.utc)
user_data['enable_to'] = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=365)
user_data['super'] = False # This is just not allowed
user_data['manager'] = False # This is just not allowed
# user_data['administrator'] = False
user_data['public'] = False
user_data['verified'] = True
user_data['notes'] = 'Created by importing list'
other_data = {}
other_data['temp_password'] = random_password_string
user_data['other_json'] = json.dumps(other_data, indent=4)
person_data['user'] = user_data
if cvent_contact_obj.get('type'):
membership_type_cvent_id = cvent_contact_obj.get('type').get('id')
membership_type_cvent_name = cvent_contact_obj.get('type').get('name')
log.info(f'Found Cvent Membership Type Named: {membership_type_cvent_name}')
else:
membership_type_cvent_id = None
membership_type_cvent_name = None
# 'id': '5EB898D8-C253-482C-A93A-0B6667C26E04', 'name': 'Al-Anon Member'
# 'id': 'A20358C5-0F6C-47AF-9843-BA9483A9D767', 'name': 'Al-Anon Non-Member'
# 'id': 'A01900AB-496A-48A1-9B04-C2874651227E', 'name': 'Member'
# 'id': '65437A15-39C2-4EB5-9AFE-67AF6FE41C27', 'name': 'Non-Member'
# 'id': '03622AEE-F586-4AE5-A191-B8372543A8C8', 'name': 'Student Member'
# 'id': 'A69FAF20-BF2A-4222-B15B-7B0C7EFBEAA7', 'name': 'Student Non-member'
# 'id': '54127B4D-E531-4046-AF5C-0F0D71DC39D2', 'name': 'Adult Guest Registration'
# 'id': 'C9FA7E47-A925-44AB-B94A-9B3003CA2AC4', 'name': 'Attendee'
# 'id': '6F06D6B6-2C23-4EF8-986F-73BF0DB2B229', 'name': "Children's Program with Jerry Moe (7-12 years)"
# 'id': 'AADABEF0-3C84-45A2-9D9B-E2CF585D4AE5', 'name': 'General Attendee'
# 'id': '96D5B3CC-FD4E-4957-BA71-9CEF388095EF', 'name': 'Guest'
# 'id': '71D07118-C24D-4B2E-888D-56AC1B941495', 'name': "IDAA 20's Guest Registration"
# 'id': 'DA17F721-9924-43E3-A31F-C567BA96DC64', 'name': 'IDAA Teen (13-19 years)'
# 'id': 'C49439B3-5AE6-496F-A0AD-4CCB1A9000E3', 'name': 'Spouse/SO Guest Registration'
# Currently in use for IDAA:
# Member Type = Member Contact Type
# Annual Contribution(s) = Attendee
# Doctoral Qualifying Member(s) = Member
# Student Member(s) = Student Member
membership_person_data = {}
membership_person_type_data = {}
if membership_type_cvent_name:
if membership_type_cvent_name == 'Al-Anon Member' or membership_type_cvent_name == 'Al-Anon Members':
membership_person_type_data['membership_type_id'] = 6
membership_person_type_data['product_id'] = 13
membership_person_type_data['level'] = 1
elif membership_type_cvent_name == 'Annual Contribution' or membership_type_cvent_name == 'Annual Contributions' or membership_type_cvent_name == 'Attendee': # Unsure... making affiliate
membership_person_type_data['membership_type_id'] = 8
membership_person_type_data['product_id'] = 13
membership_person_type_data['level'] = 3
elif membership_type_cvent_name == 'Doctoral Qualifying Member' or membership_type_cvent_name == 'Doctoral Qualifying Members' or membership_type_cvent_name == 'Member':
membership_person_type_data['membership_type_id'] = 5
membership_person_type_data['product_id'] = 4
membership_person_type_data['level'] = 1
elif membership_type_cvent_name == 'Student Member' or membership_type_cvent_name == 'Student Members':
membership_person_type_data['membership_type_id'] = 7
membership_person_type_data['product_id'] = 14
membership_person_type_data['level'] = 1
else:
log.error(f'Unknown Membership Type Name from Cvent: {membership_type_cvent_name}')
return False
membership_person_type_data['first_start_on'] = datetime.datetime.strptime(cvent_contact_obj.get('membership').get('joined'), '%Y-%m-%d')
if cvent_contact_obj.get('membership').get('lastRenewal'):
membership_person_type_data['start_on'] = datetime.datetime.strptime(cvent_contact_obj.get('membership').get('lastRenewal'), '%Y-%m-%d')
else:
membership_person_type_data['start_on'] = datetime.datetime.strptime(cvent_contact_obj.get('membership').get('joined'), '%Y-%m-%d')
membership_person_type_data['end_on'] = datetime.datetime.strptime(cvent_contact_obj.get('membership').get('expiration'), '%Y-%m-%d')
membership_person_type_data['last_end_on'] = datetime.datetime.strptime(cvent_contact_obj.get('membership').get('expiration'), '%Y-%m-%d')
current_datetime = datetime.datetime.now()
if membership_person_type_data['end_on'] >= current_datetime:
membership_person_type_data['lu_membership_type_status_id'] = 5 # 5 = active; expiration is > now
else:
membership_person_type_data['lu_membership_type_status_id'] = 7 # 7 = inactive; expiration is < now
membership_person_data['enable'] = True
membership_person_type_data['enable'] = True
membership_person_data['membership_person_type'] = membership_person_type_data
log.debug(json.dumps(membership_person_data, indent=2, default=str))
# ### TESTING BREAK POINT ###
# person_data['membership_person'] = membership_person_data
# return False
# ### TESTING BREAK POINT ###
# Try to look up using external ID
if not person_id and person_external_id:
log.info(f'Looking up person with External ID: {person_external_id}')
if result := get_person_rec_w_external_id(account_id=account_id, external_id=person_external_id):
log.debug(result)
person_id = result.get('person_id')
log.info(f'Person ID {person_id} found using external ID.')
else: pass
# If that fails then try to look up using email address
if not person_id and email:
log.info(f'Looking up person with Email Address: {email}')
if result := get_person_rec_list(for_obj_type='account', for_obj_id=account_id, email=email):
log.debug(result[0])
person_id = result[0].get('person_id')
log.info(f'Person ID {person_id} found using email address.')
else: pass
log.debug(f'Person ID: {person_id}; External ID: {person_external_id}; Email: {email}')
# ### TESTING BREAK POINT ###
# person_data['membership_person'] = membership_person_data
# return False
# ### TESTING BREAK POINT ###
# If there is not an external ID for an existing or new person then one needs to be created.
if not person_external_id:
N = 8
random_string = ''.join(random.choices(string.ascii_uppercase + string.digits, k=N))
random_string = ''.join(random.SystemRandom().choices(string.ascii_uppercase + string.digits, k=N))
# random_string = ''.join(random.SystemRandom().choice(string.ascii_uppercase + string.digits) for _ in range(N))
log.info(f'Created random string: {random_string}')
email = cvent_contact_obj.get('email')
person_external_id = f'{random_string}~{email}'
log.info(f'Created new Person External ID: {person_external_id}')
person_data['external_id'] = person_external_id
# NOTE: Update Cvent Contact
custom_field_id = '609ab766-7d79-4a9d-a72c-f126412659ee' # IDAA Cvent External ID UUID
custom_field_value = [person_external_id]
log.warning('Sleeping for 1 second to avoid Cvent rate limit...')
time.sleep(1)
contact_mod_result = modify_contact_id(
contact_id = cvent_person_id, # This is the Cvent Contact UUID for a person
# field_list = field_list,
custom_field_id = custom_field_id,
custom_field_value = custom_field_value,
)
if person_id:
# ### SECTION ### Load person object to populate the person_data and membership_person_data dicts.
person_obj = load_person_obj(person_id=person_id, inc_address=True, inc_contact=True, inc_membership_person=True, inc_membership_person_type=True, inc_user=True)
log.debug(person_obj)
person_data['id'] = person_id
person_data['account_id'] = account_id
person_data['contact']['id'] = person_obj.contact.id
person_data['contact']['address']['id'] = person_obj.contact.address.id
if person_obj.user:
person_data['user']['id'] = person_obj.user.id
if person_obj.membership_person:
membership_person_id = person_obj.membership_person.id
if person_obj.membership_person.membership_person_type:
membership_person_type_id = person_obj.membership_person.membership_person_type.id
if not person_obj.external_id:
person_data['external_id'] = person_external_id
log.debug(person_data)
if update_person_kiss(person_id=person_id, person_dict_obj=person_data):
log.info(f'Updated Person ID: {person_id}')
else:
log.info(f'Did not update Person ID: {person_id}')
else:
# ### SECTION ### Create new person and related with the person_data and membership_person_data dicts.
# person_data['id'] = person_id
person_data['account_id'] = account_id
# person_data['contact']['id'] = person_obj.contact.id
# person_data['contact']['address']['id'] = person_obj.contact.address.id
# person_data['user']['id'] = person_obj.user.id
log.debug(person_data)
if create_person_obj_result := create_person_kiss(account_id=account_id, person_dict_obj=person_data):
person_id = create_person_obj_result
log.info(f'Created Person ID: {person_id}')
else:
log.info(f'Did not create Person')
# ### TESTING BREAK POINT ###
# person_data['membership_person'] = membership_person_data
# return False
# ### TESTING BREAK POINT ###
if membership_type_cvent_id and membership_person_id:
# membership_person_id = person_obj.membership_person.id
membership_person_data['id'] = membership_person_id
membership_person_data['membership_person_type']['id'] = membership_person_type_id
if update_membership_person_obj(membership_person_id=membership_person_id, membership_person_dict_obj=membership_person_data):
log.info(f'Updated Membership Person ID: {membership_person_id}')
else:
log.info(f'Did not update Membership Person ID: {membership_person_id}')
elif membership_type_cvent_id:
if create_membership_person_obj_result := create_membership_person_obj(account_id=account_id, person_id=person_id, membership_person_dict_obj=membership_person_data):
membership_person_id = create_membership_person_obj_result
log.info(f'Created Membership Person ID: {membership_person_id}')
else:
log.info(f'Did not create Membership Person')
else:
membership_person_data = {}
person_data['membership_person'] = membership_person_data
log.debug(json.dumps(person_data, indent=2, default=str))
person_obj = load_person_obj(person_id=person_id, inc_address=True, inc_contact=True, inc_membership_person=True, inc_membership_person_type=True, inc_user=True)
log.debug(person_obj)
return person_obj # True
# ### END ### API External Cvent ### create_update_aether_person() ###