diff --git a/decnet/collector/worker.py b/decnet/collector/worker.py index d96ed4f..7b73acd 100644 --- a/decnet/collector/worker.py +++ b/decnet/collector/worker.py @@ -8,7 +8,10 @@ The ingester tails the .json file; rsyslog can consume the .log file independent import asyncio import json +import os import re +import threading +import time from datetime import datetime from pathlib import Path from typing import Any, Optional @@ -17,6 +20,87 @@ from decnet.logging import get_logger logger = get_logger("collector") +# ─── Ingestion rate limiter ─────────────────────────────────────────────────── +# +# Rationale: connection-lifecycle events (connect/disconnect/accept/close) are +# emitted once per TCP connection. During a portscan or credential-stuffing +# run, a single attacker can generate hundreds of these per second from the +# honeypot services themselves — each becoming a tiny WAL-write transaction +# through the ingester, starving reads until the queue drains. +# +# The collector still writes every line to the raw .log file (forensic record +# for rsyslog/SIEM). Only the .json path — which feeds SQLite — is deduped. +# +# Dedup key: (attacker_ip, decky, service, event_type) +# Window: DECNET_COLLECTOR_RL_WINDOW_SEC seconds (default 1.0) +# Scope: DECNET_COLLECTOR_RL_EVENT_TYPES comma list +# (default: connect,disconnect,connection,accept,close) +# Events outside that set bypass the limiter untouched. + +def _parse_float_env(name: str, default: float) -> float: + raw = os.environ.get(name) + if raw is None: + return default + try: + value = float(raw) + except ValueError: + logger.warning("collector: invalid %s=%r, using default %s", name, raw, default) + return default + return max(0.0, value) + + +_RL_WINDOW_SEC: float = _parse_float_env("DECNET_COLLECTOR_RL_WINDOW_SEC", 1.0) +_RL_EVENT_TYPES: frozenset[str] = frozenset( + t.strip() + for t in os.environ.get( + "DECNET_COLLECTOR_RL_EVENT_TYPES", + "connect,disconnect,connection,accept,close", + ).split(",") + if t.strip() +) +_RL_MAX_ENTRIES: int = 10_000 + +_rl_lock: threading.Lock = threading.Lock() +_rl_last: dict[tuple[str, str, str, str], float] = {} + + +def _should_ingest(parsed: dict[str, Any]) -> bool: + """ + Return True if this parsed event should be written to the JSON ingestion + stream. Rate-limited connection-lifecycle events return False when another + event with the same (attacker_ip, decky, service, event_type) was emitted + inside the dedup window. + """ + event_type = parsed.get("event_type", "") + if _RL_WINDOW_SEC <= 0.0 or event_type not in _RL_EVENT_TYPES: + return True + key = ( + parsed.get("attacker_ip", "Unknown"), + parsed.get("decky", ""), + parsed.get("service", ""), + event_type, + ) + now = time.monotonic() + with _rl_lock: + last = _rl_last.get(key, 0.0) + if now - last < _RL_WINDOW_SEC: + return False + _rl_last[key] = now + # Opportunistic GC: when the map grows past the cap, drop entries older + # than 60 windows (well outside any realistic in-flight dedup range). + if len(_rl_last) > _RL_MAX_ENTRIES: + cutoff = now - (_RL_WINDOW_SEC * 60.0) + stale = [k for k, t in _rl_last.items() if t < cutoff] + for k in stale: + del _rl_last[k] + return True + + +def _reset_rate_limiter() -> None: + """Test-only helper — clear dedup state between test cases.""" + with _rl_lock: + _rl_last.clear() + # ─── RFC 5424 parser ────────────────────────────────────────────────────────── _RFC5424_RE = re.compile( @@ -140,9 +224,16 @@ def _stream_container(container_id: str, log_path: Path, json_path: Path) -> Non lf.flush() parsed = parse_rfc5424(line) if parsed: - logger.debug("collector: event written decky=%s type=%s", parsed.get("decky"), parsed.get("event_type")) - jf.write(json.dumps(parsed) + "\n") - jf.flush() + if _should_ingest(parsed): + logger.debug("collector: event written decky=%s type=%s", parsed.get("decky"), parsed.get("event_type")) + jf.write(json.dumps(parsed) + "\n") + jf.flush() + else: + logger.debug( + "collector: rate-limited decky=%s service=%s type=%s attacker=%s", + parsed.get("decky"), parsed.get("service"), + parsed.get("event_type"), parsed.get("attacker_ip"), + ) else: logger.debug("collector: malformed RFC5424 line snippet=%r", line[:80]) except Exception as exc: diff --git a/tests/test_collector.py b/tests/test_collector.py index d43f2e3..1ef4766 100644 --- a/tests/test_collector.py +++ b/tests/test_collector.py @@ -9,7 +9,9 @@ from decnet.collector import parse_rfc5424, is_service_container, is_service_eve from decnet.collector.worker import ( _stream_container, _load_service_container_names, - log_collector_worker + _should_ingest, + _reset_rate_limiter, + log_collector_worker, ) _KNOWN_NAMES = {"omega-decky-http", "omega-decky-smtp", "relay-decky-ftp"} @@ -287,6 +289,106 @@ class TestStreamContainer: assert log_path.read_text() == "" +class TestIngestRateLimiter: + def setup_method(self): + _reset_rate_limiter() + + def _event(self, event_type="connect", attacker_ip="1.2.3.4", + decky="decky-01", service="ssh"): + return { + "event_type": event_type, + "attacker_ip": attacker_ip, + "decky": decky, + "service": service, + } + + def test_non_limited_event_types_always_pass(self): + # login_attempt / request / etc. carry distinguishing payload — never deduped. + for _ in range(5): + assert _should_ingest(self._event(event_type="login_attempt")) is True + assert _should_ingest(self._event(event_type="request")) is True + + def test_first_connect_passes(self): + assert _should_ingest(self._event()) is True + + def test_duplicate_connect_within_window_is_dropped(self): + assert _should_ingest(self._event()) is True + assert _should_ingest(self._event()) is False + assert _should_ingest(self._event()) is False + + def test_different_attackers_tracked_independently(self): + assert _should_ingest(self._event(attacker_ip="1.1.1.1")) is True + assert _should_ingest(self._event(attacker_ip="2.2.2.2")) is True + + def test_different_deckies_tracked_independently(self): + assert _should_ingest(self._event(decky="a")) is True + assert _should_ingest(self._event(decky="b")) is True + + def test_different_services_tracked_independently(self): + assert _should_ingest(self._event(service="ssh")) is True + assert _should_ingest(self._event(service="http")) is True + + def test_disconnect_and_connect_tracked_independently(self): + assert _should_ingest(self._event(event_type="connect")) is True + assert _should_ingest(self._event(event_type="disconnect")) is True + + def test_window_expiry_allows_next_event(self, monkeypatch): + import decnet.collector.worker as worker + t = [1000.0] + monkeypatch.setattr(worker.time, "monotonic", lambda: t[0]) + assert _should_ingest(self._event()) is True + assert _should_ingest(self._event()) is False + # Advance past 1-second window. + t[0] += 1.5 + assert _should_ingest(self._event()) is True + + def test_window_zero_disables_limiter(self, monkeypatch): + import decnet.collector.worker as worker + monkeypatch.setattr(worker, "_RL_WINDOW_SEC", 0.0) + for _ in range(10): + assert _should_ingest(self._event()) is True + + def test_raw_log_gets_all_lines_json_dedupes(self, tmp_path): + """End-to-end: duplicates hit the .log file but NOT the .json stream.""" + log_path = tmp_path / "test.log" + json_path = tmp_path / "test.json" + line = ( + '<134>1 2024-01-15T12:00:00+00:00 decky-01 ssh - connect ' + '[decnet@55555 src_ip="1.2.3.4"]\n' + ) + payload = (line * 5).encode("utf-8") + + mock_container = MagicMock() + mock_container.logs.return_value = [payload] + mock_client = MagicMock() + mock_client.containers.get.return_value = mock_container + + with patch("docker.from_env", return_value=mock_client): + _stream_container("test-id", log_path, json_path) + + # Raw log: all 5 lines preserved (forensic fidelity). + assert log_path.read_text().count("\n") == 5 + # JSON ingest: only the first one written (4 dropped by the limiter). + json_lines = [l for l in json_path.read_text().splitlines() if l.strip()] + assert len(json_lines) == 1 + + def test_gc_trims_oversized_map(self, monkeypatch): + import decnet.collector.worker as worker + # Seed the map with stale entries, then push past the cap. + monkeypatch.setattr(worker, "_RL_MAX_ENTRIES", 10) + t = [1000.0] + monkeypatch.setattr(worker.time, "monotonic", lambda: t[0]) + for i in range(9): + assert _should_ingest(self._event(attacker_ip=f"10.0.0.{i}")) is True + # Jump well past 60 windows to make prior entries stale. + t[0] += 1000.0 + # This insertion pushes len to 10; GC triggers on >10 so stays. + assert _should_ingest(self._event(attacker_ip="10.0.0.99")) is True + assert _should_ingest(self._event(attacker_ip="10.0.0.100")) is True + # After the map exceeds the cap, stale entries must be purged. + assert len(worker._rl_last) < 10 + + class TestLogCollectorWorker: @pytest.mark.asyncio async def test_worker_initial_discovery(self, tmp_path):