Add SSE real-time Talk activity, file editor UI, and identity file API
- event_bus.py: in-process asyncio pub/sub (one Queue per SSE client) - nextcloud_talk.py: publishes nct_message/nct_response events to bus - chat.py: GET /events SSE endpoint streams Talk activity to browser - routers/files.py: whitelist-protected GET/PUT for Inara identity .md files - main.py: register files router - static/index.html: real-time Talk feed, blue badge on Sessions btn, Files modal with preview/edit toggle and Ctrl+S save Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
33
cortex/event_bus.py
Normal file
33
cortex/event_bus.py
Normal file
@@ -0,0 +1,33 @@
|
||||
"""
|
||||
Simple in-process pub/sub for server-sent events.
|
||||
|
||||
Usage:
|
||||
# Publisher (e.g. nextcloud_talk router)
|
||||
await event_bus.publish({"type": "nct_message", ...})
|
||||
|
||||
# Consumer (SSE endpoint in chat router)
|
||||
q = event_bus.subscribe()
|
||||
try:
|
||||
event = await asyncio.wait_for(q.get(), timeout=20)
|
||||
finally:
|
||||
event_bus.unsubscribe(q)
|
||||
"""
|
||||
import asyncio
|
||||
from typing import Any
|
||||
|
||||
_subscribers: set[asyncio.Queue] = set()
|
||||
|
||||
|
||||
def subscribe() -> asyncio.Queue:
|
||||
q: asyncio.Queue = asyncio.Queue()
|
||||
_subscribers.add(q)
|
||||
return q
|
||||
|
||||
|
||||
def unsubscribe(q: asyncio.Queue) -> None:
|
||||
_subscribers.discard(q)
|
||||
|
||||
|
||||
async def publish(event: dict[str, Any]) -> None:
|
||||
for q in list(_subscribers):
|
||||
await q.put(event)
|
||||
@@ -8,7 +8,7 @@ import uvicorn
|
||||
logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(name)s: %(message)s")
|
||||
|
||||
from config import settings
|
||||
from routers import chat, google_chat, nextcloud_talk
|
||||
from routers import chat, google_chat, nextcloud_talk, files
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
@@ -23,6 +23,7 @@ app = FastAPI(title="Cortex Dispatcher", lifespan=lifespan)
|
||||
app.include_router(chat.router)
|
||||
app.include_router(google_chat.router)
|
||||
app.include_router(nextcloud_talk.router)
|
||||
app.include_router(files.router)
|
||||
app.mount("/static", StaticFiles(directory="static"), name="static")
|
||||
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ from llm_client import complete
|
||||
from session_logger import log_turn
|
||||
from session_store import load as load_session, save as save_session, list_all, generate_session_id
|
||||
from config import settings
|
||||
import event_bus
|
||||
|
||||
|
||||
router = APIRouter()
|
||||
@@ -141,6 +142,30 @@ async def replace_history(session_id: str, req: HistoryUpdate) -> dict:
|
||||
return {"ok": True, "session_id": session_id}
|
||||
|
||||
|
||||
@router.get("/events")
|
||||
async def sse_events() -> StreamingResponse:
|
||||
"""Server-sent events stream — pushes real-time Talk activity to the browser."""
|
||||
async def stream():
|
||||
q = event_bus.subscribe()
|
||||
try:
|
||||
while True:
|
||||
try:
|
||||
event = await asyncio.wait_for(q.get(), timeout=20)
|
||||
yield f"data: {json.dumps(event)}\n\n"
|
||||
except asyncio.TimeoutError:
|
||||
yield 'data: {"type":"keepalive"}\n\n'
|
||||
except (GeneratorExit, asyncio.CancelledError):
|
||||
pass
|
||||
finally:
|
||||
event_bus.unsubscribe(q)
|
||||
|
||||
return StreamingResponse(
|
||||
stream(),
|
||||
media_type="text/event-stream",
|
||||
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
|
||||
)
|
||||
|
||||
|
||||
@router.post("/note")
|
||||
async def add_note(req: NoteRequest) -> dict:
|
||||
"""Inject a public note into session history so the LLM sees it next turn."""
|
||||
|
||||
57
cortex/routers/files.py
Normal file
57
cortex/routers/files.py
Normal file
@@ -0,0 +1,57 @@
|
||||
"""
|
||||
Read/write the Inara identity markdown files.
|
||||
Only whitelisted filenames are accessible — no path traversal possible.
|
||||
"""
|
||||
from fastapi import APIRouter, HTTPException
|
||||
from pydantic import BaseModel
|
||||
from config import settings
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
ALLOWED = {
|
||||
"SOUL.md",
|
||||
"IDENTITY.md",
|
||||
"USER.md",
|
||||
"MEMORY.md",
|
||||
"PROTOCOLS.md",
|
||||
"CONTEXT_TIERS.md",
|
||||
}
|
||||
|
||||
|
||||
def _path(filename: str):
|
||||
if filename not in ALLOWED:
|
||||
raise HTTPException(status_code=404, detail=f"File not found: {filename}")
|
||||
return settings.inara_path() / filename
|
||||
|
||||
|
||||
@router.get("/files")
|
||||
async def list_files() -> dict:
|
||||
inara_dir = settings.inara_path()
|
||||
files = []
|
||||
for name in sorted(ALLOWED):
|
||||
p = inara_dir / name
|
||||
files.append({
|
||||
"name": name,
|
||||
"exists": p.exists(),
|
||||
"size": p.stat().st_size if p.exists() else 0,
|
||||
})
|
||||
return {"files": files}
|
||||
|
||||
|
||||
@router.get("/files/{filename}")
|
||||
async def get_file(filename: str) -> dict:
|
||||
p = _path(filename)
|
||||
if not p.exists():
|
||||
raise HTTPException(status_code=404, detail=f"{filename} does not exist")
|
||||
return {"name": filename, "content": p.read_text()}
|
||||
|
||||
|
||||
class FileWrite(BaseModel):
|
||||
content: str
|
||||
|
||||
|
||||
@router.put("/files/{filename}")
|
||||
async def save_file(filename: str, req: FileWrite) -> dict:
|
||||
p = _path(filename)
|
||||
p.write_text(req.content)
|
||||
return {"ok": True, "name": filename, "size": len(req.content)}
|
||||
@@ -13,6 +13,7 @@ from context_loader import load_context
|
||||
from llm_client import complete
|
||||
from session_logger import log_turn
|
||||
from session_store import load as load_session, save as save_session
|
||||
import event_bus
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
@@ -78,6 +79,14 @@ async def _process_message(conversation_token: str, user_text: str, actor_name:
|
||||
history = load_session(session_id)
|
||||
history.append({"role": "user", "content": user_text})
|
||||
|
||||
await event_bus.publish({
|
||||
"type": "nct_message",
|
||||
"session_id": session_id,
|
||||
"role": "user",
|
||||
"content": user_text,
|
||||
"actor": actor_name,
|
||||
})
|
||||
|
||||
try:
|
||||
response_text, backend = await asyncio.wait_for(
|
||||
complete(system_prompt=system_prompt, messages=history),
|
||||
@@ -96,6 +105,15 @@ async def _process_message(conversation_token: str, user_text: str, actor_name:
|
||||
history.append({"role": "assistant", "content": response_text})
|
||||
save_session(session_id, history)
|
||||
log_turn(session_id, user_text, response_text)
|
||||
|
||||
await event_bus.publish({
|
||||
"type": "nct_response",
|
||||
"session_id": session_id,
|
||||
"role": "assistant",
|
||||
"content": response_text,
|
||||
"backend": backend,
|
||||
})
|
||||
|
||||
await _send_reply(conversation_token, response_text)
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user