feat(correlation): credential-reuse engine + reuse-correlate worker

Adds CorrelationEngine.correlate_credential_reuse + the
`decnet reuse-correlate` long-running worker. The worker mirrors the
mutator's bus-wake + slow-tick pattern: wakes on credential.captured
and attacker.observed for sub-second latency, falls back to a 60s
poll if the bus is unavailable, and publishes
credential.reuse.detected once per new or grown CredentialReuse row
(group-deduped so a 5-cred reuse doesn't emit 5 partial events).

The web ingester now publishes credential.captured after every
successful Credential upsert; the bus and the new repo helper
find_credential_reuse_candidates together feed the engine pass.
This commit is contained in:
2026-04-26 03:37:49 -04:00
parent 00ecea924a
commit 590c2b0fac
8 changed files with 705 additions and 5 deletions

View File

@@ -131,7 +131,7 @@ async def _run_loop(
time.monotonic() - _batch_started >= _max_wait_s
):
_flushed = len(_batch)
_position = await _flush_batch(repo, _batch, _position)
_position = await _flush_batch(repo, _batch, _position, _bus)
_batch.clear()
_batch_started = time.monotonic()
await _publish_batch(_bus, _flushed, _position)
@@ -139,7 +139,7 @@ async def _run_loop(
# Flush any remainder collected before EOF / partial-line break.
if _batch:
_flushed = len(_batch)
_position = await _flush_batch(repo, _batch, _position)
_position = await _flush_batch(repo, _batch, _position, _bus)
await _publish_batch(_bus, _flushed, _position)
except Exception as _e:
@@ -175,6 +175,7 @@ async def _flush_batch(
repo: BaseRepository,
batch: list[tuple[dict[str, Any], int]],
current_position: int,
bus: Any = None,
) -> int:
"""Commit a batch of log rows and return the new file position.
@@ -192,7 +193,7 @@ async def _flush_batch(
_new_position = batch[-1][1]
await repo.add_logs(_entries)
for _entry in _entries:
await _extract_bounty(repo, _entry)
await _extract_bounty(repo, _entry, bus)
await repo.set_state(_INGEST_STATE_KEY, {"position": _new_position})
return _new_position
@@ -220,6 +221,7 @@ async def _ingest_credential_native(
repo: BaseRepository,
log_data: dict[str, Any],
fields: dict[str, Any],
bus: Any = None,
) -> None:
"""Native-shape credential: SD-block already carries secret_b64.
@@ -263,9 +265,28 @@ async def _ingest_credential_native(
"fields": fields, # repo handles json.dumps with ensure_ascii=True
})
# Wake the reuse correlator. Fire-and-forget; a dead bus never blocks
# ingestion. Worker receives this and re-runs the GROUP BY pass.
await publish_safely(
bus,
_topics.credential(_topics.CREDENTIAL_CAPTURED),
{
"secret_sha256": sha256_hex,
"secret_kind": secret_kind,
"attacker_ip": log_data.get("attacker_ip"),
"decky": log_data.get("decky"),
"service": log_data.get("service"),
},
event_type=_topics.CREDENTIAL_CAPTURED,
)
@_traced("ingester.extract_bounty")
async def _extract_bounty(repo: BaseRepository, log_data: dict[str, Any]) -> None:
async def _extract_bounty(
repo: BaseRepository,
log_data: dict[str, Any],
bus: Any = None,
) -> None:
"""Detect and extract valuable artifacts (bounties) from log entries."""
_fields = log_data.get("fields")
if not isinstance(_fields, dict):
@@ -278,7 +299,7 @@ async def _extract_bounty(repo: BaseRepository, log_data: dict[str, Any]) -> Non
# native branch directly. Redis (no principal) and LDAP (principal=
# dn) also land here — they were previously dropped silently.
if "secret_b64" in _fields:
await _ingest_credential_native(repo, log_data, _fields)
await _ingest_credential_native(repo, log_data, _fields, bus)
# 2. HTTP User-Agent fingerprint
_h_raw = _fields.get("headers")