"""Admin-only diagnostic endpoints ("Agent Bridge") for the running container.

Every route requires an administrator/manager account (or the ``bypass``
auth method) and reports read-only information about the host: platform
details, resource usage, log files, processes and container metadata.
"""

import json
import os
import platform
import shutil
import subprocess
from collections import deque
from datetime import datetime
from typing import Any, Dict, List, Optional

from fastapi import APIRouter, Depends, HTTPException, Query

from app.lib_general_v3 import AccountContext, get_account_context
from app.models.response_models import Resp_Body_Base, mk_resp

router = APIRouter()

# Environment variables whose (upper-cased) name contains any of these
# markers are omitted from the /status response.
_SENSITIVE_ENV_MARKERS = ("PASSWORD", "KEY", "SECRET", "AUTH", "TOKEN")

# Used to locate the log directory when settings.LOG_PATH has no 'app' entry.
_DEFAULT_APP_LOG_PATH = "/logs/aether_api.log"


def is_admin(account: AccountContext) -> bool:
    """Return True when the account may use the diagnostic endpoints.

    Access is granted when the account authenticated via the 'bypass'
    method, or when it has a truthy ``administrator`` or ``manager``
    attribute (a missing attribute counts as False).
    """
    if account.auth_method == 'bypass':
        return True
    return bool(
        getattr(account, "administrator", False)
        or getattr(account, "manager", False)
    )


def _require_admin(account: AccountContext) -> None:
    """Raise HTTP 403 unless ``is_admin(account)`` holds."""
    if not is_admin(account):
        raise HTTPException(status_code=403, detail="Administrative access required.")


def _is_sensitive_env_name(name: str) -> bool:
    """Return True when an environment variable name looks secret-bearing."""
    upper_name = name.upper()
    return any(marker in upper_name for marker in _SENSITIVE_ENV_MARKERS)


def _get_log_dir() -> str:
    """Return the directory that holds the application log files."""
    # Kept as a function-scope import, as the original handlers did --
    # presumably to avoid an import cycle; confirm before hoisting.
    from app.config import settings

    log_dir = os.path.dirname(settings.LOG_PATH.get('app', _DEFAULT_APP_LOG_PATH))
    # dirname() of a bare file name is '', which os.path.exists()/isdir()
    # treat as missing; fall back to the current directory instead.
    return log_dir or os.curdir


def _read_text_or_none(path: str) -> Optional[str]:
    """Return the text content of ``path``, or None if it cannot be read."""
    try:
        with open(path, 'r') as handle:
            return handle.read()
    except (OSError, ValueError):
        # Best effort: a missing/unreadable file (OSError) or undecodable
        # content (UnicodeDecodeError is a ValueError) yields None.
        return None


@router.get("/status", response_model=Resp_Body_Base, tags=["Agent Bridge"])
async def get_container_status(
    account: AccountContext = Depends(get_account_context)
):
    """
    Returns diagnostic information about the container environment.
    Only accessible to administrators/managers via existing hierarchy or bypass.
    """
    _require_admin(account)

    # psutil is imported lazily so the endpoint still answers when it is
    # missing; in that case boot_time carries the error text instead.
    try:
        import psutil
        boot_time = datetime.fromtimestamp(psutil.boot_time()).isoformat()
    except Exception as e:
        boot_time = f"Error: {str(e)}"

    status_data: Dict[str, Any] = {
        "os": platform.system(),
        "release": platform.release(),
        "python_version": platform.python_version(),
        "hostname": platform.node(),
        "cpu_count": os.cpu_count(),
        "environment_vars": {
            name: value
            for name, value in os.environ.items()
            if not _is_sensitive_env_name(name)
        },
        "cwd": os.getcwd(),
        "container": os.path.exists('/.dockerenv'),
        "boot_time": boot_time,
    }
    return mk_resp(data=status_data)


@router.get("/system/usage", response_model=Resp_Body_Base, tags=["Agent Bridge"])
async def get_system_usage(
    account: AccountContext = Depends(get_account_context)
):
    """
    Returns real-time CPU, Memory, and Disk usage.
    Only accessible to administrators/managers via existing hierarchy or bypass.
    """
    _require_admin(account)

    cpu_data: Dict[str, Any] = {"error": "psutil not available"}
    mem_data: Dict[str, Any] = {"error": "psutil not available"}
    try:
        import psutil

        # CPU usage per core, sampled over a 0.1 s window.
        cpu_percent = psutil.cpu_percent(interval=0.1, percpu=True)
        cpu_data = {
            "percent_avg": sum(cpu_percent) / len(cpu_percent) if cpu_percent else 0,
            "percent_per_core": cpu_percent,
            "count": psutil.cpu_count(),
            # os.getloadavg is not available on every platform.
            "load_avg": os.getloadavg() if hasattr(os, 'getloadavg') else None,
        }

        # Memory usage.
        mem = psutil.virtual_memory()
        mem_data = {
            "total": mem.total,
            "available": mem.available,
            "percent": mem.percent,
            "used": mem.used,
        }
    except Exception as e:
        # Any failure (including a missing psutil) is reported for both
        # sections rather than failing the request.
        cpu_data = {"error": str(e)}
        mem_data = {"error": str(e)}

    # Disk usage of the root filesystem; shutil is standard library, so
    # this part works without psutil.
    try:
        disk = shutil.disk_usage("/")
        disk_data: Dict[str, Any] = {
            "total": disk.total,
            "used": disk.used,
            "free": disk.free,
            "percent": (disk.used / disk.total) * 100 if disk.total else 0,
        }
    except Exception as e:
        disk_data = {"error": str(e)}

    usage_data = {
        "cpu": cpu_data,
        "memory": mem_data,
        "disk": disk_data,
    }
    return mk_resp(data=usage_data)


@router.get("/logs", response_model=Resp_Body_Base, tags=["Agent Bridge"])
async def get_latest_logs(
    lines: int = 50,
    log_file: str = "aether_api.log",
    account: AccountContext = Depends(get_account_context)
):
    """
    Returns the last N lines of a specified log file.
    A negative line count is treated as zero.
    Only accessible to administrators/managers via existing hierarchy or bypass.
    """
    _require_admin(account)

    # Sanitize log_file to prevent directory traversal: only the final
    # path component is kept, and it is resolved inside the log directory.
    log_file = os.path.basename(log_file)
    log_path = os.path.join(_get_log_dir(), log_file)

    # isfile() rather than exists(): an empty or '..' basename resolves to
    # a directory, which must be reported as "not found", not read.
    if not os.path.isfile(log_path):
        return mk_resp(data=False, status_message=f"Log file not found at {log_path}", status_code=404)

    # A negative count has no sensible meaning for "last N lines".
    lines = max(lines, 0)

    try:
        # Using tail if available for efficiency. The count is passed as
        # its own argument, and '--' stops a path that begins with '-'
        # from being parsed as an option.
        result = subprocess.run(
            ['tail', '-n', str(lines), '--', log_path],
            capture_output=True,
            text=True,
            errors="replace",
        )
        if result.returncode != 0:
            raise RuntimeError(result.stderr)
        return mk_resp(data=result.stdout)
    except Exception as e:
        # Fallback to a pure-Python read if tail is missing or fails.
        try:
            with open(log_path, 'r', errors="replace") as f:
                # A bounded deque keeps only the last `lines` lines while
                # streaming the file, and yields nothing for lines == 0.
                latest = deque(f, maxlen=lines)
            return mk_resp(data="".join(latest))
        except Exception as inner_e:
            return mk_resp(data=False, status_message=f"Error reading logs: {str(e)} | {str(inner_e)}", status_code=500)


@router.get("/logs/list", response_model=Resp_Body_Base, tags=["Agent Bridge"])
async def list_log_files(
    account: AccountContext = Depends(get_account_context)
):
    """
    Lists available log files in the log directory.
    Only accessible to administrators/managers via existing hierarchy or bypass.
    """
    _require_admin(account)

    log_dir = _get_log_dir()
    if not os.path.isdir(log_dir):
        return mk_resp(data=[], status_message=f"Log directory not found at {log_dir}")

    files: List[Dict[str, Any]] = []
    try:
        with os.scandir(log_dir) as entries:
            for entry in entries:
                try:
                    if not entry.is_file():
                        continue
                    stats = entry.stat()
                except OSError:
                    # The entry vanished or became unreadable between the
                    # directory scan and the stat call; skip it.
                    continue
                files.append({
                    "name": entry.name,
                    "size": stats.st_size,
                    "modified": datetime.fromtimestamp(stats.st_mtime).isoformat(),
                })
    except OSError as e:
        return mk_resp(data=[], status_message=f"Error listing log files: {str(e)}", status_code=500)

    return mk_resp(data=files)


@router.get("/processes", response_model=Resp_Body_Base, tags=["Agent Bridge"])
async def list_processes(
    limit: int = 10,
    sort_by: str = Query("cpu", enum=["cpu", "memory"]),
    account: AccountContext = Depends(get_account_context)
):
    """
    Lists top processes by CPU or Memory usage.
    A negative limit is treated as zero.
    Only accessible to administrators/managers via existing hierarchy or bypass.
    """
    _require_admin(account)

    procs: List[Dict[str, Any]] = []
    try:
        import psutil

        for proc in psutil.process_iter(['pid', 'name', 'username', 'cpu_percent', 'memory_info']):
            try:
                pinfo = proc.info
                # memory_info is None when psutil could not read it.
                pinfo['memory_rss'] = pinfo['memory_info'].rss if pinfo.get('memory_info') else 0
                procs.append(pinfo)
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                pass
    except Exception as e:
        return mk_resp(data=[], status_message=f"Error listing processes: {str(e)}", status_code=500)

    if sort_by == "cpu":
        # cpu_percent is None for processes psutil was denied access to;
        # treat it as 0 so None is never compared against a float.
        procs.sort(key=lambda p: p.get('cpu_percent') or 0.0, reverse=True)
    else:
        procs.sort(key=lambda p: p['memory_rss'], reverse=True)

    # Clamp so a negative limit does not slice from the end of the list.
    return mk_resp(data=procs[:max(limit, 0)])


@router.get("/container/metadata", response_model=Resp_Body_Base, tags=["Agent Bridge"])
async def get_container_metadata(
    account: AccountContext = Depends(get_account_context)
):
    """
    Attempts to gather Docker-specific metadata from the environment.
    Only accessible to administrators/managers via existing hierarchy or bypass.
    """
    _require_admin(account)

    # Each /proc file is optional: the value stays None when the file is
    # absent or cannot be read.
    metadata: Dict[str, Any] = {
        "is_docker": os.path.exists('/.dockerenv'),
        "cgroup": _read_text_or_none('/proc/self/cgroup'),
        "mounts": _read_text_or_none('/proc/self/mounts'),
    }
    return mk_resp(data=metadata)