Stage 6 of the realism migration. User-class file bodies (note, todo, draft, script) optionally get LLM-authored content; system classes (cron / daemon logs, /tmp caches) stay template-only because formulaic *is* the right look for them. New surface: - realism.llm.circuit.LLMCircuitBreaker — process-local sliding-window breaker. 3 consecutive failures trip open; 60s cooldown to half-open; half-open success closes, failure re-opens. Protects the orchestrator tick from sustained Ollama wedges (per-call timeout already covers one-shot hangs). - realism.prompts._style — em-dash suppression lifted from the email prompt. Persona.uses_llms_heavily opts out per the feedback_em_dash_llm_tell.md memory. Includes strip_em_dashes belt-and-braces sub for output that slipped past the prompt rule. - realism.prompts.filebody — class-conditioned prompts (note / todo / draft / script) with persona context, language pinning, output shape rule. - realism.bodies.make_body_with_llm — async wrapper around make_body that calls the LLM when one is provided AND the breaker allows. Falls back to template on timeout / error / empty / system-class. Wiring: - scheduler.pick_file accepts optional llm + llm_breaker + llm_timeout. When the planner picks a create action and the content_class is a user-class, the body_hint is replaced with the LLM-authored body (or falls back to the deterministic body_hint). - orchestrator.worker constructs get_llm() at startup gated by DECNET_REALISM_LLM env var (any non-empty value enables; empty / "off" / "none" / "0" disables). Passes llm + breaker through every tick. - decnet orchestrate gains --llm/--no-llm flag overriding the env var.
100 lines
3.4 KiB
Python
100 lines
3.4 KiB
Python
"""Process-local circuit breaker for LLM calls.
|
|
|
|
Per-call timeouts (``asyncio.wait_for(llm.generate, timeout=...)``)
|
|
protect a single tick from a single hung Ollama. They do NOT protect
|
|
the worker from a *sustained* problem: 100 consecutive 60-second
|
|
timeouts chew up an hour of orchestrator time on dead requests before
|
|
anything notices.
|
|
|
|
This breaker watches a sliding window of recent outcomes and flips
|
|
``open`` after ``failure_threshold`` consecutive failures. Open
|
|
breakers short-circuit ``allow_call`` to ``False`` so callers fall
|
|
back to deterministic templates without the per-tick cost. After
|
|
``cooldown_seconds`` the breaker enters ``half_open`` and the next
|
|
call is allowed; success closes the breaker, failure re-opens it
|
|
with a fresh cooldown.
|
|
|
|
Process-local on purpose — cross-process state would require shared
|
|
memory and is overkill for a single orchestrator worker.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import threading
|
|
import time
|
|
from enum import Enum
|
|
|
|
|
|
class _State(Enum):
|
|
CLOSED = "closed"
|
|
OPEN = "open"
|
|
HALF_OPEN = "half_open"
|
|
|
|
|
|
class LLMCircuitBreaker:
|
|
"""Threadsafe sliding-window circuit breaker.
|
|
|
|
Default ``failure_threshold=3`` consecutive failures → open;
|
|
``cooldown_seconds=60`` of open before transitioning to
|
|
half-open. These match the realism worker's tick cadence: 3
|
|
consecutive 60s timeouts = 3 minutes of dead air, which is the
|
|
point at which a deterministic fallback is overdue.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
*,
|
|
failure_threshold: int = 3,
|
|
cooldown_seconds: float = 60.0,
|
|
clock=time.monotonic,
|
|
) -> None:
|
|
self._failure_threshold = failure_threshold
|
|
self._cooldown = cooldown_seconds
|
|
self._clock = clock
|
|
self._lock = threading.Lock()
|
|
self._state = _State.CLOSED
|
|
self._consecutive_failures = 0
|
|
self._opened_at: float = 0.0
|
|
|
|
@property
|
|
def state(self) -> str:
|
|
with self._lock:
|
|
return self._state.value
|
|
|
|
def allow_call(self) -> bool:
|
|
"""Return True if the next call should run, False if it should
|
|
short-circuit to the fallback path.
|
|
|
|
Promotes ``open`` → ``half_open`` after the cooldown elapses
|
|
so the next caller acts as a probe.
|
|
"""
|
|
with self._lock:
|
|
if self._state == _State.CLOSED:
|
|
return True
|
|
if self._state == _State.HALF_OPEN:
|
|
return True
|
|
# OPEN: check cooldown.
|
|
if self._clock() - self._opened_at >= self._cooldown:
|
|
self._state = _State.HALF_OPEN
|
|
return True
|
|
return False
|
|
|
|
def record_success(self) -> None:
|
|
with self._lock:
|
|
self._state = _State.CLOSED
|
|
self._consecutive_failures = 0
|
|
self._opened_at = 0.0
|
|
|
|
def record_failure(self) -> None:
|
|
with self._lock:
|
|
if self._state == _State.HALF_OPEN:
|
|
# The probe call failed — re-open with a fresh cooldown.
|
|
self._state = _State.OPEN
|
|
self._opened_at = self._clock()
|
|
# Don't reset the failure count; the probe failure
|
|
# implies the underlying issue is unresolved.
|
|
return
|
|
self._consecutive_failures += 1
|
|
if self._consecutive_failures >= self._failure_threshold:
|
|
self._state = _State.OPEN
|
|
self._opened_at = self._clock()
|