import datetime
import hashlib
import logging
import os
import secrets
from typing import Dict, List, Optional, Set, Union

import pytz
import redis
from pydantic import BaseModel, EmailStr, Field, Json, PrivateAttr, ValidationError, validator

from app.db_sql import redis_lookup_id_random
from app.lib_general import log, logging, secure_hash_string
from app.models.common_field_schema import base_fields, default_num_bytes
# from app.models.contact_models import Contact_Base
# from app.models.membership_person_models import Membership_Person_Base
from app.models.organization_models import Organization_Base
# from app.models.person_models import Person_Base  # Causes circular import
# from app.models.user_role_models import User_Role_Base

# Set once at import time instead of inside every class body and validator:
# the validators used to reset the shared logger's level on each validation.
log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL


def _utc_now() -> datetime.datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.datetime.now(datetime.timezone.utc)


def _utc_one_year_from_now() -> datetime.datetime:
    """Return the timezone-aware UTC datetime 365 days from now."""
    return _utc_now() + datetime.timedelta(days=365)


def _lookup_record_id(values: Dict[str, object], random_field: str, table_name: str):
    """Resolve ``values[random_field]`` through ``redis_lookup_id_random``.

    Args:
        values: The pydantic ``values`` dict handed to a validator. It only
            holds fields that were declared earlier AND validated
            successfully, so ``random_field`` may be absent.
        random_field: Name of the ``*_id_random`` field to read.
        table_name: Passed through as ``table_name`` to the lookup.

    Returns:
        Whatever ``redis_lookup_id_random`` returns when the random id is
        truthy, otherwise ``None``.
    """
    # .get() rather than [] so that a *_id_random value which failed its own
    # validation does not escape model construction as an uncaught KeyError.
    record_id_random = values.get(random_field)
    if not record_id_random:
        return None
    log.debug('id lookup: table=%s id_random=%s', table_name, record_id_random)
    return redis_lookup_id_random(record_id_random=record_id_random, table_name=table_name)


def _hash_new_password(values: Dict[str, object]):
    """Return ``secure_hash_string`` of ``values['new_password']``, or ``None``.

    Deliberately logs nothing: ``values`` may contain the plaintext password.
    """
    new_password = values.get('new_password')
    if not new_password:
        return None
    return secure_hash_string(string=new_password)


# ### BEGIN ### API User Models ### User_New_Base() ###
class User_New_Base(BaseModel):
    # Model for creating a user. (Kept as comments, not a docstring: pydantic
    # copies a model docstring into the generated schema description.)
    #
    # Field order matters: a validator only sees fields declared before the
    # field it validates, so each `*_id_random` precedes its `*_id`, and
    # `new_password` precedes `password`.

    id_random: Optional[str] = Field(
        **base_fields['user_id_random'],
        alias='user_id_random',
        default_factory=lambda: secrets.token_urlsafe(default_num_bytes),
    )
    id: Optional[int] = Field(
        alias='user_id'
    )

    account_id_random: Optional[str]
    account_id: Optional[int]
    account_name: Optional[str]
    contact_id_random: Optional[str]
    contact_id: Optional[int]
    organization_id_random: Optional[str]
    organization_id: Optional[int]
    person_id_random: Optional[str]
    person_id: Optional[int]

    username: str
    name: str
    email: str
    email_verified: bool = False
    # A random password is generated when the caller does not supply one.
    new_password: str = Field(default_factory=lambda: secrets.token_urlsafe(default_num_bytes))
    # If new_password is found then the validator below will create
    # secure_hash_string() from the new password string.
    password: Optional[str]
    allow_auth_key: bool = False

    enable: bool = False
    # default_factory so the window is computed per instance. A plain default
    # expression here is evaluated once, when the class is created at import.
    enable_from: Optional[datetime.datetime] = Field(default_factory=_utc_now)
    enable_to: Optional[datetime.datetime] = Field(default_factory=_utc_one_year_from_now)

    #super: Optional[bool] = False
    #manager: Optional[bool] = False
    administrator: bool = False
    public: bool = False
    verified: bool = False

    group: Optional[str]
    notes: Optional[str]

    # Including JSON data
    other_json: Optional[Json]

    _processed_at: datetime.datetime = PrivateAttr(default_factory=datetime.datetime.now)

    #@validator('user_id_random', always=True)
    def user_id_random_copy(cls, v, values, **kwargs):
        """Return ``values['id_random']`` if truthy, else ``None`` (validator currently disabled)."""
        return values.get('id_random') or None

    @validator('id', always=True)
    def user_id_lookup(cls, v, values, **kwargs):
        """Derive ``id`` from ``id_random``; the supplied value is ignored."""
        return _lookup_record_id(values, 'id_random', 'user')

    @validator('account_id', always=True)
    def account_id_lookup(cls, v, values, **kwargs):
        """Derive ``account_id`` from ``account_id_random``; the supplied value is ignored."""
        return _lookup_record_id(values, 'account_id_random', 'account')

    @validator('contact_id', always=True)
    def contact_id_lookup(cls, v, values, **kwargs):
        """Derive ``contact_id`` from ``contact_id_random``; the supplied value is ignored."""
        return _lookup_record_id(values, 'contact_id_random', 'contact')

    @validator('organization_id', always=True)
    def organization_id_lookup(cls, v, values, **kwargs):
        """Derive ``organization_id`` from ``organization_id_random``; the supplied value is ignored."""
        return _lookup_record_id(values, 'organization_id_random', 'organization')

    @validator('person_id', always=True)
    def person_id_lookup(cls, v, values, **kwargs):
        """Derive ``person_id`` from ``person_id_random``; the supplied value is ignored."""
        return _lookup_record_id(values, 'person_id_random', 'person')

    @validator('password', always=True)
    def hash_new_password(cls, v, values, **kwargs):
        """Set ``password`` to the hash of ``new_password``; the supplied value is ignored."""
        return _hash_new_password(values)

    class Config:
        underscore_attrs_are_private = True
        allow_population_by_field_name = True
        fields = base_fields
# ### END ### API User Models ### User_New_Base() ###


# ### BEGIN ### API User Models ### User_Out_Base() ###
class User_Out_Base(BaseModel):
    # Outbound representation of a user. Every field is optional and the
    # model defines no validators.

    id_random: Optional[str] = Field(
        **base_fields['user_id_random'],
        alias='user_id_random',
    )
    id: Optional[int] = Field(
        alias='user_id'
    )

    account_id_random: Optional[str]
    #account_id: Optional[int]
    account_name: Optional[str]
    contact_id_random: Optional[str]
    #contact_id: Optional[int]
    organization_id_random: Optional[str]
    #organization_id: Optional[int]
    person_id_random: Optional[str]
    #person_id: Optional[int]

    username: Optional[str]
    name: Optional[str]
    email: Optional[str]
    email_verified: Optional[bool]
    password: Optional[str]
    allow_auth_key: Optional[int]
    auth_key: Optional[str]

    enable: Optional[bool]
    enable_from: Optional[datetime.datetime]
    enable_to: Optional[datetime.datetime]

    super: Optional[bool]
    manager: Optional[bool]
    administrator: Optional[bool]
    public: Optional[bool]
    verified: Optional[bool]

    status_id: Optional[int]
    status_name: Optional[str]

    password_set_on: Optional[datetime.datetime]
    password_reset_token: Optional[str]
    password_reset_expire_on: Optional[datetime.datetime]
    logged_in_on: Optional[datetime.datetime]
    last_activity_on: Optional[datetime.datetime]

    group: Optional[str]
    notes: Optional[str]

    created_on: Optional[datetime.datetime]
    updated_on: Optional[datetime.datetime]

    # Including other related objects
    # from app.models.person_models import Person_Base  # Causes circular import
    # archive_list: Optional[list]  # Archive_Base()
    # contact: Optional[Contact_Base]
    event_list: Optional[list]  # Event_Base()  # Priority complete
    hosted_file_list: Optional[list]  # Hosted_File_Base()  # Priority l3
    journal_list: Optional[list]  # Journal_Base()  # Priority l3
    # membership_person: Optional[Membership_Person_Base]  # Priority l2
    # membership_person_list: Optional[list]  # Membership_Base() ???
    order_list: Optional[list]  # Order_Base()  # Priority l2
    order_cart_list: Optional[list]  # Order_Base()  # Priority l2
    organization: Optional[Organization_Base]  # Organization_Base()  # Priority l3
    person: Optional[dict]  # Person_Base()  # Priority l2
    # person: Optional[Union[Person_Base, None]]
    post_list: Optional[list]  # Post_Base()  # Priority l1
    user_role_list: Optional[list] = Field(
        alias='role_list'
    )  # User_Role_Base()

    _processed_at: datetime.datetime = PrivateAttr(default_factory=datetime.datetime.now)

    class Config:
        underscore_attrs_are_private = True
        allow_population_by_field_name = True
        fields = base_fields
# ### END ### API User Models ### User_Out_Base() ###


# ### BEGIN ### API User Models ### User_Base() ###
class User_Base(BaseModel):
    # General user model: all fields optional, with the same id-lookup
    # validators as User_New_Base except for the disabled contact fields.
    #
    # Field order matters: a validator only sees fields declared before the
    # field it validates.

    id_random: Optional[str] = Field(
        **base_fields['user_id_random'],
        alias='user_id_random',
        default_factory=lambda: secrets.token_urlsafe(default_num_bytes),
    )
    id: Optional[int] = Field(
        alias='user_id'
    )

    account_id_random: Optional[str]
    account_id: Optional[int]
    account_name: Optional[str]
    # contact_id_random: Optional[str]
    # contact_id: Optional[int]
    organization_id_random: Optional[str]
    organization_id: Optional[int]
    person_id_random: Optional[str]
    person_id: Optional[int]

    username: Optional[str]
    name: Optional[str]
    email: Optional[str]
    email_verified: Optional[bool]
    # NOTE(review): `password` is declared BEFORE `new_password`, so when the
    # `password` validator runs `new_password` has not been validated yet and
    # is absent from `values`. hash_new_password() therefore always returns
    # None on this model. Confirm whether that is intended (e.g. password
    # changes are handled elsewhere after checking `current_password`) before
    # reordering these fields.
    password: Optional[str]
    current_password: Optional[str]
    new_password: Optional[str]
    allow_auth_key: Optional[int]
    auth_key: Optional[str]

    enable: Optional[bool]
    enable_from: Optional[datetime.datetime] = None
    enable_to: Optional[datetime.datetime] = None

    super: Optional[bool]
    manager: Optional[bool]
    administrator: Optional[bool]
    public: Optional[bool]
    verified: Optional[bool]

    status_id: Optional[int]
    status_name: Optional[str]

    password_set_on: Optional[datetime.datetime] = None
    password_reset_token: Optional[str] = None
    password_reset_expire_on: Optional[datetime.datetime] = None
    logged_in_on: Optional[datetime.datetime] = None
    last_activity_on: Optional[datetime.datetime] = None

    group: Optional[str]
    notes: Optional[str]

    created_on: Optional[datetime.datetime] = None
    updated_on: Optional[datetime.datetime] = None

    # Including other related objects
    # from app.models.person_models import Person_Base  # Causes circular import
    # archive_list: Optional[list]  # Archive_Base()
    # contact: Optional[Contact_Base]
    event_list: Optional[list]  # Event_Base()  # Priority l1
    hosted_file_list: Optional[list]  # Hosted_File_Base()  # Priority l2
    journal_list: Optional[list]  # Journal_Base()  # Priority l3
    order_list: Optional[list]  # Order_Base()  # Priority l2
    order_cart_list: Optional[list]  # Order_Base()  # Priority l2
    organization: Optional[Organization_Base]  # Organization_Base()  # Priority l3
    person: Optional[dict]  # Person_Base()  # Priority l2
    # person: Optional[Union[Person_Base, None]]
    post_list: Optional[list]  # Post_Base()  # Priority l1
    user_role_list: Optional[list] = Field(
        alias='role_list'
    )  # User_Role_Base()
    # role_list: Optional[list] = []  # User_Role_Base()  # NOTE <- This is a duplicate of above!

    _processed_at: datetime.datetime = PrivateAttr(default_factory=datetime.datetime.now)

    #@validator('user_id_random', always=True)
    def user_id_random_copy(cls, v, values, **kwargs):
        """Return ``values['id_random']`` if truthy, else ``None`` (validator currently disabled)."""
        return values.get('id_random') or None

    @validator('id', always=True)
    def user_id_lookup(cls, v, values, **kwargs):
        """Derive ``id`` from ``id_random``; the supplied value is ignored."""
        return _lookup_record_id(values, 'id_random', 'user')

    @validator('account_id', always=True)
    def account_id_lookup(cls, v, values, **kwargs):
        """Derive ``account_id`` from ``account_id_random``; the supplied value is ignored."""
        return _lookup_record_id(values, 'account_id_random', 'account')

    # The contact_id lookup validator is disabled on this model together
    # with the contact_id_random / contact_id fields.

    @validator('organization_id', always=True)
    def organization_id_lookup(cls, v, values, **kwargs):
        """Derive ``organization_id`` from ``organization_id_random``; the supplied value is ignored."""
        return _lookup_record_id(values, 'organization_id_random', 'organization')

    @validator('person_id', always=True)
    def person_id_lookup(cls, v, values, **kwargs):
        """Derive ``person_id`` from ``person_id_random``; the supplied value is ignored."""
        return _lookup_record_id(values, 'person_id_random', 'person')

    @validator('password', always=True)
    def hash_new_password(cls, v, values, **kwargs):
        """Set ``password`` to the hash of ``new_password`` when visible in ``values``, else ``None``."""
        return _hash_new_password(values)

    class Config:
        underscore_attrs_are_private = True
        allow_population_by_field_name = True
        fields = base_fields
# ### END ### API User Models ### User_Base() ###