feat(orchestrator): authoritative failure-count badge endpoint (DEBT-042)

New GET /api/v1/orchestrator/events/stats?since=1h&success=false&kind=...
backed by repo.count_orchestrator_failures(since_ts, kind), which
counts failed rows across both orchestrator_events and
orchestrator_emails since the cutoff.

Window parser accepts ^\d+[smhd]$, capped at 7d. Today only
success=false is accepted on this surface so the endpoint isn't
accidentally repurposed before the next consumer is properly
designed.

Orchestrator.tsx polls the endpoint on mount + every 30 s and
renders the authoritative DB-derived count instead of deriving from
the in-memory SSE buffer + one paginated page (which silently
excluded failures older than the local window).
This commit is contained in:
2026-05-03 05:26:45 -04:00
parent 866a76eccf
commit 03beff3840
9 changed files with 431 additions and 28 deletions

View File

@@ -1131,6 +1131,17 @@ class BaseRepository(ABC):
"""Total orchestrator-event rows, optionally filtered by kind."""
raise NotImplementedError
async def count_orchestrator_failures(
self,
*,
since_ts: Any,
kind: Optional[str] = None,
) -> int:
"""Count failed orchestrator activity since *since_ts*, across
both event + email tables. Backs the dashboard's failure-count
badge (DEBT-042)."""
raise NotImplementedError
async def prune_orchestrator_events(self, per_dst_cap: int = 10000) -> int:
"""Trim per-``dst_decky_uuid`` rows to a cap. Returns deleted count.

View File

@@ -60,6 +60,46 @@ class OrchestratorMixin(_MixinBase):
result = await session.execute(stmt)
return result.scalar() or 0
async def count_orchestrator_failures(
self,
*,
since_ts: datetime,
kind: Optional[str] = None,
) -> int:
"""Count failed orchestrator activity since *since_ts*, across
both ``orchestrator_events`` (traffic / file) and
``orchestrator_emails`` (email).
Backs the dashboard's failure-count badge — see DEBT-042. The
in-memory window the badge previously computed against was
bounded by the SSE-buffer + paginated page, so failures older
than the local window read low. This is the authoritative count.
"""
async with self._session() as session:
ev_stmt = (
select(func.count()).select_from(OrchestratorEvent)
.where(
col(OrchestratorEvent.success).is_(False),
OrchestratorEvent.ts >= since_ts,
)
)
if kind in ("traffic", "file"):
ev_stmt = ev_stmt.where(OrchestratorEvent.kind == kind)
em_stmt = (
select(func.count()).select_from(OrchestratorEmail)
.where(
col(OrchestratorEmail.success).is_(False),
OrchestratorEmail.ts >= since_ts,
)
)
ev_count = 0
em_count = 0
if kind in (None, "traffic", "file"):
ev_count = (await session.execute(ev_stmt)).scalar() or 0
if kind in (None, "email"):
em_count = (await session.execute(em_stmt)).scalar() or 0
return ev_count + em_count
async def prune_orchestrator_events(self, per_dst_cap: int = 10000) -> int:
"""Trim per-dst rows to *per_dst_cap*, oldest-first. Returns deleted count."""
deleted = 0

View File

@@ -32,6 +32,7 @@ from .campaigns.api_list_campaign_identities import router as campaign_identitie
from .campaigns.api_events import router as campaign_events_router
from .orchestrator.api_list_events import router as orchestrator_list_router
from .orchestrator.api_events import router as orchestrator_events_router
from .orchestrator.api_event_stats import router as orchestrator_stats_router
from .realism.api_config import router as realism_config_router
from .realism.api_personas import router as realism_personas_router
from .realism.api_synthetic_files import router as realism_synthetic_files_router
@@ -123,6 +124,7 @@ api_router.include_router(campaign_identities_router)
api_router.include_router(campaign_events_router)
api_router.include_router(orchestrator_list_router)
api_router.include_router(orchestrator_events_router)
api_router.include_router(orchestrator_stats_router)
# Realism — global persona pool CRUD for the dashboard's
# "Persona Generation" page. The orchestrator reads from the same

View File

@@ -0,0 +1,99 @@
"""GET /api/v1/orchestrator/events/stats — authoritative failure count.
The dashboard's failure-count badge previously derived its number from
the in-memory SSE buffer + a single paginated page (capped at 500 +
limit rows). On busy fleets, failures older than the local window
were silently excluded and the badge read low — see DEBT-042. This
endpoint returns the real count straight from the DB so the badge
matches reality.
"""
from __future__ import annotations
import re
from datetime import datetime, timedelta, timezone
from typing import Any, Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from decnet.telemetry import traced as _traced
from decnet.web.dependencies import repo, require_viewer
router = APIRouter()
_SINCE_RE = re.compile(r"^(\d+)([smhd])$")
# Bounded to avoid unintentionally-expensive scans. 7d covers the
# operator UX use case (failure-count badge) while still returning
# in O(index seek + count).
_MAX_SINCE = timedelta(days=7)
def _parse_since(s: str) -> timedelta:
m = _SINCE_RE.match(s)
if not m:
raise HTTPException(
status_code=422,
detail="since must match ^(\\d+)[smhd]$ (e.g. '15m', '1h', '24h', '7d')",
)
value, unit = int(m.group(1)), m.group(2)
if value <= 0:
raise HTTPException(status_code=422, detail="since must be > 0")
delta = {
"s": timedelta(seconds=value),
"m": timedelta(minutes=value),
"h": timedelta(hours=value),
"d": timedelta(days=value),
}[unit]
if delta > _MAX_SINCE:
raise HTTPException(
status_code=422,
detail=f"since exceeds maximum window of {_MAX_SINCE}",
)
return delta
@router.get(
"/orchestrator/events/stats",
tags=["Orchestrator"],
responses={
401: {"description": "Could not validate credentials"},
403: {"description": "Insufficient permissions"},
422: {"description": "Validation error"},
},
)
@_traced("api.orchestrator.events.stats")
async def orchestrator_event_stats(
since: str = Query("1h", description="Window relative to now, e.g. '15m', '1h', '24h'."),
success: Optional[bool] = Query(
None,
description="If set, restrict the count to rows with this success value.",
),
kind: Optional[str] = Query(
None, pattern="^(traffic|file|email)$",
),
user: dict = Depends(require_viewer),
) -> dict[str, Any]:
"""Aggregate counts for the orchestrator activity feed.
Today only the failure-count badge consumes this surface, so the
only supported aggregate is ``success=false`` (everything else is
rejected — ``success=true`` and the unfiltered total can be served
by the existing ``count`` on the list endpoint without a window
filter, and we'd rather not paint ourselves into a corner before
the next consumer shows up).
"""
if success is not False:
raise HTTPException(
status_code=422,
detail="only success=false is supported on this surface today",
)
delta = _parse_since(since)
since_ts = datetime.now(timezone.utc) - delta
count = await repo.count_orchestrator_failures(
since_ts=since_ts, kind=kind,
)
return {
"since": since,
"success": success,
"kind": kind,
"count": count,
}

View File

@@ -26,6 +26,26 @@ vi.mock('./useOrchestratorStream', () => ({
import api from '../utils/api';
const apiGet = api.get as ReturnType<typeof vi.fn>;
/** Two endpoints fire at mount — events list + failure-count stats.
* This dispatcher maps URLs to canned responses so per-test cases stay
* focused on the path they care about. */
const buildApiResponder = (overrides: {
events?: { data: unknown[]; total: number };
failures?: number;
} = {}) => {
const events = overrides.events ?? { data: [], total: 0 };
const failures = overrides.failures ?? 0;
return (url: string) => {
if (url.startsWith('/orchestrator/events/stats')) {
return Promise.resolve({ data: { count: failures } });
}
if (url.startsWith('/orchestrator/events')) {
return Promise.resolve({ data: events });
}
return Promise.resolve({ data: {} });
};
};
const renderPage = () =>
render(
<MemoryRouter initialEntries={['/orchestrator']}>
@@ -40,7 +60,7 @@ describe('Orchestrator', () => {
});
it('renders the empty state when the API returns no events', async () => {
apiGet.mockResolvedValueOnce({ data: { data: [], total: 0 } });
apiGet.mockImplementation(buildApiResponder());
renderPage();
@@ -50,11 +70,14 @@ describe('Orchestrator', () => {
});
it('switches the kind filter and refetches scoped to that kind', async () => {
apiGet.mockResolvedValue({ data: { data: [], total: 0 } });
apiGet.mockImplementation(buildApiResponder());
renderPage();
await waitFor(() => expect(apiGet).toHaveBeenCalledTimes(1));
expect(apiGet.mock.calls[0][0]).toMatch(/^\/orchestrator\/events\?limit=50&offset=0$/);
await waitFor(() =>
expect(
apiGet.mock.calls.some((c) => /^\/orchestrator\/events\?limit=50&offset=0$/.test(c[0])),
).toBe(true),
);
await userEvent.click(screen.getByRole('tab', { name: /^email$/ }));
@@ -65,7 +88,7 @@ describe('Orchestrator', () => {
});
it('prepends a row when the live stream pushes a traffic event', async () => {
apiGet.mockResolvedValueOnce({ data: { data: [], total: 0 } });
apiGet.mockImplementation(buildApiResponder());
renderPage();
await waitFor(() => expect(capturedOnEvent).not.toBeNull());
@@ -90,4 +113,31 @@ describe('Orchestrator', () => {
// 1 event shown after a single push.
expect(screen.getByText(/1 EVENTS SHOWN/i)).toBeInTheDocument();
});
it('renders the failure-count badge from the stats endpoint (DEBT-042)', async () => {
apiGet.mockImplementation(buildApiResponder({ failures: 42 }));
renderPage();
expect(await screen.findByText(/42 FAILURES \/ 1H/i)).toBeInTheDocument();
// Stats endpoint is the authoritative source — verify it was actually queried.
expect(
apiGet.mock.calls.some((c) =>
/\/orchestrator\/events\/stats\?since=1h&success=false/.test(c[0]),
),
).toBe(true);
});
it('hides the failure-count badge when the stats endpoint reports zero', async () => {
apiGet.mockImplementation(buildApiResponder({ failures: 0 }));
renderPage();
await waitFor(() =>
expect(
apiGet.mock.calls.some((c) => /\/orchestrator\/events\/stats/.test(c[0])),
).toBe(true),
);
expect(screen.queryByText(/FAILURES \/ 1H/i)).not.toBeInTheDocument();
});
});

View File

@@ -36,7 +36,6 @@ type KindFilter = 'all' | 'traffic' | 'file' | 'email';
type StreamStatus = 'connecting' | 'live' | 'error';
const ROW_CAP = 500;
const HOUR_MS = 60 * 60 * 1000;
const FRESH_MS = 5_000;
const timeAgo = (dateStr: string | null): string => {
@@ -64,6 +63,7 @@ const Orchestrator: React.FC = () => {
const [paused, setPaused] = useState(false);
const [now, setNow] = useState(Date.now());
const [selected, setSelected] = useState<OrchestratorEntry | null>(null);
const [failuresLastHour, setFailuresLastHour] = useState(0);
const limit = 50;
const pausedRef = useRef(paused);
@@ -75,6 +75,27 @@ const Orchestrator: React.FC = () => {
return () => clearInterval(t);
}, []);
// Authoritative failure count from the DB — see DEBT-042. The
// in-memory derivation it replaced was bounded by the SSE buffer +
// one paginated page, so failures older than the local window were
// silently excluded and the badge read low on busy fleets.
useEffect(() => {
let cancelled = false;
const fetchStats = async () => {
try {
const res = await api.get(
'/orchestrator/events/stats?since=1h&success=false',
);
if (!cancelled) setFailuresLastHour(res.data?.count ?? 0);
} catch {
// Silent: the badge is a hint, missing data shouldn't blow up the page.
}
};
fetchStats();
const t = setInterval(fetchStats, 30_000);
return () => { cancelled = true; clearInterval(t); };
}, []);
const fetchEvents = async () => {
setLoading(true);
try {
@@ -163,13 +184,6 @@ const Orchestrator: React.FC = () => {
return merged.filter((r) => r.kind === kindParam);
}, [streamRows, rows, kindParam]);
const failuresLastHour = useMemo(() => {
const cutoff = now - HOUR_MS;
return [...streamRows, ...rows].filter(
(r) => !r.success && new Date(r.ts).getTime() >= cutoff,
).length;
}, [streamRows, rows, now]);
const statusLabel =
status === 'live' ? 'LIVE'
: status === 'connecting' ? 'CONNECTING'

View File

@@ -498,19 +498,21 @@ Resolved 2026-05-03. All base images now carry `image:tag@sha256:<digest>` refer
~~**Files:** Project root~~
`requirements.lock` generated via `pip freeze`. Reproducible installs now available via `pip install -r requirements.lock`.
### DEBT-042 — Orchestrator failure-count badge is window-bound
**File:** `decnet_web/src/components/Orchestrator.tsx`
The "X failures / 1h" header badge is computed from the in-memory SSE
window (capped at 500 rows merged with one paginated server page). On
busy fleets — many deckies × dense activity — failures older than the
local window or beyond the visible page are silently excluded, so the
badge can read low. Acceptable for MVP; the badge is a hint, not a
metric.
**Remediation:** add a dedicated count endpoint
(`GET /api/v1/orchestrator/events/stats?since=1h&success=false`) and
have the badge call it on the same cadence the page already polls.
Trigger: first time the count visibly diverges from a hand-checked
DB query, or fleet size ≥ 10 active deckies.
### ~~DEBT-042 — Orchestrator failure-count badge is window-bound~~ ✅ RESOLVED 2026-05-03
**Files:** `decnet/web/router/orchestrator/api_event_stats.py` (new),
`decnet/web/db/sqlmodel_repo/orchestrator.py`, `decnet/web/db/repository.py`,
`decnet_web/src/components/Orchestrator.tsx`.
New `GET /api/v1/orchestrator/events/stats?since=1h&success=false&kind=...`
endpoint backed by `repo.count_orchestrator_failures(since_ts, kind)`,
which counts failed rows across both `orchestrator_events` and
`orchestrator_emails` since the cutoff. The badge polls the endpoint
on mount + every 30 s and renders the authoritative DB-derived count
instead of deriving from the SSE buffer + one paginated page. Window
parser accepts `^\d+[smhd]$`, capped at 7d. Today only `success=false`
is accepted on this surface (the only consumer); other modes are
rejected so the endpoint isn't accidentally repurposed before the
next consumer is properly designed. Repo + endpoint + badge tests
land in the same commit.
### ~~DEBT-043 — No frontend test framework configured~~ ✅ RESOLVED 2026-05-03
**Files:** `decnet_web/package.json`, `decnet_web/vite.config.ts`,
@@ -720,7 +722,7 @@ user who needs it.
| ~~DEBT-039~~ | ✅ | Honeypot / Cred emitters | resolved |
| ~~DEBT-040~~ | ✅ | Honeypot / RDP+SMB cred framers | resolved |
| ~~DEBT-041~~ | ✅ | API / UI / Threat-intel keying | resolved |
| DEBT-042 | 🟢 Low | UI / Orchestrator failure-count window | open |
| ~~DEBT-042~~ | ✅ | UI / Orchestrator failure-count window | resolved 2026-05-03 |
| ~~DEBT-043~~ | ✅ | Frontend test framework missing | resolved 2026-05-03 |
| ~~DEBT-044~~ | ✅ | TTP / Email producer wiring | resolved 2026-05-02 |
| DEBT-045 | 🟡 Medium | TTP / EmailLifter heavyweight extraction | partial paid 2026-05-02 |
@@ -729,5 +731,5 @@ user who needs it.
| DEBT-048 | 🟡 Medium | TTP / Intel provider mapping review (recurring) | open / recurring |
| DEBT-049 | 🟡 Medium | TTP / Sigma adapter (post-v1) | open |
**Remaining open:** DEBT-011 (Alembic), DEBT-027 (Dynamic bait store), DEBT-028 (deploy endpoint tests), DEBT-033 (transcript shard rotation), DEBT-036 (session-profile ingester), DEBT-037 (webhook delivery hardening), DEBT-038 (SSH PAM cred-capture limitations — document-only), DEBT-042 (orchestrator failure-count window), DEBT-045 (EmailLifter heavyweight — partial paid; carved-out follow-ups remain), DEBT-046 (mal-hash feed), DEBT-048 (TTP intel provider mapping review — recurring quarterly), DEBT-049 (TTP Sigma adapter — post-v1).
**Remaining open:** DEBT-011 (Alembic), DEBT-027 (Dynamic bait store), DEBT-028 (deploy endpoint tests), DEBT-033 (transcript shard rotation), DEBT-036 (session-profile ingester), DEBT-037 (webhook delivery hardening), DEBT-038 (SSH PAM cred-capture limitations — document-only), DEBT-045 (EmailLifter heavyweight — partial paid; carved-out follow-ups remain), DEBT-046 (mal-hash feed), DEBT-048 (TTP intel provider mapping review — recurring quarterly), DEBT-049 (TTP Sigma adapter — post-v1).
**Estimated remaining effort:** ~21 hours plus the new EmailLifter / TTP follow-ups. DEBT-030 Phase B (optimistic staged-buffer editor) is a follow-up, not debt.

View File

@@ -0,0 +1,136 @@
"""GET /api/v1/orchestrator/events/stats — failure-count badge endpoint (DEBT-042)."""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock, patch
import httpx
import pytest
from decnet.web.api import app
_V1 = "/api/v1/orchestrator"
@pytest.mark.anyio
async def test_stats_unauthenticated_401():
async with httpx.AsyncClient(
transport=httpx.ASGITransport(app=app), base_url="http://test",
) as ac:
r = await ac.get(f"{_V1}/events/stats?since=1h&success=false")
assert r.status_code == 401
@pytest.mark.asyncio
async def test_stats_returns_failure_count_with_window():
from decnet.web.router.orchestrator.api_event_stats import (
orchestrator_event_stats,
)
with patch(
"decnet.web.router.orchestrator.api_event_stats.repo"
) as mock_repo:
mock_repo.count_orchestrator_failures = AsyncMock(return_value=7)
result = await orchestrator_event_stats(
since="1h", success=False, kind=None,
user={"uuid": "u", "role": "viewer"},
)
assert result["count"] == 7
assert result["since"] == "1h"
assert result["success"] is False
assert result["kind"] is None
# Window must be "now - 1h", not 5h or 30s. Tolerance of 5 seconds
# for the test execution.
call = mock_repo.count_orchestrator_failures.await_args
since_ts = call.kwargs["since_ts"]
expected = datetime.now(timezone.utc) - timedelta(hours=1)
assert abs((since_ts - expected).total_seconds()) < 5
@pytest.mark.asyncio
async def test_stats_forwards_kind_filter():
from decnet.web.router.orchestrator.api_event_stats import (
orchestrator_event_stats,
)
with patch(
"decnet.web.router.orchestrator.api_event_stats.repo"
) as mock_repo:
mock_repo.count_orchestrator_failures = AsyncMock(return_value=2)
await orchestrator_event_stats(
since="15m", success=False, kind="email",
user={"uuid": "u", "role": "viewer"},
)
assert mock_repo.count_orchestrator_failures.await_args.kwargs["kind"] == "email"
@pytest.mark.asyncio
async def test_stats_rejects_success_true():
"""Only success=false is supported on this surface today; everything
else is rejected so the endpoint isn't accidentally repurposed
before the next consumer is properly designed."""
from fastapi import HTTPException
from decnet.web.router.orchestrator.api_event_stats import (
orchestrator_event_stats,
)
with pytest.raises(HTTPException) as exc:
await orchestrator_event_stats(
since="1h", success=True, kind=None,
user={"uuid": "u", "role": "viewer"},
)
assert exc.value.status_code == 422
@pytest.mark.asyncio
async def test_stats_rejects_success_unset():
from fastapi import HTTPException
from decnet.web.router.orchestrator.api_event_stats import (
orchestrator_event_stats,
)
with pytest.raises(HTTPException) as exc:
await orchestrator_event_stats(
since="1h", success=None, kind=None,
user={"uuid": "u", "role": "viewer"},
)
assert exc.value.status_code == 422
@pytest.mark.asyncio
async def test_stats_rejects_malformed_since():
from fastapi import HTTPException
from decnet.web.router.orchestrator.api_event_stats import (
orchestrator_event_stats,
)
with pytest.raises(HTTPException) as exc:
await orchestrator_event_stats(
since="garbage", success=False, kind=None,
user={"uuid": "u", "role": "viewer"},
)
assert exc.value.status_code == 422
@pytest.mark.asyncio
async def test_stats_rejects_window_over_max():
from fastapi import HTTPException
from decnet.web.router.orchestrator.api_event_stats import (
orchestrator_event_stats,
)
with pytest.raises(HTTPException) as exc:
await orchestrator_event_stats(
since="30d", success=False, kind=None,
user={"uuid": "u", "role": "viewer"},
)
assert exc.value.status_code == 422

View File

@@ -97,6 +97,55 @@ async def test_kind_filter_narrows(tmp_path):
assert {r["kind"] for r in only_file} == {"file"}
@pytest.mark.asyncio
async def test_count_failures_window_and_kind(tmp_path):
"""count_orchestrator_failures must:
- count both tables (events + emails) when kind is None
- respect the since_ts cutoff
- skip success=True rows
- narrow to a single source table when kind is set"""
from datetime import datetime, timedelta, timezone
repo = await _make_repo(tmp_path, "failures.db")
dst = await _seed_decky(repo, "decky-A")
# 2 fresh failures + 1 fresh success on the events table.
for i in range(2):
await repo.record_orchestrator_event({
"kind": "traffic", "protocol": "ssh",
"action": f"fail:{i}", "src_decky_uuid": None,
"dst_decky_uuid": dst, "success": False, "payload": {},
})
await repo.record_orchestrator_event({
"kind": "traffic", "protocol": "ssh",
"action": "ok", "src_decky_uuid": None,
"dst_decky_uuid": dst, "success": True, "payload": {},
})
# 1 fresh email failure.
await repo.record_orchestrator_email({
"ts": datetime.now(timezone.utc),
"subject": "boom", "sender_email": "a@x", "recipient_email": "b@y",
"mail_decky_uuid": "mh", "language": "en",
"thread_id": "t1", "message_id": "<m1@x>", "in_reply_to": None,
"eml_path": "/tmp/m1.eml",
"success": False, "payload": "{}",
})
cutoff = datetime.now(timezone.utc) - timedelta(hours=1)
assert await repo.count_orchestrator_failures(since_ts=cutoff) == 3
assert (
await repo.count_orchestrator_failures(since_ts=cutoff, kind="traffic")
) == 2
assert (
await repo.count_orchestrator_failures(since_ts=cutoff, kind="email")
) == 1
# Future cutoff → nothing matches.
future = datetime.now(timezone.utc) + timedelta(hours=1)
assert await repo.count_orchestrator_failures(since_ts=future) == 0
@pytest.mark.asyncio
async def test_prune_caps_per_dst(tmp_path):
repo = await _make_repo(tmp_path, "prune.db")