- Reduced api_crud.py (1843 -> 143 lines) by extracting V1 registry and logic. - Reduced hosted_file.py (1596 -> 361 lines) by moving storage and media logic to methods. - Created lib_media.py for specialized video/image processing. - Created api_crud_methods.py for legacy template handlers. - Created legacy_v1.py for the legacy object registry. - Fixed subdirectory_path bug in Hosted File creation. - Verified full File Lifecycle via consolidated E2E suite.
131 lines
5.5 KiB
Python
131 lines
5.5 KiB
Python
import os
|
|
import pathlib
|
|
import shutil
|
|
import time
|
|
import tempfile
|
|
import subprocess
|
|
import shlex
|
|
import logging
|
|
import mimetypes
|
|
|
|
from app.config import settings
|
|
from app.lib_general import log, logging
|
|
from app.db_sql import sql_select, sql_update, sql_insert, get_id_random
|
|
from app.methods.hosted_file_methods import (
|
|
load_hosted_file_obj, create_hosted_file_obj, save_file_to_hosted_file
|
|
)
|
|
from app.models.hosted_file_models import Hosted_File_Base
|
|
|
|
# ### BEGIN ### API Hosted File Methods ### clip_video_method() ###
|
|
async def clip_video_method(
|
|
hosted_file_id: str,
|
|
start_time: str,
|
|
end_time: str,
|
|
account_id: int,
|
|
link_to_type: str,
|
|
link_to_id: int,
|
|
filename_no_ext: str = 'automated_hosted_file_clip_video',
|
|
to_type: str = 'mp4',
|
|
reencode: bool = False,
|
|
scale_down: bool = False,
|
|
):
|
|
"""
|
|
Business logic for clipping a video using ffmpeg and saving as a new hosted_file.
|
|
Returns the new hosted_file dict or False.
|
|
"""
|
|
hosted_file_obj = load_hosted_file_obj(hosted_file_id=hosted_file_id)
|
|
if not hosted_file_obj: return False
|
|
|
|
file_hash = hosted_file_obj.hash_sha256
|
|
hosted_files_path = settings.FILES_PATH['hosted_files_root']
|
|
full_file_path = os.path.join(hosted_files_path, file_hash[0:2], f'{file_hash}.file')
|
|
|
|
if not os.path.exists(full_file_path): return False
|
|
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as tmp_video_file_clip:
|
|
tmp_video_file_clip_path = tmp_video_file_clip.name
|
|
|
|
if scale_down:
|
|
new_filename = f'{filename_no_ext}_[clip_scaled].{to_type}'
|
|
cmd = f'ffmpeg -hide_banner -loglevel error -nostats -y -i {full_file_path} -ss {start_time} -to {end_time} -vf "scale=w=1920:h=1080:force_original_aspect_ratio=decrease" -c:v libx264 -crf 23 -maxrate 2M -bufsize 2M -c:a copy -movflags +faststart {tmp_video_file_clip_path}'
|
|
elif reencode:
|
|
new_filename = f'{filename_no_ext}_[clip_reencode].{to_type}'
|
|
cmd = f"ffmpeg -hide_banner -loglevel error -nostats -y -i {full_file_path} -ss {start_time} -to {end_time} -c:v libx264 -crf 23 -maxrate 2M -bufsize 2M -c:a copy -movflags +faststart {tmp_video_file_clip_path}"
|
|
else:
|
|
new_filename = f'{filename_no_ext}_[clip].{to_type}'
|
|
cmd = f"ffmpeg -hide_banner -loglevel error -nostats -y -i {full_file_path} -ss {start_time} -to {end_time} -c:v copy -c:a copy -movflags +faststart {tmp_video_file_clip_path}"
|
|
|
|
args = shlex.split(cmd)
|
|
try:
|
|
subprocess.run(args, check=True, capture_output=True, text=True)
|
|
except subprocess.CalledProcessError:
|
|
return False
|
|
|
|
file_info = await save_file_to_hosted_file(
|
|
file_path = tmp_video_file_clip_path,
|
|
filename = new_filename,
|
|
extension = to_type,
|
|
account_id = account_id,
|
|
link_to_type = link_to_type,
|
|
link_to_id = link_to_id,
|
|
)
|
|
|
|
if file_info.get('saved'):
|
|
if sel := sql_select(table_name='hosted_file', field_name='hash_sha256', field_value=file_info['hash_sha256']):
|
|
return load_hosted_file_obj(hosted_file_id=sel.get('id'), model_as_dict=True)
|
|
else:
|
|
new_obj = Hosted_File_Base(**file_info)
|
|
if res_id := create_hosted_file_obj(hosted_file_obj_new=new_obj):
|
|
return load_hosted_file_obj(hosted_file_id=res_id, model_as_dict=True)
|
|
return False
|
|
# ### END ### API Hosted File Methods ### clip_video_method() ###
|
|
|
|
|
|
# ### BEGIN ### API Hosted File Methods ### convert_file_method() ###
|
|
async def convert_file_method(
|
|
hosted_file_id: str,
|
|
link_to_type: str,
|
|
link_to_id: int,
|
|
account_id: int,
|
|
filename_no_ext: str = 'automated_hosted_file_conversion',
|
|
to_type: str = 'webp',
|
|
):
|
|
from pdf2image import convert_from_path
|
|
|
|
hosted_file_obj = load_hosted_file_obj(hosted_file_id=hosted_file_id)
|
|
if not hosted_file_obj: return False
|
|
|
|
full_file_path = os.path.join(settings.FILES_PATH['hosted_files_root'], hosted_file_obj.hash_sha256[0:2], f'{hosted_file_obj.hash_sha256}.file')
|
|
if not os.path.exists(full_file_path): return False
|
|
|
|
save_path = os.path.join(settings.FILES_PATH['hosted_tmp_root'], 'convert_file', f'conv_{int(time.time())}.{to_type}')
|
|
os.makedirs(os.path.dirname(save_path), exist_ok=True)
|
|
|
|
images = convert_from_path(full_file_path, size=(3840, None))
|
|
image = images[0]
|
|
|
|
if to_type == 'webp':
|
|
image.save(save_path, lossless=False, quality=90)
|
|
elif to_type == 'png':
|
|
image.save(save_path, compress_level=9)
|
|
else: return False
|
|
|
|
file_info = await save_file_to_hosted_file(
|
|
file_path = save_path,
|
|
filename = f'{filename_no_ext}.{to_type}',
|
|
extension = to_type,
|
|
account_id = account_id,
|
|
link_to_type = link_to_type,
|
|
link_to_id = link_to_id,
|
|
)
|
|
|
|
if file_info.get('saved'):
|
|
if sel := sql_select(table_name='hosted_file', field_name='hash_sha256', field_value=file_info['hash_sha256']):
|
|
return load_hosted_file_obj(hosted_file_id=sel.get('id'), model_as_dict=True)
|
|
else:
|
|
new_obj = Hosted_File_Base(**file_info)
|
|
if res_id := create_hosted_file_obj(hosted_file_obj_new=new_obj):
|
|
return load_hosted_file_obj(hosted_file_id=res_id, model_as_dict=True)
|
|
return False
|
|
# ### END ### API Hosted File Methods ### convert_file_method() ###
|