"""Pagination + filter + prune for orchestrator_events repo methods."""
from __future__ import annotations

import json

import pytest

from decnet.web.db.models import Topology, TopologyDecky
from decnet.web.db.sqlite.repository import SQLiteRepository


async def _make_repo(tmp_path, name: str) -> SQLiteRepository:
    """Create and initialize a SQLiteRepository backed by ``tmp_path / name``."""
    db_file = tmp_path / name
    repository = SQLiteRepository(db_path=str(db_file))
    await repository.initialize()
    return repository


@pytest.mark.asyncio
async def test_empty_table_zero_total(tmp_path):
    """A freshly initialized repo lists no events and counts zero."""
    repo = await _make_repo(tmp_path, "orch.db")

    listed = await repo.list_orchestrator_events(limit=50, offset=0)
    assert listed == []

    total = await repo.count_orchestrator_events()
    assert total == 0


async def _seed_decky(repo: SQLiteRepository, name: str = "d-1") -> str:
    """Insert a Topology plus one TopologyDecky under it; return the decky uuid."""
    async with repo._session() as session:
        # The topology is committed and refreshed first so that its id is
        # available when building the decky row.
        topology = Topology(
            name=f"t-{name}",
            config_snapshot="{}",
            status="active",
        )
        session.add(topology)
        await session.commit()
        await session.refresh(topology)

        decky = TopologyDecky(
            topology_id=topology.id,
            name=name,
            services=json.dumps(["ssh"]),
            ip="10.0.0.2",
            state="running",
        )
        session.add(decky)
        await session.commit()
        await session.refresh(decky)

        # Read the uuid while the session is still open.
        decky_uuid = decky.uuid

    return decky_uuid


async def _seed(
    repo: SQLiteRepository,
    n: int = 5,
    kind: str = "traffic",
    dst: str | None = None,
) -> str:
    """Record ``n`` events of ``kind`` against one destination decky.

    When ``dst`` is None a decky named "decky-A" is seeded and used as the
    destination. Returns the destination decky uuid that was used.
    """
    target = dst if dst is not None else await _seed_decky(repo, "decky-A")

    for i in range(n):
        event = {
            "kind": kind,
            "protocol": "ssh",
            "action": f"exec:{i}",
            "src_decky_uuid": None,
            "dst_decky_uuid": target,
            "success": True,
            "payload": {"i": i},
        }
        await repo.record_orchestrator_event(event)

    return target


@pytest.mark.asyncio
async def test_pagination_respects_limit_offset(tmp_path):
    """Two consecutive pages of size 2 are full and share no event uuids."""
    repo = await _make_repo(tmp_path, "p.db")
    await _seed(repo, n=5)

    total = await repo.count_orchestrator_events()
    assert total == 5

    first_page = await repo.list_orchestrator_events(limit=2, offset=0)
    second_page = await repo.list_orchestrator_events(limit=2, offset=2)
    assert len(first_page) == 2
    assert len(second_page) == 2

    first_ids = {row["uuid"] for row in first_page}
    second_ids = {row["uuid"] for row in second_page}
    assert first_ids.isdisjoint(second_ids)


@pytest.mark.asyncio
async def test_kind_filter_narrows(tmp_path):
    """The ``kind`` argument restricts both the count and the listing."""
    repo = await _make_repo(tmp_path, "k.db")
    dst = await _seed_decky(repo, "decky-K")

    # Three "traffic" events followed by two "file" events, same destination.
    for i in range(3):
        traffic_event = {
            "kind": "traffic",
            "protocol": "ssh",
            "action": f"a{i}",
            "src_decky_uuid": None,
            "dst_decky_uuid": dst,
            "success": True,
            "payload": {},
        }
        await repo.record_orchestrator_event(traffic_event)

    for i in range(2):
        file_event = {
            "kind": "file",
            "protocol": "ssh",
            "action": f"f{i}",
            "src_decky_uuid": None,
            "dst_decky_uuid": dst,
            "success": True,
            "payload": {},
        }
        await repo.record_orchestrator_event(file_event)

    overall = await repo.count_orchestrator_events()
    assert overall == 5
    traffic_total = await repo.count_orchestrator_events(kind="traffic")
    assert traffic_total == 3
    file_total = await repo.count_orchestrator_events(kind="file")
    assert file_total == 2

    only_file = await repo.list_orchestrator_events(limit=50, kind="file")
    kinds_seen = {row["kind"] for row in only_file}
    assert kinds_seen == {"file"}


@pytest.mark.asyncio
async def test_prune_caps_per_dst(tmp_path):
    """Pruning 10 events for one destination with a cap of 3 deletes 7."""
    repo = await _make_repo(tmp_path, "prune.db")
    await _seed(repo, n=10)

    removed = await repo.prune_orchestrator_events(per_dst_cap=3)
    assert removed == 7

    remaining = await repo.count_orchestrator_events()
    assert remaining == 3