merge testing->tomerge/main #7

Open
anti wants to merge 242 commits from testing into tomerge/main
6 changed files with 593 additions and 0 deletions
Showing only changes of commit ddfb232590 - Show all commits

View File

@@ -0,0 +1,5 @@
"""DECNET profiler — standalone attacker profile builder worker."""
from decnet.profiler.worker import attacker_profile_worker
__all__ = ["attacker_profile_worker"]

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,375 @@
"""
Behavioral and timing analysis for DECNET attacker profiles.
Consumes the chronological `LogEvent` stream already built by
`decnet.correlation.engine.CorrelationEngine` and derives per-IP metrics:
- Inter-event timing statistics (mean / median / stdev / min / max)
- Coefficient-of-variation (jitter metric)
- Beaconing vs. interactive vs. scanning classification
- Tool attribution against known C2 frameworks (Cobalt Strike, Sliver,
Havoc, Mythic) using default beacon/jitter profiles
- Recon → exfil phase sequencing (latency between the last recon event
and the first exfil-like event)
- OS / TCP fingerprint + retransmit rollup from sniffer-emitted events
Pure-Python; no external dependencies. All functions are safe to call from
both sync and async contexts.
"""
from __future__ import annotations
import json
import statistics
from collections import Counter
from typing import Any
from decnet.correlation.parser import LogEvent
# ─── Event-type taxonomy ────────────────────────────────────────────────────
# Sniffer-emitted packet events that feed into fingerprint rollup
# (consumed by sniffer_rollup below).
_SNIFFER_SYN_EVENT: str = "tcp_syn_fingerprint"
_SNIFFER_FLOW_EVENT: str = "tcp_flow_timing"
# Events that signal "recon" phase (scans, probes, auth attempts).
_RECON_EVENT_TYPES: frozenset[str] = frozenset({
    "scan", "connection", "banner", "probe",
    "login_attempt", "auth", "auth_failure",
})
# Events that signal "exfil" / action-on-objective phase.
# NOTE: kept disjoint from _RECON_EVENT_TYPES so phase classification is
# unambiguous per event.
_EXFIL_EVENT_TYPES: frozenset[str] = frozenset({
    "download", "upload", "file_transfer", "data_exfil",
    "command", "exec", "query", "shell_input",
})
# Fields carrying payload byte counts (for "large payload" detection),
# checked in this order by phase_sequence.
_PAYLOAD_SIZE_FIELDS: tuple[str, ...] = ("bytes", "size", "content_length")
# ─── C2 tool attribution signatures ─────────────────────────────────────────
#
# Each entry lists the default beacon cadence profile of a popular C2.
# A profile *matches* an attacker when:
#   - mean inter-event time is within ±`interval_tolerance` seconds, AND
#   - jitter (cv = stdev / mean) is within ±`jitter_tolerance`
#
# These defaults are documented in each framework's public user guides;
# real operators often tune them, so attribution is advisory, not definitive.
_TOOL_SIGNATURES: tuple[dict[str, Any], ...] = (
    {
        "name": "cobalt_strike",
        "interval_s": 60.0,
        "interval_tolerance_s": 8.0,
        "jitter_cv": 0.20,
        "jitter_tolerance": 0.05,
    },
    {
        "name": "sliver",
        "interval_s": 60.0,
        "interval_tolerance_s": 10.0,
        "jitter_cv": 0.30,
        "jitter_tolerance": 0.08,
    },
    {
        "name": "havoc",
        "interval_s": 45.0,
        "interval_tolerance_s": 8.0,
        "jitter_cv": 0.10,
        "jitter_tolerance": 0.03,
    },
    {
        "name": "mythic",
        "interval_s": 30.0,
        "interval_tolerance_s": 6.0,
        "jitter_cv": 0.15,
        "jitter_tolerance": 0.03,
    },
)
# ─── Timing stats ───────────────────────────────────────────────────────────
def _null_timing_stats(event_count: int, duration_s: float) -> dict[str, Any]:
    """Stats dict for degenerate inputs: all interval-derived fields are None."""
    return {
        "event_count": event_count,
        "duration_s": round(duration_s, 3),
        "mean_iat_s": None,
        "median_iat_s": None,
        "stdev_iat_s": None,
        "min_iat_s": None,
        "max_iat_s": None,
        "cv": None,
    }
def timing_stats(events: list[LogEvent]) -> dict[str, Any]:
    """
    Compute inter-arrival-time (IAT) statistics across *events*.

    Events are sorted by timestamp internally, so callers may pass them in
    any order. Returns a dict with:
        mean_iat_s, median_iat_s, stdev_iat_s, min_iat_s, max_iat_s, cv,
        event_count, duration_s
    With fewer than two events — or no usable (non-negative) intervals —
    the interval-based fields are None and duration_s reflects the span
    between the first and last event seen (0.0 for an empty list).
    """
    if not events:
        return _null_timing_stats(0, 0.0)
    sorted_events = sorted(events, key=lambda e: e.timestamp)
    duration_s = (sorted_events[-1].timestamp - sorted_events[0].timestamp).total_seconds()
    if len(sorted_events) < 2:
        return _null_timing_stats(len(sorted_events), duration_s)
    iats = [
        (sorted_events[i].timestamp - sorted_events[i - 1].timestamp).total_seconds()
        for i in range(1, len(sorted_events))
    ]
    # Exclude spuriously-negative (clock-skew) intervals.
    iats = [v for v in iats if v >= 0]
    if not iats:
        return _null_timing_stats(len(sorted_events), duration_s)
    mean = statistics.fmean(iats)
    median = statistics.median(iats)
    # Population stdev: the observed intervals are the whole population here.
    stdev = statistics.pstdev(iats) if len(iats) > 1 else 0.0
    # Coefficient of variation — the jitter metric used by classify_behavior.
    cv = (stdev / mean) if mean > 0 else None
    return {
        "event_count": len(sorted_events),
        "duration_s": round(duration_s, 3),
        "mean_iat_s": round(mean, 3),
        "median_iat_s": round(median, 3),
        "stdev_iat_s": round(stdev, 3),
        "min_iat_s": round(min(iats), 3),
        "max_iat_s": round(max(iats), 3),
        "cv": round(cv, 4) if cv is not None else None,
    }
# ─── Behavior classification ────────────────────────────────────────────────
def classify_behavior(stats: dict[str, Any], services_count: int) -> str:
    """
    Bucket an attacker's cadence into a coarse behavior class.

    Returns one of: beaconing | interactive | scanning | mixed | unknown.
    Heuristics (evaluated in priority order):
      * scanning    — ≥ 3 services touched in short bursts (mean IAT < 3 s), ≥ 5 events
      * beaconing   — regular cadence: CV < 0.35, mean IAT ≥ 5 s, ≥ 5 events
      * interactive — fast but irregular: mean IAT < 3 s, CV ≥ 0.5, ≥ 10 events
      * mixed       — anything else with enough data
      * unknown     — too few data points to judge
    """
    count = stats.get("event_count") or 0
    mean_iat = stats.get("mean_iat_s")
    jitter = stats.get("cv")
    if count < 3 or mean_iat is None:
        return "unknown"
    fast = mean_iat < 3.0
    # Scanning: many services, fast bursts, few events per service.
    if services_count >= 3 and fast and count >= 5:
        return "scanning"
    if jitter is not None:
        # Beaconing: regular cadence over many events.
        if jitter < 0.35 and mean_iat >= 5.0 and count >= 5:
            return "beaconing"
        # Interactive: short, irregular intervals.
        if jitter >= 0.5 and fast and count >= 10:
            return "interactive"
    return "mixed"
# ─── C2 tool attribution ────────────────────────────────────────────────────
def guess_tool(mean_iat_s: float | None, cv: float | None) -> str | None:
    """
    Match a (mean inter-arrival time, jitter CV) pair against the known C2
    default beacon profiles in `_TOOL_SIGNATURES`.

    Returns the tool name only when exactly one signature matches; zero or
    multiple matches return None so we never emit a false attribution.
    """
    if mean_iat_s is None or cv is None:
        return None
    matched = [
        sig["name"]
        for sig in _TOOL_SIGNATURES
        if abs(mean_iat_s - sig["interval_s"]) <= sig["interval_tolerance_s"]
        and abs(cv - sig["jitter_cv"]) <= sig["jitter_tolerance"]
    ]
    return matched[0] if len(matched) == 1 else None
# ─── Phase sequencing ───────────────────────────────────────────────────────
def phase_sequence(events: list[LogEvent]) -> dict[str, Any]:
    """
    Derive recon→exfil phase-transition info for one attacker.

    Returns:
        recon_end_ts       : ISO timestamp of the last recon-class event (or None)
        exfil_start_ts     : ISO timestamp of the first exfil-class event (or None)
        exfil_latency_s    : seconds between them (None unless both are present
                             and the exfil start is not before the recon end)
        large_payload_count: count of events whose *fields* report a payload
                             ≥ 1 MiB (heuristic for bulk data transfer)
    """
    def _has_large_payload(ev: LogEvent) -> bool:
        # True when any recognized size field parses to >= 1 MiB.
        for field_name in _PAYLOAD_SIZE_FIELDS:
            raw = ev.fields.get(field_name)
            if raw is None:
                continue
            try:
                if int(raw) >= 1_048_576:
                    return True
            except (TypeError, ValueError):
                continue
        return False

    last_recon = None
    first_exfil = None
    bulk_count = 0
    for ev in sorted(events, key=lambda x: x.timestamp):
        if ev.event_type in _RECON_EVENT_TYPES:
            last_recon = ev.timestamp
        elif ev.event_type in _EXFIL_EVENT_TYPES and first_exfil is None:
            first_exfil = ev.timestamp
        if _has_large_payload(ev):
            bulk_count += 1
    latency_s: float | None = None
    if last_recon is not None and first_exfil is not None and first_exfil >= last_recon:
        latency_s = round((first_exfil - last_recon).total_seconds(), 3)
    return {
        "recon_end_ts": last_recon.isoformat() if last_recon else None,
        "exfil_start_ts": first_exfil.isoformat() if first_exfil else None,
        "exfil_latency_s": latency_s,
        "large_payload_count": bulk_count,
    }
# ─── Sniffer rollup (OS fingerprint + retransmits) ──────────────────────────
def sniffer_rollup(events: list[LogEvent]) -> dict[str, Any]:
    """
    Roll up sniffer-emitted `tcp_syn_fingerprint` and `tcp_flow_timing`
    events into a per-attacker summary.

    Returns a dict with:
        os_guess         : most frequently observed OS label (or None)
        hop_distance     : median of reported hop distances (or None)
        tcp_fingerprint  : latest SYN fingerprint snapshot ({} if none seen)
        retransmit_count : total retransmits across all flow-timing events
    """
    os_guesses: list[str] = []
    hops: list[int] = []
    tcp_fp: dict[str, Any] | None = None
    retransmits = 0
    for e in events:
        if e.event_type == _SNIFFER_SYN_EVENT:
            og = e.fields.get("os_guess")
            if og:
                os_guesses.append(og)
            # Only record hop distance when the sniffer actually reported one.
            # (Previously a missing field defaulted to "0", so every SYN event
            # without it contributed a bogus hop of 0 and dragged the median
            # toward zero.)
            raw_hops = e.fields.get("hop_distance")
            if raw_hops is not None:
                try:
                    hops.append(int(raw_hops))
                except (TypeError, ValueError):
                    pass
            # Keep the latest fingerprint snapshot.
            tcp_fp = {
                "window": _int_or_none(e.fields.get("window")),
                "wscale": _int_or_none(e.fields.get("wscale")),
                "mss": _int_or_none(e.fields.get("mss")),
                "options_sig": e.fields.get("options_sig", ""),
                "has_sack": e.fields.get("has_sack") == "true",
                "has_timestamps": e.fields.get("has_timestamps") == "true",
            }
        elif e.event_type == _SNIFFER_FLOW_EVENT:
            try:
                retransmits += int(e.fields.get("retransmits", "0"))
            except (TypeError, ValueError):
                pass
    # Mode for the OS bucket — most frequently observed label.
    os_guess: str | None = None
    if os_guesses:
        os_guess = Counter(os_guesses).most_common(1)[0][0]
    # Median hop distance (robust to the occasional weird TTL).
    hop_distance: int | None = None
    if hops:
        hop_distance = int(statistics.median(hops))
    return {
        "os_guess": os_guess,
        "hop_distance": hop_distance,
        "tcp_fingerprint": tcp_fp or {},
        "retransmit_count": retransmits,
    }
def _int_or_none(v: Any) -> int | None:
    """Best-effort int coercion: None for missing, blank, or unparsable values."""
    if v in (None, ""):
        return None
    try:
        return int(v)
    except (TypeError, ValueError):
        return None
# ─── Composite: build the full AttackerBehavior record ──────────────────────
def build_behavior_record(events: list[LogEvent]) -> dict[str, Any]:
    """
    Build the dict to persist in the `attacker_behavior` table.

    JSON-typed columns are encoded here so the repository layer stays
    schema-agnostic; callers just hand us the raw event list.
    """
    # Timing stats run over *all* events (unfiltered): a C2 beacon often
    # reuses the same "connection" event_type on each check-in, so filtering
    # by event type would destroy exactly the signal we want.
    stats = timing_stats(events)
    distinct_services = {e.service for e in events}
    behavior_class = classify_behavior(stats, len(distinct_services))
    tool_name = guess_tool(stats.get("mean_iat_s"), stats.get("cv"))
    phases = phase_sequence(events)
    rollup = sniffer_rollup(events)
    # Beacon-specific projection: interval/jitter are only meaningful when
    # the flow actually classified as beaconing — otherwise they're noise.
    beacon_interval_s: float | None = None
    beacon_jitter_pct: float | None = None
    if behavior_class == "beaconing":
        beacon_interval_s = stats.get("mean_iat_s")
        jitter_cv = stats.get("cv")
        if jitter_cv is not None:
            beacon_jitter_pct = round(jitter_cv * 100, 2)
    return {
        "os_guess": rollup["os_guess"],
        "hop_distance": rollup["hop_distance"],
        "tcp_fingerprint": json.dumps(rollup["tcp_fingerprint"]),
        "retransmit_count": rollup["retransmit_count"],
        "behavior_class": behavior_class,
        "beacon_interval_s": beacon_interval_s,
        "beacon_jitter_pct": beacon_jitter_pct,
        "tool_guess": tool_name,
        "timing_stats": json.dumps(stats),
        "phase_sequence": json.dumps(phases),
    }

213
decnet/profiler/worker.py Normal file
View File

@@ -0,0 +1,213 @@
"""
Attacker profile builder — incremental background worker.
Maintains a persistent CorrelationEngine and a log-ID cursor across cycles.
On cold start (first cycle or process restart), performs one full build from
all stored logs. Subsequent cycles fetch only new logs via the cursor,
ingest them into the existing engine, and rebuild profiles for affected IPs
only.
Complexity per cycle: O(new_logs + affected_ips) instead of O(total_logs²).
"""
from __future__ import annotations
import asyncio
import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any
from decnet.correlation.engine import CorrelationEngine
from decnet.correlation.parser import LogEvent
from decnet.logging import get_logger
from decnet.profiler.behavioral import build_behavior_record
from decnet.web.db.repository import BaseRepository
logger = get_logger("attacker_worker")
# Max log rows fetched per repository call while draining the backlog.
_BATCH_SIZE = 500
# Key under which the worker persists its log-id cursor via repo.set_state().
_STATE_KEY = "attacker_worker_cursor"
# Event types that indicate active command/query execution (not just connection/scan)
_COMMAND_EVENT_TYPES = frozenset({
    "command", "exec", "query", "input", "shell_input",
    "execute", "run", "sql_query", "redis_command",
})
# Fields that carry the executed command/query text, checked in this order.
_COMMAND_FIELDS = ("command", "query", "input", "line", "sql", "cmd")
@dataclass
class _WorkerState:
    # Cross-cycle worker state held in process memory.
    engine: CorrelationEngine = field(default_factory=CorrelationEngine)  # long-lived log parser/correlator
    last_log_id: int = 0  # highest log row id already ingested (the cursor)
    initialized: bool = False  # True once the cold-start full rebuild has completed
async def attacker_profile_worker(repo: BaseRepository, *, interval: int = 30) -> None:
    """
    Periodically update the Attacker table, incrementally.

    Designed to run forever as an asyncio Task: each cycle sleeps *interval*
    seconds, then ingests new logs and rebuilds the affected profiles.
    Unexpected errors are logged with a traceback and the loop continues;
    task cancellation still propagates because asyncio.CancelledError is
    not an Exception subclass.
    """
    logger.info("attacker profile worker started interval=%ds", interval)
    state = _WorkerState()
    while True:
        await asyncio.sleep(interval)
        try:
            await _incremental_update(repo, state)
        except Exception:
            # logger.exception keeps the traceback; logging only str(exc)
            # made cycle failures nearly impossible to debug.
            logger.exception("attacker worker: update failed")
async def _incremental_update(repo: BaseRepository, state: _WorkerState) -> None:
    """One worker cycle: ingest new logs past the cursor, refresh touched IPs."""
    if not state.initialized:
        await _cold_start(repo, state)
        return
    touched_ips: set[str] = set()
    while True:
        rows = await repo.get_logs_after_id(state.last_log_id, limit=_BATCH_SIZE)
        if not rows:
            break
        for row in rows:
            parsed = state.engine.ingest(row["raw_line"])
            if parsed and parsed.attacker_ip:
                touched_ips.add(parsed.attacker_ip)
            state.last_log_id = row["id"]
        # A short batch means the backlog is drained — skip one extra round trip.
        if len(rows) < _BATCH_SIZE:
            break
    if touched_ips:
        await _update_profiles(repo, state, touched_ips)
    # Persist the cursor even when nothing changed, so restarts stay cheap.
    await repo.set_state(_STATE_KEY, {"last_log_id": state.last_log_id})
    if touched_ips:
        logger.info("attacker worker: updated %d profiles (incremental)", len(touched_ips))
async def _cold_start(repo: BaseRepository, state: _WorkerState) -> None:
    """Full rebuild from every stored log (first cycle or process restart)."""
    rows = await repo.get_all_logs_raw()
    if not rows:
        # Nothing stored yet — just anchor the cursor at the current max id.
        state.last_log_id = await repo.get_max_log_id()
        state.initialized = True
        await repo.set_state(_STATE_KEY, {"last_log_id": state.last_log_id})
        return
    for row in rows:
        state.engine.ingest(row["raw_line"])
        if row["id"] > state.last_log_id:
            state.last_log_id = row["id"]
    # NOTE(review): reaches into the engine's private event index; presumably
    # keys are attacker IPs (matches _update_profiles' usage) — a public
    # accessor on CorrelationEngine would be cleaner.
    every_ip = set(state.engine._events.keys())
    await _update_profiles(repo, state, every_ip)
    await repo.set_state(_STATE_KEY, {"last_log_id": state.last_log_id})
    state.initialized = True
    logger.info("attacker worker: cold start rebuilt %d profiles", len(every_ip))
async def _update_profiles(
    repo: BaseRepository,
    state: _WorkerState,
    ips: set[str],
) -> None:
    """Rebuild and upsert the attacker record (plus behavior rollup) per IP."""
    traversals_by_ip = {t.attacker_ip: t for t in state.engine.traversals(min_deckies=2)}
    bounties_by_ip = await repo.get_bounties_for_ips(ips)
    for ip in ips:
        ip_events = state.engine._events.get(ip, [])
        if not ip_events:
            continue
        record = _build_record(
            ip,
            ip_events,
            traversals_by_ip.get(ip),
            bounties_by_ip.get(ip, []),
            _extract_commands_from_events(ip_events),
        )
        attacker_uuid = await repo.upsert_attacker(record)
        # The behavioral / fingerprint rollup lives in a sibling table, so a
        # failure there must never block the core attacker profile upsert.
        try:
            behavior = build_behavior_record(ip_events)
            await repo.upsert_attacker_behavior(attacker_uuid, behavior)
        except Exception as exc:
            logger.error("attacker worker: behavior upsert failed for %s: %s", ip, exc)
def _build_record(
    ip: str,
    events: list[LogEvent],
    traversal: Any,
    bounties: list[dict[str, Any]],
    commands: list[dict[str, Any]],
) -> dict[str, Any]:
    """Assemble the row dict persisted to the attacker table for one IP."""
    timestamps = [e.timestamp for e in events]
    services = sorted({e.service for e in events})
    decky_set = {e.decky for e in events}
    # Traversal order wins when known; otherwise fall back to first-contact order.
    deckies = traversal.deckies if traversal else _first_contact_deckies(events)
    fingerprints = [b for b in bounties if b.get("bounty_type") == "fingerprint"]
    credential_count = sum(1 for b in bounties if b.get("bounty_type") == "credential")
    return {
        "ip": ip,
        "first_seen": min(timestamps),
        "last_seen": max(timestamps),
        "event_count": len(events),
        "service_count": len(services),
        "decky_count": len(decky_set),
        "services": json.dumps(services),
        "deckies": json.dumps(deckies),
        "traversal_path": traversal.path if traversal else None,
        "is_traversal": traversal is not None,
        "bounty_count": len(bounties),
        "credential_count": credential_count,
        "fingerprints": json.dumps(fingerprints),
        "commands": json.dumps(commands),
        "updated_at": datetime.now(timezone.utc),
    }
def _first_contact_deckies(events: list[LogEvent]) -> list[str]:
    """
    Return unique deckies in first-contact (chronological) order.

    Used for attackers without a recognized traversal path. Membership is
    tracked in a set for O(1) lookups instead of the previous
    `decky not in result_list` scan, which was O(n²) overall.
    """
    ordered: list[str] = []
    seen: set[str] = set()
    for e in sorted(events, key=lambda x: x.timestamp):
        if e.decky not in seen:
            seen.add(e.decky)
            ordered.append(e.decky)
    return ordered
def _extract_commands_from_events(events: list[LogEvent]) -> list[dict[str, Any]]:
    """
    Extract executed commands from LogEvent objects.

    Operates directly on LogEvent.fields (already a dict), so no JSON
    parsing is needed. Only events whose type is in _COMMAND_EVENT_TYPES
    contribute; the first truthy field from _COMMAND_FIELDS supplies the
    command text.
    """
    extracted: list[dict[str, Any]] = []
    for ev in events:
        if ev.event_type not in _COMMAND_EVENT_TYPES:
            continue
        text: str | None = None
        for field_name in _COMMAND_FIELDS:
            candidate = ev.fields.get(field_name)
            if candidate:
                text = str(candidate)
                break
        if not text:
            continue
        extracted.append({
            "service": ev.service,
            "decky": ev.decky,
            "command": text,
            "timestamp": ev.timestamp.isoformat(),
        })
    return extracted