Add agent_bridge.py administrative endpoints and mcp_docker_explorer.py script

- Implemented /status, /system/usage, /logs, /logs/list, /processes, and /container/metadata in agent_bridge.py.
- Added mcp_docker_explorer.py for Docker MCP integration testing.
- Enhanced administrative access checks in agent_bridge.py.
This commit is contained in:
Scott Idem
2026-01-07 12:01:48 -05:00
parent 75b771f87c
commit c47ae47a2f
2 changed files with 240 additions and 20 deletions

View File

@@ -1,14 +1,24 @@
from fastapi import APIRouter, Depends, HTTPException
from fastapi import APIRouter, Depends, HTTPException, Query
import os
import platform
import json
from typing import Dict, Any
import psutil
import shutil
from typing import Dict, Any, List, Optional
from datetime import datetime
from app.lib_general_v3 import AccountContext, get_account_context
from app.models.response_models import Resp_Body_Base, mk_resp
router = APIRouter()
def is_admin(account: AccountContext):
if account.auth_method == 'bypass':
return True
if getattr(account, "administrator", False) or getattr(account, "manager", False):
return True
return False
@router.get("/status", response_model=Resp_Body_Base, tags=["Agent Bridge"])
async def get_container_status(
account: AccountContext = Depends(get_account_context)
@@ -18,7 +28,7 @@ async def get_container_status(
Only accessible to administrators/managers via existing hierarchy or bypass.
"""
# Simple check for administrative access or bypass
if account.auth_method != 'bypass' and not getattr(account, "administrator", False) and not getattr(account, "manager", False):
if not is_admin(account):
raise HTTPException(status_code=403, detail="Administrative access required.")
status_data = {
@@ -27,35 +37,185 @@ async def get_container_status(
"python_version": platform.python_version(),
"hostname": platform.node(),
"cpu_count": os.cpu_count(),
"environment_vars": {k: v for k, v in os.environ.items() if "PASSWORD" not in k.upper() and "KEY" not in k.upper() and "SECRET" not in k.upper()},
"environment_vars": {k: v for k, v in os.environ.items() if not any(s in k.upper() for s in ["PASSWORD", "KEY", "SECRET", "AUTH", "TOKEN"])},
"cwd": os.getcwd(),
"container": os.path.exists('/.dockerenv')
"container": os.path.exists('/.dockerenv'),
"boot_time": datetime.fromtimestamp(psutil.boot_time()).isoformat()
}
return mk_resp(data=status_data)
@router.get("/logs", response_model=Resp_Body_Base, tags=["Agent Bridge"])
async def get_latest_logs(
lines: int = 50,
@router.get("/system/usage", response_model=Resp_Body_Base, tags=["Agent Bridge"])
async def get_system_usage(
account: AccountContext = Depends(get_account_context)
):
"""
Returns the last N lines of the application log.
Returns real-time CPU, Memory, and Disk usage.
Only accessible to administrators/managers via existing hierarchy or bypass.
"""
if account.auth_method != 'bypass' and not getattr(account, "administrator", False) and not getattr(account, "manager", False):
if not is_admin(account):
raise HTTPException(status_code=403, detail="Administrative access required.")
# CPU usage per core
cpu_percent = psutil.cpu_percent(interval=0.1, percpu=True)
# Memory usage
mem = psutil.virtual_memory()
# Disk usage (root)
disk = shutil.disk_usage("/")
usage_data = {
"cpu": {
"percent_avg": sum(cpu_percent) / len(cpu_percent) if cpu_percent else 0,
"percent_per_core": cpu_percent,
"count": psutil.cpu_count(),
"load_avg": os.getloadavg() if hasattr(os, 'getloadavg') else None
},
"memory": {
"total": mem.total,
"available": mem.available,
"percent": mem.percent,
"used": mem.used
},
"disk": {
"total": disk.total,
"used": disk.used,
"free": disk.free,
"percent": (disk.used / disk.total) * 100 if disk.total else 0
}
}
return mk_resp(data=usage_data)
@router.get("/logs", response_model=Resp_Body_Base, tags=["Agent Bridge"])
async def get_latest_logs(
lines: int = 50,
log_file: str = "aether_api.log",
account: AccountContext = Depends(get_account_context)
):
"""
Returns the last N lines of a specified log file.
Only accessible to administrators/managers via existing hierarchy or bypass.
"""
if not is_admin(account):
raise HTTPException(status_code=403, detail="Administrative access required.")
# Sanitize log_file to prevent directory traversal
log_file = os.path.basename(log_file)
from app.config import settings
log_dir = os.path.dirname(settings.LOG_PATH.get('app', '/logs/aether_api.log'))
log_path = os.path.join(log_dir, log_file)
if not os.path.exists(log_path):
return mk_resp(data=False, status_message=f"Log file not found at {log_path}", status_code=404)
try:
# Using tail if available for efficiency
import subprocess
result = subprocess.run(['tail', f'-n {lines}', log_path], capture_output=True, text=True)
if result.returncode == 0:
return mk_resp(data=result.stdout)
else:
raise Exception(result.stderr)
except Exception as e:
# Fallback to python read if tail fails
try:
with open(log_path, 'r') as f:
log_lines = f.readlines()
latest = log_lines[-lines:] if len(log_lines) > lines else log_lines
return mk_resp(data="".join(latest))
except Exception as inner_e:
return mk_resp(data=False, status_message=f"Error reading logs: {str(e)} | {str(inner_e)}", status_code=500)
@router.get("/logs/list", response_model=Resp_Body_Base, tags=["Agent Bridge"])
async def list_log_files(
account: AccountContext = Depends(get_account_context)
):
"""
Lists available log files in the log directory.
Only accessible to administrators/managers via existing hierarchy or bypass.
"""
if not is_admin(account):
raise HTTPException(status_code=403, detail="Administrative access required.")
from app.config import settings
log_path = settings.LOG_PATH.get('app', '/logs/aether_api.log')
log_dir = os.path.dirname(settings.LOG_PATH.get('app', '/logs/aether_api.log'))
if not os.path.exists(log_path):
return mk_resp(data=False, status_message=f"Log file not found at {log_path}")
if not os.path.exists(log_dir):
return mk_resp(data=[], status_message=f"Log directory not found at {log_dir}")
try:
with open(log_path, 'r') as f:
log_lines = f.readlines()
latest = log_lines[-lines:] if len(log_lines) > lines else log_lines
return mk_resp(data="".join(latest))
except Exception as e:
return mk_resp(data=False, status_message=str(e))
files = []
for f in os.listdir(log_dir):
path = os.path.join(log_dir, f)
if os.path.isfile(path):
stats = os.stat(path)
files.append({
"name": f,
"size": stats.st_size,
"modified": datetime.fromtimestamp(stats.st_mtime).isoformat()
})
return mk_resp(data=files)
@router.get("/processes", response_model=Resp_Body_Base, tags=["Agent Bridge"])
async def list_processes(
limit: int = 10,
sort_by: str = Query("cpu", enum=["cpu", "memory"]),
account: AccountContext = Depends(get_account_context)
):
"""
Lists top processes by CPU or Memory usage.
Only accessible to administrators/managers via existing hierarchy or bypass.
"""
if not is_admin(account):
raise HTTPException(status_code=403, detail="Administrative access required.")
procs = []
for proc in psutil.process_iter(['pid', 'name', 'username', 'cpu_percent', 'memory_info']):
try:
pinfo = proc.info
pinfo['memory_rss'] = pinfo['memory_info'].rss if pinfo.get('memory_info') else 0
procs.append(pinfo)
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
if sort_by == "cpu":
procs.sort(key=lambda x: x['cpu_percent'], reverse=True)
else:
procs.sort(key=lambda x: x['memory_rss'], reverse=True)
return mk_resp(data=procs[:limit])
@router.get("/container/metadata", response_model=Resp_Body_Base, tags=["Agent Bridge"])
async def get_container_metadata(
account: AccountContext = Depends(get_account_context)
):
"""
Attempts to gather Docker-specific metadata from the environment.
Only accessible to administrators/managers via existing hierarchy or bypass.
"""
if not is_admin(account):
raise HTTPException(status_code=403, detail="Administrative access required.")
metadata = {
"is_docker": os.path.exists('/.dockerenv'),
"cgroup": None,
"mounts": None
}
if os.path.exists('/proc/self/cgroup'):
try:
with open('/proc/self/cgroup', 'r') as f:
metadata['cgroup'] = f.read()
except:
pass
if os.path.exists('/proc/self/mounts'):
try:
with open('/proc/self/mounts', 'r') as f:
metadata['mounts'] = f.read()
except:
pass
return mk_resp(data=metadata)

60
mcp_docker_explorer.py Normal file
View File

@@ -0,0 +1,60 @@
import asyncio
import sys
import json
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
async def run_docker_mcp_explorer():
# Define the server parameters to run the Docker MCP server via npx
# Using the official Docker MCP server from the Model Context Protocol organization
server_params = StdioServerParameters(
command="npx",
args=["-y", "@modelcontextprotocol/server-docker"],
env=None
)
print("--- Connecting to Docker MCP Server ---")
try:
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
# Initialize the session
print("Initializing session...")
await session.initialize()
# 1. List available tools
print("\n--- Available Tools ---")
tools_result = await session.list_tools()
for tool in tools_result.tools:
print(f"- {tool.name}: {tool.description}")
# 2. Call 'docker_list_containers'
print("\n--- Calling 'docker_list_containers' ---")
# The official server tool name is 'docker_list_containers'
# It doesn't require arguments for a basic list
containers_result = await session.call_tool("docker_list_containers", arguments={})
# The result comes back as a list of Content objects
if containers_result.content:
for item in containers_result.content:
if item.type == 'text':
# Parse the text (which is usually a JSON string for this tool)
try:
containers = json.loads(item.text)
print(f"Found {len(containers)} containers:")
for c in containers:
status = c.get('Status', 'Unknown')
names = ", ".join(c.get('Names', []))
print(f" [{c.get('Id')[:12]}] {names} ({status})")
except json.JSONDecodeError:
print(item.text)
else:
print("No containers found or empty response.")
except Exception as e:
print(f"\nError: {e}")
if "npx" in str(e):
print("Ensure 'npx' is installed and available in your PATH.")
if __name__ == "__main__":
asyncio.run(run_docker_mcp_explorer())