feat(net): stealth-egress httpx client factory
Outbound calls to 3rd-party services (threat-intel providers, future TI lookups) MUST NOT advertise 'DECNET' in their user-agent — operators running honeypots want their reconnaissance dependencies to look like generic infra. New decnet.net.http.stealth_client() returns a fresh httpx.AsyncClient with a curl-shaped UA (pinned to a single constant so future siblings — browser-shaped, Go-shaped — sit next to it cleanly). Internal egress (webhook → operator's own SIEM, swarm worker → master) keeps its DECNET-tagged UA; the docstring is explicit about not routing those through this client.
This commit is contained in:
7
decnet/net/__init__.py
Normal file
7
decnet/net/__init__.py
Normal file
@@ -0,0 +1,7 @@
|
|||||||
|
"""Shared network helpers.
|
||||||
|
|
||||||
|
Currently houses :mod:`decnet.net.http` — the canonical stealth-egress
|
||||||
|
``httpx.AsyncClient`` factory for outbound calls to 3rd-party services
|
||||||
|
that should NOT see "DECNET" in their access logs (threat-intel
|
||||||
|
providers, future TI lookups, etc.).
|
||||||
|
"""
|
||||||
59
decnet/net/http.py
Normal file
59
decnet/net/http.py
Normal file
@@ -0,0 +1,59 @@
|
|||||||
|
"""Stealth-egress httpx.AsyncClient factory.
|
||||||
|
|
||||||
|
Per the project's stealth posture, outbound calls to *third-party*
|
||||||
|
services (threat-intel providers, public APIs) MUST NOT advertise
|
||||||
|
"DECNET" in their User-Agent or other request fingerprints — operators
|
||||||
|
running honeypots want their reconnaissance dependencies to look like
|
||||||
|
generic infra, not like a tagged tool.
|
||||||
|
|
||||||
|
Canonical helper for any future module that needs to call a public API
|
||||||
|
without leaking the DECNET label. Internal calls (worker → operator's
|
||||||
|
own SIEM via webhook, swarm agent → master) deliberately keep
|
||||||
|
DECNET-tagged user-agents because the recipient wants the audit trail —
|
||||||
|
do NOT route those through this client.
|
||||||
|
|
||||||
|
Usage::
|
||||||
|
|
||||||
|
from decnet.net.http import stealth_client
|
||||||
|
|
||||||
|
async with stealth_client() as client:
|
||||||
|
resp = await client.get("https://api.greynoise.io/v3/community/1.2.3.4")
|
||||||
|
|
||||||
|
The chosen UA mimics ``curl`` because it's the single most common
|
||||||
|
"non-browser, non-named-tool" UA on the public internet — anti-bot
|
||||||
|
filters routinely permit it, and an attacker who got a peek at our
|
||||||
|
egress wouldn't learn anything more specific than "something used curl".
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
|
||||||
|
# Pinned to a recent-but-not-bleeding-edge curl release. Bump on the
|
||||||
|
# normal cadence; anything in-distribution is fine. Keep this string as
|
||||||
|
# the single source of truth so future stealth helpers (browser-shaped,
|
||||||
|
# Go-shaped) live as siblings, not divergent constants.
|
||||||
|
DEFAULT_STEALTH_USER_AGENT: str = "curl/7.88.1"
|
||||||
|
|
||||||
|
|
||||||
|
def stealth_client(
|
||||||
|
*,
|
||||||
|
timeout: float = 10.0,
|
||||||
|
user_agent: Optional[str] = None,
|
||||||
|
follow_redirects: bool = False,
|
||||||
|
) -> httpx.AsyncClient:
|
||||||
|
"""Return an httpx.AsyncClient with a generic stealth User-Agent.
|
||||||
|
|
||||||
|
Returns a fresh client per call — callers own the lifecycle and
|
||||||
|
SHOULD use ``async with`` to ensure connection-pool teardown.
|
||||||
|
|
||||||
|
``follow_redirects`` defaults to ``False`` because most threat-intel
|
||||||
|
APIs return canonical URLs and a redirect typically signals an auth
|
||||||
|
or path mistake we'd rather surface than chase.
|
||||||
|
"""
|
||||||
|
return httpx.AsyncClient(
|
||||||
|
headers={"User-Agent": user_agent or DEFAULT_STEALTH_USER_AGENT},
|
||||||
|
timeout=timeout,
|
||||||
|
follow_redirects=follow_redirects,
|
||||||
|
)
|
||||||
65
tests/intel/test_stealth_http.py
Normal file
65
tests/intel/test_stealth_http.py
Normal file
@@ -0,0 +1,65 @@
|
|||||||
|
"""Stealth-egress HTTP client must NOT advertise DECNET.
|
||||||
|
|
||||||
|
Captures the request that the client emits (using httpx's MockTransport)
|
||||||
|
and asserts the User-Agent never contains a DECNET marker. This is the
|
||||||
|
most important contract on the file — every threat-intel egress path
|
||||||
|
inherits it.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from decnet.net.http import DEFAULT_STEALTH_USER_AGENT, stealth_client
|
||||||
|
|
||||||
|
|
||||||
|
_FORBIDDEN_TOKENS = ("decnet", "honeypot", "decoy", "deck")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_default_user_agent_is_curl_shaped():
|
||||||
|
captured: list[httpx.Request] = []
|
||||||
|
|
||||||
|
async def _handler(request: httpx.Request) -> httpx.Response:
|
||||||
|
captured.append(request)
|
||||||
|
return httpx.Response(200, json={"ok": True})
|
||||||
|
|
||||||
|
transport = httpx.MockTransport(_handler)
|
||||||
|
async with stealth_client() as base:
|
||||||
|
# Swap transport to keep test offline.
|
||||||
|
base._transport = transport # noqa: SLF001 — internal field, deliberate
|
||||||
|
await base.get("https://api.example.test/check")
|
||||||
|
|
||||||
|
ua = captured[0].headers.get("user-agent", "")
|
||||||
|
assert ua == DEFAULT_STEALTH_USER_AGENT
|
||||||
|
lower = ua.lower()
|
||||||
|
for token in _FORBIDDEN_TOKENS:
|
||||||
|
assert token not in lower, f"stealth UA leaked {token!r}: {ua!r}"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_custom_user_agent_override_takes_effect():
|
||||||
|
captured: list[httpx.Request] = []
|
||||||
|
|
||||||
|
async def _handler(request: httpx.Request) -> httpx.Response:
|
||||||
|
captured.append(request)
|
||||||
|
return httpx.Response(200)
|
||||||
|
|
||||||
|
transport = httpx.MockTransport(_handler)
|
||||||
|
async with stealth_client(user_agent="Mozilla/5.0 (test)") as base:
|
||||||
|
base._transport = transport # noqa: SLF001
|
||||||
|
await base.get("https://api.example.test/")
|
||||||
|
|
||||||
|
assert captured[0].headers["user-agent"] == "Mozilla/5.0 (test)"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
|
||||||
|
async def test_redirects_do_not_follow_by_default():
|
||||||
|
async def _handler(request: httpx.Request) -> httpx.Response:
|
||||||
|
return httpx.Response(302, headers={"Location": "https://elsewhere/"})
|
||||||
|
|
||||||
|
transport = httpx.MockTransport(_handler)
|
||||||
|
async with stealth_client() as base:
|
||||||
|
base._transport = transport # noqa: SLF001
|
||||||
|
resp = await base.get("https://api.example.test/")
|
||||||
|
assert resp.status_code == 302
|
||||||
Reference in New Issue
Block a user