Files
OSIT-AE-API-FastAPI/app/routers/user.py
2021-08-25 11:16:02 -04:00

727 lines
29 KiB
Python

import datetime
import secrets
import time
from typing import Dict, List, Optional, Set, Union

import pytz
from fastapi import APIRouter, Body, Depends, Header, HTTPException, Path, Query, Response, status
from pydantic import BaseModel, EmailStr, Field

from app.lib_general import log, logging, secure_hash_string, verify_secure_hash_string
from app.config import settings
from app.db_sql import sql_insert, sql_update, sql_insert_or_update, sql_select, sql_delete, get_id_random, redis_lookup_id_random
from app.routers.api_crud import delete_obj_template, get_obj_template, get_obj_li_template, patch_obj_template, post_obj_template
from app.methods.order_methods import get_order_rec_list, load_order_obj
from app.methods.user_methods import create_user_obj, load_user_obj
from app.models.common_field_schema import default_num_bytes
from app.models.response_models import Resp_Body_Base, mk_resp
from app.models.user_models import User_Base, User_New_Base, User_Out_Base
# Router object for this module; every endpoint below registers itself on it
# through the @router.<method>(...) decorators.
router = APIRouter()
# Create a user record from the posted User_Base payload.
#
# The payload is flattened to a plain dict (field names rather than aliases,
# only the fields the client actually sent) and handed to the shared
# post_obj_template() helper; whatever that helper returns is the response.
#
# NOTE: return_obj, by_alias and exclude_unset are accepted as parameters, but
# the helper is always called with True for all three.
@router.post('', response_model=Resp_Body_Base)
async def post_user_obj(
    obj: User_Base,
    x_account_id: str = Header(...),
    return_obj: Optional[bool] = True,
    by_alias: Optional[bool] = True,
    exclude_unset: Optional[bool] = True,
    response: Response = Response,
):
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    return post_obj_template(
        obj_type='user',
        data=obj.dict(by_alias=False, exclude_unset=True),
        return_obj=True,
        by_alias=True,
        exclude_unset=True,
    )
# ### BEGIN ### API User ### post_user_obj_new() ###
# Updated 2021-08-21 (complete re-write)
#
# Create a user for the account named by the payload's account_id_random.
#
# Query options:
#   allow_update, avoid_dup_username - passed straight through to
#       create_user_obj().
#   return_obj - when true the response data is whatever load_user_obj()
#       returns for the new user (False if it returns nothing); otherwise the
#       data is a small dict holding user_id and user_id_random.
#   by_alias, exclude_unset - accepted but not used by this route.
#
# Responses (all built with mk_resp):
#   400 - the payload carries no account_id_random, create_user_obj() returned
#         a falsy value, or it returned something other than an int.
#   otherwise - 'The user account was created.' with the data described above.
@router.post('/new', response_model=Resp_Body_Base)
async def post_user_obj_new(
    user_obj: User_New_Base,
    allow_update: bool = False,
    avoid_dup_username: bool = False,
    x_account_id: str = Header(...),
    return_obj: bool = True,
    by_alias: bool = True,
    exclude_unset: bool = True,
    response: Response = Response,
):
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    account_id_random = user_obj.account_id_random
    if not account_id_random:
        # A bare `False` is not a valid Resp_Body_Base, so answer with a
        # proper error envelope like every other failure path here.
        return mk_resp(
            data=False,
            status_code=400,
            response=response,
            status_message='The user account was not created because no account ID was given.',
        )

    create_user_obj_result = create_user_obj(
        account_id=account_id_random,
        user_obj_new=user_obj,
        allow_update=allow_update,
        avoid_dup_username=avoid_dup_username,
    )
    if not create_user_obj_result:
        return mk_resp(data=False, status_code=400, response=response, status_message='The user account was not created. This is likely because that username already exists for this account.')
    if not isinstance(create_user_obj_result, int):
        return mk_resp(data=False, status_code=400, response=response, status_message='The result from trying to create a user account was unexpected.')

    user_id = create_user_obj_result
    if return_obj:
        # Fall back to False when the freshly created user cannot be loaded.
        data = load_user_obj(user_id=user_id) or False
    else:
        data = {
            'user_id': user_id,
            'user_id_random': get_id_random(record_id=user_id, table_name='user'),
        }
    return mk_resp(data=data, response=response, status_message='The user account was created.')
# ### END ### API User ### post_user_obj_new() ###
# Replace the stored password hash of one user.
#
# Parameters:
#   user_id  - path value; resolved to a record id with
#              redis_lookup_id_random() (404 when that yields nothing).
#   password - the new password. When it is missing or shorter than 10
#              characters a random one is generated instead.
#   return_obj - when true the response data is the reloaded user (with the
#              inc_* options forwarded to load_user_obj()); otherwise True.
#
# Responses (all built with mk_resp):
#   404 - unknown user, or the user could not be reloaded for return_obj.
#   500 - sql_update() reported that nothing was written.
#
# NOTE(review): a generated replacement password is only hashed and stored;
# nothing in this function hands it back to the caller. Confirm that callers
# do not depend on learning it.
@router.patch('/change_password/{user_id}', response_model=Resp_Body_Base)
async def change_user_obj_password(
    user_id: Union[int, str],
    password: Optional[str] = Query(None, min_length=6, max_length=50),
    x_account_id: Optional[str] = Header(..., ),
    return_obj: bool = False,
    inc_user_role_list: bool = False,
    inc_contact: bool = False,
    inc_organization: bool = False,
    inc_person: bool = False,
    by_alias: bool = True,
    exclude_unset: bool = True,
    response: Response = Response,
):
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    # Log the call arguments, but never the plaintext password.
    debug_args = dict(locals())
    if debug_args.get('password') is not None:
        debug_args['password'] = '***redacted***'
    log.debug(debug_args)

    if not password or len(password) < 10:
        log.warning('The password given must be at least 10 characters. Generating a new random password.')
        password = secrets.token_urlsafe(default_num_bytes)

    user_id = redis_lookup_id_random(record_id_random=user_id, table_name='user')
    if not user_id:
        return mk_resp(data=False, status_code=404, response=response)  # Not Found

    user_data = {'password': secure_hash_string(string=password)}
    user_rec_update_result = sql_update(data=user_data, table_name='user', record_id=user_id, id_random_length=None)
    if not user_rec_update_result:
        # Do not report success when the new hash was not stored.
        log.error('The user record was not updated with the new password')
        return mk_resp(data=False, status_code=500, response=response)

    if not return_obj:
        return mk_resp(data=True)

    user_obj = load_user_obj(
        user_id=user_id,
        inc_user_role_list=inc_user_role_list,
        inc_contact=inc_contact,
        inc_organization=inc_organization,
        inc_person=inc_person,
    )
    if not user_obj:
        log.error('The password was changed, but the user record could not be loaded')
        return mk_resp(data=False, status_code=404, response=response)  # Not Found
    return mk_resp(data=user_obj.dict(by_alias=by_alias, exclude_unset=exclude_unset))
# Partially update one user record.
#
# Parameters:
#   obj    - the fields to change (only the ones the client sent are used).
#   obj_id - path value, 1 to 22 characters; resolved to the record id with
#            redis_lookup_id_random().
#
# The payload dict is extended with 'id' (resolved record id) and 'id_random'
# (the value from the path) and passed to the shared patch_obj_template()
# helper, whose result is returned unchanged. An id that cannot be resolved
# gets a 404 instead of being forwarded as id=None.
#
# NOTE: return_obj, by_alias and exclude_unset are accepted as parameters, but
# the helper is always called with True for all three.
@router.patch('/{obj_id}', response_model=Resp_Body_Base)
async def patch_user_obj(
    obj: User_Base,
    # A path parameter has to be declared with Path(); Query() is not valid here.
    obj_id: str = Path(..., min_length=1, max_length=22),
    x_account_id: Optional[str] = Header(..., ),
    return_obj: Optional[bool] = True,
    by_alias: Optional[bool] = True,
    exclude_unset: Optional[bool] = True,
    response: Response = Response,
):
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    obj_type = 'user'
    record_id = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_type)
    if not record_id:
        return mk_resp(data=False, status_code=404, response=response)  # Not Found

    obj_data_dict = obj.dict(by_alias=False, exclude_unset=True)
    obj_data_dict['id'] = record_id
    obj_data_dict['id_random'] = obj_id
    return patch_obj_template(
        obj_type=obj_type,
        data=obj_data_dict,
        obj_id=obj_id,
        return_obj=True,
        by_alias=True,
        exclude_unset=True,
    )
# ### BEGIN ### API User Routers ### user_new_auth_key() ###
# Generate a new one time use authorization key
#
# Parameters:
#   user_id    - the user's random id; it is written as 'id_random' into the
#                data handed to sql_update(). Required in practice: a request
#                without it is answered with 400.
#   return_obj - when true the response data is the reloaded user; otherwise
#                it is {'auth_key': <new key>}.
#   exclude_none - accepted but not used by this route.
#
# Responses (all built with mk_resp):
#   400 - no user_id given.
#   404 - sql_update() reported that nothing was written, or the user could
#         not be reloaded for return_obj.
@router.get('/new_auth_key', response_model=Resp_Body_Base)
async def user_new_auth_key(
    user_id: Optional[str] = Query(None, min_length=2, max_length=50),
    x_account_id: str = Header(...),
    return_obj: Optional[bool] = False,
    by_alias: bool = True,
    exclude_unset: bool = True,
    exclude_none: bool = True,
    response: Response = Response,
):
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    if not user_id:
        # Without an id there is no record to address; do not attempt an
        # update keyed on id_random=None.
        return mk_resp(data=False, status_code=400, response=response, status_message='A user ID is required to generate a new auth key.')  # Bad Request

    update_user_data = {
        'id_random': user_id,
        'auth_key': secrets.token_urlsafe(default_num_bytes),
    }
    user_rec_update_result = sql_update(table_name='user', data=update_user_data)
    if not user_rec_update_result:
        log.info('The user record was not updated with a new auth_key')
        log.debug(user_rec_update_result)
        return mk_resp(data=False, status_code=404, response=response)

    log.info('The user record was updated with a new auth_key')
    if not return_obj:
        return mk_resp(data={'auth_key': update_user_data['auth_key']})

    # Every other load_user_obj() call in this module passes the resolved
    # record id, so resolve the random id first instead of passing it raw.
    user_rec_id = redis_lookup_id_random(record_id_random=user_id, table_name='user')
    user_obj = None
    if user_rec_id:
        user_obj = load_user_obj(
            user_id=user_rec_id,
            inc_contact=False,
            inc_organization=False,
            inc_person=False,
        )
    if not user_obj:
        log.error('The auth_key was updated, but the user record could not be loaded')
        return mk_resp(data=False, status_code=404, response=response)
    return mk_resp(data=user_obj.dict(by_alias=by_alias, exclude_unset=exclude_unset))
# ### BEGIN ### API User Routers ### user_authenticate() ###
# Authenticate a username and password OR by authorization key
# An authorization key can only be done once. It will be deleted if found.
# A new key will need to be requested for a particular user each time.


# Return a copy of a dict that is safe to log: the named secret fields are
# masked when they hold a value. Anything that is not a dict is returned as is.
def _auth_redact_secrets(values, secret_names=('password', 'auth_key')):
    if not isinstance(values, dict):
        return values
    return {
        key: ('***redacted***' if key in secret_names and value is not None else value)
        for key, value in values.items()
    }


# Stamp a stored datetime as UTC; an empty (NULL) value stays None.
# NOTE(review): replace() relabels the value instead of converting it, so this
# presumably relies on the database holding UTC wall-clock times - confirm.
def _auth_as_utc(value):
    if not value:
        return None
    return value.replace(tzinfo=datetime.timezone.utc)


# Two ways to authenticate:
#   * account_id + username + password - the account id is resolved with
#     redis_lookup_id_random(), the user row is selected by account and
#     username, and the password is checked against the stored hash.
#   * auth_key - the user row is selected by that key and the key is then set
#     to NULL on the record (a failed update is only logged).
# After either path the account has to be enabled and, where enable_from /
# enable_to are set, the current UTC time has to lie inside that window.
# On success the response data is the loaded user, with the inc_* options
# forwarded to load_user_obj().
#
# exclude_none is accepted but not used by this route.
#
# Responses (all built with mk_resp; several failures pass no status_code and
# so use whatever mk_resp defaults to):
#   400 - neither a complete username/password set nor an auth_key was given.
#   404 - unknown account, user or auth key.
#   500 - the select result was not a dict, or the user could not be loaded.
#   data=False with a status_message - wrong password, missing password hash,
#         account disabled, not yet enabled, or expired.
@router.get('/authenticate', response_model=Resp_Body_Base)
async def user_authenticate(
    account_id: Optional[Union[int, str]] = None,
    username: Optional[str] = Query(None, min_length=2, max_length=50),
    password: Optional[str] = Query(None, min_length=6, max_length=50),
    auth_key: Optional[str] = Query(None, min_length=11, max_length=22),
    x_account_id: str = Header(...),
    inc_user_role_list: bool = False,
    inc_contact: bool = False,
    inc_organization: bool = False,
    inc_person: bool = False,
    by_alias: bool = True,
    exclude_unset: bool = True,
    exclude_none: bool = True,
    response: Response = Response,
):
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    # Log the call arguments without the credentials themselves.
    log.debug(_auth_redact_secrets(dict(locals())))

    if account_id and username and password:
        account_id = redis_lookup_id_random(record_id_random=account_id, table_name='account')
        if not account_id:
            return mk_resp(data=False, status_code=404, response=response)  # Not Found

        user_data = {'account_id': account_id, 'username': username}
        sql = (
            "\n"
            "SELECT `user`.id AS 'user_id', `user`.id_random AS 'user_id_random', `user`.password, `user`.enable, `user`.enable_from, `user`.enable_to\n"
            "FROM `user` AS `user`\n"
            "WHERE `user`.account_id = :account_id AND `user`.username = :username\n"
            "LIMIT 1\n"
        )
        user_rec_result = sql_select(data=user_data, sql=sql)
        if not user_rec_result:
            return mk_resp(data=None, status_code=404, status_message='The user account was not found')

        user_id = user_rec_result.get('user_id', None)
        password_hash = user_rec_result.get('password', None)
        if not password_hash:
            log.error('The password has was not found. This should not happen.')
            return mk_resp(data=False, status_message='The password has was not found. This should not happen.')
        if not verify_secure_hash_string(string=password, string_hash=password_hash):
            log.info('The username was found, but the password did not match.')
            return mk_resp(data=False, status_message='The username was found, but the password did not match.')
        log.info('The username was found, and the password matched.')
    elif auth_key:
        user_rec_result = sql_select(table_name='user', field_name='auth_key', field_value=auth_key)
        if not user_rec_result:
            return mk_resp(data=None, status_code=404, status_message='A user account with that auth key was not found')

        # NOTE: This select returns the record id as "id", not "user_id".
        user_id = user_rec_result.get('id', None)
        # The key is single use: clear it as soon as it has been matched.
        update_user_data = {'id': user_id, 'auth_key': None}
        if user_rec_update_result := sql_update(table_name='user', data=update_user_data):
            log.info('The user record was updated with a NULL auth_key')
        else:
            log.info('The user record was not updated with a NULL auth_key')
            log.debug(user_rec_update_result)
    else:
        return mk_resp(data=None, status_code=400, status_message='One more user account fields was missing or unexpected.')  # Bad Request

    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    # The record carries the password hash (and possibly the auth key).
    log.debug(_auth_redact_secrets(user_rec_result))

    if not isinstance(user_rec_result, dict):
        log.error('SQL result was unexpected. A dict result type was expected. This should not happen.')
        return mk_resp(data=False, status_code=500, response=response)

    current_utc_datetime = datetime.datetime.now(datetime.timezone.utc)
    log.debug(current_utc_datetime)

    if not user_rec_result.get('enable', None):
        log.info('The user account is not enabled')
        return mk_resp(data=False, status_message='This user account is not enabled')
    log.info('The user account is enabled')

    # A NULL enable_from / enable_to means "no limit on that side".
    if user_enable_from := _auth_as_utc(user_rec_result.get('enable_from', None)):
        log.debug(user_enable_from)
        if user_enable_from > current_utc_datetime:
            log.info('Enable from datetime is in the future. Please wait.')
            return mk_resp(data=False, status_message='This account is not yet enabled')
        log.info('Enable from datetime is valid')

    if user_enable_to := _auth_as_utc(user_rec_result.get('enable_to', None)):
        log.debug(user_enable_to)
        if user_enable_to < current_utc_datetime:
            log.info('Enable to datetime is in the past. Your user account has been disabled.')
            return mk_resp(data=False, status_message='This account is not enabled because the expiratation date has passed')
        log.info('Enable to datetime is valid')

    user_obj = load_user_obj(
        user_id=user_id,
        inc_user_role_list=inc_user_role_list,
        inc_contact=inc_contact,
        inc_organization=inc_organization,
        inc_person=inc_person,
    )
    if not user_obj:
        log.error('The user was authenticated, but the user record could not be loaded')
        return mk_resp(data=False, status_code=500, response=response)
    return mk_resp(data=user_obj.dict(by_alias=by_alias, exclude_unset=exclude_unset))
# ### END ### API User Routers ### user_authenticate() ###
# List user records, optionally restricted to the object named by
# for_obj_type / for_obj_id.
#
# All the work is done by the shared get_obj_li_template() helper; its result
# is returned unchanged.
#
# NOTE: by_alias and exclude_unset are accepted as parameters, but the helper
# is always called with True for both.
@router.get('/list', response_model=Resp_Body_Base)
async def get_user_obj_li(
    for_obj_type: Optional[str] = Query(None, min_length=2, max_length=50),
    for_obj_id: Optional[str] = Query(None, min_length=1, max_length=22),
    x_account_id: str = Header(...),
    by_alias: Optional[bool] = True,
    exclude_unset: Optional[bool] = True,
    response: Response = Response,
):
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    return get_obj_li_template(
        obj_type='user',
        for_obj_type=for_obj_type,
        for_obj_id=for_obj_id,
        by_alias=True,
        exclude_unset=True,
    )
# Look up is only for account or person records
#
# Parameters:
#   for_obj_type - 'account' or 'person'; anything else is answered with 400
#                  before the value is used for any lookup.
#   for_obj_id   - id of that account / person; resolved with
#                  redis_lookup_id_random() (404 when that yields nothing).
#   inc_*        - forwarded to load_user_obj().
#
# Result:
#   'account' - sql_select() is called with as_list=True and the response data
#               is a list with one loaded user per row. Rows whose user cannot
#               be loaded are skipped with a warning.
#   'person'  - the query is limited to one row and the response data is that
#               single loaded user (404 when it cannot be loaded).
#   A select result that is neither a dict nor a list is answered with 404.
@router.get('/lookup', response_model=Resp_Body_Base)
async def lookup_user_obj(
    for_obj_id: Union[int, str],
    for_obj_type: str = Query(..., min_length=2, max_length=50),
    x_account_id: str = Header(...),
    inc_user_role_list: bool = False,
    inc_contact: bool = False,
    inc_organization: bool = False,
    inc_person: bool = False,
    by_alias: bool = True,
    exclude_unset: bool = True,
    response: Response = Response,
):
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # Check the caller-supplied type against the supported set first, so an
    # arbitrary string is never used as a table name below.
    if for_obj_type not in ('account', 'person'):
        log.debug(f'Object type={for_obj_type}; Object ID={for_obj_id}')
        return mk_resp(data=False, status_code=400, response=response)  # Bad Request

    for_obj_id = redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type)
    if not for_obj_id:
        return mk_resp(data=False, status_code=404, response=response)  # Not Found
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL

    if for_obj_type == 'account':
        data = {'account_id': for_obj_id}
        sql_where_for_obj_type = """`user`.account_id = :account_id"""
        sql_limit = ''
        as_list = True
    else:
        data = {'person_id': for_obj_id}
        sql_where_for_obj_type = """`user`.person_id = :person_id"""
        sql_limit = 'LIMIT 1'
        as_list = False

    # Only the two fixed fragments chosen above are interpolated; the id
    # itself travels as a bound parameter.
    sql = (
        "\n"
        "SELECT id AS 'user_id', id_random AS 'user_id_random'\n"
        "FROM `user` AS `user`\n"
        f"WHERE {sql_where_for_obj_type}\n"
        f"{sql_limit}\n"
    )
    user_rec_result = sql_select(data=data, sql=sql, as_list=as_list)

    def load_as_dict(user_rec):
        # Load one user for a selected row; None when it cannot be loaded.
        user_model = load_user_obj(
            user_id=user_rec.get('user_id', None),
            inc_user_role_list=inc_user_role_list,
            inc_contact=inc_contact,
            inc_organization=inc_organization,
            inc_person=inc_person,
        )
        if not user_model:
            log.warning('A user record was selected but could not be loaded')
            return None
        return user_model.dict(by_alias=by_alias, exclude_unset=exclude_unset)

    if isinstance(user_rec_result, dict):
        data = load_as_dict(user_rec_result)
        if data is None:
            return mk_resp(data=None, status_code=404)  # Not Found
    elif isinstance(user_rec_result, list):
        data = [
            user_dict
            for user_rec in user_rec_result
            if (user_dict := load_as_dict(user_rec)) is not None
        ]
    else:
        log.debug(user_rec_result)
        return mk_resp(data=None, status_code=404)  # Not Found
    return mk_resp(data=data)
# Look up a user with an email address for an account
#
# Parameters:
#   account_id - an empty string selects users whose account_id IS NULL; any
#                other value is resolved with redis_lookup_id_random() and a
#                404 is returned when that yields nothing.
#   email      - matched exactly against `user`.email.
#   inc_*      - forwarded to load_user_obj().
#
# Result: a dict select result becomes one loaded user, a list result becomes
# a list of loaded users, and anything else is answered with 404.
@router.get('/lookup_email', response_model=Resp_Body_Base)
async def lookup_email(
    account_id: Union[int, str],
    email: str = Query(..., min_length=2, max_length=50),
    x_account_id: str = Header(...),
    inc_user_role_list: bool = False,
    inc_contact: bool = False,
    inc_organization: bool = False,
    inc_person: bool = False,
    by_alias: bool = True,
    exclude_unset: bool = True,
    response: Response = Response,
):
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    if account_id != '':
        account_id = redis_lookup_id_random(record_id_random=account_id, table_name='account')
        if not account_id:
            return mk_resp(data=False, status_code=404, response=response)  # Not Found
    else:
        account_id = None
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL

    query_params = {'account_id': account_id, 'email': email}
    log.debug(query_params)

    # The two query variants differ only in how the account is matched.
    account_clause = '`user`.account_id = :account_id' if account_id else '`user`.account_id IS NULL'
    sql = (
        "\n"
        "SELECT id AS 'user_id', id_random AS 'user_id_random'\n"
        "FROM `user` AS `user`\n"
        f"WHERE {account_clause} AND `user`.email = :email\n"
    )
    log.debug(sql)
    found = sql_select(data=query_params, sql=sql)

    def load_as_dict(user_rec):
        # Load the user for one selected row and render it as a dict.
        return load_user_obj(
            user_id=user_rec.get('user_id', None),
            inc_user_role_list=inc_user_role_list,
            inc_contact=inc_contact,
            inc_organization=inc_organization,
            inc_person=inc_person,
        ).dict(by_alias=by_alias, exclude_unset=exclude_unset)

    if isinstance(found, dict):
        payload = load_as_dict(found)
    elif isinstance(found, list):
        payload = [load_as_dict(user_rec) for user_rec in found]
    else:
        log.debug(found)
        return mk_resp(data=None, status_code=404)  # Not Found
    return mk_resp(data=payload)
# Look up a user with a username for an account
#
# Parameters:
#   account_id - an empty string selects users whose account_id IS NULL; any
#                other value is resolved with redis_lookup_id_random() and a
#                404 is returned when that yields nothing.
#   username   - matched exactly against `user`.username.
#   inc_*      - forwarded to load_user_obj().
#
# Result: a dict select result becomes one loaded user, a list result becomes
# a list of loaded users, and anything else is answered with 404.
@router.get('/lookup_username', response_model=Resp_Body_Base)
async def lookup_username(
    account_id: Union[int, str],
    username: str = Query(..., min_length=2, max_length=50),
    x_account_id: str = Header(...),
    inc_address: bool = False,
    inc_contact: bool = False,
    inc_organization: bool = False,
    inc_person: bool = False,
    inc_user_role_list: bool = False,
    by_alias: bool = True,
    exclude_unset: bool = True,
    response: Response = Response,
):
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    if account_id != '':
        account_id = redis_lookup_id_random(record_id_random=account_id, table_name='account')
        if not account_id:
            return mk_resp(data=False, status_code=404, response=response)  # Not Found
    else:
        account_id = None
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL

    query_params = {'account_id': account_id, 'username': username}
    log.debug(query_params)

    # The two query variants differ only in how the account is matched.
    account_clause = '`user`.account_id = :account_id' if account_id else '`user`.account_id IS NULL'
    sql = (
        "\n"
        "SELECT id AS 'user_id', id_random AS 'user_id_random'\n"
        "FROM `user` AS `user`\n"
        f"WHERE {account_clause} AND `user`.username = :username\n"
    )
    log.debug(sql)
    found = sql_select(data=query_params, sql=sql)

    def load_as_dict(user_rec):
        # Load the user for one selected row and render it as a dict.
        return load_user_obj(
            user_id=user_rec.get('user_id', None),
            inc_address=inc_address,
            inc_contact=inc_contact,
            inc_organization=inc_organization,
            inc_person=inc_person,
            inc_user_role_list=inc_user_role_list,
        ).dict(by_alias=by_alias, exclude_unset=exclude_unset)

    if isinstance(found, dict):
        payload = load_as_dict(found)
    elif isinstance(found, list):
        payload = [load_as_dict(user_rec) for user_rec in found]
    else:
        log.debug(found)
        return mk_resp(data=None, status_code=404)  # Not Found
    return mk_resp(data=payload)
# ### BEGIN ### API User ### get_user_obj() ###
# Working well as of 2021-06-25. Using as a template for other routes.
#
# Return one user, optionally with related objects.
#
# Parameters:
#   user_id - path value, 1 to 22 characters; resolved to the record id with
#             redis_lookup_id_random() (404 when that yields nothing).
#   limit, enabled, inc_* - forwarded to load_user_obj(), which is asked for
#             its result as a dict (model_as_dict=True).
#
# NOTE: inc_post_comment_list is accepted but not forwarded to
# load_user_obj(); by_alias and exclude_unset are accepted but not used.
#
# Responses (built with mk_resp):
#   404 - unknown user_id.
#   400 - load_user_obj() returned nothing.
@router.get('/{user_id}', response_model=Resp_Body_Base)
async def get_user_obj(
    # A path parameter has to be declared with Path(); Query() is not valid here.
    user_id: str = Path(..., min_length=1, max_length=22),
    limit: int = 500,  # For now this covers any included objects or object lists
    enabled: str = 'enabled',  # For now this covers any included objects or object lists
    inc_address: bool = False,  # Priority l1
    # inc_archive_list: bool = False, # Priority l3
    inc_contact: bool = False,  # Priority l1
    inc_event_list: bool = False,  # Priority l1
    # inc_hosted_file_list: bool = False, # Priority l3
    inc_journal_list: bool = False,  # Priority l2
    # inc_journal_entry_list: bool = False, # Priority l3
    inc_membership_person: bool = False,  # Priority l2
    # inc_membership_list: bool = False, # ???
    inc_order_line_list: bool = False,  # Priority l1
    inc_order_list: bool = False,  # Priority l1
    inc_order_cart_list: bool = False,  # Priority l1
    inc_organization: bool = False,  # Priority l1
    # inc_organization_list: bool = False,
    inc_person: bool = False,  # Priority l1
    # inc_person_list: bool = False,
    inc_post_list: bool = False,  # Priority l2
    inc_post_comment_list: bool = False,  # Priority l3
    inc_user_role_list: bool = False,  # Priority l1
    x_account_id: str = Header(...),
    by_alias: Optional[bool] = True,
    exclude_unset: Optional[bool] = True,
    response: Response = Response,
):
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    user_id = redis_lookup_id_random(record_id_random=user_id, table_name='user')
    if not user_id:
        return mk_resp(data=None, status_code=404)

    user_result = load_user_obj(
        user_id=user_id,
        limit=limit,
        model_as_dict=True,  # NOTE: returning model as a dict
        enabled=enabled,
        inc_address=inc_address,
        inc_contact=inc_contact,
        inc_event_list=inc_event_list,
        inc_journal_list=inc_journal_list,
        inc_membership_person=inc_membership_person,
        inc_order_line_list=inc_order_line_list,
        inc_order_list=inc_order_list,
        inc_order_cart_list=inc_order_cart_list,
        inc_organization=inc_organization,
        inc_person=inc_person,
        inc_post_list=inc_post_list,
        inc_user_role_list=inc_user_role_list,
    )
    if not user_result:
        return mk_resp(data=False, status_code=400, response=response)  # Bad Request
    return mk_resp(data=user_result)
# ### END ### API User ### get_user_obj() ###
# ### BEGIN ### API User ### get_user_obj_order_list() ###
# Working well as of 2021-06-28. Using as a template for other routes.
#
# Return the orders of one user.
#
# Parameters:
#   user_id - path value, 1 to 22 characters; resolved to the record id with
#             redis_lookup_id_random() (404 when that yields nothing).
#   limit, enabled, from_datetime, to_datetime, status - forwarded to
#             get_order_rec_list() as the selection criteria.
#   limit, enabled, by_alias, exclude_unset, inc_order_cfg,
#   inc_order_line_list - forwarded to load_order_obj() for every order found.
#
# The response data is a list with one entry per selected order, in the order
# get_order_rec_list() returned them; an order that cannot be loaded is
# represented by None.
#
# NOTE: inside this function the `status` parameter hides the `status` name
# imported from fastapi at the top of the file.
# NOTE(review): a falsy get_order_rec_list() result is answered with 400; if
# that function returns an empty list for "no orders", such users get a
# 400 as well - confirm that this is intended.
@router.get('/{user_id}/order_list', response_model=Resp_Body_Base)
async def get_user_obj_order_list(
    # A path parameter has to be declared with Path(); Query() is not valid here.
    user_id: str = Path(..., min_length=1, max_length=22),
    limit: int = 500,  # For now this covers any included objects or object lists
    enabled: str = 'enabled',  # For now this covers any included objects or object lists
    from_datetime: Optional[datetime.datetime] = None,
    to_datetime: Optional[datetime.datetime] = None,
    # inc_address: bool = False,
    # inc_contact: bool = False,
    inc_order_cfg: bool = False,
    inc_order_line_list: bool = False,
    status: str = 'complete',
    # inc_person: bool = False,
    # inc_order_list: bool = False,
    # inc_order_cart_list: bool = False,
    x_account_id: str = Header(...),
    by_alias: Optional[bool] = True,
    exclude_unset: Optional[bool] = True,
    response: Response = Response,
):
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    user_id = redis_lookup_id_random(record_id_random=user_id, table_name='user')
    if not user_id:
        return mk_resp(data=None, status_code=404)

    # Updated 2021-06-28
    order_rec_list_result = get_order_rec_list(
        for_obj_type='user',
        for_obj_id=user_id,
        limit=limit,
        enabled=enabled,
        from_datetime=from_datetime,
        to_datetime=to_datetime,
        status=status,
    )
    if not order_rec_list_result:
        return mk_resp(data=False, status_code=400, response=response)  # Bad Request

    # Keep one slot per order record; `or None` turns any falsy load result
    # into an explicit None placeholder.
    order_result_list = [
        load_order_obj(
            order_id=order_rec.get('order_id', None),
            limit=limit,
            enabled=enabled,
            by_alias=by_alias,
            exclude_unset=exclude_unset,
            inc_order_cfg=inc_order_cfg,
            inc_order_line_list=inc_order_line_list,
        ) or None
        for order_rec in order_rec_list_result
    ]
    return mk_resp(data=order_result_list)
# ### END ### API User ### get_user_obj_order_list() ###
# Delete one user record.
#
# Parameters:
#   obj_id - path value, 1 to 22 characters, identifying the user.
#
# All the work is done by the shared delete_obj_template() helper; its result
# is returned unchanged.
@router.delete('/{obj_id}', response_model=Resp_Body_Base)
async def delete_user_obj(
    # A path parameter has to be declared with Path(); Query() is not valid here.
    obj_id: str = Path(..., min_length=1, max_length=22),
    x_account_id: str = Header(...),
    response: Response = Response,
):
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    return delete_obj_template(
        obj_type='user',
        obj_id=obj_id,
    )