diff --git a/decnet/cli/__init__.py b/decnet/cli/__init__.py index c51e7c25..1f7829dd 100644 --- a/decnet/cli/__init__.py +++ b/decnet/cli/__init__.py @@ -37,6 +37,7 @@ from . import ( topology, updater, web, + webhook, workers, ) from .gating import _gate_commands_by_mode @@ -54,7 +55,7 @@ for _mod in ( swarm, deploy, lifecycle, workers, inventory, web, profiler, sniffer, db, - topology, bus, geoip, init, + topology, bus, geoip, init, webhook, ): _mod.register(app) diff --git a/decnet/cli/gating.py b/decnet/cli/gating.py index a685cc6b..2b36236a 100644 --- a/decnet/cli/gating.py +++ b/decnet/cli/gating.py @@ -29,7 +29,7 @@ MASTER_ONLY_COMMANDS: frozenset[str] = frozenset({ "api", "swarmctl", "deploy", "redeploy", "teardown", "mutate", "listener", "profiler", "services", "distros", "correlate", "archetypes", "web", - "db-reset", "init", + "db-reset", "init", "webhook", }) MASTER_ONLY_GROUPS: frozenset[str] = frozenset({"swarm", "topology", "geoip"}) diff --git a/decnet/cli/webhook.py b/decnet/cli/webhook.py new file mode 100644 index 00000000..e01a4586 --- /dev/null +++ b/decnet/cli/webhook.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +import typer + +from . import utils as _utils +from .utils import console, log + + +def register(app: typer.Typer) -> None: + @app.command(name="webhook") + def webhook_cmd( + daemon: bool = typer.Option( + False, "--daemon", "-d", help="Detach to background as a daemon process" + ), + ) -> None: + """Run the webhook dispatcher — bus consumer → external HTTP egress.""" + import asyncio + from decnet.web.dependencies import repo + from decnet.webhook import webhook_worker + + if daemon: + log.info("webhook daemonizing") + _utils._daemonize() + + log.info("webhook starting") + console.print("[bold cyan]Webhook dispatcher starting[/]") + + async def _run() -> None: + await repo.initialize() + await webhook_worker(repo) + + try: + asyncio.run(_run()) + except KeyboardInterrupt: + console.print("\n[yellow]Webhook worker stopped.[/]") diff --git a/decnet/webhook/__init__.py b/decnet/webhook/__init__.py index 51d405a4..a70e6632 100644 --- a/decnet/webhook/__init__.py +++ b/decnet/webhook/__init__.py @@ -1 +1,4 @@ """External webhook egress — ship bus events to SIEM/SOAR stacks.""" +from decnet.webhook.worker import webhook_worker + +__all__ = ["webhook_worker"] diff --git a/decnet/webhook/worker.py b/decnet/webhook/worker.py new file mode 100644 index 00000000..fcdf94eb --- /dev/null +++ b/decnet/webhook/worker.py @@ -0,0 +1,248 @@ +"""Webhook dispatcher — bus consumer → HTTP egress. + +Spawns one asyncio task per (subscription, pattern) pair. Each task +subscribes to the bus, iterates matching events, and POSTs them via +`decnet.webhook.client.deliver`. Reloads on `WEBHOOK_SUBSCRIPTIONS_CHANGED` +bus signals and as a slow fallback so a dropped signal costs latency, +not correctness. + +One-task-per-pair is deliberately dumb: cancellation propagates cleanly, +and the bus's own trie does the actual pattern matching — no in-memory +filter logic to maintain. Scales fine up to thousands of subs; if that +ever breaks down we collapse to one task per distinct pattern and add +in-memory dispatch. +""" +from __future__ import annotations + +import asyncio +import contextlib +import json +from datetime import datetime, timezone +from typing import Any + +import httpx + +from decnet.bus.factory import get_bus +from decnet.bus.publish import run_control_listener, run_health_heartbeat +from decnet.bus import topics as _topics +from decnet.logging import get_logger +from decnet.web.db.repository import BaseRepository +from decnet.webhook.client import deliver + +logger = get_logger("webhook_worker") + + +_RELOAD_FALLBACK_SECS = 60.0 +# Max parallel HTTP egress — one global semaphore keeps the process's +# outbound footprint bounded regardless of event volume. +_EGRESS_CONCURRENCY = 10 + + +def _patterns_for(sub: dict[str, Any]) -> list[str]: + raw = sub.get("topic_patterns") or "[]" + try: + return [p for p in json.loads(raw) if isinstance(p, str)] + except (ValueError, TypeError): + return [] + + +def _union_patterns(subs: list[dict[str, Any]]) -> list[str]: + """Dedup patterns across all subs, preserving first-occurrence order.""" + seen: set[str] = set() + out: list[str] = [] + for sub in subs: + for p in _patterns_for(sub): + if p not in seen: + seen.add(p) + out.append(p) + return out + + +async def webhook_worker( + repo: BaseRepository, + *, + reload_interval: float = _RELOAD_FALLBACK_SECS, + http_client: httpx.AsyncClient | None = None, +) -> None: + """Main entry — connect bus, spawn per-subscription delivery tasks, + reload on signal.""" + logger.info("webhook worker started") + + bus = None + try: + bus = get_bus(client_name="webhook") + await bus.connect() + except Exception as exc: # noqa: BLE001 — bus is optional (DEBT-031) + logger.warning("webhook: bus unavailable, running in idle mode: %s", exc) + bus = None + + shutdown = asyncio.Event() + reload_flag = asyncio.Event() + + heartbeat_task = ( + asyncio.create_task(run_health_heartbeat(bus, "webhook")) + if bus is not None else None + ) + control_task = ( + asyncio.create_task(run_control_listener(bus, "webhook", shutdown)) + if bus is not None else None + ) + reload_task = ( + asyncio.create_task(_reload_listener(bus, reload_flag, shutdown)) + if bus is not None else None + ) + + owns_http = http_client is None + if owns_http: + http_client = httpx.AsyncClient(timeout=10.0) + + semaphore = asyncio.Semaphore(_EGRESS_CONCURRENCY) + consumer_tasks: list[asyncio.Task] = [] + + try: + while not shutdown.is_set(): + # Cancel prior epoch's consumers before starting new ones. + await _cancel_all(consumer_tasks) + consumer_tasks.clear() + + subs = await repo.list_webhook_subscriptions(enabled_only=True) + + if bus is not None: + for sub in subs: + for pattern in _patterns_for(sub): + consumer_tasks.append(asyncio.create_task( + _consume( + bus, pattern, sub, repo, http_client, semaphore + ) + )) + + # Wait for reload OR timer fallback. Shutdown propagates via + # CancelledError when the outer task is cancelled — no explicit + # race required because `await` points are cancellation-safe. + with contextlib.suppress(asyncio.TimeoutError): + await asyncio.wait_for( + reload_flag.wait(), timeout=reload_interval + ) + reload_flag.clear() + except asyncio.CancelledError: + shutdown.set() + raise + finally: + await _cancel_all(consumer_tasks) + for t in (heartbeat_task, control_task, reload_task): + if t is not None: + t.cancel() + for t in (heartbeat_task, control_task, reload_task): + if t is not None: + with contextlib.suppress(asyncio.CancelledError, Exception): + await t + if bus is not None: + with contextlib.suppress(Exception): + await bus.close() + if owns_http and http_client is not None: + await http_client.aclose() + + +async def _cancel_all(tasks: list[asyncio.Task]) -> None: + for t in tasks: + if not t.done(): + t.cancel() + for t in tasks: + with contextlib.suppress(asyncio.CancelledError, Exception): + await t + + +async def _consume( + bus, + pattern: str, + sub: dict[str, Any], + repo: BaseRepository, + http_client: httpx.AsyncClient, + semaphore: asyncio.Semaphore, +) -> None: + """Subscribe to one pattern and dispatch events to one webhook.""" + try: + subscription = bus.subscribe(pattern) + async with subscription: + async for event in subscription: + asyncio.create_task( + _dispatch_one(repo, http_client, semaphore, sub, event) + ) + except asyncio.CancelledError: + raise + except Exception as exc: + logger.warning( + "webhook: consumer crashed sub=%s pattern=%s err=%s", + sub.get("name"), pattern, exc, + ) + + +async def _dispatch_one( + repo: BaseRepository, + http_client: httpx.AsyncClient, + semaphore: asyncio.Semaphore, + sub: dict[str, Any], + event: Any, +) -> None: + async with semaphore: + try: + result = await deliver(sub, event, client=http_client) + except Exception as exc: # noqa: BLE001 + logger.exception( + "webhook: deliver raised for sub=%s topic=%s: %s", + sub.get("uuid"), getattr(event, "topic", ""), exc, + ) + await _safe_record_failure(repo, sub["uuid"], f"internal: {exc}") + return + + now = datetime.now(timezone.utc) + if result.ok: + await _safe_record_success(repo, sub["uuid"], now) + else: + logger.warning( + "webhook: delivery failed sub=%s topic=%s status=%s err=%s", + sub.get("name"), getattr(event, "topic", ""), + result.status_code, result.error, + ) + await _safe_record_failure( + repo, sub["uuid"], result.error or "unknown" + ) + + +async def _safe_record_success( + repo: BaseRepository, uuid: str, ts: datetime +) -> None: + try: + await repo.record_webhook_success(uuid, ts) + except Exception as exc: + logger.warning("webhook: record_success failed: %s", exc) + + +async def _safe_record_failure( + repo: BaseRepository, uuid: str, error: str +) -> None: + try: + await repo.record_webhook_failure( + uuid, datetime.now(timezone.utc), error + ) + except Exception as exc: + logger.warning("webhook: record_failure failed: %s", exc) + + +async def _reload_listener( + bus, reload_flag: asyncio.Event, shutdown: asyncio.Event +) -> None: + """Set `reload_flag` on every WEBHOOK_SUBSCRIPTIONS_CHANGED signal.""" + try: + sub = bus.subscribe(_topics.WEBHOOK_SUBSCRIPTIONS_CHANGED) + async with sub: + async for _event in sub: + if shutdown.is_set(): + return + reload_flag.set() + except asyncio.CancelledError: + raise + except Exception as exc: + logger.warning( + "webhook: reload listener crashed, fallback timer only: %s", exc + ) diff --git a/deploy/decnet-webhook.service.j2 b/deploy/decnet-webhook.service.j2 new file mode 100644 index 00000000..02bf488c --- /dev/null +++ b/deploy/decnet-webhook.service.j2 @@ -0,0 +1,38 @@ +[Unit] +Description=DECNET Webhook Dispatcher (external SIEM/SOAR egress) +Documentation=https://git.resacachile.cl/anti/DECNET/wiki/Workers#webhook +After=network-online.target decnet-bus.service decnet-api.service +Wants=network-online.target decnet-bus.service + +[Service] +Type=simple +User={{ user }} +Group={{ group }} +WorkingDirectory={{ install_dir }} +EnvironmentFile=-{{ install_dir }}/.env.local +Environment=DECNET_SYSTEM_LOGS=/var/log/decnet/decnet.webhook.log +ExecStart={{ venv_dir }}/bin/decnet webhook +StandardOutput=append:/var/log/decnet/decnet.webhook.log +StandardError=append:/var/log/decnet/decnet.webhook.log + +CapabilityBoundingSet= +AmbientCapabilities= + +# Security Hardening +NoNewPrivileges=yes +ProtectSystem=full +ProtectHome=read-only +PrivateTmp=yes +ProtectKernelTunables=yes +ProtectKernelModules=yes +ProtectControlGroups=yes +RestrictSUIDSGID=yes +LockPersonality=yes +ReadWritePaths={{ install_dir }} /var/log/decnet + +Restart=on-failure +RestartSec=5 +TimeoutStopSec=15 + +[Install] +WantedBy=multi-user.target diff --git a/deploy/decnet.target b/deploy/decnet.target index 950e4697..7e25a177 100644 --- a/deploy/decnet.target +++ b/deploy/decnet.target @@ -12,7 +12,8 @@ Wants=decnet-bus.service \ decnet-profiler.service \ decnet-sniffer.service \ decnet-prober.service \ - decnet-mutator.service + decnet-mutator.service \ + decnet-webhook.service After=decnet-bus.service [Install] diff --git a/tests/webhook/conftest.py b/tests/webhook/conftest.py new file mode 100644 index 00000000..04e15dea --- /dev/null +++ b/tests/webhook/conftest.py @@ -0,0 +1,18 @@ +"""Shared fixtures for webhook worker tests.""" +from __future__ import annotations + +from typing import AsyncIterator + +import pytest_asyncio + +from decnet.bus.fake import FakeBus + + +@pytest_asyncio.fixture +async def fake_bus() -> AsyncIterator[FakeBus]: + bus = FakeBus() + await bus.connect() + try: + yield bus + finally: + await bus.close() diff --git a/tests/webhook/test_worker.py b/tests/webhook/test_worker.py new file mode 100644 index 00000000..35c1e924 --- /dev/null +++ b/tests/webhook/test_worker.py @@ -0,0 +1,236 @@ +"""Webhook worker — bus consumer → HTTP egress integration test.""" +from __future__ import annotations + +import asyncio +import json +from datetime import datetime, timezone +from typing import Any +from unittest.mock import patch + +import httpx +import pytest + +from decnet.bus import topics as _topics +from decnet.webhook.worker import ( + _patterns_for, + _union_patterns, + webhook_worker, +) + + +def _sub( + uuid: str, + name: str, + patterns: list[str], + *, + url: str = "https://w.example/x", + secret: str = "s" * 32, + enabled: bool = True, +) -> dict[str, Any]: + return { + "uuid": uuid, + "name": name, + "url": url, + "secret": secret, + "topic_patterns": json.dumps(patterns), + "enabled": enabled, + "consecutive_failures": 0, + "last_success_at": None, + "last_failure_at": None, + "last_error": None, + "created_at": datetime.now(timezone.utc), + "updated_at": datetime.now(timezone.utc), + } + + +class _FakeRepo: + def __init__(self, subs: list[dict[str, Any]]): + self.subs = subs + self.success_calls: list[str] = [] + self.failure_calls: list[tuple[str, str]] = [] + + async def list_webhook_subscriptions(self, enabled_only: bool = False) -> list[dict[str, Any]]: + return [s for s in self.subs if s["enabled"]] if enabled_only else list(self.subs) + + async def record_webhook_success(self, uuid: str, ts: datetime) -> None: + self.success_calls.append(uuid) + + async def record_webhook_failure(self, uuid: str, ts: datetime, error: str) -> None: + self.failure_calls.append((uuid, error)) + + +def test_patterns_for_decodes_json(): + assert _patterns_for( + {"topic_patterns": json.dumps(["attacker.>", "decky.*.state"])} + ) == ["attacker.>", "decky.*.state"] + + +def test_patterns_for_bad_json_returns_empty(): + assert _patterns_for({"topic_patterns": "not-json"}) == [] + + +def test_union_patterns_dedupes_across_subs(): + s1 = _sub("u1", "w1", ["attacker.>", "system.>"]) + s2 = _sub("u2", "w2", ["system.>", "decky.*.state"]) + assert _union_patterns([s1, s2]) == ["attacker.>", "system.>", "decky.*.state"] + + +@pytest.mark.asyncio +async def test_worker_dispatches_matching_event(fake_bus): + """A bus event matching a sub's pattern should produce an HTTP POST.""" + sub = _sub("u1", "w1", ["attacker.>"]) + repo = _FakeRepo([sub]) + captured: list[httpx.Request] = [] + + async def handler(request: httpx.Request) -> httpx.Response: + captured.append(request) + return httpx.Response(200) + + async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client: + with patch("decnet.webhook.worker.get_bus", return_value=fake_bus): + task = asyncio.create_task( + webhook_worker(repo, reload_interval=0.5, http_client=client) + ) + # Give the worker a moment to subscribe. + await asyncio.sleep(0.2) + + await fake_bus.publish( + "attacker.observed", + {"ip": "1.2.3.4"}, + event_type="first_sighting", + ) + # Poll briefly for delivery. + for _ in range(40): + if captured: + break + await asyncio.sleep(0.05) + + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + assert len(captured) == 1 + req = captured[0] + assert req.headers.get("X-DECNET-Signature", "").startswith("sha256=") + assert "attacker.observed" in req.headers.get("X-DECNET-Event-Topic", "") + assert repo.success_calls == ["u1"] + + +@pytest.mark.asyncio +async def test_worker_ignores_non_matching_event(fake_bus): + """An event outside the sub's pattern must not trigger a POST.""" + sub = _sub("u1", "w1", ["attacker.>"]) + repo = _FakeRepo([sub]) + captured: list[httpx.Request] = [] + + async def handler(request: httpx.Request) -> httpx.Response: + captured.append(request) + return httpx.Response(200) + + async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client: + with patch("decnet.webhook.worker.get_bus", return_value=fake_bus): + task = asyncio.create_task( + webhook_worker(repo, reload_interval=0.5, http_client=client) + ) + await asyncio.sleep(0.2) + # system.log is NOT in attacker.> + await fake_bus.publish( + "system.log", + {"m": "irrelevant"}, + event_type="batch_committed", + ) + await asyncio.sleep(0.3) + + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + assert captured == [] + assert repo.success_calls == [] + + +@pytest.mark.asyncio +async def test_worker_records_failure_on_5xx(fake_bus, monkeypatch): + sub = _sub("u1", "w1", ["attacker.>"]) + repo = _FakeRepo([sub]) + + # Collapse the retry schedule to zero-delay so the test doesn't wait + # the real 1+2+4s backoff sequence. + monkeypatch.setattr( + "decnet.webhook.client._DEFAULT_RETRY_SCHEDULE", (0.0, 0.0, 0.0) + ) + + async def handler(request: httpx.Request) -> httpx.Response: + return httpx.Response(503) + + async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client: + with patch("decnet.webhook.worker.get_bus", return_value=fake_bus): + task = asyncio.create_task( + webhook_worker(repo, reload_interval=0.5, http_client=client) + ) + await asyncio.sleep(0.2) + await fake_bus.publish( + "attacker.observed", {"ip": "1.2.3.4"}, event_type="x" + ) + for _ in range(80): + if repo.failure_calls: + break + await asyncio.sleep(0.05) + + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + assert repo.failure_calls + assert repo.failure_calls[0][0] == "u1" + + +@pytest.mark.asyncio +async def test_worker_reloads_on_subscriptions_changed_signal(fake_bus): + """A newly-enabled sub that arrives via the reload-signal path must + start receiving events without a worker restart.""" + subs = [_sub("u1", "w1", ["attacker.>"])] + repo = _FakeRepo(subs) + captured: list[httpx.Request] = [] + + async def handler(request: httpx.Request) -> httpx.Response: + captured.append(request) + return httpx.Response(200) + + async with httpx.AsyncClient(transport=httpx.MockTransport(handler)) as client: + with patch("decnet.webhook.worker.get_bus", return_value=fake_bus): + task = asyncio.create_task( + webhook_worker(repo, reload_interval=60.0, http_client=client) + ) + await asyncio.sleep(0.2) + + # Hot-add a sub that wants system.> + subs.append(_sub("u2", "w2", ["system.>"])) + await fake_bus.publish( + _topics.WEBHOOK_SUBSCRIPTIONS_CHANGED, {}, event_type="changed" + ) + await asyncio.sleep(0.3) # let worker reload + resubscribe + + await fake_bus.publish( + "system.log", {"m": "hi"}, event_type="batch_committed" + ) + for _ in range(80): + if captured: + break + await asyncio.sleep(0.05) + + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + # The new sub (u2) should have received the system.log event. + assert len(captured) == 1 + assert "system.log" in captured[0].headers.get("X-DECNET-Event-Topic", "")