From f875350d7557af200824ef44d4e719b97392f394 Mon Sep 17 00:00:00 2001 From: anti Date: Tue, 21 Apr 2026 19:29:21 -0400 Subject: [PATCH] =?UTF-8?q?feat(mutator):=20emit=5Fdecky=5Fmutated=20helpe?= =?UTF-8?q?r=20=E2=80=94=20RFC=205424=20+=20bus=20in=20one=20call?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit First step toward making mutation events first-class nodes in the correlation graph. Today the graph silently reflects post-mutation state with no marker of the transition; this helper lands the emitter the mutator and deploy paths will call. - decnet/mutator/events.py: emit_decky_mutated(bus, *, decky, old_services, new_services, trigger, actor=None, log_path=None) writes an RFC 5424 line (service=mutator, hostname=, MSGID=decky_mutated, SD params for old/new services + trigger + optional actor) to DECNET_INGEST_LOG_FILE, then fire-and-forget publishes on decky..mutation. Either side failing is soft — the other path still completes. - MutationTrigger Literal covers creation, retirement, scheduled, operator, behavioral, healer, federation. Reserved values for v2/v3 (behavioral + federation) stay nullable so the schema is stable. - decnet/bus/topics.py: DECKY_MUTATION constant + decky_mutation(id) builder. Distinct from DECKY_STATE ("current shape") because a mutation is a transition event, not a steady-state snapshot. - Empty-set symmetry: creation emits old_services=[], retirement emits new_services=[]. Every decky lifecycle becomes a well-formed fold sequence on the correlator side. - 4 new tests: FakeBus + correlator parser round-trip; creation and retirement empty-set cases; bus=None still writes syslog; unwritable log path doesn't block bus publish. 95 tests green across test_mutator + tests/bus. --- decnet/bus/topics.py | 12 +++++ decnet/mutator/events.py | 108 +++++++++++++++++++++++++++++++++++++++ tests/test_mutator.py | 104 +++++++++++++++++++++++++++++++++++++ 3 files changed, 224 insertions(+) create mode 100644 decnet/mutator/events.py diff --git a/decnet/bus/topics.py b/decnet/bus/topics.py index 4a5f505c..af51d4b0 100644 --- a/decnet/bus/topics.py +++ b/decnet/bus/topics.py @@ -55,6 +55,12 @@ DECKY_TRAFFIC = "traffic" # without waiting for its scheduled interval. Underscored (not dotted) # to stay a single NATS token so the builder's validator accepts it. DECKY_MUTATE_REQUEST = "mutate_request" +# Mutation transition event — distinct from DECKY_STATE ("current +# shape") because a mutation is a *transition* that carries old/new +# services + trigger + timing. Correlator consumes these (via the +# syslog sidechannel too) to interleave substrate-change markers into +# attacker traversals. +DECKY_MUTATION = "mutation" # Attacker event types (second token under the ``attacker`` root). First # sighting, session boundary transitions, and score-threshold crossings @@ -104,6 +110,12 @@ def decky(decky_id: str, event_type: str) -> str: return f"{DECKY}.{decky_id}.{event_type}" +def decky_mutation(decky_id: str) -> str: + """Build ``decky..mutation``.""" + _reject_tokens(decky_id) + return f"{DECKY}.{decky_id}.{DECKY_MUTATION}" + + def system(event_type: str) -> str: """Build ``system.``. diff --git a/decnet/mutator/events.py b/decnet/mutator/events.py new file mode 100644 index 00000000..9b3906f9 --- /dev/null +++ b/decnet/mutator/events.py @@ -0,0 +1,108 @@ +"""Mutation-event emission. + +One helper (:func:`emit_decky_mutated`) writes every substrate +transition to two places at once: + +1. **RFC 5424 syslog** — appended to the collector's ingest log, so + the correlation engine picks the event up alongside attacker + events and can interleave substrate-change markers into traversals. +2. **Bus topic** ``decky..mutation`` — fire-and-forget + notification for live UI consumers (SSE, dashboards). + +The split mirrors the DB-vs-bus contract: syslog is durable, bus is +at-most-once. Either path failing must never crash the mutator loop, +so both sides are wrapped in broad ``try/except log.warning``. +""" +from __future__ import annotations + +import socket as _socket +from pathlib import Path +from typing import Any, Literal + +from decnet.bus import topics as _topics +from decnet.bus.base import BaseBus +from decnet.bus.publish import publish_safely as _publish_safely +from decnet.env import DECNET_INGEST_LOG_FILE +from decnet.logging import get_logger +from decnet.logging.syslog_formatter import format_rfc5424 + +log = get_logger("mutator.events") + + +# Trigger enum — wide on purpose so the schema stays stable as v2/v3 +# features (behavioral + federation) land. Every call site supplies +# exactly one of these. +MutationTrigger = Literal[ + "creation", # initial deploy of a decky + "retirement", # teardown / removal + "scheduled", # mutator watch-loop interval tick + "operator", # explicit force via API/CLI/UI + "behavioral", # future: attacker-behavior-driven rotation + "healer", # future: re-apply by the healer worker + "federation", # future: cross-operator MazeNET mutation +] + +_EVENT_TYPE = "decky_mutated" +_MUTATOR_APP = "mutator" +_MUTATOR_HOSTNAME = _socket.gethostname() + + +async def emit_decky_mutated( + bus: BaseBus | None, + *, + decky: str, + old_services: list[str], + new_services: list[str], + trigger: MutationTrigger, + actor: str | None = None, + log_path: Path | str | None = None, +) -> None: + """Emit one ``decky_mutated`` event on both the syslog stream and the bus. + + *log_path* defaults to :data:`decnet.env.DECNET_INGEST_LOG_FILE`. + Pass an explicit path (or ``None``) in tests to redirect or suppress + the file write. A missing parent directory is a soft failure — + logged once and skipped — because the correlator works without + mutation events and we'd rather degrade than crash. + """ + fields: dict[str, Any] = { + "decky": decky, + "old_services": ",".join(old_services), + "new_services": ",".join(new_services), + "trigger": trigger, + } + if actor: + fields["actor"] = actor + + # ── Syslog side ─────────────────────────────────────────────── + target = Path(log_path) if log_path is not None else Path(DECNET_INGEST_LOG_FILE) + try: + line = format_rfc5424( + service=_MUTATOR_APP, + hostname=decky, # per-decky HOSTNAME so correlator indexes it correctly + event_type=_EVENT_TYPE, + **fields, + ) + target.parent.mkdir(parents=True, exist_ok=True) + with open(target, "a", encoding="utf-8") as fh: + fh.write(line + "\n") + fh.flush() + except Exception as exc: # noqa: BLE001 + log.warning("syslog emission failed decky=%s path=%s: %s", + decky, target, exc) + + # ── Bus side ────────────────────────────────────────────────── + payload: dict[str, Any] = { + "decky": decky, + "old_services": list(old_services), + "new_services": list(new_services), + "trigger": trigger, + } + if actor: + payload["actor"] = actor + await _publish_safely( + bus, + _topics.decky_mutation(decky), + payload, + event_type=_topics.DECKY_MUTATION, + ) diff --git a/tests/test_mutator.py b/tests/test_mutator.py index 3df8931b..12b55459 100644 --- a/tests/test_mutator.py +++ b/tests/test_mutator.py @@ -8,9 +8,12 @@ from unittest.mock import MagicMock, patch, AsyncMock import pytest +from decnet.bus.fake import FakeBus from decnet.config import DeckyConfig, DecnetConfig +from decnet.correlation.parser import parse_line from decnet.engine import _compose_with_retry from decnet.mutator import mutate_all, mutate_decky +from decnet.mutator.events import emit_decky_mutated # --------------------------------------------------------------------------- @@ -224,6 +227,107 @@ class TestMutateDeckyBusPublish: bus.publish.assert_not_awaited() +# --------------------------------------------------------------------------- +# emit_decky_mutated — syslog + bus round-trip +# --------------------------------------------------------------------------- + +class TestEmitDeckyMutated: + @pytest.mark.asyncio + async def test_writes_syslog_line_and_publishes_bus_event(self, tmp_path): + bus = FakeBus() + await bus.connect() + log_path = tmp_path / "subdir" / "decnet.log" + sub = bus.subscribe("decky.*.mutation") + try: + async with sub: + await emit_decky_mutated( + bus, + decky="decky-01", + old_services=["ssh", "http"], + new_services=["rdp"], + trigger="operator", + actor="anti", + log_path=log_path, + ) + event = await sub.__aiter__().__anext__() + finally: + await bus.close() + + assert event.topic == "decky.decky-01.mutation" + assert event.payload["trigger"] == "operator" + assert event.payload["old_services"] == ["ssh", "http"] + assert event.payload["new_services"] == ["rdp"] + assert event.payload["actor"] == "anti" + + assert log_path.exists() + lines = log_path.read_text().splitlines() + assert len(lines) == 1 + parsed = parse_line(lines[0]) + assert parsed is not None + assert parsed.service == "mutator" + assert parsed.decky == "decky-01" + assert parsed.event_type == "decky_mutated" + assert parsed.fields["trigger"] == "operator" + assert parsed.fields["old_services"] == "ssh,http" + assert parsed.fields["new_services"] == "rdp" + assert parsed.attacker_ip is None + + @pytest.mark.asyncio + async def test_empty_set_symmetry_creation_and_retirement(self, tmp_path): + """Creation has old_services=[]; retirement has new_services=[].""" + bus = FakeBus() + await bus.connect() + log_path = tmp_path / "decnet.log" + try: + await emit_decky_mutated( + bus, decky="new-decky", + old_services=[], new_services=["ssh"], + trigger="creation", log_path=log_path, + ) + await emit_decky_mutated( + bus, decky="old-decky", + old_services=["ftp"], new_services=[], + trigger="retirement", log_path=log_path, + ) + finally: + await bus.close() + + lines = log_path.read_text().splitlines() + assert len(lines) == 2 + create = parse_line(lines[0]) + retire = parse_line(lines[1]) + assert create.fields["old_services"] == "" + assert create.fields["trigger"] == "creation" + assert retire.fields["new_services"] == "" + assert retire.fields["trigger"] == "retirement" + + @pytest.mark.asyncio + async def test_bus_none_still_writes_syslog(self, tmp_path): + """Bus is optional; syslog is the durable record and must land alone.""" + log_path = tmp_path / "decnet.log" + await emit_decky_mutated( + None, decky="d1", + old_services=["ssh"], new_services=["rdp"], + trigger="scheduled", log_path=log_path, + ) + assert log_path.exists() + parsed = parse_line(log_path.read_text().strip()) + assert parsed is not None + assert parsed.fields["trigger"] == "scheduled" + + @pytest.mark.asyncio + async def test_syslog_failure_does_not_block_bus_publish(self): + """If the log path is unwritable, the bus event still fires.""" + bus = AsyncMock() + bad = Path("/dev/null/nope/decnet.log") + await emit_decky_mutated( + bus, decky="d1", + old_services=[], new_services=["ssh"], + trigger="creation", log_path=bad, + ) + bus.publish.assert_awaited_once() + + # --------------------------------------------------------------------------- # _compose_with_retry (Sync tests, keep as is or minimal update) # ---------------------------------------------------------------------------