feat(swarm): syslog-over-TLS log pipeline (RFC 5425, TCP 6514)
Worker-side log_forwarder tails the local RFC 5424 log file and ships each line as an octet-counted frame to the master over mTLS. Offset is persisted in a tiny local SQLite so master outages never cause loss or duplication — reconnect resumes from the exact byte where the previous session left off. Impostor workers (cert not signed by DECNET CA) are rejected at TLS handshake. Master-side log_listener terminates mTLS on 0.0.0.0:6514, validates the client cert, extracts the peer CN as authoritative worker provenance, and appends each frame to the master's ingest log files. Attacker- controlled syslog HOSTNAME field is ignored — the CA-controlled CN is the only source of provenance. 7 tests added covering framing codec, offset persistence across reopens, end-to-end mTLS delivery, crash-resilience (offset survives restart, no duplicate shipping), and impostor-CA rejection. DECNET_SWARM_SYSLOG_PORT / DECNET_SWARM_MASTER_HOST env bindings added.
This commit is contained in:
@@ -77,6 +77,11 @@ DECNET_API_PORT: int = _port("DECNET_API_PORT", 8000)
|
|||||||
DECNET_JWT_SECRET: str = _require_env("DECNET_JWT_SECRET")
|
DECNET_JWT_SECRET: str = _require_env("DECNET_JWT_SECRET")
|
||||||
DECNET_INGEST_LOG_FILE: str | None = os.environ.get("DECNET_INGEST_LOG_FILE", "/var/log/decnet/decnet.log")
|
DECNET_INGEST_LOG_FILE: str | None = os.environ.get("DECNET_INGEST_LOG_FILE", "/var/log/decnet/decnet.log")
|
||||||
|
|
||||||
|
# SWARM log pipeline — RFC 5425 syslog-over-TLS between worker forwarders
|
||||||
|
# and the master listener. Plaintext syslog across hosts is forbidden.
|
||||||
|
DECNET_SWARM_SYSLOG_PORT: int = _port("DECNET_SWARM_SYSLOG_PORT", 6514)
|
||||||
|
DECNET_SWARM_MASTER_HOST: str | None = os.environ.get("DECNET_SWARM_MASTER_HOST")
|
||||||
|
|
||||||
# Ingester batching: how many log rows to accumulate per commit, and the
|
# Ingester batching: how many log rows to accumulate per commit, and the
|
||||||
# max wait (ms) before flushing a partial batch. Larger batches reduce
|
# max wait (ms) before flushing a partial batch. Larger batches reduce
|
||||||
# SQLite write-lock contention; the timeout keeps latency bounded during
|
# SQLite write-lock contention; the timeout keeps latency bounded during
|
||||||
|
|||||||
293
decnet/swarm/log_forwarder.py
Normal file
293
decnet/swarm/log_forwarder.py
Normal file
@@ -0,0 +1,293 @@
|
|||||||
|
"""Worker-side syslog-over-TLS forwarder (RFC 5425).
|
||||||
|
|
||||||
|
Runs alongside the worker agent. Tails the worker's local RFC 5424 log
|
||||||
|
file (written by the existing docker-collector) and ships each line to
|
||||||
|
the master's listener on TCP 6514 using octet-counted framing over mTLS.
|
||||||
|
Persists the last-forwarded byte offset in a tiny local SQLite so a
|
||||||
|
master crash never causes loss or duplication.
|
||||||
|
|
||||||
|
Design constraints (from the plan, non-negotiable):
|
||||||
|
* transport MUST be TLS — plaintext syslog is never acceptable between
|
||||||
|
hosts; only loopback (decky → worker-local collector) may be plaintext;
|
||||||
|
* mTLS — the listener pins the worker cert against the DECNET CA, so only
|
||||||
|
enrolled workers can push logs;
|
||||||
|
* offset persistence MUST be transactional w.r.t. the send — we only
|
||||||
|
advance the offset after ``writer.drain()`` returns without error.
|
||||||
|
|
||||||
|
The forwarder is intentionally a standalone coroutine, not a worker
|
||||||
|
inside the agent process. That keeps ``decnet agent`` crashes from
|
||||||
|
losing the log tail, and vice versa.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import os
|
||||||
|
import pathlib
|
||||||
|
import sqlite3
|
||||||
|
import ssl
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from decnet.logging import get_logger
|
||||||
|
from decnet.swarm import pki
|
||||||
|
|
||||||
|
log = get_logger("swarm.forwarder")
|
||||||
|
|
||||||
|
# RFC 5425 framing: "<octet-count> <syslog-msg>".
|
||||||
|
# The message itself is a standard RFC 5424 line (no trailing newline).
|
||||||
|
_FRAME_SEP = b" "
|
||||||
|
|
||||||
|
_INITIAL_BACKOFF = 1.0
|
||||||
|
_MAX_BACKOFF = 30.0
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
class ForwarderConfig:
    """Static configuration for one worker-side forwarder.

    Frozen so a running forwarder's parameters cannot drift mid-flight;
    construct a new config (and a new forwarder) to change anything.
    """

    log_path: pathlib.Path  # worker's RFC 5424 .log file (the tail source)
    master_host: str  # operator-supplied master address (no default on purpose)
    master_port: int = 6514  # IANA port for syslog-over-TLS (RFC 5425)
    agent_dir: pathlib.Path = pki.DEFAULT_AGENT_DIR  # holds worker.crt/.key/ca.crt
    state_db: Optional[pathlib.Path] = None  # default: agent_dir / "forwarder.db"
    # Max unacked bytes to keep in the local buffer when master is down.
    # We bound the lag to avoid unbounded disk growth on catastrophic master
    # outage — older lines are surfaced as a warning and dropped by advancing
    # the offset.
    max_lag_bytes: int = 128 * 1024 * 1024  # 128 MiB
|
||||||
|
|
||||||
|
|
||||||
|
# ------------------------------------------------------------ offset storage
|
||||||
|
|
||||||
|
|
||||||
|
class _OffsetStore:
    """Single-row SQLite offset tracker. Stdlib only — no ORM, no async."""

    def __init__(self, db_path: pathlib.Path) -> None:
        """Open (creating if needed) the offset database at *db_path*."""
        # sqlite3 will not create intermediate directories for us.
        db_path.parent.mkdir(parents=True, exist_ok=True)
        conn = sqlite3.connect(str(db_path))
        conn.execute(
            "CREATE TABLE IF NOT EXISTS forwarder_offset ("
            " key TEXT PRIMARY KEY, offset INTEGER NOT NULL)"
        )
        conn.commit()
        self._conn = conn

    def get(self, key: str = "default") -> int:
        """Return the persisted offset for *key*, or 0 if none was stored."""
        found = self._conn.execute(
            "SELECT offset FROM forwarder_offset WHERE key=?", (key,)
        ).fetchone()
        if found is None:
            return 0
        return int(found[0])

    def set(self, offset: int, key: str = "default") -> None:
        """Upsert and immediately commit the offset for *key*."""
        upsert = (
            "INSERT INTO forwarder_offset(key, offset) VALUES(?, ?) "
            "ON CONFLICT(key) DO UPDATE SET offset=excluded.offset"
        )
        self._conn.execute(upsert, (key, offset))
        self._conn.commit()

    def close(self) -> None:
        """Release the underlying SQLite connection."""
        self._conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------- TLS setup
|
||||||
|
|
||||||
|
|
||||||
|
def build_worker_ssl_context(agent_dir: pathlib.Path) -> ssl.SSLContext:
    """Client-side mTLS context for the forwarder.

    Worker presents its agent bundle (same cert used for the control-plane
    HTTPS listener). The CA is the DECNET CA; we pin by CA, not hostname,
    because workers reach masters by operator-supplied address.

    Raises:
        RuntimeError: when no worker bundle exists at *agent_dir* (the
            worker has not been enrolled yet).
    """
    bundle = pki.load_worker_bundle(agent_dir)
    if bundle is None:
        raise RuntimeError(
            f"no worker bundle at {agent_dir} — enroll from the master first"
        )
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # Syslog-over-TLS between hosts should never negotiate legacy protocol
    # versions; refuse anything below TLS 1.2.
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    ctx.load_cert_chain(
        certfile=str(agent_dir / "worker.crt"),
        keyfile=str(agent_dir / "worker.key"),
    )
    ctx.load_verify_locations(cafile=str(agent_dir / "ca.crt"))
    ctx.verify_mode = ssl.CERT_REQUIRED
    # Pin by CA, not hostname: masters are dialed by operator-supplied address
    # that need not match any SAN on the master cert.
    ctx.check_hostname = False
    return ctx
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------- frame encoding
|
||||||
|
|
||||||
|
|
||||||
|
def encode_frame(line: str) -> bytes:
    """RFC 5425 octet-counted framing: ``"<N> <msg>"``.

    ``N`` is the byte length of the payload that follows (after the space).
    """
    # Drop any trailing newline(s) the tailer left on the line; the frame
    # carries the bare RFC 5424 message.
    body = line.rstrip("\n").encode("utf-8", errors="replace")
    return b"%d " % len(body) + body
|
||||||
|
|
||||||
|
|
||||||
|
async def read_frame(reader: asyncio.StreamReader) -> Optional[bytes]:
    """Read one RFC 5425 octet-counted frame.

    Returns:
        The frame payload; ``None`` on clean EOF at a frame boundary; ``b""``
        when EOF arrives while a length prefix was only partially read
        (callers treat an empty frame as "nothing to process").

    Raises:
        ValueError: on a malformed length prefix (empty, non-digit, or
            longer than 10 digits).
        asyncio.IncompleteReadError: EOF while reading the counted payload.
    """
    # Read the ASCII length one byte at a time up to the first space, so a
    # malicious peer can't force us to buffer unbounded bytes before we know
    # it's a valid frame.
    prefix = b""
    while True:
        c = await reader.read(1)
        if not c:
            return None if not prefix else b""
        if c == b" ":
            if not prefix:
                # A leading space means there is no octet count at all —
                # previously this fell through to int(b"") with an opaque
                # "invalid literal" message.
                raise ValueError("empty octet-count prefix")
            break
        if len(prefix) >= 10 or not c.isdigit():
            # RFC 5425 caps the length prefix at ~10 digits (< 4 GiB payload).
            raise ValueError(f"invalid octet-count prefix: {prefix!r}")
        prefix += c
    return await reader.readexactly(int(prefix))
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------- main loop
|
||||||
|
|
||||||
|
|
||||||
|
async def _send_batch(
    writer: asyncio.StreamWriter,
    offset: int,
    lines: list[tuple[int, str]],
    store: _OffsetStore,
) -> int:
    """Ship every line as an RFC 5425 frame, drain, then persist the offset.

    ``offset`` is accepted for signature symmetry with the caller but is not
    read here — the authoritative positions travel with each
    ``(offset_after, line)`` pair. The store is only advanced after
    ``drain()`` succeeds, so a failed send never moves the offset.
    """
    frames = [encode_frame(text) for _, text in lines]
    for frame in frames:
        writer.write(frame)
    await writer.drain()
    shipped_to = lines[-1][0]
    store.set(shipped_to)
    return shipped_to
|
||||||
|
|
||||||
|
|
||||||
|
async def run_forwarder(
    cfg: ForwarderConfig,
    *,
    poll_interval: float = 0.5,
    stop_event: Optional[asyncio.Event] = None,
) -> None:
    """Main forwarder loop. Run as a dedicated task.

    Stops when ``stop_event`` is set (used by tests and clean shutdown).
    Exceptions trigger exponential backoff but are never fatal — the
    forwarder is expected to outlive transient master/network failures.

    Args:
        cfg: static forwarder configuration.
        poll_interval: seconds between tail polls when the log is idle.
        stop_event: optional cooperative shutdown signal.
    """
    state_db = cfg.state_db or (cfg.agent_dir / "forwarder.db")
    store = _OffsetStore(state_db)
    offset = store.get()
    backoff = _INITIAL_BACKOFF

    log.info(
        "forwarder start log=%s master=%s:%d offset=%d",
        cfg.log_path, cfg.master_host, cfg.master_port, offset,
    )

    try:
        while stop_event is None or not stop_event.is_set():
            try:
                ctx = build_worker_ssl_context(cfg.agent_dir)
                reader, writer = await asyncio.open_connection(
                    cfg.master_host, cfg.master_port, ssl=ctx
                )
                log.info("forwarder connected master=%s:%d", cfg.master_host, cfg.master_port)
                backoff = _INITIAL_BACKOFF
                try:
                    offset = await _pump(cfg, store, writer, offset, poll_interval, stop_event)
                finally:
                    writer.close()
                    try:
                        await writer.wait_closed()
                    except Exception:  # nosec B110 — socket cleanup is best-effort
                        pass
                    # Keep reader alive until here to avoid "reader garbage
                    # collected" warnings on some Python builds.
                    del reader
            except (OSError, ssl.SSLError, ConnectionError) as exc:
                # BUGFIX: _pump may have shipped (and persisted via the store)
                # many lines before the connection dropped, in which case our
                # local ``offset`` is stale — it was last assigned when _pump
                # *returned*, not when it raised. Resuming from the stale value
                # would re-ship lines the master already wrote. Re-read the
                # persisted offset so reconnects never duplicate.
                offset = store.get()
                log.warning(
                    "forwarder disconnected: %s — retrying in %.1fs", exc, backoff
                )
                # _sleep_unless_stopped already bounds the wait to ``backoff``
                # seconds (and wakes early on stop), so no extra wait_for
                # wrapper is needed.
                await _sleep_unless_stopped(backoff, stop_event)
                backoff = min(_MAX_BACKOFF, backoff * 2)
    finally:
        store.close()
        log.info("forwarder stopped offset=%d", offset)
|
||||||
|
|
||||||
|
|
||||||
|
async def _pump(
    cfg: ForwarderConfig,
    store: _OffsetStore,
    writer: asyncio.StreamWriter,
    offset: int,
    poll_interval: float,
    stop_event: Optional[asyncio.Event],
) -> int:
    """Read new lines since ``offset`` and ship them until disconnect.

    Returns the last persisted offset when ``stop_event`` fires; a network
    failure propagates out of ``_send_batch`` to the caller's retry loop.

    BUGFIX: the log is now opened in *binary* mode. Text-mode ``tell()``
    returns an opaque cookie (it may encode decoder state), which is not
    safe to compare against ``st_size`` or to do byte arithmetic on; binary
    ``tell()`` is a true byte offset, matching how the offset is used for
    rotation/lag checks above.
    """
    while stop_event is None or not stop_event.is_set():
        if not cfg.log_path.exists():
            await _sleep_unless_stopped(poll_interval, stop_event)
            continue

        stat = cfg.log_path.stat()
        if stat.st_size < offset:
            # File shrank beneath our offset: truncated/rotated — reset.
            log.warning("forwarder log rotated — resetting offset=0")
            offset = 0
            store.set(0)
        if stat.st_size - offset > cfg.max_lag_bytes:
            # Catastrophic lag — skip ahead to cap local disk pressure.
            skip_to = stat.st_size - cfg.max_lag_bytes
            log.warning(
                "forwarder lag %d > cap %d — dropping oldest %d bytes",
                stat.st_size - offset, cfg.max_lag_bytes, skip_to - offset,
            )
            offset = skip_to
            store.set(offset)

        if stat.st_size == offset:
            # Nothing new; idle-poll.
            await _sleep_unless_stopped(poll_interval, stop_event)
            continue

        batch: list[tuple[int, str]] = []
        with open(cfg.log_path, "rb") as f:
            f.seek(offset)
            while True:
                raw = f.readline()
                # Stop at EOF or on a partial line still being written —
                # it will be picked up complete on the next pass.
                if not raw or not raw.endswith(b"\n"):
                    break
                offset_after = f.tell()
                # Same lenient decoding the old text-mode open used; strip
                # CR too so CRLF input matches universal-newline behavior.
                text = raw.decode("utf-8", errors="replace").rstrip("\r\n")
                batch.append((offset_after, text))
                if len(batch) >= 500:
                    # Bound per-batch memory; the rest goes next iteration.
                    break
        if batch:
            offset = await _send_batch(writer, offset, batch, store)
    return offset
|
||||||
|
|
||||||
|
|
||||||
|
async def _sleep_unless_stopped(
    seconds: float, stop_event: Optional[asyncio.Event]
) -> None:
    """Sleep up to *seconds*, waking immediately if *stop_event* is set."""
    if stop_event is not None:
        try:
            await asyncio.wait_for(stop_event.wait(), timeout=seconds)
        except asyncio.TimeoutError:
            pass
        return
    # No cooperative stop signal — plain sleep.
    await asyncio.sleep(seconds)
|
||||||
|
|
||||||
|
|
||||||
|
# Re-exported for CLI convenience
DEFAULT_PORT = 6514


def default_master_host() -> Optional[str]:
    """Master address from the environment, or ``None`` when unset."""
    return os.getenv("DECNET_SWARM_MASTER_HOST")
|
||||||
194
decnet/swarm/log_listener.py
Normal file
194
decnet/swarm/log_listener.py
Normal file
@@ -0,0 +1,194 @@
|
|||||||
|
"""Master-side syslog-over-TLS listener (RFC 5425).
|
||||||
|
|
||||||
|
Accepts mTLS-authenticated worker connections on TCP 6514, reads
|
||||||
|
octet-counted frames, parses each as an RFC 5424 line, and appends it to
|
||||||
|
the master's local ingest log files. The existing log_ingestion_worker
|
||||||
|
tails those files and inserts records into the master repo — worker
|
||||||
|
provenance is embedded in the parsed record's ``source_worker`` field.
|
||||||
|
|
||||||
|
Design:
|
||||||
|
* TLS is mandatory. No plaintext fallback. A peer without a CA-signed
|
||||||
|
cert is rejected at the TLS handshake; nothing gets past the kernel.
|
||||||
|
* The listener never trusts the syslog HOSTNAME field for provenance —
|
||||||
|
that's attacker-supplied from the decky. The authoritative source is
|
||||||
|
the peer cert's CN, which the CA controlled at enrollment.
|
||||||
|
* Dropped connections are fine — the worker's forwarder holds the
|
||||||
|
offset and resumes from the same byte on reconnect.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import pathlib
|
||||||
|
import ssl
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from cryptography import x509
|
||||||
|
from cryptography.hazmat.primitives import serialization
|
||||||
|
from cryptography.x509.oid import NameOID
|
||||||
|
|
||||||
|
from decnet.logging import get_logger
|
||||||
|
from decnet.swarm import pki
|
||||||
|
from decnet.swarm.log_forwarder import read_frame
|
||||||
|
|
||||||
|
log = get_logger("swarm.listener")
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
class ListenerConfig:
    """Static configuration for the master-side syslog-over-TLS listener."""

    log_path: pathlib.Path  # master's RFC 5424 .log (forensic sink)
    json_path: pathlib.Path  # master's .json (ingester tails this)
    bind_host: str = "0.0.0.0"  # nosec B104 — listener must bind publicly
    bind_port: int = 6514  # IANA port for syslog-over-TLS (RFC 5425)
    ca_dir: pathlib.Path = pki.DEFAULT_CA_DIR  # root of the DECNET CA tree
|
||||||
|
|
||||||
|
|
||||||
|
# --------------------------------------------------------- TLS context
|
||||||
|
|
||||||
|
|
||||||
|
def build_listener_ssl_context(ca_dir: pathlib.Path) -> ssl.SSLContext:
    """Server-side mTLS context: master presents its master cert; clients
    must present a cert signed by the DECNET CA.

    Raises:
        RuntimeError: when the CA cert, server cert, or server key is
            missing — the message names the first absent file.
    """
    master_dir = ca_dir / "master"
    ca_cert = master_dir / "ca.crt"
    cert = master_dir / "worker.crt"  # master re-uses the 'worker' bundle layout
    key = master_dir / "worker.key"
    for p in (ca_cert, cert, key):
        if not p.exists():
            # Name the specific missing file — the old message only gave the
            # directory, which made enrollment failures hard to diagnose.
            raise RuntimeError(
                f"master identity missing at {master_dir} ({p.name} not found)"
                " — call ensure_master_identity first"
            )
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # Syslog-over-TLS should never negotiate legacy protocol versions.
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    ctx.load_cert_chain(certfile=str(cert), keyfile=str(key))
    ctx.load_verify_locations(cafile=str(ca_cert))
    # mTLS: a peer without a DECNET-CA-signed cert fails the handshake.
    ctx.verify_mode = ssl.CERT_REQUIRED
    return ctx
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------- helpers
|
||||||
|
|
||||||
|
|
||||||
|
def peer_cn(ssl_object: Optional[ssl.SSLObject]) -> str:
    """Extract the CN from the TLS peer certificate (worker provenance).

    Falls back to ``"unknown"`` on any parse error — we refuse to crash on
    malformed cert DNs and instead tag the message for later inspection.
    """
    der = None if ssl_object is None else ssl_object.getpeercert(binary_form=True)
    if der is None:
        return "unknown"
    try:
        subject = x509.load_der_x509_certificate(der).subject
        names = subject.get_attributes_for_oid(NameOID.COMMON_NAME)
        return names[0].value if names else "unknown"
    except Exception:  # nosec B110 — provenance is best-effort
        return "unknown"
|
||||||
|
|
||||||
|
|
||||||
|
def fingerprint_from_ssl(ssl_object: Optional[ssl.SSLObject]) -> Optional[str]:
    """Best-effort fingerprint of the peer cert, or ``None`` if unavailable."""
    der = None if ssl_object is None else ssl_object.getpeercert(binary_form=True)
    if der is None:
        return None
    try:
        parsed = x509.load_der_x509_certificate(der)
        pem = parsed.public_bytes(serialization.Encoding.PEM)
        return pki.fingerprint(pem)
    except Exception:
        # Malformed certs never crash the listener; callers treat None as
        # "no fingerprint available".
        return None
|
||||||
|
|
||||||
|
|
||||||
|
# --------------------------------------------------- per-connection handler
|
||||||
|
|
||||||
|
|
||||||
|
async def _handle_connection(
    reader: asyncio.StreamReader,
    writer: asyncio.StreamWriter,
    cfg: ListenerConfig,
) -> None:
    """Serve one authenticated worker connection until EOF or error.

    Reads octet-counted frames, appends the raw line to the forensic .log
    sink, and (when the line parses as RFC 5424) appends a JSON record —
    tagged with the CA-controlled peer CN as ``source_worker`` — to the
    ingest sink. The syslog HOSTNAME field is deliberately never used for
    provenance: it is attacker-controlled.
    """
    # Provenance comes from the mTLS peer cert, established at handshake.
    ssl_obj = writer.get_extra_info("ssl_object")
    cn = peer_cn(ssl_obj)
    peer = writer.get_extra_info("peername")
    log.info("listener accepted worker=%s peer=%s", cn, peer)

    # Lazy import to avoid a circular dep if the collector pulls in logger setup.
    from decnet.collector.worker import parse_rfc5424

    cfg.log_path.parent.mkdir(parents=True, exist_ok=True)
    cfg.json_path.parent.mkdir(parents=True, exist_ok=True)

    try:
        with open(cfg.log_path, "a", encoding="utf-8") as lf, open(
            cfg.json_path, "a", encoding="utf-8"
        ) as jf:
            while True:
                try:
                    frame = await read_frame(reader)
                except asyncio.IncompleteReadError:
                    # Peer vanished mid-payload; the forwarder's persisted
                    # offset makes the retry safe.
                    break
                except ValueError as exc:
                    # Malformed framing — drop the connection rather than
                    # try to resynchronize an untrusted stream.
                    log.warning("listener bad frame worker=%s err=%s", cn, exc)
                    break
                if frame is None:
                    # Clean EOF at a frame boundary.
                    break
                if not frame:
                    # Empty frame (EOF mid-prefix) — nothing to record.
                    continue
                line = frame.decode("utf-8", errors="replace")
                # Flush per line: the forensic sink must survive a crash.
                lf.write(line + "\n")
                lf.flush()
                parsed = parse_rfc5424(line)
                if parsed is not None:
                    # CA-controlled CN is the only provenance source.
                    parsed["source_worker"] = cn
                    jf.write(json.dumps(parsed) + "\n")
                    jf.flush()
                else:
                    log.debug("listener malformed RFC5424 worker=%s snippet=%r", cn, line[:80])
    except Exception as exc:
        log.warning("listener connection error worker=%s err=%s", cn, exc)
    finally:
        writer.close()
        try:
            await writer.wait_closed()
        except Exception:  # nosec B110 — socket cleanup is best-effort
            pass
        log.info("listener closed worker=%s", cn)
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------- server
|
||||||
|
|
||||||
|
|
||||||
|
async def run_listener(
    cfg: ListenerConfig,
    *,
    stop_event: Optional[asyncio.Event] = None,
) -> None:
    """Serve the mTLS syslog listener until *stop_event* fires (or forever).

    Args:
        cfg: listener configuration (bind address, sink paths, CA dir).
        stop_event: optional cooperative shutdown signal; when omitted the
            server runs until the task is cancelled.
    """
    ctx = build_listener_ssl_context(cfg.ca_dir)

    async def _client_cb(
        reader: asyncio.StreamReader, writer: asyncio.StreamWriter
    ) -> None:
        await _handle_connection(reader, writer, cfg)

    server = await asyncio.start_server(
        _client_cb, host=cfg.bind_host, port=cfg.bind_port, ssl=ctx
    )
    sockets = server.sockets or ()
    log.info(
        "listener bound host=%s port=%d sockets=%d",
        cfg.bind_host, cfg.bind_port, len(sockets),
    )
    async with server:
        if stop_event is None:
            await server.serve_forever()
        else:
            serve_task = asyncio.create_task(server.serve_forever())
            await stop_event.wait()
            server.close()
            serve_task.cancel()
            try:
                await serve_task
            except asyncio.CancelledError:
                pass  # expected: we just cancelled it
            except Exception as exc:
                # Previously grouped with CancelledError and silently
                # swallowed — a genuine serve failure deserves a log line.
                log.warning("listener serve task ended with error: %s", exc)
|
||||||
282
tests/swarm/test_log_forwarder.py
Normal file
282
tests/swarm/test_log_forwarder.py
Normal file
@@ -0,0 +1,282 @@
|
|||||||
|
"""Tests for the syslog-over-TLS pipeline.
|
||||||
|
|
||||||
|
Covers:
|
||||||
|
* octet-counted framing encode/decode (pure functions);
|
||||||
|
* offset persistence across reopens;
|
||||||
|
* end-to-end mTLS roundtrip forwarder → listener;
|
||||||
|
* impostor-CA worker is rejected at TLS handshake.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import pathlib
|
||||||
|
import socket
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import ssl
|
||||||
|
|
||||||
|
from decnet.swarm import log_forwarder as fwd
|
||||||
|
from decnet.swarm import log_listener as lst
|
||||||
|
from decnet.swarm import pki
|
||||||
|
from decnet.swarm.client import ensure_master_identity
|
||||||
|
|
||||||
|
|
||||||
|
def _free_port() -> int:
    """Grab an OS-assigned free TCP port (racy by nature, fine for tests)."""
    with socket.socket() as probe:
        probe.bind(("127.0.0.1", 0))
        return probe.getsockname()[1]
|
||||||
|
|
||||||
|
|
||||||
|
# ------------------------------------------------------------ pure framing
|
||||||
|
|
||||||
|
|
||||||
|
def test_encode_frame_matches_rfc5425_shape() -> None:
    """Frame is ``"<len> <msg>"``: ASCII digits, one space, UTF-8 payload."""
    frame = fwd.encode_frame("<13>1 2026-04-18T00:00:00Z decky01 svc - - - hi")
    count, payload = frame.split(b" ", 1)
    assert frame.startswith(b"47 ")
    assert payload.endswith(b"hi")
    assert int(count) == len(payload)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_read_frame_roundtrip() -> None:
    """encode_frame output survives a trip through read_frame unchanged."""
    message = b"<13>1 2026-04-18T00:00:00Z host app - - - msg"
    stream = asyncio.StreamReader()
    stream.feed_data(fwd.encode_frame(message.decode()))
    stream.feed_eof()
    assert await fwd.read_frame(stream) == message
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_read_frame_rejects_bad_prefix() -> None:
    """A non-numeric octet count must raise, not hang or return junk."""
    stream = asyncio.StreamReader()
    stream.feed_data(b"NOTANUMBER msg")
    stream.feed_eof()
    with pytest.raises(ValueError):
        await fwd.read_frame(stream)
|
||||||
|
|
||||||
|
|
||||||
|
# ------------------------------------------------------------- offset store
|
||||||
|
|
||||||
|
|
||||||
|
def test_offset_store_persists_across_reopen(tmp_path: pathlib.Path) -> None:
    """A value written by one store instance is visible to the next."""
    db = tmp_path / "fwd.db"

    first = fwd._OffsetStore(db)
    try:
        assert first.get() == 0
        first.set(4242)
    finally:
        first.close()

    second = fwd._OffsetStore(db)
    try:
        assert second.get() == 4242
    finally:
        second.close()
|
||||||
|
|
||||||
|
|
||||||
|
# ------------------------------------------------------------ TLS roundtrip
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
def _pki_env(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch):
    """Provision a throwaway CA, master identity, and enrolled worker bundle.

    Patches the pki module defaults so code under test resolves the temp
    directories instead of real on-disk locations.
    """
    ca_dir = tmp_path / "ca"
    pki.ensure_ca(ca_dir)
    # Master identity doubles as the listener's server cert. Its default
    # cert already carries a 127.0.0.1 SAN — relevant only if check_hostname
    # were ever enabled, but future-proof regardless.
    ensure_master_identity(ca_dir)

    # Worker bundle — enrolled with a 127.0.0.1 SAN.
    worker_dir = tmp_path / "agent"
    issued = pki.issue_worker_cert(pki.load_ca(ca_dir), "worker-x", ["127.0.0.1"])
    pki.write_worker_bundle(issued, worker_dir)

    monkeypatch.setattr(pki, "DEFAULT_CA_DIR", ca_dir)
    monkeypatch.setattr(pki, "DEFAULT_AGENT_DIR", worker_dir)
    return {"ca_dir": ca_dir, "worker_dir": worker_dir}
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_forwarder_to_listener_roundtrip(
    tmp_path: pathlib.Path, _pki_env: dict
) -> None:
    """End-to-end mTLS delivery: forwarder tails a local log, listener
    receives frames and tags provenance from the worker's cert CN."""
    port = _free_port()
    worker_log = tmp_path / "decnet.log"
    worker_log.write_text("")  # create empty

    master_log = tmp_path / "master.log"
    master_json = tmp_path / "master.json"

    listener_cfg = lst.ListenerConfig(
        log_path=master_log,
        json_path=master_json,
        bind_host="127.0.0.1",
        bind_port=port,
        ca_dir=_pki_env["ca_dir"],
    )
    fwd_cfg = fwd.ForwarderConfig(
        log_path=worker_log,
        master_host="127.0.0.1",
        master_port=port,
        agent_dir=_pki_env["worker_dir"],
        state_db=tmp_path / "fwd.db",
    )
    stop = asyncio.Event()

    listener_task = asyncio.create_task(lst.run_listener(listener_cfg, stop_event=stop))
    await asyncio.sleep(0.2)  # wait for bind

    forwarder_task = asyncio.create_task(
        fwd.run_forwarder(fwd_cfg, poll_interval=0.05, stop_event=stop)
    )

    # Write a few RFC 5424-ish lines into the worker log.
    sample = (
        '<13>1 2026-04-18T00:00:00Z decky01 ssh-service 1 - '
        '[decnet@53595 decky="decky01" service="ssh-service" event_type="connect" '
        'attacker_ip="1.2.3.4" attacker_port="4242"] ssh connect\n'
    )
    with open(worker_log, "a", encoding="utf-8") as f:
        for _ in range(3):
            f.write(sample)

    # Poll for delivery on the master side (up to ~5s).
    for _ in range(50):
        if master_log.exists() and master_log.stat().st_size > 0:
            break
        await asyncio.sleep(0.1)

    # Cooperative shutdown; cancel only if a task ignores the stop event.
    stop.set()
    for t in (forwarder_task, listener_task):
        try:
            await asyncio.wait_for(t, timeout=5)
        except asyncio.TimeoutError:
            t.cancel()

    assert master_log.exists()
    body = master_log.read_text()
    assert body.count("ssh connect") == 3
    # Worker provenance tagged in the JSON sink.
    assert master_json.exists()
    assert "worker-x" in master_json.read_text()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_forwarder_resumes_from_persisted_offset(
    tmp_path: pathlib.Path, _pki_env: dict
) -> None:
    """Simulate a listener outage: forwarder persists offset locally and,
    after the listener comes back, only ships lines added AFTER the crash."""
    port = _free_port()
    worker_log = tmp_path / "decnet.log"
    master_log = tmp_path / "master.log"
    master_json = tmp_path / "master.json"
    state_db = tmp_path / "fwd.db"

    # Pre-populate 2 lines and the offset store as if a previous forwarder run
    # had already delivered them. The new run must NOT re-ship them.
    line = (
        '<13>1 2026-04-18T00:00:00Z decky01 svc 1 - [x] old\n'
    )
    worker_log.write_text(line * 2)
    seed = fwd._OffsetStore(state_db)
    # len(line) * 2 is the byte offset just past the two seeded lines.
    seed.set(len(line) * 2)
    seed.close()

    listener_cfg = lst.ListenerConfig(
        log_path=master_log, json_path=master_json,
        bind_host="127.0.0.1", bind_port=port, ca_dir=_pki_env["ca_dir"],
    )
    fwd_cfg = fwd.ForwarderConfig(
        log_path=worker_log, master_host="127.0.0.1", master_port=port,
        agent_dir=_pki_env["worker_dir"], state_db=state_db,
    )
    stop = asyncio.Event()
    lt = asyncio.create_task(lst.run_listener(listener_cfg, stop_event=stop))
    await asyncio.sleep(0.2)
    ft = asyncio.create_task(fwd.run_forwarder(fwd_cfg, poll_interval=0.05, stop_event=stop))

    # Append a NEW line after startup — only this should reach the master.
    new_line = (
        '<13>1 2026-04-18T00:00:01Z decky01 svc 1 - [x] fresh\n'
    )
    with open(worker_log, "a", encoding="utf-8") as f:
        f.write(new_line)

    # Poll for delivery of the fresh line (up to ~5s).
    for _ in range(50):
        if master_log.exists() and b"fresh" in master_log.read_bytes():
            break
        await asyncio.sleep(0.1)

    stop.set()
    for t in (ft, lt):
        try:
            await asyncio.wait_for(t, timeout=5)
        except asyncio.TimeoutError:
            t.cancel()

    body = master_log.read_text()
    assert "fresh" in body
    assert "old" not in body, "forwarder re-shipped lines already acked before restart"
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
async def test_impostor_worker_rejected_at_tls(
    tmp_path: pathlib.Path, _pki_env: dict
) -> None:
    """A worker whose cert chains to a foreign CA must fail the handshake
    (or, at minimum, never get a log line accepted)."""
    port = _free_port()
    master_log = tmp_path / "master.log"
    master_json = tmp_path / "master.json"
    listener_cfg = lst.ListenerConfig(
        log_path=master_log,
        json_path=master_json,
        bind_host="127.0.0.1",
        bind_port=port,
        ca_dir=_pki_env["ca_dir"],
    )
    stop = asyncio.Event()
    listener_task = asyncio.create_task(lst.run_listener(listener_cfg, stop_event=stop))
    await asyncio.sleep(0.2)  # wait for bind

    try:
        # Build a forwarder SSL context from a DIFFERENT CA — should be rejected.
        evil_ca = pki.generate_ca("Evil CA")
        evil_dir = tmp_path / "evil"
        pki.write_worker_bundle(
            pki.issue_worker_cert(evil_ca, "evil-worker", ["127.0.0.1"]), evil_dir
        )

        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.load_cert_chain(str(evil_dir / "worker.crt"), str(evil_dir / "worker.key"))
        ctx.load_verify_locations(cafile=str(evil_dir / "ca.crt"))
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.check_hostname = False

        rejected = False
        try:
            r, w = await asyncio.open_connection("127.0.0.1", port, ssl=ctx)
            # If TLS somehow succeeded, push a byte and expect the server to drop.
            w.write(b"5 hello")
            await w.drain()
            # If the server accepted this from an unknown CA, that's a failure.
            await asyncio.sleep(0.2)
            w.close()
            try:
                await w.wait_closed()
            except Exception:
                pass
        except (ssl.SSLError, OSError, ConnectionError):
            rejected = True

        # BUGFIX: when the handshake is rejected the handler never runs, so
        # master_log may not even exist — the old unconditional .stat() could
        # itself raise FileNotFoundError and mask the real assertion.
        assert rejected or not master_log.exists() or master_log.stat().st_size == 0, (
            "impostor connection must be rejected or produce no log lines"
        )
    finally:
        stop.set()
        try:
            await asyncio.wait_for(listener_task, timeout=5)
        except asyncio.TimeoutError:
            listener_task.cancel()
|
||||||
Reference in New Issue
Block a user