main: remove tests and pytest dependency
This commit is contained in:
@@ -1,312 +0,0 @@
|
||||
"""
|
||||
Tests for machine archetypes and the amount= expansion feature.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import textwrap
|
||||
import tempfile
|
||||
import os
|
||||
import pytest
|
||||
|
||||
from decnet.archetypes import (
|
||||
ARCHETYPES,
|
||||
all_archetypes,
|
||||
get_archetype,
|
||||
random_archetype,
|
||||
)
|
||||
from decnet.ini_loader import load_ini, DeckySpec
|
||||
from decnet.distros import DISTROS
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Archetype registry
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_all_archetypes_returns_all():
|
||||
result = all_archetypes()
|
||||
assert isinstance(result, dict)
|
||||
assert len(result) == len(ARCHETYPES)
|
||||
|
||||
|
||||
def test_get_archetype_known():
|
||||
arch = get_archetype("linux-server")
|
||||
assert arch.slug == "linux-server"
|
||||
assert "ssh" in arch.services
|
||||
|
||||
|
||||
def test_get_archetype_unknown_raises():
|
||||
with pytest.raises(ValueError, match="Unknown archetype"):
|
||||
get_archetype("does-not-exist")
|
||||
|
||||
|
||||
def test_random_archetype_returns_valid():
|
||||
arch = random_archetype()
|
||||
assert arch.slug in ARCHETYPES
|
||||
|
||||
|
||||
def test_every_archetype_has_services():
|
||||
for slug, arch in ARCHETYPES.items():
|
||||
assert arch.services, f"Archetype '{slug}' has no services"
|
||||
|
||||
|
||||
def test_every_archetype_has_preferred_distros():
|
||||
for slug, arch in ARCHETYPES.items():
|
||||
assert arch.preferred_distros, f"Archetype '{slug}' has no preferred_distros"
|
||||
|
||||
|
||||
def test_every_archetype_preferred_distro_is_valid():
|
||||
valid_slugs = set(DISTROS.keys())
|
||||
for slug, arch in ARCHETYPES.items():
|
||||
for d in arch.preferred_distros:
|
||||
assert d in valid_slugs, (
|
||||
f"Archetype '{slug}' references unknown distro '{d}'"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# INI loader — archetype= parsing
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _write_ini(content: str) -> str:
|
||||
"""Write INI content to a temp file and return the path."""
|
||||
content = textwrap.dedent(content)
|
||||
fd, path = tempfile.mkstemp(suffix=".ini")
|
||||
os.write(fd, content.encode())
|
||||
os.close(fd)
|
||||
return path
|
||||
|
||||
|
||||
def test_ini_archetype_parsed():
|
||||
path = _write_ini("""
|
||||
[general]
|
||||
net=10.0.0.0/24
|
||||
gw=10.0.0.1
|
||||
|
||||
[my-server]
|
||||
archetype=linux-server
|
||||
""")
|
||||
cfg = load_ini(path)
|
||||
os.unlink(path)
|
||||
assert len(cfg.deckies) == 1
|
||||
assert cfg.deckies[0].archetype == "linux-server"
|
||||
assert cfg.deckies[0].services is None # not overridden
|
||||
|
||||
|
||||
def test_ini_archetype_with_explicit_services_override():
|
||||
"""explicit services= must survive alongside archetype="""
|
||||
path = _write_ini("""
|
||||
[general]
|
||||
net=10.0.0.0/24
|
||||
gw=10.0.0.1
|
||||
|
||||
[my-server]
|
||||
archetype=linux-server
|
||||
services=ftp,smb
|
||||
""")
|
||||
cfg = load_ini(path)
|
||||
os.unlink(path)
|
||||
assert cfg.deckies[0].archetype == "linux-server"
|
||||
assert cfg.deckies[0].services == ["ftp", "smb"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# INI loader — amount= expansion
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_ini_amount_one_keeps_section_name():
|
||||
path = _write_ini("""
|
||||
[general]
|
||||
net=10.0.0.0/24
|
||||
gw=10.0.0.1
|
||||
|
||||
[my-printer]
|
||||
archetype=printer
|
||||
amount=1
|
||||
""")
|
||||
cfg = load_ini(path)
|
||||
os.unlink(path)
|
||||
assert len(cfg.deckies) == 1
|
||||
assert cfg.deckies[0].name == "my-printer"
|
||||
|
||||
|
||||
def test_ini_amount_expands_deckies():
|
||||
path = _write_ini("""
|
||||
[general]
|
||||
net=10.0.0.0/24
|
||||
gw=10.0.0.1
|
||||
|
||||
[corp-ws]
|
||||
archetype=windows-workstation
|
||||
amount=5
|
||||
""")
|
||||
cfg = load_ini(path)
|
||||
os.unlink(path)
|
||||
assert len(cfg.deckies) == 5
|
||||
for i, d in enumerate(cfg.deckies, start=1):
|
||||
assert d.name == f"corp-ws-{i:02d}"
|
||||
assert d.archetype == "windows-workstation"
|
||||
assert d.ip is None # auto-allocated
|
||||
|
||||
|
||||
def test_ini_amount_with_ip_raises():
|
||||
path = _write_ini("""
|
||||
[general]
|
||||
net=10.0.0.0/24
|
||||
gw=10.0.0.1
|
||||
|
||||
[bad-group]
|
||||
services=ssh
|
||||
ip=10.0.0.50
|
||||
amount=3
|
||||
""")
|
||||
with pytest.raises(ValueError, match="Cannot combine ip="):
|
||||
load_ini(path)
|
||||
os.unlink(path)
|
||||
|
||||
|
||||
def test_ini_amount_invalid_value_raises():
|
||||
path = _write_ini("""
|
||||
[general]
|
||||
net=10.0.0.0/24
|
||||
gw=10.0.0.1
|
||||
|
||||
[bad]
|
||||
services=ssh
|
||||
amount=potato
|
||||
""")
|
||||
with pytest.raises(ValueError, match="must be a positive integer"):
|
||||
load_ini(path)
|
||||
os.unlink(path)
|
||||
|
||||
|
||||
def test_ini_amount_zero_raises():
|
||||
path = _write_ini("""
|
||||
[general]
|
||||
net=10.0.0.0/24
|
||||
gw=10.0.0.1
|
||||
|
||||
[bad]
|
||||
services=ssh
|
||||
amount=0
|
||||
""")
|
||||
with pytest.raises(ValueError, match="must be a positive integer"):
|
||||
load_ini(path)
|
||||
os.unlink(path)
|
||||
|
||||
|
||||
def test_ini_amount_multiple_groups():
|
||||
"""Two groups with different amounts expand independently."""
|
||||
path = _write_ini("""
|
||||
[general]
|
||||
net=10.0.0.0/24
|
||||
gw=10.0.0.1
|
||||
|
||||
[workers]
|
||||
archetype=linux-server
|
||||
amount=3
|
||||
|
||||
[printers]
|
||||
archetype=printer
|
||||
amount=2
|
||||
""")
|
||||
cfg = load_ini(path)
|
||||
os.unlink(path)
|
||||
assert len(cfg.deckies) == 5
|
||||
names = [d.name for d in cfg.deckies]
|
||||
assert names == ["workers-01", "workers-02", "workers-03", "printers-01", "printers-02"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# INI loader — per-service subsections propagate to expanded deckies
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_ini_subsection_propagates_to_expanded_deckies():
|
||||
"""[group.ssh] must apply to group-01, group-02, ..."""
|
||||
path = _write_ini("""
|
||||
[general]
|
||||
net=10.0.0.0/24
|
||||
gw=10.0.0.1
|
||||
|
||||
[linux-hosts]
|
||||
archetype=linux-server
|
||||
amount=3
|
||||
|
||||
[linux-hosts.ssh]
|
||||
kernel_version=5.15.0-76-generic
|
||||
""")
|
||||
cfg = load_ini(path)
|
||||
os.unlink(path)
|
||||
assert len(cfg.deckies) == 3
|
||||
for d in cfg.deckies:
|
||||
assert "ssh" in d.service_config
|
||||
assert d.service_config["ssh"]["kernel_version"] == "5.15.0-76-generic"
|
||||
|
||||
|
||||
def test_ini_subsection_direct_match_unaffected():
|
||||
"""A direct [decky.svc] subsection must still work when amount=1."""
|
||||
path = _write_ini("""
|
||||
[general]
|
||||
net=10.0.0.0/24
|
||||
gw=10.0.0.1
|
||||
|
||||
[web-01]
|
||||
services=http
|
||||
|
||||
[web-01.http]
|
||||
server_header=Apache/2.4.51
|
||||
""")
|
||||
cfg = load_ini(path)
|
||||
os.unlink(path)
|
||||
assert cfg.deckies[0].service_config["http"]["server_header"] == "Apache/2.4.51"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _build_deckies — archetype applied via CLI path
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_build_deckies_archetype_sets_services():
|
||||
from decnet.cli import _build_deckies
|
||||
from decnet.archetypes import get_archetype
|
||||
arch = get_archetype("mail-server")
|
||||
result = _build_deckies(
|
||||
n=2,
|
||||
ips=["10.0.0.10", "10.0.0.11"],
|
||||
services_explicit=None,
|
||||
randomize_services=False,
|
||||
archetype=arch,
|
||||
)
|
||||
assert len(result) == 2
|
||||
for d in result:
|
||||
assert set(d.services) == set(arch.services)
|
||||
assert d.archetype == "mail-server"
|
||||
|
||||
|
||||
def test_build_deckies_archetype_preferred_distros():
|
||||
from decnet.cli import _build_deckies
|
||||
from decnet.archetypes import get_archetype
|
||||
arch = get_archetype("iot-device") # preferred_distros=["alpine"]
|
||||
result = _build_deckies(
|
||||
n=3,
|
||||
ips=["10.0.0.10", "10.0.0.11", "10.0.0.12"],
|
||||
services_explicit=None,
|
||||
randomize_services=False,
|
||||
archetype=arch,
|
||||
)
|
||||
for d in result:
|
||||
assert d.distro == "alpine"
|
||||
|
||||
|
||||
def test_build_deckies_explicit_services_override_archetype():
|
||||
from decnet.cli import _build_deckies
|
||||
from decnet.archetypes import get_archetype
|
||||
arch = get_archetype("linux-server")
|
||||
result = _build_deckies(
|
||||
n=1,
|
||||
ips=["10.0.0.10"],
|
||||
services_explicit=["ftp"],
|
||||
randomize_services=False,
|
||||
archetype=arch,
|
||||
)
|
||||
assert result[0].services == ["ftp"]
|
||||
assert result[0].archetype == "linux-server"
|
||||
@@ -1,81 +0,0 @@
|
||||
"""
|
||||
Tests for the CLI service pool — verifies that --randomize-services draws
|
||||
from all registered services, not just the original hardcoded 5.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from decnet.cli import _all_service_names, _build_deckies
|
||||
from decnet.services.registry import all_services
|
||||
|
||||
|
||||
ORIGINAL_5 = {"ssh", "smb", "rdp", "http", "ftp"}
|
||||
|
||||
|
||||
def test_all_service_names_covers_full_registry():
|
||||
"""_all_service_names() must return every service in the registry."""
|
||||
pool = set(_all_service_names())
|
||||
registry = set(all_services().keys())
|
||||
assert pool == registry
|
||||
|
||||
|
||||
def test_all_service_names_is_sorted():
|
||||
names = _all_service_names()
|
||||
assert names == sorted(names)
|
||||
|
||||
|
||||
def test_all_service_names_includes_at_least_25():
|
||||
assert len(_all_service_names()) >= 25
|
||||
|
||||
|
||||
def test_all_service_names_includes_all_original_5():
|
||||
pool = set(_all_service_names())
|
||||
assert ORIGINAL_5.issubset(pool)
|
||||
|
||||
|
||||
def test_randomize_services_pool_exceeds_original_5():
|
||||
"""
|
||||
After enough random draws, at least one service outside the original 5 must appear.
|
||||
With 25 services and picking 1-3 at a time, 200 draws makes this ~100% certain.
|
||||
"""
|
||||
all_drawn: set[str] = set()
|
||||
for _ in range(200):
|
||||
deckies = _build_deckies(
|
||||
n=1,
|
||||
ips=["10.0.0.10"],
|
||||
services_explicit=None,
|
||||
randomize_services=True,
|
||||
)
|
||||
all_drawn.update(deckies[0].services)
|
||||
|
||||
beyond_original = all_drawn - ORIGINAL_5
|
||||
assert beyond_original, (
|
||||
f"After 200 draws only saw the original 5 services. "
|
||||
f"All drawn: {sorted(all_drawn)}"
|
||||
)
|
||||
|
||||
|
||||
def test_build_deckies_randomize_services_valid():
|
||||
"""All randomly chosen services must exist in the registry."""
|
||||
registry = set(all_services().keys())
|
||||
for _ in range(50):
|
||||
deckies = _build_deckies(
|
||||
n=3,
|
||||
ips=["10.0.0.10", "10.0.0.11", "10.0.0.12"],
|
||||
services_explicit=None,
|
||||
randomize_services=True,
|
||||
)
|
||||
for decky in deckies:
|
||||
unknown = set(decky.services) - registry
|
||||
assert not unknown, f"Decky {decky.name} got unknown services: {unknown}"
|
||||
|
||||
|
||||
def test_build_deckies_explicit_services_unchanged():
|
||||
"""Explicit service list must pass through untouched."""
|
||||
deckies = _build_deckies(
|
||||
n=2,
|
||||
ips=["10.0.0.10", "10.0.0.11"],
|
||||
services_explicit=["ssh", "ftp"],
|
||||
randomize_services=False,
|
||||
)
|
||||
for decky in deckies:
|
||||
assert decky.services == ["ssh", "ftp"]
|
||||
@@ -1,243 +0,0 @@
|
||||
"""
|
||||
Tests for the composer — verifies BASE_IMAGE injection and distro heterogeneity.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from decnet.config import DeckyConfig, DecnetConfig
|
||||
from decnet.composer import generate_compose
|
||||
from decnet.distros import all_distros, DISTROS
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
APT_COMPATIBLE = {
|
||||
"debian:bookworm-slim",
|
||||
"ubuntu:22.04",
|
||||
"ubuntu:20.04",
|
||||
"kalilinux/kali-rolling",
|
||||
}
|
||||
|
||||
BUILD_SERVICES = [
|
||||
"ssh", "http", "rdp", "smb", "ftp", "smtp", "elasticsearch",
|
||||
"pop3", "imap", "mysql", "mssql", "redis", "mongodb", "postgres",
|
||||
"ldap", "vnc", "docker_api", "k8s", "sip",
|
||||
"mqtt", "llmnr", "snmp", "tftp",
|
||||
]
|
||||
|
||||
UPSTREAM_SERVICES = ["telnet", "conpot"]
|
||||
|
||||
|
||||
def _make_config(services, distro="debian", base_image=None, build_base=None):
|
||||
profile = DISTROS[distro]
|
||||
decky = DeckyConfig(
|
||||
name="decky-01",
|
||||
ip="10.0.0.10",
|
||||
services=services,
|
||||
distro=distro,
|
||||
base_image=base_image or profile.image,
|
||||
build_base=build_base or profile.build_base,
|
||||
hostname="test-host",
|
||||
)
|
||||
return DecnetConfig(
|
||||
mode="unihost",
|
||||
interface="eth0",
|
||||
subnet="10.0.0.0/24",
|
||||
gateway="10.0.0.1",
|
||||
deckies=[decky],
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# BASE_IMAGE injection — build services
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.parametrize("svc", BUILD_SERVICES)
|
||||
def test_build_service_gets_base_image_arg(svc):
|
||||
"""Every build service must have BASE_IMAGE injected in compose args."""
|
||||
config = _make_config([svc], distro="debian")
|
||||
compose = generate_compose(config)
|
||||
key = f"decky-01-{svc}"
|
||||
fragment = compose["services"][key]
|
||||
assert "build" in fragment, f"{svc}: missing 'build' key"
|
||||
assert "args" in fragment["build"], f"{svc}: build section missing 'args'"
|
||||
assert "BASE_IMAGE" in fragment["build"]["args"], f"{svc}: BASE_IMAGE not in args"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("distro,expected_build_base", [
|
||||
("debian", "debian:bookworm-slim"),
|
||||
("ubuntu22", "ubuntu:22.04"),
|
||||
("ubuntu20", "ubuntu:20.04"),
|
||||
("kali", "kalilinux/kali-rolling"),
|
||||
("rocky9", "debian:bookworm-slim"),
|
||||
("alpine", "debian:bookworm-slim"),
|
||||
])
|
||||
def test_build_service_base_image_matches_distro(distro, expected_build_base):
|
||||
"""BASE_IMAGE arg must match the distro's build_base."""
|
||||
config = _make_config(["http"], distro=distro)
|
||||
compose = generate_compose(config)
|
||||
fragment = compose["services"]["decky-01-http"]
|
||||
assert fragment["build"]["args"]["BASE_IMAGE"] == expected_build_base
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# BASE_IMAGE NOT injected for upstream-image services
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.parametrize("svc", UPSTREAM_SERVICES)
|
||||
def test_upstream_service_has_no_build_section(svc):
|
||||
"""Upstream-image services must not receive a build section or BASE_IMAGE."""
|
||||
config = _make_config([svc])
|
||||
compose = generate_compose(config)
|
||||
fragment = compose["services"][f"decky-01-{svc}"]
|
||||
assert "build" not in fragment
|
||||
assert "image" in fragment
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# service_config propagation tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_service_config_http_server_header():
|
||||
"""service_config for http must inject SERVER_HEADER into compose env."""
|
||||
from decnet.config import DeckyConfig, DecnetConfig
|
||||
from decnet.distros import DISTROS
|
||||
profile = DISTROS["debian"]
|
||||
decky = DeckyConfig(
|
||||
name="decky-01", ip="10.0.0.10",
|
||||
services=["http"], distro="debian",
|
||||
base_image=profile.image, build_base=profile.build_base,
|
||||
hostname="test-host",
|
||||
service_config={"http": {"server_header": "nginx/1.18.0"}},
|
||||
)
|
||||
config = DecnetConfig(
|
||||
mode="unihost", interface="eth0",
|
||||
subnet="10.0.0.0/24", gateway="10.0.0.1",
|
||||
deckies=[decky],
|
||||
)
|
||||
compose = generate_compose(config)
|
||||
env = compose["services"]["decky-01-http"]["environment"]
|
||||
assert env.get("SERVER_HEADER") == "nginx/1.18.0"
|
||||
|
||||
|
||||
def test_service_config_ssh_kernel_version():
|
||||
"""service_config for ssh must inject COWRIE_HONEYPOT_KERNEL_VERSION."""
|
||||
from decnet.config import DeckyConfig, DecnetConfig
|
||||
from decnet.distros import DISTROS
|
||||
profile = DISTROS["debian"]
|
||||
decky = DeckyConfig(
|
||||
name="decky-01", ip="10.0.0.10",
|
||||
services=["ssh"], distro="debian",
|
||||
base_image=profile.image, build_base=profile.build_base,
|
||||
hostname="test-host",
|
||||
service_config={"ssh": {"kernel_version": "5.15.0-76-generic"}},
|
||||
)
|
||||
config = DecnetConfig(
|
||||
mode="unihost", interface="eth0",
|
||||
subnet="10.0.0.0/24", gateway="10.0.0.1",
|
||||
deckies=[decky],
|
||||
)
|
||||
compose = generate_compose(config)
|
||||
env = compose["services"]["decky-01-ssh"]["environment"]
|
||||
assert env.get("COWRIE_HONEYPOT_KERNEL_VERSION") == "5.15.0-76-generic"
|
||||
|
||||
|
||||
def test_service_config_for_one_service_does_not_affect_another():
|
||||
"""service_config for http must not bleed into ftp fragment."""
|
||||
from decnet.config import DeckyConfig, DecnetConfig
|
||||
from decnet.distros import DISTROS
|
||||
profile = DISTROS["debian"]
|
||||
decky = DeckyConfig(
|
||||
name="decky-01", ip="10.0.0.10",
|
||||
services=["http", "ftp"], distro="debian",
|
||||
base_image=profile.image, build_base=profile.build_base,
|
||||
hostname="test-host",
|
||||
service_config={"http": {"server_header": "nginx/1.18.0"}},
|
||||
)
|
||||
config = DecnetConfig(
|
||||
mode="unihost", interface="eth0",
|
||||
subnet="10.0.0.0/24", gateway="10.0.0.1",
|
||||
deckies=[decky],
|
||||
)
|
||||
compose = generate_compose(config)
|
||||
ftp_env = compose["services"]["decky-01-ftp"]["environment"]
|
||||
assert "SERVER_HEADER" not in ftp_env
|
||||
|
||||
|
||||
def test_no_service_config_produces_no_extra_env():
|
||||
"""A decky with no service_config must not have new persona env vars."""
|
||||
config = _make_config(["http", "mysql"])
|
||||
compose = generate_compose(config)
|
||||
for svc in ("http", "mysql"):
|
||||
env = compose["services"][f"decky-01-{svc}"]["environment"]
|
||||
assert "SERVER_HEADER" not in env
|
||||
assert "MYSQL_VERSION" not in env
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Base container uses distro image, not build_base
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.parametrize("distro", list(DISTROS.keys()))
|
||||
def test_base_container_uses_full_distro_image(distro):
|
||||
"""The IP-holder base container must use distro.image, not build_base."""
|
||||
config = _make_config(["ssh"], distro=distro)
|
||||
compose = generate_compose(config)
|
||||
base = compose["services"]["decky-01"]
|
||||
expected = DISTROS[distro].image
|
||||
assert base["image"] == expected, (
|
||||
f"distro={distro}: base container image '{base['image']}' != '{expected}'"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Distro profile — build_base is always apt-compatible
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_all_distros_have_build_base():
|
||||
for slug, profile in all_distros().items():
|
||||
assert profile.build_base, f"Distro '{slug}' has empty build_base"
|
||||
|
||||
|
||||
def test_all_distro_build_bases_are_apt_compatible():
|
||||
for slug, profile in all_distros().items():
|
||||
assert profile.build_base in APT_COMPATIBLE, (
|
||||
f"Distro '{slug}' build_base '{profile.build_base}' is not apt-compatible. "
|
||||
f"Allowed: {APT_COMPATIBLE}"
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Heterogeneity — multiple deckies with different distros get different images
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_multiple_deckies_different_build_bases():
|
||||
"""A multi-decky deployment with ubuntu22 and debian must differ in BASE_IMAGE."""
|
||||
deckies = [
|
||||
DeckyConfig(
|
||||
name="decky-01", ip="10.0.0.10",
|
||||
services=["http"], distro="debian",
|
||||
base_image="debian:bookworm-slim", build_base="debian:bookworm-slim",
|
||||
hostname="host-01",
|
||||
),
|
||||
DeckyConfig(
|
||||
name="decky-02", ip="10.0.0.11",
|
||||
services=["http"], distro="ubuntu22",
|
||||
base_image="ubuntu:22.04", build_base="ubuntu:22.04",
|
||||
hostname="host-02",
|
||||
),
|
||||
]
|
||||
config = DecnetConfig(
|
||||
mode="unihost", interface="eth0",
|
||||
subnet="10.0.0.0/24", gateway="10.0.0.1",
|
||||
deckies=deckies,
|
||||
)
|
||||
compose = generate_compose(config)
|
||||
|
||||
base_img_01 = compose["services"]["decky-01-http"]["build"]["args"]["BASE_IMAGE"]
|
||||
base_img_02 = compose["services"]["decky-02-http"]["build"]["args"]["BASE_IMAGE"]
|
||||
|
||||
assert base_img_01 == "debian:bookworm-slim"
|
||||
assert base_img_02 == "ubuntu:22.04"
|
||||
assert base_img_01 != base_img_02
|
||||
@@ -1,420 +0,0 @@
|
||||
"""
|
||||
Tests for the DECNET cross-decky correlation engine.
|
||||
|
||||
Covers:
|
||||
- RFC 5424 line parsing (parser.py)
|
||||
- Traversal graph data types (graph.py)
|
||||
- CorrelationEngine ingestion, querying, and reporting (engine.py)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import re
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from decnet.correlation.parser import LogEvent, parse_line
|
||||
from decnet.correlation.graph import AttackerTraversal, TraversalHop
|
||||
from decnet.correlation.engine import CorrelationEngine, _fmt_duration
|
||||
from decnet.logging.syslog_formatter import format_rfc5424, SEVERITY_INFO, SEVERITY_WARNING
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fixtures & helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_TS = "2026-04-04T10:00:00+00:00"
|
||||
_TS2 = "2026-04-04T10:05:00+00:00"
|
||||
_TS3 = "2026-04-04T10:10:00+00:00"
|
||||
|
||||
|
||||
def _make_line(
|
||||
service: str = "http",
|
||||
hostname: str = "decky-01",
|
||||
event_type: str = "connection",
|
||||
src_ip: str = "1.2.3.4",
|
||||
timestamp: str = _TS,
|
||||
extra_fields: dict | None = None,
|
||||
) -> str:
|
||||
"""Build a real RFC 5424 DECNET syslog line via the formatter."""
|
||||
fields = {}
|
||||
if src_ip:
|
||||
fields["src_ip"] = src_ip
|
||||
if extra_fields:
|
||||
fields.update(extra_fields)
|
||||
return format_rfc5424(
|
||||
service=service,
|
||||
hostname=hostname,
|
||||
event_type=event_type,
|
||||
severity=SEVERITY_INFO,
|
||||
timestamp=datetime.fromisoformat(timestamp),
|
||||
**fields,
|
||||
)
|
||||
|
||||
|
||||
def _make_line_src(hostname: str, src: str, timestamp: str = _TS) -> str:
|
||||
"""Build a line that uses `src` instead of `src_ip` (mssql style)."""
|
||||
return format_rfc5424(
|
||||
service="mssql",
|
||||
hostname=hostname,
|
||||
event_type="unknown_packet",
|
||||
severity=SEVERITY_INFO,
|
||||
timestamp=datetime.fromisoformat(timestamp),
|
||||
src=src,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# parser.py — parse_line
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestParserBasic:
|
||||
def test_returns_none_for_blank(self):
|
||||
assert parse_line("") is None
|
||||
assert parse_line(" ") is None
|
||||
|
||||
def test_returns_none_for_non_rfc5424(self):
|
||||
assert parse_line("this is not a syslog line") is None
|
||||
assert parse_line("Jan 1 00:00:00 host sshd: blah") is None
|
||||
|
||||
def test_returns_log_event(self):
|
||||
event = parse_line(_make_line())
|
||||
assert isinstance(event, LogEvent)
|
||||
|
||||
def test_hostname_extracted(self):
|
||||
event = parse_line(_make_line(hostname="decky-07"))
|
||||
assert event.decky == "decky-07"
|
||||
|
||||
def test_service_extracted(self):
|
||||
event = parse_line(_make_line(service="ftp"))
|
||||
assert event.service == "ftp"
|
||||
|
||||
def test_event_type_extracted(self):
|
||||
event = parse_line(_make_line(event_type="login_attempt"))
|
||||
assert event.event_type == "login_attempt"
|
||||
|
||||
def test_timestamp_parsed(self):
|
||||
event = parse_line(_make_line(timestamp=_TS))
|
||||
assert event.timestamp == datetime.fromisoformat(_TS)
|
||||
|
||||
def test_raw_line_preserved(self):
|
||||
line = _make_line()
|
||||
event = parse_line(line)
|
||||
assert event.raw == line.strip()
|
||||
|
||||
|
||||
class TestParserAttackerIP:
|
||||
def test_src_ip_field(self):
|
||||
event = parse_line(_make_line(src_ip="10.0.0.1"))
|
||||
assert event.attacker_ip == "10.0.0.1"
|
||||
|
||||
def test_src_field_fallback(self):
|
||||
"""mssql logs use `src` instead of `src_ip`."""
|
||||
event = parse_line(_make_line_src("decky-win", "192.168.1.5"))
|
||||
assert event.attacker_ip == "192.168.1.5"
|
||||
|
||||
def test_no_ip_field_gives_none(self):
|
||||
line = format_rfc5424("http", "decky-01", "startup", SEVERITY_INFO)
|
||||
event = parse_line(line)
|
||||
assert event is not None
|
||||
assert event.attacker_ip is None
|
||||
|
||||
def test_extra_fields_in_dict(self):
|
||||
event = parse_line(_make_line(extra_fields={"username": "root", "password": "admin"}))
|
||||
assert event.fields["username"] == "root"
|
||||
assert event.fields["password"] == "admin"
|
||||
|
||||
def test_src_ip_priority_over_src(self):
|
||||
"""src_ip should win when both are present."""
|
||||
line = format_rfc5424(
|
||||
"mssql", "decky-01", "evt", SEVERITY_INFO,
|
||||
timestamp=datetime.fromisoformat(_TS),
|
||||
src_ip="1.1.1.1",
|
||||
src="2.2.2.2",
|
||||
)
|
||||
event = parse_line(line)
|
||||
assert event.attacker_ip == "1.1.1.1"
|
||||
|
||||
def test_sd_escape_chars_decoded(self):
|
||||
"""Escaped characters in SD values should be unescaped."""
|
||||
line = format_rfc5424(
|
||||
"http", "decky-01", "evt", SEVERITY_INFO,
|
||||
timestamp=datetime.fromisoformat(_TS),
|
||||
src_ip="1.2.3.4",
|
||||
path='/search?q=a"b',
|
||||
)
|
||||
event = parse_line(line)
|
||||
assert '"' in event.fields["path"]
|
||||
|
||||
def test_nilvalue_hostname_skipped(self):
|
||||
line = format_rfc5424("-", "decky-01", "evt", SEVERITY_INFO)
|
||||
assert parse_line(line) is None
|
||||
|
||||
def test_nilvalue_service_skipped(self):
|
||||
line = format_rfc5424("http", "-", "evt", SEVERITY_INFO)
|
||||
assert parse_line(line) is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# graph.py — AttackerTraversal
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _make_traversal(ip: str, hops_spec: list[tuple]) -> AttackerTraversal:
|
||||
"""hops_spec: list of (ts_str, decky, service, event_type)"""
|
||||
hops = [
|
||||
TraversalHop(
|
||||
timestamp=datetime.fromisoformat(ts),
|
||||
decky=decky,
|
||||
service=svc,
|
||||
event_type=evt,
|
||||
)
|
||||
for ts, decky, svc, evt in hops_spec
|
||||
]
|
||||
return AttackerTraversal(attacker_ip=ip, hops=hops)
|
||||
|
||||
|
||||
class TestTraversalGraph:
|
||||
def setup_method(self):
|
||||
self.t = _make_traversal("5.6.7.8", [
|
||||
(_TS, "decky-01", "ssh", "login_attempt"),
|
||||
(_TS2, "decky-03", "http", "request"),
|
||||
(_TS3, "decky-05", "ftp", "auth_attempt"),
|
||||
])
|
||||
|
||||
def test_first_seen(self):
|
||||
assert self.t.first_seen == datetime.fromisoformat(_TS)
|
||||
|
||||
def test_last_seen(self):
|
||||
assert self.t.last_seen == datetime.fromisoformat(_TS3)
|
||||
|
||||
def test_duration_seconds(self):
|
||||
assert self.t.duration_seconds == 600.0
|
||||
|
||||
def test_deckies_ordered(self):
|
||||
assert self.t.deckies == ["decky-01", "decky-03", "decky-05"]
|
||||
|
||||
def test_decky_count(self):
|
||||
assert self.t.decky_count == 3
|
||||
|
||||
def test_path_string(self):
|
||||
assert self.t.path == "decky-01 → decky-03 → decky-05"
|
||||
|
||||
def test_to_dict_keys(self):
|
||||
d = self.t.to_dict()
|
||||
assert d["attacker_ip"] == "5.6.7.8"
|
||||
assert d["decky_count"] == 3
|
||||
assert d["hop_count"] == 3
|
||||
assert len(d["hops"]) == 3
|
||||
assert d["path"] == "decky-01 → decky-03 → decky-05"
|
||||
|
||||
def test_to_dict_hops_structure(self):
|
||||
hop = self.t.to_dict()["hops"][0]
|
||||
assert set(hop.keys()) == {"timestamp", "decky", "service", "event_type"}
|
||||
|
||||
def test_repeated_decky_not_double_counted_in_path(self):
|
||||
t = _make_traversal("1.1.1.1", [
|
||||
(_TS, "decky-01", "ssh", "conn"),
|
||||
(_TS2, "decky-02", "ftp", "conn"),
|
||||
(_TS3, "decky-01", "ssh", "conn"), # revisit
|
||||
])
|
||||
assert t.deckies == ["decky-01", "decky-02"]
|
||||
assert t.decky_count == 2
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# engine.py — CorrelationEngine
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestEngineIngestion:
|
||||
def test_ingest_returns_event(self):
|
||||
engine = CorrelationEngine()
|
||||
evt = engine.ingest(_make_line())
|
||||
assert evt is not None
|
||||
|
||||
def test_ingest_blank_returns_none(self):
|
||||
engine = CorrelationEngine()
|
||||
assert engine.ingest("") is None
|
||||
|
||||
def test_lines_parsed_counter(self):
|
||||
engine = CorrelationEngine()
|
||||
engine.ingest(_make_line())
|
||||
engine.ingest("garbage")
|
||||
assert engine.lines_parsed == 2
|
||||
|
||||
def test_events_indexed_counter(self):
|
||||
engine = CorrelationEngine()
|
||||
engine.ingest(_make_line(src_ip="1.2.3.4"))
|
||||
engine.ingest(_make_line(src_ip="")) # no IP
|
||||
assert engine.events_indexed == 1
|
||||
|
||||
def test_ingest_file(self, tmp_path):
|
||||
log = tmp_path / "decnet.log"
|
||||
lines = [
|
||||
_make_line("ssh", "decky-01", "conn", "10.0.0.1", _TS),
|
||||
_make_line("http", "decky-02", "req", "10.0.0.1", _TS2),
|
||||
_make_line("ftp", "decky-03", "auth", "10.0.0.1", _TS3),
|
||||
]
|
||||
log.write_text("\n".join(lines))
|
||||
engine = CorrelationEngine()
|
||||
count = engine.ingest_file(log)
|
||||
assert count == 3
|
||||
|
||||
|
||||
class TestEngineTraversals:
|
||||
def _engine_with(self, specs: list[tuple]) -> CorrelationEngine:
|
||||
"""specs: (service, decky, event_type, src_ip, timestamp)"""
|
||||
engine = CorrelationEngine()
|
||||
for svc, decky, evt, ip, ts in specs:
|
||||
engine.ingest(_make_line(svc, decky, evt, ip, ts))
|
||||
return engine
|
||||
|
||||
def test_single_decky_not_a_traversal(self):
|
||||
engine = self._engine_with([
|
||||
("ssh", "decky-01", "conn", "1.1.1.1", _TS),
|
||||
("ssh", "decky-01", "conn", "1.1.1.1", _TS2),
|
||||
])
|
||||
assert engine.traversals() == []
|
||||
|
||||
def test_two_deckies_is_traversal(self):
|
||||
engine = self._engine_with([
|
||||
("ssh", "decky-01", "conn", "1.1.1.1", _TS),
|
||||
("http", "decky-02", "req", "1.1.1.1", _TS2),
|
||||
])
|
||||
t = engine.traversals()
|
||||
assert len(t) == 1
|
||||
assert t[0].attacker_ip == "1.1.1.1"
|
||||
assert t[0].decky_count == 2
|
||||
|
||||
def test_min_deckies_filter(self):
|
||||
engine = self._engine_with([
|
||||
("ssh", "decky-01", "conn", "1.1.1.1", _TS),
|
||||
("http", "decky-02", "req", "1.1.1.1", _TS2),
|
||||
("ftp", "decky-03", "auth", "1.1.1.1", _TS3),
|
||||
])
|
||||
assert len(engine.traversals(min_deckies=3)) == 1
|
||||
assert len(engine.traversals(min_deckies=4)) == 0
|
||||
|
||||
def test_multiple_attackers_separate_traversals(self):
|
||||
engine = self._engine_with([
|
||||
("ssh", "decky-01", "conn", "1.1.1.1", _TS),
|
||||
("http", "decky-02", "req", "1.1.1.1", _TS2),
|
||||
("ssh", "decky-03", "conn", "9.9.9.9", _TS),
|
||||
("ftp", "decky-04", "auth", "9.9.9.9", _TS2),
|
||||
])
|
||||
traversals = engine.traversals()
|
||||
assert len(traversals) == 2
|
||||
ips = {t.attacker_ip for t in traversals}
|
||||
assert ips == {"1.1.1.1", "9.9.9.9"}
|
||||
|
||||
def test_traversals_sorted_by_first_seen(self):
|
||||
engine = self._engine_with([
|
||||
("ssh", "decky-01", "conn", "9.9.9.9", _TS2), # later
|
||||
("ftp", "decky-02", "auth", "9.9.9.9", _TS3),
|
||||
("http", "decky-03", "req", "1.1.1.1", _TS), # earlier
|
||||
("smb", "decky-04", "auth", "1.1.1.1", _TS2),
|
||||
])
|
||||
traversals = engine.traversals()
|
||||
assert traversals[0].attacker_ip == "1.1.1.1"
|
||||
assert traversals[1].attacker_ip == "9.9.9.9"
|
||||
|
||||
def test_hops_ordered_chronologically(self):
|
||||
engine = self._engine_with([
|
||||
("ftp", "decky-02", "auth", "5.5.5.5", _TS2), # ingested first but later ts
|
||||
("ssh", "decky-01", "conn", "5.5.5.5", _TS),
|
||||
])
|
||||
t = engine.traversals()[0]
|
||||
assert t.hops[0].decky == "decky-01"
|
||||
assert t.hops[1].decky == "decky-02"
|
||||
|
||||
def test_all_attackers(self):
|
||||
engine = self._engine_with([
|
||||
("ssh", "decky-01", "conn", "1.1.1.1", _TS),
|
||||
("ssh", "decky-01", "conn", "1.1.1.1", _TS2),
|
||||
("ssh", "decky-01", "conn", "2.2.2.2", _TS),
|
||||
])
|
||||
attackers = engine.all_attackers()
|
||||
assert attackers["1.1.1.1"] == 2
|
||||
assert attackers["2.2.2.2"] == 1
|
||||
|
||||
def test_mssql_src_field_correlated(self):
|
||||
"""Verify that `src=` (mssql style) is picked up for cross-decky correlation."""
|
||||
engine = CorrelationEngine()
|
||||
engine.ingest(_make_line_src("decky-win1", "10.10.10.5", _TS))
|
||||
engine.ingest(_make_line_src("decky-win2", "10.10.10.5", _TS2))
|
||||
t = engine.traversals()
|
||||
assert len(t) == 1
|
||||
assert t[0].decky_count == 2
|
||||
|
||||
|
||||
class TestEngineReporting:
|
||||
def _two_decky_engine(self) -> CorrelationEngine:
|
||||
engine = CorrelationEngine()
|
||||
engine.ingest(_make_line("ssh", "decky-01", "conn", "3.3.3.3", _TS))
|
||||
engine.ingest(_make_line("http", "decky-02", "req", "3.3.3.3", _TS2))
|
||||
return engine
|
||||
|
||||
def test_report_json_structure(self):
|
||||
engine = self._two_decky_engine()
|
||||
report = engine.report_json()
|
||||
assert "stats" in report
|
||||
assert "traversals" in report
|
||||
assert report["stats"]["traversals"] == 1
|
||||
t = report["traversals"][0]
|
||||
assert t["attacker_ip"] == "3.3.3.3"
|
||||
assert t["decky_count"] == 2
|
||||
|
||||
def test_report_json_serialisable(self):
|
||||
engine = self._two_decky_engine()
|
||||
# Should not raise
|
||||
json.dumps(engine.report_json())
|
||||
|
||||
def test_report_table_returns_rich_table(self):
|
||||
from rich.table import Table
|
||||
engine = self._two_decky_engine()
|
||||
table = engine.report_table()
|
||||
assert isinstance(table, Table)
|
||||
|
||||
def test_traversal_syslog_lines_count(self):
|
||||
engine = self._two_decky_engine()
|
||||
lines = engine.traversal_syslog_lines()
|
||||
assert len(lines) == 1
|
||||
|
||||
def test_traversal_syslog_line_is_rfc5424(self):
|
||||
engine = self._two_decky_engine()
|
||||
line = engine.traversal_syslog_lines()[0]
|
||||
# Must match RFC 5424 header
|
||||
assert re.match(r"^<\d+>1 \S+ \S+ correlator - traversal_detected", line)
|
||||
|
||||
def test_traversal_syslog_contains_attacker_ip(self):
|
||||
engine = self._two_decky_engine()
|
||||
line = engine.traversal_syslog_lines()[0]
|
||||
assert "3.3.3.3" in line
|
||||
|
||||
def test_traversal_syslog_severity_is_warning(self):
|
||||
engine = self._two_decky_engine()
|
||||
line = engine.traversal_syslog_lines()[0]
|
||||
pri = int(re.match(r"^<(\d+)>", line).group(1))
|
||||
assert pri == 16 * 8 + SEVERITY_WARNING # local0 + warning
|
||||
|
||||
def test_no_traversals_empty_json(self):
|
||||
engine = CorrelationEngine()
|
||||
engine.ingest(_make_line()) # single decky, no traversal
|
||||
assert engine.report_json()["stats"]["traversals"] == 0
|
||||
assert engine.traversal_syslog_lines() == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _fmt_duration helper
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestFmtDuration:
|
||||
def test_seconds(self):
|
||||
assert _fmt_duration(45) == "45s"
|
||||
|
||||
def test_minutes(self):
|
||||
assert _fmt_duration(90) == "1.5m"
|
||||
|
||||
def test_hours(self):
|
||||
assert _fmt_duration(7200) == "2.0h"
|
||||
@@ -1,71 +0,0 @@
|
||||
"""Tests for the syslog file handler."""
|
||||
|
||||
import logging
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
import decnet.logging.file_handler as fh
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def reset_handler(tmp_path, monkeypatch):
|
||||
"""Reset the module-level logger between tests."""
|
||||
monkeypatch.setattr(fh, "_handler", None)
|
||||
monkeypatch.setattr(fh, "_logger", None)
|
||||
monkeypatch.setenv(fh._LOG_FILE_ENV, str(tmp_path / "test.log"))
|
||||
yield
|
||||
# Remove handlers to avoid file lock issues on next test
|
||||
if fh._logger is not None:
|
||||
for h in list(fh._logger.handlers):
|
||||
h.close()
|
||||
fh._logger.removeHandler(h)
|
||||
fh._handler = None
|
||||
fh._logger = None
|
||||
|
||||
|
||||
def test_write_creates_log_file(tmp_path):
|
||||
log_path = tmp_path / "decnet.log"
|
||||
os.environ[fh._LOG_FILE_ENV] = str(log_path)
|
||||
fh.write_syslog("<134>1 2026-04-04T12:00:00+00:00 h svc - e - test message")
|
||||
assert log_path.exists()
|
||||
assert "test message" in log_path.read_text()
|
||||
|
||||
|
||||
def test_write_appends_multiple_lines(tmp_path):
|
||||
log_path = tmp_path / "decnet.log"
|
||||
os.environ[fh._LOG_FILE_ENV] = str(log_path)
|
||||
for i in range(3):
|
||||
fh.write_syslog(f"<134>1 ts host svc - event{i} -")
|
||||
lines = log_path.read_text().splitlines()
|
||||
assert len(lines) == 3
|
||||
assert "event0" in lines[0]
|
||||
assert "event2" in lines[2]
|
||||
|
||||
|
||||
def test_get_log_path_default(monkeypatch):
|
||||
monkeypatch.delenv(fh._LOG_FILE_ENV, raising=False)
|
||||
assert fh.get_log_path() == Path(fh._DEFAULT_LOG_FILE)
|
||||
|
||||
|
||||
def test_get_log_path_custom(monkeypatch, tmp_path):
|
||||
custom = str(tmp_path / "custom.log")
|
||||
monkeypatch.setenv(fh._LOG_FILE_ENV, custom)
|
||||
assert fh.get_log_path() == Path(custom)
|
||||
|
||||
|
||||
def test_rotating_handler_configured(tmp_path):
|
||||
log_path = tmp_path / "r.log"
|
||||
os.environ[fh._LOG_FILE_ENV] = str(log_path)
|
||||
logger = fh._get_logger()
|
||||
handler = logger.handlers[0]
|
||||
assert isinstance(handler, logging.handlers.RotatingFileHandler)
|
||||
assert handler.maxBytes == fh._MAX_BYTES
|
||||
assert handler.backupCount == fh._BACKUP_COUNT
|
||||
|
||||
|
||||
def test_write_syslog_does_not_raise_on_bad_path(monkeypatch):
|
||||
monkeypatch.setenv(fh._LOG_FILE_ENV, "/no/such/dir/that/exists/decnet.log")
|
||||
# Should not raise — falls back to StreamHandler
|
||||
fh.write_syslog("<134>1 ts h svc - e -")
|
||||
@@ -1,217 +0,0 @@
|
||||
"""
|
||||
Tests for the INI loader — subsection parsing, custom service definitions,
|
||||
and per-service config propagation.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
import textwrap
|
||||
from pathlib import Path
|
||||
from decnet.ini_loader import load_ini, IniConfig
|
||||
|
||||
|
||||
def _write_ini(tmp_path: Path, content: str) -> Path:
|
||||
f = tmp_path / "decnet.ini"
|
||||
f.write_text(textwrap.dedent(content))
|
||||
return f
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Basic decky parsing (regression)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_basic_decky_parsed(tmp_path):
|
||||
ini_file = _write_ini(tmp_path, """
|
||||
[general]
|
||||
net = 192.168.1.0/24
|
||||
gw = 192.168.1.1
|
||||
|
||||
[decky-01]
|
||||
ip = 192.168.1.101
|
||||
services = ssh, http
|
||||
""")
|
||||
cfg = load_ini(ini_file)
|
||||
assert len(cfg.deckies) == 1
|
||||
assert cfg.deckies[0].name == "decky-01"
|
||||
assert cfg.deckies[0].services == ["ssh", "http"]
|
||||
assert cfg.deckies[0].service_config == {}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Per-service subsection parsing
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_subsection_parsed_into_service_config(tmp_path):
|
||||
ini_file = _write_ini(tmp_path, """
|
||||
[decky-01]
|
||||
ip = 192.168.1.101
|
||||
services = ssh
|
||||
|
||||
[decky-01.ssh]
|
||||
kernel_version = 5.15.0-76-generic
|
||||
hardware_platform = x86_64
|
||||
""")
|
||||
cfg = load_ini(ini_file)
|
||||
svc_cfg = cfg.deckies[0].service_config
|
||||
assert "ssh" in svc_cfg
|
||||
assert svc_cfg["ssh"]["kernel_version"] == "5.15.0-76-generic"
|
||||
assert svc_cfg["ssh"]["hardware_platform"] == "x86_64"
|
||||
|
||||
|
||||
def test_multiple_subsections_for_same_decky(tmp_path):
|
||||
ini_file = _write_ini(tmp_path, """
|
||||
[decky-01]
|
||||
services = ssh, http
|
||||
|
||||
[decky-01.ssh]
|
||||
users = root:toor
|
||||
|
||||
[decky-01.http]
|
||||
server_header = nginx/1.18.0
|
||||
fake_app = wordpress
|
||||
""")
|
||||
cfg = load_ini(ini_file)
|
||||
svc_cfg = cfg.deckies[0].service_config
|
||||
assert svc_cfg["ssh"]["users"] == "root:toor"
|
||||
assert svc_cfg["http"]["server_header"] == "nginx/1.18.0"
|
||||
assert svc_cfg["http"]["fake_app"] == "wordpress"
|
||||
|
||||
|
||||
def test_subsection_for_unknown_decky_is_ignored(tmp_path):
|
||||
ini_file = _write_ini(tmp_path, """
|
||||
[decky-01]
|
||||
services = ssh
|
||||
|
||||
[ghost.ssh]
|
||||
kernel_version = 5.15.0
|
||||
""")
|
||||
cfg = load_ini(ini_file)
|
||||
# ghost.ssh must not create a new decky or error out
|
||||
assert len(cfg.deckies) == 1
|
||||
assert cfg.deckies[0].name == "decky-01"
|
||||
assert cfg.deckies[0].service_config == {}
|
||||
|
||||
|
||||
def test_plain_decky_without_subsections_has_empty_service_config(tmp_path):
|
||||
ini_file = _write_ini(tmp_path, """
|
||||
[decky-01]
|
||||
services = http
|
||||
""")
|
||||
cfg = load_ini(ini_file)
|
||||
assert cfg.deckies[0].service_config == {}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Bring-your-own service (BYOS) parsing
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_custom_service_parsed(tmp_path):
|
||||
ini_file = _write_ini(tmp_path, """
|
||||
[general]
|
||||
net = 10.0.0.0/24
|
||||
gw = 10.0.0.1
|
||||
|
||||
[custom-myservice]
|
||||
binary = my-image:latest
|
||||
exec = /usr/bin/myapp -p 8080
|
||||
ports = 8080
|
||||
""")
|
||||
cfg = load_ini(ini_file)
|
||||
assert len(cfg.custom_services) == 1
|
||||
cs = cfg.custom_services[0]
|
||||
assert cs.name == "myservice"
|
||||
assert cs.image == "my-image:latest"
|
||||
assert cs.exec_cmd == "/usr/bin/myapp -p 8080"
|
||||
assert cs.ports == [8080]
|
||||
|
||||
|
||||
def test_custom_service_without_ports(tmp_path):
|
||||
ini_file = _write_ini(tmp_path, """
|
||||
[custom-scanner]
|
||||
binary = scanner:1.0
|
||||
exec = /usr/bin/scanner
|
||||
""")
|
||||
cfg = load_ini(ini_file)
|
||||
assert cfg.custom_services[0].ports == []
|
||||
|
||||
|
||||
def test_custom_service_not_added_to_deckies(tmp_path):
|
||||
ini_file = _write_ini(tmp_path, """
|
||||
[decky-01]
|
||||
services = ssh
|
||||
|
||||
[custom-myservice]
|
||||
binary = foo:bar
|
||||
exec = /bin/foo
|
||||
""")
|
||||
cfg = load_ini(ini_file)
|
||||
assert len(cfg.deckies) == 1
|
||||
assert cfg.deckies[0].name == "decky-01"
|
||||
assert len(cfg.custom_services) == 1
|
||||
|
||||
|
||||
def test_no_custom_services_gives_empty_list(tmp_path):
|
||||
ini_file = _write_ini(tmp_path, """
|
||||
[decky-01]
|
||||
services = http
|
||||
""")
|
||||
cfg = load_ini(ini_file)
|
||||
assert cfg.custom_services == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# nmap_os parsing
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_nmap_os_parsed_from_ini(tmp_path):
|
||||
ini_file = _write_ini(tmp_path, """
|
||||
[decky-win]
|
||||
ip = 192.168.1.101
|
||||
services = rdp, smb
|
||||
nmap_os = windows
|
||||
""")
|
||||
cfg = load_ini(ini_file)
|
||||
assert cfg.deckies[0].nmap_os == "windows"
|
||||
|
||||
|
||||
def test_nmap_os_defaults_to_none_when_absent(tmp_path):
|
||||
ini_file = _write_ini(tmp_path, """
|
||||
[decky-01]
|
||||
services = ssh
|
||||
""")
|
||||
cfg = load_ini(ini_file)
|
||||
assert cfg.deckies[0].nmap_os is None
|
||||
|
||||
|
||||
@pytest.mark.parametrize("os_family", ["linux", "windows", "bsd", "embedded", "cisco"])
|
||||
def test_nmap_os_all_families_accepted(tmp_path, os_family):
|
||||
ini_file = _write_ini(tmp_path, f"""
|
||||
[decky-01]
|
||||
services = ssh
|
||||
nmap_os = {os_family}
|
||||
""")
|
||||
cfg = load_ini(ini_file)
|
||||
assert cfg.deckies[0].nmap_os == os_family
|
||||
|
||||
|
||||
def test_nmap_os_propagates_to_amount_expanded_deckies(tmp_path):
|
||||
ini_file = _write_ini(tmp_path, """
|
||||
[corp-printers]
|
||||
services = snmp
|
||||
nmap_os = embedded
|
||||
amount = 3
|
||||
""")
|
||||
cfg = load_ini(ini_file)
|
||||
assert len(cfg.deckies) == 3
|
||||
for d in cfg.deckies:
|
||||
assert d.nmap_os == "embedded"
|
||||
|
||||
|
||||
def test_nmap_os_hyphen_alias_accepted(tmp_path):
|
||||
"""nmap-os= (hyphen) should work as an alias for nmap_os=."""
|
||||
ini_file = _write_ini(tmp_path, """
|
||||
[decky-01]
|
||||
services = ssh
|
||||
nmap-os = bsd
|
||||
""")
|
||||
cfg = load_ini(ini_file)
|
||||
assert cfg.deckies[0].nmap_os == "bsd"
|
||||
@@ -1,97 +0,0 @@
|
||||
"""Tests for log_file volume mount in compose generation."""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from decnet.composer import _CONTAINER_LOG_DIR, _resolve_log_file, generate_compose
|
||||
from decnet.config import DeckyConfig, DecnetConfig
|
||||
from decnet.distros import DISTROS
|
||||
|
||||
|
||||
def _make_config(log_file: str | None = None) -> DecnetConfig:
|
||||
profile = DISTROS["debian"]
|
||||
decky = DeckyConfig(
|
||||
name="decky-01",
|
||||
ip="10.0.0.10",
|
||||
services=["http"],
|
||||
distro="debian",
|
||||
base_image=profile.image,
|
||||
build_base=profile.build_base,
|
||||
hostname="test-host",
|
||||
)
|
||||
return DecnetConfig(
|
||||
mode="unihost",
|
||||
interface="eth0",
|
||||
subnet="10.0.0.0/24",
|
||||
gateway="10.0.0.1",
|
||||
deckies=[decky],
|
||||
log_file=log_file,
|
||||
)
|
||||
|
||||
|
||||
class TestResolveLogFile:
|
||||
def test_absolute_path(self, tmp_path):
|
||||
log_path = str(tmp_path / "decnet.log")
|
||||
host_dir, container_path = _resolve_log_file(log_path)
|
||||
assert host_dir == str(tmp_path)
|
||||
assert container_path == f"{_CONTAINER_LOG_DIR}/decnet.log"
|
||||
|
||||
def test_relative_path_resolves_to_absolute(self):
|
||||
host_dir, container_path = _resolve_log_file("decnet.log")
|
||||
assert Path(host_dir).is_absolute()
|
||||
assert container_path == f"{_CONTAINER_LOG_DIR}/decnet.log"
|
||||
|
||||
def test_nested_filename_preserved(self, tmp_path):
|
||||
log_path = str(tmp_path / "logs" / "honeypot.log")
|
||||
_, container_path = _resolve_log_file(log_path)
|
||||
assert container_path.endswith("honeypot.log")
|
||||
|
||||
|
||||
class TestComposeLogFileMount:
|
||||
def test_no_log_file_no_volume(self):
|
||||
config = _make_config(log_file=None)
|
||||
compose = generate_compose(config)
|
||||
fragment = compose["services"]["decky-01-http"]
|
||||
assert "DECNET_LOG_FILE" not in fragment.get("environment", {})
|
||||
volumes = fragment.get("volumes", [])
|
||||
assert not any(_CONTAINER_LOG_DIR in v for v in volumes)
|
||||
|
||||
def test_log_file_sets_env_var(self, tmp_path):
|
||||
config = _make_config(log_file=str(tmp_path / "decnet.log"))
|
||||
compose = generate_compose(config)
|
||||
fragment = compose["services"]["decky-01-http"]
|
||||
env = fragment["environment"]
|
||||
assert "DECNET_LOG_FILE" in env
|
||||
assert env["DECNET_LOG_FILE"].startswith(_CONTAINER_LOG_DIR)
|
||||
assert env["DECNET_LOG_FILE"].endswith("decnet.log")
|
||||
|
||||
def test_log_file_adds_volume_mount(self, tmp_path):
|
||||
config = _make_config(log_file=str(tmp_path / "decnet.log"))
|
||||
compose = generate_compose(config)
|
||||
fragment = compose["services"]["decky-01-http"]
|
||||
volumes = fragment.get("volumes", [])
|
||||
assert any(_CONTAINER_LOG_DIR in v for v in volumes)
|
||||
|
||||
def test_volume_mount_format(self, tmp_path):
|
||||
config = _make_config(log_file=str(tmp_path / "decnet.log"))
|
||||
compose = generate_compose(config)
|
||||
fragment = compose["services"]["decky-01-http"]
|
||||
mount = next(v for v in fragment["volumes"] if _CONTAINER_LOG_DIR in v)
|
||||
host_part, container_part = mount.split(":")
|
||||
assert Path(host_part).is_absolute()
|
||||
assert container_part == _CONTAINER_LOG_DIR
|
||||
|
||||
def test_host_log_dir_created(self, tmp_path):
|
||||
log_dir = tmp_path / "newdir"
|
||||
config = _make_config(log_file=str(log_dir / "decnet.log"))
|
||||
generate_compose(config)
|
||||
assert log_dir.exists()
|
||||
|
||||
def test_volume_not_duplicated(self, tmp_path):
|
||||
"""Same mount must not appear twice even if fragment already has volumes."""
|
||||
config = _make_config(log_file=str(tmp_path / "decnet.log"))
|
||||
compose = generate_compose(config)
|
||||
fragment = compose["services"]["decky-01-http"]
|
||||
log_mounts = [v for v in fragment["volumes"] if _CONTAINER_LOG_DIR in v]
|
||||
assert len(log_mounts) == 1
|
||||
@@ -1,196 +0,0 @@
|
||||
"""
|
||||
Tests for decnet.network utility functions.
|
||||
"""
|
||||
|
||||
from unittest.mock import MagicMock, call, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from decnet.network import (
|
||||
HOST_IPVLAN_IFACE,
|
||||
HOST_MACVLAN_IFACE,
|
||||
MACVLAN_NETWORK_NAME,
|
||||
create_ipvlan_network,
|
||||
create_macvlan_network,
|
||||
ips_to_range,
|
||||
setup_host_ipvlan,
|
||||
setup_host_macvlan,
|
||||
teardown_host_ipvlan,
|
||||
teardown_host_macvlan,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ips_to_range
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestIpsToRange:
|
||||
def test_single_ip(self):
|
||||
assert ips_to_range(["192.168.1.100"]) == "192.168.1.100/32"
|
||||
|
||||
def test_consecutive_small_range(self):
|
||||
# .97–.101: max^min = 4, bit_length=3, prefix=29 → .96/29
|
||||
result = ips_to_range([f"192.168.1.{i}" for i in range(97, 102)])
|
||||
from ipaddress import IPv4Network, IPv4Address
|
||||
net = IPv4Network(result)
|
||||
for i in range(97, 102):
|
||||
assert IPv4Address(f"192.168.1.{i}") in net
|
||||
|
||||
def test_range_crossing_cidr_boundary(self):
|
||||
# .110–.119 crosses the /28 boundary (.96–.111 vs .112–.127)
|
||||
# Subtraction gives /28 (wrong), XOR gives /27 (correct)
|
||||
ips = [f"192.168.1.{i}" for i in range(110, 120)]
|
||||
result = ips_to_range(ips)
|
||||
from ipaddress import IPv4Network, IPv4Address
|
||||
net = IPv4Network(result)
|
||||
for i in range(110, 120):
|
||||
assert IPv4Address(f"192.168.1.{i}") in net, (
|
||||
f"192.168.1.{i} not in computed range {result}"
|
||||
)
|
||||
|
||||
def test_all_ips_covered(self):
|
||||
# Larger spread: .10–.200
|
||||
ips = [f"10.0.0.{i}" for i in range(10, 201)]
|
||||
result = ips_to_range(ips)
|
||||
from ipaddress import IPv4Network, IPv4Address
|
||||
net = IPv4Network(result)
|
||||
for i in range(10, 201):
|
||||
assert IPv4Address(f"10.0.0.{i}") in net
|
||||
|
||||
def test_two_ips_same_cidr(self):
|
||||
# .100 and .101 share /31
|
||||
result = ips_to_range(["192.168.1.100", "192.168.1.101"])
|
||||
from ipaddress import IPv4Network, IPv4Address
|
||||
net = IPv4Network(result)
|
||||
assert IPv4Address("192.168.1.100") in net
|
||||
assert IPv4Address("192.168.1.101") in net
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# create_macvlan_network
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCreateMacvlanNetwork:
|
||||
def _make_client(self, existing=None):
|
||||
client = MagicMock()
|
||||
nets = [MagicMock(name=n) for n in (existing or [])]
|
||||
for net, n in zip(nets, (existing or [])):
|
||||
net.name = n
|
||||
client.networks.list.return_value = nets
|
||||
return client
|
||||
|
||||
def test_creates_network_when_absent(self):
|
||||
client = self._make_client([])
|
||||
create_macvlan_network(client, "eth0", "192.168.1.0/24", "192.168.1.1", "192.168.1.96/27")
|
||||
client.networks.create.assert_called_once()
|
||||
kwargs = client.networks.create.call_args
|
||||
assert kwargs[1]["driver"] == "macvlan"
|
||||
assert kwargs[1]["name"] == MACVLAN_NETWORK_NAME
|
||||
assert kwargs[1]["options"]["parent"] == "eth0"
|
||||
|
||||
def test_noop_when_network_exists(self):
|
||||
client = self._make_client([MACVLAN_NETWORK_NAME])
|
||||
create_macvlan_network(client, "eth0", "192.168.1.0/24", "192.168.1.1", "192.168.1.96/27")
|
||||
client.networks.create.assert_not_called()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# create_ipvlan_network
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestCreateIpvlanNetwork:
|
||||
def _make_client(self, existing=None):
|
||||
client = MagicMock()
|
||||
nets = [MagicMock(name=n) for n in (existing or [])]
|
||||
for net, n in zip(nets, (existing or [])):
|
||||
net.name = n
|
||||
client.networks.list.return_value = nets
|
||||
return client
|
||||
|
||||
def test_creates_ipvlan_network(self):
|
||||
client = self._make_client([])
|
||||
create_ipvlan_network(client, "wlan0", "192.168.1.0/24", "192.168.1.1", "192.168.1.96/27")
|
||||
client.networks.create.assert_called_once()
|
||||
kwargs = client.networks.create.call_args
|
||||
assert kwargs[1]["driver"] == "ipvlan"
|
||||
assert kwargs[1]["options"]["parent"] == "wlan0"
|
||||
assert kwargs[1]["options"]["ipvlan_mode"] == "l2"
|
||||
|
||||
def test_noop_when_network_exists(self):
|
||||
client = self._make_client([MACVLAN_NETWORK_NAME])
|
||||
create_ipvlan_network(client, "wlan0", "192.168.1.0/24", "192.168.1.1", "192.168.1.96/27")
|
||||
client.networks.create.assert_not_called()
|
||||
|
||||
def test_uses_same_network_name_as_macvlan(self):
|
||||
"""Both drivers share the same logical network name so compose files are identical."""
|
||||
client = self._make_client([])
|
||||
create_ipvlan_network(client, "wlan0", "192.168.1.0/24", "192.168.1.1", "192.168.1.96/27")
|
||||
assert client.networks.create.call_args[1]["name"] == MACVLAN_NETWORK_NAME
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# setup_host_macvlan / teardown_host_macvlan
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestSetupHostMacvlan:
|
||||
@patch("decnet.network.os.geteuid", return_value=0)
|
||||
@patch("decnet.network._run")
|
||||
def test_creates_interface_when_absent(self, mock_run, _):
|
||||
# Simulate interface not existing (returncode != 0)
|
||||
mock_run.side_effect = lambda cmd, **kw: MagicMock(returncode=1) if "show" in cmd else MagicMock(returncode=0)
|
||||
setup_host_macvlan("eth0", "192.168.1.5", "192.168.1.96/27")
|
||||
calls = [str(c) for c in mock_run.call_args_list]
|
||||
assert any("macvlan" in c for c in calls)
|
||||
assert any("mode" in c and "bridge" in c for c in calls)
|
||||
|
||||
@patch("decnet.network.os.geteuid", return_value=0)
|
||||
@patch("decnet.network._run")
|
||||
def test_skips_create_when_interface_exists(self, mock_run, _):
|
||||
mock_run.return_value = MagicMock(returncode=0)
|
||||
setup_host_macvlan("eth0", "192.168.1.5", "192.168.1.96/27")
|
||||
calls = [c[0][0] for c in mock_run.call_args_list]
|
||||
# "ip link add <iface> link ..." should not be called when iface exists
|
||||
assert not any("link" in cmd and "add" in cmd and HOST_MACVLAN_IFACE in cmd for cmd in calls)
|
||||
|
||||
@patch("decnet.network.os.geteuid", return_value=1)
|
||||
def test_requires_root(self, _):
|
||||
with pytest.raises(PermissionError):
|
||||
setup_host_macvlan("eth0", "192.168.1.5", "192.168.1.96/27")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# setup_host_ipvlan / teardown_host_ipvlan
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestSetupHostIpvlan:
|
||||
@patch("decnet.network.os.geteuid", return_value=0)
|
||||
@patch("decnet.network._run")
|
||||
def test_creates_ipvlan_interface(self, mock_run, _):
|
||||
mock_run.side_effect = lambda cmd, **kw: MagicMock(returncode=1) if "show" in cmd else MagicMock(returncode=0)
|
||||
setup_host_ipvlan("wlan0", "192.168.1.5", "192.168.1.96/27")
|
||||
calls = [str(c) for c in mock_run.call_args_list]
|
||||
assert any("ipvlan" in c for c in calls)
|
||||
assert any("mode" in c and "l2" in c for c in calls)
|
||||
|
||||
@patch("decnet.network.os.geteuid", return_value=0)
|
||||
@patch("decnet.network._run")
|
||||
def test_uses_ipvlan_iface_name(self, mock_run, _):
|
||||
mock_run.side_effect = lambda cmd, **kw: MagicMock(returncode=1) if "show" in cmd else MagicMock(returncode=0)
|
||||
setup_host_ipvlan("wlan0", "192.168.1.5", "192.168.1.96/27")
|
||||
calls = [str(c) for c in mock_run.call_args_list]
|
||||
assert any(HOST_IPVLAN_IFACE in c for c in calls)
|
||||
assert not any(HOST_MACVLAN_IFACE in c for c in calls)
|
||||
|
||||
@patch("decnet.network.os.geteuid", return_value=1)
|
||||
def test_requires_root(self, _):
|
||||
with pytest.raises(PermissionError):
|
||||
setup_host_ipvlan("wlan0", "192.168.1.5", "192.168.1.96/27")
|
||||
|
||||
@patch("decnet.network.os.geteuid", return_value=0)
|
||||
@patch("decnet.network._run")
|
||||
def test_teardown_uses_ipvlan_iface(self, mock_run, _):
|
||||
mock_run.return_value = MagicMock(returncode=0)
|
||||
teardown_host_ipvlan("192.168.1.96/27")
|
||||
calls = [str(c) for c in mock_run.call_args_list]
|
||||
assert any(HOST_IPVLAN_IFACE in c for c in calls)
|
||||
assert not any(HOST_MACVLAN_IFACE in c for c in calls)
|
||||
@@ -1,248 +0,0 @@
|
||||
"""
|
||||
Tests for the OS TCP/IP fingerprint spoof feature.
|
||||
|
||||
Covers:
|
||||
- os_fingerprint.py: profiles, TTL values, fallback behaviour
|
||||
- archetypes.py: every archetype has a valid nmap_os
|
||||
- config.py: DeckyConfig carries nmap_os
|
||||
- composer.py: base container gets sysctls + cap_add injected
|
||||
- cli.py helpers: nmap_os propagated from archetype → DeckyConfig
|
||||
"""
|
||||
|
||||
import pytest
|
||||
|
||||
from decnet.archetypes import ARCHETYPES, all_archetypes
|
||||
from decnet.composer import generate_compose
|
||||
from decnet.config import DeckyConfig, DecnetConfig
|
||||
from decnet.os_fingerprint import OS_SYSCTLS, all_os_families, get_os_sysctls
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# os_fingerprint module
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_linux_ttl_is_64():
|
||||
assert get_os_sysctls("linux")["net.ipv4.ip_default_ttl"] == "64"
|
||||
|
||||
|
||||
def test_windows_ttl_is_128():
|
||||
assert get_os_sysctls("windows")["net.ipv4.ip_default_ttl"] == "128"
|
||||
|
||||
|
||||
def test_embedded_ttl_is_255():
|
||||
assert get_os_sysctls("embedded")["net.ipv4.ip_default_ttl"] == "255"
|
||||
|
||||
|
||||
def test_cisco_ttl_is_255():
|
||||
assert get_os_sysctls("cisco")["net.ipv4.ip_default_ttl"] == "255"
|
||||
|
||||
|
||||
def test_bsd_ttl_is_64():
|
||||
assert get_os_sysctls("bsd")["net.ipv4.ip_default_ttl"] == "64"
|
||||
|
||||
|
||||
def test_unknown_os_falls_back_to_linux():
|
||||
result = get_os_sysctls("nonexistent-os")
|
||||
assert result == get_os_sysctls("linux")
|
||||
|
||||
|
||||
def test_get_os_sysctls_returns_copy():
|
||||
"""Mutating the returned dict must not alter the master profile."""
|
||||
s = get_os_sysctls("windows")
|
||||
s["net.ipv4.ip_default_ttl"] = "999"
|
||||
assert OS_SYSCTLS["windows"]["net.ipv4.ip_default_ttl"] == "128"
|
||||
|
||||
|
||||
def test_all_os_families_non_empty():
|
||||
families = all_os_families()
|
||||
assert len(families) > 0
|
||||
assert "linux" in families
|
||||
assert "windows" in families
|
||||
assert "embedded" in families
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Archetypes carry valid nmap_os values
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.parametrize("slug,arch", list(ARCHETYPES.items()))
|
||||
def test_archetype_nmap_os_is_known(slug, arch):
|
||||
assert arch.nmap_os in all_os_families(), (
|
||||
f"Archetype '{slug}' has nmap_os='{arch.nmap_os}' which is not in OS_SYSCTLS"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("slug", ["windows-workstation", "windows-server", "domain-controller"])
|
||||
def test_windows_archetypes_have_windows_nmap_os(slug):
|
||||
assert ARCHETYPES[slug].nmap_os == "windows"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("slug", ["printer", "iot-device", "industrial-control"])
|
||||
def test_embedded_archetypes_have_embedded_nmap_os(slug):
|
||||
assert ARCHETYPES[slug].nmap_os == "embedded"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("slug", ["linux-server", "web-server", "database-server",
|
||||
"mail-server", "file-server", "voip-server",
|
||||
"monitoring-node", "devops-host"])
|
||||
def test_linux_archetypes_have_linux_nmap_os(slug):
|
||||
assert ARCHETYPES[slug].nmap_os == "linux"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# DeckyConfig default
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _make_decky(nmap_os: str = "linux") -> DeckyConfig:
|
||||
return DeckyConfig(
|
||||
name="decky-01",
|
||||
ip="10.0.0.10",
|
||||
services=["ssh"],
|
||||
distro="debian",
|
||||
base_image="debian:bookworm-slim",
|
||||
build_base="debian:bookworm-slim",
|
||||
hostname="test-host",
|
||||
nmap_os=nmap_os,
|
||||
)
|
||||
|
||||
|
||||
def test_deckyconfig_default_nmap_os_is_linux():
|
||||
cfg = DeckyConfig(
|
||||
name="decky-01",
|
||||
ip="10.0.0.10",
|
||||
services=["ssh"],
|
||||
distro="debian",
|
||||
base_image="debian:bookworm-slim",
|
||||
build_base="debian:bookworm-slim",
|
||||
hostname="test-host",
|
||||
)
|
||||
assert cfg.nmap_os == "linux"
|
||||
|
||||
|
||||
def test_deckyconfig_accepts_custom_nmap_os():
|
||||
cfg = _make_decky(nmap_os="windows")
|
||||
assert cfg.nmap_os == "windows"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Composer injects sysctls + cap_add into base container
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _make_config(nmap_os: str = "linux") -> DecnetConfig:
|
||||
return DecnetConfig(
|
||||
mode="unihost",
|
||||
interface="eth0",
|
||||
subnet="10.0.0.0/24",
|
||||
gateway="10.0.0.1",
|
||||
deckies=[_make_decky(nmap_os=nmap_os)],
|
||||
)
|
||||
|
||||
|
||||
def test_compose_base_has_sysctls():
|
||||
compose = generate_compose(_make_config("linux"))
|
||||
base = compose["services"]["decky-01"]
|
||||
assert "sysctls" in base
|
||||
|
||||
|
||||
def test_compose_base_has_cap_net_admin():
|
||||
compose = generate_compose(_make_config("linux"))
|
||||
base = compose["services"]["decky-01"]
|
||||
assert "cap_add" in base
|
||||
assert "NET_ADMIN" in base["cap_add"]
|
||||
|
||||
|
||||
def test_compose_linux_ttl_64():
|
||||
compose = generate_compose(_make_config("linux"))
|
||||
sysctls = compose["services"]["decky-01"]["sysctls"]
|
||||
assert sysctls["net.ipv4.ip_default_ttl"] == "64"
|
||||
|
||||
|
||||
def test_compose_windows_ttl_128():
|
||||
compose = generate_compose(_make_config("windows"))
|
||||
sysctls = compose["services"]["decky-01"]["sysctls"]
|
||||
assert sysctls["net.ipv4.ip_default_ttl"] == "128"
|
||||
|
||||
|
||||
def test_compose_embedded_ttl_255():
|
||||
compose = generate_compose(_make_config("embedded"))
|
||||
sysctls = compose["services"]["decky-01"]["sysctls"]
|
||||
assert sysctls["net.ipv4.ip_default_ttl"] == "255"
|
||||
|
||||
|
||||
def test_compose_service_containers_have_no_sysctls():
|
||||
"""Service containers share the base network namespace — no sysctls needed there."""
|
||||
compose = generate_compose(_make_config("windows"))
|
||||
svc = compose["services"]["decky-01-ssh"]
|
||||
assert "sysctls" not in svc
|
||||
|
||||
|
||||
def test_compose_two_deckies_independent_nmap_os():
|
||||
"""Each decky gets its own OS profile."""
|
||||
decky_win = _make_decky(nmap_os="windows")
|
||||
decky_lin = DeckyConfig(
|
||||
name="decky-02",
|
||||
ip="10.0.0.11",
|
||||
services=["ssh"],
|
||||
distro="debian",
|
||||
base_image="debian:bookworm-slim",
|
||||
build_base="debian:bookworm-slim",
|
||||
hostname="test-host-2",
|
||||
nmap_os="linux",
|
||||
)
|
||||
config = DecnetConfig(
|
||||
mode="unihost",
|
||||
interface="eth0",
|
||||
subnet="10.0.0.0/24",
|
||||
gateway="10.0.0.1",
|
||||
deckies=[decky_win, decky_lin],
|
||||
)
|
||||
compose = generate_compose(config)
|
||||
assert compose["services"]["decky-01"]["sysctls"]["net.ipv4.ip_default_ttl"] == "128"
|
||||
assert compose["services"]["decky-02"]["sysctls"]["net.ipv4.ip_default_ttl"] == "64"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# CLI helper: nmap_os flows from archetype into DeckyConfig
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_build_deckies_windows_archetype_sets_nmap_os():
|
||||
from decnet.archetypes import get_archetype
|
||||
from decnet.cli import _build_deckies
|
||||
|
||||
arch = get_archetype("windows-workstation")
|
||||
deckies = _build_deckies(
|
||||
n=1,
|
||||
ips=["10.0.0.20"],
|
||||
services_explicit=None,
|
||||
randomize_services=False,
|
||||
archetype=arch,
|
||||
)
|
||||
assert deckies[0].nmap_os == "windows"
|
||||
|
||||
|
||||
def test_build_deckies_no_archetype_defaults_linux():
|
||||
from decnet.cli import _build_deckies
|
||||
|
||||
deckies = _build_deckies(
|
||||
n=1,
|
||||
ips=["10.0.0.20"],
|
||||
services_explicit=["ssh"],
|
||||
randomize_services=False,
|
||||
archetype=None,
|
||||
)
|
||||
assert deckies[0].nmap_os == "linux"
|
||||
|
||||
|
||||
def test_build_deckies_embedded_archetype_sets_nmap_os():
|
||||
from decnet.archetypes import get_archetype
|
||||
from decnet.cli import _build_deckies
|
||||
|
||||
arch = get_archetype("iot-device")
|
||||
deckies = _build_deckies(
|
||||
n=1,
|
||||
ips=["10.0.0.20"],
|
||||
services_explicit=None,
|
||||
randomize_services=False,
|
||||
archetype=arch,
|
||||
)
|
||||
assert deckies[0].nmap_os == "embedded"
|
||||
@@ -1,130 +0,0 @@
|
||||
"""
|
||||
Tests for the RealSSHService plugin and the deaddeck archetype.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from pathlib import Path
|
||||
|
||||
from decnet.services.registry import all_services, get_service
|
||||
from decnet.archetypes import get_archetype
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _fragment(service_cfg: dict | None = None, log_target: str | None = None) -> dict:
|
||||
return get_service("real_ssh").compose_fragment(
|
||||
"test-decky", log_target=log_target, service_cfg=service_cfg
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Registration
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_real_ssh_registered():
|
||||
assert "real_ssh" in all_services()
|
||||
|
||||
|
||||
def test_real_ssh_ports():
|
||||
svc = get_service("real_ssh")
|
||||
assert svc.ports == [22]
|
||||
|
||||
|
||||
def test_real_ssh_is_build_service():
|
||||
svc = get_service("real_ssh")
|
||||
assert svc.default_image == "build"
|
||||
|
||||
|
||||
def test_real_ssh_dockerfile_context_exists():
|
||||
svc = get_service("real_ssh")
|
||||
ctx = svc.dockerfile_context()
|
||||
assert ctx is not None
|
||||
assert ctx.is_dir(), f"Dockerfile context directory missing: {ctx}"
|
||||
assert (ctx / "Dockerfile").exists(), "Dockerfile missing in real_ssh template dir"
|
||||
assert (ctx / "entrypoint.sh").exists(), "entrypoint.sh missing in real_ssh template dir"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# compose_fragment structure
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_compose_fragment_has_build():
|
||||
frag = _fragment()
|
||||
assert "build" in frag
|
||||
assert "context" in frag["build"]
|
||||
|
||||
|
||||
def test_compose_fragment_container_name():
|
||||
frag = _fragment()
|
||||
assert frag["container_name"] == "test-decky-real-ssh"
|
||||
|
||||
|
||||
def test_compose_fragment_restart_policy():
|
||||
frag = _fragment()
|
||||
assert frag["restart"] == "unless-stopped"
|
||||
|
||||
|
||||
def test_compose_fragment_cap_add():
|
||||
frag = _fragment()
|
||||
assert "NET_BIND_SERVICE" in frag.get("cap_add", [])
|
||||
|
||||
|
||||
def test_compose_fragment_default_password():
|
||||
frag = _fragment()
|
||||
env = frag["environment"]
|
||||
assert env["SSH_ROOT_PASSWORD"] == "admin"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# service_cfg overrides
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_custom_password():
|
||||
frag = _fragment(service_cfg={"password": "s3cr3t!"})
|
||||
assert frag["environment"]["SSH_ROOT_PASSWORD"] == "s3cr3t!"
|
||||
|
||||
|
||||
def test_custom_hostname():
|
||||
frag = _fragment(service_cfg={"hostname": "srv-prod-01"})
|
||||
assert frag["environment"]["SSH_HOSTNAME"] == "srv-prod-01"
|
||||
|
||||
|
||||
def test_no_hostname_by_default():
|
||||
frag = _fragment()
|
||||
assert "SSH_HOSTNAME" not in frag["environment"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# log_target: real_ssh does not forward logs via LOG_TARGET
|
||||
# (no log aggregation on the entry-point — attacker shouldn't see it)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_no_log_target_env_injected():
|
||||
frag = _fragment(log_target="10.0.0.1:5140")
|
||||
assert "LOG_TARGET" not in frag.get("environment", {})
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Deaddeck archetype
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_deaddeck_archetype_exists():
|
||||
arch = get_archetype("deaddeck")
|
||||
assert arch.slug == "deaddeck"
|
||||
|
||||
|
||||
def test_deaddeck_uses_real_ssh():
|
||||
arch = get_archetype("deaddeck")
|
||||
assert "real_ssh" in arch.services
|
||||
|
||||
|
||||
def test_deaddeck_nmap_os():
|
||||
arch = get_archetype("deaddeck")
|
||||
assert arch.nmap_os == "linux"
|
||||
|
||||
|
||||
def test_deaddeck_preferred_distros_not_empty():
|
||||
arch = get_archetype("deaddeck")
|
||||
assert len(arch.preferred_distros) >= 1
|
||||
@@ -1,341 +0,0 @@
|
||||
"""
|
||||
Tests for all 25 DECNET service plugins.
|
||||
|
||||
Covers:
|
||||
- Service registration via the plugin registry
|
||||
- compose_fragment structure (container_name, restart, image/build)
|
||||
- LOG_TARGET propagation for custom-build services
|
||||
- dockerfile_context returns Path for build services, None for upstream-image services
|
||||
- Per-service persona config (service_cfg) propagation
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from pathlib import Path
|
||||
from decnet.services.registry import all_services, get_service
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _fragment(name: str, log_target: str | None = None, service_cfg: dict | None = None) -> dict:
|
||||
return get_service(name).compose_fragment("test-decky", log_target, service_cfg)
|
||||
|
||||
|
||||
def _is_build_service(name: str) -> bool:
|
||||
svc = get_service(name)
|
||||
return svc.default_image == "build"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tier 1: upstream-image services (non-build)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
UPSTREAM_SERVICES = {
|
||||
"telnet": ("cowrie/cowrie", [23]),
|
||||
"conpot": ("honeynet/conpot", [502, 161, 80]),
|
||||
}
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tier 2: custom-build services (including ssh, which now uses build)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
BUILD_SERVICES = {
|
||||
"ssh": ([22, 2222], "ssh"),
|
||||
"http": ([80, 443], "http"),
|
||||
"rdp": ([3389], "rdp"),
|
||||
"smb": ([445, 139], "smb"),
|
||||
"ftp": ([21], "ftp"),
|
||||
"smtp": ([25, 587], "smtp"),
|
||||
"elasticsearch": ([9200], "elasticsearch"),
|
||||
"pop3": ([110, 995], "pop3"),
|
||||
"imap": ([143, 993], "imap"),
|
||||
"mysql": ([3306], "mysql"),
|
||||
"mssql": ([1433], "mssql"),
|
||||
"redis": ([6379], "redis"),
|
||||
"mongodb": ([27017], "mongodb"),
|
||||
"postgres": ([5432], "postgres"),
|
||||
"ldap": ([389, 636], "ldap"),
|
||||
"vnc": ([5900], "vnc"),
|
||||
"docker_api": ([2375, 2376], "docker_api"),
|
||||
"k8s": ([6443, 8080], "k8s"),
|
||||
"sip": ([5060], "sip"),
|
||||
"mqtt": ([1883], "mqtt"),
|
||||
"llmnr": ([5355, 5353], "llmnr"),
|
||||
"snmp": ([161], "snmp"),
|
||||
"tftp": ([69], "tftp"),
|
||||
}
|
||||
|
||||
ALL_SERVICE_NAMES = list(UPSTREAM_SERVICES) + list(BUILD_SERVICES)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Registration tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.parametrize("name", ALL_SERVICE_NAMES)
|
||||
def test_service_registered(name):
|
||||
"""Every service must appear in the registry."""
|
||||
registry = all_services()
|
||||
assert name in registry, f"Service '{name}' not found in registry"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("name", ALL_SERVICE_NAMES)
|
||||
def test_service_ports_defined(name):
|
||||
"""Every service must declare at least one port."""
|
||||
svc = get_service(name)
|
||||
assert isinstance(svc.ports, list)
|
||||
assert len(svc.ports) >= 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Upstream-image service tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.parametrize("name,expected", [
|
||||
(n, (img, ports)) for n, (img, ports) in UPSTREAM_SERVICES.items()
|
||||
])
|
||||
def test_upstream_image(name, expected):
|
||||
expected_image, _ = expected
|
||||
frag = _fragment(name)
|
||||
assert frag.get("image") == expected_image
|
||||
|
||||
|
||||
@pytest.mark.parametrize("name", UPSTREAM_SERVICES)
|
||||
def test_upstream_no_dockerfile_context(name):
|
||||
assert get_service(name).dockerfile_context() is None
|
||||
|
||||
|
||||
@pytest.mark.parametrize("name", UPSTREAM_SERVICES)
|
||||
def test_upstream_container_name(name):
|
||||
frag = _fragment(name)
|
||||
assert frag["container_name"] == f"test-decky-{name.replace('_', '-')}"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("name", UPSTREAM_SERVICES)
|
||||
def test_upstream_restart_policy(name):
|
||||
frag = _fragment(name)
|
||||
assert frag.get("restart") == "unless-stopped"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Build-service tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.parametrize("name", BUILD_SERVICES)
|
||||
def test_build_service_uses_build(name):
|
||||
frag = _fragment(name)
|
||||
assert "build" in frag, f"Service '{name}' fragment missing 'build' key"
|
||||
assert "context" in frag["build"]
|
||||
|
||||
|
||||
@pytest.mark.parametrize("name", BUILD_SERVICES)
|
||||
def test_build_service_dockerfile_context_is_path(name):
|
||||
ctx = get_service(name).dockerfile_context()
|
||||
assert isinstance(ctx, Path), f"Service '{name}' dockerfile_context should return a Path"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("name", BUILD_SERVICES)
|
||||
def test_build_service_dockerfile_exists(name):
|
||||
ctx = get_service(name).dockerfile_context()
|
||||
dockerfile = ctx / "Dockerfile"
|
||||
assert dockerfile.exists(), f"Dockerfile missing at {dockerfile}"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("name", BUILD_SERVICES)
|
||||
def test_build_service_container_name(name):
|
||||
frag = _fragment(name)
|
||||
slug = name.replace("_", "-")
|
||||
assert frag["container_name"] == f"test-decky-{slug}"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("name", BUILD_SERVICES)
|
||||
def test_build_service_restart_policy(name):
|
||||
frag = _fragment(name)
|
||||
assert frag.get("restart") == "unless-stopped"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("name", BUILD_SERVICES)
|
||||
def test_build_service_node_name_env(name):
|
||||
frag = _fragment(name)
|
||||
env = frag.get("environment", {})
|
||||
assert "NODE_NAME" in env
|
||||
assert env["NODE_NAME"] == "test-decky"
|
||||
|
||||
|
||||
# SSH uses COWRIE_OUTPUT_TCP_* instead of LOG_TARGET — exclude from generic tests
|
||||
_LOG_TARGET_SERVICES = [n for n in BUILD_SERVICES if n != "ssh"]
|
||||
|
||||
|
||||
@pytest.mark.parametrize("name", _LOG_TARGET_SERVICES)
|
||||
def test_build_service_log_target_propagated(name):
|
||||
frag = _fragment(name, log_target="10.0.0.1:5140")
|
||||
env = frag.get("environment", {})
|
||||
assert env.get("LOG_TARGET") == "10.0.0.1:5140"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("name", _LOG_TARGET_SERVICES)
|
||||
def test_build_service_no_log_target_by_default(name):
|
||||
frag = _fragment(name)
|
||||
env = frag.get("environment", {})
|
||||
assert "LOG_TARGET" not in env
|
||||
|
||||
|
||||
def test_ssh_log_target_uses_cowrie_tcp_output():
|
||||
"""SSH forwards logs via Cowrie TCP output, not LOG_TARGET."""
|
||||
env = _fragment("ssh", log_target="10.0.0.1:5140").get("environment", {})
|
||||
assert env.get("COWRIE_OUTPUT_TCP_ENABLED") == "true"
|
||||
assert env.get("COWRIE_OUTPUT_TCP_HOST") == "10.0.0.1"
|
||||
assert env.get("COWRIE_OUTPUT_TCP_PORT") == "5140"
|
||||
assert "LOG_TARGET" not in env
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Port coverage tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@pytest.mark.parametrize("name,expected", [
|
||||
(n, ports) for n, (ports, _) in BUILD_SERVICES.items()
|
||||
])
|
||||
def test_build_service_ports(name, expected):
|
||||
svc = get_service(name)
|
||||
assert svc.ports == expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize("name,expected", [
|
||||
(n, ports) for n, (_, ports) in UPSTREAM_SERVICES.items()
|
||||
])
|
||||
def test_upstream_service_ports(name, expected):
|
||||
svc = get_service(name)
|
||||
assert svc.ports == expected
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Registry completeness
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_total_service_count():
|
||||
"""Sanity check: at least 25 services registered."""
|
||||
assert len(all_services()) >= 25
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Per-service persona config (service_cfg)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# HTTP -----------------------------------------------------------------------
|
||||
|
||||
def test_http_default_no_extra_env():
|
||||
"""No service_cfg → none of the new env vars should appear."""
|
||||
env = _fragment("http").get("environment", {})
|
||||
for key in ("SERVER_HEADER", "RESPONSE_CODE", "FAKE_APP", "EXTRA_HEADERS", "CUSTOM_BODY", "FILES_DIR"):
|
||||
assert key not in env, f"Expected {key} absent by default"
|
||||
|
||||
|
||||
def test_http_server_header():
|
||||
env = _fragment("http", service_cfg={"server_header": "nginx/1.18.0"}).get("environment", {})
|
||||
assert env.get("SERVER_HEADER") == "nginx/1.18.0"
|
||||
|
||||
|
||||
def test_http_response_code():
|
||||
env = _fragment("http", service_cfg={"response_code": 200}).get("environment", {})
|
||||
assert env.get("RESPONSE_CODE") == "200"
|
||||
|
||||
|
||||
def test_http_fake_app():
|
||||
env = _fragment("http", service_cfg={"fake_app": "wordpress"}).get("environment", {})
|
||||
assert env.get("FAKE_APP") == "wordpress"
|
||||
|
||||
|
||||
def test_http_extra_headers():
|
||||
import json
|
||||
env = _fragment("http", service_cfg={"extra_headers": {"X-Frame-Options": "SAMEORIGIN"}}).get("environment", {})
|
||||
assert "EXTRA_HEADERS" in env
|
||||
assert json.loads(env["EXTRA_HEADERS"]) == {"X-Frame-Options": "SAMEORIGIN"}
|
||||
|
||||
|
||||
def test_http_custom_body():
|
||||
env = _fragment("http", service_cfg={"custom_body": "<html>hi</html>"}).get("environment", {})
|
||||
assert env.get("CUSTOM_BODY") == "<html>hi</html>"
|
||||
|
||||
|
||||
def test_http_empty_service_cfg_no_extra_env():
|
||||
env = _fragment("http", service_cfg={}).get("environment", {})
|
||||
assert "SERVER_HEADER" not in env
|
||||
|
||||
|
||||
# SSH ------------------------------------------------------------------------
|
||||
|
||||
def test_ssh_default_no_persona_env():
|
||||
env = _fragment("ssh").get("environment", {})
|
||||
for key in ("COWRIE_HONEYPOT_KERNEL_VERSION", "COWRIE_HONEYPOT_HARDWARE_PLATFORM",
|
||||
"COWRIE_SSH_VERSION", "COWRIE_USERDB_ENTRIES"):
|
||||
assert key not in env, f"Expected {key} absent by default"
|
||||
|
||||
|
||||
def test_ssh_kernel_version():
|
||||
env = _fragment("ssh", service_cfg={"kernel_version": "5.15.0-76-generic"}).get("environment", {})
|
||||
assert env.get("COWRIE_HONEYPOT_KERNEL_VERSION") == "5.15.0-76-generic"
|
||||
|
||||
|
||||
def test_ssh_hardware_platform():
|
||||
env = _fragment("ssh", service_cfg={"hardware_platform": "aarch64"}).get("environment", {})
|
||||
assert env.get("COWRIE_HONEYPOT_HARDWARE_PLATFORM") == "aarch64"
|
||||
|
||||
|
||||
def test_ssh_banner():
|
||||
env = _fragment("ssh", service_cfg={"ssh_banner": "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.3"}).get("environment", {})
|
||||
assert env.get("COWRIE_SSH_VERSION") == "SSH-2.0-OpenSSH_8.9p1 Ubuntu-3ubuntu0.3"
|
||||
|
||||
|
||||
def test_ssh_users():
|
||||
env = _fragment("ssh", service_cfg={"users": "root:toor,admin:admin123"}).get("environment", {})
|
||||
assert env.get("COWRIE_USERDB_ENTRIES") == "root:toor,admin:admin123"
|
||||
|
||||
|
||||
# SMTP -----------------------------------------------------------------------
|
||||
|
||||
def test_smtp_banner():
|
||||
env = _fragment("smtp", service_cfg={"banner": "220 mail.corp.local ESMTP Sendmail"}).get("environment", {})
|
||||
assert env.get("SMTP_BANNER") == "220 mail.corp.local ESMTP Sendmail"
|
||||
|
||||
|
||||
def test_smtp_mta():
|
||||
env = _fragment("smtp", service_cfg={"mta": "mail.corp.local"}).get("environment", {})
|
||||
assert env.get("SMTP_MTA") == "mail.corp.local"
|
||||
|
||||
|
||||
def test_smtp_default_no_extra_env():
|
||||
env = _fragment("smtp").get("environment", {})
|
||||
assert "SMTP_BANNER" not in env
|
||||
assert "SMTP_MTA" not in env
|
||||
|
||||
|
||||
# MySQL ----------------------------------------------------------------------
|
||||
|
||||
def test_mysql_version():
|
||||
env = _fragment("mysql", service_cfg={"version": "8.0.33"}).get("environment", {})
|
||||
assert env.get("MYSQL_VERSION") == "8.0.33"
|
||||
|
||||
|
||||
def test_mysql_default_no_version_env():
|
||||
env = _fragment("mysql").get("environment", {})
|
||||
assert "MYSQL_VERSION" not in env
|
||||
|
||||
|
||||
# Redis ----------------------------------------------------------------------
|
||||
|
||||
def test_redis_version():
|
||||
env = _fragment("redis", service_cfg={"version": "6.2.14"}).get("environment", {})
|
||||
assert env.get("REDIS_VERSION") == "6.2.14"
|
||||
|
||||
|
||||
def test_redis_os_string():
|
||||
env = _fragment("redis", service_cfg={"os_string": "Linux 4.19.0"}).get("environment", {})
|
||||
assert env.get("REDIS_OS") == "Linux 4.19.0"
|
||||
|
||||
|
||||
def test_redis_default_no_extra_env():
|
||||
env = _fragment("redis").get("environment", {})
|
||||
assert "REDIS_VERSION" not in env
|
||||
assert "REDIS_OS" not in env
|
||||
@@ -1,135 +0,0 @@
|
||||
"""Tests for RFC 5424 syslog formatter."""
|
||||
|
||||
import re
|
||||
from datetime import datetime, timezone
|
||||
|
||||
import pytest
|
||||
|
||||
from decnet.logging.syslog_formatter import (
|
||||
SEVERITY_ERROR,
|
||||
SEVERITY_INFO,
|
||||
SEVERITY_WARNING,
|
||||
format_rfc5424,
|
||||
)
|
||||
|
||||
# RFC 5424 header regex: <PRI>1 TIMESTAMP HOSTNAME APP-NAME PROCID MSGID SD [MSG]
|
||||
_RFC5424_RE = re.compile(
|
||||
r"^<(\d+)>1 " # PRI + version
|
||||
r"(\S+) " # TIMESTAMP
|
||||
r"(\S+) " # HOSTNAME
|
||||
r"(\S+) " # APP-NAME
|
||||
r"- " # PROCID (NILVALUE)
|
||||
r"(\S+) " # MSGID
|
||||
r"(.+)$", # SD + optional MSG
|
||||
)
|
||||
|
||||
|
||||
def _parse(line: str) -> re.Match:
|
||||
m = _RFC5424_RE.match(line)
|
||||
assert m is not None, f"Not RFC 5424: {line!r}"
|
||||
return m
|
||||
|
||||
|
||||
class TestPRI:
|
||||
def test_info_pri(self):
|
||||
line = format_rfc5424("http", "host1", "request", SEVERITY_INFO)
|
||||
m = _parse(line)
|
||||
pri = int(m.group(1))
|
||||
assert pri == 16 * 8 + 6 # local0 + info = 134
|
||||
|
||||
def test_warning_pri(self):
|
||||
line = format_rfc5424("http", "host1", "warn", SEVERITY_WARNING)
|
||||
pri = int(_parse(line).group(1))
|
||||
assert pri == 16 * 8 + 4 # 132
|
||||
|
||||
def test_error_pri(self):
|
||||
line = format_rfc5424("http", "host1", "err", SEVERITY_ERROR)
|
||||
pri = int(_parse(line).group(1))
|
||||
assert pri == 16 * 8 + 3 # 131
|
||||
|
||||
def test_pri_range(self):
|
||||
for sev in range(8):
|
||||
line = format_rfc5424("svc", "h", "e", sev)
|
||||
pri = int(_parse(line).group(1))
|
||||
assert 0 <= pri <= 191
|
||||
|
||||
|
||||
class TestTimestamp:
|
||||
def test_utc_timestamp(self):
|
||||
ts_str = datetime(2026, 4, 4, 12, 0, 0, tzinfo=timezone.utc).isoformat()
|
||||
line = format_rfc5424("svc", "h", "e", timestamp=datetime(2026, 4, 4, 12, 0, 0, tzinfo=timezone.utc))
|
||||
m = _parse(line)
|
||||
assert m.group(2) == ts_str
|
||||
|
||||
def test_default_timestamp_is_utc(self):
|
||||
line = format_rfc5424("svc", "h", "e")
|
||||
ts_field = _parse(line).group(2)
|
||||
# Should end with +00:00 or Z
|
||||
assert "+" in ts_field or ts_field.endswith("Z")
|
||||
|
||||
|
||||
class TestHeader:
|
||||
def test_hostname(self):
|
||||
line = format_rfc5424("http", "decky-01", "request")
|
||||
assert _parse(line).group(3) == "decky-01"
|
||||
|
||||
def test_appname(self):
|
||||
line = format_rfc5424("mysql", "host", "login_attempt")
|
||||
assert _parse(line).group(4) == "mysql"
|
||||
|
||||
def test_msgid(self):
|
||||
line = format_rfc5424("ftp", "host", "login_attempt")
|
||||
assert _parse(line).group(5) == "login_attempt"
|
||||
|
||||
def test_procid_is_nilvalue(self):
|
||||
line = format_rfc5424("svc", "h", "e")
|
||||
assert " - " in line # PROCID is always NILVALUE
|
||||
|
||||
def test_appname_truncated(self):
|
||||
long_name = "a" * 100
|
||||
line = format_rfc5424(long_name, "h", "e")
|
||||
appname = _parse(line).group(4)
|
||||
assert len(appname) <= 48
|
||||
|
||||
def test_msgid_truncated(self):
|
||||
long_msgid = "x" * 100
|
||||
line = format_rfc5424("svc", "h", long_msgid)
|
||||
msgid = _parse(line).group(5)
|
||||
assert len(msgid) <= 32
|
||||
|
||||
|
||||
class TestStructuredData:
|
||||
def test_nilvalue_when_no_fields(self):
|
||||
line = format_rfc5424("svc", "h", "e")
|
||||
sd_and_msg = _parse(line).group(6)
|
||||
assert sd_and_msg.startswith("-")
|
||||
|
||||
def test_sd_element_present(self):
|
||||
line = format_rfc5424("http", "h", "request", remote_addr="1.2.3.4", method="GET")
|
||||
sd_and_msg = _parse(line).group(6)
|
||||
assert sd_and_msg.startswith("[decnet@55555 ")
|
||||
assert 'remote_addr="1.2.3.4"' in sd_and_msg
|
||||
assert 'method="GET"' in sd_and_msg
|
||||
|
||||
def test_sd_escape_double_quote(self):
|
||||
line = format_rfc5424("svc", "h", "e", ua='foo"bar')
|
||||
assert r'ua="foo\"bar"' in line
|
||||
|
||||
def test_sd_escape_backslash(self):
|
||||
line = format_rfc5424("svc", "h", "e", path="a\\b")
|
||||
assert r'path="a\\b"' in line
|
||||
|
||||
def test_sd_escape_close_bracket(self):
|
||||
line = format_rfc5424("svc", "h", "e", val="a]b")
|
||||
assert r'val="a\]b"' in line
|
||||
|
||||
|
||||
class TestMsg:
|
||||
def test_optional_msg_appended(self):
|
||||
line = format_rfc5424("svc", "h", "e", msg="hello world")
|
||||
assert line.endswith(" hello world")
|
||||
|
||||
def test_no_msg_no_trailing_space_in_sd(self):
|
||||
line = format_rfc5424("svc", "h", "e", key="val")
|
||||
# SD element closes with ]
|
||||
assert line.rstrip().endswith("]")
|
||||
Reference in New Issue
Block a user