merge testing->tomerge/main #7
483
tests/test_service_isolation.py
Normal file
483
tests/test_service_isolation.py
Normal file
@@ -0,0 +1,483 @@
|
||||
"""
|
||||
Service isolation tests.
|
||||
|
||||
Verifies that each background worker handles missing dependencies gracefully
|
||||
and that failures in one service do not cascade to others.
|
||||
|
||||
Dependency graph under test:
|
||||
Collector → (Docker SDK, state file, log file)
|
||||
Ingester → (Collector's JSON output, DB repo)
|
||||
Attacker → (DB repo)
|
||||
Sniffer → (MACVLAN interface, scapy, state file)
|
||||
API → (DB init, all workers)
|
||||
|
||||
Each test disables or breaks one dependency and asserts the affected
|
||||
worker degrades gracefully while unrelated workers remain unaffected.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
from pathlib import Path
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
# ─── Collector isolation ─────────────────────────────────────────────────────
|
||||
|
||||
class TestCollectorIsolation:
|
||||
"""Collector depends on Docker SDK and state file."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_collector_survives_docker_unavailable(self):
|
||||
"""Collector must not crash when Docker daemon is not running."""
|
||||
import docker as docker_mod
|
||||
from decnet.collector.worker import log_collector_worker
|
||||
|
||||
original_from_env = docker_mod.from_env
|
||||
with patch.object(docker_mod, "from_env",
|
||||
side_effect=Exception("Cannot connect to Docker daemon")):
|
||||
task = asyncio.create_task(log_collector_worker("/tmp/decnet-test-collector.log"))
|
||||
await asyncio.sleep(0.1)
|
||||
assert task.done()
|
||||
exc = task.exception()
|
||||
assert exc is None # Should not propagate exceptions
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_collector_survives_no_state_file(self):
|
||||
"""Collector must handle missing state file (no deckies deployed)."""
|
||||
from decnet.collector.worker import _load_service_container_names
|
||||
|
||||
with patch("decnet.config.load_state", return_value=None):
|
||||
result = _load_service_container_names()
|
||||
assert result == set()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_collector_survives_empty_fleet(self):
|
||||
"""Collector runs but finds no matching containers when fleet is empty."""
|
||||
import docker as docker_mod
|
||||
from decnet.collector.worker import log_collector_worker
|
||||
|
||||
mock_client = MagicMock()
|
||||
mock_client.containers.list.return_value = []
|
||||
mock_client.events.side_effect = Exception("connection closed")
|
||||
|
||||
with patch.object(docker_mod, "from_env", return_value=mock_client):
|
||||
with patch("decnet.config.load_state", return_value=None):
|
||||
task = asyncio.create_task(log_collector_worker("/tmp/decnet-test-collector.log"))
|
||||
await asyncio.sleep(0.1)
|
||||
assert task.done()
|
||||
assert task.exception() is None
|
||||
|
||||
def test_collector_container_filter_with_unknown_containers(self):
|
||||
"""is_service_container must reject containers not in state."""
|
||||
from decnet.collector.worker import is_service_container
|
||||
|
||||
with patch("decnet.collector.worker._load_service_container_names",
|
||||
return_value={"decky-01-ssh", "decky-01-http"}):
|
||||
assert is_service_container("decky-01-ssh") is True
|
||||
assert is_service_container("random-container") is False
|
||||
assert is_service_container("decky-99-ftp") is False
|
||||
|
||||
|
||||
# ─── Ingester isolation ──────────────────────────────────────────────────────
|
||||
|
||||
class TestIngesterIsolation:
|
||||
"""Ingester depends on collector's JSON output and DB repo."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_ingester_survives_missing_log_file(self):
|
||||
"""Ingester must wait patiently when JSON log file doesn't exist yet."""
|
||||
from decnet.web.ingester import log_ingestion_worker
|
||||
|
||||
mock_repo = MagicMock()
|
||||
iterations = 0
|
||||
|
||||
async def _controlled_sleep(seconds):
|
||||
nonlocal iterations
|
||||
iterations += 1
|
||||
if iterations >= 3:
|
||||
raise asyncio.CancelledError()
|
||||
|
||||
with patch.dict(os.environ, {"DECNET_INGEST_LOG_FILE": "/tmp/nonexistent-decnet-test.log"}):
|
||||
with patch("decnet.web.ingester.asyncio.sleep", side_effect=_controlled_sleep):
|
||||
task = asyncio.create_task(log_ingestion_worker(mock_repo))
|
||||
with pytest.raises(asyncio.CancelledError):
|
||||
await task
|
||||
# Should have waited at least 2 iterations without crashing
|
||||
assert iterations >= 2
|
||||
mock_repo.add_log.assert_not_called()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_ingester_survives_no_log_file_env(self):
|
||||
"""Ingester must exit gracefully when DECNET_INGEST_LOG_FILE is unset."""
|
||||
from decnet.web.ingester import log_ingestion_worker
|
||||
|
||||
mock_repo = MagicMock()
|
||||
with patch.dict(os.environ, {}, clear=False):
|
||||
# Remove the env var if it exists
|
||||
os.environ.pop("DECNET_INGEST_LOG_FILE", None)
|
||||
await log_ingestion_worker(mock_repo)
|
||||
# Should return immediately without error
|
||||
mock_repo.add_log.assert_not_called()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_ingester_survives_malformed_json(self, tmp_path):
|
||||
"""Ingester must skip malformed JSON lines without crashing."""
|
||||
from decnet.web.ingester import log_ingestion_worker
|
||||
|
||||
json_file = tmp_path / "test.json"
|
||||
json_file.write_text("not valid json\n{also broken\n")
|
||||
|
||||
mock_repo = MagicMock()
|
||||
mock_repo.add_log = AsyncMock()
|
||||
iterations = 0
|
||||
|
||||
async def _controlled_sleep(seconds):
|
||||
nonlocal iterations
|
||||
iterations += 1
|
||||
if iterations >= 3:
|
||||
raise asyncio.CancelledError()
|
||||
|
||||
with patch.dict(os.environ, {"DECNET_INGEST_LOG_FILE": str(tmp_path / "test.log")}):
|
||||
with patch("decnet.web.ingester.asyncio.sleep", side_effect=_controlled_sleep):
|
||||
task = asyncio.create_task(log_ingestion_worker(mock_repo))
|
||||
with pytest.raises(asyncio.CancelledError):
|
||||
await task
|
||||
mock_repo.add_log.assert_not_called()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_ingester_exits_on_db_fatal_error(self, tmp_path):
|
||||
"""Ingester must exit cleanly on fatal DB errors (table missing, connection closed)."""
|
||||
from decnet.web.ingester import log_ingestion_worker
|
||||
|
||||
json_file = tmp_path / "test.json"
|
||||
valid_record = {
|
||||
"timestamp": "2026-01-01 00:00:00",
|
||||
"decky": "decky-01",
|
||||
"service": "ssh",
|
||||
"event_type": "login_attempt",
|
||||
"attacker_ip": "10.0.0.1",
|
||||
"fields": {},
|
||||
"msg": "",
|
||||
"raw_line": "<134>1 2026-01-01T00:00:00Z decky-01 ssh - login_attempt -",
|
||||
}
|
||||
json_file.write_text(json.dumps(valid_record) + "\n")
|
||||
|
||||
mock_repo = MagicMock()
|
||||
mock_repo.add_log = AsyncMock(side_effect=Exception("no such table: logs"))
|
||||
|
||||
with patch.dict(os.environ, {"DECNET_INGEST_LOG_FILE": str(tmp_path / "test.log")}):
|
||||
# Worker should exit the loop on fatal DB error
|
||||
await log_ingestion_worker(mock_repo)
|
||||
# Should have attempted to add the log before dying
|
||||
mock_repo.add_log.assert_awaited_once()
|
||||
|
||||
|
||||
# ─── Attacker worker isolation ───────────────────────────────────────────────
|
||||
|
||||
class TestAttackerWorkerIsolation:
|
||||
"""Attacker worker depends on DB repo."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_attacker_worker_survives_db_error(self):
|
||||
"""Attacker worker must catch DB errors and continue looping."""
|
||||
from decnet.web.attacker_worker import attacker_profile_worker
|
||||
|
||||
mock_repo = MagicMock()
|
||||
mock_repo.get_all_logs_raw = AsyncMock(side_effect=Exception("DB is locked"))
|
||||
mock_repo.get_max_log_id = AsyncMock(return_value=0)
|
||||
mock_repo.set_state = AsyncMock()
|
||||
|
||||
iterations = 0
|
||||
|
||||
async def _controlled_sleep(seconds):
|
||||
nonlocal iterations
|
||||
iterations += 1
|
||||
if iterations >= 3:
|
||||
raise asyncio.CancelledError()
|
||||
|
||||
with patch("decnet.web.attacker_worker.asyncio.sleep", side_effect=_controlled_sleep):
|
||||
task = asyncio.create_task(attacker_profile_worker(mock_repo))
|
||||
with pytest.raises(asyncio.CancelledError):
|
||||
await task
|
||||
# Worker should have retried at least twice before we cancelled
|
||||
assert iterations >= 2
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_attacker_worker_survives_empty_db(self):
|
||||
"""Attacker worker must handle an empty database gracefully."""
|
||||
from decnet.web.attacker_worker import _WorkerState, _incremental_update
|
||||
|
||||
mock_repo = MagicMock()
|
||||
mock_repo.get_all_logs_raw = AsyncMock(return_value=[])
|
||||
mock_repo.get_max_log_id = AsyncMock(return_value=0)
|
||||
mock_repo.set_state = AsyncMock()
|
||||
|
||||
state = _WorkerState()
|
||||
await _incremental_update(mock_repo, state)
|
||||
assert state.initialized is True
|
||||
assert state.last_log_id == 0
|
||||
|
||||
|
||||
# ─── Sniffer isolation ───────────────────────────────────────────────────────
|
||||
|
||||
class TestSnifferIsolation:
|
||||
"""Sniffer depends on MACVLAN interface, scapy, and state file."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_sniffer_survives_missing_interface(self):
|
||||
"""Sniffer must exit gracefully when MACVLAN interface doesn't exist."""
|
||||
from decnet.sniffer.worker import sniffer_worker
|
||||
|
||||
with patch("decnet.sniffer.worker._interface_exists", return_value=False):
|
||||
await sniffer_worker("/tmp/decnet-test-sniffer.log")
|
||||
# Should return without error
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_sniffer_survives_no_state(self):
|
||||
"""Sniffer must exit gracefully when no deckies are deployed."""
|
||||
from decnet.sniffer.worker import sniffer_worker
|
||||
|
||||
with patch("decnet.sniffer.worker._interface_exists", return_value=True):
|
||||
with patch("decnet.config.load_state", return_value=None):
|
||||
await sniffer_worker("/tmp/decnet-test-sniffer.log")
|
||||
# Should return without error
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_sniffer_survives_scapy_import_error(self):
|
||||
"""Sniffer must handle missing scapy library gracefully."""
|
||||
from decnet.sniffer.worker import _sniff_loop
|
||||
|
||||
import threading
|
||||
stop = threading.Event()
|
||||
|
||||
with patch("decnet.config.load_state", return_value=None):
|
||||
with patch.dict("sys.modules", {"scapy": None, "scapy.sendrecv": None}):
|
||||
# Should exit gracefully (no deckies = early return before scapy import)
|
||||
_sniff_loop("fake0", Path("/tmp/test.log"), Path("/tmp/test.json"), stop)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_sniffer_survives_scapy_crash(self):
|
||||
"""Sniffer must handle scapy runtime errors without crashing the API."""
|
||||
from decnet.sniffer.worker import sniffer_worker
|
||||
|
||||
mock_state = MagicMock()
|
||||
mock_config = MagicMock()
|
||||
mock_config.deckies = [MagicMock(ip="192.168.1.10", name="decky-01")]
|
||||
|
||||
with patch("decnet.sniffer.worker._interface_exists", return_value=True):
|
||||
with patch("decnet.config.load_state", return_value=(mock_config, Path("/tmp"))):
|
||||
with patch("decnet.sniffer.worker.asyncio.to_thread",
|
||||
side_effect=Exception("scapy segfault")):
|
||||
# Should catch and log, not raise
|
||||
await sniffer_worker("/tmp/decnet-test-sniffer.log")
|
||||
|
||||
def test_sniffer_engine_ignores_non_decky_traffic(self):
|
||||
"""Engine must silently skip packets not involving any known decky."""
|
||||
from decnet.sniffer.fingerprint import SnifferEngine
|
||||
|
||||
written: list[str] = []
|
||||
engine = SnifferEngine(
|
||||
ip_to_decky={"192.168.1.10": "decky-01"},
|
||||
write_fn=written.append,
|
||||
)
|
||||
# Simulate a packet between two unknown IPs
|
||||
pkt = MagicMock()
|
||||
pkt.haslayer.return_value = True
|
||||
ip_layer = MagicMock()
|
||||
ip_layer.src = "10.0.0.1"
|
||||
ip_layer.dst = "10.0.0.2"
|
||||
tcp_layer = MagicMock()
|
||||
tcp_layer.sport = 12345
|
||||
tcp_layer.dport = 443
|
||||
tcp_layer.flags = MagicMock(value=0x10)
|
||||
tcp_layer.payload = b""
|
||||
pkt.__getitem__ = lambda self, cls: ip_layer if cls.__name__ == "IP" else tcp_layer
|
||||
# Import layers for haslayer check
|
||||
from scapy.layers.inet import IP, TCP
|
||||
pkt.haslayer.side_effect = lambda layer: True
|
||||
|
||||
engine.on_packet(pkt)
|
||||
assert written == [] # Nothing written for non-decky traffic
|
||||
|
||||
|
||||
# ─── API lifespan isolation ──────────────────────────────────────────────────
|
||||
|
||||
class TestApiLifespanIsolation:
|
||||
"""API lifespan must survive individual worker failures."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_api_survives_all_workers_failing(self):
|
||||
"""API must start and serve requests even if every worker fails to start."""
|
||||
from decnet.web.api import lifespan
|
||||
|
||||
mock_app = MagicMock()
|
||||
mock_repo = MagicMock()
|
||||
mock_repo.initialize = AsyncMock()
|
||||
|
||||
with patch("decnet.web.api.repo", mock_repo):
|
||||
with patch("decnet.web.api.log_ingestion_worker",
|
||||
side_effect=Exception("ingester exploded")):
|
||||
with patch("decnet.web.api.log_collector_worker",
|
||||
side_effect=Exception("collector exploded")):
|
||||
with patch("decnet.web.api.attacker_profile_worker",
|
||||
side_effect=Exception("attacker exploded")):
|
||||
with patch("decnet.sniffer.sniffer_worker",
|
||||
side_effect=Exception("sniffer exploded")):
|
||||
# API should still start
|
||||
async with lifespan(mock_app):
|
||||
mock_repo.initialize.assert_awaited_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_api_survives_db_init_failure(self):
|
||||
"""API must survive even if DB never initializes (5 failed attempts)."""
|
||||
from decnet.web.api import lifespan
|
||||
|
||||
mock_app = MagicMock()
|
||||
mock_repo = MagicMock()
|
||||
mock_repo.initialize = AsyncMock(side_effect=Exception("DB locked"))
|
||||
|
||||
with patch("decnet.web.api.repo", mock_repo):
|
||||
with patch("decnet.web.api.asyncio.sleep", new_callable=AsyncMock):
|
||||
with patch("decnet.web.api.log_ingestion_worker", return_value=asyncio.sleep(0)):
|
||||
with patch("decnet.web.api.log_collector_worker", return_value=asyncio.sleep(0)):
|
||||
with patch("decnet.web.api.attacker_profile_worker", return_value=asyncio.sleep(0)):
|
||||
async with lifespan(mock_app):
|
||||
# DB init failed 5 times but API is running
|
||||
assert mock_repo.initialize.await_count == 5
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_api_survives_sniffer_import_failure(self):
|
||||
"""API must start even if the sniffer module cannot be imported."""
|
||||
from decnet.web.api import lifespan
|
||||
|
||||
mock_app = MagicMock()
|
||||
mock_repo = MagicMock()
|
||||
mock_repo.initialize = AsyncMock()
|
||||
|
||||
with patch("decnet.web.api.repo", mock_repo):
|
||||
with patch("decnet.web.api.log_ingestion_worker", return_value=asyncio.sleep(0)):
|
||||
with patch("decnet.web.api.log_collector_worker", return_value=asyncio.sleep(0)):
|
||||
with patch("decnet.web.api.attacker_profile_worker", return_value=asyncio.sleep(0)):
|
||||
# Simulate sniffer import failure
|
||||
import builtins
|
||||
real_import = builtins.__import__
|
||||
|
||||
def _mock_import(name, *args, **kwargs):
|
||||
if name == "decnet.sniffer":
|
||||
raise ImportError("No module named 'scapy'")
|
||||
return real_import(name, *args, **kwargs)
|
||||
|
||||
with patch("builtins.__import__", side_effect=_mock_import):
|
||||
async with lifespan(mock_app):
|
||||
mock_repo.initialize.assert_awaited_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_shutdown_handles_already_dead_tasks(self):
|
||||
"""Shutdown must not crash when tasks have already completed or failed."""
|
||||
from decnet.web.api import lifespan
|
||||
|
||||
mock_app = MagicMock()
|
||||
mock_repo = MagicMock()
|
||||
mock_repo.initialize = AsyncMock()
|
||||
|
||||
# Workers that complete immediately
|
||||
async def _instant_worker(*args):
|
||||
return
|
||||
|
||||
with patch("decnet.web.api.repo", mock_repo):
|
||||
with patch("decnet.web.api.log_ingestion_worker", side_effect=_instant_worker):
|
||||
with patch("decnet.web.api.log_collector_worker", side_effect=_instant_worker):
|
||||
with patch("decnet.web.api.attacker_profile_worker", side_effect=_instant_worker):
|
||||
async with lifespan(mock_app):
|
||||
# Let tasks finish
|
||||
await asyncio.sleep(0.05)
|
||||
# Shutdown should handle already-done tasks gracefully
|
||||
|
||||
|
||||
# ─── Cross-service cascade tests ────────────────────────────────────────────
|
||||
|
||||
class TestCascadeIsolation:
|
||||
"""Verify that failure in one service does not cascade to others."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_collector_failure_does_not_kill_ingester(self, tmp_path):
|
||||
"""When collector dies, ingester must keep waiting (not crash)."""
|
||||
from decnet.web.ingester import log_ingestion_worker
|
||||
|
||||
json_file = tmp_path / "cascade.json"
|
||||
# File doesn't exist — simulates collector never writing
|
||||
|
||||
mock_repo = MagicMock()
|
||||
mock_repo.add_log = AsyncMock()
|
||||
iterations = 0
|
||||
|
||||
async def _controlled_sleep(seconds):
|
||||
nonlocal iterations
|
||||
iterations += 1
|
||||
if iterations >= 5:
|
||||
raise asyncio.CancelledError()
|
||||
|
||||
with patch.dict(os.environ, {"DECNET_INGEST_LOG_FILE": str(tmp_path / "cascade.log")}):
|
||||
with patch("decnet.web.ingester.asyncio.sleep", side_effect=_controlled_sleep):
|
||||
task = asyncio.create_task(log_ingestion_worker(mock_repo))
|
||||
with pytest.raises(asyncio.CancelledError):
|
||||
await task
|
||||
# Ingester should have been patiently waiting, not crashing
|
||||
assert iterations >= 4
|
||||
mock_repo.add_log.assert_not_called()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_ingester_failure_does_not_kill_attacker(self):
|
||||
"""When ingester dies, attacker worker must keep running independently."""
|
||||
from decnet.web.attacker_worker import attacker_profile_worker
|
||||
|
||||
mock_repo = MagicMock()
|
||||
mock_repo.get_all_logs_raw = AsyncMock(return_value=[])
|
||||
mock_repo.get_max_log_id = AsyncMock(return_value=0)
|
||||
mock_repo.set_state = AsyncMock()
|
||||
mock_repo.get_logs_after_id = AsyncMock(return_value=[])
|
||||
|
||||
iterations = 0
|
||||
|
||||
async def _controlled_sleep(seconds):
|
||||
nonlocal iterations
|
||||
iterations += 1
|
||||
if iterations >= 3:
|
||||
raise asyncio.CancelledError()
|
||||
|
||||
with patch("decnet.web.attacker_worker.asyncio.sleep", side_effect=_controlled_sleep):
|
||||
task = asyncio.create_task(attacker_profile_worker(mock_repo))
|
||||
with pytest.raises(asyncio.CancelledError):
|
||||
await task
|
||||
# Attacker should have run independently
|
||||
assert iterations >= 2
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_sniffer_crash_does_not_affect_collector(self):
|
||||
"""Sniffer crash must not affect collector operation."""
|
||||
from decnet.collector.worker import is_service_container, is_service_event
|
||||
|
||||
# These should work regardless of sniffer state
|
||||
with patch("decnet.collector.worker._load_service_container_names",
|
||||
return_value={"decky-01-ssh"}):
|
||||
assert is_service_container("decky-01-ssh") is True
|
||||
assert is_service_event({"name": "decky-01-ssh"}) is True
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_db_failure_does_not_crash_sniffer(self):
|
||||
"""Sniffer has no DB dependency — must be completely unaffected by DB issues."""
|
||||
from decnet.sniffer.fingerprint import SnifferEngine
|
||||
|
||||
written: list[str] = []
|
||||
engine = SnifferEngine(
|
||||
ip_to_decky={"192.168.1.10": "decky-01"},
|
||||
write_fn=written.append,
|
||||
)
|
||||
# Engine should work with zero DB interaction
|
||||
engine._log("decky-01", "tls_client_hello", src_ip="10.0.0.1", ja3="abc", ja4="def")
|
||||
assert len(written) == 1
|
||||
assert "decky-01" in written[0]
|
||||
Reference in New Issue
Block a user