diff --git a/decnet/profiler/behave_shell/_features/__init__.py b/decnet/profiler/behave_shell/_features/__init__.py index 7cba8bc2..eff85fce 100644 --- a/decnet/profiler/behave_shell/_features/__init__.py +++ b/decnet/profiler/behave_shell/_features/__init__.py @@ -25,6 +25,7 @@ from decnet.profiler.behave_shell._features.cognitive import ( inter_command_latency_class, ) from decnet.profiler.behave_shell._features.environmental import ( + locale, shell_type, terminal_multiplexer, ) @@ -73,4 +74,5 @@ FEATURES: tuple[FeatureFn, ...] = ( landing_ritual, shell_type, terminal_multiplexer, + locale, ) diff --git a/decnet/profiler/behave_shell/_features/environmental.py b/decnet/profiler/behave_shell/_features/environmental.py index a96d815b..cdba4722 100644 --- a/decnet/profiler/behave_shell/_features/environmental.py +++ b/decnet/profiler/behave_shell/_features/environmental.py @@ -7,12 +7,25 @@ which F.1 / F.3 / E.4 read. Step F.1: ``environmental.shell_type``. Step F.2: ``environmental.terminal_multiplexer``. +Step F.3: ``environmental.locale``. """ from __future__ import annotations import collections +import re from typing import Iterator +from decnet_behave_core.spec.envelope import Observation + +from decnet.profiler.behave_shell._ctx import SessionContext +from decnet.profiler.behave_shell._features._emit import make_observation +from decnet.profiler.behave_shell._parse import PromptLine, strip_ansi +from decnet.profiler.behave_shell._thresholds import ( + LOCALE_MIN_VALUE_LENGTH, + SHELL_TYPE_MIN_PROMPTS, +) + + # Multiplexer fingerprints scanned over RAW output (multiplexer escapes # ARE ANSI sequences, so we must NOT strip-ANSI before searching). # Sources: @@ -31,14 +44,42 @@ _SCREEN_MARKERS: tuple[str, ...] 
# Locale envvar regex: matches ``KEY=VALUE`` where KEY is one of the
# three locale envvars and VALUE is a POSIX locale name
# (``language[_territory][.codeset][@modifier]``, or ``C`` / ``POSIX``
# with an optional codeset such as ``C.UTF-8``).
#
# * The leading lookbehind rejects keys that merely END in a locale
#   key (``OLDLANG=``, ``MY_LANG=``).
# * An optional opening quote is skipped because ``locale(1)`` prints
#   implied categories quoted: ``LC_CTYPE="en_US.UTF-8"``.
# * ``C`` / ``POSIX`` are tried BEFORE the generic language pattern;
#   otherwise ``POSIX`` would be captured as the 3-letter "language"
#   ``POS``.
# * The trailing lookahead anchors the value against whitespace, shell
#   quoting, ``;`` and end-of-input so arbitrary words (``LANG=english``)
#   are not truncated into a plausible-looking 2-3 letter code.
_LOCALE_VALUE_RE = re.compile(
    r"(?<![A-Za-z0-9_])"
    r"(?P<key>LC_ALL|LANG|LC_CTYPE)="
    r"[\"']?"
    r"(?P<val>"
    r"(?:C|POSIX)(?:\.[A-Za-z0-9-]+)?"
    r"|[A-Za-z]{2,3}(?:_[A-Za-z]{2,3})?(?:\.[A-Za-z0-9-]+)?(?:@[A-Za-z0-9]+)?"
    r")"
    r"(?=[\s'\"\\$;]|$)"
)

# Higher number wins when several locale envvars appear in the output.
_LOCALE_KEY_PRIORITY: dict[str, int] = {"LC_ALL": 3, "LANG": 2, "LC_CTYPE": 1}


def _to_bcp47(posix_value: str) -> str | None:
    """Normalise a POSIX locale value to a BCP-47 tag.

    The codeset (``.UTF-8``) and modifier (``@euro``) suffixes are
    dropped first, since BCP-47 does not carry them; the remaining
    ``language[_territory]`` part is then converted.

    Returns:
        ``None`` when the value is malformed (caller skips emission).
        ``"und"`` for ``C`` / ``POSIX``, with or without a codeset
        (e.g. ``C.UTF-8``) -- BCP-47 'undetermined'.
        Otherwise ``language`` or ``language-REGION`` (e.g. ``fr``,
        ``en-US``, ``pt-BR``).
    """
    base = posix_value.split(".", 1)[0].split("@", 1)[0]
    # Checked AFTER stripping the suffixes so ``C.UTF-8`` is recognised.
    if base in ("C", "POSIX"):
        return "und"
    parts = base.split("_")
    lang = parts[0]
    # BCP-47 subtags are ASCII letters; ``str.isalpha`` alone would
    # also accept non-ASCII alphabetic characters.
    if len(lang) < 2 or not (lang.isascii() and lang.isalpha()):
        return None
    if len(parts) == 1:
        return lang.lower()
    region = parts[1]
    if not (region.isascii() and region.isalpha()):
        return None
    return f"{lang.lower()}-{region.upper()}"


def _select_locale(text: str, min_length: int) -> str | None:
    """Return the BCP-47 tag of the highest-priority locale envvar in *text*.

    *text* must already be ANSI-stripped. Among matches of equal
    priority the first one wins. Values that fail normalisation, or
    whose normalised tag is shorter than *min_length*, are ignored.
    Returns ``None`` when nothing usable is found.
    """
    best_priority = 0
    best_value: str | None = None
    for m in _LOCALE_VALUE_RE.finditer(text):
        prio = _LOCALE_KEY_PRIORITY[m.group("key")]
        if prio <= best_priority:
            continue
        bcp47 = _to_bcp47(m.group("val"))
        if bcp47 is None or len(bcp47) < min_length:
            continue
        best_priority = prio
        best_value = bcp47
    return best_value


def locale(ctx: SessionContext) -> Iterator[Observation]:
    """Emit ``environmental.locale`` (free-string BCP-47 tag).

    Searches the ANSI-stripped output stream for ``LANG=``,
    ``LC_ALL=``, or ``LC_CTYPE=`` assignments -- emitted when the
    operator runs ``env``, ``locale``, or ``printenv``. Highest-priority
    key wins (``LC_ALL`` > ``LANG`` > ``LC_CTYPE``); the POSIX value is
    normalised to BCP-47:

    * ``en_US.UTF-8``                  -> ``en-US``
    * ``pt_BR.UTF-8``                  -> ``pt-BR``
    * ``C`` / ``POSIX`` / ``C.UTF-8``  -> ``und``
    * malformed                        -> skip emission

    Skip emission when no envvar dump is found in the output --
    silence rather than fabricating a default. Only the normalised tag
    is emitted; no surrounding output is carried into the observation.
    """
    if not ctx.commands:
        return
    # Concatenate output; strip ANSI once (locale values aren't escape
    # sequences themselves so the strip is safe).
    raw = "".join(d for _t, _k, d in ctx.output_events)
    if not raw:
        return
    # LOCALE_MIN_VALUE_LENGTH (defined in ``_thresholds.py``) is applied
    # to the NORMALISED tag. Every tag ``_to_bcp47`` can return is at
    # least 2 characters, so the check only bites if the threshold is
    # raised above 2.
    value = _select_locale(strip_ansi(raw), LOCALE_MIN_VALUE_LENGTH)
    if value is None:
        return
    yield make_observation(
        ctx,
        primitive="environmental.locale",
        value=value,
        confidence=0.80,
    )
Mirrors the prototype's diff --git a/tests/profiler/behave_shell/test_environmental_locale.py b/tests/profiler/behave_shell/test_environmental_locale.py new file mode 100644 index 00000000..fa213bce --- /dev/null +++ b/tests/profiler/behave_shell/test_environmental_locale.py @@ -0,0 +1,95 @@ +"""Step F.3: ``environmental.locale``.""" +from __future__ import annotations + +from decnet.profiler.behave_shell import extract_session +from decnet.profiler.behave_shell._features.environmental import _to_bcp47 +from decnet.profiler.behave_shell._parse import AsciinemaEvent + + +PRIMITIVE = "environmental.locale" + + +def _of(observations: list, primitive: str): + obs = [o for o in observations if o.primitive == primitive] + assert len(obs) == 1, f"expected exactly one {primitive}, got {len(obs)}" + return obs[0] + + +def _typed(text: str, t0: float = 0.0, dt: float = 0.05) -> list[AsciinemaEvent]: + return [(t0 + i * dt, "i", c) for i, c in enumerate(text)] + + +# ── _to_bcp47 ────────────────────────────────────────────────────────────── + + +def test_to_bcp47_lang_region() -> None: + assert _to_bcp47("en_US.UTF-8") == "en-US" + assert _to_bcp47("pt_BR.UTF-8") == "pt-BR" + assert _to_bcp47("de_DE@euro") == "de-DE" + + +def test_to_bcp47_language_only() -> None: + assert _to_bcp47("fr") == "fr" + + +def test_to_bcp47_c_posix() -> None: + assert _to_bcp47("C") == "und" + assert _to_bcp47("POSIX") == "und" + + +def test_to_bcp47_malformed() -> None: + assert _to_bcp47("X") is None # too short + assert _to_bcp47("en_99") is None # non-alpha region + + +# ── feature integration ──────────────────────────────────────────────────── + + +def test_no_envvar_dump_no_emission() -> None: + events: list[AsciinemaEvent] = [ + *_typed("ls\r"), + (0.20, "o", "file1\nfile2\n"), + ] + out = list(extract_session(events, sid="loc-none")) + assert [o for o in out if o.primitive == PRIMITIVE] == [] + + +def test_lang_envvar_dump_emits_bcp47() -> None: + events: list[AsciinemaEvent] = [ + 
*_typed("env\r"), + (0.20, "o", "PATH=/usr/bin\nLANG=en_US.UTF-8\nUSER=anti\n"), + ] + obs = _of(list(extract_session(events, sid="loc-en")), PRIMITIVE) + assert obs.value == "en-US" + + +def test_lc_all_takes_precedence_over_lang() -> None: + events: list[AsciinemaEvent] = [ + *_typed("env\r"), + (0.20, "o", "LANG=en_US.UTF-8\nLC_ALL=pt_BR.UTF-8\nUSER=anti\n"), + ] + obs = _of(list(extract_session(events, sid="loc-prec")), PRIMITIVE) + assert obs.value == "pt-BR" + + +def test_c_locale_emits_und() -> None: + events: list[AsciinemaEvent] = [ + *_typed("env\r"), + (0.20, "o", "LANG=C\nUSER=anti\n"), + ] + obs = _of(list(extract_session(events, sid="loc-und")), PRIMITIVE) + assert obs.value == "und" + + +def test_pii_locale_value_only_no_surrounding_output() -> None: + """Surrounding output isn't leaked — only the parsed BCP-47 value.""" + events: list[AsciinemaEvent] = [ + *_typed("env\r"), + (0.20, "o", "SECRET_TOKEN=abcdef123\nLANG=en_US.UTF-8\n"), + ] + out = list(extract_session(events, sid="loc-pii")) + obs = _of(out, PRIMITIVE) + serialised = obs.model_dump_json() + assert "SECRET_TOKEN" not in serialised + assert "abcdef123" not in serialised + assert "en-US" in serialised