feat(profiler): track SMTP victim domains per attacker

New SmtpTarget table records each (attacker, domain) pair observed via
the SMTP honeypots. Only the domain is stored — local-parts are dropped
at ingestion, so this table holds no user-identifying data beyond the
target organisation's identity.

The profiler worker extracts domains from rcpt_to / rcpt_denied /
message_accepted events, normalizes them (lowercase, strip local-part,
drop blocked TLDs), and upserts one row per pair with a running count +
first_seen / last_seen.

Three repo methods shipped:
  * increment_smtp_target(attacker, domain) — upsert + bump
  * list_smtp_targets(attacker) — per-attacker view
  * smtp_target_seen(domain) — cross-attacker aggregate, shaped as the
    federation-gossip RPC that V2 will expose.

The gossip-query shape is load-bearing: each operator can answer
"have any of your attackers targeted corp1.com?" without leaking
which attackers did so or any per-attacker timeline — the aggregate
returns only a bool, a total count, and the overall first/last-seen
timestamps.
This commit is contained in:
2026-04-22 22:23:27 -04:00
parent c50448995b
commit d43303251d
7 changed files with 360 additions and 1 deletions

View File

@@ -15,6 +15,7 @@ from __future__ import annotations
import asyncio
import contextlib
import json
import re
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Callable
@@ -47,6 +48,18 @@ _COMMAND_EVENT_TYPES = frozenset({
# Fields that carry the executed command/query text
_COMMAND_FIELDS = ("command", "query", "input", "line", "sql", "cmd")
# SMTP events that carry a recipient email address. `rcpt_to` fires once per
# accepted RCPT (open-relay mode), `rcpt_denied` once per denied RCPT
# (harvester mode). `message_accepted` carries the comma-joined rcpt list
# on the final DATA commit — covered for replay safety, though every
# address it contains already arrived via `rcpt_to` earlier in the session.
_SMTP_RCPT_EVENTS = frozenset({"rcpt_to", "rcpt_denied", "message_accepted"})
# Pseudo-TLDs we never want to report on: the RFC 6761 special-use names
# plus common lab-only values. Matching happens on the *last* label only, so
# `foo.example` is filtered but `example.com` and `example.corp` are not.
_BLOCKED_TLDS = frozenset({"invalid", "test", "localhost", "local", "example"})
@dataclass
class _WorkerState:
@@ -211,6 +224,17 @@ async def _update_profiles(
_span.record_exception(exc)
logger.error("attacker worker: behavior upsert failed for %s: %s", ip, exc)
# SMTP victim-domain tracking — extract domains from RCPT events
# and upsert one row per (attacker, domain) pair. Same
# soft-fail posture as the behavior rollup: errors here must
# not block the next attacker.
try:
for domain in _extract_smtp_domains(events):
await repo.increment_smtp_target(attacker_uuid, domain)
except Exception as exc:
_span.record_exception(exc)
logger.error("attacker worker: smtp target upsert failed for %s: %s", ip, exc)
def _build_record(
ip: str,
@@ -285,3 +309,53 @@ def _extract_commands_from_events(events: list[LogEvent]) -> list[dict[str, Any]
})
return commands
# Envelope-address matcher: group(1) is the local-part, group(2) the domain.
# The final label accepts either a plain alphabetic TLD or a punycode
# ("xn--...") IDN TLD, so `user@xn--e1afmkfd.xn--p1ai` is captured whole
# instead of being cut short at a bogus `xn` TLD.
_SMTP_ADDR_RE = re.compile(
    r"<?([^\s<>@]+)@([A-Za-z0-9.-]+\.(?:[Xx][Nn]--[A-Za-z0-9-]+|[A-Za-z]{2,}))>?"
)


def _normalize_smtp_domain(raw: str) -> str | None:
    """Extract a lowercased domain from an envelope-address fragment.

    Local-parts (the bit before `@`) are intentionally dropped — only the
    targeted organisation's domain is ever returned.

    Args:
        raw: Text containing an address, e.g. ``"<john@corp1.com>"`` or
            ``"RCPT TO:<c@d.com>"``. The first address-like substring wins.

    Returns:
        The domain, lowercased and with leading/trailing dots removed, or
        ``None`` when ``raw`` is empty, is not a string, contains nothing
        that looks like an email address, or its last label is listed in
        ``_BLOCKED_TLDS``.
    """
    # Honeypot fields are attacker-influenced; a non-string value is
    # treated as "no address" rather than raising on `.strip()`.
    if not raw or not isinstance(raw, str):
        return None
    match = _SMTP_ADDR_RE.search(raw.strip())
    if match is None:
        return None
    domain = match.group(2).lower().strip(".")
    if not domain:
        return None
    # Blocklist matching is on the last label only.
    tld = domain.rsplit(".", 1)[-1]
    if tld in _BLOCKED_TLDS:
        return None
    return domain
def _extract_smtp_domains(events: list[LogEvent]) -> set[str]:
    """Return the distinct victim domains found in a batch of SMTP events.

    Only events whose service is ``smtp`` and whose type is in
    ``_SMTP_RCPT_EVENTS`` are inspected. ``message_accepted`` events carry a
    comma-joined recipient list under ``rcpt_to``; the other event types
    carry a single address under ``value``.

    The result is a set on purpose: several hits on one domain inside the
    same batch yield a single entry, so the caller performs one upsert per
    domain per batch.
    """
    found: set[str] = set()
    for evt in events:
        if evt.service != "smtp":
            continue
        if evt.event_type not in _SMTP_RCPT_EVENTS:
            continue
        if evt.event_type == "message_accepted":
            joined = evt.fields.get("rcpt_to", "")
            raw_addrs = joined.split(",") if joined else []
        else:
            raw_addrs = [evt.fields.get("value", "")]
        normalized = (_normalize_smtp_domain(addr) for addr in raw_addrs)
        found.update(name for name in normalized if name)
    return found

View File

@@ -30,6 +30,7 @@ from .attackers import (
AttackerBehavior,
AttackersResponse,
SessionProfile,
SmtpTarget,
)
from .deploy import (
DeployIniRequest,
@@ -137,6 +138,7 @@ __all__ = [
"AttackerBehavior",
"AttackersResponse",
"SessionProfile",
"SmtpTarget",
# deploy
"DeployIniRequest",
"MutateIntervalRequest",

View File

@@ -3,7 +3,7 @@ from datetime import datetime, timezone
from typing import Any, List, Optional
from pydantic import BaseModel
from sqlalchemy import Column, Text
from sqlalchemy import Column, Text, UniqueConstraint
from sqlmodel import Field, SQLModel
from ._base import _BIG_TEXT
@@ -143,6 +143,36 @@ class SessionProfile(SQLModel, table=True):
)
class SmtpTarget(SQLModel, table=True):
    """
    Per-attacker list of victim domains observed via the SMTP honeypots.

    Each row is one (attacker_uuid, domain) pair — an attacker who relays
    mail to 500 addresses at acme.com collapses into a single row. Only the
    *domain* is stored; local-parts (the bit before `@`) are dropped at
    ingestion, so this table contains no PII beyond the target
    organisation's identity.

    `count` is bumped once per `increment_smtp_target` call. The profiler
    worker dedupes domains within a batch before calling it, so `count`
    tracks how many batches mentioned the domain, not how many individual
    recipients were addressed.

    Shape is designed for future V2 federation gossip: the
    `smtp_target_seen(domain)` query returns aggregate counts with zero
    cross-org attacker leakage — each operator can answer "have you seen
    this domain being targeted?" without exposing *which* attackers did.
    """
    __tablename__ = "smtp_targets"
    id: Optional[int] = Field(default=None, primary_key=True)
    attacker_uuid: str = Field(foreign_key="attackers.uuid", index=True)
    # Callers are expected to pass an already-normalized (lowercased) domain.
    domain: str = Field(index=True)
    first_seen: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    last_seen: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc), index=True
    )
    # Aggregate counter — starts at 1 on insert, +1 per subsequent upsert.
    count: int = Field(default=1)
    # One row per (attacker, domain); a duplicate insert is rejected by the DB.
    __table_args__ = (
        UniqueConstraint("attacker_uuid", "domain", name="uq_smtp_targets_attacker_domain"),
    )
class AttackersResponse(BaseModel):
total: int
limit: int

View File

@@ -170,6 +170,35 @@ class BaseRepository(ABC):
"""Retrieve the keystroke-dynamics profile row for a session."""
pass
@abstractmethod
async def increment_smtp_target(self, attacker_uuid: str, domain: str) -> None:
    """
    Record that ``attacker_uuid`` targeted ``domain`` via SMTP.

    Upserts the (attacker_uuid, domain) row: inserts with count=1 +
    first_seen=now on first sight, bumps count + last_seen on every
    subsequent hit. Callers must pre-normalize ``domain`` (lowercase,
    local-part stripped).

    Args:
        attacker_uuid: UUID of the attacker the hit is attributed to.
        domain: Victim domain, already normalized by the caller.
    """
    pass
@abstractmethod
async def list_smtp_targets(self, attacker_uuid: str) -> list[dict[str, Any]]:
    """
    Return SmtpTarget rows for an attacker, ordered by most-recent first.

    Args:
        attacker_uuid: UUID of the attacker whose targets are listed.

    Returns:
        One dict per (attacker, domain) row; an empty list when the
        attacker has no recorded SMTP targets.
    """
    pass
@abstractmethod
async def smtp_target_seen(self, domain: str) -> dict[str, Any]:
    """
    Cross-attacker aggregate for a victim domain.

    Returns ``{seen: bool, count: int, first_seen: datetime|None,
    last_seen: datetime|None}``. Shaped as the federation-gossip RPC
    that V2 will expose — each operator can answer "have any of your
    attackers targeted this domain?" without leaking attacker identity.

    Args:
        domain: Victim domain to look up, in the same normalized form
            used when rows were recorded.
    """
    pass
@abstractmethod
async def get_attacker_by_uuid(self, uuid: str) -> Optional[dict[str, Any]]:
"""Retrieve a single attacker profile by UUID."""

View File

@@ -35,6 +35,7 @@ from decnet.web.db.models import (
Attacker,
AttackerBehavior,
SessionProfile,
SmtpTarget,
SwarmHost,
DeckyShard,
Topology,
@@ -734,6 +735,63 @@ class SQLModelRepository(BaseRepository):
return None
return row.model_dump(mode="json")
async def increment_smtp_target(self, attacker_uuid: str, domain: str) -> None:
    """Upsert an (attacker_uuid, domain) pair and bump count + last_seen.

    The bump is a single ``UPDATE ... SET count = count + 1`` so concurrent
    writers cannot overwrite each other's increments. When no row exists
    yet, one is inserted with count=1 and first_seen = last_seen = now. If
    another writer inserts the same pair first, the UNIQUE constraint on
    (attacker_uuid, domain) raises IntegrityError; the transaction is then
    rolled back and the bump is retried against the row that now exists,
    so the hit is not dropped.

    Args:
        attacker_uuid: UUID of the attacker the hit is attributed to.
        domain: Victim domain, already normalized by the caller.

    Raises:
        sqlalchemy.exc.IntegrityError: when the insert fails for a reason
            other than a duplicate pair (the retry still finds no row).
    """
    # Function-scope imports: the module's import block is not fully
    # visible here, so do not assume these names exist at file level.
    from sqlalchemy import update
    from sqlalchemy.exc import IntegrityError

    now = datetime.now(timezone.utc)
    bump = (
        update(SmtpTarget)
        .where(SmtpTarget.attacker_uuid == attacker_uuid)
        .where(SmtpTarget.domain == domain)
        .values(count=SmtpTarget.count + 1, last_seen=now)
        # No ORM objects are loaded in this session, nothing to synchronize.
        .execution_options(synchronize_session=False)
    )
    async with self._session() as session:
        bumped = await session.execute(bump)
        if bumped.rowcount:
            await session.commit()
            return
        # First sighting of this pair.
        session.add(SmtpTarget(
            attacker_uuid=attacker_uuid,
            domain=domain,
            first_seen=now,
            last_seen=now,
            count=1,
        ))
        try:
            await session.commit()
        except IntegrityError:
            # Lost the insert race (or hit another constraint): retry the
            # bump, and re-raise if there is still no row to update.
            await session.rollback()
            retried = await session.execute(bump)
            if not retried.rowcount:
                raise
            await session.commit()
async def list_smtp_targets(self, attacker_uuid: str) -> list[dict[str, Any]]:
    """Return this attacker's SmtpTarget rows as JSON-mode dicts, newest ``last_seen`` first."""
    query = (
        select(SmtpTarget)
        .where(SmtpTarget.attacker_uuid == attacker_uuid)
        .order_by(desc(SmtpTarget.last_seen))
    )
    async with self._session() as session:
        fetched = await session.execute(query)
        targets = fetched.scalars().all()
        return [target.model_dump(mode="json") for target in targets]
async def smtp_target_seen(self, domain: str) -> dict[str, Any]:
    """Aggregate every attacker's row for ``domain`` into one summary.

    Returns a dict with ``seen`` (whether the summed count is positive),
    ``count`` (sum of the per-row counts, 0 when no row matches) and
    ``first_seen`` / ``last_seen`` (earliest and latest values across the
    matching rows, ``None`` when there are none).
    """
    summary_query = select(
        func.coalesce(func.sum(SmtpTarget.count), 0),
        func.min(SmtpTarget.first_seen),
        func.max(SmtpTarget.last_seen),
    ).where(SmtpTarget.domain == domain)
    async with self._session() as session:
        summary = await session.execute(summary_query)
        hit_sum, earliest, latest = summary.one()
        hits = int(hit_sum)
        return {
            "seen": hits > 0,
            "count": hits,
            "first_seen": earliest,
            "last_seen": latest,
        }
@staticmethod
def _deserialize_attacker(d: dict[str, Any]) -> dict[str, Any]:
for key in ("services", "deckies", "fingerprints", "commands"):

View File

@@ -31,6 +31,9 @@ class DummyRepo(BaseRepository):
# Each override below simply awaits the abstract base implementation and
# discards the result.
# NOTE(review): presumably this exists so test_base_repo_coverage executes
# the base-class method bodies — confirm.
async def get_behaviors_for_ips(self, ips): await super().get_behaviors_for_ips(ips)
async def upsert_session_profile(self, sid, data): await super().upsert_session_profile(sid, data)
async def get_session_profile(self, sid): await super().get_session_profile(sid)
async def increment_smtp_target(self, u, d): await super().increment_smtp_target(u, d)
async def list_smtp_targets(self, u): await super().list_smtp_targets(u)
async def smtp_target_seen(self, d): await super().smtp_target_seen(d)
async def get_attacker_by_uuid(self, u): await super().get_attacker_by_uuid(u)
async def get_attackers(self, **kw): await super().get_attackers(**kw)
async def get_total_attackers(self, **kw): await super().get_total_attackers(**kw)
@@ -72,6 +75,9 @@ async def test_base_repo_coverage():
await dr.get_behaviors_for_ips({"1.1.1.1"})
await dr.upsert_session_profile("sid", {})
await dr.get_session_profile("sid")
await dr.increment_smtp_target("uuid", "corp.com")
await dr.list_smtp_targets("uuid")
await dr.smtp_target_seen("corp.com")
await dr.get_attacker_by_uuid("a")
await dr.get_attackers()
await dr.get_total_attackers()

160
tests/test_smtp_targets.py Normal file
View File

@@ -0,0 +1,160 @@
"""
Tests for SMTP victim-domain tracking (SmtpTarget table + profiler ingestion).
Two surfaces under test:
* Repo upsert / list / aggregate-seen helpers.
* The profiler's `_extract_smtp_domains` + `_normalize_smtp_domain`
parsers — pure functions exercised directly without running the
full worker loop.
"""
from datetime import datetime, timezone
import pytest
from decnet.web.db.factory import get_repository
from decnet.correlation.parser import LogEvent
from decnet.profiler.worker import _extract_smtp_domains, _normalize_smtp_domain
@pytest.fixture
async def repo(tmp_path):
    """Provide an initialised repository backed by a database file under ``tmp_path``."""
    db_file = tmp_path / "smtp_targets.db"
    repository = get_repository(db_path=str(db_file))
    await repository.initialize()
    return repository
def _smtp_event(event_type: str, **fields) -> LogEvent:
    """Build an ``smtp`` LogEvent of the given type; keyword args become its ``fields``."""
    fixed = {
        "timestamp": datetime.now(timezone.utc),
        "decky": "decky-01",
        "service": "smtp",
        "attacker_ip": "1.2.3.4",
        "raw": "",
    }
    return LogEvent(event_type=event_type, fields=fields, **fixed)
# ── Domain normalization ─────────────────────────────────────────────────────
# (raw input, expected normalized domain or None) pairs.
_NORMALIZE_CASES = [
    # Well-formed addresses → lowercased domain
    ("<john@corp1.com>", "corp1.com"),
    ("JOHN@CORP1.COM", "corp1.com"),
    ("<alice@mail.corp.io>", "mail.corp.io"),
    # Empty / malformed → None
    ("", None),
    ("notanemail", None),
    ("@nouser.com", None),
    ("user@", None),
    # Last label on the blocklist → None
    ("admin@foo.invalid", None),
    ("test@bar.test", None),
    ("x@local.example", None),
    # Address embedded in a full RCPT command line
    ("RCPT TO:<c@d.com>", "d.com"),
]


@pytest.mark.parametrize("raw, expected", _NORMALIZE_CASES)
def test_normalize_smtp_domain(raw, expected):
    """Each raw fragment normalizes to the expected domain (or None)."""
    result = _normalize_smtp_domain(raw)
    assert result == expected
# ── Event → domain extraction ────────────────────────────────────────────────
def test_extract_from_rcpt_to():
    """Every `rcpt_to` event contributes the domain of its `value` address."""
    addresses = ("<bob@target.com>", "<alice@other.com>")
    batch = [_smtp_event("rcpt_to", value=addr) for addr in addresses]
    extracted = _extract_smtp_domains(batch)
    assert extracted == {"target.com", "other.com"}
def test_extract_from_rcpt_denied():
    """A denied RCPT still yields the domain it was aimed at."""
    denied = _smtp_event("rcpt_denied", value="<carol@corp.net>")
    extracted = _extract_smtp_domains([denied])
    assert extracted == {"corp.net"}
def test_extract_from_message_accepted_splits_recipients():
    """`message_accepted.rcpt_to` is a comma-joined list, not a single addr."""
    accepted = _smtp_event(
        "message_accepted",
        rcpt_to="<a@one.com>,<b@two.com>,<c@one.com>",
        mail_from="<spam@evil.com>",
    )
    extracted = _extract_smtp_domains([accepted])
    # Only recipient domains count; the sender's domain must not appear.
    assert extracted == {"one.com", "two.com"}
def test_extract_ignores_non_smtp_events():
    """Identical `value` fields on non-smtp services must not leak in."""
    ssh_lookalike = LogEvent(
        timestamp=datetime.now(timezone.utc),
        decky="decky-01",
        service="ssh",
        event_type="rcpt_to",
        attacker_ip="1.2.3.4",
        fields={"value": "<x@wrong.com>"},
        raw="",
    )
    extracted = _extract_smtp_domains([ssh_lookalike])
    assert extracted == set()
def test_extract_dedupes_within_batch():
    """Several recipients at one domain collapse to a single set entry."""
    recipients = ("<a@corp.com>", "<b@corp.com>", "<c@corp.com>")
    batch = [_smtp_event("rcpt_to", value=addr) for addr in recipients]
    extracted = _extract_smtp_domains(batch)
    assert extracted == {"corp.com"}
# ── Repo: increment + list + seen ────────────────────────────────────────────
@pytest.mark.anyio
async def test_increment_creates_then_bumps(repo):
    """First hit inserts a count=1 row; a second hit bumps count and keeps first_seen."""
    attacker, domain = "uuid-1", "corp.com"

    await repo.increment_smtp_target(attacker, domain)
    after_first = await repo.list_smtp_targets(attacker)
    assert len(after_first) == 1
    created = after_first[0]
    assert created["domain"] == "corp.com"
    assert created["count"] == 1
    original_first_seen = created["first_seen"]

    # Second hit on the same pair must update the row in place.
    await repo.increment_smtp_target(attacker, domain)
    after_second = await repo.list_smtp_targets(attacker)
    bumped = after_second[0]
    assert bumped["count"] == 2
    assert bumped["first_seen"] == original_first_seen
@pytest.mark.anyio
async def test_increment_isolates_per_attacker(repo):
    """The same domain hit by two attackers produces one row for each of them."""
    for attacker in ("uuid-a", "uuid-b"):
        await repo.increment_smtp_target(attacker, "corp.com")

    rows_a = await repo.list_smtp_targets("uuid-a")
    assert len(rows_a) == 1
    rows_b = await repo.list_smtp_targets("uuid-b")
    assert len(rows_b) == 1
@pytest.mark.anyio
async def test_list_orders_by_last_seen_desc(repo):
    """Rows come back with the most recently seen domain first."""
    # NOTE(review): relies on the two increments receiving distinct
    # last_seen timestamps; on a coarse clock they could tie — confirm.
    for domain in ("older.com", "newer.com"):
        await repo.increment_smtp_target("uuid-1", domain)

    listed = await repo.list_smtp_targets("uuid-1")
    ordered_domains = [row["domain"] for row in listed]
    # newer.com was written last, so it has the later last_seen.
    assert ordered_domains == ["newer.com", "older.com"]
@pytest.mark.anyio
async def test_smtp_target_seen_aggregates_across_attackers(repo):
    """The aggregate sums counts over every attacker that hit the domain."""
    # uuid-a hits twice, uuid-b once.
    for attacker in ("uuid-a", "uuid-a", "uuid-b"):
        await repo.increment_smtp_target(attacker, "corp.com")

    summary = await repo.smtp_target_seen("corp.com")
    assert summary["seen"] is True
    assert summary["count"] == 3  # 2 + 1
    assert summary["first_seen"] is not None
    assert summary["last_seen"] is not None
@pytest.mark.anyio
async def test_smtp_target_seen_unknown_domain(repo):
    """A domain with no rows reports unseen, zero count and no timestamps."""
    summary = await repo.smtp_target_seen("never-targeted.org")
    assert summary["seen"] is False
    assert summary["count"] == 0
    assert summary["first_seen"] is None
    assert summary["last_seen"] is None