import requests import io import json import os # --- Configuration --- API_BASE = "https://dev-api.oneskyit.com/v3/action" API_KEY = "PMM4n50teUCaOMMTN8qOJA" ACCOUNT_ID = "Q8lR8Ai8hx2FjbQ3C_EH1Q" # OSIT # Linking target for test files LINK_TYPE = "archive_content" LINK_ID = "bZOa7CtUm0E" def get_headers(): return { "X-Aether-API-Key": API_KEY, "x-account-id": ACCOUNT_ID, "x-no-account-id": "bypass" } def test_file_lifecycle(): print(f"--- Starting Hosted File Lifecycle Test ---") # 1. UPLOAD print("\n[Step 1] Uploading test file...") test_content = b"Lifecycle Test Content: " + os.urandom(8).hex().encode() files = [("file_list", ("lifecycle_test.txt", io.BytesIO(test_content), "text/plain"))] data = {"account_id": ACCOUNT_ID, "link_to_type": LINK_TYPE, "link_to_id": LINK_ID} up_resp = requests.post(f"{API_BASE}/hosted_file/upload", headers=get_headers(), files=files, data=data) if up_resp.status_code != 200: print(f" āŒ Upload Failed: {up_resp.text}") return False file_info = up_resp.json()['data'][0] file_id = file_info['id'] file_hash = file_info['hash_sha256'] subdir = file_hash[:2] expected_path = f"/srv/aether_api/srv/ae_hosted_files/{subdir}/{file_hash}.file" print(f" āœ… Uploaded: {file_id} (Hash: {file_hash[:10]}...)") print(f" šŸ” Expected physical path: {expected_path}") # 2. DOWNLOAD (Direct) print("\n[Step 2] Downloading by ID...") dl_resp = requests.get(f"{API_BASE}/hosted_file/{file_id}/download", headers=get_headers()) if dl_resp.status_code == 200 and dl_resp.content == test_content: print(f" āœ… Downloaded content matches original.") else: print(f" āŒ Download Failed or content mismatch. Status: {dl_resp.status_code}") print(f" Original: {test_content}") print(f" Received: {dl_resp.content}") return False # 3. DOWNLOAD (Hash + Query Auth) print("\n[Step 3] Downloading by Hash (Query Auth)...") hash_url = f"{API_BASE}/hosted_file/hash/{file_hash}/download?api_key={API_KEY}" h_dl_resp = requests.get(hash_url) if h_dl_resp.status_code == 200: print(f" āœ… Hash download successful.") else: print(f" āŒ Hash download failed: {h_dl_resp.status_code}") return False # 4. DELETE (Clean Cleanup) print("\n[Step 4] Deleting test file (rm_orphan=true)...") del_params = { "link_to_type": LINK_TYPE, "link_to_id": LINK_ID, "rm_orphan": "true", "method": "delete" } del_resp = requests.delete(f"{API_BASE}/hosted_file/{file_id}", headers=get_headers(), params=del_params) if del_resp.status_code == 200: print(f" āœ… Deletion request successful.") else: print(f" āŒ Deletion failed: {del_resp.text}") return False # 5. VERIFY DELETED print("\n[Step 5] Verifying record is gone...") check_resp = requests.get(f"https://dev-api.oneskyit.com/v3/crud/hosted_file/{file_id}", headers=get_headers()) if check_resp.status_code == 404: print(f" āœ… Success: Record purged from DB.") else: print(f" āŒ Failure: Record still exists after deletion.") return False return True if __name__ == "__main__": if test_file_lifecycle(): print("\nšŸŽ‰ HOSTED FILE LIFECYCLE VERIFIED!") else: print("\nāŒ LIFECYCLE TEST FAILED.") exit(1)