Files
OSIT-AE-API-FastAPI/app/methods/user_methods.py
2021-11-18 15:25:06 -05:00

485 lines
20 KiB
Python

from __future__ import annotations
import datetime, random
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator
from app.db_sql import redis_lookup_id_random, sql_insert, sql_select, sql_update
from app.lib_general import log, logging
from app.methods.contact_methods import load_contact_obj, update_contact_obj
# from app.methods.event_methods import get_event_rec_list
from app.methods.order_methods import load_order_obj, get_order_rec_list
from app.methods.organization_methods import load_organization_obj, update_organization_obj
from app.methods.person_methods import create_person_obj_v3, load_person_obj, update_person_obj
from app.methods.post_methods import get_post_rec_list, load_post_obj
from app.methods.user_role_methods import get_user_role_rec_list, load_user_role_obj
from app.models.user_models import User_Base, User_New_Base, User_Out_Base
# ### BEGIN ### API User Methods ### create_user_obj() ###
# NOTE: This will create a new user and also hash a new password string if given.
# NOTE: This uses the User_New_Base model, not User_Base or User_Out_Base
# Updated 2021-08-25
# Reviewed and updated 2021-08-21
# Reviewed and updated 2021-08-10
def create_user_obj(
    account_id: int|str,
    user_obj_new: User_New_Base|dict,
    allow_update: bool = False,  # Allow updating the user account if one is found
    avoid_dup_username: bool = False,  # Avoid creating a duplicate by modifying the supplied username
    create_sub_obj: bool = False,
    fail_any: bool = True,  # Fail if any thing goes wrong for sub objects
    return_dict: bool = False,
) -> bool|dict|int:
    """Create a user record for an account, handling an already-used username.

    The username is first looked up within the account:

    * not found -> the user is inserted;
    * found and ``allow_update`` -> the existing row is updated in place;
    * found and ``avoid_dup_username`` -> a new row is inserted under the
      username with a random two-digit suffix appended;
    * found otherwise -> nothing is written and ``False`` is returned.

    Args:
        account_id: Account ID or random ID; resolved via ``redis_lookup_id_random``.
        user_obj_new: A ``User_New_Base`` instance, or a dict that is validated
            into one (``account_id`` is injected into the dict first).
        allow_update: Update the existing user when the username is taken.
        avoid_dup_username: Insert under a modified username when it is taken
            and ``allow_update`` is False.
        create_sub_obj: Currently unused by this function.
        fail_any: Currently unused by this function.
        return_dict: Currently unused by this function.

    Returns:
        The user ID (the existing ID after an update, otherwise the value
        returned by ``sql_insert``) on success, or ``False`` on any failure.
    """
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    # NOTE(review): the debug output in this function includes the user payload,
    # which carries the password field - confirm it is already hashed at this point.
    log.debug(locals())

    account_id = redis_lookup_id_random(record_id_random=account_id, table_name='account')
    if not account_id:
        return False

    log.debug(type(user_obj_new))
    if isinstance(user_obj_new, dict):
        user_obj_new['account_id'] = account_id
        try:
            user_obj_new = User_New_Base(**user_obj_new)
            log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
            log.debug(user_obj_new)
        except ValidationError as e:
            log.error(e.json())
            return False

    user_obj_data = user_obj_new.dict(
        by_alias=False,
        exclude_defaults=False,
        exclude_unset=True,
        exclude={'contact', 'new_password', 'organization', 'person', 'created_on', 'updated_on'},
    )
    log.debug(user_obj_data)
    user_obj_data['account_id'] = account_id
    # "password" appears to be treated as unset and is therefore dropped by
    # exclude_unset=True above, so it is copied over explicitly.
    user_obj_data['password'] = user_obj_new.password
    log.debug(user_obj_data)

    # account_id was already resolved above; it is not looked up a second time.
    username = user_obj_data.get('username', None)
    log.info(f'Checking if the username is already in use for the account... Account: {account_id} Username: {username}')

    sql_select_user = (
        "\n"
        "SELECT user.id, user.id_random, user.name, user.email\n"
        "FROM `user` AS user\n"
        "WHERE user.account_id = :account_id and user.username = :username\n"
    )
    sql_select_result = sql_select(sql=sql_select_user, data=user_obj_data, rm_id_random=True)

    # Username is free: plain insert.
    if not sql_select_result:
        user_id = _insert_user_row(user_obj_data)
        if not user_id:
            return False
        log.debug(f'Returning the new user_id: {user_id}')
        return user_id

    # More than one match means the data is inconsistent; refuse to touch it.
    if isinstance(sql_select_result, list):
        # log.error rather than log.exception: there is no active exception here.
        log.error(f'Multiple user accounts already exists with this username. The database needs to be checked. Account ID: {account_id} Username: {username}')
        return False

    user_id = sql_select_result.get('id', None)
    log.info(f'A user account already exists with this username. Current User ID: {user_id} Username: {username}')

    if allow_update:
        log.info(f'Updating instead of inserting. Current User ID: {user_id} Username: {username}')
        user_obj_data['id'] = user_id
        user_obj_up_result = sql_update(data=user_obj_data, table_name='user', rm_id_random=True)
        if not user_obj_up_result:
            log.warning(f'User not updated with new user data. User ID: {user_id}')
            log.debug(user_obj_up_result)
            return False
        log.info(f'User updated with new user data. User ID: {user_id}')
        log.debug(f'Returning the existing user_id: {user_id}')
        return user_id

    log.info(f'Updating is not allowed. Current User ID: {user_id} Username: {username}')
    if not avoid_dup_username:
        log.warning(f'Updating is not allowed and avoid duplicate username is not allowed. Username: {username}')
        return False

    # NOTE: the suffixed username is not re-checked for uniqueness before inserting.
    new_username = username+'-'+str(random.randint(10, 99))
    log.info(f'Avoiding duplicate username is allowed. Suggested Username: {new_username}')
    user_obj_data['username'] = new_username
    user_id = _insert_user_row(user_obj_data)
    if not user_id:
        return False
    log.debug(f'Returning the new user_id: {user_id}')
    return user_id


def _insert_user_row(user_obj_data: dict) -> bool|int:
    """Insert ``user_obj_data`` into the ``user`` table; return the ``sql_insert`` result, or False if it was falsy."""
    user_obj_in_result = sql_insert(data=user_obj_data, table_name='user', rm_id_random=True, id_random_length=8)
    if user_obj_in_result:
        return user_obj_in_result
    log.warning('User not created.')
    log.debug(user_obj_in_result)
    return False
# ### END ### API User Methods ### create_user_obj() ###
# ### BEGIN ### API User Methods ### update_user_obj() ###
# Updated 2021-08-25
def update_user_obj(
    user_id: int|str,  # Ideally the int ID should be passed. This allows for updating of the id_random value.
    user_obj_up: User_Base,
    create_sub_obj: bool = False,
    fail_any: bool = True,  # Fail if any thing goes wrong for sub objects
    return_dict: bool = False,
) -> bool|int:
    """Update an existing user row from a ``User_Base`` object or a dict.

    Args:
        user_id: User ID or random ID; resolved via ``redis_lookup_id_random``.
        user_obj_up: The new field values. A dict is validated into ``User_Base`` first.
        create_sub_obj: Currently unused by this function.
        fail_any: Currently unused by this function.
        return_dict: Currently unused by this function.

    Returns:
        ``True`` when ``sql_update`` reports success, otherwise ``False``
        (also for an unknown user or a dict that fails validation).
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    user_id = redis_lookup_id_random(record_id_random=user_id, table_name='user')
    if not user_id:
        return False

    log.debug(type(user_obj_up))
    if isinstance(user_obj_up, dict):
        try:
            user_obj_up = User_Base(**user_obj_up)
            log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
            log.debug(user_obj_up)
        except ValidationError as validation_err:
            log.error(validation_err.json())
            return False

    # Pin the update to the resolved record.
    user_obj_up.id = user_id
    log.debug(user_obj_up)
    log.debug(user_obj_up.dict(by_alias=False, exclude_unset=True))

    # NOTE: Updating or creating the nested contact, organization and person
    # sub-objects is not performed here (that logic is disabled); those fields
    # are excluded from the row update below.

    # IMPORTANT NOTE: Need to be extra careful if allowing an update to password, super, or manager. Maybe other fields?
    protected_fields = {'password', 'super', 'manager', 'contact', 'organization', 'person'}
    user_dict_up = user_obj_up.dict(by_alias=False, exclude_unset=True, exclude=protected_fields)
    log.debug(user_dict_up)

    user_obj_up_result = sql_update(data=user_dict_up, table_name='user', rm_id_random=True)
    log.debug(user_obj_up_result)
    return True if user_obj_up_result else False
# ### END ### API User Methods ### update_user_obj() ###
# ### BEGIN ### API User Methods ### load_user_obj() ###
def load_user_obj(
    user_id: int|str,
    limit: int = 1000,
    by_alias: bool = True,
    exclude_unset: bool = True,
    model_as_dict: bool = False,
    enabled: str = 'enabled',  # enabled, disabled, all
    inc_address: bool = False,
    # inc_archive_list: bool = False,
    inc_contact: bool = False,
    inc_event_list: bool = False,
    # inc_hosted_file_list: bool = False,
    inc_journal_list: bool = False,
    inc_journal_entry_list: bool = False,
    inc_membership_person: bool = False,
    inc_order_cfg: bool = False,
    inc_order_line_list: bool = False,
    inc_order_list: bool = False,
    inc_order_cart_list: bool = False,
    inc_organization: bool = False,
    inc_person: bool = False,
    # inc_person_list: bool = False,
    inc_post_list: bool = False,
    inc_post_comment_list: bool = False,
    inc_user_role_list: bool = False,
) -> User_Out_Base|dict|bool:
    """Load one user from ``v_user`` and optionally attach related objects.

    Each ``inc_*`` flag that is handled here attaches the matching related
    data to the returned user (events, orders, organization, person, posts,
    user roles). A requested list that yields nothing is set to ``[]``; a
    requested single object that cannot be loaded is set to ``None``; list
    items that fail to load are kept as ``None`` placeholders.

    ``inc_address``, ``inc_contact``, ``inc_order_cfg``, ``inc_order_line_list``
    and ``inc_post_comment_list`` are only forwarded to the sub-loaders.
    ``inc_journal_list``, ``inc_journal_entry_list``, ``inc_membership_person``
    and ``inc_order_cart_list`` are currently unused by this function.

    Returns:
        The ``User_Out_Base`` object, or its ``dict`` form when
        ``model_as_dict`` is set; ``False`` when the user cannot be resolved,
        found, or validated.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    user_id = redis_lookup_id_random(record_id_random=user_id, table_name='user')
    if not user_id:
        return False

    user_rec = sql_select(table_name='v_user', record_id=user_id)
    if not user_rec:
        return False
    log.debug(user_rec)

    try:
        user_obj = User_Out_Base(**user_rec)
        log.debug(user_obj)
    except ValidationError as validation_err:
        log.error(validation_err.json())
        return False

    # NOTE: Loading the contact directly onto the user is disabled; inc_contact
    # is only passed through to the organization and person loaders.

    if inc_event_list:
        # Imported here rather than at module level - presumably to avoid a circular import; confirm.
        from app.methods.event_methods import load_event_obj_list
        user_obj.event_list = load_event_obj_list(
            user_id=user_id,
            limit=limit,
            model_as_dict=model_as_dict,
            enabled=enabled,
        ) or []

    if inc_order_list:
        order_rec_list = get_order_rec_list(
            for_obj_type='user',
            for_obj_id=user_id,
            limit=limit,
            enabled=enabled,
        )
        if order_rec_list:
            # A falsy load result is stored as None so positions still line up with the records.
            user_obj.order_list = [
                load_order_obj(
                    order_id=order_rec.get('order_id', None),
                    limit=limit,
                    by_alias=by_alias,
                    exclude_unset=exclude_unset,
                    model_as_dict=model_as_dict,
                    enabled=enabled,
                    inc_order_cfg=inc_order_cfg,
                    inc_order_line_list=inc_order_line_list,
                    inc_person=inc_person,
                ) or None
                for order_rec in order_rec_list
            ]
        else:
            user_obj.order_list = []

    if inc_organization:
        organization_id = user_rec.get('organization_id', None)
        log.debug(organization_id)
        user_obj.organization = load_organization_obj(
            organization_id=organization_id,
            limit=limit,
            by_alias=by_alias,
            exclude_unset=exclude_unset,
            model_as_dict=model_as_dict,
            enabled=enabled,
            inc_address=inc_address,
            inc_contact=inc_contact,
        ) or None

    if inc_person:
        person_id = user_rec.get('person_id', None)
        log.debug(person_id)
        person_result = load_person_obj(
            person_id=person_id,
            by_alias=by_alias,
            exclude_unset=exclude_unset,
            model_as_dict=model_as_dict,
            inc_address=inc_address,
            inc_contact=inc_contact,
            inc_organization=inc_organization,
        )
        user_obj.person = person_result or None
        log.debug(person_result)

    if inc_post_list:
        post_rec_list = get_post_rec_list(
            for_obj_type='user',
            for_obj_id=user_id,
            limit=limit,
            enabled=enabled,
        )
        if post_rec_list:
            user_obj.post_list = [
                load_post_obj(
                    post_id=post_rec.get('post_id', None),
                    limit=limit,
                    by_alias=by_alias,
                    exclude_unset=exclude_unset,
                    model_as_dict=model_as_dict,
                    enabled=enabled,
                    inc_post_comment_list=inc_post_comment_list,
                    inc_person=inc_person,
                ) or None
                for post_rec in post_rec_list
            ]
        else:
            user_obj.post_list = []

    if inc_user_role_list:
        user_role_rec_list = get_user_role_rec_list(
            for_obj_type='user',
            for_obj_id=user_id,
            limit=limit,
            enabled=enabled,
        )
        if user_role_rec_list:
            user_obj.user_role_list = [
                load_user_role_obj(
                    user_role_id=user_role_rec.get('user_role_id', None),
                    by_alias=by_alias,
                    exclude_unset=exclude_unset,
                    model_as_dict=model_as_dict,
                ) or None
                for user_role_rec in user_role_rec_list
            ]
        else:
            user_obj.user_role_list = []

    log.debug(user_obj)
    if not model_as_dict:
        return user_obj
    return user_obj.dict(by_alias=by_alias, exclude_unset=exclude_unset)  # pylint: disable=no-member
# ### END ### API User Methods ### load_user_obj() ###
# ### BEGIN ### API User Methods ### get_user_rec_list() ###
def get_user_rec_list(
    account_id: int|str,
    hidden: str = 'not_hidden',  # hidden, not_hidden, all
    enabled: str = 'enabled',  # enabled, disabled, all
    limit: int = 1000,
) -> list|bool:
    """Return the ID records of the users belonging to an account.

    Args:
        account_id: Account ID or random ID; resolved via ``redis_lookup_id_random``.
        hidden: Currently unused by this function.
        enabled: ``'enabled'`` or ``'disabled'`` filters on ``user.enable``;
            ``'all'`` applies no filter.
        limit: Maximum number of rows; a falsy value removes the ``LIMIT`` clause.

    Returns:
        The rows from ``sql_select`` (each selecting ``user_id`` and
        ``user_id_random``), ``[]`` when nothing matched, or ``False`` when
        the account cannot be resolved or ``enabled`` is not a supported value.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    account_id = redis_lookup_id_random(record_id_random=account_id, table_name='account')
    if not account_id:
        return False

    data = {'account_id': account_id}
    sql_where_account_id = '`user`.account_id = :account_id'

    if enabled == 'enabled':
        data['enable'] = True
        sql_enabled = 'AND `user`.enable = :enable'
    elif enabled == 'disabled':
        data['enable'] = False
        sql_enabled = 'AND `user`.enable = :enable'
    elif enabled == 'all':
        sql_enabled = ''
    else:
        # Previously an unsupported value left sql_enabled unassigned and the
        # function crashed with UnboundLocalError while building the query.
        log.warning(f'Unsupported value for enabled: {enabled!r}')
        return False

    if limit:
        data['limit'] = limit
        sql_limit = 'LIMIT :limit'
    else:
        sql_limit = ''

    sql = (
        "\n"
        "SELECT `user`.id AS 'user_id', `user`.id_random AS 'user_id_random'\n"
        "FROM `user` AS `user`\n"
        "WHERE\n"
        f"{sql_where_account_id}\n"
        f"{sql_enabled}\n"
        "ORDER BY user.name, user.email, user.username, `user`.created_on DESC, `user`.updated_on DESC\n"
        f"{sql_limit};\n"
    )

    if user_rec_li_result := sql_select(data=data, sql=sql, as_list=True):
        user_rec_li = user_rec_li_result
    else:
        user_rec_li = []
        # NOTE: this leaves the shared logger at DEBUG after an empty result.
        log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
        log.debug(user_rec_li_result)
    return user_rec_li
# ### END ### API User Methods ### get_user_rec_list() ###