feat(ingester): publish system.log per committed batch (DEBT-031 worker 6)

The ingester connects to the bus at startup and emits a batch-committed
summary (component/flushed/position) after each successful _flush_batch.
Zero-row flushes are suppressed so the topic stays meaningful.

Complements the collector's per-line system.log publishes: collector
signals ingress, ingester signals DB-persisted progress.  The federation
forwarder (worker 8) will subscribe to the batch-committed leaf to
trigger its upstream push.

The bus stays optional: publish_safely swallows failures, get_bus() can
return None, and DECNET_BUS_ENABLED=false leaves the ingestion loop fully
functional.
This commit is contained in:
2026-04-21 16:58:49 -04:00
parent a448dbe283
commit cbb394a160
3 changed files with 133 additions and 0 deletions

View File

@@ -1,10 +1,14 @@
import asyncio
import contextlib
import os
import json
import time
from typing import Any
from pathlib import Path
from decnet.bus import topics as _topics
from decnet.bus.factory import get_bus
from decnet.bus.publish import publish_safely
from decnet.env import DECNET_BATCH_SIZE, DECNET_BATCH_MAX_WAIT_MS
from decnet.logging import get_logger
from decnet.telemetry import (
@@ -37,6 +41,31 @@ async def log_ingestion_worker(repo: BaseRepository) -> None:
logger.info("ingest worker started path=%s position=%d", _json_log_path, _position)
# Optional bus wiring — emit one system.log event per committed batch so
# downstream consumers (dashboard heartbeats, federation forwarder) can
# track DB-persisted progress without polling the state table.
_bus = None
try:
_bus = get_bus(client_name="ingester")
await _bus.connect()
except Exception as _exc:
logger.warning("ingester: bus unavailable, continuing without publish: %s", _exc)
_bus = None
try:
await _run_loop(repo, _json_log_path, _position, _bus)
finally:
if _bus is not None:
with contextlib.suppress(Exception):
await _bus.close()
async def _run_loop(
repo: BaseRepository,
_json_log_path: Path,
_position: int,
_bus: Any,
) -> None:
while True:
try:
if not _json_log_path.exists():
@@ -97,13 +126,17 @@ async def log_ingestion_worker(repo: BaseRepository) -> None:
if len(_batch) >= DECNET_BATCH_SIZE or (
time.monotonic() - _batch_started >= _max_wait_s
):
_flushed = len(_batch)
_position = await _flush_batch(repo, _batch, _position)
_batch.clear()
_batch_started = time.monotonic()
await _publish_batch(_bus, _flushed, _position)
# Flush any remainder collected before EOF / partial-line break.
if _batch:
_flushed = len(_batch)
_position = await _flush_batch(repo, _batch, _position)
await _publish_batch(_bus, _flushed, _position)
except Exception as _e:
_err_str = str(_e).lower()
@@ -117,6 +150,23 @@ async def log_ingestion_worker(repo: BaseRepository) -> None:
await asyncio.sleep(1)
async def _publish_batch(bus: Any, flushed: int, position: int) -> None:
    """Publish a ``system.log`` summary for one committed batch.

    The summary is handed to :func:`publish_safely` with event type
    ``batch_committed``; that helper is relied on to keep bus trouble from
    stalling the ingestion loop.  The payload carries the component name,
    the number of flushed rows and the position passed in by the caller.

    Nothing is published when no bus is available or when the flush covered
    no rows, so the topic only ever reports real progress.
    """
    # No bus wired up: publishing is optional, so silently skip.
    if bus is None:
        return
    # Empty flushes carry no information for subscribers.
    if flushed <= 0:
        return

    topic = _topics.system(_topics.SYSTEM_LOG)
    summary = {"component": "ingester", "flushed": flushed, "position": position}
    await publish_safely(bus, topic, summary, event_type="batch_committed")
async def _flush_batch(
repo: BaseRepository,
batch: list[tuple[dict[str, Any], int]],