From 11b9e85874538ddc1ee0bb4825c61c8511153441 Mon Sep 17 00:00:00 2001
From: anti
Date: Fri, 17 Apr 2026 16:23:09 -0400
Subject: [PATCH] feat(db): bulk add_logs for one-commit ingestion batches
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Adds BaseRepository.add_logs (default: loops add_log for backwards
compatibility) and a real single-session/single-commit implementation
on SQLModelRepository.

Introduces DECNET_BATCH_SIZE (default 100) and DECNET_BATCH_MAX_WAIT_MS
(default 250) so the ingester can flush on either a size or a time
bound when it adopts the new method.

Ingester wiring is deferred to a later pass — the single-log path was
deadlocking tests when flushed during lifespan teardown, so this change
ships the DB primitive alone.
---
Review notes (not part of the commit message):
  * DECNET_BATCH_SIZE and DECNET_BATCH_MAX_WAIT_MS are parsed with a bare
    int(); a non-numeric value raises ValueError when decnet/env.py is
    imported, and zero or negative values are accepted as-is. TODO confirm
    whether the ingester should validate them when it adopts add_logs.
  * The BaseRepository.add_logs default awaits add_log once per entry, so
    only repositories that override it get one-commit behaviour.
  * test_add_logs_bulk asserts the row count and the attacker_ip values;
    the "fields" payload is not asserted.

 decnet/env.py                  |  7 +++++++
 decnet/web/db/repository.py    |  9 +++++++++
 decnet/web/db/sqlmodel_repo.py | 15 ++++++++++++++-
 tests/api/test_repository.py   | 29 +++++++++++++++++++++++++++++
 4 files changed, 59 insertions(+), 1 deletion(-)

diff --git a/decnet/env.py b/decnet/env.py
index 290e949..bcc5dba 100644
--- a/decnet/env.py
+++ b/decnet/env.py
@@ -77,6 +77,13 @@ DECNET_API_PORT: int = _port("DECNET_API_PORT", 8000)
 DECNET_JWT_SECRET: str = _require_env("DECNET_JWT_SECRET")
 DECNET_INGEST_LOG_FILE: str | None = os.environ.get("DECNET_INGEST_LOG_FILE", "/var/log/decnet/decnet.log")
 
+# Ingester batching: how many log rows to accumulate per commit, and the
+# max wait (ms) before flushing a partial batch. Larger batches reduce
+# SQLite write-lock contention; the timeout keeps latency bounded during
+# low-traffic periods.
+DECNET_BATCH_SIZE: int = int(os.environ.get("DECNET_BATCH_SIZE", "100"))
+DECNET_BATCH_MAX_WAIT_MS: int = int(os.environ.get("DECNET_BATCH_MAX_WAIT_MS", "250"))
+
 # Web Dashboard Options
 DECNET_WEB_HOST: str = os.environ.get("DECNET_WEB_HOST", "127.0.0.1")
 DECNET_WEB_PORT: int = _port("DECNET_WEB_PORT", 8080)
diff --git a/decnet/web/db/repository.py b/decnet/web/db/repository.py
index 118c289..7ea025c 100644
--- a/decnet/web/db/repository.py
+++ b/decnet/web/db/repository.py
@@ -15,6 +15,15 @@ class BaseRepository(ABC):
         """Add a new log entry to the database."""
         pass
 
+    async def add_logs(self, log_entries: list[dict[str, Any]]) -> None:
+        """Insert a batch of log entries; overrides may use one transaction.
+
+        Default implementation falls back to per-row add_log; concrete
+        repositories should override for a real single-commit insert.
+        """
+        for _entry in log_entries:
+            await self.add_log(_entry)
+
     @abstractmethod
     async def get_logs(
         self,
diff --git a/decnet/web/db/sqlmodel_repo.py b/decnet/web/db/sqlmodel_repo.py
index 0d0a351..d6f186d 100644
--- a/decnet/web/db/sqlmodel_repo.py
+++ b/decnet/web/db/sqlmodel_repo.py
@@ -145,7 +145,8 @@ class SQLModelRepository(BaseRepository):
 
     # ---------------------------------------------------------------- logs
 
-    async def add_log(self, log_data: dict[str, Any]) -> None:
+    @staticmethod
+    def _normalize_log_row(log_data: dict[str, Any]) -> dict[str, Any]:
         data = log_data.copy()
         if "fields" in data and isinstance(data["fields"], dict):
             data["fields"] = orjson.dumps(data["fields"]).decode()
@@ -156,11 +157,23 @@ class SQLModelRepository(BaseRepository):
                 )
             except ValueError:
                 pass
+        return data
 
+    async def add_log(self, log_data: dict[str, Any]) -> None:
+        data = self._normalize_log_row(log_data)
         async with self._session() as session:
             session.add(Log(**data))
             await session.commit()
 
+    async def add_logs(self, log_entries: list[dict[str, Any]]) -> None:
+        """Bulk insert — one session, one commit for the whole batch."""
+        if not log_entries:
+            return
+        _rows = [Log(**self._normalize_log_row(e)) for e in log_entries]
+        async with self._session() as session:
+            session.add_all(_rows)
+            await session.commit()
+
     def _apply_filters(
         self,
         statement: SelectOfScalar,
diff --git a/tests/api/test_repository.py b/tests/api/test_repository.py
index 2337882..600b068 100644
--- a/tests/api/test_repository.py
+++ b/tests/api/test_repository.py
@@ -16,6 +16,35 @@ async def repo(tmp_path):
     return r
 
 
+@pytest.mark.anyio
+async def test_add_logs_bulk(repo):
+    _batch = [
+        {
+            "decky": f"decky-{i:02d}",
+            "service": "ssh",
+            "event_type": "connect",
+            "attacker_ip": f"10.0.0.{i}",
+            "raw_line": f"row {i}",
+            "fields": {"port": 22, "i": i},
+            "msg": "bulk",
+        }
+        for i in range(1, 11)
+    ]
+    await repo.add_logs(_batch)
+    logs = await repo.get_logs(limit=50, offset=0)
+    assert len(logs) == 10
+    # all ten rows persisted (matched by attacker_ip); "fields" is not asserted here
+    _ips = {entry["attacker_ip"] for entry in logs}
+    assert _ips == {f"10.0.0.{i}" for i in range(1, 11)}
+
+
+@pytest.mark.anyio
+async def test_add_logs_empty_is_noop(repo):
+    await repo.add_logs([])
+    logs = await repo.get_logs(limit=10, offset=0)
+    assert logs == []
+
+
 @pytest.mark.anyio
 async def test_add_and_get_log(repo):
     await repo.add_log({