Closes the cred-coverage gap for 7 services that already had the data
on the wire but never landed it in the Credential table:
- SNMP — community string lands as secret_kind="snmp_community",
principal=None (v1/v2c has no per-user identity, the community IS
the auth).
- SIP — Digest response hash, previously buried in the auth= header
dump, now classify_authorization()-extracted.
- HTTP / HTTPS — Authorization header was in the headers JSON but
never extracted. Now Basic decodes to plaintext, Bearer →
http_bearer (principal=None), Digest → http_digest_md5.
- K8s — already extracted Authorization but didn't normalize. Service-
account JWTs flow through as Bearer.
- Docker API — headers absent entirely. Adds the headers JSON dump
and runs Authorization through the classifier.
- Elasticsearch — five distinct request handlers; each gains a
per-handler _cred_fields() helper.
Adds canonical templates/syslog_bridge.py:classify_authorization().
Recognised: Basic / Bearer / Token / Digest. Unknown schemes (NTLM,
AWS4-HMAC, Negotiate) return None; the header still rides in the
ambient SD-block but isn't normalized as a credential. The SD shape
on the wire collapses sip_digest_md5 into http_digest_md5 — same
algorithm, so cross-protocol reuse correlates correctly when (rare)
nonce collisions allow.
Drive-by repair of tests/core/test_fingerprinting.py:
- The pre-existing `test_http_useragent_extracted` asserted both that
add_bounty was called exactly once AND that the UA payload carried
`path` and `method` fields. Both wrong since this session opened:
the http_quirks fingerprint added later fires too, and the UA
payload never actually included path/method despite the assertion.
- Adds `path`/`method` to the UA fingerprint payload (real operator
value: "Nikto hit /admin" beats "Nikto seen on this decky").
- Replaces `assert_awaited_once` with a `_find_ua_bounty()` helper
that filters add_bounty calls by `fingerprint_type`. New fingerprint
families landing later won't retroactively break old tests.
- Updates the two credential-bearing tests to use the post-DEBT-039
native shape (`secret_b64` / `principal`) and `upsert_credential`,
not the deleted legacy `username+password` adapter.
Also rebuilds the per-service fake `syslog_bridge` modules in
tests/service_testing/{conftest,test_imap,test_pop3,test_snmp,test_mqtt,test_smtp}.py
to expose `encode_secret` + `classify_authorization`. Service templates
that import either now no longer fail at test collection.
173 tests pass in the touched scope. Phases 2-7 still pending.
70 lines
2.1 KiB
Python
70 lines
2.1 KiB
Python
"""
|
|
Shared helpers for binary-protocol service tests.
|
|
"""
|
|
|
|
import os
|
|
import threading
|
|
from types import ModuleType
|
|
from unittest.mock import MagicMock
|
|
|
|
import pytest
|
|
from hypothesis import HealthCheck
|
|
|
|
|
|
_FUZZ_SETTINGS = dict(
|
|
max_examples=int(os.environ.get("HYPOTHESIS_MAX_EXAMPLES", "200")),
|
|
deadline=2000,
|
|
suppress_health_check=[HealthCheck.function_scoped_fixture],
|
|
)
|
|
|
|
|
|
def make_fake_syslog_bridge() -> ModuleType:
|
|
mod = ModuleType("syslog_bridge")
|
|
mod.syslog_line = MagicMock(return_value="")
|
|
mod.write_syslog_file = MagicMock()
|
|
mod.forward_syslog = MagicMock()
|
|
mod.SEVERITY_WARNING = 4
|
|
mod.SEVERITY_INFO = 6
|
|
# encode_secret returns the universal cred SD shape; tests don't
|
|
# care about the exact bytes, just that the key set is correct.
|
|
mod.encode_secret = MagicMock(
|
|
return_value={"secret_printable": "", "secret_b64": ""}
|
|
)
|
|
# classify_authorization returns None for unknown / absent auth so
|
|
# services that call **(cred or {}) get a no-op spread.
|
|
mod.classify_authorization = MagicMock(return_value=None)
|
|
return mod
|
|
|
|
|
|
def load_real_instance_seed() -> ModuleType:
|
|
"""Load the real instance_seed helper so templates under test see the
|
|
actual per-instance seeding behavior, not a stub. Tests that need
|
|
determinism should pin NODE_NAME via monkeypatch before loading a
|
|
template."""
|
|
import importlib.util
|
|
spec = importlib.util.spec_from_file_location(
|
|
"instance_seed", "decnet/templates/instance_seed.py"
|
|
)
|
|
mod = importlib.util.module_from_spec(spec)
|
|
spec.loader.exec_module(mod)
|
|
return mod
|
|
|
|
|
|
def run_with_timeout(fn, *args, timeout: float = 2.0) -> None:
|
|
"""Run fn(*args) in a daemon thread. pytest.fail if it doesn't return in time."""
|
|
exc_box: list[BaseException] = []
|
|
|
|
def _target():
|
|
try:
|
|
fn(*args)
|
|
except Exception as e:
|
|
exc_box.append(e)
|
|
|
|
t = threading.Thread(target=_target, daemon=True)
|
|
t.start()
|
|
t.join(timeout)
|
|
if t.is_alive():
|
|
pytest.fail(f"data_received hung for >{timeout}s — likely infinite loop")
|
|
if exc_box:
|
|
raise exc_box[0]
|