feat(site_domain): restore access_key enforcement for FQDN lookups
- api_crud_v3: strip falsy access_key values; restrict keyless queries to public domains (both site_access_key and site_domain_access_key must be NULL/empty); 75-line recursive block replaced with ~16 lines - lib_sql_search: expand virtual 'access_key' field into priority SQL — site_access_key first, site_domain_access_key as fallback - cms.py: add site_domain_access_key to site_domain searchable_fields - docs: update frontend guide with access key behavior and examples - e2e test: expand to cover all valid/invalid access key scenarios (15/15) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -6,13 +6,13 @@ import os
|
||||
# --- Configuration ---
|
||||
API_ROOT = "https://dev-api.oneskyit.com"
|
||||
# Using the key provided in your examples
|
||||
API_KEY = "dFP6J9DVj9hUgIMn-fNIqg"
|
||||
API_KEY = "dFP6J9DVj9hUgIMn-fNIqg"
|
||||
# A known private journal ID from account 1
|
||||
PRIVATE_JOURNAL_ID = "SWFK-48-89-90"
|
||||
# A known public object type/ID
|
||||
PUBLIC_FQDN = "dev-app.oneskyit.com"
|
||||
# A known valid account ID random for testing restoration of access
|
||||
VALID_ACCOUNT_ID_RAND = "_XY7DXtc9MY"
|
||||
VALID_ACCOUNT_ID_RAND = "_XY7DXtc9MY"
|
||||
|
||||
def print_result(label, success, message=""):
|
||||
status = "✅ PASS" if success else "❌ FAIL"
|
||||
@@ -24,13 +24,13 @@ def test_hardened_search_leak():
|
||||
url = f"{API_ROOT}/v3/crud/journal/search"
|
||||
headers = {"x-aether-api-key": API_KEY}
|
||||
# NO account header, NO JWT
|
||||
payload = {"and": []}
|
||||
|
||||
payload = {"and": []}
|
||||
|
||||
resp = requests.post(url, headers=headers, json=payload)
|
||||
|
||||
|
||||
if resp.status_code == 200:
|
||||
data = resp.json().get('data', [])
|
||||
# Should be 0 because all journals in DB have an account_id,
|
||||
# Should be 0 because all journals in DB have an account_id,
|
||||
# and we are now strictly filtering for account_id IS NULL.
|
||||
success = (len(data) == 0)
|
||||
print_result("Leak Blocked (Journal Search)", success, f"- Found {len(data)} records (Expected 0)")
|
||||
@@ -43,9 +43,9 @@ def test_strict_id_block():
|
||||
url = f"{API_ROOT}/v3/crud/journal/{PRIVATE_JOURNAL_ID}"
|
||||
headers = {"x-aether-api-key": API_KEY}
|
||||
# NO account header, NO JWT
|
||||
|
||||
|
||||
resp = requests.get(url, headers=headers)
|
||||
|
||||
|
||||
success = (resp.status_code == 403)
|
||||
print_result("Access Denied (Journal GET ID)", success, f"- Status: {resp.status_code} (Expected 403)")
|
||||
|
||||
@@ -55,9 +55,9 @@ def test_bootstrap_exception():
|
||||
url = f"{API_ROOT}/v3/crud/site_domain/search"
|
||||
headers = {"x-aether-api-key": API_KEY}
|
||||
payload = {"and": [{"field": "fqdn", "op": "eq", "value": PUBLIC_FQDN}]}
|
||||
|
||||
|
||||
resp = requests.post(url, headers=headers, json=payload)
|
||||
|
||||
|
||||
success = (resp.status_code == 200 and len(resp.json().get('data', [])) > 0)
|
||||
print_result("Bootstrap Allowed (Site Domain)", success, f"- Status: {resp.status_code}")
|
||||
|
||||
@@ -69,22 +69,82 @@ def test_restored_access():
|
||||
"x-aether-api-key": API_KEY,
|
||||
"x-account-id": VALID_ACCOUNT_ID_RAND
|
||||
}
|
||||
|
||||
|
||||
resp = requests.get(url, headers=headers)
|
||||
|
||||
|
||||
success = (resp.status_code == 200)
|
||||
print_result("Access Restored (Journal with Header)", success, f"- Status: {resp.status_code}")
|
||||
|
||||
|
||||
def test_site_domain_access_key():
|
||||
"""
|
||||
Verify site_domain lookup respects access_key.
|
||||
|
||||
The frontend reads the 'key' query param from the browser URL and forwards it
|
||||
as 'access_key' in the POST body. No key means a public domain is expected.
|
||||
|
||||
Valid (should return a result):
|
||||
https://dev-demo.oneskyit.com — public, no key needed
|
||||
http://idaa.localhost:5173/?key=restricted — correct key
|
||||
https://dev-idaa.oneskyit.com/?key=restricted-access — correct key
|
||||
https://sk-idaa.oneskyit.com/?key=8VTOJ0X5hvT6JdiTJsGEzQ — correct key
|
||||
|
||||
Invalid (should return empty):
|
||||
http://idaa.localhost:5173/ — key required, none given
|
||||
http://idaa.localhost:5173/?key=bad-key-example — wrong key
|
||||
https://dev-idaa.oneskyit.com/ — key required, none given
|
||||
https://dev-idaa.oneskyit.com/?key= — empty key treated as none
|
||||
https://dev-idaa.oneskyit.com/?key=any-wrong-key — wrong key
|
||||
https://sk-idaa.oneskyit.com/ — key required, none given
|
||||
https://sk-idaa.oneskyit.com/?key=another-bad-key-example — wrong key
|
||||
"""
|
||||
print("\n--- Test 5: Site Domain Access Key Behavior ---")
|
||||
url = f"{API_ROOT}/v3/crud/site_domain/search"
|
||||
headers = {"x-aether-api-key": API_KEY}
|
||||
|
||||
cases = [
|
||||
# (fqdn, key, should_pass, label)
|
||||
# --- valid ---
|
||||
("dev-demo.oneskyit.com", None, True, "public domain, no key"),
|
||||
("idaa.localhost:5173", "restricted", True, "correct key"),
|
||||
("dev-idaa.oneskyit.com", "restricted-access", True, "correct key"),
|
||||
("sk-idaa.oneskyit.com", "8VTOJ0X5hvT6JdiTJsGEzQ", True, "correct key"),
|
||||
# --- invalid ---
|
||||
("idaa.localhost:5173", None, False, "key required, none given"),
|
||||
("idaa.localhost:5173", "bad-key-example", False, "wrong key"),
|
||||
("dev-idaa.oneskyit.com", None, False, "key required, none given"),
|
||||
("dev-idaa.oneskyit.com", "", False, "empty key treated as none"),
|
||||
("dev-idaa.oneskyit.com", "any-wrong-key", False, "wrong key"),
|
||||
("sk-idaa.oneskyit.com", None, False, "key required, none given"),
|
||||
("sk-idaa.oneskyit.com", "another-bad-key-example", False, "wrong key"),
|
||||
]
|
||||
|
||||
for fqdn, key, should_pass, label in cases:
|
||||
payload = {"and": [{"field": "fqdn", "op": "eq", "value": fqdn}]}
|
||||
# Omit access_key entirely when None (no key in URL); send it when present (even if empty)
|
||||
if key is not None:
|
||||
payload["and"].append({"field": "access_key", "op": "eq", "value": key})
|
||||
|
||||
try:
|
||||
resp = requests.post(url, headers=headers, json=payload)
|
||||
data = resp.json().get('data', []) if resp.status_code == 200 else []
|
||||
success = (resp.status_code == 200 and ((len(data) > 0) == should_pass))
|
||||
tag = "VALID " if should_pass else "INVALID"
|
||||
print_result(f"[{tag}] {fqdn} key={key!r:30} ({label})", success, f"- Count: {len(data)}")
|
||||
except Exception as e:
|
||||
print_result(f"[{'VALID ' if should_pass else 'INVALID'}] {fqdn} key={key!r}", False, f"- Exception: {e}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
print(f"Starting V3 Security Hardening Verification")
|
||||
print(f"Target: {API_ROOT}")
|
||||
|
||||
|
||||
try:
|
||||
test_hardened_search_leak()
|
||||
test_strict_id_block()
|
||||
test_bootstrap_exception()
|
||||
test_restored_access()
|
||||
test_site_domain_access_key()
|
||||
except Exception as e:
|
||||
print(f"\n❌ ERROR during test execution: {e}")
|
||||
|
||||
|
||||
print("\nVerification completed.")
|
||||
|
||||
Reference in New Issue
Block a user