diff --git a/decnet/collector/worker.py b/decnet/collector/worker.py index 2a747a23..582dc608 100644 --- a/decnet/collector/worker.py +++ b/decnet/collector/worker.py @@ -7,6 +7,7 @@ The ingester tails the .json file; rsyslog can consume the .log file independent """ import asyncio +import contextlib import json import os import re @@ -15,11 +16,19 @@ import time from concurrent.futures import ThreadPoolExecutor from datetime import datetime from pathlib import Path -from typing import Any, Optional +from typing import Any, Callable, Optional +from decnet.bus import topics as _topics +from decnet.bus.factory import get_bus +from decnet.bus.publish import make_thread_safe_publisher from decnet.logging import get_logger from decnet.telemetry import traced as _traced, get_tracer as _get_tracer, inject_context as _inject_ctx +# Collector publish signature: ``publish_fn(parsed_event_dict)``. Callable +# from the container-stream threads; the worker wraps it around a thread-safe +# bus publisher that marshals onto the asyncio loop. 
# Shape of the per-event hook handed to the container-stream threads: it
# receives one parsed event dict and returns nothing.
CollectorPublishFn = Callable[[dict[str, Any]], None]


# ─── Bus plumbing ─────────────────────────────────────────────────────────────

def _make_system_log_publisher(
    bus: Any, loop: asyncio.AbstractEventLoop,
) -> CollectorPublishFn:
    """Build the ``publish_fn(parsed)`` hook used by the stream threads.

    Args:
        bus: Bus client to publish through, or ``None`` when no bus is
            available.
        loop: Event loop handed to ``make_thread_safe_publisher`` together
            with *bus*.  NOTE(review): per this module's comments the helper
            marshals each publish onto that loop; the helper itself is not
            visible here.

    Returns:
        A callable accepting one parsed event dict.  Without a bus (or when
        the helper hands back no publisher) the callable does nothing, so
        callers may invoke it unconditionally.  Otherwise it forwards a
        compact five-field summary of the event to the system-log topic,
        passing the event type as the third positional argument.
    """

    def _discard(_parsed: dict[str, Any]) -> None:
        return None

    # No bus: never touch the publisher helper at all.
    if bus is None:
        return _discard

    thread_safe_publish = make_thread_safe_publisher(bus, loop)
    if thread_safe_publish is None:
        return _discard

    log_topic = _topics.system(_topics.SYSTEM_LOG)

    # (key, fallback) pairs copied from the parsed record into the payload;
    # every other key of the record is left out of the published event.
    summary_fields = (
        ("decky", ""),
        ("service", ""),
        ("event_type", ""),
        ("attacker_ip", "Unknown"),
        ("timestamp", ""),
    )

    def _forward(parsed: dict[str, Any]) -> None:
        summary = {
            key: parsed.get(key, fallback) for key, fallback in summary_fields
        }
        thread_safe_publish(log_topic, summary, summary["event_type"])

    return _forward
@@ -359,7 +428,7 @@ async def log_collector_worker(log_file: str) -> None: active[container_id] = asyncio.ensure_future( loop.run_in_executor( collector_pool, _stream_container, - container_id, log_path, json_path, + container_id, log_path, json_path, _publish_log, ), loop=loop, ) @@ -396,3 +465,6 @@ async def log_collector_worker(log_file: str) -> None: logger.error("collector error: %s", exc) finally: collector_pool.shutdown(wait=False) + if bus is not None: + with contextlib.suppress(Exception): + await bus.close() diff --git a/tests/collector/__init__.py b/tests/collector/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/collector/test_collector_bus.py b/tests/collector/test_collector_bus.py new file mode 100644 index 00000000..493e709e --- /dev/null +++ b/tests/collector/test_collector_bus.py @@ -0,0 +1,185 @@ +"""Bus wiring for the collector (DEBT-031, worker 5). + +Collector streams logs from Docker containers in a thread pool — can't be +exercised cleanly under pytest. These tests pin the two things that +actually carry the contract: + +1. ``_stream_container`` invokes ``publish_fn(parsed)`` right after writing + the JSON record, and skips publish when the hook is absent. +2. ``_make_system_log_publisher`` routes under ``system.log`` with the + expected compact payload shape. 
+""" +from __future__ import annotations + +import asyncio +import json + +import pytest +import pytest_asyncio + +from decnet.bus.fake import FakeBus +from decnet.collector.worker import ( + _make_system_log_publisher, + _stream_container, +) + + +@pytest_asyncio.fixture +async def bus() -> FakeBus: + b = FakeBus() + await b.connect() + yield b + await b.close() + + +# ─── Thread-safe publisher factory ─────────────────────────────────────────── + +@pytest.mark.asyncio +async def test_publisher_routes_under_system_log(bus: FakeBus) -> None: + loop = asyncio.get_running_loop() + publish = _make_system_log_publisher(bus, loop) + + sub = bus.subscribe("system.log") + async with sub: + publish({ + "timestamp": "2026-04-21 10:00:00", + "decky": "decky-a", + "service": "ssh", + "event_type": "auth_fail", + "attacker_ip": "1.2.3.4", + }) + event = await asyncio.wait_for(sub.__anext__(), timeout=2.0) + + assert event.topic == "system.log" + assert event.type == "auth_fail" + assert event.payload == { + "decky": "decky-a", + "service": "ssh", + "event_type": "auth_fail", + "attacker_ip": "1.2.3.4", + "timestamp": "2026-04-21 10:00:00", + } + + +@pytest.mark.asyncio +async def test_publisher_no_bus_is_noop() -> None: + # get_bus() failure path returns None → publisher is a no-op callable. + loop = asyncio.get_running_loop() + publish = _make_system_log_publisher(None, loop) + # Must be safely invocable; no exception, no hang. 
+ publish({"event_type": "anything"}) + + +# ─── Stream-thread integration: publish_fn wiring ──────────────────────────── + +class _FakeContainer: + """Minimal duck-typed stand-in for docker.Container.logs(stream=True).""" + + def __init__(self, lines: list[bytes]) -> None: + self._lines = lines + + def logs(self, stream=True, follow=True, stdout=True, stderr=False): + yield from self._lines + + +class _FakeDockerClient: + def __init__(self, container: _FakeContainer) -> None: + self.containers = self # so .get() lookup below works + self._container = container + + def get(self, _container_id: str) -> _FakeContainer: + return self._container + + +def _make_rfc5424_line() -> str: + # Crafted to pass _RFC5424_RE in collector.worker. + return ( + "<134>1 2026-04-21T10:00:00+00:00 decky-a ssh - auth_fail " + "[decnet@32473 src_ip=\"1.2.3.4\"] failed password" + ) + + +def test_stream_container_invokes_publish_fn(monkeypatch, tmp_path): + line = _make_rfc5424_line() + fake_container = _FakeContainer([line.encode() + b"\n"]) + fake_client = _FakeDockerClient(fake_container) + + import docker as _docker_mod + monkeypatch.setattr(_docker_mod, "from_env", lambda: fake_client) + + captured: list[dict] = [] + _stream_container( + "cid-xyz", + tmp_path / "decnet.log", + tmp_path / "decnet.json", + publish_fn=lambda parsed: captured.append(parsed), + ) + + # One parseable line → one publish call with the parsed dict. + assert len(captured) == 1 + assert captured[0]["decky"] == "decky-a" + assert captured[0]["service"] == "ssh" + assert captured[0]["event_type"] == "auth_fail" + + # JSON file still written — bus publishing is additive, not a replacement. + jf = (tmp_path / "decnet.json").read_text().strip().splitlines() + assert len(jf) == 1 + assert json.loads(jf[0])["event_type"] == "auth_fail" + + +def test_stream_container_runs_without_publish_fn(monkeypatch, tmp_path): + # Pre-bus behavior: no publish_fn, no crash, JSON still written. 
+ line = _make_rfc5424_line() + fake_container = _FakeContainer([line.encode() + b"\n"]) + fake_client = _FakeDockerClient(fake_container) + + import docker as _docker_mod + monkeypatch.setattr(_docker_mod, "from_env", lambda: fake_client) + + _stream_container( + "cid-xyz", + tmp_path / "decnet.log", + tmp_path / "decnet.json", + ) + + jf = (tmp_path / "decnet.json").read_text().strip().splitlines() + assert len(jf) == 1 + + +def test_stream_container_swallows_publish_failures(monkeypatch, tmp_path): + # Hook failure must not abort the stream thread. + line = _make_rfc5424_line() + fake_container = _FakeContainer([line.encode() + b"\n"]) + fake_client = _FakeDockerClient(fake_container) + + import docker as _docker_mod + monkeypatch.setattr(_docker_mod, "from_env", lambda: fake_client) + + def _boom(_parsed): + raise RuntimeError("transport exploded") + + # Must not raise. + _stream_container( + "cid-xyz", + tmp_path / "decnet.log", + tmp_path / "decnet.json", + publish_fn=_boom, + ) + + jf = (tmp_path / "decnet.json").read_text().strip().splitlines() + assert len(jf) == 1 + + +# ─── Bus-disabled escape hatch ─────────────────────────────────────────────── + +@pytest.mark.asyncio +async def test_collector_degrades_cleanly_when_bus_disabled( + monkeypatch: pytest.MonkeyPatch, +) -> None: + from decnet.bus.factory import get_bus + + monkeypatch.setenv("DECNET_BUS_ENABLED", "false") + b = get_bus(client_name="collector") + await b.connect() + await b.publish("system.log", {"event_type": "auth_fail"}, event_type="auth_fail") + await b.close()