358 lines
12 KiB
Python
358 lines
12 KiB
Python
"""End-to-end tests for the campaign-clusterer worker shell + tick.
|
|
|
|
Mirrors :mod:`tests.clustering.test_clusterer_worker` for the layer
|
|
above. Covers shell lifecycle (shutdown / cancel / raising tick),
|
|
end-to-end ``tick`` against SQLite (form, link, merge, revoke), bus
|
|
fan-out to the four ``campaign.*`` topics + cross-family
|
|
``identity.campaign.assigned``, factory dispatch, and CLI gating.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
from datetime import datetime, timezone
|
|
|
|
import pytest
|
|
|
|
from decnet.bus import topics as _topics
|
|
from decnet.clustering.campaign.base import (
|
|
CampaignClusterer,
|
|
CampaignClusterResult,
|
|
)
|
|
from decnet.clustering.campaign.factory import get_campaign_clusterer
|
|
from decnet.clustering.campaign.impl.connected_components import (
|
|
ConnectedComponentsCampaignClusterer,
|
|
cluster_identities,
|
|
from_identity_row,
|
|
)
|
|
from decnet.clustering.campaign.impl.similarity import IdentityFeatures
|
|
from decnet.clustering.campaign.worker import run_campaign_clusterer_loop
|
|
from decnet.web.db.factory import get_repository
|
|
|
|
|
|
@pytest.fixture
async def repo(tmp_path):
    """Yield a freshly initialised repository on a throwaway SQLite file.

    The database lives under pytest's per-test ``tmp_path``.
    """
    db_file = tmp_path / "campaign.db"
    repository = get_repository(db_path=str(db_file))
    await repository.initialize()
    return repository
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def _no_bus(monkeypatch):
    """Disable the bus for every test so workers run poll-only (no Unix socket)."""
    env_var, disabled = "DECNET_BUS_ENABLED", "false"
    monkeypatch.setenv(env_var, disabled)
|
|
|
|
|
|
# ─── Test doubles ───────────────────────────────────────────────────────────
|
|
|
|
|
|
class _FakeClusterer(CampaignClusterer):
    """Scripted clusterer that replays queued results, oldest first.

    When the queue is exhausted, each tick returns an empty
    ``CampaignClusterResult()``. ``calls`` counts every ``tick`` invocation.
    """

    name = "fake"

    def __init__(self, results=None) -> None:
        # Take a private copy so popping never mutates the caller's sequence.
        self._results = list(results) if results else []
        self.calls = 0

    async def tick(self, repo) -> CampaignClusterResult:
        self.calls += 1
        if not self._results:
            return CampaignClusterResult()
        return self._results.pop(0)
|
|
|
|
|
|
class _RaisingClusterer(CampaignClusterer):
    """Clusterer whose every tick raises ``RuntimeError("boom")``.

    ``calls`` is bumped before raising, so tests can count attempts.
    """

    name = "raising"

    def __init__(self) -> None:
        self.calls = 0

    async def tick(self, repo) -> CampaignClusterResult:
        self.calls = self.calls + 1
        raise RuntimeError("boom")
|
|
|
|
|
|
# ─── Shell lifecycle ────────────────────────────────────────────────────────
|
|
|
|
|
|
@pytest.mark.anyio
async def test_loop_exits_on_shutdown(repo):
    """Setting the shutdown event ends the loop after it has ticked at least once."""
    fake = _FakeClusterer()
    stop = asyncio.Event()
    loop_task = asyncio.create_task(
        run_campaign_clusterer_loop(
            repo,
            poll_interval_secs=0.05,
            clusterer=fake,
            shutdown=stop,
        )
    )

    # A little over two poll intervals before asking the loop to stop.
    await asyncio.sleep(0.12)
    stop.set()
    await asyncio.wait_for(loop_task, timeout=2.0)

    assert fake.calls >= 1
|
|
|
|
|
|
@pytest.mark.anyio
async def test_loop_exits_on_cancel(repo):
    """Cancelling the loop task ends it cleanly.

    Awaiting a task that re-raised ``CancelledError`` would raise here, so
    as written this test expects the loop to absorb the cancellation and
    return within the timeout.
    """
    fake = _FakeClusterer()
    loop_task = asyncio.create_task(
        run_campaign_clusterer_loop(repo, poll_interval_secs=0.05, clusterer=fake)
    )

    await asyncio.sleep(0.1)
    loop_task.cancel()
    await asyncio.wait_for(loop_task, timeout=2.0)

    assert fake.calls >= 1
|
|
|
|
|
|
@pytest.mark.anyio
async def test_tick_failure_does_not_crash_loop(repo):
    """A raising tick is survived: the loop keeps polling and still honours shutdown."""
    failing = _RaisingClusterer()
    stop = asyncio.Event()
    loop_task = asyncio.create_task(
        run_campaign_clusterer_loop(
            repo,
            poll_interval_secs=0.05,
            clusterer=failing,
            shutdown=stop,
        )
    )

    # Roughly four poll intervals: room for the failed tick to be retried.
    await asyncio.sleep(0.2)
    stop.set()
    await asyncio.wait_for(loop_task, timeout=2.0)

    # More than one attempt means the loop kept going after the first error.
    assert failing.calls >= 2
|
|
|
|
|
|
# ─── Bus fan-out ────────────────────────────────────────────────────────────
|
|
|
|
|
|
@pytest.mark.anyio
async def test_publishes_campaign_result_on_bus(monkeypatch, repo):
    """Every non-empty result bucket reaches its ``campaign.*`` topic.

    Formed and assigned identities also produce the cross-family
    ``identity.campaign.assigned`` signal. ``publish_safely`` is swapped for
    a recorder, so nothing touches a real bus.
    """
    published: list[tuple[str, dict, str]] = []

    async def _record(bus, topic, payload, event_type=""):
        published.append((topic, payload, event_type))

    monkeypatch.setattr(
        "decnet.clustering.campaign.worker.publish_safely", _record,
    )

    formed = [{"campaign_uuid": "c-1", "identity_uuids": ["i-1", "i-2"]}]
    assigned = [
        {"campaign_uuid": "c-1", "identity_uuid": "i-3", "prior_campaign_uuid": None},
    ]
    merged = [{"winner_uuid": "c-1", "loser_uuid": "c-2"}]
    unmerged = [{"resurrected_uuid": "c-2", "former_winner_uuid": "c-1"}]
    scripted = CampaignClusterResult(
        campaigns_formed=formed,
        identities_assigned=assigned,
        campaigns_merged=merged,
        campaigns_unmerged=unmerged,
    )
    fake = _FakeClusterer(results=[scripted])

    stop = asyncio.Event()
    loop_task = asyncio.create_task(
        run_campaign_clusterer_loop(
            repo,
            poll_interval_secs=0.05,
            clusterer=fake,
            shutdown=stop,
        )
    )
    await asyncio.sleep(0.1)
    stop.set()
    await asyncio.wait_for(loop_task, timeout=2.0)

    seen_topics = {topic for topic, _payload, _etype in published}
    for event_kind in (
        _topics.CAMPAIGN_FORMED,
        _topics.CAMPAIGN_IDENTITY_ASSIGNED,
        _topics.CAMPAIGN_MERGED,
        _topics.CAMPAIGN_UNMERGED,
    ):
        assert _topics.campaign(event_kind) in seen_topics

    # Cross-family fan-out: the identities from campaigns_formed and from
    # identities_assigned must all appear on identity.campaign.assigned.
    cross_topic = _topics.identity(_topics.IDENTITY_CAMPAIGN_ASSIGNED)
    signalled = {
        payload["identity_uuid"]
        for topic, payload, _etype in published
        if topic == cross_topic
    }
    assert signalled >= {"i-1", "i-2", "i-3"}
|
|
|
|
|
|
# ─── Pure clusterer + projection ────────────────────────────────────────────
|
|
|
|
|
|
def test_cluster_identities_singletons():
    """Two identities sharing no features receive different cluster labels."""
    labels = cluster_identities([
        IdentityFeatures(identity_uuid="a"),
        IdentityFeatures(identity_uuid="b"),
    ])
    assert labels["a"] != labels["b"]
|
|
|
|
|
|
def test_cluster_identities_phase_handoff_unions():
    """A phase hand-off on one decky unions the two identities.

    Identity ``a`` ends on ``d1`` in command_and_control (t=1000.0), and
    identity ``b`` later starts on ``d1`` in discovery (t=1100.0). The two
    must share a label.
    """
    handing_off = IdentityFeatures(
        identity_uuid="a",
        last_phase_per_decky={"d1": "command_and_control"},
        last_seen_per_decky={"d1": 1000.0},
    )
    picking_up = IdentityFeatures(
        identity_uuid="b",
        first_phase_per_decky={"d1": "discovery"},
        first_seen_per_decky={"d1": 1100.0},
    )

    labels = cluster_identities([handing_off, picking_up])

    assert labels["a"] == labels["b"]
|
|
|
|
|
|
def test_from_identity_row_parses_json_lists():
    """JSON-encoded list columns are decoded into frozensets.

    Note the column ``payload_simhashes`` surfaces as the attribute
    ``payload_hashes``.
    """
    row = {
        "uuid": "i-1",
        "payload_simhashes": json.dumps(["h1", "h2"]),
        "c2_endpoints": json.dumps(["c1"]),
    }

    features = from_identity_row(row)

    assert features.identity_uuid == "i-1"
    assert features.payload_hashes == frozenset({"h1", "h2"})
    assert features.c2_endpoints == frozenset({"c1"})
|
|
|
|
|
|
def test_from_identity_row_handles_null_and_garbage():
    """NULL and non-JSON column values both decode to empty frozensets."""
    features = from_identity_row(
        {"uuid": "i-1", "payload_simhashes": None, "c2_endpoints": "not-json"}
    )

    nothing = frozenset()
    assert features.payload_hashes == nothing
    assert features.c2_endpoints == nothing
|
|
|
|
|
|
# ─── End-to-end tick against SQLite ────────────────────────────────────────
|
|
|
|
|
|
async def _create_identity(repo, uuid: str, **kwargs) -> str:
    """Insert a minimal AttackerIdentity row through ``repo``.

    Both ``first_seen_at`` and ``last_seen_at`` are set to the same
    timezone-aware UTC instant. Only ``payload_simhashes`` and
    ``c2_endpoints`` are read from ``kwargs``; a missing one is stored as
    ``None``, and any other keyword is ignored.

    Returns whatever ``repo.create_attacker_identity`` returns.
    """
    stamp = datetime.now(timezone.utc)
    row = {
        "uuid": uuid,
        "first_seen_at": stamp,
        "last_seen_at": stamp,
    }
    for column in ("payload_simhashes", "c2_endpoints"):
        row[column] = kwargs.get(column)
    return await repo.create_attacker_identity(row)
|
|
|
|
|
|
@pytest.mark.anyio
async def test_tick_empty_db_returns_empty_result(repo):
    """With no identities stored, a tick reports nothing in any bucket."""
    result = await ConnectedComponentsCampaignClusterer().tick(repo)

    for bucket in (
        result.campaigns_formed,
        result.identities_assigned,
        result.campaigns_merged,
        result.campaigns_unmerged,
    ):
        assert bucket == []
|
|
|
|
|
|
@pytest.mark.anyio
async def test_tick_forms_campaign_for_shared_infra_co_op(repo):
    """Two identities with a shared payload hash and C2 fold into one campaign.

    This is the canonical F5-style co-op pattern, run end-to-end through the
    production-row adapter. ``from_identity_row`` reads ``payload_simhashes``
    and ``c2_endpoints`` from the AttackerIdentity JSON columns and builds
    IdentityFeatures. The campaign weight then crosses the threshold on
    shared_infra alone.
    """
    shared_payloads = json.dumps(["h1"])
    shared_c2 = json.dumps(["c1"])
    for ident in ("i1", "i2"):
        await _create_identity(
            repo, ident,
            payload_simhashes=shared_payloads,
            c2_endpoints=shared_c2,
        )

    result = await ConnectedComponentsCampaignClusterer().tick(repo)

    assert len(result.campaigns_formed) == 1
    (only_campaign,) = result.campaigns_formed
    assert set(only_campaign["identity_uuids"]) == {"i1", "i2"}
|
|
|
|
|
|
@pytest.mark.anyio
async def test_tick_keeps_distinct_payloads_separate(repo):
    """No payload/C2 overlap → one singleton campaign per identity.

    Counting campaigns alone would also accept a wrong partition, such as one
    campaign holding both identities plus an empty one. The membership of
    each formed campaign is therefore pinned as well.
    """
    await _create_identity(
        repo, "i1",
        payload_simhashes=json.dumps(["h1"]),
        c2_endpoints=json.dumps(["c1"]),
    )
    await _create_identity(
        repo, "i2",
        payload_simhashes=json.dumps(["h2"]),
        c2_endpoints=json.dumps(["c2"]),
    )

    c = ConnectedComponentsCampaignClusterer()
    result = await c.tick(repo)

    assert len(result.campaigns_formed) == 2
    # Sort inside and across campaigns so the check ignores emission order.
    memberships = sorted(
        sorted(campaign["identity_uuids"])
        for campaign in result.campaigns_formed
    )
    assert memberships == [["i1"], ["i2"]]
|
|
|
|
|
|
@pytest.mark.anyio
async def test_tick_idempotent_links_existing_identity(repo):
    """Re-ticking unchanged input neither re-forms nor re-assigns campaigns."""
    await _create_identity(repo, "i1")
    clusterer = ConnectedComponentsCampaignClusterer()

    first = await clusterer.tick(repo)
    assert len(first.campaigns_formed) == 1
    campaign_uuid = first.campaigns_formed[0]["campaign_uuid"]

    second = await clusterer.tick(repo)
    # i1 is already linked, so the second pass forms and assigns nothing...
    assert second.campaigns_formed == []
    assert second.identities_assigned == []
    # ...and the link made by the first pass is still stored.
    assert await repo.count_identities_for_campaign(campaign_uuid) == 1
|
|
|
|
|
|
@pytest.mark.anyio
async def test_tick_skips_merged_out_identities(repo):
    """Identity rows that were merged out must not be used as clustering inputs."""
    for ident in ("i1", "i2"):
        await _create_identity(repo, ident)
    # Identity-layer soft merge of i2 into i1.
    await repo.update_identity_merged_into("i2", "i1")

    result = await ConnectedComponentsCampaignClusterer().tick(repo)

    # i1 is the only active row: exactly one campaign, containing just i1.
    assert len(result.campaigns_formed) == 1
    assert result.campaigns_formed[0]["identity_uuids"] == ["i1"]
|
|
|
|
|
|
# ─── Factory + CLI gating ────────────────────────────────────────────────────
|
|
|
|
|
|
def test_factory_default(monkeypatch):
    """With no clusterer type selected, the factory builds connected-components.

    ``DECNET_CAMPAIGN_CLUSTERER_TYPE`` is read when the factory is called, as
    ``test_factory_unknown_raises`` relies on. Clear it first so a value
    exported in a developer's or CI shell cannot change this test's outcome.
    """
    monkeypatch.delenv("DECNET_CAMPAIGN_CLUSTERER_TYPE", raising=False)
    c = get_campaign_clusterer()
    assert isinstance(c, ConnectedComponentsCampaignClusterer)
|
|
|
|
|
|
def test_factory_unknown_raises(monkeypatch):
    """An unrecognised clusterer type name makes the factory raise ValueError."""
    selector, bogus_type = "DECNET_CAMPAIGN_CLUSTERER_TYPE", "nope"
    monkeypatch.setenv(selector, bogus_type)

    with pytest.raises(ValueError):
        get_campaign_clusterer()
|
|
|
|
|
|
def test_campaign_clusterer_registered_in_cli():
    """``campaign-clusterer`` is listed among the master-only CLI commands."""
    from decnet.cli import gating

    assert "campaign-clusterer" in gating.MASTER_ONLY_COMMANDS
|
|
|
|
|
|
def test_campaign_topic_builder_round_trips():
    """The topic builders produce the expected dotted topic strings."""
    cases = [
        (_topics.campaign(_topics.CAMPAIGN_FORMED), "campaign.formed"),
        (
            _topics.campaign(_topics.CAMPAIGN_IDENTITY_ASSIGNED),
            "campaign.identity.assigned",
        ),
        (
            _topics.identity(_topics.IDENTITY_CAMPAIGN_ASSIGNED),
            "identity.campaign.assigned",
        ),
    ]
    for built, expected in cases:
        assert built == expected
|