From 03beff3840535d37a5b436d460ee068900d3d32a Mon Sep 17 00:00:00 2001 From: anti Date: Sun, 3 May 2026 05:26:45 -0400 Subject: [PATCH] feat(orchestrator): authoritative failure-count badge endpoint (DEBT-042) New GET /api/v1/orchestrator/events/stats?since=1h&success=false&kind=... backed by repo.count_orchestrator_failures(since_ts, kind), which counts failed rows across both orchestrator_events and orchestrator_emails since the cutoff. Window parser accepts ^\d+[smhd]$, capped at 7d. Today only success=false is accepted on this surface so the endpoint isn't accidentally repurposed before the next consumer is properly designed. Orchestrator.tsx polls the endpoint on mount + every 30 s and renders the authoritative DB-derived count instead of deriving from the in-memory SSE buffer + one paginated page (which silently excluded failures older than the local window). --- decnet/web/db/repository.py | 11 ++ decnet/web/db/sqlmodel_repo/orchestrator.py | 40 ++++++ decnet/web/router/__init__.py | 2 + .../router/orchestrator/api_event_stats.py | 99 +++++++++++++ .../src/components/Orchestrator.test.tsx | 60 +++++++- decnet_web/src/components/Orchestrator.tsx | 30 ++-- development/DEBT.md | 32 +++-- tests/api/orchestrator/test_event_stats.py | 136 ++++++++++++++++++ tests/orchestrator/test_repo_pagination.py | 49 +++++++ 9 files changed, 431 insertions(+), 28 deletions(-) create mode 100644 decnet/web/router/orchestrator/api_event_stats.py create mode 100644 tests/api/orchestrator/test_event_stats.py diff --git a/decnet/web/db/repository.py b/decnet/web/db/repository.py index 241d1bb1..45ee139a 100644 --- a/decnet/web/db/repository.py +++ b/decnet/web/db/repository.py @@ -1131,6 +1131,17 @@ class BaseRepository(ABC): """Total orchestrator-event rows, optionally filtered by kind.""" raise NotImplementedError + async def count_orchestrator_failures( + self, + *, + since_ts: Any, + kind: Optional[str] = None, + ) -> int: + """Count failed orchestrator activity since *since_ts*, across + both event + email tables. Backs the dashboard's failure-count + badge (DEBT-042).""" + raise NotImplementedError + async def prune_orchestrator_events(self, per_dst_cap: int = 10000) -> int: """Trim per-``dst_decky_uuid`` rows to a cap. Returns deleted count. diff --git a/decnet/web/db/sqlmodel_repo/orchestrator.py b/decnet/web/db/sqlmodel_repo/orchestrator.py index 6fceb507..dce4b0c0 100644 --- a/decnet/web/db/sqlmodel_repo/orchestrator.py +++ b/decnet/web/db/sqlmodel_repo/orchestrator.py @@ -60,6 +60,46 @@ class OrchestratorMixin(_MixinBase): result = await session.execute(stmt) return result.scalar() or 0 + async def count_orchestrator_failures( + self, + *, + since_ts: datetime, + kind: Optional[str] = None, + ) -> int: + """Count failed orchestrator activity since *since_ts*, across + both ``orchestrator_events`` (traffic / file) and + ``orchestrator_emails`` (email). + + Backs the dashboard's failure-count badge — see DEBT-042. The + in-memory window the badge previously computed against was + bounded by the SSE-buffer + paginated page, so failures older + than the local window read low. This is the authoritative count. + """ + async with self._session() as session: + ev_stmt = ( + select(func.count()).select_from(OrchestratorEvent) + .where( + col(OrchestratorEvent.success).is_(False), + OrchestratorEvent.ts >= since_ts, + ) + ) + if kind in ("traffic", "file"): + ev_stmt = ev_stmt.where(OrchestratorEvent.kind == kind) + em_stmt = ( + select(func.count()).select_from(OrchestratorEmail) + .where( + col(OrchestratorEmail.success).is_(False), + OrchestratorEmail.ts >= since_ts, + ) + ) + ev_count = 0 + em_count = 0 + if kind in (None, "traffic", "file"): + ev_count = (await session.execute(ev_stmt)).scalar() or 0 + if kind in (None, "email"): + em_count = (await session.execute(em_stmt)).scalar() or 0 + return ev_count + em_count + async def prune_orchestrator_events(self, per_dst_cap: int = 10000) -> int: """Trim per-dst rows to *per_dst_cap*, oldest-first. Returns deleted count.""" deleted = 0 diff --git a/decnet/web/router/__init__.py b/decnet/web/router/__init__.py index 6ef44db3..fdd5576f 100644 --- a/decnet/web/router/__init__.py +++ b/decnet/web/router/__init__.py @@ -32,6 +32,7 @@ from .campaigns.api_list_campaign_identities import router as campaign_identitie from .campaigns.api_events import router as campaign_events_router from .orchestrator.api_list_events import router as orchestrator_list_router from .orchestrator.api_events import router as orchestrator_events_router +from .orchestrator.api_event_stats import router as orchestrator_stats_router from .realism.api_config import router as realism_config_router from .realism.api_personas import router as realism_personas_router from .realism.api_synthetic_files import router as realism_synthetic_files_router @@ -123,6 +124,7 @@ api_router.include_router(campaign_identities_router) api_router.include_router(campaign_events_router) api_router.include_router(orchestrator_list_router) api_router.include_router(orchestrator_events_router) +api_router.include_router(orchestrator_stats_router) # Realism — global persona pool CRUD for the dashboard's # "Persona Generation" page. The orchestrator reads from the same diff --git a/decnet/web/router/orchestrator/api_event_stats.py b/decnet/web/router/orchestrator/api_event_stats.py new file mode 100644 index 00000000..33433add --- /dev/null +++ b/decnet/web/router/orchestrator/api_event_stats.py @@ -0,0 +1,99 @@ +"""GET /api/v1/orchestrator/events/stats — authoritative failure count. + +The dashboard's failure-count badge previously derived its number from +the in-memory SSE buffer + a single paginated page (capped at 500 + +limit rows). On busy fleets, failures older than the local window +were silently excluded and the badge read low — see DEBT-042. This +endpoint returns the real count straight from the DB so the badge +matches reality. +""" +from __future__ import annotations + +import re +from datetime import datetime, timedelta, timezone +from typing import Any, Optional + +from fastapi import APIRouter, Depends, HTTPException, Query + +from decnet.telemetry import traced as _traced +from decnet.web.dependencies import repo, require_viewer + +router = APIRouter() + +_SINCE_RE = re.compile(r"^(\d+)([smhd])$") +# Bounded to avoid unintentionally-expensive scans. 7d covers the +# operator UX use case (failure-count badge) while still returning +# in O(index seek + count). +_MAX_SINCE = timedelta(days=7) + + +def _parse_since(s: str) -> timedelta: + m = _SINCE_RE.match(s) + if not m: + raise HTTPException( + status_code=422, + detail="since must match ^(\\d+)[smhd]$ (e.g. '15m', '1h', '24h', '7d')", + ) + value, unit = int(m.group(1)), m.group(2) + if value <= 0: + raise HTTPException(status_code=422, detail="since must be > 0") + delta = { + "s": timedelta(seconds=value), + "m": timedelta(minutes=value), + "h": timedelta(hours=value), + "d": timedelta(days=value), + }[unit] + if delta > _MAX_SINCE: + raise HTTPException( + status_code=422, + detail=f"since exceeds maximum window of {_MAX_SINCE}", + ) + return delta + + +@router.get( + "/orchestrator/events/stats", + tags=["Orchestrator"], + responses={ + 401: {"description": "Could not validate credentials"}, + 403: {"description": "Insufficient permissions"}, + 422: {"description": "Validation error"}, + }, +) +@_traced("api.orchestrator.events.stats") +async def orchestrator_event_stats( + since: str = Query("1h", description="Window relative to now, e.g. '15m', '1h', '24h'."), + success: Optional[bool] = Query( + None, + description="If set, restrict the count to rows with this success value.", + ), + kind: Optional[str] = Query( + None, pattern="^(traffic|file|email)$", + ), + user: dict = Depends(require_viewer), +) -> dict[str, Any]: + """Aggregate counts for the orchestrator activity feed. + + Today only the failure-count badge consumes this surface, so the + only supported aggregate is ``success=false`` (everything else is + rejected — ``success=true`` and the unfiltered total can be served + by the existing ``count`` on the list endpoint without a window + filter, and we'd rather not paint ourselves into a corner before + the next consumer shows up). + """ + if success is not False: + raise HTTPException( + status_code=422, + detail="only success=false is supported on this surface today", + ) + delta = _parse_since(since) + since_ts = datetime.now(timezone.utc) - delta + count = await repo.count_orchestrator_failures( + since_ts=since_ts, kind=kind, + ) + return { + "since": since, + "success": success, + "kind": kind, + "count": count, + } diff --git a/decnet_web/src/components/Orchestrator.test.tsx b/decnet_web/src/components/Orchestrator.test.tsx index 0a488b75..a0046fde 100644 --- a/decnet_web/src/components/Orchestrator.test.tsx +++ b/decnet_web/src/components/Orchestrator.test.tsx @@ -26,6 +26,26 @@ vi.mock('./useOrchestratorStream', () => ({ import api from '../utils/api'; const apiGet = api.get as ReturnType; +/** Two endpoints fire at mount — events list + failure-count stats. + * This dispatcher maps URLs to canned responses so per-test cases stay + * focused on the path they care about. */ +const buildApiResponder = (overrides: { + events?: { data: unknown[]; total: number }; + failures?: number; +} = {}) => { + const events = overrides.events ?? { data: [], total: 0 }; + const failures = overrides.failures ?? 0; + return (url: string) => { + if (url.startsWith('/orchestrator/events/stats')) { + return Promise.resolve({ data: { count: failures } }); + } + if (url.startsWith('/orchestrator/events')) { + return Promise.resolve({ data: events }); + } + return Promise.resolve({ data: {} }); + }; +}; + const renderPage = () => render( @@ -40,7 +60,7 @@ describe('Orchestrator', () => { }); it('renders the empty state when the API returns no events', async () => { - apiGet.mockResolvedValueOnce({ data: { data: [], total: 0 } }); + apiGet.mockImplementation(buildApiResponder()); renderPage(); @@ -50,11 +70,14 @@ describe('Orchestrator', () => { }); it('switches the kind filter and refetches scoped to that kind', async () => { - apiGet.mockResolvedValue({ data: { data: [], total: 0 } }); + apiGet.mockImplementation(buildApiResponder()); renderPage(); - await waitFor(() => expect(apiGet).toHaveBeenCalledTimes(1)); - expect(apiGet.mock.calls[0][0]).toMatch(/^\/orchestrator\/events\?limit=50&offset=0$/); + await waitFor(() => + expect( + apiGet.mock.calls.some((c) => /^\/orchestrator\/events\?limit=50&offset=0$/.test(c[0])), + ).toBe(true), + ); await userEvent.click(screen.getByRole('tab', { name: /^email$/ })); @@ -65,7 +88,7 @@ describe('Orchestrator', () => { }); it('prepends a row when the live stream pushes a traffic event', async () => { - apiGet.mockResolvedValueOnce({ data: { data: [], total: 0 } }); + apiGet.mockImplementation(buildApiResponder()); renderPage(); await waitFor(() => expect(capturedOnEvent).not.toBeNull()); @@ -90,4 +113,31 @@ describe('Orchestrator', () => { // 1 event shown after a single push. expect(screen.getByText(/1 EVENTS SHOWN/i)).toBeInTheDocument(); }); + + it('renders the failure-count badge from the stats endpoint (DEBT-042)', async () => { + apiGet.mockImplementation(buildApiResponder({ failures: 42 })); + + renderPage(); + + expect(await screen.findByText(/42 FAILURES \/ 1H/i)).toBeInTheDocument(); + // Stats endpoint is the authoritative source — verify it was actually queried. + expect( + apiGet.mock.calls.some((c) => + /\/orchestrator\/events\/stats\?since=1h&success=false/.test(c[0]), + ), + ).toBe(true); + }); + + it('hides the failure-count badge when the stats endpoint reports zero', async () => { + apiGet.mockImplementation(buildApiResponder({ failures: 0 })); + + renderPage(); + await waitFor(() => + expect( + apiGet.mock.calls.some((c) => /\/orchestrator\/events\/stats/.test(c[0])), + ).toBe(true), + ); + + expect(screen.queryByText(/FAILURES \/ 1H/i)).not.toBeInTheDocument(); + }); }); diff --git a/decnet_web/src/components/Orchestrator.tsx b/decnet_web/src/components/Orchestrator.tsx index d6c41c54..0cef5702 100644 --- a/decnet_web/src/components/Orchestrator.tsx +++ b/decnet_web/src/components/Orchestrator.tsx @@ -36,7 +36,6 @@ type KindFilter = 'all' | 'traffic' | 'file' | 'email'; type StreamStatus = 'connecting' | 'live' | 'error'; const ROW_CAP = 500; -const HOUR_MS = 60 * 60 * 1000; const FRESH_MS = 5_000; const timeAgo = (dateStr: string | null): string => { @@ -64,6 +63,7 @@ const Orchestrator: React.FC = () => { const [paused, setPaused] = useState(false); const [now, setNow] = useState(Date.now()); const [selected, setSelected] = useState(null); + const [failuresLastHour, setFailuresLastHour] = useState(0); const limit = 50; const pausedRef = useRef(paused); @@ -75,6 +75,27 @@ const Orchestrator: React.FC = () => { return () => clearInterval(t); }, []); + // Authoritative failure count from the DB — see DEBT-042. The + // in-memory derivation it replaced was bounded by the SSE buffer + + // one paginated page, so failures older than the local window were + // silently excluded and the badge read low on busy fleets. + useEffect(() => { + let cancelled = false; + const fetchStats = async () => { + try { + const res = await api.get( + '/orchestrator/events/stats?since=1h&success=false', + ); + if (!cancelled) setFailuresLastHour(res.data?.count ?? 0); + } catch { + // Silent: the badge is a hint, missing data shouldn't blow up the page. + } + }; + fetchStats(); + const t = setInterval(fetchStats, 30_000); + return () => { cancelled = true; clearInterval(t); }; + }, []); + const fetchEvents = async () => { setLoading(true); try { @@ -163,13 +184,6 @@ const Orchestrator: React.FC = () => { return merged.filter((r) => r.kind === kindParam); }, [streamRows, rows, kindParam]); - const failuresLastHour = useMemo(() => { - const cutoff = now - HOUR_MS; - return [...streamRows, ...rows].filter( - (r) => !r.success && new Date(r.ts).getTime() >= cutoff, - ).length; - }, [streamRows, rows, now]); - const statusLabel = status === 'live' ? 'LIVE' : status === 'connecting' ? 'CONNECTING' diff --git a/development/DEBT.md b/development/DEBT.md index a96ce617..9ca73da1 100644 --- a/development/DEBT.md +++ b/development/DEBT.md @@ -498,19 +498,21 @@ Resolved 2026-05-03. All base images now carry `image:tag@sha256:` refer ~~**Files:** Project root~~ `requirements.lock` generated via `pip freeze`. Reproducible installs now available via `pip install -r requirements.lock`. -### DEBT-042 — Orchestrator failure-count badge is window-bound -**File:** `decnet_web/src/components/Orchestrator.tsx` -The "X failures / 1h" header badge is computed from the in-memory SSE -window (capped at 500 rows merged with one paginated server page). On -busy fleets — many deckies × dense activity — failures older than the -local window or beyond the visible page are silently excluded, so the -badge can read low. Acceptable for MVP; the badge is a hint, not a -metric. -**Remediation:** add a dedicated count endpoint -(`GET /api/v1/orchestrator/events/stats?since=1h&success=false`) and -have the badge call it on the same cadence the page already polls. -Trigger: first time the count visibly diverges from a hand-checked -DB query, or fleet size ≥ 10 active deckies. +### ~~DEBT-042 — Orchestrator failure-count badge is window-bound~~ ✅ RESOLVED 2026-05-03 +**Files:** `decnet/web/router/orchestrator/api_event_stats.py` (new), +`decnet/web/db/sqlmodel_repo/orchestrator.py`, `decnet/web/db/repository.py`, +`decnet_web/src/components/Orchestrator.tsx`. +New `GET /api/v1/orchestrator/events/stats?since=1h&success=false&kind=...` +endpoint backed by `repo.count_orchestrator_failures(since_ts, kind)`, +which counts failed rows across both `orchestrator_events` and +`orchestrator_emails` since the cutoff. The badge polls the endpoint +on mount + every 30 s and renders the authoritative DB-derived count +instead of deriving from the SSE buffer + one paginated page. Window +parser accepts `^\d+[smhd]$`, capped at 7d. Today only `success=false` +is accepted on this surface (the only consumer); other modes are +rejected so the endpoint isn't accidentally repurposed before the +next consumer is properly designed. Repo + endpoint + badge tests +land in the same commit. ### ~~DEBT-043 — No frontend test framework configured~~ ✅ RESOLVED 2026-05-03 **Files:** `decnet_web/package.json`, `decnet_web/vite.config.ts`, @@ -720,7 +722,7 @@ user who needs it. | ~~DEBT-039~~ | ✅ | Honeypot / Cred emitters | resolved | | ~~DEBT-040~~ | ✅ | Honeypot / RDP+SMB cred framers | resolved | | ~~DEBT-041~~ | ✅ | API / UI / Threat-intel keying | resolved | -| DEBT-042 | 🟢 Low | UI / Orchestrator failure-count window | open | +| ~~DEBT-042~~ | ✅ | UI / Orchestrator failure-count window | resolved 2026-05-03 | | ~~DEBT-043~~ | ✅ | Frontend test framework missing | resolved 2026-05-03 | | ~~DEBT-044~~ | ✅ | TTP / Email producer wiring | resolved 2026-05-02 | | DEBT-045 | 🟡 Medium | TTP / EmailLifter heavyweight extraction | partial paid 2026-05-02 | @@ -729,5 +731,5 @@ user who needs it. | DEBT-048 | 🟡 Medium | TTP / Intel provider mapping review (recurring) | open / recurring | | DEBT-049 | 🟡 Medium | TTP / Sigma adapter (post-v1) | open | -**Remaining open:** DEBT-011 (Alembic), DEBT-027 (Dynamic bait store), DEBT-028 (deploy endpoint tests), DEBT-033 (transcript shard rotation), DEBT-036 (session-profile ingester), DEBT-037 (webhook delivery hardening), DEBT-038 (SSH PAM cred-capture limitations — document-only), DEBT-042 (orchestrator failure-count window), DEBT-045 (EmailLifter heavyweight — partial paid; carved-out follow-ups remain), DEBT-046 (mal-hash feed), DEBT-048 (TTP intel provider mapping review — recurring quarterly), DEBT-049 (TTP Sigma adapter — post-v1). +**Remaining open:** DEBT-011 (Alembic), DEBT-027 (Dynamic bait store), DEBT-028 (deploy endpoint tests), DEBT-033 (transcript shard rotation), DEBT-036 (session-profile ingester), DEBT-037 (webhook delivery hardening), DEBT-038 (SSH PAM cred-capture limitations — document-only), DEBT-045 (EmailLifter heavyweight — partial paid; carved-out follow-ups remain), DEBT-046 (mal-hash feed), DEBT-048 (TTP intel provider mapping review — recurring quarterly), DEBT-049 (TTP Sigma adapter — post-v1). **Estimated remaining effort:** ~21 hours plus the new EmailLifter / TTP follow-ups. DEBT-030 Phase B (optimistic staged-buffer editor) is a follow-up, not debt. diff --git a/tests/api/orchestrator/test_event_stats.py b/tests/api/orchestrator/test_event_stats.py new file mode 100644 index 00000000..89110a19 --- /dev/null +++ b/tests/api/orchestrator/test_event_stats.py @@ -0,0 +1,136 @@ +"""GET /api/v1/orchestrator/events/stats — failure-count badge endpoint (DEBT-042).""" +from __future__ import annotations + +from datetime import datetime, timedelta, timezone +from unittest.mock import AsyncMock, patch + +import httpx +import pytest + +from decnet.web.api import app + +_V1 = "/api/v1/orchestrator" + + +@pytest.mark.anyio +async def test_stats_unauthenticated_401(): + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=app), base_url="http://test", + ) as ac: + r = await ac.get(f"{_V1}/events/stats?since=1h&success=false") + assert r.status_code == 401 + + +@pytest.mark.asyncio +async def test_stats_returns_failure_count_with_window(): + from decnet.web.router.orchestrator.api_event_stats import ( + orchestrator_event_stats, + ) + + with patch( + "decnet.web.router.orchestrator.api_event_stats.repo" + ) as mock_repo: + mock_repo.count_orchestrator_failures = AsyncMock(return_value=7) + + result = await orchestrator_event_stats( + since="1h", success=False, kind=None, + user={"uuid": "u", "role": "viewer"}, + ) + + assert result["count"] == 7 + assert result["since"] == "1h" + assert result["success"] is False + assert result["kind"] is None + + # Window must be "now - 1h", not 5h or 30s. Tolerance of 5 seconds + # for the test execution. + call = mock_repo.count_orchestrator_failures.await_args + since_ts = call.kwargs["since_ts"] + expected = datetime.now(timezone.utc) - timedelta(hours=1) + assert abs((since_ts - expected).total_seconds()) < 5 + + +@pytest.mark.asyncio +async def test_stats_forwards_kind_filter(): + from decnet.web.router.orchestrator.api_event_stats import ( + orchestrator_event_stats, + ) + + with patch( + "decnet.web.router.orchestrator.api_event_stats.repo" + ) as mock_repo: + mock_repo.count_orchestrator_failures = AsyncMock(return_value=2) + + await orchestrator_event_stats( + since="15m", success=False, kind="email", + user={"uuid": "u", "role": "viewer"}, + ) + + assert mock_repo.count_orchestrator_failures.await_args.kwargs["kind"] == "email" + + +@pytest.mark.asyncio +async def test_stats_rejects_success_true(): + """Only success=false is supported on this surface today; everything + else is rejected so the endpoint isn't accidentally repurposed + before the next consumer is properly designed.""" + from fastapi import HTTPException + + from decnet.web.router.orchestrator.api_event_stats import ( + orchestrator_event_stats, + ) + + with pytest.raises(HTTPException) as exc: + await orchestrator_event_stats( + since="1h", success=True, kind=None, + user={"uuid": "u", "role": "viewer"}, + ) + assert exc.value.status_code == 422 + + +@pytest.mark.asyncio +async def test_stats_rejects_success_unset(): + from fastapi import HTTPException + + from decnet.web.router.orchestrator.api_event_stats import ( + orchestrator_event_stats, + ) + + with pytest.raises(HTTPException) as exc: + await orchestrator_event_stats( + since="1h", success=None, kind=None, + user={"uuid": "u", "role": "viewer"}, + ) + assert exc.value.status_code == 422 + + +@pytest.mark.asyncio +async def test_stats_rejects_malformed_since(): + from fastapi import HTTPException + + from decnet.web.router.orchestrator.api_event_stats import ( + orchestrator_event_stats, + ) + + with pytest.raises(HTTPException) as exc: + await orchestrator_event_stats( + since="garbage", success=False, kind=None, + user={"uuid": "u", "role": "viewer"}, + ) + assert exc.value.status_code == 422 + + +@pytest.mark.asyncio +async def test_stats_rejects_window_over_max(): + from fastapi import HTTPException + + from decnet.web.router.orchestrator.api_event_stats import ( + orchestrator_event_stats, + ) + + with pytest.raises(HTTPException) as exc: + await orchestrator_event_stats( + since="30d", success=False, kind=None, + user={"uuid": "u", "role": "viewer"}, + ) + assert exc.value.status_code == 422 diff --git a/tests/orchestrator/test_repo_pagination.py b/tests/orchestrator/test_repo_pagination.py index e28ff2e9..a213afa4 100644 --- a/tests/orchestrator/test_repo_pagination.py +++ b/tests/orchestrator/test_repo_pagination.py @@ -97,6 +97,55 @@ async def test_kind_filter_narrows(tmp_path): assert {r["kind"] for r in only_file} == {"file"} +@pytest.mark.asyncio +async def test_count_failures_window_and_kind(tmp_path): + """count_orchestrator_failures must: + - count both tables (events + emails) when kind is None + - respect the since_ts cutoff + - skip success=True rows + - narrow to a single source table when kind is set""" + from datetime import datetime, timedelta, timezone + + repo = await _make_repo(tmp_path, "failures.db") + dst = await _seed_decky(repo, "decky-A") + + # 2 fresh failures + 1 fresh success on the events table. + for i in range(2): + await repo.record_orchestrator_event({ + "kind": "traffic", "protocol": "ssh", + "action": f"fail:{i}", "src_decky_uuid": None, + "dst_decky_uuid": dst, "success": False, "payload": {}, + }) + await repo.record_orchestrator_event({ + "kind": "traffic", "protocol": "ssh", + "action": "ok", "src_decky_uuid": None, + "dst_decky_uuid": dst, "success": True, "payload": {}, + }) + + # 1 fresh email failure. + await repo.record_orchestrator_email({ + "ts": datetime.now(timezone.utc), + "subject": "boom", "sender_email": "a@x", "recipient_email": "b@y", + "mail_decky_uuid": "mh", "language": "en", + "thread_id": "t1", "message_id": "", "in_reply_to": None, + "eml_path": "/tmp/m1.eml", + "success": False, "payload": "{}", + }) + + cutoff = datetime.now(timezone.utc) - timedelta(hours=1) + + assert await repo.count_orchestrator_failures(since_ts=cutoff) == 3 + assert ( + await repo.count_orchestrator_failures(since_ts=cutoff, kind="traffic") + ) == 2 + assert ( + await repo.count_orchestrator_failures(since_ts=cutoff, kind="email") + ) == 1 + # Future cutoff → nothing matches. + future = datetime.now(timezone.utc) + timedelta(hours=1) + assert await repo.count_orchestrator_failures(since_ts=future) == 0 + + @pytest.mark.asyncio async def test_prune_caps_per_dst(tmp_path): repo = await _make_repo(tmp_path, "prune.db")