92 lines
3.2 KiB
Python
92 lines
3.2 KiB
Python
import requests
|
|
import io
|
|
import json
|
|
|
|
# Configuration
|
|
BASE_URL = "https://dev-api.oneskyit.com/v3/action/hosted_file"
|
|
API_KEY = "IDF68Em5X4HTZlswRNgepQ"
|
|
ACCOUNT_ID = "Q8lR8Ai8hx2FjbQ3C_EH1Q"
|
|
LINK_TO_TYPE = "archive_content"
|
|
LINK_TO_ID = "bZOa7CtUm0E"
|
|
|
|
def test_v3_upload_flow():
|
|
print(f"--- Starting V3 Action Upload Tests against {BASE_URL} ---")
|
|
|
|
headers = {
|
|
"X-Aether-API-Key": API_KEY,
|
|
"x-account-id": ACCOUNT_ID
|
|
}
|
|
|
|
# 1. Multi-File Upload with Extension Check
|
|
print("\n[Test 1] Multi-File Upload + Extension Validation...")
|
|
files = [
|
|
("file_list", ("v3_multi_1.txt", io.BytesIO(b"V3 Content 1"), "text/plain")),
|
|
("file_list", ("v3_multi_2.txt", io.BytesIO(b"V3 Content 2"), "text/plain")),
|
|
]
|
|
data = {
|
|
"account_id": ACCOUNT_ID,
|
|
"link_to_type": LINK_TO_TYPE,
|
|
"link_to_id": LINK_TO_ID
|
|
}
|
|
params = {"allowed_extensions": ["txt", "pdf"]}
|
|
|
|
url = f"{BASE_URL}/upload"
|
|
|
|
try:
|
|
response = requests.post(url, headers=headers, files=files, data=data, params=params)
|
|
print(f"Status: {response.status_code}")
|
|
|
|
if response.status_code == 200:
|
|
result = response.json()
|
|
data_list = result.get('data', [])
|
|
print(f"✅ Success! Processed {len(data_list)} files.")
|
|
for i, f in enumerate(data_list):
|
|
print(f" File {i+1} ID: {f.get('id')} | Name: {f.get('filename')}")
|
|
assert isinstance(f.get('id'), str)
|
|
else:
|
|
print(f"❌ Failed: {response.text}")
|
|
return
|
|
except Exception as e:
|
|
print(f"💥 Exception: {e}")
|
|
return
|
|
|
|
# 2. Test Deduplication (Upload same file again)
|
|
print("\n[Test 2] Testing Deduplication Logic...")
|
|
files_dup = [
|
|
("file_list", ("v3_multi_1.txt", io.BytesIO(b"V3 Content 1"), "text/plain"))
|
|
]
|
|
|
|
try:
|
|
response = requests.post(url, headers=headers, files=files_dup, data=data)
|
|
print(f"Status: {response.status_code}")
|
|
if response.status_code == 200:
|
|
result = response.json()
|
|
file_data = result.get('data', [])[0]
|
|
print(f"Already Exists Flag: {file_data.get('already_exists')}")
|
|
assert file_data.get('already_exists') == True
|
|
print("✅ Deduplication logic verified.")
|
|
else:
|
|
print(f"❌ Failed: {response.text}")
|
|
except Exception as e:
|
|
print(f"💥 Exception: {e}")
|
|
|
|
# 3. Test Extension Rejection
|
|
print("\n[Test 3] Testing Extension Rejection...")
|
|
files_bad = [
|
|
("file_list", ("virus.exe", io.BytesIO(b"bad"), "application/octet-stream"))
|
|
]
|
|
params_restrict = {"allowed_extensions": ["txt"]}
|
|
|
|
try:
|
|
response = requests.post(url, headers=headers, files=files_bad, data=data, params=params_restrict)
|
|
print(f"Status: {response.status_code} (Expected 400)")
|
|
if response.status_code == 400:
|
|
print(f"✅ Success: correctly rejected .exe file.")
|
|
else:
|
|
print(f"❌ Failure: allowed .exe file with status {response.status_code}")
|
|
except Exception as e:
|
|
print(f"💥 Exception: {e}")
|
|
|
|
if __name__ == "__main__":
|
|
test_v3_upload_flow()
|