feat(prober,correlation): attacker fingerprint rotation detection (DEBT-032)
When the prober observes a NEW hash for an (attacker_uuid, port, probe_type) triple it has seen before — VPS rotation, SSH server rebuild, TLS cert swap — emit a derived attacker.fingerprint_rotated event carrying both old and new hash. Detection is a small library (decnet.correlation.fingerprint_rotation) called inline from the prober at each of the three emit sites (JARM/HASSH/TCPFP). No new daemon. New AttackerFingerprintState table holds per-triple last-hash state; Attacker.rotation_count and Attacker.last_rotation_at are stamped on every diff. Library is sync, fully unit-tested via injected publish_fn / syslog_fn callbacks.
This commit is contained in:
@@ -114,6 +114,14 @@ ATTACKER_SCORED = "scored"
|
||||
# Distinct from ``observed`` which is the correlator's first-sight signal —
|
||||
# a fingerprint is additional evidence about an already-observed attacker.
|
||||
ATTACKER_FINGERPRINTED = "fingerprinted"
|
||||
# Published when the prober observes a NEW hash for an
|
||||
# (attacker_ip, port, probe_type) triple it has seen before — i.e. the
|
||||
# attacker rotated their VPS, rebuilt their SSH server, swapped their
|
||||
# TLS cert. Distinct from ``fingerprinted`` which fires on every probe
|
||||
# result; ``fingerprint_rotated`` fires only on diff and carries both
|
||||
# old_hash + new_hash. Producer: prober (via the rotation library);
|
||||
# consumers: dashboard, forensics, attribution clustering.
|
||||
ATTACKER_FINGERPRINT_ROTATED = "fingerprint_rotated"
|
||||
ATTACKER_SESSION_STARTED = "session.started"
|
||||
ATTACKER_SESSION_ENDED = "session.ended"
|
||||
# Published by the ``decnet enrich`` worker after an enrichment pass
|
||||
|
||||
153
decnet/correlation/fingerprint_rotation.py
Normal file
153
decnet/correlation/fingerprint_rotation.py
Normal file
@@ -0,0 +1,153 @@
|
||||
"""Attacker substrate-fingerprint rotation detection.
|
||||
|
||||
Called inline from the prober at each fingerprint emit site. Looks up
|
||||
the last persisted hash for ``(attacker_uuid, port, probe_type)``;
|
||||
when the new hash differs from the last one, emits a derived
|
||||
``attacker.fingerprint_rotated`` event (bus + RFC 5424 syslog) and
|
||||
stamps the ``Attacker`` row's rotation telemetry.
|
||||
|
||||
This is a pure library — no daemon, no async loop. The prober is the
|
||||
only producer. We just teach it to derive a second event on hash
|
||||
flip without standing up another worker (DEBT-032).
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import uuid as _uuid
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from typing import Any, Callable, Literal
|
||||
|
||||
from sqlmodel import Session, select
|
||||
|
||||
from decnet.web.db.models import Attacker, AttackerFingerprintState
|
||||
|
||||
ProbeType = Literal["jarm", "hassh", "tcpfp"]
|
||||
RotationKind = Literal[
|
||||
"no_attacker_row", # caller raced ahead of correlator; skip silently
|
||||
"first_sighting", # state row created, no prior hash
|
||||
"unchanged", # same hash as last sighting
|
||||
"rotated", # hash differs; event emitted, Attacker stamped
|
||||
]
|
||||
|
||||
PublishFn = Callable[[str, dict[str, Any]], None]
|
||||
SyslogFn = Callable[[str, dict[str, Any]], None]
|
||||
|
||||
|
||||
@dataclass
class RotationOutcome:
    """Return shape of :func:`record_fingerprint`. Caller usually
    ignores it; useful for tests + tracing."""
    # Which branch ``record_fingerprint`` took; one of the ``RotationKind``
    # literals.
    kind: RotationKind
    # Hash that was stored before this call. ``None`` when there was no
    # state row to read (``no_attacker_row`` and ``first_sighting``).
    old_hash: str | None
    # Hash supplied by the caller for this sighting.
    new_hash: str
    # Per-triple rotation counter as of this call; 0 when no state row
    # existed beforehand.
    rotation_count: int
|
||||
|
||||
|
||||
_ROTATED_EVENT_TYPE = "attacker.fingerprint_rotated"
|
||||
|
||||
|
||||
def record_fingerprint(
    session: Session,
    *,
    attacker_ip: str,
    port: int,
    probe_type: ProbeType,
    new_hash: str,
    ts: datetime,
    publish_fn: PublishFn | None = None,
    syslog_fn: SyslogFn | None = None,
) -> RotationOutcome:
    """Upsert the per-triple state row; on hash diff, emit + stamp.

    Resolves ``attacker_uuid`` from ``attacker_ip`` via the existing
    Attacker table. If no Attacker row exists yet (the prober raced
    ahead of the correlator), returns ``kind="no_attacker_row"`` and
    does nothing — the next probe cycle will pick it up once the
    correlator has caught up.

    Args:
        session: Open sync session; this function commits on it.
        attacker_ip: IP used to look up the ``Attacker`` row.
        port: Probed port (part of the state key).
        probe_type: Probe family (part of the state key).
        new_hash: Fingerprint hash observed by this probe.
        ts: Observation time; written to ``last_seen`` and, on a
            rotation, to ``Attacker.last_rotation_at``.
        publish_fn: Optional callback invoked with
            ``("attacker.fingerprint_rotated", payload)`` on rotation.
        syslog_fn: Optional callback with the same arguments, invoked
            after ``publish_fn``.

    Returns:
        A :class:`RotationOutcome` describing which branch was taken.

    Raises:
        Exception: Whatever the session or a callback raises on the
            rotation path. The transaction is rolled back first so the
            session carries no half-applied rotation into a later call.

    Ordering on rotation is flush -> callbacks -> commit. Flushing first
    surfaces DB write errors before anything is emitted. Callbacks
    cannot be un-fired, so delivery is at-least-once: if a later
    callback or the commit fails, the state is rolled back and the next
    differing probe re-detects and re-emits the rotation.
    """
    attacker = session.exec(
        select(Attacker).where(Attacker.ip == attacker_ip)
    ).first()
    if attacker is None:
        return RotationOutcome(
            kind="no_attacker_row",
            old_hash=None,
            new_hash=new_hash,
            rotation_count=0,
        )

    row = session.exec(
        select(AttackerFingerprintState).where(
            AttackerFingerprintState.attacker_uuid == attacker.uuid,
            AttackerFingerprintState.port == port,
            AttackerFingerprintState.probe_type == probe_type,
        )
    ).first()

    if row is None:
        session.add(AttackerFingerprintState(
            uuid=str(_uuid.uuid4()),
            attacker_uuid=attacker.uuid,
            port=port,
            probe_type=probe_type,
            last_hash=new_hash,
            last_seen=ts,
            rotation_count=0,
        ))
        session.commit()
        return RotationOutcome(
            kind="first_sighting",
            old_hash=None,
            new_hash=new_hash,
            rotation_count=0,
        )

    if row.last_hash == new_hash:
        # Read the values we return before committing: commit expires
        # loaded attributes, and touching them afterwards would cost an
        # extra SELECT.
        unchanged_hash = row.last_hash
        unchanged_count = row.rotation_count
        row.last_seen = ts
        session.add(row)
        session.commit()
        return RotationOutcome(
            kind="unchanged",
            old_hash=unchanged_hash,
            new_hash=new_hash,
            rotation_count=unchanged_count,
        )

    old_hash = row.last_hash
    row.last_hash = new_hash
    row.last_seen = ts
    row.rotation_count += 1
    rotation_count = row.rotation_count
    session.add(row)

    # ``or 0`` tolerates a NULL counter, e.g. an Attacker row written
    # before this column existed — TODO confirm such rows can occur.
    attacker.rotation_count = (attacker.rotation_count or 0) + 1
    attacker.last_rotation_at = ts
    session.add(attacker)

    payload: dict[str, Any] = {
        "attacker_uuid": attacker.uuid,
        "attacker_ip": attacker_ip,
        "port": port,
        "probe_type": probe_type,
        "old_hash": old_hash,
        "new_hash": new_hash,
        "rotation_count": rotation_count,
        "ts": ts.isoformat(),
    }

    try:
        # Push the UPDATEs to the DB before emitting so a write failure
        # cannot leave an event describing state that was never stored.
        session.flush()
        if publish_fn is not None:
            publish_fn(_ROTATED_EVENT_TYPE, payload)
        if syslog_fn is not None:
            syslog_fn(_ROTATED_EVENT_TYPE, payload)
        session.commit()
    except Exception:
        # Without this the mutated row stays in the session; a later call
        # on the same session would see it as "unchanged" and commit the
        # bumped counters without any event having been emitted.
        session.rollback()
        raise

    return RotationOutcome(
        kind="rotated",
        old_hash=old_hash,
        new_hash=new_hash,
        rotation_count=rotation_count,
    )
|
||||
@@ -27,6 +27,9 @@ from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable
|
||||
|
||||
from sqlalchemy.engine import Engine
|
||||
from sqlmodel import Session
|
||||
|
||||
from decnet.bus import topics as _topics
|
||||
from decnet.bus.base import BaseBus
|
||||
from decnet.bus.factory import get_bus
|
||||
@@ -35,6 +38,10 @@ from decnet.bus.publish import (
|
||||
run_control_listener,
|
||||
run_health_heartbeat,
|
||||
)
|
||||
from decnet.correlation.fingerprint_rotation import (
|
||||
ProbeType,
|
||||
record_fingerprint,
|
||||
)
|
||||
from decnet.logging import get_logger
|
||||
from decnet.prober.hassh import hassh_server
|
||||
from decnet.prober.jarm import JARM_EMPTY_HASH, jarm_hash
|
||||
@@ -44,6 +51,21 @@ from decnet.telemetry import traced as _traced
|
||||
|
||||
logger = get_logger("prober")
|
||||
|
||||
|
||||
def _build_sync_engine() -> Engine:
    """Return the sync SQLite engine used for rotation-detection state.

    The prober calls rotation detection from its synchronous probe path,
    so this engine is built here rather than through the async
    repository layer. The database path comes from ``DECNET_DB_PATH``
    and falls back to ``decnet.db`` under ``_ROOT``, the same defaulting
    as ``decnet.web.db.sqlite.repository.SQLiteRepository``.
    """
    import os

    from decnet.config import _ROOT
    from decnet.web.db.sqlite.database import get_sync_engine

    fallback_path = str(_ROOT / "decnet.db")
    return get_sync_engine(os.environ.get("DECNET_DB_PATH", fallback_path))
|
||||
|
||||
# ─── Default ports per probe type ───────────────────────────────────────────
|
||||
|
||||
# JARM: common C2 callback / TLS server ports
|
||||
@@ -233,6 +255,14 @@ def _discover_attackers(json_path: Path, position: int) -> tuple[set[str], int]:
|
||||
|
||||
ProbePublishFn = Callable[[str, dict[str, Any]], None]
|
||||
|
||||
# Rotation recorder: takes (attacker_ip, port, probe_type, new_hash) and
|
||||
# performs the rotation-detection upsert + derived-event emission for the
|
||||
# DEBT-032 substrate-fingerprint flow. Optional; when None the prober
|
||||
# behaves exactly as before (raw fingerprint emit only, no rotation
|
||||
# detection). Construction lives at worker startup so phase functions
|
||||
# don't have to know about the DB engine.
|
||||
RotationRecorderFn = Callable[[str, int, "ProbeType", str], None]
|
||||
|
||||
|
||||
@_traced("prober.probe_cycle")
|
||||
def _probe_cycle(
|
||||
@@ -245,6 +275,7 @@ def _probe_cycle(
|
||||
json_path: Path,
|
||||
timeout: float = 5.0,
|
||||
publish_fn: ProbePublishFn | None = None,
|
||||
record_rotation: RotationRecorderFn | None = None,
|
||||
) -> None:
|
||||
"""
|
||||
Probe all known attacker IPs with JARM, HASSH, and TCP/IP fingerprinting.
|
||||
@@ -263,13 +294,13 @@ def _probe_cycle(
|
||||
ip_probed = probed.setdefault(ip, {})
|
||||
|
||||
# Phase 1: JARM (TLS fingerprinting)
|
||||
_jarm_phase(ip, ip_probed, jarm_ports, log_path, json_path, timeout, publish_fn)
|
||||
_jarm_phase(ip, ip_probed, jarm_ports, log_path, json_path, timeout, publish_fn, record_rotation)
|
||||
|
||||
# Phase 2: HASSHServer (SSH fingerprinting)
|
||||
_hassh_phase(ip, ip_probed, ssh_ports, log_path, json_path, timeout, publish_fn)
|
||||
_hassh_phase(ip, ip_probed, ssh_ports, log_path, json_path, timeout, publish_fn, record_rotation)
|
||||
|
||||
# Phase 3: TCP/IP stack fingerprinting
|
||||
_tcpfp_phase(ip, ip_probed, tcpfp_ports, log_path, json_path, timeout, publish_fn)
|
||||
_tcpfp_phase(ip, ip_probed, tcpfp_ports, log_path, json_path, timeout, publish_fn, record_rotation)
|
||||
|
||||
|
||||
@_traced("prober.jarm_phase")
|
||||
@@ -281,6 +312,7 @@ def _jarm_phase(
|
||||
json_path: Path,
|
||||
timeout: float,
|
||||
publish_fn: ProbePublishFn | None = None,
|
||||
record_rotation: RotationRecorderFn | None = None,
|
||||
) -> None:
|
||||
"""JARM-fingerprint an IP on the given TLS ports."""
|
||||
done = ip_probed.setdefault("jarm", set())
|
||||
@@ -301,6 +333,8 @@ def _jarm_phase(
|
||||
msg=f"JARM {ip}:{port} = {h}",
|
||||
)
|
||||
logger.info("prober: JARM %s:%d = %s", ip, port, h)
|
||||
if record_rotation is not None:
|
||||
record_rotation(ip, port, "jarm", h)
|
||||
if publish_fn is not None:
|
||||
publish_fn(
|
||||
"jarm",
|
||||
@@ -387,6 +421,7 @@ def _hassh_phase(
|
||||
json_path: Path,
|
||||
timeout: float,
|
||||
publish_fn: ProbePublishFn | None = None,
|
||||
record_rotation: RotationRecorderFn | None = None,
|
||||
) -> None:
|
||||
"""HASSHServer-fingerprint an IP on the given SSH ports."""
|
||||
done = ip_probed.setdefault("hassh", set())
|
||||
@@ -412,6 +447,8 @@ def _hassh_phase(
|
||||
msg=f"HASSH {ip}:{port} = {result['hassh_server']}",
|
||||
)
|
||||
logger.info("prober: HASSH %s:%d = %s", ip, port, result["hassh_server"])
|
||||
if record_rotation is not None:
|
||||
record_rotation(ip, port, "hassh", result["hassh_server"])
|
||||
if publish_fn is not None:
|
||||
publish_fn(
|
||||
"hassh",
|
||||
@@ -445,6 +482,7 @@ def _tcpfp_phase(
|
||||
json_path: Path,
|
||||
timeout: float,
|
||||
publish_fn: ProbePublishFn | None = None,
|
||||
record_rotation: RotationRecorderFn | None = None,
|
||||
) -> None:
|
||||
"""TCP/IP stack fingerprint an IP on the given ports."""
|
||||
done = ip_probed.setdefault("tcpfp", set())
|
||||
@@ -478,6 +516,8 @@ def _tcpfp_phase(
|
||||
msg=f"TCPFP {ip}:{port} = {result['tcpfp_hash']}",
|
||||
)
|
||||
logger.info("prober: TCPFP %s:%d = %s", ip, port, result["tcpfp_hash"])
|
||||
if record_rotation is not None:
|
||||
record_rotation(ip, port, "tcpfp", result["tcpfp_hash"])
|
||||
if publish_fn is not None:
|
||||
publish_fn(
|
||||
"tcpfp",
|
||||
@@ -586,6 +626,61 @@ async def prober_worker(
|
||||
event_type,
|
||||
)
|
||||
|
||||
# Substrate-rotation detection (DEBT-032) — open a sync engine for
|
||||
# the prober's lifetime; recorder closes a session per call so we
|
||||
# never hold a connection across phase boundaries. Failure to
|
||||
# connect is non-fatal: probes continue, rotation detection is
|
||||
# silently disabled.
|
||||
rotation_engine: Engine | None = None
|
||||
record_rotation: RotationRecorderFn | None = None
|
||||
try:
|
||||
rotation_engine = _build_sync_engine()
|
||||
except Exception as exc: # noqa: BLE001
|
||||
logger.warning(
|
||||
"prober: rotation-detection DB unavailable, "
|
||||
"running with rotation detection disabled: %s", exc,
|
||||
)
|
||||
|
||||
if rotation_engine is not None:
|
||||
def _publish_rotation(event_type: str, payload: dict[str, Any]) -> None:
|
||||
raw_publish(
|
||||
_topics.attacker(_topics.ATTACKER_FINGERPRINT_ROTATED),
|
||||
payload,
|
||||
event_type,
|
||||
)
|
||||
|
||||
def _syslog_rotation(event_type: str, payload: dict[str, Any]) -> None:
|
||||
_write_event(
|
||||
log_path, json_path,
|
||||
"fingerprint_rotated",
|
||||
target_ip=payload["attacker_ip"],
|
||||
target_port=str(payload["port"]),
|
||||
probe_type=payload["probe_type"],
|
||||
old_hash=payload.get("old_hash") or "",
|
||||
new_hash=payload["new_hash"],
|
||||
rotation_count=str(payload["rotation_count"]),
|
||||
msg=(
|
||||
f"FP rotation {payload['attacker_ip']}:{payload['port']} "
|
||||
f"{payload['probe_type']} {payload.get('old_hash')} → "
|
||||
f"{payload['new_hash']}"
|
||||
),
|
||||
)
|
||||
|
||||
def record_rotation(
    ip: str, port: int, probe_type: ProbeType, new_hash: str,
) -> None:
    """Run rotation detection for one probe result, best-effort.

    Opens a short-lived session on ``rotation_engine`` and delegates to
    ``record_fingerprint`` with the bus and syslog emitters defined
    alongside this closure.

    Any failure is logged and swallowed. The engine connects lazily, so
    an unreachable or locked database first shows up here rather than
    when the engine is built; letting it propagate would interrupt the
    probe phase that called us, while rotation detection is meant to be
    an optional add-on to the raw fingerprint emit.
    """
    try:
        with Session(rotation_engine) as session:
            record_fingerprint(
                session,
                attacker_ip=ip,
                port=port,
                probe_type=probe_type,
                new_hash=new_hash,
                ts=datetime.now(timezone.utc),
                publish_fn=_publish_rotation,
                syslog_fn=_syslog_rotation,
            )
    except Exception as exc:  # noqa: BLE001
        logger.warning(
            "prober: rotation detection failed for %s:%d (%s): %s",
            ip, port, probe_type, exc,
        )
|
||||
|
||||
shutdown = asyncio.Event()
|
||||
heartbeat_task = asyncio.create_task(run_health_heartbeat(bus, "prober"))
|
||||
control_task = asyncio.create_task(
|
||||
@@ -612,6 +707,7 @@ async def prober_worker(
|
||||
jarm_ports, hassh_ports, tcp_ports,
|
||||
log_path, json_path, timeout,
|
||||
_publish_attacker,
|
||||
record_rotation,
|
||||
)
|
||||
|
||||
try:
|
||||
@@ -626,3 +722,6 @@ async def prober_worker(
|
||||
if bus is not None:
|
||||
with contextlib.suppress(Exception):
|
||||
await bus.close()
|
||||
if rotation_engine is not None:
|
||||
with contextlib.suppress(Exception):
|
||||
rotation_engine.dispose()
|
||||
|
||||
@@ -45,6 +45,7 @@ from .auth import (
|
||||
from .attackers import (
|
||||
Attacker,
|
||||
AttackerBehavior,
|
||||
AttackerFingerprintState,
|
||||
AttackerIdentity,
|
||||
AttackersResponse,
|
||||
SessionProfile,
|
||||
@@ -242,6 +243,7 @@ __all__ = [
|
||||
# attackers
|
||||
"Attacker",
|
||||
"AttackerBehavior",
|
||||
"AttackerFingerprintState",
|
||||
"AttackerIdentity",
|
||||
"AttackerIntel",
|
||||
"AttackersResponse",
|
||||
|
||||
@@ -93,11 +93,48 @@ class Attacker(SQLModel, table=True):
|
||||
# private/loopback addresses never resolve. 256 chars matches
|
||||
# RFC 1035 max hostname length.
|
||||
ptr_record: Optional[str] = Field(default=None, max_length=256)
|
||||
# Substrate-rotation telemetry, maintained by
|
||||
# ``decnet.correlation.fingerprint_rotation.record_fingerprint`` whenever
|
||||
# the prober observes a new hash for an (attacker, port, probe_type)
|
||||
# triple it has seen before. Lets the dashboard render "rotated 3×
|
||||
# (last at <timestamp>)" without joining to AttackerFingerprintState; the counter is lifetime, not windowed.
|
||||
rotation_count: int = Field(default=0)
|
||||
last_rotation_at: Optional[datetime] = Field(default=None, index=True)
|
||||
updated_at: datetime = Field(
|
||||
default_factory=lambda: datetime.now(timezone.utc), index=True
|
||||
)
|
||||
|
||||
|
||||
class AttackerFingerprintState(SQLModel, table=True):
    """Per-(attacker, port, probe_type) latest-hash row.

    Sole purpose: give the prober memory across runs so it can detect when
    an attacker's HASSH/JARM/TCP fingerprint flips for the same port — i.e.
    they rotated their VPS, rebuilt their SSH server, swapped their TLS
    cert. Diff detection lives in
    ``decnet.correlation.fingerprint_rotation``; the prober calls into
    that library inline at each emit site and this table is the only
    persistence it needs.

    Bounded by ``attackers × probe families × ports`` — small in practice;
    a busy fleet sees O(thousands) of rows, not O(millions).
    """
    __tablename__ = "attacker_fingerprint_state"
    # Surrogate primary key; ``record_fingerprint`` fills it with a
    # random UUID4 string when it creates the row.
    uuid: str = Field(primary_key=True)
    # Owning attacker; together with ``port`` and ``probe_type`` this
    # forms the natural key enforced by the unique constraint below.
    attacker_uuid: str = Field(foreign_key="attackers.uuid", index=True)
    port: int
    probe_type: str = Field(max_length=16)  # "jarm" | "hassh" | "tcpfp"
    # Most recent hash observed for this triple; overwritten on rotation.
    last_hash: str = Field(max_length=128)
    # Timestamp of the latest sighting, updated whether or not the hash
    # changed.
    last_seen: datetime = Field(index=True)
    # Number of hash changes seen for this triple (not for the attacker as
    # a whole; that total lives on ``Attacker.rotation_count``).
    rotation_count: int = Field(default=0)
    __table_args__ = (
        # At most one state row per (attacker, port, probe_type).
        UniqueConstraint(
            "attacker_uuid", "port", "probe_type",
            name="uq_attacker_fingerprint_state_natural",
        ),
    )
|
||||
|
||||
|
||||
class AttackerIdentity(SQLModel, table=True):
|
||||
"""
|
||||
Resolved actor identity — the dedup'd "same hands" row that one or
|
||||
|
||||
@@ -470,22 +470,14 @@ The threat-intel enrichment surface (DEBT-N/A: `feat(intel)` series) keys every
|
||||
|
||||
**Status:** Open. No operational impact today (single-IP attackers are the dominant case), but worth closing before the federation layer lands so the wire-format and API both speak in identity terms, not IP terms.
|
||||
|
||||
### DEBT-032 — Prober can't detect fingerprint rotation without mutation
|
||||
**Files:** `decnet/prober/worker.py` (~lines 235, 286, 334, 392), `decnet/web/db/models.py` (new `decky_service_fingerprints` table).
|
||||
### ~~DEBT-032 — Attacker fingerprint rotation detection~~ ✅ RESOLVED
|
||||
**Files:** `decnet/correlation/fingerprint_rotation.py` (new), `decnet/prober/worker.py`, `decnet/web/db/models/attackers.py`, `decnet/bus/topics.py`.
|
||||
|
||||
Substrate identity is `(service_name, implementation_fingerprint)`, not service name alone. A base-image rebuild that rotates OpenSSH 8.4 → 9.2 — or any recompose that changes JARM / HASSH / TCP fingerprint without changing the service list — is a substrate transition from the attacker's recon POV, and today the correlation graph sees none of it.
|
||||
Resolved 2026-05-03. **Reframed during planning:** the original entry described this as a per-decky substrate-integrity problem, but the prober probes *attackers*, not deckies. The actual gap was attacker substrate tracking — same attacker IP rotating their VPS, rebuilding their SSH server, swapping their TLS cert — invisible at correlator-time because nothing diffed consecutive hashes for the same `(attacker_ip, port, probe_type)` triple.
|
||||
|
||||
The prober already computes JARM (`worker.py:286`), HASSH (`worker.py:334`), and TCP fingerprint (`worker.py:392`), and emits each as RFC 5424 syslog + optional bus publish. What's missing is **per-(decky, service, probe_type) persistence** to diff against: the current dedup set `probed: dict[IP → {probe_type → set(ports)}]` (`worker.py:235`) is in-memory and scoped to one run, so any restart loses history and any same-IP probe on a changed substrate can't be detected as a change.
|
||||
Implemented as a small library (`decnet.correlation.fingerprint_rotation.record_fingerprint`) called inline from the prober at each of the three emit sites (JARM / HASSH / TCPFP). No new worker daemon; the prober is still the only producer and is simply taught to derive a second event on hash flip. New `AttackerFingerprintState` table holds per-`(attacker_uuid, port, probe_type)` last-hash state. New bus topic `attacker.fingerprint_rotated` carries `{attacker_uuid, attacker_ip, port, probe_type, old_hash, new_hash, rotation_count, ts}`. `Attacker.rotation_count` and `Attacker.last_rotation_at` are stamped on every diff so the dashboard can render rotation telemetry without joining. Library is fully sync + unit-tested with injected publish_fn / syslog_fn callbacks.
|
||||
|
||||
**Design:**
|
||||
1. New SQLModel table `decky_service_fingerprints` keyed by `(decky_name, service, probe_type)` with `last_hash, last_seen_at, sample_count`. One upsert per probe; bounded by fleet × probe families.
|
||||
2. Prober reads `last_hash` before emitting; on diff, emits a new `substrate_fingerprint_changed` event (RFC 5424 syslog + `decky.{id}.fingerprint` bus topic) with `{decky, service, probe_type, old_hash, new_hash}`. On match, upsert the timestamp and skip the event.
|
||||
3. Correlator consumes the new event kind into a parallel per-decky index (mirroring the mutation index landed in this session) and interleaves `🔍 decky-03 hassh drift` markers in `AttackerTraversal.fingerprints_during`.
|
||||
4. Divergence detector: compare `substrate_state(t)` fold (mutations) vs `observed_identity(t)` fold (fingerprints) per decky. A fingerprint change without a preceding mutation ⇒ `substrate_divergence` finding — container drift, compromised base image, rootkit banner rewrite, or prober lag. Falls out of the data model for free once both streams exist.
|
||||
|
||||
**Prerequisite satisfied:** mutation event stream + correlator mutation-kind parser landed alongside this DEBT entry (commits `f875350`, `fa0cdb3`, `bf5ed7a`, `d4d8a2a` on `dev`). The fingerprint stream plugs into the same substrate: same RFC 5424 emission pattern, sibling per-decky engine index, same timeline interleaving.
|
||||
|
||||
**Status:** Open — deferred to its own commit sequence. The dedup state in `worker.py:235` is the only thing standing between "JARM hash computed" and "substrate rotation detected."
|
||||
Out of scope (deferred): dashboard surfacing of `rotation_count`; attribution clustering across attackers (same JARM seen from different IPs); backfill from existing event store.
|
||||
|
||||
---
|
||||
|
||||
@@ -713,7 +705,7 @@ user who needs it.
|
||||
| DEBT-029 | 🟡 Medium | Architecture / Bus | ✅ resolved |
|
||||
| DEBT-030 | 🟡 Medium | Web / Live mutations | ✅ resolved (Phase A) |
|
||||
| ~~DEBT-031~~ | ✅ | Workers / Bus integration | resolved |
|
||||
| DEBT-032 | 🟡 Medium | Correlation / Prober | open |
|
||||
| ~~DEBT-032~~ | ✅ | Correlation / Prober | resolved 2026-05-03 |
|
||||
| DEBT-033 | 🟡 Medium | Storage / Session recording | open |
|
||||
| ~~DEBT-035~~ | ✅ | Artifacts / Filesystem perms | resolved 2026-05-02 |
|
||||
| DEBT-036 | 🟡 Medium | Correlation / Keystroke dynamics | open |
|
||||
@@ -731,5 +723,5 @@ user who needs it.
|
||||
| DEBT-048 | 🟡 Medium | TTP / Intel provider mapping review (recurring) | open / recurring |
|
||||
| DEBT-049 | 🟡 Medium | TTP / Sigma adapter (post-v1) | open |
|
||||
|
||||
**Remaining open:** DEBT-011 (Alembic), DEBT-027 (Dynamic bait store), DEBT-028 (deploy endpoint tests), DEBT-032 (fingerprint rotation detection), DEBT-033 (transcript shard rotation), DEBT-036 (session-profile ingester), DEBT-037 (webhook delivery hardening), DEBT-038 (SSH PAM cred-capture limitations — document-only), DEBT-042 (orchestrator failure-count window), DEBT-043 (frontend test framework), DEBT-045 (EmailLifter heavyweight — partial paid; carved-out follow-ups remain), DEBT-046 (mal-hash feed), DEBT-048 (TTP intel provider mapping review — recurring quarterly), DEBT-049 (TTP Sigma adapter — post-v1).
|
||||
**Remaining open:** DEBT-011 (Alembic), DEBT-027 (Dynamic bait store), DEBT-028 (deploy endpoint tests), DEBT-033 (transcript shard rotation), DEBT-036 (session-profile ingester), DEBT-037 (webhook delivery hardening), DEBT-038 (SSH PAM cred-capture limitations — document-only), DEBT-042 (orchestrator failure-count window), DEBT-043 (frontend test framework), DEBT-045 (EmailLifter heavyweight — partial paid; carved-out follow-ups remain), DEBT-046 (mal-hash feed), DEBT-048 (TTP intel provider mapping review — recurring quarterly), DEBT-049 (TTP Sigma adapter — post-v1).
|
||||
**Estimated remaining effort:** ~21 hours plus the new EmailLifter / TTP follow-ups. DEBT-030 Phase B (optimistic staged-buffer editor) is a follow-up, not debt.
|
||||
|
||||
236
tests/correlation/test_fingerprint_rotation.py
Normal file
236
tests/correlation/test_fingerprint_rotation.py
Normal file
@@ -0,0 +1,236 @@
|
||||
"""Unit tests for ``decnet.correlation.fingerprint_rotation``.
|
||||
|
||||
Pure library: in-memory SQLite + sync Session + collected callback
|
||||
calls. No prober, no bus, no async. Each test seeds an Attacker row,
|
||||
calls ``record_fingerprint``, asserts on the returned outcome + the
|
||||
side-effects (state row, Attacker stamp, callback invocations).
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
import pytest
|
||||
from sqlalchemy.engine import Engine
|
||||
from sqlmodel import Session, SQLModel, create_engine, select
|
||||
|
||||
from decnet.correlation.fingerprint_rotation import (
|
||||
record_fingerprint,
|
||||
RotationOutcome,
|
||||
)
|
||||
from decnet.web.db.models import (
|
||||
Attacker,
|
||||
AttackerFingerprintState,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
def engine() -> Engine:
    """Provide an in-memory SQLite engine with every SQLModel table created."""
    memory_engine = create_engine(
        "sqlite://",
        connect_args={"check_same_thread": False},
    )
    SQLModel.metadata.create_all(memory_engine)
    return memory_engine
|
||||
|
||||
|
||||
@pytest.fixture
def now() -> datetime:
    """Provide a fixed, timezone-aware reference instant for the tests."""
    return datetime(year=2026, month=5, day=3, hour=12, tzinfo=timezone.utc)
|
||||
|
||||
|
||||
def _seed_attacker(session: Session, ip: str = "1.2.3.4") -> Attacker:
    """Insert, commit and return an ``Attacker`` row for ``ip``."""
    attacker = Attacker(
        uuid="attacker-uuid-1",
        ip=ip,
        first_seen=datetime.now(timezone.utc),
        last_seen=datetime.now(timezone.utc),
    )
    session.add(attacker)
    session.commit()
    # Reload so the returned instance carries its committed column values.
    session.refresh(attacker)
    return attacker
|
||||
|
||||
|
||||
class _Recorder:
    """Callable spy that stores every ``(event_type, payload)`` it receives.

    Used in place of ``publish_fn`` / ``syslog_fn``; calls are kept in
    arrival order on ``self.calls``.
    """

    def __init__(self) -> None:
        self.calls: list[tuple[str, dict]] = []

    def __call__(self, event_type: str, payload: dict) -> None:
        self.calls += [(event_type, payload)]
|
||||
|
||||
|
||||
def test_no_attacker_row_returns_noop(engine, now):
    """An unknown IP yields ``no_attacker_row`` with no emits and no state row."""
    bus_spy = _Recorder()
    syslog_spy = _Recorder()
    with Session(engine) as session:
        outcome = record_fingerprint(
            session,
            attacker_ip="9.9.9.9", port=22, probe_type="hassh",
            new_hash="abc", ts=now,
            publish_fn=bus_spy, syslog_fn=syslog_spy,
        )
    assert outcome.kind == "no_attacker_row"
    assert bus_spy.calls == []
    assert syslog_spy.calls == []
    with Session(engine) as session:
        state_rows = session.exec(select(AttackerFingerprintState)).all()
        assert state_rows == []
|
||||
|
||||
|
||||
def test_first_sighting_creates_state_row_no_event(engine, now):
    """First hash for a triple stores state but emits nothing and stamps nothing."""
    bus_spy, syslog_spy = _Recorder(), _Recorder()
    with Session(engine) as session:
        _seed_attacker(session)
        outcome = record_fingerprint(
            session,
            attacker_ip="1.2.3.4", port=22, probe_type="hassh",
            new_hash="hash-1", ts=now,
            publish_fn=bus_spy, syslog_fn=syslog_spy,
        )

    assert outcome.kind == "first_sighting"
    assert outcome.old_hash is None
    assert outcome.new_hash == "hash-1"
    assert outcome.rotation_count == 0
    assert bus_spy.calls == []
    assert syslog_spy.calls == []

    with Session(engine) as session:
        state_rows = session.exec(select(AttackerFingerprintState)).all()
        assert len(state_rows) == 1
        stored = state_rows[0]
        assert stored.last_hash == "hash-1"
        assert stored.rotation_count == 0
        # The Attacker row must be untouched by a first sighting.
        attacker = session.exec(select(Attacker)).one()
        assert attacker.rotation_count == 0
        assert attacker.last_rotation_at is None
|
||||
|
||||
|
||||
def test_unchanged_hash_bumps_last_seen_no_event(engine, now):
    """Re-observing the same hash only advances ``last_seen``."""
    bus_spy, syslog_spy = _Recorder(), _Recorder()
    second_ts = now + timedelta(minutes=10)
    triple = {"attacker_ip": "1.2.3.4", "port": 22, "probe_type": "hassh"}
    with Session(engine) as session:
        _seed_attacker(session)
        record_fingerprint(session, new_hash="hash-1", ts=now, **triple)
        outcome = record_fingerprint(
            session,
            new_hash="hash-1", ts=second_ts,
            publish_fn=bus_spy, syslog_fn=syslog_spy,
            **triple,
        )

    assert outcome.kind == "unchanged"
    assert bus_spy.calls == []
    assert syslog_spy.calls == []

    with Session(engine) as session:
        stored = session.exec(select(AttackerFingerprintState)).one()
        # SQLite drops tzinfo on round-trip, so re-attach UTC to the stored
        # value before comparing it with the aware timestamp.
        assert stored.last_seen.replace(tzinfo=timezone.utc) == second_ts
        assert stored.rotation_count == 0
|
||||
|
||||
|
||||
def test_rotated_emits_event_and_stamps_attacker(engine, now):
    """A differing hash emits one event per callback and stamps both rows."""
    bus_spy, syslog_spy = _Recorder(), _Recorder()
    rotated_at = now + timedelta(hours=1)
    triple = {"attacker_ip": "1.2.3.4", "port": 22, "probe_type": "hassh"}
    with Session(engine) as session:
        _seed_attacker(session)
        record_fingerprint(session, new_hash="hash-1", ts=now, **triple)
        outcome = record_fingerprint(
            session,
            new_hash="hash-2", ts=rotated_at,
            publish_fn=bus_spy, syslog_fn=syslog_spy,
            **triple,
        )

    assert outcome.kind == "rotated"
    assert outcome.old_hash == "hash-1"
    assert outcome.new_hash == "hash-2"
    assert outcome.rotation_count == 1

    assert len(bus_spy.calls) == 1
    assert len(syslog_spy.calls) == 1
    event_type, payload = bus_spy.calls[0]
    assert event_type == "attacker.fingerprint_rotated"
    expected_fields = {
        "attacker_uuid": "attacker-uuid-1",
        "attacker_ip": "1.2.3.4",
        "port": 22,
        "probe_type": "hassh",
        "old_hash": "hash-1",
        "new_hash": "hash-2",
        "rotation_count": 1,
        "ts": rotated_at.isoformat(),
    }
    for field, expected in expected_fields.items():
        assert payload[field] == expected

    with Session(engine) as session:
        attacker = session.exec(select(Attacker)).one()
        assert attacker.rotation_count == 1
        assert attacker.last_rotation_at is not None
        # Stored datetimes come back naive from SQLite; re-attach UTC.
        assert attacker.last_rotation_at.replace(tzinfo=timezone.utc) == rotated_at
        stored = session.exec(select(AttackerFingerprintState)).one()
        assert stored.last_hash == "hash-2"
        assert stored.rotation_count == 1
|
||||
|
||||
|
||||
def test_three_probe_types_independent(engine, now):
    """Each probe family gets its own state row on the same port."""
    probe_types = ("jarm", "hassh", "tcpfp")
    with Session(engine) as session:
        _seed_attacker(session)
        for probe_type in probe_types:
            record_fingerprint(
                session,
                attacker_ip="1.2.3.4",
                port=22,
                probe_type=probe_type,
                new_hash=f"{probe_type}-1",
                ts=now,
            )
    with Session(engine) as session:
        state_rows = session.exec(select(AttackerFingerprintState)).all()
        seen_types = {state.probe_type for state in state_rows}
        seen_hashes = {state.last_hash for state in state_rows}
        assert seen_types == {"jarm", "hassh", "tcpfp"}
        assert seen_hashes == {"jarm-1", "hassh-1", "tcpfp-1"}
|
||||
|
||||
|
||||
def test_two_ports_same_probe_type_independent(engine, now):
    """Record the same probe type on two ports and check both ports are stored."""
    ports = (22, 2222)

    with Session(engine) as writer:
        _seed_attacker(writer)
        for port in ports:
            record_fingerprint(
                writer,
                attacker_ip="1.2.3.4",
                port=port,
                probe_type="hassh",
                new_hash=f"hash-{port}",
                ts=now,
            )

    with Session(engine) as reader:
        states = reader.exec(select(AttackerFingerprintState)).all()
        stored_ports = {state.port for state in states}
        assert stored_ports == {22, 2222}
|
||||
|
||||
|
||||
def test_multiple_rotations_increment_counter(engine, now):
    """Feed three distinct hashes for one triple and expect two rotations.

    The first hash is only a first sighting, so just the second and third
    calls are expected to publish an event and bump the counters.
    """
    publish = _Recorder()
    observations = [
        ("h1", now),
        ("h2", now + timedelta(minutes=5)),
        ("h3", now + timedelta(minutes=10)),
    ]

    with Session(engine) as session:
        _seed_attacker(session)
        for digest, seen_at in observations:
            record_fingerprint(
                session,
                attacker_ip="1.2.3.4",
                port=22,
                probe_type="hassh",
                new_hash=digest,
                ts=seen_at,
                publish_fn=publish,
            )

    # first call was first_sighting (no event)
    assert len(publish.calls) == 2

    with Session(engine) as session:
        attacker = session.exec(select(Attacker)).one()
        assert attacker.rotation_count == 2
        state = session.exec(select(AttackerFingerprintState)).one()
        assert state.rotation_count == 2
        assert state.last_hash == "h3"
|
||||
142
tests/prober/test_prober_rotation.py
Normal file
142
tests/prober/test_prober_rotation.py
Normal file
@@ -0,0 +1,142 @@
|
||||
"""Integration test: prober phase functions invoke the rotation recorder.
|
||||
|
||||
The prober worker constructs the recorder closure at startup; here we
|
||||
verify that ``_probe_cycle`` threads a recorder through to JARM / HASSH
|
||||
/ TCPFP phases and that the recorder gets the (ip, port, probe_type,
|
||||
hash) tuple it expects. The library itself is unit-tested separately.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from decnet.prober.worker import _probe_cycle
|
||||
|
||||
|
||||
@patch("decnet.prober.worker.fetch_leaf_cert", return_value=None)
@patch("decnet.prober.worker.tcp_fingerprint", return_value=None)
@patch("decnet.prober.worker.hassh_server", return_value=None)
@patch("decnet.prober.worker.jarm_hash")
def test_jarm_phase_calls_recorder(
    mock_jarm: MagicMock,
    _mock_hassh: MagicMock,
    _mock_tcpfp: MagicMock,
    _mock_cert: MagicMock,
    tmp_path: Path,
):
    """A JARM hash for port 443 reaches the recorder as one "jarm" tuple.

    Only the JARM port list is populated; the other probe helpers are
    patched to return nothing.
    """
    jarm_value = "c0c" * 10 + "a" * 32
    mock_jarm.return_value = jarm_value

    observed: list[tuple] = []

    def recorder(ip, port, ptype, h):
        # Capture exactly what the prober hands to the rotation recorder.
        observed.append((ip, port, ptype, h))

    _probe_cycle(
        {"10.0.0.5"},
        {},
        [443],
        [],
        [],
        tmp_path / "decnet.log",
        tmp_path / "decnet.json",
        timeout=1.0,
        publish_fn=None,
        record_rotation=recorder,
    )

    expected = [("10.0.0.5", 443, "jarm", jarm_value)]
    assert observed == expected
|
||||
|
||||
|
||||
@patch("decnet.prober.worker.fetch_leaf_cert", return_value=None)
@patch("decnet.prober.worker.tcp_fingerprint", return_value=None)
@patch("decnet.prober.worker.hassh_server")
@patch("decnet.prober.worker.jarm_hash", return_value="")
def test_hassh_phase_calls_recorder(
    _mock_jarm: MagicMock,
    mock_hassh: MagicMock,
    _mock_tcpfp: MagicMock,
    _mock_cert: MagicMock,
    tmp_path: Path,
):
    """A HASSH result for port 22 reaches the recorder as one "hassh" tuple.

    The recorder is expected to receive the ``hassh_server`` value of the
    mocked probe result as the hash.
    """
    filler = "x"
    ssh_result = {
        "hassh_server": "deadbeef",
        "banner": "SSH-2.0-OpenSSH_9.2",
        "kex_algorithms": filler,
        "encryption_s2c": filler,
        "mac_s2c": filler,
        "compression_s2c": filler,
    }
    mock_hassh.return_value = ssh_result

    observed: list[tuple] = []

    def recorder(ip, port, ptype, h):
        # Capture exactly what the prober hands to the rotation recorder.
        observed.append((ip, port, ptype, h))

    _probe_cycle(
        {"10.0.0.5"},
        {},
        [],
        [22],
        [],
        tmp_path / "decnet.log",
        tmp_path / "decnet.json",
        timeout=1.0,
        publish_fn=None,
        record_rotation=recorder,
    )

    expected = [("10.0.0.5", 22, "hassh", "deadbeef")]
    assert observed == expected
|
||||
|
||||
|
||||
@patch("decnet.prober.worker.fetch_leaf_cert", return_value=None)
@patch("decnet.prober.worker.tcp_fingerprint")
@patch("decnet.prober.worker.hassh_server", return_value=None)
@patch("decnet.prober.worker.jarm_hash", return_value="")
def test_tcpfp_phase_calls_recorder(
    _mock_jarm, _mock_hassh, mock_tcpfp, _mock_cert, tmp_path: Path,
):
    """A TCP fingerprint for port 22 reaches the recorder as one "tcpfp" tuple.

    The recorder is expected to receive the ``tcpfp_hash`` value of the
    mocked probe result as the hash.
    """
    tcp_result = {
        "tcpfp_hash": "tcpfp-hash-1",
        "tcpfp_raw": "raw",
        "ttl": 64,
        "window_size": 65535,
        "df_bit": True,
        "mss": 1460,
        "window_scale": 7,
        "sack_ok": True,
        "timestamp": True,
        "options_order": "MSS,SACK,TS,NOP,WS",
        "tos": 0,
        "dscp": 0,
        "ecn": 0,
        "server_isn": 0,
    }
    mock_tcpfp.return_value = tcp_result

    observed: list[tuple] = []

    def recorder(ip, port, ptype, h):
        # Capture exactly what the prober hands to the rotation recorder.
        observed.append((ip, port, ptype, h))

    _probe_cycle(
        {"10.0.0.5"},
        {},
        [],
        [],
        [22],
        tmp_path / "decnet.log",
        tmp_path / "decnet.json",
        timeout=1.0,
        publish_fn=None,
        record_rotation=recorder,
    )

    expected = [("10.0.0.5", 22, "tcpfp", "tcpfp-hash-1")]
    assert observed == expected
|
||||
|
||||
|
||||
@patch("decnet.prober.worker.fetch_leaf_cert", return_value=None)
@patch("decnet.prober.worker.tcp_fingerprint", return_value=None)
@patch("decnet.prober.worker.hassh_server", return_value=None)
@patch("decnet.prober.worker.jarm_hash")
def test_recorder_optional_no_crash_when_none(
    mock_jarm: MagicMock,
    _mock_hassh: MagicMock,
    _mock_tcpfp: MagicMock,
    _mock_cert: MagicMock,
    tmp_path: Path,
):
    """Passing record_rotation=None must leave the pre-DEBT-032 prober path working."""
    mock_jarm.return_value = "c0c" * 10 + "a" * 32
    log_path = tmp_path / "decnet.log"
    json_path = tmp_path / "decnet.json"

    # The test passes as long as this call returns without raising.
    _probe_cycle(
        {"10.0.0.5"},
        {},
        [443],
        [],
        [],
        log_path,
        json_path,
        timeout=1.0,
        publish_fn=None,
        record_rotation=None,
    )
|
||||
Reference in New Issue
Block a user