# File: OSIT-AE-API-FastAPI/tests/e2e/test_e2e_v3_cms_vision_parity.py
# Timestamp: 2026-02-05 19:28:03 -05:00
# 90 lines, 3.4 KiB, Python

import requests
import json
import sys
import os
# Ensure we can import from the project root.
# NOTE: this appends the current working directory, so it only points at the
# project root when the script/test run is started from there.
sys.path.append(os.getcwd())

# Base URL of the v3 CRUD API. Defaults to the dev deployment; set the
# AETHER_API_BASE_URL environment variable to target another environment.
API_BASE_URL = os.environ.get(
    "AETHER_API_BASE_URL", "https://dev-api.oneskyit.com/v3/crud"
)

# Standard Agent API Key.
# The key can be supplied through the AETHER_API_KEY environment variable so
# that it does not have to live in source control. The literal below is kept
# only as a backward-compatible default.
# NOTE(review): a credential committed to the repository should be treated as
# exposed -- consider rotating it and dropping the default.
HEADERS = {
    "x-aether-api-key": os.environ.get("AETHER_API_KEY", "PMM4n50teUCaOMMTN8qOJA"),
    "Content-Type": "application/json",
}
def test_post_vision_parity():
    """Check that the first post returned by the API exposes string IDs.

    Sends ``GET {API_BASE_URL}/post/?limit=1`` with the module-level
    ``HEADERS`` and looks at the first item under the ``data`` key of the JSON
    response. The check passes when both ``id`` and ``account_id`` are ``str``
    instances.

    The outcome is reported with ``print`` only (PASS / FAIL / WARN). Nothing
    is returned and no assertion is raised on a FAIL.
    NOTE(review): because of that, a FAIL does not fail the test when this
    function is collected by pytest -- confirm whether that is intended.

    Raises:
        requests.exceptions.RequestException: propagated from ``requests``
            when the request cannot be completed, including on timeout.
    """
    print("\n[Testing Post Vision Parity]")
    endpoint = f"{API_BASE_URL}/post/?limit=1"

    # requests applies no timeout by default, so an unresponsive server would
    # block the whole run forever. Bound the wait explicitly.
    timeout_seconds = 30
    response = requests.get(endpoint, headers=HEADERS, timeout=timeout_seconds)

    if response.status_code != 200:
        print(f"[❌ FAIL] Request failed with status {response.status_code}")
        return

    data = response.json().get('data', [])
    if not data:
        # An empty result set is not a parity failure, just nothing to check.
        print("[⚠️ WARN] No post data found to verify.")
        return

    post = data[0]
    # Verify clean string IDs
    id_val = post.get('id')
    account_id = post.get('account_id')
    if isinstance(id_val, str) and isinstance(account_id, str):
        print(f"[✅ PASS] Clean string IDs found: id={id_val}, account_id={account_id}")
    else:
        print(f"[❌ FAIL] Integer IDs found in response: id={type(id_val)}, account_id={type(account_id)}")
def test_post_comment_vision_parity():
    """Check that the first post comment returned by search exposes string IDs.

    Sends ``POST {API_BASE_URL}/post_comment/search`` with an empty ``and``
    filter list and the module-level ``HEADERS``, then looks at the first item
    under the ``data`` key of the JSON response. The check passes when ``id``,
    ``post_id`` and ``account_id`` are all ``str`` instances.

    The outcome is reported with ``print`` only (PASS / FAIL / WARN). Nothing
    is returned and no assertion is raised on a FAIL.
    NOTE(review): because of that, a FAIL does not fail the test when this
    function is collected by pytest -- confirm whether that is intended.

    Raises:
        requests.exceptions.RequestException: propagated from ``requests``
            when the request cannot be completed, including on timeout.
    """
    print("\n[Testing Post Comment Vision Parity]")
    # Search for comments
    endpoint = f"{API_BASE_URL}/post_comment/search"
    query = {"and": []}  # Generic search

    # requests applies no timeout by default, so an unresponsive server would
    # block the whole run forever. Bound the wait explicitly.
    timeout_seconds = 30
    response = requests.post(
        endpoint, headers=HEADERS, json=query, timeout=timeout_seconds
    )

    if response.status_code != 200:
        print(f"[❌ FAIL] Request failed with status {response.status_code}")
        return

    data = response.json().get('data', [])
    if not data:
        # An empty result set is not a parity failure, just nothing to check.
        print("[⚠️ WARN] No post_comment data found to verify.")
        return

    comment = data[0]
    # Verify clean string IDs and account context from view join
    id_val = comment.get('id')
    post_id = comment.get('post_id')
    account_id = comment.get('account_id')

    # Insertion order is kept, so the PASS line lists the fields in the same
    # order as before: id, post_id, account_id.
    id_fields = {"id": id_val, "post_id": post_id, "account_id": account_id}
    results = [name for name, value in id_fields.items() if isinstance(value, str)]

    if len(results) == len(id_fields):
        print(f"[✅ PASS] All Vision IDs are strings: {', '.join(results)}")
        print(f" Values: id={id_val}, post_id={post_id}, account_id={account_id}")
    else:
        # A missing key yields None from .get(), which is also not a str.
        print(f"[❌ FAIL] Missing or integer IDs found: {comment}")
def test_cms_search_external_id():
    """Check that post search accepts a filter on ``external_person_id``.

    Sends ``POST {API_BASE_URL}/post/search`` with a single ``is_not_null``
    condition on ``external_person_id`` and the module-level ``HEADERS``. Only
    the HTTP status is inspected: 200 is reported as PASS, anything else as
    FAIL together with the response body. The returned rows are not examined.

    The outcome is reported with ``print`` only. Nothing is returned and no
    assertion is raised on a FAIL.
    NOTE(review): because of that, a FAIL does not fail the test when this
    function is collected by pytest -- confirm whether that is intended.

    Raises:
        requests.exceptions.RequestException: propagated from ``requests``
            when the request cannot be completed, including on timeout.
    """
    print("\n[Testing CMS Search by external_person_id]")
    # We'll just verify the field is accepted by the search engine
    endpoint = f"{API_BASE_URL}/post/search"
    query = {
        "and": [
            {"field": "external_person_id", "op": "is_not_null"},
        ]
    }

    # requests applies no timeout by default, so an unresponsive server would
    # block the whole run forever. Bound the wait explicitly.
    timeout_seconds = 30
    response = requests.post(
        endpoint, headers=HEADERS, json=query, timeout=timeout_seconds
    )

    if response.status_code == 200:
        print("[✅ PASS] external_person_id is recognized as a searchable field for posts.")
    else:
        print(f"[❌ FAIL] Search failed with status {response.status_code}: {response.text}")
if __name__ == "__main__":
    # Script entry point: run every parity check in a fixed order, framed by a
    # start and an end banner. Each check prints its own PASS/FAIL/WARN line.
    print("--- Starting CMS Vision Parity E2E Tests ---")
    for run_check in (
        test_post_vision_parity,
        test_post_comment_vision_parity,
        test_cms_search_external_id,
    ):
        run_check()
    print("\n--- CMS Tests Complete ---")