feat(mutator): emit_decky_mutated helper — RFC 5424 + bus in one call
First step toward making mutation events first-class nodes in the
correlation graph. Today the graph silently reflects post-mutation
state with no marker of the transition; this helper lands the
emitter the mutator and deploy paths will call.
- decnet/mutator/events.py: emit_decky_mutated(bus, *, decky,
old_services, new_services, trigger, actor=None, log_path=None)
writes an RFC 5424 line (service=mutator, hostname=<decky>,
MSGID=decky_mutated, SD params for old/new services + trigger +
optional actor) to DECNET_INGEST_LOG_FILE, then fire-and-forget
publishes on decky.<id>.mutation. Either side failing is soft —
the other path still completes.
- MutationTrigger Literal covers creation, retirement, scheduled,
operator, behavioral, healer, federation. Values reserved for v2/v3
(behavioral + federation) are declared now so the schema stays stable.
- decnet/bus/topics.py: DECKY_MUTATION constant + decky_mutation(id)
builder. Distinct from DECKY_STATE ("current shape") because a
mutation is a transition event, not a steady-state snapshot.
- Empty-set symmetry: creation emits old_services=[], retirement
emits new_services=[]. Every decky lifecycle becomes a well-formed
fold sequence on the correlator side.
- 4 new tests: FakeBus + correlator parser round-trip; creation and
retirement empty-set cases; bus=None still writes syslog;
unwritable log path doesn't block bus publish. 95 tests green
across test_mutator + tests/bus.
This commit is contained in:
@@ -55,6 +55,12 @@ DECKY_TRAFFIC = "traffic"
|
|||||||
# without waiting for its scheduled interval. Underscored (not dotted)
|
# without waiting for its scheduled interval. Underscored (not dotted)
|
||||||
# to stay a single NATS token so the builder's validator accepts it.
|
# to stay a single NATS token so the builder's validator accepts it.
|
||||||
DECKY_MUTATE_REQUEST = "mutate_request"
|
DECKY_MUTATE_REQUEST = "mutate_request"
|
||||||
|
# Mutation transition event — distinct from DECKY_STATE ("current
# shape") because a mutation is a *transition* that carries old/new
# services + trigger + timing. Correlator consumes these (via the
# syslog sidechannel too) to interleave substrate-change markers into
# attacker traversals.
# Used as the trailing token of the topic built by decky_mutation():
# ``decky.<id>.mutation``.
DECKY_MUTATION = "mutation"
|
||||||
|
|
||||||
# Attacker event types (second token under the ``attacker`` root). First
|
# Attacker event types (second token under the ``attacker`` root). First
|
||||||
# sighting, session boundary transitions, and score-threshold crossings
|
# sighting, session boundary transitions, and score-threshold crossings
|
||||||
@@ -104,6 +110,12 @@ def decky(decky_id: str, event_type: str) -> str:
|
|||||||
return f"{DECKY}.{decky_id}.{event_type}"
|
return f"{DECKY}.{decky_id}.{event_type}"
|
||||||
|
|
||||||
|
|
||||||
|
def decky_mutation(decky_id: str) -> str:
    """Return the mutation-event topic for one decky: ``decky.<id>.mutation``.

    *decky_id* is passed through ``_reject_tokens`` before it is
    interpolated into the topic (presumably raising on ids that are not
    a single valid token — confirm against the validator).
    """
    _reject_tokens(decky_id)
    topic = f"{DECKY}.{decky_id}.{DECKY_MUTATION}"
    return topic
|
||||||
|
|
||||||
|
|
||||||
def system(event_type: str) -> str:
|
def system(event_type: str) -> str:
|
||||||
"""Build ``system.<event_type>``.
|
"""Build ``system.<event_type>``.
|
||||||
|
|
||||||
|
|||||||
108
decnet/mutator/events.py
Normal file
108
decnet/mutator/events.py
Normal file
@@ -0,0 +1,108 @@
|
|||||||
|
"""Mutation-event emission.
|
||||||
|
|
||||||
|
One helper (:func:`emit_decky_mutated`) writes every substrate
|
||||||
|
transition to two places at once:
|
||||||
|
|
||||||
|
1. **RFC 5424 syslog** — appended to the collector's ingest log, so
|
||||||
|
the correlation engine picks the event up alongside attacker
|
||||||
|
events and can interleave substrate-change markers into traversals.
|
||||||
|
2. **Bus topic** ``decky.<name>.mutation`` — fire-and-forget
|
||||||
|
notification for live UI consumers (SSE, dashboards).
|
||||||
|
|
||||||
|
The split mirrors the DB-vs-bus contract: syslog is durable, bus is
|
||||||
|
at-most-once. Either path failing must never crash the mutator loop,
|
||||||
|
so the syslog write is wrapped in a broad ``try/except log.warning``
and the bus publish goes through ``publish_safely``.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import socket as _socket
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Any, Literal
|
||||||
|
|
||||||
|
from decnet.bus import topics as _topics
|
||||||
|
from decnet.bus.base import BaseBus
|
||||||
|
from decnet.bus.publish import publish_safely as _publish_safely
|
||||||
|
from decnet.env import DECNET_INGEST_LOG_FILE
|
||||||
|
from decnet.logging import get_logger
|
||||||
|
from decnet.logging.syslog_formatter import format_rfc5424
|
||||||
|
|
||||||
|
log = get_logger("mutator.events")
|
||||||
|
|
||||||
|
|
||||||
|
# Trigger enum — wide on purpose so the schema stays stable as v2/v3
# features (behavioral + federation) land. Every call site supplies
# exactly one of these.
MutationTrigger = Literal[
    "creation",     # initial deploy of a decky
    "retirement",   # teardown / removal
    "scheduled",    # mutator watch-loop interval tick
    "operator",     # explicit force via API/CLI/UI
    "behavioral",   # future: attacker-behavior-driven rotation
    "healer",       # future: re-apply by the healer worker
    "federation",   # future: cross-operator MazeNET mutation
]

# Passed as ``event_type=`` to format_rfc5424 for every mutation line.
_EVENT_TYPE = "decky_mutated"
# Passed as ``service=`` to format_rfc5424.
_MUTATOR_APP = "mutator"
# NOTE(review): not referenced elsewhere in this module as shown —
# emit_decky_mutated passes the decky name as ``hostname=`` instead.
# Confirm whether this is still needed.
_MUTATOR_HOSTNAME = _socket.gethostname()
|
||||||
|
|
||||||
|
|
||||||
|
async def emit_decky_mutated(
    bus: BaseBus | None,
    *,
    decky: str,
    old_services: list[str],
    new_services: list[str],
    trigger: MutationTrigger,
    actor: str | None = None,
    log_path: Path | str | None = None,
) -> None:
    """Emit one ``decky_mutated`` event on both the syslog stream and the bus.

    The two sides are independent: any failure while building or writing
    the syslog line is logged as a warning and the bus publish still
    runs. A failure while building the bus topic is likewise logged and
    swallowed, so this helper does not raise into the caller's loop for
    either of those steps.

    Args:
        bus: Bus handed to ``publish_safely``; may be ``None``.
        decky: Decky name. Used as the RFC 5424 HOSTNAME, as the
            ``decky`` field, and as the ``<id>`` token of the bus topic.
        old_services: Services before the transition (``[]`` on creation).
        new_services: Services after the transition (``[]`` on retirement).
        trigger: What caused the mutation; one of ``MutationTrigger``.
        actor: Optional initiator; left out of both outputs when falsy.
        log_path: File the syslog line is appended to. ``None`` (the
            default) selects :data:`decnet.env.DECNET_INGEST_LOG_FILE` —
            it does *not* suppress the write. Tests pass an explicit
            path to redirect it. Missing parent directories are created.
    """
    # Keep the unconverted value around so the warning below can still
    # report it when the Path() conversion itself is what failed.
    raw_target = log_path if log_path is not None else DECNET_INGEST_LOG_FILE

    # ── Syslog side ───────────────────────────────────────────────
    # Everything that can raise (joining, formatting, path conversion,
    # mkdir, write) lives inside the try so this side stays a soft failure.
    try:
        fields: dict[str, Any] = {
            "decky": decky,
            # SD params are flat strings, so service lists are comma-joined;
            # an empty list becomes "".
            "old_services": ",".join(old_services),
            "new_services": ",".join(new_services),
            "trigger": trigger,
        }
        if actor:
            fields["actor"] = actor
        line = format_rfc5424(
            service=_MUTATOR_APP,
            hostname=decky,  # per-decky HOSTNAME so correlator indexes it correctly
            event_type=_EVENT_TYPE,
            **fields,
        )
        target = Path(raw_target)
        target.parent.mkdir(parents=True, exist_ok=True)
        # Closing the file flushes it; no explicit flush() needed.
        with open(target, "a", encoding="utf-8") as fh:
            fh.write(line + "\n")
    except Exception as exc:  # noqa: BLE001
        log.warning("syslog emission failed decky=%s path=%s: %s",
                    decky, raw_target, exc)

    # ── Bus side ──────────────────────────────────────────────────
    # The bus payload keeps the service lists as real lists (copied so
    # later caller-side mutation cannot alter the published event).
    payload: dict[str, Any] = {
        "decky": decky,
        "old_services": list(old_services),
        "new_services": list(new_services),
        "trigger": trigger,
    }
    if actor:
        payload["actor"] = actor

    # The topic builder validates the decky id; guard it so a rejected
    # id is a logged skip rather than an exception in the caller.
    try:
        topic = _topics.decky_mutation(decky)
    except Exception as exc:  # noqa: BLE001
        log.warning("bus emission skipped decky=%s: %s", decky, exc)
        return

    await _publish_safely(
        bus,
        topic,
        payload,
        event_type=_topics.DECKY_MUTATION,
    )
|
||||||
@@ -8,9 +8,12 @@ from unittest.mock import MagicMock, patch, AsyncMock
|
|||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
|
from decnet.bus.fake import FakeBus
|
||||||
from decnet.config import DeckyConfig, DecnetConfig
|
from decnet.config import DeckyConfig, DecnetConfig
|
||||||
|
from decnet.correlation.parser import parse_line
|
||||||
from decnet.engine import _compose_with_retry
|
from decnet.engine import _compose_with_retry
|
||||||
from decnet.mutator import mutate_all, mutate_decky
|
from decnet.mutator import mutate_all, mutate_decky
|
||||||
|
from decnet.mutator.events import emit_decky_mutated
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
@@ -224,6 +227,107 @@ class TestMutateDeckyBusPublish:
|
|||||||
bus.publish.assert_not_awaited()
|
bus.publish.assert_not_awaited()
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# emit_decky_mutated — syslog + bus round-trip
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestEmitDeckyMutated:
    """Round-trip checks for ``emit_decky_mutated`` (syslog file + bus)."""

    @pytest.mark.asyncio
    async def test_writes_syslog_line_and_publishes_bus_event(self, tmp_path):
        """One call yields one bus event and one parseable syslog line."""
        bus = FakeBus()
        await bus.connect()
        # "subdir" does not exist yet, so this also covers parent creation.
        log_path = tmp_path / "subdir" / "decnet.log"
        sub = bus.subscribe("decky.*.mutation")
        try:
            async with sub:
                await emit_decky_mutated(
                    bus,
                    decky="decky-01",
                    old_services=["ssh", "http"],
                    new_services=["rdp"],
                    trigger="operator",
                    actor="anti",
                    log_path=log_path,
                )
                event = await sub.__aiter__().__anext__()
        finally:
            await bus.close()

        # Bus side: service lists stay lists.
        assert event.topic == "decky.decky-01.mutation"
        assert event.payload["trigger"] == "operator"
        assert event.payload["old_services"] == ["ssh", "http"]
        assert event.payload["new_services"] == ["rdp"]
        assert event.payload["actor"] == "anti"

        # Syslog side: service lists are comma-joined strings.
        assert log_path.exists()
        lines = log_path.read_text(encoding="utf-8").splitlines()
        assert len(lines) == 1
        parsed = parse_line(lines[0])
        assert parsed is not None
        assert parsed.service == "mutator"
        assert parsed.decky == "decky-01"
        assert parsed.event_type == "decky_mutated"
        assert parsed.fields["trigger"] == "operator"
        assert parsed.fields["old_services"] == "ssh,http"
        assert parsed.fields["new_services"] == "rdp"
        assert parsed.attacker_ip is None

    @pytest.mark.asyncio
    async def test_empty_set_symmetry_creation_and_retirement(self, tmp_path):
        """Creation has old_services=[]; retirement has new_services=[]."""
        bus = FakeBus()
        await bus.connect()
        log_path = tmp_path / "decnet.log"
        try:
            await emit_decky_mutated(
                bus, decky="new-decky",
                old_services=[], new_services=["ssh"],
                trigger="creation", log_path=log_path,
            )
            await emit_decky_mutated(
                bus, decky="old-decky",
                old_services=["ftp"], new_services=[],
                trigger="retirement", log_path=log_path,
            )
        finally:
            await bus.close()

        lines = log_path.read_text(encoding="utf-8").splitlines()
        assert len(lines) == 2
        create = parse_line(lines[0])
        retire = parse_line(lines[1])
        # Fail with a clear assertion (not AttributeError) if parsing breaks.
        assert create is not None
        assert retire is not None
        assert create.fields["old_services"] == ""
        assert create.fields["trigger"] == "creation"
        assert retire.fields["new_services"] == ""
        assert retire.fields["trigger"] == "retirement"

    @pytest.mark.asyncio
    async def test_bus_none_still_writes_syslog(self, tmp_path):
        """Bus is optional; syslog is the durable record and must land alone."""
        log_path = tmp_path / "decnet.log"
        await emit_decky_mutated(
            None, decky="d1",
            old_services=["ssh"], new_services=["rdp"],
            trigger="scheduled", log_path=log_path,
        )
        assert log_path.exists()
        parsed = parse_line(log_path.read_text(encoding="utf-8").strip())
        assert parsed is not None
        assert parsed.fields["trigger"] == "scheduled"

    @pytest.mark.asyncio
    async def test_syslog_failure_does_not_block_bus_publish(self):
        """If the log path is unwritable, the bus event still fires."""
        bus = AsyncMock()
        # A path "below" /dev/null can never be created or opened.
        bad = Path("/dev/null/nope/decnet.log")
        await emit_decky_mutated(
            bus, decky="d1",
            old_services=[], new_services=["ssh"],
            trigger="creation", log_path=bad,
        )
        bus.publish.assert_awaited_once()
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# _compose_with_retry (Sync tests, keep as is or minimal update)
|
# _compose_with_retry (Sync tests, keep as is or minimal update)
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|||||||
Reference in New Issue
Block a user