feat: overhaul behavioral profiler — multi-tool detection, improved classification, TTL OS fallback
This commit is contained in:
@@ -3,11 +3,15 @@ Unit tests for the profiler behavioral/timing analyzer.
|
||||
|
||||
Covers:
|
||||
- timing_stats: mean/median/stdev/cv on synthetic event streams
|
||||
- classify_behavior: beaconing vs interactive vs scanning vs mixed vs unknown
|
||||
- guess_tool: attribution matching and tolerance boundaries
|
||||
- classify_behavior: beaconing / interactive / scanning / brute_force /
|
||||
slow_scan / mixed / unknown
|
||||
- guess_tools: C2 attribution, list return, multi-match
|
||||
- detect_tools_from_headers: Nmap NSE, Gophish, unknown headers
|
||||
- phase_sequence: recon → exfil latency detection
|
||||
- sniffer_rollup: OS-guess mode, hop median, retransmit sum
|
||||
- build_behavior_record: composite output shape (JSON-encoded subfields)
|
||||
- sniffer_rollup: OS-guess mode + TTL fallback, hop median (zeros excluded),
|
||||
retransmit sum
|
||||
- build_behavior_record: composite output shape (JSON-encoded subfields,
|
||||
tool_guesses list)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -19,7 +23,9 @@ from decnet.correlation.parser import LogEvent
|
||||
from decnet.profiler.behavioral import (
|
||||
build_behavior_record,
|
||||
classify_behavior,
|
||||
detect_tools_from_headers,
|
||||
guess_tool,
|
||||
guess_tools,
|
||||
phase_sequence,
|
||||
sniffer_rollup,
|
||||
timing_stats,
|
||||
@@ -131,6 +137,29 @@ class TestClassifyBehavior:
|
||||
s = timing_stats(events)
|
||||
assert classify_behavior(s, services_count=5) == "scanning"
|
||||
|
||||
def test_scanning_fast_single_service_is_brute_force(self):
|
||||
# Very fast, regular bursts on one service → brute_force, not scanning.
|
||||
# Scanning requires multi-service sweep.
|
||||
events = [_mk(i * 0.5) for i in range(8)]
|
||||
s = timing_stats(events)
|
||||
assert classify_behavior(s, services_count=1) == "brute_force"
|
||||
|
||||
def test_brute_force(self):
|
||||
# 10 rapid-ish login attempts on one service, moderate regularity
|
||||
events = [_mk(i * 2.0) for i in range(10)]
|
||||
s = timing_stats(events)
|
||||
# mean=2s, cv=0, single service
|
||||
assert classify_behavior(s, services_count=1) == "brute_force"
|
||||
|
||||
def test_slow_scan(self):
|
||||
# Touches 3 services slowly — low-and-slow reconnaisance
|
||||
events = []
|
||||
svcs = ["ssh", "rdp", "smb"]
|
||||
for i in range(6):
|
||||
events.append(_mk(i * 15.0, service=svcs[i % 3]))
|
||||
s = timing_stats(events)
|
||||
assert classify_behavior(s, services_count=3) == "slow_scan"
|
||||
|
||||
def test_mixed_fallback(self):
|
||||
# Moderate count, moderate cv, single service, moderate cadence
|
||||
events = _regular_beacon(count=6, interval_s=20.0, jitter_s=10.0)
|
||||
@@ -140,22 +169,50 @@ class TestClassifyBehavior:
|
||||
assert result in ("mixed", "interactive") # either is acceptable
|
||||
|
||||
|
||||
# ─── guess_tool ─────────────────────────────────────────────────────────────
|
||||
# ─── guess_tools ─────────────────────────────────────────────────────────────
|
||||
|
||||
class TestGuessTools:
|
||||
def test_cobalt_strike(self):
|
||||
assert "cobalt_strike" in guess_tools(mean_iat_s=60.0, cv=0.20)
|
||||
|
||||
def test_havoc(self):
|
||||
assert "havoc" in guess_tools(mean_iat_s=45.0, cv=0.10)
|
||||
|
||||
def test_mythic(self):
|
||||
assert "mythic" in guess_tools(mean_iat_s=30.0, cv=0.15)
|
||||
|
||||
def test_no_match_outside_tolerance(self):
|
||||
assert guess_tools(mean_iat_s=5.0, cv=0.10) == []
|
||||
|
||||
def test_none_when_stats_missing(self):
|
||||
assert guess_tools(None, None) == []
|
||||
assert guess_tools(60.0, None) == []
|
||||
|
||||
def test_multiple_matches_all_returned(self):
|
||||
# Cobalt (60±8s, cv 0.20±0.05) and Sliver (60±10s, cv 0.30±0.08)
|
||||
# both accept cv=0.25 at 60s.
|
||||
result = guess_tools(mean_iat_s=60.0, cv=0.25)
|
||||
assert "cobalt_strike" in result
|
||||
assert "sliver" in result
|
||||
|
||||
def test_returns_list(self):
|
||||
result = guess_tools(mean_iat_s=60.0, cv=0.20)
|
||||
assert isinstance(result, list)
|
||||
|
||||
|
||||
class TestGuessToolLegacy:
|
||||
"""The deprecated single-string alias must still work."""
|
||||
|
||||
class TestGuessTool:
|
||||
def test_cobalt_strike(self):
|
||||
# Default: 60s interval, 20% jitter → cv 0.20
|
||||
assert guess_tool(mean_iat_s=60.0, cv=0.20) == "cobalt_strike"
|
||||
|
||||
def test_havoc(self):
|
||||
# 45s interval, 10% jitter → cv 0.10
|
||||
assert guess_tool(mean_iat_s=45.0, cv=0.10) == "havoc"
|
||||
|
||||
def test_mythic(self):
|
||||
assert guess_tool(mean_iat_s=30.0, cv=0.15) == "mythic"
|
||||
|
||||
def test_no_match_outside_tolerance(self):
|
||||
# 5-second beacon is far from any default
|
||||
assert guess_tool(mean_iat_s=5.0, cv=0.10) is None
|
||||
|
||||
def test_none_when_stats_missing(self):
|
||||
@@ -163,14 +220,74 @@ class TestGuessTool:
|
||||
assert guess_tool(60.0, None) is None
|
||||
|
||||
def test_ambiguous_returns_none(self):
|
||||
# If a signature set is tweaked such that two profiles overlap,
|
||||
# guess_tool must not attribute.
|
||||
# Cobalt (60±10s, cv 0.20±0.08) and Sliver (60±15s, cv 0.30±0.10)
|
||||
# overlap around (60s, cv=0.25). Both match → None.
|
||||
# Two matches → legacy function returns None (ambiguous).
|
||||
result = guess_tool(mean_iat_s=60.0, cv=0.25)
|
||||
assert result is None
|
||||
|
||||
|
||||
# ─── detect_tools_from_headers ───────────────────────────────────────────────
|
||||
|
||||
class TestDetectToolsFromHeaders:
|
||||
def _http_event(self, headers: dict, offset_s: float = 0) -> LogEvent:
|
||||
return _mk(offset_s, event_type="request",
|
||||
service="http", fields={"headers": json.dumps(headers)})
|
||||
|
||||
def test_nmap_nse_user_agent(self):
|
||||
e = self._http_event({
|
||||
"User-Agent": "Mozilla/5.0 (compatible; Nmap Scripting Engine; "
|
||||
"https://nmap.org/book/nse.html)"
|
||||
})
|
||||
assert "nmap" in detect_tools_from_headers([e])
|
||||
|
||||
def test_gophish_x_mailer(self):
|
||||
e = self._http_event({"X-Mailer": "gophish"})
|
||||
assert "gophish" in detect_tools_from_headers([e])
|
||||
|
||||
def test_sqlmap_user_agent(self):
|
||||
e = self._http_event({"User-Agent": "sqlmap/1.7.9#stable (https://sqlmap.org)"})
|
||||
assert "sqlmap" in detect_tools_from_headers([e])
|
||||
|
||||
def test_curl_anchor_pattern(self):
|
||||
e = self._http_event({"User-Agent": "curl/8.1.2"})
|
||||
assert "curl" in detect_tools_from_headers([e])
|
||||
|
||||
def test_curl_anchor_no_false_positive(self):
|
||||
# "not-curl/something" should NOT match the anchored ^curl/ pattern.
|
||||
e = self._http_event({"User-Agent": "not-curl/1.0"})
|
||||
assert "curl" not in detect_tools_from_headers([e])
|
||||
|
||||
def test_header_keys_case_insensitive(self):
|
||||
# Header key in mixed case should still match.
|
||||
e = self._http_event({"user-agent": "Nikto/2.1.6"})
|
||||
assert "nikto" in detect_tools_from_headers([e])
|
||||
|
||||
def test_multiple_tools_in_one_session(self):
|
||||
events = [
|
||||
self._http_event({"User-Agent": "Nmap Scripting Engine"}, 0),
|
||||
self._http_event({"X-Mailer": "gophish"}, 10),
|
||||
]
|
||||
result = detect_tools_from_headers(events)
|
||||
assert "nmap" in result
|
||||
assert "gophish" in result
|
||||
|
||||
def test_no_request_events_returns_empty(self):
|
||||
events = [_mk(0, event_type="connection")]
|
||||
assert detect_tools_from_headers(events) == []
|
||||
|
||||
def test_unknown_ua_returns_empty(self):
|
||||
e = self._http_event({"User-Agent": "Mozilla/5.0 (Windows NT 10.0)"})
|
||||
assert detect_tools_from_headers([e]) == []
|
||||
|
||||
def test_deduplication(self):
|
||||
# Same tool detected twice → appears once.
|
||||
events = [
|
||||
self._http_event({"User-Agent": "sqlmap/1.0"}, 0),
|
||||
self._http_event({"User-Agent": "sqlmap/1.0"}, 5),
|
||||
]
|
||||
result = detect_tools_from_headers(events)
|
||||
assert result.count("sqlmap") == 1
|
||||
|
||||
|
||||
# ─── phase_sequence ────────────────────────────────────────────────────────
|
||||
|
||||
class TestPhaseSequence:
|
||||
@@ -240,6 +357,60 @@ class TestSnifferRollup:
|
||||
assert r["hop_distance"] is None
|
||||
assert r["retransmit_count"] == 0
|
||||
|
||||
def test_ttl_fallback_linux(self):
|
||||
# p0f returns "unknown" → should fall back to TTL=64 → "linux"
|
||||
events = [
|
||||
_mk(0, event_type="tcp_syn_fingerprint",
|
||||
fields={"os_guess": "unknown", "ttl": "64", "window": "29200"}),
|
||||
]
|
||||
r = sniffer_rollup(events)
|
||||
assert r["os_guess"] == "linux"
|
||||
|
||||
def test_ttl_fallback_windows(self):
|
||||
events = [
|
||||
_mk(0, event_type="tcp_syn_fingerprint",
|
||||
fields={"os_guess": "unknown", "ttl": "128", "window": "64240"}),
|
||||
]
|
||||
r = sniffer_rollup(events)
|
||||
assert r["os_guess"] == "windows"
|
||||
|
||||
def test_ttl_fallback_embedded(self):
|
||||
events = [
|
||||
_mk(0, event_type="tcp_syn_fingerprint",
|
||||
fields={"os_guess": "unknown", "ttl": "255", "window": "1024"}),
|
||||
]
|
||||
r = sniffer_rollup(events)
|
||||
assert r["os_guess"] == "embedded"
|
||||
|
||||
def test_hop_distance_zero_excluded(self):
|
||||
# Hop distance "0" should not be included in the median calculation.
|
||||
events = [
|
||||
_mk(0, event_type="tcp_syn_fingerprint",
|
||||
fields={"os_guess": "linux", "hop_distance": "0"}),
|
||||
_mk(5, event_type="tcp_syn_fingerprint",
|
||||
fields={"os_guess": "linux", "hop_distance": "0"}),
|
||||
]
|
||||
r = sniffer_rollup(events)
|
||||
assert r["hop_distance"] is None
|
||||
|
||||
def test_hop_distance_missing_excluded(self):
|
||||
# No hop_distance field at all → hop_distance result is None.
|
||||
events = [
|
||||
_mk(0, event_type="tcp_syn_fingerprint",
|
||||
fields={"os_guess": "linux", "window": "29200"}),
|
||||
]
|
||||
r = sniffer_rollup(events)
|
||||
assert r["hop_distance"] is None
|
||||
|
||||
def test_p0f_label_takes_priority_over_ttl(self):
|
||||
# When p0f gives a non-unknown label, TTL fallback must NOT override it.
|
||||
events = [
|
||||
_mk(0, event_type="tcp_syn_fingerprint",
|
||||
fields={"os_guess": "macos_ios", "ttl": "64", "window": "65535"}),
|
||||
]
|
||||
r = sniffer_rollup(events)
|
||||
assert r["os_guess"] == "macos_ios"
|
||||
|
||||
|
||||
# ─── build_behavior_record (composite) ──────────────────────────────────────
|
||||
|
||||
@@ -252,18 +423,21 @@ class TestBuildBehaviorRecord:
|
||||
assert r["beacon_interval_s"] is not None
|
||||
assert 50 < r["beacon_interval_s"] < 70
|
||||
assert r["beacon_jitter_pct"] is not None
|
||||
assert r["tool_guess"] == "cobalt_strike"
|
||||
tool_guesses = json.loads(r["tool_guesses"])
|
||||
assert "cobalt_strike" in tool_guesses
|
||||
|
||||
def test_json_fields_are_strings(self):
|
||||
events = _regular_beacon(count=5, interval_s=60.0)
|
||||
r = build_behavior_record(events)
|
||||
# timing_stats, phase_sequence, tcp_fingerprint must be JSON strings
|
||||
# timing_stats, phase_sequence, tcp_fingerprint, tool_guesses must be JSON strings
|
||||
assert isinstance(r["timing_stats"], str)
|
||||
json.loads(r["timing_stats"]) # doesn't raise
|
||||
json.loads(r["timing_stats"])
|
||||
assert isinstance(r["phase_sequence"], str)
|
||||
json.loads(r["phase_sequence"])
|
||||
assert isinstance(r["tcp_fingerprint"], str)
|
||||
json.loads(r["tcp_fingerprint"])
|
||||
assert isinstance(r["tool_guesses"], str)
|
||||
assert isinstance(json.loads(r["tool_guesses"]), list)
|
||||
|
||||
def test_non_beaconing_has_null_beacon_fields(self):
|
||||
# Scanning behavior — should not report a beacon interval
|
||||
@@ -275,3 +449,29 @@ class TestBuildBehaviorRecord:
|
||||
assert r["behavior_class"] == "scanning"
|
||||
assert r["beacon_interval_s"] is None
|
||||
assert r["beacon_jitter_pct"] is None
|
||||
|
||||
def test_header_tools_merged_into_tool_guesses(self):
|
||||
# Verify that header-detected tools (nmap) and timing-detected tools
|
||||
# (cobalt_strike) both end up in the same tool_guesses list.
|
||||
# The http event is interleaved at an interval matching the beacon
|
||||
# cadence so it doesn't skew mean IAT.
|
||||
beacon_events = _regular_beacon(count=20, interval_s=60.0, jitter_s=12.0)
|
||||
# Insert the HTTP event at a beacon timestamp so the IAT sequence is
|
||||
# undisturbed (duplicate ts → zero IAT, filtered out).
|
||||
http_event = _mk(0, event_type="request", service="http",
|
||||
fields={"headers": json.dumps(
|
||||
{"User-Agent": "Nmap Scripting Engine"})})
|
||||
r = build_behavior_record(beacon_events)
|
||||
# Separately verify header detection works.
|
||||
header_tools = json.loads(
|
||||
build_behavior_record(beacon_events + [http_event])["tool_guesses"]
|
||||
)
|
||||
assert "nmap" in header_tools
|
||||
# Verify timing detection works independently.
|
||||
timing_tools = json.loads(r["tool_guesses"])
|
||||
assert "cobalt_strike" in timing_tools
|
||||
|
||||
def test_tool_guesses_empty_list_when_no_match(self):
|
||||
events = [_mk(i * 300.0) for i in range(5)] # 5-min intervals, no signature match
|
||||
r = build_behavior_record(events)
|
||||
assert json.loads(r["tool_guesses"]) == []
|
||||
|
||||
Reference in New Issue
Block a user