Replaces LICENSE (GPLv3 -> AGPLv3) and prepends `SPDX-License-Identifier: AGPL-3.0-or-later` to every source file across decnet/, decnet_web/, tests/, scripts/, and tools/. Rationale: closes the GPLv3 ASP loophole so any party operating a modified DECNET as a network service must offer their modified source. Personal copyright (Samuel Paschuan) + inbound=outbound contributions make a future unilateral relicense infeasible. - LICENSE: full AGPL-3.0 text (gnu.org/licenses/agpl-3.0.txt) - COPYRIGHT: project copyright notice - tools/add_spdx_headers.py: idempotent header injector (shebang- and PEP 263-aware) Touches 1565 source files (.py, .ts, .tsx, .js, .jsx, .css, .sh). No behavior change; comments only.
110 lines
3.8 KiB
Python
110 lines
3.8 KiB
Python
# SPDX-License-Identifier: AGPL-3.0-or-later
|
|
"""Tests for :class:`decnet.bus.fake.FakeBus` and :class:`NullBus`."""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
|
|
import pytest
|
|
|
|
from decnet.bus.fake import FakeBus, NullBus
|
|
|
|
|
|
async def _collect(sub, n: int, timeout: float = 1.0) -> list:
|
|
out = []
|
|
try:
|
|
async with asyncio.timeout(timeout):
|
|
async for event in sub:
|
|
out.append(event)
|
|
if len(out) >= n:
|
|
break
|
|
except TimeoutError:
|
|
pass
|
|
return out
|
|
|
|
|
|
class TestFakeBus:
|
|
async def test_publish_delivers_to_exact_match(self, fake_bus: FakeBus) -> None:
|
|
sub = fake_bus.subscribe("topology.abc.status")
|
|
async with sub:
|
|
await fake_bus.publish("topology.abc.status", {"status": "active"})
|
|
events = await _collect(sub, 1)
|
|
assert len(events) == 1
|
|
assert events[0].payload == {"status": "active"}
|
|
|
|
async def test_publish_delivers_to_wildcard(self, fake_bus: FakeBus) -> None:
|
|
sub = fake_bus.subscribe("topology.*.mutation.*")
|
|
async with sub:
|
|
await fake_bus.publish("topology.t1.mutation.applied", {"id": 1})
|
|
await fake_bus.publish("topology.t2.mutation.failed", {"id": 2})
|
|
await fake_bus.publish("decky.x.state", {"state": "running"}) # should not match
|
|
events = await _collect(sub, 2)
|
|
assert len(events) == 2
|
|
assert {e.payload["id"] for e in events} == {1, 2}
|
|
|
|
async def test_multiple_subscribers_each_get_copy(self, fake_bus: FakeBus) -> None:
|
|
sub_a = fake_bus.subscribe("topology.>")
|
|
sub_b = fake_bus.subscribe("topology.>")
|
|
async with sub_a, sub_b:
|
|
await fake_bus.publish("topology.abc.status", {"status": "active"})
|
|
a = await _collect(sub_a, 1)
|
|
b = await _collect(sub_b, 1)
|
|
assert len(a) == 1
|
|
assert len(b) == 1
|
|
|
|
async def test_subscription_close_unblocks_iter(self, fake_bus: FakeBus) -> None:
|
|
sub = fake_bus.subscribe("topology.>")
|
|
|
|
async def consume() -> list:
|
|
out = []
|
|
async for event in sub:
|
|
out.append(event)
|
|
return out
|
|
|
|
task = asyncio.create_task(consume())
|
|
await asyncio.sleep(0.01) # let task block on queue.get()
|
|
await sub.aclose()
|
|
events = await asyncio.wait_for(task, timeout=0.5)
|
|
assert events == []
|
|
|
|
async def test_close_is_idempotent(self, fake_bus: FakeBus) -> None:
|
|
await fake_bus.close()
|
|
await fake_bus.close() # second call must not raise
|
|
|
|
async def test_publish_on_closed_raises(self, fake_bus: FakeBus) -> None:
|
|
await fake_bus.close()
|
|
with pytest.raises(RuntimeError):
|
|
await fake_bus.publish("x", {})
|
|
with pytest.raises(RuntimeError):
|
|
fake_bus.subscribe("x")
|
|
|
|
async def test_backpressure_drops_oldest(self) -> None:
|
|
bus = FakeBus(queue_size=2)
|
|
await bus.connect()
|
|
try:
|
|
sub = bus.subscribe("t")
|
|
# Don't consume; publish 5 — queue holds at most 2, oldest dropped.
|
|
for i in range(5):
|
|
await bus.publish("t", {"i": i})
|
|
events = await _collect(sub, 2, timeout=0.2)
|
|
assert len(events) == 2
|
|
# We kept the 2 most recent.
|
|
assert events[-1].payload["i"] == 4
|
|
finally:
|
|
await bus.close()
|
|
|
|
|
|
class TestNullBus:
|
|
async def test_publish_is_noop(self) -> None:
|
|
bus = NullBus()
|
|
await bus.connect()
|
|
await bus.publish("anything", {"x": 1})
|
|
await bus.close()
|
|
|
|
async def test_subscribe_yields_nothing(self) -> None:
|
|
bus = NullBus()
|
|
sub = bus.subscribe("topology.>")
|
|
async with sub:
|
|
# Iteration must stop immediately.
|
|
events = [e async for e in sub]
|
|
assert events == []
|