feat(correlation): interleave mutation markers into attacker traversals

Parser now tags ``mutator`` / ``decky_mutated`` lines with ``kind="mutation"``
so the engine can route them into a sibling ``_mutations`` index keyed
by decky name instead of the per-IP attacker index.  ``traversals()``
joins the two streams: every attacker gets a ``mutations_during`` list
of markers from touched deckies bounded by their first/last-seen
window.  ``AttackerTraversal.to_dict()`` grows a ``mutations_during``
field and a ``timeline`` that chronologically interleaves hops and
markers, so an ``SSH at T5 → mutation at T6 → HTTP at T7`` substrate
transition is visible to UI consumers instead of reading as a silent
discontinuity.

The existing hops-only JSON shape is preserved; old clients that
ignore unknown keys keep working.
This commit is contained in:
2026-04-21 19:37:35 -04:00
parent bf5ed7abbb
commit d4d8a2ad0d
4 changed files with 260 additions and 4 deletions

View File

@@ -28,7 +28,7 @@ from typing import Any, Callable
from rich.table import Table
from decnet.correlation.graph import AttackerTraversal, TraversalHop
from decnet.correlation.graph import AttackerTraversal, MutationMarker, TraversalHop
from decnet.correlation.parser import LogEvent, parse_line
from decnet.logging.syslog_formatter import (
SEVERITY_WARNING,
@@ -56,10 +56,15 @@ class CorrelationEngine:
) -> None:
# attacker_ip → chronological list of events (only events with an IP)
self._events: dict[str, list[LogEvent]] = defaultdict(list)
# decky_name → chronological list of mutation events. Sibling
# index to ``_events``; traversals() joins them by time window.
self._mutations: dict[str, list[LogEvent]] = defaultdict(list)
# Total lines parsed (including no-IP and non-DECNET lines)
self.lines_parsed: int = 0
# Total events indexed (had an attacker_ip)
self.events_indexed: int = 0
# Total mutation events indexed (kind="mutation")
self.mutations_indexed: int = 0
# Optional bus hook — invoked on first-sighting of an attacker IP.
# Always fires exactly once per IP for the lifetime of the engine.
self._publish_fn = publish_fn
@@ -79,6 +84,10 @@ class CorrelationEngine:
event = parse_line(line)
if event is None:
return None
if event.kind == "mutation":
self._mutations[event.decky].append(event)
self.mutations_indexed += 1
return event
if event.attacker_ip:
first_sighting = event.attacker_ip not in self._events
self._events[event.attacker_ip].append(event)
@@ -135,7 +144,22 @@ class CorrelationEngine:
for e in events),
key=lambda h: h.timestamp,
)
result.append(AttackerTraversal(attacker_ip=ip, hops=hops))
# Per-attacker mutation markers: any mutation on a touched
# decky between first_seen and last_seen. Window is
# inclusive on both ends so a creation-at-T0 + first-contact-
# at-T0 race still attaches the marker.
first_ts = hops[0].timestamp
last_ts = hops[-1].timestamp
touched = {h.decky for h in hops}
markers: list[MutationMarker] = []
for decky in touched:
for mev in self._mutations.get(decky, ()):
if first_ts <= mev.timestamp <= last_ts:
markers.append(_marker_from_event(mev))
markers.sort(key=lambda m: m.timestamp)
result.append(AttackerTraversal(
attacker_ip=ip, hops=hops, mutations_during=markers,
))
return sorted(result, key=lambda t: t.first_seen)
def all_attackers(self) -> dict[str, int]:
@@ -221,6 +245,26 @@ class CorrelationEngine:
# Helpers #
# ------------------------------------------------------------------ #
def _marker_from_event(event: LogEvent) -> MutationMarker:
"""Build a :class:`MutationMarker` from a parsed ``decky_mutated`` log event.
The mutator emits ``old_services``/``new_services`` as comma-joined
strings in the SD params (the RFC 5424 grammar doesn't have native
lists). We split them back on the way out — empty string ⇒ empty
list, matching the creation/retirement emission sites.
"""
def _split(s: str) -> list[str]:
return [p for p in s.split(",") if p]
return MutationMarker(
timestamp=event.timestamp,
decky=event.decky,
old_services=_split(event.fields.get("old_services", "")),
new_services=_split(event.fields.get("new_services", "")),
trigger=event.fields.get("trigger", ""),
)
def _fmt_duration(seconds: float) -> str:
if seconds < 60:
return f"{seconds:.0f}s"