fix(correlation): prober events no longer count as attacker traversal

The prober writes events with hostname=decnet-prober and target_ip=
<the attacker being fingerprinted>. The parser pulls target_ip into
attacker_ip (it's one of _IP_FIELDS), which is correct for indexing
fingerprints under the attacker — but it had a side effect: every
fingerprinted attacker had two distinct deckies on file (the real
decoy they touched + decnet-prober) and the correlation engine's
traversals() classified that as lateral movement. Live dashboard
showed bogus "dmz-gateway -> decnet-prober" paths and TRAVERSAL
badges on attackers who'd done nothing but knock on the front door.

The prober is internal infrastructure, not a hop. Filter the
"decnet-" namespace out of distinct-decky counts and hop paths in
the engine. Fingerprints stay attached to the attacker profile via
the existing per-IP event index — just no longer as traversal.
This commit is contained in:
2026-04-27 23:02:23 -04:00
parent e03a6d10a0
commit 3c571cce5a
2 changed files with 59 additions and 2 deletions

View File

@@ -285,6 +285,38 @@ class TestEngineTraversals:
assert t[0].attacker_ip == "1.1.1.1"
assert t[0].decky_count == 2
def test_prober_event_does_not_count_as_traversal(self):
"""Hit live on first VPS deploy: every fingerprinted attacker
showed up as a 2-decky traversal because the prober's outbound
fingerprint events (decky=decnet-prober, target_ip=<attacker>)
got co-indexed with the attacker's actual decoy hops. The
prober is internal infrastructure, not a hop — its events
must not bump the distinct-decky count."""
engine = self._engine_with([
("ssh", "dmz-gateway", "conn", "1.1.1.1", _TS),
("ssh", "decnet-prober", "hassh_fingerprint", "1.1.1.1", _TS2),
])
# Only one *real* decky touched — no traversal.
assert engine.traversals() == []
def test_prober_excluded_from_traversal_path(self):
"""When a real traversal exists, the prober's hops must not
appear in the path or inflate the decky count."""
engine = self._engine_with([
("ssh", "dmz-gateway", "conn", "1.1.1.1", _TS),
("ssh", "decnet-prober", "hassh_fingerprint", "1.1.1.1", _TS2),
("http", "decky-internal", "req", "1.1.1.1", _TS3),
])
traversals = engine.traversals()
assert len(traversals) == 1
t = traversals[0]
assert t.decky_count == 2, (
f"prober should not inflate decky_count; got {t.decky_count}"
)
assert "decnet-prober" not in t.path, (
f"prober should not appear in traversal path; got {t.path!r}"
)
def test_min_deckies_filter(self):
engine = self._engine_with([
("ssh", "decky-01", "conn", "1.1.1.1", _TS),