feat(profiler/behave_shell): G.0 intent lexicon + lexical counter pass
Phase G shared infrastructure (no primitive yet emitted): * New `_intent.py` — five precomputed first-token-hash sets (recon / exfil / persistence / lateral / destructive) with documented precedence, plus opsec-history and three lexeme sets (positive / negative / obscenity) for the typed-text counter pass. Stop words that collide with registry value vocabulary (`no`, `hell`, `ok`) are deliberately excluded — the PII regression test catches such collisions. * `_typed_char_histograms()` extended with five integer counters populated in the same single-pass walk: `obscenity_hits`, `positive_lex_hits`, `negative_lex_hits`, `caps_run_max`, `bang_run_max`. Longest-suffix match against bounded lexicon (`LEXEME_MAX_LEN`); paste-class events excluded. * `SessionContext` widened by the same five fields. Drives G.5 (valence), G.6 (arousal), G.8 (frustration_venting) without retaining raw operator text. * Bump twisted >= 26.4.0rc2 to clear CVE-2026-42304 (pre-existing, caught by pre-commit pip-audit). Adjust ftp template type-ignore code from attr-defined to misc to match the new Twisted typing. PII discipline: same shape as F.4 — fixed-vocabulary integer counters on ctx, never on observations.
This commit is contained in:
@@ -14,6 +14,12 @@ import math
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Iterable, Mapping
|
||||
|
||||
from decnet.profiler.behave_shell._intent import (
|
||||
LEXEME_MAX_LEN,
|
||||
NEGATIVE_LEXEMES,
|
||||
OBSCENITY_LEXEMES,
|
||||
POSITIVE_LEXEMES,
|
||||
)
|
||||
from decnet.profiler.behave_shell._parse import (
|
||||
AsciinemaEvent,
|
||||
Command,
|
||||
@@ -34,6 +40,20 @@ from decnet.profiler.behave_shell._thresholds import (
|
||||
)
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class _LexCounters:
|
||||
"""Lexical counters from the typed-text walk (G.0).
|
||||
|
||||
Internal to the ctx-builder; flattened onto SessionContext fields
|
||||
in :func:`build_session_context`.
|
||||
"""
|
||||
obscenity_hits: int = 0
|
||||
positive_lex_hits: int = 0
|
||||
negative_lex_hits: int = 0
|
||||
caps_run_max: int = 0
|
||||
bang_run_max: int = 0
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class SessionContext:
|
||||
sid: str
|
||||
@@ -76,6 +96,16 @@ class SessionContext:
|
||||
typed_bigram_counts: Mapping[str, int] = field(default_factory=dict)
|
||||
typed_letter_count: int = 0
|
||||
|
||||
# Step G.0 derivations — lexical counters from the same single-pass
|
||||
# typed-text walk. No raw text retained; only fixed-vocabulary
|
||||
# membership counts and run-lengths. Drives valence (G.5), arousal
|
||||
# (G.6), and frustration_venting (G.8).
|
||||
obscenity_hits: int = 0
|
||||
positive_lex_hits: int = 0
|
||||
negative_lex_hits: int = 0
|
||||
caps_run_max: int = 0
|
||||
bang_run_max: int = 0
|
||||
|
||||
|
||||
def _detect_paste_bursts(
|
||||
inputs: list[AsciinemaEvent],
|
||||
@@ -309,28 +339,83 @@ def _output_bytes_between(
|
||||
|
||||
def _typed_char_histograms(
|
||||
inputs: list[AsciinemaEvent],
|
||||
) -> tuple[Mapping[str, int], Mapping[str, int], int]:
|
||||
"""Walk input events, build typed-only unigram + bigram histograms.
|
||||
) -> tuple[Mapping[str, int], Mapping[str, int], int, _LexCounters]:
|
||||
"""Walk input events, build typed-only unigram + bigram histograms
|
||||
plus the Phase G lexical counters.
|
||||
|
||||
Skip paste-class events (``len(data) >= PASTE_MIN_CHARS_PER_EVENT``)
|
||||
— pasted text reveals nothing about the operator's keyboard. Letter
|
||||
bigrams chain only across consecutive ASCII-letter chars; a digit
|
||||
or punctuation character breaks the chain.
|
||||
— pasted text reveals nothing about the operator's keyboard or
|
||||
sentiment. Letter bigrams chain only across consecutive ASCII-letter
|
||||
chars; a digit or punctuation character breaks the chain.
|
||||
|
||||
Returns ``(unigrams, bigrams, total_letters)``. The bigram dict is
|
||||
truncated to the top ``LAYOUT_BIGRAM_TOP_N`` entries by count to
|
||||
bound memory (the layout signals only need the head of the
|
||||
distribution).
|
||||
Lexical counters (G.0): a small word buffer (≤ ``LEXEME_MAX_LEN``)
|
||||
accumulates ASCII-letter chars (case-folded). On any non-letter
|
||||
boundary, every suffix of the buffer is checked against
|
||||
``POSITIVE_LEXEMES`` / ``NEGATIVE_LEXEMES`` / ``OBSCENITY_LEXEMES``;
|
||||
the longest match wins (so ``fucking`` counts as one obscenity hit,
|
||||
not two — ``fuck`` + ``fucking``). Caps and bang runs are tracked
|
||||
in the same walk.
|
||||
|
||||
Returns ``(unigrams, bigrams, total_letters, lex_counters)``.
|
||||
"""
|
||||
unigrams: dict[str, int] = {}
|
||||
bigrams: dict[str, int] = {}
|
||||
total_letters = 0
|
||||
last_letter: str | None = None
|
||||
|
||||
word_buf: list[str] = []
|
||||
obscenity_hits = 0
|
||||
positive_lex_hits = 0
|
||||
negative_lex_hits = 0
|
||||
caps_run_cur = 0
|
||||
caps_run_max = 0
|
||||
bang_run_cur = 0
|
||||
bang_run_max = 0
|
||||
|
||||
def _flush_word() -> tuple[int, int, int]:
|
||||
"""Match longest lexeme suffix in ``word_buf``; return per-set deltas."""
|
||||
if not word_buf:
|
||||
return 0, 0, 0
|
||||
s = "".join(word_buf)
|
||||
# Longest-suffix scan against fixed lexicons.
|
||||
for length in range(min(len(s), LEXEME_MAX_LEN), 0, -1):
|
||||
suffix = s[-length:]
|
||||
if suffix in OBSCENITY_LEXEMES:
|
||||
return 1, 0, 0
|
||||
if suffix in POSITIVE_LEXEMES:
|
||||
return 0, 1, 0
|
||||
if suffix in NEGATIVE_LEXEMES:
|
||||
return 0, 0, 1
|
||||
return 0, 0, 0
|
||||
|
||||
for _t, _kind, data in inputs:
|
||||
if len(data) >= PASTE_MIN_CHARS_PER_EVENT:
|
||||
# Paste boundary breaks every running counter.
|
||||
last_letter = None
|
||||
obs_d, pos_d, neg_d = _flush_word()
|
||||
obscenity_hits += obs_d
|
||||
positive_lex_hits += pos_d
|
||||
negative_lex_hits += neg_d
|
||||
word_buf.clear()
|
||||
caps_run_cur = 0
|
||||
bang_run_cur = 0
|
||||
continue
|
||||
for c in data:
|
||||
# Caps-run tracking
|
||||
if c.isascii() and c.isupper():
|
||||
caps_run_cur += 1
|
||||
if caps_run_cur > caps_run_max:
|
||||
caps_run_max = caps_run_cur
|
||||
else:
|
||||
caps_run_cur = 0
|
||||
# Bang-run tracking
|
||||
if c == "!":
|
||||
bang_run_cur += 1
|
||||
if bang_run_cur > bang_run_max:
|
||||
bang_run_max = bang_run_cur
|
||||
else:
|
||||
bang_run_cur = 0
|
||||
# Histogram + lexeme buffering
|
||||
if c.isascii() and c.isalpha():
|
||||
lower = c.lower()
|
||||
unigrams[lower] = unigrams.get(lower, 0) + 1
|
||||
@@ -339,12 +424,34 @@ def _typed_char_histograms(
|
||||
big = last_letter + lower
|
||||
bigrams[big] = bigrams.get(big, 0) + 1
|
||||
last_letter = lower
|
||||
word_buf.append(lower)
|
||||
if len(word_buf) > LEXEME_MAX_LEN:
|
||||
# Slide window — only the tail can match a lexeme.
|
||||
word_buf[:] = word_buf[-LEXEME_MAX_LEN:]
|
||||
else:
|
||||
last_letter = None
|
||||
obs_d, pos_d, neg_d = _flush_word()
|
||||
obscenity_hits += obs_d
|
||||
positive_lex_hits += pos_d
|
||||
negative_lex_hits += neg_d
|
||||
word_buf.clear()
|
||||
|
||||
# Trailing word (no boundary at end of input).
|
||||
obs_d, pos_d, neg_d = _flush_word()
|
||||
obscenity_hits += obs_d
|
||||
positive_lex_hits += pos_d
|
||||
negative_lex_hits += neg_d
|
||||
|
||||
if len(bigrams) > LAYOUT_BIGRAM_TOP_N:
|
||||
top = sorted(bigrams.items(), key=lambda kv: -kv[1])[:LAYOUT_BIGRAM_TOP_N]
|
||||
bigrams = dict(top)
|
||||
return unigrams, bigrams, total_letters
|
||||
return unigrams, bigrams, total_letters, _LexCounters(
|
||||
obscenity_hits=obscenity_hits,
|
||||
positive_lex_hits=positive_lex_hits,
|
||||
negative_lex_hits=negative_lex_hits,
|
||||
caps_run_max=caps_run_max,
|
||||
bang_run_max=bang_run_max,
|
||||
)
|
||||
|
||||
|
||||
def _output_window(
|
||||
@@ -432,7 +539,7 @@ def build_session_context(
|
||||
for i in range(len(commands) - 1)
|
||||
)
|
||||
intra_command_iats = _per_command_iats(commands, inputs)
|
||||
typed_uni, typed_bi, typed_letters = _typed_char_histograms(inputs)
|
||||
typed_uni, typed_bi, typed_letters, lex = _typed_char_histograms(inputs)
|
||||
|
||||
return SessionContext(
|
||||
sid=sid,
|
||||
@@ -458,4 +565,9 @@ def build_session_context(
|
||||
typed_unigram_counts=typed_uni,
|
||||
typed_bigram_counts=typed_bi,
|
||||
typed_letter_count=typed_letters,
|
||||
obscenity_hits=lex.obscenity_hits,
|
||||
positive_lex_hits=lex.positive_lex_hits,
|
||||
negative_lex_hits=lex.negative_lex_hits,
|
||||
caps_run_max=lex.caps_run_max,
|
||||
bang_run_max=lex.bang_run_max,
|
||||
)
|
||||
|
||||
115
decnet/profiler/behave_shell/_intent.py
Normal file
115
decnet/profiler/behave_shell/_intent.py
Normal file
@@ -0,0 +1,115 @@
|
||||
"""Phase G — shared command-intent + lexical-counter vocabulary.
|
||||
|
||||
Used by:
|
||||
* ``operational.objective`` (G.1) via ``INTENT_SETS``
|
||||
* ``operational.opsec_discipline`` (G.2) via ``OPSEC_HISTORY_TOKENS``
|
||||
* ``emotional_valence.valence`` (G.5) via ``POSITIVE_LEXEMES`` / ``NEGATIVE_LEXEMES``
|
||||
* ``emotional_valence.frustration_venting`` (G.8) via ``OBSCENITY_LEXEMES``
|
||||
|
||||
All ``*_TOKENS`` frozensets contain ``hash_token()`` SHA256 hexes — the
|
||||
only PII-safe handle on a command's first token. Lexeme frozensets
|
||||
contain lowercased word forms (used by the typed-text counter pass in
|
||||
``_ctx.py`` to *count* matches without retaining text).
|
||||
|
||||
Set membership is intentionally overlapping. ``rm`` rides in
|
||||
``DESTRUCTIVE_TOKENS`` AND in the cleanup vocabulary; ``unset`` rides
|
||||
in ``OPSEC_HISTORY_TOKENS`` AND in cleanup. G.1's classifier resolves
|
||||
multi-membership by fixed precedence (see :data:`INTENT_PRECEDENCE`).
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from decnet.profiler.behave_shell._parse import hash_token
|
||||
|
||||
# ── operational.objective intent sets (G.1) ────────────────────────────────
|
||||
RECON_TOKENS: frozenset[str] = frozenset(
|
||||
hash_token(t) for t in (
|
||||
"ls", "pwd", "whoami", "id", "uname", "ps", "netstat", "ss",
|
||||
"cat", "find", "which", "env", "printenv", "hostname", "w",
|
||||
"who", "date", "uptime", "df", "du", "free", "lsof", "lsblk",
|
||||
)
|
||||
)
|
||||
EXFIL_TOKENS: frozenset[str] = frozenset(
|
||||
hash_token(t) for t in (
|
||||
"curl", "wget", "scp", "rsync", "nc", "ncat", "socat", "tar",
|
||||
"base64", "xxd", "python", "python3", "openssl",
|
||||
)
|
||||
)
|
||||
PERSISTENCE_TOKENS: frozenset[str] = frozenset(
|
||||
hash_token(t) for t in (
|
||||
"crontab", "systemctl", "useradd", "usermod", "passwd", "chsh",
|
||||
"at", "service", "chkconfig", "update-rc.d", "authorized_keys",
|
||||
)
|
||||
)
|
||||
LATERAL_TOKENS: frozenset[str] = frozenset(
|
||||
hash_token(t) for t in (
|
||||
"ssh", "telnet", "rsh", "rlogin", "ftp", "sftp", "mosh",
|
||||
"kubectl", "docker", "psql", "mysql", "redis-cli",
|
||||
)
|
||||
)
|
||||
DESTRUCTIVE_TOKENS: frozenset[str] = frozenset(
|
||||
hash_token(t) for t in (
|
||||
"rm", "dd", "mkfs", "shred", "wipe", "kill", "pkill", "killall",
|
||||
"truncate", "fdisk",
|
||||
)
|
||||
)
|
||||
|
||||
# G.1 majority-vote classifier walks first_token_hash → category in this
|
||||
# order; first hit wins. ``destructive`` outranks ``persistence`` because
|
||||
# a session that destroys outweighs one that also installs cron jobs;
|
||||
# ``exfil`` outranks ``lateral`` because pulling data is the more
|
||||
# specific signal.
|
||||
INTENT_PRECEDENCE: tuple[tuple[str, frozenset[str]], ...] = (
|
||||
("destructive", DESTRUCTIVE_TOKENS),
|
||||
("persistence", PERSISTENCE_TOKENS),
|
||||
("exfil", EXFIL_TOKENS),
|
||||
("lateral", LATERAL_TOKENS),
|
||||
("recon", RECON_TOKENS),
|
||||
)
|
||||
|
||||
|
||||
def classify_intent(first_token_hash: str) -> str | None:
|
||||
"""Return the registry intent label for ``first_token_hash``.
|
||||
|
||||
``None`` if the hash isn't in any intent set.
|
||||
"""
|
||||
for label, hashes in INTENT_PRECEDENCE:
|
||||
if first_token_hash in hashes:
|
||||
return label
|
||||
return None
|
||||
|
||||
|
||||
# ── operational.opsec_discipline (G.2) ─────────────────────────────────────
|
||||
# History-clearing / log-tampering vocabulary (first-token).
|
||||
OPSEC_HISTORY_TOKENS: frozenset[str] = frozenset(
|
||||
hash_token(t) for t in (
|
||||
"history", "unset", "export", "set", "script",
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
# ── emotional_valence lexicons (G.5 / G.8) ────────────────────────────────
|
||||
# Lowercased lexeme word-forms. Membership-tested against typed-text
|
||||
# tokens during the single-pass histogram walk in ``_ctx.py``. No raw
|
||||
# text retained — only per-set integer counters.
|
||||
#
|
||||
# Stop-word collisions with registry values (``no``, ``none``, ``ok``,
|
||||
# ``hell``→``shell_type``) are excluded — registry value strings travel
|
||||
# through observations and would trigger PII regression checks. Kept
|
||||
# lexemes are those that don't collide with primitive value vocabulary.
|
||||
POSITIVE_LEXEMES: frozenset[str] = frozenset({
|
||||
"thanks", "nice", "cool", "great", "okay",
|
||||
"perfect", "love", "awesome",
|
||||
})
|
||||
NEGATIVE_LEXEMES: frozenset[str] = frozenset({
|
||||
"wtf", "damn", "crap", "ugh", "broken", "stupid",
|
||||
"hate", "stuck", "wrong",
|
||||
})
|
||||
OBSCENITY_LEXEMES: frozenset[str] = frozenset({
|
||||
"fuck", "fucking", "fucked", "shit", "bitch", "ass", "cunt",
|
||||
"dick", "asshole",
|
||||
})
|
||||
|
||||
ALL_LEXEMES: frozenset[str] = (
|
||||
POSITIVE_LEXEMES | NEGATIVE_LEXEMES | OBSCENITY_LEXEMES
|
||||
)
|
||||
LEXEME_MAX_LEN: int = max((len(x) for x in ALL_LEXEMES), default=0)
|
||||
@@ -146,4 +146,4 @@ if __name__ == "__main__":
|
||||
twisted_log.startLoggingWithObserver(lambda e: None, setStdout=False)
|
||||
_log("startup", msg=f"FTP server starting as {NODE_NAME} on port {PORT}")
|
||||
cast(IReactorTCP, reactor).listenTCP(PORT, ServerFTPFactory()) # type: ignore[arg-type]
|
||||
reactor.run() # type: ignore[attr-defined]
|
||||
reactor.run() # type: ignore[misc]
|
||||
|
||||
@@ -79,7 +79,7 @@ dev = [
|
||||
"pytest-xdist>=3.8.0",
|
||||
"pytest-timeout>=2.4.0",
|
||||
"flask>=3.1.3",
|
||||
"twisted>=25.5.0",
|
||||
"twisted>=26.4.0rc2",
|
||||
"requests>=2.33.1",
|
||||
"redis>=7.4.0",
|
||||
"pymysql>=1.1.2",
|
||||
|
||||
147
tests/profiler/behave_shell/test_intent_lexicon.py
Normal file
147
tests/profiler/behave_shell/test_intent_lexicon.py
Normal file
@@ -0,0 +1,147 @@
|
||||
"""Step G.0: command-intent lexicon + lexical counter pass.
|
||||
|
||||
No primitive emitted by this commit — it's the shared infrastructure
|
||||
G.1-G.8 read from. Tests cover:
|
||||
|
||||
* hash-set sanity (no precedence-corrupting overlaps)
|
||||
* :func:`classify_intent` returns the correct registry label
|
||||
* the typed-text counter pass increments lexical counters and tracks
|
||||
caps / bang runs
|
||||
* paste-class events do NOT contribute to the typed counters
|
||||
* PII regression: counters land on ctx, no raw text on observations
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
|
||||
from decnet.profiler.behave_shell import build_context, extract_session
|
||||
from decnet.profiler.behave_shell._intent import (
|
||||
DESTRUCTIVE_TOKENS,
|
||||
EXFIL_TOKENS,
|
||||
INTENT_PRECEDENCE,
|
||||
LATERAL_TOKENS,
|
||||
LEXEME_MAX_LEN,
|
||||
NEGATIVE_LEXEMES,
|
||||
OBSCENITY_LEXEMES,
|
||||
OPSEC_HISTORY_TOKENS,
|
||||
PERSISTENCE_TOKENS,
|
||||
POSITIVE_LEXEMES,
|
||||
RECON_TOKENS,
|
||||
classify_intent,
|
||||
)
|
||||
from decnet.profiler.behave_shell._parse import AsciinemaEvent, hash_token
|
||||
|
||||
|
||||
def _typed(text: str, t0: float = 0.0, dt: float = 0.05) -> list[AsciinemaEvent]:
|
||||
return [(t0 + i * dt, "i", c) for i, c in enumerate(text)]
|
||||
|
||||
|
||||
def test_intent_sets_disjoint_where_precedence_matters() -> None:
|
||||
"""``destructive`` and ``recon`` must not overlap — recon-only tokens
|
||||
should never accidentally classify as destructive (the high-precedence
|
||||
label). Cross-set overlap is *allowed*; precedence corruption is not.
|
||||
"""
|
||||
# rm appears in destructive AND in some cleanup contexts elsewhere;
|
||||
# but recon must not accidentally pull a destructive token.
|
||||
assert not (RECON_TOKENS & DESTRUCTIVE_TOKENS)
|
||||
assert not (RECON_TOKENS & PERSISTENCE_TOKENS)
|
||||
assert not (LATERAL_TOKENS & EXFIL_TOKENS)
|
||||
|
||||
|
||||
def test_classify_intent_returns_registry_labels() -> None:
|
||||
assert classify_intent(hash_token("rm")) == "destructive"
|
||||
assert classify_intent(hash_token("crontab")) == "persistence"
|
||||
assert classify_intent(hash_token("curl")) == "exfil"
|
||||
assert classify_intent(hash_token("ssh")) == "lateral"
|
||||
assert classify_intent(hash_token("ls")) == "recon"
|
||||
|
||||
|
||||
def test_classify_intent_unknown_returns_none() -> None:
|
||||
assert classify_intent(hash_token("vim")) is None
|
||||
assert classify_intent(hash_token("nonsense_xyz")) is None
|
||||
|
||||
|
||||
def test_lexicon_max_len_bounded() -> None:
|
||||
"""Lexeme buffer can't grow without bound."""
|
||||
assert LEXEME_MAX_LEN >= max(len(x) for x in OBSCENITY_LEXEMES)
|
||||
assert LEXEME_MAX_LEN < 32 # sanity — single short word forms only
|
||||
|
||||
|
||||
def test_obscenity_counter_fires_on_typed_token() -> None:
|
||||
"""Typed ``fuck `` (with trailing boundary) increments
|
||||
``obscenity_hits``; the lexeme is not retained as text."""
|
||||
events = _typed("fuck ")
|
||||
ctx = build_context(events, sid="g0-obs")
|
||||
assert ctx.obscenity_hits == 1
|
||||
assert ctx.positive_lex_hits == 0
|
||||
assert ctx.negative_lex_hits == 0
|
||||
|
||||
|
||||
def test_lexeme_longest_match_fucking_counts_once() -> None:
|
||||
"""``fucking`` is in the obscenity set; it should match once — not
|
||||
twice (``fuck`` + ``fucking``)."""
|
||||
events = _typed("fucking ")
|
||||
ctx = build_context(events, sid="g0-long")
|
||||
assert ctx.obscenity_hits == 1
|
||||
|
||||
|
||||
def test_positive_and_negative_counters() -> None:
|
||||
events = _typed("nice work damn it ")
|
||||
ctx = build_context(events, sid="g0-mix")
|
||||
assert ctx.positive_lex_hits == 1 # nice
|
||||
assert ctx.negative_lex_hits == 1 # damn
|
||||
|
||||
|
||||
def test_caps_run_max_tracks_longest_uppercase_streak() -> None:
|
||||
events = _typed("ok FUCK and OK ")
|
||||
ctx = build_context(events, sid="g0-caps")
|
||||
assert ctx.caps_run_max >= 4 # FUCK
|
||||
# obscenity is case-folded → still counts
|
||||
assert ctx.obscenity_hits >= 1
|
||||
|
||||
|
||||
def test_bang_run_max_tracks_longest_bang_streak() -> None:
|
||||
events = _typed("wait!!! no!!\n")
|
||||
ctx = build_context(events, sid="g0-bang")
|
||||
assert ctx.bang_run_max == 3
|
||||
|
||||
|
||||
def test_paste_class_events_excluded_from_lex_counters() -> None:
|
||||
"""A pasted obscenity must NOT increment counters — paste-class
|
||||
events are the F.4 / G.0 boundary the operator's own typing is on
|
||||
one side of, pasted text on the other."""
|
||||
events: list[AsciinemaEvent] = [(0.0, "i", "fuck and shit pasted in")]
|
||||
ctx = build_context(events, sid="g0-paste")
|
||||
assert ctx.obscenity_hits == 0
|
||||
assert ctx.negative_lex_hits == 0
|
||||
|
||||
|
||||
def test_no_lex_text_in_observation_values() -> None:
|
||||
"""PII regression: lexeme word forms must not appear in any emitted
|
||||
observation's ``value`` field. (Primitive names like ``shell_type``
|
||||
legitimately contain ``hell`` — this test guards the data, not the
|
||||
schema.)"""
|
||||
events = _typed("oh fuck this is broken damn ")
|
||||
obs = list(extract_session(events, sid="g0-pii"))
|
||||
for o in obs:
|
||||
v_str = json.dumps(o.value)
|
||||
for lex in (OBSCENITY_LEXEMES | NEGATIVE_LEXEMES | POSITIVE_LEXEMES):
|
||||
assert lex not in v_str, (
|
||||
f"raw lexeme {lex!r} leaked into observation value "
|
||||
f"for primitive {o.primitive!r}: {o.value!r}"
|
||||
)
|
||||
|
||||
|
||||
def test_intent_precedence_destructive_outranks_recon() -> None:
|
||||
"""``rm`` must classify as destructive even though recon includes
|
||||
file-system tools."""
|
||||
h = hash_token("rm")
|
||||
assert h in DESTRUCTIVE_TOKENS
|
||||
assert classify_intent(h) == "destructive"
|
||||
# Sanity: the precedence tuple's first entry is destructive.
|
||||
assert INTENT_PRECEDENCE[0][0] == "destructive"
|
||||
|
||||
|
||||
def test_opsec_history_tokens_populated() -> None:
|
||||
assert hash_token("history") in OPSEC_HISTORY_TOKENS
|
||||
assert hash_token("unset") in OPSEC_HISTORY_TOKENS
|
||||
Reference in New Issue
Block a user