feat(clustering): identity clusterer worker skeleton
Adds the `decnet clusterer` master-only command and the provider-subpackage shape (base.py + factory.py + impl/connected_components.py) so subsequent commits can land similarity-graph features without churning callers. The skeleton ConnectedComponentsClusterer.tick is a no-op; the worker shell is fully wired (bus consumer on attacker.observed + attacker.scored, slow-tick fallback, health heartbeat, control listener, ClusterResult fan-out to identity.formed / identity.observation.linked / identity.merged). Subscribers on identity.> see no events from this clusterer until edge functions land, but the lifecycle is in place.
This commit is contained in:
@@ -29,7 +29,7 @@ MASTER_ONLY_COMMANDS: frozenset[str] = frozenset({
|
|||||||
"api", "swarmctl", "deploy", "redeploy", "teardown",
|
"api", "swarmctl", "deploy", "redeploy", "teardown",
|
||||||
"mutate", "listener", "profiler",
|
"mutate", "listener", "profiler",
|
||||||
"services", "distros", "correlate", "archetypes", "web",
|
"services", "distros", "correlate", "archetypes", "web",
|
||||||
"db-reset", "init", "webhook",
|
"db-reset", "init", "webhook", "clusterer",
|
||||||
})
|
})
|
||||||
MASTER_ONLY_GROUPS: frozenset[str] = frozenset({"swarm", "topology", "geoip"})
|
MASTER_ONLY_GROUPS: frozenset[str] = frozenset({"swarm", "topology", "geoip"})
|
||||||
|
|
||||||
|
|||||||
@@ -191,3 +191,51 @@ def register(app: typer.Typer) -> None:
|
|||||||
asyncio.run(_run())
|
asyncio.run(_run())
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
console.print("\n[yellow]Reuse correlator stopped.[/]")
|
console.print("\n[yellow]Reuse correlator stopped.[/]")
|
||||||
|
|
||||||
|
@app.command(name="clusterer")
def clusterer(
    poll_interval_secs: float = typer.Option(
        60.0,
        "--poll-interval",
        "-i",
        help="Slow-tick fallback when the bus is idle or unavailable (seconds)",
    ),
    daemon: bool = typer.Option(
        False,
        "--daemon",
        "-d",
        help="Detach to background as a daemon process",
    ),
) -> None:
    """Identity-resolution clusterer.

    Bus-woken on ``attacker.observed`` and ``attacker.scored``;
    builds a similarity graph over observations, runs
    connected-components, writes ``attacker_identities`` rows, and
    publishes ``identity.formed`` / ``identity.observation.linked``
    / ``identity.merged`` / ``identity.unmerged``.
    """
    # NOTE: the docstring above doubles as the command's --help text, so it
    # is user-facing; edit it with that in mind.

    # Heavy imports are deferred to invocation time and kept in this order.
    import asyncio

    from decnet.cli.gating import _require_master_mode
    from decnet.clustering.worker import run_clusterer_loop
    from decnet.web.dependencies import repo

    # Gate first: nothing below runs unless the master-mode check passes.
    _require_master_mode("clusterer")

    if daemon:
        log.info("clusterer daemonizing poll=%s", poll_interval_secs)
        # Presumably detaches the process (see the --daemon help text);
        # the helper's exact semantics are defined in ``_utils``.
        _utils._daemonize()

    log.info("clusterer command invoked poll=%s", poll_interval_secs)
    banner = f"[bold cyan]Identity clusterer starting[/] poll={poll_interval_secs}s"
    console.print(banner)
    console.print("[dim]Press Ctrl+C to stop[/]")

    async def _serve() -> None:
        # Initialise the repository before handing it to the worker loop.
        await repo.initialize()
        await run_clusterer_loop(repo, poll_interval_secs=poll_interval_secs)

    try:
        asyncio.run(_serve())
    except KeyboardInterrupt:
        # Ctrl+C is the documented way to stop; report it instead of a traceback.
        console.print("\n[yellow]Identity clusterer stopped.[/]")
|
||||||
|
|||||||
83
decnet/clustering/base.py
Normal file
83
decnet/clustering/base.py
Normal file
@@ -0,0 +1,83 @@
|
|||||||
|
"""Identity-resolution clusterer protocol.
|
||||||
|
|
||||||
|
Each concrete clusterer (``decnet.clustering.impl.connected_components``,
|
||||||
|
and any future variant) implements this. Callers must obtain the active
|
||||||
|
clusterer via :func:`decnet.clustering.factory.get_clusterer` — never
|
||||||
|
instantiate a concrete class directly.
|
||||||
|
|
||||||
|
The clusterer mirrors the provider-subpackage convention used by
|
||||||
|
:mod:`decnet.bus` and :mod:`decnet.web.db`: ``base.py`` defines the
|
||||||
|
protocol, ``factory.py`` dispatches on ``DECNET_CLUSTERER_TYPE``, and
|
||||||
|
``impl/`` holds concrete implementations.
|
||||||
|
|
||||||
|
Distinct from the ``tests/factories/campaign_factory.py`` namespace —
|
||||||
|
that's the synthetic-data DSL used by the fixture suite. The clusterer
|
||||||
|
here is the production worker that the fixture suite *gates*.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from abc import ABC, abstractmethod
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from decnet.web.db.repository import BaseRepository
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
class ClusterResult:
    """Summary of what one clusterer ``tick`` changed.

    The worker shell turns each entry into a bus event
    (``identity.formed`` / ``identity.observation.linked`` /
    ``identity.merged`` / ``identity.unmerged``). By contract the
    clusterer has already committed its DB writes before returning this
    object, so a dropped publish costs at most a few seconds of UI
    latency rather than data.

    Every field defaults to a fresh empty list, so ``ClusterResult()``
    means "nothing happened".
    """

    #: Newly created identities, one dict each. Shape:
    #: ``{"identity_uuid": str, "observation_uuids": [str, ...]}``.
    identities_formed: list[dict[str, Any]] = field(default_factory=list)

    #: Observations attached to an already-existing identity, one dict
    #: each. Shape: ``{"identity_uuid": str, "observation_uuid": str}``.
    observations_linked: list[dict[str, Any]] = field(default_factory=list)

    #: Merges performed, one dict each. Shape:
    #: ``{"winner_uuid": str, "loser_uuid": str}``.
    identities_merged: list[dict[str, Any]] = field(default_factory=list)

    #: Revoked merges (contradicting evidence re-split a previously merged
    #: pair), one dict each. Shape:
    #: ``{"resurrected_uuid": str, "former_winner_uuid": str}``.
    #:
    #: Reserved for the revocable-merge work; the skeleton clusterer never
    #: produces these. Subscribers on ``identity.>`` should still handle
    #: them from day one — see ``identity.unmerged`` in
    #: :mod:`decnet.bus.topics`.
    identities_unmerged: list[dict[str, Any]] = field(default_factory=list)
|
||||||
|
|
||||||
|
|
||||||
|
class Clusterer(ABC):
    """Base class every identity-resolution clusterer derives from.

    The contract is a single coroutine, ``tick``. One call is expected to
    read pending observations from the repo, run a clustering pass,
    commit ``attacker_identities`` rows and ``attackers.identity_id``
    updates, and hand back a :class:`ClusterResult` describing those
    side-effects so the worker shell can publish them.

    ``tick`` MUST NOT raise. A single bad pass is not allowed to take the
    worker down: implementations log internal failures and return an
    empty :class:`ClusterResult` instead.
    """

    #: Short implementation tag. Appears in logs and is the value matched
    #: against ``DECNET_CLUSTERER_TYPE`` during factory dispatch.
    name: str

    @abstractmethod
    async def tick(self, repo: BaseRepository) -> ClusterResult:
        """Perform one clustering pass against *repo*; see the class docstring."""
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = ["Clusterer", "ClusterResult"]
|
||||||
46
decnet/clustering/factory.py
Normal file
46
decnet/clustering/factory.py
Normal file
@@ -0,0 +1,46 @@
|
|||||||
|
"""Clusterer factory.
|
||||||
|
|
||||||
|
Returns the active :class:`~decnet.clustering.base.Clusterer` instance.
|
||||||
|
Mirrors :mod:`decnet.bus.factory` and :mod:`decnet.web.db.factory`:
|
||||||
|
callers obtain the clusterer via :func:`get_clusterer` rather than
|
||||||
|
importing a concrete impl directly.
|
||||||
|
|
||||||
|
Configuration knobs (env-overridable):
|
||||||
|
|
||||||
|
* ``DECNET_CLUSTERER_TYPE`` — which implementation to use. Default
|
||||||
|
``"connected_components"``. Unknown values raise :class:`ValueError`
|
||||||
|
so a typo in ``decnet.ini`` surfaces immediately rather than silently
|
||||||
|
falling back.
|
||||||
|
|
||||||
|
The ``connected_components`` implementation is the v1 production
|
||||||
|
clusterer. Other implementations (e.g. an HDBSCAN variant) can land
|
||||||
|
here later without churning callers.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import os
|
||||||
|
|
||||||
|
from decnet.clustering.base import Clusterer
|
||||||
|
|
||||||
|
# Names accepted in DECNET_CLUSTERER_TYPE; echoed in the error for unknown values.
_KNOWN_CLUSTERERS = ("connected_components",)
# Implementation used when DECNET_CLUSTERER_TYPE is not set.
_DEFAULT_CLUSTERER = "connected_components"


def get_clusterer() -> Clusterer:
    """Build the clusterer selected by ``DECNET_CLUSTERER_TYPE``.

    The env value is stripped and lower-cased before matching, so
    ``" CONNECTED_COMPONENTS "`` selects the same implementation as the
    default. The concrete class is imported lazily, which keeps this
    module free of implementation-specific dependencies.

    Returns:
        A new instance of the selected :class:`Clusterer` implementation.

    Raises:
        ValueError: if the normalised value names no known clusterer.
    """
    configured = os.environ.get("DECNET_CLUSTERER_TYPE", _DEFAULT_CLUSTERER)
    selected = configured.strip().lower()

    # Fail loudly on anything unrecognised rather than falling back silently.
    if selected != "connected_components":
        raise ValueError(
            f"Unknown clusterer: {selected!r}. Known: {_KNOWN_CLUSTERERS}"
        )

    from decnet.clustering.impl.connected_components import (
        ConnectedComponentsClusterer,
    )

    return ConnectedComponentsClusterer()
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = ["get_clusterer"]
|
||||||
6
decnet/clustering/impl/__init__.py
Normal file
6
decnet/clustering/impl/__init__.py
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
"""Concrete clusterer implementations.
|
||||||
|
|
||||||
|
Each module here contains exactly one :class:`~decnet.clustering.base.Clusterer`
|
||||||
|
subclass. New implementations register themselves in
|
||||||
|
:func:`decnet.clustering.factory.get_clusterer`.
|
||||||
|
"""
|
||||||
48
decnet/clustering/impl/connected_components.py
Normal file
48
decnet/clustering/impl/connected_components.py
Normal file
@@ -0,0 +1,48 @@
|
|||||||
|
"""Connected-components identity clusterer (v1).
|
||||||
|
|
||||||
|
Builds a similarity graph over observations (per-IP attacker rows),
|
||||||
|
runs connected-components over edges that pass a confidence threshold,
|
||||||
|
and writes one ``attacker_identities`` row per component.
|
||||||
|
|
||||||
|
This module is the **skeleton**. The ``tick`` method is a no-op until
|
||||||
|
the similarity-graph features land in subsequent commits. Subscribers
|
||||||
|
on ``identity.>`` see no traffic from this clusterer until the edge
|
||||||
|
functions are wired in.
|
||||||
|
|
||||||
|
Subsequent commits add, in order:
|
||||||
|
|
||||||
|
1. Similarity-graph scaffolding (``impl/similarity.py``).
|
||||||
|
2. High-weight edges (JA3/JA4/HASSH/payload/C2 exact match).
|
||||||
|
3. Medium-weight edges (command-sequence Jaccard bucketed by UKC phase).
|
||||||
|
4. Phase-handoff edges (designed for fixture 5).
|
||||||
|
5. Low-weight edges (credential Jaccard, ASN) — must NOT cluster F1/F2 alone.
|
||||||
|
6. Revocable merges (``identity.merged`` / ``identity.unmerged``).
|
||||||
|
|
||||||
|
Edges MUST stay time-agnostic — fixture 7 proves recency-decay clustering
|
||||||
|
fragments multi-month APT campaigns.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from decnet.clustering.base import Clusterer, ClusterResult
|
||||||
|
from decnet.logging import get_logger
|
||||||
|
from decnet.web.db.repository import BaseRepository
|
||||||
|
|
||||||
|
log = get_logger("clustering.connected_components")
|
||||||
|
|
||||||
|
|
||||||
|
class ConnectedComponentsClusterer(Clusterer):
    """Connected-components clusterer (skeleton).

    ``tick`` currently does no work and reports no side-effects; the
    real clustering pass is wired in by subsequent commits.
    """

    name = "connected_components"

    async def tick(self, repo: BaseRepository) -> ClusterResult:
        """Return an empty :class:`ClusterResult`; *repo* is not touched yet."""
        # With no similarity edges defined there is nothing to cluster, so
        # every pass reports "nothing happened".
        nothing_happened = ClusterResult()
        return nothing_happened
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = ["ConnectedComponentsClusterer"]
|
||||||
176
decnet/clustering/worker.py
Normal file
176
decnet/clustering/worker.py
Normal file
@@ -0,0 +1,176 @@
|
|||||||
|
"""Long-running identity-resolution clusterer worker.
|
||||||
|
|
||||||
|
Runs :meth:`Clusterer.tick` on bus-wake or slow-tick fallback. Mirrors
|
||||||
|
:mod:`decnet.intel.worker` and :mod:`decnet.correlation.reuse_worker`:
|
||||||
|
woken on ``attacker.observed`` and ``attacker.scored`` for sub-second
|
||||||
|
latency, falls back to a 60s poll when the bus is unavailable.
|
||||||
|
|
||||||
|
The clusterer itself owns its DB writes (``attacker_identities`` +
|
||||||
|
``attackers.identity_id`` updates). The worker shell is responsible only
|
||||||
|
for:
|
||||||
|
|
||||||
|
* lifecycle (bus connect, heartbeat, control listener, clean shutdown),
|
||||||
|
* publishing ``identity.formed`` / ``identity.observation.linked`` /
|
||||||
|
``identity.merged`` / ``identity.unmerged`` from the
|
||||||
|
:class:`ClusterResult` returned by ``tick``.
|
||||||
|
|
||||||
|
The skeleton ``ConnectedComponentsClusterer.tick`` returns an empty
|
||||||
|
result, so this worker runs but emits no identity events until edges
|
||||||
|
are wired in.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import contextlib
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from decnet.bus import topics as _topics
|
||||||
|
from decnet.bus.base import BaseBus
|
||||||
|
from decnet.bus.factory import get_bus
|
||||||
|
from decnet.bus.publish import (
|
||||||
|
publish_safely,
|
||||||
|
run_control_listener_signal as _run_control_listener_signal,
|
||||||
|
run_health_heartbeat as _run_health_heartbeat,
|
||||||
|
)
|
||||||
|
from decnet.clustering.base import Clusterer, ClusterResult
|
||||||
|
from decnet.clustering.factory import get_clusterer
|
||||||
|
from decnet.logging import get_logger
|
||||||
|
from decnet.web.db.repository import BaseRepository
|
||||||
|
|
||||||
|
log = get_logger("clustering.worker")
|
||||||
|
|
||||||
|
_DEFAULT_POLL_SECS = 60.0
|
||||||
|
|
||||||
|
|
||||||
|
async def run_clusterer_loop(
    repo: BaseRepository,
    *,
    poll_interval_secs: float = _DEFAULT_POLL_SECS,
    clusterer: Optional[Clusterer] = None,
    shutdown: Optional[asyncio.Event] = None,
) -> None:
    """Run the identity clusterer until stopped.

    Each iteration calls ``clusterer.tick(repo)``, publishes the returned
    :class:`ClusterResult`, then sleeps until a bus wake-up arrives, the
    shutdown event is set, or *poll_interval_secs* elapses — whichever
    comes first.

    Args:
        repo: Repository handed to every ``tick`` call.
        poll_interval_secs: Upper bound on the sleep between ticks; the
            only trigger when the bus is unavailable.
        clusterer: Implementation to drive. Defaults to
            :func:`get_clusterer`; tests pass a fake.
        shutdown: Optional external stop signal. Setting it ends the loop
            promptly, including while the loop is sleeping between ticks.

    The loop also exits cleanly (without re-raising) on
    :class:`asyncio.CancelledError` and :class:`KeyboardInterrupt`.
    A ``tick`` that raises is logged and treated as an empty result.
    """
    if clusterer is None:
        clusterer = get_clusterer()
    log.info(
        "clusterer started impl=%s poll_interval_secs=%s",
        clusterer.name, poll_interval_secs,
    )

    bus: Optional[BaseBus] = None
    wake = asyncio.Event()
    wake_tasks: list[asyncio.Task] = []
    heartbeat_task: Optional[asyncio.Task] = None
    try:
        candidate = get_bus(client_name="clusterer")
        await candidate.connect()
        # Only adopt the bus once the connection succeeded, so the cleanup
        # below never closes a bus that was never opened.
        bus = candidate
        wake_tasks.append(asyncio.create_task(
            _wake_on(bus, wake, _topics.attacker(_topics.ATTACKER_OBSERVED)),
        ))
        wake_tasks.append(asyncio.create_task(
            _wake_on(bus, wake, _topics.attacker(_topics.ATTACKER_SCORED)),
        ))
        heartbeat_task = asyncio.create_task(
            _run_health_heartbeat(bus, "clusterer"),
        )
        # The control listener is tracked with the wake tasks purely so it
        # is cancelled and awaited by the same cleanup path.
        wake_tasks.append(asyncio.create_task(
            _run_control_listener_signal(bus, "clusterer"),
        ))
    except Exception as exc:  # noqa: BLE001
        # Best-effort: without a bus the loop still ticks on the poll timer.
        log.warning(
            "clusterer: bus unavailable, running in poll-only mode: %s", exc,
        )

    if shutdown is None:
        shutdown = asyncio.Event()

    try:
        while not shutdown.is_set():
            try:
                result = await clusterer.tick(repo)
            except Exception:  # noqa: BLE001
                # Implementations must not raise, but one that does must
                # not take the worker down with it.
                log.exception("clusterer: tick failed")
                result = ClusterResult()

            await _publish_result(bus, result)

            # Sleep until woken, stopped, or the poll interval elapses.
            # Waiting on ``shutdown`` as well means a stop request is not
            # held up for a whole poll interval.
            await _wait_for_wake_or_shutdown(
                wake, shutdown, float(poll_interval_secs),
            )
            wake.clear()
    except (asyncio.CancelledError, KeyboardInterrupt):
        log.info("clusterer stopped")
    finally:
        for t in wake_tasks:
            t.cancel()
        if heartbeat_task is not None:
            heartbeat_task.cancel()
        # Await every background task so cancellation is fully processed
        # before the bus is closed underneath them.
        for t in (*wake_tasks, heartbeat_task):
            if t is None:
                continue
            with contextlib.suppress(asyncio.CancelledError, Exception):
                await t
        if bus is not None:
            with contextlib.suppress(Exception):
                await bus.close()


async def _wait_for_wake_or_shutdown(
    wake: asyncio.Event,
    shutdown: asyncio.Event,
    timeout: float,
) -> None:
    """Return once *wake* or *shutdown* is set, or after *timeout* seconds."""
    waiters = [
        asyncio.create_task(wake.wait()),
        asyncio.create_task(shutdown.wait()),
    ]
    try:
        await asyncio.wait(
            waiters, timeout=timeout, return_when=asyncio.FIRST_COMPLETED,
        )
    finally:
        # Always reap both waiters, including when this coroutine is itself
        # being cancelled, so no pending task outlives the call.
        for waiter in waiters:
            waiter.cancel()
        for waiter in waiters:
            with contextlib.suppress(asyncio.CancelledError):
                await waiter
|
||||||
|
|
||||||
|
|
||||||
|
async def _publish_result(bus: Optional[BaseBus], result: ClusterResult) -> None:
    """Publish the contents of *result* on the ``identity.*`` topics.

    Entries are emitted one event each, in this order: every
    ``identities_formed`` entry, then every ``observations_linked``
    entry, then every ``identities_merged`` entry. Each dict is passed
    through unchanged as the event payload.

    ``identities_unmerged`` is intentionally NOT published yet (see the
    note at the end of the body).

    Args:
        bus: Bus handle forwarded as-is to ``publish_safely``; may be
            ``None`` when the worker runs in poll-only mode.
        result: Side-effects returned by the latest ``tick``.
    """

    async def _emit(entries: list, event_type: str) -> None:
        # One event per entry; the topic is derived from the event type.
        for payload in entries:
            await publish_safely(
                bus,
                _topics.identity(event_type),
                payload,
                event_type=event_type,
            )

    await _emit(result.identities_formed, _topics.IDENTITY_FORMED)
    await _emit(result.observations_linked, _topics.IDENTITY_OBSERVATION_LINKED)
    await _emit(result.identities_merged, _topics.IDENTITY_MERGED)
    # identities_unmerged ships once IDENTITY_UNMERGED is reserved
    # (next commit). The field is already on ClusterResult so the
    # revocable-merge work doesn't reshape the dataclass.
|
||||||
|
|
||||||
|
|
||||||
|
async def _wake_on(bus: BaseBus, wake: asyncio.Event, pattern: str) -> None:
|
||||||
|
"""Flip *wake* every time *pattern* fires on the bus.
|
||||||
|
|
||||||
|
Survives transient subscriber errors by logging and exiting; the
|
||||||
|
poll-interval fallback keeps the loop alive in poll-only mode.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
sub = bus.subscribe(pattern)
|
||||||
|
async with sub:
|
||||||
|
async for _event in sub:
|
||||||
|
wake.set()
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
raise
|
||||||
|
except Exception as exc: # noqa: BLE001
|
||||||
|
log.warning(
|
||||||
|
"clusterer: subscriber for %s died (%s); falling back to poll",
|
||||||
|
pattern, exc,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = ["run_clusterer_loop"]
|
||||||
34
tests/clustering/test_clusterer_factory.py
Normal file
34
tests/clustering/test_clusterer_factory.py
Normal file
@@ -0,0 +1,34 @@
|
|||||||
|
"""Tests for :mod:`decnet.clustering.factory`."""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from decnet.clustering.base import Clusterer
|
||||||
|
from decnet.clustering.factory import get_clusterer
|
||||||
|
from decnet.clustering.impl.connected_components import ConnectedComponentsClusterer
|
||||||
|
|
||||||
|
|
||||||
|
def test_default_returns_connected_components(monkeypatch):
    """With the env var unset, the factory falls back to connected-components."""
    monkeypatch.delenv("DECNET_CLUSTERER_TYPE", raising=False)

    instance = get_clusterer()

    assert isinstance(instance, ConnectedComponentsClusterer)
    assert isinstance(instance, Clusterer)
    assert instance.name == "connected_components"
|
||||||
|
|
||||||
|
|
||||||
|
def test_explicit_connected_components(monkeypatch):
    """Naming the implementation explicitly selects it."""
    monkeypatch.setenv("DECNET_CLUSTERER_TYPE", "connected_components")

    instance = get_clusterer()

    assert isinstance(instance, ConnectedComponentsClusterer)
|
||||||
|
|
||||||
|
|
||||||
|
def test_unknown_clusterer_type_raises(monkeypatch):
    """An unrecognised value is rejected with ``ValueError``, not ignored."""
    monkeypatch.setenv("DECNET_CLUSTERER_TYPE", "nope")

    with pytest.raises(ValueError, match="Unknown clusterer"):
        get_clusterer()
|
||||||
|
|
||||||
|
|
||||||
|
def test_case_insensitive(monkeypatch):
    """Surrounding whitespace and letter case in the env value are ignored."""
    monkeypatch.setenv("DECNET_CLUSTERER_TYPE", "  CONNECTED_COMPONENTS  ")

    instance = get_clusterer()

    assert isinstance(instance, ConnectedComponentsClusterer)
|
||||||
178
tests/clustering/test_clusterer_worker.py
Normal file
178
tests/clustering/test_clusterer_worker.py
Normal file
@@ -0,0 +1,178 @@
|
|||||||
|
"""End-to-end tests for the clusterer worker shell.
|
||||||
|
|
||||||
|
The skeleton clusterer is a no-op; these tests cover the shell:
|
||||||
|
|
||||||
|
* exits cleanly on shutdown signal (and via cancel)
|
||||||
|
* invokes ``tick`` on each loop iteration
|
||||||
|
* publishes :class:`ClusterResult` side-effects on the right topics
|
||||||
|
* a clusterer raising from ``tick`` is logged and does not crash the loop
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from decnet.bus import topics as _topics
|
||||||
|
from decnet.clustering.base import Clusterer, ClusterResult
|
||||||
|
from decnet.clustering.impl.connected_components import ConnectedComponentsClusterer
|
||||||
|
from decnet.clustering.worker import run_clusterer_loop
|
||||||
|
from decnet.web.db.factory import get_repository
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
async def repo(tmp_path):
    """Provide an initialised repository backed by a per-test database file."""
    db_file = tmp_path / "clusterer.db"
    repository = get_repository(db_path=str(db_file))
    await repository.initialize()
    return repository
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
def _no_bus(monkeypatch):
    """Disable the bus for every test so workers run poll-only (no real Unix socket)."""
    monkeypatch.setenv("DECNET_BUS_ENABLED", "false")
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeClusterer(Clusterer):
    """Test double that replays canned :class:`ClusterResult` objects.

    Queued results are handed out in order, one per ``tick``; once the
    queue is exhausted every further ``tick`` yields an empty result.
    ``calls`` counts how many times ``tick`` ran.
    """

    name = "fake"

    def __init__(self, results: list[ClusterResult] | None = None) -> None:
        # Copy the input so popping from the queue never mutates the
        # caller's list.
        self._results = list(results) if results else []
        self.calls = 0

    async def tick(self, repo) -> ClusterResult:
        self.calls += 1
        return self._results.pop(0) if self._results else ClusterResult()
|
||||||
|
|
||||||
|
|
||||||
|
class _RaisingClusterer(Clusterer):
    """Test double whose ``tick`` always fails, after counting the call."""

    name = "raising"

    def __init__(self) -> None:
        self.calls = 0

    async def tick(self, repo) -> ClusterResult:
        # Count first so tests can tell how often the loop retried.
        self.calls += 1
        raise RuntimeError("boom")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
async def test_loop_exits_on_shutdown_signal(repo):
    """Setting the shutdown event ends the loop after at least one tick."""
    stop = asyncio.Event()
    fake = _FakeClusterer()
    loop_task = asyncio.create_task(
        run_clusterer_loop(
            repo, poll_interval_secs=0.05, clusterer=fake, shutdown=stop,
        )
    )

    # Let the loop run through a couple of poll intervals before stopping it.
    await asyncio.sleep(0.12)
    stop.set()

    await asyncio.wait_for(loop_task, timeout=2.0)
    assert fake.calls >= 1
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
async def test_loop_exits_on_cancel(repo):
    """Cancelling the task ends the loop without the cancellation escaping."""
    fake = _FakeClusterer()
    loop_task = asyncio.create_task(
        run_clusterer_loop(repo, poll_interval_secs=0.05, clusterer=fake)
    )

    await asyncio.sleep(0.1)
    loop_task.cancel()

    # The loop catches CancelledError and returns normally, so awaiting
    # the task here must not raise.
    await asyncio.wait_for(loop_task, timeout=2.0)
    assert fake.calls >= 1
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
async def test_tick_failure_does_not_crash_loop(repo):
    """An exception raised by ``tick`` is absorbed and the loop keeps going."""
    stop = asyncio.Event()
    failing = _RaisingClusterer()
    loop_task = asyncio.create_task(
        run_clusterer_loop(
            repo, poll_interval_secs=0.05, clusterer=failing, shutdown=stop,
        )
    )

    await asyncio.sleep(0.2)
    stop.set()
    await asyncio.wait_for(loop_task, timeout=2.0)

    # More than one call proves the loop survived the first failure.
    assert failing.calls >= 2
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
async def test_skeleton_clusterer_returns_empty_result(repo):
    """The skeleton connected-components ``tick`` reports no side-effects."""
    outcome = await ConnectedComponentsClusterer().tick(repo)

    assert outcome.identities_formed == []
    assert outcome.observations_linked == []
    assert outcome.identities_merged == []
    assert outcome.identities_unmerged == []
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
async def test_publishes_cluster_result_on_bus(monkeypatch, repo):
    """Formed, linked and merged entries each reach their own topic."""
    published: list[tuple[str, dict, str]] = []

    async def _fake_publish(bus, topic, payload, event_type=""):
        # Record instead of publishing; the worker looks this name up at call time.
        published.append((topic, payload, event_type))

    monkeypatch.setattr(
        "decnet.clustering.worker.publish_safely", _fake_publish,
    )

    canned = ClusterResult(
        identities_formed=[
            {"identity_uuid": "id-1", "observation_uuids": ["obs-1", "obs-2"]},
        ],
        observations_linked=[
            {"identity_uuid": "id-1", "observation_uuid": "obs-3"},
        ],
        identities_merged=[
            {"winner_uuid": "id-1", "loser_uuid": "id-2"},
        ],
    )
    fake = _FakeClusterer(results=[canned])

    stop = asyncio.Event()
    loop_task = asyncio.create_task(
        run_clusterer_loop(
            repo, poll_interval_secs=0.05, clusterer=fake, shutdown=stop,
        )
    )
    await asyncio.sleep(0.1)
    stop.set()
    await asyncio.wait_for(loop_task, timeout=2.0)

    topics_seen = {topic for topic, _payload, _event_type in published}
    for event_type in (
        _topics.IDENTITY_FORMED,
        _topics.IDENTITY_OBSERVATION_LINKED,
        _topics.IDENTITY_MERGED,
    ):
        assert _topics.identity(event_type) in topics_seen
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
async def test_clusterer_registered_in_cli():
    """``clusterer`` appears in the set of master-only CLI commands."""
    from decnet.cli import gating

    assert "clusterer" in gating.MASTER_ONLY_COMMANDS
|
||||||
Reference in New Issue
Block a user