feat(collector): publish attacker.session.ended on session_recorded events

The TTP worker subscribes to attacker.session.ended but no upstream
component published it — the rule pack (R0001–R0030) therefore never
fired on live SSH traffic even after the consume-side wiring landed
in E.3.18a/b/c.

The collector now hosts a per-attacker_ip command index
(_SessionAggregator) that watches the same parsed-event stream as
_publish_log. Shell `command` events are appended to a per-IP list;
on `session_recorded` the aggregator slices the list to commands
inside the [ended_at - duration_s, ended_at] window and publishes
attacker.session.ended with the session metadata + commands list.
The TTP worker's _build_events fan-out (E.3.18b) turns each command
into a source_kind="command" TaggerEvent that the RuleEngineTagger
(E.3.18c) matches against R0001–R0030.

Memory bound: per-IP entries TTL-evict at DECNET_COLLECTOR_SESSION_AGG_TTL_SEC
(default 3600 s). Publish failures are swallowed in the aggregator —
a misbehaving bus cannot stall the per-container stream threads.
This commit is contained in:
2026-05-02 02:35:08 -04:00
parent d9d2a80573
commit b043c96d29
3 changed files with 446 additions and 0 deletions

View File

@@ -132,6 +132,157 @@ def _reset_rate_limiter() -> None:
with _rl_lock:
_rl_last.clear()
# ─── Session aggregator (TTP `attacker.session.ended` producer) ──────────────
#
# The TTP worker subscribes to ``attacker.session.ended`` and turns each
# emitted command into a ``source_kind="command"`` :class:`TaggerEvent`
# (see ``decnet/ttp/worker._build_events``). No upstream worker was
# producing that topic — the rule pack therefore never fired on live
# traffic. The aggregator below indexes shell-command events
# per-attacker_ip and emits one ``attacker.session.ended`` envelope
# whenever the SSH ``sessrec`` worker publishes ``session_recorded``.
#
# Memory bound: each attacker_ip's deque is capped by a TTL eviction
# (default 3600 s). Override via ``DECNET_COLLECTOR_SESSION_AGG_TTL_SEC``.
_SESSION_AGG_TTL_SEC: float = _parse_float_env(
"DECNET_COLLECTOR_SESSION_AGG_TTL_SEC", 3600.0,
)
def _parse_iso_ts(value: str) -> Optional[datetime]:
"""Best-effort ISO-8601 parse for parsed event timestamps.
The collector's parser stamps ``timestamp`` either as the original
ISO-8601 string (when ``datetime.fromisoformat`` failed) or as the
reformatted ``%Y-%m-%d %H:%M:%S`` string. Both round-trip through
``fromisoformat`` after a space→T swap. Returns None if neither
shape parses — the aggregator skips events it can't time-stamp.
"""
if not value:
return None
candidates = (value, value.replace(" ", "T"))
for cand in candidates:
try:
return datetime.fromisoformat(cand)
except ValueError:
continue
return None
class _SessionAggregator:
"""Per-attacker_ip command index that emits ``attacker.session.ended``.
Thread-safe — :meth:`add_event` is called from the per-container
stream threads. Internal state is protected by a single lock; the
publish fan-out happens inside the lock for simplicity (the
downstream publish_fn is the thread-safe marshaller from
:mod:`decnet.bus.publish`, which is non-blocking).
"""
def __init__(
self,
publish_fn: Callable[[str, dict[str, Any], str], None],
*,
ttl_sec: float = _SESSION_AGG_TTL_SEC,
) -> None:
self._publish = publish_fn
self._ttl = ttl_sec
self._lock = threading.Lock()
# attacker_ip → list of (timestamp, parsed_event) tuples.
# Stored as a list rather than a deque so the ``in_window``
# filter can index linearly; the per-attacker volume is
# bounded by the TTL and by typical session size (≤ a few
# hundred commands) so this stays cheap.
self._cmds: dict[str, list[tuple[datetime, dict[str, Any]]]] = {}
def add_event(self, parsed: dict[str, Any]) -> None:
"""Index a parsed event. Emits on ``session_recorded``."""
event_type = parsed.get("event_type", "")
attacker_ip = parsed.get("attacker_ip") or ""
if not attacker_ip or attacker_ip == "Unknown":
return
ts = _parse_iso_ts(str(parsed.get("timestamp", "")))
if ts is None:
return
with self._lock:
self._evict_expired(ts)
if event_type == "command":
self._cmds.setdefault(attacker_ip, []).append((ts, parsed))
return
if event_type == "session_recorded":
self._emit_session(parsed, attacker_ip, ts)
def _evict_expired(self, now: datetime) -> None:
"""Drop commands older than ``self._ttl`` seconds."""
cutoff = now.timestamp() - self._ttl
for ip, entries in list(self._cmds.items()):
kept = [(t, p) for t, p in entries if t.timestamp() >= cutoff]
if kept:
self._cmds[ip] = kept
else:
del self._cmds[ip]
def _emit_session(
self, parsed: dict[str, Any], attacker_ip: str, ended_at: datetime,
) -> None:
"""Build an ``attacker.session.ended`` envelope and publish it.
Slices the per-IP deque to commands whose timestamp falls
inside ``[ended_at - duration_s, ended_at]``. Commands stay in
the deque after the slice — the TTL eviction is the only path
that drops them, so two back-to-back sessions for the same IP
share the visible window without losing rows.
"""
fields = parsed.get("fields", {}) or {}
duration_raw = fields.get("duration_s") or "0"
try:
duration_s = float(duration_raw)
except (TypeError, ValueError):
duration_s = 0.0
sid = str(fields.get("sid") or "")
service = str(fields.get("service") or parsed.get("service") or "")
decky = parsed.get("decky") or ""
commands_window = self._cmds.get(attacker_ip, [])
cutoff_lo = ended_at.timestamp() - max(duration_s, 0.0)
commands: list[dict[str, Any]] = []
for idx, (cmd_ts, cmd_parsed) in enumerate(commands_window):
if cmd_ts.timestamp() < cutoff_lo:
continue
cmd_fields = cmd_parsed.get("fields", {}) or {}
cmd_text = (
cmd_fields.get("command")
or cmd_fields.get("cmd")
or cmd_parsed.get("msg", "")
)
commands.append({
"id": f"{sid}#{idx}" if sid else f"{attacker_ip}-{cmd_ts.isoformat()}",
"command_text": str(cmd_text),
"ts": cmd_ts.isoformat(),
"decky": cmd_parsed.get("decky", ""),
"service": cmd_parsed.get("service", ""),
})
payload: dict[str, Any] = {
"session_id": sid or None,
"attacker_uuid": None, # consumer resolves via repo
"attacker_ip": attacker_ip,
"decky_id": decky,
"service": service,
"ended_at": ended_at.isoformat(),
"duration_s": duration_s,
"commands": commands,
}
topic = _topics.attacker(_topics.ATTACKER_SESSION_ENDED)
try:
self._publish(topic, payload, _topics.ATTACKER_SESSION_ENDED)
except Exception as exc: # noqa: BLE001
logger.debug(
"collector: session.ended publish failed: %s", exc,
)
# ─── RFC 5424 parser ──────────────────────────────────────────────────────────
_RFC5424_RE = re.compile(
@@ -479,12 +630,17 @@ def _make_system_log_publisher(
thread can call it unconditionally. Otherwise each call is marshalled
onto *loop* (the asyncio event loop that owns the bus socket) via
``make_thread_safe_publisher``.
The same call also feeds a :class:`_SessionAggregator` so shell
commands are indexed per-attacker_ip and ``attacker.session.ended``
fires whenever the SSH ``sessrec`` worker logs ``session_recorded``.
"""
raw_publish = make_thread_safe_publisher(bus, loop) if bus is not None else None
if raw_publish is None:
return lambda _parsed: None
topic = _topics.system(_topics.SYSTEM_LOG)
aggregator = _SessionAggregator(raw_publish)
def _publish(parsed: dict[str, Any]) -> None:
event_type = parsed.get("event_type", "")
@@ -499,6 +655,7 @@ def _make_system_log_publisher(
},
event_type,
)
aggregator.add_event(parsed)
return _publish

View File

@@ -0,0 +1,207 @@
"""Collector session aggregator emits ``attacker.session.ended``.
Pins the producer-side fix that closes the gap surfaced in TTP
debugging: the TTP worker subscribes to ``attacker.session.ended`` but
no upstream component published it. The aggregator indexes shell
``command`` events per attacker_ip and emits one envelope per
``session_recorded`` event with the commands that fall inside the
session window.
"""
from __future__ import annotations
from typing import Any
import pytest
from decnet.bus import topics as _topics
from decnet.collector.worker import _SessionAggregator
_ATTACKER_IP = "192.168.1.5"
def _cmd(ts_iso: str, text: str) -> dict[str, Any]:
return {
"timestamp": ts_iso,
"decky": "SRV-DELTA-77",
"service": "bash",
"event_type": "command",
"attacker_ip": _ATTACKER_IP,
"fields": {"command": text, "src": _ATTACKER_IP},
}
def _session_recorded(
ts_iso: str, sid: str, duration_s: float = 60.0,
) -> dict[str, Any]:
return {
"timestamp": ts_iso,
"decky": "omega-decky",
"service": "sessrec",
"event_type": "session_recorded",
"attacker_ip": _ATTACKER_IP,
"fields": {
"sid": sid,
"service": "ssh",
"duration_s": str(duration_s),
"src_ip": _ATTACKER_IP,
},
}
@pytest.fixture
def captured_publishes() -> list[tuple[str, dict[str, Any], str]]:
return []
@pytest.fixture
def aggregator(
captured_publishes: list[tuple[str, dict[str, Any], str]],
) -> _SessionAggregator:
def _publish(topic: str, payload: dict[str, Any], event_type: str) -> None:
captured_publishes.append((topic, payload, event_type))
return _SessionAggregator(_publish, ttl_sec=3600.0)
# ── Indexing ────────────────────────────────────────────────────────
def test_command_events_are_indexed_per_attacker_ip(
aggregator: _SessionAggregator,
) -> None:
aggregator.add_event(_cmd("2026-05-02T06:22:48", "whoami"))
aggregator.add_event(_cmd("2026-05-02T06:22:50", "id"))
assert len(aggregator._cmds[_ATTACKER_IP]) == 2
def test_unknown_attacker_ip_is_ignored(
aggregator: _SessionAggregator,
captured_publishes: list[tuple[str, dict[str, Any], str]],
) -> None:
bad = _cmd("2026-05-02T06:22:48", "whoami")
bad["attacker_ip"] = "Unknown"
aggregator.add_event(bad)
assert aggregator._cmds == {}
def test_unparseable_timestamp_is_skipped(
aggregator: _SessionAggregator,
) -> None:
bad = _cmd("not-a-timestamp", "whoami")
aggregator.add_event(bad)
assert aggregator._cmds == {}
# ── Session emission ────────────────────────────────────────────────
def test_session_recorded_emits_attacker_session_ended(
aggregator: _SessionAggregator,
captured_publishes: list[tuple[str, dict[str, Any], str]],
) -> None:
aggregator.add_event(_cmd("2026-05-02T06:22:48", "whoami"))
aggregator.add_event(_cmd("2026-05-02T06:23:00", "id"))
aggregator.add_event(_cmd("2026-05-02T06:23:10", "uname -a"))
aggregator.add_event(_session_recorded(
"2026-05-02T06:23:30", sid="sess-123", duration_s=120.0,
))
assert len(captured_publishes) == 1
topic, payload, event_type = captured_publishes[0]
assert topic == _topics.attacker(_topics.ATTACKER_SESSION_ENDED)
assert event_type == _topics.ATTACKER_SESSION_ENDED
assert payload["session_id"] == "sess-123"
assert payload["attacker_ip"] == _ATTACKER_IP
assert payload["decky_id"] == "omega-decky"
assert payload["service"] == "ssh"
assert payload["duration_s"] == 120.0
cmds = payload["commands"]
assert [c["command_text"] for c in cmds] == ["whoami", "id", "uname -a"]
assert [c["id"] for c in cmds] == [
"sess-123#0", "sess-123#1", "sess-123#2",
]
def test_commands_outside_session_window_are_excluded(
aggregator: _SessionAggregator,
captured_publishes: list[tuple[str, dict[str, Any], str]],
) -> None:
"""duration_s window is [ended_at - duration_s, ended_at]."""
# Old command from 10 minutes before the session — out of window
# for a 60-second session.
aggregator.add_event(_cmd("2026-05-02T06:13:00", "older-than-window"))
# In-window
aggregator.add_event(_cmd("2026-05-02T06:22:50", "whoami"))
aggregator.add_event(_session_recorded(
"2026-05-02T06:23:00", sid="s1", duration_s=60.0,
))
payload = captured_publishes[0][1]
cmd_texts = [c["command_text"] for c in payload["commands"]]
assert "whoami" in cmd_texts
assert "older-than-window" not in cmd_texts
def test_back_to_back_sessions_emit_distinct_envelopes(
aggregator: _SessionAggregator,
captured_publishes: list[tuple[str, dict[str, Any], str]],
) -> None:
aggregator.add_event(_cmd("2026-05-02T06:22:50", "whoami"))
aggregator.add_event(_session_recorded(
"2026-05-02T06:23:00", sid="s1", duration_s=60.0,
))
aggregator.add_event(_cmd("2026-05-02T06:30:00", "ls"))
aggregator.add_event(_session_recorded(
"2026-05-02T06:30:30", sid="s2", duration_s=60.0,
))
assert len(captured_publishes) == 2
s1, s2 = captured_publishes[0][1], captured_publishes[1][1]
assert s1["session_id"] == "s1"
assert s2["session_id"] == "s2"
# The second session window is 60s — only "ls" lands in it.
assert [c["command_text"] for c in s2["commands"]] == ["ls"]
def test_session_without_sid_falls_back_to_synthetic_id(
aggregator: _SessionAggregator,
captured_publishes: list[tuple[str, dict[str, Any], str]],
) -> None:
aggregator.add_event(_cmd("2026-05-02T06:22:50", "whoami"))
sr = _session_recorded("2026-05-02T06:23:00", sid="", duration_s=60.0)
sr["fields"]["sid"] = ""
aggregator.add_event(sr)
payload = captured_publishes[0][1]
assert payload["session_id"] is None
cmd_id = payload["commands"][0]["id"]
assert cmd_id.startswith(f"{_ATTACKER_IP}-2026-05-02T06:22:50")
# ── TTL eviction ────────────────────────────────────────────────────
def test_ttl_eviction_drops_old_commands() -> None:
publishes: list[tuple[str, dict[str, Any], str]] = []
def _publish(topic: str, payload: dict[str, Any], event_type: str) -> None:
publishes.append((topic, payload, event_type))
agg = _SessionAggregator(_publish, ttl_sec=10.0)
agg.add_event(_cmd("2026-05-02T06:00:00", "old"))
# New command 30 seconds later — TTL=10s, so the old one evicts.
agg.add_event(_cmd("2026-05-02T06:00:30", "fresh"))
remaining = [
p.get("fields", {}).get("command")
for _, p in agg._cmds[_ATTACKER_IP]
]
assert remaining == ["fresh"]
def test_publish_failure_is_swallowed() -> None:
"""A blowing-up publish must not propagate into the stream thread."""
def _bad(_t: str, _p: dict[str, Any], _e: str) -> None:
raise RuntimeError("bus exploded")
agg = _SessionAggregator(_bad, ttl_sec=3600.0)
agg.add_event(_cmd("2026-05-02T06:22:50", "whoami"))
# Should NOT raise.
agg.add_event(_session_recorded("2026-05-02T06:23:00", sid="s1"))

View File

@@ -0,0 +1,82 @@
"""Collector publishes ``attacker.session.ended`` end-to-end.
Wires :func:`_make_system_log_publisher` against a fake bus, drives
two parsed events (a CMD then a session_recorded) through the
returned publish_fn, and asserts the bus saw one
``attacker.session.ended`` envelope alongside the per-line
``system.log`` traffic.
"""
from __future__ import annotations
import asyncio
from typing import Any, AsyncIterator
import pytest
import pytest_asyncio
from decnet.bus import topics as _topics
from decnet.bus.fake import FakeBus
from decnet.collector.worker import _make_system_log_publisher
@pytest_asyncio.fixture
async def bus() -> AsyncIterator[FakeBus]:
b = FakeBus()
await b.connect()
try:
yield b
finally:
await b.close()
@pytest.mark.asyncio
async def test_session_ended_published_alongside_system_log(
bus: FakeBus,
) -> None:
captured: list[tuple[str, dict[str, Any]]] = []
sub = bus.subscribe(_topics.attacker(_topics.ATTACKER_SESSION_ENDED))
async def drain() -> None:
try:
async with sub:
async for ev in sub:
captured.append((ev.topic, ev.payload))
except Exception:
pass
drain_task = asyncio.create_task(drain())
await asyncio.sleep(0)
loop = asyncio.get_running_loop()
publish_fn = _make_system_log_publisher(bus, loop)
publish_fn({
"timestamp": "2026-05-02T06:22:48",
"decky": "SRV-DELTA-77",
"service": "bash",
"event_type": "command",
"attacker_ip": "192.168.1.5",
"fields": {"command": "whoami"},
})
publish_fn({
"timestamp": "2026-05-02T06:23:00",
"decky": "omega-decky",
"service": "sessrec",
"event_type": "session_recorded",
"attacker_ip": "192.168.1.5",
"fields": {
"sid": "sess-abc",
"service": "ssh",
"duration_s": "60.0",
},
})
# Give the marshalled publish a tick to land.
await asyncio.sleep(0.1)
drain_task.cancel()
assert len(captured) == 1
topic, payload = captured[0]
assert topic == _topics.attacker(_topics.ATTACKER_SESSION_ENDED)
assert payload["session_id"] == "sess-abc"
assert [c["command_text"] for c in payload["commands"]] == ["whoami"]