refactor(intel): re-key attacker_intel on attacker_uuid (closes DEBT-041)
The threat-intel surface was IP-keyed on day one as an expedient — the
worker is woken by IP-bearing bus events. ANTI's call: don't carry that
debt. NO IPs as primary keys anywhere on the attacker-intel surface.
Schema:
- attacker_uuid is now the canonical key — UNIQUE + FK to attackers.uuid.
- attacker_ip stays as a denormalised, indexed, NON-UNIQUE value column.
Updated on every upsert; useful for SIEM payloads and audit lookups,
but explicitly NOT a key. Model docstring says so.
- Pre-v1, no Alembic migration needed. SQLModel.metadata.create_all()
builds the new shape on fresh DBs; it does not alter an existing
attacker_intel table, so existing dev DBs must be recreated.
Repo:
- upsert_attacker_intel now keys on attacker_uuid.
- get_attacker_intel_by_ip → get_attacker_intel_by_uuid.
- get_unenriched_attacker_ips → get_unenriched_attackers, returning
a list of {"uuid", "ip"} dicts so the worker writes by UUID and dispatches
provider calls by IP without a second round-trip.
Worker:
- _enrich_one(uuid, ip, ...) — UUID lands on the row, IP rides for
provider egress.
- attacker.intel.enriched bus payload gains attacker_uuid alongside
attacker_ip — purely additive (no field removed), so webhook → SIEM
consumers keep working and gain the UUID.
API:
- GET /api/v1/attackers/{ip}/intel deleted outright (rip-and-replace,
never deployed beyond dev).
- GET /api/v1/attackers/{uuid}/intel is the only public route, matching
every other /attackers/* route.
Frontend:
- <IntelPanel uuid={id!} /> uses the URL param directly, fetches in
parallel with the rest of AttackerDetail rather than waiting on
attacker.ip.
Tests: re-keyed in place, 39 passed (same coverage as before the
refactor). Provider-impl tests untouched.
DEBT-041: closed in DEBT.md (entry preserved as historical rationale,
summary table flipped to ✅, remaining-open list shortened by one).
This commit is contained in:
@@ -60,11 +60,17 @@ def _aggregate(verdicts: list[Optional[str]]) -> Optional[str]:
|
||||
|
||||
|
||||
async def _enrich_one(
|
||||
attacker_uuid: str,
|
||||
ip: str,
|
||||
providers: list[IntelProvider],
|
||||
ttl_hours: int,
|
||||
) -> dict[str, Any]:
|
||||
"""Fan out across providers for a single IP and assemble the row update."""
|
||||
"""Fan out across providers for a single attacker and assemble the row.
|
||||
|
||||
Keyed on ``attacker_uuid`` for the eventual upsert; the IP is the wire
|
||||
value the providers see and is denormalised onto the row for SIEM /
|
||||
audit consumers.
|
||||
"""
|
||||
results: list[IntelResult] = await asyncio.gather(
|
||||
*(p.lookup(ip) for p in providers),
|
||||
return_exceptions=False, # providers contractually never raise
|
||||
@@ -72,6 +78,7 @@ async def _enrich_one(
|
||||
|
||||
now = datetime.now(timezone.utc)
|
||||
row: dict[str, Any] = {
|
||||
"attacker_uuid": attacker_uuid,
|
||||
"attacker_ip": ip,
|
||||
"cached_at": now,
|
||||
"expires_at": now + timedelta(hours=ttl_hours),
|
||||
@@ -144,7 +151,7 @@ async def run_intel_loop(
|
||||
try:
|
||||
while not shutdown.is_set():
|
||||
try:
|
||||
pending = await repo.get_unenriched_attacker_ips(
|
||||
pending = await repo.get_unenriched_attackers(
|
||||
limit=backfill_batch,
|
||||
)
|
||||
except Exception: # noqa: BLE001
|
||||
@@ -152,16 +159,21 @@ async def run_intel_loop(
|
||||
pending = []
|
||||
|
||||
if pending and providers:
|
||||
for ip in pending:
|
||||
for entry in pending:
|
||||
if shutdown.is_set():
|
||||
break
|
||||
attacker_uuid = entry["uuid"]
|
||||
ip = entry["ip"]
|
||||
try:
|
||||
row = await _enrich_one(ip, providers, ttl_hours)
|
||||
row = await _enrich_one(
|
||||
attacker_uuid, ip, providers, ttl_hours,
|
||||
)
|
||||
await repo.upsert_attacker_intel(row)
|
||||
await publish_safely(
|
||||
bus,
|
||||
_topics.attacker(_topics.ATTACKER_INTEL_ENRICHED),
|
||||
{
|
||||
"attacker_uuid": attacker_uuid,
|
||||
"attacker_ip": ip,
|
||||
"aggregate_verdict": row.get("aggregate_verdict"),
|
||||
"providers": [p.name for p in providers],
|
||||
@@ -170,7 +182,8 @@ async def run_intel_loop(
|
||||
)
|
||||
except Exception: # noqa: BLE001
|
||||
log.exception(
|
||||
"intel worker: enrichment failed for ip=%s", ip,
|
||||
"intel worker: enrichment failed for uuid=%s ip=%s",
|
||||
attacker_uuid, ip,
|
||||
)
|
||||
|
||||
try:
|
||||
|
||||
@@ -30,8 +30,18 @@ class AttackerIntel(SQLModel, table=True):
|
||||
__tablename__ = "attacker_intel"
|
||||
|
||||
uuid: str = Field(primary_key=True) # uuid.uuid4().hex, generated by writer
|
||||
attacker_uuid: Optional[str] = Field(default=None, index=True)
|
||||
attacker_ip: str = Field(index=True, unique=True)
|
||||
# Canonical key. One intel row per attacker UUID; FK guarantees no orphan
|
||||
# rows when an attacker is deleted, and UNIQUE keeps upserts honest.
|
||||
attacker_uuid: str = Field(
|
||||
foreign_key="attackers.uuid",
|
||||
unique=True,
|
||||
index=True,
|
||||
)
|
||||
# DENORMALISED — NOT a key. The IP the worker queried providers with at
|
||||
# write time. Useful for SIEM payloads and audit lookups; updated on every
|
||||
# upsert if the attacker rotates IPs. Never use this column as a lookup
|
||||
# key; ``attacker_uuid`` is the only canonical identifier here.
|
||||
attacker_ip: str = Field(index=True)
|
||||
schema_version: int = Field(default=1)
|
||||
|
||||
# ── GreyNoise Community ─────────────────────────────────────────────
|
||||
|
||||
@@ -280,23 +280,31 @@ class BaseRepository(ABC):
|
||||
|
||||
@abstractmethod
|
||||
async def upsert_attacker_intel(self, data: dict[str, Any]) -> str:
|
||||
"""Insert or update the threat-intel row for an attacker IP.
|
||||
"""Insert or update the threat-intel row for an attacker UUID.
|
||||
|
||||
``data`` MUST include ``attacker_ip`` and ``expires_at``. Returns
|
||||
the row UUID. Used by the ``decnet enrich`` worker.
|
||||
``data`` MUST include ``attacker_uuid``, ``attacker_ip`` and
|
||||
``expires_at``. Returns the row UUID. Keyed on ``attacker_uuid``
|
||||
(UNIQUE + FK to ``attackers.uuid``); ``attacker_ip`` is denormalised
|
||||
— it gets overwritten on every upsert if the attacker rotates IPs.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def get_attacker_intel_by_ip(self, ip: str) -> Optional[dict[str, Any]]:
|
||||
"""Return the threat-intel row for ``ip`` or ``None`` if missing."""
|
||||
async def get_attacker_intel_by_uuid(self, uuid: str) -> Optional[dict[str, Any]]:
|
||||
"""Return the threat-intel row for ``uuid`` or ``None`` if missing."""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def get_unenriched_attacker_ips(self, limit: int = 100) -> list[str]:
|
||||
"""List attacker IPs with no intel row OR whose row is past expires_at.
|
||||
async def get_unenriched_attackers(
|
||||
self, limit: int = 100,
|
||||
) -> list[dict[str, Any]]:
|
||||
"""List ``{"uuid", "ip"}`` pairs for attackers with no intel row OR
|
||||
whose row is past ``expires_at``.
|
||||
|
||||
Used by the enrich worker to backfill on startup and on each wake.
|
||||
Returns both fields so the worker can write keyed on UUID without
|
||||
a second per-attacker DB round-trip to resolve the IP for outbound
|
||||
provider calls.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
@@ -1197,10 +1197,12 @@ class SQLModelRepository(BaseRepository):
|
||||
return row.model_dump(mode="json")
|
||||
|
||||
async def upsert_attacker_intel(self, data: dict[str, Any]) -> str:
|
||||
ip = data["attacker_ip"]
|
||||
attacker_uuid_value = data["attacker_uuid"]
|
||||
async with self._session() as session:
|
||||
result = await session.execute(
|
||||
select(AttackerIntel).where(AttackerIntel.attacker_ip == ip)
|
||||
select(AttackerIntel).where(
|
||||
AttackerIntel.attacker_uuid == attacker_uuid_value,
|
||||
)
|
||||
)
|
||||
existing = result.scalar_one_or_none()
|
||||
if existing:
|
||||
@@ -1214,13 +1216,13 @@ class SQLModelRepository(BaseRepository):
|
||||
await session.commit()
|
||||
return row_uuid
|
||||
|
||||
async def get_attacker_intel_by_ip(
|
||||
async def get_attacker_intel_by_uuid(
|
||||
self,
|
||||
ip: str,
|
||||
uuid: str,
|
||||
) -> Optional[dict[str, Any]]:
|
||||
async with self._session() as session:
|
||||
result = await session.execute(
|
||||
select(AttackerIntel).where(AttackerIntel.attacker_ip == ip)
|
||||
select(AttackerIntel).where(AttackerIntel.attacker_uuid == uuid)
|
||||
)
|
||||
row = result.scalar_one_or_none()
|
||||
if not row:
|
||||
@@ -1240,17 +1242,23 @@ class SQLModelRepository(BaseRepository):
|
||||
pass
|
||||
return d
|
||||
|
||||
async def get_unenriched_attacker_ips(self, limit: int = 100) -> list[str]:
|
||||
"""IPs in ``attackers`` with no intel row OR a stale (expired) one.
|
||||
async def get_unenriched_attackers(
|
||||
self, limit: int = 100,
|
||||
) -> list[dict[str, Any]]:
|
||||
"""``{"uuid", "ip"}`` pairs with no intel row OR a stale (expired) one.
|
||||
|
||||
Stale = ``expires_at < now``. Ordered by ``attackers.last_seen`` desc
|
||||
so the worker prioritises recent activity on backfill.
|
||||
so the worker prioritises recent activity on backfill. Both columns
|
||||
are projected so the worker can write keyed on UUID and dispatch
|
||||
provider calls keyed on IP without a second round-trip.
|
||||
"""
|
||||
now = datetime.now(timezone.utc)
|
||||
async with self._session() as session:
|
||||
stmt = (
|
||||
select(Attacker.ip)
|
||||
.outerjoin(AttackerIntel, AttackerIntel.attacker_ip == Attacker.ip)
|
||||
select(Attacker.uuid, Attacker.ip)
|
||||
.outerjoin(
|
||||
AttackerIntel, AttackerIntel.attacker_uuid == Attacker.uuid,
|
||||
)
|
||||
.where(
|
||||
or_(
|
||||
AttackerIntel.uuid.is_(None),
|
||||
@@ -1261,7 +1269,10 @@ class SQLModelRepository(BaseRepository):
|
||||
.limit(limit)
|
||||
)
|
||||
result = await session.execute(stmt)
|
||||
return [row for row in result.scalars().all()]
|
||||
return [
|
||||
{"uuid": uuid_, "ip": ip}
|
||||
for uuid_, ip in result.all()
|
||||
]
|
||||
|
||||
async def increment_smtp_target(self, attacker_uuid: str, domain: str) -> None:
|
||||
"""Upsert an (attacker_uuid, domain) pair and bump count + last_seen.
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
"""GET /api/v1/attackers/{ip}/intel — latest threat-intel row for an IP."""
|
||||
"""GET /api/v1/attackers/{uuid}/intel — latest threat-intel row for an attacker."""
|
||||
from typing import Any
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException
|
||||
@@ -10,27 +10,29 @@ router = APIRouter()
|
||||
|
||||
|
||||
@router.get(
|
||||
"/attackers/{ip}/intel",
|
||||
"/attackers/{uuid}/intel",
|
||||
tags=["Attacker Profiles"],
|
||||
responses={
|
||||
401: {"description": "Could not validate credentials"},
|
||||
403: {"description": "Insufficient permissions"},
|
||||
404: {"description": "No intel cached for this IP"},
|
||||
404: {"description": "No intel cached for this attacker"},
|
||||
},
|
||||
)
|
||||
@_traced("api.get_attacker_intel")
|
||||
async def get_attacker_intel(
|
||||
ip: str,
|
||||
uuid: str,
|
||||
user: dict = Depends(require_viewer),
|
||||
) -> dict[str, Any]:
|
||||
"""Return the most recent cached threat-intel verdict for ``ip``.
|
||||
"""Return the most recent cached threat-intel verdict for an attacker.
|
||||
|
||||
The row is populated out-of-band by the ``decnet enrich`` worker
|
||||
(typically within seconds of first observation, sub-second when the
|
||||
bus is healthy). 404 means either the worker has not run yet or the
|
||||
IP has never been observed by DECNET.
|
||||
UUID does not correspond to an attacker DECNET has seen.
|
||||
"""
|
||||
record = await repo.get_attacker_intel_by_ip(ip)
|
||||
record = await repo.get_attacker_intel_by_uuid(uuid)
|
||||
if not record:
|
||||
raise HTTPException(status_code=404, detail="No intel cached for this IP")
|
||||
raise HTTPException(
|
||||
status_code=404, detail="No intel cached for this attacker",
|
||||
)
|
||||
return record
|
||||
|
||||
Reference in New Issue
Block a user