Add cross-decky correlation engine and decnet correlate command

When the same attacker IP touches multiple deckies, the engine builds a
chronological traversal graph and reports the lateral movement path.

decnet/correlation/
  parser.py   — RFC 5424 line → LogEvent; handles src_ip + src field variants
  graph.py    — AttackerTraversal / TraversalHop data types with path/duration
  engine.py   — CorrelationEngine: ingest(), traversals(), report_table/json,
                traversal_syslog_lines() (emits WARNING-severity RFC 5424)
  __init__.py — public API re-exports

decnet/cli.py — `decnet correlate` command (--log-file, --min-deckies,
                --output table|json|syslog, --emit-syslog)

tests/test_correlation.py — 49 tests: parser, graph, engine, reporting

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-04 13:53:30 -03:00
parent 7aff040579
commit bff03d1198
6 changed files with 870 additions and 0 deletions

View File

@@ -388,6 +388,61 @@ def list_distros() -> None:
console.print(table)
@app.command(name="correlate")
def correlate(
    log_file: Optional[str] = typer.Option(None, "--log-file", "-f", help="Path to DECNET syslog file to analyse"),
    min_deckies: int = typer.Option(2, "--min-deckies", "-m", help="Minimum number of distinct deckies an IP must touch to be reported"),
    output: str = typer.Option("table", "--output", "-o", help="Output format: table | json | syslog"),
    emit_syslog: bool = typer.Option(False, "--emit-syslog", help="Also print traversal events as RFC 5424 lines (for SIEM piping)"),
) -> None:
    """Analyse logs for cross-decky traversals and print the attacker movement graph."""
    # NOTE: the docstring above is rendered by Typer as the command help text,
    # so implementation notes live in comments instead.
    import sys
    import json as _json
    from pathlib import Path
    from decnet.correlation.engine import CorrelationEngine

    # Reject unknown formats up front; previously any unrecognised value
    # silently fell through to the table branch.
    if output not in ("table", "json", "syslog"):
        typer.echo(
            f"Unknown output format: {output!r} (expected table, json or syslog).",
            err=True,
        )
        raise typer.Exit(1)

    engine = CorrelationEngine()

    # Input source: an explicit file wins; otherwise read piped stdin.
    if log_file:
        path = Path(log_file)
        if not path.exists():
            console.print(f"[red]Log file not found: {log_file}[/]")
            raise typer.Exit(1)
        engine.ingest_file(path)
    elif not sys.stdin.isatty():
        for line in sys.stdin:
            engine.ingest(line)
    else:
        console.print("[red]Provide --log-file or pipe log data via stdin.[/]")
        raise typer.Exit(1)

    if output == "json":
        console.print_json(_json.dumps(engine.report_json(min_deckies), indent=2))
    elif output == "syslog":
        for line in engine.traversal_syslog_lines(min_deckies):
            typer.echo(line)
    else:
        traversals = engine.traversals(min_deckies)
        if not traversals:
            console.print(
                f"[yellow]No traversals detected "
                f"(min_deckies={min_deckies}, events_indexed={engine.events_indexed}).[/]"
            )
        else:
            console.print(engine.report_table(min_deckies))
            console.print(
                f"[dim]Parsed {engine.lines_parsed} lines · "
                f"indexed {engine.events_indexed} events · "
                f"{len(engine.all_attackers())} unique IPs · "
                f"[bold]{len(traversals)}[/] traversal(s)[/]"
            )

    # With --output syslog the lines were already written above; emitting them
    # again would hand duplicate events to whatever consumes the output.
    if emit_syslog and output != "syslog":
        for line in engine.traversal_syslog_lines(min_deckies):
            typer.echo(line)
@app.command(name="archetypes")
def list_archetypes() -> None:
"""List all machine archetype profiles."""

View File

@@ -0,0 +1,13 @@
"""Cross-decky correlation engine for DECNET."""
# Re-export the engine, graph data types and parser entry points so callers
# can import them directly from ``decnet.correlation``.
from decnet.correlation.engine import CorrelationEngine
from decnet.correlation.graph import AttackerTraversal, TraversalHop
from decnet.correlation.parser import LogEvent, parse_line
# Names exposed by ``from decnet.correlation import *``.
__all__ = [
    "CorrelationEngine",
    "AttackerTraversal",
    "TraversalHop",
    "LogEvent",
    "parse_line",
]

View File

@@ -0,0 +1,186 @@
"""
Cross-decky correlation engine.
Ingests RFC 5424 syslog lines from DECNET service containers and identifies
attackers that have touched more than one decky — indicating lateral movement
or an active sweep through the deception network.
Core concept
------------
Every log event that carries a source IP is indexed by that IP. Once ingestion
is complete, ``traversals()`` returns the subset of IPs that hit at least
``min_deckies`` distinct deckies, along with the full chronological hop list
for each one.
Usage
-----
engine = CorrelationEngine()
engine.ingest_file(Path("/var/log/decnet/decnet.log"))
for t in engine.traversals():
print(t.path, t.decky_count)
"""
from __future__ import annotations
import json
from collections import defaultdict
from pathlib import Path
from rich.table import Table
from decnet.correlation.graph import AttackerTraversal, TraversalHop
from decnet.correlation.parser import LogEvent, parse_line
from decnet.logging.syslog_formatter import (
SEVERITY_WARNING,
format_rfc5424,
)
class CorrelationEngine:
    """
    Index parsed log events by attacker IP and report cross-decky traversals.

    Feed lines through ``ingest()`` / ``ingest_file()``, then query with
    ``traversals()`` or one of the ``report_*`` / ``traversal_syslog_lines``
    helpers. Counters ``lines_parsed`` and ``events_indexed`` are cumulative
    over the lifetime of the instance.
    """

    def __init__(self) -> None:
        # attacker_ip → events in ingestion order (only events with an IP)
        self._events: dict[str, list[LogEvent]] = defaultdict(list)
        # Total lines handed to ingest() (including blank, no-IP and
        # unparseable lines)
        self.lines_parsed: int = 0
        # Total events indexed (had an attacker_ip)
        self.events_indexed: int = 0

    # ------------------------------------------------------------------ #
    #  Ingestion                                                           #
    # ------------------------------------------------------------------ #

    def ingest(self, line: str) -> LogEvent | None:
        """
        Parse and index one log line.

        Returns the parsed LogEvent (even if it has no attacker IP), or
        None when ``parse_line`` rejects the line.
        """
        self.lines_parsed += 1
        event = parse_line(line)
        if event is None:
            return None
        if event.attacker_ip:
            self._events[event.attacker_ip].append(event)
            self.events_indexed += 1
        return event

    def ingest_file(self, path: Path) -> int:
        """
        Parse every line of *path* and index it.

        Returns ``events_indexed`` after the file has been consumed, i.e. the
        running total for this engine (equal to the file's own count when the
        engine was fresh).
        """
        # Honeypot logs carry attacker-controlled bytes; decode leniently so a
        # single invalid sequence cannot abort the whole ingestion with
        # UnicodeDecodeError, and do not depend on the platform's default
        # encoding.
        with open(path, encoding="utf-8", errors="replace") as fh:
            for line in fh:
                self.ingest(line)
        return self.events_indexed

    # ------------------------------------------------------------------ #
    #  Query                                                               #
    # ------------------------------------------------------------------ #

    def traversals(self, min_deckies: int = 2) -> list[AttackerTraversal]:
        """
        Return all attackers that touched at least *min_deckies* distinct
        deckies, sorted by first-seen time.

        Each traversal's hops are sorted by timestamp.
        """
        result: list[AttackerTraversal] = []
        for ip, events in self._events.items():
            if len({e.decky for e in events}) < min_deckies:
                continue
            hops = sorted(
                (TraversalHop(e.timestamp, e.decky, e.service, e.event_type)
                 for e in events),
                key=lambda h: h.timestamp,
            )
            result.append(AttackerTraversal(attacker_ip=ip, hops=hops))
        return sorted(result, key=lambda t: t.first_seen)

    def all_attackers(self) -> dict[str, int]:
        """Return {attacker_ip: event_count} for every IP seen, sorted by count desc."""
        counts = {ip: len(evts) for ip, evts in self._events.items()}
        return dict(sorted(counts.items(), key=lambda kv: kv[1], reverse=True))

    # ------------------------------------------------------------------ #
    #  Reporting                                                           #
    # ------------------------------------------------------------------ #

    def report_table(self, min_deckies: int = 2) -> Table:
        """Rich table showing every cross-decky traversal."""
        # Local import: this module does not import ``datetime`` at file level.
        from datetime import timezone

        table = Table(
            title="[bold red]Traversal Graph — Cross-Decky Attackers[/]",
            show_lines=True,
        )
        table.add_column("Attacker IP", style="bold red")
        table.add_column("Deckies", style="cyan", justify="right")
        table.add_column("Traversal Path", style="yellow")
        table.add_column("First Seen", style="dim")
        table.add_column("Duration", justify="right")
        table.add_column("Events", justify="right")
        for t in self.traversals(min_deckies):
            first_seen = t.first_seen
            # The column is labelled "UTC", so offset-aware timestamps must be
            # converted first; naive ones are printed as-is.
            if first_seen.utcoffset() is not None:
                first_seen = first_seen.astimezone(timezone.utc)
            table.add_row(
                t.attacker_ip,
                str(t.decky_count),
                t.path,
                first_seen.strftime("%Y-%m-%d %H:%M:%S UTC"),
                _fmt_duration(t.duration_seconds),
                str(len(t.hops)),
            )
        return table

    def report_json(self, min_deckies: int = 2) -> dict:
        """Serialisable dict representation of all traversals."""
        # Compute once: traversals() re-sorts every attacker's hops per call.
        found = self.traversals(min_deckies)
        return {
            "stats": {
                "lines_parsed": self.lines_parsed,
                "events_indexed": self.events_indexed,
                "unique_ips": len(self._events),
                "traversals": len(found),
            },
            "traversals": [t.to_dict() for t in found],
        }

    def traversal_syslog_lines(self, min_deckies: int = 2) -> list[str]:
        """
        Emit one RFC 5424 syslog line per detected traversal.

        Useful for forwarding correlation findings back to the SIEM alongside
        the raw service events.
        """
        lines: list[str] = []
        for t in self.traversals(min_deckies):
            line = format_rfc5424(
                service="correlator",
                hostname="decnet-correlator",
                event_type="traversal_detected",
                severity=SEVERITY_WARNING,
                attacker_ip=t.attacker_ip,
                decky_count=str(t.decky_count),
                deckies=",".join(t.deckies),
                first_seen=t.first_seen.isoformat(),
                last_seen=t.last_seen.isoformat(),
                hop_count=str(len(t.hops)),
                duration_s=str(int(t.duration_seconds)),
            )
            lines.append(line)
        return lines
# ------------------------------------------------------------------ #
# Helpers #
# ------------------------------------------------------------------ #
def _fmt_duration(seconds: float) -> str:
    """Render a duration as whole seconds, or as minutes/hours with one decimal."""
    if seconds < 60:
        value, unit, precision = seconds, "s", 0
    elif seconds < 3600:
        value, unit, precision = seconds / 60, "m", 1
    else:
        value, unit, precision = seconds / 3600, "h", 1
    return f"{value:.{precision}f}{unit}"

View File

@@ -0,0 +1,84 @@
"""
Traversal graph data types for the DECNET correlation engine.
An AttackerTraversal represents one attacker IP's movement across multiple
deckies. Hops are ordered chronologically; the traversal path is derived
by reading the unique decky sequence from the hop list.
"""
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
@dataclass
class TraversalHop:
    """One event in an attacker's traversal through the deception network."""

    # When the event was logged.
    timestamp: datetime
    # Decky node name (e.g. "decky-01").
    decky: str
    # Service that logged the event (e.g. "ssh", "http").
    service: str
    # MSGID from the log line (e.g. "login_attempt").
    event_type: str
@dataclass
class AttackerTraversal:
    """
    All activity from a single attacker IP across the deckies it touched.

    ``hops`` is expected to already be in chronological order: ``first_seen``
    and ``last_seen`` simply read the first and last element, so an empty hop
    list raises ``IndexError`` on those properties.
    """

    attacker_ip: str
    hops: list[TraversalHop]  # chronologically sorted by the caller

    @property
    def first_seen(self) -> datetime:
        """Timestamp of the earliest hop."""
        return self.hops[0].timestamp

    @property
    def last_seen(self) -> datetime:
        """Timestamp of the latest hop."""
        return self.hops[-1].timestamp

    @property
    def duration_seconds(self) -> float:
        """Seconds elapsed between the first and the last hop."""
        return (self.last_seen - self.first_seen).total_seconds()

    @property
    def deckies(self) -> list[str]:
        """Unique deckies touched, preserving first-contact order."""
        # dict keys keep insertion order, giving an ordered de-duplication
        # without a linear ``in`` scan per hop.
        return list(dict.fromkeys(hop.decky for hop in self.hops))

    @property
    def decky_count(self) -> int:
        """Number of distinct deckies touched."""
        return len(self.deckies)

    @property
    def path(self) -> str:
        """Human-readable traversal path: decky-01 → decky-03 → decky-07"""
        # Join with an arrow separator; an empty separator would fuse the
        # decky names into one unreadable token.
        return " → ".join(self.deckies)

    def to_dict(self) -> dict:
        """Return a JSON-serialisable summary of the traversal and every hop."""
        return {
            "attacker_ip": self.attacker_ip,
            "decky_count": self.decky_count,
            "deckies": self.deckies,
            "path": self.path,
            "first_seen": self.first_seen.isoformat(),
            "last_seen": self.last_seen.isoformat(),
            "duration_seconds": self.duration_seconds,
            "hop_count": len(self.hops),
            "hops": [
                {
                    "timestamp": h.timestamp.isoformat(),
                    "decky": h.decky,
                    "service": h.service,
                    "event_type": h.event_type,
                }
                for h in self.hops
            ],
        }

View File

@@ -0,0 +1,112 @@
"""
RFC 5424 log line parser for the DECNET correlation engine.
Parses log lines produced by decnet service containers and extracts
the fields needed for cross-decky correlation: attacker IP, decky name,
service, event type, and timestamp.
Log format (produced by decnet.logging.syslog_formatter):
<PRI>1 TIMESTAMP HOSTNAME APP-NAME - MSGID [decnet@55555 k1="v1" k2="v2"] [MSG]
The attacker IP may appear under several field names depending on service:
src_ip — ftp, smtp, http, most services
src — mssql (legacy)
client_ip, remote_ip, ip — future / third-party services
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from datetime import datetime
# RFC 5424 line structure
_RFC5424_RE = re.compile(
    r"^<\d+>1 "
    r"(\S+) "      # 1: TIMESTAMP
    r"(\S+) "      # 2: HOSTNAME (decky name)
    r"(\S+) "      # 3: APP-NAME (service)
    r"- "          # PROCID always NILVALUE
    r"(\S+) "      # 4: MSGID (event_type)
    r"(.+)$",      # 5: SD element + optional MSG
)
# Structured data block: [decnet@55555 k="v" ...]
# Group 1 is everything between the SD-ID and the closing bracket. Quoted
# values and backslash escapes are consumed as units so that a ``]`` (or the
# escaped form ``\]``) inside a param value does not end the block early and
# silently drop the params that follow it.
_SD_BLOCK_RE = re.compile(
    r'\[decnet@55555\s+'
    r'((?:"(?:[^"\\]|\\.)*"'   # a complete quoted param value
    r'|\\.'                    # an escaped character outside quotes
    r'|[^\]"\\]'               # any other character except ] " \
    r')*)\]',
    re.DOTALL,
)
# Individual param: key="value" (with escaped chars inside value)
_PARAM_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
# Field names to probe for attacker IP, in priority order
_IP_FIELDS = ("src_ip", "src", "client_ip", "remote_ip", "ip")
@dataclass
class LogEvent:
    """One DECNET syslog line after parsing."""

    # Parsed TIMESTAMP header field.
    timestamp: datetime
    # HOSTNAME field — the decky node name.
    decky: str
    # APP-NAME — which honeypot service.
    service: str
    # MSGID — what happened (connection, login_attempt, …).
    event_type: str
    # Extracted from SD params; None if not present.
    attacker_ip: str | None
    # All structured data params.
    fields: dict[str, str]
    # Original log line (stripped).
    raw: str
def _parse_sd_params(sd_rest: str) -> dict[str, str]:
    """
    Extract key="value" pairs from the SD element portion of a log line.

    Returns an empty dict when no ``[decnet@55555 ...]`` block is found. When
    a key is repeated, the last occurrence wins.
    """
    block = _SD_BLOCK_RE.search(sd_rest)
    if not block:
        return {}
    params: dict[str, str] = {}
    for key, val in _PARAM_RE.findall(block.group(1)):
        # Unescape RFC 5424 SD-PARAM-VALUE escapes (\" \\ \]) in one
        # left-to-right pass. Chained str.replace() calls can re-read the
        # output of an earlier replacement (an escaped backslash followed by
        # "]" would lose its backslash); a single regex pass cannot.
        params[key] = re.sub(r'\\(["\\\]])', r"\1", val)
    return params
def _extract_attacker_ip(fields: dict[str, str]) -> str | None:
    """
    Return the first non-blank value among the ``_IP_FIELDS`` names.

    Fields are probed in ``_IP_FIELDS`` order. A field that is present but
    empty or whitespace-only is skipped, so it cannot shadow a populated
    lower-priority field. Returns None when no candidate has a value.
    """
    for fname in _IP_FIELDS:
        value = fields.get(fname, "").strip()
        if value:
            return value
    return None
def parse_line(line: str) -> LogEvent | None:
    """
    Parse a single RFC 5424 DECNET syslog line into a LogEvent.

    Returns None for blank lines, lines that do not match the expected
    header layout, lines whose HOSTNAME or APP-NAME is the nil value "-",
    and lines whose timestamp cannot be parsed.

    The returned timestamp is always offset-aware: a timestamp without an
    offset is assumed to be UTC (NOTE(review): confirm this matches what the
    emitting services intend).
    """
    # Local import: the module-level import only brings in ``datetime``.
    from datetime import timezone

    line = line.strip()
    if not line:
        return None
    m = _RFC5424_RE.match(line)
    if not m:
        return None
    ts_raw, decky, service, event_type, sd_rest = m.groups()
    if decky == "-" or service == "-":
        return None
    # RFC 5424 allows a "Z" suffix for UTC, which datetime.fromisoformat()
    # only accepts from Python 3.11 on; normalise it to a numeric offset.
    if ts_raw.endswith("Z"):
        ts_raw = ts_raw[:-1] + "+00:00"
    try:
        timestamp = datetime.fromisoformat(ts_raw)
    except ValueError:
        return None
    # Naive and aware datetimes cannot be compared, so one offset-less line
    # among aware ones would make chronological sorting raise TypeError.
    if timestamp.tzinfo is None:
        timestamp = timestamp.replace(tzinfo=timezone.utc)
    fields = _parse_sd_params(sd_rest)
    attacker_ip = _extract_attacker_ip(fields)
    return LogEvent(
        timestamp=timestamp,
        decky=decky,
        service=service,
        event_type=event_type,
        attacker_ip=attacker_ip,
        fields=fields,
        raw=line,
    )