feat(correlation/attribution): categorical merge state machine (Phase 2)

aggregate_categorical(): pure function over a per-(identity, primitive)
observation list. Five-state vocabulary, last-N=5 window comparison
with one-outlier-tolerant majority threshold:

* unknown — < 3 observations
* stable — recent 5 agree (≥ 4 of 5 share top value), older 5 same
* drifting — recent 5 stable but disagrees with older 5, or older
  was conflicted and recent stabilised
* conflicted — recent 5 split, no two-value alternation pattern
* multi_actor — recent 5 split + alternation between exactly two
  values (operator A↔B handoff). Confidence capped at 0.6 per
  _thresholds.MULTI_ACTOR_MAX_CONFIDENCE; flapping primitives on
  flaky networks would otherwise look like two operators.

aggregate_observations() dispatcher honours value_kind="categorical"
(or None) and raises NotImplementedError for "numeric" / "hash" so
Phase 3 lands cleanly. 14 synthetic-input tests cover every state
+ boundary condition.
This commit is contained in:
2026-05-08 23:18:22 -04:00
parent c2891d6cca
commit 4956977739
2 changed files with 365 additions and 40 deletions

View File

@@ -2,40 +2,48 @@
core merge logic.
Pure: given a list of BEHAVE observations for one
``(identity_uuid, primitive)`` pair, returns the derived state and
mirror metadata. No DB, no bus, no I/O. The worker
``(identity_uuid, primitive)`` pair (already ordered by ``ts`` ASC),
returns the derived state. No DB, no bus, no I/O. The worker
(``decnet.correlation.attribution_worker``) is responsible for loading
the observations and writing the state row.
State vocabulary is frozen at five values (see
``ATTRIBUTION-ENGINE.md``):
* ``unknown`` — < 3 observations (insufficient signal)
* ``unknown`` — < ``MIN_OBSERVATIONS_FOR_STATE`` observations
* ``stable`` — recent N agree
* ``drifting`` — recent N stable but disagree with older N
* ``conflicted`` — recent N split
* ``multi_actor`` — conflicted + cross-session alternation pattern
Phase 2 ships :func:`_aggregate_categorical`. Phase 3 will add
:func:`_aggregate_numeric` and :func:`_aggregate_hash` and the
ValueKind dispatcher.
Phase 2 ships :func:`_aggregate_categorical` (the dominant ValueKind
for BEHAVE-SHELL primitives). Phase 3 adds numeric + hash mergers and
the ValueKind dispatcher in :func:`aggregate_observations`.
"""
from __future__ import annotations
from collections import Counter
from dataclasses import dataclass
from typing import Any, Iterable, Sequence
from typing import Any, Sequence
__all__ = ["AttributionState", "aggregate_observations"]
from decnet.correlation.attribution import _thresholds as _T
__all__ = [
"AttributionState",
"aggregate_observations",
"aggregate_categorical",
]
@dataclass(frozen=True)
class AttributionState:
"""Output of the merger for one ``(identity, primitive)`` pair.
The fields map 1:1 onto :class:`AttributionStateRow` columns
callers compose the final dict for ``upsert_attribution_state``
by adding ``identity_uuid`` and ``primitive`` (the merger does not
own the natural key).
The fields map onto :class:`AttributionStateRow` columns; the
worker composes the final dict for ``upsert_attribution_state``
by adding ``identity_uuid`` + ``primitive`` (the merger does not
own the natural key) and a ``last_change_ts`` derived from the
prior row.
"""
current_value: Any
@@ -47,41 +55,183 @@ class AttributionState:
def aggregate_observations(
observations: Sequence[dict[str, Any]],
*,
value_kind: str | None = None,
) -> AttributionState:
"""Run the merger over *observations* and return the derived state.
"""Run the merger over *observations* and return derived state.
*observations* is a list of dicts with at minimum ``value``,
``ts``, and ``confidence`` fields (matching the BEHAVE
``Observation`` envelope shape that
``ObservationRow.observations_time_series`` returns). They MUST
arrive ordered by ``ts`` ascending; the merger assumes that.
``ts``, ``confidence`` (matching
``ObservationRow.observations_time_series`` output). Sessions
are derived from the ``ts`` axis — the merger does not need a
separate session id; cross-session alternation is detected by
the gap distribution. Sessions are NOT collapsed before the
merger; ``multi_actor`` reasons over the full per-observation
series.
Phase 2 only supports categorical values. Phase 3 will dispatch
on the BEHAVE primitive's ``ValueKind`` and pick the right merger.
*value_kind* is a hint from the BEHAVE primitive registry — Phase
2 only honours ``"categorical"`` (or ``None``, treated as
categorical). Phase 3 will dispatch on ``"numeric"`` /
``"hash"`` to the matching merger.
"""
if not observations:
return AttributionState(
current_value=None,
state="unknown",
confidence=0.0,
observation_count=0,
last_observation_ts=0.0,
)
# Phase 2 stub — categorical only. Phase 3 will inspect
# ``primitive`` (passed in alongside observations) to pick a
# merger; for now defer to the categorical implementation
# (``_aggregate_categorical``) which Phase 2 lands.
return _unknown(0.0, count=0)
if value_kind in (None, "categorical"):
return aggregate_categorical(observations)
raise NotImplementedError(
"aggregate_observations is implemented in Phase 2 (categorical) "
"and Phase 3 (numeric + hash). v0 Phase 1 ships the substrate "
"only; the worker logs without invoking the merger.",
f"aggregate_observations: value_kind={value_kind!r} lands in Phase 3 "
"(numeric + hash). v0 Phase 2 only supports categorical.",
)
def _coerce_obs_iter(
observations: Iterable[dict[str, Any]],
) -> list[dict[str, Any]]:
"""Defensive: accept any iterable, return a list. Used by the
worker which pulls observations off the bus + DB into mixed
iterables."""
return list(observations)
def aggregate_categorical(
observations: Sequence[dict[str, Any]],
) -> AttributionState:
"""Categorical merger — the dominant case for BEHAVE-SHELL.
Compares the recent N-window against the older N-window. With
``CATEGORICAL_WINDOW_N = 5`` and ``CATEGORICAL_MAJORITY_THRESHOLD
= 4``:
* fewer than ``MIN_OBSERVATIONS_FOR_STATE`` → ``unknown``
* recent window has a clear majority + matches older window → ``stable``
* recent window has a clear majority + differs from older window → ``drifting``
* recent window split + alternation pattern across observations → ``multi_actor``
* recent window split + no alternation → ``conflicted``
Confidence is the recent-window agreement ratio; ``multi_actor``
is capped at ``MULTI_ACTOR_MAX_CONFIDENCE``. The merger returns
the most-recent observation's value as ``current_value``
regardless of state — the dashboard wants a value to render
even on ``conflicted`` rows.
"""
n = len(observations)
last_ts = float(observations[-1].get("ts", 0.0))
last_value = observations[-1].get("value")
if n < _T.MIN_OBSERVATIONS_FOR_STATE:
return AttributionState(
current_value=last_value,
state="unknown",
confidence=0.0,
observation_count=n,
last_observation_ts=last_ts,
)
window = _T.CATEGORICAL_WINDOW_N
recent = observations[-window:]
recent_values = [o.get("value") for o in recent]
recent_count = Counter(recent_values)
top_value, top_count = recent_count.most_common(1)[0]
recent_size = len(recent)
confidence = top_count / recent_size
is_recent_clear = top_count >= min(
_T.CATEGORICAL_MAJORITY_THRESHOLD, recent_size,
)
if not is_recent_clear:
# Split recent window. Distinguish multi_actor (alternation)
# from random conflict.
if _is_alternation(observations):
return AttributionState(
current_value=last_value,
state="multi_actor",
confidence=min(confidence, _T.MULTI_ACTOR_MAX_CONFIDENCE),
observation_count=n,
last_observation_ts=last_ts,
)
return AttributionState(
current_value=last_value,
state="conflicted",
confidence=confidence,
observation_count=n,
last_observation_ts=last_ts,
)
# Recent window has a clear majority. Compare to the prior
# window to decide stable vs drifting.
older = observations[-2 * window: -window]
if not older:
# Only one window's worth of data — call it stable. The
# dashboard already gates "unknown" on
# MIN_OBSERVATIONS_FOR_STATE so this branch is reachable
# only when the operator has produced enough observations
# for one full window but not two.
return AttributionState(
current_value=top_value,
state="stable",
confidence=confidence,
observation_count=n,
last_observation_ts=last_ts,
)
older_values = [o.get("value") for o in older]
older_count = Counter(older_values)
older_top_value, older_top_count = older_count.most_common(1)[0]
older_size = len(older)
older_clear = older_top_count >= min(
_T.CATEGORICAL_MAJORITY_THRESHOLD, older_size,
)
if not older_clear:
# Older window was itself conflicted; we just stabilised.
# That's drift in the colloquial sense — the attacker
# converged onto a single behaviour.
return AttributionState(
current_value=top_value,
state="drifting",
confidence=confidence,
observation_count=n,
last_observation_ts=last_ts,
)
if older_top_value != top_value:
return AttributionState(
current_value=top_value,
state="drifting",
confidence=confidence,
observation_count=n,
last_observation_ts=last_ts,
)
return AttributionState(
current_value=top_value,
state="stable",
confidence=confidence,
observation_count=n,
last_observation_ts=last_ts,
)
def _is_alternation(observations: Sequence[dict[str, Any]]) -> bool:
"""Heuristic: do recent observations alternate between two values
(operator A → B → A → B), as opposed to random thrashing?
Conservative: requires at least 4 observations in the window,
exactly 2 distinct values, and that flips outnumber repeats by
at least 2:1. ATTRIBUTION-ENGINE.md §"Open question 1" warns
that flapping primitives on flaky networks look like two
operators; this guard is what keeps the false-positive rate down.
"""
window = _T.CATEGORICAL_WINDOW_N
recent = observations[-window:]
if len(recent) < 4:
return False
values = [o.get("value") for o in recent]
distinct = set(values)
if len(distinct) != 2:
return False
flips = sum(
1 for i in range(1, len(values)) if values[i] != values[i - 1]
)
repeats = (len(values) - 1) - flips
return flips >= 2 * max(repeats, 1)
def _unknown(last_ts: float, *, count: int) -> AttributionState:
return AttributionState(
current_value=None,
state="unknown",
confidence=0.0,
observation_count=count,
last_observation_ts=last_ts,
)

View File

@@ -0,0 +1,175 @@
"""Pure-function tests for the categorical merger — every state
transition the engine claims to detect, exercised by synthetic
observation lists. No DB, no bus.
State vocabulary: unknown / stable / drifting / conflicted /
multi_actor. Coverage drives one test per state plus the boundary
cases that distinguish them.
"""
from __future__ import annotations
from typing import Any, Sequence
from decnet.correlation.attribution import _thresholds as _T
from decnet.correlation.attribution.aggregate import (
aggregate_categorical,
aggregate_observations,
)
def _obs(value: Any, ts: float, confidence: float = 0.9) -> dict[str, Any]:
return {"value": value, "ts": ts, "confidence": confidence}
def _pad(
*, value: Any, count: int, start_ts: float = 1714000000.0,
) -> list[dict[str, Any]]:
return [_obs(value, start_ts + i * 60.0) for i in range(count)]
def test_empty_returns_unknown_zero_count() -> None:
out = aggregate_observations([])
assert out.state == "unknown"
assert out.observation_count == 0
assert out.current_value is None
def test_below_min_threshold_is_unknown() -> None:
obs = _pad(value="typed", count=_T.MIN_OBSERVATIONS_FOR_STATE - 1)
out = aggregate_categorical(obs)
assert out.state == "unknown"
assert out.observation_count == len(obs)
# Last value is surfaced even on unknown so the UI has something
# to render.
assert out.current_value == "typed"
def test_stable_when_recent_window_agrees() -> None:
# 5 identical observations — window is full of one value.
obs = _pad(value="typed", count=_T.CATEGORICAL_WINDOW_N)
out = aggregate_categorical(obs)
assert out.state == "stable"
assert out.current_value == "typed"
assert out.confidence == 1.0
def test_stable_tolerates_one_outlier_in_five() -> None:
"""Majority threshold is 4 of 5 — one stray paste in a typed
window must not flip the state to conflicted."""
obs = _pad(value="typed", count=4) + [_obs("pasted", 1714000400.0)]
out = aggregate_categorical(obs)
assert out.state == "stable"
assert out.current_value == "typed" # majority value, not last
def test_drifting_when_recent_disagrees_with_older() -> None:
"""Older window stable on A, recent window stable on B → drifting
(the attacker switched behaviour)."""
older = _pad(value="typed", count=_T.CATEGORICAL_WINDOW_N)
newer = _pad(
value="pasted",
count=_T.CATEGORICAL_WINDOW_N,
start_ts=1714001000.0,
)
out = aggregate_categorical(older + newer)
assert out.state == "drifting"
assert out.current_value == "pasted"
def test_drifting_when_older_was_conflicted_and_recent_stable() -> None:
"""Operator stabilised after an earlier mixed period."""
older = [
_obs("typed", 1714000000.0),
_obs("pasted", 1714000060.0),
_obs("typed", 1714000120.0),
_obs("pasted", 1714000180.0),
_obs("mixed", 1714000240.0),
]
newer = _pad(
value="pasted",
count=_T.CATEGORICAL_WINDOW_N,
start_ts=1714001000.0,
)
out = aggregate_categorical(older + newer)
assert out.state == "drifting"
assert out.current_value == "pasted"
def test_conflicted_on_random_split_no_alternation() -> None:
"""Recent window split across 3+ values, no two-value alternation
→ conflicted (random thrash, not multi_actor)."""
obs = [
_obs("typed", 1714000000.0),
_obs("pasted", 1714000060.0),
_obs("mixed", 1714000120.0),
_obs("typed", 1714000180.0),
_obs("pasted", 1714000240.0),
]
out = aggregate_categorical(obs)
# Three distinct values rules out 2-way alternation.
assert out.state == "conflicted"
def test_multi_actor_on_clean_alternation() -> None:
"""Recent window alternates between exactly two values, flips
>= 2× repeats — operator A↔B handoff signal."""
# 5 obs: A B A B A — 4 flips, 0 repeats.
obs = [
_obs("typed", 1714000000.0),
_obs("pasted", 1714000060.0),
_obs("typed", 1714000120.0),
_obs("pasted", 1714000180.0),
_obs("typed", 1714000240.0),
]
out = aggregate_categorical(obs)
assert out.state == "multi_actor"
assert out.confidence <= _T.MULTI_ACTOR_MAX_CONFIDENCE
def test_alternation_requires_two_distinct_values() -> None:
"""A single value flapping with itself is not multi_actor — it's
just a flapping primitive on a flaky network."""
obs = _pad(value="typed", count=_T.CATEGORICAL_WINDOW_N)
out = aggregate_categorical(obs)
assert out.state == "stable"
def test_short_run_after_threshold_is_stable_not_drifting() -> None:
"""Just past MIN_OBSERVATIONS_FOR_STATE but no older window —
stable, not drifting (drifting requires comparison to a prior
window that materially differs)."""
obs = _pad(value="typed", count=_T.MIN_OBSERVATIONS_FOR_STATE)
out = aggregate_categorical(obs)
assert out.state == "stable"
def test_observation_count_reports_total_not_window_size() -> None:
obs = _pad(value="typed", count=12)
out = aggregate_categorical(obs)
assert out.observation_count == 12
def test_last_observation_ts_is_most_recent() -> None:
obs = _pad(value="typed", count=5)
out = aggregate_categorical(obs)
assert out.last_observation_ts == obs[-1]["ts"]
def test_dispatcher_routes_categorical() -> None:
"""aggregate_observations(value_kind=None|"categorical") delegates
to the categorical merger; both produce the same output."""
obs = _pad(value="typed", count=_T.CATEGORICAL_WINDOW_N)
a = aggregate_observations(obs)
b = aggregate_observations(obs, value_kind="categorical")
c = aggregate_categorical(obs)
assert a == b == c
def test_dispatcher_rejects_unimplemented_kinds() -> None:
"""numeric / hash kinds land in Phase 3; surface the gap loudly
so a misuse doesn't silently fall through to categorical."""
import pytest
obs = _pad(value=5000.0, count=5)
for kind in ("numeric", "hash"):
with pytest.raises(NotImplementedError):
aggregate_observations(obs, value_kind=kind)