"""User API routes: create, update, authenticate, and look up user records."""

import datetime
import logging
import secrets
import time
#from datetime import datetime, time, timedelta
from typing import Dict, List, Optional, Set, Union

import pytz
from fastapi import APIRouter, Body, Depends, Header, HTTPException, Query, status
# Aliased so that none of the wildcard imports below can shadow it.
from fastapi import Path as PathParam
from pydantic import BaseModel, EmailStr, Field

from ..lib_general import *
from ..log import *
from app.config import settings
from app.db_sql import *
from .api_crud import delete_obj_template, get_obj_template, get_obj_li_template, patch_obj_template, post_obj_template
from ..models.common_field_schema import default_num_bytes
from ..models.user_model import User_Base, User_New_Base, User_Out_Base
from ..models.user_methods import load_user_obj
from ..models.response_model import *

router = APIRouter()

# Keys whose values must never be written to the log.
_SENSITIVE_KEYS = frozenset({'password', 'auth_key'})

# Columns that _lookup_user_by_column() is allowed to interpolate into SQL.
_LOOKUP_COLUMNS = frozenset({'email', 'username'})

# for_obj_type -> (bind name, WHERE clause, LIMIT clause, as_list) for lookup_user_obj().
_LOOKUP_FILTERS = {
    'account': ('account_id', '`user`.account_id = :account_id', '', True),
    'person': ('person_id', '`user`.person_id = :person_id', 'LIMIT 1', False),
}


def _redacted(values):
    """Return a copy of a dict with sensitive values masked, for safe logging.

    Non-dict values are returned unchanged.
    """
    if not isinstance(values, dict):
        return values
    return {
        key: ('***' if key in _SENSITIVE_KEYS and value is not None else value)
        for key, value in values.items()
    }


def _load_user_data(user_rec_result, *, inc_roles, inc_contact, inc_organization,
                    inc_person, by_alias, exclude_unset):
    """Expand a ``sql_select`` result into serialized user object(s).

    A single row (dict) yields one dict, a list of rows yields a list of
    dicts, and any other result type yields ``None``.
    """
    def load(row):
        return load_user_obj(
            user_id=row.get('user_id', None),
            inc_roles=inc_roles,
            inc_contact=inc_contact,
            inc_organization=inc_organization,
            inc_person=inc_person,
        ).dict(by_alias=by_alias, exclude_unset=exclude_unset)

    if isinstance(user_rec_result, dict):
        return load(user_rec_result)
    if isinstance(user_rec_result, list):
        return [load(row) for row in user_rec_result]
    return None


def _lookup_user_by_column(column, value, account_id, *, inc_roles, inc_contact,
                           inc_organization, inc_person, by_alias, exclude_unset):
    """Find user(s) by ``column`` = ``value`` within an account and build the response.

    ``account_id`` is the random account ID; an empty string selects users
    that have no account (``account_id IS NULL``).

    Raises:
        ValueError: if ``column`` is not one of the whitelisted column names.
    """
    # The column name is interpolated into the SQL text, so only accept constants.
    if column not in _LOOKUP_COLUMNS:
        raise ValueError(f'Unsupported lookup column: {column!r}')

    if account_id == '':
        account_id = None
    else:
        account_id = redis_lookup_id_random(record_id_random=account_id, table_name='account')
        if not account_id:
            return mk_resp(data=False, status_code=404)  # Not Found

    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    data = {'account_id': account_id, column: value}
    log.debug(data)

    account_filter = '`user`.account_id = :account_id' if account_id else '`user`.account_id IS NULL'
    sql = f"""
        SELECT id AS 'user_id', id_random AS 'user_id_random'
        FROM `user` AS `user`
        WHERE {account_filter}
            AND `user`.{column} = :{column}
    """
    log.debug(sql)

    user_obj_result = sql_select(data=data, sql=sql)
    resp_data = _load_user_data(
        user_obj_result,
        inc_roles=inc_roles,
        inc_contact=inc_contact,
        inc_organization=inc_organization,
        inc_person=inc_person,
        by_alias=by_alias,
        exclude_unset=exclude_unset,
    )
    if resp_data is None:
        log.debug(user_obj_result)
        return mk_resp(data=None, status_code=404)  # Not Found
    return mk_resp(data=resp_data)


def _enable_window_error(user_rec, now):
    """Return a rejection message if ``now`` is outside the record's enable window.

    ``enable_from`` / ``enable_to`` may be missing or NULL, in which case that
    bound is not enforced. Returns ``None`` when the window allows access.
    """
    utc = datetime.timezone.utc

    # NOTE(review): values are stamped as UTC, which assumes the database
    # stores naive UTC datetimes -- confirm.
    enable_from = user_rec.get('enable_from', None)
    if enable_from is not None:
        enable_from = enable_from.replace(tzinfo=utc)
        log.debug(enable_from)
        if enable_from > now:
            log.info('Enable from datetime is in the future. Please wait.')
            return 'This account is not yet enabled'
        log.info('Enable from datetime is valid')

    enable_to = user_rec.get('enable_to', None)
    if enable_to is not None:
        enable_to = enable_to.replace(tzinfo=utc)
        log.debug(enable_to)
        if enable_to < now:
            log.info('Enable to datetime is in the past. Your user account has been disabled.')
            return 'This account is not enabled because the expiratation date has passed'
        log.info('Enable to datetime is valid')

    return None


@router.post('', response_model=Resp_Body_Base)
async def post_user_obj(
    obj: User_Base,
    x_account_id: str = Header(...),
    return_obj: Optional[bool] = True,
    by_alias: Optional[bool] = True,
    exclude_unset: Optional[bool] = True,
):
    """Create a user record from the request body via the generic POST template."""
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # NOTE(review): return_obj / by_alias / exclude_unset are accepted but the
    # template is always called with True for all three.
    return post_obj_template(
        obj_type='user',
        data=obj.dict(by_alias=False, exclude_unset=True),
        return_obj=True,
        by_alias=True,
        exclude_unset=True,
    )


@router.post('/new', response_model=Resp_Body_Base)
async def post_user_new_obj(
    user_obj: User_New_Base,
    x_account_id: str = Header(...),
    return_obj: bool = True,
    by_alias: bool = True,
    exclude_unset: bool = True,
):
    """Create a new user account unless the username is already used in the account."""
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    user_data = user_obj.dict(
        by_alias=False,
        exclude_unset=False,
        exclude={'new_password', 'account_id_random'},
    )

    log.info('Checking if the username is already in use for the account...')
    sql_select_user = """
        SELECT *
        FROM `user` AS user
        WHERE user.account_id = :account_id
            and user.username = :username
    """
    if sql_select(sql=sql_select_user, data=user_data):
        return mk_resp(data=False, status_message='The user account was not created. This is likely because of a duplicate username.')

    log.info('Adding new user account...')
    new_user_id = sql_insert(table_name='user', data=user_data)
    if not new_user_id:
        return mk_resp(data=False, status_message='The user account was not created. Something seems to have gone wrong on insert.')

    log.info('Selecting new user account to return as an object...')
    new_user_rec = sql_select(table_name='v_user', record_id=new_user_id)
    user_obj_new = User_Out_Base(**new_user_rec)
    return mk_resp(data=user_obj_new.dict(by_alias=True, exclude_unset=True))


@router.patch('/change_password/{user_id}', response_model=Resp_Body_Base)
async def change_user_obj_password(
    user_id: Union[int,str],
    password: Optional[str] = Query(None, min_length=6, max_length=50),
    x_account_id: Optional[str] = Header(..., ),
    return_obj: bool = False,
    inc_roles: bool = False,
    inc_contact: bool = False,
    inc_organization: bool = False,
    inc_person: bool = False,
    by_alias: bool = True,
    exclude_unset: bool = True,
):
    """Set a user's password, storing only its secure hash.

    If no password is given, or it is shorter than 10 characters, a random
    password is generated instead.
    """
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    # Never write the plaintext password to the log.
    log.debug(_redacted(locals()))

    if not password or len(password) < 10:
        # NOTE(review): the generated password is never returned to the caller.
        log.warning('The password given must be at least 10 characters. Generating a new random password.')
        password = secrets.token_urlsafe(default_num_bytes)

    user_id = redis_lookup_id_random(record_id_random=user_id, table_name='user')
    if not user_id:
        return mk_resp(data=False, status_code=404)  # Not Found

    # NOTE(review): the result of the update is not checked.
    sql_update(
        data={'password': secure_hash_string(string=password)},
        table_name='user',
        record_id=user_id,
        id_random_length=None,
    )

    if not return_obj:
        return mk_resp(data=True)

    user_obj = load_user_obj(
        user_id=user_id,
        inc_contact=inc_contact,
        inc_organization=inc_organization,
        inc_person=inc_person,
    ).dict(by_alias=by_alias, exclude_unset=exclude_unset)
    return mk_resp(data=user_obj)


@router.patch('/{obj_id}', response_model=Resp_Body_Base)
async def patch_user_obj(
    obj_id: str = PathParam(..., min_length=1, max_length=22),
    obj: User_Base = None,
    x_account_id: Optional[str] = Header(..., ),
    return_obj: Optional[bool] = True,
    by_alias: Optional[bool] = True,
    exclude_unset: Optional[bool] = True,
):
    """Update the user identified by its random ID with the fields set in the body."""
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # The body is optional in the signature; without it there is nothing to patch.
    if obj is None:
        return mk_resp(data=False, status_code=400)  # Bad Request

    obj_type = 'user'
    obj_data_dict = obj.dict(by_alias=False, exclude_unset=True)
    obj_data_dict['id'] = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_type)
    obj_data_dict['id_random'] = obj_id

    return patch_obj_template(
        obj_type=obj_type,
        data=obj_data_dict,
        obj_id=obj_id,
        return_obj=True,
        by_alias=True,
        exclude_unset=True,
    )


# ### BEGIN ### API User Routers ### user_new_auth_key() ###
# Generate a new one time use authorization key
@router.get('/new_auth_key', response_model=Resp_Body_Base)
async def user_new_auth_key(
    user_id: Optional[str] = Query(None, min_length=2, max_length=50),
    x_account_id: str = Header(...),
    return_obj: Optional[bool] = False,
    by_alias: bool = True,
    exclude_unset: bool = True,
    exclude_none: bool = True,
):
    """Generate and store a new one-time authorization key for a user.

    Returns the loaded user object when ``return_obj`` is true, otherwise a
    dict holding only the new ``auth_key``.
    """
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # Without an identifier there is no record to update.
    if not user_id:
        return mk_resp(data=False, status_code=400)  # Bad Request

    update_user_data = {
        'id_random': user_id,
        'auth_key': secrets.token_urlsafe(default_num_bytes),
    }

    user_rec_update_result = sql_update(table_name='user', data=update_user_data)
    if not user_rec_update_result:
        log.info('The user record was not updated with a new auth_key')
        log.debug(user_rec_update_result)
        return mk_resp(data=False, status_code=404)

    log.info('The user record was updated with a new auth_key')
    if return_obj:
        # NOTE(review): user_id here is the random ID taken from the query string.
        user_obj = load_user_obj(
            user_id=user_id,
            inc_contact=False,
            inc_organization=False,
            inc_person=False,
        ).dict(by_alias=by_alias, exclude_unset=exclude_unset)
    else:
        user_obj = {'auth_key': update_user_data['auth_key']}
    return mk_resp(data=user_obj)


# ### BEGIN ### API User Routers ### user_authenticate() ###
# Authenticate a username and password OR by authorization key
# An authorization key can only be used once. It will be deleted if found.
# A new key will need to be requested for a particular user each time.
@router.get('/authenticate', response_model=Resp_Body_Base)
async def user_authenticate(
    account_id: Optional[Union[int,str]] = None,
    username: Optional[str] = Query(None, min_length=2, max_length=50),
    password: Optional[str] = Query(None, min_length=6, max_length=50),
    auth_key: Optional[str] = Query(None, min_length=11, max_length=22),
    x_account_id: str = Header(...),
    inc_roles: bool = False,
    inc_contact: bool = False,
    inc_organization: bool = False,
    inc_person: bool = False,
    by_alias: bool = True,
    exclude_unset: bool = True,
    exclude_none: bool = True,
):
    """Authenticate by account + username + password, or by a one-time auth key.

    On success the user must also be enabled and inside its optional
    ``enable_from`` / ``enable_to`` window; the loaded user object is returned.
    """
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    # Never write the plaintext password or auth key to the log.
    log.debug(_redacted(locals()))

    if account_id and username and password:
        account_id = redis_lookup_id_random(record_id_random=account_id, table_name='account')
        if not account_id:
            return mk_resp(data=False, status_code=404)  # Not Found

        user_data = {'account_id': account_id, 'username': username}
        sql = """
            SELECT `user`.id AS 'user_id', `user`.id_random AS 'user_id_random',
                `user`.password, `user`.enable, `user`.enable_from, `user`.enable_to
            FROM `user` AS `user`
            WHERE `user`.account_id = :account_id
                AND `user`.username = :username
            LIMIT 1
        """
        user_rec_result = sql_select(data=user_data, sql=sql)
        if not user_rec_result:
            return mk_resp(data=None, status_code=404, status_message='The user account was not found')

        user_id = user_rec_result.get('user_id', None)
        password_hash = user_rec_result.get('password', None)
        if not password_hash:
            log.error('The password has was not found. This should not happen.')
            return mk_resp(data=False, status_message='The password has was not found. This should not happen.')
        if not verify_secure_hash_string(string=password, string_hash=password_hash):
            log.info('The username was found, but the password did not match.')
            return mk_resp(data=False, status_message='The username was found, but the password did not match.')
        log.info('The username was found, and the password matched.')
    elif auth_key:
        user_rec_result = sql_select(table_name='user', field_name='auth_key', field_value=auth_key)
        if not user_rec_result:
            return mk_resp(data=None, status_code=404, status_message='A user account with that auth key was not found')

        # The key is single use: clear it as soon as it has been matched.
        # NOTE(review): authentication continues even if clearing the key fails.
        update_user_data = {'id': user_rec_result.get('id', None), 'auth_key': None}
        user_rec_update_result = sql_update(table_name='user', data=update_user_data)
        if user_rec_update_result:
            log.info('The user record was updated with a NULL auth_key')
        else:
            log.info('The user record was not updated with a NULL auth_key')
        log.debug(user_rec_update_result)
        user_id = user_rec_result.get('id', None)  # NOTE: This is looking for "id", not "user_id"
    else:
        return mk_resp(data=None, status_code=400, status_message='One more user account fields was missing or unexpected.')  # Bad Request

    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    # The record carries the password hash (and possibly the auth key).
    log.debug(_redacted(user_rec_result))

    if not isinstance(user_rec_result, dict):
        log.error('SQL result was unexpected. A dict result type was expected. This should not happen.')
        return mk_resp(data=False, status_code=500)

    current_utc_datetime = datetime.datetime.now(datetime.timezone.utc)
    log.debug(current_utc_datetime)

    if not user_rec_result.get('enable', None):
        log.info('The user account is not enabled')
        return mk_resp(data=False, status_message='This user account is not enabled')
    log.info('The user account is enabled')

    window_error = _enable_window_error(user_rec_result, current_utc_datetime)
    if window_error is not None:
        return mk_resp(data=False, status_message=window_error)

    user_obj = load_user_obj(
        user_id=user_id,
        inc_roles=inc_roles,
        inc_contact=inc_contact,
        inc_organization=inc_organization,
        inc_person=inc_person,
    ).dict(by_alias=by_alias, exclude_unset=exclude_unset)
    return mk_resp(data=user_obj)
# ### END ### API User Routers ### user_authenticate() ###


@router.get('/list', response_model=Resp_Body_Base)
async def get_user_obj_li(
    for_obj_type: Optional[str] = Query(None, min_length=2, max_length=50),
    for_obj_id: Optional[str] = Query(None, min_length=1, max_length=22),
    x_account_id: str = Header(...),
    by_alias: Optional[bool] = True,
    exclude_unset: Optional[bool] = True,
):
    """List user objects for a parent object via the generic list template."""
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    return get_obj_li_template(
        obj_type='user',
        for_obj_type=for_obj_type,
        for_obj_id=for_obj_id,
        by_alias=True,
        exclude_unset=True,
    )


# Look up is only for account or person records
@router.get('/lookup', response_model=Resp_Body_Base)
async def lookup_user_obj(
    for_obj_id: Union[int,str],
    for_obj_type: str = Query(..., min_length=2, max_length=50),
    x_account_id: str = Header(...),
    inc_roles: bool = False,
    inc_contact: bool = False,
    inc_organization: bool = False,
    inc_person: bool = False,
    by_alias: bool = True,
    exclude_unset: bool = True,
):
    """Look up the user(s) attached to an account (a list) or a person (one user)."""
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # Validate the type before it is used as a table name for the ID lookup.
    lookup_filter = _LOOKUP_FILTERS.get(for_obj_type)
    if lookup_filter is None:
        log.debug(f'Object type={for_obj_type}; Object ID={for_obj_id}')
        return mk_resp(data=False, status_code=400)  # Bad Request
    bind_name, sql_where_for_obj_type, sql_limit, as_list = lookup_filter

    for_obj_id = redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type)
    if not for_obj_id:
        return mk_resp(data=False, status_code=404)  # Not Found

    # Only the constant fragments from _LOOKUP_FILTERS are interpolated here.
    sql = f"""
        SELECT id AS 'user_id', id_random AS 'user_id_random'
        FROM `user` AS `user`
        WHERE {sql_where_for_obj_type}
        {sql_limit}
    """
    # This will return a list if selecting by account ID
    user_rec_result = sql_select(data={bind_name: for_obj_id}, sql=sql, as_list=as_list)

    data = _load_user_data(
        user_rec_result,
        inc_roles=inc_roles,
        inc_contact=inc_contact,
        inc_organization=inc_organization,
        inc_person=inc_person,
        by_alias=by_alias,
        exclude_unset=exclude_unset,
    )
    if data is None:
        log.debug(user_rec_result)
        return mk_resp(data=None, status_code=404)  # Not Found
    return mk_resp(data=data)


# Look up a user with an email address for an account
@router.get('/lookup_email', response_model=Resp_Body_Base)
async def lookup_email(
    account_id: Union[int,str],
    email: str = Query(..., min_length=2, max_length=50),
    x_account_id: str = Header(...),
    inc_roles: bool = False,
    inc_contact: bool = False,
    inc_organization: bool = False,
    inc_person: bool = False,
    by_alias: bool = True,
    exclude_unset: bool = True,
):
    """Look up user(s) by email address within an account.

    An empty ``account_id`` searches users that have no account.
    """
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    return _lookup_user_by_column(
        'email',
        email,
        account_id,
        inc_roles=inc_roles,
        inc_contact=inc_contact,
        inc_organization=inc_organization,
        inc_person=inc_person,
        by_alias=by_alias,
        exclude_unset=exclude_unset,
    )


# Look up a user with a username for an account
@router.get('/lookup_username', response_model=Resp_Body_Base)
async def lookup_username(
    account_id: Union[int,str],
    username: str = Query(..., min_length=2, max_length=50),
    x_account_id: str = Header(...),
    inc_roles: bool = False,
    inc_contact: bool = False,
    inc_organization: bool = False,
    inc_person: bool = False,
    by_alias: bool = True,
    exclude_unset: bool = True,
):
    """Look up user(s) by username within an account.

    An empty ``account_id`` searches users that have no account.
    """
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    return _lookup_user_by_column(
        'username',
        username,
        account_id,
        inc_roles=inc_roles,
        inc_contact=inc_contact,
        inc_organization=inc_organization,
        inc_person=inc_person,
        by_alias=by_alias,
        exclude_unset=exclude_unset,
    )


@router.get('/{obj_id}', response_model=Resp_Body_Base)
async def get_user_obj(
    obj_id: str = PathParam(..., min_length=1, max_length=22),
    x_account_id: str = Header(...),
    inc_contact: bool = False,
    inc_organization: bool = False,
    inc_person: bool = False,
    by_alias: bool = True,
    exclude_unset: bool = True,
):
    """Fetch one user by its random ID via the generic GET template."""
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # NOTE(review): the inc_* / by_alias / exclude_unset parameters are accepted
    # but not forwarded to the template.
    return get_obj_template(
        obj_type='user',
        obj_id=obj_id,
        by_alias=True,
        exclude_unset=True,
    )


@router.delete('/{obj_id}', response_model=Resp_Body_Base)
async def delete_user_obj(
    obj_id: str = PathParam(..., min_length=1, max_length=22),
    x_account_id: str = Header(...),
):
    """Delete one user by its random ID via the generic DELETE template."""
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    return delete_obj_template(
        obj_type='user',
        obj_id=obj_id,
    )