Files
OSIT-AE-API-FastAPI/app/routers/api_crud_v2.py
Scott Idem 9d89d4c8e4 fix: exclude account_id and virtual fields from archive_content DB writes
- Adds fields_to_exclude_from_db to Archive_Content_Base to prevent SQL errors on non-existent columns.
- Updates documentation for V3 Create/Update patterns and the x-ae-ignore-extra-fields header.
- Propagates account_id_random to hosted file and media processing methods.
2026-02-24 11:30:17 -05:00

1526 lines
66 KiB
Python

import datetime, json, time
#from datetime import datetime, time, timedelta
from fastapi import APIRouter, Body, Depends, Header, HTTPException, Path, Query, Response, status
from fastapi.responses import FileResponse
# from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union
from app.lib_general import log, logging, common_route_params, Common_Route_Params
# from app.config import settings
from app.db_sql import sql_insert, sql_update, sql_insert_or_update, sql_select, sql_delete, redis_lookup_id_random, lookup_id_random_pop
from app.ae_obj_types_def import *
router = APIRouter()
# Working on the basic API CRUD - STI 2021-03-08
# obj_type_l1: str = Path(min_length=2, max_length=50),
# obj_type_l2: str = Path(min_length=2, max_length=50),
# obj_type_l3: str = Path(min_length=2, max_length=50),
# for_obj_type: Optional[str] = Query(None, max_length=50),
# for_obj_id: Optional[str] = Query(None, max_length=22),
# tbl_alt: Optional[str] = Query('default', max_length=50), # This is used as a lookup for the real SQL database table or view name to use.
# mdl_alt: Optional[str] = Query('default', max_length=50), # This is used as a lookup for the real Python Pydantic model name to use.
# # use_alt_table: bool = False, # NOTE: This will use table_name_alt if they exist. -2023-11-17
# # use_alt_base: bool = False, # NOTE: This will use base_name_alt if they exist. -2023-11-17
# # field_qry_li: str = Query(None, max_length=150), # JSON formatted key value pair list of fields to search.
# # fulltext_qry_li: str = Query(None, max_length=150), # JSON formatted key value pair list of fields to search.
# # fulltext_qry_field_li: str = Header(None), # Json formatted string list of fields to search. It is not ideal that this is in the header. Need a better option, but this is currently a GET request.
# # fulltext_qry_str: str = Query(None, max_length=150),
# hidden: str = 'not_hidden', # hidden, not_hidden, all,
# # order_by_li: dict = None,
# order_by_li: str = Header(None), # JSON formatted string in a key value format. It is not ideal that this is in the header. Need a better option, but this is currently a GET request.
# # dh_order_by_li: str = Header(None),
# # dh_testing: str = Header(None),
# # h_order_by_li: str = Header(None),
# # h_testing: str = Header(None),
# # include: Optional[list] = [],
# # exclude: Optional[list] = [],
# # exclude_none: Optional[bool] = True,
# # Get the "json" param from the query string. This is a JSON formatted string of the data to be inserted.
# jp: Optional[Union[str, None]] = None,
# file_type: str = 'CSV', # CSV, Excel
# return_file: Optional[bool] = False,
# commons: Common_Route_Params = Depends(common_route_params),
# Updated 2023-07-06
@router.get('/{obj_type_l1}/list')
async def get_obj_li_l1(
    obj_type_l1: str = Path(min_length=2, max_length=50),
    for_obj_type: Optional[str] = Query(None, max_length=50),
    for_obj_id: Optional[str] = Query(None, max_length=22),
    # Lookup key for the real SQL database table or view name to use.
    tbl_alt: Optional[str] = Query('default', max_length=50),
    # Lookup key for the real Python Pydantic model name to use.
    mdl_alt: Optional[str] = Query('default', max_length=50),
    # Lookup key for a list of column names to use (file export).
    exp_alt: Optional[str] = Query('default', max_length=50),
    # One of: hidden, not_hidden, all
    hidden: str = 'not_hidden',
    # JSON formatted string in a key value format. Passing it as a header is
    # not ideal, but this is a GET request so there is no body to carry it.
    order_by_li: str = Header(None),
    # URL-quoted JSON string of extra query options. handle_get_obj_li reads
    # the keys: qry, ft_qry, and_qry, and_like, or_like, and_in_li.
    jp: Optional[Union[str, None]] = None,
    # One of: CSV, Excel
    file_type: str = 'CSV',
    return_file: Optional[bool] = False,
    commons: Common_Route_Params = Depends(common_route_params),
):
    """
    List objects for a one-level object type (``/{obj_type_l1}/list``).

    This route is a thin wrapper: every parameter is forwarded unchanged to
    ``handle_get_obj_li`` (without ``obj_type_l2`` / ``obj_type_l3``) and its
    result is returned as-is.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # ### SECTION ### Delegate to the generic list handler
    return handle_get_obj_li(
        obj_type_l1=obj_type_l1,
        for_obj_type=for_obj_type, for_obj_id=for_obj_id,
        tbl_alt=tbl_alt, mdl_alt=mdl_alt, exp_alt=exp_alt,
        hidden=hidden,
        order_by_li=order_by_li,
        jp=jp,
        file_type=file_type, return_file=return_file,
        commons=commons,
    )
@router.get('/{obj_type_l1}/{obj_type_l2}/list')
async def get_obj_li_l2(
    obj_type_l1: str = Path(min_length=2, max_length=50),
    obj_type_l2: str = Path(min_length=2, max_length=50),
    for_obj_type: Optional[str] = Query(None, max_length=50),
    for_obj_id: Optional[str] = Query(None, max_length=22),
    # Lookup key for the real SQL database table or view name to use.
    tbl_alt: Optional[str] = Query('default', max_length=50),
    # Lookup key for the real Python Pydantic model name to use.
    mdl_alt: Optional[str] = Query('default', max_length=50),
    # Lookup key for a list of column names to use (file export).
    exp_alt: Optional[str] = Query('default', max_length=50),
    # One of: hidden, not_hidden, all
    hidden: str = 'not_hidden',
    # JSON formatted string in a key value format. Passing it as a header is
    # not ideal, but this is a GET request so there is no body to carry it.
    order_by_li: str = Header(None),
    # URL-quoted JSON string of extra query options. handle_get_obj_li reads
    # the keys: qry, ft_qry, and_qry, and_like, or_like, and_in_li.
    jp: Optional[Union[str, None]] = None,
    # One of: CSV, Excel
    file_type: str = 'CSV',
    return_file: Optional[bool] = False,
    commons: Common_Route_Params = Depends(common_route_params),
):
    """
    List objects for a two-level object type
    (``/{obj_type_l1}/{obj_type_l2}/list``).

    This route is a thin wrapper: every parameter is forwarded unchanged to
    ``handle_get_obj_li`` (without ``obj_type_l3``) and its result is
    returned as-is.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # ### SECTION ### Delegate to the generic list handler
    return handle_get_obj_li(
        obj_type_l1=obj_type_l1, obj_type_l2=obj_type_l2,
        for_obj_type=for_obj_type, for_obj_id=for_obj_id,
        tbl_alt=tbl_alt, mdl_alt=mdl_alt, exp_alt=exp_alt,
        hidden=hidden,
        order_by_li=order_by_li,
        jp=jp,
        file_type=file_type, return_file=return_file,
        commons=commons,
    )
@router.get('/{obj_type_l1}/{obj_type_l2}/{obj_type_l3}/list')
async def get_obj_li_l3(
    obj_type_l1: str = Path(min_length=2, max_length=50),
    obj_type_l2: str = Path(min_length=2, max_length=50),
    obj_type_l3: str = Path(min_length=2, max_length=50),
    for_obj_type: Optional[str] = Query(None, max_length=50),
    for_obj_id: Optional[str] = Query(None, max_length=22),
    # Lookup key for the real SQL database table or view name to use.
    tbl_alt: Optional[str] = Query('default', max_length=50),
    # Lookup key for the real Python Pydantic model name to use.
    mdl_alt: Optional[str] = Query('default', max_length=50),
    # Lookup key for a list of column names to use (file export).
    exp_alt: Optional[str] = Query('default', max_length=50),
    # One of: hidden, not_hidden, all
    hidden: str = 'not_hidden',
    # JSON formatted string in a key value format. Passing it as a header is
    # not ideal, but this is a GET request so there is no body to carry it.
    order_by_li: str = Header(None),
    # URL-quoted JSON string of extra query options. handle_get_obj_li reads
    # the keys: qry, ft_qry, and_qry, and_like, or_like, and_in_li.
    jp: Optional[Union[str, None]] = None,
    # One of: CSV, Excel
    file_type: str = 'CSV',
    return_file: Optional[bool] = False,
    commons: Common_Route_Params = Depends(common_route_params),
):
    """
    List objects for a three-level object type
    (``/{obj_type_l1}/{obj_type_l2}/{obj_type_l3}/list``).

    This route is a thin wrapper: every parameter is forwarded unchanged to
    ``handle_get_obj_li`` and its result is returned as-is.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # ### SECTION ### Delegate to the generic list handler
    return handle_get_obj_li(
        obj_type_l1=obj_type_l1, obj_type_l2=obj_type_l2, obj_type_l3=obj_type_l3,
        for_obj_type=for_obj_type, for_obj_id=for_obj_id,
        tbl_alt=tbl_alt, mdl_alt=mdl_alt, exp_alt=exp_alt,
        hidden=hidden,
        order_by_li=order_by_li,
        jp=jp,
        file_type=file_type, return_file=return_file,
        commons=commons,
    )
# Updated 2024-08-14
def _lookup_obj_type_alt(obj_type_def, key, alt):
    """Return ``(value, alt_found)``: ``obj_type_def[f'{key}_{alt}']`` when present, else ``obj_type_def[key]``."""
    if alt:
        try:
            return obj_type_def[f'{key}_{alt}'], True
        except KeyError:
            pass
    # A missing default key is a configuration error; let the KeyError surface.
    return obj_type_def[key], False


def _expand_json_fields_for_export(resp_data_li):
    """
    Flatten ``*_json`` fields of each record into ``<prefix>__<key>`` columns.

    - ``*li_json`` field holding a list: one ``<prefix>__<n>`` column per item
      (1-based); the original field is kept so the whole list is also
      exported in a single column.
    - ``*li_json`` / ``*_json`` field holding a dict: one ``<prefix>__<key>``
      column per key; the original field is removed.
    - ``*_json`` field with a falsy value: the field is removed.
    - Any other value type is left untouched (it cannot be expanded).

    Returns ``(expanded_record_li, additional_column_name_li)``. The column
    list may contain duplicates (one entry per record that produced it).
    The input records are not modified.
    """
    expanded_record_li = []
    additional_column_name_li = []

    for record in resp_data_li:
        new_record = record.copy()
        for field_name, field_value in record.items():
            # Every name ending in 'li_json' also ends in '_json'.
            if not field_name.endswith('_json'):
                continue

            if not field_value:
                log.info(f'Found a field that ends with li_json or _json but no value: {field_name}')
                new_record.pop(field_name)  # Remove the original field
                continue

            if field_name.endswith('li_json'):
                log.info(f'Found a field that ends with li_json: {field_name}: {field_value}')
                # Strips 8 characters, i.e. assumes the real suffix is '_li_json'.
                prefix = field_name[:-8]
                if isinstance(field_value, list):
                    log.info(f'Field value is a list (int key): {field_value}')
                    for position, value in enumerate(field_value, start=1):
                        new_field_name = f'{prefix}__{position}'
                        new_record[new_field_name] = value
                        additional_column_name_li.append(new_field_name)
                    # The original list field is deliberately kept in this case.
                    continue
            else:
                log.info(f'Found a field that ends with _json: {field_name}')
                prefix = field_name[:-5]

            if not isinstance(field_value, dict):
                # Previously this crashed on .items(); export the raw value instead.
                log.warning(f'Field {field_name} is not a dict or list and can not be expanded. Leaving it as is.')
                continue

            for key, value in field_value.items():
                new_field_name = f'{prefix}__{key}'
                new_record[new_field_name] = value
                additional_column_name_li.append(new_field_name)
            new_record.pop(field_name)  # Remove the original field

        expanded_record_li.append(new_record)

    return expanded_record_li, additional_column_name_li


def handle_get_obj_li(
    obj_type_l1: str,
    obj_type_l2: Optional[str] = None,
    obj_type_l3: Optional[str] = None,
    for_obj_type: Optional[str] = None,
    for_obj_id: Optional[str] = None,
    tbl_alt: Optional[str] = 'default',
    mdl_alt: Optional[str] = 'default',
    exp_alt: Optional[str] = 'default',
    hidden: str = 'not_hidden',
    order_by_li: Optional[str] = None,
    jp: Optional[Union[str, None]] = None,
    file_type: str = 'CSV',
    return_file: Optional[bool] = False,
    commons: Common_Route_Params = Depends(common_route_params),
):
    """
    Generic list handler shared by the ``.../list`` routes.

    Parameters:
    - obj_type_l1 / obj_type_l2 / obj_type_l3: joined with ``_`` to form the
      key looked up in ``obj_type_kv_li`` (l3 is only used when l2 is given).
    - for_obj_type / for_obj_id: when both are given, the random ID is
      resolved with ``redis_lookup_id_random`` and the select is filtered on
      the ``<for_obj_type>_id`` field.
    - tbl_alt / mdl_alt / exp_alt: suffixes for the ``tbl_*`` / ``mdl_*`` /
      ``exp_*`` entries of the object type definition; the plain ``tbl`` /
      ``mdl`` / ``exp`` entry is the fallback.
    - hidden: passed through to ``sql_select`` (hidden, not_hidden, all).
    - order_by_li: JSON string; decoded and passed through to ``sql_select``.
    - jp: URL-quoted JSON object. Recognized keys: ``qry``, ``ft_qry``,
      ``and_qry``, ``and_like``, ``or_like``, ``and_in_li``. Example:
      ``{"qry": [{"type": "AND", "field": "enable", "operator": "=", "value": true}]}``
    - file_type / return_file: when ``return_file`` is true the result is
      written with ``create_export_file`` as 'CSV' or 'Excel' and returned as
      a ``FileResponse``.
    - commons: supplies ``response``, ``enabled``, ``limit``, ``offset``,
      ``by_alias`` and ``exclude_unset``.

    Returns a ``mk_resp(...)`` result, or a ``FileResponse`` for exports.
    Status codes passed to ``mk_resp``: 400 for an unknown object type, a
    malformed ``jp`` / ``order_by_li`` or an unsupported ``file_type``; 404
    when the select returns nothing; 501 when the select result is not a
    list; 500 when the export file could not be produced.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # `import urllib` alone does not guarantee the `parse` submodule is loaded.
    import urllib.parse

    # ### SECTION ### Decode the optional JSON inputs
    jp_obj = None
    if jp:
        jp_unquoted = urllib.parse.unquote(jp)
        log.debug(jp_unquoted)
        try:
            jp_obj = json.loads(jp_unquoted)
        except (ValueError, RecursionError) as e:
            log.warning(e)
            return mk_resp(data=False, status_code=400, response=commons.response, status_message='The JSON string was not formatted correctly.')
        if not isinstance(jp_obj, dict):
            # Valid JSON but not an object (list, string, null...): .get() below would crash.
            log.warning(f'jp was valid JSON but not an object: {type(jp_obj).__name__}')
            return mk_resp(data=False, status_code=400, response=commons.response, status_message='The JSON string was not formatted correctly.')
        log.info(jp_obj)

    jp_lookup = jp_obj or {}
    # Empty/missing entries are normalized to None, as before.
    qry_dict_li = jp_lookup.get('qry') or None  # List of specific additional WHERE clause parts
    fulltext_qry_dict_obj = jp_lookup.get('ft_qry') or None  # Fields/values for the FULLTEXT query
    and_qry_dict_obj = jp_lookup.get('and_qry') or None  # Additional AND clauses
    and_like_dict_obj = jp_lookup.get('and_like') or None  # Additional AND LIKE clauses
    or_like_dict_obj = jp_lookup.get('or_like') or None  # Additional OR LIKE clauses
    and_in_dict_li_obj = jp_lookup.get('and_in_li') or None  # Additional AND IN clauses
    # NOTE(review): these values come straight from the request and are handed
    # to sql_select. Confirm sql_select validates field names and operators.

    if order_by_li:
        try:
            order_by_li = json.loads(order_by_li)
        except (ValueError, RecursionError) as e:
            log.warning(e)
            return mk_resp(data=False, status_code=400, response=commons.response, status_message='The order_by_li JSON string was not formatted correctly.')

    log.debug({
        'obj_type_l1': obj_type_l1,
        'obj_type_l2': obj_type_l2,
        'obj_type_l3': obj_type_l3,
        'for_obj_type': for_obj_type,
        'for_obj_id': for_obj_id,
        'tbl_alt': tbl_alt,
        'mdl_alt': mdl_alt,
        'exp_alt': exp_alt,
        'jp_obj': jp_obj,
        'hidden': hidden,
        'order_by_li': order_by_li,
    })

    # ### SECTION ### Resolve and validate the object type
    if not obj_type_l1:
        log.warning('We should not be here')
        return mk_resp(data=False, status_code=400, response=commons.response)
    name_part_li = [obj_type_l1]
    if obj_type_l2:
        name_part_li.append(obj_type_l2)
        if obj_type_l3:  # l3 is only meaningful when l2 is present
            name_part_li.append(obj_type_l3)
    obj_name = '_'.join(name_part_li)
    if obj_name not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=commons.response)

    obj_type_def = obj_type_kv_li[obj_name]
    log.info(f'obj_name: {obj_name}')
    log.debug(obj_type_def)

    # Reject unsupported export types before doing any database or file work.
    export_ext_by_file_type = {'CSV': '.csv', 'Excel': '.xlsx'}
    if return_file and file_type not in export_ext_by_file_type:
        log.warning(f'Unsupported export file_type: {file_type}')
        return mk_resp(data=False, status_code=400, response=commons.response, status_message='The file_type must be CSV or Excel.')

    # ### SECTION ### Resolve the table (view), model and export column list
    table_name, tbl_alt_found = _lookup_obj_type_alt(obj_type_def, 'tbl', tbl_alt)
    if tbl_alt_found:
        log.info(f'tbl_alt ({tbl_alt}) was found. Using {table_name} table.')
    else:
        log.warning(f'tbl_alt ({tbl_alt}) was not found. Using {table_name} table.')

    base_name, mdl_alt_found = _lookup_obj_type_alt(obj_type_def, 'mdl', mdl_alt)
    if mdl_alt_found:
        log.info(f'mdl_alt ({mdl_alt}) was found. Using {base_name} model.')
    else:
        log.warning(f'mdl_alt ({mdl_alt}) was not found. Using {base_name} model.')

    # None means "derive the columns from the first SQL record" further down.
    column_name_li = None
    if return_file:
        if exp_alt:
            column_name_li = obj_type_def.get(f'exp_{exp_alt}')
        if not column_name_li:
            column_name_li = obj_type_def.get('exp')
        log.info(f'exp_alt ({exp_alt}): using {column_name_li} column list.')

    # ### SECTION ### Run the select
    # NOTE: The enabled and hidden parameters are new to this endpoint and the sql_select function! -2023-07-06
    select_kwargs = {
        'table_name': table_name,
        'enabled': commons.enabled,
        'hidden': hidden,
        'qry_dict_li': qry_dict_li,
        'fulltext_qry_dict': fulltext_qry_dict_obj,
        'and_qry_dict': and_qry_dict_obj,
        'and_like_dict': and_like_dict_obj,
        'or_like_dict': or_like_dict_obj,
        'and_in_dict_li': and_in_dict_li_obj,
        'order_by_li': order_by_li,
        'limit': commons.limit,
        'offset': commons.offset,
        'as_list': True,
    }
    if for_obj_type and for_obj_id:
        # NOTE(review): for_obj_type is request input used as a table name and
        # to build a field name. Confirm the callee validates it. Also, the
        # result of the lookup is not checked here; patch_obj treats a falsy
        # result as "not found" - confirm what sql_select does with it.
        for_obj_id = redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type)
        log.debug(f'for_obj_type: {for_obj_type}')
        log.debug(f'for_obj_id: {for_obj_id}')
        select_kwargs['field_name'] = f'{for_obj_type}_id'
        select_kwargs['field_value'] = for_obj_id
    # NOTE: The call without field_name/field_value may need more testing.
    sql_result = sql_select(**select_kwargs)
    log.debug(sql_result)

    if not sql_result:
        return mk_resp(data=None, response=commons.response, status_code=404)
    if not isinstance(sql_result, list):
        status_message = 'Not Implemented (sort of). Attempted to process this request. Got a SQL result, but the returned data was unexpected.'
        return mk_resp(data=sql_result, response=commons.response, status_code=501, status_message=status_message)  # Returns "Not Implemented" (sort of... unexpected response)

    # ### SECTION ### Shape the records with the model (if any)
    if base_name:
        log.info(f'base_name was found. Returning data using {base_name} model.')
        resp_data_li = [
            base_name(**record).dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset)
            for record in sql_result
        ]
    else:
        log.warning(f'base_name model "{base_name}" was not found. Returning raw data.')
        resp_data_li = list(sql_result)

    if not return_file:
        return mk_resp(data=resp_data_li, response=commons.response)

    # ### SECTION ### File export
    # NOTE: This raises the shared logger level and does not restore it (unchanged behavior).
    log.setLevel(logging.INFO)
    if not column_name_li:
        column_name_li = list(sql_result[0].keys())  # This should be the same for all records in the list.
    log.info(f'Column names: {column_name_li}')

    new_resp_data_li, additional_column_name_li = _expand_json_fields_for_export(resp_data_li)
    additional_column_name_li = sorted(set(additional_column_name_li))
    if additional_column_name_li:
        # Build a new list so a column list stored in obj_type_kv_li is never mutated.
        column_name_li = list(column_name_li) + additional_column_name_li
    log.info(f'Column names: {column_name_li}')
    log.info(f'Additional column names: {additional_column_name_li}')

    # UTC timestamp; same text as the deprecated datetime.utcnow() produced.
    current_datetime_utc = datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d_%H%M')
    filename = f'{obj_name}_list_{current_datetime_utc}'
    filename_w_ext = filename + export_ext_by_file_type[file_type]

    tmp_file_path = create_export_file(data_dict_list=new_resp_data_li, column_name_li=column_name_li, subdir_path=obj_name, filename=filename, rm_id=True, export_type=file_type)
    if not tmp_file_path:
        log.error('Something went wrong while creating or saving the export file')
        return mk_resp(data=False, status_code=500, response=commons.response, status_message='The export file could not be created.')
    log.info(f'Export file created and saved: {tmp_file_path}')
    log.info(f'Filename: {filename_w_ext}')

    full_tmp_path = return_full_tmp_path(full_tmp_path=tmp_file_path)
    if not full_tmp_path:
        # Previously this fell off the end of the function and returned None.
        log.error(f'The export file path could not be resolved: {tmp_file_path}')
        return mk_resp(data=False, status_code=500, response=commons.response, status_message='The export file could not be found.')
    return FileResponse(path=full_tmp_path, filename=filename_w_ext)
# Updated 2024-08-14
@router.get('/{obj_type_l1}/{obj_id}')
@router.get('/{obj_type_l1}/{obj_type_l2}/{obj_id}')
@router.get('/{obj_type_l1}/{obj_type_l2}/{obj_type_l3}/{obj_id}')
async def get_obj(
    obj_type_l1: str=None,
    obj_type_l2: str=None,
    obj_type_l3: str=None,
    obj_id: str=None,
    tbl_alt: Optional[str] = 'default',
    mdl_alt: Optional[str] = 'default',
    commons: Common_Route_Params = Depends(common_route_params),
):
    """
    Simple select object type with an ID:
    - **obj_type_l1, obj_type_l2, obj_type_l3**:
        - Examples:
            - /account = account
            - /user = user
            - /user/role = user_role
            - /event = event
            - /event/exhibit = event_exhibit
            - /order = order
            - /order/cart = order_cart
            - /order/cart/line = order_cart_line
            - /lu/some_lookup = lu_some_lookup
    - **obj_id**: passed to the select as the record's random ID.
    - **tbl_alt, mdl_alt**: suffixes selecting an alternate table (view) or
      model entry for the object type; the default entry is used when the
      alternate does not exist.

    Responds with 400 for an unknown object type and 404 when no record is
    found.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    log.debug({
        'obj_type_l1': obj_type_l1,
        'obj_type_l2': obj_type_l2,
        'obj_type_l3': obj_type_l3,
        'obj_id': obj_id,
        'tbl_alt': tbl_alt,
        'mdl_alt': mdl_alt,
    })

    # ### SECTION ### Resolve and validate the object type
    if not obj_type_l1:
        log.warning('We should not be here')
        return mk_resp(data=False, status_code=400, response=commons.response)
    name_part_li = [obj_type_l1]
    if obj_type_l2:
        name_part_li.append(obj_type_l2)
        if obj_type_l3:  # l3 is only meaningful when l2 is present
            name_part_li.append(obj_type_l3)
    obj_name = '_'.join(name_part_li)
    if obj_name not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=commons.response)
    obj_type_def = obj_type_kv_li[obj_name]

    # ### SECTION ### Resolve the table (view) name
    # Only a missing alternate key triggers the fallback; anything else is a
    # real error and must not be swallowed (the old bare `except:` hid it).
    if tbl_alt:
        try:
            table_name = obj_type_def[f'tbl_{tbl_alt}']
        except KeyError:
            table_name = obj_type_def['tbl']
            log.warning(f'tbl_alt ({tbl_alt}) was not found. Using {table_name} table.')
        else:
            log.info(f'tbl_alt ({tbl_alt}) was found. Using {table_name} table.')
    else:
        table_name = obj_type_def['tbl']
        log.warning(f'tbl_alt ({tbl_alt}) was not found. Using {table_name} table.')

    # ### SECTION ### Resolve the model
    if mdl_alt:
        try:
            base_name = obj_type_def[f'mdl_{mdl_alt}']
        except KeyError:
            base_name = obj_type_def['mdl']
            log.warning(f'mdl_alt ({mdl_alt}) was not found. Using {base_name} model.')
        else:
            log.info(f'mdl_alt ({mdl_alt}) was found. Using {base_name} model.')
    else:
        base_name = obj_type_def['mdl']
        log.warning(f'mdl_alt ({mdl_alt}) was not found. Using {base_name} model.')

    # ### SECTION ### Select and shape the record
    # NOTE: Add a check for the object ID... assuming it is a random ID string for now.
    sql_result = sql_select(table_name=table_name, record_id_random=obj_id)
    log.debug(sql_result)
    if not sql_result:
        return mk_resp(data=False, status_code=404, response=commons.response)

    if base_name:
        log.info(f'base_name was found. Returning data using {base_name} model.')
        resp_data = base_name(**sql_result).dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset)
    else:
        log.info('base_name model was not found. Returning raw data.')
        resp_data = sql_result
    return mk_resp(data=resp_data, response=commons.response)
# ### BEGIN ### API CRUD ### patch_obj() ###
# Updated 2024-03-08
@router.patch('/{obj_type_l1}/{obj_id}')
@router.patch('/{obj_type_l1}/{obj_type_l2}/{obj_id}')
@router.patch('/{obj_type_l1}/{obj_type_l2}/{obj_type_l3}/{obj_id}')
async def patch_obj(
crud: Api_Crud_Base,
obj_type_l1: str = Path(min_length=2, max_length=50),
obj_id: str = Path(min_length=11, max_length=22),
obj_type_l2: str = None,
obj_type_l3: str = None,
run_safety_check: bool = True,
# for_obj_type: Optional[str] = Query(None, max_length=50),
# for_obj_id: Optional[str] = Query(None, max_length=22),
# The view name will be prefixed with "v_" and must be a valid view name based on the object type name from the URL. obj_type_l1, obj_type_l2, obj_type_l3 combined below as obj_name
return_obj: Optional[bool] = True, # I am not sure how to make this work yet. -2024-03-08
obj_v_name: Optional[str] = None, # Use view name to help return the object type. -2024-03-08
commons: Common_Route_Params = Depends(common_route_params),
):
"""
Simple patch object type with an ID:
- **obj_type_l1, obj_type_l2, obj_type_l3**:
- Examples:
- /account = account
- /user = user
- /user/role = user_role
- /event = event
- /event/exhibit = event_exhibit
- /order = order
- /order/cart = order_cart
- /order/cart/line = order_cart_line
- /lu/some_lookup = lu_some_lookup
"""
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if crud.super_key == 'zp5PtX4zUsI': pass
elif crud.jwt:
# pass
log.warning('JWT was passed')
return mk_resp(data=False, status_code=501, response=commons.response, status_message='Token access for the API CRUD has not been implemented yet.')
else:
log.warning('Access key is missing or incorrect')
return mk_resp(data=False, status_code=400, response=commons.response)
# NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING
# time.sleep(2.5) # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING
# NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING
debug_data = {}
debug_data['crud'] = crud
debug_data['create_key'] = crud.create_key
debug_data['read_key'] = crud.read_key
debug_data['update_key'] = crud.update_key
debug_data['delete_key'] = crud.delete_key
debug_data['data_list'] = crud.data_list
debug_data['obj_type_l1'] = obj_type_l1
debug_data['obj_type_l2'] = obj_type_l2
debug_data['obj_type_l3'] = obj_type_l3
debug_data['obj_id'] = obj_id
log.debug(debug_data)
if obj_type_l1 and obj_type_l2 and obj_type_l3:
obj_name = f'{obj_type_l1}_{obj_type_l2}_{obj_type_l3}'
if obj_name in obj_type_kv_li:
#table_name = obj_type_kv_li[obj_name]
#table_name = obj_type_kv_li[obj_name]['tbl_name_update']
pass
else:
return mk_resp(data=False, status_code=400, response=commons.response)
elif obj_type_l1 and obj_type_l2:
obj_name = f'{obj_type_l1}_{obj_type_l2}'
if obj_name in obj_type_kv_li:
#table_name = obj_type_kv_li[obj_name]['tbl_name_update']
pass
else:
return mk_resp(data=False, status_code=400, response=commons.response)
elif obj_type_l1:
obj_name = f'{obj_type_l1}'
if obj_name in obj_type_kv_li:
#table_name = obj_type_kv_li[obj_name]['tbl_name_update']
pass
else:
return mk_resp(data=False, status_code=400, response=commons.response)
else:
log.warning('We should not be here')
return mk_resp(data=False, status_code=400, response=commons.response)
table_name = obj_type_kv_li[obj_name].get('tbl_name_update')
exclude = obj_type_kv_li[obj_name].get('exclude_for_db')
# ### SECTION ### Secondary data validation
obj_id_random = obj_id # This might need to be used later for the response data
if obj_id := redis_lookup_id_random(record_id_random=obj_id, table_name=table_name): pass
else: return mk_resp(data=None, status_code=404, response=commons.response, status_message='The object ID was invalid or not found.')
# NOTE: Doing a quick sanity check based on the object models and then dump to a dict to get rid of invalid fields. The other option is to just use the crud.data_list raw.
crud_data = crud.data_list
log.debug(crud_data.keys())
field_list = crud_data.keys()
if run_safety_check:
log.info('Running safety check by default')
base_name = obj_type_kv_li[obj_name]['base_name']
try:
obj_model = base_name(**crud.data_list)
except Exception as e:
log.error('An unknown exception happened. Returning False.')
log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
log.warning(json.dumps(crud.data_list, indent=4))
return mk_resp(data=False, status_code=400, response=commons.response, status_message='There was likely a validation error. Returned False.')
else:
log.info('Successfully updated the object model.')
pass
log.debug(obj_model)
obj_dict = obj_model.dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset, include=field_list)
log.debug(obj_dict)
crud_data = obj_dict
else:
log.warning('The default safety check was not run!')
obj_dict = crud_data
# NOTE: Add a check for the object ID... assuming it is a random ID string for now. Using rm_id_random. That helps with some field names.
if sql_result := sql_update(data=crud_data, table_name=table_name, record_id=obj_id, rm_id_random=True, log_lvl=logging.DEBUG):
log.info('The record was updated.')
log.debug(sql_result)
resp_data = {}
resp_data['table_name'] = table_name
resp_data['request_data'] = obj_dict
resp_data['obj_id'] = obj_id
resp_data['obj_id_random'] = obj_id_random
# Comment out the following line if you do not want to allow for returning the updated data.
# return mk_resp(data=resp_data, response=commons.response) #, details=debug_data)
elif sql_result == None:
log.info('The record was probably not found to be updated.')
log.debug(sql_result)
return mk_resp(data=None, status_code=404, status_message='The record was probably not found to be updated.', response=commons.response)
else:
log.info('Something unexpected happened while trying to run the SQL UPDATE. The fields or field values passed may not be valid for the table and or model.')
log.debug(sql_result)
return mk_resp(data=False, status_code=400, status_message='Something unexpected happened while trying to runt he SQL UPDATE. The fields or field values passed may not be valid for the table and or model.', response=commons.response)
# ### SECTION ### Return successful results
# NOTE: obj_id was found and verified above
# New: 2024-03-08
if return_obj:
if obj_v_name:
# We should add a sanity check here to make sure the view name is valid first.
table_name = f'v_{obj_v_name}'
# Remove an extra "v_" if it is there.
if table_name.startswith('v_v_'):
table_name = table_name[2:]
# We can do more later... like check against an allowed list of view names per Aether object type
else:
# Use the table_name as a backup option? This should be safe since it is also passed through the model.
table_name = obj_type_kv_li[obj_name]['table_name']
base_name = obj_type_kv_li[obj_name]['base_name']
if sql_select_result := sql_select(table_name=table_name, record_id=obj_id):
log.debug(sql_select_result)
log.debug(base_name)
resp_data = base_name(**sql_select_result).dict(by_alias=True, exclude_unset=commons.exclude_unset)
log.debug(resp_data)
log.info(f'Returning object model {base_name} from table_name {table_name}; obj_id; obj_id_random={obj_id_random}) using PATCH data')
return mk_resp(data=resp_data, response=commons.response)
else:
log.debug(sql_select_result)
status_message = f'The record was not found in table_name={table_name} used for the SQL SELECT query. This may be a SQL VIEW and may be because the SQL VIEW needs to be created or updated.'
return mk_resp(data=False, status_code=404, response=commons.response, status_message=status_message)
log.info(f'Returning IDs (obj_id, obj_id_random={obj_id_random}) using PATCH data')
return mk_resp(data=resp_data, response=commons.response) #, details=debug_data)
# ### END ### API CRUD ### patch_obj() ###
# ### BEGIN ### API CRUD ### post_obj() ###
# Updated 2024-03-08
@router.post('/{obj_type_l1}')
@router.post('/{obj_type_l1}/{obj_type_l2}')
@router.post('/{obj_type_l1}/{obj_type_l2}/{obj_type_l3}')
async def post_obj(
        crud: Api_Crud_Base,
        obj_type_l1: Optional[str] = Path(..., max_length=50),
        obj_type_l2: str = None,
        obj_type_l3: str = None,
        # obj_id: str = Path(min_length=11, max_length=22),
        run_safety_check: bool = True,
        # The view name will be prefixed with "v_" and must be a valid view name based on the object type name from the URL. obj_type_l1, obj_type_l2, obj_type_l3 combined below as obj_name
        # for_obj_type: Optional[str] = Query(None, max_length=50),
        # for_obj_id: Optional[str] = Query(None, max_length=22),
        return_obj: Optional[bool] = True,
        obj_v_name: Optional[str] = None,  # Use view name to help return the object type. -2024-03-08
        commons: Common_Route_Params = Depends(common_route_params),
    ):
    """
    Simple post object type:
    - **obj_type_l1, obj_type_l2, obj_type_l3**:
        - Examples:
            - /account = account
            - /user = user
            - /user/role = user_role
            - /event = event
            - /event/exhibit = event_exhibit
            - /order = order
            - /order/cart = order_cart
            - /order/cart/line = order_cart_line
            - /lu/some_lookup = lu_some_lookup
    """
    # Maintainer notes (kept out of the docstring because FastAPI publishes it):
    # - The URL segments are joined with "_" to form obj_name, which must be a
    #   key of obj_type_kv_li.
    # - run_safety_check=True round-trips crud.data_list through the object
    #   type's model before the INSERT; False inserts the raw payload.
    # - return_obj=True re-reads the new row (from the view/table resolved
    #   below) and returns it through the model; False returns only the IDs
    #   and the request data.
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # ### SECTION ### Access check
    # NOTE(review): the super key is hard-coded in source. It should be moved
    # to configuration and rotated; left unchanged here to keep behavior.
    if crud.super_key == 'zp5PtX4zUsI':
        pass
    elif crud.jwt:
        log.warning('JWT was passed')
        return mk_resp(data=False, status_code=501, response=commons.response, status_message='Token access for the API CRUD has not been implemented yet.')
    else:
        log.warning('Access key is missing or incorrect')
        return mk_resp(data=False, status_code=400, response=commons.response)

    # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING
    # time.sleep(1.5)  # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING
    # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING

    debug_data = {
        'crud': crud,
        'create_key': crud.create_key,
        'read_key': crud.read_key,
        'update_key': crud.update_key,
        'delete_key': crud.delete_key,
        'data_list': crud.data_list,
        'obj_type_l1': obj_type_l1,
        'obj_type_l2': obj_type_l2,
        'obj_type_l3': obj_type_l3,
    }
    log.debug(debug_data)

    # ### SECTION ### Resolve the object type name from the URL segments
    # A level-3 segment is only honoured when level 2 is also present.
    if not obj_type_l1:
        log.warning('We should not be here')
        return mk_resp(data=False, status_code=400, response=commons.response)
    if obj_type_l2 and obj_type_l3:
        obj_name = f'{obj_type_l1}_{obj_type_l2}_{obj_type_l3}'
    elif obj_type_l2:
        obj_name = f'{obj_type_l1}_{obj_type_l2}'
    else:
        obj_name = f'{obj_type_l1}'
    if obj_name not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=commons.response)
    obj_type_cfg = obj_type_kv_li[obj_name]

    # ### SECTION ### Figure out the table_name to use
    # Updated: 2024-03-08
    view_name = None  # This is the view name to use for the SQL SELECT query
    if obj_v_name:
        # We should add a sanity check here to make sure the view name is valid first.
        view_name = f'v_{obj_v_name}'
        # Remove an extra "v_" if it is there.
        if view_name.startswith('v_v_'):
            view_name = view_name[2:]
        # We can do more later... like check against an allowed list of view names per Aether object type
    elif obj_type_cfg.get('table_name'):
        # Use the table_name as a backup option? This should be safe.
        view_name = obj_type_cfg.get('table_name')
    elif obj_type_cfg.get('table_name_alt'):
        view_name = obj_type_cfg.get('table_name_alt')
    elif obj_type_cfg.get('tbl_name_update'):
        view_name = obj_type_cfg.get('tbl_name_update')
    else:
        log.error('The table_name was not found. This is a critical error. Returning False.')
        return mk_resp(data=False, status_code=400, response=commons.response, status_message='The table_name was not found. This is a critical error.')

    table_name = None  # This is the table name to use for the SQL INSERT or UPDATE
    if obj_type_cfg.get('tbl_name_update'):
        table_name = obj_type_cfg.get('tbl_name_update')
    elif obj_type_cfg.get('table_name'):
        table_name = obj_type_cfg.get('table_name')
    if table_name is None:
        # Reachable when obj_v_name is supplied but the object type has no
        # writable table configured; never hand None to sql_insert().
        log.error('The table_name was not found. This is a critical error. Returning False.')
        return mk_resp(data=False, status_code=400, response=commons.response, status_message='The table_name was not found. This is a critical error.')

    # This should be a view in most cases.
    table_name_select = view_name
    # NOTE(review): 'exclude_for_db' is read but not applied anywhere in this endpoint.
    exclude = obj_type_cfg.get('exclude_for_db')

    # ### SECTION ### Secondary data validation
    # NOTE: Doing a quick sanity check based on the object models and then dump to a dict to get rid of invalid fields. The other option is to just use the crud.data_list raw.
    crud_data = crud.data_list
    log.debug(crud_data.keys())
    field_list = crud_data.keys()
    if run_safety_check:
        log.info('Running safety check by default')
        base_name = obj_type_cfg['base_name']
        try:
            obj_model = base_name(**crud.data_list)
        except Exception:
            log.error('An unknown exception happened. Returning False.')
            log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
            log.warning(json.dumps(crud.data_list, indent=4))
            return mk_resp(data=False, status_code=400, response=commons.response, status_message='There was likely a validation error. Returned False.')
        log.info('Successfully created the object model.')
        log.debug(obj_model)
        # Only the fields the client actually sent are written.
        obj_dict = obj_model.dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset, include=field_list)
        log.debug(obj_dict)
        crud_data = obj_dict
    else:
        log.warning('The default safety check was not run!')
        obj_dict = crud_data

    # ### SECTION ### SQL INSERT
    # NOTE: Add a check for the object ID... assuming it is a random ID string for now. Using rm_id_random. That helps with some field names.
    sql_result = sql_insert(data=crud_data, table_name=table_name, rm_id_random=True, log_lvl=logging.INFO)
    if not sql_result:
        if sql_result is None:
            log.info('The record was probably not found to be updated.')
            log.debug(sql_result)
            return mk_resp(data=None, status_code=404, status_message='The record was probably not found to be updated.', response=commons.response)
        log.info('Something unexpected happened while trying to runt he SQL UPDATE. The fields or field values passed may not be valid for the table and or model.')
        log.debug(sql_result)
        return mk_resp(data=False, status_code=400, status_message='Something unexpected happened while trying to run the SQL UPDATE. The fields or field values passed may not be valid for the table and or model.', response=commons.response)

    log.setLevel(logging.INFO)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.info('The record was inserted.')
    log.debug(sql_result)
    obj_id = sql_result  # The ID should be returned
    obj_id_random = get_id_random(record_id=obj_id, table_name=table_name)
    resp_data = {
        'table_name': table_name,
        'request_data': obj_dict,
        'obj_id': obj_id,
        'obj_id_random': obj_id_random,
    }

    # ### SECTION ### Return successful results
    # Updated: 2024-03-08
    if return_obj:
        log.info('Returning object created from POST data')
        # This is the more or less the same as what is done in the patch_obj() endpoint.
        if sql_select_result := sql_select(table_name=table_name_select, record_id=obj_id):
            # Looked up here (not only in the safety-check branch) so that
            # run_safety_check=False no longer hits an unbound base_name.
            base_name = obj_type_cfg['base_name']
            log.debug(sql_select_result)
            log.debug(base_name)
            resp_data = base_name(**sql_select_result).dict(by_alias=True, exclude_unset=commons.exclude_unset)
            log.debug(resp_data)
            log.info(f'Returning object model ({base_name}; obj_id; obj_id_random={obj_id_random}) from POST data')
            return mk_resp(data=resp_data, response=commons.response)
        log.debug(sql_select_result)
        status_message = 'The record was not found. This may be because a SQL VIEW was used for the SQL SELECT query?'
        return mk_resp(data=False, status_code=404, response=commons.response, status_message=status_message)

    log.info('Returning IDs only created from POST data')
    return mk_resp(data=resp_data, response=commons.response)  # , details=debug_data)
# ### END ### API CRUD ### post_obj() ###
# ### BEGIN ### API CRUD ### delete_obj() ###
# Updated 2023-07-10
@router.delete('/{obj_type_l1}/{obj_id}')
@router.delete('/{obj_type_l1}/{obj_type_l2}/{obj_id}')
@router.delete('/{obj_type_l1}/{obj_type_l2}/{obj_type_l3}/{obj_id}')
async def delete_obj(
        obj_type_l1: str=None,
        obj_type_l2: str=None,
        obj_type_l3: str=None,
        obj_id: str=None,
        method: str = 'delete',  # None, delete, disable, hide
        # x_account_id: str = Header(...),
        # response: Response = Response,
        commons: Common_Route_Params = Depends(common_route_params),
    ):
    """
    Simple delete object type with an ID:
    - **obj_type_l1, obj_type_l2, obj_type_l3**:
        - Examples:
            - /account = account
            - /user = user
            - /user/role = user_role
            - /event = event
            - /event/exhibit = event_exhibit
            - /event/person/profile = event_person_profile
            - /order = order
            - /order/line = order_line
            - /lu/some_lookup = lu_some_lookup
    """
    # Maintainer notes (kept out of the docstring because FastAPI publishes it):
    # - method selects the action: 'delete' (or None) removes the row,
    #   'disable' writes enable=False, 'hide' writes hide=True.
    # - NOTE(review): unlike the POST/PATCH endpoints in this file, this
    #   endpoint performs no access-key check of its own. Confirm that access
    #   control is enforced elsewhere (e.g. in common_route_params).
    log.setLevel(logging.INFO)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())
    debug_data = {
        'obj_type_l1': obj_type_l1,
        'obj_type_l2': obj_type_l2,
        'obj_type_l3': obj_type_l3,
        'obj_id': obj_id,
    }
    log.debug(debug_data)

    # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING
    # time.sleep(1.5)  # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING
    # NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING NOTE: WARNING

    # Resolve the object type name from the URL segments. A level-3 segment
    # is only honoured when level 2 is also present.
    if not obj_type_l1:
        log.warning('We should not be here')
        return mk_resp(data=False, status_code=400, response=commons.response)
    if obj_type_l2 and obj_type_l3:
        obj_name = f'{obj_type_l1}_{obj_type_l2}_{obj_type_l3}'
    elif obj_type_l2:
        obj_name = f'{obj_type_l1}_{obj_type_l2}'
    else:
        obj_name = f'{obj_type_l1}'
    if obj_name not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=commons.response)

    table_name = obj_name  # obj_type_kv_li[obj_name]['table_name']  # NOTE: Don't try to use the view!

    # None has always meant "delete"; normalise it up front so the success
    # response below is produced for it too (it used to fall through to the
    # "Unexpected result" 400 even though the row had been deleted).
    if method is None:
        method = 'delete'

    # NOTE: Add a check for the object ID... assuming it is a random ID string for now.
    if method == 'delete':
        sql_result = sql_delete(table_name=table_name, record_id_random=obj_id)
        log.debug(sql_result)
    elif method == 'disable':
        data = {'enable': False}
        sql_result = sql_update(data=data, table_name=table_name, record_id_random=obj_id, rm_id_random=True, log_lvl=logging.INFO)
    elif method == 'hide':
        data = {'hide': True}
        sql_result = sql_update(data=data, table_name=table_name, record_id_random=obj_id, rm_id_random=True, log_lvl=logging.INFO)
    else:
        log.warning('We should not be here')
        return mk_resp(data=False, status_code=400, response=commons.response)

    if sql_result:
        log.info('The record was found and deleted or updated.')
        past_tense = {'delete': 'deleted', 'hide': 'hidden', 'disable': 'disabled'}[method]
        resp_details = f'Object type: {obj_name} Object ID: {obj_id}; {past_tense}'
        return mk_resp(data=True, details=resp_details, response=commons.response)  # , details=debug_data)

    if sql_result is None:
        log.info('The record was probably not found to be deleted and or updated.')
        resp_details = f'Not found: Object type: {obj_name} Object ID: {obj_id}; {method}'
        return mk_resp(data=None, status_code=404, details=resp_details, response=commons.response)  # , details=debug_data)

    log.info('Something unexpected happened while trying to run the SQL DELETE and or UPDATE. The fields or field values passed may not be valid for the table.')
    resp_details = f'Unexpected result: Object type: {obj_name} Object ID: {obj_id}; {method}'
    return mk_resp(data=False, status_code=400, details=resp_details, response=commons.response)  # , details=debug_data)
# ### END ### API CRUD ### delete_obj() ###
def post_obj_template(
        obj_type: str,
        data: dict,
        id_random_length: int = 8,  # Added 2023-04-13; need to move away from this
        return_obj: bool = True,
        by_alias: bool = True,
        include: Optional[list] = [],
        exclude: Optional[list] = [],
        exclude_unset: Optional[bool] = True,
        exclude_none: Optional[bool] = True,
        response: Response = Response,
    ):
    """
    Insert one record for obj_type and return the stored object.

    The row is inserted into the table named obj_type, then re-read from the
    'table_name' configured for that type in obj_type_kv_li and serialised
    through its 'base_name' model.

    Responses: the object on success; data=False with 400 when the insert
    returns a falsy result; data=False with 404 when the re-read finds
    nothing.

    NOTE: return_obj, include, exclude and exclude_none are accepted but not
    used by this function.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # NOTE(review): lookup_id_random_pop() presumably swaps *_id_random keys
    # for their numeric IDs - confirm against app.db_sql.
    insert_payload = lookup_id_random_pop(data)
    select_from = obj_type_kv_li[obj_type]['table_name']
    model_cls = obj_type_kv_li[obj_type]['base_name']

    # IDEA (was sketched here as commented-out code): DESCRIBE the target
    # table and prune payload keys that are not real columns (always keeping
    # id_random), so convenience fields such as account_id do not cause SQL
    # errors. Not implemented.

    new_id = sql_insert(table_name=obj_type, data=insert_payload, id_random_length=id_random_length)
    log.setLevel(logging.INFO)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(new_id)
    if not new_id:
        return mk_resp(data=False, status_code=400, response=response)

    stored_row = sql_select(table_name=select_from, record_id=new_id)
    log.debug(stored_row)
    if not stored_row:
        return mk_resp(data=False, status_code=404, response=response)

    serialised = model_cls(**stored_row).dict(by_alias=by_alias, exclude_unset=exclude_unset)
    return mk_resp(data=serialised, response=response)
def patch_obj_template(
        obj_type: str,
        data: dict,
        obj_id: str,
        return_obj: bool=True,
        by_alias: bool=True,
        include: Optional[list] = [],
        exclude: Optional[list] = [],
        exclude_unset: Optional[bool] = True,
        exclude_none: Optional[bool] = True,
        response: Response = Response,
    ):
    """
    Update one record of obj_type and return the stored object.

    The update runs against the table named obj_type. The row is then
    re-read from the 'table_name' configured for that type in
    obj_type_kv_li and serialised through its 'base_name' model.

    Responses: the object on success; data=False with 400 when the update
    returns a falsy result; data=False with 404 when the re-read finds
    nothing.

    NOTE: the caller's data dict is modified in place (an 'id_random' entry
    is written into it). return_obj, include, exclude and exclude_none are
    accepted but not used by this function.
    """
    log.setLevel(logging.INFO)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # data['id'] = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_type)
    data['id_random'] = obj_id  # NOTE: Adding this in so the id_random is NOT updated
    log.debug(data)

    # NOTE(review): lookup_id_random_pop() presumably resolves 'id_random'
    # into a numeric 'id' inside the dict it is given - confirm. The read of
    # data['id'] further down relies on that key being present afterwards.
    update_payload = lookup_id_random_pop(data)
    select_from = obj_type_kv_li[obj_type]['table_name']
    model_cls = obj_type_kv_li[obj_type]['base_name']

    # No record_id is passed; presumably sql_update() locates the row from
    # the payload itself - TODO confirm.
    update_result = sql_update(table_name=obj_type, data=update_payload)
    log.debug(update_result)
    if not update_result:
        return mk_resp(data=False, status_code=400, response=response)

    obj_id = data['id']
    stored_row = sql_select(table_name=select_from, record_id=obj_id)
    log.debug(stored_row)
    if not stored_row:
        return mk_resp(data=False, status_code=404, response=response)

    serialised = model_cls(**stored_row).dict(by_alias=by_alias, exclude_unset=exclude_unset)
    return mk_resp(data=serialised, response=response)
def get_obj_li_template(
        obj_type: str = Query(None, max_length=50),
        for_obj_type: Optional[str] = Query(None, max_length=50),
        for_obj_id: Optional[Union[int,str]] = None,
        by_alias: Optional[bool] = True,
        include: Optional[list] = [],
        exclude: Optional[list] = [],
        exclude_unset: Optional[bool] = True,
        exclude_none: Optional[bool] = True,
        response: Response = Response,
    ):
    """
    Return a list of obj_type records, optionally filtered by a parent object.

    Records are read from the 'table_name' configured for obj_type in
    obj_type_kv_li and serialised through its 'base_name' model. When both
    for_obj_type and for_obj_id are given, only rows whose
    '<for_obj_type>_id' column equals the parent ID are selected.

    for_obj_id may be a numeric ID (used as is) or a random-ID string, which
    is resolved with redis_lookup_id_random(). If a requested parent ID
    cannot be resolved, an empty list is returned instead of silently
    dropping the filter and returning every row. A falsy SELECT result is
    also returned as an empty list.

    NOTE: include, exclude and exclude_none are accepted but not used by
    this function.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    parent_unresolved = False
    if isinstance(for_obj_id, int):
        pass
    elif isinstance(for_obj_id, str):
        for_obj_id_random = for_obj_id
        for_obj_id = redis_lookup_id_random(record_id_random=for_obj_id_random, table_name=for_obj_type)
        # A filter was asked for but the parent ID is unknown.
        parent_unresolved = bool(for_obj_type and for_obj_id_random and not for_obj_id)

    table_name_select = obj_type_kv_li[obj_type]['table_name']

    if parent_unresolved:
        log.warning('The parent object ID could not be resolved. Returning an empty list instead of all records.')
        return mk_resp(data=[], response=response)

    if for_obj_type and for_obj_id:
        field_name = f'{for_obj_type}_id'
        sql_result = sql_select(table_name=table_name_select, field_name=field_name, field_value=for_obj_id)
    else:
        sql_result = sql_select(table_name=table_name_select)
    log.debug(sql_result)

    base_name = obj_type_kv_li[obj_type]['base_name']
    # "sql_result or []" guards against a None/False result (no rows), which
    # would otherwise raise TypeError when iterated.
    resp_data_li = [
        base_name(**record).dict(by_alias=by_alias, exclude_unset=exclude_unset)
        for record in (sql_result or [])
    ]
    return mk_resp(data=resp_data_li, response=response)
def get_obj_template(
        obj_id: Union[int,str],
        obj_type: str = Query(None, max_length=50),
        by_alias: Optional[bool] = True,
        include: Optional[list] = [],
        exclude: Optional[list] = [],
        exclude_unset: Optional[bool] = True,
        exclude_none: Optional[bool] = True,
        response: Response = Response,
    ):
    """
    Return a single obj_type record by ID.

    obj_id may be a numeric ID (used as is) or a random-ID string, which is
    resolved with redis_lookup_id_random() against the table named obj_type.
    The row is read from the 'table_name' configured for obj_type in
    obj_type_kv_li and serialised through its 'base_name' model.

    Responses: the object on success; data=False with 404 when the ID is
    falsy after resolution or when the SELECT finds nothing.

    NOTE: include, exclude and exclude_none are accepted but not used by
    this function.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # Only string IDs need resolving; integers are already record IDs.
    if not isinstance(obj_id, int) and isinstance(obj_id, str):
        obj_id = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_type)

    select_from = obj_type_kv_li[obj_type]['table_name']
    if not obj_id:
        return mk_resp(data=False, status_code=404, response=response)

    row = sql_select(table_name=select_from, record_id=obj_id)
    log.debug(row)
    if not row:
        return mk_resp(data=False, status_code=404, response=response)

    model_cls = obj_type_kv_li[obj_type]['base_name']
    serialised = model_cls(**row).dict(by_alias=by_alias, exclude_unset=exclude_unset)
    return mk_resp(data=serialised, response=response)
def delete_obj_template(
        obj_type: str = Query(None, max_length=50),
        obj_id: str = Query(None, max_length=22),
        response: Response = Response,
    ):
    """
    Delete one obj_type record identified by its random ID.

    The delete runs against the table named obj_type itself, not the
    'table_name' from obj_type_kv_li, because that entry may be a view (v_*).

    Responses: data=True when sql_delete() returns a truthy result;
    data=False with 404 otherwise.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())
    log.debug({'obj_type': obj_type, 'obj_id': obj_id})

    # NOTE: Add a check for the object ID... assuming it is a random ID string for now.
    delete_result = sql_delete(table_name=obj_type, record_id_random=obj_id)
    log.debug(delete_result)
    if not delete_result:
        return mk_resp(data=False, status_code=404, response=response)
    return mk_resp(data=True, response=response)
# New dynamic API CRUD endpoint
# The POST data should be JSON formatted
# @router.post('/query')
# def query(
# # qry JSON should contain these properties:
# # list: for_obj_type, for_obj_id, tbl_view_name, base_name
# # id: obj_id, tbl_view_name, base_name
# qry: str,
# commons: Common_Route_Params = Depends(common_route_params),
# ):
# log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
# log.debug(locals())
# import urllib