118 lines
5.0 KiB
Python
118 lines
5.0 KiB
Python
from __future__ import annotations
|
|
import datetime
|
|
|
|
from typing import Dict, List, Optional, Set, Union
|
|
from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator
|
|
|
|
from ..lib_general import *
|
|
from ..db_sql import redis_lookup_id_random, sql_insert, sql_select, sql_update
|
|
|
|
from .user_model import User_Base, User_Out_Base, User_New_Base
|
|
from .user_role_model import User_Role_Base
|
|
|
|
|
|
# ### BEGIN ### API User Methods ### create_user_obj() ###
|
|
def create_user_obj(user_obj_new:User_Base) -> int|bool:
    """Create a user row, or update the existing row with the same username.

    The model is dumped to a dict (explicitly-set fields only, without
    ``new_password``) and the ``user`` table is searched for a row matching
    the dict's ``account_id`` and ``username``. A single match is updated in
    place; no match results in an insert.

    Side effect: the level of ``log`` is set to WARNING on entry and raised
    to DEBUG just before the update/insert result is logged; it is not
    restored afterwards.

    Args:
        user_obj_new: The user model to persist.

    Returns:
        The ``id`` taken from the lookup result when an existing row was
        updated, or the value returned by ``sql_insert`` for a new row.
        ``False`` when ``user_obj_new`` is falsy, when the lookup returns a
        list (treated as duplicate usernames), or when the update/insert
        helper returns a falsy result.
    """
    log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, CRITICAL
    # NOTE(review): this and the dict dumps below may include the password
    # if the level above is ever lowered to DEBUG -- confirm that is acceptable.
    log.debug(locals())

    if not user_obj_new:
        return False

    user_obj_data = user_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'new_password'})
    log.debug(user_obj_data)

    # Per the original author's note, exclude_unset=True appears to drop
    # "password" from the dump, so it is copied over explicitly.
    user_obj_data['password'] = user_obj_new.password
    log.debug(user_obj_data)

    log.info('Checking if the username is already in use for the account...')
    sql_select_user = """
        SELECT user.id, user.id_random, user.name, user.email
        FROM `user` AS user
        WHERE user.account_id = :account_id and user.username = :username
    """

    existing_user = sql_select(sql=sql_select_user, data=user_obj_data, rm_id_random=True)

    if not existing_user:
        # No row with this username for the account: insert a new one.
        inserted_id = sql_insert(data=user_obj_data, table_name='user', rm_id_random=True, id_random_length=8)
        if not inserted_id:
            return False

        log.setLevel(logging.DEBUG)
        log.debug(inserted_id)

        log.debug(f'Returning the new user_id: {inserted_id}')
        return inserted_id

    if isinstance(existing_user, list):
        # log.error, not log.exception: there is no active exception here,
        # so log.exception would only append a meaningless "NoneType: None".
        log.error(f"Multiple user accounts already exists with this username ({user_obj_data['username']}). The database needs to be checked.")
        return False

    log.info('A user account already exists with this username. Updating instead of inserting.')
    user_id = existing_user.get('id', None)
    user_obj_data['id'] = user_id

    update_result = sql_update(data=user_obj_data, table_name='user', rm_id_random=True)
    if not update_result:
        return False

    log.setLevel(logging.DEBUG)
    log.debug(update_result)

    log.debug(f'Returning the existing user_id: {user_id}')
    return user_id
|
|
# ### END ### API User Methods ### create_user_obj() ###
|
|
|
|
|
|
# ### BEGIN ### API User Methods ### load_user_obj() ###
|
|
def load_user_obj(user_id:int|str, inc_roles:bool=False, inc_contact:bool=False, inc_organization:bool=False, inc_person:bool=False) -> User_Out_Base|bool:
    """Load a user from the ``v_user`` view and return it as ``User_Out_Base``.

    ``user_id`` is first passed through ``redis_lookup_id_random`` and the
    value it returns is used for the database reads. Related records are
    attached to the user record only when the matching flag is set and the
    lookup returns a truthy result.

    Args:
        user_id: Identifier handed to ``redis_lookup_id_random``.
        inc_roles: Attach ``role_list`` from ``v_user_role_detail``.
        inc_contact: Attach ``contact`` from ``v_contact``.
        inc_organization: Attach ``organization`` from ``v_organization``.
        inc_person: Attach ``person`` from ``v_person``.

    Returns:
        The validated ``User_Out_Base`` instance, or ``False`` when the id
        lookup or the ``v_user`` select is falsy, or when model validation
        raises ``ValidationError``.
    """
    log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # Swap the caller-supplied identifier for the value the lookup returns.
    user_id = redis_lookup_id_random(record_id_random=user_id, table_name='user')
    if not user_id:
        return False

    user_rec = sql_select(table_name='v_user', record_id=user_id)
    if not user_rec:
        return False

    log.debug(user_rec)

    # Each optional section below leaves its key absent when the lookup is falsy.
    if inc_roles:
        roles = sql_select(table_name='v_user_role_detail', field_name='user_id', field_value=user_id, as_list=True)
        if roles:
            user_rec['role_list'] = roles
            log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
            log.debug(roles)

    if inc_contact:
        contact_id = user_rec.get('contact_id', None)
        contact = sql_select(table_name='v_contact', field_name='contact_id', field_value=contact_id)
        if contact:
            user_rec['contact'] = contact
            log.debug(contact)

    if inc_organization:
        organization_id = user_rec.get('organization_id', None)
        organization = sql_select(table_name='v_organization', field_name='organization_id', field_value=organization_id)
        if organization:
            user_rec['organization'] = organization
            log.debug(organization)

    if inc_person:
        person_id = user_rec.get('person_id', None)
        person = sql_select(table_name='v_person', field_name='person_id', field_value=person_id)
        if person:
            user_rec['person'] = person
            log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
            log.debug(person)

    log.debug(user_rec)

    # Validation failures are logged as JSON and reported to the caller as False.
    try:
        user_obj = User_Out_Base(**user_rec)
        log.debug(user_obj)
    except ValidationError as e:
        log.error(e.json())
        return False

    return user_obj
|
|
# ### END ### API User Methods ### load_user_obj() ###
|