feat(v3-data-store): harden search security and standardize test suite

This commit is contained in:
Scott Idem
2026-02-09 19:03:04 -05:00
parent 9715d28bd6
commit 68e883ba98
3 changed files with 126 additions and 72 deletions

View File

@@ -242,34 +242,75 @@ async def search_v3_data_store_obj_w_code(
):
"""
Advanced Search for Data Store (within a code hierarchy).
Enforces Account Context isolation and uses ID Vision (v_data_store).
"""
if delay.sleep_time_s > 0: await asyncio.sleep(delay.sleep_time_s)
# 1. Resolve IDs
# 1. Resolve parent IDs if provided
resolved_for_id = None
if for_type and for_id:
resolved_for_id = redis_lookup_id_random(record_id_random=for_id, table_name=for_type)
if not resolved_for_id:
return mk_resp(data=False, status_code=404, status_message="Parent for_id not found.")
# 2. Apply advanced search on top of the standard lookup
# We use sql_select with the SearchQuery
sql_result = sql_select(
table_name='data_store',
enabled=status_filter.enabled,
hidden=status_filter.hidden,
search_query=search_query,
field_name='code',
field_value=data_store_code,
limit=pagination.limit,
offset=pagination.offset,
as_list=True,
# 2. Construct the hierarchical search SQL
# We must enforce that users only see their own account records OR global defaults (account_id IS NULL)
from app.db_sql import sql_enable_part, sql_hidden_part, sql_search_qry_part, sql_limit_offset_part
sql_enabled, data_enabled = sql_enable_part('data_store', status_filter.enabled)
sql_hidden, data_hidden = sql_hidden_part('data_store', status_filter.hidden)
# Generate search logic from the SearchQuery model
search_sql, search_data = sql_search_qry_part(
search_query=search_query,
table_name='v_data_store'
)
sql_limit = sql_limit_offset_part(limit=pagination.limit, offset=pagination.offset)
# Prepare parameter dictionary
data = {
'code': data_store_code,
'account_id': account.account_id,
'for_type': for_type,
'for_id': resolved_for_id
}
data.update(search_data)
if data_enabled is not None: data['enable'] = data_enabled
if data_hidden is not None: data['hide'] = data_hidden
# Hierarchical Fallback Logic: (Object Override > Account Override > Global System Default)
# This matches the GET lookup logic but allows multiple results via search.
sql = f"""
SELECT *
FROM `v_data_store` AS `data_store`
WHERE
(
`data_store`.account_id = :account_id
OR `data_store`.account_id IS NULL
OR (`data_store`.for_type = :for_type AND `data_store`.for_id = :for_id)
)
AND `data_store`.code = :code
{sql_enabled}
{sql_hidden}
{search_sql}
ORDER BY `data_store`.for_id DESC, `data_store`.account_id DESC, `data_store`.created_on DESC
{sql_limit};
"""
sql_result = sql_select(sql=sql, data=data, as_list=True)
if sql_result is False:
return mk_resp(data=False, status_code=500, status_message="Database error during search.")
return mk_resp(data=sql_result)
# 3. Parse results through Pydantic model to enforce ID Vision (random IDs only)
try:
data_objs = [Data_Store_Base(**r) for r in sql_result]
except Exception as e:
log.error(f"Validation error during Data Store search: {e}")
return mk_resp(data=False, status_code=500, status_message="Data integrity error during result mapping.")
return mk_resp(data=data_objs)
async def handle_get_data_store_obj_w_code(

View File

@@ -44,6 +44,16 @@ These consolidated scripts are the primary verification tool for the V3 API.
2. **Archiving**: When a new specialized test is created, check if it can be combined into one of the "Primary" suites above. If so, combine and move the original to `archive/`.
3. **Cleanup**: Always use `tests/e2e/cleanup_test_files.py` after running file lifecycle tests.
## 🛠️ Standards & Patterns
To maintain a "nice" and readable test suite, follow these patterns in all new Python E2E scripts:
- **Helper Functions**: Use a `print_result(label, success, message="")` helper to output standardized `✅ PASS` and `❌ FAIL` status lines.
- **Functional Isolation**: Wrap test scenarios in descriptive functions (e.g., `test_hierarchical_fallback()`) rather than writing monolithic scripts.
- **Vision Compliance**: Always verify that IDs in the response are strings (Random IDs) and not integers, to ensure "ID Vision" is working.
- **Performance Logging**: Include a suite timer at the end of the `if __name__ == "__main__":` block to track execution speed.
- **Clean Environment**: Do not leave temporary debug or local-only scripts in the `e2e/` directory.
---
## 🚀 How to Run

View File

@@ -1,6 +1,5 @@
import requests
import json
import argparse
import time
# --- Configuration ---
@@ -9,61 +8,61 @@ AGENT_API_KEY = "PMM4n50teUCaOMMTN8qOJA"
# Default Contexts for Testing
CONTEXTS = {
"account_1": "_XY7DXtc9MY",
"account_1": "_XY7DXtc9MY", # Standard Test Account
"account_5": "xFP7AhU8Zlc",
"event_1358": "nmBfuGFeR0k"
"event_1": "nmBfuGFeR0k"
}
def run_lookup(code, description, account_id=None, for_type=None, for_id=None, limit=1, delay_ms=0):
"""
Performs a Data Store lookup and prints standardized results.
"""
print(f"[V3] {description} (Limit: {limit}, Delay: {delay_ms}ms)")
def get_headers(account_id=None):
headers = {
"X-Aether-API-Key": AGENT_API_KEY,
"Content-Type": "application/json"
}
if account_id:
headers["x-account-id"] = account_id
else:
headers["x-no-account-id"] = "bypass"
return headers
def print_result(label, success, message=""):
"""Standardized output helper."""
status = "✅ PASS" if success else "❌ FAIL"
print(f"[{status}] {label} {message}")
def test_standard_lookup():
"""Verifies the cascading lookup (Object > Account > Global)."""
print("\n--- Testing Standard Cascading Lookup ---")
code = "event_launcher_main_info"
url = f"{BASE_URL}/v3/data_store/code/{code}"
params = {"for_type": for_type, "for_id": for_id, "limit": limit, "delay_ms": delay_ms}
# Test Account Lookup
resp = requests.get(url, headers=get_headers(CONTEXTS["account_1"]))
print_result("Lookup: Account Context", resp.status_code == 200)
# Test Object Override Lookup
params = {"for_type": "event", "for_id": CONTEXTS["event_1"]}
resp = requests.get(url, headers=get_headers(CONTEXTS["account_1"]), params=params)
print_result("Lookup: Object Context Override", resp.status_code == 200)
def test_delay_simulation():
"""Verifies X-Delay-ms header and delay_ms query param."""
print("\n--- Testing Latency Simulation ---")
code = "event_launcher_main_info"
url = f"{BASE_URL}/v3/data_store/code/{code}"
delay_ms = 500
start_time = time.time()
response = requests.get(url, headers=headers, params=params)
resp = requests.get(url, headers=get_headers(CONTEXTS["account_1"]), params={"delay_ms": delay_ms})
duration = (time.time() - start_time) * 1000
print(f" Status: {response.status_code} ({duration:.0f}ms)")
if response.status_code == 200:
data = response.json().get('data')
if data:
if isinstance(data, list):
print(f" Result: SUCCESS (List of {len(data)})")
else:
print(f" Result: SUCCESS (Single Object)")
else:
print(f" Result: NULL (No record found)")
else:
print(f" Result: ERROR - {response.text[:200]}")
print("-" * 60)
# Allow for some network overhead but check if it's at least the delay
print_result(f"Delay: query param ({delay_ms}ms)", resp.status_code == 200 and duration >= delay_ms)
def run_search(code, description):
"""
Tests the new POST /search endpoint for Data Store codes.
"""
print(f"[V3-SEARCH] {description}")
def test_advanced_search():
"""Verifies POST /search with hierarchical logic and ID Vision."""
print("\n--- Testing Advanced POST Search ---")
code = "event_launcher_main_info"
url = f"{BASE_URL}/v3/data_store/code/{code}/search"
headers = {
"X-Aether-API-Key": AGENT_API_KEY,
"x-no-account-id": "bypass",
"Content-Type": "application/json"
}
search_query = {
"and_filters": [
@@ -71,27 +70,31 @@ def run_search(code, description):
]
}
response = requests.post(url, headers=headers, json=search_query)
print(f" Status: {response.status_code}")
resp = requests.post(url, headers=get_headers(), json=search_query)
success = resp.status_code == 200
if response.status_code == 200:
data = response.json().get('data', [])
print(f" Result: SUCCESS (Found {len(data)} results via POST Search)")
else:
print(f" Result: ERROR - {response.text[:200]}")
print("-" * 60)
vision_ok = True
if success:
data = resp.json().get('data', [])
if data:
# Check ID Vision (strings, not ints)
item = data[0]
if not isinstance(item.get('id'), str) or not isinstance(item.get('account_id'), (str, type(None))):
vision_ok = False
print(f" ❌ ID Vision Failure: id={item.get('id')}, account_id={item.get('account_id')}")
print_result("Search: POST /search with ID Vision", success and vision_ok)
if __name__ == "__main__":
CODE = "event_launcher_main_info"
print(f"=== Aether Data Store V3 Parity Tester ===\n")
# 1. Standard Lookup
run_lookup(CODE, "Scenario: Standard Lookup", account_id=CONTEXTS["account_1"])
# 2. Delay Simulation
run_lookup(CODE, "Scenario: Delay Simulation (500ms)", account_id=CONTEXTS["account_1"], delay_ms=500)
# 3. Advanced Search (POST)
run_search(CODE, "Scenario: Advanced Search via POST")
print("\nTests Complete.")
print(f"=== Aether Data Store V3 E2E Suite ===")
print(f"Target: {BASE_URL}")
start_time = time.time()
try:
test_standard_lookup()
test_delay_simulation()
test_advanced_search()
except Exception as e:
print(f"💥 Suite Error: {e}")
print(f"\nSuite completed in {time.time() - start_time:.2f}s")