1003 lines
45 KiB
Python
1003 lines
45 KiB
Python
from fastapi import APIRouter, Depends, Header, HTTPException, Path, Query, Request, Response, status
|
|
from typing import Any, Dict, List, Optional, Set, Union
|
|
import json
|
|
import urllib.parse
|
|
import time
|
|
import asyncio
|
|
|
|
import logging
|
|
log = logging.getLogger(__name__)
|
|
|
|
from app.lib_general_v3 import (
|
|
AccountContext, get_account_context,
|
|
PaginationParams, get_pagination_params,
|
|
StatusFilterParams, get_status_filter_params,
|
|
SerializationParams, get_serialization_params,
|
|
DelayParams, get_delay_params
|
|
)
|
|
from app.models.response_models import *
|
|
from app.models.api_crud_models import SearchFilter, SearchQuery
|
|
from app.ae_obj_types_def import obj_type_kv_li
|
|
from app.db_sql import redis_lookup_id_random, sql_select, sql_insert, sql_update, sql_delete, get_id_random
|
|
|
|
router = APIRouter()
|
|
|
|
def check_account_access(sql_result: Any, account: AccountContext, obj_name: str = None) -> bool:
|
|
"""
|
|
Verifies if the current account context has access to the given SQL result.
|
|
Always returns True for super users.
|
|
"""
|
|
if account.super or account.auth_method == 'bypass':
|
|
return True
|
|
if not account.account_id:
|
|
return False
|
|
|
|
res_account_id = None
|
|
if isinstance(sql_result, dict):
|
|
if obj_name == 'account':
|
|
res_account_id = sql_result.get('id')
|
|
else:
|
|
res_account_id = sql_result.get('account_id')
|
|
|
|
if res_account_id is not None and res_account_id != account.account_id:
|
|
return False
|
|
return True
|
|
|
|
def apply_forced_account_filter(and_qry_dict: Optional[Dict], account: AccountContext, model: Any, obj_name: str) -> Dict:
|
|
"""
|
|
Adds a forced account_id filter to the query dictionary if the user is not a super user.
|
|
"""
|
|
forced = and_qry_dict or {}
|
|
if account.super or account.auth_method == 'bypass':
|
|
return forced
|
|
|
|
if obj_name == 'account':
|
|
forced['id'] = account.account_id
|
|
elif model and hasattr(model, '__fields__') and 'account_id' in model.__fields__:
|
|
forced['account_id'] = account.account_id
|
|
|
|
return forced
|
|
|
|
def safe_json_loads(json_str: Optional[str]) -> Any:
|
|
"""
|
|
Safely load a JSON string, handling 'undefined' or invalid formats
|
|
common in frontend URL parameters.
|
|
"""
|
|
if not json_str or json_str == 'undefined':
|
|
return None
|
|
try:
|
|
return json.loads(json_str)
|
|
except (json.JSONDecodeError, TypeError) as e:
|
|
log.warning(f"Failed to parse JSON string: {json_str}. Error: {e}")
|
|
return None
|
|
|
|
def filter_order_by(order_by_li: Any, model: Any, table_name: str = None) -> Optional[Dict[str, str]]:
|
|
"""
|
|
Filters the order_by_li dictionary to only include fields present in the Pydantic model
|
|
AND actually present in the database table/view.
|
|
"""
|
|
if not order_by_li or not isinstance(order_by_li, dict) or not model:
|
|
return order_by_li
|
|
|
|
if not hasattr(model, '__fields__'):
|
|
return order_by_li
|
|
|
|
# 1. Filter by Pydantic Model Fields/Aliases
|
|
model_fields = set(model.__fields__.keys())
|
|
model_fields.update({f.alias for f in model.__fields__.values() if f.alias})
|
|
|
|
filtered = {k: v for k, v in order_by_li.items() if k in model_fields}
|
|
|
|
# 2. Filter by actual DB Column existence (Dry run)
|
|
if table_name and filtered:
|
|
from app.db_sql import db
|
|
from sqlalchemy import text
|
|
|
|
final_filtered = {}
|
|
for column in filtered:
|
|
try:
|
|
# Use a lightweight query to check if column exists
|
|
db.execute(text(f"SELECT `{column}` FROM `{table_name}` LIMIT 0"))
|
|
final_filtered[column] = filtered[column]
|
|
except Exception:
|
|
log.warning(f"Column '{column}' does not exist in '{table_name}'. Removing from order_by_li.")
|
|
continue
|
|
filtered = final_filtered
|
|
|
|
if len(filtered) != len(order_by_li):
|
|
log.info(f"Filtered order_by_li. Removed fields: {set(order_by_li.keys()) - set(filtered.keys())}")
|
|
|
|
return filtered
|
|
|
|
def get_supported_filters(model: Any, status_filter: StatusFilterParams) -> StatusFilterParams:
|
|
"""
|
|
Adjusts the status filter based on what the model actually supports to avoid
|
|
SQL errors when filtering by non-existent columns (like 'hide' or 'enable').
|
|
"""
|
|
if not model or not hasattr(model, "__fields__"):
|
|
return status_filter
|
|
|
|
# Create a copy of the filter params
|
|
adjusted = StatusFilterParams(
|
|
enabled=status_filter.enabled,
|
|
hidden=status_filter.hidden
|
|
)
|
|
|
|
# Check for 'enable' and 'hide' fields in the model
|
|
if 'enable' not in model.__fields__:
|
|
adjusted.enabled = 'all'
|
|
|
|
if 'hide' not in model.__fields__:
|
|
adjusted.hidden = 'all'
|
|
|
|
return adjusted
|
|
|
|
@router.get("/health", response_model=Resp_Body_Base)
|
|
async def health_check(
|
|
delay: DelayParams = Depends(get_delay_params),
|
|
):
|
|
"""
|
|
Health check endpoint for V3 API.
|
|
"""
|
|
if delay.sleep_time_s > 0:
|
|
await asyncio.sleep(delay.sleep_time_s)
|
|
|
|
log.setLevel(logging.INFO)
|
|
log.info("V3 Health Check Endpoint Hit")
|
|
return mk_resp(data={"status": "V3 API is healthy!"})
|
|
|
|
|
|
@router.get("/{obj_type}/schema", response_model=Resp_Body_Base, tags=['CRUD v3 Schema (Dev)'])
|
|
async def get_obj_schema(
|
|
response: Response,
|
|
obj_type: str = Path(min_length=2, max_length=50),
|
|
view: str = Query('default', description="Select alternative view/model (e.g., enriched, detail)"),
|
|
variant: str = Query('base', regex='^(base|in|out|default)$', description="Select model variant"),
|
|
account: AccountContext = Depends(get_account_context),
|
|
):
|
|
"""
|
|
Returns the schema for a specific Aether object type.
|
|
Combines database column metadata with Pydantic model field definitions.
|
|
"""
|
|
from app.db_sql import db
|
|
from sqlalchemy import text
|
|
|
|
if obj_type not in obj_type_kv_li:
|
|
return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{obj_type}' not found.")
|
|
|
|
obj_cfg = obj_type_kv_li[obj_type]
|
|
|
|
# Select table/view
|
|
table_name = obj_cfg.get(f'tbl_{view}', obj_cfg.get('tbl_default', obj_cfg.get('tbl')))
|
|
|
|
# Select model variant
|
|
model_key = f'mdl_{variant}' if variant != 'base' else 'mdl'
|
|
model = obj_cfg.get(model_key, obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
|
|
|
|
if not table_name:
|
|
return mk_resp(data=False, status_code=500, response=response, status_message=f"Table configuration for '{obj_type}' is missing.")
|
|
|
|
schema_info = {
|
|
"object_type": obj_type,
|
|
"view": view,
|
|
"variant": variant,
|
|
"database": {
|
|
"table_name": table_name,
|
|
"columns": []
|
|
},
|
|
"model": {
|
|
"name": model.__name__ if hasattr(model, '__name__') else str(model),
|
|
"fields": {}
|
|
}
|
|
}
|
|
|
|
# 1. Get Database Metadata
|
|
try:
|
|
db_result = db.execute(text(f"DESCRIBE `{table_name}`"))
|
|
for row in db_result.fetchall():
|
|
schema_info["database"]["columns"].append({
|
|
"field": row[0],
|
|
"type": row[1],
|
|
"nullable": row[2] == 'YES',
|
|
"key": row[3],
|
|
"default": row[4],
|
|
"extra": row[5]
|
|
})
|
|
except Exception as e:
|
|
log.warning(f"Failed to describe table '{table_name}': {e}")
|
|
schema_info["database"]["error"] = str(e)
|
|
|
|
# 2. Get Pydantic Model Metadata
|
|
if model and hasattr(model, "__fields__"):
|
|
for field_name, field in model.__fields__.items():
|
|
field_info = {
|
|
"alias": field.alias,
|
|
"type": str(field.outer_type_),
|
|
"required": field.required,
|
|
"default": field.default
|
|
}
|
|
# Include extra metadata from Field() if present
|
|
if field.field_info.description:
|
|
field_info["description"] = field.field_info.description
|
|
|
|
schema_info["model"]["fields"][field_name] = field_info
|
|
|
|
return mk_resp(data=schema_info, response=response)
|
|
|
|
|
|
# --- TOP LEVEL OBJECTS ---
|
|
|
|
@router.get('/{obj_type_l1}/{obj_id}', response_model=Resp_Body_Base)
|
|
async def get_obj(
|
|
response: Response,
|
|
obj_type_l1: str = Path(min_length=2, max_length=50),
|
|
obj_id: str = Path(min_length=11, max_length=22),
|
|
view: str = Query('default', description="Select alternative view/model (e.g., enriched, detail)"),
|
|
account: AccountContext = Depends(get_account_context),
|
|
serialization: SerializationParams = Depends(get_serialization_params),
|
|
delay: DelayParams = Depends(get_delay_params),
|
|
):
|
|
"""
|
|
Get a single top-level object by its random ID.
|
|
"""
|
|
if delay.sleep_time_s > 0:
|
|
await asyncio.sleep(delay.sleep_time_s)
|
|
|
|
log.setLevel(logging.WARNING)
|
|
log.debug(locals())
|
|
|
|
obj_name = obj_type_l1
|
|
if obj_name not in obj_type_kv_li:
|
|
return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{obj_name}' not found.")
|
|
|
|
obj_cfg = obj_type_kv_li[obj_name]
|
|
table_name = obj_cfg.get(f'tbl_{view}', obj_cfg.get('tbl_default', obj_cfg.get('tbl')))
|
|
base_name = obj_cfg.get(f'mdl_{view}', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
|
|
|
|
if not table_name or not base_name:
|
|
return mk_resp(data=False, status_code=500, response=response, status_message=f"Configuration for object type '{obj_name}' (view: {view}) is incomplete.")
|
|
|
|
record_id = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_name)
|
|
if not record_id:
|
|
return mk_resp(data=False, status_code=404, response=response, status_message=f"Object with ID '{obj_id}' not found.")
|
|
|
|
if sql_result := sql_select(table_name=table_name, record_id=record_id):
|
|
# Security Check
|
|
if not check_account_access(sql_result, account, obj_name):
|
|
return mk_resp(data=False, status_code=403, response=response, status_message="Access denied.")
|
|
|
|
resp_data = base_name(**sql_result).dict(
|
|
by_alias=serialization.by_alias,
|
|
exclude_unset=serialization.exclude_unset,
|
|
exclude_defaults=serialization.exclude_defaults,
|
|
exclude_none=serialization.exclude_none
|
|
)
|
|
return mk_resp(data=resp_data, response=response)
|
|
else:
|
|
return mk_resp(data=False, status_code=404, response=response, status_message=f"Object with ID '{obj_id}' not found in database.")
|
|
|
|
|
|
@router.get('/{obj_type_l1}/', response_model=Resp_Body_Base)
|
|
async def get_obj_li(
|
|
response: Response,
|
|
obj_type_l1: str,
|
|
for_obj_type: Optional[str] = None,
|
|
for_obj_id: Optional[str] = None,
|
|
view: str = Query('default'),
|
|
order_by_li: Optional[str] = None,
|
|
jp: Optional[Union[str, None]] = None,
|
|
account: AccountContext = Depends(get_account_context),
|
|
pagination: PaginationParams = Depends(get_pagination_params),
|
|
status_filter: StatusFilterParams = Depends(get_status_filter_params),
|
|
serialization: SerializationParams = Depends(get_serialization_params),
|
|
delay: DelayParams = Depends(get_delay_params),
|
|
):
|
|
"""
|
|
Get a list of top-level objects.
|
|
"""
|
|
if delay.sleep_time_s > 0:
|
|
await asyncio.sleep(delay.sleep_time_s)
|
|
|
|
log.setLevel(logging.WARNING)
|
|
log.debug(locals())
|
|
|
|
qry_dict_li = None
|
|
fulltext_qry_dict_obj = None
|
|
and_qry_dict_obj = None
|
|
and_like_dict_obj = None
|
|
or_like_dict_obj = None
|
|
and_in_dict_li_obj = None
|
|
|
|
jp_obj = safe_json_loads(urllib.parse.unquote(jp)) if jp else None
|
|
if jp_obj:
|
|
if jp_obj.get('qry'): qry_dict_li = jp_obj['qry']
|
|
if jp_obj.get('ft_qry'): fulltext_qry_dict_obj = jp_obj['ft_qry']
|
|
if jp_obj.get('and_qry'): and_qry_dict_obj = jp_obj['and_qry']
|
|
if jp_obj.get('and_like'): and_like_dict_obj = jp_obj['and_like']
|
|
if jp_obj.get('or_like'): or_like_dict_obj = jp_obj['or_like']
|
|
if jp_obj.get('and_in_li'): and_in_dict_li_obj = jp_obj['and_in_li']
|
|
|
|
order_by_li = safe_json_loads(order_by_li)
|
|
|
|
obj_name = obj_type_l1
|
|
if obj_name not in obj_type_kv_li:
|
|
return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{obj_name}' not found.")
|
|
|
|
# Security Restrictions
|
|
if obj_name == 'site' and not (for_obj_type == 'account' and for_obj_id):
|
|
return mk_resp(data=False, status_code=403, response=response, status_message="Listing sites is only permitted when filtered by account.")
|
|
|
|
# Explicit Account isolation
|
|
if for_obj_type == 'account' and for_obj_id:
|
|
if not account.super and for_obj_id != account.account_id_random:
|
|
return mk_resp(data=False, status_code=403, response=response, status_message="Access denied to requested account.")
|
|
|
|
obj_cfg = obj_type_kv_li[obj_name]
|
|
table_name = obj_cfg.get(f'tbl_{view}', obj_cfg.get('tbl_default', obj_cfg.get('tbl')))
|
|
base_name = obj_cfg.get(f'mdl_{view}', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
|
|
|
|
if not table_name or not base_name:
|
|
return mk_resp(data=False, status_code=500, response=response, status_message=f"Configuration for object type '{obj_name}' (view: {view}) is incomplete.")
|
|
|
|
order_by_li = filter_order_by(order_by_li, base_name, table_name)
|
|
status_filter = get_supported_filters(base_name, status_filter)
|
|
|
|
# Forced account isolation
|
|
and_qry_dict_obj = apply_forced_account_filter(and_qry_dict_obj, account, base_name, obj_name)
|
|
|
|
if for_obj_type and for_obj_id:
|
|
resolved_for_obj_id = redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type)
|
|
if not resolved_for_obj_id:
|
|
return mk_resp(data=False, status_code=404, response=response, status_message=f"Parent object with ID '{for_obj_id}' not found.")
|
|
|
|
sql_result = sql_select(
|
|
table_name=table_name,
|
|
field_name=f'{for_obj_type}_id',
|
|
field_value=resolved_for_obj_id,
|
|
enabled=status_filter.enabled,
|
|
hidden=status_filter.hidden,
|
|
qry_dict_li=qry_dict_li,
|
|
fulltext_qry_dict=fulltext_qry_dict_obj,
|
|
and_qry_dict=and_qry_dict_obj,
|
|
and_like_dict=and_like_dict_obj,
|
|
or_like_dict=or_like_dict_obj,
|
|
and_in_dict_li=and_in_dict_li_obj,
|
|
order_by_li=order_by_li,
|
|
limit=pagination.limit,
|
|
offset=pagination.offset,
|
|
as_list=True,
|
|
)
|
|
else:
|
|
sql_result = sql_select(
|
|
table_name=table_name,
|
|
enabled=status_filter.enabled,
|
|
hidden=status_filter.hidden,
|
|
qry_dict_li=qry_dict_li,
|
|
fulltext_qry_dict=fulltext_qry_dict_obj,
|
|
and_qry_dict=and_qry_dict_obj,
|
|
and_like_dict=and_like_dict_obj,
|
|
or_like_dict=or_like_dict_obj,
|
|
and_in_dict_li=and_in_dict_li_obj,
|
|
order_by_li=order_by_li,
|
|
limit=pagination.limit,
|
|
offset=pagination.offset,
|
|
as_list=True,
|
|
)
|
|
|
|
if sql_result is False:
|
|
return mk_resp(data=False, status_code=500, response=response, status_message="Database error occurred.")
|
|
elif sql_result:
|
|
resp_data_li = [base_name(**record).dict(
|
|
by_alias=serialization.by_alias,
|
|
exclude_unset=serialization.exclude_unset,
|
|
exclude_defaults=serialization.exclude_defaults,
|
|
exclude_none=serialization.exclude_none
|
|
) for record in sql_result]
|
|
return mk_resp(data=resp_data_li, response=response)
|
|
else:
|
|
return mk_resp(data=[], status_code=200, response=response)
|
|
|
|
|
|
@router.post('/{obj_type_l1}/search', response_model=Resp_Body_Base, tags=['CRUD v3 Search (Dev)'])
|
|
async def search_obj_li(
|
|
response: Response,
|
|
obj_type_l1: str,
|
|
search_query: SearchQuery,
|
|
for_obj_type: Optional[str] = Query(None, description="Explicit parent type filter"),
|
|
for_obj_id: Optional[str] = Query(None, description="Explicit parent ID filter"),
|
|
view: str = Query('default', description="Select alternative view/model (e.g., enriched, detail)"),
|
|
order_by_li: Optional[str] = Query(None),
|
|
account: AccountContext = Depends(get_account_context),
|
|
pagination: PaginationParams = Depends(get_pagination_params),
|
|
status_filter: StatusFilterParams = Depends(get_status_filter_params),
|
|
serialization: SerializationParams = Depends(get_serialization_params),
|
|
delay: DelayParams = Depends(get_delay_params),
|
|
):
|
|
"""
|
|
Search top-level objects using a complex SearchQuery in the POST body.
|
|
"""
|
|
if delay.sleep_time_s > 0:
|
|
await asyncio.sleep(delay.sleep_time_s)
|
|
|
|
log.setLevel(logging.WARNING)
|
|
log.debug(locals())
|
|
|
|
order_by_li = safe_json_loads(order_by_li)
|
|
|
|
obj_name = obj_type_l1
|
|
if obj_name not in obj_type_kv_li:
|
|
return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{obj_name}' not found.")
|
|
|
|
# Security Restrictions
|
|
if obj_name == 'site' and not (for_obj_type == 'account' and for_obj_id):
|
|
return mk_resp(data=False, status_code=403, response=response, status_message="Listing sites is only permitted when filtered by account.")
|
|
|
|
obj_cfg = obj_type_kv_li[obj_name]
|
|
table_name = obj_cfg.get(f'tbl_{view}', obj_cfg.get('tbl_default', obj_cfg.get('tbl')))
|
|
base_name = obj_cfg.get(f'mdl_{view}', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
|
|
|
|
if not table_name or not base_name:
|
|
return mk_resp(data=False, status_code=500, response=response, status_message=f"Configuration for object type '{obj_name}' (view: {view}) is incomplete.")
|
|
|
|
order_by_li = filter_order_by(order_by_li, base_name, table_name)
|
|
status_filter = get_supported_filters(base_name, status_filter)
|
|
searchable_fields = obj_cfg.get('searchable_fields')
|
|
|
|
# Explicit Account isolation
|
|
if for_obj_type == 'account' and for_obj_id:
|
|
if not account.super and for_obj_id != account.account_id_random:
|
|
return mk_resp(data=False, status_code=403, response=response, status_message="Access denied to requested account.")
|
|
|
|
# Forced account isolation for the search query itself
|
|
# We inject the account_id into the SearchQuery object if it's not a super user
|
|
if not account.super and account.auth_method != 'bypass' and account.account_id:
|
|
if search_query.and_filters is None:
|
|
search_query.and_filters = []
|
|
|
|
if obj_name == 'account':
|
|
search_query.and_filters.append(SearchFilter(field='id', op='eq', value=account.account_id))
|
|
elif base_name and hasattr(base_name, '__fields__') and 'account_id' in base_name.__fields__:
|
|
search_query.and_filters.append(SearchFilter(field='account_id', op='eq', value=account.account_id))
|
|
|
|
if for_obj_type and for_obj_id:
|
|
resolved_for_obj_id = redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type)
|
|
if not resolved_for_obj_id:
|
|
return mk_resp(data=False, status_code=404, response=response, status_message=f"Parent object with ID '{for_obj_id}' not found.")
|
|
|
|
sql_result = sql_select(
|
|
table_name=table_name,
|
|
field_name=f'{for_obj_type}_id',
|
|
field_value=resolved_for_obj_id,
|
|
enabled=status_filter.enabled,
|
|
hidden=status_filter.hidden,
|
|
search_query=search_query,
|
|
searchable_fields=searchable_fields,
|
|
order_by_li=order_by_li,
|
|
limit=pagination.limit,
|
|
offset=pagination.offset,
|
|
as_list=True,
|
|
)
|
|
else:
|
|
sql_result = sql_select(
|
|
table_name=table_name,
|
|
enabled=status_filter.enabled,
|
|
hidden=status_filter.hidden,
|
|
search_query=search_query,
|
|
searchable_fields=searchable_fields,
|
|
order_by_li=order_by_li,
|
|
limit=pagination.limit,
|
|
offset=pagination.offset,
|
|
as_list=True,
|
|
)
|
|
|
|
if sql_result is False:
|
|
return mk_resp(data=False, status_code=500, response=response, status_message="Database error occurred.")
|
|
elif sql_result:
|
|
resp_data_li = [base_name(**record).dict(
|
|
by_alias=serialization.by_alias,
|
|
exclude_unset=serialization.exclude_unset,
|
|
exclude_defaults=serialization.exclude_defaults,
|
|
exclude_none=serialization.exclude_none
|
|
) for record in sql_result]
|
|
return mk_resp(data=resp_data_li, response=response)
|
|
else:
|
|
return mk_resp(data=[], status_code=200, response=response)
|
|
|
|
|
|
@router.post('/{obj_type_l1}/', response_model=Resp_Body_Base)
|
|
async def post_obj(
|
|
request: Request,
|
|
response: Response,
|
|
obj_type_l1: str = Path(min_length=2, max_length=50),
|
|
return_obj: Optional[bool] = True,
|
|
account: AccountContext = Depends(get_account_context),
|
|
serialization: SerializationParams = Depends(get_serialization_params),
|
|
delay: DelayParams = Depends(get_delay_params),
|
|
):
|
|
"""
|
|
Create a new top-level object.
|
|
"""
|
|
if delay.sleep_time_s > 0:
|
|
await asyncio.sleep(delay.sleep_time_s)
|
|
|
|
log.setLevel(logging.WARNING)
|
|
log.debug(locals())
|
|
|
|
obj_data = await request.json()
|
|
|
|
obj_name = obj_type_l1
|
|
if obj_name not in obj_type_kv_li:
|
|
return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{obj_name}' not found.")
|
|
|
|
obj_cfg = obj_type_kv_li[obj_name]
|
|
table_name_insert = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
|
|
table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
|
|
input_model = obj_cfg.get('mdl_in', obj_cfg.get('mdl'))
|
|
output_model = obj_cfg.get('mdl_out', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
|
|
|
|
if not table_name_insert or not input_model or not table_name_select or not output_model:
|
|
return mk_resp(data=False, status_code=500, response=response, status_message=f"Configuration for object type '{obj_name}' is incomplete.")
|
|
|
|
# Security: Force account_id for non-super users if model supports it
|
|
if not account.super and account.auth_method != 'bypass' and account.account_id:
|
|
if 'account_id' in input_model.__fields__:
|
|
obj_data['account_id'] = account.account_id
|
|
elif obj_name == 'account':
|
|
# Users can't create accounts unless they are super (usually)
|
|
# But if they can, it should be their own ID?
|
|
# Actually, account creation is usually a super-admin task.
|
|
return mk_resp(data=False, status_code=403, response=response, status_message="Account creation is restricted to super administrators.")
|
|
|
|
try:
|
|
validated_obj = input_model(**obj_data)
|
|
except Exception as e:
|
|
log.warning(f"Validation error for {obj_name}: {e}")
|
|
return mk_resp(data=False, status_code=400, response=response, status_message=f"Validation error: {e}")
|
|
|
|
data_to_insert = validated_obj.dict(exclude_unset=True)
|
|
|
|
if sql_insert_result := sql_insert(data=data_to_insert, table_name=table_name_insert):
|
|
new_obj_id = sql_insert_result
|
|
new_obj_id_random = get_id_random(record_id=new_obj_id, table_name=obj_name)
|
|
|
|
if return_obj:
|
|
if sql_select_result := sql_select(table_name=table_name_select, record_id=new_obj_id):
|
|
resp_data = output_model(**sql_select_result).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset)
|
|
return mk_resp(data=resp_data, response=response)
|
|
else:
|
|
return mk_resp(data={"obj_id": new_obj_id, "obj_id_random": new_obj_id_random}, status_code=404, response=response, status_message="Object created but could not be retrieved.")
|
|
else:
|
|
return mk_resp(data={"obj_id": new_obj_id, "obj_id_random": new_obj_id_random}, response=response)
|
|
else:
|
|
return mk_resp(data=False, status_code=400, response=response, status_message="Failed to create object in database.")
|
|
|
|
|
|
@router.patch('/{obj_type_l1}/{obj_id}', response_model=Resp_Body_Base)
|
|
async def patch_obj(
|
|
request: Request,
|
|
response: Response,
|
|
obj_type_l1: str = Path(min_length=2, max_length=50),
|
|
obj_id: str = Path(min_length=11, max_length=22),
|
|
return_obj: Optional[bool] = True,
|
|
account: AccountContext = Depends(get_account_context),
|
|
serialization: SerializationParams = Depends(get_serialization_params),
|
|
delay: DelayParams = Depends(get_delay_params),
|
|
):
|
|
"""
|
|
Update a top-level object.
|
|
"""
|
|
if delay.sleep_time_s > 0:
|
|
await asyncio.sleep(delay.sleep_time_s)
|
|
|
|
log.setLevel(logging.WARNING)
|
|
log.debug(locals())
|
|
|
|
obj_data = await request.json()
|
|
|
|
obj_name = obj_type_l1
|
|
if obj_name not in obj_type_kv_li:
|
|
return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{obj_name}' not found.")
|
|
|
|
obj_cfg = obj_type_kv_li[obj_name]
|
|
table_name_update = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
|
|
table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
|
|
input_model = obj_cfg.get('mdl_in', obj_cfg.get('mdl'))
|
|
output_model = obj_cfg.get('mdl_out', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
|
|
|
|
if not table_name_update or not input_model or not table_name_select or not output_model:
|
|
return mk_resp(data=False, status_code=500, response=response, status_message=f"Configuration for object type '{obj_name}' is incomplete.")
|
|
|
|
record_id = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_name)
|
|
if not record_id:
|
|
return mk_resp(data=False, status_code=404, response=response, status_message=f"Object with ID '{obj_id}' not found.")
|
|
|
|
# Security Check
|
|
if existing_obj := sql_select(table_name=table_name_select, record_id=record_id):
|
|
if not check_account_access(existing_obj, account, obj_name):
|
|
return mk_resp(data=False, status_code=403, response=response, status_message="Access denied.")
|
|
else:
|
|
return mk_resp(data=False, status_code=404, response=response, status_message=f"Object with ID '{obj_id}' not found in database.")
|
|
|
|
data_to_update = obj_data
|
|
|
|
if sql_update_result := sql_update(data=data_to_update, table_name=table_name_update, record_id=record_id):
|
|
if return_obj:
|
|
if sql_select_result := sql_select(table_name=table_name_select, record_id=record_id):
|
|
resp_data = output_model(**sql_select_result).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset)
|
|
return mk_resp(data=resp_data, response=response)
|
|
else:
|
|
return mk_resp(data=True, status_code=404, response=response, status_message="Object updated but could not be retrieved.")
|
|
else:
|
|
return mk_resp(data=True, response=response, status_message="Object updated successfully.")
|
|
else:
|
|
return mk_resp(data=False, status_code=400, response=response, status_message="Failed to update object in database.")
|
|
|
|
|
|
@router.delete('/{obj_type_l1}/{obj_id}', response_model=Resp_Body_Base)
|
|
async def delete_obj(
|
|
response: Response,
|
|
obj_type_l1: str = Path(min_length=2, max_length=50),
|
|
obj_id: str = Path(min_length=11, max_length=22),
|
|
method: str = Query('delete', regex='^(delete|hide|disable)$'),
|
|
account: AccountContext = Depends(get_account_context),
|
|
delay: DelayParams = Depends(get_delay_params),
|
|
):
|
|
"""
|
|
Delete a top-level object (hard or soft).
|
|
"""
|
|
if delay.sleep_time_s > 0:
|
|
await asyncio.sleep(delay.sleep_time_s)
|
|
|
|
log.setLevel(logging.WARNING)
|
|
log.debug(locals())
|
|
|
|
obj_name = obj_type_l1
|
|
if obj_name not in obj_type_kv_li:
|
|
return mk_resp(data=False, status_code=400, response=response, status_message=f"Object type '{obj_name}' not found.")
|
|
|
|
obj_cfg = obj_type_kv_li[obj_name]
|
|
table_name_delete = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
|
|
table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
|
|
|
|
if not table_name_delete:
|
|
return mk_resp(data=False, status_code=500, response=response, status_message=f"Configuration for object type '{obj_name}' is incomplete.")
|
|
|
|
record_id = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_name)
|
|
if not record_id:
|
|
return mk_resp(data=False, status_code=404, response=response, status_message=f"Object with ID '{obj_id}' not found.")
|
|
|
|
# Security Check
|
|
if existing_obj := sql_select(table_name=table_name_select, record_id=record_id):
|
|
if not check_account_access(existing_obj, account, obj_name):
|
|
return mk_resp(data=False, status_code=403, response=response, status_message="Access denied.")
|
|
else:
|
|
return mk_resp(data=False, status_code=404, response=response, status_message=f"Object with ID '{obj_id}' not found in database.")
|
|
|
|
if method == 'hide':
|
|
if sql_update(table_name=table_name_delete, record_id=record_id, data={'hide': True}):
|
|
return mk_resp(data=True, response=response, status_message=f"Object with ID '{obj_id}' hidden successfully.")
|
|
else:
|
|
return mk_resp(data=False, status_code=400, response=response, status_message="Failed to hide object.")
|
|
elif method == 'disable':
|
|
if sql_update(table_name=table_name_delete, record_id=record_id, data={'enable': False}):
|
|
return mk_resp(data=True, response=response, status_message=f"Object with ID '{obj_id}' disabled successfully.")
|
|
else:
|
|
return mk_resp(data=False, status_code=400, response=response, status_message="Failed to disable object.")
|
|
else:
|
|
if sql_delete_result := sql_delete(table_name=table_name_delete, record_id=record_id):
|
|
return mk_resp(data=True, response=response, status_message=f"Object with ID '{obj_id}' deleted successfully.")
|
|
else:
|
|
return mk_resp(data=False, status_code=400, response=response, status_message="Failed to delete object.")
|
|
|
|
|
|
# --- CHILD / NESTED OBJECTS ---
|
|
|
|
@router.get('/{parent_obj_type}/{parent_obj_id}/{child_obj_type}/', response_model=Resp_Body_Base)
|
|
async def get_child_obj_li(
|
|
response: Response,
|
|
parent_obj_type: str,
|
|
parent_obj_id: str,
|
|
child_obj_type: str,
|
|
order_by_li: Optional[str] = None,
|
|
jp: Optional[Union[str, None]] = None,
|
|
account: AccountContext = Depends(get_account_context),
|
|
pagination: PaginationParams = Depends(get_pagination_params),
|
|
status_filter: StatusFilterParams = Depends(get_status_filter_params),
|
|
serialization: SerializationParams = Depends(get_serialization_params),
|
|
delay: DelayParams = Depends(get_delay_params),
|
|
):
|
|
"""
|
|
Get a list of child objects belonging to a parent.
|
|
"""
|
|
if delay.sleep_time_s > 0:
|
|
await asyncio.sleep(delay.sleep_time_s)
|
|
|
|
log.setLevel(logging.WARNING)
|
|
log.debug(locals())
|
|
|
|
qry_dict_li = None
|
|
fulltext_qry_dict_obj = None
|
|
and_qry_dict_obj = None
|
|
and_like_dict_obj = None
|
|
or_like_dict_obj = None
|
|
and_in_dict_li_obj = None
|
|
|
|
jp_obj = safe_json_loads(urllib.parse.unquote(jp)) if jp else None
|
|
if jp_obj:
|
|
if jp_obj.get('qry'): qry_dict_li = jp_obj['qry']
|
|
if jp_obj.get('ft_qry'): fulltext_qry_dict_obj = jp_obj['ft_qry']
|
|
if jp_obj.get('and_qry'): and_qry_dict_obj = jp_obj['and_qry']
|
|
if jp_obj.get('and_like'): and_like_dict_obj = jp_obj['and_like']
|
|
if jp_obj.get('or_like'): or_like_dict_obj = jp_obj['or_like']
|
|
if jp_obj.get('and_in_li'): and_in_dict_li_obj = jp_obj['and_in_li']
|
|
|
|
order_by_li = safe_json_loads(order_by_li)
|
|
|
|
obj_name = child_obj_type
|
|
if obj_name not in obj_type_kv_li or parent_obj_type not in obj_type_kv_li:
|
|
return mk_resp(data=False, status_code=400, response=response, status_message=f"Invalid object type(s).")
|
|
|
|
obj_cfg = obj_type_kv_li[obj_name]
|
|
table_name = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
|
|
base_name = obj_cfg.get('mdl_default', obj_cfg.get('mdl'))
|
|
|
|
if not table_name or not base_name:
|
|
return mk_resp(data=False, status_code=500, response=response, status_message=f"Configuration for object type '{obj_name}' is incomplete.")
|
|
|
|
order_by_li = filter_order_by(order_by_li, base_name, table_name)
|
|
status_filter = get_supported_filters(base_name, status_filter)
|
|
|
|
resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_obj_type)
|
|
if not resolved_parent_id:
|
|
return mk_resp(data=False, status_code=404, response=response, status_message=f"Parent object '{parent_obj_type}' with ID '{parent_obj_id}' not found.")
|
|
|
|
# Parent Security Check
|
|
parent_cfg = obj_type_kv_li[parent_obj_type]
|
|
parent_table = parent_cfg.get('tbl_default', parent_cfg.get('tbl'))
|
|
if parent_sql_res := sql_select(table_name=parent_table, record_id=resolved_parent_id):
|
|
if not check_account_access(parent_sql_res, account, parent_obj_type):
|
|
return mk_resp(data=False, status_code=403, response=response, status_message="Access denied to parent object.")
|
|
else:
|
|
return mk_resp(data=False, status_code=404, response=response, status_message="Parent object not found.")
|
|
|
|
# Child security: Forced account isolation if child supports it
|
|
and_qry_dict_obj = apply_forced_account_filter(and_qry_dict_obj, account, base_name, obj_name)
|
|
|
|
sql_result = sql_select(
|
|
table_name=table_name,
|
|
field_name=f'{parent_obj_type}_id',
|
|
field_value=resolved_parent_id,
|
|
enabled=status_filter.enabled,
|
|
hidden=status_filter.hidden,
|
|
qry_dict_li=qry_dict_li,
|
|
fulltext_qry_dict=fulltext_qry_dict_obj,
|
|
and_qry_dict=and_qry_dict_obj,
|
|
and_like_dict=and_like_dict_obj,
|
|
or_like_dict=or_like_dict_obj,
|
|
and_in_dict_li=and_in_dict_li_obj,
|
|
order_by_li=order_by_li,
|
|
limit=pagination.limit,
|
|
offset=pagination.offset,
|
|
as_list=True,
|
|
)
|
|
|
|
if sql_result is False:
|
|
return mk_resp(data=False, status_code=500, response=response, status_message="Database error occurred.")
|
|
elif sql_result:
|
|
resp_data_li = [base_name(**record).dict(
|
|
by_alias=serialization.by_alias,
|
|
exclude_unset=serialization.exclude_unset,
|
|
exclude_defaults=serialization.exclude_defaults,
|
|
exclude_none=serialization.exclude_none
|
|
) for record in sql_result]
|
|
return mk_resp(data=resp_data_li, response=response)
|
|
else:
|
|
return mk_resp(data=[], status_code=200, response=response)
|
|
|
|
|
|
@router.post('/{parent_obj_type}/{parent_obj_id}/{child_obj_type}/', response_model=Resp_Body_Base)
|
|
async def post_child_obj(
|
|
request: Request,
|
|
response: Response,
|
|
parent_obj_type: str = Path(min_length=2, max_length=50),
|
|
parent_obj_id: str = Path(min_length=11, max_length=22),
|
|
child_obj_type: str = Path(min_length=2, max_length=50),
|
|
return_obj: Optional[bool] = True,
|
|
account: AccountContext = Depends(get_account_context),
|
|
serialization: SerializationParams = Depends(get_serialization_params),
|
|
delay: DelayParams = Depends(get_delay_params),
|
|
):
|
|
"""
|
|
Create a new child object for a given parent.
|
|
"""
|
|
if delay.sleep_time_s > 0:
|
|
await asyncio.sleep(delay.sleep_time_s)
|
|
|
|
log.setLevel(logging.WARNING)
|
|
log.debug(locals())
|
|
|
|
obj_data = await request.json()
|
|
|
|
if parent_obj_type not in obj_type_kv_li or child_obj_type not in obj_type_kv_li:
|
|
return mk_resp(data=False, status_code=400, response=response, status_message="Invalid object type.")
|
|
|
|
resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_obj_type)
|
|
if not resolved_parent_id:
|
|
return mk_resp(data=False, status_code=404, response=response, status_message="Parent not found.")
|
|
|
|
# Parent Security Check
|
|
parent_cfg = obj_type_kv_li[parent_obj_type]
|
|
parent_table = parent_cfg.get('tbl_default', parent_cfg.get('tbl'))
|
|
if parent_sql_res := sql_select(table_name=parent_table, record_id=resolved_parent_id):
|
|
if not check_account_access(parent_sql_res, account, parent_obj_type):
|
|
return mk_resp(data=False, status_code=403, response=response, status_message="Access denied to parent object.")
|
|
else:
|
|
return mk_resp(data=False, status_code=404, response=response, status_message="Parent object not found.")
|
|
|
|
obj_cfg = obj_type_kv_li[child_obj_type]
|
|
table_name_insert = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
|
|
table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
|
|
input_model = obj_cfg.get('mdl_in', obj_cfg.get('mdl'))
|
|
output_model = obj_cfg.get('mdl_out', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
|
|
|
|
# Security: Force account_id for non-super users if child supports it
|
|
if not account.super and account.auth_method != 'bypass' and account.account_id:
|
|
if 'account_id' in input_model.__fields__:
|
|
obj_data['account_id'] = account.account_id
|
|
|
|
obj_data[f'{parent_obj_type}_id'] = resolved_parent_id
|
|
|
|
try:
|
|
validated_obj = input_model(**obj_data)
|
|
except Exception as e:
|
|
return mk_resp(data=False, status_code=400, response=response, status_message=f"Validation error: {e}")
|
|
|
|
data_to_insert = validated_obj.dict(exclude_unset=True)
|
|
|
|
if sql_insert_result := sql_insert(data=data_to_insert, table_name=table_name_insert):
|
|
new_obj_id = sql_insert_result
|
|
new_obj_id_random = get_id_random(record_id=new_obj_id, table_name=child_obj_type)
|
|
|
|
if return_obj:
|
|
if sql_select_result := sql_select(table_name=table_name_select, record_id=new_obj_id):
|
|
resp_data = output_model(**sql_select_result).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset)
|
|
return mk_resp(data=resp_data, response=response)
|
|
return mk_resp(data={"obj_id": new_obj_id, "obj_id_random": new_obj_id_random}, response=response)
|
|
else:
|
|
return mk_resp(data=False, status_code=400, response=response, status_message="Failed to create child object.")
|
|
|
|
|
|
@router.get('/{parent_obj_type}/{parent_obj_id}/{child_obj_type}/{child_obj_id}', response_model=Resp_Body_Base)
|
|
async def get_child_obj(
|
|
response: Response,
|
|
parent_obj_type: str = Path(min_length=2, max_length=50),
|
|
parent_obj_id: str = Path(min_length=11, max_length=22),
|
|
child_obj_type: str = Path(min_length=2, max_length=50),
|
|
child_obj_id: str = Path(min_length=11, max_length=22),
|
|
account: AccountContext = Depends(get_account_context),
|
|
serialization: SerializationParams = Depends(get_serialization_params),
|
|
delay: DelayParams = Depends(get_delay_params),
|
|
):
|
|
"""
|
|
Get a single child object, verifying parentage.
|
|
"""
|
|
if delay.sleep_time_s > 0:
|
|
await asyncio.sleep(delay.sleep_time_s)
|
|
|
|
log.setLevel(logging.WARNING)
|
|
log.debug(locals())
|
|
|
|
resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_obj_type)
|
|
resolved_child_id = redis_lookup_id_random(record_id_random=child_obj_id, table_name=child_obj_type)
|
|
|
|
if not resolved_parent_id or not resolved_child_id:
|
|
return mk_resp(data=False, status_code=404, response=response, status_message="Object(s) not found.")
|
|
|
|
obj_cfg = obj_type_kv_li[child_obj_type]
|
|
table_name = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
|
|
base_name = obj_cfg.get('mdl_default', obj_cfg.get('mdl'))
|
|
|
|
if sql_result := sql_select(table_name=table_name, record_id=resolved_child_id):
|
|
if sql_result.get(f'{parent_obj_type}_id') != resolved_parent_id:
|
|
return mk_resp(data=False, status_code=404, response=response, status_message="Child not found under parent.")
|
|
|
|
resp_data = base_name(**sql_result).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset)
|
|
return mk_resp(data=resp_data, response=response)
|
|
return mk_resp(data=False, status_code=404, response=response, status_message="Child not found.")
|
|
|
|
|
|
@router.patch('/{parent_obj_type}/{parent_obj_id}/{child_obj_type}/{child_obj_id}', response_model=Resp_Body_Base)
|
|
async def patch_child_obj(
|
|
request: Request,
|
|
response: Response,
|
|
parent_obj_type: str = Path(min_length=2, max_length=50),
|
|
parent_obj_id: str = Path(min_length=11, max_length=22),
|
|
child_obj_type: str = Path(min_length=2, max_length=50),
|
|
child_obj_id: str = Path(min_length=11, max_length=22),
|
|
return_obj: Optional[bool] = True,
|
|
account: AccountContext = Depends(get_account_context),
|
|
serialization: SerializationParams = Depends(get_serialization_params),
|
|
delay: DelayParams = Depends(get_delay_params),
|
|
):
|
|
"""
|
|
Update a child object, verifying parentage.
|
|
"""
|
|
if delay.sleep_time_s > 0:
|
|
await asyncio.sleep(delay.sleep_time_s)
|
|
|
|
log.setLevel(logging.WARNING)
|
|
log.debug(locals())
|
|
|
|
obj_data = await request.json()
|
|
resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_obj_type)
|
|
resolved_child_id = redis_lookup_id_random(record_id_random=child_obj_id, table_name=child_obj_type)
|
|
|
|
if not resolved_parent_id or not resolved_child_id:
|
|
return mk_resp(data=False, status_code=404, response=response, status_message="Object(s) not found.")
|
|
|
|
obj_cfg = obj_type_kv_li[child_obj_type]
|
|
table_name_update = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
|
|
table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
|
|
output_model = obj_cfg.get('mdl_out', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
|
|
|
|
if existing_child := sql_select(table_name=table_name_select, record_id=resolved_child_id):
|
|
if existing_child.get(f'{parent_obj_type}_id') != resolved_parent_id:
|
|
return mk_resp(data=False, status_code=404, response=response, status_message="Child not found under parent.")
|
|
else:
|
|
return mk_resp(data=False, status_code=404, response=response, status_message="Child not found.")
|
|
|
|
if sql_update(data=obj_data, table_name=table_name_update, record_id=resolved_child_id):
|
|
if return_obj:
|
|
if updated_child := sql_select(table_name=table_name_select, record_id=resolved_child_id):
|
|
resp_data = output_model(**updated_child).dict(by_alias=serialization.by_alias, exclude_unset=serialization.exclude_unset)
|
|
return mk_resp(data=resp_data, response=response)
|
|
return mk_resp(data=True, response=response, status_message="Updated successfully.")
|
|
return mk_resp(data=False, status_code=400, response=response, status_message="Update failed.")
|
|
|
|
|
|
@router.delete('/{parent_obj_type}/{parent_obj_id}/{child_obj_type}/{child_obj_id}', response_model=Resp_Body_Base)
|
|
async def delete_child_obj(
|
|
response: Response,
|
|
parent_obj_type: str = Path(min_length=2, max_length=50),
|
|
parent_obj_id: str = Path(min_length=11, max_length=22),
|
|
child_obj_type: str = Path(min_length=2, max_length=50),
|
|
child_obj_id: str = Path(min_length=11, max_length=22),
|
|
method: str = Query('delete', regex='^(delete|hide|disable)$'),
|
|
account: AccountContext = Depends(get_account_context),
|
|
delay: DelayParams = Depends(get_delay_params),
|
|
):
|
|
"""
|
|
Delete a child object, verifying parentage (hard or soft).
|
|
"""
|
|
if delay.sleep_time_s > 0:
|
|
await asyncio.sleep(delay.sleep_time_s)
|
|
|
|
log.setLevel(logging.WARNING)
|
|
log.debug(locals())
|
|
|
|
resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_obj_type)
|
|
resolved_child_id = redis_lookup_id_random(record_id_random=child_obj_id, table_name=child_obj_type)
|
|
|
|
if not resolved_parent_id or not resolved_child_id:
|
|
return mk_resp(data=False, status_code=404, response=response, status_message="Object(s) not found.")
|
|
|
|
obj_cfg = obj_type_kv_li[child_obj_type]
|
|
table_name_delete = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
|
|
table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
|
|
|
|
if existing_child := sql_select(table_name=table_name_select, record_id=resolved_child_id):
|
|
if existing_child.get(f'{parent_obj_type}_id') != resolved_parent_id:
|
|
return mk_resp(data=False, status_code=404, response=response, status_message="Child not found under parent.")
|
|
else:
|
|
return mk_resp(data=False, status_code=404, response=response, status_message="Child not found.")
|
|
|
|
if method == 'hide':
|
|
success = sql_update(table_name=table_name_delete, record_id=resolved_child_id, data={'hide': True})
|
|
elif method == 'disable':
|
|
success = sql_update(table_name=table_name_delete, record_id=resolved_child_id, data={'enable': False})
|
|
else:
|
|
success = sql_delete(table_name=table_name_delete, record_id=resolved_child_id)
|
|
|
|
if success:
|
|
return mk_resp(data=True, response=response, status_message=f"Deleted successfully via {method}.")
|
|
return mk_resp(data=False, status_code=400, response=response, status_message="Deletion failed.") |