log_collector_worker connects the bus at startup, builds a thread-safe system.log publisher, and hands it to each container-stream thread through _stream_container's new publish_fn parameter. Publishing fires right after the JSON record is written — same rate-limiter path, no extra parsing, compact payload (decky/service/event_type/attacker_ip/ timestamp) so subscribers can redraw without re-reading the DB. Bus stays optional: if get_bus() fails or DECNET_BUS_ENABLED=false the factory returns a no-op publisher and the stream thread calls it unconditionally. Hook failures are logged and never abort the thread.
186 lines
5.9 KiB
Python
186 lines
5.9 KiB
Python
"""Bus wiring for the collector (DEBT-031, worker 5).
|
|
|
|
Collector streams logs from Docker containers in a thread pool — can't be
|
|
exercised cleanly under pytest. These tests pin the two things that
|
|
actually carry the contract:
|
|
|
|
1. ``_stream_container`` invokes ``publish_fn(parsed)`` right after writing
|
|
the JSON record, and skips publish when the hook is absent.
|
|
2. ``_make_system_log_publisher`` routes under ``system.log`` with the
|
|
expected compact payload shape.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
|
|
import pytest
|
|
import pytest_asyncio
|
|
|
|
from decnet.bus.fake import FakeBus
|
|
from decnet.collector.worker import (
|
|
_make_system_log_publisher,
|
|
_stream_container,
|
|
)
|
|
|
|
|
|
@pytest_asyncio.fixture
|
|
async def bus() -> FakeBus:
|
|
b = FakeBus()
|
|
await b.connect()
|
|
yield b
|
|
await b.close()
|
|
|
|
|
|
# ─── Thread-safe publisher factory ───────────────────────────────────────────
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_publisher_routes_under_system_log(bus: FakeBus) -> None:
|
|
loop = asyncio.get_running_loop()
|
|
publish = _make_system_log_publisher(bus, loop)
|
|
|
|
sub = bus.subscribe("system.log")
|
|
async with sub:
|
|
publish({
|
|
"timestamp": "2026-04-21 10:00:00",
|
|
"decky": "decky-a",
|
|
"service": "ssh",
|
|
"event_type": "auth_fail",
|
|
"attacker_ip": "1.2.3.4",
|
|
})
|
|
event = await asyncio.wait_for(sub.__anext__(), timeout=2.0)
|
|
|
|
assert event.topic == "system.log"
|
|
assert event.type == "auth_fail"
|
|
assert event.payload == {
|
|
"decky": "decky-a",
|
|
"service": "ssh",
|
|
"event_type": "auth_fail",
|
|
"attacker_ip": "1.2.3.4",
|
|
"timestamp": "2026-04-21 10:00:00",
|
|
}
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_publisher_no_bus_is_noop() -> None:
|
|
# get_bus() failure path returns None → publisher is a no-op callable.
|
|
loop = asyncio.get_running_loop()
|
|
publish = _make_system_log_publisher(None, loop)
|
|
# Must be safely invocable; no exception, no hang.
|
|
publish({"event_type": "anything"})
|
|
|
|
|
|
# ─── Stream-thread integration: publish_fn wiring ────────────────────────────
|
|
|
|
class _FakeContainer:
|
|
"""Minimal duck-typed stand-in for docker.Container.logs(stream=True)."""
|
|
|
|
def __init__(self, lines: list[bytes]) -> None:
|
|
self._lines = lines
|
|
|
|
def logs(self, stream=True, follow=True, stdout=True, stderr=False):
|
|
yield from self._lines
|
|
|
|
|
|
class _FakeDockerClient:
|
|
def __init__(self, container: _FakeContainer) -> None:
|
|
self.containers = self # so .get() lookup below works
|
|
self._container = container
|
|
|
|
def get(self, _container_id: str) -> _FakeContainer:
|
|
return self._container
|
|
|
|
|
|
def _make_rfc5424_line() -> str:
|
|
# Crafted to pass _RFC5424_RE in collector.worker.
|
|
return (
|
|
"<134>1 2026-04-21T10:00:00+00:00 decky-a ssh - auth_fail "
|
|
"[decnet@32473 src_ip=\"1.2.3.4\"] failed password"
|
|
)
|
|
|
|
|
|
def test_stream_container_invokes_publish_fn(monkeypatch, tmp_path):
|
|
line = _make_rfc5424_line()
|
|
fake_container = _FakeContainer([line.encode() + b"\n"])
|
|
fake_client = _FakeDockerClient(fake_container)
|
|
|
|
import docker as _docker_mod
|
|
monkeypatch.setattr(_docker_mod, "from_env", lambda: fake_client)
|
|
|
|
captured: list[dict] = []
|
|
_stream_container(
|
|
"cid-xyz",
|
|
tmp_path / "decnet.log",
|
|
tmp_path / "decnet.json",
|
|
publish_fn=lambda parsed: captured.append(parsed),
|
|
)
|
|
|
|
# One parseable line → one publish call with the parsed dict.
|
|
assert len(captured) == 1
|
|
assert captured[0]["decky"] == "decky-a"
|
|
assert captured[0]["service"] == "ssh"
|
|
assert captured[0]["event_type"] == "auth_fail"
|
|
|
|
# JSON file still written — bus publishing is additive, not a replacement.
|
|
jf = (tmp_path / "decnet.json").read_text().strip().splitlines()
|
|
assert len(jf) == 1
|
|
assert json.loads(jf[0])["event_type"] == "auth_fail"
|
|
|
|
|
|
def test_stream_container_runs_without_publish_fn(monkeypatch, tmp_path):
|
|
# Pre-bus behavior: no publish_fn, no crash, JSON still written.
|
|
line = _make_rfc5424_line()
|
|
fake_container = _FakeContainer([line.encode() + b"\n"])
|
|
fake_client = _FakeDockerClient(fake_container)
|
|
|
|
import docker as _docker_mod
|
|
monkeypatch.setattr(_docker_mod, "from_env", lambda: fake_client)
|
|
|
|
_stream_container(
|
|
"cid-xyz",
|
|
tmp_path / "decnet.log",
|
|
tmp_path / "decnet.json",
|
|
)
|
|
|
|
jf = (tmp_path / "decnet.json").read_text().strip().splitlines()
|
|
assert len(jf) == 1
|
|
|
|
|
|
def test_stream_container_swallows_publish_failures(monkeypatch, tmp_path):
|
|
# Hook failure must not abort the stream thread.
|
|
line = _make_rfc5424_line()
|
|
fake_container = _FakeContainer([line.encode() + b"\n"])
|
|
fake_client = _FakeDockerClient(fake_container)
|
|
|
|
import docker as _docker_mod
|
|
monkeypatch.setattr(_docker_mod, "from_env", lambda: fake_client)
|
|
|
|
def _boom(_parsed):
|
|
raise RuntimeError("transport exploded")
|
|
|
|
# Must not raise.
|
|
_stream_container(
|
|
"cid-xyz",
|
|
tmp_path / "decnet.log",
|
|
tmp_path / "decnet.json",
|
|
publish_fn=_boom,
|
|
)
|
|
|
|
jf = (tmp_path / "decnet.json").read_text().strip().splitlines()
|
|
assert len(jf) == 1
|
|
|
|
|
|
# ─── Bus-disabled escape hatch ───────────────────────────────────────────────
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_collector_degrades_cleanly_when_bus_disabled(
|
|
monkeypatch: pytest.MonkeyPatch,
|
|
) -> None:
|
|
from decnet.bus.factory import get_bus
|
|
|
|
monkeypatch.setenv("DECNET_BUS_ENABLED", "false")
|
|
b = get_bus(client_name="collector")
|
|
await b.connect()
|
|
await b.publish("system.log", {"event_type": "auth_fail"}, event_type="auth_fail")
|
|
await b.close()
|