import os import pathlib import shutil import time import tempfile import subprocess import shlex import logging import mimetypes from app.config import settings from app.lib_general import log, logging from app.db_sql import sql_select, sql_update, sql_insert, get_id_random from app.methods.hosted_file_methods import ( load_hosted_file_obj, create_hosted_file_obj, save_file_to_hosted_file ) from app.models.hosted_file_models import Hosted_File_Base # ### BEGIN ### API Hosted File Methods ### clip_video_method() ### async def clip_video_method( hosted_file_id: str, start_time: str, end_time: str, account_id: int, link_to_type: str, link_to_id: int, account_id_random: str = None, filename_no_ext: str = 'automated_hosted_file_clip_video', to_type: str = 'mp4', reencode: bool = False, scale_down: bool = False, ): """ Business logic for clipping a video using ffmpeg and saving as a new hosted_file. Returns the new hosted_file dict or False. """ # NOTE: This function is invoked by the hosted_file router at # `/hosted_file/{hosted_file_id}/clip_video` and returns the created # hosted_file metadata (or False) so the router can build the standard # response body consumed by frontends. hosted_file_obj = load_hosted_file_obj(hosted_file_id=hosted_file_id) if not hosted_file_obj: return False file_hash = hosted_file_obj.hash_sha256 hosted_files_path = settings.FILES_PATH['hosted_files_root'] full_file_path = os.path.join(hosted_files_path, file_hash[0:2], f'{file_hash}.file') if not os.path.exists(full_file_path): return False with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as tmp_video_file_clip: tmp_video_file_clip_path = tmp_video_file_clip.name try: if scale_down: new_filename = f'{filename_no_ext}_[clip_scaled].{to_type}' cmd = f'ffmpeg -hide_banner -loglevel error -nostats -y -i {full_file_path} -ss {start_time} -to {end_time} -vf "scale=w=1920:h=1080:force_original_aspect_ratio=decrease" -c:v libx264 -crf 23 -maxrate 2M -bufsize 2M -c:a copy -movflags +faststart {tmp_video_file_clip_path}' elif reencode: new_filename = f'{filename_no_ext}_[clip_reencode].{to_type}' cmd = f"ffmpeg -hide_banner -loglevel error -nostats -y -i {full_file_path} -ss {start_time} -to {end_time} -c:v libx264 -crf 23 -maxrate 2M -bufsize 2M -c:a copy -movflags +faststart {tmp_video_file_clip_path}" else: new_filename = f'{filename_no_ext}_[clip].{to_type}' cmd = f"ffmpeg -hide_banner -loglevel error -nostats -y -i {full_file_path} -ss {start_time} -to {end_time} -c:v copy -c:a copy -movflags +faststart {tmp_video_file_clip_path}" args = shlex.split(cmd) try: subprocess.run(args, check=True, capture_output=True, text=True) except subprocess.CalledProcessError as e: log.exception(f'ffmpeg failed: returncode={e.returncode}; stdout={e.stdout}; stderr={e.stderr}') return False file_info = await save_file_to_hosted_file( file_path = tmp_video_file_clip_path, filename = new_filename, extension = to_type, account_id = account_id, account_id_random = account_id_random, link_to_type = link_to_type, link_to_id = link_to_id, ) if file_info.get('saved'): if sel := sql_select(table_name='hosted_file', field_name='hash_sha256', field_value=file_info['hash_sha256']): return load_hosted_file_obj(hosted_file_id=sel.get('id'), model_as_dict=True) else: new_obj = Hosted_File_Base(**file_info) if res_id := create_hosted_file_obj(hosted_file_obj_new=new_obj): return load_hosted_file_obj(hosted_file_id=res_id, model_as_dict=True) finally: try: if os.path.exists(tmp_video_file_clip_path): os.unlink(tmp_video_file_clip_path) except Exception: log.exception('Failed to remove temporary video clip file') return False # ### END ### API Hosted File Methods ### clip_video_method() ### # ### BEGIN ### API Hosted File Methods ### convert_file_method() ### async def convert_file_method( hosted_file_id: str, link_to_type: str, link_to_id: int, account_id: int, account_id_random: str = None, filename_no_ext: str = 'automated_hosted_file_conversion', to_type: str = 'webp', ): from pdf2image import convert_from_path # NOTE: Invoked by the hosted_file router at # `/hosted_file/{hosted_file_id}/convert_file`. This helper currently # converts the first page of a PDF to an image (webp/png) and saves a # new hosted_file record; it returns that record or False on failure. hosted_file_obj = load_hosted_file_obj(hosted_file_id=hosted_file_id) if not hosted_file_obj: return False # Ensure input is a PDF (pdf2image is designed for PDFs) if (getattr(hosted_file_obj, 'extension', None) or '').lower() != 'pdf' and (getattr(hosted_file_obj, 'content_type', None) or '') != 'application/pdf': log.warning('convert_file_method called on non-PDF file') return False full_file_path = os.path.join(settings.FILES_PATH['hosted_files_root'], hosted_file_obj.hash_sha256[0:2], f'{hosted_file_obj.hash_sha256}.file') if not os.path.exists(full_file_path): return False if not os.path.exists(full_file_path): return False save_path = os.path.join(settings.FILES_PATH['hosted_tmp_root'], 'convert_file', f'conv_{int(time.time())}.{to_type}') os.makedirs(os.path.dirname(save_path), exist_ok=True) try: images = convert_from_path(full_file_path, size=(3840, None)) image = images[0] if to_type == 'webp': image.save(save_path, lossless=False, quality=90) elif to_type == 'png': image.save(save_path, compress_level=9) else: log.warning(f'Unsupported target type for convert_file_method: {to_type}') return False except Exception: log.exception('Error converting file to image') return False file_info = await save_file_to_hosted_file( file_path = save_path, filename = f'{filename_no_ext}.{to_type}', extension = to_type, account_id = account_id, account_id_random = account_id_random, link_to_type = link_to_type, link_to_id = link_to_id, ) if file_info.get('saved'): if sel := sql_select(table_name='hosted_file', field_name='hash_sha256', field_value=file_info['hash_sha256']): return load_hosted_file_obj(hosted_file_id=sel.get('id'), model_as_dict=True) else: new_obj = Hosted_File_Base(**file_info) if res_id := create_hosted_file_obj(hosted_file_obj_new=new_obj): # cleanup tmp file try: if os.path.exists(save_path): os.unlink(save_path) except Exception: log.exception('Failed to remove temporary converted file') return load_hosted_file_obj(hosted_file_id=res_id, model_as_dict=True) return False # ### END ### API Hosted File Methods ### convert_file_method() ###