Files
DECNET/tests/services/test_ssh_stealth.py
anti ea95a009df refactor(tests): move flat tests/*.py into per-subsystem subfolders
Groups every flat test_*.py under the module it exercises, matching the
existing tests/{profiler,sniffer,prober,collector,correlation,cli,web,
topology,swarm,bus,updater,api,docker,geoip,...} layout. New folders:
services/, fleet/, config/, logging/, db/ (+ db/mysql/), telemetry/,
mutator/, core/.

Path-dependent __file__ references bumped an extra .parent in four
files that moved one level deeper:
- tests/sniffer/test_sniffer_ja3.py   (template path)
- tests/services/test_ssh_capture_emit.py (template path)
- tests/cli/test_mode_gating.py  (REPO root)
- tests/web/test_env_lazy_jwt.py (repo var)

Also drops two SQLite runtime artifacts (test_decnet.db-{shm,wal}) that
were leaking into the repo from a previous test run.

Fixes two test_service_isolation cases that patched asyncio.sleep (no
longer on the profiler main-loop hot path — same pre-existing bug I
fixed earlier in test_attacker_worker.py) by patching asyncio.wait_for
and passing interval=0.
2026-04-23 21:34:25 -04:00

144 lines
5.0 KiB
Python

"""
Stealth-hardening assertions for the SSH honeypot template.
The three capture artifacts — syslog_bridge.py, emit_capture.py, capture.sh —
used to land as plaintext files in the container (world-readable by the
attacker, who is root in-container). They are now packed into /entrypoint.sh
as XOR+gzip+base64 blobs at image-build time by _build_stealth.py.
These tests pin the stealth contract at the source-template level so
regressions surface without needing a docker build.
"""
from __future__ import annotations
import base64
import gzip
import importlib.util
import sys
from pathlib import Path
from decnet.services.registry import get_service
def _ctx() -> Path:
    """Return the path the ``ssh`` service reports as its dockerfile context."""
    ssh_service = get_service("ssh")
    return ssh_service.dockerfile_context()
def _load_build_stealth():
    """Import ``_build_stealth.py`` from the SSH template by file path.

    The module is created under the name ``_build_stealth``, registered in
    ``sys.modules``, executed, and returned to the caller.
    """
    helper_path = _ctx() / "_build_stealth.py"
    module_spec = importlib.util.spec_from_file_location(
        "_build_stealth", helper_path
    )
    module = importlib.util.module_from_spec(module_spec)
    # Register before executing, so the module is visible while its body runs.
    sys.modules[module_spec.name] = module
    module_spec.loader.exec_module(module)
    return module
# ---------------------------------------------------------------------------
# Build helper exists and is wired into the Dockerfile
# ---------------------------------------------------------------------------
def test_build_stealth_helper_shipped():
    """The build helper exists in the SSH template and names every placeholder."""
    helper_file = _ctx() / "_build_stealth.py"
    assert helper_file.exists(), "_build_stealth.py missing from SSH template"

    helper_source = helper_file.read_text()
    for placeholder in (
        "__STEALTH_KEY__",
        "__EMIT_CAPTURE_B64__",
        "__JOURNAL_RELAY_B64__",
    ):
        assert placeholder in helper_source
def test_dockerfile_invokes_build_stealth():
    """The Dockerfile mentions the helper and runs it with python3."""
    dockerfile_path = _ctx() / "Dockerfile"
    dockerfile_text = dockerfile_path.read_text()
    for expected in (
        "_build_stealth.py",
        "python3 /tmp/build/_build_stealth.py",
    ):
        assert expected in dockerfile_text
# ---------------------------------------------------------------------------
# Entrypoint template shape
# ---------------------------------------------------------------------------
def test_entrypoint_is_template_with_placeholders():
    """The checked-in entrypoint still carries its unsubstituted placeholders."""
    template = (_ctx() / "entrypoint.sh").read_text()
    # This is the pre-build template: the Docker build stage is what swaps
    # the placeholders out, so all of them must still be present here.
    expected_placeholders = (
        "__STEALTH_KEY__",
        "__EMIT_CAPTURE_B64__",
        "__JOURNAL_RELAY_B64__",
    )
    for placeholder in expected_placeholders:
        assert placeholder in template
def test_entrypoint_decodes_via_xor():
    """The entrypoint contains the decode steps and exports the capture var."""
    script = (_ctx() / "entrypoint.sh").read_text()
    required_fragments = (
        # XOR-then-gunzip layering: base64 -> xor -> gunzip
        "base64 -d",
        "gunzip",
        # The decoded vars drive the capture loop.
        "EMIT_CAPTURE_PY",
        "export EMIT_CAPTURE_PY",
    )
    for fragment in required_fragments:
        assert fragment in script
def test_entrypoint_no_plaintext_python_path():
    """The entrypoint must not mention any of the old on-disk artifact paths."""
    script = (_ctx() / "entrypoint.sh").read_text()
    forbidden_paths = (
        "/opt/emit_capture.py",
        "/opt/syslog_bridge.py",
        "/usr/libexec/udev/journal-relay",
    )
    for forbidden in forbidden_paths:
        assert forbidden not in script
# ---------------------------------------------------------------------------
# End-to-end: pack + round-trip
# ---------------------------------------------------------------------------
def test_build_stealth_merge_and_pack_roundtrip(tmp_path, monkeypatch):
    """Merge the real sources, pack them, and decode — assert semantic equality.

    Copies the template sources into a scratch ``build`` directory, points the
    helper module's ``BUILD`` at it, redirects the helper's ``/entrypoint.sh``
    target into ``tmp_path``, and runs ``main()``. The rendered entrypoint is
    then parsed for the key and both blobs, which are unpacked
    (base64 -> XOR -> gunzip) and checked for the expected content.

    Args:
        tmp_path: pytest fixture providing a per-test scratch directory.
        monkeypatch: pytest fixture used to patch ``BUILD`` and ``Path`` on
            the loaded helper module; patches are undone after the test.
    """
    import pathlib
    import re

    mod = _load_build_stealth()

    build = tmp_path / "build"
    build.mkdir()
    ctx = _ctx()
    for name in ("syslog_bridge.py", "emit_capture.py", "capture.sh", "entrypoint.sh"):
        # Byte-exact copy: avoids locale-dependent decoding and newline
        # translation, so the helper sees exactly what the template ships.
        (build / name).write_bytes((ctx / name).read_bytes())
    monkeypatch.setattr(mod, "BUILD", build)

    out_dir = tmp_path / "out"
    out_dir.mkdir()

    # Redirect the write target so we don't touch /entrypoint.sh.
    real_path = pathlib.Path

    def fake_path(arg, *a, **kw):
        if arg == "/entrypoint.sh":
            return real_path(out_dir) / "entrypoint.sh"
        return real_path(arg, *a, **kw)

    monkeypatch.setattr(mod, "Path", fake_path)

    rc = mod.main()
    assert rc == 0

    rendered = (out_dir / "entrypoint.sh").read_text()
    for marker in ("__STEALTH_KEY__", "__EMIT_CAPTURE_B64__", "__JOURNAL_RELAY_B64__"):
        assert marker not in rendered, f"{marker} left in rendered entrypoint"

    def extract(pattern: str) -> str:
        # Fail with a readable message instead of AttributeError on None.
        found = re.search(pattern, rendered)
        assert found is not None, f"{pattern!r} not found in rendered entrypoint"
        return found.group(1)

    # Extract key + blobs and decode.
    key = int(extract(r"_STEALTH_KEY=(\d+)"))
    # The key is XORed against single bytes below, so it must fit in one byte.
    assert key <= 0xFF, f"stealth key {key} does not fit in one byte"
    emit_b64 = extract(r"_EMIT_CAPTURE_B64='([^']+)'")
    relay_b64 = extract(r"_JOURNAL_RELAY_B64='([^']+)'")

    def unpack(s: str) -> str:
        xored = base64.b64decode(s)
        gz = bytes(b ^ key for b in xored)
        return gzip.decompress(gz).decode("utf-8")

    emit_src = unpack(emit_b64)
    relay_src = unpack(relay_b64)

    # Merged python must contain both module bodies, with the import hack stripped.
    assert "def syslog_line(" in emit_src
    assert "def main() -> int:" in emit_src
    assert "from syslog_bridge import" not in emit_src
    assert "sys.path.insert" not in emit_src

    # Capture loop must reference the in-memory python var, not the old path.
    assert "EMIT_CAPTURE_PY" in relay_src
    assert "/opt/emit_capture.py" not in relay_src
    assert "inotifywait" in relay_src or "INOTIFY_BIN" in relay_src