Files
OSIT-AE-API-FastAPI/app/methods/e_impexium_methods.py
2021-10-07 23:14:10 -04:00

323 lines
9.4 KiB
Python

from __future__ import annotations
import datetime, pprint, random, requests
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator
from app.db_sql import redis_lookup_id_random, sql_insert, sql_select, sql_update
from app.lib_general import log, logging
headers = { 'Content-Type': 'application/json;charset=UTF-8' }
app = {}
app['user_email'] = 'oneskyit_integration@oneskyit.com'
app['user_password'] = 'dAPpHDE6d5vLHjsk'
app['name'] = 'IshltOneSkyITLIVE'
app['key'] = '98yp4fa57mJX6nU4'
app['id'] = 'IshltOneSkyITLIVE'
app['password'] = '98yp4fa57mJX6nU4'
print('App data', app)
api = {}
api['base_url'] = 'https://public.impexium.com/Api/v1' # or https://ishlt.mpxapi.com:443/api/v1 ??
api['headers'] = { 'Content-Type': 'application/json;charset=UTF-8' }
def get_access_token(api, app):
print('***** **** *** ** * ** *** **** *****')
print('*** Get Web API End Point and Access Token ***')
print('***** **** *** ** * ** *** **** *****')
endpoint = '/WebApiUrl'
uri = api['base_url']+endpoint
data = { 'AppName': app['name'], 'AppKey': app['key'] }
r = requests.post(url=uri, json=data, headers=api['headers'])
print('**************************')
print('Status Code:', r.status_code)
print('Headers:', r.headers)
# print('Encoding:', r.encoding)
print('JSON:', r.json())
# print('Text:', r.text)
print('**************************')
response_data = r.json()
api['access_token'] = response_data['accessToken']
api['headers']['AccessToken'] = response_data['accessToken']
api['auth_uri'] = response_data['uri']
return api
def authenticate(api, app):
print('***** **** *** ** * ** *** **** *****')
print('*** Authenticate App, and get app token and user token (SSO Token) ***')
print('***** **** *** ** * ** *** **** *****')
data = { 'AppId': app['id'], 'AppPassword': app['password'], 'AppUserEmail': app['user_email'], 'AppUserPassword': app['user_password'] }
r = requests.post(url=api['auth_uri'], json=data, headers=api['headers'])
print('**************************')
print('Status Code:', r.status_code)
print('Headers:', r.headers)
# print('Encoding:', r.encoding)
print('JSON:', r.json())
# print('Text:', r.text)
print('**************************')
response_data = r.json()
api['headers']['AppToken'] = response_data['appToken']
api['headers']['UserToken'] = response_data['userToken']
api['base_url'] = response_data['uri']
# app['user_id'] = response_data['userId']
# app['sso_token'] = response_data['ssoToken']
# app['app_configuration'] = response_data['appConfiguration'] # allowGoogleLogin=False, allowLinkedInLogin=False
return api
api = get_access_token(api=api, app=app)
log.debug(api)
api = authenticate(api=api, app=app)
log.debug(api)
def get_custom_fields(api, name=None, page=1):
print('***** **** *** ** * ** *** **** *****')
print('*** Get Custom Fields ***')
print('***** **** *** ** * ** *** **** *****')
endpoint = f'/Setup/customfields/{page}'
uri = api['base_url']+endpoint
print('Endpoint URI', uri)
params = {}
r = requests.get(url=uri, params=params, headers=api['headers'])
print('**************************')
print('Status Code:', r.status_code)
# print('Headers:', r.headers)
# print('Encoding:', r.encoding)
print('JSON:', r.json())
# print('Text:', r.text)
print('**************************')
response_data = r.json()
# pp = pprint.PrettyPrinter(indent=2)
# pp.pprint(response_data)
# print('**************************')
custom_field_li = response_data
return custom_field_li
def get_events(page=1):
print('***** **** *** ** * ** *** **** *****')
print('*** Get Events ***')
print('***** **** *** ** * ** *** **** *****')
endpoint = f'/Events/All/{page}'
uri = api['base_url']+endpoint
print('Endpoint URI', uri)
params = {}
r = requests.get(url=uri, params=params, headers=api['headers'])
print('**************************')
print('Status Code:', r.status_code)
# print('Headers:', r.headers)
# print('Encoding:', r.encoding)
print('JSON:', r.json())
# print('Text:', r.text)
print('**************************')
response_data = r.json()
# pp = pprint.PrettyPrinter(indent=2)
# pp.pprint(response_data)
# print('**************************')
event_li = response_data
return event_li
# ### BEGIN ### API Impexium Methods ### get_event_registrants() ###
# Updated 2021-10-07
def get_event_registrants(
event_code: str,
page: int = 1,
return_all: bool = False
):
print('***** **** *** ** * ** *** **** *****')
print('*** Get Event Registrants ***')
print('***** **** *** ** * ** *** **** *****')
endpoint = f'/Events/{event_code}/Registrations/{page}'
uri = api['base_url']+endpoint
params = {}
r = requests.get(url=uri, params=params, headers=api['headers'])
print('**************************')
print('Status Code:', r.status_code)
# print('Headers:', r.headers)
# print('Encoding:', r.encoding)
# print('JSON:', r.json())
# print('Text:', r.text)
print('**************************')
response_data = r.json()
# pp = pprint.PrettyPrinter(indent=2)
# pp.pprint(response_data)
# print(response_data)
# print('**************************')
event_registrant_li = response_data
if return_all:
return event_registrant_li
else:
return event_registrant_li['dataList']
# ### END ### API Impexium Methods ### get_event_registrants() ###
def get_individual_registrations(api, record_number, event_code=None, page=1):
print('***** **** *** ** * ** *** **** *****')
print('*** Get Individual Registrations ***')
print('***** **** *** ** * ** *** **** *****')
endpoint = f'/Individuals/{record_number}/Registrations/{page}'
uri = api['base_url']+endpoint
params = { 'eventCode': event_code }
r = requests.get(url=uri, params=params, headers=api['headers'])
print('**************************')
print('Status Code:', r.status_code)
# print('Headers:', r.headers)
# print('Encoding:', r.encoding)
# print('JSON:', r.json())
# print('Text:', r.text)
print('**************************')
response_data = r.json()
# pp = pprint.PrettyPrinter(indent=2)
# pp.pprint(response_data)
# print('**************************')
individual_registraion_li = response_data
return individual_registraion_li
def get_individual_purchases(api, record_number, product_code=None, purchased_since=None, product_category_code=None, page=1):
print('***** **** *** ** * ** *** **** *****')
print('*** Get Individual Purchases ***')
print('***** **** *** ** * ** *** **** *****')
endpoint = f'/Individuals/{record_number}/Purchases/{page}'
uri = api['base_url']+endpoint
params = { 'productCode': product_code, 'purchasedSince': purchased_since, 'productCategoryCode': product_category_code }
r = requests.get(url=uri, params=params, headers=api['headers'])
print('**************************')
print('Status Code:', r.status_code)
# print('Headers:', r.headers)
# print('Encoding:', r.encoding)
print('JSON:', r.json())
# print('Text:', r.text)
print('**************************')
response_data = r.json()
# pp = pprint.PrettyPrinter(indent=2)
# pp.pprint(response_data)
# print('**************************')
individual_purchase_li = response_data
return individual_purchase_li
def get_individual_custom_fields(api, individual_id):
print('***** **** *** ** * ** *** **** *****')
print('*** Get Individual Custom Fields ***')
print('***** **** *** ** * ** *** **** *****')
endpoint = f'/Individuals/{individual_id}/CustomFields'
uri = api['base_url']+endpoint
params = { }
r = requests.get(url=uri, params=params, headers=api['headers'])
print('**************************')
print('Status Code:', r.status_code)
# print('Headers:', r.headers)
# print('Encoding:', r.encoding)
# print('JSON:', r.json())
# print('Text:', r.text)
print('**************************')
response_data = r.json()
# pp = pprint.PrettyPrinter(indent=2)
# pp.pprint(response_data)
# print('**************************')
individual_custom_field_li = response_data
return individual_custom_field_li
def get_individual_orders_open(api, record_number, include_line_items=False, from_date=None, to_date=None, page=1):
print('***** **** *** ** * ** *** **** *****')
print('*** Get Individual Orders Open ***')
print('***** **** *** ** * ** *** **** *****')
endpoint = f'/Individuals/{record_number}/Orders/Open/{page}'
uri = api['base_url']+endpoint
print(uri)
params = { 'includeLineItems': include_line_items, 'fromDate': from_date, 'toDate': to_date }
r = requests.get(url=uri, params=params, headers=api['headers'])
print('**************************')
print('Status Code:', r.status_code)
print('Headers:', r.headers)
# print('Encoding:', r.encoding)
print('JSON:', r.json())
print('Text:', r.text)
print('**************************')
response_data = r.json()
# pp = pprint.PrettyPrinter(indent=2)
# pp.pprint(response_data)
# print('**************************')
individual_orders_open_li = response_data
return individual_orders_open_li