diff --git a/decnet/web/db/models/__init__.py b/decnet/web/db/models/__init__.py index dcdcb1fe..9907cde8 100644 --- a/decnet/web/db/models/__init__.py +++ b/decnet/web/db/models/__init__.py @@ -55,6 +55,7 @@ from .health import ( ) from .orchestrator import ( OrchestratorEvent, + OrchestratorEventsResponse, ) from .logs import ( Bounty, @@ -186,6 +187,7 @@ __all__ = [ "HealthResponse", # orchestrator "OrchestratorEvent", + "OrchestratorEventsResponse", # logs "Bounty", "BountyResponse", diff --git a/decnet/web/db/models/orchestrator.py b/decnet/web/db/models/orchestrator.py index cd02087e..ac71b7ba 100644 --- a/decnet/web/db/models/orchestrator.py +++ b/decnet/web/db/models/orchestrator.py @@ -5,9 +5,10 @@ cleanly separable from synthetic life-injection events at query time. The orchestrator worker is the sole writer. """ from datetime import datetime, timezone -from typing import Optional +from typing import Any, List, Optional from uuid import uuid4 +from pydantic import BaseModel from sqlalchemy import Column, Index, Text from sqlmodel import Field, SQLModel @@ -50,3 +51,10 @@ class OrchestratorEvent(SQLModel, table=True): payload: str = Field( sa_column=Column("payload", Text, nullable=False, default="{}") ) + + +class OrchestratorEventsResponse(BaseModel): + total: int + limit: int + offset: int + data: List[dict[str, Any]] diff --git a/decnet/web/db/repository.py b/decnet/web/db/repository.py index 1959a590..65864fc9 100644 --- a/decnet/web/db/repository.py +++ b/decnet/web/db/repository.py @@ -874,7 +874,31 @@ class BaseRepository(ABC): raise NotImplementedError async def list_orchestrator_events( - self, *, limit: int = 100, since_ts: Optional[Any] = None + self, + limit: int = 100, + offset: int = 0, + *, + kind: Optional[str] = None, + since_ts: Optional[Any] = None, ) -> list[dict[str, Any]]: - """Return recent orchestrator events newest-first.""" + """Paginated orchestrator events newest-first. + + ``kind`` filters to ``"traffic"`` | ``"file"`` (matches + :class:`OrchestratorEvent.kind`). ``since_ts`` is the snapshot + delta filter used by SSE replay; leave ``None`` for the list view. + """ + raise NotImplementedError + + async def count_orchestrator_events( + self, *, kind: Optional[str] = None, + ) -> int: + """Total orchestrator-event rows, optionally filtered by kind.""" + raise NotImplementedError + + async def prune_orchestrator_events(self, per_dst_cap: int = 10000) -> int: + """Trim per-``dst_decky_uuid`` rows to a cap. Returns deleted count. + + Periodic prune target — keeps the orchestrator_events table from + unbounded growth without paying the cost on every write. + """ raise NotImplementedError diff --git a/decnet/web/db/sqlmodel_repo.py b/decnet/web/db/sqlmodel_repo.py index aa9d82f1..75f03ae7 100644 --- a/decnet/web/db/sqlmodel_repo.py +++ b/decnet/web/db/sqlmodel_repo.py @@ -2816,14 +2816,59 @@ class SQLModelRepository(BaseRepository): async def list_orchestrator_events( self, - *, limit: int = 100, + offset: int = 0, + *, + kind: Optional[str] = None, since_ts: Optional[datetime] = None, ) -> list[dict[str, Any]]: async with self._session() as session: stmt = select(OrchestratorEvent) + if kind is not None: + stmt = stmt.where(OrchestratorEvent.kind == kind) if since_ts is not None: stmt = stmt.where(OrchestratorEvent.ts >= since_ts) - stmt = stmt.order_by(desc(OrchestratorEvent.ts)).limit(limit) + stmt = ( + stmt.order_by(desc(OrchestratorEvent.ts)) + .offset(offset) + .limit(limit) + ) result = await session.execute(stmt) return [r.model_dump(mode="json") for r in result.scalars().all()] + + async def count_orchestrator_events( + self, *, kind: Optional[str] = None, + ) -> int: + stmt = select(func.count()).select_from(OrchestratorEvent) + if kind is not None: + stmt = stmt.where(OrchestratorEvent.kind == kind) + async with self._session() as session: + result = await session.execute(stmt) + return result.scalar() or 0 + + async def prune_orchestrator_events(self, per_dst_cap: int = 10000) -> int: + """Trim per-dst rows to *per_dst_cap*, oldest-first. Returns deleted count.""" + deleted = 0 + async with self._session() as session: + dst_rows = await session.execute( + select(OrchestratorEvent.dst_decky_uuid).distinct() + ) + for (dst,) in dst_rows.all(): + keep = await session.execute( + select(OrchestratorEvent.uuid) + .where(OrchestratorEvent.dst_decky_uuid == dst) + .order_by(desc(OrchestratorEvent.ts)) + .limit(per_dst_cap) + ) + keep_uuids = [u for (u,) in keep.all()] + if not keep_uuids: + continue + from sqlalchemy import delete as _delete + stmt = _delete(OrchestratorEvent).where( + OrchestratorEvent.dst_decky_uuid == dst, + OrchestratorEvent.uuid.notin_(keep_uuids), + ) + res = await session.execute(stmt) + deleted += res.rowcount or 0 + await session.commit() + return deleted diff --git a/decnet/web/router/__init__.py b/decnet/web/router/__init__.py index f61789d3..8abb4443 100644 --- a/decnet/web/router/__init__.py +++ b/decnet/web/router/__init__.py @@ -29,6 +29,8 @@ from .campaigns.api_list_campaigns import router as campaigns_list_router from .campaigns.api_get_campaign_detail import router as campaign_detail_router from .campaigns.api_list_campaign_identities import router as campaign_identities_router from .campaigns.api_events import router as campaign_events_router +from .orchestrator.api_list_events import router as orchestrator_list_router +from .orchestrator.api_events import router as orchestrator_events_router from .transcripts import transcripts_router from .config.api_get_config import router as config_get_router from .config.api_update_config import router as config_update_router @@ -104,6 +106,8 @@ api_router.include_router(campaigns_list_router) api_router.include_router(campaign_detail_router) api_router.include_router(campaign_identities_router) api_router.include_router(campaign_events_router) +api_router.include_router(orchestrator_list_router) +api_router.include_router(orchestrator_events_router) # Observability api_router.include_router(stats_router) diff --git a/decnet/web/router/orchestrator/__init__.py b/decnet/web/router/orchestrator/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/decnet/web/router/orchestrator/api_events.py b/decnet/web/router/orchestrator/api_events.py new file mode 100644 index 00000000..465a7a2b --- /dev/null +++ b/decnet/web/router/orchestrator/api_events.py @@ -0,0 +1,123 @@ +"""SSE stream of orchestrator events. + +Subscribes to ``orchestrator.>`` for the duration of the request and +forwards each event as a Server-Sent Event. Emits a one-shot snapshot +on connect (latest 50 rows). + +Mirror of :mod:`decnet.web.router.campaigns.api_events`. +""" +from __future__ import annotations + +import asyncio +from typing import AsyncGenerator + +import orjson +from fastapi import APIRouter, Depends, Request +from fastapi.responses import StreamingResponse + +from decnet.bus import topics as _topics +from decnet.bus.app import get_app_bus +from decnet.logging import get_logger +from decnet.telemetry import traced as _traced +from decnet.web.dependencies import repo, require_stream_viewer +from decnet.web.sse_limits import sse_connection_slot + +log = get_logger("api.orchestrator.events") + +router = APIRouter() + +_KEEPALIVE_SECS = 15.0 +_SNAPSHOT_LIMIT = 50 + + +def _format_sse(event_name: str, data: dict) -> str: + return f"event: {event_name}\ndata: {orjson.dumps(data).decode()}\n\n" + + +@router.get( + "/orchestrator/events/stream", + tags=["Orchestrator"], + responses={ + 200: { + "content": {"text/event-stream": {}}, + "description": "SSE stream of orchestrator events", + }, + 401: {"description": "Could not validate credentials"}, + 403: {"description": "Insufficient permissions"}, + 429: {"description": "Per-user SSE connection cap reached"}, + }, +) +@_traced("api.orchestrator.events") +async def api_orchestrator_events( + request: Request, + user: dict = Depends(require_stream_viewer), +) -> StreamingResponse: + snapshot = await repo.list_orchestrator_events( + limit=_SNAPSHOT_LIMIT, offset=0, + ) + + async def generator() -> AsyncGenerator[str, None]: + async with sse_connection_slot(user["uuid"]): + yield ": keepalive\n\n" + yield _format_sse("snapshot", {"events": snapshot}) + + bus = await get_app_bus() + if bus is None: + while not await request.is_disconnected(): + try: + await asyncio.sleep(_KEEPALIVE_SECS) + except asyncio.CancelledError: + break + yield ": keepalive\n\n" + return + + sub = bus.subscribe(f"{_topics.ORCHESTRATOR}.>") + try: + async with sub: + sub_iter = sub.__aiter__() + while True: + if await request.is_disconnected(): + break + next_task = asyncio.ensure_future(sub_iter.__anext__()) + try: + event = await asyncio.wait_for( + next_task, timeout=_KEEPALIVE_SECS, + ) + except asyncio.TimeoutError: + next_task.cancel() + yield ": keepalive\n\n" + continue + except StopAsyncIteration: + break + yield _format_sse( + _sse_name_for(event.topic), + { + "topic": event.topic, + "type": event.type, + "ts": event.ts, + "payload": event.payload, + }, + ) + except asyncio.CancelledError: + pass + except Exception: + log.exception("orchestrator events stream crashed") + yield _format_sse("error", {"message": "Stream interrupted"}) + + return StreamingResponse( + generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "X-Accel-Buffering": "no", + }, + ) + + +def _sse_name_for(topic: str) -> str: + """``orchestrator.traffic.`` → ``traffic``; + ``orchestrator.file.`` → ``file``.""" + parts = topic.split(".", 2) + if len(parts) >= 2 and parts[0] == _topics.ORCHESTRATOR: + return parts[1] + return topic diff --git a/decnet/web/router/orchestrator/api_list_events.py b/decnet/web/router/orchestrator/api_list_events.py new file mode 100644 index 00000000..22838915 --- /dev/null +++ b/decnet/web/router/orchestrator/api_list_events.py @@ -0,0 +1,37 @@ +"""GET /api/v1/orchestrator/events — paginated orchestrator activity. + +Mirrors :mod:`decnet.web.router.campaigns.api_list_campaigns`. The +orchestrator worker is the sole writer; this surface is read-only. +""" +from typing import Any, Optional + +from fastapi import APIRouter, Depends, Query + +from decnet.telemetry import traced as _traced +from decnet.web.dependencies import repo, require_viewer + +router = APIRouter() + + +@router.get( + "/orchestrator/events", + tags=["Orchestrator"], + responses={ + 401: {"description": "Could not validate credentials"}, + 403: {"description": "Insufficient permissions"}, + 422: {"description": "Validation error"}, + }, +) +@_traced("api.list_orchestrator_events") +async def list_orchestrator_events( + limit: int = Query(50, ge=1, le=1000), + offset: int = Query(0, ge=0, le=2147483647), + kind: Optional[str] = Query(None, pattern="^(traffic|file)$"), + user: dict = Depends(require_viewer), +) -> dict[str, Any]: + """Paginated orchestrator-event list, newest first.""" + data = await repo.list_orchestrator_events( + limit=limit, offset=offset, kind=kind, + ) + total = await repo.count_orchestrator_events(kind=kind) + return {"total": total, "limit": limit, "offset": offset, "data": data} diff --git a/tests/api/orchestrator/__init__.py b/tests/api/orchestrator/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/api/orchestrator/test_events_stream.py b/tests/api/orchestrator/test_events_stream.py new file mode 100644 index 00000000..e20fc05e --- /dev/null +++ b/tests/api/orchestrator/test_events_stream.py @@ -0,0 +1,166 @@ +"""SSE events stream + list — /api/v1/orchestrator/events{,/stream}.""" +from __future__ import annotations + +import asyncio +from unittest.mock import AsyncMock, patch + +import httpx +import pytest + +from decnet.bus import app as _bus_app +from decnet.bus import topics as _topics +from decnet.bus.fake import FakeBus +from decnet.web.api import app + +_V1 = "/api/v1/orchestrator" + + +@pytest.fixture +def _fake_app_bus(monkeypatch): + bus = FakeBus() + + async def _get() -> FakeBus: + if not bus._connected: + await bus.connect() + return bus + + monkeypatch.setattr(_bus_app, "get_app_bus", _get) + from decnet.web.router.orchestrator import api_events as _ev + monkeypatch.setattr(_ev, "get_app_bus", _get) + return bus + + +@pytest.mark.anyio +async def test_events_unauthenticated_401(): + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=app), base_url="http://test", + ) as ac: + r = await ac.get(f"{_V1}/events") + assert r.status_code == 401 + + +@pytest.mark.anyio +async def test_stream_unauthenticated_401(): + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=app), base_url="http://test", + ) as ac: + r = await ac.get(f"{_V1}/events/stream") + assert r.status_code == 401 + + +@pytest.mark.asyncio +async def test_list_returns_paginated_envelope(): + from decnet.web.router.orchestrator.api_list_events import ( + list_orchestrator_events, + ) + + rows = [{"uuid": f"e-{n}", "kind": "traffic"} for n in range(3)] + with patch( + "decnet.web.router.orchestrator.api_list_events.repo" + ) as mock_repo: + mock_repo.list_orchestrator_events = AsyncMock(return_value=rows) + mock_repo.count_orchestrator_events = AsyncMock(return_value=3) + + result = await list_orchestrator_events( + limit=50, offset=0, kind=None, + user={"uuid": "u", "role": "viewer"}, + ) + + assert result == {"total": 3, "limit": 50, "offset": 0, "data": rows} + mock_repo.list_orchestrator_events.assert_awaited_once_with( + limit=50, offset=0, kind=None, + ) + + +@pytest.mark.asyncio +async def test_list_forwards_kind_filter(): + from decnet.web.router.orchestrator.api_list_events import ( + list_orchestrator_events, + ) + + with patch( + "decnet.web.router.orchestrator.api_list_events.repo" + ) as mock_repo: + mock_repo.list_orchestrator_events = AsyncMock(return_value=[]) + mock_repo.count_orchestrator_events = AsyncMock(return_value=0) + + await list_orchestrator_events( + limit=10, offset=20, kind="file", + user={"uuid": "u", "role": "viewer"}, + ) + + mock_repo.list_orchestrator_events.assert_awaited_once_with( + limit=10, offset=20, kind="file", + ) + mock_repo.count_orchestrator_events.assert_awaited_once_with(kind="file") + + +@pytest.mark.anyio +async def test_stream_emits_snapshot_and_live_events(_fake_app_bus): + from decnet.web.router.orchestrator import api_events as _ev + + class _FakeRequest: + async def is_disconnected(self) -> bool: + return False + + with patch( + "decnet.web.router.orchestrator.api_events.repo" + ) as mock_repo: + mock_repo.list_orchestrator_events = AsyncMock(return_value=[]) + response = await _ev.api_orchestrator_events( + request=_FakeRequest(), # type: ignore[arg-type] + user={"role": "admin", "uuid": "00000000-0000-0000-0000-000000000000"}, + ) + + gen = response.body_iterator + + def _as_text(frame) -> str: + return frame if isinstance(frame, str) else frame.decode() + + async def _publish_after_snapshot() -> None: + await asyncio.sleep(0.1) + await _fake_app_bus.publish( + _topics.orchestrator(_topics.ORCHESTRATOR_TRAFFIC, "decky-1"), + {"action": "exec:uptime", "success": True}, + event_type=_topics.ORCHESTRATOR_TRAFFIC, + ) + await asyncio.sleep(0.05) + await _fake_app_bus.publish( + _topics.orchestrator(_topics.ORCHESTRATOR_FILE, "decky-1"), + {"action": "file:create", "success": True}, + event_type=_topics.ORCHESTRATOR_FILE, + ) + + pub_task = asyncio.create_task(_publish_after_snapshot()) + + async def _drive(): + saw = {"snapshot": False, "traffic": False, "file": False} + for _ in range(8): + frame = _as_text(await gen.__anext__()) + for key in saw: + if f"event: {key}" in frame: + saw[key] = True + if all(saw.values()): + break + return saw + + try: + seen = await asyncio.wait_for(_drive(), timeout=5.0) + finally: + pub_task.cancel() + try: + await pub_task + except (asyncio.CancelledError, Exception): + pass + await gen.aclose() + + assert seen["snapshot"] + assert seen["traffic"] + assert seen["file"] + + +def test_sse_name_maps_topic_to_kind(): + from decnet.web.router.orchestrator.api_events import _sse_name_for + assert _sse_name_for("orchestrator.traffic.decky-1") == "traffic" + assert _sse_name_for("orchestrator.file.decky-1") == "file" + assert _sse_name_for("system.bus.health") == "system.bus.health" diff --git a/tests/orchestrator/test_repo_pagination.py b/tests/orchestrator/test_repo_pagination.py new file mode 100644 index 00000000..e28ff2e9 --- /dev/null +++ b/tests/orchestrator/test_repo_pagination.py @@ -0,0 +1,107 @@ +"""Pagination + filter + prune for orchestrator_events repo methods.""" +from __future__ import annotations + +import json + +import pytest + +from decnet.web.db.models import Topology, TopologyDecky +from decnet.web.db.sqlite.repository import SQLiteRepository + + +async def _make_repo(tmp_path, name: str) -> SQLiteRepository: + r = SQLiteRepository(db_path=str(tmp_path / name)) + await r.initialize() + return r + + +@pytest.mark.asyncio +async def test_empty_table_zero_total(tmp_path): + repo = await _make_repo(tmp_path, "orch.db") + assert await repo.list_orchestrator_events(limit=50, offset=0) == [] + assert await repo.count_orchestrator_events() == 0 + + +async def _seed_decky(repo: SQLiteRepository, name: str = "d-1") -> str: + async with repo._session() as session: + topo = Topology(name=f"t-{name}", config_snapshot="{}", status="active") + session.add(topo) + await session.commit() + await session.refresh(topo) + d = TopologyDecky( + topology_id=topo.id, name=name, + services=json.dumps(["ssh"]), ip="10.0.0.2", state="running", + ) + session.add(d) + await session.commit() + await session.refresh(d) + return d.uuid + + +async def _seed( + repo: SQLiteRepository, + n: int = 5, + kind: str = "traffic", + dst: str | None = None, +) -> str: + if dst is None: + dst = await _seed_decky(repo, "decky-A") + for i in range(n): + await repo.record_orchestrator_event({ + "kind": kind, + "protocol": "ssh", + "action": f"exec:{i}", + "src_decky_uuid": None, + "dst_decky_uuid": dst, + "success": True, + "payload": {"i": i}, + }) + return dst + + +@pytest.mark.asyncio +async def test_pagination_respects_limit_offset(tmp_path): + repo = await _make_repo(tmp_path, "p.db") + await _seed(repo, n=5) + + assert await repo.count_orchestrator_events() == 5 + page1 = await repo.list_orchestrator_events(limit=2, offset=0) + page2 = await repo.list_orchestrator_events(limit=2, offset=2) + assert len(page1) == 2 + assert len(page2) == 2 + assert {r["uuid"] for r in page1}.isdisjoint({r["uuid"] for r in page2}) + + +@pytest.mark.asyncio +async def test_kind_filter_narrows(tmp_path): + repo = await _make_repo(tmp_path, "k.db") + dst = await _seed_decky(repo, "decky-K") + for i in range(3): + await repo.record_orchestrator_event({ + "kind": "traffic", "protocol": "ssh", "action": f"a{i}", + "src_decky_uuid": None, "dst_decky_uuid": dst, + "success": True, "payload": {}, + }) + for i in range(2): + await repo.record_orchestrator_event({ + "kind": "file", "protocol": "ssh", "action": f"f{i}", + "src_decky_uuid": None, "dst_decky_uuid": dst, + "success": True, "payload": {}, + }) + + assert await repo.count_orchestrator_events() == 5 + assert await repo.count_orchestrator_events(kind="traffic") == 3 + assert await repo.count_orchestrator_events(kind="file") == 2 + + only_file = await repo.list_orchestrator_events(limit=50, kind="file") + assert {r["kind"] for r in only_file} == {"file"} + + +@pytest.mark.asyncio +async def test_prune_caps_per_dst(tmp_path): + repo = await _make_repo(tmp_path, "prune.db") + await _seed(repo, n=10) + + deleted = await repo.prune_orchestrator_events(per_dst_cap=3) + assert deleted == 7 + assert await repo.count_orchestrator_events() == 3