From 39d2077a3ab554be8be843ed1a9f1eb3acdea815 Mon Sep 17 00:00:00 2001
From: anti
Date: Sat, 18 Apr 2026 19:33:58 -0400
Subject: [PATCH] feat(swarm): syslog-over-TLS log pipeline (RFC 5425, TCP 6514)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Worker-side log_forwarder tails the local RFC 5424 log file and ships
each line as an octet-counted frame to the master over mTLS. The offset
is persisted in a tiny local SQLite database, so master outages never
cause duplication — reconnect resumes from the exact byte where the
previous session left off. Impostor workers (cert not signed by the
DECNET CA) are rejected at the TLS handshake.

Master-side log_listener terminates mTLS on 0.0.0.0:6514, validates the
client cert, extracts the peer CN as the authoritative worker
provenance, and appends each frame to the master's ingest log files.
The attacker-controllable syslog HOSTNAME field is ignored — the
CA-issued CN is the only source of provenance.

Seven tests added, covering the framing codec, offset persistence
across reopens, end-to-end mTLS delivery, crash resilience (offset
survives restart; no duplicate shipping), and impostor-CA rejection.

DECNET_SWARM_SYSLOG_PORT / DECNET_SWARM_MASTER_HOST env bindings added.
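
For illustration, one shipped frame for the 47-byte RFC 5424 line the
framing tests assert on — the octet count in ASCII digits, one space,
then the unmodified line:

    47 <13>1 2026-04-18T00:00:00Z decky01 svc - - - hi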
---
 decnet/env.py                     |   5 +
 decnet/swarm/log_forwarder.py     | 308 ++++++++++++++++++++++++++++++
 decnet/swarm/log_listener.py      | 208 ++++++++++++++++++++
 tests/swarm/test_log_forwarder.py | 291 ++++++++++++++++++++++++++++
 4 files changed, 812 insertions(+)
 create mode 100644 decnet/swarm/log_forwarder.py
 create mode 100644 decnet/swarm/log_listener.py
 create mode 100644 tests/swarm/test_log_forwarder.py

diff --git a/decnet/env.py b/decnet/env.py
index bcc5dba..a016c7a 100644
--- a/decnet/env.py
+++ b/decnet/env.py
@@ -77,6 +77,11 @@
 DECNET_API_PORT: int = _port("DECNET_API_PORT", 8000)
 DECNET_JWT_SECRET: str = _require_env("DECNET_JWT_SECRET")
 DECNET_INGEST_LOG_FILE: str | None = os.environ.get("DECNET_INGEST_LOG_FILE", "/var/log/decnet/decnet.log")
+# SWARM log pipeline — RFC 5425 syslog-over-TLS between worker forwarders
+# and the master listener. Plaintext syslog across hosts is forbidden.
+DECNET_SWARM_SYSLOG_PORT: int = _port("DECNET_SWARM_SYSLOG_PORT", 6514)
+DECNET_SWARM_MASTER_HOST: str | None = os.environ.get("DECNET_SWARM_MASTER_HOST")
+
 # Ingester batching: how many log rows to accumulate per commit, and the
 # max wait (ms) before flushing a partial batch. Larger batches reduce
 # SQLite write-lock contention; the timeout keeps latency bounded during
diff --git a/decnet/swarm/log_forwarder.py b/decnet/swarm/log_forwarder.py
new file mode 100644
index 0000000..0a87343
--- /dev/null
+++ b/decnet/swarm/log_forwarder.py
@@ -0,0 +1,308 @@
+"""Worker-side syslog-over-TLS forwarder (RFC 5425).
+
+Runs alongside the worker agent. Tails the worker's local RFC 5424 log
+file (written by the existing docker-collector) and ships each line to
+the master's listener on TCP 6514 using octet-counted framing over mTLS.
+Persists the last-forwarded byte offset in a tiny local SQLite so a
+master crash never causes duplication; reconnect resumes at that byte.
+
+Design constraints (from the plan, non-negotiable):
+* transport MUST be TLS — plaintext syslog is never acceptable between
+  hosts; only loopback (decky → worker-local collector) may be plaintext;
+* mTLS — the listener pins the worker cert against the DECNET CA, so only
+  enrolled workers can push logs;
+* offset persistence MUST be transactional w.r.t. the send — we only
+  advance the offset after ``writer.drain()`` returns without error.
+
+The forwarder is intentionally a standalone coroutine, not a worker
+inside the agent process. That keeps ``decnet agent`` crashes from
+losing the log tail, and vice versa.
+"""
+from __future__ import annotations
+
+import asyncio
+import os
+import pathlib
+import sqlite3
+import ssl
+from dataclasses import dataclass
+from typing import Optional
+
+from decnet.logging import get_logger
+from decnet.swarm import pki
+
+log = get_logger("swarm.forwarder")
+
+# RFC 5425 framing: "<MSG-LEN> <SYSLOG-MSG>".
+# The message itself is a standard RFC 5424 line (no trailing newline).
+_FRAME_SEP = b" "
+
+_INITIAL_BACKOFF = 1.0
+_MAX_BACKOFF = 30.0
+
+
+@dataclass(frozen=True)
+class ForwarderConfig:
+    log_path: pathlib.Path  # worker's RFC 5424 .log file
+    master_host: str
+    master_port: int = 6514
+    agent_dir: pathlib.Path = pki.DEFAULT_AGENT_DIR
+    state_db: Optional[pathlib.Path] = None  # default: agent_dir / "forwarder.db"
+    # Max unacked bytes to keep in the local buffer when master is down.
+    # We bound the lag to avoid unbounded disk growth on catastrophic master
+    # outage — older lines are surfaced as a warning and dropped by advancing
+    # the offset.
+    max_lag_bytes: int = 128 * 1024 * 1024  # 128 MiB
+
+
+# ------------------------------------------------------------ offset storage
+
+
+class _OffsetStore:
+    """Single-row SQLite offset tracker. Stdlib only — no ORM, no async."""
+
+    def __init__(self, db_path: pathlib.Path) -> None:
+        db_path.parent.mkdir(parents=True, exist_ok=True)
+        self._conn = sqlite3.connect(str(db_path))
+        self._conn.execute(
+            "CREATE TABLE IF NOT EXISTS forwarder_offset ("
+            " key TEXT PRIMARY KEY, offset INTEGER NOT NULL)"
+        )
+        self._conn.commit()
+
+    def get(self, key: str = "default") -> int:
+        row = self._conn.execute(
+            "SELECT offset FROM forwarder_offset WHERE key=?", (key,)
+        ).fetchone()
+        return int(row[0]) if row else 0
+
+    def set(self, offset: int, key: str = "default") -> None:
+        self._conn.execute(
+            "INSERT INTO forwarder_offset(key, offset) VALUES(?, ?) "
+            "ON CONFLICT(key) DO UPDATE SET offset=excluded.offset",
+            (key, offset),
+        )
+        self._conn.commit()
+
+    def close(self) -> None:
+        self._conn.close()
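+
+
+# Illustrative use of the store (paths hypothetical) — run_forwarder()
+# below is the real consumer:
+#
+#     store = _OffsetStore(pathlib.Path("/tmp/forwarder.db"))
+#     offset = store.get()               # 0 on the first ever run
+#     ... ship bytes offset..new_offset, await writer.drain() ...
+#     store.set(new_offset)              # persist only after a clean drain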
+
+
+# ---------------------------------------------------------------- TLS setup
+
+
+def build_worker_ssl_context(agent_dir: pathlib.Path) -> ssl.SSLContext:
+    """Client-side mTLS context for the forwarder.
+
+    Worker presents its agent bundle (same cert used for the control-plane
+    HTTPS listener). The CA is the DECNET CA; we pin by CA, not hostname,
+    because workers reach masters by operator-supplied address.
+    """
+    bundle = pki.load_worker_bundle(agent_dir)
+    if bundle is None:
+        raise RuntimeError(
+            f"no worker bundle at {agent_dir} — enroll from the master first"
+        )
+    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+    ctx.load_cert_chain(
+        certfile=str(agent_dir / "worker.crt"),
+        keyfile=str(agent_dir / "worker.key"),
+    )
+    ctx.load_verify_locations(cafile=str(agent_dir / "ca.crt"))
+    ctx.verify_mode = ssl.CERT_REQUIRED
+    ctx.check_hostname = False
+    return ctx
+
+
+# ----------------------------------------------------------- frame encoding
+
+
+def encode_frame(line: str) -> bytes:
+    """RFC 5425 octet-counted framing: ``"<N> <payload>"``.
+
+    ``N`` is the byte length of the payload that follows (after the space).
+    """
+    payload = line.rstrip("\n").encode("utf-8", errors="replace")
+    return f"{len(payload)}".encode("ascii") + _FRAME_SEP + payload
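+
+
+# A worked example of the framing, doctest-style (values checked by hand):
+#
+#     >>> encode_frame("hi")
+#     b'2 hi'
+#     >>> encode_frame("<13>1 - - - - - - ok\n")  # newline stripped; 20 bytes
+#     b'20 <13>1 - - - - - - ok'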
+
+
+async def read_frame(reader: asyncio.StreamReader) -> Optional[bytes]:
+    """Read one octet-counted frame. Returns None on clean EOF (b"" on a
+    truncated prefix)."""
+    # Read the ASCII length up to the first space. Bound the prefix so a
+    # malicious peer can't force us to buffer unbounded bytes before we know
+    # it's a valid frame.
+    prefix = b""
+    while True:
+        c = await reader.read(1)
+        if not c:
+            return None if not prefix else b""
+        if c == _FRAME_SEP:
+            break
+        if len(prefix) >= 10 or not c.isdigit():
+            # Cap the prefix at 10 digits — RFC 5425 itself sets no maximum.
+            raise ValueError(f"invalid octet-count prefix: {prefix!r}")
+        prefix += c
+    n = int(prefix)
+    buf = await reader.readexactly(n)
+    return buf
+
+
+# ----------------------------------------------------------------- main loop
+
+
+async def _send_batch(
+    writer: asyncio.StreamWriter,
+    lines: list[tuple[int, str]],
+    store: _OffsetStore,
+) -> int:
+    """Write every line as a frame, drain, then persist the last offset."""
+    for _, line in lines:
+        writer.write(encode_frame(line))
+    await writer.drain()
+    last_offset = lines[-1][0]
+    store.set(last_offset)
+    return last_offset
+
+
+async def run_forwarder(
+    cfg: ForwarderConfig,
+    *,
+    poll_interval: float = 0.5,
+    stop_event: Optional[asyncio.Event] = None,
+) -> None:
+    """Main forwarder loop. Run as a dedicated task.
+
+    Stops when ``stop_event`` is set (used by tests and clean shutdown).
+    Exceptions trigger exponential backoff but are never fatal — the
+    forwarder is expected to outlive transient master/network failures.
+    """
+    state_db = cfg.state_db or (cfg.agent_dir / "forwarder.db")
+    store = _OffsetStore(state_db)
+    offset = store.get()
+    backoff = _INITIAL_BACKOFF
+
+    log.info(
+        "forwarder start log=%s master=%s:%d offset=%d",
+        cfg.log_path, cfg.master_host, cfg.master_port, offset,
+    )
+
+    try:
+        while stop_event is None or not stop_event.is_set():
+            try:
+                ctx = build_worker_ssl_context(cfg.agent_dir)
+                reader, writer = await asyncio.open_connection(
+                    cfg.master_host, cfg.master_port, ssl=ctx
+                )
+                log.info("forwarder connected master=%s:%d", cfg.master_host, cfg.master_port)
+                backoff = _INITIAL_BACKOFF
+                try:
+                    offset = await _pump(cfg, store, writer, offset, poll_interval, stop_event)
+                finally:
+                    writer.close()
+                    try:
+                        await writer.wait_closed()
+                    except Exception:  # nosec B110 — socket cleanup is best-effort
+                        pass
+                    # Keep reader alive until here to avoid "reader garbage
+                    # collected" warnings on some Python builds.
+                    del reader
+            except (OSError, ssl.SSLError, ConnectionError) as exc:
+                log.warning(
+                    "forwarder disconnected: %s — retrying in %.1fs", exc, backoff
+                )
+                # _sleep_unless_stopped() already bounds its own wait at
+                # ``backoff`` seconds and returns early when stop_event is
+                # set, so no wait_for wrapper is needed here; a set event
+                # then ends the loop at the next while-condition check.
+                await _sleep_unless_stopped(backoff, stop_event)
+                backoff = min(_MAX_BACKOFF, backoff * 2)
+    finally:
+        store.close()
+        log.info("forwarder stopped offset=%d", offset)
+
+
+async def _pump(
+    cfg: ForwarderConfig,
+    store: _OffsetStore,
+    writer: asyncio.StreamWriter,
+    offset: int,
+    poll_interval: float,
+    stop_event: Optional[asyncio.Event],
+) -> int:
+    """Read new lines since ``offset`` and ship them until disconnect."""
+    while stop_event is None or not stop_event.is_set():
+        if not cfg.log_path.exists():
+            await _sleep_unless_stopped(poll_interval, stop_event)
+            continue
+
+        stat = cfg.log_path.stat()
+        if stat.st_size < offset:
+            # truncated/rotated — reset.
+            log.warning("forwarder log rotated — resetting offset=0")
+            offset = 0
+            store.set(0)
+        if stat.st_size - offset > cfg.max_lag_bytes:
+            # Catastrophic lag — skip ahead to cap local disk pressure.
+            skip_to = stat.st_size - cfg.max_lag_bytes
+            log.warning(
+                "forwarder lag %d > cap %d — dropping oldest %d bytes",
+                stat.st_size - offset, cfg.max_lag_bytes, skip_to - offset,
+            )
+            offset = skip_to
+            store.set(offset)
+
+        if stat.st_size == offset:
+            await _sleep_unless_stopped(poll_interval, stop_event)
+            continue
+
+        batch: list[tuple[int, str]] = []
+        with open(cfg.log_path, "r", encoding="utf-8", errors="replace") as f:
+            f.seek(offset)
+            while True:
+                line = f.readline()
+                if not line or not line.endswith("\n"):
+                    break
+                offset_after = f.tell()
+                batch.append((offset_after, line.rstrip("\n")))
+                if len(batch) >= 500:
+                    break
+        if batch:
+            offset = await _send_batch(writer, batch, store)
+    return offset
+
+
+async def _sleep_unless_stopped(
+    seconds: float, stop_event: Optional[asyncio.Event]
+) -> None:
+    if stop_event is None:
+        await asyncio.sleep(seconds)
+        return
+    try:
+        await asyncio.wait_for(stop_event.wait(), timeout=seconds)
+    except asyncio.TimeoutError:
+        pass
+
+
+# Re-exported for CLI convenience
+DEFAULT_PORT = 6514
+
+
+def default_master_host() -> Optional[str]:
+    return os.environ.get("DECNET_SWARM_MASTER_HOST")
diff --git a/decnet/swarm/log_listener.py b/decnet/swarm/log_listener.py
new file mode 100644
index 0000000..b3b4b39
--- /dev/null
+++ b/decnet/swarm/log_listener.py
@@ -0,0 +1,208 @@
+"""Master-side syslog-over-TLS listener (RFC 5425).
+
+Accepts mTLS-authenticated worker connections on TCP 6514, reads
+octet-counted frames, parses each as an RFC 5424 line, and appends it to
+the master's local ingest log files. The existing log_ingestion_worker
+tails those files and inserts records into the master repo — worker
+provenance is embedded in the parsed record's ``source_worker`` field.
+
+Design:
+* TLS is mandatory. No plaintext fallback. A peer without a CA-signed
+  cert is rejected at the TLS handshake; no frame is ever read from it.
+* The listener never trusts the syslog HOSTNAME field for provenance —
+  that's attacker-supplied from the decky. The authoritative source is
+  the peer cert's CN, which the CA controlled at enrollment.
+* Dropped connections are fine — the worker's forwarder holds the
+  offset and resumes from the same byte on reconnect.
+"""
+from __future__ import annotations
+
+import asyncio
+import json
+import pathlib
+import ssl
+from dataclasses import dataclass
+from typing import Optional
+
+from cryptography import x509
+from cryptography.hazmat.primitives import serialization
+from cryptography.x509.oid import NameOID
+
+from decnet.logging import get_logger
+from decnet.swarm import pki
+from decnet.swarm.log_forwarder import read_frame
+
+log = get_logger("swarm.listener")
+
+
+@dataclass(frozen=True)
+class ListenerConfig:
+    log_path: pathlib.Path  # master's RFC 5424 .log (forensic sink)
+    json_path: pathlib.Path  # master's .json (ingester tails this)
+    bind_host: str = "0.0.0.0"  # nosec B104 — listener must bind publicly
+    bind_port: int = 6514
+    ca_dir: pathlib.Path = pki.DEFAULT_CA_DIR
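+
+
+# A minimal startup sketch (paths hypothetical; run_listener below is the
+# real entry point):
+#
+#     cfg = ListenerConfig(log_path=pathlib.Path("/var/log/decnet/ingest.log"),
+#                          json_path=pathlib.Path("/var/log/decnet/ingest.json"))
+#     asyncio.run(run_listener(cfg))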
+
+
+# --------------------------------------------------------- TLS context
+
+
+def build_listener_ssl_context(ca_dir: pathlib.Path) -> ssl.SSLContext:
+    """Server-side mTLS context: master presents its master cert; clients
+    must present a cert signed by the DECNET CA."""
+    master_dir = ca_dir / "master"
+    ca_cert = master_dir / "ca.crt"
+    cert = master_dir / "worker.crt"  # master re-uses the 'worker' bundle layout
+    key = master_dir / "worker.key"
+    for p in (ca_cert, cert, key):
+        if not p.exists():
+            raise RuntimeError(
+                f"master identity missing at {master_dir} — call ensure_master_identity first"
+            )
+    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
+    ctx.load_cert_chain(certfile=str(cert), keyfile=str(key))
+    ctx.load_verify_locations(cafile=str(ca_cert))
+    ctx.verify_mode = ssl.CERT_REQUIRED
+    return ctx
+
+
+# ---------------------------------------------------------- helpers
+
+
+def peer_cn(ssl_object: Optional[ssl.SSLObject]) -> str:
+    """Extract the CN from the TLS peer certificate (worker provenance).
+
+    Falls back to ``"unknown"`` on any parse error — we refuse to crash on
+    malformed cert DNs and instead tag the message for later inspection.
+    """
+    if ssl_object is None:
+        return "unknown"
+    der = ssl_object.getpeercert(binary_form=True)
+    if der is None:
+        return "unknown"
+    try:
+        cert = x509.load_der_x509_certificate(der)
+        attrs = cert.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
+        return attrs[0].value if attrs else "unknown"
+    except Exception:  # nosec B110 — provenance is best-effort
+        return "unknown"
+
+
+def fingerprint_from_ssl(ssl_object: Optional[ssl.SSLObject]) -> Optional[str]:
+    if ssl_object is None:
+        return None
+    der = ssl_object.getpeercert(binary_form=True)
+    if der is None:
+        return None
+    try:
+        cert = x509.load_der_x509_certificate(der)
+        return pki.fingerprint(cert.public_bytes(serialization.Encoding.PEM))
+    except Exception:
+        return None
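+
+
+# Provenance rule, illustrated: a frame whose RFC 5424 HOSTNAME field
+# claims "master01" but that arrives over a session whose peer-cert CN is
+# "worker-7" is recorded with source_worker="worker-7" — the in-band
+# HOSTNAME is never consulted for identity.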
+
+
+# --------------------------------------------------- per-connection handler
+
+
+async def _handle_connection(
+    reader: asyncio.StreamReader,
+    writer: asyncio.StreamWriter,
+    cfg: ListenerConfig,
+) -> None:
+    ssl_obj = writer.get_extra_info("ssl_object")
+    cn = peer_cn(ssl_obj)
+    peer = writer.get_extra_info("peername")
+    log.info("listener accepted worker=%s peer=%s", cn, peer)
+
+    # Lazy import to avoid a circular dep if the collector pulls in logger setup.
+    from decnet.collector.worker import parse_rfc5424
+
+    cfg.log_path.parent.mkdir(parents=True, exist_ok=True)
+    cfg.json_path.parent.mkdir(parents=True, exist_ok=True)
+
+    try:
+        with open(cfg.log_path, "a", encoding="utf-8") as lf, open(
+            cfg.json_path, "a", encoding="utf-8"
+        ) as jf:
+            while True:
+                try:
+                    frame = await read_frame(reader)
+                except asyncio.IncompleteReadError:
+                    break
+                except ValueError as exc:
+                    log.warning("listener bad frame worker=%s err=%s", cn, exc)
+                    break
+                if frame is None:
+                    break
+                if not frame:
+                    continue
+                line = frame.decode("utf-8", errors="replace")
+                lf.write(line + "\n")
+                lf.flush()
+                parsed = parse_rfc5424(line)
+                if parsed is not None:
+                    parsed["source_worker"] = cn
+                    jf.write(json.dumps(parsed) + "\n")
+                    jf.flush()
+                else:
+                    log.debug("listener malformed RFC5424 worker=%s snippet=%r", cn, line[:80])
+    except Exception as exc:
+        log.warning("listener connection error worker=%s err=%s", cn, exc)
+    finally:
+        writer.close()
+        try:
+            await writer.wait_closed()
+        except Exception:  # nosec B110 — socket cleanup is best-effort
+            pass
+        log.info("listener closed worker=%s", cn)
+
+
+# ---------------------------------------------------------------- server
+
+
+async def run_listener(
+    cfg: ListenerConfig,
+    *,
+    stop_event: Optional[asyncio.Event] = None,
+) -> None:
+    ctx = build_listener_ssl_context(cfg.ca_dir)
+
+    async def _client_cb(
+        reader: asyncio.StreamReader, writer: asyncio.StreamWriter
+    ) -> None:
+        await _handle_connection(reader, writer, cfg)
+
+    server = await asyncio.start_server(
+        _client_cb, host=cfg.bind_host, port=cfg.bind_port, ssl=ctx
+    )
+    sockets = server.sockets or ()
+    log.info(
+        "listener bound host=%s port=%d sockets=%d",
+        cfg.bind_host, cfg.bind_port, len(sockets),
+    )
+    async with server:
+        if stop_event is None:
+            await server.serve_forever()
+        else:
+            serve_task = asyncio.create_task(server.serve_forever())
+            await stop_event.wait()
+            server.close()
+            serve_task.cancel()
+            try:
+                await serve_task
+            except (asyncio.CancelledError, Exception):  # nosec B110
+                pass
diff --git a/tests/swarm/test_log_forwarder.py b/tests/swarm/test_log_forwarder.py
new file mode 100644
index 0000000..596f7e4
--- /dev/null
+++ b/tests/swarm/test_log_forwarder.py
@@ -0,0 +1,291 @@
+"""Tests for the syslog-over-TLS pipeline.
+
+Covers:
+* octet-counted framing encode/decode (pure functions);
+* offset persistence across reopens;
+* end-to-end mTLS roundtrip forwarder → listener;
+* impostor-CA worker is rejected at TLS handshake.
+"""
+from __future__ import annotations
+
+import asyncio
+import pathlib
+import socket
+import ssl
+
+import pytest
+
+from decnet.swarm import log_forwarder as fwd
+from decnet.swarm import log_listener as lst
+from decnet.swarm import pki
+from decnet.swarm.client import ensure_master_identity
+
+
+def _free_port() -> int:
+    s = socket.socket()
+    s.bind(("127.0.0.1", 0))
+    port = s.getsockname()[1]
+    s.close()
+    return port
+
+
+# ------------------------------------------------------------ pure framing
+
+
+def test_encode_frame_matches_rfc5425_shape() -> None:
+    out = fwd.encode_frame("<13>1 2026-04-18T00:00:00Z decky01 svc - - - hi")
+    # "<N> <payload>" — ASCII digits, space, then the UTF-8 payload.
+    assert out.startswith(b"47 ")
+    assert out.endswith(b"hi")
+    assert int(out.split(b" ", 1)[0]) == len(out.split(b" ", 1)[1])
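+
+
+# Why 47: the sample payload "<13>1 2026-04-18T00:00:00Z decky01 svc - - - hi"
+# is 47 bytes of UTF-8, so the frame prefixes it with the ASCII digits "47"
+# and one space; the count covers the payload only, never the prefix itself.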
+
+
+@pytest.mark.asyncio
+async def test_read_frame_roundtrip() -> None:
+    payload = b"<13>1 2026-04-18T00:00:00Z host app - - - msg"
+    frame = fwd.encode_frame(payload.decode())
+    reader = asyncio.StreamReader()
+    reader.feed_data(frame)
+    reader.feed_eof()
+    got = await fwd.read_frame(reader)
+    assert got == payload
+
+
+@pytest.mark.asyncio
+async def test_read_frame_rejects_bad_prefix() -> None:
+    reader = asyncio.StreamReader()
+    reader.feed_data(b"NOTANUMBER msg")
+    reader.feed_eof()
+    with pytest.raises(ValueError):
+        await fwd.read_frame(reader)
+
+
+# ------------------------------------------------------------- offset store
+
+
+def test_offset_store_persists_across_reopen(tmp_path: pathlib.Path) -> None:
+    db = tmp_path / "fwd.db"
+    s1 = fwd._OffsetStore(db)
+    assert s1.get() == 0
+    s1.set(4242)
+    s1.close()
+
+    s2 = fwd._OffsetStore(db)
+    assert s2.get() == 4242
+    s2.close()
+
+
+# ------------------------------------------------------------ TLS roundtrip
+
+
+@pytest.fixture
+def _pki_env(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch):
+    ca_dir = tmp_path / "ca"
+    pki.ensure_ca(ca_dir)
+    # Master identity (also used as the listener's server cert). The default
+    # ensure_master_identity() cert already carries a 127.0.0.1 SAN, so
+    # workers could verify it if they ever enabled check_hostname; the
+    # forwarder pins by CA and leaves check_hostname off.
+    _ = ensure_master_identity(ca_dir)
+
+    # Worker bundle — enrolled with 127.0.0.1 SAN.
+    worker_dir = tmp_path / "agent"
+    issued = pki.issue_worker_cert(pki.load_ca(ca_dir), "worker-x", ["127.0.0.1"])
+    pki.write_worker_bundle(issued, worker_dir)
+
+    monkeypatch.setattr(pki, "DEFAULT_CA_DIR", ca_dir)
+    monkeypatch.setattr(pki, "DEFAULT_AGENT_DIR", worker_dir)
+    return {"ca_dir": ca_dir, "worker_dir": worker_dir}
+
+
+@pytest.mark.asyncio
+async def test_forwarder_to_listener_roundtrip(
+    tmp_path: pathlib.Path, _pki_env: dict
+) -> None:
+    port = _free_port()
+    worker_log = tmp_path / "decnet.log"
+    worker_log.write_text("")  # create empty
+
+    master_log = tmp_path / "master.log"
+    master_json = tmp_path / "master.json"
+
+    listener_cfg = lst.ListenerConfig(
+        log_path=master_log,
+        json_path=master_json,
+        bind_host="127.0.0.1",
+        bind_port=port,
+        ca_dir=_pki_env["ca_dir"],
+    )
+    fwd_cfg = fwd.ForwarderConfig(
+        log_path=worker_log,
+        master_host="127.0.0.1",
+        master_port=port,
+        agent_dir=_pki_env["worker_dir"],
+        state_db=tmp_path / "fwd.db",
+    )
+    stop = asyncio.Event()
+
+    listener_task = asyncio.create_task(lst.run_listener(listener_cfg, stop_event=stop))
+    await asyncio.sleep(0.2)  # wait for bind
+
+    forwarder_task = asyncio.create_task(
+        fwd.run_forwarder(fwd_cfg, poll_interval=0.05, stop_event=stop)
+    )
+
+    # Write a few RFC 5424-ish lines into the worker log.
+    sample = (
+        '<13>1 2026-04-18T00:00:00Z decky01 ssh-service 1 - '
+        '[decnet@53595 decky="decky01" service="ssh-service" event_type="connect" '
+        'attacker_ip="1.2.3.4" attacker_port="4242"] ssh connect\n'
+    )
+    with open(worker_log, "a", encoding="utf-8") as f:
+        for _ in range(3):
+            f.write(sample)
+
+    # Poll for delivery on the master side.
+    for _ in range(50):
+        if master_log.exists() and master_log.stat().st_size > 0:
+            break
+        await asyncio.sleep(0.1)
+
+    stop.set()
+    for t in (forwarder_task, listener_task):
+        try:
+            await asyncio.wait_for(t, timeout=5)
+        except asyncio.TimeoutError:
+            t.cancel()
+
+    assert master_log.exists()
+    body = master_log.read_text()
+    assert body.count("ssh connect") == 3
+    # Worker provenance tagged in the JSON sink.
+    assert master_json.exists()
+    assert "worker-x" in master_json.read_text()
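+
+
+# Note on the offset seeding below: offsets are absolute byte positions in
+# the worker log, so pre-setting the store to len(line) * 2 marks the two
+# pre-existing lines as already shipped before the forwarder ever starts.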
+
+
+@pytest.mark.asyncio
+async def test_forwarder_resumes_from_persisted_offset(
+    tmp_path: pathlib.Path, _pki_env: dict
+) -> None:
+    """Simulate a listener outage: forwarder persists offset locally and,
+    after the listener comes back, only ships lines added AFTER the crash."""
+    port = _free_port()
+    worker_log = tmp_path / "decnet.log"
+    master_log = tmp_path / "master.log"
+    master_json = tmp_path / "master.json"
+    state_db = tmp_path / "fwd.db"
+
+    # Pre-populate 2 lines and the offset store as if a previous forwarder run
+    # had already delivered them. The new run must NOT re-ship them.
+    line = (
+        '<13>1 2026-04-18T00:00:00Z decky01 svc 1 - [x] old\n'
+    )
+    worker_log.write_text(line * 2)
+    seed = fwd._OffsetStore(state_db)
+    seed.set(len(line) * 2)
+    seed.close()
+
+    listener_cfg = lst.ListenerConfig(
+        log_path=master_log, json_path=master_json,
+        bind_host="127.0.0.1", bind_port=port, ca_dir=_pki_env["ca_dir"],
+    )
+    fwd_cfg = fwd.ForwarderConfig(
+        log_path=worker_log, master_host="127.0.0.1", master_port=port,
+        agent_dir=_pki_env["worker_dir"], state_db=state_db,
+    )
+    stop = asyncio.Event()
+    lt = asyncio.create_task(lst.run_listener(listener_cfg, stop_event=stop))
+    await asyncio.sleep(0.2)
+    ft = asyncio.create_task(fwd.run_forwarder(fwd_cfg, poll_interval=0.05, stop_event=stop))
+
+    # Append a NEW line after startup — only this should reach the master.
+    new_line = (
+        '<13>1 2026-04-18T00:00:01Z decky01 svc 1 - [x] fresh\n'
+    )
+    with open(worker_log, "a", encoding="utf-8") as f:
+        f.write(new_line)
+
+    for _ in range(50):
+        if master_log.exists() and b"fresh" in master_log.read_bytes():
+            break
+        await asyncio.sleep(0.1)
+
+    stop.set()
+    for t in (ft, lt):
+        try:
+            await asyncio.wait_for(t, timeout=5)
+        except asyncio.TimeoutError:
+            t.cancel()
+
+    body = master_log.read_text()
+    assert "fresh" in body
+    assert "old" not in body, "forwarder re-shipped lines already acked before restart"
+
+
+@pytest.mark.asyncio
+async def test_impostor_worker_rejected_at_tls(
+    tmp_path: pathlib.Path, _pki_env: dict
+) -> None:
+    port = _free_port()
+    master_log = tmp_path / "master.log"
+    master_json = tmp_path / "master.json"
+    listener_cfg = lst.ListenerConfig(
+        log_path=master_log,
+        json_path=master_json,
+        bind_host="127.0.0.1",
+        bind_port=port,
+        ca_dir=_pki_env["ca_dir"],
+    )
+    stop = asyncio.Event()
+    listener_task = asyncio.create_task(lst.run_listener(listener_cfg, stop_event=stop))
+    await asyncio.sleep(0.2)
+
+    try:
+        # Build a forwarder SSL context from a DIFFERENT CA — should be rejected.
+        evil_ca = pki.generate_ca("Evil CA")
+        evil_dir = tmp_path / "evil"
+        pki.write_worker_bundle(
+            pki.issue_worker_cert(evil_ca, "evil-worker", ["127.0.0.1"]), evil_dir
+        )
+
+        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+        ctx.load_cert_chain(str(evil_dir / "worker.crt"), str(evil_dir / "worker.key"))
+        ctx.load_verify_locations(cafile=str(evil_dir / "ca.crt"))
+        ctx.verify_mode = ssl.CERT_REQUIRED
+        ctx.check_hostname = False
+
+        rejected = False
+        try:
+            r, w = await asyncio.open_connection("127.0.0.1", port, ssl=ctx)
+            # If TLS somehow succeeded, push a frame and expect the server to drop.
+            w.write(b"5 hello")
+            await w.drain()
+            # If the server accepted this from an unknown CA, that's a failure.
+            await asyncio.sleep(0.2)
+            w.close()
+            try:
+                await w.wait_closed()
+            except Exception:
+                pass
+        except (ssl.SSLError, OSError, ConnectionError):
+            rejected = True
+
+        assert rejected or not master_log.exists() or master_log.stat().st_size == 0, (
+            "impostor connection must be rejected or produce no log lines"
+        )
+    finally:
+        stop.set()
+        try:
+            await asyncio.wait_for(listener_task, timeout=5)
+        except asyncio.TimeoutError:
+            listener_task.cancel()