fix(core): close HIGH ASVS findings V7.1.1 and correctness bugs BUG-1..6

- V7.1.1: /swarm/check no longer returns raw exception text; logs detail
  server-side, returns generic 'probe failed'.
- BUG-1: register EditAction -> SSHDriver so edit ticks no longer crash.
- BUG-2: topology reconcile matches generator-named deckies by
  expected-name membership instead of a hyphen heuristic.
- BUG-3: intel provider lookups acquire the per-provider semaphore so
  declared concurrency bounds are enforced.
- BUG-4: RuleIndex.install evicts a rule from kinds it no longer applies to.
- BUG-5: UnixSocketBus.connect() is lock-guarded with a double-check so
  concurrent first-connects open exactly one socket and reader task.
- BUG-6/V5.1.3: multi-token JSON-field search binds each token to a
  distinct parameter instead of collapsing to the last value.

Regression tests added for every fix, verified red-before/green-after.
V4.1.1c/V12.1.1 (updater master-CN gate) and V12.5.1 (tarball include-list)
confirmed already fixed in prior commits and left untouched.
This commit is contained in:
2026-06-09 23:12:49 -04:00
parent 8d18c59201
commit 6a8af315fb
16 changed files with 737 additions and 24 deletions

View File

@@ -105,14 +105,25 @@ class UnixSocketBus(BaseBus):
# ─── Lifecycle ──────────────────────────────────────────────────────────
async def connect(self) -> None:
# Double-checked locking: the cheap unlocked check fast-paths the
# already-connected case, but the actual connect must hold ``_lock``
# so two coroutines racing on a fresh bus (e.g. concurrent
# publish()/subscribe() both lazily calling connect()) can't each
# open a socket and spawn a reader task — the loser would orphan a
# live FD and an uncancelled reader_loop that close() never reaps.
if self._writer is not None:
return
if self._closed:
raise RuntimeError("connect on closed bus")
self._reader, self._writer = await asyncio.open_unix_connection(str(self._path))
await self._send(protocol.encode(protocol.HELLO, args=self._client_name))
self._reader_task = asyncio.create_task(self._reader_loop())
log.debug("bus.client: connected to %s as %s", self._path, self._client_name)
async with self._lock:
# Re-check under the lock: a racing caller may have connected
# while we awaited the lock.
if self._writer is not None:
return
if self._closed:
raise RuntimeError("connect on closed bus")
self._reader, self._writer = await asyncio.open_unix_connection(str(self._path))
await self._send(protocol.encode(protocol.HELLO, args=self._client_name))
self._reader_task = asyncio.create_task(self._reader_loop())
log.debug("bus.client: connected to %s as %s", self._path, self._client_name)
async def close(self) -> None:
if self._closed:

View File

@@ -1091,6 +1091,18 @@ async def deploy_topology(repo, topology_id: str, *, dry_run: bool = False) -> N
lambda: _compose_ps(compose_path, project=compose_project),
)
bad: list[str] = []
# Build the set of expected decky base names so we can distinguish base
# containers from service containers without relying on a hyphen heuristic.
# The generator names every decky "decky-<NNN>" (which contains a hyphen),
# so `"-" not in service_name` would never match generator-named deckies.
# Instead, explicitly check membership in the known decky name set.
expected_decky_names: set[str] = set()
for _d in hydrated.get("deckies", []):
_cfg = _d.get("decky_config") or {}
_dn = _cfg.get("name") or _d.get("name")
if _dn:
expected_decky_names.add(_dn)
# Build the per-decky state map. The base container's compose
# service name == decky name, which is what we cache on the
# TopologyDecky row. Service containers (named ``<decky>-<svc>``)
@@ -1100,8 +1112,8 @@ async def deploy_topology(repo, topology_id: str, *, dry_run: bool = False) -> N
for row in ps_rows:
state = str(row.get("State", "")).lower()
service_name = str(row.get("Service") or "")
if service_name and "-" not in service_name:
# Plain decky base; cache its docker state.
if service_name and service_name in expected_decky_names:
# This is a decky base container; cache its docker state.
decky_state_by_name[service_name] = state or "unknown"
if state and state != "running":
name = str(row.get("Name") or row.get("Service") or "?")

View File

@@ -104,8 +104,12 @@ async def _enrich_one(
value the providers see and is denormalised onto the row for SIEM /
audit consumers.
"""
async def _guarded_lookup(p: IntelProvider, ip: str) -> IntelResult:
async with p._semaphore:
return await p.lookup(ip)
results: list[IntelResult] = await asyncio.gather(
*(p.lookup(ip) for p in providers),
*(_guarded_lookup(p, ip) for p in providers),
return_exceptions=False, # providers contractually never raise
)

View File

@@ -13,7 +13,7 @@ from decnet.orchestrator.drivers.base import (
ActivityResult,
Driver,
)
from decnet.orchestrator.scheduler import Action, FileAction, TrafficAction
from decnet.orchestrator.scheduler import Action, EditAction, FileAction, TrafficAction
__all__ = [
"ActivityDriver",
@@ -58,7 +58,7 @@ def get_driver_for(action: Action) -> ActivityDriver:
# modules out of every importer's graph.
from decnet.orchestrator.drivers.ssh import SSHDriver
if isinstance(action, (TrafficAction, FileAction)):
if isinstance(action, (TrafficAction, FileAction, EditAction)):
return SSHDriver()
# EmailAction lands in stage 5; reachable only after that import is
# added to scheduler. Importing inside the branch avoids a cycle
@@ -66,7 +66,7 @@ def get_driver_for(action: Action) -> ActivityDriver:
try:
from decnet.orchestrator.emailgen.scheduler import EmailAction
except ImportError: # pragma: no cover - scheduler always exists
EmailAction = None # type: ignore[assignment, misc]
EmailAction = None # type: ignore[misc]
if EmailAction is not None and isinstance(action, EmailAction):
from decnet.orchestrator.drivers.email import EmailDriver
return EmailDriver()

View File

@@ -68,6 +68,15 @@ class RuleIndex:
if not rule.applies_to and not rule.emits:
self.evict(rule.rule_id)
return
# Evict stale kind-buckets for any kinds the updated rule no longer
# claims, so re-install with a narrower applies_to doesn't leave
# ghost entries in the old buckets.
old = self._by_rule.get(rule.rule_id)
if old is not None:
stale_kinds = old.applies_to - rule.applies_to
for kind in stale_kinds:
current = self._by_kind.get(kind, [])
self._by_kind[kind] = [r for r in current if r.rule_id != rule.rule_id]
self._by_rule[rule.rule_id] = rule
for kind in rule.applies_to:
current = self._by_kind.get(kind, [])

View File

@@ -118,10 +118,10 @@ class MySQLRepository(SQLModelRepository):
await lock_conn.execute(text("SELECT RELEASE_LOCK('decnet_schema_init')"))
await lock_conn.close()
def _json_field_equals(self, key: str):
def _json_field_equals(self, key: str, param_name: str = "val"):
# MySQL 5.7+ exposes JSON_EXTRACT; quoted string result returned for
# TEXT-stored JSON, same behavior we rely on in SQLite.
return text(f"JSON_UNQUOTE(JSON_EXTRACT(fields, '$.{key}')) = :val")
return text(f"JSON_UNQUOTE(JSON_EXTRACT(fields, '$.{key}')) = :{param_name}")
async def _insert_tags_or_ignore(self, rows: list[TTPTag]) -> int:
"""Bulk-insert with MySQL's ``INSERT IGNORE`` on the ``uuid`` PK.

View File

@@ -56,9 +56,9 @@ class SQLiteRepository(SQLModelRepository):
"ALTER TABLE attackers ADD COLUMN country_source VARCHAR(16)"
))
def _json_field_equals(self, key: str):
def _json_field_equals(self, key: str, param_name: str = "val"):
# SQLite stores JSON as text; json_extract is the canonical accessor.
return text(f"json_extract(fields, '$.{key}') = :val")
return text(f"json_extract(fields, '$.{key}') = :{param_name}")
async def _insert_tags_or_ignore(self, rows: list[TTPTag]) -> int:
"""Bulk-insert with SQLite's ``ON CONFLICT DO NOTHING`` on the

View File

@@ -84,6 +84,7 @@ class LogsMixin(_MixinBase):
"attacker_ip": Log.attacker_ip,
}
_json_token_idx = 0
for token in tokens:
if ":" in token:
key, val = token.split(":", 1)
@@ -92,9 +93,15 @@ class LogsMixin(_MixinBase):
else:
key_safe = re.sub(r"[^a-zA-Z0-9_]", "", key)
if key_safe:
# Each JSON-field filter needs its own bind-param
# name; sharing `:val` across multiple tokens means
# only the last `.params(val=...)` call survives
# and earlier filters match the wrong value.
param_name = f"jval_{_json_token_idx}"
_json_token_idx += 1
statement = statement.where(
self._json_field_equals(key_safe)
).params(val=val)
self._json_field_equals(key_safe, param_name)
).params(**{param_name: val})
else:
lk = f"%{token}%"
statement = statement.where(
@@ -107,15 +114,17 @@ class LogsMixin(_MixinBase):
)
return statement
def _json_field_equals(self, key: str):
"""Return a text() predicate that matches rows where fields->key == :val.
def _json_field_equals(self, key: str, param_name: str = "val"):
"""Return a text() predicate that matches rows where fields->key == :<param_name>.
Both SQLite and MySQL expose a ``JSON_EXTRACT`` function; MySQL also
exposes the same function under ``json_extract`` (case-insensitive).
The ``:val`` parameter is bound separately and must be supplied with
``.params(val=...)`` by the caller, which keeps us safe from injection.
The bind parameter is supplied with ``.params(<param_name>=...)`` by
the caller. Pass a distinct ``param_name`` for each token so that
multiple JSON-field filters in the same query each bind their own
value instead of sharing the last-written ``:val``.
"""
return text(f"JSON_EXTRACT(fields, '$.{key}') = :val")
return text(f"JSON_EXTRACT(fields, '$.{key}') = :{param_name}")
async def get_logs(
self,

View File

@@ -59,6 +59,9 @@ async def api_check_hosts(
detail=body,
)
except Exception as exc:
# Log the real exception server-side; never surface internal
# exception text (file paths, TLS internals, library guts) to the
# caller. Same fail-closed posture as the global 500 handler.
log.warning("swarm.check unreachable host=%s err=%s", host["name"], exc)
await repo.update_swarm_host(host["uuid"], {"status": "unreachable"})
return SwarmHostHealth(
@@ -66,7 +69,7 @@ async def api_check_hosts(
name=host["name"],
address=host["address"],
reachable=False,
detail=str(exc),
detail="probe failed",
)
results = await asyncio.gather(*(_probe(h) for h in hosts))

View File

@@ -7,6 +7,7 @@ the tmp filesystem — no Docker, no external broker.
from __future__ import annotations
import asyncio
import contextlib
import pathlib
import stat
@@ -130,3 +131,52 @@ class TestEndToEnd:
server = BusServer(sock, group=None)
with pytest.raises(FileNotFoundError):
await server.start()
class TestConcurrentConnect:
"""BUG-5: connect() must hold the bus lock so concurrent first-connects
can't each open a socket and spawn a reader (orphaning the loser's FD +
reader_loop task)."""
async def test_concurrent_connect_opens_one_socket_and_reader(
self, tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
) -> None:
sock = tmp_path / "bus.sock"
server = BusServer(sock, group=None)
await server.start()
serve_task = asyncio.create_task(server.serve_forever())
bus = UnixSocketBus(sock, client_name="race-client")
# Wrap the real transport opener with a counter, and yield control
# mid-open so two racing connect() calls actually interleave. Without
# the lock both would pass the `self._writer is None` guard and each
# open a socket; the lock + re-check collapse that to one open.
real_open = asyncio.open_unix_connection
calls = 0
async def _counting_open(*args, **kwargs):
nonlocal calls
calls += 1
await asyncio.sleep(0) # force the scheduler to interleave callers
return await real_open(*args, **kwargs)
monkeypatch.setattr(asyncio, "open_unix_connection", _counting_open)
try:
await asyncio.gather(bus.connect(), bus.connect(), bus.connect())
# Exactly one socket opened and exactly one live reader task.
assert calls == 1
assert bus._writer is not None
assert bus._reader_task is not None
assert not bus._reader_task.done()
finally:
await bus.close()
serve_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await serve_task
await server.close()
# close() reaped the single reader task — no orphans left behind.
assert bus._reader_task is None

View File

@@ -0,0 +1,113 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Regression test for BUG-6: multi-token JSON-field search collapses all
custom filters to the last value (shared :val bind parameter).
Root cause: ``_apply_filters`` loops over search tokens and for each
JSON-field token calls ``.params(val=val)``. Because every call reuses the
same bind name ``:val``, SQLAlchemy's last ``.params()`` call overwrites all
earlier ones — only the last JSON-field token's value is actually bound.
The fix gives each JSON-field token a distinct bind parameter name
(``jval_0``, ``jval_1``, …) so every token value survives.
"""
from __future__ import annotations
import json
from pathlib import Path
import pytest
from decnet.web.db.factory import get_repository
@pytest.fixture
async def repo(tmp_path: Path):
r = get_repository(db_path=str(tmp_path / "log_search.db"))
await r.initialize()
return r
async def _add_log(repo, **kwargs) -> None:
base = {
"raw_line": "test",
"decky": "decky-01",
"service": "ssh",
"event_type": "cmd",
"attacker_ip": "10.0.0.1",
"timestamp": "2025-01-01T00:00:00",
"fields": {},
}
base.update(kwargs)
await repo.add_log(base)
# ── BUG-6 regression ─────────────────────────────────────────────────────────
@pytest.mark.anyio
async def test_single_json_field_token_matches(repo) -> None:
"""Baseline: a single JSON-field token filters correctly."""
await _add_log(repo, fields={"cmd": "ls", "user": "root"})
await _add_log(repo, fields={"cmd": "rm", "user": "bob"})
logs = await repo.get_logs(search='cmd:ls')
assert len(logs) == 1
assert json.loads(logs[0]["fields"])["cmd"] == "ls"
@pytest.mark.anyio
async def test_two_distinct_json_field_tokens_both_applied(repo) -> None:
"""BUG-6 regression: two JSON-field tokens must BOTH filter.
Before the fix, only the last token's value was bound. A search for
``cmd:ls user:root`` would execute with ``val='root'`` for both
predicates — rows with ``cmd='ls'`` but ``user='bob'`` would appear
in the results instead of being filtered out.
"""
await _add_log(repo, fields={"cmd": "ls", "user": "root"}) # should match
await _add_log(repo, fields={"cmd": "ls", "user": "bob"}) # cmd matches, user doesn't
await _add_log(repo, fields={"cmd": "rm", "user": "root"}) # user matches, cmd doesn't
await _add_log(repo, fields={"cmd": "rm", "user": "bob"}) # neither matches
logs = await repo.get_logs(search='cmd:ls user:root')
# Only the first row satisfies both predicates.
assert len(logs) == 1, (
f"Expected 1 log matching cmd:ls AND user:root, got {len(logs)}. "
"BUG-6: shared :val bind param causes last token to overwrite earlier ones."
)
fields = json.loads(logs[0]["fields"])
assert fields["cmd"] == "ls"
assert fields["user"] == "root"
@pytest.mark.anyio
async def test_three_json_field_tokens_all_applied(repo) -> None:
"""Three JSON-field tokens must all filter independently."""
await _add_log(repo, fields={"a": "1", "b": "2", "c": "3"}) # full match
await _add_log(repo, fields={"a": "1", "b": "2", "c": "X"}) # c mismatch
await _add_log(repo, fields={"a": "1", "b": "X", "c": "3"}) # b mismatch
await _add_log(repo, fields={"a": "X", "b": "2", "c": "3"}) # a mismatch
logs = await repo.get_logs(search='a:1 b:2 c:3')
assert len(logs) == 1
fields = json.loads(logs[0]["fields"])
assert fields == {"a": "1", "b": "2", "c": "3"}
@pytest.mark.anyio
async def test_json_field_token_mixed_with_core_field_token(repo) -> None:
"""A JSON-field token combined with a core-field filter both apply."""
await _add_log(
repo,
decky="decky-01",
fields={"cmd": "whoami"},
)
await _add_log(
repo,
decky="decky-02",
fields={"cmd": "whoami"},
)
# Only decky-01 row should match.
logs = await repo.get_logs(search='decky:decky-01 cmd:whoami')
assert len(logs) == 1
assert logs[0]["decky"] == "decky-01"

View File

@@ -0,0 +1,232 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""BUG-2 regression: post-deploy reconcile must NOT mark generator-named
deckies (``decky-NNN``) as ``failed`` when their containers are running.
Root cause: the OLD heuristic ``"-" not in service_name`` never fires for
generator-named deckies because those names always contain a hyphen. The fix
replaces the heuristic with explicit set-membership against
``expected_decky_names`` built from ``hydrated['deckies']``.
These tests exercise the REAL production code path:
``decnet.engine.deployer.deploy_topology``. They mock every external I/O
boundary (Docker, compose, repo, filesystem) at the same layer used by the
rest of the deploy test-suite, so the assertions flow through the actual
``expected_decky_names`` / ``decky_state_by_name`` logic in deployer.py.
A revert of the BUG-2 fix causes both primary tests to FAIL (red-before /
green-after verified manually — see docstring on each test).
"""
from __future__ import annotations
import uuid
from pathlib import Path
from typing import Any
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
# ── helpers ──────────────────────────────────────────────────────────────────
def _make_decky(name: str, *, uuid_val: str | None = None) -> dict[str, Any]:
return {
"uuid": uuid_val or str(uuid.uuid4()),
"name": name,
"decky_config": {"name": name},
}
def _ps_rows(decky_name: str, *service_suffixes: str, state: str = "running") -> list[dict]:
"""Simulate ``docker compose ps`` JSON rows for one decky + its services."""
rows: list[dict] = [
{"Service": decky_name, "Name": decky_name, "State": state, "ExitCode": 0},
]
for svc in service_suffixes:
container = f"{decky_name}-{svc}"
rows.append({"Service": container, "Name": container, "State": state, "ExitCode": 0})
return rows
def _build_hydrated(deckies: list[dict[str, Any]]) -> dict[str, Any]:
"""Minimal hydrated topology dict that satisfies deploy_topology's lookups."""
return {
"topology": {
"uuid": "topo-test-1234",
# No target_host_uuid → master-local deploy path
},
"lans": [
{
"name": "DMZ",
"subnet": "10.99.0.0/24",
"is_dmz": True,
}
],
"deckies": deckies,
}
async def _run_deploy(hydrated: dict, ps_rows: list[dict]) -> dict[str, str]:
"""Drive deploy_topology with full I/O mocks; return the state values
passed to ``repo.update_topology_decky`` keyed by decky UUID."""
from decnet.engine import deployer as _dep
topology_id = hydrated["topology"]["uuid"]
recorded: dict[str, str] = {}
repo = MagicMock()
repo.update_topology_decky = AsyncMock(side_effect=lambda uid, patch: recorded.__setitem__(uid, patch["state"]))
# Map uuid → name so we can translate the assertion later
uuid_to_name = {d["uuid"]: d["name"] for d in hydrated["deckies"]}
with (
patch.object(_dep, "hydrate", new=AsyncMock(return_value=hydrated)),
patch.object(_dep, "_validate_topology", return_value={}),
patch.object(_dep, "_validation_errors", return_value=False),
patch.object(_dep, "check_no_host_port_collision", return_value=[]),
patch.object(_dep, "_warn_if_userland_proxy_enabled"),
patch.object(_dep, "transition_status", new=AsyncMock()),
# _topology_compose_path must return a Path; compose_path.exists()
# is checked in the rollback guard — return a path that does NOT exist
# so the rollback branch is skipped.
patch.object(_dep, "_topology_compose_path", return_value=Path("/nonexistent/compose.yml")),
patch.object(_dep, "_topology_compose_project", return_value="test-project"),
patch.object(_dep, "create_bridge_network"),
patch.object(_dep, "write_topology_compose"),
# _compose_with_retry is called inside anyio.to_thread.run_sync(lambda: ...)
# We patch it so the lambda is a no-op.
patch.object(_dep, "_compose_with_retry"),
# _compose_ps is also called inside anyio.to_thread.run_sync; patch it
# to return our controlled rows.
patch.object(_dep, "_compose_ps", return_value=ps_rows),
# docker.from_env() is called at deploy time
patch("decnet.engine.deployer.docker") as mock_docker,
# Silence the canary planter import that runs at the end
patch.dict("sys.modules", {"decnet.canary": MagicMock(), "decnet.canary.planter": MagicMock()}),
):
mock_docker.from_env.return_value = MagicMock()
await _dep.deploy_topology(repo, topology_id)
# Translate uuid keys → decky names for readable assertions
return {uuid_to_name[uid]: state for uid, state in recorded.items()}
# ── BUG-2 primary regression tests ───────────────────────────────────────────
@pytest.mark.anyio
async def test_generator_named_decky_reconciles_running() -> None:
"""BUG-2 primary: generator-named decky whose container is RUNNING must be
reconciled to state='running', NOT 'failed'.
RED before fix: the old ``"-" not in service_name`` heuristic never cached
"decky-001" (contains a hyphen), so ``decky_state_by_name.get("decky-001")``
returned ``"unknown"`` and new_state was forced to ``"failed"``.
GREEN after fix: membership check against expected_decky_names finds
"decky-001" and correctly stores state="running".
"""
decky = _make_decky("decky-001")
hydrated = _build_hydrated([decky])
ps = _ps_rows("decky-001", "ssh", "http") # base + two service containers
result = await _run_deploy(hydrated, ps)
assert result["decky-001"] == "running", (
"Generator-named decky with running container must reconcile to 'running'"
)
@pytest.mark.anyio
async def test_absent_decky_reconciles_failed() -> None:
"""Genuinely absent / stopped decky must reconcile to state='failed'.
This covers the other branch: if no ps row matches the decky name
(container never started or exited), new_state must be 'failed'.
GREEN in both old and new code — ensures the 'failed' path is not broken
by the BUG-2 fix.
"""
decky = _make_decky("decky-002")
hydrated = _build_hydrated([decky])
# ps rows contain nothing for decky-002 — simulates a decky that never started
ps: list[dict] = []
result = await _run_deploy(hydrated, ps)
assert result["decky-002"] == "failed", (
"Decky with no running container must reconcile to 'failed'"
)
@pytest.mark.anyio
async def test_both_branches_in_one_topology() -> None:
"""Running generator-named decky → 'running'; absent decky → 'failed'.
Exercises both branches of the reconcile loop simultaneously, which
is the most direct regression guard: if the fix is reverted, decky-001
flips to 'failed' while decky-002 stays 'failed', making the first
assertion fail.
"""
decky_running = _make_decky("decky-001")
decky_absent = _make_decky("decky-099")
hydrated = _build_hydrated([decky_running, decky_absent])
# Only decky-001 has running containers; decky-099 has none
ps = _ps_rows("decky-001", "ssh")
result = await _run_deploy(hydrated, ps)
assert result["decky-001"] == "running", (
"Running generator-named decky must not be marked failed"
)
assert result["decky-099"] == "failed", (
"Absent decky must be marked failed"
)
@pytest.mark.anyio
async def test_decky_config_nested_name_is_honoured() -> None:
"""When decky_config.name differs from outer name, the config name is
used for both compose service lookup and repo update — same logic as
deployer.py lines 1101-1104 and 1136-1138."""
outer_name = "old-outer-name"
config_name = "decky-007"
uid = str(uuid.uuid4())
decky = {
"uuid": uid,
"name": outer_name,
"decky_config": {"name": config_name},
}
hydrated = _build_hydrated([decky])
ps = _ps_rows(config_name, "ssh")
from decnet.engine import deployer as _dep
from unittest.mock import AsyncMock, MagicMock, patch
recorded: dict[str, str] = {}
repo = MagicMock()
repo.update_topology_decky = AsyncMock(
side_effect=lambda u, p: recorded.__setitem__(u, p["state"])
)
topology_id = hydrated["topology"]["uuid"]
with (
patch.object(_dep, "hydrate", new=AsyncMock(return_value=hydrated)),
patch.object(_dep, "_validate_topology", return_value={}),
patch.object(_dep, "_validation_errors", return_value=False),
patch.object(_dep, "check_no_host_port_collision", return_value=[]),
patch.object(_dep, "_warn_if_userland_proxy_enabled"),
patch.object(_dep, "transition_status", new=AsyncMock()),
patch.object(_dep, "_topology_compose_path", return_value=Path("/nonexistent/compose.yml")),
patch.object(_dep, "_topology_compose_project", return_value="test-project"),
patch.object(_dep, "create_bridge_network"),
patch.object(_dep, "write_topology_compose"),
patch.object(_dep, "_compose_with_retry"),
patch.object(_dep, "_compose_ps", return_value=ps),
patch("decnet.engine.deployer.docker") as mock_docker,
patch.dict("sys.modules", {"decnet.canary": MagicMock(), "decnet.canary.planter": MagicMock()}),
):
mock_docker.from_env.return_value = MagicMock()
await _dep.deploy_topology(repo, topology_id)
assert recorded.get(uid) == "running", (
"decky_config.name must be used for ps lookup; decky should reconcile running"
)

View File

@@ -0,0 +1,107 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Regression test for BUG-3: provider semaphore was declared but never acquired.
The ``IntelProvider`` ABC creates ``self._semaphore = asyncio.Semaphore(self.concurrency)``
in ``__init__``, but ``_enrich_one`` called ``p.lookup(ip)`` directly without
acquiring the semaphore first — concurrency and rate limits were silently
unenforced.
The fix wraps each ``p.lookup(ip)`` call with ``async with p._semaphore``.
"""
from __future__ import annotations
import asyncio
from typing import Optional
import pytest
from decnet.intel.base import IntelProvider, IntelResult
from decnet.intel.worker import _enrich_one
class _OrderedProvider(IntelProvider):
"""Test double that records order of entry/exit and enforces capacity == 1."""
concurrency = 1 # semaphore with N=1 forces serialization
min_dispatch_interval_s = 0.0
def __init__(self, name: str, delay: float = 0.05) -> None:
super().__init__()
self.name = name
self._delay = delay
self.concurrent_high_watermark = 0
self._in_flight = 0
self.calls: list[str] = []
async def lookup(self, ip: str) -> IntelResult:
self._in_flight += 1
self.concurrent_high_watermark = max(
self.concurrent_high_watermark, self._in_flight
)
self.calls.append(ip)
await asyncio.sleep(self._delay)
self._in_flight -= 1
return IntelResult(
provider=self.name,
verdict="benign",
column_updates={},
)
@pytest.mark.anyio
async def test_semaphore_serializes_concurrent_callers() -> None:
"""BUG-3 regression: N=1 semaphore must prevent >1 concurrent lookup.
We fire two _enrich_one calls concurrently against the same provider
(concurrency=1). With the semaphore enforced, the provider's
concurrent_high_watermark stays at 1. Without it, both calls would
enter lookup simultaneously and the watermark would reach 2.
"""
provider = _OrderedProvider("test_provider", delay=0.05)
results = await asyncio.gather(
_enrich_one("uuid-a", "1.1.1.1", [provider], ttl_hours=24),
_enrich_one("uuid-b", "2.2.2.2", [provider], ttl_hours=24),
)
assert len(results) == 2
# Both callers should complete successfully.
assert results[0]["attacker_uuid"] == "uuid-a"
assert results[1]["attacker_uuid"] == "uuid-b"
# The semaphore serialized access — never more than 1 in-flight at once.
assert provider.concurrent_high_watermark == 1, (
f"Semaphore not enforced: {provider.concurrent_high_watermark} concurrent "
"calls observed, expected at most 1"
)
# Both IPs were looked up.
assert set(provider.calls) == {"1.1.1.1", "2.2.2.2"}
@pytest.mark.anyio
async def test_semaphore_with_higher_concurrency_allows_parallel() -> None:
"""With concurrency=2, two callers may proceed simultaneously."""
class _Wide(IntelProvider):
concurrency = 2
min_dispatch_interval_s = 0.0
def __init__(self) -> None:
super().__init__()
self.name = "wide"
self._in_flight = 0
self.watermark = 0
async def lookup(self, ip: str) -> IntelResult:
self._in_flight += 1
self.watermark = max(self.watermark, self._in_flight)
await asyncio.sleep(0.05)
self._in_flight -= 1
return IntelResult(provider=self.name, verdict=None, column_updates={})
wide = _Wide()
await asyncio.gather(
_enrich_one("uuid-a", "1.1.1.1", [wide], ttl_hours=24),
_enrich_one("uuid-b", "2.2.2.2", [wide], ttl_hours=24),
)
# With concurrency=2 both calls are allowed in simultaneously.
assert wide.watermark == 2

View File

@@ -0,0 +1,78 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""Regression tests for :func:`decnet.orchestrator.drivers.get_driver_for`.
BUG-1: EditAction had no registered driver — get_driver_for raised TypeError
instead of returning an SSHDriver, silently crashing every edit tick.
"""
from __future__ import annotations
import pytest
from decnet.orchestrator.drivers import get_driver_for
from decnet.orchestrator.drivers.ssh import SSHDriver
from decnet.orchestrator.scheduler import EditAction, FileAction, TrafficAction
def _traffic_action() -> TrafficAction:
return TrafficAction(
src_uuid="u1", src_name="decky-01",
dst_uuid="u2", dst_name="decky-02",
dst_ip="10.0.0.2",
)
def _file_action() -> FileAction:
return FileAction(
dst_uuid="u1", dst_name="decky-01",
path="/tmp/test.txt",
content="hello",
)
def _edit_action() -> EditAction:
return EditAction(
dst_uuid="u1", dst_name="decky-01",
path="/tmp/notes.txt",
persona="alice",
content_class="note",
previous_body="old content",
synthetic_file_uuid="sf-uuid-001",
)
def test_traffic_action_resolves_to_ssh_driver() -> None:
drv = get_driver_for(_traffic_action())
assert isinstance(drv, SSHDriver)
def test_file_action_resolves_to_ssh_driver() -> None:
drv = get_driver_for(_file_action())
assert isinstance(drv, SSHDriver)
def test_edit_action_resolves_to_ssh_driver() -> None:
"""BUG-1 regression: EditAction must resolve to SSHDriver, not TypeError."""
# Before the fix this raised:
# TypeError: no driver registered for action type EditAction
drv = get_driver_for(_edit_action())
assert isinstance(drv, SSHDriver)
@pytest.mark.asyncio
async def test_edit_action_driver_executes_without_crash(monkeypatch) -> None:
"""BUG-1 regression: SSHDriver.run(EditAction) must not crash silently.
We mock _run to avoid needing a live Docker daemon; the important
assertion is that the driver resolves, runs, and returns an ActivityResult.
"""
from decnet.orchestrator.drivers import ssh as ssh_driver
from decnet.orchestrator.drivers.base import ActivityResult
async def fake_run(argv):
return 0, "", ""
monkeypatch.setattr(ssh_driver, "_run", fake_run)
drv = get_driver_for(_edit_action())
result = await drv.run(_edit_action())
assert isinstance(result, ActivityResult)

View File

@@ -437,6 +437,43 @@ def test_check_marks_hosts_active(client: TestClient, stub_agent) -> None:
assert one["last_heartbeat"] is not None
# V7.1.1: a probe failure must mark the host unreachable WITHOUT leaking the
# raw exception text (file paths, TLS internals) back to the caller.
_LEAKY_PROBE_SECRET = "/etc/decnet/tls/worker-7.key: permission denied [TLSV1_ALERT]"
class _LeakyAgentClient(_StubAgentClient):
async def __aenter__(self) -> "_LeakyAgentClient":
raise RuntimeError(_LEAKY_PROBE_SECRET)
def test_check_unreachable_does_not_leak_exception_text(
client: TestClient, monkeypatch: pytest.MonkeyPatch
) -> None:
from decnet.web.router.swarm import api_check_hosts as check_mod
monkeypatch.setattr(check_mod, "AgentClient", _LeakyAgentClient)
h = client.post(
"/swarm/enroll",
json={"name": "leaky-w", "address": "10.0.0.13", "agent_port": 8765},
).json()
resp = client.post("/swarm/check")
assert resp.status_code == 200
results = resp.json()["results"]
assert len(results) == 1
assert results[0]["reachable"] is False
# Generic message only — the internal exception string must be absent.
assert results[0]["detail"] == "probe failed"
assert _LEAKY_PROBE_SECRET not in resp.text
assert "permission denied" not in resp.text
# The host is still correctly marked unreachable server-side.
one = client.get(f"/swarm/hosts/{h['host_uuid']}").json()
assert one["status"] == "unreachable"
# ---------------------------------------------------------------- /deckies

View File

@@ -278,6 +278,54 @@ def test_apply_change_continues_on_error(caplog: pytest.LogCaptureFixture) -> No
assert idx.get("R_BAD") is None
def test_install_evicts_stale_kinds_on_reinstall() -> None:
"""BUG-4 regression: re-installing a rule with a narrower applies_to must
remove the rule from kinds it no longer claims.
Before the fix, install() only added the rule to new kind-buckets;
it never cleaned up the old buckets. A rule installed for {A, B} then
re-installed for {A} would remain in B's bucket indefinitely.
"""
idx = RuleIndex()
# First install: rule covers both "command" and "email" kinds.
r1 = _rule("R_AB", applies_to=frozenset({"command", "email"}))
idx.install(r1)
assert idx.get("R_AB") is r1
assert any(r.rule_id == "R_AB" for r in idx.by_kind("command"))
assert any(r.rule_id == "R_AB" for r in idx.by_kind("email"))
# Re-install with a narrower set: only "command" now.
r2 = _rule("R_AB", rule_version=2, applies_to=frozenset({"command"}))
idx.install(r2)
assert idx.get("R_AB") is r2
# Rule must still be in "command".
assert any(r.rule_id == "R_AB" for r in idx.by_kind("command"))
# BUG-4: rule must NO LONGER be in "email" — the stale entry must be evicted.
assert not any(r.rule_id == "R_AB" for r in idx.by_kind("email")), (
"Stale kind bucket 'email' still contains R_AB after re-install with "
"applies_to={command}; eviction on reinstall is broken"
)
def test_install_eviction_does_not_affect_other_rules_in_same_kind() -> None:
"""Evicting stale kinds for one rule must leave other rules in those kinds intact."""
idx = RuleIndex()
r_ab = _rule("R_AB", applies_to=frozenset({"command", "email"}))
r_email = _rule("R_EMAIL_ONLY", applies_to=frozenset({"email"}))
idx.install(r_ab)
idx.install(r_email)
# Re-install R_AB without "email".
r_ab_v2 = _rule("R_AB", rule_version=2, applies_to=frozenset({"command"}))
idx.install(r_ab_v2)
# R_EMAIL_ONLY must still be in "email".
assert any(r.rule_id == "R_EMAIL_ONLY" for r in idx.by_kind("email"))
# R_AB must be gone from "email".
assert not any(r.rule_id == "R_AB" for r in idx.by_kind("email"))
def test_expired_state_treated_as_disabled_by_is_active() -> None:
"""Sanity check on the helper used by both engine and lifters."""
from decnet.ttp.impl._state import is_active