perf: rate-limit connect/disconnect events in collector to spare ingester

Connection-lifecycle events (connect, disconnect, accept, close) fire once
per TCP connection. During a portscan or credential-stuffing run this
firehoses the SQLite ingester with tiny WAL writes and starves all reads
until the queue drains.

The collector now deduplicates these events by
(attacker_ip, decky, service, event_type) over a 1-second window before
writing to the .json ingestion stream. The raw .log file is untouched, so
rsyslog/SIEM still see every event for forensic fidelity.

Tunable via DECNET_COLLECTOR_RL_WINDOW_SEC and DECNET_COLLECTOR_RL_EVENT_TYPES.
This commit is contained in:
2026-04-15 12:04:04 -04:00
parent 2d65d74069
commit f6cb90ee66
2 changed files with 197 additions and 4 deletions

View File

@@ -8,7 +8,10 @@ The ingester tails the .json file; rsyslog can consume the .log file independent
import asyncio
import json
import os
import re
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Optional
@@ -17,6 +20,87 @@ from decnet.logging import get_logger
logger = get_logger("collector")
# ─── Ingestion rate limiter ───────────────────────────────────────────────────
#
# Rationale: connection-lifecycle events (connect/disconnect/accept/close) are
# emitted once per TCP connection. During a portscan or credential-stuffing
# run, a single attacker can generate hundreds of these per second from the
# honeypot services themselves — each becoming a tiny WAL-write transaction
# through the ingester, starving reads until the queue drains.
#
# The collector still writes every line to the raw .log file (forensic record
# for rsyslog/SIEM). Only the .json path — which feeds SQLite — is deduped.
#
# Dedup key: (attacker_ip, decky, service, event_type)
# Window: DECNET_COLLECTOR_RL_WINDOW_SEC seconds (default 1.0)
# Scope: DECNET_COLLECTOR_RL_EVENT_TYPES comma list
# (default: connect,disconnect,connection,accept,close)
# Events outside that set bypass the limiter untouched.
def _parse_float_env(name: str, default: float) -> float:
raw = os.environ.get(name)
if raw is None:
return default
try:
value = float(raw)
except ValueError:
logger.warning("collector: invalid %s=%r, using default %s", name, raw, default)
return default
return max(0.0, value)
# Dedup window length in seconds; 0 disables the limiter entirely.
_RL_WINDOW_SEC: float = _parse_float_env("DECNET_COLLECTOR_RL_WINDOW_SEC", 1.0)

# Event types subject to deduplication (comma-separated env override,
# surrounding whitespace tolerated, empty tokens ignored).
_RL_EVENT_TYPES: frozenset[str] = frozenset(
    token
    for token in (
        part.strip()
        for part in os.environ.get(
            "DECNET_COLLECTOR_RL_EVENT_TYPES",
            "connect,disconnect,connection,accept,close",
        ).split(",")
    )
    if token
)

# Soft cap on tracked dedup keys before opportunistic GC kicks in.
_RL_MAX_ENTRIES: int = 10_000

# Guards _rl_last against concurrent collector threads.
_rl_lock: threading.Lock = threading.Lock()

# (attacker_ip, decky, service, event_type) -> last-emit time.monotonic() stamp.
_rl_last: dict[tuple[str, str, str, str], float] = {}
def _should_ingest(parsed: dict[str, Any]) -> bool:
    """
    Decide whether *parsed* goes into the JSON ingestion stream.

    Returns False only for rate-limited connection-lifecycle events whose
    (attacker_ip, decky, service, event_type) key was already emitted inside
    the dedup window; everything else passes through untouched.
    """
    etype = parsed.get("event_type", "")
    # Limiter disabled, or event type outside the configured scope: pass through.
    if _RL_WINDOW_SEC <= 0.0:
        return True
    if etype not in _RL_EVENT_TYPES:
        return True
    dedup_key = (
        parsed.get("attacker_ip", "Unknown"),
        parsed.get("decky", ""),
        parsed.get("service", ""),
        etype,
    )
    now = time.monotonic()
    with _rl_lock:
        previous = _rl_last.get(dedup_key, 0.0)
        if now - previous < _RL_WINDOW_SEC:
            # Duplicate inside the window — drop from the JSON stream only;
            # the raw .log file already received the line upstream.
            return False
        _rl_last[dedup_key] = now
        if len(_rl_last) > _RL_MAX_ENTRIES:
            # Opportunistic GC: purge keys idle for 60+ windows, well outside
            # any realistic in-flight dedup range.
            horizon = now - _RL_WINDOW_SEC * 60.0
            for stale_key in [k for k, stamp in _rl_last.items() if stamp < horizon]:
                del _rl_last[stale_key]
    return True
def _reset_rate_limiter() -> None:
    """Drop all recorded dedup timestamps (helper for the test suite)."""
    # Lock so a concurrent _should_ingest never observes a half-cleared map.
    with _rl_lock:
        _rl_last.clear()
# ─── RFC 5424 parser ──────────────────────────────────────────────────────────
_RFC5424_RE = re.compile(
@@ -140,9 +224,16 @@ def _stream_container(container_id: str, log_path: Path, json_path: Path) -> Non
lf.flush()
parsed = parse_rfc5424(line)
if parsed:
if _should_ingest(parsed):
logger.debug("collector: event written decky=%s type=%s", parsed.get("decky"), parsed.get("event_type"))
jf.write(json.dumps(parsed) + "\n")
jf.flush()
else:
logger.debug(
"collector: rate-limited decky=%s service=%s type=%s attacker=%s",
parsed.get("decky"), parsed.get("service"),
parsed.get("event_type"), parsed.get("attacker_ip"),
)
else:
logger.debug("collector: malformed RFC5424 line snippet=%r", line[:80])
except Exception as exc:

View File

@@ -9,7 +9,9 @@ from decnet.collector import parse_rfc5424, is_service_container, is_service_eve
from decnet.collector.worker import (
_stream_container,
_load_service_container_names,
log_collector_worker
_should_ingest,
_reset_rate_limiter,
log_collector_worker,
)
_KNOWN_NAMES = {"omega-decky-http", "omega-decky-smtp", "relay-decky-ftp"}
@@ -287,6 +289,106 @@ class TestStreamContainer:
assert log_path.read_text() == ""
class TestIngestRateLimiter:
    """Unit tests for the collector's JSON-stream dedup limiter."""

    def setup_method(self):
        # Every test starts from an empty dedup map.
        _reset_rate_limiter()

    def _event(self, event_type="connect", attacker_ip="1.2.3.4",
               decky="decky-01", service="ssh"):
        # Minimal parsed-event payload carrying only the dedup-key fields.
        return {
            "event_type": event_type,
            "attacker_ip": attacker_ip,
            "decky": decky,
            "service": service,
        }

    def test_non_limited_event_types_always_pass(self):
        # login_attempt / request / etc. carry distinguishing payload — never deduped.
        for _attempt in range(5):
            assert _should_ingest(self._event(event_type="login_attempt")) is True
            assert _should_ingest(self._event(event_type="request")) is True

    def test_first_connect_passes(self):
        assert _should_ingest(self._event()) is True

    def test_duplicate_connect_within_window_is_dropped(self):
        outcomes = [_should_ingest(self._event()) for _ in range(3)]
        assert outcomes == [True, False, False]

    def test_different_attackers_tracked_independently(self):
        assert _should_ingest(self._event(attacker_ip="1.1.1.1")) is True
        assert _should_ingest(self._event(attacker_ip="2.2.2.2")) is True

    def test_different_deckies_tracked_independently(self):
        assert _should_ingest(self._event(decky="a")) is True
        assert _should_ingest(self._event(decky="b")) is True

    def test_different_services_tracked_independently(self):
        assert _should_ingest(self._event(service="ssh")) is True
        assert _should_ingest(self._event(service="http")) is True

    def test_disconnect_and_connect_tracked_independently(self):
        assert _should_ingest(self._event(event_type="connect")) is True
        assert _should_ingest(self._event(event_type="disconnect")) is True

    def test_window_expiry_allows_next_event(self, monkeypatch):
        import decnet.collector.worker as worker
        clock = [1000.0]
        monkeypatch.setattr(worker.time, "monotonic", lambda: clock[0])
        assert _should_ingest(self._event()) is True
        assert _should_ingest(self._event()) is False
        # Step past the 1-second window.
        clock[0] += 1.5
        assert _should_ingest(self._event()) is True

    def test_window_zero_disables_limiter(self, monkeypatch):
        import decnet.collector.worker as worker
        monkeypatch.setattr(worker, "_RL_WINDOW_SEC", 0.0)
        outcomes = {_should_ingest(self._event()) for _ in range(10)}
        assert outcomes == {True}

    def test_raw_log_gets_all_lines_json_dedupes(self, tmp_path):
        """End-to-end: duplicates hit the .log file but NOT the .json stream."""
        log_path = tmp_path / "test.log"
        json_path = tmp_path / "test.json"
        line = (
            '<134>1 2024-01-15T12:00:00+00:00 decky-01 ssh - connect '
            '[decnet@55555 src_ip="1.2.3.4"]\n'
        )
        payload = (line * 5).encode("utf-8")
        mock_container = MagicMock()
        mock_container.logs.return_value = [payload]
        mock_client = MagicMock()
        mock_client.containers.get.return_value = mock_container
        with patch("docker.from_env", return_value=mock_client):
            _stream_container("test-id", log_path, json_path)
        # Raw log: all 5 lines preserved (forensic fidelity).
        assert log_path.read_text().count("\n") == 5
        # JSON ingest: only the first survives the limiter (4 deduped).
        surviving = [entry for entry in json_path.read_text().splitlines() if entry.strip()]
        assert len(surviving) == 1

    def test_gc_trims_oversized_map(self, monkeypatch):
        import decnet.collector.worker as worker
        # Seed the map with soon-to-be-stale entries, then push past the cap.
        monkeypatch.setattr(worker, "_RL_MAX_ENTRIES", 10)
        clock = [1000.0]
        monkeypatch.setattr(worker.time, "monotonic", lambda: clock[0])
        for i in range(9):
            assert _should_ingest(self._event(attacker_ip=f"10.0.0.{i}")) is True
        # Jump well past 60 windows to make prior entries stale.
        clock[0] += 1000.0
        # Tenth insertion reaches the cap exactly; GC fires only on >cap.
        assert _should_ingest(self._event(attacker_ip="10.0.0.99")) is True
        # Eleventh insertion exceeds the cap and triggers the purge.
        assert _should_ingest(self._event(attacker_ip="10.0.0.100")) is True
        # After the map exceeds the cap, stale entries must be purged.
        assert len(worker._rl_last) < 10
class TestLogCollectorWorker:
@pytest.mark.asyncio
async def test_worker_initial_discovery(self, tmp_path):