1567 lines
72 KiB
Python
1567 lines
72 KiB
Python
from __future__ import annotations
|
|
import datetime, pytz
|
|
|
|
from typing import Dict, List, Optional, Set, Union
|
|
from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator
|
|
|
|
from app.db_sql import redis_lookup_id_random, sql_insert, sql_insert_or_update, sql_select, sql_update
|
|
from app.lib_general import log, logging, logger_reset, send_email
|
|
|
|
# from app.methods.address_methods import load_address_obj
|
|
from app.methods.contact_methods import create_contact_obj, create_update_contact_obj, create_update_contact_obj_v4, load_contact_obj, update_contact_obj
|
|
from app.methods.order_cart_methods import get_order_cart_id_for_person_id, load_order_cart_obj
|
|
from app.methods.order_methods import get_order_rec_list, load_order_obj
|
|
from app.methods.organization_methods import create_update_organization_obj, load_organization_obj, update_organization_obj
|
|
# from app.methods.user_methods import create_user_obj # , load_user_obj, update_user_obj
|
|
|
|
from app.models.common_field_schema import default_num_bytes
|
|
from app.models.contact_models import Contact_Base
|
|
from app.models.person_models import Person_Base
|
|
|
|
|
|
# ### BEGIN ### API Person Methods ### load_person_obj() ###
|
|
# Updated 2021-12-15
|
|
def load_person_obj(
|
|
person_id: int|str,
|
|
auth_key: str = None,
|
|
limit: int = 1000,
|
|
by_alias: bool = True,
|
|
exclude_unset: bool = True,
|
|
model_as_dict: bool = False,
|
|
enabled: str = 'enabled', # enabled, disabled, all
|
|
inc_address: bool = False, # Under contact
|
|
inc_contact: bool = False,
|
|
inc_event_list: bool = False,
|
|
inc_journal_list: bool = False,
|
|
inc_journal_entry_list: bool = False,
|
|
inc_membership_cfg: bool = False,
|
|
inc_membership_group: bool = False,
|
|
inc_membership_person_group: bool = False,
|
|
inc_membership_group_list: bool = False,
|
|
inc_membership_group_person_list: bool = False,
|
|
inc_membership_person: bool = False, # NOTE: Same as inc_membership_person_list
|
|
inc_membership_person_list: bool = False, # NOTE: Same as inc_membership_person
|
|
inc_membership_person_profile: bool = False,
|
|
inc_membership_person_profile_cust: bool = False,
|
|
inc_membership_type: bool = False,
|
|
inc_membership_type_person: bool = False,
|
|
inc_membership_type_list: bool = False,
|
|
inc_membership_type_person_list: bool = False,
|
|
inc_order_cfg: bool = False,
|
|
inc_order_closed_count: bool = False,
|
|
inc_order_line_list: bool = False,
|
|
inc_order_list: bool = False,
|
|
inc_order_cart: bool = False,
|
|
inc_organization: bool = False,
|
|
inc_post_list: bool = False,
|
|
inc_post_comment_list: bool = False,
|
|
inc_product: bool = False,
|
|
inc_user: bool = False,
|
|
inc_user_role_list: bool = False,
|
|
) -> Person_Base|dict|bool:
|
|
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
if person_id := redis_lookup_id_random(record_id_random=person_id, table_name='person'): pass
|
|
else: return None
|
|
|
|
if auth_key:
|
|
sql = f"""
|
|
SELECT *
|
|
FROM `v_person` AS person
|
|
WHERE person.id = :person_id
|
|
AND person.allow_auth_key = 1
|
|
AND person.auth_key = :auth_key
|
|
LIMIT 1
|
|
;
|
|
"""
|
|
log.debug(sql)
|
|
|
|
data = {}
|
|
data['person_id'] = person_id
|
|
data['auth_key'] = auth_key
|
|
log.debug(data)
|
|
|
|
if person_rec := sql_select(sql=sql, data=data):
|
|
# Only wipe the key if the last update to person record was more than X minutes
|
|
updated_on = person_rec.get('updated_on')
|
|
updated_on_string = updated_on.isoformat()
|
|
log.debug(updated_on_string)
|
|
|
|
eastern = pytz.timezone('US/Eastern')
|
|
|
|
updated_on_localized = eastern.localize(updated_on)
|
|
log.debug(updated_on_localized.isoformat())
|
|
|
|
# updated_on_utc = person_rec.get('updated_on').replace(tzinfo=datetime.timezone.utc)
|
|
# updated_on_tz = person_rec.get('updated_on').replace(tzinfo=eastern)
|
|
# updated_on_tz_string = updated_on_tz.isoformat()
|
|
# log.debug(updated_on_tz_string)
|
|
|
|
current_datetime_utc = datetime.datetime.utcnow()
|
|
current_datetime_utc_string = current_datetime_utc.isoformat()
|
|
log.debug(current_datetime_utc_string)
|
|
|
|
# datetime_difference = current_datetime_utc - updated_on_localized
|
|
# total_seconds = datetime_difference.total_seconds()
|
|
# log.debug(total_seconds)
|
|
|
|
current_datetime_utc_localize = pytz.utc.localize(current_datetime_utc)
|
|
current_datetime_utc_localize_string = current_datetime_utc_localize.isoformat()
|
|
|
|
# test_datetime_utc = datetime.datetime.utcnow()- datetime.timedelta(seconds=120)
|
|
datetime_difference = current_datetime_utc_localize - updated_on_localized
|
|
total_seconds = datetime_difference.total_seconds()
|
|
log.debug(total_seconds)
|
|
|
|
if total_seconds > 7200: # 7200 seconds = 2 hours
|
|
log.warning('The authorization key has expired')
|
|
|
|
update_person_data = {}
|
|
update_person_data['id'] = person_id
|
|
update_person_data['auth_key'] = None # secrets.token_urlsafe(default_num_bytes)
|
|
|
|
if person_rec_update_result := sql_update(table_name='person', data=update_person_data):
|
|
log.info('The person record was updated with a new auth_key')
|
|
else:
|
|
log.warning('The authorization key is still valid')
|
|
|
|
else: return person_rec # None or False
|
|
else:
|
|
if person_rec := sql_select(table_name='v_person', record_id=person_id): pass
|
|
else: return person_rec # None or False
|
|
|
|
log.debug(person_rec)
|
|
|
|
try:
|
|
person_obj = Person_Base(**person_rec)
|
|
# log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(person_obj)
|
|
except ValidationError as e:
|
|
log.error(e.json())
|
|
return False
|
|
|
|
# Updated 2021-12-14
|
|
if inc_contact:
|
|
log.info('Need to include contact data...')
|
|
contact_id = person_rec.get('contact_id', None)
|
|
log.debug(contact_id)
|
|
if contact_result := load_contact_obj(
|
|
contact_id = contact_id,
|
|
limit = limit,
|
|
by_alias = by_alias,
|
|
exclude_unset = exclude_unset,
|
|
model_as_dict = model_as_dict,
|
|
enabled = enabled,
|
|
inc_address = inc_address,
|
|
):
|
|
person_obj.contact = contact_result
|
|
else: person_obj.contact = {} # None
|
|
|
|
# Updated 2021-07-09
|
|
# if inc_membership_group_list:
|
|
# from app.methods.membership_group_methods import get_membership_group_rec_list, load_membership_group_obj
|
|
# if membership_group_rec_list_result := get_membership_group_rec_list(
|
|
# for_obj_type = 'person',
|
|
# for_obj_id = person_id,
|
|
# limit = limit,
|
|
# enabled = enabled,
|
|
# ):
|
|
# membership_group_result_list = []
|
|
# for membership_group_rec in membership_group_rec_list_result:
|
|
# if membership_group_result := load_membership_group_obj(
|
|
# membership_group_id = membership_group_rec.get('membership_group_id', None),
|
|
# limit = limit,
|
|
# by_alias = by_alias,
|
|
# exclude_unset = exclude_unset,
|
|
# model_as_dict = model_as_dict,
|
|
# enabled = enabled,
|
|
# inc_membership_group_profile = inc_membership_group_profile,
|
|
# inc_membership_group_profile_cust = inc_membership_group_profile_cust,
|
|
# inc_membership_type = inc_membership_type,
|
|
# inc_product = inc_product,
|
|
# ):
|
|
# membership_group_result_list.append(membership_group_result)
|
|
# else:
|
|
# membership_group_result_list.append(None)
|
|
# person_obj.membership_group_list = membership_group_result_list
|
|
# else: person_obj.membership_group_list = []
|
|
|
|
# Updated 2021-12-14
|
|
if inc_membership_person:
|
|
log.info('Need to include membership person data...')
|
|
from app.methods.membership_person_methods import load_membership_person_obj
|
|
membership_person_id = person_rec.get('membership_person_id', None)
|
|
log.debug(membership_person_id)
|
|
if membership_person_result := load_membership_person_obj(
|
|
membership_person_id = membership_person_id,
|
|
limit = limit,
|
|
by_alias = by_alias,
|
|
exclude_unset = exclude_unset,
|
|
model_as_dict = model_as_dict,
|
|
enabled = enabled,
|
|
inc_address = inc_address,
|
|
inc_contact = inc_contact,
|
|
inc_membership_cfg = inc_membership_cfg,
|
|
inc_membership_group = inc_membership_group, # The primary membership group, if there is one.
|
|
inc_membership_group_list = inc_membership_group_list, # All membership groups they are a part of.
|
|
inc_membership_person_profile = inc_membership_person_profile,
|
|
inc_membership_person_profile_cust = inc_membership_person_profile_cust,
|
|
inc_membership_type = inc_membership_type, # The primary membership type, if there is one.
|
|
inc_membership_type_list = inc_membership_type_list, # All the membership types they are a part of.
|
|
# inc_person = inc_person,
|
|
inc_product = inc_product,
|
|
# inc_product_list = inc_product_list,
|
|
# inc_user = inc_user,
|
|
):
|
|
person_obj.membership_person = membership_person_result
|
|
else: person_obj.membership_person = {} # None
|
|
|
|
# Updated 2021-07-09
|
|
# if inc_membership_type or inc_membership_type_list: # Technically should this be inc_membership_type_list???
|
|
# from app.methods.membership_type_methods import get_membership_type_rec_list, load_membership_type_obj
|
|
# if membership_type_rec_list_result := get_membership_type_rec_list(
|
|
# for_obj_type = 'person',
|
|
# for_obj_id = person_id,
|
|
# limit = limit,
|
|
# enabled = enabled,
|
|
# ):
|
|
# membership_type_result_list = []
|
|
# for membership_type_rec in membership_type_rec_list_result:
|
|
# if membership_type_result := load_membership_type_obj(
|
|
# membership_type_id = membership_type_rec.get('membership_type_id', None),
|
|
# limit = limit,
|
|
# by_alias = by_alias,
|
|
# exclude_unset = exclude_unset,
|
|
# model_as_dict = model_as_dict,
|
|
# enabled = enabled,
|
|
# inc_membership_type_profile = inc_membership_type_profile,
|
|
# inc_membership_type_profile_cust = inc_membership_type_profile_cust,
|
|
# inc_membership_type = inc_membership_type,
|
|
# inc_product = inc_product,
|
|
# ):
|
|
# membership_type_result_list.append(membership_type_result)
|
|
# else:
|
|
# membership_type_result_list.append(None)
|
|
# type_obj.membership_type_list = membership_type_result_list
|
|
# else: type_obj.membership_type_list = []
|
|
|
|
# Updated 2021-12-14
|
|
if inc_order_cart:
|
|
log.info('Need to include order cart...')
|
|
if order_cart_id := get_order_cart_id_for_person_id(
|
|
person_id = person_id,
|
|
):
|
|
if order_cart_result := load_order_cart_obj(
|
|
order_cart_id = order_cart_id,
|
|
inc_order_cart_line_list = True,
|
|
):
|
|
person_obj.order_cart = order_cart_result
|
|
else: person_obj.order_cart = {} # None
|
|
|
|
# Updated 2021-11-16
|
|
if inc_order_closed_count:
|
|
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.info('Need to include order closed count...')
|
|
log.debug(person_obj)
|
|
if order_rec_list_result := get_order_rec_list(
|
|
for_obj_type = 'person',
|
|
for_obj_id = person_id,
|
|
limit = limit,
|
|
enabled = enabled,
|
|
status = 'closed'
|
|
):
|
|
person_obj.orders_info = {}
|
|
person_obj.orders_info['closed_count'] = len(order_rec_list_result)
|
|
else:
|
|
person_obj.orders_info = {}
|
|
person_obj.orders_info['closed_count'] = None
|
|
|
|
# Updated 2021-06-18
|
|
if inc_order_list:
|
|
log.info('Need to include order list data...')
|
|
if order_rec_list_result := get_order_rec_list(
|
|
for_obj_type = 'person',
|
|
for_obj_id = person_id,
|
|
limit = limit,
|
|
enabled = enabled,
|
|
):
|
|
order_result_list = []
|
|
for order_rec in order_rec_list_result:
|
|
order_result_list.append(
|
|
load_order_obj(
|
|
order_id = order_rec.get('order_id', None),
|
|
limit = limit,
|
|
by_alias = by_alias,
|
|
exclude_unset = exclude_unset,
|
|
model_as_dict = model_as_dict,
|
|
enabled = enabled,
|
|
inc_order_cfg = inc_order_cfg,
|
|
inc_order_line_list = inc_order_line_list,
|
|
)
|
|
)
|
|
person_obj.order_list = order_result_list
|
|
else: person_obj.order_list = []
|
|
|
|
# Updated 2021-12-14
|
|
if inc_organization:
|
|
log.info('Need to include organization data...')
|
|
organization_id = person_rec.get('organization_id', None)
|
|
log.debug(organization_id)
|
|
if organization_dict := load_organization_obj(
|
|
organization_id = organization_id,
|
|
limit = limit,
|
|
by_alias = by_alias,
|
|
exclude_unset = exclude_unset,
|
|
model_as_dict = model_as_dict,
|
|
enabled = enabled,
|
|
inc_address = inc_address,
|
|
inc_contact = inc_contact,
|
|
):
|
|
person_obj.organization = organization_dict
|
|
else: person_obj.organization = {} # None
|
|
|
|
# Updated 2021-12-14
|
|
if inc_user:
|
|
log.info('Need to include user data...')
|
|
user_id = person_rec.get('user_id', None)
|
|
log.debug(user_id)
|
|
from app.methods.user_methods import load_user_obj
|
|
if user_result := load_user_obj(
|
|
user_id = user_id,
|
|
limit = limit,
|
|
by_alias = by_alias,
|
|
exclude_unset = exclude_unset,
|
|
model_as_dict = model_as_dict,
|
|
enabled = enabled,
|
|
inc_user_role_list = inc_user_role_list,
|
|
):
|
|
person_obj.user = user_result
|
|
else: person_obj.user = {} # None
|
|
|
|
if model_as_dict:
|
|
return person_obj.dict(by_alias=by_alias, exclude_unset=exclude_unset) # pylint: disable=no-member
|
|
else:
|
|
return person_obj
|
|
# ### END ### API Person Methods ### load_person_obj() ###
|
|
|
|
|
|
# ### BEGIN ### API Person Methods ### get_person_rec_list() ###
|
|
@logger_reset
|
|
def get_person_rec_list(
|
|
for_obj_type: str,
|
|
for_obj_id: str,
|
|
email: str,
|
|
limit: int = 1000,
|
|
enabled: str = 'enabled', # enabled, disabled, all
|
|
) -> list|bool:
|
|
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
if for_obj_id := redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type): pass
|
|
else: return False
|
|
|
|
data = {}
|
|
data[f'{for_obj_type}_id'] = for_obj_id
|
|
data['email'] = email
|
|
sql_obj_type_id = f'`person`.{for_obj_type}_id = :{for_obj_type}_id'
|
|
|
|
if email:
|
|
sql_where_email = 'AND (user.email = :email OR contact.email = :email)'
|
|
|
|
# if enabled in ['enabled', 'disabled', 'all']:
|
|
# if enabled == 'enabled':
|
|
# data['enable'] = True
|
|
# sql_enabled = f'AND `person`.enable = :enable'
|
|
# elif enabled == 'disabled':
|
|
# data['enable'] = False
|
|
# sql_enabled = f'AND `person`.enable = :enable'
|
|
# elif enabled == 'all':
|
|
# sql_enabled = ''
|
|
sql_enabled = ''
|
|
|
|
if limit:
|
|
data['limit'] = limit
|
|
sql_limit = f'LIMIT :limit'
|
|
else:
|
|
sql_limit = ''
|
|
|
|
if not email:
|
|
sql = f"""
|
|
SELECT `person`.id AS 'person_id', `person`.id_random AS 'person_id_random'
|
|
FROM `person` AS `person`
|
|
WHERE
|
|
{sql_obj_type_id}
|
|
{sql_enabled}
|
|
ORDER BY `person`.created_on DESC, `person`.updated_on DESC
|
|
{sql_limit};
|
|
"""
|
|
else:
|
|
sql = f"""
|
|
SELECT `person`.id AS 'person_id', `person`.id_random AS 'person_id_random'
|
|
FROM `person` AS `person`
|
|
LEFT JOIN `user` ON person.user_id = user.id
|
|
LEFT JOIN `contact` AS `contact` ON person.id = contact.for_id AND contact.for_type = 'person'
|
|
WHERE
|
|
{sql_obj_type_id}
|
|
{sql_where_email}
|
|
{sql_enabled}
|
|
ORDER BY `person`.priority DESC, `person`.sort ASC,`person`.given_name ASC, `person`.family_name ASC, `person`.created_on DESC, `person`.updated_on DESC
|
|
{sql_limit};
|
|
"""
|
|
log.debug(sql)
|
|
|
|
if person_rec_li_result := sql_select(data=data, sql=sql, as_list=True):
|
|
log.info('Got a list result')
|
|
person_rec_li = person_rec_li_result
|
|
else: # [] or False
|
|
log.info('No results or something went wrong')
|
|
person_rec_li = person_rec_li_result
|
|
|
|
log.debug(person_rec_li_result)
|
|
|
|
return person_rec_li
|
|
# ### END ### API Person Methods ### get_person_rec_list() ###
|
|
|
|
|
|
# ### BEGIN ### API Person Methods ### create_update_person_obj_v4b() ###
|
|
def create_update_person_obj_v4b(
|
|
account_id: int|str,
|
|
person_dict_obj: Person_Base,
|
|
person_id: int|str|None = None,
|
|
process_contact: bool = True, # For future v5
|
|
process_organization: bool = True, # For future v5
|
|
process_user: bool = True, # For future v5
|
|
create_sub_obj: bool = True, # For future v5
|
|
fail_any: bool = False, # Fail if any thing goes wrong for sub objects
|
|
return_outline: bool = False, # For future v5
|
|
) -> bool:
|
|
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
log.info('Checking requirements...')
|
|
if account_id := redis_lookup_id_random(record_id_random=account_id, table_name='account'): log.info(f'Account ID: {account_id}')
|
|
else:
|
|
log.error('Missing or invalid Account ID passed. Failed requirement.')
|
|
log.error(f'Account ID: {account_id}')
|
|
return False
|
|
|
|
if person_id:
|
|
log.info(f'Person ID passed. Update existing Person below. Person ID: {person_id}')
|
|
|
|
if person_id := redis_lookup_id_random(record_id_random=person_id, table_name='person'): pass
|
|
else:
|
|
log.error('Person ID passed but is invalid. Failed requirement.')
|
|
return False
|
|
# person_obj.id = person_id
|
|
else:
|
|
log.info('No Person ID passed. Create new Person below. Required: Account ID')
|
|
|
|
log.debug(type(person_dict_obj))
|
|
log.info('Create dictionary or Pydantic object variables...')
|
|
if isinstance(person_dict_obj, dict):
|
|
person_dict = person_dict_obj
|
|
person_dict['account_id'] = account_id
|
|
if person_id:
|
|
person_dict['person_id'] = person_id
|
|
try:
|
|
person_obj = Person_Base(**person_dict)
|
|
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(person_obj)
|
|
except ValidationError as e:
|
|
log.error(e.json())
|
|
return False
|
|
else:
|
|
person_obj = person_dict_obj
|
|
person_obj.account_id = account_id
|
|
if person_id:
|
|
# NOTE: Can't update the ID alias if it was never set.
|
|
person_obj.id = person_id
|
|
|
|
person_dict = person_obj.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'contact', 'contact_id', 'contact_id_random', 'email', 'cc_email', 'membership_person_id', 'membership_person_id_random', 'organization', 'user', 'created_on', 'updated_on'})
|
|
|
|
log.info(f'SQL INSERT or UPDATE record... Person ID: {person_id}')
|
|
if person_id:
|
|
if person_dict_up_result := sql_update(data=person_dict, table_name='person', rm_id_random=True): pass
|
|
else:
|
|
log.warning(f'Person not updated. Person ID: {person_id}')
|
|
log.debug(person_dict_up_result)
|
|
return False
|
|
log.debug(person_dict_up_result)
|
|
else:
|
|
if person_dict_in_result := sql_insert(data=person_dict, table_name='person', rm_id_random=True, id_random_length=default_num_bytes): pass
|
|
else:
|
|
log.warning(f'Person not created.')
|
|
log.debug(person_dict_in_result)
|
|
return False
|
|
log.debug(person_dict_in_result)
|
|
|
|
person_id = person_dict_in_result
|
|
|
|
person_outline = {}
|
|
person_outline['person_id'] = person_id
|
|
person_outline['contact_id'] = None
|
|
person_outline['contact'] = {}
|
|
person_outline['contact']['address_id'] = None
|
|
person_outline['organization_id'] = None
|
|
person_outline['user_id'] = None
|
|
|
|
# Updated 2021-09-08
|
|
log.info(f'Check if processing Contact data...')
|
|
# NOTE: Use object model version because of better type checking and validations
|
|
if process_contact and person_obj.contact:
|
|
person_outline['contact_id'] = None
|
|
contact_obj = person_obj.contact
|
|
if contact_id := person_obj.contact_id : pass
|
|
elif contact_id := contact_obj.id: pass
|
|
else: contact_id = None
|
|
contact_obj.id
|
|
contact_obj.for_type = 'person'
|
|
contact_obj.for_id = person_id
|
|
create_update_contact_obj_result = create_update_contact_obj_v4(
|
|
account_id = account_id,
|
|
contact_dict_obj = contact_obj,
|
|
contact_id = contact_id,
|
|
for_type = 'person',
|
|
for_id = person_id,
|
|
# process_address = process_address,
|
|
# create_sub_obj = create_sub_obj,
|
|
fail_any = fail_any,
|
|
return_outline = return_outline,
|
|
)
|
|
if isinstance(create_update_contact_obj_result, int):
|
|
contact_id = create_update_contact_obj_result
|
|
elif create_update_contact_obj_result == True: pass
|
|
else:
|
|
log.warning(f'Create or Update failed while trying create_update_contact_obj_v4(): {create_update_contact_obj_result}')
|
|
contact_id = None
|
|
|
|
person_outline['contact_id'] = contact_id
|
|
|
|
# Updated 2021-09-08
|
|
log.info(f'Check if processing Organization data...')
|
|
if process_organization and person_obj.organization:
|
|
organization_obj = person_obj.organization
|
|
organization_id = person_obj.organization_id_random
|
|
update_person_obj = False
|
|
organization_result = create_update_organization_obj(
|
|
organization_id = organization_id,
|
|
organization_obj = organization_obj,
|
|
process_contact = True, # Setting to True under the assumption that if there is organization information then there is probably a contact (and address).
|
|
)
|
|
log.debug(organization_result)
|
|
if isinstance(organization_result, bool) and organization_result is True:
|
|
pass # Do not need to update person object.
|
|
elif isinstance(organization_result, bool) and organization_result is False:
|
|
pass # Do not need to update person object.
|
|
elif isinstance(organization_result, int):
|
|
person_obj.organization_id = organization_result
|
|
organization_id = organization_result
|
|
else:
|
|
log.warning('Something may have gone wrong while trying to create or update a organization.')
|
|
|
|
log.info(f'Check if need to update the Person with the Organization ID... Update Person Obj: {update_person_obj} Organization ID: {organization_id}')
|
|
if update_person_obj and isinstance(organization_id, int):
|
|
log.info(f'Updating the Person with the new/current Organization ID. Organization ID: {organization_id}')
|
|
person_data_up = {}
|
|
person_data_up['id'] = person_id
|
|
person_data_up['organization_id'] = organization_id
|
|
if person_data_up_result := sql_update(data=person_data_up, table_name='person'):
|
|
log.info(f'Person updated with Organization ID. Person ID: {person_id} Organization ID: {organization_id}')
|
|
else:
|
|
log.error(f'Person not updated with current Organization ID. Person ID: {person_id} Organization ID: {organization_id}')
|
|
if fail_any: return False
|
|
log.debug(person_dict_up_result)
|
|
|
|
person_outline['organization_id'] = organization_id
|
|
|
|
# Updated 2021-09-08
|
|
log.info(f'Check if processing User data... Process User: {process_user}')
|
|
if process_user and person_obj.user:
|
|
log.info(f'User data was found. Create a new User and link it to the new Person or update existing User. Person ID: {person_id}')
|
|
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
user_obj = person_obj.user
|
|
log.debug(user_obj)
|
|
if user_id := person_obj.user_id : pass
|
|
elif user_id := user_obj.id: pass
|
|
elif user_id := person_obj.user_id_random : pass # This is because of the person model and not wanting to create an import loop for user model
|
|
elif user_id := user_obj.id_random: pass # This is because of the person model and not wanting to create an import loop for user model
|
|
else: user_id = None
|
|
update_person_obj = False
|
|
if user_id:
|
|
log.warning('User ID found. This is not expected, but should be ok.')
|
|
# from app.methods.user_methods import update_user_obj_v3
|
|
from app.methods.user_methods import update_user_obj
|
|
if update_user_obj_result := update_user_obj(
|
|
user_id = user_id,
|
|
user_obj_up = user_obj,
|
|
# create_sub_obj = create_sub_obj,
|
|
# fail_any = fail_any,
|
|
):
|
|
log.info(f'User updated. User ID: {user_id}')
|
|
log.debug(update_user_obj_result)
|
|
|
|
# NOTE: This should not be needed. Updating person just in case!
|
|
update_person_obj = True
|
|
else:
|
|
log.warning(f'User not updated. Person ID: {person_id}')
|
|
log.debug(update_user_obj_result)
|
|
user_id = None
|
|
if fail_any: return False
|
|
else:
|
|
log.info(f'No User ID found.')
|
|
# from app.methods.user_methods import create_user_obj_v3
|
|
from app.methods.user_methods import create_user_obj
|
|
if create_user_obj_result := create_user_obj(
|
|
account_id = account_id,
|
|
user_obj_new = user_obj,
|
|
# create_sub_obj = create_sub_obj,
|
|
# fail_any = fail_any,
|
|
):
|
|
if isinstance(create_user_obj_result, int):
|
|
log.info(f'User created. User ID: {create_user_obj_result}')
|
|
log.debug(create_user_obj_result)
|
|
user_id = create_user_obj_result
|
|
# Need to update the person with the new user_id
|
|
update_person_obj = True
|
|
else:
|
|
log.warning(f'User not created. Updated? Person ID: {person_id}')
|
|
log.debug(create_user_obj_result)
|
|
user_id = None
|
|
if fail_any: return False
|
|
else:
|
|
log.warning(f'User not created. Person ID: {person_id}')
|
|
log.debug(create_user_obj_result)
|
|
user_id = None
|
|
if fail_any: return False
|
|
|
|
log.info(f'Check if need to update the Person with the User ID... Update Person Obj: {update_person_obj} User ID: {user_id}')
|
|
if update_person_obj and isinstance(user_id, int):
|
|
log.info(f'Updating the Person with the new/current User ID. User ID: {user_id}')
|
|
person_data_up = {}
|
|
person_data_up['id'] = person_id
|
|
person_data_up['user_id'] = user_id
|
|
if person_data_up_result := sql_update(data=person_data_up, table_name='person'):
|
|
log.info(f'Person updated with User ID. Person ID: {person_id} User ID: {user_id}')
|
|
else:
|
|
log.error(f'Person not updated with current User ID. Person ID: {person_id} User ID: {user_id}')
|
|
if fail_any: return False
|
|
log.debug(person_data_up_result)
|
|
|
|
person_outline['user_id'] = user_id
|
|
else:
|
|
log.info('User data not found.')
|
|
pass
|
|
|
|
return person_id
|
|
|
|
# Process person data
|
|
person_dict_up = person_obj.dict(by_alias=False, exclude_unset=True, exclude={'contact_id', 'contact_id_random', 'contact', 'organization', 'user'})
|
|
# log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(person_obj)
|
|
log.debug(person_dict_up)
|
|
|
|
# Update record
|
|
person_up_result = sql_update(
|
|
data = person_dict_up,
|
|
table_name = 'person',
|
|
rm_id_random = True,
|
|
)
|
|
log.debug(person_up_result)
|
|
if isinstance(person_up_result, bool) and person_up_result is True:
|
|
return person_id
|
|
elif isinstance(person_up_result, bool) and person_up_result is False:
|
|
return False
|
|
elif isinstance(person_up_result, int):
|
|
return person_up_result
|
|
else:
|
|
return False
|
|
# ### END ### API Person Methods ### create_update_person_obj_v4b() ###
|
|
|
|
|
|
# ### BEGIN ### API Person Methods ### get_person_rec_w_external_id() ###
|
|
# Updated 2021-08-19
|
|
def get_person_rec_w_external_id(
|
|
account_id: str,
|
|
external_id: str,
|
|
enabled: str = 'enabled', # enabled, disabled, all
|
|
) -> dict|bool:
|
|
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
if account_id := redis_lookup_id_random(record_id_random=account_id, table_name='account'): pass
|
|
else: return False
|
|
# account_id = 99
|
|
|
|
data = {}
|
|
data['account_id'] = account_id
|
|
data['external_id'] = external_id
|
|
|
|
sql = f"""
|
|
SELECT `person`.id AS 'person_id', `person`.id_random AS 'person_id_random'
|
|
FROM `person` AS `person`
|
|
WHERE person.account_id = :account_id
|
|
AND person.external_id = :external_id
|
|
LIMIT 1;
|
|
"""
|
|
|
|
if person_rec_result := sql_select(data=data, sql=sql):
|
|
person_rec = person_rec_result
|
|
else:
|
|
person_rec = None
|
|
log.debug(person_rec_result)
|
|
|
|
return person_rec
|
|
# ### END ### API Person Methods ### get_person_rec_w_external_id() ###
|
|
|
|
|
|
# ### BEGIN ### API Person Methods ### get_account_id_w_person_id() ###
|
|
# Updated 2021-08-25
|
|
def get_account_id_w_person_id(
|
|
person_id: int|str,
|
|
) -> bool|int|None:
|
|
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
if person_id := redis_lookup_id_random(record_id_random=person_id, table_name='person'): pass
|
|
else: return False
|
|
|
|
data = {}
|
|
data['person_id'] = person_id
|
|
|
|
sql = f"""
|
|
SELECT `person`.id AS 'person_id', `person`.id_random AS 'person_id_random', `person`.account_id AS account_id
|
|
FROM `person` AS `person`
|
|
WHERE `person`.id = :person_id
|
|
LIMIT 1;
|
|
"""
|
|
|
|
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
if person_data_result := sql_select(data=data, sql=sql):
|
|
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(person_data_result)
|
|
if account_id := person_data_result.get('account_id', None): return account_id
|
|
else: return False
|
|
else: return None
|
|
# ### END ### API Person Methods ### get_account_id_w_person_id() ###
|
|
|
|
|
|
|
|
|
|
# ### BEGIN ### API Person Methods ### create_person_obj_v3() ###
|
|
# NOTE: This will create a person and then also create a linked contact if person_obj.contact data is passed. The create_contact_obj will create a contact and then also create a linked address if person_obj.contact.address data is passed.
|
|
# Updated 2021-08-25
|
|
# Reviewed and updated 2021-08-24
|
|
# Reviewed and updated 2021-08-21
|
|
# Reviewed and updated 2021-08-10
|
|
def create_person_obj_v3(
|
|
account_id: int|str,
|
|
person_obj_new: Person_Base,
|
|
create_sub_obj: bool = False,
|
|
fail_any: bool = True, # Fail if any thing goes wrong for sub objects
|
|
return_outline: bool = False,
|
|
) -> bool|dict|int:
|
|
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
if account_id := redis_lookup_id_random(record_id_random=account_id, table_name='account'): pass
|
|
else: return False
|
|
|
|
log.debug(type(person_obj_new))
|
|
if isinstance(person_obj_new, dict):
|
|
try:
|
|
person_obj_new = Person_Base(**person_obj_new)
|
|
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(person_obj_new)
|
|
except ValidationError as e:
|
|
log.error(e.json())
|
|
return False
|
|
|
|
person_obj_data = person_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'contact', 'organization', 'user', 'created_on', 'updated_on'})
|
|
log.debug(person_obj_data)
|
|
person_obj_data['account_id'] = account_id
|
|
|
|
if person_obj_in_result := sql_insert(data=person_obj_data, table_name='person', rm_id_random=True, id_random_length=8): pass
|
|
else:
|
|
log.warning(f'Person not created.')
|
|
log.debug(person_obj_in_result)
|
|
return False
|
|
|
|
person_id = person_obj_in_result
|
|
|
|
person_outline = {}
|
|
person_outline['person_id'] = None
|
|
person_outline['contact_id'] = None
|
|
person_outline['contact'] = {}
|
|
person_outline['contact']['address_id'] = None
|
|
person_outline['organization_id'] = None
|
|
person_outline['user_id'] = None
|
|
|
|
# NOTE: Use object model version because of better type checking and validations
|
|
if person_obj_new.contact:
|
|
person_outline['contact_id'] = None
|
|
contact_obj = person_obj_new.contact
|
|
if contact_id := person_obj_new.contact_id: pass
|
|
elif contact_id := contact_obj.id: pass
|
|
else: contact_id = None
|
|
contact_obj.id
|
|
contact_obj.for_type = 'person'
|
|
contact_obj.for_id = person_id
|
|
create_update_contact_obj_result = create_update_contact_obj_v4(
|
|
contact_dict_obj = contact_obj,
|
|
contact_id = contact_id,
|
|
account_id = account_id,
|
|
for_type = 'person',
|
|
for_id = person_id,
|
|
fail_any = fail_any,
|
|
return_outline = return_outline,
|
|
)
|
|
if isinstance(create_update_contact_obj_result, int):
|
|
contact_id = create_update_contact_obj_result
|
|
elif create_update_contact_obj_result == True: pass
|
|
else:
|
|
log.warning(f'Create or Update failed while trying create_update_contact_obj_v4(): {create_update_contact_obj_result}')
|
|
contact_id = None
|
|
|
|
person_outline['contact_id'] = contact_id
|
|
|
|
if person_obj_new.organization and isinstance(person_obj_new.organization, dict):
|
|
log.info(f'Organization was found. Create a new Organization and link it to the new Person or update existing Organization. Person ID: {person_id}')
|
|
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
organization_obj_unknown = person_obj_new.organization
|
|
log.debug(organization_obj_unknown)
|
|
if organization_id := organization_obj_unknown.get('organization_id_random', None):
|
|
log.warning('Organization ID found. This is not expected, but should be ok.')
|
|
# from app.methods.organization_methods import update_organization_obj_v3
|
|
if update_organization_obj_result := update_organization_obj(
|
|
organization_id = organization_id,
|
|
organization_obj_up = organization_obj_unknown,
|
|
create_sub_obj = create_sub_obj,
|
|
fail_any = fail_any,
|
|
):
|
|
organization_id = update_organization_obj_result
|
|
log.info(f'Organization updated. Organization ID: {organization_id}')
|
|
else:
|
|
log.warning(f'Organization not updated. Person ID: {person_id}')
|
|
log.debug(update_organization_obj_result)
|
|
organization_id = None
|
|
if fail_any: return False
|
|
|
|
if isinstance(update_organization_obj_result, int):
|
|
organization_id = update_organization_obj_result
|
|
log.info(f'Organization updated. Organization ID: {organization_id}')
|
|
else:
|
|
log.warning(f'Organization not updated. Person ID: {person_id}')
|
|
log.debug(update_organization_obj_result)
|
|
organization_id = None
|
|
if fail_any: return False
|
|
else:
|
|
log.info(f'No Organization ID found.')
|
|
# from app.methods.organization_methods import create_organization_obj_v3
|
|
if create_organization_obj_result := create_organization_obj(
|
|
organization_obj_new = organization_obj_unknown,
|
|
create_sub_obj = create_sub_obj,
|
|
fail_any = fail_any,
|
|
):
|
|
if isinstance(create_organization_obj_result, int):
|
|
organization_id = create_organization_obj_result
|
|
log.info(f'Organization created. Organization ID: {organization_id}')
|
|
else:
|
|
log.warning(f'Organization not created. Person ID: {person_id}')
|
|
log.debug(create_organization_obj_result)
|
|
organization_id = None
|
|
if fail_any: return False
|
|
else:
|
|
log.warning(f'Organization not created. Person ID: {person_id}')
|
|
log.debug(create_organization_obj_result)
|
|
organization_id = None
|
|
if fail_any: return False
|
|
person_outline['organization_id'] = organization_id
|
|
else:
|
|
log.info('Organization not found or not in a dict.')
|
|
pass
|
|
|
|
if person_obj_new.user and isinstance(person_obj_new.user, dict):
|
|
log.info(f'User was found. Create a new User and link it to the new Person or update existing User. Person ID: {person_id}')
|
|
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
user_obj_unknown = person_obj_new.user
|
|
log.debug(user_obj_unknown)
|
|
if user_id := user_obj_unknown.get('user_id_random', None):
|
|
log.warning('User ID found. This is not expected, but should be ok.')
|
|
# from app.methods.user_methods import update_user_obj_v3
|
|
from app.methods.user_methods import update_user_obj
|
|
if update_user_obj_result := update_user_obj(
|
|
user_id = user_id,
|
|
user_obj_up = user_obj_unknown,
|
|
# create_sub_obj = create_sub_obj,
|
|
# fail_any = fail_any,
|
|
):
|
|
user_id = update_user_obj_result
|
|
log.info(f'User updated. User ID: {user_id}')
|
|
|
|
# NOTE: This should not be needed. Updating person just in case!
|
|
# Need to update the person with the current user_id
|
|
if user_id := redis_lookup_id_random(record_id_random=user_id, table_name='user'): pass
|
|
else: return False
|
|
log.info(f'Need to update the person with the current user_id.')
|
|
person_dict_up = {}
|
|
person_dict_up['id'] = person_id
|
|
person_dict_up['user_id'] = user_id
|
|
if person_dict_up_result := sql_update(data=person_dict_up, table_name='person'): pass
|
|
else:
|
|
log.error(f'Person not updated with current User ID. Person ID: {person_id} User ID: {user_id}')
|
|
if fail_any: return False
|
|
log.debug(person_dict_up_result)
|
|
else:
|
|
log.warning(f'User not updated. Person ID: {person_id}')
|
|
log.debug(update_user_obj_result)
|
|
user_id = None
|
|
if fail_any: return False
|
|
|
|
if isinstance(update_user_obj_result, int):
|
|
user_id = update_user_obj_result
|
|
log.info(f'User updated. User ID: {user_id}')
|
|
|
|
# Need to update the person with the current user_id
|
|
if user_id := redis_lookup_id_random(record_id_random=user_id, table_name='user'): pass
|
|
else: return False
|
|
log.info(f'Need to update the person with the current user_id.')
|
|
person_dict_up = {}
|
|
person_dict_up['id'] = person_id
|
|
person_dict_up['user_id'] = user_id
|
|
if person_dict_up_result := sql_update(data=person_dict_up, table_name='person'): pass
|
|
else:
|
|
log.error(f'Person not updated with current User ID. Person ID: {person_id} User ID: {user_id}')
|
|
if fail_any: return False
|
|
log.debug(person_dict_up_result)
|
|
else:
|
|
log.warning(f'User not updated. Person ID: {person_id}')
|
|
log.debug(update_user_obj_result)
|
|
user_id = None
|
|
if fail_any: return False
|
|
else:
|
|
log.info(f'No User ID found.')
|
|
# from app.methods.user_methods import create_user_obj_v3
|
|
from app.methods.user_methods import create_user_obj
|
|
if create_user_obj_result := create_user_obj(
|
|
account_id = account_id,
|
|
user_obj_new = user_obj_unknown,
|
|
# create_sub_obj = create_sub_obj,
|
|
# fail_any = fail_any,
|
|
):
|
|
if isinstance(create_user_obj_result, int):
|
|
user_id = create_user_obj_result
|
|
log.info(f'User created. User ID: {user_id}')
|
|
|
|
# Need to update the person with the new user_id
|
|
log.info(f'Need to update the person with the new user_id.')
|
|
person_dict_up = {}
|
|
person_dict_up['id'] = person_id
|
|
person_dict_up['user_id'] = user_id
|
|
if person_dict_up_result := sql_update(data=person_dict_up, table_name='person'): pass
|
|
else:
|
|
log.error(f'Person not updated with new User ID. Person ID: {person_id} User ID: {user_id}')
|
|
if fail_any: return False
|
|
log.debug(person_dict_up_result)
|
|
else:
|
|
log.warning(f'User not created. Person ID: {person_id}')
|
|
log.debug(create_user_obj_result)
|
|
user_id = None
|
|
if fail_any: return False
|
|
else:
|
|
log.warning(f'User not created. Person ID: {person_id}')
|
|
log.debug(create_user_obj_result)
|
|
user_id = None
|
|
if fail_any: return False
|
|
|
|
# Need to update the person with the new user_id
|
|
person_obj_up = {}
|
|
person_obj_up['id'] = person_id
|
|
person_obj_up['user_id'] = user_id
|
|
if person_obj_up_result := sql_update(data=person_obj_up, table_name='person'):
|
|
log.info(f'Person updated with user ID. Person ID: {person_id} User ID: {user_id}')
|
|
else:
|
|
log.warning(f'Person not updated with user ID. Person ID: {person_id} User ID: {user_id}')
|
|
if fail_any: return False
|
|
log.debug(person_obj_up_result)
|
|
|
|
person_outline['user_id'] = user_id
|
|
else:
|
|
log.info('User not found or not in a dict.')
|
|
pass
|
|
|
|
|
|
# NOTE: This is the older pre v3 version
|
|
# if person_obj_new.user:
|
|
# log.info(f'User data was found. Create a new user and link it to the new person. Person ID: {person_id}')
|
|
# from app.methods.user_methods import create_user_obj
|
|
# user_obj_new = person_obj_new.user
|
|
# user_obj_new.person_id = person_id
|
|
# create_user_obj_result = create_user_obj(user_obj_new=user_obj_new)
|
|
# if isinstance(create_user_obj_result, int):
|
|
# log.info(f'User created. User ID: {user_id} Update person {person_id} with new user ID.')
|
|
# user_id = create_user_obj_result
|
|
# # Need to update the person with the new user_id
|
|
# person_obj_up = {}
|
|
# person_obj_up['id'] = person_id
|
|
# person_obj_up['user_id'] = user_id
|
|
# if person_obj_up_result := sql_update(data=person_obj_up, table_name='person'):
|
|
# log.info(f'Person updated with user ID. Person ID: {person_id} User ID: {user_id}')
|
|
# else:
|
|
# log.warning(f'Person not updated with user ID. Person ID: {person_id} User ID: {user_id}')
|
|
# if fail_any: return False
|
|
# log.debug(person_obj_up_result)
|
|
# else:
|
|
# log.warning(f'User not created. Person ID: {person_id}')
|
|
# log.debug(create_user_obj_result)
|
|
# user_id = None
|
|
# if fail_any: return False
|
|
|
|
log.debug(f'Returning the new person_id: {person_id}')
|
|
return person_id
|
|
# ### END ### API Person Methods ### create_person_obj_v3() ###
|
|
|
|
|
|
# ### BEGIN ### API Person Methods ### update_person_obj_v3() ###
|
|
# Updated 2021-08-25
|
|
# Updated 2021-08-24
|
|
def update_person_obj_v3(
|
|
person_id: int|str,
|
|
person_obj_exist: Person_Base,
|
|
create_sub_obj: bool = False,
|
|
fail_any: bool = False, # Fail if any thing goes wrong for sub objects
|
|
return_outline: bool = False,
|
|
) -> bool:
|
|
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
if person_id := redis_lookup_id_random(record_id_random=person_id, table_name='person'): pass
|
|
else: return False
|
|
|
|
account_id = get_account_id_w_person_id(person_id=person_id)
|
|
|
|
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(type(person_obj_exist))
|
|
log.debug(person_obj_exist)
|
|
if isinstance(person_obj_exist, dict):
|
|
try:
|
|
person_obj_exist = Person_Base(**person_obj_exist)
|
|
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(person_obj_exist)
|
|
except ValidationError as e:
|
|
log.error(e.json())
|
|
return False
|
|
|
|
# Can't update the person_id alias if the .id was never set.
|
|
# person_obj_exist.person_id = person_id
|
|
if not person_obj_exist.id:
|
|
person_obj_exist.id = person_id
|
|
|
|
person_obj_data = person_obj_exist.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'contact', 'organization', 'user', 'created_on', 'updated_on'})
|
|
log.debug(person_obj_data)
|
|
|
|
if person_obj_up_result := sql_update(data=person_obj_data, table_name='person', rm_id_random=True): pass
|
|
else:
|
|
log.warning(f'Person not updated.')
|
|
log.debug(person_obj_up_result)
|
|
return False
|
|
|
|
person_outline = {}
|
|
person_outline['person_id'] = person_id
|
|
person_outline['contact_id'] = None
|
|
person_outline['contact'] = {}
|
|
person_outline['contact']['address_id'] = None
|
|
person_outline['organization_id'] = None
|
|
person_outline['user_id'] = None
|
|
|
|
# NOTE: Use object model version because of better type checking and validations
|
|
if person_obj_exist.contact:
|
|
person_outline['contact_id'] = None
|
|
contact_obj = person_obj_exist.contact
|
|
if contact_id := person_obj_exist.contact_id: pass
|
|
elif contact_id := contact_obj.id: pass
|
|
else: contact_id = None
|
|
contact_obj.id
|
|
contact_obj.for_type = 'person'
|
|
contact_obj.for_id = person_id
|
|
create_update_contact_obj_result = create_update_contact_obj_v4(
|
|
contact_dict_obj = contact_obj,
|
|
contact_id = contact_id,
|
|
account_id = account_id,
|
|
for_type = 'person',
|
|
for_id = person_id,
|
|
fail_any = fail_any,
|
|
return_outline = return_outline,
|
|
)
|
|
if isinstance(create_update_contact_obj_result, int):
|
|
contact_id = create_update_contact_obj_result
|
|
elif create_update_contact_obj_result == True: pass
|
|
else:
|
|
log.warning(f'Create or Update failed while trying create_update_contact_obj_v4(): {create_update_contact_obj_result}')
|
|
contact_id = None
|
|
|
|
person_outline['contact_id'] = contact_id
|
|
|
|
if person_obj_exist.organization and isinstance(person_obj_exist.organization, dict):
|
|
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
organization_obj_unknown = person_obj_exist.organization
|
|
log.debug(organization_obj_unknown)
|
|
if organization_id := organization_obj_unknown.get('organization_id_random', None):
|
|
# from app.methods.organization_methods import update_organization_obj_v3
|
|
if update_organization_obj_result := update_organization_obj(
|
|
organization_id = organization_id,
|
|
organization_obj_exist = organization_obj_unknown,
|
|
create_sub_obj = create_sub_obj,
|
|
fail_any = fail_any,
|
|
):
|
|
organization_id = update_organization_obj_result
|
|
log.info(f'Organization updated. Organization ID: {organization_id}')
|
|
else:
|
|
log.warning(f'Organization not updated. Person ID: {person_id}')
|
|
log.debug(update_organization_obj_result)
|
|
organization_id = None
|
|
if fail_any: return False
|
|
|
|
if isinstance(update_organization_obj_result, int):
|
|
organization_id = update_organization_obj_result
|
|
log.info(f'Organization updated. Organization ID: {organization_id}')
|
|
else:
|
|
log.warning(f'Organization not updated. Person ID: {person_id}')
|
|
log.debug(update_organization_obj_result)
|
|
organization_id = None
|
|
if fail_any: return False
|
|
else:
|
|
log.info(f'No Organization ID found.')
|
|
# from app.methods.organization_methods import create_organization_obj_v3
|
|
if create_organization_obj_result := create_organization_obj(
|
|
person_id = person_id,
|
|
organization_obj_new = organization_obj_unknown,
|
|
create_sub_obj = create_sub_obj,
|
|
fail_any = fail_any,
|
|
):
|
|
if isinstance(create_organization_obj_result, int):
|
|
organization_id = create_organization_obj_result
|
|
log.info(f'Organization created. Organization ID: {organization_id}')
|
|
else:
|
|
log.warning(f'Organization not created. Person ID: {person_id}')
|
|
log.debug(create_organization_obj_result)
|
|
organization_id = None
|
|
if fail_any: return False
|
|
else:
|
|
log.warning(f'Organization not created. Person ID: {person_id}')
|
|
log.debug(create_organization_obj_result)
|
|
organization_id = None
|
|
if fail_any: return False
|
|
person_outline['organization_id'] = organization_id
|
|
else:
|
|
log.info('Organization not found or not in a dict.')
|
|
pass
|
|
|
|
if person_obj_exist.user and isinstance(person_obj_exist.user, dict):
|
|
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
user_obj_unknown = person_obj_exist.user
|
|
log.debug(user_obj_unknown)
|
|
if user_id := user_obj_unknown.get('user_id_random', None):
|
|
# from app.methods.user_methods import update_user_obj_v3
|
|
from app.methods.user_methods import update_user_obj
|
|
if update_user_obj_result := update_user_obj(
|
|
user_id = user_id,
|
|
user_obj_up = user_obj_unknown,
|
|
create_sub_obj = create_sub_obj,
|
|
fail_any = fail_any,
|
|
):
|
|
# user_id = update_user_obj_result # Result will hopefully always be 1 or True
|
|
log.info(f'User updated. User ID: {user_id}')
|
|
log.debug(update_user_obj_result)
|
|
|
|
# NOTE: This should not be needed. Updating person just in case!
|
|
# Need to update the person with the current user_id
|
|
if user_id := redis_lookup_id_random(record_id_random=user_id, table_name='user'): pass
|
|
else: return False
|
|
log.info(f'Need to update the person with the current user_id.')
|
|
person_dict_up = {}
|
|
person_dict_up['id'] = person_id
|
|
person_dict_up['user_id'] = user_id
|
|
if person_dict_up_result := sql_update(data=person_dict_up, table_name='person'): pass
|
|
else:
|
|
log.error(f'Person not updated with current User ID. Person ID: {person_id} User ID: {user_id}')
|
|
if fail_any: return False
|
|
log.debug(person_dict_up_result)
|
|
else:
|
|
log.warning(f'User not updated. Person ID: {person_id}')
|
|
log.debug(update_user_obj_result)
|
|
user_id = None
|
|
if fail_any: return False
|
|
|
|
if isinstance(update_user_obj_result, int):
|
|
# user_id = update_user_obj_result # Result will hopefully always be 1 or True
|
|
log.info(f'User updated. User ID: {user_id}')
|
|
log.debug(update_user_obj_result)
|
|
|
|
# NOTE: This should not be needed. Updating person just in case!
|
|
# Need to update the person with the current user_id
|
|
if user_id := redis_lookup_id_random(record_id_random=user_id, table_name='user'): pass
|
|
else: return False
|
|
log.info(f'Need to update the person with the current user_id.')
|
|
person_dict_up = {}
|
|
person_dict_up['id'] = person_id
|
|
person_dict_up['user_id'] = user_id
|
|
if person_dict_up_result := sql_update(data=person_dict_up, table_name='person'): pass
|
|
else:
|
|
log.error(f'Person not updated with current User ID. Person ID: {person_id} User ID: {user_id}')
|
|
if fail_any: return False
|
|
log.debug(person_dict_up_result)
|
|
else:
|
|
log.warning(f'User not updated. Person ID: {person_id}')
|
|
log.debug(update_user_obj_result)
|
|
user_id = None
|
|
if fail_any: return False
|
|
else:
|
|
log.info(f'No User ID found.')
|
|
# from app.methods.user_methods import create_user_obj_v3
|
|
from app.methods.user_methods import create_user_obj
|
|
if create_user_obj_result := create_user_obj(
|
|
account_id = account_id,
|
|
user_obj_new = user_obj_unknown,
|
|
create_sub_obj = create_sub_obj,
|
|
fail_any = fail_any,
|
|
):
|
|
if isinstance(create_user_obj_result, int):
|
|
user_id = create_user_obj_result
|
|
log.info(f'User created. User ID: {user_id}')
|
|
|
|
# Need to update the person with the new user_id
|
|
log.info(f'Need to update the person with the new user_id.')
|
|
person_dict_up = {}
|
|
person_dict_up['id'] = person_id
|
|
person_dict_up['user_id'] = user_id
|
|
if person_dict_up_result := sql_update(data=person_dict_up, table_name='person'): pass
|
|
else:
|
|
log.error(f'Person not updated with new User ID. Person ID: {person_id} User ID: {user_id}')
|
|
if fail_any: return False
|
|
log.debug(person_dict_up_result)
|
|
else:
|
|
log.warning(f'User not created. Person ID: {person_id}')
|
|
log.debug(create_user_obj_result)
|
|
user_id = None
|
|
if fail_any: return False
|
|
else:
|
|
log.warning(f'User not created. Person ID: {person_id}')
|
|
log.debug(create_user_obj_result)
|
|
user_id = None
|
|
if fail_any: return False
|
|
person_outline['user_id'] = user_id
|
|
else:
|
|
log.info('User not found or not in a dict.')
|
|
pass
|
|
|
|
log.info(f'The Person has been updated. Person ID: {person_id}')
|
|
return True
|
|
# ### END ### API Person Methods ### update_person_obj_v3() ###
|
|
|
|
|
|
|
|
# ### BEGIN ### API Person Methods ### update_person_obj() ###
|
|
# NOTE: This will update a person and then also create or update a linked contact if person_obj.contact data is passed. The create_contact_obj will create a contact and then also create a linked address if person_obj.contact.address data is passed. The update_contact_obj will update a contact and then also create or update a linked address if person_obj.contact.address data is passed.
|
|
# Reviewed and updated 2021-08-10
|
|
def update_person_obj(
|
|
person_id: int|str, # Ideally the int ID should be passed. This allows for updating of the id_random value.
|
|
person_obj_up: Person_Base,
|
|
create_sub_obj: bool = False,
|
|
fail_any: bool = True, # Fail if any thing goes wrong for sub objects
|
|
) -> bool:
|
|
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
if person_id := redis_lookup_id_random(record_id_random=person_id, table_name='person'): pass
|
|
else: return False
|
|
|
|
account_id = get_account_id_w_person_id(person_id=person_id)
|
|
|
|
person_obj_up.id = person_id
|
|
|
|
log.debug(person_obj_up)
|
|
# log.debug(person_obj_up.dict(by_alias=True, exclude_unset=True))
|
|
log.debug(person_obj_up.dict(by_alias=False, exclude_unset=True))
|
|
# log.debug(person_obj_up.dict(by_alias=False, exclude_unset=False))
|
|
|
|
if person_obj_up.contact_id and person_obj_up.contact:
|
|
contact_id = person_obj_up.contact_id
|
|
contact_obj_up = person_obj_up.contact
|
|
log.debug(contact_id)
|
|
log.debug(contact_obj_up)
|
|
if contact_obj_up_result := update_contact_obj(
|
|
contact_id = contact_id,
|
|
contact_obj_up = contact_obj_up,
|
|
create_sub_obj = create_sub_obj,
|
|
):
|
|
log.debug(contact_obj_up_result)
|
|
else:
|
|
log.debug(contact_obj_up_result)
|
|
return False
|
|
elif person_obj_up.contact and not person_obj_up.contact.id:
|
|
# NOTE: This will blindly create a new contact even if there was one associated but the person.contact_id was not found.
|
|
contact_obj_in = person_obj_up.contact
|
|
log.debug(contact_obj_in)
|
|
if contact_obj_in_result := create_contact_obj(contact_obj_new=contact_obj_in):
|
|
# log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(contact_obj_in_result)
|
|
# NOTE: This last update should no longer be needed now that the person.contact_id is not supposed to be used.
|
|
# Need to update the person with the new contact_id
|
|
person_obj_up.contact_id = contact_obj_in_result # REMOVE
|
|
else:
|
|
# log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(contact_obj_in_result)
|
|
return False
|
|
|
|
if person_obj_up.organization_id and person_obj_up.organization:
|
|
organization_id = person_obj_up.organization_id
|
|
organization_obj_up = person_obj_up.organization
|
|
log.debug(organization_id)
|
|
log.debug(organization_obj_up)
|
|
if organization_obj_up_result := update_organization_obj(
|
|
organization_id = organization_id,
|
|
organization_obj_up = organization_obj_up,
|
|
create_sub_obj = create_sub_obj,
|
|
):
|
|
log.debug(organization_obj_up_result)
|
|
else:
|
|
log.debug(organization_obj_up_result)
|
|
return False
|
|
elif person_obj_up.organization and not person_obj_up.organization.id:
|
|
# NOTE: This will blindly create a new organization even if there was one associated but the person.organization_id was not found.
|
|
organization_obj_in = person_obj_up.organization
|
|
log.debug(organization_obj_in)
|
|
if organization_obj_in_result := create_organization_obj(organization_obj_new=organization_obj_in):
|
|
# log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(organization_obj_in_result)
|
|
# Need to update the person with the new organization_id
|
|
person_obj_up.organization_id = organization_obj_in_result
|
|
else:
|
|
# log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(organization_obj_in_result)
|
|
return False
|
|
|
|
if person_obj_up.user_id and person_obj_up.user:
|
|
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
user_id = person_obj_up.user_id
|
|
user_obj_up = person_obj_up.user
|
|
log.debug(user_id)
|
|
log.debug(user_obj_up)
|
|
if user_obj_up_result := update_user_obj(
|
|
user_id = user_id,
|
|
user_obj_up = user_obj_up,
|
|
create_sub_obj = create_sub_obj,
|
|
):
|
|
log.debug(user_obj_up_result)
|
|
else:
|
|
log.debug(user_obj_up_result)
|
|
return False
|
|
elif person_obj_up.user and not person_obj_up.user.id:
|
|
# NOTE: This will blindly create a new user even if there was one associated but the person.user_id was not found.
|
|
user_obj_in = person_obj_up.user
|
|
log.debug(user_obj_in)
|
|
if user_obj_in_result := create_user_obj(account_id=account_id, user_obj_new=user_obj_in):
|
|
# log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(user_obj_in_result)
|
|
# Need to update the person with the new user_id
|
|
person_obj_up.user_id = user_obj_in_result
|
|
else:
|
|
# log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(user_obj_in_result)
|
|
return False
|
|
|
|
person_dict_up = person_obj_up.dict(by_alias=False, exclude_unset=True, exclude={'contact', 'organization', 'user'})
|
|
log.debug(person_dict_up)
|
|
|
|
if person_obj_up_result := sql_update(data=person_dict_up, table_name='person', rm_id_random=True):
|
|
log.debug(person_obj_up_result)
|
|
return True
|
|
else:
|
|
log.debug(person_obj_up_result)
|
|
return False
|
|
# ### END ### API Person Methods ### update_person_obj() ###
|
|
|
|
|
|
|
|
# # ### BEGIN ### API Person Methods ### create_update_person_obj_v4b() ###
|
|
# def create_update_person_obj_v4b(
|
|
# person_id: int|str|None, # Ideally the int ID should be passed. This allows for updating of the id_random value.
|
|
# person_obj: Person_Base,
|
|
# process_contact: bool = False,
|
|
# process_organization: bool = False,
|
|
# ) -> bool:
|
|
# log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
# log.debug(locals())
|
|
|
|
# if person_id:
|
|
# log.info(f'Person ID passed. Update existing Person. Person ID: {person_id}')
|
|
|
|
# if person_id := redis_lookup_id_random(record_id_random=person_id, table_name='person'): pass
|
|
# else: return False
|
|
# person_obj.id = person_id
|
|
# else:
|
|
# # Insert record now and update later
|
|
# log.info('No Person ID passed. Create new Person. Required: Account ID')
|
|
|
|
# person_dict_in = person_obj.dict(by_alias=False, exclude_unset=True, exclude={'contact_id', 'contact_id_random', 'contact', 'organization', 'user'})
|
|
# log.debug(person_dict_in)
|
|
# person_in_result = sql_insert(
|
|
# data = person_dict_in,
|
|
# table_name = 'person',
|
|
# rm_id_random = True,
|
|
# id_random_length = default_num_bytes,
|
|
# )
|
|
# log.debug(person_in_result)
|
|
# if isinstance(person_in_result, bool) and person_in_result is True:
|
|
# return person_in_result
|
|
# elif isinstance(person_in_result, int):
|
|
# person_id = person_in_result
|
|
# person_obj.id = person_id
|
|
# else:
|
|
# return False # This should not happen.
|
|
|
|
# # Process contact data
|
|
# if process_contact and person_obj.contact:
|
|
# contact_obj = person_obj.contact
|
|
# contact_obj.for_type = 'person'
|
|
# contact_obj.for_id = person_id
|
|
# contact_id = person_obj.contact_id_random
|
|
# contact_result = create_update_contact_obj(
|
|
# contact_id = contact_id,
|
|
# contact_obj = contact_obj,
|
|
# process_address = True, # Setting to True under the assumption that if there is contact information then there is probably an address.
|
|
# )
|
|
# log.debug(contact_result)
|
|
# if isinstance(contact_result, bool) and contact_result is True:
|
|
# pass # Do not need to update person object.
|
|
# elif isinstance(contact_result, bool) and contact_result is False:
|
|
# pass # Do not need to update person object.
|
|
# elif isinstance(contact_result, int):
|
|
# person_obj.contact_id = contact_result
|
|
# # pass # Do not need to update person object.
|
|
# else:
|
|
# log.warning('Something may have gone wrong while trying to create or update a contact.')
|
|
|
|
# # Process organization data
|
|
# if process_organization and person_obj.organization:
|
|
# organization_obj = person_obj.organization
|
|
# organization_id = person_obj.organization_id_random
|
|
# organization_result = create_update_organization_obj(
|
|
# organization_id = organization_id,
|
|
# organization_obj = organization_obj,
|
|
# process_contact = True, # Setting to True under the assumption that if there is organization information then there is probably a contact (and address).
|
|
# )
|
|
# log.debug(organization_result)
|
|
# if isinstance(organization_result, bool) and organization_result is True:
|
|
# pass # Do not need to update person object.
|
|
# elif isinstance(organization_result, bool) and organization_result is False:
|
|
# pass # Do not need to update person object.
|
|
# elif isinstance(organization_result, int):
|
|
# person_obj.organization_id = organization_result
|
|
# else:
|
|
# log.warning('Something may have gone wrong while trying to create or update a organization.')
|
|
|
|
# # Process person data
|
|
# person_dict_up = person_obj.dict(by_alias=False, exclude_unset=True, exclude={'contact_id', 'contact_id_random', 'contact', 'organization', 'user'})
|
|
# # log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
# log.debug(person_obj)
|
|
# log.debug(person_dict_up)
|
|
|
|
# # Update record
|
|
# person_up_result = sql_update(
|
|
# data = person_dict_up,
|
|
# table_name = 'person',
|
|
# rm_id_random = True,
|
|
# )
|
|
# log.debug(person_up_result)
|
|
# if isinstance(person_up_result, bool) and person_up_result is True:
|
|
# return person_id
|
|
# elif isinstance(person_up_result, bool) and person_up_result is False:
|
|
# return False
|
|
# elif isinstance(person_up_result, int):
|
|
# return person_up_result
|
|
# else:
|
|
# return False
|
|
# # ### END ### API Person Methods ### create_update_person_obj_v4b() ###
|
|
|
|
|
|
# ### BEGIN ### Person Methods ### email_person_create_url() ###
|
|
# This emails the actual one time use account creation URL for a person.
|
|
# Updated 2021-12-03
|
|
def email_person_create_url(
|
|
account_id: int|str,
|
|
person_id: int|str,
|
|
root_url: str,
|
|
):
|
|
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
if account_id := redis_lookup_id_random(record_id_random=account_id, table_name='account'): pass
|
|
else: return False
|
|
|
|
if person_id := redis_lookup_id_random(record_id_random=person_id, table_name='person'): pass
|
|
else: return False
|
|
|
|
if person_obj := load_person_obj(
|
|
person_id = person_id,
|
|
):
|
|
log.info('Person object loaded')
|
|
else: return False
|
|
log.debug(person_obj)
|
|
|
|
from app.methods.account_cfg_methods import load_account_cfg_obj
|
|
if account_cfg := load_account_cfg_obj(
|
|
account_id = account_id,
|
|
):
|
|
log.info('Account config loaded')
|
|
else: return False
|
|
log.debug(account_cfg)
|
|
|
|
person_id_random = person_obj.id_random # NOTE: Not person_id_random because of alias
|
|
|
|
from_email = account_cfg.default_no_reply_email
|
|
from_name = account_cfg.default_no_reply_name
|
|
|
|
to_name = person_obj.display_name
|
|
to_email = person_obj.email
|
|
|
|
bcc_email = account_cfg.confirm_email
|
|
bcc_name = account_cfg.confirm_name
|
|
|
|
help_tech_email = account_cfg.help_tech_email
|
|
help_tech_name = account_cfg.help_tech_name
|
|
|
|
account_short_name = account_cfg.account_short_name
|
|
|
|
person_create_url = f'{root_url}person/{person_id_random}/create'
|
|
# person_create_auth_key_url = f'{root_url}?user_id={user_id_random}&auth_key={new_auth_key}'
|
|
|
|
subject = f'{account_short_name}: One Time Use Create Account Link'
|
|
# subject = f'{account_short_name}: One Time Use Create Account Link ({new_auth_key})'
|
|
|
|
body_html = f"""
|
|
<p>{to_name},</p>
|
|
|
|
<p>If you did not request this account creation link, please delete this email. It is suggested that you delete this email after the account creation link has been used or if a new link has been requested.</p>
|
|
|
|
<p>The link below can only be used once. If you would like try again using this method, you must <a href="NOT READY YET">request a new account creation link</a>. If you request multiple links, only the newest link will work.</p>
|
|
|
|
<p><strong><a href="{person_create_url}" style="appearance: button; display: inline-block; text-align: center; text-decoration: none; padding: .2rem .4rem; border: solid thin gray; border-radius: .2rem; background-color: lightyellow; color: black; font-size: larger;">Click to Finish Account Creation With One Time Use Link</a></strong></p>
|
|
|
|
<p>Or copy and paste the link:<br>
|
|
<strong style="background-color: lightyellow; color: black; font-size: larger;"><a href="{person_create_url}">{person_create_url}</a></strong></p>
|
|
|
|
<p>If you have questions about this email or trouble with this one time use link, you can email <a href="mailto:{help_tech_email}">{help_tech_name} ({help_tech_email})</a>.</p>
|
|
|
|
<p>Thank you!</p>
|
|
"""
|
|
|
|
if send_email(from_email=from_email, from_name=from_name, to_email=to_email, to_name=to_name, bcc_email=bcc_email, bcc_name=bcc_name, subject=subject, body_text=None, body_html=body_html):
|
|
log.info(f'An email with a one time use sign in link was sent to {to_email}.')
|
|
return True
|
|
else:
|
|
log.info(f'An email with a one time use sign in link was not sent to {to_email}.')
|
|
return False
|
|
# ### END ### User ### email_user_auth_key_url() ###
|