""" Home Assistant tools — read device states and call services. Credentials are read automatically from the current user's channels.json: "homeassistant": {"url": "https://ha.example.com", "token": ""} Configure in Settings → Notifications → Home Assistant. """ import json import logging import httpx from google.genai import types from auth_utils import get_user_channels from persona import get_user logger = logging.getLogger(__name__) _TIMEOUT = 10 # Attributes that are internal/noisy and not useful to show _SKIP_ATTRS = { "friendly_name", "icon", "entity_picture", "supported_features", "supported_color_modes", "color_mode", "min_color_temp_kelvin", "max_color_temp_kelvin", "min_mireds", "max_mireds", "assumed_state", "attribution", } def _get_ha_cfg() -> tuple[str, str]: """Return (base_url, token) from the current user's channels.json.""" channels = get_user_channels(get_user()) ha = channels.get("homeassistant") or {} url = (ha.get("url") or "").rstrip("/") token = ha.get("token") or "" if not url or not token: raise ValueError( "Home Assistant not configured — add URL and token in Settings → Notifications." ) return url, token def _auth(token: str) -> dict: return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"} def _fmt_state(s: dict) -> str: """Format a single HA state dict as a compact readable line.""" entity_id = s.get("entity_id", "") state = s.get("state", "unknown") attrs = s.get("attributes", {}) name = attrs.get("friendly_name", entity_id) label = f"{name} ({entity_id})" if name != entity_id else entity_id useful = {k: v for k, v in attrs.items() if k not in _SKIP_ATTRS} extra = "" if useful: parts = [] for k, v in list(useful.items())[:6]: # cap at 6 attrs per entity parts.append(f"{k}: {v}") extra = " [" + ", ".join(parts) + "]" return f"{label}: {state}{extra}" async def ha_get_state(entity_id: str) -> str: """Return the current state and attributes of a single Home Assistant entity.""" try: url, token = _get_ha_cfg() except ValueError as e: return str(e) try: async with httpx.AsyncClient(timeout=_TIMEOUT) as client: resp = await client.get(f"{url}/api/states/{entity_id}", headers=_auth(token)) if resp.status_code == 404: return f"Entity not found: {entity_id}" if resp.status_code != 200: return f"HA API error {resp.status_code}: {resp.text[:400]}" s = resp.json() attrs = s.get("attributes", {}) lines = [ f"**{attrs.get('friendly_name', entity_id)}** (`{entity_id}`)", f"State: **{s.get('state', 'unknown')}**", ] changed = (s.get("last_changed") or "")[:19].replace("T", " ") if changed: lines.append(f"Last changed: {changed} UTC") useful = {k: v for k, v in attrs.items() if k not in _SKIP_ATTRS} if useful: lines.append("Attributes:") for k, v in useful.items(): lines.append(f" {k}: {v}") return "\n".join(lines) except httpx.HTTPError as e: return f"Connection error: {e}" except Exception as e: logger.warning("ha_get_state error: %s", e) return f"Error: {e}" async def ha_get_states(domain: str = "", area: str = "") -> str: """List HA entity states, optionally filtered by domain (e.g. 'light') or area name.""" try: url, token = _get_ha_cfg() except ValueError as e: return str(e) try: async with httpx.AsyncClient(timeout=_TIMEOUT) as client: resp = await client.get(f"{url}/api/states", headers=_auth(token)) if resp.status_code != 200: return f"HA API error {resp.status_code}: {resp.text[:400]}" states = resp.json() if domain: states = [s for s in states if s.get("entity_id", "").startswith(f"{domain}.")] if area: al = area.lower() states = [s for s in states if al in (s.get("attributes", {}).get("friendly_name") or "").lower()] if not states: filters = [f"domain={domain}"] * bool(domain) + [f"area={area}"] * bool(area) return "No entities found" + (f" ({', '.join(filters)})" if filters else "") lines = [f"{len(states)} entit{'y' if len(states) == 1 else 'ies'}:"] for s in sorted(states, key=lambda x: x.get("entity_id", "")): lines.append(_fmt_state(s)) return "\n".join(lines) except httpx.HTTPError as e: return f"Connection error: {e}" except Exception as e: logger.warning("ha_get_states error: %s", e) return f"Error: {e}" async def ha_call_service( domain: str, service: str, entity_id: str = "", data: str = "", ) -> str: """Call a Home Assistant service (turn on/off lights, set thermostat, lock doors, etc.).""" try: url, token = _get_ha_cfg() except ValueError as e: return str(e) payload: dict = {} if entity_id: payload["entity_id"] = entity_id if data: try: extra = json.loads(data) if isinstance(extra, dict): payload.update(extra) except json.JSONDecodeError: return f"Invalid JSON in data: {data}" try: async with httpx.AsyncClient(timeout=_TIMEOUT) as client: resp = await client.post( f"{url}/api/services/{domain}/{service}", headers=_auth(token), json=payload, ) if resp.status_code not in (200, 201): return f"HA API error {resp.status_code}: {resp.text[:400]}" changed = resp.json() if not changed: return f"✓ {domain}.{service} called (no state changes reported)." lines = [f"✓ {domain}.{service} — {len(changed)} entity state(s) updated:"] for s in changed: lines.append(f" {s.get('entity_id', '')}: {s.get('state', '')}") return "\n".join(lines) except httpx.HTTPError as e: return f"Connection error: {e}" except Exception as e: logger.warning("ha_call_service error: %s", e) return f"Error: {e}" DECLARATIONS = [ types.FunctionDeclaration( name="ha_get_state", description=( "Get the current state and attributes of a single Home Assistant entity. " "Use to check if a light is on, read a thermostat temperature, check a " "door/window sensor, battery level, HVAC mode, etc. " "entity_id format: domain.name — e.g. light.living_room, switch.garage, " "climate.ecobee, binary_sensor.front_door, sensor.outdoor_temp." ), parameters=types.Schema( type=types.Type.OBJECT, properties={ "entity_id": types.Schema( type=types.Type.STRING, description="Full entity ID, e.g. light.living_room or climate.ecobee_main", ), }, required=["entity_id"], ), ), types.FunctionDeclaration( name="ha_get_states", description=( "List Home Assistant entity states, optionally filtered by domain or area. " "Use to survey what devices exist or check multiple entities at once. " "Domain examples: light, switch, sensor, climate, binary_sensor, lock, cover, " "media_player, input_boolean. Leave both blank to list everything (can be large)." ), parameters=types.Schema( type=types.Type.OBJECT, properties={ "domain": types.Schema( type=types.Type.STRING, description="Filter to this domain, e.g. 'light' or 'switch' (optional)", ), "area": types.Schema( type=types.Type.STRING, description="Filter by area name substring match on friendly name (optional)", ), }, ), ), types.FunctionDeclaration( name="ha_call_service", description=( "Call a Home Assistant service to control a device or trigger an automation. " "Requires user confirmation before executing. Common examples: " "domain=light service=turn_on entity_id=light.living_room; " "domain=light service=turn_off entity_id=light.all; " "domain=switch service=toggle entity_id=switch.garage; " "domain=climate service=set_temperature data={\"temperature\":72}; " "domain=lock service=lock entity_id=lock.front_door; " "domain=script service=turn_on entity_id=script.goodnight." ), parameters=types.Schema( type=types.Type.OBJECT, properties={ "domain": types.Schema( type=types.Type.STRING, description="Service domain: light, switch, climate, lock, cover, script, automation, etc.", ), "service": types.Schema( type=types.Type.STRING, description="Service name: turn_on, turn_off, toggle, set_temperature, lock, unlock, open, close, etc.", ), "entity_id": types.Schema( type=types.Type.STRING, description="Target entity ID — omit for services that don't target a specific entity", ), "data": types.Schema( type=types.Type.STRING, description='Extra service data as JSON string, e.g. {"temperature": 72, "hvac_mode": "heat"}', ), }, required=["domain", "service"], ), ), ]