fix(api): gate embedded Docker-log collector on DECNET_EMBED_COLLECTOR
The API lifespan unconditionally spawned log_collector_worker, appending every container line to DECNET_INGEST_LOG_FILE. On hosts that also run decnet-collector.service (installed by 'decnet init') that's two tailers writing the same events to the same file — the ingester then inserts each event twice and the dashboard shows every command duplicated. Add DECNET_EMBED_COLLECTOR (default false), matching the existing DECNET_EMBED_PROFILER and DECNET_EMBED_SNIFFER pattern directly above this block. Single-process dev setups without systemd can flip it on to restore the all-in-one behaviour; multi-process production gets the single-writer invariant by default.
This commit is contained in:
@@ -70,6 +70,15 @@ DECNET_EMBED_PROFILER: bool = os.environ.get("DECNET_EMBED_PROFILER", "").lower(
|
||||
# workers sniffing the same interface — duplicated events and wasted CPU.
|
||||
DECNET_EMBED_SNIFFER: bool = os.environ.get("DECNET_EMBED_SNIFFER", "").lower() == "true"
|
||||
|
||||
# Set to "true" to embed the Docker log collector inside the API process.
|
||||
# Leave unset (default) when `decnet-collector.service` (or a standalone
|
||||
# `decnet collect --daemon`) is running — embedding both yields two
|
||||
# tailers appending every container log line to the ingest file, which
|
||||
# the ingester then inserts into the DB twice. Single-process dev
|
||||
# setups without systemd units can flip this on to get the old all-in
|
||||
# -one behaviour.
|
||||
DECNET_EMBED_COLLECTOR: bool = os.environ.get("DECNET_EMBED_COLLECTOR", "").lower() == "true"
|
||||
|
||||
# Set to "true" to mount the Pyinstrument ASGI middleware on the FastAPI app.
|
||||
# Produces per-request HTML flamegraphs under ./profiles/. Off by default so
|
||||
# production and normal dev runs pay zero profiling overhead.
|
||||
|
||||
@@ -14,6 +14,7 @@ from fastapi.middleware.cors import CORSMiddleware
|
||||
from decnet.env import (
|
||||
DECNET_CORS_ORIGINS,
|
||||
DECNET_DEVELOPER,
|
||||
DECNET_EMBED_COLLECTOR,
|
||||
DECNET_EMBED_PROFILER,
|
||||
DECNET_EMBED_SNIFFER,
|
||||
DECNET_INGEST_LOG_FILE,
|
||||
@@ -87,13 +88,25 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
|
||||
ingestion_task = asyncio.create_task(log_ingestion_worker(repo))
|
||||
log.debug("API startup ingest worker started")
|
||||
|
||||
# Start Docker log collector (writes to log file; ingester reads from it)
|
||||
# Start Docker log collector (writes to log file; ingester reads from it).
|
||||
# Gated on DECNET_EMBED_COLLECTOR: when `decnet-collector.service` (or
|
||||
# any other standalone collector) is running, embedding a second tailer
|
||||
# here writes every container line twice — the ingester then inserts
|
||||
# the same event into the DB twice, which surfaces as duplicate rows
|
||||
# on the dashboard.
|
||||
_log_file = os.environ.get("DECNET_INGEST_LOG_FILE", DECNET_INGEST_LOG_FILE)
|
||||
if DECNET_EMBED_COLLECTOR:
|
||||
if _log_file and (collector_task is None or collector_task.done()):
|
||||
collector_task = asyncio.create_task(log_collector_worker(_log_file))
|
||||
log.debug("API startup collector worker started log_file=%s", _log_file)
|
||||
log.info(
|
||||
"API startup: embedded collector started "
|
||||
"(DECNET_EMBED_COLLECTOR=true) log_file=%s",
|
||||
_log_file,
|
||||
)
|
||||
elif not _log_file:
|
||||
log.warning("DECNET_INGEST_LOG_FILE not set — Docker log collection disabled.")
|
||||
log.warning("DECNET_INGEST_LOG_FILE not set — embedded collector disabled.")
|
||||
else:
|
||||
log.debug("API startup: collector not embedded — expecting standalone daemon")
|
||||
|
||||
# Start attacker profile rebuild worker only when explicitly requested.
|
||||
# Default is OFF because `decnet deploy` always starts a standalone
|
||||
|
||||
Reference in New Issue
Block a user