Files
OSIT-AE-API-FastAPI/app/routers/user.py
2021-03-18 22:34:35 +00:00

363 lines
13 KiB
Python

import datetime, pytz, time
#from datetime import datetime, time, timedelta
from fastapi import APIRouter, Body, Depends, Header, HTTPException, Query, status
from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union
from ..lib_general import *
from ..log import *
from app.config import settings
from app.db_sql import *
from .api_crud import delete_obj_template, get_obj_template, get_obj_li_template, patch_obj_template, post_obj_template
from ..models.user_model import User_Base, User_New_Base, User_Out_Base
from ..models.user_methods import load_user_obj
from ..models.response_model import *
router = APIRouter()
@router.post('', response_model=Resp_Body_Base)
async def post_user_obj(
obj: User_Base,
x_account_id: str = Header(...),
return_obj: Optional[bool] = True,
by_alias: Optional[bool] = True,
exclude_unset: Optional[bool] = True,
):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
obj_type = 'user'
obj_data_dict = obj.dict(by_alias=False, exclude_unset=True)
result = post_obj_template(
obj_type=obj_type,
data=obj_data_dict,
return_obj=True,
by_alias=True,
exclude_unset=True,
)
return result
@router.post('/new', response_model=Resp_Body_Base)
async def post_user_new_obj(
user_obj: User_New_Base,
x_account_id: str = Header(...),
return_obj: bool = True,
by_alias: bool = True,
exclude_unset: bool = True,
):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
user_data = user_obj.dict(by_alias=False, exclude_unset=False, exclude={'new_password', 'account_id_random'})
log.info('Checking if the username is already in use for the account...')
sql_select_user = f"""
SELECT *
FROM `user` AS user
WHERE user.account_id = :account_id and user.username = :username
"""
if sql_select_result := sql_select(sql=sql_select_user, data=user_data):
return mk_resp(data=False, status_message='The user account was not created. This is likely because of a duplicate username.')
log.info('Adding new user account...')
if sql_insert_result := sql_insert(table_name='user', data=user_data):
log.info('Selecting new user account to return as an object...')
sql_select_user_result = sql_select(table_name='v_user', record_id=sql_insert_result)
user_obj_new = User_Out_Base(**sql_select_user_result)
return mk_resp(data=user_obj_new.dict(by_alias=True, exclude_unset=True))
else:
return mk_resp(data=False, status_message='The user account was not created. Something seems to have gone wrong on insert.')
@router.patch('/{obj_id}', response_model=Resp_Body_Base)
async def patch_user_obj(
obj_id: str = Query(..., min_length=1, max_length=22),
obj: User_Base = None,
x_account_id: Optional[str] = Header(..., ),
return_obj: Optional[bool] = True,
by_alias: Optional[bool] = True,
exclude_unset: Optional[bool] = True,
):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
obj_type = 'user'
obj_data_dict = obj.dict(by_alias=False, exclude_unset=True)
obj_data_dict['id'] = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_type)
obj_data_dict['id_random'] = obj_id
result = patch_obj_template(
obj_type=obj_type,
data=obj_data_dict,
obj_id=obj_id,
return_obj=True,
by_alias=True,
exclude_unset=True,
)
return result
# This will look up a user based on the auth key given
# This can only be done once per key. It will be deleted if found
# A new one will need to be requested for a particular user each time
@router.get('/authenticate/auth_key/{auth_key}', response_model=Resp_Body_Base)
async def auth_key_get_user_obj(
auth_key: str = Query(..., min_length=11, max_length=22),
x_account_id: str = Header(...),
by_alias: Optional[bool] = True,
exclude_unset: Optional[bool] = True,
exclude_none: Optional[bool] = True,
):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if sql_select_result := sql_select(table_name='user', field_name='auth_key', field_value=auth_key):
log.debug(sql_select_result)
resp_data = {}
resp_data['user_id_random'] = sql_select_result.get('id_random')
resp_data['username'] = sql_select_result.get('username')
resp_data['enable'] = sql_select_result.get('enable')
resp_data['enable_from'] = sql_select_result.get('enable_from')
resp_data['enable_to'] = sql_select_result.get('enable_to')
try:
user_obj = User_Base(**sql_select_result).dict(by_alias=by_alias, exclude_unset=exclude_unset, exclude_none=exclude_none)
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(user_obj)
log.debug(user_obj.get('enable_from', None))
except ValidationError as e:
log.error(e.json())
current_utc_datetime = datetime.datetime.now(datetime.timezone.utc)
log.debug(user_obj.get('enable_from', None).astimezone(pytz.UTC))
user_enable_from = user_obj.get('enable_from', None).astimezone(pytz.UTC)
log.debug(user_enable_from)
log.debug(user_obj.get('enable_to', None))
user_enable_to = user_obj.get('enable_to', None).astimezone(pytz.UTC)
log.debug(user_enable_to)
if resp_data['enable']: pass
else:
log.info('The user account has been disabled')
if user_enable_from <= current_utc_datetime:
log.info('Enable from datetime is valid')
else:
log.info('Enable from datetime is in the future. Please wait.')
if user_enable_to >= current_utc_datetime:
log.info('Enable to datetime is valid')
else:
log.info('Enable to datetime is in the past. Your user account has been disabled.')
update_data = {}
update_data['id'] = sql_select_result.get('id')
update_data['auth_key'] = None
if sql_update_resp := sql_update(table_name='user', data=update_data):
log.info('The user record was updated with a NULL auth_key')
else:
log.info('The user record was not updated with a NULL auth_key')
log.debug(sql_update_resp)
return mk_resp(data=user_obj)
else:
return mk_resp(data=None, status_code=404)
@router.get('/list', response_model=Resp_Body_Base)
async def get_user_obj_li(
for_obj_type: Optional[str] = Query(None, min_length=2, max_length=50),
for_obj_id: Optional[str] = Query(None, min_length=1, max_length=22),
x_account_id: str = Header(...),
by_alias: Optional[bool] = True,
exclude_unset: Optional[bool] = True,
):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
obj_type = 'user'
result = get_obj_li_template(
obj_type=obj_type,
for_obj_type=for_obj_type,
for_obj_id=for_obj_id,
by_alias=True,
exclude_unset=True,
)
return result
# Look up is only for account or person records
@router.get('/lookup', response_model=Resp_Body_Base)
async def lookup_user_obj(
for_obj_id: Union[int,str],
for_obj_type: str = Query(..., min_length=2, max_length=50),
x_account_id: str = Header(...),
inc_contact: bool = False,
inc_organization: bool = False,
inc_person: bool = False,
by_alias: bool = True,
exclude_unset: bool = True,
):
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
obj_type = 'user'
base_name = User_Out_Base
if for_obj_id := redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type): pass
else: return mk_resp(data=False, status_code=404) # Not Found
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
data = {}
as_list = False
if for_obj_type == 'account' and for_obj_id:
data['account_id'] = for_obj_id
sql_where_for_obj_type = """`user`.account_id = :account_id"""
sql_limit = ''
as_list = True
elif for_obj_type == 'person' and for_obj_id:
data['person_id'] = for_obj_id
sql_where_for_obj_type = """`user`.person_id = :person_id"""
sql_limit = 'LIMIT 1'
else:
log.debug(f'Object type={for_obj_type}; Object ID={for_obj_id}')
return mk_resp(data=False, status_code=400) # Bad Request
sql = f"""
SELECT id AS 'user_id', id_random AS 'user_id_random'
FROM `user` AS `user`
WHERE {sql_where_for_obj_type}
{sql_limit}
"""
# This will return a list if selecting by account ID
user_obj_result = sql_select(data=data, sql=sql, as_list=as_list)
if isinstance(user_obj_result, dict):
user_id = user_obj_result.get('user_id', None)
user_obj = load_user_obj(
user_id=user_id,
inc_contact=inc_contact,
inc_organization=inc_organization,
inc_person=inc_person
).dict(by_alias=by_alias, exclude_unset=exclude_unset)
data = user_obj
elif isinstance(user_obj_result, list):
user_obj_li = []
for user_obj in user_obj_result:
user_id = user_obj.get('user_id', None)
user_obj_li.append(
load_user_obj(
user_id=user_id,
inc_contact=inc_contact,
inc_organization=inc_organization,
inc_person=inc_person,
).dict(by_alias=by_alias, exclude_unset=exclude_unset)
)
data = user_obj_li
else:
log.debug(user_obj_result)
return mk_resp(data=None, status_code=404) # Not Found
return mk_resp(data=data)
# Look up is only for account or person records
@router.get('/lookup_username', response_model=Resp_Body_Base)
async def lookup_username_obj(
account_id: Union[int,str],
username: str = Query(..., min_length=2, max_length=50),
x_account_id: str = Header(...),
inc_contact: bool = False,
inc_organization: bool = False,
inc_person: bool = False,
by_alias: bool = True,
exclude_unset: bool = True,
):
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if account_id := redis_lookup_id_random(record_id_random=account_id, table_name='account'): pass
else: return mk_resp(data=False, status_code=404) # Not Found
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
data = {}
data['account_id'] = account_id
data['username'] = username
sql = f"""
SELECT id AS 'user_id', id_random AS 'user_id_random'
FROM `user` AS `user`
WHERE `user`.account_id = :account_id AND `user`.username = :username
"""
# This will return a list if selecting by account ID
user_obj_result = sql_select(data=data, sql=sql)
if isinstance(user_obj_result, dict):
user_id = user_obj_result.get('user_id', None)
user_obj = load_user_obj(
user_id=user_id,
inc_contact=inc_contact,
inc_organization=inc_organization,
inc_person=inc_person
).dict(by_alias=by_alias, exclude_unset=exclude_unset)
data = user_obj
elif isinstance(user_obj_result, list):
user_obj_li = []
for user_obj in user_obj_result:
user_id = user_obj.get('user_id', None)
user_obj_li.append(
load_user_obj(
user_id=user_id,
inc_contact=inc_contact,
inc_organization=inc_organization,
inc_person=inc_person,
).dict(by_alias=by_alias, exclude_unset=exclude_unset)
)
data = user_obj_li
else:
log.debug(user_obj_result)
return mk_resp(data=None, status_code=404) # Not Found
return mk_resp(data=data)
@router.get('/{obj_id}', response_model=Resp_Body_Base)
async def get_user_obj(
obj_id: str = Query(..., min_length=1, max_length=22),
x_account_id: str = Header(...),
by_alias: Optional[bool] = True,
exclude_unset: Optional[bool] = True,
):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
obj_type = 'user'
result = get_obj_template(
obj_type=obj_type,
obj_id=obj_id,
by_alias=True,
exclude_unset=True,
)
return result
@router.delete('/{obj_id}', response_model=Resp_Body_Base)
async def delete_user_obj(
obj_id: str = Query(..., min_length=1, max_length=22),
x_account_id: str = Header(...),
):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
obj_type = 'user'
result = delete_obj_template(
obj_type=obj_type,
obj_id=obj_id,
)
return result