Files
OSIT-AE-API-FastAPI/tests/e2e/test_e2e_v3_action_upload.py

92 lines
3.2 KiB
Python

import io
import json
import os

import requests
# Configuration
#
# Every setting can be overridden through an environment variable so the suite
# can be pointed at another deployment, or given a different credential,
# without editing this file. The literals below are the fallback defaults.
# NOTE(review): the default API key is committed in plain text; if it is a
# live credential, consider rotating it and supplying it via the environment.
BASE_URL = os.environ.get(
    "AETHER_E2E_BASE_URL",
    "https://dev-api.oneskyit.com/v3/action/hosted_file",
)
API_KEY = os.environ.get("AETHER_E2E_API_KEY", "IDF68Em5X4HTZlswRNgepQ")
ACCOUNT_ID = os.environ.get("AETHER_E2E_ACCOUNT_ID", "Q8lR8Ai8hx2FjbQ3C_EH1Q")
# Record that uploaded files are linked to (sent as multipart form fields).
LINK_TO_TYPE = os.environ.get("AETHER_E2E_LINK_TO_TYPE", "archive_content")
LINK_TO_ID = os.environ.get("AETHER_E2E_LINK_TO_ID", "bZOa7CtUm0E")
# Upper bound, in seconds, on how long a single upload request may block.
_REQUEST_TIMEOUT_SECONDS = 30


def _post_upload(url, headers, files, data, params=None):
    """POST one multipart upload request and return the response.

    A timeout is always applied so an unresponsive server cannot hang the run.
    Transport-level errors are printed and then re-raised so the caller fails
    instead of silently continuing.
    """
    try:
        return requests.post(
            url,
            headers=headers,
            files=files,
            data=data,
            params=params,
            timeout=_REQUEST_TIMEOUT_SECONDS,
        )
    except requests.RequestException as exc:
        print(f"💥 Exception: {exc}")
        raise


def test_v3_upload_flow():
    """Exercise the hosted-file upload endpoint end to end.

    Three scenarios run in order against ``BASE_URL``:

    1. Upload two text files with a ``txt``/``pdf`` extension allow-list and
       check that every returned entry carries a string ``id``.
    2. Re-upload the first file and check that the response flags it with
       ``already_exists``.
    3. Upload an ``.exe`` file with a ``txt``-only allow-list and check that
       the server answers HTTP 400.

    Progress is printed as the scenarios run. The first unmet expectation
    stops the run, so later scenarios are skipped after a failure.

    Raises:
        AssertionError: If a response does not match the expectation.
        requests.RequestException: If a request cannot be completed.
    """
    print(f"--- Starting V3 Action Upload Tests against {BASE_URL} ---")
    headers = {
        "X-Aether-API-Key": API_KEY,
        "x-account-id": ACCOUNT_ID,
    }
    # Form fields shared by all three uploads.
    data = {
        "account_id": ACCOUNT_ID,
        "link_to_type": LINK_TO_TYPE,
        "link_to_id": LINK_TO_ID,
    }
    url = f"{BASE_URL}/upload"

    # 1. Multi-File Upload with Extension Check
    print("\n[Test 1] Multi-File Upload + Extension Validation...")
    files = [
        ("file_list", ("v3_multi_1.txt", io.BytesIO(b"V3 Content 1"), "text/plain")),
        ("file_list", ("v3_multi_2.txt", io.BytesIO(b"V3 Content 2"), "text/plain")),
    ]
    params = {"allowed_extensions": ["txt", "pdf"]}
    response = _post_upload(url, headers, files, data, params)
    print(f"Status: {response.status_code}")
    if response.status_code != 200:
        print(f"❌ Failed: {response.text}")
    assert response.status_code == 200, (
        f"multi-file upload returned HTTP {response.status_code}"
    )
    data_list = response.json().get("data", [])
    print(f"✅ Success! Processed {len(data_list)} files.")
    for position, entry in enumerate(data_list, start=1):
        print(f" File {position} ID: {entry.get('id')} | Name: {entry.get('filename')}")
        assert isinstance(entry.get("id"), str), (
            f"file {position} has a non-string id: {entry.get('id')!r}"
        )

    # 2. Test Deduplication (Upload same file again)
    print("\n[Test 2] Testing Deduplication Logic...")
    files_dup = [
        ("file_list", ("v3_multi_1.txt", io.BytesIO(b"V3 Content 1"), "text/plain")),
    ]
    response = _post_upload(url, headers, files_dup, data)
    print(f"Status: {response.status_code}")
    if response.status_code != 200:
        print(f"❌ Failed: {response.text}")
    assert response.status_code == 200, (
        f"duplicate upload returned HTTP {response.status_code}"
    )
    uploaded = response.json().get("data", [])
    # Guard the index below so an empty payload fails with a clear message.
    assert uploaded, "duplicate upload response contained no file entries"
    file_data = uploaded[0]
    print(f"Already Exists Flag: {file_data.get('already_exists')}")
    # A JSON ``true`` decodes to the Python ``True`` singleton.
    assert file_data.get("already_exists") is True, (
        "re-uploaded file was not flagged as already existing"
    )
    print("✅ Deduplication logic verified.")

    # 3. Test Extension Rejection
    print("\n[Test 3] Testing Extension Rejection...")
    files_bad = [
        ("file_list", ("virus.exe", io.BytesIO(b"bad"), "application/octet-stream")),
    ]
    params_restrict = {"allowed_extensions": ["txt"]}
    response = _post_upload(url, headers, files_bad, data, params_restrict)
    print(f"Status: {response.status_code} (Expected 400)")
    if response.status_code == 400:
        print("✅ Success: correctly rejected .exe file.")
    else:
        print(f"❌ Failure: allowed .exe file with status {response.status_code}")
    assert response.status_code == 400, (
        f"expected HTTP 400 for a disallowed extension, got {response.status_code}"
    )
# Script entry point: lets the end-to-end flow be run directly with
# ``python <this file>`` in addition to being collected by a test runner.
if __name__ == "__main__":
    test_v3_upload_flow()