From 0dd3811436e55c9fdfc2cd6e88b55d7989847c32 Mon Sep 17 00:00:00 2001 From: anti Date: Sun, 26 Apr 2026 04:56:47 -0400 Subject: [PATCH] feat(intel): attacker_intel table + repo helpers New TTL-cached threat-intel row keyed by attacker IP, with per-provider verdict/raw/queried_at columns for GreyNoise, AbuseIPDB, abuse.ch Feodo Tracker and ThreatFox. Carries schema_version from day one (federation wire-format precedent set by SessionProfile). Repo gains upsert_attacker_intel, get_attacker_intel_by_ip, and a get_unenriched_attacker_ips backfill primitive that picks fresh + stale rows for the forthcoming 'decnet enrich' worker. Also documents the open-source intel-source backlog in DEVELOPMENT_V2. --- decnet/web/db/models/__init__.py | 4 + decnet/web/db/models/attacker_intel.py | 83 +++++++++++++++++ decnet/web/db/repository.py | 22 +++++ decnet/web/db/sqlmodel_repo.py | 68 ++++++++++++++ development/DEVELOPMENT_V2.md | 49 ++++++++++ tests/intel/__init__.py | 0 tests/intel/test_attacker_intel_repo.py | 113 ++++++++++++++++++++++++ 7 files changed, 339 insertions(+) create mode 100644 decnet/web/db/models/attacker_intel.py create mode 100644 tests/intel/__init__.py create mode 100644 tests/intel/test_attacker_intel_repo.py diff --git a/decnet/web/db/models/__init__.py b/decnet/web/db/models/__init__.py index c55f99ac..06f76ddc 100644 --- a/decnet/web/db/models/__init__.py +++ b/decnet/web/db/models/__init__.py @@ -35,6 +35,9 @@ from .attackers import ( SessionProfile, SmtpTarget, ) +from .attacker_intel import ( + AttackerIntel, +) from .deploy import ( DeployIniRequest, DeployResponse, @@ -157,6 +160,7 @@ __all__ = [ # attackers "Attacker", "AttackerBehavior", + "AttackerIntel", "AttackersResponse", "SessionProfile", "SmtpTarget", diff --git a/decnet/web/db/models/attacker_intel.py b/decnet/web/db/models/attacker_intel.py new file mode 100644 index 00000000..4454b835 --- /dev/null +++ b/decnet/web/db/models/attacker_intel.py @@ -0,0 +1,83 @@ +"""Threat-intel enrichment row — one per attacker IP, TTL-cached.""" +from datetime import datetime, timezone +from typing import Optional + +from sqlalchemy import Column +from sqlmodel import Field, SQLModel + +from ._base import _BIG_TEXT + + +class AttackerIntel(SQLModel, table=True): + """Aggregated threat-intel verdict for a single attacker IP. + + Populated by the ``decnet enrich`` worker, which queries multiple + free-tier intel providers (GreyNoise Community, AbuseIPDB, + abuse.ch Feodo Tracker + ThreatFox) and writes one row per + attacker IP. The row is TTL-cached via ``expires_at`` so re-firings + inside the cache window short-circuit before any HTTP egress. + + Per-provider columns are nullable until each provider has answered; + the enrichment pass writes whichever providers succeeded and leaves + the rest unchanged on a partial failure. + + ``schema_version`` is committed to storage from day one — federation + gossip in v2/v3 requires cross-operator compatibility, and + retrofitting a version column after rows exist is painful. Mirrors + the rationale on :class:`SessionProfile`. + """ + + __tablename__ = "attacker_intel" + + uuid: str = Field(primary_key=True) # uuid.uuid4().hex, generated by writer + attacker_uuid: Optional[str] = Field(default=None, index=True) + attacker_ip: str = Field(index=True, unique=True) + schema_version: int = Field(default=1) + + # ── GreyNoise Community ───────────────────────────────────────────── + # classification ∈ {"benign", "malicious", "suspicious", "unknown"} + greynoise_classification: Optional[str] = Field(default=None, max_length=32) + greynoise_raw: str = Field( + default="{}", + sa_column=Column("greynoise_raw", _BIG_TEXT, nullable=False, default="{}"), + ) + greynoise_queried_at: Optional[datetime] = Field(default=None) + + # ── AbuseIPDB ──────────────────────────────────────────────────────── + # 0..100 abuse confidence score + abuseipdb_score: Optional[int] = Field(default=None) + abuseipdb_raw: str = Field( + default="{}", + sa_column=Column("abuseipdb_raw", _BIG_TEXT, nullable=False, default="{}"), + ) + abuseipdb_queried_at: Optional[datetime] = Field(default=None) + + # ── abuse.ch Feodo Tracker ─────────────────────────────────────────── + feodo_listed: Optional[bool] = Field(default=None) + feodo_raw: str = Field( + default="{}", + sa_column=Column("feodo_raw", _BIG_TEXT, nullable=False, default="{}"), + ) + feodo_queried_at: Optional[datetime] = Field(default=None) + + # ── abuse.ch ThreatFox ─────────────────────────────────────────────── + threatfox_listed: Optional[bool] = Field(default=None) + threatfox_raw: str = Field( + default="{}", + sa_column=Column("threatfox_raw", _BIG_TEXT, nullable=False, default="{}"), + ) + threatfox_queried_at: Optional[datetime] = Field(default=None) + + # ── Aggregate verdict ──────────────────────────────────────────────── + # Synthesised from per-provider columns. ∈ {"malicious", "suspicious", + # "benign", "unknown"}. Used by the dashboard and webhook consumers + # that don't want to reason over four provider columns. + aggregate_verdict: Optional[str] = Field( + default=None, max_length=32, index=True + ) + + # ── TTL bookkeeping ────────────────────────────────────────────────── + cached_at: datetime = Field( + default_factory=lambda: datetime.now(timezone.utc), index=True + ) + expires_at: datetime = Field(index=True) diff --git a/decnet/web/db/repository.py b/decnet/web/db/repository.py index 7260b220..9414aacc 100644 --- a/decnet/web/db/repository.py +++ b/decnet/web/db/repository.py @@ -278,6 +278,28 @@ class BaseRepository(ABC): """Retrieve the keystroke-dynamics profile row for a session.""" pass + @abstractmethod + async def upsert_attacker_intel(self, data: dict[str, Any]) -> str: + """Insert or update the threat-intel row for an attacker IP. + + ``data`` MUST include ``attacker_ip`` and ``expires_at``. Returns + the row UUID. Used by the ``decnet enrich`` worker. + """ + pass + + @abstractmethod + async def get_attacker_intel_by_ip(self, ip: str) -> Optional[dict[str, Any]]: + """Return the threat-intel row for ``ip`` or ``None`` if missing.""" + pass + + @abstractmethod + async def get_unenriched_attacker_ips(self, limit: int = 100) -> list[str]: + """List attacker IPs with no intel row OR whose row is past expires_at. + + Used by the enrich worker to backfill on startup and on each wake. + """ + pass + @abstractmethod async def increment_smtp_target(self, attacker_uuid: str, domain: str) -> None: """ diff --git a/decnet/web/db/sqlmodel_repo.py b/decnet/web/db/sqlmodel_repo.py index 8a72aa38..77e25c28 100644 --- a/decnet/web/db/sqlmodel_repo.py +++ b/decnet/web/db/sqlmodel_repo.py @@ -36,6 +36,7 @@ from decnet.web.db.models import ( State, Attacker, AttackerBehavior, + AttackerIntel, SessionProfile, SmtpTarget, SwarmHost, @@ -1195,6 +1196,73 @@ class SQLModelRepository(BaseRepository): return None return row.model_dump(mode="json") + async def upsert_attacker_intel(self, data: dict[str, Any]) -> str: + ip = data["attacker_ip"] + async with self._session() as session: + result = await session.execute( + select(AttackerIntel).where(AttackerIntel.attacker_ip == ip) + ) + existing = result.scalar_one_or_none() + if existing: + for k, v in data.items(): + setattr(existing, k, v) + session.add(existing) + row_uuid = existing.uuid + else: + row_uuid = uuid.uuid4().hex + session.add(AttackerIntel(uuid=row_uuid, **data)) + await session.commit() + return row_uuid + + async def get_attacker_intel_by_ip( + self, + ip: str, + ) -> Optional[dict[str, Any]]: + async with self._session() as session: + result = await session.execute( + select(AttackerIntel).where(AttackerIntel.attacker_ip == ip) + ) + row = result.scalar_one_or_none() + if not row: + return None + d = row.model_dump(mode="json") + for key in ( + "greynoise_raw", + "abuseipdb_raw", + "feodo_raw", + "threatfox_raw", + ): + raw = d.get(key) + if isinstance(raw, str): + try: + d[key] = json.loads(raw) + except (json.JSONDecodeError, TypeError): + pass + return d + + async def get_unenriched_attacker_ips(self, limit: int = 100) -> list[str]: + """IPs in ``attackers`` with no intel row OR a stale (expired) one. + + Stale = ``expires_at < now``. Ordered by ``attackers.last_seen`` desc + so the worker prioritises recent activity on backfill. + """ + now = datetime.now(timezone.utc) + async with self._session() as session: + stmt = ( + select(Attacker.ip) + .outerjoin(AttackerIntel, AttackerIntel.attacker_ip == Attacker.ip) + .where( + or_( + AttackerIntel.uuid.is_(None), + AttackerIntel.expires_at < now, + ) + ) + .order_by(desc(Attacker.last_seen)) + .limit(limit) + ) + result = await session.execute(stmt) + return [row for row in result.scalars().all()] + async def increment_smtp_target(self, attacker_uuid: str, domain: str) -> None: """Upsert an (attacker_uuid, domain) pair and bump count + last_seen. diff --git a/development/DEVELOPMENT_V2.md b/development/DEVELOPMENT_V2.md index ce11eb45..58b1638e 100644 --- a/development/DEVELOPMENT_V2.md +++ b/development/DEVELOPMENT_V2.md @@ -543,3 +543,52 @@ push-only covers it. query surface. - Channel 2 — pull: scope-verified, exact-match, prospective-only, aggregate-response, rate-limited. + +--- + +## Threat Intel Enrichment — Provider Backlog + +Long list of candidate sources for `decnet/intel/`. Open / free-tier +prioritized; Shodan is the explicit paid exception. v1 ships three +(GreyNoise Community, AbuseIPDB, abuse.ch); the rest are post-v1 fodder +slotted in as demand surfaces. + +### Reputation / abuse reports +- AbuseIPDB — community abuse scores, free 1k/day **[v1]** +- CrowdSec CTI — community blocklist API, free +- Spamhaus DROP/EDROP — hijacked netblocks, free +- CINS Score (Sentinel IPS) — reputation feed, free +- FireHOL IP lists — aggregated reputation (GitHub), free +- Project Honey Pot HTTP:BL — DNSBL for HTTP attackers, free +- Emerging Threats open — free blocklist + +### Scanner / noise classification +- GreyNoise Community API — purpose-built for honeypot noise filtering, free **[v1]** +- DShield / SANS ISC — scanned-IP feeds, free +- Tor Project exit-node list — free, no key + +### Active C2 / malware attribution +- abuse.ch Feodo Tracker — botnet C2 IPs, free, no key **[v1]** +- abuse.ch ThreatFox — IOCs from malware analysis, free **[v1]** +- abuse.ch URLhaus — malicious URLs, free +- abuse.ch SSLBL — malicious TLS certs, free +- abuse.ch MalwareBazaar — payload hashes (pairs with payload capture) +- AlienVault OTX — pulse-based IOCs, free with key + +### Host scan / infrastructure +- Shodan — paid, cheap tiers (approved exception) +- Censys — free tier, host scan data +- BinaryEdge — ~250/mo free +- CIRCL passive DNS / passive SSL — free for researchers +- VirusTotal — 4 lookups/min free + +### Network ownership / geo +- Team Cymru IP-to-ASN whois — free DNS-based, no key +- IPinfo — free tier, ASN/company +- MaxMind GeoLite2 — already in use (GeoIP mapping) + +### Misc +- Cloudflare Radar — aggregate intel, free +- Pulsedive — IOC enrichment, free tier +- MISP communities — federated OSINT + diff --git a/tests/intel/__init__.py b/tests/intel/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/intel/test_attacker_intel_repo.py b/tests/intel/test_attacker_intel_repo.py new file mode 100644 index 00000000..d17c57fb --- /dev/null +++ b/tests/intel/test_attacker_intel_repo.py @@ -0,0 +1,113 @@ +""" +Round-trip tests for the ``attacker_intel`` table and its repo helpers. + +Covers: +* empty-write upsert path +* per-provider partial update +* JSON-blob deserialization on read +* TTL bookkeeping (cached_at + expires_at) round-trips intact +* ``get_unenriched_attacker_ips`` selects fresh + stale, skips cached +""" +from __future__ import annotations + +from datetime import datetime, timedelta, timezone + +import pytest + +from decnet.web.db.factory import get_repository + + +@pytest.fixture +async def repo(tmp_path): + r = get_repository(db_path=str(tmp_path / "attacker_intel.db")) + await r.initialize() + return r + + +def _intel_payload(ip: str, *, ttl_hours: int = 24, **overrides) -> dict: + now = datetime.now(timezone.utc) + base = { + "attacker_ip": ip, + "cached_at": now, + "expires_at": now + timedelta(hours=ttl_hours), + } + base.update(overrides) + return base + + +@pytest.mark.anyio +async def test_empty_upsert_writes_minimal_row(repo): + row_uuid = await repo.upsert_attacker_intel(_intel_payload("1.2.3.4")) + assert row_uuid + + row = await repo.get_attacker_intel_by_ip("1.2.3.4") + assert row is not None + assert row["attacker_ip"] == "1.2.3.4" + assert row["uuid"] == row_uuid + assert row["schema_version"] == 1 + # All per-provider verdicts default to None. + assert row["greynoise_classification"] is None + assert row["abuseipdb_score"] is None + assert row["feodo_listed"] is None + assert row["threatfox_listed"] is None + assert row["aggregate_verdict"] is None + + +@pytest.mark.anyio +async def test_partial_provider_update_preserves_others(repo): + # First pass: GreyNoise responds, others lag. + first_uuid = await repo.upsert_attacker_intel( + _intel_payload( + "9.9.9.9", + greynoise_classification="malicious", + greynoise_raw='{"classification":"malicious"}', + greynoise_queried_at=datetime.now(timezone.utc), + ) + ) + # Second pass: AbuseIPDB lands. Re-upsert MUST NOT clobber GreyNoise + # columns — the worker passes only the new fields. + second_uuid = await repo.upsert_attacker_intel( + _intel_payload( + "9.9.9.9", + abuseipdb_score=85, + abuseipdb_raw='{"abuseConfidenceScore":85}', + abuseipdb_queried_at=datetime.now(timezone.utc), + ) + ) + assert first_uuid == second_uuid # same row + + row = await repo.get_attacker_intel_by_ip("9.9.9.9") + assert row["greynoise_classification"] == "malicious" + assert row["greynoise_raw"] == {"classification": "malicious"} + assert row["abuseipdb_score"] == 85 + assert row["abuseipdb_raw"] == {"abuseConfidenceScore": 85} + + +@pytest.mark.anyio +async def test_get_missing_returns_none(repo): + assert await repo.get_attacker_intel_by_ip("0.0.0.0") is None + + +@pytest.mark.anyio +async def test_unenriched_selects_fresh_and_stale_ips(repo): + # Seed three attackers via upsert_attacker. + now = datetime.now(timezone.utc) + for ip in ("10.0.0.1", "10.0.0.2", "10.0.0.3"): + await repo.upsert_attacker( + { + "ip": ip, + "first_seen": now, + "last_seen": now, + "event_count": 1, + } + ) + # 10.0.0.1 has fresh intel (not due for refresh). + await repo.upsert_attacker_intel(_intel_payload("10.0.0.1", ttl_hours=24)) + # 10.0.0.2 has stale intel (already expired). + await repo.upsert_attacker_intel(_intel_payload("10.0.0.2", ttl_hours=-1)) + # 10.0.0.3 has no intel row at all. + + pending = await repo.get_unenriched_attacker_ips(limit=10) + assert "10.0.0.1" not in pending # fresh, skipped + assert "10.0.0.2" in pending # stale, queue it + assert "10.0.0.3" in pending # never enriched