Files
DECNET/tests/test_ingester.py
anti a10aee282f perf(ingester): batch log writes into bulk commits
The ingester now accumulates up to DECNET_BATCH_SIZE rows (default 100)
or DECNET_BATCH_MAX_WAIT_MS (default 250ms) before flushing through
repo.add_logs — one transaction, one COMMIT per batch instead of per
row. Under attacker traffic this collapses N commits into ⌈N/100⌉ and
takes most of the SQLite writer-lock contention off the hot path.

Flush semantics are cancel-safe: _position only advances after a batch
commits successfully, and the flush helper bails without touching the
DB if the enclosing task is being cancelled (lifespan teardown).
Un-flushed lines stay in the file and are re-read on next startup.

Tests updated to assert on add_logs (bulk) instead of the per-row
add_log that the ingester no longer uses, plus a new test that 250
lines flush in ≤5 calls.
2026-04-17 16:37:34 -04:00

395 lines
16 KiB
Python

"""
Tests for decnet/web/ingester.py
Covers log_ingestion_worker and _extract_bounty with
async tests using temporary files.
"""
import asyncio
import json
import os
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
# ── _extract_bounty ───────────────────────────────────────────────────────────
class TestExtractBounty:
    """Unit tests for decnet.web.ingester._extract_bounty.

    A bounty is only recorded when the parsed log record carries a dict
    'fields' payload containing both a username and a password.
    """

    @staticmethod
    def _repo() -> MagicMock:
        # Fresh repo stub whose add_bounty is awaitable and records its calls.
        repo = MagicMock()
        repo.add_bounty = AsyncMock()
        return repo

    @pytest.mark.asyncio
    async def test_credential_extraction(self):
        """A record with both username and password yields a credential bounty."""
        from decnet.web.ingester import _extract_bounty
        repo = self._repo()
        record: dict = {
            "decky": "decky-01",
            "service": "ssh",
            "attacker_ip": "10.0.0.5",
            "fields": {"username": "admin", "password": "hunter2"},
        }
        await _extract_bounty(repo, record)
        repo.add_bounty.assert_awaited_once()
        saved = repo.add_bounty.call_args[0][0]
        assert saved["bounty_type"] == "credential"
        assert saved["payload"]["username"] == "admin"
        assert saved["payload"]["password"] == "hunter2"

    @pytest.mark.asyncio
    async def test_no_fields_skips(self):
        """A record without a 'fields' key is ignored."""
        from decnet.web.ingester import _extract_bounty
        repo = self._repo()
        await _extract_bounty(repo, {"decky": "x"})
        repo.add_bounty.assert_not_awaited()

    @pytest.mark.asyncio
    async def test_fields_not_dict_skips(self):
        """A non-dict 'fields' value is ignored rather than crashing."""
        from decnet.web.ingester import _extract_bounty
        repo = self._repo()
        await _extract_bounty(repo, {"fields": "not-a-dict"})
        repo.add_bounty.assert_not_awaited()

    @pytest.mark.asyncio
    async def test_missing_password_skips(self):
        """A username alone is not enough to record a bounty."""
        from decnet.web.ingester import _extract_bounty
        repo = self._repo()
        await _extract_bounty(repo, {"fields": {"username": "admin"}})
        repo.add_bounty.assert_not_awaited()

    @pytest.mark.asyncio
    async def test_missing_username_skips(self):
        """A password alone is not enough to record a bounty."""
        from decnet.web.ingester import _extract_bounty
        repo = self._repo()
        await _extract_bounty(repo, {"fields": {"password": "pass"}})
        repo.add_bounty.assert_not_awaited()
# ── log_ingestion_worker ──────────────────────────────────────────────────────
class TestLogIngestionWorker:
    """Unit tests for decnet.web.ingester.log_ingestion_worker.

    Each test drives the worker with a patched asyncio.sleep that raises
    CancelledError after a fixed number of poll cycles, then inspects the
    bulk add_logs / set_state calls made against a mocked repo.

    NOTE(review): DECNET_INGEST_LOG_FILE points at ``test.log`` while the
    tests write their content to the sibling ``test.json`` — presumably the
    worker derives the ``.json`` path from the configured one; confirm
    against ingester.py.
    """

    @staticmethod
    def _repo(get_state=None) -> MagicMock:
        # Repo stub with awaitable methods; get_state yields the given
        # saved-position payload (None → no saved state).
        repo = MagicMock()
        repo.add_log = AsyncMock()
        repo.add_logs = AsyncMock()
        repo.add_bounty = AsyncMock()
        repo.get_state = AsyncMock(return_value=get_state)
        repo.set_state = AsyncMock()
        return repo

    @staticmethod
    def _event_line(decky="d1", service="ssh", attacker_ip="1.2.3.4", raw_line="x") -> str:
        # One newline-terminated JSON log line in the ingester's wire shape.
        return json.dumps({"decky": decky, "service": service, "event_type": "auth",
                           "attacker_ip": attacker_ip, "fields": {}, "raw_line": raw_line,
                           "msg": ""}) + "\n"

    @staticmethod
    def _sleep_cancelling_after(limit, on_call=None):
        # Fake asyncio.sleep: invokes on_call(tick) each cycle and cancels
        # the worker once `limit` cycles have elapsed.
        counter = {"n": 0}

        async def _sleep(_secs):
            counter["n"] += 1
            if on_call is not None:
                on_call(counter["n"])
            if counter["n"] >= limit:
                raise asyncio.CancelledError()

        return _sleep

    @staticmethod
    async def _run(worker, repo, log_file, sleep):
        # Run the worker under the env/sleep patches until it is cancelled.
        with patch.dict(os.environ, {"DECNET_INGEST_LOG_FILE": log_file}):
            with patch("decnet.web.ingester.asyncio.sleep", side_effect=sleep):
                with pytest.raises(asyncio.CancelledError):
                    await worker(repo)

    @staticmethod
    def _all_rows(repo):
        # Flatten every batch passed to add_logs into one row list.
        return [r for call in repo.add_logs.call_args_list for r in call.args[0]]

    @pytest.mark.asyncio
    async def test_no_env_var_returns_immediately(self):
        """Without DECNET_INGEST_LOG_FILE the worker returns without error."""
        from decnet.web.ingester import log_ingestion_worker
        repo = MagicMock()
        with patch.dict(os.environ, {}, clear=False):
            # Remove DECNET_INGEST_LOG_FILE if set
            os.environ.pop("DECNET_INGEST_LOG_FILE", None)
            await log_ingestion_worker(repo)

    @pytest.mark.asyncio
    async def test_file_not_exists_waits(self, tmp_path):
        """A missing log file means polling, never a DB write."""
        from decnet.web.ingester import log_ingestion_worker
        repo = self._repo()
        log_file = str(tmp_path / "nonexistent.log")
        await self._run(log_ingestion_worker, repo, log_file,
                        self._sleep_cancelling_after(2))
        repo.add_logs.assert_not_awaited()

    @pytest.mark.asyncio
    async def test_ingests_json_lines(self, tmp_path):
        """One complete JSON line is ingested as a single one-row batch."""
        from decnet.web.ingester import log_ingestion_worker
        repo = self._repo()
        log_file = str(tmp_path / "test.log")
        (tmp_path / "test.json").write_text(self._event_line())
        await self._run(log_ingestion_worker, repo, log_file,
                        self._sleep_cancelling_after(2))
        repo.add_logs.assert_awaited_once()
        batch = repo.add_logs.call_args[0][0]
        assert len(batch) == 1
        assert batch[0]["attacker_ip"] == "1.2.3.4"

    @pytest.mark.asyncio
    async def test_handles_json_decode_error(self, tmp_path):
        """Malformed JSON is dropped without a DB write."""
        from decnet.web.ingester import log_ingestion_worker
        repo = self._repo()
        log_file = str(tmp_path / "test.log")
        (tmp_path / "test.json").write_text("not valid json\n")
        await self._run(log_ingestion_worker, repo, log_file,
                        self._sleep_cancelling_after(2))
        repo.add_logs.assert_not_awaited()

    @pytest.mark.asyncio
    async def test_file_truncation_resets_position(self, tmp_path):
        """After the file shrinks, the worker rewinds and re-reads from the top."""
        from decnet.web.ingester import log_ingestion_worker
        repo = self._repo()
        log_file = str(tmp_path / "test.log")
        json_file = tmp_path / "test.json"
        line = self._event_line()
        # Write 2 lines, then truncate to 1 on the second poll cycle.
        json_file.write_text(line + line)

        def _maybe_truncate(tick):
            if tick == 2:
                json_file.write_text(line)

        await self._run(log_ingestion_worker, repo, log_file,
                        self._sleep_cancelling_after(4, on_call=_maybe_truncate))
        # Should have ingested lines from original + after truncation
        total = sum(len(call.args[0]) for call in repo.add_logs.call_args_list)
        assert total >= 2

    @pytest.mark.asyncio
    async def test_partial_line_not_processed(self, tmp_path):
        """A line without a trailing newline is left for the next read."""
        from decnet.web.ingester import log_ingestion_worker
        repo = self._repo()
        log_file = str(tmp_path / "test.log")
        # Write a partial line (no newline at end)
        (tmp_path / "test.json").write_text('{"partial": true')
        await self._run(log_ingestion_worker, repo, log_file,
                        self._sleep_cancelling_after(2))
        repo.add_logs.assert_not_awaited()

    @pytest.mark.asyncio
    async def test_position_restored_skips_already_seen_lines(self, tmp_path):
        """Worker resumes from saved position and skips already-ingested content."""
        from decnet.web.ingester import log_ingestion_worker
        line_old = self._event_line(attacker_ip="1.1.1.1")
        line_new = self._event_line(decky="d2", service="ftp",
                                    attacker_ip="2.2.2.2", raw_line="y")
        # Saved position points to end of first line — only line_new should be ingested
        repo = self._repo(get_state={"position": len(line_old.encode("utf-8"))})
        log_file = str(tmp_path / "test.log")
        (tmp_path / "test.json").write_text(line_old + line_new)
        await self._run(log_ingestion_worker, repo, log_file,
                        self._sleep_cancelling_after(2))
        rows = self._all_rows(repo)
        assert len(rows) == 1
        assert rows[0]["attacker_ip"] == "2.2.2.2"

    @pytest.mark.asyncio
    async def test_set_state_called_with_position_after_batch(self, tmp_path):
        """set_state is called with the updated byte position after processing lines."""
        from decnet.web.ingester import log_ingestion_worker, _INGEST_STATE_KEY
        repo = self._repo()
        log_file = str(tmp_path / "test.log")
        line = self._event_line(attacker_ip="1.1.1.1")
        (tmp_path / "test.json").write_text(line)
        await self._run(log_ingestion_worker, repo, log_file,
                        self._sleep_cancelling_after(2))
        position_calls = [c for c in repo.set_state.call_args_list
                          if c[0][0] == _INGEST_STATE_KEY]
        assert position_calls, "set_state never called with ingest position key"
        assert position_calls[-1][0][1]["position"] == len(line.encode("utf-8"))

    @pytest.mark.asyncio
    async def test_batches_many_lines_into_few_commits(self, tmp_path):
        """250 lines with BATCH_SIZE=100 should flush in a handful of calls."""
        from decnet.web.ingester import log_ingestion_worker
        repo = self._repo()
        log_file = str(tmp_path / "test.log")
        payload = "".join(
            self._event_line(decky=f"d{i}", attacker_ip=f"10.0.0.{i % 256}")
            for i in range(250)
        )
        (tmp_path / "test.json").write_text(payload)
        await self._run(log_ingestion_worker, repo, log_file,
                        self._sleep_cancelling_after(2))
        # 250 lines, batch=100 → 2 size-triggered flushes + 1 remainder flush.
        # Asserting <= 5 leaves headroom for time-triggered flushes on slow CI.
        assert repo.add_logs.await_count <= 5
        assert len(self._all_rows(repo)) == 250

    @pytest.mark.asyncio
    async def test_truncation_resets_and_saves_zero_position(self, tmp_path):
        """On file truncation, set_state is called with position=0."""
        from decnet.web.ingester import log_ingestion_worker, _INGEST_STATE_KEY
        line = self._event_line(attacker_ip="1.1.1.1")
        # Pretend the saved position is past the end (simulates prior larger file)
        repo = self._repo(get_state={"position": len(line.encode("utf-8")) * 10})
        log_file = str(tmp_path / "test.log")
        # File is smaller than the saved position → treated as truncation.
        (tmp_path / "test.json").write_text(line)
        await self._run(log_ingestion_worker, repo, log_file,
                        self._sleep_cancelling_after(2))
        reset_calls = [
            c for c in repo.set_state.call_args_list
            if c[0][0] == _INGEST_STATE_KEY and c[0][1] == {"position": 0}
        ]
        assert reset_calls, "set_state not called with position=0 after truncation"