from __future__ import annotations import datetime, pprint, random, requests from typing import Dict, List, Optional, Set, Union from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator from app.db_sql import redis_lookup_id_random, sql_insert, sql_select, sql_update from app.lib_general import log, logging headers = { 'Content-Type': 'application/json;charset=UTF-8' } app = {} app['user_email'] = 'oneskyit_integration@oneskyit.com' app['user_password'] = 'dAPpHDE6d5vLHjsk' app['name'] = 'IshltOneSkyITLIVE' app['key'] = '98yp4fa57mJX6nU4' app['id'] = 'IshltOneSkyITLIVE' app['password'] = '98yp4fa57mJX6nU4' print('App data', app) api = {} api['base_url'] = 'https://public.impexium.com/Api/v1' # or https://ishlt.mpxapi.com:443/api/v1 ?? api['headers'] = { 'Content-Type': 'application/json;charset=UTF-8' } def get_access_token(api, app): print('***** **** *** ** * ** *** **** *****') print('*** Get Web API End Point and Access Token ***') print('***** **** *** ** * ** *** **** *****') endpoint = '/WebApiUrl' uri = api['base_url']+endpoint data = { 'AppName': app['name'], 'AppKey': app['key'] } r = requests.post(url=uri, json=data, headers=api['headers']) print('**************************') print('Status Code:', r.status_code) print('Headers:', r.headers) # print('Encoding:', r.encoding) print('JSON:', r.json()) # print('Text:', r.text) print('**************************') response_data = r.json() api['access_token'] = response_data['accessToken'] api['headers']['AccessToken'] = response_data['accessToken'] api['auth_uri'] = response_data['uri'] return api def authenticate(api, app): print('***** **** *** ** * ** *** **** *****') print('*** Authenticate App, and get app token and user token (SSO Token) ***') print('***** **** *** ** * ** *** **** *****') data = { 'AppId': app['id'], 'AppPassword': app['password'], 'AppUserEmail': app['user_email'], 'AppUserPassword': app['user_password'] } r = requests.post(url=api['auth_uri'], json=data, headers=api['headers']) print('**************************') print('Status Code:', r.status_code) print('Headers:', r.headers) # print('Encoding:', r.encoding) print('JSON:', r.json()) # print('Text:', r.text) print('**************************') response_data = r.json() api['headers']['AppToken'] = response_data['appToken'] api['headers']['UserToken'] = response_data['userToken'] api['base_url'] = response_data['uri'] # app['user_id'] = response_data['userId'] # app['sso_token'] = response_data['ssoToken'] # app['app_configuration'] = response_data['appConfiguration'] # allowGoogleLogin=False, allowLinkedInLogin=False return api api = get_access_token(api=api, app=app) log.debug(api) api = authenticate(api=api, app=app) log.debug(api) def get_custom_fields(api, name=None, page=1): print('***** **** *** ** * ** *** **** *****') print('*** Get Custom Fields ***') print('***** **** *** ** * ** *** **** *****') endpoint = f'/Setup/customfields/{page}' uri = api['base_url']+endpoint print('Endpoint URI', uri) params = {} r = requests.get(url=uri, params=params, headers=api['headers']) print('**************************') print('Status Code:', r.status_code) # print('Headers:', r.headers) # print('Encoding:', r.encoding) print('JSON:', r.json()) # print('Text:', r.text) print('**************************') response_data = r.json() # pp = pprint.PrettyPrinter(indent=2) # pp.pprint(response_data) # print('**************************') custom_field_li = response_data return custom_field_li def get_events(page=1): print('***** **** *** ** * ** *** **** *****') print('*** Get Events ***') print('***** **** *** ** * ** *** **** *****') endpoint = f'/Events/All/{page}' uri = api['base_url']+endpoint print('Endpoint URI', uri) params = {} r = requests.get(url=uri, params=params, headers=api['headers']) print('**************************') print('Status Code:', r.status_code) # print('Headers:', r.headers) # print('Encoding:', r.encoding) print('JSON:', r.json()) # print('Text:', r.text) print('**************************') response_data = r.json() # pp = pprint.PrettyPrinter(indent=2) # pp.pprint(response_data) # print('**************************') event_li = response_data return event_li # ### BEGIN ### API Impexium Methods ### get_event_registrants() ### # Updated 2021-10-07 def get_event_registrants( event_code: str, page: int = 1, return_all: bool = False ): print('***** **** *** ** * ** *** **** *****') print('*** Get Event Registrants ***') print('***** **** *** ** * ** *** **** *****') endpoint = f'/Events/{event_code}/Registrations/{page}' uri = api['base_url']+endpoint params = {} r = requests.get(url=uri, params=params, headers=api['headers']) print('**************************') print('Status Code:', r.status_code) # print('Headers:', r.headers) # print('Encoding:', r.encoding) # print('JSON:', r.json()) # print('Text:', r.text) print('**************************') response_data = r.json() # pp = pprint.PrettyPrinter(indent=2) # pp.pprint(response_data) # print(response_data) # print('**************************') event_registrant_li = response_data if return_all: return event_registrant_li else: return event_registrant_li['dataList'] # ### END ### API Impexium Methods ### get_event_registrants() ### def get_individual_registrations(api, record_number, event_code=None, page=1): print('***** **** *** ** * ** *** **** *****') print('*** Get Individual Registrations ***') print('***** **** *** ** * ** *** **** *****') endpoint = f'/Individuals/{record_number}/Registrations/{page}' uri = api['base_url']+endpoint params = { 'eventCode': event_code } r = requests.get(url=uri, params=params, headers=api['headers']) print('**************************') print('Status Code:', r.status_code) # print('Headers:', r.headers) # print('Encoding:', r.encoding) # print('JSON:', r.json()) # print('Text:', r.text) print('**************************') response_data = r.json() # pp = pprint.PrettyPrinter(indent=2) # pp.pprint(response_data) # print('**************************') individual_registraion_li = response_data return individual_registraion_li def get_individual_purchases(api, record_number, product_code=None, purchased_since=None, product_category_code=None, page=1): print('***** **** *** ** * ** *** **** *****') print('*** Get Individual Purchases ***') print('***** **** *** ** * ** *** **** *****') endpoint = f'/Individuals/{record_number}/Purchases/{page}' uri = api['base_url']+endpoint params = { 'productCode': product_code, 'purchasedSince': purchased_since, 'productCategoryCode': product_category_code } r = requests.get(url=uri, params=params, headers=api['headers']) print('**************************') print('Status Code:', r.status_code) # print('Headers:', r.headers) # print('Encoding:', r.encoding) print('JSON:', r.json()) # print('Text:', r.text) print('**************************') response_data = r.json() # pp = pprint.PrettyPrinter(indent=2) # pp.pprint(response_data) # print('**************************') individual_purchase_li = response_data return individual_purchase_li def get_individual_custom_fields(api, individual_id): print('***** **** *** ** * ** *** **** *****') print('*** Get Individual Custom Fields ***') print('***** **** *** ** * ** *** **** *****') endpoint = f'/Individuals/{individual_id}/CustomFields' uri = api['base_url']+endpoint params = { } r = requests.get(url=uri, params=params, headers=api['headers']) print('**************************') print('Status Code:', r.status_code) # print('Headers:', r.headers) # print('Encoding:', r.encoding) # print('JSON:', r.json()) # print('Text:', r.text) print('**************************') response_data = r.json() # pp = pprint.PrettyPrinter(indent=2) # pp.pprint(response_data) # print('**************************') individual_custom_field_li = response_data return individual_custom_field_li def get_individual_orders_open(api, record_number, include_line_items=False, from_date=None, to_date=None, page=1): print('***** **** *** ** * ** *** **** *****') print('*** Get Individual Orders Open ***') print('***** **** *** ** * ** *** **** *****') endpoint = f'/Individuals/{record_number}/Orders/Open/{page}' uri = api['base_url']+endpoint print(uri) params = { 'includeLineItems': include_line_items, 'fromDate': from_date, 'toDate': to_date } r = requests.get(url=uri, params=params, headers=api['headers']) print('**************************') print('Status Code:', r.status_code) print('Headers:', r.headers) # print('Encoding:', r.encoding) print('JSON:', r.json()) print('Text:', r.text) print('**************************') response_data = r.json() # pp = pprint.PrettyPrinter(indent=2) # pp.pprint(response_data) # print('**************************') individual_orders_open_li = response_data return individual_orders_open_li