from __future__ import annotations import datetime, pprint, pytz, random, requests from requests.auth import HTTPBasicAuth from typing import Dict, List, Optional, Set, Union from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator from app.db_sql import redis_lookup_id_random, sql_insert, sql_select, sql_update from app.lib_general import log, logging from app.methods.person_methods import get_person_rec_w_external_id, load_person_obj, update_person_kiss app = {} app['client_id'] = '0oalt6dz82oSbN9ok1t7' # From Cvent Developer Portal app['secret'] = 'gQY96qffZAuB_44k73C_hn_MHeByBS8LXHj1vPRm' # From Cvent Developer api = {} api['base_url'] = 'https://api-platform.cvent.com/ea' # Including /ea as the Cvent version. EA = Early Access api['headers'] = {} # { 'Content-Type': content_type, 'Authorization': 'Basic '+str(cvent_authorization_base64) } # def get_access_token(api, app): def get_access_token(): log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) log.debug(f'API data:\n{api}') # api['access_token'] = 'eyJraWQiOiJXdWtMTUFGNFkxM1ZUQmFnV1I4WU94ZVdPU3dIYXM5RTRyaFhqc1p5X2JVIiwiYWxnIjoiUlMyNTYifQ.eyJ2ZXIiOjEsImp0aSI6IkFULmltZGJlbnJIbnE1Rjd4blhucEN2Y0ExTzdCWUtCUWhUTU1fQUdxd1Z0RUUiLCJpc3MiOiJodHRwczovL3Nzby5jdmVudC5jb20vb2F1dGgyL2F1c2kzbXowZjNvRENNMlRpMXQ3IiwiYXVkIjoiYXBpLXBsYXRmb3JtIiwiaWF0IjoxNjQzNjYxMTc1LCJleHAiOjE2NDM2NjQ3NzUsImNpZCI6IjBvYWx0NmR6ODJvU2JOOW9rMXQ3Iiwic2NwIjpbImV2ZW50L2NvbnRhY3QtZ3JvdXBzOnJlYWQiLCJldmVudC9jdXN0b20tZmllbGRzOnJlYWQiLCJldmVudC9jb250YWN0czpyZWFkLXNlbnNpdGl2ZSIsImV2ZW50L2NvbnRhY3RzOndyaXRlIiwiZXZlbnQvY29udGFjdHM6cmVhZCJdLCJzdWIiOiIwb2FsdDZkejgyb1NiTjlvazF0NyJ9.1BGae5F97OpRVlW_z7JFwhFuY5xSj-CTCdph4dy3mSW1fjSb2rXoTTMNqdBwssG8S5XD62MYabx1WpM9xHB1WPw4ydP3xDqpMDO_h1Im1wfdlkami8Xvm1vX293IibEG8sZwjmD7x1UoWE7svwLLKJ8yukpJXaQbrd3qhFpfCyyi-eFYLYYjRjkMaGSBDMQKUv9VV62afGNekkC3ARNJzUqe0Il6Wz7aj109q_gvFYr6XybYdMvXanWxoY9C2-b7g1AtmN7iGRqz2znIHLr7Vav8xvoXYXaWzaq1gbfd4QwrksBCaw4lpZWKJdM0bhCaiOiPGfKktGVQN2r0gaQ0nA' if 'access_token' in api: access_token = api.get('access_token') log.info(f'Cvent Access Token found: {access_token}') if 'expire_on' in api and datetime.datetime.now() < api['expire_on']: expire_on = api.get('expire_on') log.info(f'Cvent Access Token is current: {expire_on}') return api endpoint = '/oauth2/token' uri = api['base_url']+endpoint data = { 'grant_type': 'client_credentials', 'client_id': app['client_id'] } log.debug(f'Oauth Token Request Data:\n{api}') resp = requests.post(url=uri, data=data, auth=HTTPBasicAuth(app['client_id'], app['secret'])) # Sending as HTML form data log.debug(f'Status Code: {resp.status_code}') log.debug(f'Headers: {resp.headers}') log.debug(f'Encoding: {resp.encoding}') log.debug(f'JSON: {resp.json()}') # log.debug('Text:', resp.text) response_data = resp.json() api['access_token'] = response_data['access_token'] api['expires_in'] = response_data['expires_in'] api['token_type'] = response_data['token_type'] # current_datetime = datetime.datetime.now() # log.debug(type(current_datetime)) # log.debug(current_datetime) # expires_in = response_data['expires_in'] # log.debug(type(expires_in)) # log.debug(expires_in) api['expire_on'] = datetime.datetime.now() + datetime.timedelta(seconds=response_data['expires_in']) # log.debug(type(api['expire_on'])) # log.debug(api['expire_on']) api['headers']['Accept'] = 'application/json' api['headers']['x-api-key'] = app['client_id'] api['headers']['Authorization'] = 'Bearer '+api['access_token'] log.debug(api) # f = open("api_access.txt", "wb") # f.write(pprint.pformat(api, indent=2, width=160)) # pickle.dump(api, f) # f.close() return api def get_contact_custom_field_list(api): log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) endpoint = '/custom-fields' uri = api['base_url']+endpoint params = { 'filter': "category eq 'Contact'" } resp = requests.get(url=uri, headers=api['headers'], params=params) log.debug('Status Code:', resp.status_code) log.debug('Headers:', resp.headers) log.debug('Encoding:', resp.encoding) log.debug('JSON:', resp.json()) # log.debug('Text:', resp.text) response_data = resp.json() # f = open('contact_custom_field_list.txt', 'w') # f.write(pprint.pformat(response_data, indent=2, width=160)) return response_data def get_contact_list(api, external_id=False): log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) endpoint = '/contacts' uri = api['base_url']+endpoint # External ID for IDAA: UUID = '609ab766-7d79-4a9d-a72c-f126412659ee' # customField.609ab766-7d79-4a9d-a72c-f126412659ee eq external_id params = {} if external_id: params['filter'] = f"customField.609ab766-7d79-4a9d-a72c-f126412659ee eq '{external_id}'" elif external_id is None: params['filter'] = f"deleted eq 'False' and customField.609ab766-7d79-4a9d-a72c-f126412659ee lt '1'" else: params['filter'] = f"deleted eq 'False'" resp = requests.get(url=uri, headers=api['headers'], params=params) response_data = resp.json() # if external_id: filename = f'contact_{external_id}.txt' # elif external_id is None: filename = f'contact_list_no_external_id.txt' # else: filename = 'contact_list.txt' # f = open(filename, 'w') # f.write(pprint.pformat(response_data, indent=2, width=160)) return response_data # def get_contact_id(api, contact_id): def get_contact_id(contact_id): log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) # if 'access_token' in api: pass # else: get_access_token() get_access_token() endpoint = f'/contacts/{contact_id}' uri = api['base_url']+endpoint params = {} resp = requests.get(url=uri, headers=api['headers'], params=params) response_data = resp.json() log.debug(response_data) if 'message' in response_data and response_data['message'] == 'Too Many Requests': return False # filename = f'contact_{contact_id}.txt' # f = open(filename, 'w') # f.write(pprint.pformat(response_data, indent=2, width=160)) return response_data