diff --git a/tests/api/topology/test_events_stream.py b/tests/api/topology/test_events_stream.py new file mode 100644 index 00000000..8109f3ce --- /dev/null +++ b/tests/api/topology/test_events_stream.py @@ -0,0 +1,140 @@ +"""SSE events stream — GET /topologies/{id}/events (DEBT-030).""" +from __future__ import annotations + +import asyncio + +import httpx +import pytest + +from decnet.bus import app as _bus_app +from decnet.bus import topics as _topics +from decnet.bus.fake import FakeBus +from decnet.topology.config import TopologyConfig +from decnet.topology.generator import generate +from decnet.topology.persistence import persist, transition_status +from decnet.topology.status import TopologyStatus +from decnet.web.api import app +from decnet.web.dependencies import repo as _repo + +_V1 = "/api/v1/topologies" + + +def _cfg(name: str) -> TopologyConfig: + return TopologyConfig( + name=name, depth=1, branching_factor=1, + deckies_per_lan_min=1, deckies_per_lan_max=1, + services_explicit=["ssh"], randomize_services=False, seed=0, + ) + + +async def _seed_active(name: str) -> str: + tid = await persist(_repo, generate(_cfg(name))) + await transition_status(_repo, tid, TopologyStatus.DEPLOYING) + await transition_status(_repo, tid, TopologyStatus.ACTIVE) + return tid + + +@pytest.fixture +def _fake_app_bus(monkeypatch): + bus = FakeBus() + + async def _get() -> FakeBus: + if not bus._connected: + await bus.connect() + return bus + + monkeypatch.setattr(_bus_app, "get_app_bus", _get) + from decnet.web.router.topology import api_events as _ev + from decnet.web.router.topology import api_mutations as _mu + monkeypatch.setattr(_ev, "get_app_bus", _get) + monkeypatch.setattr(_mu, "get_app_bus", _get) + return bus + + +@pytest.mark.anyio +async def test_events_unauthenticated_401(): + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=app), base_url="http://test", + ) as ac: + r = await ac.get(f"{_V1}/any/events") + assert r.status_code == 401 + + +@pytest.mark.anyio +async def test_events_missing_topology_404(auth_token, _fake_app_bus): + async with httpx.AsyncClient( + transport=httpx.ASGITransport(app=app), base_url="http://test", + ) as ac: + r = await ac.get( + f"{_V1}/nope/events", + params={"token": auth_token}, + ) + assert r.status_code == 404 + + +@pytest.mark.anyio +async def test_events_emits_snapshot_and_live_event(auth_token, _fake_app_bus): + """Drive the generator directly — avoids the full httpx streaming + roundtrip, which is painful under ASGITransport + an infinite SSE loop. + + The route is thin glue: if the generator yields snapshot + mapped + bus events, the handler works. Auth/404 paths are covered above. + """ + from decnet.web.router.topology import api_events as _ev + + tid = await _seed_active("evt-live") + + class _FakeRequest: + async def is_disconnected(self) -> bool: + return False + + # Patch out the role gate so we can call the async endpoint directly. + response = await _ev.api_topology_events( + topology_id=tid, + request=_FakeRequest(), # type: ignore[arg-type] + _user={"role": "admin"}, + ) + gen = response.body_iterator + + def _as_text(frame) -> str: + return frame if isinstance(frame, str) else frame.decode() + + async def _publish_after_snapshot() -> None: + # Wait for the generator to reach its blocking subscribe state. + # We don't have a synchronization primitive, so a short sleep is + # good enough — the test-level timeout catches any real hang. + await asyncio.sleep(0.1) + await _fake_app_bus.publish( + _topics.topology_mutation(tid, _topics.MUTATION_APPLIED), + {"mutation_id": "m1", "op": "add_lan"}, + event_type=_topics.MUTATION_APPLIED, + ) + + pub_task = asyncio.create_task(_publish_after_snapshot()) + + async def _drive() -> tuple[bool, bool]: + saw_snapshot = False + saw_live = False + # Bounded — real loop produces keepalive, snapshot, (waits), then + # forwarded event. Max 5 iterations covers pathological orderings. + for _ in range(5): + frame = _as_text(await gen.__anext__()) + if "event: snapshot" in frame: + saw_snapshot = True + if "event: mutation.applied" in frame: + saw_live = True + break + return saw_snapshot, saw_live + + try: + saw_snapshot, saw_live = await asyncio.wait_for(_drive(), timeout=5.0) + finally: + pub_task.cancel() + try: + await pub_task + except (asyncio.CancelledError, Exception): + pass + await gen.aclose() + + assert saw_snapshot + assert saw_live diff --git a/tests/api/topology/test_mutations.py b/tests/api/topology/test_mutations.py index 4bbaa00d..9284812c 100644 --- a/tests/api/topology/test_mutations.py +++ b/tests/api/topology/test_mutations.py @@ -3,6 +3,9 @@ from __future__ import annotations import pytest +from decnet.bus import app as _bus_app +from decnet.bus import topics as _topics +from decnet.bus.fake import FakeBus from decnet.topology.config import TopologyConfig from decnet.topology.generator import generate from decnet.topology.persistence import persist, transition_status @@ -157,3 +160,44 @@ async def test_list_viewer_ok(client, viewer_token): headers=_hdr(viewer_token), ) assert r.status_code == 200 + + +# ── Bus publish on enqueue (DEBT-030) ───────────────────────────── + + +@pytest.fixture +def _fake_app_bus(monkeypatch): + """Replace the process-wide app bus with an in-process FakeBus.""" + bus = FakeBus() + + async def _get() -> FakeBus: + if not bus._connected: + await bus.connect() + return bus + + monkeypatch.setattr(_bus_app, "get_app_bus", _get) + # Also patch the re-export in the route module. + from decnet.web.router.topology import api_mutations as _mod + monkeypatch.setattr(_mod, "get_app_bus", _get) + return bus + + +@pytest.mark.anyio +async def test_enqueue_publishes_on_bus(client, auth_token, _fake_app_bus): + topology_id = await _seed_active("enq-pub") + sub = _fake_app_bus.subscribe( + _topics.topology_mutation(topology_id, _topics.MUTATION_ENQUEUED), + ) + async with sub: + r = await client.post( + f"{_V1}/{topology_id}/mutations", + json={"op": "add_lan", "payload": {"name": "pub-lan"}}, + headers=_hdr(auth_token), + ) + assert r.status_code == 202 + mutation_id = r.json()["mutation_id"] + import asyncio + event = await asyncio.wait_for(sub.__aiter__().__anext__(), timeout=1.0) + assert event.type == _topics.MUTATION_ENQUEUED + assert event.payload["mutation_id"] == mutation_id + assert event.payload["op"] == "add_lan" diff --git a/tests/topology/test_mutator.py b/tests/topology/test_mutator.py index 3b38cd6e..65508b2c 100644 --- a/tests/topology/test_mutator.py +++ b/tests/topology/test_mutator.py @@ -1,10 +1,13 @@ """Step 7 — topology_mutations queue + mutator reconciler branch.""" from __future__ import annotations +import asyncio import json import pytest +from decnet.bus import topics as _topics +from decnet.bus.fake import FakeBus from decnet.mutator import engine as _engine from decnet.mutator.ops import MutationError, apply_add_lan, apply_update_decky from decnet.topology.config import TopologyConfig @@ -272,3 +275,104 @@ def test_ops_payload_shape_docstring_present(): def _payload_json(d: dict) -> str: return json.dumps(d) + + +# ---------------------------------------------------- bus publishing (DEBT-030) + + +async def _drain(sub, expected: int, timeout: float = 2.0) -> list: + """Collect up to *expected* events from *sub* with a hard timeout. + + Used to assert bus publishes without racing against the in-process + FakeBus queue — drains are short by construction (the reconciler + produces a bounded number of events per claim). + """ + events: list = [] + sub_iter = sub.__aiter__() + for _ in range(expected): + events.append(await asyncio.wait_for(sub_iter.__anext__(), timeout=timeout)) + return events + + +@pytest.mark.anyio +async def test_reconcile_publishes_applying_and_applied(repo): + tid = await _make_active(repo) + await repo.enqueue_topology_mutation( + tid, "add_lan", + {"name": "LAN-PUB", "subnet": "172.20.45.0/24"}, + ) + bus = FakeBus() + await bus.connect() + sub = bus.subscribe(f"{_topics.TOPOLOGY}.{tid}.>") + try: + async with sub: + drained = await _engine.reconcile_topologies(repo, bus=bus) + assert drained == 1 + events = await _drain(sub, expected=2) + finally: + await bus.close() + types = [e.type for e in events] + assert types == [_topics.MUTATION_APPLYING, _topics.MUTATION_APPLIED] + + +@pytest.mark.anyio +async def test_reconcile_publishes_failed_and_status(repo): + tid = await _make_active(repo) + existing = (await repo.list_lans_for_topology(tid))[0]["name"] + await repo.enqueue_topology_mutation( + tid, "add_lan", {"name": existing, "subnet": "172.20.89.0/24"}, + ) + bus = FakeBus() + await bus.connect() + sub = bus.subscribe(f"{_topics.TOPOLOGY}.{tid}.>") + try: + async with sub: + await _engine.reconcile_topologies(repo, bus=bus) + # applying + failed + status(degraded) + events = await _drain(sub, expected=3) + finally: + await bus.close() + types = [e.type for e in events] + assert types == [ + _topics.MUTATION_APPLYING, _topics.MUTATION_FAILED, _topics.TOPOLOGY_STATUS, + ] + assert events[-1].payload["state"] == TopologyStatus.DEGRADED + + +@pytest.mark.anyio +async def test_reconcile_with_null_bus_is_safe(repo): + """Passing ``bus=None`` must not break the reconciler — publish is + a fire-and-forget nicety, the DB is the source of truth.""" + tid = await _make_active(repo) + await repo.enqueue_topology_mutation( + tid, "add_lan", + {"name": "LAN-NULL", "subnet": "172.20.46.0/24"}, + ) + drained = await _engine.reconcile_topologies(repo, bus=None) + assert drained == 1 + + +@pytest.mark.anyio +async def test_wake_on_enqueue_sets_event(repo): + """``_wake_on_enqueue`` flips the asyncio.Event on every matching event.""" + bus = FakeBus() + await bus.connect() + wake = asyncio.Event() + task = asyncio.create_task(_engine._wake_on_enqueue(bus, wake)) + try: + # Give the subscription a tick to register. + await asyncio.sleep(0) + await bus.publish( + _topics.topology_mutation("abc", _topics.MUTATION_ENQUEUED), + {"mutation_id": "m1", "op": "add_lan"}, + event_type=_topics.MUTATION_ENQUEUED, + ) + await asyncio.wait_for(wake.wait(), timeout=1.0) + assert wake.is_set() + finally: + task.cancel() + try: + await task + except (asyncio.CancelledError, Exception): + pass + await bus.close()