Files
DECNET/tests/test_prober_worker.py
anti 8dd4c78b33 refactor: strip DECNET tokens from container-visible surface
Rename the container-side logging module decnet_logging → syslog_bridge
(canonical at templates/syslog_bridge.py, synced into each template by
the deployer). Drop the stale per-template copies; setuptools find was
picking them up anyway. Swap useradd/USER/chown "decnet" for "logrelay"
so no obvious token appears in the rendered container image.

Apply the same cloaking pattern to the telnet template that SSH got:
syslog pipe moves to /run/systemd/journal/syslog-relay and the relay
is cat'd via exec -a "systemd-journal-fwd". rsyslog.d conf rename
99-decnet.conf → 50-journal-forward.conf. SSH capture script:
/var/decnet/captured → /var/lib/systemd/coredump (real systemd path),
logger tag decnet-capture → systemd-journal. Compose volume updated
to match the new in-container quarantine path.

SD element ID shifts decnet@55555 → relay@55555; synced across
collector, parser, sniffer, prober, formatter, tests, and docs so the
host-side pipeline still matches what containers emit.
2026-04-17 22:57:53 -04:00

481 lines
20 KiB
Python

"""
Tests for the prober worker — target discovery from the log stream and
probe cycle behavior (JARM, HASSH, TCP/IP fingerprinting).
"""
from __future__ import annotations
import json
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from decnet.prober.jarm import JARM_EMPTY_HASH
from decnet.prober.worker import (
DEFAULT_PROBE_PORTS,
DEFAULT_SSH_PORTS,
DEFAULT_TCPFP_PORTS,
_discover_attackers,
_probe_cycle,
_write_event,
)
# ─── _discover_attackers ─────────────────────────────────────────────────────
class TestDiscoverAttackers:
def test_discovers_unique_ips(self, tmp_path: Path):
json_file = tmp_path / "decnet.json"
records = [
{"service": "sniffer", "event_type": "tls_client_hello", "attacker_ip": "10.0.0.1", "fields": {}},
{"service": "ssh", "event_type": "login_attempt", "attacker_ip": "10.0.0.2", "fields": {}},
{"service": "sniffer", "event_type": "tls_client_hello", "attacker_ip": "10.0.0.1", "fields": {}}, # dup
]
json_file.write_text("\n".join(json.dumps(r) for r in records) + "\n")
ips, pos = _discover_attackers(json_file, 0)
assert ips == {"10.0.0.1", "10.0.0.2"}
assert pos > 0
def test_skips_prober_events(self, tmp_path: Path):
json_file = tmp_path / "decnet.json"
records = [
{"service": "prober", "event_type": "jarm_fingerprint", "attacker_ip": "10.0.0.99", "fields": {}},
{"service": "ssh", "event_type": "login_attempt", "attacker_ip": "10.0.0.1", "fields": {}},
]
json_file.write_text("\n".join(json.dumps(r) for r in records) + "\n")
ips, _ = _discover_attackers(json_file, 0)
assert "10.0.0.99" not in ips
assert "10.0.0.1" in ips
def test_skips_unknown_ips(self, tmp_path: Path):
json_file = tmp_path / "decnet.json"
records = [
{"service": "sniffer", "event_type": "startup", "attacker_ip": "Unknown", "fields": {}},
]
json_file.write_text("\n".join(json.dumps(r) for r in records) + "\n")
ips, _ = _discover_attackers(json_file, 0)
assert len(ips) == 0
def test_handles_missing_file(self, tmp_path: Path):
json_file = tmp_path / "nonexistent.json"
ips, pos = _discover_attackers(json_file, 0)
assert len(ips) == 0
assert pos == 0
def test_resumes_from_position(self, tmp_path: Path):
json_file = tmp_path / "decnet.json"
line1 = json.dumps({"service": "ssh", "attacker_ip": "10.0.0.1", "fields": {}}) + "\n"
json_file.write_text(line1)
_, pos1 = _discover_attackers(json_file, 0)
# Append more
with open(json_file, "a") as f:
f.write(json.dumps({"service": "ssh", "attacker_ip": "10.0.0.2", "fields": {}}) + "\n")
ips, pos2 = _discover_attackers(json_file, pos1)
assert ips == {"10.0.0.2"} # only the new one
assert pos2 > pos1
def test_handles_file_rotation(self, tmp_path: Path):
json_file = tmp_path / "decnet.json"
# Write enough data to push position well ahead
lines = [json.dumps({"service": "ssh", "attacker_ip": f"10.0.0.{i}", "fields": {}}) + "\n" for i in range(10)]
json_file.write_text("".join(lines))
_, pos = _discover_attackers(json_file, 0)
assert pos > 0
# Simulate rotation — new file is smaller than the old position
json_file.write_text(json.dumps({"service": "ssh", "attacker_ip": "10.0.0.99", "fields": {}}) + "\n")
assert json_file.stat().st_size < pos
ips, new_pos = _discover_attackers(json_file, pos)
assert "10.0.0.99" in ips
def test_handles_malformed_json(self, tmp_path: Path):
json_file = tmp_path / "decnet.json"
json_file.write_text("not valid json\n" + json.dumps({"service": "ssh", "attacker_ip": "10.0.0.1", "fields": {}}) + "\n")
ips, _ = _discover_attackers(json_file, 0)
assert "10.0.0.1" in ips
# ─── _probe_cycle: JARM phase ──────────────────────────────────────────────
class TestProbeCycleJARM:
@patch("decnet.prober.worker.tcp_fingerprint")
@patch("decnet.prober.worker.hassh_server")
@patch("decnet.prober.worker.jarm_hash")
def test_probes_new_ips(self, mock_jarm: MagicMock, mock_hassh: MagicMock,
mock_tcpfp: MagicMock, tmp_path: Path):
mock_jarm.return_value = "c0c" * 10 + "a" * 32 # fake 62-char hash
mock_hassh.return_value = None
mock_tcpfp.return_value = None
log_path = tmp_path / "decnet.log"
json_path = tmp_path / "decnet.json"
targets = {"10.0.0.1"}
probed: dict[str, dict[str, set[int]]] = {}
_probe_cycle(targets, probed, [443, 8443], [], [], log_path, json_path, timeout=1.0)
assert mock_jarm.call_count == 2 # two ports
assert 443 in probed["10.0.0.1"]["jarm"]
assert 8443 in probed["10.0.0.1"]["jarm"]
@patch("decnet.prober.worker.tcp_fingerprint")
@patch("decnet.prober.worker.hassh_server")
@patch("decnet.prober.worker.jarm_hash")
def test_skips_already_probed_ports(self, mock_jarm: MagicMock, mock_hassh: MagicMock,
mock_tcpfp: MagicMock, tmp_path: Path):
mock_jarm.return_value = "c0c" * 10 + "a" * 32
mock_hassh.return_value = None
mock_tcpfp.return_value = None
log_path = tmp_path / "decnet.log"
json_path = tmp_path / "decnet.json"
targets = {"10.0.0.1"}
probed: dict[str, dict[str, set[int]]] = {"10.0.0.1": {"jarm": {443}}}
_probe_cycle(targets, probed, [443, 8443], [], [], log_path, json_path, timeout=1.0)
# Should only probe 8443 (443 already done)
assert mock_jarm.call_count == 1
mock_jarm.assert_called_once_with("10.0.0.1", 8443, timeout=1.0)
@patch("decnet.prober.worker.tcp_fingerprint")
@patch("decnet.prober.worker.hassh_server")
@patch("decnet.prober.worker.jarm_hash")
def test_empty_hash_not_logged(self, mock_jarm: MagicMock, mock_hassh: MagicMock,
mock_tcpfp: MagicMock, tmp_path: Path):
mock_jarm.return_value = JARM_EMPTY_HASH
mock_hassh.return_value = None
mock_tcpfp.return_value = None
log_path = tmp_path / "decnet.log"
json_path = tmp_path / "decnet.json"
targets = {"10.0.0.1"}
probed: dict[str, dict[str, set[int]]] = {}
_probe_cycle(targets, probed, [443], [], [], log_path, json_path, timeout=1.0)
assert 443 in probed["10.0.0.1"]["jarm"]
if json_path.exists():
content = json_path.read_text()
assert "jarm_fingerprint" not in content
@patch("decnet.prober.worker.tcp_fingerprint")
@patch("decnet.prober.worker.hassh_server")
@patch("decnet.prober.worker.jarm_hash")
def test_exception_marks_port_probed(self, mock_jarm: MagicMock, mock_hassh: MagicMock,
mock_tcpfp: MagicMock, tmp_path: Path):
mock_jarm.side_effect = OSError("Connection refused")
mock_hassh.return_value = None
mock_tcpfp.return_value = None
log_path = tmp_path / "decnet.log"
json_path = tmp_path / "decnet.json"
targets = {"10.0.0.1"}
probed: dict[str, dict[str, set[int]]] = {}
_probe_cycle(targets, probed, [443], [], [], log_path, json_path, timeout=1.0)
assert 443 in probed["10.0.0.1"]["jarm"]
@patch("decnet.prober.worker.tcp_fingerprint")
@patch("decnet.prober.worker.hassh_server")
@patch("decnet.prober.worker.jarm_hash")
def test_skips_ip_with_all_ports_done(self, mock_jarm: MagicMock, mock_hassh: MagicMock,
mock_tcpfp: MagicMock, tmp_path: Path):
log_path = tmp_path / "decnet.log"
json_path = tmp_path / "decnet.json"
targets = {"10.0.0.1"}
probed: dict[str, dict[str, set[int]]] = {
"10.0.0.1": {"jarm": {443, 8443}, "hassh": set(), "tcpfp": set()},
}
_probe_cycle(targets, probed, [443, 8443], [], [], log_path, json_path, timeout=1.0)
assert mock_jarm.call_count == 0
# ─── _probe_cycle: HASSH phase ─────────────────────────────────────────────
class TestProbeCycleHASSH:
@patch("decnet.prober.worker.tcp_fingerprint")
@patch("decnet.prober.worker.hassh_server")
@patch("decnet.prober.worker.jarm_hash")
def test_probes_ssh_ports(self, mock_jarm: MagicMock, mock_hassh: MagicMock,
mock_tcpfp: MagicMock, tmp_path: Path):
mock_jarm.return_value = JARM_EMPTY_HASH
mock_hassh.return_value = {
"hassh_server": "a" * 32,
"banner": "SSH-2.0-OpenSSH_8.9p1",
"kex_algorithms": "curve25519-sha256",
"encryption_s2c": "aes256-gcm@openssh.com",
"mac_s2c": "hmac-sha2-256-etm@openssh.com",
"compression_s2c": "none",
}
mock_tcpfp.return_value = None
log_path = tmp_path / "decnet.log"
json_path = tmp_path / "decnet.json"
targets = {"10.0.0.1"}
probed: dict[str, dict[str, set[int]]] = {}
_probe_cycle(targets, probed, [], [22, 2222], [], log_path, json_path, timeout=1.0)
assert mock_hassh.call_count == 2
assert 22 in probed["10.0.0.1"]["hassh"]
assert 2222 in probed["10.0.0.1"]["hassh"]
@patch("decnet.prober.worker.tcp_fingerprint")
@patch("decnet.prober.worker.hassh_server")
@patch("decnet.prober.worker.jarm_hash")
def test_hassh_writes_event(self, mock_jarm: MagicMock, mock_hassh: MagicMock,
mock_tcpfp: MagicMock, tmp_path: Path):
mock_jarm.return_value = JARM_EMPTY_HASH
mock_hassh.return_value = {
"hassh_server": "b" * 32,
"banner": "SSH-2.0-Paramiko_3.0",
"kex_algorithms": "diffie-hellman-group14-sha1",
"encryption_s2c": "aes128-cbc",
"mac_s2c": "hmac-sha1",
"compression_s2c": "none",
}
mock_tcpfp.return_value = None
log_path = tmp_path / "decnet.log"
json_path = tmp_path / "decnet.json"
targets = {"10.0.0.1"}
probed: dict[str, dict[str, set[int]]] = {}
_probe_cycle(targets, probed, [], [22], [], log_path, json_path, timeout=1.0)
assert json_path.exists()
content = json_path.read_text()
assert "hassh_fingerprint" in content
record = json.loads(content.strip())
assert record["fields"]["hassh_server_hash"] == "b" * 32
assert record["fields"]["ssh_banner"] == "SSH-2.0-Paramiko_3.0"
@patch("decnet.prober.worker.tcp_fingerprint")
@patch("decnet.prober.worker.hassh_server")
@patch("decnet.prober.worker.jarm_hash")
def test_hassh_none_not_logged(self, mock_jarm: MagicMock, mock_hassh: MagicMock,
mock_tcpfp: MagicMock, tmp_path: Path):
mock_jarm.return_value = JARM_EMPTY_HASH
mock_hassh.return_value = None # No SSH server
mock_tcpfp.return_value = None
log_path = tmp_path / "decnet.log"
json_path = tmp_path / "decnet.json"
targets = {"10.0.0.1"}
probed: dict[str, dict[str, set[int]]] = {}
_probe_cycle(targets, probed, [], [22], [], log_path, json_path, timeout=1.0)
assert 22 in probed["10.0.0.1"]["hassh"]
if json_path.exists():
content = json_path.read_text()
assert "hassh_fingerprint" not in content
@patch("decnet.prober.worker.tcp_fingerprint")
@patch("decnet.prober.worker.hassh_server")
@patch("decnet.prober.worker.jarm_hash")
def test_hassh_skips_already_probed(self, mock_jarm: MagicMock, mock_hassh: MagicMock,
mock_tcpfp: MagicMock, tmp_path: Path):
mock_jarm.return_value = JARM_EMPTY_HASH
mock_tcpfp.return_value = None
log_path = tmp_path / "decnet.log"
json_path = tmp_path / "decnet.json"
targets = {"10.0.0.1"}
probed: dict[str, dict[str, set[int]]] = {"10.0.0.1": {"hassh": {22}}}
_probe_cycle(targets, probed, [], [22, 2222], [], log_path, json_path, timeout=1.0)
assert mock_hassh.call_count == 1 # only 2222
mock_hassh.assert_called_once_with("10.0.0.1", 2222, timeout=1.0)
@patch("decnet.prober.worker.tcp_fingerprint")
@patch("decnet.prober.worker.hassh_server")
@patch("decnet.prober.worker.jarm_hash")
def test_hassh_exception_marks_probed(self, mock_jarm: MagicMock, mock_hassh: MagicMock,
mock_tcpfp: MagicMock, tmp_path: Path):
mock_jarm.return_value = JARM_EMPTY_HASH
mock_hassh.side_effect = OSError("Connection refused")
mock_tcpfp.return_value = None
log_path = tmp_path / "decnet.log"
json_path = tmp_path / "decnet.json"
targets = {"10.0.0.1"}
probed: dict[str, dict[str, set[int]]] = {}
_probe_cycle(targets, probed, [], [22], [], log_path, json_path, timeout=1.0)
assert 22 in probed["10.0.0.1"]["hassh"]
# ─── _probe_cycle: TCPFP phase ─────────────────────────────────────────────
class TestProbeCycleTCPFP:
@patch("decnet.prober.worker.tcp_fingerprint")
@patch("decnet.prober.worker.hassh_server")
@patch("decnet.prober.worker.jarm_hash")
def test_probes_tcpfp_ports(self, mock_jarm: MagicMock, mock_hassh: MagicMock,
mock_tcpfp: MagicMock, tmp_path: Path):
mock_jarm.return_value = JARM_EMPTY_HASH
mock_hassh.return_value = None
mock_tcpfp.return_value = {
"tcpfp_hash": "d" * 32,
"tcpfp_raw": "64:65535:1:1460:7:1:1:M,N,W,N,N,T,S,E",
"ttl": 64, "window_size": 65535, "df_bit": 1,
"mss": 1460, "window_scale": 7, "sack_ok": 1,
"timestamp": 1, "options_order": "M,N,W,N,N,T,S,E",
}
log_path = tmp_path / "decnet.log"
json_path = tmp_path / "decnet.json"
targets = {"10.0.0.1"}
probed: dict[str, dict[str, set[int]]] = {}
_probe_cycle(targets, probed, [], [], [80, 443], log_path, json_path, timeout=1.0)
assert mock_tcpfp.call_count == 2
assert 80 in probed["10.0.0.1"]["tcpfp"]
assert 443 in probed["10.0.0.1"]["tcpfp"]
@patch("decnet.prober.worker.tcp_fingerprint")
@patch("decnet.prober.worker.hassh_server")
@patch("decnet.prober.worker.jarm_hash")
def test_tcpfp_writes_event_with_all_fields(self, mock_jarm: MagicMock, mock_hassh: MagicMock,
mock_tcpfp: MagicMock, tmp_path: Path):
mock_jarm.return_value = JARM_EMPTY_HASH
mock_hassh.return_value = None
mock_tcpfp.return_value = {
"tcpfp_hash": "e" * 32,
"tcpfp_raw": "128:8192:1:1460:8:1:0:M,N,W,N,N,S",
"ttl": 128, "window_size": 8192, "df_bit": 1,
"mss": 1460, "window_scale": 8, "sack_ok": 1,
"timestamp": 0, "options_order": "M,N,W,N,N,S",
}
log_path = tmp_path / "decnet.log"
json_path = tmp_path / "decnet.json"
targets = {"10.0.0.1"}
probed: dict[str, dict[str, set[int]]] = {}
_probe_cycle(targets, probed, [], [], [443], log_path, json_path, timeout=1.0)
content = json_path.read_text()
assert "tcpfp_fingerprint" in content
record = json.loads(content.strip())
assert record["fields"]["tcpfp_hash"] == "e" * 32
assert record["fields"]["ttl"] == "128"
assert record["fields"]["window_size"] == "8192"
assert record["fields"]["options_order"] == "M,N,W,N,N,S"
@patch("decnet.prober.worker.tcp_fingerprint")
@patch("decnet.prober.worker.hassh_server")
@patch("decnet.prober.worker.jarm_hash")
def test_tcpfp_none_not_logged(self, mock_jarm: MagicMock, mock_hassh: MagicMock,
mock_tcpfp: MagicMock, tmp_path: Path):
mock_jarm.return_value = JARM_EMPTY_HASH
mock_hassh.return_value = None
mock_tcpfp.return_value = None
log_path = tmp_path / "decnet.log"
json_path = tmp_path / "decnet.json"
targets = {"10.0.0.1"}
probed: dict[str, dict[str, set[int]]] = {}
_probe_cycle(targets, probed, [], [], [443], log_path, json_path, timeout=1.0)
assert 443 in probed["10.0.0.1"]["tcpfp"]
if json_path.exists():
content = json_path.read_text()
assert "tcpfp_fingerprint" not in content
# ─── Probe type isolation ───────────────────────────────────────────────────
class TestProbeTypeIsolation:
@patch("decnet.prober.worker.tcp_fingerprint")
@patch("decnet.prober.worker.hassh_server")
@patch("decnet.prober.worker.jarm_hash")
def test_jarm_does_not_mark_hassh(self, mock_jarm: MagicMock, mock_hassh: MagicMock,
mock_tcpfp: MagicMock, tmp_path: Path):
"""JARM probing port 2222 should not mark HASSH port 2222 as done."""
mock_jarm.return_value = JARM_EMPTY_HASH
mock_hassh.return_value = None
mock_tcpfp.return_value = None
log_path = tmp_path / "decnet.log"
json_path = tmp_path / "decnet.json"
targets = {"10.0.0.1"}
probed: dict[str, dict[str, set[int]]] = {}
# Probe with JARM on 2222 and HASSH on 2222
_probe_cycle(targets, probed, [2222], [2222], [], log_path, json_path, timeout=1.0)
# Both should be called
assert mock_jarm.call_count == 1
assert mock_hassh.call_count == 1
assert 2222 in probed["10.0.0.1"]["jarm"]
assert 2222 in probed["10.0.0.1"]["hassh"]
@patch("decnet.prober.worker.tcp_fingerprint")
@patch("decnet.prober.worker.hassh_server")
@patch("decnet.prober.worker.jarm_hash")
def test_all_three_probes_run(self, mock_jarm: MagicMock, mock_hassh: MagicMock,
mock_tcpfp: MagicMock, tmp_path: Path):
mock_jarm.return_value = JARM_EMPTY_HASH
mock_hassh.return_value = None
mock_tcpfp.return_value = None
log_path = tmp_path / "decnet.log"
json_path = tmp_path / "decnet.json"
targets = {"10.0.0.1"}
probed: dict[str, dict[str, set[int]]] = {}
_probe_cycle(targets, probed, [443], [22], [80], log_path, json_path, timeout=1.0)
assert mock_jarm.call_count == 1
assert mock_hassh.call_count == 1
assert mock_tcpfp.call_count == 1
# ─── _write_event ────────────────────────────────────────────────────────────
class TestWriteEvent:
def test_writes_rfc5424_and_json(self, tmp_path: Path):
log_path = tmp_path / "decnet.log"
json_path = tmp_path / "decnet.json"
_write_event(log_path, json_path, "test_event", target_ip="10.0.0.1", msg="test")
assert log_path.exists()
assert json_path.exists()
log_content = log_path.read_text()
assert "test_event" in log_content
assert "relay@55555" in log_content
json_content = json_path.read_text()
record = json.loads(json_content.strip())
assert record["event_type"] == "test_event"
assert record["service"] == "prober"
assert record["fields"]["target_ip"] == "10.0.0.1"