Files
OSIT-AE-API-FastAPI/app/methods/user_methods.py
2021-06-22 18:00:34 -04:00

411 lines
16 KiB
Python

from __future__ import annotations
import datetime
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator
from app.db_sql import redis_lookup_id_random, sql_insert, sql_select, sql_update
from app.lib_general import log, logging
from app.methods.contact_methods import load_contact_obj, update_contact_obj
# from app.methods.event_methods import get_event_rec_list
from app.methods.order_methods import load_order_obj, get_order_rec_list
from app.methods.organization_methods import load_organization_obj, update_organization_obj
from app.methods.person_methods import load_person_obj, update_person_obj
from app.methods.post_methods import get_post_rec_list, load_post_obj
from app.models.user_models import User_Base, User_New_Base, User_Out_Base
from app.models.user_role_models import User_Role_Base
# ### BEGIN ### API User Methods ### create_user_obj() ###
def create_user_obj(user_obj_new:User_New_Base) -> int|bool:
    """Create a user record, or update the existing one with the same username.

    The ``user`` table is first searched for a row matching the ``account_id``
    and ``username`` carried by ``user_obj_new``:

    * no match: a new row is inserted with ``sql_insert``;
    * a single match (``sql_select`` returns a mapping): that row is updated
      with ``sql_update`` using the new data;
    * several matches (``sql_select`` returns a list): nothing is written and
      the problem is logged, because the data is inconsistent.

    Args:
        user_obj_new: Model holding the fields of the user to create. Its
            ``new_password`` field is never written; ``password`` always is.

    Returns:
        The result of ``sql_insert`` for a new row (used by this module as the
        new user ID), the ``id`` of the existing row after a successful update,
        or ``False`` when the input is empty or any database step fails.
    """
    # Reject a missing payload before touching the logger or the database.
    if not user_obj_new:
        return False

    log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL

    user_obj_data = user_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'new_password'})
    # NOTE(review): "password" appears to be treated as unset by the model and is
    # therefore dropped by exclude_unset=True, so it is copied over explicitly.
    user_obj_data['password'] = user_obj_new.password
    # Never write the credential itself to the log, not even at DEBUG level.
    log.debug({**user_obj_data, 'password': '********'})

    log.info('Checking if the username is already in use for the account...')
    sql_select_user = """
        SELECT user.id, user.id_random, user.name, user.email
        FROM `user` AS user
        WHERE user.account_id = :account_id and user.username = :username
    """
    existing_user = sql_select(sql=sql_select_user, data=user_obj_data, rm_id_random=True)

    if not existing_user:
        # No user with this username yet: plain insert.
        user_id = sql_insert(data=user_obj_data, table_name='user', rm_id_random=True, id_random_length=8)
        if not user_id:
            return False
        log.debug(f'Returning the new user_id: {user_id}')
        return user_id

    if isinstance(existing_user, list):
        # log.error rather than log.exception: there is no active exception here,
        # so log.exception would only append a meaningless "NoneType: None" trace.
        log.error(f"Multiple user accounts already exists with this username ({user_obj_data.get('username')}). The database needs to be checked.")
        return False

    log.info('A user account already exists with this username. Updating instead of inserting.')
    user_id = existing_user.get('id', None)
    if not user_id:
        # Without a primary key the update below could not target the right row.
        log.error('The existing user record has no id. The update was not attempted.')
        return False

    user_obj_data['id'] = user_id
    user_obj_up_result = sql_update(data=user_obj_data, table_name='user', rm_id_random=True)
    if not user_obj_up_result:
        return False
    log.debug(user_obj_up_result)
    log.debug(f'Returning the existing user_id: {user_id}')
    return user_id
# ### END ### API User Methods ### create_user_obj() ###
# ### BEGIN ### API User Methods ### load_user_obj() ###
def load_user_obj(
    user_id: int|str,
    limit: int = 1000,
    by_alias: bool = True,
    exclude_unset: bool = True,
    model_as_dict: bool = False,
    enabled: str = 'enabled', # enabled, disabled, all
    inc_address: bool = False,
    # inc_archive_list: bool = False,
    inc_contact: bool = False,
    inc_event_list: bool = False,
    # inc_hosted_file_list: bool = False,
    inc_journal_list: bool = False,
    inc_journal_entry_list: bool = False,
    inc_membership_member: bool = False,
    inc_order_cfg: bool = False,
    inc_order_line_list: bool = False,
    inc_order_list: bool = False,
    inc_order_cart_list: bool = False,
    inc_organization: bool = False,
    inc_person: bool = False,
    # inc_person_list: bool = False,
    inc_post_list: bool = False,
    inc_post_comment_list: bool = False,
    inc_user_role_list: bool = False,
) -> User_Out_Base|bool:
    """Load one user from the ``v_user`` view and optionally attach related objects.

    Args:
        user_id: Integer ID or random ID of the user; resolved through
            ``redis_lookup_id_random`` for the ``user`` table.
        limit: Forwarded to the list-style loaders (events, orders, posts) and
            to the organization loader.
        by_alias: Forwarded to nested loaders and to the final ``.dict()`` call.
        exclude_unset: Forwarded to nested loaders and to the final ``.dict()`` call.
        model_as_dict: When true the result (and nested results) are returned
            as dictionaries instead of model instances.
        enabled: 'enabled', 'disabled' or 'all'; forwarded to nested loaders.
        inc_address: Forwarded to the organization and person loaders.
        inc_contact: Forwarded to the organization and person loaders.
        inc_event_list: Attach ``event_list``.
        inc_order_list: Attach ``order_list`` (``inc_order_cfg`` and
            ``inc_order_line_list`` are forwarded to each order load).
        inc_organization: Attach ``organization``.
        inc_person: Attach ``person``; also forwarded to order and post loads.
        inc_post_list: Attach ``post_list`` (``inc_post_comment_list`` is
            forwarded to each post load).
        inc_user_role_list: Attach ``role_list`` from ``v_user_role_detail``.
        inc_journal_list, inc_journal_entry_list, inc_membership_member,
        inc_order_cart_list: Accepted but not used by this function.

    Returns:
        The populated ``User_Out_Base`` (or its dict form when
        ``model_as_dict`` is true), or ``False`` when the ID cannot be
        resolved, the record is not found, or the record fails validation.
    """
    log.debug(locals())

    user_id = redis_lookup_id_random(record_id_random=user_id, table_name='user')
    if not user_id:
        return False

    user_rec = sql_select(table_name='v_user', record_id=user_id)
    if not user_rec:
        return False
    log.debug(user_rec)

    try:
        user_obj = User_Out_Base(**user_rec)
    except ValidationError as e:
        log.error(e.json())
        return False
    else:
        log.debug(user_obj)

    # Serialisation options shared by every nested object loader below.
    dump_opts = dict(by_alias=by_alias, exclude_unset=exclude_unset, model_as_dict=model_as_dict)

    # NOTE: attaching a contact directly to the user was disabled on 2021-06-18;
    # inc_contact is only passed through to the organization and person loaders.

    if inc_event_list:
        # Imported here rather than at module level (the module-level import of
        # event_methods is commented out at the top of the file).
        from app.methods.event_methods import load_event_obj_list
        found_events = load_event_obj_list(
            user_id=user_id,
            limit=limit,
            model_as_dict=model_as_dict,
            enabled=enabled,
        )
        user_obj.event_list = found_events or []

    # Updated 2021-06-18
    if inc_order_list:
        order_recs = get_order_rec_list(
            for_obj_type='user',
            for_obj_id=user_id,
            limit=limit,
            enabled=enabled,
        )
        user_obj.order_list = [
            load_order_obj(
                order_id=rec.get('order_id', None),
                limit=limit,
                enabled=enabled,
                inc_order_cfg=inc_order_cfg,
                inc_order_line_list=inc_order_line_list,
                inc_person=inc_person,
                **dump_opts,
            )
            for rec in (order_recs or [])
        ]

    # Updated 2021-06-18
    if inc_organization:
        organization_id = user_rec.get('organization_id', None)
        log.debug(organization_id)
        loaded_org = load_organization_obj(
            organization_id=organization_id,
            limit=limit,
            enabled=enabled,
            inc_address=inc_address,
            inc_contact=inc_contact,
            **dump_opts,
        )
        user_obj.organization = loaded_org or None

    # Updated 2021-06-18
    if inc_person:
        person_id = user_rec.get('person_id', None)
        log.debug(person_id)
        person_result = load_person_obj(
            person_id=person_id,
            inc_address=inc_address,
            inc_contact=inc_contact,
            inc_organization=inc_organization,
            **dump_opts,
        )
        user_obj.person = person_result or None
        log.debug(person_result)

    # Updated 2021-06-18
    if inc_post_list:
        post_recs = get_post_rec_list(
            for_obj_type='user',
            for_obj_id=user_id,
            limit=limit,
            enabled=enabled,
        )
        user_obj.post_list = [
            load_post_obj(
                post_id=rec.get('post_id', None),
                limit=limit,
                enabled=enabled,
                inc_post_comment_list=inc_post_comment_list,
                inc_person=inc_person,
                **dump_opts,
            )
            for rec in (post_recs or [])
        ]

    # NOTE: Including user roles should probably be reviewed
    if inc_user_role_list:
        role_recs = sql_select(table_name='v_user_role_detail', field_name='user_id', field_value=user_id, as_list=True)
        user_obj.role_list = role_recs or None

    log.debug(user_rec)
    log.debug(user_obj)

    if not model_as_dict:
        return user_obj
    return user_obj.dict(by_alias=by_alias, exclude_unset=exclude_unset) # pylint: disable=no-member
# ### END ### API User Methods ### load_user_obj() ###
# ### BEGIN ### API User Methods ### update_user_obj() ###
def update_user_obj(
    user_id: int|str, # Ideally the int ID should be passed. This allows for updating of the id_random value.
    user_obj_up: User_Base,
    create_missing_obj: bool = False,
) -> bool:
    """Update the ``user`` row identified by ``user_id`` from ``user_obj_up``.

    Only fields that were explicitly set on ``user_obj_up`` are written, and
    ``password``, ``super``, ``manager``, ``contact``, ``organization`` and
    ``person`` are always left out of the update.

    Args:
        user_id: Integer ID or random ID of the user; resolved through
            ``redis_lookup_id_random`` for the ``user`` table.
        user_obj_up: Model carrying the new values. Its ``id`` attribute is
            overwritten with the resolved ID (the caller's object is mutated).
        create_missing_obj: Accepted but not used by the current
            implementation.

    Returns:
        ``True`` when ``sql_update`` reports success, ``False`` when the ID
        cannot be resolved or the update fails.
    """
    log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    user_id = redis_lookup_id_random(record_id_random=user_id, table_name='user')
    if not user_id:
        return False

    user_obj_up.id = user_id
    log.debug(user_obj_up)
    log.debug(user_obj_up.dict(by_alias=False, exclude_unset=True))

    # NOTE: updating or creating the nested contact / organization / person
    # objects from here is disabled; those objects are excluded from the update
    # below and have to be maintained through their own update methods.

    # IMPORTANT NOTE: Need to be extra careful if allowing an update to password, super, or manager. Maybe other fields?
    never_updated = {'password', 'super', 'manager', 'contact', 'organization', 'person'}
    user_dict_up = user_obj_up.dict(by_alias=False, exclude_unset=True, exclude=never_updated)
    log.debug(user_dict_up)

    user_obj_up_result = sql_update(data=user_dict_up, table_name='user', rm_id_random=True)
    log.debug(user_obj_up_result)
    return bool(user_obj_up_result)
# ### END ### API User Methods ### update_user_obj() ###
# ### BEGIN ### API User Methods ### get_user_rec_list() ###
def get_user_rec_list(
    for_obj_type: str,
    for_obj_id: int|str,
    limit: int = 1000,
    enabled: str = 'enabled', # enabled, disabled, all
) -> list|bool:
    """Return the ID records of the users that belong to a parent object.

    Users are selected from the ``user`` table where the ``<for_obj_type>_id``
    column equals the resolved ``for_obj_id``, newest first.

    Args:
        for_obj_type: Name of the parent object type. It is used both as the
            table name for the ID lookup and as the prefix of the ``_id``
            column, so it must be a plain ASCII identifier.
        for_obj_id: Integer ID or random ID of the parent object; resolved
            through ``redis_lookup_id_random``.
        limit: Maximum number of rows; a falsy value disables the LIMIT clause.
        enabled: 'enabled' (only rows with enable = true), 'disabled' (only
            rows with enable = false) or 'all' (no filter).

    Returns:
        A list of records with ``user_id`` and ``user_id_random`` (empty when
        nothing matches), or ``False`` when an argument is invalid or the
        parent ID cannot be resolved.
    """
    # An unknown filter previously left the SQL fragment undefined and crashed
    # with UnboundLocalError further down; reject it up front instead.
    if enabled not in ('enabled', 'disabled', 'all'):
        return False
    # for_obj_type is interpolated into the SQL text as a column name, so only
    # a plain identifier is acceptable (bind parameters cannot be used there).
    if not (isinstance(for_obj_type, str) and for_obj_type.isascii() and for_obj_type.isidentifier()):
        return False

    log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # Look the ID up in the table named by for_obj_type (the literal string
    # 'for_obj_type' was passed here before, which is not a table name).
    for_obj_id = redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type)
    if not for_obj_id:
        return False

    id_column = f'{for_obj_type}_id'
    data = {id_column: for_obj_id}
    sql_obj_type_id = f'`tbl`.{id_column} = :{id_column}'

    if enabled == 'all':
        sql_enabled = ''
    else:
        data['enable'] = (enabled == 'enabled')
        sql_enabled = 'AND `tbl`.enable = :enable'

    if limit:
        data['limit'] = limit
        sql_limit = 'LIMIT :limit'
    else:
        sql_limit = ''

    sql = f"""
        SELECT `tbl`.id AS 'user_id', `tbl`.id_random AS 'user_id_random'
        FROM `user` AS `tbl`
        WHERE
            {sql_obj_type_id}
            {sql_enabled}
        ORDER BY `tbl`.created_on DESC, `tbl`.updated_on DESC
        {sql_limit};
    """

    user_rec_li_result = sql_select(data=data, sql=sql, as_list=True)
    # The shared logger is intentionally NOT switched to DEBUG here any more:
    # doing so left it at DEBUG for every later call in the process.
    log.debug(user_rec_li_result)
    return user_rec_li_result or []
# ### END ### API User Methods ### get_user_rec_list() ###