@app.command(name="correlate")
def correlate(
    log_file: Optional[str] = typer.Option(None, "--log-file", "-f", help="Path to DECNET syslog file to analyse"),
    min_deckies: int = typer.Option(2, "--min-deckies", "-m", help="Minimum number of distinct deckies an IP must touch to be reported"),
    output: str = typer.Option("table", "--output", "-o", help="Output format: table | json | syslog"),
    emit_syslog: bool = typer.Option(False, "--emit-syslog", help="Also print traversal events as RFC 5424 lines (for SIEM piping)"),
) -> None:
    """
    Analyse logs for cross-decky traversals and print the attacker movement graph.

    Log lines are read from ``--log-file`` when given, otherwise from stdin
    when stdin is not a terminal.  The result is rendered as a rich table,
    a JSON document, or RFC 5424 syslog lines depending on ``--output``.

    Exits with status 1 when the output format is unknown, the log file is
    missing or is a directory, or no input source is available.
    """
    import sys
    import json as _json
    from pathlib import Path
    from decnet.correlation.engine import CorrelationEngine

    # Validate the format up front: previously any unrecognised value
    # (e.g. a typo such as "jsno") silently fell through to the table view.
    output_format = output.strip().lower()
    if output_format not in ("table", "json", "syslog"):
        console.print(
            f"[red]Unknown output format: {output} (expected table, json or syslog).[/]"
        )
        raise typer.Exit(1)

    engine = CorrelationEngine()

    if log_file:
        path = Path(log_file)
        if not path.exists():
            console.print(f"[red]Log file not found: {log_file}[/]")
            raise typer.Exit(1)
        # exists() is also true for directories, which would crash in open().
        if path.is_dir():
            console.print(f"[red]Log path is a directory, not a file: {log_file}[/]")
            raise typer.Exit(1)
        engine.ingest_file(path)
    elif not sys.stdin.isatty():
        for line in sys.stdin:
            engine.ingest(line)
    else:
        console.print("[red]Provide --log-file or pipe log data via stdin.[/]")
        raise typer.Exit(1)

    if output_format == "json":
        console.print_json(_json.dumps(engine.report_json(min_deckies), indent=2))
    elif output_format == "syslog":
        for line in engine.traversal_syslog_lines(min_deckies):
            typer.echo(line)
    else:
        traversals = engine.traversals(min_deckies)
        if not traversals:
            console.print(
                f"[yellow]No traversals detected "
                f"(min_deckies={min_deckies}, events_indexed={engine.events_indexed}).[/]"
            )
        else:
            console.print(engine.report_table(min_deckies))
            console.print(
                f"[dim]Parsed {engine.lines_parsed} lines · "
                f"indexed {engine.events_indexed} events · "
                f"{len(engine.all_attackers())} unique IPs · "
                f"[bold]{len(traversals)}[/] traversal(s)[/]"
            )

    # With --output syslog the lines were already written above; emitting
    # them again would hand the SIEM every traversal twice.
    if emit_syslog and output_format != "syslog":
        for line in engine.traversal_syslog_lines(min_deckies):
            typer.echo(line)
class CorrelationEngine:
    """
    Index parsed DECNET log events by attacker IP and report traversals.

    Lines are fed in through ``ingest`` / ``ingest_file``.  The query and
    reporting methods can then be called any number of times; each call
    recomputes its result from the events indexed so far.
    """

    def __init__(self) -> None:
        # attacker_ip → events in ingestion order (only events with an IP);
        # chronological ordering is applied later, in traversals().
        self._events: dict[str, list[LogEvent]] = defaultdict(list)
        # Total lines handed to ingest(), including blank and unparseable ones
        self.lines_parsed: int = 0
        # Total events indexed (had an attacker_ip)
        self.events_indexed: int = 0

    # ------------------------------------------------------------------ #
    #  Ingestion                                                           #
    # ------------------------------------------------------------------ #

    def ingest(self, line: str) -> LogEvent | None:
        """
        Parse and index one log line.

        Returns the parsed LogEvent (even if it has no attacker IP), or
        None if ``parse_line`` rejected the line.
        """
        self.lines_parsed += 1
        event = parse_line(line)
        if event is None:
            return None
        if event.attacker_ip:
            self._events[event.attacker_ip].append(event)
            self.events_indexed += 1
        return event

    def ingest_file(self, path: Path) -> int:
        """
        Parse every line of *path* and index it.

        Returns the number of events from *this file* that had an attacker
        IP.  (The running total across all ingestion stays available as
        ``events_indexed``.)
        """
        indexed_before = self.events_indexed
        # Decode explicitly as UTF-8 and replace undecodable bytes so that a
        # single stray byte written by an attacker-controlled field cannot
        # abort the whole analysis with UnicodeDecodeError.
        with open(path, encoding="utf-8", errors="replace") as fh:
            for line in fh:
                self.ingest(line)
        return self.events_indexed - indexed_before

    # ------------------------------------------------------------------ #
    #  Query                                                               #
    # ------------------------------------------------------------------ #

    def traversals(self, min_deckies: int = 2) -> list[AttackerTraversal]:
        """
        Return all attackers that touched at least *min_deckies* distinct
        deckies, sorted by first-seen time.  Each traversal's hops are
        sorted by timestamp.
        """
        result: list[AttackerTraversal] = []
        for ip, events in self._events.items():
            if len({e.decky for e in events}) < min_deckies:
                continue
            hops = sorted(
                (TraversalHop(e.timestamp, e.decky, e.service, e.event_type)
                 for e in events),
                key=lambda h: h.timestamp,
            )
            result.append(AttackerTraversal(attacker_ip=ip, hops=hops))
        return sorted(result, key=lambda t: t.first_seen)

    def all_attackers(self) -> dict[str, int]:
        """Return {attacker_ip: event_count} for every IP seen, sorted by count desc."""
        counts = ((ip, len(evts)) for ip, evts in self._events.items())
        return dict(sorted(counts, key=lambda kv: kv[1], reverse=True))

    # ------------------------------------------------------------------ #
    #  Reporting                                                           #
    # ------------------------------------------------------------------ #

    def report_table(self, min_deckies: int = 2) -> Table:
        """Rich table showing every cross-decky traversal."""
        from datetime import timezone

        table = Table(
            title="[bold red]Traversal Graph — Cross-Decky Attackers[/]",
            show_lines=True,
        )
        table.add_column("Attacker IP", style="bold red")
        table.add_column("Deckies", style="cyan", justify="right")
        table.add_column("Traversal Path", style="yellow")
        table.add_column("First Seen", style="dim")
        table.add_column("Duration", justify="right")
        table.add_column("Events", justify="right")

        for t in self.traversals(min_deckies):
            first_seen = t.first_seen
            # The column is labelled "UTC", so convert offset-aware
            # timestamps instead of printing their local wall-clock time.
            # Naive timestamps carry no offset and are shown unchanged.
            if first_seen.tzinfo is not None:
                first_seen = first_seen.astimezone(timezone.utc)
            table.add_row(
                t.attacker_ip,
                str(t.decky_count),
                t.path,
                first_seen.strftime("%Y-%m-%d %H:%M:%S UTC"),
                _fmt_duration(t.duration_seconds),
                str(len(t.hops)),
            )
        return table

    def report_json(self, min_deckies: int = 2) -> dict:
        """Serialisable dict representation of all traversals."""
        # Compute once: the count and the list must describe the same result.
        found = self.traversals(min_deckies)
        return {
            "stats": {
                "lines_parsed": self.lines_parsed,
                "events_indexed": self.events_indexed,
                "unique_ips": len(self._events),
                "traversals": len(found),
            },
            "traversals": [t.to_dict() for t in found],
        }

    def traversal_syslog_lines(self, min_deckies: int = 2) -> list[str]:
        """
        Emit one RFC 5424 syslog line per detected traversal.

        Useful for forwarding correlation findings back to the SIEM alongside
        the raw service events.
        """
        return [
            format_rfc5424(
                service="correlator",
                hostname="decnet-correlator",
                event_type="traversal_detected",
                severity=SEVERITY_WARNING,
                attacker_ip=t.attacker_ip,
                decky_count=str(t.decky_count),
                deckies=",".join(t.deckies),
                first_seen=t.first_seen.isoformat(),
                last_seen=t.last_seen.isoformat(),
                hop_count=str(len(t.hops)),
                duration_s=str(int(t.duration_seconds)),
            )
            for t in self.traversals(min_deckies)
        ]


# ------------------------------------------------------------------ #
#  Helpers                                                             #
# ------------------------------------------------------------------ #

def _fmt_duration(seconds: float) -> str:
    """
    Format a duration as whole seconds, tenths of minutes, or tenths of hours.

    The unit is chosen on the *rounded* value so that boundary inputs roll
    over to the next unit (59.6 → "1.0m") instead of printing "60s" or
    "60.0m".
    """
    if round(seconds) < 60:
        return f"{seconds:.0f}s"
    minutes = seconds / 60
    if round(minutes, 1) < 60:
        return f"{minutes:.1f}m"
    return f"{seconds / 3600:.1f}h"
@dataclass
class TraversalHop:
    """A single event in an attacker's traversal through the deception network."""

    timestamp: datetime
    decky: str          # decky node name (e.g. "decky-01")
    service: str        # service that logged the event (e.g. "ssh", "http")
    event_type: str     # MSGID from the log line (e.g. "login_attempt")


@dataclass
class AttackerTraversal:
    """
    All activity from a single attacker IP across the deception network.

    ``hops`` must be non-empty and already sorted chronologically:
    ``first_seen`` / ``last_seen`` read the first and last hop, and the
    traversal path follows hop order.
    """

    attacker_ip: str
    hops: list[TraversalHop]    # chronologically sorted

    @property
    def first_seen(self) -> datetime:
        """Timestamp of the earliest hop (raises IndexError if there are no hops)."""
        return self.hops[0].timestamp

    @property
    def last_seen(self) -> datetime:
        """Timestamp of the latest hop (raises IndexError if there are no hops)."""
        return self.hops[-1].timestamp

    @property
    def duration_seconds(self) -> float:
        """Seconds elapsed between the first and the last hop."""
        return (self.last_seen - self.first_seen).total_seconds()

    @property
    def deckies(self) -> list[str]:
        """Unique deckies touched, preserving first-contact order."""
        # dict keys keep insertion order, giving an O(n) ordered de-dup
        # instead of a list-membership scan per hop.
        return list(dict.fromkeys(hop.decky for hop in self.hops))

    @property
    def decky_count(self) -> int:
        """Number of distinct deckies touched."""
        return len(self.deckies)

    @property
    def path(self) -> str:
        """Human-readable traversal path: decky-01 → decky-03 → decky-07"""
        return " → ".join(self.deckies)

    def to_dict(self) -> dict:
        """Return a JSON-serialisable summary of the traversal and its hops."""
        # Derive the decky sequence once; count and path are views of it.
        deckies = self.deckies
        return {
            "attacker_ip": self.attacker_ip,
            "decky_count": len(deckies),
            "deckies": deckies,
            "path": " → ".join(deckies),
            "first_seen": self.first_seen.isoformat(),
            "last_seen": self.last_seen.isoformat(),
            "duration_seconds": self.duration_seconds,
            "hop_count": len(self.hops),
            "hops": [
                {
                    "timestamp": h.timestamp.isoformat(),
                    "decky": h.decky,
                    "service": h.service,
                    "event_type": h.event_type,
                }
                for h in self.hops
            ],
        }
# RFC 5424 line structure
_RFC5424_RE = re.compile(
    r"^<\d+>1 "
    r"(\S+) "       # 1: TIMESTAMP
    r"(\S+) "       # 2: HOSTNAME  (decky name)
    r"(\S+) "       # 3: APP-NAME  (service)
    r"- "           # PROCID always NILVALUE
    r"(\S+) "       # 4: MSGID (event_type)
    r"(.+)$",       # 5: SD element + optional MSG
)

# Structured data block: [decnet@55555 k="v" ...]
# Quoted values are consumed as a unit so that a "]" inside a value
# (escaped as "\]" per RFC 5424, or left raw by a sloppy emitter) does not
# terminate the block early.
_SD_BLOCK_RE = re.compile(
    r'\[decnet@55555\s+((?:"(?:[^"\\]|\\.)*"|[^"\]])*)\]',
    re.DOTALL,
)

# Fallback for malformed blocks (e.g. an unterminated quote): take
# everything up to the first "]" and salvage whatever params parse.
_SD_BLOCK_LAX_RE = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)

# Individual param: key="value" (with escaped chars inside value)
_PARAM_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')

# The three escapes RFC 5424 defines inside a PARAM-VALUE: \" \\ \]
_SD_ESCAPE_RE = re.compile(r'\\(["\\\]])')

# Field names to probe for attacker IP, in priority order
_IP_FIELDS = ("src_ip", "src", "client_ip", "remote_ip", "ip")


@dataclass
class LogEvent:
    """A single parsed event from a DECNET syslog line."""

    timestamp: datetime
    decky: str              # HOSTNAME field — the decky node name
    service: str            # APP-NAME — which honeypot service
    event_type: str         # MSGID — what happened (connection, login_attempt, …)
    attacker_ip: str | None # extracted from SD params; None if not present
    fields: dict[str, str]  # all structured data params
    raw: str                # original log line (stripped)


def _parse_sd_params(sd_rest: str) -> dict[str, str]:
    """
    Extract key=value pairs from the SD element portion of a log line.

    Returns an empty dict when no ``[decnet@55555 ...]`` block is found.
    When a key is repeated, the last occurrence wins.
    """
    block = _SD_BLOCK_RE.search(sd_rest) or _SD_BLOCK_LAX_RE.search(sd_rest)
    if block is None:
        return {}
    # Unescape in a single pass so that each backslash escape is decoded
    # exactly once, independent of the order of the escape kinds.
    return {
        key: _SD_ESCAPE_RE.sub(r"\1", val)
        for key, val in _PARAM_RE.findall(block.group(1))
    }


def _extract_attacker_ip(fields: dict[str, str]) -> str | None:
    """
    Return the first non-empty attacker-IP field in priority order, or None.

    Empty values are skipped so that e.g. ``src_ip=""`` does not mask a
    populated ``src``.
    """
    for fname in _IP_FIELDS:
        value = fields.get(fname)
        if value:
            return value
    return None


def _parse_timestamp(ts_raw: str) -> datetime:
    """
    Parse an RFC 5424 TIMESTAMP into a timezone-aware datetime.

    Raises ValueError when *ts_raw* is not an ISO 8601 timestamp that
    ``datetime.fromisoformat`` understands.
    """
    from datetime import timezone

    # RFC 5424 allows "Z" for UTC, which datetime.fromisoformat only
    # accepts from Python 3.11 on; spell it as an explicit offset.
    if ts_raw[-1:] in ("Z", "z"):
        ts_raw = ts_raw[:-1] + "+00:00"
    timestamp = datetime.fromisoformat(ts_raw)
    # NOTE(review): an offset-less timestamp is non-conformant; it is
    # assumed to be UTC here so that it can be ordered against aware
    # timestamps without raising TypeError — confirm against emitters.
    if timestamp.tzinfo is None:
        timestamp = timestamp.replace(tzinfo=timezone.utc)
    return timestamp


def parse_line(line: str) -> LogEvent | None:
    """
    Parse a single RFC 5424 DECNET syslog line into a LogEvent.

    Returns None for blank lines, lines that do not match the RFC 5424
    header layout, lines whose HOSTNAME or APP-NAME is the NILVALUE "-",
    and lines whose TIMESTAMP cannot be parsed.
    """
    line = line.strip()
    if not line:
        return None

    m = _RFC5424_RE.match(line)
    if not m:
        return None

    ts_raw, decky, service, event_type, sd_rest = m.groups()

    if decky == "-" or service == "-":
        return None

    try:
        timestamp = _parse_timestamp(ts_raw)
    except ValueError:
        return None

    fields = _parse_sd_params(sd_rest)

    return LogEvent(
        timestamp=timestamp,
        decky=decky,
        service=service,
        event_type=event_type,
        attacker_ip=_extract_attacker_ip(fields),
        fields=fields,
        raw=line,
    )
# ---------------------------------------------------------------------------
# Fixtures & helpers
# ---------------------------------------------------------------------------

_TS = "2026-04-04T10:00:00+00:00"
_TS2 = "2026-04-04T10:05:00+00:00"
_TS3 = "2026-04-04T10:10:00+00:00"


def _make_line(
    service: str = "http",
    hostname: str = "decky-01",
    event_type: str = "connection",
    src_ip: str = "1.2.3.4",
    timestamp: str = _TS,
    extra_fields: dict | None = None,
) -> str:
    """Build a real RFC 5424 DECNET syslog line via the formatter.

    An empty *src_ip* produces a line without any attacker-IP field.
    """
    fields = {}
    if src_ip:
        fields["src_ip"] = src_ip
    if extra_fields:
        fields.update(extra_fields)
    return format_rfc5424(
        service=service,
        hostname=hostname,
        event_type=event_type,
        severity=SEVERITY_INFO,
        timestamp=datetime.fromisoformat(timestamp),
        **fields,
    )


def _make_line_src(hostname: str, src: str, timestamp: str = _TS) -> str:
    """Build a line that uses `src` instead of `src_ip` (mssql style)."""
    return format_rfc5424(
        service="mssql",
        hostname=hostname,
        event_type="unknown_packet",
        severity=SEVERITY_INFO,
        timestamp=datetime.fromisoformat(timestamp),
        src=src,
    )


# ---------------------------------------------------------------------------
# parser.py — parse_line
# ---------------------------------------------------------------------------

class TestParserBasic:
    def test_returns_none_for_blank(self):
        assert parse_line("") is None
        assert parse_line("   ") is None

    def test_returns_none_for_non_rfc5424(self):
        assert parse_line("this is not a syslog line") is None
        assert parse_line("Jan 1 00:00:00 host sshd: blah") is None

    def test_returns_log_event(self):
        event = parse_line(_make_line())
        assert isinstance(event, LogEvent)

    def test_hostname_extracted(self):
        event = parse_line(_make_line(hostname="decky-07"))
        assert event.decky == "decky-07"

    def test_service_extracted(self):
        event = parse_line(_make_line(service="ftp"))
        assert event.service == "ftp"

    def test_event_type_extracted(self):
        event = parse_line(_make_line(event_type="login_attempt"))
        assert event.event_type == "login_attempt"

    def test_timestamp_parsed(self):
        event = parse_line(_make_line(timestamp=_TS))
        assert event.timestamp == datetime.fromisoformat(_TS)

    def test_raw_line_preserved(self):
        line = _make_line()
        event = parse_line(line)
        assert event.raw == line.strip()


class TestParserAttackerIP:
    def test_src_ip_field(self):
        event = parse_line(_make_line(src_ip="10.0.0.1"))
        assert event.attacker_ip == "10.0.0.1"

    def test_src_field_fallback(self):
        """mssql logs use `src` instead of `src_ip`."""
        event = parse_line(_make_line_src("decky-win", "192.168.1.5"))
        assert event.attacker_ip == "192.168.1.5"

    def test_no_ip_field_gives_none(self):
        line = format_rfc5424("http", "decky-01", "startup", SEVERITY_INFO)
        event = parse_line(line)
        assert event is not None
        assert event.attacker_ip is None

    def test_extra_fields_in_dict(self):
        event = parse_line(_make_line(extra_fields={"username": "root", "password": "admin"}))
        assert event.fields["username"] == "root"
        assert event.fields["password"] == "admin"

    def test_src_ip_priority_over_src(self):
        """src_ip should win when both are present."""
        line = format_rfc5424(
            "mssql", "decky-01", "evt", SEVERITY_INFO,
            timestamp=datetime.fromisoformat(_TS),
            src_ip="1.1.1.1",
            src="2.2.2.2",
        )
        event = parse_line(line)
        assert event.attacker_ip == "1.1.1.1"

    def test_sd_escape_chars_decoded(self):
        """Escaped characters in SD values should be unescaped."""
        line = format_rfc5424(
            "http", "decky-01", "evt", SEVERITY_INFO,
            timestamp=datetime.fromisoformat(_TS),
            src_ip="1.2.3.4",
            path='/search?q=a"b',
        )
        event = parse_line(line)
        assert '"' in event.fields["path"]

    # The two NILVALUE tests previously had their names swapped relative to
    # the positional arguments (service comes first, hostname second);
    # keyword arguments make the field under test explicit.
    def test_nilvalue_hostname_skipped(self):
        line = format_rfc5424(
            service="http", hostname="-", event_type="evt", severity=SEVERITY_INFO,
        )
        assert parse_line(line) is None

    def test_nilvalue_service_skipped(self):
        line = format_rfc5424(
            service="-", hostname="decky-01", event_type="evt", severity=SEVERITY_INFO,
        )
        assert parse_line(line) is None


# ---------------------------------------------------------------------------
# graph.py — AttackerTraversal
# ---------------------------------------------------------------------------

def _make_traversal(ip: str, hops_spec: list[tuple]) -> AttackerTraversal:
    """hops_spec: list of (ts_str, decky, service, event_type)"""
    hops = [
        TraversalHop(
            timestamp=datetime.fromisoformat(ts),
            decky=decky,
            service=svc,
            event_type=evt,
        )
        for ts, decky, svc, evt in hops_spec
    ]
    return AttackerTraversal(attacker_ip=ip, hops=hops)


class TestTraversalGraph:
    def setup_method(self):
        self.t = _make_traversal("5.6.7.8", [
            (_TS,  "decky-01", "ssh",  "login_attempt"),
            (_TS2, "decky-03", "http", "request"),
            (_TS3, "decky-05", "ftp",  "auth_attempt"),
        ])

    def test_first_seen(self):
        assert self.t.first_seen == datetime.fromisoformat(_TS)

    def test_last_seen(self):
        assert self.t.last_seen == datetime.fromisoformat(_TS3)

    def test_duration_seconds(self):
        assert self.t.duration_seconds == 600.0

    def test_deckies_ordered(self):
        assert self.t.deckies == ["decky-01", "decky-03", "decky-05"]

    def test_decky_count(self):
        assert self.t.decky_count == 3

    def test_path_string(self):
        assert self.t.path == "decky-01 → decky-03 → decky-05"

    def test_to_dict_keys(self):
        d = self.t.to_dict()
        assert d["attacker_ip"] == "5.6.7.8"
        assert d["decky_count"] == 3
        assert d["hop_count"] == 3
        assert len(d["hops"]) == 3
        assert d["path"] == "decky-01 → decky-03 → decky-05"

    def test_to_dict_hops_structure(self):
        hop = self.t.to_dict()["hops"][0]
        assert set(hop.keys()) == {"timestamp", "decky", "service", "event_type"}

    def test_repeated_decky_not_double_counted_in_path(self):
        t = _make_traversal("1.1.1.1", [
            (_TS,  "decky-01", "ssh", "conn"),
            (_TS2, "decky-02", "ftp", "conn"),
            (_TS3, "decky-01", "ssh", "conn"),  # revisit
        ])
        assert t.deckies == ["decky-01", "decky-02"]
        assert t.decky_count == 2


# ---------------------------------------------------------------------------
# engine.py — CorrelationEngine
# ---------------------------------------------------------------------------

class TestEngineIngestion:
    def test_ingest_returns_event(self):
        engine = CorrelationEngine()
        evt = engine.ingest(_make_line())
        assert evt is not None

    def test_ingest_blank_returns_none(self):
        engine = CorrelationEngine()
        assert engine.ingest("") is None

    def test_lines_parsed_counter(self):
        engine = CorrelationEngine()
        engine.ingest(_make_line())
        engine.ingest("garbage")
        assert engine.lines_parsed == 2

    def test_events_indexed_counter(self):
        engine = CorrelationEngine()
        engine.ingest(_make_line(src_ip="1.2.3.4"))
        engine.ingest(_make_line(src_ip=""))  # no IP
        assert engine.events_indexed == 1

    def test_ingest_file(self, tmp_path):
        log = tmp_path / "decnet.log"
        lines = [
            _make_line("ssh",  "decky-01", "conn", "10.0.0.1", _TS),
            _make_line("http", "decky-02", "req",  "10.0.0.1", _TS2),
            _make_line("ftp",  "decky-03", "auth", "10.0.0.1", _TS3),
        ]
        # Pin the encoding so the fixture does not depend on the locale.
        log.write_text("\n".join(lines), encoding="utf-8")
        engine = CorrelationEngine()
        count = engine.ingest_file(log)
        assert count == 3


class TestEngineTraversals:
    def _engine_with(self, specs: list[tuple]) -> CorrelationEngine:
        """specs: (service, decky, event_type, src_ip, timestamp)"""
        engine = CorrelationEngine()
        for svc, decky, evt, ip, ts in specs:
            engine.ingest(_make_line(svc, decky, evt, ip, ts))
        return engine

    def test_single_decky_not_a_traversal(self):
        engine = self._engine_with([
            ("ssh", "decky-01", "conn", "1.1.1.1", _TS),
            ("ssh", "decky-01", "conn", "1.1.1.1", _TS2),
        ])
        assert engine.traversals() == []

    def test_two_deckies_is_traversal(self):
        engine = self._engine_with([
            ("ssh",  "decky-01", "conn", "1.1.1.1", _TS),
            ("http", "decky-02", "req",  "1.1.1.1", _TS2),
        ])
        t = engine.traversals()
        assert len(t) == 1
        assert t[0].attacker_ip == "1.1.1.1"
        assert t[0].decky_count == 2

    def test_min_deckies_filter(self):
        engine = self._engine_with([
            ("ssh",  "decky-01", "conn", "1.1.1.1", _TS),
            ("http", "decky-02", "req",  "1.1.1.1", _TS2),
            ("ftp",  "decky-03", "auth", "1.1.1.1", _TS3),
        ])
        assert len(engine.traversals(min_deckies=3)) == 1
        assert len(engine.traversals(min_deckies=4)) == 0

    def test_multiple_attackers_separate_traversals(self):
        engine = self._engine_with([
            ("ssh",  "decky-01", "conn", "1.1.1.1", _TS),
            ("http", "decky-02", "req",  "1.1.1.1", _TS2),
            ("ssh",  "decky-03", "conn", "9.9.9.9", _TS),
            ("ftp",  "decky-04", "auth", "9.9.9.9", _TS2),
        ])
        traversals = engine.traversals()
        assert len(traversals) == 2
        ips = {t.attacker_ip for t in traversals}
        assert ips == {"1.1.1.1", "9.9.9.9"}

    def test_traversals_sorted_by_first_seen(self):
        engine = self._engine_with([
            ("ssh",  "decky-01", "conn", "9.9.9.9", _TS2),  # later
            ("ftp",  "decky-02", "auth", "9.9.9.9", _TS3),
            ("http", "decky-03", "req",  "1.1.1.1", _TS),   # earlier
            ("smb",  "decky-04", "auth", "1.1.1.1", _TS2),
        ])
        traversals = engine.traversals()
        assert traversals[0].attacker_ip == "1.1.1.1"
        assert traversals[1].attacker_ip == "9.9.9.9"

    def test_hops_ordered_chronologically(self):
        engine = self._engine_with([
            ("ftp", "decky-02", "auth", "5.5.5.5", _TS2),  # ingested first but later ts
            ("ssh", "decky-01", "conn", "5.5.5.5", _TS),
        ])
        t = engine.traversals()[0]
        assert t.hops[0].decky == "decky-01"
        assert t.hops[1].decky == "decky-02"

    def test_all_attackers(self):
        engine = self._engine_with([
            ("ssh", "decky-01", "conn", "1.1.1.1", _TS),
            ("ssh", "decky-01", "conn", "1.1.1.1", _TS2),
            ("ssh", "decky-01", "conn", "2.2.2.2", _TS),
        ])
        attackers = engine.all_attackers()
        assert attackers["1.1.1.1"] == 2
        assert attackers["2.2.2.2"] == 1

    def test_mssql_src_field_correlated(self):
        """Verify that `src=` (mssql style) is picked up for cross-decky correlation."""
        engine = CorrelationEngine()
        engine.ingest(_make_line_src("decky-win1", "10.10.10.5", _TS))
        engine.ingest(_make_line_src("decky-win2", "10.10.10.5", _TS2))
        t = engine.traversals()
        assert len(t) == 1
        assert t[0].decky_count == 2


class TestEngineReporting:
    def _two_decky_engine(self) -> CorrelationEngine:
        engine = CorrelationEngine()
        engine.ingest(_make_line("ssh",  "decky-01", "conn", "3.3.3.3", _TS))
        engine.ingest(_make_line("http", "decky-02", "req",  "3.3.3.3", _TS2))
        return engine

    def test_report_json_structure(self):
        engine = self._two_decky_engine()
        report = engine.report_json()
        assert "stats" in report
        assert "traversals" in report
        assert report["stats"]["traversals"] == 1
        t = report["traversals"][0]
        assert t["attacker_ip"] == "3.3.3.3"
        assert t["decky_count"] == 2

    def test_report_json_serialisable(self):
        engine = self._two_decky_engine()
        # Should not raise
        json.dumps(engine.report_json())

    def test_report_table_returns_rich_table(self):
        from rich.table import Table
        engine = self._two_decky_engine()
        table = engine.report_table()
        assert isinstance(table, Table)

    def test_traversal_syslog_lines_count(self):
        engine = self._two_decky_engine()
        lines = engine.traversal_syslog_lines()
        assert len(lines) == 1

    def test_traversal_syslog_line_is_rfc5424(self):
        engine = self._two_decky_engine()
        line = engine.traversal_syslog_lines()[0]
        # Must match RFC 5424 header
        assert re.match(r"^<\d+>1 \S+ \S+ correlator - traversal_detected", line)

    def test_traversal_syslog_contains_attacker_ip(self):
        engine = self._two_decky_engine()
        line = engine.traversal_syslog_lines()[0]
        assert "3.3.3.3" in line

    def test_traversal_syslog_severity_is_warning(self):
        engine = self._two_decky_engine()
        line = engine.traversal_syslog_lines()[0]
        pri = int(re.match(r"^<(\d+)>", line).group(1))
        assert pri == 16 * 8 + SEVERITY_WARNING  # local0 + warning

    def test_no_traversals_empty_json(self):
        engine = CorrelationEngine()
        engine.ingest(_make_line())  # single decky, no traversal
        assert engine.report_json()["stats"]["traversals"] == 0
        assert engine.traversal_syslog_lines() == []


# ---------------------------------------------------------------------------
# _fmt_duration helper
# ---------------------------------------------------------------------------

class TestFmtDuration:
    def test_seconds(self):
        assert _fmt_duration(45) == "45s"

    def test_minutes(self):
        assert _fmt_duration(90) == "1.5m"

    def test_hours(self):
        assert _fmt_duration(7200) == "2.0h"