246 lines
8.5 KiB
Python
246 lines
8.5 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, Query
|
|
import os
|
|
import platform
|
|
import json
|
|
import shutil
|
|
from typing import Dict, Any, List, Optional
|
|
from datetime import datetime
|
|
|
|
from app.lib_general_v3 import AccountContext, get_account_context
|
|
from app.models.response_models import Resp_Body_Base, mk_resp
|
|
|
|
router = APIRouter()
|
|
|
|
def is_admin(account: AccountContext):
    """
    Decide whether the caller may use the administrative endpoints.

    Access is granted when the account authenticated with the 'bypass'
    method, or when its `administrator` or `manager` attribute is truthy.
    A missing `administrator` / `manager` attribute counts as False.
    """
    # Bypass authentication short-circuits every role check.
    if account.auth_method == 'bypass':
        return True

    has_elevated_role = getattr(account, "administrator", False) or getattr(account, "manager", False)
    return bool(has_elevated_role)
|
|
|
|
@router.get("/status", response_model=Resp_Body_Base, tags=["Agent Bridge"])
async def get_container_status(
    account: AccountContext = Depends(get_account_context)
):
    """
    Report diagnostic details about the environment this service runs in.

    The payload contains OS / Python identification, hostname, CPU count,
    the working directory, a Docker marker check, the boot time and the
    process environment with sensitive-looking variables removed.
    Callers that do not pass `is_admin` receive HTTP 403.
    """
    if not is_admin(account):
        raise HTTPException(status_code=403, detail="Administrative access required.")

    # psutil is imported lazily; an import or lookup failure is reported in
    # the boot_time field rather than failing the whole request.
    started_at = None
    try:
        import psutil
        started_at = datetime.fromtimestamp(psutil.boot_time()).isoformat()
    except Exception as exc:
        started_at = f"Error: {str(exc)}"

    # Drop every variable whose upper-cased name contains one of these markers.
    sensitive_markers = ["PASSWORD", "KEY", "SECRET", "AUTH", "TOKEN"]
    visible_env = {}
    for var_name, var_value in os.environ.items():
        upper_name = var_name.upper()
        if any(marker in upper_name for marker in sensitive_markers):
            continue
        visible_env[var_name] = var_value

    report = {
        "os": platform.system(),
        "release": platform.release(),
        "python_version": platform.python_version(),
        "hostname": platform.node(),
        "cpu_count": os.cpu_count(),
        "environment_vars": visible_env,
        "cwd": os.getcwd(),
        "container": os.path.exists('/.dockerenv'),
        "boot_time": started_at,
    }
    return mk_resp(data=report)
|
|
|
|
@router.get("/system/usage", response_model=Resp_Body_Base, tags=["Agent Bridge"])
async def get_system_usage(
    account: AccountContext = Depends(get_account_context)
):
    """
    Report current CPU, memory and root-filesystem usage.

    CPU and memory figures come from psutil; if psutil cannot be imported or
    any of its calls fail, both sections carry an "error" entry instead.
    Disk figures come from `shutil.disk_usage("/")` and fail independently.
    Callers that do not pass `is_admin` receive HTTP 403.
    """
    if not is_admin(account):
        raise HTTPException(status_code=403, detail="Administrative access required.")

    cpu_data = {"error": "psutil not available"}
    mem_data = {"error": "psutil not available"}

    try:
        import psutil

        # Per-core sample over a 0.1 s window.
        per_core = psutil.cpu_percent(interval=0.1, percpu=True)
        average = sum(per_core) / len(per_core) if per_core else 0
        core_count = psutil.cpu_count()
        # os.getloadavg does not exist on every platform.
        load = os.getloadavg() if hasattr(os, 'getloadavg') else None
        cpu_data = {
            "percent_avg": average,
            "percent_per_core": per_core,
            "count": core_count,
            "load_avg": load,
        }

        vm = psutil.virtual_memory()
        mem_data = {field: getattr(vm, field) for field in ("total", "available", "percent", "used")}
    except Exception as exc:
        # A failure anywhere above marks both sections as errored.
        cpu_data = {"error": str(exc)}
        mem_data = {"error": str(exc)}

    # Root filesystem usage via the standard library only.
    disk_data = {}
    try:
        root_fs = shutil.disk_usage("/")
        used_pct = (root_fs.used / root_fs.total) * 100 if root_fs.total else 0
        disk_data = {
            "total": root_fs.total,
            "used": root_fs.used,
            "free": root_fs.free,
            "percent": used_pct,
        }
    except Exception as exc:
        disk_data = {"error": str(exc)}

    return mk_resp(data={"cpu": cpu_data, "memory": mem_data, "disk": disk_data})
|
|
|
|
@router.get("/logs", response_model=Resp_Body_Base, tags=["Agent Bridge"])
async def get_latest_logs(
    lines: int = 50,
    log_file: str = "aether_api.log",
    account: AccountContext = Depends(get_account_context)
):
    """
    Return the last `lines` lines of a log file from the configured log directory.

    Parameters:
        lines: number of trailing lines to return; values below 1 yield an
            empty string.
        log_file: file name inside the log directory; any directory part is
            stripped to prevent path traversal.
        account: resolved caller context; must pass `is_admin`.

    Returns a response whose data is the log text, or `False` with a 404
    status when the file does not exist and a 500 status when it cannot be
    read.

    Raises:
        HTTPException: 403 when the caller is not an administrator.
    """
    if not is_admin(account):
        raise HTTPException(status_code=403, detail="Administrative access required.")

    import subprocess
    from collections import deque
    from app.config import settings

    # Sanitize log_file to prevent directory traversal
    log_file = os.path.basename(log_file)
    log_dir = os.path.dirname(settings.LOG_PATH.get('app', '/logs/aether_api.log'))
    log_path = os.path.join(log_dir, log_file)

    # isfile rather than exists: an empty basename, "." or ".." resolves to a
    # directory, which must be reported as "not found" instead of a read error.
    if not os.path.isfile(log_path):
        return mk_resp(data=False, status_message=f"Log file not found at {log_path}", status_code=404)

    # A non-positive count means "no lines"; without this guard the Python
    # fallback would slice [-0:] and return the whole file.
    lines = max(lines, 0)
    if lines == 0:
        return mk_resp(data="")

    # Using tail if available for efficiency.
    # NOTE(review): subprocess.run blocks the event loop while tail runs.
    try:
        result = subprocess.run(
            # Option and value are separate argv items; "--" ends option parsing.
            ['tail', '-n', str(lines), '--', log_path],
            capture_output=True,
            text=True,
            errors='replace',
        )
        if result.returncode == 0:
            return mk_resp(data=result.stdout)
        tail_error = result.stderr
    except Exception as e:
        tail_error = str(e)

    # Fallback to python read if tail fails. A bounded deque keeps only the
    # last `lines` lines in memory instead of loading the whole file.
    try:
        with open(log_path, 'r', errors='replace') as f:
            latest = deque(f, maxlen=lines)
        return mk_resp(data="".join(latest))
    except Exception as inner_e:
        return mk_resp(data=False, status_message=f"Error reading logs: {tail_error} | {str(inner_e)}", status_code=500)
|
|
|
|
@router.get("/logs/list", response_model=Resp_Body_Base, tags=["Agent Bridge"])
async def list_log_files(
    account: AccountContext = Depends(get_account_context)
):
    """
    List the regular files in the configured log directory.

    Each entry carries the file `name`, its `size` in bytes and its
    `modified` time as an ISO-8601 string. Entries are sorted by name.
    When the log directory does not exist the data is an empty list with an
    explanatory status message.

    Raises:
        HTTPException: 403 when the caller is not an administrator.
    """
    if not is_admin(account):
        raise HTTPException(status_code=403, detail="Administrative access required.")

    from app.config import settings
    log_dir = os.path.dirname(settings.LOG_PATH.get('app', '/logs/aether_api.log'))

    # isdir rather than exists: a regular file at this path cannot be listed.
    if not os.path.isdir(log_dir):
        return mk_resp(data=[], status_message=f"Log directory not found at {log_dir}")

    files = []
    try:
        with os.scandir(log_dir) as entries:
            for entry in entries:
                try:
                    if not entry.is_file():
                        continue
                    stats = entry.stat()
                except OSError:
                    # The file was rotated or removed between listing and
                    # stat; skip it rather than failing the whole request.
                    continue
                files.append({
                    "name": entry.name,
                    "size": stats.st_size,
                    "modified": datetime.fromtimestamp(stats.st_mtime).isoformat()
                })
    except OSError as e:
        return mk_resp(data=[], status_message=f"Error listing log directory: {str(e)}", status_code=500)

    # Directory iteration order is arbitrary; sort for a stable response.
    files.sort(key=lambda item: item["name"])
    return mk_resp(data=files)
|
|
|
|
@router.get("/processes", response_model=Resp_Body_Base, tags=["Agent Bridge"])
async def list_processes(
    limit: int = 10,
    sort_by: str = Query("cpu", enum=["cpu", "memory"]),
    account: AccountContext = Depends(get_account_context)
):
    """
    List the top processes ordered by CPU or memory usage.

    Parameters:
        limit: maximum number of processes to return; negative values are
            treated as 0.
        sort_by: "cpu" orders by `cpu_percent`; any other value orders by
            resident memory (`memory_rss`).
        account: resolved caller context; must pass `is_admin`.

    Returns a response whose data is a list of per-process dicts (pid, name,
    username, cpu_percent, memory_info, memory_rss), or an empty list with a
    500 status when psutil is unavailable or process enumeration fails.

    Raises:
        HTTPException: 403 when the caller is not an administrator.
    """
    if not is_admin(account):
        raise HTTPException(status_code=403, detail="Administrative access required.")

    # A negative limit would otherwise slice from the end (procs[:-1]).
    limit = max(limit, 0)

    procs = []
    try:
        import psutil
        for proc in psutil.process_iter(['pid', 'name', 'username', 'cpu_percent', 'memory_info']):
            try:
                pinfo = proc.info
                mem_info = pinfo.get('memory_info')
                pinfo['memory_rss'] = mem_info.rss if mem_info else 0
                procs.append(pinfo)
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                continue
    except Exception as e:
        return mk_resp(data=[], status_message=f"Error listing processes: {str(e)}", status_code=500)

    # psutil fills attributes it may not read with None; treat those as 0 so
    # the sort never compares None against a number.
    sort_field = 'cpu_percent' if sort_by == "cpu" else 'memory_rss'
    procs.sort(key=lambda info: info.get(sort_field) or 0, reverse=True)

    return mk_resp(data=procs[:limit])
|
|
|
|
def _read_text_or_none(path: str) -> Optional[str]:
    """Return the text content of `path`, or None when it is missing or unreadable."""
    try:
        # errors='replace' keeps undecodable bytes from discarding the content.
        with open(path, 'r', errors='replace') as f:
            return f.read()
    except OSError:
        return None


@router.get("/container/metadata", response_model=Resp_Body_Base, tags=["Agent Bridge"])
async def get_container_metadata(
    account: AccountContext = Depends(get_account_context)
):
    """
    Gather Docker-related metadata from the local environment.

    The payload reports whether `/.dockerenv` exists (`is_docker`) and the
    raw contents of `/proc/self/cgroup` and `/proc/self/mounts`; each of the
    latter is None when the file is missing or cannot be read.

    Raises:
        HTTPException: 403 when the caller is not an administrator.
    """
    if not is_admin(account):
        raise HTTPException(status_code=403, detail="Administrative access required.")

    metadata = {
        "is_docker": os.path.exists('/.dockerenv'),
        "cgroup": _read_text_or_none('/proc/self/cgroup'),
        "mounts": _read_text_or_none('/proc/self/mounts')
    }

    return mk_resp(data=metadata)