Router Registry Cleanup: Archive unreferenced legacy routes

- Moved 29 unreferenced legacy router files to app/routers/archive/
- Restored and registered 'websockets' router in registry.py (exempted from deprecation)
- Cleaned up registry imports and verified application compilation.
This commit is contained in:
Scott Idem
2026-01-28 11:47:18 -05:00
parent cfbe6f458f
commit 3eaf176b05
30 changed files with 2 additions and 1 deletions

View File

@@ -0,0 +1,246 @@
from fastapi import APIRouter, Depends, HTTPException, Query
import os
import platform
import json
import shutil
from typing import Dict, Any, List, Optional
from datetime import datetime
from app.lib_general_v3 import AccountContext, get_account_context
from app.models.response_models import Resp_Body_Base, mk_resp
router = APIRouter()
def is_admin(account: AccountContext):
if account.auth_method == 'bypass':
return True
if getattr(account, "administrator", False) or getattr(account, "manager", False):
return True
return False
@router.get("/status", response_model=Resp_Body_Base, tags=["Agent Bridge"])
async def get_container_status(
account: AccountContext = Depends(get_account_context)
):
"""
Returns diagnostic information about the container environment.
Only accessible to administrators/managers via existing hierarchy or bypass.
"""
# Simple check for administrative access or bypass
if not is_admin(account):
raise HTTPException(status_code=403, detail="Administrative access required.")
boot_time = None
try:
import psutil
boot_time = datetime.fromtimestamp(psutil.boot_time()).isoformat()
except Exception as e:
boot_time = f"Error: {str(e)}"
status_data = {
"os": platform.system(),
"release": platform.release(),
"python_version": platform.python_version(),
"hostname": platform.node(),
"cpu_count": os.cpu_count(),
"environment_vars": {k: v for k, v in os.environ.items() if not any(s in k.upper() for s in ["PASSWORD", "KEY", "SECRET", "AUTH", "TOKEN"])},
"cwd": os.getcwd(),
"container": os.path.exists('/.dockerenv'),
"boot_time": boot_time
}
return mk_resp(data=status_data)
@router.get("/system/usage", response_model=Resp_Body_Base, tags=["Agent Bridge"])
async def get_system_usage(
account: AccountContext = Depends(get_account_context)
):
"""
Returns real-time CPU, Memory, and Disk usage.
Only accessible to administrators/managers via existing hierarchy or bypass.
"""
if not is_admin(account):
raise HTTPException(status_code=403, detail="Administrative access required.")
cpu_data = {"error": "psutil not available"}
mem_data = {"error": "psutil not available"}
try:
import psutil
# CPU usage per core
cpu_percent = psutil.cpu_percent(interval=0.1, percpu=True)
cpu_data = {
"percent_avg": sum(cpu_percent) / len(cpu_percent) if cpu_percent else 0,
"percent_per_core": cpu_percent,
"count": psutil.cpu_count(),
"load_avg": os.getloadavg() if hasattr(os, 'getloadavg') else None
}
# Memory usage
mem = psutil.virtual_memory()
mem_data = {
"total": mem.total,
"available": mem.available,
"percent": mem.percent,
"used": mem.used
}
except Exception as e:
cpu_data = {"error": str(e)}
mem_data = {"error": str(e)}
# Disk usage (root) - uses shutil which is standard lib
disk_data = {}
try:
disk = shutil.disk_usage("/")
disk_data = {
"total": disk.total,
"used": disk.used,
"free": disk.free,
"percent": (disk.used / disk.total) * 100 if disk.total else 0
}
except Exception as e:
disk_data = {"error": str(e)}
usage_data = {
"cpu": cpu_data,
"memory": mem_data,
"disk": disk_data
}
return mk_resp(data=usage_data)
@router.get("/logs", response_model=Resp_Body_Base, tags=["Agent Bridge"])
async def get_latest_logs(
lines: int = 50,
log_file: str = "aether_api.log",
account: AccountContext = Depends(get_account_context)
):
"""
Returns the last N lines of a specified log file.
Only accessible to administrators/managers via existing hierarchy or bypass.
"""
if not is_admin(account):
raise HTTPException(status_code=403, detail="Administrative access required.")
# Sanitize log_file to prevent directory traversal
log_file = os.path.basename(log_file)
from app.config import settings
log_dir = os.path.dirname(settings.LOG_PATH.get('app', '/logs/aether_api.log'))
log_path = os.path.join(log_dir, log_file)
if not os.path.exists(log_path):
return mk_resp(data=False, status_message=f"Log file not found at {log_path}", status_code=404)
try:
# Using tail if available for efficiency
import subprocess
result = subprocess.run(['tail', f'-n {lines}', log_path], capture_output=True, text=True)
if result.returncode == 0:
return mk_resp(data=result.stdout)
else:
raise Exception(result.stderr)
except Exception as e:
# Fallback to python read if tail fails
try:
with open(log_path, 'r') as f:
log_lines = f.readlines()
latest = log_lines[-lines:] if len(log_lines) > lines else log_lines
return mk_resp(data="".join(latest))
except Exception as inner_e:
return mk_resp(data=False, status_message=f"Error reading logs: {str(e)} | {str(inner_e)}", status_code=500)
@router.get("/logs/list", response_model=Resp_Body_Base, tags=["Agent Bridge"])
async def list_log_files(
account: AccountContext = Depends(get_account_context)
):
"""
Lists available log files in the log directory.
Only accessible to administrators/managers via existing hierarchy or bypass.
"""
if not is_admin(account):
raise HTTPException(status_code=403, detail="Administrative access required.")
from app.config import settings
log_dir = os.path.dirname(settings.LOG_PATH.get('app', '/logs/aether_api.log'))
if not os.path.exists(log_dir):
return mk_resp(data=[], status_message=f"Log directory not found at {log_dir}")
files = []
for f in os.listdir(log_dir):
path = os.path.join(log_dir, f)
if os.path.isfile(path):
stats = os.stat(path)
files.append({
"name": f,
"size": stats.st_size,
"modified": datetime.fromtimestamp(stats.st_mtime).isoformat()
})
return mk_resp(data=files)
@router.get("/processes", response_model=Resp_Body_Base, tags=["Agent Bridge"])
async def list_processes(
limit: int = 10,
sort_by: str = Query("cpu", enum=["cpu", "memory"]),
account: AccountContext = Depends(get_account_context)
):
"""
Lists top processes by CPU or Memory usage.
Only accessible to administrators/managers via existing hierarchy or bypass.
"""
if not is_admin(account):
raise HTTPException(status_code=403, detail="Administrative access required.")
procs = []
try:
import psutil
for proc in psutil.process_iter(['pid', 'name', 'username', 'cpu_percent', 'memory_info']):
try:
pinfo = proc.info
pinfo['memory_rss'] = pinfo['memory_info'].rss if pinfo.get('memory_info') else 0
procs.append(pinfo)
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
except Exception as e:
return mk_resp(data=[], status_message=f"Error listing processes: {str(e)}", status_code=500)
if sort_by == "cpu":
procs.sort(key=lambda x: x['cpu_percent'], reverse=True)
else:
procs.sort(key=lambda x: x['memory_rss'], reverse=True)
return mk_resp(data=procs[:limit])
@router.get("/container/metadata", response_model=Resp_Body_Base, tags=["Agent Bridge"])
async def get_container_metadata(
account: AccountContext = Depends(get_account_context)
):
"""
Attempts to gather Docker-specific metadata from the environment.
Only accessible to administrators/managers via existing hierarchy or bypass.
"""
if not is_admin(account):
raise HTTPException(status_code=403, detail="Administrative access required.")
metadata = {
"is_docker": os.path.exists('/.dockerenv'),
"cgroup": None,
"mounts": None
}
if os.path.exists('/proc/self/cgroup'):
try:
with open('/proc/self/cgroup', 'r') as f:
metadata['cgroup'] = f.read()
except:
pass
if os.path.exists('/proc/self/mounts'):
try:
with open('/proc/self/mounts', 'r') as f:
metadata['mounts'] = f.read()
except:
pass
return mk_resp(data=metadata)