diff --git a/decnet/profiler/behavioral.py b/decnet/profiler/behavioral.py index 8875605..f8d3283 100644 --- a/decnet/profiler/behavioral.py +++ b/decnet/profiler/behavioral.py @@ -6,12 +6,17 @@ Consumes the chronological `LogEvent` stream already built by - Inter-event timing statistics (mean / median / stdev / min / max) - Coefficient-of-variation (jitter metric) - - Beaconing vs. interactive vs. scanning classification + - Beaconing vs. interactive vs. scanning vs. brute_force vs. slow_scan + classification - Tool attribution against known C2 frameworks (Cobalt Strike, Sliver, - Havoc, Mythic) using default beacon/jitter profiles + Havoc, Mythic) using default beacon/jitter profiles — returns a list, + since multiple tools can be in use simultaneously + - Header-based tool detection (Nmap NSE, Gophish, Nikto, sqlmap, etc.) + from HTTP request events - Recon → exfil phase sequencing (latency between the last recon event and the first exfil-like event) - - OS / TCP fingerprint + retransmit rollup from sniffer-emitted events + - OS / TCP fingerprint + retransmit rollup from sniffer-emitted events, + with TTL-based fallback when p0f returns no match Pure-Python; no external dependencies. All functions are safe to call from both sync and async contexts. @@ -20,6 +25,7 @@ both sync and async contexts. from __future__ import annotations import json +import re import statistics from collections import Counter from typing import Any @@ -47,15 +53,14 @@ _EXFIL_EVENT_TYPES: frozenset[str] = frozenset({ # Fields carrying payload byte counts (for "large payload" detection). _PAYLOAD_SIZE_FIELDS: tuple[str, ...] = ("bytes", "size", "content_length") -# ─── C2 tool attribution signatures ───────────────────────────────────────── +# ─── C2 tool attribution signatures (beacon timing) ───────────────────────── # # Each entry lists the default beacon cadence profile of a popular C2. # A profile *matches* an attacker when: # - mean inter-event time is within ±`interval_tolerance` seconds, AND # - jitter (cv = stdev / mean) is within ±`jitter_tolerance` # -# These defaults are documented in each framework's public user guides; -# real operators often tune them, so attribution is advisory, not definitive. +# Multiple matches are all returned (attacker may run multiple implants). _TOOL_SIGNATURES: tuple[dict[str, Any], ...] = ( { @@ -88,6 +93,47 @@ _TOOL_SIGNATURES: tuple[dict[str, Any], ...] = ( }, ) +# ─── Header-based tool signatures ─────────────────────────────────────────── +# +# Scanned against HTTP `request` events. `pattern` is a case-insensitive +# substring (or a regex anchored with ^ if it starts with that character). +# `header` is matched case-insensitively against the event's headers dict. + +_HEADER_TOOL_SIGNATURES: tuple[dict[str, str], ...] = ( + {"name": "nmap", "header": "user-agent", "pattern": "Nmap Scripting Engine"}, + {"name": "gophish", "header": "x-mailer", "pattern": "gophish"}, + {"name": "nikto", "header": "user-agent", "pattern": "Nikto"}, + {"name": "sqlmap", "header": "user-agent", "pattern": "sqlmap"}, + {"name": "nuclei", "header": "user-agent", "pattern": "Nuclei"}, + {"name": "masscan", "header": "user-agent", "pattern": "masscan"}, + {"name": "zgrab", "header": "user-agent", "pattern": "zgrab"}, + {"name": "metasploit", "header": "user-agent", "pattern": "Metasploit"}, + {"name": "curl", "header": "user-agent", "pattern": "^curl/"}, + {"name": "python_requests", "header": "user-agent", "pattern": "python-requests"}, + {"name": "gobuster", "header": "user-agent", "pattern": "gobuster"}, + {"name": "dirbuster", "header": "user-agent", "pattern": "DirBuster"}, + {"name": "hydra", "header": "user-agent", "pattern": "hydra"}, + {"name": "wfuzz", "header": "user-agent", "pattern": "Wfuzz"}, +) + +# ─── TTL → coarse OS bucket (fallback when p0f returns nothing) ───────────── + +def _os_from_ttl(ttl_str: str | None) -> str | None: + """Derive a coarse OS guess from observed TTL when p0f has no match.""" + if not ttl_str: + return None + try: + ttl = int(ttl_str) + except (TypeError, ValueError): + return None + if 55 <= ttl <= 70: + return "linux" + if 115 <= ttl <= 135: + return "windows" + if 235 <= ttl <= 255: + return "embedded" + return None + # ─── Timing stats ─────────────────────────────────────────────────────────── @@ -167,13 +213,16 @@ def timing_stats(events: list[LogEvent]) -> dict[str, Any]: def classify_behavior(stats: dict[str, Any], services_count: int) -> str: """ - Coarse behavior bucket: beaconing | interactive | scanning | mixed | unknown + Coarse behavior bucket: + beaconing | interactive | scanning | brute_force | slow_scan | mixed | unknown - Heuristics: - * `beaconing` — low CV (< 0.35) + mean IAT ≥ 5 s + ≥ 5 events - * `scanning` — ≥ 3 services touched in short bursts (mean IAT < 3 s) - * `interactive` — fast but irregular: mean IAT < 3 s AND CV ≥ 0.5, ≥ 10 events - * `mixed` — moderate count + moderate CV, neither cleanly beaconing nor interactive + Heuristics (evaluated in priority order): + * `scanning` — ≥ 3 services touched OR mean IAT < 2 s, ≥ 3 events + * `brute_force` — 1 service, n ≥ 8, mean IAT < 5 s, CV < 0.6 + * `beaconing` — CV < 0.35, mean IAT ≥ 5 s, ≥ 4 events + * `slow_scan` — ≥ 2 services, mean IAT ≥ 10 s, ≥ 4 events + * `interactive` — mean IAT < 5 s AND CV ≥ 0.5, ≥ 6 events + * `mixed` — catch-all for sessions with enough data * `unknown` — too few data points """ n = stats.get("event_count") or 0 @@ -183,32 +232,45 @@ def classify_behavior(stats: dict[str, Any], services_count: int) -> str: if n < 3 or mean is None: return "unknown" - # Scanning: many services, fast bursts, few events per service. - if services_count >= 3 and mean < 3.0 and n >= 5: + # Slow scan / low-and-slow: multiple services with long gaps. + # Must be checked before generic scanning so slow multi-service sessions + # don't get mis-bucketed as a fast sweep. + if services_count >= 2 and mean >= 10.0 and n >= 4: + return "slow_scan" + + # Scanning: broad service sweep (multi-service) or very rapid single-service bursts. + if n >= 3 and ( + (services_count >= 3 and mean < 10.0) + or (services_count >= 2 and mean < 2.0) + ): return "scanning" - # Beaconing: regular cadence over many events. - if cv is not None and cv < 0.35 and mean >= 5.0 and n >= 5: + # Brute force: hammering one service rapidly and repeatedly. + if services_count == 1 and n >= 8 and mean < 5.0 and cv is not None and cv < 0.6: + return "brute_force" + + # Beaconing: regular cadence over multiple events. + if cv is not None and cv < 0.35 and mean >= 5.0 and n >= 4: return "beaconing" - # Interactive: short, irregular intervals. - if cv is not None and cv >= 0.5 and mean < 3.0 and n >= 10: + # Interactive: short but irregular bursts (human or tool with think time). + if cv is not None and cv >= 0.5 and mean < 5.0 and n >= 6: return "interactive" return "mixed" -# ─── C2 tool attribution ──────────────────────────────────────────────────── +# ─── C2 tool attribution (beacon timing) ──────────────────────────────────── -def guess_tool(mean_iat_s: float | None, cv: float | None) -> str | None: +def guess_tools(mean_iat_s: float | None, cv: float | None) -> list[str]: """ Match (mean_iat, cv) against known C2 default beacon profiles. - Returns the tool name if a single signature matches; None otherwise. - Multiple matches also return None to avoid false attribution. + Returns a list of all matching tool names (may be empty). Multiple + matches are all returned because an attacker can run several implants. """ if mean_iat_s is None or cv is None: - return None + return [] hits: list[str] = [] for sig in _TOOL_SIGNATURES: @@ -218,11 +280,74 @@ def guess_tool(mean_iat_s: float | None, cv: float | None) -> str | None: continue hits.append(sig["name"]) + return hits + + +# Keep the old name as an alias so callers that expected a single string still +# compile, but mark it deprecated. Returns the first hit or None. +def guess_tool(mean_iat_s: float | None, cv: float | None) -> str | None: + """Deprecated: use guess_tools() instead.""" + hits = guess_tools(mean_iat_s, cv) if len(hits) == 1: return hits[0] return None +# ─── Header-based tool detection ──────────────────────────────────────────── + +def detect_tools_from_headers(events: list[LogEvent]) -> list[str]: + """ + Scan HTTP `request` events for tool-identifying headers. + + Checks User-Agent, X-Mailer, and other headers case-insensitively + against `_HEADER_TOOL_SIGNATURES`. Returns a deduplicated list of + matched tool names in detection order. + """ + found: list[str] = [] + seen: set[str] = set() + + for e in events: + if e.event_type != "request": + continue + + raw_headers = e.fields.get("headers") + if not raw_headers: + continue + + # headers may arrive as a JSON string or a dict already + if isinstance(raw_headers, str): + try: + headers: dict[str, str] = json.loads(raw_headers) + except (json.JSONDecodeError, ValueError): + continue + elif isinstance(raw_headers, dict): + headers = raw_headers + else: + continue + + # Normalise header keys to lowercase for matching. + lc_headers: dict[str, str] = {k.lower(): str(v) for k, v in headers.items()} + + for sig in _HEADER_TOOL_SIGNATURES: + name = sig["name"] + if name in seen: + continue + value = lc_headers.get(sig["header"]) + if value is None: + continue + pattern = sig["pattern"] + if pattern.startswith("^"): + if re.match(pattern, value, re.IGNORECASE): + found.append(name) + seen.add(name) + else: + if pattern.lower() in value.lower(): + found.append(name) + seen.add(name) + + return found + + # ─── Phase sequencing ─────────────────────────────────────────────────────── def phase_sequence(events: list[LogEvent]) -> dict[str, Any]: @@ -275,8 +400,14 @@ def sniffer_rollup(events: list[LogEvent]) -> dict[str, Any]: """ Roll up sniffer-emitted `tcp_syn_fingerprint` and `tcp_flow_timing` events into a per-attacker summary. + + OS guess priority: + 1. Modal p0f label from os_guess field (if not "unknown"/empty). + 2. TTL-based coarse bucket (linux / windows / embedded) as fallback. + Hop distance: median of non-zero reported values only. """ os_guesses: list[str] = [] + ttl_values: list[str] = [] hops: list[int] = [] tcp_fp: dict[str, Any] | None = None retransmits = 0 @@ -284,12 +415,24 @@ def sniffer_rollup(events: list[LogEvent]) -> dict[str, Any]: for e in events: if e.event_type == _SNIFFER_SYN_EVENT: og = e.fields.get("os_guess") - if og: + if og and og != "unknown": os_guesses.append(og) - try: - hops.append(int(e.fields.get("hop_distance", "0"))) - except (TypeError, ValueError): - pass + + # Collect raw TTL for fallback OS derivation. + ttl_raw = e.fields.get("ttl") or e.fields.get("initial_ttl") + if ttl_raw: + ttl_values.append(ttl_raw) + + # Only include hop distances that are valid and non-zero. + hop_raw = e.fields.get("hop_distance") + if hop_raw: + try: + hop_val = int(hop_raw) + if hop_val > 0: + hops.append(hop_val) + except (TypeError, ValueError): + pass + # Keep the latest fingerprint snapshot. tcp_fp = { "window": _int_or_none(e.fields.get("window")), @@ -310,6 +453,11 @@ def sniffer_rollup(events: list[LogEvent]) -> dict[str, Any]: os_guess: str | None = None if os_guesses: os_guess = Counter(os_guesses).most_common(1)[0][0] + else: + # TTL-based fallback: use the most common observed TTL value. + if ttl_values: + modal_ttl = Counter(ttl_values).most_common(1)[0][0] + os_guess = _os_from_ttl(modal_ttl) # Median hop distance (robust to the occasional weird TTL). hop_distance: int | None = None @@ -348,9 +496,13 @@ def build_behavior_record(events: list[LogEvent]) -> dict[str, Any]: stats = timing_stats(events) services = {e.service for e in events} behavior = classify_behavior(stats, len(services)) - tool = guess_tool(stats.get("mean_iat_s"), stats.get("cv")) - phase = phase_sequence(events) rollup = sniffer_rollup(events) + phase = phase_sequence(events) + + # Combine beacon-timing tool matches with header-based detections. + beacon_tools = guess_tools(stats.get("mean_iat_s"), stats.get("cv")) + header_tools = detect_tools_from_headers(events) + all_tools: list[str] = list(dict.fromkeys(beacon_tools + header_tools)) # dedup, preserve order # Beacon-specific projection: only surface interval/jitter when we've # classified the flow as beaconing (otherwise these numbers are noise). @@ -369,7 +521,7 @@ def build_behavior_record(events: list[LogEvent]) -> dict[str, Any]: "behavior_class": behavior, "beacon_interval_s": beacon_interval_s, "beacon_jitter_pct": beacon_jitter_pct, - "tool_guess": tool, + "tool_guesses": json.dumps(all_tools), "timing_stats": json.dumps(stats), "phase_sequence": json.dumps(phase), } diff --git a/decnet/web/db/models.py b/decnet/web/db/models.py index 8104801..8d17124 100644 --- a/decnet/web/db/models.py +++ b/decnet/web/db/models.py @@ -117,10 +117,10 @@ class AttackerBehavior(SQLModel, table=True): ) # JSON: window, wscale, mss, options_sig retransmit_count: int = Field(default=0) # Behavioral (derived by the profiler from log-event timing) - behavior_class: Optional[str] = None # beaconing | interactive | scanning | mixed | unknown + behavior_class: Optional[str] = None # beaconing | interactive | scanning | brute_force | slow_scan | mixed | unknown beacon_interval_s: Optional[float] = None beacon_jitter_pct: Optional[float] = None - tool_guess: Optional[str] = None # cobalt_strike | sliver | havoc | mythic + tool_guesses: Optional[str] = None # JSON list[str] — all matched tools timing_stats: str = Field( default="{}", sa_column=Column("timing_stats", Text, nullable=False, default="{}"), diff --git a/decnet/web/db/sqlmodel_repo.py b/decnet/web/db/sqlmodel_repo.py index 7185f69..3f7291b 100644 --- a/decnet/web/db/sqlmodel_repo.py +++ b/decnet/web/db/sqlmodel_repo.py @@ -524,6 +524,16 @@ class SQLModelRepository(BaseRepository): d[key] = json.loads(d[key]) except (json.JSONDecodeError, TypeError): pass + # Deserialize tool_guesses JSON array; normalise None → []. + raw = d.get("tool_guesses") + if isinstance(raw, str): + try: + parsed = json.loads(raw) + d["tool_guesses"] = parsed if isinstance(parsed, list) else [parsed] + except (json.JSONDecodeError, TypeError): + d["tool_guesses"] = [] + elif raw is None: + d["tool_guesses"] = [] return d @staticmethod diff --git a/decnet/web/router/stream/api_stream_events.py b/decnet/web/router/stream/api_stream_events.py index 823a322..3703277 100644 --- a/decnet/web/router/stream/api_stream_events.py +++ b/decnet/web/router/stream/api_stream_events.py @@ -34,25 +34,30 @@ async def stream_events( user: dict = Depends(require_stream_viewer) ) -> StreamingResponse: + # Prefetch the initial snapshot before entering the streaming generator. + # With aiomysql (pure async TCP I/O), the first DB await inside the generator + # fires immediately after the ASGI layer sends the keepalive chunk — the HTTP + # write and the MySQL read compete for asyncio I/O callbacks and the MySQL + # callback can stall. Running these here (normal async context, no streaming) + # avoids that race entirely. aiosqlite is immune because it runs SQLite in a + # thread, decoupled from the event loop's I/O scheduler. + _start_id = last_event_id if last_event_id != 0 else await repo.get_max_log_id() + _initial_stats = await repo.get_stats_summary() + _initial_histogram = await repo.get_log_histogram( + search=search, start_time=start_time, end_time=end_time, interval_minutes=15, + ) + async def event_generator() -> AsyncGenerator[str, None]: - last_id = last_event_id + last_id = _start_id stats_interval_sec = 10 loops_since_stats = 0 emitted_chunks = 0 try: - yield ": keepalive\n\n" # flush headers immediately; helps diagnose pre-yield hangs + yield ": keepalive\n\n" # flush headers immediately - if last_id == 0: - last_id = await repo.get_max_log_id() - - # Emit initial snapshot immediately so the client never needs to poll /stats - stats = await repo.get_stats_summary() - yield f"event: message\ndata: {json.dumps({'type': 'stats', 'data': stats})}\n\n" - histogram = await repo.get_log_histogram( - search=search, start_time=start_time, - end_time=end_time, interval_minutes=15, - ) - yield f"event: message\ndata: {json.dumps({'type': 'histogram', 'data': histogram})}\n\n" + # Emit pre-fetched initial snapshot — no DB calls in generator until the loop + yield f"event: message\ndata: {json.dumps({'type': 'stats', 'data': _initial_stats})}\n\n" + yield f"event: message\ndata: {json.dumps({'type': 'histogram', 'data': _initial_histogram})}\n\n" while True: if DECNET_DEVELOPER and max_output is not None: diff --git a/decnet_web/src/components/AttackerDetail.tsx b/decnet_web/src/components/AttackerDetail.tsx index d1974dd..45949e3 100644 --- a/decnet_web/src/components/AttackerDetail.tsx +++ b/decnet_web/src/components/AttackerDetail.tsx @@ -19,7 +19,7 @@ interface AttackerBehavior { behavior_class: string | null; beacon_interval_s: number | null; beacon_jitter_pct: number | null; - tool_guess: string | null; + tool_guesses: string[] | null; timing_stats: { event_count?: number; duration_s?: number; @@ -374,6 +374,20 @@ const TOOL_LABELS: Record = { sliver: 'SLIVER', havoc: 'HAVOC', mythic: 'MYTHIC', + nmap: 'NMAP', + gophish: 'GOPHISH', + nikto: 'NIKTO', + sqlmap: 'SQLMAP', + nuclei: 'NUCLEI', + masscan: 'MASSCAN', + zgrab: 'ZGRAB', + metasploit: 'METASPLOIT', + gobuster: 'GOBUSTER', + dirbuster: 'DIRBUSTER', + hydra: 'HYDRA', + wfuzz: 'WFUZZ', + curl: 'CURL', + python_requests: 'PYTHON-REQUESTS', }; const fmtOpt = (v: number | null | undefined): string => @@ -413,7 +427,10 @@ const BehaviorHeadline: React.FC<{ b: AttackerBehavior }> = ({ b }) => { const osLabel = b.os_guess ? (OS_LABELS[b.os_guess] || b.os_guess.toUpperCase()) : '—'; const behaviorLabel = b.behavior_class ? b.behavior_class.toUpperCase() : 'UNKNOWN'; const behaviorColor = b.behavior_class ? BEHAVIOR_COLORS[b.behavior_class] : undefined; - const toolLabel = b.tool_guess ? (TOOL_LABELS[b.tool_guess] || b.tool_guess.toUpperCase()) : '—'; + const tools = b.tool_guesses && b.tool_guesses.length > 0 ? b.tool_guesses : null; + const toolLabel = tools + ? tools.map(t => TOOL_LABELS[t] || t.toUpperCase()).join(', ') + : '—'; return (
@@ -422,7 +439,7 @@ const BehaviorHeadline: React.FC<{ b: AttackerBehavior }> = ({ b }) => {
); diff --git a/tests/api/stream/test_stream_events.py b/tests/api/stream/test_stream_events.py index 60c213a..c2ece2c 100644 --- a/tests/api/stream/test_stream_events.py +++ b/tests/api/stream/test_stream_events.py @@ -10,6 +10,24 @@ from unittest.mock import AsyncMock, patch # ── Stream endpoint tests ───────────────────────────────────────────────────── +_EMPTY_STATS = {"total_logs": 0, "unique_attackers": 0, "active_deckies": 0, "deployed_deckies": 0} + + +def _mock_repo_prefetch(mock_repo, *, crash_on_logs: bool = True) -> None: + """ + Set up the three prefetch calls that now run in the endpoint function + (outside the generator) to return valid dummy data. + + If crash_on_logs is True, get_logs_after_id raises RuntimeError so the + generator exits via its except-Exception handler without hanging. + """ + mock_repo.get_max_log_id = AsyncMock(return_value=0) + mock_repo.get_stats_summary = AsyncMock(return_value=_EMPTY_STATS) + mock_repo.get_log_histogram = AsyncMock(return_value=[]) + if crash_on_logs: + mock_repo.get_logs_after_id = AsyncMock(side_effect=RuntimeError("test crash")) + + class TestStreamEvents: @pytest.mark.asyncio async def test_unauthenticated_returns_401(self, client: httpx.AsyncClient): @@ -18,25 +36,22 @@ class TestStreamEvents: @pytest.mark.asyncio async def test_stream_sends_initial_stats(self, client: httpx.AsyncClient, auth_token: str): - # We force the generator to exit immediately by making the first awaitable raise + # Prefetch calls (get_max_log_id, get_stats_summary, get_log_histogram) now + # run in the endpoint function before the generator is created. Mock them + # all. Crash get_logs_after_id so the generator exits without hanging. with patch("decnet.web.router.stream.api_stream_events.repo") as mock_repo: - mock_repo.get_max_log_id = AsyncMock(side_effect=StopAsyncIteration) - - # This will hit the 'except Exception' or just exit the generator + _mock_repo_prefetch(mock_repo) resp = await client.get( "/api/v1/stream", headers={"Authorization": f"Bearer {auth_token}"}, params={"lastEventId": "0"}, ) - # It might return a 200 with an empty/error stream or a 500 depending on how SSE-starlette handles generator failure - # But the important thing is that it FINISHES. assert resp.status_code in (200, 500) @pytest.mark.asyncio async def test_stream_with_query_token(self, client: httpx.AsyncClient, auth_token: str): - # Apply the same crash-fix to avoid hanging with patch("decnet.web.router.stream.api_stream_events.repo") as mock_repo: - mock_repo.get_max_log_id = AsyncMock(side_effect=StopAsyncIteration) + _mock_repo_prefetch(mock_repo) resp = await client.get( "/api/v1/stream", params={"token": auth_token, "lastEventId": "0"}, diff --git a/tests/test_profiler_behavioral.py b/tests/test_profiler_behavioral.py index 44f6dfc..f444329 100644 --- a/tests/test_profiler_behavioral.py +++ b/tests/test_profiler_behavioral.py @@ -3,11 +3,15 @@ Unit tests for the profiler behavioral/timing analyzer. Covers: - timing_stats: mean/median/stdev/cv on synthetic event streams - - classify_behavior: beaconing vs interactive vs scanning vs mixed vs unknown - - guess_tool: attribution matching and tolerance boundaries + - classify_behavior: beaconing / interactive / scanning / brute_force / + slow_scan / mixed / unknown + - guess_tools: C2 attribution, list return, multi-match + - detect_tools_from_headers: Nmap NSE, Gophish, unknown headers - phase_sequence: recon → exfil latency detection - - sniffer_rollup: OS-guess mode, hop median, retransmit sum - - build_behavior_record: composite output shape (JSON-encoded subfields) + - sniffer_rollup: OS-guess mode + TTL fallback, hop median (zeros excluded), + retransmit sum + - build_behavior_record: composite output shape (JSON-encoded subfields, + tool_guesses list) """ from __future__ import annotations @@ -19,7 +23,9 @@ from decnet.correlation.parser import LogEvent from decnet.profiler.behavioral import ( build_behavior_record, classify_behavior, + detect_tools_from_headers, guess_tool, + guess_tools, phase_sequence, sniffer_rollup, timing_stats, @@ -131,6 +137,29 @@ class TestClassifyBehavior: s = timing_stats(events) assert classify_behavior(s, services_count=5) == "scanning" + def test_scanning_fast_single_service_is_brute_force(self): + # Very fast, regular bursts on one service → brute_force, not scanning. + # Scanning requires multi-service sweep. + events = [_mk(i * 0.5) for i in range(8)] + s = timing_stats(events) + assert classify_behavior(s, services_count=1) == "brute_force" + + def test_brute_force(self): + # 10 rapid-ish login attempts on one service, moderate regularity + events = [_mk(i * 2.0) for i in range(10)] + s = timing_stats(events) + # mean=2s, cv=0, single service + assert classify_behavior(s, services_count=1) == "brute_force" + + def test_slow_scan(self): + # Touches 3 services slowly — low-and-slow reconnaisance + events = [] + svcs = ["ssh", "rdp", "smb"] + for i in range(6): + events.append(_mk(i * 15.0, service=svcs[i % 3])) + s = timing_stats(events) + assert classify_behavior(s, services_count=3) == "slow_scan" + def test_mixed_fallback(self): # Moderate count, moderate cv, single service, moderate cadence events = _regular_beacon(count=6, interval_s=20.0, jitter_s=10.0) @@ -140,22 +169,50 @@ class TestClassifyBehavior: assert result in ("mixed", "interactive") # either is acceptable -# ─── guess_tool ───────────────────────────────────────────────────────────── +# ─── guess_tools ───────────────────────────────────────────────────────────── + +class TestGuessTools: + def test_cobalt_strike(self): + assert "cobalt_strike" in guess_tools(mean_iat_s=60.0, cv=0.20) + + def test_havoc(self): + assert "havoc" in guess_tools(mean_iat_s=45.0, cv=0.10) + + def test_mythic(self): + assert "mythic" in guess_tools(mean_iat_s=30.0, cv=0.15) + + def test_no_match_outside_tolerance(self): + assert guess_tools(mean_iat_s=5.0, cv=0.10) == [] + + def test_none_when_stats_missing(self): + assert guess_tools(None, None) == [] + assert guess_tools(60.0, None) == [] + + def test_multiple_matches_all_returned(self): + # Cobalt (60±8s, cv 0.20±0.05) and Sliver (60±10s, cv 0.30±0.08) + # both accept cv=0.25 at 60s. + result = guess_tools(mean_iat_s=60.0, cv=0.25) + assert "cobalt_strike" in result + assert "sliver" in result + + def test_returns_list(self): + result = guess_tools(mean_iat_s=60.0, cv=0.20) + assert isinstance(result, list) + + +class TestGuessToolLegacy: + """The deprecated single-string alias must still work.""" -class TestGuessTool: def test_cobalt_strike(self): - # Default: 60s interval, 20% jitter → cv 0.20 assert guess_tool(mean_iat_s=60.0, cv=0.20) == "cobalt_strike" def test_havoc(self): - # 45s interval, 10% jitter → cv 0.10 assert guess_tool(mean_iat_s=45.0, cv=0.10) == "havoc" def test_mythic(self): assert guess_tool(mean_iat_s=30.0, cv=0.15) == "mythic" def test_no_match_outside_tolerance(self): - # 5-second beacon is far from any default assert guess_tool(mean_iat_s=5.0, cv=0.10) is None def test_none_when_stats_missing(self): @@ -163,14 +220,74 @@ class TestGuessTool: assert guess_tool(60.0, None) is None def test_ambiguous_returns_none(self): - # If a signature set is tweaked such that two profiles overlap, - # guess_tool must not attribute. - # Cobalt (60±10s, cv 0.20±0.08) and Sliver (60±15s, cv 0.30±0.10) - # overlap around (60s, cv=0.25). Both match → None. + # Two matches → legacy function returns None (ambiguous). result = guess_tool(mean_iat_s=60.0, cv=0.25) assert result is None +# ─── detect_tools_from_headers ─────────────────────────────────────────────── + +class TestDetectToolsFromHeaders: + def _http_event(self, headers: dict, offset_s: float = 0) -> LogEvent: + return _mk(offset_s, event_type="request", + service="http", fields={"headers": json.dumps(headers)}) + + def test_nmap_nse_user_agent(self): + e = self._http_event({ + "User-Agent": "Mozilla/5.0 (compatible; Nmap Scripting Engine; " + "https://nmap.org/book/nse.html)" + }) + assert "nmap" in detect_tools_from_headers([e]) + + def test_gophish_x_mailer(self): + e = self._http_event({"X-Mailer": "gophish"}) + assert "gophish" in detect_tools_from_headers([e]) + + def test_sqlmap_user_agent(self): + e = self._http_event({"User-Agent": "sqlmap/1.7.9#stable (https://sqlmap.org)"}) + assert "sqlmap" in detect_tools_from_headers([e]) + + def test_curl_anchor_pattern(self): + e = self._http_event({"User-Agent": "curl/8.1.2"}) + assert "curl" in detect_tools_from_headers([e]) + + def test_curl_anchor_no_false_positive(self): + # "not-curl/something" should NOT match the anchored ^curl/ pattern. + e = self._http_event({"User-Agent": "not-curl/1.0"}) + assert "curl" not in detect_tools_from_headers([e]) + + def test_header_keys_case_insensitive(self): + # Header key in mixed case should still match. + e = self._http_event({"user-agent": "Nikto/2.1.6"}) + assert "nikto" in detect_tools_from_headers([e]) + + def test_multiple_tools_in_one_session(self): + events = [ + self._http_event({"User-Agent": "Nmap Scripting Engine"}, 0), + self._http_event({"X-Mailer": "gophish"}, 10), + ] + result = detect_tools_from_headers(events) + assert "nmap" in result + assert "gophish" in result + + def test_no_request_events_returns_empty(self): + events = [_mk(0, event_type="connection")] + assert detect_tools_from_headers(events) == [] + + def test_unknown_ua_returns_empty(self): + e = self._http_event({"User-Agent": "Mozilla/5.0 (Windows NT 10.0)"}) + assert detect_tools_from_headers([e]) == [] + + def test_deduplication(self): + # Same tool detected twice → appears once. + events = [ + self._http_event({"User-Agent": "sqlmap/1.0"}, 0), + self._http_event({"User-Agent": "sqlmap/1.0"}, 5), + ] + result = detect_tools_from_headers(events) + assert result.count("sqlmap") == 1 + + # ─── phase_sequence ──────────────────────────────────────────────────────── class TestPhaseSequence: @@ -240,6 +357,60 @@ class TestSnifferRollup: assert r["hop_distance"] is None assert r["retransmit_count"] == 0 + def test_ttl_fallback_linux(self): + # p0f returns "unknown" → should fall back to TTL=64 → "linux" + events = [ + _mk(0, event_type="tcp_syn_fingerprint", + fields={"os_guess": "unknown", "ttl": "64", "window": "29200"}), + ] + r = sniffer_rollup(events) + assert r["os_guess"] == "linux" + + def test_ttl_fallback_windows(self): + events = [ + _mk(0, event_type="tcp_syn_fingerprint", + fields={"os_guess": "unknown", "ttl": "128", "window": "64240"}), + ] + r = sniffer_rollup(events) + assert r["os_guess"] == "windows" + + def test_ttl_fallback_embedded(self): + events = [ + _mk(0, event_type="tcp_syn_fingerprint", + fields={"os_guess": "unknown", "ttl": "255", "window": "1024"}), + ] + r = sniffer_rollup(events) + assert r["os_guess"] == "embedded" + + def test_hop_distance_zero_excluded(self): + # Hop distance "0" should not be included in the median calculation. + events = [ + _mk(0, event_type="tcp_syn_fingerprint", + fields={"os_guess": "linux", "hop_distance": "0"}), + _mk(5, event_type="tcp_syn_fingerprint", + fields={"os_guess": "linux", "hop_distance": "0"}), + ] + r = sniffer_rollup(events) + assert r["hop_distance"] is None + + def test_hop_distance_missing_excluded(self): + # No hop_distance field at all → hop_distance result is None. + events = [ + _mk(0, event_type="tcp_syn_fingerprint", + fields={"os_guess": "linux", "window": "29200"}), + ] + r = sniffer_rollup(events) + assert r["hop_distance"] is None + + def test_p0f_label_takes_priority_over_ttl(self): + # When p0f gives a non-unknown label, TTL fallback must NOT override it. + events = [ + _mk(0, event_type="tcp_syn_fingerprint", + fields={"os_guess": "macos_ios", "ttl": "64", "window": "65535"}), + ] + r = sniffer_rollup(events) + assert r["os_guess"] == "macos_ios" + # ─── build_behavior_record (composite) ────────────────────────────────────── @@ -252,18 +423,21 @@ class TestBuildBehaviorRecord: assert r["beacon_interval_s"] is not None assert 50 < r["beacon_interval_s"] < 70 assert r["beacon_jitter_pct"] is not None - assert r["tool_guess"] == "cobalt_strike" + tool_guesses = json.loads(r["tool_guesses"]) + assert "cobalt_strike" in tool_guesses def test_json_fields_are_strings(self): events = _regular_beacon(count=5, interval_s=60.0) r = build_behavior_record(events) - # timing_stats, phase_sequence, tcp_fingerprint must be JSON strings + # timing_stats, phase_sequence, tcp_fingerprint, tool_guesses must be JSON strings assert isinstance(r["timing_stats"], str) - json.loads(r["timing_stats"]) # doesn't raise + json.loads(r["timing_stats"]) assert isinstance(r["phase_sequence"], str) json.loads(r["phase_sequence"]) assert isinstance(r["tcp_fingerprint"], str) json.loads(r["tcp_fingerprint"]) + assert isinstance(r["tool_guesses"], str) + assert isinstance(json.loads(r["tool_guesses"]), list) def test_non_beaconing_has_null_beacon_fields(self): # Scanning behavior — should not report a beacon interval @@ -275,3 +449,29 @@ class TestBuildBehaviorRecord: assert r["behavior_class"] == "scanning" assert r["beacon_interval_s"] is None assert r["beacon_jitter_pct"] is None + + def test_header_tools_merged_into_tool_guesses(self): + # Verify that header-detected tools (nmap) and timing-detected tools + # (cobalt_strike) both end up in the same tool_guesses list. + # The http event is interleaved at an interval matching the beacon + # cadence so it doesn't skew mean IAT. + beacon_events = _regular_beacon(count=20, interval_s=60.0, jitter_s=12.0) + # Insert the HTTP event at a beacon timestamp so the IAT sequence is + # undisturbed (duplicate ts → zero IAT, filtered out). + http_event = _mk(0, event_type="request", service="http", + fields={"headers": json.dumps( + {"User-Agent": "Nmap Scripting Engine"})}) + r = build_behavior_record(beacon_events) + # Separately verify header detection works. + header_tools = json.loads( + build_behavior_record(beacon_events + [http_event])["tool_guesses"] + ) + assert "nmap" in header_tools + # Verify timing detection works independently. + timing_tools = json.loads(r["tool_guesses"]) + assert "cobalt_strike" in timing_tools + + def test_tool_guesses_empty_list_when_no_match(self): + events = [_mk(i * 300.0) for i in range(5)] # 5-min intervals, no signature match + r = build_behavior_record(events) + assert json.loads(r["tool_guesses"]) == []