Reverted to known working version and preserved new file changes in snapshots.

This commit is contained in:
Scott Idem
2025-12-03 20:43:47 -05:00
parent 98b980cf2b
commit 4598256c7c
6 changed files with 1971 additions and 0 deletions

171
app/lib_general_v3.py Normal file
View File

@@ -0,0 +1,171 @@
"""
This file contains general utility functions and helpers specifically for API v3.
It aims to provide a clean slate for new methods and refactor existing ones from lib_general.py
that are relevant to the v3 API, while removing unused or outdated functionalities.
"""
# Standard library imports
import time
import logging
from typing import (
Any,
Dict,
List,
Optional,
Union,
)
# Third-party imports
from fastapi import (
APIRouter,
Depends,
Header,
HTTPException,
Query,
Request,
Response,
status,
)
from pydantic import (
BaseModel,
Field,
ValidationError,
computed_field,
model_validator,
)
# Internal imports (from this project)
from app.config import settings
from app.db_sql import redis_lookup_id_random
from app.log import get_logger
logger = get_logger(__name__)
# --- Pydantic Model for Account Context ---
class AccountContext(BaseModel):
account_id: Optional[int]
account_id_random: Optional[str]
# --- Dependency Function for Account Context ---
def get_account_context(
x_account_id: Optional[str] = Header(None, min_length=11, max_length=22),
x_no_account_id: Optional[str] = Header(None, min_length=3, max_length=100), # Assuming 'bypass' or similar string
x_no_account_id_token: Optional[str] = Query(None, min_length=11, max_length=22),
) -> AccountContext:
"""
Resolves the account context from headers/query parameters with defined precedence.
Precedence: x_account_id (header) > x_no_account_id_token (query) > x_no_account_id (header flag)
Raises HTTPException 403 if no valid account is found and no bypass is indicated.
"""
logger.setLevel(logging.DEBUG) # Adjust as needed
logger.debug(locals())
resolved_account_id = None
resolved_account_id_random = None
if x_account_id:
# Primary check: x_account_id header
resolved_account_id_random = x_account_id
if looked_up_id := redis_lookup_id_random(table_name='account', record_id_random=x_account_id):
resolved_account_id = looked_up_id
logger.info(f'Found account from x_account_id header: {resolved_account_id}')
else:
logger.warning(f'Invalid x_account_id header provided: {x_account_id}')
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail='Invalid X-Account-ID header.')
elif x_no_account_id_token:
# Secondary check: x_no_account_id_token query parameter
resolved_account_id_random = x_no_account_id_token
if looked_up_id := redis_lookup_id_random(table_name='account', record_id_random=x_no_account_id_token):
resolved_account_id = looked_up_id
logger.info(f'Found account from x_no_account_id_token query: {resolved_account_id}')
else:
logger.warning(f'Invalid x_no_account_id_token query provided: {x_no_account_id_token}')
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail='Invalid X-No-Account-ID-Token query parameter.')
elif x_no_account_id:
# Tertiary check: x_no_account_id header for bypass
# For now, just presence indicates bypass. Can add a specific value check later if needed.
logger.info(f'X-No-Account-ID header found: {x_no_account_id}. Proceeding without specific account context.')
resolved_account_id = None # Explicitly None for "no specific account"
resolved_account_id_random = '--- NO ACCOUNT ---'
else:
logger.warning('No valid account context provided via X-Account-ID, X-No-Account-ID-Token, or X-No-Account-ID.')
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail='Account context required. Please provide X-Account-ID, X-No-Account-ID-Token, or X-No-Account-ID.')
return AccountContext(account_id=resolved_account_id, account_id_random=resolved_account_id_random)
# --- Pydantic Model for Pagination ---
class PaginationParams(BaseModel):
limit: int = 100 # Default limit
offset: int = 0
# --- Dependency Function for Pagination ---
def get_pagination_params(
limit: int = Query(100, ge=0, description="Maximum number of items to return"),
offset: int = Query(0, ge=0, description="Number of items to skip (for pagination)"),
) -> PaginationParams:
return PaginationParams(limit=limit, offset=offset)
# --- Pydantic Model for Status Filtering ---
class StatusFilterParams(BaseModel):
enabled: str = 'enabled' # 'enabled', 'disabled', 'all'
hidden: str = 'not_hidden' # 'hidden', 'not_hidden', 'all'
# --- Dependency Function for Status Filtering ---
def get_status_filter_params(
enabled: str = Query('enabled', description="Filter by object enabled status ('enabled', 'disabled', 'all')"),
hidden: str = Query('not_hidden', description="Filter by object hidden status ('hidden', 'not_hidden', 'all')"),
) -> StatusFilterParams:
allowed_enabled_values = {'enabled', 'disabled', 'all'}
allowed_hidden_values = {'hidden', 'not_hidden', 'all'}
if enabled not in allowed_enabled_values:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid value for 'enabled'. Must be one of {list(allowed_enabled_values)}."
)
if hidden not in allowed_hidden_values:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid value for 'hidden'. Must be one of {list(allowed_hidden_values)}."
)
return StatusFilterParams(enabled=enabled, hidden=hidden)
# --- Pydantic Model for Serialization Options ---
class SerializationParams(BaseModel):
by_alias: bool = True
exclude_unset: bool = False
exclude_defaults: bool = False # Added based on common_route_params
exclude_none: bool = False # Added based on common_route_params
# --- Dependency Function for Serialization Options ---
def get_serialization_params(
by_alias: bool = Query(True, description="Whether to use field aliases for serialization"),
exclude_unset: bool = Query(False, description="Whether to exclude unset fields from the response"),
exclude_defaults: bool = Query(False, description="Whether to exclude fields with their default values from the response"),
exclude_none: bool = Query(False, description="Whether to exclude fields that are None from the response"),
) -> SerializationParams:
return SerializationParams(
by_alias=by_alias,
exclude_unset=exclude_unset,
exclude_defaults=exclude_defaults,
exclude_none=exclude_none,
)
# --- Pydantic Model for Delay ---
class DelayParams(BaseModel):
sleep_time_ms: int = 0 # Raw delay value in ms
sleep_time_s: float = 0.0 # Converted to seconds for time.sleep()
# --- Dependency Function for Delay ---
def get_delay_params(
x_delay_ms: Optional[int] = Header(0, alias='X-Delay-ms', description="Delay response for X milliseconds (header)"),
delay_ms: Optional[int] = Query(0, description="Delay response for X milliseconds (query parameter)"),
) -> DelayParams:
calculated_delay_ms = max(x_delay_ms or 0, delay_ms or 0)
return DelayParams(sleep_time_ms=calculated_delay_ms, sleep_time_s=calculated_delay_ms / 1000.0)

View File

@@ -0,0 +1,171 @@
"""
This file contains general utility functions and helpers specifically for API v3.
It aims to provide a clean slate for new methods and refactor existing ones from lib_general.py
that are relevant to the v3 API, while removing unused or outdated functionalities.
"""
# Standard library imports
import time
import logging
from typing import (
Any,
Dict,
List,
Optional,
Union,
)
# Third-party imports
from fastapi import (
APIRouter,
Depends,
Header,
HTTPException,
Query,
Request,
Response,
status,
)
from pydantic import (
BaseModel,
Field,
ValidationError,
computed_field,
model_validator,
)
# Internal imports (from this project)
from app.config import settings
from app.db_sql import redis_lookup_id_random
from app.log import get_logger
logger = get_logger(__name__)
# --- Pydantic Model for Account Context ---
class AccountContext(BaseModel):
account_id: Optional[int]
account_id_random: Optional[str]
# --- Dependency Function for Account Context ---
def get_account_context(
x_account_id: Optional[str] = Header(None, min_length=11, max_length=22),
x_no_account_id: Optional[str] = Header(None, min_length=3, max_length=100), # Assuming 'bypass' or similar string
x_no_account_id_token: Optional[str] = Query(None, min_length=11, max_length=22),
) -> AccountContext:
"""
Resolves the account context from headers/query parameters with defined precedence.
Precedence: x_account_id (header) > x_no_account_id_token (query) > x_no_account_id (header flag)
Raises HTTPException 403 if no valid account is found and no bypass is indicated.
"""
logger.setLevel(logging.DEBUG) # Adjust as needed
logger.debug(locals())
resolved_account_id = None
resolved_account_id_random = None
if x_account_id:
# Primary check: x_account_id header
resolved_account_id_random = x_account_id
if looked_up_id := redis_lookup_id_random(table_name='account', record_id_random=x_account_id):
resolved_account_id = looked_up_id
logger.info(f'Found account from x_account_id header: {resolved_account_id}')
else:
logger.warning(f'Invalid x_account_id header provided: {x_account_id}')
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail='Invalid X-Account-ID header.')
elif x_no_account_id_token:
# Secondary check: x_no_account_id_token query parameter
resolved_account_id_random = x_no_account_id_token
if looked_up_id := redis_lookup_id_random(table_name='account', record_id_random=x_no_account_id_token):
resolved_account_id = looked_up_id
logger.info(f'Found account from x_no_account_id_token query: {resolved_account_id}')
else:
logger.warning(f'Invalid x_no_account_id_token query provided: {x_no_account_id_token}')
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail='Invalid X-No-Account-ID-Token query parameter.')
elif x_no_account_id:
# Tertiary check: x_no_account_id header for bypass
# For now, just presence indicates bypass. Can add a specific value check later if needed.
logger.info(f'X-No-Account-ID header found: {x_no_account_id}. Proceeding without specific account context.')
resolved_account_id = None # Explicitly None for "no specific account"
resolved_account_id_random = '--- NO ACCOUNT ---'
else:
logger.warning('No valid account context provided via X-Account-ID, X-No-Account-ID-Token, or X-No-Account-ID.')
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail='Account context required. Please provide X-Account-ID, X-No-Account-ID-Token, or X-No-Account-ID.')
return AccountContext(account_id=resolved_account_id, account_id_random=resolved_account_id_random)
# --- Pydantic Model for Pagination ---
class PaginationParams(BaseModel):
limit: int = 100 # Default limit
offset: int = 0
# --- Dependency Function for Pagination ---
def get_pagination_params(
limit: int = Query(100, ge=0, description="Maximum number of items to return"),
offset: int = Query(0, ge=0, description="Number of items to skip (for pagination)"),
) -> PaginationParams:
return PaginationParams(limit=limit, offset=offset)
# --- Pydantic Model for Status Filtering ---
class StatusFilterParams(BaseModel):
enabled: str = 'enabled' # 'enabled', 'disabled', 'all'
hidden: str = 'not_hidden' # 'hidden', 'not_hidden', 'all'
# --- Dependency Function for Status Filtering ---
def get_status_filter_params(
enabled: str = Query('enabled', description="Filter by object enabled status ('enabled', 'disabled', 'all')"),
hidden: str = Query('not_hidden', description="Filter by object hidden status ('hidden', 'not_hidden', 'all')"),
) -> StatusFilterParams:
allowed_enabled_values = {'enabled', 'disabled', 'all'}
allowed_hidden_values = {'hidden', 'not_hidden', 'all'}
if enabled not in allowed_enabled_values:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid value for 'enabled'. Must be one of {list(allowed_enabled_values)}."
)
if hidden not in allowed_hidden_values:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid value for 'hidden'. Must be one of {list(allowed_hidden_values)}."
)
return StatusFilterParams(enabled=enabled, hidden=hidden)
# --- Pydantic Model for Serialization Options ---
class SerializationParams(BaseModel):
by_alias: bool = True
exclude_unset: bool = False
exclude_defaults: bool = False # Added based on common_route_params
exclude_none: bool = False # Added based on common_route_params
# --- Dependency Function for Serialization Options ---
def get_serialization_params(
by_alias: bool = Query(True, description="Whether to use field aliases for serialization"),
exclude_unset: bool = Query(False, description="Whether to exclude unset fields from the response"),
exclude_defaults: bool = Query(False, description="Whether to exclude fields with their default values from the response"),
exclude_none: bool = Query(False, description="Whether to exclude fields that are None from the response"),
) -> SerializationParams:
return SerializationParams(
by_alias=by_alias,
exclude_unset=exclude_unset,
exclude_defaults=exclude_defaults,
exclude_none=exclude_none,
)
# --- Pydantic Model for Delay ---
class DelayParams(BaseModel):
sleep_time_ms: int = 0 # Raw delay value in ms
sleep_time_s: float = 0.0 # Converted to seconds for time.sleep()
# --- Dependency Function for Delay ---
def get_delay_params(
x_delay_ms: Optional[int] = Header(0, alias='X-Delay-ms', description="Delay response for X milliseconds (header)"),
delay_ms: Optional[int] = Query(0, description="Delay response for X milliseconds (query parameter)"),
) -> DelayParams:
calculated_delay_ms = max(x_delay_ms or 0, delay_ms or 0)
return DelayParams(sleep_time_ms=calculated_delay_ms, sleep_time_s=calculated_delay_ms / 1000.0)

100
app/log.py.snapshot Normal file
View File

@@ -0,0 +1,100 @@
import functools, logging
from app.config import settings
# stream options: 'ext://sys.stderr' or 'ext://sys.stdout'
# NOTE: This log config is confusing and may need work... 2022-10-07
# 'uvicorn' under 'loggers' creates an output to the 'console' handler
# Do not also add 'console' handler to the 'root' 'handlers' list
# For now just using that to add or remove file logging options.
# logging.config.dictConfig({
# 'version': 1,
# 'formatters': {
# 'default': {'format': '[%(asctime)s] %(levelname)s @ %(module)s.%(funcName)s()#%(lineno)d: %(message)s'},
# 'long': {'format': '[%(asctime)s] %(levelname)s @ %(module)s.%(funcName)s()#%(lineno)d: %(message)s', 'datefmt': '%Y-%m-%d %H:%M:%S'},
# 'short': {'format': '[%(asctime)s] %(levelname)s @ %(module)s.%(funcName)s()#%(lineno)d: %(message)s', 'datefmt': '%H:%M:%S', 'use_colors': True},
# },
# #'filename': 'example.log',
# # 'level': logging.ERROR,
# 'handlers': {
# 'console': {
# 'class': 'logging.StreamHandler',
# 'stream': 'ext://sys.stderr',
# 'formatter': 'short',
# },
# 'log_file_all': {
# 'level': 'NOTSET',
# 'class': 'logging.handlers.RotatingFileHandler',
# 'formatter': 'long',
# 'filename': settings.LOG_PATH['app'],
# 'maxBytes': 10485760, # 5,242,880 = 5 MB; 10,485,760 = 10 MB
# 'backupCount': 9
# },
# # 'log_file_warning': {
# # 'level': 'WARNING',
# # 'class': 'logging.handlers.RotatingFileHandler',
# # 'formatter': 'long',
# # 'filename': settings.LOG_PATH['app_warning'],
# # 'maxBytes': 512000, # 524,288 = 512KB
# # 'backupCount': 9
# # },
# # 'test_handler': {
# # 'class': 'logging.StreamHandler',
# # 'level': 'INFO',
# # 'formatter': 'short',
# # },
# # 'test_handler_all_rotate': {
# # 'class': 'logging.handlers.RotatingFileHandler',
# # 'level': 'NOTSET',
# # 'formatter': 'short',
# # 'filename': '/logs/test_rotate.log',
# # 'maxBytes': 100000, # 5120000 = 5 MB
# # 'backupCount': 2,
# # }
# },
# 'loggers': {
# # 'uvicorn': {'handlers': ['default'], 'level': 'INFO'},
# 'uvicorn': {'handlers': ['console'], 'level': 'INFO'},
# # 'uvicorn.error': {'level': 'INFO', 'handlers': ['default'], 'propagate': True},
# # 'uvicorn.error': {'level': 'INFO', 'handlers': ['console'], 'propagate': True},
# # 'uvicorn.access': {'handlers': ['access'], 'level': 'INFO', 'propagate': False},
# # 'gunicorn': {'handlers': ['console'], 'level': 'INFO'},
# },
# 'root': {
# 'handlers': ['log_file_all'], #, 'log_file_all', 'log_file_warning'],
# # 'handlers': ['console', 'log_file_all'], #, 'log_file_all', 'log_file_warning'],
# 'level': 'WARNING', # WARNING
# }
# })
# log = logging.getLogger('root')
# # log.setLevel(logging.INFO) # DEBUG > INFO > WARNING > ERROR > CRITICAL
# # logging.basicConfig(
# # format='[%(asctime)s] %(levelname)s @ %(module)s.%(funcName)s()#%(lineno)d: %(message)s'
# # )
# ### BEGIN ### Log ### logger_reset() ###
# https://realpython.com/primer-on-python-decorators/
# Updated 2022-02-15
# def logger_reset(func):
# # log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
# # log.info(locals())
# @functools.wraps(func)
# def wrapper(*args, **kwargs):
# if func.__name__ not in ['redis_lookup_id_random', 'sql_enable_part', 'sql_hidden_part']:
# log.info(f'*** Function: "{func.__name__}()"')
# log.debug(f'*** Function Positional Args: {args}\nFunction Key Args: {kwargs}')
# init_log_level = log.level
# returned_result = func(*args, **kwargs)
# log.debug(f'*** Function finished: "{func.__name__}()". Resetting logger level to level: {log.level} ***')
# log.setLevel(init_log_level)
# return returned_result
# return wrapper
# ### END ### Log ### logger_reset() ###
def get_logger(name: str) -> logging.Logger:
return logging.getLogger(name)

625
app/main.py.snapshot Normal file
View File

@@ -0,0 +1,625 @@
import datetime, json, os, pytz, random, secrets # , uvicorn
from enum import Enum
#from datetime import datetime, time, timedelta
from fastapi import Body, Cookie, Depends, FastAPI, File, Form, Header, HTTPException, Path, Query, Request, Response, status, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, PlainTextResponse
from fastapi.staticfiles import StaticFiles
from functools import lru_cache
from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union
from . import config
from app.log import log, logging
# Import the routers here first:
from app.routers import ae_obj, aether_cfg, api_crud, api_crud_v2, api_crud_v3, api, importing, sql, account, activity_log, address, archive, archive_content, contact, cont_edu_cert, cont_edu_cert_person, data_store, event, event_abstract, event_badge, event_badge_importing, event_badge_template, event_device, event_exhibit, event_exhibit_tracking, event_file, event_importing, event_location, event_person, event_person_detail, event_person_tracking, event_presentation, event_presenter, event_registration, event_session, flask_cfg, fundraising, grant, hosted_file, journal, journal_entry, log_client_viewing, lookup, membership_cfg, membership_group, membership_person_group, membership_person, membership_person_profile, membership_type, membership_person_type, order, order_v3, order_line, order_cart, organization, page, person, person_user, post, post_comment, product, qr, site, site_domain, user, util_email, websockets_redis, e_confex, e_cvent, c_idaa, e_impexium, e_stripe
# from app.routers import aether_cfg, sql
from app.db_sql import sql_select, reset_redis # , sql_connect
print('### **** *** ** * The Aether API v4 using FastAPI is loading... * ** *** **** ###')
app = FastAPI(
# debug = True,
title = 'Aether API',
description = 'One Sky IT\'s Aether API v4 using FastAPI.',
version = '4.9.0',
operationsSorter = 'method',
)
log.setLevel(logging.INFO)
# log.debug(config.settings)
if aether_cfg_sql_result := sql_select(
table_name = 'cfg',
record_id = config.settings.AETHER_CFG['id'],
as_list = False,
max_count = 1,
):
aether_cfg_sql = aether_cfg_sql_result
config.settings.DB['server'] = aether_cfg_sql.get('db_server')
config.settings.DB['port'] = aether_cfg_sql.get('db_port')
config.settings.DB['name'] = aether_cfg_sql.get('db_name')
config.settings.DB['username'] = aether_cfg_sql.get('db_username')
config.settings.DB['password'] = aether_cfg_sql.get('db_password')
DB = config.settings.DB
config.settings.SQLALCHEMY_DB_URI = 'mysql://'+DB['username']+':'+DB['password']+'@'+DB['server']+'/'+DB['name']
# db_result = sql_connect(config.settings.SQLALCHEMY_DB_URI)
log.debug(config.settings.DB)
config.settings.SMTP['server'] = aether_cfg_sql.get('smtp_server')
config.settings.SMTP['port'] = aether_cfg_sql.get('smtp_port')
config.settings.SMTP['username'] = aether_cfg_sql.get('smtp_username')
config.settings.SMTP['password'] = aether_cfg_sql.get('smtp_password')
# config.settings.FILES_PATH['hosted_files_root'] = aether_cfg_sql.get('PATH_HOSTED_FILES_ROOT')
# config.settings.FILES_PATH['hosted_tmp_root'] = aether_cfg_sql.get('PATH_HOSTED_TMP_ROOT')
config.settings.FILES_PATH['hosted_files_root'] = aether_cfg_sql.get('path_hosted_files_root')
config.settings.FILES_PATH['hosted_tmp_root'] = aether_cfg_sql.get('path_hosted_tmp_root')
else:
# aether_cfg_sql_result
pass
log.debug(aether_cfg_sql_result)
log.debug(config.settings)
# @lru_cache()
# def get_settings():
# return config.Settings()
app.mount('/static', StaticFiles(directory='static'), name='static')
# Set up each route once the router has been imported
app.include_router(
ae_obj.router,
prefix='/ae_obj',
tags=['AE Object'],
)
app.include_router(
aether_cfg.router,
tags=['Aether Config'],
)
app.include_router(
api_crud.router,
prefix='/crud',
tags=['CRUD v1.2 (Legacy)'],
#dependencies=[Depends(get_token_header)],
#dependencies=[Depends(get_account_header)],
#responses={404: {'description': 'Not found'}},
)
app.include_router(
api_crud_v2.router,
prefix='/v2/crud',
tags=['CRUD v2.5'],
#dependencies=[Depends(get_token_header)],
#dependencies=[Depends(get_account_header)],
#responses={404: {'description': 'Not found'}},
)
app.include_router(
api_crud_v3.router,
prefix='/v3/crud',
tags=['CRUD v3'],
)
app.include_router(
api.router,
prefix='/api',
tags=['API'],
)
app.include_router(
flask_cfg.router,
prefix='/flask_cfg',
tags=['Flask CFG'],
)
app.include_router(
importing.router,
prefix='/importing',
tags=['Importing'],
)
app.include_router(
sql.router,
# prefix='/sql',
tags=['SQL'],
)
# # app.include_router(
# # flask_cfg.router,
# # prefix='/redis',
# # tags=['Redis'],
# # )
app.include_router(
account.router,
# prefix='/account',
tags=['Account'],
)
app.include_router(
activity_log.router,
prefix='/activity_log',
tags=['Activity Log'],
)
app.include_router(
address.router,
prefix='/address',
tags=['Address'],
)
app.include_router(
archive.router,
# prefix='/archive',
tags=['Archive'],
)
app.include_router(
archive_content.router,
prefix='/archive/content',
tags=['Archive Content'],
)
app.include_router(
contact.router,
prefix='/contact',
tags=['Contact'],
)
app.include_router(
cont_edu_cert.router,
tags=['Cont Edu Cert'],
)
app.include_router(
cont_edu_cert_person.router,
tags=['Cont Edu Cert Person'],
)
app.include_router(
data_store.router,
# prefix='/data_store',
tags=['Data Store'],
)
app.include_router(
event.router,
# prefix='/event',
tags=['Event'],
)
app.include_router(
event_abstract.router,
tags=['Event Abstract'],
)
app.include_router(
event_badge.router,
tags=['Event Badge'],
)
app.include_router(
event_badge_importing.router,
tags=['Event Badge Importing'],
)
app.include_router(
event_badge_template.router,
# prefix='/event/badge/template',
tags=['Event Badge Template'],
)
app.include_router(
event_device.router,
# prefix='/event/device',
tags=['Event Device'],
)
app.include_router(
event_exhibit.router,
# prefix='/event/exhibit',
tags=['Event Exhibit'],
)
app.include_router(
event_exhibit_tracking.router,
# prefix='/event/exhibit/tracking',
tags=['Event Exhibit Tracking'],
)
app.include_router(
event_file.router,
# prefix='/event/file',
tags=['Event File'],
)
app.include_router(
event_importing.router,
# prefix='/event/importing',
tags=['Event Importing'],
)
app.include_router(
event_location.router,
# prefix='/event/location',
tags=['Event Location'],
)
app.include_router(
event_person.router,
# prefix='/event/person',
tags=['Event Person'],
)
app.include_router(
event_person.router,
prefix='/event/person/detail',
tags=['Event Person Detail'],
)
app.include_router(
event_person_tracking.router,
tags=['Event Person Tracking'],
)
app.include_router(
event_presentation.router,
# prefix='/event/presentation',
tags=['Event Presentation'],
)
app.include_router(
event_presenter.router,
prefix='/event/presenter',
tags=['Event Presenter'],
)
app.include_router(
event_registration.router,
prefix='/event/registration',
tags=['Event Registration'],
)
app.include_router(
event_session.router,
# prefix='/event/session',
tags=['Event Session'],
)
app.include_router(
fundraising.router,
tags=['Fundraising'],
)
app.include_router(
grant.router,
tags=['Grant'],
)
app.include_router(
hosted_file.router,
prefix='/hosted_file',
tags=['Hosted File'],
)
app.include_router(
journal.router,
prefix='/journal',
tags=['Journal'],
)
app.include_router(
journal_entry.router,
# prefix='/journal/entry',
tags=['Journal Entry'],
)
app.include_router(
log_client_viewing.router,
# prefix='/log/client_viewing',
tags=['Log Client Viewing'],
)
app.include_router(
lookup.router,
prefix='/lu',
tags=['Lookup'],
)
app.include_router(
membership_cfg.router,
tags=['Membership Config'],
)
app.include_router(
membership_group.router,
tags=['Membership Group'],
)
app.include_router(
membership_person_group.router,
tags=['Membership Group Person'],
)
app.include_router(
membership_person_profile.router,
tags=['Membership Person Profile'],
)
app.include_router(
membership_person.router,
tags=['Membership Person'],
)
app.include_router(
membership_type.router,
tags=['Membership Type'],
)
app.include_router(
membership_person_type.router,
tags=['Membership Type Person'],
)
app.include_router(
order.router,
# prefix='/order',
tags=['Order'],
)
app.include_router(
order_v3.router,
# prefix='/order',
tags=['Order v3'],
)
app.include_router(
order_line.router,
# prefix='/order',
tags=['Order Line'],
)
app.include_router(
order_cart.router,
prefix='/order/cart',
tags=['Order Cart'],
)
app.include_router(
organization.router,
prefix='/organization',
tags=['Organization'],
)
app.include_router(
page.router,
prefix='/page',
tags=['Page'],
)
app.include_router(
person.router,
tags=['Person'],
)
app.include_router(
person_user.router,
prefix='/person_user',
tags=['Person User'],
)
app.include_router(
post.router,
# prefix='/post',
tags=['Post'],
)
app.include_router(
post_comment.router,
prefix='/post/comment',
tags=['Post Comment'],
)
app.include_router(
product.router,
# prefix='/product',
tags=['Product'],
)
app.include_router(
qr.router,
tags=['QR'],
)
app.include_router(
site.router,
# prefix='/site',
tags=['Site'],
)
app.include_router(
site_domain.router,
# prefix='/site/domain',
tags=['Site Domain'],
)
app.include_router(
user.router,
tags=['User'],
)
app.include_router(
util_email.router,
tags=['Utility: Email'],
)
# app.include_router(
# websockets.router,
# # prefix='/websocket',
# tags=['Websockets'],
# # dependencies=[Depends(get_token_header)],
# # responses={404: {'description': 'Not found'}},
# )
app.include_router(
websockets_redis.router,
tags=['Websockets (Redis)'],
)
app.include_router(
e_confex.router,
prefix='/e/confex',
tags=['External Service: Confex'],
)
app.include_router(
e_cvent.router,
prefix='/e/cvent',
tags=['External Service: Cvent'],
)
app.include_router(
e_impexium.router,
prefix='/e/impexium',
tags=['External Service: Impexium'],
)
app.include_router(
e_stripe.router,
prefix='/e/stripe',
tags=['External Service: Stripe'],
)
app.include_router(
c_idaa.router,
prefix='/c/idaa',
tags=['Client: IDAA'],
)
# BEGIN: CORS
# NOTE: Eventually this should query the DB for the specific list based on the cfg table and or site_domain table. That way it is dynamic and only allowing those defined in the DB. No wildcards or regex.
# NOTE: Need to include .localhost for less browser restrictions! Mainly for audio and video.
app.add_middleware(
CORSMiddleware,
# allow_origins = origins,
allow_origins = config.settings.ORIGINS,
allow_origin_regex = config.settings.ORIGINS_REGEX,
# allow_origin_regex = 'https://.*\.oneskyit\.com',
allow_credentials = True,
allow_methods = ['*'],
allow_headers = ['*'],
#expose_headers = [],
#max_age = 600,
)
# END: CORS
@app.on_event('startup')
async def startup():
log.setLevel(logging.INFO) # DEBUG, INFO, WARN, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
log.info('The Aether FastAPI API is starting up...')
#await database.connect()
@app.on_event('shutdown')
async def shutdown():
log.setLevel(logging.INFO) # DEBUG, INFO, WARN, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
log.info('The Aether FastAPI API is shutting down...')
#await database.disconnect()
#Add the processing time to the response header.
@app.middleware('http')
async def add_process_time_header(request: Request, call_next):
import time
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time
response.headers['X-Process-Time'] = str(process_time)
return response
# ### BEGIN ### API Main ### fastapi_root() ###
@app.get('/', tags=['Root'], response_class=PlainTextResponse)
async def fastapi_root(response: Response = Response):
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARN, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
# log.info(config.settings.APP_NAME)
log.info('One Sky IT\'s Aether API root (FastAPI)')
log.info('***')
log.debug('This is debug') # 10 DEBUG
log.info('This is info') # 20 INFO
log.warning('This is a warning') # 30 WARNING (and WARN)
log.error('This is an error') # 40 ERROR
log.exception('This is an exception') # 40 ERROR
log.critical('This is critical') # 50 CRITICAL
log.info('^^^')
log.warning('Resetting Redis...')
reset_redis()
log.info('Reset Redis')
response_data = {}
response_data['message'] = 'This is One Sky IT\'s Aether API root (FastAPI).'
current_datetime = datetime.datetime.now()
current_datetime_string = current_datetime.isoformat()
timezone = pytz.timezone("America/New_York")
current_datetime_tz = timezone.localize(current_datetime)
current_datetime_tz_string = current_datetime_tz.isoformat()
current_datetime_utc = datetime.datetime.utcnow()
current_datetime_utc_string = current_datetime_utc.isoformat()
current_datetime_utc_localize = pytz.utc.localize(current_datetime_utc)
current_datetime_utc_localize_string = current_datetime_utc_localize.isoformat()
current_datetime_utc_localize_pst = current_datetime_utc_localize.astimezone(pytz.timezone("America/Los_Angeles"))
current_datetime_utc_localize_pst_string = current_datetime_utc_localize_pst.isoformat()
response_data['datetime'] = current_datetime_string
response_data['datetime_tz'] = current_datetime_tz_string
response_data['datetime_utc'] = current_datetime_utc_string
response_data['datetime_utc_localize'] = current_datetime_utc_localize_string
response_data['datetime_utc_localize_pst'] = current_datetime_utc_localize_pst_string
response_data['url_safe_string_4_bytes_1'] = secrets.token_urlsafe(4)
response_data['url_safe_string_8_bytes_1'] = secrets.token_urlsafe(8)
response_data['url_safe_string_8_bytes_2'] = secrets.token_urlsafe(8)
response_data['url_safe_string_8_bytes_3'] = secrets.token_urlsafe(8)
response_data['url_safe_string_8_bytes_4'] = secrets.token_urlsafe(8)
response_data['url_safe_string_8_bytes_5'] = secrets.token_urlsafe(8)
response_data['url_safe_string_16_bytes_1'] = secrets.token_urlsafe(16)
response_data['url_safe_string_16_bytes_2'] = secrets.token_urlsafe(16)
response_data['url_safe_string_16_bytes_3'] = secrets.token_urlsafe(16)
response_data['url_safe_string_16_bytes_4'] = secrets.token_urlsafe(16)
response_data['url_safe_string_16_bytes_5'] = secrets.token_urlsafe(16)
response_data['hex_string_4_bytes_1'] = secrets.token_hex(4)
response_data['hex_string_8_bytes_1'] = secrets.token_hex(8)
response_data['hex_string_16_bytes_1'] = secrets.token_hex(16)
response_data['hex_string_32_bytes_1'] = secrets.token_hex(32)
log.debug(json.dumps(response_data, indent=4))
return json.dumps(response_data, indent=4) # , sort_keys=True
# ### END ### API Main ### fastapi_root() ###
# ### BEGIN ### API Main ### generate_id_random() ###
# NOTE: This is just a quick utility function to generate a bunch of random IDs.
# Updated 2022-03-30
@app.get('/generate_id_random', tags=['Root'], response_class=PlainTextResponse)
async def generate_id_random(response: Response = Response):
log.setLevel(logging.INFO) # DEBUG, INFO, WARN, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
response_data = {}
html_list = '<ul>'
for x in range(50):
html_list += f'<li>{secrets.token_urlsafe(8)}</li>'
html_list += '</ul>'
return HTMLResponse(content=html_list, status_code=200)
# ### END ### API Main ### generate_id_random() ###
# ### BEGIN ### API Main ### sql_test() ###
# ### TEST TEST TEST ### #
@app.get('/sql_test', tags=['Testing'], response_class=PlainTextResponse)
async def sql_test(response: Response = Response):
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARN, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
return mk_resp(data=False, status_code=501, response=response)
log.info('Getting all accounts from DB...')
sql = text(
"""
SELECT id, id_random, name, enable
FROM `account`
"""
)
try:
result = db.execute(sql)
except Exception as e:
log.error('*** An exception happened. ***')
log.error(repr(e))
log.error('***')
log.error(str(e))
log.error('^^^ exception ^^^')
else:
if result.rowcount:
record_li = [dict(record) for record in result.fetchall()]
log.debug(record_li)
else:
log.error('No records found. Something went wrong.')
log.info('Got the account list')
response_data = {}
response_data['message'] = 'This is the Aether API using FastAPI.'
response_data['data'] = record_li
return json.dumps(response_data, indent=4) # , sort_keys=True
# ### END ### API Main ### sql_test() ###

View File

@@ -0,0 +1,134 @@
from app.lib_general import log, logging, Response, status
# ### BEGIN ### API Response Model ### Resp_Body_Base() ###
# The pydantic BaseModel to help make consistent REST responses.
# Updated 2021-03-05
class Resp_Body_Base(BaseModel):
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
# test_prop: Optional[str] = Field(
# alias = 'test_prop_alias'
# )
data: Union[None, list, dict]
meta: Optional[dict]
# ### END ### API Response Model ### Resp_Body_Base() ###
# ### BEGIN ### API Response Model ### mk_resp() ###
# The method for making responses for REST. Returns a dict, but currently uses Resp_Body_Base inside the function.
# Update 2021-08-23
def mk_resp(
data: None|bool|dict|list,
tmp_file_path: None|str = None,
dict_to_json: bool = False,
status_code: int = 200,
status_message: str = '',
status_name: str = '',
success: bool = True,
details: str = '',
include: dict = None,
exclude: dict = None,
by_alias: bool = True,
exclude_unset: bool = False,
response: Response = None
) -> dict:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if data is None: data_out = { 'result': data }
elif data == False: data_out = { 'result': data }
elif data == True: data_out = { 'result': data }
elif isinstance(data, dict):
log.info('Data type is a dict')
data_out = data
elif isinstance(data, list):
log.info('Data type is a list')
data_out = data
elif isinstance(data, int):
log.info('Data type is an int')
data_out = { 'result': data }
elif isinstance(data, str):
log.info('Data type is a str')
data_out = { 'result': data }
else: # Assuming it is still and object. This should be improved. Example model type: "<class 'app.models.account_models.Account_Base'>"
log.info('Data type is other')
data_out = data.dict(include=include, exclude=exclude, by_alias=by_alias, exclude_unset=exclude_unset)
# log.debug(data_out)
resp_body = {}
resp_body['data'] = data_out
resp_body['meta'] = {}
resp_body['meta']['details'] = details
resp_body['meta']['status_code'] = status_code
if status_message:
resp_body['meta']['status_message'] = status_message
else:
resp_body['meta']['status_message'] = http_status_li[status_code]['message']
resp_body['meta']['status_name'] = http_status_li[status_code]['name']
resp_body['meta']['success'] = success
resp_body['meta']['tmp_file_path'] = tmp_file_path
if isinstance(data, bool):
resp_body['meta']['data_type'] = 'bool'
elif isinstance(data, int):
resp_body['meta']['data_type'] = 'int'
elif isinstance(data, str):
resp_body['meta']['data_type'] = 'str'
elif isinstance(data, dict):
resp_body['meta']['data_type'] = 'dict'
elif isinstance(data, list):
resp_body['meta']['data_type'] = 'list'
resp_body['meta']['data_list_count'] = len(data)
if response:
log.debug(response)
if status_code == 400:
log.warning('Likely bad request')
response.status_code = status.HTTP_400_BAD_REQUEST
elif status_code == 401: response.status_code = status.HTTP_401_UNAUTHORIZED
# elif status_code == 402: response.status_code = status.HTTP_402_X
elif status_code == 403: response.status_code = status.HTTP_403_FORBIDDEN
elif status_code == 404:
log.info('No results')
response.status_code = status.HTTP_404_NOT_FOUND
elif status_code == 408: response.status_code = status.HTTP_408_REQUEST_TIMEOUT
elif status_code == 429: response.status_code = status.HTTP_429_TOO_MANY_REQUESTS
elif status_code == 500: response.status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
elif status_code == 501: response.status_code = status.HTTP_501_NOT_IMPLEMENTED
elif status_code == 502: response.status_code = status.HTTP_502_BAD_GATEWAY
elif status_code == 503: response.status_code = status.HTTP_503_SERVICE_UNAVAILABLE
elif status_code == 504: response.status_code = status.HTTP_504_GATEWAY_TIMEOUT
# log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
# log.debug(resp_body)
# log.debug(type(resp_body['data']))
# import json
# with open('data.txt', 'w') as outfile:
# json.dump(resp_body['data'], outfile)
resp_body_obj = Resp_Body_Base(**resp_body)
log.debug(resp_body_obj)
resp_body_dict = resp_body_obj.dict(by_alias=by_alias, exclude_unset=exclude_unset)
log.debug(resp_body_dict)
return resp_body_dict
# ### END ### API Response Model ### mk_resp() ###
http_status_li = {}
http_status_li[200] = { 'name': 'OK', 'message': 'The request has succeeded.' }
http_status_li[400] = { 'name': 'Bad Request', 'message': 'The request could not be understood by the server due to malformed syntax. The client SHOULD NOT repeat the request without modifications.' }
http_status_li[401] = { 'name': 'Unauthorized', 'message': 'The server could not verify that you are authorized to access the URL requested. You either supplied the wrong credentials (e.g. a bad password), or your browser does not understand how to supply the credentials required.' }
http_status_li[402] = { 'name': '?Request Failed?', 'message': '??The parameters were valid but the request failed.??' }
http_status_li[403] = { 'name': 'Forbidden', 'message': 'The server understood the request, but is refusing to fulfill it. Authorization will not help and the request SHOULD NOT be repeated. If the request method was not HEAD and the server wishes to make public why the request has not been fulfilled, it SHOULD describe the reason for the refusal in the entity. If the server does not wish to make this information available to the client, the status code 404 (Not Found) can be used instead.' }
http_status_li[404] = { 'name': 'Not Found', 'message': 'The requested resource does not exist.' }
http_status_li[409] = { 'name': 'Conflict', 'message': 'The request conflicts with another request (perhaps due to using the same idempotent key).' }
http_status_li[429] = { 'name': 'Too Many Requests', 'message': 'Too many requests hit the API too quickly. We recommend an exponential backoff of your requests.' }
http_status_li[500] = { 'name': 'Internal Server Error', 'message': 'The server encountered an unexpected condition which prevented it from fulfilling the request.' }
http_status_li[501] = { 'name': 'Not Implemented', 'message': 'The server does not support the functionality required to fulfill the request. This is the appropriate response when the server does not recognize the request method and is not capable of supporting it for any resource.' }
http_status_li[502] = { 'name': 'Bad Gateway', 'message': 'The server, while acting as a gateway or proxy, received an invalid response from the upstream server it accessed in attempting to fulfill the request.' }
http_status_li[503] = { 'name': 'Service Unavailable', 'message': 'The server is currently unable to handle the request due to a temporary overloading or maintenance of the server. The implication is that this is a temporary condition which will be alleviated after some delay. If known, the length of the delay MAY be indicated in a Retry-After header. If no Retry-After is given, the client SHOULD handle the response as it would for a 500 response.' }
http_status_li[504] = { 'name': 'Gateway Timeout', 'message': 'The server, while acting as a gateway or proxy, did not receive a timely response from the upstream server specified by the URI (e.g. HTTP, FTP, LDAP) or some other auxiliary server (e.g. DNS) it needed to access in attempting to complete the request.' }

View File

@@ -0,0 +1,770 @@
from fastapi import APIRouter, Depends, Header, HTTPException, Path, Query, Request, Response, status
from typing import Dict, List, Optional, Set, Union
import json
import urllib
import time
from app.lib_general import log, logging, Common_Route_Params, common_route_params
from app.models.response_models import *
from app.ae_obj_types_def import obj_type_kv_li
from app.db_sql import redis_lookup_id_random, sql_select, sql_insert, sql_update, sql_delete, get_id_random
router = APIRouter()
@router.get("/health", response_model=Resp_Body_Base)
async def health_check(
x_delay_ms: Optional[int] = Header(0, alias='X-Delay-ms'),
delay_ms: Optional[int] = Query(0),
):
"""
Health check endpoint for V3 API.
"""
log.setLevel(logging.INFO)
log.info("V3 Health Check Endpoint Hit")
sleep_time = max(x_delay_ms, delay_ms)
if sleep_time > 0:
log.info(f"Delaying response for {sleep_time} ms.")
time.sleep(sleep_time / 1000)
return mk_resp(data={"status": "V3 API is healthy!"})
@router.get('/{obj_type_l1}/{obj_id}', response_model=Resp_Body_Base)
async def get_obj(
obj_type_l1: str = Path(min_length=2, max_length=50),
obj_id: str = Path(min_length=11, max_length=22),
x_delay_ms: Optional[int] = Header(0, alias='X-Delay-ms'),
delay_ms: Optional[int] = Query(0),
commons: Common_Route_Params = Depends(common_route_params),
):
"""
Get a single top-level object by its random ID.
Examples:
- /v3/crud/journal/{journal_id}
- /v3/crud/account/{account_id}
"""
log.setLevel(logging.WARNING)
log.debug(locals())
sleep_time = max(x_delay_ms, delay_ms)
if sleep_time > 0:
log.info(f"Delaying response for {sleep_time} ms.")
time.sleep(sleep_time / 1000)
obj_name = obj_type_l1
if obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Object type '{obj_name}' not found.")
obj_cfg = obj_type_kv_li[obj_name]
table_name = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
base_name = obj_cfg.get('mdl_default', obj_cfg.get('mdl'))
if not table_name or not base_name:
return mk_resp(data=False, status_code=500, response=commons.response, status_message=f"Configuration for object type '{obj_name}' is incomplete.")
record_id = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_name)
if not record_id:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Object with ID '{obj_id}' not found.")
if sql_result := sql_select(table_name=table_name, record_id=record_id):
resp_data = base_name(**sql_result).dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset)
return mk_resp(data=resp_data, response=commons.response)
else:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Object with ID '{obj_id}' not found in database.")
@router.get('/{obj_type_l1}/', response_model=Resp_Body_Base)
async def get_obj_li(
obj_type_l1: str,
for_obj_type: Optional[str] = None,
for_obj_id: Optional[str] = None,
hidden: str = 'not_hidden',
order_by_li: Optional[str] = None,
jp: Optional[Union[str, None]] = None,
x_delay_ms: Optional[int] = Header(0, alias='X-Delay-ms'),
delay_ms: Optional[int] = Query(0),
commons: Common_Route_Params = Depends(common_route_params),
):
"""
Get a list of top-level objects.
Examples:
- /v3/crud/journal/
- /v3/crud/journal/?for_obj_type=account&for_obj_id={account_id_random}
- /v3/crud/journal/?jp={"qry":[{"type":"AND","field":"for_type","operator":"=","value":"user"},{"type":"AND","field":"for_id","operator":"=","value":<user_id>}]}
"""
log.setLevel(logging.WARNING)
log.debug(locals())
sleep_time = max(x_delay_ms, delay_ms)
if sleep_time > 0:
log.info(f"Delaying response for {sleep_time} ms.")
time.sleep(sleep_time / 1000)
# This should be a list of SQL WHERE parts defined in JSON.
qry_dict_li = None
fulltext_qry_dict_obj = None
and_qry_dict_obj = None
and_like_dict_obj = None
or_like_dict_obj = None
and_in_dict_li_obj = None
jp_obj = None
if jp:
try:
jp_obj = json.loads(urllib.parse.unquote(jp))
except Exception as e:
log.warning(e)
return mk_resp(data=False, status_code=400, response=commons.response, status_message='The JSON string was not formatted correctly.')
if jp_obj.get('qry'):
qry_dict_li = jp_obj['qry']
if jp_obj.get('ft_qry'):
fulltext_qry_dict_obj = jp_obj['ft_qry']
if jp_obj.get('and_qry'):
and_qry_dict_obj = jp_obj['and_qry']
if jp_obj.get('and_like'):
and_like_dict_obj = jp_obj['and_like']
if jp_obj.get('or_like'):
or_like_dict_obj = jp_obj['or_like']
if jp_obj.get('and_in_li'):
and_in_dict_li_obj = jp_obj['and_in_li']
if order_by_li:
order_by_li = json.loads(order_by_li)
obj_name = obj_type_l1
if obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Object type '{obj_name}' not found.")
obj_cfg = obj_type_kv_li[obj_name]
table_name = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
base_name = obj_cfg.get('mdl_default', obj_cfg.get('mdl'))
if not table_name or not base_name:
return mk_resp(data=False, status_code=500, response=commons.response, status_message=f"Configuration for object type '{obj_name}' is incomplete.")
if for_obj_type and for_obj_id:
# Resolve random ID to integer ID
resolved_for_obj_id = redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type)
if not resolved_for_obj_id:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Parent object with ID '{for_obj_id}' not found.")
field_name = f'{for_obj_type}_id' # Assuming convention like 'account_id' for for_obj_type='account'
sql_result = sql_select(
table_name=table_name,
field_name=field_name,
field_value=resolved_for_obj_id,
enabled=commons.enabled,
hidden=hidden,
qry_dict_li=qry_dict_li,
fulltext_qry_dict=fulltext_qry_dict_obj,
and_qry_dict=and_qry_dict_obj,
and_like_dict=and_like_dict_obj,
or_like_dict=or_like_dict_obj,
and_in_dict_li=and_in_dict_li_obj,
order_by_li=order_by_li,
limit=commons.limit,
offset=commons.offset,
as_list=True,
)
else:
sql_result = sql_select(
table_name=table_name,
enabled=commons.enabled,
hidden=hidden,
qry_dict_li=qry_dict_li,
fulltext_qry_dict=fulltext_qry_dict_obj,
and_qry_dict=and_qry_dict_obj,
and_like_dict=and_like_dict_obj,
or_like_dict=or_like_dict_obj,
and_in_dict_li=and_in_dict_li_obj,
order_by_li=order_by_li,
limit=commons.limit,
offset=commons.offset,
as_list=True,
)
if sql_result:
resp_data_li = []
for record in sql_result:
resp_data = base_name(**record).dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset)
resp_data_li.append(resp_data)
return mk_resp(data=resp_data_li, response=commons.response)
else:
return mk_resp(data=[], status_code=200, response=commons.response) # Return empty list on no results
@router.post('/{obj_type_l1}/', response_model=Resp_Body_Base)
async def post_obj(
request: Request,
obj_type_l1: str = Path(min_length=2, max_length=50),
return_obj: Optional[bool] = True,
x_delay_ms: Optional[int] = Header(0, alias='X-Delay-ms'),
delay_ms: Optional[int] = Query(0),
commons: Common_Route_Params = Depends(common_route_params),
):
"""
Create a new top-level object.
Examples:
- POST /v3/crud/journal/ (with Journal_Base in body)
"""
log.setLevel(logging.WARNING)
log.debug(locals())
sleep_time = max(x_delay_ms, delay_ms)
if sleep_time > 0:
log.info(f"Delaying response for {sleep_time} ms.")
time.sleep(sleep_time / 1000)
obj_data = await request.json()
obj_name = obj_type_l1
if obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Object type '{obj_name}' not found.")
obj_cfg = obj_type_kv_li[obj_name]
table_name_insert = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
input_model = obj_cfg.get('mdl_in', obj_cfg.get('mdl'))
output_model = obj_cfg.get('mdl_out', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
if not table_name_insert or not input_model or not table_name_select or not output_model:
return mk_resp(data=False, status_code=500, response=commons.response, status_message=f"Configuration for object type '{obj_name}' is incomplete.")
# Validate incoming data with the appropriate Pydantic model
try:
validated_obj = input_model(**obj_data)
except Exception as e:
log.warning(f"Validation error for {obj_name}: {e}")
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Validation error: {e}")
# Convert to dict, excluding unset fields, for database insertion
data_to_insert = validated_obj.dict(exclude_unset=True)
if sql_insert_result := sql_insert(data=data_to_insert, table_name=table_name_insert):
new_obj_id = sql_insert_result
new_obj_id_random = get_id_random(record_id=new_obj_id, table_name=obj_name)
if return_obj:
if sql_select_result := sql_select(table_name=table_name_select, record_id=new_obj_id):
resp_data = output_model(**sql_select_result).dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset)
return mk_resp(data=resp_data, response=commons.response)
else:
return mk_resp(data={"obj_id": new_obj_id, "obj_id_random": new_obj_id_random}, status_code=404, response=commons.response, status_message="Object created but could not be retrieved.")
else:
return mk_resp(data={"obj_id": new_obj_id, "obj_id_random": new_obj_id_random}, response=commons.response)
else:
return mk_resp(data=False, status_code=400, response=commons.response, status_message="Failed to create object in database.")
@router.patch('/{obj_type_l1}/{obj_id}', response_model=Resp_Body_Base)
async def patch_obj(
request: Request,
obj_type_l1: str = Path(min_length=2, max_length=50),
obj_id: str = Path(min_length=11, max_length=22),
return_obj: Optional[bool] = True,
x_delay_ms: Optional[int] = Header(0, alias='X-Delay-ms'),
delay_ms: Optional[int] = Query(0),
commons: Common_Route_Params = Depends(common_route_params),
):
"""
Update a top-level object.
Examples:
- PATCH /v3/crud/journal/{journal_id} (with Journal_Base fields in body)
"""
log.setLevel(logging.WARNING)
log.debug(locals())
sleep_time = max(x_delay_ms, delay_ms)
if sleep_time > 0:
log.info(f"Delaying response for {sleep_time} ms.")
time.sleep(sleep_time / 1000)
obj_data = await request.json()
obj_name = obj_type_l1
if obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Object type '{obj_name}' not found.")
obj_cfg = obj_type_kv_li[obj_name]
table_name_update = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
input_model = obj_cfg.get('mdl_in', obj_cfg.get('mdl'))
output_model = obj_cfg.get('mdl_out', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
if not table_name_update or not input_model or not table_name_select or not output_model:
return mk_resp(data=False, status_code=500, response=commons.response, status_message=f"Configuration for object type '{obj_name}' is incomplete.")
record_id = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_name)
if not record_id:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Object with ID '{obj_id}' not found.")
# Validate incoming data with the appropriate Pydantic model.
# For PATCH, we don't want to fail on missing fields, so we don't validate like in POST.
# The sql_update function will only update the fields provided in the dict.
data_to_update = obj_data
if sql_update_result := sql_update(data=data_to_update, table_name=table_name_update, record_id=record_id):
if return_obj:
if sql_select_result := sql_select(table_name=table_name_select, record_id=record_id):
resp_data = output_model(**sql_select_result).dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset)
return mk_resp(data=resp_data, response=commons.response)
else:
return mk_resp(data=True, status_code=404, response=commons.response, status_message="Object updated but could not be retrieved.")
else:
return mk_resp(data=True, response=commons.response, status_message="Object updated successfully.")
else:
return mk_resp(data=False, status_code=400, response=commons.response, status_message="Failed to update object in database. It may not have been found, or the data was invalid.")
@router.delete('/{obj_type_l1}/{obj_id}', response_model=Resp_Body_Base)
async def delete_obj(
obj_type_l1: str = Path(min_length=2, max_length=50),
obj_id: str = Path(min_length=11, max_length=22),
method: str = 'delete', # delete, disable, hide
x_delay_ms: Optional[int] = Header(0, alias='X-Delay-ms'),
delay_ms: Optional[int] = Query(0),
commons: Common_Route_Params = Depends(common_route_params),
):
"""
Delete a top-level object.
Examples:
- DELETE /v3/crud/journal/{journal_id}
- DELETE /v3/crud/journal/{journal_id}?method=disable
"""
log.setLevel(logging.WARNING)
log.debug(locals())
sleep_time = max(x_delay_ms, delay_ms)
if sleep_time > 0:
log.info(f"Delaying response for {sleep_time} ms.")
time.sleep(sleep_time / 1000)
obj_name = obj_type_l1
if obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Object type '{obj_name}' not found.")
obj_cfg = obj_type_kv_li[obj_name]
table_name = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
if not table_name:
return mk_resp(data=False, status_code=500, response=commons.response, status_message=f"Configuration for object type '{obj_name}' is incomplete (missing table).")
record_id = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_name)
if not record_id:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Object with ID '{obj_id}' not found.")
if method == 'disable':
sql_result = sql_update(data={'enable': False}, table_name=table_name, record_id=record_id)
elif method == 'hide':
sql_result = sql_update(data={'hide': True}, table_name=table_name, record_id=record_id)
else: # Default to hard delete
sql_result = sql_delete(table_name=table_name, record_id=record_id)
if sql_result:
return mk_resp(data=True, response=commons.response, status_message=f"Object with ID '{obj_id}' action '{method}' completed successfully.")
else:
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Failed to perform '{method}' on object in database. It may not have been found.")
@router.get('/{parent_obj_type}/{parent_obj_id}/{child_obj_type}/', response_model=Resp_Body_Base)
async def get_child_obj_li(
parent_obj_type: str,
parent_obj_id: str,
child_obj_type: str,
hidden: str = 'not_hidden',
order_by_li: Optional[str] = None,
jp: Optional[Union[str, None]] = None,
x_delay_ms: Optional[int] = Header(0, alias='X-Delay-ms'),
delay_ms: Optional[int] = Query(0),
commons: Common_Route_Params = Depends(common_route_params),
):
"""
Get a list of child objects belonging to a parent.
Examples:
- /v3/crud/journal/{journal_id}/journal_entry/
"""
log.setLevel(logging.WARNING)
log.debug(locals())
sleep_time = max(x_delay_ms, delay_ms)
if sleep_time > 0:
log.info(f"Delaying response for {sleep_time} ms.")
time.sleep(sleep_time / 1000)
# This function's logic is very similar to get_obj_li,
# but it enforces the parent-child relationship from the URL path.
# We can treat the parent path parameters as if they were for_obj_type and for_obj_id query params.
for_obj_type = parent_obj_type
for_obj_id = parent_obj_id
qry_dict_li = None
fulltext_qry_dict_obj = None
and_qry_dict_obj = None
and_like_dict_obj = None
or_like_dict_obj = None
and_in_dict_li_obj = None
jp_obj = None
if jp:
try:
jp_obj = json.loads(urllib.parse.unquote(jp))
except Exception as e:
log.warning(e)
return mk_resp(data=False, status_code=400, response=commons.response, status_message='The JSON string was not formatted correctly.')
if jp_obj.get('qry'):
qry_dict_li = jp_obj['qry']
if jp_obj.get('ft_qry'):
fulltext_qry_dict_obj = jp_obj['ft_qry']
if jp_obj.get('and_qry'):
and_qry_dict_obj = jp_obj['and_qry']
if jp_obj.get('and_like'):
and_like_dict_obj = jp_obj['and_like']
if jp_obj.get('or_like'):
or_like_dict_obj = jp_obj['or_like']
if jp_obj.get('and_in_li'):
and_in_dict_li_obj = jp_obj['and_in_li']
if order_by_li:
order_by_li = json.loads(order_by_li)
obj_name = child_obj_type
if obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Object type '{obj_name}' not found.")
obj_cfg = obj_type_kv_li[obj_name]
table_name = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
base_name = obj_cfg.get('mdl_default', obj_cfg.get('mdl'))
if not table_name or not base_name:
return mk_resp(data=False, status_code=500, response=commons.response, status_message=f"Configuration for object type '{obj_name}' is incomplete.")
# Resolve parent's random ID to integer ID
resolved_parent_id = redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type)
if not resolved_parent_id:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Parent object '{for_obj_type}' with ID '{for_obj_id}' not found.")
field_name = f'{for_obj_type}_id' # Assuming convention like 'journal_id'
sql_result = sql_select(
table_name=table_name,
field_name=field_name,
field_value=resolved_parent_id,
enabled=commons.enabled,
hidden=hidden,
qry_dict_li=qry_dict_li,
fulltext_qry_dict=fulltext_qry_dict_obj,
and_qry_dict=and_qry_dict_obj,
and_like_dict=and_like_dict_obj,
or_like_dict=or_like_dict_obj,
and_in_dict_li=and_in_dict_li_obj,
order_by_li=order_by_li,
limit=commons.limit,
offset=commons.offset,
as_list=True,
)
if sql_result:
resp_data_li = []
for record in sql_result:
resp_data = base_name(**record).dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset)
resp_data_li.append(resp_data)
return mk_resp(data=resp_data_li, response=commons.response)
else:
return mk_resp(data=[], status_code=200, response=commons.response) # Return empty list on no results
@router.post('/{parent_obj_type}/{parent_obj_id}/{child_obj_type}/', response_model=Resp_Body_Base)
async def post_child_obj(
request: Request,
parent_obj_type: str = Path(min_length=2, max_length=50),
parent_obj_id: str = Path(min_length=11, max_length=22),
child_obj_type: str = Path(min_length=2, max_length=50),
return_obj: Optional[bool] = True,
x_delay_ms: Optional[int] = Header(0, alias='X-Delay-ms'),
delay_ms: Optional[int] = Query(0),
commons: Common_Route_Params = Depends(common_route_params),
):
"""
Create a new child object for a given parent.
Examples:
- POST /v3/crud/journal/{journal_id}/journal_entry/ (with Journal_Entry_Base in body)
"""
log.setLevel(logging.WARNING)
log.debug(locals())
sleep_time = max(x_delay_ms, delay_ms)
if sleep_time > 0:
log.info(f"Delaying response for {sleep_time} ms.")
time.sleep(sleep_time / 1000)
obj_data = await request.json()
parent_obj_name = parent_obj_type
child_obj_name = child_obj_type
if parent_obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Parent object type '{parent_obj_name}' not found.")
if child_obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Child object type '{child_obj_name}' not found.")
resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_obj_name)
if not resolved_parent_id:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Parent object '{parent_obj_name}' with ID '{parent_obj_id}' not found.")
obj_cfg = obj_type_kv_li[child_obj_name]
table_name_insert = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
input_model = obj_cfg.get('mdl_in', obj_cfg.get('mdl'))
output_model = obj_cfg.get('mdl_out', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
if not table_name_insert or not input_model or not table_name_select or not output_model:
return mk_resp(data=False, status_code=500, response=commons.response, status_message=f"Configuration for child object type '{child_obj_name}' is incomplete.")
# Inject the parent ID into the child object's data
parent_fk_field_name = f'{parent_obj_name}_id'
obj_data[parent_fk_field_name] = resolved_parent_id
# Validate incoming data with the appropriate Pydantic model
try:
validated_obj = input_model(**obj_data)
except Exception as e:
log.warning(f"Validation error for {child_obj_name}: {e}")
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Validation error: {e}")
# Convert to dict, excluding unset fields, for database insertion
data_to_insert = validated_obj.dict(exclude_unset=True)
if sql_insert_result := sql_insert(data=data_to_insert, table_name=table_name_insert):
new_obj_id = sql_insert_result
new_obj_id_random = get_id_random(record_id=new_obj_id, table_name=child_obj_name)
if return_obj:
if sql_select_result := sql_select(table_name=table_name_select, record_id=new_obj_id):
resp_data = output_model(**sql_select_result).dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset)
return mk_resp(data=resp_data, response=commons.response)
else:
return mk_resp(data={"obj_id": new_obj_id, "obj_id_random": new_obj_id_random}, status_code=404, response=commons.response, status_message="Child object created but could not be retrieved.")
else:
return mk_resp(data={"obj_id": new_obj_id, "obj_id_random": new_obj_id_random}, response=commons.response)
else:
return mk_resp(data=False, status_code=400, response=commons.response, status_message="Failed to create child object in database.")
@router.get('/{parent_obj_type}/{parent_obj_id}/{child_obj_type}/{child_obj_id}', response_model=Resp_Body_Base)
async def get_child_obj(
parent_obj_type: str = Path(min_length=2, max_length=50),
parent_obj_id: str = Path(min_length=11, max_length=22),
child_obj_type: str = Path(min_length=2, max_length=50),
child_obj_id: str = Path(min_length=11, max_length=22),
x_delay_ms: Optional[int] = Header(0, alias='X-Delay-ms'),
delay_ms: Optional[int] = Query(0),
commons: Common_Route_Params = Depends(common_route_params),
):
"""
Get a single child object by its ID, ensuring it belongs to the correct parent.
Examples:
- /v3/crud/journal/{journal_id}/journal_entry/{entry_id}
"""
log.setLevel(logging.WARNING)
log.debug(locals())
sleep_time = max(x_delay_ms, delay_ms)
if sleep_time > 0:
log.info(f"Delaying response for {sleep_time} ms.")
time.sleep(sleep_time / 1000)
parent_obj_name = parent_obj_type
child_obj_name = child_obj_type
if parent_obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Parent object type '{parent_obj_name}' not found.")
if child_obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Child object type '{child_obj_name}' not found.")
resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_obj_name)
if not resolved_parent_id:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Parent object '{parent_obj_name}' with ID '{parent_obj_id}' not found.")
obj_cfg = obj_type_kv_li[child_obj_name]
table_name = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
base_name = obj_cfg.get('mdl_default', obj_cfg.get('mdl'))
if not table_name or not base_name:
return mk_resp(data=False, status_code=500, response=commons.response, status_message=f"Configuration for object type '{child_obj_name}' is incomplete.")
resolved_child_id = redis_lookup_id_random(record_id_random=child_obj_id, table_name=child_obj_name)
if not resolved_child_id:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Child object with ID '{child_obj_id}' not found.")
if sql_result := sql_select(table_name=table_name, record_id=resolved_child_id):
# Verify the child belongs to the parent
parent_fk_field_name = f'{parent_obj_name}_id'
if sql_result.get(parent_fk_field_name) != resolved_parent_id:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Child object '{child_obj_id}' not found under parent '{parent_obj_id}'.")
resp_data = base_name(**sql_result).dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset)
return mk_resp(data=resp_data, response=commons.response)
else:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Child object with ID '{child_obj_id}' not found in database.")
@router.patch('/{parent_obj_type}/{parent_obj_id}/{child_obj_type}/{child_obj_id}', response_model=Resp_Body_Base)
async def patch_child_obj(
request: Request,
parent_obj_type: str = Path(min_length=2, max_length=50),
parent_obj_id: str = Path(min_length=11, max_length=22),
child_obj_type: str = Path(min_length=2, max_length=50),
child_obj_id: str = Path(min_length=11, max_length=22),
return_obj: Optional[bool] = True,
x_delay_ms: Optional[int] = Header(0, alias='X-Delay-ms'),
delay_ms: Optional[int] = Query(0),
commons: Common_Route_Params = Depends(common_route_params),
):
"""
Update a child object by its ID, ensuring it belongs to the correct parent.
Examples:
- PATCH /v3/crud/journal/{journal_id}/journal_entry/{entry_id}
"""
log.setLevel(logging.WARNING)
log.debug(locals())
sleep_time = max(x_delay_ms, delay_ms)
if sleep_time > 0:
log.info(f"Delaying response for {sleep_time} ms.")
time.sleep(sleep_time / 1000)
obj_data = await request.json()
parent_obj_name = parent_obj_type
child_obj_name = child_obj_type
if parent_obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Parent object type '{parent_obj_name}' not found.")
if child_obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Child object type '{child_obj_name}' not found.")
# Resolve IDs
resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_obj_name)
if not resolved_parent_id:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Parent object '{parent_obj_name}' with ID '{parent_obj_id}' not found.")
resolved_child_id = redis_lookup_id_random(record_id_random=child_obj_id, table_name=child_obj_name)
if not resolved_child_id:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Child object '{child_obj_name}' with ID '{child_obj_id}' not found.")
# Get config for child object
obj_cfg = obj_type_kv_li[child_obj_name]
table_name_update = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl'))
output_model = obj_cfg.get('mdl_out', obj_cfg.get('mdl_default', obj_cfg.get('mdl')))
if not table_name_update or not table_name_select or not output_model:
return mk_resp(data=False, status_code=500, response=commons.response, status_message=f"Configuration for child object type '{child_obj_name}' is incomplete.")
# Verify parentage before updating
if existing_child := sql_select(table_name=table_name_select, record_id=resolved_child_id):
parent_fk_field_name = f'{parent_obj_name}_id'
if existing_child.get(parent_fk_field_name) != resolved_parent_id:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Child object '{child_obj_id}' not found under parent '{parent_obj_id}'.")
else:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Child object '{child_obj_id}' not found.")
# The sql_update function will only update the fields provided in the dict.
data_to_update = obj_data
if sql_update(data=data_to_update, table_name=table_name_update, record_id=resolved_child_id):
if return_obj:
if updated_child := sql_select(table_name=table_name_select, record_id=resolved_child_id):
resp_data = output_model(**updated_child).dict(by_alias=commons.by_alias, exclude_unset=commons.exclude_unset)
return mk_resp(data=resp_data, response=commons.response)
else:
return mk_resp(data=True, status_code=404, response=commons.response, status_message="Object updated but could not be retrieved post-update.")
else:
return mk_resp(data=True, response=commons.response, status_message="Object updated successfully.")
else:
return mk_resp(data=False, status_code=400, response=commons.response, status_message="Failed to update object in database.")
@router.delete('/{parent_obj_type}/{parent_obj_id}/{child_obj_type}/{child_obj_id}', response_model=Resp_Body_Base)
async def delete_child_obj(
parent_obj_type: str = Path(min_length=2, max_length=50),
parent_obj_id: str = Path(min_length=11, max_length=22),
child_obj_type: str = Path(min_length=2, max_length=50),
child_obj_id: str = Path(min_length=11, max_length=22),
method: str = 'delete', # delete, disable, hide
x_delay_ms: Optional[int] = Header(0, alias='X-Delay-ms'),
delay_ms: Optional[int] = Query(0),
commons: Common_Route_Params = Depends(common_route_params),
):
"""
Delete a child object by its ID, ensuring it belongs to the correct parent.
Examples:
- DELETE /v3/crud/journal/{journal_id}/journal_entry/{entry_id}
- DELETE /v3/crud/journal/{journal_id}/journal_entry/{entry_id}?method=disable
"""
log.setLevel(logging.WARNING)
log.debug(locals())
sleep_time = max(x_delay_ms, delay_ms)
if sleep_time > 0:
log.info(f"Delaying response for {sleep_time} ms.")
time.sleep(sleep_time / 1000)
parent_obj_name = parent_obj_type
child_obj_name = child_obj_type
if parent_obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Parent object type '{parent_obj_name}' not found.")
if child_obj_name not in obj_type_kv_li:
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Child object type '{child_obj_name}' not found.")
# Resolve IDs
resolved_parent_id = redis_lookup_id_random(record_id_random=parent_obj_id, table_name=parent_obj_name)
if not resolved_parent_id:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Parent object '{parent_obj_name}' with ID '{parent_obj_id}' not found.")
resolved_child_id = redis_lookup_id_random(record_id_random=child_obj_id, table_name=child_obj_name)
if not resolved_child_id:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Child object '{child_obj_name}' with ID '{child_obj_id}' not found.")
# Get config for child object
obj_cfg = obj_type_kv_li[child_obj_name]
table_name = obj_cfg.get('tbl_update', obj_cfg.get('tbl'))
table_name_select = obj_cfg.get('tbl_default', obj_cfg.get('tbl')) # For verification
if not table_name or not table_name_select:
return mk_resp(data=False, status_code=500, response=commons.response, status_message=f"Configuration for child object type '{child_obj_name}' is incomplete.")
# Verify parentage before deleting
if existing_child := sql_select(table_name=table_name_select, record_id=resolved_child_id):
parent_fk_field_name = f'{parent_obj_name}_id'
if existing_child.get(parent_fk_field_name) != resolved_parent_id:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Child object '{child_obj_id}' not found under parent '{parent_obj_id}'.")
else:
return mk_resp(data=False, status_code=404, response=commons.response, status_message=f"Child object '{child_obj_id}' not found.")
# If verification passes, perform the action
if method == 'disable':
sql_result = sql_update(data={'enable': False}, table_name=table_name, record_id=resolved_child_id)
elif method == 'hide':
sql_result = sql_update(data={'hide': True}, table_name=table_name, record_id=resolved_child_id)
else: # Default to hard delete
sql_result = sql_delete(table_name=table_name, record_id=resolved_child_id)
if sql_result:
return mk_resp(data=True, response=commons.response, status_message=f"Object with ID '{child_obj_id}' action '{method}' completed successfully.")
else:
return mk_resp(data=False, status_code=400, response=commons.response, status_message=f"Failed to perform '{method}' on object in database.")