diff --git a/decnet/cli/gating.py b/decnet/cli/gating.py index 2b36236a..201cec60 100644 --- a/decnet/cli/gating.py +++ b/decnet/cli/gating.py @@ -29,7 +29,7 @@ MASTER_ONLY_COMMANDS: frozenset[str] = frozenset({ "api", "swarmctl", "deploy", "redeploy", "teardown", "mutate", "listener", "profiler", "services", "distros", "correlate", "archetypes", "web", - "db-reset", "init", "webhook", + "db-reset", "init", "webhook", "clusterer", }) MASTER_ONLY_GROUPS: frozenset[str] = frozenset({"swarm", "topology", "geoip"}) diff --git a/decnet/cli/workers.py b/decnet/cli/workers.py index 426f88ee..378e9f6f 100644 --- a/decnet/cli/workers.py +++ b/decnet/cli/workers.py @@ -191,3 +191,51 @@ def register(app: typer.Typer) -> None: asyncio.run(_run()) except KeyboardInterrupt: console.print("\n[yellow]Reuse correlator stopped.[/]") + + @app.command(name="clusterer") + def clusterer( + poll_interval_secs: float = typer.Option( + 60.0, "--poll-interval", "-i", + help="Slow-tick fallback when the bus is idle or unavailable (seconds)", + ), + daemon: bool = typer.Option( + False, "--daemon", "-d", + help="Detach to background as a daemon process", + ), + ) -> None: + """Identity-resolution clusterer. + + Bus-woken on ``attacker.observed`` and ``attacker.scored``; + builds a similarity graph over observations, runs + connected-components, writes ``attacker_identities`` rows, and + publishes ``identity.formed`` / ``identity.observation.linked`` + / ``identity.merged`` / ``identity.unmerged``. + """ + import asyncio + from decnet.cli.gating import _require_master_mode + from decnet.clustering.worker import run_clusterer_loop + from decnet.web.dependencies import repo + + _require_master_mode("clusterer") + + if daemon: + log.info("clusterer daemonizing poll=%s", poll_interval_secs) + _utils._daemonize() + + log.info("clusterer command invoked poll=%s", poll_interval_secs) + console.print( + f"[bold cyan]Identity clusterer starting[/] " + f"poll={poll_interval_secs}s" + ) + console.print("[dim]Press Ctrl+C to stop[/]") + + async def _run() -> None: + await repo.initialize() + await run_clusterer_loop( + repo, poll_interval_secs=poll_interval_secs, + ) + + try: + asyncio.run(_run()) + except KeyboardInterrupt: + console.print("\n[yellow]Identity clusterer stopped.[/]") diff --git a/decnet/clustering/base.py b/decnet/clustering/base.py new file mode 100644 index 00000000..dc49a818 --- /dev/null +++ b/decnet/clustering/base.py @@ -0,0 +1,83 @@ +"""Identity-resolution clusterer protocol. + +Each concrete clusterer (``decnet.clustering.impl.connected_components``, +and any future variant) implements this. Callers must obtain the active +clusterer via :func:`decnet.clustering.factory.get_clusterer` — never +instantiate a concrete class directly. + +The clusterer mirrors the provider-subpackage convention used by +:mod:`decnet.bus` and :mod:`decnet.web.db`: ``base.py`` defines the +protocol, ``factory.py`` dispatches on ``DECNET_CLUSTERER_TYPE``, and +``impl/`` holds concrete implementations. + +Distinct from the ``tests/factories/campaign_factory.py`` namespace — +that's the synthetic-data DSL used by the fixture suite. The clusterer +here is the production worker that the fixture suite *gates*. +""" +from __future__ import annotations + +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from typing import Any + +from decnet.web.db.repository import BaseRepository + + +@dataclass +class ClusterResult: + """Side-effects produced by a single clusterer ``tick``. + + The worker shell consumes these to publish on the bus + (``identity.formed`` / ``identity.observation.linked`` / + ``identity.merged`` / ``identity.unmerged``). The clusterer itself + has already committed any DB writes by the time it returns this — + losing a publish is at most a few seconds of UI latency. + """ + + identities_formed: list[dict[str, Any]] = field(default_factory=list) + """One dict per newly created identity. Shape: + ``{"identity_uuid": str, "observation_uuids": [str, ...]}``.""" + + observations_linked: list[dict[str, Any]] = field(default_factory=list) + """One dict per observation attached to an existing identity. Shape: + ``{"identity_uuid": str, "observation_uuid": str}``.""" + + identities_merged: list[dict[str, Any]] = field(default_factory=list) + """One dict per merge. Shape: ``{"winner_uuid": str, + "loser_uuid": str}``.""" + + identities_unmerged: list[dict[str, Any]] = field(default_factory=list) + """One dict per revoked merge (contradicting evidence re-split a + previously-merged pair). Shape: + ``{"resurrected_uuid": str, "former_winner_uuid": str}``. + + Reserved for the revocable-merge work; the skeleton clusterer never + produces these. Subscribers on ``identity.>`` should still handle + them from day one — see ``identity.unmerged`` in + :mod:`decnet.bus.topics`. + """ + + +class Clusterer(ABC): + """Abstract identity-resolution clusterer. + + Single-method contract: ``tick`` reads pending observations from the + repo, runs a clustering pass, commits ``attacker_identities`` rows + + sets ``attackers.identity_id``, and returns a :class:`ClusterResult` + summarising the side-effects so the worker shell can publish. + + Implementations MUST NOT raise from ``tick``: a single bad pass + cannot be allowed to crash the worker. Internal failures should be + logged and the method should return an empty :class:`ClusterResult`. + """ + + #: Short tag — surfaces in logs and in + #: ``DECNET_CLUSTERER_TYPE`` for factory dispatch. + name: str + + @abstractmethod + async def tick(self, repo: BaseRepository) -> ClusterResult: + """Run a single clustering pass. See class docstring.""" + + +__all__ = ["Clusterer", "ClusterResult"] diff --git a/decnet/clustering/factory.py b/decnet/clustering/factory.py new file mode 100644 index 00000000..bd5c396b --- /dev/null +++ b/decnet/clustering/factory.py @@ -0,0 +1,46 @@ +"""Clusterer factory. + +Returns the active :class:`~decnet.clustering.base.Clusterer` instance. +Mirrors :mod:`decnet.bus.factory` and :mod:`decnet.web.db.factory`: +callers obtain the clusterer via :func:`get_clusterer` rather than +importing a concrete impl directly. + +Configuration knobs (env-overridable): + +* ``DECNET_CLUSTERER_TYPE`` — which implementation to use. Default + ``"connected_components"``. Unknown values raise :class:`ValueError` + so a typo in ``decnet.ini`` surfaces immediately rather than silently + falling back. + +The ``connected_components`` implementation is the v1 production +clusterer. Other implementations (e.g. an HDBSCAN variant) can land +here later without churning callers. +""" +from __future__ import annotations + +import os + +from decnet.clustering.base import Clusterer + +_KNOWN_CLUSTERERS = ("connected_components",) +_DEFAULT_CLUSTERER = "connected_components" + + +def get_clusterer() -> Clusterer: + """Return the configured clusterer instance. + + Lazy-imports the concrete impl so the base module stays free of + implementation-specific dependencies. + """ + name = os.environ.get("DECNET_CLUSTERER_TYPE", _DEFAULT_CLUSTERER).strip().lower() + if name == "connected_components": + from decnet.clustering.impl.connected_components import ( + ConnectedComponentsClusterer, + ) + return ConnectedComponentsClusterer() + raise ValueError( + f"Unknown clusterer: {name!r}. Known: {_KNOWN_CLUSTERERS}" + ) + + +__all__ = ["get_clusterer"] diff --git a/decnet/clustering/impl/__init__.py b/decnet/clustering/impl/__init__.py new file mode 100644 index 00000000..fa3c7514 --- /dev/null +++ b/decnet/clustering/impl/__init__.py @@ -0,0 +1,6 @@ +"""Concrete clusterer implementations. + +Each module here contains exactly one :class:`~decnet.clustering.base.Clusterer` +subclass. New implementations register themselves in +:func:`decnet.clustering.factory.get_clusterer`. +""" diff --git a/decnet/clustering/impl/connected_components.py b/decnet/clustering/impl/connected_components.py new file mode 100644 index 00000000..83fb419d --- /dev/null +++ b/decnet/clustering/impl/connected_components.py @@ -0,0 +1,48 @@ +"""Connected-components identity clusterer (v1). + +Builds a similarity graph over observations (per-IP attacker rows), +runs connected-components over edges that pass a confidence threshold, +and writes one ``attacker_identities`` row per component. + +This module is the **skeleton**. The ``tick`` method is a no-op until +the similarity-graph features land in subsequent commits. Subscribers +on ``identity.>`` see no traffic from this clusterer until the edge +functions are wired in. + +Subsequent commits add, in order: + +1. Similarity-graph scaffolding (``impl/similarity.py``). +2. High-weight edges (JA3/JA4/HASSH/payload/C2 exact match). +3. Medium-weight edges (command-sequence Jaccard bucketed by UKC phase). +4. Phase-handoff edges (designed for fixture 5). +5. Low-weight edges (credential Jaccard, ASN) — must NOT cluster F1/F2 alone. +6. Revocable merges (``identity.merged`` / ``identity.unmerged``). + +Edges MUST stay time-agnostic — fixture 7 proves recency-decay clustering +fragments multi-month APT campaigns. +""" +from __future__ import annotations + +from decnet.clustering.base import Clusterer, ClusterResult +from decnet.logging import get_logger +from decnet.web.db.repository import BaseRepository + +log = get_logger("clustering.connected_components") + + +class ConnectedComponentsClusterer(Clusterer): + """Connected-components clusterer. + + Skeleton implementation: ``tick`` is a no-op. Wiring lands in + subsequent commits. + """ + + name = "connected_components" + + async def tick(self, repo: BaseRepository) -> ClusterResult: + # No similarity edges defined yet; produce an empty result. + # Subsequent commits replace this with the real pass. + return ClusterResult() + + +__all__ = ["ConnectedComponentsClusterer"] diff --git a/decnet/clustering/worker.py b/decnet/clustering/worker.py new file mode 100644 index 00000000..0229f855 --- /dev/null +++ b/decnet/clustering/worker.py @@ -0,0 +1,176 @@ +"""Long-running identity-resolution clusterer worker. + +Runs :meth:`Clusterer.tick` on bus-wake or slow-tick fallback. Mirrors +:mod:`decnet.intel.worker` and :mod:`decnet.correlation.reuse_worker`: +woken on ``attacker.observed`` and ``attacker.scored`` for sub-second +latency, falls back to a 60s poll when the bus is unavailable. + +The clusterer itself owns its DB writes (``attacker_identities`` + +``attackers.identity_id`` updates). The worker shell is responsible only +for: + +* lifecycle (bus connect, heartbeat, control listener, clean shutdown), +* publishing ``identity.formed`` / ``identity.observation.linked`` / + ``identity.merged`` / ``identity.unmerged`` from the + :class:`ClusterResult` returned by ``tick``. + +The skeleton ``ConnectedComponentsClusterer.tick`` returns an empty +result, so this worker runs but emits no identity events until edges +are wired in. +""" +from __future__ import annotations + +import asyncio +import contextlib +from typing import Optional + +from decnet.bus import topics as _topics +from decnet.bus.base import BaseBus +from decnet.bus.factory import get_bus +from decnet.bus.publish import ( + publish_safely, + run_control_listener_signal as _run_control_listener_signal, + run_health_heartbeat as _run_health_heartbeat, +) +from decnet.clustering.base import Clusterer, ClusterResult +from decnet.clustering.factory import get_clusterer +from decnet.logging import get_logger +from decnet.web.db.repository import BaseRepository + +log = get_logger("clustering.worker") + +_DEFAULT_POLL_SECS = 60.0 + + +async def run_clusterer_loop( + repo: BaseRepository, + *, + poll_interval_secs: float = _DEFAULT_POLL_SECS, + clusterer: Optional[Clusterer] = None, + shutdown: Optional[asyncio.Event] = None, +) -> None: + """Run the identity clusterer until cancelled. + + *clusterer* defaults to :func:`get_clusterer` — tests pass a fake. + *shutdown* is an optional external stop signal; the loop also exits + cleanly on :class:`asyncio.CancelledError` and + :class:`KeyboardInterrupt`. + """ + if clusterer is None: + clusterer = get_clusterer() + log.info( + "clusterer started impl=%s poll_interval_secs=%s", + clusterer.name, poll_interval_secs, + ) + + bus: Optional[BaseBus] = None + wake = asyncio.Event() + wake_tasks: list[asyncio.Task] = [] + heartbeat_task: Optional[asyncio.Task] = None + try: + candidate = get_bus(client_name="clusterer") + await candidate.connect() + bus = candidate + wake_tasks.append(asyncio.create_task( + _wake_on(bus, wake, _topics.attacker(_topics.ATTACKER_OBSERVED)), + )) + wake_tasks.append(asyncio.create_task( + _wake_on(bus, wake, _topics.attacker(_topics.ATTACKER_SCORED)), + )) + heartbeat_task = asyncio.create_task( + _run_health_heartbeat(bus, "clusterer"), + ) + wake_tasks.append(asyncio.create_task( + _run_control_listener_signal(bus, "clusterer"), + )) + except Exception as exc: # noqa: BLE001 + log.warning( + "clusterer: bus unavailable, running in poll-only mode: %s", exc, + ) + + if shutdown is None: + shutdown = asyncio.Event() + + try: + while not shutdown.is_set(): + try: + result = await clusterer.tick(repo) + except Exception: # noqa: BLE001 + log.exception("clusterer: tick failed") + result = ClusterResult() + + await _publish_result(bus, result) + + try: + await asyncio.wait_for( + wake.wait(), timeout=float(poll_interval_secs), + ) + except asyncio.TimeoutError: + pass + wake.clear() + except (asyncio.CancelledError, KeyboardInterrupt): + log.info("clusterer stopped") + finally: + for t in wake_tasks: + t.cancel() + if heartbeat_task is not None: + heartbeat_task.cancel() + for t in (*wake_tasks, heartbeat_task): + if t is None: + continue + with contextlib.suppress(asyncio.CancelledError, Exception): + await t + if bus is not None: + with contextlib.suppress(Exception): + await bus.close() + + +async def _publish_result(bus: Optional[BaseBus], result: ClusterResult) -> None: + """Fan ``ClusterResult`` out to the four ``identity.*`` topics.""" + for formed in result.identities_formed: + await publish_safely( + bus, + _topics.identity(_topics.IDENTITY_FORMED), + formed, + event_type=_topics.IDENTITY_FORMED, + ) + for linked in result.observations_linked: + await publish_safely( + bus, + _topics.identity(_topics.IDENTITY_OBSERVATION_LINKED), + linked, + event_type=_topics.IDENTITY_OBSERVATION_LINKED, + ) + for merged in result.identities_merged: + await publish_safely( + bus, + _topics.identity(_topics.IDENTITY_MERGED), + merged, + event_type=_topics.IDENTITY_MERGED, + ) + # identities_unmerged ships once IDENTITY_UNMERGED is reserved + # (next commit). The field is already on ClusterResult so the + # revocable-merge work doesn't reshape the dataclass. + + +async def _wake_on(bus: BaseBus, wake: asyncio.Event, pattern: str) -> None: + """Flip *wake* every time *pattern* fires on the bus. + + Survives transient subscriber errors by logging and exiting; the + poll-interval fallback keeps the loop alive in poll-only mode. + """ + try: + sub = bus.subscribe(pattern) + async with sub: + async for _event in sub: + wake.set() + except asyncio.CancelledError: + raise + except Exception as exc: # noqa: BLE001 + log.warning( + "clusterer: subscriber for %s died (%s); falling back to poll", + pattern, exc, + ) + + +__all__ = ["run_clusterer_loop"] diff --git a/tests/clustering/test_clusterer_factory.py b/tests/clustering/test_clusterer_factory.py new file mode 100644 index 00000000..e5af93ca --- /dev/null +++ b/tests/clustering/test_clusterer_factory.py @@ -0,0 +1,34 @@ +"""Tests for :mod:`decnet.clustering.factory`.""" +from __future__ import annotations + +import pytest + +from decnet.clustering.base import Clusterer +from decnet.clustering.factory import get_clusterer +from decnet.clustering.impl.connected_components import ConnectedComponentsClusterer + + +def test_default_returns_connected_components(monkeypatch): + monkeypatch.delenv("DECNET_CLUSTERER_TYPE", raising=False) + c = get_clusterer() + assert isinstance(c, ConnectedComponentsClusterer) + assert isinstance(c, Clusterer) + assert c.name == "connected_components" + + +def test_explicit_connected_components(monkeypatch): + monkeypatch.setenv("DECNET_CLUSTERER_TYPE", "connected_components") + c = get_clusterer() + assert isinstance(c, ConnectedComponentsClusterer) + + +def test_unknown_clusterer_type_raises(monkeypatch): + monkeypatch.setenv("DECNET_CLUSTERER_TYPE", "nope") + with pytest.raises(ValueError, match="Unknown clusterer"): + get_clusterer() + + +def test_case_insensitive(monkeypatch): + monkeypatch.setenv("DECNET_CLUSTERER_TYPE", " CONNECTED_COMPONENTS ") + c = get_clusterer() + assert isinstance(c, ConnectedComponentsClusterer) diff --git a/tests/clustering/test_clusterer_worker.py b/tests/clustering/test_clusterer_worker.py new file mode 100644 index 00000000..40163213 --- /dev/null +++ b/tests/clustering/test_clusterer_worker.py @@ -0,0 +1,178 @@ +"""End-to-end tests for the clusterer worker shell. + +The skeleton clusterer is a no-op; these tests cover the shell: + +* exits cleanly on shutdown signal (and via cancel) +* invokes ``tick`` on each loop iteration +* publishes :class:`ClusterResult` side-effects on the right topics +* a clusterer raising from ``tick`` is logged and does not crash the loop +""" +from __future__ import annotations + +import asyncio + +import pytest + +from decnet.bus import topics as _topics +from decnet.clustering.base import Clusterer, ClusterResult +from decnet.clustering.impl.connected_components import ConnectedComponentsClusterer +from decnet.clustering.worker import run_clusterer_loop +from decnet.web.db.factory import get_repository + + +@pytest.fixture +async def repo(tmp_path): + r = get_repository(db_path=str(tmp_path / "clusterer.db")) + await r.initialize() + return r + + +@pytest.fixture(autouse=True) +def _no_bus(monkeypatch): + """Run workers in poll-only mode — no real Unix socket.""" + monkeypatch.setenv("DECNET_BUS_ENABLED", "false") + + +class _FakeClusterer(Clusterer): + """Test double: returns canned :class:`ClusterResult` per call.""" + + name = "fake" + + def __init__(self, results: list[ClusterResult] | None = None) -> None: + self._results = list(results or []) + self.calls = 0 + + async def tick(self, repo) -> ClusterResult: + self.calls += 1 + if self._results: + return self._results.pop(0) + return ClusterResult() + + +class _RaisingClusterer(Clusterer): + name = "raising" + + def __init__(self) -> None: + self.calls = 0 + + async def tick(self, repo) -> ClusterResult: + self.calls += 1 + raise RuntimeError("boom") + + +@pytest.mark.anyio +async def test_loop_exits_on_shutdown_signal(repo): + shutdown = asyncio.Event() + clusterer = _FakeClusterer() + task = asyncio.create_task( + run_clusterer_loop( + repo, + poll_interval_secs=0.05, + clusterer=clusterer, + shutdown=shutdown, + ) + ) + await asyncio.sleep(0.12) + shutdown.set() + await asyncio.wait_for(task, timeout=2.0) + assert clusterer.calls >= 1 + + +@pytest.mark.anyio +async def test_loop_exits_on_cancel(repo): + clusterer = _FakeClusterer() + task = asyncio.create_task( + run_clusterer_loop( + repo, + poll_interval_secs=0.05, + clusterer=clusterer, + ) + ) + await asyncio.sleep(0.1) + task.cancel() + # The loop catches CancelledError and exits cleanly, mirroring the + # intel + reuse worker shells. + await asyncio.wait_for(task, timeout=2.0) + assert clusterer.calls >= 1 + + +@pytest.mark.anyio +async def test_tick_failure_does_not_crash_loop(repo): + """A clusterer raising from tick must be logged, not propagated.""" + shutdown = asyncio.Event() + clusterer = _RaisingClusterer() + task = asyncio.create_task( + run_clusterer_loop( + repo, + poll_interval_secs=0.05, + clusterer=clusterer, + shutdown=shutdown, + ) + ) + await asyncio.sleep(0.2) + shutdown.set() + await asyncio.wait_for(task, timeout=2.0) + # Loop kept ticking despite the raise. + assert clusterer.calls >= 2 + + +@pytest.mark.anyio +async def test_skeleton_clusterer_returns_empty_result(repo): + """The connected-components skeleton produces no side-effects yet.""" + c = ConnectedComponentsClusterer() + result = await c.tick(repo) + assert result.identities_formed == [] + assert result.observations_linked == [] + assert result.identities_merged == [] + assert result.identities_unmerged == [] + + +@pytest.mark.anyio +async def test_publishes_cluster_result_on_bus(monkeypatch, repo): + """Every entry in ClusterResult fans out to the correct topic.""" + published: list[tuple[str, dict, str]] = [] + + async def _fake_publish(bus, topic, payload, event_type=""): + published.append((topic, payload, event_type)) + + monkeypatch.setattr( + "decnet.clustering.worker.publish_safely", _fake_publish, + ) + + result = ClusterResult( + identities_formed=[ + {"identity_uuid": "id-1", "observation_uuids": ["obs-1", "obs-2"]}, + ], + observations_linked=[ + {"identity_uuid": "id-1", "observation_uuid": "obs-3"}, + ], + identities_merged=[ + {"winner_uuid": "id-1", "loser_uuid": "id-2"}, + ], + ) + clusterer = _FakeClusterer(results=[result]) + + shutdown = asyncio.Event() + task = asyncio.create_task( + run_clusterer_loop( + repo, + poll_interval_secs=0.05, + clusterer=clusterer, + shutdown=shutdown, + ) + ) + await asyncio.sleep(0.1) + shutdown.set() + await asyncio.wait_for(task, timeout=2.0) + + topics_seen = {t for t, _, _ in published} + assert _topics.identity(_topics.IDENTITY_FORMED) in topics_seen + assert _topics.identity(_topics.IDENTITY_OBSERVATION_LINKED) in topics_seen + assert _topics.identity(_topics.IDENTITY_MERGED) in topics_seen + + +@pytest.mark.anyio +async def test_clusterer_registered_in_cli(): + """`decnet clusterer` is registered as a master-only command.""" + from decnet.cli.gating import MASTER_ONLY_COMMANDS + assert "clusterer" in MASTER_ONLY_COMMANDS