diff --git a/decnet/collector/worker.py b/decnet/collector/worker.py index ccbffdea..351b36ee 100644 --- a/decnet/collector/worker.py +++ b/decnet/collector/worker.py @@ -132,6 +132,157 @@ def _reset_rate_limiter() -> None: with _rl_lock: _rl_last.clear() + +# ─── Session aggregator (TTP `attacker.session.ended` producer) ────────────── +# +# The TTP worker subscribes to ``attacker.session.ended`` and turns each +# emitted command into a ``source_kind="command"`` :class:`TaggerEvent` +# (see ``decnet/ttp/worker._build_events``). No upstream worker was +# producing that topic — the rule pack therefore never fired on live +# traffic. The aggregator below indexes shell-command events +# per-attacker_ip and emits one ``attacker.session.ended`` envelope +# whenever the SSH ``sessrec`` worker publishes ``session_recorded``. +# +# Memory bound: each attacker_ip's deque is capped by a TTL eviction +# (default 3600 s). Override via ``DECNET_COLLECTOR_SESSION_AGG_TTL_SEC``. + +_SESSION_AGG_TTL_SEC: float = _parse_float_env( + "DECNET_COLLECTOR_SESSION_AGG_TTL_SEC", 3600.0, +) + + +def _parse_iso_ts(value: str) -> Optional[datetime]: + """Best-effort ISO-8601 parse for parsed event timestamps. + + The collector's parser stamps ``timestamp`` either as the original + ISO-8601 string (when ``datetime.fromisoformat`` failed) or as the + reformatted ``%Y-%m-%d %H:%M:%S`` string. Both round-trip through + ``fromisoformat`` after a space→T swap. Returns None if neither + shape parses — the aggregator skips events it can't time-stamp. + """ + if not value: + return None + candidates = (value, value.replace(" ", "T")) + for cand in candidates: + try: + return datetime.fromisoformat(cand) + except ValueError: + continue + return None + + +class _SessionAggregator: + """Per-attacker_ip command index that emits ``attacker.session.ended``. + + Thread-safe — :meth:`add_event` is called from the per-container + stream threads. Internal state is protected by a single lock; the + publish fan-out happens inside the lock for simplicity (the + downstream publish_fn is the thread-safe marshaller from + :mod:`decnet.bus.publish`, which is non-blocking). + """ + + def __init__( + self, + publish_fn: Callable[[str, dict[str, Any], str], None], + *, + ttl_sec: float = _SESSION_AGG_TTL_SEC, + ) -> None: + self._publish = publish_fn + self._ttl = ttl_sec + self._lock = threading.Lock() + # attacker_ip → list of (timestamp, parsed_event) tuples. + # Stored as a list rather than a deque so the ``in_window`` + # filter can index linearly; the per-attacker volume is + # bounded by the TTL and by typical session size (≤ a few + # hundred commands) so this stays cheap. + self._cmds: dict[str, list[tuple[datetime, dict[str, Any]]]] = {} + + def add_event(self, parsed: dict[str, Any]) -> None: + """Index a parsed event. Emits on ``session_recorded``.""" + event_type = parsed.get("event_type", "") + attacker_ip = parsed.get("attacker_ip") or "" + if not attacker_ip or attacker_ip == "Unknown": + return + ts = _parse_iso_ts(str(parsed.get("timestamp", ""))) + if ts is None: + return + with self._lock: + self._evict_expired(ts) + if event_type == "command": + self._cmds.setdefault(attacker_ip, []).append((ts, parsed)) + return + if event_type == "session_recorded": + self._emit_session(parsed, attacker_ip, ts) + + def _evict_expired(self, now: datetime) -> None: + """Drop commands older than ``self._ttl`` seconds.""" + cutoff = now.timestamp() - self._ttl + for ip, entries in list(self._cmds.items()): + kept = [(t, p) for t, p in entries if t.timestamp() >= cutoff] + if kept: + self._cmds[ip] = kept + else: + del self._cmds[ip] + + def _emit_session( + self, parsed: dict[str, Any], attacker_ip: str, ended_at: datetime, + ) -> None: + """Build an ``attacker.session.ended`` envelope and publish it. + + Slices the per-IP deque to commands whose timestamp falls + inside ``[ended_at - duration_s, ended_at]``. Commands stay in + the deque after the slice — the TTL eviction is the only path + that drops them, so two back-to-back sessions for the same IP + share the visible window without losing rows. + """ + fields = parsed.get("fields", {}) or {} + duration_raw = fields.get("duration_s") or "0" + try: + duration_s = float(duration_raw) + except (TypeError, ValueError): + duration_s = 0.0 + sid = str(fields.get("sid") or "") + service = str(fields.get("service") or parsed.get("service") or "") + decky = parsed.get("decky") or "" + + commands_window = self._cmds.get(attacker_ip, []) + cutoff_lo = ended_at.timestamp() - max(duration_s, 0.0) + commands: list[dict[str, Any]] = [] + for idx, (cmd_ts, cmd_parsed) in enumerate(commands_window): + if cmd_ts.timestamp() < cutoff_lo: + continue + cmd_fields = cmd_parsed.get("fields", {}) or {} + cmd_text = ( + cmd_fields.get("command") + or cmd_fields.get("cmd") + or cmd_parsed.get("msg", "") + ) + commands.append({ + "id": f"{sid}#{idx}" if sid else f"{attacker_ip}-{cmd_ts.isoformat()}", + "command_text": str(cmd_text), + "ts": cmd_ts.isoformat(), + "decky": cmd_parsed.get("decky", ""), + "service": cmd_parsed.get("service", ""), + }) + + payload: dict[str, Any] = { + "session_id": sid or None, + "attacker_uuid": None, # consumer resolves via repo + "attacker_ip": attacker_ip, + "decky_id": decky, + "service": service, + "ended_at": ended_at.isoformat(), + "duration_s": duration_s, + "commands": commands, + } + topic = _topics.attacker(_topics.ATTACKER_SESSION_ENDED) + try: + self._publish(topic, payload, _topics.ATTACKER_SESSION_ENDED) + except Exception as exc: # noqa: BLE001 + logger.debug( + "collector: session.ended publish failed: %s", exc, + ) + # ─── RFC 5424 parser ────────────────────────────────────────────────────────── _RFC5424_RE = re.compile( @@ -479,12 +630,17 @@ def _make_system_log_publisher( thread can call it unconditionally. Otherwise each call is marshalled onto *loop* (the asyncio event loop that owns the bus socket) via ``make_thread_safe_publisher``. + + The same call also feeds a :class:`_SessionAggregator` so shell + commands are indexed per-attacker_ip and ``attacker.session.ended`` + fires whenever the SSH ``sessrec`` worker logs ``session_recorded``. """ raw_publish = make_thread_safe_publisher(bus, loop) if bus is not None else None if raw_publish is None: return lambda _parsed: None topic = _topics.system(_topics.SYSTEM_LOG) + aggregator = _SessionAggregator(raw_publish) def _publish(parsed: dict[str, Any]) -> None: event_type = parsed.get("event_type", "") @@ -499,6 +655,7 @@ def _make_system_log_publisher( }, event_type, ) + aggregator.add_event(parsed) return _publish diff --git a/tests/collector/test_session_aggregator.py b/tests/collector/test_session_aggregator.py new file mode 100644 index 00000000..783939a1 --- /dev/null +++ b/tests/collector/test_session_aggregator.py @@ -0,0 +1,207 @@ +"""Collector session aggregator emits ``attacker.session.ended``. + +Pins the producer-side fix that closes the gap surfaced in TTP +debugging: the TTP worker subscribes to ``attacker.session.ended`` but +no upstream component published it. The aggregator indexes shell +``command`` events per attacker_ip and emits one envelope per +``session_recorded`` event with the commands that fall inside the +session window. +""" +from __future__ import annotations + +from typing import Any + +import pytest + +from decnet.bus import topics as _topics +from decnet.collector.worker import _SessionAggregator + + +_ATTACKER_IP = "192.168.1.5" + + +def _cmd(ts_iso: str, text: str) -> dict[str, Any]: + return { + "timestamp": ts_iso, + "decky": "SRV-DELTA-77", + "service": "bash", + "event_type": "command", + "attacker_ip": _ATTACKER_IP, + "fields": {"command": text, "src": _ATTACKER_IP}, + } + + +def _session_recorded( + ts_iso: str, sid: str, duration_s: float = 60.0, +) -> dict[str, Any]: + return { + "timestamp": ts_iso, + "decky": "omega-decky", + "service": "sessrec", + "event_type": "session_recorded", + "attacker_ip": _ATTACKER_IP, + "fields": { + "sid": sid, + "service": "ssh", + "duration_s": str(duration_s), + "src_ip": _ATTACKER_IP, + }, + } + + +@pytest.fixture +def captured_publishes() -> list[tuple[str, dict[str, Any], str]]: + return [] + + +@pytest.fixture +def aggregator( + captured_publishes: list[tuple[str, dict[str, Any], str]], +) -> _SessionAggregator: + def _publish(topic: str, payload: dict[str, Any], event_type: str) -> None: + captured_publishes.append((topic, payload, event_type)) + + return _SessionAggregator(_publish, ttl_sec=3600.0) + + +# ── Indexing ──────────────────────────────────────────────────────── + + +def test_command_events_are_indexed_per_attacker_ip( + aggregator: _SessionAggregator, +) -> None: + aggregator.add_event(_cmd("2026-05-02T06:22:48", "whoami")) + aggregator.add_event(_cmd("2026-05-02T06:22:50", "id")) + assert len(aggregator._cmds[_ATTACKER_IP]) == 2 + + +def test_unknown_attacker_ip_is_ignored( + aggregator: _SessionAggregator, + captured_publishes: list[tuple[str, dict[str, Any], str]], +) -> None: + bad = _cmd("2026-05-02T06:22:48", "whoami") + bad["attacker_ip"] = "Unknown" + aggregator.add_event(bad) + assert aggregator._cmds == {} + + +def test_unparseable_timestamp_is_skipped( + aggregator: _SessionAggregator, +) -> None: + bad = _cmd("not-a-timestamp", "whoami") + aggregator.add_event(bad) + assert aggregator._cmds == {} + + +# ── Session emission ──────────────────────────────────────────────── + + +def test_session_recorded_emits_attacker_session_ended( + aggregator: _SessionAggregator, + captured_publishes: list[tuple[str, dict[str, Any], str]], +) -> None: + aggregator.add_event(_cmd("2026-05-02T06:22:48", "whoami")) + aggregator.add_event(_cmd("2026-05-02T06:23:00", "id")) + aggregator.add_event(_cmd("2026-05-02T06:23:10", "uname -a")) + aggregator.add_event(_session_recorded( + "2026-05-02T06:23:30", sid="sess-123", duration_s=120.0, + )) + + assert len(captured_publishes) == 1 + topic, payload, event_type = captured_publishes[0] + assert topic == _topics.attacker(_topics.ATTACKER_SESSION_ENDED) + assert event_type == _topics.ATTACKER_SESSION_ENDED + assert payload["session_id"] == "sess-123" + assert payload["attacker_ip"] == _ATTACKER_IP + assert payload["decky_id"] == "omega-decky" + assert payload["service"] == "ssh" + assert payload["duration_s"] == 120.0 + cmds = payload["commands"] + assert [c["command_text"] for c in cmds] == ["whoami", "id", "uname -a"] + assert [c["id"] for c in cmds] == [ + "sess-123#0", "sess-123#1", "sess-123#2", + ] + + +def test_commands_outside_session_window_are_excluded( + aggregator: _SessionAggregator, + captured_publishes: list[tuple[str, dict[str, Any], str]], +) -> None: + """duration_s window is [ended_at - duration_s, ended_at].""" + # Old command from 10 minutes before the session — out of window + # for a 60-second session. + aggregator.add_event(_cmd("2026-05-02T06:13:00", "older-than-window")) + # In-window + aggregator.add_event(_cmd("2026-05-02T06:22:50", "whoami")) + aggregator.add_event(_session_recorded( + "2026-05-02T06:23:00", sid="s1", duration_s=60.0, + )) + payload = captured_publishes[0][1] + cmd_texts = [c["command_text"] for c in payload["commands"]] + assert "whoami" in cmd_texts + assert "older-than-window" not in cmd_texts + + +def test_back_to_back_sessions_emit_distinct_envelopes( + aggregator: _SessionAggregator, + captured_publishes: list[tuple[str, dict[str, Any], str]], +) -> None: + aggregator.add_event(_cmd("2026-05-02T06:22:50", "whoami")) + aggregator.add_event(_session_recorded( + "2026-05-02T06:23:00", sid="s1", duration_s=60.0, + )) + aggregator.add_event(_cmd("2026-05-02T06:30:00", "ls")) + aggregator.add_event(_session_recorded( + "2026-05-02T06:30:30", sid="s2", duration_s=60.0, + )) + assert len(captured_publishes) == 2 + s1, s2 = captured_publishes[0][1], captured_publishes[1][1] + assert s1["session_id"] == "s1" + assert s2["session_id"] == "s2" + # The second session window is 60s — only "ls" lands in it. + assert [c["command_text"] for c in s2["commands"]] == ["ls"] + + +def test_session_without_sid_falls_back_to_synthetic_id( + aggregator: _SessionAggregator, + captured_publishes: list[tuple[str, dict[str, Any], str]], +) -> None: + aggregator.add_event(_cmd("2026-05-02T06:22:50", "whoami")) + sr = _session_recorded("2026-05-02T06:23:00", sid="", duration_s=60.0) + sr["fields"]["sid"] = "" + aggregator.add_event(sr) + payload = captured_publishes[0][1] + assert payload["session_id"] is None + cmd_id = payload["commands"][0]["id"] + assert cmd_id.startswith(f"{_ATTACKER_IP}-2026-05-02T06:22:50") + + +# ── TTL eviction ──────────────────────────────────────────────────── + + +def test_ttl_eviction_drops_old_commands() -> None: + publishes: list[tuple[str, dict[str, Any], str]] = [] + + def _publish(topic: str, payload: dict[str, Any], event_type: str) -> None: + publishes.append((topic, payload, event_type)) + + agg = _SessionAggregator(_publish, ttl_sec=10.0) + agg.add_event(_cmd("2026-05-02T06:00:00", "old")) + # New command 30 seconds later — TTL=10s, so the old one evicts. + agg.add_event(_cmd("2026-05-02T06:00:30", "fresh")) + remaining = [ + p.get("fields", {}).get("command") + for _, p in agg._cmds[_ATTACKER_IP] + ] + assert remaining == ["fresh"] + + +def test_publish_failure_is_swallowed() -> None: + """A blowing-up publish must not propagate into the stream thread.""" + def _bad(_t: str, _p: dict[str, Any], _e: str) -> None: + raise RuntimeError("bus exploded") + + agg = _SessionAggregator(_bad, ttl_sec=3600.0) + agg.add_event(_cmd("2026-05-02T06:22:50", "whoami")) + # Should NOT raise. + agg.add_event(_session_recorded("2026-05-02T06:23:00", sid="s1")) diff --git a/tests/collector/test_session_ended_publish.py b/tests/collector/test_session_ended_publish.py new file mode 100644 index 00000000..cca8bc77 --- /dev/null +++ b/tests/collector/test_session_ended_publish.py @@ -0,0 +1,82 @@ +"""Collector publishes ``attacker.session.ended`` end-to-end. + +Wires :func:`_make_system_log_publisher` against a fake bus, drives +two parsed events (a CMD then a session_recorded) through the +returned publish_fn, and asserts the bus saw one +``attacker.session.ended`` envelope alongside the per-line +``system.log`` traffic. +""" +from __future__ import annotations + +import asyncio +from typing import Any, AsyncIterator + +import pytest +import pytest_asyncio + +from decnet.bus import topics as _topics +from decnet.bus.fake import FakeBus +from decnet.collector.worker import _make_system_log_publisher + + +@pytest_asyncio.fixture +async def bus() -> AsyncIterator[FakeBus]: + b = FakeBus() + await b.connect() + try: + yield b + finally: + await b.close() + + +@pytest.mark.asyncio +async def test_session_ended_published_alongside_system_log( + bus: FakeBus, +) -> None: + captured: list[tuple[str, dict[str, Any]]] = [] + sub = bus.subscribe(_topics.attacker(_topics.ATTACKER_SESSION_ENDED)) + + async def drain() -> None: + try: + async with sub: + async for ev in sub: + captured.append((ev.topic, ev.payload)) + except Exception: + pass + + drain_task = asyncio.create_task(drain()) + await asyncio.sleep(0) + + loop = asyncio.get_running_loop() + publish_fn = _make_system_log_publisher(bus, loop) + + publish_fn({ + "timestamp": "2026-05-02T06:22:48", + "decky": "SRV-DELTA-77", + "service": "bash", + "event_type": "command", + "attacker_ip": "192.168.1.5", + "fields": {"command": "whoami"}, + }) + publish_fn({ + "timestamp": "2026-05-02T06:23:00", + "decky": "omega-decky", + "service": "sessrec", + "event_type": "session_recorded", + "attacker_ip": "192.168.1.5", + "fields": { + "sid": "sess-abc", + "service": "ssh", + "duration_s": "60.0", + }, + }) + + # Give the marshalled publish a tick to land. + await asyncio.sleep(0.1) + drain_task.cancel() + + assert len(captured) == 1 + topic, payload = captured[0] + assert topic == _topics.attacker(_topics.ATTACKER_SESSION_ENDED) + assert payload["session_id"] == "sess-abc" + assert [c["command_text"] for c in payload["commands"]] == ["whoami"]