Files
DECNET/tests/clustering/test_worker_publish.py
anti f2b3393669 chore: relicense to AGPL-3.0-or-later and add SPDX headers
Replaces LICENSE (GPLv3 -> AGPLv3) and prepends
`SPDX-License-Identifier: AGPL-3.0-or-later` to every source file
across decnet/, decnet_web/, tests/, scripts/, and tools/.

Rationale: closes the GPLv3 ASP loophole so any party operating a
modified DECNET as a network service must offer their modified
source. Personal copyright (Samuel Paschuan) + inbound=outbound
contributions make a future unilateral relicense infeasible.

- LICENSE: full AGPL-3.0 text (gnu.org/licenses/agpl-3.0.txt)
- COPYRIGHT: project copyright notice
- tools/add_spdx_headers.py: idempotent header injector
  (shebang- and PEP 263-aware)

Touches 1565 source files (.py, .ts, .tsx, .js, .jsx, .css, .sh).
No behavior change; comments only.
2026-05-22 21:04:16 -04:00

98 lines
2.9 KiB
Python

# SPDX-License-Identifier: AGPL-3.0-or-later
"""Clusterer publishes ``identity.formed`` and ``identity.merged``.
Pins the producer wiring. The clusterer reports its tick output via a
:class:`ClusterResult`; the worker fans the four sub-lists out to the
matching ``identity.*`` topics. This test runs one tick with a fake
clusterer that returns a result containing one formed and one merged
identity, and asserts the bus saw both envelopes.
"""
from __future__ import annotations
import asyncio
from typing import Any
import pytest
from decnet.bus import topics as _topics
from decnet.bus.fake import FakeBus
from decnet.clustering import worker as _cw
from decnet.clustering.base import ClusterResult, Clusterer
from decnet.web.db.repository import BaseRepository
class _FakeClusterer(Clusterer):
name = "fake"
def __init__(self, results: list[ClusterResult]) -> None:
self._results = list(results)
async def tick(self, _repo: BaseRepository) -> ClusterResult:
if self._results:
return self._results.pop(0)
return ClusterResult()
@pytest.mark.asyncio
async def test_clusterer_publishes_identity_formed_and_merged(
monkeypatch: pytest.MonkeyPatch,
) -> None:
bus = FakeBus()
await bus.connect()
monkeypatch.setattr(_cw, "get_bus", lambda *_a, **_kw: bus)
captured: list[tuple[str, dict[str, Any]]] = []
sub = bus.subscribe("identity.>")
async def drain() -> None:
try:
async with sub:
async for ev in sub:
captured.append((ev.topic, ev.payload))
except Exception:
pass
drain_task = asyncio.create_task(drain())
await asyncio.sleep(0)
result = ClusterResult(
identities_formed=[
{"identity_uuid": "id-1", "observation_uuids": ["obs-1", "obs-2"]},
],
identities_merged=[
{"winner_uuid": "id-1", "loser_uuid": "id-9"},
],
)
fake = _FakeClusterer([result])
shutdown = asyncio.Event()
class _RepoStub:
pass
loop_task = asyncio.create_task(_cw.run_clusterer_loop(
_RepoStub(), # type: ignore[arg-type]
poll_interval_secs=0.05, clusterer=fake,
shutdown=shutdown,
))
await asyncio.sleep(0.15)
shutdown.set()
await asyncio.wait_for(loop_task, timeout=2.0)
drain_task.cancel()
await bus.close()
topics_seen = [t for t, _ in captured]
assert _topics.identity(_topics.IDENTITY_FORMED) in topics_seen
assert _topics.identity(_topics.IDENTITY_MERGED) in topics_seen
formed = next(
p for t, p in captured
if t == _topics.identity(_topics.IDENTITY_FORMED)
)
assert formed["identity_uuid"] == "id-1"
merged = next(
p for t, p in captured
if t == _topics.identity(_topics.IDENTITY_MERGED)
)
assert merged["winner_uuid"] == "id-1"
assert merged["loser_uuid"] == "id-9"