Files
DECNET/tests/clustering/test_clusterer_worker.py
anti f2b3393669 chore: relicense to AGPL-3.0-or-later and add SPDX headers
Replaces LICENSE (GPLv3 -> AGPLv3) and prepends
`SPDX-License-Identifier: AGPL-3.0-or-later` to every source file
across decnet/, decnet_web/, tests/, scripts/, and tools/.

Rationale: closes the GPLv3 ASP loophole so any party operating a
modified DECNET as a network service must offer their modified
source. Personal copyright (Samuel Paschuan) + inbound=outbound
contributions make a future unilateral relicense infeasible.

- LICENSE: full AGPL-3.0 text (gnu.org/licenses/agpl-3.0.txt)
- COPYRIGHT: project copyright notice
- tools/add_spdx_headers.py: idempotent header injector
  (shebang- and PEP 263-aware)

Touches 1565 source files (.py, .ts, .tsx, .js, .jsx, .css, .sh).
No behavior change; comments only.
2026-05-22 21:04:16 -04:00

184 lines
5.4 KiB
Python

# SPDX-License-Identifier: AGPL-3.0-or-later
"""End-to-end tests for the clusterer worker shell.
The skeleton clusterer is a no-op; these tests cover the shell:
* exits cleanly on shutdown signal (and via cancel)
* invokes ``tick`` on each loop iteration
* publishes :class:`ClusterResult` side-effects on the right topics
* a clusterer raising from ``tick`` is logged and does not crash the loop
"""
from __future__ import annotations
import asyncio
import pytest
from decnet.bus import topics as _topics
from decnet.clustering.base import Clusterer, ClusterResult
from decnet.clustering.impl.connected_components import ConnectedComponentsClusterer
from decnet.clustering.worker import run_clusterer_loop
from decnet.web.db.factory import get_repository
@pytest.fixture
async def repo(tmp_path):
    """Provide an initialized repository stored under pytest's ``tmp_path``.

    The repository is created through :func:`get_repository` with a
    per-test ``clusterer.db`` path and has ``initialize()`` awaited before
    it is handed to the test.
    """
    db_file = tmp_path / "clusterer.db"
    repository = get_repository(db_path=str(db_file))
    await repository.initialize()
    return repository
@pytest.fixture(autouse=True)
def _no_bus(monkeypatch):
    """Force poll-only workers for every test — no real Unix socket.

    Applied automatically; sets ``DECNET_BUS_ENABLED`` to ``"false"`` for
    the duration of the test via ``monkeypatch``.
    """
    monkeypatch.setenv(name="DECNET_BUS_ENABLED", value="false")
class _FakeClusterer(Clusterer):
    """Test double that replays canned :class:`ClusterResult` objects.

    Each ``tick`` hands back the next queued result in order; once the
    queue is drained (or if none were supplied) an empty
    :class:`ClusterResult` is returned instead. ``calls`` counts how many
    times ``tick`` has been invoked.
    """

    name = "fake"

    def __init__(self, results: list[ClusterResult] | None = None) -> None:
        # Copy the caller's sequence so popping never mutates their list.
        self._results = list(results) if results else []
        self.calls = 0

    async def tick(self, repo) -> ClusterResult:
        """Record the invocation and return the next canned result."""
        self.calls += 1
        return self._results.pop(0) if self._results else ClusterResult()
class _RaisingClusterer(Clusterer):
    """Test double whose ``tick`` always fails with ``RuntimeError``.

    ``calls`` is incremented before raising, so tests can tell how many
    times the loop attempted a tick.
    """

    name = "raising"

    def __init__(self) -> None:
        self.calls = 0

    async def tick(self, repo) -> ClusterResult:
        """Count the attempt, then raise unconditionally."""
        self.calls = self.calls + 1
        raise RuntimeError("boom")
@pytest.mark.anyio
async def test_loop_exits_on_shutdown_signal(repo):
    """Setting the shutdown event lets the loop finish after it has ticked."""
    stop_event = asyncio.Event()
    fake = _FakeClusterer()
    loop_kwargs = {
        "poll_interval_secs": 0.05,
        "clusterer": fake,
        "shutdown": stop_event,
    }
    worker = asyncio.create_task(run_clusterer_loop(repo, **loop_kwargs))

    # Give the loop a little over two poll intervals before asking it to stop.
    await asyncio.sleep(0.12)
    stop_event.set()

    # The task must complete by itself well within the timeout.
    await asyncio.wait_for(worker, timeout=2.0)
    assert fake.calls >= 1
@pytest.mark.anyio
async def test_loop_exits_on_cancel(repo):
    """Cancelling the loop task ends it without an error reaching the test."""
    fake = _FakeClusterer()
    loop_kwargs = {"poll_interval_secs": 0.05, "clusterer": fake}
    worker = asyncio.create_task(run_clusterer_loop(repo, **loop_kwargs))

    await asyncio.sleep(0.1)
    worker.cancel()

    # The loop is expected to catch CancelledError and exit cleanly,
    # mirroring the intel + reuse worker shells, so awaiting the task
    # here should not raise.
    await asyncio.wait_for(worker, timeout=2.0)
    assert fake.calls >= 1
@pytest.mark.anyio
async def test_tick_failure_does_not_crash_loop(repo):
    """A clusterer raising from tick must be logged, not propagated.

    The loop is driven with a clusterer whose ``tick`` always raises. The
    test passes when ``tick`` is attempted at least twice (i.e. the loop
    survived the first failure) and the task then ends on shutdown without
    raising.
    """
    shutdown = asyncio.Event()
    clusterer = _RaisingClusterer()
    task = asyncio.create_task(
        run_clusterer_loop(
            repo,
            poll_interval_secs=0.05,
            clusterer=clusterer,
            shutdown=shutdown,
        )
    )

    # Wait for the second tick with a bounded deadline instead of a fixed
    # sleep: a fixed window makes the assertion depend on scheduler speed
    # and flakes on a loaded machine. Stop early if the task has already
    # finished so a crashed loop is reported promptly.
    loop = asyncio.get_running_loop()
    deadline = loop.time() + 2.0
    while clusterer.calls < 2 and not task.done() and loop.time() < deadline:
        await asyncio.sleep(0.01)

    shutdown.set()
    # Re-raises here if the loop propagated the clusterer's exception.
    await asyncio.wait_for(task, timeout=2.0)

    # Loop kept ticking despite the raise.
    assert clusterer.calls >= 2
@pytest.mark.anyio
async def test_skeleton_clusterer_returns_empty_result(repo):
    """The connected-components skeleton produces no side-effects yet."""
    outcome = await ConnectedComponentsClusterer().tick(repo)

    # Every side-effect list on the result must be empty.
    side_effect_fields = (
        "identities_formed",
        "observations_linked",
        "identities_merged",
        "identities_unmerged",
    )
    for field_name in side_effect_fields:
        assert getattr(outcome, field_name) == []
@pytest.mark.anyio
async def test_publishes_cluster_result_on_bus(monkeypatch, repo):
    """Every entry in ClusterResult fans out to the correct topic.

    ``publish_safely`` in the worker module is replaced with a recorder, a
    clusterer primed with one fully populated :class:`ClusterResult` drives
    the loop, and the test checks that each of the four identity topics
    was published at least once.
    """
    published: list[tuple[str, dict, str]] = []

    async def _fake_publish(bus, topic, payload, event_type=""):
        # Record instead of publishing; the bus argument is ignored.
        published.append((topic, payload, event_type))

    monkeypatch.setattr(
        "decnet.clustering.worker.publish_safely", _fake_publish,
    )

    result = ClusterResult(
        identities_formed=[
            {"identity_uuid": "id-1", "observation_uuids": ["obs-1", "obs-2"]},
        ],
        observations_linked=[
            {"identity_uuid": "id-1", "observation_uuid": "obs-3"},
        ],
        identities_merged=[
            {"winner_uuid": "id-1", "loser_uuid": "id-2"},
        ],
        identities_unmerged=[
            {"resurrected_uuid": "id-2", "former_winner_uuid": "id-1"},
        ],
    )
    expected_topics = {
        _topics.identity(_topics.IDENTITY_FORMED),
        _topics.identity(_topics.IDENTITY_OBSERVATION_LINKED),
        _topics.identity(_topics.IDENTITY_MERGED),
        _topics.identity(_topics.IDENTITY_UNMERGED),
    }

    clusterer = _FakeClusterer(results=[result])
    shutdown = asyncio.Event()
    task = asyncio.create_task(
        run_clusterer_loop(
            repo,
            poll_interval_secs=0.05,
            clusterer=clusterer,
            shutdown=shutdown,
        )
    )

    def _topics_seen() -> set:
        return {topic for topic, _, _ in published}

    # Wait (bounded) until every expected topic has been recorded rather
    # than sleeping a fixed interval, so a slow first tick on a loaded
    # machine does not cause a spurious failure. Bail out early if the
    # loop task has already ended.
    loop = asyncio.get_running_loop()
    deadline = loop.time() + 2.0
    while (
        not expected_topics <= _topics_seen()
        and not task.done()
        and loop.time() < deadline
    ):
        await asyncio.sleep(0.01)

    shutdown.set()
    await asyncio.wait_for(task, timeout=2.0)

    missing = expected_topics - _topics_seen()
    assert not missing, f"topics never published: {missing}"
@pytest.mark.anyio
async def test_clusterer_registered_in_cli():
    """`decnet clusterer` is registered as a master-only command."""
    # Imported lazily so the CLI package is only loaded by this test.
    from decnet.cli import gating

    assert "clusterer" in gating.MASTER_ONLY_COMMANDS