From 1d361fe809f1fe964b8261631767e874692ecfec Mon Sep 17 00:00:00 2001 From: Scott Idem Date: Mon, 11 May 2026 19:45:59 -0400 Subject: [PATCH] feat: NCT orchestrator support + Home Assistant webhook MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit nextcloud_talk.py: - Fix missing import hmac / import hashlib (NameError bug in _verify_signature) - Add orchestrator routing when channels.json "tools": true — sends "⏳ Working on it…" immediately, then runs the full tool loop and replies with the result; checkpoint case gets a web UI confirmation note - Read tier and role from channel config (defaults: default_tier / "chat") - Pass cfg through to _process_message homeassistant.py (new): - POST /webhook/ha/{username}/{webhook_id} - Auth: webhook_id path segment matched against channels.json - Accepts JSON or form-encoded body from HA automations - Builds natural-language task from payload (uses "message" key if present, otherwise serialises full body as context) - Same orchestrator/direct dispatch as NCT - Delivers response via notify() — NC Talk, web push, or configured channel - Session key: ha_{username} for continuity across HA events - Registered in main.py; /webhook/ prefix already public in auth_middleware channels.json schema addition: "homeassistant": { "webhook_id": "your-secret-id", "persona": "inara", "tier": 2, "role": "chat", "tools": false } Co-Authored-By: Claude Sonnet 4.6 --- cortex/main.py | 3 +- cortex/routers/homeassistant.py | 192 +++++++++++++++++++++++++++++++ cortex/routers/nextcloud_talk.py | 89 ++++++++++++-- 3 files changed, 273 insertions(+), 11 deletions(-) create mode 100644 cortex/routers/homeassistant.py diff --git a/cortex/main.py b/cortex/main.py index fa133b9..58c59b1 100644 --- a/cortex/main.py +++ b/cortex/main.py @@ -8,7 +8,7 @@ logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(name)s: %(messag from config import settings from auth_middleware import SessionAuthMiddleware -from routers import chat, google_chat, nextcloud_talk, files, distill, auth, orchestrator +from routers import chat, google_chat, nextcloud_talk, homeassistant, files, distill, auth, orchestrator from routers import ui, onboarding, settings, help, auth_google, local_llm, push, audit, usage @@ -30,6 +30,7 @@ app.add_middleware(SessionAuthMiddleware) app.include_router(chat.router) app.include_router(google_chat.router) app.include_router(nextcloud_talk.router) +app.include_router(homeassistant.router) app.include_router(files.router) app.include_router(distill.router) app.include_router(auth.router) diff --git a/cortex/routers/homeassistant.py b/cortex/routers/homeassistant.py new file mode 100644 index 0000000..819e18b --- /dev/null +++ b/cortex/routers/homeassistant.py @@ -0,0 +1,192 @@ +""" +Home Assistant webhook router — POST /webhook/ha/{username}/{webhook_id} + +Receives event payloads from HA automations and routes them to Inara. +Auth: the webhook_id in the URL acts as the shared secret (same model HA uses). +Response is delivered async via notify() — NC Talk, web push, etc. + +channels.json schema: + "homeassistant": { + "webhook_id": "your-secret-id", + "persona": "inara", + "tier": 2, + "role": "chat", + "tools": false + } + +HA automation example (rest_command): + rest_command: + cortex_notify: + url: "https://cortex.dgrzone.com/webhook/ha/scott/your-secret-id" + method: POST + content_type: "application/json" + payload: '{"message": "{{message}}", "entity_id": "{{entity_id}}", "state": "{{state}}"}' + + automation: + trigger: + - trigger: state + entity_id: binary_sensor.front_door + to: "on" + action: + - action: rest_command.cortex_notify + data: + message: "Front door opened" + entity_id: "binary_sensor.front_door" + state: "on" +""" + +import json +import logging + +from fastapi import APIRouter, BackgroundTasks, HTTPException, Request, Response + +from auth_utils import get_user_channels, get_user_gemini_key, get_user_role, get_tool_policy +from context_loader import load_context +from llm_client import complete +from notification import notify +from persona import set_context +from session_logger import log_turn +from session_store import load as load_session, save as save_session +from config import settings +import event_bus +import model_registry +import orchestrator_engine +import openai_orchestrator + +logger = logging.getLogger(__name__) +router = APIRouter() + + +def _build_task(body: dict) -> str: + """Turn an HA event payload into a natural-language prompt for Inara.""" + if "message" in body: + msg = str(body["message"]) + extras = {k: body[k] for k in ("entity_id", "state", "trigger", "event", "area") if k in body} + if extras: + msg += "\n\nContext: " + json.dumps(extras) + return msg + return "Home Assistant event:\n" + json.dumps(body, indent=2) + + +async def _process_event(username: str, body: dict, cfg: dict) -> None: + persona_name = cfg.get("persona", "inara") + tier = cfg.get("tier") or settings.default_tier + role = cfg.get("role", "chat") + use_tools = cfg.get("tools", False) + + set_context(username, persona_name) + + task = _build_task(body) + session_id = f"ha_{username}" + history = load_session(session_id) + session_msgs = list(history) + + logger.info("HA event for %s: %r", username, task[:80]) + + backend = "unknown" + try: + if use_tools: + role_cfg = model_registry.get_role_config(username, role) + system_prompt = load_context( + tier, + role_append=role_cfg.get("system_append", ""), + inject_datetime=role_cfg.get("inject_datetime", True), + inject_mode=role_cfg.get("inject_mode", True), + ) + orch_model = model_registry.get_model_for_role(username, "orchestrator") + user_role_val = get_user_role(username) + tool_list = role_cfg.get("tools") + policy = get_tool_policy(username) + c_allow = set(policy.get("allow", [])) + c_deny = set(policy.get("deny", [])) + + if orch_model and orch_model.get("type") == "local_openai": + result = await openai_orchestrator.run( + task=task, + system_prompt=system_prompt, + session_messages=session_msgs or None, + model_cfg=orch_model, + user_role=user_role_val, + tool_list=tool_list, + confirm_allow=c_allow, + confirm_deny=c_deny, + ) + else: + gemini_key = ( + (orch_model.get("api_key") if orch_model else None) + or get_user_gemini_key(username) + ) + result = await orchestrator_engine.run( + task=task, + system_prompt=system_prompt, + session_messages=session_msgs or None, + respond_with_claude=True, + gemini_api_key=gemini_key, + model_name=orch_model.get("model_name") if orch_model else None, + response_role=role, + user_role=user_role_val, + tool_list=tool_list, + confirm_allow=c_allow, + confirm_deny=c_deny, + ) + response_text = result.response + backend = result.backend + + else: + system_prompt = load_context(tier) + msgs = list(session_msgs) + [{"role": "user", "content": task}] + response_text, backend = await complete(system_prompt=system_prompt, messages=msgs) + + except Exception as e: + logger.error("HA event error for %s: %s", username, e) + return + + logger.info("HA response via %s (%d chars)", backend, len(response_text)) + + history.append({"role": "user", "content": task}) + history.append({"role": "assistant", "content": response_text}) + save_session(session_id, history) + log_turn(session_id, task, response_text) + + await event_bus.publish({ + "type": "ha_event", + "session_id": session_id, + "response": response_text, + "backend": backend, + }) + + await notify(username, response_text) + + +@router.post("/webhook/ha/{username}/{webhook_id}") +async def ha_webhook( + username: str, + webhook_id: str, + request: Request, + background_tasks: BackgroundTasks, +) -> Response: + """Receive an event from a Home Assistant automation and route it to Inara.""" + channels = get_user_channels(username) + cfg = channels.get("homeassistant") + if not cfg: + raise HTTPException(status_code=404, detail="Channel not configured") + + if webhook_id != cfg.get("webhook_id", ""): + logger.warning("HA webhook: bad webhook_id for user %r", username) + raise HTTPException(status_code=401, detail="Invalid webhook ID") + + content_type = request.headers.get("content-type", "") + if "application/json" in content_type: + try: + body = await request.json() + except Exception: + raise HTTPException(status_code=400, detail="Invalid JSON") + else: + form = await request.form() + body = dict(form) + + if not body: + return Response(status_code=200) + + background_tasks.add_task(_process_event, username, body, cfg) + return Response(status_code=200) diff --git a/cortex/routers/nextcloud_talk.py b/cortex/routers/nextcloud_talk.py index f28123c..acd18c0 100644 --- a/cortex/routers/nextcloud_talk.py +++ b/cortex/routers/nextcloud_talk.py @@ -1,10 +1,12 @@ import asyncio +import hashlib +import hmac import json import logging from fastapi import APIRouter, BackgroundTasks, HTTPException, Request, Response -from auth_utils import get_user_channels +from auth_utils import get_user_channels, get_user_gemini_key, get_user_role, get_tool_policy from context_loader import load_context from llm_client import complete from notification import _send_nct_message @@ -13,6 +15,9 @@ from session_logger import log_turn from session_store import load as load_session, save as save_session from config import settings import event_bus +import model_registry +import orchestrator_engine +import openai_orchestrator logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @@ -50,15 +55,19 @@ async def _process_message( nextcloud_url: str, secret: str, timeout: int, + cfg: dict, ) -> None: logger.info("NCT process: token=%s user=%s text=%r", conversation_token, actor_name, user_text) set_context(username, persona_name) - session_id = f"nct_{username}_{conversation_token}" - system_prompt = load_context(settings.default_tier) - history = load_session(session_id) - history.append({"role": "user", "content": user_text}) + tier = cfg.get("tier") or settings.default_tier + role = cfg.get("role", "chat") + use_tools = cfg.get("tools", False) + + session_id = f"nct_{username}_{conversation_token}" + history = load_session(session_id) + session_msgs = list(history) # snapshot before we append await event_bus.publish({ "type": "nct_message", @@ -68,11 +77,69 @@ async def _process_message( "actor": actor_name, }) + backend = "unknown" try: - response_text, backend = await asyncio.wait_for( - complete(system_prompt=system_prompt, messages=history), - timeout=timeout, - ) + if use_tools: + await _send_reply(conversation_token, "⏳ Working on it…", nextcloud_url, secret) + + role_cfg = model_registry.get_role_config(username, role) + system_prompt = load_context( + tier, + role_append=role_cfg.get("system_append", ""), + inject_datetime=role_cfg.get("inject_datetime", True), + inject_mode=role_cfg.get("inject_mode", True), + ) + orch_model = model_registry.get_model_for_role(username, "orchestrator") + user_role_val = get_user_role(username) + tool_list = role_cfg.get("tools") + policy = get_tool_policy(username) + c_allow = set(policy.get("allow", [])) + c_deny = set(policy.get("deny", [])) + + if orch_model and orch_model.get("type") == "local_openai": + result = await openai_orchestrator.run( + task=user_text, + system_prompt=system_prompt, + session_messages=session_msgs or None, + model_cfg=orch_model, + user_role=user_role_val, + tool_list=tool_list, + confirm_allow=c_allow, + confirm_deny=c_deny, + ) + else: + gemini_key = ( + (orch_model.get("api_key") if orch_model else None) + or get_user_gemini_key(username) + ) + result = await orchestrator_engine.run( + task=user_text, + system_prompt=system_prompt, + session_messages=session_msgs or None, + respond_with_claude=True, + gemini_api_key=gemini_key, + model_name=orch_model.get("model_name") if orch_model else None, + response_role=role, + user_role=user_role_val, + tool_list=tool_list, + confirm_allow=c_allow, + confirm_deny=c_deny, + ) + + response_text = result.response + backend = result.backend + + if result.checkpoint: + response_text += "\n\n_(This action requires confirmation — use the web UI to approve or deny.)_" + + else: + system_prompt = load_context(tier) + history_for_llm = list(session_msgs) + [{"role": "user", "content": user_text}] + response_text, backend = await asyncio.wait_for( + complete(system_prompt=system_prompt, messages=history_for_llm), + timeout=timeout, + ) + except asyncio.TimeoutError: logger.warning("NCT timeout for %s", conversation_token) await _send_reply(conversation_token, "⏳ Still thinking — this is taking longer than usual.", nextcloud_url, secret) @@ -83,6 +150,8 @@ async def _process_message( return logger.info("NCT LLM responded via %s (%d chars)", backend, len(response_text)) + + history.append({"role": "user", "content": user_text}) history.append({"role": "assistant", "content": response_text}) save_session(session_id, history) log_turn(session_id, user_text, response_text) @@ -163,6 +232,6 @@ async def nextcloud_talk_webhook(username: str, request: Request, background_tas background_tasks.add_task( _process_message, conversation_token, user_text, actor_name, - username, persona_name, nextcloud_url, secret, timeout, + username, persona_name, nextcloud_url, secret, timeout, cfg, ) return Response(status_code=200)