Real-world bug surfaced on the first live decky run: sessrec.c's json_escape (decnet/templates/_shared/sessrec/sessrec.c:111-141) only escapes bytes < 0x20 + DEL — bytes >= 0x80 pass through raw. An attacker pasting Latin-1 / GB18030 / any non-UTF-8 8-bit text yields a shard line that chokes Python's default UTF-8 text-mode read with 'utf-8 codec can't decode byte 0xac'. Three changes: 1. _events_for_sid now opens with errors='surrogateescape', preserving byte fidelity through the JSON parse. Surrogate-half chars correctly fail isascii() / isalpha() so the typed-letter histograms filter them out automatically. Tightening sessrec.c to escape >= 0x80 is filed for v0.2 — that's the proper forensic-data fix; the surrogateescape read makes the engine robust meanwhile. 2. Regression test (test_handler_tolerates_non_utf8_bytes_in_shard) builds a shard with raw 0xAC bytes inside a JSON 'data' string and asserts the handler still persists observations. 3. Collector's _emit_session now logs at WARNING (was DEBUG) when find_shard_with_sid returns None, citing the three usual causes (ARTIFACTS_ROOT perms, _SERVICE_RE whitelist, sessrec/collector race). Surfaces the silent-skip class of bug in seconds instead of hours — the first live run hid a perm mismatch (User=anti without SupplementaryGroups=decnet) for an entire session window before the symptom was traced upstream.
906 lines
38 KiB
Python
906 lines
38 KiB
Python
"""
|
|
Host-side Docker log collector.
|
|
|
|
Streams stdout from all running decky service containers via the Docker SDK,
|
|
writes RFC 5424 lines to <log_file> and parsed JSON records to <log_file>.json.
|
|
The ingester tails the .json file; rsyslog can consume the .log file independently.
|
|
"""
|
|
|
|
import asyncio
|
|
import contextlib
|
|
import json
|
|
import os
|
|
import re
|
|
import threading
|
|
import time
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any, Callable, Optional
|
|
|
|
from decnet.artifacts.shards import find_shard_with_sid
|
|
from decnet.bus import topics as _topics
|
|
from decnet.bus.factory import get_bus
|
|
from decnet.bus.publish import (
|
|
make_thread_safe_publisher,
|
|
run_control_listener_signal,
|
|
run_health_heartbeat,
|
|
)
|
|
from decnet.logging import get_logger
|
|
from decnet.telemetry import traced as _traced, get_tracer as _get_tracer, inject_context as _inject_ctx
|
|
|
|
# Collector publish signature: ``publish_fn(parsed_event_dict)``. Callable
# from the container-stream threads; the worker wraps it around a thread-safe
# bus publisher that marshals onto the asyncio loop.
CollectorPublishFn = Callable[[dict[str, Any]], None]

# Module-level logger shared by every function in this file.
logger = get_logger("collector")
|
|
|
|
# ─── Ingestion rate limiter ───────────────────────────────────────────────────
|
|
#
|
|
# Rationale: connection-lifecycle events (connect/disconnect/accept/close) are
|
|
# emitted once per TCP connection. During a portscan or credential-stuffing
|
|
# run, a single attacker can generate hundreds of these per second from the
|
|
# honeypot services themselves — each becoming a tiny WAL-write transaction
|
|
# through the ingester, starving reads until the queue drains.
|
|
#
|
|
# The collector still writes every line to the raw .log file (forensic record
|
|
# for rsyslog/SIEM). Only the .json path — which feeds SQLite — is deduped.
|
|
#
|
|
# Dedup key: (attacker_ip, decky, service, event_type)
|
|
# Window: DECNET_COLLECTOR_RL_WINDOW_SEC seconds (default 1.0)
|
|
# Scope: DECNET_COLLECTOR_RL_EVENT_TYPES comma list
|
|
# (default: connect,disconnect,connection,accept,close)
|
|
# Events outside that set bypass the limiter untouched.
|
|
|
|
def _parse_float_env(name: str, default: float) -> float:
|
|
raw = os.environ.get(name)
|
|
if raw is None:
|
|
return default
|
|
try:
|
|
value = float(raw)
|
|
except ValueError:
|
|
logger.warning("collector: invalid %s=%r, using default %s", name, raw, default)
|
|
return default
|
|
return max(0.0, value)
|
|
|
|
|
|
_RL_WINDOW_SEC: float = _parse_float_env("DECNET_COLLECTOR_RL_WINDOW_SEC", 1.0)


def _env_csv_set(name: str, default: str) -> frozenset[str]:
    """Return the comma-separated items of env var *name* (or *default*).

    Items are whitespace-stripped and empty items are discarded. A set
    variable always replaces *default*; it is not merged with it.
    """
    raw = os.environ.get(name, default)
    return frozenset(item.strip() for item in raw.split(",") if item.strip())


_RL_EVENT_TYPES: frozenset[str] = _env_csv_set(
    "DECNET_COLLECTOR_RL_EVENT_TYPES",
    "connect,disconnect,connection,accept,close",
)

# Size of the dedup map above which ``_should_ingest`` prunes stale keys.
_RL_MAX_ENTRIES: int = 10_000

# APP-NAMEs we never want to see in the ingestion stream — native unix
# daemons that share a container with a DECNET service. Their logs are
# noise: sshd's "Failed password for root from X" duplicates the
# auth-helper's structured `auth_attempt` event, pam_unix repeats it
# again, and CRON/systemd/etc. say nothing about attacker behavior.
# Override with DECNET_COLLECTOR_DROP_APPS (comma list). The value replaces
# the defaults below, so to extend the list, repeat the defaults in it.
_DROP_APPS: frozenset[str] = _env_csv_set(
    "DECNET_COLLECTOR_DROP_APPS",
    "sshd,pam_unix,sudo,su,CRON,cron,systemd,kernel,rsyslogd,dbus-daemon",
)

# Dedup state for ``_should_ingest``. The map goes from
# (attacker_ip, decky, service, event_type) to the time.monotonic() of the
# last accepted event. The lock guards it across the stream threads.
_rl_lock: threading.Lock = threading.Lock()
_rl_last: dict[tuple[str, str, str, str], float] = {}
|
|
|
|
|
|
def _should_ingest(parsed: dict[str, Any]) -> bool:
    """Decide whether *parsed* goes to the JSON ingestion stream.

    Two filters run in order:

    1. Events whose ``service`` (APP-NAME) is in ``_DROP_APPS`` are dropped
       outright. This covers native unix daemon noise such as sshd and
       pam_unix.
    2. Events whose ``event_type`` is in ``_RL_EVENT_TYPES`` are deduplicated
       per ``(attacker_ip, decky, service, event_type)`` key. Only the first
       event per key inside a ``_RL_WINDOW_SEC`` window is kept. A window of
       ``0`` disables this filter.

    Every other event is accepted. The dedup map is only touched while
    holding ``_rl_lock``, because stream threads call this concurrently.

    Returns:
        True to ingest the event, False to drop it.
    """
    if parsed.get("service", "") in _DROP_APPS:
        return False
    event_type = parsed.get("event_type", "")
    if _RL_WINDOW_SEC <= 0.0 or event_type not in _RL_EVENT_TYPES:
        return True
    key = (
        parsed.get("attacker_ip", "Unknown"),
        parsed.get("decky", ""),
        parsed.get("service", ""),
        event_type,
    )
    now = time.monotonic()
    with _rl_lock:
        last = _rl_last.get(key)
        # time.monotonic() has an undefined reference point. A 0.0 "never
        # seen" default would therefore swallow a key's first event whenever
        # the clock reads less than one window, so treat a miss explicitly.
        if last is not None and now - last < _RL_WINDOW_SEC:
            return False
        _rl_last[key] = now
        # Opportunistic GC: when the map grows past the cap, drop entries older
        # than 60 windows (well outside any realistic in-flight dedup range).
        # NOTE(review): if every entry is still fresh, this full scan repeats
        # on each call until keys age out. Fine for now, but watch it under
        # very wide scans.
        if len(_rl_last) > _RL_MAX_ENTRIES:
            cutoff = now - (_RL_WINDOW_SEC * 60.0)
            stale = [k for k, t in _rl_last.items() if t < cutoff]
            for k in stale:
                del _rl_last[k]
    return True
|
|
|
|
|
|
def _reset_rate_limiter() -> None:
    """Test-only helper — clear dedup state between test cases.

    Empties the ``_should_ingest`` dedup map while holding ``_rl_lock``, so
    every previously seen key is forgotten.
    """
    with _rl_lock:
        _rl_last.clear()
|
|
|
|
|
|
# ─── Session aggregator (TTP `attacker.session.ended` producer) ──────────────
|
|
#
|
|
# The TTP worker subscribes to ``attacker.session.ended`` and turns each
|
|
# emitted command into a ``source_kind="command"`` :class:`TaggerEvent`
|
|
# (see ``decnet/ttp/worker._build_events``). No upstream worker was
|
|
# producing that topic — the rule pack therefore never fired on live
|
|
# traffic. The aggregator below indexes shell-command events
|
|
# per-attacker_ip and emits one ``attacker.session.ended`` envelope
|
|
# whenever the SSH ``sessrec`` worker publishes ``session_recorded``.
|
|
#
|
|
# Memory bound: each attacker_ip's command list is pruned by TTL eviction
# (default 3600 s). The TTL is measured against the timestamp of the newest
# incoming event, not wall-clock time. Override via
# ``DECNET_COLLECTOR_SESSION_AGG_TTL_SEC``.
|
|
|
|
# Seconds an indexed shell command stays eligible for a session envelope
# (default 3600). ``_parse_float_env`` clamps negative values to 0 and falls
# back to the default on unparseable input.
_SESSION_AGG_TTL_SEC: float = _parse_float_env(
    "DECNET_COLLECTOR_SESSION_AGG_TTL_SEC", 3600.0,
)
|
|
|
|
|
|
# Body of a bash PROMPT_COMMAND CMD line:
|
|
# ``CMD uid=0 user=root src=192.168.1.5 pwd=/root cmd=ls /var/www/html``
|
|
# Splits into the structured fields the inspector renders + the
|
|
# residual ``cmd=`` value (which may itself contain spaces — preserve
|
|
# everything after ``cmd=`` as one token, do NOT word-split).
|
|
_CMD_BODY_HEAD_KV_RE = re.compile(r'(\w+)=(\S+)')
|
|
|
|
|
|
def _parse_cmd_msg(msg: str) -> dict[str, str]:
|
|
"""Split a bash CMD msg body into ``{uid, user, src, pwd, command}``.
|
|
|
|
Returns the empty dict on a non-CMD msg. ``command`` carries the
|
|
full post-``cmd=`` rest, including any embedded whitespace —
|
|
tools like ``nmap -p- 192.168.1.0/24`` would otherwise lose
|
|
everything after the first space.
|
|
"""
|
|
if not msg.startswith("CMD "):
|
|
return {}
|
|
head, sep, cmd_rest = msg[4:].partition("cmd=")
|
|
out: dict[str, str] = {}
|
|
for k, v in _CMD_BODY_HEAD_KV_RE.findall(head):
|
|
out[k] = v
|
|
if sep:
|
|
out["command"] = cmd_rest
|
|
return out
|
|
|
|
|
|
def _parse_iso_ts(value: str) -> Optional[datetime]:
|
|
"""Best-effort ISO-8601 parse for parsed event timestamps.
|
|
|
|
The collector's parser stamps ``timestamp`` either as the original
|
|
ISO-8601 string (when ``datetime.fromisoformat`` failed) or as the
|
|
reformatted ``%Y-%m-%d %H:%M:%S`` string. Both round-trip through
|
|
``fromisoformat`` after a space→T swap. Returns None if neither
|
|
shape parses — the aggregator skips events it can't time-stamp.
|
|
"""
|
|
if not value:
|
|
return None
|
|
candidates = (value, value.replace(" ", "T"))
|
|
for cand in candidates:
|
|
try:
|
|
return datetime.fromisoformat(cand)
|
|
except ValueError:
|
|
continue
|
|
return None
|
|
|
|
|
|
class _SessionAggregator:
    """Per-attacker_ip command index that emits ``attacker.session.ended``.

    Thread-safe — :meth:`add_event` is called from the per-container
    stream threads. A single lock guards the command index, but it is held
    only for index bookkeeping: TTL eviction, appending a command, and
    snapshotting one attacker's commands when a session ends.

    The following all run after the lock is released:

    - building the envelope,
    - the on-disk shard lookup (``find_shard_with_sid``),
    - the publish call.

    So a slow filesystem reach for one session does not stall every other
    stream thread. As wired by the collector, ``publish_fn`` is the
    ``make_thread_safe_publisher`` callable that stream threads already
    invoke concurrently for ``system.log``.
    """

    def __init__(
        self,
        publish_fn: Callable[[str, dict[str, Any], str], None],
        *,
        ttl_sec: float = _SESSION_AGG_TTL_SEC,
    ) -> None:
        """
        Args:
            publish_fn: Called as ``publish_fn(topic, payload, event_type)``
                for every emitted ``attacker.session.ended`` envelope.
            ttl_sec: Commands older than this many seconds are evicted.
                "Older" is measured against the newest event's timestamp.
        """
        self._publish = publish_fn
        self._ttl = ttl_sec
        self._lock = threading.Lock()
        # attacker_ip → list of (timestamp, parsed_event) tuples.
        # Stored as a list rather than a deque so the ``in_window``
        # filter can index linearly; the per-attacker volume is
        # bounded by the TTL and by typical session size (≤ a few
        # hundred commands) so this stays cheap.
        self._cmds: dict[str, list[tuple[datetime, dict[str, Any]]]] = {}

    def add_event(self, parsed: dict[str, Any]) -> None:
        """Index a parsed event. Emits on ``session_recorded``.

        The event is ignored when either of these holds:

        - ``attacker_ip`` is missing or ``"Unknown"``;
        - ``timestamp`` does not parse.

        Every accepted event first evicts commands older than the TTL,
        relative to its own timestamp. Then:

        - ``command`` events are appended to the attacker's index;
        - ``session_recorded`` events trigger :meth:`_emit_session` with a
          snapshot of that index.
        """
        event_type = parsed.get("event_type", "")
        attacker_ip = parsed.get("attacker_ip") or ""
        if not attacker_ip or attacker_ip == "Unknown":
            return
        ts = _parse_iso_ts(str(parsed.get("timestamp", "")))
        if ts is None:
            return
        with self._lock:
            self._evict_expired(ts)
            if event_type == "command":
                self._cmds.setdefault(attacker_ip, []).append((ts, parsed))
                return
            if event_type != "session_recorded":
                return
            # Shallow copy: other threads may append to the live list
            # while we iterate the snapshot outside the lock.
            commands_window = list(self._cmds.get(attacker_ip, ()))
        self._emit_session(parsed, attacker_ip, ts, commands_window)

    def _evict_expired(self, now: datetime) -> None:
        """Drop commands older than ``self._ttl`` seconds (caller holds the lock)."""
        cutoff = now.timestamp() - self._ttl
        for ip, entries in list(self._cmds.items()):
            kept = [(t, p) for t, p in entries if t.timestamp() >= cutoff]
            if kept:
                self._cmds[ip] = kept
            else:
                del self._cmds[ip]

    def _emit_session(
        self,
        parsed: dict[str, Any],
        attacker_ip: str,
        ended_at: datetime,
        commands_window: Optional[list[tuple[datetime, dict[str, Any]]]] = None,
    ) -> None:
        """Build an ``attacker.session.ended`` envelope and publish it.

        Keeps the indexed commands whose timestamp is at or after
        ``ended_at - duration_s``. No upper bound is applied. Commands stay
        in the index after the slice; TTL eviction is the only path that
        drops them. So two back-to-back sessions for the same IP share the
        visible window without losing rows.

        Args:
            parsed: The ``session_recorded`` event. Its ``fields`` may carry
                ``duration_s``, ``sid`` and ``service``.
            attacker_ip: Key into the command index.
            ended_at: Parsed timestamp of the ``session_recorded`` event.
            commands_window: Snapshot of ``(timestamp, event)`` pairs to
                slice. When omitted, ``self._cmds`` is read directly, and the
                caller is then responsible for holding ``self._lock``.
        """
        fields = parsed.get("fields", {}) or {}
        duration_raw = fields.get("duration_s") or "0"
        try:
            duration_s = float(duration_raw)
        except (TypeError, ValueError):
            duration_s = 0.0
        sid = str(fields.get("sid") or "")
        service = str(fields.get("service") or parsed.get("service") or "")
        decky = parsed.get("decky") or ""

        if commands_window is None:
            commands_window = self._cmds.get(attacker_ip, [])
        commands = self._build_command_entries(
            commands_window,
            sid=sid,
            attacker_ip=attacker_ip,
            cutoff_lo=ended_at.timestamp() - max(duration_s, 0.0),
        )

        # Resolve the asciinema shard so consumers (notably the BEHAVE-SHELL
        # session-ended handler in the profiler worker) don't each have to
        # disk-reach independently. Additive field; existing TTP consumers
        # ignore it.
        shard_path = self._resolve_shard_path(decky, service, sid)

        payload: dict[str, Any] = {
            "session_id": sid or None,
            "attacker_uuid": None,  # consumer resolves via repo
            "attacker_ip": attacker_ip,
            "decky_id": decky,
            "service": service,
            "ended_at": ended_at.isoformat(),
            "duration_s": duration_s,
            "commands": commands,
            "shard_path": shard_path,
        }
        topic = _topics.attacker(_topics.ATTACKER_SESSION_ENDED)
        try:
            self._publish(topic, payload, _topics.ATTACKER_SESSION_ENDED)
        except Exception as exc:  # noqa: BLE001
            logger.debug(
                "collector: session.ended publish failed: %s", exc,
            )

    @staticmethod
    def _build_command_entries(
        commands_window: list[tuple[datetime, dict[str, Any]]],
        *,
        sid: str,
        attacker_ip: str,
        cutoff_lo: float,
    ) -> list[dict[str, Any]]:
        """Turn indexed command events at/after *cutoff_lo* into inspector rows.

        ``id`` uses the position in *commands_window*, so it stays stable
        across the skipped (too-old) entries.
        """
        commands: list[dict[str, Any]] = []
        for idx, (cmd_ts, cmd_parsed) in enumerate(commands_window):
            if cmd_ts.timestamp() < cutoff_lo:
                continue
            cmd_fields = cmd_parsed.get("fields", {}) or {}
            # Pull structured uid/user/src/pwd/command from the bash
            # msg body. The inspector renders these as separate
            # key/value rows, which is much friendlier than dumping
            # the raw ``CMD uid=0 user=... cmd=...`` string into a
            # single ``command_text`` blob.
            parsed_kv = _parse_cmd_msg(str(cmd_parsed.get("msg", "")))
            cmd_text = (
                cmd_fields.get("command")
                or cmd_fields.get("cmd")
                or parsed_kv.get("command")
                or cmd_parsed.get("msg", "")
            )
            entry: dict[str, Any] = {
                "id": f"{sid}#{idx}" if sid else f"{attacker_ip}-{cmd_ts.isoformat()}",
                "command_text": str(cmd_text),
                "ts": cmd_ts.isoformat(),
                "decky": cmd_parsed.get("decky", ""),
                "service": cmd_parsed.get("service", ""),
            }
            for key in ("uid", "user", "src", "pwd"):
                value = parsed_kv.get(key) or cmd_fields.get(key)
                if value is not None:
                    entry[key] = value
            commands.append(entry)
        return commands

    @staticmethod
    def _resolve_shard_path(decky: str, service: str, sid: str) -> Optional[str]:
        """Locate the session's shard via ``find_shard_with_sid``.

        The lookup is attempted only when *decky*, *service* and *sid* are
        all non-empty. Lookup errors (``ValueError``/``OSError``) are caught.
        Returns the shard path as a string, or None. When *sid* is set but
        no path results, a WARNING is logged.
        """
        if not sid:
            return None
        resolved = None
        resolve_error: str | None = None
        if decky and service:
            try:
                resolved = find_shard_with_sid(decky, service, sid)
            except (ValueError, OSError) as exc:  # PermissionError is an OSError
                resolve_error = f"{type(exc).__name__}: {exc}"
        if resolved is not None:
            return str(resolved)
        # Loud-by-default — the BEHAVE-SHELL handler will skip
        # session.ended events with shard_path=None, so a silent
        # miss here means the profiler panel never hydrates. Usual causes:
        #   1. ARTIFACTS_ROOT not readable by the collector's user
        #      (perm 0750 decnet:decnet vs. User=anti without
        #      SupplementaryGroups=decnet).
        #   2. service whitelist (_SERVICE_RE accepts ssh|telnet only).
        #   3. sessrec hasn't flushed the shard for this sid yet (race with
        #      the collector). Nothing in this class retries the lookup.
        logger.warning(
            "collector: shard_path=None decky=%s service=%s sid=%s "
            "(error=%s) — profiler will skip this session.ended; "
            "check ARTIFACTS_ROOT perms / service whitelist",
            decky, service, sid, resolve_error or "shard not found",
        )
        return None
|
|
|
|
# ─── RFC 5424 parser ──────────────────────────────────────────────────────────
|
|
|
|
_RFC5424_RE = re.compile(
    r"^<\d+>1 "
    r"(\S+) "  # 1: TIMESTAMP
    r"(\S+) "  # 2: HOSTNAME (decky name)
    r"(\S+) "  # 3: APP-NAME (service)
    r"\S+ "    # PROCID — NILVALUE ("-") for syslog_bridge emitters,
               # real PID for native syslog callers like sshd/sudo
               # routed through rsyslog. Accept both; we don't consume it.
    r"(\S+) "  # 4: MSGID (event_type)
    r"(.+)$",  # 5: SD element + optional MSG
)

# Honeypot SSH containers export a ``PROMPT_COMMAND`` that calls
# ``logger --rfc5424 --msgid command -p user.info -t bash "CMD …"``.
# That inner RFC 5424 line lands on the container's stdout, where the
# Docker stream reader prepends ANOTHER RFC 5424 envelope (PRI=14,
# HOSTNAME=<decky>, APP-NAME=1, MSGID=NIL). The outer parse therefore
# sees ``event_type == "-"`` while the real MSGID (``command``) is
# inside the body. We detect that case and re-extract the inner
# ``HOSTNAME APP-NAME PROCID MSGID rest`` so downstream consumers see
# ``event_type == "command"`` plus the real source hostname.
#
# Anchored on an ISO-8601 timestamp at the head of the body so we
# don't false-match free-form prose like "Connection from 1.2.3.4".
_INNER_RFC5424_RE = re.compile(
    r"^(\d{4}-\d{2}-\d{2}T\S+)\s+"  # 1: inner TIMESTAMP
    r"(\S+)\s+"                     # 2: inner HOSTNAME
    r"(\S+)\s+"                     # 3: inner APP-NAME
    r"\S+\s+"                       # PROCID (NIL or PID)
    r"(\S+)\s+"                     # 4: inner MSGID
    r"(.+)$",                       # 5: inner SD/MSG remainder
)

# A quoted RFC 5424 PARAM-VALUE. Inside it, '"', '\' and ']' are escaped
# with a backslash, so a value may legally contain '\]'.
_SD_PARAM_VALUE = r'"(?:[^"\\]|\\.)*"'
# Body of one SD-ELEMENT. Quoted values are consumed as whole units, so an
# escaped ']' inside a value never terminates the element early (a lazy
# ``.*?\]`` would cut the params off right there).
_SD_ELEMENT_BODY = rf'(?:{_SD_PARAM_VALUE}|[^"\]])*'
_SD_BLOCK_RE = re.compile(rf'\[relay@55555\s+({_SD_ELEMENT_BODY})\]', re.DOTALL)
# The leading run of SD-ELEMENTs. The MSG, if any, follows after whitespace.
_SD_HEAD_RE = re.compile(rf'^(?:\[{_SD_ELEMENT_BODY}\])+')
# PARAM-VALUE escapes, decoded in one left-to-right pass. An escaped
# backslash therefore never pairs with the character after it.
_SD_UNESCAPE_RE = re.compile(r'\\(["\\\]])')
_PARAM_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS = ("src_ip", "src", "client_ip", "remote_ip", "remote_addr", "target_ip", "ip")

# Free-form `key=value` pairs in the MSG body. Used for lines that bypass the
# syslog_bridge SD format — e.g. the SSH container's PROMPT_COMMAND which
# calls `logger -t bash "CMD uid=0 user=root src=1.2.3.4 pwd=/root cmd=…"`.
# Values run until the next whitespace, so `cmd=…` at end-of-line is preserved
# as one unit; we only care about IP-shaped fields here anyway.
_MSG_KV_RE = re.compile(r'(\w+)=(\S+)')

# Native sshd / pam syslog lines arrive without an SD block and without
# key=value pairs. The remote address shows up as free prose:
#   "Failed password for root from 1.2.3.4 port 42772 ssh2"
#   "Connection from 1.2.3.4 port 42772 on 10.0.0.2 port 22"
#   "pam_unix(sshd:auth): authentication failure; … rhost=1.2.3.4 user=root"
# Anchored patterns first so we never confuse the attacker with the
# local listener IP ("on 10.0.0.2"). Bare IP scan is the last resort.
_IPV4 = r"\d{1,3}(?:\.\d{1,3}){3}"
_IPV6 = r"[0-9a-fA-F:]+:[0-9a-fA-F:]+"
_IP = rf"(?:{_IPV4}|{_IPV6})"
_MSG_IP_ANCHORED_RE = re.compile(
    rf"\b(?:from|rhost[:=]|client[:=]|src[:=])\s*({_IP})",
    re.IGNORECASE,
)
_MSG_IP_BARE_RE = re.compile(rf"\b({_IPV4})\b")


def parse_rfc5424(line: str) -> Optional[dict[str, Any]]:
    """
    Parse an RFC 5424 DECNET log line into a structured dict.

    Returns None if the line does not match the expected format. Otherwise
    returns a dict with these keys:

    - ``timestamp``: ``%Y-%m-%d %H:%M:%S`` when the header timestamp
      parses, else the raw string.
    - ``decky``, ``service``, ``event_type``.
    - ``attacker_ip``: ``"Unknown"`` when none was found.
    - ``fields``: the unescaped ``relay@55555`` SD params.
    - ``msg``, ``raw_line``.
    """
    m = _RFC5424_RE.match(line)
    if not m:
        return None
    ts_raw, decky, service, event_type, sd_rest = m.groups()

    fields: dict[str, str] = {}

    # Honeypot SSH PROMPT_COMMAND lines are double-wrapped (Docker
    # stdout envelope around the inner ``logger --msgid command`` line).
    # Outer MSGID is NIL; the real MSGID is inside the body. Detect
    # the inner shape and re-extract HOSTNAME / APP-NAME / MSGID /
    # remainder so downstream extraction sees the real header.
    if event_type == "-" and sd_rest.startswith("-"):
        body = sd_rest[1:].lstrip()
        inner = _INNER_RFC5424_RE.match(body)
        if inner is not None:
            _i_ts, i_host, i_app, i_msgid, i_rest = inner.groups()
            decky = i_host
            service = i_app
            event_type = i_msgid
            sd_rest = i_rest

    msg: str = ""
    if sd_rest.startswith("-"):
        msg = sd_rest[1:].lstrip()
    elif sd_rest.startswith("["):
        block = _SD_BLOCK_RE.search(sd_rest)
        if block:
            for k, v in _PARAM_RE.findall(block.group(1)):
                fields[k] = _SD_UNESCAPE_RE.sub(r"\1", v)
        # Always recover the post-SD message tail, even when the SD
        # block isn't ``relay@55555`` (e.g. the ``timeQuality`` block
        # syslog auto-emits on bash CMD lines). Without this the body
        # of unwrapped PROMPT_COMMAND lines stays empty and the
        # attacker_ip kv-fallback below has nothing to scan.
        sd_head = _SD_HEAD_RE.match(sd_rest)
        if sd_head is not None:
            tail = sd_rest[sd_head.end():]
            if tail[:1].isspace():
                msg = tail.strip()
        else:
            # Malformed SD (e.g. an unterminated quote): fall back to the
            # first ``]`` followed by whitespace.
            msg_match = re.search(r'\]\s+(.+)$', sd_rest)
            if msg_match:
                msg = msg_match.group(1).strip()
    else:
        msg = sd_rest

    attacker_ip = "Unknown"
    for fname in _IP_FIELDS:
        if fname in fields:
            attacker_ip = fields[fname]
            break

    # Fallback for plain `logger` callers that don't use SD params (notably
    # the SSH container's bash PROMPT_COMMAND: `logger -t bash "CMD … src=IP …"`).
    # Scan the MSG body for IP-shaped `key=value` tokens ONLY — don't fold
    # them into `fields`, because the frontend's parseEventBody already
    # renders kv pairs from the msg and doubling them up produces noisy
    # duplicate pills. This keeps attacker attribution working without
    # changing the shape of `fields` for non-SD lines.
    if attacker_ip == "Unknown" and msg:
        for k, v in _MSG_KV_RE.findall(msg):
            if k in _IP_FIELDS:
                attacker_ip = v
                break

    # Final fallback for native syslog producers that emit free-form prose
    # (notably sshd and pam_unix routed via rsyslog without the relay@55555
    # SD wrapper). Prefer anchored matches so the local listener address in
    # "Connection from X port Y on Z port 22" never wins over X.
    if attacker_ip == "Unknown" and msg:
        anchored = _MSG_IP_ANCHORED_RE.search(msg)
        if anchored:
            attacker_ip = anchored.group(1)
        else:
            bare = _MSG_IP_BARE_RE.search(msg)
            if bare:
                attacker_ip = bare.group(1)

    # ``fromisoformat`` accepts a trailing "Z" only from Python 3.11 on.
    # Normalising it keeps older interpreters producing the same formatted
    # timestamp. NOTE(review): the strftime below drops any UTC offset; it
    # does not convert to UTC.
    ts_iso = ts_raw[:-1] + "+00:00" if ts_raw[-1:] in ("Z", "z") else ts_raw
    try:
        ts_formatted = datetime.fromisoformat(ts_iso).strftime("%Y-%m-%d %H:%M:%S")
    except ValueError:
        ts_formatted = ts_raw

    # Free-form bash PROMPT_COMMAND lines (MSGID=NIL, body starts with
    # "CMD ") get event_type rewritten to "command". `fields` stays empty
    # so the frontend's msg-based pill rendering doesn't double up.
    if event_type == "-" and msg.startswith("CMD "):
        event_type = "command"

    return {
        "timestamp": ts_formatted,
        "decky": decky,
        "service": service,
        "event_type": event_type,
        "attacker_ip": attacker_ip,
        "fields": fields,
        "msg": msg,
        "raw_line": line,
    }
|
|
|
|
|
|
# ─── Container helpers ────────────────────────────────────────────────────────
|
|
|
|
def _load_service_container_names() -> set[str]:
    """Build the set of expected service container names from decnet-state.json.

    Each name is ``{decky_name}-{service_name}``, with underscores in the
    service name turned into dashes (e.g. ``omega-decky-smtp``). An empty
    set is returned when no state file exists.
    """
    from decnet.config import load_state

    state = load_state()
    if state is None:
        return set()
    config, _ = state
    return {
        f"{decky.name}-{svc.replace('_', '-')}"
        for decky in config.deckies
        for svc in decky.services
    }
|
|
|
|
|
|
_TOPOLOGY_SERVICE_LABEL = "decnet.topology.service"
|
|
_FLEET_SERVICE_LABEL = "decnet.fleet.service"
|
|
|
|
|
|
def _has_decnet_service_label(labels: Optional[dict]) -> bool:
|
|
"""Recognize both fleet (``decnet.fleet.service``, set by
|
|
``decnet/composer.py``) and MazeNET topology (``decnet.topology.service``,
|
|
set by ``decnet/topology/compose.py``) containers.
|
|
|
|
Label-based detection is the canonical path: it's stateless and avoids
|
|
the race between ``docker compose up`` and the ``decnet-state.json``
|
|
write that previously caused freshly-deployed fleet containers to be
|
|
silently dropped by the docker-events watcher.
|
|
"""
|
|
if not labels:
|
|
return False
|
|
return (
|
|
labels.get(_TOPOLOGY_SERVICE_LABEL) == "true"
|
|
or labels.get(_FLEET_SERVICE_LABEL) == "true"
|
|
)
|
|
|
|
|
|
def is_service_container(container) -> bool:
    """Report whether a Docker container is a known DECNET service container.

    A bare string is treated as a container name and checked against
    decnet-state.json only.

    For SDK container objects, labels are read from
    ``attrs["Config"]["Labels"]``, falling back to ``container.labels``.
    A DECNET service label wins. Otherwise the name is matched against the
    state file, so containers from older composes (which predate the
    ``decnet.fleet.service`` label) are still picked up.
    """
    if isinstance(container, str):
        return container.lstrip("/") in _load_service_container_names()

    attrs = getattr(container, "attrs", None)
    labels: Optional[dict] = (
        (attrs.get("Config") or {}).get("Labels") if isinstance(attrs, dict) else None
    )
    if labels is None:
        labels = getattr(container, "labels", None)
    if _has_decnet_service_label(labels):
        return True

    # Legacy, unlabeled containers: fall back to a state-file name match.
    return container.name.lstrip("/") in _load_service_container_names()
|
|
|
|
|
|
def is_service_event(attrs: dict) -> bool:
    """Report whether a Docker start event belongs to a DECNET service container.

    Docker start-event attrs flatten every container label next to the
    ``name``/``image`` keys, with no separate ``labels`` sub-dict. So label
    detection runs on *attrs* itself.

    The label path is checked first because it is race-free. ``decnet deploy``
    writes ``decnet-state.json`` around ``docker compose up``, so a start
    event can arrive before the state file lists the container. Only
    unlabeled events fall back to the state-file name match.
    """
    return _has_decnet_service_label(attrs) or (
        attrs.get("name", "").lstrip("/") in _load_service_container_names()
    )
|
|
|
|
|
|
# ─── Blocking stream worker (runs in a thread) ────────────────────────────────
|
|
|
|
def _reopen_if_needed(path: Path, fh: Optional[Any]) -> Any:
|
|
"""Return fh if it still points to the same inode as path; otherwise close
|
|
fh and open a fresh handle. Handles the file being deleted (manual rm) or
|
|
rotated (logrotate rename + create)."""
|
|
try:
|
|
if fh is not None and os.fstat(fh.fileno()).st_ino == os.stat(path).st_ino:
|
|
return fh
|
|
except OSError:
|
|
pass
|
|
# File gone or inode changed — close stale handle and open a new one.
|
|
if fh is not None:
|
|
try:
|
|
fh.close()
|
|
except Exception: # nosec B110 — best-effort file handle cleanup
|
|
pass
|
|
path.parent.mkdir(parents=True, exist_ok=True)
|
|
return open(path, "a", encoding="utf-8")
|
|
|
|
|
|
@_traced("collector.stream_container")
def _stream_container(
    container_id: str,
    log_path: Path,
    json_path: Path,
    publish_fn: CollectorPublishFn | None = None,
) -> None:
    """Stream logs from one container and append to the host log files.

    Runs in a worker thread until the container's stdout stream ends or
    raises. Each non-empty line goes through these steps:

    1. It is appended to *log_path*.
    2. If it parses as RFC 5424 and passes :func:`_should_ingest`, it is
       also written as one JSON object to *json_path*.
    3. It is then handed to *publish_fn*, when one is given.

    Both handles go through :func:`_reopen_if_needed`, so deletion or
    rotation of either file is tolerated. Both are closed on exit. Any
    exception, including publish failures, is logged at DEBUG and never
    propagates.
    """
    import codecs

    import docker

    lf: Optional[Any] = None
    jf: Optional[Any] = None
    try:
        client = docker.from_env()
        container = client.containers.get(container_id)
        log_stream = container.logs(stream=True, follow=True, stdout=True, stderr=False)
        # Incremental decoder: a multi-byte UTF-8 sequence split across two
        # stream chunks is reassembled instead of becoming two U+FFFD
        # characters. Genuinely invalid bytes are still replaced.
        decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
        tracer = _get_tracer("collector")
        buf = ""
        for chunk in log_stream:
            buf += decoder.decode(chunk)
            # Everything before the last "\n" is complete lines; the
            # remainder waits for the next chunk.
            *complete, buf = buf.split("\n")
            for raw in complete:
                line = raw.rstrip()
                if not line:
                    continue
                lf = _reopen_if_needed(log_path, lf)
                lf.write(line + "\n")
                lf.flush()
                parsed = parse_rfc5424(line)
                if not parsed:
                    logger.debug("collector: malformed RFC5424 line snippet=%r", line[:80])
                    continue
                if not _should_ingest(parsed):
                    logger.debug(
                        "collector: rate-limited decky=%s service=%s type=%s attacker=%s",
                        parsed.get("decky"), parsed.get("service"),
                        parsed.get("event_type"), parsed.get("attacker_ip"),
                    )
                    continue
                with tracer.start_as_current_span("collector.event") as span:
                    span.set_attribute("decky", parsed.get("decky", ""))
                    span.set_attribute("service", parsed.get("service", ""))
                    span.set_attribute("event_type", parsed.get("event_type", ""))
                    span.set_attribute("attacker_ip", parsed.get("attacker_ip", ""))
                    _inject_ctx(parsed)
                    logger.debug("collector: event written decky=%s type=%s", parsed.get("decky"), parsed.get("event_type"))
                    jf = _reopen_if_needed(json_path, jf)
                    jf.write(json.dumps(parsed) + "\n")
                    jf.flush()
                    if publish_fn is not None:
                        try:
                            publish_fn(parsed)
                        except Exception as exc:
                            logger.debug(
                                "collector: bus publish failed: %s", exc,
                            )
    except Exception as exc:
        logger.debug("collector: log stream ended container_id=%s reason=%s", container_id, exc)
    finally:
        for fh in (lf, jf):
            if fh is not None:
                try:
                    fh.close()
                except Exception:  # nosec B110 — best-effort file handle cleanup
                    pass
|
|
|
|
|
|
# ─── Bus plumbing ─────────────────────────────────────────────────────────────
|
|
|
|
def _make_system_log_publisher(
    bus: Any, loop: asyncio.AbstractEventLoop,
) -> CollectorPublishFn:
    """Factory: returns a ``publish_fn(parsed)`` for use by stream threads.

    When *bus* is ``None`` the returned callable is a no-op, so the stream
    thread can call it unconditionally. Otherwise each call is marshalled
    onto *loop* (the asyncio event loop that owns the bus socket) via
    ``make_thread_safe_publisher``.

    The same call also feeds a :class:`_SessionAggregator`, so shell
    commands are indexed per attacker_ip. ``attacker.session.ended`` then
    fires whenever the SSH ``sessrec`` worker logs ``session_recorded``.
    Indexing runs even if the ``system.log`` publish raises. The exception
    still propagates to the caller afterwards.
    """
    raw_publish = make_thread_safe_publisher(bus, loop) if bus is not None else None
    if raw_publish is None:
        return lambda _parsed: None

    topic = _topics.system(_topics.SYSTEM_LOG)
    aggregator = _SessionAggregator(raw_publish)

    def _publish(parsed: dict[str, Any]) -> None:
        event_type = parsed.get("event_type", "")
        try:
            raw_publish(
                topic,
                {
                    "decky": parsed.get("decky", ""),
                    "service": parsed.get("service", ""),
                    "event_type": event_type,
                    "attacker_ip": parsed.get("attacker_ip", "Unknown"),
                    "timestamp": parsed.get("timestamp", ""),
                },
                event_type,
            )
        finally:
            # The stream thread swallows publish errors. Skipping the index
            # here would silently drop this command from every later
            # session.ended envelope for the attacker.
            aggregator.add_event(parsed)

    return _publish
|
|
|
|
|
|
# ─── Async collector ──────────────────────────────────────────────────────────
|
|
|
|
async def log_collector_worker(log_file: str) -> None:
|
|
"""
|
|
Background task: streams Docker logs from all running decky service
|
|
containers, writing RFC 5424 lines to log_file and parsed JSON records
|
|
to log_file.json for the ingester to consume.
|
|
|
|
Watches Docker events to pick up containers started after initial scan.
|
|
"""
|
|
import docker
|
|
|
|
log_path = Path(log_file)
|
|
json_path = log_path.with_suffix(".json")
|
|
log_path.parent.mkdir(parents=True, exist_ok=True)
|
|
|
|
active: dict[str, asyncio.Task[None]] = {}
|
|
loop = asyncio.get_running_loop()
|
|
|
|
# Optional bus wiring — per-line system.log publish. Fan-in from many
|
|
# container-stream threads is handled by make_thread_safe_publisher,
|
|
# which marshals each publish onto this loop.
|
|
bus = None
|
|
try:
|
|
bus = get_bus(client_name="collector")
|
|
await bus.connect()
|
|
except Exception as exc:
|
|
logger.warning("collector: bus unavailable, continuing without publish: %s", exc)
|
|
bus = None
|
|
|
|
_publish_log = _make_system_log_publisher(bus, loop)
|
|
|
|
# Workers panel health heartbeat + bus-driven stop control. The
|
|
# heartbeat beacons on system.collector.health every 30s; the
|
|
# control listener translates a bus stop intent into a SIGTERM to
|
|
# this process (collector's main loop is a blocking thread pool, so
|
|
# self-signalling is cleaner than threading a shutdown event).
|
|
heartbeat_task = asyncio.create_task(run_health_heartbeat(bus, "collector"))
|
|
control_task = asyncio.create_task(run_control_listener_signal(bus, "collector"))
|
|
|
|
# Periodic re-scan of running containers. Belt to the event-watcher's
|
|
# suspenders: if dockerd or the SDK ever drops a start event during a
|
|
# reconnect window (the retry loop in ``_watch_events`` covers the
|
|
# restart itself, but events fired *during* the gap are lost), this
|
|
# loop picks up the orphan within ``RECONCILE_INTERVAL_S``. Also
|
|
# prunes finished futures so ``active`` doesn't accumulate over the
|
|
# agent's lifetime as topology mutations churn containers.
|
|
_reconcile_interval_s = float(
|
|
os.environ.get("DECNET_COLLECTOR_RECONCILE_S", "30")
|
|
)
|
|
|
|
# Dedicated thread pool so long-running container log streams don't
|
|
# saturate the default asyncio executor and starve short-lived
|
|
# to_thread() calls elsewhere (e.g. load_state in the web API).
|
|
collector_pool = ThreadPoolExecutor(
|
|
max_workers=64, thread_name_prefix="decnet-collector",
|
|
)
|
|
|
|
def _spawn(container_id: str, container_name: str) -> None:
    """Start streaming logs for *container_id* unless a live stream exists.

    The stream runs ``_stream_container`` on ``collector_pool`` and its
    future is stored in ``active`` under the container id. If the previous
    future for this id has already finished, a fresh stream replaces it.
    """
    current = active.get(container_id)
    if current is not None and not current.done():
        # A stream for this container is still running; nothing to do.
        return
    stream_future = loop.run_in_executor(
        collector_pool,
        _stream_container,
        container_id,
        log_path,
        json_path,
        _publish_log,
    )
    active[container_id] = asyncio.ensure_future(stream_future, loop=loop)
    logger.info("collector: streaming container=%s", container_name)
|
|
|
|
try:
|
|
logger.info("collector started log_path=%s", log_path)
|
|
client = docker.from_env()
|
|
|
|
async def _reconcile_loop() -> None:
    """Periodically re-attach log streams the event watcher may have missed.

    Every ``_reconcile_interval_s`` seconds this does three things:

    - prunes finished stream futures from ``active``, logging any that
      ended with an exception;
    - lists the running containers on ``collector_pool``;
    - calls ``_spawn`` for each service container that is not already
      tracked.

    Cancellation propagates. Any other error is logged, and the loop
    continues with the next pass.
    """
    while True:
        try:
            await asyncio.sleep(_reconcile_interval_s)
            # Drop done futures so the dict's bounded by the current
            # container count, not lifetime churn.
            for cid in [c for c, t in active.items() if t.done()]:
                finished = active.pop(cid, None)
                # Retrieve the outcome before discarding the future.
                # Otherwise a stream that died with an exception vanishes
                # without any log naming the container; asyncio only
                # reports an unattributed "exception was never retrieved"
                # when the future is garbage-collected.
                # ``exception()`` raises CancelledError on a cancelled
                # future, which would tear down this loop, so skip those.
                if finished is None or finished.cancelled():
                    continue
                stream_exc = finished.exception()
                if stream_exc is not None:
                    logger.warning(
                        "collector: stream for container=%s ended with error: %s",
                        cid, stream_exc,
                    )
            containers = await loop.run_in_executor(
                collector_pool,
                lambda: list(client.containers.list()),
            )
            for container in containers:
                if container.id in active:
                    continue
                if is_service_container(container):
                    _spawn(container.id, container.name.lstrip("/"))
        except asyncio.CancelledError:
            raise
        except Exception as exc:  # noqa: BLE001 — keep loop alive across SDK transients
            logger.warning("collector: reconcile pass failed: %s", exc)
|
|
|
|
reconcile_task = asyncio.create_task(_reconcile_loop())
|
|
|
|
for container in client.containers.list():
|
|
if is_service_container(container):
|
|
_spawn(container.id, container.name.lstrip("/"))
|
|
|
|
def _watch_events() -> None:
    """Spawn log streams for service containers as dockerd reports them starting.

    Runs as a blocking function on ``collector_pool``. It subscribes to
    container ``start`` events and, for each event that has an id and
    passes ``is_service_event``, schedules ``_spawn`` on the event loop via
    ``call_soon_threadsafe``.

    The dockerd event stream is the fast path for picking up
    newly-started service containers. It can break in two ways:

    (a) a dockerd restart or reload severs the long-poll;
    (b) the SDK's JSON-stream decoder occasionally raises on a partial
        frame.

    Both used to make this thread return cleanly. The collector then kept
    "running" with no event subscription, and future container starts were
    silently dropped until an operator restarted the unit.

    This version retries with exponential backoff, capped at 30s to match
    the heartbeat cadence, so dockerd hiccups are invisible to the
    operator. The reconcile loop is the safety net for any events lost
    during the reconnect window.

    Returns only when the event iterator is exhausted cleanly.
    """
    max_backoff_s = 30.0
    backoff = 1.0
    while True:
        # Set once dockerd has accepted the subscription. On failure it
        # tells a long-lived stream that eventually broke apart from a
        # reconnect that failed (or broke) straight away.
        opened_at: Optional[float] = None
        try:
            events = client.events(
                decode=True,
                filters={"type": "container", "event": "start"},
            )
            opened_at = time.monotonic()
            for event in events:
                attrs = event.get("Actor", {}).get("Attributes", {})
                cid = event.get("id", "")
                name = attrs.get("name", "")
                if cid and is_service_event(attrs):
                    loop.call_soon_threadsafe(_spawn, cid, name)
            # Clean iterator exhaustion. Real dockerd doesn't close the
            # stream voluntarily, so this only happens in tests with
            # mocked iterators or in genuinely unrecoverable daemon
            # states. Either way, returning lets the worker shut down
            # cleanly; the reconciler is the safety net for productive
            # cases.
            return
        except Exception as exc:  # noqa: BLE001 — SDK leaks bare Exceptions on stream-decode errors
            # Without a reset the delay only ever grows. A handful of
            # unrelated hiccups over the agent's lifetime would pin every
            # later reconnect at the cap and widen the window in which
            # start events are lost. A stream that stayed up for at least
            # one cap interval was healthy, so its failure starts over at
            # 1s. Back-to-back failures still back off exponentially.
            if opened_at is not None and time.monotonic() - opened_at >= max_backoff_s:
                backoff = 1.0
            logger.warning(
                "collector: event stream broke (%s: %s); reconnecting in %.1fs",
                type(exc).__name__, exc, backoff,
            )
            time.sleep(backoff)
            backoff = min(backoff * 2, max_backoff_s)
|
|
|
|
await loop.run_in_executor(collector_pool, _watch_events)
|
|
|
|
except asyncio.CancelledError:
|
|
logger.info("collector shutdown requested cancelling %d tasks", len(active))
|
|
for task in active.values():
|
|
task.cancel()
|
|
collector_pool.shutdown(wait=False)
|
|
raise
|
|
except Exception as exc:
|
|
logger.error("collector error: %s", exc)
|
|
finally:
|
|
collector_pool.shutdown(wait=False)
|
|
# `reconcile_task` may not exist if startup failed before
|
|
# `client = docker.from_env()` returned; tolerate that.
|
|
_maintenance_tasks = [heartbeat_task, control_task]
|
|
if "reconcile_task" in locals():
|
|
_maintenance_tasks.append(reconcile_task)
|
|
for t in _maintenance_tasks:
|
|
t.cancel()
|
|
with contextlib.suppress(Exception, asyncio.CancelledError):
|
|
await t
|
|
if bus is not None:
|
|
with contextlib.suppress(Exception):
|
|
await bus.close()
|