From 7bac3a29c678a94c9ac1899a42844602546c3d14 Mon Sep 17 00:00:00 2001 From: anti Date: Wed, 20 May 2026 22:10:15 -0400 Subject: [PATCH] fix(ingester): retry get_state on startup DB errors; bump deps + rename behave packages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ingester: wrap bootstrap get_state() in forever-retry loop — MySQL coming up after the API process killed the ingestion task permanently before it ever entered _run_loop. Regression test added. deps: idna 3.13→3.15 (CVE-2026-45409), twisted 26.4.0rc2→26.4.0 (PYSEC-2026-160), pip 26.1→26.1.1 (CVE-2026-3219 resolved upstream), behave-core/behave-shell renamed from decnet-behave-* and bumped to 0.1.1. pre-commit hook updated to reflect current ignore list. --- decnet/web/ingester.py | 17 ++++++++++++- pyproject.toml | 6 ++--- tests/web/test_ingester.py | 52 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 71 insertions(+), 4 deletions(-) diff --git a/decnet/web/ingester.py b/decnet/web/ingester.py index d8c791bc..55214cb8 100644 --- a/decnet/web/ingester.py +++ b/decnet/web/ingester.py @@ -41,7 +41,22 @@ async def log_ingestion_worker(repo: BaseRepository) -> None: _json_log_path: Path = Path(_base_log_file).with_suffix(".json") - _saved = await repo.get_state(_INGEST_STATE_KEY) + # Retry forever on transient DB failures — MySQL may still be coming + # up when the API spawns this task (api.py's own DB-init retry only + # bounds the *first* connection, not subsequent ops). If we throw here + # the task dies and the ingester is silently dead for the whole API + # lifetime — exactly the failure mode that caused decnet.json to grow + # unbounded with zero rows in the `logs` table on 2026-05-20. + while True: + try: + _saved = await repo.get_state(_INGEST_STATE_KEY) + break + except Exception as _exc: + logger.warning( + "ingester: DB not ready for state load, retrying in 5s: %s", + _exc, + ) + await _sleep(5) _position: int = _saved.get("position", 0) if _saved else 0 logger.info("ingest worker started path=%s position=%d", _json_log_path, _position) diff --git a/pyproject.toml b/pyproject.toml index 25c3fcb2..e99c6712 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,8 +52,8 @@ dependencies = [ # the Observation envelope; shell ships the primitive registry + # bus event adapter consumed by decnet/profiler/behave_shell/. Pin # range tracks BEHAVE-INTEGRATION.md §"Versioning". - "decnet-behave-core>=0.1.0,<0.2", - "decnet-behave-shell>=0.1.0,<0.2", + "behave-core>=0.1.0,<0.2", + "behave-shell>=0.1.0,<0.2", # STIX → MISP conversion: CIRCL-maintained reference converter used by # MISP itself. Pulls pymisp transitively (needed for MISPEvent output). "misp-stix>=2026.4", @@ -96,7 +96,7 @@ dev = [ "pytest-xdist>=3.8.0", "pytest-timeout>=2.4.0", "flask>=3.1.3", - "twisted>=26.4.0rc2", + "twisted>=26.4.0", "requests>=2.33.1", "redis>=7.4.0", "pymysql>=1.1.2", diff --git a/tests/web/test_ingester.py b/tests/web/test_ingester.py index 89b8669e..37c1a837 100644 --- a/tests/web/test_ingester.py +++ b/tests/web/test_ingester.py @@ -770,3 +770,55 @@ class TestLogIngestionWorker: if c[0][0] == _INGEST_STATE_KEY and c[0][1] == {"position": 0} ] assert reset_calls, "set_state not called with position=0 after truncation" + + @pytest.mark.asyncio + async def test_get_state_db_error_retries_then_recovers(self, tmp_path): + """OperationalError on initial get_state must not kill the task. + + Regression test for the 2026-05-20 incident: MySQL came up after the + API process, so the very first get_state() threw before _run_loop was + ever entered. The task died with an unhandled exception and the + ingester was silently dead for the whole API lifetime. + """ + from decnet.web.ingester import log_ingestion_worker + from sqlalchemy.exc import OperationalError + + log_file = str(tmp_path / "test.log") + json_file = tmp_path / "test.json" + json_file.write_text( + json.dumps({"decky": "d1", "service": "ssh", "event_type": "auth", + "attacker_ip": "1.2.3.4", "fields": {}, "raw_line": "x", "msg": ""}) + "\n" + ) + + _get_state_calls: int = 0 + + async def fake_get_state(key): + nonlocal _get_state_calls + _get_state_calls += 1 + if _get_state_calls == 1: + raise OperationalError("connection refused", None, None) + return {"position": 0} + + mock_repo = MagicMock() + mock_repo.get_state = fake_get_state + mock_repo.add_logs = AsyncMock() + mock_repo.add_bounty = AsyncMock() + mock_repo.set_state = AsyncMock() + + _sleep_count: int = 0 + + async def fake_sleep(secs): + nonlocal _sleep_count + _sleep_count += 1 + # First sleep is the retry backoff after the OperationalError. + # Second sleep means we entered _run_loop and processed the batch. + if _sleep_count >= 2: + raise asyncio.CancelledError() + + with patch.dict(os.environ, {"DECNET_INGEST_LOG_FILE": log_file}): + with patch("decnet.web.ingester._sleep", side_effect=fake_sleep): + with pytest.raises(asyncio.CancelledError): + await log_ingestion_worker(mock_repo) + + assert _get_state_calls == 2, "should have retried get_state once after the error" + mock_repo.add_logs.assert_awaited_once()