feat(profiler): backfill Credential.attacker_uuid on attacker upsert

Credential capture runs before the profiler mints an Attacker, so
Credential.attacker_uuid is nullable on write. The profiler now
backfills the FK after each successful upsert_attacker. Soft-fail
posture matches the surrounding behavior + smtp rollups so a backfill
error never blocks the next attacker.
This commit is contained in:
2026-04-26 03:30:44 -04:00
parent ce4be68501
commit 00ecea924a
2 changed files with 84 additions and 0 deletions

View File

@@ -248,6 +248,17 @@ async def _update_profiles(
record = _build_record(ip, events, traversal, bounties, commands) record = _build_record(ip, events, traversal, bounties, commands)
attacker_uuid = await repo.upsert_attacker(record) attacker_uuid = await repo.upsert_attacker(record)
# Backfill Credential.attacker_uuid for every credential row
# captured before the profiler had minted this Attacker. The
# capture path runs before the profiler — coupling them would
# create a chicken-and-egg ordering bug. Soft-fail so a backfill
# error never blocks the next attacker.
try:
await repo.update_credential_attacker_uuid(ip, attacker_uuid)
except Exception as exc:
_span.record_exception(exc)
logger.error("attacker worker: credential backfill failed for %s: %s", ip, exc)
_span.set_attribute("is_traversal", traversal is not None) _span.set_attribute("is_traversal", traversal is not None)
_span.set_attribute("bounty_count", len(bounties)) _span.set_attribute("bounty_count", len(bounties))
_span.set_attribute("command_count", len(commands)) _span.set_attribute("command_count", len(commands))

View File

@@ -138,6 +138,7 @@ def _make_repo(logs=None, bounties=None, bounties_for_ips=None, max_log_id=0, sa
repo.set_state = AsyncMock() repo.set_state = AsyncMock()
repo.upsert_attacker = AsyncMock(return_value="mock-uuid") repo.upsert_attacker = AsyncMock(return_value="mock-uuid")
repo.upsert_attacker_behavior = AsyncMock() repo.upsert_attacker_behavior = AsyncMock()
repo.update_credential_attacker_uuid = AsyncMock(return_value=0)
return repo return repo
@@ -697,6 +698,78 @@ class TestAttackerProfileWorker:
assert initialized is False assert initialized is False
# ─── Credential.attacker_uuid backfill ───────────────────────────────────────
class TestCredentialBackfill:
"""Profiler must call repo.update_credential_attacker_uuid(ip, uuid)
after every successful upsert_attacker so credentials captured before
the attacker was minted get retroactively linked.
"""
@pytest.mark.asyncio
async def test_backfill_called_per_attacker(self):
rows = [
_make_log_row(
row_id=i + 1,
raw_line=_make_raw_line("ssh", "decky-01", "conn", ip, _TS1),
attacker_ip=ip,
)
for i, ip in enumerate(["1.1.1.1", "2.2.2.2"])
]
repo = _make_repo(logs=rows, max_log_id=2)
state = _WorkerState()
await _incremental_update(repo, state)
assert repo.update_credential_attacker_uuid.await_count == 2
called_with = {
(c.args[0], c.args[1])
for c in repo.update_credential_attacker_uuid.call_args_list
}
assert called_with == {("1.1.1.1", "mock-uuid"), ("2.2.2.2", "mock-uuid")}
@pytest.mark.asyncio
async def test_backfill_uses_returned_uuid(self):
"""The (ip, uuid) pair passed to backfill must match the upsert result."""
raw = _make_raw_line("ssh", "decky-01", "conn", "9.9.9.9", _TS1)
repo = _make_repo(
logs=[_make_log_row(row_id=1, raw_line=raw, attacker_ip="9.9.9.9")],
max_log_id=1,
)
repo.upsert_attacker = AsyncMock(return_value="uuid-for-9999")
state = _WorkerState()
await _incremental_update(repo, state)
repo.update_credential_attacker_uuid.assert_awaited_once_with(
"9.9.9.9", "uuid-for-9999"
)
@pytest.mark.asyncio
async def test_backfill_failure_does_not_crash_worker(self):
"""Soft-fail: a backfill error must not block subsequent attackers."""
rows = [
_make_log_row(
row_id=i + 1,
raw_line=_make_raw_line("ssh", "decky-01", "conn", ip, _TS1),
attacker_ip=ip,
)
for i, ip in enumerate(["1.1.1.1", "2.2.2.2"])
]
repo = _make_repo(logs=rows, max_log_id=2)
repo.update_credential_attacker_uuid = AsyncMock(
side_effect=RuntimeError("DB exploded")
)
state = _WorkerState()
await _incremental_update(repo, state)
# Both attackers were still upserted and the behavior rollup ran.
assert repo.upsert_attacker.await_count == 2
assert repo.upsert_attacker_behavior.await_count == 2
# ─── JA3 bounty extraction from ingester ───────────────────────────────────── # ─── JA3 bounty extraction from ingester ─────────────────────────────────────
class TestJA3BountyExtraction: class TestJA3BountyExtraction: