test(templates): cover instance_seed helper and update service tests

Add tests/service_testing/test_instance_seed.py — pins NODE_NAME to assert
determinism of seeded functions and sweeps NODE_NAMEs to assert cross-fleet
divergence. Conftest gains load_real_instance_seed() so template tests see
the real seeding behavior instead of a stub. Existing template tests updated
to pin NODE_NAME and match seeded outputs.
This commit is contained in:
2026-04-22 09:24:28 -04:00
parent 3fb84ac5d0
commit a63708a3d1
10 changed files with 302 additions and 78 deletions

View File

@@ -28,6 +28,20 @@ def make_fake_syslog_bridge() -> ModuleType:
return mod
def load_real_instance_seed() -> ModuleType:
"""Load the real instance_seed helper so templates under test see the
actual per-instance seeding behavior, not a stub. Tests that need
determinism should pin NODE_NAME via monkeypatch before loading a
template."""
import importlib.util
spec = importlib.util.spec_from_file_location(
"instance_seed", "decnet/templates/instance_seed.py"
)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
def run_with_timeout(fn, *args, timeout: float = 2.0) -> None:
"""Run fn(*args) in a daemon thread. pytest.fail if it doesn't return in time."""
exc_box: list[BaseException] = []

View File

@@ -0,0 +1,91 @@
"""
Tests for decnet/templates/instance_seed.py — the per-instance stealth
seeding helper. These tests pin NODE_NAME to assert determinism of the
seeded functions, and sweep NODE_NAMEs to assert cross-fleet divergence.
"""
import asyncio
import importlib.util
import sys
import time
from unittest.mock import patch
def _load_seed(node_name: str):
sys.modules.pop("instance_seed", None)
spec = importlib.util.spec_from_file_location(
"instance_seed", "decnet/templates/instance_seed.py"
)
mod = importlib.util.module_from_spec(spec)
with patch.dict("os.environ", {"NODE_NAME": node_name}, clear=False):
spec.loader.exec_module(mod)
return mod
def test_same_nodename_yields_stable_uuid():
a = _load_seed("deckie-42").instance_uuid("x")
b = _load_seed("deckie-42").instance_uuid("x")
assert a == b
def test_different_nodename_yields_different_uuid():
a = _load_seed("deckie-alpha").instance_uuid("x")
b = _load_seed("deckie-beta").instance_uuid("x")
assert a != b
def test_pick_is_deterministic_per_instance():
choices = ["a", "b", "c", "d", "e"]
m1 = _load_seed("hostX")
m2 = _load_seed("hostX")
assert m1.pick(choices) == m2.pick(choices)
def test_pick_varies_across_fleet():
"""For a reasonable fleet size, pick should land on at least 2 distinct
values. Anything less means the seed isn't actually diversifying output."""
choices = list("abcdefghij")
picks = {_load_seed(f"host{i}").pick(choices) for i in range(20)}
assert len(picks) >= 3
def test_uptime_monotonic_across_calls():
mod = _load_seed("uptime-host")
u1 = mod.uptime_seconds()
time.sleep(0.02)
u2 = mod.uptime_seconds()
assert u2 >= u1
def test_uptime_includes_boot_offset():
"""uptime should be > a few minutes even at process start — deckies
should not look like they just booted."""
mod = _load_seed("fresh-host")
assert mod.uptime_seconds() > 600
def test_fresh_bytes_is_not_deterministic():
"""fresh_bytes is per-connection randomness, not seeded — otherwise
two MySQL handshakes to the same decky would present identical salts."""
mod = _load_seed("host")
assert mod.fresh_bytes(16) != mod.fresh_bytes(16)
def test_random_bytes_is_deterministic():
"""random_bytes is the *seeded* variant — used for stable per-instance
identifiers like cluster UUIDs."""
a = _load_seed("h").random_bytes(16, "ns")
b = _load_seed("h").random_bytes(16, "ns")
assert a == b
def test_jitter_sleeps_in_range():
mod = _load_seed("jh")
async def run():
start = time.perf_counter()
await mod.jitter(10, 30)
return time.perf_counter() - start
elapsed = asyncio.run(run())
assert 0.005 <= elapsed <= 0.200 # generous upper bound for CI jitter

View File

@@ -14,16 +14,21 @@ import pytest
from hypothesis import given, settings
from hypothesis import strategies as st
from .conftest import _FUZZ_SETTINGS, make_fake_syslog_bridge, run_with_timeout
from .conftest import (
_FUZZ_SETTINGS,
load_real_instance_seed,
make_fake_syslog_bridge,
run_with_timeout,
)
# ── Helpers ───────────────────────────────────────────────────────────────────
def _load_mongodb():
for key in list(sys.modules):
if key in ("mongodb_server", "syslog_bridge"):
del sys.modules[key]
for key in ("mongodb_server", "syslog_bridge", "instance_seed"):
sys.modules.pop(key, None)
sys.modules["syslog_bridge"] = make_fake_syslog_bridge()
sys.modules["instance_seed"] = load_real_instance_seed()
spec = importlib.util.spec_from_file_location("mongodb_server", "decnet/templates/mongodb/server.py")
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)

View File

@@ -33,15 +33,16 @@ def _load_mqtt(accept_all: bool = True, custom_topics: str = "", persona: str =
"MQTT_PERSONA": persona,
"MQTT_CUSTOM_TOPICS": custom_topics,
}
for key in list(sys.modules):
if key in ("mqtt_server", "syslog_bridge"):
del sys.modules[key]
for key in ("mqtt_server", "syslog_bridge", "instance_seed"):
sys.modules.pop(key, None)
sys.modules["syslog_bridge"] = _make_fake_syslog_bridge()
spec = importlib.util.spec_from_file_location("mqtt_server", "decnet/templates/mqtt/server.py")
mod = importlib.util.module_from_spec(spec)
with patch.dict("os.environ", env, clear=False):
from .conftest import load_real_instance_seed
sys.modules["instance_seed"] = load_real_instance_seed()
spec.loader.exec_module(mod)
return mod
@@ -127,14 +128,18 @@ def test_subscribe_wildcard_retained(mqtt_mod):
_send(proto, _connect_packet())
written.clear()
_send(proto, _subscribe_packet("plant/#"))
# The water_plant persona now picks a per-instance site prefix (north,
# south, plant-a, etc.) instead of hardcoding "plant/". Use the top-level
# wildcard so the test doesn't depend on which site this decky rolled.
_send(proto, _subscribe_packet("#"))
assert len(written) >= 2 # At least SUBACK + some publishes
assert written[0].startswith(b"\x90") # SUBACK
combined = b"".join(written[1:])
# Should contain some water plant topics
assert b"plant/water/tank1/level" in combined
# The identifying tail of water-plant topics is stable regardless of
# which site prefix was chosen at instance boot.
assert b"water/tank1/level" in combined
def test_publish_qos1_returns_puback(mqtt_mod):
proto, _, written = _make_protocol(mqtt_mod)

View File

@@ -15,16 +15,21 @@ import pytest
from hypothesis import given, settings
from hypothesis import strategies as st
from .conftest import _FUZZ_SETTINGS, make_fake_syslog_bridge, run_with_timeout
from .conftest import (
_FUZZ_SETTINGS,
load_real_instance_seed,
make_fake_syslog_bridge,
run_with_timeout,
)
# ── Helpers ───────────────────────────────────────────────────────────────────
def _load_mqtt():
for key in list(sys.modules):
if key in ("mqtt_server", "syslog_bridge"):
del sys.modules[key]
for key in ("mqtt_server", "syslog_bridge", "instance_seed"):
sys.modules.pop(key, None)
sys.modules["syslog_bridge"] = make_fake_syslog_bridge()
sys.modules["instance_seed"] = load_real_instance_seed()
spec = importlib.util.spec_from_file_location("mqtt_server", "decnet/templates/mqtt/server.py")
mod = importlib.util.module_from_spec(spec)
with patch.dict("os.environ", {"MQTT_ACCEPT_ALL": "1", "MQTT_PERSONA": "water_plant"}, clear=False):

View File

@@ -14,16 +14,21 @@ import pytest
from hypothesis import given, settings
from hypothesis import strategies as st
from .conftest import _FUZZ_SETTINGS, make_fake_syslog_bridge, run_with_timeout
from .conftest import (
_FUZZ_SETTINGS,
load_real_instance_seed,
make_fake_syslog_bridge,
run_with_timeout,
)
# ── Helpers ───────────────────────────────────────────────────────────────────
def _load_mssql():
for key in list(sys.modules):
if key in ("mssql_server", "syslog_bridge"):
del sys.modules[key]
for key in ("mssql_server", "syslog_bridge", "instance_seed"):
sys.modules.pop(key, None)
sys.modules["syslog_bridge"] = make_fake_syslog_bridge()
sys.modules["instance_seed"] = load_real_instance_seed()
spec = importlib.util.spec_from_file_location("mssql_server", "decnet/templates/mssql/server.py")
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)

View File

@@ -14,16 +14,21 @@ import pytest
from hypothesis import given, settings
from hypothesis import strategies as st
from .conftest import _FUZZ_SETTINGS, make_fake_syslog_bridge, run_with_timeout
from .conftest import (
_FUZZ_SETTINGS,
load_real_instance_seed,
make_fake_syslog_bridge,
run_with_timeout,
)
# ── Helpers ───────────────────────────────────────────────────────────────────
def _load_mysql():
for key in list(sys.modules):
if key in ("mysql_server", "syslog_bridge"):
del sys.modules[key]
for key in ("mysql_server", "syslog_bridge", "instance_seed"):
sys.modules.pop(key, None)
sys.modules["syslog_bridge"] = make_fake_syslog_bridge()
sys.modules["instance_seed"] = load_real_instance_seed()
spec = importlib.util.spec_from_file_location("mysql_server", "decnet/templates/mysql/server.py")
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
@@ -33,6 +38,7 @@ def _load_mysql():
def _make_protocol(mod):
proto = mod.MySQLProtocol()
transport = MagicMock()
transport.is_closing.return_value = False
written: list[bytes] = []
transport.write.side_effect = written.append
proto.connection_made(transport)
@@ -66,6 +72,7 @@ def mysql_mod():
def test_connection_sends_greeting(mysql_mod):
proto = mysql_mod.MySQLProtocol()
transport = MagicMock()
transport.is_closing.return_value = False
written: list[bytes] = []
transport.write.side_effect = written.append
proto.connection_made(transport)

View File

@@ -14,16 +14,21 @@ import pytest
from hypothesis import given, settings
from hypothesis import strategies as st
from .conftest import _FUZZ_SETTINGS, make_fake_syslog_bridge, run_with_timeout
from .conftest import (
_FUZZ_SETTINGS,
load_real_instance_seed,
make_fake_syslog_bridge,
run_with_timeout,
)
# ── Helpers ───────────────────────────────────────────────────────────────────
def _load_postgres():
for key in list(sys.modules):
if key in ("postgres_server", "syslog_bridge"):
del sys.modules[key]
for key in ("postgres_server", "syslog_bridge", "instance_seed"):
sys.modules.pop(key, None)
sys.modules["syslog_bridge"] = make_fake_syslog_bridge()
sys.modules["instance_seed"] = load_real_instance_seed()
spec = importlib.util.spec_from_file_location("postgres_server", "decnet/templates/postgres/server.py")
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)

View File

@@ -5,28 +5,27 @@ from unittest.mock import MagicMock, patch
import pytest
def _make_fake_syslog_bridge() -> ModuleType:
mod = ModuleType("syslog_bridge")
mod.syslog_line = MagicMock(return_value="")
mod.write_syslog_file = MagicMock()
mod.forward_syslog = MagicMock()
mod.SEVERITY_WARNING = 4
mod.SEVERITY_INFO = 6
return mod
from tests.service_testing.conftest import (
load_real_instance_seed,
make_fake_syslog_bridge,
)
def _load_redis():
env = {"NODE_NAME": "testredis"}
for key in list(sys.modules):
if key in ("redis_server", "syslog_bridge"):
del sys.modules[key]
def _load_redis(node_name: str = "testredis"):
env = {"NODE_NAME": node_name}
for key in ("redis_server", "syslog_bridge", "instance_seed"):
sys.modules.pop(key, None)
sys.modules["syslog_bridge"] = _make_fake_syslog_bridge()
sys.modules["syslog_bridge"] = make_fake_syslog_bridge()
spec = importlib.util.spec_from_file_location("redis_server", "decnet/templates/redis/server.py")
mod = importlib.util.module_from_spec(spec)
# Pin NODE_NAME before loading instance_seed — the seed is derived at
# import time.
with patch.dict("os.environ", env, clear=False):
sys.modules["instance_seed"] = load_real_instance_seed()
spec = importlib.util.spec_from_file_location(
"redis_server", "decnet/templates/redis/server.py"
)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
return mod
@@ -39,6 +38,7 @@ def redis_mod():
def _make_protocol(mod):
proto = mod.RedisProtocol()
transport = MagicMock()
transport.is_closing.return_value = False
written: list[bytes] = []
transport.write.side_effect = written.append
proto.connection_made(transport)
@@ -51,54 +51,73 @@ def _send(proto, *lines: bytes) -> None:
proto.data_received(line)
def test_auth_accepted(redis_mod):
def test_auth_with_no_password_configured(redis_mod, monkeypatch):
"""Default config has no REDIS_PASSWORD — real redis rejects AUTH with
the 'no password is set' ERR message. Accepting any AUTH blindly (the
old behavior) is a honeypot tell."""
proto, _, written = _make_protocol(redis_mod)
_send(proto, b"AUTH password\r\n")
assert b"".join(written) == b"+OK\r\n"
resp = b"".join(written)
assert resp.startswith(b"-ERR")
assert b"no password is set" in resp
def test_keys_wildcard(redis_mod):
def test_keys_pattern_yields_subset(redis_mod):
"""Fake store contents are now per-instance. We can't assert exact keys,
but KEYS with a narrow prefix should still return a proper RESP array
whose length matches the filtered subset."""
proto, _, written = _make_protocol(redis_mod)
_send(proto, b"*2\r\n$4\r\nKEYS\r\n$8\r\nsession:\r\n")
response = b"".join(written)
assert response.startswith(b"*0\r\n") # no key equals literal "session:"
def test_keys_star_returns_all(redis_mod):
proto, _, written = _make_protocol(redis_mod)
_send(proto, b"*2\r\n$4\r\nKEYS\r\n$1\r\n*\r\n")
response = b"".join(written)
assert response.startswith(b"*10\r\n")
assert b"config:aws_access_key" in response
# First bulk line is the array length; at least 1 key must exist.
assert response.startswith(b"*")
count = int(response.split(b"\r\n", 1)[0][1:])
assert count >= 1
def test_keys_prefix(redis_mod):
def test_config_get_returns_real_kv_pairs(redis_mod):
"""Old server returned *0 for CONFIG — a strong honeypot signature.
New behavior returns real config key/value pairs."""
proto, _, written = _make_protocol(redis_mod)
_send(proto, b"*2\r\n$4\r\nKEYS\r\n$6\r\nuser:*\r\n")
# Use a wildcard to make the length prefix unambiguous and match both
# "maxmemory" and "maxmemory-policy".
_send(proto, b"*3\r\n$6\r\nCONFIG\r\n$3\r\nGET\r\n$10\r\nmaxmemory*\r\n")
response = b"".join(written)
assert response.startswith(b"*2\r\n")
assert b"user:admin" in response
assert response.startswith(b"*4\r\n") # 2 keys × (key+value) = 4 elements
assert b"maxmemory" in response
assert b"maxmemory-policy" in response
def test_get_valid_key(redis_mod):
def test_info_has_dynamic_uptime(redis_mod):
proto, _, written = _make_protocol(redis_mod)
_send(proto, b"*2\r\n$3\r\nGET\r\n$13\r\ncache:api_key\r\n")
_send(proto, b"*1\r\n$4\r\nINFO\r\n")
response = b"".join(written)
assert response == b"$38\r\nsk_live_9mK3xF2aP7qR1bN8cT4dW6vE0yU5hJ\r\n"
assert b"uptime_in_seconds:" in response
# Old server hard-coded 864000 — ensure we're not regressing.
assert b"uptime_in_seconds:864000\r\n" not in response
def test_get_invalid_key(redis_mod):
proto, _, written = _make_protocol(redis_mod)
_send(proto, b"*2\r\n$3\r\nGET\r\n$7\r\nunknown\r\n")
response = b"".join(written)
assert response == b"$-1\r\n"
def test_scan(redis_mod):
proto, _, written = _make_protocol(redis_mod)
_send(proto, b"*1\r\n$4\r\nSCAN\r\n")
response = b"".join(written)
assert response.startswith(b"*2\r\n$1\r\n0\r\n*10\r\n")
def test_per_instance_version_differs_across_decky_names():
"""Two deckies with different NODE_NAMEs should, with high probability,
pick different redis versions from the weighted pool."""
picks: set[str] = set()
for name in ("decky-a", "decky-b", "decky-c", "decky-d", "decky-e", "decky-f"):
mod = _load_redis(node_name=name)
picks.add(mod._REDIS_VER)
assert len(picks) >= 2
def test_type_and_ttl(redis_mod):
proto, _, written = _make_protocol(redis_mod)
_send(proto, b"TYPE somekey\r\n")
assert b"".join(written) == b"+string\r\n"
assert b"+string\r\n" in b"".join(written)
written.clear()
_send(proto, b"TTL somekey\r\n")
assert b"".join(written) == b":-1\r\n"
assert b":-1\r\n" in b"".join(written)

View File

@@ -36,16 +36,23 @@ def _load_smtp(open_relay: bool):
Injects a stub syslog_bridge into sys.modules so the template can import
it without needing the real file on sys.path.
"""
env = {"SMTP_OPEN_RELAY": "1" if open_relay else "0", "NODE_NAME": "testhost"}
for key in list(sys.modules):
if key in ("smtp_server", "syslog_bridge"):
del sys.modules[key]
env = {
"SMTP_OPEN_RELAY": "1" if open_relay else "0",
"NODE_NAME": "testhost",
# Force deterministic RCPT acceptance in tests; relay filtering is
# covered in its own dedicated test class below.
"SMTP_RCPT_DROP_RATE": "0",
}
for key in ("smtp_server", "syslog_bridge", "instance_seed"):
sys.modules.pop(key, None)
sys.modules["syslog_bridge"] = _make_fake_syslog_bridge()
spec = importlib.util.spec_from_file_location("smtp_server", "decnet/templates/smtp/server.py")
mod = importlib.util.module_from_spec(spec)
with patch.dict("os.environ", env, clear=False):
from .conftest import load_real_instance_seed
sys.modules["instance_seed"] = load_real_instance_seed()
spec.loader.exec_module(mod)
return mod
@@ -237,6 +244,63 @@ class TestOpenRelay:
assert any("queued as" in r for r in replies)
# ── OPEN-RELAY FILTERING ─────────────────────────────────────────────────────
class TestOpenRelayFiltering:
"""Real open relays reject malformed/bogus RCPTs even when they accept
external mail — a pure tarpit is a honeypot tell."""
@staticmethod
def _session_with_env(env_extra: dict, *lines) -> list[str]:
env = {
"SMTP_OPEN_RELAY": "1",
"NODE_NAME": "testhost",
"SMTP_RCPT_DROP_RATE": "0",
**env_extra,
}
for key in ("smtp_server", "syslog_bridge", "instance_seed"):
sys.modules.pop(key, None)
sys.modules["syslog_bridge"] = _make_fake_syslog_bridge()
spec = importlib.util.spec_from_file_location(
"smtp_server", "decnet/templates/smtp/server.py"
)
mod = importlib.util.module_from_spec(spec)
with patch.dict("os.environ", env, clear=False):
from .conftest import load_real_instance_seed
sys.modules["instance_seed"] = load_real_instance_seed()
spec.loader.exec_module(mod)
proto, _, written = _make_protocol(mod)
_send(proto, *lines)
return _replies(written)
def test_malformed_rcpt_returns_501(self):
replies = self._session_with_env(
{},
"EHLO x.com",
"MAIL FROM:<a@b.com>",
"RCPT TO:<notanaddress>",
)
assert any(r.startswith("501") for r in replies)
def test_blocked_tld_returns_550(self):
replies = self._session_with_env(
{},
"EHLO x.com",
"MAIL FROM:<a@b.com>",
"RCPT TO:<admin@foo.invalid>",
)
assert any(r.startswith("550") for r in replies)
def test_always_greylist_returns_451(self):
replies = self._session_with_env(
{"SMTP_RCPT_DROP_RATE": "1.0"},
"EHLO x.com",
"MAIL FROM:<a@b.com>",
"RCPT TO:<victim@legit-domain.com>",
)
assert any(r.startswith("451") for r in replies)
# ── CREDENTIAL HARVESTER MODE ─────────────────────────────────────────────────
class TestCredentialHarvester:
@@ -296,10 +360,14 @@ class TestCredentialHarvester:
# ── Queue ID ──────────────────────────────────────────────────────────────────
def test_rand_msg_id_format(relay_mod):
# Postfix queue IDs use a vowel-free alphabet (no aeiou, no 0/1) and
# vary in length with the current microsecond magnitude — typically
# 10-12 chars.
postfix_alphabet = set("BCDFGHJKLMNPQRSTVWXYZ23456789")
for _ in range(50):
mid = relay_mod._rand_msg_id()
assert len(mid) == 12
assert mid.isalnum()
assert 10 <= len(mid) <= 12
assert set(mid).issubset(postfix_alphabet)
# ── AUTH PLAIN decode ─────────────────────────────────────────────────────────