Working on all the routes, methods, and models.

This commit is contained in:
Scott Idem
2021-06-18 14:09:07 -04:00
parent bb2f14b67c
commit 938aabb2a8
29 changed files with 941 additions and 335 deletions

View File

@@ -12,12 +12,15 @@ from app.methods.address_methods import get_address_rec_list, load_address_obj
from app.methods.archive_methods import get_archive_rec_list, load_archive_obj
from app.methods.contact_methods import get_contact_rec_list, load_contact_obj
from app.methods.event_methods import get_event_rec_list, load_event_obj
from app.methods.hosted_file_methods import get_hosted_file_rec_list, load_hosted_file_obj
from app.methods.journal_methods import get_journal_rec_list, load_journal_obj
from app.methods.order_methods import get_order_rec_list, load_order_obj
# from app.methods.order_cart_methods import get_order_cart_rec_list, load_order_cart_obj
from app.methods.organization_methods import get_organization_rec_list, load_organization_obj
from app.methods.person_methods import get_person_rec_list, load_person_obj
from app.methods.product_methods import get_product_rec_list, load_product_obj
from app.methods.post_methods import get_post_rec_list, load_post_obj
from app.methods.site_methods import get_site_rec_list, load_site_obj
from app.methods.user_methods import get_user_rec_list, load_user_obj
from app.models.account_models import Account_Base
@@ -30,6 +33,8 @@ from app.models.account_cfg_models import Account_Cfg_Base
def load_account_obj(
account_id: int|str,
limit: int = 1000,
by_alias: bool = True,
exclude_unset: bool = True,
model_as_dict: bool = False,
enabled: str = 'enabled', # enabled, disabled, all
inc_account_cfg: bool = False, # Priority l1
@@ -72,7 +77,9 @@ def load_account_obj(
inc_event_track_list: bool = False,
inc_fundraising_cfg: bool = False,
inc_hosted_file_list: bool = False,
inc_hosted_file_link_list: bool = False,
inc_journal_list: bool = False, # Priority l3
inc_journal_entry_list: bool = False, # Priority l3
# inc_membership: bool = False,
inc_membership_cfg: bool = False,
inc_membership_list: bool = False,
@@ -99,6 +106,7 @@ def load_account_obj(
inc_product_list: bool = False, # Priority l3
# inc_site: bool = False,
inc_site_list: bool = False, # Priority l3
inc_site_domain_list: bool = False, # Priority l3
inc_user: bool = False,
inc_user_list: bool = False, # Priority l2
inc_user_role_list: bool = False,
@@ -109,7 +117,7 @@ def load_account_obj(
if account_id := redis_lookup_id_random(record_id_random=account_id, table_name='account'): pass
else: return False
if account_rec := sql_select(table_name='account', record_id=account_id): pass
if account_rec := sql_select(table_name='v_account', record_id=account_id): pass
else: return False
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
@@ -236,7 +244,7 @@ def load_account_obj(
account_obj.event_list = event_dict_list
else: account_obj.event_list = []
# Updated 2021-06-17
# Updated 2021-06-18
if inc_hosted_file_list:
if hosted_file_rec_list_result := get_hosted_file_rec_list(
for_obj_type = 'account',
@@ -252,12 +260,38 @@ def load_account_obj(
limit = limit,
model_as_dict = model_as_dict,
enabled = enabled,
inc_hosted_file_link_list = inc_hosted_file_line_list,
inc_hosted_file_link_list = inc_hosted_file_link_list,
)
)
account_obj.hosted_file_list = hosted_file_dict_list
else: account_obj.hosted_file_list = []
# Updated 2021-06-18
if inc_journal_list:
if journal_rec_list_result := get_journal_rec_list(
for_obj_type = 'account',
for_obj_id = account_id,
limit = limit,
enabled = enabled,
):
journal_dict_list = []
for journal_rec in journal_rec_list_result:
journal_dict_list.append(
load_journal_obj(
journal_id = journal_rec.get('journal_id', None),
limit = limit,
model_as_dict = model_as_dict,
enabled = enabled,
inc_journal_entry_list = inc_journal_entry_list,
)
)
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(journal_dict_list)
account_obj.journal_list = journal_dict_list
else: account_obj.journal_list = []
# Updated 2021-06-17
if inc_order_list:
if order_rec_list_result := get_order_rec_list(
@@ -376,6 +410,28 @@ def load_account_obj(
account_obj.product_list = product_dict_list
else: account_obj.product_list = []
# Updated 2021-06-17
if inc_site_list:
if site_rec_list_result := get_site_rec_list(
for_obj_type = 'account',
for_obj_id = account_id,
limit = limit,
enabled = enabled,
):
site_result_list = []
for site_rec in site_rec_list_result:
site_result_list.append(
load_site_obj(
site_id = site_rec.get('site_id', None),
by_alias = by_alias,
exclude_unset = exclude_unset,
model_as_dict = model_as_dict,
inc_site_domain_list = inc_site_domain_list,
)
)
account_obj.site_list = site_result_list
else: account_obj.site_list = []
# Updated 2021-06-17
if inc_user_list:
if user_rec_list_result := get_user_rec_list(
@@ -410,7 +466,7 @@ def load_account_obj(
else: account_obj.user_list = []
if model_as_dict:
return account_obj.dict(by_alias=True, exclude_unset=True) # pylint: disable=no-member
return account_obj.dict(by_alias=by_alias, exclude_unset=exclude_unset) # pylint: disable=no-member
else:
return account_obj
# ### END ### API Account Methods ### load_account_obj() ###

View File

@@ -37,6 +37,8 @@ def create_address_obj(address_obj_new:Address_Base):
def load_address_obj(
address_id:int|str,
limit: int = 1000, # Probably not needed for the address
by_alias: bool = True,
exclude_unset: bool = True,
model_as_dict: bool = False,
enabled: str = 'enabled', # enabled, disabled, all # Probably not needed for the address
) -> Address_Base|bool:

View File

@@ -14,6 +14,8 @@ from app.models.archive_content_models import Archive_Content_Base
def load_archive_content_obj(
archive_content_id: int|str,
# limit: int = 1000,
by_alias: bool = True,
exclude_unset: bool = True,
model_as_dict: bool = False,
# enabled: str = 'enabled', # enabled, disabled, all
# inc_archive_content_content_list: bool = False,
@@ -37,28 +39,28 @@ def load_archive_content_obj(
log.error(e.json())
if model_as_dict:
return archive_content_obj.dict(by_alias=True, exclude_unset=True) # pylint: disable=no-member
return archive_content_obj.dict(by_alias=by_alias, exclude_unset=exclude_unset) # pylint: disable=no-member
else:
return archive_content_obj
# ### END ### API Archive Content Methods ### load_archive_content_obj() ###
# ### BEGIN ### API Archive Content Methods ### load_archive_content_obj_list() ###
def load_archive_content_obj_list(
archive_id: int|str,
# ### BEGIN ### API Archive Content Methods ### get_archive_content_rec_list() ###
def get_archive_content_rec_list(
for_obj_type: str,
for_obj_id: str,
limit: int = 1000,
model_as_dict: bool = False,
enabled: str = 'enabled', # enabled, disabled, all
# inc_archive_content_content_list: bool = False,
) -> list|bool:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if archive_id := redis_lookup_id_random(record_id_random=archive_id, table_name='archive'): pass
if for_obj_id := redis_lookup_id_random(record_id_random=for_obj_id, table_name='for_obj_type'): pass
else: return False
data = {}
data['archive_id'] = archive_id
data[f'{for_obj_type}_id'] = for_obj_id
# data['for_obj_type'] = for_obj_type
sql_obj_type_id = f'`tbl`.{for_obj_type}_id = :{for_obj_type}_id'
if enabled in ['enabled', 'disabled', 'all']:
if enabled == 'enabled':
@@ -69,7 +71,6 @@ def load_archive_content_obj_list(
sql_enabled = f'AND `tbl`.enable = :enable'
elif enabled == 'all':
sql_enabled = ''
# else: tbl_obj['archive'] = None
if limit:
data['limit'] = limit
@@ -80,35 +81,19 @@ def load_archive_content_obj_list(
sql = f"""
SELECT `tbl`.id AS 'archive_content_id', `tbl`.id_random AS 'archive_content_id_random'
FROM `archive_content` AS `tbl`
WHERE `tbl`.archive_id = :archive_id
{sql_enabled}
WHERE
{sql_obj_type_id}
{sql_enabled}
ORDER BY `tbl`.created_on DESC, `tbl`.updated_on DESC
{sql_limit};
"""
if archive_content_rec_li_result := sql_select(data=data, sql=sql, as_list=True):
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(archive_content_rec_li_result)
archive_content_result_li = []
for archive_content_rec in archive_content_rec_li_result:
archive_content_id = archive_content_rec.get('archive_content_id', None)
if archive_content_result := load_archive_content_obj(
archive_content_id = archive_content_id,
model_as_dict = model_as_dict,
# enabled=enabled,
# inc_archive_content_content_list=inc_archive_content_content_list,
):
log.debug(archive_content_result)
archive_content_result_li.append(archive_content_result)
else:
log.debug(archive_content_result)
archive_content_result_li.append(None)
log.debug(archive_content_result_li)
archive_content_rec_li = archive_content_rec_li_result
else:
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(archive_content_rec_li_result)
archive_content_result_li = []
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
archive_content_rec_li = []
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(archive_content_rec_li_result)
return archive_content_result_li
# ### END ### API Archive Content Methods ### load_archive_content_obj_list() ###
return archive_content_rec_li
# ### END ### API Archive Content Methods ### get_archive_content_rec_list() ###

View File

@@ -7,7 +7,7 @@ from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, v
from app.db_sql import redis_lookup_id_random, sql_select
from app.lib_general import log, logging
from app.methods.archive_content_methods import load_archive_content_obj, load_archive_content_obj_list
from app.methods.archive_content_methods import get_archive_content_rec_list, load_archive_content_obj
from app.models.archive_models import Archive_Base
@@ -16,6 +16,8 @@ from app.models.archive_models import Archive_Base
def load_archive_obj(
archive_id: int|str,
limit: int = 1000,
by_alias: bool = True,
exclude_unset: bool = True,
model_as_dict: bool = False,
enabled: str = 'enabled', # enabled, disabled, all
inc_archive_content_list: bool = False,
@@ -38,95 +40,34 @@ def load_archive_obj(
except ValidationError as e:
log.error(e.json())
# Updated 2021-06-18
if inc_archive_content_list:
if archive_content_dict_list := load_archive_content_obj_list(
archive_id = archive_id,
limit = limit,
model_as_dict = model_as_dict,
enabled = enabled,
# inc_archive_content_content_list=inc_archive_content_content_list
):
archive_obj.archive_content_list = archive_content_dict_list
else: archive_obj.archive_content_list = None
if archive_content_rec_list_result := get_archive_content_rec_list(
for_obj_type = 'archive',
for_obj_id = archive_id,
limit = limit,
enabled = enabled,
):
archive_content_result_list = []
for archive_content_rec in archive_content_rec_list_result:
archive_content_result_list.append(
load_archive_content_obj(
archive_content_id = archive_content_rec.get('archive_content_id', None),
by_alias = by_alias,
exclude_unset = exclude_unset,
model_as_dict = model_as_dict,
)
)
archive_obj.archive_content_list = archive_content_result_list
else: archive_obj.archive_content_list = []
if model_as_dict:
return archive_obj.dict(by_alias=True, exclude_unset=True) # pylint: disable=no-member
return archive_obj.dict(by_alias=by_alias, exclude_unset=exclude_unset) # pylint: disable=no-member
else:
return archive_obj
# ### END ### API Archive Methods ### load_archive_obj() ###
# ### BEGIN ### API Archive Methods ### load_archive_obj_list() ###
def load_archive_obj_list(
account_id: int|str,
limit: int = 1000,
model_as_dict: bool = False,
enabled: str = 'enabled', # enabled, disabled, all
inc_archive_content_list: bool = False,
) -> list|bool:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if account_id := redis_lookup_id_random(record_id_random=account_id, table_name='account'): pass
else: return False
data = {}
data['account_id'] = account_id
if enabled in ['enabled', 'disabled', 'all']:
if enabled == 'enabled':
data['enable'] = True
sql_enabled = f'AND `tbl`.enable = :enable'
elif enabled == 'disabled':
data['enable'] = False
sql_enabled = f'AND `tbl`.enable = :enable'
elif enabled == 'all':
sql_enabled = ''
# else: tbl_obj['account'] = None
if limit:
data['limit'] = limit
sql_limit = f'LIMIT :limit'
else:
sql_limit = ''
sql = f"""
SELECT `tbl`.id AS 'archive_id', `tbl`.id_random AS 'archive_id_random'
FROM `archive` AS `tbl`
WHERE `tbl`.account_id = :account_id
{sql_enabled}
ORDER BY `tbl`.created_on DESC, `tbl`.updated_on DESC
{sql_limit};
"""
if archive_rec_li_result := sql_select(data=data, sql=sql, as_list=True):
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(archive_rec_li_result)
archive_result_li = []
for archive_rec in archive_rec_li_result:
archive_id = archive_rec.get('archive_id', None)
if archive_result := load_archive_obj(
archive_id = archive_id,
model_as_dict = model_as_dict,
enabled = enabled,
inc_archive_content_list = inc_archive_content_list,
):
log.debug(archive_result)
archive_result_li.append(archive_result)
else:
log.debug(archive_result)
archive_result_li.append(None)
log.debug(archive_result_li)
else:
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(archive_rec_li_result)
archive_result_li = []
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
return archive_result_li
# ### END ### API Archive Methods ### load_archive_obj_list() ###
# ### BEGIN ### API Archive Methods ### get_archive_rec_list() ###
def get_archive_rec_list(
for_obj_type: str,

View File

@@ -58,6 +58,8 @@ def create_contact_obj(contact_obj_new:Contact_Base):
def load_contact_obj(
contact_id:int|str,
limit: int = 1000,
by_alias: bool = True,
exclude_unset: bool = True,
model_as_dict: bool = False,
enabled: str = 'enabled', # enabled, disabled, all
inc_address:bool=False
@@ -83,15 +85,17 @@ def load_contact_obj(
address_id = contact_rec.get('address_id', None)
log.debug(address_id)
from app.methods.address_methods import load_address_obj
if address_dict := load_address_obj(
if address_result := load_address_obj(
address_id = address_id,
by_alias = by_alias,
exclude_unset = exclude_unset,
model_as_dict = model_as_dict,
):
contact_obj.address = address_dict
contact_obj.address = address_result
else: contact_obj.address = None
if model_as_dict:
return contact_obj.dict(by_alias=True, exclude_unset=True) # pylint: disable=no-member
return contact_obj.dict(by_alias=by_alias, exclude_unset=exclude_unset) # pylint: disable=no-member
else:
return contact_obj
# ### END ### API Contact Methods ### load_contact_obj() ###

View File

@@ -23,6 +23,8 @@ from app.models.event_session_models import Event_Session_Base
def load_event_session_obj(
event_session_id: int|str,
limit: int = 1000,
by_alias: bool = True,
exclude_unset: bool = True,
model_as_dict: bool = False,
enabled: str = 'enabled', # enabled, disabled, all
inc_address: bool = False,

View File

@@ -36,6 +36,8 @@ def create_hosted_file_obj(hosted_file_obj_new:Hosted_File_Base):
def load_hosted_file_obj(
hosted_file_id: int|str,
limit: int = 1000,
by_alias: bool = True,
exclude_unset: bool = True,
model_as_dict: bool = False,
enabled: str = 'enabled', # enabled, disabled, all
inc_hosted_file_link_list: bool = False,
@@ -68,7 +70,7 @@ def load_hosted_file_obj(
# else: hosted_file_obj.x = None
if model_as_dict:
return hosted_file_obj.dict(by_alias=True, exclude_unset=True) # pylint: disable=no-member
return hosted_file_obj.dict(by_alias=by_alias, exclude_unset=exclude_unset) # pylint: disable=no-member
else:
return hosted_file_obj
# ### END ### API Hosted File Methods ### load_hosted_file_obj() ###

View File

@@ -0,0 +1,157 @@
from __future__ import annotations
import datetime
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator
from app.db_sql import redis_lookup_id_random, sql_insert, sql_select, sql_update
from app.lib_general import log, logging
# from app.methods.journal_methods import load_journal_entry_obj
from app.models.journal_entry_models import Journal_Entry_Base
# ### BEGIN ### API Journal Entry Methods ### create_journal_entry_obj() ###
def create_journal_entry_obj(journal_entry_obj_new:Journal_Entry_Base) -> bool|int:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
journal_entry_obj_data = journal_entry_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'created_on', 'updated_on'})
if journal_entry_obj_in_result := sql_insert(
data=journal_entry_obj_data,
table_name='journal_entry',
rm_id_random=True,
id_random_length=8
): pass
else: return False
log.debug(journal_entry_obj_in_result)
journal_entry_id = journal_entry_obj_in_result
log.debug(f'New journal_entry_id: {journal_entry_id}')
return journal_entry_id
# ### END ### API Journal Entry Methods ### create_journal_entry_obj() ###
# ### BEGIN ### API Journal Entry Methods ### load_journal_entry_obj() ###
def load_journal_entry_obj(
journal_entry_id: int|str,
limit: int = 1000,
by_alias: bool = True,
exclude_unset: bool = True,
model_as_dict: bool = False,
enabled: str = 'enabled', # enabled, disabled, all
inc_journal_entry_list: bool = False,
inc_person: bool = False,
inc_user: bool = False,
) -> Journal_Entry_Base|bool:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if journal_entry_id := redis_lookup_id_random(record_id_random=journal_entry_id, table_name='journal_entry'): pass
else: return False
log.debug(journal_entry_id)
if journal_entry_rec := sql_select(table_name='v_journal_entry', record_id=journal_entry_id):
log.debug(journal_entry_rec)
else: return False
log.debug(journal_entry_rec)
try:
journal_entry_obj = Journal_Entry_Base(**journal_entry_rec)
log.debug(journal_entry_obj)
except ValidationError as e:
log.error(e.json())
if model_as_dict:
return journal_entry_obj.dict(by_alias=by_alias, exclude_unset=exclude_unset) # pylint: disable=no-member
else:
return journal_entry_obj
# ### END ### API Journal Entry Methods ### load_journal_entry_obj() ###
# ### BEGIN ### API Journal Entry Methods ### update_journal_entry_obj() ###
def update_journal_entry_obj(
journal_entry_id: int|str, # This allows for updating of the id_random value.
journal_entry_obj_up: Journal_Entry_Base,
create_missing_obj: bool = False,
) -> bool:
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if journal_entry_id := redis_lookup_id_random(record_id_random=journal_entry_id, table_name='journal_entry'): pass
else: return False
journal_entry_obj_up.id = journal_entry_id
log.debug(journal_entry_obj_up)
log.debug(journal_entry_obj_up.dict(by_alias=False, exclude_unset=True))
# log.debug(journal_entry_obj_up.dict(by_alias=False, exclude_unset=False))
journal_dict_up = journal_entry_obj_up.dict(by_alias=False, exclude_unset=True)
log.debug(journal_dict_up)
if journal_entry_obj_up_result := sql_update(data=journal_dict_up, table_name='journal_entry', rm_id_random=True):
log.debug(journal_entry_obj_up_result)
return True
else:
log.debug(journal_entry_obj_up_result)
return False
# ### END ### API Journal Entry Methods ### update_journal_entry_obj() ###
# ### BEGIN ### API Journal Entry Methods ### get_journal_entry_rec_list() ###
def get_journal_entry_rec_list(
for_obj_type: str,
for_obj_id: str,
limit: int = 1000,
enabled: str = 'enabled', # enabled, disabled, all
) -> list|bool:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if for_obj_id := redis_lookup_id_random(record_id_random=for_obj_id, table_name='for_obj_type'): pass
else: return False
data = {}
data[f'{for_obj_type}_id'] = for_obj_id
# data['for_obj_type'] = for_obj_type
sql_obj_type_id = f'`tbl`.{for_obj_type}_id = :{for_obj_type}_id'
if enabled in ['enabled', 'disabled', 'all']:
if enabled == 'enabled':
data['enable'] = True
sql_enabled = f'AND `tbl`.enable = :enable'
elif enabled == 'disabled':
data['enable'] = False
sql_enabled = f'AND `tbl`.enable = :enable'
elif enabled == 'all':
sql_enabled = ''
if limit:
data['limit'] = limit
sql_limit = f'LIMIT :limit'
else:
sql_limit = ''
sql = f"""
SELECT `tbl`.id AS 'journal_entry_id', `tbl`.id_random AS 'journal_entry_id_random'
FROM `journal_entry` AS `tbl`
WHERE
{sql_obj_type_id}
{sql_enabled}
ORDER BY `tbl`.created_on DESC, `tbl`.updated_on DESC
{sql_limit};
"""
if journal_entry_rec_li_result := sql_select(data=data, sql=sql, as_list=True):
journal_entry_rec_li = journal_entry_rec_li_result
else:
journal_entry_rec_li = []
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(journal_entry_rec_li_result)
return journal_entry_rec_li
# ### END ### API Journal Entry Methods ### get_journal_entry_rec_list() ###

View File

@@ -0,0 +1,187 @@
from __future__ import annotations
import datetime
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator
from app.db_sql import redis_lookup_id_random, sql_insert, sql_select, sql_update
from app.lib_general import log, logging
# from app.methods.person_methods import load_person_obj
from app.methods.journal_entry_methods import get_journal_entry_rec_list, load_journal_entry_obj
# from app.methods.user_methods import load_user_obj
from app.models.journal_models import Journal_Base
# ### BEGIN ### API Journal Methods ### create_journal_obj() ###
def create_journal_obj(journal_obj_new:Journal_Base):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if not journal_obj_new:
return False
journal_obj_data = journal_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'user', 'created_on', 'updated_on'})
if journal_obj_in_result := sql_insert(
data=journal_obj_data,
table_name='journal',
rm_id_random=True,
id_random_length=8
): pass
else: return False
log.debug(journal_obj_in_result)
journal_id = journal_obj_in_result
log.debug(f'New journal_id: {journal_entry_id}')
return journal_id
# ### END ### API Journal Methods ### create_journal_obj() ###
# ### BEGIN ### API Journal Methods ### load_journal_obj() ###
def load_journal_obj(
journal_id: int|str,
limit: int = 1000,
by_alias: bool = True,
exclude_unset: bool = True,
model_as_dict: bool = False,
enabled: str = 'enabled', # enabled, disabled, all
inc_journal_entry_list: bool = False,
inc_person: bool = False,
inc_user: bool = False,
) -> Journal_Base|bool:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if journal_id := redis_lookup_id_random(record_id_random=journal_id, table_name='journal'): pass
else: return False
if journal_rec := sql_select(table_name='v_journal', record_id=journal_id):
log.debug(journal_rec)
else: return False
log.debug(journal_rec)
try:
journal_obj = Journal_Base(**journal_rec)
log.debug(journal_obj)
except ValidationError as e:
log.error(e.json())
# Updated 2021-06-18
if inc_journal_entry_list:
if journal_entry_rec_list_result := get_journal_entry_rec_list(
for_obj_type = 'journal',
for_obj_id = journal_id,
limit = limit,
enabled = enabled,
):
journal_entry_result_list = []
for journal_entry_rec in journal_entry_rec_list_result:
journal_entry_result_list.append(
load_journal_entry_obj(
journal_entry_id = journal_entry_rec.get('journal_entry_id', None),
limit = limit,
by_alias = by_alias,
exclude_unset = exclude_unset,
model_as_dict = model_as_dict,
enabled = enabled,
inc_person = inc_person,
inc_user = inc_user,
)
)
journal_obj.journal_entry_list = journal_entry_result_list
else: journal_obj.journal_entry_list = []
if model_as_dict:
return journal_obj.dict(by_alias=by_alias, exclude_unset=exclude_unset) # pylint: disable=no-member
else:
return journal_obj
# ### END ### API Journal Methods ### load_journal_obj() ###
# ### BEGIN ### API Journal Methods ### update_journal_obj() ###
def update_journal_obj(
journal_id: int|str, # This allows for updating of the id_random value.
journal_obj_up: Journal_Base,
create_missing_obj: bool = False,
) -> bool:
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if journal_id := redis_lookup_id_random(record_id_random=journal_id, table_name='journal'): pass
else: return False
journal_obj_up.id = journal_id
log.debug(journal_obj_up)
log.debug(journal_obj_up.dict(by_alias=False, exclude_unset=True))
# log.debug(journal_obj_up.dict(by_alias=False, exclude_unset=False))
journal_dict_up = journal_obj_up.dict(by_alias=False, exclude_unset=True, exclude={'user'})
log.debug(journal_dict_up)
if journal_obj_up_result := sql_update(data=journal_dict_up, table_name='journal', rm_id_random=True):
log.debug(journal_obj_up_result)
return True
else:
log.debug(journal_obj_up_result)
return False
# ### END ### API Journal Methods ### update_journal_obj() ###
# ### BEGIN ### API Journal Methods ### get_journal_rec_list() ###
def get_journal_rec_list(
for_obj_type: str,
for_obj_id: str,
limit: int = 1000,
enabled: str = 'enabled', # enabled, disabled, all
) -> list|bool:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if for_obj_id := redis_lookup_id_random(record_id_random=for_obj_id, table_name='for_obj_type'): pass
else: return False
data = {}
data[f'{for_obj_type}_id'] = for_obj_id
# data['for_obj_type'] = for_obj_type
sql_obj_type_id = f'`tbl`.{for_obj_type}_id = :{for_obj_type}_id'
if enabled in ['enabled', 'disabled', 'all']:
if enabled == 'enabled':
data['enable'] = True
sql_enabled = f'AND `tbl`.enable = :enable'
elif enabled == 'disabled':
data['enable'] = False
sql_enabled = f'AND `tbl`.enable = :enable'
elif enabled == 'all':
sql_enabled = ''
if limit:
data['limit'] = limit
sql_limit = f'LIMIT :limit'
else:
sql_limit = ''
sql = f"""
SELECT `tbl`.id AS 'journal_id', `tbl`.id_random AS 'journal_id_random'
FROM `journal` AS `tbl`
WHERE
{sql_obj_type_id}
{sql_enabled}
ORDER BY `tbl`.created_on DESC, `tbl`.updated_on DESC
{sql_limit};
"""
if journal_rec_li_result := sql_select(data=data, sql=sql, as_list=True):
journal_rec_li = journal_rec_li_result
else:
journal_rec_li = []
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(journal_rec_li_result)
return journal_rec_li
# ### END ### API Journal Methods ### get_journal_rec_list() ###

View File

@@ -8,9 +8,7 @@ from app.db_sql import redis_lookup_id_random, sql_insert, sql_select, sql_updat
from app.lib_general import log, logging
from app.methods.person_methods import load_person_obj
# from app.methods.post_comment_list_methods import load_post_comment_obj_list
# from app.methods.user_methods import load_user_obj
# from app.methods.user_load_methods import load_user_obj
from app.models.post_comment_models import Post_Comment_Base
@@ -42,6 +40,8 @@ def create_post_comment_obj(post_comment_obj_new:Post_Comment_Base):
def load_post_comment_obj(
post_comment_id: int|str,
limit: int = 1000,
by_alias: bool = True,
exclude_unset: bool = True,
model_as_dict: bool = False,
enabled: str = 'enabled', # enabled, disabled, all
inc_person: bool = False,
@@ -87,85 +87,12 @@ def load_post_comment_obj(
else: post_comment_obj.user = None
if model_as_dict:
return post_comment_obj.dict(by_alias=True, exclude_unset=True) # pylint: disable=no-member
return post_comment_obj.dict(by_alias=by_alias, exclude_unset=exclude_unset) # pylint: disable=no-member
else:
return post_comment_obj
# ### END ### API Post Comment Methods ### load_post_comment_obj() ###
# ### BEGIN ### API Post Comment Methods ### load_post_comment_obj_list() ###
def load_post_comment_obj_list(
post_id: int|str,
limit: int = 1000,
model_as_dict: bool = False,
enabled: str = 'enabled', # enabled, disabled, all
inc_person: bool = False,
inc_user: bool = False,
) -> list|bool:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if post_id := redis_lookup_id_random(record_id_random=post_id, table_name='post'): pass
else: return False
data = {}
data['post_id'] = post_id
if enabled in ['enabled', 'disabled', 'all']:
if enabled == 'enabled':
data['enable'] = True
sql_enabled = f'AND `tbl`.enable = :enable'
elif enabled == 'disabled':
data['enable'] = False
sql_enabled = f'AND `tbl`.enable = :enable'
elif enabled == 'all':
sql_enabled = ''
# else: tbl_obj['post'] = None
if limit:
data['limit'] = limit
sql_limit = f'LIMIT :limit'
else:
sql_limit = ''
sql = f"""
SELECT `tbl`.id AS 'post_comment_id', `tbl`.id_random AS 'post_comment_id_random'
FROM `post_comment` AS `tbl`
WHERE `tbl`.post_id = :post_id
{sql_enabled}
ORDER BY `tbl`.created_on DESC, `tbl`.updated_on DESC
{sql_limit};
"""
if post_comment_rec_li_result := sql_select(data=data, sql=sql, as_list=True):
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(post_comment_rec_li_result)
post_comment_result_li = []
for post_comment_rec in post_comment_rec_li_result:
post_comment_id = post_comment_rec.get('post_comment_id', None)
if post_comment_result := load_post_comment_obj(
post_comment_id = post_comment_id,
model_as_dict = model_as_dict,
# enabled = enabled,
inc_person = inc_person,
inc_user = inc_user,
):
log.debug(post_comment_result)
post_comment_result_li.append(post_comment_result)
else:
log.debug(post_comment_result)
post_comment_result_li.append(None)
log.debug(post_comment_result_li)
else:
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(post_comment_rec_li_result)
post_comment_result_li = []
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
return post_comment_result_li
# ### END ### API Post Comment Methods ### load_post_comment_obj_list() ###
# ### BEGIN ### API Post Comment Methods ### update_post_comment_obj() ###
def update_post_comment_obj(
post_comment_id: int|str, # Ideally the int ID should be passed. This allows for updating of the id_random value.

View File

@@ -42,6 +42,8 @@ def create_post_obj(post_obj_new:Post_Base):
def load_post_obj(
post_id: int|str,
limit: int = 1000,
by_alias: bool = True,
exclude_unset: bool = True,
model_as_dict: bool = False,
enabled: str = 'enabled', # enabled, disabled, all
inc_post_comment_list: bool = False,
@@ -86,33 +88,23 @@ def load_post_obj(
limit = limit,
enabled = enabled,
):
post_comment_dict_list = []
post_comment_result_list = []
for post_comment_rec in post_comment_rec_list_result:
post_comment_dict_list.append(
post_comment_result_list.append(
load_post_comment_obj(
post_comment_id = post_comment_rec.get('post_comment_id', None),
limit = limit,
by_alias = by_alias,
exclude_unset = exclude_unset,
model_as_dict = model_as_dict,
enabled = enabled,
inc_person = inc_person,
inc_user = inc_user,
)
)
post_obj.post_comment_list = post_comment_dict_list
post_obj.post_comment_list = post_comment_result_list
else: post_obj.post_comment_list = []
# if inc_post_comment_list:
# if post_comment_dict_list := load_post_comment_obj_list(
# post_id = post_id,
# limit = limit,
# model_as_dict = model_as_dict,
# enabled = enabled,
# inc_person = inc_person,
# inc_user = inc_user,
# ):
# post_obj.post_comment_list = post_comment_dict_list
# else: post_obj.post_comment_list = None
if inc_user:
user_id = post_rec.get('user_id', None)
if user_obj_result := load_user_obj(user_id=user_id):
@@ -135,6 +127,8 @@ def load_post_obj_list(
account_id: int|str|None = None,
user_id: int|str|None = None,
limit: int = 1000,
by_alias: bool = True,
exclude_unset: bool = True,
model_as_dict: bool = False,
enabled: str = 'enabled', # enabled, disabled, all
inc_post_comment_list: bool = False,

View File

@@ -0,0 +1,157 @@
from __future__ import annotations
import datetime
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator
from app.db_sql import redis_lookup_id_random, sql_insert, sql_select, sql_update
from app.lib_general import log, logging
from app.models.site_domain_models import Site_Domain_Base
# ### BEGIN ### API Site Domain Methods ### create_site_domain_obj() ###
def create_site_domain_obj(site_domain_obj_new:Site_Domain_Base):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if not site_domain_obj_new:
return False
site_domain_obj_data = site_domain_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'user', 'created_on', 'updated_on'})
if site_domain_obj_in_result := sql_insert(
data=site_domain_obj_data,
table_name='site_domain',
rm_id_random=True,
id_random_length=8
): pass
else: return False
log.debug(site_domain_obj_in_result)
site_domain_id = site_domain_obj_in_result
log.debug(f'New site_domain_id: {site_domain_id}')
return site_domain_id
# ### END ### API Site Domain Methods ### create_site_domain_obj() ###
# ### BEGIN ### API Site Domain Methods ### load_site_domain_obj() ###
def load_site_domain_obj(
site_domain_id: int|str,
limit: int = 1000,
by_alias: bool = True,
exclude_unset: bool = True,
model_as_dict: bool = False,
enabled: str = 'enabled', # enabled, disabled, all
inc_site_domain_list: bool = False,
) -> Site_Domain_Base|bool:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if site_domain_id := redis_lookup_id_random(record_id_random=site_domain_id, table_name='site_domain'): pass
else: return False
if site_domain_rec := sql_select(table_name='v_site_domain', record_id=site_domain_id):
log.debug(site_domain_rec)
else: return False
log.debug(site_domain_rec)
try:
site_domain_obj = Site_Domain_Base(**site_domain_rec)
log.debug(site_domain_obj)
except ValidationError as e:
log.error(e.json())
if model_as_dict:
return site_domain_obj.dict(by_alias=by_alias, exclude_unset=exclude_unset) # pylint: disable=no-member
else:
return site_domain_obj
# ### END ### API Site Domain Methods ### load_site_domain_obj() ###
# ### BEGIN ### API Site Domain Methods ### update_site_domain_obj() ###
def update_site_domain_obj(
site_domain_id: int|str, # This allows for updating of the id_random value.
site_domain_obj_up: Site_Domain_Base,
create_missing_obj: bool = False,
) -> bool:
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if site_domain_id := redis_lookup_id_random(record_id_random=site_domain_id, table_name='site_domain'): pass
else: return False
site_domain_obj_up.id = site_domain_id
log.debug(site_domain_obj_up)
log.debug(site_domain_obj_up.dict(by_alias=False, exclude_unset=True))
# log.debug(site_domain_obj_up.dict(by_alias=False, exclude_unset=False))
site_domain_dict_up = site_domain_obj_up.dict(by_alias=False, exclude_unset=True, exclude={'user'})
log.debug(site_domain_dict_up)
if site_domain_obj_up_result := sql_update(data=site_domain_dict_up, table_name='site_domain', rm_id_random=True):
log.debug(site_domain_obj_up_result)
return True
else:
log.debug(site_domain_obj_up_result)
return False
# ### END ### API Site Domain Methods ### update_site_domain_obj() ###
# ### BEGIN ### API Site Domain Methods ### get_site_domain_rec_list() ###
def get_site_domain_rec_list(
for_obj_type: str,
for_obj_id: str,
limit: int = 1000,
enabled: str = 'enabled', # enabled, disabled, all
) -> list|bool:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if for_obj_id := redis_lookup_id_random(record_id_random=for_obj_id, table_name='for_obj_type'): pass
else: return False
data = {}
data[f'{for_obj_type}_id'] = for_obj_id
# data['for_obj_type'] = for_obj_type
sql_obj_type_id = f'`tbl`.{for_obj_type}_id = :{for_obj_type}_id'
# if enabled in ['enabled', 'disabled', 'all']:
# if enabled == 'enabled':
# data['enable'] = True
# sql_enabled = f'AND `tbl`.enable = :enable'
# elif enabled == 'disabled':
# data['enable'] = False
# sql_enabled = f'AND `tbl`.enable = :enable'
# elif enabled == 'all':
# sql_enabled = ''
sql_enabled = ''
if limit:
data['limit'] = limit
sql_limit = f'LIMIT :limit'
else:
sql_limit = ''
sql = f"""
SELECT `tbl`.id AS 'site_domain_id', `tbl`.id_random AS 'site_domain_id_random'
FROM `site_domain` AS `tbl`
WHERE
{sql_obj_type_id}
{sql_enabled}
ORDER BY `tbl`.created_on DESC, `tbl`.updated_on DESC
{sql_limit};
"""
if site_domain_rec_li_result := sql_select(data=data, sql=sql, as_list=True):
site_domain_rec_li = site_domain_rec_li_result
else:
site_domain_rec_li = []
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(site_domain_rec_li_result)
return site_domain_rec_li
# ### END ### API Site Domain Methods ### get_site_domain_rec_list() ###

181
app/methods/site_methods.py Normal file
View File

@@ -0,0 +1,181 @@
from __future__ import annotations
import datetime
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator
from app.db_sql import redis_lookup_id_random, sql_insert, sql_select, sql_update
from app.lib_general import log, logging
# from app.methods.person_methods import load_person_obj
from app.methods.site_domain_methods import get_site_domain_rec_list, load_site_domain_obj
# from app.methods.user_methods import load_user_obj
from app.models.site_models import Site_Base
# ### BEGIN ### API Site Methods ### create_site_obj() ###
def create_site_obj(site_obj_new:Site_Base):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if not site_obj_new:
return False
site_obj_data = site_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'user', 'created_on', 'updated_on'})
if site_obj_in_result := sql_insert(
data=site_obj_data,
table_name='site',
rm_id_random=True,
id_random_length=8
): pass
else: return False
log.debug(site_obj_in_result)
site_id = site_obj_in_result
log.debug(f'New site_id: {site_id}')
return site_id
# ### END ### API Site Methods ### create_site_obj() ###
# ### BEGIN ### API Site Methods ### load_site_obj() ###
def load_site_obj(
site_id: int|str,
limit: int = 1000,
by_alias: bool = True,
exclude_unset: bool = True,
model_as_dict: bool = False,
enabled: str = 'enabled', # enabled, disabled, all
inc_site_domain_list: bool = False,
) -> Site_Base|bool:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if site_id := redis_lookup_id_random(record_id_random=site_id, table_name='site'): pass
else: return False
if site_rec := sql_select(table_name='v_site', record_id=site_id):
log.debug(site_rec)
else: return False
log.debug(site_rec)
try:
site_obj = Site_Base(**site_rec)
log.debug(site_obj)
except ValidationError as e:
log.error(e.json())
# Updated 2021-06-18
if inc_site_domain_list:
if site_domain_rec_list_result := get_site_domain_rec_list(
for_obj_type = 'site',
for_obj_id = site_id,
limit = limit,
enabled = enabled,
):
site_domain_result_list = []
for site_domain_rec in site_domain_rec_list_result:
site_domain_result_list.append(
load_site_domain_obj(
site_domain_id = site_domain_rec.get('site_domain_id', None),
by_alias = by_alias,
exclude_unset = exclude_unset,
model_as_dict = model_as_dict,
)
)
site_obj.site_domain_list = site_domain_result_list
else: site_obj.site_domain_list = []
if model_as_dict:
return site_obj.dict(by_alias=by_alias, exclude_unset=exclude_unset) # pylint: disable=no-member
else:
return site_obj
# ### END ### API Site Methods ### load_site_obj() ###
# ### BEGIN ### API Site Methods ### update_site_obj() ###
def update_site_obj(
site_id: int|str, # This allows for updating of the id_random value.
site_obj_up: Site_Base,
create_missing_obj: bool = False,
) -> bool:
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if site_id := redis_lookup_id_random(record_id_random=site_id, table_name='site'): pass
else: return False
site_obj_up.id = site_id
log.debug(site_obj_up)
log.debug(site_obj_up.dict(by_alias=False, exclude_unset=True))
# log.debug(site_obj_up.dict(by_alias=False, exclude_unset=False))
site_dict_up = site_obj_up.dict(by_alias=False, exclude_unset=True, exclude={'user'})
log.debug(site_dict_up)
if site_obj_up_result := sql_update(data=site_dict_up, table_name='site', rm_id_random=True):
log.debug(site_obj_up_result)
return True
else:
log.debug(site_obj_up_result)
return False
# ### END ### API Site Methods ### update_site_obj() ###
# ### BEGIN ### API Site Methods ### get_site_rec_list() ###
def get_site_rec_list(
for_obj_type: str,
for_obj_id: str,
limit: int = 1000,
enabled: str = 'enabled', # enabled, disabled, all
) -> list|bool:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if for_obj_id := redis_lookup_id_random(record_id_random=for_obj_id, table_name='for_obj_type'): pass
else: return False
data = {}
data[f'{for_obj_type}_id'] = for_obj_id
# data['for_obj_type'] = for_obj_type
sql_obj_type_id = f'`tbl`.{for_obj_type}_id = :{for_obj_type}_id'
if enabled in ['enabled', 'disabled', 'all']:
if enabled == 'enabled':
data['enable'] = True
sql_enabled = f'AND `tbl`.enable = :enable'
elif enabled == 'disabled':
data['enable'] = False
sql_enabled = f'AND `tbl`.enable = :enable'
elif enabled == 'all':
sql_enabled = ''
if limit:
data['limit'] = limit
sql_limit = f'LIMIT :limit'
else:
sql_limit = ''
sql = f"""
SELECT `tbl`.id AS 'site_id', `tbl`.id_random AS 'site_id_random'
FROM `site` AS `tbl`
WHERE
{sql_obj_type_id}
{sql_enabled}
ORDER BY `tbl`.created_on DESC, `tbl`.updated_on DESC
{sql_limit};
"""
if site_rec_li_result := sql_select(data=data, sql=sql, as_list=True):
site_rec_li = site_rec_li_result
else:
site_rec_li = []
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(site_rec_li_result)
return site_rec_li
# ### END ### API Site Methods ### get_site_rec_list() ###