170 lines
7.3 KiB
Python
170 lines
7.3 KiB
Python
import os
|
|
import pathlib
|
|
import shutil
|
|
import time
|
|
import tempfile
|
|
import subprocess
|
|
import shlex
|
|
import logging
|
|
import mimetypes
|
|
|
|
from app.config import settings
|
|
from app.lib_general import log, logging
|
|
from app.db_sql import sql_select, sql_update, sql_insert, get_id_random
|
|
from app.methods.hosted_file_methods import (
|
|
load_hosted_file_obj, create_hosted_file_obj, save_file_to_hosted_file
|
|
)
|
|
from app.models.hosted_file_models import Hosted_File_Base
|
|
|
|
# ### BEGIN ### API Hosted File Methods ### clip_video_method() ###
|
|
async def clip_video_method(
|
|
hosted_file_id: str,
|
|
start_time: str,
|
|
end_time: str,
|
|
account_id: int,
|
|
link_to_type: str,
|
|
link_to_id: int,
|
|
account_id_random: str = None,
|
|
filename_no_ext: str = 'automated_hosted_file_clip_video',
|
|
to_type: str = 'mp4',
|
|
reencode: bool = False,
|
|
scale_down: bool = False,
|
|
):
|
|
"""
|
|
Business logic for clipping a video using ffmpeg and saving as a new hosted_file.
|
|
Returns the new hosted_file dict or False.
|
|
"""
|
|
# NOTE: This function is invoked by the hosted_file router at
|
|
# `/hosted_file/{hosted_file_id}/clip_video` and returns the created
|
|
# hosted_file metadata (or False) so the router can build the standard
|
|
# response body consumed by frontends.
|
|
hosted_file_obj = load_hosted_file_obj(hosted_file_id=hosted_file_id)
|
|
if not hosted_file_obj: return False
|
|
|
|
file_hash = hosted_file_obj.hash_sha256
|
|
hosted_files_path = settings.FILES_PATH['hosted_files_root']
|
|
full_file_path = os.path.join(hosted_files_path, file_hash[0:2], f'{file_hash}.file')
|
|
|
|
if not os.path.exists(full_file_path): return False
|
|
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as tmp_video_file_clip:
|
|
tmp_video_file_clip_path = tmp_video_file_clip.name
|
|
|
|
try:
|
|
if scale_down:
|
|
new_filename = f'{filename_no_ext}_[clip_scaled].{to_type}'
|
|
cmd = f'ffmpeg -hide_banner -loglevel error -nostats -y -i {full_file_path} -ss {start_time} -to {end_time} -vf "scale=w=1920:h=1080:force_original_aspect_ratio=decrease" -c:v libx264 -crf 23 -maxrate 2M -bufsize 2M -c:a copy -movflags +faststart {tmp_video_file_clip_path}'
|
|
elif reencode:
|
|
new_filename = f'{filename_no_ext}_[clip_reencode].{to_type}'
|
|
cmd = f"ffmpeg -hide_banner -loglevel error -nostats -y -i {full_file_path} -ss {start_time} -to {end_time} -c:v libx264 -crf 23 -maxrate 2M -bufsize 2M -c:a copy -movflags +faststart {tmp_video_file_clip_path}"
|
|
else:
|
|
new_filename = f'{filename_no_ext}_[clip].{to_type}'
|
|
cmd = f"ffmpeg -hide_banner -loglevel error -nostats -y -i {full_file_path} -ss {start_time} -to {end_time} -c:v copy -c:a copy -movflags +faststart {tmp_video_file_clip_path}"
|
|
|
|
args = shlex.split(cmd)
|
|
try:
|
|
subprocess.run(args, check=True, capture_output=True, text=True)
|
|
except subprocess.CalledProcessError as e:
|
|
log.exception(f'ffmpeg failed: returncode={e.returncode}; stdout={e.stdout}; stderr={e.stderr}')
|
|
return False
|
|
|
|
file_info = await save_file_to_hosted_file(
|
|
file_path = tmp_video_file_clip_path,
|
|
filename = new_filename,
|
|
extension = to_type,
|
|
account_id = account_id,
|
|
account_id_random = account_id_random,
|
|
link_to_type = link_to_type,
|
|
link_to_id = link_to_id,
|
|
)
|
|
|
|
if file_info.get('saved'):
|
|
if sel := sql_select(table_name='hosted_file', field_name='hash_sha256', field_value=file_info['hash_sha256']):
|
|
return load_hosted_file_obj(hosted_file_id=sel.get('id'), model_as_dict=True)
|
|
else:
|
|
new_obj = Hosted_File_Base(**file_info)
|
|
if res_id := create_hosted_file_obj(hosted_file_obj_new=new_obj):
|
|
return load_hosted_file_obj(hosted_file_id=res_id, model_as_dict=True)
|
|
finally:
|
|
try:
|
|
if os.path.exists(tmp_video_file_clip_path):
|
|
os.unlink(tmp_video_file_clip_path)
|
|
except Exception:
|
|
log.exception('Failed to remove temporary video clip file')
|
|
return False
|
|
# ### END ### API Hosted File Methods ### clip_video_method() ###
|
|
|
|
|
|
# ### BEGIN ### API Hosted File Methods ### convert_file_method() ###
|
|
async def convert_file_method(
|
|
hosted_file_id: str,
|
|
link_to_type: str,
|
|
link_to_id: int,
|
|
account_id: int,
|
|
account_id_random: str = None,
|
|
filename_no_ext: str = 'automated_hosted_file_conversion',
|
|
to_type: str = 'webp',
|
|
):
|
|
from pdf2image import convert_from_path
|
|
|
|
# NOTE: Invoked by the hosted_file router at
|
|
# `/hosted_file/{hosted_file_id}/convert_file`. This helper currently
|
|
# converts the first page of a PDF to an image (webp/png) and saves a
|
|
# new hosted_file record; it returns that record or False on failure.
|
|
|
|
hosted_file_obj = load_hosted_file_obj(hosted_file_id=hosted_file_id)
|
|
if not hosted_file_obj: return False
|
|
|
|
# Ensure input is a PDF (pdf2image is designed for PDFs)
|
|
if (getattr(hosted_file_obj, 'extension', None) or '').lower() != 'pdf' and (getattr(hosted_file_obj, 'content_type', None) or '') != 'application/pdf':
|
|
log.warning('convert_file_method called on non-PDF file')
|
|
return False
|
|
|
|
full_file_path = os.path.join(settings.FILES_PATH['hosted_files_root'], hosted_file_obj.hash_sha256[0:2], f'{hosted_file_obj.hash_sha256}.file')
|
|
if not os.path.exists(full_file_path): return False
|
|
if not os.path.exists(full_file_path): return False
|
|
|
|
save_path = os.path.join(settings.FILES_PATH['hosted_tmp_root'], 'convert_file', f'conv_{int(time.time())}.{to_type}')
|
|
os.makedirs(os.path.dirname(save_path), exist_ok=True)
|
|
|
|
try:
|
|
images = convert_from_path(full_file_path, size=(3840, None))
|
|
image = images[0]
|
|
|
|
if to_type == 'webp':
|
|
image.save(save_path, lossless=False, quality=90)
|
|
elif to_type == 'png':
|
|
image.save(save_path, compress_level=9)
|
|
else:
|
|
log.warning(f'Unsupported target type for convert_file_method: {to_type}')
|
|
return False
|
|
except Exception:
|
|
log.exception('Error converting file to image')
|
|
return False
|
|
|
|
file_info = await save_file_to_hosted_file(
|
|
file_path = save_path,
|
|
filename = f'{filename_no_ext}.{to_type}',
|
|
extension = to_type,
|
|
account_id = account_id,
|
|
account_id_random = account_id_random,
|
|
link_to_type = link_to_type,
|
|
link_to_id = link_to_id,
|
|
)
|
|
|
|
if file_info.get('saved'):
|
|
if sel := sql_select(table_name='hosted_file', field_name='hash_sha256', field_value=file_info['hash_sha256']):
|
|
return load_hosted_file_obj(hosted_file_id=sel.get('id'), model_as_dict=True)
|
|
else:
|
|
new_obj = Hosted_File_Base(**file_info)
|
|
if res_id := create_hosted_file_obj(hosted_file_obj_new=new_obj):
|
|
# cleanup tmp file
|
|
try:
|
|
if os.path.exists(save_path):
|
|
os.unlink(save_path)
|
|
except Exception:
|
|
log.exception('Failed to remove temporary converted file')
|
|
return load_hosted_file_obj(hosted_file_id=res_id, model_as_dict=True)
|
|
return False
|
|
# ### END ### API Hosted File Methods ### convert_file_method() ###
|