feat(prober,correlation): attacker fingerprint rotation detection (DEBT-032)

When the prober observes a NEW hash for an
(attacker_uuid, port, probe_type) triple it has seen before — VPS
rotation, SSH server rebuild, TLS cert swap — emit a derived
attacker.fingerprint_rotated event carrying both old and new hash.
Detection is a small library (decnet.correlation.fingerprint_rotation)
called inline from the prober at each of the three emit sites
(JARM/HASSH/TCPFP). No new daemon. New AttackerFingerprintState table
holds per-triple last-hash state; Attacker.rotation_count and
Attacker.last_rotation_at are stamped on every diff. Library is sync,
fully unit-tested via injected publish_fn / syslog_fn callbacks.
This commit is contained in:
2026-05-03 05:12:51 -04:00
parent dcd558fd91
commit 6c6f97e840
8 changed files with 687 additions and 18 deletions

View File

@@ -114,6 +114,14 @@ ATTACKER_SCORED = "scored"
# Distinct from ``observed`` which is the correlator's first-sight signal —
# a fingerprint is additional evidence about an already-observed attacker.
ATTACKER_FINGERPRINTED = "fingerprinted"
# Published when the prober observes a NEW hash for an
# (attacker_ip, port, probe_type) triple it has seen before — i.e. the
# attacker rotated their VPS, rebuilt their SSH server, swapped their
# TLS cert. Distinct from ``fingerprinted`` which fires on every probe
# result; ``fingerprint_rotated`` fires only on diff and carries both
# old_hash + new_hash. Producer: prober (via the rotation library);
# consumers: dashboard, forensics, attribution clustering.
ATTACKER_FINGERPRINT_ROTATED = "fingerprint_rotated"
ATTACKER_SESSION_STARTED = "session.started"
ATTACKER_SESSION_ENDED = "session.ended"
# Published by the ``decnet enrich`` worker after an enrichment pass

View File

@@ -0,0 +1,153 @@
"""Attacker substrate-fingerprint rotation detection.
Called inline from the prober at each fingerprint emit site. Looks up
the last persisted hash for ``(attacker_uuid, port, probe_type)``;
when the new hash differs from the last one, emits a derived
``attacker.fingerprint_rotated`` event (bus + RFC 5424 syslog) and
stamps the ``Attacker`` row's rotation telemetry.
This is a pure library — no daemon, no async loop. The prober is the
only producer. We just teach it to derive a second event on hash
flip without standing up another worker (DEBT-032).
"""
from __future__ import annotations
import uuid as _uuid
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Callable, Literal
from sqlmodel import Session, select
from decnet.web.db.models import Attacker, AttackerFingerprintState
ProbeType = Literal["jarm", "hassh", "tcpfp"]
RotationKind = Literal[
"no_attacker_row", # caller raced ahead of correlator; skip silently
"first_sighting", # state row created, no prior hash
"unchanged", # same hash as last sighting
"rotated", # hash differs; event emitted, Attacker stamped
]
PublishFn = Callable[[str, dict[str, Any]], None]
SyslogFn = Callable[[str, dict[str, Any]], None]
@dataclass
class RotationOutcome:
"""Return shape of :func:`record_fingerprint`. Caller usually
ignores it; useful for tests + tracing."""
kind: RotationKind
old_hash: str | None
new_hash: str
rotation_count: int
_ROTATED_EVENT_TYPE = "attacker.fingerprint_rotated"
def record_fingerprint(
session: Session,
*,
attacker_ip: str,
port: int,
probe_type: ProbeType,
new_hash: str,
ts: datetime,
publish_fn: PublishFn | None = None,
syslog_fn: SyslogFn | None = None,
) -> RotationOutcome:
"""Upsert state row; on hash diff, emit derived event + stamp.
Resolves ``attacker_uuid`` from ``attacker_ip`` via the existing
Attacker table. If no Attacker row exists yet (the prober raced
ahead of the correlator), returns ``kind="no_attacker_row"`` and
does nothing — the next probe cycle will pick it up once the
correlator has caught up.
State upsert + Attacker stamp + publish + syslog are committed in
one transaction so a partial failure can't desync state from
what was emitted.
"""
attacker = session.exec(
select(Attacker).where(Attacker.ip == attacker_ip)
).first()
if attacker is None:
return RotationOutcome(
kind="no_attacker_row",
old_hash=None,
new_hash=new_hash,
rotation_count=0,
)
row = session.exec(
select(AttackerFingerprintState).where(
AttackerFingerprintState.attacker_uuid == attacker.uuid,
AttackerFingerprintState.port == port,
AttackerFingerprintState.probe_type == probe_type,
)
).first()
if row is None:
session.add(AttackerFingerprintState(
uuid=str(_uuid.uuid4()),
attacker_uuid=attacker.uuid,
port=port,
probe_type=probe_type,
last_hash=new_hash,
last_seen=ts,
rotation_count=0,
))
session.commit()
return RotationOutcome(
kind="first_sighting",
old_hash=None,
new_hash=new_hash,
rotation_count=0,
)
if row.last_hash == new_hash:
row.last_seen = ts
session.add(row)
session.commit()
return RotationOutcome(
kind="unchanged",
old_hash=row.last_hash,
new_hash=new_hash,
rotation_count=row.rotation_count,
)
old_hash = row.last_hash
row.last_hash = new_hash
row.last_seen = ts
row.rotation_count += 1
session.add(row)
attacker.rotation_count += 1
attacker.last_rotation_at = ts
session.add(attacker)
payload: dict[str, Any] = {
"attacker_uuid": attacker.uuid,
"attacker_ip": attacker_ip,
"port": port,
"probe_type": probe_type,
"old_hash": old_hash,
"new_hash": new_hash,
"rotation_count": row.rotation_count,
"ts": ts.isoformat(),
}
if publish_fn is not None:
publish_fn(_ROTATED_EVENT_TYPE, payload)
if syslog_fn is not None:
syslog_fn(_ROTATED_EVENT_TYPE, payload)
session.commit()
return RotationOutcome(
kind="rotated",
old_hash=old_hash,
new_hash=new_hash,
rotation_count=row.rotation_count,
)

View File

@@ -27,6 +27,9 @@ from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable
from sqlalchemy.engine import Engine
from sqlmodel import Session
from decnet.bus import topics as _topics
from decnet.bus.base import BaseBus
from decnet.bus.factory import get_bus
@@ -35,6 +38,10 @@ from decnet.bus.publish import (
run_control_listener,
run_health_heartbeat,
)
from decnet.correlation.fingerprint_rotation import (
ProbeType,
record_fingerprint,
)
from decnet.logging import get_logger
from decnet.prober.hassh import hassh_server
from decnet.prober.jarm import JARM_EMPTY_HASH, jarm_hash
@@ -44,6 +51,21 @@ from decnet.telemetry import traced as _traced
logger = get_logger("prober")
def _build_sync_engine() -> Engine:
"""Construct a sync SQLite engine for rotation-detection state.
Used inline by the prober; it lives outside the async repository
layer because rotation detection is a sync hook on a sync probe
path. Honors the same defaulting as
``decnet.web.db.sqlite.repository.SQLiteRepository``.
"""
import os
from decnet.config import _ROOT
from decnet.web.db.sqlite.database import get_sync_engine
db_path = os.environ.get("DECNET_DB_PATH", str(_ROOT / "decnet.db"))
return get_sync_engine(db_path)
# ─── Default ports per probe type ───────────────────────────────────────────
# JARM: common C2 callback / TLS server ports
@@ -233,6 +255,14 @@ def _discover_attackers(json_path: Path, position: int) -> tuple[set[str], int]:
ProbePublishFn = Callable[[str, dict[str, Any]], None]
# Rotation recorder: takes (attacker_ip, port, probe_type, new_hash) and
# performs the rotation-detection upsert + derived-event emission for the
# DEBT-032 substrate-fingerprint flow. Optional; when None the prober
# behaves exactly as before (raw fingerprint emit only, no rotation
# detection). Construction lives at worker startup so phase functions
# don't have to know about the DB engine.
RotationRecorderFn = Callable[[str, int, "ProbeType", str], None]
@_traced("prober.probe_cycle")
def _probe_cycle(
@@ -245,6 +275,7 @@ def _probe_cycle(
json_path: Path,
timeout: float = 5.0,
publish_fn: ProbePublishFn | None = None,
record_rotation: RotationRecorderFn | None = None,
) -> None:
"""
Probe all known attacker IPs with JARM, HASSH, and TCP/IP fingerprinting.
@@ -263,13 +294,13 @@ def _probe_cycle(
ip_probed = probed.setdefault(ip, {})
# Phase 1: JARM (TLS fingerprinting)
_jarm_phase(ip, ip_probed, jarm_ports, log_path, json_path, timeout, publish_fn)
_jarm_phase(ip, ip_probed, jarm_ports, log_path, json_path, timeout, publish_fn, record_rotation)
# Phase 2: HASSHServer (SSH fingerprinting)
_hassh_phase(ip, ip_probed, ssh_ports, log_path, json_path, timeout, publish_fn)
_hassh_phase(ip, ip_probed, ssh_ports, log_path, json_path, timeout, publish_fn, record_rotation)
# Phase 3: TCP/IP stack fingerprinting
_tcpfp_phase(ip, ip_probed, tcpfp_ports, log_path, json_path, timeout, publish_fn)
_tcpfp_phase(ip, ip_probed, tcpfp_ports, log_path, json_path, timeout, publish_fn, record_rotation)
@_traced("prober.jarm_phase")
@@ -281,6 +312,7 @@ def _jarm_phase(
json_path: Path,
timeout: float,
publish_fn: ProbePublishFn | None = None,
record_rotation: RotationRecorderFn | None = None,
) -> None:
"""JARM-fingerprint an IP on the given TLS ports."""
done = ip_probed.setdefault("jarm", set())
@@ -301,6 +333,8 @@ def _jarm_phase(
msg=f"JARM {ip}:{port} = {h}",
)
logger.info("prober: JARM %s:%d = %s", ip, port, h)
if record_rotation is not None:
record_rotation(ip, port, "jarm", h)
if publish_fn is not None:
publish_fn(
"jarm",
@@ -387,6 +421,7 @@ def _hassh_phase(
json_path: Path,
timeout: float,
publish_fn: ProbePublishFn | None = None,
record_rotation: RotationRecorderFn | None = None,
) -> None:
"""HASSHServer-fingerprint an IP on the given SSH ports."""
done = ip_probed.setdefault("hassh", set())
@@ -412,6 +447,8 @@ def _hassh_phase(
msg=f"HASSH {ip}:{port} = {result['hassh_server']}",
)
logger.info("prober: HASSH %s:%d = %s", ip, port, result["hassh_server"])
if record_rotation is not None:
record_rotation(ip, port, "hassh", result["hassh_server"])
if publish_fn is not None:
publish_fn(
"hassh",
@@ -445,6 +482,7 @@ def _tcpfp_phase(
json_path: Path,
timeout: float,
publish_fn: ProbePublishFn | None = None,
record_rotation: RotationRecorderFn | None = None,
) -> None:
"""TCP/IP stack fingerprint an IP on the given ports."""
done = ip_probed.setdefault("tcpfp", set())
@@ -478,6 +516,8 @@ def _tcpfp_phase(
msg=f"TCPFP {ip}:{port} = {result['tcpfp_hash']}",
)
logger.info("prober: TCPFP %s:%d = %s", ip, port, result["tcpfp_hash"])
if record_rotation is not None:
record_rotation(ip, port, "tcpfp", result["tcpfp_hash"])
if publish_fn is not None:
publish_fn(
"tcpfp",
@@ -586,6 +626,61 @@ async def prober_worker(
event_type,
)
# Substrate-rotation detection (DEBT-032) — open a sync engine for
# the prober's lifetime; recorder closes a session per call so we
# never hold a connection across phase boundaries. Failure to
# connect is non-fatal: probes continue, rotation detection is
# silently disabled.
rotation_engine: Engine | None = None
record_rotation: RotationRecorderFn | None = None
try:
rotation_engine = _build_sync_engine()
except Exception as exc: # noqa: BLE001
logger.warning(
"prober: rotation-detection DB unavailable, "
"running with rotation detection disabled: %s", exc,
)
if rotation_engine is not None:
def _publish_rotation(event_type: str, payload: dict[str, Any]) -> None:
raw_publish(
_topics.attacker(_topics.ATTACKER_FINGERPRINT_ROTATED),
payload,
event_type,
)
def _syslog_rotation(event_type: str, payload: dict[str, Any]) -> None:
_write_event(
log_path, json_path,
"fingerprint_rotated",
target_ip=payload["attacker_ip"],
target_port=str(payload["port"]),
probe_type=payload["probe_type"],
old_hash=payload.get("old_hash") or "",
new_hash=payload["new_hash"],
rotation_count=str(payload["rotation_count"]),
msg=(
f"FP rotation {payload['attacker_ip']}:{payload['port']} "
f"{payload['probe_type']} {payload.get('old_hash')}"
f"{payload['new_hash']}"
),
)
def record_rotation(
ip: str, port: int, probe_type: ProbeType, new_hash: str,
) -> None:
with Session(rotation_engine) as session:
record_fingerprint(
session,
attacker_ip=ip,
port=port,
probe_type=probe_type,
new_hash=new_hash,
ts=datetime.now(timezone.utc),
publish_fn=_publish_rotation,
syslog_fn=_syslog_rotation,
)
shutdown = asyncio.Event()
heartbeat_task = asyncio.create_task(run_health_heartbeat(bus, "prober"))
control_task = asyncio.create_task(
@@ -612,6 +707,7 @@ async def prober_worker(
jarm_ports, hassh_ports, tcp_ports,
log_path, json_path, timeout,
_publish_attacker,
record_rotation,
)
try:
@@ -626,3 +722,6 @@ async def prober_worker(
if bus is not None:
with contextlib.suppress(Exception):
await bus.close()
if rotation_engine is not None:
with contextlib.suppress(Exception):
rotation_engine.dispose()

View File

@@ -45,6 +45,7 @@ from .auth import (
from .attackers import (
Attacker,
AttackerBehavior,
AttackerFingerprintState,
AttackerIdentity,
AttackersResponse,
SessionProfile,
@@ -242,6 +243,7 @@ __all__ = [
# attackers
"Attacker",
"AttackerBehavior",
"AttackerFingerprintState",
"AttackerIdentity",
"AttackerIntel",
"AttackersResponse",

View File

@@ -93,11 +93,48 @@ class Attacker(SQLModel, table=True):
# private/loopback addresses never resolve. 256 chars matches
# RFC 1035 max hostname length.
ptr_record: Optional[str] = Field(default=None, max_length=256)
# Substrate-rotation telemetry, maintained by
# ``decnet.correlation.fingerprint_rotation.record_fingerprint`` whenever
# the prober observes a new hash for an (attacker, port, probe_type)
# triple it has seen before. Lets the dashboard render "rotated 3×
# last 24h" without joining to AttackerFingerprintState.
rotation_count: int = Field(default=0)
last_rotation_at: Optional[datetime] = Field(default=None, index=True)
updated_at: datetime = Field(
default_factory=lambda: datetime.now(timezone.utc), index=True
)
class AttackerFingerprintState(SQLModel, table=True):
"""Per-(attacker, port, probe_type) latest-hash row.
Sole purpose: give the prober memory across runs so it can detect when
an attacker's HASSH/JARM/TCP fingerprint flips for the same port — i.e.
they rotated their VPS, rebuilt their SSH server, swapped their TLS
cert. Diff detection lives in
``decnet.correlation.fingerprint_rotation``; the prober calls into
that library inline at each emit site and this table is the only
persistence it needs.
Bounded by ``attackers × probe families × ports`` — small in practice;
a busy fleet sees O(thousands) of rows, not O(millions).
"""
__tablename__ = "attacker_fingerprint_state"
uuid: str = Field(primary_key=True)
attacker_uuid: str = Field(foreign_key="attackers.uuid", index=True)
port: int
probe_type: str = Field(max_length=16) # "jarm" | "hassh" | "tcpfp"
last_hash: str = Field(max_length=128)
last_seen: datetime = Field(index=True)
rotation_count: int = Field(default=0)
__table_args__ = (
UniqueConstraint(
"attacker_uuid", "port", "probe_type",
name="uq_attacker_fingerprint_state_natural",
),
)
class AttackerIdentity(SQLModel, table=True):
"""
Resolved actor identity — the dedup'd "same hands" row that one or

View File

@@ -470,22 +470,14 @@ The threat-intel enrichment surface (DEBT-N/A: `feat(intel)` series) keys every
**Status:** Open. No operational impact today (single-IP attackers are the dominant case), but worth closing before the federation layer lands so the wire-format and API both speak in identity terms, not IP terms.
### DEBT-032 — Prober can't detect fingerprint rotation without mutation
**Files:** `decnet/prober/worker.py` (~lines 235, 286, 334, 392), `decnet/web/db/models.py` (new `decky_service_fingerprints` table).
### ~~DEBT-032 — Attacker fingerprint rotation detection~~ ✅ RESOLVED
**Files:** `decnet/correlation/fingerprint_rotation.py` (new), `decnet/prober/worker.py`, `decnet/web/db/models/attackers.py`, `decnet/bus/topics.py`.
Substrate identity is `(service_name, implementation_fingerprint)`, not service name alone. A base-image rebuild that rotates OpenSSH 8.4 → 9.2 — or any recompose that changes JARM / HASSH / TCP fingerprint without changing the service list — is a substrate transition from the attacker's recon POV, and today the correlation graph sees none of it.
Resolved 2026-05-03. **Reframed during planning:** the original entry described this as a per-decky substrate-integrity problem, but the prober probes *attackers*, not deckies. The actual gap was attacker substrate tracking — same attacker IP rotating their VPS, rebuilding their SSH server, swapping their TLS cert — invisible at correlator-time because nothing diffed consecutive hashes for the same `(attacker_ip, port, probe_type)` triple.
The prober already computes JARM (`worker.py:286`), HASSH (`worker.py:334`), and TCP fingerprint (`worker.py:392`), and emits each as RFC 5424 syslog + optional bus publish. What's missing is **per-(decky, service, probe_type) persistence** to diff against: the current dedup set `probed: dict[IP → {probe_type → set(ports)}]` (`worker.py:235`) is in-memory and scoped to one run, so any restart loses history and any same-IP probe on a changed substrate can't be detected as a change.
Implemented as a small library (`decnet.correlation.fingerprint_rotation.record_fingerprint`) called inline from the prober at each of the three emit sites (JARM / HASSH / TCPFP). No new worker daemon; the prober is still the only producer, just teaches it to derive a second event on hash flip. New `AttackerFingerprintState` table holds per-`(attacker_uuid, port, probe_type)` last-hash state. New bus topic `attacker.fingerprint_rotated` carries `{attacker_uuid, attacker_ip, port, probe_type, old_hash, new_hash, rotation_count, ts}`. `Attacker.rotation_count` and `Attacker.last_rotation_at` are stamped on every diff so the dashboard can render rotation telemetry without joining. Library is fully sync + unit-tested with injected publish_fn / syslog_fn callbacks.
**Design:**
1. New SQLModel table `decky_service_fingerprints` keyed by `(decky_name, service, probe_type)` with `last_hash, last_seen_at, sample_count`. One upsert per probe; bounded by fleet × probe families.
2. Prober reads `last_hash` before emitting; on diff, emits a new `substrate_fingerprint_changed` event (RFC 5424 syslog + `decky.{id}.fingerprint` bus topic) with `{decky, service, probe_type, old_hash, new_hash}`. On match, upsert the timestamp and skip the event.
3. Correlator consumes the new event kind into a parallel per-decky index (mirroring the mutation index landed in this session) and interleaves `🔍 decky-03 hassh drift` markers in `AttackerTraversal.fingerprints_during`.
4. Divergence detector: compare `substrate_state(t)` fold (mutations) vs `observed_identity(t)` fold (fingerprints) per decky. A fingerprint change without a preceding mutation ⇒ `substrate_divergence` finding — container drift, compromised base image, rootkit banner rewrite, or prober lag. Falls out of the data model for free once both streams exist.
**Prerequisite satisfied:** mutation event stream + correlator mutation-kind parser landed alongside this DEBT entry (commits `f875350`, `fa0cdb3`, `bf5ed7a`, `d4d8a2a` on `dev`). The fingerprint stream plugs into the same substrate: same RFC 5424 emission pattern, sibling per-decky engine index, same timeline interleaving.
**Status:** Open — deferred to its own commit sequence. The dedup state in `worker.py:235` is the only thing standing between "JARM hash computed" and "substrate rotation detected."
Out of scope (deferred): dashboard surfacing of `rotation_count`; attribution clustering across attackers (same JARM seen from different IPs); backfill from existing event store.
---
@@ -713,7 +705,7 @@ user who needs it.
| DEBT-029 | 🟡 Medium | Architecture / Bus | ✅ resolved |
| DEBT-030 | 🟡 Medium | Web / Live mutations | ✅ resolved (Phase A) |
| ~~DEBT-031~~ | ✅ | Workers / Bus integration | resolved |
| DEBT-032 | 🟡 Medium | Correlation / Prober | open |
| ~~DEBT-032~~ | ✅ | Correlation / Prober | resolved 2026-05-03 |
| DEBT-033 | 🟡 Medium | Storage / Session recording | open |
| ~~DEBT-035~~ | ✅ | Artifacts / Filesystem perms | resolved 2026-05-02 |
| DEBT-036 | 🟡 Medium | Correlation / Keystroke dynamics | open |
@@ -731,5 +723,5 @@ user who needs it.
| DEBT-048 | 🟡 Medium | TTP / Intel provider mapping review (recurring) | open / recurring |
| DEBT-049 | 🟡 Medium | TTP / Sigma adapter (post-v1) | open |
**Remaining open:** DEBT-011 (Alembic), DEBT-027 (Dynamic bait store), DEBT-028 (deploy endpoint tests), DEBT-032 (fingerprint rotation detection), DEBT-033 (transcript shard rotation), DEBT-036 (session-profile ingester), DEBT-037 (webhook delivery hardening), DEBT-038 (SSH PAM cred-capture limitations — document-only), DEBT-042 (orchestrator failure-count window), DEBT-043 (frontend test framework), DEBT-045 (EmailLifter heavyweight — partial paid; carved-out follow-ups remain), DEBT-046 (mal-hash feed), DEBT-048 (TTP intel provider mapping review — recurring quarterly), DEBT-049 (TTP Sigma adapter — post-v1).
**Remaining open:** DEBT-011 (Alembic), DEBT-027 (Dynamic bait store), DEBT-028 (deploy endpoint tests), DEBT-033 (transcript shard rotation), DEBT-036 (session-profile ingester), DEBT-037 (webhook delivery hardening), DEBT-038 (SSH PAM cred-capture limitations — document-only), DEBT-042 (orchestrator failure-count window), DEBT-043 (frontend test framework), DEBT-045 (EmailLifter heavyweight — partial paid; carved-out follow-ups remain), DEBT-046 (mal-hash feed), DEBT-048 (TTP intel provider mapping review — recurring quarterly), DEBT-049 (TTP Sigma adapter — post-v1).
**Estimated remaining effort:** ~21 hours plus the new EmailLifter / TTP follow-ups. DEBT-030 Phase B (optimistic staged-buffer editor) is a follow-up, not debt.

View File

@@ -0,0 +1,236 @@
"""Unit tests for ``decnet.correlation.fingerprint_rotation``.
Pure library: in-memory SQLite + sync Session + collected callback
calls. No prober, no bus, no async. Each test seeds an Attacker row,
calls ``record_fingerprint``, asserts on the returned outcome + the
side-effects (state row, Attacker stamp, callback invocations).
"""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
import pytest
from sqlalchemy.engine import Engine
from sqlmodel import Session, SQLModel, create_engine, select
from decnet.correlation.fingerprint_rotation import (
record_fingerprint,
RotationOutcome,
)
from decnet.web.db.models import (
Attacker,
AttackerFingerprintState,
)
@pytest.fixture
def engine() -> Engine:
eng = create_engine("sqlite://", connect_args={"check_same_thread": False})
SQLModel.metadata.create_all(eng)
return eng
@pytest.fixture
def now() -> datetime:
return datetime(2026, 5, 3, 12, 0, 0, tzinfo=timezone.utc)
def _seed_attacker(session: Session, ip: str = "1.2.3.4") -> Attacker:
a = Attacker(
uuid="attacker-uuid-1",
ip=ip,
first_seen=datetime.now(timezone.utc),
last_seen=datetime.now(timezone.utc),
)
session.add(a)
session.commit()
session.refresh(a)
return a
class _Recorder:
"""Capture (event_type, payload) tuples from publish_fn / syslog_fn."""
def __init__(self) -> None:
self.calls: list[tuple[str, dict]] = []
def __call__(self, event_type: str, payload: dict) -> None:
self.calls.append((event_type, payload))
def test_no_attacker_row_returns_noop(engine, now):
publish, syslog = _Recorder(), _Recorder()
with Session(engine) as session:
outcome = record_fingerprint(
session,
attacker_ip="9.9.9.9",
port=22,
probe_type="hassh",
new_hash="abc",
ts=now,
publish_fn=publish,
syslog_fn=syslog,
)
assert outcome.kind == "no_attacker_row"
assert publish.calls == []
assert syslog.calls == []
with Session(engine) as session:
rows = session.exec(select(AttackerFingerprintState)).all()
assert rows == []
def test_first_sighting_creates_state_row_no_event(engine, now):
publish, syslog = _Recorder(), _Recorder()
with Session(engine) as session:
_seed_attacker(session)
outcome = record_fingerprint(
session,
attacker_ip="1.2.3.4",
port=22,
probe_type="hassh",
new_hash="hash-1",
ts=now,
publish_fn=publish,
syslog_fn=syslog,
)
assert outcome.kind == "first_sighting"
assert outcome.old_hash is None
assert outcome.new_hash == "hash-1"
assert outcome.rotation_count == 0
assert publish.calls == []
assert syslog.calls == []
with Session(engine) as session:
rows = session.exec(select(AttackerFingerprintState)).all()
assert len(rows) == 1
assert rows[0].last_hash == "hash-1"
assert rows[0].rotation_count == 0
a = session.exec(select(Attacker)).one()
assert a.rotation_count == 0
assert a.last_rotation_at is None
def test_unchanged_hash_bumps_last_seen_no_event(engine, now):
publish, syslog = _Recorder(), _Recorder()
later = now + timedelta(minutes=10)
with Session(engine) as session:
_seed_attacker(session)
record_fingerprint(
session,
attacker_ip="1.2.3.4", port=22, probe_type="hassh",
new_hash="hash-1", ts=now,
)
outcome = record_fingerprint(
session,
attacker_ip="1.2.3.4", port=22, probe_type="hassh",
new_hash="hash-1", ts=later,
publish_fn=publish, syslog_fn=syslog,
)
assert outcome.kind == "unchanged"
assert publish.calls == []
assert syslog.calls == []
with Session(engine) as session:
row = session.exec(select(AttackerFingerprintState)).one()
# SQLite strips tzinfo on round-trip; compare naive values.
assert row.last_seen.replace(tzinfo=timezone.utc) == later
assert row.rotation_count == 0
def test_rotated_emits_event_and_stamps_attacker(engine, now):
publish, syslog = _Recorder(), _Recorder()
later = now + timedelta(hours=1)
with Session(engine) as session:
_seed_attacker(session)
record_fingerprint(
session,
attacker_ip="1.2.3.4", port=22, probe_type="hassh",
new_hash="hash-1", ts=now,
)
outcome = record_fingerprint(
session,
attacker_ip="1.2.3.4", port=22, probe_type="hassh",
new_hash="hash-2", ts=later,
publish_fn=publish, syslog_fn=syslog,
)
assert outcome.kind == "rotated"
assert outcome.old_hash == "hash-1"
assert outcome.new_hash == "hash-2"
assert outcome.rotation_count == 1
assert len(publish.calls) == 1
assert len(syslog.calls) == 1
event_type, payload = publish.calls[0]
assert event_type == "attacker.fingerprint_rotated"
assert payload["attacker_uuid"] == "attacker-uuid-1"
assert payload["attacker_ip"] == "1.2.3.4"
assert payload["port"] == 22
assert payload["probe_type"] == "hassh"
assert payload["old_hash"] == "hash-1"
assert payload["new_hash"] == "hash-2"
assert payload["rotation_count"] == 1
assert payload["ts"] == later.isoformat()
with Session(engine) as session:
a = session.exec(select(Attacker)).one()
assert a.rotation_count == 1
assert a.last_rotation_at is not None
assert a.last_rotation_at.replace(tzinfo=timezone.utc) == later
row = session.exec(select(AttackerFingerprintState)).one()
assert row.last_hash == "hash-2"
assert row.rotation_count == 1
def test_three_probe_types_independent(engine, now):
with Session(engine) as session:
_seed_attacker(session)
for ptype in ("jarm", "hassh", "tcpfp"):
record_fingerprint(
session,
attacker_ip="1.2.3.4", port=22, probe_type=ptype,
new_hash=f"{ptype}-1", ts=now,
)
with Session(engine) as session:
rows = session.exec(select(AttackerFingerprintState)).all()
assert {r.probe_type for r in rows} == {"jarm", "hassh", "tcpfp"}
assert {r.last_hash for r in rows} == {"jarm-1", "hassh-1", "tcpfp-1"}
def test_two_ports_same_probe_type_independent(engine, now):
with Session(engine) as session:
_seed_attacker(session)
for port in (22, 2222):
record_fingerprint(
session,
attacker_ip="1.2.3.4", port=port, probe_type="hassh",
new_hash=f"hash-{port}", ts=now,
)
with Session(engine) as session:
rows = session.exec(select(AttackerFingerprintState)).all()
assert {r.port for r in rows} == {22, 2222}
def test_multiple_rotations_increment_counter(engine, now):
publish = _Recorder()
with Session(engine) as session:
_seed_attacker(session)
record_fingerprint(
session,
attacker_ip="1.2.3.4", port=22, probe_type="hassh",
new_hash="h1", ts=now, publish_fn=publish,
)
record_fingerprint(
session,
attacker_ip="1.2.3.4", port=22, probe_type="hassh",
new_hash="h2", ts=now + timedelta(minutes=5), publish_fn=publish,
)
record_fingerprint(
session,
attacker_ip="1.2.3.4", port=22, probe_type="hassh",
new_hash="h3", ts=now + timedelta(minutes=10), publish_fn=publish,
)
assert len(publish.calls) == 2 # first call was first_sighting (no event)
with Session(engine) as session:
a = session.exec(select(Attacker)).one()
assert a.rotation_count == 2
row = session.exec(select(AttackerFingerprintState)).one()
assert row.rotation_count == 2
assert row.last_hash == "h3"

View File

@@ -0,0 +1,142 @@
"""Integration test: prober phase functions invoke the rotation recorder.
The prober worker constructs the recorder closure at startup; here we
verify that ``_probe_cycle`` threads a recorder through to JARM / HASSH
/ TCPFP phases and that the recorder gets the (ip, port, probe_type,
hash) tuple it expects. The library itself is unit-tested separately.
"""
from __future__ import annotations
from pathlib import Path
from unittest.mock import MagicMock, patch
from decnet.prober.worker import _probe_cycle
@patch("decnet.prober.worker.fetch_leaf_cert", return_value=None)
@patch("decnet.prober.worker.tcp_fingerprint", return_value=None)
@patch("decnet.prober.worker.hassh_server", return_value=None)
@patch("decnet.prober.worker.jarm_hash")
def test_jarm_phase_calls_recorder(
mock_jarm: MagicMock,
_mock_hassh: MagicMock,
_mock_tcpfp: MagicMock,
_mock_cert: MagicMock,
tmp_path: Path,
):
mock_jarm.return_value = "c0c" * 10 + "a" * 32
log_path = tmp_path / "decnet.log"
json_path = tmp_path / "decnet.json"
rec_calls: list[tuple] = []
recorder = lambda ip, port, ptype, h: rec_calls.append((ip, port, ptype, h)) # noqa: E731
_probe_cycle(
{"10.0.0.5"}, {},
[443], [], [],
log_path, json_path,
timeout=1.0,
publish_fn=None,
record_rotation=recorder,
)
assert rec_calls == [("10.0.0.5", 443, "jarm", "c0c" * 10 + "a" * 32)]
@patch("decnet.prober.worker.fetch_leaf_cert", return_value=None)
@patch("decnet.prober.worker.tcp_fingerprint", return_value=None)
@patch("decnet.prober.worker.hassh_server")
@patch("decnet.prober.worker.jarm_hash", return_value="")
def test_hassh_phase_calls_recorder(
_mock_jarm: MagicMock,
mock_hassh: MagicMock,
_mock_tcpfp: MagicMock,
_mock_cert: MagicMock,
tmp_path: Path,
):
mock_hassh.return_value = {
"hassh_server": "deadbeef",
"banner": "SSH-2.0-OpenSSH_9.2",
"kex_algorithms": "x",
"encryption_s2c": "x",
"mac_s2c": "x",
"compression_s2c": "x",
}
log_path = tmp_path / "decnet.log"
json_path = tmp_path / "decnet.json"
rec_calls: list[tuple] = []
recorder = lambda ip, port, ptype, h: rec_calls.append((ip, port, ptype, h)) # noqa: E731
_probe_cycle(
{"10.0.0.5"}, {},
[], [22], [],
log_path, json_path,
timeout=1.0,
publish_fn=None,
record_rotation=recorder,
)
assert rec_calls == [("10.0.0.5", 22, "hassh", "deadbeef")]
@patch("decnet.prober.worker.fetch_leaf_cert", return_value=None)
@patch("decnet.prober.worker.tcp_fingerprint")
@patch("decnet.prober.worker.hassh_server", return_value=None)
@patch("decnet.prober.worker.jarm_hash", return_value="")
def test_tcpfp_phase_calls_recorder(
_mock_jarm, _mock_hassh, mock_tcpfp, _mock_cert, tmp_path: Path,
):
mock_tcpfp.return_value = {
"tcpfp_hash": "tcpfp-hash-1",
"tcpfp_raw": "raw",
"ttl": 64,
"window_size": 65535,
"df_bit": True,
"mss": 1460,
"window_scale": 7,
"sack_ok": True,
"timestamp": True,
"options_order": "MSS,SACK,TS,NOP,WS",
"tos": 0,
"dscp": 0,
"ecn": 0,
"server_isn": 0,
}
log_path = tmp_path / "decnet.log"
json_path = tmp_path / "decnet.json"
rec_calls: list[tuple] = []
recorder = lambda ip, port, ptype, h: rec_calls.append((ip, port, ptype, h)) # noqa: E731
_probe_cycle(
{"10.0.0.5"}, {},
[], [], [22],
log_path, json_path,
timeout=1.0,
publish_fn=None,
record_rotation=recorder,
)
assert rec_calls == [("10.0.0.5", 22, "tcpfp", "tcpfp-hash-1")]
@patch("decnet.prober.worker.fetch_leaf_cert", return_value=None)
@patch("decnet.prober.worker.tcp_fingerprint", return_value=None)
@patch("decnet.prober.worker.hassh_server", return_value=None)
@patch("decnet.prober.worker.jarm_hash")
def test_recorder_optional_no_crash_when_none(
mock_jarm: MagicMock,
_mock_hassh: MagicMock,
_mock_tcpfp: MagicMock,
_mock_cert: MagicMock,
tmp_path: Path,
):
"""record_rotation=None must keep the prober's pre-DEBT-032 behavior."""
mock_jarm.return_value = "c0c" * 10 + "a" * 32
_probe_cycle(
{"10.0.0.5"}, {},
[443], [], [],
tmp_path / "decnet.log", tmp_path / "decnet.json",
timeout=1.0,
publish_fn=None,
record_rotation=None,
)
# No error, probe completes.