Files
OSIT-AE-API-FastAPI/app/models/user_models.py

406 lines
14 KiB
Python

import datetime, hashlib, logging, os, pytz, redis, secrets
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, Json, PrivateAttr, ValidationError, validator
from app.db_sql import get_id_random, redis_lookup_id_random
from app.lib_general import log, logging, secure_hash_string
from app.models.common_field_schema import base_fields, default_num_bytes
# from app.models.contact_models import Contact_Base
from app.models.organization_models import Organization_Base
# from app.models.person_models import Person_Base # Causes circular import
# from app.models.user_role_models import User_Role_Base
# ### BEGIN ### API User Models ### User_Base() ###
class User_Base(BaseModel):
    # General-purpose user model in which every field is optional.
    #
    # Validators fill in integer ids from their "*_id_random" counterparts via
    # redis_lookup_id_random(), and derive `password` from `new_password` via
    # secure_hash_string().
    #
    # Field ORDER matters: pydantic v1 validates fields in declaration order
    # and a validator's `values` dict only contains fields declared earlier.
    # (A `#` comment is used instead of a class docstring so the generated
    # schema description is left unchanged.)
    log.setLevel(logging.INFO)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    id_random: Optional[str] = Field(
        **base_fields['user_id_random'],
        alias='user_id_random',
        # default_factory = lambda:secrets.token_urlsafe(default_num_bytes),
    )
    id: Optional[int] = Field(
        alias='user_id'
    )

    account_id_random: Optional[str]
    account_id: Optional[int]
    account_name: Optional[str]
    # contact_id_random: Optional[str]
    # contact_id: Optional[int]
    organization_id_random: Optional[str]
    organization_id: Optional[int]
    person_id_random: Optional[str]
    person_id: Optional[int]

    username: Optional[str]
    name: Optional[str]
    email: Optional[str]
    email_verified: Optional[bool]
    current_password: Optional[str]
    new_password: Optional[str]
    # `password` MUST be declared after `new_password`: its validator reads
    # values['new_password'], which is only populated for fields declared
    # earlier. With the previous order the hash was never produced.
    password: Optional[str]
    allow_auth_key: Optional[int]
    auth_key: Optional[str]

    enable: Optional[bool]
    enable_from: Optional[datetime.datetime] = None
    enable_to: Optional[datetime.datetime] = None

    super: Optional[bool]
    manager: Optional[bool]
    administrator: Optional[bool]
    public: Optional[bool]
    verified: Optional[bool]
    status_id: Optional[int]
    status_name: Optional[str]

    hide: Optional[bool]
    priority: Optional[bool]
    sort: Optional[int]
    group: Optional[str]
    notes: Optional[str]
    created_on: Optional[datetime.datetime] = None
    updated_on: Optional[datetime.datetime] = None

    # Including other related objects
    # from app.models.person_models import Person_Base # Causes circular import
    # archive_list: Optional[list] # Archive_Base()
    # contact: Optional[Contact_Base]
    event_list: Optional[list]  # Event_Base() # Priority l1
    hosted_file_list: Optional[list]  # Hosted_File_Base() # Priority l2
    journal_list: Optional[list]  # Journal_Base() # Priority l3
    order_list: Optional[list]  # Order_Base() # Priority l2
    order_cart_list: Optional[list]  # Order_Base() # Priority l2
    organization: Optional[Organization_Base]  # Organization_Base() # Priority l3
    person: Optional[dict]  # Person_Base() # Priority l2
    # person: Optional[Union[Person_Base, None]]
    post_list: Optional[list]  # Post_Base() # Priority l1
    user_role_list: Optional[list] = Field(
        alias='role_list'
    )  # User_Role_Base()
    # role_list: Optional[list] = [] # User_Role_Base() # NOTE <- This is a duplicate of above!

    _processed_at: datetime.datetime = PrivateAttr(default_factory=datetime.datetime.now)

    #@validator('user_id_random', always=True)
    def user_id_random_copy(cls, v, values, **kwargs):
        """Return values['id_random'] when it is truthy, otherwise None.

        The @validator decorator above is commented out, so pydantic does not
        register this function as a validator; it is kept for reference.
        """
        log.setLevel(logging.WARNING)
        log.debug(locals())
        # .get() avoids a KeyError when 'id_random' is absent from `values`.
        return values.get('id_random') or None

    @validator('id', always=True)
    def user_id_lookup(cls, v, values, **kwargs):
        """Keep a positive integer id; otherwise look it up from `id_random`.

        Returns the result of redis_lookup_id_random(table_name='user') when
        `id_random` is set, and None when neither value is available.
        """
        log.setLevel(logging.WARNING)
        log.debug(locals())
        if isinstance(v, int) and v > 0:
            return v
        elif id_random := values.get('id_random'):
            return redis_lookup_id_random(record_id_random=id_random, table_name='user')
        return None

    @validator('account_id', always=True)
    def account_id_lookup(cls, v, values, **kwargs):
        """Keep a positive integer id; otherwise look it up from `account_id_random`."""
        if isinstance(v, int) and v > 0:
            return v
        elif id_random := values.get('account_id_random'):
            return redis_lookup_id_random(record_id_random=id_random, table_name='account')
        return None

    @validator('account_id_random', always=True)
    def account_id_random_lookup(cls, v, values, **kwargs):
        """Keep a string of at least 11 characters; otherwise derive it from `account_id`."""
        if isinstance(v, str) and len(v) >= 11:
            return v
        # NOTE(review): `account_id` is declared after `account_id_random`, so
        # it does not appear to be in `values` yet when this runs and the
        # branch below looks unreachable. Confirm, and move to a
        # root_validator if the reverse lookup is wanted.
        elif account_id := values.get('account_id'):
            return get_id_random(record_id=account_id, table_name='account')
        return None

    # @validator('contact_id', always=True)
    # def contact_id_lookup(cls, v, values, **kwargs):
    #     log.setLevel(logging.WARNING)
    #     log.debug(locals())
    #     if values['contact_id_random']:
    #         return redis_lookup_id_random(record_id_random=values['contact_id_random'], table_name='contact')
    #     return None

    @validator('organization_id', always=True)
    def organization_id_lookup(cls, v, values, **kwargs):
        """Keep a positive integer id; otherwise look it up from `organization_id_random`."""
        if isinstance(v, int) and v > 0:
            return v
        elif id_random := values.get('organization_id_random'):
            return redis_lookup_id_random(record_id_random=id_random, table_name='organization')
        return None

    @validator('person_id', always=True)
    def person_id_lookup(cls, v, values, **kwargs):
        """Keep a positive integer id; otherwise look it up from `person_id_random`."""
        if isinstance(v, int) and v > 0:
            return v
        elif id_random := values.get('person_id_random'):
            return redis_lookup_id_random(record_id_random=id_random, table_name='person')
        return None

    @validator('password', always=True)
    def hash_new_password(cls, v, values, **kwargs):
        """Return secure_hash_string(new_password), or None without a new password.

        Any value supplied directly for `password` is discarded, as before.
        """
        log.setLevel(logging.WARNING)
        # locals() is deliberately not logged here: `v` and
        # values['new_password'] hold plaintext credentials.
        if values.get('new_password'):
            return secure_hash_string(string=values['new_password'])
        return None

    class Config:
        underscore_attrs_are_private = True
        allow_population_by_field_name = True
        fields = base_fields
# ### END ### API User Models ### User_Base() ###
# ### BEGIN ### API User Models ### User_New_Base() ###
class User_New_Base(BaseModel):
    # Model for creating a user: `username`, `name` and `email` are required.
    #
    # `new_password` defaults to a random URL-safe token, and `password` is
    # always derived from it via secure_hash_string().
    #
    # Field ORDER matters: pydantic v1 validates fields in declaration order
    # and a validator's `values` dict only contains fields declared earlier.
    # (A `#` comment is used instead of a class docstring so the generated
    # schema description is left unchanged.)
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    id_random: Optional[str] = Field(
        **base_fields['user_id_random'],
        alias='user_id_random',
        # default_factory = lambda:secrets.token_urlsafe(default_num_bytes),
    )
    id: Optional[int] = Field(
        alias='user_id'
    )

    account_id_random: Optional[str]
    account_id: Optional[int]
    account_name: Optional[str]
    contact_id_random: Optional[str]
    contact_id: Optional[int]
    organization_id_random: Optional[str]
    organization_id: Optional[int]
    person_id_random: Optional[str]
    person_id: Optional[int]

    username: str
    name: str
    email: str
    email_verified: bool = False
    new_password: str = Field(default_factory=lambda: secrets.token_urlsafe(default_num_bytes))
    password: Optional[str]  # If new_password is found then the validator below will create secure_hash_string() from the new password string.
    allow_auth_key: bool = False

    enable: bool = False
    # default_factory makes the window be computed per instance. A plain
    # `= datetime.datetime.now(...)` default is evaluated once, when the class
    # body runs at import time, so every later user would share that timestamp.
    enable_from: Optional[datetime.datetime] = Field(
        default_factory=lambda: datetime.datetime.now(datetime.timezone.utc)
    )
    enable_to: Optional[datetime.datetime] = Field(
        default_factory=lambda: datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=365)
    )

    #super: Optional[bool] = False
    #manager: Optional[bool] = False
    administrator: bool = False
    public: bool = False
    verified: bool = False

    hide: Optional[bool]
    priority: Optional[bool]
    sort: Optional[int]
    group: Optional[str]
    notes: Optional[str]

    # Including JSON data
    other_json: Optional[Json]

    _processed_at: datetime.datetime = PrivateAttr(default_factory=datetime.datetime.now)

    #@validator('user_id_random', always=True)
    def user_id_random_copy(cls, v, values, **kwargs):
        """Return values['id_random'] when it is truthy, otherwise None.

        The @validator decorator above is commented out, so pydantic does not
        register this function as a validator; it is kept for reference.
        """
        log.setLevel(logging.WARNING)
        log.debug(locals())
        # .get() avoids a KeyError when 'id_random' is absent from `values`.
        return values.get('id_random') or None

    @validator('id', always=True)
    def user_id_lookup(cls, v, values, **kwargs):
        """Keep a positive integer id; otherwise look it up from `id_random`.

        Returns the result of redis_lookup_id_random(table_name='user') when
        `id_random` is set, and None when neither value is available.
        """
        log.setLevel(logging.WARNING)
        log.debug(locals())
        if isinstance(v, int) and v > 0:
            return v
        elif id_random := values.get('id_random'):
            return redis_lookup_id_random(record_id_random=id_random, table_name='user')
        return None

    @validator('account_id', always=True)
    def account_id_lookup(cls, v, values, **kwargs):
        """Keep a positive integer id; otherwise look it up from `account_id_random`."""
        if isinstance(v, int) and v > 0:
            return v
        elif id_random := values.get('account_id_random'):
            return redis_lookup_id_random(record_id_random=id_random, table_name='account')
        return None

    @validator('account_id_random', always=True)
    def account_id_random_lookup(cls, v, values, **kwargs):
        """Keep a string of at least 11 characters; otherwise derive it from `account_id`."""
        if isinstance(v, str) and len(v) >= 11:
            return v
        # NOTE(review): `account_id` is declared after `account_id_random`, so
        # it does not appear to be in `values` yet when this runs and the
        # branch below looks unreachable. Confirm, and move to a
        # root_validator if the reverse lookup is wanted.
        elif account_id := values.get('account_id'):
            return get_id_random(record_id=account_id, table_name='account')
        return None

    @validator('contact_id', always=True)
    def contact_id_lookup(cls, v, values, **kwargs):
        """Keep a positive integer id; otherwise look it up from `contact_id_random`."""
        if isinstance(v, int) and v > 0:
            return v
        elif id_random := values.get('contact_id_random'):
            return redis_lookup_id_random(record_id_random=id_random, table_name='contact')
        return None

    @validator('organization_id', always=True)
    def organization_id_lookup(cls, v, values, **kwargs):
        """Keep a positive integer id; otherwise look it up from `organization_id_random`."""
        if isinstance(v, int) and v > 0:
            return v
        elif id_random := values.get('organization_id_random'):
            return redis_lookup_id_random(record_id_random=id_random, table_name='organization')
        return None

    @validator('person_id', always=True)
    def person_id_lookup(cls, v, values, **kwargs):
        """Keep a positive integer id; otherwise look it up from `person_id_random`."""
        if isinstance(v, int) and v > 0:
            return v
        elif id_random := values.get('person_id_random'):
            return redis_lookup_id_random(record_id_random=id_random, table_name='person')
        return None

    @validator('password', always=True)
    def hash_new_password(cls, v, values, **kwargs):
        """Return secure_hash_string(new_password), or None without a new password.

        Any value supplied directly for `password` is discarded, as before.
        """
        log.setLevel(logging.WARNING)
        # locals() is deliberately not logged here: `v` and
        # values['new_password'] hold plaintext credentials.
        if values.get('new_password'):
            return secure_hash_string(string=values['new_password'])
        return None

    class Config:
        underscore_attrs_are_private = True
        allow_population_by_field_name = True
        fields = base_fields
# ### END ### API User Models ### User_New_Base() ###
# ### BEGIN ### API User Models ### User_Out_Base() ###
class User_Out_Base(BaseModel):
    # Outbound user model: all fields optional, including the password-reset
    # and activity timestamps that the other user models do not declare.
    # Only the account and person ids have validators on this model.
    # (A `#` comment is used instead of a class docstring so the generated
    # schema description is left unchanged.)
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # --- identity -----------------------------------------------------------
    id_random: Optional[str] = Field(
        **base_fields['user_id_random'],
        alias='user_id_random',
    )
    id: Optional[int] = Field(alias='user_id')

    # --- foreign references -------------------------------------------------
    account_id_random: Optional[str]
    account_id: Optional[int]
    account_name: Optional[str]
    contact_id_random: Optional[str]
    contact_id: Optional[int]
    organization_id_random: Optional[str]
    organization_id: Optional[int]
    person_id_random: Optional[str]
    person_id: Optional[int]

    # --- credentials and profile --------------------------------------------
    username: Optional[str]
    name: Optional[str]
    email: Optional[str]
    email_verified: Optional[bool]
    password: Optional[str]
    allow_auth_key: Optional[int]
    auth_key: Optional[str]

    # --- enablement window --------------------------------------------------
    enable: Optional[bool]
    enable_from: Optional[datetime.datetime]
    enable_to: Optional[datetime.datetime]

    # --- flags and status ---------------------------------------------------
    super: Optional[bool]
    manager: Optional[bool]
    administrator: Optional[bool]
    public: Optional[bool]
    verified: Optional[bool]
    status_id: Optional[int]
    status_name: Optional[str]

    # --- password reset and activity tracking -------------------------------
    password_set_on: Optional[datetime.datetime]
    password_reset_token: Optional[str]
    password_reset_expire_on: Optional[datetime.datetime]
    logged_in_on: Optional[datetime.datetime]
    last_activity_on: Optional[datetime.datetime]

    # --- common record columns ----------------------------------------------
    hide: Optional[bool]
    priority: Optional[bool]
    sort: Optional[int]
    group: Optional[str]
    notes: Optional[str]
    created_on: Optional[datetime.datetime]
    updated_on: Optional[datetime.datetime]

    # --- related objects ----------------------------------------------------
    # from app.models.person_models import Person_Base # Causes circular import
    # archive_list: Optional[list] # Archive_Base()
    # contact: Optional[Contact_Base]
    event_list: Optional[list]  # Event_Base() # Priority l1
    hosted_file_list: Optional[list]  # Hosted_File_Base() # Priority l2
    journal_list: Optional[list]  # Journal_Base() # Priority l3
    # membership_person: Optional[Membership_Person_Base] # Priority l2
    # membership_person_list: Optional[list] # Membership_Base() ???
    order_list: Optional[list]  # Order_Base() # Priority l2
    order_cart_list: Optional[list]  # Order_Base() # Priority l2
    organization: Optional[Organization_Base]  # Organization_Base() # Priority l3
    person: Optional[dict]  # Person_Base() # Priority l2
    # person: Optional[Union[Person_Base, None]]
    post_list: Optional[list]  # Post_Base() # Priority l1
    user_role_list: Optional[list] = Field(alias='role_list')  # User_Role_Base()

    _processed_at: datetime.datetime = PrivateAttr(default_factory=datetime.datetime.now)

    @validator('account_id', always=True)
    def account_id_lookup(cls, v, values, **kwargs):
        """Return `v` if it is a positive int, else resolve `account_id_random`.

        Falls back to redis_lookup_id_random(table_name='account'), and to
        None when no random id is available either.
        """
        has_usable_id = isinstance(v, int) and v > 0
        if has_usable_id:
            return v
        account_ref = values.get('account_id_random')
        if not account_ref:
            return None
        return redis_lookup_id_random(record_id_random=account_ref, table_name='account')

    @validator('account_id_random', always=True)
    def account_id_random_lookup(cls, v, values, **kwargs):
        """Return `v` if it is a string of 11+ characters, else derive it.

        Falls back to get_id_random(table_name='account') using `account_id`,
        and to None when that is not available either.
        """
        has_usable_ref = isinstance(v, str) and len(v) >= 11
        if has_usable_ref:
            return v
        # NOTE(review): `account_id` is declared after `account_id_random`, so
        # it does not appear to be in `values` yet when this runs and the
        # fallback below looks unreachable. Confirm against pydantic's
        # validation order.
        numeric_id = values.get('account_id')
        if not numeric_id:
            return None
        return get_id_random(record_id=numeric_id, table_name='account')

    @validator('person_id', always=True)
    def person_id_lookup(cls, v, values, **kwargs):
        """Return `v` if it is a positive int, else resolve `person_id_random`.

        Falls back to redis_lookup_id_random(table_name='person'), and to
        None when no random id is available either.
        """
        has_usable_id = isinstance(v, int) and v > 0
        if has_usable_id:
            return v
        person_ref = values.get('person_id_random')
        if not person_ref:
            return None
        return redis_lookup_id_random(record_id_random=person_ref, table_name='person')

    class Config:
        underscore_attrs_are_private = True
        allow_population_by_field_name = True
        fields = base_fields
# ### END ### API User Models ### User_Out_Base() ###