from __future__ import annotations import datetime from typing import Dict, List, Optional, Set, Union from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator from app.db_sql import redis_lookup_id_random, sql_insert, sql_select, sql_update from app.lib_general import log, logging from app.methods.contact_methods import load_contact_obj, update_contact_obj # from app.methods.event_methods import get_event_rec_list from app.methods.order_methods import load_order_obj, get_order_rec_list from app.methods.organization_methods import load_organization_obj, update_organization_obj from app.methods.person_methods import load_person_obj, update_person_obj from app.methods.post_methods import get_post_rec_list, load_post_obj from app.methods.user_role_methods import get_user_role_rec_list, load_user_role_obj from app.models.user_models import User_Base, User_New_Base, User_Out_Base # ### BEGIN ### API User Methods ### create_user_obj() ### def create_user_obj(user_obj_new:User_New_Base) -> int|bool: log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) if not user_obj_new: return False # user_obj_data = user_obj_new.dict(by_alias=False, exclude_defaults=False, include={'password'}, exclude={'new_password'}) # log.debug(user_obj_data) user_obj_data = user_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'new_password'}) log.debug(user_obj_data) user_obj_data['password'] = user_obj_new.password # There has to be a better way to do this??? It thinks "password" is unset and so is excluded? log.debug(user_obj_data) log.info('Checking if the username is already in use for the account...') sql_select_user = f""" SELECT user.id, user.id_random, user.name, user.email FROM `user` AS user WHERE user.account_id = :account_id and user.username = :username """ if sql_select_result := sql_select(sql=sql_select_user, data=user_obj_data, rm_id_random=True): if isinstance(sql_select_result, list): log.exception(f"Multiple user accounts already exists with this username ({user_obj_data['username']}). The database needs to be checked.") return False log.info('A user account already exists with this username. Updating instead of inserting.') user_id = sql_select_result.get('id', None) user_obj_data['id'] = user_id if user_obj_up_result := sql_update(data=user_obj_data, table_name='user', rm_id_random=True): pass else: return False #log.setLevel(logging.DEBUG) log.debug(user_obj_up_result) log.debug(f'Returning the existing user_id: {user_id}') else: if user_obj_in_result := sql_insert(data=user_obj_data, table_name='user', rm_id_random=True, id_random_length=8): pass else: return False #log.setLevel(logging.DEBUG) log.debug(user_obj_in_result) user_id = user_obj_in_result log.debug(f'Returning the new user_id: {user_id}') return user_id # ### END ### API User Methods ### create_user_obj() ### # ### BEGIN ### API User Methods ### load_user_obj() ### def load_user_obj( user_id: int|str, limit: int = 1000, by_alias: bool = True, exclude_unset: bool = True, model_as_dict: bool = False, enabled: str = 'enabled', # enabled, disabled, all inc_address: bool = False, # inc_archive_list: bool = False, inc_contact: bool = False, inc_event_list: bool = False, # inc_hosted_file_list: bool = False, inc_journal_list: bool = False, inc_journal_entry_list: bool = False, inc_membership_member: bool = False, inc_order_cfg: bool = False, inc_order_line_list: bool = False, inc_order_list: bool = False, inc_order_cart_list: bool = False, inc_organization: bool = False, inc_person: bool = False, # inc_person_list: bool = False, inc_post_list: bool = False, inc_post_comment_list: bool = False, inc_user_role_list: bool = False, ) -> User_Out_Base|bool: # log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) if user_id := redis_lookup_id_random(record_id_random=user_id, table_name='user'): pass else: return False if user_rec := sql_select(table_name='v_user', record_id=user_id): pass else: return False # log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(user_rec) try: user_obj = User_Out_Base(**user_rec) log.debug(user_obj) except ValidationError as e: log.error(e.json()) return False # Updated 2021-06-18 # if inc_contact: # contact_id = user_rec.get('contact_id', None) # log.debug(contact_id) # if contact_result := load_contact_obj( # contact_id = contact_id, # limit = limit, # by_alias = by_alias, # exclude_unset = exclude_unset, # model_as_dict = model_as_dict, # enabled = enabled, # inc_address = inc_address, # ): # user_obj.contact = contact_result # else: user_obj.contact = None if inc_event_list: from app.methods.event_methods import load_event_obj_list if event_dict_list := load_event_obj_list( user_id = user_id, limit = limit, model_as_dict = model_as_dict, enabled = enabled, ): user_obj.event_list = event_dict_list else: user_obj.event_list = [] # Updated 2021-06-18 if inc_order_list: if order_rec_list_result := get_order_rec_list( for_obj_type = 'user', for_obj_id = user_id, limit = limit, enabled = enabled, ): order_result_list = [] for order_rec in order_rec_list_result: if load_order_result := load_order_obj( order_id = order_rec.get('order_id', None), limit = limit, by_alias = by_alias, exclude_unset = exclude_unset, model_as_dict = model_as_dict, enabled = enabled, inc_order_cfg = inc_order_cfg, inc_order_line_list = inc_order_line_list, inc_person = inc_person, # inc_user = inc_user, ): order_result_list.append(load_order_result) else: order_result_list.append(None) user_obj.order_list = order_result_list else: user_obj.order_list = [] # Updated 2021-06-18 if inc_organization: organization_id = user_rec.get('organization_id', None) log.debug(organization_id) if organization_result := load_organization_obj( organization_id = organization_id, limit = limit, by_alias = by_alias, exclude_unset = exclude_unset, model_as_dict = model_as_dict, enabled = enabled, inc_address = inc_address, inc_contact = inc_contact, ): user_obj.organization = organization_result else: user_obj.organization = None # Updated 2021-06-18 if inc_person: person_id = user_rec.get('person_id', None) log.debug(person_id) if person_result := load_person_obj( person_id = person_id, by_alias = by_alias, exclude_unset = exclude_unset, model_as_dict = model_as_dict, inc_address = inc_address, inc_contact = inc_contact, inc_organization = inc_organization, ): user_obj.person = person_result else: user_obj.person = None log.debug(person_result) # Updated 2021-06-18 if inc_post_list: if post_rec_list_result := get_post_rec_list( for_obj_type = 'user', for_obj_id = user_id, limit = limit, enabled = enabled, ): post_result_list = [] for post_rec in post_rec_list_result: if load_post_result := load_post_obj( post_id = post_rec.get('post_id', None), limit = limit, by_alias = by_alias, exclude_unset = exclude_unset, model_as_dict = model_as_dict, enabled = enabled, inc_post_comment_list = inc_post_comment_list, inc_person = inc_person, # inc_user = inc_user, ): post_result_list.append(load_post_result) else: post_result_list.append(None) user_obj.post_list = post_result_list else: user_obj.post_list = [] # Updated 2021-06-25 if inc_user_role_list: log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL if user_role_rec_list_result := get_user_role_rec_list( for_obj_type = 'user', for_obj_id = user_id, limit = limit, enabled = enabled, ): user_role_result_list = [] log.debug(user_role_rec_list_result) for user_role_rec in user_role_rec_list_result: if load_user_role_result := load_user_role_obj( user_role_id = user_role_rec.get('user_role_id', None), by_alias = by_alias, exclude_unset = exclude_unset, model_as_dict = model_as_dict, ): user_role_result_list.append(load_user_role_result) else: user_role_result_list.append(None) user_obj.user_role_list = user_role_result_list else: user_obj.user_role_list = [] # if role_rec_li := sql_select(table_name='v_user_role_detail', field_name='user_id', field_value=user_id, as_list=True): # # user_rec['user_role_list'] = role_rec_li # user_obj.user_role_list = role_rec_li # else: # # user_rec['user_role_list'] = None # user_obj.user_role_list = None # log.debug(user_rec) log.debug(user_obj) if model_as_dict: return user_obj.dict(by_alias=by_alias, exclude_unset=exclude_unset) # pylint: disable=no-member else: return user_obj # ### END ### API User Methods ### load_user_obj() ### # ### BEGIN ### API User Methods ### update_user_obj() ### def update_user_obj( user_id: int|str, # Ideally the int ID should be passed. This allows for updating of the id_random value. user_obj_up: User_Base, create_missing_obj: bool = False, ) -> bool: log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) if user_id := redis_lookup_id_random(record_id_random=user_id, table_name='user'): pass else: return False user_obj_up.id = user_id log.debug(user_obj_up) # log.debug(user_obj_up.dict(by_alias=True, exclude_unset=True)) log.debug(user_obj_up.dict(by_alias=False, exclude_unset=True)) # log.debug(user_obj_up.dict(by_alias=False, exclude_unset=False)) # if user_obj_up.contact_id and user_obj_up.contact: # contact_id = user_obj_up.contact_id # contact_obj_up = user_obj_up.contact # log.debug(contact_id) # log.debug(contact_obj_up) # if contact_obj_up_result := update_contact_obj( # contact_id=contact_id, # contact_obj_up=contact_obj_up, # create_missing_obj=create_missing_obj, # ): # log.debug(contact_obj_up_result) # else: # log.debug(contact_obj_up_result) # return False # elif user_obj_up.contact and not user_obj_up.contact.id: # # NOTE: This will blindly create a new contact even if there was one associated but the user.contact_id was not found. # contact_obj_in = user_obj_up.contact # log.debug(contact_obj_in) # if contact_obj_in_result := create_contact_obj(contact_obj_new=contact_obj_in): # # log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL # log.debug(contact_obj_in_result) # user_obj_up.contact_id = contact_obj_in_result # else: # # log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL # log.debug(contact_obj_in_result) # return False # if organization_obj_update := user_obj_up.organization: # log.debug(organization_obj_update) # if organization_obj_up_result := update_organization_obj(organization_obj_update): # log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL # log.debug(organization_obj_up_result) # return True # else: # log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL # log.debug(organization_obj_up_result) # return False # else: # if organization_obj_in_result := insert_organization_obj(organization_obj_insert): # log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL # log.debug(organization_obj_in_result) # return True # NOTE: This needs to return the new organization ID # else: # log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL # log.debug(organization_obj_in_result) # return False # if user_obj_up.person_id and user_obj_up.person: # person_id = user_obj_up.person_id # person_obj_up = user_obj_up.person # log.debug(person_id) # log.debug(person_obj_up) # if person_obj_up_result := update_person_obj( # person_id=person_id, # person_obj_up=person_obj_up, # create_missing_obj=create_missing_obj, # ): # log.debug(person_obj_up_result) # else: # log.debug(person_obj_up_result) # return False # elif user_obj_up.person and not user_obj_up.person.id: # # NOTE: This will blindly create a new person even if there was one associated but the user.person_id was not found. # person_obj_in = user_obj_up.person # log.debug(person_obj_in) # if person_obj_in_result := create_person_obj(person_obj_new=person_obj_in): # # log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL # log.debug(person_obj_in_result) # person_obj_up.person_id = person_obj_in_result # else: # # log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL # log.debug(person_obj_in_result) # return False # IMPORTANT NOTE: Need to be extra careful if allowing an update to password, super, or manager. Maybe other fields? user_dict_up = user_obj_up.dict(by_alias=False, exclude_unset=True, exclude={'password', 'super', 'manager', 'contact', 'organization', 'person'}) log.debug(user_dict_up) if user_obj_up_result := sql_update(data=user_dict_up, table_name='user', rm_id_random=True): log.debug(user_obj_up_result) return True else: log.debug(user_obj_up_result) return False # ### END ### API User Methods ### update_user_obj() ### # ### BEGIN ### API User Methods ### get_user_rec_list() ### def get_user_rec_list( for_obj_type: str, for_obj_id: str, limit: int = 1000, enabled: str = 'enabled', # enabled, disabled, all ) -> list|bool: log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) if for_obj_id := redis_lookup_id_random(record_id_random=for_obj_id, table_name='for_obj_type'): pass else: return False data = {} data[f'{for_obj_type}_id'] = for_obj_id # data['for_obj_type'] = for_obj_type sql_obj_type_id = f'`tbl`.{for_obj_type}_id = :{for_obj_type}_id' if enabled in ['enabled', 'disabled', 'all']: if enabled == 'enabled': data['enable'] = True sql_enabled = f'AND `tbl`.enable = :enable' elif enabled == 'disabled': data['enable'] = False sql_enabled = f'AND `tbl`.enable = :enable' elif enabled == 'all': sql_enabled = '' if limit: data['limit'] = limit sql_limit = f'LIMIT :limit' else: sql_limit = '' sql = f""" SELECT `tbl`.id AS 'user_id', `tbl`.id_random AS 'user_id_random' FROM `user` AS `tbl` WHERE {sql_obj_type_id} {sql_enabled} ORDER BY `tbl`.created_on DESC, `tbl`.updated_on DESC {sql_limit}; """ if user_rec_li_result := sql_select(data=data, sql=sql, as_list=True): user_rec_li = user_rec_li_result else: user_rec_li = [] log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(user_rec_li_result) return user_rec_li # ### END ### API User Methods ### get_user_rec_list() ###