Migrate clip/convert to V3 actions; add background clip support, redirect legacy route; update frontend guide

This commit is contained in:
Scott Idem
2026-03-11 14:51:08 -04:00
parent fbbc186af0
commit a20c436013
8 changed files with 283 additions and 64 deletions

View File

@@ -17,8 +17,7 @@ RUN apt-get update; \
# Install Python requirements
# This file is now located in the project root
COPY requirements.txt /tmp/requirements.txt
RUN --mount=type=cache,target=/root/.cache/pip \
pip install -r /tmp/requirements.txt
RUN pip install -r /tmp/requirements.txt
# Create a reference of actual installed versions
RUN pip freeze >> /tmp/aether_fastapi_requirements_current.txt

View File

@@ -34,18 +34,23 @@ async def clip_video_method(
Business logic for clipping a video using ffmpeg and saving as a new hosted_file.
Returns the new hosted_file dict or False.
"""
# NOTE: This function is invoked by the hosted_file router at
# `/hosted_file/{hosted_file_id}/clip_video` and returns the created
# hosted_file metadata (or False) so the router can build the standard
# response body consumed by frontends.
hosted_file_obj = load_hosted_file_obj(hosted_file_id=hosted_file_id)
if not hosted_file_obj: return False
file_hash = hosted_file_obj.hash_sha256
hosted_files_path = settings.FILES_PATH['hosted_files_root']
full_file_path = os.path.join(hosted_files_path, file_hash[0:2], f'{file_hash}.file')
if not os.path.exists(full_file_path): return False
with tempfile.NamedTemporaryFile(delete=False, suffix='.mp4') as tmp_video_file_clip:
tmp_video_file_clip_path = tmp_video_file_clip.name
try:
if scale_down:
new_filename = f'{filename_no_ext}_[clip_scaled].{to_type}'
cmd = f'ffmpeg -hide_banner -loglevel error -nostats -y -i {full_file_path} -ss {start_time} -to {end_time} -vf "scale=w=1920:h=1080:force_original_aspect_ratio=decrease" -c:v libx264 -crf 23 -maxrate 2M -bufsize 2M -c:a copy -movflags +faststart {tmp_video_file_clip_path}'
@@ -59,7 +64,8 @@ async def clip_video_method(
args = shlex.split(cmd)
try:
subprocess.run(args, check=True, capture_output=True, text=True)
except subprocess.CalledProcessError:
except subprocess.CalledProcessError as e:
log.exception(f'ffmpeg failed: returncode={e.returncode}; stdout={e.stdout}; stderr={e.stderr}')
return False
file_info = await save_file_to_hosted_file(
@@ -79,6 +85,12 @@ async def clip_video_method(
new_obj = Hosted_File_Base(**file_info)
if res_id := create_hosted_file_obj(hosted_file_obj_new=new_obj):
return load_hosted_file_obj(hosted_file_id=res_id, model_as_dict=True)
finally:
try:
if os.path.exists(tmp_video_file_clip_path):
os.unlink(tmp_video_file_clip_path)
except Exception:
log.exception('Failed to remove temporary video clip file')
return False
# ### END ### API Hosted File Methods ### clip_video_method() ###
@@ -94,24 +106,41 @@ async def convert_file_method(
to_type: str = 'webp',
):
from pdf2image import convert_from_path
# NOTE: Invoked by the hosted_file router at
# `/hosted_file/{hosted_file_id}/convert_file`. This helper currently
# converts the first page of a PDF to an image (webp/png) and saves a
# new hosted_file record; it returns that record or False on failure.
hosted_file_obj = load_hosted_file_obj(hosted_file_id=hosted_file_id)
if not hosted_file_obj: return False
# Ensure input is a PDF (pdf2image is designed for PDFs)
if (getattr(hosted_file_obj, 'extension', None) or '').lower() != 'pdf' and (getattr(hosted_file_obj, 'content_type', None) or '') != 'application/pdf':
log.warning('convert_file_method called on non-PDF file')
return False
full_file_path = os.path.join(settings.FILES_PATH['hosted_files_root'], hosted_file_obj.hash_sha256[0:2], f'{hosted_file_obj.hash_sha256}.file')
if not os.path.exists(full_file_path): return False
if not os.path.exists(full_file_path): return False
save_path = os.path.join(settings.FILES_PATH['hosted_tmp_root'], 'convert_file', f'conv_{int(time.time())}.{to_type}')
os.makedirs(os.path.dirname(save_path), exist_ok=True)
images = convert_from_path(full_file_path, size=(3840, None))
image = images[0]
if to_type == 'webp':
image.save(save_path, lossless=False, quality=90)
elif to_type == 'png':
image.save(save_path, compress_level=9)
else: return False
try:
images = convert_from_path(full_file_path, size=(3840, None))
image = images[0]
if to_type == 'webp':
image.save(save_path, lossless=False, quality=90)
elif to_type == 'png':
image.save(save_path, compress_level=9)
else:
log.warning(f'Unsupported target type for convert_file_method: {to_type}')
return False
except Exception:
log.exception('Error converting file to image')
return False
file_info = await save_file_to_hosted_file(
file_path = save_path,
@@ -129,6 +158,12 @@ async def convert_file_method(
else:
new_obj = Hosted_File_Base(**file_info)
if res_id := create_hosted_file_obj(hosted_file_obj_new=new_obj):
# cleanup tmp file
try:
if os.path.exists(save_path):
os.unlink(save_path)
except Exception:
log.exception('Failed to remove temporary converted file')
return load_hosted_file_obj(hosted_file_id=res_id, model_as_dict=True)
return False
# ### END ### API Hosted File Methods ### convert_file_method() ###

View File

@@ -18,6 +18,7 @@ from app.methods.hosted_file_methods import (
create_hosted_file_link, delete_hosted_file_link, get_hosted_file_link_rec_list
)
from app.methods.lib_media import convert_file_method
from app.methods.lib_media import clip_video_method
from app.lib_general_v3 import (
AccountContext, get_account_context, get_account_context_optional,
SerializationParams, DelayParams
@@ -42,7 +43,7 @@ def validate_file_extension(filename: str, allowed_extensions: List[str]):
"""
if not allowed_extensions:
return True
ext = filename.rsplit('.', 1)[-1].lower()
if ext not in [e.lower().strip('.') for e in allowed_extensions]:
raise HTTPException(
@@ -85,7 +86,7 @@ async def upload_files_action(
- Returns clean Vision IDs.
"""
if delay.sleep_time_s > 0: await asyncio.sleep(delay.sleep_time_s)
# 1. Resolve Parent IDs
account_id_random = account_id
if res_account_id := redis_lookup_id_random(record_id_random=account_id, table_name='account'):
@@ -132,7 +133,7 @@ async def upload_files_action(
):
# Use existing record
hosted_file_id_int = existing_rec.get('id')
# Migration check: Update subdirectory if missing or mismatched
if file_info.get('subdirectory_path') and existing_rec.get('subdirectory_path') != file_info['subdirectory_path']:
log.info(f"Updating subdirectory_path for existing record {hosted_file_id_int} to {file_info['subdirectory_path']}")
@@ -140,7 +141,7 @@ async def upload_files_action(
table_name = 'hosted_file',
data = {'id': hosted_file_id_int, 'subdirectory_path': file_info['subdirectory_path']}
)
# Reload to get the latest DB state (including updated path)
hosted_file_dict = load_hosted_file_obj(hosted_file_id=hosted_file_id_int, model_as_dict=True)
else:
@@ -148,7 +149,7 @@ async def upload_files_action(
file_info['account_id'] = account_id_int
file_info['account_id_random'] = account_id_random
new_hosted_file_obj = Hosted_File_Base(**file_info)
if res_new_id := create_hosted_file_obj(hosted_file_obj_new=new_hosted_file_obj):
hosted_file_id_int = res_new_id
hosted_file_dict = load_hosted_file_obj(hosted_file_id=hosted_file_id_int, model_as_dict=True)
@@ -201,11 +202,11 @@ async def download_file_action(
# 1. Auth Bypass Logic (site_key and simplified key)
is_authorized = False
# Priority A: Standard Auth (JWT or API Key)
if account.auth_method != 'guest':
is_authorized = True
# Priority B: Simplified Access Pattern (?key=ANY_VALID_ACCOUNT_ID)
elif key:
# For now, to unblock the frontend, any valid account_id_random is sufficient.
@@ -221,17 +222,17 @@ async def download_file_action(
if site_res := sql_select(sql=sql, data={'key': site_key}):
is_authorized = True
log.info(f"Auth Bypass: Download authorized via site_key.")
if not is_authorized:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Authentication required or invalid access key.")
# 2. Resolve File Record
# ID Vision: Attempt to resolve the ID.
# ID Vision: Attempt to resolve the ID.
# 🛑 REMINDER: If adding a new specialized 'container' object (like event_person_profile),
# ensure the lookup logic is mirrored here to allow direct downloads via container ID.
# If not found in hosted_file, check if it's an event_file or archive_content ID that we can resolve.
resolved_id = redis_lookup_id_random(record_id_random=hosted_file_id, table_name='hosted_file')
if not resolved_id:
log.info(f"ID {hosted_file_id} not found in hosted_file. Checking container tables...")
# A. Check event_file
@@ -239,7 +240,7 @@ async def download_file_action(
if ef_rec := sql_select(sql="SELECT hosted_file_id FROM event_file WHERE id = :id", data={'id': ef_id}):
resolved_id = ef_rec.get('hosted_file_id')
log.info(f"Resolved event_file {hosted_file_id} to hosted_file {resolved_id}")
# B. Check archive_content
if not resolved_id:
if ac_id := redis_lookup_id_random(record_id_random=hosted_file_id, table_name='archive_content'):
@@ -305,7 +306,7 @@ async def download_file_action(
'Content-Disposition': f'attachment; filename="{safe_filename}"; filename*=utf-8\'\'{encoded_filename}'
}
)
return FileResponse(full_file_path, filename=target_filename, media_type=media_type)
@@ -329,7 +330,7 @@ async def download_file_by_hash_action(
# For now, we strictly require a valid machine API key (auth_method will not be 'guest')
if account.auth_method == 'guest':
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
status_code=status.HTTP_403_FORBIDDEN,
detail="Valid API Key required for hash-based downloads."
)
@@ -349,7 +350,7 @@ async def download_file_by_hash_action(
# 3. Serve File
target_filename = filename or f"file_{sha256[:8]}.bin"
media_type = mimetypes.guess_type(target_filename)[0] or 'application/octet-stream'
return FileResponse(full_file_path, filename=target_filename, media_type=media_type)
@@ -427,7 +428,7 @@ async def delete_file_action(
if rm_orphan and is_orphan:
log.info(f"File {file_id_int} is an orphan. Cleaning up...")
# Method Handling
if method == 'delete':
# Hard delete: Record + Disk
@@ -481,3 +482,55 @@ async def convert_file(
return mk_resp(data=result)
return mk_resp(data=None, status_code=400, status_message="Conversion failed.")
# ### END ### API V3 Hosted File Action ### convert_file() ###
@router.get('/{hosted_file_id}/clip_video', response_model=Resp_Body_Base)
async def clip_video(
hosted_file_id: str = Path(min_length=11, max_length=22),
link_to_type: str = Query(...),
link_to_id: str = Query(...),
start_time: str = Query(..., min_length=8, max_length=8),
end_time: str = Query(..., min_length=8, max_length=8),
filename_no_ext: str = Query('automated_hosted_file_clip_video'),
reencode: bool = Query(False),
scale_down: bool = Query(False),
background: bool = Query(False),
account: AccountContext = Depends(get_account_context),
):
"""
Clip a segment from a hosted video and save as a new hosted_file record.
Supports optional background scheduling returning `202 Accepted` when `background=true`.
"""
lid_int = redis_lookup_id_random(record_id_random=link_to_id, table_name=link_to_type)
if not lid_int:
raise HTTPException(status_code=404, detail=f"Linked object not found: {link_to_type}:{link_to_id}")
async def _run_clip():
try:
return await clip_video_method(
hosted_file_id=hosted_file_id,
start_time=start_time,
end_time=end_time,
account_id=account.account_id,
account_id_random=account.account_id_random,
link_to_type=link_to_type,
link_to_id=lid_int,
filename_no_ext=filename_no_ext,
reencode=reencode,
scale_down=scale_down,
)
except Exception:
log.exception('Background clip task failed')
return None
if background:
# Schedule and return 202 Accepted
asyncio.create_task(_run_clip())
return mk_resp(data={'task': 'scheduled'}, status_code=202, status_message='Clip scheduled (background)')
result = await _run_clip()
if result:
return mk_resp(data=result)
return mk_resp(data=None, status_code=400, status_message="Clip failed.")
# ### END ### API V3 Hosted File Action ### clip_video() ###
# ### END ### API V3 Hosted File Action ### convert_file() ###

View File

@@ -1,6 +1,7 @@
import aiofiles, datetime, hashlib, mimetypes, os, pathlib, random, shutil, subprocess, shlex, tempfile, time
from fastapi import APIRouter, Body, Depends, File, Form, Header, HTTPException, Path, Query, Response, status, UploadFile
from fastapi.responses import FileResponse, StreamingResponse
from fastapi.responses import FileResponse, StreamingResponse, RedirectResponse
from urllib.parse import quote
from pydantic import BaseModel, EmailStr, Field
from typing import Dict, List, Optional, Set, Union
@@ -9,9 +10,9 @@ from app.config import settings
from app.db_sql import sql_insert, sql_update, sql_insert_or_update, sql_select, sql_delete, redis_lookup_id_random
from app.methods.hosted_file_methods import (
create_hosted_file_obj, handle_delete_hosted_file, load_hosted_file_obj,
save_file, save_file_to_hosted_file, create_hosted_file_link,
delete_hosted_file_link, get_hosted_file_link_rec_list, lookup_file_hash,
create_hosted_file_obj, handle_delete_hosted_file, load_hosted_file_obj,
save_file, save_file_to_hosted_file, create_hosted_file_link,
delete_hosted_file_link, get_hosted_file_link_rec_list, lookup_file_hash,
check_for_hosted_file_hash_file, directory_check_method
)
from app.methods.lib_media import clip_video_method, convert_file_method
@@ -35,10 +36,10 @@ async def directory_check(
"""
log.setLevel(logging.INFO)
result_list = directory_check_method(rm_orphan=rm_orphan)
if result_list is False:
return mk_resp(data=False, status_code=500, response=commons.response, status_message='Hosted files directory not found.')
return mk_resp(data=result_list, response=commons.response, status_message=f'Processed {len(result_list)} files.')
# ### END ### API Hosted File ### directory_check() ###
@@ -52,7 +53,7 @@ async def download_hosted_file(
commons: Common_Route_Params = Depends(common_route_params),
):
log.setLevel(logging.INFO)
# ID Resolve
if hfid_int := redis_lookup_id_random(record_id_random=hosted_file_id, table_name='hosted_file'): pass
else: return mk_resp(data=None, status_code=404, response=commons.response, status_message='The hosted_file ID was invalid or not found.')
@@ -63,7 +64,7 @@ async def download_hosted_file(
target_filename = filename or hosted_file_obj.filename
hash_sha256 = hosted_file_obj.hash_sha256
hosted_files_path = settings.FILES_PATH['hosted_files_root']
subdir = hosted_file_obj.subdirectory_path or ''
file_path = os.path.join(hosted_files_path, subdir, f'{hash_sha256}.file')
@@ -151,7 +152,7 @@ async def upload_files(
Legacy Upload Route (V2). Preserved for frontend compatibility.
"""
log.setLevel(logging.INFO)
acc_id_rand = account_id
if acc_id_int := redis_lookup_id_random(record_id_random=acc_id_rand, table_name='account'): pass
else: return mk_resp(data=None, status_code=400, response=response)
@@ -192,7 +193,7 @@ async def upload_files(
# Final metadata and linking
hosted_file_dict.update({'saved': True, 'already_exists': file_info['already_exists'], 'copy_timer': file_info['copy_timer']})
if link_to_type not in ['event', 'event_location', 'event_session', 'event_presentation', 'event_presenter', 'event_badge', 'event_exhibit', 'event_person']:
create_hosted_file_link(account_id=acc_id_int, hosted_file_id=hfid_int, link_to_type=link_to_type, link_to_id=lid_int)
@@ -301,21 +302,23 @@ async def clip_video(
"""
Modularized video clipping route.
"""
if lid_int := redis_lookup_id_random(record_id_random=link_to_id, table_name=link_to_type):
result = await clip_video_method(
hosted_file_id = hosted_file_id,
start_time = start_time,
end_time = end_time,
account_id = commons.x_account_id,
account_id_random = commons.x_account_id_random,
link_to_type = link_to_type,
link_to_id = lid_int,
filename_no_ext = filename_no_ext,
reencode = reencode,
scale_down = scale_down
)
if result: return mk_resp(data=result, response=commons.response)
return mk_resp(data=None, status_code=400, response=commons.response)
# Deprecated legacy route — redirect clients to the V3 action endpoint
# New V3 action: /v3/action/hosted_file/{hosted_file_id}/clip_video
params = [
f"link_to_type={quote(link_to_type)}",
f"link_to_id={quote(link_to_id)}",
f"start_time={quote(start_time)}",
f"end_time={quote(end_time)}",
]
if filename_no_ext:
params.append(f"filename_no_ext={quote(filename_no_ext)}")
if reencode:
params.append(f"reencode=true")
if scale_down:
params.append(f"scale_down=true")
target = f"/v3/action/hosted_file/{hosted_file_id}/clip_video?" + "&".join(params)
return RedirectResponse(url=target, status_code=307)
# ### END ### API Hosted File ### clip_video() ###
@@ -332,7 +335,7 @@ async def create_video(
# Specialized utility route kept inline for now.
from wand.image import Image
from wand.drawing import Drawing
with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as audio_file:
audio_file.write(await file.read())
audio_file_path = audio_file.name
@@ -359,5 +362,5 @@ async def create_video(
video_name = f"{tempfile.gettempdir()}/{title_part_1}.mp4"
cmd = f"ffmpeg -y -loop 1 -i {title_image_path} -i {audio_file_path} -c:a copy -c:v libx264 -shortest {video_name}"
subprocess.run(shlex.split(cmd), check=True)
return FileResponse(video_name, media_type='video/mp4', filename=f'{title_part_1}.mp4')

View File

@@ -60,7 +60,7 @@ The primary way to retrieve data.
### C. POST Create / PATCH Update
Modify data in the system.
* **Endpoints:**
* **Endpoints:**
* `POST /v3/crud/{obj_type}/`
* `PATCH /v3/crud/{obj_type}/{id}`
* **Strict Mode (Default):** The API validates your payload against the Pydantic model. If you send fields that do not exist in the model, the database might return a 400 "Unknown column" error.
@@ -123,6 +123,38 @@ Every Event File (`event_file`) **must** have a linked Hosted File (`hosted_file
---
## 6. Hosted File Actions: Convert & Clip (Frontend Notes)
These helper endpoints let the frontend request small server-side transformations without uploading new blobs. They return a newly-created `hosted_file` metadata object on success.
- **Convert (PDF → Image)**
- Method: `GET`
- Path: `/v3/action/hosted_file/{hosted_file_id}/convert_file`
- Required query params: `link_to_type`, `link_to_id`
- Optional query params: `filename_no_ext` (defaults to `automated_hosted_file_conversion`), `to_type` (defaults to `webp`)
- Auth: standard V3 headers (`x-aether-api-key`, `x-account-id` / `x-no-account-id` / `?jwt=`)
- Behavior: converts the first page of a PDF to `webp` or `png`, saves a new `hosted_file`, and returns its metadata. Returns 400 on failure.
- **Clip Video**
- Method: `GET`
- Path: `/v3/action/hosted_file/{hosted_file_id}/clip_video`
- Required query params: `link_to_type`, `link_to_id`, `start_time`, `end_time` (format `HH:MM:SS`)
- Optional query params: `filename_no_ext` (defaults to `automated_hosted_file_clip_video`), `reencode` (bool), `scale_down` (bool)
- Auth: standard V3 headers
- Behavior: extracts a clip using `ffmpeg` and saves it as a new `hosted_file`. Defaults to stream-copying to be fast; set `reencode=true` to force H.264 or `scale_down=true` to resize. Returns 400 on failure.
- Behavior: extracts a clip using `ffmpeg` and saves it as a new `hosted_file`.
- Defaults to stream-copying to be fast; set `reencode=true` to force H.264 or `scale_down=true` to resize.
- For longer-running clips you can schedule the job in the background by adding `?background=true`. When scheduled the API returns `202 Accepted` and the clip runs asynchronously on the server; check the returned `hosted_file` record later via the standard V3 `hosted_file` endpoints.
- Returns 400 on synchronous failure; returns 202 when scheduled successfully.
Frontend guidance:
- Call these routes with the same `link_to_type` / `link_to_id` you plan to associate the resulting hosted_file with — the server resolves random IDs for you.
- After a successful response, use the V3 `hosted_file` action endpoints (download/delete) to manage or retrieve the new file.
- These endpoints run synchronously and can take time for large inputs; for heavy or batch workloads use a queued job pattern instead.
- These endpoints may take time for large inputs. Prefer using `?background=true` to schedule work and receive a `202 Accepted` response for async processing. For heavy or batch workloads use a queued job pattern instead.
## 5. Troubleshooting 403 Forbidden
If you receive a 403 on a valid ID:

View File

@@ -7,8 +7,8 @@ API_KEY = "PMM4n50teUCaOMMTN8qOJA"
ACCOUNT_ID = "Q8lR8Ai8hx2FjbQ3C_EH1Q" # OSIT
# IDs for testing ID Vision resolution
HOSTED_FILE_ID = "PUwhbT2tMgU"
EVENT_FILE_ID = "cFyCd7FPZe9"
HOSTED_FILE_ID = "PUwhbT2tMgU"
EVENT_FILE_ID = "2e_MFBgY5eg"
ARCHIVE_CONTENT_ID = "UjKzrk-GKu5"
# Hash for testing content-addressable storage
@@ -18,7 +18,7 @@ def test_download_by_id(label, test_id):
print(f"--- Testing Download via {label}: {test_id} ---")
url = f"{BASE_URL}/{test_id}/download"
headers = {"X-Aether-API-Key": API_KEY, "x-account-id": ACCOUNT_ID}
try:
response = requests.get(url, headers=headers, stream=True)
print(f"Status: {response.status_code}")
@@ -35,7 +35,7 @@ def test_download_by_id(label, test_id):
def test_download_by_hash_query():
print(f"\n--- Testing Hash Download via Query Param API Key ---")
url = f"{BASE_URL}/hash/{FILE_HASH}/download?api_key={API_KEY}"
try:
response = requests.get(url)
print(f"Status: {response.status_code}")
@@ -57,7 +57,7 @@ def test_download_streaming():
"x-account-id": ACCOUNT_ID,
"Range": "bytes=0-10"
}
try:
response = requests.get(url, headers=headers)
print(f"Status: {response.status_code} (Expected 206)")
@@ -79,7 +79,7 @@ if __name__ == "__main__":
test_download_by_hash_query(),
test_download_streaming()
]
if all(results):
print("\n🎉 ALL DOWNLOAD PATTERNS VERIFIED!")
else:

View File

@@ -0,0 +1,97 @@
import sys
import os
import asyncio
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
# Add project root to path
sys.path.append(os.getcwd())
# Mock configuration and low-level deps BEFORE importing the target module
mock_config = MagicMock()
mock_settings = MagicMock()
mock_settings.FILES_PATH = {
'hosted_files_root': '/tmp',
'hosted_tmp_root': '/tmp'
}
mock_config.settings = mock_settings
sys.modules['app.config'] = mock_config
sys.modules['app.lib_general'] = MagicMock()
sys.modules['app.db_sql'] = MagicMock()
from app.methods import lib_media
def test_clip_video_method():
print('--- test_clip_video_method ---')
hosted_obj = SimpleNamespace(hash_sha256='aa11bb22cc33ddeeff00112233445566778899aa', extension='mp4')
# Prepare mocked behaviors
async def _async_save(*args, **kwargs):
return {'saved': True, 'hash_sha256': 'newhash'}
with patch.object(lib_media, 'load_hosted_file_obj', side_effect=[hosted_obj, {'id': 'EXIST_ID', 'hosted_file_id': 'EXIST_ID', 'filename': 'clip.mp4'}]) as mock_load, \
patch.object(lib_media, 'save_file_to_hosted_file', new=_async_save) as mock_save, \
patch('subprocess.run', return_value=None) as mock_run, \
patch('os.path.exists', return_value=True) as mock_exists, \
patch.object(lib_media, 'sql_select', return_value={'id': 'EXIST_ID'}) as mock_sql:
res = asyncio.run(lib_media.clip_video_method(
hosted_file_id='SOME_ID',
start_time='00:00:01',
end_time='00:00:05',
account_id=1,
link_to_type='archive_content',
link_to_id=1,
))
print('Result:', res)
assert isinstance(res, dict)
assert res.get('id') == 'EXIST_ID'
def test_convert_file_method():
print('--- test_convert_file_method ---')
hosted_obj = SimpleNamespace(hash_sha256='ff22ee33dd44cc55bb66aa77cc88dd99ee001122', extension='pdf', content_type='application/pdf')
class DummyImage:
def save(self, path, **kwargs):
with open(path, 'wb') as f:
f.write(b'PNGDATA')
async def _async_save2(*args, **kwargs):
return {'saved': True, 'hash_sha256': 'newhash2'}
# Provide a dummy pdf2image module to satisfy the runtime import inside the function
sys.modules['pdf2image'] = MagicMock()
sys.modules['pdf2image'].convert_from_path = lambda *a, **k: [DummyImage()]
with patch.object(lib_media, 'load_hosted_file_obj', side_effect=[hosted_obj, {'id': 'EXIST_ID2', 'hosted_file_id': 'EXIST_ID2', 'filename': 'conv.webp'}]) as mock_load, \
patch.object(lib_media, 'save_file_to_hosted_file', new=_async_save2) as mock_save, \
patch('os.path.exists', return_value=True) as mock_exists, \
patch.object(lib_media, 'sql_select', return_value={'id': 'EXIST_ID2'}) as mock_sql, \
patch.object(lib_media, 'create_hosted_file_obj', return_value='NEW_ID') as mock_create:
res = asyncio.run(lib_media.convert_file_method(
hosted_file_id='SOME_ID',
link_to_type='archive_content',
link_to_id=1,
account_id=1,
to_type='webp'
))
print('Result:', res)
assert isinstance(res, dict)
assert res.get('id') == 'EXIST_ID2'
if __name__ == '__main__':
try:
test_clip_video_method()
test_convert_file_method()
print('\n🎉 MEDIA METHOD TESTS PASSED')
except AssertionError as e:
print('\n❌ TEST FAILED:', e)
raise