Files
OSIT-AE-API-FastAPI/app/routers/api_crud_v2.py

1181 lines
51 KiB
Python

import datetime, json, time
#from datetime import datetime, time, timedelta
from fastapi import APIRouter, Body, Depends, Header, HTTPException, Query, Response, status
from fastapi.responses import FileResponse
# from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union
from app.lib_general import log, logging, common_route_params, Common_Route_Params
# from app.config import settings
from app.db_sql import sql_insert, sql_update, sql_insert_or_update, sql_select, sql_delete, redis_lookup_id_random, lookup_id_random_pop
from app.ae_obj_types_def import *
router = APIRouter()
# Working on the basic API CRUD - STI 2021-03-08
# Updated 2023-07-06
def _expand_json_fields_for_export(record):
    """
    Return a copy of `record` with every `*_json` field flattened for a file export.

    - A field named `<prefix>_li_json` or `<prefix>_json` is removed from the copy.
    - When its value is a dict, or a list of dicts, every key becomes a new
      `<prefix>__<key>` column. For a list, later items overwrite earlier
      items that share the same key.
    - Falsy values are simply dropped.
    - Values that are neither a dict nor a list are left in place untouched.

    Example: `{'social_json': {'facebook': 'https://www.facebook.com/example'}}`
    becomes `{'social__facebook': 'https://www.facebook.com/example'}`.
    """
    li_suffix = '_li_json'
    plain_suffix = '_json'
    flat_record = dict(record)
    for field_name, field_value in record.items():
        if not field_name.endswith(plain_suffix):
            continue
        if not field_value:
            log.info(f'Found a field that ends with li_json or _json but no value: {field_name}')
            flat_record.pop(field_name, None)
            continue
        # Check the longer suffix first; every "_li_json" name also ends with "_json".
        if field_name.endswith(li_suffix):
            prefix = field_name[:-len(li_suffix)]
        else:
            prefix = field_name[:-len(plain_suffix)]
        if isinstance(field_value, dict):
            item_li = [field_value]
        elif isinstance(field_value, list):
            item_li = field_value
        else:
            log.warning(f'Field {field_name} is neither a dict nor a list. Leaving it as is.')
            continue
        for item in item_li:
            if not isinstance(item, dict):
                continue
            for key, value in item.items():
                flat_record[f'{prefix}__{key}'] = value
        flat_record.pop(field_name, None)  # Remove the original field
    return flat_record


@router.get('/{obj_type_l1}/list')
@router.get('/{obj_type_l1}/{obj_type_l2}/list')
@router.get('/{obj_type_l1}/{obj_type_l2}/{obj_type_l3}/list')
async def get_obj_li(
    obj_type_l1: str = Query(..., min_length=2, max_length=50),
    obj_type_l2: str = Query(None, min_length=2, max_length=50),
    obj_type_l3: str = Query(None, min_length=2, max_length=50),
    for_obj_type: Optional[str] = Query(None, max_length=50),
    for_obj_id: Optional[str] = Query(None, max_length=22),
    tbl_alt: Optional[str] = Query('default', max_length=50),  # Lookup key for the real SQL table or view name to use.
    mdl_alt: Optional[str] = Query('default', max_length=50),  # Lookup key for the real Pydantic model to use.
    hidden: str = 'not_hidden',  # hidden, not_hidden, all
    # JSON formatted string in a key value format. It is not ideal that this is in the header,
    # but this is currently a GET request.
    order_by_li: str = Header(None),
    # URL-encoded JSON object with optional "ft_qry", "and_qry", "and_like" and "and_in_li" keys.
    jp: Optional[Union[str, None]] = None,
    file_type: str = 'CSV',  # CSV, Excel
    return_file: Optional[bool] = False,
    commons: Common_Route_Params = Depends(common_route_params),
):
    """
    Simple select of a list of objects by object type:
    - **obj_type_l1, obj_type_l2, obj_type_l3**: joined with "_" to build the object type name.
        - Examples:
            - /account/list = account
            - /user/role/list = user_role
            - /order/cart/line/list = order_cart_line
    - **for_obj_type, for_obj_id**: when both are given, the random ID is resolved and the
      list is filtered on the `<for_obj_type>_id` field.
    - **tbl_alt, mdl_alt**: select the `tbl_<tbl_alt>` table and `mdl_<mdl_alt>` model entries
      of the object type definition.
    - **jp**: URL-encoded JSON object. The `ft_qry`, `and_qry`, `and_like` and `and_in_li`
      keys are passed on to the SQL select.
    - **order_by_li** (header): JSON formatted string passed on to the SQL select.
    - **return_file, file_type**: when `return_file` is true the list is exported as a
      `CSV` or `Excel` file and returned as a download.

    Responses: 400 for an unknown object type / alt key / file type or malformed JSON,
    404 when nothing was found, 501 when the SQL result was not a list, 500 when the export
    file could not be produced.
    """
    # NOTE: log.setLevel() changes the level of the shared logger, not just for this request.
    log.setLevel(logging.INFO)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # `import urllib` alone does not guarantee that the `urllib.parse` submodule is loaded.
    import urllib.parse

    bad_json_message = 'The JSON string was not formatted correctly.'

    # ### SECTION ### Parse the optional "jp" JSON search parameters
    fulltext_qry_dict_obj = None  # Fields/values to search for using FULLTEXT.
    and_qry_dict_obj = None  # Fields/values to search for using AND.
    and_like_dict_obj = None  # Fields/values to search for using AND LIKE.
    and_in_dict_li_obj = None  # Fields/values to search for using AND IN.
    jp_obj = None
    if jp:
        log.setLevel(logging.INFO)
        jp_decoded = urllib.parse.unquote(jp)
        log.debug(jp_decoded)
        try:
            jp_obj = json.loads(jp_decoded)
        except (ValueError, RecursionError) as e:
            log.warning(e)
            return mk_resp(data=False, status_code=400, response=commons.response, status_message=bad_json_message)
        # Valid JSON that is not an object (list, string, null, ...) has no keys to read.
        if not isinstance(jp_obj, dict):
            log.warning(f'The jp param was valid JSON but not an object: {type(jp_obj).__name__}')
            return mk_resp(data=False, status_code=400, response=commons.response, status_message=bad_json_message)
        log.info(jp_obj)
        fulltext_qry_dict_obj = jp_obj.get('ft_qry') or None
        and_qry_dict_obj = jp_obj.get('and_qry') or None
        and_like_dict_obj = jp_obj.get('and_like') or None
        and_in_dict_li_obj = jp_obj.get('and_in_li') or None
        log.setLevel(logging.WARNING)

    if order_by_li:
        try:
            order_by_li = json.loads(order_by_li)
        except (ValueError, RecursionError) as e:
            log.warning(e)
            return mk_resp(data=False, status_code=400, response=commons.response, status_message='The order_by_li header was not formatted correctly.')

    debug_data = {
        'obj_type_l1': obj_type_l1,
        'obj_type_l2': obj_type_l2,
        'obj_type_l3': obj_type_l3,
        'for_obj_type': for_obj_type,
        'for_obj_id': for_obj_id,
        'tbl_alt': tbl_alt,
        'mdl_alt': mdl_alt,
        'jp_obj': jp_obj,
        'hidden': hidden,
        'order_by_li': order_by_li,
    }
    log.debug(debug_data)

    # ### SECTION ### Build and validate the object type name
    if not obj_type_l1:
        log.warning('We should not be here')
        return mk_resp(data=False, status_code=400, response=commons.response)
    # obj_type_l3 is only used when obj_type_l2 is also present.
    name_part_li = [obj_type_l1]
    if obj_type_l2:
        name_part_li.append(obj_type_l2)
        if obj_type_l3:
            name_part_li.append(obj_type_l3)
    obj_name = '_'.join(name_part_li)
    if obj_name not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=commons.response)
    obj_type_def = obj_type_kv_li[obj_name]
    log.info(f'obj_name: {obj_name}')
    log.debug(obj_type_def)

    # ### SECTION ### Resolve the table and the model
    # An empty value falls back to the documented default key instead of leaving the name unbound.
    table_name = obj_type_def.get(f'tbl_{tbl_alt or "default"}')
    if not table_name:
        log.warning(f'No table was found for tbl_alt={tbl_alt}')
        return mk_resp(data=False, status_code=400, response=commons.response, status_message='The tbl_alt value was not recognized for this object type.')
    log.info(f'tbl_alt was found. Using {table_name} table.')

    mdl_key = f'mdl_{mdl_alt or "default"}'
    # An unknown model key is rejected so that a typo can not silently switch to raw records.
    if mdl_key not in obj_type_def:
        log.warning(f'No model was found for mdl_alt={mdl_alt}')
        return mk_resp(data=False, status_code=400, response=commons.response, status_message='The mdl_alt value was not recognized for this object type.')
    base_name = obj_type_def[mdl_key]
    log.info(f'mdl_alt was found. Using {base_name} model.')

    # Validate the export type before doing any work so a bad value is a 400, not a crash.
    file_ext = None
    if return_file:
        file_ext = {'CSV': '.csv', 'Excel': '.xlsx'}.get(file_type)
        if file_ext is None:
            return mk_resp(data=False, status_code=400, response=commons.response, status_message='The file_type must be CSV or Excel.')

    # ### SECTION ### Run the SQL SELECT
    # NOTE: The enabled and hidden parameters are new to this endpoint and the sql_select function! -2023-07-06
    select_kwargs = {
        'table_name': table_name,
        'enabled': commons.enabled,
        'hidden': hidden,
        'fulltext_qry_dict': fulltext_qry_dict_obj,
        'and_qry_dict': and_qry_dict_obj,
        'and_like_dict': and_like_dict_obj,
        'and_in_dict_li': and_in_dict_li_obj,
        'order_by_li': order_by_li,
        'limit': commons.limit,
        'offset': commons.offset,
        'as_list': True,
    }
    if for_obj_type and for_obj_id:
        for_obj_id = redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type)
        log.debug(f'for_obj_type: {for_obj_type}')
        log.debug(f'for_obj_id: {for_obj_id}')
        # An unresolvable parent ID can not match anything; do not run the query with an empty filter value.
        if not for_obj_id:
            return mk_resp(data=None, response=commons.response, status_code=404)
        select_kwargs['field_name'] = f'{for_obj_type}_id'
        select_kwargs['field_value'] = for_obj_id
    # NOTE: The call without field_name and field_value may need more testing.
    sql_result = sql_select(**select_kwargs)
    log.debug(sql_result)

    if not sql_result:
        return mk_resp(data=None, response=commons.response, status_code=404)
    if not isinstance(sql_result, list):
        status_message = 'Not Implemented (sort of). Attempted to process this request. Got a SQL result, but the returned data was unexpected.'
        return mk_resp(data=sql_result, response=commons.response, status_code=501, status_message=status_message)  # Returns "Not Implemented" (sort of... unexpected response)

    # ### SECTION ### Shape the records with the model (or return them raw)
    if base_name:
        log.info(f'base_name was found. Returning data using {base_name} model.')
        resp_data_li = [
            base_name(**record).dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset)
            for record in sql_result
        ]
    else:
        log.warning('base_name model was not found. Returning raw data.')
        resp_data_li = list(sql_result)

    if not return_file:
        return mk_resp(data=resp_data_li, response=commons.response)

    # ### SECTION ### Export the list to a file and return it
    log.setLevel(logging.DEBUG)
    # Any field with a suffix of _json is expanded into columns prefixed with the original
    # field name minus the suffix, so the export gets flat and meaningful column names.
    export_data_li = [_expand_json_fields_for_export(record) for record in resp_data_li]

    current_datetime_utc = datetime.datetime.now(datetime.timezone.utc).strftime('%Y-%m-%d_%H%M')
    filename = f'{obj_name}_list_{current_datetime_utc}'
    filename_w_ext = filename + file_ext

    log.setLevel(logging.INFO)
    tmp_file_path = create_export_file(data_dict_list=export_data_li, column_name_li=[], subdir_path=obj_name, filename=filename, rm_id=True, export_type=file_type)
    if not tmp_file_path:
        log.error('Something went wrong while creating or saving the export file')
        return mk_resp(data=False, status_code=500, response=commons.response, status_message='The export file could not be created.')
    log.info(f'Export file created and saved: {tmp_file_path}')
    log.info(f'Filename: {filename_w_ext}')

    full_tmp_path = return_full_tmp_path(full_tmp_path=tmp_file_path)
    if not full_tmp_path:
        log.error('The export file was created but its full path could not be resolved')
        return mk_resp(data=False, status_code=500, response=commons.response, status_message='The export file could not be found after it was created.')
    return FileResponse(path=full_tmp_path, filename=filename_w_ext)
# Updated 2023-11-03
@router.get('/{obj_type_l1}/{obj_id}')
@router.get('/{obj_type_l1}/{obj_type_l2}/{obj_id}')
@router.get('/{obj_type_l1}/{obj_type_l2}/{obj_type_l3}/{obj_id}')
async def get_obj(
    obj_type_l1: str=None,
    obj_type_l2: str=None,
    obj_type_l3: str=None,
    obj_id: str=None,
    use_alt_table: bool = False,  # NOTE: This will use table_name_alt if it exists. -2023-12-01
    use_alt_base: bool = False,  # NOTE: This will use base_name_alt if it exists. -2023-12-01
    commons: Common_Route_Params = Depends(common_route_params),
):
    """
    Simple select object type with an ID:
    - **obj_type_l1, obj_type_l2, obj_type_l3**:
        - Examples:
            - /account = account
            - /user = user
            - /user/role = user_role
            - /event = event
            - /event/exhibit = event_exhibit
            - /order = order
            - /order/cart = order_cart
            - /order/cart/line = order_cart_line
            - /lu/some_lookup = lu_some_lookup
    - **obj_id**: looked up as the random ID string of the record.
    - **use_alt_table, use_alt_base**: prefer the `table_name_alt` / `base_name_alt` entries of
      the object type definition; the default entries are used when the alt entry is missing.

    Responses: 400 for an unknown object type (or one without a table), 404 when the record
    was not found. The record is returned through the model when one is defined, raw otherwise.
    """
    # NOTE: log.setLevel() changes the level of the shared logger, not just for this request.
    log.setLevel(logging.INFO)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    debug_data = {
        'obj_type_l1': obj_type_l1,
        'obj_type_l2': obj_type_l2,
        'obj_type_l3': obj_type_l3,
        'obj_id': obj_id,
        'use_alt_table': use_alt_table,
        'use_alt_base': use_alt_base,
    }
    log.debug(debug_data)

    # ### SECTION ### Build and validate the object type name
    if not obj_type_l1:
        log.warning('We should not be here')
        return mk_resp(data=False, status_code=400, response=commons.response)
    # obj_type_l3 is only used when obj_type_l2 is also present.
    name_part_li = [obj_type_l1]
    if obj_type_l2:
        name_part_li.append(obj_type_l2)
        if obj_type_l3:
            name_part_li.append(obj_type_l3)
    obj_name = '_'.join(name_part_li)
    if obj_name not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=commons.response)
    obj_type_def = obj_type_kv_li[obj_name]

    # ### SECTION ### Resolve the table and the model
    # The alt entries are optional; fall back to the default ones instead of raising KeyError.
    table_name = None
    if use_alt_table:
        table_name = obj_type_def.get('table_name_alt')
    if not table_name:
        table_name = obj_type_def.get('table_name')
    if not table_name:
        log.error('The table_name was not found. This is a critical error. Returning False.')
        return mk_resp(data=False, status_code=400, response=commons.response, status_message='The table_name was not found. This is a critical error.')

    base_name = None
    if use_alt_base:
        base_name = obj_type_def.get('base_name_alt')
        log.debug(debug_data)
    if not base_name:
        base_name = obj_type_def.get('base_name')

    # NOTE: Add a check for the object ID... assuming it is a random ID string for now.
    sql_result = sql_select(table_name=table_name, record_id_random=obj_id)
    log.debug(sql_result)
    if not sql_result:
        return mk_resp(data=False, status_code=404, response=commons.response)

    if base_name:
        log.info(f'base_name was found. Returning data using {base_name} model.')
        resp_data = base_name(**sql_result).dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset)
    else:
        log.info('base_name model was not found. Returning raw data.')
        resp_data = sql_result
    return mk_resp(data=resp_data, response=commons.response)
# ### BEGIN ### API CRUD ### patch_obj() ###
# Updated 2024-03-08
@router.patch('/{obj_type_l1}/{obj_id}')
@router.patch('/{obj_type_l1}/{obj_type_l2}/{obj_id}')
@router.patch('/{obj_type_l1}/{obj_type_l2}/{obj_type_l3}/{obj_id}')
async def patch_obj(
    crud: Api_Crud_Base,
    obj_type_l1: Optional[str] = Query(..., max_length=50),
    obj_type_l2: str = None,
    obj_type_l3: str = None,
    obj_id: str = Query(..., min_length=11, max_length=22),
    run_safety_check: bool = True,
    return_obj: Optional[bool] = True,  # Return the updated object instead of only the IDs. -2024-03-08
    # The view name will be prefixed with "v_" (unless it already is) and is used for the
    # SQL SELECT that returns the updated object. -2024-03-08
    obj_v_name: Optional[str] = None,
    commons: Common_Route_Params = Depends(common_route_params),
):
    """
    Simple patch object type with an ID:
    - **obj_type_l1, obj_type_l2, obj_type_l3**:
        - Examples:
            - /account = account
            - /user = user
            - /user/role = user_role
            - /event = event
            - /event/exhibit = event_exhibit
            - /order = order
            - /order/cart = order_cart
            - /order/cart/line = order_cart_line
            - /lu/some_lookup = lu_some_lookup
    - **crud**: request body; `super_key` grants access and `data_list` holds the fields to update.
    - **run_safety_check**: pass `data_list` through the object model before the SQL UPDATE.
    - **return_obj**: return the updated object (selected again after the update) instead of
      only the IDs and the request data.
    - **obj_v_name**: view to select the returned object from. Only letters, digits and
      underscores are accepted.

    Responses: 400 for a missing/incorrect key, unknown object type, invalid view name,
    validation error or failed update; 404 when the object was not found; 501 when a JWT
    is passed.
    """
    # NOTE: log.setLevel() changes the level of the shared logger, not just for this request.
    log.setLevel(logging.INFO)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    import re

    # ### SECTION ### Access check
    # SECURITY: The access key is hard-coded in the source. It should be moved to the
    # configuration / a secret store and rotated.
    if crud.super_key != 'zp5PtX4zUsI':
        if crud.jwt:
            log.warning('JWT was passed')
            return mk_resp(data=False, status_code=501, response=commons.response, status_message='Token access for the API CRUD has not been implemented yet.')
        log.warning('Access key is missing or incorrect')
        return mk_resp(data=False, status_code=400, response=commons.response)

    debug_data = {
        'crud': crud,
        'create_key': crud.create_key,
        'read_key': crud.read_key,
        'update_key': crud.update_key,
        'delete_key': crud.delete_key,
        'data_list': crud.data_list,
        'obj_type_l1': obj_type_l1,
        'obj_type_l2': obj_type_l2,
        'obj_type_l3': obj_type_l3,
        'obj_id': obj_id,
    }
    log.debug(debug_data)

    # ### SECTION ### Build and validate the object type name
    if not obj_type_l1:
        log.warning('We should not be here')
        return mk_resp(data=False, status_code=400, response=commons.response)
    # obj_type_l3 is only used when obj_type_l2 is also present.
    name_part_li = [obj_type_l1]
    if obj_type_l2:
        name_part_li.append(obj_type_l2)
        if obj_type_l3:
            name_part_li.append(obj_type_l3)
    obj_name = '_'.join(name_part_li)
    if obj_name not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=commons.response)
    obj_type_def = obj_type_kv_li[obj_name]

    # ### SECTION ### Figure out the table, view and model to use
    # Same fallback as post_obj(): prefer tbl_name_update, then table_name.
    table_name = obj_type_def.get('tbl_name_update') or obj_type_def.get('table_name')
    if not table_name:
        log.error('The table_name was not found. This is a critical error. Returning False.')
        return mk_resp(data=False, status_code=400, response=commons.response, status_message='The table_name was not found. This is a critical error.')
    base_name = obj_type_def.get('base_name')

    # The view name comes from the client and ends up as a SQL relation name, so it is
    # validated before anything is written. Only needed when the object is returned.
    table_name_select = None
    if return_obj:
        if obj_v_name:
            if not re.fullmatch(r'[A-Za-z0-9_]+', obj_v_name):
                log.warning('The obj_v_name was not a valid view name')
                return mk_resp(data=False, status_code=400, response=commons.response, status_message='The obj_v_name was not a valid view name.')
            # Add the "v_" prefix unless it is already there.
            table_name_select = obj_v_name if obj_v_name.startswith('v_') else f'v_{obj_v_name}'
            # We can do more later... like check against an allowed list of view names per Aether object type
        else:
            # Use the table_name as a backup option. This should be safe since it is also passed through the model.
            table_name_select = obj_type_def.get('table_name') or table_name

    # ### SECTION ### Secondary data validation
    obj_id_random = obj_id  # Kept for the response data
    obj_id = redis_lookup_id_random(record_id_random=obj_id_random, table_name=table_name)
    if not obj_id:
        return mk_resp(data=None, status_code=404, response=commons.response, status_message='The object ID was invalid or not found.')

    # NOTE: Doing a quick sanity check based on the object models and then dump to a dict to get rid of invalid fields. The other option is to just use the crud.data_list raw.
    crud_data = crud.data_list
    log.debug(crud_data.keys())
    if run_safety_check:
        log.info('Running safety check by default')
        try:
            obj_model = base_name(**crud_data)
        except Exception:
            log.error('An unknown exception happened. Returning False.')
            log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
            # default=str keeps the logging itself from failing on non-JSON values.
            log.warning(json.dumps(crud_data, indent=4, default=str))
            return mk_resp(data=False, status_code=400, response=commons.response, status_message='There was likely a validation error. Returned False.')
        log.info('Successfully updated the object model.')
        log.debug(obj_model)
        # Only keep the fields that were actually sent in the request.
        obj_dict = obj_model.dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset, include=set(crud_data))
        log.debug(obj_dict)
        crud_data = obj_dict
    else:
        log.warning('The default safety check was not run!')
        obj_dict = crud_data

    # ### SECTION ### Run the SQL UPDATE
    # NOTE: Add a check for the object ID... assuming it is a random ID string for now. Using rm_id_random. That helps with some field names.
    sql_result = sql_update(data=crud_data, table_name=table_name, record_id=obj_id, rm_id_random=True, log_lvl=logging.DEBUG)
    log.debug(sql_result)
    if sql_result is None:
        log.info('The record was probably not found to be updated.')
        return mk_resp(data=None, status_code=404, status_message='The record was probably not found to be updated.', response=commons.response)
    if not sql_result:
        status_message = 'Something unexpected happened while trying to run the SQL UPDATE. The fields or field values passed may not be valid for the table and or model.'
        log.info(status_message)
        return mk_resp(data=False, status_code=400, status_message=status_message, response=commons.response)
    log.info('The record was updated.')

    # ### SECTION ### Return successful results
    # NOTE: obj_id was found and verified above
    if not return_obj:
        resp_data = {
            'table_name': table_name,
            'request_data': obj_dict,
            'obj_id': obj_id,
            'obj_id_random': obj_id_random,
        }
        log.info(f'Returning IDs (obj_id, obj_id_random={obj_id_random}) using PATCH data')
        return mk_resp(data=resp_data, response=commons.response)

    sql_select_result = sql_select(table_name=table_name_select, record_id=obj_id)
    log.debug(sql_select_result)
    if not sql_select_result:
        status_message = f'The record was not found in table_name={table_name_select} used for the SQL SELECT query. This may be a SQL VIEW and may be because the SQL VIEW needs to be created or updated.'
        return mk_resp(data=False, status_code=404, response=commons.response, status_message=status_message)

    log.debug(base_name)
    if base_name:
        resp_data = base_name(**sql_select_result).dict(by_alias=True, exclude_unset=commons.exclude_unset)
    else:
        log.info('base_name model was not found. Returning raw data.')
        resp_data = sql_select_result
    log.debug(resp_data)
    log.info(f'Returning object model {base_name} from table_name {table_name_select}; obj_id; obj_id_random={obj_id_random}) using PATCH data')
    return mk_resp(data=resp_data, response=commons.response)
# ### END ### API CRUD ### patch_obj() ###
# ### BEGIN ### API CRUD ### post_obj() ###
# Updated 2024-03-08
@router.post('/{obj_type_l1}')
@router.post('/{obj_type_l1}/{obj_type_l2}')
@router.post('/{obj_type_l1}/{obj_type_l2}/{obj_type_l3}')
async def post_obj(
    crud: Api_Crud_Base,
    obj_type_l1: Optional[str] = Query(..., max_length=50),
    obj_type_l2: str = None,
    obj_type_l3: str = None,
    run_safety_check: bool = True,
    return_obj: Optional[bool] = True,
    # The view name will be prefixed with "v_" (unless it already is) and is used for the
    # SQL SELECT that returns the created object. -2024-03-08
    obj_v_name: Optional[str] = None,
    commons: Common_Route_Params = Depends(common_route_params),
):
    """
    Simple post object type:
    - **obj_type_l1, obj_type_l2, obj_type_l3**:
        - Examples:
            - /account = account
            - /user = user
            - /user/role = user_role
            - /event = event
            - /event/exhibit = event_exhibit
            - /order = order
            - /order/cart = order_cart
            - /order/cart/line = order_cart_line
            - /lu/some_lookup = lu_some_lookup
    - **crud**: request body; `super_key` grants access and `data_list` holds the fields to insert.
    - **run_safety_check**: pass `data_list` through the object model before the SQL INSERT.
    - **return_obj**: return the created object (selected again after the insert) instead of
      only the IDs and the request data.
    - **obj_v_name**: view to select the returned object from. Only letters, digits and
      underscores are accepted.

    Responses: 400 for a missing/incorrect key, unknown object type, invalid view name,
    validation error or failed insert; 404 when the insert returned nothing or the created
    record could not be selected; 501 when a JWT is passed.
    """
    # NOTE: log.setLevel() changes the level of the shared logger, not just for this request.
    log.setLevel(logging.INFO)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    import re

    # ### SECTION ### Access check
    # SECURITY: The access key is hard-coded in the source. It should be moved to the
    # configuration / a secret store and rotated.
    if crud.super_key != 'zp5PtX4zUsI':
        if crud.jwt:
            log.warning('JWT was passed')
            return mk_resp(data=False, status_code=501, response=commons.response, status_message='Token access for the API CRUD has not been implemented yet.')
        log.warning('Access key is missing or incorrect')
        return mk_resp(data=False, status_code=400, response=commons.response)

    debug_data = {
        'crud': crud,
        'create_key': crud.create_key,
        'read_key': crud.read_key,
        'update_key': crud.update_key,
        'delete_key': crud.delete_key,
        'data_list': crud.data_list,
        'obj_type_l1': obj_type_l1,
        'obj_type_l2': obj_type_l2,
        'obj_type_l3': obj_type_l3,
    }
    log.debug(debug_data)

    # ### SECTION ### Build and validate the object type name
    if not obj_type_l1:
        log.warning('We should not be here')
        return mk_resp(data=False, status_code=400, response=commons.response)
    # obj_type_l3 is only used when obj_type_l2 is also present.
    name_part_li = [obj_type_l1]
    if obj_type_l2:
        name_part_li.append(obj_type_l2)
        if obj_type_l3:
            name_part_li.append(obj_type_l3)
    obj_name = '_'.join(name_part_li)
    if obj_name not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=commons.response)
    obj_type_def = obj_type_kv_li[obj_name]

    # ### SECTION ### Figure out the table_name to use
    # Updated: 2024-03-08
    table_missing_message = 'The table_name was not found. This is a critical error.'

    # Name used for the SQL SELECT that returns the created object. This should be a view in most cases.
    if obj_v_name:
        # The view name comes from the client and ends up as a SQL relation name.
        if not re.fullmatch(r'[A-Za-z0-9_]+', obj_v_name):
            log.warning('The obj_v_name was not a valid view name')
            return mk_resp(data=False, status_code=400, response=commons.response, status_message='The obj_v_name was not a valid view name.')
        # Add the "v_" prefix unless it is already there.
        table_name_select = obj_v_name if obj_v_name.startswith('v_') else f'v_{obj_v_name}'
        # We can do more later... like check against an allowed list of view names per Aether object type
    else:
        # Use the table_name as a backup option, then the other known names.
        table_name_select = (
            obj_type_def.get('table_name')
            or obj_type_def.get('table_name_alt')
            or obj_type_def.get('tbl_name_update')
        )
    if not table_name_select:
        log.error('The table_name was not found. This is a critical error. Returning False.')
        return mk_resp(data=False, status_code=400, response=commons.response, status_message=table_missing_message)

    # Name used for the SQL INSERT.
    table_name = obj_type_def.get('tbl_name_update') or obj_type_def.get('table_name')
    if not table_name:
        # Only table_name_alt was defined; there is nothing to insert into.
        log.error('The table_name was not found. This is a critical error. Returning False.')
        return mk_resp(data=False, status_code=400, response=commons.response, status_message=table_missing_message)

    # Resolved up front: it is needed for the returned object even when the safety check is skipped.
    base_name = obj_type_def.get('base_name')

    # ### SECTION ### Secondary data validation
    # NOTE: Doing a quick sanity check based on the object models and then dump to a dict to get rid of invalid fields. The other option is to just use the crud.data_list raw.
    crud_data = crud.data_list
    log.debug(crud_data.keys())
    if run_safety_check:
        log.info('Running safety check by default')
        try:
            obj_model = base_name(**crud_data)
        except Exception:
            log.error('An unknown exception happened. Returning False.')
            log.exception('**** *** ** * ### BEGIN ### Exception Happened: Returning False * ** *** ****')
            # default=str keeps the logging itself from failing on non-JSON values.
            log.warning(json.dumps(crud_data, indent=4, default=str))
            return mk_resp(data=False, status_code=400, response=commons.response, status_message='There was likely a validation error. Returned False.')
        log.info('Successfully created the object model.')
        log.debug(obj_model)
        # Only keep the fields that were actually sent in the request.
        obj_dict = obj_model.dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset, include=set(crud_data))
        log.debug(obj_dict)
        crud_data = obj_dict
    else:
        log.warning('The default safety check was not run!')
        obj_dict = crud_data

    # ### SECTION ### Run the SQL INSERT
    # NOTE: Using rm_id_random. That helps with some field names.
    sql_result = sql_insert(data=crud_data, table_name=table_name, rm_id_random=True, log_lvl=logging.INFO)
    if sql_result is None:
        log.info('The record was probably not inserted.')
        log.debug(sql_result)
        return mk_resp(data=None, status_code=404, status_message='The record was probably not inserted.', response=commons.response)
    if not sql_result:
        status_message = 'Something unexpected happened while trying to run the SQL INSERT. The fields or field values passed may not be valid for the table and or model.'
        log.info(status_message)
        log.debug(sql_result)
        return mk_resp(data=False, status_code=400, status_message=status_message, response=commons.response)

    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.info('The record was inserted.')
    log.debug(sql_result)
    obj_id = sql_result  # The ID should be returned
    obj_id_random = get_id_random(record_id=obj_id, table_name=table_name)

    # ### SECTION ### Return successful results
    if not return_obj:
        resp_data = {
            'table_name': table_name,
            'request_data': obj_dict,
            'obj_id': obj_id,
            'obj_id_random': obj_id_random,
        }
        log.info('Returning IDs only created from POST data')
        return mk_resp(data=resp_data, response=commons.response)

    log.info('Returning object created from POST data')
    # This is the more or less the same as what is done in the patch_obj() endpoint.
    sql_select_result = sql_select(table_name=table_name_select, record_id=obj_id)
    log.debug(sql_select_result)
    if not sql_select_result:
        status_message = 'The record was not found. This may be because a SQL VIEW was used for the SQL SELECT query?'
        return mk_resp(data=False, status_code=404, response=commons.response, status_message=status_message)

    log.debug(base_name)
    if base_name:
        resp_data = base_name(**sql_select_result).dict(by_alias=True, exclude_unset=commons.exclude_unset)
    else:
        log.info('base_name model was not found. Returning raw data.')
        resp_data = sql_select_result
    log.debug(resp_data)
    log.info(f'Returning object model ({base_name}; obj_id; obj_id_random={obj_id_random}) from POST data')
    return mk_resp(data=resp_data, response=commons.response)
# ### END ### API CRUD ### post_obj() ###
# ### BEGIN ### API CRUD ### delete_obj() ###
# Updated 2023-07-10
@router.delete('/{obj_type_l1}/{obj_id}')
@router.delete('/{obj_type_l1}/{obj_type_l2}/{obj_id}')
@router.delete('/{obj_type_l1}/{obj_type_l2}/{obj_type_l3}/{obj_id}')
async def delete_obj(
    obj_type_l1: str=None,
    obj_type_l2: str=None,
    obj_type_l3: str=None,
    obj_id: str=None,
    method: str = 'delete', # None, delete, disable, hide
    # x_account_id: str = Header(...),
    # response: Response = Response,
    commons: Common_Route_Params = Depends(common_route_params),
):
    """
    Simple delete object type with an ID:
    - **obj_type_l1, obj_type_l2, obj_type_l3**:
        - Examples:
            - /account = account
            - /user = user
            - /user/role = user_role
            - /event = event
            - /event/exhibit = event_exhibit
            - /event/person/profile = event_person_profile
            - /order = order
            - /order/line = order_line
            - /lu/some_lookup = lu_some_lookup
    - **obj_id**: the random ID string of the record.
    - **method**: `delete` (default) removes the record, `disable` sets `enable` to false, `hide` sets `hide` to true.
    """
    log.setLevel(logging.INFO)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    debug_data = {
        'obj_type_l1': obj_type_l1,
        'obj_type_l2': obj_type_l2,
        'obj_type_l3': obj_type_l3,
        'obj_id': obj_id,
    }
    log.debug(debug_data)

    # time.sleep(1.5)  # NOTE: WARNING: artificial delay for debugging only; keep disabled.

    # Build the object name from the path segments. A deeper level only counts
    # when every level above it is present (l3 is ignored without l2).
    if not obj_type_l1:
        log.warning('We should not be here')
        return mk_resp(data=False, status_code=400, response=commons.response)
    name_parts = [obj_type_l1]
    if obj_type_l2:
        name_parts.append(obj_type_l2)
        if obj_type_l3:
            name_parts.append(obj_type_l3)
    obj_name = '_'.join(name_parts)

    # Only object types registered in obj_type_kv_li may be touched.
    if obj_name not in obj_type_kv_li:
        return mk_resp(data=False, status_code=400, response=commons.response)

    table_name = obj_name  # obj_type_kv_li[obj_name]['table_name'] # NOTE: Don't try to use the view!

    # A missing method means a hard delete. Normalising it here keeps the
    # response handling below in step with the action actually performed
    # (previously a successful delete with method=None was reported as a 400).
    if method is None:
        method = 'delete'

    # Soft-delete variants: the field update applied instead of removing the row.
    soft_updates = {
        'disable': {'enable': False},
        'hide': {'hide': True},
    }
    # Wording used in the success details for each method.
    done_label = {'delete': 'deleted', 'hide': 'hidden', 'disable': 'disabled'}

    # NOTE: Add a check for the object ID... assuming it is a random ID string for now.
    if method == 'delete':
        sql_result = sql_delete(table_name=table_name, record_id_random=obj_id)
        log.debug(sql_result)
    elif method in soft_updates:
        sql_result = sql_update(data=soft_updates[method], table_name=table_name, record_id_random=obj_id, rm_id_random=True, log_lvl=logging.INFO)
    else:
        log.warning('We should not be here')
        return mk_resp(data=False, status_code=400, response=commons.response)

    if sql_result:
        log.info('The record was found and deleted or updated.')
        resp_details = f'Object type: {obj_name} Object ID: {obj_id}; {done_label[method]}'
        return mk_resp(data=True, details=resp_details, response=commons.response)

    if sql_result is None:
        # None is treated as "no matching record".
        log.info('The record was probably not found to be deleted and or updated.')
        resp_details = f'Not found: Object type: {obj_name} Object ID: {obj_id}; {method}'
        return mk_resp(data=None, status_code=404, details=resp_details, response=commons.response)

    # Any other falsy result is reported as a bad request.
    log.info('Something unexpected happened while trying to run the SQL DELETE and or UPDATE. The fields or field values passed may not be valid for the table.')
    resp_details = f'Unexpected result: Object type: {obj_name} Object ID: {obj_id}; {method}'
    return mk_resp(data=False, status_code=400, details=resp_details, response=commons.response)
# ### END ### API CRUD ### delete_obj() ###
def post_obj_template(
    obj_type: str,
    data: dict,
    id_random_length: int = 8, # Added 2023-04-13; need to move away from this
    return_obj: bool = True,
    by_alias: bool = True,
    include: Optional[list] = [],
    exclude: Optional[list] = [],
    exclude_unset: Optional[bool] = True,
    exclude_none: Optional[bool] = True,
    response: Response = Response,
):
    """
    Insert one record of `obj_type` and respond with the stored object.

    The row is inserted into the table named `obj_type`, then read back from
    the `table_name` registered for that type in `obj_type_kv_li` and
    serialised through the registered `base_name` model.

    - `data`: field values for the new record; passed through
      `lookup_id_random_pop()` before the insert.
    - `id_random_length`: forwarded to `sql_insert()`.
    - `by_alias`, `exclude_unset`: forwarded to the model's `.dict()`.
    - `return_obj`, `include`, `exclude`, `exclude_none`: accepted but not
      used by this function.
    - `response`: forwarded to `mk_resp()`.

    Responds with status_code=400 when the insert result is falsy and
    status_code=404 when the inserted record cannot be read back.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    insert_payload = lookup_id_random_pop(data)
    type_def = obj_type_kv_li[obj_type]
    select_table = type_def['table_name']
    model_cls = type_def['base_name']

    new_id = sql_insert(table_name=obj_type, data=insert_payload, id_random_length=id_random_length)
    # The logger is switched to DEBUG whether or not the insert succeeded.
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(new_id)
    if not new_id:
        return mk_resp(data=False, status_code=400, response=response)

    row = sql_select(table_name=select_table, record_id=new_id)
    log.debug(row)
    if not row:
        return mk_resp(data=False, status_code=404, response=response)

    resp_data = model_cls(**row).dict(by_alias=by_alias, exclude_unset=exclude_unset)
    return mk_resp(data=resp_data, response=response)
def patch_obj_template(
    obj_type: str,
    data: dict,
    obj_id: str,
    return_obj: bool=True,
    by_alias: bool=True,
    include: Optional[list] = [],
    exclude: Optional[list] = [],
    exclude_unset: Optional[bool] = True,
    exclude_none: Optional[bool] = True,
    response: Response = Response,
):
    """
    Update one record of `obj_type` and respond with the stored object.

    - `data`: fields to update. NOTE: this dict is modified in place
      (`id_random` is set on it) and its `id` key is read back after
      `lookup_id_random_pop()` has run.
    - `obj_id`: the record's random ID string.
    - `by_alias`, `exclude_unset`: forwarded to the model's `.dict()`.
    - `return_obj`, `include`, `exclude`, `exclude_none`: accepted but not
      used by this function.
    - `response`: forwarded to `mk_resp()`.

    Responds with status_code=400 when the update result is falsy and
    status_code=404 when the record cannot be read back afterwards.
    """
    log.setLevel(logging.INFO)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    #data['id'] = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_type)
    data['id_random'] = obj_id  # NOTE: Adding this in so the id_random is NOT updated
    log.debug(data)

    # NOTE(review): presumably this helper resolves id_random into a numeric
    # `id` on the same dict - confirm; `data['id']` is read below.
    update_payload = lookup_id_random_pop(data)

    type_def = obj_type_kv_li[obj_type]
    select_table = type_def['table_name']
    model_cls = type_def['base_name']

    update_result = sql_update(table_name=obj_type, data=update_payload)
    log.debug(update_result)
    if not update_result:
        return mk_resp(data=False, status_code=400, response=response)

    record_id = data['id']
    row = sql_select(table_name=select_table, record_id=record_id)
    log.debug(row)
    if not row:
        return mk_resp(data=False, status_code=404, response=response)

    resp_data = model_cls(**row).dict(by_alias=by_alias, exclude_unset=exclude_unset)
    return mk_resp(data=resp_data, response=response)
def get_obj_li_template(
    obj_type: str = Query(None, max_length=50),
    for_obj_type: Optional[str] = Query(None, max_length=50),
    for_obj_id: Optional[Union[int,str]] = None,
    by_alias: Optional[bool] = True,
    include: Optional[list] = [],
    exclude: Optional[list] = [],
    exclude_unset: Optional[bool] = True,
    exclude_none: Optional[bool] = True,
    response: Response = Response,
):
    """
    List records of `obj_type`, optionally restricted to one parent object.

    - `obj_type`: key into `obj_type_kv_li`; its `table_name` is queried and
      its `base_name` model serialises each record.
    - `for_obj_type` / `for_obj_id`: when both are given, only records whose
      `<for_obj_type>_id` field equals the parent ID are selected. A string
      `for_obj_id` is treated as a random ID and resolved through
      `redis_lookup_id_random()`; an int is used as is.
    - `by_alias`, `exclude_unset`: forwarded to the model's `.dict()`.
    - `include`, `exclude`, `exclude_none`: accepted but not used by this
      function.
    - `response`: forwarded to `mk_resp()`.

    Responds with status_code=404 when a parent random ID was supplied but
    could not be resolved; otherwise responds with the (possibly empty) list.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    parent_unresolved = False
    if isinstance(for_obj_id, str):
        for_obj_id_random = for_obj_id
        for_obj_id = redis_lookup_id_random(record_id_random=for_obj_id_random, table_name=for_obj_type)
        # A parent filter was requested but the random ID did not resolve.
        parent_unresolved = bool(for_obj_type and for_obj_id_random and not for_obj_id)

    table_name_select = obj_type_kv_li[obj_type]['table_name']

    if parent_unresolved:
        # Without this guard the filter would be dropped silently and every
        # record in the table returned. Falsy lookup results are treated as
        # "not found", as get_obj_template() does.
        return mk_resp(data=False, status_code=404, response=response)

    if for_obj_type and for_obj_id:
        field_name = f'{for_obj_type}_id'
        sql_result = sql_select(table_name=table_name_select, field_name=field_name, field_value=for_obj_id)
    else:
        sql_result = sql_select(table_name=table_name_select)
    log.debug(sql_result)

    base_name = obj_type_kv_li[obj_type]['base_name']
    # `or []`: a falsy sql_select() result is treated as "no records"
    # instead of raising TypeError when iterated.
    resp_data_li = [
        base_name(**record).dict(by_alias=by_alias, exclude_unset=exclude_unset)
        for record in (sql_result or [])
    ]
    return mk_resp(data=resp_data_li, response=response)
def get_obj_template(
    obj_id: Union[int,str],
    obj_type: str = Query(None, max_length=50),
    by_alias: Optional[bool] = True,
    include: Optional[list] = [],
    exclude: Optional[list] = [],
    exclude_unset: Optional[bool] = True,
    exclude_none: Optional[bool] = True,
    response: Response = Response,
):
    """
    Fetch a single record of `obj_type` and respond with it as a model dict.

    - `obj_id`: an int is used directly as the record ID; a string is treated
      as a random ID and resolved through `redis_lookup_id_random()`.
    - `obj_type`: key into `obj_type_kv_li`; its `table_name` is queried and
      its `base_name` model serialises the record.
    - `by_alias`, `exclude_unset`: forwarded to the model's `.dict()`.
    - `include`, `exclude`, `exclude_none`: accepted but not used by this
      function.
    - `response`: forwarded to `mk_resp()`.

    Responds with status_code=404 when the ID does not resolve or no record
    is selected.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    record_id = obj_id
    if isinstance(obj_id, str):
        record_id = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_type)

    type_def = obj_type_kv_li[obj_type]
    select_table = type_def['table_name']

    if not record_id:
        return mk_resp(data=False, status_code=404, response=response)

    row = sql_select(table_name=select_table, record_id=record_id)
    log.debug(row)
    if not row:
        return mk_resp(data=False, status_code=404, response=response)

    model_cls = type_def['base_name']
    resp_data = model_cls(**row).dict(by_alias=by_alias, exclude_unset=exclude_unset)
    return mk_resp(data=resp_data, response=response)
def delete_obj_template(
    obj_type: str = Query(None, max_length=50),
    obj_id: str = Query(None, max_length=22),
    response: Response = Response,
):
    """
    Delete one record of `obj_type` identified by its random ID.

    - `obj_type`: used directly as the table name for the delete.
    - `obj_id`: passed to `sql_delete()` as `record_id_random`.
    - `response`: forwarded to `mk_resp()`.

    Responds with data=True when `sql_delete()` returns a truthy result,
    otherwise with data=False and status_code=404.
    """
    log.setLevel(logging.WARNING)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())
    log.debug({'obj_type': obj_type, 'obj_id': obj_id})

    # NOTE: Not using the table name from the object type list because it may be a view (v_*).
    # NOTE: Add a check for the object ID... assuming it is a random ID string for now.
    delete_result = sql_delete(table_name=obj_type, record_id_random=obj_id)
    log.debug(delete_result)

    if delete_result:
        return mk_resp(data=True, response=response)
    return mk_resp(data=False, status_code=404, response=response)
# New dynamic API CRUD endpoint
# The POST data should be JSON formatted
# @router.post('/query')
# def query(
# # qry JSON should contain these properties:
# # list: for_obj_type, for_obj_id, tbl_view_name, base_name
# # id: obj_id, tbl_view_name, base_name
# qry: str,
# commons: Common_Route_Params = Depends(common_route_params),
# ):
# log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
# log.debug(locals())
# import urllib