Files
OSIT-AE-API-FastAPI/app/models/user_models.py

371 lines
13 KiB
Python

import datetime, pytz, secrets
# import datetime, hashlib, logging, os, pytz, redis, secrets
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, Json, PrivateAttr, ValidationError, validator
from app.db_sql import redis_lookup_id_random
from app.lib_general import log, logging, secure_hash_string
from app.models.common_field_schema import base_fields, default_num_bytes
# from app.models.contact_models import Contact_Base
from app.models.organization_models import Organization_Base
# from app.models.person_models import Person_Base # Causes circular import
# from app.models.user_role_models import User_Role_Base
# ### BEGIN ### API User Models ### User_New_Base() ###
class User_New_Base(BaseModel):
    # Input model for creating a new user.
    #
    # Callers may reference related records either by numeric id or by the
    # matching "*_id_random" string; the validators below fill in the numeric
    # id via redis_lookup_id_random() when only the random id is given.
    # A plain-text `new_password` (random by default) is hashed into
    # `password` by hash_new_password().
    #
    # NOTE: a comment is used instead of a docstring on purpose -- pydantic
    # copies a model's docstring into the generated schema description.
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    id_random: Optional[str] = Field(
        **base_fields['user_id_random'],
        alias='user_id_random',
        # default_factory = lambda:secrets.token_urlsafe(default_num_bytes),
    )
    id: Optional[int] = Field(
        alias='user_id'
    )

    # Related records: each numeric id is resolved from its *_id_random
    # sibling, so the *_id_random field MUST stay declared before the id
    # (validators only see fields declared earlier).
    account_id_random: Optional[str]
    account_id: Optional[int]
    account_name: Optional[str]

    contact_id_random: Optional[str]
    contact_id: Optional[int]

    organization_id_random: Optional[str]
    organization_id: Optional[int]

    person_id_random: Optional[str]
    person_id: Optional[int]

    username: str
    name: str
    email: str
    email_verified: bool = False

    # `new_password` must stay declared before `password`: the `password`
    # validator reads it from `values`.
    new_password: str = Field(default_factory=lambda: secrets.token_urlsafe(default_num_bytes))
    password: Optional[str]  # If new_password is found then the validator below will create secure_hash_string() from the new password string.

    allow_auth_key: bool = False

    enable: bool = False
    # The defaults are produced per instance. A plain
    # `= datetime.datetime.now(...)` would be evaluated once, when the class
    # body runs at import time, and every later user would share that stale
    # timestamp.
    enable_from: Optional[datetime.datetime] = Field(
        default_factory=lambda: datetime.datetime.now(datetime.timezone.utc)
    )
    enable_to: Optional[datetime.datetime] = Field(
        default_factory=lambda: datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=365)
    )

    #super: Optional[bool] = False
    #manager: Optional[bool] = False
    administrator: bool = False
    public: bool = False
    verified: bool = False

    group: Optional[str]
    notes: Optional[str]

    # Including JSON data
    other_json: Optional[Json]

    # NOTE(review): this timestamp is naive local time while enable_from /
    # enable_to are UTC-aware -- confirm nothing compares them.
    _processed_at: datetime.datetime = PrivateAttr(default_factory=datetime.datetime.now)

    #@validator('user_id_random', always=True)
    def user_id_random_copy(cls, v, values, **kwargs):
        # Not registered as a validator (decorator above is commented out).
        # Returns the already-validated id_random, or None when it is
        # missing or empty.
        log.setLevel(logging.WARNING)
        log.debug(locals())
        return values.get('id_random') or None

    @validator('id', always=True)
    def user_id_lookup(cls, v, values, **kwargs):
        # Resolve the numeric user id from id_random; any supplied `v` is
        # ignored, as in the original implementation.
        # `.get` is required: pydantic leaves a field out of `values` when it
        # failed validation, and a KeyError here would escape as an unhandled
        # exception instead of a validation error.
        log.setLevel(logging.WARNING)
        log.debug(locals())
        id_random = values.get('id_random')
        if id_random:
            log.debug(id_random)
            return redis_lookup_id_random(record_id_random=id_random, table_name='user')
        return None

    @validator('account_id', always=True)
    def account_id_lookup(cls, v, values, **kwargs):
        # Keep an explicit positive id; otherwise resolve it from
        # account_id_random; otherwise None.
        if isinstance(v, int) and v > 0:
            return v
        elif id_random := values.get('account_id_random'):
            return redis_lookup_id_random(record_id_random=id_random, table_name='account')
        return None

    @validator('contact_id', always=True)
    def contact_id_lookup(cls, v, values, **kwargs):
        # Keep an explicit positive id; otherwise resolve it from
        # contact_id_random; otherwise None.
        if isinstance(v, int) and v > 0:
            return v
        elif id_random := values.get('contact_id_random'):
            return redis_lookup_id_random(record_id_random=id_random, table_name='contact')
        return None

    @validator('organization_id', always=True)
    def organization_id_lookup(cls, v, values, **kwargs):
        # Keep an explicit positive id; otherwise resolve it from
        # organization_id_random; otherwise None.
        if isinstance(v, int) and v > 0:
            return v
        elif id_random := values.get('organization_id_random'):
            return redis_lookup_id_random(record_id_random=id_random, table_name='organization')
        return None

    @validator('person_id', always=True)
    def person_id_lookup(cls, v, values, **kwargs):
        # Keep an explicit positive id; otherwise resolve it from
        # person_id_random; otherwise None.
        if isinstance(v, int) and v > 0:
            return v
        elif id_random := values.get('person_id_random'):
            return redis_lookup_id_random(record_id_random=id_random, table_name='person')
        return None

    @validator('password', always=True)
    def hash_new_password(cls, v, values, **kwargs):
        # Replace `password` with the hash of `new_password`. Any value
        # supplied for `password` itself is discarded (returns None when
        # there is no new_password), as in the original implementation.
        log.setLevel(logging.WARNING)
        # Deliberately no log.debug(locals()) here: `values` contains the
        # plain-text new_password and must never reach the log.
        new_password = values.get('new_password')
        if new_password:
            return secure_hash_string(string=new_password)
        return None

    class Config:
        underscore_attrs_are_private = True
        allow_population_by_field_name = True
        fields = base_fields
# ### END ### API User Models ### User_New_Base() ###
# ### BEGIN ### API User Models ### User_Out_Base() ###
class User_Out_Base(BaseModel):
    # Output model for a user record. Every field is optional and there are
    # no validators: values are passed through as given.
    # (Comment rather than docstring: pydantic copies a model docstring into
    # the generated schema description.)
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # --- Identity ---------------------------------------------------------
    id_random: Optional[str] = Field(alias='user_id_random', **base_fields['user_id_random'])
    id: Optional[int] = Field(alias='user_id')

    # --- Related records (numeric ids intentionally not exposed) -----------
    account_id_random: Optional[str]
    #account_id: Optional[int]
    account_name: Optional[str]
    contact_id_random: Optional[str]
    #contact_id: Optional[int]
    organization_id_random: Optional[str]
    #organization_id: Optional[int]
    person_id_random: Optional[str]
    #person_id: Optional[int]

    # --- Profile and credentials -------------------------------------------
    username: Optional[str]
    name: Optional[str]
    email: Optional[str]
    email_verified: Optional[bool]
    # NOTE(review): an outbound model carrying `password` looks risky --
    # confirm callers exclude it before returning data to clients.
    password: Optional[str]
    allow_auth_key: Optional[int]
    auth_key: Optional[str]

    # --- Enablement window -------------------------------------------------
    enable: Optional[bool]
    enable_from: Optional[datetime.datetime]
    enable_to: Optional[datetime.datetime]

    # --- Flags and status --------------------------------------------------
    super: Optional[bool]
    manager: Optional[bool]
    administrator: Optional[bool]
    public: Optional[bool]
    verified: Optional[bool]
    status_id: Optional[int]
    status_name: Optional[str]

    # --- Password / session bookkeeping ------------------------------------
    password_set_on: Optional[datetime.datetime]
    password_reset_token: Optional[str]
    password_reset_expire_on: Optional[datetime.datetime]
    logged_in_on: Optional[datetime.datetime]
    last_activity_on: Optional[datetime.datetime]

    group: Optional[str]
    notes: Optional[str]

    created_on: Optional[datetime.datetime]
    updated_on: Optional[datetime.datetime]

    # --- Other related objects ---------------------------------------------
    # from app.models.person_models import Person_Base # Causes circular import
    # archive_list: Optional[list] # Archive_Base()
    # contact: Optional[Contact_Base]
    event_list: Optional[list]  # Event_Base() # Priority complete
    hosted_file_list: Optional[list]  # Hosted_File_Base() # Priority l3
    journal_list: Optional[list]  # Journal_Base() # Priority l3
    # membership_person: Optional[Membership_Person_Base] # Priority l2
    # membership_person_list: Optional[list] # Membership_Base() ???
    order_list: Optional[list]  # Order_Base() # Priority l2
    order_cart_list: Optional[list]  # Order_Base() # Priority l2
    # Optional[X] is the same type as the former Optional[Union[X, None]].
    organization: Optional[Organization_Base]  # Organization_Base() # Priority l3
    person: Optional[dict]  # Person_Base() # Priority l2
    # person: Optional[Union[Person_Base, None]]
    post_list: Optional[list]  # Post_Base() # Priority l1
    user_role_list: Optional[list] = Field(alias='role_list')  # User_Role_Base()

    _processed_at: datetime.datetime = PrivateAttr(default_factory=datetime.datetime.now)

    class Config:
        underscore_attrs_are_private = True
        allow_population_by_field_name = True
        fields = base_fields
# ### END ### API User Models ### User_Out_Base() ###
# ### BEGIN ### API User Models ### User_Base() ###
class User_Base(BaseModel):
    # General-purpose user model. Every field is optional.
    #
    # Numeric ids for the user, account, organization and person are filled
    # in from their "*_id_random" siblings via redis_lookup_id_random(), and
    # a plain-text `new_password` is hashed into `password`.
    #
    # IMPORTANT: pydantic runs validators in field declaration order and
    # `values` only contains fields declared earlier, so any field a
    # validator reads must be declared BEFORE the field it validates.
    #
    # (Comment rather than docstring: pydantic copies a model docstring into
    # the generated schema description.)
    log.setLevel(logging.INFO)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    id_random: Optional[str] = Field(
        **base_fields['user_id_random'],
        alias='user_id_random',
        # default_factory = lambda:secrets.token_urlsafe(default_num_bytes),
    )
    id: Optional[int] = Field(
        alias='user_id'
    )

    account_id_random: Optional[str]
    account_id: Optional[int]
    account_name: Optional[str]

    # contact_id_random: Optional[str]
    # contact_id: Optional[int]

    organization_id_random: Optional[str]
    organization_id: Optional[int]

    person_id_random: Optional[str]
    person_id: Optional[int]

    username: Optional[str]
    name: Optional[str]
    email: Optional[str]
    email_verified: Optional[bool]

    # current_password / new_password are declared BEFORE `password` so that
    # hash_new_password() can read new_password from `values`. With
    # `password` declared first the validator never saw new_password and
    # `password` was always None.
    # NOTE(review): nothing in this model checks current_password -- confirm
    # the caller verifies it before persisting the new hash.
    current_password: Optional[str]
    new_password: Optional[str]
    password: Optional[str]

    allow_auth_key: Optional[int]
    auth_key: Optional[str]

    enable: Optional[bool]
    enable_from: Optional[datetime.datetime] = None
    enable_to: Optional[datetime.datetime] = None

    super: Optional[bool]
    manager: Optional[bool]
    administrator: Optional[bool]
    public: Optional[bool]
    verified: Optional[bool]

    status_id: Optional[int]
    status_name: Optional[str]

    password_set_on: Optional[datetime.datetime] = None
    password_reset_token: Optional[str] = None
    password_reset_expire_on: Optional[datetime.datetime] = None
    logged_in_on: Optional[datetime.datetime] = None
    last_activity_on: Optional[datetime.datetime] = None

    group: Optional[str]
    notes: Optional[str]

    created_on: Optional[datetime.datetime] = None
    updated_on: Optional[datetime.datetime] = None

    # Including other related objects
    # from app.models.person_models import Person_Base # Causes circular import
    # archive_list: Optional[list] # Archive_Base()
    # contact: Optional[Contact_Base]
    event_list: Optional[list]  # Event_Base() # Priority l1
    hosted_file_list: Optional[list]  # Hosted_File_Base() # Priority l2
    journal_list: Optional[list]  # Journal_Base() # Priority l3
    order_list: Optional[list]  # Order_Base() # Priority l2
    order_cart_list: Optional[list]  # Order_Base() # Priority l2
    organization: Optional[Union[Organization_Base, None]]  # Organization_Base() # Priority l3
    person: Optional[dict]  # Person_Base() # Priority l2
    # person: Optional[Union[Person_Base, None]]
    post_list: Optional[list]  # Post_Base() # Priority l1
    user_role_list: Optional[list] = Field(
        alias='role_list'
    )  # User_Role_Base()
    # role_list: Optional[list] = [] # User_Role_Base() # NOTE <- This is a duplicate of above!

    _processed_at: datetime.datetime = PrivateAttr(default_factory=datetime.datetime.now)

    #@validator('user_id_random', always=True)
    def user_id_random_copy(cls, v, values, **kwargs):
        # Not registered as a validator (decorator above is commented out).
        # Returns the already-validated id_random, or None when it is
        # missing or empty.
        log.setLevel(logging.WARNING)
        log.debug(locals())
        return values.get('id_random') or None

    @validator('id', always=True)
    def user_id_lookup(cls, v, values, **kwargs):
        # Resolve the numeric user id from id_random; any supplied `v` is
        # ignored, as in the original implementation.
        # `.get` is required: pydantic leaves a field out of `values` when it
        # failed validation, and a KeyError here would escape as an unhandled
        # exception instead of a validation error.
        log.setLevel(logging.WARNING)
        log.debug(locals())
        id_random = values.get('id_random')
        if id_random:
            log.debug(id_random)
            return redis_lookup_id_random(record_id_random=id_random, table_name='user')
        return None

    @validator('account_id', always=True)
    def account_id_lookup(cls, v, values, **kwargs):
        # Resolve the numeric account id from account_id_random.
        # NOTE(review): unlike organization_id / person_id below, a supplied
        # positive account_id is NOT kept here -- confirm whether that
        # difference is intentional.
        log.setLevel(logging.WARNING)
        log.debug(locals())
        id_random = values.get('account_id_random')
        if id_random:
            return redis_lookup_id_random(record_id_random=id_random, table_name='account')
        return None

    # @validator('contact_id', always=True)
    # def contact_id_lookup(cls, v, values, **kwargs):
    #     log.setLevel(logging.WARNING)
    #     log.debug(locals())
    #     if values['contact_id_random']:
    #         return redis_lookup_id_random(record_id_random=values['contact_id_random'], table_name='contact')
    #     return None

    @validator('organization_id', always=True)
    def organization_id_lookup(cls, v, values, **kwargs):
        # Keep an explicit positive id; otherwise resolve it from
        # organization_id_random; otherwise None.
        if isinstance(v, int) and v > 0:
            return v
        elif id_random := values.get('organization_id_random'):
            return redis_lookup_id_random(record_id_random=id_random, table_name='organization')
        return None

    @validator('person_id', always=True)
    def person_id_lookup(cls, v, values, **kwargs):
        # Keep an explicit positive id; otherwise resolve it from
        # person_id_random; otherwise None.
        if isinstance(v, int) and v > 0:
            return v
        elif id_random := values.get('person_id_random'):
            return redis_lookup_id_random(record_id_random=id_random, table_name='person')
        return None

    @validator('password', always=True)
    def hash_new_password(cls, v, values, **kwargs):
        # Replace `password` with the hash of `new_password`. Any value
        # supplied for `password` itself is discarded (returns None when
        # there is no new_password), as in the original implementation.
        log.setLevel(logging.WARNING)
        # Deliberately no log.debug(locals()) here: `values` contains the
        # plain-text current_password / new_password and must never reach
        # the log.
        new_password = values.get('new_password')
        if new_password:
            return secure_hash_string(string=new_password)
        return None

    class Config:
        underscore_attrs_are_private = True
        allow_population_by_field_name = True
        fields = base_fields
# ### END ### API User Models ### User_Base() ###