V3 Migration Phase 2-4: Implementation of specialized Binary Actions (Upload, Stream, Delete) and Orphan management logic. Full E2E coverage.

This commit is contained in:
Scott Idem
2026-01-22 18:51:26 -05:00
parent 1837b442cf
commit 60345dd21e
6 changed files with 641 additions and 1 deletions

View File

@@ -0,0 +1,363 @@
from fastapi import APIRouter, Depends, File, Form, Header, HTTPException, Path, Query, Response, status, UploadFile
from fastapi.responses import FileResponse, StreamingResponse
import aiofiles
import mimetypes
import os
import pathlib
from typing import Dict, List, Optional, Set, Union
import asyncio
import logging
log = logging.getLogger(__name__)
from app.config import settings
from app.db_sql import redis_lookup_id_random, sql_select, sql_update, sql_delete, get_id_random
from app.methods.hosted_file_methods import (
create_hosted_file_obj, load_hosted_file_obj, save_file,
create_hosted_file_link, delete_hosted_file_link, get_hosted_file_link_rec_list
)
from app.lib_general_v3 import (
AccountContext, get_account_context, get_account_context_optional,
SerializationParams, DelayParams
)
from app.models.hosted_file_models import Hosted_File_Base
from app.models.response_models import Resp_Body_Base, mk_resp
"""
Aether API V3 - Hosted File Action Router
------------------------------------------
Handles specialized binary operations like uploads, downloads, and complex deletions.
These routes complement the standard CRUD metadata routes.
"""
router = APIRouter()
# --- Helpers ---
def validate_file_extension(filename: str, allowed_extensions: List[str]):
    """
    Backup check that a file name's extension is on an allow-list.

    Args:
        filename: Client-supplied file name. An empty or missing name is
            treated as having no extension.
        allowed_extensions: Permitted extensions, with or without a leading
            dot, compared case-insensitively. An empty/None list disables
            the check.

    Returns:
        True when the check is disabled or the extension is allowed.

    Raises:
        HTTPException: 400 when the name has no extension or its extension
            is not in the allow-list.
    """
    if not allowed_extensions:
        return True
    allowed = {e.lower().strip('.') for e in allowed_extensions}
    # rpartition reports whether a dot exists at all: a bare name such as
    # "txt" must not be accepted as if it carried the extension "txt".
    _, dot, ext = (filename or '').rpartition('.')
    ext = ext.lower()
    if not dot or ext not in allowed:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"File extension '.{ext}' is not allowed. Allowed: {', '.join(allowed_extensions)}"
        )
    return True
async def file_streamer(path: str, start: int, end: int, chunk_size: int = 8192):
    """
    Asynchronously yield the bytes of a file in the half-open range [start, end).

    Args:
        path: Path of the file to read (opened in binary mode).
        start: Offset of the first byte to yield.
        end: Offset one past the last byte to yield.
        chunk_size: Maximum number of bytes per yielded chunk (default 8 KB).

    Yields:
        Consecutive ``bytes`` chunks. Iteration stops early if the file ends
        before ``end`` is reached.
    """
    remaining = end - start
    async with aiofiles.open(path, mode='rb') as f:
        await f.seek(start)
        # Track the outstanding byte count locally instead of awaiting
        # f.tell() before every read; the position is fully determined by
        # the seek above and the sizes of the chunks already read.
        while remaining > 0:
            data = await f.read(min(chunk_size, remaining))
            if not data:
                break
            remaining -= len(data)
            yield data
# --- Routes ---
@router.post('/upload', response_model=Resp_Body_Base)
async def upload_files_action(
    file_list: List[UploadFile] = File(...),
    account_id: str = Form(..., min_length=11, max_length=22),
    link_to_type: str = Form(...),
    link_to_id: str = Form(..., min_length=11, max_length=22),
    allowed_extensions: Optional[List[str]] = Query(None),
    account: AccountContext = Depends(get_account_context),
    delay: DelayParams = Depends(),
):
    """
    V3 Enhanced Upload Action.
    - Handles multiple files.
    - Resolves IDs to integers.
    - Deduplicates via Hash lookup.
    - Returns clean Vision IDs.

    Every file name is checked against `allowed_extensions` before anything
    is saved, so a rejected request does not leave a partially stored batch
    behind. Files whose physical save or database insert fails are logged
    and left out of the returned list.

    Raises:
        HTTPException: 400 when `account_id` or `link_to_id` cannot be
            resolved, or when a file extension is not allowed.
    """
    if delay.sleep_time_s > 0:
        await asyncio.sleep(delay.sleep_time_s)

    # 1. Resolve Parent IDs
    # NOTE(review): the form-supplied account_id is not compared with the
    # authenticated `account` in this function — confirm that
    # get_account_context already restricts which accounts a caller may use.
    account_id_random = account_id
    account_id_int = redis_lookup_id_random(record_id_random=account_id, table_name='account')
    if not account_id_int:
        raise HTTPException(status_code=400, detail="Invalid account_id.")

    # NOTE(review): link_to_type comes straight from the form and is used as
    # a table name for the lookup — verify that the helper restricts it to
    # known tables.
    link_to_id_random = link_to_id
    link_to_id_int = redis_lookup_id_random(record_id_random=link_to_id, table_name=link_to_type)
    if not link_to_id_int:
        raise HTTPException(status_code=400, detail=f"Invalid link_to_id for type {link_to_type}.")

    # 2. Extension Validation
    # Done for the whole batch up front: validating inside the save loop
    # would return 400 after earlier files were already saved and linked.
    for file_obj in file_list:
        validate_file_extension(file_obj.filename or '', allowed_extensions)

    hosted_file_list = []
    for file_obj in file_list:
        # 3. Physical Save
        file_info = await save_file(
            file = file_obj,
            account_id = account_id_int,
            account_id_random = account_id_random,
            link_to_type = link_to_type,
            link_to_id = link_to_id_int,
            link_to_id_random = link_to_id_random,
            check_allowed_extension = False, # Handled by validate_file_extension above
        )
        if not file_info.get('saved'):
            log.error(f"Failed to save file: {file_obj.filename}")
            continue

        # 4. Database Synchronization (Deduplication)
        log.info(f"Syncing DB record for hash: {file_info['hash_sha256']}")
        existing_rec = sql_select(
            table_name = 'hosted_file',
            field_name = 'hash_sha256',
            field_value = file_info['hash_sha256'],
        )
        if existing_rec:
            # Use existing record
            hosted_file_id_int = existing_rec.get('id')
        else:
            # Create new record
            file_info['account_id'] = account_id_int
            file_info['account_id_random'] = account_id_random
            new_hosted_file_obj = Hosted_File_Base(**file_info)
            hosted_file_id_int = create_hosted_file_obj(hosted_file_obj_new=new_hosted_file_obj)
            if not hosted_file_id_int:
                log.error("Database insertion failed for hosted_file.")
                continue

        if not hosted_file_id_int:
            # Without an id there is nothing to update, link or report.
            log.error(f"No usable hosted_file id for hash: {file_info['hash_sha256']}")
            continue

        # Migration check: Update subdirectory if missing
        if existing_rec and not existing_rec.get('subdirectory_path') and file_info.get('subdirectory_path'):
            sql_update(
                table_name = 'hosted_file',
                data = {'id': hosted_file_id_int, 'subdirectory_path': file_info['subdirectory_path']}
            )

        # The loader can come back empty; fall back to a bare dict so the
        # response entry can still be built from the known id.
        hosted_file_dict = load_hosted_file_obj(hosted_file_id=hosted_file_id_int, model_as_dict=True) or {}

        # 5. Relational Linking
        create_hosted_file_link(
            account_id = account_id_int,
            hosted_file_id = hosted_file_id_int,
            link_to_type = link_to_type,
            link_to_id = link_to_id_int
        )

        # 6. Response Preparation (Vision IDs)
        # Add metadata flags
        hosted_file_dict['already_exists'] = file_info.get('already_exists')
        hosted_file_dict['saved'] = file_info.get('saved')
        hosted_file_dict['copy_timer'] = file_info.get('copy_timer')
        # Ensure ID is a random string for the frontend
        if not isinstance(hosted_file_dict.get('id'), str):
            rid = get_id_random(hosted_file_id_int, table_name='hosted_file')
            hosted_file_dict['id'] = rid
            hosted_file_dict['hosted_file_id'] = rid
        hosted_file_list.append(hosted_file_dict)

    return mk_resp(data=hosted_file_list, status_message=f"Successfully processed {len(hosted_file_list)} files.")
def _parse_byte_range(range_header: str, file_size: int):
    """
    Turn a single-range ``Range`` header value into inclusive byte offsets.

    Supports ``bytes=START-END``, the open-ended ``bytes=START-`` and the
    suffix form ``bytes=-N`` (the last N bytes).

    Returns:
        A ``(start, end)`` tuple with ``end`` clamped to the last byte.

    Raises:
        ValueError: when the value is malformed (including multi-range
            requests) or cannot be satisfied for a file of ``file_size`` bytes.
    """
    spec = range_header.strip()
    if spec.lower().startswith('bytes='):
        spec = spec[len('bytes='):]
    first, _, last = spec.partition('-')
    first, last = first.strip(), last.strip()
    if not first:
        # Suffix form: "-N" selects the final N bytes of the file.
        if not last.isdecimal() or int(last) == 0:
            raise ValueError("malformed or empty suffix range")
        start = max(file_size - int(last), 0)
        end = file_size - 1
    else:
        if not first.isdecimal() or (last and not last.isdecimal()):
            raise ValueError("malformed byte range")
        start = int(first)
        end = int(last) if last else file_size - 1
    if start >= file_size or end < start:
        raise ValueError("byte range not satisfiable")
    return start, min(end, file_size - 1)


def _content_disposition_attachment(filename: str) -> str:
    """
    Build an ``attachment`` Content-Disposition value for ``filename``.

    Names that need percent-encoding (quotes, spaces, non-ASCII, control
    characters) are sent in the extended ``filename*=utf-8''`` form; plain
    names keep the quoted ``filename="..."`` form.
    """
    from urllib.parse import quote

    encoded = quote(filename)
    if encoded != filename:
        return f"attachment; filename*=utf-8''{encoded}"
    return f'attachment; filename="{filename}"'


@router.get('/{hosted_file_id}/download')
async def download_file_action(
    response: Response,
    hosted_file_id: str = Path(min_length=11, max_length=22),
    filename: Optional[str] = Query(None, min_length=4, max_length=255),
    site_key: Optional[str] = Query(None), # Bypass API Key/JWT if valid site key provided
    range: Optional[str] = Header(None),
    account: AccountContext = Depends(get_account_context_optional),
    delay: DelayParams = Depends(),
):
    """
    Enhanced download/streaming logic.
    Supports byte-range seeking, delay simulation, and site_key bypass.

    Without a ``Range`` header the whole file is returned. With one, the
    requested slice is streamed as ``206 Partial Content``.

    Raises:
        HTTPException: 403 when the caller is a guest without a valid
            site_key; 404 when the record, its data or the file on disk is
            missing; 416 when the Range header is malformed or cannot be
            satisfied.
    """
    if delay.sleep_time_s > 0:
        await asyncio.sleep(delay.sleep_time_s)

    # 1. Auth Bypass Logic (site_key)
    is_authorized = account.auth_method != 'guest'
    if not is_authorized and site_key:
        # Verify site key existence and status
        sql = "SELECT id FROM site WHERE auth_key = :key AND enable = true LIMIT 1"
        if sql_select(sql=sql, data={'key': site_key}):
            is_authorized = True
            log.info("Auth Bypass: Download authorized via site_key.")
    if not is_authorized:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Authentication required or invalid site_key.")

    # 2. Resolve File Record
    resolved_id = redis_lookup_id_random(record_id_random=hosted_file_id, table_name='hosted_file')
    if not resolved_id:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Hosted file record not found.")
    hosted_file_obj = load_hosted_file_obj(hosted_file_id=resolved_id)
    if not hosted_file_obj:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Hosted file data could not be loaded.")

    # 3. Path Resolution
    # Files are stored on disk under their SHA-256 hash, optionally inside
    # a per-record subdirectory.
    hosted_files_path = settings.FILES_PATH['hosted_files_root']
    hash_filename = f"{hosted_file_obj.hash_sha256}.file"
    full_file_path = os.path.join(hosted_files_path, hosted_file_obj.subdirectory_path or '', hash_filename)
    if not os.path.exists(full_file_path):
        log.error(f"File not found on disk: {full_file_path}")
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Physical file not found on server.")

    # 4. Streaming / Download logic
    target_filename = filename or hosted_file_obj.filename
    media_type = mimetypes.guess_type(target_filename)[0] or 'application/octet-stream'
    if not range:
        return FileResponse(full_file_path, filename=target_filename, media_type=media_type)

    file_size = os.stat(full_file_path).st_size
    try:
        start, end = _parse_byte_range(range, file_size)
    except ValueError:
        # A 416 tells the client the current size so it can retry sensibly.
        raise HTTPException(
            status_code=status.HTTP_416_REQUESTED_RANGE_NOT_SATISFIABLE,
            headers={'Content-Range': f'bytes */{file_size}'},
        ) from None

    return StreamingResponse(
        # file_streamer takes an exclusive end offset; `end` is inclusive.
        file_streamer(full_file_path, start, end + 1),
        media_type = media_type,
        status_code = status.HTTP_206_PARTIAL_CONTENT,
        headers = {
            'Accept-Ranges': 'bytes',
            'Content-Range': f'bytes {start}-{end}/{file_size}',
            'Content-Length': str(end - start + 1),
            'Content-Disposition': _content_disposition_attachment(target_filename),
        }
    )
@router.delete('/{hosted_file_id}', response_model=Resp_Body_Base)
async def delete_file_action(
    hosted_file_id: str = Path(min_length=11, max_length=22),
    link_to_type: Optional[str] = Query(None),
    link_to_id: Optional[str] = Query(None),
    method: str = Query('hide', regex='^(hide|disable|delete)$'),
    rm_orphan: bool = Query(False),
    fake_delete: bool = Query(False), # Testing mode
    account: AccountContext = Depends(get_account_context),
    delay: DelayParams = Depends(),
):
    """
    Intelligent relational deletion.
    - Removes specified link.
    - Counts remaining links.
    - Optionally cleans up orphans.
    - Supports fake_delete for testing.

    A link is only targeted when both `link_to_type` and `link_to_id` are
    given. Orphan cleanup runs only when `rm_orphan` is set and no links
    remain; `method` then selects between removing the record and file
    ('delete'), setting `hide`, or clearing `enable` ('disable').

    Raises:
        HTTPException: 404 when the file record, its metadata or the
            referenced linked object cannot be found.
    """
    if delay.sleep_time_s > 0:
        await asyncio.sleep(delay.sleep_time_s)

    # 1. Resolve IDs
    file_id_int = redis_lookup_id_random(record_id_random=hosted_file_id, table_name='hosted_file')
    if not file_id_int:
        raise HTTPException(status_code=404, detail="Hosted file record not found.")
    # NOTE(review): when only one of link_to_type / link_to_id is supplied
    # the link is silently ignored — confirm whether that should be a 400.
    link_id_int = None
    if link_to_type and link_to_id:
        link_id_int = redis_lookup_id_random(record_id_random=link_to_id, table_name=link_to_type)
        if not link_id_int:
            raise HTTPException(status_code=404, detail=f"Linked object {link_to_id} not found.")

    # 2. Verify State (Existence Checks)
    hosted_file_obj = load_hosted_file_obj(hosted_file_id=file_id_int)
    if not hosted_file_obj:
        raise HTTPException(status_code=404, detail="File metadata not found.")
    # Path check
    hosted_files_path = settings.FILES_PATH['hosted_files_root']
    file_path = os.path.join(hosted_files_path, hosted_file_obj.subdirectory_path or '', f"{hosted_file_obj.hash_sha256}.file")
    file_exists_on_disk = os.path.exists(file_path)
    # Link check
    # `or []` is defensive: presumably the helper returns a list, but an
    # empty/None result must not break the checks below.
    link_records = get_hosted_file_link_rec_list(hosted_file_id=file_id_int) or []
    # Only report a match when a link was actually specified; otherwise a
    # record with empty link fields would compare equal to None/None.
    link_found = link_id_int is not None and any(
        rec.get('link_to_type') == link_to_type and rec.get('link_to_id') == link_id_int
        for rec in link_records
    )

    if fake_delete:
        log.info("Fake Delete active. Verifying existence...")
        return mk_resp(data={
            "hosted_file_exists": True,
            "file_on_disk": file_exists_on_disk,
            "link_exists": link_found,
            "fake_delete": True
        }, status_message="Fake delete successful. No data was modified.")

    # 3. Execution Phase
    # A. Remove the Link
    if link_id_int:
        delete_hosted_file_link(
            account_id = account.account_id,
            hosted_file_id = file_id_int,
            link_to_type = link_to_type,
            link_to_id = link_id_int
        )
        log.info(f"Deleted link between file {file_id_int} and {link_to_type}:{link_id_int}")

    # B. Orphan Check & Physical Cleanup
    remaining_links = get_hosted_file_link_rec_list(hosted_file_id=file_id_int) or []
    is_orphan = not remaining_links
    physical_removed = False
    record_removed = False
    if rm_orphan and is_orphan:
        log.info(f"File {file_id_int} is an orphan. Cleaning up...")
        # Method Handling
        if method == 'delete':
            # Hard delete: Record + Disk
            # Attempt the unlink directly instead of trusting the earlier
            # existence check, which may be stale by now.
            try:
                pathlib.Path(file_path).unlink()
                physical_removed = True
            except FileNotFoundError:
                log.warning(f"File already absent from disk: {file_path}")
            sql_delete(table_name='hosted_file', record_id=file_id_int)
            record_removed = True
        elif method == 'hide':
            sql_update(table_name='hosted_file', data={'id': file_id_int, 'hide': True})
        elif method == 'disable':
            sql_update(table_name='hosted_file', data={'id': file_id_int, 'enable': False})

    return mk_resp(data={
        "link_removed": link_found,
        "is_orphan": is_orphan,
        "physical_removed": physical_removed,
        "record_removed": record_removed,
        "method": method
    }, status_message="Deletion process complete.")

View File

@@ -6,7 +6,7 @@ from app.routers import (
event_device, event_exhibit, event_exhibit_tracking, event_file, event_importing,
event_location, event_person, event_person_detail, event_person_tracking,
event_presentation, event_presenter, event_registration, event_session,
flask_cfg, fundraising, grant, hosted_file, log_client_viewing, lookup,
flask_cfg, fundraising, grant, hosted_file, api_v3_actions_hosted_file, log_client_viewing, lookup,
membership_cfg, membership_group, membership_person_group, membership_person,
membership_person_profile, membership_type, membership_person_type,
order, order_v3, order_line, order_cart, organization, page, person,
@@ -48,6 +48,7 @@ def setup_routers(app: FastAPI):
app.include_router(event_session.router, tags=['Event Session'])
app.include_router(hosted_file.router, prefix='/hosted_file', tags=['Hosted File'])
app.include_router(api_v3_actions_hosted_file.router, prefix='/v3/action/hosted_file', tags=['Hosted File (V3 Actions)'])
app.include_router(lookup.router, prefix='/lu', tags=['Lookup'])
app.include_router(organization.router, prefix='/organization', tags=['Organization'])

View File

@@ -0,0 +1,70 @@
import requests
import json
import io
# Configuration
# The endpoint and credentials can be overridden through the environment so
# the script can target another deployment without editing the file.
# NOTE(review): the fallback API key below is committed in source — consider
# rotating it and dropping the default.
import os

BASE_URL = os.environ.get("AETHER_API_BASE_URL", "https://dev-api.oneskyit.com/v3/action/hosted_file")
API_KEY = os.environ.get("AETHER_API_KEY", "IDF68Em5X4HTZlswRNgepQ")
ACCOUNT_ID = os.environ.get("AETHER_ACCOUNT_ID", "Q8lR8Ai8hx2FjbQ3C_EH1Q")
# Object the uploaded test file is linked to.
LINK_TO_TYPE = "archive_content"
LINK_TO_ID = "bZOa7CtUm0E"
def test_v3_delete_flow():
    """
    Exercise the V3 delete action end to end against the configured API.

    Uploads a throwaway file, then checks in order: a fake delete reports
    itself as such, a link-only delete removes the link and leaves an
    orphan, and an orphan cleanup with method=delete removes both the
    record and the physical file.

    Raises:
        AssertionError: when a step does not return the expected result.
    """
    print("--- Starting V3 Action Delete Tests ---")
    timeout_s = 30  # never let a stalled server hang the script
    headers = {
        "X-Aether-API-Key": API_KEY,
        "x-account-id": ACCOUNT_ID
    }

    # 1. Setup: Upload a fresh file to test with
    print("\n[Step 1] Uploading test file...")
    files = [("file_list", ("v3_delete_test.txt", io.BytesIO(b"Delete me"), "text/plain"))]
    data = {
        "account_id": ACCOUNT_ID,
        "link_to_type": LINK_TO_TYPE,
        "link_to_id": LINK_TO_ID
    }
    up_resp = requests.post(f"{BASE_URL}/upload", headers=headers, files=files, data=data, timeout=timeout_s)
    # Fail with the server's own message rather than a KeyError/IndexError
    # when the upload did not produce a record.
    assert up_resp.status_code == 200, f"Upload failed ({up_resp.status_code}): {up_resp.text}"
    uploaded = up_resp.json().get('data') or []
    assert uploaded, "Upload returned no file records."
    file_id = uploaded[0]['id']
    print(f"Created file: {file_id}")

    # 2. Test Fake Delete
    print("\n[Step 2] Testing Fake Delete (Testing Mode)...")
    params_fake = {
        "link_to_type": LINK_TO_TYPE,
        "link_to_id": LINK_TO_ID,
        "fake_delete": "true"
    }
    resp_fake = requests.delete(f"{BASE_URL}/{file_id}", headers=headers, params=params_fake, timeout=timeout_s)
    print(f"Status: {resp_fake.status_code}")
    fake_data = resp_fake.json()['data']
    print(f"Response: {json.dumps(fake_data, indent=2)}")
    assert fake_data['fake_delete'] is True

    # 3. Test Real Delete (Link Only)
    print("\n[Step 3] Testing Real Delete (Link Only, rm_orphan=False)...")
    params_link = {
        "link_to_type": LINK_TO_TYPE,
        "link_to_id": LINK_TO_ID,
        "rm_orphan": "false"
    }
    resp_link = requests.delete(f"{BASE_URL}/{file_id}", headers=headers, params=params_link, timeout=timeout_s)
    print(f"Status: {resp_link.status_code}")
    link_data = resp_link.json()['data']
    print(f"Response: {json.dumps(link_data, indent=2)}")
    assert link_data['link_removed'] is True
    assert link_data['is_orphan'] is True # Should be orphan now, but not removed

    # 4. Test Orphan Cleanup (rm_orphan=True)
    print("\n[Step 4] Testing Orphan Cleanup (rm_orphan=True, method=delete)...")
    params_orphan = {
        "rm_orphan": "true",
        "method": "delete"
    }
    resp_orphan = requests.delete(f"{BASE_URL}/{file_id}", headers=headers, params=params_orphan, timeout=timeout_s)
    print(f"Status: {resp_orphan.status_code}")
    orphan_data = resp_orphan.json()['data']
    print(f"Response: {json.dumps(orphan_data, indent=2)}")
    assert orphan_data['physical_removed'] is True
    assert orphan_data['record_removed'] is True
# Run the delete-flow checks only when this file is executed as a script.
if __name__ == "__main__":
    test_v3_delete_flow()

View File

@@ -0,0 +1,71 @@
import requests
import json
# Configuration
# The endpoint and credentials can be overridden through the environment so
# the script can target another deployment without editing the file.
# NOTE(review): the fallback API key below is committed in source — consider
# rotating it and dropping the default.
import os

BASE_URL = os.environ.get("AETHER_API_BASE_URL", "https://dev-api.oneskyit.com/v3/action/hosted_file")
API_KEY = os.environ.get("AETHER_API_KEY", "IDF68Em5X4HTZlswRNgepQ")
ACCOUNT_ID = os.environ.get("AETHER_ACCOUNT_ID", "Q8lR8Ai8hx2FjbQ3C_EH1Q")
# This file was created during our earlier upload tests
VALID_FILE_ID = "2R06T6yuQLw"
# Use a known site key from the DB for the bypass test
# SITE_KEY = "..."
def test_download_standard():
    """
    Check that a plain (non-range) download of VALID_FILE_ID answers 200.

    Only the status line and headers are inspected; the body is not read on
    success.

    Returns:
        True when the endpoint answered 200, False on any other status or
        on a transport-level failure.
    """
    print(f"--- Testing Standard Download via V3 Action: {VALID_FILE_ID} ---")
    url = f"{BASE_URL}/{VALID_FILE_ID}/download"
    headers = {
        "X-Aether-API-Key": API_KEY,
        "x-account-id": ACCOUNT_ID
    }
    try:
        # We don't want to download the whole binary in a test, so we'll check headers.
        # With stream=True the connection stays open until the response is
        # closed, hence the context manager.
        with requests.get(url, headers=headers, stream=True, timeout=30) as response:
            print(f"Status: {response.status_code}")
            print(f"Content-Type: {response.headers.get('Content-Type')}")
            print(f"Content-Length: {response.headers.get('Content-Length')}")
            if response.status_code == 200:
                print("✅ Success: Standard download works.")
                return True
            print(f"❌ Failed: {response.text}")
            return False
    except requests.RequestException as e:
        # Only transport errors count as a failed check; programming errors
        # should surface as a traceback.
        print(f"💥 Exception: {e}")
        return False
def test_download_streaming():
    """
    Check that a ``Range: bytes=0-10`` request for VALID_FILE_ID answers 206.

    Returns:
        True when the endpoint answered 206 Partial Content, False on any
        other status or on a transport-level failure.
    """
    print(f"\n--- Testing Byte-Range Streaming: {VALID_FILE_ID} ---")
    url = f"{BASE_URL}/{VALID_FILE_ID}/download"
    headers = {
        "X-Aether-API-Key": API_KEY,
        "x-account-id": ACCOUNT_ID,
        "Range": "bytes=0-10"
    }
    try:
        # The timeout keeps a stalled server from hanging the script.
        response = requests.get(url, headers=headers, timeout=30)
    except requests.RequestException as e:
        # Only transport errors count as a failed check; programming errors
        # should surface as a traceback.
        print(f"💥 Exception: {e}")
        return False
    print(f"Status: {response.status_code} (Expected 206)")
    print(f"Content-Range: {response.headers.get('Content-Range')}")
    if response.status_code == 206:
        print("✅ Success: Byte-range streaming works.")
        return True
    print(f"❌ Failed: {response.status_code}")
    return False
# Run both download checks when executed as a script and report the overall
# result through the process exit status as well as on stdout.
if __name__ == "__main__":
    s1 = test_download_standard()
    s2 = test_download_streaming()
    if s1 and s2:
        print("\n🎉 ALL DOWNLOAD ACTION TESTS PASSED!")
    else:
        print("\n❌ SOME TESTS FAILED.")
        # A non-zero exit lets shells and CI jobs detect the failure.
        raise SystemExit(1)

View File

@@ -0,0 +1,44 @@
import requests
import json
# Configuration
# The endpoint and credentials can be overridden through the environment so
# the script can target another deployment without editing the file.
# NOTE(review): the fallback API key below is committed in source — consider
# rotating it and dropping the default.
import os

BASE_URL = os.environ.get("AETHER_API_BASE_URL", "https://dev-api.oneskyit.com/v3/action/hosted_file")
API_KEY = os.environ.get("AETHER_API_KEY", "IDF68Em5X4HTZlswRNgepQ")
ACCOUNT_ID = os.environ.get("AETHER_ACCOUNT_ID", "Q8lR8Ai8hx2FjbQ3C_EH1Q")
def test_scaffold_reachability():
    """
    Smoke-test that the V3 upload and download action routes are reachable.

    Sends one upload and one download request (the latter with an
    ``X-Delay-ms`` header) and prints the status of each. Nothing is
    asserted; the output is meant to be read by a person.
    """
    print("--- Testing V3 Action Router Scaffold Reachability ---")
    timeout_s = 30  # never let a stalled server hang the script
    headers = {
        "X-Aether-API-Key": API_KEY,
        "x-account-id": ACCOUNT_ID
    }

    # 1. Test Upload Scaffold
    print("\n[1] Testing Upload Action Reachability...")
    files = [("file_list", ("test.txt", b"content", "text/plain"))]
    data = {
        "account_id": ACCOUNT_ID,
        "link_to_type": "archive_content",
        "link_to_id": "bZOa7CtUm0E8hx2FjbQ3C_"
    }
    resp = requests.post(f"{BASE_URL}/upload", headers=headers, files=files, data=data, timeout=timeout_s)
    print(f"Status: {resp.status_code}")
    if resp.status_code == 200:
        print(f"✅ Success: {resp.json().get('status_message')}")
    else:
        print(f"❌ Failed: {resp.text}")

    # 2. Test Download Scaffold with Delay
    print("\n[2] Testing Download Action Reachability (with 500ms delay)...")
    headers_w_delay = headers.copy()
    headers_w_delay["X-Delay-ms"] = "500"
    resp = requests.get(f"{BASE_URL}/some_file_id/download", headers=headers_w_delay, timeout=timeout_s)
    print(f"Status: {resp.status_code}")
    if resp.status_code == 200:
        # A successful download returns the file body, not a JSON envelope,
        # so report the content type instead of parsing it as JSON.
        print(f"✅ Success: received {resp.headers.get('Content-Type')}")
    else:
        print(f"❌ Failed: {resp.text}")
# Run the reachability smoke test only when this file is executed as a script.
if __name__ == "__main__":
    test_scaffold_reachability()

View File

@@ -0,0 +1,91 @@
import requests
import io
import json
# Configuration
# The endpoint and credentials can be overridden through the environment so
# the script can target another deployment without editing the file.
# NOTE(review): the fallback API key below is committed in source — consider
# rotating it and dropping the default.
import os

BASE_URL = os.environ.get("AETHER_API_BASE_URL", "https://dev-api.oneskyit.com/v3/action/hosted_file")
API_KEY = os.environ.get("AETHER_API_KEY", "IDF68Em5X4HTZlswRNgepQ")
ACCOUNT_ID = os.environ.get("AETHER_ACCOUNT_ID", "Q8lR8Ai8hx2FjbQ3C_EH1Q")
# Object the uploaded test files are linked to.
LINK_TO_TYPE = "archive_content"
LINK_TO_ID = "bZOa7CtUm0E"
def test_v3_upload_flow():
    """
    Exercise the V3 upload action end to end against the configured API.

    Runs three checks in order: a multi-file upload with an extension
    allow-list, a repeat upload that must be flagged as already existing,
    and an upload with a disallowed extension that must be rejected with 400.
    The function stops after the first check if that check does not succeed.

    Raises:
        AssertionError: when a response has an unexpected shape or the
            deduplication flag is missing. Transport errors are printed
            instead of raised.
    """
    print(f"--- Starting V3 Action Upload Tests against {BASE_URL} ---")
    timeout_s = 30  # never let a stalled server hang the script
    headers = {
        "X-Aether-API-Key": API_KEY,
        "x-account-id": ACCOUNT_ID
    }

    # 1. Multi-File Upload with Extension Check
    print("\n[Test 1] Multi-File Upload + Extension Validation...")
    files = [
        ("file_list", ("v3_multi_1.txt", io.BytesIO(b"V3 Content 1"), "text/plain")),
        ("file_list", ("v3_multi_2.txt", io.BytesIO(b"V3 Content 2"), "text/plain")),
    ]
    data = {
        "account_id": ACCOUNT_ID,
        "link_to_type": LINK_TO_TYPE,
        "link_to_id": LINK_TO_ID
    }
    params = {"allowed_extensions": ["txt", "pdf"]}
    url = f"{BASE_URL}/upload"
    # Only the network call sits inside try/except: wrapping the assertions
    # in a broad handler would swallow AssertionError and hide failures.
    try:
        response = requests.post(url, headers=headers, files=files, data=data, params=params, timeout=timeout_s)
    except requests.RequestException as e:
        print(f"💥 Exception: {e}")
        return
    print(f"Status: {response.status_code}")
    if response.status_code != 200:
        print(f"❌ Failed: {response.text}")
        return
    data_list = response.json().get('data', [])
    print(f"✅ Success! Processed {len(data_list)} files.")
    for i, f in enumerate(data_list, start=1):
        print(f" File {i} ID: {f.get('id')} | Name: {f.get('filename')}")
        assert isinstance(f.get('id'), str)

    # 2. Test Deduplication (Upload same file again)
    print("\n[Test 2] Testing Deduplication Logic...")
    files_dup = [
        ("file_list", ("v3_multi_1.txt", io.BytesIO(b"V3 Content 1"), "text/plain"))
    ]
    try:
        response = requests.post(url, headers=headers, files=files_dup, data=data, timeout=timeout_s)
    except requests.RequestException as e:
        print(f"💥 Exception: {e}")
    else:
        print(f"Status: {response.status_code}")
        if response.status_code == 200:
            records = response.json().get('data') or []
            assert records, "Deduplication upload returned no file records."
            file_data = records[0]
            print(f"Already Exists Flag: {file_data.get('already_exists')}")
            assert file_data.get('already_exists') is True
            print("✅ Deduplication logic verified.")
        else:
            print(f"❌ Failed: {response.text}")

    # 3. Test Extension Rejection
    print("\n[Test 3] Testing Extension Rejection...")
    files_bad = [
        ("file_list", ("virus.exe", io.BytesIO(b"bad"), "application/octet-stream"))
    ]
    params_restrict = {"allowed_extensions": ["txt"]}
    try:
        response = requests.post(url, headers=headers, files=files_bad, data=data, params=params_restrict, timeout=timeout_s)
    except requests.RequestException as e:
        print(f"💥 Exception: {e}")
    else:
        print(f"Status: {response.status_code} (Expected 400)")
        if response.status_code == 400:
            print("✅ Success: correctly rejected .exe file.")
        else:
            print(f"❌ Failure: allowed .exe file with status {response.status_code}")
# Run the upload-flow checks only when this file is executed as a script.
if __name__ == "__main__":
    test_v3_upload_flow()