feat(web): orchestrator events read API + SSE stream

GET /api/v1/orchestrator/events — paginated list with optional
kind=traffic|file filter. GET /api/v1/orchestrator/events/stream —
SSE: snapshot on connect, live forward of orchestrator.> bus events
mapped to 'traffic' / 'file' SSE event names.

Repo gains list_orchestrator_events(limit, offset, kind?, since_ts?),
count_orchestrator_events(kind?), and prune_orchestrator_events
(per_dst_cap=10000) for periodic worker-side trimming.
This commit is contained in:
2026-04-26 19:58:12 -04:00
parent 900c0c3ef5
commit 5b5ff54fa2
11 changed files with 521 additions and 5 deletions

View File

@@ -55,6 +55,7 @@ from .health import (
)
from .orchestrator import (
OrchestratorEvent,
OrchestratorEventsResponse,
)
from .logs import (
Bounty,
@@ -186,6 +187,7 @@ __all__ = [
"HealthResponse",
# orchestrator
"OrchestratorEvent",
"OrchestratorEventsResponse",
# logs
"Bounty",
"BountyResponse",

View File

@@ -5,9 +5,10 @@ cleanly separable from synthetic life-injection events at query time.
The orchestrator worker is the sole writer.
"""
from datetime import datetime, timezone
from typing import Optional
from typing import Any, List, Optional
from uuid import uuid4
from pydantic import BaseModel
from sqlalchemy import Column, Index, Text
from sqlmodel import Field, SQLModel
@@ -50,3 +51,10 @@ class OrchestratorEvent(SQLModel, table=True):
payload: str = Field(
sa_column=Column("payload", Text, nullable=False, default="{}")
)
class OrchestratorEventsResponse(BaseModel):
    """Envelope for one page of orchestrator events.

    Carries the pagination window that was requested alongside the rows
    themselves, so a client can page without tracking state on its own.
    """

    # Number of rows matching the query, independent of the page window.
    total: int
    # Page size that was requested.
    limit: int
    # Number of rows skipped before this page.
    offset: int
    # The page itself: one plain dict per event row.
    data: List[dict[str, Any]]

View File

@@ -874,7 +874,31 @@ class BaseRepository(ABC):
raise NotImplementedError
async def list_orchestrator_events(
    self,
    limit: int = 100,
    offset: int = 0,
    *,
    kind: Optional[str] = None,
    since_ts: Optional[Any] = None,
) -> list[dict[str, Any]]:
    """Paginated orchestrator events newest-first.

    Args:
        limit: Maximum number of rows to return.
        offset: Number of rows to skip before the first returned row.
        kind: Filters to ``"traffic"`` | ``"file"`` (matches
            :class:`OrchestratorEvent.kind`); ``None`` disables the filter.
        since_ts: Snapshot delta filter used by SSE replay; leave
            ``None`` for the list view.

    Returns:
        One dict per event row.

    Raises:
        NotImplementedError: Always — this is the base-class stub that
            concrete repositories override.
    """
    raise NotImplementedError
async def count_orchestrator_events(
self, *, kind: Optional[str] = None,
) -> int:
"""Total orchestrator-event rows, optionally filtered by kind.

``kind`` is keyword-only; ``None`` means no kind filter is requested.
This base implementation is a stub that always raises
``NotImplementedError``.
"""
raise NotImplementedError
async def prune_orchestrator_events(self, per_dst_cap: int = 10000) -> int:
"""Trim per-``dst_decky_uuid`` rows to a cap. Returns deleted count.

Periodic prune target — keeps the orchestrator_events table from
unbounded growth without paying the cost on every write.

``per_dst_cap`` is the number of rows to retain for each distinct
``dst_decky_uuid``. This base implementation is a stub that always
raises ``NotImplementedError``.
"""
raise NotImplementedError

View File

@@ -2816,14 +2816,59 @@ class SQLModelRepository(BaseRepository):
async def list_orchestrator_events(
    self,
    limit: int = 100,
    offset: int = 0,
    *,
    kind: Optional[str] = None,
    since_ts: Optional[datetime] = None,
) -> list[dict[str, Any]]:
    """Return one page of orchestrator events, newest first.

    Args:
        limit: Maximum number of rows to return.
        offset: Number of rows to skip before the first returned row.
        kind: When not ``None``, only rows whose ``kind`` equals this
            value are returned.
        since_ts: When not ``None``, only rows with ``ts >= since_ts``
            are returned.

    Returns:
        Each matching row serialised with ``model_dump(mode="json")``.
    """
    stmt = select(OrchestratorEvent)
    if kind is not None:
        stmt = stmt.where(OrchestratorEvent.kind == kind)
    if since_ts is not None:
        stmt = stmt.where(OrchestratorEvent.ts >= since_ts)
    # ``ts`` alone is not a total order: rows sharing a timestamp could be
    # returned in a different order on each query, so OFFSET paging might
    # repeat or skip them. ``uuid`` breaks ties to keep pages stable.
    stmt = (
        stmt.order_by(
            desc(OrchestratorEvent.ts),
            desc(OrchestratorEvent.uuid),
        )
        .offset(offset)
        .limit(limit)
    )
    async with self._session() as session:
        result = await session.execute(stmt)
        # Serialise while the session is still open.
        return [r.model_dump(mode="json") for r in result.scalars().all()]
async def count_orchestrator_events(
    self, *, kind: Optional[str] = None,
) -> int:
    """Count orchestrator-event rows, optionally restricted to one kind.

    ``kind=None`` counts every row; otherwise only rows whose ``kind``
    equals the given value. A missing scalar result is reported as ``0``.
    """
    query = select(func.count()).select_from(OrchestratorEvent)
    if kind is not None:
        query = query.where(OrchestratorEvent.kind == kind)
    async with self._session() as session:
        outcome = await session.execute(query)
        total = outcome.scalar()
        return total or 0
async def prune_orchestrator_events(self, per_dst_cap: int = 10000) -> int:
    """Trim per-dst rows to *per_dst_cap*, keeping the newest. Returns deleted count.

    For every distinct ``dst_decky_uuid`` the newest ``per_dst_cap`` rows
    (by ``ts``, ties broken by ``uuid``) are kept and the rest are deleted.
    All deletions are committed together at the end.

    Only the rows *beyond* the cap are loaded, and they are deleted in
    bounded batches. Building a ``NOT IN`` list of every row to keep would
    bind ``per_dst_cap`` parameters in one statement, which can exceed a
    backend's bound-parameter limit (e.g. older SQLite builds).

    Args:
        per_dst_cap: Rows to retain per destination. A non-positive value
            prunes nothing and returns ``0``.

    Returns:
        Total number of rows deleted, as reported by the driver's
        ``rowcount``.
    """
    # Function-scope import, hoisted out of the per-destination loop.
    from sqlalchemy import delete as _delete

    if per_dst_cap <= 0:
        return 0

    # Upper bound on bind parameters per DELETE statement.
    batch_size = 500
    deleted = 0
    async with self._session() as session:
        dst_rows = await session.execute(
            select(OrchestratorEvent.dst_decky_uuid).distinct()
        )
        for (dst,) in dst_rows.all():
            # Skip the newest ``per_dst_cap`` rows; whatever remains is
            # the overflow to delete. Usually empty or small.
            overflow = await session.execute(
                select(OrchestratorEvent.uuid)
                .where(OrchestratorEvent.dst_decky_uuid == dst)
                .order_by(
                    desc(OrchestratorEvent.ts),
                    desc(OrchestratorEvent.uuid),
                )
                .offset(per_dst_cap)
            )
            stale_uuids = [u for (u,) in overflow.all()]
            for start in range(0, len(stale_uuids), batch_size):
                chunk = stale_uuids[start:start + batch_size]
                res = await session.execute(
                    _delete(OrchestratorEvent).where(
                        OrchestratorEvent.dst_decky_uuid == dst,
                        OrchestratorEvent.uuid.in_(chunk),
                    )
                )
                deleted += res.rowcount or 0
        await session.commit()
    return deleted

View File

@@ -29,6 +29,8 @@ from .campaigns.api_list_campaigns import router as campaigns_list_router
from .campaigns.api_get_campaign_detail import router as campaign_detail_router
from .campaigns.api_list_campaign_identities import router as campaign_identities_router
from .campaigns.api_events import router as campaign_events_router
from .orchestrator.api_list_events import router as orchestrator_list_router
from .orchestrator.api_events import router as orchestrator_events_router
from .transcripts import transcripts_router
from .config.api_get_config import router as config_get_router
from .config.api_update_config import router as config_update_router
@@ -104,6 +106,8 @@ api_router.include_router(campaigns_list_router)
api_router.include_router(campaign_detail_router)
api_router.include_router(campaign_identities_router)
api_router.include_router(campaign_events_router)
api_router.include_router(orchestrator_list_router)
api_router.include_router(orchestrator_events_router)
# Observability
api_router.include_router(stats_router)

View File

@@ -0,0 +1,123 @@
"""SSE stream of orchestrator events.
Subscribes to ``orchestrator.>`` for the duration of the request and
forwards each event as a Server-Sent Event. Emits a one-shot snapshot
on connect (latest 50 rows).
Mirror of :mod:`decnet.web.router.campaigns.api_events`.
"""
from __future__ import annotations
import asyncio
from typing import AsyncGenerator
import orjson
from fastapi import APIRouter, Depends, Request
from fastapi.responses import StreamingResponse
from decnet.bus import topics as _topics
from decnet.bus.app import get_app_bus
from decnet.logging import get_logger
from decnet.telemetry import traced as _traced
from decnet.web.dependencies import repo, require_stream_viewer
from decnet.web.sse_limits import sse_connection_slot
# Logger for the orchestrator SSE endpoint.
log = get_logger("api.orchestrator.events")
# Router carrying the orchestrator event-stream route.
router = APIRouter()
# Seconds to wait for a bus event (or to sleep when there is no bus)
# before emitting an SSE keepalive comment.
_KEEPALIVE_SECS = 15.0
# Number of rows requested for the one-shot snapshot sent on connect.
_SNAPSHOT_LIMIT = 50
def _format_sse(event_name: str, data: dict) -> str:
    """Render one Server-Sent Events frame.

    The frame has an ``event:`` line carrying *event_name*, a ``data:``
    line carrying *data* serialised with ``orjson``, and a terminating
    blank line.
    """
    encoded = orjson.dumps(data).decode()
    return f"event: {event_name}\ndata: {encoded}\n\n"
@router.get(
    "/orchestrator/events/stream",
    tags=["Orchestrator"],
    responses={
        200: {
            "content": {"text/event-stream": {}},
            "description": "SSE stream of orchestrator events",
        },
        401: {"description": "Could not validate credentials"},
        403: {"description": "Insufficient permissions"},
        429: {"description": "Per-user SSE connection cap reached"},
    },
)
@_traced("api.orchestrator.events")
async def api_orchestrator_events(
    request: Request,
    user: dict = Depends(require_stream_viewer),
) -> StreamingResponse:
    # Notes are kept in comments rather than a docstring, because FastAPI
    # publishes a handler's docstring as the OpenAPI description.
    #
    # Flow: read the latest _SNAPSHOT_LIMIT rows, then stream
    #   1. an initial keepalive comment,
    #   2. a "snapshot" event carrying those rows,
    #   3. every bus event on "<ORCHESTRATOR>.>", with a keepalive comment
    #      whenever _KEEPALIVE_SECS pass without one.
    # If no bus is available, only keepalives are sent until the client
    # disconnects.
    #
    # NOTE(review): the snapshot is read before the bus subscription is
    # opened, so an event published in between appears in neither the
    # snapshot nor the live stream — confirm this is acceptable.
    snapshot = await repo.list_orchestrator_events(
        limit=_SNAPSHOT_LIMIT, offset=0,
    )

    async def _stream() -> AsyncGenerator[str, None]:
        # The slot is held for the full lifetime of the stream.
        async with sse_connection_slot(user["uuid"]):
            yield ": keepalive\n\n"
            yield _format_sse("snapshot", {"events": snapshot})

            bus = await get_app_bus()
            if bus is None:
                # No bus: keep the connection alive until the client leaves.
                while not await request.is_disconnected():
                    try:
                        await asyncio.sleep(_KEEPALIVE_SECS)
                    except asyncio.CancelledError:
                        break
                    yield ": keepalive\n\n"
                return

            subscription = bus.subscribe(f"{_topics.ORCHESTRATOR}.>")
            try:
                async with subscription:
                    events = subscription.__aiter__()
                    while not await request.is_disconnected():
                        pending = asyncio.ensure_future(events.__anext__())
                        try:
                            event = await asyncio.wait_for(
                                pending, timeout=_KEEPALIVE_SECS,
                            )
                        except asyncio.TimeoutError:
                            # Quiet period: drop the pending read and
                            # send a keepalive.
                            pending.cancel()
                            yield ": keepalive\n\n"
                            continue
                        except StopAsyncIteration:
                            # Subscription exhausted: end the stream.
                            break
                        frame = {
                            "topic": event.topic,
                            "type": event.type,
                            "ts": event.ts,
                            "payload": event.payload,
                        }
                        yield _format_sse(_sse_name_for(event.topic), frame)
            except asyncio.CancelledError:
                # Cancellation ends the stream quietly.
                pass
            except Exception:
                log.exception("orchestrator events stream crashed")
                yield _format_sse("error", {"message": "Stream interrupted"})

    # Response headers, unchanged from the original inline dict.
    sse_headers = {
        "Cache-Control": "no-cache",
        "X-Accel-Buffering": "no",
    }
    return StreamingResponse(
        _stream(),
        media_type="text/event-stream",
        headers=sse_headers,
    )
def _sse_name_for(topic: str) -> str:
    """Map a bus topic to its SSE event name.

    ``orchestrator.traffic.<uuid>`` → ``traffic``;
    ``orchestrator.file.<uuid>`` → ``file``.

    The second dot-separated segment is returned when the first segment
    equals ``_topics.ORCHESTRATOR``; any other topic is returned unchanged.
    """
    root, dot, remainder = topic.partition(".")
    if dot and root == _topics.ORCHESTRATOR:
        # Second segment: everything up to the next dot, if there is one.
        return remainder.partition(".")[0]
    return topic

View File

@@ -0,0 +1,37 @@
"""GET /api/v1/orchestrator/events — paginated orchestrator activity.
Mirrors :mod:`decnet.web.router.campaigns.api_list_campaigns`. The
orchestrator worker is the sole writer; this surface is read-only.
"""
from typing import Any, Optional
from fastapi import APIRouter, Depends, Query
from decnet.telemetry import traced as _traced
from decnet.web.dependencies import repo, require_viewer
# Router carrying the paginated orchestrator-events route.
router = APIRouter()


@router.get(
    "/orchestrator/events",
    tags=["Orchestrator"],
    responses={
        401: {"description": "Could not validate credentials"},
        403: {"description": "Insufficient permissions"},
        422: {"description": "Validation error"},
    },
)
@_traced("api.list_orchestrator_events")
async def list_orchestrator_events(
    limit: int = Query(50, ge=1, le=1000),
    offset: int = Query(0, ge=0, le=2147483647),
    kind: Optional[str] = Query(None, pattern="^(traffic|file)$"),
    user: dict = Depends(require_viewer),
) -> dict[str, Any]:
    """Paginated orchestrator-event list, newest first."""
    # The docstring above is left untouched because FastAPI publishes it
    # as the OpenAPI description.
    #
    # The page is fetched first, then the total for the same ``kind``
    # filter. These are two separate repository calls, so ``total`` can
    # differ from what the page saw if rows are written in between.
    page = await repo.list_orchestrator_events(
        limit=limit,
        offset=offset,
        kind=kind,
    )
    matching = await repo.count_orchestrator_events(kind=kind)
    envelope: dict[str, Any] = {
        "total": matching,
        "limit": limit,
        "offset": offset,
        "data": page,
    }
    return envelope