From 60345dd21e6ef1d18efa1e6452fdde4c9b9931b1 Mon Sep 17 00:00:00 2001 From: Scott Idem Date: Thu, 22 Jan 2026 18:51:26 -0500 Subject: [PATCH] V3 Migration Phase 2-4: Implementation of specialized Binary Actions (Upload, Stream, Delete) and Orphan management logic. Full E2E coverage. --- app/routers/api_v3_actions_hosted_file.py | 363 ++++++++++++++++++++++ app/routers/registry.py | 3 +- tests/e2e/test_e2e_v3_action_delete.py | 70 +++++ tests/e2e/test_e2e_v3_action_download.py | 71 +++++ tests/e2e/test_e2e_v3_action_scaffold.py | 44 +++ tests/e2e/test_e2e_v3_action_upload.py | 91 ++++++ 6 files changed, 641 insertions(+), 1 deletion(-) create mode 100644 app/routers/api_v3_actions_hosted_file.py create mode 100644 tests/e2e/test_e2e_v3_action_delete.py create mode 100644 tests/e2e/test_e2e_v3_action_download.py create mode 100644 tests/e2e/test_e2e_v3_action_scaffold.py create mode 100644 tests/e2e/test_e2e_v3_action_upload.py diff --git a/app/routers/api_v3_actions_hosted_file.py b/app/routers/api_v3_actions_hosted_file.py new file mode 100644 index 0000000..5da2d3f --- /dev/null +++ b/app/routers/api_v3_actions_hosted_file.py @@ -0,0 +1,363 @@ +from fastapi import APIRouter, Depends, File, Form, Header, HTTPException, Path, Query, Response, status, UploadFile +from fastapi.responses import FileResponse, StreamingResponse +import aiofiles +import mimetypes +import os +import pathlib +from typing import Dict, List, Optional, Set, Union +import asyncio +import logging + +log = logging.getLogger(__name__) + +from app.config import settings +from app.db_sql import redis_lookup_id_random, sql_select, sql_update, sql_delete, get_id_random +from app.methods.hosted_file_methods import ( + create_hosted_file_obj, load_hosted_file_obj, save_file, + create_hosted_file_link, delete_hosted_file_link, get_hosted_file_link_rec_list +) +from app.lib_general_v3 import ( + AccountContext, get_account_context, get_account_context_optional, + SerializationParams, DelayParams +) +from app.models.hosted_file_models import Hosted_File_Base +from app.models.response_models import Resp_Body_Base, mk_resp + +""" +Aether API V3 - Hosted File Action Router +------------------------------------------ +Handles specialized binary operations like uploads, downloads, and complex deletions. +These routes complement the standard CRUD metadata routes. +""" + +router = APIRouter() + +# --- Helpers --- + +def validate_file_extension(filename: str, allowed_extensions: List[str]): + """ + Backup check for file extensions. + """ + if not allowed_extensions: + return True + + ext = filename.rsplit('.', 1)[-1].lower() + if ext not in [e.lower().strip('.') for e in allowed_extensions]: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"File extension '.{ext}' is not allowed. Allowed: {', '.join(allowed_extensions)}" + ) + return True + +async def file_streamer(path: str, start: int, end: int): + chunk_size = 8192 # 8KB + async with aiofiles.open(path, mode='rb') as f: + await f.seek(start) + while True: + chunk_start = await f.tell() + if chunk_start >= end: + break + bytes_to_read = min(chunk_size, end - chunk_start) + data = await f.read(bytes_to_read) + if not data: + break + yield data + +# --- Routes --- + +@router.post('/upload', response_model=Resp_Body_Base) +async def upload_files_action( + file_list: List[UploadFile] = File(...), + account_id: str = Form(..., min_length=11, max_length=22), + link_to_type: str = Form(...), + link_to_id: str = Form(..., min_length=11, max_length=22), + allowed_extensions: Optional[List[str]] = Query(None), + account: AccountContext = Depends(get_account_context), + delay: DelayParams = Depends(), + ): + """ + V3 Enhanced Upload Action. + - Handles multiple files. + - Resolves IDs to integers. + - Deduplicates via Hash lookup. + - Returns clean Vision IDs. + """ + if delay.sleep_time_s > 0: await asyncio.sleep(delay.sleep_time_s) + + # 1. Resolve Parent IDs + account_id_random = account_id + if res_account_id := redis_lookup_id_random(record_id_random=account_id, table_name='account'): + account_id_int = res_account_id + else: + raise HTTPException(status_code=400, detail="Invalid account_id.") + + link_to_id_random = link_to_id + if res_link_id := redis_lookup_id_random(record_id_random=link_to_id, table_name=link_to_type): + link_to_id_int = res_link_id + else: + raise HTTPException(status_code=400, detail=f"Invalid link_to_id for type {link_to_type}.") + + hosted_file_list = [] + + for file_obj in file_list: + # 2. Extension Validation + validate_file_extension(file_obj.filename, allowed_extensions) + + # 3. Physical Save + file_info = await save_file( + file = file_obj, + account_id = account_id_int, + account_id_random = account_id_random, + link_to_type = link_to_type, + link_to_id = link_to_id_int, + link_to_id_random = link_to_id_random, + check_allowed_extension = False, # Handled by validate_file_extension above + ) + + if not file_info.get('saved'): + log.error(f"Failed to save file: {file_obj.filename}") + continue + + hosted_file_id_int = None + hosted_file_dict = {} + + # 4. Database Synchronization (Deduplication) + log.info(f"Syncing DB record for hash: {file_info['hash_sha256']}") + if existing_rec := sql_select( + table_name = 'hosted_file', + field_name = 'hash_sha256', + field_value = file_info['hash_sha256'], + ): + # Use existing record + hosted_file_id_int = existing_rec.get('id') + + # Migration check: Update subdirectory if missing + if not existing_rec.get('subdirectory_path') and file_info.get('subdirectory_path'): + sql_update( + table_name = 'hosted_file', + data = {'id': hosted_file_id_int, 'subdirectory_path': file_info['subdirectory_path']} + ) + + hosted_file_dict = load_hosted_file_obj(hosted_file_id=hosted_file_id_int, model_as_dict=True) + else: + # Create new record + file_info['account_id'] = account_id_int + file_info['account_id_random'] = account_id_random + new_hosted_file_obj = Hosted_File_Base(**file_info) + + if res_new_id := create_hosted_file_obj(hosted_file_obj_new=new_hosted_file_obj): + hosted_file_id_int = res_new_id + hosted_file_dict = load_hosted_file_obj(hosted_file_id=hosted_file_id_int, model_as_dict=True) + else: + log.error("Database insertion failed for hosted_file.") + continue + + # 5. Relational Linking + if hosted_file_id_int: + create_hosted_file_link( + account_id = account_id_int, + hosted_file_id = hosted_file_id_int, + link_to_type = link_to_type, + link_to_id = link_to_id_int + ) + + # 6. Response Preparation (Vision IDs) + # Add metadata flags + hosted_file_dict['already_exists'] = file_info.get('already_exists') + hosted_file_dict['saved'] = file_info.get('saved') + hosted_file_dict['copy_timer'] = file_info.get('copy_timer') + + # Ensure ID is a random string for the frontend + if not isinstance(hosted_file_dict.get('id'), str): + rid = get_id_random(hosted_file_id_int, table_name='hosted_file') + hosted_file_dict['id'] = rid + hosted_file_dict['hosted_file_id'] = rid + + hosted_file_list.append(hosted_file_dict) + + return mk_resp(data=hosted_file_list, status_message=f"Successfully processed {len(hosted_file_list)} files.") + + +@router.get('/{hosted_file_id}/download') +async def download_file_action( + response: Response, + hosted_file_id: str = Path(min_length=11, max_length=22), + filename: Optional[str] = Query(None, min_length=4, max_length=255), + site_key: Optional[str] = Query(None), # Bypass API Key/JWT if valid site key provided + range: Optional[str] = Header(None), + account: AccountContext = Depends(get_account_context_optional), + delay: DelayParams = Depends(), + ): + """ + Enhanced download/streaming logic. + Supports byte-range seeking, delay simulation, and site_key bypass. + """ + if delay.sleep_time_s > 0: await asyncio.sleep(delay.sleep_time_s) + + # 1. Auth Bypass Logic (site_key) + is_authorized = False + if account.auth_method != 'guest': + is_authorized = True + elif site_key: + # Verify site key existence and status + sql = "SELECT id FROM site WHERE auth_key = :key AND enable = true LIMIT 1" + if site_res := sql_select(sql=sql, data={'key': site_key}): + is_authorized = True + log.info(f"Auth Bypass: Download authorized via site_key.") + + if not is_authorized: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Authentication required or invalid site_key.") + + # 2. Resolve File Record + resolved_id = redis_lookup_id_random(record_id_random=hosted_file_id, table_name='hosted_file') + if not resolved_id: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Hosted file record not found.") + + hosted_file_obj = load_hosted_file_obj(hosted_file_id=resolved_id) + if not hosted_file_obj: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Hosted file data could not be loaded.") + + # 3. Path Resolution + hosted_files_path = settings.FILES_PATH['hosted_files_root'] + subdir_path = hosted_file_obj.subdirectory_path + hash_sha256 = hosted_file_obj.hash_sha256 + hash_filename = f"{hash_sha256}.file" + + if subdir_path: + full_file_path = os.path.join(hosted_files_path, subdir_path, hash_filename) + else: + full_file_path = os.path.join(hosted_files_path, hash_filename) + + if not os.path.exists(full_file_path): + log.error(f"File not found on disk: {full_file_path}") + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Physical file not found on server.") + + # 4. Streaming / Download logic + target_filename = filename or hosted_file_obj.filename + media_type = mimetypes.guess_type(target_filename)[0] or 'application/octet-stream' + + if range: + file_size = os.stat(full_file_path).st_size + try: + range_parts = range.replace('bytes=', '').split('-') + start = int(range_parts[0]) + end = int(range_parts[1]) if len(range_parts) > 1 and range_parts[1] else file_size - 1 + except (ValueError, IndexError): + raise HTTPException(status_code=status.HTTP_416_REQUESTED_RANGE_NOT_SATISFIABLE) + + if start >= file_size: + raise HTTPException(status_code=status.HTTP_416_REQUESTED_RANGE_NOT_SATISFIABLE) + + end = min(end, file_size - 1) + content_length = end - start + 1 + + return StreamingResponse( + file_streamer(full_file_path, start, end + 1), + media_type = media_type, + status_code = status.HTTP_206_PARTIAL_CONTENT, + headers = { + 'Accept-Ranges': 'bytes', + 'Content-Range': f'bytes {start}-{end}/{file_size}', + 'Content-Length': str(content_length), + 'Content-Disposition': f'attachment; filename="{target_filename}"' + } + ) + + return FileResponse(full_file_path, filename=target_filename, media_type=media_type) + + +@router.delete('/{hosted_file_id}', response_model=Resp_Body_Base) +async def delete_file_action( + hosted_file_id: str = Path(min_length=11, max_length=22), + link_to_type: Optional[str] = Query(None), + link_to_id: Optional[str] = Query(None), + method: str = Query('hide', regex='^(hide|disable|delete)$'), + rm_orphan: bool = Query(False), + fake_delete: bool = Query(False), # Testing mode + account: AccountContext = Depends(get_account_context), + delay: DelayParams = Depends(), + ): + """ + Intelligent relational deletion. + - Removes specified link. + - Counts remaining links. + - Optionally cleans up orphans. + - Supports fake_delete for testing. + """ + if delay.sleep_time_s > 0: await asyncio.sleep(delay.sleep_time_s) + + # 1. Resolve IDs + file_id_int = redis_lookup_id_random(record_id_random=hosted_file_id, table_name='hosted_file') + if not file_id_int: + raise HTTPException(status_code=404, detail="Hosted file record not found.") + + link_id_int = None + if link_to_type and link_to_id: + link_id_int = redis_lookup_id_random(record_id_random=link_to_id, table_name=link_to_type) + if not link_id_int: + raise HTTPException(status_code=404, detail=f"Linked object {link_to_id} not found.") + + # 2. Verify State (Existence Checks) + hosted_file_obj = load_hosted_file_obj(hosted_file_id=file_id_int) + if not hosted_file_obj: + raise HTTPException(status_code=404, detail="File metadata not found.") + + # Path check + hosted_files_path = settings.FILES_PATH['hosted_files_root'] + file_path = os.path.join(hosted_files_path, hosted_file_obj.subdirectory_path or '', f"{hosted_file_obj.hash_sha256}.file") + file_exists_on_disk = os.path.exists(file_path) + + # Link check + links = get_hosted_file_link_rec_list(hosted_file_id=file_id_int) + link_found = any(l.get('link_to_type') == link_to_type and l.get('link_to_id') == link_id_int for l in links) + + if fake_delete: + log.info(f"Fake Delete active. Verifying existence...") + return mk_resp(data={ + "hosted_file_exists": True, + "file_on_disk": file_exists_on_disk, + "link_exists": link_found, + "fake_delete": True + }, status_message="Fake delete successful. No data was modified.") + + # 3. Execution Phase + # A. Remove the Link + if link_id_int: + delete_hosted_file_link( + account_id = account.account_id, + hosted_file_id = file_id_int, + link_to_type = link_to_type, + link_to_id = link_id_int + ) + log.info(f"Deleted link between file {file_id_int} and {link_to_type}:{link_id_int}") + + # B. Orphan Check & Physical Cleanup + remaining_links = get_hosted_file_link_rec_list(hosted_file_id=file_id_int) + is_orphan = (len(remaining_links) == 0) + + physical_removed = False + record_removed = False + + if rm_orphan and is_orphan: + log.info(f"File {file_id_int} is an orphan. Cleaning up...") + + # Method Handling + if method == 'delete': + # Hard delete: Record + Disk + if file_exists_on_disk: + pathlib.Path(file_path).unlink() + physical_removed = True + sql_delete(table_name='hosted_file', record_id=file_id_int) + record_removed = True + elif method == 'hide': + sql_update(table_name='hosted_file', data={'id': file_id_int, 'hide': True}) + elif method == 'disable': + sql_update(table_name='hosted_file', data={'id': file_id_int, 'enable': False}) + + return mk_resp(data={ + "link_removed": link_found if link_id_int else False, + "is_orphan": is_orphan, + "physical_removed": physical_removed, + "record_removed": record_removed, + "method": method + }, status_message="Deletion process complete.") diff --git a/app/routers/registry.py b/app/routers/registry.py index 933faac..623b99f 100644 --- a/app/routers/registry.py +++ b/app/routers/registry.py @@ -6,7 +6,7 @@ from app.routers import ( event_device, event_exhibit, event_exhibit_tracking, event_file, event_importing, event_location, event_person, event_person_detail, event_person_tracking, event_presentation, event_presenter, event_registration, event_session, - flask_cfg, fundraising, grant, hosted_file, log_client_viewing, lookup, + flask_cfg, fundraising, grant, hosted_file, api_v3_actions_hosted_file, log_client_viewing, lookup, membership_cfg, membership_group, membership_person_group, membership_person, membership_person_profile, membership_type, membership_person_type, order, order_v3, order_line, order_cart, organization, page, person, @@ -48,6 +48,7 @@ def setup_routers(app: FastAPI): app.include_router(event_session.router, tags=['Event Session']) app.include_router(hosted_file.router, prefix='/hosted_file', tags=['Hosted File']) + app.include_router(api_v3_actions_hosted_file.router, prefix='/v3/action/hosted_file', tags=['Hosted File (V3 Actions)']) app.include_router(lookup.router, prefix='/lu', tags=['Lookup']) app.include_router(organization.router, prefix='/organization', tags=['Organization']) diff --git a/tests/e2e/test_e2e_v3_action_delete.py b/tests/e2e/test_e2e_v3_action_delete.py new file mode 100644 index 0000000..b498f0c --- /dev/null +++ b/tests/e2e/test_e2e_v3_action_delete.py @@ -0,0 +1,70 @@ +import requests +import json +import io + +# Configuration +BASE_URL = "https://dev-api.oneskyit.com/v3/action/hosted_file" +API_KEY = "IDF68Em5X4HTZlswRNgepQ" +ACCOUNT_ID = "Q8lR8Ai8hx2FjbQ3C_EH1Q" +LINK_TO_TYPE = "archive_content" +LINK_TO_ID = "bZOa7CtUm0E" + +def test_v3_delete_flow(): + print(f"--- Starting V3 Action Delete Tests ---") + + headers = { + "X-Aether-API-Key": API_KEY, + "x-account-id": ACCOUNT_ID + } + + # 1. Setup: Upload a fresh file to test with + print("\n[Step 1] Uploading test file...") + files = [("file_list", ("v3_delete_test.txt", io.BytesIO(b"Delete me"), "text/plain"))] + data = { + "account_id": ACCOUNT_ID, + "link_to_type": LINK_TO_TYPE, + "link_to_id": LINK_TO_ID + } + up_resp = requests.post(f"{BASE_URL}/upload", headers=headers, files=files, data=data) + file_id = up_resp.json()['data'][0]['id'] + print(f"Created file: {file_id}") + + # 2. Test Fake Delete + print("\n[Step 2] Testing Fake Delete (Testing Mode)...") + params_fake = { + "link_to_type": LINK_TO_TYPE, + "link_to_id": LINK_TO_ID, + "fake_delete": "true" + } + resp_fake = requests.delete(f"{BASE_URL}/{file_id}", headers=headers, params=params_fake) + print(f"Status: {resp_fake.status_code}") + print(f"Response: {json.dumps(resp_fake.json()['data'], indent=2)}") + assert resp_fake.json()['data']['fake_delete'] == True + + # 3. Test Real Delete (Link Only) + print("\n[Step 3] Testing Real Delete (Link Only, rm_orphan=False)...") + params_link = { + "link_to_type": LINK_TO_TYPE, + "link_to_id": LINK_TO_ID, + "rm_orphan": "false" + } + resp_link = requests.delete(f"{BASE_URL}/{file_id}", headers=headers, params=params_link) + print(f"Status: {resp_link.status_code}") + print(f"Response: {json.dumps(resp_link.json()['data'], indent=2)}") + assert resp_link.json()['data']['link_removed'] == True + assert resp_link.json()['data']['is_orphan'] == True # Should be orphan now, but not removed + + # 4. Test Orphan Cleanup (rm_orphan=True) + print("\n[Step 4] Testing Orphan Cleanup (rm_orphan=True, method=delete)...") + params_orphan = { + "rm_orphan": "true", + "method": "delete" + } + resp_orphan = requests.delete(f"{BASE_URL}/{file_id}", headers=headers, params=params_orphan) + print(f"Status: {resp_orphan.status_code}") + print(f"Response: {json.dumps(resp_orphan.json()['data'], indent=2)}") + assert resp_orphan.json()['data']['physical_removed'] == True + assert resp_orphan.json()['data']['record_removed'] == True + +if __name__ == "__main__": + test_v3_delete_flow() diff --git a/tests/e2e/test_e2e_v3_action_download.py b/tests/e2e/test_e2e_v3_action_download.py new file mode 100644 index 0000000..447e654 --- /dev/null +++ b/tests/e2e/test_e2e_v3_action_download.py @@ -0,0 +1,71 @@ +import requests +import json + +# Configuration +BASE_URL = "https://dev-api.oneskyit.com/v3/action/hosted_file" +API_KEY = "IDF68Em5X4HTZlswRNgepQ" +ACCOUNT_ID = "Q8lR8Ai8hx2FjbQ3C_EH1Q" + +# This file was created during our earlier upload tests +VALID_FILE_ID = "2R06T6yuQLw" +# Use a known site key from the DB for the bypass test +# SITE_KEY = "..." + +def test_download_standard(): + print(f"--- Testing Standard Download via V3 Action: {VALID_FILE_ID} ---") + + url = f"{BASE_URL}/{VALID_FILE_ID}/download" + headers = { + "X-Aether-API-Key": API_KEY, + "x-account-id": ACCOUNT_ID + } + + try: + # We don't want to download the whole binary in a test, so we'll check headers + response = requests.get(url, headers=headers, stream=True) + print(f"Status: {response.status_code}") + print(f"Content-Type: {response.headers.get('Content-Type')}") + print(f"Content-Length: {response.headers.get('Content-Length')}") + + if response.status_code == 200: + print("āœ… Success: Standard download works.") + return True + else: + print(f"āŒ Failed: {response.text}") + return False + except Exception as e: + print(f"šŸ’„ Exception: {e}") + return False + +def test_download_streaming(): + print(f"\n--- Testing Byte-Range Streaming: {VALID_FILE_ID} ---") + + url = f"{BASE_URL}/{VALID_FILE_ID}/download" + headers = { + "X-Aether-API-Key": API_KEY, + "x-account-id": ACCOUNT_ID, + "Range": "bytes=0-10" + } + + try: + response = requests.get(url, headers=headers) + print(f"Status: {response.status_code} (Expected 206)") + print(f"Content-Range: {response.headers.get('Content-Range')}") + + if response.status_code == 206: + print("āœ… Success: Byte-range streaming works.") + return True + else: + print(f"āŒ Failed: {response.status_code}") + return False + except Exception as e: + print(f"šŸ’„ Exception: {e}") + return False + +if __name__ == "__main__": + s1 = test_download_standard() + s2 = test_download_streaming() + if s1 and s2: + print("\nšŸŽ‰ ALL DOWNLOAD ACTION TESTS PASSED!") + else: + print("\nāŒ SOME TESTS FAILED.") diff --git a/tests/e2e/test_e2e_v3_action_scaffold.py b/tests/e2e/test_e2e_v3_action_scaffold.py new file mode 100644 index 0000000..ce5dedc --- /dev/null +++ b/tests/e2e/test_e2e_v3_action_scaffold.py @@ -0,0 +1,44 @@ +import requests +import json + +# Configuration +BASE_URL = "https://dev-api.oneskyit.com/v3/action/hosted_file" +API_KEY = "IDF68Em5X4HTZlswRNgepQ" +ACCOUNT_ID = "Q8lR8Ai8hx2FjbQ3C_EH1Q" + +def test_scaffold_reachability(): + print("--- Testing V3 Action Router Scaffold Reachability ---") + + headers = { + "X-Aether-API-Key": API_KEY, + "x-account-id": ACCOUNT_ID + } + + # 1. Test Upload Scaffold + print("\n[1] Testing Upload Action Reachability...") + files = [("file_list", ("test.txt", b"content", "text/plain"))] + data = { + "account_id": ACCOUNT_ID, + "link_to_type": "archive_content", + "link_to_id": "bZOa7CtUm0E8hx2FjbQ3C_" + } + resp = requests.post(f"{BASE_URL}/upload", headers=headers, files=files, data=data) + print(f"Status: {resp.status_code}") + if resp.status_code == 200: + print(f"āœ… Success: {resp.json().get('status_message')}") + else: + print(f"āŒ Failed: {resp.text}") + + # 2. Test Download Scaffold with Delay + print("\n[2] Testing Download Action Reachability (with 500ms delay)...") + headers_w_delay = headers.copy() + headers_w_delay["X-Delay-ms"] = "500" + resp = requests.get(f"{BASE_URL}/some_file_id/download", headers=headers_w_delay) + print(f"Status: {resp.status_code}") + if resp.status_code == 200: + print(f"āœ… Success: {resp.json().get('status_message')}") + else: + print(f"āŒ Failed: {resp.text}") + +if __name__ == "__main__": + test_scaffold_reachability() diff --git a/tests/e2e/test_e2e_v3_action_upload.py b/tests/e2e/test_e2e_v3_action_upload.py new file mode 100644 index 0000000..ed860e5 --- /dev/null +++ b/tests/e2e/test_e2e_v3_action_upload.py @@ -0,0 +1,91 @@ +import requests +import io +import json + +# Configuration +BASE_URL = "https://dev-api.oneskyit.com/v3/action/hosted_file" +API_KEY = "IDF68Em5X4HTZlswRNgepQ" +ACCOUNT_ID = "Q8lR8Ai8hx2FjbQ3C_EH1Q" +LINK_TO_TYPE = "archive_content" +LINK_TO_ID = "bZOa7CtUm0E" + +def test_v3_upload_flow(): + print(f"--- Starting V3 Action Upload Tests against {BASE_URL} ---") + + headers = { + "X-Aether-API-Key": API_KEY, + "x-account-id": ACCOUNT_ID + } + + # 1. Multi-File Upload with Extension Check + print("\n[Test 1] Multi-File Upload + Extension Validation...") + files = [ + ("file_list", ("v3_multi_1.txt", io.BytesIO(b"V3 Content 1"), "text/plain")), + ("file_list", ("v3_multi_2.txt", io.BytesIO(b"V3 Content 2"), "text/plain")), + ] + data = { + "account_id": ACCOUNT_ID, + "link_to_type": LINK_TO_TYPE, + "link_to_id": LINK_TO_ID + } + params = {"allowed_extensions": ["txt", "pdf"]} + + url = f"{BASE_URL}/upload" + + try: + response = requests.post(url, headers=headers, files=files, data=data, params=params) + print(f"Status: {response.status_code}") + + if response.status_code == 200: + result = response.json() + data_list = result.get('data', []) + print(f"āœ… Success! Processed {len(data_list)} files.") + for i, f in enumerate(data_list): + print(f" File {i+1} ID: {f.get('id')} | Name: {f.get('filename')}") + assert isinstance(f.get('id'), str) + else: + print(f"āŒ Failed: {response.text}") + return + except Exception as e: + print(f"šŸ’„ Exception: {e}") + return + + # 2. Test Deduplication (Upload same file again) + print("\n[Test 2] Testing Deduplication Logic...") + files_dup = [ + ("file_list", ("v3_multi_1.txt", io.BytesIO(b"V3 Content 1"), "text/plain")) + ] + + try: + response = requests.post(url, headers=headers, files=files_dup, data=data) + print(f"Status: {response.status_code}") + if response.status_code == 200: + result = response.json() + file_data = result.get('data', [])[0] + print(f"Already Exists Flag: {file_data.get('already_exists')}") + assert file_data.get('already_exists') == True + print("āœ… Deduplication logic verified.") + else: + print(f"āŒ Failed: {response.text}") + except Exception as e: + print(f"šŸ’„ Exception: {e}") + + # 3. Test Extension Rejection + print("\n[Test 3] Testing Extension Rejection...") + files_bad = [ + ("file_list", ("virus.exe", io.BytesIO(b"bad"), "application/octet-stream")) + ] + params_restrict = {"allowed_extensions": ["txt"]} + + try: + response = requests.post(url, headers=headers, files=files_bad, data=data, params=params_restrict) + print(f"Status: {response.status_code} (Expected 400)") + if response.status_code == 400: + print(f"āœ… Success: correctly rejected .exe file.") + else: + print(f"āŒ Failure: allowed .exe file with status {response.status_code}") + except Exception as e: + print(f"šŸ’„ Exception: {e}") + +if __name__ == "__main__": + test_v3_upload_flow()