- Schedules web UI (/settings/crons): list, add, edit, pause/resume, delete jobs - cron task type: full orchestrator tool loop on a schedule, result → notification channel - parse_schedule: monthly/yearly formats (monthly:DD:HH:MM, yearly:MM:DD:HH:MM) - HA inbound webhook tools toggle: orchestrator loop vs. direct LLM, configurable in UI - ae_db_query/describe/show_view: SELECT-only Aether MariaDB access (admin, per-user creds) - /settings/integrations: admin-only page for Aether DB credentials - Schedules nav link added to all settings pages - pymysql added to requirements - Docs updated: HELP.md, MASTER.md, CLAUDE.md Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
254 lines
8.5 KiB
Python
254 lines
8.5 KiB
Python
"""
|
|
Aether MariaDB tools — SELECT-only access to the Aether Platform database.
|
|
|
|
Credentials are read from the current user's channels.json:
|
|
"aether_db": {
|
|
"host": "192.168.64.5",
|
|
"port": 3306,
|
|
"name": "aether_dev",
|
|
"user": "aether_dev",
|
|
"password": "..."
|
|
}
|
|
|
|
Configure per-user in Settings → Notifications (or edit channels.json directly).
|
|
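Only statements that begin with SELECT, SHOW, DESCRIBE, or EXPLAIN are accepted. This is a keyword check, not a privilege boundary, so the configured database account should itself be limited to read-only grants.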
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import re
|
|
|
|
from google.genai import types
|
|
|
|
from auth_utils import get_user_channels
|
|
from persona import get_user
|
|
|
|
logger = logging.getLogger(__name__)

# Upper bound on rows fetched by ae_db_query; _fmt also uses it to decide
# whether to append the "results truncated" note.
_MAX_ROWS = 200
# Maximum number of characters kept per cell when rendering a result table.
_MAX_CELL = 120
# Lower-cased leading keywords that _is_read_only accepts.
_ALLOWED = {"select", "show", "describe", "desc", "explain"}
# Whitelist for table/view names that get interpolated into DESCRIBE and
# SHOW CREATE VIEW.  Anchored with \Z rather than $: in Python, $ also matches
# just before a trailing newline, which would let "name\n" through.
_SAFE_ID = re.compile(r'^[a-zA-Z0-9_]+\Z')
|
|
|
|
|
|
def _get_db_cfg() -> tuple[dict, str | None]:
    """Look up the current user's Aether DB connection settings.

    Returns:
        ``(cfg, None)`` when the user's channels contain an ``aether_db``
        block with both ``host`` and ``user`` set, otherwise
        ``({}, message)`` where ``message`` explains how to configure it.
    """
    user_channels = get_user_channels(get_user())
    settings = user_channels.get("aether_db") or {}

    # Host and user are the only keys required here; the rest have defaults
    # applied when the connection is opened.
    if settings.get("host") and settings.get("user"):
        return settings, None

    message = (
        "Aether DB not configured for this user. "
        "Add an 'aether_db' block to channels.json: "
        '{"host": "...", "port": 3306, "name": "aether_dev", "user": "...", "password": "..."}'
    )
    return {}, message
|
|
|
|
|
|
# Leading statement keyword: optional whitespace, a run of letters, and then
# anything that cannot continue an identifier (so "SELECT*" and "select;" are
# recognised, while "selectx" and "select1" are not).
_LEADING_KEYWORD = re.compile(r"\s*([A-Za-z]+)(?![A-Za-z0-9_$])")
# SELECT ... INTO OUTFILE / INTO DUMPFILE writes a file on the database
# server, so those keywords are refused anywhere in the statement.  Digits are
# deliberately absent from the lookbehind so a version-gated comment such as
# /*!50000OUTFILE*/ cannot hide the keyword; identifiers like `outfile_path`
# or `my_outfile` are still allowed.
_FILE_WRITE = re.compile(
    r"(?<![A-Za-z_$])(?:outfile|dumpfile)(?![A-Za-z0-9_$])",
    re.IGNORECASE,
)


def _is_read_only(sql: str) -> bool:
    """Return True when *sql* looks like a permitted read-only statement.

    A statement is accepted when its leading keyword (case-insensitive) is in
    ``_ALLOWED`` and it does not mention OUTFILE or DUMPFILE.  Empty,
    whitespace-only, and non-string input is rejected.

    This is a best-effort textual guard, not a SQL parser: it is conservative
    (a string literal containing the word "outfile" is refused) and it cannot
    see side effects hidden behind stored functions, so the database account
    should still be restricted to read-only privileges.
    """
    if not isinstance(sql, str):
        return False
    keyword = _LEADING_KEYWORD.match(sql)
    if keyword is None or keyword.group(1).lower() not in _ALLOWED:
        return False
    return _FILE_WRITE.search(sql) is None
|
|
|
|
|
|
def _fmt(columns: list[str], rows: list[tuple]) -> str:
    """Render a result set as a bordered ASCII table followed by a row count.

    ``None`` values are shown as ``NULL``; every other value is converted with
    ``str()`` and cut to ``_MAX_CELL`` characters.  When there are no rows only
    a one-line ``(N columns, 0 rows)`` summary is returned.  A truncation note
    is appended when exactly ``_MAX_ROWS`` rows were supplied.
    """
    if not rows:
        col_suffix = "" if len(columns) == 1 else "s"
        return f"({len(columns)} column{col_suffix}, 0 rows)"

    def _cell(value) -> str:
        text = "NULL" if value is None else str(value)
        return text[:_MAX_CELL]

    table = [[_cell(value) for value in record] for record in rows]

    # Each column is as wide as its header or its widest cell, whichever is larger.
    widths = []
    for idx, heading in enumerate(columns):
        widest = len(heading)
        for record in table:
            widest = max(widest, len(record[idx]))
        widths.append(widest)

    def _render(cells) -> str:
        padded = (f" {cell:<{width}} " for cell, width in zip(cells, widths))
        return "|" + "|".join(padded) + "|"

    border = "+" + "+".join("-" * (width + 2) for width in widths) + "+"

    out = [border, _render(columns), border]
    out.extend(_render(record) for record in table)
    out.append(border)

    row_suffix = "" if len(rows) == 1 else "s"
    if len(rows) == _MAX_ROWS:
        note = " — results truncated at limit"
    else:
        note = ""
    out.append(f"({len(rows)} row{row_suffix}{note})")
    return "\n".join(out)
|
|
|
|
|
|
def _connect(cfg: dict):
    """Open and return a new PyMySQL connection described by *cfg*.

    ``host`` and ``user`` are required keys.  ``port`` defaults to 3306,
    ``password`` to an empty string and ``name`` (the database) to
    ``aether_dev``.  The connection uses the plain tuple cursor and a
    10-second connect timeout.  The caller is responsible for closing it.
    """
    # Imported here rather than at module level, so pymysql is only needed
    # once a connection is actually opened.
    import pymysql
    import pymysql.cursors

    params = {
        "host": cfg["host"],
        "port": int(cfg.get("port", 3306)),
        "user": cfg["user"],
        "password": cfg.get("password", ""),
        "database": cfg.get("name", "aether_dev"),
        "cursorclass": pymysql.cursors.Cursor,
        "connect_timeout": 10,
    }
    return pymysql.connect(**params)
|
|
|
|
|
|
async def ae_db_query(sql: str) -> str:
    """Run a read-only SQL statement against the Aether MariaDB.

    Returns the formatted result table, or a human-readable message when the
    database is not configured, the statement is not permitted, or the query
    fails.  At most ``_MAX_ROWS`` rows are fetched.
    """
    cfg, err = _get_db_cfg()
    if err:
        return err

    if not _is_read_only(sql):
        tokens = sql.strip().split()
        first = tokens[0] if tokens else "(empty)"
        return f"Only SELECT, SHOW, DESCRIBE, and EXPLAIN are permitted. Got: {first!r}"

    def _fetch() -> tuple[list[str], list[tuple]]:
        # Blocking DB work; executed in a worker thread below.
        connection = _connect(cfg)
        try:
            with connection.cursor() as cursor:
                cursor.execute(sql)
                meta = cursor.description
                names = [col[0] for col in meta] if meta else []
                fetched = list(cursor.fetchmany(_MAX_ROWS))
        finally:
            connection.close()
        return names, fetched

    try:
        names, fetched = await asyncio.to_thread(_fetch)
        return _fmt(names, fetched)
    except Exception as exc:
        # Errors are reported back to the caller as text instead of raising.
        logger.warning("ae_db_query error: %s", exc)
        return f"Query error: {exc}"
|
|
|
|
|
|
async def ae_db_describe(table: str, detailed: bool = False) -> str:
    """Describe the columns of an Aether DB table or view.

    With ``detailed=False`` (the default) the result is a single line of the
    form ``"<table>: col1, col2, ..."`` built from the first column of each
    DESCRIBE row.  With ``detailed=True`` the full DESCRIBE output is returned
    as a formatted table.  Configuration problems, invalid names and database
    errors are returned as messages rather than raised.
    """
    cfg, err = _get_db_cfg()
    if err:
        return err

    # The name is interpolated into the statement, so it must pass the
    # identifier whitelist first.
    if _SAFE_ID.match(table) is None:
        return f"Invalid table name: {table!r}. Only letters, digits, and underscores allowed."

    def _fetch():
        # Blocking DB work; executed in a worker thread below.
        connection = _connect(cfg)
        try:
            with connection.cursor() as cursor:
                cursor.execute(f"DESCRIBE `{table}`")
                meta = cursor.description
                names = [col[0] for col in meta] if meta else []
                fetched = list(cursor.fetchall())
        finally:
            connection.close()
        return names, fetched

    try:
        names, fetched = await asyncio.to_thread(_fetch)
        if detailed:
            return _fmt(names, fetched)
        field_names = [record[0] for record in fetched]
        return f"{table}: " + ", ".join(field_names)
    except Exception as exc:
        logger.warning("ae_db_describe error: %s", exc)
        return f"Describe error: {exc}"
|
|
|
|
|
|
async def ae_db_show_view(view_name: str) -> str:
    """Return the CREATE VIEW SQL for an Aether DB view.

    The second column of the ``SHOW CREATE VIEW`` row is returned when the row
    has more than one column, otherwise the first.  Configuration problems,
    invalid names, a missing row and database errors are returned as messages
    rather than raised.
    """
    cfg, err = _get_db_cfg()
    if err:
        return err

    # The name is interpolated into the statement, so it must pass the
    # identifier whitelist first.
    if _SAFE_ID.match(view_name) is None:
        return f"Invalid view name: {view_name!r}. Only letters, digits, and underscores allowed."

    def _fetch():
        # Blocking DB work; executed in a worker thread below.
        connection = _connect(cfg)
        try:
            with connection.cursor() as cursor:
                cursor.execute(f"SHOW CREATE VIEW `{view_name}`")
                record = cursor.fetchone()
        finally:
            connection.close()
        return record

    try:
        row = await asyncio.to_thread(_fetch)
        if not row:
            return f"View not found: {view_name}"
        definition = row[1] if len(row) > 1 else row[0]
        return str(definition)
    except Exception as exc:
        logger.warning("ae_db_show_view error: %s", exc)
        return f"Show view error: {exc}"
|
|
|
|
|
|
# Function declaration for ae_db_describe.
_DESCRIBE_DECL = types.FunctionDeclaration(
    name="ae_db_describe",
    description=(
        "Describe the columns of an Aether Platform table or view. "
        "Returns a compact field list by default; pass detailed=true for full schema "
        "(type, nullability, default, key). Use to understand data structure before "
        "writing a SELECT query, or to answer 'what fields does X have?'. "
        "Examples: table='ae_journals'; table='clients'; table='time_entries'."
    ),
    parameters=types.Schema(
        type=types.Type.OBJECT,
        properties={
            "table": types.Schema(
                type=types.Type.STRING,
                description="Table or view name (letters, digits, underscores only)",
            ),
            "detailed": types.Schema(
                type=types.Type.BOOLEAN,
                description="Return full schema (type, nullability, key, default) instead of just field names",
            ),
        },
        required=["table"],
    ),
)

# Function declaration for ae_db_show_view.
_SHOW_VIEW_DECL = types.FunctionDeclaration(
    name="ae_db_show_view",
    description=(
        "Return the CREATE VIEW SQL for an Aether Platform database view. "
        "Use to understand how a view is constructed before querying it, "
        "or to debug unexpected results from a view. "
        "Example: view_name='v_active_journals'."
    ),
    parameters=types.Schema(
        type=types.Type.OBJECT,
        properties={
            "view_name": types.Schema(
                type=types.Type.STRING,
                description="View name (letters, digits, underscores only)",
            ),
        },
        required=["view_name"],
    ),
)

# Function declaration for ae_db_query.
_QUERY_DECL = types.FunctionDeclaration(
    name="ae_db_query",
    description=(
        "Run a read-only SQL query against the Aether Platform MariaDB. "
        "Permitted statements: SELECT, SHOW, DESCRIBE, EXPLAIN. No writes are possible. "
        "Use for debugging: bad data, missing records, broken foreign keys, schema questions. "
        "Results capped at 200 rows; cells truncated at 120 chars. "
        "Examples: SELECT * FROM clients WHERE email = 'x@y.com'; "
        "SELECT COUNT(*) FROM time_entries WHERE billed = 0 AND deleted_at IS NULL; "
        "SHOW TABLES; DESCRIBE ae_journals; "
        "SELECT id_random, enable, deleted_at FROM ae_journals WHERE id_random = 'abc123'."
    ),
    parameters=types.Schema(
        type=types.Type.OBJECT,
        properties={
            "sql": types.Schema(
                type=types.Type.STRING,
                description=(
                    "SQL query to run — SELECT, SHOW, DESCRIBE, or EXPLAIN only. "
                    "No semicolons required but harmless if present."
                ),
            ),
        },
        required=["sql"],
    ),
)

# Declarations exported by this module, in the original order:
# describe, show_view, query.
DECLARATIONS = [
    _DESCRIBE_DECL,
    _SHOW_VIEW_DECL,
    _QUERY_DECL,
]
|