From 590c2b0faccf61da0acef24e9c803d0b91e228be Mon Sep 17 00:00:00 2001 From: anti Date: Sun, 26 Apr 2026 03:37:49 -0400 Subject: [PATCH] feat(correlation): credential-reuse engine + reuse-correlate worker Adds CorrelationEngine.correlate_credential_reuse + the `decnet reuse-correlate` long-running worker. The worker mirrors the mutator's bus-wake + slow-tick pattern: wakes on credential.captured and attacker.observed for sub-second latency, falls back to a 60s poll if the bus is unavailable, and publishes credential.reuse.detected once per new or grown CredentialReuse row (group-deduped so a 5-cred reuse doesn't emit 5 partial events). The web ingester now publishes credential.captured after every successful Credential upsert; bus + new repo helper find_credential_reuse_candidates feed the engine pass. --- decnet/cli/workers.py | 56 ++++ decnet/correlation/engine.py | 56 ++++ decnet/correlation/reuse_worker.py | 153 +++++++++++ decnet/web/db/repository.py | 12 + decnet/web/db/sqlmodel_repo.py | 48 ++++ decnet/web/ingester.py | 31 ++- tests/correlation/test_credential_reuse.py | 287 +++++++++++++++++++++ tests/web/test_ingester_bus.py | 67 +++++ 8 files changed, 705 insertions(+), 5 deletions(-) create mode 100644 decnet/correlation/reuse_worker.py create mode 100644 tests/correlation/test_credential_reuse.py diff --git a/decnet/cli/workers.py b/decnet/cli/workers.py index cc0ba520..635dc144 100644 --- a/decnet/cli/workers.py +++ b/decnet/cli/workers.py @@ -140,3 +140,59 @@ def register(app: typer.Typer) -> None: if emit_syslog: for line in engine.traversal_syslog_lines(min_deckies): typer.echo(line) + + @app.command(name="reuse-correlate") + def reuse_correlate( + min_targets: int = typer.Option( + 2, "--min-targets", "-m", + help="Minimum distinct (decky, service) targets a secret must hit before a CredentialReuse row is persisted", + ), + poll_interval_secs: float = typer.Option( + 60.0, "--poll-interval", "-i", + help="Slow-tick fallback when the bus is idle or unavailable (seconds)", + ), + daemon: bool = typer.Option( + False, "--daemon", "-d", + help="Detach to background as a daemon process", + ), + ) -> None: + """Long-running credential-reuse correlator. + + Watches the bus for ``credential.captured`` and ``attacker.observed`` + events, re-runs the reuse pass on each wake, and publishes + ``credential.reuse.detected`` for every new or grown + ``CredentialReuse`` row. + """ + import asyncio + from decnet.correlation.reuse_worker import run_reuse_loop + from decnet.web.dependencies import repo + + if daemon: + log.info( + "reuse-correlate daemonizing min_targets=%d poll=%s", + min_targets, poll_interval_secs, + ) + _utils._daemonize() + + log.info( + "reuse-correlate command invoked min_targets=%d poll=%s", + min_targets, poll_interval_secs, + ) + console.print( + f"[bold cyan]Reuse correlator starting[/] " + f"min_targets={min_targets} poll={poll_interval_secs}s" + ) + console.print("[dim]Press Ctrl+C to stop[/]") + + async def _run() -> None: + await repo.initialize() + await run_reuse_loop( + repo, + poll_interval_secs=poll_interval_secs, + min_targets=min_targets, + ) + + try: + asyncio.run(_run()) + except KeyboardInterrupt: + console.print("\n[yellow]Reuse correlator stopped.[/]") diff --git a/decnet/correlation/engine.py b/decnet/correlation/engine.py index e5ac3f4f..2fc92baf 100644 --- a/decnet/correlation/engine.py +++ b/decnet/correlation/engine.py @@ -214,6 +214,62 @@ class CorrelationEngine: "traversals": [t.to_dict() for t in self.traversals(min_deckies)], } + # ------------------------------------------------------------------ # + # Credential reuse # + # ------------------------------------------------------------------ # + + async def correlate_credential_reuse( + self, + repo: Any, + min_targets: int = 2, + ) -> list[dict[str, Any]]: + """Detect cross-target credential reuse and persist findings. + + Groups every ``Credential`` row by ``(secret_sha256, secret_kind, + principal)``. Groups crossing *min_targets* distinct + ``(decky, service)`` pairs are folded into ``CredentialReuse`` via + :meth:`BaseRepository.upsert_credential_reuse` — one upsert per + underlying credential row, since the upsert itself dedups on the + unique key and recomputes aggregates from the credentials table. + + Returns the upsert results that flipped ``inserted`` or + ``changed``, so the caller can publish ``credential.reuse.detected`` + for each new or grown finding without re-querying. + """ + results: list[dict[str, Any]] = [] + candidates = await repo.find_credential_reuse_candidates(min_targets) + for group in candidates: + # Per-group flags: each credential in a group hits the same + # CredentialReuse row, so several upserts may flip + # ``inserted``/``changed`` along the way. Collapse to one + # publish per group keyed by the final state — otherwise a + # group of N creds emits N partial reuse.detected events + # with intermediate target_counts. + final_row: dict[str, Any] | None = None + saw_insert = False + saw_change = False + for cred in group["credentials"]: + row = await repo.upsert_credential_reuse( + secret_sha256=group["secret_sha256"], + secret_kind=group["secret_kind"], + principal=group["principal"], + attacker_uuid=cred.get("attacker_uuid"), + attacker_ip=cred["attacker_ip"], + decky=cred["decky_name"], + service=cred["service"], + attempt_count=int(cred.get("attempt_count") or 1), + ) + if row is None: + continue + final_row = row + saw_insert = saw_insert or bool(row.get("inserted")) + saw_change = saw_change or bool(row.get("changed")) + if final_row is not None and (saw_insert or saw_change): + final_row["inserted"] = saw_insert + final_row["changed"] = saw_change + results.append(final_row) + return results + @_traced("correlation.traversal_syslog_lines") def traversal_syslog_lines(self, min_deckies: int = 2) -> list[str]: """ diff --git a/decnet/correlation/reuse_worker.py b/decnet/correlation/reuse_worker.py new file mode 100644 index 00000000..07ba33c2 --- /dev/null +++ b/decnet/correlation/reuse_worker.py @@ -0,0 +1,153 @@ +"""Long-running credential-reuse correlator. + +Loops :meth:`CorrelationEngine.correlate_credential_reuse` over the +credentials table and publishes ``credential.reuse.detected`` for every +new or grown ``CredentialReuse`` row. Mirrors the mutator's bus-wake + +slow-tick pattern from :mod:`decnet.mutator.engine`: woken on +``credential.captured`` and ``attacker.observed`` for sub-second latency, +falls back to a 60s poll if the bus is unavailable. +""" +from __future__ import annotations + +import asyncio +import contextlib + +from decnet.bus import topics as _topics +from decnet.bus.base import BaseBus +from decnet.bus.factory import get_bus +from decnet.bus.publish import ( + publish_safely, + run_control_listener_signal as _run_control_listener_signal, + run_health_heartbeat as _run_health_heartbeat, +) +from decnet.correlation.engine import CorrelationEngine +from decnet.logging import get_logger +from decnet.web.db.repository import BaseRepository + +log = get_logger("correlation.reuse_worker") + +_DEFAULT_POLL_SECS = 60.0 +_DEFAULT_MIN_TARGETS = 2 + + +async def run_reuse_loop( + repo: BaseRepository, + *, + poll_interval_secs: float = _DEFAULT_POLL_SECS, + min_targets: int = _DEFAULT_MIN_TARGETS, + shutdown: asyncio.Event | None = None, +) -> None: + """Run the credential-reuse correlator until cancelled. + + *shutdown* is an optional external stop signal; the loop also exits + cleanly on ``CancelledError`` and ``KeyboardInterrupt``. The + *min_targets* threshold is the minimum number of distinct + ``(decky, service)`` pairs a secret must touch before it's persisted + as a reuse finding. + """ + log.info( + "reuse correlator started poll_interval_secs=%s min_targets=%s", + poll_interval_secs, min_targets, + ) + + bus: BaseBus | None = None + wake = asyncio.Event() + wake_tasks: list[asyncio.Task] = [] + heartbeat_task: asyncio.Task | None = None + try: + candidate = get_bus(client_name="reuse-correlator") + await candidate.connect() + bus = candidate + wake_tasks.append(asyncio.create_task( + _wake_on(bus, wake, _topics.credential(_topics.CREDENTIAL_CAPTURED)), + )) + wake_tasks.append(asyncio.create_task( + _wake_on(bus, wake, _topics.attacker(_topics.ATTACKER_OBSERVED)), + )) + heartbeat_task = asyncio.create_task( + _run_health_heartbeat(bus, "reuse-correlator"), + ) + wake_tasks.append(asyncio.create_task( + _run_control_listener_signal(bus, "reuse-correlator"), + )) + except Exception as exc: # noqa: BLE001 + log.warning( + "reuse correlator: bus unavailable, running in poll-only mode: %s", + exc, + ) + + engine = CorrelationEngine() + if shutdown is None: + shutdown = asyncio.Event() + + try: + while not shutdown.is_set(): + try: + results = await engine.correlate_credential_reuse( + repo, min_targets=min_targets, + ) + except Exception: # noqa: BLE001 + log.exception("reuse correlator: tick failed") + results = [] + + for row in results: + await publish_safely( + bus, + _topics.credential(_topics.CREDENTIAL_REUSE_DETECTED), + { + "id": row.get("id"), + "secret_kind": row.get("secret_kind"), + "target_count": row.get("target_count"), + "attacker_uuids": row.get("attacker_uuids"), + "attacker_ips": row.get("attacker_ips"), + "deckies": row.get("deckies"), + "services": row.get("services"), + }, + event_type=_topics.CREDENTIAL_REUSE_DETECTED, + ) + + try: + await asyncio.wait_for( + wake.wait(), timeout=float(poll_interval_secs), + ) + except asyncio.TimeoutError: + pass + wake.clear() + except (asyncio.CancelledError, KeyboardInterrupt): + log.info("reuse correlator stopped") + finally: + for t in wake_tasks: + t.cancel() + if heartbeat_task is not None: + heartbeat_task.cancel() + for t in (*wake_tasks, heartbeat_task): + if t is None: + continue + with contextlib.suppress(asyncio.CancelledError, Exception): + await t + if bus is not None: + with contextlib.suppress(Exception): + await bus.close() + + +async def _wake_on(bus: BaseBus, wake: asyncio.Event, pattern: str) -> None: + """Flip *wake* every time *pattern* fires on the bus. + + Survives transient subscriber errors by logging and exiting; the + poll-interval fallback keeps the loop alive in poll-only mode. + """ + try: + sub = bus.subscribe(pattern) + async with sub: + async for _event in sub: + wake.set() + except asyncio.CancelledError: + raise + except Exception as exc: # noqa: BLE001 + log.warning( + "reuse correlator: subscriber for %s died (%s); falling back to poll", + pattern, exc, + ) + + +__all__ = ["run_reuse_loop"] diff --git a/decnet/web/db/repository.py b/decnet/web/db/repository.py index fcb0c030..7260b220 100644 --- a/decnet/web/db/repository.py +++ b/decnet/web/db/repository.py @@ -179,6 +179,18 @@ class BaseRepository(ABC): """ pass + @abstractmethod + async def find_credential_reuse_candidates( + self, min_targets: int = 2 + ) -> list[dict[str, Any]]: + """Group ``credentials`` by ``(secret_sha256, secret_kind, principal)`` + and return groups whose distinct ``(decky_name, service)`` count is + at least *min_targets*. Each entry has the group key, the + ``target_count``, and the underlying credential rows for the + correlator to fold into ``CredentialReuse``. + """ + pass + @abstractmethod async def list_credential_reuses( self, diff --git a/decnet/web/db/sqlmodel_repo.py b/decnet/web/db/sqlmodel_repo.py index f4b1e964..5ef674bb 100644 --- a/decnet/web/db/sqlmodel_repo.py +++ b/decnet/web/db/sqlmodel_repo.py @@ -849,6 +849,54 @@ class SQLModelRepository(BaseRepository): d["changed"] = changed return d + async def find_credential_reuse_candidates( + self, min_targets: int = 2 + ) -> List[dict[str, Any]]: + """Find credential groups crossing the reuse threshold. + + Returns one dict per qualifying ``(secret_sha256, secret_kind, + principal)`` group, with the keys plus a ``credentials`` list of + the underlying rows so the correlator can fold each into + ``CredentialReuse`` via ``upsert_credential_reuse``. + """ + target_expr = func.count( + func.distinct(Credential.decky_name + ":" + Credential.service) + ).label("target_count") + async with self._session() as session: + group_stmt = ( + select( + Credential.secret_sha256, + Credential.secret_kind, + Credential.principal, + target_expr, + ) + .group_by( + Credential.secret_sha256, + Credential.secret_kind, + Credential.principal, + ) + .having(target_expr >= int(min_targets)) + ) + groups = (await session.execute(group_stmt)).all() + out: List[dict[str, Any]] = [] + for sha, kind, principal, target_count in groups: + cred_stmt = select(Credential).where( + Credential.secret_sha256 == sha, + Credential.secret_kind == kind, + (Credential.principal == principal) + if principal is not None + else Credential.principal.is_(None), + ) + rows = (await session.execute(cred_stmt)).scalars().all() + out.append({ + "secret_sha256": sha, + "secret_kind": kind, + "principal": principal, + "target_count": int(target_count or 0), + "credentials": [r.model_dump(mode="json") for r in rows], + }) + return out + async def list_credential_reuses( self, limit: int = 50, diff --git a/decnet/web/ingester.py b/decnet/web/ingester.py index 32c09d14..b4623d88 100644 --- a/decnet/web/ingester.py +++ b/decnet/web/ingester.py @@ -131,7 +131,7 @@ async def _run_loop( time.monotonic() - _batch_started >= _max_wait_s ): _flushed = len(_batch) - _position = await _flush_batch(repo, _batch, _position) + _position = await _flush_batch(repo, _batch, _position, _bus) _batch.clear() _batch_started = time.monotonic() await _publish_batch(_bus, _flushed, _position) @@ -139,7 +139,7 @@ async def _run_loop( # Flush any remainder collected before EOF / partial-line break. if _batch: _flushed = len(_batch) - _position = await _flush_batch(repo, _batch, _position) + _position = await _flush_batch(repo, _batch, _position, _bus) await _publish_batch(_bus, _flushed, _position) except Exception as _e: @@ -175,6 +175,7 @@ async def _flush_batch( repo: BaseRepository, batch: list[tuple[dict[str, Any], int]], current_position: int, + bus: Any = None, ) -> int: """Commit a batch of log rows and return the new file position. @@ -192,7 +193,7 @@ async def _flush_batch( _new_position = batch[-1][1] await repo.add_logs(_entries) for _entry in _entries: - await _extract_bounty(repo, _entry) + await _extract_bounty(repo, _entry, bus) await repo.set_state(_INGEST_STATE_KEY, {"position": _new_position}) return _new_position @@ -220,6 +221,7 @@ async def _ingest_credential_native( repo: BaseRepository, log_data: dict[str, Any], fields: dict[str, Any], + bus: Any = None, ) -> None: """Native-shape credential: SD-block already carries secret_b64. @@ -263,9 +265,28 @@ async def _ingest_credential_native( "fields": fields, # repo handles json.dumps with ensure_ascii=True }) + # Wake the reuse correlator. Fire-and-forget; a dead bus never blocks + # ingestion. Worker receives this and re-runs the GROUP BY pass. + await publish_safely( + bus, + _topics.credential(_topics.CREDENTIAL_CAPTURED), + { + "secret_sha256": sha256_hex, + "secret_kind": secret_kind, + "attacker_ip": log_data.get("attacker_ip"), + "decky": log_data.get("decky"), + "service": log_data.get("service"), + }, + event_type=_topics.CREDENTIAL_CAPTURED, + ) + @_traced("ingester.extract_bounty") -async def _extract_bounty(repo: BaseRepository, log_data: dict[str, Any]) -> None: +async def _extract_bounty( + repo: BaseRepository, + log_data: dict[str, Any], + bus: Any = None, +) -> None: """Detect and extract valuable artifacts (bounties) from log entries.""" _fields = log_data.get("fields") if not isinstance(_fields, dict): @@ -278,7 +299,7 @@ async def _extract_bounty(repo: BaseRepository, log_data: dict[str, Any]) -> Non # native branch directly. Redis (no principal) and LDAP (principal= # dn) also land here — they were previously dropped silently. if "secret_b64" in _fields: - await _ingest_credential_native(repo, log_data, _fields) + await _ingest_credential_native(repo, log_data, _fields, bus) # 2. HTTP User-Agent fingerprint _h_raw = _fields.get("headers") diff --git a/tests/correlation/test_credential_reuse.py b/tests/correlation/test_credential_reuse.py new file mode 100644 index 00000000..a4f1c999 --- /dev/null +++ b/tests/correlation/test_credential_reuse.py @@ -0,0 +1,287 @@ +"""Credential-reuse correlator tests. + +Covers: +- ``CorrelationEngine.correlate_credential_reuse`` — group detection, + threshold gating, idempotency on a second call. +- ``run_reuse_loop`` — bus-driven wake, reuse.detected publish on + insert/grow, clean shutdown via the *shutdown* signal. +- Repo helper ``find_credential_reuse_candidates`` — used by the engine. +""" +from __future__ import annotations + +import asyncio +import contextlib +import hashlib +from pathlib import Path + +import pytest + +from decnet.bus import topics as _topics +from decnet.bus.fake import FakeBus +from decnet.correlation.engine import CorrelationEngine +from decnet.correlation.reuse_worker import run_reuse_loop +from decnet.web.db.factory import get_repository + + +def _sha256(s: str) -> str: + return hashlib.sha256(s.encode("utf-8")).hexdigest() + + +@pytest.fixture +async def repo(tmp_path: Path): + r = get_repository(db_path=str(tmp_path / "reuse_corr.db")) + await r.initialize() + return r + + +async def _seed_credential(repo, **overrides): + base = { + "attacker_ip": "10.0.0.5", + "decky_name": "decky-01", + "service": "ssh", + "principal": "root", + "secret_kind": "plaintext", + "secret_sha256": _sha256("hunter2"), + "secret_b64": "aHVudGVyMg==", + "secret_printable": "hunter2", + "fields": {}, + } + base.update(overrides) + return await repo.upsert_credential(base) + + +# ─── find_credential_reuse_candidates ──────────────────────────────────────── + + +class TestFindCandidates: + @pytest.mark.anyio + async def test_below_threshold_excluded(self, repo) -> None: + sha = _sha256("solo") + await _seed_credential(repo, secret_sha256=sha, decky_name="d1", service="ssh") + + groups = await repo.find_credential_reuse_candidates(min_targets=2) + assert groups == [] + + @pytest.mark.anyio + async def test_threshold_exact_match_included(self, repo) -> None: + sha = _sha256("p4ss") + await _seed_credential(repo, secret_sha256=sha, decky_name="d1", service="ssh") + await _seed_credential(repo, secret_sha256=sha, decky_name="d2", service="ftp") + + groups = await repo.find_credential_reuse_candidates(min_targets=2) + assert len(groups) == 1 + g = groups[0] + assert g["secret_sha256"] == sha + assert g["secret_kind"] == "plaintext" + assert g["target_count"] == 2 + assert len(g["credentials"]) == 2 + + @pytest.mark.anyio + async def test_distinct_principals_form_distinct_groups(self, repo) -> None: + """Same secret + different principals → two separate groups.""" + sha = _sha256("hunter2") + await _seed_credential( + repo, secret_sha256=sha, principal="root", + decky_name="d1", service="ssh", + ) + await _seed_credential( + repo, secret_sha256=sha, principal="root", + decky_name="d2", service="ftp", + ) + await _seed_credential( + repo, secret_sha256=sha, principal="admin", + decky_name="d1", service="ssh", + ) + await _seed_credential( + repo, secret_sha256=sha, principal="admin", + decky_name="d2", service="ftp", + ) + + groups = await repo.find_credential_reuse_candidates(min_targets=2) + principals = sorted(g["principal"] for g in groups) + assert principals == ["admin", "root"] + + @pytest.mark.anyio + async def test_repeated_decky_service_does_not_count_twice(self, repo) -> None: + """A repeat attempt on the same (decky, service) doesn't pad target_count.""" + sha = _sha256("h2") + # Two attempts on the same decky/service → upsert dedups. + await _seed_credential(repo, secret_sha256=sha, decky_name="d1", service="ssh") + await _seed_credential(repo, secret_sha256=sha, decky_name="d1", service="ssh") + + groups = await repo.find_credential_reuse_candidates(min_targets=2) + assert groups == [] + + +# ─── CorrelationEngine.correlate_credential_reuse ──────────────────────────── + + +class TestEngineCorrelate: + @pytest.mark.anyio + async def test_emits_reuse_for_qualifying_group(self, repo) -> None: + sha = _sha256("hunter2") + await _seed_credential(repo, secret_sha256=sha, decky_name="d1", service="ssh") + await _seed_credential(repo, secret_sha256=sha, decky_name="d2", service="ftp") + + engine = CorrelationEngine() + results = await engine.correlate_credential_reuse(repo, min_targets=2) + + assert len(results) >= 1 + assert any(r.get("inserted") for r in results) + + total, rows = await repo.list_credential_reuses(min_target_count=2) + assert total == 1 + assert rows[0]["target_count"] == 2 + + @pytest.mark.anyio + async def test_below_threshold_persists_nothing(self, repo) -> None: + sha = _sha256("loner") + await _seed_credential(repo, secret_sha256=sha, decky_name="d1", service="ssh") + + engine = CorrelationEngine() + results = await engine.correlate_credential_reuse(repo, min_targets=2) + + assert results == [] + total, _ = await repo.list_credential_reuses(min_target_count=2) + assert total == 0 + + @pytest.mark.anyio + async def test_idempotent_on_second_run(self, repo) -> None: + """A second call with no new credentials returns no + insert/grow rows and leaves the table at the same row count. + """ + sha = _sha256("idempotent") + await _seed_credential(repo, secret_sha256=sha, decky_name="d1", service="ssh") + await _seed_credential(repo, secret_sha256=sha, decky_name="d2", service="ftp") + + engine = CorrelationEngine() + await engine.correlate_credential_reuse(repo, min_targets=2) + before_total, _ = await repo.list_credential_reuses(min_target_count=2) + + results2 = await engine.correlate_credential_reuse(repo, min_targets=2) + after_total, _ = await repo.list_credential_reuses(min_target_count=2) + + assert before_total == after_total == 1 + assert results2 == [] + + @pytest.mark.anyio + async def test_growth_emits_changed(self, repo) -> None: + """Adding a third target after an initial reuse run yields a + ``changed`` row on the next correlation pass. + """ + sha = _sha256("grower") + await _seed_credential(repo, secret_sha256=sha, decky_name="d1", service="ssh") + await _seed_credential(repo, secret_sha256=sha, decky_name="d2", service="ftp") + + engine = CorrelationEngine() + await engine.correlate_credential_reuse(repo, min_targets=2) + + await _seed_credential(repo, secret_sha256=sha, decky_name="d3", service="rdp") + results = await engine.correlate_credential_reuse(repo, min_targets=2) + + assert any(r.get("changed") for r in results) + _, rows = await repo.list_credential_reuses(min_target_count=2) + assert rows[0]["target_count"] == 3 + + +# ─── run_reuse_loop ────────────────────────────────────────────────────────── + + +class TestRunReuseLoop: + @pytest.mark.anyio + async def test_publishes_reuse_detected_on_insert(self, repo, monkeypatch) -> None: + """One ``credential.reuse.detected`` per new CredentialReuse row.""" + bus = FakeBus() + await bus.connect() + + # Force the worker to pick up our FakeBus. + from decnet.correlation import reuse_worker as _rw + monkeypatch.setattr(_rw, "get_bus", lambda client_name=None: bus) + + sha = _sha256("loop-insert") + await _seed_credential(repo, secret_sha256=sha, decky_name="d1", service="ssh") + await _seed_credential(repo, secret_sha256=sha, decky_name="d2", service="ftp") + + sub = bus.subscribe(_topics.credential(_topics.CREDENTIAL_REUSE_DETECTED)) + shutdown = asyncio.Event() + task = asyncio.create_task(run_reuse_loop( + repo, poll_interval_secs=60.0, min_targets=2, shutdown=shutdown, + )) + + # Wait for the first tick to publish. + async with sub: + event = await asyncio.wait_for(sub.__anext__(), timeout=5.0) + + assert event.topic == _topics.credential(_topics.CREDENTIAL_REUSE_DETECTED) + assert event.payload["target_count"] == 2 + assert event.payload["secret_kind"] == "plaintext" + + shutdown.set() + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await bus.close() + + @pytest.mark.anyio + async def test_no_reuse_no_publish(self, repo, monkeypatch) -> None: + """A loop with no qualifying groups publishes nothing on its tick.""" + bus = FakeBus() + await bus.connect() + from decnet.correlation import reuse_worker as _rw + monkeypatch.setattr(_rw, "get_bus", lambda client_name=None: bus) + + sha = _sha256("loner-loop") + await _seed_credential(repo, secret_sha256=sha, decky_name="d1", service="ssh") + + sub = bus.subscribe(_topics.credential(_topics.CREDENTIAL_REUSE_DETECTED)) + shutdown = asyncio.Event() + task = asyncio.create_task(run_reuse_loop( + repo, poll_interval_secs=0.05, min_targets=2, shutdown=shutdown, + )) + + # Let the loop run a few ticks. + await asyncio.sleep(0.3) + + async with sub: + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(sub.__anext__(), timeout=0.1) + + shutdown.set() + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await bus.close() + + @pytest.mark.anyio + async def test_no_duplicate_publish_on_second_tick( + self, repo, monkeypatch, + ) -> None: + """A subsequent tick with no new credentials must not republish.""" + bus = FakeBus() + await bus.connect() + from decnet.correlation import reuse_worker as _rw + monkeypatch.setattr(_rw, "get_bus", lambda client_name=None: bus) + + sha = _sha256("once") + await _seed_credential(repo, secret_sha256=sha, decky_name="d1", service="ssh") + await _seed_credential(repo, secret_sha256=sha, decky_name="d2", service="ftp") + + sub = bus.subscribe(_topics.credential(_topics.CREDENTIAL_REUSE_DETECTED)) + shutdown = asyncio.Event() + task = asyncio.create_task(run_reuse_loop( + repo, poll_interval_secs=0.05, min_targets=2, shutdown=shutdown, + )) + + # Drain the first publish (the insert). + async with sub: + await asyncio.wait_for(sub.__anext__(), timeout=5.0) + + # Subsequent ticks must produce no further publishes. + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(sub.__anext__(), timeout=0.3) + + shutdown.set() + task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await task + await bus.close() diff --git a/tests/web/test_ingester_bus.py b/tests/web/test_ingester_bus.py index 8ba78d6c..63b9efc3 100644 --- a/tests/web/test_ingester_bus.py +++ b/tests/web/test_ingester_bus.py @@ -70,6 +70,73 @@ async def test_publish_batch_swallows_bus_failures(monkeypatch) -> None: await _publish_batch(_ExplodingBus(), flushed=3, position=42) +@pytest.mark.asyncio +async def test_credential_captured_published_on_upsert(bus: FakeBus) -> None: + """A successful credential ingest publishes ``credential.captured`` once + with the secret hash, kind, attacker IP, decky, and service. + """ + from unittest.mock import AsyncMock + + from decnet.web.ingester import _ingest_credential_native + + repo = AsyncMock() + repo.upsert_credential = AsyncMock(return_value=1) + + sub = bus.subscribe("credential.captured") + async with sub: + await _ingest_credential_native( + repo, + log_data={ + "attacker_ip": "10.0.0.5", + "decky": "decky-01", + "service": "ssh", + }, + fields={ + "secret_b64": "aHVudGVyMg==", + "secret_kind": "plaintext", + "principal": "root", + "secret_printable": "hunter2", + }, + bus=bus, + ) + event = await asyncio.wait_for(sub.__anext__(), timeout=2.0) + + assert event.topic == "credential.captured" + assert event.type == "captured" + assert event.payload["secret_kind"] == "plaintext" + assert event.payload["attacker_ip"] == "10.0.0.5" + assert event.payload["decky"] == "decky-01" + assert event.payload["service"] == "ssh" + # Hash is sha256 of decoded "hunter2". + import hashlib + assert event.payload["secret_sha256"] == hashlib.sha256(b"hunter2").hexdigest() + repo.upsert_credential.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_credential_captured_silent_on_validation_failure(bus: FakeBus) -> None: + """A dropped credential (invalid b64) must not publish anything.""" + from unittest.mock import AsyncMock + + from decnet.web.ingester import _ingest_credential_native + + repo = AsyncMock() + repo.upsert_credential = AsyncMock() + + sub = bus.subscribe("credential.captured") + async with sub: + await _ingest_credential_native( + repo, + log_data={"attacker_ip": "10.0.0.5", "decky": "d", "service": "ssh"}, + fields={"secret_b64": "not-valid-base64!!!"}, + bus=bus, + ) + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(sub.__anext__(), timeout=0.2) + + repo.upsert_credential.assert_not_awaited() + + @pytest.mark.asyncio async def test_ingester_degrades_cleanly_when_bus_disabled( monkeypatch: pytest.MonkeyPatch,