Files
OSIT-AE-API-FastAPI/app/routers/agent_bridge.py

246 lines
8.5 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, Query
import os
import platform
import json
import shutil
from typing import Dict, Any, List, Optional
from datetime import datetime
from app.lib_general_v3 import AccountContext, get_account_context
from app.models.response_models import Resp_Body_Base, mk_resp
router = APIRouter()
def is_admin(account: AccountContext):
if account.auth_method == 'bypass':
return True
if getattr(account, "administrator", False) or getattr(account, "manager", False):
return True
return False
@router.get("/status", response_model=Resp_Body_Base, tags=["Agent Bridge"])
async def get_container_status(
account: AccountContext = Depends(get_account_context)
):
"""
Returns diagnostic information about the container environment.
Only accessible to administrators/managers via existing hierarchy or bypass.
"""
# Simple check for administrative access or bypass
if not is_admin(account):
raise HTTPException(status_code=403, detail="Administrative access required.")
boot_time = None
try:
import psutil
boot_time = datetime.fromtimestamp(psutil.boot_time()).isoformat()
except Exception as e:
boot_time = f"Error: {str(e)}"
status_data = {
"os": platform.system(),
"release": platform.release(),
"python_version": platform.python_version(),
"hostname": platform.node(),
"cpu_count": os.cpu_count(),
"environment_vars": {k: v for k, v in os.environ.items() if not any(s in k.upper() for s in ["PASSWORD", "KEY", "SECRET", "AUTH", "TOKEN"])},
"cwd": os.getcwd(),
"container": os.path.exists('/.dockerenv'),
"boot_time": boot_time
}
return mk_resp(data=status_data)
@router.get("/system/usage", response_model=Resp_Body_Base, tags=["Agent Bridge"])
async def get_system_usage(
account: AccountContext = Depends(get_account_context)
):
"""
Returns real-time CPU, Memory, and Disk usage.
Only accessible to administrators/managers via existing hierarchy or bypass.
"""
if not is_admin(account):
raise HTTPException(status_code=403, detail="Administrative access required.")
cpu_data = {"error": "psutil not available"}
mem_data = {"error": "psutil not available"}
try:
import psutil
# CPU usage per core
cpu_percent = psutil.cpu_percent(interval=0.1, percpu=True)
cpu_data = {
"percent_avg": sum(cpu_percent) / len(cpu_percent) if cpu_percent else 0,
"percent_per_core": cpu_percent,
"count": psutil.cpu_count(),
"load_avg": os.getloadavg() if hasattr(os, 'getloadavg') else None
}
# Memory usage
mem = psutil.virtual_memory()
mem_data = {
"total": mem.total,
"available": mem.available,
"percent": mem.percent,
"used": mem.used
}
except Exception as e:
cpu_data = {"error": str(e)}
mem_data = {"error": str(e)}
# Disk usage (root) - uses shutil which is standard lib
disk_data = {}
try:
disk = shutil.disk_usage("/")
disk_data = {
"total": disk.total,
"used": disk.used,
"free": disk.free,
"percent": (disk.used / disk.total) * 100 if disk.total else 0
}
except Exception as e:
disk_data = {"error": str(e)}
usage_data = {
"cpu": cpu_data,
"memory": mem_data,
"disk": disk_data
}
return mk_resp(data=usage_data)
@router.get("/logs", response_model=Resp_Body_Base, tags=["Agent Bridge"])
async def get_latest_logs(
lines: int = 50,
log_file: str = "aether_api.log",
account: AccountContext = Depends(get_account_context)
):
"""
Returns the last N lines of a specified log file.
Only accessible to administrators/managers via existing hierarchy or bypass.
"""
if not is_admin(account):
raise HTTPException(status_code=403, detail="Administrative access required.")
# Sanitize log_file to prevent directory traversal
log_file = os.path.basename(log_file)
from app.config import settings
log_dir = os.path.dirname(settings.LOG_PATH.get('app', '/logs/aether_api.log'))
log_path = os.path.join(log_dir, log_file)
if not os.path.exists(log_path):
return mk_resp(data=False, status_message=f"Log file not found at {log_path}", status_code=404)
try:
# Using tail if available for efficiency
import subprocess
result = subprocess.run(['tail', f'-n {lines}', log_path], capture_output=True, text=True)
if result.returncode == 0:
return mk_resp(data=result.stdout)
else:
raise Exception(result.stderr)
except Exception as e:
# Fallback to python read if tail fails
try:
with open(log_path, 'r') as f:
log_lines = f.readlines()
latest = log_lines[-lines:] if len(log_lines) > lines else log_lines
return mk_resp(data="".join(latest))
except Exception as inner_e:
return mk_resp(data=False, status_message=f"Error reading logs: {str(e)} | {str(inner_e)}", status_code=500)
@router.get("/logs/list", response_model=Resp_Body_Base, tags=["Agent Bridge"])
async def list_log_files(
account: AccountContext = Depends(get_account_context)
):
"""
Lists available log files in the log directory.
Only accessible to administrators/managers via existing hierarchy or bypass.
"""
if not is_admin(account):
raise HTTPException(status_code=403, detail="Administrative access required.")
from app.config import settings
log_dir = os.path.dirname(settings.LOG_PATH.get('app', '/logs/aether_api.log'))
if not os.path.exists(log_dir):
return mk_resp(data=[], status_message=f"Log directory not found at {log_dir}")
files = []
for f in os.listdir(log_dir):
path = os.path.join(log_dir, f)
if os.path.isfile(path):
stats = os.stat(path)
files.append({
"name": f,
"size": stats.st_size,
"modified": datetime.fromtimestamp(stats.st_mtime).isoformat()
})
return mk_resp(data=files)
@router.get("/processes", response_model=Resp_Body_Base, tags=["Agent Bridge"])
async def list_processes(
limit: int = 10,
sort_by: str = Query("cpu", enum=["cpu", "memory"]),
account: AccountContext = Depends(get_account_context)
):
"""
Lists top processes by CPU or Memory usage.
Only accessible to administrators/managers via existing hierarchy or bypass.
"""
if not is_admin(account):
raise HTTPException(status_code=403, detail="Administrative access required.")
procs = []
try:
import psutil
for proc in psutil.process_iter(['pid', 'name', 'username', 'cpu_percent', 'memory_info']):
try:
pinfo = proc.info
pinfo['memory_rss'] = pinfo['memory_info'].rss if pinfo.get('memory_info') else 0
procs.append(pinfo)
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
except Exception as e:
return mk_resp(data=[], status_message=f"Error listing processes: {str(e)}", status_code=500)
if sort_by == "cpu":
procs.sort(key=lambda x: x['cpu_percent'], reverse=True)
else:
procs.sort(key=lambda x: x['memory_rss'], reverse=True)
return mk_resp(data=procs[:limit])
@router.get("/container/metadata", response_model=Resp_Body_Base, tags=["Agent Bridge"])
async def get_container_metadata(
account: AccountContext = Depends(get_account_context)
):
"""
Attempts to gather Docker-specific metadata from the environment.
Only accessible to administrators/managers via existing hierarchy or bypass.
"""
if not is_admin(account):
raise HTTPException(status_code=403, detail="Administrative access required.")
metadata = {
"is_docker": os.path.exists('/.dockerenv'),
"cgroup": None,
"mounts": None
}
if os.path.exists('/proc/self/cgroup'):
try:
with open('/proc/self/cgroup', 'r') as f:
metadata['cgroup'] = f.read()
except:
pass
if os.path.exists('/proc/self/mounts'):
try:
with open('/proc/self/mounts', 'r') as f:
metadata['mounts'] = f.read()
except:
pass
return mk_resp(data=metadata)