import contextlib
import logging
# Stdlib logging under a private alias: the name `logging` is rebound below by
# the import from app.lib_general, so it cannot be relied on to be the stdlib module.
import logging as _stdlib_logging
import mimetypes
import os
import pathlib
import shlex
import shutil
import subprocess
import tempfile
import time
import uuid

from app.config import settings
from app.lib_general import log, logging
from app.db_sql import sql_select, sql_update, sql_insert, get_id_random
from app.methods.hosted_file_methods import (
    load_hosted_file_obj,
    create_hosted_file_obj,
    save_file_to_hosted_file,
)
from app.models.hosted_file_models import Hosted_File_Base

_logger = _stdlib_logging.getLogger(__name__)

# Keyword arguments handed to `image.save()` for each supported `to_type`
# of convert_file_method(). Any other `to_type` is rejected.
_CONVERT_SAVE_OPTIONS = {
    'webp': {'lossless': False, 'quality': 90},
    'png': {'compress_level': 9},
}


def _hosted_file_disk_path(file_hash: str) -> str:
    """Return the on-disk path of a hosted file: <root>/<first two hash chars>/<hash>.file."""
    return os.path.join(
        settings.FILES_PATH['hosted_files_root'],
        file_hash[0:2],
        f'{file_hash}.file',
    )


def _remove_quietly(path: str) -> None:
    """Best-effort removal of a scratch file; a missing or undeletable file is ignored."""
    with contextlib.suppress(OSError):
        os.remove(path)


def _build_clip_command(
    input_path: str,
    output_path: str,
    start_time: str,
    end_time: str,
    *,
    reencode: bool,
    scale_down: bool,
) -> list:
    """Build the ffmpeg argv list that cuts [start_time, end_time] out of input_path.

    The list is built directly (not by splitting a formatted string) so that paths
    containing spaces or quotes stay a single argument, and so that whitespace in
    `start_time` / `end_time` cannot introduce additional ffmpeg arguments.
    """
    args = [
        'ffmpeg', '-hide_banner', '-loglevel', 'error', '-nostats', '-y',
        '-i', input_path,
        '-ss', str(start_time),
        '-to', str(end_time),
    ]
    if scale_down:
        args += ['-vf', 'scale=w=1920:h=1080:force_original_aspect_ratio=decrease']
    if scale_down or reencode:
        # scale_down takes precedence over reencode and always re-encodes.
        args += ['-c:v', 'libx264', '-crf', '23', '-maxrate', '2M', '-bufsize', '2M']
    else:
        args += ['-c:v', 'copy']
    args += ['-c:a', 'copy', '-movflags', '+faststart', output_path]
    return args


def _load_or_create_hosted_file(file_info):
    """Turn the result of save_file_to_hosted_file() into a hosted_file dict, or False.

    If a `hosted_file` row with the same `hash_sha256` already exists it is loaded;
    otherwise a new object is created from `file_info` and then loaded.
    """
    if not file_info or not file_info.get('saved'):
        return False

    existing = sql_select(
        table_name='hosted_file',
        field_name='hash_sha256',
        field_value=file_info['hash_sha256'],
    )
    if existing:
        return load_hosted_file_obj(hosted_file_id=existing.get('id'), model_as_dict=True)

    created_id = create_hosted_file_obj(hosted_file_obj_new=Hosted_File_Base(**file_info))
    if created_id:
        return load_hosted_file_obj(hosted_file_id=created_id, model_as_dict=True)
    return False


# ### BEGIN ### API Hosted File Methods ### clip_video_method() ###
async def clip_video_method(
    hosted_file_id: str,
    start_time: str,
    end_time: str,
    account_id: int,
    link_to_type: str,
    link_to_id: int,
    filename_no_ext: str = 'automated_hosted_file_clip_video',
    to_type: str = 'mp4',
    reencode: bool = False,
    scale_down: bool = False,
):
    """
    Clip a hosted video with ffmpeg and save the result as a new hosted_file.

    Args:
        hosted_file_id: ID of the source hosted_file to clip.
        start_time: Value passed to ffmpeg's `-ss` option.
        end_time: Value passed to ffmpeg's `-to` option.
        account_id: Forwarded to save_file_to_hosted_file().
        link_to_type: Forwarded to save_file_to_hosted_file().
        link_to_id: Forwarded to save_file_to_hosted_file().
        filename_no_ext: Base name of the new file; a `_[clip]`, `_[clip_reencode]`
            or `_[clip_scaled]` tag and `.{to_type}` are appended.
        to_type: Extension recorded for the new file. The ffmpeg output itself is
            always written to a temporary `.mp4` file.
        reencode: Re-encode video with libx264 instead of stream-copying it.
        scale_down: Apply the 1920x1080 scale filter and re-encode with libx264;
            takes precedence over `reencode`.

    Returns:
        The new (or already existing, same-hash) hosted_file as a dict, or False if
        the source cannot be loaded, its file is missing, ffmpeg exits with an
        error, or the result cannot be saved/registered.

    Raises:
        OSError: If the ffmpeg executable cannot be launched.
    """
    hosted_file_obj = load_hosted_file_obj(hosted_file_id=hosted_file_id)
    if not hosted_file_obj:
        return False

    full_file_path = _hosted_file_disk_path(hosted_file_obj.hash_sha256)
    if not os.path.exists(full_file_path):
        return False

    if scale_down:
        variant = 'clip_scaled'
    elif reencode:
        variant = 'clip_reencode'
    else:
        variant = 'clip'
    new_filename = f'{filename_no_ext}_[{variant}].{to_type}'

    # Only the unique name is needed; ffmpeg (-y) overwrites the empty placeholder.
    with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as tmp_video_file_clip:
        tmp_video_file_clip_path = tmp_video_file_clip.name

    try:
        args = _build_clip_command(
            full_file_path,
            tmp_video_file_clip_path,
            start_time,
            end_time,
            reencode=reencode,
            scale_down=scale_down,
        )
        # NOTE(review): subprocess.run blocks the event loop while ffmpeg runs;
        # consider offloading to an executor if this is called from a busy loop.
        try:
            subprocess.run(args, check=True, capture_output=True, text=True)
        except subprocess.CalledProcessError as exc:
            _logger.warning(
                'ffmpeg clip failed for hosted_file %s (exit code %s): %s',
                hosted_file_id,
                exc.returncode,
                (exc.stderr or '').strip(),
            )
            return False

        file_info = await save_file_to_hosted_file(
            file_path=tmp_video_file_clip_path,
            filename=new_filename,
            extension=to_type,
            account_id=account_id,
            link_to_type=link_to_type,
            link_to_id=link_to_id,
        )
        return _load_or_create_hosted_file(file_info)
    finally:
        # The temp file was created with delete=False, so it must be removed here.
        # NOTE(review): assumes save_file_to_hosted_file() has copied or moved the
        # data into the hosted-files store by the time it returns — confirm.
        _remove_quietly(tmp_video_file_clip_path)
# ### END ### API Hosted File Methods ### clip_video_method() ###


# ### BEGIN ### API Hosted File Methods ### convert_file_method() ###
async def convert_file_method(
    hosted_file_id: str,
    link_to_type: str,
    link_to_id: int,
    account_id: int,
    filename_no_ext: str = 'automated_hosted_file_conversion',
    to_type: str = 'webp',
):
    """
    Render the first page of a hosted file via pdf2image and save it as a new hosted_file.

    Args:
        hosted_file_id: ID of the source hosted_file to convert.
        link_to_type: Forwarded to save_file_to_hosted_file().
        link_to_id: Forwarded to save_file_to_hosted_file().
        account_id: Forwarded to save_file_to_hosted_file().
        filename_no_ext: Base name of the new file; `.{to_type}` is appended.
        to_type: Output image type, `'webp'` or `'png'`.

    Returns:
        The new (or already existing, same-hash) hosted_file as a dict, or False if
        `to_type` is unsupported, the source cannot be loaded, its file is missing,
        no page could be rendered, or the result cannot be saved/registered.
    """
    from pdf2image import convert_from_path

    # Reject unsupported targets before doing any expensive rendering.
    save_options = _CONVERT_SAVE_OPTIONS.get(to_type)
    if save_options is None:
        return False

    hosted_file_obj = load_hosted_file_obj(hosted_file_id=hosted_file_id)
    if not hosted_file_obj:
        return False

    full_file_path = _hosted_file_disk_path(hosted_file_obj.hash_sha256)
    if not os.path.exists(full_file_path):
        return False

    tmp_dir = os.path.join(settings.FILES_PATH['hosted_tmp_root'], 'convert_file')
    os.makedirs(tmp_dir, exist_ok=True)
    # The uuid suffix keeps two conversions started within the same second
    # from writing to (and then saving) the same scratch file.
    save_path = os.path.join(
        tmp_dir,
        f'conv_{int(time.time())}_{uuid.uuid4().hex}.{to_type}',
    )

    # Only the first page is used, so only the first page is rendered.
    # NOTE(review): rendering and image.save() block the event loop.
    images = convert_from_path(
        full_file_path,
        size=(3840, None),
        first_page=1,
        last_page=1,
    )
    if not images:
        return False

    try:
        images[0].save(save_path, **save_options)

        file_info = await save_file_to_hosted_file(
            file_path=save_path,
            filename=f'{filename_no_ext}.{to_type}',
            extension=to_type,
            account_id=account_id,
            link_to_type=link_to_type,
            link_to_id=link_to_id,
        )
        return _load_or_create_hosted_file(file_info)
    finally:
        # NOTE(review): assumes save_file_to_hosted_file() has copied or moved the
        # data into the hosted-files store by the time it returns — confirm.
        _remove_quietly(save_path)
# ### END ### API Hosted File Methods ### convert_file_method() ###