import datetime, json, pytz, secrets, time import pandas, xlrd # qrcode from fastapi import APIRouter, Body, Depends, Header, HTTPException, Query, Response, status from pydantic import BaseModel, EmailStr, Field from typing import Dict, List, Optional, Set, Union from app.lib_general import log, logging, secure_hash_string from app.config import settings from app.db_sql import sql_insert, sql_update, sql_insert_or_update, sql_select, sql_delete, redis_lookup_id_random from app.routers.api_crud import delete_obj_template, get_obj_template, get_obj_li_template, patch_obj_template, post_obj_template from app.methods.contact_methods import load_contact_obj, update_contact_obj from app.methods.person_methods import create_update_person_obj, get_person_rec_list, load_person_obj, update_person_obj from app.methods.user_methods import load_user_obj from app.models.contact_models import Contact_Base from app.models.person_models import Person_Base from app.models.user_models import User_Base, User_New_Base, User_Out_Base from app.models.response_models import Resp_Body_Base, mk_resp router = APIRouter() @router.post('/person_data', response_model=Resp_Body_Base) async def importing_person_data( response: Response = Response, ): log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) account_id = 19 full_file_path = 'admin/temp/import_person_data.xlsx' df = pandas.read_excel(full_file_path, na_filter=False, dtype={'external_id':str, 'phone_home':str, 'phone_mobile':str, 'city':str, 'state_province':str, 'address_postal_code':str, 'country':str}) #df = df.fillna('') # replace NaN with '' # df = df.fillna(None) # df = df.fillna('', inplace=True) #return str(df.info()) log.debug(df) df_dict = df.to_dict(orient='records') # log.debug(df_dict) # return mk_resp(data=False, status_code=500, response=response) person_data_li = [] # for i in df.index: for record in df_dict: person_new = None person_id = None contact_id = None address_id = None user_id = None person_data = {} person_data['external_id'] = record['external_id'] # person_data['informal_name'] = record['informal_name'] person_data['given_name'] = record['given_name'] if record['middle_name']: person_data['middle_name'] = record['middle_name'] else: person_data['middle_name'] = None if record['family_name']: person_data['family_name'] = record['family_name'] else: person_data['family_name'] = None # person_data['prefix'] = record['prefix'] person_data['suffix'] = record['suffix'] if person_data['given_name'] and person_data['middle_name'] and person_data['family_name']: person_data['full_name'] = person_data['given_name']+' '+person_data['middle_name']+' '+person_data['family_name'] elif person_data['given_name'] and person_data['family_name']: person_data['full_name'] = person_data['given_name']+' '+person_data['family_name'] elif person_data['given_name']: person_data['full_name'] = person_data['family_name'] elif record['informal_full_name']: person_data['full_name'] = record['informal_full_name'] elif record['informal_name']: person_data['full_name'] = record['informal_name'] else: person_data['full_name'] = None if record['informal_full_name']: person_data['informal_full_name'] = record['informal_full_name'] else: person_data['informal_full_name'] = None person_data['last_first_name'] = record['last_first_name'] person_data['created_on'] = record['created_on'] person_data['updated_on'] = record['updated_on'] other_data = {} other_data['contact_type'] = record['contact_type'] # ???? meta_data = {} meta_data['created_by_method'] = record['created_by_method'] meta_data['created_by_name'] = record['created_by_name'] meta_data['modified_by'] = record['modified_by'] meta_data['created_by_method'] = record['created_by_method'] person_data['other_json'] = json.dumps(other_data, indent=4) person_data['meta_json'] = json.dumps(meta_data, indent=4) # Look up by email address or external ID and INSERT or UPDATE new person record # INSERT or UPDATE a contact record and address record if needed # INSERT or UPDATE a user record if needed # Process the person data log.debug(person_data) # log.debug('*** *** *** *** END TEST RUN *** *** *** ***') # continue if person_rec_li_result := sql_select(table_name='v_person', field_name='external_id', field_value=person_data['external_id']): if not isinstance(person_rec_li_result, list): # Pull out IDs and UPDATE existing person record log.debug('Found one record') person_rec = person_rec_li_result person_id = person_rec.get('person_id', None) contact_id = person_rec.get('contact_id_new', None) # Using _new from view until old contact_id is removed from person table address_id = person_rec.get('address_id', None) user_id = person_rec.get('user_id', None) person_data['id'] = person_id if person_obj_up_result := sql_update(data=person_data, table_name='person'): log.debug(person_obj_up_result) else: log.warning(person_obj_up_result) continue # Something unexpected may have happened else: log.warning('Found more than one record') log.warning(person_rec_li_result) # Do nothing continue # Something unexpected may have happened person_rec_li = person_rec_li_result else: # INSERT new record log.debug('Found no records or something went wrong') person_data['account_id'] = account_id if person_obj_in_result := sql_insert(data=person_data, table_name='person'): log.debug(person_obj_in_result) person_id = person_obj_in_result # Should be an int person_new = True # Need to UPDATE this record after the contact, address, and user data is processed else: log.warning(person_obj_in_result) continue # Something unexpected may have happened # Process the contact data log.debug('Process the contact data') contact_data = {} contact_data['email'] = record['email'] if record['email_status'] != 'Undeliverable': contact_data['email_active'] = True else: contact_data['email_active'] = False contact_data['email_status'] = record['email_status'] if record['email_status']: contact_data['email_status'] = record['email_status'] else: contact_data['email_status'] = None if record['phone_mobile']: contact_data['phone_mobile'] = record['phone_mobile'] else: contact_data['phone_mobile'] = None if record['phone_home']: contact_data['phone_home'] = record['phone_home'] else: contact_data['phone_home'] = None if record['phone_fax']: contact_data['phone_fax'] = record['phone_fax'] elif record['phone_fax'] and record['phone_work_fax']: contact_data['phone_fax'] = record['phone_fax'] contact_data['phone_other'] = record['phone_work_fax'] if record['phone_work']: contact_data['phone_office'] = record['phone_work'] else: contact_data['phone_office'] = None log.debug(contact_data) if contact_id: # UPDATE existing contact record log.info('UPDATE existing contact record') contact_data['id'] = contact_id if contact_obj_up_result := sql_update(data=contact_data, table_name='contact'): log.debug(contact_obj_up_result) else: log.warning(contact_obj_up_result) continue # Something unexpected may have happened elif person_id: # INSERT new contact record and link to person record log.info('INSERT new contact record and link to person record') contact_data['account_id'] = account_id contact_data['for_type'] = 'person' contact_data['for_id'] = person_id if contact_obj_in_result := sql_insert(data=contact_data, table_name='contact'): log.debug(contact_obj_in_result) contact_id = contact_obj_in_result # Should be an int person_new = True # Need to UPDATE this record after the contact, address, and user data is processed else: log.debug(contact_obj_in_result) continue # Something unexpected may have happened # Process the contact address data log.debug('Process the contact address data') address_data = {} if record['address_line_1']: address_data['line_1'] = record['address_line_1'] else: address_data['line_1'] = None if record['address_line_2']: address_data['line_2'] = record['address_line_2'] else: address_data['line_2'] = None if record['address_line_3']: address_data['line_3'] = record['address_line_3'] else: address_data['line_3'] = None if record['address_city']: address_data['city'] = record['address_city'] else: address_data['city'] = None if record['address_state_province_code']: address_data['country_subdivision_code'] = record['address_country_code']+'-'+record['address_state_province_code'] else: address_data['country_subdivision_code'] = None if record['address_postal_code']: address_data['postal_code'] = record['address_postal_code'] else: address_data['postal_code'] = None if record['address_country_code']: address_data['country_alpha_2_code'] = record['address_country_code'] else: address_data['country_alpha_2_code'] = None log.debug(address_data) if address_id: # UPDATE existing address record log.info('UPDATE existing address record') address_data['id'] = address_id if address_obj_up_result := sql_update(data=address_data, table_name='address'): log.debug(address_obj_up_result) else: log.warning(address_obj_up_result) # continue # Something unexpected may have happened elif contact_id: # INSERT new address record and link to contact record log.info('INSERT new address record and link to contact record') address_data['account_id'] = account_id address_data['for_type'] = 'contact' address_data['for_id'] = contact_id if address_obj_in_result := sql_insert(data=address_data, table_name='address'): log.debug(address_obj_in_result) address_id = address_obj_in_result # Should be an int person_new = True # Need to UPDATE this record after the contact, address, and user data is processed else: log.debug(address_obj_in_result) # break continue # Something unexpected may have happened else: log.error('No address ID to update or contact ID to create an address linked to the contact.') return False # Process the user data log.debug('Process the user data') user_data = {} user_data['name'] = person_data['full_name'] user_data['username'] = record['email'] user_data['email'] = record['email'] user_data['email_verified'] = contact_data['email_active'] # Not perfect, but a good start user_data['enable'] = False user_data['enable_from'] = datetime.datetime.now(datetime.timezone.utc) user_data['enable_to'] = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=365) user_data['super'] = False user_data['manager'] = False user_data['administrator'] = False user_data['public'] = False user_data['verified'] = True user_data['notes'] = 'Created by importing list' log.debug(user_data) if user_id: # UPDATE existing user record log.info('UPDATE existing user record') user_data['id'] = user_id if user_obj_up_result := sql_update(data=user_data, table_name='user'): log.debug(user_obj_up_result) else: log.warning(user_obj_up_result) continue # Something unexpected may have happened elif person_id: # INSERT new user record and link to person record log.info('INSERT new user record and link to person record') user_data['account_id'] = account_id user_data['person_id'] = person_id random_password_string = secrets.token_urlsafe(8) user_data['password'] = secure_hash_string(string=random_password_string) other_data = {} other_data['temp_password'] = random_password_string user_data['other_json'] = json.dumps(other_data, indent=4) if user_obj_in_result := sql_insert(data=user_data, table_name='user'): log.debug(user_obj_in_result) user_id = user_obj_in_result # Should be an int person_new = True # Need to UPDATE this record after the contact, address, and user data is processed else: log.debug(user_obj_in_result) # break continue # Something unexpected may have happened else: log.error('No user ID to update or person ID to create a user linked to the person.') return False if person_new: log.debug('Updating person record one more time since this is a new person') person_data_up = {} person_data_up['id'] = person_id person_data_up['user_id'] = user_id random_password_string # Don't need to update with the new contact or address IDs that were just created. if person_obj_up_result := sql_update(data=person_data_up, table_name='person'): log.debug(person_obj_up_result) else: log.warning(person_obj_up_result) # break continue # Something unexpected may have happened person_data_li.append(person_data) log.debug(f"Record processed: {person_id} {person_data['full_name']}") # log.debug('*** *** *** *** END TEST RUN *** *** *** ***') # break return mk_resp(data=person_data_li) @router.post('/cont_edu_cert_person_data', response_model=Resp_Body_Base) async def importing_cont_edu_cert_person_data( response: Response = Response, ): log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) account_id = 19 cont_edu_cert_id = 3 full_file_path = 'admin/temp/import_cont_edu_cert_person_data.xlsx' df = pandas.read_excel(full_file_path, na_filter=False, dtype={'external_id':str, 'phone_home':str, 'phone_mobile':str, 'city':str, 'state_province':str, 'address_postal_code':str, 'country':str}) log.debug(df) df_dict = df.to_dict(orient='records') # log.debug(df_dict) # return mk_resp(data=False, status_code=500, response=response) cont_edu_cert_person_data_li = [] # for i in df.index: for record in df_dict: cont_edu_cert_person_new = None person_id = None user_id = None cont_edu_cert_person_id = None cont_edu_cert_person_data = {} cont_edu_cert_person_data['cont_edu_cert_id'] = cont_edu_cert_id cont_edu_cert_person_data['enable'] = True cont_edu_cert_person_data['email'] = record['email'] cont_edu_cert_person_data['given_name'] = record['given_name'] if record.get('middle_name', None): cont_edu_cert_person_data['middle_name'] = record['middle_name'] else: cont_edu_cert_person_data['middle_name'] = None if record['family_name']: cont_edu_cert_person_data['family_name'] = record['family_name'] else: cont_edu_cert_person_data['family_name'] = None if cont_edu_cert_person_data['given_name'] and cont_edu_cert_person_data['middle_name'] and cont_edu_cert_person_data['family_name']: cont_edu_cert_person_data['full_name'] = cont_edu_cert_person_data['given_name']+' '+cont_edu_cert_person_data['middle_name']+' '+cont_edu_cert_person_data['family_name'] elif cont_edu_cert_person_data['given_name'] and cont_edu_cert_person_data['family_name']: cont_edu_cert_person_data['full_name'] = cont_edu_cert_person_data['given_name']+' '+cont_edu_cert_person_data['family_name'] elif cont_edu_cert_person_data['given_name']: cont_edu_cert_person_data['full_name'] = cont_edu_cert_person_data['family_name'] elif record.get('informal_full_name', None): cont_edu_cert_person_data['full_name'] = record['informal_full_name'] elif record['informal_name']: cont_edu_cert_person_data['full_name'] = record['informal_name'] else: cont_edu_cert_person_data['full_name'] = None if record.get('informal_full_name', None): cont_edu_cert_person_data['informal_full_name'] = record['informal_full_name'] else: cont_edu_cert_person_data['informal_full_name'] = None cont_edu_cert_person_data['last_first_name'] = record.get('last_first_name', None) cont_edu_cert_person_data['display_name'] = record.get('display_name', None) # cont_edu_cert_person_data['created_on'] = record['created_on'] # cont_edu_cert_person_data['updated_on'] = record['updated_on'] other_data = {} other_data['other_guest_of'] = record['other_guest_of'] other_data['other_guest_li'] = record['other_guest_li'] cont_edu_cert_person_data['other_json'] = json.dumps(other_data, indent=4) # Look up by email address and INSERT or UPDATE new cont_edu_cert_person record # Process the cont_edu_cert_person data log.debug(cont_edu_cert_person_data) # log.debug('*** *** *** *** END TEST RUN *** *** *** ***') # continue if cont_edu_cert_person_rec_li_result := sql_select(table_name='v_cont_edu_cert_person', field_name='email', field_value=cont_edu_cert_person_data['email']): if not isinstance(cont_edu_cert_person_rec_li_result, list): # Pull out IDs and UPDATE existing cont_edu_cert_person record log.debug('Found one record') cont_edu_cert_person_rec = cont_edu_cert_person_rec_li_result # person_id = cont_edu_cert_person_rec.get('person_id', None) # user_id = cont_edu_cert_person_rec.get('user_id', None) cont_edu_cert_person_data['id'] = cont_edu_cert_person_id if cont_edu_cert_person_obj_up_result := sql_update(data=cont_edu_cert_person_data, table_name='cont_edu_cert_person'): log.debug(cont_edu_cert_person_obj_up_result) else: log.warning(cont_edu_cert_person_obj_up_result) continue # Something unexpected may have happened else: log.warning('Found more than one record') log.warning(cont_edu_cert_person_rec_li_result) # Do nothing continue # Something unexpected may have happened cont_edu_cert_person_rec_li = cont_edu_cert_person_rec_li_result else: # INSERT new record log.debug('Found no records or something went wrong') cont_edu_cert_person_data['account_id'] = account_id if cont_edu_cert_person_obj_in_result := sql_insert(data=cont_edu_cert_person_data, table_name='cont_edu_cert_person'): log.debug(cont_edu_cert_person_obj_in_result) cont_edu_cert_person_id = cont_edu_cert_person_obj_in_result # Should be an int cont_edu_cert_person_new = True # Need to UPDATE this record after the contact, address, and user data is processed else: log.warning(cont_edu_cert_person_obj_in_result) continue # Something unexpected may have happened # if cont_edu_cert_person_new: # log.debug('Updating person record one more time since this is a new person') # cont_edu_cert_person_data_up = {} # cont_edu_cert_person_data_up['id'] = person_id # cont_edu_cert_person_data_up['user_id'] = user_id # random_password_string # # Don't need to update with the new contact or address IDs that were just created. # if cont_edu_cert_person_obj_up_result := sql_update(data=cont_edu_cert_person_data_up, table_name='cont_edu_cert_person'): # log.debug(cont_edu_cert_person_obj_up_result) # else: # log.warning(cont_edu_cert_person_obj_up_result) # # break # continue # Something unexpected may have happened cont_edu_cert_person_data_li.append(cont_edu_cert_person_data) log.debug(f"Record processed: {cont_edu_cert_person_id} {cont_edu_cert_person_data['full_name']}") # log.debug('*** *** *** *** END TEST RUN *** *** *** ***') # break return mk_resp(data=cont_edu_cert_person_data_li) @router.post('/cont_edu_cert_person_data_touch', response_model=Resp_Body_Base) async def importing_cont_edu_cert_person_data_touch( response: Response = Response, ): log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) account_id = 19 cont_edu_cert_id = 3 full_file_path = 'admin/temp/import_cont_edu_cert_person_data.xlsx' df = pandas.read_excel(full_file_path, na_filter=False, dtype={'external_id':str, 'phone_home':str, 'phone_mobile':str, 'city':str, 'state_province':str, 'address_postal_code':str, 'country':str}) log.debug(df) df_dict = df.to_dict(orient='records') # log.debug(df_dict) # return mk_resp(data=False, status_code=500, response=response) cont_edu_cert_person_data_li = [] # for i in df.index: for record in df_dict: cont_edu_cert_person_new = None person_id = None user_id = None cont_edu_cert_person_id = None cont_edu_cert_person_data = {} # cont_edu_cert_person_data['cont_edu_cert_id'] = cont_edu_cert_id cont_edu_cert_person_data['enable'] = True cont_edu_cert_person_data['email'] = record['email'] other_data = {} other_data['last_event_date'] = '2021-08-01' other_data['other_guest_of'] = record['other_guest_of'] other_data['other_guest_li'] = record['other_guest_li'] cont_edu_cert_person_data['other_json'] = json.dumps(other_data, indent=4) # Look up by email address and INSERT or UPDATE new cont_edu_cert_person record # Process the cont_edu_cert_person data log.debug(cont_edu_cert_person_data) # log.debug('*** *** *** *** END TEST RUN *** *** *** ***') # continue if cont_edu_cert_person_rec_li_result := sql_select(table_name='v_cont_edu_cert_person', field_name='email', field_value=cont_edu_cert_person_data['email']): if not isinstance(cont_edu_cert_person_rec_li_result, list): # Pull out IDs and UPDATE existing cont_edu_cert_person record # log.debug('Found one record') cont_edu_cert_person_rec = cont_edu_cert_person_rec_li_result cont_edu_cert_person_id = cont_edu_cert_person_rec.get('cont_edu_cert_person_id', None) log.info(cont_edu_cert_person_id) # person_id = cont_edu_cert_person_rec.get('person_id', None) # user_id = cont_edu_cert_person_rec.get('user_id', None) cont_edu_cert_person_data['id'] = cont_edu_cert_person_id if cont_edu_cert_person_obj_up_result := sql_update(data=cont_edu_cert_person_data, table_name='cont_edu_cert_person'): # log.debug(cont_edu_cert_person_obj_up_result) pass else: log.warning(cont_edu_cert_person_obj_up_result) continue # Something unexpected may have happened else: log.warning('Found more than one record') log.warning(cont_edu_cert_person_rec_li_result) # Do nothing continue # Something unexpected may have happened cont_edu_cert_person_rec_li = cont_edu_cert_person_rec_li_result else: # INSERT new record log.debug('Found no records or something went wrong') cont_edu_cert_person_data['account_id'] = account_id if cont_edu_cert_person_obj_in_result := sql_insert(data=cont_edu_cert_person_data, table_name='cont_edu_cert_person'): log.debug(cont_edu_cert_person_obj_in_result) cont_edu_cert_person_id = cont_edu_cert_person_obj_in_result # Should be an int cont_edu_cert_person_new = True # Need to UPDATE this record after the contact, address, and user data is processed else: log.warning(cont_edu_cert_person_obj_in_result) continue # Something unexpected may have happened cont_edu_cert_person_data_li.append(cont_edu_cert_person_data) log.debug(f"Record processed: {cont_edu_cert_person_id}") # log.debug('*** *** *** *** END TEST RUN *** *** *** ***') # break log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL return mk_resp(data=cont_edu_cert_person_data_li)