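import datetime
import secrets  # was used below (token_urlsafe) but never imported -> NameError at runtime
import time
from typing import Dict, List, Optional, Set, Union

import pytz
from fastapi import APIRouter, Body, Depends, Header, HTTPException, Query, status
from pydantic import BaseModel, EmailStr, Field

from app.config import settings
from app.db_sql import (
    redis_lookup_id_random,
    sql_delete,
    sql_insert,
    sql_insert_or_update,
    sql_select,
    sql_update,
)
from app.lib_general import log, logging, secure_hash_string, verify_secure_hash_string
from app.methods.user_methods import load_user_obj
from app.models.common_field_schema import default_num_bytes
from app.models.response_models import Resp_Body_Base, mk_resp
from app.models.user_models import User_Base, User_New_Base, User_Out_Base
from app.routers.api_crud import (
    delete_obj_template,
    get_obj_li_template,
    get_obj_template,
    patch_obj_template,
    post_obj_template,
)

router = APIRouter()

# Keys whose values must never reach the debug log: request passwords, the
# one-time auth key, and the inbound user models (which can carry a password).
_DEBUG_REDACT_KEYS = frozenset({'password', 'new_password', 'auth_key', 'obj', 'user_obj'})


def _redact(mapping, keys=_DEBUG_REDACT_KEYS):
    """Return a copy of *mapping* with the values of *keys* replaced by a marker.

    Used for the ``log.debug(locals())`` dumps in the routes below so that
    credentials are not written to the log. Every route calls ``log.setLevel``
    on the shared logger, so a DEBUG level set by one request stays in effect
    for the whole process until another request changes it -- a route that
    sets WARNING cannot rely on its own debug dump being suppressed.
    """
    return {k: ('<redacted>' if k in keys else v) for k, v in mapping.items()}


def _load_user_objs(user_rec_result, by_alias, exclude_unset, **load_kwargs):
    """Load full user object(s) for an id-only ``sql_select`` result.

    Args:
        user_rec_result: a dict (single row keyed with ``user_id``), a list of
            such dicts, or anything else when nothing matched.
        by_alias, exclude_unset: forwarded to ``.dict()`` on each loaded model.
        **load_kwargs: the ``inc_*`` switches forwarded to ``load_user_obj``.

    Returns:
        A dict for a single row, a list of dicts for a list result (possibly
        empty), or ``None`` when the result is neither -- callers map ``None``
        to 404.
    """
    def _load_one(rec):
        return load_user_obj(user_id=rec.get('user_id', None), **load_kwargs).dict(
            by_alias=by_alias, exclude_unset=exclude_unset
        )

    if isinstance(user_rec_result, dict):
        return _load_one(user_rec_result)
    if isinstance(user_rec_result, list):
        return [_load_one(rec) for rec in user_rec_result]
    return None


def _to_utc(value):
    """Return *value* tagged as UTC, or ``None`` when the column was NULL.

    NOTE(review): assumes ``enable_from`` / ``enable_to`` come back as naive
    datetimes holding UTC -- confirm against the DB driver; if they are local
    time, ``astimezone`` is needed instead of ``replace``.
    """
    if value is None:
        return None
    return value.replace(tzinfo=datetime.timezone.utc)


def _enable_window_response(user_rec_result):
    """Check the ``enable`` flag and the enable_from/enable_to window of a user row.

    Returns:
        A failure ``mk_resp`` when the account is disabled, not yet enabled or
        expired, otherwise ``None``. A NULL ``enable_from``/``enable_to`` means
        "no bound" (previously a NULL here raised AttributeError -> HTTP 500).
    """
    current_utc_datetime = datetime.datetime.now(datetime.timezone.utc)
    log.debug(current_utc_datetime)
    if not user_rec_result.get('enable', None):
        log.info('The user account is not enabled')
        return mk_resp(data=False, status_message='This user account is not enabled')
    log.info('The user account is enabled')

    user_enable_from = _to_utc(user_rec_result.get('enable_from', None))
    if user_enable_from is not None:
        log.debug(user_enable_from)
        if user_enable_from > current_utc_datetime:
            log.info('Enable from datetime is in the future. Please wait.')
            return mk_resp(data=False, status_message='This account is not yet enabled')
        log.info('Enable from datetime is valid')

    user_enable_to = _to_utc(user_rec_result.get('enable_to', None))
    if user_enable_to is not None:
        log.debug(user_enable_to)
        if user_enable_to < current_utc_datetime:
            log.info('Enable to datetime is in the past. Your user account has been disabled.')
            return mk_resp(data=False, status_message='This account is not enabled because the expiration date has passed')
        log.info('Enable to datetime is valid')
    return None


@router.post('', response_model=Resp_Body_Base)
async def post_user_obj(
    obj: User_Base,
    x_account_id: str = Header(...),
    return_obj: Optional[bool] = True,
    by_alias: Optional[bool] = True,
    exclude_unset: Optional[bool] = True,
):
    """Create a user record from *obj* through the generic CRUD template.

    ``return_obj`` / ``by_alias`` / ``exclude_unset`` are now forwarded to the
    template (they were accepted but ignored before). ``x_account_id`` is
    required but not checked here -- presumably enforced by a dependency or
    middleware elsewhere; confirm, since nothing in this file scopes the
    record to the calling account.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(_redact(locals()))
    obj_data_dict = obj.dict(by_alias=False, exclude_unset=True)
    return post_obj_template(
        obj_type='user',
        data=obj_data_dict,
        return_obj=return_obj,
        by_alias=by_alias,
        exclude_unset=exclude_unset,
    )


@router.post('/new', response_model=Resp_Body_Base)
async def post_user_new_obj(
    user_obj: User_New_Base,
    x_account_id: str = Header(...),
    return_obj: bool = True,
    by_alias: bool = True,
    exclude_unset: bool = True,
):
    """Create a user after rejecting a duplicate username within the account.

    Returns the freshly selected ``v_user`` row as a ``User_Out_Base`` dict
    (or ``True`` when ``return_obj`` is false); ``data=False`` plus a
    ``status_message`` on a duplicate or a failed insert.

    NOTE(review): ``new_password`` is stripped from the insert and nothing here
    hashes a password -- confirm that ``User_New_Base`` never carries a
    plaintext password into the ``user`` table.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(_redact(locals()))
    user_data = user_obj.dict(
        by_alias=False, exclude_unset=False, exclude={'new_password', 'account_id_random'}
    )

    log.info('Checking if the username is already in use for the account...')
    sql_select_user = """
        SELECT * FROM `user` AS user
        WHERE user.account_id = :account_id and user.username = :username
    """
    if sql_select(sql=sql_select_user, data=user_data):
        return mk_resp(
            data=False,
            status_message='The user account was not created. This is likely because of a duplicate username.',
        )

    log.info('Adding new user account...')
    sql_insert_result = sql_insert(table_name='user', data=user_data)
    if not sql_insert_result:
        return mk_resp(
            data=False,
            status_message='The user account was not created. Something seems to have gone wrong on insert.',
        )

    log.info('Selecting new user account to return as an object...')
    sql_select_user_result = sql_select(table_name='v_user', record_id=sql_insert_result)
    user_obj_new = User_Out_Base(**sql_select_user_result)
    data = user_obj_new.dict(by_alias=by_alias, exclude_unset=exclude_unset) if return_obj else True
    return mk_resp(data=data)


@router.patch('/change_password/{user_id}', response_model=Resp_Body_Base)
async def change_user_obj_password(
    user_id: Union[int, str],
    password: Optional[str] = Query(None, min_length=6, max_length=50),
    x_account_id: Optional[str] = Header(...),
    return_obj: bool = False,
    inc_roles: bool = False,
    inc_contact: bool = False,
    inc_organization: bool = False,
    inc_person: bool = False,
    by_alias: bool = True,
    exclude_unset: bool = True,
):
    """Replace the stored password hash of the user identified by its public id.

    A missing or too-short password (under 10 characters, although the Query
    constraint only enforces 6) is replaced by a random one. NOTE(review): that
    generated password is not returned to the caller, so after this path the
    account can only be entered via the auth_key flow -- confirm this is the
    intended "reset" semantics rather than an oversight.

    NOTE(review): the new password arrives as a URL query parameter, which
    means it lands in access logs and browser history; moving it into the
    request body is the usual fix but changes the route's interface.

    Returns:
        ``data=True`` (or the reloaded user dict when ``return_obj``), 404 when
        the public id is unknown, ``data=False`` when the UPDATE did not apply.
    """
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(_redact(locals()))  # never log the plaintext password

    if not password or len(password) < 10:
        log.warning('The password given must be at least 10 characters. Generating a new random password.')
        password = secrets.token_urlsafe(default_num_bytes)

    user_id = redis_lookup_id_random(record_id_random=user_id, table_name='user')
    if not user_id:
        return mk_resp(data=False, status_code=404)  # Not Found

    user_data = {'password': secure_hash_string(string=password)}
    user_rec_update_result = sql_update(
        data=user_data, table_name='user', record_id=user_id, id_random_length=None
    )
    if not user_rec_update_result:
        # Do not report success for an UPDATE that did not apply.
        log.warning('The user password was not updated')
        log.debug(user_rec_update_result)
        return mk_resp(data=False, status_message='The password was not updated.')

    if return_obj:
        data = load_user_obj(
            user_id=user_id,
            inc_contact=inc_contact,
            inc_organization=inc_organization,
            inc_person=inc_person,
        ).dict(by_alias=by_alias, exclude_unset=exclude_unset)
    else:
        data = True
    return mk_resp(data=data)


@router.patch('/{obj_id}', response_model=Resp_Body_Base)
async def patch_user_obj(
    obj: User_Base,
    obj_id: str = Query(..., min_length=1, max_length=22),
    x_account_id: Optional[str] = Header(..., ),
    return_obj: Optional[bool] = True,
    by_alias: Optional[bool] = True,
    exclude_unset: Optional[bool] = True,
):
    """Partially update the user with public id *obj_id* via the CRUD template.

    The public id is resolved to the internal id first; an unknown id is a 404
    rather than an UPDATE keyed on ``id=None``. ``return_obj`` / ``by_alias`` /
    ``exclude_unset`` are forwarded to the template.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(_redact(locals()))
    obj_type = 'user'
    internal_id = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_type)
    if not internal_id:
        return mk_resp(data=False, status_code=404)  # Not Found
    obj_data_dict = obj.dict(by_alias=False, exclude_unset=True)
    obj_data_dict['id'] = internal_id
    obj_data_dict['id_random'] = obj_id
    return patch_obj_template(
        obj_type=obj_type,
        data=obj_data_dict,
        obj_id=obj_id,
        return_obj=return_obj,
        by_alias=by_alias,
        exclude_unset=exclude_unset,
    )


# ### BEGIN ### API User Routers ### user_new_auth_key() ###
# Generate a new one time use authorization key
@router.get('/new_auth_key', response_model=Resp_Body_Base)
async def user_new_auth_key(
    user_id: Optional[str] = Query(None, min_length=2, max_length=50),
    x_account_id: str = Header(...),
    return_obj: Optional[bool] = False,
    by_alias: bool = True,
    exclude_unset: bool = True,
    exclude_none: bool = True,
):
    """Store a fresh one-time ``auth_key`` on the user and return it.

    NOTE(review): any caller of this route can mint a login key for an
    arbitrary ``user_id`` and receives it in the response body, so it must
    only be reachable by trusted callers. The ``x_account_id`` header is
    required but not checked here -- presumably validated upstream; confirm.

    Returns:
        ``{'auth_key': ...}`` (or the reloaded user dict when ``return_obj``),
        400 when no ``user_id`` was given, 404 when the UPDATE did not apply.
    """
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(_redact(locals()))
    if not user_id:
        return mk_resp(data=False, status_code=400)  # Bad Request: no record to key on

    update_user_data = {
        # NOTE(review): the public id travels inside ``data`` instead of
        # ``record_id`` as in the other routes -- confirm sql_update keys the
        # UPDATE on ``id_random`` in this form.
        'id_random': user_id,
        'auth_key': secrets.token_urlsafe(default_num_bytes),
    }
    user_rec_update_result = sql_update(table_name='user', data=update_user_data)
    if not user_rec_update_result:
        log.info('The user record was not updated with a new auth_key')
        log.debug(user_rec_update_result)
        return mk_resp(data=False, status_code=404)

    log.info('The user record was updated with a new auth_key')
    if return_obj:
        # NOTE(review): passes the public id straight to load_user_obj, whereas
        # the other routes resolve it with redis_lookup_id_random first -- confirm.
        user_obj = load_user_obj(
            user_id=user_id, inc_contact=False, inc_organization=False, inc_person=False
        ).dict(by_alias=by_alias, exclude_unset=exclude_unset)
    else:
        user_obj = {'auth_key': update_user_data['auth_key']}
    return mk_resp(data=user_obj)


# ### BEGIN ### API User Routers ### user_authenticate() ###
# Authenticate a username and password OR by authorization key
# An authorization key can only be done once. It will be deleted if found.
# A new key will need to be requested for a particular user each time.
@router.get('/authenticate', response_model=Resp_Body_Base)
async def user_authenticate(
    account_id: Optional[Union[int, str]] = None,
    username: Optional[str] = Query(None, min_length=2, max_length=50),
    password: Optional[str] = Query(None, min_length=6, max_length=50),
    auth_key: Optional[str] = Query(None, min_length=11, max_length=22),
    x_account_id: str = Header(...),
    inc_roles: bool = False,
    inc_contact: bool = False,
    inc_organization: bool = False,
    inc_person: bool = False,
    by_alias: bool = True,
    exclude_unset: bool = True,
    exclude_none: bool = True,
):
    """Authenticate by (account_id, username, password) or by a one-time auth_key.

    On success the enable flag and enable_from/enable_to window are checked
    and the loaded user dict is returned. ``exclude_none`` is accepted but not
    used.

    NOTE(review): the responses distinguish "user not found" (404) from "wrong
    password", and the hash verification only runs when the user exists; both
    let a caller tell valid usernames from invalid ones. A single generic
    failure response (and a dummy verify on the miss path) would close that,
    at the cost of changing the response contract.

    NOTE(review): the auth_key path is select-then-update, not atomic; two
    concurrent requests with the same key can both pass the select before it is
    cleared. A conditional ``UPDATE ... WHERE auth_key = :auth_key`` whose row
    count is checked would make the key truly single-use. The key is also
    consumed even when the account then turns out to be disabled.
    """
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(_redact(locals()))  # password / auth_key must not be logged

    if account_id and username and password:
        account_id = redis_lookup_id_random(record_id_random=account_id, table_name='account')
        if not account_id:
            return mk_resp(data=False, status_code=404)  # Not Found

        user_data = {'account_id': account_id, 'username': username}
        sql = """
            SELECT
                `user`.id AS 'user_id',
                `user`.id_random AS 'user_id_random',
                `user`.password,
                `user`.enable,
                `user`.enable_from,
                `user`.enable_to
            FROM `user` AS `user`
            WHERE `user`.account_id = :account_id AND `user`.username = :username
            LIMIT 1
        """
        user_rec_result = sql_select(data=user_data, sql=sql)
        if not user_rec_result:
            return mk_resp(data=None, status_code=404, status_message='The user account was not found')

        user_id = user_rec_result.get('user_id', None)
        password_hash = user_rec_result.get('password', None)
        if not password_hash:
            log.error('The password hash was not found. This should not happen.')
            return mk_resp(data=False, status_message='The password hash was not found. This should not happen.')
        if not verify_secure_hash_string(string=password, string_hash=password_hash):
            log.info('The username was found, but the password did not match.')
            return mk_resp(data=False, status_message='The username was found, but the password did not match.')
        log.info('The username was found, and the password matched.')

    elif auth_key:
        user_rec_result = sql_select(table_name='user', field_name='auth_key', field_value=auth_key)
        if not user_rec_result:
            return mk_resp(
                data=None, status_code=404, status_message='A user account with that auth key was not found'
            )
        user_id = user_rec_result.get('id', None)  # NOTE: this row is keyed "id", not "user_id"
        # Burn the key: it is single use.
        update_user_data = {'id': user_id, 'auth_key': None}
        user_rec_update_result = sql_update(table_name='user', data=update_user_data)
        if user_rec_update_result:
            log.info('The user record was updated with a NULL auth_key')
        else:
            log.info('The user record was not updated with a NULL auth_key')
            log.debug(user_rec_update_result)

    else:
        return mk_resp(
            data=None,
            status_code=400,
            status_message='One or more user account fields were missing or unexpected.',
        )  # Bad Request

    if not isinstance(user_rec_result, dict):
        log.error('SQL result was unexpected. A dict result type was expected. This should not happen.')
        return mk_resp(data=False, status_code=500)
    log.debug(_redact(user_rec_result))  # the row carries the password hash

    blocked = _enable_window_response(user_rec_result)
    if blocked is not None:
        return blocked

    user_obj = load_user_obj(
        user_id=user_id,
        inc_roles=inc_roles,
        inc_contact=inc_contact,
        inc_organization=inc_organization,
        inc_person=inc_person,
    ).dict(by_alias=by_alias, exclude_unset=exclude_unset)
    return mk_resp(data=user_obj)
# ### END ### API User Routers ### user_authenticate() ###


@router.get('/list', response_model=Resp_Body_Base)
async def get_user_obj_li(
    for_obj_type: Optional[str] = Query(None, min_length=2, max_length=50),
    for_obj_id: Optional[str] = Query(None, min_length=1, max_length=22),
    x_account_id: str = Header(...),
    by_alias: Optional[bool] = True,
    exclude_unset: Optional[bool] = True,
):
    """List users, optionally filtered by a related object, via the CRUD template.

    ``by_alias`` / ``exclude_unset`` are forwarded (they were hard-coded before).
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())
    return get_obj_li_template(
        obj_type='user',
        for_obj_type=for_obj_type,
        for_obj_id=for_obj_id,
        by_alias=by_alias,
        exclude_unset=exclude_unset,
    )


# Look up is only for account or person records
@router.get('/lookup', response_model=Resp_Body_Base)
async def lookup_user_obj(
    for_obj_id: Union[int, str],
    for_obj_type: str = Query(..., min_length=2, max_length=50),
    x_account_id: str = Header(...),
    inc_roles: bool = False,
    inc_contact: bool = False,
    inc_organization: bool = False,
    inc_person: bool = False,
    by_alias: bool = True,
    exclude_unset: bool = True,
):
    """Find the users attached to an account (list) or a person (single).

    Returns:
        A list of user dicts for ``for_obj_type == 'account'``, one user dict
        for ``'person'``; 404 when the public id or the users are not found,
        400 for any other ``for_obj_type``.
    """
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    for_obj_id = redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type)
    if not for_obj_id:
        return mk_resp(data=False, status_code=404)  # Not Found

    data = {}
    as_list = False
    if for_obj_type == 'account':
        data['account_id'] = for_obj_id
        sql_where_for_obj_type = "`user`.account_id = :account_id"
        sql_limit = ''
        as_list = True
    elif for_obj_type == 'person':
        data['person_id'] = for_obj_id
        sql_where_for_obj_type = "`user`.person_id = :person_id"
        sql_limit = 'LIMIT 1'
    else:
        log.debug(f'Object type={for_obj_type}; Object ID={for_obj_id}')
        return mk_resp(data=False, status_code=400)  # Bad Request

    # Only the two fixed fragments chosen above are interpolated; the ids are
    # bound parameters, so no caller-supplied text reaches the SQL string.
    sql = f"""
        SELECT id AS 'user_id', id_random AS 'user_id_random'
        FROM `user` AS `user`
        WHERE {sql_where_for_obj_type}
        {sql_limit}
    """
    # This will return a list if selecting by account ID
    user_rec_result = sql_select(data=data, sql=sql, as_list=as_list)
    loaded = _load_user_objs(
        user_rec_result,
        by_alias,
        exclude_unset,
        inc_roles=inc_roles,
        inc_contact=inc_contact,
        inc_organization=inc_organization,
        inc_person=inc_person,
    )
    if loaded is None:
        log.debug(user_rec_result)
        return mk_resp(data=None, status_code=404)  # Not Found
    return mk_resp(data=loaded)


# Look up a user with an email addresss for an account
@router.get('/lookup_email', response_model=Resp_Body_Base)
async def lookup_email(
    account_id: Union[int, str],
    email: str = Query(..., min_length=2, max_length=50),
    x_account_id: str = Header(...),
    inc_roles: bool = False,
    inc_contact: bool = False,
    inc_organization: bool = False,
    inc_person: bool = False,
    by_alias: bool = True,
    exclude_unset: bool = True,
):
    """Find a user by e-mail inside an account (or among account-less users).

    An empty ``account_id`` selects rows where ``account_id IS NULL``; any
    other value must resolve through ``redis_lookup_id_random`` or a 404 is
    returned. The ``x_account_id`` header is not compared with ``account_id``
    here -- presumably enforced elsewhere; confirm.
    """
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    if account_id == '':
        account_id = None
    else:
        account_id = redis_lookup_id_random(record_id_random=account_id, table_name='account')
        if not account_id:
            return mk_resp(data=False, status_code=404)  # Not Found

    data = {'account_id': account_id, 'email': email}
    log.debug(data)
    if account_id:
        sql = """
            SELECT id AS 'user_id', id_random AS 'user_id_random'
            FROM `user` AS `user`
            WHERE `user`.account_id = :account_id AND `user`.email = :email
        """
    else:
        sql = """
            SELECT id AS 'user_id', id_random AS 'user_id_random'
            FROM `user` AS `user`
            WHERE `user`.account_id IS NULL AND `user`.email = :email
        """
    log.debug(sql)
    # This will return a list if selecting by account ID
    user_obj_result = sql_select(data=data, sql=sql)
    loaded = _load_user_objs(
        user_obj_result,
        by_alias,
        exclude_unset,
        inc_roles=inc_roles,
        inc_contact=inc_contact,
        inc_organization=inc_organization,
        inc_person=inc_person,
    )
    if loaded is None:
        log.debug(user_obj_result)
        return mk_resp(data=None, status_code=404)  # Not Found
    return mk_resp(data=loaded)


# Look up a user with a username for an account
@router.get('/lookup_username', response_model=Resp_Body_Base)
async def lookup_username(
    account_id: Union[int, str],
    username: str = Query(..., min_length=2, max_length=50),
    x_account_id: str = Header(...),
    inc_user_role_list: bool = False,
    inc_contact: bool = False,
    inc_organization: bool = False,
    inc_person: bool = False,
    by_alias: bool = True,
    exclude_unset: bool = True,
):
    """Find a user by username inside an account (or among account-less users).

    Same shape as ``lookup_email`` but keyed on ``username`` and forwarding
    ``inc_user_role_list`` instead of ``inc_roles`` to ``load_user_obj``.
    """
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    if account_id == '':
        account_id = None
    else:
        account_id = redis_lookup_id_random(record_id_random=account_id, table_name='account')
        if not account_id:
            return mk_resp(data=False, status_code=404)  # Not Found

    data = {'account_id': account_id, 'username': username}
    log.debug(data)
    if account_id:
        sql = """
            SELECT id AS 'user_id', id_random AS 'user_id_random'
            FROM `user` AS `user`
            WHERE `user`.account_id = :account_id AND `user`.username = :username
        """
    else:
        sql = """
            SELECT id AS 'user_id', id_random AS 'user_id_random'
            FROM `user` AS `user`
            WHERE `user`.account_id IS NULL AND `user`.username = :username
        """
    log.debug(sql)
    # This will return a list if selecting by account ID
    user_obj_result = sql_select(data=data, sql=sql)
    loaded = _load_user_objs(
        user_obj_result,
        by_alias,
        exclude_unset,
        inc_contact=inc_contact,
        inc_organization=inc_organization,
        inc_person=inc_person,
        inc_user_role_list=inc_user_role_list,
    )
    if loaded is None:
        log.debug(user_obj_result)
        return mk_resp(data=None, status_code=404)  # Not Found
    return mk_resp(data=loaded)


# ### BEGIN ### API User ### get_user_obj() ###
# Working well as of 2021-06-25. Using as a template for other routes.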
@router.get('/{user_id}', response_model=Resp_Body_Base)
async def get_user_obj(
    user_id: str = Query(..., min_length=1, max_length=22),
    limit: int = 500,  # For now this covers any included objects or object lists
    enabled: str = 'enabled',  # For now this covers any included objects or object lists
    inc_address: bool = False,  # Priority l1
    inc_contact: bool = False,  # Priority l1
    inc_event_list: bool = False,  # Priority l1
    inc_journal_list: bool = False,  # Priority l2
    inc_membership_member: bool = False,  # Priority l2
    inc_order_line_list: bool = False,  # Priority l1
    inc_order_list: bool = False,  # Priority l1
    inc_order_cart_list: bool = False,  # Priority l1
    inc_organization: bool = False,  # Priority l1
    inc_person: bool = False,  # Priority l1
    inc_post_list: bool = False,  # Priority l2
    inc_post_comment_list: bool = False,  # Priority l3 -- accepted, not forwarded (see below)
    inc_user_role_list: bool = False,  # Priority l1
    x_account_id: str = Header(...),
    by_alias: Optional[bool] = True,
    exclude_unset: Optional[bool] = True,
):
    """Fetch one user by its public id, optionally with related objects.

    Still unsupported (planned: priority l3 / undecided): inc_archive_list,
    inc_hosted_file_list, inc_journal_entry_list, inc_membership_list,
    inc_organization_list, inc_person_list.

    Returns:
        ``mk_resp`` with the user as a plain dict (``model_as_dict=True``);
        404 when the public id is unknown, 400 when ``load_user_obj`` yields
        nothing. ``by_alias`` / ``exclude_unset`` are accepted but not used on
        this route.
    """
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # Resolve the public random id to the internal record id first.
    internal_id = redis_lookup_id_random(record_id_random=user_id, table_name='user')
    if not internal_id:
        return mk_resp(data=None, status_code=404)

    load_options = {
        'limit': limit,
        'model_as_dict': True,  # NOTE: returning model as a dict
        'enabled': enabled,
        'inc_address': inc_address,
        'inc_contact': inc_contact,
        'inc_event_list': inc_event_list,
        'inc_journal_list': inc_journal_list,
        'inc_membership_member': inc_membership_member,
        'inc_order_line_list': inc_order_line_list,
        'inc_order_list': inc_order_list,
        'inc_order_cart_list': inc_order_cart_list,
        'inc_organization': inc_organization,
        'inc_person': inc_person,
        'inc_post_list': inc_post_list,
        # inc_post_comment_list is intentionally not passed on (unchanged behaviour).
        'inc_user_role_list': inc_user_role_list,
    }
    user_result = load_user_obj(user_id=internal_id, **load_options)
    if not user_result:
        return mk_resp(data=False, status_code=400)  # Bad Request
    return mk_resp(data=user_result)
# ### END ### API User ### get_user_obj() ###
# (An earlier, simpler get_user_obj that forwarded only inc_roles/inc_contact/
# inc_organization/inc_person was kept here commented out; the route above
# supersedes it.)


@router.delete('/{obj_id}', response_model=Resp_Body_Base)
async def delete_user_obj(
    obj_id: str = Query(..., min_length=1, max_length=22),
    x_account_id: str = Header(...),
):
    """Delete the user with public id *obj_id* through the CRUD delete template."""
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())
    return delete_obj_template(obj_type='user', obj_id=obj_id)