Code clean up. Bug fixes for person, user, contact, and address methods
This commit is contained in:
@@ -6,14 +6,14 @@ from typing import Dict, List, Optional, Set, Union
|
||||
from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator
|
||||
|
||||
from app.db_sql import redis_lookup_id_random, sql_insert, sql_select, sql_update
|
||||
from app.lib_general import log, logging, logger_reset, send_email
|
||||
from app.lib_general import log, logging, logger_reset, secure_hash_string, send_email
|
||||
|
||||
# from app.methods.account_methods import load_account_cfg_obj
|
||||
from app.methods.contact_methods import load_contact_obj, update_contact_obj
|
||||
# from app.methods.contact_methods import create_contact_obj, load_contact_obj, update_contact_obj
|
||||
from app.methods.order_methods import load_order_obj, get_order_rec_list
|
||||
from app.methods.organization_methods import load_organization_obj, update_organization_obj
|
||||
from app.methods.organization_methods import load_organization_obj # , update_organization_obj
|
||||
from app.methods.person_methods import create_person_obj_v3, load_person_obj, update_person_obj
|
||||
from app.methods.post_methods import get_post_rec_list, load_post_obj
|
||||
# from app.methods.post_methods import get_post_rec_list, load_post_obj
|
||||
from app.methods.user_role_methods import get_user_role_rec_list, load_user_role_obj
|
||||
|
||||
from app.models.common_field_schema import default_num_bytes
|
||||
@@ -34,7 +34,7 @@ def create_user_obj(
|
||||
fail_any: bool = True, # Fail if any thing goes wrong for sub objects
|
||||
return_dict: bool = False,
|
||||
) -> bool|dict|int:
|
||||
log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
||||
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
||||
log.debug(locals())
|
||||
|
||||
# ### SECTION ### Secondary data validation
|
||||
@@ -42,6 +42,10 @@ def create_user_obj(
|
||||
if account_id := redis_lookup_id_random(record_id_random=account_id, table_name='account'): pass
|
||||
else: return False
|
||||
|
||||
if person_id := redis_lookup_id_random(record_id_random=person_id, table_name='person'): pass
|
||||
elif person_id is None: pass
|
||||
else: return False
|
||||
|
||||
log.info('Create dictionary or Pydantic object')
|
||||
log.debug(type(user_dict_obj))
|
||||
if isinstance(user_dict_obj, dict):
|
||||
@@ -56,28 +60,31 @@ def create_user_obj(
|
||||
user_obj = user_dict_obj
|
||||
user_obj.account_id = account_id
|
||||
|
||||
user_dict = user_obj.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'contact', 'contact_id', 'contact_id_random', 'email', 'cc_email', 'membership_person_id', 'membership_person_id_random', 'new_password', 'organization', 'person', 'user', 'created_on', 'updated_on'})
|
||||
|
||||
# log.debug(type(user_dict_obj))
|
||||
# if isinstance(user_dict_obj, dict):
|
||||
# user_dict_obj['account_id'] = account_id
|
||||
# try:
|
||||
# user_obj = User_New_Base(**user_dict_obj)
|
||||
# log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
||||
# log.debug(user_obj)
|
||||
# except ValidationError as e:
|
||||
# log.error(e.json())
|
||||
# return False
|
||||
|
||||
# user_dict = user_obj.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'contact', 'new_password', 'organization', 'person', 'created_on', 'updated_on'})
|
||||
# log.debug(user_dict)
|
||||
# user_dict['account_id'] = account_id
|
||||
user_dict = user_obj.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'contact', 'contact_id_random', 'new_password', 'organization', 'person', 'person_id_random', 'created_on', 'updated_on'})
|
||||
|
||||
# ### SECTION ### Process data
|
||||
user_obj.account_id = account_id # Is this needed?
|
||||
user_dict['account_id'] = account_id
|
||||
|
||||
user_dict['password'] = user_obj.password # There has to be a better way to do this??? It thinks "password" is unset and so is excluded?
|
||||
if user_obj.new_password:
|
||||
log.debug(user_obj.new_password)
|
||||
else:
|
||||
user_obj.new_password = secrets.token_urlsafe(default_num_bytes)
|
||||
hash_string = secure_hash_string(string=user_obj.new_password)
|
||||
user_obj.password = hash_string
|
||||
user_dict['password'] = hash_string
|
||||
|
||||
log.debug(user_obj.new_password)
|
||||
|
||||
# user_dict['password'] = user_obj.password # There has to be a better way to do this??? It thinks "password" is unset and so is excluded?
|
||||
|
||||
if person_id:
|
||||
# Link to an existing person
|
||||
log.info(f'Adding person_id to user_dict. User ID: {person_id}')
|
||||
user_obj.person_id = person_id # Is this needed?
|
||||
user_dict['person_id'] = person_id
|
||||
|
||||
log.debug(user_obj)
|
||||
log.debug(user_dict)
|
||||
|
||||
# if account_id := redis_lookup_id_random(record_id_random=account_id, table_name='account'): pass
|
||||
@@ -87,7 +94,7 @@ def create_user_obj(
|
||||
# account_id = user_dict.get('account_id', None)
|
||||
username = user_dict.get('username', None)
|
||||
|
||||
log.info(f'Checking if the username is already in use for the account... Account: {account_id} Username: {username}')
|
||||
log.info(f'Checking if the username is already in use for the account... Account: {account_id}; Username: {username}')
|
||||
sql_select_user = f"""
|
||||
SELECT user.id, user.id_random, user.name, user.email
|
||||
FROM `user` AS user
|
||||
@@ -96,23 +103,27 @@ def create_user_obj(
|
||||
|
||||
if sql_select_user_result := sql_select(sql=sql_select_user, data=user_dict, rm_id_random=True):
|
||||
if isinstance(sql_select_user_result, list):
|
||||
log.exception(f'Multiple user accounts already exists with this username. The database needs to be checked. Account ID: {account_id} Username: {username}')
|
||||
log.exception(f'Multiple user accounts already exists with this username. The database needs to be checked. Account ID: {account_id}; Username: {username}')
|
||||
return False
|
||||
user_id = sql_select_user_result.get('id', None)
|
||||
log.info('A user account already exists with this username. Current User ID: {user_id} Username: {username}')
|
||||
person_id_for_user_id = sql_select_user_result.get('person_id', None)
|
||||
log.info(f'A user account already exists with this username. Current User ID: {user_id}; Username: {username}; Person ID for User: {person_id_for_user_id}; Person ID: {person_id}')
|
||||
if allow_update:
|
||||
log.info('Updating instead of inserting. Current User ID: {user_id} Username: {username}')
|
||||
user_dict['id'] = user_id
|
||||
if user_dict_up_result := sql_update(data=user_dict, table_name='user', rm_id_random=True):
|
||||
log.info(f'User updated with new user data. User ID: {user_id}')
|
||||
else:
|
||||
log.warning(f'User not updated with new user data. User ID: {user_id}')
|
||||
log.debug(user_dict_up_result)
|
||||
return False
|
||||
log.info(f'Updating instead of inserting. Current User ID: {user_id}; Username: {username}')
|
||||
# NOTE: Should this call the update_user_obj() function instead??? NOTE NOTE NOTE NOTE
|
||||
if person_id_for_user_id == person_id or person_id_for_user_id is None:
|
||||
user_dict['id'] = user_id
|
||||
user_dict['person_id'] = person_id
|
||||
if user_dict_up_result := sql_update(data=user_dict, table_name='user', rm_id_random=True):
|
||||
log.info(f'User updated with new user data. User ID: {user_id}')
|
||||
else:
|
||||
log.warning(f'User not updated with new user data. User ID: {user_id}')
|
||||
log.debug(user_dict_up_result)
|
||||
return False
|
||||
else:
|
||||
log.info('Updating is now allowed. Current User ID: {user_id} Username: {username}')
|
||||
log.info(f'Updating is now allowed. Current User ID: {user_id}; Username: {username}')
|
||||
if avoid_dup_username:
|
||||
log.info('Avoiding duplicate username is now allowed. Suggested Username: {username}')
|
||||
log.info(f'Avoiding duplicate username is now allowed. Suggested Username: {username}')
|
||||
new_username = username+'-'+str(random.randint(10, 99))
|
||||
user_dict['username'] = new_username
|
||||
if user_dict_in_result := sql_insert(data=user_dict, table_name='user', rm_id_random=True, id_random_length=8): pass
|
||||
@@ -160,6 +171,10 @@ def update_user_obj(
|
||||
if user_id := redis_lookup_id_random(record_id_random=user_id, table_name='user'): pass
|
||||
else: return False
|
||||
|
||||
if person_id := redis_lookup_id_random(record_id_random=person_id, table_name='person'): pass
|
||||
elif person_id is None: pass
|
||||
else: return False
|
||||
|
||||
log.info('Create dictionary or Pydantic object')
|
||||
log.debug(type(user_dict_obj))
|
||||
if isinstance(user_dict_obj, dict):
|
||||
@@ -209,7 +224,7 @@ def update_user_obj(
|
||||
# log.debug(contact_obj_up_result)
|
||||
# return False
|
||||
# elif user_obj.contact and not user_obj.contact.id:
|
||||
# # NOTE: This will blindly create a new contact even if there was one associated but the user.contact_id was not found.
|
||||
# # NOTE: This will blindly create a new contact even if there was one associated but the user_obj.contact_id was not found.
|
||||
# contact_obj_in = user_obj.contact
|
||||
# log.debug(contact_obj_in)
|
||||
# if contact_obj_in_result := create_contact_obj(contact_dict_obj=contact_obj_in):
|
||||
@@ -256,7 +271,7 @@ def update_user_obj(
|
||||
# log.debug(person_obj_up_result)
|
||||
# return False
|
||||
# elif user_obj.person and not user_obj.person.id:
|
||||
# # NOTE: This will blindly create a new person even if there was one associated but the user.person_id was not found.
|
||||
# # NOTE: This will blindly create a new person even if there was one associated but the user_obj.person_id was not found.
|
||||
# person_obj_in = user_obj.person
|
||||
# log.debug(person_obj_in)
|
||||
# if person_obj_in_result := create_person_obj_v3(person_obj_new=person_obj_in):
|
||||
@@ -276,12 +291,25 @@ def update_user_obj(
|
||||
user_obj.id = user_id # Is this needed?
|
||||
user_dict['id'] = user_id
|
||||
|
||||
if user_obj.new_password:
|
||||
log.debug(user_obj.new_password)
|
||||
else:
|
||||
user_obj.new_password = secrets.token_urlsafe(default_num_bytes)
|
||||
hash_string = secure_hash_string(string=user_obj.new_password)
|
||||
user_obj.password = hash_string
|
||||
user_dict['password'] = hash_string
|
||||
|
||||
log.debug(user_obj.new_password)
|
||||
|
||||
if person_id:
|
||||
# Link to an existing person
|
||||
log.info(f'Adding person_id to person_dict. Person ID: {person_id}')
|
||||
user_obj.person_id = person_id # Is this needed?
|
||||
user_dict['person_id'] = person_id
|
||||
|
||||
log.debug(user_obj)
|
||||
log.debug(user_dict)
|
||||
|
||||
if user_dict_up_result := sql_update(data=user_dict, table_name='user', rm_id_random=True): pass
|
||||
else:
|
||||
log.warning(f'User not updated.')
|
||||
|
||||
Reference in New Issue
Block a user