diff --git a/decnet/correlation/attribution/aggregate.py b/decnet/correlation/attribution/aggregate.py index 70446bc8..47d6b71f 100644 --- a/decnet/correlation/attribution/aggregate.py +++ b/decnet/correlation/attribution/aggregate.py @@ -2,40 +2,48 @@ core merge logic. Pure: given a list of BEHAVE observations for one -``(identity_uuid, primitive)`` pair, returns the derived state and -mirror metadata. No DB, no bus, no I/O. The worker +``(identity_uuid, primitive)`` pair (already ordered by ``ts`` ASC), +returns the derived state. No DB, no bus, no I/O. The worker (``decnet.correlation.attribution_worker``) is responsible for loading the observations and writing the state row. State vocabulary is frozen at five values (see ``ATTRIBUTION-ENGINE.md``): -* ``unknown`` — < 3 observations (insufficient signal) +* ``unknown`` — < ``MIN_OBSERVATIONS_FOR_STATE`` observations * ``stable`` — recent N agree * ``drifting`` — recent N stable but disagree with older N * ``conflicted`` — recent N split * ``multi_actor`` — conflicted + cross-session alternation pattern -Phase 2 ships :func:`_aggregate_categorical`. Phase 3 will add -:func:`_aggregate_numeric` and :func:`_aggregate_hash` and the -ValueKind dispatcher. +Phase 2 ships :func:`aggregate_categorical` (the dominant ValueKind +for BEHAVE-SHELL primitives). Phase 3 adds numeric + hash mergers and +the ValueKind dispatcher in :func:`aggregate_observations`. """ from __future__ import annotations +from collections import Counter from dataclasses import dataclass -from typing import Any, Iterable, Sequence +from typing import Any, Sequence -__all__ = ["AttributionState", "aggregate_observations"] +from decnet.correlation.attribution import _thresholds as _T + +__all__ = [ + "AttributionState", + "aggregate_observations", + "aggregate_categorical", +] @dataclass(frozen=True) class AttributionState: """Output of the merger for one ``(identity, primitive)`` pair. 
- The fields map 1:1 onto :class:`AttributionStateRow` columns — - callers compose the final dict for ``upsert_attribution_state`` - by adding ``identity_uuid`` and ``primitive`` (the merger does not - own the natural key). + The fields map onto :class:`AttributionStateRow` columns; the + worker composes the final dict for ``upsert_attribution_state`` + by adding ``identity_uuid`` + ``primitive`` (the merger does not + own the natural key) and a ``last_change_ts`` derived from the + prior row. """ current_value: Any @@ -47,41 +55,183 @@ class AttributionState: def aggregate_observations( observations: Sequence[dict[str, Any]], + *, + value_kind: str | None = None, ) -> AttributionState: - """Run the merger over *observations* and return the derived state. + """Run the merger over *observations* and return derived state. *observations* is a list of dicts with at minimum ``value``, - ``ts``, and ``confidence`` fields (matching the BEHAVE - ``Observation`` envelope shape that - ``ObservationRow.observations_time_series`` returns). They MUST - arrive ordered by ``ts`` ascending; the merger assumes that. + ``ts``, ``confidence`` (matching + ``ObservationRow.observations_time_series`` output), ordered by + ``ts`` ascending. The merger does not need a separate session + id: the categorical path detects alternation from the value + sequence of the recent window only (``ts`` gaps are not + inspected). Sessions are NOT collapsed before the merger; it + receives the full per-observation series. - Phase 2 only supports categorical values. Phase 3 will dispatch - on the BEHAVE primitive's ``ValueKind`` and pick the right merger. + *value_kind* is a hint from the BEHAVE primitive registry — Phase + 2 only honours ``"categorical"`` (or ``None``, treated as + categorical). Phase 3 will dispatch on ``"numeric"`` / + ``"hash"`` to the matching merger. 
""" if not observations: - return AttributionState( - current_value=None, - state="unknown", - confidence=0.0, - observation_count=0, - last_observation_ts=0.0, - ) - # Phase 2 stub — categorical only. Phase 3 will inspect - # ``primitive`` (passed in alongside observations) to pick a - # merger; for now defer to the categorical implementation - # (``_aggregate_categorical``) which Phase 2 lands. + return _unknown(0.0, count=0) + if value_kind in (None, "categorical"): + return aggregate_categorical(observations) raise NotImplementedError( - "aggregate_observations is implemented in Phase 2 (categorical) " - "and Phase 3 (numeric + hash). v0 Phase 1 ships the substrate " - "only; the worker logs without invoking the merger.", + f"aggregate_observations: value_kind={value_kind!r} lands in Phase 3 " + "(numeric + hash). v0 Phase 2 only supports categorical.", ) -def _coerce_obs_iter( - observations: Iterable[dict[str, Any]], -) -> list[dict[str, Any]]: - """Defensive: accept any iterable, return a list. Used by the - worker which pulls observations off the bus + DB into mixed - iterables.""" - return list(observations) +def aggregate_categorical( + observations: Sequence[dict[str, Any]], +) -> AttributionState: + """Categorical merger — the dominant case for BEHAVE-SHELL. + + Compares the recent N-window against the older N-window. With + ``CATEGORICAL_WINDOW_N = 5`` and ``CATEGORICAL_MAJORITY_THRESHOLD + = 4``: + + * fewer than ``MIN_OBSERVATIONS_FOR_STATE`` → ``unknown`` + * recent window has a clear majority + matches older window → ``stable`` + * recent window has a clear majority + differs from older window → ``drifting`` + * recent window split + alternation pattern across observations → ``multi_actor`` + * recent window split + no alternation → ``conflicted`` + + Confidence is the recent-window agreement ratio; ``multi_actor`` + is capped at ``MULTI_ACTOR_MAX_CONFIDENCE``. 
``current_value`` is + the recent-window majority for ``stable`` / ``drifting`` and the + most-recent observation's value otherwise — the dashboard wants + a value to render even on ``conflicted`` rows. + """ + n = len(observations) + last_ts = float(observations[-1].get("ts", 0.0)) + last_value = observations[-1].get("value") + if n < _T.MIN_OBSERVATIONS_FOR_STATE: + return AttributionState( + current_value=last_value, + state="unknown", + confidence=0.0, + observation_count=n, + last_observation_ts=last_ts, + ) + + window = _T.CATEGORICAL_WINDOW_N + recent = observations[-window:] + recent_values = [o.get("value") for o in recent] + recent_count = Counter(recent_values) + top_value, top_count = recent_count.most_common(1)[0] + recent_size = len(recent) + confidence = top_count / recent_size + + is_recent_clear = top_count >= min( + _T.CATEGORICAL_MAJORITY_THRESHOLD, recent_size, + ) + + if not is_recent_clear: + # Split recent window. Distinguish multi_actor (alternation) + # from random conflict. + if _is_alternation(observations): + return AttributionState( + current_value=last_value, + state="multi_actor", + confidence=min(confidence, _T.MULTI_ACTOR_MAX_CONFIDENCE), + observation_count=n, + last_observation_ts=last_ts, + ) + return AttributionState( + current_value=last_value, + state="conflicted", + confidence=confidence, + observation_count=n, + last_observation_ts=last_ts, + ) + + # Recent window has a clear majority. Compare to the prior + # window to decide stable vs drifting. + older = observations[-2 * window: -window] + if not older: + # At most one window's worth of data — call it stable. The + # "unknown" case was already returned above (fewer than + # MIN_OBSERVATIONS_FOR_STATE), so this branch is reached + # only when there are no observations older than the recent + # window to compare against. 
+ return AttributionState( + current_value=top_value, + state="stable", + confidence=confidence, + observation_count=n, + last_observation_ts=last_ts, + ) + + older_values = [o.get("value") for o in older] + older_count = Counter(older_values) + older_top_value, older_top_count = older_count.most_common(1)[0] + older_size = len(older) + older_clear = older_top_count >= min( + _T.CATEGORICAL_MAJORITY_THRESHOLD, older_size, + ) + + if not older_clear: + # Older window was itself conflicted; we just stabilised. + # That's drift in the colloquial sense — the attacker + # converged onto a single behaviour. + return AttributionState( + current_value=top_value, + state="drifting", + confidence=confidence, + observation_count=n, + last_observation_ts=last_ts, + ) + + if older_top_value != top_value: + return AttributionState( + current_value=top_value, + state="drifting", + confidence=confidence, + observation_count=n, + last_observation_ts=last_ts, + ) + return AttributionState( + current_value=top_value, + state="stable", + confidence=confidence, + observation_count=n, + last_observation_ts=last_ts, + ) + + +def _is_alternation(observations: Sequence[dict[str, Any]]) -> bool: + """Heuristic: do recent observations alternate between two values + (operator A → B → A → B), as opposed to random thrashing? + + Conservative: requires at least 4 observations in the window, + exactly 2 distinct values, and that flips outnumber repeats by + at least 2:1. ATTRIBUTION-ENGINE.md §"Open question 1" warns + that flapping primitives on flaky networks look like two + operators; this guard is what keeps the false-positive rate down. 
+ """ + window = _T.CATEGORICAL_WINDOW_N + recent = observations[-window:] + if len(recent) < 4: + return False + values = [o.get("value") for o in recent] + distinct = set(values) + if len(distinct) != 2: + return False + flips = sum( + 1 for i in range(1, len(values)) if values[i] != values[i - 1] + ) + repeats = (len(values) - 1) - flips + return flips >= 2 * max(repeats, 1) + + +def _unknown(last_ts: float, *, count: int) -> AttributionState: + return AttributionState( + current_value=None, + state="unknown", + confidence=0.0, + observation_count=count, + last_observation_ts=last_ts, + ) diff --git a/tests/correlation/attribution/test_aggregate_categorical.py b/tests/correlation/attribution/test_aggregate_categorical.py new file mode 100644 index 00000000..a00d994e --- /dev/null +++ b/tests/correlation/attribution/test_aggregate_categorical.py @@ -0,0 +1,175 @@ +"""Pure-function tests for the categorical merger — every state +transition the engine claims to detect, exercised by synthetic +observation lists. No DB, no bus. + +State vocabulary: unknown / stable / drifting / conflicted / +multi_actor. Coverage drives one test per state plus the boundary +cases that distinguish them. 
+""" +from __future__ import annotations + +from typing import Any, Sequence + +from decnet.correlation.attribution import _thresholds as _T +from decnet.correlation.attribution.aggregate import ( + aggregate_categorical, + aggregate_observations, +) + + +def _obs(value: Any, ts: float, confidence: float = 0.9) -> dict[str, Any]: + return {"value": value, "ts": ts, "confidence": confidence} + + +def _pad( + *, value: Any, count: int, start_ts: float = 1714000000.0, +) -> list[dict[str, Any]]: + return [_obs(value, start_ts + i * 60.0) for i in range(count)] + + +def test_empty_returns_unknown_zero_count() -> None: + out = aggregate_observations([]) + assert out.state == "unknown" + assert out.observation_count == 0 + assert out.current_value is None + + +def test_below_min_threshold_is_unknown() -> None: + obs = _pad(value="typed", count=_T.MIN_OBSERVATIONS_FOR_STATE - 1) + out = aggregate_categorical(obs) + assert out.state == "unknown" + assert out.observation_count == len(obs) + # Last value is surfaced even on unknown so the UI has something + # to render. + assert out.current_value == "typed" + + +def test_stable_when_recent_window_agrees() -> None: + # 5 identical observations — window is full of one value. 
+ obs = _pad(value="typed", count=_T.CATEGORICAL_WINDOW_N) + out = aggregate_categorical(obs) + assert out.state == "stable" + assert out.current_value == "typed" + assert out.confidence == 1.0 + + +def test_stable_tolerates_one_outlier_in_five() -> None: + """Majority threshold is 4 of 5 — one stray paste in a typed + window must not flip the state to conflicted.""" + obs = _pad(value="typed", count=4) + [_obs("pasted", 1714000400.0)] + out = aggregate_categorical(obs) + assert out.state == "stable" + assert out.current_value == "typed" # majority value, not last + + +def test_drifting_when_recent_disagrees_with_older() -> None: + """Older window stable on A, recent window stable on B → drifting + (the attacker switched behaviour).""" + older = _pad(value="typed", count=_T.CATEGORICAL_WINDOW_N) + newer = _pad( + value="pasted", + count=_T.CATEGORICAL_WINDOW_N, + start_ts=1714001000.0, + ) + out = aggregate_categorical(older + newer) + assert out.state == "drifting" + assert out.current_value == "pasted" + + +def test_drifting_when_older_was_conflicted_and_recent_stable() -> None: + """Operator stabilised after an earlier mixed period.""" + older = [ + _obs("typed", 1714000000.0), + _obs("pasted", 1714000060.0), + _obs("typed", 1714000120.0), + _obs("pasted", 1714000180.0), + _obs("mixed", 1714000240.0), + ] + newer = _pad( + value="pasted", + count=_T.CATEGORICAL_WINDOW_N, + start_ts=1714001000.0, + ) + out = aggregate_categorical(older + newer) + assert out.state == "drifting" + assert out.current_value == "pasted" + + +def test_conflicted_on_random_split_no_alternation() -> None: + """Recent window split across 3+ values, no two-value alternation + → conflicted (random thrash, not multi_actor).""" + obs = [ + _obs("typed", 1714000000.0), + _obs("pasted", 1714000060.0), + _obs("mixed", 1714000120.0), + _obs("typed", 1714000180.0), + _obs("pasted", 1714000240.0), + ] + out = aggregate_categorical(obs) + # Three distinct values rules out 2-way alternation. 
+ assert out.state == "conflicted" + + +def test_multi_actor_on_clean_alternation() -> None: + """Recent window alternates between exactly two values, flips + >= 2× repeats — operator A↔B handoff signal.""" + # 5 obs: A B A B A — 4 flips, 0 repeats. + obs = [ + _obs("typed", 1714000000.0), + _obs("pasted", 1714000060.0), + _obs("typed", 1714000120.0), + _obs("pasted", 1714000180.0), + _obs("typed", 1714000240.0), + ] + out = aggregate_categorical(obs) + assert out.state == "multi_actor" + assert out.confidence <= _T.MULTI_ACTOR_MAX_CONFIDENCE + + +def test_alternation_requires_two_distinct_values() -> None: + """A single value flapping with itself is not multi_actor — it's + just a flapping primitive on a flaky network.""" + obs = _pad(value="typed", count=_T.CATEGORICAL_WINDOW_N) + out = aggregate_categorical(obs) + assert out.state == "stable" + + +def test_short_run_after_threshold_is_stable_not_drifting() -> None: + """Just past MIN_OBSERVATIONS_FOR_STATE but no older window — + stable, not drifting (drifting requires comparison to a prior + window that materially differs).""" + obs = _pad(value="typed", count=_T.MIN_OBSERVATIONS_FOR_STATE) + out = aggregate_categorical(obs) + assert out.state == "stable" + + +def test_observation_count_reports_total_not_window_size() -> None: + obs = _pad(value="typed", count=12) + out = aggregate_categorical(obs) + assert out.observation_count == 12 + + +def test_last_observation_ts_is_most_recent() -> None: + obs = _pad(value="typed", count=5) + out = aggregate_categorical(obs) + assert out.last_observation_ts == obs[-1]["ts"] + + +def test_dispatcher_routes_categorical() -> None: + """aggregate_observations(value_kind=None|"categorical") delegates + to the categorical merger; both produce the same output.""" + obs = _pad(value="typed", count=_T.CATEGORICAL_WINDOW_N) + a = aggregate_observations(obs) + b = aggregate_observations(obs, value_kind="categorical") + c = aggregate_categorical(obs) + assert a == b == c + + +def 
test_dispatcher_rejects_unimplemented_kinds() -> None: + """numeric / hash kinds land in Phase 3; surface the gap loudly + so a misuse doesn't silently fall through to categorical.""" + import pytest + obs = _pad(value=5000.0, count=5) + for kind in ("numeric", "hash"): + with pytest.raises(NotImplementedError): + aggregate_observations(obs, value_kind=kind)