fix(ingester): retry get_state on startup DB errors; bump deps + rename behave packages

ingester: wrap bootstrap get_state() in forever-retry loop — MySQL coming
up after the API process killed the ingestion task permanently before it
ever entered _run_loop. Regression test added.

deps: idna 3.13→3.15 (CVE-2026-45409), twisted 26.4.0rc2→26.4.0
(PYSEC-2026-160), pip 26.1→26.1.1 (CVE-2026-3219 resolved upstream),
behave-core/behave-shell renamed from decnet-behave-* and bumped to 0.1.1.
pre-commit hook updated to reflect current ignore list.
This commit is contained in:
2026-05-20 22:10:15 -04:00
parent 916b21b652
commit 7bac3a29c6
3 changed files with 71 additions and 4 deletions

View File

@@ -41,7 +41,22 @@ async def log_ingestion_worker(repo: BaseRepository) -> None:
_json_log_path: Path = Path(_base_log_file).with_suffix(".json") _json_log_path: Path = Path(_base_log_file).with_suffix(".json")
_saved = await repo.get_state(_INGEST_STATE_KEY) # Retry forever on transient DB failures — MySQL may still be coming
# up when the API spawns this task (api.py's own DB-init retry only
# bounds the *first* connection, not subsequent ops). If we throw here
# the task dies and the ingester is silently dead for the whole API
# lifetime — exactly the failure mode that caused decnet.json to grow
# unbounded with zero rows in the `logs` table on 2026-05-20.
while True:
try:
_saved = await repo.get_state(_INGEST_STATE_KEY)
break
except Exception as _exc:
logger.warning(
"ingester: DB not ready for state load, retrying in 5s: %s",
_exc,
)
await _sleep(5)
_position: int = _saved.get("position", 0) if _saved else 0 _position: int = _saved.get("position", 0) if _saved else 0
logger.info("ingest worker started path=%s position=%d", _json_log_path, _position) logger.info("ingest worker started path=%s position=%d", _json_log_path, _position)

View File

@@ -52,8 +52,8 @@ dependencies = [
# the Observation envelope; shell ships the primitive registry + # the Observation envelope; shell ships the primitive registry +
# bus event adapter consumed by decnet/profiler/behave_shell/. Pin # bus event adapter consumed by decnet/profiler/behave_shell/. Pin
# range tracks BEHAVE-INTEGRATION.md §"Versioning". # range tracks BEHAVE-INTEGRATION.md §"Versioning".
"decnet-behave-core>=0.1.0,<0.2", "behave-core>=0.1.0,<0.2",
"decnet-behave-shell>=0.1.0,<0.2", "behave-shell>=0.1.0,<0.2",
# STIX → MISP conversion: CIRCL-maintained reference converter used by # STIX → MISP conversion: CIRCL-maintained reference converter used by
# MISP itself. Pulls pymisp transitively (needed for MISPEvent output). # MISP itself. Pulls pymisp transitively (needed for MISPEvent output).
"misp-stix>=2026.4", "misp-stix>=2026.4",
@@ -96,7 +96,7 @@ dev = [
"pytest-xdist>=3.8.0", "pytest-xdist>=3.8.0",
"pytest-timeout>=2.4.0", "pytest-timeout>=2.4.0",
"flask>=3.1.3", "flask>=3.1.3",
"twisted>=26.4.0rc2", "twisted>=26.4.0",
"requests>=2.33.1", "requests>=2.33.1",
"redis>=7.4.0", "redis>=7.4.0",
"pymysql>=1.1.2", "pymysql>=1.1.2",

View File

@@ -770,3 +770,55 @@ class TestLogIngestionWorker:
if c[0][0] == _INGEST_STATE_KEY and c[0][1] == {"position": 0} if c[0][0] == _INGEST_STATE_KEY and c[0][1] == {"position": 0}
] ]
assert reset_calls, "set_state not called with position=0 after truncation" assert reset_calls, "set_state not called with position=0 after truncation"
@pytest.mark.asyncio
async def test_get_state_db_error_retries_then_recovers(self, tmp_path):
"""OperationalError on initial get_state must not kill the task.
Regression test for the 2026-05-20 incident: MySQL came up after the
API process, so the very first get_state() threw before _run_loop was
ever entered. The task died with an unhandled exception and the
ingester was silently dead for the whole API lifetime.
"""
from decnet.web.ingester import log_ingestion_worker
from sqlalchemy.exc import OperationalError
log_file = str(tmp_path / "test.log")
json_file = tmp_path / "test.json"
json_file.write_text(
json.dumps({"decky": "d1", "service": "ssh", "event_type": "auth",
"attacker_ip": "1.2.3.4", "fields": {}, "raw_line": "x", "msg": ""}) + "\n"
)
_get_state_calls: int = 0
async def fake_get_state(key):
nonlocal _get_state_calls
_get_state_calls += 1
if _get_state_calls == 1:
raise OperationalError("connection refused", None, None)
return {"position": 0}
mock_repo = MagicMock()
mock_repo.get_state = fake_get_state
mock_repo.add_logs = AsyncMock()
mock_repo.add_bounty = AsyncMock()
mock_repo.set_state = AsyncMock()
_sleep_count: int = 0
async def fake_sleep(secs):
nonlocal _sleep_count
_sleep_count += 1
# First sleep is the retry backoff after the OperationalError.
# Second sleep means we entered _run_loop and processed the batch.
if _sleep_count >= 2:
raise asyncio.CancelledError()
with patch.dict(os.environ, {"DECNET_INGEST_LOG_FILE": log_file}):
with patch("decnet.web.ingester._sleep", side_effect=fake_sleep):
with pytest.raises(asyncio.CancelledError):
await log_ingestion_worker(mock_repo)
assert _get_state_calls == 2, "should have retried get_state once after the error"
mock_repo.add_logs.assert_awaited_once()