diff --git a/decnet/bus/topics.py b/decnet/bus/topics.py index d697b0bf..1f5adb93 100644 --- a/decnet/bus/topics.py +++ b/decnet/bus/topics.py @@ -114,6 +114,14 @@ ATTACKER_SCORED = "scored" # Distinct from ``observed`` which is the correlator's first-sight signal — # a fingerprint is additional evidence about an already-observed attacker. ATTACKER_FINGERPRINTED = "fingerprinted" +# Published when the prober observes a NEW hash for an +# (attacker_ip, port, probe_type) triple it has seen before — i.e. the +# attacker rotated their VPS, rebuilt their SSH server, swapped their +# TLS cert. Distinct from ``fingerprinted`` which fires on every probe +# result; ``fingerprint_rotated`` fires only on diff and carries both +# old_hash + new_hash. Producer: prober (via the rotation library); +# consumers: dashboard, forensics, attribution clustering. +ATTACKER_FINGERPRINT_ROTATED = "fingerprint_rotated" ATTACKER_SESSION_STARTED = "session.started" ATTACKER_SESSION_ENDED = "session.ended" # Published by the ``decnet enrich`` worker after an enrichment pass diff --git a/decnet/correlation/fingerprint_rotation.py b/decnet/correlation/fingerprint_rotation.py new file mode 100644 index 00000000..90d262ab --- /dev/null +++ b/decnet/correlation/fingerprint_rotation.py @@ -0,0 +1,153 @@ +"""Attacker substrate-fingerprint rotation detection. + +Called inline from the prober at each fingerprint emit site. Looks up +the last persisted hash for ``(attacker_uuid, port, probe_type)``; +when the new hash differs from the last one, emits a derived +``attacker.fingerprint_rotated`` event (bus + RFC 5424 syslog) and +stamps the ``Attacker`` row's rotation telemetry. + +This is a pure library — no daemon, no async loop. The prober is the +only producer. We just teach it to derive a second event on hash +flip without standing up another worker (DEBT-032). 
+""" +from __future__ import annotations + +import uuid as _uuid +from dataclasses import dataclass +from datetime import datetime +from typing import Any, Callable, Literal + +from sqlmodel import Session, select + +from decnet.web.db.models import Attacker, AttackerFingerprintState + +ProbeType = Literal["jarm", "hassh", "tcpfp"] +RotationKind = Literal[ + "no_attacker_row", # caller raced ahead of correlator; skip silently + "first_sighting", # state row created, no prior hash + "unchanged", # same hash as last sighting + "rotated", # hash differs; event emitted, Attacker stamped +] + +PublishFn = Callable[[str, dict[str, Any]], None] +SyslogFn = Callable[[str, dict[str, Any]], None] + + +@dataclass +class RotationOutcome: + """Return shape of :func:`record_fingerprint`. Caller usually + ignores it; useful for tests + tracing.""" + kind: RotationKind + old_hash: str | None + new_hash: str + rotation_count: int + + +_ROTATED_EVENT_TYPE = "attacker.fingerprint_rotated" + + +def record_fingerprint( + session: Session, + *, + attacker_ip: str, + port: int, + probe_type: ProbeType, + new_hash: str, + ts: datetime, + publish_fn: PublishFn | None = None, + syslog_fn: SyslogFn | None = None, +) -> RotationOutcome: + """Upsert state row; on hash diff, emit derived event + stamp. + + Resolves ``attacker_uuid`` from ``attacker_ip`` via the existing + Attacker table. If no Attacker row exists yet (the prober raced + ahead of the correlator), returns ``kind="no_attacker_row"`` and + does nothing — the next probe cycle will pick it up once the + correlator has caught up. + + State upsert + Attacker stamp + publish + syslog are committed in + one transaction so a partial failure can't desync state from + what was emitted. 
+ """ + attacker = session.exec( + select(Attacker).where(Attacker.ip == attacker_ip) + ).first() + if attacker is None: + return RotationOutcome( + kind="no_attacker_row", + old_hash=None, + new_hash=new_hash, + rotation_count=0, + ) + + row = session.exec( + select(AttackerFingerprintState).where( + AttackerFingerprintState.attacker_uuid == attacker.uuid, + AttackerFingerprintState.port == port, + AttackerFingerprintState.probe_type == probe_type, + ) + ).first() + + if row is None: + session.add(AttackerFingerprintState( + uuid=str(_uuid.uuid4()), + attacker_uuid=attacker.uuid, + port=port, + probe_type=probe_type, + last_hash=new_hash, + last_seen=ts, + rotation_count=0, + )) + session.commit() + return RotationOutcome( + kind="first_sighting", + old_hash=None, + new_hash=new_hash, + rotation_count=0, + ) + + if row.last_hash == new_hash: + row.last_seen = ts + session.add(row) + session.commit() + return RotationOutcome( + kind="unchanged", + old_hash=row.last_hash, + new_hash=new_hash, + rotation_count=row.rotation_count, + ) + + old_hash = row.last_hash + row.last_hash = new_hash + row.last_seen = ts + row.rotation_count += 1 + session.add(row) + + attacker.rotation_count += 1 + attacker.last_rotation_at = ts + session.add(attacker) + + payload: dict[str, Any] = { + "attacker_uuid": attacker.uuid, + "attacker_ip": attacker_ip, + "port": port, + "probe_type": probe_type, + "old_hash": old_hash, + "new_hash": new_hash, + "rotation_count": row.rotation_count, + "ts": ts.isoformat(), + } + + if publish_fn is not None: + publish_fn(_ROTATED_EVENT_TYPE, payload) + if syslog_fn is not None: + syslog_fn(_ROTATED_EVENT_TYPE, payload) + + session.commit() + + return RotationOutcome( + kind="rotated", + old_hash=old_hash, + new_hash=new_hash, + rotation_count=row.rotation_count, + ) diff --git a/decnet/prober/worker.py b/decnet/prober/worker.py index 52da4c17..8013f74e 100644 --- a/decnet/prober/worker.py +++ b/decnet/prober/worker.py @@ -27,6 +27,9 @@ from datetime 
import datetime, timezone from pathlib import Path from typing import Any, Callable +from sqlalchemy.engine import Engine +from sqlmodel import Session + from decnet.bus import topics as _topics from decnet.bus.base import BaseBus from decnet.bus.factory import get_bus @@ -35,6 +38,10 @@ from decnet.bus.publish import ( run_control_listener, run_health_heartbeat, ) +from decnet.correlation.fingerprint_rotation import ( + ProbeType, + record_fingerprint, +) from decnet.logging import get_logger from decnet.prober.hassh import hassh_server from decnet.prober.jarm import JARM_EMPTY_HASH, jarm_hash @@ -44,6 +51,21 @@ from decnet.telemetry import traced as _traced logger = get_logger("prober") + +def _build_sync_engine() -> Engine: + """Construct a sync SQLite engine for rotation-detection state. + + Used inline by the prober; it lives outside the async repository + layer because rotation detection is a sync hook on a sync probe + path. Honors the same defaulting as + ``decnet.web.db.sqlite.repository.SQLiteRepository``. + """ + import os + from decnet.config import _ROOT + from decnet.web.db.sqlite.database import get_sync_engine + db_path = os.environ.get("DECNET_DB_PATH", str(_ROOT / "decnet.db")) + return get_sync_engine(db_path) + # ─── Default ports per probe type ─────────────────────────────────────────── # JARM: common C2 callback / TLS server ports @@ -233,6 +255,14 @@ def _discover_attackers(json_path: Path, position: int) -> tuple[set[str], int]: ProbePublishFn = Callable[[str, dict[str, Any]], None] +# Rotation recorder: takes (attacker_ip, port, probe_type, new_hash) and +# performs the rotation-detection upsert + derived-event emission for the +# DEBT-032 substrate-fingerprint flow. Optional; when None the prober +# behaves exactly as before (raw fingerprint emit only, no rotation +# detection). Construction lives at worker startup so phase functions +# don't have to know about the DB engine. 
+RotationRecorderFn = Callable[[str, int, "ProbeType", str], None] + @_traced("prober.probe_cycle") def _probe_cycle( @@ -245,6 +275,7 @@ def _probe_cycle( json_path: Path, timeout: float = 5.0, publish_fn: ProbePublishFn | None = None, + record_rotation: RotationRecorderFn | None = None, ) -> None: """ Probe all known attacker IPs with JARM, HASSH, and TCP/IP fingerprinting. @@ -263,13 +294,13 @@ def _probe_cycle( ip_probed = probed.setdefault(ip, {}) # Phase 1: JARM (TLS fingerprinting) - _jarm_phase(ip, ip_probed, jarm_ports, log_path, json_path, timeout, publish_fn) + _jarm_phase(ip, ip_probed, jarm_ports, log_path, json_path, timeout, publish_fn, record_rotation) # Phase 2: HASSHServer (SSH fingerprinting) - _hassh_phase(ip, ip_probed, ssh_ports, log_path, json_path, timeout, publish_fn) + _hassh_phase(ip, ip_probed, ssh_ports, log_path, json_path, timeout, publish_fn, record_rotation) # Phase 3: TCP/IP stack fingerprinting - _tcpfp_phase(ip, ip_probed, tcpfp_ports, log_path, json_path, timeout, publish_fn) + _tcpfp_phase(ip, ip_probed, tcpfp_ports, log_path, json_path, timeout, publish_fn, record_rotation) @_traced("prober.jarm_phase") @@ -281,6 +312,7 @@ def _jarm_phase( json_path: Path, timeout: float, publish_fn: ProbePublishFn | None = None, + record_rotation: RotationRecorderFn | None = None, ) -> None: """JARM-fingerprint an IP on the given TLS ports.""" done = ip_probed.setdefault("jarm", set()) @@ -301,6 +333,8 @@ def _jarm_phase( msg=f"JARM {ip}:{port} = {h}", ) logger.info("prober: JARM %s:%d = %s", ip, port, h) + if record_rotation is not None: + record_rotation(ip, port, "jarm", h) if publish_fn is not None: publish_fn( "jarm", @@ -387,6 +421,7 @@ def _hassh_phase( json_path: Path, timeout: float, publish_fn: ProbePublishFn | None = None, + record_rotation: RotationRecorderFn | None = None, ) -> None: """HASSHServer-fingerprint an IP on the given SSH ports.""" done = ip_probed.setdefault("hassh", set()) @@ -412,6 +447,8 @@ def _hassh_phase( 
msg=f"HASSH {ip}:{port} = {result['hassh_server']}", ) logger.info("prober: HASSH %s:%d = %s", ip, port, result["hassh_server"]) + if record_rotation is not None: + record_rotation(ip, port, "hassh", result["hassh_server"]) if publish_fn is not None: publish_fn( "hassh", @@ -445,6 +482,7 @@ def _tcpfp_phase( json_path: Path, timeout: float, publish_fn: ProbePublishFn | None = None, + record_rotation: RotationRecorderFn | None = None, ) -> None: """TCP/IP stack fingerprint an IP on the given ports.""" done = ip_probed.setdefault("tcpfp", set()) @@ -478,6 +516,8 @@ def _tcpfp_phase( msg=f"TCPFP {ip}:{port} = {result['tcpfp_hash']}", ) logger.info("prober: TCPFP %s:%d = %s", ip, port, result["tcpfp_hash"]) + if record_rotation is not None: + record_rotation(ip, port, "tcpfp", result["tcpfp_hash"]) if publish_fn is not None: publish_fn( "tcpfp", @@ -586,6 +626,61 @@ async def prober_worker( event_type, ) + # Substrate-rotation detection (DEBT-032) — open a sync engine for + # the prober's lifetime; recorder closes a session per call so we + # never hold a connection across phase boundaries. Failure to + # connect is non-fatal: probes continue, rotation detection is + # silently disabled. 
+ rotation_engine: Engine | None = None + record_rotation: RotationRecorderFn | None = None + try: + rotation_engine = _build_sync_engine() + except Exception as exc: # noqa: BLE001 + logger.warning( + "prober: rotation-detection DB unavailable, " + "running with rotation detection disabled: %s", exc, + ) + + if rotation_engine is not None: + def _publish_rotation(event_type: str, payload: dict[str, Any]) -> None: + raw_publish( + _topics.attacker(_topics.ATTACKER_FINGERPRINT_ROTATED), + payload, + event_type, + ) + + def _syslog_rotation(event_type: str, payload: dict[str, Any]) -> None: + _write_event( + log_path, json_path, + "fingerprint_rotated", + target_ip=payload["attacker_ip"], + target_port=str(payload["port"]), + probe_type=payload["probe_type"], + old_hash=payload.get("old_hash") or "", + new_hash=payload["new_hash"], + rotation_count=str(payload["rotation_count"]), + msg=( + f"FP rotation {payload['attacker_ip']}:{payload['port']} " + f"{payload['probe_type']} {payload.get('old_hash')} → " + f"{payload['new_hash']}" + ), + ) + + def record_rotation( + ip: str, port: int, probe_type: ProbeType, new_hash: str, + ) -> None: + with Session(rotation_engine) as session: + record_fingerprint( + session, + attacker_ip=ip, + port=port, + probe_type=probe_type, + new_hash=new_hash, + ts=datetime.now(timezone.utc), + publish_fn=_publish_rotation, + syslog_fn=_syslog_rotation, + ) + shutdown = asyncio.Event() heartbeat_task = asyncio.create_task(run_health_heartbeat(bus, "prober")) control_task = asyncio.create_task( @@ -612,6 +707,7 @@ async def prober_worker( jarm_ports, hassh_ports, tcp_ports, log_path, json_path, timeout, _publish_attacker, + record_rotation, ) try: @@ -626,3 +722,6 @@ async def prober_worker( if bus is not None: with contextlib.suppress(Exception): await bus.close() + if rotation_engine is not None: + with contextlib.suppress(Exception): + rotation_engine.dispose() diff --git a/decnet/web/db/models/__init__.py 
b/decnet/web/db/models/__init__.py index 2db36826..18c08364 100644 --- a/decnet/web/db/models/__init__.py +++ b/decnet/web/db/models/__init__.py @@ -45,6 +45,7 @@ from .auth import ( from .attackers import ( Attacker, AttackerBehavior, + AttackerFingerprintState, AttackerIdentity, AttackersResponse, SessionProfile, @@ -242,6 +243,7 @@ __all__ = [ # attackers "Attacker", "AttackerBehavior", + "AttackerFingerprintState", "AttackerIdentity", "AttackerIntel", "AttackersResponse", diff --git a/decnet/web/db/models/attackers.py b/decnet/web/db/models/attackers.py index 9ac3ed62..061e5be5 100644 --- a/decnet/web/db/models/attackers.py +++ b/decnet/web/db/models/attackers.py @@ -93,11 +93,48 @@ class Attacker(SQLModel, table=True): # private/loopback addresses never resolve. 256 chars matches # RFC 1035 max hostname length. ptr_record: Optional[str] = Field(default=None, max_length=256) + # Substrate-rotation telemetry, maintained by + # ``decnet.correlation.fingerprint_rotation.record_fingerprint`` whenever + # the prober observes a new hash for an (attacker, port, probe_type) + # triple it has seen before. Lets the dashboard render "rotated 3× + # last 24h" without joining to AttackerFingerprintState. + rotation_count: int = Field(default=0) + last_rotation_at: Optional[datetime] = Field(default=None, index=True) updated_at: datetime = Field( default_factory=lambda: datetime.now(timezone.utc), index=True ) +class AttackerFingerprintState(SQLModel, table=True): + """Per-(attacker, port, probe_type) latest-hash row. + + Sole purpose: give the prober memory across runs so it can detect when + an attacker's HASSH/JARM/TCP fingerprint flips for the same port — i.e. + they rotated their VPS, rebuilt their SSH server, swapped their TLS + cert. Diff detection lives in + ``decnet.correlation.fingerprint_rotation``; the prober calls into + that library inline at each emit site and this table is the only + persistence it needs. 
+ + Bounded by ``attackers × probe families × ports`` — small in practice; + a busy fleet sees O(thousands) of rows, not O(millions). + """ + __tablename__ = "attacker_fingerprint_state" + uuid: str = Field(primary_key=True) + attacker_uuid: str = Field(foreign_key="attackers.uuid", index=True) + port: int + probe_type: str = Field(max_length=16) # "jarm" | "hassh" | "tcpfp" + last_hash: str = Field(max_length=128) + last_seen: datetime = Field(index=True) + rotation_count: int = Field(default=0) + __table_args__ = ( + UniqueConstraint( + "attacker_uuid", "port", "probe_type", + name="uq_attacker_fingerprint_state_natural", + ), + ) + + class AttackerIdentity(SQLModel, table=True): """ Resolved actor identity — the dedup'd "same hands" row that one or diff --git a/development/DEBT.md b/development/DEBT.md index 55ef7d1e..bcc168a7 100644 --- a/development/DEBT.md +++ b/development/DEBT.md @@ -470,22 +470,14 @@ The threat-intel enrichment surface (DEBT-N/A: `feat(intel)` series) keys every **Status:** Open. No operational impact today (single-IP attackers are the dominant case), but worth closing before the federation layer lands so the wire-format and API both speak in identity terms, not IP terms. -### DEBT-032 — Prober can't detect fingerprint rotation without mutation -**Files:** `decnet/prober/worker.py` (~lines 235, 286, 334, 392), `decnet/web/db/models.py` (new `decky_service_fingerprints` table). +### ~~DEBT-032 — Attacker fingerprint rotation detection~~ ✅ RESOLVED +**Files:** `decnet/correlation/fingerprint_rotation.py` (new), `decnet/prober/worker.py`, `decnet/web/db/models/attackers.py`, `decnet/bus/topics.py`. -Substrate identity is `(service_name, implementation_fingerprint)`, not service name alone. A base-image rebuild that rotates OpenSSH 8.4 → 9.2 — or any recompose that changes JARM / HASSH / TCP fingerprint without changing the service list — is a substrate transition from the attacker's recon POV, and today the correlation graph sees none of it. 
+Resolved 2026-05-03. **Reframed during planning:** the original entry described this as a per-decky substrate-integrity problem, but the prober probes *attackers*, not deckies. The actual gap was attacker substrate tracking — same attacker IP rotating their VPS, rebuilding their SSH server, swapping their TLS cert — invisible at correlator-time because nothing diffed consecutive hashes for the same `(attacker_ip, port, probe_type)` triple. -The prober already computes JARM (`worker.py:286`), HASSH (`worker.py:334`), and TCP fingerprint (`worker.py:392`), and emits each as RFC 5424 syslog + optional bus publish. What's missing is **per-(decky, service, probe_type) persistence** to diff against: the current dedup set `probed: dict[IP → {probe_type → set(ports)}]` (`worker.py:235`) is in-memory and scoped to one run, so any restart loses history and any same-IP probe on a changed substrate can't be detected as a change. +Implemented as a small library (`decnet.correlation.fingerprint_rotation.record_fingerprint`) called inline from the prober at each of the three emit sites (JARM / HASSH / TCPFP). No new worker daemon; the prober is still the only producer — it simply derives a second event on hash flip. New `AttackerFingerprintState` table holds per-`(attacker_uuid, port, probe_type)` last-hash state. New bus topic `attacker.fingerprint_rotated` carries `{attacker_uuid, attacker_ip, port, probe_type, old_hash, new_hash, rotation_count, ts}`. `Attacker.rotation_count` and `Attacker.last_rotation_at` are stamped on every diff so the dashboard can render rotation telemetry without joining. Library is fully sync + unit-tested with injected publish_fn / syslog_fn callbacks. -**Design:** -1. New SQLModel table `decky_service_fingerprints` keyed by `(decky_name, service, probe_type)` with `last_hash, last_seen_at, sample_count`. One upsert per probe; bounded by fleet × probe families. -2. 
Prober reads `last_hash` before emitting; on diff, emits a new `substrate_fingerprint_changed` event (RFC 5424 syslog + `decky.{id}.fingerprint` bus topic) with `{decky, service, probe_type, old_hash, new_hash}`. On match, upsert the timestamp and skip the event. -3. Correlator consumes the new event kind into a parallel per-decky index (mirroring the mutation index landed in this session) and interleaves `🔍 decky-03 hassh drift` markers in `AttackerTraversal.fingerprints_during`. -4. Divergence detector: compare `substrate_state(t)` fold (mutations) vs `observed_identity(t)` fold (fingerprints) per decky. A fingerprint change without a preceding mutation ⇒ `substrate_divergence` finding — container drift, compromised base image, rootkit banner rewrite, or prober lag. Falls out of the data model for free once both streams exist. - -**Prerequisite satisfied:** mutation event stream + correlator mutation-kind parser landed alongside this DEBT entry (commits `f875350`, `fa0cdb3`, `bf5ed7a`, `d4d8a2a` on `dev`). The fingerprint stream plugs into the same substrate: same RFC 5424 emission pattern, sibling per-decky engine index, same timeline interleaving. - -**Status:** Open — deferred to its own commit sequence. The dedup state in `worker.py:235` is the only thing standing between "JARM hash computed" and "substrate rotation detected." +Out of scope (deferred): dashboard surfacing of `rotation_count`; attribution clustering across attackers (same JARM seen from different IPs); backfill from existing event store. --- @@ -713,7 +705,7 @@ user who needs it. 
| DEBT-029 | 🟡 Medium | Architecture / Bus | ✅ resolved | | DEBT-030 | 🟡 Medium | Web / Live mutations | ✅ resolved (Phase A) | | ~~DEBT-031~~ | ✅ | Workers / Bus integration | resolved | -| DEBT-032 | 🟡 Medium | Correlation / Prober | open | +| ~~DEBT-032~~ | ✅ | Correlation / Prober | resolved 2026-05-03 | | DEBT-033 | 🟡 Medium | Storage / Session recording | open | | ~~DEBT-035~~ | ✅ | Artifacts / Filesystem perms | resolved 2026-05-02 | | DEBT-036 | 🟡 Medium | Correlation / Keystroke dynamics | open | @@ -731,5 +723,5 @@ user who needs it. | DEBT-048 | 🟡 Medium | TTP / Intel provider mapping review (recurring) | open / recurring | | DEBT-049 | 🟡 Medium | TTP / Sigma adapter (post-v1) | open | -**Remaining open:** DEBT-011 (Alembic), DEBT-027 (Dynamic bait store), DEBT-028 (deploy endpoint tests), DEBT-032 (fingerprint rotation detection), DEBT-033 (transcript shard rotation), DEBT-036 (session-profile ingester), DEBT-037 (webhook delivery hardening), DEBT-038 (SSH PAM cred-capture limitations — document-only), DEBT-042 (orchestrator failure-count window), DEBT-043 (frontend test framework), DEBT-045 (EmailLifter heavyweight — partial paid; carved-out follow-ups remain), DEBT-046 (mal-hash feed), DEBT-048 (TTP intel provider mapping review — recurring quarterly), DEBT-049 (TTP Sigma adapter — post-v1). +**Remaining open:** DEBT-011 (Alembic), DEBT-027 (Dynamic bait store), DEBT-028 (deploy endpoint tests), DEBT-033 (transcript shard rotation), DEBT-036 (session-profile ingester), DEBT-037 (webhook delivery hardening), DEBT-038 (SSH PAM cred-capture limitations — document-only), DEBT-042 (orchestrator failure-count window), DEBT-043 (frontend test framework), DEBT-045 (EmailLifter heavyweight — partial paid; carved-out follow-ups remain), DEBT-046 (mal-hash feed), DEBT-048 (TTP intel provider mapping review — recurring quarterly), DEBT-049 (TTP Sigma adapter — post-v1). **Estimated remaining effort:** ~21 hours plus the new EmailLifter / TTP follow-ups. 
DEBT-030 Phase B (optimistic staged-buffer editor) is a follow-up, not debt. diff --git a/tests/correlation/test_fingerprint_rotation.py b/tests/correlation/test_fingerprint_rotation.py new file mode 100644 index 00000000..372e3e87 --- /dev/null +++ b/tests/correlation/test_fingerprint_rotation.py @@ -0,0 +1,236 @@ +"""Unit tests for ``decnet.correlation.fingerprint_rotation``. + +Pure library: in-memory SQLite + sync Session + collected callback +calls. No prober, no bus, no async. Each test seeds an Attacker row, +calls ``record_fingerprint``, asserts on the returned outcome + the +side-effects (state row, Attacker stamp, callback invocations). +""" +from __future__ import annotations + +from datetime import datetime, timedelta, timezone + +import pytest +from sqlalchemy.engine import Engine +from sqlmodel import Session, SQLModel, create_engine, select + +from decnet.correlation.fingerprint_rotation import ( + record_fingerprint, + RotationOutcome, +) +from decnet.web.db.models import ( + Attacker, + AttackerFingerprintState, +) + + +@pytest.fixture +def engine() -> Engine: + eng = create_engine("sqlite://", connect_args={"check_same_thread": False}) + SQLModel.metadata.create_all(eng) + return eng + + +@pytest.fixture +def now() -> datetime: + return datetime(2026, 5, 3, 12, 0, 0, tzinfo=timezone.utc) + + +def _seed_attacker(session: Session, ip: str = "1.2.3.4") -> Attacker: + a = Attacker( + uuid="attacker-uuid-1", + ip=ip, + first_seen=datetime.now(timezone.utc), + last_seen=datetime.now(timezone.utc), + ) + session.add(a) + session.commit() + session.refresh(a) + return a + + +class _Recorder: + """Capture (event_type, payload) tuples from publish_fn / syslog_fn.""" + def __init__(self) -> None: + self.calls: list[tuple[str, dict]] = [] + + def __call__(self, event_type: str, payload: dict) -> None: + self.calls.append((event_type, payload)) + + +def test_no_attacker_row_returns_noop(engine, now): + publish, syslog = _Recorder(), _Recorder() + with 
Session(engine) as session: + outcome = record_fingerprint( + session, + attacker_ip="9.9.9.9", + port=22, + probe_type="hassh", + new_hash="abc", + ts=now, + publish_fn=publish, + syslog_fn=syslog, + ) + assert outcome.kind == "no_attacker_row" + assert publish.calls == [] + assert syslog.calls == [] + with Session(engine) as session: + rows = session.exec(select(AttackerFingerprintState)).all() + assert rows == [] + + +def test_first_sighting_creates_state_row_no_event(engine, now): + publish, syslog = _Recorder(), _Recorder() + with Session(engine) as session: + _seed_attacker(session) + outcome = record_fingerprint( + session, + attacker_ip="1.2.3.4", + port=22, + probe_type="hassh", + new_hash="hash-1", + ts=now, + publish_fn=publish, + syslog_fn=syslog, + ) + assert outcome.kind == "first_sighting" + assert outcome.old_hash is None + assert outcome.new_hash == "hash-1" + assert outcome.rotation_count == 0 + assert publish.calls == [] + assert syslog.calls == [] + with Session(engine) as session: + rows = session.exec(select(AttackerFingerprintState)).all() + assert len(rows) == 1 + assert rows[0].last_hash == "hash-1" + assert rows[0].rotation_count == 0 + a = session.exec(select(Attacker)).one() + assert a.rotation_count == 0 + assert a.last_rotation_at is None + + +def test_unchanged_hash_bumps_last_seen_no_event(engine, now): + publish, syslog = _Recorder(), _Recorder() + later = now + timedelta(minutes=10) + with Session(engine) as session: + _seed_attacker(session) + record_fingerprint( + session, + attacker_ip="1.2.3.4", port=22, probe_type="hassh", + new_hash="hash-1", ts=now, + ) + outcome = record_fingerprint( + session, + attacker_ip="1.2.3.4", port=22, probe_type="hassh", + new_hash="hash-1", ts=later, + publish_fn=publish, syslog_fn=syslog, + ) + assert outcome.kind == "unchanged" + assert publish.calls == [] + assert syslog.calls == [] + with Session(engine) as session: + row = session.exec(select(AttackerFingerprintState)).one() + # SQLite 
strips tzinfo on round-trip; compare naive values. + assert row.last_seen.replace(tzinfo=timezone.utc) == later + assert row.rotation_count == 0 + + +def test_rotated_emits_event_and_stamps_attacker(engine, now): + publish, syslog = _Recorder(), _Recorder() + later = now + timedelta(hours=1) + with Session(engine) as session: + _seed_attacker(session) + record_fingerprint( + session, + attacker_ip="1.2.3.4", port=22, probe_type="hassh", + new_hash="hash-1", ts=now, + ) + outcome = record_fingerprint( + session, + attacker_ip="1.2.3.4", port=22, probe_type="hassh", + new_hash="hash-2", ts=later, + publish_fn=publish, syslog_fn=syslog, + ) + + assert outcome.kind == "rotated" + assert outcome.old_hash == "hash-1" + assert outcome.new_hash == "hash-2" + assert outcome.rotation_count == 1 + + assert len(publish.calls) == 1 + assert len(syslog.calls) == 1 + event_type, payload = publish.calls[0] + assert event_type == "attacker.fingerprint_rotated" + assert payload["attacker_uuid"] == "attacker-uuid-1" + assert payload["attacker_ip"] == "1.2.3.4" + assert payload["port"] == 22 + assert payload["probe_type"] == "hassh" + assert payload["old_hash"] == "hash-1" + assert payload["new_hash"] == "hash-2" + assert payload["rotation_count"] == 1 + assert payload["ts"] == later.isoformat() + + with Session(engine) as session: + a = session.exec(select(Attacker)).one() + assert a.rotation_count == 1 + assert a.last_rotation_at is not None + assert a.last_rotation_at.replace(tzinfo=timezone.utc) == later + row = session.exec(select(AttackerFingerprintState)).one() + assert row.last_hash == "hash-2" + assert row.rotation_count == 1 + + +def test_three_probe_types_independent(engine, now): + with Session(engine) as session: + _seed_attacker(session) + for ptype in ("jarm", "hassh", "tcpfp"): + record_fingerprint( + session, + attacker_ip="1.2.3.4", port=22, probe_type=ptype, + new_hash=f"{ptype}-1", ts=now, + ) + with Session(engine) as session: + rows = 
session.exec(select(AttackerFingerprintState)).all() + assert {r.probe_type for r in rows} == {"jarm", "hassh", "tcpfp"} + assert {r.last_hash for r in rows} == {"jarm-1", "hassh-1", "tcpfp-1"} + + +def test_two_ports_same_probe_type_independent(engine, now): + with Session(engine) as session: + _seed_attacker(session) + for port in (22, 2222): + record_fingerprint( + session, + attacker_ip="1.2.3.4", port=port, probe_type="hassh", + new_hash=f"hash-{port}", ts=now, + ) + with Session(engine) as session: + rows = session.exec(select(AttackerFingerprintState)).all() + assert {r.port for r in rows} == {22, 2222} + + +def test_multiple_rotations_increment_counter(engine, now): + publish = _Recorder() + with Session(engine) as session: + _seed_attacker(session) + record_fingerprint( + session, + attacker_ip="1.2.3.4", port=22, probe_type="hassh", + new_hash="h1", ts=now, publish_fn=publish, + ) + record_fingerprint( + session, + attacker_ip="1.2.3.4", port=22, probe_type="hassh", + new_hash="h2", ts=now + timedelta(minutes=5), publish_fn=publish, + ) + record_fingerprint( + session, + attacker_ip="1.2.3.4", port=22, probe_type="hassh", + new_hash="h3", ts=now + timedelta(minutes=10), publish_fn=publish, + ) + assert len(publish.calls) == 2 # first call was first_sighting (no event) + with Session(engine) as session: + a = session.exec(select(Attacker)).one() + assert a.rotation_count == 2 + row = session.exec(select(AttackerFingerprintState)).one() + assert row.rotation_count == 2 + assert row.last_hash == "h3" diff --git a/tests/prober/test_prober_rotation.py b/tests/prober/test_prober_rotation.py new file mode 100644 index 00000000..cfb67fcb --- /dev/null +++ b/tests/prober/test_prober_rotation.py @@ -0,0 +1,142 @@ +"""Integration test: prober phase functions invoke the rotation recorder. 
+ +The prober worker constructs the recorder closure at startup; here we +verify that ``_probe_cycle`` threads a recorder through to JARM / HASSH +/ TCPFP phases and that the recorder gets the (ip, port, probe_type, +hash) tuple it expects. The library itself is unit-tested separately. +""" +from __future__ import annotations + +from pathlib import Path +from unittest.mock import MagicMock, patch + +from decnet.prober.worker import _probe_cycle + + +@patch("decnet.prober.worker.fetch_leaf_cert", return_value=None) +@patch("decnet.prober.worker.tcp_fingerprint", return_value=None) +@patch("decnet.prober.worker.hassh_server", return_value=None) +@patch("decnet.prober.worker.jarm_hash") +def test_jarm_phase_calls_recorder( + mock_jarm: MagicMock, + _mock_hassh: MagicMock, + _mock_tcpfp: MagicMock, + _mock_cert: MagicMock, + tmp_path: Path, +): + mock_jarm.return_value = "c0c" * 10 + "a" * 32 + log_path = tmp_path / "decnet.log" + json_path = tmp_path / "decnet.json" + rec_calls: list[tuple] = [] + recorder = lambda ip, port, ptype, h: rec_calls.append((ip, port, ptype, h)) # noqa: E731 + + _probe_cycle( + {"10.0.0.5"}, {}, + [443], [], [], + log_path, json_path, + timeout=1.0, + publish_fn=None, + record_rotation=recorder, + ) + + assert rec_calls == [("10.0.0.5", 443, "jarm", "c0c" * 10 + "a" * 32)] + + +@patch("decnet.prober.worker.fetch_leaf_cert", return_value=None) +@patch("decnet.prober.worker.tcp_fingerprint", return_value=None) +@patch("decnet.prober.worker.hassh_server") +@patch("decnet.prober.worker.jarm_hash", return_value="") +def test_hassh_phase_calls_recorder( + _mock_jarm: MagicMock, + mock_hassh: MagicMock, + _mock_tcpfp: MagicMock, + _mock_cert: MagicMock, + tmp_path: Path, +): + mock_hassh.return_value = { + "hassh_server": "deadbeef", + "banner": "SSH-2.0-OpenSSH_9.2", + "kex_algorithms": "x", + "encryption_s2c": "x", + "mac_s2c": "x", + "compression_s2c": "x", + } + log_path = tmp_path / "decnet.log" + json_path = tmp_path / "decnet.json" + 
rec_calls: list[tuple] = [] + recorder = lambda ip, port, ptype, h: rec_calls.append((ip, port, ptype, h)) # noqa: E731 + + _probe_cycle( + {"10.0.0.5"}, {}, + [], [22], [], + log_path, json_path, + timeout=1.0, + publish_fn=None, + record_rotation=recorder, + ) + + assert rec_calls == [("10.0.0.5", 22, "hassh", "deadbeef")] + + +@patch("decnet.prober.worker.fetch_leaf_cert", return_value=None) +@patch("decnet.prober.worker.tcp_fingerprint") +@patch("decnet.prober.worker.hassh_server", return_value=None) +@patch("decnet.prober.worker.jarm_hash", return_value="") +def test_tcpfp_phase_calls_recorder( + _mock_jarm, _mock_hassh, mock_tcpfp, _mock_cert, tmp_path: Path, +): + mock_tcpfp.return_value = { + "tcpfp_hash": "tcpfp-hash-1", + "tcpfp_raw": "raw", + "ttl": 64, + "window_size": 65535, + "df_bit": True, + "mss": 1460, + "window_scale": 7, + "sack_ok": True, + "timestamp": True, + "options_order": "MSS,SACK,TS,NOP,WS", + "tos": 0, + "dscp": 0, + "ecn": 0, + "server_isn": 0, + } + log_path = tmp_path / "decnet.log" + json_path = tmp_path / "decnet.json" + rec_calls: list[tuple] = [] + recorder = lambda ip, port, ptype, h: rec_calls.append((ip, port, ptype, h)) # noqa: E731 + + _probe_cycle( + {"10.0.0.5"}, {}, + [], [], [22], + log_path, json_path, + timeout=1.0, + publish_fn=None, + record_rotation=recorder, + ) + + assert rec_calls == [("10.0.0.5", 22, "tcpfp", "tcpfp-hash-1")] + + +@patch("decnet.prober.worker.fetch_leaf_cert", return_value=None) +@patch("decnet.prober.worker.tcp_fingerprint", return_value=None) +@patch("decnet.prober.worker.hassh_server", return_value=None) +@patch("decnet.prober.worker.jarm_hash") +def test_recorder_optional_no_crash_when_none( + mock_jarm: MagicMock, + _mock_hassh: MagicMock, + _mock_tcpfp: MagicMock, + _mock_cert: MagicMock, + tmp_path: Path, +): + """record_rotation=None must keep the prober's pre-DEBT-032 behavior.""" + mock_jarm.return_value = "c0c" * 10 + "a" * 32 + _probe_cycle( + {"10.0.0.5"}, {}, + [443], [], [], + 
tmp_path / "decnet.log", tmp_path / "decnet.json", + timeout=1.0, + publish_fn=None, + record_rotation=None, + ) + # No error, probe completes.