import datetime, random, secrets import urllib.parse from typing import Dict, List, Optional, Set, Union from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator from app.db_sql import redis_lookup_id_random, sql_enable_part, sql_insert, sql_limit_offset_part, sql_select, sql_update from app.lib_general import log, logging, logger_reset, secure_hash_string, send_email # from app.methods.account_methods import load_account_cfg_obj # from app.methods.contact_methods import create_contact_obj, load_contact_obj, update_contact_obj from app.methods.order_methods import load_order_obj, get_order_rec_list from app.methods.organization_methods import load_organization_obj # , update_organization_obj from app.methods.person_methods import create_person_obj_v3, load_person_obj, update_person_obj # from app.methods.post_methods import get_post_rec_list, load_post_obj from app.methods.user_role_methods import get_user_role_rec_list, load_user_role_obj from app.models.common_field_schema import default_num_bytes from app.models.user_models import User_Base, User_New_Base, User_Out_Base # ### BEGIN ### API User Methods ### create_user_obj() ### # NOTE: This will create a new user and also hash a new password string if given. 
# NOTE: This uses the User_New_Base model, not User_Base or User_Out_Base
# Updated 2022-01-06
def create_user_obj(
        account_id: int|str,
        user_dict_obj: User_New_Base,
        person_id: int = None,  # This should be required in the future
        allow_update: bool = False,  # Allow updating the user account if one is found
        avoid_dup_username: bool = False,  # Avoid creating a duplicate by modifying the supplied username
        set_default_password: bool = True,
        create_sub_obj: bool = False,
        fail_any: bool = True,  # Fail if any thing goes wrong for sub objects
        return_dict: bool = False,
    ) -> bool|dict|int:
    """Create a user record for an account, or optionally update an existing one.

    Args:
        account_id: Account ID (int or random ID); resolved via
            redis_lookup_id_random().
        user_dict_obj: A User_New_Base instance or a plain dict that validates
            as one.
        person_id: Optional person to link. When not given, the ID of the
            nested ``person`` object (if any) is used.
        allow_update: When the username already exists for the account, update
            that record instead of failing.
        avoid_dup_username: When the username already exists and updating is
            not allowed, insert with a random two digit suffix appended to the
            username.
        set_default_password: Generate a random password when no
            ``new_password`` was supplied.
        create_sub_obj: Accepted for interface compatibility; not used here.
        fail_any: Accepted for interface compatibility; not used here.
        return_dict: Accepted for interface compatibility; not used here.

    Returns:
        The user ID (new or existing) on success, ``False`` on any failure.
    """
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    # Only scalar arguments are logged: user_dict_obj may carry a plaintext password.
    log.debug(
        f'account_id={account_id!r}; person_id={person_id!r}; allow_update={allow_update}; '
        f'avoid_dup_username={avoid_dup_username}; set_default_password={set_default_password}'
    )

    # ### SECTION ### Secondary data validation
    # NOTE: Remove at future date. Is this check needed if we trust that the ID is checked ahead of time?
    account_id = redis_lookup_id_random(record_id_random=account_id, table_name='account')
    if not account_id:
        return False

    # NOTE(review): the None test is applied to the lookup *result*, exactly as
    # before; presumably the lookup returns None for a None input. Confirm.
    person_id = redis_lookup_id_random(record_id_random=person_id, table_name='person')
    if not person_id and person_id is not None:
        return False

    log.info('Create dictionary or Pydantic object')
    log.debug(type(user_dict_obj))
    if isinstance(user_dict_obj, dict):
        try:
            user_obj = User_New_Base(**user_dict_obj)
        except ValidationError as e:
            log.error(e.json())
            return False
    else:
        user_obj = user_dict_obj

    user_obj.account_id = account_id
    user_dict = user_obj.dict(
        by_alias=False,
        exclude_defaults=False,
        exclude_unset=True,
        exclude={
            'contact', 'contact_id_random', 'new_password', 'organization',
            'person', 'person_id_random', 'created_on', 'updated_on',
        },
    )

    # ### SECTION ### Process data
    user_dict['account_id'] = account_id

    if user_obj.new_password:
        log.warning('A new password was passed.')
    elif set_default_password:
        log.warning('A new password was not passed. Setting a default password.')
        user_obj.new_password = secrets.token_urlsafe(default_num_bytes)
    else:
        log.warning('A new password was not passed and not setting a default password.')

    # Hash whichever password we ended up with. Previously only the generated
    # default was hashed, so a caller supplied password was silently dropped.
    if user_obj.new_password:
        hash_string = secure_hash_string(string=user_obj.new_password)
        user_obj.password = hash_string
        user_dict['password'] = hash_string

    # Look for a person_id in the user_obj (tolerating a missing nested person)
    person_id = person_id or _person_id_from_obj(user_obj)
    if person_id:
        # Link to an existing person
        log.info(f'Adding person_id to user_dict. Person ID: {person_id}')
        user_obj.person_id = person_id  # Is this needed?
        user_dict['person_id'] = person_id

    # Never write the password hash to the log.
    log.debug({key: ('<redacted>' if key == 'password' else value) for key, value in user_dict.items()})

    username = user_dict.get('username', None)
    log.info(f'Checking if the username is already in use for the account... Account: {account_id}; Username: {username}')

    # person_id is selected so the "same person" guard below can actually work.
    sql_select_user = """
        SELECT user.id, user.id_random, user.name, user.email, user.person_id
        FROM `user` AS user
        WHERE user.account_id = :account_id and user.username = :username
    """
    existing_user = sql_select(sql=sql_select_user, data=user_dict, rm_id_random=True)

    if not existing_user:
        new_user_id = _insert_user_rec(user_dict)
        if not new_user_id:
            return False
        log.info(f'Returning the new user_id: {new_user_id}')
        return new_user_id

    if isinstance(existing_user, list):
        # Not inside an except block, so log.error (not log.exception) is the right call.
        log.error(f'Multiple user accounts already exists with this username. The database needs to be checked. Account ID: {account_id}; Username: {username}')
        return False

    user_id = existing_user.get('id', None)
    person_id_for_user_id = existing_user.get('person_id', None)
    log.info(f'A user account already exists with this username. Current User ID: {user_id}; Username: {username}; Person ID for User: {person_id_for_user_id}; Person ID: {person_id}')

    if allow_update:
        log.info(f'Updating instead of inserting. Current User ID: {user_id}; Username: {username}')
        # NOTE: Should this call the update_user_obj() function instead???
        if person_id_for_user_id == person_id or person_id_for_user_id is None:
            user_dict['id'] = user_id
            user_dict['person_id'] = person_id
            user_dict_up_result = sql_update(data=user_dict, table_name='user', rm_id_random=True)
            if not user_dict_up_result:
                log.warning(f'User not updated with new user data. User ID: {user_id}')
                log.debug(user_dict_up_result)
                return False
            log.info(f'User updated with new user data. User ID: {user_id}')
        else:
            # The existing user belongs to a different person: leave the record
            # untouched and fall through to return the existing ID, as before.
            log.warning(f'User not updated because it is linked to a different person. User ID: {user_id}; Person ID for User: {person_id_for_user_id}; Person ID: {person_id}')
    elif avoid_dup_username:
        log.info(f'Updating is not allowed. Current User ID: {user_id}; Username: {username}')
        log.info(f'Avoiding duplicate username is now allowed. Suggested Username: {username}')
        user_dict['username'] = f'{username}-{random.randint(10, 99)}'
        new_user_id = _insert_user_rec(user_dict)
        if not new_user_id:
            return False
        user_id = new_user_id
    else:
        log.info(f'Updating is not allowed. Current User ID: {user_id}; Username: {username}')
        log.warning(f'Updating is not allowed and avoid duplicate username is not allowed. Username: {username}')
        return False

    log.debug(f'Returning the existing user_id: {user_id}')
    return user_id
# ### END ### API User Methods ### create_user_obj() ###


# ### BEGIN ### API User Methods ### private helpers ###
def _person_id_from_obj(user_obj):
    """Return the ID of the nested person on user_obj, or None when there is none."""
    return getattr(getattr(user_obj, 'person', None), 'id', None)


def _insert_user_rec(user_dict: dict):
    """Insert user_dict into the user table; return the sql_insert() result or False."""
    insert_result = sql_insert(
        data=user_dict,
        table_name='user',
        rm_id_random=True,
        id_random_length=default_num_bytes,
    )
    if not insert_result:
        log.warning('User not created.')
        log.debug(insert_result)
        return False
    return insert_result


def _format_enable_datetime(value) -> str:
    """Format an enable_from/enable_to value for the email, labelled as UTC."""
    if not value:
        return '-- Not Set --'
    # NOTE(review): the stored value is assumed to already be in UTC; replace()
    # only attaches the zone so that %Z renders, it does not convert.
    aware_value = value.replace(tzinfo=datetime.timezone.utc)
    return aware_value.strftime('%A, %B %-d, %Y %-I:%M %p %Z')
# ### END ### API User Methods ### private helpers ###


# ### BEGIN ### API User Methods ### update_user_obj() ###
# Updated 2022-01-06
def update_user_obj(
        user_id: int|str,  # Ideally the int ID should be passed. This allows for updating of the id_random value.
        user_dict_obj: User_Base,
        person_id: int = None,  # This should be required in the future?
        set_default_password: bool = True,
        create_sub_obj: bool = False,
        fail_any: bool = True,  # Fail if any thing goes wrong for sub objects
        return_dict: bool = False,
    ) -> bool|int:
    """Update an existing user record.

    The fields password, super, manager, contact, organization, person,
    created_on and updated_on are never written by this function.

    Args:
        user_id: User ID (int or random ID); resolved via
            redis_lookup_id_random().
        user_dict_obj: A User_Base instance or a plain dict that validates as
            one. A dict is not modified.
        person_id: Optional person to link. When not given, the ID of the
            nested ``person`` object (if any) is used.
        set_default_password: Accepted for interface compatibility; not used.
        create_sub_obj: Accepted for interface compatibility; not used.
        fail_any: Accepted for interface compatibility; not used.
        return_dict: Accepted for interface compatibility; not used.

    Returns:
        ``True`` when the record was updated, ``False`` otherwise.
    """
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    # Only scalar arguments are logged: user_dict_obj may carry a password hash.
    log.debug(f'user_id={user_id!r}; person_id={person_id!r}')

    # ### SECTION ### Secondary data validation
    # NOTE: Remove at future date. Is this check needed if we trust that the ID is checked ahead of time?
    user_id = redis_lookup_id_random(record_id_random=user_id, table_name='user')
    if not user_id:
        return False

    # NOTE(review): the None test is applied to the lookup *result*, exactly as
    # before; presumably the lookup returns None for a None input. Confirm.
    person_id = redis_lookup_id_random(record_id_random=person_id, table_name='person')
    if not person_id and person_id is not None:
        return False

    log.info('Create dictionary or Pydantic object')
    log.debug(type(user_dict_obj))
    if isinstance(user_dict_obj, dict):
        try:
            # Build from a merged copy so the caller's dict is left untouched.
            user_obj = User_Base(**{**user_dict_obj, 'id': user_id})
        except ValidationError as e:
            log.error(e.json())
            return False
    else:
        user_obj = user_dict_obj
    user_obj.id = user_id

    # IMPORTANT NOTE: Need to be extra careful if allowing an update to password, super, or manager. Maybe other fields?
    user_dict = user_obj.dict(
        by_alias=False,
        exclude_unset=True,
        exclude={'password', 'super', 'manager', 'contact', 'organization', 'person', 'created_on', 'updated_on'},
    )

    # NOTE: Creating/updating the nested contact, organization and person
    # objects is not implemented here; create_sub_obj and fail_any are unused.

    # ### SECTION ### Process data
    user_dict['id'] = user_id

    # Look for a person_id in the user_obj (tolerating a missing nested person)
    person_id = person_id or _person_id_from_obj(user_obj)
    if person_id:
        # Link to an existing person
        log.info(f'Adding person_id to user_dict. Person ID: {person_id}')
        user_obj.person_id = person_id  # Is this needed?
        user_dict['person_id'] = person_id

    log.debug(user_dict)  # password is excluded from user_dict above

    user_dict_up_result = sql_update(data=user_dict, table_name='user', rm_id_random=True)
    if not user_dict_up_result:
        log.warning('User not updated.')
        log.debug(user_dict_up_result)
        return False

    log.debug(user_dict_up_result)
    return True
# ### END ### API User Methods ### update_user_obj() ###


# ### BEGIN ### API User Methods ### load_user_obj() ###
def load_user_obj(
        user_id: int|str,
        enabled: str = 'enabled',  # enabled, disabled, all
        limit: int = 100,
        offset: int = 0,
        by_alias: bool = True,
        exclude_unset: bool = True,
        model_as_dict: bool = False,
        inc_address: bool = False,
        inc_contact: bool = False,
        inc_event_list: bool = False,  # deprecated
        inc_order_cfg: bool = False,  # deprecated
        inc_order_line_list: bool = False,  # deprecated
        inc_order_list: bool = False,  # deprecated
        inc_order_cart_list: bool = False,  # deprecated
        inc_organization: bool = False,  # deprecated
        inc_person: bool = False,
        inc_user_role_list: bool = False,
    ) -> User_Out_Base|dict|bool:
    """Load one user from the ``v_user`` view, optionally with related objects.

    Args:
        user_id: User ID (int or random ID); resolved via
            redis_lookup_id_random().
        enabled: Passed through to the related-object loaders.
        limit: Passed through to the related-object loaders.
        offset: Accepted but not used by this function.
        by_alias, exclude_unset: Passed to the related-object loaders and to
            the final ``.dict()`` call.
        model_as_dict: Return a dict instead of a User_Out_Base instance.
        inc_*: Select which related objects are attached. ``inc_contact`` only
            logs a warning here; ``inc_order_cart_list`` is not used.

    Returns:
        The user as a User_Out_Base (or dict when ``model_as_dict``), or
        ``False`` when the ID cannot be resolved, the record is not found, or
        validation fails.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    user_id = redis_lookup_id_random(record_id_random=user_id, table_name='user')
    if not user_id:
        return False

    user_rec = sql_select(table_name='v_user', record_id=user_id)
    if not user_rec:
        return False
    log.debug(user_rec)

    try:
        user_obj = User_Out_Base(**user_rec)
        log.debug(user_obj)
    except ValidationError as e:
        log.error(e.json())
        return False

    # Updated 2021-06-18
    if inc_contact:
        # Contact loading is disabled; only the deprecation warning remains.
        log.warning('This is being deprecated? load_user_obj() inc_contact')

    if inc_event_list:
        log.warning('This is being deprecated? load_user_obj() inc_event_list')
        # Imported locally, as in the original (presumably to avoid a circular import).
        from app.methods.event_methods import load_event_obj_list
        user_obj.event_list = load_event_obj_list(
            user_id=user_id,
            limit=limit,
            model_as_dict=model_as_dict,
            enabled=enabled,
        ) or []

    # Updated 2021-06-18
    if inc_order_list:
        log.warning('This is being deprecated? load_user_obj() inc_order_list')
        order_rec_list = get_order_rec_list(
            for_obj_type='user',
            for_obj_id=user_id,
            limit=limit,
            enabled=enabled,
        )
        if order_rec_list:
            # An order that fails to load keeps its slot as None.
            user_obj.order_list = [
                load_order_obj(
                    order_id=order_rec.get('order_id', None),
                    limit=limit,
                    by_alias=by_alias,
                    exclude_unset=exclude_unset,
                    model_as_dict=model_as_dict,
                    enabled=enabled,
                    inc_order_cfg=inc_order_cfg,
                    inc_order_line_list=inc_order_line_list,
                    inc_person=inc_person,
                ) or None
                for order_rec in order_rec_list
            ]
        else:
            user_obj.order_list = []

    # Updated 2021-12-14
    if inc_organization:
        log.warning('This is being deprecated? load_user_obj() inc_organization')
        organization_id = user_rec.get('organization_id', None)
        log.debug(organization_id)
        user_obj.organization = load_organization_obj(
            organization_id=organization_id,
            limit=limit,
            by_alias=by_alias,
            exclude_unset=exclude_unset,
            model_as_dict=model_as_dict,
            enabled=enabled,
            inc_address=inc_address,
            inc_contact=inc_contact,
        ) or {}

    # Updated 2021-12-14
    if inc_person:
        person_id = user_rec.get('person_id', None)
        log.debug(person_id)
        person_result = load_person_obj(
            person_id=person_id,
            by_alias=by_alias,
            exclude_unset=exclude_unset,
            model_as_dict=model_as_dict,
            inc_address=inc_address,
            inc_contact=inc_contact,
            inc_organization=inc_organization,
        )
        user_obj.person = person_result or {}
        log.debug(person_result)

    # Updated 2021-06-25
    if inc_user_role_list:
        user_role_rec_list = get_user_role_rec_list(
            for_obj_type='user',
            for_obj_id=user_id,
            limit=limit,
            enabled=enabled,
        )
        if user_role_rec_list:
            # A role that fails to load keeps its slot as None.
            user_obj.user_role_list = [
                load_user_role_obj(
                    user_role_id=user_role_rec.get('user_role_id', None),
                    by_alias=by_alias,
                    exclude_unset=exclude_unset,
                    model_as_dict=model_as_dict,
                ) or None
                for user_role_rec in user_role_rec_list
            ]
        else:
            user_obj.user_role_list = []

    log.debug(user_obj)

    if model_as_dict:
        return user_obj.dict(by_alias=by_alias, exclude_unset=exclude_unset)  # pylint: disable=no-member
    return user_obj
# ### END ### API User Methods ### load_user_obj() ###


# ### BEGIN ### API User Methods ### get_user_rec_list() ###
# Updated 2021-12-13
@logger_reset
def get_user_rec_list(
        account_id: int|str,
        hidden: str = 'not_hidden',  # hidden, not_hidden, all
        enabled: str = 'enabled',  # enabled, disabled, all
        limit: int = 1000,
        offset: int = 0,
    ) -> list|bool:
    """Return the user ID records of an account.

    Args:
        account_id: Account ID (int or random ID); resolved via
            redis_lookup_id_random().
        hidden: Accepted but not used by this function.
        enabled: Passed to sql_enable_part() to build the enabled filter.
        limit, offset: Passed to sql_limit_offset_part().

    Returns:
        Whatever sql_select(as_list=True) returns (a list of records with
        ``user_id`` and ``user_id_random``, or an empty/false value), or
        ``False`` when the account ID cannot be resolved.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    account_id = redis_lookup_id_random(record_id_random=account_id, table_name='account')
    if not account_id:
        return False

    data = {'account_id': account_id}
    sql_where_account_id = '`user`.account_id = :account_id'

    sql_enabled, data['enable'] = sql_enable_part(table_name='user', enabled=enabled)  # Reasonably safe return str and bool
    sql_limit = sql_limit_offset_part(limit=limit, offset=offset)  # Reasonably safe return str

    sql = f"""
        SELECT `user`.id AS 'user_id', `user`.id_random AS 'user_id_random'
        FROM `user` AS `user`
        WHERE {sql_where_account_id} {sql_enabled}
        ORDER BY user.name, user.email, user.username, `user`.created_on DESC, `user`.updated_on DESC
        {sql_limit};
    """
    log.debug(sql)

    user_rec_li = sql_select(data=data, sql=sql, as_list=True)
    if user_rec_li:
        log.info('Got a list result')
    else:  # [] or False
        log.info('No results or something went wrong')
    log.debug(user_rec_li)
    return user_rec_li
# ### END ### API User Methods ### get_user_rec_list() ###


# ### BEGIN ### User Methods ### email_user_auth_key_url() ###
# This emails the actual one time use sign in URL for a user.
# Updated 2021-12-02
def email_user_auth_key_url(
        account_id: int|str,
        user_id: int|str,
        root_url: str,
    ):
    """Issue a new one time use auth key for a user and email the sign in link.

    The new key is stored on the user record before the email is sent.

    Args:
        account_id: Account ID (int or random ID); its account config supplies
            the sender, BCC and help contact details.
        user_id: User ID (int or random ID) that receives the link.
        root_url: URL prefix the query string is appended to.

    Returns:
        ``True`` when send_email() reports success, ``False`` on any failure
        (unresolved IDs, user not loaded, user disabled or auth keys not
        allowed, update failed, account config not loaded, email not sent).
    """
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    account_id = redis_lookup_id_random(record_id_random=account_id, table_name='account')
    if not account_id:
        return False

    user_id = redis_lookup_id_random(record_id_random=user_id, table_name='user')
    if not user_id:
        return False

    user_obj = load_user_obj(user_id=user_id)
    if not user_obj:
        return False
    log.info('User object loaded')

    if not (user_obj.enable and user_obj.allow_auth_key):
        log.warning('The user record was not enabled or auth_key is not allowed')
        return False

    new_auth_key = secrets.token_urlsafe(default_num_bytes)
    if not sql_update(table_name='user', record_id=user_id, data={'auth_key': new_auth_key}):
        log.warning('The user record was not updated with a new auth_key')
        return False
    log.info('The user record was updated with a new auth_key')

    log.debug(user_obj)

    from app.methods.account_cfg_methods import load_account_cfg_obj
    account_cfg = load_account_cfg_obj(account_id=account_id)
    if not account_cfg:
        return False
    log.info('Account config loaded')
    log.debug(account_cfg)

    user_id_random = user_obj.id_random  # NOTE: Not user_id_random because of alias

    from_email = account_cfg.default_no_reply_email
    from_name = account_cfg.default_no_reply_name
    to_name = user_obj.name
    to_email = user_obj.email
    bcc_email = account_cfg.confirm_email
    bcc_name = account_cfg.confirm_name

    help_tech_email = account_cfg.help_tech_email
    help_tech_name = account_cfg.help_tech_name

    account_short_name = account_cfg.account_short_name

    username = user_obj.username
    enable = user_obj.enable
    # Both bounds are formatted the same way; previously enable_from lost its
    # UTC label because the aware value was overwritten with the naive one.
    enable_from_str = _format_enable_datetime(user_obj.enable_from)
    enable_to_str = _format_enable_datetime(user_obj.enable_to)

    user_login_auth_key_url = f'{root_url}?user_id={urllib.parse.quote(user_id_random)}&auth_key={urllib.parse.quote(new_auth_key)}&valid_email={True}'

    subject = f'{account_short_name}: One Time Use Sign In Link ({new_auth_key})'
    # NOTE(review): this text is sent as body_html but shows no HTML markup
    # here; confirm the template's tags were not lost before it reached this file.
    body_html = f"""
{to_name},
If you did not request this sign in link, please delete this email. It is suggested that you delete this email after the sign in link has been used or if a new link has been requested.
The link below can only be used to sign in once. If you would like to sign in again using this method, you must request a new sign in link. If you request multiple links, only the newest link will sign you in.
Click to Sign In With One Time Use Link
Or copy and paste the link:
{user_login_auth_key_url}
Your username is: {username}
User account enabled: {enable}
User account enabled from: {enable_from_str}
User account enabled to: {enable_to_str}
Current one time use auth key for user account: {new_auth_key}
If you have questions about this email or trouble with this one time use link, you can email {help_tech_name} ({help_tech_email}).
Thank you!
"""

    log.info('Trying send_email()...')
    email_sent = send_email(
        from_email=from_email,
        from_name=from_name,
        to_email=to_email,
        to_name=to_name,
        bcc_email=bcc_email,
        bcc_name=bcc_name,
        subject=subject,
        body_text=None,
        body_html=body_html,
    )
    if email_sent:
        log.info(f'An email with a one time use sign in link was sent to {to_email}.')
        return True

    log.info(f'An email with a one time use sign in link was not sent to {to_email}.')
    return False
# ### END ### User Methods ### email_user_auth_key_url() ###