- Reduced api_crud.py (1843 -> 143 lines) by extracting V1 registry and logic. - Reduced hosted_file.py (1596 -> 361 lines) by moving storage and media logic to methods. - Created lib_media.py for specialized video/image processing. - Created api_crud_methods.py for legacy template handlers. - Created legacy_v1.py for the legacy object registry. - Fixed subdirectory_path bug in Hosted File creation. - Verified full File Lifecycle via consolidated E2E suite.
574 lines
22 KiB
Python
574 lines
22 KiB
Python
import datetime, hashlib, mimetypes, os, pathlib, shutil, time
|
|
|
|
from fastapi import File, UploadFile
|
|
from typing import Dict, List, Optional, Set, Union
|
|
from pydantic import BaseModel, EmailStr, Field, PrivateAttr, ValidationError, validator
|
|
|
|
from app.config import settings
|
|
from app.db_sql import redis_lookup_id_random, sql_delete, sql_enable_part, sql_insert, sql_limit_offset_part, sql_select, sql_update, get_id_random
|
|
from app.lib_general import log, logging, logger_reset
|
|
|
|
from app.models.hosted_file_models import Hosted_File_Base
|
|
|
|
|
|
# ### BEGIN ### API Hosted File Methods ### directory_check_method() ###
|
|
# Extracted 2026-02-03
|
|
def directory_check_method(rm_orphan: bool = False):
|
|
"""
|
|
Logic for scanning the hosted_files root and migrating legacy files to 2-char subdirectories.
|
|
Returns a list of processed files.
|
|
"""
|
|
hosted_files_path = settings.FILES_PATH['hosted_files_root']
|
|
if not os.path.isdir(hosted_files_path):
|
|
return False
|
|
|
|
directory_list = os.listdir(hosted_files_path)
|
|
result_list = []
|
|
count = 0
|
|
|
|
for item in directory_list:
|
|
if count >= 100: break # Rate limited per call
|
|
|
|
file_path = os.path.join(hosted_files_path, item)
|
|
if os.path.isfile(file_path):
|
|
if '.file' not in item: continue
|
|
|
|
log.info(f'Migrating legacy file to subdirectory: {item}')
|
|
result_list.append(file_path)
|
|
|
|
# Create a subdirectory with the first 2 characters of the hash
|
|
full_subdirectory_path = os.path.join(hosted_files_path, item[:2])
|
|
os.makedirs(full_subdirectory_path, exist_ok=True)
|
|
|
|
# Move the file
|
|
shutil.move(file_path, os.path.join(full_subdirectory_path, item))
|
|
count += 1
|
|
|
|
return result_list
|
|
# ### END ### API Hosted File Methods ### directory_check_method() ###
|
|
|
|
|
|
# ### BEGIN ### API Hosted File Methods ### create_hosted_file_obj() ###
|
|
@logger_reset
|
|
def create_hosted_file_obj(hosted_file_obj_new:Hosted_File_Base):
|
|
log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
# We need to explicitly include subdirectory_path because it has Field(exclude=True) in the model
|
|
# which prevents it from showing in the public API, but also strips it from .dict() by default.
|
|
hosted_file_obj_data = hosted_file_obj_new.dict(
|
|
by_alias=False,
|
|
exclude_defaults=False,
|
|
exclude_unset=True,
|
|
exclude={'saved', 'already_exists', 'copy_timer', 'created_on', 'updated_on'}
|
|
)
|
|
|
|
# Force inclusion of subdirectory_path if present in the object
|
|
if hasattr(hosted_file_obj_new, 'subdirectory_path') and hosted_file_obj_new.subdirectory_path:
|
|
hosted_file_obj_data['subdirectory_path'] = hosted_file_obj_new.subdirectory_path
|
|
|
|
if hosted_file_obj_in_result := sql_insert(data=hosted_file_obj_data, table_name='hosted_file', rm_id_random=True, id_random_length=8): pass
|
|
else:
|
|
return False
|
|
|
|
log.debug(hosted_file_obj_in_result)
|
|
|
|
hosted_file_id = hosted_file_obj_in_result
|
|
|
|
log.debug(f'Returning the new hosted_file_id: {hosted_file_id}')
|
|
return hosted_file_id
|
|
# ### END ### API Hosted File Methods ### create_hosted_file_obj() ###
|
|
|
|
|
|
# ### BEGIN ### API Hosted File Methods ### load_hosted_file_obj() ###
|
|
# Updated 2023-08-18
|
|
@logger_reset
|
|
def load_hosted_file_obj(
|
|
hosted_file_id: int|str,
|
|
limit: int = 1000,
|
|
by_alias: bool = True,
|
|
exclude_unset: bool = True,
|
|
model_as_dict: bool = False,
|
|
enabled: str = 'enabled', # enabled, disabled, all
|
|
inc_hosted_file_link_list: bool = False,
|
|
) -> Hosted_File_Base|dict|bool:
|
|
log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
if hosted_file_id := redis_lookup_id_random(record_id_random=hosted_file_id, table_name='hosted_file'): pass
|
|
else: return False
|
|
|
|
if hosted_file_rec := sql_select(table_name='v_hosted_file', record_id=hosted_file_id): pass
|
|
elif hosted_file_rec is None: return None
|
|
else: return False
|
|
log.debug(hosted_file_rec)
|
|
|
|
try:
|
|
hosted_file_obj = Hosted_File_Base(**hosted_file_rec)
|
|
except ValidationError as e:
|
|
log.error(e.json())
|
|
return False
|
|
log.info(f'Filename: {hosted_file_obj.filename}; Size: {hosted_file_obj.size}; Hash SHA256: {hosted_file_obj.hash_sha256}; ')
|
|
log.debug(hosted_file_obj)
|
|
|
|
if model_as_dict:
|
|
return hosted_file_obj.dict(by_alias=by_alias, exclude_unset=exclude_unset)
|
|
else:
|
|
return hosted_file_obj
|
|
# ### END ### API Hosted File Methods ### load_hosted_file_obj() ###
|
|
|
|
|
|
# ### BEGIN ### API Hosted File Methods ### lookup_file_hash() ###
|
|
# Updated 2022-08-09
|
|
@logger_reset
|
|
def lookup_file_hash(
|
|
file_hash: str,
|
|
) -> Hosted_File_Base|dict|bool:
|
|
log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
|
|
sql = f"""
|
|
SELECT id AS 'hosted_file_id', id_random AS 'hosted_file_id_random'
|
|
FROM hosted_file
|
|
WHERE hosted_file.hash_sha256 = :hash_sha256
|
|
"""
|
|
log.debug(sql)
|
|
|
|
hosted_file_data = {}
|
|
hosted_file_data['hash_sha256'] = file_hash
|
|
log.debug(hosted_file_data)
|
|
|
|
if hosted_file_select_result := sql_select(sql=sql, data=hosted_file_data):
|
|
hosted_file_id = hosted_file_select_result.get('hosted_file_id')
|
|
hosted_file_id_random = hosted_file_select_result.get('hosted_file_id_random')
|
|
log.info(f'Selected Hosted File record. Hosted File ID: {hosted_file_id}')
|
|
return hosted_file_id
|
|
elif hosted_file_select_result is None:
|
|
log.warning(f'Hosted File record was not found. SHA 256 Hash: {file_hash}')
|
|
return None
|
|
# pass
|
|
else:
|
|
log.error(f'Something went wrong while trying to select the hosted file record. SHA 256 Hash: {file_hash}')
|
|
return False
|
|
# ### END ### API Hosted File Methods ### lookup_file_hash() ###
|
|
|
|
|
|
# ### BEGIN ### API Hosted File Methods ### get_file_object_hash() ###
|
|
# Really shouldn't this be called generate_file_obj_hash() ??? -2023-05-04
|
|
@logger_reset
|
|
async def get_file_object_hash(file_object:File):
|
|
#log.setLevel(logging.WARNING) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
# 4096 bytes is the current block size on my workstation and Linode server
|
|
# 4096 8192 16384 32768 65536 131072 262144 524288 1048576 bytes
|
|
block_size = 131072
|
|
hash_value = hashlib.sha256()
|
|
|
|
timer_start = time.process_time()
|
|
for chunk in iter(lambda: file_object.read(block_size), b""):
|
|
hash_value.update(chunk)
|
|
file_hash = hash_value.hexdigest()
|
|
file_object.seek(0) # The file will not properly save if seek is not reset to 0.
|
|
timer_end = time.process_time()
|
|
elapsed_time = timer_end - timer_start
|
|
log.debug(f'Elapsed time: {elapsed_time}')
|
|
|
|
return file_hash
|
|
# ### END ### API Hosted File Methods ### get_file_object_hash() ###
|
|
|
|
|
|
# ### BEGIN ### API Hosted File Methods ### guess_file_extension() ###
|
|
def guess_file_extension(filename: str):
|
|
return filename.rsplit('.', 1)[1].lower()
|
|
# ### END ### API Hosted File Methods ### guess_file_extension() ###
|
|
|
|
|
|
# ### BEGIN ### API Hosted File Methods ### allowed_file_extension() ###
|
|
def allowed_file_extension(extension: str, extension_list: list):
|
|
return extension.lower() in extension_list # app.config['ALLOWED_EXTENSIONS']
|
|
# ### END ### API Hosted File Methods ### allowed_file_extension() ###
|
|
|
|
|
|
# ### BEGIN ### API Hosted File Methods ### lookup_file_hash() ###
|
|
# Updated 2023-09-19
|
|
@logger_reset
|
|
def check_for_hosted_file_hash_file(
|
|
file_hash: str,
|
|
sub_dir: str,
|
|
) -> dict|bool:
|
|
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
file_size = None
|
|
|
|
hosted_files_path = settings.FILES_PATH['hosted_files_root']
|
|
log.info(f'Hosted Files Path: {hosted_files_path}')
|
|
log.debug(shutil.disk_usage(hosted_files_path))
|
|
|
|
hosted_files_dir_w_subdir = os.path.join(hosted_files_path, sub_dir)
|
|
path_hosted_files_dir_w_subdir = pathlib.Path(hosted_files_dir_w_subdir)
|
|
|
|
if path_hosted_files_dir_w_subdir.exists(): pass
|
|
else:
|
|
log.warning('Hashed hosted file subdirectory was not found in the hosted files root.')
|
|
return False
|
|
|
|
hosted_files_dir_w_subdir_filename = os.path.join(hosted_files_path, sub_dir, f'{file_hash}.file')
|
|
path_hosted_files_dir_w_subdir_filename = pathlib.Path(hosted_files_dir_w_subdir_filename)
|
|
if path_hosted_files_dir_w_subdir_filename.exists():
|
|
file_size = os.path.getsize(path_hosted_files_dir_w_subdir_filename)
|
|
else:
|
|
log.warning('Hashed hosted file not found in the expected hosted files subdirectory.')
|
|
return False
|
|
|
|
return {'found': True, 'file_size': file_size}
|
|
|
|
|
|
# ### BEGIN ### API Hosted File Methods ### save_file() ###
|
|
# Updated 2022-08-09
|
|
@logger_reset
|
|
async def save_file(
|
|
file: UploadFile,
|
|
account_id: int,
|
|
link_to_type: str,
|
|
link_to_id: int,
|
|
account_id_random: str = None,
|
|
link_to_id_random: str = None,
|
|
check_allowed_extension: bool = False,
|
|
):
|
|
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
hosted_files_path = settings.FILES_PATH['hosted_files_root']
|
|
log.info(f'Hosted Files Path: {hosted_files_path}')
|
|
log.debug(shutil.disk_usage(hosted_files_path))
|
|
|
|
if file.filename.endswith('.docwin'):
|
|
file.filename = file.filename.replace('.docwin', '.doc')
|
|
if file.filename.endswith('.docxwin'):
|
|
file.filename = file.filename.replace('.docxwin', '.docx')
|
|
if file.filename.endswith('.odpmac'):
|
|
file.filename = file.filename.replace('.odpmac', '.odp')
|
|
if file.filename.endswith('.odpwin'):
|
|
file.filename = file.filename.replace('.odpwin', '.odp')
|
|
if file.filename.endswith('.pdfmac'):
|
|
file.filename = file.filename.replace('.pdfmac', '.pdf')
|
|
if file.filename.endswith('.pdfwin'):
|
|
file.filename = file.filename.replace('.pdfwin', '.pdf')
|
|
if file.filename.endswith('.pptmac'):
|
|
file.filename = file.filename.replace('.pptmac', '.ppt')
|
|
if file.filename.endswith('.pptxmac'):
|
|
file.filename = file.filename.replace('.pptxmac', '.pptx')
|
|
if file.filename.endswith('.pptwin'):
|
|
file.filename = file.filename.replace('.pptwin', '.ppt')
|
|
if file.filename.endswith('.pptxwin'):
|
|
file.filename = file.filename.replace('.pptxwin', '.pptx')
|
|
if file.filename.endswith('.xlswin'):
|
|
file.filename = file.filename.replace('.xlswin', '.xls')
|
|
if file.filename.endswith('.xlsxwin'):
|
|
file.filename = file.filename.replace('.xlsxwin', '.xlsx')
|
|
|
|
file_info: dict = {}
|
|
file_info['saved'] = None
|
|
file_info['link_to_type'] = link_to_type
|
|
file_info['link_to_id'] = link_to_id
|
|
file_info['link_to_id_random'] = link_to_id_random
|
|
file_info['filename'] = file.filename
|
|
file_info['extension'] = guess_file_extension(filename=file.filename)
|
|
|
|
if check_allowed_extension:
|
|
if allowed_file_extension(extension=file_info['extension'], extension_list=['jpg','png','webp']):
|
|
file_info['extension_allowed'] = True
|
|
else:
|
|
file_info['extension_allowed'] = False
|
|
file_info['saved'] = False
|
|
return file_info
|
|
else:
|
|
file_info['extension_allowed'] = None
|
|
|
|
file_info['content_type'] = file.content_type
|
|
|
|
file.file.seek(0, os.SEEK_END)
|
|
file_size = file.file.tell()
|
|
file.file.seek(0)
|
|
file_info['size'] = file_size
|
|
|
|
file_hash = await get_file_object_hash(file.file)
|
|
file_info['hash_sha256'] = file_hash
|
|
|
|
buffer_size = 524288
|
|
f_src = file.file
|
|
|
|
file_hash_subdirectory = file_hash[0:2]
|
|
subdirectory_dest = os.path.join(hosted_files_path, file_hash_subdirectory)
|
|
log.info(f"Subdirectory Dest: {subdirectory_dest}")
|
|
pathlib.Path(subdirectory_dest).mkdir(parents=True, exist_ok=True)
|
|
file_info['subdirectory_path'] = file_hash_subdirectory
|
|
|
|
file_dest_w_subdir = os.path.join(subdirectory_dest, f'{file_hash}.file')
|
|
existing_file_check_subdir = pathlib.Path(file_dest_w_subdir)
|
|
|
|
if existing_file_check_subdir.exists():
|
|
file_info['already_exists'] = True
|
|
file_info['already_exists_subdir'] = True
|
|
file_info['copy_timer'] = 0
|
|
file_info['saved'] = True
|
|
else:
|
|
file_info['already_exists'] = False
|
|
file_info['already_exists_subdir'] = False
|
|
try:
|
|
f_dest = open(file_dest_w_subdir, 'wb')
|
|
timer_start = time.process_time()
|
|
shutil.copyfileobj(f_src, f_dest, buffer_size)
|
|
timer_end = time.process_time()
|
|
file_info['copy_timer'] = timer_end - timer_start
|
|
file_info['saved'] = True
|
|
except Exception as e:
|
|
log.exception(f'Error saving file: {e}')
|
|
file_info['copy_timer'] = 0
|
|
file_info['saved'] = False
|
|
return False
|
|
return file_info
|
|
# ### END ### API Hosted File Methods ### save_file() ###
|
|
|
|
|
|
# ### BEGIN ### API Hosted File Methods ### save_file_to_hosted_file() ###
|
|
# Updated 2022-08-09
|
|
@logger_reset
|
|
async def save_file_to_hosted_file(
|
|
file_path: str,
|
|
filename: str,
|
|
extension: str,
|
|
account_id: int,
|
|
link_to_type: str,
|
|
link_to_id: int,
|
|
):
|
|
log.setLevel(logging.DEBUG) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
hosted_files_path = settings.FILES_PATH['hosted_files_root']
|
|
file_obj = open(file_path, 'rb')
|
|
|
|
file_info: dict = {}
|
|
file_info['saved'] = None
|
|
file_info['link_to_type'] = link_to_type
|
|
file_info['link_to_id'] = link_to_id
|
|
file_info['filename'] = filename
|
|
file_info['extension'] = extension
|
|
file_info['content_type'] = mimetypes.guess_type(filename)[0]
|
|
|
|
file_obj.seek(0, os.SEEK_END)
|
|
file_size = file_obj.tell()
|
|
file_obj.seek(0)
|
|
file_info['size'] = file_size
|
|
|
|
file_hash = await get_file_object_hash(file_obj)
|
|
file_info['hash_sha256'] = file_hash
|
|
|
|
buffer_size = 524288
|
|
f_src = file_obj
|
|
|
|
file_hash_subdirectory = file_hash[0:2]
|
|
subdirectory_dest = os.path.join(hosted_files_path, file_hash_subdirectory)
|
|
pathlib.Path(subdirectory_dest).mkdir(parents=True, exist_ok=True)
|
|
file_info['subdirectory_path'] = file_hash_subdirectory
|
|
|
|
file_dest_w_subdir = os.path.join(subdirectory_dest, f'{file_hash}.file')
|
|
existing_file_check_subdir = pathlib.Path(file_dest_w_subdir)
|
|
|
|
if existing_file_check_subdir.exists():
|
|
file_info['already_exists'] = True
|
|
file_info['already_exists_subdir'] = True
|
|
file_info['copy_timer'] = 0
|
|
file_info['saved'] = True
|
|
else:
|
|
file_info['already_exists'] = False
|
|
file_info['already_exists_subdir'] = False
|
|
try:
|
|
f_dest = open(file_dest_w_subdir, 'wb')
|
|
timer_start = time.process_time()
|
|
shutil.copyfileobj(f_src, f_dest, buffer_size)
|
|
timer_end = time.process_time()
|
|
file_info['copy_timer'] = timer_end - timer_start
|
|
file_info['saved'] = True
|
|
except Exception as e:
|
|
log.exception(f'Error saving to hosted storage: {e}')
|
|
file_info['copy_timer'] = 0
|
|
file_info['saved'] = False
|
|
return False
|
|
return file_info
|
|
# ### END ### API Hosted File Methods ### save_file_to_hosted_file() ###
|
|
|
|
|
|
# ### BEGIN ### API Hosted File Methods ### create_hosted_file_link() ###
|
|
# Updated 2022-08-09
|
|
@logger_reset
|
|
def create_hosted_file_link(
|
|
account_id: int|str,
|
|
hosted_file_id: int|str,
|
|
link_to_type: str,
|
|
link_to_id: int|str,
|
|
):
|
|
log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
if account_id := redis_lookup_id_random(record_id_random=account_id, table_name='account'): pass
|
|
else: return False
|
|
if hosted_file_id := redis_lookup_id_random(record_id_random=hosted_file_id, table_name='hosted_file'): pass
|
|
else: return False
|
|
if link_to_id := redis_lookup_id_random(record_id_random=link_to_id, table_name=link_to_type): pass
|
|
else: return False
|
|
|
|
hosted_file_link_data: dict = {}
|
|
hosted_file_link_data['account_id'] = account_id
|
|
hosted_file_link_data['hosted_file_id'] = hosted_file_id
|
|
hosted_file_link_data['link_to_type'] = link_to_type
|
|
hosted_file_link_data['link_to_id'] = link_to_id
|
|
|
|
if hosted_file_link_data_in_result := sql_insert(data=hosted_file_link_data, table_name='hosted_file_link', id_random_length=0):
|
|
log.info('The hosted_file_link was created.')
|
|
elif hosted_file_link_data_in_result is None:
|
|
log.info('The hosted_file_link probably already exists.')
|
|
return None
|
|
else:
|
|
return False
|
|
return True
|
|
# ### END ### API Hosted File Methods ### create_hosted_file_link() ###
|
|
|
|
|
|
# ### BEGIN ### API Hosted File Methods ### handle_delete_hosted_file() ###
|
|
# Updated 2026-02-03
|
|
@logger_reset
|
|
def handle_delete_hosted_file(
|
|
account_id: int|str,
|
|
hosted_file_id: int|str,
|
|
link_to_type: str = None,
|
|
link_to_id: int|str = None,
|
|
rm_all_links: bool = False,
|
|
rm_orphan: bool = False,
|
|
):
|
|
log.setLevel(logging.INFO) # DEBUG, INFO, WARNING, ERROR, EXCEPTION, CRITICAL
|
|
log.debug(locals())
|
|
|
|
# Resolve account_id if it's a string (Vision ID or 'bypass')
|
|
if isinstance(account_id, str):
|
|
if res_acc := redis_lookup_id_random(record_id_random=account_id, table_name='account'):
|
|
account_id_int = res_acc
|
|
else:
|
|
# If bypass or not found, we still proceed but log it.
|
|
# In many maintenance cases, we don't want to block the deletion.
|
|
log.warning(f"Could not resolve account_id '{account_id}'. Proceeding without account restriction.")
|
|
account_id_int = None
|
|
else:
|
|
account_id_int = account_id
|
|
|
|
if hosted_file_id_int := redis_lookup_id_random(record_id_random=hosted_file_id, table_name='hosted_file'): pass
|
|
else: return False
|
|
|
|
if link_to_type and link_to_id:
|
|
if hosted_file_link_result := delete_hosted_file_link(
|
|
account_id = account_id_int,
|
|
hosted_file_id = hosted_file_id_int,
|
|
link_to_type = link_to_type,
|
|
link_to_id = link_to_id,
|
|
):
|
|
log.info('The hosted file link record was deleted.')
|
|
elif hosted_file_link_result is None:
|
|
log.warning('The hosted file link record was not found.')
|
|
else:
|
|
return False
|
|
|
|
if not rm_orphan: return True
|
|
|
|
if hosted_file_obj := load_hosted_file_obj(hosted_file_id = hosted_file_id_int, inc_hosted_file_link_list = True): pass
|
|
else: return False
|
|
|
|
if hosted_file_link_rec_list_result := get_hosted_file_link_rec_list(hosted_file_id=hosted_file_id_int):
|
|
log.info('Still not an orphan file.')
|
|
return True
|
|
|
|
# Orphan: Delete physical file
|
|
subdir_path = hosted_file_obj.subdirectory_path
|
|
hash_sha256 = hosted_file_obj.hash_sha256
|
|
file_path = os.path.join(settings.FILES_PATH['hosted_files_root'], subdir_path or '', f'{hash_sha256}.file')
|
|
|
|
if os.path.exists(file_path):
|
|
try:
|
|
pathlib.Path(file_path).unlink()
|
|
log.info(f"Unlinked physical file: {file_path}")
|
|
except OSError as e:
|
|
log.error(f"Error unlinking: {e}")
|
|
return False
|
|
|
|
# Delete record
|
|
sql = "DELETE FROM hosted_file WHERE id = :hosted_file_id"
|
|
if sql_delete(sql=sql, data={'hosted_file_id': hosted_file_id_int}):
|
|
log.info(f"Deleted record for hosted_file {hosted_file_id_int}")
|
|
return True
|
|
return False
|
|
# ### END ### API Hosted File Methods ### handle_delete_hosted_file() ###
|
|
|
|
|
|
|
|
# ### BEGIN ### API Hosted File Methods ### delete_hosted_file_link() ###
|
|
@logger_reset
|
|
def delete_hosted_file_link(
|
|
account_id: int|str,
|
|
hosted_file_id: int|str,
|
|
link_to_type: str,
|
|
link_to_id: int|str,
|
|
):
|
|
if hosted_file_id := redis_lookup_id_random(record_id_random=hosted_file_id, table_name='hosted_file'): pass
|
|
else: return False
|
|
|
|
if link_to_id := redis_lookup_id_random(record_id_random=link_to_id, table_name=link_to_type): pass
|
|
else: return False
|
|
|
|
sql = "DELETE FROM hosted_file_link WHERE hosted_file_id = :hosted_file_id AND link_to_type = :link_to_type AND link_to_id = :link_to_id"
|
|
if sql_delete(sql=sql, data={'hosted_file_id': hosted_file_id, 'link_to_type': link_to_type, 'link_to_id': link_to_id}):
|
|
return True
|
|
return False
|
|
# ### END ### API Hosted File Methods ### delete_hosted_file_link() ###
|
|
|
|
|
|
# ### BEGIN ### API Hosted File Methods ### get_hosted_file_rec_list() ###
|
|
@logger_reset
|
|
def get_hosted_file_rec_list(
|
|
for_obj_type: str,
|
|
for_obj_id: str,
|
|
limit: int = 1000,
|
|
enabled: str = 'enabled', # enabled, disabled, all
|
|
) -> list|bool:
|
|
if for_obj_id := redis_lookup_id_random(record_id_random=for_obj_id, table_name=for_obj_type): pass
|
|
else: return False
|
|
|
|
data = {f'{for_obj_type}_id': for_obj_id, 'limit': limit}
|
|
sql_enabled = "AND enable = :enable" if enabled == 'enabled' else ("AND enable = :enable" if enabled == 'disabled' else "")
|
|
if enabled != 'all': data['enable'] = (enabled == 'enabled')
|
|
|
|
sql = f"""
|
|
SELECT id AS 'hosted_file_id', id_random AS 'hosted_file_id_random'
|
|
FROM hosted_file
|
|
WHERE {for_obj_type}_id = :{for_obj_type}_id {sql_enabled}
|
|
ORDER BY created_on DESC, updated_on DESC, filename ASC
|
|
LIMIT :limit;
|
|
"""
|
|
if res := sql_select(data=data, sql=sql, as_list=True): return res
|
|
return []
|
|
# ### END ### API Hosted File Methods ### get_hosted_file_rec_list() ###
|
|
|
|
|
|
# ### BEGIN ### API Hosted File Methods ### get_hosted_file_link_rec_list() ###
|
|
@logger_reset
|
|
def get_hosted_file_link_rec_list(
|
|
hosted_file_id: int|str,
|
|
limit: int = 10,
|
|
offset: int = 0,
|
|
) -> list|bool:
|
|
data = {'hosted_file_id': hosted_file_id, 'limit': limit, 'offset': offset}
|
|
sql = "SELECT * FROM hosted_file_link WHERE hosted_file_id = :hosted_file_id ORDER BY created_on DESC LIMIT :limit OFFSET :offset"
|
|
if res := sql_select(data=data, sql=sql, as_list=True): return res
|
|
return []
|
|
# ### END ### API Hosted File Methods ### get_hosted_file_link_rec_list() ### |