refactor(tests): move flat tests/*.py into per-subsystem subfolders

Groups every flat test_*.py under the module it exercises, matching the
existing tests/{profiler,sniffer,prober,collector,correlation,cli,web,
topology,swarm,bus,updater,api,docker,geoip,...} layout. New folders:
services/, fleet/, config/, logging/, db/ (+ db/mysql/), telemetry/,
mutator/, core/.

Path-dependent __file__ references bumped an extra .parent in three
files that moved one level deeper:
- tests/sniffer/test_sniffer_ja3.py   (template path)
- tests/services/test_ssh_capture_emit.py (template path)
- tests/cli/test_mode_gating.py  (REPO root)
- tests/web/test_env_lazy_jwt.py (repo var)

Also drops two SQLite runtime artifacts (test_decnet.db-{shm,wal}) that
were leaking into the repo from a previous test run.

Fixes two test_service_isolation cases that patched asyncio.sleep (no
longer on the profiler main-loop hot path — same pre-existing bug I
fixed earlier in test_attacker_worker.py) by patching asyncio.wait_for
and passing interval=0.
This commit is contained in:
2026-04-23 21:34:25 -04:00
parent 21e6820714
commit ea95a009df
78 changed files with 18 additions and 10 deletions

0
tests/core/__init__.py Normal file
View File

66
tests/core/test_build.py Normal file
View File

@@ -0,0 +1,66 @@
"""Smoke test: verify the package and all submodules import cleanly."""
import importlib
import pytest
MODULES = [
"decnet",
"decnet.cli",
"decnet.config",
"decnet.composer",
"decnet.engine",
"decnet.engine.deployer",
"decnet.collector",
"decnet.collector.worker",
"decnet.mutator",
"decnet.mutator.engine",
"decnet.fleet",
"decnet.network",
"decnet.archetypes",
"decnet.distros",
"decnet.os_fingerprint",
"decnet.ini_loader",
"decnet.custom_service",
"decnet.correlation",
"decnet.correlation.engine",
"decnet.correlation.graph",
"decnet.correlation.parser",
"decnet.logging",
"decnet.logging.file_handler",
"decnet.logging.forwarder",
"decnet.logging.syslog_formatter",
"decnet.services",
"decnet.services.registry",
"decnet.services.base",
"decnet.services.ssh",
"decnet.services.ftp",
"decnet.services.http",
"decnet.services.smb",
"decnet.services.rdp",
"decnet.services.smtp",
"decnet.services.mysql",
"decnet.services.postgres",
"decnet.services.redis",
"decnet.services.mongodb",
"decnet.services.mssql",
"decnet.services.elasticsearch",
"decnet.services.ldap",
"decnet.services.k8s",
"decnet.services.docker_api",
"decnet.services.vnc",
"decnet.services.telnet",
"decnet.services.tftp",
"decnet.services.snmp",
"decnet.services.sip",
"decnet.services.mqtt",
"decnet.services.llmnr",
"decnet.services.imap",
"decnet.services.pop3",
"decnet.services.conpot",
"decnet.services.registry",
]
@pytest.mark.parametrize("module", MODULES)
def test_module_imports(module):
importlib.import_module(module)

View File

@@ -0,0 +1,404 @@
"""Tests for attacker fingerprint extraction in the ingester."""
import pytest
from unittest.mock import AsyncMock, MagicMock, call
from decnet.web.ingester import _extract_bounty
def _make_repo():
repo = MagicMock()
repo.add_bounty = AsyncMock()
return repo
# ---------------------------------------------------------------------------
# HTTP User-Agent
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_http_useragent_extracted():
repo = _make_repo()
log_data = {
"decky": "decky-01",
"service": "http",
"attacker_ip": "10.0.0.1",
"event_type": "request",
"fields": {
"method": "GET",
"path": "/admin",
"headers": {"User-Agent": "Nikto/2.1.6", "Host": "target"},
},
}
await _extract_bounty(repo, log_data)
repo.add_bounty.assert_awaited_once()
call_kwargs = repo.add_bounty.call_args[0][0]
assert call_kwargs["bounty_type"] == "fingerprint"
assert call_kwargs["payload"]["fingerprint_type"] == "http_useragent"
assert call_kwargs["payload"]["value"] == "Nikto/2.1.6"
assert call_kwargs["payload"]["path"] == "/admin"
assert call_kwargs["payload"]["method"] == "GET"
@pytest.mark.asyncio
async def test_http_useragent_lowercase_key():
repo = _make_repo()
log_data = {
"decky": "decky-01",
"service": "http",
"attacker_ip": "10.0.0.2",
"event_type": "request",
"fields": {
"headers": {"user-agent": "sqlmap/1.7"},
},
}
await _extract_bounty(repo, log_data)
call_kwargs = repo.add_bounty.call_args[0][0]
assert call_kwargs["payload"]["value"] == "sqlmap/1.7"
@pytest.mark.asyncio
async def test_http_no_useragent_no_fingerprint_bounty():
repo = _make_repo()
log_data = {
"decky": "decky-01",
"service": "http",
"attacker_ip": "10.0.0.3",
"event_type": "request",
"fields": {
"headers": {"Host": "target"},
},
}
await _extract_bounty(repo, log_data)
repo.add_bounty.assert_not_awaited()
@pytest.mark.asyncio
async def test_http_headers_not_dict_no_crash():
repo = _make_repo()
log_data = {
"decky": "decky-01",
"service": "http",
"attacker_ip": "10.0.0.4",
"event_type": "request",
"fields": {"headers": "raw-string-not-a-dict"},
}
await _extract_bounty(repo, log_data)
repo.add_bounty.assert_not_awaited()
# ---------------------------------------------------------------------------
# VNC client version
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_vnc_client_version_extracted():
repo = _make_repo()
log_data = {
"decky": "decky-02",
"service": "vnc",
"attacker_ip": "10.0.0.5",
"event_type": "version",
"fields": {"client_version": "RFB 003.008", "src": "10.0.0.5"},
}
await _extract_bounty(repo, log_data)
repo.add_bounty.assert_awaited_once()
call_kwargs = repo.add_bounty.call_args[0][0]
assert call_kwargs["bounty_type"] == "fingerprint"
assert call_kwargs["payload"]["fingerprint_type"] == "vnc_client_version"
assert call_kwargs["payload"]["value"] == "RFB 003.008"
@pytest.mark.asyncio
async def test_vnc_non_version_event_no_fingerprint():
repo = _make_repo()
log_data = {
"decky": "decky-02",
"service": "vnc",
"attacker_ip": "10.0.0.6",
"event_type": "auth_response",
"fields": {"client_version": "RFB 003.008", "src": "10.0.0.6"},
}
await _extract_bounty(repo, log_data)
repo.add_bounty.assert_not_awaited()
@pytest.mark.asyncio
async def test_vnc_version_event_no_client_version_field():
repo = _make_repo()
log_data = {
"decky": "decky-02",
"service": "vnc",
"attacker_ip": "10.0.0.7",
"event_type": "version",
"fields": {"src": "10.0.0.7"},
}
await _extract_bounty(repo, log_data)
repo.add_bounty.assert_not_awaited()
# ---------------------------------------------------------------------------
# Credential extraction unaffected
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_credential_still_extracted_alongside_fingerprint():
repo = _make_repo()
log_data = {
"decky": "decky-03",
"service": "ftp",
"attacker_ip": "10.0.0.8",
"event_type": "auth_attempt",
"fields": {"username": "admin", "password": "1234"},
}
await _extract_bounty(repo, log_data)
repo.add_bounty.assert_awaited_once()
call_kwargs = repo.add_bounty.call_args[0][0]
assert call_kwargs["bounty_type"] == "credential"
@pytest.mark.asyncio
async def test_http_credential_and_fingerprint_both_extracted():
"""An HTTP login attempt can yield both a credential and a UA fingerprint."""
repo = _make_repo()
log_data = {
"decky": "decky-03",
"service": "http",
"attacker_ip": "10.0.0.9",
"event_type": "request",
"fields": {
"username": "root",
"password": "toor",
"headers": {"User-Agent": "curl/7.88.1"},
},
}
await _extract_bounty(repo, log_data)
assert repo.add_bounty.await_count == 2
types = {c[0][0]["bounty_type"] for c in repo.add_bounty.call_args_list}
assert types == {"credential", "fingerprint"}
# ---------------------------------------------------------------------------
# Edge cases
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_fields_not_dict_no_crash():
repo = _make_repo()
log_data = {
"decky": "decky-04",
"service": "http",
"attacker_ip": "10.0.0.10",
"event_type": "request",
"fields": None,
}
await _extract_bounty(repo, log_data)
repo.add_bounty.assert_not_awaited()
@pytest.mark.asyncio
async def test_fields_missing_entirely_no_crash():
repo = _make_repo()
log_data = {
"decky": "decky-04",
"service": "http",
"attacker_ip": "10.0.0.11",
"event_type": "request",
}
await _extract_bounty(repo, log_data)
repo.add_bounty.assert_not_awaited()
# ---------------------------------------------------------------------------
# JA4/JA4S extraction (sniffer)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_ja4_included_in_ja3_bounty():
repo = _make_repo()
log_data = {
"decky": "decky-05",
"service": "sniffer",
"attacker_ip": "10.0.0.20",
"event_type": "tls_session",
"fields": {
"ja3": "abc123",
"ja3s": "def456",
"ja4": "t13d0203h2_aabbccddee00_112233445566",
"ja4s": "t1302h2_ffeeddccbbaa",
"tls_version": "TLS 1.3",
"dst_port": "443",
},
}
await _extract_bounty(repo, log_data)
calls = repo.add_bounty.call_args_list
ja3_calls = [c for c in calls if c[0][0]["payload"].get("fingerprint_type") == "ja3"]
assert len(ja3_calls) == 1
payload = ja3_calls[0][0][0]["payload"]
assert payload["ja4"] == "t13d0203h2_aabbccddee00_112233445566"
assert payload["ja4s"] == "t1302h2_ffeeddccbbaa"
# ---------------------------------------------------------------------------
# JA4L latency extraction
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_ja4l_bounty_extracted():
repo = _make_repo()
log_data = {
"decky": "decky-05",
"service": "sniffer",
"attacker_ip": "10.0.0.21",
"event_type": "tls_session",
"fields": {
"ja4l_rtt_ms": "12.5",
"ja4l_client_ttl": "64",
},
}
await _extract_bounty(repo, log_data)
calls = repo.add_bounty.call_args_list
ja4l_calls = [c for c in calls if c[0][0]["payload"].get("fingerprint_type") == "ja4l"]
assert len(ja4l_calls) == 1
payload = ja4l_calls[0][0][0]["payload"]
assert payload["rtt_ms"] == "12.5"
assert payload["client_ttl"] == "64"
@pytest.mark.asyncio
async def test_ja4l_not_extracted_without_rtt():
repo = _make_repo()
log_data = {
"decky": "decky-05",
"service": "sniffer",
"attacker_ip": "10.0.0.22",
"event_type": "tls_session",
"fields": {
"ja4l_client_ttl": "64",
},
}
await _extract_bounty(repo, log_data)
calls = repo.add_bounty.call_args_list
ja4l_calls = [c for c in calls if c[0][0].get("payload", {}).get("fingerprint_type") == "ja4l"]
assert len(ja4l_calls) == 0
# ---------------------------------------------------------------------------
# TLS session resumption extraction
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_tls_resumption_bounty_extracted():
repo = _make_repo()
log_data = {
"decky": "decky-05",
"service": "sniffer",
"attacker_ip": "10.0.0.23",
"event_type": "tls_client_hello",
"fields": {
"resumption": "session_ticket,psk",
},
}
await _extract_bounty(repo, log_data)
calls = repo.add_bounty.call_args_list
res_calls = [c for c in calls if c[0][0]["payload"].get("fingerprint_type") == "tls_resumption"]
assert len(res_calls) == 1
assert res_calls[0][0][0]["payload"]["mechanisms"] == "session_ticket,psk"
@pytest.mark.asyncio
async def test_no_resumption_no_bounty():
repo = _make_repo()
log_data = {
"decky": "decky-05",
"service": "sniffer",
"attacker_ip": "10.0.0.24",
"event_type": "tls_client_hello",
"fields": {
"ja3": "abc123",
},
}
await _extract_bounty(repo, log_data)
calls = repo.add_bounty.call_args_list
res_calls = [c for c in calls if c[0][0]["payload"].get("fingerprint_type") == "tls_resumption"]
assert len(res_calls) == 0
# ---------------------------------------------------------------------------
# TLS certificate extraction
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_tls_certificate_bounty_extracted():
repo = _make_repo()
log_data = {
"decky": "decky-05",
"service": "sniffer",
"attacker_ip": "10.0.0.25",
"event_type": "tls_certificate",
"fields": {
"subject_cn": "evil.c2.local",
"issuer": "CN=Evil CA",
"self_signed": "true",
"not_before": "230101000000Z",
"not_after": "260101000000Z",
"sans": "evil.c2.local,*.evil.c2.local",
"sni": "evil.c2.local",
},
}
await _extract_bounty(repo, log_data)
calls = repo.add_bounty.call_args_list
cert_calls = [c for c in calls if c[0][0]["payload"].get("fingerprint_type") == "tls_certificate"]
assert len(cert_calls) == 1
payload = cert_calls[0][0][0]["payload"]
assert payload["subject_cn"] == "evil.c2.local"
assert payload["self_signed"] == "true"
assert payload["issuer"] == "CN=Evil CA"
@pytest.mark.asyncio
async def test_tls_certificate_not_extracted_from_non_sniffer():
repo = _make_repo()
log_data = {
"decky": "decky-05",
"service": "http",
"attacker_ip": "10.0.0.26",
"event_type": "tls_certificate",
"fields": {
"subject_cn": "not-from-sniffer.local",
},
}
await _extract_bounty(repo, log_data)
calls = repo.add_bounty.call_args_list
cert_calls = [c for c in calls if c[0][0].get("payload", {}).get("fingerprint_type") == "tls_certificate"]
assert len(cert_calls) == 0
# ---------------------------------------------------------------------------
# Multiple fingerprints from single sniffer log
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_sniffer_log_yields_multiple_fingerprint_types():
"""A complete TLS session log with JA3 + JA4L + resumption yields 3 bounties."""
repo = _make_repo()
log_data = {
"decky": "decky-05",
"service": "sniffer",
"attacker_ip": "10.0.0.30",
"event_type": "tls_session",
"fields": {
"ja3": "abc123",
"ja3s": "def456",
"ja4": "t13d0203h2_aabb_ccdd",
"ja4s": "t1302h2_eeff",
"ja4l_rtt_ms": "5.2",
"ja4l_client_ttl": "128",
"resumption": "session_ticket",
"tls_version": "TLS 1.3",
"dst_port": "443",
},
}
await _extract_bounty(repo, log_data)
assert repo.add_bounty.await_count == 3
types = {c[0][0]["payload"]["fingerprint_type"] for c in repo.add_bounty.call_args_list}
assert types == {"ja3", "ja4l", "tls_resumption"}

402
tests/core/test_network.py Normal file
View File

@@ -0,0 +1,402 @@
"""
Tests for decnet.network utility functions.
"""
from unittest.mock import MagicMock, patch
import pytest
from decnet.network import (
HOST_IPVLAN_IFACE,
HOST_MACVLAN_IFACE,
MACVLAN_NETWORK_NAME,
allocate_ips,
create_ipvlan_network,
create_macvlan_network,
detect_interface,
detect_subnet,
get_host_ip,
ips_to_range,
remove_macvlan_network,
setup_host_ipvlan,
setup_host_macvlan,
teardown_host_ipvlan,
teardown_host_macvlan,
)
# ---------------------------------------------------------------------------
# ips_to_range
# ---------------------------------------------------------------------------
class TestIpsToRange:
def test_single_ip(self):
assert ips_to_range(["192.168.1.100"]) == "192.168.1.100/32"
def test_consecutive_small_range(self):
# .97.101: max^min = 4, bit_length=3, prefix=29 → .96/29
result = ips_to_range([f"192.168.1.{i}" for i in range(97, 102)])
from ipaddress import IPv4Network, IPv4Address
net = IPv4Network(result)
for i in range(97, 102):
assert IPv4Address(f"192.168.1.{i}") in net
def test_range_crossing_cidr_boundary(self):
# .110.119 crosses the /28 boundary (.96.111 vs .112.127)
# Subtraction gives /28 (wrong), XOR gives /27 (correct)
ips = [f"192.168.1.{i}" for i in range(110, 120)]
result = ips_to_range(ips)
from ipaddress import IPv4Network, IPv4Address
net = IPv4Network(result)
for i in range(110, 120):
assert IPv4Address(f"192.168.1.{i}") in net, (
f"192.168.1.{i} not in computed range {result}"
)
def test_all_ips_covered(self):
# Larger spread: .10.200
ips = [f"10.0.0.{i}" for i in range(10, 201)]
result = ips_to_range(ips)
from ipaddress import IPv4Network, IPv4Address
net = IPv4Network(result)
for i in range(10, 201):
assert IPv4Address(f"10.0.0.{i}") in net
def test_two_ips_same_cidr(self):
# .100 and .101 share /31
result = ips_to_range(["192.168.1.100", "192.168.1.101"])
from ipaddress import IPv4Network, IPv4Address
net = IPv4Network(result)
assert IPv4Address("192.168.1.100") in net
assert IPv4Address("192.168.1.101") in net
# ---------------------------------------------------------------------------
# create_macvlan_network
# ---------------------------------------------------------------------------
class TestCreateMacvlanNetwork:
def _make_client(self, existing=None, existing_driver="macvlan",
ipam_subnet="192.168.1.0/24", ipam_gateway="192.168.1.1",
ipam_range="192.168.1.96/27"):
client = MagicMock()
nets = [MagicMock(name=n) for n in (existing or [])]
for net, n in zip(nets, (existing or [])):
net.name = n
net.attrs = {
"Driver": existing_driver,
"Containers": {},
"IPAM": {"Config": [{
"Subnet": ipam_subnet,
"Gateway": ipam_gateway,
"IPRange": ipam_range,
}]},
}
client.networks.list.return_value = nets
return client
def test_creates_network_when_absent(self):
client = self._make_client([])
create_macvlan_network(client, "eth0", "192.168.1.0/24", "192.168.1.1", "192.168.1.96/27")
client.networks.create.assert_called_once()
kwargs = client.networks.create.call_args
assert kwargs[1]["driver"] == "macvlan"
assert kwargs[1]["name"] == MACVLAN_NETWORK_NAME
assert kwargs[1]["options"]["parent"] == "eth0"
def test_noop_when_network_exists(self):
client = self._make_client([MACVLAN_NETWORK_NAME])
create_macvlan_network(client, "eth0", "192.168.1.0/24", "192.168.1.1", "192.168.1.96/27")
client.networks.create.assert_not_called()
def test_rebuilds_when_ipam_subnet_drifted(self):
"""Existing net matches driver+name, but IPAM pool is stale. Reusing it
hands out addresses from the old pool — surfaces as 'Address already in
use'. Must tear down + recreate."""
client = self._make_client([MACVLAN_NETWORK_NAME], ipam_subnet="10.0.0.0/24")
old_net = client.networks.list.return_value[0]
create_macvlan_network(client, "eth0", "192.168.1.0/24", "192.168.1.1", "192.168.1.96/27")
old_net.remove.assert_called_once()
client.networks.create.assert_called_once()
# ---------------------------------------------------------------------------
# create_ipvlan_network
# ---------------------------------------------------------------------------
class TestCreateIpvlanNetwork:
def _make_client(self, existing=None, existing_driver="ipvlan",
ipam_subnet="192.168.1.0/24", ipam_gateway="192.168.1.1",
ipam_range="192.168.1.96/27"):
client = MagicMock()
nets = [MagicMock(name=n) for n in (existing or [])]
for net, n in zip(nets, (existing or [])):
net.name = n
net.attrs = {
"Driver": existing_driver,
"Containers": {},
"IPAM": {"Config": [{
"Subnet": ipam_subnet,
"Gateway": ipam_gateway,
"IPRange": ipam_range,
}]},
}
client.networks.list.return_value = nets
return client
def test_creates_ipvlan_network(self):
client = self._make_client([])
create_ipvlan_network(client, "wlan0", "192.168.1.0/24", "192.168.1.1", "192.168.1.96/27")
client.networks.create.assert_called_once()
kwargs = client.networks.create.call_args
assert kwargs[1]["driver"] == "ipvlan"
assert kwargs[1]["options"]["parent"] == "wlan0"
assert kwargs[1]["options"]["ipvlan_mode"] == "l2"
def test_noop_when_network_exists(self):
client = self._make_client([MACVLAN_NETWORK_NAME])
create_ipvlan_network(client, "wlan0", "192.168.1.0/24", "192.168.1.1", "192.168.1.96/27")
client.networks.create.assert_not_called()
def test_replaces_macvlan_network_with_ipvlan(self):
"""If an old macvlan-driver net exists under the same name, remove+recreate.
Short-circuiting on name alone leaves Docker attaching containers to the
wrong driver — on the next port create the parent NIC goes EBUSY because
macvlan and ipvlan slaves can't share it."""
client = self._make_client([MACVLAN_NETWORK_NAME], existing_driver="macvlan")
old_net = client.networks.list.return_value[0]
create_ipvlan_network(client, "wlan0", "192.168.1.0/24", "192.168.1.1", "192.168.1.96/27")
old_net.remove.assert_called_once()
client.networks.create.assert_called_once()
assert client.networks.create.call_args[1]["driver"] == "ipvlan"
def test_uses_same_network_name_as_macvlan(self):
"""Both drivers share the same logical network name so compose files are identical."""
client = self._make_client([])
create_ipvlan_network(client, "wlan0", "192.168.1.0/24", "192.168.1.1", "192.168.1.96/27")
assert client.networks.create.call_args[1]["name"] == MACVLAN_NETWORK_NAME
# ---------------------------------------------------------------------------
# setup_host_macvlan / teardown_host_macvlan
# ---------------------------------------------------------------------------
class TestSetupHostMacvlan:
@patch("decnet.network.os.geteuid", return_value=0)
@patch("decnet.network._run")
def test_creates_interface_when_absent(self, mock_run, _):
# Simulate interface not existing (returncode != 0)
mock_run.side_effect = lambda cmd, **kw: MagicMock(returncode=1) if "show" in cmd else MagicMock(returncode=0)
setup_host_macvlan("eth0", "192.168.1.5", "192.168.1.96/27")
calls = [str(c) for c in mock_run.call_args_list]
assert any("macvlan" in c for c in calls)
assert any("mode" in c and "bridge" in c for c in calls)
@patch("decnet.network.os.geteuid", return_value=0)
@patch("decnet.network._run")
def test_skips_create_when_interface_exists(self, mock_run, _):
mock_run.return_value = MagicMock(returncode=0)
setup_host_macvlan("eth0", "192.168.1.5", "192.168.1.96/27")
calls = [c[0][0] for c in mock_run.call_args_list]
# "ip link add <iface> link ..." should not be called when iface exists.
# (The opportunistic `ip link del decnet_ipvlan0` cleanup is allowed.)
add_cmds = [cmd for cmd in calls if cmd[:3] == ["ip", "link", "add"]]
assert not any(HOST_MACVLAN_IFACE in cmd for cmd in add_cmds)
@patch("decnet.network.os.geteuid", return_value=1)
def test_requires_root(self, _):
with pytest.raises(PermissionError):
setup_host_macvlan("eth0", "192.168.1.5", "192.168.1.96/27")
# ---------------------------------------------------------------------------
# setup_host_ipvlan / teardown_host_ipvlan
# ---------------------------------------------------------------------------
class TestSetupHostIpvlan:
@patch("decnet.network.os.geteuid", return_value=0)
@patch("decnet.network._run")
def test_creates_ipvlan_interface(self, mock_run, _):
mock_run.side_effect = lambda cmd, **kw: MagicMock(returncode=1) if "show" in cmd else MagicMock(returncode=0)
setup_host_ipvlan("wlan0", "192.168.1.5", "192.168.1.96/27")
calls = [str(c) for c in mock_run.call_args_list]
assert any("ipvlan" in c for c in calls)
assert any("mode" in c and "l2" in c for c in calls)
@patch("decnet.network.os.geteuid", return_value=0)
@patch("decnet.network._run")
def test_uses_ipvlan_iface_name(self, mock_run, _):
mock_run.side_effect = lambda cmd, **kw: MagicMock(returncode=1) if "show" in cmd else MagicMock(returncode=0)
setup_host_ipvlan("wlan0", "192.168.1.5", "192.168.1.96/27")
calls = [c[0][0] for c in mock_run.call_args_list]
# Primary interface created is the ipvlan slave.
assert any("add" in cmd and HOST_IPVLAN_IFACE in cmd for cmd in calls)
# The only macvlan reference allowed is the opportunistic `del`
# that cleans up a stale helper from a prior macvlan deploy.
macvlan_refs = [cmd for cmd in calls if HOST_MACVLAN_IFACE in cmd]
assert all(cmd[:3] == ["ip", "link", "del"] for cmd in macvlan_refs)
@patch("decnet.network.os.geteuid", return_value=1)
def test_requires_root(self, _):
with pytest.raises(PermissionError):
setup_host_ipvlan("wlan0", "192.168.1.5", "192.168.1.96/27")
@patch("decnet.network.os.geteuid", return_value=0)
@patch("decnet.network._run")
def test_teardown_uses_ipvlan_iface(self, mock_run, _):
mock_run.return_value = MagicMock(returncode=0)
teardown_host_ipvlan("192.168.1.96/27")
calls = [str(c) for c in mock_run.call_args_list]
assert any(HOST_IPVLAN_IFACE in c for c in calls)
assert not any(HOST_MACVLAN_IFACE in c for c in calls)
# ---------------------------------------------------------------------------
# allocate_ips (pure logic — no subprocess / Docker)
# ---------------------------------------------------------------------------
class TestAllocateIps:
def test_basic_allocation(self):
ips = allocate_ips("192.168.1.0/24", "192.168.1.1", "192.168.1.100", count=3)
assert len(ips) == 3
assert "192.168.1.1" not in ips # gateway skipped
assert "192.168.1.100" not in ips # host IP skipped
def test_skips_network_and_broadcast(self):
ips = allocate_ips("10.0.0.0/30", "10.0.0.1", "10.0.0.3", count=1)
# /30 hosts: .1 (gateway), .2. .3 is host_ip → only .2 available
assert ips == ["10.0.0.2"]
def test_respects_ip_start(self):
ips = allocate_ips("192.168.1.0/24", "192.168.1.1", "192.168.1.1",
count=2, ip_start="192.168.1.50")
assert all(ip >= "192.168.1.50" for ip in ips)
def test_raises_when_not_enough_ips(self):
# /30 only has 2 host addresses; reserving both leaves 0
with pytest.raises(RuntimeError, match="Not enough free IPs"):
allocate_ips("10.0.0.0/30", "10.0.0.1", "10.0.0.2", count=3)
def test_no_duplicates(self):
ips = allocate_ips("10.0.0.0/24", "10.0.0.1", "10.0.0.2", count=10)
assert len(ips) == len(set(ips))
def test_exact_count_returned(self):
ips = allocate_ips("172.16.0.0/24", "172.16.0.1", "172.16.0.254", count=5)
assert len(ips) == 5
# ---------------------------------------------------------------------------
# detect_interface
# ---------------------------------------------------------------------------
class TestDetectInterface:
@patch("decnet.network._run")
def test_parses_dev_from_route(self, mock_run):
mock_run.return_value = MagicMock(
stdout="default via 192.168.1.1 dev eth0 proto dhcp\n"
)
assert detect_interface() == "eth0"
@patch("decnet.network._run")
def test_raises_when_no_dev_found(self, mock_run):
mock_run.return_value = MagicMock(stdout="")
with pytest.raises(RuntimeError, match="Could not auto-detect"):
detect_interface()
# ---------------------------------------------------------------------------
# detect_subnet
# ---------------------------------------------------------------------------
class TestDetectSubnet:
def _make_run(self, addr_output, route_output):
def side_effect(cmd, **kwargs):
if "addr" in cmd:
return MagicMock(stdout=addr_output)
return MagicMock(stdout=route_output)
return side_effect
@patch("decnet.network._run")
def test_parses_subnet_and_gateway(self, mock_run):
mock_run.side_effect = self._make_run(
" inet 192.168.1.5/24 brd 192.168.1.255 scope global eth0\n",
"default via 192.168.1.1 dev eth0\n",
)
subnet, gw = detect_subnet("eth0")
assert subnet == "192.168.1.0/24"
assert gw == "192.168.1.1"
@patch("decnet.network._run")
def test_raises_when_no_inet(self, mock_run):
mock_run.side_effect = self._make_run("", "default via 192.168.1.1 dev eth0\n")
with pytest.raises(RuntimeError, match="Could not detect subnet"):
detect_subnet("eth0")
@patch("decnet.network._run")
def test_raises_when_no_gateway(self, mock_run):
mock_run.side_effect = self._make_run(
" inet 192.168.1.5/24 brd 192.168.1.255 scope global eth0\n", ""
)
with pytest.raises(RuntimeError, match="Could not detect gateway"):
detect_subnet("eth0")
# ---------------------------------------------------------------------------
# get_host_ip
# ---------------------------------------------------------------------------
class TestGetHostIp:
@patch("decnet.network._run")
def test_returns_host_ip(self, mock_run):
mock_run.return_value = MagicMock(
stdout=" inet 10.0.0.5/24 brd 10.0.0.255 scope global eth0\n"
)
assert get_host_ip("eth0") == "10.0.0.5"
@patch("decnet.network._run")
def test_raises_when_no_inet(self, mock_run):
mock_run.return_value = MagicMock(stdout="link/ether aa:bb:cc:dd:ee:ff\n")
with pytest.raises(RuntimeError, match="Could not determine host IP"):
get_host_ip("eth0")
# ---------------------------------------------------------------------------
# remove_macvlan_network
# ---------------------------------------------------------------------------
class TestRemoveMacvlanNetwork:
def test_removes_matching_network(self):
client = MagicMock()
net = MagicMock()
net.name = MACVLAN_NETWORK_NAME
client.networks.list.return_value = [net]
remove_macvlan_network(client)
net.remove.assert_called_once()
def test_noop_when_no_matching_network(self):
client = MagicMock()
other = MagicMock()
other.name = "some-other-network"
client.networks.list.return_value = [other]
remove_macvlan_network(client)
other.remove.assert_not_called()
# ---------------------------------------------------------------------------
# teardown_host_macvlan
# ---------------------------------------------------------------------------
class TestTeardownHostMacvlan:
@patch("decnet.network.os.geteuid", return_value=0)
@patch("decnet.network._run")
def test_deletes_macvlan_iface(self, mock_run, _):
mock_run.return_value = MagicMock(returncode=0)
teardown_host_macvlan("192.168.1.96/27")
calls = [str(c) for c in mock_run.call_args_list]
assert any(HOST_MACVLAN_IFACE in c for c in calls)
@patch("decnet.network.os.geteuid", return_value=1)
def test_requires_root(self, _):
with pytest.raises(PermissionError):
teardown_host_macvlan("192.168.1.96/27")

View File

@@ -0,0 +1,475 @@
"""
Tests for the OS TCP/IP fingerprint spoof feature.
Covers:
- os_fingerprint.py: profiles, TTL values, fallback behaviour
- archetypes.py: every archetype has a valid nmap_os
- config.py: DeckyConfig carries nmap_os
- composer.py: base container gets sysctls + cap_add injected
- cli.py helpers: nmap_os propagated from archetype → DeckyConfig
"""
import pytest
from decnet.archetypes import ARCHETYPES
from decnet.composer import generate_compose
from decnet.config import DeckyConfig, DecnetConfig
from decnet.os_fingerprint import OS_SYSCTLS, all_os_families, get_os_sysctls
# ---------------------------------------------------------------------------
# os_fingerprint module — TTL
# ---------------------------------------------------------------------------
def test_linux_ttl_is_64():
assert get_os_sysctls("linux")["net.ipv4.ip_default_ttl"] == "64"
def test_windows_ttl_is_128():
assert get_os_sysctls("windows")["net.ipv4.ip_default_ttl"] == "128"
def test_embedded_ttl_is_255():
assert get_os_sysctls("embedded")["net.ipv4.ip_default_ttl"] == "255"
def test_cisco_ttl_is_255():
assert get_os_sysctls("cisco")["net.ipv4.ip_default_ttl"] == "255"
def test_bsd_ttl_is_64():
assert get_os_sysctls("bsd")["net.ipv4.ip_default_ttl"] == "64"
# ---------------------------------------------------------------------------
# os_fingerprint module — tcp_timestamps
# ---------------------------------------------------------------------------
def test_linux_tcp_timestamps_is_1():
assert get_os_sysctls("linux")["net.ipv4.tcp_timestamps"] == "1"
def test_windows_tcp_timestamps_is_0():
assert get_os_sysctls("windows")["net.ipv4.tcp_timestamps"] == "0"
def test_embedded_tcp_timestamps_is_0():
assert get_os_sysctls("embedded")["net.ipv4.tcp_timestamps"] == "0"
def test_bsd_tcp_timestamps_is_1():
assert get_os_sysctls("bsd")["net.ipv4.tcp_timestamps"] == "1"
def test_cisco_tcp_timestamps_is_0():
assert get_os_sysctls("cisco")["net.ipv4.tcp_timestamps"] == "0"
# ---------------------------------------------------------------------------
# os_fingerprint module — tcp_sack
# ---------------------------------------------------------------------------
def test_linux_tcp_sack_is_1():
assert get_os_sysctls("linux")["net.ipv4.tcp_sack"] == "1"
def test_windows_tcp_sack_is_1():
assert get_os_sysctls("windows")["net.ipv4.tcp_sack"] == "1"
def test_embedded_tcp_sack_is_0():
assert get_os_sysctls("embedded")["net.ipv4.tcp_sack"] == "0"
def test_cisco_tcp_sack_is_0():
assert get_os_sysctls("cisco")["net.ipv4.tcp_sack"] == "0"
# ---------------------------------------------------------------------------
# os_fingerprint module — tcp_ecn
# ---------------------------------------------------------------------------
def test_linux_tcp_ecn_is_2():
assert get_os_sysctls("linux")["net.ipv4.tcp_ecn"] == "2"
def test_windows_tcp_ecn_is_0():
assert get_os_sysctls("windows")["net.ipv4.tcp_ecn"] == "0"
def test_embedded_tcp_ecn_is_0():
assert get_os_sysctls("embedded")["net.ipv4.tcp_ecn"] == "0"
def test_bsd_tcp_ecn_is_0():
assert get_os_sysctls("bsd")["net.ipv4.tcp_ecn"] == "0"
# ---------------------------------------------------------------------------
# os_fingerprint module — tcp_window_scaling
# ---------------------------------------------------------------------------
def test_linux_tcp_window_scaling_is_1():
assert get_os_sysctls("linux")["net.ipv4.tcp_window_scaling"] == "1"
def test_windows_tcp_window_scaling_is_1():
assert get_os_sysctls("windows")["net.ipv4.tcp_window_scaling"] == "1"
def test_embedded_tcp_window_scaling_is_0():
assert get_os_sysctls("embedded")["net.ipv4.tcp_window_scaling"] == "0"
def test_cisco_tcp_window_scaling_is_0():
assert get_os_sysctls("cisco")["net.ipv4.tcp_window_scaling"] == "0"
# ---------------------------------------------------------------------------
# os_fingerprint module — ip_no_pmtu_disc
# ---------------------------------------------------------------------------
def test_linux_ip_no_pmtu_disc_is_0():
assert get_os_sysctls("linux")["net.ipv4.ip_no_pmtu_disc"] == "0"
def test_windows_ip_no_pmtu_disc_is_0():
assert get_os_sysctls("windows")["net.ipv4.ip_no_pmtu_disc"] == "0"
def test_embedded_ip_no_pmtu_disc_is_1():
assert get_os_sysctls("embedded")["net.ipv4.ip_no_pmtu_disc"] == "1"
def test_cisco_ip_no_pmtu_disc_is_1():
assert get_os_sysctls("cisco")["net.ipv4.ip_no_pmtu_disc"] == "1"
# ---------------------------------------------------------------------------
# os_fingerprint module — tcp_fin_timeout
# ---------------------------------------------------------------------------
def test_linux_tcp_fin_timeout_is_60():
assert get_os_sysctls("linux")["net.ipv4.tcp_fin_timeout"] == "60"
def test_windows_tcp_fin_timeout_is_30():
assert get_os_sysctls("windows")["net.ipv4.tcp_fin_timeout"] == "30"
def test_embedded_tcp_fin_timeout_is_15():
assert get_os_sysctls("embedded")["net.ipv4.tcp_fin_timeout"] == "15"
def test_cisco_tcp_fin_timeout_is_15():
assert get_os_sysctls("cisco")["net.ipv4.tcp_fin_timeout"] == "15"
# ---------------------------------------------------------------------------
# os_fingerprint module — icmp_ratelimit
# ---------------------------------------------------------------------------
def test_linux_icmp_ratelimit_is_1000():
assert get_os_sysctls("linux")["net.ipv4.icmp_ratelimit"] == "1000"
def test_windows_icmp_ratelimit_is_0():
assert get_os_sysctls("windows")["net.ipv4.icmp_ratelimit"] == "0"
def test_bsd_icmp_ratelimit_is_250():
assert get_os_sysctls("bsd")["net.ipv4.icmp_ratelimit"] == "250"
def test_embedded_icmp_ratelimit_is_0():
assert get_os_sysctls("embedded")["net.ipv4.icmp_ratelimit"] == "0"
def test_cisco_icmp_ratelimit_is_0():
assert get_os_sysctls("cisco")["net.ipv4.icmp_ratelimit"] == "0"
# ---------------------------------------------------------------------------
# os_fingerprint module — icmp_ratemask
# ---------------------------------------------------------------------------
def test_linux_icmp_ratemask_is_6168():
assert get_os_sysctls("linux")["net.ipv4.icmp_ratemask"] == "6168"
def test_windows_icmp_ratemask_is_0():
assert get_os_sysctls("windows")["net.ipv4.icmp_ratemask"] == "0"
def test_bsd_icmp_ratemask_is_6168():
assert get_os_sysctls("bsd")["net.ipv4.icmp_ratemask"] == "6168"
def test_embedded_icmp_ratemask_is_0():
assert get_os_sysctls("embedded")["net.ipv4.icmp_ratemask"] == "0"
def test_cisco_icmp_ratemask_is_0():
assert get_os_sysctls("cisco")["net.ipv4.icmp_ratemask"] == "0"
# ---------------------------------------------------------------------------
# os_fingerprint module — structural / completeness
# ---------------------------------------------------------------------------
def test_unknown_os_falls_back_to_linux():
result = get_os_sysctls("nonexistent-os")
assert result == get_os_sysctls("linux")
def test_get_os_sysctls_returns_copy():
"""Mutating the returned dict must not alter the master profile."""
s = get_os_sysctls("windows")
s["net.ipv4.ip_default_ttl"] = "999"
assert OS_SYSCTLS["windows"]["net.ipv4.ip_default_ttl"] == "128"
def test_all_os_families_non_empty():
families = all_os_families()
assert len(families) > 0
assert "linux" in families
assert "windows" in families
assert "embedded" in families
@pytest.mark.parametrize("family", ["linux", "windows", "bsd", "embedded", "cisco"])
def test_all_os_profiles_have_required_sysctls(family: str):
"""Every OS profile must define the full canonical sysctl set."""
from decnet.os_fingerprint import _REQUIRED_SYSCTLS
result = get_os_sysctls(family)
missing = _REQUIRED_SYSCTLS - result.keys()
assert not missing, f"OS profile '{family}' is missing sysctls: {missing}"
@pytest.mark.parametrize("family", ["linux", "windows", "bsd", "embedded", "cisco"])
def test_all_os_sysctl_values_are_strings(family: str):
"""Docker Compose requires sysctl values to be strings, never ints."""
for _key, _val in get_os_sysctls(family).items():
assert isinstance(_val, str), (
f"OS profile '{family}': sysctl '{_key}' value {_val!r} is not a string"
)
# ---------------------------------------------------------------------------
# Archetypes carry valid nmap_os values
# ---------------------------------------------------------------------------
@pytest.mark.parametrize("slug,arch", list(ARCHETYPES.items()))
def test_archetype_nmap_os_is_known(slug, arch):
assert arch.nmap_os in all_os_families(), (
f"Archetype '{slug}' has nmap_os='{arch.nmap_os}' which is not in OS_SYSCTLS"
)
@pytest.mark.parametrize("slug", ["windows-workstation", "windows-server", "domain-controller"])
def test_windows_archetypes_have_windows_nmap_os(slug):
assert ARCHETYPES[slug].nmap_os == "windows"
@pytest.mark.parametrize("slug", ["printer", "iot-device", "industrial-control"])
def test_embedded_archetypes_have_embedded_nmap_os(slug):
assert ARCHETYPES[slug].nmap_os == "embedded"
@pytest.mark.parametrize("slug", ["linux-server", "web-server", "database-server",
"mail-server", "file-server", "voip-server",
"monitoring-node", "devops-host"])
def test_linux_archetypes_have_linux_nmap_os(slug):
assert ARCHETYPES[slug].nmap_os == "linux"
# ---------------------------------------------------------------------------
# DeckyConfig default
# ---------------------------------------------------------------------------
def _make_decky(nmap_os: str = "linux") -> DeckyConfig:
return DeckyConfig(
name="decky-01",
ip="10.0.0.10",
services=["ssh"],
distro="debian",
base_image="debian:bookworm-slim",
build_base="debian:bookworm-slim",
hostname="test-host",
nmap_os=nmap_os,
)
def test_deckyconfig_default_nmap_os_is_linux():
cfg = DeckyConfig(
name="decky-01",
ip="10.0.0.10",
services=["ssh"],
distro="debian",
base_image="debian:bookworm-slim",
build_base="debian:bookworm-slim",
hostname="test-host",
)
assert cfg.nmap_os == "linux"
def test_deckyconfig_accepts_custom_nmap_os():
cfg = _make_decky(nmap_os="windows")
assert cfg.nmap_os == "windows"
# ---------------------------------------------------------------------------
# Composer injects sysctls + cap_add into base container
# ---------------------------------------------------------------------------
def _make_config(nmap_os: str = "linux") -> DecnetConfig:
return DecnetConfig(
mode="unihost",
interface="eth0",
subnet="10.0.0.0/24",
gateway="10.0.0.1",
deckies=[_make_decky(nmap_os=nmap_os)],
)
def test_compose_base_has_sysctls():
compose = generate_compose(_make_config("linux"))
base = compose["services"]["decky-01"]
assert "sysctls" in base
def test_compose_base_has_cap_net_admin():
compose = generate_compose(_make_config("linux"))
base = compose["services"]["decky-01"]
assert "cap_add" in base
assert "NET_ADMIN" in base["cap_add"]
def test_compose_linux_ttl_64():
compose = generate_compose(_make_config("linux"))
sysctls = compose["services"]["decky-01"]["sysctls"]
assert sysctls["net.ipv4.ip_default_ttl"] == "64"
def test_compose_windows_ttl_128():
compose = generate_compose(_make_config("windows"))
sysctls = compose["services"]["decky-01"]["sysctls"]
assert sysctls["net.ipv4.ip_default_ttl"] == "128"
def test_compose_embedded_ttl_255():
compose = generate_compose(_make_config("embedded"))
sysctls = compose["services"]["decky-01"]["sysctls"]
assert sysctls["net.ipv4.ip_default_ttl"] == "255"
def test_compose_service_containers_have_no_sysctls():
"""Service containers share the base network namespace — no sysctls needed there."""
compose = generate_compose(_make_config("windows"))
svc = compose["services"]["decky-01-ssh"]
assert "sysctls" not in svc
def test_compose_two_deckies_independent_nmap_os():
"""Each decky gets its own OS profile."""
decky_win = _make_decky(nmap_os="windows")
decky_lin = DeckyConfig(
name="decky-02",
ip="10.0.0.11",
services=["ssh"],
distro="debian",
base_image="debian:bookworm-slim",
build_base="debian:bookworm-slim",
hostname="test-host-2",
nmap_os="linux",
)
config = DecnetConfig(
mode="unihost",
interface="eth0",
subnet="10.0.0.0/24",
gateway="10.0.0.1",
deckies=[decky_win, decky_lin],
)
compose = generate_compose(config)
assert compose["services"]["decky-01"]["sysctls"]["net.ipv4.ip_default_ttl"] == "128"
assert compose["services"]["decky-02"]["sysctls"]["net.ipv4.ip_default_ttl"] == "64"
def test_compose_linux_sysctls_include_timestamps():
"""Linux compose output must have tcp_timestamps enabled (= 1)."""
compose = generate_compose(_make_config("linux"))
sysctls = compose["services"]["decky-01"]["sysctls"]
assert sysctls.get("net.ipv4.tcp_timestamps") == "1"
def test_compose_windows_sysctls_no_timestamps():
"""Windows compose output must have tcp_timestamps disabled (= 0)."""
compose = generate_compose(_make_config("windows"))
sysctls = compose["services"]["decky-01"]["sysctls"]
assert sysctls.get("net.ipv4.tcp_timestamps") == "0"
def test_compose_linux_sysctls_full_set():
"""Linux compose output must carry all 8 canonical sysctls."""
from decnet.os_fingerprint import _REQUIRED_SYSCTLS
compose = generate_compose(_make_config("linux"))
sysctls = compose["services"]["decky-01"]["sysctls"]
missing = _REQUIRED_SYSCTLS - sysctls.keys()
assert not missing, f"Compose output missing sysctls: {missing}"
def test_compose_embedded_sysctls_full_set():
"""Embedded compose output must carry all 8 canonical sysctls."""
from decnet.os_fingerprint import _REQUIRED_SYSCTLS
compose = generate_compose(_make_config("embedded"))
sysctls = compose["services"]["decky-01"]["sysctls"]
missing = _REQUIRED_SYSCTLS - sysctls.keys()
assert not missing, f"Compose output missing sysctls: {missing}"
# ---------------------------------------------------------------------------
# CLI helper: nmap_os flows from archetype into DeckyConfig
# ---------------------------------------------------------------------------
def test_build_deckies_windows_archetype_sets_nmap_os():
from decnet.archetypes import get_archetype
from decnet.fleet import build_deckies as _build_deckies
arch = get_archetype("windows-workstation")
deckies = _build_deckies(
n=1,
ips=["10.0.0.20"],
services_explicit=None,
randomize_services=False,
archetype=arch,
)
assert deckies[0].nmap_os == "windows"
def test_build_deckies_no_archetype_defaults_linux():
from decnet.fleet import build_deckies as _build_deckies
deckies = _build_deckies(
n=1,
ips=["10.0.0.20"],
services_explicit=["ssh"],
randomize_services=False,
archetype=None,
)
assert deckies[0].nmap_os == "linux"
def test_build_deckies_embedded_archetype_sets_nmap_os():
from decnet.archetypes import get_archetype
from decnet.fleet import build_deckies as _build_deckies
arch = get_archetype("iot-device")
deckies = _build_deckies(
n=1,
ips=["10.0.0.20"],
services_explicit=None,
randomize_services=False,
archetype=arch,
)
assert deckies[0].nmap_os == "embedded"

113
tests/core/test_privdrop.py Normal file
View File

@@ -0,0 +1,113 @@
"""
Unit tests for decnet.privdrop — no actual root required.
We stub os.geteuid / os.chown to simulate root and capture the calls,
so these tests are portable (CI doesn't run as root).
"""
import os
import pytest
from decnet import privdrop
def test_chown_noop_when_not_root(tmp_path, monkeypatch):
target = tmp_path / "x.log"
target.write_text("")
monkeypatch.setattr(os, "geteuid", lambda: 1000)
monkeypatch.setenv("SUDO_UID", "1000")
monkeypatch.setenv("SUDO_GID", "1000")
called = []
monkeypatch.setattr(os, "chown", lambda *a, **kw: called.append(a))
privdrop.chown_to_invoking_user(target)
assert called == []
def test_chown_noop_when_no_sudo_env(tmp_path, monkeypatch):
target = tmp_path / "x.log"
target.write_text("")
monkeypatch.setattr(os, "geteuid", lambda: 0)
monkeypatch.delenv("SUDO_UID", raising=False)
monkeypatch.delenv("SUDO_GID", raising=False)
called = []
monkeypatch.setattr(os, "chown", lambda *a, **kw: called.append(a))
privdrop.chown_to_invoking_user(target)
assert called == []
def test_chown_noop_when_path_missing(tmp_path, monkeypatch):
monkeypatch.setattr(os, "geteuid", lambda: 0)
monkeypatch.setenv("SUDO_UID", "1000")
monkeypatch.setenv("SUDO_GID", "1000")
called = []
monkeypatch.setattr(os, "chown", lambda *a, **kw: called.append(a))
privdrop.chown_to_invoking_user(tmp_path / "does-not-exist")
assert called == []
def test_chown_applies_sudo_ids(tmp_path, monkeypatch):
target = tmp_path / "x.log"
target.write_text("")
monkeypatch.setattr(os, "geteuid", lambda: 0)
monkeypatch.setenv("SUDO_UID", "4242")
monkeypatch.setenv("SUDO_GID", "4243")
seen = {}
def fake_chown(path, uid, gid):
seen["path"] = str(path)
seen["uid"] = uid
seen["gid"] = gid
monkeypatch.setattr(os, "chown", fake_chown)
privdrop.chown_to_invoking_user(target)
assert seen == {"path": str(target), "uid": 4242, "gid": 4243}
def test_chown_tree_recurses(tmp_path, monkeypatch):
(tmp_path / "a").mkdir()
(tmp_path / "a" / "b.log").write_text("")
(tmp_path / "c.log").write_text("")
monkeypatch.setattr(os, "geteuid", lambda: 0)
monkeypatch.setenv("SUDO_UID", "1000")
monkeypatch.setenv("SUDO_GID", "1000")
chowned = []
monkeypatch.setattr(os, "chown", lambda p, *a: chowned.append(str(p)))
privdrop.chown_tree_to_invoking_user(tmp_path)
assert str(tmp_path) in chowned
assert str(tmp_path / "a") in chowned
assert str(tmp_path / "a" / "b.log") in chowned
assert str(tmp_path / "c.log") in chowned
def test_chown_swallows_oserror(tmp_path, monkeypatch):
"""A failed chown (e.g. cross-fs sudo edge case) must not raise."""
target = tmp_path / "x.log"
target.write_text("")
monkeypatch.setattr(os, "geteuid", lambda: 0)
monkeypatch.setenv("SUDO_UID", "1000")
monkeypatch.setenv("SUDO_GID", "1000")
def boom(*_a, **_kw):
raise OSError("EPERM")
monkeypatch.setattr(os, "chown", boom)
privdrop.chown_to_invoking_user(target) # must not raise
def test_chown_rejects_malformed_sudo_ids(tmp_path, monkeypatch):
target = tmp_path / "x.log"
target.write_text("")
monkeypatch.setattr(os, "geteuid", lambda: 0)
monkeypatch.setenv("SUDO_UID", "not-an-int")
monkeypatch.setenv("SUDO_GID", "1000")
called = []
monkeypatch.setattr(os, "chown", lambda *a, **kw: called.append(a))
privdrop.chown_to_invoking_user(target)
assert called == []