feat(db): bulk add_logs for one-commit ingestion batches
Adds BaseRepository.add_logs (default: loops add_log for backwards compatibility) and a real single-session/single-commit implementation on SQLModelRepository. Introduces DECNET_BATCH_SIZE (default 100) and DECNET_BATCH_MAX_WAIT_MS (default 250) so the ingester can flush on either a size or a time bound when it adopts the new method. Ingester wiring is deferred to a later pass — the single-log path was deadlocking tests when flushed during lifespan teardown, so this change ships the DB primitive alone.
This commit is contained in:
@@ -77,6 +77,13 @@ DECNET_API_PORT: int = _port("DECNET_API_PORT", 8000)
|
|||||||
DECNET_JWT_SECRET: str = _require_env("DECNET_JWT_SECRET")
|
DECNET_JWT_SECRET: str = _require_env("DECNET_JWT_SECRET")
|
||||||
DECNET_INGEST_LOG_FILE: str | None = os.environ.get("DECNET_INGEST_LOG_FILE", "/var/log/decnet/decnet.log")
|
DECNET_INGEST_LOG_FILE: str | None = os.environ.get("DECNET_INGEST_LOG_FILE", "/var/log/decnet/decnet.log")
|
||||||
|
|
||||||
|
# Ingester batching knobs. A batch is flushed when it reaches
# DECNET_BATCH_SIZE rows or when DECNET_BATCH_MAX_WAIT_MS elapses,
# whichever comes first: big batches cut SQLite write-lock contention,
# while the time bound keeps ingest latency predictable when traffic
# is light.
DECNET_BATCH_SIZE: int = int(os.getenv("DECNET_BATCH_SIZE", "100"))
DECNET_BATCH_MAX_WAIT_MS: int = int(os.getenv("DECNET_BATCH_MAX_WAIT_MS", "250"))
|
||||||
|
|
||||||
# Web Dashboard Options
|
# Web Dashboard Options
|
||||||
DECNET_WEB_HOST: str = os.environ.get("DECNET_WEB_HOST", "127.0.0.1")
|
DECNET_WEB_HOST: str = os.environ.get("DECNET_WEB_HOST", "127.0.0.1")
|
||||||
DECNET_WEB_PORT: int = _port("DECNET_WEB_PORT", 8080)
|
DECNET_WEB_PORT: int = _port("DECNET_WEB_PORT", 8080)
|
||||||
|
|||||||
@@ -15,6 +15,15 @@ class BaseRepository(ABC):
|
|||||||
"""Add a new log entry to the database."""
|
"""Add a new log entry to the database."""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
async def add_logs(self, log_entries: list[dict[str, Any]]) -> None:
    """Insert many log entries.

    This fallback simply awaits add_log once per entry, so it is NOT
    atomic across the batch; concrete repositories should override it
    with a true single-commit bulk insert when the backend allows.
    """
    for row in log_entries:
        await self.add_log(row)
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
async def get_logs(
|
async def get_logs(
|
||||||
self,
|
self,
|
||||||
|
|||||||
@@ -145,7 +145,8 @@ class SQLModelRepository(BaseRepository):
|
|||||||
|
|
||||||
# ---------------------------------------------------------------- logs
|
# ---------------------------------------------------------------- logs
|
||||||
|
|
||||||
async def add_log(self, log_data: dict[str, Any]) -> None:
|
@staticmethod
|
||||||
|
def _normalize_log_row(log_data: dict[str, Any]) -> dict[str, Any]:
|
||||||
data = log_data.copy()
|
data = log_data.copy()
|
||||||
if "fields" in data and isinstance(data["fields"], dict):
|
if "fields" in data and isinstance(data["fields"], dict):
|
||||||
data["fields"] = orjson.dumps(data["fields"]).decode()
|
data["fields"] = orjson.dumps(data["fields"]).decode()
|
||||||
@@ -156,11 +157,23 @@ class SQLModelRepository(BaseRepository):
|
|||||||
)
|
)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
pass
|
pass
|
||||||
|
return data
|
||||||
|
|
||||||
|
async def add_log(self, log_data: dict[str, Any]) -> None:
    """Persist one log entry in its own session and commit."""
    # Normalize first (pure dict work) so the session is held only
    # for the actual insert.
    row = Log(**self._normalize_log_row(log_data))
    async with self._session() as session:
        session.add(row)
        await session.commit()
|
||||||
|
|
||||||
|
async def add_logs(self, log_entries: list[dict[str, Any]]) -> None:
    """Persist a whole batch of log rows atomically.

    All rows share one session and one commit, so either every entry
    lands or none do. An empty batch is a no-op.
    """
    if not log_entries:
        return
    # Build ORM objects up front; the session is held only for the
    # single bulk insert + commit.
    pending = []
    for entry in log_entries:
        pending.append(Log(**self._normalize_log_row(entry)))
    async with self._session() as session:
        session.add_all(pending)
        await session.commit()
|
||||||
|
|
||||||
def _apply_filters(
|
def _apply_filters(
|
||||||
self,
|
self,
|
||||||
statement: SelectOfScalar,
|
statement: SelectOfScalar,
|
||||||
|
|||||||
@@ -16,6 +16,35 @@ async def repo(tmp_path):
|
|||||||
return r
|
return r
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
async def test_add_logs_bulk(repo):
    """add_logs persists every row of a ten-entry batch in one call."""

    def _entry(i):
        # One synthetic log row; the dict-valued "fields" exercises the
        # normalization path that serializes it to a JSON string.
        return {
            "decky": f"decky-{i:02d}",
            "service": "ssh",
            "event_type": "connect",
            "attacker_ip": f"10.0.0.{i}",
            "raw_line": f"row {i}",
            "fields": {"port": 22, "i": i},
            "msg": "bulk",
        }

    await repo.add_logs([_entry(i) for i in range(1, 11)])

    stored = await repo.get_logs(limit=50, offset=0)
    assert len(stored) == 10
    # Every distinct attacker IP from the batch made it to storage.
    assert {row["attacker_ip"] for row in stored} == {f"10.0.0.{i}" for i in range(1, 11)}
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.anyio
async def test_add_logs_empty_is_noop(repo):
    """An empty batch writes nothing and raises nothing."""
    await repo.add_logs([])
    assert await repo.get_logs(limit=10, offset=0) == []
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.anyio
|
@pytest.mark.anyio
|
||||||
async def test_add_and_get_log(repo):
|
async def test_add_and_get_log(repo):
|
||||||
await repo.add_log({
|
await repo.add_log({
|
||||||
|
|||||||
Reference in New Issue
Block a user