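# Source listing header (from the repository web viewer):
#   File: OSIT-AE-API-FastAPI/app/routers/importing.py
#   Last modified: 2021-08-10 19:06:40 -04:00
#
#   Size: 466 lines, 22 KiB
#   Language: Python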

import datetime, json, pytz, secrets, time
import pandas, xlrd # qrcode
from fastapi import APIRouter, Body, Depends, Header, HTTPException, Query, Response, status
from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union
from app.lib_general import log, logging, secure_hash_string
from app.config import settings
from app.db_sql import sql_insert, sql_update, sql_insert_or_update, sql_select, sql_delete, redis_lookup_id_random
from app.routers.api_crud import delete_obj_template, get_obj_template, get_obj_li_template, patch_obj_template, post_obj_template
from app.methods.contact_methods import load_contact_obj, update_contact_obj
from app.methods.person_methods import create_update_person_obj, get_person_rec_list, load_person_obj, update_person_obj
from app.methods.user_methods import load_user_obj
from app.models.contact_models import Contact_Base
from app.models.person_models import Person_Base
from app.models.user_models import User_Base, User_New_Base, User_Out_Base
from app.models.response_models import Resp_Body_Base, mk_resp
# Router on which the import endpoints defined in this module are registered.
router = APIRouter()
def _person_full_name(given_name, middle_name, family_name, informal_full_name, informal_name):
    """Pick the best available full name for an imported person row.

    Prefers "given middle family", then "given family", then the given name
    on its own.  Without a given name it falls back to the informal full
    name, then the informal name, and finally None.
    """
    if given_name and middle_name and family_name:
        return given_name + ' ' + middle_name + ' ' + family_name
    if given_name and family_name:
        return given_name + ' ' + family_name
    if given_name:
        return given_name
    return informal_full_name or informal_name or None


def _person_fields_from_record(record):
    """Map one spreadsheet row to the field dict written to the `person` table."""
    person_data = {}
    person_data['external_id'] = record['external_id']
    person_data['given_name'] = record['given_name']
    # Blank cells are stored as None rather than ''.
    person_data['middle_name'] = record['middle_name'] or None
    person_data['family_name'] = record['family_name'] or None
    person_data['suffix'] = record['suffix']
    person_data['full_name'] = _person_full_name(
        given_name=person_data['given_name'],
        middle_name=person_data['middle_name'],
        family_name=person_data['family_name'],
        informal_full_name=record['informal_full_name'],
        # Only a last-resort fallback, so a missing column must not abort the row.
        informal_name=record.get('informal_name'),
    )
    person_data['informal_full_name'] = record['informal_full_name'] or None
    person_data['last_first_name'] = record['last_first_name']
    person_data['created_on'] = record['created_on']
    person_data['updated_on'] = record['updated_on']

    other_data = {'contact_type': record['contact_type']}
    meta_data = {
        'created_by_method': record['created_by_method'],
        'created_by_name': record['created_by_name'],
        'modified_by': record['modified_by'],
    }
    person_data['other_json'] = json.dumps(other_data, indent=4)
    person_data['meta_json'] = json.dumps(meta_data, indent=4)
    return person_data


def _contact_fields_from_record(record):
    """Map one spreadsheet row to the field dict written to the `contact` table."""
    contact_data = {}
    contact_data['email'] = record['email']
    contact_data['email_active'] = record['email_status'] != 'Undeliverable'
    contact_data['email_status'] = record['email_status'] or None
    contact_data['phone_mobile'] = record['phone_mobile'] or None
    contact_data['phone_home'] = record['phone_home'] or None
    if record['phone_fax']:
        contact_data['phone_fax'] = record['phone_fax']
        # A work fax is kept alongside the main fax number.
        # NOTE(review): confirm the contact table accepts `phone_other`.
        if record.get('phone_work_fax'):
            contact_data['phone_other'] = record['phone_work_fax']
    contact_data['phone_office'] = record['phone_work'] or None
    return contact_data


def _address_fields_from_record(record):
    """Map one spreadsheet row to the field dict written to the `address` table."""
    address_data = {}
    address_data['line_1'] = record['address_line_1'] or None
    address_data['line_2'] = record['address_line_2'] or None
    address_data['line_3'] = record['address_line_3'] or None
    address_data['city'] = record['address_city'] or None
    if record['address_state_province_code']:
        # Stored as "<country code>-<state/province code>".
        address_data['country_subdivision_code'] = (
            record['address_country_code'] + '-' + record['address_state_province_code']
        )
    else:
        address_data['country_subdivision_code'] = None
    address_data['postal_code'] = record['address_postal_code'] or None
    address_data['country_alpha_2_code'] = record['address_country_code'] or None
    return address_data


def _user_fields_from_record(record, full_name, email_active):
    """Build the field dict written to the `user` table for one spreadsheet row."""
    # One timestamp for both bounds so the enabled window is exactly 365 days.
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    return {
        'name': full_name,
        'username': record['email'],
        'email': record['email'],
        'email_verified': email_active,  # Not perfect, but a good start
        'enable': False,
        'enable_from': now_utc,
        'enable_to': now_utc + datetime.timedelta(days=365),
        'super': False,
        'manager': False,
        'administrator': False,
        'public': False,
        'verified': True,
        'notes': 'Created by importing list',
    }


@router.post('/person_data', response_model=Resp_Body_Base)
async def importing_person_data(
        response: Response = Response,
    ):
    """Import person rows from the spreadsheet at admin/temp/import_person_data.xlsx.

    Each row is looked up in `v_person` by `external_id`.  A single match is
    updated, no match is inserted, and several matches cause the row to be
    skipped.  The row's contact, address and user records are then updated
    when their IDs came back from the lookup, or inserted and linked when
    they did not.  A row whose insert/update fails is skipped (except a
    failed address update, which is only logged).

    Returns:
        `mk_resp(data=...)` with the list of person field dicts that were
        processed completely, or False when a row has neither an ID to
        update nor a parent ID to link a new address/user record to.
    """
    log.setLevel(logging.INFO)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    account_id = 19
    full_file_path = 'admin/temp/import_person_data.xlsx'

    # na_filter=False keeps empty cells as '' instead of NaN.
    # NOTE(review): the dtype keys 'city', 'state_province' and 'country' do not
    # match the address_* column names read from each row - confirm.
    df = pandas.read_excel(full_file_path, na_filter=False, dtype={'external_id':str, 'phone_home':str, 'phone_mobile':str, 'city':str, 'state_province':str, 'address_postal_code':str, 'country':str})
    log.debug(df)
    df_dict = df.to_dict(orient='records')

    person_data_li = []
    for record in df_dict:
        person_new = None
        person_id = None
        contact_id = None
        address_id = None
        user_id = None

        # Process the person data
        person_data = _person_fields_from_record(record)
        log.debug(person_data)

        if person_rec_li_result := sql_select(table_name='v_person', field_name='external_id', field_value=person_data['external_id']):
            if isinstance(person_rec_li_result, list):
                # Ambiguous match: do nothing for this row.
                log.warning('Found more than one record')
                log.warning(person_rec_li_result)
                continue
            # Pull out IDs and UPDATE existing person record
            log.debug('Found one record')
            person_rec = person_rec_li_result
            person_id = person_rec.get('person_id', None)
            contact_id = person_rec.get('contact_id_new', None)  # Using _new from view until old contact_id is removed from person table
            address_id = person_rec.get('address_id', None)
            user_id = person_rec.get('user_id', None)
            person_data['id'] = person_id
            if person_obj_up_result := sql_update(data=person_data, table_name='person'):
                log.debug(person_obj_up_result)
            else:
                log.warning(person_obj_up_result)
                continue  # Something unexpected may have happened
        else:
            # INSERT new record
            log.debug('Found no records or something went wrong')
            person_data['account_id'] = account_id
            if person_obj_in_result := sql_insert(data=person_data, table_name='person'):
                log.debug(person_obj_in_result)
                person_id = person_obj_in_result  # Should be an int
                person_new = True  # Need to UPDATE this record after the contact, address, and user data is processed
            else:
                log.warning(person_obj_in_result)
                continue  # Something unexpected may have happened

        # Process the contact data
        log.debug('Process the contact data')
        contact_data = _contact_fields_from_record(record)
        log.debug(contact_data)
        if contact_id:
            log.info('UPDATE existing contact record')
            contact_data['id'] = contact_id
            if contact_obj_up_result := sql_update(data=contact_data, table_name='contact'):
                log.debug(contact_obj_up_result)
            else:
                log.warning(contact_obj_up_result)
                continue  # Something unexpected may have happened
        elif person_id:
            log.info('INSERT new contact record and link to person record')
            contact_data['account_id'] = account_id
            contact_data['for_type'] = 'person'
            contact_data['for_id'] = person_id
            if contact_obj_in_result := sql_insert(data=contact_data, table_name='contact'):
                log.debug(contact_obj_in_result)
                contact_id = contact_obj_in_result  # Should be an int
                person_new = True  # Need to UPDATE this record after the contact, address, and user data is processed
            else:
                # Warning (not debug) so the failure is visible at INFO level.
                log.warning(contact_obj_in_result)
                continue  # Something unexpected may have happened

        # Process the contact address data
        log.debug('Process the contact address data')
        address_data = _address_fields_from_record(record)
        log.debug(address_data)
        if address_id:
            log.info('UPDATE existing address record')
            address_data['id'] = address_id
            if address_obj_up_result := sql_update(data=address_data, table_name='address'):
                log.debug(address_obj_up_result)
            else:
                # A failed address update is logged but does not skip the row.
                log.warning(address_obj_up_result)
        elif contact_id:
            log.info('INSERT new address record and link to contact record')
            address_data['account_id'] = account_id
            address_data['for_type'] = 'contact'
            address_data['for_id'] = contact_id
            if address_obj_in_result := sql_insert(data=address_data, table_name='address'):
                log.debug(address_obj_in_result)
                address_id = address_obj_in_result  # Should be an int
                person_new = True  # Need to UPDATE this record after the contact, address, and user data is processed
            else:
                log.warning(address_obj_in_result)
                continue  # Something unexpected may have happened
        else:
            log.error('No address ID to update or contact ID to create an address linked to the contact.')
            # NOTE(review): a bare False does not look like a Resp_Body_Base
            # payload - confirm how the response_model handles it.
            return False

        # Process the user data
        log.debug('Process the user data')
        user_data = _user_fields_from_record(
            record,
            full_name=person_data['full_name'],
            email_active=contact_data['email_active'],
        )
        log.debug(user_data)
        if user_id:
            log.info('UPDATE existing user record')
            user_data['id'] = user_id
            if user_obj_up_result := sql_update(data=user_data, table_name='user'):
                log.debug(user_obj_up_result)
            else:
                log.warning(user_obj_up_result)
                continue  # Something unexpected may have happened
        elif person_id:
            log.info('INSERT new user record and link to person record')
            user_data['account_id'] = account_id
            user_data['person_id'] = person_id
            random_password_string = secrets.token_urlsafe(8)
            user_data['password'] = secure_hash_string(string=random_password_string)
            # NOTE(review): the plaintext temporary password is persisted in
            # other_json next to its hash - confirm this is intended.
            user_other_data = {'temp_password': random_password_string}
            user_data['other_json'] = json.dumps(user_other_data, indent=4)
            if user_obj_in_result := sql_insert(data=user_data, table_name='user'):
                log.debug(user_obj_in_result)
                user_id = user_obj_in_result  # Should be an int
                person_new = True  # Need to UPDATE this record after the contact, address, and user data is processed
            else:
                log.warning(user_obj_in_result)
                continue  # Something unexpected may have happened
        else:
            log.error('No user ID to update or person ID to create a user linked to the person.')
            return False

        if person_new:
            log.debug('Updating person record one more time since this is a new person')
            # Don't need to update with the new contact or address IDs that were just created.
            person_data_up = {'id': person_id, 'user_id': user_id}
            if person_obj_up_result := sql_update(data=person_data_up, table_name='person'):
                log.debug(person_obj_up_result)
            else:
                log.warning(person_obj_up_result)
                continue  # Something unexpected may have happened

        person_data_li.append(person_data)
        log.debug(f"Record processed: {person_id} {person_data['full_name']}")

    return mk_resp(data=person_data_li)
def _cert_person_full_name(given_name, middle_name, family_name, informal_full_name, informal_name):
    """Pick the best available full name for an imported certificate-person row.

    Prefers "given middle family", then "given family", then the given name
    on its own.  Without a given name it falls back to the informal full
    name, then the informal name, and finally None.
    """
    if given_name and middle_name and family_name:
        return given_name + ' ' + middle_name + ' ' + family_name
    if given_name and family_name:
        return given_name + ' ' + family_name
    if given_name:
        return given_name
    return informal_full_name or informal_name or None


@router.post('/cont_edu_cert_person_data', response_model=Resp_Body_Base)
async def importing_cont_edu_cert_person_data(
        response: Response = Response,
    ):
    """Import rows from admin/temp/import_cont_edu_cert_person_data.xlsx.

    Each row is looked up in `v_cont_edu_cert_person` by email.  A single
    match is updated in `cont_edu_cert_person`, no match is inserted, and
    several matches cause the row to be skipped.  Rows whose insert/update
    fails are skipped as well.

    Returns:
        `mk_resp(data=...)` with the list of field dicts that were written.
    """
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    account_id = 19
    cont_edu_cert_id = 3
    full_file_path = 'admin/temp/import_cont_edu_cert_person_data.xlsx'

    # na_filter=False keeps empty cells as '' instead of NaN.
    df = pandas.read_excel(full_file_path, na_filter=False, dtype={'external_id':str, 'phone_home':str, 'phone_mobile':str, 'city':str, 'state_province':str, 'address_postal_code':str, 'country':str})
    log.debug(df)
    df_dict = df.to_dict(orient='records')

    cont_edu_cert_person_data_li = []
    for record in df_dict:
        cont_edu_cert_person_id = None

        cont_edu_cert_person_data = {}
        cont_edu_cert_person_data['cont_edu_cert_id'] = cont_edu_cert_id
        cont_edu_cert_person_data['enable'] = True
        cont_edu_cert_person_data['email'] = record['email']
        cont_edu_cert_person_data['given_name'] = record['given_name']
        # Blank or missing optional cells are stored as None rather than ''.
        cont_edu_cert_person_data['middle_name'] = record.get('middle_name', None) or None
        cont_edu_cert_person_data['family_name'] = record['family_name'] or None
        cont_edu_cert_person_data['full_name'] = _cert_person_full_name(
            given_name=cont_edu_cert_person_data['given_name'],
            middle_name=cont_edu_cert_person_data['middle_name'],
            family_name=cont_edu_cert_person_data['family_name'],
            informal_full_name=record.get('informal_full_name', None),
            # Optional like the other informal columns, so use .get here too.
            informal_name=record.get('informal_name', None),
        )
        cont_edu_cert_person_data['informal_full_name'] = record.get('informal_full_name', None) or None
        cont_edu_cert_person_data['last_first_name'] = record.get('last_first_name', None)
        cont_edu_cert_person_data['display_name'] = record.get('display_name', None)

        other_data = {
            'other_guest_of': record['other_guest_of'],
            'other_guest_li': record['other_guest_li'],
        }
        cont_edu_cert_person_data['other_json'] = json.dumps(other_data, indent=4)

        # Look up by email address and INSERT or UPDATE the cont_edu_cert_person record
        log.debug(cont_edu_cert_person_data)
        if cont_edu_cert_person_rec_li_result := sql_select(table_name='v_cont_edu_cert_person', field_name='email', field_value=cont_edu_cert_person_data['email']):
            if isinstance(cont_edu_cert_person_rec_li_result, list):
                # Ambiguous match: do nothing for this row.
                log.warning('Found more than one record')
                log.warning(cont_edu_cert_person_rec_li_result)
                continue

            # Pull out the ID and UPDATE the existing cont_edu_cert_person record
            log.debug('Found one record')
            cont_edu_cert_person_rec = cont_edu_cert_person_rec_li_result
            # NOTE(review): assumes the view exposes the row ID as
            # 'cont_edu_cert_person_id' (or plain 'id') - confirm against the view.
            cont_edu_cert_person_id = (
                cont_edu_cert_person_rec.get('cont_edu_cert_person_id', None)
                or cont_edu_cert_person_rec.get('id', None)
            )
            if not cont_edu_cert_person_id:
                # Without an ID the UPDATE cannot target the matched row.
                log.warning('Found one record but could not determine its ID')
                log.warning(cont_edu_cert_person_rec)
                continue
            cont_edu_cert_person_data['id'] = cont_edu_cert_person_id
            if cont_edu_cert_person_obj_up_result := sql_update(data=cont_edu_cert_person_data, table_name='cont_edu_cert_person'):
                log.debug(cont_edu_cert_person_obj_up_result)
            else:
                log.warning(cont_edu_cert_person_obj_up_result)
                continue  # Something unexpected may have happened
        else:
            # INSERT new record
            log.debug('Found no records or something went wrong')
            cont_edu_cert_person_data['account_id'] = account_id
            if cont_edu_cert_person_obj_in_result := sql_insert(data=cont_edu_cert_person_data, table_name='cont_edu_cert_person'):
                log.debug(cont_edu_cert_person_obj_in_result)
                cont_edu_cert_person_id = cont_edu_cert_person_obj_in_result  # Should be an int
            else:
                log.warning(cont_edu_cert_person_obj_in_result)
                continue  # Something unexpected may have happened

        cont_edu_cert_person_data_li.append(cont_edu_cert_person_data)
        log.debug(f"Record processed: {cont_edu_cert_person_id} {cont_edu_cert_person_data['full_name']}")

    return mk_resp(data=cont_edu_cert_person_data_li)