import requests import io import json # Configuration BASE_URL = "https://dev-api.oneskyit.com/v3/action/hosted_file" API_KEY = "IDF68Em5X4HTZlswRNgepQ" ACCOUNT_ID = "Q8lR8Ai8hx2FjbQ3C_EH1Q" LINK_TO_TYPE = "archive_content" LINK_TO_ID = "bZOa7CtUm0E" def test_v3_upload_flow(): print(f"--- Starting V3 Action Upload Tests against {BASE_URL} ---") headers = { "X-Aether-API-Key": API_KEY, "x-account-id": ACCOUNT_ID } # 1. Multi-File Upload with Extension Check print("\n[Test 1] Multi-File Upload + Extension Validation...") files = [ ("file_list", ("v3_multi_1.txt", io.BytesIO(b"V3 Content 1"), "text/plain")), ("file_list", ("v3_multi_2.txt", io.BytesIO(b"V3 Content 2"), "text/plain")), ] data = { "account_id": ACCOUNT_ID, "link_to_type": LINK_TO_TYPE, "link_to_id": LINK_TO_ID } params = {"allowed_extensions": ["txt", "pdf"]} url = f"{BASE_URL}/upload" try: response = requests.post(url, headers=headers, files=files, data=data, params=params) print(f"Status: {response.status_code}") if response.status_code == 200: result = response.json() data_list = result.get('data', []) print(f"✅ Success! Processed {len(data_list)} files.") for i, f in enumerate(data_list): print(f" File {i+1} ID: {f.get('id')} | Name: {f.get('filename')}") assert isinstance(f.get('id'), str) else: print(f"❌ Failed: {response.text}") return except Exception as e: print(f"💥 Exception: {e}") return # 2. Test Deduplication (Upload same file again) print("\n[Test 2] Testing Deduplication Logic...") files_dup = [ ("file_list", ("v3_multi_1.txt", io.BytesIO(b"V3 Content 1"), "text/plain")) ] try: response = requests.post(url, headers=headers, files=files_dup, data=data) print(f"Status: {response.status_code}") if response.status_code == 200: result = response.json() file_data = result.get('data', [])[0] print(f"Already Exists Flag: {file_data.get('already_exists')}") assert file_data.get('already_exists') == True print("✅ Deduplication logic verified.") else: print(f"❌ Failed: {response.text}") except Exception as e: print(f"💥 Exception: {e}") # 3. Test Extension Rejection print("\n[Test 3] Testing Extension Rejection...") files_bad = [ ("file_list", ("virus.exe", io.BytesIO(b"bad"), "application/octet-stream")) ] params_restrict = {"allowed_extensions": ["txt"]} try: response = requests.post(url, headers=headers, files=files_bad, data=data, params=params_restrict) print(f"Status: {response.status_code} (Expected 400)") if response.status_code == 400: print(f"✅ Success: correctly rejected .exe file.") else: print(f"❌ Failure: allowed .exe file with status {response.status_code}") except Exception as e: print(f"💥 Exception: {e}") if __name__ == "__main__": test_v3_upload_flow()