Files
DECNET/tests/services/test_custom_service.py
anti ea95a009df refactor(tests): move flat tests/*.py into per-subsystem subfolders
Groups every flat test_*.py under the module it exercises, matching the
existing tests/{profiler,sniffer,prober,collector,correlation,cli,web,
topology,swarm,bus,updater,api,docker,geoip,...} layout. New folders:
services/, fleet/, config/, logging/, db/ (+ db/mysql/), telemetry/,
mutator/, core/.

Path-dependent __file__ references bumped an extra .parent in four
files that moved one level deeper:
- tests/sniffer/test_sniffer_ja3.py   (template path)
- tests/services/test_ssh_capture_emit.py (template path)
- tests/cli/test_mode_gating.py  (REPO root)
- tests/web/test_env_lazy_jwt.py (repo var)

Also drops two SQLite runtime artifacts (test_decnet.db-{shm,wal}) that
were leaking into the repo from a previous test run.

Fixes two test_service_isolation cases that patched asyncio.sleep (no
longer on the profiler main-loop hot path — same pre-existing bug I
fixed earlier in test_attacker_worker.py) by patching asyncio.wait_for
and passing interval=0.
2026-04-23 21:34:25 -04:00

65 lines
2.4 KiB
Python

"""
Tests for decnet.custom_service — BYOS (bring-your-own-service) support.
"""
from decnet.custom_service import CustomService
class TestCustomServiceComposeFragment:
    """Checks on the dict returned by ``CustomService.compose_fragment``."""

    def _svc(self, name="my-tool", image="myrepo/mytool:latest",
             exec_cmd="", ports=None):
        """Build a CustomService, filling in defaults for omitted arguments."""
        settings = {
            "name": name,
            "image": image,
            "exec_cmd": exec_cmd,
            "ports": ports,
        }
        return CustomService(**settings)

    def test_basic_fragment_structure(self):
        """Image, container name, restart policy and NODE_NAME are populated."""
        fragment = self._svc().compose_fragment("decky-01")
        assert fragment["image"] == "myrepo/mytool:latest"
        assert fragment["container_name"] == "decky-01-my-tool"
        assert fragment["restart"] == "unless-stopped"
        assert fragment["environment"]["NODE_NAME"] == "decky-01"

    def test_underscores_in_name_become_dashes(self):
        """Underscores in the service name show up as dashes in container_name."""
        service = self._svc(name="my_custom_tool")
        fragment = service.compose_fragment("decky-01")
        assert fragment["container_name"] == "decky-01-my-custom-tool"

    def test_exec_cmd_is_split_into_list(self):
        """A space-separated exec_cmd is exposed as an argv-style list."""
        service = self._svc(exec_cmd="/usr/bin/server --port 8080")
        fragment = service.compose_fragment("decky-01")
        assert fragment["command"] == ["/usr/bin/server", "--port", "8080"]

    def test_empty_exec_cmd_omits_command_key(self):
        """An empty exec_cmd leaves no "command" entry in the fragment."""
        service = self._svc(exec_cmd="")
        fragment = service.compose_fragment("decky-01")
        assert "command" not in fragment

    def test_log_target_injected_into_environment(self):
        """A supplied log_target is copied into the environment as LOG_TARGET."""
        service = self._svc()
        fragment = service.compose_fragment(
            "decky-01", log_target="10.0.0.5:5140"
        )
        assert fragment["environment"]["LOG_TARGET"] == "10.0.0.5:5140"

    def test_no_log_target_omits_key(self):
        """With log_target=None the environment carries no LOG_TARGET entry."""
        service = self._svc()
        fragment = service.compose_fragment("decky-01", log_target=None)
        assert "LOG_TARGET" not in fragment["environment"]

    def test_service_cfg_is_accepted_without_error(self):
        """Passing service_cfg must not raise and must still yield a fragment."""
        service = self._svc()
        # service_cfg is accepted but not used by CustomService
        fragment = service.compose_fragment(
            "decky-01", service_cfg={"key": "val"}
        )
        assert fragment is not None

    def test_ports_stored_on_instance(self):
        """Ports given to the constructor are readable back via ``.ports``."""
        service = CustomService("tool", "img", "", ports=[8080, 9090])
        assert service.ports == [8080, 9090]

    def test_no_ports_defaults_to_empty_list(self):
        """Omitting ports leaves ``.ports`` equal to an empty list."""
        service = CustomService("tool", "img", "")
        assert service.ports == []
class TestCustomServiceDockerfileContext:
    """Checks on ``CustomService.dockerfile_context``."""

    def test_returns_none(self):
        """dockerfile_context() returns None for a custom service."""
        service = CustomService("tool", "img", "cmd")
        context = service.dockerfile_context()
        assert context is None