fix(security): close LOW ASVS findings — env bypass, SSE/deployment authz, CN fail-close, password byte-limit, exception leaks, BUG-12..16

Auth/session (V2.1.7, V4.1.5, V4.1.6, V2.1.4/V2.1.5):
- env secret validation no longer bypassed by attacker-injectable PYTEST* env;
  gated on explicit DECNET_TESTING=1 (set only in conftest).
- must_change_password now enforced on the SSE header-JWT path, not just ticket mint.
- GET /system/deployment-mode requires viewer auth (was leaking role + topology size).
- CreateUser/ResetUser passwords min_length=12; passwords >72 bytes rejected
  explicitly instead of bcrypt silently truncating.

Swarm ingestion (V9.1.3, BUG-16):
- Log listener hard-rejects peers with unparseable/empty cert CN (fail closed,
  ingests nothing) instead of tagging 'unknown'.
- Shutdown handlers no longer swallow real errors (narrowed to CancelledError).

Info leakage (V7.1.2, V14.1.2):
- Exception text sanitized on swarm-update, health, tarpit, realism, file-drop,
  blank-topology endpoints (raw tc/docker stderr, DB/Docker errors logged
  server-side, generic detail returned). pyproject license corrected to AGPL-3.0.

Correctness (BUG-12..16):
- BUG-12 atomic credential upsert (UNIQUE constraint + IntegrityError retry,
  consistent principal_key canonicalization).
- BUG-13 rule-tail watermark uses >= with seen-id dedup (no same-second drop).
- BUG-14 worker wake cleared before wait (no lost wake during tick).
- BUG-15 intel gather tolerates an unexpected provider raise.
- BUG-16 see above.

Already-closed (verified, no change): V2.1.6, V5.1.3, V9.1.2. Accept-risk +
documented: V2.1.8 cache window, V3.1.3 idle timeout. Tests added for every fix;
unanimous adversarial review after two refute-fix rounds.
This commit is contained in:
2026-06-10 13:27:14 -04:00
parent d80e6aa6d1
commit 245975a6dd
40 changed files with 1629 additions and 72 deletions

View File

@@ -0,0 +1,206 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""V2.1.4 + V2.1.5 password policy tests.
Covers:
- min_length=12 enforced on CreateUserRequest and ResetUserPasswordRequest
- bcrypt 72-byte limit: multi-byte passwords >72 bytes are rejected (not silently truncated)
"""
import pytest
import httpx
from decnet.env import DECNET_ADMIN_USER, DECNET_ADMIN_PASSWORD
# '€' (U+20AC) encodes to 3 UTF-8 bytes.
# 25 × 3 = 75 bytes > 72 — valid char count, over the byte cap.
_OVER_72_BYTES: str = "" * 25
# Exactly at the limit: 24 × 3 = 72 bytes — must be ACCEPTED.
_EXACTLY_72_BYTES: str = "" * 24
# ─── V2.1.4: min_length=12 on create ────────────────────────────────────────
@pytest.mark.anyio
async def test_create_user_11char_password_rejected(
client: httpx.AsyncClient, auth_token: str
) -> None:
"""11-character password must be rejected on user creation (min is 12)."""
resp = await client.post(
"/api/v1/config/users",
json={"username": "shortpwduser", "password": "short11char", "role": "viewer"},
headers={"Authorization": f"Bearer {auth_token}"},
)
# Schema-guard middleware may surface as 400; FastAPI validation as 422.
assert resp.status_code in (400, 422), (
f"Expected 400/422 for 11-char password on create, got {resp.status_code}"
)
@pytest.mark.anyio
async def test_create_user_12char_password_accepted(
client: httpx.AsyncClient, auth_token: str
) -> None:
"""Exactly 12-character ASCII password must be accepted."""
resp = await client.post(
"/api/v1/config/users",
json={"username": "minpwduser12", "password": "exactly12chr", "role": "viewer"},
headers={"Authorization": f"Bearer {auth_token}"},
)
assert resp.status_code == 200, (
f"Expected 200 for 12-char password on create, got {resp.status_code}: {resp.text}"
)
# ─── V2.1.4: min_length=12 on reset ─────────────────────────────────────────
@pytest.mark.anyio
async def test_reset_password_11char_rejected(
client: httpx.AsyncClient, auth_token: str
) -> None:
"""11-character new_password must be rejected on admin password reset."""
# Create a user to reset
create_resp = await client.post(
"/api/v1/config/users",
json={"username": "resetpolicy1", "password": "securepass123", "role": "viewer"},
headers={"Authorization": f"Bearer {auth_token}"},
)
assert create_resp.status_code == 200
user_uuid = create_resp.json()["uuid"]
resp = await client.put(
f"/api/v1/config/users/{user_uuid}/reset-password",
json={"new_password": "short11char"},
headers={"Authorization": f"Bearer {auth_token}"},
)
assert resp.status_code in (400, 422), (
f"Expected 400/422 for 11-char password on reset, got {resp.status_code}"
)
@pytest.mark.anyio
async def test_reset_password_12char_accepted(
client: httpx.AsyncClient, auth_token: str
) -> None:
"""Exactly 12-character password must be accepted on admin password reset."""
create_resp = await client.post(
"/api/v1/config/users",
json={"username": "resetpolicy2", "password": "securepass123", "role": "viewer"},
headers={"Authorization": f"Bearer {auth_token}"},
)
assert create_resp.status_code == 200
user_uuid = create_resp.json()["uuid"]
resp = await client.put(
f"/api/v1/config/users/{user_uuid}/reset-password",
json={"new_password": "exactly12chr"},
headers={"Authorization": f"Bearer {auth_token}"},
)
assert resp.status_code == 200, (
f"Expected 200 for 12-char password on reset, got {resp.status_code}: {resp.text}"
)
# ─── V2.1.5: bcrypt 72-byte rejection on create ─────────────────────────────
@pytest.mark.anyio
async def test_create_user_over_72_bytes_rejected(
client: httpx.AsyncClient, auth_token: str
) -> None:
"""Password >72 UTF-8 bytes must be rejected on create (bcrypt truncation guard)."""
resp = await client.post(
"/api/v1/config/users",
json={"username": "bytepolicyusr", "password": _OVER_72_BYTES, "role": "viewer"},
headers={"Authorization": f"Bearer {auth_token}"},
)
assert resp.status_code in (400, 422), (
f"Expected 400/422 for >{72}-byte password on create, got {resp.status_code}"
)
@pytest.mark.anyio
async def test_create_user_exactly_72_bytes_accepted(
client: httpx.AsyncClient, auth_token: str
) -> None:
"""Password of exactly 72 UTF-8 bytes must be accepted (at the limit, not over)."""
resp = await client.post(
"/api/v1/config/users",
json={"username": "byteedgeuser1", "password": _EXACTLY_72_BYTES, "role": "viewer"},
headers={"Authorization": f"Bearer {auth_token}"},
)
assert resp.status_code == 200, (
f"Expected 200 for exactly-72-byte password on create, got {resp.status_code}: {resp.text}"
)
# ─── V2.1.5: bcrypt 72-byte rejection on reset ──────────────────────────────
@pytest.mark.anyio
async def test_reset_password_over_72_bytes_rejected(
client: httpx.AsyncClient, auth_token: str
) -> None:
"""new_password >72 UTF-8 bytes must be rejected on admin password reset."""
create_resp = await client.post(
"/api/v1/config/users",
json={"username": "byteresetuser", "password": "securepass123", "role": "viewer"},
headers={"Authorization": f"Bearer {auth_token}"},
)
assert create_resp.status_code == 200
user_uuid = create_resp.json()["uuid"]
resp = await client.put(
f"/api/v1/config/users/{user_uuid}/reset-password",
json={"new_password": _OVER_72_BYTES},
headers={"Authorization": f"Bearer {auth_token}"},
)
assert resp.status_code in (400, 422), (
f"Expected 400/422 for >{72}-byte new_password on reset, got {resp.status_code}"
)
# ─── V2.1.5: bcrypt 72-byte rejection on change-password ────────────────────
@pytest.mark.anyio
async def test_change_password_new_over_72_bytes_rejected(
client: httpx.AsyncClient,
) -> None:
"""new_password >72 UTF-8 bytes must be rejected on change-password."""
login_resp = await client.post(
"/api/v1/auth/login",
json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD},
)
token = login_resp.json()["access_token"]
resp = await client.post(
"/api/v1/auth/change-password",
json={"old_password": DECNET_ADMIN_PASSWORD, "new_password": _OVER_72_BYTES},
headers={"Authorization": f"Bearer {token}"},
)
assert resp.status_code in (400, 422), (
f"Expected 400/422 for >{72}-byte new_password on change-password, got {resp.status_code}"
)
@pytest.mark.anyio
async def test_change_password_old_over_72_bytes_rejected(
client: httpx.AsyncClient,
) -> None:
"""old_password >72 UTF-8 bytes must be rejected (no point checking against hash)."""
login_resp = await client.post(
"/api/v1/auth/login",
json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD},
)
token = login_resp.json()["access_token"]
resp = await client.post(
"/api/v1/auth/change-password",
json={"old_password": _OVER_72_BYTES, "new_password": "new_secure_password"},
headers={"Authorization": f"Bearer {token}"},
)
assert resp.status_code in (400, 422), (
f"Expected 400/422 for >{72}-byte old_password on change-password, got {resp.status_code}"
)

View File

@@ -80,6 +80,43 @@ async def test_sse_ticket_endpoint_mints_and_redeems(
assert "uuid" in identity and identity["role"] in ("admin", "viewer")
@pytest.mark.anyio
async def test_sse_header_jwt_rejects_must_change_password(monkeypatch) -> None:
"""V4.1.5: the header-JWT SSE branch must enforce must_change_password the
same way require_role does. A user blocked from every REST endpoint must not
be able to subscribe to live SSE streams with their existing token."""
async def _fake_resolve(token: str):
return "user-1", {"uuid": "user-1", "role": "viewer", "must_change_password": True}
monkeypatch.setattr(deps, "_resolve_token", _fake_resolve)
class _Req:
headers = {"Authorization": "Bearer some.jwt.token"}
gate = deps.require_stream_role("viewer", "admin")
with pytest.raises(HTTPException) as exc:
await gate(_Req(), ticket=None) # type: ignore[arg-type]
assert exc.value.status_code == 403
assert "Password change required" in exc.value.detail
@pytest.mark.anyio
async def test_sse_header_jwt_allows_cleared_user(monkeypatch) -> None:
"""Control: a user who has cleared must_change_password passes the header-JWT
SSE gate (proves the new guard didn't break the happy path)."""
async def _fake_resolve(token: str):
return "user-1", {"uuid": "user-1", "role": "viewer", "must_change_password": False}
monkeypatch.setattr(deps, "_resolve_token", _fake_resolve)
class _Req:
headers = {"Authorization": "Bearer some.jwt.token"}
gate = deps.require_stream_role("viewer", "admin")
user = await gate(_Req(), ticket=None) # type: ignore[arg-type]
assert user["uuid"] == "user-1"
def test_raw_jwt_in_sse_query_rejected() -> None:
"""V3.1.1: a raw JWT is not a valid opaque ticket — _redeem_sse_ticket rejects
any token that wasn't minted by mint_sse_ticket (unknown key → 401)."""

View File

@@ -0,0 +1,275 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
"""V7.1.2 regression: tarpit endpoints must NOT leak raw tc stderr to API callers.
Covers both fleet (deckies) and topology tarpit paths. Forces a tc failure
by monkeypatching _apply_tarpit / _remove_tarpit to raise RuntimeError with
a realistic iproute2/kernel error string (veth name, qdisc id, errno text),
then asserts:
1. Status code is 409.
2. The response body detail is a generic string.
3. The response body contains NO raw tc output — no veth names, no
"RTNETLINK", no kernel errno strings, no qdisc identifiers.
"""
from __future__ import annotations
import pytest
import httpx
from decnet.web.router.deckies import api_tarpit as _deckies_tarpit
from decnet.web.router.topology import api_tarpit as _topology_tarpit
_FLEET_URL = "/api/v1/deckies/web1/tarpit"
_TOPO_URL = "/api/v1/topologies/topo1/deckies/web1/tarpit"
# Realistic iproute2 stderr fragments — must NOT appear in any API response.
_TC_STDERR_FRAGMENTS = [
"RTNETLINK",
"veth",
"qdisc",
"Cannot find device",
"No such file or directory",
"Error: Exclusivity flag on, cannot modify.",
"NLMSG_ERROR",
"errno",
]
# Realistic docker exec / runtime stderr fragments that must NOT leak via the
# get_container_veth LookupError path (V7.1.2 — 404 path).
_DOCKER_STDERR_FRAGMENTS = [
"Error response from daemon",
"No such container",
"OCI runtime",
"exec failed",
"permission denied",
]
_TARPIT_BODY = {"ports": [22, 80], "delay_ms": 500}
def _hdr(token: str) -> dict[str, str]:
return {"Authorization": f"Bearer {token}"}
def _assert_no_tc_leak(body: dict) -> None:
"""Assert that the response detail contains no raw tc/kernel output."""
detail = str(body.get("detail", ""))
for fragment in _TC_STDERR_FRAGMENTS:
assert fragment not in detail, (
f"V7.1.2 leak: raw tc stderr fragment {fragment!r} found in API response: {detail!r}"
)
# ── fleet / deckies ────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_fleet_enable_tarpit_tc_failure_returns_generic_detail(
client: httpx.AsyncClient,
auth_token: str,
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""POST fleet tarpit with forced tc failure yields 409 + generic detail."""
def _fake_apply(veth: str, ports: list[int], delay_ms: int) -> None:
raise RuntimeError(
"RTNETLINK answers: File exists\n"
f"Error: Exclusivity flag on, cannot modify. veth={veth} qdisc=1:"
)
monkeypatch.setattr(_deckies_tarpit, "_apply_tarpit", _fake_apply)
# Also patch get_container_veth so the 404 path doesn't fire first.
monkeypatch.setattr(
_deckies_tarpit,
"get_container_veth",
lambda name: f"veth-{name}-abc123",
)
res = await client.post(_FLEET_URL, json=_TARPIT_BODY, headers=_hdr(auth_token))
assert res.status_code == 409, res.text
_assert_no_tc_leak(res.json())
@pytest.mark.asyncio
async def test_fleet_disable_tarpit_tc_failure_returns_generic_detail(
client: httpx.AsyncClient,
auth_token: str,
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""DELETE fleet tarpit with forced tc failure yields 409 + generic detail."""
def _fake_remove(veth: str) -> bool:
raise RuntimeError(
f"RTNETLINK answers: No such file or directory\nveth={veth}"
)
monkeypatch.setattr(_deckies_tarpit, "_remove_tarpit", _fake_remove)
monkeypatch.setattr(
_deckies_tarpit,
"get_container_veth",
lambda name: f"veth-{name}-abc123",
)
res = await client.delete(_FLEET_URL, headers=_hdr(auth_token))
assert res.status_code == 409, res.text
_assert_no_tc_leak(res.json())
# ── topology ───────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_topology_enable_tarpit_tc_failure_returns_generic_detail(
client: httpx.AsyncClient,
auth_token: str,
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""POST topology tarpit with forced tc failure yields 409 + generic detail."""
def _fake_apply(veth: str, ports: list[int], delay_ms: int) -> None:
raise RuntimeError(
f"RTNETLINK answers: File exists\nveth={veth} qdisc=1: NLMSG_ERROR errno=17"
)
monkeypatch.setattr(_topology_tarpit, "_apply_tarpit", _fake_apply)
async def _fake_resolve(repo, decky_name, *, topology_id):
return f"decnet_t_{decky_name}"
monkeypatch.setattr(
_topology_tarpit,
"resolve_decky_container",
_fake_resolve,
)
monkeypatch.setattr(
_topology_tarpit,
"get_container_veth",
lambda name: f"veth-{name}-abc123",
)
res = await client.post(_TOPO_URL, json=_TARPIT_BODY, headers=_hdr(auth_token))
assert res.status_code == 409, res.text
_assert_no_tc_leak(res.json())
@pytest.mark.asyncio
async def test_topology_disable_tarpit_tc_failure_returns_generic_detail(
client: httpx.AsyncClient,
auth_token: str,
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""DELETE topology tarpit with forced tc failure yields 409 + generic detail."""
def _fake_remove(veth: str) -> bool:
raise RuntimeError(
f"RTNETLINK answers: No such file or directory\nveth={veth} errno=2"
)
monkeypatch.setattr(_topology_tarpit, "_remove_tarpit", _fake_remove)
async def _fake_resolve(repo, decky_name, *, topology_id):
return f"decnet_t_{decky_name}"
monkeypatch.setattr(
_topology_tarpit,
"resolve_decky_container",
_fake_resolve,
)
monkeypatch.setattr(
_topology_tarpit,
"get_container_veth",
lambda name: f"veth-{name}-abc123",
)
res = await client.delete(_TOPO_URL, headers=_hdr(auth_token))
assert res.status_code == 409, res.text
_assert_no_tc_leak(res.json())
# ── veth LookupError 404 path (V7.1.2 — docker stderr must not leak) ────────
def _assert_no_docker_stderr_leak(body: dict) -> None:
"""Assert that the response detail contains no raw docker/runtime output."""
detail = str(body.get("detail", ""))
for fragment in _DOCKER_STDERR_FRAGMENTS:
assert fragment not in detail, (
f"V7.1.2 leak: raw docker stderr fragment {fragment!r} found in 404 detail: {detail!r}"
)
# The detail must NOT contain a colon-separated suffix (i.e. no ': <stderr>')
# — generic message ends with 'not reachable', nothing after.
assert "Error response from daemon" not in detail
assert "OCI runtime" not in detail
@pytest.mark.asyncio
async def test_fleet_enable_tarpit_veth_failure_does_not_leak_stderr(
client: httpx.AsyncClient,
auth_token: str,
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""POST fleet tarpit: veth LookupError 404 must not expose docker stderr."""
def _fake_veth(name: str) -> str:
raise LookupError(f"container {name!r} not reachable")
monkeypatch.setattr(_deckies_tarpit, "get_container_veth", _fake_veth)
res = await client.post(_FLEET_URL, json=_TARPIT_BODY, headers=_hdr(auth_token))
assert res.status_code == 404, res.text
body = res.json()
detail = str(body.get("detail", ""))
# Generic message present, no docker runtime fragments
assert "not reachable" in detail
_assert_no_docker_stderr_leak(body)
# Specifically assert the colon+stderr suffix is absent
assert ":" not in detail.split("not reachable", 1)[-1]
@pytest.mark.asyncio
async def test_fleet_disable_tarpit_veth_failure_does_not_leak_stderr(
client: httpx.AsyncClient,
auth_token: str,
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""DELETE fleet tarpit: veth LookupError 404 must not expose docker stderr."""
def _fake_veth(name: str) -> str:
raise LookupError(f"container {name!r} not reachable")
monkeypatch.setattr(_deckies_tarpit, "get_container_veth", _fake_veth)
res = await client.delete(_FLEET_URL, headers=_hdr(auth_token))
assert res.status_code == 404, res.text
body = res.json()
detail = str(body.get("detail", ""))
assert "not reachable" in detail
_assert_no_docker_stderr_leak(body)
assert ":" not in detail.split("not reachable", 1)[-1]
@pytest.mark.asyncio
async def test_topology_enable_tarpit_veth_failure_does_not_leak_stderr(
client: httpx.AsyncClient,
auth_token: str,
monkeypatch: pytest.MonkeyPatch,
) -> None:
"""POST topology tarpit: veth LookupError 404 must not expose docker stderr."""
def _fake_veth(name: str) -> str:
raise LookupError(f"container {name!r} not reachable")
async def _fake_resolve(repo, decky_name, *, topology_id):
return f"decnet_t_{decky_name}"
monkeypatch.setattr(_topology_tarpit, "resolve_decky_container", _fake_resolve)
monkeypatch.setattr(_topology_tarpit, "get_container_veth", _fake_veth)
res = await client.post(_TOPO_URL, json=_TARPIT_BODY, headers=_hdr(auth_token))
assert res.status_code == 404, res.text
body = res.json()
detail = str(body.get("detail", ""))
assert "not reachable" in detail
_assert_no_docker_stderr_leak(body)
assert ":" not in detail.split("not reachable", 1)[-1]

View File

@@ -190,3 +190,15 @@ async def test_deployment_mode_endpoint(client, auth_token, monkeypatch):
assert body["role"] == "master"
assert body["mode"] == "unihost"
assert body["swarm_host_count"] == 0
@pytest.mark.anyio
async def test_deployment_mode_endpoint_requires_auth(client):
# V4.1.6: the response leaks host role + enrolled-worker count, so the
# endpoint must not be reachable without a valid viewer/admin JWT. The
# dashboard only ever calls it from inside the post-login app shell.
resp = await client.get("/api/v1/system/deployment-mode")
assert resp.status_code == 401
# Body must not leak the recon fields on the unauthenticated path.
assert "role" not in resp.text
assert "swarm_host_count" not in resp.text

View File

@@ -5,14 +5,16 @@ from unittest.mock import AsyncMock, MagicMock, patch
import httpx
import pytest
from decnet.web.router.health.api_get_health import _reset_docker_cache
from decnet.web.router.health.api_get_health import _reset_docker_cache, _reset_db_cache
@pytest.fixture(autouse=True)
def _clear_docker_cache():
def _clear_health_caches():
_reset_docker_cache()
_reset_db_cache()
yield
_reset_docker_cache()
_reset_db_cache()
@pytest.mark.anyio
@@ -144,7 +146,9 @@ async def test_health_docker_failing(client: httpx.AsyncClient, auth_token: str)
comp = resp.json()["components"]["docker"]
assert comp["status"] == "failing"
assert "connection refused" in comp["detail"]
# Internal exception message must NOT be in the detail (V7.1.2 fix).
assert "connection refused" not in comp["detail"]
assert comp["detail"] == "docker daemon unavailable"
@pytest.mark.anyio
@@ -161,7 +165,9 @@ async def test_health_database_failing(client: httpx.AsyncClient, auth_token: st
comp = resp.json()["components"]["database"]
assert comp["status"] == "failing"
assert "disk full" in comp["detail"]
# Internal exception message must NOT be in the detail (V7.1.2 fix).
assert "disk full" not in comp["detail"]
assert comp["detail"] == "database unavailable"
@pytest.mark.anyio
@@ -181,7 +187,50 @@ async def test_health_worker_exited_with_exception(client: httpx.AsyncClient, au
comp = resp.json()["components"]["collector_worker"]
assert comp["status"] == "failing"
assert "segfault" in comp["detail"]
# Internal exception message must NOT be in the detail (V7.1.2 fix).
assert "segfault" not in comp["detail"]
assert comp["detail"] == "exited with error"
# ── V7.1.2: no internal exception detail in response body ────────────────────
@pytest.mark.anyio
async def test_health_db_failure_does_not_leak_exception_class(
client: httpx.AsyncClient, auth_token: str,
) -> None:
"""V7.1.2: DB exception class/message must not appear in the HTTP response."""
from decnet.web.dependencies import repo as real_repo
with patch("decnet.web.api.get_background_tasks") as mock_tasks, \
patch("docker.from_env") as mock_docker, \
patch.object(
real_repo, "get_total_logs",
new=AsyncMock(side_effect=OSError("[Errno 28] No space left on device")),
):
_make_all_running(mock_tasks)
mock_docker.return_value = MagicMock()
resp = await client.get("/api/v1/health", headers={"Authorization": f"Bearer {auth_token}"})
detail = resp.json()["components"]["database"].get("detail", "")
assert "Errno" not in detail
assert "No space left" not in detail
assert "OSError" not in detail
@pytest.mark.anyio
async def test_health_docker_failure_does_not_leak_exception_class(
client: httpx.AsyncClient, auth_token: str,
) -> None:
"""V7.1.2: Docker socket exception must not appear in the HTTP response."""
with patch("decnet.web.api.get_background_tasks") as mock_tasks, \
patch("docker.from_env", side_effect=OSError("[Errno 111] Connection refused")):
_make_all_running(mock_tasks)
resp = await client.get("/api/v1/health", headers={"Authorization": f"Bearer {auth_token}"})
detail = resp.json()["components"]["docker"].get("detail", "")
assert "Errno" not in detail
assert "Connection refused" not in detail
assert "OSError" not in detail
# ── Helpers ──────────────────────────────────────────────────────────────────

View File

@@ -34,7 +34,10 @@ async def test_admin_lists_reachable_and_unreachable_hosts(
assert hosts["alpha"]["current_sha"] == "aaaa111"
assert hosts["alpha"]["previous_sha"] == "0000000"
assert hosts["beta"]["reachable"] is False
assert "TLS handshake" in hosts["beta"]["detail"]
# V7.1.2: internal exception text must not leak to the response body.
assert "TLS handshake" not in hosts["beta"]["detail"]
assert "RuntimeError" not in hosts["beta"]["detail"]
assert hosts["beta"]["detail"] == "host unreachable"
@pytest.mark.anyio

View File

@@ -2,8 +2,28 @@
"""POST /api/v1/swarm-updates/push — happy paths, rollback, validation."""
from __future__ import annotations
import re
import pytest
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
_INTERNAL_LEAK_RE = re.compile(
r"Errno|ConnectionRefused|TimeoutError|OSError|RuntimeError|Exception|"
r"Traceback|\w+Error:\s|\w+Exception:\s|File \"|line \d+",
re.IGNORECASE,
)
def _assert_no_internal_detail(detail: str | None) -> None:
"""Assert the detail string does not contain any internal exception noise."""
if detail is None:
return
assert not _INTERNAL_LEAK_RE.search(detail), (
f"V7.1.2: internal exception detail leaked to client: {detail!r}"
)
@pytest.mark.anyio
async def test_push_to_single_host_success(client, auth_token, add_host, fake_updater):
@@ -62,6 +82,49 @@ async def test_push_all_aggregates_mixed_results(client, auth_token, add_host, f
assert statuses == {"alpha": "updated", "beta": "failed"}
@pytest.mark.anyio
async def test_transport_exception_detail_does_not_leak_internals(
client, auth_token, add_host, fake_updater,
):
"""V7.1.2: a raw transport exception must never appear in the response body."""
await add_host("alpha", "10.0.0.1")
fake_updater["client"].update_responses = {
"alpha": OSError("[Errno 111] Connection refused"),
}
resp = await client.post(
"/api/v1/swarm-updates/push",
headers={"Authorization": f"Bearer {auth_token}"},
json={"all": True},
)
assert resp.status_code == 200
result = resp.json()["results"][0]
assert result["status"] == "failed"
_assert_no_internal_detail(result.get("detail"))
@pytest.mark.anyio
async def test_include_self_failure_detail_does_not_leak_internals(
client, auth_token, add_host, fake_updater,
):
"""V7.1.2: include_self transport failure must not expose exception class/msg."""
await add_host("alpha", "10.0.0.1")
fake_updater["client"].update_self_responses = {
"alpha": OSError("[Errno 104] Connection reset by peer"),
}
resp = await client.post(
"/api/v1/swarm-updates/push",
headers={"Authorization": f"Bearer {auth_token}"},
json={"all": True, "include_self": True},
)
assert resp.status_code == 200
result = resp.json()["results"][0]
# status is self-failed (non-expected drop)
assert result["status"] == "self-failed"
_assert_no_internal_detail(result.get("detail"))
@pytest.mark.anyio
async def test_tarball_built_once_across_multi_host_push(
client, auth_token, add_host, fake_updater, monkeypatch,

View File

@@ -50,7 +50,10 @@ async def test_rollback_transport_failure_reported(client, auth_token, add_host,
assert resp.status_code == 200
body = resp.json()
assert body["status"] == "failed"
assert "TLS handshake" in body["detail"]
# V7.1.2: internal exception text must not leak to the response body.
assert "TLS handshake" not in body["detail"]
assert "RuntimeError" not in body["detail"]
assert body["detail"] == "transport failure"
@pytest.mark.anyio

View File

@@ -181,3 +181,117 @@ async def test_clusterer_registered_in_cli():
"""`decnet clusterer` is registered as a master-only command."""
from decnet.cli.gating import MASTER_ONLY_COMMANDS
assert "clusterer" in MASTER_ONLY_COMMANDS
@pytest.mark.anyio
async def test_wake_during_tick_is_not_lost(repo):
"""BUG-14 regression: wake.clear() must run BEFORE wake.wait(), not after.
The worker loop pattern:
Fixed (after fix): tick → clear → wait ← current code
Buggy (before fix): tick → wait → clear
The race in the buggy pattern: a wake.set() could arrive from a _wake_on
background task between wait() returning and clear() executing. In asyncio
the task switch requires an ``await``; the original code had
``await _publish_result`` between wait() and clear(), providing a real
window. The fix closes this by moving clear() to run immediately after
tick (before wait), so there is no window between wait() returning and the
next clear().
**Structural test (red-before / green-after, deterministic):**
We intercept the internal ``wake`` event's ``clear()`` and ``wait()``
methods to record their invocation order, then assert that every
``clear`` call is immediately followed by a ``wait`` (never preceded by
one). Reverting the worker to the buggy ``wait → clear`` order produces
``("wait", "clear")`` consecutive pairs, which the assertion catches
deterministically without relying on wall-clock timing races.
"""
import unittest.mock as _mock
captured_wake: list[asyncio.Event] = []
call_log: list[str] = []
_orig_event_cls = asyncio.Event
class _LoggingEvent(_orig_event_cls): # type: ignore[misc]
"""Subclass captures the first event created (the wake event) and
logs clear()/wait() calls so we can verify their relative order."""
def __init__(self) -> None:
super().__init__()
if not captured_wake:
captured_wake.append(self)
def clear(self) -> None:
if captured_wake and self is captured_wake[0]:
call_log.append("clear")
super().clear()
async def wait(self) -> bool: # type: ignore[override]
if captured_wake and self is captured_wake[0]:
call_log.append("wait")
return await super().wait()
class _SimpleTicker(Clusterer):
name = "bug14_ticker"
async def tick(self, _repo) -> ClusterResult:
return ClusterResult()
shutdown = asyncio.Event()
with _mock.patch("decnet.clustering.worker.asyncio.Event", _LoggingEvent):
task = asyncio.create_task(
run_clusterer_loop(
repo,
poll_interval_secs=0.1,
clusterer=_SimpleTicker(),
shutdown=shutdown,
)
)
# Run for long enough to accumulate several clear/wait pairs.
await asyncio.sleep(0.5)
shutdown.set()
await asyncio.wait_for(task, timeout=2.0)
# Must have enough events to be meaningful.
assert len(call_log) >= 4, (
f"BUG-14: too few clear/wait calls recorded ({call_log!r}); "
"loop may not have run or event capture failed"
)
# Fixed invariant: every loop iteration runs clear() BEFORE wait().
# The resulting call_log for the fixed code is: clear, wait, clear, wait, ...
# For the buggy code (wait → clear) the log would be: wait, clear, wait, clear, ...
#
# We verify TWO conditions that together guarantee the fixed order:
#
# 1. The log starts with "clear" — the very first thing after tick is a clear.
# Buggy code starts with "wait" (wait ran first in the original loop).
#
# 2. Within each (clear, wait) pair at positions (2k, 2k+1), clear always
# precedes wait. We check that no "clear" appears at an ODD index (which
# would mean a clear followed another clear, or a wait appeared before
# the next clear).
assert call_log[0] == "clear", (
f"BUG-14 regression: first wake call was {call_log[0]!r}, expected 'clear'. "
f"Full log: {call_log!r}. The worker is using the buggy 'wait-then-clear' "
"order — wake.clear() must execute BEFORE wake.wait() each iteration."
)
# Verify the alternating pattern holds: indices 0,2,4,... should be "clear"
# and indices 1,3,5,... should be "wait".
for idx, call in enumerate(call_log):
if idx % 2 == 0:
assert call == "clear", (
f"BUG-14 regression: expected 'clear' at position {idx} but got "
f"{call!r} in log {call_log!r}. This indicates the loop is NOT "
"using the fixed 'clear → wait' order within each iteration."
)
else:
assert call == "wait", (
f"BUG-14 regression: expected 'wait' at position {idx} but got "
f"{call!r} in log {call_log!r}. This indicates the loop is NOT "
"using the fixed 'clear → wait' order within each iteration."
)

View File

@@ -16,6 +16,11 @@ os.environ.setdefault("DECNET_LOG_FILE", os.path.join(_TEST_LOG_DIR, "decnet.log
os.environ.setdefault("DECNET_INGEST_LOG_FILE", os.path.join(_TEST_LOG_DIR, "decnet.log"))
os.environ.setdefault("DECNET_AGENT_LOG_FILE", os.path.join(_TEST_LOG_DIR, "agent.log"))
# Explicit test-harness flag: tells env._require_env to skip secret-strength
# validation so the suite can run with weak/short secrets. This is the ONLY
# bypass — it replaced the old "any PYTEST* var present" fail-open check (V2.1.7).
os.environ["DECNET_TESTING"] = "1"
os.environ["DECNET_JWT_SECRET"] = "stable-test-secret-key-at-least-32-chars-long"
os.environ["DECNET_ADMIN_PASSWORD"] = "test-password-123"
os.environ["DECNET_DEVELOPER"] = "true"

View File

@@ -51,13 +51,34 @@ def test_admin_password_strong_value_passes(monkeypatch: pytest.MonkeyPatch) ->
assert val == "a-strong-unique-password"
def test_pytest_short_circuit_returns_value_unchecked() -> None:
# Under live pytest (PYTEST_* present), _require_env returns the configured
# value without the production checks — documents why the dev loop is safe.
# conftest sets a strong value, so this also proves lazy resolution works.
def test_testing_flag_short_circuit_returns_value_unchecked() -> None:
# Under the test harness (DECNET_TESTING=1, set in conftest), _require_env
# returns the configured value without the production checks — documents
# why the dev loop is safe. conftest sets a strong value, so this also
# proves lazy resolution works.
assert envmod._require_env("DECNET_ADMIN_PASSWORD") == "test-password-123"
def test_pytest_var_leak_does_not_bypass_validation(monkeypatch: pytest.MonkeyPatch) -> None:
# V2.1.7 regression: a leaked PYTEST_* env var must NOT disable strength
# validation. With DECNET_TESTING unset, a known-bad / short secret is
# still rejected even though a PYTEST_* var is present.
monkeypatch.setattr(
envmod.os,
"environ",
{"PYTEST_CURRENT_TEST": "x", "DECNET_ADMIN_PASSWORD": "admin"},
)
with pytest.raises(ValueError):
envmod._require_env("DECNET_ADMIN_PASSWORD")
monkeypatch.setattr(
envmod.os,
"environ",
{"PYTEST_CURRENT_TEST": "x", "DECNET_ADMIN_PASSWORD": "short1"},
)
with pytest.raises(ValueError):
envmod._require_env("DECNET_ADMIN_PASSWORD")
def test_lazy_getattr_resolves_admin_password() -> None:
# Accessing the attribute (not a module global anymore) routes through
# __getattr__ -> _require_env.

View File

@@ -8,6 +8,7 @@ from pathlib import Path
import pytest
from decnet.web.db.factory import get_repository
from decnet.web.db.models import Credential
@pytest.fixture
@@ -167,3 +168,103 @@ async def test_filters(repo) -> None:
assert len(rows) == 1 and rows[0]["service"] == "ssh"
assert await repo.get_total_credentials(service="ssh") == 1
assert await repo.get_total_credentials() == 2
@pytest.mark.anyio
async def test_concurrent_upsert_hits_integrity_retry_branch(
repo, monkeypatch
) -> None:
"""BUG-12 regression: deterministically exercise the IntegrityError
retry branch in ``upsert_credential``.
The prior asyncio.gather test proved nothing — aiosqlite serializes
both calls through one worker thread, so the second's dedup SELECT
runs only AFTER the first commits and takes the 'existing is not None'
fast path. The except-IntegrityError handler NEVER executed; the test
passed with or without the fix.
Here we force the race deterministically: the first upsert creates the
row normally. For the second upsert we monkeypatch the module-level
``select`` so its FIRST call (the dedup SELECT) yields a statement that
matches NOTHING — simulating two callers who both saw 'not found'. The
second upsert then attempts an INSERT that hits the UNIQUE constraint
→ IntegrityError → rollback → re-SELECT (a fresh, un-poisoned ``select``
call) finds the winner row → returns its id + increments attempt_count.
Red-before/green-after: if the ``except IntegrityError`` handler is
removed, the IntegrityError propagates out of the second upsert and
this test fails (raises instead of returning a matching id).
"""
from decnet.web.db.sqlmodel_repo.credentials import _core
payload = {
"attacker_ip": "10.0.0.99",
"decky_name": "decky-concurrent",
"service": "ssh",
"principal": "root",
"secret_sha256": _sha256("racepassword"),
"secret_b64": "cmFjZXBhc3N3b3Jk",
"secret_printable": "racepassword",
"fields": {},
}
# First upsert: lands the row normally.
id_a = await repo.upsert_credential(payload)
# Poison ONLY the first select() call of the next upsert so the dedup
# SELECT matches nothing (the simulated race). All later select() calls
# — including the post-IntegrityError re-SELECT — behave normally.
real_select = _core.select
calls = {"n": 0}
def _poisoned_select(*args, **kwargs):
stmt = real_select(*args, **kwargs)
calls["n"] += 1
if calls["n"] == 1:
# Append an always-false predicate so the dedup SELECT returns
# None even though the row exists → forces the INSERT path.
stmt = stmt.where(Credential.id == -1)
return stmt
monkeypatch.setattr(_core, "select", _poisoned_select)
# Second upsert: dedup SELECT misses → INSERT → IntegrityError → retry.
id_b = await repo.upsert_credential(payload)
monkeypatch.undo()
assert id_a == id_b, "retry branch must return the existing winner's id"
rows = await repo.get_credentials()
assert len(rows) == 1, f"expected 1 row, got {len(rows)} (duplicate inserts)"
assert rows[0]["attempt_count"] == 2, (
"retry branch must increment attempt_count on the winner row"
)
@pytest.mark.anyio
async def test_none_and_empty_principal_canonicalize_to_one_row(repo) -> None:
"""BUG-12 canonicalization: principal=None and principal='' canonicalize
to the SAME principal_key ('') and, with an otherwise-identical dedup
tuple, must dedup to ONE row — not crash on the UNIQUE constraint.
Before the fix the dedup SELECT distinguished None from '' (it branched
on ``is_(None)`` vs ``== principal``) while the constraint keyed on
principal_key='' for both → the second upsert's SELECT missed, the
INSERT collided, and the re-SELECT used the wrong (mismatched) filter
→ re-raise / crash. Now SELECT and constraint agree on principal_key.
"""
base = {
"attacker_ip": "10.0.0.7",
"decky_name": "decky-canon",
"service": "ssh",
"secret_sha256": _sha256("hunter2"),
"secret_b64": "aHVudGVyMg==",
"secret_printable": "hunter2",
"fields": {},
}
id_none = await repo.upsert_credential({**base, "principal": None})
id_empty = await repo.upsert_credential({**base, "principal": ""})
assert id_none == id_empty, "None and '' must dedup to the same row"
rows = await repo.get_credentials()
assert len(rows) == 1, f"expected 1 row, got {len(rows)}"
assert rows[0]["attempt_count"] == 2

View File

@@ -19,7 +19,7 @@ from typing import Optional
import pytest
from decnet.intel.base import IntelProvider, IntelResult
from decnet.intel.worker import run_intel_loop, _aggregate
from decnet.intel.worker import run_intel_loop, _aggregate, _enrich_one
from decnet.web.db.factory import get_repository
@@ -206,6 +206,54 @@ async def test_provider_error_does_not_poison_row(repo):
assert row["aggregate_verdict"] == "benign"
@pytest.mark.anyio
async def test_unexpected_provider_raise_does_not_lose_other_results():
"""BUG-15 regression: an unexpected exception from one provider must
not cancel sibling providers or swallow their results.
Before the fix ``asyncio.gather(..., return_exceptions=False)`` let an
unexpected raise propagate immediately, cancelling all sibling tasks
and losing their results for the whole IP batch.
After the fix ``return_exceptions=True`` is used; exception results are
filtered out and logged, while valid :class:`IntelResult` objects from
other providers are processed normally.
"""
class _RaisingProvider(IntelProvider):
"""Simulates an unexpected (non-contractual) exception."""
concurrency = 1
min_dispatch_interval_s = 0.0
name = "exploding"
async def lookup(self, ip: str) -> IntelResult:
raise RuntimeError("unexpected boom")
good = _FakeProvider(
"greynoise",
verdict="benign",
column_updates={
"greynoise_classification": "benign",
"greynoise_raw": {},
"greynoise_queried_at": datetime.now(timezone.utc),
},
)
bad = _RaisingProvider()
row = await _enrich_one(
attacker_uuid="test-uuid",
ip="10.0.0.1",
providers=[good, bad],
ttl_hours=24,
)
# The good provider's data must be present despite the bad one raising.
assert row["greynoise_classification"] == "benign"
assert row["aggregate_verdict"] == "benign"
# The bad provider did not poison the row or raise to the caller.
assert good.calls == ["10.0.0.1"]
@pytest.mark.anyio
async def test_intel_enriched_event_published_to_bus(repo, monkeypatch):
"""End-to-end: worker dispatches providers + publishes the event."""

View File

@@ -16,6 +16,7 @@ exercise:
from __future__ import annotations
import asyncio
import logging
import pathlib
import socket
@@ -255,3 +256,213 @@ async def test_listener_tolerates_client_dropping_mid_stream(
await asyncio.wait_for(listener_task, timeout=5)
except asyncio.TimeoutError:
listener_task.cancel()
# ----------------------------------------------------- V9.1.3 fail-closed CN
class _FakeWriter:
"""Minimal asyncio.StreamWriter stand-in for _handle_connection.
Records close()/wait_closed() so a test can assert the connection was
torn down without binding a real socket.
"""
def __init__(self, ssl_object: object = None, peername: object = ("1.2.3.4", 4242)) -> None:
self._extra = {"ssl_object": ssl_object, "peername": peername}
self.closed = False
self.wait_closed_called = False
self.written: list[bytes] = []
def get_extra_info(self, key: str, default: object = None) -> object:
return self._extra.get(key, default)
def close(self) -> None:
self.closed = True
async def wait_closed(self) -> None:
self.wait_closed_called = True
def write(self, data: bytes) -> None: # pragma: no cover - not expected
self.written.append(data)
def _drained_reader(frame: bytes) -> asyncio.StreamReader:
r = asyncio.StreamReader()
r.feed_data(frame)
r.feed_eof()
return r
@pytest.mark.asyncio
async def test_listener_rejects_unknown_cn_ingests_nothing(
tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
) -> None:
"""V9.1.3 FAIL-CLOSED: a peer whose cert yields CN='unknown'
(malformed/empty/missing CN) must be closed and ingest NOTHING — even
though the frame on the wire is a perfectly valid RFC 5424 line."""
master_log = tmp_path / "master.log"
master_json = tmp_path / "master.json"
cfg = lst.ListenerConfig(
log_path=master_log, json_path=master_json,
bind_host="127.0.0.1", bind_port=0, ca_dir=tmp_path / "ca",
)
# Force peer_cn -> "unknown" regardless of the (absent) ssl object.
monkeypatch.setattr(lst, "peer_cn", lambda _ssl: "unknown")
payload = b'<13>1 2026-04-18T00:00:00Z decky01 svc - - - should-not-ingest'
reader = _drained_reader(f"{len(payload)} ".encode() + payload)
writer = _FakeWriter()
await lst._handle_connection(reader, writer, cfg) # type: ignore[arg-type]
assert writer.closed, "unknown-CN connection must be closed"
assert writer.wait_closed_called
# Nothing must have been ingested into either sink.
assert not master_log.exists() or master_log.stat().st_size == 0
assert not master_json.exists() or master_json.stat().st_size == 0
@pytest.mark.asyncio
async def test_listener_processes_valid_cn_normally(
tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch
) -> None:
"""A peer with a parseable CN is still processed and tagged with its
provenance — the fail-closed guard does not regress the happy path."""
master_log = tmp_path / "master.log"
master_json = tmp_path / "master.json"
cfg = lst.ListenerConfig(
log_path=master_log, json_path=master_json,
bind_host="127.0.0.1", bind_port=0, ca_dir=tmp_path / "ca",
)
monkeypatch.setattr(lst, "peer_cn", lambda _ssl: "worker-good")
payload = (
b'<13>1 2026-04-18T00:00:00Z decky01 svc 1 - '
b'[decnet@53595 decky="decky01" service="svc" event_type="connect" '
b'attacker_ip="1.2.3.4" attacker_port="4242"] hello-good'
)
reader = _drained_reader(f"{len(payload)} ".encode() + payload)
writer = _FakeWriter()
await lst._handle_connection(reader, writer, cfg) # type: ignore[arg-type]
assert writer.closed
assert master_log.exists() and b"hello-good" in master_log.read_bytes()
# Provenance tagged from the (good) CN in the JSON sink.
assert master_json.exists() and "worker-good" in master_json.read_text()
# ------------------------------------------------------- BUG-16 shutdown errors
@pytest.mark.asyncio
async def test_listener_shutdown_surfaces_serve_task_error(
tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch, caplog: pytest.LogCaptureFixture
) -> None:
"""BUG-16: a non-CancelledError raised by the serve task during shutdown
must be logged, not silently swallowed."""
class _BoomServer:
def __init__(self) -> None:
self.sockets: tuple = ()
async def serve_forever(self) -> None:
# Run until cancelled, then raise a REAL error instead of honoring
# the CancelledError — emulates an OSError surfacing as the serve
# task is awaited after server.close()/cancel() during shutdown.
try:
await asyncio.Event().wait() # block until cancelled
except asyncio.CancelledError:
raise OSError("boom during serve") from None
def close(self) -> None:
pass
async def __aenter__(self) -> "_BoomServer":
return self
async def __aexit__(self, *exc: object) -> None:
pass
async def _fake_start_server(*_a: object, **_kw: object) -> _BoomServer:
return _BoomServer()
monkeypatch.setattr(lst.asyncio, "start_server", _fake_start_server)
monkeypatch.setattr(lst, "build_listener_ssl_context", lambda _ca: None)
cfg = lst.ListenerConfig(
log_path=tmp_path / "m.log", json_path=tmp_path / "m.json",
bind_host="127.0.0.1", bind_port=0, ca_dir=tmp_path / "ca",
)
stop = asyncio.Event()
async def _stop_soon() -> None:
# Let the serve task actually start before we request shutdown, so
# the cancel path (not a never-scheduled task) is what surfaces.
await asyncio.sleep(0.05)
stop.set()
waiter = asyncio.create_task(_stop_soon())
with caplog.at_level(logging.ERROR, logger="swarm.listener"):
await lst.run_listener(cfg, stop_event=stop)
await waiter
assert any(
"serve task errored during shutdown" in r.getMessage() for r in caplog.records
), "listener swallowed a real serve-task error on shutdown"
@pytest.mark.asyncio
async def test_forwarder_shutdown_surfaces_heartbeat_error(
tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch, caplog: pytest.LogCaptureFixture
) -> None:
"""BUG-16: a non-CancelledError from the heartbeat task during forwarder
shutdown must be logged, not silently suppressed."""
started = asyncio.Event()
async def _boom_heartbeat(*_a: object, **_kw: object) -> None:
# Signal that we actually ran, then fail — guarantees the task has a
# stored exception (not just a pending cancel) by shutdown time.
started.set()
raise RuntimeError("heartbeat boom")
# Bus unavailable -> bus=None path; heartbeat task still created.
def _no_bus(*_a: object, **_kw: object):
raise RuntimeError("no bus in test")
# Make the connect attempt fail with OSError so run_forwarder takes its
# caught backoff branch (which yields control, letting the heartbeat task
# run and raise) instead of propagating an uncaught error.
def _boom_ctx(*_a: object, **_kw: object):
raise OSError("no ssl context in test")
monkeypatch.setattr(fwd, "get_bus", _no_bus)
monkeypatch.setattr(fwd, "run_health_heartbeat", _boom_heartbeat)
monkeypatch.setattr(fwd, "build_worker_ssl_context", _boom_ctx)
cfg = fwd.ForwarderConfig(
log_path=tmp_path / "decnet.log",
master_host="127.0.0.1", master_port=0,
agent_dir=tmp_path / "agent",
state_db=tmp_path / "fwd.db",
)
stop = asyncio.Event()
async def _stop_after_heartbeat_ran() -> None:
# Let the heartbeat task get scheduled and raise before we ask the
# forwarder to shut down, so the finally block observes the error.
await started.wait()
stop.set()
waiter = asyncio.create_task(_stop_after_heartbeat_ran())
with caplog.at_level(logging.ERROR, logger="swarm.forwarder"):
await fwd.run_forwarder(cfg, poll_interval=0.01, stop_event=stop)
await waiter
assert any(
"heartbeat task errored during shutdown" in r.getMessage() for r in caplog.records
), "forwarder swallowed a real heartbeat-task error on shutdown"

View File

@@ -20,7 +20,7 @@ from __future__ import annotations
import inspect
import sys
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
@@ -209,3 +209,185 @@ emits:
await sync_task
except (asyncio.CancelledError, Exception): # noqa: BLE001
pass
# ---------------------------------------------------------------------------
# BUG-13 regression: tail_db must not drop rules updated AT the watermark
# ---------------------------------------------------------------------------
_SHARED_YAML_TEMPLATE = """\
rule_id: {rule_id}
rule_version: 1
name: {name}
applies_to: [command]
match:
pattern: 'test'
emits:
- tactic: TA0007
technique_id: T1033
confidence: 0.85
"""
async def test_tail_db_same_timestamp_both_rules_emitted(
db_store: DatabaseRuleStore, tmp_path: Path,
) -> None:
"""BUG-13 regression: two rules with the SAME updated_at timestamp are
BOTH emitted by tail_db across the watermark boundary (none dropped).
The pre-fix code used ``updated_at > watermark`` which silently
dropped rules whose timestamp equalled the watermark. The fix
changes to ``>=`` and deduplicates by rule_id within the window,
advancing the watermark by 1 µs after emitting to prevent re-emission.
"""
import asyncio # noqa: PLC0415
# Pin a past watermark so both rules are in scope on the first poll.
shared_ts = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
db_store._tail_watermark = shared_ts
# Insert two TTPRule rows with identical updated_at.
repo = db_store._repo
assert repo is not None
for rule_id, name in (("R1001", "rule one"), ("R1002", "rule two")):
yaml_content = _SHARED_YAML_TEMPLATE.format(rule_id=rule_id, name=name)
async with repo._session() as session: # type: ignore[attr-defined]
row = TTPRule(
rule_id=rule_id,
rule_version=1,
source_path=f"./rules/ttp/{rule_id}.yaml",
yaml_content=yaml_content,
updated_at=shared_ts,
updated_by="test",
)
session.add(row)
await session.commit()
# Patch _emit_change to capture rule_ids without touching subscribers.
emitted_via_tail: set[str] = set()
original_emit = db_store._emit_change
async def _capture_emit(change, **kwargs): # type: ignore[no-untyped-def]
emitted_via_tail.add(change.rule_id)
await original_emit(change, **kwargs)
db_store._emit_change = _capture_emit # type: ignore[assignment]
db_store._stop.clear()
# Run tail_db for one short cycle then stop.
poll_task = asyncio.create_task(db_store.tail_db(poll_interval=0.01))
await asyncio.sleep(0.08)
db_store._stop.set()
try:
await asyncio.wait_for(poll_task, timeout=2.0)
except (asyncio.CancelledError, asyncio.TimeoutError):
poll_task.cancel()
assert "R1001" in emitted_via_tail, "R1001 must be emitted by tail_db"
assert "R1002" in emitted_via_tail, "R1002 must be emitted by tail_db"
# After emitting both rules at shared_ts, the seen-ids set must record
# them so that a second poll at the same watermark skips re-emission.
# The watermark itself stays at shared_ts (no newer rows existed) but
# _tail_seen_ids acts as the dedup guard.
assert "R1001" in db_store._tail_seen_ids, "R1001 must be in _tail_seen_ids"
assert "R1002" in db_store._tail_seen_ids, "R1002 must be in _tail_seen_ids"
# Simulate a second poll — the rules should NOT be re-emitted.
emitted_via_tail.clear()
db_store._stop.clear()
poll_task2 = asyncio.create_task(db_store.tail_db(poll_interval=0.01))
await asyncio.sleep(0.08)
db_store._stop.set()
try:
await asyncio.wait_for(poll_task2, timeout=2.0)
except (asyncio.CancelledError, asyncio.TimeoutError):
poll_task2.cancel()
assert "R1001" not in emitted_via_tail, "R1001 must NOT be re-emitted on second poll"
assert "R1002" not in emitted_via_tail, "R1002 must NOT be re-emitted on second poll"
async def test_tail_db_coarse_timestamp_late_rule_still_emitted(
db_store: DatabaseRuleStore, tmp_path: Path,
) -> None:
"""BUG-13 (microsecond-advance regression): on coarse second-resolution
timestamps (MySQL DATETIME) a rule saved at the SAME whole-second AFTER
a poll must STILL be emitted on the next poll — not dropped.
The defective fix advanced the watermark to ``max_ts + 1 µs`` after a
poll. On second-resolution storage that bump lands inside the same
whole-second bucket, so a row written later in that same second has
``updated_at < watermark`` and the ``>= watermark`` query silently drops
it — reintroducing the same-timestamp bug.
The correct fix keeps the watermark AT max_ts and relies solely on
``_tail_seen_ids`` for dedup. This test simulates coarse storage by
writing every row at the identical whole-second timestamp.
Red-before/green-after: with ``max_ts + 1 µs`` the second rule
(written at the same whole second after the first poll) is dropped and
this test fails; keeping the watermark at max_ts emits it.
"""
import asyncio # noqa: PLC0415
coarse_ts = datetime(2024, 6, 1, 9, 0, 0, tzinfo=timezone.utc)
# Start the watermark strictly BEFORE the rows so the first poll takes
# the advancing (max_ts > watermark) branch — the exact branch where the
# defective +1 µs bump skips past same-second late arrivals.
db_store._tail_watermark = coarse_ts - timedelta(seconds=5)
repo = db_store._repo
assert repo is not None
async def _insert(rule_id: str) -> None:
yaml_content = _SHARED_YAML_TEMPLATE.format(rule_id=rule_id, name=rule_id)
async with repo._session() as session: # type: ignore[attr-defined]
session.add(TTPRule(
rule_id=rule_id,
rule_version=1,
source_path=f"./rules/ttp/{rule_id}.yaml",
yaml_content=yaml_content,
updated_at=coarse_ts, # identical whole-second timestamp
updated_by="test",
))
await session.commit()
emitted: list[str] = []
original_emit = db_store._emit_change
async def _capture_emit(change, **kwargs): # type: ignore[no-untyped-def]
emitted.append(change.rule_id)
await original_emit(change, **kwargs)
db_store._emit_change = _capture_emit # type: ignore[assignment]
# First poll: only R2001 exists yet.
await _insert("R2001")
db_store._stop.clear()
poll1 = asyncio.create_task(db_store.tail_db(poll_interval=0.01))
await asyncio.sleep(0.05)
db_store._stop.set()
try:
await asyncio.wait_for(poll1, timeout=2.0)
except (asyncio.CancelledError, asyncio.TimeoutError):
poll1.cancel()
assert "R2001" in emitted
# Watermark stayed at the coarse second; seen-ids guards re-emission.
assert db_store._tail_watermark == coarse_ts
assert "R2001" in db_store._tail_seen_ids
# A rule arrives LATER, in the same whole second (coarse resolution).
emitted.clear()
await _insert("R2002")
db_store._stop.clear()
poll2 = asyncio.create_task(db_store.tail_db(poll_interval=0.01))
await asyncio.sleep(0.05)
db_store._stop.set()
try:
await asyncio.wait_for(poll2, timeout=2.0)
except (asyncio.CancelledError, asyncio.TimeoutError):
poll2.cancel()
assert "R2002" in emitted, "late same-second rule must NOT be dropped"
assert "R2001" not in emitted, "already-emitted rule must not re-fire"