feat(smtp_relay): move probe forwarding to realism worker via bus
Attacker probe emails are now forwarded by the master (realism worker) rather than inside the MACVLAN container, which has no internet gateway. - New smtp.probe.pending bus topic: ingester publishes when smtp_relay message_stored fires; worker subscribes and does the actual delivery - decnet/orchestrator/drivers/smtp_relay.py: pure-sync forward_probe() reads the .eml from disk and sends via smtplib on a thread executor - worker.py: _run_smtp_probe_listener + _handle_probe_pending subtask; limit enforced via count_probe_relays() (DB-backed, restart-safe) - bounties.py: count_probe_relays() query on probe_relay bounty type - fleet.py: get_fleet_decky_by_name() to pull service config from DB - services/smtp_relay.py: upstream_* and probe_limit fields defined in config_schema but NOT injected into container env (credentials stay out of docker env vars) - ingester.py: stripped of smtplib; publishes probe.pending and exits - tests: assert upstream keys absent from container environment
This commit is contained in:
@@ -54,6 +54,7 @@ SYSTEM = "system"
|
|||||||
CREDENTIAL = "credential"
|
CREDENTIAL = "credential"
|
||||||
ORCHESTRATOR = "orchestrator"
|
ORCHESTRATOR = "orchestrator"
|
||||||
CANARY = "canary"
|
CANARY = "canary"
|
||||||
|
SMTP = "smtp"
|
||||||
|
|
||||||
|
|
||||||
# ─── Leaf event-type constants (the last segment of each topic) ──────────────
|
# ─── Leaf event-type constants (the last segment of each topic) ──────────────
|
||||||
@@ -394,6 +395,16 @@ def system_control(worker: str) -> str:
|
|||||||
return f"{SYSTEM}.{worker}.{SYSTEM_CONTROL}"
|
return f"{SYSTEM}.{worker}.{SYSTEM_CONTROL}"
|
||||||
|
|
||||||
|
|
||||||
|
def smtp(event_type: str) -> str:
|
||||||
|
"""Build ``smtp.<event_type>``.
|
||||||
|
|
||||||
|
*event_type* may contain dots (e.g. ``probe.pending``).
|
||||||
|
"""
|
||||||
|
if not event_type:
|
||||||
|
raise ValueError("smtp topic requires a non-empty event_type")
|
||||||
|
return f"{SMTP}.{event_type}"
|
||||||
|
|
||||||
|
|
||||||
def _reject_tokens(*parts: str) -> None:
|
def _reject_tokens(*parts: str) -> None:
|
||||||
"""Reject topic segments that would break NATS-style tokenization.
|
"""Reject topic segments that would break NATS-style tokenization.
|
||||||
|
|
||||||
|
|||||||
58
decnet/orchestrator/drivers/smtp_relay.py
Normal file
58
decnet/orchestrator/drivers/smtp_relay.py
Normal file
@@ -0,0 +1,58 @@
|
|||||||
|
"""SMTP probe-relay driver.
|
||||||
|
|
||||||
|
Forwards the attacker's first probe email via the master's real internet
|
||||||
|
connection. The smtp_relay decky runs on MACVLAN and has no gateway access;
|
||||||
|
the master (where this worker runs) does.
|
||||||
|
|
||||||
|
Called by the realism worker's smtp probe listener, not the main tick loop.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import smtplib
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
_ARTIFACTS_ROOT_DEFAULT = "/var/lib/decnet/artifacts"
|
||||||
|
|
||||||
|
|
||||||
|
def forward_probe(
|
||||||
|
*,
|
||||||
|
svc_cfg: dict[str, Any],
|
||||||
|
stored_as: str,
|
||||||
|
decky_name: str,
|
||||||
|
mail_from: str,
|
||||||
|
rcpt_to: list[str],
|
||||||
|
artifacts_root: str = _ARTIFACTS_ROOT_DEFAULT,
|
||||||
|
) -> tuple[bool, str]:
|
||||||
|
"""Read the .eml from disk and forward it via the upstream relay.
|
||||||
|
|
||||||
|
Returns (True, "") on success or (False, reason) on failure.
|
||||||
|
Always safe to call in a thread — uses only blocking I/O.
|
||||||
|
"""
|
||||||
|
upstream_host = (svc_cfg.get("upstream_host") or "").strip()
|
||||||
|
if not upstream_host:
|
||||||
|
return False, "upstream_host not configured"
|
||||||
|
|
||||||
|
eml_path = Path(artifacts_root) / decky_name / "smtp" / stored_as
|
||||||
|
try:
|
||||||
|
body = eml_path.read_bytes()
|
||||||
|
except OSError as exc:
|
||||||
|
return False, f"cannot read eml: {exc}"
|
||||||
|
|
||||||
|
if not rcpt_to:
|
||||||
|
return False, "no recipients"
|
||||||
|
|
||||||
|
upstream_port = int(svc_cfg.get("upstream_port") or 25)
|
||||||
|
upstream_user = (svc_cfg.get("upstream_user") or "").strip()
|
||||||
|
upstream_pass = (svc_cfg.get("upstream_pass") or "").strip()
|
||||||
|
envelope_from = (svc_cfg.get("upstream_sender") or "").strip() or mail_from
|
||||||
|
|
||||||
|
try:
|
||||||
|
with smtplib.SMTP(upstream_host, upstream_port, timeout=15) as conn:
|
||||||
|
conn.ehlo()
|
||||||
|
if upstream_user and upstream_pass:
|
||||||
|
conn.login(upstream_user, upstream_pass)
|
||||||
|
conn.sendmail(envelope_from, rcpt_to, body)
|
||||||
|
return True, ""
|
||||||
|
except Exception as exc:
|
||||||
|
return False, str(exc)[:256]
|
||||||
@@ -25,6 +25,7 @@ import secrets
|
|||||||
from datetime import datetime, timezone
|
from datetime import datetime, timezone
|
||||||
from typing import Any, Optional
|
from typing import Any, Optional
|
||||||
|
|
||||||
|
from decnet.bus import topics as _topics
|
||||||
from decnet.bus.factory import get_bus
|
from decnet.bus.factory import get_bus
|
||||||
from decnet.bus.publish import (
|
from decnet.bus.publish import (
|
||||||
publish_safely,
|
publish_safely,
|
||||||
@@ -34,6 +35,7 @@ from decnet.bus.publish import (
|
|||||||
from decnet.logging import get_logger
|
from decnet.logging import get_logger
|
||||||
from decnet.orchestrator import events, scheduler
|
from decnet.orchestrator import events, scheduler
|
||||||
from decnet.orchestrator.drivers import get_driver_for
|
from decnet.orchestrator.drivers import get_driver_for
|
||||||
|
from decnet.orchestrator.drivers.smtp_relay import forward_probe
|
||||||
from decnet.orchestrator.emailgen import (
|
from decnet.orchestrator.emailgen import (
|
||||||
events as email_events,
|
events as email_events,
|
||||||
scheduler as email_scheduler,
|
scheduler as email_scheduler,
|
||||||
@@ -138,6 +140,9 @@ async def orchestrator_worker(
|
|||||||
control_task = asyncio.create_task(
|
control_task = asyncio.create_task(
|
||||||
run_control_listener(bus, "orchestrator", shutdown),
|
run_control_listener(bus, "orchestrator", shutdown),
|
||||||
)
|
)
|
||||||
|
probe_task = asyncio.create_task(
|
||||||
|
_run_smtp_probe_listener(repo, shutdown),
|
||||||
|
)
|
||||||
tick_n = 0
|
tick_n = 0
|
||||||
try:
|
try:
|
||||||
while not shutdown.is_set():
|
while not shutdown.is_set():
|
||||||
@@ -157,7 +162,7 @@ async def orchestrator_worker(
|
|||||||
if tick_n % _REALISM_CONFIG_REFRESH_TICKS == 0:
|
if tick_n % _REALISM_CONFIG_REFRESH_TICKS == 0:
|
||||||
await _refresh_realism_config(repo)
|
await _refresh_realism_config(repo)
|
||||||
finally:
|
finally:
|
||||||
for t in (heartbeat_task, control_task):
|
for t in (heartbeat_task, control_task, probe_task):
|
||||||
t.cancel()
|
t.cancel()
|
||||||
with contextlib.suppress(Exception, asyncio.CancelledError):
|
with contextlib.suppress(Exception, asyncio.CancelledError):
|
||||||
await t
|
await t
|
||||||
@@ -467,6 +472,108 @@ async def _bump_synthetic_file_after_edit(repo, action, result) -> None:
|
|||||||
await repo.update_synthetic_file(action.synthetic_file_uuid, patch)
|
await repo.update_synthetic_file(action.synthetic_file_uuid, patch)
|
||||||
|
|
||||||
|
|
||||||
|
async def _run_smtp_probe_listener(
|
||||||
|
repo: BaseRepository,
|
||||||
|
shutdown: asyncio.Event,
|
||||||
|
) -> None:
|
||||||
|
"""Subscribe to smtp.probe.pending and forward probe emails upstream.
|
||||||
|
|
||||||
|
Runs as a long-lived subtask alongside the tick loop. When a probe lands
|
||||||
|
we check if this (attacker_ip, decky) has already been forwarded up to
|
||||||
|
probe_limit times — if not, forward via the master's real internet
|
||||||
|
connection and store a probe_relay bounty with the result.
|
||||||
|
"""
|
||||||
|
bus = None
|
||||||
|
try:
|
||||||
|
bus = get_bus(client_name="orchestrator-probe")
|
||||||
|
await bus.connect()
|
||||||
|
await bus.subscribe(_topics.smtp("probe.pending"))
|
||||||
|
except Exception as exc: # noqa: BLE001
|
||||||
|
logger.warning("smtp probe listener: bus unavailable: %s", exc)
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
while not shutdown.is_set():
|
||||||
|
try:
|
||||||
|
msg = await asyncio.wait_for(bus.next_message(), timeout=5.0)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
continue
|
||||||
|
except Exception as exc: # noqa: BLE001
|
||||||
|
logger.debug("smtp probe listener: recv error: %s", exc)
|
||||||
|
continue
|
||||||
|
if msg is None:
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
await _handle_probe_pending(repo, msg.get("payload") or msg)
|
||||||
|
except Exception as exc: # noqa: BLE001
|
||||||
|
logger.warning("smtp probe listener: handle error: %s", exc)
|
||||||
|
finally:
|
||||||
|
with contextlib.suppress(Exception):
|
||||||
|
await bus.close()
|
||||||
|
|
||||||
|
|
||||||
|
async def _handle_probe_pending(repo: BaseRepository, payload: dict) -> None:
|
||||||
|
decky_name = (payload.get("decky") or "").strip()
|
||||||
|
attacker_ip = (payload.get("attacker_ip") or "").strip()
|
||||||
|
stored_as = (payload.get("stored_as") or "").strip()
|
||||||
|
mail_from = (payload.get("mail_from") or "").strip()
|
||||||
|
rcpt_to_raw = (payload.get("rcpt_to") or "").strip()
|
||||||
|
|
||||||
|
if not (decky_name and attacker_ip and stored_as):
|
||||||
|
return
|
||||||
|
|
||||||
|
decky_row = await repo.get_fleet_decky_by_name(decky_name)
|
||||||
|
if not decky_row:
|
||||||
|
return
|
||||||
|
svc_cfg = (
|
||||||
|
(decky_row.get("decky_config") or {})
|
||||||
|
.get("service_config", {})
|
||||||
|
.get("smtp_relay") or {}
|
||||||
|
)
|
||||||
|
if not (svc_cfg.get("upstream_host") or "").strip():
|
||||||
|
return
|
||||||
|
|
||||||
|
probe_limit = int(svc_cfg.get("probe_limit") or 1)
|
||||||
|
already_sent = await repo.count_probe_relays(attacker_ip, decky_name)
|
||||||
|
if already_sent >= probe_limit:
|
||||||
|
return
|
||||||
|
|
||||||
|
rcpt_to = [r.strip() for r in rcpt_to_raw.split(",") if r.strip()]
|
||||||
|
artifacts_root = os.environ.get("DECNET_ARTIFACTS_ROOT", "/var/lib/decnet/artifacts")
|
||||||
|
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
ok, reason = await loop.run_in_executor(
|
||||||
|
None,
|
||||||
|
lambda: forward_probe(
|
||||||
|
svc_cfg=svc_cfg,
|
||||||
|
stored_as=stored_as,
|
||||||
|
decky_name=decky_name,
|
||||||
|
mail_from=mail_from,
|
||||||
|
rcpt_to=rcpt_to,
|
||||||
|
artifacts_root=artifacts_root,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
await repo.add_bounty({
|
||||||
|
"decky": decky_name,
|
||||||
|
"service": "smtp_relay",
|
||||||
|
"attacker_ip": attacker_ip,
|
||||||
|
"bounty_type": "probe_relay",
|
||||||
|
"payload": {
|
||||||
|
"stored_as": stored_as,
|
||||||
|
"forwarded": ok,
|
||||||
|
**({"fwd_error": reason} if not ok else {}),
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if ok:
|
||||||
|
logger.info("smtp probe forwarded decky=%s ip=%s", decky_name, attacker_ip)
|
||||||
|
else:
|
||||||
|
logger.warning(
|
||||||
|
"smtp probe forward failed decky=%s ip=%s error=%s",
|
||||||
|
decky_name, attacker_ip, reason,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
async def _record_synthetic_file(repo, action) -> None:
|
async def _record_synthetic_file(repo, action) -> None:
|
||||||
"""Persist (or patch) a synthetic_files row after a FileAction plant.
|
"""Persist (or patch) a synthetic_files row after a FileAction plant.
|
||||||
|
|
||||||
|
|||||||
@@ -102,18 +102,6 @@ class SMTPRelayService(BaseService):
|
|||||||
fragment["environment"]["SMTP_BANNER"] = cfg["banner"]
|
fragment["environment"]["SMTP_BANNER"] = cfg["banner"]
|
||||||
if "mta" in cfg:
|
if "mta" in cfg:
|
||||||
fragment["environment"]["SMTP_MTA"] = cfg["mta"]
|
fragment["environment"]["SMTP_MTA"] = cfg["mta"]
|
||||||
if "upstream_host" in cfg:
|
|
||||||
fragment["environment"]["SMTP_UPSTREAM_HOST"] = cfg["upstream_host"]
|
|
||||||
if "upstream_port" in cfg:
|
|
||||||
fragment["environment"]["SMTP_UPSTREAM_PORT"] = str(cfg["upstream_port"])
|
|
||||||
if "upstream_user" in cfg:
|
|
||||||
fragment["environment"]["SMTP_UPSTREAM_USER"] = cfg["upstream_user"]
|
|
||||||
if "upstream_pass" in cfg:
|
|
||||||
fragment["environment"]["SMTP_UPSTREAM_PASS"] = cfg["upstream_pass"]
|
|
||||||
if "upstream_sender" in cfg:
|
|
||||||
fragment["environment"]["SMTP_UPSTREAM_SENDER"] = cfg["upstream_sender"]
|
|
||||||
if "probe_limit" in cfg:
|
|
||||||
fragment["environment"]["SMTP_PROBE_LIMIT"] = str(cfg["probe_limit"])
|
|
||||||
return fragment
|
return fragment
|
||||||
|
|
||||||
def dockerfile_context(self) -> Path:
|
def dockerfile_context(self) -> Path:
|
||||||
|
|||||||
@@ -137,3 +137,15 @@ class BountiesMixin:
|
|||||||
pass
|
pass
|
||||||
grouped[item.attacker_ip].append(d)
|
grouped[item.attacker_ip].append(d)
|
||||||
return dict(grouped)
|
return dict(grouped)
|
||||||
|
|
||||||
|
async def count_probe_relays(self, attacker_ip: str, decky: str) -> int:
|
||||||
|
"""Return how many probe_relay bounties exist for this (attacker_ip, decky) pair."""
|
||||||
|
async with self._session() as session:
|
||||||
|
result = await session.execute(
|
||||||
|
select(func.count()).select_from(Bounty).where(
|
||||||
|
Bounty.attacker_ip == attacker_ip,
|
||||||
|
Bounty.decky == decky,
|
||||||
|
Bounty.bounty_type == "probe_relay",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return result.scalar() or 0
|
||||||
|
|||||||
@@ -59,6 +59,16 @@ class FleetMixin:
|
|||||||
)
|
)
|
||||||
await session.commit()
|
await session.commit()
|
||||||
|
|
||||||
|
async def get_fleet_decky_by_name(self, name: str) -> dict[str, Any] | None:
|
||||||
|
async with self._session() as session:
|
||||||
|
result = await session.execute(
|
||||||
|
select(FleetDecky).where(FleetDecky.name == name)
|
||||||
|
)
|
||||||
|
row = result.scalar_one_or_none()
|
||||||
|
if row is None:
|
||||||
|
return None
|
||||||
|
return _deserialize_json_fields(row.model_dump(mode="json"), ("services", "decky_config"))
|
||||||
|
|
||||||
async def list_fleet_deckies(
|
async def list_fleet_deckies(
|
||||||
self, *, host_uuid: Optional[str] = None,
|
self, *, host_uuid: Optional[str] = None,
|
||||||
) -> list[dict[str, Any]]:
|
) -> list[dict[str, Any]]:
|
||||||
|
|||||||
@@ -613,21 +613,32 @@ async def _extract_bounty(
|
|||||||
"content_type": _fields.get("content_type"),
|
"content_type": _fields.get("content_type"),
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
elif _evt == "probe_forwarded":
|
# Signal the realism worker to forward this as a probe if it's the
|
||||||
# Record whether the upstream relay accepted the probe. forwarded=1
|
# first message from this IP on an smtp_relay decky. The worker has
|
||||||
# means the attacker's test email actually landed in their inbox;
|
# real internet access (the container is on MACVLAN and doesn't).
|
||||||
# forwarded=0 means the upstream refused (attacker still got 250).
|
if log_data.get("service") == "smtp_relay":
|
||||||
await repo.add_bounty({
|
await _publish_probe_pending(log_data, _fields)
|
||||||
"decky": log_data.get("decky"),
|
|
||||||
"service": log_data.get("service"),
|
|
||||||
"attacker_ip": log_data.get("attacker_ip"),
|
async def _publish_probe_pending(log_data: dict, fields: dict) -> None:
|
||||||
"bounty_type": "probe_relay",
|
try:
|
||||||
"payload": {
|
bus = get_bus(client_name="ingester-probe")
|
||||||
"msg_id": _fields.get("msg_id"),
|
await bus.connect()
|
||||||
"forwarded": _fields.get("forwarded") == "1",
|
await publish_safely(
|
||||||
"delivery_count": _fields.get("delivery_count"),
|
bus,
|
||||||
|
_topics.smtp("probe.pending"),
|
||||||
|
{
|
||||||
|
"decky": log_data.get("decky"),
|
||||||
|
"attacker_ip": log_data.get("attacker_ip"),
|
||||||
|
"stored_as": fields.get("stored_as"),
|
||||||
|
"mail_from": fields.get("mail_from"),
|
||||||
|
"rcpt_to": fields.get("rcpt_to"),
|
||||||
},
|
},
|
||||||
})
|
event_type="probe.pending",
|
||||||
|
)
|
||||||
|
await bus.close()
|
||||||
|
except Exception as exc: # noqa: BLE001
|
||||||
|
logger.debug("probe pending publish failed: %s", exc)
|
||||||
|
|
||||||
|
|
||||||
# ─── IP-leak detection (XFF / Forwarded / X-Real-IP / CDN variants) ──────────
|
# ─── IP-leak detection (XFF / Forwarded / X-Real-IP / CDN variants) ──────────
|
||||||
|
|||||||
@@ -28,7 +28,10 @@ def test_smtp_relay_dockerfile_context():
|
|||||||
assert ctx.is_dir()
|
assert ctx.is_dir()
|
||||||
|
|
||||||
|
|
||||||
def test_smtp_relay_upstream_cfg():
|
def test_smtp_relay_upstream_cfg_not_in_container_env():
|
||||||
|
"""Upstream relay config is stored in decky_config and consumed by the
|
||||||
|
realism worker — it must NOT be injected into the container environment
|
||||||
|
(credentials don't belong in container env vars)."""
|
||||||
svc = SMTPRelayService()
|
svc = SMTPRelayService()
|
||||||
fragment = svc.compose_fragment(
|
fragment = svc.compose_fragment(
|
||||||
"test-decky",
|
"test-decky",
|
||||||
@@ -41,18 +44,10 @@ def test_smtp_relay_upstream_cfg():
|
|||||||
},
|
},
|
||||||
)
|
)
|
||||||
env = fragment["environment"]
|
env = fragment["environment"]
|
||||||
assert env["SMTP_UPSTREAM_HOST"] == "smtp.sendgrid.net"
|
|
||||||
assert env["SMTP_UPSTREAM_PORT"] == "587"
|
|
||||||
assert env["SMTP_UPSTREAM_USER"] == "apikey"
|
|
||||||
assert env["SMTP_UPSTREAM_PASS"] == "SG.secret"
|
|
||||||
assert env["SMTP_PROBE_LIMIT"] == "2"
|
|
||||||
|
|
||||||
|
|
||||||
def test_smtp_relay_upstream_not_set_by_default():
|
|
||||||
svc = SMTPRelayService()
|
|
||||||
fragment = svc.compose_fragment("test-decky")
|
|
||||||
env = fragment["environment"]
|
|
||||||
assert "SMTP_UPSTREAM_HOST" not in env
|
assert "SMTP_UPSTREAM_HOST" not in env
|
||||||
|
assert "SMTP_UPSTREAM_PORT" not in env
|
||||||
|
assert "SMTP_UPSTREAM_USER" not in env
|
||||||
|
assert "SMTP_UPSTREAM_PASS" not in env
|
||||||
assert "SMTP_PROBE_LIMIT" not in env
|
assert "SMTP_PROBE_LIMIT" not in env
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user