# File: OSIT-AE-API-FastAPI/app/routers/user.py
# Size: 978 lines, 46 KiB
# Language: Python

import datetime, pytz, secrets, time
from fastapi import APIRouter, Body, Depends, Header, HTTPException, Path, Query, Response, status
from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union
from app.lib_general import log, logging, secure_hash_string, verify_secure_hash_string, common_route_params, Common_Route_Params
from app.config import settings
from app.db_sql import sql_insert, sql_update, sql_insert_or_update, sql_select, sql_delete, get_id_random, redis_lookup_id_random
from app.routers.api_crud import delete_obj_template, get_obj_template, get_obj_li_template, patch_obj_template, post_obj_template
from app.methods.order_methods import get_order_rec_list, load_order_obj
from app.methods.user_methods import create_user_obj, email_user_auth_key_url, get_user_rec_list, load_user_obj
from app.models.common_field_schema import default_num_bytes
from app.models.response_models import Resp_Body_Base, mk_resp
from app.models.user_models import User_Base, User_New_Base, User_Out_Base
# Router instance that the endpoint decorators below register their routes on.
router = APIRouter()
@router.post('/user', response_model=Resp_Body_Base)
async def post_user_obj(
    obj: User_Base,
    return_obj: Optional[bool] = True,
    commons: Common_Route_Params = Depends(common_route_params),
):
    # Create a user record from the posted User_Base payload by delegating to
    # the generic CRUD template, and return whatever the template returns.
    # NOTE: `return_obj` and `commons` are accepted but not forwarded; the
    # template is always called with return_obj=True, by_alias=True and
    # exclude_unset=True.
    log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # Only the fields the client actually sent, keyed by field name (not alias).
    payload = obj.dict(by_alias=False, exclude_unset=True)
    return post_obj_template(
        obj_type='user',
        data=payload,
        return_obj=True,
        by_alias=True,
        exclude_unset=True,
    )
# ### BEGIN ### API User ### post_user_obj_new() ###
# Updated 2021-08-21 (complete re-write)
@router.post('/user/new', response_model=Resp_Body_Base)
async def post_user_obj_new(
    user_obj: User_New_Base,
    allow_update: bool = False,
    avoid_dup_username: bool = False,
    commons: Common_Route_Params = Depends(common_route_params),
    return_obj: bool = True,
):
    # Create a new user for the account named in the payload.
    #
    # Parameters:
    #   user_obj            New-user payload; `account_id_random` is required.
    #   allow_update        Forwarded to create_user_obj().
    #   avoid_dup_username  Forwarded to create_user_obj().
    #   return_obj          True: respond with the loaded user object (or False
    #                       if it could not be loaded). False: respond with a
    #                       dict holding only `user_id` and `user_id_random`.
    #                       This flag was previously read without being defined,
    #                       so the success path raised NameError.
    #
    # Responds 400 when the account ID is missing, when create_user_obj()
    # returns a falsy value, or when it returns something other than an int.
    log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    account_id_random = user_obj.account_id_random
    if not account_id_random:
        # A bare `False` is not a valid Resp_Body_Base; answer with a proper 400 instead.
        return mk_resp(data=False, status_code=400, response=commons.response, status_message='The account ID is required to create a user account.')

    create_user_obj_result = create_user_obj(
        account_id=account_id_random,
        user_dict_obj=user_obj,
        allow_update=allow_update,
        avoid_dup_username=avoid_dup_username,
    )
    if not create_user_obj_result:
        return mk_resp(data=False, status_code=400, response=commons.response, status_message='The user account was not created. This is likely because that username already exists for this account.')

    if not isinstance(create_user_obj_result, int):
        return mk_resp(data=False, status_code=400, response=commons.response, status_message='The result from trying to create a user account was unexpected.')

    user_id = create_user_obj_result
    if return_obj:
        # Fall back to False when the freshly created user cannot be loaded.
        data = load_user_obj(user_id=user_id) or False
    else:
        data = {
            'user_id': user_id,
            'user_id_random': get_id_random(record_id=user_id, table_name='user'),
        }
    return mk_resp(data=data, response=commons.response, status_message='The user account was created.')
# ### END ### API User ### post_user_obj_new() ###
# ### BEGIN ### API User ### user_obj_change_password() ###
@router.patch('/user/{user_id}/change_password', response_model=Resp_Body_Base)
async def user_obj_change_password(
    user_id: Union[int,str],
    # user_obj: User_Base,
    user_dict: dict, # User_Base,
    return_obj: bool = False,
    inc_user_role_list: bool = False,
    commons: Common_Route_Params = Depends(common_route_params),
):
    # Replace a user's password with the one supplied in the request body.
    #
    # Parameters:
    #   user_id             Identifier from the path; resolved through
    #                       redis_lookup_id_random() before the update.
    #   user_dict           Request body; must contain a `password` string of
    #                       at least 10 characters.
    #   return_obj          True: respond with the reloaded user object.
    #                       False: respond with data=True.
    #   inc_user_role_list  Forwarded to load_user_obj() when return_obj is set.
    #
    # Responses: 400 (missing, non-string or too-short password), 404 (user not
    # found), 500 (the update failed), otherwise success.
    log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    password = user_dict.get('password')
    # Non-string values are rejected here so len() and the hashing helper never see them.
    if not password or not isinstance(password, str):
        return mk_resp(data=False, status_code=400, status_message='The new password is required.', response=commons.response) # Bad Request
    if len(password) < 10:
        log.warning(f'The password given must be at least 10 characters. User ID: {user_id}')
        return mk_resp(data=False, status_code=400, response=commons.response, status_message=f'The password given must be at least 10 characters. User ID: {user_id}') # Bad Request
    # NOTE: The former "generate a random password" fallback was unreachable
    # (a missing password is rejected above), so it has been removed.

    user_id = redis_lookup_id_random(record_id_random=user_id, table_name='user')
    if not user_id:
        return mk_resp(data=False, status_code=404, response=commons.response) # Not Found

    user_data = {'password': secure_hash_string(string=password)}
    user_rec_update_result = sql_update(data=user_data, table_name='user', record_id=user_id, id_random_length=None)
    if not user_rec_update_result:
        # The original built this response but never returned it, so a failed
        # update was still reported to the caller as a successful change.
        return mk_resp(data=False, status_code=500, status_message='Something went wrong while trying to update the password record.', response=commons.response)

    data = True
    if return_obj:
        loaded_user = load_user_obj(
            user_id = user_id,
            inc_user_role_list = inc_user_role_list,
        )
        if loaded_user:
            data = loaded_user.dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset)
        else:
            # The password was already changed; do not turn a failed reload into a crash.
            log.warning(f'The password was changed, but the user object could not be loaded. User ID: {user_id}')

    return mk_resp(data=data, status_message='The password has been changed.', response=commons.response)
#return mk_resp(data=None, status_code=501, response=commons.response) # Not Implemented
# ### END ### API User ### user_obj_change_password() ###
@router.patch('/user/{obj_id}', response_model=Resp_Body_Base)
async def patch_user_obj(
    obj: User_Base,
    obj_id: str = Path(min_length=11, max_length=22),
    return_obj: Optional[bool] = True,
    commons: Common_Route_Params = Depends(common_route_params),
):
    # Partially update the user identified by the random ID in the path by
    # delegating to the generic CRUD patch template.
    # NOTE: `return_obj` and `commons` are accepted but not forwarded; the
    # template is always called with return_obj=True, by_alias=True and
    # exclude_unset=True.
    log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    target_type = 'user'
    # Only the fields the client sent, plus both forms of the record identifier.
    changes = obj.dict(by_alias=False, exclude_unset=True)
    changes.update(
        id=redis_lookup_id_random(record_id_random=obj_id, table_name=target_type),
        id_random=obj_id,
    )
    return patch_obj_template(
        obj_type=target_type,
        data=changes,
        obj_id=obj_id,
        return_obj=True,
        by_alias=True,
        exclude_unset=True,
    )
# ### BEGIN ### API User Routers ### user_new_auth_key() ###
# Generate a new one time use authorization key for login without password
# Updated 2022-01-07
# @router.get('/user/new_auth_key', response_model=Resp_Body_Base)
@router.get('/user/{user_id}/new_auth_key', response_model=Resp_Body_Base)
async def user_new_auth_key(
    user_id: str = Path(min_length=11, max_length=22),
    return_obj: bool = False,
    commons: Common_Route_Params = Depends(common_route_params),
):
    # Store a freshly generated auth key on the user record and return it.
    # With return_obj=True the full user object is returned; otherwise the
    # response data is a dict containing only `auth_key`.
    # Responds 404 when the user ID cannot be resolved or the update fails.
    log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    user_id = redis_lookup_id_random(record_id_random=user_id, table_name='user')
    if not user_id:
        return mk_resp(data=None, status_code=404, response=commons.response, status_message='The user ID was invalid or not found.')

    new_auth_key = secrets.token_urlsafe(default_num_bytes)
    auth_key_update = {'id': user_id, 'auth_key': new_auth_key}
    update_result = sql_update(table_name='user', data=auth_key_update)
    if not update_result:
        log.info('The user record was not updated with a new auth_key')
        log.debug(update_result)
        return mk_resp(data=False, status_code=404, response=commons.response)

    log.info('The user record was updated with a new auth_key')
    if return_obj:
        loaded_user = load_user_obj(
            user_id = user_id,
        )
        payload = loaded_user.dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset)
    else:
        payload = {'auth_key': new_auth_key}
    return mk_resp(data=payload, response=commons.response)
# ### END ### API User Routers ### user_new_auth_key() ###
# ### BEGIN ### API User Routers ### user_authenticate() ###
# Authenticate with a username and password OR with a user ID and authorization key.
# An authorization key can only be used once. It is cleared from the user record when it is found.
# A new key needs to be requested for a particular user each time.
# NOTE: Should this be split into separate username/password and user ID/auth key endpoints? Probably yes.
# Updated 2021-10-06
@router.get('/user/authenticate', response_model=Resp_Body_Base)
async def user_authenticate(
    null_account_id: bool = False,
    user_id: Optional[str] = Query(None, min_length=11, max_length=22),
    username: Optional[str] = Query(None, min_length=3, max_length=50),
    password: Optional[str] = Query(None, min_length=8, max_length=100),
    auth_key: Optional[str] = Query(None, min_length=11, max_length=22),
    valid_email: Optional[bool] = None,
    inc_user_role_list: bool = False,
    inc_contact: bool = False,
    inc_organization: bool = False,
    inc_person: bool = False,
    commons: Common_Route_Params = Depends(common_route_params),
):
    # Authenticate a user and return the loaded user object on success.
    #
    # Two credential combinations are accepted:
    #   * username + password: the user is looked up by username within the
    #     account taken from commons.x_account_id, or among users with a NULL
    #     account when null_account_id is set.
    #   * user_id (random ID) + auth_key: the key is cleared from the record as
    #     soon as it matches, so it can only be used once.
    # Any other combination is answered with 400.
    #
    # After the credentials match, the account must have `enable` set and the
    # current UTC time must fall inside the optional enable_from/enable_to
    # window. `logged_in_on` is then stamped, along with `email_verified` when
    # valid_email is truthy, and the user object is loaded with the inc_* flags.
    log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())
    account_id = commons.x_account_id
    select_columns = "`user`.id AS 'user_id', `user`.id_random AS 'user_id_random', `user`.password, `user`.enable, `user`.enable_from, `user`.enable_to"

    if username and password:
        user_data = {
            'account_id': None if null_account_id else account_id,
            'username': username,
        }
        # Fixed SQL fragments only; every caller-supplied value is a bound parameter.
        if null_account_id:
            account_where = '`user`.account_id IS NULL'
        else:
            account_where = '`user`.account_id = :account_id'
        sql = f"""
            SELECT {select_columns}
            FROM `user` AS `user`
            WHERE {account_where} AND `user`.username = :username
            LIMIT 1
        """
        user_rec_result = sql_select(data=user_data, sql=sql)
        if not user_rec_result:
            log.info(f'A user account was not found with the account and username given. Not allowed to log in. Account ID: {account_id}, Username: {username}')
            return mk_resp(data=None, status_code=404, status_message=f'A user account was not found with the account and username given. Not allowed to log in. Account ID: {account_id}, Username: {username}', response=commons.response) # Not Found

        password_hash = user_rec_result.get('password', None)
        if not password_hash:
            log.warning(f'The password hash has was not found. Not allowed to log in. Account ID: {account_id}, Username: {username}')
            return mk_resp(data=False, status_code=400, status_message=f'The password hash has was not found. Not allowed to log in. Account ID: {account_id}, Username: {username}', response=commons.response)

        if not verify_secure_hash_string(string=password, string_hash=password_hash):
            log.info(f'The username was found, but the password did not match. Not allowed to log in. Account ID: {account_id}, Username: {username}')
            return mk_resp(data=False, status_message=f'The username was found, but the password did not match. Not allowed to log in. Account ID: {account_id}, Username: {username}', response=commons.response)

        # Whether the account is enabled is checked further down.
        log.info(f'The username was found, and the password matched. Log in allowed if the account is enabled. Account ID: {account_id}, Username: {username}')

    elif user_id and auth_key:
        # NOTE: The user ID is unique in the table, so the account ID is not needed here.
        user_data = {
            'user_id_random': user_id,
            'auth_key': auth_key,
        }
        # NOTE: allow_auth_key is hardcoded in the SQL query to check that it is True
        sql = f"""
            SELECT {select_columns}
            FROM `user` AS `user`
            WHERE `user`.id_random = :user_id_random
            AND `user`.auth_key = :auth_key
            AND `user`.allow_auth_key = 1
            LIMIT 1
        """
        user_rec_result = sql_select(data=user_data, sql=sql)
        if not user_rec_result:
            log.info(f'The user ID and auth key combination was not found. Not allowed to log in. User ID: {user_id}, Auth Key: {auth_key}')
            log.debug(user_data)
            log.debug(sql)
            return mk_resp(data=None, status_code=404, status_message=f'The user ID and auth key combination was not found. Not allowed to log in. User ID: {user_id}, Auth Key: {auth_key}', response=commons.response) # Not Found

        log.info(f'The user ID and auth key combination was found. Log in allowed if the account is enabled. User ID: {user_id}')
        # The key is single use: clear it right away, keyed on the record's id (not id_random).
        update_user_data = {
            'id': user_rec_result.get('user_id', None),
            'auth_key': None,
        }
        if valid_email:
            update_user_data['email_verified'] = True
        user_rec_update_result = sql_update(table_name='user', data=update_user_data)
        if user_rec_update_result:
            log.info(f'The user record was updated with a NULL auth_key. User ID: {user_id}')
        else:
            log.error(f'The user record was not updated with a NULL auth_key. User ID: {user_id}')
            log.debug(update_user_data)
            log.debug(user_rec_update_result)

    else:
        return mk_resp(data=None, status_code=400, status_message='One more user account fields was missing or unexpected.', response=commons.response) # Bad Request

    # NOTE(review): the log level is raised to DEBUG for the rest of the request; kept as-is.
    log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL

    # Check that the SQL result has the expected shape before using it.
    if not isinstance(user_rec_result, dict):
        log.error(f'SQL result was unexpected. A dict result type was expected. This should not happen. Account ID: {account_id} Username: {username}')
        log.debug(user_rec_result)
        return mk_resp(data=False, status_code=500, status_message=f'The database lookup result was unexpected. This should not happen. Account ID: {account_id} Username: {username}', response=commons.response)

    # Keep the password hash out of the debug dump.
    log.debug({key: value for key, value in user_rec_result.items() if key != 'password'})

    # Both credential paths continue with the record's own id, so the two
    # follow-up calls below receive the same kind of identifier either way.
    user_rec_id = user_rec_result.get('user_id', None)

    log.info(f'Checking if the user is enabled. Account ID: {account_id}, Username: {username}')
    current_utc_datetime = datetime.datetime.now(datetime.timezone.utc)
    log.debug(current_utc_datetime)

    if not user_rec_result.get('enable', None):
        log.info('The user account is not enabled')
        return mk_resp(data=False, status_message='This user account is not enabled', response=commons.response)
    log.info('The user account is enabled')

    # The stored datetimes are labelled as UTC so they can be compared with "now".
    enable_from = user_rec_result.get('enable_from', None)
    if enable_from:
        user_enable_from = enable_from.replace(tzinfo=datetime.timezone.utc)
        log.debug(user_enable_from)
        if user_enable_from > current_utc_datetime:
            log.info(f'Enable from datetime is in the future. The user account has not been enabled yet. User Enabled From: {user_enable_from}')
            return mk_resp(data=False, status_message=f'This user account is not yet enabled. User Enabled From: {user_enable_from}', response=commons.response)
        log.info('Enable from datetime is valid')
    else:
        log.warning('The enable_from datetime was not set. Ignoring this check.')

    enable_to = user_rec_result.get('enable_to', None)
    if enable_to:
        user_enable_to = enable_to.replace(tzinfo=datetime.timezone.utc)
        log.debug(user_enable_to)
        if user_enable_to < current_utc_datetime:
            log.info(f'Enable to datetime is in the past. The user account has been disabled. User Enabled To: {user_enable_to}')
            return mk_resp(data=False, status_message=f'This user account is not enabled because the expiratation date has passed. User Enabled To: {user_enable_to}', response=commons.response)
        log.info('Enable to datetime is valid')
    else:
        log.warning('The enable_to datetime was not set. Ignoring this check.')

    # Record the successful login. A naive UTC value is stored, matching what
    # the deprecated datetime.utcnow() used to produce.
    update_user_data = {'id': user_rec_id}
    if valid_email:
        update_user_data['email_verified'] = True
    update_user_data['logged_in_on'] = current_utc_datetime.replace(tzinfo=None)
    user_rec_update_result = sql_update(table_name='user', data=update_user_data)
    if user_rec_update_result:
        log.info(f'The user record was updated with the login timestamp. User ID: {user_rec_id}')
    else:
        log.error(f'The user record was not updated with the login timestamp. User ID: {user_rec_id}')
        log.debug(update_user_data)
        log.debug(user_rec_update_result)

    # Try to load the user object
    user_obj_result = load_user_obj(
        user_id = user_rec_id,
        inc_user_role_list = inc_user_role_list,
        inc_contact = inc_contact,
        inc_organization = inc_organization,
        inc_person = inc_person,
    )
    if not user_obj_result:
        log.warning(f'Something went wrong while trying to load the user account. Account ID: {account_id} Username: {username}')
        log.debug(user_obj_result)
        return mk_resp(data=False, status_code=500, status_message=f'Something went wrong while trying to load the user account. Account ID: {account_id} Username: {username}', response=commons.response) # Internal Server Error

    log.info(f'The user account was loaded. Account ID: {account_id} Username: {username}')
    user_obj_dict = user_obj_result.dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset)
    return mk_resp(data=user_obj_dict, response=commons.response)
# ### END ### API User Routers ### user_authenticate() ###
# ### BEGIN ### API User ### user_verify_password() ###
# @router.post('/{user_id}/verify_password', response_model=Resp_Body_Base)
@router.post('/user/verify_password', response_model=Resp_Body_Base)
async def user_verify_password(
    user_obj: User_Base,
    return_obj: bool = False,
    commons: Common_Route_Params = Depends(common_route_params),
):
    # Check whether `user_obj.current_password` matches the stored hash.
    #
    # The user is located by `user_obj.id_random` when it is present, otherwise
    # by `user_obj.username` within the account from commons.x_account_id.
    #
    # Responses:
    #   data=True   the password matched
    #   400         no current password, neither identifier given, or the
    #               record has no password hash
    #   404         no matching user record
    #   mismatch    data=False; 404 when looked up by ID, default status when
    #               looked up by username (this difference is preserved)
    # NOTE: `return_obj` is accepted but not used.
    log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())
    account_id = commons.x_account_id
    log.debug(user_obj)
    log.debug(user_obj.id_random)
    log.debug(user_obj.current_password)
    log.debug(user_obj.username)

    current_password = user_obj.current_password
    if not current_password:
        return mk_resp(data=False, status_code=400, status_message='The current password to verify is required.', response=commons.response) # Bad Request

    user_id_random = user_obj.id_random # Use id_random instead of user_id_random when getting from User model.
    # Bound before either branch so every message below can reference it. It was
    # previously unbound on the ID path when the record or its hash was missing,
    # which raised UnboundLocalError.
    username = user_obj.username

    if user_id_random:
        log.info(f'Using the user ID to look up the user. User ID: {user_id_random}')
        # NOTE: No redis lookup here, since the record has to be fetched anyway.
        user_data = {'user_id_random': user_id_random}
        sql_where = '`user`.id_random = :user_id_random'
        # NOTE: A mismatch is reported as 404 on this path even though the record exists.
        mismatch_kwargs = {'status_code': 404}
    elif username:
        log.info(f'Using the username to look up the user. Username: {username}')
        user_data = {'account_id': account_id, 'username': username}
        sql_where = '`user`.account_id = :account_id AND `user`.username = :username'
        mismatch_kwargs = {}
    else:
        log.warning(f'A user ID or username is required. Can not verify password.')
        return mk_resp(data=False, status_code=400, status_message=f'A user ID or username is required. Can not verify password.', response=commons.response)

    # Only fixed SQL fragments are interpolated; all values are bound parameters.
    sql = f"""
        SELECT `user`.id AS 'user_id', `user`.id_random AS 'user_id_random', `user`.username, `user`.password, `user`.enable, `user`.enable_from, `user`.enable_to
        FROM `user` AS `user`
        WHERE {sql_where}
        LIMIT 1
    """
    user_rec_result = sql_select(data=user_data, sql=sql)
    if not user_rec_result:
        log.info(f'A user account was not found with the account and username given. Not allowed to log in. Account ID: {account_id}, Username: {username}')
        return mk_resp(data=None, status_code=404, status_message=f'A user account was not found with the account and username given. Not allowed to log in. Account ID: {account_id}, Username: {username}', response=commons.response) # Not Found

    if user_id_random:
        # On the ID path, report the username stored on the record.
        username = user_rec_result.get('username', None)

    password_hash = user_rec_result.get('password', None)
    if not password_hash:
        log.warning(f'The password hash has was not found. Not allowed to log in. Account ID: {account_id}, Username: {username}')
        return mk_resp(data=False, status_code=400, status_message=f'The password hash has was not found. Not allowed to log in. Account ID: {account_id}, Username: {username}', response=commons.response)

    if verify_secure_hash_string(string=current_password, string_hash=password_hash):
        log.info(f'The username was found, and the password matched. Log in allowed if the account is enabled. Account ID: {account_id}, Username: {username}')
        return mk_resp(data=True, response=commons.response)

    log.info(f'The username was found, but the password did not match. Not allowed to log in. Account ID: {account_id}, Username: {username}')
    return mk_resp(data=False, status_message=f'The username was found, but the password did not match. Not allowed to log in. Account ID: {account_id}, Username: {username}', response=commons.response, **mismatch_kwargs)
# ### END ### API User ### user_verify_password() ###
# ### BEGIN ### API User ### get_account_user_obj_li() ###
# Updated 2021-12-13
@router.get('/account/{account_id}/user/list', response_model=Resp_Body_Base)
async def get_account_user_obj_li(
    account_id: str = Path(min_length=11, max_length=22),
    hidden: str = 'not_hidden', # hidden, not_hidden, all
    inc_address: bool = False, # Priority l1
    inc_contact: bool = False, # Priority l1
    inc_person: bool = False, # Priority l1
    inc_user_role_list: bool = False, # Priority l1
    commons: Common_Route_Params = Depends(common_route_params),
):
    # List the users that belong to one account.
    # The account's random ID is resolved first (404 when unknown), the user
    # records are fetched with get_user_rec_list(), and each one is expanded
    # with load_user_obj(). A user that fails to load is kept in the list as
    # None. An empty or None record list gives 404; any other falsy result
    # gives 400.
    log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    account_id = redis_lookup_id_random(record_id_random=account_id, table_name='account')
    if not account_id:
        return mk_resp(data=None, status_code=404, response=commons.response)

    # Updated 2021-12-13
    user_rec_list_result = get_user_rec_list(
        account_id = account_id,
        hidden = hidden, # hidden, not_hidden, all
        enabled = commons.enabled,
        limit = commons.limit,
    )
    if not user_rec_list_result:
        if user_rec_list_result is None or isinstance(user_rec_list_result, list): # Empty list or None
            log.info('No results')
            return mk_resp(data=False, status_code=404, response=commons.response) # Not Found
        log.warning('Likely bad request')
        return mk_resp(data=False, status_code=400, response=commons.response) # Bad Request

    # `or None` keeps a placeholder for every user that could not be loaded.
    response_data = [
        load_user_obj(
            user_id = user_rec.get('user_id', None),
            enabled = commons.enabled,
            limit = commons.limit,
            inc_address = inc_address,
            inc_contact = inc_contact,
            inc_person = inc_person,
            inc_user_role_list = inc_user_role_list,
            by_alias = commons.by_alias,
            exclude_unset = commons.exclude_unset,
        ) or None
        for user_rec in user_rec_list_result
    ]
    return mk_resp(data=response_data, response=commons.response)
# ### END ### API User ### get_account_user_obj_li() ###
@router.get('/user/list', response_model=Resp_Body_Base)
async def get_user_obj_li(
    for_obj_type: Optional[str] = Query(None, min_length=2, max_length=50),
    for_obj_id: Optional[str] = Query(None, min_length=1, max_length=22),
    commons: Common_Route_Params = Depends(common_route_params),
):
    # List user objects through the generic CRUD list template, optionally
    # scoped by for_obj_type / for_obj_id.
    # NOTE: `commons` is accepted but not forwarded; by_alias and exclude_unset
    # are always passed as True.
    log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    return get_obj_li_template(
        obj_type='user',
        for_obj_type=for_obj_type,
        for_obj_id=for_obj_id,
        by_alias=True,
        exclude_unset=True,
    )
# Look up is only for account or person records
@router.get('/user/lookup', response_model=Resp_Body_Base)
async def lookup_user_obj(
    for_obj_id: Union[int,str],
    for_obj_type: str = Query(..., min_length=2, max_length=50),
    inc_user_role_list: bool = False,
    inc_contact: bool = False,
    inc_organization: bool = False,
    inc_person: bool = False,
    commons: Common_Route_Params = Depends(common_route_params),
):
    # Look up the user(s) attached to an account or a person record.
    #
    # Parameters:
    #   for_obj_id    Identifier of the account/person; resolved through
    #                 redis_lookup_id_random().
    #   for_obj_type  'account' (all users of the account, as a list) or
    #                 'person' (a single user, LIMIT 1).
    #   inc_*         Forwarded to load_user_obj().
    #
    # Responses: 400 for any other for_obj_type, 404 when the ID cannot be
    # resolved or nothing usable is found, otherwise the user dict or list.
    log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # for_obj_type -> (column on `user`, LIMIT clause, fetch as list)
    lookup_rules = {
        'account': ('account_id', '', True),
        'person': ('person_id', 'LIMIT 1', False),
    }
    # Validate the type before it is handed to the ID lookup as a table name.
    if for_obj_type not in lookup_rules:
        log.debug(f'Object type={for_obj_type}; Object ID={for_obj_id}')
        return mk_resp(data=False, status_code=400, response=commons.response) # Bad Request

    for_obj_id = redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type)
    if not for_obj_id:
        return mk_resp(data=False, status_code=404, response=commons.response) # Not Found

    column, sql_limit, as_list = lookup_rules[for_obj_type]
    # Only fixed fragments from lookup_rules are interpolated; the ID is a bound parameter.
    sql = f"""
        SELECT id AS 'user_id', id_random AS 'user_id_random'
        FROM `user` AS `user`
        WHERE `user`.{column} = :{column}
        {sql_limit}
    """
    # This will return a list if selecting by account ID
    user_rec_result = sql_select(data={column: for_obj_id}, sql=sql, as_list=as_list)

    def _load_user_dict(user_id):
        # Load one user and serialize it; None when the user cannot be loaded.
        loaded_user = load_user_obj(
            user_id = user_id,
            inc_user_role_list = inc_user_role_list,
            inc_contact = inc_contact,
            inc_organization = inc_organization,
            inc_person = inc_person,
        )
        if not loaded_user:
            log.warning(f'The user object could not be loaded. User ID: {user_id}')
            return None
        return loaded_user.dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset)

    if isinstance(user_rec_result, dict):
        data = _load_user_dict(user_rec_result.get('user_id', None))
        if data is None:
            return mk_resp(data=None, status_code=404, response=commons.response) # Not Found
    elif isinstance(user_rec_result, list):
        # One unloadable user no longer aborts the whole listing; it is skipped.
        loaded = (_load_user_dict(user_rec.get('user_id', None)) for user_rec in user_rec_result)
        data = [user_dict for user_dict in loaded if user_dict is not None]
    else:
        log.debug(user_rec_result)
        return mk_resp(data=None, status_code=404, response=commons.response) # Not Found

    return mk_resp(data=data, response=commons.response)
# Look up a user with an email address for an account
@router.get('/user/lookup_email', response_model=Resp_Body_Base)
async def lookup_email(
    email: str = Query(..., min_length=2, max_length=50),
    null_account_id: bool = False,
    inc_user_role_list: bool = False,
    inc_contact: bool = False,
    inc_organization: bool = False,
    inc_person: bool = False,
    commons: Common_Route_Params = Depends(common_route_params),
):
    # Look up user(s) by email address.
    #
    # The search is scoped to the account from commons.x_account_id, or to
    # users with a NULL account when null_account_id is set. commons.enabled
    # ('enabled', 'disabled' or 'all') filters on the `enable` column, and
    # commons.limit is bound as the LIMIT.
    #
    # Responses: 400 for any other commons.enabled value, 404 when nothing
    # usable is found, otherwise the user dict or list of user dicts.
    log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())
    account_id = commons.x_account_id

    data = {
        'account_id': account_id,
        'email': email,
    }

    if commons.enabled == 'enabled':
        data['enable'] = True
        sql_enabled = 'AND `user`.enable = :enable'
    elif commons.enabled == 'disabled':
        data['enable'] = False
        sql_enabled = 'AND `user`.enable = :enable'
    elif commons.enabled == 'all':
        sql_enabled = ''
    else:
        return mk_resp(data=None, status_code=400, response=commons.response) # Bad Request

    data['limit'] = commons.limit
    sql_limit = 'LIMIT :limit'
    log.debug(data)

    # Only fixed SQL fragments are interpolated; all values are bound parameters.
    if null_account_id:
        sql_account = '`user`.account_id IS NULL'
    else:
        sql_account = '`user`.account_id = :account_id'
    sql = f"""
        SELECT id AS 'user_id', id_random AS 'user_id_random'
        FROM `user` AS `user`
        WHERE {sql_account} AND `user`.email = :email
        {sql_enabled}
        {sql_limit};
    """
    log.debug(sql)

    # The result is handled below as either a single dict or a list of dicts.
    user_obj_result = sql_select(data=data, sql=sql)

    def _load_user_dict(user_id):
        # Load one user and serialize it; None when the user cannot be loaded.
        loaded_user = load_user_obj(
            user_id=user_id,
            inc_user_role_list=inc_user_role_list,
            inc_contact=inc_contact,
            inc_organization=inc_organization,
            inc_person=inc_person,
        )
        if not loaded_user:
            log.warning(f'The user object could not be loaded. User ID: {user_id}')
            return None
        return loaded_user.dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset)

    if isinstance(user_obj_result, dict):
        data = _load_user_dict(user_obj_result.get('user_id', None))
        if data is None:
            return mk_resp(data=None, status_code=404, response=commons.response) # Not Found
    elif isinstance(user_obj_result, list):
        # One unloadable user no longer aborts the whole listing; it is skipped.
        loaded = (_load_user_dict(user_rec.get('user_id', None)) for user_rec in user_obj_result)
        data = [user_dict for user_dict in loaded if user_dict is not None]
    else:
        log.debug(user_obj_result)
        return mk_resp(data=None, status_code=404, response=commons.response) # Not Found

    log.debug(data)
    return mk_resp(data=data, response=commons.response)
# Look up a user by username within an account (or among users without an account).
@router.get('/user/lookup_username', response_model=Resp_Body_Base)
async def lookup_username(
    username: str = Query(..., min_length=2, max_length=50),
    null_account_id: bool = False,
    inc_address: bool = False,
    inc_contact: bool = False,
    inc_organization: bool = False,
    inc_person: bool = False,
    inc_user_role_list: bool = False,
    commons: Common_Route_Params = Depends(common_route_params),
):
    # Look up user(s) by username.
    #
    # The search is scoped to the account from commons.x_account_id, or to
    # users with a NULL account when null_account_id is set. The inc_* flags
    # are forwarded to load_user_obj(); inc_organization was previously
    # accepted but never forwarded.
    #
    # Responses: 404 when nothing usable is found, otherwise the user dict or
    # list of user dicts.
    log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())
    account_id = commons.x_account_id

    data = {
        'account_id': account_id,
        'username': username,
    }
    log.debug(data)

    # Only fixed SQL fragments are interpolated; all values are bound parameters.
    if null_account_id:
        sql_account = '`user`.account_id IS NULL'
    else:
        sql_account = '`user`.account_id = :account_id'
    sql = f"""
        SELECT id AS 'user_id', id_random AS 'user_id_random'
        FROM `user` AS `user`
        WHERE {sql_account} AND `user`.username = :username
    """
    log.debug(sql)

    # The result is handled below as either a single dict or a list of dicts.
    user_obj_result = sql_select(data=data, sql=sql)

    def _load_user_dict(user_id):
        # Load one user and serialize it; None when the user cannot be loaded.
        loaded_user = load_user_obj(
            user_id = user_id,
            inc_address = inc_address,
            inc_contact = inc_contact,
            inc_organization = inc_organization,
            inc_person = inc_person,
            inc_user_role_list = inc_user_role_list,
        )
        if not loaded_user:
            log.warning(f'The user object could not be loaded. User ID: {user_id}')
            return None
        return loaded_user.dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset)

    if isinstance(user_obj_result, dict):
        data = _load_user_dict(user_obj_result.get('user_id', None))
        if data is None:
            return mk_resp(data=None, status_code=404, response=commons.response) # Not Found
    elif isinstance(user_obj_result, list):
        # One unloadable user no longer aborts the whole listing; it is skipped.
        loaded = (_load_user_dict(user_rec.get('user_id', None)) for user_rec in user_obj_result)
        data = [user_dict for user_dict in loaded if user_dict is not None]
    else:
        log.debug(user_obj_result)
        return mk_resp(data=None, status_code=404, response=commons.response) # Not Found

    return mk_resp(data=data, response=commons.response)
# ### BEGIN ### API User ### email_auth_key_url() ###
# This requires the user_id and root_url or base_url.
# This endpoint will generate a new user auth_key and send the email to the user's email address.
# Updated 2025-04-08
# @router.get('/user/email_auth_key_url', response_model=Resp_Body_Base)
@router.get('/user/{user_id}/email_auth_key_url', response_model=Resp_Body_Base)
async def email_auth_key_url(
    user_id: str = Path(min_length=11, max_length=22),
    root_url: Optional[str] = Query(None, min_length=10, max_length=100),  # Absolute min = 7
    key_param_name: str = Query('auth_key', min_length=2, max_length=10),
    return_obj: bool = False,
    commons: Common_Route_Params = Depends(common_route_params),
):
    # Resolve the random user ID from the path, then ask email_user_auth_key_url()
    # to send the auth-key log in email for that user.
    #
    # user_id:         random ID from the URL; resolved via redis_lookup_id_random().
    # root_url:        passed through to email_user_auth_key_url().
    # key_param_name:  passed through to email_user_auth_key_url().
    # return_obj:      accepted for the interface; not read by this handler.
    #
    # Responds with data=True on success, data=False/404 when the ID cannot be
    # resolved, and data=False/500 when the helper returns a falsy result.
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    account_id = commons.x_account_id

    resolved_user_id = redis_lookup_id_random(record_id_random=user_id, table_name='user')
    if not resolved_user_id:
        return mk_resp(data=False, status_code=404, response=commons.response)  # Not Found

    email_sent = email_user_auth_key_url(
        account_id=account_id,
        user_id=resolved_user_id,
        root_url=root_url,
        key_param_name=key_param_name,
    )
    if not email_sent:
        log.warning('Email with auth key log in URL was not sent.')
        return mk_resp(data=False, status_code=500, response=commons.response)

    log.info('Email with auth key log in URL was sent.')
    return mk_resp(data=True, response=commons.response)
# ### END ### API User ### email_auth_key_url() ###
# ### BEGIN ### API User ### get_user_obj() ###
# Updated 2022-01-05
@router.get('/user/{user_id}', response_model=Resp_Body_Base)
async def get_user_obj(
    user_id: str = Path(min_length=11, max_length=22),
    inc_address: bool = False,  # Priority l1
    # inc_archive_list: bool = False,  # Priority l3
    inc_contact: bool = False,  # Priority l1
    inc_event_list: bool = False,  # Priority l1
    # inc_hosted_file_list: bool = False,  # Priority l3
    inc_journal_list: bool = False,  # Priority l2
    # inc_journal_entry_list: bool = False,  # Priority l3
    inc_membership_person: bool = False,  # Priority l2
    # inc_membership_list: bool = False,  # ???
    inc_order_line_list: bool = False,  # Priority l1
    inc_order_list: bool = False,  # Priority l1
    inc_order_cart_list: bool = False,  # Priority l1
    inc_organization: bool = False,  # Priority l1
    # inc_organization_list: bool = False,
    inc_person: bool = False,  # Priority l1
    # inc_person_list: bool = False,
    inc_post_list: bool = False,  # Priority l2
    inc_post_comment_list: bool = False,  # Priority l3
    inc_user_role_list: bool = False,  # Priority l1
    commons: Common_Route_Params = Depends(common_route_params),
):
    # Return one user, identified by its random ID, loaded through load_user_obj().
    #
    # Responds with 404 (data=None) when the random ID cannot be resolved and with
    # 400 (data=False) when load_user_obj() returns a falsy result.
    #
    # Accepted but currently NOT forwarded to load_user_obj():
    #   inc_journal_list, inc_membership_person, inc_organization,
    #   inc_post_list, inc_post_comment_list
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    resolved_user_id = redis_lookup_id_random(record_id_random=user_id, table_name='user')
    if not resolved_user_id:
        return mk_resp(data=None, status_code=404, response=commons.response)

    load_options = {
        'user_id': resolved_user_id,
        'limit': commons.limit,
        'model_as_dict': True,  # NOTE: returning model as a dict
        'enabled': commons.enabled,
        'inc_address': inc_address,
        'inc_contact': inc_contact,
        'inc_event_list': inc_event_list,
        'inc_order_line_list': inc_order_line_list,
        'inc_order_list': inc_order_list,
        'inc_order_cart_list': inc_order_cart_list,
        'inc_person': inc_person,
        'inc_user_role_list': inc_user_role_list,
    }
    user_payload = load_user_obj(**load_options)
    if not user_payload:
        return mk_resp(data=False, status_code=400, response=commons.response)  # Bad Request

    return mk_resp(data=user_payload, response=commons.response)
# ### END ### API User ### get_user_obj() ###
# # ### BEGIN ### API User ### get_user_obj_order_list() ###
# # Deprecated 2021-11-19
# @router.get('/user/{user_id}/order_list', response_model=Resp_Body_Base)
# async def get_user_obj_order_list(
# user_id: str = Path(min_length=11, max_length=22),
#
#
# from_datetime: datetime.datetime = None,
# to_datetime: datetime.datetime = None,
# # inc_address: bool = False,
# # inc_contact: bool = False,
# inc_order_cfg: bool = False,
# inc_order_line_list: bool = False,
# status: str = 'complete',
# # inc_person: bool = False,
# # inc_order_list: bool = False,
# # inc_order_cart_list: bool = False,
#
# by_alias: Optional[bool] = True,
# exclude_unset: Optional[bool] = True,
# response: Response = Response,
# ):
# log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
# log.debug(locals())
# log.warning('Deprecated 2021-11-19')
# if user_id := redis_lookup_id_random(record_id_random=user_id, table_name='user'): pass
# else:
# return mk_resp(data=None, status_code=404, response=commons.response)
# # Updated 2021-12-13
# if order_rec_list_result := get_order_rec_list(
# for_obj_type = 'user',
# for_obj_id = user_id,
# limit = commons.limit,
# enabled = commons.enabled,
# from_datetime = from_datetime,
# to_datetime = to_datetime,
# status = status,
# ):
# order_result_list = []
# for order_rec in order_rec_list_result:
# if load_order_result := load_order_obj(
# order_id = order_rec.get('order_id', None),
# limit = commons.limit,
# enabled = commons.enabled,
# by_alias = commons.by_alias,
# exclude_unset = commons.exclude_unset,
# # model_as_dict = model_as_dict,
# inc_order_cfg = inc_order_cfg,
# inc_order_line_list = inc_order_line_list,
# ):
# order_result_list.append(load_order_result)
# else:
# order_result_list.append(None)
# response_data = order_result_list
# elif event_location_rec_list_result is None:
# log.info('No results')
# return mk_resp(data=None, status_code=404, response=commons.response) # Not Found
# else:
# log.warning('Likely bad request')
# return mk_resp(data=False, status_code=400, response=commons.response) # Bad Request
# return mk_resp(data=response_data, response=commons.response)
# # ### END ### API User ### get_user_obj_order_list() ###
@router.delete('/user/{obj_id}', response_model=Resp_Body_Base)
async def delete_user_obj(
    obj_id: str = Path(min_length=11, max_length=22),
    commons: Common_Route_Params = Depends(common_route_params),
):
    # Delete a user by delegating to delete_obj_template() with obj_type 'user'.
    # Whatever the template returns is returned unchanged; `commons` is not read here.
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    return delete_obj_template(obj_type='user', obj_id=obj_id)