Work on lots of methods and models.

This commit is contained in:
Scott Idem
2021-06-17 16:54:46 -04:00
parent e9399e107f
commit c17724cea1
20 changed files with 1673 additions and 480 deletions

View File

@@ -8,11 +8,15 @@ from app.db_sql import redis_lookup_id_random, sql_select
from app.lib_general import log, logging
from app.methods.account_cfg_methods import load_account_cfg_obj
# from app.methods.address_methods import load_address_obj_list
from app.methods.archive_methods import load_archive_obj_list
from app.methods.address_methods import get_address_rec_list, load_address_obj
from app.methods.archive_methods import get_archive_rec_list, load_archive_obj
# from app.methods.contact_methods import load_contact_obj_list
from app.methods.event_methods import load_event_obj_list
from app.methods.organization_methods import get_organization_rec_list, load_organization_obj
from app.methods.person_methods import get_person_rec_list, load_person_obj
from app.methods.product_methods import get_product_rec_list, load_product_obj
from app.methods.post_methods import load_post_obj_list
from app.methods.user_methods import get_user_rec_list, load_user_obj
from app.models.account_models import Account_Base
from app.models.account_cfg_models import Account_Cfg_Base
@@ -90,6 +94,7 @@ def load_account_obj(
inc_site_list: bool = False, # Priority l3
inc_user: bool = False,
inc_user_list: bool = False, # Priority l2
inc_user_role_list: bool = False,
) -> Account_Base|dict|bool:
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
@@ -120,24 +125,68 @@ def load_account_obj(
account_obj.account_cfg = account_cfg_dict
else: account_obj.account_cfg = None
# if inc_address_list:
# if address_dict_list := load_address_obj_list(
# account_id = account_id,
# limit = limit,
# model_as_dict = model_as_dict,
# enabled = enabled,
# ):
# account_obj.address_list = address_dict_list
# else: account_obj.address_list = []
# Updated 2021-06-17
if inc_address_list:
if address_dict_list := load_address_obj_list(
account_id = account_id,
limit = limit,
model_as_dict = model_as_dict,
enabled = enabled,
):
if address_rec_list_result := get_address_rec_list(
for_obj_type = 'account',
for_obj_id = account_id,
limit = limit,
enabled = enabled,
):
address_dict_list = []
for address_rec in address_rec_list_result:
address_dict_list.append(
load_address_obj(
address_id = address_rec.get('address_id', None),
limit = limit,
model_as_dict = model_as_dict,
enabled = enabled,
inc_address_content_list = inc_address_content_list,
)
)
account_obj.address_list = address_dict_list
else: account_obj.address_list = []
# if inc_archive_list:
# if archive_dict_list := load_archive_obj_list(
# account_id = account_id,
# limit = limit,
# model_as_dict = model_as_dict,
# enabled = enabled,
# inc_archive_content_list = inc_archive_content_list,
# ):
# account_obj.archive_list = archive_dict_list
# else: account_obj.archive_list = []
# Updated 2021-06-17
if inc_archive_list:
if archive_dict_list := load_archive_obj_list(
account_id = account_id,
limit = limit,
model_as_dict = model_as_dict,
enabled = enabled,
inc_archive_content_list = inc_archive_content_list,
):
if archive_rec_list_result := get_archive_rec_list(
for_obj_type = 'account',
for_obj_id = account_id,
limit = limit,
enabled = enabled,
):
archive_dict_list = []
for archive_rec in archive_rec_list_result:
archive_dict_list.append(
load_archive_obj(
archive_id = archive_rec.get('archive_id', None),
limit = limit,
model_as_dict = model_as_dict,
enabled = enabled,
inc_archive_content_list = inc_archive_content_list,
)
)
account_obj.archive_list = archive_dict_list
else: account_obj.archive_list = []
@@ -162,34 +211,100 @@ def load_account_obj(
# account_obj.organization = organization_obj
# else: account_obj.organization = None
# Updated 2021-06-17
if inc_event_list:
if event_dict_list := load_event_obj_list(
account_id = account_id,
limit = limit,
model_as_dict = model_as_dict,
enabled = enabled,
# inc_location_address = inc_address,
# inc_contact_1 = inc_contact,
# inc_contact_2 = inc_contact,
# inc_contact_3 = inc_contact,
# inc_event_abstract_list = inc_event_abstract_list,
# inc_event_badge_list = inc_event_badge_list,
# inc_event_device_list = inc_event_device_list,
inc_event_exhibit_list = inc_event_exhibit_list,
inc_event_file_list = inc_event_file_list,
inc_event_location_list = inc_event_location_list,
inc_event_person_list = inc_event_person_list,
inc_event_presentation_list = inc_event_presentation_list,
inc_event_presenter_list = inc_event_presenter_list,
inc_event_registration_list = inc_event_registration_list,
inc_event_session_list = inc_event_session_list,
inc_event_track_list = inc_event_track_list,
# inc_person = inc_person,
# inc_user = inc_user,
):
if event_rec_list_result := get_event_rec_list(
for_obj_type = 'account',
for_obj_id = account_id,
limit = limit,
enabled = enabled,
):
event_dict_list = []
for event_rec in event_rec_list_result:
event_dict_list.append(
load_event_obj(
event_id = event_rec.get('event_id', None),
limit = limit,
model_as_dict = model_as_dict,
enabled = enabled,
# inc_location_address = inc_address,
# inc_contact_1 = inc_contact,
# inc_contact_2 = inc_contact,
# inc_contact_3 = inc_contact,
# inc_event_abstract_list = inc_event_abstract_list,
# inc_event_badge_list = inc_event_badge_list,
# inc_event_device_list = inc_event_device_list,
inc_event_exhibit_list = inc_event_exhibit_list,
inc_event_file_list = inc_event_file_list,
inc_event_location_list = inc_event_location_list,
inc_event_person_list = inc_event_person_list,
inc_event_presentation_list = inc_event_presentation_list,
inc_event_presenter_list = inc_event_presenter_list,
inc_event_registration_list = inc_event_registration_list,
inc_event_session_list = inc_event_session_list,
inc_event_track_list = inc_event_track_list,
# inc_person = inc_person,
# inc_user = inc_user,
)
)
account_obj.event_list = event_dict_list
else: account_obj.event_list = []
# if inc_event_list:
# if event_dict_list := load_event_obj_list(
# account_id = account_id,
# limit = limit,
# model_as_dict = model_as_dict,
# enabled = enabled,
# # inc_location_address = inc_address,
# # inc_contact_1 = inc_contact,
# # inc_contact_2 = inc_contact,
# # inc_contact_3 = inc_contact,
# # inc_event_abstract_list = inc_event_abstract_list,
# # inc_event_badge_list = inc_event_badge_list,
# # inc_event_device_list = inc_event_device_list,
# inc_event_exhibit_list = inc_event_exhibit_list,
# inc_event_file_list = inc_event_file_list,
# inc_event_location_list = inc_event_location_list,
# inc_event_person_list = inc_event_person_list,
# inc_event_presentation_list = inc_event_presentation_list,
# inc_event_presenter_list = inc_event_presenter_list,
# inc_event_registration_list = inc_event_registration_list,
# inc_event_session_list = inc_event_session_list,
# inc_event_track_list = inc_event_track_list,
# # inc_person = inc_person,
# # inc_user = inc_user,
# ):
# account_obj.event_list = event_dict_list
# else: account_obj.event_list = []
# Updated 2021-06-17
if inc_organization_list:
if organization_rec_list_result := get_organization_rec_list(
for_obj_type = 'account',
for_obj_id = account_id,
limit = limit,
enabled = enabled,
):
organization_dict_list = []
for organization_rec in organization_rec_list_result:
organization_dict_list.append(
load_organization_obj(
organization_id = organization_rec.get('organization_id', None),
limit = limit,
model_as_dict = model_as_dict,
enabled = enabled,
inc_address = inc_address,
inc_contact = inc_contact,
inc_person = inc_person,
inc_user = inc_user,
)
)
account_obj.organization_list = organization_dict_list
else: account_obj.organization_list = []
if inc_post_list:
if post_dict_list := load_post_obj_list(
account_id = account_id,
@@ -203,6 +318,83 @@ def load_account_obj(
account_obj.post_list = post_dict_list
else: account_obj.post_list = []
# Updated 2021-06-17
if inc_person_list:
if person_rec_list_result := get_person_rec_list(
for_obj_type = 'account',
for_obj_id = account_id,
limit = limit,
enabled = enabled,
):
person_dict_list = []
for person_rec in person_rec_list_result:
person_dict_list.append(
load_person_obj(
person_id = person_rec.get('person_id', None),
limit = limit,
model_as_dict = model_as_dict,
enabled = enabled,
inc_address = inc_address,
inc_contact = inc_contact,
inc_organization = inc_organization,
inc_user = inc_user,
)
)
account_obj.person_list = person_dict_list
else: account_obj.person_list = []
# Updated 2021-06-17
if inc_product_list:
if product_rec_list_result := get_product_rec_list(
for_obj_type = 'account',
for_obj_id = account_id,
limit = limit,
enabled = enabled,
):
product_dict_list = []
for product_rec in product_rec_list_result:
product_dict_list.append(
load_product_obj(
product_id = product_rec.get('product_id', None),
model_as_dict = model_as_dict,
)
)
account_obj.product_list = product_dict_list
else: account_obj.product_list = []
# Updated 2021-06-17
if inc_user_list:
if user_rec_list_result := get_user_rec_list(
for_obj_type = 'account',
for_obj_id = account_id,
limit = limit,
enabled = enabled,
):
user_dict_list = []
for user_rec in user_rec_list_result:
user_dict_list.append(
load_user_obj(
user_id = user_rec.get('user_id', None),
limit = limit,
model_as_dict = model_as_dict,
enabled = enabled,
inc_address = inc_address,
inc_contact = inc_contact,
inc_event_list = inc_event_list,
inc_journal_list = inc_journal_list,
inc_membership_member = inc_membership_member,
inc_order_list = inc_order_list,
inc_order_cart_list = inc_order_cart_list,
inc_organization = inc_organization,
inc_person = inc_person,
inc_post_list = inc_post_list,
inc_post_comment_list = inc_post_comment_list,
inc_user_role_list = inc_user_role_list,
)
)
account_obj.user_list = user_dict_list
else: account_obj.user_list = []
if model_as_dict:
return account_obj.dict(by_alias=True, exclude_unset=True) # pylint: disable=no-member
else:

View File

@@ -91,3 +91,57 @@ def update_address_obj(
log.debug(address_obj_up_result)
return False
# ### END ### API Address Methods ### update_address_obj() ###
# ### BEGIN ### API Address Methods ### get_address_rec_list() ###
def get_address_rec_list(
for_obj_type: str,
for_obj_id: str,
limit: int = 1000,
enabled: str = 'enabled', # enabled, disabled, all
) -> list|bool:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if for_obj_id := redis_lookup_id_random(record_id_random=for_obj_id, table_name='for_obj_type'): pass
else: return False
data = {}
data[f'{for_obj_type}_id'] = for_obj_id
# data['for_obj_type'] = for_obj_type
sql_obj_type_id = f'`tbl`.{for_obj_type}_id = :{for_obj_type}_id'
if enabled in ['enabled', 'disabled', 'all']:
if enabled == 'enabled':
data['enable'] = True
sql_enabled = f'AND `tbl`.enable = :enable'
elif enabled == 'disabled':
data['enable'] = False
sql_enabled = f'AND `tbl`.enable = :enable'
elif enabled == 'all':
sql_enabled = ''
if limit:
data['limit'] = limit
sql_limit = f'LIMIT :limit'
else:
sql_limit = ''
sql = f"""
SELECT `tbl`.id AS 'address_id', `tbl`.id_random AS 'address_id_random'
FROM `address` AS `tbl`
WHERE
{sql_obj_type_id}
{sql_enabled}
ORDER BY `tbl`.created_on DESC, `tbl`.updated_on DESC
{sql_limit};
"""
if address_rec_li_result := sql_select(data=data, sql=sql, as_list=True):
address_rec_li = address_rec_li_result
else:
address_rec_li = []
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(address_rec_li_result)
return address_rec_li
# ### END ### API Address Methods ### get_address_rec_list() ###

View File

@@ -125,3 +125,57 @@ def load_archive_obj_list(
return archive_result_li
# ### END ### API Archive Methods ### load_archive_obj_list() ###
# ### BEGIN ### API Archive Methods ### get_archive_rec_list() ###
def get_archive_rec_list(
for_obj_type: str,
for_obj_id: str,
limit: int = 1000,
enabled: str = 'enabled', # enabled, disabled, all
) -> list|bool:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if for_obj_id := redis_lookup_id_random(record_id_random=for_obj_id, table_name='for_obj_type'): pass
else: return False
data = {}
data[f'{for_obj_type}_id'] = for_obj_id
# data['for_obj_type'] = for_obj_type
sql_obj_type_id = f'`tbl`.{for_obj_type}_id = :{for_obj_type}_id'
if enabled in ['enabled', 'disabled', 'all']:
if enabled == 'enabled':
data['enable'] = True
sql_enabled = f'AND `tbl`.enable = :enable'
elif enabled == 'disabled':
data['enable'] = False
sql_enabled = f'AND `tbl`.enable = :enable'
elif enabled == 'all':
sql_enabled = ''
if limit:
data['limit'] = limit
sql_limit = f'LIMIT :limit'
else:
sql_limit = ''
sql = f"""
SELECT `tbl`.id AS 'archive_id', `tbl`.id_random AS 'archive_id_random'
FROM `archive` AS `tbl`
WHERE
{sql_obj_type_id}
{sql_enabled}
ORDER BY `tbl`.created_on DESC, `tbl`.updated_on DESC
{sql_limit};
"""
if archive_rec_li_result := sql_select(data=data, sql=sql, as_list=True):
archive_rec_li = archive_rec_li_result
else:
archive_rec_li = []
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(archive_rec_li_result)
return archive_rec_li
# ### END ### API Archive Methods ### get_archive_rec_list() ###

View File

@@ -405,3 +405,57 @@ def update_event_obj(
log.debug(event_obj_up_result)
return False
# ### END ### API Event Methods ### update_event_obj() ###
# ### BEGIN ### API Event Methods ### get_event_rec_list() ###
def get_event_rec_list(
for_obj_type: str,
for_obj_id: str,
limit: int = 1000,
enabled: str = 'enabled', # enabled, disabled, all
) -> list|bool:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if for_obj_id := redis_lookup_id_random(record_id_random=for_obj_id, table_name='for_obj_type'): pass
else: return False
data = {}
data[f'{for_obj_type}_id'] = for_obj_id
# data['for_obj_type'] = for_obj_type
sql_obj_type_id = f'`tbl`.{for_obj_type}_id = :{for_obj_type}_id'
if enabled in ['enabled', 'disabled', 'all']:
if enabled == 'enabled':
data['enable'] = True
sql_enabled = f'AND `tbl`.enable = :enable'
elif enabled == 'disabled':
data['enable'] = False
sql_enabled = f'AND `tbl`.enable = :enable'
elif enabled == 'all':
sql_enabled = ''
if limit:
data['limit'] = limit
sql_limit = f'LIMIT :limit'
else:
sql_limit = ''
sql = f"""
SELECT `tbl`.id AS 'event_id', `tbl`.id_random AS 'event_id_random'
FROM `event` AS `tbl`
WHERE
{sql_obj_type_id}
{sql_enabled}
ORDER BY `tbl`.created_on DESC, `tbl`.updated_on DESC
{sql_limit};
"""
if event_rec_li_result := sql_select(data=data, sql=sql, as_list=True):
event_rec_li = event_rec_li_result
else:
event_rec_li = []
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(event_rec_li_result)
return event_rec_li
# ### END ### API Event Methods ### get_event_rec_list() ###

View File

@@ -24,10 +24,11 @@ def load_membership_cfg_obj(
if membership_cfg_rec := sql_select(
table_name = 'v_membership_cfg',
field_name = 'account_id',
field_value = account_id
field_value = account_id,
): pass
else: return False
log.debug(membership_cfg_rec)
try:
membership_cfg_obj = Membership_Cfg_Base(**membership_cfg_rec)
log.debug(membership_cfg_obj)

View File

@@ -0,0 +1,438 @@
from __future__ import annotations
import datetime
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator
from app.lib_general import log, logging
from app.db_sql import redis_lookup_id_random, sql_insert_or_update, sql_select
#from .address_models import Address_Base
#from .contact_models import Contact_Base
from app.models.membership_member_models import Membership_Member_Base
from app.models.membership_profile_models import Membership_Profile_Base
#from .organization_models import Organization_Base
#from .person_models import Person_Base
#from .user_models import User_Base
# ### BEGIN ### API Membership Member Methods ### save_membership_member_obj() ###
def save_membership_member_obj(order_obj_new:Membership_Member_Base=None):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if not order_obj_new:
return False
log.debug(order_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True))
order_line_obj_li_curr = [] # Initialize to store order_line list
if order_obj_new.id_random:
log.info(f'An order.id {order_obj_new.id} or order.id_random {order_obj_new.id_random} was included. We can update an existing order.')
log.info(f'Get the current order_line list to compare with what was sent...')
data = {}
data['order_id_random'] = order_obj_new.id_random
if order_line_rec_li_curr := sql_select(table_name='v_order_line', data=data, rm_id_random=True, as_list=True):
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(order_line_rec_li_curr)
for order_line_rec in order_line_rec_li_curr:
try:
order_line_obj = Order_Line_Base(**order_line_rec)
log.debug(order_line_obj)
except ValidationError as e:
log.error(e.json())
order_line_obj_li_curr.append(order_line_obj)
else:
log.info(f'No order_line records were found')
elif order_obj_new.account_id_random and (order_obj_new.person_id_random or order_obj_new.user_id_random):
log.info(f'An account.id_random {order_obj_new.account_id_random} was passed. And either a person.id_random {order_obj_new.person_id_random} or user.id_random {order_obj_new.user_id_random} was passed. We can create a new order.')
# Because there was not an order ID, assume there are no order lines yet. So no look up.
else:
log.info('Either an order ID is required to update an order or an account ID along with a person ID or user ID is required to create an order.')
return False
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(order_line_obj_li_curr)
if repl_order_line_list: # This will remove any order line list items not sent with the new order information.
log.info('Removing any order line list items not sent with the new order information...')
for index, order_line_obj_curr in enumerate(order_line_obj_li_curr):
log.info(f'Current: order line ID={order_line_obj_curr.id_random} and product ID= {order_line_obj_curr.product_id_random}')
matched_product_id = False
for order_line_obj_new in order_obj_new.order_line_list:
log.debug(f'Checking new: product ID={order_line_obj_new.product_id_random}')
if order_line_obj_curr.product_id_random == order_line_obj_new.product_id_random:
matched_product_id = True
log.debug(f'Matched: product ID={order_line_obj_new.product_id_random}')
break
else:
log.debug(f'No match: product ID={order_line_obj_new.product_id_random}')
if not matched_product_id: # Was not found in the new order line list sent
log.info(f'Current order line product ID did not match any of the new list. DELETE order line ID {order_line_obj_curr.id_random} with product ID {order_line_obj_curr.product_id_random}')
if order_line_del_result := sql_delete(table_name='order_line', record_id_random=order_line_obj_curr.id_random):
log.info(f'Deleted record and now pop the current list item {index}...')
order_line_obj_li_curr.pop(index)
else:
log.info(f'Current order line product ID matched. Keeping order line ID {order_line_obj_curr.id_random} with product ID {order_line_obj_curr.product_id_random}')
# NOTE: That this current order line item will be updated below.
log.debug(order_line_obj_li_curr)
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.info('Loop through the line list that was sent and compare with what was pulled from the DB')
# Loop through the new line list that was sent and compare with the current line list that was pulled from the DB
# Only insert if a product ID does not match
# Only update if a product ID does match
for order_line_obj_new in order_obj_new.order_line_list:
log.info(f'New: order line ID={order_line_obj_new.id_random} and product ID= {order_line_obj_new.product_id_random}')
matched_product_id = False
for index, order_line_obj_curr in enumerate(order_line_obj_li_curr):
log.debug(f'Checking current: product ID={order_line_obj_curr.product_id_random}')
if order_line_obj_new.product_id_random == order_line_obj_curr.product_id_random:
matched_product_id = True
log.debug(f'Matched: product ID={order_line_obj_curr.product_id_random}')
log.info(f'Updating the current line item with the new line item.')
order_line_obj_new.id_random = order_line_obj_curr.id_random
order_line_obj_new.id = order_line_obj_curr.id
order_line_obj_li_curr[index] = order_line_obj_new
break
else:
log.debug(f'No match: product ID={order_line_obj_curr.product_id_random}')
if not matched_product_id: # Was not found in the current order line list that was pulled from the DB
log.info(f'New order line product ID did not match any of the current list. Append order line ID {order_line_obj_new.id_random} with product ID {order_line_obj_new.product_id_random}')
log.info('Append to current list...')
order_line_obj_li_curr.append(order_line_obj_new) # These will be inserted/updated below
# Save merged current and new list to the new order object
order_obj_new.order_line_list = order_line_obj_li_curr
log.debug(order_obj_new)
# Final loop through to get the new order totals
# Calculate totals
order_total_amount:int = 0
order_total_quantity:int = 0
for order_line_obj_new in order_obj_new.order_line_list:
order_total_amount += order_line_obj_new.quantity * order_line_obj_new.amount
order_total_quantity += order_line_obj_new.quantity
order_obj_new.total_bill = order_total_amount # "amount" is used by order_cart and Stripe
order_obj_new.total_quantity = order_total_quantity
order_obj_new.balance = order_total_amount - order_obj_new.total_paid
log.debug(order_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'order_line_list', 'cfg', 'created_on', 'updated_on'}))
order_obj_data = order_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'order_line_list', 'cfg', 'created_on', 'updated_on'})
# SQL INSERT or UPDATE the order record
log.info('SQL INSERT or UPDATE the order record')
if order_obj_resp := sql_insert_or_update(data=order_obj_data, table_name='order', rm_id_random=True, id_random_length=8): pass
else: return False
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(order_obj_resp)
if isinstance(order_obj_resp, bool) and order_obj_resp:
if order_id := order_obj_new.id: pass
elif order_id := order_obj_new.id_random: pass
elif isinstance(order_obj_resp, int):
order_id = order_obj_resp
else:
return False
log.debug(f'Order ID={order_id}')
# Loop through the order_line list to SQL INSERT or UPDATE the records
log.info('Loop through the order_line list to SQL INSERT or UPDATE the records')
for order_line_obj_new in order_obj_new.order_line_list:
log.info(f"New order_line: order_line_id_random={order_line_obj_new.id_random}; product_id_random={order_line_obj_new.product_id_random}")
log.debug(order_line_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=False, exclude={'order_line_id_random', 'product_type_id', 'product_type', 'created_on', 'updated_on'}))
order_line_obj_data = order_line_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'order_line_id_random', 'product_type_id', 'product_type', 'created_on', 'updated_on'})
order_line_obj_data['order_id'] = order_id
if order_line_obj_resp := sql_insert_or_update(sql=None, data=order_line_obj_data, table_name='order_line', rm_id_random=True, id_random_length=8): pass
else: return False
log.debug(order_line_obj_resp)
return order_id
# ### END ### API Membership Member Methods ### save_membership_member_obj() ###
# ### BEGIN ### API Membership Member Methods ### load_membership_member_obj() ###
def load_membership_member_obj(
membership_member_id:int|str,
limit: int = 1000,
model_as_dict: bool = False,
enabled: str = 'enabled', # enabled, disabled, all
inc_address: bool = False,
inc_contact: bool = False,
inc_membership_cfg: bool = False,
inc_membership_group_list: bool = False,
inc_membership_profile: bool = False,
# inc_membership_profile_cust: bool = False,
inc_organization: bool = False,
inc_person: bool = False,
inc_user: bool = False,
) -> Membership_Member_Base:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if membership_member_id := redis_lookup_id_random(record_id_random=membership_member_id, table_name='membership_member'): pass
else: return False
if membership_member_rec := sql_select(table_name='v_membership_member', record_id=membership_member_id): pass
else: return False
try:
membership_member_obj = Membership_Member_Base(**membership_member_rec)
log.debug(membership_member_obj)
except ValidationError as e:
log.error(e.json())
if inc_membership_profile:
membership_profile_id = membership_member_rec.get('membership_profile_id', None)
log.debug(membership_profile_id)
if membership_profile_dict := load_membership_profile_obj(
membership_profile_id = membership_profile_id,
model_as_dict = model_as_dict,
inc_address = inc_address,
inc_contact = inc_contact,
# inc_membership = inc_membership,
# inc_organization = inc_organization,
):
membership_member_obj.membership_profile = membership_profile_dict
else: membership_member_obj.membership_profile = None
log.debug(membership_profile_dict)
if inc_person:
person_id = membership_member_rec.get('person_id', None)
log.debug(person_id)
if person_dict := load_person_obj(
person_id = person_id,
model_as_dict = model_as_dict,
inc_address = inc_address,
inc_contact = inc_contact,
# inc_organization = inc_organization,
# inc_user = inc_user,
):
membership_member_obj.person = person_dict
else: membership_member_obj.person = None
log.debug(person_dict)
if inc_user:
user_id = membership_member_rec.get('user_id', None)
if user_dict := load_user_obj(
user_id = user_id,
model_as_dict = model_as_dict,
inc_address = inc_address,
inc_contact = inc_contact,
# inc_organization = inc_organization,
# inc_person = inc_person,
):
membership_member_obj.user = user_dict
else: membership_member_obj.user = None
log.debug(user_dict)
if membership_member_rec := sql_select(table_name='v_membership', record_id=membership_member_id):
#log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(membership_member_rec)
if inc_membership_member_profile:
if membership_member_profile_rec := sql_select(table_name='v_membership_member_profile', field_name='membership_member_id', field_value=membership_member_id):
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(membership_member_profile_rec)
membership_member_rec['profile'] = membership_member_profile_rec
if inc_membership_member_cfg:
if membership_member_cfg_rec := sql_select(table_name='v_membership_member_cfg', field_name='account_id', field_value=membership_member_rec.get('account_id', None)):
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(membership_member_cfg_rec)
membership_member_rec['cfg'] = membership_member_cfg_rec
if inc_extended_profile:
account_code = membership_member_rec.get('account_code', None)
table_name = f'c_{account_code}_membership_member_profile'
if extended_profile_rec := sql_select(table_name=table_name, field_name='membership_member_id', field_value=membership_member_id):
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(extended_profile_rec)
membership_member_rec['extended_profile'] = extended_profile_rec
if inc_person:
if person_rec := sql_select(table_name='v_person', record_id=membership_member_rec.get('person_id')):
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(person_rec)
membership_member_rec['person'] = person_rec
if inc_user:
if user_rec := sql_select(table_name='v_user', record_id=membership_member_rec.get('user_id')):
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(user_rec)
membership_member_rec['user'] = user_rec
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(membership_member_rec)
else:
return False
if model_as_dict:
return membership_member_obj.dict(by_alias=True, exclude_unset=True) # pylint: disable=no-member
else:
return membership_member_obj
# ### END ### API Membership Member Methods ### load_membership_member_obj() ###
# ### BEGIN ### API Membership Member Methods ### load_membership_member_obj_list() ###
def load_membership_member_obj_list(
account_id: int|str|None = None,
membership_id: int|str|None = None,
limit: int = 1000,
model_as_dict: bool = False,
enabled: str = 'enabled', # enabled, disabled, all
inc_address: bool = False,
inc_contact: bool = False,
inc_membership_cfg: bool = False,
inc_membership_group_list: bool = False,
inc_membership_profile: bool = False,
# inc_membership_profile_cust: bool = False,
inc_organization: bool = False,
inc_person: bool = False,
inc_user: bool = False
) -> list|bool:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
data = {}
if account_id:
if account_id := redis_lookup_id_random(record_id_random=account_id, table_name='account'): pass
else: return False
data['account_id'] = account_id
sql_obj_type_id = f'`tbl`.account_id = :account_id'
elif membership_id:
if membership_id := redis_lookup_id_random(record_id_random=membership_id, table_name='membership'): pass
else: return False
data['membership_id'] = membership_id
sql_obj_type_id = f'`tbl`.membership_id = :membership_id'
if enabled in ['enabled', 'disabled', 'all']:
if enabled == 'enabled':
data['enable'] = True
sql_enabled = f'AND `tbl`.enable = :enable'
elif enabled == 'disabled':
data['enable'] = False
sql_enabled = f'AND `tbl`.enable = :enable'
elif enabled == 'all':
sql_enabled = ''
# else: tbl_obj['account'] = None
if limit:
data['limit'] = limit
sql_limit = f'LIMIT :limit'
else:
sql_limit = ''
sql = f"""
SELECT `tbl`.id AS 'membership_member_id', `tbl`.id_random AS 'membership_member_id_random'
FROM `membership_member` AS `tbl`
WHERE
{sql_obj_type_id}
{sql_enabled}
ORDER BY `tbl`.created_on DESC, `tbl`.updated_on DESC
{sql_limit};
"""
if membership_member_rec_li_result := sql_select(data=data, sql=sql, as_list=True):
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(membership_member_rec_li_result)
membership_member_result_li = []
for membership_member_rec in membership_member_rec_li_result:
membership_member_id = membership_member_rec.get('membership_member_id', None)
if membership_member_result := load_membership_member_obj(
membership_member_id = membership_member_id,
model_as_dict = model_as_dict,
enabled = enabled,
# inc_membership = inc_membership,
inc_membership_group_list = inc_membership_group_list,
inc_membership_profile = inc_membership_profile,
inc_organization = inc_organization,
inc_person = inc_person,
# inc_product = inc_product,
inc_user = inc_user,
):
log.debug(membership_member_result)
membership_member_result_li.append(membership_member_result)
else:
log.debug(membership_member_result)
membership_member_result_li.append(None)
log.debug(membership_member_result_li)
else:
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(membership_member_rec_li_result)
membership_member_result_li = []
return membership_member_result_li
# ### END ### API Membership Member Methods ### load_membership_member_obj_list() ###
# ### BEGIN ### API Membership Member Methods ### get_membership_member_rec_list() ###
def get_membership_member_rec_list(
for_obj_type: str,
for_obj_id: str,
limit: int = 1000,
enabled: str = 'enabled', # enabled, disabled, all
) -> list|bool:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if for_obj_id := redis_lookup_id_random(record_id_random=for_obj_id, table_name='for_obj_type'): pass
else: return False
data = {}
data[f'{for_obj_type}_id'] = for_obj_id
# data['for_obj_type'] = for_obj_type
sql_obj_type_id = f'`tbl`.{for_obj_type}_id = :{for_obj_type}_id'
if enabled in ['enabled', 'disabled', 'all']:
if enabled == 'enabled':
data['enable'] = True
sql_enabled = f'AND `tbl`.enable = :enable'
elif enabled == 'disabled':
data['enable'] = False
sql_enabled = f'AND `tbl`.enable = :enable'
elif enabled == 'all':
sql_enabled = ''
if limit:
data['limit'] = limit
sql_limit = f'LIMIT :limit'
else:
sql_limit = ''
sql = f"""
SELECT `tbl`.id AS 'membership_member_id', `tbl`.id_random AS 'membership_member_id_random'
FROM `membership_member` AS `tbl`
WHERE
{sql_obj_type_id}
{sql_enabled}
ORDER BY `tbl`.created_on DESC, `tbl`.updated_on DESC
{sql_limit};
"""
if membership_member_rec_li_result := sql_select(data=data, sql=sql, as_list=True):
membership_member_rec_li = membership_member_rec_li_result
else:
membership_member_rec_li = []
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(membership_member_rec_li_result)
return membership_member_rec_li
# ### END ### API Membership Member Methods ### get_membership_member_rec_list() ###

View File

@@ -4,217 +4,53 @@ import datetime
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator
from app.lib_general import *
from app.lib_general import log, logging
from app.db_sql import redis_lookup_id_random, sql_select
#from .address_models import Address_Base
#from .contact_models import Contact_Base
from app.methods.membership_cfg_methods import load_membership_cfg_obj
from app.methods.membership_member_methods import get_membership_member_rec_list, load_membership_member_obj
from app.methods.product_methods import get_product_rec_list, load_product_obj
from app.models.membership_models import Membership_Base
#from .organization_models import Organization_Base
#from .person_models import Person_Base
#from .user_models import User_Base
# ### BEGIN ### API Membership Methods ### save_membership_obj() ###
def save_membership_obj(order_obj_new:Membership_Base=None):
def save_membership_obj(membership_obj_new:Membership_Base):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if not order_obj_new:
return False
log.debug(order_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True))
order_line_obj_li_curr = [] # Initialize to store order_line list
if order_obj_new.id_random:
log.info(f'An order.id {order_obj_new.id} or order.id_random {order_obj_new.id_random} was included. We can update an existing order.')
log.info(f'Get the current order_line list to compare with what was sent...')
data = {}
data['order_id_random'] = order_obj_new.id_random
if order_line_rec_li_curr := sql_select(table_name='v_order_line', data=data, rm_id_random=True, as_list=True):
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(order_line_rec_li_curr)
for order_line_rec in order_line_rec_li_curr:
try:
order_line_obj = Order_Line_Base(**order_line_rec)
log.debug(order_line_obj)
except ValidationError as e:
log.error(e.json())
order_line_obj_li_curr.append(order_line_obj)
else:
log.info(f'No order_line records were found')
elif order_obj_new.account_id_random and (order_obj_new.person_id_random or order_obj_new.user_id_random):
log.info(f'An account.id_random {order_obj_new.account_id_random} was passed. And either a person.id_random {order_obj_new.person_id_random} or user.id_random {order_obj_new.user_id_random} was passed. We can create a new order.')
# Because there was not an order ID, assume there are no order lines yet. So no look up.
else:
log.info('Either an order ID is required to update an order or an account ID along with a person ID or user ID is required to create an order.')
return False
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(order_line_obj_li_curr)
if repl_order_line_list: # This will remove any order line list items not sent with the new order information.
log.info('Removing any order line list items not sent with the new order information...')
for index, order_line_obj_curr in enumerate(order_line_obj_li_curr):
log.info(f'Current: order line ID={order_line_obj_curr.id_random} and product ID= {order_line_obj_curr.product_id_random}')
matched_product_id = False
for order_line_obj_new in order_obj_new.order_line_list:
log.debug(f'Checking new: product ID={order_line_obj_new.product_id_random}')
if order_line_obj_curr.product_id_random == order_line_obj_new.product_id_random:
matched_product_id = True
log.debug(f'Matched: product ID={order_line_obj_new.product_id_random}')
break
else:
log.debug(f'No match: product ID={order_line_obj_new.product_id_random}')
if not matched_product_id: # Was not found in the new order line list sent
log.info(f'Current order line product ID did not match any of the new list. DELETE order line ID {order_line_obj_curr.id_random} with product ID {order_line_obj_curr.product_id_random}')
if order_line_del_result := sql_delete(table_name='order_line', record_id_random=order_line_obj_curr.id_random):
log.info(f'Deleted record and now pop the current list item {index}...')
order_line_obj_li_curr.pop(index)
else:
log.info(f'Current order line product ID matched. Keeping order line ID {order_line_obj_curr.id_random} with product ID {order_line_obj_curr.product_id_random}')
# NOTE: That this current order line item will be updated below.
log.debug(order_line_obj_li_curr)
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.info('Loop through the line list that was sent and compare with what was pulled from the DB')
# Loop through the new line list that was sent and compare with the current line list that was pulled from the DB
# Only insert if a product ID does not match
# Only update if a product ID does match
for order_line_obj_new in order_obj_new.order_line_list:
log.info(f'New: order line ID={order_line_obj_new.id_random} and product ID= {order_line_obj_new.product_id_random}')
matched_product_id = False
for index, order_line_obj_curr in enumerate(order_line_obj_li_curr):
log.debug(f'Checking current: product ID={order_line_obj_curr.product_id_random}')
if order_line_obj_new.product_id_random == order_line_obj_curr.product_id_random:
matched_product_id = True
log.debug(f'Matched: product ID={order_line_obj_curr.product_id_random}')
log.info(f'Updating the current line item with the new line item.')
order_line_obj_new.id_random = order_line_obj_curr.id_random
order_line_obj_new.id = order_line_obj_curr.id
order_line_obj_li_curr[index] = order_line_obj_new
break
else:
log.debug(f'No match: product ID={order_line_obj_curr.product_id_random}')
if not matched_product_id: # Was not found in the current order line list that was pulled from the DB
log.info(f'New order line product ID did not match any of the current list. Append order line ID {order_line_obj_new.id_random} with product ID {order_line_obj_new.product_id_random}')
log.info('Append to current list...')
order_line_obj_li_curr.append(order_line_obj_new) # These will be inserted/updated below
# Save merged current and new list to the new order object
order_obj_new.order_line_list = order_line_obj_li_curr
log.debug(order_obj_new)
# Final loop through to get the new order totals
# Calculate totals
order_total_amount:int = 0
order_total_quantity:int = 0
for order_line_obj_new in order_obj_new.order_line_list:
order_total_amount += order_line_obj_new.quantity * order_line_obj_new.amount
order_total_quantity += order_line_obj_new.quantity
order_obj_new.total_bill = order_total_amount # "amount" is used by order_cart and Stripe
order_obj_new.total_quantity = order_total_quantity
order_obj_new.balance = order_total_amount - order_obj_new.total_paid
log.debug(order_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'order_line_list', 'cfg', 'created_on', 'updated_on'}))
order_obj_data = order_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'order_line_list', 'cfg', 'created_on', 'updated_on'})
# SQL INSERT or UPDATE the order record
log.info('SQL INSERT or UPDATE the order record')
if order_obj_resp := sql_insert_or_update(data=order_obj_data, table_name='order', rm_id_random=True, id_random_length=8): pass
else: return False
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(order_obj_resp)
if isinstance(order_obj_resp, bool) and order_obj_resp:
if order_id := order_obj_new.id: pass
elif order_id := order_obj_new.id_random: pass
elif isinstance(order_obj_resp, int):
order_id = order_obj_resp
else:
return False
log.debug(f'Order ID={order_id}')
# Loop through the order_line list to SQL INSERT or UPDATE the records
log.info('Loop through the order_line list to SQL INSERT or UPDATE the records')
for order_line_obj_new in order_obj_new.order_line_list:
log.info(f"New order_line: order_line_id_random={order_line_obj_new.id_random}; product_id_random={order_line_obj_new.product_id_random}")
log.debug(order_line_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=False, exclude={'order_line_id_random', 'product_type_id', 'product_type', 'created_on', 'updated_on'}))
order_line_obj_data = order_line_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'order_line_id_random', 'product_type_id', 'product_type', 'created_on', 'updated_on'})
order_line_obj_data['order_id'] = order_id
if order_line_obj_resp := sql_insert_or_update(sql=None, data=order_line_obj_data, table_name='order_line', rm_id_random=True, id_random_length=8): pass
else: return False
log.debug(order_line_obj_resp)
return order_id
return membership_id
# ### END ### API Membership Methods ### save_membership_obj() ###
# ### BEGIN ### API Membership Methods ### load_membership_obj() ###
def load_membership_obj(membership_id:int|str, inc_membership_profile:bool=False, inc_membership_cfg:bool=False, inc_extended_profile:bool=False, inc_person:bool=False, inc_user:bool=False) -> Membership_Base:
def load_membership_obj(
membership_id:int|str,
limit: int = 1000,
model_as_dict: bool = False,
enabled: str = 'enabled', # enabled, disabled, all
inc_address: bool = False,
inc_contact: bool = False,
inc_membership_cfg: bool = False,
inc_membership_group_list: bool = False,
inc_membership_member_list: bool = False,
inc_membership_profile: bool = False,
inc_organization: bool = False,
inc_person: bool = False,
inc_product: bool = False,
inc_product_list: bool = False,
inc_user: bool = False,
) -> Membership_Base:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if membership_id := redis_lookup_id_random(record_id_random=membership_id, table_name='membership'): pass
else: return False
if membership_rec := sql_select(table_name='v_membership', record_id=membership_id):
#log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(membership_rec)
if membership_rec := sql_select(table_name='v_membership', record_id=membership_id): pass
else: return False
if inc_membership_profile:
if membership_profile_rec := sql_select(table_name='v_membership_profile', field_name='membership_id', field_value=membership_id):
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(membership_profile_rec)
membership_rec['profile'] = membership_profile_rec
if inc_membership_cfg:
if membership_cfg_rec := sql_select(table_name='v_membership_cfg', field_name='account_id', field_value=membership_rec.get('account_id', None)):
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(membership_cfg_rec)
membership_rec['cfg'] = membership_cfg_rec
if inc_extended_profile:
account_code = membership_rec.get('account_code', None)
table_name = f'c_{account_code}_membership_profile'
if extended_profile_rec := sql_select(table_name=table_name, field_name='membership_id', field_value=membership_id):
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(extended_profile_rec)
membership_rec['extended_profile'] = extended_profile_rec
if inc_person:
if person_rec := sql_select(table_name='v_person', record_id=membership_rec.get('person_id')):
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(person_rec)
membership_rec['person'] = person_rec
if inc_user:
if user_rec := sql_select(table_name='v_user', record_id=membership_rec.get('user_id')):
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(user_rec)
membership_rec['user'] = user_rec
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(membership_rec)
else:
return False
log.debug(membership_rec)
try:
membership_obj = Membership_Base(**membership_rec)
@@ -222,5 +58,59 @@ def load_membership_obj(membership_id:int|str, inc_membership_profile:bool=False
except ValidationError as e:
log.error(e.json())
return membership_obj
if inc_membership_cfg:
if membership_cfg_rec := sql_select(table_name='v_membership_cfg', field_name='account_id', field_value=membership_rec.get('account_id', None)):
log.debug(membership_cfg_rec)
membership_rec['cfg'] = membership_cfg_rec
if inc_membership_member_list:
if membership_member_rec_list_result := get_membership_member_rec_list(
for_obj_type = 'membership',
for_obj_id = membership_id,
limit = limit,
enabled = enabled,
):
membership_member_dict_list = []
for membership_member_rec in membership_member_rec_list_result:
membership_member_dict_list.append(
load_membership_member_obj(
membership_member_id = membership_member_rec.get('membership_member_id', None),
limit = limit,
model_as_dict = model_as_dict,
enabled = enabled,
# inc_membership = inc_membership,
inc_membership_group_list = inc_membership_group_list,
inc_membership_profile = inc_membership_profile,
inc_organization = inc_organization,
inc_person = inc_person,
inc_product = inc_product,
inc_user = inc_user,
)
)
membership_obj.membership_member_list = membership_member_dict_list
else: membership_obj.membership_member_list = []
if inc_product_list:
if product_rec_list_result := get_product_rec_list(
for_obj_type = 'membership',
for_obj_id = membership_id,
limit = limit,
enabled = enabled,
):
product_dict_list = []
for product_rec in product_rec_list_result:
product_dict_list.append(
load_product_obj(
product_id = product_rec.get('product_id', None),
model_as_dict = model_as_dict,
)
)
membership_obj.product_list = product_dict_list
else: membership_obj.product_list = []
if model_as_dict:
return membership_obj.dict(by_alias=True, exclude_unset=True) # pylint: disable=no-member
else:
return membership_obj
# ### END ### API Membership Methods ### load_membership_obj() ###

View File

@@ -106,4 +106,58 @@ def update_organization_obj(
else:
log.debug(organization_obj_up_result)
return False
# ### END ### API Organization Methods ### update_organization_obj() ###
# ### END ### API Organization Methods ### update_organization_obj() ###
# ### BEGIN ### API Organization Methods ### get_organization_rec_list() ###
def get_organization_rec_list(
for_obj_type: str,
for_obj_id: str,
limit: int = 1000,
enabled: str = 'enabled', # enabled, disabled, all
) -> list|bool:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if for_obj_id := redis_lookup_id_random(record_id_random=for_obj_id, table_name='for_obj_type'): pass
else: return False
data = {}
data[f'{for_obj_type}_id'] = for_obj_id
# data['for_obj_type'] = for_obj_type
sql_obj_type_id = f'`tbl`.{for_obj_type}_id = :{for_obj_type}_id'
if enabled in ['enabled', 'disabled', 'all']:
if enabled == 'enabled':
data['enable'] = True
sql_enabled = f'AND `tbl`.enable = :enable'
elif enabled == 'disabled':
data['enable'] = False
sql_enabled = f'AND `tbl`.enable = :enable'
elif enabled == 'all':
sql_enabled = ''
if limit:
data['limit'] = limit
sql_limit = f'LIMIT :limit'
else:
sql_limit = ''
sql = f"""
SELECT `tbl`.id AS 'organization_id', `tbl`.id_random AS 'organization_id_random'
FROM `organization` AS `tbl`
WHERE
{sql_obj_type_id}
{sql_enabled}
ORDER BY `tbl`.created_on DESC, `tbl`.updated_on DESC
{sql_limit};
"""
if organization_rec_li_result := sql_select(data=data, sql=sql, as_list=True):
organization_rec_li = organization_rec_li_result
else:
organization_rec_li = []
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(organization_rec_li_result)
return organization_rec_li
# ### END ### API Organization Methods ### get_organization_rec_list() ###

View File

@@ -240,3 +240,57 @@ def update_person_obj(
log.debug(person_obj_up_result)
return False
# ### END ### API Person Methods ### update_person_obj() ###
# ### BEGIN ### API Person Methods ### get_person_rec_list() ###
def get_person_rec_list(
for_obj_type: str,
for_obj_id: str,
limit: int = 1000,
enabled: str = 'enabled', # enabled, disabled, all
) -> list|bool:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if for_obj_id := redis_lookup_id_random(record_id_random=for_obj_id, table_name='for_obj_type'): pass
else: return False
data = {}
data[f'{for_obj_type}_id'] = for_obj_id
# data['for_obj_type'] = for_obj_type
sql_obj_type_id = f'`tbl`.{for_obj_type}_id = :{for_obj_type}_id'
if enabled in ['enabled', 'disabled', 'all']:
if enabled == 'enabled':
data['enable'] = True
sql_enabled = f'AND `tbl`.enable = :enable'
elif enabled == 'disabled':
data['enable'] = False
sql_enabled = f'AND `tbl`.enable = :enable'
elif enabled == 'all':
sql_enabled = ''
if limit:
data['limit'] = limit
sql_limit = f'LIMIT :limit'
else:
sql_limit = ''
sql = f"""
SELECT `tbl`.id AS 'person_id', `tbl`.id_random AS 'person_id_random'
FROM `person` AS `tbl`
WHERE
{sql_obj_type_id}
{sql_enabled}
ORDER BY `tbl`.created_on DESC, `tbl`.updated_on DESC
{sql_limit};
"""
if person_rec_li_result := sql_select(data=data, sql=sql, as_list=True):
person_rec_li = person_rec_li_result
else:
person_rec_li = []
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(person_rec_li_result)
return person_rec_li
# ### END ### API Person Methods ### get_person_rec_list() ###

View File

@@ -0,0 +1,92 @@
from __future__ import annotations
import datetime
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator
from app.lib_general import log, logging
from app.db_sql import redis_lookup_id_random, sql_insert_or_update, sql_select
from app.models.product_models import Product_Base
# ### BEGIN ### API Product Methods ### load_product_obj() ###
def load_product_obj(
product_id: int|str,
limit: int = 1000,
model_as_dict: bool = False,
) -> Product_Base:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if product_id := redis_lookup_id_random(record_id_random=product_id, table_name='product'): pass
else: return False
if product_rec := sql_select(table_name='v_product', record_id=product_id): pass
else: return False
try:
product_obj = Product_Base(**product_rec)
log.debug(product_obj)
except ValidationError as e:
log.error(e.json())
if model_as_dict:
return product_obj.dict(by_alias=True, exclude_unset=True) # pylint: disable=no-member
else:
return product_obj
# ### END ### API Product Methods ### load_product_obj() ###
# ### BEGIN ### API Product Methods ### get_product_rec_list() ###
def get_product_rec_list(
for_obj_type: str,
for_obj_id: str,
limit: int = 1000,
enabled: str = 'enabled', # enabled, disabled, all
) -> list|bool:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if for_obj_id := redis_lookup_id_random(record_id_random=for_obj_id, table_name='for_obj_type'): pass
else: return False
data = {}
data[f'{for_obj_type}_id'] = for_obj_id
# data['for_obj_type'] = for_obj_type
sql_obj_type_id = f'`tbl`.{for_obj_type}_id = :{for_obj_type}_id'
if enabled in ['enabled', 'disabled', 'all']:
if enabled == 'enabled':
data['enable'] = True
sql_enabled = f'AND `tbl`.enable = :enable'
elif enabled == 'disabled':
data['enable'] = False
sql_enabled = f'AND `tbl`.enable = :enable'
elif enabled == 'all':
sql_enabled = ''
if limit:
data['limit'] = limit
sql_limit = f'LIMIT :limit'
else:
sql_limit = ''
sql = f"""
SELECT `tbl`.id AS 'product_id', `tbl`.id_random AS 'product_id_random'
FROM `product` AS `tbl`
WHERE
{sql_obj_type_id}
{sql_enabled}
ORDER BY `tbl`.created_on DESC, `tbl`.updated_on DESC
{sql_limit};
"""
if product_rec_li_result := sql_select(data=data, sql=sql, as_list=True):
product_rec_li = product_rec_li_result
else:
product_rec_li = []
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(product_rec_li_result)
return product_rec_li
# ### END ### API Product Methods ### get_product_rec_list() ###

View File

@@ -180,18 +180,18 @@ def load_user_obj(
# NOTE: Including user roles should probably be reviewed
if inc_user_role_list:
if role_rec_li := sql_select(table_name='v_user_role_detail', field_name='user_id', field_value=user_id, as_list=True):
user_rec['role_list'] = role_rec_li
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(role_rec_li)
# user_rec['role_list'] = role_rec_li
user_obj.role_list = role_rec_li
else:
user_rec['role_list'] = None
# user_rec['role_list'] = None
user_obj.role_list = None
log.debug(user_rec)
log.debug(user_obj)
#log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(user_rec)
return user_obj
if model_as_dict:
return user_obj.dict(by_alias=True, exclude_unset=True) # pylint: disable=no-member
else:
return user_obj
# ### END ### API User Methods ### load_user_obj() ###
@@ -299,3 +299,57 @@ def update_user_obj(
log.debug(user_obj_up_result)
return False
# ### END ### API User Methods ### update_user_obj() ###
# ### BEGIN ### API User Methods ### get_user_rec_list() ###
def get_user_rec_list(
for_obj_type: str,
for_obj_id: str,
limit: int = 1000,
enabled: str = 'enabled', # enabled, disabled, all
) -> list|bool:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if for_obj_id := redis_lookup_id_random(record_id_random=for_obj_id, table_name='for_obj_type'): pass
else: return False
data = {}
data[f'{for_obj_type}_id'] = for_obj_id
# data['for_obj_type'] = for_obj_type
sql_obj_type_id = f'`tbl`.{for_obj_type}_id = :{for_obj_type}_id'
if enabled in ['enabled', 'disabled', 'all']:
if enabled == 'enabled':
data['enable'] = True
sql_enabled = f'AND `tbl`.enable = :enable'
elif enabled == 'disabled':
data['enable'] = False
sql_enabled = f'AND `tbl`.enable = :enable'
elif enabled == 'all':
sql_enabled = ''
if limit:
data['limit'] = limit
sql_limit = f'LIMIT :limit'
else:
sql_limit = ''
sql = f"""
SELECT `tbl`.id AS 'user_id', `tbl`.id_random AS 'user_id_random'
FROM `user` AS `tbl`
WHERE
{sql_obj_type_id}
{sql_enabled}
ORDER BY `tbl`.created_on DESC, `tbl`.updated_on DESC
{sql_limit};
"""
if user_rec_li_result := sql_select(data=data, sql=sql, as_list=True):
user_rec_li = user_rec_li_result
else:
user_rec_li = []
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(user_rec_li_result)
return user_rec_li
# ### END ### API User Methods ### get_user_rec_list() ###