Working on user log in and person object

This commit is contained in:
Scott Idem
2021-04-06 17:59:44 -04:00
parent 3aa3fd8ae7
commit a3403109ae
7 changed files with 165 additions and 17 deletions

View File

@@ -54,10 +54,10 @@ def redis_lookup_id_random(record_id_random:int|str, table_name:str):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals()) log.debug(locals())
if isinstance(record_id_random, str): pass if isinstance(record_id_random, str) and len(record_id_random) >= 11 and len(record_id_random) <= 22: pass
elif isinstance(record_id_random, int): return record_id_random elif isinstance(record_id_random, int): return record_id_random
else: else:
log.warning(f'Unexpected data type: {str(type(record_id_random))} Expected type is a string 11 or 22 characters long.') log.warning(f'Unexpected data type or string format: {str(type(record_id_random))} Expected type is a string 11 or 22 characters long.')
return False return False
if record_id_random and table_name: if record_id_random and table_name:
@@ -70,9 +70,15 @@ def redis_lookup_id_random(record_id_random:int|str, table_name:str):
return False return False
else: else:
pass pass
else: elif record_id_random:
log.warning('Missing id_random')
return False
elif table_name:
log.warning('Missing table_name to select from for id_random') log.warning('Missing table_name to select from for id_random')
return False return False
else:
log.warning('Missing table_name and record_id_random')
return False
r = redis.Redis(host='localhost', port=6379, db=7, password=None, decode_responses=True) r = redis.Redis(host='localhost', port=6379, db=7, password=None, decode_responses=True)

View File

@@ -0,0 +1,50 @@
from __future__ import annotations
import datetime
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator
from ..lib_general import *
from ..db_sql import sql_select
from .person_model import Person_Base
# ### BEGIN ### API Person Methods ### load_person_obj() ###
def load_person_obj(person_id:int|str, inc_contact:bool=False, inc_organization:bool=False) -> Person_Base:
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if person_id := redis_lookup_id_random(record_id_random=person_id, table_name='person'): pass
else: return False
if person_rec := sql_select(table_name='v_person', record_id=person_id):
#log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(person_rec)
if inc_contact:
if contact_rec := sql_select(table_name='v_contact', field_name='contact_id', field_value=person_rec.get('contact_id', None)):
person_rec['contact'] = contact_rec
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(contact_rec)
if inc_organization:
if organization_rec := sql_select(table_name='v_organization', field_name='organization_id', field_value=person_rec.get('organization_id', None)):
person_rec['organization'] = organization_rec
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(organization_rec)
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(person_rec)
else:
return False
try:
person_obj = Person_Base(**person_rec)
log.debug(person_obj)
except ValidationError as e:
log.error(e.json())
return False
return person_obj
# ### END ### API Person Methods ### load_person_obj() ###

View File

@@ -8,10 +8,11 @@ from ..lib_general import *
from ..db_sql import sql_select from ..db_sql import sql_select
from .user_model import User_Base, User_Out_Base, User_New_Base from .user_model import User_Base, User_Out_Base, User_New_Base
from .user_role_model import User_Role_Base
# ### BEGIN ### API User Methods ### load_user_obj() ### # ### BEGIN ### API User Methods ### load_user_obj() ###
def load_user_obj(user_id:int|str, inc_contact:bool=False, inc_organization:bool=False, inc_person:bool=False) -> User_Base: def load_user_obj(user_id:int|str, inc_roles:bool=False, inc_contact:bool=False, inc_organization:bool=False, inc_person:bool=False) -> User_Base:
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals()) log.debug(locals())
@@ -22,18 +23,31 @@ def load_user_obj(user_id:int|str, inc_contact:bool=False, inc_organization:bool
#log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL #log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(user_rec) log.debug(user_rec)
if inc_roles:
if role_rec_li := sql_select(table_name='v_user_role_detail', field_name='user_id', field_value=user_id, as_list=True):
user_rec['role_list'] = role_rec_li
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(role_rec_li)
if inc_contact: if inc_contact:
if contact_rec := sql_select(table_name='v_contact', field_name='contact_id', field_value=user_rec.get('contact_id', None)): if contact_rec := sql_select(table_name='v_contact', field_name='contact_id', field_value=user_rec.get('contact_id', None)):
user_rec['contact'] = contact_rec user_rec['contact'] = contact_rec
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(contact_rec)
if inc_organization: if inc_organization:
if organization_rec := sql_select(table_name='v_organization', field_name='organization_id', field_value=user_rec.get('organization_id', None)): if organization_rec := sql_select(table_name='v_organization', field_name='organization_id', field_value=user_rec.get('organization_id', None)):
user_rec['organization'] = organization_rec user_rec['organization'] = organization_rec
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(organization_rec)
if inc_person: if inc_person:
if person_rec := sql_select(table_name='v_person', field_name='person_id', field_value=user_rec.get('person_id', None)): if person_rec := sql_select(table_name='v_person', field_name='person_id', field_value=user_rec.get('person_id', None)):
user_rec['person'] = person_rec user_rec['person'] = person_rec
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(person_rec)
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(user_rec) log.debug(user_rec)
else: else:
return False return False

View File

@@ -11,6 +11,7 @@ from .common_field_schema import base_fields, default_num_bytes
from .contact_model import Contact_Base from .contact_model import Contact_Base
#from .organization_model import Organization_Base #from .organization_model import Organization_Base
#from .person_model import Person_Base #from .person_model import Person_Base
from .user_role_model import User_Role_Base
class User_New_Base(BaseModel): class User_New_Base(BaseModel):
@@ -136,6 +137,7 @@ class User_Out_Base(BaseModel):
#contact: Optional[Contact_Base]# = Contact_Base() #contact: Optional[Contact_Base]# = Contact_Base()
#organization: Optional[Organization_Base]# = Organization_Base() #organization: Optional[Organization_Base]# = Organization_Base()
#person: Optional[Person_Base]# = Person_Base() #person: Optional[Person_Base]# = Person_Base()
role_list: Optional[list] = []# = User_Role_Base()
notes: Optional[str] notes: Optional[str]
created_on: Optional[datetime.datetime] created_on: Optional[datetime.datetime]
@@ -207,6 +209,7 @@ class User_Base(BaseModel):
contact: Optional[Contact_Base]# = Contact_Base() contact: Optional[Contact_Base]# = Contact_Base()
#organization: Optional[Organization_Base]# = Organization_Base() #organization: Optional[Organization_Base]# = Organization_Base()
#person: Optional[Person_Base]# = Person_Base() #person: Optional[Person_Base]# = Person_Base()
role_list: Optional[list] = []# = User_Role_Base()
notes: Optional[str] notes: Optional[str]
created_on: Optional[datetime.datetime] = None created_on: Optional[datetime.datetime] = None

View File

@@ -7,7 +7,7 @@ from pydantic import BaseModel, EmailStr, Field, Json, PrivateAttr, ValidationEr
from ..lib_general import * from ..lib_general import *
from .common_field_schema import base_fields, default_num_bytes from .common_field_schema import base_fields, default_num_bytes
from .user_model import Contact_Base #from .user_model import User_Base
class User_Role_Base(BaseModel): class User_Role_Base(BaseModel):

View File

@@ -12,6 +12,7 @@ from app.db_sql import *
from .api_crud import delete_obj_template, get_obj_template, get_obj_li_template, patch_obj_template, post_obj_template from .api_crud import delete_obj_template, get_obj_template, get_obj_li_template, patch_obj_template, post_obj_template
from ..models.person_model import Person_Base from ..models.person_model import Person_Base
from ..models.person_methods import load_person_obj
from ..models.response_model import * from ..models.response_model import *
@@ -94,20 +95,30 @@ async def get_person_obj_li(
async def get_person_obj( async def get_person_obj(
obj_id: str = Query(..., min_length=1, max_length=22), obj_id: str = Query(..., min_length=1, max_length=22),
x_account_id: str = Header(...), x_account_id: str = Header(...),
inc_contact: bool = False,
inc_organization: bool = False,
by_alias: Optional[bool] = True, by_alias: Optional[bool] = True,
exclude_unset: Optional[bool] = True, exclude_unset: Optional[bool] = True,
): ):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals()) log.debug(locals())
obj_type = 'person' user_obj = load_person_obj(
result = get_obj_template( person_id=obj_id,
obj_type=obj_type, inc_contact=inc_contact,
obj_id=obj_id, inc_organization=inc_organization,
by_alias=True, ).dict(by_alias=by_alias, exclude_unset=exclude_unset)
exclude_unset=True, data = user_obj
) return mk_resp(data=user_obj)
return result
# obj_type = 'person'
# result = get_obj_template(
# obj_type=obj_type,
# obj_id=obj_id,
# by_alias=True,
# exclude_unset=True,
# )
# return result
@router.delete('/{obj_id}', response_model=Resp_Body_Base) @router.delete('/{obj_id}', response_model=Resp_Body_Base)

View File

@@ -84,6 +84,7 @@ async def change_user_obj_password(
password: Optional[str] = Query(None, min_length=6, max_length=50), password: Optional[str] = Query(None, min_length=6, max_length=50),
x_account_id: Optional[str] = Header(..., ), x_account_id: Optional[str] = Header(..., ),
return_obj: bool = False, return_obj: bool = False,
inc_roles: bool = False,
inc_contact: bool = False, inc_contact: bool = False,
inc_organization: bool = False, inc_organization: bool = False,
inc_person: bool = False, inc_person: bool = False,
@@ -150,6 +151,46 @@ async def patch_user_obj(
return result return result
# ### BEGIN ### API User Routers ### user_new_auth_key() ###
# Generate a new one time use authorization key
@router.get('/new_auth_key', response_model=Resp_Body_Base)
async def user_new_auth_key(
user_id: Optional[str] = Query(None, min_length=2, max_length=50),
x_account_id: str = Header(...),
return_obj: Optional[bool] = False,
by_alias: bool = True,
exclude_unset: bool = True,
exclude_none: bool = True,
):
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
update_user_data = {}
update_user_data['id_random'] = user_id
update_user_data['auth_key'] = secrets.token_urlsafe(default_num_bytes)
if user_rec_update_result := sql_update(table_name='user', data=update_user_data):
log.info('The user record was updated with a new auth_key')
if return_obj:
user_obj = load_user_obj(
user_id=user_id,
inc_contact=False,
inc_organization=False,
inc_person=False
).dict(by_alias=by_alias, exclude_unset=exclude_unset)
data = user_obj
else:
user_obj = {}
user_obj['auth_key'] = update_user_data['auth_key']
return mk_resp(data=user_obj)
else:
log.info('The user record was not updated with a new auth_key')
log.debug(user_rec_update_result)
return mk_resp(data=False, status_code=404)
# ### BEGIN ### API User Routers ### user_authenticate() ### # ### BEGIN ### API User Routers ### user_authenticate() ###
# Authenticate a username and password OR by authorization key # Authenticate a username and password OR by authorization key
# An authorization key can only be done once. It will be deleted if found. # An authorization key can only be done once. It will be deleted if found.
@@ -161,6 +202,7 @@ async def user_authenticate(
password: Optional[str] = Query(None, min_length=6, max_length=50), password: Optional[str] = Query(None, min_length=6, max_length=50),
auth_key: Optional[str] = Query(None, min_length=11, max_length=22), auth_key: Optional[str] = Query(None, min_length=11, max_length=22),
x_account_id: str = Header(...), x_account_id: str = Header(...),
inc_roles: bool = False,
inc_contact: bool = False, inc_contact: bool = False,
inc_organization: bool = False, inc_organization: bool = False,
inc_person: bool = False, inc_person: bool = False,
@@ -248,6 +290,7 @@ async def user_authenticate(
user_obj = load_user_obj( user_obj = load_user_obj(
user_id=user_id, user_id=user_id,
inc_roles=inc_roles,
inc_contact=inc_contact, inc_contact=inc_contact,
inc_organization=inc_organization, inc_organization=inc_organization,
inc_person=inc_person inc_person=inc_person
@@ -288,6 +331,7 @@ async def lookup_user_obj(
for_obj_id: Union[int,str], for_obj_id: Union[int,str],
for_obj_type: str = Query(..., min_length=2, max_length=50), for_obj_type: str = Query(..., min_length=2, max_length=50),
x_account_id: str = Header(...), x_account_id: str = Header(...),
inc_roles: bool = False,
inc_contact: bool = False, inc_contact: bool = False,
inc_organization: bool = False, inc_organization: bool = False,
inc_person: bool = False, inc_person: bool = False,
@@ -332,6 +376,7 @@ async def lookup_user_obj(
user_id = user_rec_result.get('user_id', None) user_id = user_rec_result.get('user_id', None)
user_obj = load_user_obj( user_obj = load_user_obj(
user_id=user_id, user_id=user_id,
inc_roles=inc_roles,
inc_contact=inc_contact, inc_contact=inc_contact,
inc_organization=inc_organization, inc_organization=inc_organization,
inc_person=inc_person inc_person=inc_person
@@ -344,6 +389,7 @@ async def lookup_user_obj(
user_obj_li.append( user_obj_li.append(
load_user_obj( load_user_obj(
user_id=user_id, user_id=user_id,
inc_roles=inc_roles,
inc_contact=inc_contact, inc_contact=inc_contact,
inc_organization=inc_organization, inc_organization=inc_organization,
inc_person=inc_person, inc_person=inc_person,
@@ -362,6 +408,7 @@ async def lookup_username_obj(
account_id: Union[int,str], account_id: Union[int,str],
username: str = Query(..., min_length=2, max_length=50), username: str = Query(..., min_length=2, max_length=50),
x_account_id: str = Header(...), x_account_id: str = Header(...),
inc_roles: bool = False,
inc_contact: bool = False, inc_contact: bool = False,
inc_organization: bool = False, inc_organization: bool = False,
inc_person: bool = False, inc_person: bool = False,
@@ -371,19 +418,34 @@ async def lookup_username_obj(
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals()) log.debug(locals())
if account_id := redis_lookup_id_random(record_id_random=account_id, table_name='account'): pass if account_id == '':
else: return mk_resp(data=False, status_code=404) # Not Found account_id = None
elif account_id := redis_lookup_id_random(record_id_random=account_id, table_name='account'):
pass
else:
return mk_resp(data=False, status_code=404) # Not Found
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
data = {} data = {}
data['account_id'] = account_id data['account_id'] = account_id
data['username'] = username data['username'] = username
log.debug(data)
sql = f""" if account_id:
sql = f"""
SELECT id AS 'user_id', id_random AS 'user_id_random'
FROM `user` AS `user`
WHERE `user`.account_id = :account_id AND `user`.username = :username
"""
else:
sql = f"""
SELECT id AS 'user_id', id_random AS 'user_id_random' SELECT id AS 'user_id', id_random AS 'user_id_random'
FROM `user` AS `user` FROM `user` AS `user`
WHERE `user`.account_id = :account_id AND `user`.username = :username WHERE `user`.account_id IS NULL AND `user`.username = :username
""" """
log.debug(sql)
# This will return a list if selecting by account ID # This will return a list if selecting by account ID
user_obj_result = sql_select(data=data, sql=sql) user_obj_result = sql_select(data=data, sql=sql)
@@ -391,6 +453,7 @@ async def lookup_username_obj(
user_id = user_obj_result.get('user_id', None) user_id = user_obj_result.get('user_id', None)
user_obj = load_user_obj( user_obj = load_user_obj(
user_id=user_id, user_id=user_id,
inc_roles=inc_roles,
inc_contact=inc_contact, inc_contact=inc_contact,
inc_organization=inc_organization, inc_organization=inc_organization,
inc_person=inc_person inc_person=inc_person
@@ -403,6 +466,7 @@ async def lookup_username_obj(
user_obj_li.append( user_obj_li.append(
load_user_obj( load_user_obj(
user_id=user_id, user_id=user_id,
inc_roles=inc_roles,
inc_contact=inc_contact, inc_contact=inc_contact,
inc_organization=inc_organization, inc_organization=inc_organization,
inc_person=inc_person, inc_person=inc_person,