diff --git a/decnet/clustering/worker.py b/decnet/clustering/worker.py index 98b49ace..cf3bed06 100644 --- a/decnet/clustering/worker.py +++ b/decnet/clustering/worker.py @@ -102,13 +102,13 @@ async def run_clusterer_loop( await _publish_result(bus, result) + wake.clear() try: await asyncio.wait_for( wake.wait(), timeout=float(poll_interval_secs), ) except asyncio.TimeoutError: pass - wake.clear() except (asyncio.CancelledError, KeyboardInterrupt): log.info("clusterer stopped") finally: diff --git a/decnet/correlation/reuse_worker.py b/decnet/correlation/reuse_worker.py index a3a3e607..e386c8c0 100644 --- a/decnet/correlation/reuse_worker.py +++ b/decnet/correlation/reuse_worker.py @@ -107,13 +107,13 @@ async def run_reuse_loop( event_type=_topics.CREDENTIAL_REUSE_DETECTED, ) + wake.clear() try: await asyncio.wait_for( wake.wait(), timeout=float(poll_interval_secs), ) except asyncio.TimeoutError: pass - wake.clear() except (asyncio.CancelledError, KeyboardInterrupt): log.info("reuse correlator stopped") finally: diff --git a/decnet/env.py b/decnet/env.py index f6eacc2a..fc0c3e3f 100644 --- a/decnet/env.py +++ b/decnet/env.py @@ -49,7 +49,12 @@ def _require_env(name: str) -> str: f"Set it in .env.local or export it before starting DECNET." ) - if any(k.startswith("PYTEST") for k in os.environ): + # Strength validation is bypassed ONLY under the explicit, non-attacker- + # injectable DECNET_TESTING=1 flag (set by the test harness). The old + # "any PYTEST* var present" check was a fail-open bug: PYTEST* is an + # attacker-controllable namespace, so leaking one into a prod environment + # silently disabled the known-bad/length guards. Fail closed (V2.1.7). + if os.environ.get("DECNET_TESTING") == "1": return value if value.lower() in _KNOWN_BAD: diff --git a/decnet/intel/worker.py b/decnet/intel/worker.py index e957e4ad..f0ec371f 100644 --- a/decnet/intel/worker.py +++ b/decnet/intel/worker.py @@ -108,10 +108,19 @@ async def _enrich_one( async with p._semaphore: return await p.lookup(ip) - results: list[IntelResult] = await asyncio.gather( + raw = await asyncio.gather( *(_guarded_lookup(p, ip) for p in providers), - return_exceptions=False, # providers contractually never raise + return_exceptions=True, ) + results: list[IntelResult] = [] + for r in raw: + if isinstance(r, BaseException): + log.warning( + "intel: provider raised unexpectedly for ip=%s: %s", + ip, r, + ) + else: + results.append(r) now = datetime.now(timezone.utc) row: dict[str, Any] = { @@ -220,13 +229,13 @@ async def run_intel_loop( attacker_uuid, ip, ) + wake.clear() try: await asyncio.wait_for( wake.wait(), timeout=float(poll_interval_secs), ) except asyncio.TimeoutError: pass - wake.clear() except (asyncio.CancelledError, KeyboardInterrupt): log.info("intel worker stopped") finally: diff --git a/decnet/network.py b/decnet/network.py index 0cb9c987..2c489a69 100644 --- a/decnet/network.py +++ b/decnet/network.py @@ -15,6 +15,10 @@ from ipaddress import IPv4Address, IPv4Interface, IPv4Network import docker +from decnet.logging import get_logger + +log = get_logger("network") + MACVLAN_NETWORK_NAME = "decnet_lan" HOST_MACVLAN_IFACE = "decnet_macvlan0" HOST_IPVLAN_IFACE = "decnet_ipvlan0" @@ -491,9 +495,12 @@ def get_container_veth(container_name: str) -> str: check=False, ) if result.returncode != 0: - raise LookupError( - f"container {container_name!r} not reachable: {result.stderr.strip()}" + log.warning( + "get_container_veth: docker exec failed for container %r: %s", + container_name, + result.stderr.strip(), ) + raise LookupError(f"container {container_name!r} not reachable") peer_index = result.stdout.strip() links = _run(["ip", "link", "show"]) for line in links.stdout.splitlines(): diff --git a/decnet/swarm/log_forwarder.py b/decnet/swarm/log_forwarder.py index 1c73cb6b..9fa95a13 100644 --- a/decnet/swarm/log_forwarder.py +++ b/decnet/swarm/log_forwarder.py @@ -240,8 +240,14 @@ async def run_forwarder( backoff = min(_MAX_BACKOFF, backoff * 2) finally: heartbeat_task.cancel() - with contextlib.suppress(asyncio.CancelledError, Exception): + try: await heartbeat_task + except asyncio.CancelledError: + pass + except Exception: + # BUG-16 — don't silently swallow a real heartbeat-task error on + # shutdown; log it so a failing heartbeat coroutine is visible. + log.exception("forwarder heartbeat task errored during shutdown") if bus is not None: with contextlib.suppress(Exception): await bus.close() diff --git a/decnet/swarm/log_listener.py b/decnet/swarm/log_listener.py index 374fe333..6769e473 100644 --- a/decnet/swarm/log_listener.py +++ b/decnet/swarm/log_listener.py @@ -113,6 +113,24 @@ async def _handle_connection( ssl_obj = writer.get_extra_info("ssl_object") cn = peer_cn(ssl_obj) peer = writer.get_extra_info("peername") + + # V9.1.3 — FAIL CLOSED on unattributable provenance. The CA gates + # enrollment at the TLS handshake, but a CA-signed cert with a + # malformed/empty/missing CN subject slips through with an opaque + # 'unknown' label. We refuse to ingest anything we cannot attribute: + # close the connection immediately and ingest NOTHING. + if cn == "unknown": + log.warning( + "listener rejecting unattributable peer (CN=unknown) peer=%s — closing, ingesting nothing", + peer, + ) + writer.close() + try: + await writer.wait_closed() + except Exception: # nosec B110 — socket cleanup is best-effort + pass + return + log.info("listener accepted worker=%s peer=%s", cn, peer) # Lazy import to avoid a circular dep if the collector pulls in logger setup. @@ -191,5 +209,9 @@ async def run_listener( serve_task.cancel() try: await serve_task - except (asyncio.CancelledError, Exception): # nosec B110 + except asyncio.CancelledError: pass + except Exception: + # BUG-16 — do NOT swallow real shutdown errors (OSError etc). + # Surface them so a wedged/erroring serve task is visible. + log.exception("listener serve task errored during shutdown") diff --git a/decnet/ttp/store/impl/database.py b/decnet/ttp/store/impl/database.py index d63521db..a4dd3d6f 100644 --- a/decnet/ttp/store/impl/database.py +++ b/decnet/ttp/store/impl/database.py @@ -231,6 +231,9 @@ class DatabaseRuleStore(RuleStore): self._subscribers: list[asyncio.Queue[RuleChange]] = [] self._tail_task: asyncio.Task[None] | None = None self._tail_watermark: datetime | None = None + # rule_ids already emitted at the current watermark timestamp; reset + # whenever the watermark advances (BUG-13 dedup across same-ts rows). + self._tail_seen_ids: set[str] = set() self._sync_task: asyncio.Task[None] | None = None self._stop = asyncio.Event() self._lazy_lock = asyncio.Lock() @@ -504,6 +507,10 @@ class DatabaseRuleStore(RuleStore): receive per-rule definition changes without a shared bus round-trip. The watermark advances on every observed row; first poll initializes it to "now" so we don't replay history. + + Single-poller only: the instance state ``_tail_watermark`` / + ``_tail_seen_ids`` is NOT safe for concurrent pollers on the same + store instance. """ repo = await self._ensure_repo() if self._tail_watermark is None: @@ -511,14 +518,31 @@ class DatabaseRuleStore(RuleStore): while not self._stop.is_set(): try: async with repo._session() as session: # type: ignore[attr-defined] + # Use >= so rules whose updated_at equals the watermark are + # not silently skipped on the next poll (BUG-13 fix). + # Rows at exactly the watermark timestamp are deduplicated + # by rule_id so we don't re-emit rules we already fired. rows = ( await session.execute( sa_select(TTPRule).where( - col(TTPRule.updated_at) > self._tail_watermark, + col(TTPRule.updated_at) >= self._tail_watermark, ), ) ).scalars().all() + max_ts: datetime | None = None + emitted_at_ts: dict[str, datetime] = {} for rule_row in rows: + # Normalize to UTC-aware before comparison so naive + # datetimes stored by the DB don't cause TypeError. + row_ts = rule_row.updated_at + if row_ts.tzinfo is None: + row_ts = row_ts.replace(tzinfo=timezone.utc) + # Skip rules we already emitted at exactly this watermark. + if ( + row_ts == self._tail_watermark + and rule_row.rule_id in self._tail_seen_ids + ): + continue state = await self.get_state(rule_row.rule_id) compiled = _yaml_to_compiled(rule_row.yaml_content, state) await self._emit_change( @@ -529,11 +553,31 @@ class DatabaseRuleStore(RuleStore): "rule_version": compiled.rule_version, }, ) - if ( - self._tail_watermark is None - or rule_row.updated_at > self._tail_watermark - ): - self._tail_watermark = rule_row.updated_at + emitted_at_ts[rule_row.rule_id] = row_ts + if max_ts is None or row_ts > max_ts: + max_ts = row_ts + if max_ts is not None: + # Keep the watermark AT max_ts — do NOT add 1 µs. On coarse + # second-resolution timestamps (MySQL DATETIME) a 1 µs bump + # would land inside the same whole-second bucket and the + # next `>= watermark` query would silently drop a rule + # saved later in that same second (reintroducing BUG-13). + # We rely SOLELY on _tail_seen_ids to dedup already-emitted + # rule_ids at the watermark timestamp. + if max_ts > self._tail_watermark: + # A strictly-newer timestamp appeared: advance the + # watermark and reset seen-ids to only the rule_ids + # AT the new max_ts (rows below it cannot reappear in a + # future `>= max_ts` query, so they need no dedup). + self._tail_watermark = max_ts + self._tail_seen_ids = { + rid for rid, ts in emitted_at_ts.items() + if ts == max_ts + } + else: + # All emitted rows share the current watermark + # timestamp; record them so the next poll skips them. + self._tail_seen_ids.update(emitted_at_ts) except Exception: # noqa: BLE001 _log.exception("ttp.store.db: tail poll failed") try: diff --git a/decnet/web/auth.py b/decnet/web/auth.py index 95890658..d59a9e56 100644 --- a/decnet/web/auth.py +++ b/decnet/web/auth.py @@ -9,6 +9,10 @@ from decnet.env import DECNET_JWT_SECRET, DECNET_JWT_EXP_MINUTES SECRET_KEY: str = DECNET_JWT_SECRET ALGORITHM: str = "HS256" +# Live constant — sourced from env DECNET_JWT_EXP_MINUTES (default 240 / 4 h). +# Idle/inactivity timeout is intentionally not implemented: jti denylist covers +# explicit logout and the 4 h absolute TTL bounds the worst-case exposure window. +# Accept-risk: LOW / pre-v1 — revisit at v1 when user-facing session UX lands. ACCESS_TOKEN_EXPIRE_MINUTES: int = DECNET_JWT_EXP_MINUTES # Pinned issuer/audience/type so a token signed with DECNET_JWT_SECRET for any @@ -21,6 +25,9 @@ JWT_TYPE: str = "access" def verify_password(plain_password: str, hashed_password: str) -> bool: + # [:72] is a defensive safety-net against bcrypt silent truncation. + # Validated callers already reject >72-byte passwords via field_validator, + # so this slice is unreachable for well-formed input. return bcrypt.checkpw( plain_password.encode("utf-8")[:72], hashed_password.encode("utf-8") @@ -28,7 +35,9 @@ def verify_password(plain_password: str, hashed_password: str) -> bool: def get_password_hash(password: str) -> str: - # Use a cost factor of 12 (default for passlib/bcrypt) + # Use a cost factor of 12 (default for passlib/bcrypt). + # [:72] is a defensive safety-net; field_validator rejects >72-byte input + # before it reaches this function. _salt: bytes = bcrypt.gensalt(rounds=12) _hashed: bytes = bcrypt.hashpw(password.encode("utf-8")[:72], _salt) return _hashed.decode("utf-8") diff --git a/decnet/web/db/models/auth.py b/decnet/web/db/models/auth.py index 34ad20e2..85ae18aa 100644 --- a/decnet/web/db/models/auth.py +++ b/decnet/web/db/models/auth.py @@ -3,10 +3,18 @@ from datetime import datetime, timezone from typing import List, Literal, Optional -from pydantic import BaseModel, Field as PydanticField +from pydantic import BaseModel, Field as PydanticField, field_validator from sqlmodel import Field, SQLModel +def _reject_over_72_bytes(v: str) -> str: + """bcrypt silently truncates at 72 bytes; reject instead to avoid + collision/confusion between passwords that share a 72-byte prefix.""" + if len(v.encode("utf-8")) > 72: + raise ValueError("password must not exceed 72 UTF-8 bytes (bcrypt limit)") + return v + + class User(SQLModel, table=True): __tablename__ = "users" uuid: str = Field(primary_key=True) @@ -55,6 +63,11 @@ class ChangePasswordRequest(BaseModel): # floor a seeded admin could clear must_change_password with a 1-char secret. new_password: str = PydanticField(..., min_length=12, max_length=72) + @field_validator("old_password", "new_password", mode="after") + @classmethod + def _check_byte_limit(cls, v: str) -> str: + return _reject_over_72_bytes(v) + class SSETicketResponse(BaseModel): """Single-use, short-lived opaque ticket the dashboard exchanges its header @@ -68,16 +81,26 @@ class SSETicketResponse(BaseModel): class CreateUserRequest(BaseModel): username: str = PydanticField(..., min_length=1, max_length=64) - password: str = PydanticField(..., min_length=8, max_length=72) + password: str = PydanticField(..., min_length=12, max_length=72) role: Literal["admin", "viewer"] = "viewer" + @field_validator("password", mode="after") + @classmethod + def _check_byte_limit(cls, v: str) -> str: + return _reject_over_72_bytes(v) + class UpdateUserRoleRequest(BaseModel): role: Literal["admin", "viewer"] class ResetUserPasswordRequest(BaseModel): - new_password: str = PydanticField(..., min_length=8, max_length=72) + new_password: str = PydanticField(..., min_length=12, max_length=72) + + @field_validator("new_password", mode="after") + @classmethod + def _check_byte_limit(cls, v: str) -> str: + return _reject_over_72_bytes(v) class DeploymentLimitRequest(BaseModel): diff --git a/decnet/web/db/models/logs.py b/decnet/web/db/models/logs.py index da581009..824c2ba9 100644 --- a/decnet/web/db/models/logs.py +++ b/decnet/web/db/models/logs.py @@ -67,6 +67,16 @@ class Credential(SQLModel, table=True): __table_args__ = ( Index("ix_credentials_secret_service", "secret_sha256", "service"), Index("ix_credentials_principal_service", "principal", "service"), + # Dedup constraint: same (attacker_ip, decky, service, secret_kind, + # secret_sha256, principal_key) → one row. ``principal_key`` is + # the non-null canonical form of ``principal`` (empty string when + # principal is NULL) so the constraint is UNIQUE-safe under SQLite's + # NULL-distinct behaviour and MySQL's standard UNIQUE semantics. + UniqueConstraint( + "attacker_ip", "decky_name", "service", + "secret_kind", "secret_sha256", "principal_key", + name="uq_credentials_dedup", + ), ) id: Optional[int] = Field(default=None, primary_key=True) # Keyed by attacker IP (not attackers.uuid) on the write path to @@ -81,6 +91,11 @@ class Credential(SQLModel, table=True): decky_name: str = Field(index=True) service: str = Field(index=True) principal: Optional[str] = Field(default=None, index=True, max_length=256) + # Non-null canonical form of ``principal`` used in ``uq_credentials_dedup``. + # Empty string when ``principal`` is NULL so the UNIQUE constraint behaves + # correctly under SQLite's NULL-distinct semantics (same pattern as + # ``CredentialReuse.principal_key``). + principal_key: str = Field(default="", max_length=256) # Discriminator for what `secret_b64` actually contains. Default # ``"plaintext"`` — a recoverable password the attacker sent on the # wire (SSH/Telnet/FTP/IMAP/POP3/SMTP/Redis/LDAP/MQTT). Other kinds: diff --git a/decnet/web/db/sqlmodel_repo/credentials/_core.py b/decnet/web/db/sqlmodel_repo/credentials/_core.py index 5f4607d3..1b2fd3e5 100644 --- a/decnet/web/db/sqlmodel_repo/credentials/_core.py +++ b/decnet/web/db/sqlmodel_repo/credentials/_core.py @@ -7,6 +7,7 @@ from datetime import datetime, timezone from typing import Any, List, Optional from sqlalchemy import desc, func, or_, select, update +from sqlalchemy.exc import IntegrityError from sqlmodel import col from sqlmodel.sql.expression import SelectOfScalar @@ -31,18 +32,32 @@ class CredentialsCoreMixin(_MixinBase): payload["fields"] = json.dumps(payload["fields"], ensure_ascii=True) principal = payload.get("principal") + # Non-null canonical form used by the uq_credentials_dedup constraint + # AND by the dedup SELECT — both MUST key on the SAME value or the + # SELECT can miss a row that the constraint then collides on + # (e.g. principal=None and principal="" both canonicalize to ""): + # the SELECT would treat them as distinct, INSERT, hit IntegrityError, + # then re-SELECT with the wrong filter and re-raise. ``principal or ""`` + # collapses None and "" identically — mirrors CredentialReuse and the + # constraint key. (BUG-12: canonicalization must not diverge.) + principal_key = principal or "" secret_kind = payload.get("secret_kind") or "plaintext" - async with self._session() as session: - stmt = select(Credential).where( + + def _build_dedup_filter(): + return ( Credential.attacker_ip == payload["attacker_ip"], Credential.decky_name == payload["decky_name"], Credential.service == payload["service"], Credential.secret_kind == secret_kind, Credential.secret_sha256 == payload["secret_sha256"], - # NULL == NULL is False under SQL — branch the predicate. - (Credential.principal == principal) if principal is not None - else col(Credential.principal).is_(None), + # Key the SELECT on principal_key — the SAME canonical value + # the UNIQUE constraint uses — so SELECT and constraint never + # disagree about which rows collide. + Credential.principal_key == principal_key, ) + + async with self._session() as session: + stmt = select(Credential).where(*_build_dedup_filter()) existing = (await session.execute(stmt)).scalar_one_or_none() now = datetime.now(timezone.utc) if existing is not None: @@ -58,6 +73,7 @@ class CredentialsCoreMixin(_MixinBase): decky_name=payload["decky_name"], service=payload["service"], principal=principal, + principal_key=principal_key, secret_kind=secret_kind, secret_sha256=payload["secret_sha256"], secret_b64=payload.get("secret_b64"), @@ -69,7 +85,25 @@ class CredentialsCoreMixin(_MixinBase): attempt_count=1, ) session.add(row) - await session.commit() + try: + await session.commit() + except IntegrityError: + # Concurrent upsert for the same dedup key beat us — re-SELECT + # the winner row and increment its counter in a fresh session. + await session.rollback() + async with self._session() as session2: + stmt2 = select(Credential).where(*_build_dedup_filter()) + existing2 = (await session2.execute(stmt2)).scalar_one_or_none() + if existing2 is None: + # Extremely unlikely (e.g. deleted between races); bail. + raise + existing2.attempt_count = (existing2.attempt_count or 1) + 1 + existing2.last_seen = now + if payload.get("outcome") is not None: + existing2.outcome = payload["outcome"] + session2.add(existing2) + await session2.commit() + return existing2.id await session.refresh(row) return row.id # type: ignore[return-value] diff --git a/decnet/web/dependencies.py b/decnet/web/dependencies.py index eca34aae..ccc3e5ab 100644 --- a/decnet/web/dependencies.py +++ b/decnet/web/dependencies.py @@ -37,6 +37,17 @@ oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/login") # Per-request user lookup was the hidden tax behind every authed endpoint — # SELECT users WHERE uuid=? ran once per call, serializing through aiosqlite. # 10s TTL is well below JWT expiry and we invalidate on all user writes. +# +# REVOCATION-WINDOW NOTE (V2.1.8, accept-risk): these per-process caches bound +# how long a *stale* user/username/denylist row can be served. Single-process: +# a role downgrade or password change is reflected within ~_USER_TTL (10s) even +# if the in-process invalidate_user_cache hook is missed; the local write path +# invalidates immediately. Multi-worker (gunicorn/uvicorn --workers>1) gives +# each worker its own cache, so the worst-case cross-worker staleness is also +# ~10s and would need a shared cache (Redis) to collapse to zero. This staleness +# is NOT the authoritative revocation control: JWT bulk-revoke via the user's +# tokens_valid_from cutoff (enforced in _resolve_token) is the hard cutoff and +# is unaffected by these caches. _USER_TTL = 10.0 _user_cache: dict[str, tuple[Optional[dict[str, Any]], float]] = {} _user_cache_lock: Optional[asyncio.Lock] = None @@ -389,6 +400,11 @@ def require_stream_role(*allowed_roles: str): header_token = _bearer_from_header(request) if header_token: _user_uuid, user = await _resolve_token(header_token) + if user.get("must_change_password"): + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Password change required before accessing this resource", + ) if user["role"] not in allowed_roles: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, diff --git a/decnet/web/router/deckies/api_file_drop.py b/decnet/web/router/deckies/api_file_drop.py index 592465ea..62b7ca28 100644 --- a/decnet/web/router/deckies/api_file_drop.py +++ b/decnet/web/router/deckies/api_file_drop.py @@ -75,7 +75,7 @@ async def api_drop_file( content = base64.b64decode(req.content_b64, validate=True) except (ValueError, TypeError) as exc: raise HTTPException( - status_code=400, detail=f"content_b64 is not valid base64: {exc}", + status_code=400, detail="content_b64 is not valid base64", ) from exc container = await _resolve_container_or_4xx(req.decky_name, req.topology_id) diff --git a/decnet/web/router/deckies/api_tarpit.py b/decnet/web/router/deckies/api_tarpit.py index f0e654ae..7dc8defc 100644 --- a/decnet/web/router/deckies/api_tarpit.py +++ b/decnet/web/router/deckies/api_tarpit.py @@ -50,7 +50,8 @@ def _apply_tarpit(veth: str, ports: list[int], delay_ms: int) -> None: for args in steps: r = _tc(*args) if r.returncode != 0: - raise RuntimeError(r.stderr.strip()) + log.warning("tarpit tc apply failed veth=%s cmd=%s stderr=%r", veth, args[0], r.stderr.strip()) + raise RuntimeError("tarpit command failed") for port in ports: r = _tc( @@ -60,7 +61,8 @@ def _apply_tarpit(veth: str, ports: list[int], delay_ms: int) -> None: "flowid", "1:1", ) if r.returncode != 0: - raise RuntimeError(r.stderr.strip()) + log.warning("tarpit tc filter failed veth=%s port=%d stderr=%r", veth, port, r.stderr.strip()) + raise RuntimeError("tarpit command failed") def _remove_tarpit(veth: str) -> bool: @@ -69,7 +71,8 @@ def _remove_tarpit(veth: str) -> bool: if r.returncode != 0: if "Cannot find" in r.stderr or "No such" in r.stderr: return False - raise RuntimeError(r.stderr.strip()) + log.warning("tarpit tc remove failed veth=%s stderr=%r", veth, r.stderr.strip()) + raise RuntimeError("tarpit command failed") return True @@ -126,7 +129,8 @@ async def api_enable_tarpit( try: await asyncio.to_thread(_apply_tarpit, veth, req.ports, req.delay_ms) except RuntimeError as exc: - raise HTTPException(status_code=409, detail=str(exc)) from exc + log.warning("tarpit enable failed decky=%s: %s", decky_name, exc, exc_info=True) + raise HTTPException(status_code=409, detail="tarpit command failed") from exc ports_json = json.dumps(req.ports) await repo.set_tarpit_rule({ @@ -212,7 +216,8 @@ async def api_disable_tarpit( try: await asyncio.to_thread(_remove_tarpit, veth) except RuntimeError as exc: - raise HTTPException(status_code=409, detail=str(exc)) from exc + log.warning("tarpit disable failed decky=%s: %s", decky_name, exc, exc_info=True) + raise HTTPException(status_code=409, detail="tarpit command failed") from exc await repo.delete_tarpit_rule(decky_name) await repo.add_log({ diff --git a/decnet/web/router/health/api_get_health.py b/decnet/web/router/health/api_get_health.py index ab93ca1b..74de1a35 100644 --- a/decnet/web/router/health/api_get_health.py +++ b/decnet/web/router/health/api_get_health.py @@ -62,8 +62,10 @@ async def _check_database_cached() -> ComponentHealth: try: await repo.get_total_logs() _db_component = ComponentHealth(status="ok") - except Exception as exc: - _db_component = ComponentHealth(status="failing", detail=str(exc)) + except Exception: + import logging as _logging + _logging.getLogger("api.get_health").exception("database liveness check failed") + _db_component = ComponentHealth(status="failing", detail="database unavailable") _db_last_check = time.monotonic() return _db_component @@ -95,7 +97,7 @@ async def get_health(user: dict = Depends(require_viewer)) -> Any: detail = "cancelled" else: exc = task.exception() - detail = f"exited: {exc}" if exc else "exited unexpectedly" + detail = "exited unexpectedly" if not exc else "exited with error" components[name] = ComponentHealth(status="failing", detail=detail) else: components[name] = ComponentHealth(status="ok") @@ -112,10 +114,12 @@ async def get_health(user: dict = Depends(require_viewer)) -> Any: await asyncio.to_thread(_docker_client.ping) # type: ignore[union-attr] _docker_healthy = True _docker_detail = "" - except Exception as exc: + except Exception: + import logging as _logging + _logging.getLogger("api.get_health").exception("docker daemon ping failed") _docker_client = None _docker_healthy = False - _docker_detail = str(exc) + _docker_detail = "docker daemon unavailable" _docker_last_check = now if _docker_healthy: diff --git a/decnet/web/router/realism/api_llm.py b/decnet/web/router/realism/api_llm.py index 2b09159a..deed2462 100644 --- a/decnet/web/router/realism/api_llm.py +++ b/decnet/web/router/realism/api_llm.py @@ -145,23 +145,26 @@ async def put_llm_config( try: from decnet.web.db.secrets import encrypt_secret merged["api_key_ciphertext"] = encrypt_secret(str(api_key_raw)) - except RuntimeError as exc: + except RuntimeError: + log.exception("api.realism.put_llm: secret encryption unavailable") raise HTTPException( status_code=500, - detail=f"Secret encryption unavailable: {exc}", - ) from exc + detail="Secret encryption unavailable; check server configuration.", + ) from None try: cfg = LLMConfig(**merged) except Exception as exc: - raise HTTPException(status_code=400, detail=str(exc)) from exc + log.warning("api.realism.put_llm: LLMConfig validation failed: %s", exc) + raise HTTPException(status_code=400, detail="Invalid LLM configuration payload.") from exc try: llm_config.apply(cfg) - except Exception as exc: + except Exception: + log.exception("api.realism.put_llm: backend init failed") raise HTTPException( - status_code=400, detail=f"Backend init failed: {exc}" - ) from exc + status_code=400, detail="Backend init failed; check provider/model settings." + ) from None await repo.set_realism_config(_CONFIG_KEY, json.dumps(merged)) _hydrated = True diff --git a/decnet/web/router/swarm_updates/api_list_host_releases.py b/decnet/web/router/swarm_updates/api_list_host_releases.py index 4683813d..7c8f340d 100644 --- a/decnet/web/router/swarm_updates/api_list_host_releases.py +++ b/decnet/web/router/swarm_updates/api_list_host_releases.py @@ -39,13 +39,14 @@ async def _probe_host(host: dict[str, Any]) -> HostReleaseInfo: try: async with UpdaterClient(host=host) as u: body = await u.health() - except Exception as exc: # noqa: BLE001 + except Exception: # noqa: BLE001 + log.warning("swarm_updates.list probe unreachable host=%s", host.get("name"), exc_info=True) return HostReleaseInfo( host_uuid=host["uuid"], host_name=host["name"], address=host["address"], reachable=False, - detail=f"{type(exc).__name__}: {exc}", + detail="host unreachable", ) releases = body.get("releases") or [] current, previous = _extract_shas(releases) diff --git a/decnet/web/router/swarm_updates/api_push_update.py b/decnet/web/router/swarm_updates/api_push_update.py index 9c948a72..a9df7362 100644 --- a/decnet/web/router/swarm_updates/api_push_update.py +++ b/decnet/web/router/swarm_updates/api_push_update.py @@ -96,10 +96,14 @@ async def _push_one( # Connection drop on update-self is expected and not an error. self_ok = _is_expected_connection_drop(exc) if not self_ok: + log.warning( + "swarm_updates.push self-update transport failure host=%s: %s", + host.get("name"), exc, + ) return PushUpdateResult( host_uuid=host["uuid"], host_name=host["name"], status="self-failed", http_status=r.status_code, sha=sha, - detail=f"agent updated OK but self-update failed: {exc}", + detail="agent updated OK but self-update transport failure", stderr=stderr, ) status = "self-updated" if self_ok else "self-failed" @@ -110,12 +114,12 @@ async def _push_one( detail=body.get("error") or body.get("probe") if isinstance(body, dict) else None, stderr=stderr, ) - except Exception as exc: # noqa: BLE001 + except Exception: # noqa: BLE001 log.exception("swarm_updates.push failed host=%s", host.get("name")) return PushUpdateResult( host_uuid=host["uuid"], host_name=host["name"], status="failed", - detail=f"{type(exc).__name__}: {exc}", + detail="transport failure", ) diff --git a/decnet/web/router/swarm_updates/api_push_update_self.py b/decnet/web/router/swarm_updates/api_push_update_self.py index 4fbb0a4b..03182dae 100644 --- a/decnet/web/router/swarm_updates/api_push_update_self.py +++ b/decnet/web/router/swarm_updates/api_push_update_self.py @@ -56,12 +56,12 @@ async def _push_self_one(host: dict[str, Any], tarball: bytes, sha: str) -> Push http_status=http_status, sha=sha, detail=detail, stderr=stderr, ) - except Exception as exc: # noqa: BLE001 + except Exception: # noqa: BLE001 log.exception("swarm_updates.push_self failed host=%s", host.get("name")) return PushUpdateResult( host_uuid=host["uuid"], host_name=host["name"], status="self-failed", - detail=f"{type(exc).__name__}: {exc}", + detail="transport failure", ) diff --git a/decnet/web/router/swarm_updates/api_rollback_host.py b/decnet/web/router/swarm_updates/api_rollback_host.py index 176cfe05..6ba93608 100644 --- a/decnet/web/router/swarm_updates/api_rollback_host.py +++ b/decnet/web/router/swarm_updates/api_rollback_host.py @@ -49,12 +49,12 @@ async def api_rollback_host( try: async with UpdaterClient(host=host) as u: r = await u.rollback() - except Exception as exc: # noqa: BLE001 + except Exception: # noqa: BLE001 log.exception("swarm_updates.rollback transport failure host=%s", host["name"]) return RollbackResponse( host_uuid=host["uuid"], host_name=host["name"], status="failed", - detail=f"{type(exc).__name__}: {exc}", + detail="transport failure", ) body = r.json() if r.content else {} diff --git a/decnet/web/router/system/api_deployment_mode.py b/decnet/web/router/system/api_deployment_mode.py index 90277d96..574942d2 100644 --- a/decnet/web/router/system/api_deployment_mode.py +++ b/decnet/web/router/system/api_deployment_mode.py @@ -13,7 +13,7 @@ from fastapi import APIRouter, Depends from pydantic import BaseModel from decnet.web.db.repository import BaseRepository -from decnet.web.dependencies import get_repo +from decnet.web.dependencies import get_repo, require_viewer router = APIRouter() @@ -24,9 +24,15 @@ class DeploymentModeResponse(BaseModel): swarm_host_count: int +# Auth-gated (V4.1.6): the response leaks host role + enrolled-worker count, +# which is recon-useful to an unauthenticated attacker. The dashboard only ever +# calls this from inside the post-login app shell (App.tsx gates the whole app +# behind a valid token), so there is no pre-auth UI-mode use case to preserve — +# gate the entire endpoint behind require_viewer rather than splitting it. @router.get("/deployment-mode", response_model=DeploymentModeResponse) async def get_deployment_mode( repo: BaseRepository = Depends(get_repo), + _user: dict = Depends(require_viewer), ) -> DeploymentModeResponse: role = os.environ.get("DECNET_MODE", "master").lower() hosts = 0 diff --git a/decnet/web/router/topology/api_create_blank_topology.py b/decnet/web/router/topology/api_create_blank_topology.py index 0e2c0ed2..e0fc43ea 100644 --- a/decnet/web/router/topology/api_create_blank_topology.py +++ b/decnet/web/router/topology/api_create_blank_topology.py @@ -14,6 +14,7 @@ import json from fastapi import APIRouter, Depends, HTTPException, status from pydantic import BaseModel, Field as PydanticField +from sqlalchemy.exc import IntegrityError from decnet.telemetry import traced as _traced from decnet.topology.allocator import SubnetAllocator, reserved_subnets @@ -62,8 +63,13 @@ async def api_create_blank_topology( "config_snapshot": json.dumps({"blank": True}), } ) - except Exception as exc: # noqa: BLE001 — surface duplicate-name as 409 - raise HTTPException(status_code=409, detail=str(exc)) from exc + except IntegrityError as exc: + # Unique constraint on topologies.name — report the collision without + # leaking the raw DB message. + raise HTTPException( + status_code=409, + detail=f"A topology named {body.name!r} already exists.", + ) from exc # 2. DMZ LAN with auto-allocated subnet try: diff --git a/decnet/web/router/topology/api_tarpit.py b/decnet/web/router/topology/api_tarpit.py index 8943da5a..c93b1f39 100644 --- a/decnet/web/router/topology/api_tarpit.py +++ b/decnet/web/router/topology/api_tarpit.py @@ -76,7 +76,11 @@ async def api_enable_tarpit( try: await asyncio.to_thread(_apply_tarpit, veth, req.ports, req.delay_ms) except RuntimeError as exc: - raise HTTPException(status_code=409, detail=str(exc)) from exc + log.warning( + "tarpit enable failed topology=%s decky=%s: %s", + topology_id, decky_name, exc, exc_info=True, + ) + raise HTTPException(status_code=409, detail="tarpit command failed") from exc db_key = _db_key(topology_id, decky_name) ports_json = json.dumps(req.ports) @@ -175,7 +179,11 @@ async def api_disable_tarpit( try: await asyncio.to_thread(_remove_tarpit, veth) except RuntimeError as exc: - raise HTTPException(status_code=409, detail=str(exc)) from exc + log.warning( + "tarpit disable failed topology=%s decky=%s: %s", + topology_id, decky_name, exc, exc_info=True, + ) + raise HTTPException(status_code=409, detail="tarpit command failed") from exc db_key = _db_key(topology_id, decky_name) await repo.delete_tarpit_rule(db_key) diff --git a/pyproject.toml b/pyproject.toml index e99c6712..00443086 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta" name = "decnet" version = "0.2.0" description = "Deception network: deploy honeypot deckies that appear as real LAN hosts" -license = "GPL-3.0-or-later" +license = "AGPL-3.0-or-later" license-files = ["LICENSE"] requires-python = ">=3.11" classifiers = [ diff --git a/tests/api/auth/test_password_policy.py b/tests/api/auth/test_password_policy.py new file mode 100644 index 00000000..b00bd6f0 --- /dev/null +++ b/tests/api/auth/test_password_policy.py @@ -0,0 +1,206 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +"""V2.1.4 + V2.1.5 password policy tests. + +Covers: +- min_length=12 enforced on CreateUserRequest and ResetUserPasswordRequest +- bcrypt 72-byte limit: multi-byte passwords >72 bytes are rejected (not silently truncated) +""" +import pytest +import httpx +from decnet.env import DECNET_ADMIN_USER, DECNET_ADMIN_PASSWORD + + +# '€' (U+20AC) encodes to 3 UTF-8 bytes. +# 25 × 3 = 75 bytes > 72 — valid char count, over the byte cap. +_OVER_72_BYTES: str = "€" * 25 + +# Exactly at the limit: 24 × 3 = 72 bytes — must be ACCEPTED. +_EXACTLY_72_BYTES: str = "€" * 24 + + +# ─── V2.1.4: min_length=12 on create ──────────────────────────────────────── + + +@pytest.mark.anyio +async def test_create_user_11char_password_rejected( + client: httpx.AsyncClient, auth_token: str +) -> None: + """11-character password must be rejected on user creation (min is 12).""" + resp = await client.post( + "/api/v1/config/users", + json={"username": "shortpwduser", "password": "short11char", "role": "viewer"}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + # Schema-guard middleware may surface as 400; FastAPI validation as 422. + assert resp.status_code in (400, 422), ( + f"Expected 400/422 for 11-char password on create, got {resp.status_code}" + ) + + +@pytest.mark.anyio +async def test_create_user_12char_password_accepted( + client: httpx.AsyncClient, auth_token: str +) -> None: + """Exactly 12-character ASCII password must be accepted.""" + resp = await client.post( + "/api/v1/config/users", + json={"username": "minpwduser12", "password": "exactly12chr", "role": "viewer"}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code == 200, ( + f"Expected 200 for 12-char password on create, got {resp.status_code}: {resp.text}" + ) + + +# ─── V2.1.4: min_length=12 on reset ───────────────────────────────────────── + + +@pytest.mark.anyio +async def test_reset_password_11char_rejected( + client: httpx.AsyncClient, auth_token: str +) -> None: + """11-character new_password must be rejected on admin password reset.""" + # Create a user to reset + create_resp = await client.post( + "/api/v1/config/users", + json={"username": "resetpolicy1", "password": "securepass123", "role": "viewer"}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert create_resp.status_code == 200 + user_uuid = create_resp.json()["uuid"] + + resp = await client.put( + f"/api/v1/config/users/{user_uuid}/reset-password", + json={"new_password": "short11char"}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code in (400, 422), ( + f"Expected 400/422 for 11-char password on reset, got {resp.status_code}" + ) + + +@pytest.mark.anyio +async def test_reset_password_12char_accepted( + client: httpx.AsyncClient, auth_token: str +) -> None: + """Exactly 12-character password must be accepted on admin password reset.""" + create_resp = await client.post( + "/api/v1/config/users", + json={"username": "resetpolicy2", "password": "securepass123", "role": "viewer"}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert create_resp.status_code == 200 + user_uuid = create_resp.json()["uuid"] + + resp = await client.put( + f"/api/v1/config/users/{user_uuid}/reset-password", + json={"new_password": "exactly12chr"}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code == 200, ( + f"Expected 200 for 12-char password on reset, got {resp.status_code}: {resp.text}" + ) + + +# ─── V2.1.5: bcrypt 72-byte rejection on create ───────────────────────────── + + +@pytest.mark.anyio +async def test_create_user_over_72_bytes_rejected( + client: httpx.AsyncClient, auth_token: str +) -> None: + """Password >72 UTF-8 bytes must be rejected on create (bcrypt truncation guard).""" + resp = await client.post( + "/api/v1/config/users", + json={"username": "bytepolicyusr", "password": _OVER_72_BYTES, "role": "viewer"}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code in (400, 422), ( + f"Expected 400/422 for >{72}-byte password on create, got {resp.status_code}" + ) + + +@pytest.mark.anyio +async def test_create_user_exactly_72_bytes_accepted( + client: httpx.AsyncClient, auth_token: str +) -> None: + """Password of exactly 72 UTF-8 bytes must be accepted (at the limit, not over).""" + resp = await client.post( + "/api/v1/config/users", + json={"username": "byteedgeuser1", "password": _EXACTLY_72_BYTES, "role": "viewer"}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code == 200, ( + f"Expected 200 for exactly-72-byte password on create, got {resp.status_code}: {resp.text}" + ) + + +# ─── V2.1.5: bcrypt 72-byte rejection on reset ────────────────────────────── + + +@pytest.mark.anyio +async def test_reset_password_over_72_bytes_rejected( + client: httpx.AsyncClient, auth_token: str +) -> None: + """new_password >72 UTF-8 bytes must be rejected on admin password reset.""" + create_resp = await client.post( + "/api/v1/config/users", + json={"username": "byteresetuser", "password": "securepass123", "role": "viewer"}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert create_resp.status_code == 200 + user_uuid = create_resp.json()["uuid"] + + resp = await client.put( + f"/api/v1/config/users/{user_uuid}/reset-password", + json={"new_password": _OVER_72_BYTES}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code in (400, 422), ( + f"Expected 400/422 for >{72}-byte new_password on reset, got {resp.status_code}" + ) + + +# ─── V2.1.5: bcrypt 72-byte rejection on change-password ──────────────────── + + +@pytest.mark.anyio +async def test_change_password_new_over_72_bytes_rejected( + client: httpx.AsyncClient, +) -> None: + """new_password >72 UTF-8 bytes must be rejected on change-password.""" + login_resp = await client.post( + "/api/v1/auth/login", + json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD}, + ) + token = login_resp.json()["access_token"] + + resp = await client.post( + "/api/v1/auth/change-password", + json={"old_password": DECNET_ADMIN_PASSWORD, "new_password": _OVER_72_BYTES}, + headers={"Authorization": f"Bearer {token}"}, + ) + assert resp.status_code in (400, 422), ( + f"Expected 400/422 for >{72}-byte new_password on change-password, got {resp.status_code}" + ) + + +@pytest.mark.anyio +async def test_change_password_old_over_72_bytes_rejected( + client: httpx.AsyncClient, +) -> None: + """old_password >72 UTF-8 bytes must be rejected (no point checking against hash).""" + login_resp = await client.post( + "/api/v1/auth/login", + json={"username": DECNET_ADMIN_USER, "password": DECNET_ADMIN_PASSWORD}, + ) + token = login_resp.json()["access_token"] + + resp = await client.post( + "/api/v1/auth/change-password", + json={"old_password": _OVER_72_BYTES, "new_password": "new_secure_password"}, + headers={"Authorization": f"Bearer {token}"}, + ) + assert resp.status_code in (400, 422), ( + f"Expected 400/422 for >{72}-byte old_password on change-password, got {resp.status_code}" + ) diff --git a/tests/api/auth/test_sse_ticket.py b/tests/api/auth/test_sse_ticket.py index c96b6eb5..d9a01a3b 100644 --- a/tests/api/auth/test_sse_ticket.py +++ b/tests/api/auth/test_sse_ticket.py @@ -80,6 +80,43 @@ async def test_sse_ticket_endpoint_mints_and_redeems( assert "uuid" in identity and identity["role"] in ("admin", "viewer") +@pytest.mark.anyio +async def test_sse_header_jwt_rejects_must_change_password(monkeypatch) -> None: + """V4.1.5: the header-JWT SSE branch must enforce must_change_password the + same way require_role does. A user blocked from every REST endpoint must not + be able to subscribe to live SSE streams with their existing token.""" + async def _fake_resolve(token: str): + return "user-1", {"uuid": "user-1", "role": "viewer", "must_change_password": True} + + monkeypatch.setattr(deps, "_resolve_token", _fake_resolve) + + class _Req: + headers = {"Authorization": "Bearer some.jwt.token"} + + gate = deps.require_stream_role("viewer", "admin") + with pytest.raises(HTTPException) as exc: + await gate(_Req(), ticket=None) # type: ignore[arg-type] + assert exc.value.status_code == 403 + assert "Password change required" in exc.value.detail + + +@pytest.mark.anyio +async def test_sse_header_jwt_allows_cleared_user(monkeypatch) -> None: + """Control: a user who has cleared must_change_password passes the header-JWT + SSE gate (proves the new guard didn't break the happy path).""" + async def _fake_resolve(token: str): + return "user-1", {"uuid": "user-1", "role": "viewer", "must_change_password": False} + + monkeypatch.setattr(deps, "_resolve_token", _fake_resolve) + + class _Req: + headers = {"Authorization": "Bearer some.jwt.token"} + + gate = deps.require_stream_role("viewer", "admin") + user = await gate(_Req(), ticket=None) # type: ignore[arg-type] + assert user["uuid"] == "user-1" + + def test_raw_jwt_in_sse_query_rejected() -> None: """V3.1.1: a raw JWT is not a valid opaque ticket — _redeem_sse_ticket rejects any token that wasn't minted by mint_sse_ticket (unknown key → 401).""" diff --git a/tests/api/deckies/test_tarpit_leak.py b/tests/api/deckies/test_tarpit_leak.py new file mode 100644 index 00000000..485d455f --- /dev/null +++ b/tests/api/deckies/test_tarpit_leak.py @@ -0,0 +1,275 @@ +# SPDX-License-Identifier: AGPL-3.0-or-later +"""V7.1.2 regression: tarpit endpoints must NOT leak raw tc stderr to API callers. + +Covers both fleet (deckies) and topology tarpit paths. Forces a tc failure +by monkeypatching _apply_tarpit / _remove_tarpit to raise RuntimeError with +a realistic iproute2/kernel error string (veth name, qdisc id, errno text), +then asserts: + + 1. Status code is 409. + 2. The response body detail is a generic string. + 3. The response body contains NO raw tc output — no veth names, no + "RTNETLINK", no kernel errno strings, no qdisc identifiers. +""" +from __future__ import annotations + +import pytest +import httpx + +from decnet.web.router.deckies import api_tarpit as _deckies_tarpit +from decnet.web.router.topology import api_tarpit as _topology_tarpit + +_FLEET_URL = "/api/v1/deckies/web1/tarpit" +_TOPO_URL = "/api/v1/topologies/topo1/deckies/web1/tarpit" + +# Realistic iproute2 stderr fragments — must NOT appear in any API response. +_TC_STDERR_FRAGMENTS = [ + "RTNETLINK", + "veth", + "qdisc", + "Cannot find device", + "No such file or directory", + "Error: Exclusivity flag on, cannot modify.", + "NLMSG_ERROR", + "errno", +] + +# Realistic docker exec / runtime stderr fragments that must NOT leak via the +# get_container_veth LookupError path (V7.1.2 — 404 path). +_DOCKER_STDERR_FRAGMENTS = [ + "Error response from daemon", + "No such container", + "OCI runtime", + "exec failed", + "permission denied", +] + +_TARPIT_BODY = {"ports": [22, 80], "delay_ms": 500} + + +def _hdr(token: str) -> dict[str, str]: + return {"Authorization": f"Bearer {token}"} + + +def _assert_no_tc_leak(body: dict) -> None: + """Assert that the response detail contains no raw tc/kernel output.""" + detail = str(body.get("detail", "")) + for fragment in _TC_STDERR_FRAGMENTS: + assert fragment not in detail, ( + f"V7.1.2 leak: raw tc stderr fragment {fragment!r} found in API response: {detail!r}" + ) + + +# ── fleet / deckies ──────────────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_fleet_enable_tarpit_tc_failure_returns_generic_detail( + client: httpx.AsyncClient, + auth_token: str, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """POST fleet tarpit with forced tc failure yields 409 + generic detail.""" + + def _fake_apply(veth: str, ports: list[int], delay_ms: int) -> None: + raise RuntimeError( + "RTNETLINK answers: File exists\n" + f"Error: Exclusivity flag on, cannot modify. veth={veth} qdisc=1:" + ) + + monkeypatch.setattr(_deckies_tarpit, "_apply_tarpit", _fake_apply) + + # Also patch get_container_veth so the 404 path doesn't fire first. + monkeypatch.setattr( + _deckies_tarpit, + "get_container_veth", + lambda name: f"veth-{name}-abc123", + ) + + res = await client.post(_FLEET_URL, json=_TARPIT_BODY, headers=_hdr(auth_token)) + assert res.status_code == 409, res.text + _assert_no_tc_leak(res.json()) + + +@pytest.mark.asyncio +async def test_fleet_disable_tarpit_tc_failure_returns_generic_detail( + client: httpx.AsyncClient, + auth_token: str, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """DELETE fleet tarpit with forced tc failure yields 409 + generic detail.""" + + def _fake_remove(veth: str) -> bool: + raise RuntimeError( + f"RTNETLINK answers: No such file or directory\nveth={veth}" + ) + + monkeypatch.setattr(_deckies_tarpit, "_remove_tarpit", _fake_remove) + monkeypatch.setattr( + _deckies_tarpit, + "get_container_veth", + lambda name: f"veth-{name}-abc123", + ) + + res = await client.delete(_FLEET_URL, headers=_hdr(auth_token)) + assert res.status_code == 409, res.text + _assert_no_tc_leak(res.json()) + + +# ── topology ─────────────────────────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_topology_enable_tarpit_tc_failure_returns_generic_detail( + client: httpx.AsyncClient, + auth_token: str, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """POST topology tarpit with forced tc failure yields 409 + generic detail.""" + + def _fake_apply(veth: str, ports: list[int], delay_ms: int) -> None: + raise RuntimeError( + f"RTNETLINK answers: File exists\nveth={veth} qdisc=1: NLMSG_ERROR errno=17" + ) + + monkeypatch.setattr(_topology_tarpit, "_apply_tarpit", _fake_apply) + + async def _fake_resolve(repo, decky_name, *, topology_id): + return f"decnet_t_{decky_name}" + + monkeypatch.setattr( + _topology_tarpit, + "resolve_decky_container", + _fake_resolve, + ) + monkeypatch.setattr( + _topology_tarpit, + "get_container_veth", + lambda name: f"veth-{name}-abc123", + ) + + res = await client.post(_TOPO_URL, json=_TARPIT_BODY, headers=_hdr(auth_token)) + assert res.status_code == 409, res.text + _assert_no_tc_leak(res.json()) + + +@pytest.mark.asyncio +async def test_topology_disable_tarpit_tc_failure_returns_generic_detail( + client: httpx.AsyncClient, + auth_token: str, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """DELETE topology tarpit with forced tc failure yields 409 + generic detail.""" + + def _fake_remove(veth: str) -> bool: + raise RuntimeError( + f"RTNETLINK answers: No such file or directory\nveth={veth} errno=2" + ) + + monkeypatch.setattr(_topology_tarpit, "_remove_tarpit", _fake_remove) + + async def _fake_resolve(repo, decky_name, *, topology_id): + return f"decnet_t_{decky_name}" + + monkeypatch.setattr( + _topology_tarpit, + "resolve_decky_container", + _fake_resolve, + ) + monkeypatch.setattr( + _topology_tarpit, + "get_container_veth", + lambda name: f"veth-{name}-abc123", + ) + + res = await client.delete(_TOPO_URL, headers=_hdr(auth_token)) + assert res.status_code == 409, res.text + _assert_no_tc_leak(res.json()) + + +# ── veth LookupError 404 path (V7.1.2 — docker stderr must not leak) ──────── + + +def _assert_no_docker_stderr_leak(body: dict) -> None: + """Assert that the response detail contains no raw docker/runtime output.""" + detail = str(body.get("detail", "")) + for fragment in _DOCKER_STDERR_FRAGMENTS: + assert fragment not in detail, ( + f"V7.1.2 leak: raw docker stderr fragment {fragment!r} found in 404 detail: {detail!r}" + ) + # The detail must NOT contain a colon-separated suffix (i.e. no ': ') + # — generic message ends with 'not reachable', nothing after. + assert "Error response from daemon" not in detail + assert "OCI runtime" not in detail + + +@pytest.mark.asyncio +async def test_fleet_enable_tarpit_veth_failure_does_not_leak_stderr( + client: httpx.AsyncClient, + auth_token: str, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """POST fleet tarpit: veth LookupError 404 must not expose docker stderr.""" + + def _fake_veth(name: str) -> str: + raise LookupError(f"container {name!r} not reachable") + + monkeypatch.setattr(_deckies_tarpit, "get_container_veth", _fake_veth) + + res = await client.post(_FLEET_URL, json=_TARPIT_BODY, headers=_hdr(auth_token)) + assert res.status_code == 404, res.text + body = res.json() + detail = str(body.get("detail", "")) + # Generic message present, no docker runtime fragments + assert "not reachable" in detail + _assert_no_docker_stderr_leak(body) + # Specifically assert the colon+stderr suffix is absent + assert ":" not in detail.split("not reachable", 1)[-1] + + +@pytest.mark.asyncio +async def test_fleet_disable_tarpit_veth_failure_does_not_leak_stderr( + client: httpx.AsyncClient, + auth_token: str, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """DELETE fleet tarpit: veth LookupError 404 must not expose docker stderr.""" + + def _fake_veth(name: str) -> str: + raise LookupError(f"container {name!r} not reachable") + + monkeypatch.setattr(_deckies_tarpit, "get_container_veth", _fake_veth) + + res = await client.delete(_FLEET_URL, headers=_hdr(auth_token)) + assert res.status_code == 404, res.text + body = res.json() + detail = str(body.get("detail", "")) + assert "not reachable" in detail + _assert_no_docker_stderr_leak(body) + assert ":" not in detail.split("not reachable", 1)[-1] + + +@pytest.mark.asyncio +async def test_topology_enable_tarpit_veth_failure_does_not_leak_stderr( + client: httpx.AsyncClient, + auth_token: str, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """POST topology tarpit: veth LookupError 404 must not expose docker stderr.""" + + def _fake_veth(name: str) -> str: + raise LookupError(f"container {name!r} not reachable") + + async def _fake_resolve(repo, decky_name, *, topology_id): + return f"decnet_t_{decky_name}" + + monkeypatch.setattr(_topology_tarpit, "resolve_decky_container", _fake_resolve) + monkeypatch.setattr(_topology_tarpit, "get_container_veth", _fake_veth) + + res = await client.post(_TOPO_URL, json=_TARPIT_BODY, headers=_hdr(auth_token)) + assert res.status_code == 404, res.text + body = res.json() + detail = str(body.get("detail", "")) + assert "not reachable" in detail + _assert_no_docker_stderr_leak(body) + assert ":" not in detail.split("not reachable", 1)[-1] diff --git a/tests/api/fleet/test_deploy_automode.py b/tests/api/fleet/test_deploy_automode.py index 40c471ff..ea27ad8d 100644 --- a/tests/api/fleet/test_deploy_automode.py +++ b/tests/api/fleet/test_deploy_automode.py @@ -190,3 +190,15 @@ async def test_deployment_mode_endpoint(client, auth_token, monkeypatch): assert body["role"] == "master" assert body["mode"] == "unihost" assert body["swarm_host_count"] == 0 + + +@pytest.mark.anyio +async def test_deployment_mode_endpoint_requires_auth(client): + # V4.1.6: the response leaks host role + enrolled-worker count, so the + # endpoint must not be reachable without a valid viewer/admin JWT. The + # dashboard only ever calls it from inside the post-login app shell. + resp = await client.get("/api/v1/system/deployment-mode") + assert resp.status_code == 401 + # Body must not leak the recon fields on the unauthenticated path. + assert "role" not in resp.text + assert "swarm_host_count" not in resp.text diff --git a/tests/api/health/test_get_health.py b/tests/api/health/test_get_health.py index be3b9e84..5cdb93ea 100644 --- a/tests/api/health/test_get_health.py +++ b/tests/api/health/test_get_health.py @@ -5,14 +5,16 @@ from unittest.mock import AsyncMock, MagicMock, patch import httpx import pytest -from decnet.web.router.health.api_get_health import _reset_docker_cache +from decnet.web.router.health.api_get_health import _reset_docker_cache, _reset_db_cache @pytest.fixture(autouse=True) -def _clear_docker_cache(): +def _clear_health_caches(): _reset_docker_cache() + _reset_db_cache() yield _reset_docker_cache() + _reset_db_cache() @pytest.mark.anyio @@ -144,7 +146,9 @@ async def test_health_docker_failing(client: httpx.AsyncClient, auth_token: str) comp = resp.json()["components"]["docker"] assert comp["status"] == "failing" - assert "connection refused" in comp["detail"] + # Internal exception message must NOT be in the detail (V7.1.2 fix). + assert "connection refused" not in comp["detail"] + assert comp["detail"] == "docker daemon unavailable" @pytest.mark.anyio @@ -161,7 +165,9 @@ async def test_health_database_failing(client: httpx.AsyncClient, auth_token: st comp = resp.json()["components"]["database"] assert comp["status"] == "failing" - assert "disk full" in comp["detail"] + # Internal exception message must NOT be in the detail (V7.1.2 fix). + assert "disk full" not in comp["detail"] + assert comp["detail"] == "database unavailable" @pytest.mark.anyio @@ -181,7 +187,50 @@ async def test_health_worker_exited_with_exception(client: httpx.AsyncClient, au comp = resp.json()["components"]["collector_worker"] assert comp["status"] == "failing" - assert "segfault" in comp["detail"] + # Internal exception message must NOT be in the detail (V7.1.2 fix). + assert "segfault" not in comp["detail"] + assert comp["detail"] == "exited with error" + + +# ── V7.1.2: no internal exception detail in response body ──────────────────── + +@pytest.mark.anyio +async def test_health_db_failure_does_not_leak_exception_class( + client: httpx.AsyncClient, auth_token: str, +) -> None: + """V7.1.2: DB exception class/message must not appear in the HTTP response.""" + from decnet.web.dependencies import repo as real_repo + + with patch("decnet.web.api.get_background_tasks") as mock_tasks, \ + patch("docker.from_env") as mock_docker, \ + patch.object( + real_repo, "get_total_logs", + new=AsyncMock(side_effect=OSError("[Errno 28] No space left on device")), + ): + _make_all_running(mock_tasks) + mock_docker.return_value = MagicMock() + resp = await client.get("/api/v1/health", headers={"Authorization": f"Bearer {auth_token}"}) + + detail = resp.json()["components"]["database"].get("detail", "") + assert "Errno" not in detail + assert "No space left" not in detail + assert "OSError" not in detail + + +@pytest.mark.anyio +async def test_health_docker_failure_does_not_leak_exception_class( + client: httpx.AsyncClient, auth_token: str, +) -> None: + """V7.1.2: Docker socket exception must not appear in the HTTP response.""" + with patch("decnet.web.api.get_background_tasks") as mock_tasks, \ + patch("docker.from_env", side_effect=OSError("[Errno 111] Connection refused")): + _make_all_running(mock_tasks) + resp = await client.get("/api/v1/health", headers={"Authorization": f"Bearer {auth_token}"}) + + detail = resp.json()["components"]["docker"].get("detail", "") + assert "Errno" not in detail + assert "Connection refused" not in detail + assert "OSError" not in detail # ── Helpers ────────────────────────────────────────────────────────────────── diff --git a/tests/api/swarm_updates/test_list_host_releases.py b/tests/api/swarm_updates/test_list_host_releases.py index aade6f15..483b2009 100644 --- a/tests/api/swarm_updates/test_list_host_releases.py +++ b/tests/api/swarm_updates/test_list_host_releases.py @@ -34,7 +34,10 @@ async def test_admin_lists_reachable_and_unreachable_hosts( assert hosts["alpha"]["current_sha"] == "aaaa111" assert hosts["alpha"]["previous_sha"] == "0000000" assert hosts["beta"]["reachable"] is False - assert "TLS handshake" in hosts["beta"]["detail"] + # V7.1.2: internal exception text must not leak to the response body. + assert "TLS handshake" not in hosts["beta"]["detail"] + assert "RuntimeError" not in hosts["beta"]["detail"] + assert hosts["beta"]["detail"] == "host unreachable" @pytest.mark.anyio diff --git a/tests/api/swarm_updates/test_push_update.py b/tests/api/swarm_updates/test_push_update.py index ccfbdab4..7102a16b 100644 --- a/tests/api/swarm_updates/test_push_update.py +++ b/tests/api/swarm_updates/test_push_update.py @@ -2,8 +2,28 @@ """POST /api/v1/swarm-updates/push — happy paths, rollback, validation.""" from __future__ import annotations +import re + import pytest +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +_INTERNAL_LEAK_RE = re.compile( + r"Errno|ConnectionRefused|TimeoutError|OSError|RuntimeError|Exception|" + r"Traceback|\w+Error:\s|\w+Exception:\s|File \"|line \d+", + re.IGNORECASE, +) + +def _assert_no_internal_detail(detail: str | None) -> None: + """Assert the detail string does not contain any internal exception noise.""" + if detail is None: + return + assert not _INTERNAL_LEAK_RE.search(detail), ( + f"V7.1.2: internal exception detail leaked to client: {detail!r}" + ) + @pytest.mark.anyio async def test_push_to_single_host_success(client, auth_token, add_host, fake_updater): @@ -62,6 +82,49 @@ async def test_push_all_aggregates_mixed_results(client, auth_token, add_host, f assert statuses == {"alpha": "updated", "beta": "failed"} +@pytest.mark.anyio +async def test_transport_exception_detail_does_not_leak_internals( + client, auth_token, add_host, fake_updater, +): + """V7.1.2: a raw transport exception must never appear in the response body.""" + await add_host("alpha", "10.0.0.1") + fake_updater["client"].update_responses = { + "alpha": OSError("[Errno 111] Connection refused"), + } + + resp = await client.post( + "/api/v1/swarm-updates/push", + headers={"Authorization": f"Bearer {auth_token}"}, + json={"all": True}, + ) + assert resp.status_code == 200 + result = resp.json()["results"][0] + assert result["status"] == "failed" + _assert_no_internal_detail(result.get("detail")) + + +@pytest.mark.anyio +async def test_include_self_failure_detail_does_not_leak_internals( + client, auth_token, add_host, fake_updater, +): + """V7.1.2: include_self transport failure must not expose exception class/msg.""" + await add_host("alpha", "10.0.0.1") + fake_updater["client"].update_self_responses = { + "alpha": OSError("[Errno 104] Connection reset by peer"), + } + + resp = await client.post( + "/api/v1/swarm-updates/push", + headers={"Authorization": f"Bearer {auth_token}"}, + json={"all": True, "include_self": True}, + ) + assert resp.status_code == 200 + result = resp.json()["results"][0] + # status is self-failed (non-expected drop) + assert result["status"] == "self-failed" + _assert_no_internal_detail(result.get("detail")) + + @pytest.mark.anyio async def test_tarball_built_once_across_multi_host_push( client, auth_token, add_host, fake_updater, monkeypatch, diff --git a/tests/api/swarm_updates/test_rollback_host.py b/tests/api/swarm_updates/test_rollback_host.py index 63639e1a..e416ba57 100644 --- a/tests/api/swarm_updates/test_rollback_host.py +++ b/tests/api/swarm_updates/test_rollback_host.py @@ -50,7 +50,10 @@ async def test_rollback_transport_failure_reported(client, auth_token, add_host, assert resp.status_code == 200 body = resp.json() assert body["status"] == "failed" - assert "TLS handshake" in body["detail"] + # V7.1.2: internal exception text must not leak to the response body. + assert "TLS handshake" not in body["detail"] + assert "RuntimeError" not in body["detail"] + assert body["detail"] == "transport failure" @pytest.mark.anyio diff --git a/tests/clustering/test_clusterer_worker.py b/tests/clustering/test_clusterer_worker.py index f45639ad..6c843090 100644 --- a/tests/clustering/test_clusterer_worker.py +++ b/tests/clustering/test_clusterer_worker.py @@ -181,3 +181,117 @@ async def test_clusterer_registered_in_cli(): """`decnet clusterer` is registered as a master-only command.""" from decnet.cli.gating import MASTER_ONLY_COMMANDS assert "clusterer" in MASTER_ONLY_COMMANDS + + +@pytest.mark.anyio +async def test_wake_during_tick_is_not_lost(repo): + """BUG-14 regression: wake.clear() must run BEFORE wake.wait(), not after. + + The worker loop pattern: + + Fixed (after fix): tick → clear → wait ← current code + Buggy (before fix): tick → wait → clear + + The race in the buggy pattern: a wake.set() could arrive from a _wake_on + background task between wait() returning and clear() executing. In asyncio + the task switch requires an ``await``; the original code had + ``await _publish_result`` between wait() and clear(), providing a real + window. The fix closes this by moving clear() to run immediately after + tick (before wait), so there is no window between wait() returning and the + next clear(). + + **Structural test (red-before / green-after, deterministic):** + We intercept the internal ``wake`` event's ``clear()`` and ``wait()`` + methods to record their invocation order, then assert that every + ``clear`` call is immediately followed by a ``wait`` (never preceded by + one). Reverting the worker to the buggy ``wait → clear`` order produces + ``("wait", "clear")`` consecutive pairs, which the assertion catches + deterministically without relying on wall-clock timing races. + """ + import unittest.mock as _mock + + captured_wake: list[asyncio.Event] = [] + call_log: list[str] = [] + + _orig_event_cls = asyncio.Event + + class _LoggingEvent(_orig_event_cls): # type: ignore[misc] + """Subclass captures the first event created (the wake event) and + logs clear()/wait() calls so we can verify their relative order.""" + + def __init__(self) -> None: + super().__init__() + if not captured_wake: + captured_wake.append(self) + + def clear(self) -> None: + if captured_wake and self is captured_wake[0]: + call_log.append("clear") + super().clear() + + async def wait(self) -> bool: # type: ignore[override] + if captured_wake and self is captured_wake[0]: + call_log.append("wait") + return await super().wait() + + class _SimpleTicker(Clusterer): + name = "bug14_ticker" + + async def tick(self, _repo) -> ClusterResult: + return ClusterResult() + + shutdown = asyncio.Event() + + with _mock.patch("decnet.clustering.worker.asyncio.Event", _LoggingEvent): + task = asyncio.create_task( + run_clusterer_loop( + repo, + poll_interval_secs=0.1, + clusterer=_SimpleTicker(), + shutdown=shutdown, + ) + ) + # Run for long enough to accumulate several clear/wait pairs. + await asyncio.sleep(0.5) + shutdown.set() + await asyncio.wait_for(task, timeout=2.0) + + # Must have enough events to be meaningful. + assert len(call_log) >= 4, ( + f"BUG-14: too few clear/wait calls recorded ({call_log!r}); " + "loop may not have run or event capture failed" + ) + + # Fixed invariant: every loop iteration runs clear() BEFORE wait(). + # The resulting call_log for the fixed code is: clear, wait, clear, wait, ... + # For the buggy code (wait → clear) the log would be: wait, clear, wait, clear, ... + # + # We verify TWO conditions that together guarantee the fixed order: + # + # 1. The log starts with "clear" — the very first thing after tick is a clear. + # Buggy code starts with "wait" (wait ran first in the original loop). + # + # 2. Within each (clear, wait) pair at positions (2k, 2k+1), clear always + # precedes wait. We check that no "clear" appears at an ODD index (which + # would mean a clear followed another clear, or a wait appeared before + # the next clear). + assert call_log[0] == "clear", ( + f"BUG-14 regression: first wake call was {call_log[0]!r}, expected 'clear'. " + f"Full log: {call_log!r}. The worker is using the buggy 'wait-then-clear' " + "order — wake.clear() must execute BEFORE wake.wait() each iteration." + ) + # Verify the alternating pattern holds: indices 0,2,4,... should be "clear" + # and indices 1,3,5,... should be "wait". + for idx, call in enumerate(call_log): + if idx % 2 == 0: + assert call == "clear", ( + f"BUG-14 regression: expected 'clear' at position {idx} but got " + f"{call!r} in log {call_log!r}. This indicates the loop is NOT " + "using the fixed 'clear → wait' order within each iteration." + ) + else: + assert call == "wait", ( + f"BUG-14 regression: expected 'wait' at position {idx} but got " + f"{call!r} in log {call_log!r}. This indicates the loop is NOT " + "using the fixed 'clear → wait' order within each iteration." + ) diff --git a/tests/conftest.py b/tests/conftest.py index 460ce485..bc852e8f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -16,6 +16,11 @@ os.environ.setdefault("DECNET_LOG_FILE", os.path.join(_TEST_LOG_DIR, "decnet.log os.environ.setdefault("DECNET_INGEST_LOG_FILE", os.path.join(_TEST_LOG_DIR, "decnet.log")) os.environ.setdefault("DECNET_AGENT_LOG_FILE", os.path.join(_TEST_LOG_DIR, "agent.log")) +# Explicit test-harness flag: tells env._require_env to skip secret-strength +# validation so the suite can run with weak/short secrets. This is the ONLY +# bypass — it replaced the old "any PYTEST* var present" fail-open check (V2.1.7). +os.environ["DECNET_TESTING"] = "1" + os.environ["DECNET_JWT_SECRET"] = "stable-test-secret-key-at-least-32-chars-long" os.environ["DECNET_ADMIN_PASSWORD"] = "test-password-123" os.environ["DECNET_DEVELOPER"] = "true" diff --git a/tests/core/test_env_secrets.py b/tests/core/test_env_secrets.py index 6de921f9..b12fd799 100644 --- a/tests/core/test_env_secrets.py +++ b/tests/core/test_env_secrets.py @@ -51,13 +51,34 @@ def test_admin_password_strong_value_passes(monkeypatch: pytest.MonkeyPatch) -> assert val == "a-strong-unique-password" -def test_pytest_short_circuit_returns_value_unchecked() -> None: - # Under live pytest (PYTEST_* present), _require_env returns the configured - # value without the production checks — documents why the dev loop is safe. - # conftest sets a strong value, so this also proves lazy resolution works. +def test_testing_flag_short_circuit_returns_value_unchecked() -> None: + # Under the test harness (DECNET_TESTING=1, set in conftest), _require_env + # returns the configured value without the production checks — documents + # why the dev loop is safe. conftest sets a strong value, so this also + # proves lazy resolution works. assert envmod._require_env("DECNET_ADMIN_PASSWORD") == "test-password-123" +def test_pytest_var_leak_does_not_bypass_validation(monkeypatch: pytest.MonkeyPatch) -> None: + # V2.1.7 regression: a leaked PYTEST_* env var must NOT disable strength + # validation. With DECNET_TESTING unset, a known-bad / short secret is + # still rejected even though a PYTEST_* var is present. + monkeypatch.setattr( + envmod.os, + "environ", + {"PYTEST_CURRENT_TEST": "x", "DECNET_ADMIN_PASSWORD": "admin"}, + ) + with pytest.raises(ValueError): + envmod._require_env("DECNET_ADMIN_PASSWORD") + monkeypatch.setattr( + envmod.os, + "environ", + {"PYTEST_CURRENT_TEST": "x", "DECNET_ADMIN_PASSWORD": "short1"}, + ) + with pytest.raises(ValueError): + envmod._require_env("DECNET_ADMIN_PASSWORD") + + def test_lazy_getattr_resolves_admin_password() -> None: # Accessing the attribute (not a module global anymore) routes through # __getattr__ -> _require_env. diff --git a/tests/db/test_credentials.py b/tests/db/test_credentials.py index a38954a7..04138cbd 100644 --- a/tests/db/test_credentials.py +++ b/tests/db/test_credentials.py @@ -8,6 +8,7 @@ from pathlib import Path import pytest from decnet.web.db.factory import get_repository +from decnet.web.db.models import Credential @pytest.fixture @@ -167,3 +168,103 @@ async def test_filters(repo) -> None: assert len(rows) == 1 and rows[0]["service"] == "ssh" assert await repo.get_total_credentials(service="ssh") == 1 assert await repo.get_total_credentials() == 2 + + +@pytest.mark.anyio +async def test_concurrent_upsert_hits_integrity_retry_branch( + repo, monkeypatch +) -> None: + """BUG-12 regression: deterministically exercise the IntegrityError + retry branch in ``upsert_credential``. + + The prior asyncio.gather test proved nothing — aiosqlite serializes + both calls through one worker thread, so the second's dedup SELECT + runs only AFTER the first commits and takes the 'existing is not None' + fast path. The except-IntegrityError handler NEVER executed; the test + passed with or without the fix. + + Here we force the race deterministically: the first upsert creates the + row normally. For the second upsert we monkeypatch the module-level + ``select`` so its FIRST call (the dedup SELECT) yields a statement that + matches NOTHING — simulating two callers who both saw 'not found'. The + second upsert then attempts an INSERT that hits the UNIQUE constraint + → IntegrityError → rollback → re-SELECT (a fresh, un-poisoned ``select`` + call) finds the winner row → returns its id + increments attempt_count. + + Red-before/green-after: if the ``except IntegrityError`` handler is + removed, the IntegrityError propagates out of the second upsert and + this test fails (raises instead of returning a matching id). + """ + from decnet.web.db.sqlmodel_repo.credentials import _core + + payload = { + "attacker_ip": "10.0.0.99", + "decky_name": "decky-concurrent", + "service": "ssh", + "principal": "root", + "secret_sha256": _sha256("racepassword"), + "secret_b64": "cmFjZXBhc3N3b3Jk", + "secret_printable": "racepassword", + "fields": {}, + } + + # First upsert: lands the row normally. + id_a = await repo.upsert_credential(payload) + + # Poison ONLY the first select() call of the next upsert so the dedup + # SELECT matches nothing (the simulated race). All later select() calls + # — including the post-IntegrityError re-SELECT — behave normally. + real_select = _core.select + calls = {"n": 0} + + def _poisoned_select(*args, **kwargs): + stmt = real_select(*args, **kwargs) + calls["n"] += 1 + if calls["n"] == 1: + # Append an always-false predicate so the dedup SELECT returns + # None even though the row exists → forces the INSERT path. + stmt = stmt.where(Credential.id == -1) + return stmt + + monkeypatch.setattr(_core, "select", _poisoned_select) + + # Second upsert: dedup SELECT misses → INSERT → IntegrityError → retry. + id_b = await repo.upsert_credential(payload) + + monkeypatch.undo() + + assert id_a == id_b, "retry branch must return the existing winner's id" + rows = await repo.get_credentials() + assert len(rows) == 1, f"expected 1 row, got {len(rows)} (duplicate inserts)" + assert rows[0]["attempt_count"] == 2, ( + "retry branch must increment attempt_count on the winner row" + ) + + +@pytest.mark.anyio +async def test_none_and_empty_principal_canonicalize_to_one_row(repo) -> None: + """BUG-12 canonicalization: principal=None and principal='' canonicalize + to the SAME principal_key ('') and, with an otherwise-identical dedup + tuple, must dedup to ONE row — not crash on the UNIQUE constraint. + + Before the fix the dedup SELECT distinguished None from '' (it branched + on ``is_(None)`` vs ``== principal``) while the constraint keyed on + principal_key='' for both → the second upsert's SELECT missed, the + INSERT collided, and the re-SELECT used the wrong (mismatched) filter + → re-raise / crash. Now SELECT and constraint agree on principal_key. + """ + base = { + "attacker_ip": "10.0.0.7", + "decky_name": "decky-canon", + "service": "ssh", + "secret_sha256": _sha256("hunter2"), + "secret_b64": "aHVudGVyMg==", + "secret_printable": "hunter2", + "fields": {}, + } + id_none = await repo.upsert_credential({**base, "principal": None}) + id_empty = await repo.upsert_credential({**base, "principal": ""}) + assert id_none == id_empty, "None and '' must dedup to the same row" + rows = await repo.get_credentials() + assert len(rows) == 1, f"expected 1 row, got {len(rows)}" + assert rows[0]["attempt_count"] == 2 diff --git a/tests/intel/test_worker.py b/tests/intel/test_worker.py index 5ace57d6..f0a2ad2c 100644 --- a/tests/intel/test_worker.py +++ b/tests/intel/test_worker.py @@ -19,7 +19,7 @@ from typing import Optional import pytest from decnet.intel.base import IntelProvider, IntelResult -from decnet.intel.worker import run_intel_loop, _aggregate +from decnet.intel.worker import run_intel_loop, _aggregate, _enrich_one from decnet.web.db.factory import get_repository @@ -206,6 +206,54 @@ async def test_provider_error_does_not_poison_row(repo): assert row["aggregate_verdict"] == "benign" +@pytest.mark.anyio +async def test_unexpected_provider_raise_does_not_lose_other_results(): + """BUG-15 regression: an unexpected exception from one provider must + not cancel sibling providers or swallow their results. + + Before the fix ``asyncio.gather(..., return_exceptions=False)`` let an + unexpected raise propagate immediately, cancelling all sibling tasks + and losing their results for the whole IP batch. + + After the fix ``return_exceptions=True`` is used; exception results are + filtered out and logged, while valid :class:`IntelResult` objects from + other providers are processed normally. + """ + + class _RaisingProvider(IntelProvider): + """Simulates an unexpected (non-contractual) exception.""" + concurrency = 1 + min_dispatch_interval_s = 0.0 + name = "exploding" + + async def lookup(self, ip: str) -> IntelResult: + raise RuntimeError("unexpected boom") + + good = _FakeProvider( + "greynoise", + verdict="benign", + column_updates={ + "greynoise_classification": "benign", + "greynoise_raw": {}, + "greynoise_queried_at": datetime.now(timezone.utc), + }, + ) + bad = _RaisingProvider() + + row = await _enrich_one( + attacker_uuid="test-uuid", + ip="10.0.0.1", + providers=[good, bad], + ttl_hours=24, + ) + + # The good provider's data must be present despite the bad one raising. + assert row["greynoise_classification"] == "benign" + assert row["aggregate_verdict"] == "benign" + # The bad provider did not poison the row or raise to the caller. + assert good.calls == ["10.0.0.1"] + + @pytest.mark.anyio async def test_intel_enriched_event_published_to_bus(repo, monkeypatch): """End-to-end: worker dispatches providers + publishes the event.""" diff --git a/tests/swarm/test_forwarder_resilience.py b/tests/swarm/test_forwarder_resilience.py index d3befdc3..2c90d2df 100644 --- a/tests/swarm/test_forwarder_resilience.py +++ b/tests/swarm/test_forwarder_resilience.py @@ -16,6 +16,7 @@ exercise: from __future__ import annotations import asyncio +import logging import pathlib import socket @@ -255,3 +256,213 @@ async def test_listener_tolerates_client_dropping_mid_stream( await asyncio.wait_for(listener_task, timeout=5) except asyncio.TimeoutError: listener_task.cancel() + + +# ----------------------------------------------------- V9.1.3 fail-closed CN + + +class _FakeWriter: + """Minimal asyncio.StreamWriter stand-in for _handle_connection. + + Records close()/wait_closed() so a test can assert the connection was + torn down without binding a real socket. + """ + + def __init__(self, ssl_object: object = None, peername: object = ("1.2.3.4", 4242)) -> None: + self._extra = {"ssl_object": ssl_object, "peername": peername} + self.closed = False + self.wait_closed_called = False + self.written: list[bytes] = [] + + def get_extra_info(self, key: str, default: object = None) -> object: + return self._extra.get(key, default) + + def close(self) -> None: + self.closed = True + + async def wait_closed(self) -> None: + self.wait_closed_called = True + + def write(self, data: bytes) -> None: # pragma: no cover - not expected + self.written.append(data) + + +def _drained_reader(frame: bytes) -> asyncio.StreamReader: + r = asyncio.StreamReader() + r.feed_data(frame) + r.feed_eof() + return r + + +@pytest.mark.asyncio +async def test_listener_rejects_unknown_cn_ingests_nothing( + tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """V9.1.3 FAIL-CLOSED: a peer whose cert yields CN='unknown' + (malformed/empty/missing CN) must be closed and ingest NOTHING — even + though the frame on the wire is a perfectly valid RFC 5424 line.""" + master_log = tmp_path / "master.log" + master_json = tmp_path / "master.json" + cfg = lst.ListenerConfig( + log_path=master_log, json_path=master_json, + bind_host="127.0.0.1", bind_port=0, ca_dir=tmp_path / "ca", + ) + + # Force peer_cn -> "unknown" regardless of the (absent) ssl object. + monkeypatch.setattr(lst, "peer_cn", lambda _ssl: "unknown") + + payload = b'<13>1 2026-04-18T00:00:00Z decky01 svc - - - should-not-ingest' + reader = _drained_reader(f"{len(payload)} ".encode() + payload) + writer = _FakeWriter() + + await lst._handle_connection(reader, writer, cfg) # type: ignore[arg-type] + + assert writer.closed, "unknown-CN connection must be closed" + assert writer.wait_closed_called + # Nothing must have been ingested into either sink. + assert not master_log.exists() or master_log.stat().st_size == 0 + assert not master_json.exists() or master_json.stat().st_size == 0 + + +@pytest.mark.asyncio +async def test_listener_processes_valid_cn_normally( + tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """A peer with a parseable CN is still processed and tagged with its + provenance — the fail-closed guard does not regress the happy path.""" + master_log = tmp_path / "master.log" + master_json = tmp_path / "master.json" + cfg = lst.ListenerConfig( + log_path=master_log, json_path=master_json, + bind_host="127.0.0.1", bind_port=0, ca_dir=tmp_path / "ca", + ) + + monkeypatch.setattr(lst, "peer_cn", lambda _ssl: "worker-good") + + payload = ( + b'<13>1 2026-04-18T00:00:00Z decky01 svc 1 - ' + b'[decnet@53595 decky="decky01" service="svc" event_type="connect" ' + b'attacker_ip="1.2.3.4" attacker_port="4242"] hello-good' + ) + reader = _drained_reader(f"{len(payload)} ".encode() + payload) + writer = _FakeWriter() + + await lst._handle_connection(reader, writer, cfg) # type: ignore[arg-type] + + assert writer.closed + assert master_log.exists() and b"hello-good" in master_log.read_bytes() + # Provenance tagged from the (good) CN in the JSON sink. + assert master_json.exists() and "worker-good" in master_json.read_text() + + +# ------------------------------------------------------- BUG-16 shutdown errors + + +@pytest.mark.asyncio +async def test_listener_shutdown_surfaces_serve_task_error( + tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch, caplog: pytest.LogCaptureFixture +) -> None: + """BUG-16: a non-CancelledError raised by the serve task during shutdown + must be logged, not silently swallowed.""" + + class _BoomServer: + def __init__(self) -> None: + self.sockets: tuple = () + + async def serve_forever(self) -> None: + # Run until cancelled, then raise a REAL error instead of honoring + # the CancelledError — emulates an OSError surfacing as the serve + # task is awaited after server.close()/cancel() during shutdown. + try: + await asyncio.Event().wait() # block until cancelled + except asyncio.CancelledError: + raise OSError("boom during serve") from None + + def close(self) -> None: + pass + + async def __aenter__(self) -> "_BoomServer": + return self + + async def __aexit__(self, *exc: object) -> None: + pass + + async def _fake_start_server(*_a: object, **_kw: object) -> _BoomServer: + return _BoomServer() + + monkeypatch.setattr(lst.asyncio, "start_server", _fake_start_server) + monkeypatch.setattr(lst, "build_listener_ssl_context", lambda _ca: None) + + cfg = lst.ListenerConfig( + log_path=tmp_path / "m.log", json_path=tmp_path / "m.json", + bind_host="127.0.0.1", bind_port=0, ca_dir=tmp_path / "ca", + ) + stop = asyncio.Event() + + async def _stop_soon() -> None: + # Let the serve task actually start before we request shutdown, so + # the cancel path (not a never-scheduled task) is what surfaces. + await asyncio.sleep(0.05) + stop.set() + + waiter = asyncio.create_task(_stop_soon()) + with caplog.at_level(logging.ERROR, logger="swarm.listener"): + await lst.run_listener(cfg, stop_event=stop) + await waiter + + assert any( + "serve task errored during shutdown" in r.getMessage() for r in caplog.records + ), "listener swallowed a real serve-task error on shutdown" + + +@pytest.mark.asyncio +async def test_forwarder_shutdown_surfaces_heartbeat_error( + tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch, caplog: pytest.LogCaptureFixture +) -> None: + """BUG-16: a non-CancelledError from the heartbeat task during forwarder + shutdown must be logged, not silently suppressed.""" + + started = asyncio.Event() + + async def _boom_heartbeat(*_a: object, **_kw: object) -> None: + # Signal that we actually ran, then fail — guarantees the task has a + # stored exception (not just a pending cancel) by shutdown time. + started.set() + raise RuntimeError("heartbeat boom") + + # Bus unavailable -> bus=None path; heartbeat task still created. + def _no_bus(*_a: object, **_kw: object): + raise RuntimeError("no bus in test") + + # Make the connect attempt fail with OSError so run_forwarder takes its + # caught backoff branch (which yields control, letting the heartbeat task + # run and raise) instead of propagating an uncaught error. + def _boom_ctx(*_a: object, **_kw: object): + raise OSError("no ssl context in test") + + monkeypatch.setattr(fwd, "get_bus", _no_bus) + monkeypatch.setattr(fwd, "run_health_heartbeat", _boom_heartbeat) + monkeypatch.setattr(fwd, "build_worker_ssl_context", _boom_ctx) + + cfg = fwd.ForwarderConfig( + log_path=tmp_path / "decnet.log", + master_host="127.0.0.1", master_port=0, + agent_dir=tmp_path / "agent", + state_db=tmp_path / "fwd.db", + ) + stop = asyncio.Event() + + async def _stop_after_heartbeat_ran() -> None: + # Let the heartbeat task get scheduled and raise before we ask the + # forwarder to shut down, so the finally block observes the error. + await started.wait() + stop.set() + + waiter = asyncio.create_task(_stop_after_heartbeat_ran()) + with caplog.at_level(logging.ERROR, logger="swarm.forwarder"): + await fwd.run_forwarder(cfg, poll_interval=0.01, stop_event=stop) + await waiter + + assert any( + "heartbeat task errored during shutdown" in r.getMessage() for r in caplog.records + ), "forwarder swallowed a real heartbeat-task error on shutdown" diff --git a/tests/ttp/store/test_database.py b/tests/ttp/store/test_database.py index 48dcf23f..6bd0c9a7 100644 --- a/tests/ttp/store/test_database.py +++ b/tests/ttp/store/test_database.py @@ -20,7 +20,7 @@ from __future__ import annotations import inspect import sys -from datetime import datetime, timezone +from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Any @@ -209,3 +209,185 @@ emits: await sync_task except (asyncio.CancelledError, Exception): # noqa: BLE001 pass + + +# --------------------------------------------------------------------------- +# BUG-13 regression: tail_db must not drop rules updated AT the watermark +# --------------------------------------------------------------------------- + +_SHARED_YAML_TEMPLATE = """\ +rule_id: {rule_id} +rule_version: 1 +name: {name} +applies_to: [command] +match: + pattern: 'test' +emits: + - tactic: TA0007 + technique_id: T1033 + confidence: 0.85 +""" + + +async def test_tail_db_same_timestamp_both_rules_emitted( + db_store: DatabaseRuleStore, tmp_path: Path, +) -> None: + """BUG-13 regression: two rules with the SAME updated_at timestamp are + BOTH emitted by tail_db across the watermark boundary (none dropped). + + The pre-fix code used ``updated_at > watermark`` which silently + dropped rules whose timestamp equalled the watermark. The fix + changes to ``>=`` and deduplicates by rule_id within the window, + advancing the watermark by 1 µs after emitting to prevent re-emission. + """ + import asyncio # noqa: PLC0415 + + # Pin a past watermark so both rules are in scope on the first poll. + shared_ts = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc) + db_store._tail_watermark = shared_ts + + # Insert two TTPRule rows with identical updated_at. + repo = db_store._repo + assert repo is not None + for rule_id, name in (("R1001", "rule one"), ("R1002", "rule two")): + yaml_content = _SHARED_YAML_TEMPLATE.format(rule_id=rule_id, name=name) + async with repo._session() as session: # type: ignore[attr-defined] + row = TTPRule( + rule_id=rule_id, + rule_version=1, + source_path=f"./rules/ttp/{rule_id}.yaml", + yaml_content=yaml_content, + updated_at=shared_ts, + updated_by="test", + ) + session.add(row) + await session.commit() + + # Patch _emit_change to capture rule_ids without touching subscribers. + emitted_via_tail: set[str] = set() + original_emit = db_store._emit_change + + async def _capture_emit(change, **kwargs): # type: ignore[no-untyped-def] + emitted_via_tail.add(change.rule_id) + await original_emit(change, **kwargs) + + db_store._emit_change = _capture_emit # type: ignore[assignment] + db_store._stop.clear() + + # Run tail_db for one short cycle then stop. + poll_task = asyncio.create_task(db_store.tail_db(poll_interval=0.01)) + await asyncio.sleep(0.08) + db_store._stop.set() + try: + await asyncio.wait_for(poll_task, timeout=2.0) + except (asyncio.CancelledError, asyncio.TimeoutError): + poll_task.cancel() + + assert "R1001" in emitted_via_tail, "R1001 must be emitted by tail_db" + assert "R1002" in emitted_via_tail, "R1002 must be emitted by tail_db" + # After emitting both rules at shared_ts, the seen-ids set must record + # them so that a second poll at the same watermark skips re-emission. + # The watermark itself stays at shared_ts (no newer rows existed) but + # _tail_seen_ids acts as the dedup guard. + assert "R1001" in db_store._tail_seen_ids, "R1001 must be in _tail_seen_ids" + assert "R1002" in db_store._tail_seen_ids, "R1002 must be in _tail_seen_ids" + + # Simulate a second poll — the rules should NOT be re-emitted. + emitted_via_tail.clear() + db_store._stop.clear() + poll_task2 = asyncio.create_task(db_store.tail_db(poll_interval=0.01)) + await asyncio.sleep(0.08) + db_store._stop.set() + try: + await asyncio.wait_for(poll_task2, timeout=2.0) + except (asyncio.CancelledError, asyncio.TimeoutError): + poll_task2.cancel() + + assert "R1001" not in emitted_via_tail, "R1001 must NOT be re-emitted on second poll" + assert "R1002" not in emitted_via_tail, "R1002 must NOT be re-emitted on second poll" + + +async def test_tail_db_coarse_timestamp_late_rule_still_emitted( + db_store: DatabaseRuleStore, tmp_path: Path, +) -> None: + """BUG-13 (microsecond-advance regression): on coarse second-resolution + timestamps (MySQL DATETIME) a rule saved at the SAME whole-second AFTER + a poll must STILL be emitted on the next poll — not dropped. + + The defective fix advanced the watermark to ``max_ts + 1 µs`` after a + poll. On second-resolution storage that bump lands inside the same + whole-second bucket, so a row written later in that same second has + ``updated_at < watermark`` and the ``>= watermark`` query silently drops + it — reintroducing the same-timestamp bug. + + The correct fix keeps the watermark AT max_ts and relies solely on + ``_tail_seen_ids`` for dedup. This test simulates coarse storage by + writing every row at the identical whole-second timestamp. + + Red-before/green-after: with ``max_ts + 1 µs`` the second rule + (written at the same whole second after the first poll) is dropped and + this test fails; keeping the watermark at max_ts emits it. + """ + import asyncio # noqa: PLC0415 + + coarse_ts = datetime(2024, 6, 1, 9, 0, 0, tzinfo=timezone.utc) + # Start the watermark strictly BEFORE the rows so the first poll takes + # the advancing (max_ts > watermark) branch — the exact branch where the + # defective +1 µs bump skips past same-second late arrivals. + db_store._tail_watermark = coarse_ts - timedelta(seconds=5) + + repo = db_store._repo + assert repo is not None + + async def _insert(rule_id: str) -> None: + yaml_content = _SHARED_YAML_TEMPLATE.format(rule_id=rule_id, name=rule_id) + async with repo._session() as session: # type: ignore[attr-defined] + session.add(TTPRule( + rule_id=rule_id, + rule_version=1, + source_path=f"./rules/ttp/{rule_id}.yaml", + yaml_content=yaml_content, + updated_at=coarse_ts, # identical whole-second timestamp + updated_by="test", + )) + await session.commit() + + emitted: list[str] = [] + original_emit = db_store._emit_change + + async def _capture_emit(change, **kwargs): # type: ignore[no-untyped-def] + emitted.append(change.rule_id) + await original_emit(change, **kwargs) + + db_store._emit_change = _capture_emit # type: ignore[assignment] + + # First poll: only R2001 exists yet. + await _insert("R2001") + db_store._stop.clear() + poll1 = asyncio.create_task(db_store.tail_db(poll_interval=0.01)) + await asyncio.sleep(0.05) + db_store._stop.set() + try: + await asyncio.wait_for(poll1, timeout=2.0) + except (asyncio.CancelledError, asyncio.TimeoutError): + poll1.cancel() + + assert "R2001" in emitted + # Watermark stayed at the coarse second; seen-ids guards re-emission. + assert db_store._tail_watermark == coarse_ts + assert "R2001" in db_store._tail_seen_ids + + # A rule arrives LATER, in the same whole second (coarse resolution). + emitted.clear() + await _insert("R2002") + db_store._stop.clear() + poll2 = asyncio.create_task(db_store.tail_db(poll_interval=0.01)) + await asyncio.sleep(0.05) + db_store._stop.set() + try: + await asyncio.wait_for(poll2, timeout=2.0) + except (asyncio.CancelledError, asyncio.TimeoutError): + poll2.cancel() + + assert "R2002" in emitted, "late same-second rule must NOT be dropped" + assert "R2001" not in emitted, "already-emitted rule must not re-fire"