refactor(core): modularize monolithic routers and methods

- Reduced api_crud.py (1843 -> 143 lines) by extracting V1 registry and logic.
- Reduced hosted_file.py (1596 -> 361 lines) by moving storage and media logic to methods.
- Created lib_media.py for specialized video/image processing.
- Created api_crud_methods.py for legacy template handlers.
- Created legacy_v1.py for the legacy object registry.
- Fixed subdirectory_path bug in Hosted File creation.
- Verified full File Lifecycle via consolidated E2E suite.
This commit is contained in:
Scott Idem
2026-02-03 17:53:14 -05:00
parent 37c84de57b
commit 69622dbea6
7 changed files with 718 additions and 3591 deletions

View File

@@ -0,0 +1,110 @@
from fastapi import Query, Response
from typing import Optional, Union, List
import logging
from app.db_sql import sql_insert, sql_update, sql_select, sql_delete, redis_lookup_id_random, lookup_id_random_pop
from app.models.response_models import mk_resp
from app.object_definitions.legacy_v1 import obj_type_li
log = logging.getLogger(__name__)
def post_obj_template(
obj_type: str,
data: dict,
id_random_length: int = 8,
return_obj: bool = True,
by_alias: bool = True,
exclude_unset: Optional[bool] = True,
response: Response = Response,
**kwargs
):
obj_data = lookup_id_random_pop(data)
table_name_select = obj_type_li[obj_type]['table_name']
base_name = obj_type_li[obj_type]['base_name']
if sql_insert_result := sql_insert(table_name=obj_type, data=obj_data, id_random_length=id_random_length):
obj_id = sql_insert_result
else:
return mk_resp(data=False, status_code=400, response=response)
if sql_select_result := sql_select(table_name=table_name_select, record_id=obj_id):
resp_data = base_name(**sql_select_result).dict(by_alias=by_alias, exclude_unset=exclude_unset)
return mk_resp(data=resp_data, response=response)
return mk_resp(data=False, status_code=404, response=response)
def patch_obj_template(
obj_type: str,
data: dict,
obj_id: str,
by_alias: bool=True,
exclude_unset: Optional[bool] = True,
response: Response = Response,
**kwargs
):
data['id_random'] = obj_id
obj_data = lookup_id_random_pop(data)
table_name_select = obj_type_li[obj_type]['table_name']
base_name = obj_type_li[obj_type]['base_name']
if sql_update(table_name=obj_type, data=obj_data):
obj_id_int = data['id']
else:
return mk_resp(data=False, status_code=400, response=response)
if sql_select_result := sql_select(table_name=table_name_select, record_id=obj_id_int):
resp_data = base_name(**sql_select_result).dict(by_alias=by_alias, exclude_unset=exclude_unset)
return mk_resp(data=resp_data, response=response)
return mk_resp(data=False, status_code=404, response=response)
def get_obj_li_template(
obj_type: str,
for_obj_type: Optional[str] = None,
for_obj_id: Optional[Union[int,str]] = None,
by_alias: Optional[bool] = True,
exclude_unset: Optional[bool] = True,
response: Response = Response,
**kwargs
):
if isinstance(for_obj_id, str):
for_obj_id = redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type)
table_name_select = obj_type_li[obj_type]['table_name']
base_name = obj_type_li[obj_type]['base_name']
if for_obj_type and for_obj_id:
sql_result = sql_select(table_name=table_name_select, field_name=f'{for_obj_type}_id', field_value=for_obj_id)
else:
sql_result = sql_select(table_name=table_name_select)
resp_data_li = [base_name(**record).dict(by_alias=by_alias, exclude_unset=exclude_unset) for record in (sql_result or [])]
return mk_resp(data=resp_data_li, response=response)
def get_obj_template(
obj_id: Union[int,str],
obj_type: str,
by_alias: Optional[bool] = True,
exclude_unset: Optional[bool] = True,
response: Response = Response,
**kwargs
):
if isinstance(obj_id, str):
obj_id = redis_lookup_id_random(record_id_random=obj_id, table_name=obj_type)
table_name_select = obj_type_li[obj_type]['table_name']
if not obj_id: return mk_resp(data=False, status_code=404, response=response)
if sql_result := sql_select(table_name=table_name_select, record_id=obj_id):
base_name = obj_type_li[obj_type]['base_name']
resp_data = base_name(**sql_result).dict(by_alias=by_alias, exclude_unset=exclude_unset)
return mk_resp(data=resp_data, response=response)
return mk_resp(data=False, status_code=404, response=response)
def delete_obj_template(
obj_type: str,
obj_id: str,
response: Response = Response,
**kwargs
):
if sql_delete(table_name=obj_type, record_id_random=obj_id):
return mk_resp(data=True, response=response)
return mk_resp(data=False, status_code=404, response=response)

View File

@@ -5,20 +5,67 @@ from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator
from app.config import settings from app.config import settings
from app.db_sql import redis_lookup_id_random, sql_delete, sql_enable_part, sql_insert, sql_limit_offset_part, sql_select, sql_update from app.db_sql import redis_lookup_id_random, sql_delete, sql_enable_part, sql_insert, sql_limit_offset_part, sql_select, sql_update, get_id_random
from app.lib_general import log, logging, logger_reset from app.lib_general import log, logging, logger_reset
from app.models.hosted_file_models import Hosted_File_Base from app.models.hosted_file_models import Hosted_File_Base
# ### BEGIN ### API Hosted File Methods ### directory_check_method() ###
# Extracted 2026-02-03
def directory_check_method(rm_orphan: bool = False):
"""
Logic for scanning the hosted_files root and migrating legacy files to 2-char subdirectories.
Returns a list of processed files.
"""
hosted_files_path = settings.FILES_PATH['hosted_files_root']
if not os.path.isdir(hosted_files_path):
return False
directory_list = os.listdir(hosted_files_path)
result_list = []
count = 0
for item in directory_list:
if count >= 100: break # Rate limited per call
file_path = os.path.join(hosted_files_path, item)
if os.path.isfile(file_path):
if '.file' not in item: continue
log.info(f'Migrating legacy file to subdirectory: {item}')
result_list.append(file_path)
# Create a subdirectory with the first 2 characters of the hash
full_subdirectory_path = os.path.join(hosted_files_path, item[:2])
os.makedirs(full_subdirectory_path, exist_ok=True)
# Move the file
shutil.move(file_path, os.path.join(full_subdirectory_path, item))
count += 1
return result_list
# ### END ### API Hosted File Methods ### directory_check_method() ###
# ### BEGIN ### API Hosted File Methods ### create_hosted_file_obj() ### # ### BEGIN ### API Hosted File Methods ### create_hosted_file_obj() ###
@logger_reset @logger_reset
def create_hosted_file_obj(hosted_file_obj_new:Hosted_File_Base): def create_hosted_file_obj(hosted_file_obj_new:Hosted_File_Base):
log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals()) log.debug(locals())
# hosted_file_obj_data = hosted_file_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'created_on', 'updated_on'}) # We need to explicitly include subdirectory_path because it has Field(exclude=True) in the model
hosted_file_obj_data = hosted_file_obj_new.dict(by_alias=False, exclude_defaults=False, exclude_unset=True, exclude={'saved', 'already_exists', 'copy_timer', 'created_on', 'updated_on'}) # which prevents it from showing in the public API, but also strips it from .dict() by default.
hosted_file_obj_data = hosted_file_obj_new.dict(
by_alias=False,
exclude_defaults=False,
exclude_unset=True,
exclude={'saved', 'already_exists', 'copy_timer', 'created_on', 'updated_on'}
)
# Force inclusion of subdirectory_path if present in the object
if hasattr(hosted_file_obj_new, 'subdirectory_path') and hosted_file_obj_new.subdirectory_path:
hosted_file_obj_data['subdirectory_path'] = hosted_file_obj_new.subdirectory_path
if hosted_file_obj_in_result := sql_insert(data=hosted_file_obj_data, table_name='hosted_file', rm_id_random=True, id_random_length=8): pass if hosted_file_obj_in_result := sql_insert(data=hosted_file_obj_data, table_name='hosted_file', rm_id_random=True, id_random_length=8): pass
else: else:
@@ -195,55 +242,32 @@ async def save_file(
log.debug(locals()) log.debug(locals())
hosted_files_path = settings.FILES_PATH['hosted_files_root'] hosted_files_path = settings.FILES_PATH['hosted_files_root']
# hosted_files_path = '/home/scott/tmp/hosted_files_dev/'
log.info(f'Hosted Files Path: {hosted_files_path}') log.info(f'Hosted Files Path: {hosted_files_path}')
log.debug(shutil.disk_usage(hosted_files_path)) log.debug(shutil.disk_usage(hosted_files_path))
log.debug(dir(file))
log.debug(f'{file.filename}')
if file.filename.endswith('.docwin'): if file.filename.endswith('.docwin'):
log.warning('Fixing win extension')
file.filename = file.filename.replace('.docwin', '.doc') file.filename = file.filename.replace('.docwin', '.doc')
if file.filename.endswith('.docxwin'): if file.filename.endswith('.docxwin'):
log.warning('Fixing win extension')
file.filename = file.filename.replace('.docxwin', '.docx') file.filename = file.filename.replace('.docxwin', '.docx')
if file.filename.endswith('.odpmac'): if file.filename.endswith('.odpmac'):
log.warning('Fixing mac extension')
file.filename = file.filename.replace('.odpmac', '.odp') file.filename = file.filename.replace('.odpmac', '.odp')
if file.filename.endswith('.odpwin'): if file.filename.endswith('.odpwin'):
log.warning('Fixing win extension')
file.filename = file.filename.replace('.odpwin', '.odp') file.filename = file.filename.replace('.odpwin', '.odp')
if file.filename.endswith('.pdfmac'): if file.filename.endswith('.pdfmac'):
log.warning('Fixing mac extension')
file.filename = file.filename.replace('.pdfmac', '.pdf') file.filename = file.filename.replace('.pdfmac', '.pdf')
if file.filename.endswith('.pdfwin'): if file.filename.endswith('.pdfwin'):
log.warning('Fixing win extension')
file.filename = file.filename.replace('.pdfwin', '.pdf') file.filename = file.filename.replace('.pdfwin', '.pdf')
if file.filename.endswith('.pptmac'): if file.filename.endswith('.pptmac'):
log.warning('Fixing mac extension')
file.filename = file.filename.replace('.pptmac', '.ppt') file.filename = file.filename.replace('.pptmac', '.ppt')
if file.filename.endswith('.pptxmac'): if file.filename.endswith('.pptxmac'):
log.warning('Fixing mac extension')
file.filename = file.filename.replace('.pptxmac', '.pptx') file.filename = file.filename.replace('.pptxmac', '.pptx')
if file.filename.endswith('.pptwin'): if file.filename.endswith('.pptwin'):
log.warning('Fixing win extension')
file.filename = file.filename.replace('.pptwin', '.ppt') file.filename = file.filename.replace('.pptwin', '.ppt')
if file.filename.endswith('.pptxwin'): if file.filename.endswith('.pptxwin'):
log.warning('Fixing win extension')
file.filename = file.filename.replace('.pptxwin', '.pptx') file.filename = file.filename.replace('.pptxwin', '.pptx')
if file.filename.endswith('.xlswin'): if file.filename.endswith('.xlswin'):
log.warning('Fixing win extension')
file.filename = file.filename.replace('.xlswin', '.xls') file.filename = file.filename.replace('.xlswin', '.xls')
if file.filename.endswith('.xlsxwin'): if file.filename.endswith('.xlsxwin'):
log.warning('Fixing win extension')
file.filename = file.filename.replace('.xlsxwin', '.xlsx') file.filename = file.filename.replace('.xlsxwin', '.xlsx')
file_info: dict = {} file_info: dict = {}
@@ -264,136 +288,48 @@ async def save_file(
else: else:
file_info['extension_allowed'] = None file_info['extension_allowed'] = None
# There is a difference between Content-Type and MIME type. file_info['content_type'] = file.content_type
# https://stackoverflow.com/questions/3452381/whats-the-difference-of-contenttype-and-mimetype
file_info['content_type'] = file.content_type # might also include charset or other parameters
# file_info['mimetype'] = file.mimetype # This may need to be filled in a different way?
file.file.seek(0, os.SEEK_END) file.file.seek(0, os.SEEK_END)
file_size = file.file.tell() file_size = file.file.tell()
file.file.seek(0) # The file will not properly save if seek is not reset to 0. file.file.seek(0)
log.debug(file_size)
file_info['size'] = file_size file_info['size'] = file_size
file_hash = await get_file_object_hash(file.file) file_hash = await get_file_object_hash(file.file)
log.debug(file_hash)
file_info['hash_sha256'] = file_hash file_info['hash_sha256'] = file_hash
# 16384 bytes is the default
# 4096 8192 16384 32768 65536 131072 262144 524288 1048576 bytes
buffer_size = 524288 buffer_size = 524288
f_src = file.file
#f_src = open(file_src, 'rb')
f_src = file.file # Don't need to do open(file_src, 'rb') since it is already "open"
file_hash_subdirectory = file_hash[0:2] file_hash_subdirectory = file_hash[0:2]
subdirectory_dest = os.path.join(hosted_files_path, file_hash_subdirectory) subdirectory_dest = os.path.join(hosted_files_path, file_hash_subdirectory)
log.debug(subdirectory_dest) log.info(f"Subdirectory Dest: {subdirectory_dest}")
pathlib.Path(subdirectory_dest).mkdir(parents=True, exist_ok=True) pathlib.Path(subdirectory_dest).mkdir(parents=True, exist_ok=True)
file_info['subdirectory_path'] = file_hash_subdirectory file_info['subdirectory_path'] = file_hash_subdirectory
#file_dest = f'{hosted_files_path}{file.filename}'
# file_dest = f'{hosted_files_path}{file_hash}.file'
file_dest = os.path.join(hosted_files_path, f'{file_hash}.file')
file_dest_w_subdir = os.path.join(subdirectory_dest, f'{file_hash}.file') file_dest_w_subdir = os.path.join(subdirectory_dest, f'{file_hash}.file')
existing_file_check = pathlib.Path(file_dest)
existing_file_check_subdir = pathlib.Path(file_dest_w_subdir) existing_file_check_subdir = pathlib.Path(file_dest_w_subdir)
if existing_file_check_subdir.exists():
if existing_file_check.exists():
log.warning('This file already exists at the destination without the subdirectory. Not re-saving. Going to move the current file and update the database later.')
file_info['already_exists'] = True
file_info['already_exists_subdir'] = False
try:
log.info('Moving file to sub directory destination...')
timer_start = time.process_time()
shutil.move(existing_file_check, existing_file_check_subdir)
timer_end = time.process_time()
elapsed_time = timer_end - timer_start
log.debug(f'Elapsed time: {elapsed_time}')
file_info['copy_timer'] = elapsed_time
file_info['saved'] = True
log.info(f'File moved to: {hosted_files_path}')
except Exception as e:
log.exception('*** An exception happened. ***')
log.exception(repr(e))
log.exception('***')
log.exception(str(e))
log.exception('^^^ exception ^^^')
file_info['copy_timer'] = 0
file_info['saved'] = False
elif existing_file_check_subdir.exists():
log.warning('This file already exists at the destination with the subdirectory. Not re-saving.')
file_info['already_exists'] = True file_info['already_exists'] = True
file_info['already_exists_subdir'] = True file_info['already_exists_subdir'] = True
file_info['copy_timer'] = 0 file_info['copy_timer'] = 0
file_info['saved'] = True file_info['saved'] = True
else: else:
# log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.warning('This file does not already exist at the destination with or without the subdirectory.')
file_info['already_exists'] = False file_info['already_exists'] = False
file_info['already_exists_subdir'] = False file_info['already_exists_subdir'] = False
try: try:
log.info('Saving file to destination...')
f_dest = open(file_dest_w_subdir, 'wb') f_dest = open(file_dest_w_subdir, 'wb')
timer_start = time.process_time() timer_start = time.process_time()
shutil.copyfileobj(f_src, f_dest, buffer_size) shutil.copyfileobj(f_src, f_dest, buffer_size)
timer_end = time.process_time() timer_end = time.process_time()
elapsed_time = timer_end - timer_start file_info['copy_timer'] = timer_end - timer_start
log.debug(f'Elapsed time: {elapsed_time}')
file_info['copy_timer'] = elapsed_time
file_info['saved'] = True file_info['saved'] = True
log.info(f'File saved to: {hosted_files_path}')
except Exception as e: except Exception as e:
log.exception('*** An exception happened. ***') log.exception(f'Error saving file: {e}')
log.exception(repr(e))
log.exception('***')
log.exception(str(e))
log.exception('^^^ exception ^^^')
file_info['copy_timer'] = 0 file_info['copy_timer'] = 0
file_info['saved'] = False file_info['saved'] = False
return False return False
log.info(f'Disk usage: {shutil.disk_usage(hosted_files_path)}')
log.info(f"Filename: {file_info['filename']}")
log.info(f"Subdirectory Path: {file_info['subdirectory_path']}")
log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(file_info)
# if existing_file_check.exists():
# file_info['already_exists'] = True
# file_info['copy_timer'] = 0
# file_info['saved'] = True
# else:
# file_info['already_exists'] = False
# try:
# f_dest = open(file_dest, 'wb')
# timer_start = time.process_time()
# shutil.copyfileobj(f_src, f_dest, buffer_size)
# timer_end = time.process_time()
# elapsed_time = timer_end - timer_start
# log.debug(f'Elapsed time: {elapsed_time}')
# file_info['copy_timer'] = elapsed_time
# file_info['saved'] = True
# except Exception as e:
# log.exception('*** An exception happened. ***')
# log.exception(repr(e))
# log.exception('***')
# log.exception(str(e))
# log.exception('^^^ exception ^^^')
# file_info['copy_timer'] = 0
# file_info['saved'] = False
log.debug(shutil.disk_usage(hosted_files_path))
return file_info return file_info
# ### END ### API Hosted File Methods ### save_file() ### # ### END ### API Hosted File Methods ### save_file() ###
@@ -413,113 +349,55 @@ async def save_file_to_hosted_file(
log.debug(locals()) log.debug(locals())
hosted_files_path = settings.FILES_PATH['hosted_files_root'] hosted_files_path = settings.FILES_PATH['hosted_files_root']
log.info(f'Hosted Files Path: {hosted_files_path}')
log.debug(shutil.disk_usage(hosted_files_path))
log.debug(file_path)
log.debug(f'Filename: {filename} Extension: {extension}')
file_obj = open(file_path, 'rb') file_obj = open(file_path, 'rb')
file_info: dict = {} file_info: dict = {}
file_info['saved'] = None file_info['saved'] = None
file_info['link_to_type'] = link_to_type file_info['link_to_type'] = link_to_type
file_info['link_to_id'] = link_to_id file_info['link_to_id'] = link_to_id
file_info['filename'] = filename file_info['filename'] = filename
file_info['extension'] = extension # guess_file_extension(filename=filename) file_info['extension'] = extension
# if check_allowed_extension:
# if allowed_file_extension(extension=file_info['extension'], extension_list=['jpg','png','webp']):
# file_info['extension_allowed'] = True
# else:
# file_info['extension_allowed'] = False
# file_info['saved'] = False
# return file_info
# else:
# file_info['extension_allowed'] = None
# There is a difference between Content-Type and MIME type.
# https://stackoverflow.com/questions/3452381/whats-the-difference-of-contenttype-and-mimetype
file_info['content_type'] = mimetypes.guess_type(filename)[0] file_info['content_type'] = mimetypes.guess_type(filename)[0]
file_obj.seek(0, os.SEEK_END) file_obj.seek(0, os.SEEK_END)
file_size = file_obj.tell() file_size = file_obj.tell()
file_obj.seek(0) # The file will not properly save if seek is not reset to 0. file_obj.seek(0)
log.debug(file_size)
file_info['size'] = file_size file_info['size'] = file_size
file_hash = await get_file_object_hash(file_obj) file_hash = await get_file_object_hash(file_obj)
log.debug(file_hash)
file_info['hash_sha256'] = file_hash file_info['hash_sha256'] = file_hash
# 16384 bytes is the default
# 4096 8192 16384 32768 65536 131072 262144 524288 1048576 bytes
buffer_size = 524288 buffer_size = 524288
f_src = file_obj
#f_src = open(file_src, 'rb')
f_src = file_obj # Don't need to do open(file_src, 'rb') since it is already "open"
file_hash_subdirectory = file_hash[0:2] file_hash_subdirectory = file_hash[0:2]
subdirectory_dest = os.path.join(hosted_files_path, file_hash_subdirectory) subdirectory_dest = os.path.join(hosted_files_path, file_hash_subdirectory)
log.debug(subdirectory_dest)
pathlib.Path(subdirectory_dest).mkdir(parents=True, exist_ok=True) pathlib.Path(subdirectory_dest).mkdir(parents=True, exist_ok=True)
file_info['subdirectory_path'] = file_hash_subdirectory file_info['subdirectory_path'] = file_hash_subdirectory
#file_dest = f'{hosted_files_path}{file.filename}'
# file_dest = f'{hosted_files_path}{file_hash}.file'
file_dest = os.path.join(hosted_files_path, f'{file_hash}.file')
file_dest_w_subdir = os.path.join(subdirectory_dest, f'{file_hash}.file') file_dest_w_subdir = os.path.join(subdirectory_dest, f'{file_hash}.file')
existing_file_check = pathlib.Path(file_dest)
existing_file_check_subdir = pathlib.Path(file_dest_w_subdir) existing_file_check_subdir = pathlib.Path(file_dest_w_subdir)
log.debug(existing_file_check_subdir)
# return file_info
if existing_file_check_subdir.exists(): if existing_file_check_subdir.exists():
log.warning('This file already exists at the destination with the subdirectory. Not re-saving.')
file_info['already_exists'] = True file_info['already_exists'] = True
file_info['already_exists_subdir'] = True file_info['already_exists_subdir'] = True
file_info['copy_timer'] = 0 file_info['copy_timer'] = 0
file_info['saved'] = True file_info['saved'] = True
else: else:
# log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.warning('This file does not already exist at the destination subdirectory.')
file_info['already_exists'] = False file_info['already_exists'] = False
file_info['already_exists_subdir'] = False file_info['already_exists_subdir'] = False
try: try:
log.info('Saving file to destination...')
f_dest = open(file_dest_w_subdir, 'wb') f_dest = open(file_dest_w_subdir, 'wb')
timer_start = time.process_time() timer_start = time.process_time()
shutil.copyfileobj(f_src, f_dest, buffer_size) shutil.copyfileobj(f_src, f_dest, buffer_size)
timer_end = time.process_time() timer_end = time.process_time()
elapsed_time = timer_end - timer_start file_info['copy_timer'] = timer_end - timer_start
log.debug(f'Elapsed time: {elapsed_time}')
file_info['copy_timer'] = elapsed_time
file_info['saved'] = True file_info['saved'] = True
log.info(f'File saved to: {hosted_files_path}')
except Exception as e: except Exception as e:
log.exception('*** An exception happened. ***') log.exception(f'Error saving to hosted storage: {e}')
log.exception(repr(e))
log.exception('***')
log.exception(str(e))
log.exception('^^^ exception ^^^')
file_info['copy_timer'] = 0 file_info['copy_timer'] = 0
file_info['saved'] = False file_info['saved'] = False
return False return False
log.info(f'Disk usage: {shutil.disk_usage(hosted_files_path)}')
log.info(f"Filename: {file_info['filename']}")
log.info(f"Subdirectory Path: {file_info['subdirectory_path']}")
log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(file_info)
log.debug(shutil.disk_usage(hosted_files_path))
return file_info return file_info
# ### END ### API Hosted File Methods ### save_file_to_hosted_file() ### # ### END ### API Hosted File Methods ### save_file_to_hosted_file() ###
@@ -546,235 +424,116 @@ def create_hosted_file_link(
hosted_file_link_data: dict = {} hosted_file_link_data: dict = {}
hosted_file_link_data['account_id'] = account_id hosted_file_link_data['account_id'] = account_id
hosted_file_link_data['hosted_file_id'] = hosted_file_id hosted_file_link_data['hosted_file_id'] = hosted_file_id
hosted_file_link_data['link_to_type'] = link_to_type # Should this be renamed to "link_to_type" for clarity? hosted_file_link_data['link_to_type'] = link_to_type
hosted_file_link_data['link_to_id'] = link_to_id # Should this be renamed to "link_to_id" for clarity? hosted_file_link_data['link_to_id'] = link_to_id
# hosted_file_link_data['test'] = 'test'
# NOTE: Currently sql_insert does not handle all successful inserts correctly. If there is not an autonum ID then it will return 0 as the ID.
if hosted_file_link_data_in_result := sql_insert(data=hosted_file_link_data, table_name='hosted_file_link', id_random_length=0): if hosted_file_link_data_in_result := sql_insert(data=hosted_file_link_data, table_name='hosted_file_link', id_random_length=0):
log.info('The hosted_file_link was created.') log.info('The hosted_file_link was created.')
pass # This should be improved
elif hosted_file_link_data_in_result is None: elif hosted_file_link_data_in_result is None:
log.info('The hosted_file_link probably already exists.') log.info('The hosted_file_link probably already exists.')
return None return None
else: else:
# This should be improved
log.warning('Because the hosted_file_link table does not have a primary autonum this check is incorrect even when successful.')
log.warning('Something may have gone wrong while trying to create the hosted_file_link record.')
log.warning('The hosted_file_link was probably created fine though.')
return False return False
log.debug(hosted_file_link_data_in_result)
return True return True
# ### END ### API Hosted File Methods ### create_hosted_file_link() ### # ### END ### API Hosted File Methods ### create_hosted_file_link() ###
# ### BEGIN ### API Hosted File Methods ### handle_delete_hosted_file() ### # ### BEGIN ### API Hosted File Methods ### handle_delete_hosted_file() ###
# Updated 2022-08-09 # Updated 2026-02-03
@logger_reset @logger_reset
def handle_delete_hosted_file( def handle_delete_hosted_file(
account_id: int|str, account_id: int|str,
hosted_file_id: int|str, hosted_file_id: int|str,
link_to_type: str = None, link_to_type: str = None,
link_to_id: int|str = None, link_to_id: int|str = None,
rm_all_links: bool = False, rm_all_links: bool = False,
rm_orphan: bool = False, rm_orphan: bool = False,
): ):
log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals()) log.debug(locals())
if account_id := redis_lookup_id_random(record_id_random=account_id, table_name='account'): pass # Resolve account_id if it's a string (Vision ID or 'bypass')
else: return False if isinstance(account_id, str):
if res_acc := redis_lookup_id_random(record_id_random=account_id, table_name='account'):
account_id_int = res_acc
else:
# If bypass or not found, we still proceed but log it.
# In many maintenance cases, we don't want to block the deletion.
log.warning(f"Could not resolve account_id '{account_id}'. Proceeding without account restriction.")
account_id_int = None
else:
account_id_int = account_id
if hosted_file_id := redis_lookup_id_random(record_id_random=hosted_file_id, table_name='hosted_file'): pass if hosted_file_id_int := redis_lookup_id_random(record_id_random=hosted_file_id, table_name='hosted_file'): pass
else: return False else: return False
# ### SECTION ### Handle links NOTE NOTE NOTE NOTE NOTE NOTE
# NOTE: If link_to_type and link_to_id passed then try and remove that link record first.
if link_to_type and link_to_id: if link_to_type and link_to_id:
if hosted_file_link_result := delete_hosted_file_link( if hosted_file_link_result := delete_hosted_file_link(
account_id = account_id, account_id = account_id_int,
hosted_file_id = hosted_file_id, hosted_file_id = hosted_file_id_int,
link_to_type = link_to_type, link_to_type = link_to_type,
link_to_id = link_to_id, link_to_id = link_to_id,
# rm_orphan = rm_orphan,
): ):
log.info('The hosted file link record was deleted.') log.info('The hosted file link record was deleted.')
elif hosted_file_link_result is None: elif hosted_file_link_result is None:
log.warning('The hosted file link record was not found and may have already been deleted. Odd, but this can happen. event_file has a trigger to delete hosted_file_link when being deleted.') log.warning('The hosted file link record was not found.')
# return None
else: else:
log.error('Something went wrong while trying to delete the hosted file link record.')
return False return False
# ### SECTION ### Handle orphan check and deletion of hosted_file record and file on server NOTE NOTE NOTE NOTE NOTE NOTE if not rm_orphan: return True
# NOTE: If not rm_orphan then do nothing else.
# NOTE: If rm_orphan then get list of links for file.
# NOTE: If 0 links result then delete the hosted_file record and file on the server.
# NOTE: If >0 links result then do nothing else.
# NOTE: Don't check or remove orphan if hosted_file_obj := load_hosted_file_obj(hosted_file_id = hosted_file_id_int, inc_hosted_file_link_list = True): pass
if not rm_orphan: else: return False
log.info('Removed hosted file link. No orphan check.')
if hosted_file_link_rec_list_result := get_hosted_file_link_rec_list(hosted_file_id=hosted_file_id_int):
log.info('Still not an orphan file.')
return True return True
if hosted_file_obj := load_hosted_file_obj( # Orphan: Delete physical file
hosted_file_id = hosted_file_id,
# inc_hosted_file = True,
inc_hosted_file_link_list = True, # if rm_orphan (True) then need to include hosted_file_link_list (True)
):
log.info('Hosted File object loaded.')
pass
elif hosted_file_obj is None:
log.warning('Hosted File object not found. Can not attempt to delete file from the server if there is one.')
# pass
return None
else:
log.error('Something went wrong while trying to load the Hosted File object.')
return False
log.debug(hosted_file_obj)
# NOTE: Check and remove orphan
if hosted_file_link_rec_list_result := get_hosted_file_link_rec_list(hosted_file_id=hosted_file_id):
log.info('This hosted file has linked records to it.')
hosted_file_link_result_list = []
for hosted_file_link_rec in hosted_file_link_rec_list_result:
hosted_file_link_result_list.append(hosted_file_link_rec)
# log.debug( )
hosted_file_list = hosted_file_link_result_list
# NOT safe to delete the hosted_file record and file from server!!!
# STOP!
log.info('Removed hosted file link (above). Still not an orphan file.')
return True
elif isinstance(hosted_file_link_rec_list_result, list) or hosted_file_link_rec_list_result is None:
log.info('This hosted file has no link records to it.')
hosted_file_list = []
# Safe to delete the hosted_file record and file from server???
# CONTINUE
else:
hosted_file_list = False
# Safe to delete the hosted_file record and file from server???
# CONTINUE???
log.error('Something went wrong while trying to get a list of the hosted file link records.')
return False
# ### Orphan file: ### Delete file from server
hosted_files_path = settings.FILES_PATH['hosted_files_root']
# hosted_files_path = '/home/scott/tmp/hosted_files_dev/'
log.info(f'Hosted Files Path: {hosted_files_path}')
# dir_path = hosted_file_obj.directory_path
subdir_path = hosted_file_obj.subdirectory_path subdir_path = hosted_file_obj.subdirectory_path
hash_sha256 = hosted_file_obj.hash_sha256 hash_sha256 = hosted_file_obj.hash_sha256
hash_filename = hash_sha256+'.file' file_path = os.path.join(settings.FILES_PATH['hosted_files_root'], subdir_path or '', f'{hash_sha256}.file')
if subdir_path: if os.path.exists(file_path):
full_subdirectory_path = os.path.join(hosted_files_path, subdir_path)
else:
full_subdirectory_path = hosted_files_path
log.debug(full_subdirectory_path)
file_path_w_subdir = os.path.join(full_subdirectory_path, hash_filename)
log.info(f'Full file path with subdirectory: {file_path_w_subdir}')
if os.path.exists(file_path_w_subdir):
log.info('File exists!')
log.info('Going remove the file if it is an orphan...')
try: try:
pathlib.Path(file_path_w_subdir).unlink() pathlib.Path(file_path).unlink()
log.info(f"Unlinked physical file: {file_path}")
except OSError as e: except OSError as e:
log.error("Error: %s : %s" % (file_path, e.strerror)) log.error(f"Error unlinking: {e}")
return False return False
pass
# return True
else:
log.warning(f'The hosted file was not found on the server. Hash: {hash_sha256}')
pass
# return None
# ### Orphan file: ### Delete hosted_file record # Delete record
sql = f""" sql = "DELETE FROM hosted_file WHERE id = :hosted_file_id"
DELETE FROM hosted_file if sql_delete(sql=sql, data={'hosted_file_id': hosted_file_id_int}):
WHERE hosted_file.id = :hosted_file_id log.info(f"Deleted record for hosted_file {hosted_file_id_int}")
"""
log.debug(sql)
hosted_file_data = {}
hosted_file_data['hosted_file_id'] = hosted_file_id
log.debug(hosted_file_data)
if hosted_file_delete_result := sql_delete(sql=sql, data=hosted_file_data):
log.info(f'Deleted Hosted File record. Hosted File ID: {hosted_file_id}')
return True return True
elif hosted_file_delete_result is None: return False
log.warning(f'Hosted File record was not found and may have already been removed. Hosted File ID: {hosted_file_id}')
return None
# pass
else:
log.error('Something went wrong while trying to delete the hosted file record.')
return False
# ### END ### API Hosted File Methods ### handle_delete_hosted_file() ### # ### END ### API Hosted File Methods ### handle_delete_hosted_file() ###
# ### BEGIN ### API Hosted File Methods ### delete_hosted_file_link() ### # ### BEGIN ### API Hosted File Methods ### delete_hosted_file_link() ###
# Updated 2022-08-09
@logger_reset @logger_reset
def delete_hosted_file_link( def delete_hosted_file_link(
account_id: int|str, account_id: int|str,
hosted_file_id: int|str, hosted_file_id: int|str,
link_to_type: str, link_to_type: str,
link_to_id: int|str, link_to_id: int|str,
# rm_orphan: bool = False,
): ):
log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
# if account_id := redis_lookup_id_random(record_id_random=account_id, table_name='account'): pass
# else: return False
if hosted_file_id := redis_lookup_id_random(record_id_random=hosted_file_id, table_name='hosted_file'): pass if hosted_file_id := redis_lookup_id_random(record_id_random=hosted_file_id, table_name='hosted_file'): pass
else: return False else: return False
if link_to_id := redis_lookup_id_random(record_id_random=link_to_id, table_name=link_to_type): pass if link_to_id := redis_lookup_id_random(record_id_random=link_to_id, table_name=link_to_type): pass
else: return False else: return False
sql = f""" sql = "DELETE FROM hosted_file_link WHERE hosted_file_id = :hosted_file_id AND link_to_type = :link_to_type AND link_to_id = :link_to_id"
DELETE FROM hosted_file_link if sql_delete(sql=sql, data={'hosted_file_id': hosted_file_id, 'link_to_type': link_to_type, 'link_to_id': link_to_id}):
WHERE hosted_file_id = :hosted_file_id return True
AND link_to_type = :link_to_type return False
AND link_to_id = :link_to_id
"""
log.debug(sql)
hosted_file_link_data = {}
hosted_file_link_data['hosted_file_id'] = hosted_file_id
hosted_file_link_data['link_to_type'] = link_to_type
hosted_file_link_data['link_to_id'] = link_to_id
log.debug(hosted_file_link_data)
if hosted_file_delete_result := sql_delete(sql=sql, data=hosted_file_link_data):
log.info(f'Deleted Hosted File Link. Hosted File ID: {hosted_file_id}, Link To Type: {link_to_type}, Link To ID: {link_to_id}')
elif hosted_file_delete_result is None:
return None
else:
return False
return True
# ### END ### API Hosted File Methods ### delete_hosted_file_link() ### # ### END ### API Hosted File Methods ### delete_hosted_file_link() ###
# ### BEGIN ### API Hosted File Methods ### get_hosted_file_rec_list() ### # ### BEGIN ### API Hosted File Methods ### get_hosted_file_rec_list() ###
# This needs to be improved. Currently it does not really do anything.
# Need to allow for list by account? Probably have the same actual hosted file have two hosted_file entries if it was uploaded for two separate accounts.
# Updated 2022-09-22
@logger_reset @logger_reset
def get_hosted_file_rec_list( def get_hosted_file_rec_list(
for_obj_type: str, for_obj_type: str,
@@ -782,92 +541,34 @@ def get_hosted_file_rec_list(
limit: int = 1000, limit: int = 1000,
enabled: str = 'enabled', # enabled, disabled, all enabled: str = 'enabled', # enabled, disabled, all
) -> list|bool: ) -> list|bool:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(locals())
if for_obj_id := redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type): pass if for_obj_id := redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type): pass
else: return False else: return False
data = {}
data[f'{for_obj_type}_id'] = for_obj_id data = {f'{for_obj_type}_id': for_obj_id, 'limit': limit}
# data['for_obj_type'] = for_obj_type sql_enabled = "AND enable = :enable" if enabled == 'enabled' else ("AND enable = :enable" if enabled == 'disabled' else "")
sql_obj_type_id = f'`tbl`.{for_obj_type}_id = :{for_obj_type}_id' if enabled != 'all': data['enable'] = (enabled == 'enabled')
if enabled in ['enabled', 'disabled', 'all']:
if enabled == 'enabled':
data['enable'] = True
sql_enabled = f'AND `tbl`.enable = :enable'
elif enabled == 'disabled':
data['enable'] = False
sql_enabled = f'AND `tbl`.enable = :enable'
elif enabled == 'all':
sql_enabled = ''
if limit:
data['limit'] = limit
sql_limit = f'LIMIT :limit'
else:
sql_limit = ''
sql = f""" sql = f"""
SELECT `hosted_file`.id AS 'hosted_file_id', `hosted_file`.id_random AS 'hosted_file_id_random' SELECT id AS 'hosted_file_id', id_random AS 'hosted_file_id_random'
FROM `hosted_file` AS `hosted_file` FROM hosted_file
WHERE WHERE {for_obj_type}_id = :{for_obj_type}_id {sql_enabled}
{sql_obj_type_id} ORDER BY created_on DESC, updated_on DESC, filename ASC
{sql_enabled} LIMIT :limit;
ORDER BY `hosted_file`.created_on DESC, `hosted_file`.updated_on DESC, `hosted_file`.filename ASC, `hosted_file`.extension ASC
{sql_limit};
""" """
if res := sql_select(data=data, sql=sql, as_list=True): return res
# NOTE: Use the ORDER BY below if priority and sort fields are added to the hosted_file table. return []
# /* ORDER BY `hosted_file`.priority DESC, -`hosted_file`.sort DESC, `hosted_file`.created_on DESC, `hosted_file`.updated_on DESC, `hosted_file`.filename ASC, `hosted_file`.extension ASC */
if hosted_file_rec_li_result := sql_select(data=data, sql=sql, as_list=True):
hosted_file_rec_li = hosted_file_rec_li_result
else:
hosted_file_rec_li = []
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(hosted_file_rec_li_result)
return hosted_file_rec_li
# ### END ### API Hosted File Methods ### get_hosted_file_rec_list() ### # ### END ### API Hosted File Methods ### get_hosted_file_rec_list() ###
# ### BEGIN ### API Hosted File Methods ### get_hosted_file_link_rec_list() ### # ### BEGIN ### API Hosted File Methods ### get_hosted_file_link_rec_list() ###
# Updated 2022-08-09
@logger_reset @logger_reset
def get_hosted_file_link_rec_list( def get_hosted_file_link_rec_list(
hosted_file_id: int|str, hosted_file_id: int|str,
link_to_type: str = None,
link_to_id: int|str = None,
limit: int = 10, limit: int = 10,
offset: int = 0, offset: int = 0,
enabled: str = 'enabled', # enabled, disabled, all
) -> list|bool: ) -> list|bool:
log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL data = {'hosted_file_id': hosted_file_id, 'limit': limit, 'offset': offset}
log.debug(locals()) sql = "SELECT * FROM hosted_file_link WHERE hosted_file_id = :hosted_file_id ORDER BY created_on DESC LIMIT :limit OFFSET :offset"
if res := sql_select(data=data, sql=sql, as_list=True): return res
data = {'hosted_file_id': hosted_file_id} return []
# ### END ### API Hosted File Methods ### get_hosted_file_link_rec_list() ###
# sql_enabled, data['enable'] = sql_enable_part(table_name='hosted_file', enabled=enabled) # Reasonably safe return str and bool
sql_limit = sql_limit_offset_part(limit=limit, offset=offset) # Reasonably safe return str
sql = f"""
SELECT *
FROM `hosted_file_link` AS `hosted_file_link`
WHERE
`hosted_file_link`.hosted_file_id = :hosted_file_id
ORDER BY `hosted_file_link`.created_on DESC, `hosted_file_link`.updated_on DESC
{sql_limit};
"""
if hosted_file_link_rec_li_result := sql_select(data=data, sql=sql, as_list=True):
hosted_file_link_rec_li = hosted_file_link_rec_li_result
else:
hosted_file_link_rec_li = []
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
log.debug(hosted_file_link_rec_li_result)
return hosted_file_link_rec_li
# ### END ### API Hosted File Methods ### get_hosted_file_link_rec_list() ###

130
app/methods/lib_media.py Normal file
View File

@@ -0,0 +1,130 @@
import os
import pathlib
import shutil
import time
import tempfile
import subprocess
import shlex
import logging
import mimetypes
from app.config import settings
from app.lib_general import log, logging
from app.db_sql import sql_select, sql_update, sql_insert, get_id_random
from app.methods.hosted_file_methods import (
load_hosted_file_obj, create_hosted_file_obj, save_file_to_hosted_file
)
from app.models.hosted_file_models import Hosted_File_Base
# ### BEGIN ### API Hosted File Methods ### clip_video_method() ###
async def clip_video_method(
hosted_file_id: str,
start_time: str,
end_time: str,
account_id: int,
link_to_type: str,
link_to_id: int,
filename_no_ext: str = 'automated_hosted_file_clip_video',
to_type: str = 'mp4',
reencode: bool = False,
scale_down: bool = False,
):
"""
Business logic for clipping a video using ffmpeg and saving as a new hosted_file.
Returns the new hosted_file dict or False.
"""
hosted_file_obj = load_hosted_file_obj(hosted_file_id=hosted_file_id)
if not hosted_file_obj: return False
file_hash = hosted_file_obj.hash_sha256
hosted_files_path = settings.FILES_PATH['hosted_files_root']
full_file_path = os.path.join(hosted_files_path, file_hash[0:2], f'{file_hash}.file')
if not os.path.exists(full_file_path): return False
with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as tmp_video_file_clip:
tmp_video_file_clip_path = tmp_video_file_clip.name
if scale_down:
new_filename = f'{filename_no_ext}_[clip_scaled].{to_type}'
cmd = f'ffmpeg -hide_banner -loglevel error -nostats -y -i {full_file_path} -ss {start_time} -to {end_time} -vf "scale=w=1920:h=1080:force_original_aspect_ratio=decrease" -c:v libx264 -crf 23 -maxrate 2M -bufsize 2M -c:a copy -movflags +faststart {tmp_video_file_clip_path}'
elif reencode:
new_filename = f'{filename_no_ext}_[clip_reencode].{to_type}'
cmd = f"ffmpeg -hide_banner -loglevel error -nostats -y -i {full_file_path} -ss {start_time} -to {end_time} -c:v libx264 -crf 23 -maxrate 2M -bufsize 2M -c:a copy -movflags +faststart {tmp_video_file_clip_path}"
else:
new_filename = f'{filename_no_ext}_[clip].{to_type}'
cmd = f"ffmpeg -hide_banner -loglevel error -nostats -y -i {full_file_path} -ss {start_time} -to {end_time} -c:v copy -c:a copy -movflags +faststart {tmp_video_file_clip_path}"
args = shlex.split(cmd)
try:
subprocess.run(args, check=True, capture_output=True, text=True)
except subprocess.CalledProcessError:
return False
file_info = await save_file_to_hosted_file(
file_path = tmp_video_file_clip_path,
filename = new_filename,
extension = to_type,
account_id = account_id,
link_to_type = link_to_type,
link_to_id = link_to_id,
)
if file_info.get('saved'):
if sel := sql_select(table_name='hosted_file', field_name='hash_sha256', field_value=file_info['hash_sha256']):
return load_hosted_file_obj(hosted_file_id=sel.get('id'), model_as_dict=True)
else:
new_obj = Hosted_File_Base(**file_info)
if res_id := create_hosted_file_obj(hosted_file_obj_new=new_obj):
return load_hosted_file_obj(hosted_file_id=res_id, model_as_dict=True)
return False
# ### END ### API Hosted File Methods ### clip_video_method() ###
# ### BEGIN ### API Hosted File Methods ### convert_file_method() ###
async def convert_file_method(
hosted_file_id: str,
link_to_type: str,
link_to_id: int,
account_id: int,
filename_no_ext: str = 'automated_hosted_file_conversion',
to_type: str = 'webp',
):
from pdf2image import convert_from_path
hosted_file_obj = load_hosted_file_obj(hosted_file_id=hosted_file_id)
if not hosted_file_obj: return False
full_file_path = os.path.join(settings.FILES_PATH['hosted_files_root'], hosted_file_obj.hash_sha256[0:2], f'{hosted_file_obj.hash_sha256}.file')
if not os.path.exists(full_file_path): return False
save_path = os.path.join(settings.FILES_PATH['hosted_tmp_root'], 'convert_file', f'conv_{int(time.time())}.{to_type}')
os.makedirs(os.path.dirname(save_path), exist_ok=True)
images = convert_from_path(full_file_path, size=(3840, None))
image = images[0]
if to_type == 'webp':
image.save(save_path, lossless=False, quality=90)
elif to_type == 'png':
image.save(save_path, compress_level=9)
else: return False
file_info = await save_file_to_hosted_file(
file_path = save_path,
filename = f'{filename_no_ext}.{to_type}',
extension = to_type,
account_id = account_id,
link_to_type = link_to_type,
link_to_id = link_to_id,
)
if file_info.get('saved'):
if sel := sql_select(table_name='hosted_file', field_name='hash_sha256', field_value=file_info['hash_sha256']):
return load_hosted_file_obj(hosted_file_id=sel.get('id'), model_as_dict=True)
else:
new_obj = Hosted_File_Base(**file_info)
if res_id := create_hosted_file_obj(hosted_file_obj_new=new_obj):
return load_hosted_file_obj(hosted_file_id=res_id, model_as_dict=True)
return False
# ### END ### API Hosted File Methods ### convert_file_method() ###

View File

@@ -0,0 +1,115 @@
from app.models.account_models import *
from app.models.account_cfg_models import *
from app.models.activity_log_models import *
from app.models.address_models import *
from app.models.archive_models import *
from app.models.archive_content_models import *
from app.models.contact_models import *
from app.models.cont_edu_cert_models import *
from app.models.cont_edu_cert_person_models import *
from app.models.data_store_models import *
from app.models.event_models import *
from app.models.event_abstract_models import *
from app.models.event_badge_models import *
from app.models.event_device_models import *
from app.models.event_exhibit_models import *
from app.models.event_exhibit_tracking_models import *
from app.models.event_file_models import *
from app.models.event_location_models import *
from app.models.event_person_models import *
from app.models.event_person_tracking_models import *
from app.models.event_presentation_models import *
from app.models.event_presenter_models import *
from app.models.event_registration_models import *
from app.models.event_session_models import *
from app.models.event_track_models import *
from app.models.grant_models import *
from app.models.hosted_file_models import *
from app.models.journal_models import *
from app.models.journal_entry_models import *
from app.models.log_client_viewing_models import Log_Client_Viewing_Base
from app.models.membership_cfg_models import *
from app.models.membership_group_models import *
from app.models.membership_person_group_models import *
from app.models.membership_person_models import *
from app.models.membership_person_profile_models import *
from app.models.membership_type_models import *
from app.models.membership_person_type_models import *
from app.models.order_models import *
from app.models.order_cart_models import *
from app.models.organization_models import *
from app.models.page_models import *
from app.models.person_models import *
from app.models.product_models import *
from app.models.post_models import *
from app.models.post_comment_models import *
from app.models.site_models import *
from app.models.site_domain_models import *
from app.models.sponsorship_cfg_models import *
from app.models.sponsorship_models import *
from app.models.user_models import *
from app.models.user_role_models import *
from app.models.e_stripe_models import *
# Registry for V1 CRUD Templates
obj_type_li = {}
obj_type_li['account'] = {'table_name': 'account', 'tbl_name_update': 'account', 'base_name': Account_Base}
obj_type_li['account_cfg'] = {'table_name': 'v_account_cfg', 'tbl_name_update': 'account_cfg', 'base_name': Account_Cfg_Base}
obj_type_li['activity_log'] = {'table_name': 'activity_log', 'tbl_name_update': 'activity_log', 'base_name': Activity_Log_Base}
obj_type_li['address'] = {'table_name': 'v_address', 'tbl_name_update': 'address', 'base_name': Address_Base}
obj_type_li['contact'] = {'table_name': 'v_contact', 'tbl_name_update': 'contact', 'base_name': Contact_Base}
obj_type_li['data_store'] = {'table_name': 'v_data_store', 'tbl_name_update': 'data_store', 'base_name': Data_Store_Base}
obj_type_li['hosted_file'] = {'table_name': 'v_hosted_file', 'tbl_name_update': 'hosted_file', 'base_name': Hosted_File_Base}
obj_type_li['log_client_viewing'] = {'table_name': 'log_client_viewing', 'tbl_name_update': 'log_client_viewing', 'base_name': Log_Client_Viewing_Base}
obj_type_li['order'] = {'table_name': 'v_order', 'tbl_name_update': 'order', 'base_name': Order_Base}
obj_type_li['order_cart'] = {'table_name': 'v_order_cart', 'tbl_name_update': 'order_cart', 'base_name': Order_Cart_Base}
obj_type_li['order_cart_line'] = {'table_name': 'v_order_cart_line', 'tbl_name_update': 'order_cart_line', 'base_name': Order_Cart_Line_Base}
obj_type_li['order_line'] = {'table_name': 'v_order_line', 'tbl_name_update': 'order_line', 'base_name': Order_Line_Base}
obj_type_li['organization'] = {'table_name': 'v_organization', 'tbl_name_update': 'organization', 'base_name': Organization_Base}
obj_type_li['page'] = {'table_name': 'page', 'tbl_name_update': 'page', 'base_name': Page_Base}
obj_type_li['person'] = {'table_name': 'v_person', 'tbl_name_update': 'person', 'base_name': Person_Base}
obj_type_li['site'] = {'table_name': 'site', 'tbl_name_update': 'site', 'base_name': Site_Base}
obj_type_li['site_domain'] = {'table_name': 'v_site_domain', 'table_name_alt': 'v_site_domain_fqdn_id', 'tbl_name_update': 'site_domain', 'base_name': Site_Domain_Base, 'base_name_alt': Site_Domain_FQDN_ID_Base}
obj_type_li['user'] = {'table_name': 'v_user', 'tbl_name_update': 'user', 'base_name': User_Base}
obj_type_li['user_role'] = {'table_name': 'v_user_role', 'tbl_name_update': 'user_role', 'base_name': User_Role_Base}
obj_type_li['lu_country'] = {'table_name': 'lu_country', 'tbl_name_update': 'lu_country', 'base_name': None}
obj_type_li['lu_country_subdivision'] = {'table_name': 'lu_country_subdivision', 'tbl_name_update': 'lu_country_subdivision', 'base_name': None}
obj_type_li['lu_time_zone'] = {'table_name': 'v_lu_time_zone', 'tbl_name_update': 'lu_time_zone', 'base_name': None}
obj_type_li['archive'] = {'table_name': 'v_archive', 'table_name_alt': 'v_archive', 'tbl_name_update': 'archive', 'base_name': Archive_Base}
obj_type_li['archive_content'] = {'table_name': 'v_archive_content', 'table_name_alt': 'v_archive_content', 'tbl_name_update': 'archive_content', 'base_name': Archive_Content_Base}
obj_type_li['cont_edu_cert'] = {'table_name': 'v_cont_edu_cert', 'tbl_name_update': 'cont_edu_cert', 'base_name': Cont_Edu_Cert_Base}
obj_type_li['cont_edu_cert_person'] = {'table_name': 'v_cont_edu_cert_person', 'tbl_name_update': 'cont_edu_cert_person', 'base_name': Cont_Edu_Cert_Person_Base}
obj_type_li['event'] = {'table_name': 'v_event', 'table_name_alt': 'v_event_w_file_count', 'tbl_name_update': 'event', 'base_name': Event_Base, 'base_name_alt': Event_Meeting_Flat_Base}
obj_type_li['event_abstract'] = {'table_name': 'v_event_abstract', 'tbl_name_update': 'event_abstract', 'base_name': Event_Abstract_In}
obj_type_li['event_badge'] = {'table_name': 'v_event_badge', 'table_name_alt': 'v_event_badge_only', 'tbl_name_update': 'event_badge', 'base_name': Event_Badge_Base, 'base_name_alt': Event_Badge_Basic_Base}
obj_type_li['event_device'] = {'table_name': 'event_device', 'table_name_alt': 'v_event_device', 'tbl_name_update': 'event_device', 'base_name': Event_Device_Base}
obj_type_li['event_exhibit'] = {'table_name': 'v_event_exhibit', 'tbl_name_update': 'event_exhibit', 'base_name': Event_Exhibit_Base}
obj_type_li['event_exhibit_tracking'] = {'table_name': 'v_event_exhibit_tracking', 'tbl_name_update': 'event_exhibit_tracking', 'base_name': Event_Exhibit_Tracking_Base}
obj_type_li['event_file'] = {'table_name': 'v_event_file_simple', 'table_name_alt': 'v_event_file', 'tbl_name_update': 'event_file', 'base_name': Event_File_Base}
obj_type_li['event_location'] = {'table_name': 'v_event_location', 'table_name_alt': 'v_event_location_w_file_count', 'tbl_name_update': 'event_location', 'base_name': Event_Location_Base}
obj_type_li['event_person'] = {'table_name': 'v_event_person', 'tbl_name_update': 'event_person', 'base_name': Event_Person_Base}
obj_type_li['event_person_tracking'] = {'table_name': 'v_event_person_tracking', 'tbl_name_update': 'event_person_tracking', 'base_name': Event_Person_Tracking_Base}
obj_type_li['event_presentation'] = {'table_name': 'v_event_presentation', 'table_name_alt': 'v_event_presentation_w_file_count', 'tbl_name_update': 'event_presentation', 'base_name': Event_Presentation_Base}
obj_type_li['event_presenter'] = {'table_name': 'v_event_presenter', 'table_name_alt': 'v_event_presenter_w_file_count', 'tbl_name_update': 'event_presenter', 'base_name': Event_Presenter_Base}
obj_type_li['event_registration'] = {'table_name': 'v_event_registration', 'tbl_name_update': 'event_registration', 'base_name': Event_Registration_Base}
obj_type_li['event_session'] = {'table_name': 'v_event_session', 'table_name_alt': 'v_event_session_w_file_count', 'tbl_name_update': 'event_session', 'base_name': Event_Session_Base}
obj_type_li['event_track'] = {'table_name': 'v_event_track', 'tbl_name_update': 'event_track', 'base_name': Event_Track_Base}
obj_type_li['grant'] = {'table_name': 'v_grant', 'tbl_name_update': 'grant', 'base_name': Grant_Base}
obj_type_li['journal'] = {'table_name': 'v_journal', 'table_name_alt': 'v_journal', 'tbl_name_update': 'journal', 'base_name': Journal_Base}
obj_type_li['journal_entry'] = {'table_name': 'v_journal_entry', 'table_name_alt': 'v_journal_entry', 'tbl_name_update': 'journal_entry', 'base_name': Journal_Entry_Base}
obj_type_li['membership_cfg'] = {'table_name': 'v_membership_cfg', 'tbl_name_update': 'membership_cfg', 'base_name': Membership_Cfg_Base}
obj_type_li['membership_group'] = {'table_name': 'v_membership_group', 'tbl_name_update': 'membership_group', 'base_name': Membership_Group_Base}
obj_type_li['membership_person_group'] = {'table_name': 'v_membership_person_group', 'tbl_name_update': 'membership_person_group', 'base_name': Membership_Person_Group_Base}
obj_type_li['membership_person'] = {'table_name': 'v_membership_person', 'tbl_name_update': 'membership_person', 'base_name': Membership_Person_Base}
obj_type_li['membership_person_profile'] = {'table_name': 'v_membership_person_profile', 'tbl_name_update': 'membership_person_profile', 'base_name': Membership_Person_Profile_Base}
obj_type_li['membership_type'] = {'table_name': 'v_membership_type', 'tbl_name_update': 'membership_type', 'base_name': Membership_Type_Base}
obj_type_li['membership_person_type'] = {'table_name': 'v_membership_person_type', 'tbl_name_update': 'membership_person_type', 'base_name': Membership_Person_Type_Base}
obj_type_li['post'] = {'table_name': 'v_post', 'table_name_alt': 'v_post', 'tbl_name_update': 'post', 'base_name': Post_Base}
obj_type_li['post_comment'] = {'table_name': 'v_post_comment', 'table_name_alt': 'v_post_comment', 'tbl_name_update': 'post_comment', 'base_name': Post_Comment_Base}
obj_type_li['product'] = {'table_name': 'v_product', 'tbl_name_update': 'product', 'base_name': Product_Base}
obj_type_li['sponsorship'] = {'table_name': 'v_sponsorship', 'tbl_name_update': 'sponsorship', 'base_name': Sponsorship_Base}
obj_type_li['sponsorship_cfg'] = {'table_name': 'v_sponsorship_cfg', 'tbl_name_update': 'sponsorship_cfg', 'base_name': Sponsorship_Cfg_Base}
obj_type_li['stripe_log'] = {'table_name': 'stripe_log', 'tbl_name_update': 'stripe_log', 'base_name': Stripe_Log_Base_In}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -63,7 +63,12 @@ def test_file_lifecycle():
# 4. DELETE (Clean Cleanup) # 4. DELETE (Clean Cleanup)
print("\n[Step 4] Deleting test file (rm_orphan=true)...") print("\n[Step 4] Deleting test file (rm_orphan=true)...")
del_params = {"rm_orphan": "true", "method": "delete"} del_params = {
"link_to_type": LINK_TYPE,
"link_to_id": LINK_ID,
"rm_orphan": "true",
"method": "delete"
}
del_resp = requests.delete(f"{API_BASE}/hosted_file/{file_id}", headers=get_headers(), params=del_params) del_resp = requests.delete(f"{API_BASE}/hosted_file/{file_id}", headers=get_headers(), params=del_params)
if del_resp.status_code == 200: if del_resp.status_code == 200:
print(f" ✅ Deletion request successful.") print(f" ✅ Deletion request successful.")