test(swarm): add forwarder/listener resilience scenarios
Covers failure modes the happy-path tests miss:
- log rotation (copytruncate): st_size shrinks under the forwarder; it resets offset=0 and re-ships the new contents instead of getting wedged past EOF;
- listener restart: the forwarder retries, resumes from the persisted offset, and the previously acked lines are NOT duplicated on the master;
- partial frame: the listener tolerates a well-authenticated client that sends a partial octet-count frame and drops — the server must stay up and accept follow-on connections;
- peer_cn / fingerprint_from_ssl degrade to 'unknown' / None when no peer cert is available (a defensive path that otherwise rarely fires).
This commit is contained in:
256
tests/swarm/test_forwarder_resilience.py
Normal file
256
tests/swarm/test_forwarder_resilience.py
Normal file
@@ -0,0 +1,256 @@
|
||||
"""Extra resilience tests for the syslog-over-TLS pipeline.
|
||||
|
||||
Covers failure modes the happy-path tests in test_log_forwarder.py don't
|
||||
exercise:
|
||||
|
||||
* log rotation (st_size shrinks under the forwarder) resets offset to 0
|
||||
and re-ships from the start;
|
||||
* listener restart — forwarder reconnects and continues from the last
|
||||
persisted offset, no duplicates;
|
||||
* listener tolerates a client that connects with a valid cert and drops
|
||||
mid-frame (IncompleteReadError path) without crashing the server task;
|
||||
* peer_cn + fingerprint_from_ssl degrade gracefully on missing/invalid
|
||||
peer certificates.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import pathlib
|
||||
import socket
|
||||
|
||||
import pytest
|
||||
import ssl
|
||||
|
||||
from decnet.swarm import log_forwarder as fwd
|
||||
from decnet.swarm import log_listener as lst
|
||||
from decnet.swarm import pki
|
||||
from decnet.swarm.client import ensure_master_identity
|
||||
|
||||
|
||||
SAMPLE = (
|
||||
'<13>1 2026-04-18T00:00:00Z decky01 svc 1 - '
|
||||
'[decnet@53595 decky="decky01" service="ssh-service" '
|
||||
'event_type="connect" attacker_ip="1.2.3.4" attacker_port="4242"] {msg}\n'
|
||||
)
|
||||
|
||||
|
||||
def _free_port() -> int:
    """Return a TCP port on 127.0.0.1 that was unused at the time of the call.

    A throwaway socket is bound to port 0 so the OS assigns a free port; the
    assigned number is read back and the socket is released.  The ``with``
    block guarantees the socket is closed even if ``bind`` or
    ``getsockname`` raises.

    Returns:
        The port number the OS handed out.

    NOTE: the socket is closed before the port is returned, so another
    process could claim the port before the caller binds it.
    """
    with socket.socket() as probe:
        probe.bind(("127.0.0.1", 0))
        return probe.getsockname()[1]
|
||||
|
||||
|
||||
@pytest.fixture
def _pki_env(tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch):
    """Build a throwaway PKI tree under ``tmp_path`` and point ``pki`` at it.

    Steps, in order:
      1. ``pki.ensure_ca`` on ``tmp_path / "ca"``;
      2. ``ensure_master_identity`` on the same directory;
      3. issue a worker cert for ``"worker-y"`` with ``["127.0.0.1"]`` as the
         list argument (presumably SANs — confirm against ``pki``) and write
         the bundle to ``tmp_path / "agent"``;
      4. monkeypatch ``pki.DEFAULT_CA_DIR`` / ``pki.DEFAULT_AGENT_DIR`` to
         those two directories for the duration of the test.

    Returns:
        ``{"ca_dir": <ca path>, "worker_dir": <agent path>}``.
    """
    paths = {"ca_dir": tmp_path / "ca", "worker_dir": tmp_path / "agent"}

    pki.ensure_ca(paths["ca_dir"])
    ensure_master_identity(paths["ca_dir"])

    authority = pki.load_ca(paths["ca_dir"])
    bundle = pki.issue_worker_cert(authority, "worker-y", ["127.0.0.1"])
    pki.write_worker_bundle(bundle, paths["worker_dir"])

    # Redirect the module-level defaults so code under test picks up the
    # temporary directories instead of whatever the real defaults are.
    for attr, key in (("DEFAULT_CA_DIR", "ca_dir"), ("DEFAULT_AGENT_DIR", "worker_dir")):
        monkeypatch.setattr(pki, attr, paths[key])

    return paths
|
||||
|
||||
|
||||
async def _wait_for(pred, timeout: float = 5.0, interval: float = 0.1) -> bool:
    """Poll ``pred`` until it returns a truthy value or ``timeout`` elapses.

    Args:
        pred: zero-argument callable evaluated synchronously on each poll.
        timeout: maximum time to keep polling, in seconds.
        interval: pause between polls, in seconds (``0`` just yields to the
            event loop between polls).

    Returns:
        ``True`` as soon as ``pred()`` is truthy, ``False`` if the deadline
        passes first.

    The loop is driven by the event loop's monotonic clock rather than a
    precomputed step count.  That avoids three problems of the
    ``int(timeout / interval)`` approach: float truncation dropping a poll
    (``int(0.03 / 0.01) == 2``), ``ZeroDivisionError`` for ``interval=0``,
    and never re-checking ``pred`` after the final sleep.  ``pred`` is always
    evaluated at least once, and once more after the last sleep.
    """
    clock = asyncio.get_running_loop().time
    deadline = clock() + timeout
    while True:
        if pred():
            return True
        if clock() >= deadline:
            return False
        await asyncio.sleep(interval)
|
||||
|
||||
|
||||
# ----------------------------------------------------------- pure helpers
|
||||
|
||||
|
||||
def test_peer_cn_returns_unknown_when_no_ssl_object() -> None:
    """``lst.peer_cn`` must fall back to ``"unknown"`` when given ``None``."""
    resolved_cn = lst.peer_cn(None)
    assert resolved_cn == "unknown"
|
||||
|
||||
|
||||
def test_fingerprint_from_ssl_handles_missing_peer_cert() -> None:
    """``lst.fingerprint_from_ssl`` must return ``None`` when given ``None``."""
    fingerprint = lst.fingerprint_from_ssl(None)
    assert fingerprint is None
|
||||
|
||||
|
||||
# ---------------------------------------------------- rotation / crash loops
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_forwarder_reships_after_log_rotation(
    tmp_path: pathlib.Path, _pki_env: dict
) -> None:
    """If the log file shrinks (logrotate truncation), the forwarder must
    reset offset=0 and re-ship the new contents — never get stuck past EOF.

    The listener and forwarder run as background tasks sharing one stop
    event.  All assertions live inside ``try`` so that a failing assertion
    still signals the tasks to stop and waits for them, instead of leaving
    them running on the event loop.
    """
    port = _free_port()
    worker_log = tmp_path / "decnet.log"
    master_log = tmp_path / "master.log"
    master_json = tmp_path / "master.json"

    listener_cfg = lst.ListenerConfig(
        log_path=master_log, json_path=master_json,
        bind_host="127.0.0.1", bind_port=port, ca_dir=_pki_env["ca_dir"],
    )
    fwd_cfg = fwd.ForwarderConfig(
        log_path=worker_log, master_host="127.0.0.1", master_port=port,
        agent_dir=_pki_env["worker_dir"], state_db=tmp_path / "fwd.db",
    )
    stop = asyncio.Event()
    running = []  # tasks to wind down in ``finally``, in shutdown order
    lt = asyncio.create_task(lst.run_listener(listener_cfg, stop_event=stop))
    try:
        # Give the listener a moment to bind before the forwarder dials in.
        await asyncio.sleep(0.2)
        ft = asyncio.create_task(
            fwd.run_forwarder(fwd_cfg, poll_interval=0.05, stop_event=stop)
        )
        # Forwarder first, listener last — same order as a clean shutdown.
        running.append(ft)

        # Phase 1: write TWO pre-rotation lines so the offset is deep into the file.
        worker_log.write_text(SAMPLE.format(msg="rotate-A") + SAMPLE.format(msg="rotate-B"))
        ok = await _wait_for(lambda: master_log.exists() and b"rotate-B" in master_log.read_bytes())
        assert ok, "pre-rotation lines never reached master"
        size_before_rotate = master_log.stat().st_size

        # Phase 2: rotate (truncate to a strictly SHORTER content) so the
        # forwarder's offset tracker lands past EOF and must reset to 0.
        worker_log.write_text(SAMPLE.format(msg="P"))

        ok = await _wait_for(
            lambda: master_log.stat().st_size > size_before_rotate
            and master_log.read_text().rstrip().endswith("P"),
            timeout=5.0,
        )
        assert ok, "forwarder got stuck past EOF after rotation (expected reset → ship post-rotate 'P' line)"
    finally:
        running.append(lt)
        stop.set()
        for t in running:
            try:
                await asyncio.wait_for(t, timeout=5)
            except asyncio.TimeoutError:
                t.cancel()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_forwarder_resumes_after_listener_restart(
    tmp_path: pathlib.Path, _pki_env: dict
) -> None:
    """Listener goes down mid-session, forwarder retries with backoff; on
    restart, we must NOT re-ship lines that were already drained.

    Three background tasks are involved (first listener, forwarder, second
    listener).  ``running`` tracks the tasks that are still live so that the
    ``finally`` block can stop and await exactly those, even when an
    assertion fails part-way through.
    """
    port = _free_port()
    worker_log = tmp_path / "decnet.log"
    master_log = tmp_path / "master.log"
    master_json = tmp_path / "master.json"
    state_db = tmp_path / "fwd.db"

    listener_cfg = lst.ListenerConfig(
        log_path=master_log, json_path=master_json,
        bind_host="127.0.0.1", bind_port=port, ca_dir=_pki_env["ca_dir"],
    )
    fwd_cfg = fwd.ForwarderConfig(
        log_path=worker_log, master_host="127.0.0.1", master_port=port,
        agent_dir=_pki_env["worker_dir"], state_db=state_db,
    )

    stop1 = asyncio.Event()
    stop_fwd = asyncio.Event()
    stop2 = asyncio.Event()
    running = []  # live tasks, kept in the order they should be awaited

    try:
        # --- phase 1 ------------------------------------------------------
        lt1 = asyncio.create_task(lst.run_listener(listener_cfg, stop_event=stop1))
        running.append(lt1)
        await asyncio.sleep(0.2)
        ft = asyncio.create_task(
            fwd.run_forwarder(fwd_cfg, poll_interval=0.05, stop_event=stop_fwd)
        )
        # Forwarder goes first so it is awaited before any listener.
        running.insert(0, ft)

        worker_log.write_text(SAMPLE.format(msg="before-outage"))
        ok = await _wait_for(lambda: master_log.exists() and b"before-outage" in master_log.read_bytes())
        assert ok, "phase-1 line never reached master"

        # --- outage -------------------------------------------------------
        stop1.set()
        # lt1 is handled right here; drop it so ``finally`` does not await a
        # task that may have been cancelled.
        running.remove(lt1)
        try:
            await asyncio.wait_for(lt1, timeout=5)
        except asyncio.TimeoutError:
            lt1.cancel()

        # While listener is down, append another line. Forwarder will retry.
        with open(worker_log, "a", encoding="utf-8") as f:
            f.write(SAMPLE.format(msg="during-outage"))

        await asyncio.sleep(0.3)

        # --- phase 2: listener back ---------------------------------------
        lt2 = asyncio.create_task(lst.run_listener(listener_cfg, stop_event=stop2))
        running.append(lt2)

        ok = await _wait_for(lambda: b"during-outage" in master_log.read_bytes(), timeout=15.0)
        assert ok, "forwarder never reshipped the buffered line after listener restart"

        # Crucially, "before-outage" appears exactly once — not re-shipped.
        body = master_log.read_text()
        assert body.count("before-outage") == 1, "forwarder duplicated a line across reconnect"
        assert body.count("during-outage") == 1
    finally:
        # --- shutdown -----------------------------------------------------
        # Setting an event whose task already finished is harmless.
        stop_fwd.set()
        stop1.set()
        stop2.set()
        for t in running:
            try:
                await asyncio.wait_for(t, timeout=5)
            except asyncio.TimeoutError:
                t.cancel()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
async def test_listener_tolerates_client_dropping_mid_stream(
    tmp_path: pathlib.Path, _pki_env: dict
) -> None:
    """A client with a valid cert that announces a frame, sends none of it and
    disconnects must neither crash the listener nor block later clients.

    Two TLS connections are made with the worker bundle from ``_pki_env``:
    the first writes only the octet-count prefix ``"99 "`` and closes; the
    second ships one complete frame, which must show up in the master log.
    """

    async def _hang_up(writer) -> None:
        # Best-effort close: errors while the TLS stream winds down are
        # irrelevant to what this test checks.
        writer.close()
        try:
            await writer.wait_closed()
        except Exception:  # nosec B110
            pass

    port = _free_port()
    master_log = tmp_path / "master.log"
    master_json = tmp_path / "master.json"
    listener_cfg = lst.ListenerConfig(
        log_path=master_log, json_path=master_json,
        bind_host="127.0.0.1", bind_port=port, ca_dir=_pki_env["ca_dir"],
    )
    stop = asyncio.Event()
    listener_task = asyncio.create_task(lst.run_listener(listener_cfg, stop_event=stop))
    await asyncio.sleep(0.2)

    try:
        # Shared client-side TLS context, authenticated with the worker bundle.
        bundle_dir = _pki_env["worker_dir"]
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.load_cert_chain(
            str(bundle_dir / "worker.crt"),
            str(bundle_dir / "worker.key"),
        )
        ctx.load_verify_locations(cafile=str(bundle_dir / "ca.crt"))
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.check_hostname = False

        # Client 1: announce 99 bytes, deliver none, then disconnect.
        _, bad_writer = await asyncio.open_connection("127.0.0.1", port, ssl=ctx)
        bad_writer.write(b"99 ")  # promise 99 bytes, send 0
        await bad_writer.drain()
        await _hang_up(bad_writer)

        # Client 2: a well-behaved connection shipping one whole frame. It
        # can only land in the log if the listener survived client 1.
        _, good_writer = await asyncio.open_connection("127.0.0.1", port, ssl=ctx)
        payload = b'<13>1 2026-04-18T00:00:00Z decky01 svc - - - post-drop'
        good_writer.write(f"{len(payload)} ".encode() + payload)
        await good_writer.drain()
        await _hang_up(good_writer)

        def _frame_logged() -> bool:
            return master_log.exists() and b"post-drop" in master_log.read_bytes()

        ok = await _wait_for(_frame_logged)
        assert ok, "listener got wedged by a mid-frame client drop"
    finally:
        stop.set()
        try:
            await asyncio.wait_for(listener_task, timeout=5)
        except asyncio.TimeoutError:
            listener_task.cancel()
|
||||
Reference in New Issue
Block a user