test(clustering): fixture 7 slow_burn + recency_decay reference
Multi-month APT campaign modeling real APT operational tempo: recon over weeks, exploitation later, action-on-objectives later still. The unique signal this fixture stresses is TIME-AGNOSTIC IDENTITY across multi-week silences — a clusterer that silently expires old edges fragments any campaign that operates over months. Three DSL actors represent the operator's three operational windows (week 2, month 2, month 3 of a 90-day campaign), all sharing JA3 + HASSH + payload + C2 callback. Campaign-level fixture only — the three actors mint distinct truth_identity_id rows by design (same modeling caveat as fixtures 4 and 5). The fixture's narrative mirrors how an APT works a deep nested topology (DECNET MazeNET mode): map decoy networks for weeks, only then commit to exploitation. Slow-and-low pacing is the signal. recency_decay_clusterer added to fixture_harness — same edge construction as composite_signals_clusterer, but each edge weighted by exp(-time_distance / half_life_days) and dropped below a threshold. Adversarial reference for slow_burn: with 14-day half- life and 0.5 threshold, edges between operational windows (24+ days apart) decay below threshold and drop. The campaign fragments into three clusters; completeness collapses. This is the canonical production failure mode for graph clusterers that bound memory or bias toward "what's hot" by silently expiring old edges. Catching it in synthetic data is what fixture 7 exists for; the replay tier will surface real-world drift / dwell patterns that calibrate the half-life threshold the real algorithm should tolerate. Four tests: corpus shape (window-isolated sessions, stable fingerprint), pipeline pass via composite_signals_clusterer (time- agnostic — folds all three windows), adversarial fragmentation (3 clusters at 14-day half-life), long-half-life sanity (gentle decay unions everything; confirms behavior depends on the half-life parameter, not on something unrelated).
This commit is contained in:
@@ -59,6 +59,14 @@ cluster on, not the quality of the result.
|
||||
``(ja3, hassh)`` match OR shared C2 callback into the same
|
||||
cluster. Approximates the planned similarity graph well enough
|
||||
to score the combined-corpus fixture (fixture 6, noise_floor).
|
||||
|
||||
* `recency_decay_clusterer` — deliberately-bad reference that
|
||||
starts from the same composite signal graph but weights each
|
||||
edge by ``exp(-time_distance / half_life_days)`` and drops
|
||||
edges below a threshold. Adversarial reference for fixture 7
|
||||
(slow_burn): the canonical production failure mode where a
|
||||
graph clusterer with recency decay fragments long-running
|
||||
APT campaigns by silently expiring multi-week-old edges.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
@@ -284,6 +292,90 @@ def composite_signals_clusterer(corpus: GeneratedCorpus) -> dict[str, str]:
|
||||
return pred
|
||||
|
||||
|
||||
def recency_decay_clusterer(
|
||||
corpus: GeneratedCorpus,
|
||||
*,
|
||||
half_life_days: float = 14.0,
|
||||
threshold: float = 0.5,
|
||||
) -> dict[str, str]:
|
||||
"""Composite-signal graph with exponential time decay on edges.
|
||||
|
||||
Same edge construction as ``composite_signals_clusterer``
|
||||
(fingerprint match OR overlapping C2), but each edge's weight
|
||||
is multiplied by ``exp(-time_distance / half_life_days)`` where
|
||||
``time_distance`` is the gap (in days) between the two attackers'
|
||||
session-midpoint timestamps. Edges with decayed weight below
|
||||
``threshold`` are dropped before connected components are
|
||||
extracted.
|
||||
|
||||
Deliberately-bad reference for fixture 7 (slow_burn): an APT
|
||||
campaign that operates over months will be fragmented by any
|
||||
clusterer that silently expires old edges. This is the canonical
|
||||
production failure mode for recency-weighted graph clustering on
|
||||
long-running threat actors.
|
||||
|
||||
Attackers with no signals or no sessions stay singleton.
|
||||
"""
|
||||
import math
|
||||
from datetime import timedelta
|
||||
|
||||
callbacks: dict[str, set[str]] = {}
|
||||
fingerprint: dict[str, tuple[str | None, str | None] | None] = {}
|
||||
midpoint: dict[str, "object | None"] = {}
|
||||
for att in corpus.attackers:
|
||||
callbacks[att.attacker_id] = {
|
||||
s.c2_callback for s in att.sessions if s.c2_callback
|
||||
}
|
||||
if att.ja3 is None and att.hassh is None:
|
||||
fingerprint[att.attacker_id] = None
|
||||
else:
|
||||
fingerprint[att.attacker_id] = (att.ja3, att.hassh)
|
||||
if att.sessions:
|
||||
starts = [s.started_at for s in att.sessions]
|
||||
ends = [s.started_at + timedelta(seconds=s.duration_s) for s in att.sessions]
|
||||
mid = min(starts) + (max(ends) - min(starts)) / 2
|
||||
midpoint[att.attacker_id] = mid
|
||||
else:
|
||||
midpoint[att.attacker_id] = None
|
||||
|
||||
ids = list(callbacks.keys())
|
||||
_parent, find, union = _union_find(ids)
|
||||
|
||||
def edge_strength(a: str, b: str) -> float:
|
||||
"""Base signal strength before time decay; 1.0 on match, else 0."""
|
||||
fa, fb = fingerprint[a], fingerprint[b]
|
||||
if fa is not None and fb is not None and fa == fb:
|
||||
return 1.0
|
||||
sa, sb = callbacks[a], callbacks[b]
|
||||
if sa and sb and (sa & sb):
|
||||
return 1.0
|
||||
return 0.0
|
||||
|
||||
for i, a in enumerate(ids):
|
||||
ma = midpoint[a]
|
||||
if ma is None:
|
||||
continue
|
||||
for b in ids[i + 1 :]:
|
||||
mb = midpoint[b]
|
||||
if mb is None:
|
||||
continue
|
||||
base = edge_strength(a, b)
|
||||
if base <= 0.0:
|
||||
continue
|
||||
gap_days = abs((ma - mb).total_seconds()) / 86400.0
|
||||
weight = base * math.exp(-gap_days / half_life_days)
|
||||
if weight >= threshold:
|
||||
union(a, b)
|
||||
|
||||
pred: dict[str, str] = {}
|
||||
for aid in ids:
|
||||
if fingerprint[aid] is None and not callbacks[aid]:
|
||||
pred[aid] = f"recency-singleton-{aid}"
|
||||
else:
|
||||
pred[aid] = f"recency-{find(aid)}"
|
||||
return pred
|
||||
|
||||
|
||||
def time_window_clusterer(
|
||||
corpus: GeneratedCorpus, *, gap_days: float = 1.0
|
||||
) -> dict[str, str]:
|
||||
|
||||
Reference in New Issue
Block a user