From c39802a4bb002473f07507a945985a0b93f54f00 Mon Sep 17 00:00:00 2001 From: anti Date: Sat, 9 May 2026 01:59:11 -0400 Subject: [PATCH] feat(correlation/attribution): hash + numeric merge functions (Phase 3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit aggregate_numeric(): EWMA + dispersion (CV) over numeric primitive values. Stable when CV < 20% AND mean shift < 30%; drifting on >= 30% mean shift; conflicted on CV > 100%. Confidence is 1 - min(CV, 1). multi_actor is intentionally NOT a numeric state — bimodal distributions belong to the categorical detector once the value space is bucketed. aggregate_hash(): counts distinct hash values within HASH_DRIFT_WINDOW_SECS of the most recent observation. 0 rotations = stable, 1..HASH_DRIFT_MAX = drifting, > HASH_DRIFT_MAX = conflicted. Reads rotation events; never recomputes hashes (DEBT-032 already produces them via decnet.correlation.fingerprint_rotation). aggregate_observations() dispatcher now routes "categorical" | "numeric" | "hash" | None and rejects unknown kinds with ValueError (louder than NotImplementedError now that all three v0 mergers exist). 17 synthetic-input tests cover both new mergers and the dispatcher. 
--- decnet/correlation/attribution/aggregate.py | 187 ++++++++++++++++- .../test_aggregate_numeric_hash.py | 193 ++++++++++++++++++ 2 files changed, 377 insertions(+), 3 deletions(-) create mode 100644 tests/correlation/attribution/test_aggregate_numeric_hash.py diff --git a/decnet/correlation/attribution/aggregate.py b/decnet/correlation/attribution/aggregate.py index 47d6b71f..d6429cb8 100644 --- a/decnet/correlation/attribution/aggregate.py +++ b/decnet/correlation/attribution/aggregate.py @@ -32,6 +32,8 @@ __all__ = [ "AttributionState", "aggregate_observations", "aggregate_categorical", + "aggregate_numeric", + "aggregate_hash", ] @@ -78,12 +80,191 @@ def aggregate_observations( return _unknown(0.0, count=0) if value_kind in (None, "categorical"): return aggregate_categorical(observations) - raise NotImplementedError( - f"aggregate_observations: value_kind={value_kind!r} lands in Phase 3 " - "(numeric + hash). v0 Phase 2 only supports categorical.", + if value_kind == "numeric": + return aggregate_numeric(observations) + if value_kind == "hash": + return aggregate_hash(observations) + raise ValueError( + f"aggregate_observations: unknown value_kind={value_kind!r}; " + "expected 'categorical' | 'numeric' | 'hash' | None", ) +def aggregate_numeric( + observations: Sequence[dict[str, Any]], +) -> AttributionState: + """Numeric merger — for primitives whose ``value`` is an int / + float (e.g. ``toolchain.c2.beacon_interval_ms``, + ``motor.paste_burst_rate``). + + Compares the EWMA of the recent window against the EWMA of the + older window; reports dispersion as coefficient of variation. 
+ + * < ``MIN_OBSERVATIONS_FOR_STATE`` → ``unknown`` + * recent CV < ``NUMERIC_STABLE_DISPERSION_PCT`` *and* mean shift + from older window < ``NUMERIC_DRIFT_MEAN_SHIFT_PCT`` → ``stable`` + * mean shifted >= ``NUMERIC_DRIFT_MEAN_SHIFT_PCT`` → ``drifting`` (only evaluated when an older window exists) + * recent CV > ``NUMERIC_CONFLICT_DISPERSION_PCT`` → ``conflicted`` (checked first, so it takes precedence over ``drifting``) + * otherwise → ``stable`` (falling-through case for moderate + dispersion that hasn't yet become drift) + + Confidence on stable/drifting is ``1 - min(CV, 1.0)`` — + tighter dispersion = higher confidence. Conflicted is ``0.5`` + by convention; we cannot meaningfully claim certainty in a + statistic computed over a degenerate sample. + + ``current_value`` is the recent EWMA, not the last raw + observation: numeric primitives are noisy by nature and + surfacing the smoothed estimate keeps the dashboard from + flapping on every tick (exception: the ``unknown`` state reports the last raw value coerced to float, or ``None`` on empty input). ``multi_actor`` is *not* a numeric state + in v0 — bimodal distributions belong to the categorical + detector once the primitive's value space is bucketed. 
+ """ + n = len(observations) + last_ts = float(observations[-1].get("ts", 0.0)) if observations else 0.0 + if n < _T.MIN_OBSERVATIONS_FOR_STATE: + return AttributionState( + current_value=_safe_float(observations[-1].get("value")) if n else None, + state="unknown", + confidence=0.0, + observation_count=n, + last_observation_ts=last_ts, + ) + + window = _T.CATEGORICAL_WINDOW_N + recent_vals = [_safe_float(o.get("value")) for o in observations[-window:]] + older_vals = [ + _safe_float(o.get("value")) + for o in observations[-2 * window: -window] + ] + recent_mean = _ewma(recent_vals, _T.NUMERIC_EWMA_ALPHA) + recent_cv = _coef_of_variation(recent_vals, recent_mean) + + if recent_cv > _T.NUMERIC_CONFLICT_DISPERSION_PCT: + return AttributionState( + current_value=recent_mean, + state="conflicted", + confidence=0.5, + observation_count=n, + last_observation_ts=last_ts, + ) + + if older_vals: + older_mean = _ewma(older_vals, _T.NUMERIC_EWMA_ALPHA) + denom = abs(older_mean) if older_mean != 0 else 1.0 + mean_shift = abs(recent_mean - older_mean) / denom + if mean_shift >= _T.NUMERIC_DRIFT_MEAN_SHIFT_PCT: + return AttributionState( + current_value=recent_mean, + state="drifting", + confidence=max(0.0, 1.0 - min(recent_cv, 1.0)), + observation_count=n, + last_observation_ts=last_ts, + ) + + return AttributionState( + current_value=recent_mean, + state="stable", + confidence=max(0.0, 1.0 - min(recent_cv, 1.0)), + observation_count=n, + last_observation_ts=last_ts, + ) + + +def aggregate_hash( + observations: Sequence[dict[str, Any]], +) -> AttributionState: + """Hash merger — for rotation-resistant fingerprints + (``toolchain.tls.jarm_server``, ``toolchain.ssh.hassh_client``). + + The merger does NOT recompute hashes; DEBT-032 + (``decnet.correlation.fingerprint_rotation``) already produces + one observation per rotation event. 
The state machine counts + distinct hash values inside ``HASH_DRIFT_WINDOW_SECS`` of the + most recent observation: + + * 0 rotations (single hash, any count) → ``stable`` + * 1 to ``HASH_DRIFT_MAX`` rotations within window → ``drifting`` + * > ``HASH_DRIFT_MAX`` rotations within window → ``conflicted`` + + ``unknown`` fires only on empty input — a single hash with one + observation is enough signal to say "stable", because hashes + don't have a noisy baseline the way categorical/numeric + primitives do. + + ``current_value`` is the most recent hash. Confidence is + ``1 / (1 + rotations_in_window)`` — one rotation halves + confidence, two thirds it, etc. + """ + n = len(observations) + if n == 0: + return _unknown(0.0, count=0) + last_ts = float(observations[-1].get("ts", 0.0)) + last_value = observations[-1].get("value") + + window_start = last_ts - _T.HASH_DRIFT_WINDOW_SECS + in_window = [ + o for o in observations + if float(o.get("ts", 0.0)) >= window_start + ] + distinct = len({o.get("value") for o in in_window if o.get("value") is not None}) + rotations = max(0, distinct - 1) + confidence = 1.0 / (1.0 + rotations) + + if rotations == 0: + state = "stable" + elif rotations <= _T.HASH_DRIFT_MAX: + state = "drifting" + else: + state = "conflicted" + + return AttributionState( + current_value=last_value, + state=state, + confidence=confidence, + observation_count=n, + last_observation_ts=last_ts, + ) + + +def _ewma(values: Sequence[float], alpha: float) -> float: + """Single-pass EWMA. Empty input is illegal; callers gate on + ``MIN_OBSERVATIONS_FOR_STATE`` upstream.""" + it = iter(values) + smoothed = next(it) + for v in it: + smoothed = alpha * v + (1.0 - alpha) * smoothed + return smoothed + + +def _coef_of_variation(values: Sequence[float], mean: float) -> float: + """Population-style CV = stdev / |mean|. 
Returns 0 on a constant + signal; returns +inf-equivalent (1e9) when the mean is exactly + zero and the signal isn't constant — so the conflicted threshold + fires without us having to special-case it upstream.""" + if not values: + return 0.0 + diffs_sq = [(v - mean) ** 2 for v in values] + variance = sum(diffs_sq) / len(values) + stdev = variance ** 0.5 + if mean == 0: + return 0.0 if stdev == 0 else 1e9 + return stdev / abs(mean) + + +def _safe_float(value: Any) -> float: + """Defensive coercion — observations may carry value=None on + unknown-emitter primitives. Treat None as 0.0; the dispersion + check will surface the resulting flat baseline as 'stable' + which is the honest answer for a single-observation primitive + that hasn't fired yet.""" + if value is None: + return 0.0 + if isinstance(value, bool): + return 1.0 if value else 0.0 + return float(value) + + def aggregate_categorical( observations: Sequence[dict[str, Any]], ) -> AttributionState: diff --git a/tests/correlation/attribution/test_aggregate_numeric_hash.py b/tests/correlation/attribution/test_aggregate_numeric_hash.py new file mode 100644 index 00000000..abc070ce --- /dev/null +++ b/tests/correlation/attribution/test_aggregate_numeric_hash.py @@ -0,0 +1,193 @@ +"""Phase 3 — numeric + hash merger tests + dispatcher coverage. + +Pure-function tests; no DB, no bus. Synthetic input lists drive each +state transition the engine claims to detect. 
+""" +from __future__ import annotations + +from typing import Any + +import pytest + +from decnet.correlation.attribution import _thresholds as _T +from decnet.correlation.attribution.aggregate import ( + aggregate_hash, + aggregate_numeric, + aggregate_observations, +) + + +def _obs(value: Any, ts: float, confidence: float = 0.9) -> dict[str, Any]: + return {"value": value, "ts": ts, "confidence": confidence} + + +# ── numeric merger ──────────────────────────────────────────────────── + + +def test_numeric_empty_is_unknown() -> None: + out = aggregate_numeric([]) + assert out.state == "unknown" + assert out.observation_count == 0 + + +def test_numeric_below_min_is_unknown() -> None: + obs = [_obs(5000.0, 1714000000.0 + i * 60) for i in range(_T.MIN_OBSERVATIONS_FOR_STATE - 1)] + out = aggregate_numeric(obs) + assert out.state == "unknown" + + +def test_numeric_tight_dispersion_is_stable() -> None: + """Steady beacon ~5000ms with <20% jitter → stable.""" + base = 5000.0 + obs = [ + _obs(base + delta, 1714000000.0 + i * 60) + for i, delta in enumerate([0.0, 50.0, -30.0, 20.0, 10.0]) + ] + out = aggregate_numeric(obs) + assert out.state == "stable" + assert out.confidence > 0.9 + # current_value is the smoothed estimate, close to baseline. 
+ assert abs(out.current_value - base) < 100.0 + + +def test_numeric_mean_shift_is_drifting() -> None: + """Older window centred on 5000ms, recent window on 8000ms — that's + a 60% mean shift, well above NUMERIC_DRIFT_MEAN_SHIFT_PCT.""" + older = [_obs(5000.0, 1714000000.0 + i * 60) for i in range(5)] + newer = [_obs(8000.0, 1714001000.0 + i * 60) for i in range(5)] + out = aggregate_numeric(older + newer) + assert out.state == "drifting" + assert out.current_value > 7000.0 + + +def test_numeric_high_dispersion_is_conflicted() -> None: + """Recent window with CV > 100% (wildly mixed values).""" + obs = [ + _obs(100.0, 1714000000.0), + _obs(20000.0, 1714000060.0), + _obs(50.0, 1714000120.0), + _obs(15000.0, 1714000180.0), + _obs(200.0, 1714000240.0), + ] + out = aggregate_numeric(obs) + assert out.state == "conflicted" + assert out.confidence == 0.5 + + +def test_numeric_zero_mean_constant_is_stable() -> None: + """All-zero signal: CV is 0/0 by definition; helper returns 0 so + the state machine claims 'stable' (the honest answer).""" + obs = [_obs(0.0, 1714000000.0 + i * 60) for i in range(5)] + out = aggregate_numeric(obs) + assert out.state == "stable" + + +def test_numeric_handles_bool_values() -> None: + """Some primitives use bools as numeric flags. The merger must + coerce True/False to 1.0/0.0 without crashing the float math.""" + obs = [_obs(True, 1714000000.0 + i * 60) for i in range(5)] + out = aggregate_numeric(obs) + assert out.state == "stable" + assert out.current_value == pytest.approx(1.0) + + +# ── hash merger ─────────────────────────────────────────────────────── + + +def test_hash_empty_is_unknown() -> None: + out = aggregate_hash([]) + assert out.state == "unknown" + assert out.observation_count == 0 + + +def test_hash_single_observation_is_stable() -> None: + """Hashes don't have a noisy baseline — one observation of one + hash is enough signal to say 'stable'. 
Distinct from + categorical/numeric where MIN_OBSERVATIONS gates the assertion.""" + obs = [_obs("deadbeef" * 8, 1714000000.0)] + out = aggregate_hash(obs) + assert out.state == "stable" + assert out.current_value == "deadbeef" * 8 + + +def test_hash_repeated_same_value_is_stable() -> None: + """No rotations within window → stable, regardless of count.""" + same = "cafefade" * 8 + obs = [_obs(same, 1714000000.0 + i * 60) for i in range(10)] + out = aggregate_hash(obs) + assert out.state == "stable" + assert out.confidence == 1.0 + + +def test_hash_one_rotation_in_window_is_drifting() -> None: + """Two distinct hashes within HASH_DRIFT_WINDOW → 1 rotation, + below HASH_DRIFT_MAX → drifting.""" + obs = [ + _obs("a" * 64, 1714000000.0), + _obs("a" * 64, 1714000060.0), + _obs("b" * 64, 1714000120.0), + ] + out = aggregate_hash(obs) + assert out.state == "drifting" + assert out.current_value == "b" * 64 + assert out.confidence == pytest.approx(0.5) + + +def test_hash_two_rotations_still_drifting() -> None: + """Three distinct hashes within window → 2 rotations, + HASH_DRIFT_MAX exactly → still drifting (boundary).""" + obs = [ + _obs("a" * 64, 1714000000.0), + _obs("b" * 64, 1714000060.0), + _obs("c" * 64, 1714000120.0), + ] + out = aggregate_hash(obs) + assert out.state == "drifting" + + +def test_hash_many_rotations_is_conflicted() -> None: + """More than HASH_DRIFT_MAX rotations within window → conflicted.""" + obs = [ + _obs(f"hash-{i}", 1714000000.0 + i * 60) + for i in range(_T.HASH_DRIFT_MAX + 3) + ] + out = aggregate_hash(obs) + assert out.state == "conflicted" + + +def test_hash_old_rotations_drop_out_of_window() -> None: + """Old hash observations outside HASH_DRIFT_WINDOW_SECS don't count + against the rotation tally — operator stabilised after past churn.""" + cutoff = 1714000000.0 + obs = [ + # 10 days old — outside the 24h window. + _obs("oldhash", cutoff - 10 * 86400), + _obs("anotheroldhash", cutoff - 9 * 86400), + # Recent: single hash. 
+ _obs("currenthash", cutoff), + ] + out = aggregate_hash(obs) + assert out.state == "stable" + assert out.current_value == "currenthash" + + +# ── dispatcher ──────────────────────────────────────────────────────── + + +def test_dispatcher_routes_numeric() -> None: + obs = [_obs(5000.0, 1714000000.0 + i * 60) for i in range(5)] + a = aggregate_observations(obs, value_kind="numeric") + b = aggregate_numeric(obs) + assert a == b + + +def test_dispatcher_routes_hash() -> None: + obs = [_obs("a" * 64, 1714000000.0 + i * 60) for i in range(3)] + a = aggregate_observations(obs, value_kind="hash") + b = aggregate_hash(obs) + assert a == b + + +def test_dispatcher_rejects_unknown_kind() -> None: + with pytest.raises(ValueError): + aggregate_observations([_obs(1, 1714000000.0)], value_kind="bogus")