diff --git a/decnet/profiler/worker.py b/decnet/profiler/worker.py index 383582a6..9ac19098 100644 --- a/decnet/profiler/worker.py +++ b/decnet/profiler/worker.py @@ -248,6 +248,17 @@ async def _update_profiles( record = _build_record(ip, events, traversal, bounties, commands) attacker_uuid = await repo.upsert_attacker(record) + # Backfill Credential.attacker_uuid for every credential row + # captured before the profiler had minted this Attacker. The + # capture path runs before the profiler — coupling them would + # create a chicken-and-egg ordering bug. Soft-fail so a backfill + # error never blocks the next attacker. + try: + await repo.update_credential_attacker_uuid(ip, attacker_uuid) + except Exception as exc: + _span.record_exception(exc) + logger.error("attacker worker: credential backfill failed for %s: %s", ip, exc) + _span.set_attribute("is_traversal", traversal is not None) _span.set_attribute("bounty_count", len(bounties)) _span.set_attribute("command_count", len(commands)) diff --git a/tests/profiler/test_attacker_worker.py b/tests/profiler/test_attacker_worker.py index 1c144a30..416eb28e 100644 --- a/tests/profiler/test_attacker_worker.py +++ b/tests/profiler/test_attacker_worker.py @@ -138,6 +138,7 @@ def _make_repo(logs=None, bounties=None, bounties_for_ips=None, max_log_id=0, sa repo.set_state = AsyncMock() repo.upsert_attacker = AsyncMock(return_value="mock-uuid") repo.upsert_attacker_behavior = AsyncMock() + repo.update_credential_attacker_uuid = AsyncMock(return_value=0) return repo @@ -697,6 +698,78 @@ class TestAttackerProfileWorker: assert initialized is False +# ─── Credential.attacker_uuid backfill ─────────────────────────────────────── + + +class TestCredentialBackfill: + """Profiler must call repo.update_credential_attacker_uuid(ip, uuid) + after every successful upsert_attacker so credentials captured before + the attacker was minted get retroactively linked. + """ + + @pytest.mark.asyncio + async def test_backfill_called_per_attacker(self): + rows = [ + _make_log_row( + row_id=i + 1, + raw_line=_make_raw_line("ssh", "decky-01", "conn", ip, _TS1), + attacker_ip=ip, + ) + for i, ip in enumerate(["1.1.1.1", "2.2.2.2"]) + ] + repo = _make_repo(logs=rows, max_log_id=2) + state = _WorkerState() + + await _incremental_update(repo, state) + + assert repo.update_credential_attacker_uuid.await_count == 2 + called_with = { + (c.args[0], c.args[1]) + for c in repo.update_credential_attacker_uuid.call_args_list + } + assert called_with == {("1.1.1.1", "mock-uuid"), ("2.2.2.2", "mock-uuid")} + + @pytest.mark.asyncio + async def test_backfill_uses_returned_uuid(self): + """The (ip, uuid) pair passed to backfill must match the upsert result.""" + raw = _make_raw_line("ssh", "decky-01", "conn", "9.9.9.9", _TS1) + repo = _make_repo( + logs=[_make_log_row(row_id=1, raw_line=raw, attacker_ip="9.9.9.9")], + max_log_id=1, + ) + repo.upsert_attacker = AsyncMock(return_value="uuid-for-9999") + state = _WorkerState() + + await _incremental_update(repo, state) + + repo.update_credential_attacker_uuid.assert_awaited_once_with( + "9.9.9.9", "uuid-for-9999" + ) + + @pytest.mark.asyncio + async def test_backfill_failure_does_not_crash_worker(self): + """Soft-fail: a backfill error must not block subsequent attackers.""" + rows = [ + _make_log_row( + row_id=i + 1, + raw_line=_make_raw_line("ssh", "decky-01", "conn", ip, _TS1), + attacker_ip=ip, + ) + for i, ip in enumerate(["1.1.1.1", "2.2.2.2"]) + ] + repo = _make_repo(logs=rows, max_log_id=2) + repo.update_credential_attacker_uuid = AsyncMock( + side_effect=RuntimeError("DB exploded") + ) + state = _WorkerState() + + await _incremental_update(repo, state) + + # Both attackers were still upserted and the behavior rollup ran. + assert repo.upsert_attacker.await_count == 2 + assert repo.upsert_attacker_behavior.await_count == 2 + + # ─── JA3 bounty extraction from ingester ───────────────────────────────────── class TestJA3BountyExtraction: