From c47ae47a2ffda4f41c60cbbfcaa0b6a4fe863acf Mon Sep 17 00:00:00 2001 From: Scott Idem Date: Wed, 7 Jan 2026 12:01:48 -0500 Subject: [PATCH] Add agent_bridge.py administrative endpoints and mcp_docker_explorer.py script - Implemented /status, /system/usage, /logs, /logs/list, /processes, and /container/metadata in agent_bridge.py. - Added mcp_docker_explorer.py for Docker MCP integration testing. - Enhanced administrative access checks in agent_bridge.py. --- app/routers/agent_bridge.py | 200 ++++++++++++++++++++++++++++++++---- mcp_docker_explorer.py | 60 +++++++++++ 2 files changed, 240 insertions(+), 20 deletions(-) create mode 100644 mcp_docker_explorer.py diff --git a/app/routers/agent_bridge.py b/app/routers/agent_bridge.py index 140395e..5dd37a8 100644 --- a/app/routers/agent_bridge.py +++ b/app/routers/agent_bridge.py @@ -1,14 +1,24 @@ -from fastapi import APIRouter, Depends, HTTPException +from fastapi import APIRouter, Depends, HTTPException, Query import os import platform import json -from typing import Dict, Any +import psutil +import shutil +from typing import Dict, Any, List, Optional +from datetime import datetime from app.lib_general_v3 import AccountContext, get_account_context from app.models.response_models import Resp_Body_Base, mk_resp router = APIRouter() +def is_admin(account: AccountContext): + if account.auth_method == 'bypass': + return True + if getattr(account, "administrator", False) or getattr(account, "manager", False): + return True + return False + @router.get("/status", response_model=Resp_Body_Base, tags=["Agent Bridge"]) async def get_container_status( account: AccountContext = Depends(get_account_context) @@ -18,7 +28,7 @@ async def get_container_status( Only accessible to administrators/managers via existing hierarchy or bypass. """ # Simple check for administrative access or bypass - if account.auth_method != 'bypass' and not getattr(account, "administrator", False) and not getattr(account, "manager", False): + if not is_admin(account): raise HTTPException(status_code=403, detail="Administrative access required.") status_data = { @@ -27,35 +37,185 @@ async def get_container_status( "python_version": platform.python_version(), "hostname": platform.node(), "cpu_count": os.cpu_count(), - "environment_vars": {k: v for k, v in os.environ.items() if "PASSWORD" not in k.upper() and "KEY" not in k.upper() and "SECRET" not in k.upper()}, + "environment_vars": {k: v for k, v in os.environ.items() if not any(s in k.upper() for s in ["PASSWORD", "KEY", "SECRET", "AUTH", "TOKEN"])}, "cwd": os.getcwd(), - "container": os.path.exists('/.dockerenv') + "container": os.path.exists('/.dockerenv'), + "boot_time": datetime.fromtimestamp(psutil.boot_time()).isoformat() } return mk_resp(data=status_data) -@router.get("/logs", response_model=Resp_Body_Base, tags=["Agent Bridge"]) -async def get_latest_logs( - lines: int = 50, +@router.get("/system/usage", response_model=Resp_Body_Base, tags=["Agent Bridge"]) +async def get_system_usage( account: AccountContext = Depends(get_account_context) ): """ - Returns the last N lines of the application log. + Returns real-time CPU, Memory, and Disk usage. Only accessible to administrators/managers via existing hierarchy or bypass. """ - if account.auth_method != 'bypass' and not getattr(account, "administrator", False) and not getattr(account, "manager", False): + if not is_admin(account): + raise HTTPException(status_code=403, detail="Administrative access required.") + + # CPU usage per core + cpu_percent = psutil.cpu_percent(interval=0.1, percpu=True) + + # Memory usage + mem = psutil.virtual_memory() + + # Disk usage (root) + disk = shutil.disk_usage("/") + + usage_data = { + "cpu": { + "percent_avg": sum(cpu_percent) / len(cpu_percent) if cpu_percent else 0, + "percent_per_core": cpu_percent, + "count": psutil.cpu_count(), + "load_avg": os.getloadavg() if hasattr(os, 'getloadavg') else None + }, + "memory": { + "total": mem.total, + "available": mem.available, + "percent": mem.percent, + "used": mem.used + }, + "disk": { + "total": disk.total, + "used": disk.used, + "free": disk.free, + "percent": (disk.used / disk.total) * 100 if disk.total else 0 + } + } + + return mk_resp(data=usage_data) + +@router.get("/logs", response_model=Resp_Body_Base, tags=["Agent Bridge"]) +async def get_latest_logs( + lines: int = 50, + log_file: str = "aether_api.log", + account: AccountContext = Depends(get_account_context) +): + """ + Returns the last N lines of a specified log file. + Only accessible to administrators/managers via existing hierarchy or bypass. + """ + if not is_admin(account): + raise HTTPException(status_code=403, detail="Administrative access required.") + + # Sanitize log_file to prevent directory traversal + log_file = os.path.basename(log_file) + from app.config import settings + log_dir = os.path.dirname(settings.LOG_PATH.get('app', '/logs/aether_api.log')) + log_path = os.path.join(log_dir, log_file) + + if not os.path.exists(log_path): + return mk_resp(data=False, status_message=f"Log file not found at {log_path}", status_code=404) + + try: + # Using tail if available for efficiency + import subprocess + result = subprocess.run(['tail', f'-n {lines}', log_path], capture_output=True, text=True) + if result.returncode == 0: + return mk_resp(data=result.stdout) + else: + raise Exception(result.stderr) + except Exception as e: + # Fallback to python read if tail fails + try: + with open(log_path, 'r') as f: + log_lines = f.readlines() + latest = log_lines[-lines:] if len(log_lines) > lines else log_lines + return mk_resp(data="".join(latest)) + except Exception as inner_e: + return mk_resp(data=False, status_message=f"Error reading logs: {str(e)} | {str(inner_e)}", status_code=500) + +@router.get("/logs/list", response_model=Resp_Body_Base, tags=["Agent Bridge"]) +async def list_log_files( + account: AccountContext = Depends(get_account_context) +): + """ + Lists available log files in the log directory. + Only accessible to administrators/managers via existing hierarchy or bypass. + """ + if not is_admin(account): raise HTTPException(status_code=403, detail="Administrative access required.") from app.config import settings - log_path = settings.LOG_PATH.get('app', '/logs/aether_api.log') + log_dir = os.path.dirname(settings.LOG_PATH.get('app', '/logs/aether_api.log')) - if not os.path.exists(log_path): - return mk_resp(data=False, status_message=f"Log file not found at {log_path}") + if not os.path.exists(log_dir): + return mk_resp(data=[], status_message=f"Log directory not found at {log_dir}") - try: - with open(log_path, 'r') as f: - log_lines = f.readlines() - latest = log_lines[-lines:] if len(log_lines) > lines else log_lines - return mk_resp(data="".join(latest)) - except Exception as e: - return mk_resp(data=False, status_message=str(e)) \ No newline at end of file + files = [] + for f in os.listdir(log_dir): + path = os.path.join(log_dir, f) + if os.path.isfile(path): + stats = os.stat(path) + files.append({ + "name": f, + "size": stats.st_size, + "modified": datetime.fromtimestamp(stats.st_mtime).isoformat() + }) + + return mk_resp(data=files) + +@router.get("/processes", response_model=Resp_Body_Base, tags=["Agent Bridge"]) +async def list_processes( + limit: int = 10, + sort_by: str = Query("cpu", enum=["cpu", "memory"]), + account: AccountContext = Depends(get_account_context) +): + """ + Lists top processes by CPU or Memory usage. + Only accessible to administrators/managers via existing hierarchy or bypass. + """ + if not is_admin(account): + raise HTTPException(status_code=403, detail="Administrative access required.") + + procs = [] + for proc in psutil.process_iter(['pid', 'name', 'username', 'cpu_percent', 'memory_info']): + try: + pinfo = proc.info + pinfo['memory_rss'] = pinfo['memory_info'].rss if pinfo.get('memory_info') else 0 + procs.append(pinfo) + except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): + pass + + if sort_by == "cpu": + procs.sort(key=lambda x: x['cpu_percent'], reverse=True) + else: + procs.sort(key=lambda x: x['memory_rss'], reverse=True) + + return mk_resp(data=procs[:limit]) + +@router.get("/container/metadata", response_model=Resp_Body_Base, tags=["Agent Bridge"]) +async def get_container_metadata( + account: AccountContext = Depends(get_account_context) +): + """ + Attempts to gather Docker-specific metadata from the environment. + Only accessible to administrators/managers via existing hierarchy or bypass. + """ + if not is_admin(account): + raise HTTPException(status_code=403, detail="Administrative access required.") + + metadata = { + "is_docker": os.path.exists('/.dockerenv'), + "cgroup": None, + "mounts": None + } + + if os.path.exists('/proc/self/cgroup'): + try: + with open('/proc/self/cgroup', 'r') as f: + metadata['cgroup'] = f.read() + except: + pass + + if os.path.exists('/proc/self/mounts'): + try: + with open('/proc/self/mounts', 'r') as f: + metadata['mounts'] = f.read() + except: + pass + + return mk_resp(data=metadata) \ No newline at end of file diff --git a/mcp_docker_explorer.py b/mcp_docker_explorer.py new file mode 100644 index 0000000..5467247 --- /dev/null +++ b/mcp_docker_explorer.py @@ -0,0 +1,60 @@ +import asyncio +import sys +import json +from mcp import ClientSession, StdioServerParameters +from mcp.client.stdio import stdio_client + +async def run_docker_mcp_explorer(): + # Define the server parameters to run the Docker MCP server via npx + # Using the official Docker MCP server from the Model Context Protocol organization + server_params = StdioServerParameters( + command="npx", + args=["-y", "@modelcontextprotocol/server-docker"], + env=None + ) + + print("--- Connecting to Docker MCP Server ---") + + try: + async with stdio_client(server_params) as (read, write): + async with ClientSession(read, write) as session: + # Initialize the session + print("Initializing session...") + await session.initialize() + + # 1. List available tools + print("\n--- Available Tools ---") + tools_result = await session.list_tools() + for tool in tools_result.tools: + print(f"- {tool.name}: {tool.description}") + + # 2. Call 'docker_list_containers' + print("\n--- Calling 'docker_list_containers' ---") + # The official server tool name is 'docker_list_containers' + # It doesn't require arguments for a basic list + containers_result = await session.call_tool("docker_list_containers", arguments={}) + + # The result comes back as a list of Content objects + if containers_result.content: + for item in containers_result.content: + if item.type == 'text': + # Parse the text (which is usually a JSON string for this tool) + try: + containers = json.loads(item.text) + print(f"Found {len(containers)} containers:") + for c in containers: + status = c.get('Status', 'Unknown') + names = ", ".join(c.get('Names', [])) + print(f" [{c.get('Id')[:12]}] {names} ({status})") + except json.JSONDecodeError: + print(item.text) + else: + print("No containers found or empty response.") + + except Exception as e: + print(f"\nError: {e}") + if "npx" in str(e): + print("Ensure 'npx' is installed and available in your PATH.") + +if __name__ == "__main__": + asyncio.run(run_docker_mcp_explorer())