320 lines
9.3 KiB
Python
320 lines
9.3 KiB
Python
from __future__ import annotations
|
|
import datetime, pprint, random, requests
|
|
|
|
from typing import Dict, List, Optional, Set, Union
|
|
from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator
|
|
|
|
from app.db_sql import redis_lookup_id_random, sql_insert, sql_select, sql_update
|
|
from app.lib_general import log, logging
|
|
|
|
# Default JSON request headers (a separate dict from api['headers']).
headers = {'Content-Type': 'application/json;charset=UTF-8'}

# SECURITY NOTE(review): these credentials are hard-coded in source control.
# They should be rotated and loaded from the environment or a secret store.
app = {
    # Integration user that the app authenticates as.
    'user_email': 'oneskyit_integration@oneskyit.com',
    'user_password': 'dAPpHDE6d5vLHjsk',
    # Sent as AppName / AppKey when requesting the access token.
    'name': 'IshltOneSkyITLIVE',
    'key': '98yp4fa57mJX6nU4',
    # Sent as AppId / AppPassword when authenticating.
    'id': 'IshltOneSkyITLIVE',
    'password': '98yp4fa57mJX6nU4',
}

# Never echo secret values to stdout: mask them and show the rest as-is.
print(
    'App data',
    {
        field: ('***' if field in ('user_password', 'key', 'password') else value)
        for field, value in app.items()
    },
)
|
|
|
|
# Mutable connection state shared by the API helpers in this module.
# 'base_url' is only the bootstrap endpoint: authenticate() overwrites it
# with the URI returned by the service.
api = {
    'base_url': 'https://public.impexium.com/Api/v1',  # or https://ishlt.mpxapi.com:443/api/v1 ??
    # Own dict (not shared with the module-level `headers`) because the
    # auth helpers add token entries to it in place.
    'headers': {'Content-Type': 'application/json;charset=UTF-8'},
}
|
|
|
|
|
|
def get_access_token(api, app, timeout=30):
    """Fetch the Web API endpoint and access token and store them on ``api``.

    POSTs ``AppName`` / ``AppKey`` (taken from ``app['name']`` and
    ``app['key']``) as JSON to ``api['base_url'] + '/WebApiUrl'``.

    Args:
        api: Dict with ``base_url`` and ``headers`` (itself a dict). It is
            mutated in place: ``access_token``, ``auth_uri`` and
            ``headers['AccessToken']`` are set from the response.
        app: Dict with ``name`` and ``key``.
        timeout: Seconds handed to ``requests.post``. Without a timeout the
            call can block indefinitely on an unresponsive server.

    Returns:
        The same ``api`` dict that was passed in.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.RequestException: On connection failure or timeout.
        KeyError: If the response JSON lacks ``accessToken`` or ``uri``.
    """
    print('***** **** *** ** * ** *** **** *****')
    print('*** Get Web API End Point and Access Token ***')
    print('***** **** *** ** * ** *** **** *****')

    uri = api['base_url'] + '/WebApiUrl'
    data = {'AppName': app['name'], 'AppKey': app['key']}

    r = requests.post(url=uri, json=data, headers=api['headers'], timeout=timeout)

    # Only the status is printed: the body carries the access token and
    # must not be echoed to stdout.
    print('**************************')
    print('Status Code:', r.status_code)
    print('**************************')

    # Surface the HTTP failure itself rather than a KeyError further down.
    r.raise_for_status()
    response_data = r.json()

    access_token = response_data['accessToken']
    api['access_token'] = access_token
    api['headers']['AccessToken'] = access_token
    api['auth_uri'] = response_data['uri']

    return api
|
|
|
|
|
|
def authenticate(api, app, timeout=30):
    """Authenticate the app and its user, storing the tokens on ``api``.

    POSTs ``AppId``, ``AppPassword``, ``AppUserEmail`` and
    ``AppUserPassword`` (from ``app['id']``, ``app['password']``,
    ``app['user_email']`` and ``app['user_password']``) as JSON to
    ``api['auth_uri']``.

    Args:
        api: Dict with ``auth_uri`` and ``headers`` (itself a dict). It is
            mutated in place: ``headers['AppToken']`` and
            ``headers['UserToken']`` are set, and ``base_url`` is replaced
            by the ``uri`` returned in the response.
        app: Dict with ``id``, ``password``, ``user_email`` and
            ``user_password``.
        timeout: Seconds handed to ``requests.post``. Without a timeout the
            call can block indefinitely on an unresponsive server.

    Returns:
        The same ``api`` dict that was passed in.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.RequestException: On connection failure or timeout.
        KeyError: If the response JSON lacks ``appToken``, ``userToken`` or
            ``uri``.
    """
    print('***** **** *** ** * ** *** **** *****')
    print('*** Authenticate App, and get app token and user token (SSO Token) ***')
    print('***** **** *** ** * ** *** **** *****')

    data = {
        'AppId': app['id'],
        'AppPassword': app['password'],
        'AppUserEmail': app['user_email'],
        'AppUserPassword': app['user_password'],
    }

    r = requests.post(url=api['auth_uri'], json=data, headers=api['headers'], timeout=timeout)

    # Only the status is printed: the body carries the app and user tokens
    # and must not be echoed to stdout.
    print('**************************')
    print('Status Code:', r.status_code)
    print('**************************')

    # Surface the HTTP failure itself rather than a KeyError further down.
    r.raise_for_status()
    response_data = r.json()

    api['headers']['AppToken'] = response_data['appToken']
    api['headers']['UserToken'] = response_data['userToken']
    api['base_url'] = response_data['uri']

    # NOTE(review): earlier disabled code also read 'userId', 'ssoToken' and
    # 'appConfiguration' from the response; they are intentionally not
    # stored here. Confirm against the API before relying on them.

    return api
|
|
|
|
|
|
# Resolve the service endpoint and authenticate once at import time. The
# helpers below that take no ``api`` argument read this module-level dict.
api = get_access_token(api=api, app=app)
# Log only the endpoint, never the whole dict: it now holds the access token.
log.debug({'auth_uri': api['auth_uri']})

api = authenticate(api=api, app=app)
# Same rule here: the headers now also carry the app and user tokens.
log.debug({'base_url': api['base_url']})
|
|
|
|
|
|
def get_custom_fields(api, name=None, page=1, timeout=30):
    """Fetch one page of custom-field definitions.

    Issues ``GET api['base_url'] + '/Setup/customfields/{page}'`` with
    ``api['headers']``.

    Args:
        api: Dict providing ``base_url`` and ``headers``.
        name: Currently unused; kept so existing callers keep working.
        page: Page number interpolated into the endpoint path.
        timeout: Seconds handed to ``requests.get``. Without a timeout the
            call can block indefinitely on an unresponsive server.

    Returns:
        The decoded JSON body, unmodified. The HTTP status is printed but
        not checked, so an error payload is returned as-is.

    Raises:
        requests.RequestException: On connection failure or timeout.
        ValueError: If the response body is not valid JSON.
    """
    print('***** **** *** ** * ** *** **** *****')
    print('*** Get Custom Fields ***')
    print('***** **** *** ** * ** *** **** *****')

    uri = api['base_url'] + f'/Setup/customfields/{page}'
    print('Endpoint URI', uri)

    r = requests.get(url=uri, params={}, headers=api['headers'], timeout=timeout)

    print('**************************')
    print('Status Code:', r.status_code)
    # Decode once and reuse the result for both the debug print and the return.
    custom_field_li = r.json()
    print('JSON:', custom_field_li)
    print('**************************')

    return custom_field_li
|
|
|
|
|
|
def get_events(page=1, timeout=30):
    """Fetch one page of events.

    Issues ``GET api['base_url'] + '/Events/All/{page}'``. Unlike most
    helpers in this module it takes no ``api`` argument and reads the
    module-level ``api`` dict (``base_url`` and ``headers``).

    Args:
        page: Page number interpolated into the endpoint path.
        timeout: Seconds handed to ``requests.get``. Without a timeout the
            call can block indefinitely on an unresponsive server.

    Returns:
        The decoded JSON body, unmodified. The HTTP status is printed but
        not checked, so an error payload is returned as-is.

    Raises:
        requests.RequestException: On connection failure or timeout.
        ValueError: If the response body is not valid JSON.
    """
    print('***** **** *** ** * ** *** **** *****')
    print('*** Get Events ***')
    print('***** **** *** ** * ** *** **** *****')

    uri = api['base_url'] + f'/Events/All/{page}'
    print('Endpoint URI', uri)

    r = requests.get(url=uri, params={}, headers=api['headers'], timeout=timeout)

    print('**************************')
    print('Status Code:', r.status_code)
    # Decode once and reuse the result for both the debug print and the return.
    event_li = r.json()
    print('JSON:', event_li)
    print('**************************')

    return event_li
|
|
|
|
|
|
def get_event_registrants(
    event_code: str,
    page: int = 1,
    return_all: bool = False,
    timeout: float = 30,
):
    """Fetch one page of registrations for an event.

    Issues ``GET api['base_url'] + '/Events/{event_code}/Registrations/{page}'``.
    It takes no ``api`` argument and reads the module-level ``api`` dict
    (``base_url`` and ``headers``).

    Args:
        event_code: Event code interpolated into the endpoint path.
        page: Page number interpolated into the endpoint path.
        return_all: When true, return the whole decoded response; otherwise
            return only its ``dataList`` entry.
        timeout: Seconds handed to ``requests.get``. Without a timeout the
            call can block indefinitely on an unresponsive server.

    Returns:
        The decoded JSON body, or ``body['dataList']`` when ``return_all``
        is false. The HTTP status is printed but not checked.

    Raises:
        requests.RequestException: On connection failure or timeout.
        ValueError: If the response body is not valid JSON.
        KeyError: If ``return_all`` is false and the body has no
            ``dataList`` key.
    """
    print('***** **** *** ** * ** *** **** *****')
    print('*** Get Event Registrants ***')
    print('***** **** *** ** * ** *** **** *****')

    uri = api['base_url'] + f'/Events/{event_code}/Registrations/{page}'

    r = requests.get(url=uri, params={}, headers=api['headers'], timeout=timeout)

    print('**************************')
    print('Status Code:', r.status_code)
    print('**************************')

    event_registrant_li = r.json()

    if return_all:
        return event_registrant_li
    return event_registrant_li['dataList']
|
|
|
|
|
|
def get_individual_registrations(api, record_number, event_code=None, page=1, timeout=30):
    """Fetch one page of an individual's event registrations.

    Issues ``GET api['base_url'] + '/Individuals/{record_number}/Registrations/{page}'``
    with ``eventCode`` as a query parameter.

    Args:
        api: Dict providing ``base_url`` and ``headers``.
        record_number: Individual's record number, interpolated into the path.
        event_code: Value for the ``eventCode`` query parameter. Requests
            omits parameters whose value is ``None``, so the default sends
            no filter.
        page: Page number interpolated into the endpoint path.
        timeout: Seconds handed to ``requests.get``. Without a timeout the
            call can block indefinitely on an unresponsive server.

    Returns:
        The decoded JSON body, unmodified. The HTTP status is printed but
        not checked, so an error payload is returned as-is.

    Raises:
        requests.RequestException: On connection failure or timeout.
        ValueError: If the response body is not valid JSON.
    """
    print('***** **** *** ** * ** *** **** *****')
    print('*** Get Individual Registrations ***')
    print('***** **** *** ** * ** *** **** *****')

    uri = api['base_url'] + f'/Individuals/{record_number}/Registrations/{page}'
    params = {'eventCode': event_code}

    r = requests.get(url=uri, params=params, headers=api['headers'], timeout=timeout)

    print('**************************')
    print('Status Code:', r.status_code)
    print('**************************')

    individual_registration_li = r.json()

    return individual_registration_li
|
|
|
|
|
|
def get_individual_purchases(api, record_number, product_code=None, purchased_since=None, product_category_code=None, page=1, timeout=30):
    """Fetch one page of an individual's purchases.

    Issues ``GET api['base_url'] + '/Individuals/{record_number}/Purchases/{page}'``
    with ``productCode``, ``purchasedSince`` and ``productCategoryCode`` as
    query parameters.

    Args:
        api: Dict providing ``base_url`` and ``headers``.
        record_number: Individual's record number, interpolated into the path.
        product_code: Value for the ``productCode`` query parameter.
        purchased_since: Value for the ``purchasedSince`` query parameter.
        product_category_code: Value for the ``productCategoryCode`` query
            parameter.
        page: Page number interpolated into the endpoint path.
        timeout: Seconds handed to ``requests.get``. Without a timeout the
            call can block indefinitely on an unresponsive server.

    Requests omits query parameters whose value is ``None``, so the filter
    defaults send nothing.

    Returns:
        The decoded JSON body, unmodified. The HTTP status is printed but
        not checked, so an error payload is returned as-is.

    Raises:
        requests.RequestException: On connection failure or timeout.
        ValueError: If the response body is not valid JSON.
    """
    print('***** **** *** ** * ** *** **** *****')
    print('*** Get Individual Purchases ***')
    print('***** **** *** ** * ** *** **** *****')

    uri = api['base_url'] + f'/Individuals/{record_number}/Purchases/{page}'
    params = {
        'productCode': product_code,
        'purchasedSince': purchased_since,
        'productCategoryCode': product_category_code,
    }

    r = requests.get(url=uri, params=params, headers=api['headers'], timeout=timeout)

    print('**************************')
    print('Status Code:', r.status_code)
    # Decode once and reuse the result for both the debug print and the return.
    individual_purchase_li = r.json()
    print('JSON:', individual_purchase_li)
    print('**************************')

    return individual_purchase_li
|
|
|
|
|
|
def get_individual_custom_fields(api, individual_id, timeout=30):
    """Fetch the custom fields recorded for one individual.

    Issues ``GET api['base_url'] + '/Individuals/{individual_id}/CustomFields'``
    with ``api['headers']``.

    Args:
        api: Dict providing ``base_url`` and ``headers``.
        individual_id: Identifier interpolated into the endpoint path.
        timeout: Seconds handed to ``requests.get``. Without a timeout the
            call can block indefinitely on an unresponsive server.

    Returns:
        The decoded JSON body, unmodified. The HTTP status is printed but
        not checked, so an error payload is returned as-is.

    Raises:
        requests.RequestException: On connection failure or timeout.
        ValueError: If the response body is not valid JSON.
    """
    print('***** **** *** ** * ** *** **** *****')
    print('*** Get Individual Custom Fields ***')
    print('***** **** *** ** * ** *** **** *****')

    uri = api['base_url'] + f'/Individuals/{individual_id}/CustomFields'

    r = requests.get(url=uri, params={}, headers=api['headers'], timeout=timeout)

    print('**************************')
    print('Status Code:', r.status_code)
    print('**************************')

    individual_custom_field_li = r.json()

    return individual_custom_field_li
|
|
|
|
|
|
def get_individual_orders_open(api, record_number, include_line_items=False, from_date=None, to_date=None, page=1, timeout=30):
    """Fetch one page of an individual's open orders.

    Issues ``GET api['base_url'] + '/Individuals/{record_number}/Orders/Open/{page}'``
    with ``includeLineItems``, ``fromDate`` and ``toDate`` as query
    parameters.

    Args:
        api: Dict providing ``base_url`` and ``headers``.
        record_number: Individual's record number, interpolated into the path.
        include_line_items: Value for the ``includeLineItems`` query
            parameter.
        from_date: Value for the ``fromDate`` query parameter.
        to_date: Value for the ``toDate`` query parameter.
        page: Page number interpolated into the endpoint path.
        timeout: Seconds handed to ``requests.get``. Without a timeout the
            call can block indefinitely on an unresponsive server.

    Requests omits query parameters whose value is ``None``, so the date
    defaults send nothing.

    Returns:
        The decoded JSON body, unmodified. The HTTP status is printed but
        not checked, so an error payload is returned as-is.

    Raises:
        requests.RequestException: On connection failure or timeout.
        ValueError: If the response body is not valid JSON.
    """
    print('***** **** *** ** * ** *** **** *****')
    print('*** Get Individual Orders Open ***')
    print('***** **** *** ** * ** *** **** *****')

    uri = api['base_url'] + f'/Individuals/{record_number}/Orders/Open/{page}'
    print(uri)

    # NOTE(review): a Python bool is sent as the text "False"/"True";
    # confirm the API accepts that casing for includeLineItems.
    params = {
        'includeLineItems': include_line_items,
        'fromDate': from_date,
        'toDate': to_date,
    }

    r = requests.get(url=uri, params=params, headers=api['headers'], timeout=timeout)

    # Response headers and raw text are no longer dumped; this matches the
    # sibling helpers, where those debug prints are disabled.
    print('**************************')
    print('Status Code:', r.status_code)
    # Decode once and reuse the result for both the debug print and the return.
    individual_orders_open_li = r.json()
    print('JSON:', individual_orders_open_li)
    print('**************************')

    return individual_orders_open_li
|