feat: add component-aware RFC 5424 application logging system
- Modify Rfc5424Formatter to read decnet_component from LogRecord
and use it as RFC 5424 APP-NAME field (falls back to 'decnet')
- Add get_logger(component) factory in decnet/logging/__init__.py
with _ComponentFilter that injects decnet_component on each record
- Wire all five layers to their component tag:
cli -> 'cli', engine -> 'engine', api -> 'api' (api.py, ingester,
routers), mutator -> 'mutator', collector -> 'collector'
- Add structured INFO/DEBUG/WARNING/ERROR log calls throughout each
layer per the defined vocabulary; DEBUG calls are suppressed unless
DECNET_DEVELOPER=true
- Add tests/test_logging.py covering factory, filter, formatter
component-awareness, fallback behaviour, and level gating
This commit is contained in:
@@ -1,5 +1,4 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
from contextlib import asynccontextmanager
|
||||
from typing import Any, AsyncGenerator, Optional
|
||||
@@ -11,12 +10,13 @@ from pydantic import ValidationError
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
|
||||
from decnet.env import DECNET_CORS_ORIGINS, DECNET_DEVELOPER, DECNET_INGEST_LOG_FILE
|
||||
from decnet.logging import get_logger
|
||||
from decnet.web.dependencies import repo
|
||||
from decnet.collector import log_collector_worker
|
||||
from decnet.web.ingester import log_ingestion_worker
|
||||
from decnet.web.router import api_router
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
log = get_logger("api")
|
||||
ingestion_task: Optional[asyncio.Task[Any]] = None
|
||||
collector_task: Optional[asyncio.Task[Any]] = None
|
||||
|
||||
@@ -25,9 +25,11 @@ collector_task: Optional[asyncio.Task[Any]] = None
|
||||
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
|
||||
global ingestion_task, collector_task
|
||||
|
||||
log.info("API startup initialising database")
|
||||
for attempt in range(1, 6):
|
||||
try:
|
||||
await repo.initialize()
|
||||
log.debug("API startup DB initialised attempt=%d", attempt)
|
||||
break
|
||||
except Exception as exc:
|
||||
log.warning("DB init attempt %d/5 failed: %s", attempt, exc)
|
||||
@@ -40,11 +42,13 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
|
||||
# Start background ingestion task
|
||||
if ingestion_task is None or ingestion_task.done():
|
||||
ingestion_task = asyncio.create_task(log_ingestion_worker(repo))
|
||||
log.debug("API startup ingest worker started")
|
||||
|
||||
# Start Docker log collector (writes to log file; ingester reads from it)
|
||||
_log_file = os.environ.get("DECNET_INGEST_LOG_FILE", DECNET_INGEST_LOG_FILE)
|
||||
if _log_file and (collector_task is None or collector_task.done()):
|
||||
collector_task = asyncio.create_task(log_collector_worker(_log_file))
|
||||
log.debug("API startup collector worker started log_file=%s", _log_file)
|
||||
elif not _log_file:
|
||||
log.warning("DECNET_INGEST_LOG_FILE not set — Docker log collection disabled.")
|
||||
else:
|
||||
@@ -52,7 +56,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
|
||||
|
||||
yield
|
||||
|
||||
# Shutdown background tasks
|
||||
log.info("API shutdown cancelling background tasks")
|
||||
for task in (ingestion_task, collector_task):
|
||||
if task and not task.done():
|
||||
task.cancel()
|
||||
@@ -62,6 +66,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
|
||||
pass
|
||||
except Exception as exc:
|
||||
log.warning("Task shutdown error: %s", exc)
|
||||
log.info("API shutdown complete")
|
||||
|
||||
|
||||
app: FastAPI = FastAPI(
|
||||
|
||||
@@ -1,13 +1,13 @@
|
||||
import asyncio
|
||||
import os
|
||||
import logging
|
||||
import json
|
||||
from typing import Any
|
||||
from pathlib import Path
|
||||
|
||||
from decnet.logging import get_logger
|
||||
from decnet.web.db.repository import BaseRepository
|
||||
|
||||
logger: logging.Logger = logging.getLogger("decnet.web.ingester")
|
||||
logger = get_logger("api")
|
||||
|
||||
async def log_ingestion_worker(repo: BaseRepository) -> None:
|
||||
"""
|
||||
@@ -22,7 +22,7 @@ async def log_ingestion_worker(repo: BaseRepository) -> None:
|
||||
_json_log_path: Path = Path(_base_log_file).with_suffix(".json")
|
||||
_position: int = 0
|
||||
|
||||
logger.info(f"Starting JSON log ingestion from {_json_log_path}")
|
||||
logger.info("ingest worker started path=%s", _json_log_path)
|
||||
|
||||
while True:
|
||||
try:
|
||||
@@ -53,10 +53,11 @@ async def log_ingestion_worker(repo: BaseRepository) -> None:
|
||||
|
||||
try:
|
||||
_log_data: dict[str, Any] = json.loads(_line.strip())
|
||||
logger.debug("ingest: record decky=%s event_type=%s", _log_data.get("decky"), _log_data.get("event_type"))
|
||||
await repo.add_log(_log_data)
|
||||
await _extract_bounty(repo, _log_data)
|
||||
except json.JSONDecodeError:
|
||||
logger.error(f"Failed to decode JSON log line: {_line}")
|
||||
logger.error("ingest: failed to decode JSON log line: %s", _line.strip())
|
||||
continue
|
||||
|
||||
# Update position after successful line read
|
||||
@@ -65,10 +66,10 @@ async def log_ingestion_worker(repo: BaseRepository) -> None:
|
||||
except Exception as _e:
|
||||
_err_str = str(_e).lower()
|
||||
if "no such table" in _err_str or "no active connection" in _err_str or "connection closed" in _err_str:
|
||||
logger.error(f"Post-shutdown or fatal DB error in ingester: {_e}")
|
||||
logger.error("ingest: post-shutdown or fatal DB error: %s", _e)
|
||||
break # Exit worker — DB is gone or uninitialized
|
||||
|
||||
logger.error(f"Error in log ingestion worker: {_e}")
|
||||
logger.error("ingest: error in worker: %s", _e)
|
||||
await asyncio.sleep(5)
|
||||
|
||||
await asyncio.sleep(1)
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
import logging
|
||||
import os
|
||||
|
||||
from fastapi import APIRouter, Depends, HTTPException
|
||||
|
||||
from decnet.config import DEFAULT_MUTATE_INTERVAL, DecnetConfig, _ROOT, log
|
||||
from decnet.logging import get_logger
|
||||
from decnet.config import DEFAULT_MUTATE_INTERVAL, DecnetConfig, _ROOT
|
||||
|
||||
log = get_logger("api")
|
||||
from decnet.engine import deploy as _deploy
|
||||
from decnet.ini_loader import load_ini_from_string
|
||||
from decnet.network import detect_interface, detect_subnet, get_host_ip
|
||||
@@ -100,7 +102,7 @@ async def api_deploy_deckies(req: DeployIniRequest, current_user: str = Depends(
|
||||
}
|
||||
await repo.set_state("deployment", new_state_payload)
|
||||
except Exception as e:
|
||||
logging.getLogger("decnet.web.api").exception("Deployment failed: %s", e)
|
||||
log.exception("Deployment failed: %s", e)
|
||||
raise HTTPException(status_code=500, detail="Deployment failed. Check server logs for details.")
|
||||
|
||||
return {"message": "Deckies deployed successfully"}
|
||||
|
||||
@@ -1,15 +1,15 @@
|
||||
import json
|
||||
import asyncio
|
||||
import logging
|
||||
from typing import AsyncGenerator, Optional
|
||||
|
||||
from fastapi import APIRouter, Depends, Query, Request
|
||||
from fastapi.responses import StreamingResponse
|
||||
|
||||
from decnet.env import DECNET_DEVELOPER
|
||||
from decnet.logging import get_logger
|
||||
from decnet.web.dependencies import get_stream_user, repo
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
log = get_logger("api")
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user