Files
OSIT-AE-API-FastAPI/app/routers/data_store.py

401 lines
17 KiB
Python

import datetime, pytz, time
from fastapi import APIRouter, Body, Depends, Header, HTTPException, Path, Query, Response, status
from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union
import asyncio
from app.lib_general import log, logging, common_route_params, Common_Route_Params
from app.config import settings
from app.db_sql import sql_insert, sql_update, sql_insert_or_update, sql_select, sql_delete, get_id_random, redis_lookup_id_random
from app.routers.api_crud import delete_obj_template, get_obj_template, get_obj_li_template, patch_obj_template, post_obj_template
from app.methods.data_store_methods import create_update_data_store_obj, get_data_store_rec_list, load_data_store_obj, load_data_store_obj_w_code
from app.models.common_field_schema import default_num_bytes
from app.models.data_store_models import Data_Store_Base
from app.models.response_models import Resp_Body_Base, mk_resp
# V3 Imports
from app.lib_general_v3 import AccountContext, get_account_context, SerializationParams, StatusFilterParams, DelayParams, PaginationParams
from app.models.api_crud_models import SearchFilter, SearchQuery
router = APIRouter()
# ### BEGIN ### API Data Store Routers ### post_data_store_obj() ###
# Updated 2026-01-28
@router.post('/data_store', response_model=Resp_Body_Base)
async def post_data_store_obj(
data_store_obj: Data_Store_Base,
return_obj: bool = True,
commons: Common_Route_Params = Depends(common_route_params),
):
log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
# ### SECTION ### Secondary data validation
# None
# ### SECTION ### Process data
if data_store_id := create_update_data_store_obj(
data_store_dict_obj = data_store_obj,
): pass
else:
log.warning('Likely bad request')
return mk_resp(data=False, status_code=400, response=commons.response, status_message='Not created. Something failed while processing the data. Check the field names and data types.') # Bad Request
# ### SECTION ### Return successful results
if return_obj:
data_store_obj = load_data_store_obj(
data_store_id = data_store_id,
)
data = data_store_obj
else:
data_store_id_random = get_id_random(record_id=data_store_id, table_name='data_store')
data = {}
data['data_store_id'] = data_store_id
data['data_store_id_random'] = data_store_id_random
return mk_resp(data=data, response=commons.response)
# ### END ### API Data Store Routers ### post_data_store_obj() ###
# ### BEGIN ### API Data Store Routers ### patch_data_store_obj() ###
# Updated 2022-03-11
@router.patch('/data_store/{data_store_id}', response_model=Resp_Body_Base)
async def patch_data_store_obj(
data_store_obj: Data_Store_Base,
data_store_id: str = Path(min_length=11, max_length=22),
return_obj: Optional[bool] = True,
commons: Common_Route_Params = Depends(common_route_params),
):
log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
# ### SECTION ### Secondary data validation
data_store_id_random = data_store_id # This is used later for the response data
if data_store_id := redis_lookup_id_random(record_id_random=data_store_id, table_name='data_store'): pass
else: return mk_resp(data=None, status_code=404, response=commons.response, status_message='The Data Store ID was invalid or not found.')
# ### SECTION ### Process data
if data_store_up_result := create_update_data_store_obj(
data_store_dict_obj = data_store_obj,
data_store_id = data_store_id,
): pass
else:
log.warning('Likely bad request')
return mk_resp(data=False, status_code=400, response=commons.response, status_message='Not updated. Something failed while processing the data. Check the field names and data types.') # Bad Request
# ### SECTION ### Return successful results
if return_obj:
data_store_obj = load_data_store_obj(
data_store_id = data_store_id,
)
data = data_store_obj
else:
data = {}
data['data_store_id'] = data_store_id
data['data_store_id_random'] = data_store_id_random
return mk_resp(data=data, response=commons.response)
# ### END ### API Data Store Routers ### patch_data_store_obj() ###
# ### BEGIN ### API Data Store ### get_data_store_obj() ###
# Updated 2026-01-28
@router.get('/data_store/{data_store_id}', response_model=Resp_Body_Base)
async def get_data_store_obj(
data_store_id: str = Path(min_length=11, max_length=22),
commons: Common_Route_Params = Depends(common_route_params),
):
log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
# ### SECTION ### Secondary data validation
if data_store_id := redis_lookup_id_random(record_id_random=data_store_id, table_name='data_store'): pass
else: return mk_resp(data=None, status_code=404, response=commons.response)
if data_store_rec_result := load_data_store_obj(
data_store_id = data_store_id,
limit = commons.limit,
enabled = commons.enabled,
):
log.info('Loading successful. Returning result')
return mk_resp(data=data_store_rec_result, response=commons.response)
elif isinstance(data_store_rec_result, list) or data_store_rec_result is None: # Empty list or None
log.info('No results')
return mk_resp(data=None, status_code=404, response=commons.response) # Not Found
else:
log.warning('Likely bad request')
return mk_resp(data=False, status_code=400, response=commons.response) # Bad Request
# ### END ### API Data Store ### get_data_store_obj() ###
# ### BEGIN ### API Data Store ### get_v3_data_store_obj_w_code() ###
# NEW V3 Endpoint for Code Lookup
# Updated 2026-01-28
@router.get('/v3/data_store/code/{data_store_code}', response_model=Resp_Body_Base, tags=['Data Store V3'])
async def get_v3_data_store_obj_w_code(
data_store_code: str = Path(min_length=3, max_length=50),
for_type: Optional[str] = Query(None, min_length=1, max_length=25),
for_id: Optional[str] = Query(None, min_length=11, max_length=22),
limit: int = Query(1, ge=1, description="Number of results to return (Default: 1)"),
account: AccountContext = Depends(get_account_context),
serialization: SerializationParams = Depends(),
status_filter: StatusFilterParams = Depends(),
delay: DelayParams = Depends(),
):
"""
V3 Standardized Data Store Lookup.
Uses JWT-based AccountContext and supports cascading fallback logic (Object > Account > Global).
Returns a single object if limit=1, otherwise returns a list.
"""
log.setLevel(logging.INFO)
# Map V3 params to the shared handler
v3_commons = Common_Route_Params(
x_account_id=account.account_id,
x_account_id_random=account.account_id_random,
enabled=status_filter.enabled,
response=Response()
)
return await handle_get_data_store_obj_w_code(
data_store_code = data_store_code,
for_type = for_type,
for_id = for_id,
commons = v3_commons,
limit = limit,
delay = delay,
)
# ### END ### API Data Store ### get_v3_data_store_obj_w_code() ###
# ### BEGIN ### API Data Store ### get_data_store_obj_w_code() ###
# NOTE: Adding some explanation because this is not quickly obvious how it fully works.
# The look up order starts with a required data_store_code. Then the first result that matches the most specific method. The for_type and for_id fields are not required. I think it makes the most sense to be a part of the URL path, not the GET params. Either should work with no problem though.
# Lookup using: for_type and for_id > account_id > data_store_code
# This is a nice way to have global default data along with account and object specific data.
# Updated 2023-05-22
@router.get('/data_store/code/{data_store_code}/{for_type}/{for_id}', response_model=Resp_Body_Base)
async def get_data_store_obj_w_code_path(
data_store_code: str = Path(min_length=3, max_length=50),
for_type: Optional[str] = Path(min_length=1, max_length=25),
for_id: Optional[str] = Path(min_length=11, max_length=22),
limit: int = Query(1, ge=1),
commons: Common_Route_Params = Depends(common_route_params),
):
log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
log.info('Using path parameters')
# ### SECTION ### Call generic function to get the data_store object
return await handle_get_data_store_obj_w_code(
data_store_code = data_store_code,
for_type = for_type,
for_id = for_id,
commons = commons,
limit = limit,
)
@router.get('/data_store/code/{data_store_code}', response_model=Resp_Body_Base)
async def get_data_store_obj_w_code_query(
data_store_code: str = Path(min_length=3, max_length=50),
for_type: Optional[str] = Query(None, min_length=1, max_length=25),
for_id: Optional[str] = Query(None, min_length=11, max_length=22),
limit: int = Query(1, ge=1),
commons: Common_Route_Params = Depends(common_route_params),
):
log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
log.info('Using query parameters')
# ### SECTION ### Call generic function to get the data_store object
return await handle_get_data_store_obj_w_code(
data_store_code = data_store_code,
for_type = for_type,
for_id = for_id,
commons = commons,
limit = limit,
)
@router.post('/v3/data_store/code/{data_store_code}/search', response_model=Resp_Body_Base, tags=['Data Store V3'])
async def search_v3_data_store_obj_w_code(
data_store_code: str,
search_query: SearchQuery,
for_type: Optional[str] = Query(None),
for_id: Optional[str] = Query(None),
view: str = Query('default'),
account: AccountContext = Depends(get_account_context),
pagination: PaginationParams = Depends(),
status_filter: StatusFilterParams = Depends(),
delay: DelayParams = Depends(),
):
"""
Advanced Search for Data Store (within a code hierarchy).
Enforces Account Context isolation and uses ID Vision (v_data_store).
"""
if delay.sleep_time_s > 0: await asyncio.sleep(delay.sleep_time_s)
# 1. Resolve parent IDs if provided
resolved_for_id = None
if for_type and for_id:
resolved_for_id = redis_lookup_id_random(record_id_random=for_id, table_name=for_type)
if not resolved_for_id:
return mk_resp(data=False, status_code=404, status_message="Parent for_id not found.")
# 2. Construct the hierarchical search SQL
# We must enforce that users only see their own account records OR global defaults (account_id IS NULL)
from app.db_sql import sql_enable_part, sql_hidden_part, sql_search_qry_part, sql_limit_offset_part
sql_enabled, data_enabled = sql_enable_part('data_store', status_filter.enabled)
sql_hidden, data_hidden = sql_hidden_part('data_store', status_filter.hidden)
# Generate search logic from the SearchQuery model
search_sql, search_data = sql_search_qry_part(
search_query=search_query,
table_name='v_data_store'
)
sql_limit = sql_limit_offset_part(limit=pagination.limit, offset=pagination.offset)
# Prepare parameter dictionary
data = {
'code': data_store_code,
'account_id': account.account_id,
'for_type': for_type,
'for_id': resolved_for_id
}
data.update(search_data)
if data_enabled is not None: data['enable'] = data_enabled
if data_hidden is not None: data['hide'] = data_hidden
# Hierarchical Fallback Logic: (Object Override > Account Override > Global System Default)
# This matches the GET lookup logic but allows multiple results via search.
sql = f"""
SELECT *
FROM `v_data_store` AS `data_store`
WHERE
(
`data_store`.account_id = :account_id
OR `data_store`.account_id IS NULL
OR (`data_store`.for_type = :for_type AND `data_store`.for_id = :for_id)
)
AND `data_store`.code = :code
{sql_enabled}
{sql_hidden}
{search_sql}
ORDER BY `data_store`.for_id DESC, `data_store`.account_id DESC, `data_store`.created_on DESC
{sql_limit};
"""
sql_result = sql_select(sql=sql, data=data, as_list=True)
if sql_result is False:
return mk_resp(data=False, status_code=500, status_message="Database error during search.")
# 3. Parse results through Pydantic model to enforce ID Vision (random IDs only)
try:
data_objs = [Data_Store_Base(**r) for r in sql_result]
except Exception as e:
log.error(f"Validation error during Data Store search: {e}")
return mk_resp(data=False, status_code=500, status_message="Data integrity error during result mapping.")
return mk_resp(data=data_objs)
async def handle_get_data_store_obj_w_code(
data_store_code: str,
for_type: Optional[str],
for_id: Optional[str],
commons: Common_Route_Params,
limit: int = 1,
delay: Optional[DelayParams] = None,
):
log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if delay and delay.sleep_time_s > 0: await asyncio.sleep(delay.sleep_time_s)
# ### SECTION ### Secondary data validation
if for_type and for_id:
if for_id := redis_lookup_id_random(record_id_random=for_id, table_name=for_type): pass
else: return mk_resp(data=None, status_code=404, response=commons.response, status_message='The for type and ID was invalid or not found.')
# NOTE: load_data_store_obj_w_code() returns a list
if data_store_obj_result := load_data_store_obj_w_code(
account_id = commons.x_account_id,
code = data_store_code,
for_type = for_type,
for_id = for_id,
limit = limit,
enabled = commons.enabled,
):
log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.info(f'Loading successful. Returning {len(data_store_obj_result)} result(s)')
# If limit=1, return the first object directly (standard lookup behavior)
# If limit > 1, return the list of results
data = data_store_obj_result[0] if limit == 1 else data_store_obj_result
log.debug(data)
return mk_resp(data=data, response=commons.response)
elif isinstance(data_store_obj_result, list) or data_store_obj_result is None: # Empty list or None
log.info('No results')
return mk_resp(data=None, status_code=404, response=commons.response) # Not Found
else:
log.warning('Likely bad request')
return mk_resp(data=False, status_code=400, response=commons.response) # Bad Request
# ### END ### API Data Store ### get_data_store_obj_w_code() ###
# ### BEGIN ### API Data Store ### get_account_obj_data_store_list() ###
# Updated 2022-03-11
@router.get('/account/{account_id}/data_store/list', response_model=Resp_Body_Base)
async def get_account_obj_data_store_list(
account_id: str = Path(min_length=11, max_length=22),
commons: Common_Route_Params = Depends(common_route_params),
):
log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if account_id := redis_lookup_id_random(record_id_random=account_id, table_name='account'): pass
else: return mk_resp(data=None, status_code=404, response=commons.response)
# Updated 2022-03-11
if data_store_rec_list_result := get_data_store_rec_list(
account_id = account_id,
for_type = 'account',
for_id = account_id,
enabled = commons.enabled,
limit = commons.limit,
offset = commons.offset,
):
data_store_result_list = []
for data_store_rec in data_store_rec_list_result:
if load_data_store_result := load_data_store_obj(
data_store_id = data_store_rec.get('data_store_id', None),
enabled = commons.enabled,
):
data_store_result_list.append(load_data_store_result)
else:
data_store_result_list.append(None)
response_data = data_store_result_list
return mk_resp(data=response_data, response=commons.response)
elif isinstance(data_store_rec_list_result, list) or data_store_rec_list_result is None: # Empty list or None
log.info('No results')
return mk_resp(data=None, status_code=404, response=commons.response) # Not Found
else:
log.warning('Likely bad request')
return mk_resp(data=False, status_code=400, response=commons.response) # Bad Request
# ### END ### API Data Store ### get_account_obj_data_store_list() ###