809 lines
32 KiB
Python
809 lines
32 KiB
Python
"""Tests for the connected-components clusterer (commit 4 — high-weight edges).
|
|
|
|
Covers, in order:
|
|
|
|
* The pure ``cluster_observations`` algorithm — singletons stay
|
|
isolated, exact-match high-weight signals fold them together,
|
|
un-fingerprinted observations stay un-mergeable.
|
|
* The production-row adapter ``from_attacker_row`` — JA3 / HASSH
|
|
recovered from the fingerprints JSON; absent fields project to
|
|
``None``.
|
|
* End-to-end ``tick`` against a real SQLite repo: seeded attackers
|
|
with shared / divergent fingerprints get the right identity rows
|
|
written and the right ``identity_id`` links set.
|
|
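* Fixture-bound assertions (pure / in-memory): lone_wolf and
  shared_wordlist (no shared high-weight signal — singletons),
  vpn_hopping at identity-level (one identity from 5 rotated IPs
  via shared JA3 + HASSH), paused_campaign and slow_burn (one
  campaign folded together across silent windows), multi_operator
  (fingerprint disagreement keeps two operators apart), and
  noise_floor (noise scanners stay isolated while real campaigns
  still resolve).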
|
|
|
|
The tick is bus-free here — the worker shell tests cover bus fan-out
|
|
separately. We're validating the algorithm + DB writes here.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from decnet.clustering.impl.connected_components import (
|
|
ConnectedComponentsClusterer,
|
|
cluster_observations,
|
|
from_attacker_row,
|
|
)
|
|
from decnet.clustering.impl.similarity import Observation, from_synthetic
|
|
from decnet.web.db.factory import get_repository
|
|
|
|
# Campaign fixture YAMLs live two directory levels up from this test file.
FIXTURE_DIR = Path(__file__).parent.parent.joinpath("fixtures", "campaigns")
|
|
|
|
|
|
# ─── pure algorithm ─────────────────────────────────────────────────────────
|
|
|
|
|
|
def _obs(obs_id: str, **kwargs) -> Observation:
    """Shorthand: an ``Observation`` keyed by *obs_id* plus any signal kwargs."""
    observation = Observation(observation_id=obs_id, **kwargs)
    return observation
|
|
|
|
|
|
def test_cluster_observations_singletons_stay_isolated():
    """Distinct JA3s and an un-fingerprinted row all land in separate clusters."""
    observations = [
        _obs("a", ja3="ja3-a"),
        _obs("b", ja3="ja3-b"),
        _obs("c"),  # no fingerprint
    ]
    labels = cluster_observations(observations)
    # Pairwise-distinct labels is the same as three distinct label values.
    assert len({labels["a"], labels["b"], labels["c"]}) == 3
|
|
|
|
|
|
def test_cluster_observations_ja3_match_unions():
    """A shared JA3 is a high-weight edge: a and b merge, c stays out."""
    labels = cluster_observations([
        _obs("a", ja3="ja3-shared"),
        _obs("b", ja3="ja3-shared"),
        _obs("c", ja3="ja3-other"),
    ])
    shared_label = labels["a"]
    assert labels["b"] == shared_label
    assert labels["c"] != shared_label
|
|
|
|
|
|
def test_cluster_observations_unfingerprinted_stay_separate():
    """Observations carrying no signals must never share a cluster —
    otherwise every noise scanner would fuse into one blob."""
    labels = cluster_observations([_obs("a"), _obs("b")])
    assert labels["a"] != labels["b"]
|
|
|
|
|
|
def test_cluster_observations_transitive_via_payload():
    """Edges compose: a–b share a JA3 and b–c share a payload hash, so
    all three end up in a single connected component."""
    shared_payload = frozenset({"pl-1"})
    labels = cluster_observations([
        _obs("a", ja3="ja3-x"),
        _obs("b", ja3="ja3-x", payload_hashes=shared_payload),
        _obs("c", payload_hashes=shared_payload),
    ])
    assert len({labels[oid] for oid in ("a", "b", "c")}) == 1
|
|
|
|
|
|
def test_cluster_observations_empty_input():
    """No observations in → an empty label mapping out."""
    labels = cluster_observations([])
    assert labels == {}
|
|
|
|
|
|
def test_cluster_observations_deterministic():
    """Repeated runs over identical input yield identical labels —
    fixture expectations depend on this stability."""
    corpus = [_obs("a", ja3="x"), _obs("b", ja3="x"), _obs("c")]
    first_run = cluster_observations(corpus)
    second_run = cluster_observations(corpus)
    assert first_run == second_run
|
|
|
|
|
|
# ─── production-row adapter ────────────────────────────────────────────────
|
|
|
|
|
|
def test_from_attacker_row_extracts_ja3_and_hassh():
    """JA3 and HASSH are lifted out of the fingerprints JSON; uuid and ASN
    map straight across. A JARM entry is present but unused in v1."""
    fingerprint_entries = [
        {"kind": "ja3", "hash": "ja3-abc"},
        {"kind": "hassh", "hash": "hassh-def"},
        {"kind": "jarm", "hash": "jarm-ghi"},  # not used in v1
    ]
    row = {
        "uuid": "att-1",
        "asn": 64500,
        "identity_id": None,
        "fingerprints": json.dumps(fingerprint_entries),
    }

    obs = from_attacker_row(row)

    assert (obs.observation_id, obs.ja3, obs.hassh, obs.asn) == (
        "att-1", "ja3-abc", "hassh-def", 64500,
    )
|
|
|
|
|
|
def test_from_attacker_row_handles_empty_fingerprints():
    """An empty fingerprint list (and a null ASN) project every signal to None."""
    obs = from_attacker_row(
        {"uuid": "att-2", "asn": None, "identity_id": None, "fingerprints": "[]"}
    )
    for projected in (obs.ja3, obs.hassh, obs.asn):
        assert projected is None
|
|
|
|
|
|
def test_from_attacker_row_handles_malformed_json():
    """Unparseable fingerprints JSON degrades to 'no JA3 / no HASSH'."""
    row = {"uuid": "att-3", "asn": None, "identity_id": None, "fingerprints": "not json"}
    obs = from_attacker_row(row)
    assert obs.ja3 is None
    assert obs.hassh is None
|
|
|
|
|
|
# ─── end-to-end tick against SQLite ────────────────────────────────────────
|
|
|
|
|
|
@pytest.fixture
async def repo(tmp_path):
    """A freshly initialized repository backed by a per-test database file."""
    db_file = tmp_path / "clusterer.db"
    repository = get_repository(db_path=str(db_file))
    await repository.initialize()
    return repository
|
|
|
|
|
|
async def _seed_attacker(
    repo, ip: str, *,
    ja3: str | None = None,
    hassh: str | None = None,
    asn: int | None = None,
    cert_sha256: str | None = None,
) -> str:
    """Upsert one attacker row for *ip* and return the repo's uuid for it.

    The fingerprints JSON deliberately mixes two entry shapes in one list,
    so a single seed exercises both clustering and the identity rollup:

    * ``{"kind": ..., "hash": ...}`` entries, the shape read by the
      clusterer's ``from_attacker_row`` adapter.
    * ``{"bounty_type": "fingerprint", "payload": {...}}`` entries, the
      production shape the profiler writes from real bounty rows, read by
      ``identity_rollup``'s ``extract_fp_summaries``.

    Falsy fingerprint arguments are skipped entirely.
    """
    timestamp = datetime.now(timezone.utc)
    entries: list[dict] = []

    if ja3:
        entries += [
            {"kind": "ja3", "hash": ja3},
            {
                "bounty_type": "fingerprint",
                "payload": {"fingerprint_type": "ja3", "ja3": ja3},
            },
        ]
    if hassh:
        entries += [
            {"kind": "hassh", "hash": hassh},
            {
                "bounty_type": "fingerprint",
                "payload": {"fingerprint_type": "hassh_server", "hash": hassh},
            },
        ]
    if cert_sha256:
        # Certificates are only seeded in the bounty (rollup) shape.
        entries.append({
            "bounty_type": "fingerprint",
            "payload": {
                "fingerprint_type": "tls_certificate",
                "cert_sha256": cert_sha256,
            },
        })

    record = {
        "ip": ip,
        "first_seen": timestamp,
        "last_seen": timestamp,
        "event_count": 1,
        "asn": asn,
        "fingerprints": json.dumps(entries),
    }
    return await repo.upsert_attacker(record)
|
|
|
|
|
|
@pytest.mark.anyio
async def test_tick_on_empty_db_is_noop(repo):
    """A tick over a repo with no attackers reports no work done."""
    outcome = await ConnectedComponentsClusterer().tick(repo)
    assert outcome.identities_formed == []
    assert outcome.observations_linked == []
|
|
|
|
|
|
@pytest.mark.anyio
async def test_tick_clusters_shared_ja3(repo):
    """A JA3 shared by two attackers yields one identity row that both link to."""
    seeded = {
        await _seed_attacker(repo, "1.1.1.1", ja3="ja3-x", asn=64500),
        await _seed_attacker(repo, "2.2.2.2", ja3="ja3-x", asn=64501),
    }

    result = await ConnectedComponentsClusterer().tick(repo)

    assert len(result.identities_formed) == 1
    (formed,) = result.identities_formed
    assert set(formed["observation_uuids"]) == seeded

    # The identity row must exist, and both attackers must point at it.
    identity_uuid = formed["identity_uuid"]
    identity = await repo.get_identity_by_uuid(identity_uuid)
    assert identity is not None
    assert identity["uuid"] == identity_uuid

    members = await repo.list_observations_for_identity(identity_uuid)
    assert {member["uuid"] for member in members} == seeded
|
|
|
|
|
|
@pytest.mark.anyio
async def test_tick_keeps_distinct_ja3_separate(repo):
    """Two divergent JA3s with no other shared signal → two singleton
    components. Each singleton still gets an identity row of its own
    (one observation per identity); the two are never folded together."""
    await _seed_attacker(repo, "1.1.1.1", ja3="ja3-a")
    await _seed_attacker(repo, "2.2.2.2", ja3="ja3-b")

    c = ConnectedComponentsClusterer()
    result = await c.tick(repo)

    # Singletons get identity rows of their own (one observation per cluster).
    assert len(result.identities_formed) == 2
    for formed in result.identities_formed:
        assert len(formed["observation_uuids"]) == 1
|
|
|
|
|
|
@pytest.mark.anyio
async def test_tick_merges_two_identities_when_component_spans_them(repo):
    """Two pre-existing identities whose observations now cluster
    together (e.g. a previously-missing fingerprint shows up) get
    soft-merged: the smaller-uuid identity wins, the loser's
    merged_into_uuid is set, observations stay FK'd to their
    original identity row."""
    # Tick 1: two distinct fingerprints → two distinct identities.
    a = await _seed_attacker(repo, "1.1.1.1", ja3="ja3-A")
    b = await _seed_attacker(repo, "2.2.2.2", ja3="ja3-B")

    c = ConnectedComponentsClusterer()
    first = await c.tick(repo)
    assert len(first.identities_formed) == 2

    # Snapshot the two identity uuids; we need them after the merge.
    identities_after_first = await repo.list_all_identities()
    assert len(identities_after_first) == 2
    expected_winner, expected_loser = sorted(
        i["uuid"] for i in identities_after_first
    )

    # Tick 2: bridge the two sides. A new attacker shares A's JA3 and
    # carries HASSH "hassh-bridge"; B's stored fingerprints are then
    # rewritten to carry the same HASSH. That makes A — bridge — B one
    # connected component spanning both prior identities.
    await _seed_attacker(repo, "3.3.3.3", ja3="ja3-A", hassh="hassh-bridge")
    now = datetime.now(timezone.utc)
    await repo.upsert_attacker({
        "ip": "2.2.2.2", "first_seen": now, "last_seen": now,
        "event_count": 1,
        "fingerprints": json.dumps([
            {"kind": "ja3", "hash": "ja3-B"},
            {"kind": "hassh", "hash": "hassh-bridge"},
        ]),
    })

    second = await c.tick(repo)
    assert len(second.identities_merged) == 1
    merge = second.identities_merged[0]
    assert merge["winner_uuid"] == expected_winner
    assert merge["loser_uuid"] == expected_loser

    # The loser's row still exists with merged_into_uuid set.
    all_after = {i["uuid"]: i for i in await repo.list_all_identities()}
    assert all_after[expected_loser]["merged_into_uuid"] == expected_winner
    assert all_after[expected_winner]["merged_into_uuid"] is None

    # Observations stay FK'd to one of the two original identity rows —
    # the merge is a soft pointer, NOT a re-point onto a fresh identity.
    a_row = await repo.get_attacker_by_uuid(a)
    b_row = await repo.get_attacker_by_uuid(b)
    assert a_row["identity_id"] in {expected_winner, expected_loser}
    assert b_row["identity_id"] in {expected_winner, expected_loser}
|
|
|
|
|
|
@pytest.mark.anyio
async def test_tick_unmerges_when_observations_diverge(repo):
    """Pre-seed a soft-merged pair, then change the underlying
    observations so they no longer cluster. The tick must clear
    merged_into_uuid and emit identities_unmerged."""
    now = datetime.now(timezone.utc)

    # Two attackers with the same JA3 → the first tick folds them into
    # one identity via the shared high-tier signal.
    a = await _seed_attacker(repo, "1.1.1.1", ja3="ja3-shared")
    b = await _seed_attacker(repo, "2.2.2.2", ja3="ja3-shared")
    c = ConnectedComponentsClusterer()
    first = await c.tick(repo)
    assert len(first.identities_formed) == 1
    one_identity_uuid = first.identities_formed[0]["identity_uuid"]

    # Force a soft-merge state: split observation b out into its own
    # identity, then merge that back into the first via the repo
    # directly. This emulates a state the clusterer would have
    # arrived at across multiple ticks (form, then merge).
    second_uuid = "00000000-0000-0000-0000-00000000bbbb"
    await repo.create_attacker_identity({
        "uuid": second_uuid,
        "schema_version": 1,
        "first_seen_at": now, "last_seen_at": now,
        "created_at": now, "updated_at": now,
        "observation_count": 1,
    })
    await repo.set_attacker_identity_id(b, second_uuid)

    # Soft-merge by the min-uuid rule: the larger uuid points at the smaller.
    winner = min(one_identity_uuid, second_uuid)
    loser = max(one_identity_uuid, second_uuid)
    if loser == one_identity_uuid:
        # Keep the setup consistent with "loser → winner": a sits on the
        # smaller-uuid winner, b on the loser.
        await repo.set_attacker_identity_id(a, winner)
        await repo.set_attacker_identity_id(b, loser)
    await repo.update_identity_merged_into(loser, winner)

    # Verify the soft-merge is in place.
    pre = {i["uuid"]: i for i in await repo.list_all_identities()}
    assert pre[loser]["merged_into_uuid"] == winner

    # Now change the underlying fingerprints so a and b no longer cluster.
    await repo.upsert_attacker({
        "ip": "2.2.2.2", "first_seen": now, "last_seen": now,
        "event_count": 1,
        "fingerprints": json.dumps([{"kind": "ja3", "hash": "ja3-different"}]),
    })

    # The next tick should detect the divergence and revoke the merge.
    result = await c.tick(repo)
    assert len(result.identities_unmerged) == 1
    unmerged = result.identities_unmerged[0]
    assert unmerged["resurrected_uuid"] == loser
    assert unmerged["former_winner_uuid"] == winner

    post = {i["uuid"]: i for i in await repo.list_all_identities()}
    assert post[loser]["merged_into_uuid"] is None
    assert post[winner]["merged_into_uuid"] is None
|
|
|
|
|
|
@pytest.mark.anyio
async def test_tick_is_idempotent_under_no_changes(repo):
    """A second tick with no intervening state change must report no
    formations, links, merges or unmerges."""
    seeds = (("1.1.1.1", "ja3-x"), ("2.2.2.2", "ja3-x"), ("3.3.3.3", "ja3-y"))
    for ip, fingerprint in seeds:
        await _seed_attacker(repo, ip, ja3=fingerprint)

    clusterer = ConnectedComponentsClusterer()
    first = await clusterer.tick(repo)
    second = await clusterer.tick(repo)

    quiet_buckets = (
        second.identities_formed,
        second.observations_linked,
        second.identities_merged,
        second.identities_unmerged,
    )
    for bucket in quiet_buckets:
        assert bucket == []
    # Sanity: the first tick really did work, so the silence above means something.
    assert first.identities_formed
|
|
|
|
|
|
@pytest.mark.anyio
async def test_tick_links_new_observation_to_existing_identity(repo):
    """First tick: 2 attackers cluster into one identity. Second tick:
    a new attacker with the same JA3 should get linked, not minted."""
    await _seed_attacker(repo, "1.1.1.1", ja3="ja3-x")
    await _seed_attacker(repo, "2.2.2.2", ja3="ja3-x")

    c = ConnectedComponentsClusterer()
    first = await c.tick(repo)
    assert len(first.identities_formed) == 1
    identity_uuid = first.identities_formed[0]["identity_uuid"]

    # New observation arrives; same JA3.
    d = await _seed_attacker(repo, "3.3.3.3", ja3="ja3-x")

    second = await c.tick(repo)
    # No new identity should be formed for the existing component;
    # observation-linked should fire for the new one.
    formed_uuids = {f["identity_uuid"] for f in second.identities_formed}
    assert identity_uuid not in formed_uuids, (
        "second tick must link to the existing identity, not mint a new one"
    )
    linked_uuids = {link["observation_uuid"] for link in second.observations_linked}
    assert d in linked_uuids

    # The link must land on the pre-existing identity row, not elsewhere.
    d_row = await repo.get_attacker_by_uuid(d)
    assert d_row["identity_id"] == identity_uuid
|
|
|
|
|
|
# ─── identity fingerprint rollup ───────────────────────────────────────────
|
|
|
|
|
|
@pytest.mark.anyio
async def test_tick_rolls_up_fingerprint_columns_on_create(repo):
    """Minting an identity for a fresh component fills ja3_hashes,
    hassh_hashes and tls_cert_sha256 with the de-duplicated, sorted
    union of every member observation's fingerprints."""
    for ip, cert in (("1.1.1.1", "ab" * 32), ("2.2.2.2", "cd" * 32)):
        await _seed_attacker(
            repo, ip, ja3="ja3-x", hassh="hassh-y", cert_sha256=cert,
        )

    result = await ConnectedComponentsClusterer().tick(repo)
    assert len(result.identities_formed) == 1
    identity_uuid = result.identities_formed[0]["identity_uuid"]

    by_uuid = {row["uuid"]: row for row in await repo.list_all_identities()}
    identity = by_uuid[identity_uuid]
    expected_columns = {
        "ja3_hashes": ["ja3-x"],
        "hassh_hashes": ["hassh-y"],
        "tls_cert_sha256": sorted(["ab" * 32, "cd" * 32]),
    }
    for column, wanted in expected_columns.items():
        assert json.loads(identity[column]) == wanted
|
|
|
|
|
|
@pytest.mark.anyio
async def test_tick_rolls_up_fingerprints_on_link(repo):
    """When a new observation links into an existing identity, the
    rollup must reflect any new cert SHA-256 it brings."""
    await _seed_attacker(
        repo, "1.1.1.1", ja3="ja3-x", cert_sha256="ab" * 32,
    )
    c = ConnectedComponentsClusterer()
    first = await c.tick(repo)
    # Fail with a clear assertion, not an IndexError, if the seed tick
    # minted nothing (or more than the single expected identity).
    assert len(first.identities_formed) == 1
    identity_uuid = first.identities_formed[0]["identity_uuid"]

    # New observation, same JA3, fresh cert.
    await _seed_attacker(
        repo, "2.2.2.2", ja3="ja3-x", cert_sha256="cd" * 32,
    )
    await c.tick(repo)

    rows = {i["uuid"]: i for i in await repo.list_all_identities()}
    identity = rows[identity_uuid]
    assert json.loads(identity["tls_cert_sha256"]) == sorted(["ab" * 32, "cd" * 32])
|
|
|
|
|
|
@pytest.mark.anyio
async def test_tick_leaves_columns_null_when_no_fingerprints(repo):
    """Two attackers with NO fingerprint signal cluster as separate
    singletons; their identity rows must keep all rollup columns NULL
    (not "[]" — NULL distinguishes 'no signal yet' from 'known empty')."""
    await _seed_attacker(repo, "1.1.1.1")
    await _seed_attacker(repo, "2.2.2.2")
    c = ConnectedComponentsClusterer()
    await c.tick(repo)

    identities = await repo.list_all_identities()
    # Guard against a vacuous pass: with no identity rows the loop below
    # would assert nothing at all.
    assert len(identities) == 2
    for identity in identities:
        assert identity["ja3_hashes"] is None
        assert identity["hassh_hashes"] is None
        assert identity["tls_cert_sha256"] is None
|
|
|
|
|
|
# ─── fixture-bound assertions (in-memory) ──────────────────────────────────
|
|
|
|
|
|
def _production_clusterer_predict(corpus) -> dict[str, str]:
    """Predict cluster labels for *corpus* with the production algorithm.

    Has the same ``corpus -> {observation_id: cluster_id}`` shape as the
    reference clusterers, so it plugs straight into ``assert_fixture_bounds``.
    Purely in-memory; the DB-backed path is exercised by the tick tests.

    Components with a single member are relabelled ``cc-singleton-<id>``
    so the metrics treat every singleton as its own class, matching the
    fingerprint_clusterer reference shape on lone_wolf / shared_wordlist.
    """
    observations = [from_synthetic(att) for att in corpus.attackers]
    labels = cluster_observations(observations)

    # Group observation ids by component to learn each component's size.
    members: dict[str, list[str]] = {}
    for obs_id, cid in labels.items():
        members.setdefault(cid, []).append(obs_id)

    return {
        obs_id: cid if len(members[cid]) > 1 else f"cc-singleton-{obs_id}"
        for obs_id, cid in labels.items()
    }
|
|
|
|
|
|
def test_lone_wolf_passes_with_production_clusterer():
    """Fixture 3 (lone_wolf): every actor is a singleton, and with no
    shared high-weight signal the production clusterer keeps them apart."""
    from tests.clustering.fixture_harness import assert_fixture_bounds
    from tests.factories.campaign_factory import generate, load_yaml

    spec = load_yaml(FIXTURE_DIR / "lone_wolf.yaml")
    expected_path = FIXTURE_DIR / "lone_wolf.expected.yaml"
    assert_fixture_bounds(
        generate(spec, seed=0), _production_clusterer_predict, expected_path,
    )
|
|
|
|
|
|
def test_shared_wordlist_passes_with_production_clusterer():
    """Fixture 1 (shared_wordlist): two campaigns overlap only on
    credentials and run on divergent infra. Credential overlap is not a
    v1 signal, so the high-weight-only production clusterer keeps them apart."""
    from tests.clustering.fixture_harness import assert_fixture_bounds
    from tests.factories.campaign_factory import generate, load_yaml

    spec = load_yaml(FIXTURE_DIR / "shared_wordlist.yaml")
    expected_path = FIXTURE_DIR / "shared_wordlist.expected.yaml"
    assert_fixture_bounds(
        generate(spec, seed=0), _production_clusterer_predict, expected_path,
    )
|
|
|
|
|
|
def test_paused_campaign_passes_with_production_clusterer():
    """Fixture 4 (paused_campaign): one campaign split into two operational
    windows by a multi-day silence. Both halves share JA3 + HASSH +
    payload + C2, so the production clusterer must fold them into one
    identity. The silence is irrelevant: clustering is time-agnostic."""
    from tests.clustering.fixture_harness import assert_fixture_bounds
    from tests.factories.campaign_factory import generate, load_yaml

    spec = load_yaml(FIXTURE_DIR / "paused_campaign.yaml")
    expected_path = FIXTURE_DIR / "paused_campaign.expected.yaml"
    assert_fixture_bounds(
        generate(spec, seed=0), _production_clusterer_predict, expected_path,
    )
|
|
|
|
|
|
def test_multi_operator_keeps_distinct_identities_with_production_clusterer():
    """Fixture 5 (multi_operator) at identity level: two operators share
    C2 + payload but have distinct JA3 + HASSH. The production clusterer's
    fingerprint-disagreement veto must keep them as two identities."""
    from tests.factories.campaign_factory import generate, load_yaml
    from tests.clustering.metrics import score

    corpus = generate(load_yaml(FIXTURE_DIR / "multi_operator.yaml"), seed=0)
    pred = _production_clusterer_predict(corpus)

    # Two truth identities → exactly two predicted clusters; a merge across
    # the fingerprint-disagreeing operators would collapse them to one.
    predicted_clusters = set(pred.values())
    assert len(predicted_clusters) == 2

    metrics = score(corpus.truth_labels(level="identity"), pred)
    # Perfect identity-level recovery on both measures.
    for metric_name in ("adjusted_rand_index", "homogeneity"):
        assert metrics[metric_name] == pytest.approx(1.0)
|
|
|
|
|
|
def test_cluster_observations_credentials_alone_does_not_fuse():
    """Fixture 1's failure mode in miniature: an identical credential set
    with no other shared signal must not put two observations together."""
    pair = [
        Observation(
            observation_id=oid,
            credentials=frozenset({("root", "toor"), ("admin", "admin")}),
        )
        for oid in ("a", "b")
    ]
    labels = cluster_observations(pair)
    assert labels["a"] != labels["b"]
|
|
|
|
|
|
def test_cluster_observations_asn_alone_does_not_fuse():
    """Fixture 2's failure mode in miniature: a shared ASN alone must not
    fuse observations. VPN/proxy hopping fragments ASN within one
    identity, and unrelated identities often share an ASN, so it cannot
    drive clustering."""
    labels = cluster_observations(
        [Observation(observation_id=oid, asn=64500) for oid in ("a", "b")]
    )
    assert labels["a"] != labels["b"]
|
|
|
|
|
|
def test_cluster_observations_all_weak_signals_combined_does_not_fuse():
    """Stacking every weak signal an F1+F2 hybrid campaign would show
    (credentials + commands + ASN) still must not cluster two
    observations. Only a high-tier signal may drive a merge."""

    def _weak_only(oid: str) -> Observation:
        # Fresh objects per call, so the two observations share values
        # but never the same container instances.
        return Observation(
            observation_id=oid,
            asn=64500,
            credentials=frozenset({("root", "toor"), ("admin", "admin")}),
            commands_by_phase={"discovery": ("ls", "id")},
        )

    labels = cluster_observations([_weak_only("a"), _weak_only("b")])
    assert labels["a"] != labels["b"]
|
|
|
|
|
|
def test_shared_wordlist_no_false_merge_at_identity_level():
    """F1 ratchet at identity level: every row is its own truth identity,
    so the production clusterer must put every row in its own predicted
    cluster. Any co-clustering is a false merge driven by credential
    overlap.

    Tightens the F1 bound by asserting homogeneity == 1.0 at
    identity-level scoring (no predicted cluster mixes truth
    identities). Completeness is not asserted: with singleton truth
    identities no truth identity can be split, so it is trivially 1.0.
    """
    from tests.factories.campaign_factory import generate, load_yaml
    from tests.clustering.metrics import score

    corpus = generate(load_yaml(FIXTURE_DIR / "shared_wordlist.yaml"), seed=0)
    pred = _production_clusterer_predict(corpus)
    metrics = score(corpus.truth_labels(level="identity"), pred)
    # Each row must land in its own predicted cluster — anything else
    # is a false merge driven by the credential-overlap signal.
    assert len(set(pred.values())) == len(corpus.attackers)
    assert metrics["homogeneity"] == pytest.approx(1.0)
|
|
|
|
|
|
def test_vpn_hopping_asn_alone_would_have_fragmented_but_doesnt():
    """F2 ratchet: vpn_hopping spreads one identity over 5 distinct ASNs.
    A clusterer that let ASN drive would split it into 5. The production
    clusterer treats ASN as very-low-tier and leans on the stable
    JA3 / HASSH instead, so tier discipline holds end-to-end."""
    from tests.factories.campaign_factory import generate, load_yaml

    corpus = generate(load_yaml(FIXTURE_DIR / "vpn_hopping.yaml"), seed=0)
    pred = _production_clusterer_predict(corpus)

    distinct_asns = {attacker.asn for attacker in corpus.attackers}
    assert len(distinct_asns) == 5, "fixture sanity: 5 distinct ASNs"
    # One cluster across all five rotations, not five.
    assert len(set(pred.values())) == 1
|
|
|
|
|
|
def test_cluster_observations_medium_alone_does_not_fuse():
    """A shared command sequence is a medium-tier, supporting signal only.
    On its own it must leave the two observations in distinct clusters."""
    pair = [
        Observation(
            observation_id=oid,
            commands_by_phase={"discovery": ("ls", "id", "uname")},
        )
        for oid in ("a", "b")
    ]
    labels = cluster_observations(pair)
    assert labels["a"] != labels["b"]
|
|
|
|
|
|
def _build_noise_floor_corpus():
    """Generate the noise_floor corpus by inlining the fixtures it includes.

    Campaign lists from every ``include_fixtures`` entry are concatenated.
    Fixtures that have a ``corpus`` block also contribute their noise
    ``scanner_count``; that total is added to the top-level
    ``extra_noise_scanners``. The combined spec is generated with seed 0.
    """
    import yaml as _yaml
    from typing import Any
    from tests.factories.campaign_factory import generate, load_yaml

    raw_spec = (FIXTURE_DIR / "noise_floor.yaml").read_text(encoding="utf-8")
    declared = _yaml.safe_load(raw_spec)

    campaigns: list[dict[str, Any]] = []
    scanner_total = 0
    for fixture_name in declared["include_fixtures"]:
        sub = load_yaml(FIXTURE_DIR / fixture_name)
        if "corpus" not in sub:
            # No corpus block: wrap the fixture's top-level campaign.
            campaigns.append({"campaign": sub["campaign"]})
            continue
        corpus_block = sub["corpus"]
        campaigns.extend(corpus_block.get("campaigns", []))
        noise_block = corpus_block.get("noise") or {}
        scanner_total += int(noise_block.get("scanner_count", 0))

    scanner_total += int(declared.get("extra_noise_scanners", 0))
    spec = {
        "corpus": {
            "campaigns": campaigns,
            "noise": {"scanner_count": scanner_total},
        },
    }
    return generate(spec, seed=0)
|
|
|
|
|
|
def test_noise_floor_singleton_recall_holds_with_production_clusterer():
    """Fixture 6 ratchet — noise floor isolation.

    The load-bearing F6 invariant for the *production* clusterer:
    truth-singleton noise scanners must not be absorbed into real
    campaigns. A clusterer that pulls noise into campaigns dilutes
    attribution to nothing.

    Scored at *campaign* level so the truth-singleton noise scanners
    align with the prediction (each noise row has its own truth
    campaign id). Identity-level scoring is muddier here — see
    ``test_noise_floor_intra_campaign_recovery_with_production_clusterer``
    below for the constituent-campaign test that *is* identity-shaped.
    """
    from tests.clustering.metrics import score

    corpus = _build_noise_floor_corpus()
    pred = _production_clusterer_predict(corpus)
    metrics = score(corpus.truth_labels(level="campaign"), pred)
    assert metrics["singleton_recall"] >= 0.95, metrics
|
|
|
|
|
|
def test_noise_floor_intra_campaign_recovery_with_production_clusterer():
    """The other half of F6: real campaigns must still resolve through the
    noise. vpn_hopping's 5 rotations collapse to one cluster (its
    identity-level signature). shared_wordlist's two campaigns stay apart
    despite sharing wordlists. Together these show tier discipline holds
    under cross-corpus interference, not just per fixture in isolation."""
    corpus = _build_noise_floor_corpus()
    pred = _production_clusterer_predict(corpus)

    def _members(campaign_id: str) -> list[str]:
        # Observation ids whose ground-truth campaign is campaign_id.
        return [
            attacker.attacker_id
            for attacker in corpus.attackers
            if attacker.truth_campaign_id == campaign_id
        ]

    # vpn_hopping: all 5 rotation rows fold into one predicted cluster.
    vpn_obs = _members("vpn-hopping-001")
    assert len(vpn_obs) == 5
    assert len({pred[oid] for oid in vpn_obs}) == 1, (
        "vpn_hopping must consolidate to one cluster across rotations"
    )

    # shared_wordlist A and B have distinct fingerprints, so they must stay
    # in separate clusters despite shared credentials in the noise floor.
    sw_a = _members("shared-wordlist-A")
    sw_b = _members("shared-wordlist-B")
    assert sw_a and sw_b
    clusters_a = {pred[oid] for oid in sw_a}
    clusters_b = {pred[oid] for oid in sw_b}
    assert clusters_a.isdisjoint(clusters_b), (
        "shared_wordlist A and B must not share a cluster"
    )
|
|
|
|
|
|
def test_slow_burn_passes_with_production_clusterer():
    """Fixture 7 (slow_burn): one campaign over 3 operational windows
    separated by multi-week silences, with JA3 + HASSH + C2 shared by all
    3 actors. The production clusterer must fold them into a single
    cluster regardless of the gaps, because clustering is time-agnostic."""
    from tests.clustering.fixture_harness import assert_fixture_bounds
    from tests.factories.campaign_factory import generate, load_yaml

    corpus = generate(load_yaml(FIXTURE_DIR / "slow_burn.yaml"), seed=0)
    expected_path = FIXTURE_DIR / "slow_burn.expected.yaml"
    metrics = assert_fixture_bounds(
        corpus, _production_clusterer_predict, expected_path,
    )

    # The F7 contract: every operational window lands in one cluster.
    predicted = _production_clusterer_predict(corpus)
    assert len(set(predicted.values())) == 1
    assert metrics["completeness"] == pytest.approx(1.0)
|
|
|
|
|
|
def test_slow_burn_time_shift_invariance():
    """Runtime check that clustering ignores time: shifting every
    observation's timestamps by a constant delta must leave the predicted
    groupings unchanged. This complements the static check in
    test_similarity.py that Observation carries no time fields."""
    from datetime import timedelta
    from tests.factories.campaign_factory import generate, load_yaml

    def _groupings(pred: dict[str, str]) -> set[frozenset[str]]:
        # Cluster ids are opaque labels; compare only who co-clusters with whom.
        by_cluster: dict[str, set[str]] = {}
        for observation_id, cluster_id in pred.items():
            by_cluster.setdefault(cluster_id, set()).add(observation_id)
        return {frozenset(members) for members in by_cluster.values()}

    corpus = generate(load_yaml(FIXTURE_DIR / "slow_burn.yaml"), seed=0)
    baseline = _production_clusterer_predict(corpus)

    # Move every timestamp 90 days forward (a multi-month gap), then re-cluster.
    shift = timedelta(days=90)
    for attacker in corpus.attackers:
        attacker.first_seen += shift
        attacker.last_seen += shift
        for session in attacker.sessions:
            session.started_at += shift

    shifted = _production_clusterer_predict(corpus)
    assert _groupings(baseline) == _groupings(shifted)
|
|
|
|
|
|
def test_vpn_hopping_passes_at_identity_level_with_production_clusterer():
    """Fixture 2 (vpn_hopping): one actor rotating through 5 ASNs while
    keeping JA3 + HASSH stable. High-weight fingerprint agreement must
    fold all 5 observations into a single identity."""
    from tests.clustering.fixture_harness import assert_fixture_bounds
    from tests.factories.campaign_factory import generate, load_yaml

    corpus = generate(load_yaml(FIXTURE_DIR / "vpn_hopping.yaml"), seed=0)
    expected_path = FIXTURE_DIR / "vpn_hopping.expected.yaml"
    metrics = assert_fixture_bounds(
        corpus,
        _production_clusterer_predict,
        expected_path,
        truth_level="identity",
    )
    for metric_name in ("adjusted_rand_index", "completeness"):
        assert metrics[metric_name] == pytest.approx(1.0)
|