"""Helpers for calling the Cvent REST API: OAuth token handling and contact endpoints."""
from __future__ import annotations

import datetime
import pprint
import random
import time
from typing import Dict, List, Optional, Set, Union

import pytz
import requests
from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator
from requests.auth import HTTPBasicAuth

from app.db_sql import redis_lookup_id_random, sql_insert, sql_select, sql_update
from app.lib_general import log, logging, logger_reset
from app.methods.person_methods import get_person_rec_w_external_id, load_person_obj, update_person_kiss

# NOTE(review): credentials are hard-coded in source. They are kept as-is so
# behavior is unchanged, but they should be moved to configuration or a secret
# store and rotated.
app = {}
app['client_id'] = '0oalt6dz82oSbN9ok1t7'  # From Cvent Developer Portal
app['secret'] = 'gQY96qffZAuB_44k73C_hn_MHeByBS8LXHj1vPRm'  # From Cvent Developer

# Module-level API state shared by every function below. get_access_token()
# fills in 'access_token', 'expires_in', 'token_type', 'expire_on' and 'headers'.
api = {}
api['base_url'] = 'https://api-platform.cvent.com/ea'  # Including /ea as the Cvent version. EA = Early Access
api['headers'] = {}  # { 'Content-Type': content_type, 'Authorization': 'Basic '+str(cvent_authorization_base64) }

# UUID of the Cvent contact custom field that stores the external ID (IDAA).
_EXTERNAL_ID_FIELD_UUID = '609ab766-7d79-4a9d-a72c-f126412659ee'

# Value of the 'message' key in a response body when a request is throttled.
_RATE_LIMIT_MESSAGE = 'Too Many Requests'


def _is_rate_limited(response_data) -> bool:
    """Return True when a decoded response body carries the rate-limit message."""
    return isinstance(response_data, dict) and response_data.get('message') == _RATE_LIMIT_MESSAGE


# Updated 2022-02-01
@logger_reset
def get_access_token():
    """Return the module-level ``api`` dict with a usable access token.

    If ``api`` already holds an access token whose ``expire_on`` timestamp is
    still in the future, ``api`` is returned unchanged. Otherwise a new token
    is requested from ``/oauth2/token`` with the ``client_credentials`` grant
    (client id and secret sent via HTTP basic auth), and ``api`` is updated
    with the token, its expiry and the request headers used by the other
    functions in this module.

    Returns:
        The shared ``api`` dict.

    Raises:
        KeyError: if the token response lacks ``access_token``, ``expires_in``
            or ``token_type`` (for example on an error response).
    """
    log.setLevel(logging.INFO)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())
    log.debug(f'API data:\n{api}')

    if 'access_token' in api:
        access_token = api.get('access_token')
        # NOTE(review): this writes the bearer token to the log at INFO level.
        log.info(f'Cvent Access Token found: {access_token}')
        if 'expire_on' in api and datetime.datetime.now() < api['expire_on']:
            expire_on = api.get('expire_on')
            log.info(f'Cvent Access Token is current: {expire_on}')
            return api

    endpoint = '/oauth2/token'
    uri = api['base_url'] + endpoint
    data = {
        'grant_type': 'client_credentials',
        'client_id': app['client_id'],
    }
    # Log the payload actually being sent (previously this logged `api`).
    log.debug(f'Oauth Token Request Data:\n{data}')

    # Sending as HTML form data
    resp = requests.post(url=uri, data=data, auth=HTTPBasicAuth(app['client_id'], app['secret']))
    response_data = resp.json()
    log.debug(f'Status Code: {resp.status_code}')
    log.debug(f'Headers: {resp.headers}')
    log.debug(f'Encoding: {resp.encoding}')
    log.debug(f'JSON: {response_data}')

    api['access_token'] = response_data['access_token']
    api['expires_in'] = response_data['expires_in']
    api['token_type'] = response_data['token_type']
    # Absolute expiry (naive local time) so later calls can compare against now().
    api['expire_on'] = datetime.datetime.now() + datetime.timedelta(seconds=response_data['expires_in'])

    api['headers']['Accept'] = 'application/json'
    api['headers']['x-api-key'] = app['client_id']
    api['headers']['Authorization'] = 'Bearer ' + api['access_token']

    log.debug(api)
    return api


# Updated 2022-01-31
@logger_reset
def get_contact_custom_field_list(api):
    """Fetch the custom fields whose category is ``Contact``.

    Args:
        api: API state dict providing ``base_url`` and ``headers`` (as
            returned by ``get_access_token``).

    Returns:
        The decoded JSON body of the ``/custom-fields`` response.
    """
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    endpoint = '/custom-fields'
    uri = api['base_url'] + endpoint
    params = {'filter': "category eq 'Contact'"}

    resp = requests.get(url=uri, headers=api['headers'], params=params)
    response_data = resp.json()
    # f-strings rather than extra positional args: the logging module treats
    # extra positionals as %-format arguments, and with no placeholder in the
    # message the record fails to format and the value is never logged.
    log.debug(f'Status Code: {resp.status_code}')
    log.debug(f'Headers: {resp.headers}')
    log.debug(f'Encoding: {resp.encoding}')
    log.debug(f'JSON: {response_data}')

    return response_data


# Updated 2022-01-31
@logger_reset
def get_contact_list(api, external_id: Union[str, bool, None] = False):
    """Fetch contacts, optionally filtered on the external-ID custom field.

    Args:
        api: API state dict providing ``base_url`` and ``headers``.
        external_id: Selects the filter sent to ``/contacts``:
            * a non-empty string - contacts whose external-ID custom field
              equals that value;
            * ``None`` - non-deleted contacts whose external-ID custom field
              is ``lt '1'`` (presumably contacts without an external ID;
              confirm against the Cvent filter semantics);
            * anything else falsy (default ``False``) - all non-deleted
              contacts.

    Returns:
        The decoded JSON body of the response.
    """
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    endpoint = '/contacts'
    uri = api['base_url'] + endpoint

    params = {}
    if external_id:
        params['filter'] = f"customField.{_EXTERNAL_ID_FIELD_UUID} eq '{external_id}'"
    elif external_id is None:
        params['filter'] = f"deleted eq 'False' and customField.{_EXTERNAL_ID_FIELD_UUID} lt '1'"
    else:
        params['filter'] = "deleted eq 'False'"

    resp = requests.get(url=uri, headers=api['headers'], params=params)
    return resp.json()


# Updated 2022-02-01
@logger_reset
def get_contact_id(contact_id: str):
    """Fetch a single contact by its Cvent contact id.

    Refreshes the access token if needed, then sleeps for one second before
    the request to stay under the Cvent rate limit.

    Args:
        contact_id: Cvent contact id used in the ``/contacts/{contact_id}`` path.

    Returns:
        The decoded JSON body, or ``False`` when the response reports
        ``Too Many Requests``.
    """
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    get_access_token()

    log.warning('Sleeping for 1 second to avoid Cvent rate limit...')
    time.sleep(1)

    endpoint = f'/contacts/{contact_id}'
    uri = api['base_url'] + endpoint

    resp = requests.get(url=uri, headers=api['headers'])
    response_data = resp.json()
    log.debug(response_data)

    if _is_rate_limited(response_data):
        return False
    return response_data


# Updated 2022-02-01
@logger_reset
def modify_contact_id(contact_id: str, field_list: Optional[dict] = None, custom_field_id: str = None, custom_field_value: str = None):
    """Update a contact's custom-field answer or its standard fields.

    When ``custom_field_id`` is given, the answer for that custom field is
    replaced with ``custom_field_value`` via PUT. Otherwise, when
    ``field_list`` is non-empty, its entries are sent as a PATCH to the
    contact, with ``'id'`` set to ``contact_id``.

    Args:
        contact_id: Cvent contact id.
        field_list: Mapping of contact field names to new values. It is
            copied before ``'id'`` is added, so the caller's object is not
            modified.
        custom_field_id: Id of the custom field to update; takes precedence
            over ``field_list``.
        custom_field_value: New value for the custom field.

    Returns:
        The decoded JSON body, or ``False`` when neither ``custom_field_id``
        nor ``field_list`` is supplied or when the response reports
        ``Too Many Requests``.

    Raises:
        TypeError: if ``field_list`` is non-empty but not a mapping.
    """
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    get_access_token()

    if custom_field_id:
        endpoint = f'/contacts/{contact_id}/custom-fields/{custom_field_id}/answers'
        payload = {'id': custom_field_id, 'value': custom_field_value}
        send = requests.put
    elif field_list:
        endpoint = f'/contacts/{contact_id}'
        # Build a new dict instead of writing 'id' into the caller's mapping.
        payload = {**field_list, 'id': contact_id}
        send = requests.patch
    else:
        log.error('Missing required parameters')
        return False

    uri = api['base_url'] + endpoint
    resp = send(url=uri, headers=api['headers'], json=payload)
    response_data = resp.json()
    log.debug(response_data)

    if _is_rate_limited(response_data):
        return False
    return response_data