# File: OSIT-AE-API-FastAPI/app/routers/api_crud.py (1806 lines, 79 KiB, Python)

import datetime, json, time
#from datetime import datetime, time, timedelta
from fastapi import APIRouter, Body, Depends, Header, HTTPException, Path, Query, Response, status
from fastapi.responses import FileResponse
from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union
from app.lib_general import log, logging, common_route_params, Common_Route_Params, common_route_params_min, Common_Route_Params_Min
# from app.config import settings
from app.db_sql import sql_insert, sql_update, sql_insert_or_update, sql_select, sql_delete, redis_lookup_id_random, lookup_id_random_pop
from app.models.response_models import *
from app.models.api_crud_models import *
from app.models.account_models import *
from app.models.account_cfg_models import *
from app.models.activity_log_models import *
from app.models.address_models import *
from app.models.archive_models import *
from app.models.archive_content_models import *
from app.models.contact_models import *
from app.models.cont_edu_cert_models import *
from app.models.cont_edu_cert_person_models import *
from app.models.data_store_models import *
from app.models.event_models import *
from app.models.event_abstract_models import *
from app.models.event_badge_models import *
from app.models.event_exhibit_models import *
from app.models.event_exhibit_tracking_models import *
from app.models.event_file_models import *
from app.models.event_location_models import *
from app.models.event_person_models import *
from app.models.event_person_tracking_models import *
from app.models.event_presentation_models import *
from app.models.event_presenter_models import *
from app.models.event_registration_models import *
from app.models.event_session_models import *
from app.models.event_track_models import *
from app.models.grant_models import *
from app.models.hosted_file_models import *
from app.models.journal_models import *
from app.models.journal_entry_models import *
from app.models.log_client_viewing_models import Log_Client_Viewing_Base
from app.models.membership_cfg_models import *
from app.models.membership_group_models import *
from app.models.membership_person_group_models import *
from app.models.membership_person_models import *
from app.models.membership_person_profile_models import *
from app.models.membership_type_models import *
from app.models.membership_person_type_models import *
from app.models.order_models import *
from app.models.order_cart_models import *
from app.models.organization_models import *
from app.models.page_models import *
from app.models.person_models import *
from app.models.product_models import *
from app.models.post_models import *
from app.models.post_comment_models import *
from app.models.site_models import *
from app.models.site_domain_models import *
from app.models.sponsorship_cfg_models import *
from app.models.sponsorship_models import *
from app.models.user_models import *
from app.models.user_role_models import *
from app.models.e_stripe_models import *
# Registry of the object types served by the generic CRUD routes in this module.
# The dict key is the object name built from the URL path segments
# (obj_type_l1, obj_type_l1_obj_type_l2 or obj_type_l1_obj_type_l2_obj_type_l3).
# Keys used inside each entry:
#   table_name      - table or view passed to sql_select() by the list handler
#   table_name_alt  - optional alternate view, requested with use_alt_table
#   base_name       - model class instantiated per row to build the response; None returns the raw row
#   base_name_alt   - optional alternate model class, requested with use_alt_base
#   tbl_name_update - presumably the base table used for writes; not read by the list handler (verify against the write handlers)
#   exclude_for_db  - presumably field names dropped before writing to the database (TODO confirm)
# obj_type_kv_li is only initialised here; nothing in this part of the file fills or reads it.
obj_type_kv_li = {} # New 2024-04-23
obj_type_li = {} # Old...
#obj_type_li['cfg_flask'] = {'table_name': 'cfg_flask', 'base_name': Cfg_Flask_Base}
#obj_type_li['api_client_token'] = {'table_name': 'api_client_token', 'base_name': Api_Client_Token_Base}
#obj_type_li['api_key'] = {'table_name': 'api_key', 'base_name': Api_Key_Base}
#obj_type_li['api_token'] = {'table_name': 'api_token', 'base_name': Api_Token_Base}
# ### Core module objects
obj_type_li['account'] = {'table_name': 'account', 'tbl_name_update': 'account', 'base_name': Account_Base}
obj_type_li['account_cfg'] = {'table_name': 'v_account_cfg', 'tbl_name_update': 'account_cfg', 'base_name': Account_Cfg_Base} # NOTE check view name: *_detail?
obj_type_li['activity_log'] = {'table_name': 'activity_log', 'tbl_name_update': 'activity_log', 'base_name': Activity_Log_Base}
obj_type_li['address'] = {'table_name': 'v_address', 'tbl_name_update': 'address', 'base_name': Address_Base}
#obj_type_li['change_log'] = {'table_name': 'change_log', 'tbl_name_update': 'change_log', 'base_name': Change_Log_Base}
obj_type_li['contact'] = {'table_name': 'v_contact', 'tbl_name_update': 'contact', 'base_name': Contact_Base}
obj_type_li['data_store'] = {'table_name': 'v_data_store', 'tbl_name_update': 'data_store', 'base_name': Data_Store_Base}
obj_type_li['hosted_file'] = {'table_name': 'v_hosted_file', 'tbl_name_update': 'hosted_file', 'base_name': Hosted_File_Base}
#obj_type_li['hosted_file_link'] = {'table_name': 'hosted_file_link', 'tbl_name_update': 'hosted_file_link', 'base_name': Hosted_File_Link_Base}
obj_type_li['log_client_viewing'] = {'table_name': 'log_client_viewing', 'tbl_name_update': 'log_client_viewing', 'base_name': Log_Client_Viewing_Base}
obj_type_li['order'] = {'table_name': 'v_order', 'tbl_name_update': 'order', 'base_name': Order_Base}
obj_type_li['order_cart'] = {'table_name': 'v_order_cart', 'tbl_name_update': 'order_cart', 'base_name': Order_Cart_Base}
obj_type_li['order_cart_line'] = {'table_name': 'v_order_cart_line', 'tbl_name_update': 'order_cart_line', 'base_name': Order_Cart_Line_Base}
obj_type_li['order_line'] = {'table_name': 'v_order_line', 'tbl_name_update': 'order_line', 'base_name': Order_Line_Base}
#obj_type_li['order_transaction'] = {'table_name': 'order_transaction', 'tbl_name_update': 'order_transaction', 'base_name': Order_Transaction_Base}
obj_type_li['organization'] = {'table_name': 'v_organization', 'tbl_name_update': 'organization', 'base_name': Organization_Base}
obj_type_li['page'] = {'table_name': 'page', 'tbl_name_update': 'page', 'base_name': Page_Base}
obj_type_li['person'] = {'table_name': 'v_person', 'tbl_name_update': 'person', 'base_name': Person_Base}
obj_type_li['site'] = {'table_name': 'site', 'tbl_name_update': 'site', 'base_name': Site_Base}
obj_type_li['site_domain'] = {'table_name': 'v_site_domain', 'table_name_alt': 'v_site_domain_fqdn_id', 'tbl_name_update': 'site_domain', 'base_name': Site_Domain_Base, 'base_name_alt': Site_Domain_FQDN_ID_Base} # NOTE check view name: *_detail?
obj_type_li['user'] = {'table_name': 'v_user', 'tbl_name_update': 'user', 'base_name': User_Base}
obj_type_li['user_role'] = {'table_name': 'v_user_role', 'tbl_name_update': 'user_role', 'base_name': User_Role_Base} # NOTE check view name: *_detail?
# ### Common shared lookup objects
# The active lookup entries have base_name None, so the list handler returns their rows unmodified.
obj_type_li['lu_country'] = {'table_name': 'lu_country', 'tbl_name_update': 'lu_country', 'base_name': None}
obj_type_li['lu_country_subdivision'] = {'table_name': 'lu_country_subdivision', 'tbl_name_update': 'lu_country_subdivision', 'base_name': None}
#obj_type_li['lu_education_degree'] = {'table_name': 'lu_education_degree', 'tbl_name_update': 'lu_education_degree', 'base_name': Lu_Education_Degree_Base}
#obj_type_li['lu_education_level'] = {'table_name': 'lu_education_level', 'tbl_name_update': 'lu_education_level', 'base_name': Lu_Education_Level_Base}
#obj_type_li['lu_ethnicity'] = {'table_name': 'lu_ethnicity', 'tbl_name_update': 'lu_ethnicity', 'base_name': Lu_Ethnicity_Base}
#obj_type_li['lu_file_purpose'] = {'table_name': 'lu_file_purpose', 'tbl_name_update': 'lu_file_purpose', 'base_name': Lu_File_Purpose_Base}
#obj_type_li['lu_gender'] = {'table_name': 'lu_gender', 'tbl_name_update': 'lu_gender', 'base_name': Lu_Gender_Base}
#obj_type_li['lu_html_color'] = {'table_name': 'lu_html_color', 'tbl_name_update': 'lu_html_color', 'base_name': Lu_Html_Color_Base}
#obj_type_li['lu_media_type'] = {'table_name': 'lu_media_type', 'tbl_name_update': 'lu_media_type', 'base_name': Lu_Media_Type_Base}
#obj_type_li['lu_membership_status'] = {'table_name': 'lu_membership_status', 'tbl_name_update': 'lu_membership_status', 'base_name': Lu_Membership_Status_Base}
#obj_type_li['lu_membership_type'] = {'table_name': 'lu_membership_type', 'tbl_name_update': 'lu_membership_type', 'base_name': Lu_Membership_Type_Base}
#obj_type_li['lu_order_status'] = {'table_name': 'lu_order_status', 'tbl_name_update': 'lu_order_status', 'base_name': Lu_Order_Status_Base}
#obj_type_li['lu_post_topic'] = {'table_name': 'lu_post_topic', 'tbl_name_update': 'lu_post_topic', 'base_name': Lu_Post_Topic_Base}
#obj_type_li['lu_product_type'] = {'table_name': 'lu_product_type', 'tbl_name_update': 'lu_product_type', 'base_name': Lu_Product_Type_Base}
#obj_type_li['lu_pronoun'] = {'table_name': 'lu_pronoun', 'tbl_name_update': 'lu_pronoun', 'base_name': Lu_Pronoun_Base}
#obj_type_li['lu_race'] = {'table_name': 'lu_race', 'tbl_name_update': 'lu_race', 'base_name': Lu_Race_Base}
obj_type_li['lu_time_zone'] = {'table_name': 'v_lu_time_zone', 'tbl_name_update': 'lu_time_zone', 'base_name': None}
#obj_type_li['lu_user_role'] = {'table_name': 'lu_user_role', 'tbl_name_update': 'lu_user_role', 'base_name': Lu_User_Role_Base}
#obj_type_li['lu_user_status'] = {'table_name': 'lu_user_status', 'tbl_name_update': 'lu_user_status', 'base_name': Lu_User_Status_Base}
# ### Additional module objects
# NOTE: 'archive' defines table_name_alt but no base_name_alt.
obj_type_li['archive'] = {'table_name': 'v_archive', 'table_name_alt': 'v_archive_w_content_count', 'tbl_name_update': 'archive', 'base_name': Archive_Base}
obj_type_li['archive_content'] = {'table_name': 'v_archive_content', 'tbl_name_update': 'archive_content', 'base_name': Archive_Content_Base}
obj_type_li['cont_edu_cert'] = {'table_name': 'v_cont_edu_cert', 'tbl_name_update': 'cont_edu_cert', 'base_name': Cont_Edu_Cert_Base}
obj_type_li['cont_edu_cert_person'] = {'table_name': 'v_cont_edu_cert_person', 'tbl_name_update': 'cont_edu_cert_person', 'base_name': Cont_Edu_Cert_Person_Base}
obj_type_li['event'] = {'table_name': 'v_event', 'table_name_alt': 'v_event_w_file_count',
    'tbl_name_update': 'event', 'base_name': Event_Base, 'base_name_alt': Event_Meeting_Flat_Base}
obj_type_li['event_abstract'] = {'table_name': 'v_event_abstract', 'tbl_name_update': 'event_abstract', 'base_name': Event_Abstract_In}
obj_type_li['event_badge'] = {'table_name': 'v_event_badge', 'table_name_alt': 'v_event_badge_only', 'tbl_name_update': 'event_badge', 'base_name': Event_Badge_Base, 'base_name_alt': Event_Badge_Basic_Base}
#obj_type_li['event_badge_log'] = {'table_name': 'event_badge_log', 'tbl_name_update': 'event_badge_log', 'base_name': Event_Badge_Log_Base}
#obj_type_li['event_badge_template'] = {'table_name': 'event_badge_template', 'tbl_name_update': 'event_badge_template', 'base_name': Event_Badge_Template_Base}
#obj_type_li['event_device'] = {'table_name': 'event_device', 'tbl_name_update': 'event_device', 'base_name': Event_Device_Base}
obj_type_li['event_exhibit'] = {'table_name': 'v_event_exhibit', 'tbl_name_update': 'event_exhibit', 'base_name': Event_Exhibit_Base} # NOTE check view name: *_detail?
obj_type_li['event_exhibit_tracking'] = {'table_name': 'v_event_exhibit_tracking', 'tbl_name_update': 'event_exhibit_tracking', 'base_name': Event_Exhibit_Tracking_Base}
# NOTE: Using v_event_file_simple instead of v_event_file because of linking with for_type and for_id versus event_id, event_session_id, event_presenter_id, etc. 2022-08-19
# NOTE: This will not pull in linked to details like a session name, presentation time, or presenter name.
# NOTE(review): tbl_name_update is 'event_file_simple' here (the view name minus 'v_'); confirm that a writable table of that name exists.
obj_type_li['event_file'] = {'table_name': 'v_event_file_simple', 'tbl_name_update': 'event_file_simple', 'base_name': Event_File_Base} # Should this eventually be changed to event_hosted_file
obj_type_li['event_location'] = {'table_name': 'v_event_location', 'tbl_name_update': 'event_location', 'base_name': Event_Location_Base}
obj_type_li['event_person'] = {'table_name': 'v_event_person', 'tbl_name_update': 'event_person', 'base_name': Event_Person_Base}
obj_type_li['event_person_tracking'] = {'table_name': 'v_event_person_tracking', 'tbl_name_update': 'event_person_tracking', 'base_name': Event_Person_Tracking_Base}
obj_type_li['event_presentation'] = {'table_name': 'v_event_presentation', 'tbl_name_update': 'event_presentation', 'base_name': Event_Presentation_Base}
obj_type_li['event_presenter'] = {'table_name': 'v_event_presenter', 'tbl_name_update': 'event_presenter', 'base_name': Event_Presenter_Base}
obj_type_li['event_registration'] = {'table_name': 'v_event_registration', 'tbl_name_update': 'event_registration', 'base_name': Event_Registration_Base}
# 'event_session' is the only entry in this part of the file that carries an 'exclude_for_db' set.
obj_type_li['event_session'] = {'table_name': 'v_event_session', 'tbl_name_update': 'event_session', 'base_name': Event_Session_Base, 'exclude_for_db': {'poc_person_id', 'file_count', 'internal_use_count', 'enable_from', 'enable_to', 'event_name', 'event_start_datetime', 'event_end_datetime', 'event_location_name', 'event_track_name', 'event_abstract_list', 'event_badge_list', 'event_device_list', 'event_file_list', 'event_file_internal_use_list', 'event_location', 'event_location_list', 'event_person_list', 'event_presenter_cat', 'event_presentation_list', 'event_presenter_list', 'event_track', 'poc_event_person'}}
obj_type_li['event_track'] = {'table_name': 'v_event_track', 'tbl_name_update': 'event_track', 'base_name': Event_Track_Base}
obj_type_li['grant'] = {'table_name': 'v_grant', 'tbl_name_update': 'grant', 'base_name': Grant_Base}
obj_type_li['journal'] = {'table_name': 'v_journal', 'tbl_name_update': 'journal', 'base_name': Journal_Base}
obj_type_li['journal_entry'] = {'table_name': 'v_journal_entry', 'tbl_name_update': 'journal_entry', 'base_name': Journal_Entry_Base}
#obj_type_li['log'] = {'table_name': 'log', 'tbl_name_update': 'log', 'base_name': Log_Base} #'v_log'
obj_type_li['membership_cfg'] = {'table_name': 'v_membership_cfg', 'tbl_name_update': 'membership_cfg', 'base_name': Membership_Cfg_Base}
obj_type_li['membership_group'] = {'table_name': 'v_membership_group', 'tbl_name_update': 'membership_group', 'base_name': Membership_Group_Base}
obj_type_li['membership_person_group'] = {'table_name': 'v_membership_person_group', 'tbl_name_update': 'membership_person_group', 'base_name': Membership_Person_Group_Base}
obj_type_li['membership_person'] = {'table_name': 'v_membership_person', 'tbl_name_update': 'membership_person', 'base_name': Membership_Person_Base}
obj_type_li['membership_person_profile'] = {'table_name': 'v_membership_person_profile', 'tbl_name_update': 'membership_person_profile', 'base_name': Membership_Person_Profile_Base}
obj_type_li['membership_type'] = {'table_name': 'v_membership_type', 'tbl_name_update': 'membership_type', 'base_name': Membership_Type_Base}
obj_type_li['membership_person_type'] = {'table_name': 'v_membership_person_type', 'tbl_name_update': 'membership_person_type', 'base_name': Membership_Person_Type_Base}
#obj_type_li['message'] = {'table_name': 'message', 'tbl_name_update': 'message', 'base_name': Message_Base} #'v_message'
obj_type_li['post'] = {'table_name': 'v_post', 'tbl_name_update': 'post', 'base_name': Post_Base} # NOTE check view name: *_detail?
obj_type_li['post_comment'] = {'table_name': 'v_post_comment', 'tbl_name_update': 'post_comment', 'base_name': Post_Comment_Base} # NOTE check view name: *_detail?
obj_type_li['product'] = {'table_name': 'v_product', 'tbl_name_update': 'product', 'base_name': Product_Base}
# Registry entry for the 'sponsorship' object type.
# In addition to the usual keys it carries 'tbl' and 'mdl', which duplicate
# table_name and base_name (presumably a newer key naming being trialled; TODO confirm).
# The literal used to list the 'tbl' key twice with the same value. Python keeps only the
# last duplicate, so dropping one leaves the resulting dict unchanged.
# NOTE(review): if the second 'tbl' was meant to be a different key, add it explicitly.
obj_type_li['sponsorship'] = {'table_name': 'v_sponsorship', 'tbl_name_update': 'sponsorship', 'base_name': Sponsorship_Base, 'tbl': 'v_sponsorship', 'mdl': Sponsorship_Base} # NOTE check view name: *_detail?
obj_type_li['sponsorship_cfg'] = {'table_name': 'v_sponsorship_cfg', 'tbl_name_update': 'sponsorship_cfg', 'base_name': Sponsorship_Cfg_Base}
#obj_type_li['stripe_customer'] = {'table_name': 'stripe_customer', 'tbl_name_update': 'stripe_customer', 'base_name': Stripe_Customer_Base}
# NOTE: stripe_log is registered with the *_Base_In model rather than a *_Base model.
obj_type_li['stripe_log'] = {'table_name': 'stripe_log', 'tbl_name_update': 'stripe_log', 'base_name': Stripe_Log_Base_In}
# obj_type_li['c_idda_membership_person_profile'] = {'table_name': 'c_idda_membership_person_profile', 'base_name': C_Idda_membership_person_profile_Base}
# obj_type_li['c_osit_demo_membership_person_profile'] = {'table_name': 'c_osit_demo_membership_person_profile', 'base_name': C_Osit_Demo_membership_person_profile_Base}
# Router that collects the generic CRUD routes declared below.
router = APIRouter()
# Split API CRUD functions for FastAPI 0.95.1 - STI 2024-04-26
# ### BEGIN ### API CRUD ### get_obj_li_lx() ###
# Updated 2024-04-26
@router.get('/{obj_type_l1}/list')
async def get_obj_li_l1(
    obj_type_l1: str = Path(min_length=2, max_length=50),
    for_obj_type: Optional[str] = Query(None, max_length=50),
    for_obj_id: Optional[str] = Query(None, max_length=22),
    use_alt_table: bool = False, # NOTE: This will use table_name_alt if they exist. -2023-11-17
    use_alt_base: bool = False, # NOTE: This will use base_name_alt if they exist. -2023-11-17
    hidden: str = 'not_hidden', # hidden, not_hidden, all,
    # order_by_li: dict = None,
    order_by_li: str = Header(None), # JSON formatted string in a key value format. It is not ideal that this is in the header. Need a better option, but this is currently a GET request.
    # "jp" is a URL-encoded JSON object holding optional list filters. The handler reads the keys ft_qry, and_qry, and_like and and_in_li.
    jp: Optional[Union[str, None]] = None,
    file_type: str = 'CSV', # CSV, Excel
    return_file: Optional[bool] = False,
    commons: Common_Route_Params = Depends(common_route_params),
):
    """List records of an object type named by one path segment (for example ``person``).

    All parameters are passed unchanged to ``handle_get_obj_li``, which validates
    the object name against ``obj_type_li``, runs the query and builds the response.

    - **for_obj_type / for_obj_id**: when both are given, restrict the list to rows
      whose ``<for_obj_type>_id`` column matches the looked-up id.
    - **hidden**: ``hidden``, ``not_hidden`` or ``all``.
    - **order_by_li**: JSON string sent as a request header.
    - **jp**: URL-encoded JSON object with optional filter keys.
    - **return_file / file_type**: return the list as a ``CSV`` or ``Excel`` download.
    """
    # NOTE(review): setLevel changes the level of the shared `log` object, not just for this request.
    # With the level at INFO the debug call below emits nothing.
    log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())
    # ### SECTION ### Call generic function to get the list of objects
    return handle_get_obj_li(
        obj_type_l1=obj_type_l1,
        for_obj_type=for_obj_type,
        for_obj_id=for_obj_id,
        use_alt_table=use_alt_table,
        use_alt_base=use_alt_base,
        hidden=hidden,
        order_by_li=order_by_li,
        jp=jp,
        file_type=file_type,
        return_file=return_file,
        commons=commons,
    )
@router.get('/{obj_type_l1}/{obj_type_l2}/list')
async def get_obj_li_l2(
    obj_type_l1: str = Path(min_length=2, max_length=50),
    obj_type_l2: str = Path(min_length=2, max_length=50),
    for_obj_type: Optional[str] = Query(None, max_length=50),
    for_obj_id: Optional[str] = Query(None, max_length=22),
    use_alt_table: bool = False, # NOTE: This will use table_name_alt if they exist. -2023-11-17
    use_alt_base: bool = False, # NOTE: This will use base_name_alt if they exist. -2023-11-17
    hidden: str = 'not_hidden', # hidden, not_hidden, all,
    # order_by_li: dict = None,
    order_by_li: str = Header(None), # JSON formatted string in a key value format. It is not ideal that this is in the header. Need a better option, but this is currently a GET request.
    # "jp" is a URL-encoded JSON object holding optional list filters. The handler reads the keys ft_qry, and_qry, and_like and and_in_li.
    jp: Optional[Union[str, None]] = None,
    file_type: str = 'CSV', # CSV, Excel
    return_file: Optional[bool] = False,
    commons: Common_Route_Params = Depends(common_route_params),
):
    """List records of an object type named by two path segments (for example ``event/session``).

    The two segments are joined with ``_`` by ``handle_get_obj_li`` to form the
    ``obj_type_li`` key. All parameters are passed unchanged to that handler.

    - **for_obj_type / for_obj_id**: when both are given, restrict the list to rows
      whose ``<for_obj_type>_id`` column matches the looked-up id.
    - **hidden**: ``hidden``, ``not_hidden`` or ``all``.
    - **order_by_li**: JSON string sent as a request header.
    - **jp**: URL-encoded JSON object with optional filter keys.
    - **return_file / file_type**: return the list as a ``CSV`` or ``Excel`` download.
    """
    # NOTE(review): setLevel changes the level of the shared `log` object, not just for this request.
    # With the level at INFO the debug call below emits nothing.
    log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())
    # ### SECTION ### Call generic function to get the list of objects
    return handle_get_obj_li(
        obj_type_l1=obj_type_l1,
        obj_type_l2=obj_type_l2,
        for_obj_type=for_obj_type,
        for_obj_id=for_obj_id,
        use_alt_table=use_alt_table,
        use_alt_base=use_alt_base,
        hidden=hidden,
        order_by_li=order_by_li,
        jp=jp,
        file_type=file_type,
        return_file=return_file,
        commons=commons,
    )
@router.get('/{obj_type_l1}/{obj_type_l2}/{obj_type_l3}/list')
async def get_obj_li_l3(
    obj_type_l1: str = Path(min_length=2, max_length=50),
    obj_type_l2: str = Path(min_length=2, max_length=50),
    obj_type_l3: str = Path(min_length=2, max_length=50),
    for_obj_type: Optional[str] = Query(None, max_length=50),
    for_obj_id: Optional[str] = Query(None, max_length=22),
    use_alt_table: bool = False, # NOTE: This will use table_name_alt if they exist. -2023-11-17
    use_alt_base: bool = False, # NOTE: This will use base_name_alt if they exist. -2023-11-17
    hidden: str = 'not_hidden', # hidden, not_hidden, all,
    # order_by_li: dict = None,
    order_by_li: str = Header(None), # JSON formatted string in a key value format. It is not ideal that this is in the header. Need a better option, but this is currently a GET request.
    # "jp" is a URL-encoded JSON object holding optional list filters. The handler reads the keys ft_qry, and_qry, and_like and and_in_li.
    jp: Optional[Union[str, None]] = None,
    file_type: str = 'CSV', # CSV, Excel
    return_file: Optional[bool] = False,
    commons: Common_Route_Params = Depends(common_route_params),
):
    """List records of an object type named by three path segments (for example ``event/person/tracking``).

    The three segments are joined with ``_`` by ``handle_get_obj_li`` to form the
    ``obj_type_li`` key. All parameters are passed unchanged to that handler.
    Unlike the one- and two-segment list routes, this route does not touch the
    logger before delegating.

    - **for_obj_type / for_obj_id**: when both are given, restrict the list to rows
      whose ``<for_obj_type>_id`` column matches the looked-up id.
    - **hidden**: ``hidden``, ``not_hidden`` or ``all``.
    - **order_by_li**: JSON string sent as a request header.
    - **jp**: URL-encoded JSON object with optional filter keys.
    - **return_file / file_type**: return the list as a ``CSV`` or ``Excel`` download.
    """
    # ### SECTION ### Call generic function to get the list of objects
    return handle_get_obj_li(
        obj_type_l1=obj_type_l1,
        obj_type_l2=obj_type_l2,
        obj_type_l3=obj_type_l3,
        for_obj_type=for_obj_type,
        for_obj_id=for_obj_id,
        use_alt_table=use_alt_table,
        use_alt_base=use_alt_base,
        hidden=hidden,
        order_by_li=order_by_li,
        jp=jp,
        file_type=file_type,
        return_file=return_file,
        commons=commons,
    )
def _resolve_obj_name(obj_type_l1, obj_type_l2=None, obj_type_l3=None):
    """Build the ``obj_type_li`` key from the URL path segments.

    The segments are joined with ``_`` (``l1``, ``l1_l2`` or ``l1_l2_l3``). The
    third segment is only used when the second one is present. Returns None
    when the first segment is empty.
    """
    if not obj_type_l1:
        return None
    parts = [obj_type_l1]
    if obj_type_l2:
        parts.append(obj_type_l2)
        if obj_type_l3:
            parts.append(obj_type_l3)
    return '_'.join(parts)


def _flatten_json_fields_for_export(record):
    """Return a copy of ``record`` with every ``*_json`` column expanded into flat columns.

    Each key of the JSON value becomes a new column named ``<prefix>__<key>``.
    For a column ending in ``li_json`` the value may be a dict or a list of
    dicts; later list items overwrite earlier ones that share a key. The
    original ``*_json`` column is always removed, including when it is empty.
    """
    flat_record = record.copy()
    for field_name in record.keys():
        if not field_name.endswith('_json'):
            continue
        field_value = record[field_name]
        if field_value:
            if field_name.endswith('li_json'):
                # NOTE(review): the test matches the 7 characters 'li_json' but 8 are stripped,
                # so the column is assumed to end in '_li_json'.
                prefix = field_name[:-8]
                items = field_value if isinstance(field_value, list) else [field_value]
            else:
                prefix = field_name[:-5]
                items = [field_value]
            log.info(f'Expanding JSON field for export: {field_name}')
            for item in items:
                for key, value in item.items():
                    flat_record[prefix + '__' + key] = value
        else:
            log.info(f'Found a field that ends with li_json or _json but no value: {field_name}')
        flat_record.pop(field_name) # Remove the original field
    return flat_record


def handle_get_obj_li(
    obj_type_l1: str,
    obj_type_l2: Optional[str] = None,
    obj_type_l3: Optional[str] = None,
    for_obj_type: Optional[str] = None,
    for_obj_id: Optional[str] = None,
    use_alt_table: bool = False,
    use_alt_base: bool = False,
    hidden: str = 'not_hidden',
    order_by_li: Optional[Union[str, None]] = None,
    jp: Optional[Union[str, None]] = None,
    file_type: str = 'CSV',
    return_file: Optional[bool] = False,
    commons: Common_Route_Params = None,
):
    """Shared implementation of the ``.../list`` routes.

    Resolves the object type from the path segments, queries its table or view
    through ``sql_select`` and returns either a JSON response or an export file.

    Args:
        obj_type_l1, obj_type_l2, obj_type_l3: URL path segments; joined with
            ``_`` they must form a key of ``obj_type_li``.
        for_obj_type, for_obj_id: when both are set, ``for_obj_id`` is resolved
            through ``redis_lookup_id_random`` and the list is filtered on the
            ``<for_obj_type>_id`` column.
        use_alt_table: use the entry's ``table_name_alt`` when it defines one,
            otherwise fall back to ``table_name``.
        use_alt_base: use the entry's ``base_name_alt`` when it defines one,
            otherwise fall back to ``base_name``.
        hidden: passed through to ``sql_select`` (``hidden``, ``not_hidden``, ``all``).
        order_by_li: JSON string; the decoded value is passed to ``sql_select``.
        jp: URL-encoded JSON object. The keys ``ft_qry``, ``and_qry``,
            ``and_like`` and ``and_in_li`` are passed to ``sql_select`` as the
            fulltext, AND, AND LIKE and AND IN filters.
        file_type: ``'CSV'`` or ``'Excel'``; only used when ``return_file`` is true.
        return_file: return the rows as a downloadable file instead of JSON.
        commons: shared route parameters (response, enabled, limit, offset,
            by_alias, exclude_unset). Must be provided by the caller.

    Returns:
        The result of ``mk_resp`` with status 400 (unknown object type,
        malformed ``jp`` / ``order_by_li``, unsupported ``file_type``), 404 (no
        rows), 500 (export file could not be produced) or 501 (query result is
        not a list); otherwise the ``mk_resp`` list response or a
        ``FileResponse`` for exports.
    """
    # NOTE(review): every setLevel call in this function changes the level of the shared
    # `log` object, so the last level set stays in effect after the request returns.
    log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # Import the submodule explicitly: a bare `import urllib` does not guarantee that
    # `urllib.parse` has been loaded.
    import urllib.parse

    # Filters taken from the optional "jp" JSON object; each stays None when absent or empty.
    fulltext_qry_dict_obj = None # fields/values to search for using FULLTEXT
    and_qry_dict_obj = None # fields/values to search for using AND
    and_like_dict_obj = None # fields/values to search for using AND LIKE
    and_in_dict_li_obj = None # fields/values to search for using AND IN
    jp_obj = None
    if jp:
        log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
        jp_decoded = urllib.parse.unquote(jp)
        log.debug(jp_decoded)
        try:
            jp_obj = json.loads(jp_decoded)
        except (TypeError, ValueError, RecursionError) as e:
            log.warning(e)
            return mk_resp(data=False, status_code=400, response=commons.response, status_message='The JSON string was not formatted correctly.')
        if not isinstance(jp_obj, dict):
            # Valid JSON that is not an object (list, number, null, ...) has no filter keys to read.
            log.warning(f'jp did not decode to a JSON object: {type(jp_obj).__name__}')
            return mk_resp(data=False, status_code=400, response=commons.response, status_message='The JSON string was not formatted correctly.')
        log.info(jp_obj)
        fulltext_qry_dict_obj = jp_obj.get('ft_qry') or None
        and_qry_dict_obj = jp_obj.get('and_qry') or None
        and_like_dict_obj = jp_obj.get('and_like') or None
        and_in_dict_li_obj = jp_obj.get('and_in_li') or None

    log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL

    if order_by_li:
        try:
            order_by_li = json.loads(order_by_li)
        except (TypeError, ValueError, RecursionError) as e:
            # The value comes from a request header, so treat malformed JSON as a client error.
            log.warning(e)
            return mk_resp(data=False, status_code=400, response=commons.response, status_message='The order_by_li JSON string was not formatted correctly.')

    log.debug({
        'obj_type_l1': obj_type_l1,
        'obj_type_l2': obj_type_l2,
        'obj_type_l3': obj_type_l3,
        'for_obj_type': for_obj_type,
        'for_obj_id': for_obj_id,
        'use_alt_table': use_alt_table,
        'use_alt_base': use_alt_base,
        'jp_obj': jp_obj,
        'hidden': hidden,
        'order_by_li': order_by_li,
    })

    # ### SECTION ### Resolve the object type
    obj_name = _resolve_obj_name(obj_type_l1, obj_type_l2, obj_type_l3)
    if obj_name is None:
        log.warning('We should not be here')
        return mk_resp(data=False, status_code=400, response=commons.response)
    if obj_name not in obj_type_li:
        return mk_resp(data=False, status_code=400, response=commons.response)
    obj_cfg = obj_type_li[obj_name]

    # Most entries define no *_alt keys, so fall back to the primary table/model
    # instead of raising KeyError when the alternate is requested.
    table_name = obj_cfg['table_name']
    if use_alt_table:
        table_name = obj_cfg.get('table_name_alt', table_name)
    base_name = obj_cfg['base_name']
    if use_alt_base:
        base_name = obj_cfg.get('base_name_alt', base_name)

    # ### SECTION ### Query
    # NOTE: The enabled and hidden parameters are new to this endpoint and the sql_select function! -2023-07-06
    select_kwargs = {
        'table_name': table_name,
        'enabled': commons.enabled,
        'hidden': hidden,
        'fulltext_qry_dict': fulltext_qry_dict_obj,
        'and_qry_dict': and_qry_dict_obj,
        'and_like_dict': and_like_dict_obj,
        'and_in_dict_li': and_in_dict_li_obj,
        'order_by_li': order_by_li,
        'limit': commons.limit,
        'offset': commons.offset,
        'as_list': True,
    }
    if for_obj_type and for_obj_id:
        # NOTE(review): for_obj_type is client input and is used as a table name and to build
        # a column name; confirm that redis_lookup_id_random / sql_select validate identifiers.
        for_obj_id = redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type)
        log.debug(f'for_obj_type: {for_obj_type}')
        log.debug(f'for_obj_id: {for_obj_id}')
        select_kwargs['field_name'] = f'{for_obj_type}_id'
        select_kwargs['field_value'] = for_obj_id
    # NOTE: The call without field_name / field_value may need more testing.
    sql_result = sql_select(**select_kwargs)
    log.debug(sql_result)

    if not sql_result:
        return mk_resp(data=None, response=commons.response, status_code=404)
    if not isinstance(sql_result, list):
        status_message='Not Implemented (sort of). Attempted to process this request. Got a SQL result, but the returned data was unexpected.'
        return mk_resp(data=sql_result, response=commons.response, status_code=501, status_message=status_message) # Returns "Not Implemented" (sort of... unexpected response)

    # ### SECTION ### Serialize the rows
    log.setLevel(logging.DEBUG)
    if base_name:
        log.info(f'base_name was found. Returning data using {base_name} model.')
        resp_data_li = [
            base_name(**record).dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset)
            for record in sql_result
        ]
    else:
        log.warning('base_name model was not found. Returning raw data.')
        resp_data_li = list(sql_result)

    if not return_file:
        return mk_resp(data=resp_data_li, response=commons.response)

    # ### SECTION ### Export to a file
    file_ext = {'CSV': '.csv', 'Excel': '.xlsx'}.get(file_type)
    if file_ext is None:
        # Previously an unknown file_type left the download filename undefined and crashed.
        log.warning(f'Unsupported export file_type: {file_type}')
        return mk_resp(data=False, status_code=400, response=commons.response, status_message='Unsupported file_type. Use CSV or Excel.')

    log.setLevel(logging.DEBUG)
    # Expand *_json columns so the export gets one column per JSON key.
    new_resp_data_li = [_flatten_json_fields_for_export(record) for record in resp_data_li]

    # The filename timestamp is in UTC. now(timezone.utc) replaces the deprecated utcnow().
    current_datetime_utc = datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d_%H%M')
    filename = f'{obj_name}_list_{current_datetime_utc}'
    filename_w_ext = filename + file_ext

    log.setLevel(logging.INFO)
    tmp_file_path = create_export_file(data_dict_list=new_resp_data_li, column_name_li=[], subdir_path=obj_name, filename=filename, rm_id=True, export_type=file_type)
    if not tmp_file_path:
        log.error('Something went wrong while creating or saving the export file')
        return mk_resp(data=False, status_code=500, response=commons.response, status_message='The export file could not be created.')
    log.info(f'Export file created and saved: {tmp_file_path}')
    log.info(f'Filename: {filename_w_ext}')

    full_tmp_path = return_full_tmp_path(full_tmp_path=tmp_file_path)
    if not full_tmp_path:
        # Previously this case fell off the end of the function and returned None.
        log.error(f'Could not resolve the export file path: {tmp_file_path}')
        return mk_resp(data=False, status_code=500, response=commons.response, status_message='The export file could not be located.')
    return FileResponse(path=full_tmp_path, filename=filename_w_ext)
# ### BEGIN ### API CRUD ### get_obj_lx() ###
# Updated 2023-11-03
@router.get('/{obj_type_l1}/{obj_id}')
async def get_obj_l1(
    # NOTE(review): obj_type_l1 and obj_id are path parameters, so the =None defaults are presumably never used; confirm with the FastAPI version in use.
    obj_type_l1: str=None,
    obj_id: str=None,
    use_alt_table: bool = False, # NOTE: This will use table_name_alt if they exist. -2023-12-01
    use_alt_base: bool = False, # NOTE: This will use base_name_alt if they exist. -2023-12-01
    # for_obj_type: Optional[str] = Query(None, max_length=50), # NOTE: This is not currently used. It is here for future use.
    # for_obj_id: Optional[str] = Query(None, max_length=22), # NOTE: This is not currently used. It is here for future use.
    # qry_str: Optional[str] = Query(None, max_length=50),
    # qry_int: Optional[int] = None,
    # include: Optional[list] = [],
    # exclude: Optional[list] = [],
    # exclude_none: Optional[bool] = True,
    commons: Common_Route_Params = Depends(common_route_params),
):
    """Get a single record of an object type named by one path segment.

    This route is declared after the ``.../list`` routes, so a request for
    ``/{obj_type_l1}/list`` is handled by the list route and not treated as an
    ``obj_id`` of ``list``. All parameters are passed unchanged to
    ``handle_get_obj_id``.
    """
    # ### SECTION ### Call generic function to get the object
    return handle_get_obj_id(
        obj_type_l1=obj_type_l1,
        obj_id=obj_id,
        use_alt_table=use_alt_table,
        use_alt_base=use_alt_base,
        commons=commons,
    )
@router.get('/{obj_type_l1}/{obj_type_l2}/{obj_id}')
async def get_obj_l2(
    # NOTE(review): the three leading parameters are path parameters, so the =None defaults are presumably never used; confirm with the FastAPI version in use.
    obj_type_l1: str=None,
    obj_type_l2: str=None,
    obj_id: str=None,
    use_alt_table: bool = False, # NOTE: This will use table_name_alt if they exist. -2023-12-01
    use_alt_base: bool = False, # NOTE: This will use base_name_alt if they exist. -2023-12-01
    # for_obj_type: Optional[str] = Query(None, max_length=50), # NOTE: This is not currently used. It is here for future use.
    # for_obj_id: Optional[str] = Query(None, max_length=22), # NOTE: This is not currently used. It is here for future use.
    # qry_str: Optional[str] = Query(None, max_length=50),
    # qry_int: Optional[int] = None,
    # include: Optional[list] = [],
    # exclude: Optional[list] = [],
    # exclude_none: Optional[bool] = True,
    commons: Common_Route_Params = Depends(common_route_params),
):
    """Get a single record of an object type named by two path segments.

    This route is declared after the ``.../list`` routes, so
    ``/{obj_type_l1}/{obj_type_l2}/list`` is handled by the list route.
    All parameters are passed unchanged to ``handle_get_obj_id``.
    """
    # ### SECTION ### Call generic function to get the object
    return handle_get_obj_id(
        obj_type_l1=obj_type_l1,
        obj_type_l2=obj_type_l2,
        obj_id=obj_id,
        use_alt_table=use_alt_table,
        use_alt_base=use_alt_base,
        commons=commons,
    )
@router.get('/{obj_type_l1}/{obj_type_l2}/{obj_type_l3}/{obj_id}')
async def get_obj_l3(
    obj_type_l1: str=None,
    obj_type_l2: str=None,
    obj_type_l3: str=None,
    obj_id: str=None,
    use_alt_table: bool = False,  # NOTE: This will use table_name_alt if they exist. -2023-12-01
    use_alt_base: bool = False,  # NOTE: This will use base_name_alt if they exist. -2023-12-01
    commons: Common_Route_Params = Depends(common_route_params),
):
    # GET route for a three-segment object type (e.g. /order/cart/line/{obj_id}).
    # Reserved for future use (not wired up yet): for_obj_type, for_obj_id,
    # qry_str, qry_int, include, exclude, exclude_none.
    # All of the work is delegated to the shared handler.
    lookup_kwargs = dict(
        obj_type_l1=obj_type_l1,
        obj_type_l2=obj_type_l2,
        obj_type_l3=obj_type_l3,
        obj_id=obj_id,
        use_alt_table=use_alt_table,
        use_alt_base=use_alt_base,
        commons=commons,
    )
    return handle_get_obj_id(**lookup_kwargs)
def handle_get_obj_id(
    obj_type_l1: str,
    obj_type_l2: Optional[str] = None,
    obj_type_l3: Optional[str] = None,
    obj_id: str = None,
    use_alt_table: bool = False,
    use_alt_base: bool = False,
    commons: Common_Route_Params = None,
):
    """
    Simple select object type with an ID:
    - **obj_type_l1, obj_type_l2, obj_type_l3**:
        - Examples:
            - /account = account
            - /user = user
            - /user/role = user_role
            - /event = event
            - /event/exhibit = event_exhibit
            - /order = order
            - /order/cart = order_cart
            - /order/cart/line = order_cart_line
            - /lu/some_lookup = lu_some_lookup

    The URL segments are joined with "_" to form the key looked up in
    obj_type_li. The record is selected by its random ID (obj_id).

    - **use_alt_table**: use the entry's table_name_alt when it exists,
      otherwise fall back to table_name.
    - **use_alt_base**: use the entry's base_name_alt when it exists,
      otherwise fall back to base_name.

    Returns the result of mk_resp():
    - 400 when the object type is unknown or has no table configured
    - 404 when the SQL SELECT returns nothing
    - otherwise the record, dumped through the base_name model when one is
      configured, or the raw SQL result when it is not
    """
    log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # ### SECTION ### Build the object name from the URL segments
    # The precedence matches the routes: l3 is only honored when l2 is also set.
    if obj_type_l1 and obj_type_l2 and obj_type_l3:
        obj_name = f'{obj_type_l1}_{obj_type_l2}_{obj_type_l3}'
    elif obj_type_l1 and obj_type_l2:
        obj_name = f'{obj_type_l1}_{obj_type_l2}'
    elif obj_type_l1:
        obj_name = f'{obj_type_l1}'
    else:
        log.warning('We should not be here')
        return mk_resp(data=False, status_code=400, response=commons.response)

    if obj_name not in obj_type_li:
        return mk_resp(data=False, status_code=400, response=commons.response)
    obj_cfg = obj_type_li[obj_name]

    # ### SECTION ### Pick the table and model
    # The alt entries are optional. Indexing them directly raised a KeyError
    # (HTTP 500) for object types that do not define them, so fall back to the
    # primary entry instead.
    table_name = (obj_cfg.get('table_name_alt') if use_alt_table else None) or obj_cfg.get('table_name')
    base_name = (obj_cfg.get('base_name_alt') if use_alt_base else None) or obj_cfg.get('base_name')

    if not table_name:
        log.error(f'No table_name is configured for obj_name={obj_name}. Returning False.')
        return mk_resp(data=False, status_code=400, response=commons.response)

    # ### SECTION ### Select the record
    # NOTE: Add a check for the object ID... assuming it is a random ID string for now.
    sql_result = sql_select(table_name=table_name, record_id_random=obj_id)
    log.debug(sql_result)
    if not sql_result:
        return mk_resp(data=False, status_code=404, response=commons.response)

    if base_name:
        log.info(f'base_name was found. Returning data using {base_name} model.')
        resp_data = base_name(**sql_result).dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset)
    else:
        log.info('base_name model was not found. Returning raw data.')
        resp_data = sql_result
    return mk_resp(data=resp_data, response=commons.response)
# ### BEGIN ### API CRUD ### patch_obj_lx() ###
# Updated 2024-03-08
@router.patch('/{obj_type_l1}/{obj_id}')
async def patch_obj_l1(
    crud: Api_Crud_Base,
    obj_type_l1: str = Path(min_length=2, max_length=50),
    obj_id: str = Path(min_length=11, max_length=22),
    run_safety_check: bool = True,
    # The view name will be prefixed with "v_" and must be a valid view name based on the object type name from the URL. obj_type_l1, obj_type_l2, obj_type_l3 combined below as obj_name
    return_obj: Optional[bool] = True,  # When False, only the IDs and request data are returned.
    obj_v_name: Optional[str] = None,  # Use view name to help return the object type. -2024-03-08
    commons: Common_Route_Params = Depends(common_route_params),
):
    # PATCH route for a one-segment object type (e.g. /account/{obj_id}).
    # Reserved for future use (not wired up yet): for_obj_type, for_obj_id.
    # ### SECTION ### Call generic function to patch the object
    # return_obj and obj_v_name are forwarded here, as patch_obj_l3 does.
    # They were previously accepted by this route but silently ignored.
    return handle_patch_obj(
        crud=crud,
        obj_type_l1=obj_type_l1,
        obj_id=obj_id,
        return_obj=return_obj,
        obj_v_name=obj_v_name,
        run_safety_check=run_safety_check,
        commons=commons,
    )
@router.patch('/{obj_type_l1}/{obj_type_l2}/{obj_id}')
async def patch_obj_l2(
    crud: Api_Crud_Base,
    obj_type_l1: str = Path(min_length=2, max_length=50),
    obj_type_l2: str = Path(min_length=2, max_length=50),
    obj_id: str = Path(min_length=11, max_length=22),
    run_safety_check: bool = True,
    # The view name will be prefixed with "v_" and must be a valid view name based on the object type name from the URL. obj_type_l1, obj_type_l2, obj_type_l3 combined below as obj_name
    return_obj: Optional[bool] = True,  # When False, only the IDs and request data are returned.
    obj_v_name: Optional[str] = None,  # Use view name to help return the object type. -2024-03-08
    commons: Common_Route_Params = Depends(common_route_params),
):
    # PATCH route for a two-segment object type (e.g. /user/role/{obj_id}).
    # Reserved for future use (not wired up yet): for_obj_type, for_obj_id.
    # ### SECTION ### Call generic function to patch the object
    # return_obj and obj_v_name are forwarded here, as patch_obj_l3 does.
    # They were previously accepted by this route but silently ignored.
    return handle_patch_obj(
        crud=crud,
        obj_type_l1=obj_type_l1,
        obj_id=obj_id,
        obj_type_l2=obj_type_l2,
        return_obj=return_obj,
        obj_v_name=obj_v_name,
        run_safety_check=run_safety_check,
        commons=commons,
    )
@router.patch('/{obj_type_l1}/{obj_type_l2}/{obj_type_l3}/{obj_id}')
async def patch_obj_l3(
    crud: Api_Crud_Base,
    obj_type_l1: str = Path(min_length=2, max_length=50),
    obj_type_l2: str = Path(min_length=2, max_length=50),
    obj_type_l3: str = Path(min_length=2, max_length=50),
    obj_id: str = Path(min_length=11, max_length=22),
    run_safety_check: bool = True,
    # The view name will be prefixed with "v_" and must be a valid view name based on the object type name from the URL. obj_type_l1, obj_type_l2, obj_type_l3 combined below as obj_name
    return_obj: Optional[bool] = True,
    obj_v_name: Optional[str] = None,  # Use view name to help return the object type. -2024-03-08
    commons: Common_Route_Params = Depends(common_route_params),
):
    # PATCH route for a three-segment object type (e.g. /order/cart/line/{obj_id}).
    # Reserved for future use (not wired up yet): for_obj_type, for_obj_id.
    # Everything is handed to the shared patch handler.
    patch_kwargs = dict(
        crud=crud,
        obj_type_l1=obj_type_l1,
        obj_id=obj_id,
        obj_type_l2=obj_type_l2,
        obj_type_l3=obj_type_l3,
        return_obj=return_obj,
        obj_v_name=obj_v_name,
        run_safety_check=run_safety_check,
        commons=commons,
    )
    return handle_patch_obj(**patch_kwargs)
def handle_patch_obj(
    crud: Api_Crud_Base,
    obj_type_l1: str,
    obj_id: str,
    obj_type_l2: Optional[str] = None,
    obj_type_l3: Optional[str] = None,
    run_safety_check: bool = True,
    return_obj: Optional[bool] = True,
    obj_v_name: Optional[str] = None,
    commons: Common_Route_Params = None,
):
    """
    Simple patch object type with an ID:
    - **obj_type_l1, obj_type_l2, obj_type_l3**:
        - Examples:
            - /account = account
            - /user = user
            - /user/role = user_role
            - /event = event
            - /event/exhibit = event_exhibit
            - /order = order
            - /order/cart = order_cart
            - /order/cart/line = order_cart_line
            - /lu/some_lookup = lu_some_lookup

    - **crud**: request body; crud.data_list holds the fields to update and
      crud.super_key / crud.jwt are used for the access check.
    - **obj_id**: the random ID of the record to update.
    - **run_safety_check**: when True, crud.data_list is passed through the
      object type's base_name model first and only the submitted fields are kept.
    - **return_obj**: when True, the updated record is selected again and
      returned through the base_name model; otherwise only the IDs, the table
      name and the request data are returned.
    - **obj_v_name**: optional view name (with or without the "v_" prefix) to
      select the returned record from. Only letters, digits and "_" are accepted.

    Returns the result of mk_resp():
    - 501 when a JWT is passed (not implemented yet)
    - 400 for a bad key, an unknown object type, an invalid view name, a failed
      model validation, or a failed SQL UPDATE
    - 404 when the ID or record is not found
    """
    import re  # Local import: only needed to validate obj_v_name.

    log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # ### SECTION ### Access check
    # NOTE(review): This is a hard-coded shared secret in source control. It
    # should be moved to configuration and rotated.
    if crud.super_key == 'zp5PtX4zUsI':
        pass
    elif crud.jwt:
        log.warning('JWT was passed')
        return mk_resp(data=False, status_code=501, response=commons.response, status_message='Token access for the API CRUD has not been implemented yet.')
    else:
        log.warning('Access key is missing or incorrect')
        return mk_resp(data=False, status_code=400, response=commons.response)

    # ### SECTION ### Build the object name from the URL segments
    if obj_type_l1 and obj_type_l2 and obj_type_l3:
        obj_name = f'{obj_type_l1}_{obj_type_l2}_{obj_type_l3}'
    elif obj_type_l1 and obj_type_l2:
        obj_name = f'{obj_type_l1}_{obj_type_l2}'
    elif obj_type_l1:
        obj_name = f'{obj_type_l1}'
    else:
        log.warning('We should not be here')
        return mk_resp(data=False, status_code=400, response=commons.response)

    if obj_name not in obj_type_li:
        return mk_resp(data=False, status_code=400, response=commons.response)
    obj_cfg = obj_type_li[obj_name]

    # ### SECTION ### Figure out the table names to use
    # Fall back to table_name when tbl_name_update is not configured, the same
    # way handle_post_obj() does, instead of passing None to the SQL helpers.
    table_name = obj_cfg.get('tbl_name_update') or obj_cfg.get('table_name')
    if not table_name:
        log.error('The table_name was not found. This is a critical error. Returning False.')
        return mk_resp(data=False, status_code=400, response=commons.response, status_message='The table_name was not found. This is a critical error.')

    # Resolve (and validate) the table/view used to return the object BEFORE
    # anything is written, so a bad view name can not leave a half-done request.
    table_name_select = None
    if return_obj:
        if obj_v_name:
            # obj_v_name comes straight from the query string and ends up in
            # the SQL SELECT as a table name. Only allow identifier characters.
            if not re.fullmatch(r'[A-Za-z0-9_]+', obj_v_name):
                log.warning('An invalid view name was passed in obj_v_name')
                return mk_resp(data=False, status_code=400, response=commons.response, status_message='The view name (obj_v_name) is invalid.')
            table_name_select = f'v_{obj_v_name}'
            # Remove an extra "v_" if it is there.
            if table_name_select.startswith('v_v_'):
                table_name_select = table_name_select[2:]
            # We can do more later... like check against an allowed list of view names per Aether object type
        else:
            # Use the table_name as a backup option. It is also passed through the model.
            table_name_select = obj_cfg['table_name']

    # ### SECTION ### Secondary data validation
    obj_id_random = obj_id # Kept for the response data; obj_id becomes the looked up ID below
    obj_id = redis_lookup_id_random(record_id_random=obj_id, table_name=table_name)
    if not obj_id:
        return mk_resp(data=None, status_code=404, response=commons.response, status_message='The object ID was invalid or not found.')

    # NOTE: Doing a quick sanity check based on the object models and then dump to a dict to get rid of invalid fields. The other option is to just use the crud.data_list raw.
    crud_data = crud.data_list
    field_list = crud_data.keys()
    log.debug(field_list)
    if run_safety_check:
        log.info('Running safety check by default')
        base_name = obj_cfg['base_name']
        try:
            obj_model = base_name(**crud.data_list)
        except Exception:
            # Broad on purpose: any failure to build the model is reported as a validation error.
            log.error('An unknown exception happened. Returning False.')
            log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
            log.warning(json.dumps(crud.data_list, indent=4))
            return mk_resp(data=False, status_code=400, response=commons.response, status_message='There was likely a validation error. Returned False.')
        log.info('Successfully updated the object model.')
        log.debug(obj_model)
        # include=field_list keeps only the fields that were actually submitted.
        obj_dict = obj_model.dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset, include=field_list)
        log.debug(obj_dict)
        crud_data = obj_dict
    else:
        log.warning('The default safety check was not run!')
        obj_dict = crud_data

    # ### SECTION ### Run the SQL UPDATE
    # NOTE: Add a check for the object ID... assuming it is a random ID string for now. Using rm_id_random. That helps with some field names.
    sql_result = sql_update(data=crud_data, table_name=table_name, record_id=obj_id, rm_id_random=True, log_lvl=logging.DEBUG)
    log.debug(sql_result)
    if sql_result is None:
        log.info('The record was probably not found to be updated.')
        return mk_resp(data=None, status_code=404, status_message='The record was probably not found to be updated.', response=commons.response)
    if not sql_result:
        log.info('Something unexpected happened while trying to run the SQL UPDATE. The fields or field values passed may not be valid for the table and or model.')
        return mk_resp(data=False, status_code=400, status_message='Something unexpected happened while trying to run the SQL UPDATE. The fields or field values passed may not be valid for the table and or model.', response=commons.response)
    log.info('The record was updated.')

    # ### SECTION ### Return successful results
    # NOTE: obj_id was found and verified above
    # New: 2024-03-08
    if return_obj:
        base_name = obj_cfg['base_name']
        sql_select_result = sql_select(table_name=table_name_select, record_id=obj_id)
        log.debug(sql_select_result)
        if not sql_select_result:
            status_message = f'The record was not found in table_name={table_name_select} used for the SQL SELECT query. This may be a SQL VIEW and may be because the SQL VIEW needs to be created or updated.'
            return mk_resp(data=False, status_code=404, response=commons.response, status_message=status_message)
        log.debug(base_name)
        resp_data = base_name(**sql_select_result).dict(by_alias=True, exclude_unset=commons.exclude_unset)
        log.debug(resp_data)
        log.info(f'Returning object model {base_name} from table_name {table_name_select}; obj_id; obj_id_random={obj_id_random}) using PATCH data')
        return mk_resp(data=resp_data, response=commons.response)

    # return_obj is off: only report what was updated.
    resp_data = {
        'table_name': table_name,
        'request_data': obj_dict,
        'obj_id': obj_id,
        'obj_id_random': obj_id_random,
    }
    log.info(f'Returning IDs (obj_id, obj_id_random={obj_id_random}) using PATCH data')
    return mk_resp(data=resp_data, response=commons.response)
# ### END ### API CRUD ### patch_obj() ###
# ### BEGIN ### API CRUD ### post_obj_lx() ###
# Updated 2024-03-08
@router.post('/{obj_type_l1}')
async def post_obj_l1(
    crud: Api_Crud_Base,
    obj_type_l1: Optional[str] = Path(..., max_length=50),
    run_safety_check: bool = True,
    # The view name will be prefixed with "v_" and must be a valid view name based on the object type name from the URL. obj_type_l1, obj_type_l2, obj_type_l3 combined below as obj_name
    return_obj: Optional[bool] = True,
    obj_v_name: Optional[str] = None,  # Use view name to help return the object type. -2024-03-08
    commons: Common_Route_Params = Depends(common_route_params),
):
    # POST route for a one-segment object type (e.g. /account).
    # Reserved for future use (not wired up yet): obj_id, for_obj_type, for_obj_id.
    # Everything is handed to the shared post handler.
    post_kwargs = dict(
        crud=crud,
        obj_type_l1=obj_type_l1,
        run_safety_check=run_safety_check,
        return_obj=return_obj,
        obj_v_name=obj_v_name,
        commons=commons,
    )
    return handle_post_obj(**post_kwargs)
@router.post('/{obj_type_l1}/{obj_type_l2}')
async def post_obj_l2(
    crud: Api_Crud_Base,
    obj_type_l1: Optional[str] = Path(..., max_length=50),
    obj_type_l2: str = None,
    run_safety_check: bool = True,
    # The view name will be prefixed with "v_" and must be a valid view name based on the object type name from the URL. obj_type_l1, obj_type_l2, obj_type_l3 combined below as obj_name
    return_obj: Optional[bool] = True,
    obj_v_name: Optional[str] = None,  # Use view name to help return the object type. -2024-03-08
    commons: Common_Route_Params = Depends(common_route_params),
):
    # POST route for a two-segment object type (e.g. /user/role).
    # Reserved for future use (not wired up yet): obj_id, for_obj_type, for_obj_id.
    # Everything is handed to the shared post handler.
    post_kwargs = dict(
        crud=crud,
        obj_type_l1=obj_type_l1,
        obj_type_l2=obj_type_l2,
        run_safety_check=run_safety_check,
        return_obj=return_obj,
        obj_v_name=obj_v_name,
        commons=commons,
    )
    return handle_post_obj(**post_kwargs)
@router.post('/{obj_type_l1}/{obj_type_l2}/{obj_type_l3}')
async def post_obj_l3(
    crud: Api_Crud_Base,
    obj_type_l1: Optional[str] = Path(..., max_length=50),
    obj_type_l2: str = None,
    obj_type_l3: str = None,
    run_safety_check: bool = True,
    # The view name will be prefixed with "v_" and must be a valid view name based on the object type name from the URL. obj_type_l1, obj_type_l2, obj_type_l3 combined below as obj_name
    return_obj: Optional[bool] = True,
    obj_v_name: Optional[str] = None,  # Use view name to help return the object type. -2024-03-08
    commons: Common_Route_Params = Depends(common_route_params),
):
    # POST route for a three-segment object type (e.g. /order/cart/line).
    # Reserved for future use (not wired up yet): obj_id, for_obj_type, for_obj_id.
    # Everything is handed to the shared post handler.
    post_kwargs = dict(
        crud=crud,
        obj_type_l1=obj_type_l1,
        obj_type_l2=obj_type_l2,
        obj_type_l3=obj_type_l3,
        run_safety_check=run_safety_check,
        return_obj=return_obj,
        obj_v_name=obj_v_name,
        commons=commons,
    )
    return handle_post_obj(**post_kwargs)
def handle_post_obj(
    crud: Api_Crud_Base,
    obj_type_l1: str,
    obj_type_l2: Optional[str] = None,
    obj_type_l3: Optional[str] = None,
    run_safety_check: bool = True,
    return_obj: Optional[bool] = True,
    obj_v_name: Optional[str] = None,
    commons: Common_Route_Params = None,
):
    """
    Simple post object type:
    - **obj_type_l1, obj_type_l2, obj_type_l3**:
        - Examples:
            - /account = account
            - /user = user
            - /user/role = user_role
            - /event = event
            - /event/exhibit = event_exhibit
            - /order = order
            - /order/cart = order_cart
            - /order/cart/line = order_cart_line
            - /lu/some_lookup = lu_some_lookup

    - **crud**: request body; crud.data_list holds the fields to insert and
      crud.super_key / crud.jwt are used for the access check.
    - **run_safety_check**: when True, crud.data_list is passed through the
      object type's base_name model first and only the submitted fields are kept.
    - **return_obj**: when True, the new record is selected and returned
      through the base_name model; otherwise only the IDs, the table name and
      the request data are returned.
    - **obj_v_name**: optional view name (with or without the "v_" prefix) to
      select the returned record from. Only letters, digits and "_" are accepted.

    Returns the result of mk_resp():
    - 501 when a JWT is passed (not implemented yet)
    - 400 for a bad key, an unknown object type, an invalid view name, a
      missing table name, a failed model validation, or a failed SQL INSERT
    - 404 when the SQL INSERT returns None, or when the new record can not be
      selected back (the record has already been inserted in that case)
    """
    import re  # Local import: only needed to validate obj_v_name.

    log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # ### SECTION ### Access check
    # NOTE(review): This is a hard-coded shared secret in source control. It
    # should be moved to configuration and rotated.
    if crud.super_key == 'zp5PtX4zUsI':
        pass
    elif crud.jwt:
        log.warning('JWT was passed')
        return mk_resp(data=False, status_code=501, response=commons.response, status_message='Token access for the API CRUD has not been implemented yet.')
    else:
        log.warning('Access key is missing or incorrect')
        return mk_resp(data=False, status_code=400, response=commons.response)

    # ### SECTION ### Build the object name from the URL segments
    if obj_type_l1 and obj_type_l2 and obj_type_l3:
        obj_name = f'{obj_type_l1}_{obj_type_l2}_{obj_type_l3}'
    elif obj_type_l1 and obj_type_l2:
        obj_name = f'{obj_type_l1}_{obj_type_l2}'
    elif obj_type_l1:
        obj_name = f'{obj_type_l1}'
    else:
        log.warning('We should not be here')
        return mk_resp(data=False, status_code=400, response=commons.response)

    if obj_name not in obj_type_li:
        return mk_resp(data=False, status_code=400, response=commons.response)
    obj_cfg = obj_type_li[obj_name]

    # ### SECTION ### Figure out the table_name to use
    # Updated: 2024-03-08
    # table_name_select is used for the SQL SELECT query. This should be a view in most cases.
    if obj_v_name:
        # obj_v_name comes straight from the query string and ends up in the
        # SQL SELECT as a table name. Only allow identifier characters.
        if not re.fullmatch(r'[A-Za-z0-9_]+', obj_v_name):
            log.warning('An invalid view name was passed in obj_v_name')
            return mk_resp(data=False, status_code=400, response=commons.response, status_message='The view name (obj_v_name) is invalid.')
        table_name_select = f'v_{obj_v_name}'
        # Remove an extra "v_" if it is there.
        if table_name_select.startswith('v_v_'):
            table_name_select = table_name_select[2:]
        # We can do more later... like check against an allowed list of view names per Aether object type
    else:
        # Use the configured names as a backup option, in this order.
        table_name_select = obj_cfg.get('table_name') or obj_cfg.get('table_name_alt') or obj_cfg.get('tbl_name_update')
        if not table_name_select:
            log.error('The table_name was not found. This is a critical error. Returning False.')
            return mk_resp(data=False, status_code=400, response=commons.response, status_message='The table_name was not found. This is a critical error.')

    # table_name is used for the SQL INSERT.
    table_name = obj_cfg.get('tbl_name_update') or obj_cfg.get('table_name')
    if not table_name:
        # Can happen when only table_name_alt (or only obj_v_name) is available.
        log.error('The table_name was not found. This is a critical error. Returning False.')
        return mk_resp(data=False, status_code=400, response=commons.response, status_message='The table_name was not found. This is a critical error.')

    # Resolved here, not inside the safety check, because the return_obj path
    # below needs the model even when run_safety_check is False.
    base_name = obj_cfg['base_name']

    # ### SECTION ### Secondary data validation
    # NOTE: Doing a quick sanity check based on the object models and then dump to a dict to get rid of invalid fields. The other option is to just use the crud.data_list raw.
    crud_data = crud.data_list
    field_list = crud_data.keys()
    log.debug(field_list)
    if run_safety_check:
        log.info('Running safety check by default')
        try:
            obj_model = base_name(**crud.data_list)
        except Exception:
            # Broad on purpose: any failure to build the model is reported as a validation error.
            log.error('An unknown exception happened. Returning False.')
            log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
            log.warning(json.dumps(crud.data_list, indent=4))
            return mk_resp(data=False, status_code=400, response=commons.response, status_message='There was likely a validation error. Returned False.')
        log.info('Successfully created the object model.')
        log.debug(obj_model)
        # include=field_list keeps only the fields that were actually submitted.
        obj_dict = obj_model.dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset, include=field_list)
        log.debug(obj_dict)
        crud_data = obj_dict
    else:
        log.warning('The default safety check was not run!')
        obj_dict = crud_data

    # ### SECTION ### Run the SQL INSERT
    # NOTE: Using rm_id_random. That helps with some field names.
    sql_result = sql_insert(data=crud_data, table_name=table_name, rm_id_random=True, log_lvl=logging.INFO)
    if sql_result is None:
        log.info('The SQL INSERT returned no result. The record was probably not created.')
        log.debug(sql_result)
        return mk_resp(data=None, status_code=404, status_message='The SQL INSERT returned no result. The record was probably not created.', response=commons.response)
    if not sql_result:
        log.info('Something unexpected happened while trying to run the SQL INSERT. The fields or field values passed may not be valid for the table and or model.')
        log.debug(sql_result)
        return mk_resp(data=False, status_code=400, status_message='Something unexpected happened while trying to run the SQL INSERT. The fields or field values passed may not be valid for the table and or model.', response=commons.response)

    # NOTE(review): This raises the shared logger to DEBUG for the rest of the
    # request. Kept as-is; confirm whether it is still wanted.
    log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.info('The record was inserted.')
    log.debug(sql_result)
    obj_id = sql_result # The ID should be returned
    obj_id_random = get_id_random(record_id=obj_id, table_name=table_name)

    # ### SECTION ### Return successful results
    # Updated: 2024-03-08
    if return_obj:
        log.info('Returning object created from POST data')
        # This is the more or less the same as what is done in the patch_obj() endpoint.
        sql_select_result = sql_select(table_name=table_name_select, record_id=obj_id)
        log.debug(sql_select_result)
        if not sql_select_result:
            status_message = 'The record was not found. This may be because a SQL VIEW was used for the SQL SELECT query?'
            return mk_resp(data=False, status_code=404, response=commons.response, status_message=status_message)
        log.debug(base_name)
        resp_data = base_name(**sql_select_result).dict(by_alias=True, exclude_unset=commons.exclude_unset)
        log.debug(resp_data)
        log.info(f'Returning object model ({base_name}; obj_id; obj_id_random={obj_id_random}) from POST data')
        return mk_resp(data=resp_data, response=commons.response)

    # return_obj is off: only report what was created.
    resp_data = {
        'table_name': table_name,
        'request_data': obj_dict,
        'obj_id': obj_id,
        'obj_id_random': obj_id_random,
    }
    log.info('Returning IDs only created from POST data')
    return mk_resp(data=resp_data, response=commons.response)
# ### END ### API CRUD ### post_obj() ###
# ### BEGIN ### API CRUD ### delete_obj_lx() ###
# Updated 2023-07-10
@router.delete('/{obj_type_l1}/{obj_id}')
async def delete_obj_l1(
    obj_type_l1: str=None,
    obj_id: str=None,
    method: str = 'delete',  # None, delete, disable, hide
    commons: Common_Route_Params = Depends(common_route_params),
):
    # DELETE route for a one-segment object type (e.g. /account/{obj_id}).
    # Previously sketched but unused parameters: x_account_id, response.
    # Everything is handed to the shared delete handler.
    delete_kwargs = dict(
        obj_type_l1=obj_type_l1,
        obj_id=obj_id,
        method=method,
        commons=commons,
    )
    return handle_delete_obj(**delete_kwargs)
@router.delete('/{obj_type_l1}/{obj_type_l2}/{obj_id}')
async def delete_obj_l2(
    obj_type_l1: str=None,
    obj_type_l2: str=None,
    obj_id: str=None,
    method: str = 'delete',  # None, delete, disable, hide
    commons: Common_Route_Params = Depends(common_route_params),
):
    # DELETE route for a two-segment object type (e.g. /user/role/{obj_id}).
    # Previously sketched but unused parameters: x_account_id, response.
    # Everything is handed to the shared delete handler.
    delete_kwargs = dict(
        obj_type_l1=obj_type_l1,
        obj_type_l2=obj_type_l2,
        obj_id=obj_id,
        method=method,
        commons=commons,
    )
    return handle_delete_obj(**delete_kwargs)
@router.delete('/{obj_type_l1}/{obj_type_l2}/{obj_type_l3}/{obj_id}')
async def delete_obj_l3(
    obj_type_l1: str=None,
    obj_type_l2: str=None,
    obj_type_l3: str=None,
    obj_id: str=None,
    method: str = 'delete',  # None, delete, disable, hide
    commons: Common_Route_Params = Depends(common_route_params),
):
    # DELETE route for a three-segment object type (e.g. /event/person/profile/{obj_id}).
    # Previously sketched but unused parameters: x_account_id, response.
    # Everything is handed to the shared delete handler.
    delete_kwargs = dict(
        obj_type_l1=obj_type_l1,
        obj_type_l2=obj_type_l2,
        obj_type_l3=obj_type_l3,
        obj_id=obj_id,
        method=method,
        commons=commons,
    )
    return handle_delete_obj(**delete_kwargs)
def handle_delete_obj(
    obj_type_l1: str=None,
    obj_type_l2: str=None,
    obj_type_l3: str=None,
    obj_id: str=None,
    method: str = 'delete',  # None, delete, disable, hide
    commons: Common_Route_Params = None,
):
    """
    Simple delete object type with an ID:
    - **obj_type_l1, obj_type_l2, obj_type_l3**:
        - Examples:
            - /account = account
            - /user = user
            - /user/role = user_role
            - /event = event
            - /event/exhibit = event_exhibit
            - /event/person/profile = event_person_profile
            - /order = order
            - /order/line = order_line
            - /lu/some_lookup = lu_some_lookup

    - **obj_id**: the random ID of the record.
    - **method**:
        - delete (or None) = run a SQL DELETE
        - disable = update the record with enable=False
        - hide = update the record with hide=True

    Returns the result of mk_resp():
    - 400 for an unknown object type, an unknown method, or a falsy
      (non-None) SQL result
    - 404 when the SQL result is None
    - otherwise data=True with a details string describing what was done

    NOTE(review): Unlike the PATCH and POST handlers, no access key is checked
    here. Confirm that this is intended.
    """
    log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # None is documented as "delete". Normalize it once so the success
    # response below matches. Previously a successful delete with method=None
    # fell through to the "Unexpected result" 400 response.
    if method is None:
        method = 'delete'

    # ### SECTION ### Build the object name from the URL segments
    if obj_type_l1 and obj_type_l2 and obj_type_l3:
        obj_name = f'{obj_type_l1}_{obj_type_l2}_{obj_type_l3}'
    elif obj_type_l1 and obj_type_l2:
        obj_name = f'{obj_type_l1}_{obj_type_l2}'
    elif obj_type_l1:
        obj_name = f'{obj_type_l1}'
    else:
        log.warning('We should not be here')
        return mk_resp(data=False, status_code=400, response=commons.response)

    if obj_name not in obj_type_li:
        return mk_resp(data=False, status_code=400, response=commons.response)

    table_name = obj_name # NOTE: Don't try to use the view!

    # ### SECTION ### Run the SQL DELETE or UPDATE
    # NOTE: Add a check for the object ID... assuming it is a random ID string for now.
    if method == 'delete':
        sql_result = sql_delete(table_name=table_name, record_id_random=obj_id)
    elif method == 'disable':
        sql_result = sql_update(data={'enable': False}, table_name=table_name, record_id_random=obj_id, rm_id_random=True, log_lvl=logging.INFO)
    elif method == 'hide':
        sql_result = sql_update(data={'hide': True}, table_name=table_name, record_id_random=obj_id, rm_id_random=True, log_lvl=logging.INFO)
    else:
        log.warning('We should not be here')
        return mk_resp(data=False, status_code=400, response=commons.response)
    log.debug(sql_result)

    # ### SECTION ### Build the response
    if sql_result:
        log.info('The record was found and deleted or updated.')
        # method can only be one of these three keys at this point.
        action_done = {'delete': 'deleted', 'hide': 'hidden', 'disable': 'disabled'}[method]
        resp_details = f'Object type: {obj_name} Object ID: {obj_id}; {action_done}'
        return mk_resp(data=True, details=resp_details, response=commons.response)

    if sql_result is None:
        log.info('The record was probably not found to be deleted and or updated.')
        resp_details = f'Not found: Object type: {obj_name} Object ID: {obj_id}; {method}'
        return mk_resp(data=None, status_code=404, details=resp_details, response=commons.response)

    log.info('Something unexpected happened while trying to run the SQL DELETE and or UPDATE. The fields or field values passed may not be valid for the table.')
    resp_details = f'Unexpected result: Object type: {obj_name} Object ID: {obj_id}; {method}'
    return mk_resp(data=False, status_code=400, details=resp_details, response=commons.response)
# ### END ### API CRUD ### delete_obj() ###
def post_obj_template(
        obj_type: str,
        data: dict,
        id_random_length: int = 8,  # Added 2023-04-13; need to move away from this
        return_obj: bool = True,
        by_alias: bool = True,
        include: Optional[list] = [],
        exclude: Optional[list] = [],
        exclude_unset: Optional[bool] = True,
        exclude_none: Optional[bool] = True,
        response: Response = Response,
        ):
    """Insert a new record for ``obj_type`` and respond with the stored row.

    The incoming ``data`` is passed through ``lookup_id_random_pop()`` and the
    result is inserted into the table named ``obj_type``. The new record is
    then read back from the table/view registered under
    ``obj_type_li[obj_type]['table_name']`` and serialized through the class
    registered under ``obj_type_li[obj_type]['base_name']``
    (presumably a Pydantic model -- confirm against ``obj_type_li``).

    Parameters:
        obj_type: Key into ``obj_type_li``; also used as the insert table name.
        data: Field values for the new record.
        id_random_length: Forwarded to ``sql_insert()``.
        by_alias, exclude_unset: Forwarded to the model's ``.dict()`` call.
        return_obj, include, exclude, exclude_none: Accepted but not used by
            this function.
        response: Forwarded to ``mk_resp()``.

    Returns:
        The ``mk_resp()`` result: the serialized record on success,
        ``data=False`` with status 400 when the insert result is falsy, or
        ``data=False`` with status 404 when the read-back finds nothing.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    insert_payload = lookup_id_random_pop(data)
    type_cfg = obj_type_li[obj_type]
    select_table = type_cfg['table_name']
    model_cls = type_cfg['base_name']

    # The insert targets the base table (obj_type); the read-back below uses
    # the registered table/view name instead.
    new_record_id = sql_insert(table_name=obj_type, data=insert_payload, id_random_length=id_random_length)

    # Whatever the outcome of the insert, the logger is switched to DEBUG
    # before the result is logged.
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(new_record_id)
    if not new_record_id:
        return mk_resp(data=False, status_code=400, response=response)

    stored_row = sql_select(table_name=select_table, record_id=new_record_id)
    log.debug(stored_row)
    if not stored_row:
        return mk_resp(data=False, status_code=404, response=response)

    resp_data = model_cls(**stored_row).dict(by_alias=by_alias, exclude_unset=exclude_unset)
    return mk_resp(data=resp_data, response=response)
def patch_obj_template(
        obj_type: str,
        data: dict,
        obj_id: str,
        return_obj: bool=True,
        by_alias: bool=True,
        include: Optional[list] = [],
        exclude: Optional[list] = [],
        exclude_unset: Optional[bool] = True,
        exclude_none: Optional[bool] = True,
        response: Response = Response,
        ):
    """Update an existing ``obj_type`` record and respond with the stored row.

    ``obj_id`` is written into ``data`` under ``'id_random'`` (this mutates the
    caller's dict), the dict is passed through ``lookup_id_random_pop()``, and
    the result is handed to ``sql_update()`` against the table named
    ``obj_type``. On success the record is read back from the table/view
    registered under ``obj_type_li[obj_type]['table_name']`` using
    ``data['id']`` and serialized through the class registered under
    ``obj_type_li[obj_type]['base_name']``.

    NOTE(review): reading ``data['id']`` after the update implies that
    ``lookup_id_random_pop()`` fills in ``'id'`` on the dict it is given --
    confirm against that helper.

    Parameters:
        obj_type: Key into ``obj_type_li``; also used as the update table name.
        data: Field values to update; modified in place.
        obj_id: Random ID string of the record to update.
        by_alias, exclude_unset: Forwarded to the model's ``.dict()`` call.
        return_obj, include, exclude, exclude_none: Accepted but not used by
            this function.
        response: Forwarded to ``mk_resp()``.

    Returns:
        The ``mk_resp()`` result: the serialized record on success,
        ``data=False`` with status 400 when the update result is falsy, or
        ``data=False`` with status 404 when the read-back finds nothing.
    """
    log.setLevel(logging.INFO)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # NOTE: Adding this in so the id_random is NOT updated
    data['id_random'] = obj_id
    log.debug(data)

    update_payload = lookup_id_random_pop(data)
    type_cfg = obj_type_li[obj_type]
    select_table = type_cfg['table_name']
    model_cls = type_cfg['base_name']

    update_outcome = sql_update(table_name=obj_type, data=update_payload)
    log.debug(update_outcome)
    if not update_outcome:
        return mk_resp(data=False, status_code=400, response=response)

    # The read-back key comes from the (mutated) input dict rather than from
    # the sql_update() return value.
    record_id = data['id']
    stored_row = sql_select(table_name=select_table, record_id=record_id)
    log.debug(stored_row)
    if not stored_row:
        return mk_resp(data=False, status_code=404, response=response)

    resp_data = model_cls(**stored_row).dict(by_alias=by_alias, exclude_unset=exclude_unset)
    return mk_resp(data=resp_data, response=response)
def get_obj_li_template(
        obj_type: str = Query(None, max_length=50),
        for_obj_type: Optional[str] = Query(None, max_length=50),
        for_obj_id: Optional[Union[int,str]] = None,
        by_alias: Optional[bool] = True,
        include: Optional[list] = [],
        exclude: Optional[list] = [],
        exclude_unset: Optional[bool] = True,
        exclude_none: Optional[bool] = True,
        response: Response = Response,
        ):
    """Return a list of ``obj_type`` records, optionally filtered by a parent.

    When both ``for_obj_type`` and ``for_obj_id`` are given, rows are selected
    where ``<for_obj_type>_id`` equals the parent ID; otherwise every row of
    the table/view registered under ``obj_type_li[obj_type]['table_name']`` is
    selected. Each row is serialized through the class registered under
    ``obj_type_li[obj_type]['base_name']``.

    Parameters:
        obj_type: Key into ``obj_type_li``.
        for_obj_type: Parent object type; used to build the filter column name
            and as the table name for the random-ID lookup.
        for_obj_id: Parent ID. An ``int`` is used as-is; a ``str`` is treated
            as a random ID and resolved with ``redis_lookup_id_random()``.
        by_alias, exclude_unset: Forwarded to the model's ``.dict()`` call.
        include, exclude, exclude_none: Accepted but not used by this function.
        response: Forwarded to ``mk_resp()``.

    Returns:
        The ``mk_resp()`` result carrying the list of serialized records, or
        ``data=False`` with status 404 when a parent random ID was supplied
        but could not be resolved.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    if isinstance(for_obj_id, str):
        for_obj_id_random = for_obj_id
        for_obj_id = redis_lookup_id_random(record_id_random=for_obj_id_random, table_name=for_obj_type)
        # A parent filter was explicitly requested but the random ID did not
        # resolve. Without this guard the falsy ID would fall through to the
        # unfiltered SELECT below and return every row of the table.
        if for_obj_type and for_obj_id_random and not for_obj_id:
            return mk_resp(data=False, status_code=404, response=response)

    type_cfg = obj_type_li[obj_type]
    table_name_select = type_cfg['table_name']

    if for_obj_type and for_obj_id:
        field_name = f'{for_obj_type}_id'
        sql_result = sql_select(table_name=table_name_select, field_name=field_name, field_value=for_obj_id)
    else:
        sql_result = sql_select(table_name=table_name_select)
    log.debug(sql_result)

    base_name = type_cfg['base_name']
    # A falsy result is treated as "no rows" rather than iterated, which would
    # raise TypeError for None/False.
    # NOTE(review): assumes sql_select() signals "nothing found" with a falsy
    # value, as the other templates in this file do -- confirm.
    resp_data_li = [
        base_name(**record).dict(by_alias=by_alias, exclude_unset=exclude_unset)
        for record in (sql_result or [])
    ]
    return mk_resp(data=resp_data_li, response=response)
def get_obj_template(
        obj_id: Union[int,str],
        obj_type: str = Query(None, max_length=50),
        by_alias: Optional[bool] = True,
        include: Optional[list] = [],
        exclude: Optional[list] = [],
        exclude_unset: Optional[bool] = True,
        exclude_none: Optional[bool] = True,
        response: Response = Response,
        ):
    """Fetch a single ``obj_type`` record by ID and respond with it.

    An ``int`` ``obj_id`` is used directly as the record ID; a ``str`` is
    treated as a random ID and resolved with ``redis_lookup_id_random()``.
    The record is read from the table/view registered under
    ``obj_type_li[obj_type]['table_name']`` and serialized through the class
    registered under ``obj_type_li[obj_type]['base_name']``.

    Parameters:
        obj_id: Numeric record ID or random ID string.
        obj_type: Key into ``obj_type_li``; also the table name used for the
            random-ID lookup.
        by_alias, exclude_unset: Forwarded to the model's ``.dict()`` call.
        include, exclude, exclude_none: Accepted but not used by this function.
        response: Forwarded to ``mk_resp()``.

    Returns:
        The ``mk_resp()`` result: the serialized record on success, or
        ``data=False`` with status 404 when the ID is falsy after resolution
        or the select returns nothing.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # Only string IDs need translating; anything else is passed through as-is.
    record_id = obj_id
    if isinstance(obj_id, str):
        record_id = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_type)

    select_table = obj_type_li[obj_type]['table_name']
    if not record_id:
        return mk_resp(data=False, status_code=404, response=response)

    row = sql_select(table_name=select_table, record_id=record_id)
    log.debug(row)
    if not row:
        return mk_resp(data=False, status_code=404, response=response)

    model_cls = obj_type_li[obj_type]['base_name']
    resp_data = model_cls(**row).dict(by_alias=by_alias, exclude_unset=exclude_unset)
    return mk_resp(data=resp_data, response=response)
def delete_obj_template(
        obj_type: str = Query(None, max_length=50),
        obj_id: str = Query(None, max_length=22),
        response: Response = Response,
        ):
    """Delete the ``obj_type`` record identified by a random ID string.

    Parameters:
        obj_type: Name of the table to delete from.
        obj_id: Random ID of the record; passed to ``sql_delete()`` as
            ``record_id_random``.
        response: Forwarded to ``mk_resp()``.

    Returns:
        The ``mk_resp()`` result: ``data=True`` when ``sql_delete()`` returns
        a truthy value, otherwise ``data=False`` with status 404.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())
    log.debug({'obj_type': obj_type, 'obj_id': obj_id})

    # NOTE: Not using the table name from the object type list because it may be a view (v_*).
    # NOTE: Add a check for the object ID... assuming it is a random ID string for now.
    delete_outcome = sql_delete(table_name=obj_type, record_id_random=obj_id)
    log.debug(delete_outcome)

    if delete_outcome:
        return mk_resp(data=True, response=response)
    return mk_resp(data=False, status_code=404, response=response)
# New dynamic API CRUD endpoint
# The POST data should be JSON formatted
# @router.post('/query')
# def query(
# # qry JSON should contain these properties:
# # list: for_obj_type, for_obj_id, tbl_view_name, base_name
# # id: obj_id, tbl_view_name, base_name
# qry: str,
# commons: Common_Route_Params = Depends(common_route_params),
# ):
# log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
# log.debug(locals())
# import urllib