diff --git a/decnet/net/__init__.py b/decnet/net/__init__.py new file mode 100644 index 00000000..7e95ea8d --- /dev/null +++ b/decnet/net/__init__.py @@ -0,0 +1,7 @@ +"""Shared network helpers. + +Currently houses :mod:`decnet.net.http` — the canonical stealth-egress +``httpx.AsyncClient`` factory for outbound calls to 3rd-party services +that should NOT see "DECNET" in their access logs (threat-intel +providers, future TI lookups, etc.). +""" diff --git a/decnet/net/http.py b/decnet/net/http.py new file mode 100644 index 00000000..3616de02 --- /dev/null +++ b/decnet/net/http.py @@ -0,0 +1,59 @@ +"""Stealth-egress httpx.AsyncClient factory. + +Per the project's stealth posture, outbound calls to *third-party* +services (threat-intel providers, public APIs) MUST NOT advertise +"DECNET" in their User-Agent or other request fingerprints — operators +running honeypots want their reconnaissance dependencies to look like +generic infra, not like a tagged tool. + +Canonical helper for any future module that needs to call a public API +without leaking the DECNET label. Internal calls (worker → operator's +own SIEM via webhook, swarm agent → master) deliberately keep +DECNET-tagged user-agents because the recipient wants the audit trail — +do NOT route those through this client. + +Usage:: + + from decnet.net.http import stealth_client + + async with stealth_client() as client: + resp = await client.get("https://api.greynoise.io/v3/community/1.2.3.4") + +The chosen UA mimics ``curl`` because it's the single most common +"non-browser, non-named-tool" UA on the public internet — anti-bot +filters routinely permit it, and an attacker who got a peek at our +egress wouldn't learn anything more specific than "something used curl". +""" +from __future__ import annotations + +from typing import Optional + +import httpx + +# Pinned to a recent-but-not-bleeding-edge curl release. Bump on the +# normal cadence; anything in-distribution is fine. Keep this string as +# the single source of truth so future stealth helpers (browser-shaped, +# Go-shaped) live as siblings, not divergent constants. +DEFAULT_STEALTH_USER_AGENT: str = "curl/7.88.1" + + +def stealth_client( + *, + timeout: float = 10.0, + user_agent: Optional[str] = None, + follow_redirects: bool = False, +) -> httpx.AsyncClient: + """Return an httpx.AsyncClient with a generic stealth User-Agent. + + Returns a fresh client per call — callers own the lifecycle and + SHOULD use ``async with`` to ensure connection-pool teardown. + + ``follow_redirects`` defaults to ``False`` because most threat-intel + APIs return canonical URLs and a redirect typically signals an auth + or path mistake we'd rather surface than chase. + """ + return httpx.AsyncClient( + headers={"User-Agent": user_agent or DEFAULT_STEALTH_USER_AGENT}, + timeout=timeout, + follow_redirects=follow_redirects, + ) diff --git a/tests/intel/test_stealth_http.py b/tests/intel/test_stealth_http.py new file mode 100644 index 00000000..d5614a75 --- /dev/null +++ b/tests/intel/test_stealth_http.py @@ -0,0 +1,65 @@ +"""Stealth-egress HTTP client must NOT advertise DECNET. + +Captures the request that the client emits (using httpx's MockTransport) +and asserts the User-Agent never contains a DECNET marker. This is the +most important contract on the file — every threat-intel egress path +inherits it. +""" +from __future__ import annotations + +import httpx +import pytest + +from decnet.net.http import DEFAULT_STEALTH_USER_AGENT, stealth_client + + +_FORBIDDEN_TOKENS = ("decnet", "honeypot", "decoy", "deck") + + +@pytest.mark.anyio +async def test_default_user_agent_is_curl_shaped(): + captured: list[httpx.Request] = [] + + async def _handler(request: httpx.Request) -> httpx.Response: + captured.append(request) + return httpx.Response(200, json={"ok": True}) + + transport = httpx.MockTransport(_handler) + async with stealth_client() as base: + # Swap transport to keep test offline. + base._transport = transport # noqa: SLF001 — internal field, deliberate + await base.get("https://api.example.test/check") + + ua = captured[0].headers.get("user-agent", "") + assert ua == DEFAULT_STEALTH_USER_AGENT + lower = ua.lower() + for token in _FORBIDDEN_TOKENS: + assert token not in lower, f"stealth UA leaked {token!r}: {ua!r}" + + +@pytest.mark.anyio +async def test_custom_user_agent_override_takes_effect(): + captured: list[httpx.Request] = [] + + async def _handler(request: httpx.Request) -> httpx.Response: + captured.append(request) + return httpx.Response(200) + + transport = httpx.MockTransport(_handler) + async with stealth_client(user_agent="Mozilla/5.0 (test)") as base: + base._transport = transport # noqa: SLF001 + await base.get("https://api.example.test/") + + assert captured[0].headers["user-agent"] == "Mozilla/5.0 (test)" + + +@pytest.mark.anyio +async def test_redirects_do_not_follow_by_default(): + async def _handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(302, headers={"Location": "https://elsewhere/"}) + + transport = httpx.MockTransport(_handler) + async with stealth_client() as base: + base._transport = transport # noqa: SLF001 + resp = await base.get("https://api.example.test/") + assert resp.status_code == 302