refactor: separate engine, collector, mutator, and fleet into independent subpackages
- decnet/engine/ — container lifecycle (deploy, teardown, status); _kill_api removed - decnet/collector/ — Docker log streaming (moved from web/collector.py) - decnet/mutator/ — mutation engine (no longer imports from cli or duplicates deployer code) - decnet/fleet.py — shared decky-building logic extracted from cli.py Cross-contamination eliminated: - web router no longer imports from decnet.cli - mutator no longer imports from decnet.cli - cli no longer imports from decnet.web - _kill_api() moved to cli (process management, not engine concern) - _compose_with_retry duplicate removed from mutator
This commit is contained in:
13
decnet/collector/__init__.py
Normal file
13
decnet/collector/__init__.py
Normal file
@@ -0,0 +1,13 @@
|
||||
from decnet.collector.worker import (
|
||||
is_service_container,
|
||||
is_service_event,
|
||||
log_collector_worker,
|
||||
parse_rfc5424,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"is_service_container",
|
||||
"is_service_event",
|
||||
"log_collector_worker",
|
||||
"parse_rfc5424",
|
||||
]
|
||||
200
decnet/collector/worker.py
Normal file
200
decnet/collector/worker.py
Normal file
@@ -0,0 +1,200 @@
|
||||
"""
|
||||
Host-side Docker log collector.
|
||||
|
||||
Streams stdout from all running decky service containers via the Docker SDK,
|
||||
writes RFC 5424 lines to <log_file> and parsed JSON records to <log_file>.json.
|
||||
The ingester tails the .json file; rsyslog can consume the .log file independently.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional
|
||||
|
||||
logger = logging.getLogger("decnet.collector")
|
||||
|
||||
# ─── RFC 5424 parser ──────────────────────────────────────────────────────────
|
||||
|
||||
_RFC5424_RE = re.compile(
|
||||
r"^<\d+>1 "
|
||||
r"(\S+) " # 1: TIMESTAMP
|
||||
r"(\S+) " # 2: HOSTNAME (decky name)
|
||||
r"(\S+) " # 3: APP-NAME (service)
|
||||
r"- " # PROCID always NILVALUE
|
||||
r"(\S+) " # 4: MSGID (event_type)
|
||||
r"(.+)$", # 5: SD element + optional MSG
|
||||
)
|
||||
_SD_BLOCK_RE = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
|
||||
_PARAM_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
|
||||
_IP_FIELDS = ("src_ip", "src", "client_ip", "remote_ip", "ip")
|
||||
|
||||
|
||||
def parse_rfc5424(line: str) -> Optional[dict[str, Any]]:
|
||||
"""
|
||||
Parse an RFC 5424 DECNET log line into a structured dict.
|
||||
Returns None if the line does not match the expected format.
|
||||
"""
|
||||
m = _RFC5424_RE.match(line)
|
||||
if not m:
|
||||
return None
|
||||
ts_raw, decky, service, event_type, sd_rest = m.groups()
|
||||
|
||||
fields: dict[str, str] = {}
|
||||
msg: str = ""
|
||||
|
||||
if sd_rest.startswith("-"):
|
||||
msg = sd_rest[1:].lstrip()
|
||||
elif sd_rest.startswith("["):
|
||||
block = _SD_BLOCK_RE.search(sd_rest)
|
||||
if block:
|
||||
for k, v in _PARAM_RE.findall(block.group(1)):
|
||||
fields[k] = v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
|
||||
msg_match = re.search(r'\]\s+(.+)$', sd_rest)
|
||||
if msg_match:
|
||||
msg = msg_match.group(1).strip()
|
||||
else:
|
||||
msg = sd_rest
|
||||
|
||||
attacker_ip = "Unknown"
|
||||
for fname in _IP_FIELDS:
|
||||
if fname in fields:
|
||||
attacker_ip = fields[fname]
|
||||
break
|
||||
|
||||
try:
|
||||
ts_formatted = datetime.fromisoformat(ts_raw).strftime("%Y-%m-%d %H:%M:%S")
|
||||
except ValueError:
|
||||
ts_formatted = ts_raw
|
||||
|
||||
return {
|
||||
"timestamp": ts_formatted,
|
||||
"decky": decky,
|
||||
"service": service,
|
||||
"event_type": event_type,
|
||||
"attacker_ip": attacker_ip,
|
||||
"fields": fields,
|
||||
"msg": msg,
|
||||
"raw_line": line,
|
||||
}
|
||||
|
||||
|
||||
# ─── Container helpers ────────────────────────────────────────────────────────
|
||||
|
||||
def _load_service_container_names() -> set[str]:
|
||||
"""
|
||||
Return the exact set of service container names from decnet-state.json.
|
||||
Format: {decky_name}-{service_name}, e.g. 'omega-decky-smtp'.
|
||||
Returns an empty set if no state file exists.
|
||||
"""
|
||||
from decnet.config import load_state
|
||||
state = load_state()
|
||||
if state is None:
|
||||
return set()
|
||||
config, _ = state
|
||||
names: set[str] = set()
|
||||
for decky in config.deckies:
|
||||
for svc in decky.services:
|
||||
names.add(f"{decky.name}-{svc.replace('_', '-')}")
|
||||
return names
|
||||
|
||||
|
||||
def is_service_container(container) -> bool:
|
||||
"""Return True if this Docker container is a known DECNET service container."""
|
||||
name = (container if isinstance(container, str) else container.name).lstrip("/")
|
||||
return name in _load_service_container_names()
|
||||
|
||||
|
||||
def is_service_event(attrs: dict) -> bool:
|
||||
"""Return True if a Docker start event is for a known DECNET service container."""
|
||||
name = attrs.get("name", "").lstrip("/")
|
||||
return name in _load_service_container_names()
|
||||
|
||||
|
||||
# ─── Blocking stream worker (runs in a thread) ────────────────────────────────
|
||||
|
||||
def _stream_container(container_id: str, log_path: Path, json_path: Path) -> None:
|
||||
"""Stream logs from one container and append to the host log files."""
|
||||
import docker # type: ignore[import]
|
||||
|
||||
try:
|
||||
client = docker.from_env()
|
||||
container = client.containers.get(container_id)
|
||||
log_stream = container.logs(stream=True, follow=True, stdout=True, stderr=False)
|
||||
buf = ""
|
||||
with (
|
||||
open(log_path, "a", encoding="utf-8") as lf,
|
||||
open(json_path, "a", encoding="utf-8") as jf,
|
||||
):
|
||||
for chunk in log_stream:
|
||||
buf += chunk.decode("utf-8", errors="replace")
|
||||
while "\n" in buf:
|
||||
line, buf = buf.split("\n", 1)
|
||||
line = line.rstrip()
|
||||
if not line:
|
||||
continue
|
||||
lf.write(line + "\n")
|
||||
lf.flush()
|
||||
parsed = parse_rfc5424(line)
|
||||
if parsed:
|
||||
jf.write(json.dumps(parsed) + "\n")
|
||||
jf.flush()
|
||||
except Exception as exc:
|
||||
logger.debug("Log stream ended for container %s: %s", container_id, exc)
|
||||
|
||||
|
||||
# ─── Async collector ──────────────────────────────────────────────────────────
|
||||
|
||||
async def log_collector_worker(log_file: str) -> None:
|
||||
"""
|
||||
Background task: streams Docker logs from all running decky service
|
||||
containers, writing RFC 5424 lines to log_file and parsed JSON records
|
||||
to log_file.json for the ingester to consume.
|
||||
|
||||
Watches Docker events to pick up containers started after initial scan.
|
||||
"""
|
||||
import docker # type: ignore[import]
|
||||
|
||||
log_path = Path(log_file)
|
||||
json_path = log_path.with_suffix(".json")
|
||||
log_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
active: dict[str, asyncio.Task[None]] = {}
|
||||
loop = asyncio.get_running_loop()
|
||||
|
||||
def _spawn(container_id: str, container_name: str) -> None:
|
||||
if container_id not in active or active[container_id].done():
|
||||
active[container_id] = asyncio.ensure_future(
|
||||
asyncio.to_thread(_stream_container, container_id, log_path, json_path),
|
||||
loop=loop,
|
||||
)
|
||||
logger.info("Collecting logs from container: %s", container_name)
|
||||
|
||||
try:
|
||||
client = docker.from_env()
|
||||
|
||||
for container in client.containers.list():
|
||||
if is_service_container(container):
|
||||
_spawn(container.id, container.name.lstrip("/"))
|
||||
|
||||
def _watch_events() -> None:
|
||||
for event in client.events(
|
||||
decode=True,
|
||||
filters={"type": "container", "event": "start"},
|
||||
):
|
||||
attrs = event.get("Actor", {}).get("Attributes", {})
|
||||
cid = event.get("id", "")
|
||||
name = attrs.get("name", "")
|
||||
if cid and is_service_event(attrs):
|
||||
loop.call_soon_threadsafe(_spawn, cid, name)
|
||||
|
||||
await asyncio.to_thread(_watch_events)
|
||||
|
||||
except asyncio.CancelledError:
|
||||
for task in active.values():
|
||||
task.cancel()
|
||||
raise
|
||||
except Exception as exc:
|
||||
logger.error("Collector error: %s", exc)
|
||||
Reference in New Issue
Block a user