Files
DECNET/decnet/realism/llm/circuit.py
anti 4e436da569 feat(realism): LLM enrichment for user-class file bodies
Stage 6 of the realism migration. User-class file bodies (note,
todo, draft, script) optionally get LLM-authored content; system
classes (cron / daemon logs, /tmp caches) stay template-only because
formulaic *is* the right look for them.

New surface:

- realism.llm.circuit.LLMCircuitBreaker — process-local sliding-window
  breaker. 3 consecutive failures trip open; 60s cooldown to half-open;
  half-open success closes, failure re-opens. Protects the orchestrator
  tick from sustained Ollama wedges (per-call timeout already covers
  one-shot hangs).
- realism.prompts._style — em-dash suppression lifted from the
  email prompt. Persona.uses_llms_heavily opts out per the
  feedback_em_dash_llm_tell.md memory. Includes strip_em_dashes
  belt-and-braces sub for output that slipped past the prompt rule.
- realism.prompts.filebody — class-conditioned prompts (note / todo
  / draft / script) with persona context, language pinning, output
  shape rule.
- realism.bodies.make_body_with_llm — async wrapper around make_body
  that calls the LLM when one is provided AND the breaker allows.
  Falls back to template on timeout / error / empty / system-class.

Wiring:

- scheduler.pick_file accepts optional llm + llm_breaker + llm_timeout.
  When the planner picks a create action and the content_class is a
  user-class, the body_hint is replaced with the LLM-authored body
  (or falls back to the deterministic body_hint).
- orchestrator.worker constructs get_llm() at startup gated by
  DECNET_REALISM_LLM env var (any non-empty value enables; empty /
  "off" / "none" / "0" disables). Passes llm + breaker through every
  tick.
- decnet orchestrate gains --llm/--no-llm flag overriding the env var.
2026-04-27 16:42:58 -04:00

100 lines
3.4 KiB
Python

"""Process-local circuit breaker for LLM calls.
Per-call timeouts (``asyncio.wait_for(llm.generate, timeout=...)``)
protect a single tick from a single hung Ollama. They do NOT protect
the worker from a *sustained* problem: 100 consecutive 60-second
timeouts chew up an hour of orchestrator time on dead requests before
anything notices.
This breaker watches a sliding window of recent outcomes and flips
``open`` after ``failure_threshold`` consecutive failures. Open
breakers short-circuit ``allow_call`` to ``False`` so callers fall
back to deterministic templates without the per-tick cost. After
``cooldown_seconds`` the breaker enters ``half_open`` and the next
call is allowed; success closes the breaker, failure re-opens it
with a fresh cooldown.
Process-local on purpose — cross-process state would require shared
memory and is overkill for a single orchestrator worker.
"""
from __future__ import annotations
import threading
import time
from enum import Enum
class _State(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
class LLMCircuitBreaker:
"""Threadsafe sliding-window circuit breaker.
Default ``failure_threshold=3`` consecutive failures → open;
``cooldown_seconds=60`` of open before transitioning to
half-open. These match the realism worker's tick cadence: 3
consecutive 60s timeouts = 3 minutes of dead air, which is the
point at which a deterministic fallback is overdue.
"""
def __init__(
self,
*,
failure_threshold: int = 3,
cooldown_seconds: float = 60.0,
clock=time.monotonic,
) -> None:
self._failure_threshold = failure_threshold
self._cooldown = cooldown_seconds
self._clock = clock
self._lock = threading.Lock()
self._state = _State.CLOSED
self._consecutive_failures = 0
self._opened_at: float = 0.0
@property
def state(self) -> str:
with self._lock:
return self._state.value
def allow_call(self) -> bool:
"""Return True if the next call should run, False if it should
short-circuit to the fallback path.
Promotes ``open`` → ``half_open`` after the cooldown elapses
so the next caller acts as a probe.
"""
with self._lock:
if self._state == _State.CLOSED:
return True
if self._state == _State.HALF_OPEN:
return True
# OPEN: check cooldown.
if self._clock() - self._opened_at >= self._cooldown:
self._state = _State.HALF_OPEN
return True
return False
def record_success(self) -> None:
with self._lock:
self._state = _State.CLOSED
self._consecutive_failures = 0
self._opened_at = 0.0
def record_failure(self) -> None:
with self._lock:
if self._state == _State.HALF_OPEN:
# The probe call failed — re-open with a fresh cooldown.
self._state = _State.OPEN
self._opened_at = self._clock()
# Don't reset the failure count; the probe failure
# implies the underlying issue is unresolved.
return
self._consecutive_failures += 1
if self._consecutive_failures >= self._failure_threshold:
self._state = _State.OPEN
self._opened_at = self._clock()