Files
OSIT-AE-API-FastAPI/app/methods/user_methods.py

608 lines
26 KiB
Python

from __future__ import annotations
import datetime, random, secrets
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator
from app.db_sql import redis_lookup_id_random, sql_insert, sql_select, sql_update
from app.lib_general import log, logging, send_email
# from app.methods.account_methods import load_account_cfg_obj
from app.methods.contact_methods import load_contact_obj, update_contact_obj
from app.methods.order_methods import load_order_obj, get_order_rec_list
from app.methods.organization_methods import load_organization_obj, update_organization_obj
from app.methods.person_methods import create_person_obj_v3, load_person_obj, update_person_obj
from app.methods.post_methods import get_post_rec_list, load_post_obj
from app.methods.user_role_methods import get_user_role_rec_list, load_user_role_obj
from app.models.common_field_schema import default_num_bytes
from app.models.user_models import User_Base, User_New_Base, User_Out_Base
# ### BEGIN ### API User Methods ### create_user_obj() ###
# NOTE: This will create a new user and also hash a new password string if given.
# NOTE: This uses the User_New_Base model, not User_Base or User_Out_Base
# Updated 2021-08-25
# Reviewed and updated 2021-08-21
# Reviewed and updated 2021-08-10
def create_user_obj(
account_id: int|str,
user_obj_new: User_New_Base,
allow_update: bool = False, # Allow updating the user account if one is found
avoid_dup_username: bool = False, # Avoid creating a duplicate by modifying the supplied username
create_sub_obj: bool = False,
fail_any: bool = True, # Fail if any thing goes wrong for sub objects
return_dict: bool = False,
) -> bool|dict|int:
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if account_id := redis_lookup_id_random(record_id_random=account_id, table_name='account'): pass
else: return False
log.debug(type(user_obj_new))
if isinstance(user_obj_new, dict):
user_obj_new['account_id'] = account_id
try:
user_obj_new = User_New_Base(**user_obj_new)
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(user_obj_new)
except ValidationError as e:
log.error(e.json())
return False
user_obj_data = user_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'contact', 'new_password', 'organization', 'person', 'created_on', 'updated_on'})
log.debug(user_obj_data)
user_obj_data['account_id'] = account_id
user_obj_data['password'] = user_obj_new.password # There has to be a better way to do this??? It thinks "password" is unset and so is excluded?
log.debug(user_obj_data)
if account_id := redis_lookup_id_random(record_id_random=account_id, table_name='account'): pass
elif account_id := user_obj_data.get('account_id', None): pass
else: return False
# account_id = user_obj_data.get('account_id', None)
username = user_obj_data.get('username', None)
log.info(f'Checking if the username is already in use for the account... Account: {account_id} Username: {username}')
sql_select_user = f"""
SELECT user.id, user.id_random, user.name, user.email
FROM `user` AS user
WHERE user.account_id = :account_id and user.username = :username
"""
if sql_select_result := sql_select(sql=sql_select_user, data=user_obj_data, rm_id_random=True):
if isinstance(sql_select_result, list):
log.exception(f'Multiple user accounts already exists with this username. The database needs to be checked. Account ID: {account_id} Username: {username}')
return False
user_id = sql_select_result.get('id', None)
log.info('A user account already exists with this username. Current User ID: {user_id} Username: {username}')
if allow_update:
log.info('Updating instead of inserting. Current User ID: {user_id} Username: {username}')
user_obj_data['id'] = user_id
if user_obj_up_result := sql_update(data=user_obj_data, table_name='user', rm_id_random=True):
log.info(f'User updated with new user data. User ID: {user_id}')
else:
log.warning(f'User not updated with new user data. User ID: {user_id}')
log.debug(user_obj_up_result)
return False
else:
log.info('Updating is now allowed. Current User ID: {user_id} Username: {username}')
if avoid_dup_username:
log.info('Avoiding duplicate username is now allowed. Suggested Username: {username}')
new_username = username+'-'+str(random.randint(10, 99))
user_obj_data['username'] = new_username
if user_obj_in_result := sql_insert(data=user_obj_data, table_name='user', rm_id_random=True, id_random_length=8): pass
else:
log.warning(f'User not created.')
log.debug(user_obj_in_result)
return False
user_id = user_obj_in_result
else:
log.warning(f'Updating is not allowed and avoid duplicate username is not allowed. Username: {username}')
return False
#log.setLevel(logging.DEBUG)
# log.debug(user_obj_up_result)
log.debug(f'Returning the existing user_id: {user_id}')
else:
if user_obj_in_result := sql_insert(data=user_obj_data, table_name='user', rm_id_random=True, id_random_length=8): pass
else:
log.warning(f'User not created.')
log.debug(user_obj_in_result)
return False
user_id = user_obj_in_result
log.debug(f'Returning the new user_id: {user_id}')
return user_id
# ### END ### API User Methods ### create_user_obj() ###
# ### BEGIN ### API User Methods ### update_user_obj() ###
# Updated 2021-08-25
def update_user_obj(
user_id: int|str, # Ideally the int ID should be passed. This allows for updating of the id_random value.
user_obj_up: User_Base,
create_sub_obj: bool = False,
fail_any: bool = True, # Fail if any thing goes wrong for sub objects
return_dict: bool = False,
) -> bool|int:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if user_id := redis_lookup_id_random(record_id_random=user_id, table_name='user'): pass
else: return False
log.debug(type(user_obj_up))
if isinstance(user_obj_up, dict):
try:
user_obj_up = User_Base(**user_obj_up)
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(user_obj_up)
except ValidationError as e:
log.error(e.json())
return False
user_obj_up.id = user_id
log.debug(user_obj_up)
# log.debug(user_obj_up.dict(by_alias=True, exclude_unset=True))
log.debug(user_obj_up.dict(by_alias=False, exclude_unset=True))
# log.debug(user_obj_up.dict(by_alias=False, exclude_unset=False))
# if user_obj_up.contact_id and user_obj_up.contact:
# contact_id = user_obj_up.contact_id
# contact_obj_up = user_obj_up.contact
# log.debug(contact_id)
# log.debug(contact_obj_up)
# if contact_obj_up_result := update_contact_obj(
# contact_id=contact_id,
# contact_obj_up=contact_obj_up,
# create_sub_obj=create_sub_obj,
# ):
# log.debug(contact_obj_up_result)
# else:
# log.debug(contact_obj_up_result)
# return False
# elif user_obj_up.contact and not user_obj_up.contact.id:
# # NOTE: This will blindly create a new contact even if there was one associated but the user.contact_id was not found.
# contact_obj_in = user_obj_up.contact
# log.debug(contact_obj_in)
# if contact_obj_in_result := create_contact_obj(contact_obj_new=contact_obj_in):
# # log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
# log.debug(contact_obj_in_result)
# user_obj_up.contact_id = contact_obj_in_result
# else:
# # log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
# log.debug(contact_obj_in_result)
# return False
# if organization_obj_update := user_obj_up.organization:
# log.debug(organization_obj_update)
# if organization_obj_up_result := update_organization_obj(organization_obj_update):
# log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
# log.debug(organization_obj_up_result)
# return True
# else:
# log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
# log.debug(organization_obj_up_result)
# return False
# else:
# if organization_obj_in_result := insert_organization_obj(organization_obj_insert):
# log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
# log.debug(organization_obj_in_result)
# return True # NOTE: This needs to return the new organization ID
# else:
# log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
# log.debug(organization_obj_in_result)
# return False
# if user_obj_up.person_id and user_obj_up.person:
# person_id = user_obj_up.person_id
# person_obj_up = user_obj_up.person
# log.debug(person_id)
# log.debug(person_obj_up)
# if person_obj_up_result := update_person_obj(
# person_id=person_id,
# person_obj_up=person_obj_up,
# create_sub_obj=create_sub_obj,
# ):
# log.debug(person_obj_up_result)
# else:
# log.debug(person_obj_up_result)
# return False
# elif user_obj_up.person and not user_obj_up.person.id:
# # NOTE: This will blindly create a new person even if there was one associated but the user.person_id was not found.
# person_obj_in = user_obj_up.person
# log.debug(person_obj_in)
# if person_obj_in_result := create_person_obj_v3(person_obj_new=person_obj_in):
# # log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
# log.debug(person_obj_in_result)
# person_obj_up.person_id = person_obj_in_result
# else:
# # log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
# log.debug(person_obj_in_result)
# return False
# IMPORTANT NOTE: Need to be extra careful if allowing an update to password, super, or manager. Maybe other fields?
user_dict_up = user_obj_up.dict(by_alias=False, exclude_unset=True, exclude={'password', 'super', 'manager', 'contact', 'organization', 'person'})
log.debug(user_dict_up)
if user_obj_up_result := sql_update(data=user_dict_up, table_name='user', rm_id_random=True):
log.debug(user_obj_up_result)
return True
# return user_id
else:
log.debug(user_obj_up_result)
return False
# ### END ### API User Methods ### update_user_obj() ###
# ### BEGIN ### API User Methods ### load_user_obj() ###
def load_user_obj(
user_id: int|str,
limit: int = 1000,
by_alias: bool = True,
exclude_unset: bool = True,
model_as_dict: bool = False,
enabled: str = 'enabled', # enabled, disabled, all
inc_address: bool = False,
# inc_archive_list: bool = False, # deprecated
inc_contact: bool = False,
inc_event_list: bool = False, # deprecated
# inc_hosted_file_list: bool = False, # deprecated
# inc_journal_list: bool = False, # deprecated
# inc_journal_entry_list: bool = False, # deprecated
# inc_membership_person: bool = False, # deprecated
inc_order_cfg: bool = False, # deprecated
inc_order_line_list: bool = False, # deprecated
inc_order_list: bool = False, # deprecated
inc_order_cart_list: bool = False, # deprecated
inc_organization: bool = False, # deprecated
inc_person: bool = False,
# inc_person_list: bool = False, # deprecated
# inc_post_list: bool = False, # deprecated
# inc_post_comment_list: bool = False, # deprecated
inc_user_role_list: bool = False,
) -> User_Out_Base|dict|bool:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if user_id := redis_lookup_id_random(record_id_random=user_id, table_name='user'): pass
else: return False
if user_rec := sql_select(table_name='v_user', record_id=user_id): pass
else: return False
log.debug(user_rec)
try:
user_obj = User_Out_Base(**user_rec)
log.debug(user_obj)
except ValidationError as e:
log.error(e.json())
return False
# Updated 2021-06-18
if inc_contact:
log.warning(f'This is being deprecated? load_user_obj() inc_contact')
# contact_id = user_rec.get('contact_id', None)
# log.debug(contact_id)
# if contact_result := load_contact_obj(
# contact_id = contact_id,
# limit = limit,
# by_alias = by_alias,
# exclude_unset = exclude_unset,
# model_as_dict = model_as_dict,
# enabled = enabled,
# inc_address = inc_address,
# ):
# user_obj.contact = contact_result
# else: user_obj.contact = None
if inc_event_list:
log.warning(f'This is being deprecated? load_user_obj() inc_event_list')
from app.methods.event_methods import load_event_obj_list
if event_dict_list := load_event_obj_list(
user_id = user_id,
limit = limit,
model_as_dict = model_as_dict,
enabled = enabled,
):
user_obj.event_list = event_dict_list
else: user_obj.event_list = []
# Updated 2021-06-18
if inc_order_list:
log.warning(f'This is being deprecated? load_user_obj() inc_order_list')
if order_rec_list_result := get_order_rec_list(
for_obj_type = 'user',
for_obj_id = user_id,
limit = limit,
enabled = enabled,
):
order_result_list = []
for order_rec in order_rec_list_result:
if load_order_result := load_order_obj(
order_id = order_rec.get('order_id', None),
limit = limit,
by_alias = by_alias,
exclude_unset = exclude_unset,
model_as_dict = model_as_dict,
enabled = enabled,
inc_order_cfg = inc_order_cfg,
inc_order_line_list = inc_order_line_list,
inc_person = inc_person,
):
order_result_list.append(load_order_result)
else: order_result_list.append(None)
user_obj.order_list = order_result_list
else: user_obj.order_list = []
# Updated 2021-06-18
if inc_organization:
log.warning(f'This is being deprecated? load_user_obj() inc_organization')
organization_id = user_rec.get('organization_id', None)
log.debug(organization_id)
if organization_result := load_organization_obj(
organization_id = organization_id,
limit = limit,
by_alias = by_alias,
exclude_unset = exclude_unset,
model_as_dict = model_as_dict,
enabled = enabled,
inc_address = inc_address,
inc_contact = inc_contact,
):
user_obj.organization = organization_result
else: user_obj.organization = None
# Updated 2021-06-18
if inc_person:
person_id = user_rec.get('person_id', None)
log.debug(person_id)
if person_result := load_person_obj(
person_id = person_id,
by_alias = by_alias,
exclude_unset = exclude_unset,
model_as_dict = model_as_dict,
inc_address = inc_address,
inc_contact = inc_contact,
inc_organization = inc_organization,
):
user_obj.person = person_result
else: user_obj.person = None
log.debug(person_result)
# Updated 2021-06-18
# if inc_post_list:
# log.warning(f'This is being deprecated? load_user_obj() inc_post_list')
# if post_rec_list_result := get_post_rec_list(
# for_obj_type = 'user',
# for_obj_id = user_id,
# limit = limit,
# enabled = enabled,
# ):
# post_result_list = []
# for post_rec in post_rec_list_result:
# if load_post_result := load_post_obj(
# post_id = post_rec.get('post_id', None),
# limit = limit,
# by_alias = by_alias,
# exclude_unset = exclude_unset,
# model_as_dict = model_as_dict,
# enabled = enabled,
# inc_post_comment_list = inc_post_comment_list,
# inc_person = inc_person,
# # inc_user = inc_user,
# ):
# post_result_list.append(load_post_result)
# else: post_result_list.append(None)
# user_obj.post_list = post_result_list
# else: user_obj.post_list = []
# Updated 2021-06-25
if inc_user_role_list:
if user_role_rec_list_result := get_user_role_rec_list(
for_obj_type = 'user',
for_obj_id = user_id,
limit = limit,
enabled = enabled,
):
user_role_result_list = []
for user_role_rec in user_role_rec_list_result:
if load_user_role_result := load_user_role_obj(
user_role_id = user_role_rec.get('user_role_id', None),
by_alias = by_alias,
exclude_unset = exclude_unset,
model_as_dict = model_as_dict,
):
user_role_result_list.append(load_user_role_result)
else: user_role_result_list.append(None)
user_obj.user_role_list = user_role_result_list
else: user_obj.user_role_list = []
log.debug(user_obj)
if model_as_dict:
return user_obj.dict(by_alias=by_alias, exclude_unset=exclude_unset) # pylint: disable=no-member
else:
return user_obj
# ### END ### API User Methods ### load_user_obj() ###
# ### BEGIN ### API User Methods ### get_user_rec_list() ###
def get_user_rec_list(
account_id: int|str,
hidden: str = 'not_hidden', # hidden, not_hidden, all
enabled: str = 'enabled', # enabled, disabled, all
limit: int = 1000,
) -> list|bool:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if account_id := redis_lookup_id_random(record_id_random=account_id, table_name='account'): pass
else: return False
data = {}
data['account_id'] = account_id
sql_where_account_id = f'`user`.account_id = :account_id'
# data[f'{for_obj_type}_id'] = for_obj_id
# data['for_obj_type'] = for_obj_type
# sql_obj_type_id = f'`user`.{for_obj_type}_id = :{for_obj_type}_id'
if enabled in ['enabled', 'disabled', 'all']:
if enabled == 'enabled':
data['enable'] = True
sql_enabled = f'AND `user`.enable = :enable'
elif enabled == 'disabled':
data['enable'] = False
sql_enabled = f'AND `user`.enable = :enable'
elif enabled == 'all':
sql_enabled = ''
if limit:
data['limit'] = limit
sql_limit = f'LIMIT :limit'
else:
sql_limit = ''
sql = f"""
SELECT `user`.id AS 'user_id', `user`.id_random AS 'user_id_random'
FROM `user` AS `user`
WHERE
{sql_where_account_id}
{sql_enabled}
ORDER BY user.name, user.email, user.username, `user`.created_on DESC, `user`.updated_on DESC
{sql_limit};
"""
if user_rec_li_result := sql_select(data=data, sql=sql, as_list=True):
user_rec_li = user_rec_li_result
else:
user_rec_li = []
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(user_rec_li_result)
return user_rec_li
# ### END ### API User Methods ### get_user_rec_list() ###
# ### BEGIN ### User Methods ### email_user_auth_key_url() ###
# This emails the actual one time use sign in URL for a user.
# Updated 2021-12-02
def email_user_auth_key_url(
account_id: int|str,
user_id: int|str,
root_url: str,
):
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if account_id := redis_lookup_id_random(record_id_random=account_id, table_name='account'): pass
else: return False
if user_id := redis_lookup_id_random(record_id_random=user_id, table_name='user'): pass
else: return False
if user_obj := load_user_obj(
user_id = user_id,
):
log.info('User object loaded')
if user_obj.enable and user_obj.allow_auth_key:
new_auth_key = secrets.token_urlsafe(default_num_bytes)
update_user_data = {}
update_user_data['auth_key'] = new_auth_key
if user_rec_update_result := sql_update(
table_name = 'user',
record_id = user_id,
data = update_user_data
):
log.info('The user record was updated with a new auth_key')
else:
log.warning('The user record was not updated with a new auth_key')
return False
else:
log.warning('The user record was not enabled or auth_key is not allowed')
return False
else: return False
log.debug(user_obj)
from app.methods.account_cfg_methods import load_account_cfg_obj
if account_cfg := load_account_cfg_obj(
account_id = account_id,
):
log.info('Account config loaded')
else: return False
log.debug(account_cfg)
user_id_random = user_obj.id_random # NOTE: Not user_id_random because of alias
from_email = account_cfg.default_no_reply_email
from_name = account_cfg.default_no_reply_name
to_name = user_obj.name
to_email = user_obj.email
bcc_email = account_cfg.confirm_email
bcc_name = account_cfg.confirm_name
help_tech_email = account_cfg.help_tech_email
help_tech_name = account_cfg.help_tech_name
account_short_name = account_cfg.account_short_name
username = user_obj.username
enable = user_obj.enable
if enable_from := user_obj.enable_from:
# enable_from_datetime = datetime.datetime.fromisoformat(enable_from).replace(tzinfo=datetime.timezone.utc)
enable_from_datetime = enable_from.replace(tzinfo=datetime.timezone.utc)
enable_from_datetime = enable_from
enable_from_str = enable_from_datetime.strftime('%A, %B %-d, %Y %-I:%M %p %Z')
else: enable_from_str = '-- Not Set --'
if enable_to := user_obj.enable_to:
# enable_to_datetime = datetime.datetime.fromisoformat(enable_to).replace(tzinfo=datetime.timezone.utc)
enable_to_datetime = enable_to.replace(tzinfo=datetime.timezone.utc)
enable_to_str = enable_to_datetime.strftime('%A, %B %-d, %Y %-I:%M %p %Z')
else: enable_to_str = '-- Not Set --'
auth_key = user_obj.auth_key
user_login_url = f'{root_url}user/login?username={username}'
user_login_auth_key_url = f'{root_url}?user_id={user_id_random}&auth_key={new_auth_key}'
subject = f'{account_short_name}: One Time Use Sign In Link ({new_auth_key})'
body_html = f"""
<p>{to_name},</p>
<p>If you did not request this sign in link, please delete this email. It is suggested that you delete this email after the sign in link has been used or if a new link has been requested.</p>
<p>The link below can only be used to sign in once. If you would like to sign in again using this method, you must <a href="{user_login_url}">request a new sign in link</a>. If you request multiple links, only the newest link will sign you in.</p>
<p><strong><a href="{user_login_auth_key_url}" style="appearance: button; display: inline-block; text-align: center; text-decoration: none; padding: .2rem .4rem; border: solid thin gray; border-radius: .2rem; background-color: lightyellow; color: black; font-size: larger;">Click to Sign In With One Time Use Link</a></strong></p>
<p>Or copy and paste the link:<br>
<strong style="background-color: lightyellow; color: black; font-size: larger;"><a href="{user_login_auth_key_url}">{user_login_auth_key_url}</a></strong></p>
<p>Your username is: {username}<br>
User account enabled: {enable}<br>
User account enabled from: {enable_from_str}<br>
User account enabled to: {enable_to_str}<br>
Current one time use auth key for user account: {new_auth_key}</p>
<p>If you have questions about this email or trouble with this one time use link, you can email <a href="mailto:{help_tech_email}">{help_tech_name} ({help_tech_email})</a>.</p>
<p>Thank you!</p>
"""
if send_email(from_email=from_email, from_name=from_name, to_email=to_email, to_name=to_name, bcc_email=bcc_email, bcc_name=bcc_name, subject=subject, body_text=None, body_html=body_html):
log.info(f'An email with a one time use sign in link was sent to {to_email}.')
return True
else:
log.info(f'An email with a one time use sign in link was not sent to {to_email}.')
return False
# ### END ### User ### email_user_auth_key_url() ###