diff --git a/decnet/bus/unix_client.py b/decnet/bus/unix_client.py index e8d9fd79..f42fea41 100644 --- a/decnet/bus/unix_client.py +++ b/decnet/bus/unix_client.py @@ -105,14 +105,25 @@ class UnixSocketBus(BaseBus): # ─── Lifecycle ────────────────────────────────────────────────────────── async def connect(self) -> None: + # Double-checked locking: the cheap unlocked check fast-paths the + # already-connected case, but the actual connect must hold ``_lock`` + # so two coroutines racing on a fresh bus (e.g. concurrent + # publish()/subscribe() both lazily calling connect()) can't each + # open a socket and spawn a reader task — the loser would orphan a + # live FD and an uncancelled reader_loop that close() never reaps. if self._writer is not None: return - if self._closed: - raise RuntimeError("connect on closed bus") - self._reader, self._writer = await asyncio.open_unix_connection(str(self._path)) - await self._send(protocol.encode(protocol.HELLO, args=self._client_name)) - self._reader_task = asyncio.create_task(self._reader_loop()) - log.debug("bus.client: connected to %s as %s", self._path, self._client_name) + async with self._lock: + # Re-check under the lock: a racing caller may have connected + # while we awaited the lock. 
+ if self._writer is not None: + return + if self._closed: + raise RuntimeError("connect on closed bus") + self._reader, self._writer = await asyncio.open_unix_connection(str(self._path)) + await self._send(protocol.encode(protocol.HELLO, args=self._client_name)) + self._reader_task = asyncio.create_task(self._reader_loop()) + log.debug("bus.client: connected to %s as %s", self._path, self._client_name) async def close(self) -> None: if self._closed: diff --git a/decnet/engine/deployer.py b/decnet/engine/deployer.py index c08f7b7b..626e3732 100644 --- a/decnet/engine/deployer.py +++ b/decnet/engine/deployer.py @@ -1091,6 +1091,18 @@ async def deploy_topology(repo, topology_id: str, *, dry_run: bool = False) -> N lambda: _compose_ps(compose_path, project=compose_project), ) bad: list[str] = [] + # Build the set of expected decky base names so we can distinguish base + # containers from service containers without relying on a hyphen heuristic. + # The generator names every decky "decky-NNN" (which contains a hyphen), + # so `"-" not in service_name` would never match generator-named deckies. + # Instead, explicitly check membership in the known decky name set. + expected_decky_names: set[str] = set() + for _d in hydrated.get("deckies", []): + _cfg = _d.get("decky_config") or {} + _dn = _cfg.get("name") or _d.get("name") + if _dn: + expected_decky_names.add(_dn) + # Build the per-decky state map. The base container's compose # service name == decky name, which is what we cache on the # TopologyDecky row. Service containers (named ``<decky>-<service>``) @@ -1100,8 +1112,8 @@ async def deploy_topology(repo, topology_id: str, *, dry_run: bool = False) -> N for row in ps_rows: state = str(row.get("State", "")).lower() service_name = str(row.get("Service") or "") - if service_name and "-" not in service_name: - # Plain decky base; cache its docker state. + if service_name and service_name in expected_decky_names: + # This is a decky base container; cache its docker state. 
decky_state_by_name[service_name] = state or "unknown" if state and state != "running": name = str(row.get("Name") or row.get("Service") or "?") diff --git a/decnet/intel/worker.py b/decnet/intel/worker.py index 7262a8fd..e957e4ad 100644 --- a/decnet/intel/worker.py +++ b/decnet/intel/worker.py @@ -104,8 +104,12 @@ async def _enrich_one( value the providers see and is denormalised onto the row for SIEM / audit consumers. """ + async def _guarded_lookup(p: IntelProvider, ip: str) -> IntelResult: + async with p._semaphore: + return await p.lookup(ip) + results: list[IntelResult] = await asyncio.gather( - *(p.lookup(ip) for p in providers), + *(_guarded_lookup(p, ip) for p in providers), return_exceptions=False, # providers contractually never raise ) diff --git a/decnet/orchestrator/drivers/__init__.py b/decnet/orchestrator/drivers/__init__.py index ae97a534..b9759e21 100644 --- a/decnet/orchestrator/drivers/__init__.py +++ b/decnet/orchestrator/drivers/__init__.py @@ -13,7 +13,7 @@ from decnet.orchestrator.drivers.base import ( ActivityResult, Driver, ) -from decnet.orchestrator.scheduler import Action, FileAction, TrafficAction +from decnet.orchestrator.scheduler import Action, EditAction, FileAction, TrafficAction __all__ = [ "ActivityDriver", @@ -58,7 +58,7 @@ def get_driver_for(action: Action) -> ActivityDriver: # modules out of every importer's graph. from decnet.orchestrator.drivers.ssh import SSHDriver - if isinstance(action, (TrafficAction, FileAction)): + if isinstance(action, (TrafficAction, FileAction, EditAction)): return SSHDriver() # EmailAction lands in stage 5; reachable only after that import is # added to scheduler. 
Importing inside the branch avoids a cycle @@ -66,7 +66,7 @@ def get_driver_for(action: Action) -> ActivityDriver: try: from decnet.orchestrator.emailgen.scheduler import EmailAction except ImportError: # pragma: no cover - scheduler always exists - EmailAction = None # type: ignore[assignment, misc] + EmailAction = None # type: ignore[misc] if EmailAction is not None and isinstance(action, EmailAction): from decnet.orchestrator.drivers.email import EmailDriver return EmailDriver() diff --git a/decnet/ttp/impl/_rule_index.py b/decnet/ttp/impl/_rule_index.py index 47a1a097..cf4db38a 100644 --- a/decnet/ttp/impl/_rule_index.py +++ b/decnet/ttp/impl/_rule_index.py @@ -68,6 +68,15 @@ class RuleIndex: if not rule.applies_to and not rule.emits: self.evict(rule.rule_id) return + # Evict stale kind-buckets for any kinds the updated rule no longer + # claims, so re-install with a narrower applies_to doesn't leave + # ghost entries in the old buckets. + old = self._by_rule.get(rule.rule_id) + if old is not None: + stale_kinds = old.applies_to - rule.applies_to + for kind in stale_kinds: + current = self._by_kind.get(kind, []) + self._by_kind[kind] = [r for r in current if r.rule_id != rule.rule_id] self._by_rule[rule.rule_id] = rule for kind in rule.applies_to: current = self._by_kind.get(kind, []) diff --git a/decnet/web/db/mysql/repository.py b/decnet/web/db/mysql/repository.py index d6dec907..1e81c480 100644 --- a/decnet/web/db/mysql/repository.py +++ b/decnet/web/db/mysql/repository.py @@ -118,10 +118,10 @@ class MySQLRepository(SQLModelRepository): await lock_conn.execute(text("SELECT RELEASE_LOCK('decnet_schema_init')")) await lock_conn.close() - def _json_field_equals(self, key: str): + def _json_field_equals(self, key: str, param_name: str = "val"): # MySQL 5.7+ exposes JSON_EXTRACT; quoted string result returned for # TEXT-stored JSON, same behavior we rely on in SQLite. 
- return text(f"JSON_UNQUOTE(JSON_EXTRACT(fields, '$.{key}')) = :val") + return text(f"JSON_UNQUOTE(JSON_EXTRACT(fields, '$.{key}')) = :{param_name}") async def _insert_tags_or_ignore(self, rows: list[TTPTag]) -> int: """Bulk-insert with MySQL's ``INSERT IGNORE`` on the ``uuid`` PK. diff --git a/decnet/web/db/sqlite/repository.py b/decnet/web/db/sqlite/repository.py index f4f77b05..97f10220 100644 --- a/decnet/web/db/sqlite/repository.py +++ b/decnet/web/db/sqlite/repository.py @@ -56,9 +56,9 @@ class SQLiteRepository(SQLModelRepository): "ALTER TABLE attackers ADD COLUMN country_source VARCHAR(16)" )) - def _json_field_equals(self, key: str): + def _json_field_equals(self, key: str, param_name: str = "val"): # SQLite stores JSON as text; json_extract is the canonical accessor. - return text(f"json_extract(fields, '$.{key}') = :val") + return text(f"json_extract(fields, '$.{key}') = :{param_name}") async def _insert_tags_or_ignore(self, rows: list[TTPTag]) -> int: """Bulk-insert with SQLite's ``ON CONFLICT DO NOTHING`` on the diff --git a/decnet/web/db/sqlmodel_repo/logs.py b/decnet/web/db/sqlmodel_repo/logs.py index a6f4e393..6c7712a8 100644 --- a/decnet/web/db/sqlmodel_repo/logs.py +++ b/decnet/web/db/sqlmodel_repo/logs.py @@ -84,6 +84,7 @@ class LogsMixin(_MixinBase): "attacker_ip": Log.attacker_ip, } + _json_token_idx = 0 for token in tokens: if ":" in token: key, val = token.split(":", 1) @@ -92,9 +93,15 @@ class LogsMixin(_MixinBase): else: key_safe = re.sub(r"[^a-zA-Z0-9_]", "", key) if key_safe: + # Each JSON-field filter needs its own bind-param + # name; sharing `:val` across multiple tokens means + # only the last `.params(val=...)` call survives + # and earlier filters match the wrong value. 
+ param_name = f"jval_{_json_token_idx}" + _json_token_idx += 1 statement = statement.where( - self._json_field_equals(key_safe) - ).params(val=val) + self._json_field_equals(key_safe, param_name) + ).params(**{param_name: val}) else: lk = f"%{token}%" statement = statement.where( @@ -107,15 +114,17 @@ class LogsMixin(_MixinBase): ) return statement - def _json_field_equals(self, key: str): - """Return a text() predicate that matches rows where fields->key == :val. + def _json_field_equals(self, key: str, param_name: str = "val"): + """Return a text() predicate that matches rows where fields->key == :<param_name>. Both SQLite and MySQL expose a ``JSON_EXTRACT`` function; MySQL also exposes the same function under ``json_extract`` (case-insensitive). - The ``:val`` parameter is bound separately and must be supplied with - ``.params(val=...)`` by the caller, which keeps us safe from injection. + The bind parameter is supplied with ``.params(<param_name>=...)`` by + the caller. Pass a distinct ``param_name`` for each token so that + multiple JSON-field filters in the same query each bind their own + value instead of sharing the last-written ``:val``. """ - return text(f"JSON_EXTRACT(fields, '$.{key}') = :val") + return text(f"JSON_EXTRACT(fields, '$.{key}') = :{param_name}") async def get_logs( self, diff --git a/decnet/web/router/swarm/api_check_hosts.py b/decnet/web/router/swarm/api_check_hosts.py index 5a931c42..beff44d1 100644 --- a/decnet/web/router/swarm/api_check_hosts.py +++ b/decnet/web/router/swarm/api_check_hosts.py @@ -59,6 +59,9 @@ async def api_check_hosts( detail=body, ) except Exception as exc: + # Log the real exception server-side; never surface internal + # exception text (file paths, TLS internals, library guts) to the + # caller. Same fail-closed posture as the global 500 handler. 
log.warning("swarm.check unreachable host=%s err=%s", host["name"], exc) await repo.update_swarm_host(host["uuid"], {"status": "unreachable"}) return SwarmHostHealth( @@ -66,7 +69,7 @@ async def api_check_hosts( name=host["name"], address=host["address"], reachable=False, - detail=str(exc), + detail="probe failed", ) results = await asyncio.gather(*(_probe(h) for h in hosts)) diff --git a/tests/bus/test_unix_socket_bus.py b/tests/bus/test_unix_socket_bus.py index 5606f888..4ea5f30e 100644 --- a/tests/bus/test_unix_socket_bus.py +++ b/tests/bus/test_unix_socket_bus.py @@ -7,6 +7,7 @@ the tmp filesystem — no Docker, no external broker. from __future__ import annotations import asyncio +import contextlib import pathlib import stat @@ -130,3 +131,52 @@ class TestEndToEnd: server = BusServer(sock, group=None) with pytest.raises(FileNotFoundError): await server.start() + + +class TestConcurrentConnect: + """BUG-5: connect() must hold the bus lock so concurrent first-connects + can't each open a socket and spawn a reader (orphaning the loser's FD + + reader_loop task).""" + + async def test_concurrent_connect_opens_one_socket_and_reader( + self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + sock = tmp_path / "bus.sock" + server = BusServer(sock, group=None) + await server.start() + serve_task = asyncio.create_task(server.serve_forever()) + + bus = UnixSocketBus(sock, client_name="race-client") + + # Wrap the real transport opener with a counter, and yield control + # mid-open so two racing connect() calls actually interleave. Without + # the lock both would pass the `self._writer is None` guard and each + # open a socket; the lock + re-check collapse that to one open. 
+ real_open = asyncio.open_unix_connection + calls = 0 + + async def _counting_open(*args, **kwargs): + nonlocal calls + calls += 1 + await asyncio.sleep(0) # force the scheduler to interleave callers + return await real_open(*args, **kwargs) + + monkeypatch.setattr(asyncio, "open_unix_connection", _counting_open) + + try: + await asyncio.gather(bus.connect(), bus.connect(), bus.connect()) + + # Exactly one socket opened and exactly one live reader task. + assert calls == 1 + assert bus._writer is not None + assert bus._reader_task is not None + assert not bus._reader_task.done() + finally: + await bus.close() + serve_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await serve_task + await server.close() + + # close() reaped the single reader task — no orphans left behind. + assert bus._reader_task is None diff --git a/tests/db/test_log_multi_token_search.py b/tests/db/test_log_multi_token_search.py new file mode 100644 index 00000000..6deb4dc6 --- /dev/null +++ b/tests/db/test_log_multi_token_search.py @@ -0,0 +1,113 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +"""Regression test for BUG-6: multi-token JSON-field search collapses all +custom filters to the last value (shared :val bind parameter). + +Root cause: ``_apply_filters`` loops over search tokens and for each +JSON-field token calls ``.params(val=val)``. Because every call reuses the +same bind name ``:val``, SQLAlchemy's last ``.params()`` call overwrites all +earlier ones — only the last JSON-field token's value is actually bound. + +The fix gives each JSON-field token a distinct bind parameter name +(``jval_0``, ``jval_1``, …) so every token value survives. 
+""" +from __future__ import annotations + +import json +from pathlib import Path + +import pytest + +from decnet.web.db.factory import get_repository + + +@pytest.fixture +async def repo(tmp_path: Path): + r = get_repository(db_path=str(tmp_path / "log_search.db")) + await r.initialize() + return r + + +async def _add_log(repo, **kwargs) -> None: + base = { + "raw_line": "test", + "decky": "decky-01", + "service": "ssh", + "event_type": "cmd", + "attacker_ip": "10.0.0.1", + "timestamp": "2025-01-01T00:00:00", + "fields": {}, + } + base.update(kwargs) + await repo.add_log(base) + + +# ── BUG-6 regression ───────────────────────────────────────────────────────── + +@pytest.mark.anyio +async def test_single_json_field_token_matches(repo) -> None: + """Baseline: a single JSON-field token filters correctly.""" + await _add_log(repo, fields={"cmd": "ls", "user": "root"}) + await _add_log(repo, fields={"cmd": "rm", "user": "bob"}) + + logs = await repo.get_logs(search='cmd:ls') + assert len(logs) == 1 + assert json.loads(logs[0]["fields"])["cmd"] == "ls" + + +@pytest.mark.anyio +async def test_two_distinct_json_field_tokens_both_applied(repo) -> None: + """BUG-6 regression: two JSON-field tokens must BOTH filter. + + Before the fix, only the last token's value was bound. A search for + ``cmd:ls user:root`` would execute with ``val='root'`` for both + predicates — rows with ``cmd='ls'`` but ``user='bob'`` would appear + in the results instead of being filtered out. + """ + await _add_log(repo, fields={"cmd": "ls", "user": "root"}) # should match + await _add_log(repo, fields={"cmd": "ls", "user": "bob"}) # cmd matches, user doesn't + await _add_log(repo, fields={"cmd": "rm", "user": "root"}) # user matches, cmd doesn't + await _add_log(repo, fields={"cmd": "rm", "user": "bob"}) # neither matches + + logs = await repo.get_logs(search='cmd:ls user:root') + # Only the first row satisfies both predicates. 
+ assert len(logs) == 1, ( + f"Expected 1 log matching cmd:ls AND user:root, got {len(logs)}. " + "BUG-6: shared :val bind param causes last token to overwrite earlier ones." + ) + fields = json.loads(logs[0]["fields"]) + assert fields["cmd"] == "ls" + assert fields["user"] == "root" + + +@pytest.mark.anyio +async def test_three_json_field_tokens_all_applied(repo) -> None: + """Three JSON-field tokens must all filter independently.""" + await _add_log(repo, fields={"a": "1", "b": "2", "c": "3"}) # full match + await _add_log(repo, fields={"a": "1", "b": "2", "c": "X"}) # c mismatch + await _add_log(repo, fields={"a": "1", "b": "X", "c": "3"}) # b mismatch + await _add_log(repo, fields={"a": "X", "b": "2", "c": "3"}) # a mismatch + + logs = await repo.get_logs(search='a:1 b:2 c:3') + assert len(logs) == 1 + fields = json.loads(logs[0]["fields"]) + assert fields == {"a": "1", "b": "2", "c": "3"} + + +@pytest.mark.anyio +async def test_json_field_token_mixed_with_core_field_token(repo) -> None: + """A JSON-field token combined with a core-field filter both apply.""" + await _add_log( + repo, + decky="decky-01", + fields={"cmd": "whoami"}, + ) + await _add_log( + repo, + decky="decky-02", + fields={"cmd": "whoami"}, + ) + + # Only decky-01 row should match. + logs = await repo.get_logs(search='decky:decky-01 cmd:whoami') + assert len(logs) == 1 + assert logs[0]["decky"] == "decky-01" diff --git a/tests/deploy/test_reconcile_generator_names.py b/tests/deploy/test_reconcile_generator_names.py new file mode 100644 index 00000000..2a97d480 --- /dev/null +++ b/tests/deploy/test_reconcile_generator_names.py @@ -0,0 +1,232 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +"""BUG-2 regression: post-deploy reconcile must NOT mark generator-named +deckies (``decky-NNN``) as ``failed`` when their containers are running. + +Root cause: the OLD heuristic ``"-" not in service_name`` never fires for +generator-named deckies because those names always contain a hyphen. 
The fix +replaces the heuristic with explicit set-membership against +``expected_decky_names`` built from ``hydrated['deckies']``. + +These tests exercise the REAL production code path: +``decnet.engine.deployer.deploy_topology``. They mock every external I/O +boundary (Docker, compose, repo, filesystem) at the same layer used by the +rest of the deploy test-suite, so the assertions flow through the actual +``expected_decky_names`` / ``decky_state_by_name`` logic in deployer.py. +A revert of the BUG-2 fix causes both primary tests to FAIL (red-before / +green-after verified manually — see docstring on each test). +""" +from __future__ import annotations + +import uuid +from pathlib import Path +from typing import Any +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + + +# ── helpers ────────────────────────────────────────────────────────────────── + +def _make_decky(name: str, *, uuid_val: str | None = None) -> dict[str, Any]: + return { + "uuid": uuid_val or str(uuid.uuid4()), + "name": name, + "decky_config": {"name": name}, + } + + +def _ps_rows(decky_name: str, *service_suffixes: str, state: str = "running") -> list[dict]: + """Simulate ``docker compose ps`` JSON rows for one decky + its services.""" + rows: list[dict] = [ + {"Service": decky_name, "Name": decky_name, "State": state, "ExitCode": 0}, + ] + for svc in service_suffixes: + container = f"{decky_name}-{svc}" + rows.append({"Service": container, "Name": container, "State": state, "ExitCode": 0}) + return rows + + +def _build_hydrated(deckies: list[dict[str, Any]]) -> dict[str, Any]: + """Minimal hydrated topology dict that satisfies deploy_topology's lookups.""" + return { + "topology": { + "uuid": "topo-test-1234", + # No target_host_uuid → master-local deploy path + }, + "lans": [ + { + "name": "DMZ", + "subnet": "10.99.0.0/24", + "is_dmz": True, + } + ], + "deckies": deckies, + } + + +async def _run_deploy(hydrated: dict, ps_rows: list[dict]) -> dict[str, str]: + """Drive 
deploy_topology with full I/O mocks; return the state values + passed to ``repo.update_topology_decky`` keyed by decky UUID.""" + from decnet.engine import deployer as _dep + + topology_id = hydrated["topology"]["uuid"] + recorded: dict[str, str] = {} + + repo = MagicMock() + repo.update_topology_decky = AsyncMock(side_effect=lambda uid, patch: recorded.__setitem__(uid, patch["state"])) + + # Map uuid → name so we can translate the assertion later + uuid_to_name = {d["uuid"]: d["name"] for d in hydrated["deckies"]} + + with ( + patch.object(_dep, "hydrate", new=AsyncMock(return_value=hydrated)), + patch.object(_dep, "_validate_topology", return_value={}), + patch.object(_dep, "_validation_errors", return_value=False), + patch.object(_dep, "check_no_host_port_collision", return_value=[]), + patch.object(_dep, "_warn_if_userland_proxy_enabled"), + patch.object(_dep, "transition_status", new=AsyncMock()), + # _topology_compose_path must return a Path; compose_path.exists() + # is checked in the rollback guard — return a path that does NOT exist + # so the rollback branch is skipped. + patch.object(_dep, "_topology_compose_path", return_value=Path("/nonexistent/compose.yml")), + patch.object(_dep, "_topology_compose_project", return_value="test-project"), + patch.object(_dep, "create_bridge_network"), + patch.object(_dep, "write_topology_compose"), + # _compose_with_retry is called inside anyio.to_thread.run_sync(lambda: ...) + # We patch it so the lambda is a no-op. + patch.object(_dep, "_compose_with_retry"), + # _compose_ps is also called inside anyio.to_thread.run_sync; patch it + # to return our controlled rows. 
+ patch.object(_dep, "_compose_ps", return_value=ps_rows), + # docker.from_env() is called at deploy time + patch("decnet.engine.deployer.docker") as mock_docker, + # Silence the canary planter import that runs at the end + patch.dict("sys.modules", {"decnet.canary": MagicMock(), "decnet.canary.planter": MagicMock()}), + ): + mock_docker.from_env.return_value = MagicMock() + await _dep.deploy_topology(repo, topology_id) + + # Translate uuid keys → decky names for readable assertions + return {uuid_to_name[uid]: state for uid, state in recorded.items()} + + +# ── BUG-2 primary regression tests ─────────────────────────────────────────── + +@pytest.mark.anyio +async def test_generator_named_decky_reconciles_running() -> None: + """BUG-2 primary: generator-named decky whose container is RUNNING must be + reconciled to state='running', NOT 'failed'. + + RED before fix: the old ``"-" not in service_name`` heuristic never cached + "decky-001" (contains a hyphen), so ``decky_state_by_name.get("decky-001")`` + returned ``"unknown"`` and new_state was forced to ``"failed"``. + GREEN after fix: membership check against expected_decky_names finds + "decky-001" and correctly stores state="running". + """ + decky = _make_decky("decky-001") + hydrated = _build_hydrated([decky]) + ps = _ps_rows("decky-001", "ssh", "http") # base + two service containers + + result = await _run_deploy(hydrated, ps) + + assert result["decky-001"] == "running", ( + "Generator-named decky with running container must reconcile to 'running'" + ) + + +@pytest.mark.anyio +async def test_absent_decky_reconciles_failed() -> None: + """Genuinely absent / stopped decky must reconcile to state='failed'. + + This covers the other branch: if no ps row matches the decky name + (container never started or exited), new_state must be 'failed'. + GREEN in both old and new code — ensures the 'failed' path is not broken + by the BUG-2 fix. 
+ """ + decky = _make_decky("decky-002") + hydrated = _build_hydrated([decky]) + # ps rows contain nothing for decky-002 — simulates a decky that never started + ps: list[dict] = [] + + result = await _run_deploy(hydrated, ps) + + assert result["decky-002"] == "failed", ( + "Decky with no running container must reconcile to 'failed'" + ) + + +@pytest.mark.anyio +async def test_both_branches_in_one_topology() -> None: + """Running generator-named decky → 'running'; absent decky → 'failed'. + + Exercises both branches of the reconcile loop simultaneously, which + is the most direct regression guard: if the fix is reverted, decky-001 + flips to 'failed' while decky-002 stays 'failed', making the first + assertion fail. + """ + decky_running = _make_decky("decky-001") + decky_absent = _make_decky("decky-099") + hydrated = _build_hydrated([decky_running, decky_absent]) + + # Only decky-001 has running containers; decky-099 has none + ps = _ps_rows("decky-001", "ssh") + + result = await _run_deploy(hydrated, ps) + + assert result["decky-001"] == "running", ( + "Running generator-named decky must not be marked failed" + ) + assert result["decky-099"] == "failed", ( + "Absent decky must be marked failed" + ) + + +@pytest.mark.anyio +async def test_decky_config_nested_name_is_honoured() -> None: + """When decky_config.name differs from outer name, the config name is + used for both compose service lookup and repo update — same logic as + deployer.py lines 1101-1104 and 1136-1138.""" + outer_name = "old-outer-name" + config_name = "decky-007" + uid = str(uuid.uuid4()) + decky = { + "uuid": uid, + "name": outer_name, + "decky_config": {"name": config_name}, + } + hydrated = _build_hydrated([decky]) + ps = _ps_rows(config_name, "ssh") + + from decnet.engine import deployer as _dep + from unittest.mock import AsyncMock, MagicMock, patch + + recorded: dict[str, str] = {} + repo = MagicMock() + repo.update_topology_decky = AsyncMock( + side_effect=lambda u, p: 
recorded.__setitem__(u, p["state"]) + ) + + topology_id = hydrated["topology"]["uuid"] + + with ( + patch.object(_dep, "hydrate", new=AsyncMock(return_value=hydrated)), + patch.object(_dep, "_validate_topology", return_value={}), + patch.object(_dep, "_validation_errors", return_value=False), + patch.object(_dep, "check_no_host_port_collision", return_value=[]), + patch.object(_dep, "_warn_if_userland_proxy_enabled"), + patch.object(_dep, "transition_status", new=AsyncMock()), + patch.object(_dep, "_topology_compose_path", return_value=Path("/nonexistent/compose.yml")), + patch.object(_dep, "_topology_compose_project", return_value="test-project"), + patch.object(_dep, "create_bridge_network"), + patch.object(_dep, "write_topology_compose"), + patch.object(_dep, "_compose_with_retry"), + patch.object(_dep, "_compose_ps", return_value=ps), + patch("decnet.engine.deployer.docker") as mock_docker, + patch.dict("sys.modules", {"decnet.canary": MagicMock(), "decnet.canary.planter": MagicMock()}), + ): + mock_docker.from_env.return_value = MagicMock() + await _dep.deploy_topology(repo, topology_id) + + assert recorded.get(uid) == "running", ( + "decky_config.name must be used for ps lookup; decky should reconcile running" + ) diff --git a/tests/intel/test_semaphore_enforcement.py b/tests/intel/test_semaphore_enforcement.py new file mode 100644 index 00000000..cadabae3 --- /dev/null +++ b/tests/intel/test_semaphore_enforcement.py @@ -0,0 +1,107 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +"""Regression test for BUG-3: provider semaphore was declared but never acquired. + +The ``IntelProvider`` ABC creates ``self._semaphore = asyncio.Semaphore(self.concurrency)`` +in ``__init__``, but ``_enrich_one`` called ``p.lookup(ip)`` directly without +acquiring the semaphore first — concurrency and rate limits were silently +unenforced. + +The fix wraps each ``p.lookup(ip)`` call with ``async with p._semaphore``. 
+""" +from __future__ import annotations + +import asyncio +from typing import Optional + +import pytest + +from decnet.intel.base import IntelProvider, IntelResult +from decnet.intel.worker import _enrich_one + + +class _OrderedProvider(IntelProvider): + """Test double that records order of entry/exit and enforces capacity == 1.""" + + concurrency = 1 # semaphore with N=1 forces serialization + min_dispatch_interval_s = 0.0 + + def __init__(self, name: str, delay: float = 0.05) -> None: + super().__init__() + self.name = name + self._delay = delay + self.concurrent_high_watermark = 0 + self._in_flight = 0 + self.calls: list[str] = [] + + async def lookup(self, ip: str) -> IntelResult: + self._in_flight += 1 + self.concurrent_high_watermark = max( + self.concurrent_high_watermark, self._in_flight + ) + self.calls.append(ip) + await asyncio.sleep(self._delay) + self._in_flight -= 1 + return IntelResult( + provider=self.name, + verdict="benign", + column_updates={}, + ) + + +@pytest.mark.anyio +async def test_semaphore_serializes_concurrent_callers() -> None: + """BUG-3 regression: N=1 semaphore must prevent >1 concurrent lookup. + + We fire two _enrich_one calls concurrently against the same provider + (concurrency=1). With the semaphore enforced, the provider's + concurrent_high_watermark stays at 1. Without it, both calls would + enter lookup simultaneously and the watermark would reach 2. + """ + provider = _OrderedProvider("test_provider", delay=0.05) + + results = await asyncio.gather( + _enrich_one("uuid-a", "1.1.1.1", [provider], ttl_hours=24), + _enrich_one("uuid-b", "2.2.2.2", [provider], ttl_hours=24), + ) + + assert len(results) == 2 + # Both callers should complete successfully. + assert results[0]["attacker_uuid"] == "uuid-a" + assert results[1]["attacker_uuid"] == "uuid-b" + # The semaphore serialized access — never more than 1 in-flight at once. 
+ assert provider.concurrent_high_watermark == 1, ( + f"Semaphore not enforced: {provider.concurrent_high_watermark} concurrent " + "calls observed, expected at most 1" + ) + # Both IPs were looked up. + assert set(provider.calls) == {"1.1.1.1", "2.2.2.2"} + + +@pytest.mark.anyio +async def test_semaphore_with_higher_concurrency_allows_parallel() -> None: + """With concurrency=2, two callers may proceed simultaneously.""" + + class _Wide(IntelProvider): + concurrency = 2 + min_dispatch_interval_s = 0.0 + + def __init__(self) -> None: + super().__init__() + self.name = "wide" + self._in_flight = 0 + self.watermark = 0 + + async def lookup(self, ip: str) -> IntelResult: + self._in_flight += 1 + self.watermark = max(self.watermark, self._in_flight) + await asyncio.sleep(0.05) + self._in_flight -= 1 + return IntelResult(provider=self.name, verdict=None, column_updates={}) + + wide = _Wide() + await asyncio.gather( + _enrich_one("uuid-a", "1.1.1.1", [wide], ttl_hours=24), + _enrich_one("uuid-b", "2.2.2.2", [wide], ttl_hours=24), + ) + # With concurrency=2 both calls are allowed in simultaneously. + assert wide.watermark == 2 diff --git a/tests/orchestrator/test_driver_registry.py b/tests/orchestrator/test_driver_registry.py new file mode 100644 index 00000000..a1deb530 --- /dev/null +++ b/tests/orchestrator/test_driver_registry.py @@ -0,0 +1,78 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +"""Regression tests for :func:`decnet.orchestrator.drivers.get_driver_for`. + +BUG-1: EditAction had no registered driver — get_driver_for raised TypeError +instead of returning an SSHDriver, silently crashing every edit tick. 
+""" +from __future__ import annotations + +import pytest + +from decnet.orchestrator.drivers import get_driver_for +from decnet.orchestrator.drivers.ssh import SSHDriver +from decnet.orchestrator.scheduler import EditAction, FileAction, TrafficAction + + +def _traffic_action() -> TrafficAction: + return TrafficAction( + src_uuid="u1", src_name="decky-01", + dst_uuid="u2", dst_name="decky-02", + dst_ip="10.0.0.2", + ) + + +def _file_action() -> FileAction: + return FileAction( + dst_uuid="u1", dst_name="decky-01", + path="/tmp/test.txt", + content="hello", + ) + + +def _edit_action() -> EditAction: + return EditAction( + dst_uuid="u1", dst_name="decky-01", + path="/tmp/notes.txt", + persona="alice", + content_class="note", + previous_body="old content", + synthetic_file_uuid="sf-uuid-001", + ) + + +def test_traffic_action_resolves_to_ssh_driver() -> None: + drv = get_driver_for(_traffic_action()) + assert isinstance(drv, SSHDriver) + + +def test_file_action_resolves_to_ssh_driver() -> None: + drv = get_driver_for(_file_action()) + assert isinstance(drv, SSHDriver) + + +def test_edit_action_resolves_to_ssh_driver() -> None: + """BUG-1 regression: EditAction must resolve to SSHDriver, not TypeError.""" + # Before the fix this raised: + # TypeError: no driver registered for action type EditAction + drv = get_driver_for(_edit_action()) + assert isinstance(drv, SSHDriver) + + +@pytest.mark.asyncio +async def test_edit_action_driver_executes_without_crash(monkeypatch) -> None: + """BUG-1 regression: SSHDriver.run(EditAction) must not crash silently. + + We mock _run to avoid needing a live Docker daemon; the important + assertion is that the driver resolves, runs, and returns an ActivityResult. 
+ """ + from decnet.orchestrator.drivers import ssh as ssh_driver + from decnet.orchestrator.drivers.base import ActivityResult + + async def fake_run(argv): + return 0, "", "" + + monkeypatch.setattr(ssh_driver, "_run", fake_run) + + drv = get_driver_for(_edit_action()) + result = await drv.run(_edit_action()) + assert isinstance(result, ActivityResult) diff --git a/tests/swarm/test_swarm_api.py b/tests/swarm/test_swarm_api.py index 9c323427..8b8c66ae 100644 --- a/tests/swarm/test_swarm_api.py +++ b/tests/swarm/test_swarm_api.py @@ -437,6 +437,43 @@ def test_check_marks_hosts_active(client: TestClient, stub_agent) -> None: assert one["last_heartbeat"] is not None +# V7.1.1: a probe failure must mark the host unreachable WITHOUT leaking the +# raw exception text (file paths, TLS internals) back to the caller. +_LEAKY_PROBE_SECRET = "/etc/decnet/tls/worker-7.key: permission denied [TLSV1_ALERT]" + + +class _LeakyAgentClient(_StubAgentClient): + async def __aenter__(self) -> "_LeakyAgentClient": + raise RuntimeError(_LEAKY_PROBE_SECRET) + + +def test_check_unreachable_does_not_leak_exception_text( + client: TestClient, monkeypatch: pytest.MonkeyPatch +) -> None: + from decnet.web.router.swarm import api_check_hosts as check_mod + + monkeypatch.setattr(check_mod, "AgentClient", _LeakyAgentClient) + + h = client.post( + "/swarm/enroll", + json={"name": "leaky-w", "address": "10.0.0.13", "agent_port": 8765}, + ).json() + + resp = client.post("/swarm/check") + assert resp.status_code == 200 + results = resp.json()["results"] + assert len(results) == 1 + assert results[0]["reachable"] is False + # Generic message only — the internal exception string must be absent. + assert results[0]["detail"] == "probe failed" + assert _LEAKY_PROBE_SECRET not in resp.text + assert "permission denied" not in resp.text + + # The host is still correctly marked unreachable server-side. 
+ one = client.get(f"/swarm/hosts/{h['host_uuid']}").json() + assert one["status"] == "unreachable" + + # ---------------------------------------------------------------- /deckies diff --git a/tests/ttp/test_rule_index.py b/tests/ttp/test_rule_index.py index f85d86c4..b0e9e60a 100644 --- a/tests/ttp/test_rule_index.py +++ b/tests/ttp/test_rule_index.py @@ -278,6 +278,54 @@ def test_apply_change_continues_on_error(caplog: pytest.LogCaptureFixture) -> No assert idx.get("R_BAD") is None +def test_install_evicts_stale_kinds_on_reinstall() -> None: + """BUG-4 regression: re-installing a rule with a narrower applies_to must + remove the rule from kinds it no longer claims. + + Before the fix, install() only added the rule to new kind-buckets; + it never cleaned up the old buckets. A rule installed for {A, B} then + re-installed for {A} would remain in B's bucket indefinitely. + """ + idx = RuleIndex() + # First install: rule covers both "command" and "email" kinds. + r1 = _rule("R_AB", applies_to=frozenset({"command", "email"})) + idx.install(r1) + assert idx.get("R_AB") is r1 + assert any(r.rule_id == "R_AB" for r in idx.by_kind("command")) + assert any(r.rule_id == "R_AB" for r in idx.by_kind("email")) + + # Re-install with a narrower set: only "command" now. + r2 = _rule("R_AB", rule_version=2, applies_to=frozenset({"command"})) + idx.install(r2) + + assert idx.get("R_AB") is r2 + # Rule must still be in "command". + assert any(r.rule_id == "R_AB" for r in idx.by_kind("command")) + # BUG-4: rule must NO LONGER be in "email" — the stale entry must be evicted. 
+ assert not any(r.rule_id == "R_AB" for r in idx.by_kind("email")), ( + "Stale kind bucket 'email' still contains R_AB after re-install with " + "applies_to={command}; eviction on reinstall is broken" + ) + + +def test_install_eviction_does_not_affect_other_rules_in_same_kind() -> None: + """Evicting stale kinds for one rule must leave other rules in those kinds intact.""" + idx = RuleIndex() + r_ab = _rule("R_AB", applies_to=frozenset({"command", "email"})) + r_email = _rule("R_EMAIL_ONLY", applies_to=frozenset({"email"})) + idx.install(r_ab) + idx.install(r_email) + + # Re-install R_AB without "email". + r_ab_v2 = _rule("R_AB", rule_version=2, applies_to=frozenset({"command"})) + idx.install(r_ab_v2) + + # R_EMAIL_ONLY must still be in "email". + assert any(r.rule_id == "R_EMAIL_ONLY" for r in idx.by_kind("email")) + # R_AB must be gone from "email". + assert not any(r.rule_id == "R_AB" for r in idx.by_kind("email")) + + def test_expired_state_treated_as_disabled_by_is_active() -> None: """Sanity check on the helper used by both engine and lifters.""" from decnet.ttp.impl._state import is_active