Working on general PATCH template function.
This commit is contained in:
@@ -9,6 +9,8 @@ from ..log import *
|
||||
from app.config import settings
|
||||
from app.db_sql import *
|
||||
|
||||
from .api_crud import patch_obj_template
|
||||
|
||||
from ..models.address_model import Address_Base
|
||||
from ..models.response_model import *
|
||||
|
||||
@@ -52,3 +54,66 @@ async def post_address_obj(address: Address_Base,
|
||||
resp_data = base_name(**sql_select_result).dict(by_alias=by_alias, exclude_unset=exclude_unset)
|
||||
|
||||
return mk_resp(data=resp_data)
|
||||
|
||||
|
||||
@router.patch('/{address_id}')
|
||||
async def patch_address_obj(
|
||||
address_id: str,
|
||||
address: Address_Base,
|
||||
x_account_id: str = Header(...),
|
||||
return_obj: Optional[bool] = True,
|
||||
by_alias: Optional[bool] = True,
|
||||
exclude_unset: Optional[bool] = True,
|
||||
):
|
||||
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
||||
log.debug(locals())
|
||||
|
||||
table_name_update = 'address'
|
||||
obj_id = address_id # actually id_random
|
||||
obj_data_dict = address.dict(by_alias=False, exclude_unset=True)
|
||||
obj_data_dict['id'] = redis_lookup_id_random(record_id_random=obj_id, table_name=table_name_update)
|
||||
obj_data_dict['id_random'] = obj_id
|
||||
result = patch_obj_template(table_name_update=table_name_update,
|
||||
data=obj_data_dict,
|
||||
obj_id=obj_id,
|
||||
return_obj=True,
|
||||
by_alias=True,
|
||||
exclude_unset=True
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
||||
log.debug(locals())
|
||||
|
||||
table_name_update = 'address'
|
||||
table_name_select = 'v_address'
|
||||
obj_id = address_id
|
||||
obj_data_dict = address.dict(by_alias=False, exclude_unset=True)
|
||||
obj_data_dict['id'] = redis_lookup_id_random(record_id_random=obj_id, table_name=table_name_update)
|
||||
obj_data_dict['id_random'] = address_id # NOTE: Adding this in so the id_random is NOT updated
|
||||
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
||||
log.debug(obj_data_dict)
|
||||
obj_data = lookup_id_random_pop(obj_data_dict)
|
||||
base_name = Address_Base
|
||||
|
||||
if sql_update_result := sql_update(table_name=table_name_update, data=obj_data):
|
||||
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
||||
log.debug(sql_update_result)
|
||||
#obj_id = sql_update_result
|
||||
obj_id = obj_data_dict['id']
|
||||
else:
|
||||
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
||||
log.debug(sql_update_result)
|
||||
return mk_resp(data=False, status_code=400)
|
||||
|
||||
if sql_select_result := sql_select(table_name=table_name_select, record_id=obj_id):
|
||||
log.debug(sql_select_result)
|
||||
|
||||
resp_data = base_name(**sql_select_result).dict(by_alias=by_alias, exclude_unset=exclude_unset)
|
||||
|
||||
return mk_resp(data=resp_data)
|
||||
else:
|
||||
log.debug(sql_select_result)
|
||||
return mk_resp(data=False, status_code=404)
|
||||
@@ -208,6 +208,20 @@ async def get_obj(obj_type_l1: str=None,
|
||||
by_alias: Optional[bool] = True,
|
||||
exclude_unset: Optional[bool] = True,
|
||||
):
|
||||
"""
|
||||
Simple select object type with an ID:
|
||||
- **obj_type_l1, obj_type_l2, obj_type_l3**:
|
||||
- Examples:
|
||||
- /account = account
|
||||
- /user = user
|
||||
- /user/role = user_role
|
||||
- /event = event
|
||||
- /event/exhibit = event_exhibit
|
||||
- /order = order
|
||||
- /order/cart = order_cart
|
||||
- /order/cart/line = order_cart_line
|
||||
- /lu/some_lookup = lu_some_lookup
|
||||
"""
|
||||
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
||||
log.debug(locals())
|
||||
|
||||
@@ -250,10 +264,123 @@ async def get_obj(obj_type_l1: str=None,
|
||||
table_name = obj_type_li[obj_name]['table_name']
|
||||
|
||||
# NOTE: Add a check for the object ID... assuming it is a random ID string for now.
|
||||
sql_result = sql_select(table_name=table_name, record_id_random=obj_id)
|
||||
if sql_result := sql_select(table_name=table_name, record_id_random=obj_id):
|
||||
log.debug(sql_result)
|
||||
|
||||
base_name = obj_type_li[obj_name]['base_name']
|
||||
resp_data = base_name(**sql_result).dict(by_alias=by_alias, exclude_unset=exclude_unset)
|
||||
|
||||
return mk_resp(data=resp_data) #, details=debug_data)
|
||||
else:
|
||||
log.debug(sql_result)
|
||||
return mk_resp(data=False, status_code=404)
|
||||
|
||||
|
||||
@router.delete('/{obj_type_l1}/{obj_id}')
|
||||
@router.delete('/{obj_type_l1}/{obj_type_l2}/{obj_id}')
|
||||
@router.delete('/{obj_type_l1}/{obj_type_l2}/{obj_type_l3}/{obj_id}')
|
||||
async def delete_obj(obj_type_l1: str=None,
|
||||
obj_type_l2: str=None,
|
||||
obj_type_l3: str=None,
|
||||
obj_id: str=None,
|
||||
x_account_id: str = Header(...),
|
||||
by_alias: Optional[bool] = True,
|
||||
exclude_unset: Optional[bool] = True,
|
||||
):
|
||||
"""
|
||||
Simple delete object type with an ID:
|
||||
- **obj_type_l1, obj_type_l2, obj_type_l3**:
|
||||
- Examples:
|
||||
- /account = account
|
||||
- /user = user
|
||||
- /user/role = user_role
|
||||
- /event = event
|
||||
- /event/exhibit = event_exhibit
|
||||
- /order = order
|
||||
- /order/cart = order_cart
|
||||
- /order/cart/line = order_cart_line
|
||||
- /lu/some_lookup = lu_some_lookup
|
||||
"""
|
||||
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
||||
log.debug(locals())
|
||||
|
||||
debug_data = {}
|
||||
debug_data['obj_type_l1'] = obj_type_l1
|
||||
debug_data['obj_type_l2'] = obj_type_l2
|
||||
debug_data['obj_type_l3'] = obj_type_l3
|
||||
debug_data['obj_id'] = obj_id
|
||||
|
||||
log.debug(debug_data)
|
||||
|
||||
if obj_type_l1 and obj_type_l2 and obj_type_l3:
|
||||
obj_name = f'{obj_type_l1}_{obj_type_l2}_{obj_type_l3}'
|
||||
if obj_name in obj_type_li:
|
||||
pass
|
||||
else:
|
||||
return mk_resp(data=False, status_code=400)
|
||||
elif obj_type_l1 and obj_type_l2:
|
||||
obj_name = f'{obj_type_l1}_{obj_type_l2}'
|
||||
if obj_name in obj_type_li:
|
||||
pass
|
||||
else:
|
||||
return mk_resp(data=False, status_code=400)
|
||||
elif obj_type_l1:
|
||||
obj_name = f'{obj_type_l1}'
|
||||
if obj_name in obj_type_li:
|
||||
pass
|
||||
else:
|
||||
return mk_resp(data=False, status_code=400)
|
||||
else:
|
||||
log.warning('We should not be here')
|
||||
return mk_resp(data=False, status_code=400)
|
||||
|
||||
table_name = obj_name # obj_type_li[obj_name]['table_name'] # NOTE: Don't try to use the view!
|
||||
|
||||
# NOTE: Add a check for the object ID... assuming it is a random ID string for now.
|
||||
sql_result = sql_delete(table_name=table_name, record_id_random=obj_id)
|
||||
log.debug(sql_result)
|
||||
|
||||
base_name = obj_type_li[obj_name]['base_name']
|
||||
resp_data = base_name(**sql_result).dict(by_alias=by_alias, exclude_unset=exclude_unset)
|
||||
resp_data = True
|
||||
|
||||
return mk_resp(data=resp_data) #, details=debug_data)
|
||||
|
||||
|
||||
def patch_obj_template(table_name_update:str=None,
|
||||
data:dict=None,
|
||||
obj_id:str=None,
|
||||
return_obj:bool=True,
|
||||
by_alias:bool=True,
|
||||
exclude_unset:bool=True
|
||||
):
|
||||
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
||||
log.debug(locals())
|
||||
|
||||
obj_data_dict = data
|
||||
|
||||
#obj_data_dict['id'] = redis_lookup_id_random(record_id_random=obj_id, table_name=table_name_update)
|
||||
obj_data_dict['id_random'] = obj_id # NOTE: Adding this in so the id_random is NOT updated
|
||||
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
||||
log.debug(obj_data_dict)
|
||||
obj_data = lookup_id_random_pop(obj_data_dict)
|
||||
table_name_select = obj_type_li[table_name_update]['table_name']
|
||||
base_name = obj_type_li[table_name_update]['base_name']
|
||||
|
||||
if sql_update_result := sql_update(table_name=table_name_update, data=obj_data):
|
||||
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
||||
log.debug(sql_update_result)
|
||||
#obj_id = sql_update_result
|
||||
obj_id = obj_data_dict['id']
|
||||
else:
|
||||
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
||||
log.debug(sql_update_result)
|
||||
return mk_resp(data=False, status_code=400)
|
||||
|
||||
if sql_select_result := sql_select(table_name=table_name_select, record_id=obj_id):
|
||||
log.debug(sql_select_result)
|
||||
|
||||
resp_data = base_name(**sql_select_result).dict(by_alias=by_alias, exclude_unset=exclude_unset)
|
||||
|
||||
return mk_resp(data=resp_data)
|
||||
else:
|
||||
log.debug(sql_select_result)
|
||||
return mk_resp(data=False, status_code=404)
|
||||
112
app/routers/contact.py
Normal file
112
app/routers/contact.py
Normal file
@@ -0,0 +1,112 @@
|
||||
import datetime
|
||||
#from datetime import datetime, time, timedelta
|
||||
from fastapi import APIRouter, Body, Depends, Header, HTTPException, Query, status
|
||||
from pydantic import BaseModel, EmailStr, Field
|
||||
from typing import Dict, List, Optional, Set, Union
|
||||
|
||||
from ..lib_general import *
|
||||
from ..log import *
|
||||
from app.config import settings
|
||||
from app.db_sql import *
|
||||
|
||||
from ..models.contact_model import Contact_Base
|
||||
from ..models.response_model import *
|
||||
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
# Working on the contact API CRUD - STI 2021-03-08
|
||||
|
||||
|
||||
@router.post('')
|
||||
async def post_contact_obj(contact: Contact_Base,
|
||||
x_account_id: str = Header(...),
|
||||
return_obj: Optional[bool] = True,
|
||||
by_alias: Optional[bool] = True,
|
||||
exclude_unset: Optional[bool] = True,
|
||||
):
|
||||
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
||||
log.debug(locals())
|
||||
|
||||
table_name_insert = 'contact'
|
||||
obj_data_dict = contact.dict(by_alias=False, exclude_unset=True)
|
||||
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
||||
log.debug(obj_data_dict)
|
||||
obj_data = lookup_id_random_pop(obj_data_dict)
|
||||
base_name = Contact_Base
|
||||
|
||||
if sql_insert_result := sql_insert(table_name=table_name_insert, data=obj_data):
|
||||
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
||||
log.debug(sql_insert_result)
|
||||
obj_id = sql_insert_result
|
||||
else:
|
||||
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
||||
log.debug(sql_insert_result)
|
||||
return mk_resp(data=False, status_code=400)
|
||||
|
||||
table_name_select = 'v_contact'
|
||||
sql_select_result = sql_select(table_name=table_name_select, record_id=obj_id)
|
||||
log.debug(sql_select_result)
|
||||
|
||||
resp_data = base_name(**sql_select_result).dict(by_alias=by_alias, exclude_unset=exclude_unset)
|
||||
|
||||
return mk_resp(data=resp_data)
|
||||
|
||||
|
||||
@router.patch('/{contact_id}')
|
||||
async def patch_contact_obj(contact_id: str,
|
||||
contact: Contact_Base,
|
||||
x_account_id: str = Header(...),
|
||||
return_obj: Optional[bool] = True,
|
||||
by_alias: Optional[bool] = True,
|
||||
exclude_unset: Optional[bool] = True,
|
||||
):
|
||||
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
||||
log.debug(locals())
|
||||
|
||||
table_name_update = 'contact'
|
||||
obj_id = contact_id # actually id_random
|
||||
obj_data_dict = contact.dict(by_alias=False, exclude_unset=True)
|
||||
obj_data_dict['id'] = redis_lookup_id_random(record_id_random=obj_id, table_name=table_name_update)
|
||||
obj_data_dict['id_random'] = obj_id
|
||||
result = patch_obj_template(table_name_update=table_name_update,
|
||||
data=obj_data_dict,
|
||||
obj_id=obj_id,
|
||||
return_obj=True,
|
||||
by_alias=True,
|
||||
exclude_unset=True
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
table_name_update = 'contact'
|
||||
table_name_select = 'v_contact'
|
||||
obj_id = contact_id
|
||||
obj_data_dict = contact.dict(by_alias=False, exclude_unset=True)
|
||||
obj_data_dict['id'] = redis_lookup_id_random(record_id_random=obj_id, table_name=table_name_update)
|
||||
obj_data_dict['id_random'] = contact_id # NOTE: Adding this in so the id_random is NOT updated
|
||||
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
||||
log.debug(obj_data_dict)
|
||||
obj_data = lookup_id_random_pop(obj_data_dict)
|
||||
base_name = Contact_Base
|
||||
|
||||
if sql_update_result := sql_update(table_name=table_name_update, data=obj_data):
|
||||
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
||||
log.debug(sql_update_result)
|
||||
#obj_id = sql_update_result
|
||||
obj_id = obj_data_dict['id']
|
||||
else:
|
||||
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
||||
log.debug(sql_update_result)
|
||||
return mk_resp(data=False, status_code=400)
|
||||
|
||||
if sql_select_result := sql_select(table_name=table_name_select, record_id=obj_id):
|
||||
log.debug(sql_select_result)
|
||||
|
||||
resp_data = base_name(**sql_select_result).dict(by_alias=by_alias, exclude_unset=exclude_unset)
|
||||
|
||||
return mk_resp(data=resp_data)
|
||||
else:
|
||||
log.debug(sql_select_result)
|
||||
return mk_resp(data=False, status_code=404)
|
||||
Reference in New Issue
Block a user