feat(realism): LLM enrichment for user-class file bodies

Stage 6 of the realism migration. User-class file bodies (note,
todo, draft, script) optionally get LLM-authored content; system
classes (cron / daemon logs, /tmp caches) stay template-only because
formulaic *is* the right look for them.

New surface:

- realism.llm.circuit.LLMCircuitBreaker — process-local sliding-window
  breaker. 3 consecutive failures trip open; 60s cooldown to half-open;
  half-open success closes, failure re-opens. Protects the orchestrator
  tick from sustained Ollama wedges (per-call timeout already covers
  one-shot hangs).
- realism.prompts._style — em-dash suppression lifted from the
  email prompt. Persona.uses_llms_heavily opts out per the
  feedback_em_dash_llm_tell.md memory. Includes strip_em_dashes
  belt-and-braces sub for output that slipped past the prompt rule.
- realism.prompts.filebody — class-conditioned prompts (note / todo
  / draft / script) with persona context, language pinning, output
  shape rule.
- realism.bodies.make_body_with_llm — async wrapper around make_body
  that calls the LLM when one is provided AND the breaker allows.
  Falls back to template on timeout / error / empty / system-class.

Wiring:

- scheduler.pick_file accepts optional llm + llm_breaker + llm_timeout.
  When the planner picks a create action and the content_class is a
  user-class, the body_hint is replaced with the LLM-authored body
  (or falls back to the deterministic body_hint).
- orchestrator.worker constructs get_llm() at startup gated by
  DECNET_REALISM_LLM env var (any non-empty value enables; empty /
  "off" / "none" / "0" disables). Passes llm + breaker through every
  tick.
- decnet orchestrate gains --llm/--no-llm flag overriding the env var.
This commit is contained in:
2026-04-27 16:42:58 -04:00
parent b321e29002
commit 4e436da569
9 changed files with 625 additions and 11 deletions

View File

@@ -1,5 +1,7 @@
from __future__ import annotations
from typing import Optional
import typer
from . import utils as _utils
@@ -17,8 +19,16 @@ def register(app: typer.Typer) -> None:
False, "--daemon", "-d",
help="Detach to background as a daemon process",
),
llm: Optional[bool] = typer.Option(
None, "--llm/--no-llm",
help=(
"Enable / disable LLM enrichment of user-class file "
"bodies. Default reads $DECNET_REALISM_LLM (any "
"non-empty value enables; 'off' / unset disables)."
),
),
) -> None:
"""Inject synthetic life (inter-decky traffic + file ops) into the fleet."""
"""Inject synthetic life (inter-decky traffic + file ops + email) into the fleet."""
import asyncio
from decnet.orchestrator import orchestrator_worker
from decnet.web.dependencies import repo
@@ -27,14 +37,17 @@ def register(app: typer.Typer) -> None:
log.info("orchestrator daemonizing interval=%d", interval)
_utils._daemonize()
log.info("orchestrator starting interval=%d", interval)
log.info(
"orchestrator starting interval=%d llm=%s",
interval, "default" if llm is None else ("on" if llm else "off"),
)
console.print(
f"[bold cyan]Orchestrator starting[/] (interval: {interval}s)"
)
async def _run() -> None:
await repo.initialize()
await orchestrator_worker(repo, interval=interval)
await orchestrator_worker(repo, interval=interval, llm_enabled=llm)
try:
asyncio.run(_run())

View File

@@ -121,6 +121,9 @@ async def pick_file(
*,
now: Optional[datetime] = None,
rand: Optional[secrets.SystemRandom] = None,
llm: Any = None,
llm_breaker: Any = None,
llm_timeout: float = 60.0,
) -> Optional[Action]:
"""Realism-driven file action — create or edit.
@@ -179,17 +182,46 @@ async def pick_file(
synthetic_file_uuid=(edit_candidate or {}).get("uuid", ""),
mtime=plan.mtime,
)
# Create branch. If LLM is wired, optionally swap body_hint for
# an LLM-authored body. Always keep the deterministic body_hint
# as the fallback the function call returns when LLM
# times out / errors / breaker-trips.
body = plan.body_hint or ""
if llm is not None and plan.content_class.is_user_class():
persona_obj = _persona_by_name(enriched, plan.persona)
if persona_obj is not None:
from decnet.realism.bodies import make_body_with_llm
body = await make_body_with_llm(
plan.content_class,
persona_obj,
llm=llm,
breaker=llm_breaker,
timeout=llm_timeout,
rand=rng,
)
return FileAction(
dst_uuid=plan.decky_uuid,
dst_name=plan.decky_name,
path=plan.target_path,
content=plan.body_hint or "",
content=body,
persona=plan.persona,
content_class=plan.content_class.value,
mtime=plan.mtime,
)
def _persona_by_name(
enriched: list[dict[str, Any]], name: str,
) -> Optional[EmailPersona]:
"""Find the persona instance the planner used; ``None`` if missing."""
for decky in enriched:
for persona in decky.get("_realism_personas") or []:
if persona.name == name:
return persona
return None
async def _resolve_personas(
deckies: Sequence[dict[str, Any]],
repo: Any,

View File

@@ -20,8 +20,10 @@ from __future__ import annotations
import asyncio
import contextlib
import hashlib
import os
import secrets
from datetime import datetime, timezone
from typing import Any, Optional
from decnet.bus.factory import get_bus
from decnet.bus.publish import (
@@ -37,6 +39,7 @@ from decnet.orchestrator.emailgen import (
scheduler as email_scheduler,
)
from decnet.orchestrator.emailgen.scheduler import EmailAction
from decnet.realism.llm.circuit import LLMCircuitBreaker
from decnet.web.db.repository import BaseRepository
logger = get_logger("orchestrator")
@@ -65,14 +68,43 @@ async def orchestrator_worker(
repo: BaseRepository,
*,
interval: int = 60,
llm_enabled: Optional[bool] = None,
) -> None:
"""Periodically inject synthetic activity into the running fleet.
Runs as a long-lived asyncio task. Honours the bus control topic
(``system.orchestrator.control``) for graceful shutdown.
LLM enrichment for user-class file bodies is opt-in via the
``DECNET_REALISM_LLM`` env var (set to ``ollama`` / ``fake`` /
empty). Pass ``llm_enabled=False`` from the CLI to override
(``decnet orchestrate --no-llm``). When the LLM is unreachable
or wedged, a process-local circuit breaker
(:class:`LLMCircuitBreaker`) trips after 3 consecutive failures
and the worker falls back to deterministic templates for 60
seconds before re-probing.
"""
logger.info("orchestrator worker started interval=%ds", interval)
llm: Any = None
breaker: Optional[LLMCircuitBreaker] = None
if _llm_should_enable(llm_enabled):
try:
from decnet.realism.llm import get_llm
llm = get_llm()
breaker = LLMCircuitBreaker()
logger.info(
"orchestrator: LLM enrichment enabled backend=%s model=%s",
os.environ.get("DECNET_REALISM_LLM", "ollama"),
getattr(llm, "model", "?"),
)
except Exception as exc: # noqa: BLE001
logger.warning(
"orchestrator: LLM init failed, continuing without "
"enrichment: %s", exc,
)
llm = None
bus = None
try:
bus = get_bus(client_name="orchestrator")
@@ -98,7 +130,7 @@ async def orchestrator_worker(
if shutdown.is_set():
break
try:
await _one_tick(repo, bus)
await _one_tick(repo, bus, llm=llm, breaker=breaker)
except Exception as exc: # noqa: BLE001
logger.error("orchestrator tick failed: %s", exc)
tick_n += 1
@@ -148,10 +180,29 @@ def _roll_action_kind(rng: secrets.SystemRandom) -> str:
return _ACTION_WEIGHTS[-1][0] # unreachable, satisfy mypy
def _llm_should_enable(explicit: Optional[bool]) -> bool:
"""Resolve the LLM-enabled flag from CLI / env / defaults.
*explicit* takes precedence (``--llm`` / ``--no-llm``). When unset,
the env var ``DECNET_REALISM_LLM`` decides: any non-empty value
(``ollama`` / ``fake`` / etc.) enables; empty string or ``off`` /
``none`` / ``0`` / ``false`` disables.
"""
if explicit is not None:
return explicit
raw = os.environ.get("DECNET_REALISM_LLM", "").strip().lower()
if raw in ("", "off", "none", "0", "false", "disabled"):
return False
return True
async def _pick_action(
repo: BaseRepository,
deckies: list[dict],
rng: secrets.SystemRandom,
*,
llm: Any = None,
breaker: Optional[LLMCircuitBreaker] = None,
):
"""Roll an action-kind, then pick the matching action.
@@ -168,7 +219,10 @@ async def _pick_action(
if kind == "traffic":
action = scheduler.pick(deckies, rand=rng)
elif kind == "file":
action = await scheduler.pick_file(deckies, repo, rand=rng)
action = await scheduler.pick_file(
deckies, repo, rand=rng,
llm=llm, llm_breaker=breaker,
)
elif kind == "email":
try:
action = await email_scheduler.pick(repo, rand=rng)
@@ -182,11 +236,17 @@ async def _pick_action(
return None
async def _one_tick(repo: BaseRepository, bus) -> None:
async def _one_tick(
repo: BaseRepository,
bus,
*,
llm: Any = None,
breaker: Optional[LLMCircuitBreaker] = None,
) -> None:
deckies = await repo.list_running_deckies()
rng = secrets.SystemRandom()
action = await _pick_action(repo, deckies, rng)
action = await _pick_action(repo, deckies, rng, llm=llm, breaker=breaker)
if action is None:
ssh_eligible = sum(
1 for d in deckies

View File

@@ -22,12 +22,16 @@ respectively, not from realism.bodies.
"""
from __future__ import annotations
import asyncio
import secrets
from datetime import datetime, timezone
from typing import Callable, Optional
from decnet.logging import get_logger
from decnet.realism.taxonomy import ContentClass
log = get_logger("realism.bodies")
# ── User-class body generators ─────────────────────────────────────────────
@@ -220,9 +224,10 @@ def make_body(
) -> str:
"""Return deterministic body bytes (utf-8 string) for *content_class*.
Stage 3 ships templates only; stage 6 adds an optional
``LLMBackend`` parameter that, when supplied and the breaker is
closed, replaces the template return for user-classes.
Stage 3 ships templates only. :func:`make_body_with_llm` is the
LLM-aware variant added in stage 6 — kept on a separate name so
the deterministic path stays trivially callable from tests and
from the LLM fallback itself.
"""
rng = rand or secrets.SystemRandom()
gen = _BODIES.get(content_class)
@@ -233,6 +238,72 @@ def make_body(
return gen(persona, rng)
async def make_body_with_llm(
content_class: ContentClass,
persona, # EmailPersona — typed loosely to avoid an import cycle
*,
llm=None, # LLMBackend | None
breaker=None, # LLMCircuitBreaker | None
timeout: float = 60.0,
rand: Optional[secrets.SystemRandom] = None,
) -> str:
"""LLM-enriched body for user-classes; deterministic fallback otherwise.
Falls back to :func:`make_body` whenever:
* ``llm`` is None,
* ``breaker.allow_call()`` returns False (sustained failure),
* the LLM call times out or returns empty,
* the content class isn't a user-class (system-class content
should look formulaic, so we never invoke LLM there).
Em-dash stripping runs on the LLM output as a belt-and-braces
guard (see :mod:`decnet.realism.prompts._style`). The function
is async because LLM calls are; the deterministic path returns
immediately so the orchestrator's tick doesn't pay async overhead
when LLM is disabled.
"""
rng = rand or secrets.SystemRandom()
# System / canary / email classes never touch the LLM.
if not content_class.is_user_class():
return make_body(content_class, persona.name, rand=rng)
if llm is None or (breaker is not None and not breaker.allow_call()):
return make_body(content_class, persona.name, rand=rng)
# Lazy imports keep the prompt + style modules out of the
# deterministic path's import graph.
from decnet.realism.llm.base import LLMTimeout
from decnet.realism.prompts import filebody as _filebody
from decnet.realism.prompts._style import strip_em_dashes
prompt = _filebody.build(content_class, persona)
try:
result = await asyncio.wait_for(llm.generate(prompt), timeout=timeout)
except (LLMTimeout, asyncio.TimeoutError):
log.debug("realism.bodies LLM timeout class=%s persona=%s",
content_class.value, persona.name)
if breaker is not None:
breaker.record_failure()
return make_body(content_class, persona.name, rand=rng)
except Exception as exc: # noqa: BLE001
log.warning("realism.bodies LLM error class=%s persona=%s: %s",
content_class.value, persona.name, exc)
if breaker is not None:
breaker.record_failure()
return make_body(content_class, persona.name, rand=rng)
if not result.success or not result.text.strip():
if breaker is not None:
breaker.record_failure()
return make_body(content_class, persona.name, rand=rng)
if breaker is not None:
breaker.record_success()
return strip_em_dashes(result.text.rstrip() + "\n", persona)
# ── Edit-in-place mutators ─────────────────────────────────────────────────
# Stage 3b: deterministic per-class mutations. The contract: take the
# previous body bytes, return a plausible *next* iteration (append a

View File

@@ -0,0 +1,99 @@
"""Process-local circuit breaker for LLM calls.
Per-call timeouts (``asyncio.wait_for(llm.generate, timeout=...)``)
protect a single tick from a single hung Ollama. They do NOT protect
the worker from a *sustained* problem: 100 consecutive 60-second
timeouts chew up an hour of orchestrator time on dead requests before
anything notices.
This breaker watches a sliding window of recent outcomes and flips
``open`` after ``failure_threshold`` consecutive failures. Open
breakers short-circuit ``allow_call`` to ``False`` so callers fall
back to deterministic templates without the per-tick cost. After
``cooldown_seconds`` the breaker enters ``half_open`` and the next
call is allowed; success closes the breaker, failure re-opens it
with a fresh cooldown.
Process-local on purpose — cross-process state would require shared
memory and is overkill for a single orchestrator worker.
"""
from __future__ import annotations
import threading
import time
from enum import Enum
class _State(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
class LLMCircuitBreaker:
"""Threadsafe sliding-window circuit breaker.
Default ``failure_threshold=3`` consecutive failures → open;
``cooldown_seconds=60`` of open before transitioning to
half-open. These match the realism worker's tick cadence: 3
consecutive 60s timeouts = 3 minutes of dead air, which is the
point at which a deterministic fallback is overdue.
"""
def __init__(
self,
*,
failure_threshold: int = 3,
cooldown_seconds: float = 60.0,
clock=time.monotonic,
) -> None:
self._failure_threshold = failure_threshold
self._cooldown = cooldown_seconds
self._clock = clock
self._lock = threading.Lock()
self._state = _State.CLOSED
self._consecutive_failures = 0
self._opened_at: float = 0.0
@property
def state(self) -> str:
with self._lock:
return self._state.value
def allow_call(self) -> bool:
"""Return True if the next call should run, False if it should
short-circuit to the fallback path.
Promotes ``open`` → ``half_open`` after the cooldown elapses
so the next caller acts as a probe.
"""
with self._lock:
if self._state == _State.CLOSED:
return True
if self._state == _State.HALF_OPEN:
return True
# OPEN: check cooldown.
if self._clock() - self._opened_at >= self._cooldown:
self._state = _State.HALF_OPEN
return True
return False
def record_success(self) -> None:
with self._lock:
self._state = _State.CLOSED
self._consecutive_failures = 0
self._opened_at = 0.0
def record_failure(self) -> None:
with self._lock:
if self._state == _State.HALF_OPEN:
# The probe call failed — re-open with a fresh cooldown.
self._state = _State.OPEN
self._opened_at = self._clock()
# Don't reset the failure count; the probe failure
# implies the underlying issue is unresolved.
return
self._consecutive_failures += 1
if self._consecutive_failures >= self._failure_threshold:
self._state = _State.OPEN
self._opened_at = self._clock()

View File

@@ -0,0 +1,39 @@
"""Shared stylometric guards for LLM-bound prompts.
Lifted from the original ``orchestrator.emailgen.prompt`` em-dash
block so file-class prompts (note / todo / draft / script bodies)
pick up the same suppression. Per the
``feedback_em_dash_llm_tell.md`` memory: em-dashes (—) are a strong
LLM-authorship tell, suppress by default; allow only for personas
explicitly opted in via ``EmailPersona.uses_llms_heavily``.
"""
from __future__ import annotations
from decnet.realism.personas import EmailPersona
_SUPPRESS_RULE = (
"Do NOT use em-dashes (—). Use commas, periods, or "
"parentheses instead. Em-dashes are a tell."
)
_ALLOW_RULE = (
"Em-dashes are fine — this persona uses them naturally. "
"Write in your usual style."
)
def em_dash_rule(persona: EmailPersona) -> str:
"""Return the em-dash instruction line for *persona*'s prompt."""
if persona.uses_llms_heavily:
return _ALLOW_RULE
return _SUPPRESS_RULE
def strip_em_dashes(text: str, persona: EmailPersona) -> str:
"""Belt-and-braces: even with the prompt rule, small models leak
em-dashes occasionally. Substitute with comma+space so the
output reads naturally; opt-in personas pass through unchanged.
"""
if persona.uses_llms_heavily:
return text
return text.replace("", ", ").replace("", ", ")

View File

@@ -0,0 +1,91 @@
"""Class-conditioned prompt builder for user-class file bodies.
Stage 6 of the realism migration. Only user-classes (``note``,
``todo``, ``draft``, ``script``) get LLM enrichment — system-class
content (cron logs, daemon logs, /tmp caches) is *supposed* to look
formulaic, and an LLM-authored cron log is more suspicious than a
templated one.
The prompt asks for *short* output (LLM-authored ten-page essays in
``~/notes.txt`` are an instant tell) and pins the exit shape so the
worker doesn't need to scrape boilerplate. Em-dash suppression
flows through :mod:`decnet.realism.prompts._style`.
"""
from __future__ import annotations
from decnet.realism.personas import EmailPersona
from decnet.realism.prompts._style import em_dash_rule
from decnet.realism.taxonomy import ContentClass
_LANGUAGE_NAMES = {
"en": "English", "es": "Spanish", "pt": "Portuguese",
"fr": "French", "de": "German", "it": "Italian",
"nl": "Dutch", "ja": "Japanese", "zh": "Chinese",
}
def _lang_label(code: str) -> str:
return _LANGUAGE_NAMES.get((code or "en").lower(), code or "English")
_CLASS_GUIDANCE: dict[ContentClass, str] = {
ContentClass.NOTE: (
"A personal note file the persona keeps on their dev box. "
"26 short lines. Mix of TODOs, half-formed thoughts, "
"shorthand reminders. NOT a polished document. No headers "
"or markdown sections."
),
ContentClass.TODO: (
"A markdown TODO list the persona keeps on their dev box. "
"38 items in `- [ ] item` / `- [x] item` form. Some checked, "
"some not. Items are short, work-flavoured, lowercase, no "
"prose paragraphs. No headers. No introductory sentence."
),
ContentClass.DRAFT: (
"A short draft email or memo the persona is working on. "
"24 short paragraphs, conversational tone. No subject line, "
"no headers — this is the body in a notes file, not a sent "
"email. Sign off the way the persona would in their voice."
),
ContentClass.SCRIPT: (
"A short utility script the persona wrote. Pick a plausible "
"interpreter (bash or python3) and start with the matching "
"shebang. 1025 lines. Real-feeling intent (a backup, a "
"log rotation, a cleanup). Inline comments allowed but sparse."
),
}
def build(
content_class: ContentClass,
persona: EmailPersona,
) -> str:
"""Return a prompt for one body of *content_class* by *persona*.
Output the LLM is expected to produce: *just the file body*, no
commentary, no markdown fences. Caller substitutes em-dashes
server-side via :func:`decnet.realism.prompts._style.strip_em_dashes`
as a belt-and-braces guard.
"""
guidance = _CLASS_GUIDANCE.get(content_class)
if guidance is None:
raise KeyError(
f"no filebody prompt registered for content_class={content_class!r}"
)
language = _lang_label(persona.language or "en")
return (
f"You are writing one short file the persona below would "
f"plausibly keep on their dev box.\n\n"
f"Persona:\n"
f"- Name: {persona.name}\n"
f"- Role: {persona.role}\n"
f"- Tone: {persona.tone_custom if persona.tone == 'custom' and persona.tone_custom else persona.tone}\n\n"
f"File class: {content_class.value}\n"
f"Guidance: {guidance}\n\n"
f"Hard rules:\n"
f"1. Write the file body in {language}. Do not translate or code-switch.\n"
f"2. {em_dash_rule(persona)}\n"
f"3. Output ONLY the file body. No commentary, no markdown "
f" fences, no preamble like 'Here is the file:'.\n"
).strip()

View File

@@ -0,0 +1,128 @@
"""LLM-enriched body generation with deterministic fallback."""
from __future__ import annotations
import asyncio
import pytest
from decnet.realism.bodies import make_body_with_llm
from decnet.realism.llm.base import LLMResult, LLMTimeout
from decnet.realism.llm.circuit import LLMCircuitBreaker
from decnet.realism.personas import EmailPersona
from decnet.realism.taxonomy import ContentClass
def _persona(uses_llms: bool = False) -> EmailPersona:
return EmailPersona(
name="admin", email="admin@corp.com", role="ops",
tone="direct", mannerisms=["uses bullets"],
active_hours="00:00-00:00",
uses_llms_heavily=uses_llms,
)
class _StubLLM:
"""Async stub: returns canned LLMResult; no subprocess work."""
def __init__(self, *, text: str = "stub body\n", success: bool = True):
self.model = "stub-model"
self.timeout = 1.0
self._result = LLMResult(
success=success, text=text, model=self.model, latency_ms=1,
)
self.calls = 0
async def generate(self, prompt: str) -> LLMResult:
self.calls += 1
return self._result
class _TimeoutLLM:
model = "timeout-model"
timeout = 0.05
async def generate(self, prompt: str) -> LLMResult:
raise LLMTimeout("simulated")
@pytest.mark.asyncio
async def test_no_llm_falls_back_to_template() -> None:
body = await make_body_with_llm(ContentClass.NOTE, _persona(), llm=None)
assert body.strip() # template path returns non-empty
@pytest.mark.asyncio
async def test_llm_success_returns_llm_text() -> None:
llm = _StubLLM(text="LLM-produced note body\n")
body = await make_body_with_llm(
ContentClass.NOTE, _persona(), llm=llm,
)
assert "LLM-produced note body" in body
assert llm.calls == 1
@pytest.mark.asyncio
async def test_em_dashes_are_stripped_for_default_persona() -> None:
llm = _StubLLM(text="Hi — quick update — see attached.\n")
body = await make_body_with_llm(
ContentClass.NOTE, _persona(uses_llms=False), llm=llm,
)
assert "" not in body
@pytest.mark.asyncio
async def test_em_dashes_pass_through_for_llm_heavy_persona() -> None:
llm = _StubLLM(text="Hi — quick update — see attached.\n")
body = await make_body_with_llm(
ContentClass.NOTE, _persona(uses_llms=True), llm=llm,
)
assert "" in body
@pytest.mark.asyncio
async def test_timeout_falls_back_to_template_and_records_failure() -> None:
breaker = LLMCircuitBreaker(failure_threshold=3, cooldown_seconds=10.0)
body = await make_body_with_llm(
ContentClass.NOTE, _persona(),
llm=_TimeoutLLM(), breaker=breaker, timeout=0.01,
)
assert body.strip() # template fallback returned non-empty
assert breaker.state == "closed" # one failure isn't enough to trip
@pytest.mark.asyncio
async def test_breaker_open_skips_llm_call() -> None:
breaker = LLMCircuitBreaker(failure_threshold=1, cooldown_seconds=60.0)
breaker.record_failure() # trip immediately
assert breaker.allow_call() is False
llm = _StubLLM()
body = await make_body_with_llm(
ContentClass.NOTE, _persona(),
llm=llm, breaker=breaker,
)
# LLM was NOT called (breaker open) — fallback to template.
assert llm.calls == 0
assert body.strip()
@pytest.mark.asyncio
async def test_system_class_never_invokes_llm() -> None:
llm = _StubLLM()
body = await make_body_with_llm(
ContentClass.LOG_CRON, _persona(), llm=llm,
)
# System-class content is supposed to look formulaic; LLM-authored
# cron logs would be a regression in realism.
assert llm.calls == 0
assert "CRON[" in body # template path
@pytest.mark.asyncio
async def test_empty_llm_response_falls_back() -> None:
llm = _StubLLM(text="", success=True)
body = await make_body_with_llm(
ContentClass.NOTE, _persona(), llm=llm,
)
# LLM ran but produced empty output → template fallback.
assert body.strip()

View File

@@ -0,0 +1,81 @@
"""LLMCircuitBreaker — process-local sliding-window breaker."""
from __future__ import annotations
from decnet.realism.llm.circuit import LLMCircuitBreaker
def test_starts_closed_and_allows_calls() -> None:
breaker = LLMCircuitBreaker()
assert breaker.state == "closed"
assert breaker.allow_call() is True
def test_trips_open_after_threshold_failures() -> None:
clock_value = [0.0]
breaker = LLMCircuitBreaker(
failure_threshold=3, cooldown_seconds=60.0,
clock=lambda: clock_value[0],
)
breaker.record_failure()
assert breaker.state == "closed"
breaker.record_failure()
assert breaker.state == "closed"
breaker.record_failure()
assert breaker.state == "open"
assert breaker.allow_call() is False
def test_success_resets_consecutive_failure_count() -> None:
breaker = LLMCircuitBreaker(failure_threshold=3)
breaker.record_failure()
breaker.record_failure()
breaker.record_success()
breaker.record_failure()
breaker.record_failure()
assert breaker.state == "closed" # only 2 since the success
def test_half_open_after_cooldown() -> None:
clock_value = [0.0]
breaker = LLMCircuitBreaker(
failure_threshold=2, cooldown_seconds=10.0,
clock=lambda: clock_value[0],
)
breaker.record_failure()
breaker.record_failure()
assert breaker.state == "open"
assert breaker.allow_call() is False
clock_value[0] = 11.0
assert breaker.allow_call() is True
assert breaker.state == "half_open"
def test_half_open_failure_re_opens() -> None:
clock_value = [0.0]
breaker = LLMCircuitBreaker(
failure_threshold=2, cooldown_seconds=5.0,
clock=lambda: clock_value[0],
)
breaker.record_failure()
breaker.record_failure()
clock_value[0] = 6.0
breaker.allow_call()
assert breaker.state == "half_open"
breaker.record_failure()
assert breaker.state == "open"
def test_half_open_success_closes() -> None:
clock_value = [0.0]
breaker = LLMCircuitBreaker(
failure_threshold=2, cooldown_seconds=5.0,
clock=lambda: clock_value[0],
)
breaker.record_failure()
breaker.record_failure()
clock_value[0] = 6.0
breaker.allow_call()
breaker.record_success()
assert breaker.state == "closed"
assert breaker.allow_call() is True