chore(tests): reorganize test suite and archive redundant scripts
- Moved legacy/redundant tests to tests/archive/. - Relocated root-level debug scripts to tests/integration/. - Updated tests/README.md with final organized inventory. - Cleaned up root directory from one-off reproduction scripts.
This commit is contained in:
52
tests/archive/repro_security_bypass.py
Normal file
52
tests/archive/repro_security_bypass.py
Normal file
@@ -0,0 +1,52 @@
|
||||
import requests
|
||||
import sys
|
||||
|
||||
BASE_URL = "http://dev-api.oneskyit.com" # Standard Dev Domain
|
||||
OBJ_TYPE = "journal"
|
||||
ENDPOINT = f"{BASE_URL}/v3/crud/{OBJ_TYPE}/schema"
|
||||
|
||||
# Replace with the key found from DB query
|
||||
VALID_API_KEY = "dummy_key_placeholder"
|
||||
|
||||
def test_request(description, headers, expected_status):
|
||||
print(f"Testing: {description}")
|
||||
try:
|
||||
response = requests.get(ENDPOINT, headers=headers, timeout=10)
|
||||
status = response.status_code
|
||||
result = "PASS" if status == expected_status else "FAIL"
|
||||
print(f" Status: {status} (Expected: {expected_status}) -> {result}")
|
||||
if result == "FAIL":
|
||||
print(f" Response: {response.text[:200]}...")
|
||||
return result == "PASS"
|
||||
except Exception as e:
|
||||
print(f" Error: {e}")
|
||||
return False
|
||||
|
||||
def main():
|
||||
if len(sys.argv) > 1:
|
||||
global VALID_API_KEY
|
||||
VALID_API_KEY = sys.argv[1]
|
||||
|
||||
print(f"--- Security Bypass Test (Target: {ENDPOINT}) ---")
|
||||
|
||||
# Case 1: No Auth
|
||||
# Expected: 403 Forbidden (Account context required)
|
||||
test_request("No Auth Headers", {}, 403)
|
||||
|
||||
# Case 2: Vulnerability Check (Bypass Header Only)
|
||||
# AFTER FIX: Expected 403 (Protected)
|
||||
test_request("Bypass Header Only (Vulnerability Check)", {"x-no-account-id": "bypass"}, 403)
|
||||
|
||||
# Case 3: Invalid API Key + Bypass Header
|
||||
# Expected: 403 Forbidden
|
||||
test_request("Bypass Header + Invalid Key", {"x-no-account-id": "bypass", "x-aether-api-key": "invalid-key-12345"}, 403)
|
||||
|
||||
# Case 4: Valid API Key + Bypass Header
|
||||
# Expected: 200 OK
|
||||
if VALID_API_KEY != "dummy_key_placeholder":
|
||||
test_request("Bypass Header + Valid Key", {"x-no-account-id": "bypass", "x-aether-api-key": VALID_API_KEY}, 200)
|
||||
else:
|
||||
print("Skipping Case 4 (No Valid API Key provided)")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
78
tests/archive/test_e2e_hosted_file_live_delete.py
Normal file
78
tests/archive/test_e2e_hosted_file_live_delete.py
Normal file
@@ -0,0 +1,78 @@
|
||||
import requests
|
||||
import json
|
||||
|
||||
# Configuration
|
||||
BASE_URL = "https://dev-api.oneskyit.com"
|
||||
API_KEY = "IDF68Em5X4HTZlswRNgepQ"
|
||||
ACCOUNT_ID = "Q8lR8Ai8hx2FjbQ3C_EH1Q"
|
||||
LINK_TO_TYPE = "archive_content"
|
||||
LINK_TO_ID = "bZOa7CtUm0E"
|
||||
|
||||
# IDs created in the previous turn
|
||||
FILE_IDS = ['2R06T6yuQLw', 'dG60rqQK-mA', 'sKMzAD5mlwo', 'Ob-voYn7SkY']
|
||||
|
||||
def delete_file(file_id, rm_orphan=False):
|
||||
print(f"\n--- Deleting File ID: {file_id} (rm_orphan={rm_orphan}) ---")
|
||||
|
||||
url = f"{BASE_URL}/hosted_file/{file_id}"
|
||||
params = {
|
||||
"link_to_type": LINK_TO_TYPE,
|
||||
"link_to_id": LINK_TO_ID,
|
||||
"rm_orphan": str(rm_orphan).lower()
|
||||
}
|
||||
headers = {
|
||||
"X-Aether-API-Key": API_KEY,
|
||||
"x-account-id": ACCOUNT_ID
|
||||
}
|
||||
|
||||
try:
|
||||
response = requests.delete(url, headers=headers, params=params)
|
||||
print(f"Status Code: {response.status_code}")
|
||||
if response.status_code == 200:
|
||||
print(f"✅ Success: {response.json().get('status_message')}")
|
||||
return True
|
||||
else:
|
||||
print(f"❌ Failed: {response.text}")
|
||||
return False
|
||||
except Exception as e:
|
||||
print(f"💥 Exception: {e}")
|
||||
return False
|
||||
|
||||
def verify_file_exists(file_id):
|
||||
url = f"{BASE_URL}/hosted_file/{file_id}"
|
||||
headers = {
|
||||
"X-Aether-API-Key": API_KEY,
|
||||
"x-account-id": ACCOUNT_ID
|
||||
}
|
||||
response = requests.get(url, headers=headers)
|
||||
return response.status_code == 200
|
||||
|
||||
if __name__ == "__main__":
|
||||
print("Starting Live E2E Deletion Tests...")
|
||||
|
||||
# 1. Delete one link WITHOUT rm_orphan
|
||||
# The record should still exist in the DB (but link is gone)
|
||||
target_1 = FILE_IDS[0]
|
||||
if delete_file(target_1, rm_orphan=False):
|
||||
exists = verify_file_exists(target_1)
|
||||
print(f"Verification: File record exists after link-only delete? {exists}")
|
||||
if not exists:
|
||||
print("⚠️ Warning: File record was deleted even though rm_orphan=False (or it was already missing).")
|
||||
|
||||
# 2. Delete one link WITH rm_orphan=True
|
||||
# Since these files only have one link, the record and physical file should be deleted.
|
||||
target_2 = FILE_IDS[1]
|
||||
if delete_file(target_2, rm_orphan=True):
|
||||
exists = verify_file_exists(target_2)
|
||||
print(f"Verification: File record exists after orphan delete? {exists}")
|
||||
if exists:
|
||||
print("❌ Failure: File record still exists after rm_orphan=True.")
|
||||
else:
|
||||
print("✅ Success: File record was correctly removed.")
|
||||
|
||||
# 3. Cleanup the rest
|
||||
print("\n--- Cleaning up remaining files ---")
|
||||
for fid in FILE_IDS[2:]:
|
||||
delete_file(fid, rm_orphan=True)
|
||||
|
||||
print("\nTests Complete.")
|
||||
73
tests/archive/test_e2e_hosted_file_live_upload.py
Normal file
73
tests/archive/test_e2e_hosted_file_live_upload.py
Normal file
@@ -0,0 +1,73 @@
|
||||
import requests
|
||||
import io
|
||||
import json
|
||||
|
||||
# Configuration
|
||||
BASE_URL = "https://dev-api.oneskyit.com"
|
||||
API_KEY = "IDF68Em5X4HTZlswRNgepQ"
|
||||
ACCOUNT_ID = "Q8lR8Ai8hx2FjbQ3C_EH1Q"
|
||||
LINK_TO_TYPE = "archive_content"
|
||||
LINK_TO_ID = "bZOa7CtUm0E"
|
||||
|
||||
def test_live_file_uploads():
|
||||
print(f"--- Starting Live E2E Upload Tests against {BASE_URL} ---")
|
||||
|
||||
headers = {
|
||||
"X-Aether-API-Key": API_KEY,
|
||||
"x-account-id": ACCOUNT_ID # Route expects this as a header
|
||||
}
|
||||
|
||||
# 1. Single File Upload
|
||||
print("\n[Test 1] Single File Upload...")
|
||||
files = [
|
||||
("file_list", ("e2e_test_single.txt", io.BytesIO(b"Live E2E Single Upload Test Content"), "text/plain"))
|
||||
]
|
||||
data = {
|
||||
"account_id": ACCOUNT_ID,
|
||||
"link_to_type": LINK_TO_TYPE,
|
||||
"link_to_id": LINK_TO_ID
|
||||
}
|
||||
|
||||
url = f"{BASE_URL}/hosted_file/upload_files"
|
||||
|
||||
try:
|
||||
response = requests.post(url, headers=headers, files=files, data=data)
|
||||
print(f"Status: {response.status_code}")
|
||||
|
||||
if response.status_code == 200:
|
||||
result = response.json()
|
||||
file_data = result.get('data', [])[0]
|
||||
print(f"✅ Success! Created hosted_file_id: {file_data.get('id')}")
|
||||
print(f"Response snippet: {json.dumps(file_data, indent=2)[:200]}...")
|
||||
else:
|
||||
print(f"❌ Failed: {response.text}")
|
||||
return
|
||||
except Exception as e:
|
||||
print(f"💥 Exception: {e}")
|
||||
return
|
||||
|
||||
# 2. Triple File Upload
|
||||
print("\n[Test 2] Triple File Upload...")
|
||||
files = [
|
||||
("file_list", ("e2e_multi_1.txt", io.BytesIO(b"Content 1"), "text/plain")),
|
||||
("file_list", ("e2e_multi_2.txt", io.BytesIO(b"Content 2"), "text/plain")),
|
||||
("file_list", ("e2e_multi_3.txt", io.BytesIO(b"Content 3"), "text/plain")),
|
||||
]
|
||||
|
||||
try:
|
||||
response = requests.post(url, headers=headers, files=files, data=data)
|
||||
print(f"Status: {response.status_code}")
|
||||
|
||||
if response.status_code == 200:
|
||||
result = response.json()
|
||||
data_list = result.get('data', [])
|
||||
print(f"✅ Success! Uploaded {len(data_list)} files.")
|
||||
for i, f in enumerate(data_list):
|
||||
print(f" File {i+1} ID: {f.get('id')} | Name: {f.get('filename')}")
|
||||
else:
|
||||
print(f"❌ Failed: {response.text}")
|
||||
except Exception as e:
|
||||
print(f"💥 Exception: {e}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_live_file_uploads()
|
||||
25
tests/archive/test_e2e_legacy_remote_schema.py
Normal file
25
tests/archive/test_e2e_legacy_remote_schema.py
Normal file
@@ -0,0 +1,25 @@
|
||||
import requests
|
||||
import json
|
||||
|
||||
BASE_URL = "https://dev-api.oneskyit.com/v3/crud"
|
||||
ACCOUNT_ID = "nqOzejLCDXM"
|
||||
|
||||
def get_headers():
|
||||
return {"X-Account-ID": ACCOUNT_ID}
|
||||
|
||||
def check_schema(obj):
|
||||
print(f"--- Schema for {obj} ---")
|
||||
url = f"{BASE_URL}/{obj}/schema"
|
||||
r = requests.get(url, headers=get_headers())
|
||||
if r.status_code == 200:
|
||||
data = r.json()['data']
|
||||
cols = [c['field'] for col in [data['database']['columns']] for c in col]
|
||||
print(f"Columns: {cols}")
|
||||
fields = list(data['model']['fields'].keys())
|
||||
print(f"Pydantic Fields: {fields}")
|
||||
else:
|
||||
print(f"Error {r.status_code}: {r.text}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
check_schema("site")
|
||||
check_schema("site_domain")
|
||||
44
tests/archive/test_e2e_v3_action_scaffold.py
Normal file
44
tests/archive/test_e2e_v3_action_scaffold.py
Normal file
@@ -0,0 +1,44 @@
|
||||
import requests
|
||||
import json
|
||||
|
||||
# Configuration
|
||||
BASE_URL = "https://dev-api.oneskyit.com/v3/action/hosted_file"
|
||||
API_KEY = "IDF68Em5X4HTZlswRNgepQ"
|
||||
ACCOUNT_ID = "Q8lR8Ai8hx2FjbQ3C_EH1Q"
|
||||
|
||||
def test_scaffold_reachability():
|
||||
print("--- Testing V3 Action Router Scaffold Reachability ---")
|
||||
|
||||
headers = {
|
||||
"X-Aether-API-Key": API_KEY,
|
||||
"x-account-id": ACCOUNT_ID
|
||||
}
|
||||
|
||||
# 1. Test Upload Scaffold
|
||||
print("\n[1] Testing Upload Action Reachability...")
|
||||
files = [("file_list", ("test.txt", b"content", "text/plain"))]
|
||||
data = {
|
||||
"account_id": ACCOUNT_ID,
|
||||
"link_to_type": "archive_content",
|
||||
"link_to_id": "bZOa7CtUm0E8hx2FjbQ3C_"
|
||||
}
|
||||
resp = requests.post(f"{BASE_URL}/upload", headers=headers, files=files, data=data)
|
||||
print(f"Status: {resp.status_code}")
|
||||
if resp.status_code == 200:
|
||||
print(f"✅ Success: {resp.json().get('status_message')}")
|
||||
else:
|
||||
print(f"❌ Failed: {resp.text}")
|
||||
|
||||
# 2. Test Download Scaffold with Delay
|
||||
print("\n[2] Testing Download Action Reachability (with 500ms delay)...")
|
||||
headers_w_delay = headers.copy()
|
||||
headers_w_delay["X-Delay-ms"] = "500"
|
||||
resp = requests.get(f"{BASE_URL}/some_file_id/download", headers=headers_w_delay)
|
||||
print(f"Status: {resp.status_code}")
|
||||
if resp.status_code == 200:
|
||||
print(f"✅ Success: {resp.json().get('status_message')}")
|
||||
else:
|
||||
print(f"❌ Failed: {resp.text}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_scaffold_reachability()
|
||||
53
tests/archive/test_e2e_v3_hash_download.py
Normal file
53
tests/archive/test_e2e_v3_hash_download.py
Normal file
@@ -0,0 +1,53 @@
|
||||
import requests
|
||||
|
||||
# Configuration
|
||||
# The hash from your previous message
|
||||
FILE_HASH = "384fef832fa07b347a1d70927d0606b24cf41771202c1bfa00d7026764db2bb2"
|
||||
BASE_URL = f"https://dev-api.oneskyit.com/v3/action/hosted_file/hash/{FILE_HASH}/download"
|
||||
API_KEY = "PMM4n50teUCaOMMTN8qOJA"
|
||||
|
||||
def test_hash_download_query_param():
|
||||
print(f"--- Testing Hash Download via Query Param API Key ---")
|
||||
|
||||
# Passing api_key in query string as requested
|
||||
url = f"{BASE_URL}?api_key={API_KEY}"
|
||||
|
||||
try:
|
||||
response = requests.get(url)
|
||||
print(f"Status: {response.status_code}")
|
||||
|
||||
if response.status_code == 200:
|
||||
print("✅ Success: File downloaded by hash!")
|
||||
print(f" Content-Length: {response.headers.get('Content-Length')}")
|
||||
return True
|
||||
else:
|
||||
print(f"❌ Failed: {response.text}")
|
||||
return False
|
||||
except Exception as e:
|
||||
print(f"💥 Exception: {e}")
|
||||
return False
|
||||
|
||||
def test_hash_download_header():
|
||||
print(f"\n--- Testing Hash Download via Header API Key ---")
|
||||
|
||||
headers = {"X-Aether-API-Key": API_KEY}
|
||||
|
||||
try:
|
||||
response = requests.get(BASE_URL, headers=headers)
|
||||
print(f"Status: {response.status_code}")
|
||||
|
||||
if response.status_code == 200:
|
||||
print("✅ Success: File downloaded by hash via header!")
|
||||
return True
|
||||
else:
|
||||
print(f"❌ Failed: {response.status_code}")
|
||||
return False
|
||||
except Exception as e:
|
||||
print(f"💥 Exception: {e}")
|
||||
return False
|
||||
|
||||
if __name__ == "__main__":
|
||||
s1 = test_hash_download_query_param()
|
||||
s2 = test_hash_download_header()
|
||||
if s1 and s2:
|
||||
print("\n🎉 HASH DOWNLOAD TESTS PASSED!")
|
||||
63
tests/archive/test_unit_email_safe.py
Normal file
63
tests/archive/test_unit_email_safe.py
Normal file
@@ -0,0 +1,63 @@
|
||||
import sys
|
||||
import os
|
||||
import logging
|
||||
import unittest
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
# Add current directory to path
|
||||
sys.path.append(os.getcwd())
|
||||
|
||||
# Mock app.config BEFORE imports
|
||||
mock_config = MagicMock()
|
||||
mock_settings = MagicMock()
|
||||
mock_settings.SMTP = {}
|
||||
mock_config.settings = mock_settings
|
||||
sys.modules['app.config'] = mock_config
|
||||
|
||||
# Mock html2text before importing app.lib_email
|
||||
sys.modules['html2text'] = MagicMock()
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
|
||||
from app.lib_email import send_email
|
||||
from app.config import settings
|
||||
|
||||
class TestEmail(unittest.TestCase):
|
||||
def test_send_email_missing_settings(self):
|
||||
print("\nTesting send_email with missing SMTP settings...")
|
||||
# settings is already mocked to have an empty SMTP dict
|
||||
result = send_email(
|
||||
from_email="test@example.com",
|
||||
to_email="test@example.com",
|
||||
subject="Test Email",
|
||||
body_html="<p>Test</p>",
|
||||
test=True
|
||||
)
|
||||
|
||||
print(f"Result (should be False): {result}")
|
||||
self.assertFalse(result)
|
||||
|
||||
def test_send_email_invalid_port(self):
|
||||
print("\nTesting send_email with invalid port...")
|
||||
|
||||
settings.SMTP = {
|
||||
'server': 'smtp.example.com',
|
||||
'port': 'invalid',
|
||||
'username': 'user',
|
||||
'password': 'pass'
|
||||
}
|
||||
|
||||
result = send_email(
|
||||
from_email="test@example.com",
|
||||
to_email="test@example.com",
|
||||
subject="Test Email",
|
||||
body_html="<p>Test</p>",
|
||||
test=True
|
||||
)
|
||||
|
||||
print(f"Result (should be False): {result}")
|
||||
self.assertFalse(result)
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user