From d43303251d67633bd23b1ffd4d07ed7a9c3172c1 Mon Sep 17 00:00:00 2001 From: anti Date: Wed, 22 Apr 2026 22:23:27 -0400 Subject: [PATCH] feat(profiler): track SMTP victim domains per attacker MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit New SmtpTarget table records each (attacker, domain) pair observed via the SMTP honeypots. Only the domain is stored — local-parts are dropped at ingestion, so this table holds no user-identifying data beyond the target organisation's identity. The profiler worker extracts domains from rcpt_to / rcpt_denied / message_accepted events, normalizes them (lowercase, strip local-part, drop blocked TLDs), and upserts one row per pair with a running count + first_seen / last_seen. Three repo methods shipped: * increment_smtp_target(attacker, domain) — upsert + bump * list_smtp_targets(attacker) — per-attacker view * smtp_target_seen(domain) — cross-attacker aggregate, shaped as the federation-gossip RPC that V2 will expose. The gossip-query shape is load-bearing: each operator can answer "have any of your attackers targeted corp1.com?" without leaking which attackers or when — the aggregate returns a bool + total count + first/last seen, nothing else. --- decnet/profiler/worker.py | 74 ++++++++++++++ decnet/web/db/models/__init__.py | 2 + decnet/web/db/models/attackers.py | 32 +++++- decnet/web/db/repository.py | 29 ++++++ decnet/web/db/sqlmodel_repo.py | 58 +++++++++++ tests/test_base_repo.py | 6 ++ tests/test_smtp_targets.py | 160 ++++++++++++++++++++++++++++++ 7 files changed, 360 insertions(+), 1 deletion(-) create mode 100644 tests/test_smtp_targets.py diff --git a/decnet/profiler/worker.py b/decnet/profiler/worker.py index d33f4767..99de0a0b 100644 --- a/decnet/profiler/worker.py +++ b/decnet/profiler/worker.py @@ -15,6 +15,7 @@ from __future__ import annotations import asyncio import contextlib import json +import re from dataclasses import dataclass, field from datetime import datetime, timezone from typing import Any, Callable @@ -47,6 +48,18 @@ _COMMAND_EVENT_TYPES = frozenset({ # Fields that carry the executed command/query text _COMMAND_FIELDS = ("command", "query", "input", "line", "sql", "cmd") +# SMTP events that carry a recipient email address. `rcpt_to` fires once per +# accepted RCPT (open-relay mode), `rcpt_denied` once per denied RCPT +# (harvester mode). `message_accepted` carries the comma-joined rcpt list +# on the final DATA commit — covered for replay safety, though every +# address it contains already arrived via `rcpt_to` earlier in the session. +_SMTP_RCPT_EVENTS = frozenset({"rcpt_to", "rcpt_denied", "message_accepted"}) + +# Pseudo-TLDs we never want to report on: the RFC 6761 special-use names +# plus common lab-only values. Matching happens on the *last* label so +# `foo.example.com` is filtered but `example.corp` is not. +_BLOCKED_TLDS = frozenset({"invalid", "test", "localhost", "local", "example"}) + @dataclass class _WorkerState: @@ -211,6 +224,17 @@ async def _update_profiles( _span.record_exception(exc) logger.error("attacker worker: behavior upsert failed for %s: %s", ip, exc) + # SMTP victim-domain tracking — extract domains from RCPT events + # and upsert one row per (attacker, domain) pair. Same + # soft-fail posture as the behavior rollup: errors here must + # not block the next attacker. + try: + for domain in _extract_smtp_domains(events): + await repo.increment_smtp_target(attacker_uuid, domain) + except Exception as exc: + _span.record_exception(exc) + logger.error("attacker worker: smtp target upsert failed for %s: %s", ip, exc) + def _build_record( ip: str, @@ -285,3 +309,53 @@ def _extract_commands_from_events(events: list[LogEvent]) -> list[dict[str, Any] }) return commands + + +_SMTP_ADDR_RE = re.compile(r"@]+)@([A-Za-z0-9.-]+\.[A-Za-z]{2,})>?") + + +def _normalize_smtp_domain(raw: str) -> str | None: + """Extract a lowercased domain from an envelope-address fragment. + + Returns None when the input doesn't look like an email address or the + resulting TLD is on the blocklist. Local-parts (the bit before `@`) + are intentionally dropped — this table stores no user-identifying + data, only the targeted organisation's domain. + """ + if not raw: + return None + match = _SMTP_ADDR_RE.search(raw.strip()) + if not match: + return None + domain = match.group(2).lower().strip(".") + if not domain: + return None + tld = domain.rsplit(".", 1)[-1] + if tld in _BLOCKED_TLDS: + return None + return domain + + +def _extract_smtp_domains(events: list[LogEvent]) -> set[str]: + """Collect the set of victim domains an attacker targeted via SMTP. + + Deduped at the attacker level — repeated hits on the same domain + within a single batch collapse to one upsert, and the per-row count + is bumped by ``increment_smtp_target`` on each call. The set return + type is intentional: we care about *which* domains were seen, not + the per-batch frequency (which the DB aggregates over time). + """ + domains: set[str] = set() + for event in events: + if event.service != "smtp" or event.event_type not in _SMTP_RCPT_EVENTS: + continue + if event.event_type == "message_accepted": + raw_list = event.fields.get("rcpt_to", "") + candidates = raw_list.split(",") if raw_list else [] + else: + candidates = [event.fields.get("value", "")] + for candidate in candidates: + domain = _normalize_smtp_domain(candidate) + if domain: + domains.add(domain) + return domains diff --git a/decnet/web/db/models/__init__.py b/decnet/web/db/models/__init__.py index ae796f90..69295d07 100644 --- a/decnet/web/db/models/__init__.py +++ b/decnet/web/db/models/__init__.py @@ -30,6 +30,7 @@ from .attackers import ( AttackerBehavior, AttackersResponse, SessionProfile, + SmtpTarget, ) from .deploy import ( DeployIniRequest, @@ -137,6 +138,7 @@ __all__ = [ "AttackerBehavior", "AttackersResponse", "SessionProfile", + "SmtpTarget", # deploy "DeployIniRequest", "MutateIntervalRequest", diff --git a/decnet/web/db/models/attackers.py b/decnet/web/db/models/attackers.py index 1a925db9..c612b077 100644 --- a/decnet/web/db/models/attackers.py +++ b/decnet/web/db/models/attackers.py @@ -3,7 +3,7 @@ from datetime import datetime, timezone from typing import Any, List, Optional from pydantic import BaseModel -from sqlalchemy import Column, Text +from sqlalchemy import Column, Text, UniqueConstraint from sqlmodel import Field, SQLModel from ._base import _BIG_TEXT @@ -143,6 +143,36 @@ class SessionProfile(SQLModel, table=True): ) +class SmtpTarget(SQLModel, table=True): + """ + Per-attacker list of victim domains observed via the SMTP honeypots. + + Each row is one (attacker_uuid, domain) pair — an attacker who relays + mail to 500 addresses at acme.com collapses into a single row with + count=500. Only the *domain* is stored; local-parts (the bit before + `@`) are dropped at ingestion, so this table contains no PII beyond + the target organisation's identity. + + Shape is designed for future V2 federation gossip: the + `smtp_target_seen(domain)` query returns aggregate counts with zero + cross-org attacker leakage — each operator can answer "have you seen + this domain being targeted?" without exposing *which* attackers did. + """ + __tablename__ = "smtp_targets" + id: Optional[int] = Field(default=None, primary_key=True) + attacker_uuid: str = Field(foreign_key="attackers.uuid", index=True) + domain: str = Field(index=True) + first_seen: datetime = Field(default_factory=lambda: datetime.now(timezone.utc)) + last_seen: datetime = Field( + default_factory=lambda: datetime.now(timezone.utc), index=True + ) + # Aggregate counter — one rcpt_to / message_accepted recipient bumps this. + count: int = Field(default=1) + __table_args__ = ( + UniqueConstraint("attacker_uuid", "domain", name="uq_smtp_targets_attacker_domain"), + ) + + class AttackersResponse(BaseModel): total: int limit: int diff --git a/decnet/web/db/repository.py b/decnet/web/db/repository.py index a93aaf49..f781d54d 100644 --- a/decnet/web/db/repository.py +++ b/decnet/web/db/repository.py @@ -170,6 +170,35 @@ class BaseRepository(ABC): """Retrieve the keystroke-dynamics profile row for a session.""" pass + @abstractmethod + async def increment_smtp_target(self, attacker_uuid: str, domain: str) -> None: + """ + Record that ``attacker_uuid`` targeted ``domain`` via SMTP. + + Upserts the (attacker_uuid, domain) row: inserts with count=1 + + first_seen=now on first sight, bumps count + last_seen on every + subsequent hit. Callers must pre-normalize ``domain`` (lowercase, + local-part stripped). + """ + pass + + @abstractmethod + async def list_smtp_targets(self, attacker_uuid: str) -> list[dict[str, Any]]: + """Return SmtpTarget rows for an attacker, ordered by most-recent first.""" + pass + + @abstractmethod + async def smtp_target_seen(self, domain: str) -> dict[str, Any]: + """ + Cross-attacker aggregate for a victim domain. + + Returns ``{seen: bool, count: int, first_seen: datetime|None, + last_seen: datetime|None}``. Shaped as the federation-gossip RPC + that V2 will expose — each operator can answer "have any of your + attackers targeted this domain?" without leaking attacker identity. + """ + pass + @abstractmethod async def get_attacker_by_uuid(self, uuid: str) -> Optional[dict[str, Any]]: """Retrieve a single attacker profile by UUID.""" diff --git a/decnet/web/db/sqlmodel_repo.py b/decnet/web/db/sqlmodel_repo.py index da2d121c..e709ec52 100644 --- a/decnet/web/db/sqlmodel_repo.py +++ b/decnet/web/db/sqlmodel_repo.py @@ -35,6 +35,7 @@ from decnet.web.db.models import ( Attacker, AttackerBehavior, SessionProfile, + SmtpTarget, SwarmHost, DeckyShard, Topology, @@ -734,6 +735,63 @@ class SQLModelRepository(BaseRepository): return None return row.model_dump(mode="json") + async def increment_smtp_target(self, attacker_uuid: str, domain: str) -> None: + """Upsert an (attacker_uuid, domain) pair and bump count + last_seen. + + Read-then-write under a single session — the UNIQUE constraint on + (attacker_uuid, domain) guards against duplicate rows if the race + ever materialises; we accept the ~1ms extra round-trip in exchange + for a single dialect-portable implementation. + """ + async with self._session() as session: + result = await session.execute( + select(SmtpTarget) + .where(SmtpTarget.attacker_uuid == attacker_uuid) + .where(SmtpTarget.domain == domain) + ) + existing = result.scalar_one_or_none() + now = datetime.now(timezone.utc) + if existing: + existing.count += 1 + existing.last_seen = now + session.add(existing) + else: + session.add(SmtpTarget( + attacker_uuid=attacker_uuid, + domain=domain, + first_seen=now, + last_seen=now, + count=1, + )) + await session.commit() + + async def list_smtp_targets(self, attacker_uuid: str) -> list[dict[str, Any]]: + async with self._session() as session: + result = await session.execute( + select(SmtpTarget) + .where(SmtpTarget.attacker_uuid == attacker_uuid) + .order_by(desc(SmtpTarget.last_seen)) + ) + return [r.model_dump(mode="json") for r in result.scalars().all()] + + async def smtp_target_seen(self, domain: str) -> dict[str, Any]: + """Aggregate rows for this domain across every attacker in the DB.""" + async with self._session() as session: + result = await session.execute( + select( + func.coalesce(func.sum(SmtpTarget.count), 0), + func.min(SmtpTarget.first_seen), + func.max(SmtpTarget.last_seen), + ).where(SmtpTarget.domain == domain) + ) + total, first_seen, last_seen = result.one() + return { + "seen": int(total) > 0, + "count": int(total), + "first_seen": first_seen, + "last_seen": last_seen, + } + @staticmethod def _deserialize_attacker(d: dict[str, Any]) -> dict[str, Any]: for key in ("services", "deckies", "fingerprints", "commands"): diff --git a/tests/test_base_repo.py b/tests/test_base_repo.py index fd9be219..900b910f 100644 --- a/tests/test_base_repo.py +++ b/tests/test_base_repo.py @@ -31,6 +31,9 @@ class DummyRepo(BaseRepository): async def get_behaviors_for_ips(self, ips): await super().get_behaviors_for_ips(ips) async def upsert_session_profile(self, sid, data): await super().upsert_session_profile(sid, data) async def get_session_profile(self, sid): await super().get_session_profile(sid) + async def increment_smtp_target(self, u, d): await super().increment_smtp_target(u, d) + async def list_smtp_targets(self, u): await super().list_smtp_targets(u) + async def smtp_target_seen(self, d): await super().smtp_target_seen(d) async def get_attacker_by_uuid(self, u): await super().get_attacker_by_uuid(u) async def get_attackers(self, **kw): await super().get_attackers(**kw) async def get_total_attackers(self, **kw): await super().get_total_attackers(**kw) @@ -72,6 +75,9 @@ async def test_base_repo_coverage(): await dr.get_behaviors_for_ips({"1.1.1.1"}) await dr.upsert_session_profile("sid", {}) await dr.get_session_profile("sid") + await dr.increment_smtp_target("uuid", "corp.com") + await dr.list_smtp_targets("uuid") + await dr.smtp_target_seen("corp.com") await dr.get_attacker_by_uuid("a") await dr.get_attackers() await dr.get_total_attackers() diff --git a/tests/test_smtp_targets.py b/tests/test_smtp_targets.py new file mode 100644 index 00000000..45dd3089 --- /dev/null +++ b/tests/test_smtp_targets.py @@ -0,0 +1,160 @@ +""" +Tests for SMTP victim-domain tracking (SmtpTarget table + profiler ingestion). + +Two surfaces under test: + * Repo upsert / list / aggregate-seen helpers. + * The profiler's `_extract_smtp_domains` + `_normalize_smtp_domain` + parsers — pure functions exercised directly without running the + full worker loop. +""" +from datetime import datetime, timezone + +import pytest + +from decnet.web.db.factory import get_repository +from decnet.correlation.parser import LogEvent +from decnet.profiler.worker import _extract_smtp_domains, _normalize_smtp_domain + + +@pytest.fixture +async def repo(tmp_path): + r = get_repository(db_path=str(tmp_path / "smtp_targets.db")) + await r.initialize() + return r + + +def _smtp_event(event_type: str, **fields) -> LogEvent: + return LogEvent( + timestamp=datetime.now(timezone.utc), + decky="decky-01", + service="smtp", + event_type=event_type, + attacker_ip="1.2.3.4", + fields=fields, + raw="", + ) + + +# ── Domain normalization ───────────────────────────────────────────────────── + +@pytest.mark.parametrize("raw, expected", [ + ("", "corp1.com"), + ("JOHN@CORP1.COM", "corp1.com"), + ("", "mail.corp.io"), + # Empty / malformed → None + ("", None), + ("notanemail", None), + ("@nouser.com", None), + ("user@", None), + # Blocked TLDs + ("admin@foo.invalid", None), + ("test@bar.test", None), + ("x@local.example", None), + # Punctuation / angle-bracket forms the RCPT parser already validated + ("RCPT TO:", "d.com"), +]) +def test_normalize_smtp_domain(raw, expected): + assert _normalize_smtp_domain(raw) == expected + + +# ── Event → domain extraction ──────────────────────────────────────────────── + +def test_extract_from_rcpt_to(): + events = [ + _smtp_event("rcpt_to", value=""), + _smtp_event("rcpt_to", value=""), + ] + assert _extract_smtp_domains(events) == {"target.com", "other.com"} + + +def test_extract_from_rcpt_denied(): + events = [_smtp_event("rcpt_denied", value="")] + assert _extract_smtp_domains(events) == {"corp.net"} + + +def test_extract_from_message_accepted_splits_recipients(): + """`message_accepted.rcpt_to` is a comma-joined list, not a single addr.""" + events = [_smtp_event( + "message_accepted", + rcpt_to=",,", + mail_from="", + )] + assert _extract_smtp_domains(events) == {"one.com", "two.com"} + + +def test_extract_ignores_non_smtp_events(): + """Identical `value` fields on non-smtp services must not leak in.""" + events = [ + LogEvent( + timestamp=datetime.now(timezone.utc), + decky="decky-01", service="ssh", event_type="rcpt_to", + attacker_ip="1.2.3.4", + fields={"value": ""}, raw="", + ), + ] + assert _extract_smtp_domains(events) == set() + + +def test_extract_dedupes_within_batch(): + events = [ + _smtp_event("rcpt_to", value=""), + _smtp_event("rcpt_to", value=""), + _smtp_event("rcpt_to", value=""), + ] + assert _extract_smtp_domains(events) == {"corp.com"} + + +# ── Repo: increment + list + seen ──────────────────────────────────────────── + +@pytest.mark.anyio +async def test_increment_creates_then_bumps(repo): + await repo.increment_smtp_target("uuid-1", "corp.com") + rows = await repo.list_smtp_targets("uuid-1") + assert len(rows) == 1 + assert rows[0]["domain"] == "corp.com" + assert rows[0]["count"] == 1 + first_seen_1 = rows[0]["first_seen"] + + # Second hit bumps count + last_seen, preserves first_seen. + await repo.increment_smtp_target("uuid-1", "corp.com") + rows = await repo.list_smtp_targets("uuid-1") + assert rows[0]["count"] == 2 + assert rows[0]["first_seen"] == first_seen_1 + + +@pytest.mark.anyio +async def test_increment_isolates_per_attacker(repo): + await repo.increment_smtp_target("uuid-a", "corp.com") + await repo.increment_smtp_target("uuid-b", "corp.com") + assert len(await repo.list_smtp_targets("uuid-a")) == 1 + assert len(await repo.list_smtp_targets("uuid-b")) == 1 + + +@pytest.mark.anyio +async def test_list_orders_by_last_seen_desc(repo): + await repo.increment_smtp_target("uuid-1", "older.com") + await repo.increment_smtp_target("uuid-1", "newer.com") + rows = await repo.list_smtp_targets("uuid-1") + # Second call (newer.com) has a later last_seen → first row. + assert [r["domain"] for r in rows] == ["newer.com", "older.com"] + + +@pytest.mark.anyio +async def test_smtp_target_seen_aggregates_across_attackers(repo): + await repo.increment_smtp_target("uuid-a", "corp.com") + await repo.increment_smtp_target("uuid-a", "corp.com") + await repo.increment_smtp_target("uuid-b", "corp.com") + agg = await repo.smtp_target_seen("corp.com") + assert agg["seen"] is True + assert agg["count"] == 3 # 2 + 1 + assert agg["first_seen"] is not None + assert agg["last_seen"] is not None + + +@pytest.mark.anyio +async def test_smtp_target_seen_unknown_domain(repo): + agg = await repo.smtp_target_seen("never-targeted.org") + assert agg["seen"] is False + assert agg["count"] == 0 + assert agg["first_seen"] is None + assert agg["last_seen"] is None