diff --git a/decnet/web/ingester.py b/decnet/web/ingester.py index bca1d634..b1f962ae 100644 --- a/decnet/web/ingester.py +++ b/decnet/web/ingester.py @@ -1,10 +1,14 @@ import asyncio +import contextlib import os import json import time from typing import Any from pathlib import Path +from decnet.bus import topics as _topics +from decnet.bus.factory import get_bus +from decnet.bus.publish import publish_safely from decnet.env import DECNET_BATCH_SIZE, DECNET_BATCH_MAX_WAIT_MS from decnet.logging import get_logger from decnet.telemetry import ( @@ -37,6 +41,31 @@ async def log_ingestion_worker(repo: BaseRepository) -> None: logger.info("ingest worker started path=%s position=%d", _json_log_path, _position) + # Optional bus wiring — emit one system.log event per committed batch so + # downstream consumers (dashboard heartbeats, federation forwarder) can + # track DB-persisted progress without polling the state table. + _bus = None + try: + _bus = get_bus(client_name="ingester") + await _bus.connect() + except Exception as _exc: + logger.warning("ingester: bus unavailable, continuing without publish: %s", _exc) + _bus = None + + try: + await _run_loop(repo, _json_log_path, _position, _bus) + finally: + if _bus is not None: + with contextlib.suppress(Exception): + await _bus.close() + + +async def _run_loop( + repo: BaseRepository, + _json_log_path: Path, + _position: int, + _bus: Any, +) -> None: while True: try: if not _json_log_path.exists(): @@ -97,13 +126,17 @@ async def log_ingestion_worker(repo: BaseRepository) -> None: if len(_batch) >= DECNET_BATCH_SIZE or ( time.monotonic() - _batch_started >= _max_wait_s ): + _flushed = len(_batch) _position = await _flush_batch(repo, _batch, _position) _batch.clear() _batch_started = time.monotonic() + await _publish_batch(_bus, _flushed, _position) # Flush any remainder collected before EOF / partial-line break. if _batch: + _flushed = len(_batch) _position = await _flush_batch(repo, _batch, _position) + await _publish_batch(_bus, _flushed, _position) except Exception as _e: _err_str = str(_e).lower() @@ -117,6 +150,23 @@ async def log_ingestion_worker(repo: BaseRepository) -> None: await asyncio.sleep(1) +async def _publish_batch(bus: Any, flushed: int, position: int) -> None: + """Emit one ``system.log`` event summarising a committed batch. + + Fire-and-forget via :func:`publish_safely`; a dead bus never blocks the + ingestion loop. Zero-row flushes are suppressed so the topic stays + meaningful. + """ + if bus is None or flushed <= 0: + return + await publish_safely( + bus, + _topics.system(_topics.SYSTEM_LOG), + {"component": "ingester", "flushed": flushed, "position": position}, + event_type="batch_committed", + ) + + async def _flush_batch( repo: BaseRepository, batch: list[tuple[dict[str, Any], int]], diff --git a/tests/web/__init__.py b/tests/web/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/web/test_ingester_bus.py b/tests/web/test_ingester_bus.py new file mode 100644 index 00000000..8ba78d6c --- /dev/null +++ b/tests/web/test_ingester_bus.py @@ -0,0 +1,83 @@ +"""Bus wiring for the ingester (DEBT-031, worker 6). + +The ingester emits one ``system.log`` event per DB-committed batch via +``_publish_batch``. Per-line noise lives on the collector side; the +ingester's job is to signal "N rows landed in the DB up to offset P" so +heartbeat / federation consumers can tail DB progress without polling +the state table. +""" +from __future__ import annotations + +import asyncio + +import pytest +import pytest_asyncio + +from decnet.bus.fake import FakeBus +from decnet.web.ingester import _publish_batch + + +@pytest_asyncio.fixture +async def bus() -> FakeBus: + b = FakeBus() + await b.connect() + yield b + await b.close() + + +@pytest.mark.asyncio +async def test_publish_batch_fires_on_nonempty_flush(bus: FakeBus) -> None: + sub = bus.subscribe("system.log") + async with sub: + await _publish_batch(bus, flushed=17, position=4096) + event = await asyncio.wait_for(sub.__anext__(), timeout=2.0) + + assert event.topic == "system.log" + assert event.type == "batch_committed" + assert event.payload == { + "component": "ingester", + "flushed": 17, + "position": 4096, + } + + +@pytest.mark.asyncio +async def test_publish_batch_skips_zero_row_flush(bus: FakeBus) -> None: + # An empty batch shouldn't pollute the topic — nothing to signal. + sub = bus.subscribe("system.log") + async with sub: + await _publish_batch(bus, flushed=0, position=0) + # Expect nothing within a short window. asyncio.wait_for raises + # TimeoutError when no event arrives. + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(sub.__anext__(), timeout=0.2) + + +@pytest.mark.asyncio +async def test_publish_batch_is_noop_when_bus_is_none() -> None: + # Bus-disabled path: ingester passes bus=None into _publish_batch. + # Must be a safe no-op; no exceptions, no hangs. + await _publish_batch(None, flushed=5, position=123) + + +@pytest.mark.asyncio +async def test_publish_batch_swallows_bus_failures(monkeypatch) -> None: + # A dead bus must never break the ingestion loop. + class _ExplodingBus: + async def publish(self, *_args, **_kwargs): + raise RuntimeError("transport exploded") + + await _publish_batch(_ExplodingBus(), flushed=3, position=42) + + +@pytest.mark.asyncio +async def test_ingester_degrades_cleanly_when_bus_disabled( + monkeypatch: pytest.MonkeyPatch, +) -> None: + from decnet.bus.factory import get_bus + + monkeypatch.setenv("DECNET_BUS_ENABLED", "false") + b = get_bus(client_name="ingester") + await b.connect() + await b.publish("system.log", {"component": "ingester"}, event_type="batch_committed") + await b.close()