Files
DECNET/tests/clustering/test_similarity.py
anti a9775c4000 feat(clustering): similarity-graph primitives
Adds the four weight-tier edge functions as pure, time-agnostic
scoring primitives over an Observation projection. Each returns a
score in [0, 1]; the connected-components impl will combine + threshold
in subsequent commits.

Tier semantics (from IDENTITY_RESOLUTION.md):
- high   — JA3/HASSH/payload-hash/C2-endpoint exact match
- medium — phase-bucketed command-sequence Jaccard
- low    — credential-attempt-set Jaccard (defeated alone by F1)
- very low — ASN equality (defeated alone by F2)

Time-agnostic invariant is a static test: Observation has no time
fields, so no edge function can silently start using them. Fixture 7
forbids recency-decay clustering on multi-month APT campaigns.

A from_synthetic() adapter projects SyntheticAttacker corpora into
Observation; the production-row adapter lands when the clusterer
starts reading the attackers table.
2026-04-26 08:13:29 -04:00

222 lines
7.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Unit tests for the similarity-graph primitives.
Each edge function is tested in isolation: agreement → high score,
disagreement → zero, missing-data → zero. Combination logic +
thresholds live in the connected-components impl and are covered by
the fixture suite once those land.
"""
from __future__ import annotations
import pytest
from decnet.clustering.impl.similarity import (
Observation,
from_synthetic,
high_weight_edge,
low_weight_edge,
medium_weight_edge,
very_low_weight_edge,
)
def _obs(**kwargs) -> Observation:
"""Build an Observation with sensible defaults for tests."""
kwargs.setdefault("observation_id", "obs-x")
return Observation(**kwargs)
# ─── high_weight_edge ──────────────────────────────────────────────────────
def test_high_weight_ja3_match():
a = _obs(ja3="ja3-stable")
b = _obs(ja3="ja3-stable")
assert high_weight_edge(a, b) == 1.0
def test_high_weight_hassh_match():
a = _obs(hassh="hassh-stable")
b = _obs(hassh="hassh-stable")
assert high_weight_edge(a, b) == 1.0
def test_high_weight_payload_hash_overlap():
a = _obs(payload_hashes=frozenset({"pl-1", "pl-2"}))
b = _obs(payload_hashes=frozenset({"pl-2", "pl-3"}))
assert high_weight_edge(a, b) == 1.0
def test_high_weight_c2_overlap():
a = _obs(c2_endpoints=frozenset({"c2.example.com"}))
b = _obs(c2_endpoints=frozenset({"c2.example.com", "c2-alt.example.com"}))
assert high_weight_edge(a, b) == 1.0
def test_high_weight_no_match():
a = _obs(ja3="ja3-a", hassh="hassh-a", payload_hashes=frozenset({"x"}))
b = _obs(ja3="ja3-b", hassh="hassh-b", payload_hashes=frozenset({"y"}))
assert high_weight_edge(a, b) == 0.0
def test_high_weight_both_null_ja3_does_not_match():
"""Both-null JA3 must not be treated as 'agreement' — that would
fuse every un-fingerprinted noise scanner into one mega-cluster."""
a = _obs(ja3=None, hassh=None)
b = _obs(ja3=None, hassh=None)
assert high_weight_edge(a, b) == 0.0
def test_high_weight_empty_sets_no_match():
a = _obs(payload_hashes=frozenset(), c2_endpoints=frozenset())
b = _obs(payload_hashes=frozenset(), c2_endpoints=frozenset())
assert high_weight_edge(a, b) == 0.0
# ─── medium_weight_edge ────────────────────────────────────────────────────
def test_medium_weight_jaccard_full_match_in_one_phase():
a = _obs(commands_by_phase={"discovery": ("ls", "id", "uname -a")})
b = _obs(commands_by_phase={"discovery": ("ls", "id", "uname -a")})
assert medium_weight_edge(a, b) == pytest.approx(1.0)
def test_medium_weight_jaccard_partial_match():
a = _obs(commands_by_phase={"discovery": ("ls", "id", "uname -a", "whoami")})
b = _obs(commands_by_phase={"discovery": ("ls", "id")})
# |A∩B|=2, |AB|=4 → 0.5
assert medium_weight_edge(a, b) == pytest.approx(0.5)
def test_medium_weight_picks_max_across_phases():
a = _obs(commands_by_phase={
"discovery": ("ls",),
"exploitation": ("./payload", "chmod +x payload"),
})
b = _obs(commands_by_phase={
"discovery": ("ps",), # 0.0
"exploitation": ("./payload", "chmod +x payload"), # 1.0
})
assert medium_weight_edge(a, b) == pytest.approx(1.0)
def test_medium_weight_no_shared_phase_returns_zero():
a = _obs(commands_by_phase={"discovery": ("ls",)})
b = _obs(commands_by_phase={"exploitation": ("./payload",)})
assert medium_weight_edge(a, b) == 0.0
def test_medium_weight_disjoint_commands_in_shared_phase():
a = _obs(commands_by_phase={"discovery": ("ls",)})
b = _obs(commands_by_phase={"discovery": ("ps",)})
# |A∩B|=0, |AB|=2
assert medium_weight_edge(a, b) == 0.0
def test_medium_weight_empty_corpora_returns_zero():
a = _obs()
b = _obs()
assert medium_weight_edge(a, b) == 0.0
# ─── low_weight_edge ───────────────────────────────────────────────────────
def test_low_weight_credential_jaccard_match():
a = _obs(credentials=frozenset({("root", "toor"), ("admin", "admin")}))
b = _obs(credentials=frozenset({("root", "toor"), ("admin", "admin")}))
assert low_weight_edge(a, b) == pytest.approx(1.0)
def test_low_weight_credential_partial_overlap():
a = _obs(credentials=frozenset({("root", "toor"), ("admin", "admin")}))
b = _obs(credentials=frozenset({("root", "toor"), ("user", "user")}))
assert low_weight_edge(a, b) == pytest.approx(1 / 3)
def test_low_weight_no_credentials_returns_zero():
a = _obs()
b = _obs(credentials=frozenset({("root", "toor")}))
assert low_weight_edge(a, b) == 0.0
# ─── very_low_weight_edge ──────────────────────────────────────────────────
def test_very_low_weight_asn_match():
a = _obs(asn=64500)
b = _obs(asn=64500)
assert very_low_weight_edge(a, b) == 1.0
def test_very_low_weight_asn_mismatch():
a = _obs(asn=64500)
b = _obs(asn=64501)
assert very_low_weight_edge(a, b) == 0.0
def test_very_low_weight_asn_null_returns_zero():
a = _obs(asn=None)
b = _obs(asn=64500)
assert very_low_weight_edge(a, b) == 0.0
# ─── time-agnostic invariant ───────────────────────────────────────────────
def test_observations_carry_no_timestamps():
"""Compile-time guarantee: Observation has no time fields, so no
edge function can accidentally start using them. Fixture 7 forbids
recency-decay clustering."""
field_names = set(Observation.__dataclass_fields__.keys())
forbidden = {"first_seen", "last_seen", "started_at", "session_midpoint", "timestamp"}
assert field_names.isdisjoint(forbidden), (
f"Observation grew time fields: {field_names & forbidden}. "
"Fixture 7 (slow_burn) forbids recency-aware clustering."
)
# ─── from_synthetic adapter ────────────────────────────────────────────────
def test_from_synthetic_round_trip():
"""The adapter projects a SyntheticAttacker into an Observation
that the edge functions can score over."""
from datetime import datetime, timezone
from tests.factories.campaign_factory import (
SyntheticAttacker, SyntheticSession,
)
from decnet.clustering.ukc import UKCPhase
now = datetime.now(timezone.utc)
sess = SyntheticSession(
session_id="s1",
attacker_id="a1",
decky_id="d1",
started_at=now,
duration_s=10.0,
phase=UKCPhase.DISCOVERY,
commands=["ls", "id"],
credentials_tried=[("root", "toor")],
payload_hash="pl-1",
c2_callback="c2.example.com",
truth_campaign_id="c1",
truth_actor_id="actor-1",
)
att = SyntheticAttacker(
attacker_id="a1", ip="1.1.1.1", asn=64500,
ja3="ja3-x", hassh="hassh-y",
first_seen=now, last_seen=now,
truth_campaign_id="c1", truth_actor_id="actor-1",
sessions=[sess],
)
obs = from_synthetic(att)
assert obs.observation_id == "a1"
assert obs.ja3 == "ja3-x"
assert obs.hassh == "hassh-y"
assert obs.asn == 64500
assert obs.payload_hashes == frozenset({"pl-1"})
assert obs.c2_endpoints == frozenset({"c2.example.com"})
assert obs.credentials == frozenset({("root", "toor")})
assert obs.commands_by_phase == {"discovery": ("ls", "id")}