Files
DECNET/tests/live/test_service_isolation_live.py

502 lines
19 KiB
Python

"""
Live service isolation tests.
Unlike tests/test_service_isolation.py (which mocks dependencies), these tests
run real workers against real (temporary) resources to verify graceful degradation
in conditions that actually occur on a host machine.
Dependency graph under test:
Collector → (Docker SDK, state file, log file)
Ingester → (Collector's JSON output, DB repo)
Attacker → (DB repo)
Sniffer → (MACVLAN interface, scapy, state file)
API → (DB init, all workers, Docker, health endpoint)
Run: pytest -m live tests/live/test_service_isolation_live.py -v
"""
import asyncio
import json
import os
import uuid as _uuid
from pathlib import Path
import httpx
import pytest
# Skip every test in this module when running on CI (CI=true). These tests
# exercise real host resources (e.g. /sys/class/net, temp files) and are
# intended to be run locally.
pytestmark = pytest.mark.skipif(
    os.environ.get("CI") == "true",
    reason="live tests run locally, CI environment not advanced enough to handle this."
)
# Must be set before any decnet import
# NOTE(review): presumably decnet reads these at import time (the decnet
# imports below come after these assignments) — confirm against decnet.env.
# setdefault() keeps any value the developer already exported.
os.environ.setdefault("DECNET_JWT_SECRET", "test-secret-key-at-least-32-chars-long!!")
os.environ.setdefault("DECNET_ADMIN_PASSWORD", "test-password-123")
# Forced (not setdefault): the API tests below assume contract-test mode, in
# which background workers are not started. This mutates the process
# environment for the rest of the pytest session.
os.environ["DECNET_CONTRACT_TEST"] = "true"
from decnet.collector.worker import ( # noqa: E402
log_collector_worker,
parse_rfc5424,
_load_service_container_names,
is_service_container,
)
from decnet.web.ingester import log_ingestion_worker # noqa: E402
from decnet.profiler.worker import ( # noqa: E402
attacker_profile_worker,
_WorkerState,
_incremental_update,
)
from decnet.sniffer.worker import sniffer_worker, _interface_exists # noqa: E402
from decnet.web.api import app, lifespan # noqa: E402
from decnet.web.dependencies import repo # noqa: E402
from decnet.web.db.models import User, Log # noqa: E402
from decnet.web.auth import get_password_hash # noqa: E402
from decnet.env import DECNET_ADMIN_USER, DECNET_ADMIN_PASSWORD # noqa: E402
from sqlmodel import SQLModel # noqa: E402
from sqlalchemy import select # noqa: E402
from sqlalchemy.ext.asyncio import ( # noqa: E402
AsyncSession,
async_sessionmaker,
create_async_engine,
)
from sqlalchemy.pool import StaticPool # noqa: E402
# ─── Shared fixtures ────────────────────────────────────────────────────────
@pytest.fixture(scope="module", loop_scope="module", autouse=True)
async def live_db():
    """Point the shared ``repo`` at a real in-memory SQLite DB for this module.

    Creates every SQLModel table and seeds the admin user (if absent) so the
    auth endpoints can issue tokens.

    ``repo.engine`` and ``repo.session_factory`` are process-wide. Their
    previous values are therefore restored on teardown, and the restore also
    happens if setup fails. Otherwise later test modules would inherit this
    module's disposed engine.
    """
    engine = create_async_engine(
        "sqlite+aiosqlite:///:memory:",
        connect_args={"check_same_thread": False},
        # One shared connection. Every new connection to ":memory:" would
        # otherwise open its own, empty database.
        poolclass=StaticPool,
    )
    session_factory = async_sessionmaker(
        engine, class_=AsyncSession, expire_on_commit=False
    )

    saved_engine = getattr(repo, "engine", None)
    saved_factory = getattr(repo, "session_factory", None)
    repo.engine = engine
    repo.session_factory = session_factory
    try:
        async with engine.begin() as conn:
            await conn.run_sync(SQLModel.metadata.create_all)
        async with session_factory() as session:
            existing = await session.execute(
                select(User).where(User.username == DECNET_ADMIN_USER)
            )
            if existing.scalar_one_or_none() is None:
                session.add(
                    User(
                        uuid=str(_uuid.uuid4()),
                        username=DECNET_ADMIN_USER,
                        password_hash=get_password_hash(DECNET_ADMIN_PASSWORD),
                        role="admin",
                        must_change_password=False,
                    )
                )
                await session.commit()
        yield
    finally:
        repo.engine = saved_engine
        repo.session_factory = saved_factory
        await engine.dispose()
@pytest.fixture(scope="module", loop_scope="module")
async def live_client(live_db):
    """Yield an httpx client that calls the FastAPI ``app`` in-process.

    Requests are dispatched through ``httpx.ASGITransport`` rather than a
    network socket. The client depends on ``live_db`` so the in-memory
    database is wired up before the first request.
    """
    asgi = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=asgi, base_url="http://test") as client:
        yield client
@pytest.fixture(scope="module", loop_scope="module")
async def token(live_client):
    """Log in as the admin user and return the bearer access token.

    A rejected login fails fixture setup with the server's status code and
    body. Without this check it would surface only as an opaque ``KeyError``
    on ``access_token``.
    """
    resp = await live_client.post(
        "/api/v1/auth/login",
        json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD},
    )
    assert resp.is_success, (
        f"admin login failed: HTTP {resp.status_code}: {resp.text}"
    )
    return resp.json()["access_token"]
# ─── Collector live isolation ────────────────────────────────────────────────
@pytest.mark.live
class TestCollectorLiveIsolation:
    """Collector helpers exercised without mocking their own logic.

    None of these tests talk to the Docker daemon. Container objects are
    synthetic, and ``decnet.config.STATE_FILE`` is pointed at a path under
    pytest's per-test ``tmp_path`` that is never created.
    """

    async def test_collector_finds_no_deckies_without_state(
        self, tmp_path, monkeypatch
    ):
        """With no deckies in state and no DECNET labels, the scan rejects
        every container.

        is_service_container has two acceptance paths:
        1. label-based (decnet.fleet.service / decnet.topology.service)
        2. name match against decnet-state.json

        With state empty AND labels absent, both paths must reject. We
        feed synthetic container objects (no real Docker call) so the
        result is independent of whatever fleet may already be running on
        the host — which would otherwise satisfy path (1).
        """
        import decnet.config as cfg
        from unittest.mock import MagicMock

        # monkeypatch restores the real STATE_FILE after the test.
        monkeypatch.setattr(cfg, "STATE_FILE", tmp_path / "empty-state.json")

        unlabeled = MagicMock()
        unlabeled.name = "some-random-container"
        unlabeled.attrs = {"Config": {"Labels": {}}}
        unlabeled.labels = {}
        assert is_service_container(unlabeled) is False

    async def test_state_loader_returns_empty_without_state_file(
        self, tmp_path, monkeypatch
    ):
        """Real _load_service_container_names against no state file."""
        import decnet.config as cfg

        # tmp_path is unique per test, so this file is guaranteed not to
        # exist. A fixed /tmp path could be left over from another run.
        monkeypatch.setattr(
            cfg, "STATE_FILE", tmp_path / "nonexistent-decnet-state-live.json"
        )
        assert _load_service_container_names() == set()

    def test_rfc5424_parser_handles_real_formats(self):
        """Parser works on real log lines, not just test fixtures."""
        valid = '<134>1 2026-04-14T12:00:00Z decky-01 ssh - login_attempt [relay@55555 src_ip="10.0.0.1" username="root" password="toor"] Failed login'
        result = parse_rfc5424(valid)
        assert result is not None
        assert result["decky"] == "decky-01"
        assert result["service"] == "ssh"
        assert result["attacker_ip"] == "10.0.0.1"
        assert result["fields"]["username"] == "root"
        # Garbage must return None, not crash
        assert parse_rfc5424("random garbage") is None
        assert parse_rfc5424("") is None

    def test_container_filter_rejects_real_system_containers(
        self, tmp_path, monkeypatch
    ):
        """is_service_container must not match system containers."""
        import decnet.config as cfg

        monkeypatch.setattr(
            cfg, "STATE_FILE", tmp_path / "nonexistent-decnet-state-live.json"
        )
        # With no state, nothing is a service container
        for name in ("dockerd", "portainer", "kube-proxy"):
            assert is_service_container(name) is False, name
# ─── Ingester live isolation ─────────────────────────────────────────────────
@pytest.mark.live
class TestIngesterLiveIsolation:
    """Real ingester against real DB and real filesystem.

    ``DECNET_INGEST_LOG_FILE`` is managed through ``monkeypatch``. Any value
    already present in the developer's environment is therefore restored
    afterwards instead of being deleted.
    """

    @staticmethod
    async def _stop(task: asyncio.Task) -> None:
        """Cancel *task* and wait for it to finish.

        Only the cancellation is swallowed. Any other exception raised by the
        worker propagates and fails the test.
        """
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    async def test_ingester_waits_for_missing_log_file(self, tmp_path, monkeypatch):
        """Ingester must poll patiently when the log file doesn't exist yet."""
        monkeypatch.setenv("DECNET_INGEST_LOG_FILE", str(tmp_path / "missing.log"))
        task = asyncio.create_task(log_ingestion_worker(repo))
        try:
            await asyncio.sleep(0.5)
            assert not task.done(), "Ingester should be waiting, not exited"
        finally:
            # Reap the task even if the assertion failed.
            await self._stop(task)

    async def test_ingester_processes_real_json_into_db(self, tmp_path, monkeypatch):
        """Write real JSON log lines → ingester inserts them into the real DB."""
        record = {
            "timestamp": "2026-04-14 12:00:00",
            "decky": "decky-live-01",
            "service": "ssh",
            "event_type": "login_attempt",
            "attacker_ip": "10.99.99.1",
            "fields": {"username": "root", "password": "toor"},
            "msg": "Failed login",
            "raw_line": '<134>1 2026-04-14T12:00:00Z decky-live-01 ssh - login_attempt [relay@55555 src_ip="10.99.99.1"] Failed login',
        }
        # NOTE(review): the ingester is configured with the .log path and
        # presumably reads the sibling .json file. Confirm in the ingester.
        (tmp_path / "ingest.json").write_text(json.dumps(record) + "\n")
        monkeypatch.setenv("DECNET_INGEST_LOG_FILE", str(tmp_path / "ingest.log"))

        task = asyncio.create_task(log_ingestion_worker(repo))
        try:
            # Give ingester time to pick up the file and process it
            await asyncio.sleep(1.5)
        finally:
            await self._stop(task)

        # Verify the record landed in the real DB
        assert await repo.get_total_logs() >= 1
        logs = await repo.get_logs(limit=100, offset=0)
        matching = [entry for entry in logs if entry["attacker_ip"] == "10.99.99.1"]
        assert matching, "ingested record not found in DB"
        assert matching[0]["service"] == "ssh"

    async def test_ingester_skips_malformed_lines_without_crashing(
        self, tmp_path, monkeypatch
    ):
        """Ingester must skip bad JSON and keep going on good lines."""
        good_record = {
            "timestamp": "2026-04-14 13:00:00",
            "decky": "decky-live-02",
            "service": "http",
            "event_type": "request",
            "attacker_ip": "10.88.88.1",
            "fields": {},
            "msg": "",
            "raw_line": "<134>1 2026-04-14T13:00:00Z decky-live-02 http - request -",
        }
        (tmp_path / "mixed.json").write_text(
            "not valid json\n"
            "{broken too\n"
            + json.dumps(good_record)
            + "\n"
        )
        monkeypatch.setenv("DECNET_INGEST_LOG_FILE", str(tmp_path / "mixed.log"))

        task = asyncio.create_task(log_ingestion_worker(repo))
        try:
            await asyncio.sleep(1.5)
        finally:
            await self._stop(task)

        # The good record should have made it through
        logs = await repo.get_logs(limit=100, offset=0)
        assert any(entry["attacker_ip"] == "10.88.88.1" for entry in logs)

    async def test_ingester_exits_gracefully_without_env_var(self, monkeypatch):
        """Ingester must return immediately when DECNET_INGEST_LOG_FILE is unset."""
        monkeypatch.delenv("DECNET_INGEST_LOG_FILE", raising=False)
        # Should complete almost instantly with no error. The timeout turns a
        # regression, where the worker waits forever, into a failure rather
        # than a hung test run.
        await asyncio.wait_for(log_ingestion_worker(repo), timeout=10)
# ─── Attacker worker live isolation ──────────────────────────────────────────
@pytest.mark.live
class TestAttackerWorkerLiveIsolation:
    """Attacker-profile worker driven directly against the shared real DB."""

    async def test_attacker_worker_cold_starts_on_empty_db(self):
        """A first ``_incremental_update`` on a fresh state must initialise it.

        NOTE(review): the DB fixture is module-scoped, so earlier test classes
        may already have inserted logs. "Empty" only holds when this test runs
        on its own.
        """
        fresh = _WorkerState()
        await _incremental_update(repo, fresh)
        assert fresh.initialized is True

    async def test_attacker_worker_builds_profile_from_real_logs(self):
        """Logs already in the DB must yield an attacker profile for their IP."""
        attacker_ip = "10.77.77.1"
        # Three events one minute apart: two over SSH, then one over HTTP.
        for minute, service in enumerate(("ssh", "ssh", "http")):
            await repo.add_log({
                "timestamp": f"2026-04-14 14:0{minute}:00",
                "decky": "decky-live-03",
                "service": service,
                "event_type": "login_attempt",
                "attacker_ip": attacker_ip,
                "fields": {"username": "admin"},
                "msg": "",
                "raw_line": (
                    f"<134>1 2026-04-14T14:0{minute}:00Z decky-live-03 {service} "
                    f'- login_attempt [relay@55555 src_ip="{attacker_ip}" username="admin"]'
                ),
            })

        worker_state = _WorkerState()
        await _incremental_update(repo, worker_state)

        # The worker should have created an attacker record
        rows = await repo.get_attackers(limit=100, offset=0, search=attacker_ip)
        profile = next((row for row in rows if row["ip"] == attacker_ip), None)
        assert profile is not None
        assert profile["event_count"] >= 3

    async def test_attacker_worker_survives_cycle_with_no_new_logs(self):
        """A second pass with no new logs must not crash or rewind the cursor."""
        worker_state = _WorkerState()
        await _incremental_update(repo, worker_state)
        cursor_before = worker_state.last_log_id

        # Nothing is inserted between the two passes.
        await _incremental_update(repo, worker_state)
        assert cursor_before <= worker_state.last_log_id  # unchanged or higher
# ─── Sniffer live isolation ──────────────────────────────────────────────────
@pytest.mark.live
class TestSnifferLiveIsolation:
    """Real sniffer against the actual host network stack."""

    async def test_sniffer_exits_cleanly_no_interface(self, tmp_path, monkeypatch):
        """Sniffer must exit gracefully when MACVLAN interface doesn't exist."""
        # monkeypatch restores any DECNET_SNIFFER_IFACE the host already had,
        # instead of deleting it.
        monkeypatch.setenv("DECNET_SNIFFER_IFACE", "decnet_fake_iface_xyz")
        # Must return without exception. The timeout fails the test rather
        # than hanging the run if the worker never exits.
        await asyncio.wait_for(
            sniffer_worker(str(tmp_path / "sniffer.log")), timeout=30
        )

    def test_interface_exists_check_works(self):
        """_interface_exists returns True for loopback, False for nonsense."""
        # Uses the module-level ``os``. Linux exposes interfaces under
        # /sys/class/net.
        if not os.path.exists("/sys/class/net/lo"):
            pytest.skip("loopback interface not found, probably in CI. passing...")
        assert _interface_exists("lo") is True
        assert _interface_exists("definitely_not_a_real_iface") is False

    def test_sniffer_engine_isolation_from_db(self):
        """SnifferEngine has zero DB dependency — works standalone."""
        from decnet.sniffer.fingerprint import SnifferEngine

        written: list[str] = []
        engine = SnifferEngine(
            ip_to_decky={"192.168.1.10": "decky-01"},
            write_fn=written.append,
        )
        engine._log("decky-01", "tls_client_hello", src_ip="10.0.0.1", ja3="abc123")
        assert len(written) == 1
        assert "decky-01" in written[0]
        assert "10.0.0.1" in written[0]
# ─── API lifespan live isolation ─────────────────────────────────────────────
@pytest.mark.live
class TestApiLifespanLiveIsolation:
    """API endpoints served in-process against the real in-memory DB.

    NOTE(review): httpx's ASGITransport does not send ASGI lifespan events,
    so ``lifespan`` itself is presumably not exercised here. Confirm this.
    """

    @staticmethod
    def _auth(token):
        """Build the bearer-token header dict used by every request here."""
        return {"Authorization": f"Bearer {token}"}

    async def test_api_serves_requests_in_contract_mode(self, live_client, token):
        """With workers disabled, API must still serve all endpoints."""
        headers = self._auth(token)

        stats = await live_client.get("/api/v1/stats", headers=headers)
        assert stats.status_code == 200

        # 503 is tolerated: health may legitimately report degraded workers.
        health = await live_client.get("/api/v1/health", headers=headers)
        assert health.status_code in (200, 503)
        assert "components" in health.json()

    async def test_health_reflects_real_db_state(self, live_client, token):
        """Health must report the in-memory database component as ``ok``."""
        health = await live_client.get("/api/v1/health", headers=self._auth(token))
        assert health.json()["components"]["database"]["status"] == "ok"

    async def test_health_reports_workers_not_started(self, live_client, token):
        """In contract-test mode, workers are not started — health must report that."""
        health = await live_client.get("/api/v1/health", headers=self._auth(token))
        components = health.json()["components"]
        for worker in ("ingestion_worker", "collector_worker", "attacker_worker"):
            entry = components[worker]
            assert entry["status"] == "failing"
            assert "not started" in entry["detail"]
# ─── Cross-service cascade live tests ────────────────────────────────────────
@pytest.mark.live
class TestCascadeLiveIsolation:
    """Verify that real component failures do not cascade."""

    async def test_ingester_survives_collector_never_writing(
        self, tmp_path, monkeypatch
    ):
        """When the collector never writes output, ingester waits without crashing."""
        monkeypatch.setenv(
            "DECNET_INGEST_LOG_FILE", str(tmp_path / "no-collector.log")
        )
        task = asyncio.create_task(log_ingestion_worker(repo))
        try:
            await asyncio.sleep(0.5)
            assert not task.done(), "Ingester crashed instead of waiting"
        finally:
            # Reap the worker even when the assertion fails. A real crash is
            # re-raised here instead of being reported as "never retrieved".
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

    async def test_api_serves_during_worker_failure(self, live_client, token):
        """API must respond to requests even when all workers are dead."""
        headers = {"Authorization": f"Bearer {token}"}
        # Verify multiple endpoints still work
        for endpoint in ("/api/v1/stats", "/api/v1/health", "/api/v1/logs"):
            resp = await live_client.get(endpoint, headers=headers)
            assert resp.status_code != 500, f"{endpoint} returned 500"

    async def test_sniffer_failure_invisible_to_api(
        self, live_client, token, tmp_path
    ):
        """Sniffer crash must not affect API responses."""
        # Force sniffer to fail. The env override is scoped to the sniffer
        # call, and any pre-existing value is restored afterwards. The log
        # file goes to tmp_path rather than a shared /tmp location.
        with pytest.MonkeyPatch.context() as mp:
            mp.setenv("DECNET_SNIFFER_IFACE", "nonexistent_iface_xyz")
            await asyncio.wait_for(
                sniffer_worker(str(tmp_path / "sniffer-cascade.log")), timeout=30
            )

        # API should be completely unaffected
        resp = await live_client.get(
            "/api/v1/stats",
            headers={"Authorization": f"Bearer {token}"},
        )
        assert resp.status_code == 200