import datetime, json, time #from datetime import datetime, time, timedelta from fastapi import APIRouter, Body, Depends, Header, HTTPException, Path, Query, Response, status from fastapi.responses import FileResponse from pydantic import BaseModel, EmailStr, Field from typing import Dict, List, Optional, Set, Union from app.lib_general import log, logging, common_route_params, Common_Route_Params, common_route_params_min, Common_Route_Params_Min # from app.config import settings from app.db_sql import sql_insert, sql_update, sql_insert_or_update, sql_select, sql_delete, redis_lookup_id_random, lookup_id_random_pop from app.models.response_models import * from app.models.api_crud_models import * from app.models.account_models import * from app.models.account_cfg_models import * from app.models.activity_log_models import * from app.models.address_models import * from app.models.archive_models import * from app.models.archive_content_models import * from app.models.contact_models import * from app.models.cont_edu_cert_models import * from app.models.cont_edu_cert_person_models import * from app.models.data_store_models import * from app.models.event_models import * from app.models.event_abstract_models import * from app.models.event_badge_models import * from app.models.event_exhibit_models import * from app.models.event_exhibit_tracking_models import * from app.models.event_file_models import * from app.models.event_location_models import * from app.models.event_person_models import * from app.models.event_person_tracking_models import * from app.models.event_presentation_models import * from app.models.event_presenter_models import * from app.models.event_registration_models import * from app.models.event_session_models import * from app.models.event_track_models import * from app.models.grant_models import * from app.models.hosted_file_models import * from app.models.journal_models import * from app.models.journal_entry_models import * from app.models.log_client_viewing_models import Log_Client_Viewing_Base from app.models.membership_cfg_models import * from app.models.membership_group_models import * from app.models.membership_person_group_models import * from app.models.membership_person_models import * from app.models.membership_person_profile_models import * from app.models.membership_type_models import * from app.models.membership_person_type_models import * from app.models.order_models import * from app.models.order_cart_models import * from app.models.organization_models import * from app.models.page_models import * from app.models.person_models import * from app.models.product_models import * from app.models.post_models import * from app.models.post_comment_models import * from app.models.site_models import * from app.models.site_domain_models import * from app.models.sponsorship_cfg_models import * from app.models.sponsorship_models import * from app.models.user_models import * from app.models.user_role_models import * from app.models.e_stripe_models import * obj_type_kv_li = {} # New 2024-04-23 obj_type_li = {} # Old... #obj_type_li['cfg_flask'] = {'table_name': 'cfg_flask', 'base_name': Cfg_Flask_Base} #obj_type_li['api_client_token'] = {'table_name': 'api_client_token', 'base_name': Api_Client_Token_Base} #obj_type_li['api_key'] = {'table_name': 'api_key', 'base_name': Api_Key_Base} #obj_type_li['api_token'] = {'table_name': 'api_token', 'base_name': Api_Token_Base} # ### Core module objects obj_type_li['account'] = {'table_name': 'account', 'tbl_name_update': 'account', 'base_name': Account_Base} obj_type_li['account_cfg'] = {'table_name': 'v_account_cfg', 'tbl_name_update': 'account_cfg', 'base_name': Account_Cfg_Base} # NOTE check view name: *_detail? obj_type_li['activity_log'] = {'table_name': 'activity_log', 'tbl_name_update': 'activity_log', 'base_name': Activity_Log_Base} obj_type_li['address'] = {'table_name': 'v_address', 'tbl_name_update': 'address', 'base_name': Address_Base} #obj_type_li['change_log'] = {'table_name': 'change_log', 'tbl_name_update': 'change_log', 'base_name': Change_Log_Base} obj_type_li['contact'] = {'table_name': 'v_contact', 'tbl_name_update': 'contact', 'base_name': Contact_Base} obj_type_li['data_store'] = {'table_name': 'v_data_store', 'tbl_name_update': 'data_store', 'base_name': Data_Store_Base} obj_type_li['hosted_file'] = {'table_name': 'v_hosted_file', 'tbl_name_update': 'hosted_file', 'base_name': Hosted_File_Base} #obj_type_li['hosted_file_link'] = {'table_name': 'hosted_file_link', 'tbl_name_update': 'hosted_file_link', 'base_name': Hosted_File_Link_Base} obj_type_li['log_client_viewing'] = {'table_name': 'log_client_viewing', 'tbl_name_update': 'log_client_viewing', 'base_name': Log_Client_Viewing_Base} obj_type_li['order'] = {'table_name': 'v_order', 'tbl_name_update': 'order', 'base_name': Order_Base} obj_type_li['order_cart'] = {'table_name': 'v_order_cart', 'tbl_name_update': 'order_cart', 'base_name': Order_Cart_Base} obj_type_li['order_cart_line'] = {'table_name': 'v_order_cart_line', 'tbl_name_update': 'order_cart_line', 'base_name': Order_Cart_Line_Base} obj_type_li['order_line'] = {'table_name': 'v_order_line', 'tbl_name_update': 'order_line', 'base_name': Order_Line_Base} #obj_type_li['order_transaction'] = {'table_name': 'order_transaction', 'tbl_name_update': 'order_transaction', 'base_name': Order_Transaction_Base} obj_type_li['organization'] = {'table_name': 'v_organization', 'tbl_name_update': 'organization', 'base_name': Organization_Base} obj_type_li['page'] = {'table_name': 'page', 'tbl_name_update': 'page', 'base_name': Page_Base} obj_type_li['person'] = {'table_name': 'v_person', 'tbl_name_update': 'person', 'base_name': Person_Base} obj_type_li['site'] = {'table_name': 'site', 'tbl_name_update': 'site', 'base_name': Site_Base} obj_type_li['site_domain'] = {'table_name': 'v_site_domain', 'table_name_alt': 'v_site_domain_fqdn_id', 'tbl_name_update': 'site_domain', 'base_name': Site_Domain_Base, 'base_name_alt': Site_Domain_FQDN_ID_Base} # NOTE check view name: *_detail? obj_type_li['user'] = {'table_name': 'v_user', 'tbl_name_update': 'user', 'base_name': User_Base} obj_type_li['user_role'] = {'table_name': 'v_user_role', 'tbl_name_update': 'user_role', 'base_name': User_Role_Base} # NOTE check view name: *_detail? # ### Common shared lookup objects obj_type_li['lu_country'] = {'table_name': 'lu_country', 'tbl_name_update': 'lu_country', 'base_name': None} obj_type_li['lu_country_subdivision'] = {'table_name': 'lu_country_subdivision', 'tbl_name_update': 'lu_country_subdivision', 'base_name': None} #obj_type_li['lu_education_degree'] = {'table_name': 'lu_education_degree', 'tbl_name_update': 'lu_education_degree', 'base_name': Lu_Education_Degree_Base} #obj_type_li['lu_education_level'] = {'table_name': 'lu_education_level', 'tbl_name_update': 'lu_education_level', 'base_name': Lu_Education_Level_Base} #obj_type_li['lu_ethnicity'] = {'table_name': 'lu_ethnicity', 'tbl_name_update': 'lu_ethnicity', 'base_name': Lu_Ethnicity_Base} #obj_type_li['lu_file_purpose'] = {'table_name': 'lu_file_purpose', 'tbl_name_update': 'lu_file_purpose', 'base_name': Lu_File_Purpose_Base} #obj_type_li['lu_gender'] = {'table_name': 'lu_gender', 'tbl_name_update': 'lu_gender', 'base_name': Lu_Gender_Base} #obj_type_li['lu_html_color'] = {'table_name': 'lu_html_color', 'tbl_name_update': 'lu_html_color', 'base_name': Lu_Html_Color_Base} #obj_type_li['lu_media_type'] = {'table_name': 'lu_media_type', 'tbl_name_update': 'lu_media_type', 'base_name': Lu_Media_Type_Base} #obj_type_li['lu_membership_status'] = {'table_name': 'lu_membership_status', 'tbl_name_update': 'lu_membership_status', 'base_name': Lu_Membership_Status_Base} #obj_type_li['lu_membership_type'] = {'table_name': 'lu_membership_type', 'tbl_name_update': 'lu_membership_type', 'base_name': Lu_Membership_Type_Base} #obj_type_li['lu_order_status'] = {'table_name': 'lu_order_status', 'tbl_name_update': 'lu_order_status', 'base_name': Lu_Order_Status_Base} #obj_type_li['lu_post_topic'] = {'table_name': 'lu_post_topic', 'tbl_name_update': 'lu_post_topic', 'base_name': Lu_Post_Topic_Base} #obj_type_li['lu_product_type'] = {'table_name': 'lu_product_type', 'tbl_name_update': 'lu_product_type', 'base_name': Lu_Product_Type_Base} #obj_type_li['lu_pronoun'] = {'table_name': 'lu_pronoun', 'tbl_name_update': 'lu_pronoun', 'base_name': Lu_Pronoun_Base} #obj_type_li['lu_race'] = {'table_name': 'lu_race', 'tbl_name_update': 'lu_race', 'base_name': Lu_Race_Base} obj_type_li['lu_time_zone'] = {'table_name': 'v_lu_time_zone', 'tbl_name_update': 'lu_time_zone', 'base_name': None} #obj_type_li['lu_user_role'] = {'table_name': 'lu_user_role', 'tbl_name_update': 'lu_user_role', 'base_name': Lu_User_Role_Base} #obj_type_li['lu_user_status'] = {'table_name': 'lu_user_status', 'tbl_name_update': 'lu_user_status', 'base_name': Lu_User_Status_Base} # ### Additional module objects obj_type_li['archive'] = {'table_name': 'v_archive', 'table_name_alt': 'v_archive_w_content_count', 'tbl_name_update': 'archive', 'base_name': Archive_Base} obj_type_li['archive_content'] = {'table_name': 'v_archive_content', 'tbl_name_update': 'archive_content', 'base_name': Archive_Content_Base} obj_type_li['cont_edu_cert'] = {'table_name': 'v_cont_edu_cert', 'tbl_name_update': 'cont_edu_cert', 'base_name': Cont_Edu_Cert_Base} obj_type_li['cont_edu_cert_person'] = {'table_name': 'v_cont_edu_cert_person', 'tbl_name_update': 'cont_edu_cert_person', 'base_name': Cont_Edu_Cert_Person_Base} obj_type_li['event'] = {'table_name': 'v_event', 'table_name_alt': 'v_event_w_file_count', 'tbl_name_update': 'event', 'base_name': Event_Base, 'base_name_alt': Event_Meeting_Flat_Base} obj_type_li['event_abstract'] = {'table_name': 'v_event_abstract', 'tbl_name_update': 'event_abstract', 'base_name': Event_Abstract_In} obj_type_li['event_badge'] = {'table_name': 'v_event_badge', 'table_name_alt': 'v_event_badge_only', 'tbl_name_update': 'event_badge', 'base_name': Event_Badge_Base, 'base_name_alt': Event_Badge_Basic_Base} #obj_type_li['event_badge_log'] = {'table_name': 'event_badge_log', 'tbl_name_update': 'event_badge_log', 'base_name': Event_Badge_Log_Base} #obj_type_li['event_badge_template'] = {'table_name': 'event_badge_template', 'tbl_name_update': 'event_badge_template', 'base_name': Event_Badge_Template_Base} #obj_type_li['event_device'] = {'table_name': 'event_device', 'tbl_name_update': 'event_device', 'base_name': Event_Device_Base} obj_type_li['event_exhibit'] = {'table_name': 'v_event_exhibit', 'tbl_name_update': 'event_exhibit', 'base_name': Event_Exhibit_Base} # NOTE check view name: *_detail? obj_type_li['event_exhibit_tracking'] = {'table_name': 'v_event_exhibit_tracking', 'tbl_name_update': 'event_exhibit_tracking', 'base_name': Event_Exhibit_Tracking_Base} # NOTE: Using v_event_file_simple instead of v_event_file because of linking with for_type and for_id versus event_id, event_session_id, event_presenter_id, etc. 2022-08-19 # NOTE: This will not pull in linked to details like a session name, presentation time, or presenter name. obj_type_li['event_file'] = {'table_name': 'v_event_file_simple', 'tbl_name_update': 'event_file_simple', 'base_name': Event_File_Base} # Should this eventually be changed to event_hosted_file obj_type_li['event_location'] = {'table_name': 'v_event_location', 'tbl_name_update': 'event_location', 'base_name': Event_Location_Base} obj_type_li['event_person'] = {'table_name': 'v_event_person', 'tbl_name_update': 'event_person', 'base_name': Event_Person_Base} obj_type_li['event_person_tracking'] = {'table_name': 'v_event_person_tracking', 'tbl_name_update': 'event_person_tracking', 'base_name': Event_Person_Tracking_Base} obj_type_li['event_presentation'] = {'table_name': 'v_event_presentation', 'tbl_name_update': 'event_presentation', 'base_name': Event_Presentation_Base} obj_type_li['event_presenter'] = {'table_name': 'v_event_presenter', 'tbl_name_update': 'event_presenter', 'base_name': Event_Presenter_Base} obj_type_li['event_registration'] = {'table_name': 'v_event_registration', 'tbl_name_update': 'event_registration', 'base_name': Event_Registration_Base} obj_type_li['event_session'] = {'table_name': 'v_event_session', 'tbl_name_update': 'event_session', 'base_name': Event_Session_Base, 'exclude_for_db': {'poc_person_id', 'file_count', 'internal_use_count', 'enable_from', 'enable_to', 'event_name', 'event_start_datetime', 'event_end_datetime', 'event_location_name', 'event_track_name', 'event_abstract_list', 'event_badge_list', 'event_device_list', 'event_file_list', 'event_file_internal_use_list', 'event_location', 'event_location_list', 'event_person_list', 'event_presenter_cat', 'event_presentation_list', 'event_presenter_list', 'event_track', 'poc_event_person'}} obj_type_li['event_track'] = {'table_name': 'v_event_track', 'tbl_name_update': 'event_track', 'base_name': Event_Track_Base} obj_type_li['grant'] = {'table_name': 'v_grant', 'tbl_name_update': 'grant', 'base_name': Grant_Base} obj_type_li['journal'] = {'table_name': 'v_journal', 'tbl_name_update': 'journal', 'base_name': Journal_Base} obj_type_li['journal_entry'] = {'table_name': 'v_journal_entry', 'tbl_name_update': 'journal_entry', 'base_name': Journal_Entry_Base} #obj_type_li['log'] = {'table_name': 'log', 'tbl_name_update': 'log', 'base_name': Log_Base} #'v_log' obj_type_li['membership_cfg'] = {'table_name': 'v_membership_cfg', 'tbl_name_update': 'membership_cfg', 'base_name': Membership_Cfg_Base} obj_type_li['membership_group'] = {'table_name': 'v_membership_group', 'tbl_name_update': 'membership_group', 'base_name': Membership_Group_Base} obj_type_li['membership_person_group'] = {'table_name': 'v_membership_person_group', 'tbl_name_update': 'membership_person_group', 'base_name': Membership_Person_Group_Base} obj_type_li['membership_person'] = {'table_name': 'v_membership_person', 'tbl_name_update': 'membership_person', 'base_name': Membership_Person_Base} obj_type_li['membership_person_profile'] = {'table_name': 'v_membership_person_profile', 'tbl_name_update': 'membership_person_profile', 'base_name': Membership_Person_Profile_Base} obj_type_li['membership_type'] = {'table_name': 'v_membership_type', 'tbl_name_update': 'membership_type', 'base_name': Membership_Type_Base} obj_type_li['membership_person_type'] = {'table_name': 'v_membership_person_type', 'tbl_name_update': 'membership_person_type', 'base_name': Membership_Person_Type_Base} #obj_type_li['message'] = {'table_name': 'message', 'tbl_name_update': 'message', 'base_name': Message_Base} #'v_message' obj_type_li['post'] = {'table_name': 'v_post', 'tbl_name_update': 'post', 'base_name': Post_Base} # NOTE check view name: *_detail? obj_type_li['post_comment'] = {'table_name': 'v_post_comment', 'tbl_name_update': 'post_comment', 'base_name': Post_Comment_Base} # NOTE check view name: *_detail? obj_type_li['product'] = {'table_name': 'v_product', 'tbl_name_update': 'product', 'base_name': Product_Base} obj_type_li['sponsorship'] = {'table_name': 'v_sponsorship', 'tbl_name_update': 'sponsorship', 'base_name': Sponsorship_Base, 'tbl': 'v_sponsorship', 'tbl': 'v_sponsorship', 'mdl': Sponsorship_Base } # NOTE check view name: *_detail? obj_type_li['sponsorship_cfg'] = {'table_name': 'v_sponsorship_cfg', 'tbl_name_update': 'sponsorship_cfg', 'base_name': Sponsorship_Cfg_Base} #obj_type_li['stripe_customer'] = {'table_name': 'stripe_customer', 'tbl_name_update': 'stripe_customer', 'base_name': Stripe_Customer_Base} obj_type_li['stripe_log'] = {'table_name': 'stripe_log', 'tbl_name_update': 'stripe_log', 'base_name': Stripe_Log_Base_In} # obj_type_li['c_idda_membership_person_profile'] = {'table_name': 'c_idda_membership_person_profile', 'base_name': C_Idda_membership_person_profile_Base} # obj_type_li['c_osit_demo_membership_person_profile'] = {'table_name': 'c_osit_demo_membership_person_profile', 'base_name': C_Osit_Demo_membership_person_profile_Base} router = APIRouter() # Working on the basic API CRUD - STI 2021-03-08 # Updated 2023-07-06 @router.get('/{obj_type_l1}/list') @router.get('/{obj_type_l1}/{obj_type_l2}/list') @router.get('/{obj_type_l1}/{obj_type_l2}/{obj_type_l3}/list') async def get_obj_li( obj_type_l1: str = Path(min_length=2, max_length=50), obj_type_l2: Optional[str] = Path(min_length=2, max_length=50), obj_type_l3: Optional[str] = Path(min_length=2, max_length=50), for_obj_type: Optional[str] = Query(None, max_length=50), for_obj_id: Optional[str] = Query(None, max_length=22), use_alt_table: bool = False, # NOTE: This will use table_name_alt if they exist. -2023-11-17 use_alt_base: bool = False, # NOTE: This will use base_name_alt if they exist. -2023-11-17 # field_qry_li: str = Query(None, max_length=150), # JSON formatted key value pair list of fields to search. # fulltext_qry_li: str = Query(None, max_length=150), # JSON formatted key value pair list of fields to search. # fulltext_qry_field_li: str = Header(None), # Json formatted string list of fields to search. It is not ideal that this is in the header. Need a better option, but this is currently a GET request. # fulltext_qry_str: str = Query(None, max_length=150), hidden: str = 'not_hidden', # hidden, not_hidden, all, # order_by_li: dict = None, order_by_li: str = Header(None), # JSON formatted string in a key value format. It is not ideal that this is in the header. Need a better option, but this is currently a GET request. # dh_order_by_li: str = Header(None), # dh_testing: str = Header(None), # h_order_by_li: str = Header(None), # h_testing: str = Header(None), # include: Optional[list] = [], # exclude: Optional[list] = [], # exclude_none: Optional[bool] = True, # Get the "json" param from the query string. This is a JSON formatted string of the data to be inserted. jp: Optional[Union[str, None]] = None, file_type: str = 'CSV', # CSV, Excel return_file: Optional[bool] = False, commons: Common_Route_Params = Depends(common_route_params), ): log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) import urllib # This should be a dict list of fields with a list of values to search for using FULLTEXT. fulltext_qry_dict_obj = None # This should be a dict list of fields with a list of values to search for using AND. and_qry_dict_obj = None # This should be a dict list of fields with a list of values to search for using AND LIKE. and_like_dict_obj = None # This should be a dict list of fields with a list of values to search for using AND IN. and_in_dict_li_obj = None jp_obj = None if jp: log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug( urllib.parse.unquote(jp) ) try: jp_obj = json.loads(urllib.parse.unquote(jp)) except Exception as e: log.warning(e) return mk_resp(data=False, status_code=400, response=commons.response, status_message='The JSON string was not formatted correctly.') log.info(jp_obj) if jp_obj.get('ft_qry'): # NOTE: This is for the fulltext query fulltext_qry_dict_obj = jp_obj['ft_qry'] if jp_obj.get('and_qry'): # NOTE: This is for the additional AND clauses in the WHERE statement and_qry_dict_obj = jp_obj['and_qry'] if jp_obj.get('and_like'): # NOTE: This is for the additional AND LIKE clauses in the WHERE statement and_like_dict_obj = jp_obj['and_like'] if jp_obj.get('and_in_li'): # NOTE: This is for the additional AND IN clauses in the WHERE statement and_in_dict_li_obj = jp_obj['and_in_li'] log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL if order_by_li: order_by_li = json.loads(order_by_li) # # NOTE: This should eventually be used to pass small amounts of data to the API through the URL GET params. -2023-11-29 # if json_str: # NOTE: Currently this does absolutely nothing. It is here for future use. -2023-11-29 # log.debug( urllib.parse.unquote(json_str) ) # try: # json_obj = json.loads(urllib.parse.unquote(json_str)) # except Exception as e: # log.warning(e) # return mk_resp(data=False, status_code=400, response=commons.response, status_message='The JSON string was not formatted correctly.') # log.debug(json_obj) debug_data = {} debug_data['obj_type_l1'] = obj_type_l1 debug_data['obj_type_l2'] = obj_type_l2 debug_data['obj_type_l3'] = obj_type_l3 #debug_data['obj_id'] = obj_id debug_data['for_obj_type'] = for_obj_type debug_data['for_obj_id'] = for_obj_id debug_data['use_alt_table'] = use_alt_table debug_data['use_alt_base'] = use_alt_base debug_data['jp_obj'] = jp_obj # debug_data['fulltext_qry_field_li'] = fulltext_qry_field_li # debug_data['fulltext_qry_str'] = fulltext_qry_str debug_data['hidden'] = hidden debug_data['order_by_li'] = order_by_li if obj_type_l1 == 'lu': # log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL pass log.debug(debug_data) if obj_type_l1 and obj_type_l2 and obj_type_l3: obj_name = f'{obj_type_l1}_{obj_type_l2}_{obj_type_l3}' if obj_name in obj_type_li: pass else: return mk_resp(data=False, status_code=400, response=commons.response) elif obj_type_l1 and obj_type_l2: obj_name = f'{obj_type_l1}_{obj_type_l2}' if obj_name in obj_type_li: pass else: return mk_resp(data=False, status_code=400, response=commons.response) elif obj_type_l1: obj_name = f'{obj_type_l1}' if obj_name in obj_type_li: pass else: return mk_resp(data=False, status_code=400, response=commons.response) else: log.warning('We should not be here') return mk_resp(data=False, status_code=400, response=commons.response) if use_alt_table: table_name = obj_type_li[obj_name]['table_name_alt'] else: table_name = obj_type_li[obj_name]['table_name'] if use_alt_base: base_name = obj_type_li[obj_name]['base_name_alt'] else: base_name = obj_type_li[obj_name]['base_name'] if for_obj_type and for_obj_id: for_obj_id = redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type) # log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(f'for_obj_type: {for_obj_type}') log.debug(f'for_obj_id: {for_obj_id}') # log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL field_name = f'{for_obj_type}_id' # NOTE: The enabled and hidden parameters are new to this endpoint and the sql_select function! -2023-07-06 sql_result = sql_select( table_name = table_name, field_name = field_name, field_value = for_obj_id, enabled = commons.enabled, hidden = hidden, fulltext_qry_dict = fulltext_qry_dict_obj, and_qry_dict = and_qry_dict_obj, and_like_dict = and_like_dict_obj, and_in_dict_li = and_in_dict_li_obj, # fulltext_qry_field_li = fulltext_qry_field_li, # fulltext_qry_str = fulltext_qry_str, order_by_li = order_by_li, limit = commons.limit, offset = commons.offset, as_list = True, # log_lvl = logging.DEBUG ) else: # NOTE: The enabled and hidden parameters are new to this endpoint and the sql_select function! -2023-07-06 # NOTE: This call (without field_name, field_value, limit, offset) may need more testing. sql_result = sql_select( table_name = table_name, enabled = commons.enabled, hidden = hidden, fulltext_qry_dict = fulltext_qry_dict_obj, and_qry_dict = and_qry_dict_obj, and_like_dict = and_like_dict_obj, and_in_dict_li = and_in_dict_li_obj, # fulltext_qry_field_li = fulltext_qry_field_li, # fulltext_qry_str = fulltext_qry_str, order_by_li = order_by_li, limit = commons.limit, offset = commons.offset, as_list = True, # log_lvl = logging.DEBUG ) # log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(sql_result) if sql_result: if isinstance(sql_result, list): log.setLevel(logging.DEBUG) resp_data_li = [] for record in sql_result: if base_name: log.info(f'base_name was found. Returning data using {base_name} model.') resp_data = base_name(**record).dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset) else: log.warning('base_name model was not found. Returning raw data.') resp_data = record resp_data_li.append(resp_data) column_name_li = list(sql_result[0].keys()) # This should be the same for all records in the list. if return_file: log.setLevel(logging.DEBUG) # We want to handle any field that has a suffix of _json. It should be expanded into a list of fields that are prefixed with the original field name minus _json. # This will also allow us to export the data to a CSV or Excel file with the correct column names. # additional_json_field_list_for_export = [] new_resp_data_li = [] for record in resp_data_li: new_record = record.copy() for field_name in record.keys(): field_value = record[field_name] if field_name.endswith('li_json') and record[field_name]: log.info(f'Found a field that ends with li_json: {field_name}') log.info(f'Field value is a list??: {field_value}') # field_value = json.loads(record[key]) # Convert the string to a list of dictionaries # Example JSON data: {'facebook': 'https://www.facebook.com/example', 'twitter': 'https://twitter.com/example', 'instagram': 'https://www.instagram.com/example/', 'linkedin': 'https://www.linkedin.com/school/example', 'org': 'https://example.com/'} # Example new fields: social__facebook, social__twitter, social__instagram, social__linkedin, social__org if isinstance(field_value, list): # Loop through the list of dictionaries log.info(f'Field value is a list: {field_value}') for item in field_value: # item = json.loads(item) # Convert the string to a dictionary for key, value in item.items(): new_field_name = field_name[:-8]+'__'+key new_record[new_field_name] = value else: # Loop through key value pairs in the dictionary log.info(f'Field value is a dict: {field_value}') for key, value in field_value.items(): new_field_name = field_name[:-8]+'__'+key new_record[new_field_name] = value # Create a new field for each and value to the record # new_field_name = key[:-8]+'__cust_li' # new_record[new_field_name] = record[key] new_record.pop(field_name) # Remove the original field elif field_name.endswith('_json') and record[field_name]: log.info(f'Found a field that ends with _json: {field_name}') log.info(f'Field value is a dict???: {field_value}') for key, value in field_value.items(): new_field_name = field_name[:-5]+'__'+key new_record[new_field_name] = value new_record.pop(field_name) # Remove the original field elif field_name.endswith('li_json') or field_name.endswith('_json'): log.info(f'Found a field that ends with li_json or _json but no value: {field_name}') new_record.pop(field_name) # Remove the original field new_resp_data_li.append(new_record) datetime_format='%Y-%m-%d_%H%M' # current_datetime = datetime.datetime.now() # Servers timezone (Eastern) current_datetime_utc = datetime.datetime.utcnow() # UTC timezone current_datetime_utc = current_datetime_utc.strftime(datetime_format) filename = f'{obj_name}_list_{current_datetime_utc}' if file_type == 'CSV': filename_w_ext = filename+'.csv' elif file_type == 'Excel': filename_w_ext = filename+'.xlsx' log.setLevel(logging.INFO) if result := create_export_file(data_dict_list=new_resp_data_li, column_name_li=[], subdir_path=obj_name, filename=filename, rm_id=True, export_type=file_type): log.info(f'Export file created and saved: {result}') else: log.error('Something went wrong while creating or saving the export file') tmp_file_path = result log.info(f'Filename: {filename_w_ext}') if full_tmp_path := return_full_tmp_path(full_tmp_path=tmp_file_path): return FileResponse(path=full_tmp_path, filename=filename_w_ext) else: return mk_resp(data=resp_data_li, response=commons.response) else: status_message='Not Implemented (sort of). Attempted to process this request. Got a SQL result, but the returned data was unexpected.' return mk_resp(data=sql_result, response=commons.response, status_code=501, status_message=status_message) # Returns "Not Implemented" (sort of... unexpected response) else: return mk_resp(data=None, response=commons.response, status_code=404) # Updated 2023-11-03 @router.get('/{obj_type_l1}/{obj_id}') @router.get('/{obj_type_l1}/{obj_type_l2}/{obj_id}') @router.get('/{obj_type_l1}/{obj_type_l2}/{obj_type_l3}/{obj_id}') async def get_obj( obj_type_l1: str=None, obj_type_l2: str=None, obj_type_l3: str=None, obj_id: str=None, use_alt_table: bool = False, # NOTE: This will use table_name_alt if they exist. -2023-12-01 use_alt_base: bool = False, # NOTE: This will use base_name_alt if they exist. -2023-12-01 # for_obj_type: Optional[str] = Query(None, max_length=50), # NOTE: This is not currently used. It is here for future use. # for_obj_id: Optional[str] = Query(None, max_length=22), # NOTE: This is not currently used. It is here for future use. # qry_str: Optional[str] = Query(None, max_length=50), # qry_int: Optional[int] = None, # include: Optional[list] = [], # exclude: Optional[list] = [], # exclude_none: Optional[bool] = True, commons: Common_Route_Params = Depends(common_route_params), ): """ Simple select object type with an ID: - **obj_type_l1, obj_type_l2, obj_type_l3**: - Examples: - /account = account - /user = user - /user/role = user_role - /event = event - /event/exhibit = event_exhibit - /order = order - /order/cart = order_cart - /order/cart/line = order_cart_line - /lu/some_lookup = lu_some_lookup """ log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING # time.sleep(2.5) # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING debug_data = {} debug_data['obj_type_l1'] = obj_type_l1 debug_data['obj_type_l2'] = obj_type_l2 debug_data['obj_type_l3'] = obj_type_l3 debug_data['obj_id'] = obj_id debug_data['use_alt_table'] = use_alt_table debug_data['use_alt_base'] = use_alt_base log.debug(debug_data) if obj_type_l1 and obj_type_l2 and obj_type_l3: obj_name = f'{obj_type_l1}_{obj_type_l2}_{obj_type_l3}' if obj_name in obj_type_li: #table_name = obj_type_li[obj_name] #table_name = obj_type_li[obj_name]['table_name'] pass else: return mk_resp(data=False, status_code=400, response=commons.response) elif obj_type_l1 and obj_type_l2: obj_name = f'{obj_type_l1}_{obj_type_l2}' if obj_name in obj_type_li: #table_name = obj_type_li[obj_name]['table_name'] pass else: return mk_resp(data=False, status_code=400, response=commons.response) elif obj_type_l1: obj_name = f'{obj_type_l1}' if obj_name in obj_type_li: #table_name = obj_type_li[obj_name]['table_name'] pass else: return mk_resp(data=False, status_code=400, response=commons.response) else: log.warning('We should not be here') return mk_resp(data=False, status_code=400, response=commons.response) if use_alt_table: table_name = obj_type_li[obj_name]['table_name_alt'] else: table_name = obj_type_li[obj_name]['table_name'] if use_alt_base: base_name = obj_type_li[obj_name]['base_name_alt'] # log.setLevel(logging.DEBUG) log.debug(debug_data) else: base_name = obj_type_li[obj_name]['base_name'] # NOTE: Add a check for the object ID... assuming it is a random ID string for now. if sql_result := sql_select(table_name=table_name, record_id_random=obj_id): log.debug(sql_result) if base_name: log.info(f'base_name was found. Returning data using {base_name} model.') resp_data = base_name(**sql_result).dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset) else: log.info('base_name model was not found. Returning raw data.') resp_data = sql_result return mk_resp(data=resp_data, response=commons.response) #, details=debug_data) else: log.debug(sql_result) return mk_resp(data=False, status_code=404, response=commons.response) # ### BEGIN ### API CRUD ### patch_obj() ### # Updated 2024-03-08 @router.patch('/{obj_type_l1}/{obj_id}') @router.patch('/{obj_type_l1}/{obj_type_l2}/{obj_id}') @router.patch('/{obj_type_l1}/{obj_type_l2}/{obj_type_l3}/{obj_id}') async def patch_obj( crud: Api_Crud_Base, obj_type_l1: str = Path(min_length=2, max_length=50), obj_id: str = Path(min_length=11, max_length=22), obj_type_l2: str = None, obj_type_l3: str = None, run_safety_check: bool = True, # for_obj_type: Optional[str] = Query(None, max_length=50), # for_obj_id: Optional[str] = Query(None, max_length=22), # The view name will be prefixed with "v_" and must be a valid view name based on the object type name from the URL. obj_type_l1, obj_type_l2, obj_type_l3 combined below as obj_name return_obj: Optional[bool] = True, # I am not sure how to make this work yet. -2024-03-08 obj_v_name: Optional[str] = None, # Use view name to help return the object type. -2024-03-08 commons: Common_Route_Params = Depends(common_route_params), ): """ Simple patch object type with an ID: - **obj_type_l1, obj_type_l2, obj_type_l3**: - Examples: - /account = account - /user = user - /user/role = user_role - /event = event - /event/exhibit = event_exhibit - /order = order - /order/cart = order_cart - /order/cart/line = order_cart_line - /lu/some_lookup = lu_some_lookup """ log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) if crud.super_key == 'zp5PtX4zUsI': pass elif crud.jwt: # pass log.warning('JWT was passed') return mk_resp(data=False, status_code=501, response=commons.response, status_message='Token access for the API CRUD has not been implemented yet.') else: log.warning('Access key is missing or incorrect') return mk_resp(data=False, status_code=400, response=commons.response) # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING # time.sleep(2.5) # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING debug_data = {} debug_data['crud'] = crud debug_data['create_key'] = crud.create_key debug_data['read_key'] = crud.read_key debug_data['update_key'] = crud.update_key debug_data['delete_key'] = crud.delete_key debug_data['data_list'] = crud.data_list debug_data['obj_type_l1'] = obj_type_l1 debug_data['obj_type_l2'] = obj_type_l2 debug_data['obj_type_l3'] = obj_type_l3 debug_data['obj_id'] = obj_id log.debug(debug_data) if obj_type_l1 and obj_type_l2 and obj_type_l3: obj_name = f'{obj_type_l1}_{obj_type_l2}_{obj_type_l3}' if obj_name in obj_type_li: #table_name = obj_type_li[obj_name] #table_name = obj_type_li[obj_name]['tbl_name_update'] pass else: return mk_resp(data=False, status_code=400, response=commons.response) elif obj_type_l1 and obj_type_l2: obj_name = f'{obj_type_l1}_{obj_type_l2}' if obj_name in obj_type_li: #table_name = obj_type_li[obj_name]['tbl_name_update'] pass else: return mk_resp(data=False, status_code=400, response=commons.response) elif obj_type_l1: obj_name = f'{obj_type_l1}' if obj_name in obj_type_li: #table_name = obj_type_li[obj_name]['tbl_name_update'] pass else: return mk_resp(data=False, status_code=400, response=commons.response) else: log.warning('We should not be here') return mk_resp(data=False, status_code=400, response=commons.response) table_name = obj_type_li[obj_name].get('tbl_name_update') exclude = obj_type_li[obj_name].get('exclude_for_db') # ### SECTION ### Secondary data validation obj_id_random = obj_id # This might need to be used later for the response data if obj_id := redis_lookup_id_random(record_id_random=obj_id, table_name=table_name): pass else: return mk_resp(data=None, status_code=404, response=commons.response, status_message='The object ID was invalid or not found.') # NOTE: Doing a quick sanity check based on the object models and then dump to a dict to get rid of invalid fields. The other option is to just use the crud.data_list raw. crud_data = crud.data_list log.debug(crud_data.keys()) field_list = crud_data.keys() if run_safety_check: log.info('Running safety check by default') base_name = obj_type_li[obj_name]['base_name'] try: obj_model = base_name(**crud.data_list) except Exception as e: log.error('An unknown exception happened. Returning False.') log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') log.warning(json.dumps(crud.data_list, indent=4)) return mk_resp(data=False, status_code=400, response=commons.response, status_message='There was likely a validation error. Returned False.') else: log.info('Successfully updated the object model.') pass log.debug(obj_model) obj_dict = obj_model.dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset, include=field_list) log.debug(obj_dict) crud_data = obj_dict else: log.warning('The default safety check was not run!') obj_dict = crud_data # NOTE: Add a check for the object ID... assuming it is a random ID string for now. Using rm_id_random. That helps with some field names. if sql_result := sql_update(data=crud_data, table_name=table_name, record_id=obj_id, rm_id_random=True, log_lvl=logging.DEBUG): log.info('The record was updated.') log.debug(sql_result) resp_data = {} resp_data['table_name'] = table_name resp_data['request_data'] = obj_dict resp_data['obj_id'] = obj_id resp_data['obj_id_random'] = obj_id_random # Comment out the following line if you do not want to allo for returning the updated data. # return mk_resp(data=resp_data, response=commons.response) #, details=debug_data) elif sql_result == None: log.info('The record was probably not found to be updated.') log.debug(sql_result) return mk_resp(data=None, status_code=404, status_message='The record was probably not found to be updated.', response=commons.response) else: log.info('Something unexpected happened while trying to run the SQL UPDATE. The fields or field values passed may not be valid for the table and or model.') log.debug(sql_result) return mk_resp(data=False, status_code=400, status_message='Something unexpected happened while trying to runt he SQL UPDATE. The fields or field values passed may not be valid for the table and or model.', response=commons.response) # ### SECTION ### Return successful results # NOTE: obj_id was found and verified above # New: 2024-03-08 if return_obj: if obj_v_name: # We should add a sanity check here to make sure the view name is valid first. table_name = f'v_{obj_v_name}' # Remove an extra "v_" if it is there. if table_name.startswith('v_v_'): table_name = table_name[2:] # We can do more later... like check against an allowed list of view names per Aether object type else: # Use the table_name as a backup option? This should be safe since it is also passed through the model. table_name = obj_type_li[obj_name]['table_name'] base_name = obj_type_li[obj_name]['base_name'] if sql_select_result := sql_select(table_name=table_name, record_id=obj_id): log.debug(sql_select_result) log.debug(base_name) resp_data = base_name(**sql_select_result).dict(by_alias=True, exclude_unset=commons.exclude_unset) log.debug(resp_data) log.info(f'Returning object model {base_name} from table_name {table_name}; obj_id; obj_id_random={obj_id_random}) using PATCH data') return mk_resp(data=resp_data, response=commons.response) else: log.debug(sql_select_result) status_message = f'The record was not found in table_name={table_name} used for the SQL SELECT query. This may be a SQL VIEW and may be because the SQL VIEW needs to be created or updated.' return mk_resp(data=False, status_code=404, response=commons.response, status_message=status_message) log.info(f'Returning IDs (obj_id, obj_id_random={obj_id_random}) using PATCH data') return mk_resp(data=resp_data, response=commons.response) #, details=debug_data) # ### END ### API CRUD ### patch_obj() ### # ### BEGIN ### API CRUD ### post_obj() ### # Updated 2024-03-08 @router.post('/{obj_type_l1}') @router.post('/{obj_type_l1}/{obj_type_l2}') @router.post('/{obj_type_l1}/{obj_type_l2}/{obj_type_l3}') async def post_obj( crud: Api_Crud_Base, obj_type_l1: Optional[str] = Path(..., max_length=50), obj_type_l2: str = None, obj_type_l3: str = None, # obj_id: str = Path(min_length=11, max_length=22), run_safety_check: bool = True, # The view name will be prefixed with "v_" and must be a valid view name based on the object type name from the URL. obj_type_l1, obj_type_l2, obj_type_l3 combined below as obj_name # for_obj_type: Optional[str] = Query(None, max_length=50), # for_obj_id: Optional[str] = Query(None, max_length=22), return_obj: Optional[bool] = True, obj_v_name: Optional[str] = None, # Use view name to help return the object type. -2024-03-08 commons: Common_Route_Params = Depends(common_route_params), ): """ Simple post object type: - **obj_type_l1, obj_type_l2, obj_type_l3**: - Examples: - /account = account - /user = user - /user/role = user_role - /event = event - /event/exhibit = event_exhibit - /order = order - /order/cart = order_cart - /order/cart/line = order_cart_line - /lu/some_lookup = lu_some_lookup """ log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) if crud.super_key == 'zp5PtX4zUsI': pass elif crud.jwt: # pass log.warning('JWT was passed') return mk_resp(data=False, status_code=501, response=commons.response, status_message='Token access for the API CRUD has not been implemented yet.') else: log.warning('Access key is missing or incorrect') return mk_resp(data=False, status_code=400, response=commons.response) # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING # time.sleep(1.5) # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING debug_data = {} debug_data['crud'] = crud debug_data['create_key'] = crud.create_key debug_data['read_key'] = crud.read_key debug_data['update_key'] = crud.update_key debug_data['delete_key'] = crud.delete_key debug_data['data_list'] = crud.data_list debug_data['obj_type_l1'] = obj_type_l1 debug_data['obj_type_l2'] = obj_type_l2 debug_data['obj_type_l3'] = obj_type_l3 # debug_data['obj_id'] = obj_id log.debug(debug_data) if obj_type_l1 and obj_type_l2 and obj_type_l3: obj_name = f'{obj_type_l1}_{obj_type_l2}_{obj_type_l3}' if obj_name in obj_type_li: #table_name = obj_type_li[obj_name] #table_name = obj_type_li[obj_name]['tbl_name_update'] pass else: return mk_resp(data=False, status_code=400, response=commons.response) elif obj_type_l1 and obj_type_l2: obj_name = f'{obj_type_l1}_{obj_type_l2}' if obj_name in obj_type_li: #table_name = obj_type_li[obj_name]['tbl_name_update'] pass else: return mk_resp(data=False, status_code=400, response=commons.response) elif obj_type_l1: obj_name = f'{obj_type_l1}' if obj_name in obj_type_li: #table_name = obj_type_li[obj_name]['tbl_name_update'] pass else: return mk_resp(data=False, status_code=400, response=commons.response) else: log.warning('We should not be here') return mk_resp(data=False, status_code=400, response=commons.response) # ### SECTION ### Figure out the table_name to use # Updated: 2024-03-08 view_name = None # This is the view name to use for the SQL SELECT query if obj_v_name: # We should add a sanity check here to make sure the view name is valid first. view_name = f'v_{obj_v_name}' # Remove an extra "v_" if it is there. if view_name.startswith('v_v_'): view_name = view_name[2:] # We can do more later... like check against an allowed list of view names per Aether object type elif obj_type_li[obj_name].get('table_name'): # Use the table_name as a backup option? This should be safe. view_name = obj_type_li[obj_name].get('table_name') elif obj_type_li[obj_name].get('table_name_alt'): view_name = obj_type_li[obj_name].get('table_name_alt') elif obj_type_li[obj_name].get('tbl_name_update'): view_name = obj_type_li[obj_name].get('tbl_name_update') else: log.error('The table_name was not found. This is a critical error. Returning False.') return mk_resp(data=False, status_code=400, response=commons.response, status_message='The table_name was not found. This is a critical error.') table_name = None # This is the table name to use for the SQL INSERT or UPDATE if obj_type_li[obj_name].get('tbl_name_update'): table_name = obj_type_li[obj_name].get('tbl_name_update') elif obj_type_li[obj_name].get('table_name'): table_name = obj_type_li[obj_name].get('table_name') # This should be a view in most cases. table_name_select = view_name exclude = obj_type_li[obj_name].get('exclude_for_db') # ### SECTION ### Secondary data validation # if obj_id := redis_lookup_id_random(record_id_random=obj_id, table_name=table_name): pass # else: return mk_resp(data=None, status_code=404, response=commons.response, status_message='The object ID was invalid or not found.') # NOTE: Doing a quick sanity check based on the object models and then dump to a dict to get rid of invalid fields. The other option is to just use the crud.data_list raw. crud_data = crud.data_list log.debug(crud_data.keys()) field_list = crud_data.keys() if run_safety_check: log.info('Running safety check by default') base_name = obj_type_li[obj_name]['base_name'] try: obj_model = base_name(**crud.data_list) except Exception as e: log.error('An unknown exception happened. Returning False.') log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') log.warning(json.dumps(crud.data_list, indent=4)) return mk_resp(data=False, status_code=400, response=commons.response, status_message='There was likely a validation error. Returned False.') else: log.info('Successfully created the object model.') pass log.debug(obj_model) obj_dict = obj_model.dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset, include=field_list) log.debug(obj_dict) crud_data = obj_dict else: log.warning('The default safety check was not run!') obj_dict = crud_data # NOTE: Add a check for the object ID... assuming it is a random ID string for now. Using rm_id_random. That helps with some field names. if sql_result := sql_insert(data=crud_data, table_name=table_name, rm_id_random=True, log_lvl=logging.INFO): log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.info('The record was inserted.') log.debug(sql_result) resp_data = {} resp_data['table_name'] = table_name resp_data['request_data'] = obj_dict obj_id = sql_result # The ID should be returned resp_data['obj_id'] = obj_id obj_id_random = get_id_random(record_id=obj_id, table_name=table_name) resp_data['obj_id_random'] = obj_id_random # NOTE: obj_id was found and verified above # Updated: 2024-03-08 if return_obj: log.info('Returning object created from POST data') # This is the more or less the same as what is done in the patch_obj() endpoint. if sql_select_result := sql_select(table_name=table_name_select, record_id=obj_id): log.debug(sql_select_result) log.debug(base_name) resp_data = base_name(**sql_select_result).dict(by_alias=True, exclude_unset=commons.exclude_unset) log.debug(resp_data) log.info(f'Returning object model ({base_name}; obj_id; obj_id_random={obj_id_random}) from POST data') return mk_resp(data=resp_data, response=commons.response) else: log.debug(sql_select_result) status_message = 'The record was not found. This may be because a SQL VIEW was used for the SQL SELECT query?' return mk_resp(data=False, status_code=404, response=commons.response, status_message=status_message) log.info('Returning IDs only created from POST data') return mk_resp(data=resp_data, response=commons.response) #, details=debug_data) elif sql_result == None: log.info('The record was probably not found to be updated.') log.debug(sql_result) return mk_resp(data=None, status_code=404, status_message='The record was probably not found to be updated.', response=commons.response) else: log.info('Something unexpected happened while trying to runt he SQL UPDATE. The fields or field values passed may not be valid for the table and or model.') log.debug(sql_result) return mk_resp(data=False, status_code=400, status_message='Something unexpected happened while trying to run the SQL UPDATE. The fields or field values passed may not be valid for the table and or model.', response=commons.response) # ### END ### API CRUD ### post_obj() ### # ### BEGIN ### API CRUD ### delete_obj() ### # Updated 2023-07-10 @router.delete('/{obj_type_l1}/{obj_id}') @router.delete('/{obj_type_l1}/{obj_type_l2}/{obj_id}') @router.delete('/{obj_type_l1}/{obj_type_l2}/{obj_type_l3}/{obj_id}') async def delete_obj( obj_type_l1: str=None, obj_type_l2: str=None, obj_type_l3: str=None, obj_id: str=None, method: str = 'delete', # None, delete, disable, hide # x_account_id: str = Header(...), # response: Response = Response, commons: Common_Route_Params = Depends(common_route_params), ): """ Simple delete object type with an ID: - **obj_type_l1, obj_type_l2, obj_type_l3**: - Examples: - /account = account - /user = user - /user/role = user_role - /event = event - /event/exhibit = event_exhibit - /event/person/profile = event_person_profile - /order = order - /order/line = order_line - /lu/some_lookup = lu_some_lookup """ log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) debug_data = {} debug_data['obj_type_l1'] = obj_type_l1 debug_data['obj_type_l2'] = obj_type_l2 debug_data['obj_type_l3'] = obj_type_l3 debug_data['obj_id'] = obj_id log.debug(debug_data) # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING # time.sleep(1.5) # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING if obj_type_l1 and obj_type_l2 and obj_type_l3: obj_name = f'{obj_type_l1}_{obj_type_l2}_{obj_type_l3}' if obj_name in obj_type_li: pass else: return mk_resp(data=False, status_code=400, response=commons.response) elif obj_type_l1 and obj_type_l2: obj_name = f'{obj_type_l1}_{obj_type_l2}' if obj_name in obj_type_li: pass else: return mk_resp(data=False, status_code=400, response=commons.response) elif obj_type_l1: obj_name = f'{obj_type_l1}' if obj_name in obj_type_li: pass else: return mk_resp(data=False, status_code=400, response=commons.response) else: log.warning('We should not be here') return mk_resp(data=False, status_code=400, response=commons.response) table_name = obj_name # obj_type_li[obj_name]['table_name'] # NOTE: Don't try to use the view! # NOTE: Add a check for the object ID... assuming it is a random ID string for now. if method == 'delete' or method is None: sql_result = sql_delete(table_name=table_name, record_id_random=obj_id) log.debug(sql_result) elif method == 'disable': data = {'enable': False} sql_result = sql_update(data=data, table_name=table_name, record_id_random=obj_id, rm_id_random=True, log_lvl=logging.INFO) elif method == 'hide': data = {'hide': True} sql_result = sql_update(data=data, table_name=table_name, record_id_random=obj_id, rm_id_random=True, log_lvl=logging.INFO) else: log.warning('We should not be here') return mk_resp(data=False, status_code=400, response=commons.response) if sql_result: resp_data = True log.info('The record was found and deleted or updated.') elif sql_result == None: resp_data = None log.info('The record was probably not found to be deleted and or updated.') else: resp_data = False log.info('Something unexpected happened while trying to run the SQL DELETE and or UPDATE. The fields or field values passed may not be valid for the table.') resp_details = '' if method == 'delete' and sql_result: resp_details = f'Object type: {obj_name} Object ID: {obj_id}; deleted' return mk_resp(data=resp_data, details=resp_details, response=commons.response) #, details=debug_data) elif method == 'hide' and sql_result: resp_details = f'Object type: {obj_name} Object ID: {obj_id}; hidden' return mk_resp(data=resp_data, details=resp_details, response=commons.response) #, details=debug_data) elif method == 'disable' and sql_result: resp_details = f'Object type: {obj_name} Object ID: {obj_id}; disabled' return mk_resp(data=resp_data, details=resp_details, response=commons.response) #, details=debug_data) elif sql_result is None: resp_details = f'Not found: Object type: {obj_name} Object ID: {obj_id}; {method}' return mk_resp(data=resp_data, status_code=404, details=resp_details, response=commons.response) #, details=debug_data) else: resp_details = f'Unexpected result: Object type: {obj_name} Object ID: {obj_id}; {method}' return mk_resp(data=resp_data, status_code=400, details=resp_details, response=commons.response) #, details=debug_data) # ### END ### API CRUD ### delete_obj() ### def post_obj_template( obj_type: str, data: dict, id_random_length: int = 8, # Added 2023-04-13; need to move away from this return_obj: bool = True, by_alias: bool = True, include: Optional[list] = [], exclude: Optional[list] = [], exclude_unset: Optional[bool] = True, exclude_none: Optional[bool] = True, response: Response = Response, ): log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) obj_data_dict = data obj_data = lookup_id_random_pop(obj_data_dict) table_name_insert = obj_type table_name_select = obj_type_li[obj_type]['table_name'] base_name = obj_type_li[obj_type]['base_name'] if sql_insert_result := sql_insert(table_name=table_name_insert, data=obj_data, id_random_length=id_random_length): log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(sql_insert_result) obj_id = sql_insert_result else: log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(sql_insert_result) return mk_resp(data=False, status_code=400, response=response) if sql_select_result := sql_select(table_name=table_name_select, record_id=obj_id): log.debug(sql_select_result) resp_data = base_name(**sql_select_result).dict(by_alias=by_alias, exclude_unset=exclude_unset) return mk_resp(data=resp_data, response=response) else: log.debug(sql_select_result) return mk_resp(data=False, status_code=404, response=response) def patch_obj_template( obj_type: str, data: dict, obj_id: str, return_obj: bool=True, by_alias: bool=True, include: Optional[list] = [], exclude: Optional[list] = [], exclude_unset: Optional[bool] = True, exclude_none: Optional[bool] = True, response: Response = Response, ): log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) obj_data_dict = data #obj_data_dict['id'] = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_type) obj_data_dict['id_random'] = obj_id # NOTE: Adding this in so the id_random is NOT updated log.debug(obj_data_dict) obj_data = lookup_id_random_pop(obj_data_dict) table_name_update = obj_type table_name_select = obj_type_li[obj_type]['table_name'] base_name = obj_type_li[obj_type]['base_name'] if sql_update_result := sql_update(table_name=table_name_update, data=obj_data): log.debug(sql_update_result) #obj_id = sql_update_result obj_id = obj_data_dict['id'] else: # log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(sql_update_result) return mk_resp(data=False, status_code=400, response=response) if sql_select_result := sql_select(table_name=table_name_select, record_id=obj_id): log.debug(sql_select_result) resp_data = base_name(**sql_select_result).dict(by_alias=by_alias, exclude_unset=exclude_unset) return mk_resp(data=resp_data, response=response) else: log.debug(sql_select_result) return mk_resp(data=False, status_code=404, response=response) def get_obj_li_template( obj_type: str = Query(None, max_length=50), for_obj_type: Optional[str] = Query(None, max_length=50), for_obj_id: Optional[Union[int,str]] = None, by_alias: Optional[bool] = True, include: Optional[list] = [], exclude: Optional[list] = [], exclude_unset: Optional[bool] = True, exclude_none: Optional[bool] = True, response: Response = Response, ): log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) if isinstance(for_obj_id, int): pass elif isinstance(for_obj_id, str): for_obj_id_random = for_obj_id for_obj_id = redis_lookup_id_random(record_id_random=for_obj_id_random, table_name=for_obj_type) table_name_select = obj_type_li[obj_type]['table_name'] if for_obj_type and for_obj_id: field_name = f'{for_obj_type}_id' sql_result = sql_select(table_name=table_name_select, field_name=field_name, field_value=for_obj_id) else: sql_result = sql_select(table_name=table_name_select) log.debug(sql_result) base_name = obj_type_li[obj_type]['base_name'] resp_data_li = [] for record in sql_result: resp_data = base_name(**record).dict(by_alias=by_alias, exclude_unset=exclude_unset) resp_data_li.append(resp_data) return mk_resp(data=resp_data_li, response=response) def get_obj_template( obj_id: Union[int,str], obj_type: str = Query(None, max_length=50), by_alias: Optional[bool] = True, include: Optional[list] = [], exclude: Optional[list] = [], exclude_unset: Optional[bool] = True, exclude_none: Optional[bool] = True, response: Response = Response, ): log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) if isinstance(obj_id, int): pass elif isinstance(obj_id, str): obj_id_random = obj_id obj_id = redis_lookup_id_random(record_id_random=obj_id_random, table_name=obj_type) table_name_select = obj_type_li[obj_type]['table_name'] if obj_id: sql_result = sql_select(table_name=table_name_select, record_id=obj_id) else: return mk_resp(data=False, status_code=404, response=response) if sql_result: log.debug(sql_result) base_name = obj_type_li[obj_type]['base_name'] resp_data = base_name(**sql_result).dict(by_alias=by_alias, exclude_unset=exclude_unset) return mk_resp(data=resp_data, response=response) else: log.debug(sql_result) return mk_resp(data=False, status_code=404, response=response) def delete_obj_template( obj_type: str = Query(None, max_length=50), obj_id: str = Query(None, max_length=22), response: Response = Response, ): log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) debug_data = {} debug_data['obj_type'] = obj_type debug_data['obj_id'] = obj_id log.debug(debug_data) table_name_delete = obj_type # NOTE: Not using the table name from the object type list because it may be a view (v_*). # NOTE: Add a check for the object ID... assuming it is a random ID string for now. if sql_result := sql_delete(table_name=table_name_delete, record_id_random=obj_id): log.debug(sql_result) return mk_resp(data=True, response=response) else: log.debug(sql_result) return mk_resp(data=False, status_code=404, response=response) # New dynamic API CRUD endpoint # The POST data should be JSON formatted # @router.post('/query') # def query( # # qry JSON should contain these properties: # # list: for_obj_type, for_obj_id, tbl_view_name, base_name # # id: obj_id, tbl_view_name, base_name # qry: str, # commons: Common_Route_Params = Depends(common_route_params), # ): # log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL # log.debug(locals()) # import urllib