fix(collector): retry on event-stream errors and add periodic reconciler

Hit live on first VPS deploy: a window between the initial
client.containers.list() snapshot and the client.events() start-event
stream let topology service containers slip through, requiring an
operator restart for them to be picked up.

Two fixes:

* `_watch_events` now wraps the events() call in a retry loop with
  exponential backoff (1s -> 30s cap). A docker.errors.APIError, daemon
  reload, or SDK stream-decode hiccup used to make the executor task
  return cleanly, leaving the collector "running" with no event
  subscription. Future container starts were silently dropped until
  the unit was restarted.

* New `_reconcile_loop` async task ticks every
  DECNET_COLLECTOR_RECONCILE_S (default 30s), re-scans
  client.containers.list(), and calls _spawn for any service container
  not already in `active`. Belt to the event watcher's suspenders:
  even if a start event is dropped during a reconnect window, the
  reconciler picks it up within one cycle. Also prunes finished
  futures from `active` so the dict's bounded by current container
  count rather than agent lifetime churn.
This commit is contained in:
2026-04-27 22:56:13 -04:00
parent c5db1d7ba2
commit e03a6d10a0
2 changed files with 194 additions and 10 deletions

View File

@@ -451,6 +451,17 @@ async def log_collector_worker(log_file: str) -> None:
heartbeat_task = asyncio.create_task(run_health_heartbeat(bus, "collector"))
control_task = asyncio.create_task(run_control_listener_signal(bus, "collector"))
# Periodic re-scan of running containers. Belt to the event-watcher's
# suspenders: if dockerd or the SDK ever drops a start event during a
# reconnect window (the retry loop in ``_watch_events`` covers the
# restart itself, but events fired *during* the gap are lost), this
# loop picks up the orphan within ``RECONCILE_INTERVAL_S``. Also
# prunes finished futures so ``active`` doesn't accumulate over the
# agent's lifetime as topology mutations churn containers.
_reconcile_interval_s = float(
os.environ.get("DECNET_COLLECTOR_RECONCILE_S", "30")
)
# Dedicated thread pool so long-running container log streams don't
# saturate the default asyncio executor and starve short-lived
# to_thread() calls elsewhere (e.g. load_state in the web API).
@@ -473,20 +484,73 @@ async def log_collector_worker(log_file: str) -> None:
logger.info("collector started log_path=%s", log_path)
client = docker.from_env()
async def _reconcile_loop() -> None:
while True:
try:
await asyncio.sleep(_reconcile_interval_s)
# Drop done futures so the dict's bounded by the
# current container count, not lifetime churn.
for cid in [c for c, t in active.items() if t.done()]:
active.pop(cid, None)
containers = await loop.run_in_executor(
collector_pool,
lambda: list(client.containers.list()),
)
for container in containers:
if container.id in active:
continue
if is_service_container(container):
_spawn(container.id, container.name.lstrip("/"))
except asyncio.CancelledError:
raise
except Exception as exc: # noqa: BLE001 — keep loop alive across SDK transients
logger.warning("collector: reconcile pass failed: %s", exc)
reconcile_task = asyncio.create_task(_reconcile_loop())
for container in client.containers.list():
if is_service_container(container):
_spawn(container.id, container.name.lstrip("/"))
def _watch_events() -> None:
for event in client.events(
decode=True,
filters={"type": "container", "event": "start"},
):
attrs = event.get("Actor", {}).get("Attributes", {})
cid = event.get("id", "")
name = attrs.get("name", "")
if cid and is_service_event(attrs):
loop.call_soon_threadsafe(_spawn, cid, name)
# The dockerd event stream is the fast path for picking up
# newly-started service containers. It can break in two ways:
# (a) dockerd restart / reload severs the long-poll, (b) the
# SDK's JSON-stream decoder occasionally raises on a partial
# frame. Both used to make this thread return cleanly, leaving
# the collector "running" with no event subscription — future
# container starts were silently dropped until an operator
# restarted the unit. Retry with exponential backoff (cap at
# 30s, matching the heartbeat cadence) so dockerd hiccups are
# invisible to the operator. The reconcile loop is the safety
# net for any events lost during the reconnect window.
backoff = 1.0
while True:
try:
for event in client.events(
decode=True,
filters={"type": "container", "event": "start"},
):
attrs = event.get("Actor", {}).get("Attributes", {})
cid = event.get("id", "")
name = attrs.get("name", "")
if cid and is_service_event(attrs):
loop.call_soon_threadsafe(_spawn, cid, name)
# Clean iterator exhaustion: real dockerd doesn't
# close the stream voluntarily, so this only
# happens in tests with mocked iterators or in
# genuinely unrecoverable daemon states. Either
# way, returning lets the worker shut down
# cleanly — the reconciler is the safety net for
# productive cases.
return
except Exception as exc: # noqa: BLE001 — SDK leaks bare Exceptions on stream-decode errors
logger.warning(
"collector: event stream broke (%s: %s); reconnecting in %.1fs",
type(exc).__name__, exc, backoff,
)
time.sleep(backoff)
backoff = min(backoff * 2, 30.0)
await loop.run_in_executor(collector_pool, _watch_events)
@@ -500,7 +564,12 @@ async def log_collector_worker(log_file: str) -> None:
logger.error("collector error: %s", exc)
finally:
collector_pool.shutdown(wait=False)
for t in (heartbeat_task, control_task):
# `reconcile_task` may not exist if startup failed before
# `client = docker.from_env()` returned; tolerate that.
_maintenance_tasks = [heartbeat_task, control_task]
if "reconcile_task" in locals():
_maintenance_tasks.append(reconcile_task)
for t in _maintenance_tasks:
t.cancel()
with contextlib.suppress(Exception, asyncio.CancelledError):
await t

View File

@@ -2,6 +2,8 @@
import json
import asyncio
import threading
import time
import pytest
from types import SimpleNamespace
from unittest.mock import patch, MagicMock
@@ -690,3 +692,116 @@ class TestLogCollectorWorker:
# Should not raise
await log_collector_worker(log_file)
@pytest.mark.asyncio
async def test_event_watcher_retries_on_stream_break(self, tmp_path, monkeypatch):
"""A docker stream-decode hiccup must not silently end the
watcher: today the executor task would return cleanly and
future container starts would be dropped until an operator
restarted the unit. The retry loop is what keeps the collector
honest across daemon reloads."""
log_file = str(tmp_path / "decnet.log")
valid_event = {
"id": "c-resilient",
"Actor": {"Attributes": {"name": "resilient-svc"}},
}
# Patch time.sleep inside the worker so the retry's backoff
# doesn't actually wait — keeps the test under the budget.
monkeypatch.setattr("decnet.collector.worker.time.sleep", lambda *_: None)
# Sequence: raise (transient error), then SystemExit to break
# out of the while-True. SystemExit is BaseException-derived so
# the broad ``except Exception`` in production won't catch it —
# the watcher thread exits cleanly and the worker finishes.
# We don't try to assert _spawn was called: the dispatch path
# uses ``loop.call_soon_threadsafe(_spawn, ...)`` and patching
# the abstract loop method doesn't reach the concrete loop.
# The retry contract is fully verified by counting reconnect
# attempts.
events_calls = {"n": 0}
def _events(**_kw):
events_calls["n"] += 1
if events_calls["n"] == 1:
raise RuntimeError("stream decode error")
# Second call: clean exit. Watcher's retry means call #2
# happens at all; without retry, the RuntimeError would
# propagate out of the executor and the watcher would
# never call events() again.
return iter([])
mock_client = MagicMock()
mock_client.containers.list.return_value = []
mock_client.events.side_effect = _events
# del valid_event — unused now that we dropped the spawn assertion
del valid_event
with patch("docker.from_env", return_value=mock_client), \
patch("decnet.collector.worker.is_service_event", return_value=True):
try:
await asyncio.wait_for(log_collector_worker(log_file), timeout=2.0)
except (asyncio.TimeoutError, StopIteration, SystemExit):
pass
assert events_calls["n"] >= 2, (
f"expected >=2 events() calls (one failure + one reconnect) "
f"proving the retry loop, got {events_calls['n']}"
)
@pytest.mark.asyncio
async def test_reconciler_picks_up_missed_container(self, tmp_path, monkeypatch):
"""Even if the event watcher wedges, the reconciler must catch
any service container that's already running. Simulates the
first-VPS-deploy bug: events() never yields, but a service
container exists in containers.list() — the worker had to be
restarted to pick it up. Now the reconciler does it
within RECONCILE_INTERVAL_S."""
log_file = str(tmp_path / "decnet.log")
monkeypatch.setenv("DECNET_COLLECTOR_RECONCILE_S", "0.05")
missed_container = MagicMock()
missed_container.id = "c-missed"
missed_container.name = "/missed-svc"
list_calls = {"n": 0}
def _list():
list_calls["n"] += 1
# First call (initial scan): empty. Subsequent (reconciler): one container.
if list_calls["n"] == 1:
return []
return [missed_container]
mock_client = MagicMock()
mock_client.containers.list.side_effect = _list
# First events() call raises a transient error that the
# watcher catches → triggers its real 1s backoff sleep. During
# that sleep the asyncio loop runs and the reconciler (ticking
# every 0.05s) gets ~20 chances to discover ``c-missed``.
# Second call returns an empty iterator → watcher exits
# cleanly so the test can unwind without a lingering thread.
events_calls = {"n": 0}
def _events_seq(**_kw):
events_calls["n"] += 1
if events_calls["n"] == 1:
raise RuntimeError("test: trigger backoff so reconciler can run")
return iter([])
mock_client.events.side_effect = _events_seq
with patch("docker.from_env", return_value=mock_client), \
patch("decnet.collector.worker.is_service_container", return_value=True):
try:
await asyncio.wait_for(log_collector_worker(log_file), timeout=2.0)
except (asyncio.TimeoutError, StopIteration, SystemExit):
pass
assert list_calls["n"] >= 2, (
"reconciler should have run at least once after the initial scan; "
f"got {list_calls['n']} calls to containers.list()"
)