Files
DECNET/tests/test_real_ssh.py
anti d4ac53c0c9 feat(ssh): replace Cowrie with real OpenSSH + rsyslog logging pipeline
Scraps the Cowrie emulation layer. The real_ssh template now runs a
genuine sshd backed by a three-layer logging stack forwarded to stdout
as RFC 5424 for the DECNET collector:

  auth,authpriv.*  → rsyslogd → named pipe → stdout  (logins/failures)
  user.*           → rsyslogd → named pipe → stdout  (PROMPT_COMMAND cmds)
  sudo syslog=auth → rsyslogd → named pipe → stdout  (privilege escalation)
  sudo logfile     → /var/log/sudo.log               (local backup with I/O)

The ssh.py service plugin now points to templates/real_ssh and drops all
COWRIE_* / NODE_NAME env vars, sharing the same compose fragment shape as
real_ssh.py.
2026-04-11 19:12:54 -04:00

189 lines
5.5 KiB
Python

"""
Tests for the RealSSHService plugin and the deaddeck archetype.
"""
from pathlib import Path
from decnet.services.registry import all_services, get_service
from decnet.archetypes import get_archetype
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _fragment(service_cfg: dict | None = None, log_target: str | None = None) -> dict:
    """Build the ``real_ssh`` compose fragment for a decky named ``test-decky``.

    Both arguments are forwarded unchanged as keyword arguments to the
    plugin's ``compose_fragment`` method.
    """
    plugin = get_service("real_ssh")
    return plugin.compose_fragment(
        "test-decky",
        log_target=log_target,
        service_cfg=service_cfg,
    )
# ---------------------------------------------------------------------------
# Registration
# ---------------------------------------------------------------------------
def test_real_ssh_registered():
    """The service registry must list ``real_ssh``."""
    registered = all_services()
    assert "real_ssh" in registered
def test_real_ssh_ports():
    """The plugin's ``ports`` attribute must be exactly ``[22]``."""
    assert get_service("real_ssh").ports == [22]
def test_real_ssh_is_build_service():
    """The plugin's ``default_image`` must be the literal string ``"build"``."""
    assert get_service("real_ssh").default_image == "build"
def test_real_ssh_dockerfile_context_exists():
    """The Dockerfile context must be a directory holding both template files."""
    ctx = get_service("real_ssh").dockerfile_context()
    assert ctx is not None
    assert ctx.is_dir(), f"Dockerfile context directory missing: {ctx}"

    dockerfile_path = ctx / "Dockerfile"
    entrypoint_path = ctx / "entrypoint.sh"
    assert dockerfile_path.exists(), "Dockerfile missing in real_ssh template dir"
    assert entrypoint_path.exists(), "entrypoint.sh missing in real_ssh template dir"
# ---------------------------------------------------------------------------
# compose_fragment structure
# ---------------------------------------------------------------------------
def test_compose_fragment_has_build():
    """The fragment must carry a ``build`` section that names a ``context``."""
    fragment = _fragment()
    assert "build" in fragment
    build_section = fragment["build"]
    assert "context" in build_section
def test_compose_fragment_container_name():
    """The container name for decky ``test-decky`` is ``test-decky-real-ssh``."""
    container_name = _fragment()["container_name"]
    assert container_name == "test-decky-real-ssh"
def test_compose_fragment_restart_policy():
    """The fragment's ``restart`` value must be ``unless-stopped``."""
    restart_policy = _fragment()["restart"]
    assert restart_policy == "unless-stopped"
def test_compose_fragment_cap_add():
    """``NET_BIND_SERVICE`` must be listed under the fragment's ``cap_add``."""
    # A missing "cap_add" key is treated as an empty list, so the test fails
    # on the membership check rather than with a KeyError.
    capabilities = _fragment().get("cap_add", [])
    assert "NET_BIND_SERVICE" in capabilities
def test_compose_fragment_default_password():
    """Without a service_cfg, ``SSH_ROOT_PASSWORD`` must be ``admin``."""
    environment = _fragment()["environment"]
    root_password = environment["SSH_ROOT_PASSWORD"]
    assert root_password == "admin"
# ---------------------------------------------------------------------------
# service_cfg overrides
# ---------------------------------------------------------------------------
def test_custom_password():
    """A ``password`` entry in service_cfg must become ``SSH_ROOT_PASSWORD``."""
    overrides = {"password": "s3cr3t!"}
    environment = _fragment(service_cfg=overrides)["environment"]
    assert environment["SSH_ROOT_PASSWORD"] == "s3cr3t!"
def test_custom_hostname():
    """A ``hostname`` entry in service_cfg must become ``SSH_HOSTNAME``."""
    overrides = {"hostname": "srv-prod-01"}
    environment = _fragment(service_cfg=overrides)["environment"]
    assert environment["SSH_HOSTNAME"] == "srv-prod-01"
def test_no_hostname_by_default():
    """Without a service_cfg, ``SSH_HOSTNAME`` must be absent from the environment."""
    environment = _fragment()["environment"]
    assert "SSH_HOSTNAME" not in environment
# ---------------------------------------------------------------------------
# log_target: real_ssh does not forward logs via LOG_TARGET
# (no log aggregation on the entry-point — attacker shouldn't see it)
# ---------------------------------------------------------------------------
def test_no_log_target_env_injected():
    """Passing a log_target must not add ``LOG_TARGET`` to the environment."""
    fragment = _fragment(log_target="10.0.0.1:5140")
    # A fragment without any "environment" key also satisfies the check.
    environment = fragment.get("environment", {})
    assert "LOG_TARGET" not in environment
# ---------------------------------------------------------------------------
# Deaddeck archetype
# ---------------------------------------------------------------------------
def test_deaddeck_archetype_exists():
    """Looking up ``deaddeck`` must return an archetype with that slug."""
    assert get_archetype("deaddeck").slug == "deaddeck"
def test_deaddeck_uses_real_ssh():
    """The ``deaddeck`` archetype's services must include ``real_ssh``."""
    services = get_archetype("deaddeck").services
    assert "real_ssh" in services
def test_deaddeck_nmap_os():
    """The ``deaddeck`` archetype's ``nmap_os`` must be ``linux``."""
    assert get_archetype("deaddeck").nmap_os == "linux"
def test_deaddeck_preferred_distros_not_empty():
    """The ``deaddeck`` archetype must name at least one preferred distro."""
    distro_count = len(get_archetype("deaddeck").preferred_distros)
    assert distro_count >= 1
# ---------------------------------------------------------------------------
# Logging pipeline wiring (Dockerfile + entrypoint)
# ---------------------------------------------------------------------------
def _dockerfile_text() -> str:
    """Return the contents of the ``real_ssh`` template's Dockerfile.

    The file is decoded explicitly as UTF-8 so the result does not depend on
    the locale of the machine running the tests.
    """
    ctx = get_service("real_ssh").dockerfile_context()
    # Fail with a readable message instead of a TypeError from ``None / str``.
    assert ctx is not None, "real_ssh service has no Dockerfile context"
    return (ctx / "Dockerfile").read_text(encoding="utf-8")
def _entrypoint_text() -> str:
    """Return the contents of the ``real_ssh`` template's ``entrypoint.sh``.

    The file is decoded explicitly as UTF-8 so the result does not depend on
    the locale of the machine running the tests.
    """
    ctx = get_service("real_ssh").dockerfile_context()
    # Fail with a readable message instead of a TypeError from ``None / str``.
    assert ctx is not None, "real_ssh service has no Dockerfile context"
    return (ctx / "entrypoint.sh").read_text(encoding="utf-8")
def test_dockerfile_has_rsyslog():
    """The Dockerfile must mention ``rsyslog`` somewhere in its text."""
    dockerfile = _dockerfile_text()
    assert "rsyslog" in dockerfile
def test_dockerfile_runs_as_root():
    """sshd requires root — the Dockerfile must not contain any USER directive.

    Dockerfile instruction keywords are case-insensitive and may be separated
    from their argument by any whitespace, so the first whitespace-delimited
    token of each line is compared case-insensitively.
    """
    user_lines = []
    for raw_line in _dockerfile_text().splitlines():
        line = raw_line.strip()
        tokens = line.split(None, 1)
        # Require an argument after the keyword: a bare "USER" line is not
        # treated as a directive.
        if len(tokens) == 2 and tokens[0].upper() == "USER":
            user_lines.append(line)
    assert user_lines == [], f"Unexpected USER directive(s): {user_lines}"
def test_dockerfile_rsyslog_conf_created():
    """The Dockerfile must reference ``99-decnet.conf`` and ``RFC5424fmt``."""
    dockerfile = _dockerfile_text()
    for marker in ("99-decnet.conf", "RFC5424fmt"):
        assert marker in dockerfile
def test_dockerfile_sudoers_syslog():
    """The Dockerfile must contain the three sudo logging settings."""
    dockerfile = _dockerfile_text()
    for setting in ("syslog=auth", "log_input", "log_output"):
        assert setting in dockerfile
def test_dockerfile_prompt_command_logger():
    """The Dockerfile must mention both ``PROMPT_COMMAND`` and ``logger``."""
    dockerfile = _dockerfile_text()
    for keyword in ("PROMPT_COMMAND", "logger"):
        assert keyword in dockerfile
def test_entrypoint_creates_named_pipe():
    """The entrypoint script must contain a ``mkfifo`` call."""
    script = _entrypoint_text()
    assert "mkfifo" in script
def test_entrypoint_starts_rsyslogd():
    """The entrypoint script must mention ``rsyslogd``."""
    script = _entrypoint_text()
    assert "rsyslogd" in script
def test_entrypoint_sshd_no_dash_e():
    """The entrypoint must contain ``sshd -D`` but never ``sshd -D -e``."""
    script = _entrypoint_text()
    has_dash_d = "sshd -D" in script
    # -e flag would bypass syslog; must not be present
    # NOTE(review): only the exact substring "sshd -D -e" is rejected; other
    # spellings of the flag (e.g. "-e" placed before "-D") are not caught.
    has_dash_e = "sshd -D -e" in script
    assert has_dash_d
    assert not has_dash_e