diff --git a/decnet/bus/topics.py b/decnet/bus/topics.py index 5afbc363..528933e2 100644 --- a/decnet/bus/topics.py +++ b/decnet/bus/topics.py @@ -54,6 +54,7 @@ SYSTEM = "system" CREDENTIAL = "credential" ORCHESTRATOR = "orchestrator" CANARY = "canary" +SMTP = "smtp" # ─── Leaf event-type constants (the last segment of each topic) ────────────── @@ -394,6 +395,16 @@ def system_control(worker: str) -> str: return f"{SYSTEM}.{worker}.{SYSTEM_CONTROL}" +def smtp(event_type: str) -> str: + """Build ``smtp.``. + + *event_type* may contain dots (e.g. ``probe.pending``). + """ + if not event_type: + raise ValueError("smtp topic requires a non-empty event_type") + return f"{SMTP}.{event_type}" + + def _reject_tokens(*parts: str) -> None: """Reject topic segments that would break NATS-style tokenization. diff --git a/decnet/orchestrator/drivers/smtp_relay.py b/decnet/orchestrator/drivers/smtp_relay.py new file mode 100644 index 00000000..6dd3921a --- /dev/null +++ b/decnet/orchestrator/drivers/smtp_relay.py @@ -0,0 +1,58 @@ +"""SMTP probe-relay driver. + +Forwards the attacker's first probe email via the master's real internet +connection. The smtp_relay decky runs on MACVLAN and has no gateway access; +the master (where this worker runs) does. + +Called by the realism worker's smtp probe listener, not the main tick loop. +""" +from __future__ import annotations + +import smtplib +from pathlib import Path +from typing import Any + +_ARTIFACTS_ROOT_DEFAULT = "/var/lib/decnet/artifacts" + + +def forward_probe( + *, + svc_cfg: dict[str, Any], + stored_as: str, + decky_name: str, + mail_from: str, + rcpt_to: list[str], + artifacts_root: str = _ARTIFACTS_ROOT_DEFAULT, +) -> tuple[bool, str]: + """Read the .eml from disk and forward it via the upstream relay. + + Returns (True, "") on success or (False, reason) on failure. + Always safe to call in a thread — uses only blocking I/O. + """ + upstream_host = (svc_cfg.get("upstream_host") or "").strip() + if not upstream_host: + return False, "upstream_host not configured" + + eml_path = Path(artifacts_root) / decky_name / "smtp" / stored_as + try: + body = eml_path.read_bytes() + except OSError as exc: + return False, f"cannot read eml: {exc}" + + if not rcpt_to: + return False, "no recipients" + + upstream_port = int(svc_cfg.get("upstream_port") or 25) + upstream_user = (svc_cfg.get("upstream_user") or "").strip() + upstream_pass = (svc_cfg.get("upstream_pass") or "").strip() + envelope_from = (svc_cfg.get("upstream_sender") or "").strip() or mail_from + + try: + with smtplib.SMTP(upstream_host, upstream_port, timeout=15) as conn: + conn.ehlo() + if upstream_user and upstream_pass: + conn.login(upstream_user, upstream_pass) + conn.sendmail(envelope_from, rcpt_to, body) + return True, "" + except Exception as exc: + return False, str(exc)[:256] diff --git a/decnet/orchestrator/worker.py b/decnet/orchestrator/worker.py index e016aaf6..1dc6ce52 100644 --- a/decnet/orchestrator/worker.py +++ b/decnet/orchestrator/worker.py @@ -25,6 +25,7 @@ import secrets from datetime import datetime, timezone from typing import Any, Optional +from decnet.bus import topics as _topics from decnet.bus.factory import get_bus from decnet.bus.publish import ( publish_safely, @@ -34,6 +35,7 @@ from decnet.bus.publish import ( from decnet.logging import get_logger from decnet.orchestrator import events, scheduler from decnet.orchestrator.drivers import get_driver_for +from decnet.orchestrator.drivers.smtp_relay import forward_probe from decnet.orchestrator.emailgen import ( events as email_events, scheduler as email_scheduler, @@ -138,6 +140,9 @@ async def orchestrator_worker( control_task = asyncio.create_task( run_control_listener(bus, "orchestrator", shutdown), ) + probe_task = asyncio.create_task( + _run_smtp_probe_listener(repo, shutdown), + ) tick_n = 0 try: while not shutdown.is_set(): @@ -157,7 +162,7 @@ async def orchestrator_worker( if tick_n % _REALISM_CONFIG_REFRESH_TICKS == 0: await _refresh_realism_config(repo) finally: - for t in (heartbeat_task, control_task): + for t in (heartbeat_task, control_task, probe_task): t.cancel() with contextlib.suppress(Exception, asyncio.CancelledError): await t @@ -467,6 +472,108 @@ async def _bump_synthetic_file_after_edit(repo, action, result) -> None: await repo.update_synthetic_file(action.synthetic_file_uuid, patch) +async def _run_smtp_probe_listener( + repo: BaseRepository, + shutdown: asyncio.Event, +) -> None: + """Subscribe to smtp.probe.pending and forward probe emails upstream. + + Runs as a long-lived subtask alongside the tick loop. When a probe lands + we check if this (attacker_ip, decky) has already been forwarded up to + probe_limit times — if not, forward via the master's real internet + connection and store a probe_relay bounty with the result. + """ + bus = None + try: + bus = get_bus(client_name="orchestrator-probe") + await bus.connect() + await bus.subscribe(_topics.smtp("probe.pending")) + except Exception as exc: # noqa: BLE001 + logger.warning("smtp probe listener: bus unavailable: %s", exc) + return + + try: + while not shutdown.is_set(): + try: + msg = await asyncio.wait_for(bus.next_message(), timeout=5.0) + except asyncio.TimeoutError: + continue + except Exception as exc: # noqa: BLE001 + logger.debug("smtp probe listener: recv error: %s", exc) + continue + if msg is None: + continue + try: + await _handle_probe_pending(repo, msg.get("payload") or msg) + except Exception as exc: # noqa: BLE001 + logger.warning("smtp probe listener: handle error: %s", exc) + finally: + with contextlib.suppress(Exception): + await bus.close() + + +async def _handle_probe_pending(repo: BaseRepository, payload: dict) -> None: + decky_name = (payload.get("decky") or "").strip() + attacker_ip = (payload.get("attacker_ip") or "").strip() + stored_as = (payload.get("stored_as") or "").strip() + mail_from = (payload.get("mail_from") or "").strip() + rcpt_to_raw = (payload.get("rcpt_to") or "").strip() + + if not (decky_name and attacker_ip and stored_as): + return + + decky_row = await repo.get_fleet_decky_by_name(decky_name) + if not decky_row: + return + svc_cfg = ( + (decky_row.get("decky_config") or {}) + .get("service_config", {}) + .get("smtp_relay") or {} + ) + if not (svc_cfg.get("upstream_host") or "").strip(): + return + + probe_limit = int(svc_cfg.get("probe_limit") or 1) + already_sent = await repo.count_probe_relays(attacker_ip, decky_name) + if already_sent >= probe_limit: + return + + rcpt_to = [r.strip() for r in rcpt_to_raw.split(",") if r.strip()] + artifacts_root = os.environ.get("DECNET_ARTIFACTS_ROOT", "/var/lib/decnet/artifacts") + + loop = asyncio.get_event_loop() + ok, reason = await loop.run_in_executor( + None, + lambda: forward_probe( + svc_cfg=svc_cfg, + stored_as=stored_as, + decky_name=decky_name, + mail_from=mail_from, + rcpt_to=rcpt_to, + artifacts_root=artifacts_root, + ), + ) + + await repo.add_bounty({ + "decky": decky_name, + "service": "smtp_relay", + "attacker_ip": attacker_ip, + "bounty_type": "probe_relay", + "payload": { + "stored_as": stored_as, + "forwarded": ok, + **({"fwd_error": reason} if not ok else {}), + }, + }) + if ok: + logger.info("smtp probe forwarded decky=%s ip=%s", decky_name, attacker_ip) + else: + logger.warning( + "smtp probe forward failed decky=%s ip=%s error=%s", + decky_name, attacker_ip, reason, + ) + + async def _record_synthetic_file(repo, action) -> None: """Persist (or patch) a synthetic_files row after a FileAction plant. diff --git a/decnet/services/smtp_relay.py b/decnet/services/smtp_relay.py index 77d379ae..cc185ce6 100644 --- a/decnet/services/smtp_relay.py +++ b/decnet/services/smtp_relay.py @@ -102,18 +102,6 @@ class SMTPRelayService(BaseService): fragment["environment"]["SMTP_BANNER"] = cfg["banner"] if "mta" in cfg: fragment["environment"]["SMTP_MTA"] = cfg["mta"] - if "upstream_host" in cfg: - fragment["environment"]["SMTP_UPSTREAM_HOST"] = cfg["upstream_host"] - if "upstream_port" in cfg: - fragment["environment"]["SMTP_UPSTREAM_PORT"] = str(cfg["upstream_port"]) - if "upstream_user" in cfg: - fragment["environment"]["SMTP_UPSTREAM_USER"] = cfg["upstream_user"] - if "upstream_pass" in cfg: - fragment["environment"]["SMTP_UPSTREAM_PASS"] = cfg["upstream_pass"] - if "upstream_sender" in cfg: - fragment["environment"]["SMTP_UPSTREAM_SENDER"] = cfg["upstream_sender"] - if "probe_limit" in cfg: - fragment["environment"]["SMTP_PROBE_LIMIT"] = str(cfg["probe_limit"]) return fragment def dockerfile_context(self) -> Path: diff --git a/decnet/web/db/sqlmodel_repo/bounties.py b/decnet/web/db/sqlmodel_repo/bounties.py index 00c3880d..0704c6e7 100644 --- a/decnet/web/db/sqlmodel_repo/bounties.py +++ b/decnet/web/db/sqlmodel_repo/bounties.py @@ -137,3 +137,15 @@ class BountiesMixin: pass grouped[item.attacker_ip].append(d) return dict(grouped) + + async def count_probe_relays(self, attacker_ip: str, decky: str) -> int: + """Return how many probe_relay bounties exist for this (attacker_ip, decky) pair.""" + async with self._session() as session: + result = await session.execute( + select(func.count()).select_from(Bounty).where( + Bounty.attacker_ip == attacker_ip, + Bounty.decky == decky, + Bounty.bounty_type == "probe_relay", + ) + ) + return result.scalar() or 0 diff --git a/decnet/web/db/sqlmodel_repo/fleet.py b/decnet/web/db/sqlmodel_repo/fleet.py index 60eef39f..f2fe3e66 100644 --- a/decnet/web/db/sqlmodel_repo/fleet.py +++ b/decnet/web/db/sqlmodel_repo/fleet.py @@ -59,6 +59,16 @@ class FleetMixin: ) await session.commit() + async def get_fleet_decky_by_name(self, name: str) -> dict[str, Any] | None: + async with self._session() as session: + result = await session.execute( + select(FleetDecky).where(FleetDecky.name == name) + ) + row = result.scalar_one_or_none() + if row is None: + return None + return _deserialize_json_fields(row.model_dump(mode="json"), ("services", "decky_config")) + async def list_fleet_deckies( self, *, host_uuid: Optional[str] = None, ) -> list[dict[str, Any]]: diff --git a/decnet/web/ingester.py b/decnet/web/ingester.py index 7a89e0f5..bbf62482 100644 --- a/decnet/web/ingester.py +++ b/decnet/web/ingester.py @@ -613,21 +613,32 @@ async def _extract_bounty( "content_type": _fields.get("content_type"), }, }) - elif _evt == "probe_forwarded": - # Record whether the upstream relay accepted the probe. forwarded=1 - # means the attacker's test email actually landed in their inbox; - # forwarded=0 means the upstream refused (attacker still got 250). - await repo.add_bounty({ - "decky": log_data.get("decky"), - "service": log_data.get("service"), - "attacker_ip": log_data.get("attacker_ip"), - "bounty_type": "probe_relay", - "payload": { - "msg_id": _fields.get("msg_id"), - "forwarded": _fields.get("forwarded") == "1", - "delivery_count": _fields.get("delivery_count"), + # Signal the realism worker to forward this as a probe if it's the + # first message from this IP on an smtp_relay decky. The worker has + # real internet access (the container is on MACVLAN and doesn't). + if log_data.get("service") == "smtp_relay": + await _publish_probe_pending(log_data, _fields) + + +async def _publish_probe_pending(log_data: dict, fields: dict) -> None: + try: + bus = get_bus(client_name="ingester-probe") + await bus.connect() + await publish_safely( + bus, + _topics.smtp("probe.pending"), + { + "decky": log_data.get("decky"), + "attacker_ip": log_data.get("attacker_ip"), + "stored_as": fields.get("stored_as"), + "mail_from": fields.get("mail_from"), + "rcpt_to": fields.get("rcpt_to"), }, - }) + event_type="probe.pending", + ) + await bus.close() + except Exception as exc: # noqa: BLE001 + logger.debug("probe pending publish failed: %s", exc) # ─── IP-leak detection (XFF / Forwarded / X-Real-IP / CDN variants) ────────── diff --git a/tests/services/test_smtp_relay.py b/tests/services/test_smtp_relay.py index d5bee50f..2442d744 100644 --- a/tests/services/test_smtp_relay.py +++ b/tests/services/test_smtp_relay.py @@ -28,7 +28,10 @@ def test_smtp_relay_dockerfile_context(): assert ctx.is_dir() -def test_smtp_relay_upstream_cfg(): +def test_smtp_relay_upstream_cfg_not_in_container_env(): + """Upstream relay config is stored in decky_config and consumed by the + realism worker — it must NOT be injected into the container environment + (credentials don't belong in container env vars).""" svc = SMTPRelayService() fragment = svc.compose_fragment( "test-decky", @@ -41,18 +44,10 @@ def test_smtp_relay_upstream_cfg(): }, ) env = fragment["environment"] - assert env["SMTP_UPSTREAM_HOST"] == "smtp.sendgrid.net" - assert env["SMTP_UPSTREAM_PORT"] == "587" - assert env["SMTP_UPSTREAM_USER"] == "apikey" - assert env["SMTP_UPSTREAM_PASS"] == "SG.secret" - assert env["SMTP_PROBE_LIMIT"] == "2" - - -def test_smtp_relay_upstream_not_set_by_default(): - svc = SMTPRelayService() - fragment = svc.compose_fragment("test-decky") - env = fragment["environment"] assert "SMTP_UPSTREAM_HOST" not in env + assert "SMTP_UPSTREAM_PORT" not in env + assert "SMTP_UPSTREAM_USER" not in env + assert "SMTP_UPSTREAM_PASS" not in env assert "SMTP_PROBE_LIMIT" not in env