"""End-to-end smoke checks for CMS "Vision parity" on the dev CRUD API.

Each ``test_*`` function issues one HTTP request and prints a
``[PASS]`` / ``[FAIL]`` / ``[WARN]`` line describing what it saw.  The
functions report through ``print`` only: they return ``None`` and do not
raise on a failed check.
"""
import json
import os
import sys

import requests

# Ensure we can import from the project root
sys.path.append(os.getcwd())

API_BASE_URL = "https://dev-api.oneskyit.com/v3/crud"

# Standard Agent API Key
# NOTE(review): this credential is committed in source; consider loading it
# from the environment or a secret store instead.
HEADERS = {
    "x-aether-api-key": "PMM4n50teUCaOMMTN8qOJA",
    "Content-Type": "application/json",
}

# Seconds to wait for the server.  `requests` applies no timeout unless one is
# passed explicitly, so without this a stalled connection hangs the script.
REQUEST_TIMEOUT = 30


def _send(method, endpoint, **kwargs):
    """Issue one API request with the shared headers and timeout.

    Args:
        method: HTTP verb, e.g. ``"GET"`` or ``"POST"``.
        endpoint: Absolute URL to call.
        **kwargs: Extra keyword arguments forwarded to ``requests.request``
            (e.g. ``json=`` for a request body).

    Returns:
        The ``requests.Response``, or ``None`` when the request could not be
        completed (connection error, timeout, ...).  In that case a FAIL line
        has already been printed.
    """
    try:
        return requests.request(
            method,
            endpoint,
            headers=HEADERS,
            timeout=REQUEST_TIMEOUT,
            **kwargs,
        )
    except requests.RequestException as exc:
        # Report transport failures in the same style as the other checks
        # rather than aborting the remaining tests with a traceback.
        print(f"[❌ FAIL] Request could not be completed: {exc}")
        return None


def _rows(response):
    """Return the ``data`` entry of a JSON response body.

    Args:
        response: A ``requests.Response`` whose body is expected to be a JSON
            object.

    Returns:
        The value stored under ``"data"`` (``[]`` when the key is absent), or
        ``None`` when the body is not valid JSON or not a JSON object.  In the
        ``None`` case a FAIL line has already been printed.
    """
    try:
        payload = response.json()
    except ValueError:
        # requests' JSON decode errors derive from ValueError.
        print("[❌ FAIL] Response body is not valid JSON.")
        return None
    if not isinstance(payload, dict):
        print(f"[❌ FAIL] Unexpected JSON payload type: {type(payload).__name__}")
        return None
    return payload.get('data', [])


def test_post_vision_parity():
    """Fetch one post and check that ``id`` and ``account_id`` are strings."""
    print("\n[Testing Post Vision Parity]")
    endpoint = f"{API_BASE_URL}/post/?limit=1"

    response = _send("GET", endpoint)
    if response is None:
        return
    if response.status_code != 200:
        print(f"[❌ FAIL] Request failed with status {response.status_code}")
        return

    data = _rows(response)
    if data is None:
        return
    if not data:
        print("[⚠️ WARN] No post data found to verify.")
        return

    post = data[0]
    # Verify clean string IDs
    id_val = post.get('id')
    account_id = post.get('account_id')
    if isinstance(id_val, str) and isinstance(account_id, str):
        print(f"[✅ PASS] Clean string IDs found: id={id_val}, account_id={account_id}")
    else:
        print(f"[❌ FAIL] Integer IDs found in response: id={type(id_val)}, account_id={type(account_id)}")


def test_post_comment_vision_parity():
    """Search post comments and check that the three ID fields are strings.

    The first returned comment must carry string values for ``id``,
    ``post_id`` and ``account_id``.
    """
    print("\n[Testing Post Comment Vision Parity]")
    # Search for comments
    endpoint = f"{API_BASE_URL}/post_comment/search"
    query = {"and": []}  # Generic search

    response = _send("POST", endpoint, json=query)
    if response is None:
        return
    if response.status_code != 200:
        print(f"[❌ FAIL] Request failed with status {response.status_code}")
        return

    data = _rows(response)
    if data is None:
        return
    if not data:
        print("[⚠️ WARN] No post_comment data found to verify.")
        return

    comment = data[0]
    # Verify clean string IDs and account context from view join
    id_val = comment.get('id')
    post_id = comment.get('post_id')
    account_id = comment.get('account_id')

    candidates = (
        ("id", id_val),
        ("post_id", post_id),
        ("account_id", account_id),
    )
    results = [name for name, value in candidates if isinstance(value, str)]

    if len(results) == 3:
        print(f"[✅ PASS] All Vision IDs are strings: {', '.join(results)}")
        print(f" Values: id={id_val}, post_id={post_id}, account_id={account_id}")
    else:
        print(f"[❌ FAIL] Missing or integer IDs found: {comment}")


def test_cms_search_external_id():
    """Check that post search accepts a filter on ``external_person_id``."""
    print("\n[Testing CMS Search by external_person_id]")
    # We'll just verify the field is accepted by the search engine
    endpoint = f"{API_BASE_URL}/post/search"
    query = {
        "and": [
            {"field": "external_person_id", "op": "is_not_null"}
        ]
    }

    response = _send("POST", endpoint, json=query)
    if response is None:
        return
    if response.status_code == 200:
        print("[✅ PASS] external_person_id is recognized as a searchable field for posts.")
    else:
        print(f"[❌ FAIL] Search failed with status {response.status_code}: {response.text}")


if __name__ == "__main__":
    print("--- Starting CMS Vision Parity E2E Tests ---")
    test_post_vision_parity()
    test_post_comment_vision_parity()
    test_cms_search_external_id()
    print("\n--- CMS Tests Complete ---")