Files
DECNET/tests/webhook/test_worker.py
anti efc98285aa fix(webhook/worker): self-heal when bus starts late or restarts
Before: if the bus was unreachable at worker start, we logged
"running in idle mode" once and parked on shutdown forever. systemd
doesn't guarantee bus is fully up before the webhook worker starts,
so a race on boot left the worker permanently dead until restart.

Now: wrap the whole bus-use in an outer reconnect loop.

  while not shutdown:
    try: connect()
    except: sleep(RECONNECT_SECS) ; continue
    try: run_with_bus(...)       # heartbeat + dispatch
    except: log+close ; reconnect on next iter

Clean consequence: if the bus dies mid-operation the dispatch loop's
subscriptions raise inside the consumer tasks, `_run_with_bus` exits,
the outer loop closes the stale connection and reconnects. No partial
state leaks across epochs — fresh bus, fresh subs, fresh heartbeat.

Interval is 60s by default, overridable via
DECNET_WEBHOOK_BUS_RECONNECT_SECS. Shutdown wakes the wait so
systemctl stop doesn't hang for a minute.

Test added: flaky get_bus that fails once, then returns a live
FakeBus — asserts retry + successful delivery.

get_app_bus() in decnet/bus/app.py already has a 2s backoff retry so
the FastAPI hot path self-heals; this commit brings the standalone
webhook worker in line with the same posture.
2026-04-24 16:39:38 -04:00

352 lines
12 KiB
Python

"""Webhook worker — bus consumer → HTTP egress integration test."""
from __future__ import annotations
import asyncio
import json
from datetime import datetime, timezone
from typing import Any
from unittest.mock import patch
import httpx
import pytest
from decnet.bus import topics as _topics
from decnet.webhook.worker import (
_patterns_for,
_union_patterns,
webhook_worker,
)
def _sub(
uuid: str,
name: str,
patterns: list[str],
*,
url: str = "https://w.example/x",
secret: str = "s" * 32,
enabled: bool = True,
) -> dict[str, Any]:
return {
"uuid": uuid,
"name": name,
"url": url,
"secret": secret,
"topic_patterns": json.dumps(patterns),
"enabled": enabled,
"consecutive_failures": 0,
"last_success_at": None,
"last_failure_at": None,
"last_error": None,
"created_at": datetime.now(timezone.utc),
"updated_at": datetime.now(timezone.utc),
}
class _FakeRepo:
def __init__(self, subs: list[dict[str, Any]]):
self.subs = subs
self.success_calls: list[str] = []
self.failure_calls: list[tuple[str, str]] = []
self.trip_calls: list[str] = []
self._failure_counts: dict[str, int] = {}
async def list_webhook_subscriptions(self, enabled_only: bool = False) -> list[dict[str, Any]]:
return [s for s in self.subs if s["enabled"]] if enabled_only else list(self.subs)
async def record_webhook_success(self, uuid: str, ts: datetime) -> None:
self.success_calls.append(uuid)
self._failure_counts[uuid] = 0
async def record_webhook_failure(self, uuid: str, ts: datetime, error: str) -> int:
self.failure_calls.append((uuid, error))
self._failure_counts[uuid] = self._failure_counts.get(uuid, 0) + 1
return self._failure_counts[uuid]
async def trip_webhook_circuit(self, uuid: str, ts: datetime) -> None:
self.trip_calls.append(uuid)
# Mirror the real DB effect: flip enabled=False so next reload
# skips this sub.
for s in self.subs:
if s["uuid"] == uuid:
s["enabled"] = False
s["auto_disabled_at"] = ts
def test_patterns_for_decodes_json():
assert _patterns_for(
{"topic_patterns": json.dumps(["attacker.>", "decky.*.state"])}
) == ["attacker.>", "decky.*.state"]
def test_patterns_for_bad_json_returns_empty():
assert _patterns_for({"topic_patterns": "not-json"}) == []
def test_union_patterns_dedupes_across_subs():
s1 = _sub("u1", "w1", ["attacker.>", "system.>"])
s2 = _sub("u2", "w2", ["system.>", "decky.*.state"])
assert _union_patterns([s1, s2]) == ["attacker.>", "system.>", "decky.*.state"]
@pytest.mark.asyncio
async def test_worker_dispatches_matching_event(fake_bus):
"""A bus event matching a sub's pattern should produce an HTTP POST."""
sub = _sub("u1", "w1", ["attacker.>"])
repo = _FakeRepo([sub])
captured: list[httpx.Request] = []
async def handler(request: httpx.Request) -> httpx.Response:
captured.append(request)
return httpx.Response(200)
async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client:
with patch("decnet.webhook.worker.get_bus", return_value=fake_bus):
task = asyncio.create_task(
webhook_worker(repo, reload_interval=0.5, http_client=client)
)
# Give the worker a moment to subscribe.
await asyncio.sleep(0.2)
await fake_bus.publish(
"attacker.observed",
{"ip": "1.2.3.4"},
event_type="first_sighting",
)
# Poll briefly for delivery.
for _ in range(40):
if captured:
break
await asyncio.sleep(0.05)
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
assert len(captured) == 1
req = captured[0]
assert req.headers.get("X-DECNET-Signature", "").startswith("sha256=")
assert "attacker.observed" in req.headers.get("X-DECNET-Event-Topic", "")
assert repo.success_calls == ["u1"]
@pytest.mark.asyncio
async def test_worker_ignores_non_matching_event(fake_bus):
"""An event outside the sub's pattern must not trigger a POST."""
sub = _sub("u1", "w1", ["attacker.>"])
repo = _FakeRepo([sub])
captured: list[httpx.Request] = []
async def handler(request: httpx.Request) -> httpx.Response:
captured.append(request)
return httpx.Response(200)
async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client:
with patch("decnet.webhook.worker.get_bus", return_value=fake_bus):
task = asyncio.create_task(
webhook_worker(repo, reload_interval=0.5, http_client=client)
)
await asyncio.sleep(0.2)
# system.log is NOT in attacker.>
await fake_bus.publish(
"system.log",
{"m": "irrelevant"},
event_type="batch_committed",
)
await asyncio.sleep(0.3)
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
assert captured == []
assert repo.success_calls == []
@pytest.mark.asyncio
async def test_worker_records_failure_on_5xx(fake_bus, monkeypatch):
sub = _sub("u1", "w1", ["attacker.>"])
repo = _FakeRepo([sub])
# Collapse the retry schedule to zero-delay so the test doesn't wait
# the real 1+2+4s backoff sequence.
monkeypatch.setattr(
"decnet.webhook.client._DEFAULT_RETRY_SCHEDULE", (0.0, 0.0, 0.0)
)
async def handler(request: httpx.Request) -> httpx.Response:
return httpx.Response(503)
async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client:
with patch("decnet.webhook.worker.get_bus", return_value=fake_bus):
task = asyncio.create_task(
webhook_worker(repo, reload_interval=0.5, http_client=client)
)
await asyncio.sleep(0.2)
await fake_bus.publish(
"attacker.observed", {"ip": "1.2.3.4"}, event_type="x"
)
for _ in range(80):
if repo.failure_calls:
break
await asyncio.sleep(0.05)
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
assert repo.failure_calls
assert repo.failure_calls[0][0] == "u1"
@pytest.mark.asyncio
async def test_worker_reloads_on_subscriptions_changed_signal(fake_bus):
"""A newly-enabled sub that arrives via the reload-signal path must
start receiving events without a worker restart."""
subs = [_sub("u1", "w1", ["attacker.>"])]
repo = _FakeRepo(subs)
captured: list[httpx.Request] = []
async def handler(request: httpx.Request) -> httpx.Response:
captured.append(request)
return httpx.Response(200)
async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client:
with patch("decnet.webhook.worker.get_bus", return_value=fake_bus):
task = asyncio.create_task(
webhook_worker(repo, reload_interval=60.0, http_client=client)
)
await asyncio.sleep(0.2)
# Hot-add a sub that wants system.>
subs.append(_sub("u2", "w2", ["system.>"]))
await fake_bus.publish(
_topics.WEBHOOK_SUBSCRIPTIONS_CHANGED, {}, event_type="changed"
)
await asyncio.sleep(0.3) # let worker reload + resubscribe
await fake_bus.publish(
"system.log", {"m": "hi"}, event_type="batch_committed"
)
for _ in range(80):
if captured:
break
await asyncio.sleep(0.05)
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
# The new sub (u2) should have received the system.log event.
assert len(captured) == 1
assert "system.log" in captured[0].headers.get("X-DECNET-Event-Topic", "")
@pytest.mark.asyncio
async def test_worker_trips_circuit_after_threshold(fake_bus, monkeypatch):
"""After N consecutive failures the worker auto-disables the sub."""
sub = _sub("u1", "w1", ["attacker.>"])
repo = _FakeRepo([sub])
# Tight threshold + zero-delay retry so the test finishes fast.
monkeypatch.setattr("decnet.webhook.worker._CIRCUIT_THRESHOLD", 2)
monkeypatch.setattr(
"decnet.webhook.client._DEFAULT_RETRY_SCHEDULE", (0.0, 0.0, 0.0)
)
async def handler(request: httpx.Request) -> httpx.Response:
return httpx.Response(503)
async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client:
with patch("decnet.webhook.worker.get_bus", return_value=fake_bus):
task = asyncio.create_task(
webhook_worker(repo, reload_interval=0.5, http_client=client)
)
await asyncio.sleep(0.2)
# Publish two events — each fails N retries, each increments
# consecutive_failures by 1. Second trip should fire.
await fake_bus.publish("attacker.observed", {}, event_type="x")
await fake_bus.publish("attacker.observed", {}, event_type="x")
for _ in range(120):
if repo.trip_calls:
break
await asyncio.sleep(0.05)
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
assert repo.trip_calls, "expected circuit to trip after threshold"
assert repo.trip_calls[0] == "u1"
# The sub was flipped to enabled=False by trip_webhook_circuit.
assert sub["enabled"] is False
assert sub["auto_disabled_at"] is not None
@pytest.mark.asyncio
async def test_worker_self_heals_when_bus_starts_late(fake_bus):
"""Bus down at startup → worker parks in a retry loop. Once the bus
comes up on the next attempt, the worker transitions to dispatch
mode and delivers events normally.
"""
sub = _sub("u1", "w1", ["attacker.>"])
repo = _FakeRepo([sub])
captured: list[httpx.Request] = []
async def handler(request: httpx.Request) -> httpx.Response:
captured.append(request)
return httpx.Response(200)
# First call: raise (bus unavailable). Second call: hand out the
# FakeBus. The worker's retry loop should bridge the gap.
calls = {"n": 0}
def flaky_get_bus(*args, **kwargs):
calls["n"] += 1
if calls["n"] == 1:
raise ConnectionError("bus not ready yet")
return fake_bus
async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client:
with patch("decnet.webhook.worker.get_bus", side_effect=flaky_get_bus):
task = asyncio.create_task(
webhook_worker(
repo,
reload_interval=0.5,
http_client=client,
bus_reconnect_secs=0.3,
)
)
# First attempt fails, worker waits bus_reconnect_secs, second
# attempt succeeds. Give it a generous window.
await asyncio.sleep(0.8)
await fake_bus.publish(
"attacker.observed", {"ip": "1.2.3.4"}, event_type="x"
)
for _ in range(60):
if captured:
break
await asyncio.sleep(0.05)
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
assert calls["n"] >= 2, "expected at least one retry after initial failure"
assert len(captured) == 1, "expected delivery once bus came up"