feat(v3): robust search wildcards, smart status filtering, and fixed ID population
This commit is contained in:
@@ -1,10 +1,9 @@
|
||||
import datetime, pytz, secrets
|
||||
# import datetime, hashlib, logging, os, pytz, redis, secrets
|
||||
import datetime, hashlib, logging, os, pytz, redis, secrets
|
||||
|
||||
from typing import Dict, List, Optional, Set, Union
|
||||
from pydantic import BaseModel, EmailStr, Field, Json, PrivateAttr, ValidationError, validator
|
||||
|
||||
from app.db_sql import redis_lookup_id_random
|
||||
from app.db_sql import get_id_random, redis_lookup_id_random
|
||||
from app.lib_general import log, logging, secure_hash_string
|
||||
|
||||
from app.models.common_field_schema import base_fields, default_num_bytes
|
||||
@@ -14,212 +13,6 @@ from app.models.organization_models import Organization_Base
|
||||
# from app.models.user_role_models import User_Role_Base
|
||||
|
||||
|
||||
# ### BEGIN ### API User Models ### User_New_Base() ###
|
||||
class User_New_Base(BaseModel):
|
||||
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
||||
log.debug(locals())
|
||||
|
||||
id_random: Optional[str] = Field(
|
||||
**base_fields['user_id_random'],
|
||||
alias = 'user_id_random',
|
||||
# default_factory = lambda:secrets.token_urlsafe(default_num_bytes),
|
||||
)
|
||||
id: Optional[int] = Field(
|
||||
alias = 'user_id'
|
||||
)
|
||||
account_id_random: Optional[str]
|
||||
account_id: Optional[int]
|
||||
account_name: Optional[str]
|
||||
|
||||
contact_id_random: Optional[str]
|
||||
contact_id: Optional[int]
|
||||
|
||||
organization_id_random: Optional[str]
|
||||
organization_id: Optional[int]
|
||||
|
||||
person_id_random: Optional[str]
|
||||
person_id: Optional[int]
|
||||
|
||||
username: str
|
||||
name: str
|
||||
email: str
|
||||
email_verified: bool = False
|
||||
new_password: str = Field(default_factory = lambda:secrets.token_urlsafe(default_num_bytes))
|
||||
password: Optional[str] # If new_password is found then the validator below will create secure_hash_string() from the new password string.
|
||||
|
||||
allow_auth_key: bool = False
|
||||
|
||||
enable: bool = False
|
||||
enable_from: Optional[datetime.datetime] = datetime.datetime.now(datetime.timezone.utc)
|
||||
#enable_from: Optional[datetime.datetime] = datetime.datetime.now()
|
||||
enable_to: Optional[datetime.datetime] = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=365)
|
||||
#enable_to: Optional[datetime.datetime] = datetime.datetime.now() + datetime.timedelta(days=365)
|
||||
|
||||
#super: Optional[bool] = False
|
||||
#manager: Optional[bool] = False
|
||||
administrator: bool = False
|
||||
public: bool = False
|
||||
verified: bool = False
|
||||
|
||||
group: Optional[str]
|
||||
|
||||
notes: Optional[str]
|
||||
|
||||
# Including JSON data
|
||||
other_json: Optional[Json]
|
||||
|
||||
_processed_at: datetime.datetime = PrivateAttr(default_factory=datetime.datetime.now)
|
||||
|
||||
#@validator('user_id_random', always=True)
|
||||
def user_id_random_copy(cls, v, values, **kwargs):
|
||||
log.setLevel(logging.WARNING)
|
||||
log.debug(locals())
|
||||
|
||||
if values['id_random']:
|
||||
return values['id_random']
|
||||
return None
|
||||
|
||||
@validator('id', always=True)
|
||||
def user_id_lookup(cls, v, values, **kwargs):
|
||||
log.setLevel(logging.WARNING)
|
||||
log.debug(locals())
|
||||
|
||||
if values['id_random']:
|
||||
log.debug(values['id_random'])
|
||||
return redis_lookup_id_random(record_id_random=values['id_random'], table_name='user')
|
||||
return None
|
||||
|
||||
@validator('account_id', always=True)
|
||||
def account_id_lookup(cls, v, values, **kwargs):
|
||||
if isinstance(v, int) and v > 0: return v
|
||||
elif id_random := values.get('account_id_random'):
|
||||
return redis_lookup_id_random(record_id_random=id_random, table_name='account')
|
||||
return None
|
||||
|
||||
@validator('contact_id', always=True)
|
||||
def contact_id_lookup(cls, v, values, **kwargs):
|
||||
if isinstance(v, int) and v > 0: return v
|
||||
elif id_random := values.get('contact_id_random'):
|
||||
return redis_lookup_id_random(record_id_random=id_random, table_name='contact')
|
||||
return None
|
||||
|
||||
@validator('organization_id', always=True)
|
||||
def organization_id_lookup(cls, v, values, **kwargs):
|
||||
if isinstance(v, int) and v > 0: return v
|
||||
elif id_random := values.get('organization_id_random'):
|
||||
return redis_lookup_id_random(record_id_random=id_random, table_name='organization')
|
||||
return None
|
||||
|
||||
@validator('person_id', always=True)
|
||||
def person_id_lookup(cls, v, values, **kwargs):
|
||||
if isinstance(v, int) and v > 0: return v
|
||||
elif id_random := values.get('person_id_random'):
|
||||
return redis_lookup_id_random(record_id_random=id_random, table_name='person')
|
||||
return None
|
||||
|
||||
@validator('password', always=True)
|
||||
def hash_new_password(cls, v, values, **kwargs):
|
||||
log.setLevel(logging.WARNING)
|
||||
log.debug(locals())
|
||||
|
||||
if values.get('new_password'):
|
||||
return secure_hash_string(string=values['new_password'])
|
||||
return None
|
||||
|
||||
class Config:
|
||||
underscore_attrs_are_private = True
|
||||
allow_population_by_field_name = True
|
||||
fields = base_fields
|
||||
# ### END ### API User Models ### User_New_Base() ###
|
||||
|
||||
|
||||
# ### BEGIN ### API User Models ### User_Out_Base() ###
|
||||
class User_Out_Base(BaseModel):
|
||||
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
||||
log.debug(locals())
|
||||
|
||||
id_random: Optional[str] = Field(
|
||||
**base_fields['user_id_random'],
|
||||
alias = 'user_id_random',
|
||||
)
|
||||
id: Optional[int] = Field(
|
||||
alias = 'user_id'
|
||||
)
|
||||
|
||||
account_id_random: Optional[str]
|
||||
#account_id: Optional[int]
|
||||
account_name: Optional[str]
|
||||
|
||||
contact_id_random: Optional[str]
|
||||
#contact_id: Optional[int]
|
||||
|
||||
organization_id_random: Optional[str]
|
||||
#organization_id: Optional[int]
|
||||
|
||||
person_id_random: Optional[str]
|
||||
#person_id: Optional[int]
|
||||
|
||||
username: Optional[str]
|
||||
name: Optional[str]
|
||||
email: Optional[str]
|
||||
email_verified: Optional[bool]
|
||||
password: Optional[str]
|
||||
|
||||
allow_auth_key: Optional[int]
|
||||
auth_key: Optional[str]
|
||||
|
||||
enable: Optional[bool]
|
||||
enable_from: Optional[datetime.datetime]
|
||||
enable_to: Optional[datetime.datetime]
|
||||
|
||||
super: Optional[bool]
|
||||
manager: Optional[bool]
|
||||
administrator: Optional[bool]
|
||||
public: Optional[bool]
|
||||
verified: Optional[bool]
|
||||
status_id: Optional[int]
|
||||
status_name: Optional[str]
|
||||
|
||||
password_set_on: Optional[datetime.datetime]
|
||||
password_reset_token: Optional[str]
|
||||
password_reset_expire_on: Optional[datetime.datetime]
|
||||
logged_in_on: Optional[datetime.datetime]
|
||||
last_activity_on: Optional[datetime.datetime]
|
||||
|
||||
group: Optional[str]
|
||||
|
||||
notes: Optional[str]
|
||||
created_on: Optional[datetime.datetime]
|
||||
updated_on: Optional[datetime.datetime]
|
||||
|
||||
# Including other related objects
|
||||
# from app.models.person_models import Person_Base # Causes circular import
|
||||
# archive_list: Optional[list] # Archive_Base()
|
||||
# contact: Optional[Contact_Base]
|
||||
event_list: Optional[list] # Event_Base() # Priority complete
|
||||
hosted_file_list: Optional[list] # Hosted_File_Base() # Priority l3
|
||||
journal_list: Optional[list] # Journal_Base() # Priority l3
|
||||
# membership_person: Optional[Membership_Person_Base] # Priority l2
|
||||
# membership_person_list: Optional[list] # Membership_Base() ???
|
||||
order_list: Optional[list] # Order_Base() # Priority l2
|
||||
order_cart_list: Optional[list] # Order_Base() # Priority l2
|
||||
organization: Optional[Union[Organization_Base, None]] # Organization_Base() # Priority l3
|
||||
person: Optional[dict] # Person_Base() # Priority l2
|
||||
# person: Optional[Union[Person_Base, None]]
|
||||
post_list: Optional[list] # Post_Base() # Priority l1
|
||||
user_role_list: Optional[list] = Field(
|
||||
alias = 'role_list'
|
||||
) # User_Role_Base()
|
||||
|
||||
_processed_at: datetime.datetime = PrivateAttr(default_factory=datetime.datetime.now)
|
||||
|
||||
class Config:
|
||||
underscore_attrs_are_private = True
|
||||
allow_population_by_field_name = True
|
||||
fields = base_fields
|
||||
# ### END ### API User Models ### User_Out_Base() ###
|
||||
|
||||
|
||||
# ### BEGIN ### API User Models ### User_Base() ###
|
||||
class User_Base(BaseModel):
|
||||
log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
||||
@@ -271,12 +64,9 @@ class User_Base(BaseModel):
|
||||
status_id: Optional[int]
|
||||
status_name: Optional[str]
|
||||
|
||||
password_set_on: Optional[datetime.datetime] = None
|
||||
password_reset_token: Optional[str] = None
|
||||
password_reset_expire_on: Optional[datetime.datetime] = None
|
||||
logged_in_on: Optional[datetime.datetime] = None
|
||||
last_activity_on: Optional[datetime.datetime] = None
|
||||
|
||||
hide: Optional[bool]
|
||||
priority: Optional[bool]
|
||||
sort: Optional[int]
|
||||
group: Optional[str]
|
||||
|
||||
notes: Optional[str]
|
||||
@@ -317,18 +107,23 @@ class User_Base(BaseModel):
|
||||
log.setLevel(logging.WARNING)
|
||||
log.debug(locals())
|
||||
|
||||
if values['id_random']:
|
||||
log.debug(values['id_random'])
|
||||
return redis_lookup_id_random(record_id_random=values['id_random'], table_name='user')
|
||||
if isinstance(v, int) and v > 0: return v
|
||||
elif id_random := values.get('id_random'):
|
||||
return redis_lookup_id_random(record_id_random=id_random, table_name='user')
|
||||
return None
|
||||
|
||||
@validator('account_id', always=True)
|
||||
def account_id_lookup(cls, v, values, **kwargs):
|
||||
log.setLevel(logging.WARNING)
|
||||
log.debug(locals())
|
||||
if isinstance(v, int) and v > 0: return v
|
||||
elif id_random := values.get('account_id_random'):
|
||||
return redis_lookup_id_random(record_id_random=id_random, table_name='account')
|
||||
return None
|
||||
|
||||
if values['account_id_random']:
|
||||
return redis_lookup_id_random(record_id_random=values['account_id_random'], table_name='account')
|
||||
@validator('account_id_random', always=True)
|
||||
def account_id_random_lookup(cls, v, values, **kwargs):
|
||||
if isinstance(v, str) and len(v) >= 11: return v
|
||||
elif account_id := values.get('account_id'):
|
||||
return get_id_random(record_id=account_id, table_name='account')
|
||||
return None
|
||||
|
||||
# @validator('contact_id', always=True)
|
||||
@@ -368,3 +163,243 @@ class User_Base(BaseModel):
|
||||
allow_population_by_field_name = True
|
||||
fields = base_fields
|
||||
# ### END ### API User Models ### User_Base() ###
|
||||
|
||||
|
||||
# ### BEGIN ### API User Models ### User_New_Base() ###
|
||||
class User_New_Base(BaseModel):
|
||||
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
||||
log.debug(locals())
|
||||
|
||||
id_random: Optional[str] = Field(
|
||||
**base_fields['user_id_random'],
|
||||
alias = 'user_id_random',
|
||||
# default_factory = lambda:secrets.token_urlsafe(default_num_bytes),
|
||||
)
|
||||
id: Optional[int] = Field(
|
||||
alias = 'user_id'
|
||||
)
|
||||
account_id_random: Optional[str]
|
||||
account_id: Optional[int]
|
||||
account_name: Optional[str]
|
||||
|
||||
contact_id_random: Optional[str]
|
||||
contact_id: Optional[int]
|
||||
|
||||
organization_id_random: Optional[str]
|
||||
organization_id: Optional[int]
|
||||
|
||||
person_id_random: Optional[str]
|
||||
person_id: Optional[int]
|
||||
|
||||
username: str
|
||||
name: str
|
||||
email: str
|
||||
email_verified: bool = False
|
||||
new_password: str = Field(default_factory = lambda:secrets.token_urlsafe(default_num_bytes))
|
||||
password: Optional[str] # If new_password is found then the validator below will create secure_hash_string() from the new password string.
|
||||
|
||||
allow_auth_key: bool = False
|
||||
|
||||
enable: bool = False
|
||||
enable_from: Optional[datetime.datetime] = datetime.datetime.now(datetime.timezone.utc)
|
||||
#enable_from: Optional[datetime.datetime] = datetime.datetime.now()
|
||||
enable_to: Optional[datetime.datetime] = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=365)
|
||||
#enable_to: Optional[datetime.datetime] = datetime.datetime.now() + datetime.timedelta(days=365)
|
||||
|
||||
#super: Optional[bool] = False
|
||||
#manager: Optional[bool] = False
|
||||
administrator: bool = False
|
||||
public: bool = False
|
||||
verified: bool = False
|
||||
|
||||
hide: Optional[bool]
|
||||
priority: Optional[bool]
|
||||
sort: Optional[int]
|
||||
group: Optional[str]
|
||||
|
||||
notes: Optional[str]
|
||||
|
||||
# Including JSON data
|
||||
other_json: Optional[Json]
|
||||
|
||||
_processed_at: datetime.datetime = PrivateAttr(default_factory=datetime.datetime.now)
|
||||
|
||||
#@validator('user_id_random', always=True)
|
||||
def user_id_random_copy(cls, v, values, **kwargs):
|
||||
log.setLevel(logging.WARNING)
|
||||
log.debug(locals())
|
||||
|
||||
if values['id_random']:
|
||||
return values['id_random']
|
||||
return None
|
||||
|
||||
@validator('id', always=True)
|
||||
def user_id_lookup(cls, v, values, **kwargs):
|
||||
log.setLevel(logging.WARNING)
|
||||
log.debug(locals())
|
||||
|
||||
if isinstance(v, int) and v > 0: return v
|
||||
elif id_random := values.get('id_random'):
|
||||
return redis_lookup_id_random(record_id_random=id_random, table_name='user')
|
||||
return None
|
||||
|
||||
@validator('account_id', always=True)
|
||||
def account_id_lookup(cls, v, values, **kwargs):
|
||||
if isinstance(v, int) and v > 0: return v
|
||||
elif id_random := values.get('account_id_random'):
|
||||
return redis_lookup_id_random(record_id_random=id_random, table_name='account')
|
||||
return None
|
||||
|
||||
@validator('account_id_random', always=True)
|
||||
def account_id_random_lookup(cls, v, values, **kwargs):
|
||||
if isinstance(v, str) and len(v) >= 11: return v
|
||||
elif account_id := values.get('account_id'):
|
||||
return get_id_random(record_id=account_id, table_name='account')
|
||||
return None
|
||||
|
||||
@validator('contact_id', always=True)
|
||||
def contact_id_lookup(cls, v, values, **kwargs):
|
||||
if isinstance(v, int) and v > 0: return v
|
||||
elif id_random := values.get('contact_id_random'):
|
||||
return redis_lookup_id_random(record_id_random=id_random, table_name='contact')
|
||||
return None
|
||||
|
||||
@validator('organization_id', always=True)
|
||||
def organization_id_lookup(cls, v, values, **kwargs):
|
||||
if isinstance(v, int) and v > 0: return v
|
||||
elif id_random := values.get('organization_id_random'):
|
||||
return redis_lookup_id_random(record_id_random=id_random, table_name='organization')
|
||||
return None
|
||||
|
||||
@validator('person_id', always=True)
|
||||
def person_id_lookup(cls, v, values, **kwargs):
|
||||
if isinstance(v, int) and v > 0: return v
|
||||
elif id_random := values.get('person_id_random'):
|
||||
return redis_lookup_id_random(record_id_random=id_random, table_name='person')
|
||||
return None
|
||||
|
||||
@validator('password', always=True)
|
||||
def hash_new_password(cls, v, values, **kwargs):
|
||||
log.setLevel(logging.WARNING)
|
||||
log.debug(locals())
|
||||
|
||||
if values.get('new_password'):
|
||||
return secure_hash_string(string=values['new_password'])
|
||||
return None
|
||||
|
||||
class Config:
|
||||
underscore_attrs_are_private = True
|
||||
allow_population_by_field_name = True
|
||||
fields = base_fields
|
||||
# ### END ### API User Models ### User_New_Base() ###
|
||||
|
||||
|
||||
# ### BEGIN ### API User Models ### User_Out_Base() ###
|
||||
class User_Out_Base(BaseModel):
|
||||
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
||||
log.debug(locals())
|
||||
|
||||
id_random: Optional[str] = Field(
|
||||
**base_fields['user_id_random'],
|
||||
alias = 'user_id_random',
|
||||
)
|
||||
id: Optional[int] = Field(
|
||||
alias = 'user_id'
|
||||
)
|
||||
|
||||
account_id_random: Optional[str]
|
||||
account_id: Optional[int]
|
||||
account_name: Optional[str]
|
||||
|
||||
contact_id_random: Optional[str]
|
||||
contact_id: Optional[int]
|
||||
|
||||
organization_id_random: Optional[str]
|
||||
organization_id: Optional[int]
|
||||
|
||||
person_id_random: Optional[str]
|
||||
person_id: Optional[int]
|
||||
|
||||
username: Optional[str]
|
||||
name: Optional[str]
|
||||
email: Optional[str]
|
||||
email_verified: Optional[bool]
|
||||
password: Optional[str]
|
||||
|
||||
allow_auth_key: Optional[int]
|
||||
auth_key: Optional[str]
|
||||
|
||||
enable: Optional[bool]
|
||||
enable_from: Optional[datetime.datetime]
|
||||
enable_to: Optional[datetime.datetime]
|
||||
|
||||
super: Optional[bool]
|
||||
manager: Optional[bool]
|
||||
administrator: Optional[bool]
|
||||
public: Optional[bool]
|
||||
verified: Optional[bool]
|
||||
status_id: Optional[int]
|
||||
status_name: Optional[str]
|
||||
|
||||
password_set_on: Optional[datetime.datetime]
|
||||
password_reset_token: Optional[str]
|
||||
password_reset_expire_on: Optional[datetime.datetime]
|
||||
logged_in_on: Optional[datetime.datetime]
|
||||
last_activity_on: Optional[datetime.datetime]
|
||||
|
||||
hide: Optional[bool]
|
||||
priority: Optional[bool]
|
||||
sort: Optional[int]
|
||||
group: Optional[str]
|
||||
|
||||
notes: Optional[str]
|
||||
created_on: Optional[datetime.datetime]
|
||||
updated_on: Optional[datetime.datetime]
|
||||
|
||||
# Including other related objects
|
||||
# from app.models.person_models import Person_Base # Causes circular import
|
||||
# archive_list: Optional[list] # Archive_Base()
|
||||
# contact: Optional[Contact_Base]
|
||||
event_list: Optional[list] # Event_Base() # Priority l1
|
||||
hosted_file_list: Optional[list] # Hosted_File_Base() # Priority l2
|
||||
journal_list: Optional[list] # Journal_Base() # Priority l3
|
||||
# membership_person: Optional[Membership_Person_Base] # Priority l2
|
||||
# membership_person_list: Optional[list] # Membership_Base() ???
|
||||
order_list: Optional[list] # Order_Base() # Priority l2
|
||||
order_cart_list: Optional[list] # Order_Base() # Priority l2
|
||||
organization: Optional[Union[Organization_Base, None]] # Organization_Base() # Priority l3
|
||||
person: Optional[dict] # Person_Base() # Priority l2
|
||||
# person: Optional[Union[Person_Base, None]]
|
||||
post_list: Optional[list] # Post_Base() # Priority l1
|
||||
user_role_list: Optional[list] = Field(
|
||||
alias = 'role_list'
|
||||
) # User_Role_Base()
|
||||
|
||||
_processed_at: datetime.datetime = PrivateAttr(default_factory=datetime.datetime.now)
|
||||
|
||||
@validator('account_id', always=True)
|
||||
def account_id_lookup(cls, v, values, **kwargs):
|
||||
if isinstance(v, int) and v > 0: return v
|
||||
elif id_random := values.get('account_id_random'):
|
||||
return redis_lookup_id_random(record_id_random=id_random, table_name='account')
|
||||
return None
|
||||
|
||||
@validator('account_id_random', always=True)
|
||||
def account_id_random_lookup(cls, v, values, **kwargs):
|
||||
if isinstance(v, str) and len(v) >= 11: return v
|
||||
elif account_id := values.get('account_id'):
|
||||
return get_id_random(record_id=account_id, table_name='account')
|
||||
return None
|
||||
|
||||
@validator('person_id', always=True)
|
||||
def person_id_lookup(cls, v, values, **kwargs):
|
||||
if isinstance(v, int) and v > 0: return v
|
||||
elif id_random := values.get('person_id_random'):
|
||||
return redis_lookup_id_random(record_id_random=id_random, table_name='person')
|
||||
return None
|
||||
|
||||
class Config:
|
||||
underscore_attrs_are_private = True
|
||||
allow_population_by_field_name = True
|
||||
fields = base_fields
|
||||
# ### END ### API User Models ### User_Out_Base() ###
|
||||
|
||||
Reference in New Issue
Block a user