From 9350ce195a0f1777d41cf1abfa4594989b8964cc Mon Sep 17 00:00:00 2001 From: anti Date: Mon, 27 Apr 2026 23:16:42 -0400 Subject: [PATCH] fix(collector,correlation): extract attacker IP from sshd/pam free-form prose Native sshd and pam_unix lines route through rsyslog without the relay@55555 SD wrapper and without key=value pairs, so attacker_ip fell through to "Unknown". Add a prose-IP fallback to both parsers: anchored patterns (from/rhost/client/src) win first so we never pick the local listener in "Connection from X port Y on Z port 22", with a bare-IPv4 scan as the last resort. --- decnet/collector/worker.py | 29 ++++++++++++++++++++++ decnet/correlation/parser.py | 30 +++++++++++++++++++++-- tests/collector/test_collector.py | 35 ++++++++++++++++++++++++++- tests/correlation/test_correlation.py | 26 ++++++++++++++++++++ 4 files changed, 117 insertions(+), 3 deletions(-) diff --git a/decnet/collector/worker.py b/decnet/collector/worker.py index 5ceb5e87..da548d29 100644 --- a/decnet/collector/worker.py +++ b/decnet/collector/worker.py @@ -140,6 +140,22 @@ _IP_FIELDS = ("src_ip", "src", "client_ip", "remote_ip", "remote_addr", "target_ # as one unit; we only care about IP-shaped fields here anyway. _MSG_KV_RE = re.compile(r'(\w+)=(\S+)') +# Native sshd / pam syslog lines arrive without an SD block and without +# key=value pairs. The remote address shows up as free prose: +# "Failed password for root from 1.2.3.4 port 42772 ssh2" +# "Connection from 1.2.3.4 port 42772 on 10.0.0.2 port 22" +# "pam_unix(sshd:auth): authentication failure; … rhost=1.2.3.4 user=root" +# Anchored patterns first so we never confuse the attacker with the +# local listener IP ("on 10.0.0.2"). Bare IP scan is the last resort. +_IPV4 = r"\d{1,3}(?:\.\d{1,3}){3}" +_IPV6 = r"[0-9a-fA-F:]+:[0-9a-fA-F:]+" +_IP = rf"(?:{_IPV4}|{_IPV6})" +_MSG_IP_ANCHORED_RE = re.compile( + rf"\b(?:from|rhost[:=]|client[:=]|src[:=])\s*({_IP})", + re.IGNORECASE, +) +_MSG_IP_BARE_RE = re.compile(rf"\b({_IPV4})\b") + def parse_rfc5424(line: str) -> Optional[dict[str, Any]]: """ @@ -186,6 +202,19 @@ def parse_rfc5424(line: str) -> Optional[dict[str, Any]]: attacker_ip = v break + # Final fallback for native syslog producers that emit free-form prose + # (notably sshd and pam_unix routed via rsyslog without the relay@55555 + # SD wrapper). Prefer anchored matches so the local listener address in + # "Connection from X port Y on Z port 22" never wins over X. + if attacker_ip == "Unknown" and msg: + anchored = _MSG_IP_ANCHORED_RE.search(msg) + if anchored: + attacker_ip = anchored.group(1) + else: + bare = _MSG_IP_BARE_RE.search(msg) + if bare: + attacker_ip = bare.group(1) + try: ts_formatted = datetime.fromisoformat(ts_raw).strftime("%Y-%m-%d %H:%M:%S") except ValueError: diff --git a/decnet/correlation/parser.py b/decnet/correlation/parser.py index cbf8195d..9740d490 100644 --- a/decnet/correlation/parser.py +++ b/decnet/correlation/parser.py @@ -41,6 +41,20 @@ _PARAM_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"') # Field names to probe for attacker IP, in priority order _IP_FIELDS = ("src_ip", "src", "client_ip", "remote_ip", "remote_addr", "target_ip", "ip") +# Native syslog producers (sshd, pam_unix routed through rsyslog) emit +# free prose with no SD block. Pull the remote address out of idiomatic +# anchors first ("from ", "rhost="), then fall back to the first +# IPv4 in the line. Anchored matches keep us from picking the local +# listener in "Connection from X port Y on Z port 22". +_IPV4 = r"\d{1,3}(?:\.\d{1,3}){3}" +_IPV6 = r"[0-9a-fA-F:]+:[0-9a-fA-F:]+" +_IP_RE = rf"(?:{_IPV4}|{_IPV6})" +_MSG_IP_ANCHORED_RE = re.compile( + rf"\b(?:from|rhost[:=]|client[:=]|src[:=])\s*({_IP_RE})", + re.IGNORECASE, +) +_MSG_IP_BARE_RE = re.compile(rf"\b({_IPV4})\b") + EventKind = Literal["attacker", "mutation"] @@ -76,10 +90,17 @@ def _parse_sd_params(sd_rest: str) -> dict[str, str]: return params -def _extract_attacker_ip(fields: dict[str, str]) -> str | None: +def _extract_attacker_ip(fields: dict[str, str], msg: str = "") -> str | None: for fname in _IP_FIELDS: if fname in fields: return fields[fname] + if msg: + anchored = _MSG_IP_ANCHORED_RE.search(msg) + if anchored: + return anchored.group(1) + bare = _MSG_IP_BARE_RE.search(msg) + if bare: + return bare.group(1) return None @@ -109,7 +130,12 @@ def parse_line(line: str) -> LogEvent | None: return None fields = _parse_sd_params(sd_rest) - attacker_ip = _extract_attacker_ip(fields) + if sd_rest.startswith("-"): + msg = sd_rest[1:].lstrip() + else: + tail = re.search(r'\]\s+(.+)$', sd_rest) + msg = tail.group(1).strip() if tail else "" + attacker_ip = _extract_attacker_ip(fields, msg) # Mutator-emitted transitions arrive on the same ingest stream but # belong in the substrate-state index, not the per-IP attacker one. diff --git a/tests/collector/test_collector.py b/tests/collector/test_collector.py index 2fb6f91b..a1ad8fcd 100644 --- a/tests/collector/test_collector.py +++ b/tests/collector/test_collector.py @@ -93,7 +93,40 @@ class TestParseRfc5424: assert result["decky"] == "omega-decky" assert result["service"] == "sshd" assert "Accepted password" in result["msg"] - assert result["attacker_ip"] == "Unknown" # no key=value in this msg + # Native sshd lines have no key=value; the prose fallback pulls + # the IP out of "from ". + assert result["attacker_ip"] == "192.168.1.5" + + def test_extracts_attacker_ip_from_sshd_prose(self): + """sshd routed via rsyslog emits free prose with no SD block and no + key=value pairs. The parser must still find the remote IP.""" + cases = [ + ( + "<38>1 2026-04-27T03:08:48+00:00 dmz-gateway sshd 940 - - " + "Failed password for root from 157.66.144.16 port 42772 ssh2", + "157.66.144.16", + ), + ( + "<38>1 2026-04-27T03:08:45+00:00 dmz-gateway sshd 940 - - " + "Connection from 157.66.144.16 port 42772 on 10.0.0.2 port 22 rdomain \"\"", + "157.66.144.16", # must beat the local listener 10.0.0.2 + ), + ( + "<38>1 2026-04-27T03:08:49+00:00 dmz-gateway sshd 940 - - " + "Connection closed by authenticating user root 157.66.144.16 port 42772 [preauth]", + "157.66.144.16", + ), + ( + "<38>1 2026-04-27T03:08:46+00:00 dmz-gateway sshd 940 - - " + "pam_unix(sshd:auth): authentication failure; " + "logname= uid=0 euid=0 tty=ssh ruser= rhost=157.66.144.16 user=root", + "157.66.144.16", + ), + ] + for line, expected in cases: + result = parse_rfc5424(line) + assert result is not None, line + assert result["attacker_ip"] == expected, (line, result["attacker_ip"]) def test_extracts_attacker_ip_from_msg_body_kv(self): """SSH container's bash PROMPT_COMMAND uses `logger -t bash "CMD ... src=IP ..."` diff --git a/tests/correlation/test_correlation.py b/tests/correlation/test_correlation.py index b1772201..5601c6f8 100644 --- a/tests/correlation/test_correlation.py +++ b/tests/correlation/test_correlation.py @@ -154,6 +154,32 @@ class TestParserAttackerIP: line = format_rfc5424("http", "-", "evt", SEVERITY_INFO) assert parse_line(line) is None + def test_attacker_ip_from_sshd_prose(self): + """sshd routed via rsyslog has no SD block — IP lives in free prose. + Anchored "from " must beat the local listener in + "Connection from X port Y on Z port 22".""" + cases = [ + ( + "<38>1 2026-04-27T03:08:48+00:00 dmz-gateway sshd - - - " + "Failed password for root from 157.66.144.16 port 42772 ssh2", + "157.66.144.16", + ), + ( + "<38>1 2026-04-27T03:08:45+00:00 dmz-gateway sshd - - - " + "Connection from 157.66.144.16 port 42772 on 10.0.0.2 port 22", + "157.66.144.16", + ), + ( + "<38>1 2026-04-27T03:08:46+00:00 dmz-gateway sshd - - - " + "pam_unix(sshd:auth): authentication failure; rhost=157.66.144.16 user=root", + "157.66.144.16", + ), + ] + for line, expected in cases: + event = parse_line(line) + assert event is not None, line + assert event.attacker_ip == expected, (line, event.attacker_ip) + # --------------------------------------------------------------------------- # graph.py — AttackerTraversal