Files
DECNET/tests/service_testing/conftest.py
anti d63e396410 fix(protocols): guard against zero/malformed length fields in binary protocol parsers
MongoDB had the same infinite-loop bug as MSSQL (msg_len=0 → buffer never
shrinks in while loop). Postgres, MySQL, and MQTT had related length-field
issues (stuck state, resource exhaustion, overlong remaining-length).

Also fixes an existing MongoDB _op_reply struct.pack format bug (extra 'q'
specifier caused struct.error on any OP_QUERY response).

Adds 53 regression + protocol boundary tests across MSSQL, MongoDB,
Postgres, MySQL, and MQTT, including a _run_with_timeout threading harness
to catch infinite loops and @pytest.mark.fuzz hypothesis tests for each.
2026-04-12 01:01:13 -04:00

48 lines
1.2 KiB
Python

"""
Shared helpers for binary-protocol service tests.
"""
import os
import threading
from types import ModuleType
from unittest.mock import MagicMock
import pytest
from hypothesis import HealthCheck
_FUZZ_SETTINGS = dict(
max_examples=int(os.environ.get("HYPOTHESIS_MAX_EXAMPLES", "200")),
deadline=2000,
suppress_health_check=[HealthCheck.function_scoped_fixture],
)
def make_fake_decnet_logging() -> ModuleType:
mod = ModuleType("decnet_logging")
mod.syslog_line = MagicMock(return_value="")
mod.write_syslog_file = MagicMock()
mod.forward_syslog = MagicMock()
mod.SEVERITY_WARNING = 4
mod.SEVERITY_INFO = 6
return mod
def run_with_timeout(fn, *args, timeout: float = 2.0) -> None:
"""Run fn(*args) in a daemon thread. pytest.fail if it doesn't return in time."""
exc_box: list[BaseException] = []
def _target():
try:
fn(*args)
except Exception as e:
exc_box.append(e)
t = threading.Thread(target=_target, daemon=True)
t.start()
t.join(timeout)
if t.is_alive():
pytest.fail(f"data_received hung for >{timeout}s — likely infinite loop")
if exc_box:
raise exc_box[0]