import datetime, json, time #from datetime import datetime, time, timedelta from fastapi import APIRouter, Body, Depends, Header, HTTPException, Query, Response, status from fastapi.responses import FileResponse # from pydantic import BaseModel, EmailStr, Field from typing import Dict, List, Optional, Set, Union from app.lib_general import log, logging, common_route_params, Common_Route_Params # from app.config import settings from app.db_sql import sql_insert, sql_update, sql_insert_or_update, sql_select, sql_delete, redis_lookup_id_random, lookup_id_random_pop from app.ae_obj_types_def import * router = APIRouter() # Working on the basic API CRUD - STI 2021-03-08 # Updated 2023-07-06 @router.get('/{obj_type_l1}/list') @router.get('/{obj_type_l1}/{obj_type_l2}/list') @router.get('/{obj_type_l1}/{obj_type_l2}/{obj_type_l3}/list') async def get_obj_li( obj_type_l1: str = Query(..., min_length=2, max_length=50), obj_type_l2: str = Query(None, min_length=2, max_length=50), obj_type_l3: str = Query(None, min_length=2, max_length=50), for_obj_type: Optional[str] = Query(None, max_length=50), for_obj_id: Optional[str] = Query(None, max_length=22), tbl_alt: Optional[str] = Query('default', max_length=50), # This is used as a lookup for the real SQL database table or view name to use. mdl_alt: Optional[str] = Query('default', max_length=50), # This is used as a lookup for the real Python Pydantic model name to use. # use_alt_table: bool = False, # NOTE: This will use table_name_alt if they exist. -2023-11-17 # use_alt_base: bool = False, # NOTE: This will use base_name_alt if they exist. -2023-11-17 # field_qry_li: str = Query(None, max_length=150), # JSON formatted key value pair list of fields to search. # fulltext_qry_li: str = Query(None, max_length=150), # JSON formatted key value pair list of fields to search. # fulltext_qry_field_li: str = Header(None), # Json formatted string list of fields to search. It is not ideal that this is in the header. Need a better option, but this is currently a GET request. # fulltext_qry_str: str = Query(None, max_length=150), hidden: str = 'not_hidden', # hidden, not_hidden, all, # order_by_li: dict = None, order_by_li: str = Header(None), # JSON formatted string in a key value format. It is not ideal that this is in the header. Need a better option, but this is currently a GET request. # dh_order_by_li: str = Header(None), # dh_testing: str = Header(None), # h_order_by_li: str = Header(None), # h_testing: str = Header(None), # include: Optional[list] = [], # exclude: Optional[list] = [], # exclude_none: Optional[bool] = True, # Get the "json" param from the query string. This is a JSON formatted string of the data to be inserted. jp: Optional[Union[str, None]] = None, file_type: str = 'CSV', # CSV, Excel return_file: Optional[bool] = False, commons: Common_Route_Params = Depends(common_route_params), ): log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) import urllib # This should be a dict list of fields with a list of values to search for using FULLTEXT. fulltext_qry_dict_obj = None # This should be a dict list of fields with a list of values to search for using AND. and_qry_dict_obj = None # This should be a dict list of fields with a list of values to search for using AND LIKE. and_like_dict_obj = None # This should be a dict list of fields with a list of values to search for using AND IN. and_in_dict_li_obj = None jp_obj = None if jp: log.setLevel(logging.INFO) log.debug( urllib.parse.unquote(jp) ) try: jp_obj = json.loads(urllib.parse.unquote(jp)) except Exception as e: log.warning(e) return mk_resp(data=False, status_code=400, response=commons.response, status_message='The JSON string was not formatted correctly.') log.info(jp_obj) if jp_obj.get('ft_qry'): # NOTE: This is for the fulltext query fulltext_qry_dict_obj = jp_obj['ft_qry'] if jp_obj.get('and_qry'): # NOTE: This is for the additional AND clauses in the WHERE statement and_qry_dict_obj = jp_obj['and_qry'] if jp_obj.get('and_like'): # NOTE: This is for the additional AND LIKE clauses in the WHERE statement and_like_dict_obj = jp_obj['and_like'] if jp_obj.get('and_in_li'): # NOTE: This is for the additional AND IN clauses in the WHERE statement and_in_dict_li_obj = jp_obj['and_in_li'] log.setLevel(logging.WARNING) if order_by_li: order_by_li = json.loads(order_by_li) # # NOTE: This should eventually be used to pass small amounts of data to the API through the URL GET params. -2023-11-29 # if json_str: # NOTE: Currently this does absolutely nothing. It is here for future use. -2023-11-29 # log.debug( urllib.parse.unquote(json_str) ) # try: # json_obj = json.loads(urllib.parse.unquote(json_str)) # except Exception as e: # log.warning(e) # return mk_resp(data=False, status_code=400, response=commons.response, status_message='The JSON string was not formatted correctly.') # log.debug(json_obj) debug_data = {} debug_data['obj_type_l1'] = obj_type_l1 debug_data['obj_type_l2'] = obj_type_l2 debug_data['obj_type_l3'] = obj_type_l3 #debug_data['obj_id'] = obj_id debug_data['for_obj_type'] = for_obj_type debug_data['for_obj_id'] = for_obj_id debug_data['tbl_alt'] = tbl_alt debug_data['mdl_alt'] = mdl_alt # debug_data['use_alt_table'] = use_alt_table # debug_data['use_alt_base'] = use_alt_base debug_data['jp_obj'] = jp_obj # debug_data['fulltext_qry_field_li'] = fulltext_qry_field_li # debug_data['fulltext_qry_str'] = fulltext_qry_str debug_data['hidden'] = hidden debug_data['order_by_li'] = order_by_li if obj_type_l1 == 'lu': # log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL pass log.debug(debug_data) if obj_type_l1 and obj_type_l2 and obj_type_l3: obj_name = f'{obj_type_l1}_{obj_type_l2}_{obj_type_l3}' if obj_name in obj_type_kv_li: pass else: return mk_resp(data=False, status_code=400, response=commons.response) elif obj_type_l1 and obj_type_l2: obj_name = f'{obj_type_l1}_{obj_type_l2}' if obj_name in obj_type_kv_li: pass else: return mk_resp(data=False, status_code=400, response=commons.response) elif obj_type_l1: obj_name = f'{obj_type_l1}' if obj_name in obj_type_kv_li: pass else: return mk_resp(data=False, status_code=400, response=commons.response) else: log.warning('We should not be here') return mk_resp(data=False, status_code=400, response=commons.response) # log.setLevel(logging.DEBUG) log.info(f'obj_name: {obj_name}') log.debug(obj_type_kv_li[obj_name]) if tbl_alt: table_name = obj_type_kv_li[obj_name][f'tbl_{tbl_alt}'] # table_name = obj_type_kv_li[obj_name].get(f'tbl_{tbl_alt}') log.info(f'tbl_alt was found. Using {table_name} table.') if mdl_alt: base_name = obj_type_kv_li[obj_name][f'mdl_{mdl_alt}'] # base_name = obj_type_kv_li[obj_name].get(f'mdl_{mdl_alt}') log.info(f'mdl_alt was found. Using {base_name} model.') # if use_alt_table: # table_name = obj_type_kv_li[obj_name]['table_name_alt'] # else: # table_name = obj_type_kv_li[obj_name]['table_name'] # if use_alt_base: # base_name = obj_type_kv_li[obj_name]['base_name_alt'] # else: # base_name = obj_type_kv_li[obj_name]['base_name'] if for_obj_type and for_obj_id: for_obj_id = redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type) # log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(f'for_obj_type: {for_obj_type}') log.debug(f'for_obj_id: {for_obj_id}') # log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL field_name = f'{for_obj_type}_id' # NOTE: The enabled and hidden parameters are new to this endpoint and the sql_select function! -2023-07-06 sql_result = sql_select( table_name = table_name, field_name = field_name, field_value = for_obj_id, enabled = commons.enabled, hidden = hidden, fulltext_qry_dict = fulltext_qry_dict_obj, and_qry_dict = and_qry_dict_obj, and_like_dict = and_like_dict_obj, and_in_dict_li = and_in_dict_li_obj, # fulltext_qry_field_li = fulltext_qry_field_li, # fulltext_qry_str = fulltext_qry_str, order_by_li = order_by_li, limit = commons.limit, offset = commons.offset, as_list = True, # log_lvl = logging.DEBUG ) else: # NOTE: The enabled and hidden parameters are new to this endpoint and the sql_select function! -2023-07-06 # NOTE: This call (without field_name, field_value, limit, offset) may need more testing. sql_result = sql_select( table_name = table_name, enabled = commons.enabled, hidden = hidden, fulltext_qry_dict = fulltext_qry_dict_obj, and_qry_dict = and_qry_dict_obj, and_like_dict = and_like_dict_obj, and_in_dict_li = and_in_dict_li_obj, # fulltext_qry_field_li = fulltext_qry_field_li, # fulltext_qry_str = fulltext_qry_str, order_by_li = order_by_li, limit = commons.limit, offset = commons.offset, as_list = True, # log_lvl = logging.DEBUG ) # log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(sql_result) if sql_result: if isinstance(sql_result, list): # log.setLevel(logging.DEBUG) resp_data_li = [] for record in sql_result: if base_name: log.info(f'base_name was found. Returning data using {base_name} model.') resp_data = base_name(**record).dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset) else: log.warning('base_name model was not found. Returning raw data.') resp_data = record resp_data_li.append(resp_data) column_name_li = list(sql_result[0].keys()) # This should be the same for all records in the list. if return_file: log.setLevel(logging.DEBUG) # We want to handle any field that has a suffix of _json. It should be expanded into a list of fields that are prefixed with the original field name minus _json. # This will also allow us to export the data to a CSV or Excel file with the correct column names. # additional_json_field_list_for_export = [] new_resp_data_li = [] for record in resp_data_li: new_record = record.copy() for field_name in record.keys(): field_value = record[field_name] if field_name.endswith('li_json') and record[field_name]: log.info(f'Found a field that ends with li_json: {field_name}') log.info(f'Field value is a list??: {field_value}') # field_value = json.loads(record[key]) # Convert the string to a list of dictionaries # Example JSON data: {'facebook': 'https://www.facebook.com/example', 'twitter': 'https://twitter.com/example', 'instagram': 'https://www.instagram.com/example/', 'linkedin': 'https://www.linkedin.com/school/example', 'org': 'https://example.com/'} # Example new fields: social__facebook, social__twitter, social__instagram, social__linkedin, social__org if isinstance(field_value, list): # Loop through the list of dictionaries log.info(f'Field value is a list: {field_value}') for item in field_value: # item = json.loads(item) # Convert the string to a dictionary for key, value in item.items(): new_field_name = field_name[:-8]+'__'+key new_record[new_field_name] = value else: # Loop through key value pairs in the dictionary log.info(f'Field value is a dict: {field_value}') for key, value in field_value.items(): log.debug(f'Key: {key}') new_field_name = field_name[:-8]+'__'+key new_record[new_field_name] = value # Create a new field for each and value to the record # new_field_name = key[:-8]+'__cust_li' # new_record[new_field_name] = record[key] new_record.pop(field_name) # Remove the original field elif field_name.endswith('_json') and record[field_name]: log.info(f'Found a field that ends with _json: {field_name}') log.info(f'Field value is a dict???: {field_value}') for key, value in field_value.items(): new_field_name = field_name[:-5]+'__'+key new_record[new_field_name] = value new_record.pop(field_name) # Remove the original field elif field_name.endswith('li_json') or field_name.endswith('_json'): log.info(f'Found a field that ends with li_json or _json but no value: {field_name}') new_record.pop(field_name) # Remove the original field new_resp_data_li.append(new_record) datetime_format='%Y-%m-%d_%H%M' # current_datetime = datetime.datetime.now() # Servers timezone (Eastern) current_datetime_utc = datetime.datetime.utcnow() # UTC timezone current_datetime_utc = current_datetime_utc.strftime(datetime_format) filename = f'{obj_name}_list_{current_datetime_utc}' if file_type == 'CSV': filename_w_ext = filename+'.csv' elif file_type == 'Excel': filename_w_ext = filename+'.xlsx' log.setLevel(logging.INFO) if result := create_export_file(data_dict_list=new_resp_data_li, column_name_li=[], subdir_path=obj_name, filename=filename, rm_id=True, export_type=file_type): log.info(f'Export file created and saved: {result}') else: log.error('Something went wrong while creating or saving the export file') tmp_file_path = result log.info(f'Filename: {filename_w_ext}') if full_tmp_path := return_full_tmp_path(full_tmp_path=tmp_file_path): return FileResponse(path=full_tmp_path, filename=filename_w_ext) else: return mk_resp(data=resp_data_li, response=commons.response) else: status_message='Not Implemented (sort of). Attempted to process this request. Got a SQL result, but the returned data was unexpected.' return mk_resp(data=sql_result, response=commons.response, status_code=501, status_message=status_message) # Returns "Not Implemented" (sort of... unexpected response) else: return mk_resp(data=None, response=commons.response, status_code=404) # Updated 2023-11-03 @router.get('/{obj_type_l1}/{obj_id}') @router.get('/{obj_type_l1}/{obj_type_l2}/{obj_id}') @router.get('/{obj_type_l1}/{obj_type_l2}/{obj_type_l3}/{obj_id}') async def get_obj( obj_type_l1: str=None, obj_type_l2: str=None, obj_type_l3: str=None, obj_id: str=None, use_alt_table: bool = False, # NOTE: This will use table_name_alt if they exist. -2023-12-01 use_alt_base: bool = False, # NOTE: This will use base_name_alt if they exist. -2023-12-01 # for_obj_type: Optional[str] = Query(None, max_length=50), # NOTE: This is not currently used. It is here for future use. # for_obj_id: Optional[str] = Query(None, max_length=22), # NOTE: This is not currently used. It is here for future use. # qry_str: Optional[str] = Query(None, max_length=50), # qry_int: Optional[int] = None, # include: Optional[list] = [], # exclude: Optional[list] = [], # exclude_none: Optional[bool] = True, commons: Common_Route_Params = Depends(common_route_params), ): """ Simple select object type with an ID: - **obj_type_l1, obj_type_l2, obj_type_l3**: - Examples: - /account = account - /user = user - /user/role = user_role - /event = event - /event/exhibit = event_exhibit - /order = order - /order/cart = order_cart - /order/cart/line = order_cart_line - /lu/some_lookup = lu_some_lookup """ log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING # time.sleep(2.5) # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING debug_data = {} debug_data['obj_type_l1'] = obj_type_l1 debug_data['obj_type_l2'] = obj_type_l2 debug_data['obj_type_l3'] = obj_type_l3 debug_data['obj_id'] = obj_id debug_data['use_alt_table'] = use_alt_table debug_data['use_alt_base'] = use_alt_base log.debug(debug_data) if obj_type_l1 and obj_type_l2 and obj_type_l3: obj_name = f'{obj_type_l1}_{obj_type_l2}_{obj_type_l3}' if obj_name in obj_type_kv_li: #table_name = obj_type_kv_li[obj_name] #table_name = obj_type_kv_li[obj_name]['table_name'] pass else: return mk_resp(data=False, status_code=400, response=commons.response) elif obj_type_l1 and obj_type_l2: obj_name = f'{obj_type_l1}_{obj_type_l2}' if obj_name in obj_type_kv_li: #table_name = obj_type_kv_li[obj_name]['table_name'] pass else: return mk_resp(data=False, status_code=400, response=commons.response) elif obj_type_l1: obj_name = f'{obj_type_l1}' if obj_name in obj_type_kv_li: #table_name = obj_type_kv_li[obj_name]['table_name'] pass else: return mk_resp(data=False, status_code=400, response=commons.response) else: log.warning('We should not be here') return mk_resp(data=False, status_code=400, response=commons.response) if use_alt_table: table_name = obj_type_kv_li[obj_name]['table_name_alt'] else: table_name = obj_type_kv_li[obj_name]['table_name'] if use_alt_base: base_name = obj_type_kv_li[obj_name]['base_name_alt'] # log.setLevel(logging.DEBUG) log.debug(debug_data) else: base_name = obj_type_kv_li[obj_name]['base_name'] # NOTE: Add a check for the object ID... assuming it is a random ID string for now. if sql_result := sql_select(table_name=table_name, record_id_random=obj_id): log.debug(sql_result) if base_name: log.info(f'base_name was found. Returning data using {base_name} model.') resp_data = base_name(**sql_result).dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset) else: log.info('base_name model was not found. Returning raw data.') resp_data = sql_result return mk_resp(data=resp_data, response=commons.response) #, details=debug_data) else: log.debug(sql_result) return mk_resp(data=False, status_code=404, response=commons.response) # ### BEGIN ### API CRUD ### patch_obj() ### # Updated 2024-03-08 @router.patch('/{obj_type_l1}/{obj_id}') @router.patch('/{obj_type_l1}/{obj_type_l2}/{obj_id}') @router.patch('/{obj_type_l1}/{obj_type_l2}/{obj_type_l3}/{obj_id}') async def patch_obj( crud: Api_Crud_Base, obj_type_l1: Optional[str] = Query(..., max_length=50), obj_type_l2: str = None, obj_type_l3: str = None, obj_id: str = Query(..., min_length=11, max_length=22), run_safety_check: bool = True, # for_obj_type: Optional[str] = Query(None, max_length=50), # for_obj_id: Optional[str] = Query(None, max_length=22), # The view name will be prefixed with "v_" and must be a valid view name based on the object type name from the URL. obj_type_l1, obj_type_l2, obj_type_l3 combined below as obj_name return_obj: Optional[bool] = True, # I am not sure how to make this work yet. -2024-03-08 obj_v_name: Optional[str] = None, # Use view name to help return the object type. -2024-03-08 commons: Common_Route_Params = Depends(common_route_params), ): """ Simple patch object type with an ID: - **obj_type_l1, obj_type_l2, obj_type_l3**: - Examples: - /account = account - /user = user - /user/role = user_role - /event = event - /event/exhibit = event_exhibit - /order = order - /order/cart = order_cart - /order/cart/line = order_cart_line - /lu/some_lookup = lu_some_lookup """ log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) if crud.super_key == 'zp5PtX4zUsI': pass elif crud.jwt: # pass log.warning('JWT was passed') return mk_resp(data=False, status_code=501, response=commons.response, status_message='Token access for the API CRUD has not been implemented yet.') else: log.warning('Access key is missing or incorrect') return mk_resp(data=False, status_code=400, response=commons.response) # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING # time.sleep(2.5) # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING debug_data = {} debug_data['crud'] = crud debug_data['create_key'] = crud.create_key debug_data['read_key'] = crud.read_key debug_data['update_key'] = crud.update_key debug_data['delete_key'] = crud.delete_key debug_data['data_list'] = crud.data_list debug_data['obj_type_l1'] = obj_type_l1 debug_data['obj_type_l2'] = obj_type_l2 debug_data['obj_type_l3'] = obj_type_l3 debug_data['obj_id'] = obj_id log.debug(debug_data) if obj_type_l1 and obj_type_l2 and obj_type_l3: obj_name = f'{obj_type_l1}_{obj_type_l2}_{obj_type_l3}' if obj_name in obj_type_kv_li: #table_name = obj_type_kv_li[obj_name] #table_name = obj_type_kv_li[obj_name]['tbl_name_update'] pass else: return mk_resp(data=False, status_code=400, response=commons.response) elif obj_type_l1 and obj_type_l2: obj_name = f'{obj_type_l1}_{obj_type_l2}' if obj_name in obj_type_kv_li: #table_name = obj_type_kv_li[obj_name]['tbl_name_update'] pass else: return mk_resp(data=False, status_code=400, response=commons.response) elif obj_type_l1: obj_name = f'{obj_type_l1}' if obj_name in obj_type_kv_li: #table_name = obj_type_kv_li[obj_name]['tbl_name_update'] pass else: return mk_resp(data=False, status_code=400, response=commons.response) else: log.warning('We should not be here') return mk_resp(data=False, status_code=400, response=commons.response) table_name = obj_type_kv_li[obj_name].get('tbl_name_update') exclude = obj_type_kv_li[obj_name].get('exclude_for_db') # ### SECTION ### Secondary data validation obj_id_random = obj_id # This might need to be used later for the response data if obj_id := redis_lookup_id_random(record_id_random=obj_id, table_name=table_name): pass else: return mk_resp(data=None, status_code=404, response=commons.response, status_message='The object ID was invalid or not found.') # NOTE: Doing a quick sanity check based on the object models and then dump to a dict to get rid of invalid fields. The other option is to just use the crud.data_list raw. crud_data = crud.data_list log.debug(crud_data.keys()) field_list = crud_data.keys() if run_safety_check: log.info('Running safety check by default') base_name = obj_type_kv_li[obj_name]['base_name'] try: obj_model = base_name(**crud.data_list) except Exception as e: log.error('An unknown exception happened. Returning False.') log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') log.warning(json.dumps(crud.data_list, indent=4)) return mk_resp(data=False, status_code=400, response=commons.response, status_message='There was likely a validation error. Returned False.') else: log.info('Successfully updated the object model.') pass log.debug(obj_model) obj_dict = obj_model.dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset, include=field_list) log.debug(obj_dict) crud_data = obj_dict else: log.warning('The default safety check was not run!') obj_dict = crud_data # NOTE: Add a check for the object ID... assuming it is a random ID string for now. Using rm_id_random. That helps with some field names. if sql_result := sql_update(data=crud_data, table_name=table_name, record_id=obj_id, rm_id_random=True, log_lvl=logging.DEBUG): log.info('The record was updated.') log.debug(sql_result) resp_data = {} resp_data['table_name'] = table_name resp_data['request_data'] = obj_dict resp_data['obj_id'] = obj_id resp_data['obj_id_random'] = obj_id_random # Comment out the following line if you do not want to allo for returning the updated data. # return mk_resp(data=resp_data, response=commons.response) #, details=debug_data) elif sql_result == None: log.info('The record was probably not found to be updated.') log.debug(sql_result) return mk_resp(data=None, status_code=404, status_message='The record was probably not found to be updated.', response=commons.response) else: log.info('Something unexpected happened while trying to run the SQL UPDATE. The fields or field values passed may not be valid for the table and or model.') log.debug(sql_result) return mk_resp(data=False, status_code=400, status_message='Something unexpected happened while trying to runt he SQL UPDATE. The fields or field values passed may not be valid for the table and or model.', response=commons.response) # ### SECTION ### Return successful results # NOTE: obj_id was found and verified above # New: 2024-03-08 if return_obj: if obj_v_name: # We should add a sanity check here to make sure the view name is valid first. table_name = f'v_{obj_v_name}' # Remove an extra "v_" if it is there. if table_name.startswith('v_v_'): table_name = table_name[2:] # We can do more later... like check against an allowed list of view names per Aether object type else: # Use the table_name as a backup option? This should be safe since it is also passed through the model. table_name = obj_type_kv_li[obj_name]['table_name'] base_name = obj_type_kv_li[obj_name]['base_name'] if sql_select_result := sql_select(table_name=table_name, record_id=obj_id): log.debug(sql_select_result) log.debug(base_name) resp_data = base_name(**sql_select_result).dict(by_alias=True, exclude_unset=commons.exclude_unset) log.debug(resp_data) log.info(f'Returning object model {base_name} from table_name {table_name}; obj_id; obj_id_random={obj_id_random}) using PATCH data') return mk_resp(data=resp_data, response=commons.response) else: log.debug(sql_select_result) status_message = f'The record was not found in table_name={table_name} used for the SQL SELECT query. This may be a SQL VIEW and may be because the SQL VIEW needs to be created or updated.' return mk_resp(data=False, status_code=404, response=commons.response, status_message=status_message) log.info(f'Returning IDs (obj_id, obj_id_random={obj_id_random}) using PATCH data') return mk_resp(data=resp_data, response=commons.response) #, details=debug_data) # ### END ### API CRUD ### patch_obj() ### # ### BEGIN ### API CRUD ### post_obj() ### # Updated 2024-03-08 @router.post('/{obj_type_l1}') @router.post('/{obj_type_l1}/{obj_type_l2}') @router.post('/{obj_type_l1}/{obj_type_l2}/{obj_type_l3}') async def post_obj( crud: Api_Crud_Base, obj_type_l1: Optional[str] = Query(..., max_length=50), obj_type_l2: str = None, obj_type_l3: str = None, # obj_id: str = Query(..., min_length=11, max_length=22), run_safety_check: bool = True, # The view name will be prefixed with "v_" and must be a valid view name based on the object type name from the URL. obj_type_l1, obj_type_l2, obj_type_l3 combined below as obj_name # for_obj_type: Optional[str] = Query(None, max_length=50), # for_obj_id: Optional[str] = Query(None, max_length=22), return_obj: Optional[bool] = True, obj_v_name: Optional[str] = None, # Use view name to help return the object type. -2024-03-08 commons: Common_Route_Params = Depends(common_route_params), ): """ Simple post object type: - **obj_type_l1, obj_type_l2, obj_type_l3**: - Examples: - /account = account - /user = user - /user/role = user_role - /event = event - /event/exhibit = event_exhibit - /order = order - /order/cart = order_cart - /order/cart/line = order_cart_line - /lu/some_lookup = lu_some_lookup """ log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) if crud.super_key == 'zp5PtX4zUsI': pass elif crud.jwt: # pass log.warning('JWT was passed') return mk_resp(data=False, status_code=501, response=commons.response, status_message='Token access for the API CRUD has not been implemented yet.') else: log.warning('Access key is missing or incorrect') return mk_resp(data=False, status_code=400, response=commons.response) # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING # time.sleep(1.5) # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING debug_data = {} debug_data['crud'] = crud debug_data['create_key'] = crud.create_key debug_data['read_key'] = crud.read_key debug_data['update_key'] = crud.update_key debug_data['delete_key'] = crud.delete_key debug_data['data_list'] = crud.data_list debug_data['obj_type_l1'] = obj_type_l1 debug_data['obj_type_l2'] = obj_type_l2 debug_data['obj_type_l3'] = obj_type_l3 # debug_data['obj_id'] = obj_id log.debug(debug_data) if obj_type_l1 and obj_type_l2 and obj_type_l3: obj_name = f'{obj_type_l1}_{obj_type_l2}_{obj_type_l3}' if obj_name in obj_type_kv_li: #table_name = obj_type_kv_li[obj_name] #table_name = obj_type_kv_li[obj_name]['tbl_name_update'] pass else: return mk_resp(data=False, status_code=400, response=commons.response) elif obj_type_l1 and obj_type_l2: obj_name = f'{obj_type_l1}_{obj_type_l2}' if obj_name in obj_type_kv_li: #table_name = obj_type_kv_li[obj_name]['tbl_name_update'] pass else: return mk_resp(data=False, status_code=400, response=commons.response) elif obj_type_l1: obj_name = f'{obj_type_l1}' if obj_name in obj_type_kv_li: #table_name = obj_type_kv_li[obj_name]['tbl_name_update'] pass else: return mk_resp(data=False, status_code=400, response=commons.response) else: log.warning('We should not be here') return mk_resp(data=False, status_code=400, response=commons.response) # ### SECTION ### Figure out the table_name to use # Updated: 2024-03-08 view_name = None # This is the view name to use for the SQL SELECT query if obj_v_name: # We should add a sanity check here to make sure the view name is valid first. view_name = f'v_{obj_v_name}' # Remove an extra "v_" if it is there. if view_name.startswith('v_v_'): view_name = view_name[2:] # We can do more later... like check against an allowed list of view names per Aether object type elif obj_type_kv_li[obj_name].get('table_name'): # Use the table_name as a backup option? This should be safe. view_name = obj_type_kv_li[obj_name].get('table_name') elif obj_type_kv_li[obj_name].get('table_name_alt'): view_name = obj_type_kv_li[obj_name].get('table_name_alt') elif obj_type_kv_li[obj_name].get('tbl_name_update'): view_name = obj_type_kv_li[obj_name].get('tbl_name_update') else: log.error('The table_name was not found. This is a critical error. Returning False.') return mk_resp(data=False, status_code=400, response=commons.response, status_message='The table_name was not found. This is a critical error.') table_name = None # This is the table name to use for the SQL INSERT or UPDATE if obj_type_kv_li[obj_name].get('tbl_name_update'): table_name = obj_type_kv_li[obj_name].get('tbl_name_update') elif obj_type_kv_li[obj_name].get('table_name'): table_name = obj_type_kv_li[obj_name].get('table_name') # This should be a view in most cases. table_name_select = view_name exclude = obj_type_kv_li[obj_name].get('exclude_for_db') # ### SECTION ### Secondary data validation # if obj_id := redis_lookup_id_random(record_id_random=obj_id, table_name=table_name): pass # else: return mk_resp(data=None, status_code=404, response=commons.response, status_message='The object ID was invalid or not found.') # NOTE: Doing a quick sanity check based on the object models and then dump to a dict to get rid of invalid fields. The other option is to just use the crud.data_list raw. crud_data = crud.data_list log.debug(crud_data.keys()) field_list = crud_data.keys() if run_safety_check: log.info('Running safety check by default') base_name = obj_type_kv_li[obj_name]['base_name'] try: obj_model = base_name(**crud.data_list) except Exception as e: log.error('An unknown exception happened. Returning False.') log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****') log.warning(json.dumps(crud.data_list, indent=4)) return mk_resp(data=False, status_code=400, response=commons.response, status_message='There was likely a validation error. Returned False.') else: log.info('Successfully created the object model.') pass log.debug(obj_model) obj_dict = obj_model.dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset, include=field_list) log.debug(obj_dict) crud_data = obj_dict else: log.warning('The default safety check was not run!') obj_dict = crud_data # NOTE: Add a check for the object ID... assuming it is a random ID string for now. Using rm_id_random. That helps with some field names. if sql_result := sql_insert(data=crud_data, table_name=table_name, rm_id_random=True, log_lvl=logging.INFO): log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.info('The record was inserted.') log.debug(sql_result) resp_data = {} resp_data['table_name'] = table_name resp_data['request_data'] = obj_dict obj_id = sql_result # The ID should be returned resp_data['obj_id'] = obj_id obj_id_random = get_id_random(record_id=obj_id, table_name=table_name) resp_data['obj_id_random'] = obj_id_random # NOTE: obj_id was found and verified above # Updated: 2024-03-08 if return_obj: log.info('Returning object created from POST data') # This is the more or less the same as what is done in the patch_obj() endpoint. if sql_select_result := sql_select(table_name=table_name_select, record_id=obj_id): log.debug(sql_select_result) log.debug(base_name) resp_data = base_name(**sql_select_result).dict(by_alias=True, exclude_unset=commons.exclude_unset) log.debug(resp_data) log.info(f'Returning object model ({base_name}; obj_id; obj_id_random={obj_id_random}) from POST data') return mk_resp(data=resp_data, response=commons.response) else: log.debug(sql_select_result) status_message = 'The record was not found. This may be because a SQL VIEW was used for the SQL SELECT query?' return mk_resp(data=False, status_code=404, response=commons.response, status_message=status_message) log.info('Returning IDs only created from POST data') return mk_resp(data=resp_data, response=commons.response) #, details=debug_data) elif sql_result == None: log.info('The record was probably not found to be updated.') log.debug(sql_result) return mk_resp(data=None, status_code=404, status_message='The record was probably not found to be updated.', response=commons.response) else: log.info('Something unexpected happened while trying to runt he SQL UPDATE. The fields or field values passed may not be valid for the table and or model.') log.debug(sql_result) return mk_resp(data=False, status_code=400, status_message='Something unexpected happened while trying to run the SQL UPDATE. The fields or field values passed may not be valid for the table and or model.', response=commons.response) # ### END ### API CRUD ### post_obj() ### # ### BEGIN ### API CRUD ### delete_obj() ### # Updated 2023-07-10 @router.delete('/{obj_type_l1}/{obj_id}') @router.delete('/{obj_type_l1}/{obj_type_l2}/{obj_id}') @router.delete('/{obj_type_l1}/{obj_type_l2}/{obj_type_l3}/{obj_id}') async def delete_obj( obj_type_l1: str=None, obj_type_l2: str=None, obj_type_l3: str=None, obj_id: str=None, method: str = 'delete', # None, delete, disable, hide # x_account_id: str = Header(...), # response: Response = Response, commons: Common_Route_Params = Depends(common_route_params), ): """ Simple delete object type with an ID: - **obj_type_l1, obj_type_l2, obj_type_l3**: - Examples: - /account = account - /user = user - /user/role = user_role - /event = event - /event/exhibit = event_exhibit - /event/person/profile = event_person_profile - /order = order - /order/line = order_line - /lu/some_lookup = lu_some_lookup """ log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) debug_data = {} debug_data['obj_type_l1'] = obj_type_l1 debug_data['obj_type_l2'] = obj_type_l2 debug_data['obj_type_l3'] = obj_type_l3 debug_data['obj_id'] = obj_id log.debug(debug_data) # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING # time.sleep(1.5) # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING if obj_type_l1 and obj_type_l2 and obj_type_l3: obj_name = f'{obj_type_l1}_{obj_type_l2}_{obj_type_l3}' if obj_name in obj_type_kv_li: pass else: return mk_resp(data=False, status_code=400, response=commons.response) elif obj_type_l1 and obj_type_l2: obj_name = f'{obj_type_l1}_{obj_type_l2}' if obj_name in obj_type_kv_li: pass else: return mk_resp(data=False, status_code=400, response=commons.response) elif obj_type_l1: obj_name = f'{obj_type_l1}' if obj_name in obj_type_kv_li: pass else: return mk_resp(data=False, status_code=400, response=commons.response) else: log.warning('We should not be here') return mk_resp(data=False, status_code=400, response=commons.response) table_name = obj_name # obj_type_kv_li[obj_name]['table_name'] # NOTE: Don't try to use the view! # NOTE: Add a check for the object ID... assuming it is a random ID string for now. if method == 'delete' or method is None: sql_result = sql_delete(table_name=table_name, record_id_random=obj_id) log.debug(sql_result) elif method == 'disable': data = {'enable': False} sql_result = sql_update(data=data, table_name=table_name, record_id_random=obj_id, rm_id_random=True, log_lvl=logging.INFO) elif method == 'hide': data = {'hide': True} sql_result = sql_update(data=data, table_name=table_name, record_id_random=obj_id, rm_id_random=True, log_lvl=logging.INFO) else: log.warning('We should not be here') return mk_resp(data=False, status_code=400, response=commons.response) if sql_result: resp_data = True log.info('The record was found and deleted or updated.') elif sql_result == None: resp_data = None log.info('The record was probably not found to be deleted and or updated.') else: resp_data = False log.info('Something unexpected happened while trying to run the SQL DELETE and or UPDATE. The fields or field values passed may not be valid for the table.') resp_details = '' if method == 'delete' and sql_result: resp_details = f'Object type: {obj_name} Object ID: {obj_id}; deleted' return mk_resp(data=resp_data, details=resp_details, response=commons.response) #, details=debug_data) elif method == 'hide' and sql_result: resp_details = f'Object type: {obj_name} Object ID: {obj_id}; hidden' return mk_resp(data=resp_data, details=resp_details, response=commons.response) #, details=debug_data) elif method == 'disable' and sql_result: resp_details = f'Object type: {obj_name} Object ID: {obj_id}; disabled' return mk_resp(data=resp_data, details=resp_details, response=commons.response) #, details=debug_data) elif sql_result is None: resp_details = f'Not found: Object type: {obj_name} Object ID: {obj_id}; {method}' return mk_resp(data=resp_data, status_code=404, details=resp_details, response=commons.response) #, details=debug_data) else: resp_details = f'Unexpected result: Object type: {obj_name} Object ID: {obj_id}; {method}' return mk_resp(data=resp_data, status_code=400, details=resp_details, response=commons.response) #, details=debug_data) # ### END ### API CRUD ### delete_obj() ### def post_obj_template( obj_type: str, data: dict, id_random_length: int = 8, # Added 2023-04-13; need to move away from this return_obj: bool = True, by_alias: bool = True, include: Optional[list] = [], exclude: Optional[list] = [], exclude_unset: Optional[bool] = True, exclude_none: Optional[bool] = True, response: Response = Response, ): log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) obj_data_dict = data obj_data = lookup_id_random_pop(obj_data_dict) table_name_insert = obj_type table_name_select = obj_type_kv_li[obj_type]['table_name'] base_name = obj_type_kv_li[obj_type]['base_name'] if sql_insert_result := sql_insert(table_name=table_name_insert, data=obj_data, id_random_length=id_random_length): log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(sql_insert_result) obj_id = sql_insert_result else: log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(sql_insert_result) return mk_resp(data=False, status_code=400, response=response) if sql_select_result := sql_select(table_name=table_name_select, record_id=obj_id): log.debug(sql_select_result) resp_data = base_name(**sql_select_result).dict(by_alias=by_alias, exclude_unset=exclude_unset) return mk_resp(data=resp_data, response=response) else: log.debug(sql_select_result) return mk_resp(data=False, status_code=404, response=response) def patch_obj_template( obj_type: str, data: dict, obj_id: str, return_obj: bool=True, by_alias: bool=True, include: Optional[list] = [], exclude: Optional[list] = [], exclude_unset: Optional[bool] = True, exclude_none: Optional[bool] = True, response: Response = Response, ): log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) obj_data_dict = data #obj_data_dict['id'] = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_type) obj_data_dict['id_random'] = obj_id # NOTE: Adding this in so the id_random is NOT updated log.debug(obj_data_dict) obj_data = lookup_id_random_pop(obj_data_dict) table_name_update = obj_type table_name_select = obj_type_kv_li[obj_type]['table_name'] base_name = obj_type_kv_li[obj_type]['base_name'] if sql_update_result := sql_update(table_name=table_name_update, data=obj_data): log.debug(sql_update_result) #obj_id = sql_update_result obj_id = obj_data_dict['id'] else: # log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(sql_update_result) return mk_resp(data=False, status_code=400, response=response) if sql_select_result := sql_select(table_name=table_name_select, record_id=obj_id): log.debug(sql_select_result) resp_data = base_name(**sql_select_result).dict(by_alias=by_alias, exclude_unset=exclude_unset) return mk_resp(data=resp_data, response=response) else: log.debug(sql_select_result) return mk_resp(data=False, status_code=404, response=response) def get_obj_li_template( obj_type: str = Query(None, max_length=50), for_obj_type: Optional[str] = Query(None, max_length=50), for_obj_id: Optional[Union[int,str]] = None, by_alias: Optional[bool] = True, include: Optional[list] = [], exclude: Optional[list] = [], exclude_unset: Optional[bool] = True, exclude_none: Optional[bool] = True, response: Response = Response, ): log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) if isinstance(for_obj_id, int): pass elif isinstance(for_obj_id, str): for_obj_id_random = for_obj_id for_obj_id = redis_lookup_id_random(record_id_random=for_obj_id_random, table_name=for_obj_type) table_name_select = obj_type_kv_li[obj_type]['table_name'] if for_obj_type and for_obj_id: field_name = f'{for_obj_type}_id' sql_result = sql_select(table_name=table_name_select, field_name=field_name, field_value=for_obj_id) else: sql_result = sql_select(table_name=table_name_select) log.debug(sql_result) base_name = obj_type_kv_li[obj_type]['base_name'] resp_data_li = [] for record in sql_result: resp_data = base_name(**record).dict(by_alias=by_alias, exclude_unset=exclude_unset) resp_data_li.append(resp_data) return mk_resp(data=resp_data_li, response=response) def get_obj_template( obj_id: Union[int,str], obj_type: str = Query(None, max_length=50), by_alias: Optional[bool] = True, include: Optional[list] = [], exclude: Optional[list] = [], exclude_unset: Optional[bool] = True, exclude_none: Optional[bool] = True, response: Response = Response, ): log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) if isinstance(obj_id, int): pass elif isinstance(obj_id, str): obj_id_random = obj_id obj_id = redis_lookup_id_random(record_id_random=obj_id_random, table_name=obj_type) table_name_select = obj_type_kv_li[obj_type]['table_name'] if obj_id: sql_result = sql_select(table_name=table_name_select, record_id=obj_id) else: return mk_resp(data=False, status_code=404, response=response) if sql_result: log.debug(sql_result) base_name = obj_type_kv_li[obj_type]['base_name'] resp_data = base_name(**sql_result).dict(by_alias=by_alias, exclude_unset=exclude_unset) return mk_resp(data=resp_data, response=response) else: log.debug(sql_result) return mk_resp(data=False, status_code=404, response=response) def delete_obj_template( obj_type: str = Query(None, max_length=50), obj_id: str = Query(None, max_length=22), response: Response = Response, ): log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.debug(locals()) debug_data = {} debug_data['obj_type'] = obj_type debug_data['obj_id'] = obj_id log.debug(debug_data) table_name_delete = obj_type # NOTE: Not using the table name from the object type list because it may be a view (v_*). # NOTE: Add a check for the object ID... assuming it is a random ID string for now. if sql_result := sql_delete(table_name=table_name_delete, record_id_random=obj_id): log.debug(sql_result) return mk_resp(data=True, response=response) else: log.debug(sql_result) return mk_resp(data=False, status_code=404, response=response) # New dynamic API CRUD endpoint # The POST data should be JSON formatted # @router.post('/query') # def query( # # qry JSON should contain these properties: # # list: for_obj_type, for_obj_id, tbl_view_name, base_name # # id: obj_id, tbl_view_name, base_name # qry: str, # commons: Common_Route_Params = Depends(common_route_params), # ): # log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL # log.debug(locals()) # import urllib