From d4d8a2ad0d9e1642766cea9edb6b2952bd979468 Mon Sep 17 00:00:00 2001 From: anti Date: Tue, 21 Apr 2026 19:37:35 -0400 Subject: [PATCH] feat(correlation): interleave mutation markers into attacker traversals MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Parser now tags ``mutator`` / ``decky_mutated`` lines with ``kind="mutation"`` so the engine can route them into a sibling ``_mutations`` index keyed by decky name instead of the per-IP attacker index. ``traversals()`` joins the two streams: every attacker gets a ``mutations_during`` list of markers from touched deckies bounded by their first/last-seen window. ``AttackerTraversal.to_dict()`` grows a ``mutations_during`` field and a ``timeline`` that chronologically interleaves hops and markers, so an ``SSH at T5 → mutation at T6 → HTTP at T7`` substrate transition is visible to UI consumers instead of reading as a silent discontinuity. The existing hops-only JSON shape is preserved; old clients that ignore unknown keys keep working. --- decnet/correlation/engine.py | 48 ++++++++++++- decnet/correlation/graph.py | 65 +++++++++++++++++- decnet/correlation/parser.py | 21 +++++- tests/test_correlation.py | 130 +++++++++++++++++++++++++++++++++++ 4 files changed, 260 insertions(+), 4 deletions(-) diff --git a/decnet/correlation/engine.py b/decnet/correlation/engine.py index c8340df0..e5ac3f4f 100644 --- a/decnet/correlation/engine.py +++ b/decnet/correlation/engine.py @@ -28,7 +28,7 @@ from typing import Any, Callable from rich.table import Table -from decnet.correlation.graph import AttackerTraversal, TraversalHop +from decnet.correlation.graph import AttackerTraversal, MutationMarker, TraversalHop from decnet.correlation.parser import LogEvent, parse_line from decnet.logging.syslog_formatter import ( SEVERITY_WARNING, @@ -56,10 +56,15 @@ class CorrelationEngine: ) -> None: # attacker_ip → chronological list of events (only events with an IP) self._events: dict[str, list[LogEvent]] = defaultdict(list) + # decky_name → chronological list of mutation events. Sibling + # index to ``_events``; traversals() joins them by time window. + self._mutations: dict[str, list[LogEvent]] = defaultdict(list) # Total lines parsed (including no-IP and non-DECNET lines) self.lines_parsed: int = 0 # Total events indexed (had an attacker_ip) self.events_indexed: int = 0 + # Total mutation events indexed (kind="mutation") + self.mutations_indexed: int = 0 # Optional bus hook — invoked on first-sighting of an attacker IP. # Always fires exactly once per IP for the lifetime of the engine. self._publish_fn = publish_fn @@ -79,6 +84,10 @@ class CorrelationEngine: event = parse_line(line) if event is None: return None + if event.kind == "mutation": + self._mutations[event.decky].append(event) + self.mutations_indexed += 1 + return event if event.attacker_ip: first_sighting = event.attacker_ip not in self._events self._events[event.attacker_ip].append(event) @@ -135,7 +144,22 @@ class CorrelationEngine: for e in events), key=lambda h: h.timestamp, ) - result.append(AttackerTraversal(attacker_ip=ip, hops=hops)) + # Per-attacker mutation markers: any mutation on a touched + # decky between first_seen and last_seen. Window is + # inclusive on both ends so a creation-at-T0 + first-contact- + # at-T0 race still attaches the marker. + first_ts = hops[0].timestamp + last_ts = hops[-1].timestamp + touched = {h.decky for h in hops} + markers: list[MutationMarker] = [] + for decky in touched: + for mev in self._mutations.get(decky, ()): + if first_ts <= mev.timestamp <= last_ts: + markers.append(_marker_from_event(mev)) + markers.sort(key=lambda m: m.timestamp) + result.append(AttackerTraversal( + attacker_ip=ip, hops=hops, mutations_during=markers, + )) return sorted(result, key=lambda t: t.first_seen) def all_attackers(self) -> dict[str, int]: @@ -221,6 +245,26 @@ class CorrelationEngine: # Helpers # # ------------------------------------------------------------------ # +def _marker_from_event(event: LogEvent) -> MutationMarker: + """Build a :class:`MutationMarker` from a parsed ``decky_mutated`` log event. + + The mutator emits ``old_services``/``new_services`` as comma-joined + strings in the SD params (the RFC 5424 grammar doesn't have native + lists). We split them back on the way out — empty string ⇒ empty + list, matching the creation/retirement emission sites. + """ + def _split(s: str) -> list[str]: + return [p for p in s.split(",") if p] + + return MutationMarker( + timestamp=event.timestamp, + decky=event.decky, + old_services=_split(event.fields.get("old_services", "")), + new_services=_split(event.fields.get("new_services", "")), + trigger=event.fields.get("trigger", ""), + ) + + def _fmt_duration(seconds: float) -> str: if seconds < 60: return f"{seconds:.0f}s" diff --git a/decnet/correlation/graph.py b/decnet/correlation/graph.py index d24f0dd1..ee08b4d1 100644 --- a/decnet/correlation/graph.py +++ b/decnet/correlation/graph.py @@ -8,10 +8,29 @@ by reading the unique decky sequence from the hop list. from __future__ import annotations -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import datetime +@dataclass +class MutationMarker: + """A substrate transition that occurred during an attacker's traversal. + + Emitted by the mutator (or deploy/teardown) and consumed by the + correlation engine so ``AttackerTraversal.to_dict()`` can interleave + substrate-change markers chronologically with attacker hops — an + interaction with ``decky-03@T5`` followed by a mutation at ``T6`` and + another interaction at ``T7`` is a substrate transition mid-session, + not a silent discontinuity. + """ + + timestamp: datetime + decky: str + old_services: list[str] + new_services: list[str] + trigger: str # creation | retirement | scheduled | operator | … + + @dataclass class TraversalHop: """A single event in an attacker's traversal through the deception network.""" @@ -31,6 +50,10 @@ class AttackerTraversal: attacker_ip: str hops: list[TraversalHop] # chronologically sorted + # Substrate-change markers on deckies this attacker touched, bounded + # by first_seen/last_seen. Empty for legacy attacker-only ingest; + # populated once mutation events flow through the engine. + mutations_during: list[MutationMarker] = field(default_factory=list) @property def first_seen(self) -> datetime: @@ -62,6 +85,35 @@ class AttackerTraversal: """Human-readable traversal path: decky-01 → decky-03 → decky-07""" return " → ".join(self.deckies) + def timeline(self) -> list[dict]: + """Chronologically interleaved hops and mutation markers. + + Each entry carries a ``kind`` discriminant (``hop`` | ``mutation``) + so JSON consumers can render them distinctly. Mutations of + deckies the attacker never touched are already filtered out at + the engine; here we just merge by timestamp. + """ + merged: list[tuple[datetime, dict]] = [] + for h in self.hops: + merged.append((h.timestamp, { + "kind": "hop", + "timestamp": h.timestamp.isoformat(), + "decky": h.decky, + "service": h.service, + "event_type": h.event_type, + })) + for m in self.mutations_during: + merged.append((m.timestamp, { + "kind": "mutation", + "timestamp": m.timestamp.isoformat(), + "decky": m.decky, + "old_services": m.old_services, + "new_services": m.new_services, + "trigger": m.trigger, + })) + merged.sort(key=lambda kv: kv[0]) + return [entry for _, entry in merged] + def to_dict(self) -> dict: return { "attacker_ip": self.attacker_ip, @@ -81,4 +133,15 @@ class AttackerTraversal: } for h in self.hops ], + "mutations_during": [ + { + "timestamp": m.timestamp.isoformat(), + "decky": m.decky, + "old_services": m.old_services, + "new_services": m.new_services, + "trigger": m.trigger, + } + for m in self.mutations_during + ], + "timeline": self.timeline(), } diff --git a/decnet/correlation/parser.py b/decnet/correlation/parser.py index 4aae3812..cbf8195d 100644 --- a/decnet/correlation/parser.py +++ b/decnet/correlation/parser.py @@ -17,8 +17,9 @@ The attacker IP may appear under several field names depending on service: from __future__ import annotations import re -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import datetime +from typing import Literal # RFC 5424 line structure _RFC5424_RE = re.compile( @@ -41,6 +42,9 @@ _PARAM_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"') _IP_FIELDS = ("src_ip", "src", "client_ip", "remote_ip", "remote_addr", "target_ip", "ip") +EventKind = Literal["attacker", "mutation"] + + @dataclass class LogEvent: """A single parsed event from a DECNET syslog line.""" @@ -52,6 +56,12 @@ class LogEvent: attacker_ip: str | None # extracted from SD params; None if not present fields: dict[str, str] # all structured data params raw: str # original log line (stripped) + # ``attacker`` = service-emitted event keyed on a source IP (the + # existing correlation input). ``mutation`` = ``mutator`` worker + # event — same RFC 5424 wire format but routed into a separate + # per-decky index so substrate transitions can be interleaved into + # attacker traversals without polluting the per-IP event stream. + kind: EventKind = field(default="attacker") def _parse_sd_params(sd_rest: str) -> dict[str, str]: @@ -101,6 +111,14 @@ def parse_line(line: str) -> LogEvent | None: fields = _parse_sd_params(sd_rest) attacker_ip = _extract_attacker_ip(fields) + # Mutator-emitted transitions arrive on the same ingest stream but + # belong in the substrate-state index, not the per-IP attacker one. + kind: EventKind = ( + "mutation" + if service == "mutator" and event_type == "decky_mutated" + else "attacker" + ) + return LogEvent( timestamp=timestamp, decky=decky, @@ -109,4 +127,5 @@ def parse_line(line: str) -> LogEvent | None: attacker_ip=attacker_ip, fields=fields, raw=line, + kind=kind, ) diff --git a/tests/test_correlation.py b/tests/test_correlation.py index 7764ec8b..ddff7dc8 100644 --- a/tests/test_correlation.py +++ b/tests/test_correlation.py @@ -416,3 +416,133 @@ class TestFmtDuration: def test_hours(self): assert _fmt_duration(7200) == "2.0h" + + +# --------------------------------------------------------------------------- +# Mutation-event stream (parser kind + engine index + graph markers) +# --------------------------------------------------------------------------- + +def _mutation_line( + decky: str, + *, + old: str = "", + new: str = "ssh", + trigger: str = "scheduled", + timestamp: str = _TS, +) -> str: + return format_rfc5424( + service="mutator", + hostname=decky, + event_type="decky_mutated", + severity=SEVERITY_INFO, + timestamp=datetime.fromisoformat(timestamp), + decky=decky, + old_services=old, + new_services=new, + trigger=trigger, + ) + + +class TestParserMutationKind: + def test_mutator_line_kind_is_mutation(self): + ev = parse_line(_mutation_line("decky-01", old="ssh", new="rdp", + trigger="scheduled")) + assert ev is not None + assert ev.kind == "mutation" + + def test_default_kind_is_attacker(self): + ev = parse_line(_make_line()) + assert ev is not None + assert ev.kind == "attacker" + + def test_non_mutator_service_stays_attacker(self): + # Same event_type but different service ⇒ not a mutation + line = format_rfc5424( + service="ssh", + hostname="decky-01", + event_type="decky_mutated", + severity=SEVERITY_INFO, + timestamp=datetime.fromisoformat(_TS), + src_ip="1.1.1.1", + ) + ev = parse_line(line) + assert ev is not None + assert ev.kind == "attacker" + + +class TestEngineMutationIndex: + def test_mutation_indexed_separately(self): + engine = CorrelationEngine() + engine.ingest(_mutation_line("decky-01", old="ssh", new="rdp")) + assert engine.mutations_indexed == 1 + assert engine.events_indexed == 0 + assert "decky-01" in engine._mutations + assert "decky-01" not in engine._events + + def test_mutations_interleaved_into_traversal(self): + engine = CorrelationEngine() + # Attacker hits decky-01 and decky-02; decky-01 mutates in between + engine.ingest(_make_line(hostname="decky-01", src_ip="9.9.9.9", + timestamp=_TS)) + engine.ingest(_mutation_line("decky-01", old="ssh", new="rdp", + trigger="scheduled", timestamp=_TS2)) + engine.ingest(_make_line(hostname="decky-02", src_ip="9.9.9.9", + timestamp=_TS3)) + traversals = engine.traversals() + assert len(traversals) == 1 + t = traversals[0] + assert len(t.mutations_during) == 1 + m = t.mutations_during[0] + assert m.decky == "decky-01" + assert m.old_services == ["ssh"] + assert m.new_services == ["rdp"] + assert m.trigger == "scheduled" + + def test_mutation_outside_window_excluded(self): + engine = CorrelationEngine() + # Mutation at _TS — before attacker first_seen at _TS2 + engine.ingest(_mutation_line("decky-01", old="", new="ssh", + trigger="creation", timestamp=_TS)) + engine.ingest(_make_line(hostname="decky-01", src_ip="9.9.9.9", + timestamp=_TS2)) + engine.ingest(_make_line(hostname="decky-02", src_ip="9.9.9.9", + timestamp=_TS3)) + t = engine.traversals()[0] + # The creation happened BEFORE first contact, so it's not "during" + assert t.mutations_during == [] + + def test_mutation_on_untouched_decky_excluded(self): + engine = CorrelationEngine() + engine.ingest(_make_line(hostname="decky-01", src_ip="9.9.9.9", + timestamp=_TS)) + engine.ingest(_make_line(hostname="decky-02", src_ip="9.9.9.9", + timestamp=_TS3)) + # decky-03 mutates mid-window but the attacker never touched it + engine.ingest(_mutation_line("decky-03", old="ftp", new="smtp", + trigger="operator", timestamp=_TS2)) + t = engine.traversals()[0] + assert t.mutations_during == [] + + def test_to_dict_includes_timeline_with_markers(self): + engine = CorrelationEngine() + engine.ingest(_make_line(hostname="decky-01", src_ip="9.9.9.9", + timestamp=_TS)) + engine.ingest(_mutation_line("decky-01", old="ssh", new="rdp", + trigger="scheduled", timestamp=_TS2)) + engine.ingest(_make_line(hostname="decky-02", src_ip="9.9.9.9", + timestamp=_TS3)) + d = engine.traversals()[0].to_dict() + assert len(d["mutations_during"]) == 1 + assert d["mutations_during"][0]["trigger"] == "scheduled" + kinds = [entry["kind"] for entry in d["timeline"]] + assert kinds == ["hop", "mutation", "hop"] + + def test_report_json_serialisable_with_mutations(self): + engine = CorrelationEngine() + engine.ingest(_make_line(hostname="decky-01", src_ip="9.9.9.9", + timestamp=_TS)) + engine.ingest(_mutation_line("decky-01", old="ssh", new="rdp", + trigger="scheduled", timestamp=_TS2)) + engine.ingest(_make_line(hostname="decky-02", src_ip="9.9.9.9", + timestamp=_TS3)) + json.dumps(engine.report_json()) # must not raise