Files
OSIT-AE-API-FastAPI/app/methods/hosted_file_methods.py
Scott Idem 69622dbea6 refactor(core): modularize monolithic routers and methods
- Reduced api_crud.py (1843 -> 143 lines) by extracting V1 registry and logic.
- Reduced hosted_file.py (1596 -> 361 lines) by moving storage and media logic to methods.
- Created lib_media.py for specialized video/image processing.
- Created api_crud_methods.py for legacy template handlers.
- Created legacy_v1.py for the legacy object registry.
- Fixed subdirectory_path bug in Hosted File creation.
- Verified full File Lifecycle via consolidated E2E suite.
2026-02-03 17:53:14 -05:00

574 lines
22 KiB
Python

import datetime, hashlib, mimetypes, os, pathlib, shutil, time
from fastapi import File, UploadFile
from typing import Dict, List, Optional, Set, Union
from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator
from app.config import settings
from app.db_sql import redis_lookup_id_random, sql_delete, sql_enable_part, sql_insert, sql_limit_offset_part, sql_select, sql_update, get_id_random
from app.lib_general import log, logging, logger_reset
from app.models.hosted_file_models import Hosted_File_Base
# ### BEGIN ### API Hosted File Methods ### directory_check_method() ###
# Extracted 2026-02-03
def directory_check_method(rm_orphan: bool = False, limit: int = 100):
    """
    Scan the hosted_files root and migrate legacy files into 2-char subdirectories.

    Every regular file directly inside the hosted files root whose name ends
    with ``.file`` is moved to ``<root>/<first two characters of name>/<name>``.

    Args:
        rm_orphan: Accepted for interface compatibility; not used by this function.
        limit: Maximum number of files migrated per call (rate limit).

    Returns:
        A list with the original (pre-move) paths of the migrated files, or
        ``False`` when the hosted files root is not an existing directory.
    """
    hosted_files_path = settings.FILES_PATH['hosted_files_root']
    if not os.path.isdir(hosted_files_path):
        return False

    result_list = []
    for item in os.listdir(hosted_files_path):
        if len(result_list) >= limit:
            break  # Rate limited per call
        # Stored files are named '<hash>.file'. Match the suffix rather than a
        # substring so names such as 'abc.file.bak' are left alone.
        if not item.endswith('.file'):
            continue
        file_path = os.path.join(hosted_files_path, item)
        if not os.path.isfile(file_path):
            continue
        log.info(f'Migrating legacy file to subdirectory: {item}')
        result_list.append(file_path)
        # Create a subdirectory with the first 2 characters of the hash
        full_subdirectory_path = os.path.join(hosted_files_path, item[:2])
        os.makedirs(full_subdirectory_path, exist_ok=True)
        # Move the file
        shutil.move(file_path, os.path.join(full_subdirectory_path, item))
    return result_list
# ### END ### API Hosted File Methods ### directory_check_method() ###
# ### BEGIN ### API Hosted File Methods ### create_hosted_file_obj() ###
@logger_reset
def create_hosted_file_obj(hosted_file_obj_new:Hosted_File_Base):
    """
    Insert a new ``hosted_file`` record built from a Hosted_File_Base model.

    Args:
        hosted_file_obj_new: The model instance whose set fields are written.

    Returns:
        The value returned by ``sql_insert`` (used as the new hosted_file_id),
        or ``False`` when ``sql_insert`` returns a falsy value.
    """
    log.setLevel(logging.INFO)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # Bookkeeping fields that are never written to the table.
    skipped_fields = {'saved', 'already_exists', 'copy_timer', 'created_on', 'updated_on'}
    record_data = hosted_file_obj_new.dict(
        by_alias=False,
        exclude_defaults=False,
        exclude_unset=True,
        exclude=skipped_fields,
    )

    # Per the original note, subdirectory_path is declared with
    # Field(exclude=True) in the model, which also strips it from .dict();
    # copy it over explicitly whenever the object carries a non-empty value.
    subdirectory_path = getattr(hosted_file_obj_new, 'subdirectory_path', None)
    if subdirectory_path:
        record_data['subdirectory_path'] = subdirectory_path

    insert_result = sql_insert(data=record_data, table_name='hosted_file', rm_id_random=True, id_random_length=8)
    if not insert_result:
        return False

    log.debug(insert_result)
    log.debug(f'Returning the new hosted_file_id: {insert_result}')
    return insert_result
# ### END ### API Hosted File Methods ### create_hosted_file_obj() ###
# ### BEGIN ### API Hosted File Methods ### load_hosted_file_obj() ###
# Updated 2023-08-18
@logger_reset
def load_hosted_file_obj(
    hosted_file_id: int|str,
    limit: int = 1000,
    by_alias: bool = True,
    exclude_unset: bool = True,
    model_as_dict: bool = False,
    enabled: str = 'enabled', # enabled, disabled, all
    inc_hosted_file_link_list: bool = False,
) -> Hosted_File_Base|dict|bool:
    """
    Load one hosted file record from ``v_hosted_file`` as a model or a dict.

    Args:
        hosted_file_id: Identifier passed through ``redis_lookup_id_random``
            (presumably a random/public id or an internal id - confirm).
        limit: Not used by this function.
        by_alias: Forwarded to ``.dict()`` when ``model_as_dict`` is true.
        exclude_unset: Forwarded to ``.dict()`` when ``model_as_dict`` is true.
        model_as_dict: Return a dict instead of the model instance.
        enabled: Not used by this function.
        inc_hosted_file_link_list: Not used by this function.

    Returns:
        The Hosted_File_Base instance (or its dict form). ``None`` when the
        select returns ``None``. ``False`` when the id lookup fails, the select
        returns another falsy value, or model validation fails.
        NOTE(review): the return annotation does not mention ``None``.
    """
    log.setLevel(logging.INFO)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    hosted_file_id = redis_lookup_id_random(record_id_random=hosted_file_id, table_name='hosted_file')
    if not hosted_file_id:
        return False

    record = sql_select(table_name='v_hosted_file', record_id=hosted_file_id)
    if record is None:
        return None
    if not record:
        return False
    log.debug(record)

    try:
        hosted_file_obj = Hosted_File_Base(**record)
    except ValidationError as e:
        log.error(e.json())
        return False

    log.info(f'Filename: {hosted_file_obj.filename}; Size: {hosted_file_obj.size}; Hash SHA256: {hosted_file_obj.hash_sha256}; ')
    log.debug(hosted_file_obj)

    if not model_as_dict:
        return hosted_file_obj
    return hosted_file_obj.dict(by_alias=by_alias, exclude_unset=exclude_unset)
# ### END ### API Hosted File Methods ### load_hosted_file_obj() ###
# ### BEGIN ### API Hosted File Methods ### lookup_file_hash() ###
# Updated 2022-08-09
@logger_reset
def lookup_file_hash(
    file_hash: str,
) -> Hosted_File_Base|dict|bool:
    """
    Find the hosted_file record whose ``hash_sha256`` equals ``file_hash``.

    Args:
        file_hash: SHA-256 hex digest to look up.

    Returns:
        The ``hosted_file_id`` value of the selected record, ``None`` when the
        select returns ``None`` (record not found), or ``False`` when the
        select returns any other falsy value.
        NOTE(review): the return annotation does not match these values.
    """
    log.setLevel(logging.INFO)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    sql = (
        "\n"
        "SELECT id AS 'hosted_file_id', id_random AS 'hosted_file_id_random'\n"
        "FROM hosted_file\n"
        "WHERE hosted_file.hash_sha256 = :hash_sha256\n"
    )
    log.debug(sql)

    query_data = {'hash_sha256': file_hash}
    log.debug(query_data)

    select_result = sql_select(sql=sql, data=query_data)
    if select_result is None:
        log.warning(f'Hosted File record was not found. SHA 256 Hash: {file_hash}')
        return None
    if not select_result:
        log.error(f'Something went wrong while trying to select the hosted file record. SHA 256 Hash: {file_hash}')
        return False

    hosted_file_id = select_result.get('hosted_file_id')
    log.info(f'Selected Hosted File record. Hosted File ID: {hosted_file_id}')
    return hosted_file_id
# ### END ### API Hosted File Methods ### lookup_file_hash() ###
# ### BEGIN ### API Hosted File Methods ### get_file_object_hash() ###
# Really shouldn't this be called generate_file_obj_hash() ??? -2023-05-04
@logger_reset
async def get_file_object_hash(file_object:File):
    """
    Compute the SHA-256 hex digest of a binary file object.

    Reading starts at the object's current position and continues until
    ``read()`` returns ``b""``; afterwards the object is rewound to offset 0.

    Args:
        file_object: A file-like object exposing ``read(size)`` and ``seek()``.

    Returns:
        The hexadecimal SHA-256 digest as a string.
    """
    #log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # 4096 bytes is the current block size on my workstation and Linode server
    # 4096 8192 16384 32768 65536 131072 262144 524288 1048576 bytes
    block_size = 131072
    digest = hashlib.sha256()
    started = time.process_time()

    while True:
        chunk = file_object.read(block_size)
        if chunk == b"":
            break
        digest.update(chunk)

    file_hash = digest.hexdigest()
    file_object.seek(0)  # The file will not properly save if seek is not reset to 0.

    elapsed_time = time.process_time() - started
    log.debug(f'Elapsed time: {elapsed_time}')
    return file_hash
# ### END ### API Hosted File Methods ### get_file_object_hash() ###
# ### BEGIN ### API Hosted File Methods ### guess_file_extension() ###
def guess_file_extension(filename: str):
    """
    Return the lower-cased text after the last ``.`` in ``filename``.

    Args:
        filename: A file name such as ``'Report.PDF'``.

    Returns:
        The extension without the dot (``'pdf'``). An empty string is returned
        when ``filename`` is empty/``None`` or contains no dot, instead of
        raising ``IndexError``/``AttributeError``.
    """
    if not filename:
        return ''
    _, dot, extension = filename.rpartition('.')
    # rpartition yields an empty separator when no dot is present.
    return extension.lower() if dot else ''
# ### END ### API Hosted File Methods ### guess_file_extension() ###
# ### BEGIN ### API Hosted File Methods ### allowed_file_extension() ###
def allowed_file_extension(extension: str, extension_list: list):
    """
    Tell whether ``extension`` appears in ``extension_list``, ignoring case.

    Args:
        extension: Extension to test, without the leading dot.
        extension_list: Iterable of allowed extensions.

    Returns:
        ``True`` when a string entry of ``extension_list`` equals ``extension``
        case-insensitively, otherwise ``False``. ``None`` for either argument
        yields ``False`` instead of raising.
    """
    if extension is None or extension_list is None:
        return False
    wanted = extension.lower()
    # Lower-case both sides so an allow-list entry like 'JPG' can still match.
    return any(isinstance(entry, str) and entry.lower() == wanted for entry in extension_list)
# ### END ### API Hosted File Methods ### allowed_file_extension() ###
# ### BEGIN ### API Hosted File Methods ### check_for_hosted_file_hash_file() ###
# Updated 2023-09-19
@logger_reset
def check_for_hosted_file_hash_file(
    file_hash: str,
    sub_dir: str,
) -> dict|bool:
    """
    Check whether ``<hosted_files_root>/<sub_dir>/<file_hash>.file`` exists.

    Args:
        file_hash: Hash used as the stored file's base name.
        sub_dir: Subdirectory of the hosted files root to look in.

    Returns:
        ``{'found': True, 'file_size': <size in bytes>}`` when the file exists,
        otherwise ``False`` (missing subdirectory or missing file).
    """
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    hosted_files_path = settings.FILES_PATH['hosted_files_root']
    log.info(f'Hosted Files Path: {hosted_files_path}')
    log.debug(shutil.disk_usage(hosted_files_path))

    subdir_path = pathlib.Path(os.path.join(hosted_files_path, sub_dir))
    if not subdir_path.exists():
        log.warning('Hashed hosted file subdirectory was not found in the hosted files root.')
        return False

    hashed_file_path = pathlib.Path(os.path.join(hosted_files_path, sub_dir, f'{file_hash}.file'))
    if not hashed_file_path.exists():
        log.warning('Hashed hosted file not found in the expected hosted files subdirectory.')
        return False

    return {'found': True, 'file_size': os.path.getsize(hashed_file_path)}
# ### BEGIN ### API Hosted File Methods ### save_file() ###
# Updated 2022-08-09
@logger_reset
async def save_file(
    file: UploadFile,
    account_id: int,
    link_to_type: str,
    link_to_id: int,
    account_id_random: str = None,
    link_to_id_random: str = None,
    check_allowed_extension: bool = False,
):
    """
    Save an uploaded file into hash-addressed hosted storage.

    The upload is hashed with SHA-256 and stored as
    ``<hosted_files_root>/<first 2 hash chars>/<hash>.file``. When a file with
    that name already exists nothing is copied and the upload counts as saved.

    Args:
        file: The upload. ``file.filename`` is rewritten in place when it ends
            with a platform-tagged suffix (e.g. ``.docxwin`` -> ``.docx``).
        account_id: Not used by this function.
        link_to_type: Copied into the returned info dict.
        link_to_id: Copied into the returned info dict.
        account_id_random: Not used by this function.
        link_to_id_random: Copied into the returned info dict.
        check_allowed_extension: When true, only ``jpg``, ``png`` and ``webp``
            uploads are accepted.

    Returns:
        A dict describing the file (``saved``, ``filename``, ``extension``,
        ``extension_allowed``, ``content_type``, ``size``, ``hash_sha256``,
        ``subdirectory_path``, ``already_exists``, ``already_exists_subdir``,
        ``copy_timer`` ...). If the extension check rejects the upload the dict
        is returned early with ``saved`` set to ``False``. ``False`` is
        returned when writing the file fails.
    """
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    hosted_files_path = settings.FILES_PATH['hosted_files_root']
    log.info(f'Hosted Files Path: {hosted_files_path}')
    log.debug(shutil.disk_usage(hosted_files_path))

    # Platform-tagged suffixes and the real extension each one stands for.
    platform_suffixes = (
        ('.docwin', '.doc'),
        ('.docxwin', '.docx'),
        ('.odpmac', '.odp'),
        ('.odpwin', '.odp'),
        ('.pdfmac', '.pdf'),
        ('.pdfwin', '.pdf'),
        ('.pptmac', '.ppt'),
        ('.pptxmac', '.pptx'),
        ('.pptwin', '.ppt'),
        ('.pptxwin', '.pptx'),
        ('.xlswin', '.xls'),
        ('.xlsxwin', '.xlsx'),
    )
    for tagged_suffix, real_suffix in platform_suffixes:
        if file.filename.endswith(tagged_suffix):
            # Swap only the trailing suffix; str.replace() would also rewrite
            # the same text occurring earlier in the name.
            file.filename = file.filename[:-len(tagged_suffix)] + real_suffix
            break

    file_info: dict = {}
    file_info['saved'] = None
    file_info['link_to_type'] = link_to_type
    file_info['link_to_id'] = link_to_id
    file_info['link_to_id_random'] = link_to_id_random
    file_info['filename'] = file.filename
    file_info['extension'] = guess_file_extension(filename=file.filename)

    if check_allowed_extension:
        if allowed_file_extension(extension=file_info['extension'], extension_list=['jpg','png','webp']):
            file_info['extension_allowed'] = True
        else:
            file_info['extension_allowed'] = False
            file_info['saved'] = False
            return file_info
    else:
        file_info['extension_allowed'] = None

    file_info['content_type'] = file.content_type

    # Determine the size by seeking to the end, then rewind for hashing.
    file.file.seek(0, os.SEEK_END)
    file_size = file.file.tell()
    file.file.seek(0)
    file_info['size'] = file_size

    file_hash = await get_file_object_hash(file.file)
    file_info['hash_sha256'] = file_hash

    buffer_size = 524288
    f_src = file.file

    file_hash_subdirectory = file_hash[0:2]
    subdirectory_dest = os.path.join(hosted_files_path, file_hash_subdirectory)
    log.info(f"Subdirectory Dest: {subdirectory_dest}")
    pathlib.Path(subdirectory_dest).mkdir(parents=True, exist_ok=True)
    file_info['subdirectory_path'] = file_hash_subdirectory

    file_dest_w_subdir = os.path.join(subdirectory_dest, f'{file_hash}.file')
    if pathlib.Path(file_dest_w_subdir).exists():
        file_info['already_exists'] = True
        file_info['already_exists_subdir'] = True
        file_info['copy_timer'] = 0
        file_info['saved'] = True
        return file_info

    file_info['already_exists'] = False
    file_info['already_exists_subdir'] = False
    try:
        timer_start = time.process_time()
        # The context manager flushes and closes the destination before the
        # file is reported as saved.
        with open(file_dest_w_subdir, 'wb') as f_dest:
            shutil.copyfileobj(f_src, f_dest, buffer_size)
        timer_end = time.process_time()
    except Exception as e:
        log.exception(f'Error saving file: {e}')
        # Do not leave a truncated '<hash>.file' behind: a later upload with
        # the same hash would treat it as an existing, complete copy.
        try:
            os.remove(file_dest_w_subdir)
        except OSError:
            pass
        return False

    file_info['copy_timer'] = timer_end - timer_start
    file_info['saved'] = True
    return file_info
# ### END ### API Hosted File Methods ### save_file() ###
# ### BEGIN ### API Hosted File Methods ### save_file_to_hosted_file() ###
# Updated 2022-08-09
@logger_reset
async def save_file_to_hosted_file(
    file_path: str,
    filename: str,
    extension: str,
    account_id: int,
    link_to_type: str,
    link_to_id: int,
):
    """
    Copy a file from the local filesystem into hash-addressed hosted storage.

    The source is hashed with SHA-256 and stored as
    ``<hosted_files_root>/<first 2 hash chars>/<hash>.file``. When a file with
    that name already exists nothing is copied and the file counts as saved.

    Args:
        file_path: Path of the source file to read.
        filename: Name recorded in the returned info dict; also used to guess
            the content type.
        extension: Extension recorded in the returned info dict.
        account_id: Not used by this function.
        link_to_type: Copied into the returned info dict.
        link_to_id: Copied into the returned info dict.

    Returns:
        A dict describing the file (``saved``, ``filename``, ``extension``,
        ``content_type``, ``size``, ``hash_sha256``, ``subdirectory_path``,
        ``already_exists``, ``already_exists_subdir``, ``copy_timer`` ...), or
        ``False`` when writing the destination fails.

    Raises:
        OSError: When ``file_path`` cannot be opened for reading.
    """
    log.setLevel(logging.DEBUG)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    hosted_files_path = settings.FILES_PATH['hosted_files_root']

    # The context manager guarantees the source handle is closed on every
    # return path.
    with open(file_path, 'rb') as file_obj:
        file_info: dict = {}
        file_info['saved'] = None
        file_info['link_to_type'] = link_to_type
        file_info['link_to_id'] = link_to_id
        file_info['filename'] = filename
        file_info['extension'] = extension
        file_info['content_type'] = mimetypes.guess_type(filename)[0]

        # Determine the size by seeking to the end, then rewind for hashing.
        file_obj.seek(0, os.SEEK_END)
        file_size = file_obj.tell()
        file_obj.seek(0)
        file_info['size'] = file_size

        file_hash = await get_file_object_hash(file_obj)
        file_info['hash_sha256'] = file_hash

        buffer_size = 524288

        file_hash_subdirectory = file_hash[0:2]
        subdirectory_dest = os.path.join(hosted_files_path, file_hash_subdirectory)
        pathlib.Path(subdirectory_dest).mkdir(parents=True, exist_ok=True)
        file_info['subdirectory_path'] = file_hash_subdirectory

        file_dest_w_subdir = os.path.join(subdirectory_dest, f'{file_hash}.file')
        if pathlib.Path(file_dest_w_subdir).exists():
            file_info['already_exists'] = True
            file_info['already_exists_subdir'] = True
            file_info['copy_timer'] = 0
            file_info['saved'] = True
            return file_info

        file_info['already_exists'] = False
        file_info['already_exists_subdir'] = False
        try:
            timer_start = time.process_time()
            # Closing the destination flushes it before 'saved' is reported.
            with open(file_dest_w_subdir, 'wb') as f_dest:
                shutil.copyfileobj(file_obj, f_dest, buffer_size)
            timer_end = time.process_time()
        except Exception as e:
            log.exception(f'Error saving to hosted storage: {e}')
            # Do not leave a truncated '<hash>.file' behind: a later save with
            # the same hash would treat it as an existing, complete copy.
            try:
                os.remove(file_dest_w_subdir)
            except OSError:
                pass
            return False

        file_info['copy_timer'] = timer_end - timer_start
        file_info['saved'] = True
        return file_info
# ### END ### API Hosted File Methods ### save_file_to_hosted_file() ###
# ### BEGIN ### API Hosted File Methods ### create_hosted_file_link() ###
# Updated 2022-08-09
@logger_reset
def create_hosted_file_link(
    account_id: int|str,
    hosted_file_id: int|str,
    link_to_type: str,
    link_to_id: int|str,
):
    """
    Insert a ``hosted_file_link`` row tying a hosted file to another object.

    Each id is first passed through ``redis_lookup_id_random`` (presumably to
    resolve a random/public id to the internal id - confirm); ``link_to_type``
    doubles as the table name for resolving ``link_to_id``.

    Returns:
        ``True`` when the row was inserted, ``None`` when ``sql_insert``
        returns ``None`` (the link probably already exists), ``False`` when an
        id cannot be resolved or the insert returns another falsy value.
    """
    log.setLevel(logging.INFO)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    account_id = redis_lookup_id_random(record_id_random=account_id, table_name='account')
    if not account_id:
        return False

    hosted_file_id = redis_lookup_id_random(record_id_random=hosted_file_id, table_name='hosted_file')
    if not hosted_file_id:
        return False

    link_to_id = redis_lookup_id_random(record_id_random=link_to_id, table_name=link_to_type)
    if not link_to_id:
        return False

    link_row: dict = {
        'account_id': account_id,
        'hosted_file_id': hosted_file_id,
        'link_to_type': link_to_type,
        'link_to_id': link_to_id,
    }

    insert_result = sql_insert(data=link_row, table_name='hosted_file_link', id_random_length=0)
    if insert_result is None:
        log.info('The hosted_file_link probably already exists.')
        return None
    if not insert_result:
        return False

    log.info('The hosted_file_link was created.')
    return True
# ### END ### API Hosted File Methods ### create_hosted_file_link() ###
# ### BEGIN ### API Hosted File Methods ### handle_delete_hosted_file() ###
# Updated 2026-02-03
@logger_reset
def handle_delete_hosted_file(
    account_id: int|str,
    hosted_file_id: int|str,
    link_to_type: str = None,
    link_to_id: int|str = None,
    rm_all_links: bool = False,
    rm_orphan: bool = False,
):
    """
    Remove a hosted file link and, optionally, the orphaned file itself.

    Steps:
        1. Resolve ``account_id`` (when given as a string) and ``hosted_file_id``.
        2. When both ``link_to_type`` and ``link_to_id`` are given, delete that link.
        3. When ``rm_orphan`` is true and no link rows remain for the file,
           unlink the physical file and delete the ``hosted_file`` record.

    Args:
        account_id: Account identifier. An unresolved string id is logged and
            replaced by ``None``. NOTE(review): the visible link-deletion code
            does not filter by account - confirm this is intended.
        hosted_file_id: Identifier of the hosted file.
        link_to_type: Type (table name) of the linked object.
        link_to_id: Identifier of the linked object.
        rm_all_links: Not used by this function.
        rm_orphan: Also delete the file and record when no links remain.

    Returns:
        ``True`` on success (including "still linked, nothing removed"),
        ``False`` when a lookup, link deletion, unlink or record deletion fails.
    """
    log.setLevel(logging.INFO)  # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
    log.debug(locals())

    # Resolve account_id if it's a string (Vision ID or 'bypass')
    if isinstance(account_id, str):
        account_id_int = redis_lookup_id_random(record_id_random=account_id, table_name='account')
        if not account_id_int:
            # If bypass or not found, we still proceed but log it.
            # In many maintenance cases, we don't want to block the deletion.
            log.warning(f"Could not resolve account_id '{account_id}'. Proceeding without account restriction.")
            account_id_int = None
    else:
        account_id_int = account_id

    hosted_file_id_int = redis_lookup_id_random(record_id_random=hosted_file_id, table_name='hosted_file')
    if not hosted_file_id_int:
        return False

    if link_to_type and link_to_id:
        hosted_file_link_result = delete_hosted_file_link(
            account_id = account_id_int,
            hosted_file_id = hosted_file_id_int,
            link_to_type = link_to_type,
            link_to_id = link_to_id,
        )
        if hosted_file_link_result:
            log.info('The hosted file link record was deleted.')
        elif hosted_file_link_result is None:
            log.warning('The hosted file link record was not found.')
        else:
            return False

    if not rm_orphan:
        return True

    hosted_file_obj = load_hosted_file_obj(hosted_file_id = hosted_file_id_int, inc_hosted_file_link_list = True)
    if not hosted_file_obj:
        return False

    if get_hosted_file_link_rec_list(hosted_file_id=hosted_file_id_int):
        log.info('Still not an orphan file.')
        return True

    # Orphan: Delete physical file
    subdir_path = hosted_file_obj.subdirectory_path
    hash_sha256 = hosted_file_obj.hash_sha256
    file_path = os.path.join(settings.FILES_PATH['hosted_files_root'], subdir_path or '', f'{hash_sha256}.file')
    try:
        pathlib.Path(file_path).unlink()
        log.info(f"Unlinked physical file: {file_path}")
    except FileNotFoundError:
        # Already gone (never stored, or removed concurrently): nothing to
        # unlink, so continue and delete the record rather than failing.
        pass
    except OSError as e:
        log.error(f"Error unlinking: {e}")
        return False

    # Delete record
    sql = "DELETE FROM hosted_file WHERE id = :hosted_file_id"
    if sql_delete(sql=sql, data={'hosted_file_id': hosted_file_id_int}):
        log.info(f"Deleted record for hosted_file {hosted_file_id_int}")
        return True
    return False
# ### END ### API Hosted File Methods ### handle_delete_hosted_file() ###
# ### BEGIN ### API Hosted File Methods ### delete_hosted_file_link() ###
@logger_reset
def delete_hosted_file_link(
    account_id: int|str,
    hosted_file_id: int|str,
    link_to_type: str,
    link_to_id: int|str,
):
    """
    Delete the ``hosted_file_link`` row for one file / linked-object pair.

    Args:
        account_id: Not used by this function.
        hosted_file_id: Identifier of the hosted file.
        link_to_type: Type of the linked object; also the table name used to
            resolve ``link_to_id``.
        link_to_id: Identifier of the linked object.

    Returns:
        ``True`` when ``sql_delete`` returns a truthy value, otherwise
        ``False`` (including when either id cannot be resolved).
    """
    hosted_file_id = redis_lookup_id_random(record_id_random=hosted_file_id, table_name='hosted_file')
    if not hosted_file_id:
        return False

    link_to_id = redis_lookup_id_random(record_id_random=link_to_id, table_name=link_to_type)
    if not link_to_id:
        return False

    sql = (
        "DELETE FROM hosted_file_link "
        "WHERE hosted_file_id = :hosted_file_id "
        "AND link_to_type = :link_to_type "
        "AND link_to_id = :link_to_id"
    )
    params = {
        'hosted_file_id': hosted_file_id,
        'link_to_type': link_to_type,
        'link_to_id': link_to_id,
    }
    return bool(sql_delete(sql=sql, data=params))
# ### END ### API Hosted File Methods ### delete_hosted_file_link() ###
# ### BEGIN ### API Hosted File Methods ### get_hosted_file_rec_list() ###
@logger_reset
def get_hosted_file_rec_list(
    for_obj_type: str,
    for_obj_id: str,
    limit: int = 1000,
    enabled: str = 'enabled', # enabled, disabled, all
) -> list|bool:
    """
    List hosted_file ids whose ``<for_obj_type>_id`` column matches an object.

    Args:
        for_obj_type: Object type; used as the lookup table name and as the
            prefix of the ``<for_obj_type>_id`` column in the query.
        for_obj_id: Identifier of the object.
        limit: Maximum number of rows selected.
        enabled: ``'enabled'`` or ``'disabled'`` filters on the ``enable``
            column; any other value (e.g. ``'all'``) applies no filter.

    Returns:
        The result of ``sql_select`` when it is truthy, otherwise ``[]``.
        ``False`` when ``for_obj_type`` is not a plain identifier or
        ``for_obj_id`` cannot be resolved.
    """
    # for_obj_type becomes part of the SQL text (identifiers cannot be bound as
    # parameters), so only letters, digits and underscores are accepted.
    if not (
        isinstance(for_obj_type, str)
        and for_obj_type.isascii()
        and for_obj_type.replace('_', '').isalnum()
    ):
        log.error(f'Invalid object type for hosted file lookup: {for_obj_type!r}')
        return False

    for_obj_id = redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type)
    if not for_obj_id:
        return False

    id_column = f'{for_obj_type}_id'
    data = {id_column: for_obj_id, 'limit': limit}

    # Add the filter clause and its bind value together so the statement never
    # receives an ':enable' value it does not reference.
    if enabled in ('enabled', 'disabled'):
        sql_enabled = "AND enable = :enable"
        data['enable'] = (enabled == 'enabled')
    else:
        sql_enabled = ""

    sql = f"""
        SELECT id AS 'hosted_file_id', id_random AS 'hosted_file_id_random'
        FROM hosted_file
        WHERE {id_column} = :{id_column} {sql_enabled}
        ORDER BY created_on DESC, updated_on DESC, filename ASC
        LIMIT :limit;
    """
    res = sql_select(data=data, sql=sql, as_list=True)
    return res if res else []
# ### END ### API Hosted File Methods ### get_hosted_file_rec_list() ###
# ### BEGIN ### API Hosted File Methods ### get_hosted_file_link_rec_list() ###
@logger_reset
def get_hosted_file_link_rec_list(
    hosted_file_id: int|str,
    limit: int = 10,
    offset: int = 0,
) -> list|bool:
    """
    Select ``hosted_file_link`` rows for one hosted file, newest first.

    Args:
        hosted_file_id: Value bound to the ``hosted_file_id`` column filter.
        limit: Maximum number of rows selected.
        offset: Number of rows skipped.

    Returns:
        The result of ``sql_select`` when it is truthy, otherwise ``[]``.
    """
    params = {'hosted_file_id': hosted_file_id, 'limit': limit, 'offset': offset}
    query = (
        "SELECT * FROM hosted_file_link "
        "WHERE hosted_file_id = :hosted_file_id "
        "ORDER BY created_on DESC LIMIT :limit OFFSET :offset"
    )
    rows = sql_select(data=params, sql=query, as_list=True)
    if not rows:
        return []
    return rows
# ### END ### API Hosted File Methods ### get_hosted_file_link_rec_list() ###