From d3d9bd5aa732f91d550120e51fd2a54f946628d9 Mon Sep 17 00:00:00 2001 From: anti Date: Sun, 26 Apr 2026 05:17:25 -0400 Subject: [PATCH] feat(intel): `decnet enrich` CLI + GET /attackers/{ip}/intel endpoint CLI command mirrors the reuse-correlate shape (--poll-interval, --ttl-hours, --daemon). Run it under systemd as a sibling worker. The API endpoint returns the most recent cached row for an attacker IP or 404. Auth-gated via require_viewer like every other attacker route. Also extends the worker test with a real FakeBus so the attacker.intel.enriched publish path is exercised end-to-end (no longer a no-op against NullBus). --- decnet/cli/workers.py | 54 +++++++++++++++++ decnet/web/router/__init__.py | 2 + .../attackers/api_get_attacker_intel.py | 36 ++++++++++++ tests/intel/test_worker.py | 58 +++++++++++++++++++ tests/web/test_api_attacker_intel.py | 52 +++++++++++++++++ 5 files changed, 202 insertions(+) create mode 100644 decnet/web/router/attackers/api_get_attacker_intel.py create mode 100644 tests/web/test_api_attacker_intel.py diff --git a/decnet/cli/workers.py b/decnet/cli/workers.py index 39c8a76d..426f88ee 100644 --- a/decnet/cli/workers.py +++ b/decnet/cli/workers.py @@ -82,6 +82,60 @@ def register(app: typer.Typer) -> None: asyncio.run(_run()) + @app.command(name="enrich") + def enrich( + poll_interval_secs: float = typer.Option( + 60.0, "--poll-interval", "-i", + help="Slow-tick fallback when the bus is idle or unavailable (seconds)", + ), + ttl_hours: int = typer.Option( + 24, "--ttl-hours", + help="Cache lifetime per attacker IP — re-firings inside the window short-circuit before any HTTP egress", + ), + daemon: bool = typer.Option( + False, "--daemon", "-d", + help="Detach to background as a daemon process", + ), + ) -> None: + """Threat-intel enrichment worker — fan out per attacker IP across + configured providers (GreyNoise, AbuseIPDB, abuse.ch Feodo Tracker + + ThreatFox), cache the verdict in ``attacker_intel``, and publish + ``attacker.intel.enriched`` for SIEM-bound webhook consumers. + """ + import asyncio + from decnet.intel.worker import run_intel_loop + from decnet.web.dependencies import repo + + if daemon: + log.info( + "enrich daemonizing poll=%s ttl_hours=%d", + poll_interval_secs, ttl_hours, + ) + _utils._daemonize() + + log.info( + "enrich command invoked poll=%s ttl_hours=%d", + poll_interval_secs, ttl_hours, + ) + console.print( + f"[bold cyan]Intel enrichment starting[/] " + f"poll={poll_interval_secs}s ttl={ttl_hours}h" + ) + console.print("[dim]Press Ctrl+C to stop[/]") + + async def _run() -> None: + await repo.initialize() + await run_intel_loop( + repo, + poll_interval_secs=poll_interval_secs, + ttl_hours=ttl_hours, + ) + + try: + asyncio.run(_run()) + except KeyboardInterrupt: + console.print("\n[yellow]Intel enrichment stopped.[/]") + @app.command(name="reuse-correlate") def reuse_correlate( min_targets: int = typer.Option( diff --git a/decnet/web/router/__init__.py b/decnet/web/router/__init__.py index 40eddd1b..71fc4402 100644 --- a/decnet/web/router/__init__.py +++ b/decnet/web/router/__init__.py @@ -20,6 +20,7 @@ from .attackers.api_get_attacker_artifacts import router as attacker_artifacts_r from .attackers.api_get_attacker_transcripts import router as attacker_transcripts_router from .attackers.api_get_attacker_smtp_targets import router as attacker_smtp_targets_router from .attackers.api_get_attacker_mail import router as attacker_mail_router +from .attackers.api_get_attacker_intel import router as attacker_intel_router from .transcripts import transcripts_router from .config.api_get_config import router as config_get_router from .config.api_update_config import router as config_update_router @@ -81,6 +82,7 @@ api_router.include_router(attacker_artifacts_router) api_router.include_router(attacker_transcripts_router) api_router.include_router(attacker_smtp_targets_router) api_router.include_router(attacker_mail_router) +api_router.include_router(attacker_intel_router) # Observability api_router.include_router(stats_router) diff --git a/decnet/web/router/attackers/api_get_attacker_intel.py b/decnet/web/router/attackers/api_get_attacker_intel.py new file mode 100644 index 00000000..93d57b6c --- /dev/null +++ b/decnet/web/router/attackers/api_get_attacker_intel.py @@ -0,0 +1,36 @@ +"""GET /api/v1/attackers/{ip}/intel — latest threat-intel row for an IP.""" +from typing import Any + +from fastapi import APIRouter, Depends, HTTPException + +from decnet.telemetry import traced as _traced +from decnet.web.dependencies import repo, require_viewer + +router = APIRouter() + + +@router.get( + "/attackers/{ip}/intel", + tags=["Attacker Profiles"], + responses={ + 401: {"description": "Could not validate credentials"}, + 403: {"description": "Insufficient permissions"}, + 404: {"description": "No intel cached for this IP"}, + }, +) +@_traced("api.get_attacker_intel") +async def get_attacker_intel( + ip: str, + user: dict = Depends(require_viewer), +) -> dict[str, Any]: + """Return the most recent cached threat-intel verdict for ``ip``. + + The row is populated out-of-band by the ``decnet enrich`` worker + (typically within seconds of first observation, sub-second when the + bus is healthy). 404 means either the worker has not run yet or the + IP has never been observed by DECNET. + """ + record = await repo.get_attacker_intel_by_ip(ip) + if not record: + raise HTTPException(status_code=404, detail="No intel cached for this IP") + return record diff --git a/tests/intel/test_worker.py b/tests/intel/test_worker.py index 11319973..05a76d99 100644 --- a/tests/intel/test_worker.py +++ b/tests/intel/test_worker.py @@ -203,3 +203,61 @@ async def test_provider_error_does_not_poison_row(repo): assert row["abuseipdb_score"] is None # Aggregate reflects only the providers that responded. assert row["aggregate_verdict"] == "benign" + + +@pytest.mark.anyio +async def test_intel_enriched_event_published_to_bus(repo, monkeypatch): + """End-to-end: worker dispatches providers + publishes the event.""" + from decnet.bus.fake import FakeBus + from decnet.bus.topics import ATTACKER_INTEL_ENRICHED, attacker + + # Re-enable bus path; swap factory for a shared FakeBus instance the + # test can also subscribe to. + monkeypatch.setenv("DECNET_BUS_ENABLED", "true") + monkeypatch.setenv("DECNET_BUS_TYPE", "fake") + shared_bus = FakeBus() + + from decnet.intel import worker as worker_mod + monkeypatch.setattr( + worker_mod, "get_bus", lambda **_: shared_bus, + ) + + # Subscribe before the worker starts so we don't race the publish. + sub = shared_bus.subscribe(attacker(ATTACKER_INTEL_ENRICHED)) + await sub.__aenter__() + + now = datetime.now(timezone.utc) + await repo.upsert_attacker( + {"ip": "4.4.4.4", "first_seen": now, "last_seen": now, "event_count": 1} + ) + + provider = _FakeProvider( + "greynoise", + verdict="malicious", + column_updates={ + "greynoise_classification": "malicious", + "greynoise_raw": "{}", + "greynoise_queried_at": datetime.now(timezone.utc), + }, + ) + + shutdown = asyncio.Event() + task = asyncio.create_task( + run_intel_loop( + repo, + poll_interval_secs=0.05, + providers=[provider], + shutdown=shutdown, + ) + ) + try: + event = await asyncio.wait_for(sub.__anext__(), timeout=2.0) + finally: + shutdown.set() + await asyncio.wait_for(task, timeout=2.0) + await sub.__aexit__(None, None, None) + + payload = event.payload + assert payload["attacker_ip"] == "4.4.4.4" + assert payload["aggregate_verdict"] == "malicious" + assert payload["providers"] == ["greynoise"] diff --git a/tests/web/test_api_attacker_intel.py b/tests/web/test_api_attacker_intel.py new file mode 100644 index 00000000..b7b31405 --- /dev/null +++ b/tests/web/test_api_attacker_intel.py @@ -0,0 +1,52 @@ +"""Tests for GET /api/v1/attackers/{ip}/intel.""" +from __future__ import annotations + +from unittest.mock import AsyncMock, patch + +import pytest +from fastapi import HTTPException + + +@pytest.mark.asyncio +async def test_returns_cached_intel_row(): + from decnet.web.router.attackers.api_get_attacker_intel import ( + get_attacker_intel, + ) + + fake_row = { + "attacker_ip": "1.2.3.4", + "aggregate_verdict": "malicious", + "greynoise_classification": "malicious", + "abuseipdb_score": 92, + "feodo_listed": True, + "threatfox_listed": False, + } + with patch( + "decnet.web.router.attackers.api_get_attacker_intel.repo" + ) as mock_repo: + mock_repo.get_attacker_intel_by_ip = AsyncMock(return_value=fake_row) + result = await get_attacker_intel( + ip="1.2.3.4", + user={"uuid": "viewer", "role": "viewer"}, + ) + assert result["aggregate_verdict"] == "malicious" + assert result["abuseipdb_score"] == 92 + + +@pytest.mark.asyncio +async def test_404_when_no_row_cached(): + from decnet.web.router.attackers.api_get_attacker_intel import ( + get_attacker_intel, + ) + + with patch( + "decnet.web.router.attackers.api_get_attacker_intel.repo" + ) as mock_repo: + mock_repo.get_attacker_intel_by_ip = AsyncMock(return_value=None) + with pytest.raises(HTTPException) as excinfo: + await get_attacker_intel( + ip="0.0.0.0", + user={"uuid": "viewer", "role": "viewer"}, + ) + assert excinfo.value.status_code == 404 + assert "No intel cached" in excinfo.value.detail