feat: replace bind-mount log pipeline with Docker log streaming

Services now print RFC 5424 to stdout; Docker captures via json-file driver.
A new host-side collector (decnet.web.collector) streams docker logs from all
running decky service containers and writes RFC 5424 + parsed JSON to the host
log file. The existing ingester continues to tail the .json file unchanged.
rsyslog can consume the .log file independently — no DECNET involvement needed.

Removes: bind-mount volume injection, _LOG_NETWORK bridge, log_target config
field and --log-target CLI flag, TCP syslog forwarding from service templates.
This commit is contained in:
2026-04-10 00:14:14 -04:00
parent 8d023147cc
commit 25ba3fb56a
11 changed files with 368 additions and 328 deletions

View File

@@ -1,23 +1,26 @@
import asyncio
import logging
import os
from contextlib import asynccontextmanager
from typing import Any, AsyncGenerator, Optional
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from decnet.env import DECNET_CORS_ORIGINS, DECNET_DEVELOPER
from decnet.env import DECNET_CORS_ORIGINS, DECNET_DEVELOPER, DECNET_INGEST_LOG_FILE
from decnet.web.dependencies import repo
from decnet.web.collector import log_collector_worker
from decnet.web.ingester import log_ingestion_worker
from decnet.web.router import api_router
log = logging.getLogger(__name__)
ingestion_task: Optional[asyncio.Task[Any]] = None
collector_task: Optional[asyncio.Task[Any]] = None
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
global ingestion_task
global ingestion_task, collector_task
for attempt in range(1, 6):
try:
@@ -28,16 +31,24 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
if attempt == 5:
log.error("DB failed to initialize after 5 attempts — startup may be degraded")
await asyncio.sleep(0.5)
# Start background ingestion task
if ingestion_task is None or ingestion_task.done():
ingestion_task = asyncio.create_task(log_ingestion_worker(repo))
# Start Docker log collector (writes to log file; ingester reads from it)
_log_file = os.environ.get("DECNET_INGEST_LOG_FILE", DECNET_INGEST_LOG_FILE)
if _log_file and (collector_task is None or collector_task.done()):
collector_task = asyncio.create_task(log_collector_worker(_log_file))
else:
log.warning("DECNET_INGEST_LOG_FILE not set — Docker log collection disabled.")
yield
# Shutdown ingestion task
if ingestion_task:
ingestion_task.cancel()
# Shutdown background tasks
for task in (ingestion_task, collector_task):
if task:
task.cancel()
app: FastAPI = FastAPI(

180
decnet/web/collector.py Normal file
View File

@@ -0,0 +1,180 @@
"""
Host-side Docker log collector.
Streams stdout from all running decky service containers via the Docker SDK,
writes RFC 5424 lines to <log_file> and parsed JSON records to <log_file>.json.
The ingester tails the .json file; rsyslog can consume the .log file independently.
"""
import asyncio
import json
import logging
import re
from datetime import datetime
from pathlib import Path
from typing import Any, Optional
logger = logging.getLogger("decnet.web.collector")
# ─── RFC 5424 parser ──────────────────────────────────────────────────────────
_RFC5424_RE = re.compile(
r"^<\d+>1 "
r"(\S+) " # 1: TIMESTAMP
r"(\S+) " # 2: HOSTNAME (decky name)
r"(\S+) " # 3: APP-NAME (service)
r"- " # PROCID always NILVALUE
r"(\S+) " # 4: MSGID (event_type)
r"(.+)$", # 5: SD element + optional MSG
)
_SD_BLOCK_RE = re.compile(r'\[decnet@55555\s+(.*?)\]', re.DOTALL)
_PARAM_RE = re.compile(r'(\w+)="((?:[^"\\]|\\.)*)"')
_IP_FIELDS = ("src_ip", "src", "client_ip", "remote_ip", "ip")
def parse_rfc5424(line: str) -> Optional[dict[str, Any]]:
"""
Parse an RFC 5424 DECNET log line into a structured dict.
Returns None if the line does not match the expected format.
"""
m = _RFC5424_RE.match(line)
if not m:
return None
ts_raw, decky, service, event_type, sd_rest = m.groups()
fields: dict[str, str] = {}
msg: str = ""
if sd_rest.startswith("-"):
msg = sd_rest[1:].lstrip()
elif sd_rest.startswith("["):
block = _SD_BLOCK_RE.search(sd_rest)
if block:
for k, v in _PARAM_RE.findall(block.group(1)):
fields[k] = v.replace('\\"', '"').replace("\\\\", "\\").replace("\\]", "]")
msg_match = re.search(r'\]\s+(.+)$', sd_rest)
if msg_match:
msg = msg_match.group(1).strip()
else:
msg = sd_rest
attacker_ip = "Unknown"
for fname in _IP_FIELDS:
if fname in fields:
attacker_ip = fields[fname]
break
try:
ts_formatted = datetime.fromisoformat(ts_raw).strftime("%Y-%m-%d %H:%M:%S")
except ValueError:
ts_formatted = ts_raw
return {
"timestamp": ts_formatted,
"decky": decky,
"service": service,
"event_type": event_type,
"attacker_ip": attacker_ip,
"fields": fields,
"msg": msg,
"raw_line": line,
}
# ─── Container helpers ────────────────────────────────────────────────────────
def is_service_container(name: str) -> bool:
"""
Return True for decky service containers (decky-NN-service).
Base containers (decky-NN, which run sleep infinity) return False.
"""
return bool(re.match(r'^decky-\d+-\w', name.lstrip("/")))
# ─── Blocking stream worker (runs in a thread) ────────────────────────────────
def _stream_container(container_id: str, log_path: Path, json_path: Path) -> None:
"""Stream logs from one container and append to the host log files."""
import docker # type: ignore[import]
try:
client = docker.from_env()
container = client.containers.get(container_id)
log_stream = container.logs(stream=True, follow=True, stdout=True, stderr=False)
buf = ""
with (
open(log_path, "a", encoding="utf-8") as lf,
open(json_path, "a", encoding="utf-8") as jf,
):
for chunk in log_stream:
buf += chunk.decode("utf-8", errors="replace")
while "\n" in buf:
line, buf = buf.split("\n", 1)
line = line.rstrip()
if not line:
continue
lf.write(line + "\n")
lf.flush()
parsed = parse_rfc5424(line)
if parsed:
jf.write(json.dumps(parsed) + "\n")
jf.flush()
except Exception as exc:
logger.debug("Log stream ended for container %s: %s", container_id, exc)
# ─── Async collector ──────────────────────────────────────────────────────────
async def log_collector_worker(log_file: str) -> None:
"""
Background task: streams Docker logs from all running decky service
containers, writing RFC 5424 lines to log_file and parsed JSON records
to log_file.json for the ingester to consume.
Watches Docker events to pick up containers started after initial scan.
"""
import docker # type: ignore[import]
log_path = Path(log_file)
json_path = log_path.with_suffix(".json")
log_path.parent.mkdir(parents=True, exist_ok=True)
active: dict[str, asyncio.Task[None]] = {}
loop = asyncio.get_running_loop()
def _spawn(container_id: str, container_name: str) -> None:
if container_id not in active or active[container_id].done():
active[container_id] = asyncio.ensure_future(
asyncio.to_thread(_stream_container, container_id, log_path, json_path),
loop=loop,
)
logger.info("Collecting logs from container: %s", container_name)
try:
client = docker.from_env()
# Collect from already-running containers
for container in client.containers.list():
name = container.name.lstrip("/")
if is_service_container(name):
_spawn(container.id, name)
# Watch for new containers starting
def _watch_events() -> None:
for event in client.events(
decode=True,
filters={"type": "container", "event": "start"},
):
name = event.get("Actor", {}).get("Attributes", {}).get("name", "")
cid = event.get("id", "")
if cid and is_service_container(name):
loop.call_soon_threadsafe(_spawn, cid, name)
await asyncio.to_thread(_watch_events)
except asyncio.CancelledError:
for task in active.values():
task.cancel()
raise
except Exception as exc:
logger.error("Collector error: %s", exc)

View File

@@ -50,7 +50,6 @@ async def api_deploy_deckies(req: DeployIniRequest, current_user: str = Depends(
subnet=subnet_cidr,
gateway=gateway,
deckies=[],
log_target=ini.log_target,
log_file=ingest_log_file,
ipvlan=False,
mutate_interval=ini.mutate_interval or DEFAULT_MUTATE_INTERVAL