fix(collector,correlation): extract attacker IP from sshd/pam free-form prose

Native sshd and pam_unix lines route through rsyslog without the
relay@55555 SD wrapper and without key=value pairs, so attacker_ip
fell through to "Unknown". Add a prose-IP fallback to both parsers:
anchored patterns (from/rhost/client/src) win first so we never pick
the local listener in "Connection from X port Y on Z port 22", with
a bare-IPv4 scan as the last resort.
This commit is contained in:
2026-04-27 23:16:42 -04:00
parent 3c571cce5a
commit 9350ce195a
4 changed files with 117 additions and 3 deletions

View File

@@ -140,6 +140,22 @@ _IP_FIELDS = ("src_ip", "src", "client_ip", "remote_ip", "remote_addr", "target_
# as one unit; we only care about IP-shaped fields here anyway.
_MSG_KV_RE = re.compile(r'(\w+)=(\S+)')
# Native sshd / pam syslog lines arrive without an SD block and without
# key=value pairs. The remote address shows up as free prose:
#   "Failed password for root from 1.2.3.4 port 42772 ssh2"
#   "Connection from 1.2.3.4 port 42772 on 10.0.0.2 port 22"
#   "pam_unix(sshd:auth): authentication failure; … rhost=1.2.3.4 user=root"
# Anchored patterns first so we never confuse the attacker with the
# local listener IP ("on 10.0.0.2"). Bare IP scan is the last resort.
#
# Dotted quad. Octets are not range-checked, so "999.1.1.1" also matches.
_IPV4 = r"\d{1,3}(?:\.\d{1,3}){3}"
# Loose IPv6: a run of hex digits / colons containing at least one colon.
# The part after the last colon may be a dotted quad so that IPv4-mapped
# addresses ("::ffff:1.2.3.4") are captured whole; a hex-only tail would
# stop at the first dot and yield "::ffff:1".
_IPV6 = rf"[0-9a-fA-F:]+:(?:{_IPV4}|[0-9a-fA-F:]+)"
_IP = rf"(?:{_IPV4}|{_IPV6})"
# Address directly following "from", or "rhost" / "client" / "src" plus
# ":" or "=" (case-insensitive). Group 1 is the address.
_MSG_IP_ANCHORED_RE = re.compile(
    rf"\b(?:from|rhost[:=]|client[:=]|src[:=])\s*({_IP})",
    re.IGNORECASE,
)
# First word-bounded IPv4 anywhere in the text. Group 1 is the address.
_MSG_IP_BARE_RE = re.compile(rf"\b({_IPV4})\b")
def parse_rfc5424(line: str) -> Optional[dict[str, Any]]:
"""
@@ -186,6 +202,19 @@ def parse_rfc5424(line: str) -> Optional[dict[str, Any]]:
attacker_ip = v
break
# Final fallback for native syslog producers that emit free-form prose
# (notably sshd and pam_unix routed via rsyslog without the relay@55555
# SD wrapper). Prefer anchored matches so the local listener address in
# "Connection from X port Y on Z port 22" never wins over X.
if attacker_ip == "Unknown" and msg:
anchored = _MSG_IP_ANCHORED_RE.search(msg)
if anchored:
attacker_ip = anchored.group(1)
else:
bare = _MSG_IP_BARE_RE.search(msg)
if bare:
attacker_ip = bare.group(1)
try:
ts_formatted = datetime.fromisoformat(ts_raw).strftime("%Y-%m-%d %H:%M:%S")
except ValueError:

View File

@@ -41,6 +41,20 @@ _PARAM_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
# Field names to probe for attacker IP, in priority order
_IP_FIELDS = ("src_ip", "src", "client_ip", "remote_ip", "remote_addr", "target_ip", "ip")
# Native syslog producers (sshd, pam_unix routed through rsyslog) emit
# free prose with no SD block. Pull the remote address out of idiomatic
# anchors first ("from <ip>", "rhost=<ip>"), then fall back to the first
# IPv4 in the line. Anchored matches keep us from picking the local
# listener in "Connection from X port Y on Z port 22".
#
# Dotted quad. Octets are not range-checked, so "999.1.1.1" also matches.
_IPV4 = r"\d{1,3}(?:\.\d{1,3}){3}"
# Loose IPv6: a run of hex digits / colons containing at least one colon.
# The part after the last colon may be a dotted quad so that IPv4-mapped
# addresses ("::ffff:1.2.3.4") are captured whole; a hex-only tail would
# stop at the first dot and yield "::ffff:1".
_IPV6 = rf"[0-9a-fA-F:]+:(?:{_IPV4}|[0-9a-fA-F:]+)"
_IP_RE = rf"(?:{_IPV4}|{_IPV6})"
# Address directly following "from", or "rhost" / "client" / "src" plus
# ":" or "=" (case-insensitive). Group 1 is the address.
_MSG_IP_ANCHORED_RE = re.compile(
    rf"\b(?:from|rhost[:=]|client[:=]|src[:=])\s*({_IP_RE})",
    re.IGNORECASE,
)
# First word-bounded IPv4 anywhere in the text. Group 1 is the address.
_MSG_IP_BARE_RE = re.compile(rf"\b({_IPV4})\b")
EventKind = Literal["attacker", "mutation"]
@@ -76,10 +90,17 @@ def _parse_sd_params(sd_rest: str) -> dict[str, str]:
return params
def _extract_attacker_ip(fields: dict[str, str]) -> str | None:
def _extract_attacker_ip(fields: dict[str, str], msg: str = "") -> str | None:
    """Return the attacker address for one parsed log line, or None.

    Lookup order:
      1. the first name from ``_IP_FIELDS`` that is present in *fields*
         (its value is returned as-is, even when empty);
      2. an anchored match in *msg* ("from <ip>", "rhost=<ip>", ...);
      3. the first bare IPv4 found in *msg*.

    Args:
        fields: structured-data parameters parsed from the line.
        msg: free-form message text; an empty string disables steps 2-3.
    """
    first_key = next((name for name in _IP_FIELDS if name in fields), None)
    if first_key is not None:
        return fields[first_key]

    if not msg:
        return None

    # Anchored hit wins; the bare scan only runs when no anchor matched.
    hit = _MSG_IP_ANCHORED_RE.search(msg) or _MSG_IP_BARE_RE.search(msg)
    return hit.group(1) if hit else None
@@ -109,7 +130,12 @@ def parse_line(line: str) -> LogEvent | None:
return None
fields = _parse_sd_params(sd_rest)
attacker_ip = _extract_attacker_ip(fields)
if sd_rest.startswith("-"):
msg = sd_rest[1:].lstrip()
else:
tail = re.search(r'\]\s+(.+)$', sd_rest)
msg = tail.group(1).strip() if tail else ""
attacker_ip = _extract_attacker_ip(fields, msg)
# Mutator-emitted transitions arrive on the same ingest stream but
# belong in the substrate-state index, not the per-IP attacker one.

View File

@@ -93,7 +93,40 @@ class TestParseRfc5424:
assert result["decky"] == "omega-decky"
assert result["service"] == "sshd"
assert "Accepted password" in result["msg"]
assert result["attacker_ip"] == "Unknown" # no key=value in this msg
# Native sshd lines have no key=value; the prose fallback pulls
# the IP out of "from <ip>".
assert result["attacker_ip"] == "192.168.1.5"
def test_extracts_attacker_ip_from_sshd_prose(self):
    """Free-prose sshd / pam lines (no SD block, no key=value pairs relayed
    by rsyslog) must still yield the remote address as attacker_ip."""
    remote = "157.66.144.16"
    prose_samples = [
        # "from <ip>" anchor
        (
            "<38>1 2026-04-27T03:08:48+00:00 dmz-gateway sshd 940 - - "
            "Failed password for root from 157.66.144.16 port 42772 ssh2",
            remote,
        ),
        # remote address must beat the local listener 10.0.0.2
        (
            "<38>1 2026-04-27T03:08:45+00:00 dmz-gateway sshd 940 - - "
            "Connection from 157.66.144.16 port 42772 on 10.0.0.2 port 22 rdomain \"\"",
            remote,
        ),
        # no anchor word before the address
        (
            "<38>1 2026-04-27T03:08:49+00:00 dmz-gateway sshd 940 - - "
            "Connection closed by authenticating user root 157.66.144.16 port 42772 [preauth]",
            remote,
        ),
        # pam_unix "rhost=" form
        (
            "<38>1 2026-04-27T03:08:46+00:00 dmz-gateway sshd 940 - - "
            "pam_unix(sshd:auth): authentication failure; "
            "logname= uid=0 euid=0 tty=ssh ruser= rhost=157.66.144.16 user=root",
            remote,
        ),
    ]
    for raw_line, want_ip in prose_samples:
        parsed = parse_rfc5424(raw_line)
        assert parsed is not None, raw_line
        got_ip = parsed["attacker_ip"]
        assert got_ip == want_ip, (raw_line, got_ip)
def test_extracts_attacker_ip_from_msg_body_kv(self):
"""SSH container's bash PROMPT_COMMAND uses `logger -t bash "CMD ... src=IP ..."`

View File

@@ -154,6 +154,32 @@ class TestParserAttackerIP:
line = format_rfc5424("http", "-", "evt", SEVERITY_INFO)
assert parse_line(line) is None
def test_attacker_ip_from_sshd_prose(self):
    """Lines with no SD block carry the remote address only in free prose.

    The anchored "from <ip>" match has to win over the local listener
    address in "Connection from X port Y on Z port 22".
    """
    remote = "157.66.144.16"
    prose_samples = [
        # "from <ip>" anchor
        (
            "<38>1 2026-04-27T03:08:48+00:00 dmz-gateway sshd - - - "
            "Failed password for root from 157.66.144.16 port 42772 ssh2",
            remote,
        ),
        # listener address 10.0.0.2 must not be picked
        (
            "<38>1 2026-04-27T03:08:45+00:00 dmz-gateway sshd - - - "
            "Connection from 157.66.144.16 port 42772 on 10.0.0.2 port 22",
            remote,
        ),
        # pam_unix "rhost=" form
        (
            "<38>1 2026-04-27T03:08:46+00:00 dmz-gateway sshd - - - "
            "pam_unix(sshd:auth): authentication failure; rhost=157.66.144.16 user=root",
            remote,
        ),
    ]
    for raw_line, want_ip in prose_samples:
        parsed_event = parse_line(raw_line)
        assert parsed_event is not None, raw_line
        got_ip = parsed_event.attacker_ip
        assert got_ip == want_ip, (raw_line, got_ip)
# ---------------------------------------------------------------------------
# graph.py — AttackerTraversal