feat(collector): publish system.log per ingested event (DEBT-031 worker 5)

log_collector_worker connects the bus at startup, builds a thread-safe
system.log publisher, and hands it to each container-stream thread
through _stream_container's new publish_fn parameter.  Publishing fires
right after the JSON record is written — same rate-limiter path, no
extra parsing, compact payload (decky/service/event_type/attacker_ip/
timestamp) so subscribers can redraw without re-reading the DB.

Bus stays optional: if get_bus() fails or DECNET_BUS_ENABLED=false the
factory returns a no-op publisher and the stream thread calls it
unconditionally.  Hook failures are logged and never abort the thread.
This commit is contained in:
2026-04-21 16:57:21 -04:00
parent 67c2e30f89
commit a448dbe283
3 changed files with 260 additions and 3 deletions

View File

@@ -7,6 +7,7 @@ The ingester tails the .json file; rsyslog can consume the .log file independent
""" """
import asyncio import asyncio
import contextlib
import json import json
import os import os
import re import re
@@ -15,11 +16,19 @@ import time
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import Any, Optional from typing import Any, Callable, Optional
from decnet.bus import topics as _topics
from decnet.bus.factory import get_bus
from decnet.bus.publish import make_thread_safe_publisher
from decnet.logging import get_logger from decnet.logging import get_logger
from decnet.telemetry import traced as _traced, get_tracer as _get_tracer, inject_context as _inject_ctx from decnet.telemetry import traced as _traced, get_tracer as _get_tracer, inject_context as _inject_ctx
# Collector publish signature: ``publish_fn(parsed_event_dict)``. Callable
# from the container-stream threads; the worker wraps it around a thread-safe
# bus publisher that marshals onto the asyncio loop.
CollectorPublishFn = Callable[[dict[str, Any]], None]
logger = get_logger("collector") logger = get_logger("collector")
# ─── Ingestion rate limiter ─────────────────────────────────────────────────── # ─── Ingestion rate limiter ───────────────────────────────────────────────────
@@ -274,7 +283,12 @@ def _reopen_if_needed(path: Path, fh: Optional[Any]) -> Any:
@_traced("collector.stream_container") @_traced("collector.stream_container")
def _stream_container(container_id: str, log_path: Path, json_path: Path) -> None: def _stream_container(
container_id: str,
log_path: Path,
json_path: Path,
publish_fn: CollectorPublishFn | None = None,
) -> None:
"""Stream logs from one container and append to the host log files.""" """Stream logs from one container and append to the host log files."""
import docker # type: ignore[import] import docker # type: ignore[import]
@@ -309,6 +323,13 @@ def _stream_container(container_id: str, log_path: Path, json_path: Path) -> Non
jf = _reopen_if_needed(json_path, jf) jf = _reopen_if_needed(json_path, jf)
jf.write(json.dumps(parsed) + "\n") jf.write(json.dumps(parsed) + "\n")
jf.flush() jf.flush()
if publish_fn is not None:
try:
publish_fn(parsed)
except Exception as exc:
logger.debug(
"collector: bus publish failed: %s", exc,
)
else: else:
logger.debug( logger.debug(
"collector: rate-limited decky=%s service=%s type=%s attacker=%s", "collector: rate-limited decky=%s service=%s type=%s attacker=%s",
@@ -328,6 +349,41 @@ def _stream_container(container_id: str, log_path: Path, json_path: Path) -> Non
pass pass
# ─── Bus plumbing ─────────────────────────────────────────────────────────────
def _make_system_log_publisher(
bus: Any, loop: asyncio.AbstractEventLoop,
) -> CollectorPublishFn:
"""Factory: returns a ``publish_fn(parsed)`` for use by stream threads.
When *bus* is ``None`` the returned callable is a no-op, so the stream
thread can call it unconditionally. Otherwise each call is marshalled
onto *loop* (the asyncio event loop that owns the bus socket) via
``make_thread_safe_publisher``.
"""
raw_publish = make_thread_safe_publisher(bus, loop) if bus is not None else None
if raw_publish is None:
return lambda _parsed: None
topic = _topics.system(_topics.SYSTEM_LOG)
def _publish(parsed: dict[str, Any]) -> None:
event_type = parsed.get("event_type", "")
raw_publish(
topic,
{
"decky": parsed.get("decky", ""),
"service": parsed.get("service", ""),
"event_type": event_type,
"attacker_ip": parsed.get("attacker_ip", "Unknown"),
"timestamp": parsed.get("timestamp", ""),
},
event_type,
)
return _publish
# ─── Async collector ────────────────────────────────────────────────────────── # ─── Async collector ──────────────────────────────────────────────────────────
async def log_collector_worker(log_file: str) -> None: async def log_collector_worker(log_file: str) -> None:
@@ -347,6 +403,19 @@ async def log_collector_worker(log_file: str) -> None:
active: dict[str, asyncio.Task[None]] = {} active: dict[str, asyncio.Task[None]] = {}
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
# Optional bus wiring — per-line system.log publish. Fan-in from many
# container-stream threads is handled by make_thread_safe_publisher,
# which marshals each publish onto this loop.
bus = None
try:
bus = get_bus(client_name="collector")
await bus.connect()
except Exception as exc:
logger.warning("collector: bus unavailable, continuing without publish: %s", exc)
bus = None
_publish_log = _make_system_log_publisher(bus, loop)
# Dedicated thread pool so long-running container log streams don't # Dedicated thread pool so long-running container log streams don't
# saturate the default asyncio executor and starve short-lived # saturate the default asyncio executor and starve short-lived
# to_thread() calls elsewhere (e.g. load_state in the web API). # to_thread() calls elsewhere (e.g. load_state in the web API).
@@ -359,7 +428,7 @@ async def log_collector_worker(log_file: str) -> None:
active[container_id] = asyncio.ensure_future( active[container_id] = asyncio.ensure_future(
loop.run_in_executor( loop.run_in_executor(
collector_pool, _stream_container, collector_pool, _stream_container,
container_id, log_path, json_path, container_id, log_path, json_path, _publish_log,
), ),
loop=loop, loop=loop,
) )
@@ -396,3 +465,6 @@ async def log_collector_worker(log_file: str) -> None:
logger.error("collector error: %s", exc) logger.error("collector error: %s", exc)
finally: finally:
collector_pool.shutdown(wait=False) collector_pool.shutdown(wait=False)
if bus is not None:
with contextlib.suppress(Exception):
await bus.close()

View File

View File

@@ -0,0 +1,185 @@
"""Bus wiring for the collector (DEBT-031, worker 5).
Collector streams logs from Docker containers in a thread pool — can't be
exercised cleanly under pytest. These tests pin the two things that
actually carry the contract:
1. ``_stream_container`` invokes ``publish_fn(parsed)`` right after writing
the JSON record, and skips publish when the hook is absent.
2. ``_make_system_log_publisher`` routes under ``system.log`` with the
expected compact payload shape.
"""
from __future__ import annotations
import asyncio
import json
import pytest
import pytest_asyncio
from decnet.bus.fake import FakeBus
from decnet.collector.worker import (
_make_system_log_publisher,
_stream_container,
)
@pytest_asyncio.fixture
async def bus() -> FakeBus:
b = FakeBus()
await b.connect()
yield b
await b.close()
# ─── Thread-safe publisher factory ───────────────────────────────────────────
@pytest.mark.asyncio
async def test_publisher_routes_under_system_log(bus: FakeBus) -> None:
loop = asyncio.get_running_loop()
publish = _make_system_log_publisher(bus, loop)
sub = bus.subscribe("system.log")
async with sub:
publish({
"timestamp": "2026-04-21 10:00:00",
"decky": "decky-a",
"service": "ssh",
"event_type": "auth_fail",
"attacker_ip": "1.2.3.4",
})
event = await asyncio.wait_for(sub.__anext__(), timeout=2.0)
assert event.topic == "system.log"
assert event.type == "auth_fail"
assert event.payload == {
"decky": "decky-a",
"service": "ssh",
"event_type": "auth_fail",
"attacker_ip": "1.2.3.4",
"timestamp": "2026-04-21 10:00:00",
}
@pytest.mark.asyncio
async def test_publisher_no_bus_is_noop() -> None:
# get_bus() failure path returns None → publisher is a no-op callable.
loop = asyncio.get_running_loop()
publish = _make_system_log_publisher(None, loop)
# Must be safely invocable; no exception, no hang.
publish({"event_type": "anything"})
# ─── Stream-thread integration: publish_fn wiring ────────────────────────────
class _FakeContainer:
"""Minimal duck-typed stand-in for docker.Container.logs(stream=True)."""
def __init__(self, lines: list[bytes]) -> None:
self._lines = lines
def logs(self, stream=True, follow=True, stdout=True, stderr=False):
yield from self._lines
class _FakeDockerClient:
def __init__(self, container: _FakeContainer) -> None:
self.containers = self # so .get() lookup below works
self._container = container
def get(self, _container_id: str) -> _FakeContainer:
return self._container
def _make_rfc5424_line() -> str:
# Crafted to pass _RFC5424_RE in collector.worker.
return (
"<134>1 2026-04-21T10:00:00+00:00 decky-a ssh - auth_fail "
"[decnet@32473 src_ip=\"1.2.3.4\"] failed password"
)
def test_stream_container_invokes_publish_fn(monkeypatch, tmp_path):
line = _make_rfc5424_line()
fake_container = _FakeContainer([line.encode() + b"\n"])
fake_client = _FakeDockerClient(fake_container)
import docker as _docker_mod
monkeypatch.setattr(_docker_mod, "from_env", lambda: fake_client)
captured: list[dict] = []
_stream_container(
"cid-xyz",
tmp_path / "decnet.log",
tmp_path / "decnet.json",
publish_fn=lambda parsed: captured.append(parsed),
)
# One parseable line → one publish call with the parsed dict.
assert len(captured) == 1
assert captured[0]["decky"] == "decky-a"
assert captured[0]["service"] == "ssh"
assert captured[0]["event_type"] == "auth_fail"
# JSON file still written — bus publishing is additive, not a replacement.
jf = (tmp_path / "decnet.json").read_text().strip().splitlines()
assert len(jf) == 1
assert json.loads(jf[0])["event_type"] == "auth_fail"
def test_stream_container_runs_without_publish_fn(monkeypatch, tmp_path):
# Pre-bus behavior: no publish_fn, no crash, JSON still written.
line = _make_rfc5424_line()
fake_container = _FakeContainer([line.encode() + b"\n"])
fake_client = _FakeDockerClient(fake_container)
import docker as _docker_mod
monkeypatch.setattr(_docker_mod, "from_env", lambda: fake_client)
_stream_container(
"cid-xyz",
tmp_path / "decnet.log",
tmp_path / "decnet.json",
)
jf = (tmp_path / "decnet.json").read_text().strip().splitlines()
assert len(jf) == 1
def test_stream_container_swallows_publish_failures(monkeypatch, tmp_path):
# Hook failure must not abort the stream thread.
line = _make_rfc5424_line()
fake_container = _FakeContainer([line.encode() + b"\n"])
fake_client = _FakeDockerClient(fake_container)
import docker as _docker_mod
monkeypatch.setattr(_docker_mod, "from_env", lambda: fake_client)
def _boom(_parsed):
raise RuntimeError("transport exploded")
# Must not raise.
_stream_container(
"cid-xyz",
tmp_path / "decnet.log",
tmp_path / "decnet.json",
publish_fn=_boom,
)
jf = (tmp_path / "decnet.json").read_text().strip().splitlines()
assert len(jf) == 1
# ─── Bus-disabled escape hatch ───────────────────────────────────────────────
@pytest.mark.asyncio
async def test_collector_degrades_cleanly_when_bus_disabled(
monkeypatch: pytest.MonkeyPatch,
) -> None:
from decnet.bus.factory import get_bus
monkeypatch.setenv("DECNET_BUS_ENABLED", "false")
b = get_bus(client_name="collector")
await b.connect()
await b.publish("system.log", {"event_type": "auth_fail"}, event_type="auth_fail")
await b.close()