feat(prober,correlation): attacker fingerprint rotation detection (DEBT-032)

When the prober observes a NEW hash for an
(attacker_uuid, port, probe_type) triple it has seen before — VPS
rotation, SSH server rebuild, TLS cert swap — emit a derived
attacker.fingerprint_rotated event carrying both old and new hash.
Detection is a small library (decnet.correlation.fingerprint_rotation)
called inline from the prober at each of the three emit sites
(JARM/HASSH/TCPFP). No new daemon. New AttackerFingerprintState table
holds per-triple last-hash state; Attacker.rotation_count and
Attacker.last_rotation_at are stamped on every diff. Library is sync,
fully unit-tested via injected publish_fn / syslog_fn callbacks.
This commit is contained in:
2026-05-03 05:12:51 -04:00
parent dcd558fd91
commit 6c6f97e840
8 changed files with 687 additions and 18 deletions

View File

@@ -114,6 +114,14 @@ ATTACKER_SCORED = "scored"
# Distinct from ``observed`` which is the correlator's first-sight signal — # Distinct from ``observed`` which is the correlator's first-sight signal —
# a fingerprint is additional evidence about an already-observed attacker. # a fingerprint is additional evidence about an already-observed attacker.
ATTACKER_FINGERPRINTED = "fingerprinted" ATTACKER_FINGERPRINTED = "fingerprinted"
# Published when the prober observes a NEW hash for an
# (attacker, port, probe_type) triple it has seen before — e.g. the
# attacker rotated their VPS, rebuilt their SSH server, or swapped their
# TLS cert. Distinct from ``fingerprinted``, which fires on every probe
# result; ``fingerprint_rotated`` fires only on a hash diff and carries
# both old_hash and new_hash. Producer: prober (via the rotation library);
# intended consumers: dashboard, forensics, attribution clustering.
ATTACKER_FINGERPRINT_ROTATED = "fingerprint_rotated"
ATTACKER_SESSION_STARTED = "session.started" ATTACKER_SESSION_STARTED = "session.started"
ATTACKER_SESSION_ENDED = "session.ended" ATTACKER_SESSION_ENDED = "session.ended"
# Published by the ``decnet enrich`` worker after an enrichment pass # Published by the ``decnet enrich`` worker after an enrichment pass

View File

@@ -0,0 +1,153 @@
"""Attacker substrate-fingerprint rotation detection.
Called inline from the prober at each fingerprint emit site. Looks up
the last persisted hash for ``(attacker_uuid, port, probe_type)``;
when the new hash differs from the last one, emits a derived
``attacker.fingerprint_rotated`` event (bus + RFC 5424 syslog) and
stamps the ``Attacker`` row's rotation telemetry.
This is a pure library — no daemon, no async loop. The prober is the
only producer. We just teach it to derive a second event on hash
flip without standing up another worker (DEBT-032).
"""
from __future__ import annotations
import uuid as _uuid
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Callable, Literal
from sqlmodel import Session, select
from decnet.web.db.models import Attacker, AttackerFingerprintState
# Probe families accepted as the ``probe_type`` argument of
# ``record_fingerprint`` and stored on the per-triple state row.
ProbeType = Literal["jarm", "hassh", "tcpfp"]
# Branch taken by ``record_fingerprint``; reported back to the caller in
# ``RotationOutcome.kind``.
RotationKind = Literal[
    "no_attacker_row", # caller raced ahead of correlator; skip silently
    "first_sighting", # state row created, no prior hash
    "unchanged", # same hash as last sighting
    "rotated", # hash differs; event emitted, Attacker stamped
]
# Injected emit callbacks. Both are invoked as ``fn(event_type, payload)``
# and only on the "rotated" branch; their return value is ignored.
PublishFn = Callable[[str, dict[str, Any]], None]
SyslogFn = Callable[[str, dict[str, Any]], None]
@dataclass
class RotationOutcome:
    """Result record handed back by :func:`record_fingerprint`.

    Production callers generally discard it; it exists so tests and
    tracing can see which branch a call took.
    """

    # Which branch the call took (one of the ``RotationKind`` literals).
    kind: RotationKind
    # Hash stored before this call; ``None`` when there was no prior
    # state row (or no Attacker row at all).
    old_hash: str | None
    # Hash supplied by the caller for this observation.
    new_hash: str
    # Rotation counter of the per-triple state row as of this call;
    # 0 when no state row existed beforehand.
    rotation_count: int
# Event-type string handed as the first argument to ``publish_fn`` and
# ``syslog_fn`` when ``record_fingerprint`` detects a rotation.
_ROTATED_EVENT_TYPE = "attacker.fingerprint_rotated"
def record_fingerprint(
    session: Session,
    *,
    attacker_ip: str,
    port: int,
    probe_type: ProbeType,
    new_hash: str,
    ts: datetime,
    publish_fn: PublishFn | None = None,
    syslog_fn: SyslogFn | None = None,
) -> RotationOutcome:
    """Upsert the per-triple state row; on hash diff, emit a derived event.

    Resolves the attacker from ``attacker_ip`` via the Attacker table,
    then takes exactly one of four branches (reported in
    ``RotationOutcome.kind``):

    * ``"no_attacker_row"`` — no Attacker row matches the IP (the prober
      raced ahead of the correlator). Nothing is written or emitted; a
      later probe cycle can pick it up once the row exists.
    * ``"first_sighting"`` — no state row for
      ``(attacker_uuid, port, probe_type)``. One is inserted and
      committed; no event is emitted.
    * ``"unchanged"`` — stored hash equals ``new_hash``. Only
      ``last_seen`` is bumped and committed; no event is emitted.
    * ``"rotated"`` — stored hash differs. The state row and the
      Attacker row's rotation telemetry are updated, the callbacks are
      invoked, and the changes are committed.

    Failure semantics for the rotated branch: the callbacks run *before*
    the commit, so they are not part of the database transaction. If a
    callback or the commit raises, the pending row changes are rolled
    back and the exception is re-raised. Without the rollback, a caller
    that reuses the session would see the dirty ``last_hash`` on the
    next call and commit the rotation through the "unchanged" branch
    without an event ever being fully emitted. An event that was already
    published cannot be recalled, so a later call may detect and emit
    the same rotation again (at-least-once delivery).

    Args:
        session: Open sync SQLModel session; this function commits on it.
        attacker_ip: IP used to look up the Attacker row.
        port: Probed port; part of the state key.
        probe_type: Probe family; part of the state key.
        new_hash: Fingerprint hash just observed.
        ts: Observation timestamp; stored as ``last_seen`` and, on
            rotation, as ``Attacker.last_rotation_at``.
        publish_fn: Optional ``(event_type, payload)`` bus callback,
            called only on rotation.
        syslog_fn: Optional ``(event_type, payload)`` syslog callback,
            called only on rotation, after ``publish_fn``.

    Returns:
        A :class:`RotationOutcome` describing the branch taken.

    Raises:
        Whatever ``publish_fn``, ``syslog_fn`` or the session raise; on
        the rotated branch the session is rolled back first.
    """
    attacker = session.exec(
        select(Attacker).where(Attacker.ip == attacker_ip)
    ).first()
    if attacker is None:
        return RotationOutcome(
            kind="no_attacker_row",
            old_hash=None,
            new_hash=new_hash,
            rotation_count=0,
        )

    row = session.exec(
        select(AttackerFingerprintState).where(
            AttackerFingerprintState.attacker_uuid == attacker.uuid,
            AttackerFingerprintState.port == port,
            AttackerFingerprintState.probe_type == probe_type,
        )
    ).first()

    if row is None:
        session.add(AttackerFingerprintState(
            uuid=str(_uuid.uuid4()),
            attacker_uuid=attacker.uuid,
            port=port,
            probe_type=probe_type,
            last_hash=new_hash,
            last_seen=ts,
            rotation_count=0,
        ))
        session.commit()
        return RotationOutcome(
            kind="first_sighting",
            old_hash=None,
            new_hash=new_hash,
            rotation_count=0,
        )

    if row.last_hash == new_hash:
        # Read the values before commit so building the outcome does not
        # trigger a reload of the expired row.
        unchanged_hash = row.last_hash
        unchanged_count = row.rotation_count
        row.last_seen = ts
        session.add(row)
        session.commit()
        return RotationOutcome(
            kind="unchanged",
            old_hash=unchanged_hash,
            new_hash=new_hash,
            rotation_count=unchanged_count,
        )

    old_hash = row.last_hash
    row.last_hash = new_hash
    row.last_seen = ts
    row.rotation_count += 1
    rotation_count = row.rotation_count
    session.add(row)

    attacker.rotation_count += 1
    attacker.last_rotation_at = ts
    session.add(attacker)

    payload: dict[str, Any] = {
        "attacker_uuid": attacker.uuid,
        "attacker_ip": attacker_ip,
        "port": port,
        "probe_type": probe_type,
        "old_hash": old_hash,
        "new_hash": new_hash,
        "rotation_count": rotation_count,
        "ts": ts.isoformat(),
    }
    try:
        if publish_fn is not None:
            publish_fn(_ROTATED_EVENT_TYPE, payload)
        if syslog_fn is not None:
            syslog_fn(_ROTATED_EVENT_TYPE, payload)
        session.commit()
    except Exception:
        # Drop the pending state/Attacker mutations so a reused session
        # cannot persist a rotation whose emission did not complete.
        session.rollback()
        raise

    return RotationOutcome(
        kind="rotated",
        old_hash=old_hash,
        new_hash=new_hash,
        rotation_count=rotation_count,
    )

View File

@@ -27,6 +27,9 @@ from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from typing import Any, Callable from typing import Any, Callable
from sqlalchemy.engine import Engine
from sqlmodel import Session
from decnet.bus import topics as _topics from decnet.bus import topics as _topics
from decnet.bus.base import BaseBus from decnet.bus.base import BaseBus
from decnet.bus.factory import get_bus from decnet.bus.factory import get_bus
@@ -35,6 +38,10 @@ from decnet.bus.publish import (
run_control_listener, run_control_listener,
run_health_heartbeat, run_health_heartbeat,
) )
from decnet.correlation.fingerprint_rotation import (
ProbeType,
record_fingerprint,
)
from decnet.logging import get_logger from decnet.logging import get_logger
from decnet.prober.hassh import hassh_server from decnet.prober.hassh import hassh_server
from decnet.prober.jarm import JARM_EMPTY_HASH, jarm_hash from decnet.prober.jarm import JARM_EMPTY_HASH, jarm_hash
@@ -44,6 +51,21 @@ from decnet.telemetry import traced as _traced
logger = get_logger("prober") logger = get_logger("prober")
def _build_sync_engine() -> Engine:
    """Return a synchronous SQLite engine for rotation-detection state.

    The prober calls this directly instead of going through the async
    repository layer, because rotation detection is a sync hook on a
    sync probe path. The database path is taken from ``DECNET_DB_PATH``
    and falls back to ``decnet.db`` under ``decnet.config._ROOT``; per
    the original author this is meant to match the defaulting in
    ``decnet.web.db.sqlite.repository.SQLiteRepository``.
    """
    import os

    from decnet.config import _ROOT
    from decnet.web.db.sqlite.database import get_sync_engine

    fallback_path = str(_ROOT / "decnet.db")
    return get_sync_engine(os.environ.get("DECNET_DB_PATH", fallback_path))
# ─── Default ports per probe type ─────────────────────────────────────────── # ─── Default ports per probe type ───────────────────────────────────────────
# JARM: common C2 callback / TLS server ports # JARM: common C2 callback / TLS server ports
@@ -233,6 +255,14 @@ def _discover_attackers(json_path: Path, position: int) -> tuple[set[str], int]:
ProbePublishFn = Callable[[str, dict[str, Any]], None] ProbePublishFn = Callable[[str, dict[str, Any]], None]
# Rotation recorder: takes (attacker_ip, port, probe_type, new_hash) and
# performs the rotation-detection upsert + derived-event emission for the
# DEBT-032 substrate-fingerprint flow. Optional; when None the prober
# behaves exactly as before (raw fingerprint emit only, no rotation
# detection). Construction lives at worker startup so phase functions
# don't have to know about the DB engine.
RotationRecorderFn = Callable[[str, int, "ProbeType", str], None]
@_traced("prober.probe_cycle") @_traced("prober.probe_cycle")
def _probe_cycle( def _probe_cycle(
@@ -245,6 +275,7 @@ def _probe_cycle(
json_path: Path, json_path: Path,
timeout: float = 5.0, timeout: float = 5.0,
publish_fn: ProbePublishFn | None = None, publish_fn: ProbePublishFn | None = None,
record_rotation: RotationRecorderFn | None = None,
) -> None: ) -> None:
""" """
Probe all known attacker IPs with JARM, HASSH, and TCP/IP fingerprinting. Probe all known attacker IPs with JARM, HASSH, and TCP/IP fingerprinting.
@@ -263,13 +294,13 @@ def _probe_cycle(
ip_probed = probed.setdefault(ip, {}) ip_probed = probed.setdefault(ip, {})
# Phase 1: JARM (TLS fingerprinting) # Phase 1: JARM (TLS fingerprinting)
_jarm_phase(ip, ip_probed, jarm_ports, log_path, json_path, timeout, publish_fn) _jarm_phase(ip, ip_probed, jarm_ports, log_path, json_path, timeout, publish_fn, record_rotation)
# Phase 2: HASSHServer (SSH fingerprinting) # Phase 2: HASSHServer (SSH fingerprinting)
_hassh_phase(ip, ip_probed, ssh_ports, log_path, json_path, timeout, publish_fn) _hassh_phase(ip, ip_probed, ssh_ports, log_path, json_path, timeout, publish_fn, record_rotation)
# Phase 3: TCP/IP stack fingerprinting # Phase 3: TCP/IP stack fingerprinting
_tcpfp_phase(ip, ip_probed, tcpfp_ports, log_path, json_path, timeout, publish_fn) _tcpfp_phase(ip, ip_probed, tcpfp_ports, log_path, json_path, timeout, publish_fn, record_rotation)
@_traced("prober.jarm_phase") @_traced("prober.jarm_phase")
@@ -281,6 +312,7 @@ def _jarm_phase(
json_path: Path, json_path: Path,
timeout: float, timeout: float,
publish_fn: ProbePublishFn | None = None, publish_fn: ProbePublishFn | None = None,
record_rotation: RotationRecorderFn | None = None,
) -> None: ) -> None:
"""JARM-fingerprint an IP on the given TLS ports.""" """JARM-fingerprint an IP on the given TLS ports."""
done = ip_probed.setdefault("jarm", set()) done = ip_probed.setdefault("jarm", set())
@@ -301,6 +333,8 @@ def _jarm_phase(
msg=f"JARM {ip}:{port} = {h}", msg=f"JARM {ip}:{port} = {h}",
) )
logger.info("prober: JARM %s:%d = %s", ip, port, h) logger.info("prober: JARM %s:%d = %s", ip, port, h)
if record_rotation is not None:
record_rotation(ip, port, "jarm", h)
if publish_fn is not None: if publish_fn is not None:
publish_fn( publish_fn(
"jarm", "jarm",
@@ -387,6 +421,7 @@ def _hassh_phase(
json_path: Path, json_path: Path,
timeout: float, timeout: float,
publish_fn: ProbePublishFn | None = None, publish_fn: ProbePublishFn | None = None,
record_rotation: RotationRecorderFn | None = None,
) -> None: ) -> None:
"""HASSHServer-fingerprint an IP on the given SSH ports.""" """HASSHServer-fingerprint an IP on the given SSH ports."""
done = ip_probed.setdefault("hassh", set()) done = ip_probed.setdefault("hassh", set())
@@ -412,6 +447,8 @@ def _hassh_phase(
msg=f"HASSH {ip}:{port} = {result['hassh_server']}", msg=f"HASSH {ip}:{port} = {result['hassh_server']}",
) )
logger.info("prober: HASSH %s:%d = %s", ip, port, result["hassh_server"]) logger.info("prober: HASSH %s:%d = %s", ip, port, result["hassh_server"])
if record_rotation is not None:
record_rotation(ip, port, "hassh", result["hassh_server"])
if publish_fn is not None: if publish_fn is not None:
publish_fn( publish_fn(
"hassh", "hassh",
@@ -445,6 +482,7 @@ def _tcpfp_phase(
json_path: Path, json_path: Path,
timeout: float, timeout: float,
publish_fn: ProbePublishFn | None = None, publish_fn: ProbePublishFn | None = None,
record_rotation: RotationRecorderFn | None = None,
) -> None: ) -> None:
"""TCP/IP stack fingerprint an IP on the given ports.""" """TCP/IP stack fingerprint an IP on the given ports."""
done = ip_probed.setdefault("tcpfp", set()) done = ip_probed.setdefault("tcpfp", set())
@@ -478,6 +516,8 @@ def _tcpfp_phase(
msg=f"TCPFP {ip}:{port} = {result['tcpfp_hash']}", msg=f"TCPFP {ip}:{port} = {result['tcpfp_hash']}",
) )
logger.info("prober: TCPFP %s:%d = %s", ip, port, result["tcpfp_hash"]) logger.info("prober: TCPFP %s:%d = %s", ip, port, result["tcpfp_hash"])
if record_rotation is not None:
record_rotation(ip, port, "tcpfp", result["tcpfp_hash"])
if publish_fn is not None: if publish_fn is not None:
publish_fn( publish_fn(
"tcpfp", "tcpfp",
@@ -586,6 +626,61 @@ async def prober_worker(
event_type, event_type,
) )
# Substrate-rotation detection (DEBT-032) — open a sync engine for
# the prober's lifetime; recorder closes a session per call so we
# never hold a connection across phase boundaries. Failure to
# connect is non-fatal: probes continue, rotation detection is
# silently disabled.
rotation_engine: Engine | None = None
record_rotation: RotationRecorderFn | None = None
try:
rotation_engine = _build_sync_engine()
except Exception as exc: # noqa: BLE001
logger.warning(
"prober: rotation-detection DB unavailable, "
"running with rotation detection disabled: %s", exc,
)
if rotation_engine is not None:
def _publish_rotation(event_type: str, payload: dict[str, Any]) -> None:
    """Publish a rotation payload on the ``fingerprint_rotated`` attacker topic."""
    rotation_topic = _topics.attacker(_topics.ATTACKER_FINGERPRINT_ROTATED)
    raw_publish(rotation_topic, payload, event_type)
def _syslog_rotation(event_type: str, payload: dict[str, Any]) -> None:
    """Write a ``fingerprint_rotated`` record via ``_write_event``.

    ``event_type`` is accepted to satisfy the syslog callback signature
    but is not used; the record name is the literal
    ``"fingerprint_rotated"``. The structured fields carry the old and
    new hash separately; ``msg`` is the human-readable summary.
    """
    _write_event(
        log_path, json_path,
        "fingerprint_rotated",
        target_ip=payload["attacker_ip"],
        target_port=str(payload["port"]),
        probe_type=payload["probe_type"],
        old_hash=payload.get("old_hash") or "",
        new_hash=payload["new_hash"],
        rotation_count=str(payload["rotation_count"]),
        msg=(
            f"FP rotation {payload['attacker_ip']}:{payload['port']} "
            # Separator added: the two hashes were previously emitted
            # back-to-back as one unreadable string.
            f"{payload['probe_type']} {payload.get('old_hash')} -> "
            f"{payload['new_hash']}"
        ),
    )
def record_rotation(
    ip: str, port: int, probe_type: ProbeType, new_hash: str,
) -> None:
    """Run rotation detection for one probe result, best-effort.

    Opens a short-lived session on ``rotation_engine`` per call, so no
    connection is held between probes, and delegates to
    ``record_fingerprint`` with the current UTC time.

    Rotation detection is an add-on to the raw fingerprint emit. Any
    error from the database or the emit callbacks is logged and
    swallowed here so it cannot abort the calling probe phase.
    """
    try:
        with Session(rotation_engine) as session:
            record_fingerprint(
                session,
                attacker_ip=ip,
                port=port,
                probe_type=probe_type,
                new_hash=new_hash,
                ts=datetime.now(timezone.utc),
                publish_fn=_publish_rotation,
                syslog_fn=_syslog_rotation,
            )
    except Exception as exc:  # noqa: BLE001
        logger.warning(
            "prober: rotation detection failed for %s:%d (%s): %s",
            ip, port, probe_type, exc,
        )
shutdown = asyncio.Event() shutdown = asyncio.Event()
heartbeat_task = asyncio.create_task(run_health_heartbeat(bus, "prober")) heartbeat_task = asyncio.create_task(run_health_heartbeat(bus, "prober"))
control_task = asyncio.create_task( control_task = asyncio.create_task(
@@ -612,6 +707,7 @@ async def prober_worker(
jarm_ports, hassh_ports, tcp_ports, jarm_ports, hassh_ports, tcp_ports,
log_path, json_path, timeout, log_path, json_path, timeout,
_publish_attacker, _publish_attacker,
record_rotation,
) )
try: try:
@@ -626,3 +722,6 @@ async def prober_worker(
if bus is not None: if bus is not None:
with contextlib.suppress(Exception): with contextlib.suppress(Exception):
await bus.close() await bus.close()
if rotation_engine is not None:
with contextlib.suppress(Exception):
rotation_engine.dispose()

View File

@@ -45,6 +45,7 @@ from .auth import (
from .attackers import ( from .attackers import (
Attacker, Attacker,
AttackerBehavior, AttackerBehavior,
AttackerFingerprintState,
AttackerIdentity, AttackerIdentity,
AttackersResponse, AttackersResponse,
SessionProfile, SessionProfile,
@@ -242,6 +243,7 @@ __all__ = [
# attackers # attackers
"Attacker", "Attacker",
"AttackerBehavior", "AttackerBehavior",
"AttackerFingerprintState",
"AttackerIdentity", "AttackerIdentity",
"AttackerIntel", "AttackerIntel",
"AttackersResponse", "AttackersResponse",

View File

@@ -93,11 +93,48 @@ class Attacker(SQLModel, table=True):
# private/loopback addresses never resolve. 256 chars matches # private/loopback addresses never resolve. 256 chars matches
# RFC 1035 max hostname length. # RFC 1035 max hostname length.
ptr_record: Optional[str] = Field(default=None, max_length=256) ptr_record: Optional[str] = Field(default=None, max_length=256)
# Substrate-rotation telemetry, maintained by
# ``decnet.correlation.fingerprint_rotation.record_fingerprint`` whenever
# the prober observes a new hash for an (attacker, port, probe_type)
# triple it has seen before. Lets the dashboard render "rotated 3×
# last 24h" without joining to AttackerFingerprintState.
rotation_count: int = Field(default=0)
last_rotation_at: Optional[datetime] = Field(default=None, index=True)
updated_at: datetime = Field( updated_at: datetime = Field(
default_factory=lambda: datetime.now(timezone.utc), index=True default_factory=lambda: datetime.now(timezone.utc), index=True
) )
class AttackerFingerprintState(SQLModel, table=True):
    """Latest observed fingerprint hash per (attacker, port, probe_type).

    This table is the persistence behind rotation detection: it lets the
    prober remember, across runs, the last HASSH/JARM/TCP hash it saw
    for a given attacker port, so that a differing hash on a later probe
    can be recognised as a rotation (new VPS, rebuilt SSH server,
    swapped TLS cert). The comparison itself lives in
    ``decnet.correlation.fingerprint_rotation``, which the prober calls
    inline at each emit site.

    The unique constraint allows at most one row per natural key, so the
    row count is bounded by attackers × probe families × ports.
    """

    __tablename__ = "attacker_fingerprint_state"
    __table_args__ = (
        UniqueConstraint(
            "attacker_uuid", "port", "probe_type",
            name="uq_attacker_fingerprint_state_natural",
        ),
    )

    # Surrogate primary key.
    uuid: str = Field(primary_key=True)
    # Natural key: owning attacker, probed port, probe family.
    attacker_uuid: str = Field(foreign_key="attackers.uuid", index=True)
    port: int
    probe_type: str = Field(max_length=16)  # "jarm" | "hassh" | "tcpfp"
    # Most recent hash and when it was last observed.
    last_hash: str = Field(max_length=128)
    last_seen: datetime = Field(index=True)
    # Number of hash flips recorded for this triple.
    rotation_count: int = Field(default=0)
class AttackerIdentity(SQLModel, table=True): class AttackerIdentity(SQLModel, table=True):
""" """
Resolved actor identity — the dedup'd "same hands" row that one or Resolved actor identity — the dedup'd "same hands" row that one or

View File

@@ -470,22 +470,14 @@ The threat-intel enrichment surface (DEBT-N/A: `feat(intel)` series) keys every
**Status:** Open. No operational impact today (single-IP attackers are the dominant case), but worth closing before the federation layer lands so the wire-format and API both speak in identity terms, not IP terms. **Status:** Open. No operational impact today (single-IP attackers are the dominant case), but worth closing before the federation layer lands so the wire-format and API both speak in identity terms, not IP terms.
### DEBT-032 — Prober can't detect fingerprint rotation without mutation ### ~~DEBT-032 — Attacker fingerprint rotation detection~~ ✅ RESOLVED
**Files:** `decnet/prober/worker.py` (~lines 235, 286, 334, 392), `decnet/web/db/models.py` (new `decky_service_fingerprints` table). **Files:** `decnet/correlation/fingerprint_rotation.py` (new), `decnet/prober/worker.py`, `decnet/web/db/models/attackers.py`, `decnet/bus/topics.py`.
Substrate identity is `(service_name, implementation_fingerprint)`, not service name alone. A base-image rebuild that rotates OpenSSH 8.4 → 9.2 — or any recompose that changes JARM / HASSH / TCP fingerprint without changing the service list — is a substrate transition from the attacker's recon POV, and today the correlation graph sees none of it. Resolved 2026-05-03. **Reframed during planning:** the original entry described this as a per-decky substrate-integrity problem, but the prober probes *attackers*, not deckies. The actual gap was attacker substrate tracking — same attacker IP rotating their VPS, rebuilding their SSH server, swapping their TLS cert — invisible at correlator-time because nothing diffed consecutive hashes for the same `(attacker_ip, port, probe_type)` triple.
The prober already computes JARM (`worker.py:286`), HASSH (`worker.py:334`), and TCP fingerprint (`worker.py:392`), and emits each as RFC 5424 syslog + optional bus publish. What's missing is **per-(decky, service, probe_type) persistence** to diff against: the current dedup set `probed: dict[IP → {probe_type → set(ports)}]` (`worker.py:235`) is in-memory and scoped to one run, so any restart loses history and any same-IP probe on a changed substrate can't be detected as a change. Implemented as a small library (`decnet.correlation.fingerprint_rotation.record_fingerprint`) called inline from the prober at each of the three emit sites (JARM / HASSH / TCPFP). No new worker daemon; the prober is still the only producer, just teaches it to derive a second event on hash flip. New `AttackerFingerprintState` table holds per-`(attacker_uuid, port, probe_type)` last-hash state. New bus topic `attacker.fingerprint_rotated` carries `{attacker_uuid, attacker_ip, port, probe_type, old_hash, new_hash, rotation_count, ts}`. `Attacker.rotation_count` and `Attacker.last_rotation_at` are stamped on every diff so the dashboard can render rotation telemetry without joining. Library is fully sync + unit-tested with injected publish_fn / syslog_fn callbacks.
**Design:** Out of scope (deferred): dashboard surfacing of `rotation_count`; attribution clustering across attackers (same JARM seen from different IPs); backfill from existing event store.
1. New SQLModel table `decky_service_fingerprints` keyed by `(decky_name, service, probe_type)` with `last_hash, last_seen_at, sample_count`. One upsert per probe; bounded by fleet × probe families.
2. Prober reads `last_hash` before emitting; on diff, emits a new `substrate_fingerprint_changed` event (RFC 5424 syslog + `decky.{id}.fingerprint` bus topic) with `{decky, service, probe_type, old_hash, new_hash}`. On match, upsert the timestamp and skip the event.
3. Correlator consumes the new event kind into a parallel per-decky index (mirroring the mutation index landed in this session) and interleaves `🔍 decky-03 hassh drift` markers in `AttackerTraversal.fingerprints_during`.
4. Divergence detector: compare `substrate_state(t)` fold (mutations) vs `observed_identity(t)` fold (fingerprints) per decky. A fingerprint change without a preceding mutation ⇒ `substrate_divergence` finding — container drift, compromised base image, rootkit banner rewrite, or prober lag. Falls out of the data model for free once both streams exist.
**Prerequisite satisfied:** mutation event stream + correlator mutation-kind parser landed alongside this DEBT entry (commits `f875350`, `fa0cdb3`, `bf5ed7a`, `d4d8a2a` on `dev`). The fingerprint stream plugs into the same substrate: same RFC 5424 emission pattern, sibling per-decky engine index, same timeline interleaving.
**Status:** Open — deferred to its own commit sequence. The dedup state in `worker.py:235` is the only thing standing between "JARM hash computed" and "substrate rotation detected."
--- ---
@@ -713,7 +705,7 @@ user who needs it.
| DEBT-029 | 🟡 Medium | Architecture / Bus | ✅ resolved | | DEBT-029 | 🟡 Medium | Architecture / Bus | ✅ resolved |
| DEBT-030 | 🟡 Medium | Web / Live mutations | ✅ resolved (Phase A) | | DEBT-030 | 🟡 Medium | Web / Live mutations | ✅ resolved (Phase A) |
| ~~DEBT-031~~ | ✅ | Workers / Bus integration | resolved | | ~~DEBT-031~~ | ✅ | Workers / Bus integration | resolved |
| DEBT-032 | 🟡 Medium | Correlation / Prober | open | | ~~DEBT-032~~ | ✅ | Correlation / Prober | resolved 2026-05-03 |
| DEBT-033 | 🟡 Medium | Storage / Session recording | open | | DEBT-033 | 🟡 Medium | Storage / Session recording | open |
| ~~DEBT-035~~ | ✅ | Artifacts / Filesystem perms | resolved 2026-05-02 | | ~~DEBT-035~~ | ✅ | Artifacts / Filesystem perms | resolved 2026-05-02 |
| DEBT-036 | 🟡 Medium | Correlation / Keystroke dynamics | open | | DEBT-036 | 🟡 Medium | Correlation / Keystroke dynamics | open |
@@ -731,5 +723,5 @@ user who needs it.
| DEBT-048 | 🟡 Medium | TTP / Intel provider mapping review (recurring) | open / recurring | | DEBT-048 | 🟡 Medium | TTP / Intel provider mapping review (recurring) | open / recurring |
| DEBT-049 | 🟡 Medium | TTP / Sigma adapter (post-v1) | open | | DEBT-049 | 🟡 Medium | TTP / Sigma adapter (post-v1) | open |
**Remaining open:** DEBT-011 (Alembic), DEBT-027 (Dynamic bait store), DEBT-028 (deploy endpoint tests), DEBT-032 (fingerprint rotation detection), DEBT-033 (transcript shard rotation), DEBT-036 (session-profile ingester), DEBT-037 (webhook delivery hardening), DEBT-038 (SSH PAM cred-capture limitations — document-only), DEBT-042 (orchestrator failure-count window), DEBT-043 (frontend test framework), DEBT-045 (EmailLifter heavyweight — partial paid; carved-out follow-ups remain), DEBT-046 (mal-hash feed), DEBT-048 (TTP intel provider mapping review — recurring quarterly), DEBT-049 (TTP Sigma adapter — post-v1). **Remaining open:** DEBT-011 (Alembic), DEBT-027 (Dynamic bait store), DEBT-028 (deploy endpoint tests), DEBT-033 (transcript shard rotation), DEBT-036 (session-profile ingester), DEBT-037 (webhook delivery hardening), DEBT-038 (SSH PAM cred-capture limitations — document-only), DEBT-042 (orchestrator failure-count window), DEBT-043 (frontend test framework), DEBT-045 (EmailLifter heavyweight — partial paid; carved-out follow-ups remain), DEBT-046 (mal-hash feed), DEBT-048 (TTP intel provider mapping review — recurring quarterly), DEBT-049 (TTP Sigma adapter — post-v1).
**Estimated remaining effort:** ~21 hours plus the new EmailLifter / TTP follow-ups. DEBT-030 Phase B (optimistic staged-buffer editor) is a follow-up, not debt. **Estimated remaining effort:** ~21 hours plus the new EmailLifter / TTP follow-ups. DEBT-030 Phase B (optimistic staged-buffer editor) is a follow-up, not debt.

View File

@@ -0,0 +1,236 @@
"""Unit tests for ``decnet.correlation.fingerprint_rotation``.
Pure library: in-memory SQLite + sync Session + collected callback
calls. No prober, no bus, no async. Each test seeds an Attacker row,
calls ``record_fingerprint``, asserts on the returned outcome + the
side-effects (state row, Attacker stamp, callback invocations).
"""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
import pytest
from sqlalchemy.engine import Engine
from sqlmodel import Session, SQLModel, create_engine, select
from decnet.correlation.fingerprint_rotation import (
record_fingerprint,
RotationOutcome,
)
from decnet.web.db.models import (
Attacker,
AttackerFingerprintState,
)
@pytest.fixture
def engine() -> Engine:
    """Provide a fresh in-memory SQLite engine with all SQLModel tables created."""
    memory_engine = create_engine(
        "sqlite://",
        connect_args={"check_same_thread": False},
    )
    SQLModel.metadata.create_all(memory_engine)
    return memory_engine
@pytest.fixture
def now() -> datetime:
    """Provide a fixed, timezone-aware UTC timestamp (2026-05-03 12:00:00)."""
    return datetime(year=2026, month=5, day=3, hour=12, tzinfo=timezone.utc)
def _seed_attacker(session: Session, ip: str = "1.2.3.4") -> Attacker:
    """Insert, commit and return an Attacker row with a fixed uuid for ``ip``."""
    seeded = Attacker(
        uuid="attacker-uuid-1",
        ip=ip,
        first_seen=datetime.now(timezone.utc),
        last_seen=datetime.now(timezone.utc),
    )
    session.add(seeded)
    session.commit()
    # Reload so the returned instance has its attributes populated after commit.
    session.refresh(seeded)
    return seeded
class _Recorder:
"""Capture (event_type, payload) tuples from publish_fn / syslog_fn."""
def __init__(self) -> None:
self.calls: list[tuple[str, dict]] = []
def __call__(self, event_type: str, payload: dict) -> None:
self.calls.append((event_type, payload))
def test_no_attacker_row_returns_noop(engine, now):
    """With no Attacker row for the IP, nothing is emitted or persisted."""
    bus_calls = _Recorder()
    log_calls = _Recorder()
    with Session(engine) as db:
        result = record_fingerprint(
            db,
            attacker_ip="9.9.9.9",
            port=22,
            probe_type="hassh",
            new_hash="abc",
            ts=now,
            publish_fn=bus_calls,
            syslog_fn=log_calls,
        )

    assert result.kind == "no_attacker_row"
    assert bus_calls.calls == []
    assert log_calls.calls == []

    with Session(engine) as db:
        state_rows = db.exec(select(AttackerFingerprintState)).all()
        assert state_rows == []
def test_first_sighting_creates_state_row_no_event(engine, now):
    """The first hash for a triple creates the state row and emits no event."""
    bus_calls = _Recorder()
    log_calls = _Recorder()
    with Session(engine) as db:
        _seed_attacker(db)
        result = record_fingerprint(
            db,
            attacker_ip="1.2.3.4",
            port=22,
            probe_type="hassh",
            new_hash="hash-1",
            ts=now,
            publish_fn=bus_calls,
            syslog_fn=log_calls,
        )

    assert result.kind == "first_sighting"
    assert result.old_hash is None
    assert result.new_hash == "hash-1"
    assert result.rotation_count == 0
    assert bus_calls.calls == []
    assert log_calls.calls == []

    with Session(engine) as db:
        state_rows = db.exec(select(AttackerFingerprintState)).all()
        assert len(state_rows) == 1
        only_row = state_rows[0]
        assert only_row.last_hash == "hash-1"
        assert only_row.rotation_count == 0
        # The Attacker row must not be stamped on a first sighting.
        attacker = db.exec(select(Attacker)).one()
        assert attacker.rotation_count == 0
        assert attacker.last_rotation_at is None
def test_unchanged_hash_bumps_last_seen_no_event(engine, now):
    """Re-observing the same hash only advances ``last_seen``; no event fires."""
    bus_calls = _Recorder()
    log_calls = _Recorder()
    later = now + timedelta(minutes=10)
    triple = {"attacker_ip": "1.2.3.4", "port": 22, "probe_type": "hassh"}

    with Session(engine) as db:
        _seed_attacker(db)
        record_fingerprint(db, new_hash="hash-1", ts=now, **triple)
        result = record_fingerprint(
            db,
            new_hash="hash-1",
            ts=later,
            publish_fn=bus_calls,
            syslog_fn=log_calls,
            **triple,
        )

    assert result.kind == "unchanged"
    assert bus_calls.calls == []
    assert log_calls.calls == []

    with Session(engine) as db:
        state = db.exec(select(AttackerFingerprintState)).one()
        # SQLite strips tzinfo on round-trip; re-attach UTC before comparing.
        assert state.last_seen.replace(tzinfo=timezone.utc) == later
        assert state.rotation_count == 0
def test_rotated_emits_event_and_stamps_attacker(engine, now):
    """A differing hash emits one event per callback and stamps both rows."""
    bus_calls = _Recorder()
    log_calls = _Recorder()
    later = now + timedelta(hours=1)
    triple = {"attacker_ip": "1.2.3.4", "port": 22, "probe_type": "hassh"}

    with Session(engine) as db:
        _seed_attacker(db)
        record_fingerprint(db, new_hash="hash-1", ts=now, **triple)
        result = record_fingerprint(
            db,
            new_hash="hash-2",
            ts=later,
            publish_fn=bus_calls,
            syslog_fn=log_calls,
            **triple,
        )

    assert result.kind == "rotated"
    assert result.old_hash == "hash-1"
    assert result.new_hash == "hash-2"
    assert result.rotation_count == 1
    assert len(bus_calls.calls) == 1
    assert len(log_calls.calls) == 1

    event_type, payload = bus_calls.calls[0]
    assert event_type == "attacker.fingerprint_rotated"
    expected_fields = {
        "attacker_uuid": "attacker-uuid-1",
        "attacker_ip": "1.2.3.4",
        "port": 22,
        "probe_type": "hassh",
        "old_hash": "hash-1",
        "new_hash": "hash-2",
        "rotation_count": 1,
        "ts": later.isoformat(),
    }
    for field, wanted in expected_fields.items():
        assert payload[field] == wanted

    with Session(engine) as db:
        attacker = db.exec(select(Attacker)).one()
        assert attacker.rotation_count == 1
        assert attacker.last_rotation_at is not None
        assert attacker.last_rotation_at.replace(tzinfo=timezone.utc) == later
        state = db.exec(select(AttackerFingerprintState)).one()
        assert state.last_hash == "hash-2"
        assert state.rotation_count == 1
def test_three_probe_types_independent(engine, now):
    """Each probe family on the same port gets its own state row."""
    probe_families = ("jarm", "hassh", "tcpfp")
    with Session(engine) as db:
        _seed_attacker(db)
        for family in probe_families:
            record_fingerprint(
                db,
                attacker_ip="1.2.3.4",
                port=22,
                probe_type=family,
                new_hash=f"{family}-1",
                ts=now,
            )

    with Session(engine) as db:
        state_rows = db.exec(select(AttackerFingerprintState)).all()
        seen_types = {state.probe_type for state in state_rows}
        seen_hashes = {state.last_hash for state in state_rows}
        assert seen_types == {"jarm", "hassh", "tcpfp"}
        assert seen_hashes == {"jarm-1", "hassh-1", "tcpfp-1"}
def test_two_ports_same_probe_type_independent(engine, now):
    """The same probe family on two ports yields two separate state rows."""
    with Session(engine) as db:
        _seed_attacker(db)
        for ssh_port in (22, 2222):
            record_fingerprint(
                db,
                attacker_ip="1.2.3.4",
                port=ssh_port,
                probe_type="hassh",
                new_hash=f"hash-{ssh_port}",
                ts=now,
            )

    with Session(engine) as db:
        state_rows = db.exec(select(AttackerFingerprintState)).all()
        seen_ports = {state.port for state in state_rows}
        assert seen_ports == {22, 2222}
def test_multiple_rotations_increment_counter(engine, now):
    """Feed three different hashes for one triple and check the counters.

    The hashes ``h1``, ``h2``, ``h3`` are recorded five minutes apart for
    1.2.3.4:22/hassh. Two publish calls are expected, and both the attacker
    row and the fingerprint-state row must report a rotation count of 2,
    with ``h3`` as the last stored hash.
    """
    events = _Recorder()

    with Session(engine) as session:
        _seed_attacker(session)
        # Successive sightings at now, now+5min and now+10min.
        for step, digest in enumerate(("h1", "h2", "h3")):
            record_fingerprint(
                session,
                attacker_ip="1.2.3.4",
                port=22,
                probe_type="hassh",
                new_hash=digest,
                ts=now + timedelta(minutes=5 * step),
                publish_fn=events,
            )

    assert len(events.calls) == 2  # first call was first_sighting (no event)

    with Session(engine) as session:
        attacker = session.exec(select(Attacker)).one()
        assert attacker.rotation_count == 2
        state = session.exec(select(AttackerFingerprintState)).one()
        assert state.rotation_count == 2
        assert state.last_hash == "h3"

View File

@@ -0,0 +1,142 @@
"""Integration test: prober phase functions invoke the rotation recorder.
The prober worker constructs the recorder closure at startup; here we
verify that ``_probe_cycle`` threads a recorder through to JARM / HASSH
/ TCPFP phases and that the recorder gets the (ip, port, probe_type,
hash) tuple it expects. The library itself is unit-tested separately.
"""
from __future__ import annotations
from pathlib import Path
from unittest.mock import MagicMock, patch
from decnet.prober.worker import _probe_cycle
@patch("decnet.prober.worker.fetch_leaf_cert", return_value=None)
@patch("decnet.prober.worker.tcp_fingerprint", return_value=None)
@patch("decnet.prober.worker.hassh_server", return_value=None)
@patch("decnet.prober.worker.jarm_hash")
def test_jarm_phase_calls_recorder(
    mock_jarm: MagicMock,
    _mock_hassh: MagicMock,
    _mock_tcpfp: MagicMock,
    _mock_cert: MagicMock,
    tmp_path: Path,
):
    """Run a JARM-only probe cycle and check the recorder receives the hash.

    ``jarm_hash`` is patched to return a fixed value; the other patched
    helpers return ``None``. The recorder must be called exactly once with
    ``(ip, port, "jarm", hash)``.
    """
    fake_jarm = "c0c" * 10 + "a" * 32
    mock_jarm.return_value = fake_jarm

    seen: list[tuple] = []

    def recorder(ip, port, ptype, h):
        # Capture every rotation-recorder invocation for the final assertion.
        seen.append((ip, port, ptype, h))

    _probe_cycle(
        {"10.0.0.5"},
        {},
        [443],
        [],
        [],
        tmp_path / "decnet.log",
        tmp_path / "decnet.json",
        timeout=1.0,
        publish_fn=None,
        record_rotation=recorder,
    )

    assert seen == [("10.0.0.5", 443, "jarm", fake_jarm)]
@patch("decnet.prober.worker.fetch_leaf_cert", return_value=None)
@patch("decnet.prober.worker.tcp_fingerprint", return_value=None)
@patch("decnet.prober.worker.hassh_server")
@patch("decnet.prober.worker.jarm_hash", return_value="")
def test_hassh_phase_calls_recorder(
    _mock_jarm: MagicMock,
    mock_hassh: MagicMock,
    _mock_tcpfp: MagicMock,
    _mock_cert: MagicMock,
    tmp_path: Path,
):
    """Run a HASSH-only probe cycle and check the recorder receives the hash.

    ``hassh_server`` is patched to return a canned result dict whose
    ``hassh_server`` value is ``"deadbeef"``. The recorder must be called
    exactly once with ``(ip, port, "hassh", "deadbeef")``.
    """
    # The algorithm fields all carry the same placeholder value.
    placeholder_fields = dict.fromkeys(
        ("kex_algorithms", "encryption_s2c", "mac_s2c", "compression_s2c"),
        "x",
    )
    mock_hassh.return_value = {
        "hassh_server": "deadbeef",
        "banner": "SSH-2.0-OpenSSH_9.2",
        **placeholder_fields,
    }

    seen: list[tuple] = []

    def recorder(ip, port, ptype, h):
        # Capture every rotation-recorder invocation for the final assertion.
        seen.append((ip, port, ptype, h))

    _probe_cycle(
        {"10.0.0.5"},
        {},
        [],
        [22],
        [],
        tmp_path / "decnet.log",
        tmp_path / "decnet.json",
        timeout=1.0,
        publish_fn=None,
        record_rotation=recorder,
    )

    assert seen == [("10.0.0.5", 22, "hassh", "deadbeef")]
@patch("decnet.prober.worker.fetch_leaf_cert", return_value=None)
@patch("decnet.prober.worker.tcp_fingerprint")
@patch("decnet.prober.worker.hassh_server", return_value=None)
@patch("decnet.prober.worker.jarm_hash", return_value="")
def test_tcpfp_phase_calls_recorder(
    _mock_jarm, _mock_hassh, mock_tcpfp, _mock_cert, tmp_path: Path,
):
    """Run a TCPFP-only probe cycle and check the recorder receives the hash.

    ``tcp_fingerprint`` is patched to return a canned result dict whose
    ``tcpfp_hash`` is ``"tcpfp-hash-1"``. The recorder must be called
    exactly once with ``(ip, port, "tcpfp", "tcpfp-hash-1")``.
    """
    canned_fingerprint = {
        "tcpfp_hash": "tcpfp-hash-1",
        "tcpfp_raw": "raw",
        "ttl": 64,
        "window_size": 65535,
        "df_bit": True,
        "mss": 1460,
        "window_scale": 7,
        "sack_ok": True,
        "timestamp": True,
        "options_order": "MSS,SACK,TS,NOP,WS",
        "tos": 0,
        "dscp": 0,
        "ecn": 0,
        "server_isn": 0,
    }
    mock_tcpfp.return_value = canned_fingerprint

    seen: list[tuple] = []

    def recorder(ip, port, ptype, h):
        # Capture every rotation-recorder invocation for the final assertion.
        seen.append((ip, port, ptype, h))

    _probe_cycle(
        {"10.0.0.5"},
        {},
        [],
        [],
        [22],
        tmp_path / "decnet.log",
        tmp_path / "decnet.json",
        timeout=1.0,
        publish_fn=None,
        record_rotation=recorder,
    )

    assert seen == [("10.0.0.5", 22, "tcpfp", "tcpfp-hash-1")]
@patch("decnet.prober.worker.fetch_leaf_cert", return_value=None)
@patch("decnet.prober.worker.tcp_fingerprint", return_value=None)
@patch("decnet.prober.worker.hassh_server", return_value=None)
@patch("decnet.prober.worker.jarm_hash")
def test_recorder_optional_no_crash_when_none(
    mock_jarm: MagicMock,
    _mock_hassh: MagicMock,
    _mock_tcpfp: MagicMock,
    _mock_cert: MagicMock,
    tmp_path: Path,
):
    """record_rotation=None must keep the prober's pre-DEBT-032 behavior.

    Runs a JARM-only cycle without a recorder. The cycle must finish
    without raising, and the patched JARM probe must actually have been
    invoked, so the test cannot pass on a cycle that silently skipped the
    probe phase.
    """
    mock_jarm.return_value = "c0c" * 10 + "a" * 32
    _probe_cycle(
        {"10.0.0.5"}, {},
        [443], [], [],
        tmp_path / "decnet.log", tmp_path / "decnet.json",
        timeout=1.0,
        publish_fn=None,
        record_rotation=None,
    )
    # Reaching this line means no exception was raised. Also require that
    # the JARM phase really ran, otherwise the test proves nothing.
    assert mock_jarm.called