Files
DECNET/decnet/correlation/graph.py
anti bff03d1198 Add cross-decky correlation engine and decnet correlate command
When the same attacker IP touches multiple deckies, the engine builds a
chronological traversal graph and reports the lateral movement path.

decnet/correlation/
  parser.py   — RFC 5424 line → LogEvent; handles src_ip + src field variants
  graph.py    — AttackerTraversal / TraversalHop data types with path/duration
  engine.py   — CorrelationEngine: ingest(), traversals(), report_table/json,
                traversal_syslog_lines() (emits WARNING-severity RFC 5424)
  __init__.py — public API re-exports

decnet/cli.py — `decnet correlate` command (--log-file, --min-deckies,
                --output table|json|syslog, --emit-syslog)

tests/test_correlation.py — 49 tests: parser, graph, engine, reporting

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-04 13:53:30 -03:00

85 lines
2.5 KiB
Python

"""
Traversal graph data types for the DECNET correlation engine.
An AttackerTraversal represents one attacker IP's movement across multiple
deckies. Hops are ordered chronologically; the traversal path is derived
by reading the unique decky sequence from the hop list.
"""
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
@dataclass
class TraversalHop:
"""A single event in an attacker's traversal through the deception network."""
timestamp: datetime
decky: str # decky node name (e.g. "decky-01")
service: str # service that logged the event (e.g. "ssh", "http")
event_type: str # MSGID from the log line (e.g. "login_attempt")
@dataclass
class AttackerTraversal:
"""
All activity from a single attacker IP across two or more deckies,
sorted in chronological order.
"""
attacker_ip: str
hops: list[TraversalHop] # chronologically sorted
@property
def first_seen(self) -> datetime:
return self.hops[0].timestamp
@property
def last_seen(self) -> datetime:
return self.hops[-1].timestamp
@property
def duration_seconds(self) -> float:
return (self.last_seen - self.first_seen).total_seconds()
@property
def deckies(self) -> list[str]:
"""Unique deckies touched, preserving first-contact order."""
seen: list[str] = []
for hop in self.hops:
if hop.decky not in seen:
seen.append(hop.decky)
return seen
@property
def decky_count(self) -> int:
return len(set(h.decky for h in self.hops))
@property
def path(self) -> str:
"""Human-readable traversal path: decky-01 → decky-03 → decky-07"""
return "".join(self.deckies)
def to_dict(self) -> dict:
return {
"attacker_ip": self.attacker_ip,
"decky_count": self.decky_count,
"deckies": self.deckies,
"path": self.path,
"first_seen": self.first_seen.isoformat(),
"last_seen": self.last_seen.isoformat(),
"duration_seconds": self.duration_seconds,
"hop_count": len(self.hops),
"hops": [
{
"timestamp": h.timestamp.isoformat(),
"decky": h.decky,
"service": h.service,
"event_type": h.event_type,
}
for h in self.hops
],
}