672 lines
26 KiB
Python
672 lines
26 KiB
Python
import datetime, pytz, time
|
|
from fastapi import APIRouter, Body, Depends, Header, HTTPException, Query, status
|
|
from pydantic import BaseModel, EmailStr, Field
|
|
from typing import Dict, List, Optional, Set, Union
|
|
|
|
from app.lib_general import log, logging, secure_hash_string, verify_secure_hash_string
|
|
from app.config import settings
|
|
from app.db_sql import *
|
|
|
|
from app.routers.api_crud import delete_obj_template, get_obj_template, get_obj_li_template, patch_obj_template, post_obj_template
|
|
|
|
from app.methods.user_methods import load_user_obj
|
|
|
|
from app.models.common_field_schema import default_num_bytes
|
|
from app.models.response_models import *
|
|
from app.models.user_models import User_Base, User_New_Base, User_Out_Base
|
|
|
|
|
|
router = APIRouter()
|
|
|
|
|
|
@router.post('', response_model=Resp_Body_Base)
|
|
async def post_user_obj(
|
|
obj: User_Base,
|
|
x_account_id: str = Header(...),
|
|
return_obj: Optional[bool] = True,
|
|
by_alias: Optional[bool] = True,
|
|
exclude_unset: Optional[bool] = True,
|
|
):
|
|
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
obj_type = 'user'
|
|
obj_data_dict = obj.dict(by_alias=False, exclude_unset=True)
|
|
result = post_obj_template(
|
|
obj_type=obj_type,
|
|
data=obj_data_dict,
|
|
return_obj=True,
|
|
by_alias=True,
|
|
exclude_unset=True,
|
|
)
|
|
return result
|
|
|
|
|
|
@router.post('/new', response_model=Resp_Body_Base)
|
|
async def post_user_new_obj(
|
|
user_obj: User_New_Base,
|
|
x_account_id: str = Header(...),
|
|
return_obj: bool = True,
|
|
by_alias: bool = True,
|
|
exclude_unset: bool = True,
|
|
):
|
|
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
user_data = user_obj.dict(by_alias=False, exclude_unset=False, exclude={'new_password', 'account_id_random'})
|
|
|
|
log.info('Checking if the username is already in use for the account...')
|
|
sql_select_user = f"""
|
|
SELECT *
|
|
FROM `user` AS user
|
|
WHERE user.account_id = :account_id and user.username = :username
|
|
"""
|
|
|
|
if sql_select_result := sql_select(sql=sql_select_user, data=user_data):
|
|
return mk_resp(data=False, status_message='The user account was not created. This is likely because of a duplicate username.')
|
|
|
|
log.info('Adding new user account...')
|
|
if sql_insert_result := sql_insert(table_name='user', data=user_data):
|
|
log.info('Selecting new user account to return as an object...')
|
|
sql_select_user_result = sql_select(table_name='v_user', record_id=sql_insert_result)
|
|
|
|
user_obj_new = User_Out_Base(**sql_select_user_result)
|
|
|
|
return mk_resp(data=user_obj_new.dict(by_alias=True, exclude_unset=True))
|
|
else:
|
|
return mk_resp(data=False, status_message='The user account was not created. Something seems to have gone wrong on insert.')
|
|
|
|
|
|
@router.patch('/change_password/{user_id}', response_model=Resp_Body_Base)
|
|
async def change_user_obj_password(
|
|
user_id: Union[int,str],
|
|
password: Optional[str] = Query(None, min_length=6, max_length=50),
|
|
x_account_id: Optional[str] = Header(..., ),
|
|
return_obj: bool = False,
|
|
inc_roles: bool = False,
|
|
inc_contact: bool = False,
|
|
inc_organization: bool = False,
|
|
inc_person: bool = False,
|
|
by_alias: bool = True,
|
|
exclude_unset: bool = True,
|
|
):
|
|
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
if password and len(password) >= 10: pass
|
|
else:
|
|
log.warning('The password given must be at least 10 characters. Generating a new random password.')
|
|
password = secrets.token_urlsafe(default_num_bytes)
|
|
|
|
if user_id := redis_lookup_id_random(record_id_random=user_id, table_name='user'): pass
|
|
else: return mk_resp(data=False, status_code=404) # Not Found
|
|
|
|
user_data = {}
|
|
#user_data['user_id'] = user_id
|
|
#user_data['username'] = username #????
|
|
user_data['password'] = secure_hash_string(string=password)
|
|
|
|
table_name = 'user'
|
|
user_rec_update_result = sql_update(data=user_data, table_name=table_name, record_id=user_id, id_random_length=None)
|
|
|
|
if return_obj:
|
|
user_obj = load_user_obj(
|
|
user_id=user_id,
|
|
inc_contact=inc_contact,
|
|
inc_organization=inc_organization,
|
|
inc_person=inc_person
|
|
).dict(by_alias=by_alias, exclude_unset=exclude_unset)
|
|
data = user_obj
|
|
else:
|
|
data = True
|
|
return mk_resp(data=data)
|
|
#return mk_resp(data=None, status_code=501) # Not Implemented
|
|
|
|
|
|
@router.patch('/{obj_id}', response_model=Resp_Body_Base)
|
|
async def patch_user_obj(
|
|
obj: User_Base,
|
|
obj_id: str = Query(..., min_length=1, max_length=22),
|
|
x_account_id: Optional[str] = Header(..., ),
|
|
return_obj: Optional[bool] = True,
|
|
by_alias: Optional[bool] = True,
|
|
exclude_unset: Optional[bool] = True,
|
|
):
|
|
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
obj_type = 'user'
|
|
obj_data_dict = obj.dict(by_alias=False, exclude_unset=True)
|
|
obj_data_dict['id'] = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_type)
|
|
obj_data_dict['id_random'] = obj_id
|
|
result = patch_obj_template(
|
|
obj_type=obj_type,
|
|
data=obj_data_dict,
|
|
obj_id=obj_id,
|
|
return_obj=True,
|
|
by_alias=True,
|
|
exclude_unset=True,
|
|
)
|
|
return result
|
|
|
|
|
|
# ### BEGIN ### API User Routers ### user_new_auth_key() ###
|
|
# Generate a new one time use authorization key
|
|
@router.get('/new_auth_key', response_model=Resp_Body_Base)
|
|
async def user_new_auth_key(
|
|
user_id: Optional[str] = Query(None, min_length=2, max_length=50),
|
|
x_account_id: str = Header(...),
|
|
return_obj: Optional[bool] = False,
|
|
by_alias: bool = True,
|
|
exclude_unset: bool = True,
|
|
exclude_none: bool = True,
|
|
):
|
|
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
update_user_data = {}
|
|
update_user_data['id_random'] = user_id
|
|
update_user_data['auth_key'] = secrets.token_urlsafe(default_num_bytes)
|
|
|
|
if user_rec_update_result := sql_update(table_name='user', data=update_user_data):
|
|
log.info('The user record was updated with a new auth_key')
|
|
|
|
if return_obj:
|
|
user_obj = load_user_obj(
|
|
user_id=user_id,
|
|
inc_contact=False,
|
|
inc_organization=False,
|
|
inc_person=False
|
|
).dict(by_alias=by_alias, exclude_unset=exclude_unset)
|
|
data = user_obj
|
|
else:
|
|
user_obj = {}
|
|
user_obj['auth_key'] = update_user_data['auth_key']
|
|
return mk_resp(data=user_obj)
|
|
else:
|
|
log.info('The user record was not updated with a new auth_key')
|
|
log.debug(user_rec_update_result)
|
|
|
|
return mk_resp(data=False, status_code=404)
|
|
|
|
|
|
# ### BEGIN ### API User Routers ### user_authenticate() ###
|
|
# Authenticate a username and password OR by authorization key
|
|
# An authorization key can only be done once. It will be deleted if found.
|
|
# A new key will need to be requested for a particular user each time.
|
|
@router.get('/authenticate', response_model=Resp_Body_Base)
|
|
async def user_authenticate(
|
|
account_id: Optional[Union[int,str]] = None,
|
|
username: Optional[str] = Query(None, min_length=2, max_length=50),
|
|
password: Optional[str] = Query(None, min_length=6, max_length=50),
|
|
auth_key: Optional[str] = Query(None, min_length=11, max_length=22),
|
|
x_account_id: str = Header(...),
|
|
inc_roles: bool = False,
|
|
inc_contact: bool = False,
|
|
inc_organization: bool = False,
|
|
inc_person: bool = False,
|
|
by_alias: bool = True,
|
|
exclude_unset: bool = True,
|
|
exclude_none: bool = True,
|
|
):
|
|
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
if account_id and username and password:
|
|
if account_id := redis_lookup_id_random(record_id_random=account_id, table_name='account'): pass
|
|
else: return mk_resp(data=False, status_code=404) # Not Found
|
|
|
|
user_data = {}
|
|
user_data['account_id'] = account_id
|
|
user_data['username'] = username
|
|
|
|
sql_select(table_name='user', data=user_data)
|
|
|
|
sql = f"""
|
|
SELECT `user`.id AS 'user_id', `user`.id_random AS 'user_id_random', `user`.password, `user`.enable, `user`.enable_from, `user`.enable_to
|
|
FROM `user` AS `user`
|
|
WHERE `user`.account_id = :account_id AND `user`.username = :username
|
|
LIMIT 1
|
|
"""
|
|
|
|
# This will return a list if selecting by account ID
|
|
if user_rec_result := sql_select(data=user_data, sql=sql):
|
|
user_id = user_rec_result.get('user_id', None)
|
|
|
|
if password_hash := user_rec_result.get('password', None):
|
|
if verify_secure_hash_string(string=password, string_hash=password_hash):
|
|
log.info('The username was found, and the password matched.')
|
|
#return mk_resp(data=False, status_message='The username was found, and the password matched.')
|
|
else:
|
|
log.info('The username was found, but the password did not match.')
|
|
return mk_resp(data=False, status_message='The username was found, but the password did not match.')
|
|
else:
|
|
log.error('The password has was not found. This should not happen.')
|
|
return mk_resp(data=False, status_message='The password has was not found. This should not happen.')
|
|
|
|
else: return mk_resp(data=None, status_code=404, status_message='The user account was not found')
|
|
elif auth_key:
|
|
if user_rec_result := sql_select(table_name='user', field_name='auth_key', field_value=auth_key):
|
|
update_user_data = {}
|
|
update_user_data['id'] = user_rec_result.get('id', None)
|
|
update_user_data['auth_key'] = None
|
|
|
|
if user_rec_update_result := sql_update(table_name='user', data=update_user_data):
|
|
log.info('The user record was updated with a NULL auth_key')
|
|
else:
|
|
log.info('The user record was not updated with a NULL auth_key')
|
|
log.debug(user_rec_update_result)
|
|
|
|
user_id = user_rec_result.get('id', None) # NOTE: This us looking for "id", not "user_id"
|
|
else: return mk_resp(data=None, status_code=404, status_message='A user account with that auth key was not found')
|
|
else:
|
|
return mk_resp(data=None, status_code=400, status_message='One more user account fields was missing or unexpected.') # Bad Request
|
|
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(user_rec_result)
|
|
|
|
if isinstance(user_rec_result, dict):
|
|
|
|
current_utc_datetime = datetime.datetime.now(datetime.timezone.utc)
|
|
log.debug(current_utc_datetime)
|
|
|
|
if user_rec_result.get('enable', None):
|
|
log.info('The user account is enabled')
|
|
else:
|
|
log.info('The user account is not enabled')
|
|
return mk_resp(data=False, status_message='This user account is not enabled')
|
|
|
|
#if user_enable_from := user_rec_result.get('enable_from', None).astimezone(pytz.UTC):
|
|
if user_enable_from := user_rec_result.get('enable_from', None).replace(tzinfo=datetime.timezone.utc):
|
|
log.debug(user_enable_from)
|
|
if user_enable_from <= current_utc_datetime:
|
|
log.info('Enable from datetime is valid')
|
|
else:
|
|
log.info('Enable from datetime is in the future. Please wait.')
|
|
return mk_resp(data=False, status_message='This account is not yet enabled')
|
|
|
|
#if user_enable_to := user_rec_result.get('enable_to', None).astimezone(pytz.UTC):
|
|
if user_enable_to := user_rec_result.get('enable_to', None).replace(tzinfo=datetime.timezone.utc):
|
|
log.debug(user_enable_to)
|
|
if user_enable_to >= current_utc_datetime:
|
|
log.info('Enable to datetime is valid')
|
|
else:
|
|
log.info('Enable to datetime is in the past. Your user account has been disabled.')
|
|
return mk_resp(data=False, status_message='This account is not enabled because the expiratation date has passed')
|
|
|
|
user_obj = load_user_obj(
|
|
user_id=user_id,
|
|
inc_roles=inc_roles,
|
|
inc_contact=inc_contact,
|
|
inc_organization=inc_organization,
|
|
inc_person=inc_person
|
|
).dict(by_alias=by_alias, exclude_unset=exclude_unset)
|
|
data = user_obj
|
|
return mk_resp(data=user_obj)
|
|
else:
|
|
log.error('SQL result was unexpected. A dict result type was expected. This should not happen.')
|
|
return mk_resp(data=False, status_code=500)
|
|
# ### END ### API User Routers ### user_authenticate() ###
|
|
|
|
|
|
@router.get('/list', response_model=Resp_Body_Base)
|
|
async def get_user_obj_li(
|
|
for_obj_type: Optional[str] = Query(None, min_length=2, max_length=50),
|
|
for_obj_id: Optional[str] = Query(None, min_length=1, max_length=22),
|
|
x_account_id: str = Header(...),
|
|
by_alias: Optional[bool] = True,
|
|
exclude_unset: Optional[bool] = True,
|
|
):
|
|
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
obj_type = 'user'
|
|
result = get_obj_li_template(
|
|
obj_type=obj_type,
|
|
for_obj_type=for_obj_type,
|
|
for_obj_id=for_obj_id,
|
|
by_alias=True,
|
|
exclude_unset=True,
|
|
)
|
|
return result
|
|
|
|
|
|
# Look up is only for account or person records
|
|
@router.get('/lookup', response_model=Resp_Body_Base)
|
|
async def lookup_user_obj(
|
|
for_obj_id: Union[int,str],
|
|
for_obj_type: str = Query(..., min_length=2, max_length=50),
|
|
x_account_id: str = Header(...),
|
|
inc_roles: bool = False,
|
|
inc_contact: bool = False,
|
|
inc_organization: bool = False,
|
|
inc_person: bool = False,
|
|
by_alias: bool = True,
|
|
exclude_unset: bool = True,
|
|
):
|
|
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
obj_type = 'user'
|
|
base_name = User_Out_Base
|
|
|
|
if for_obj_id := redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type): pass
|
|
else: return mk_resp(data=False, status_code=404) # Not Found
|
|
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
|
|
data = {}
|
|
as_list = False
|
|
if for_obj_type == 'account' and for_obj_id:
|
|
data['account_id'] = for_obj_id
|
|
sql_where_for_obj_type = """`user`.account_id = :account_id"""
|
|
sql_limit = ''
|
|
as_list = True
|
|
elif for_obj_type == 'person' and for_obj_id:
|
|
data['person_id'] = for_obj_id
|
|
sql_where_for_obj_type = """`user`.person_id = :person_id"""
|
|
sql_limit = 'LIMIT 1'
|
|
else:
|
|
log.debug(f'Object type={for_obj_type}; Object ID={for_obj_id}')
|
|
return mk_resp(data=False, status_code=400) # Bad Request
|
|
|
|
sql = f"""
|
|
SELECT id AS 'user_id', id_random AS 'user_id_random'
|
|
FROM `user` AS `user`
|
|
WHERE {sql_where_for_obj_type}
|
|
{sql_limit}
|
|
"""
|
|
|
|
# This will return a list if selecting by account ID
|
|
user_rec_result = sql_select(data=data, sql=sql, as_list=as_list)
|
|
if isinstance(user_rec_result, dict):
|
|
user_id = user_rec_result.get('user_id', None)
|
|
user_obj = load_user_obj(
|
|
user_id=user_id,
|
|
inc_roles=inc_roles,
|
|
inc_contact=inc_contact,
|
|
inc_organization=inc_organization,
|
|
inc_person=inc_person
|
|
).dict(by_alias=by_alias, exclude_unset=exclude_unset)
|
|
data = user_obj
|
|
elif isinstance(user_rec_result, list):
|
|
user_obj_li = []
|
|
for user_obj in user_rec_result:
|
|
user_id = user_obj.get('user_id', None)
|
|
user_obj_li.append(
|
|
load_user_obj(
|
|
user_id=user_id,
|
|
inc_roles=inc_roles,
|
|
inc_contact=inc_contact,
|
|
inc_organization=inc_organization,
|
|
inc_person=inc_person,
|
|
).dict(by_alias=by_alias, exclude_unset=exclude_unset)
|
|
)
|
|
data = user_obj_li
|
|
else:
|
|
log.debug(user_rec_result)
|
|
return mk_resp(data=None, status_code=404) # Not Found
|
|
return mk_resp(data=data)
|
|
|
|
|
|
# Look up a user with an email addresss for an account
|
|
@router.get('/lookup_email', response_model=Resp_Body_Base)
|
|
async def lookup_email(
|
|
account_id: Union[int,str],
|
|
email: str = Query(..., min_length=2, max_length=50),
|
|
x_account_id: str = Header(...),
|
|
inc_roles: bool = False,
|
|
inc_contact: bool = False,
|
|
inc_organization: bool = False,
|
|
inc_person: bool = False,
|
|
by_alias: bool = True,
|
|
exclude_unset: bool = True,
|
|
):
|
|
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
if account_id == '':
|
|
account_id = None
|
|
elif account_id := redis_lookup_id_random(record_id_random=account_id, table_name='account'):
|
|
pass
|
|
else:
|
|
return mk_resp(data=False, status_code=404) # Not Found
|
|
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
|
|
data = {}
|
|
data['account_id'] = account_id
|
|
data['email'] = email
|
|
log.debug(data)
|
|
|
|
if account_id:
|
|
sql = f"""
|
|
SELECT id AS 'user_id', id_random AS 'user_id_random'
|
|
FROM `user` AS `user`
|
|
WHERE `user`.account_id = :account_id AND `user`.email = :email
|
|
"""
|
|
else:
|
|
sql = f"""
|
|
SELECT id AS 'user_id', id_random AS 'user_id_random'
|
|
FROM `user` AS `user`
|
|
WHERE `user`.account_id IS NULL AND `user`.email = :email
|
|
"""
|
|
log.debug(sql)
|
|
|
|
# This will return a list if selecting by account ID
|
|
user_obj_result = sql_select(data=data, sql=sql)
|
|
if isinstance(user_obj_result, dict):
|
|
user_id = user_obj_result.get('user_id', None)
|
|
user_obj = load_user_obj(
|
|
user_id=user_id,
|
|
inc_roles=inc_roles,
|
|
inc_contact=inc_contact,
|
|
inc_organization=inc_organization,
|
|
inc_person=inc_person
|
|
).dict(by_alias=by_alias, exclude_unset=exclude_unset)
|
|
data = user_obj
|
|
elif isinstance(user_obj_result, list):
|
|
user_obj_li = []
|
|
for user_obj in user_obj_result:
|
|
user_id = user_obj.get('user_id', None)
|
|
user_obj_li.append(
|
|
load_user_obj(
|
|
user_id=user_id,
|
|
inc_roles=inc_roles,
|
|
inc_contact=inc_contact,
|
|
inc_organization=inc_organization,
|
|
inc_person=inc_person,
|
|
).dict(by_alias=by_alias, exclude_unset=exclude_unset)
|
|
)
|
|
data = user_obj_li
|
|
else:
|
|
log.debug(user_obj_result)
|
|
return mk_resp(data=None, status_code=404) # Not Found
|
|
return mk_resp(data=data)
|
|
|
|
|
|
# Look up is only for account or person records
|
|
# Look up a user with a username for an account
|
|
@router.get('/lookup_username', response_model=Resp_Body_Base)
|
|
async def lookup_username(
|
|
account_id: Union[int,str],
|
|
username: str = Query(..., min_length=2, max_length=50),
|
|
x_account_id: str = Header(...),
|
|
inc_roles: bool = False,
|
|
inc_contact: bool = False,
|
|
inc_organization: bool = False,
|
|
inc_person: bool = False,
|
|
by_alias: bool = True,
|
|
exclude_unset: bool = True,
|
|
):
|
|
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
if account_id == '':
|
|
account_id = None
|
|
elif account_id := redis_lookup_id_random(record_id_random=account_id, table_name='account'):
|
|
pass
|
|
else:
|
|
return mk_resp(data=False, status_code=404) # Not Found
|
|
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
|
|
data = {}
|
|
data['account_id'] = account_id
|
|
data['username'] = username
|
|
log.debug(data)
|
|
|
|
if account_id:
|
|
sql = f"""
|
|
SELECT id AS 'user_id', id_random AS 'user_id_random'
|
|
FROM `user` AS `user`
|
|
WHERE `user`.account_id = :account_id AND `user`.username = :username
|
|
"""
|
|
else:
|
|
sql = f"""
|
|
SELECT id AS 'user_id', id_random AS 'user_id_random'
|
|
FROM `user` AS `user`
|
|
WHERE `user`.account_id IS NULL AND `user`.username = :username
|
|
"""
|
|
log.debug(sql)
|
|
|
|
# This will return a list if selecting by account ID
|
|
user_obj_result = sql_select(data=data, sql=sql)
|
|
if isinstance(user_obj_result, dict):
|
|
user_id = user_obj_result.get('user_id', None)
|
|
user_obj = load_user_obj(
|
|
user_id=user_id,
|
|
inc_roles=inc_roles,
|
|
inc_contact=inc_contact,
|
|
inc_organization=inc_organization,
|
|
inc_person=inc_person
|
|
).dict(by_alias=by_alias, exclude_unset=exclude_unset)
|
|
data = user_obj
|
|
elif isinstance(user_obj_result, list):
|
|
user_obj_li = []
|
|
for user_obj in user_obj_result:
|
|
user_id = user_obj.get('user_id', None)
|
|
user_obj_li.append(
|
|
load_user_obj(
|
|
user_id=user_id,
|
|
inc_roles=inc_roles,
|
|
inc_contact=inc_contact,
|
|
inc_organization=inc_organization,
|
|
inc_person=inc_person,
|
|
).dict(by_alias=by_alias, exclude_unset=exclude_unset)
|
|
)
|
|
data = user_obj_li
|
|
else:
|
|
log.debug(user_obj_result)
|
|
return mk_resp(data=None, status_code=404) # Not Found
|
|
return mk_resp(data=data)
|
|
|
|
|
|
|
|
# ### BEGIN ### API User ### get_user_obj_new() ###
|
|
# Working well as of 2021-06-11. Using as a template for other routes.
|
|
@router.get('/{user_id}/json', response_model=Resp_Body_Base)
|
|
async def get_user_obj_new(
|
|
user_id: str = Query(..., min_length=1, max_length=22),
|
|
limit: int = 500, # For now this covers any included objects or object lists
|
|
enabled: str = 'enabled', # For now this covers any included objects or object lists
|
|
inc_address: bool = False, # Priority l1
|
|
# inc_archive_list: bool = False, # Priority l3
|
|
inc_contact: bool = False, # Priority l1
|
|
inc_event_list: bool = False, # Priority l1
|
|
# inc_hosted_file_list: bool = False, # Priority l3
|
|
inc_journal_list: bool = False, # Priority l2
|
|
# inc_journal_entry_list: bool = False, # Priority l3
|
|
inc_membership: bool = False, # Priority l2
|
|
# inc_membership_list: bool = False, # ???
|
|
inc_order_list: bool = False, # Priority l1
|
|
inc_order_cart_list: bool = False, # Priority l1
|
|
inc_organization: bool = False, # Priority l1
|
|
# inc_organization_list: bool = False,
|
|
inc_person: bool = False, # Priority l1
|
|
# inc_person_list: bool = False,
|
|
inc_post_list: bool = False, # Priority l2
|
|
# inc_post_comment_list: bool = False, # Priority l3
|
|
inc_user_role_list: bool = False, # Priority l1
|
|
x_account_id: str = Header(...),
|
|
by_alias: Optional[bool] = True,
|
|
exclude_unset: Optional[bool] = True,
|
|
):
|
|
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
if user_id := redis_lookup_id_random(record_id_random=user_id, table_name='user'): pass
|
|
else:
|
|
return mk_resp(data=None, status_code=404)
|
|
|
|
if user_dict := load_user_obj(
|
|
user_id = user_id,
|
|
limit = limit,
|
|
model_as_dict = True, # NOTE: returning model as a dict
|
|
enabled = enabled,
|
|
inc_address = inc_address,
|
|
# inc_archive_list = inc_archive_list,
|
|
inc_contact = inc_contact,
|
|
inc_event_list = inc_event_list,
|
|
# inc_hosted_file_list = inc_hosted_file_list,
|
|
inc_journal_list = inc_journal_list,
|
|
# inc_journal_entry_list = inc_journal_entry_list,
|
|
inc_membership = inc_membership,
|
|
# inc_membership_list = inc_membership_list, # ???
|
|
inc_order_list = inc_order_list,
|
|
inc_order_cart_list = inc_order_cart_list,
|
|
inc_organization = inc_organization,
|
|
# inc_organization_list = inc_organization_list,
|
|
inc_person = inc_person,
|
|
# inc_person_list = inc_person_list,
|
|
inc_post_list = inc_post_list,
|
|
# inc_post_comment_list = inc_post_comment_list,
|
|
inc_user_role_list = inc_user_role_list,
|
|
):
|
|
if isinstance(user_dict, dict):
|
|
response_data = user_dict
|
|
else:
|
|
response_data = user_dict
|
|
else:
|
|
return mk_resp(data=False, status_code=400) # Bad Request
|
|
|
|
return mk_resp(data=response_data)
|
|
# ### END ### API User ### get_user_obj_new() ###
|
|
|
|
|
|
|
|
@router.get('/{user_id}', response_model=Resp_Body_Base)
|
|
async def get_user_obj(
|
|
user_id: str = Query(..., min_length=1, max_length=22),
|
|
x_account_id: str = Header(...),
|
|
inc_roles: bool = False,
|
|
inc_contact: bool = False,
|
|
inc_organization: bool = False,
|
|
inc_person: bool = False,
|
|
by_alias: bool = True,
|
|
exclude_unset: bool = True,
|
|
):
|
|
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
user_obj = load_user_obj(
|
|
user_id=user_id,
|
|
inc_roles=inc_roles,
|
|
inc_contact=inc_contact,
|
|
inc_organization=inc_organization,
|
|
inc_person=inc_person
|
|
).dict(by_alias=by_alias, exclude_unset=exclude_unset)
|
|
data = user_obj
|
|
return mk_resp(data=data)
|
|
|
|
|
|
@router.delete('/{obj_id}', response_model=Resp_Body_Base)
|
|
async def delete_user_obj(
|
|
obj_id: str = Query(..., min_length=1, max_length=22),
|
|
x_account_id: str = Header(...),
|
|
):
|
|
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
obj_type = 'user'
|
|
result = delete_obj_template(
|
|
obj_type=obj_type,
|
|
obj_id=obj_id,
|
|
)
|
|
return result |