feat: NCT orchestrator support + Home Assistant webhook

nextcloud_talk.py:
- Fix missing import hmac / import hashlib (NameError bug in _verify_signature)
- Add orchestrator routing when channels.json "tools": true — sends
  " Working on it…" immediately, then runs the full tool loop and
  replies with the result; checkpoint case gets a web UI confirmation note
- Read tier and role from channel config (defaults: default_tier / "chat")
- Pass cfg through to _process_message

homeassistant.py (new):
- POST /webhook/ha/{username}/{webhook_id}
- Auth: webhook_id path segment matched against channels.json
- Accepts JSON or form-encoded body from HA automations
- Builds natural-language task from payload (uses "message" key if present,
  otherwise serialises full body as context)
- Same orchestrator/direct dispatch as NCT
- Delivers response via notify() — NC Talk, web push, or configured channel
- Session key: ha_{username} for continuity across HA events
- Registered in main.py; /webhook/ prefix already public in auth_middleware

channels.json schema addition:
  "homeassistant": {
    "webhook_id": "your-secret-id",
    "persona": "inara",
    "tier": 2,
    "role": "chat",
    "tools": false
  }

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Scott Idem
2026-05-11 19:45:59 -04:00
parent 19d6f004ed
commit 1d361fe809
3 changed files with 273 additions and 11 deletions

View File

@@ -8,7 +8,7 @@ logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(name)s: %(messag
from config import settings
from auth_middleware import SessionAuthMiddleware
from routers import chat, google_chat, nextcloud_talk, files, distill, auth, orchestrator
from routers import chat, google_chat, nextcloud_talk, homeassistant, files, distill, auth, orchestrator
from routers import ui, onboarding, settings, help, auth_google, local_llm, push, audit, usage
@@ -30,6 +30,7 @@ app.add_middleware(SessionAuthMiddleware)
app.include_router(chat.router)
app.include_router(google_chat.router)
app.include_router(nextcloud_talk.router)
app.include_router(homeassistant.router)
app.include_router(files.router)
app.include_router(distill.router)
app.include_router(auth.router)

View File

@@ -0,0 +1,192 @@
"""
Home Assistant webhook router — POST /webhook/ha/{username}/{webhook_id}
Receives event payloads from HA automations and routes them to Inara.
Auth: the webhook_id in the URL acts as the shared secret (same model HA uses).
Response is delivered async via notify() — NC Talk, web push, etc.
channels.json schema:
"homeassistant": {
"webhook_id": "your-secret-id",
"persona": "inara",
"tier": 2,
"role": "chat",
"tools": false
}
HA automation example (rest_command):
rest_command:
cortex_notify:
url: "https://cortex.dgrzone.com/webhook/ha/scott/your-secret-id"
method: POST
content_type: "application/json"
payload: '{"message": "{{message}}", "entity_id": "{{entity_id}}", "state": "{{state}}"}'
automation:
trigger:
- trigger: state
entity_id: binary_sensor.front_door
to: "on"
action:
- action: rest_command.cortex_notify
data:
message: "Front door opened"
entity_id: "binary_sensor.front_door"
state: "on"
"""
import json
import logging
from fastapi import APIRouter, BackgroundTasks, HTTPException, Request, Response
from auth_utils import get_user_channels, get_user_gemini_key, get_user_role, get_tool_policy
from context_loader import load_context
from llm_client import complete
from notification import notify
from persona import set_context
from session_logger import log_turn
from session_store import load as load_session, save as save_session
from config import settings
import event_bus
import model_registry
import orchestrator_engine
import openai_orchestrator
logger = logging.getLogger(__name__)
router = APIRouter()
def _build_task(body: dict) -> str:
"""Turn an HA event payload into a natural-language prompt for Inara."""
if "message" in body:
msg = str(body["message"])
extras = {k: body[k] for k in ("entity_id", "state", "trigger", "event", "area") if k in body}
if extras:
msg += "\n\nContext: " + json.dumps(extras)
return msg
return "Home Assistant event:\n" + json.dumps(body, indent=2)
async def _process_event(username: str, body: dict, cfg: dict) -> None:
persona_name = cfg.get("persona", "inara")
tier = cfg.get("tier") or settings.default_tier
role = cfg.get("role", "chat")
use_tools = cfg.get("tools", False)
set_context(username, persona_name)
task = _build_task(body)
session_id = f"ha_{username}"
history = load_session(session_id)
session_msgs = list(history)
logger.info("HA event for %s: %r", username, task[:80])
backend = "unknown"
try:
if use_tools:
role_cfg = model_registry.get_role_config(username, role)
system_prompt = load_context(
tier,
role_append=role_cfg.get("system_append", ""),
inject_datetime=role_cfg.get("inject_datetime", True),
inject_mode=role_cfg.get("inject_mode", True),
)
orch_model = model_registry.get_model_for_role(username, "orchestrator")
user_role_val = get_user_role(username)
tool_list = role_cfg.get("tools")
policy = get_tool_policy(username)
c_allow = set(policy.get("allow", []))
c_deny = set(policy.get("deny", []))
if orch_model and orch_model.get("type") == "local_openai":
result = await openai_orchestrator.run(
task=task,
system_prompt=system_prompt,
session_messages=session_msgs or None,
model_cfg=orch_model,
user_role=user_role_val,
tool_list=tool_list,
confirm_allow=c_allow,
confirm_deny=c_deny,
)
else:
gemini_key = (
(orch_model.get("api_key") if orch_model else None)
or get_user_gemini_key(username)
)
result = await orchestrator_engine.run(
task=task,
system_prompt=system_prompt,
session_messages=session_msgs or None,
respond_with_claude=True,
gemini_api_key=gemini_key,
model_name=orch_model.get("model_name") if orch_model else None,
response_role=role,
user_role=user_role_val,
tool_list=tool_list,
confirm_allow=c_allow,
confirm_deny=c_deny,
)
response_text = result.response
backend = result.backend
else:
system_prompt = load_context(tier)
msgs = list(session_msgs) + [{"role": "user", "content": task}]
response_text, backend = await complete(system_prompt=system_prompt, messages=msgs)
except Exception as e:
logger.error("HA event error for %s: %s", username, e)
return
logger.info("HA response via %s (%d chars)", backend, len(response_text))
history.append({"role": "user", "content": task})
history.append({"role": "assistant", "content": response_text})
save_session(session_id, history)
log_turn(session_id, task, response_text)
await event_bus.publish({
"type": "ha_event",
"session_id": session_id,
"response": response_text,
"backend": backend,
})
await notify(username, response_text)
@router.post("/webhook/ha/{username}/{webhook_id}")
async def ha_webhook(
username: str,
webhook_id: str,
request: Request,
background_tasks: BackgroundTasks,
) -> Response:
"""Receive an event from a Home Assistant automation and route it to Inara."""
channels = get_user_channels(username)
cfg = channels.get("homeassistant")
if not cfg:
raise HTTPException(status_code=404, detail="Channel not configured")
if webhook_id != cfg.get("webhook_id", ""):
logger.warning("HA webhook: bad webhook_id for user %r", username)
raise HTTPException(status_code=401, detail="Invalid webhook ID")
content_type = request.headers.get("content-type", "")
if "application/json" in content_type:
try:
body = await request.json()
except Exception:
raise HTTPException(status_code=400, detail="Invalid JSON")
else:
form = await request.form()
body = dict(form)
if not body:
return Response(status_code=200)
background_tasks.add_task(_process_event, username, body, cfg)
return Response(status_code=200)

View File

@@ -1,10 +1,12 @@
import asyncio
import hashlib
import hmac
import json
import logging
from fastapi import APIRouter, BackgroundTasks, HTTPException, Request, Response
from auth_utils import get_user_channels
from auth_utils import get_user_channels, get_user_gemini_key, get_user_role, get_tool_policy
from context_loader import load_context
from llm_client import complete
from notification import _send_nct_message
@@ -13,6 +15,9 @@ from session_logger import log_turn
from session_store import load as load_session, save as save_session
from config import settings
import event_bus
import model_registry
import orchestrator_engine
import openai_orchestrator
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
@@ -50,15 +55,19 @@ async def _process_message(
nextcloud_url: str,
secret: str,
timeout: int,
cfg: dict,
) -> None:
logger.info("NCT process: token=%s user=%s text=%r", conversation_token, actor_name, user_text)
set_context(username, persona_name)
session_id = f"nct_{username}_{conversation_token}"
system_prompt = load_context(settings.default_tier)
history = load_session(session_id)
history.append({"role": "user", "content": user_text})
tier = cfg.get("tier") or settings.default_tier
role = cfg.get("role", "chat")
use_tools = cfg.get("tools", False)
session_id = f"nct_{username}_{conversation_token}"
history = load_session(session_id)
session_msgs = list(history) # snapshot before we append
await event_bus.publish({
"type": "nct_message",
@@ -68,11 +77,69 @@ async def _process_message(
"actor": actor_name,
})
backend = "unknown"
try:
response_text, backend = await asyncio.wait_for(
complete(system_prompt=system_prompt, messages=history),
timeout=timeout,
)
if use_tools:
await _send_reply(conversation_token, "⏳ Working on it…", nextcloud_url, secret)
role_cfg = model_registry.get_role_config(username, role)
system_prompt = load_context(
tier,
role_append=role_cfg.get("system_append", ""),
inject_datetime=role_cfg.get("inject_datetime", True),
inject_mode=role_cfg.get("inject_mode", True),
)
orch_model = model_registry.get_model_for_role(username, "orchestrator")
user_role_val = get_user_role(username)
tool_list = role_cfg.get("tools")
policy = get_tool_policy(username)
c_allow = set(policy.get("allow", []))
c_deny = set(policy.get("deny", []))
if orch_model and orch_model.get("type") == "local_openai":
result = await openai_orchestrator.run(
task=user_text,
system_prompt=system_prompt,
session_messages=session_msgs or None,
model_cfg=orch_model,
user_role=user_role_val,
tool_list=tool_list,
confirm_allow=c_allow,
confirm_deny=c_deny,
)
else:
gemini_key = (
(orch_model.get("api_key") if orch_model else None)
or get_user_gemini_key(username)
)
result = await orchestrator_engine.run(
task=user_text,
system_prompt=system_prompt,
session_messages=session_msgs or None,
respond_with_claude=True,
gemini_api_key=gemini_key,
model_name=orch_model.get("model_name") if orch_model else None,
response_role=role,
user_role=user_role_val,
tool_list=tool_list,
confirm_allow=c_allow,
confirm_deny=c_deny,
)
response_text = result.response
backend = result.backend
if result.checkpoint:
response_text += "\n\n_(This action requires confirmation — use the web UI to approve or deny.)_"
else:
system_prompt = load_context(tier)
history_for_llm = list(session_msgs) + [{"role": "user", "content": user_text}]
response_text, backend = await asyncio.wait_for(
complete(system_prompt=system_prompt, messages=history_for_llm),
timeout=timeout,
)
except asyncio.TimeoutError:
logger.warning("NCT timeout for %s", conversation_token)
await _send_reply(conversation_token, "⏳ Still thinking — this is taking longer than usual.", nextcloud_url, secret)
@@ -83,6 +150,8 @@ async def _process_message(
return
logger.info("NCT LLM responded via %s (%d chars)", backend, len(response_text))
history.append({"role": "user", "content": user_text})
history.append({"role": "assistant", "content": response_text})
save_session(session_id, history)
log_turn(session_id, user_text, response_text)
@@ -163,6 +232,6 @@ async def nextcloud_talk_webhook(username: str, request: Request, background_tas
background_tasks.add_task(
_process_message,
conversation_token, user_text, actor_name,
username, persona_name, nextcloud_url, secret, timeout,
username, persona_name, nextcloud_url, secret, timeout, cfg,
)
return Response(status_code=200)