feat(prober,correlation): attacker fingerprint rotation detection (DEBT-032)
When the prober observes a NEW hash for an (attacker_uuid, port, probe_type) triple it has seen before — VPS rotation, SSH server rebuild, TLS cert swap — emit a derived attacker.fingerprint_rotated event carrying both old and new hash. Detection is a small library (decnet.correlation.fingerprint_rotation) called inline from the prober at each of the three emit sites (JARM/HASSH/TCPFP). No new daemon. New AttackerFingerprintState table holds per-triple last-hash state; Attacker.rotation_count and Attacker.last_rotation_at are stamped on every diff. Library is sync, fully unit-tested via injected publish_fn / syslog_fn callbacks.
This commit is contained in:
@@ -114,6 +114,14 @@ ATTACKER_SCORED = "scored"
|
||||
# Distinct from ``observed`` which is the correlator's first-sight signal —
|
||||
# a fingerprint is additional evidence about an already-observed attacker.
|
||||
ATTACKER_FINGERPRINTED = "fingerprinted"
|
||||
# Published when the prober observes a NEW hash for an
|
||||
# (attacker_ip, port, probe_type) triple it has seen before — i.e. the
|
||||
# attacker rotated their VPS, rebuilt their SSH server, swapped their
|
||||
# TLS cert. Distinct from ``fingerprinted`` which fires on every probe
|
||||
# result; ``fingerprint_rotated`` fires only on diff and carries both
|
||||
# old_hash + new_hash. Producer: prober (via the rotation library);
|
||||
# consumers: dashboard, forensics, attribution clustering.
|
||||
ATTACKER_FINGERPRINT_ROTATED = "fingerprint_rotated"
|
||||
ATTACKER_SESSION_STARTED = "session.started"
|
||||
ATTACKER_SESSION_ENDED = "session.ended"
|
||||
# Published by the ``decnet enrich`` worker after an enrichment pass
|
||||
|
||||
153
decnet/correlation/fingerprint_rotation.py
Normal file
153
decnet/correlation/fingerprint_rotation.py
Normal file
@@ -0,0 +1,153 @@
|
||||
"""Attacker substrate-fingerprint rotation detection.
|
||||
|
||||
Called inline from the prober at each fingerprint emit site. Looks up
|
||||
the last persisted hash for ``(attacker_uuid, port, probe_type)``;
|
||||
when the new hash differs from the last one, emits a derived
|
||||
``attacker.fingerprint_rotated`` event (bus + RFC 5424 syslog) and
|
||||
stamps the ``Attacker`` row's rotation telemetry.
|
||||
|
||||
This is a pure library — no daemon, no async loop. The prober is the
|
||||
only producer. We just teach it to derive a second event on hash
|
||||
flip without standing up another worker (DEBT-032).
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid as _uuid
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from typing import Any, Callable, Literal
|
||||
|
||||
from sqlmodel import Session, select
|
||||
|
||||
from decnet.web.db.models import Attacker, AttackerFingerprintState
|
||||
|
||||
ProbeType = Literal["jarm", "hassh", "tcpfp"]
|
||||
RotationKind = Literal[
|
||||
"no_attacker_row", # caller raced ahead of correlator; skip silently
|
||||
"first_sighting", # state row created, no prior hash
|
||||
"unchanged", # same hash as last sighting
|
||||
"rotated", # hash differs; event emitted, Attacker stamped
|
||||
]
|
||||
|
||||
PublishFn = Callable[[str, dict[str, Any]], None]
|
||||
SyslogFn = Callable[[str, dict[str, Any]], None]
|
||||
|
||||
|
||||
@dataclass
|
||||
class RotationOutcome:
|
||||
"""Return shape of :func:`record_fingerprint`. Caller usually
|
||||
ignores it; useful for tests + tracing."""
|
||||
kind: RotationKind
|
||||
old_hash: str | None
|
||||
new_hash: str
|
||||
rotation_count: int
|
||||
|
||||
|
||||
_ROTATED_EVENT_TYPE = "attacker.fingerprint_rotated"
|
||||
|
||||
|
||||
def record_fingerprint(
|
||||
session: Session,
|
||||
*,
|
||||
attacker_ip: str,
|
||||
port: int,
|
||||
probe_type: ProbeType,
|
||||
new_hash: str,
|
||||
ts: datetime,
|
||||
publish_fn: PublishFn | None = None,
|
||||
syslog_fn: SyslogFn | None = None,
|
||||
) -> RotationOutcome:
|
||||
"""Upsert state row; on hash diff, emit derived event + stamp.
|
||||
|
||||
Resolves ``attacker_uuid`` from ``attacker_ip`` via the existing
|
||||
Attacker table. If no Attacker row exists yet (the prober raced
|
||||
ahead of the correlator), returns ``kind="no_attacker_row"`` and
|
||||
does nothing — the next probe cycle will pick it up once the
|
||||
correlator has caught up.
|
||||
|
||||
State upsert + Attacker stamp + publish + syslog are committed in
|
||||
one transaction so a partial failure can't desync state from
|
||||
what was emitted.
|
||||
"""
|
||||
attacker = session.exec(
|
||||
select(Attacker).where(Attacker.ip == attacker_ip)
|
||||
).first()
|
||||
if attacker is None:
|
||||
return RotationOutcome(
|
||||
kind="no_attacker_row",
|
||||
old_hash=None,
|
||||
new_hash=new_hash,
|
||||
rotation_count=0,
|
||||
)
|
||||
|
||||
row = session.exec(
|
||||
select(AttackerFingerprintState).where(
|
||||
AttackerFingerprintState.attacker_uuid == attacker.uuid,
|
||||
AttackerFingerprintState.port == port,
|
||||
AttackerFingerprintState.probe_type == probe_type,
|
||||
)
|
||||
).first()
|
||||
|
||||
if row is None:
|
||||
session.add(AttackerFingerprintState(
|
||||
uuid=str(_uuid.uuid4()),
|
||||
attacker_uuid=attacker.uuid,
|
||||
port=port,
|
||||
probe_type=probe_type,
|
||||
last_hash=new_hash,
|
||||
last_seen=ts,
|
||||
rotation_count=0,
|
||||
))
|
||||
session.commit()
|
||||
return RotationOutcome(
|
||||
kind="first_sighting",
|
||||
old_hash=None,
|
||||
new_hash=new_hash,
|
||||
rotation_count=0,
|
||||
)
|
||||
|
||||
if row.last_hash == new_hash:
|
||||
row.last_seen = ts
|
||||
session.add(row)
|
||||
session.commit()
|
||||
return RotationOutcome(
|
||||
kind="unchanged",
|
||||
old_hash=row.last_hash,
|
||||
new_hash=new_hash,
|
||||
rotation_count=row.rotation_count,
|
||||
)
|
||||
|
||||
old_hash = row.last_hash
|
||||
row.last_hash = new_hash
|
||||
row.last_seen = ts
|
||||
row.rotation_count += 1
|
||||
session.add(row)
|
||||
|
||||
attacker.rotation_count += 1
|
||||
attacker.last_rotation_at = ts
|
||||
session.add(attacker)
|
||||
|
||||
payload: dict[str, Any] = {
|
||||
"attacker_uuid": attacker.uuid,
|
||||
"attacker_ip": attacker_ip,
|
||||
"port": port,
|
||||
"probe_type": probe_type,
|
||||
"old_hash": old_hash,
|
||||
"new_hash": new_hash,
|
||||
"rotation_count": row.rotation_count,
|
||||
"ts": ts.isoformat(),
|
||||
}
|
||||
|
||||
if publish_fn is not None:
|
||||
publish_fn(_ROTATED_EVENT_TYPE, payload)
|
||||
if syslog_fn is not None:
|
||||
syslog_fn(_ROTATED_EVENT_TYPE, payload)
|
||||
|
||||
session.commit()
|
||||
|
||||
return RotationOutcome(
|
||||
kind="rotated",
|
||||
old_hash=old_hash,
|
||||
new_hash=new_hash,
|
||||
rotation_count=row.rotation_count,
|
||||
)
|
||||
@@ -27,6 +27,9 @@ from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable
|
||||
|
||||
from sqlalchemy.engine import Engine
|
||||
from sqlmodel import Session
|
||||
|
||||
from decnet.bus import topics as _topics
|
||||
from decnet.bus.base import BaseBus
|
||||
from decnet.bus.factory import get_bus
|
||||
@@ -35,6 +38,10 @@ from decnet.bus.publish import (
|
||||
run_control_listener,
|
||||
run_health_heartbeat,
|
||||
)
|
||||
from decnet.correlation.fingerprint_rotation import (
|
||||
ProbeType,
|
||||
record_fingerprint,
|
||||
)
|
||||
from decnet.logging import get_logger
|
||||
from decnet.prober.hassh import hassh_server
|
||||
from decnet.prober.jarm import JARM_EMPTY_HASH, jarm_hash
|
||||
@@ -44,6 +51,21 @@ from decnet.telemetry import traced as _traced
|
||||
|
||||
logger = get_logger("prober")
|
||||
|
||||
|
||||
def _build_sync_engine() -> Engine:
|
||||
"""Construct a sync SQLite engine for rotation-detection state.
|
||||
|
||||
Used inline by the prober; it lives outside the async repository
|
||||
layer because rotation detection is a sync hook on a sync probe
|
||||
path. Honors the same defaulting as
|
||||
``decnet.web.db.sqlite.repository.SQLiteRepository``.
|
||||
"""
|
||||
import os
|
||||
from decnet.config import _ROOT
|
||||
from decnet.web.db.sqlite.database import get_sync_engine
|
||||
db_path = os.environ.get("DECNET_DB_PATH", str(_ROOT / "decnet.db"))
|
||||
return get_sync_engine(db_path)
|
||||
|
||||
# ─── Default ports per probe type ───────────────────────────────────────────
|
||||
|
||||
# JARM: common C2 callback / TLS server ports
|
||||
@@ -233,6 +255,14 @@ def _discover_attackers(json_path: Path, position: int) -> tuple[set[str], int]:
|
||||
|
||||
ProbePublishFn = Callable[[str, dict[str, Any]], None]
|
||||
|
||||
# Rotation recorder: takes (attacker_ip, port, probe_type, new_hash) and
|
||||
# performs the rotation-detection upsert + derived-event emission for the
|
||||
# DEBT-032 substrate-fingerprint flow. Optional; when None the prober
|
||||
# behaves exactly as before (raw fingerprint emit only, no rotation
|
||||
# detection). Construction lives at worker startup so phase functions
|
||||
# don't have to know about the DB engine.
|
||||
RotationRecorderFn = Callable[[str, int, "ProbeType", str], None]
|
||||
|
||||
|
||||
@_traced("prober.probe_cycle")
|
||||
def _probe_cycle(
|
||||
@@ -245,6 +275,7 @@ def _probe_cycle(
|
||||
json_path: Path,
|
||||
timeout: float = 5.0,
|
||||
publish_fn: ProbePublishFn | None = None,
|
||||
record_rotation: RotationRecorderFn | None = None,
|
||||
) -> None:
|
||||
"""
|
||||
Probe all known attacker IPs with JARM, HASSH, and TCP/IP fingerprinting.
|
||||
@@ -263,13 +294,13 @@ def _probe_cycle(
|
||||
ip_probed = probed.setdefault(ip, {})
|
||||
|
||||
# Phase 1: JARM (TLS fingerprinting)
|
||||
_jarm_phase(ip, ip_probed, jarm_ports, log_path, json_path, timeout, publish_fn)
|
||||
_jarm_phase(ip, ip_probed, jarm_ports, log_path, json_path, timeout, publish_fn, record_rotation)
|
||||
|
||||
# Phase 2: HASSHServer (SSH fingerprinting)
|
||||
_hassh_phase(ip, ip_probed, ssh_ports, log_path, json_path, timeout, publish_fn)
|
||||
_hassh_phase(ip, ip_probed, ssh_ports, log_path, json_path, timeout, publish_fn, record_rotation)
|
||||
|
||||
# Phase 3: TCP/IP stack fingerprinting
|
||||
_tcpfp_phase(ip, ip_probed, tcpfp_ports, log_path, json_path, timeout, publish_fn)
|
||||
_tcpfp_phase(ip, ip_probed, tcpfp_ports, log_path, json_path, timeout, publish_fn, record_rotation)
|
||||
|
||||
|
||||
@_traced("prober.jarm_phase")
|
||||
@@ -281,6 +312,7 @@ def _jarm_phase(
|
||||
json_path: Path,
|
||||
timeout: float,
|
||||
publish_fn: ProbePublishFn | None = None,
|
||||
record_rotation: RotationRecorderFn | None = None,
|
||||
) -> None:
|
||||
"""JARM-fingerprint an IP on the given TLS ports."""
|
||||
done = ip_probed.setdefault("jarm", set())
|
||||
@@ -301,6 +333,8 @@ def _jarm_phase(
|
||||
msg=f"JARM {ip}:{port} = {h}",
|
||||
)
|
||||
logger.info("prober: JARM %s:%d = %s", ip, port, h)
|
||||
if record_rotation is not None:
|
||||
record_rotation(ip, port, "jarm", h)
|
||||
if publish_fn is not None:
|
||||
publish_fn(
|
||||
"jarm",
|
||||
@@ -387,6 +421,7 @@ def _hassh_phase(
|
||||
json_path: Path,
|
||||
timeout: float,
|
||||
publish_fn: ProbePublishFn | None = None,
|
||||
record_rotation: RotationRecorderFn | None = None,
|
||||
) -> None:
|
||||
"""HASSHServer-fingerprint an IP on the given SSH ports."""
|
||||
done = ip_probed.setdefault("hassh", set())
|
||||
@@ -412,6 +447,8 @@ def _hassh_phase(
|
||||
msg=f"HASSH {ip}:{port} = {result['hassh_server']}",
|
||||
)
|
||||
logger.info("prober: HASSH %s:%d = %s", ip, port, result["hassh_server"])
|
||||
if record_rotation is not None:
|
||||
record_rotation(ip, port, "hassh", result["hassh_server"])
|
||||
if publish_fn is not None:
|
||||
publish_fn(
|
||||
"hassh",
|
||||
@@ -445,6 +482,7 @@ def _tcpfp_phase(
|
||||
json_path: Path,
|
||||
timeout: float,
|
||||
publish_fn: ProbePublishFn | None = None,
|
||||
record_rotation: RotationRecorderFn | None = None,
|
||||
) -> None:
|
||||
"""TCP/IP stack fingerprint an IP on the given ports."""
|
||||
done = ip_probed.setdefault("tcpfp", set())
|
||||
@@ -478,6 +516,8 @@ def _tcpfp_phase(
|
||||
msg=f"TCPFP {ip}:{port} = {result['tcpfp_hash']}",
|
||||
)
|
||||
logger.info("prober: TCPFP %s:%d = %s", ip, port, result["tcpfp_hash"])
|
||||
if record_rotation is not None:
|
||||
record_rotation(ip, port, "tcpfp", result["tcpfp_hash"])
|
||||
if publish_fn is not None:
|
||||
publish_fn(
|
||||
"tcpfp",
|
||||
@@ -586,6 +626,61 @@ async def prober_worker(
|
||||
event_type,
|
||||
)
|
||||
|
||||
# Substrate-rotation detection (DEBT-032) — open a sync engine for
|
||||
# the prober's lifetime; recorder closes a session per call so we
|
||||
# never hold a connection across phase boundaries. Failure to
|
||||
# connect is non-fatal: probes continue, rotation detection is
|
||||
# silently disabled.
|
||||
rotation_engine: Engine | None = None
|
||||
record_rotation: RotationRecorderFn | None = None
|
||||
try:
|
||||
rotation_engine = _build_sync_engine()
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.warning(
|
||||
"prober: rotation-detection DB unavailable, "
|
||||
"running with rotation detection disabled: %s", exc,
|
||||
)
|
||||
|
||||
if rotation_engine is not None:
|
||||
def _publish_rotation(event_type: str, payload: dict[str, Any]) -> None:
|
||||
raw_publish(
|
||||
_topics.attacker(_topics.ATTACKER_FINGERPRINT_ROTATED),
|
||||
payload,
|
||||
event_type,
|
||||
)
|
||||
|
||||
def _syslog_rotation(event_type: str, payload: dict[str, Any]) -> None:
|
||||
_write_event(
|
||||
log_path, json_path,
|
||||
"fingerprint_rotated",
|
||||
target_ip=payload["attacker_ip"],
|
||||
target_port=str(payload["port"]),
|
||||
probe_type=payload["probe_type"],
|
||||
old_hash=payload.get("old_hash") or "",
|
||||
new_hash=payload["new_hash"],
|
||||
rotation_count=str(payload["rotation_count"]),
|
||||
msg=(
|
||||
f"FP rotation {payload['attacker_ip']}:{payload['port']} "
|
||||
f"{payload['probe_type']} {payload.get('old_hash')} → "
|
||||
f"{payload['new_hash']}"
|
||||
),
|
||||
)
|
||||
|
||||
def record_rotation(
|
||||
ip: str, port: int, probe_type: ProbeType, new_hash: str,
|
||||
) -> None:
|
||||
with Session(rotation_engine) as session:
|
||||
record_fingerprint(
|
||||
session,
|
||||
attacker_ip=ip,
|
||||
port=port,
|
||||
probe_type=probe_type,
|
||||
new_hash=new_hash,
|
||||
ts=datetime.now(timezone.utc),
|
||||
publish_fn=_publish_rotation,
|
||||
syslog_fn=_syslog_rotation,
|
||||
)
|
||||
|
||||
shutdown = asyncio.Event()
|
||||
heartbeat_task = asyncio.create_task(run_health_heartbeat(bus, "prober"))
|
||||
control_task = asyncio.create_task(
|
||||
@@ -612,6 +707,7 @@ async def prober_worker(
|
||||
jarm_ports, hassh_ports, tcp_ports,
|
||||
log_path, json_path, timeout,
|
||||
_publish_attacker,
|
||||
record_rotation,
|
||||
)
|
||||
|
||||
try:
|
||||
@@ -626,3 +722,6 @@ async def prober_worker(
|
||||
if bus is not None:
|
||||
with contextlib.suppress(Exception):
|
||||
await bus.close()
|
||||
if rotation_engine is not None:
|
||||
with contextlib.suppress(Exception):
|
||||
rotation_engine.dispose()
|
||||
|
||||
@@ -45,6 +45,7 @@ from .auth import (
|
||||
from .attackers import (
|
||||
Attacker,
|
||||
AttackerBehavior,
|
||||
AttackerFingerprintState,
|
||||
AttackerIdentity,
|
||||
AttackersResponse,
|
||||
SessionProfile,
|
||||
@@ -242,6 +243,7 @@ __all__ = [
|
||||
# attackers
|
||||
"Attacker",
|
||||
"AttackerBehavior",
|
||||
"AttackerFingerprintState",
|
||||
"AttackerIdentity",
|
||||
"AttackerIntel",
|
||||
"AttackersResponse",
|
||||
|
||||
@@ -93,11 +93,48 @@ class Attacker(SQLModel, table=True):
|
||||
# private/loopback addresses never resolve. 256 chars matches
|
||||
# RFC 1035 max hostname length.
|
||||
ptr_record: Optional[str] = Field(default=None, max_length=256)
|
||||
# Substrate-rotation telemetry, maintained by
|
||||
# ``decnet.correlation.fingerprint_rotation.record_fingerprint`` whenever
|
||||
# the prober observes a new hash for an (attacker, port, probe_type)
|
||||
# triple it has seen before. Lets the dashboard render "rotated 3×
|
||||
# last 24h" without joining to AttackerFingerprintState.
|
||||
rotation_count: int = Field(default=0)
|
||||
last_rotation_at: Optional[datetime] = Field(default=None, index=True)
|
||||
updated_at: datetime = Field(
|
||||
default_factory=lambda: datetime.now(timezone.utc), index=True
|
||||
)
|
||||
|
||||
|
||||
class AttackerFingerprintState(SQLModel, table=True):
|
||||
"""Per-(attacker, port, probe_type) latest-hash row.
|
||||
|
||||
Sole purpose: give the prober memory across runs so it can detect when
|
||||
an attacker's HASSH/JARM/TCP fingerprint flips for the same port — i.e.
|
||||
they rotated their VPS, rebuilt their SSH server, swapped their TLS
|
||||
cert. Diff detection lives in
|
||||
``decnet.correlation.fingerprint_rotation``; the prober calls into
|
||||
that library inline at each emit site and this table is the only
|
||||
persistence it needs.
|
||||
|
||||
Bounded by ``attackers × probe families × ports`` — small in practice;
|
||||
a busy fleet sees O(thousands) of rows, not O(millions).
|
||||
"""
|
||||
__tablename__ = "attacker_fingerprint_state"
|
||||
uuid: str = Field(primary_key=True)
|
||||
attacker_uuid: str = Field(foreign_key="attackers.uuid", index=True)
|
||||
port: int
|
||||
probe_type: str = Field(max_length=16) # "jarm" | "hassh" | "tcpfp"
|
||||
last_hash: str = Field(max_length=128)
|
||||
last_seen: datetime = Field(index=True)
|
||||
rotation_count: int = Field(default=0)
|
||||
__table_args__ = (
|
||||
UniqueConstraint(
|
||||
"attacker_uuid", "port", "probe_type",
|
||||
name="uq_attacker_fingerprint_state_natural",
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
class AttackerIdentity(SQLModel, table=True):
|
||||
"""
|
||||
Resolved actor identity — the dedup'd "same hands" row that one or
|
||||
|
||||
Reference in New Issue
Block a user