Compare commits

...

2 Commits

Author SHA1 Message Date
448cb9cee0 chore: untrack .claude/settings.local.json (already covered by .gitignore)
Some checks failed
CI / Lint (ruff) (push) Has been cancelled
CI / SAST (bandit) (push) Has been cancelled
CI / Dependency audit (pip-audit) (push) Has been cancelled
CI / Test (Standard) (3.11) (push) Has been cancelled
CI / Test (Standard) (3.12) (push) Has been cancelled
CI / Test (Live) (3.11) (push) Has been cancelled
CI / Test (Fuzz) (3.11) (push) Has been cancelled
CI / Merge dev → testing (push) Has been cancelled
CI / Prepare Merge to Main (push) Has been cancelled
CI / Finalize Merge to Main (push) Has been cancelled
2026-04-13 07:45:12 -04:00
035499f255 feat: add component-aware RFC 5424 application logging system
- Modify Rfc5424Formatter to read decnet_component from LogRecord
  and use it as RFC 5424 APP-NAME field (falls back to 'decnet')
- Add get_logger(component) factory in decnet/logging/__init__.py
  with _ComponentFilter that injects decnet_component on each record
- Wire all five layers to their component tag:
    cli -> 'cli', engine -> 'engine', api -> 'api' (api.py, ingester,
    routers), mutator -> 'mutator', collector -> 'collector'
- Add structured INFO/DEBUG/WARNING/ERROR log calls throughout each
  layer per the defined vocabulary; DEBUG calls are suppressed unless
  DECNET_DEVELOPER=true
- Add tests/test_logging.py covering factory, filter, formatter
  component-awareness, fallback behaviour, and level gating
2026-04-13 07:39:01 -04:00
12 changed files with 270 additions and 49 deletions

View File

@@ -1,29 +0,0 @@
{
"permissions": {
"allow": [
"mcp__plugin_context-mode_context-mode__ctx_batch_execute",
"mcp__plugin_context-mode_context-mode__ctx_search",
"Bash(grep:*)",
"Bash(python -m pytest --tb=short -q)",
"Bash(pip install:*)",
"Bash(pip show:*)",
"Bash(python:*)",
"Bash(DECNET_JWT_SECRET=\"test-secret-xyz-1234!\" DECNET_ADMIN_PASSWORD=\"test-pass-xyz-1234!\" python:*)",
"Bash(ls /home/anti/Tools/DECNET/*.db* /home/anti/Tools/DECNET/test_*.db*)",
"mcp__plugin_context-mode_context-mode__ctx_execute_file",
"Bash(nc)",
"Bash(nmap:*)",
"Bash(ping -c1 -W2 192.168.1.200)",
"Bash(xxd)",
"Bash(curl -s http://192.168.1.200:2375/version)",
"Bash(python3 -m json.tool)",
"Bash(curl -s http://192.168.1.200:9200/)",
"Bash(docker image:*)",
"Read(//home/anti/Tools/cowrie/src/cowrie/data/txtcmds/**)",
"Read(//home/anti/Tools/cowrie/src/cowrie/data/txtcmds/bin/**)",
"mcp__plugin_context-mode_context-mode__ctx_index",
"Bash(ls:*)",
"mcp__plugin_context-mode_context-mode__ctx_execute"
]
}
}

View File

@@ -8,6 +8,7 @@ Usage:
decnet services decnet services
""" """
import logging
import signal import signal
from typing import Optional from typing import Optional
@@ -15,6 +16,7 @@ import typer
from rich.console import Console from rich.console import Console
from rich.table import Table from rich.table import Table
from decnet.logging import get_logger
from decnet.env import ( from decnet.env import (
DECNET_API_HOST, DECNET_API_HOST,
DECNET_API_PORT, DECNET_API_PORT,
@@ -32,6 +34,8 @@ from decnet.ini_loader import load_ini
from decnet.network import detect_interface, detect_subnet, allocate_ips, get_host_ip from decnet.network import detect_interface, detect_subnet, allocate_ips, get_host_ip
from decnet.services.registry import all_services from decnet.services.registry import all_services
log = get_logger("cli")
app = typer.Typer( app = typer.Typer(
name="decnet", name="decnet",
help="Deploy a deception network of honeypot deckies on your LAN.", help="Deploy a deception network of honeypot deckies on your LAN.",
@@ -77,6 +81,7 @@ def api(
import sys import sys
import os import os
log.info("API command invoked host=%s port=%d", host, port)
console.print(f"[green]Starting DECNET API on {host}:{port}...[/]") console.print(f"[green]Starting DECNET API on {host}:{port}...[/]")
_env: dict[str, str] = os.environ.copy() _env: dict[str, str] = os.environ.copy()
_env["DECNET_INGEST_LOG_FILE"] = str(log_file) _env["DECNET_INGEST_LOG_FILE"] = str(log_file)
@@ -115,6 +120,7 @@ def deploy(
) -> None: ) -> None:
"""Deploy deckies to the LAN.""" """Deploy deckies to the LAN."""
import os import os
log.info("deploy command invoked mode=%s deckies=%s dry_run=%s", mode, deckies, dry_run)
if mode not in ("unihost", "swarm"): if mode not in ("unihost", "swarm"):
console.print("[red]--mode must be 'unihost' or 'swarm'[/]") console.print("[red]--mode must be 'unihost' or 'swarm'[/]")
raise typer.Exit(1) raise typer.Exit(1)
@@ -234,8 +240,13 @@ def deploy(
mutate_interval=mutate_interval, mutate_interval=mutate_interval,
) )
log.debug("deploy: config built deckies=%d interface=%s subnet=%s", len(config.deckies), config.interface, config.subnet)
from decnet.engine import deploy as _deploy from decnet.engine import deploy as _deploy
_deploy(config, dry_run=dry_run, no_cache=no_cache, parallel=parallel) _deploy(config, dry_run=dry_run, no_cache=no_cache, parallel=parallel)
if dry_run:
log.info("deploy: dry-run complete, no containers started")
else:
log.info("deploy: deployment complete deckies=%d", len(config.deckies))
if mutate_interval is not None and not dry_run: if mutate_interval is not None and not dry_run:
import subprocess # nosec B404 import subprocess # nosec B404
@@ -290,6 +301,7 @@ def collect(
"""Stream Docker logs from all running decky service containers to a log file.""" """Stream Docker logs from all running decky service containers to a log file."""
import asyncio import asyncio
from decnet.collector import log_collector_worker from decnet.collector import log_collector_worker
log.info("collect command invoked log_file=%s", log_file)
console.print(f"[bold cyan]Collector starting[/] → {log_file}") console.print(f"[bold cyan]Collector starting[/] → {log_file}")
asyncio.run(log_collector_worker(log_file)) asyncio.run(log_collector_worker(log_file))
@@ -322,6 +334,7 @@ def mutate(
@app.command() @app.command()
def status() -> None: def status() -> None:
"""Show running deckies and their status.""" """Show running deckies and their status."""
log.info("status command invoked")
from decnet.engine import status as _status from decnet.engine import status as _status
_status() _status()
@@ -336,8 +349,10 @@ def teardown(
console.print("[red]Specify --all or --id <name>.[/]") console.print("[red]Specify --all or --id <name>.[/]")
raise typer.Exit(1) raise typer.Exit(1)
log.info("teardown command invoked all=%s id=%s", all_, id_)
from decnet.engine import teardown as _teardown from decnet.engine import teardown as _teardown
_teardown(decky_id=id_) _teardown(decky_id=id_)
log.info("teardown complete all=%s id=%s", all_, id_)
if all_: if all_:
_kill_api() _kill_api()

View File

@@ -8,13 +8,14 @@ The ingester tails the .json file; rsyslog can consume the .log file independent
import asyncio import asyncio
import json import json
import logging
import re import re
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import Any, Optional from typing import Any, Optional
logger = logging.getLogger("decnet.collector") from decnet.logging import get_logger
logger = get_logger("collector")
# ─── RFC 5424 parser ────────────────────────────────────────────────────────── # ─── RFC 5424 parser ──────────────────────────────────────────────────────────
@@ -139,10 +140,13 @@ def _stream_container(container_id: str, log_path: Path, json_path: Path) -> Non
lf.flush() lf.flush()
parsed = parse_rfc5424(line) parsed = parse_rfc5424(line)
if parsed: if parsed:
logger.debug("collector: event written decky=%s type=%s", parsed.get("decky"), parsed.get("event_type"))
jf.write(json.dumps(parsed) + "\n") jf.write(json.dumps(parsed) + "\n")
jf.flush() jf.flush()
else:
logger.debug("collector: malformed RFC5424 line snippet=%r", line[:80])
except Exception as exc: except Exception as exc:
logger.debug("Log stream ended for container %s: %s", container_id, exc) logger.debug("collector: log stream ended container_id=%s reason=%s", container_id, exc)
# ─── Async collector ────────────────────────────────────────────────────────── # ─── Async collector ──────────────────────────────────────────────────────────
@@ -170,9 +174,10 @@ async def log_collector_worker(log_file: str) -> None:
asyncio.to_thread(_stream_container, container_id, log_path, json_path), asyncio.to_thread(_stream_container, container_id, log_path, json_path),
loop=loop, loop=loop,
) )
logger.info("Collecting logs from container: %s", container_name) logger.info("collector: streaming container=%s", container_name)
try: try:
logger.info("collector started log_path=%s", log_path)
client = docker.from_env() client = docker.from_env()
for container in client.containers.list(): for container in client.containers.list():
@@ -193,8 +198,9 @@ async def log_collector_worker(log_file: str) -> None:
await asyncio.to_thread(_watch_events) await asyncio.to_thread(_watch_events)
except asyncio.CancelledError: except asyncio.CancelledError:
logger.info("collector shutdown requested cancelling %d tasks", len(active))
for task in active.values(): for task in active.values():
task.cancel() task.cancel()
raise raise
except Exception as exc: except Exception as exc:
logger.error("Collector error: %s", exc) logger.error("collector error: %s", exc)

View File

@@ -48,8 +48,9 @@ class Rfc5424Formatter(logging.Formatter):
msg = record.getMessage() msg = record.getMessage()
if record.exc_info: if record.exc_info:
msg += "\n" + self.formatException(record.exc_info) msg += "\n" + self.formatException(record.exc_info)
app = getattr(record, "decnet_component", self._app)
return ( return (
f"<{prival}>1 {ts} {self._hostname} {self._app}" f"<{prival}>1 {ts} {self._hostname} {app}"
f" {os.getpid()} {record.name} - {msg}" f" {os.getpid()} {record.name} - {msg}"
) )

View File

@@ -11,6 +11,7 @@ import docker
from rich.console import Console from rich.console import Console
from rich.table import Table from rich.table import Table
from decnet.logging import get_logger
from decnet.config import DecnetConfig, clear_state, load_state, save_state from decnet.config import DecnetConfig, clear_state, load_state, save_state
from decnet.composer import write_compose from decnet.composer import write_compose
from decnet.network import ( from decnet.network import (
@@ -26,6 +27,7 @@ from decnet.network import (
teardown_host_macvlan, teardown_host_macvlan,
) )
log = get_logger("engine")
console = Console() console = Console()
COMPOSE_FILE = Path("decnet-compose.yml") COMPOSE_FILE = Path("decnet-compose.yml")
_CANONICAL_LOGGING = Path(__file__).parent.parent.parent / "templates" / "decnet_logging.py" _CANONICAL_LOGGING = Path(__file__).parent.parent.parent / "templates" / "decnet_logging.py"
@@ -106,11 +108,14 @@ def _compose_with_retry(
def deploy(config: DecnetConfig, dry_run: bool = False, no_cache: bool = False, parallel: bool = False) -> None: def deploy(config: DecnetConfig, dry_run: bool = False, no_cache: bool = False, parallel: bool = False) -> None:
log.info("deployment started n_deckies=%d interface=%s subnet=%s dry_run=%s", len(config.deckies), config.interface, config.subnet, dry_run)
log.debug("deploy: deckies=%s", [d.name for d in config.deckies])
client = docker.from_env() client = docker.from_env()
ip_list = [d.ip for d in config.deckies] ip_list = [d.ip for d in config.deckies]
decky_range = ips_to_range(ip_list) decky_range = ips_to_range(ip_list)
host_ip = get_host_ip(config.interface) host_ip = get_host_ip(config.interface)
log.debug("deploy: ip_range=%s host_ip=%s", decky_range, host_ip)
net_driver = "IPvlan L2" if config.ipvlan else "MACVLAN" net_driver = "IPvlan L2" if config.ipvlan else "MACVLAN"
console.print(f"[bold cyan]Creating {net_driver} network[/] ({MACVLAN_NETWORK_NAME}) on {config.interface}") console.print(f"[bold cyan]Creating {net_driver} network[/] ({MACVLAN_NETWORK_NAME}) on {config.interface}")
@@ -140,6 +145,7 @@ def deploy(config: DecnetConfig, dry_run: bool = False, no_cache: bool = False,
console.print(f"[bold cyan]Compose file written[/] → {compose_path}") console.print(f"[bold cyan]Compose file written[/] → {compose_path}")
if dry_run: if dry_run:
log.info("deployment dry-run complete compose_path=%s", compose_path)
console.print("[yellow]Dry run — no containers started.[/]") console.print("[yellow]Dry run — no containers started.[/]")
return return
@@ -161,12 +167,15 @@ def deploy(config: DecnetConfig, dry_run: bool = False, no_cache: bool = False,
_compose_with_retry("build", "--no-cache", compose_file=compose_path) _compose_with_retry("build", "--no-cache", compose_file=compose_path)
_compose_with_retry("up", "--build", "-d", compose_file=compose_path) _compose_with_retry("up", "--build", "-d", compose_file=compose_path)
log.info("deployment complete n_deckies=%d", len(config.deckies))
_print_status(config) _print_status(config)
def teardown(decky_id: str | None = None) -> None: def teardown(decky_id: str | None = None) -> None:
log.info("teardown requested decky_id=%s", decky_id or "all")
state = load_state() state = load_state()
if state is None: if state is None:
log.warning("teardown: no active deployment found")
console.print("[red]No active deployment found (no decnet-state.json).[/]") console.print("[red]No active deployment found (no decnet-state.json).[/]")
return return
@@ -193,6 +202,7 @@ def teardown(decky_id: str | None = None) -> None:
clear_state() clear_state()
net_driver = "IPvlan" if config.ipvlan else "MACVLAN" net_driver = "IPvlan" if config.ipvlan else "MACVLAN"
log.info("teardown complete all deckies removed network_driver=%s", net_driver)
console.print(f"[green]All deckies torn down. {net_driver} network removed.[/]") console.print(f"[green]All deckies torn down. {net_driver} network removed.[/]")

View File

@@ -0,0 +1,42 @@
"""
DECNET application logging helpers.
Usage:
from decnet.logging import get_logger
log = get_logger("engine") # APP-NAME in RFC 5424 output becomes "engine"
The returned logger propagates to the root logger (configured in config.py with
Rfc5424Formatter), so level control via DECNET_DEVELOPER still applies globally.
"""
from __future__ import annotations
import logging
class _ComponentFilter(logging.Filter):
    """Tag every LogRecord with the component that owns the logger.

    The filter stores *component* and writes it to ``record.decnet_component``
    on each record it sees, so Rfc5424Formatter can use it as the RFC 5424
    APP-NAME field instead of the hardcoded "decnet". It never rejects a
    record.
    """

    def __init__(self, component: str) -> None:
        super().__init__()
        self.component = component

    def __repr__(self) -> str:
        return f"{type(self).__name__}({self.component!r})"

    def filter(self, record: logging.LogRecord) -> bool:
        """Set ``decnet_component`` on *record* and let it through."""
        # Unconditional assignment: any value already present on the record
        # is replaced by this filter's component.
        record.decnet_component = self.component  # type: ignore[attr-defined]
        return True
def get_logger(component: str) -> logging.Logger:
    """Return a named logger that self-identifies as *component* in RFC 5424.

    Valid components: cli, engine, api, mutator, collector.

    The logger is named ``decnet.<component>`` and propagates normally, so the
    root handler (Rfc5424Formatter + level gate from DECNET_DEVELOPER) handles
    output. Calling this function multiple times for the same component is
    safe: the component filter is attached only once.

    Args:
        component: Tag used both as the logger-name suffix and as the value
            injected into each record's ``decnet_component`` attribute.

    Returns:
        The ``decnet.<component>`` logger with a ``_ComponentFilter`` attached.
    """
    logger = logging.getLogger(f"decnet.{component}")
    # Repeat calls must not stack duplicate filters on the logger, so only
    # attach one when no _ComponentFilter is present yet.
    if not any(isinstance(f, _ComponentFilter) for f in logger.filters):
        logger.addFilter(_ComponentFilter(component))
    return logger

View File

@@ -14,12 +14,14 @@ from decnet.fleet import all_service_names
from decnet.composer import write_compose from decnet.composer import write_compose
from decnet.config import DeckyConfig, DecnetConfig from decnet.config import DeckyConfig, DecnetConfig
from decnet.engine import _compose_with_retry from decnet.engine import _compose_with_retry
from decnet.logging import get_logger
from pathlib import Path from pathlib import Path
import anyio import anyio
import asyncio import asyncio
from decnet.web.db.repository import BaseRepository from decnet.web.db.repository import BaseRepository
log = get_logger("mutator")
console = Console() console = Console()
@@ -28,8 +30,10 @@ async def mutate_decky(decky_name: str, repo: BaseRepository) -> bool:
Perform an Intra-Archetype Shuffle for a specific decky. Perform an Intra-Archetype Shuffle for a specific decky.
Returns True if mutation succeeded, False otherwise. Returns True if mutation succeeded, False otherwise.
""" """
log.debug("mutate_decky: start decky=%s", decky_name)
state_dict = await repo.get_state("deployment") state_dict = await repo.get_state("deployment")
if state_dict is None: if state_dict is None:
log.error("mutate_decky: no active deployment found in database")
console.print("[red]No active deployment found in database.[/]") console.print("[red]No active deployment found in database.[/]")
return False return False
@@ -73,12 +77,14 @@ async def mutate_decky(decky_name: str, repo: BaseRepository) -> bool:
# Still writes files for Docker to use # Still writes files for Docker to use
write_compose(config, compose_path) write_compose(config, compose_path)
log.info("mutation applied decky=%s services=%s", decky_name, ",".join(decky.services))
console.print(f"[cyan]Mutating '{decky_name}' to services: {', '.join(decky.services)}[/]") console.print(f"[cyan]Mutating '{decky_name}' to services: {', '.join(decky.services)}[/]")
try: try:
# Wrap blocking call in thread # Wrap blocking call in thread
await anyio.to_thread.run_sync(_compose_with_retry, "up", "-d", "--remove-orphans", compose_path) await anyio.to_thread.run_sync(_compose_with_retry, "up", "-d", "--remove-orphans", compose_path)
except Exception as e: except Exception as e:
log.error("mutation failed decky=%s error=%s", decky_name, e)
console.print(f"[red]Failed to mutate '{decky_name}': {e}[/]") console.print(f"[red]Failed to mutate '{decky_name}': {e}[/]")
return False return False
@@ -90,8 +96,10 @@ async def mutate_all(repo: BaseRepository, force: bool = False) -> None:
Check all deckies and mutate those that are due. Check all deckies and mutate those that are due.
If force=True, mutates all deckies regardless of schedule. If force=True, mutates all deckies regardless of schedule.
""" """
log.debug("mutate_all: start force=%s", force)
state_dict = await repo.get_state("deployment") state_dict = await repo.get_state("deployment")
if state_dict is None: if state_dict is None:
log.error("mutate_all: no active deployment found")
console.print("[red]No active deployment found.[/]") console.print("[red]No active deployment found.[/]")
return return
@@ -116,15 +124,20 @@ async def mutate_all(repo: BaseRepository, force: bool = False) -> None:
mutated_count += 1 mutated_count += 1
if mutated_count == 0 and not force: if mutated_count == 0 and not force:
log.debug("mutate_all: no deckies due for mutation")
console.print("[dim]No deckies are due for mutation.[/]") console.print("[dim]No deckies are due for mutation.[/]")
else:
log.info("mutate_all: complete mutated_count=%d", mutated_count)
async def run_watch_loop(repo: BaseRepository, poll_interval_secs: int = 10) -> None: async def run_watch_loop(repo: BaseRepository, poll_interval_secs: int = 10) -> None:
"""Run an infinite loop checking for deckies that need mutation.""" """Run an infinite loop checking for deckies that need mutation."""
log.info("mutator watch loop started poll_interval_secs=%d", poll_interval_secs)
console.print(f"[green]DECNET Mutator Watcher started (polling every {poll_interval_secs}s).[/]") console.print(f"[green]DECNET Mutator Watcher started (polling every {poll_interval_secs}s).[/]")
try: try:
while True: while True:
await mutate_all(force=False, repo=repo) await mutate_all(force=False, repo=repo)
await asyncio.sleep(poll_interval_secs) await asyncio.sleep(poll_interval_secs)
except KeyboardInterrupt: except KeyboardInterrupt:
log.info("mutator watch loop stopped")
console.print("\n[dim]Mutator watcher stopped.[/]") console.print("\n[dim]Mutator watcher stopped.[/]")

View File

@@ -1,5 +1,4 @@
import asyncio import asyncio
import logging
import os import os
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from typing import Any, AsyncGenerator, Optional from typing import Any, AsyncGenerator, Optional
@@ -11,12 +10,13 @@ from pydantic import ValidationError
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from decnet.env import DECNET_CORS_ORIGINS, DECNET_DEVELOPER, DECNET_INGEST_LOG_FILE from decnet.env import DECNET_CORS_ORIGINS, DECNET_DEVELOPER, DECNET_INGEST_LOG_FILE
from decnet.logging import get_logger
from decnet.web.dependencies import repo from decnet.web.dependencies import repo
from decnet.collector import log_collector_worker from decnet.collector import log_collector_worker
from decnet.web.ingester import log_ingestion_worker from decnet.web.ingester import log_ingestion_worker
from decnet.web.router import api_router from decnet.web.router import api_router
log = logging.getLogger(__name__) log = get_logger("api")
ingestion_task: Optional[asyncio.Task[Any]] = None ingestion_task: Optional[asyncio.Task[Any]] = None
collector_task: Optional[asyncio.Task[Any]] = None collector_task: Optional[asyncio.Task[Any]] = None
@@ -25,9 +25,11 @@ collector_task: Optional[asyncio.Task[Any]] = None
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
global ingestion_task, collector_task global ingestion_task, collector_task
log.info("API startup initialising database")
for attempt in range(1, 6): for attempt in range(1, 6):
try: try:
await repo.initialize() await repo.initialize()
log.debug("API startup DB initialised attempt=%d", attempt)
break break
except Exception as exc: except Exception as exc:
log.warning("DB init attempt %d/5 failed: %s", attempt, exc) log.warning("DB init attempt %d/5 failed: %s", attempt, exc)
@@ -40,11 +42,13 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
# Start background ingestion task # Start background ingestion task
if ingestion_task is None or ingestion_task.done(): if ingestion_task is None or ingestion_task.done():
ingestion_task = asyncio.create_task(log_ingestion_worker(repo)) ingestion_task = asyncio.create_task(log_ingestion_worker(repo))
log.debug("API startup ingest worker started")
# Start Docker log collector (writes to log file; ingester reads from it) # Start Docker log collector (writes to log file; ingester reads from it)
_log_file = os.environ.get("DECNET_INGEST_LOG_FILE", DECNET_INGEST_LOG_FILE) _log_file = os.environ.get("DECNET_INGEST_LOG_FILE", DECNET_INGEST_LOG_FILE)
if _log_file and (collector_task is None or collector_task.done()): if _log_file and (collector_task is None or collector_task.done()):
collector_task = asyncio.create_task(log_collector_worker(_log_file)) collector_task = asyncio.create_task(log_collector_worker(_log_file))
log.debug("API startup collector worker started log_file=%s", _log_file)
elif not _log_file: elif not _log_file:
log.warning("DECNET_INGEST_LOG_FILE not set — Docker log collection disabled.") log.warning("DECNET_INGEST_LOG_FILE not set — Docker log collection disabled.")
else: else:
@@ -52,7 +56,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
yield yield
# Shutdown background tasks log.info("API shutdown cancelling background tasks")
for task in (ingestion_task, collector_task): for task in (ingestion_task, collector_task):
if task and not task.done(): if task and not task.done():
task.cancel() task.cancel()
@@ -62,6 +66,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
pass pass
except Exception as exc: except Exception as exc:
log.warning("Task shutdown error: %s", exc) log.warning("Task shutdown error: %s", exc)
log.info("API shutdown complete")
app: FastAPI = FastAPI( app: FastAPI = FastAPI(

View File

@@ -1,13 +1,13 @@
import asyncio import asyncio
import os import os
import logging
import json import json
from typing import Any from typing import Any
from pathlib import Path from pathlib import Path
from decnet.logging import get_logger
from decnet.web.db.repository import BaseRepository from decnet.web.db.repository import BaseRepository
logger: logging.Logger = logging.getLogger("decnet.web.ingester") logger = get_logger("api")
async def log_ingestion_worker(repo: BaseRepository) -> None: async def log_ingestion_worker(repo: BaseRepository) -> None:
""" """
@@ -22,7 +22,7 @@ async def log_ingestion_worker(repo: BaseRepository) -> None:
_json_log_path: Path = Path(_base_log_file).with_suffix(".json") _json_log_path: Path = Path(_base_log_file).with_suffix(".json")
_position: int = 0 _position: int = 0
logger.info(f"Starting JSON log ingestion from {_json_log_path}") logger.info("ingest worker started path=%s", _json_log_path)
while True: while True:
try: try:
@@ -53,10 +53,11 @@ async def log_ingestion_worker(repo: BaseRepository) -> None:
try: try:
_log_data: dict[str, Any] = json.loads(_line.strip()) _log_data: dict[str, Any] = json.loads(_line.strip())
logger.debug("ingest: record decky=%s event_type=%s", _log_data.get("decky"), _log_data.get("event_type"))
await repo.add_log(_log_data) await repo.add_log(_log_data)
await _extract_bounty(repo, _log_data) await _extract_bounty(repo, _log_data)
except json.JSONDecodeError: except json.JSONDecodeError:
logger.error(f"Failed to decode JSON log line: {_line}") logger.error("ingest: failed to decode JSON log line: %s", _line.strip())
continue continue
# Update position after successful line read # Update position after successful line read
@@ -65,10 +66,10 @@ async def log_ingestion_worker(repo: BaseRepository) -> None:
except Exception as _e: except Exception as _e:
_err_str = str(_e).lower() _err_str = str(_e).lower()
if "no such table" in _err_str or "no active connection" in _err_str or "connection closed" in _err_str: if "no such table" in _err_str or "no active connection" in _err_str or "connection closed" in _err_str:
logger.error(f"Post-shutdown or fatal DB error in ingester: {_e}") logger.error("ingest: post-shutdown or fatal DB error: %s", _e)
break # Exit worker — DB is gone or uninitialized break # Exit worker — DB is gone or uninitialized
logger.error(f"Error in log ingestion worker: {_e}") logger.error("ingest: error in worker: %s", _e)
await asyncio.sleep(5) await asyncio.sleep(5)
await asyncio.sleep(1) await asyncio.sleep(1)

View File

@@ -1,9 +1,11 @@
import logging
import os import os
from fastapi import APIRouter, Depends, HTTPException from fastapi import APIRouter, Depends, HTTPException
from decnet.config import DEFAULT_MUTATE_INTERVAL, DecnetConfig, _ROOT, log from decnet.logging import get_logger
from decnet.config import DEFAULT_MUTATE_INTERVAL, DecnetConfig, _ROOT
log = get_logger("api")
from decnet.engine import deploy as _deploy from decnet.engine import deploy as _deploy
from decnet.ini_loader import load_ini_from_string from decnet.ini_loader import load_ini_from_string
from decnet.network import detect_interface, detect_subnet, get_host_ip from decnet.network import detect_interface, detect_subnet, get_host_ip
@@ -100,7 +102,7 @@ async def api_deploy_deckies(req: DeployIniRequest, current_user: str = Depends(
} }
await repo.set_state("deployment", new_state_payload) await repo.set_state("deployment", new_state_payload)
except Exception as e: except Exception as e:
logging.getLogger("decnet.web.api").exception("Deployment failed: %s", e) log.exception("Deployment failed: %s", e)
raise HTTPException(status_code=500, detail="Deployment failed. Check server logs for details.") raise HTTPException(status_code=500, detail="Deployment failed. Check server logs for details.")
return {"message": "Deckies deployed successfully"} return {"message": "Deckies deployed successfully"}

View File

@@ -1,15 +1,15 @@
import json import json
import asyncio import asyncio
import logging
from typing import AsyncGenerator, Optional from typing import AsyncGenerator, Optional
from fastapi import APIRouter, Depends, Query, Request from fastapi import APIRouter, Depends, Query, Request
from fastapi.responses import StreamingResponse from fastapi.responses import StreamingResponse
from decnet.env import DECNET_DEVELOPER from decnet.env import DECNET_DEVELOPER
from decnet.logging import get_logger
from decnet.web.dependencies import get_stream_user, repo from decnet.web.dependencies import get_stream_user, repo
log = logging.getLogger(__name__) log = get_logger("api")
router = APIRouter() router = APIRouter()

155
tests/test_logging.py Normal file
View File

@@ -0,0 +1,155 @@
"""
Tests for DECNET application logging system.
Covers:
- get_logger() factory and _ComponentFilter injection
- Rfc5424Formatter component-aware APP-NAME field
- Log level gating via DECNET_DEVELOPER
- Component tags for all five microservice layers
"""
from __future__ import annotations
import logging
import os
import re
import pytest
from decnet.logging import _ComponentFilter, get_logger
# RFC 5424 parser: <PRI>1 TIMESTAMP HOSTNAME APP-NAME PROCID MSGID SD MSG
# re.DOTALL lets the trailing group span newlines, so a formatted record whose
# message carries extra lines (e.g. an appended traceback) still matches.
_RFC5424_RE = re.compile(
    r"^<(\d+)>1 "   # PRI
    r"\S+ "         # TIMESTAMP
    r"\S+ "         # HOSTNAME
    r"(\S+) "       # APP-NAME  ← what we care about
    r"\S+ "         # PROCID
    r"(\S+) "       # MSGID
    r"(.+)$",       # SD + MSG
    re.DOTALL,
)
def _format_record(logger: logging.Logger, level: int, msg: str) -> str:
"""Emit a log record through the root handler and return the formatted string."""
from decnet.config import Rfc5424Formatter
formatter = Rfc5424Formatter()
record = logger.makeRecord(
logger.name, level, "<test>", 0, msg, (), None
)
# Run all filters attached to the logger so decnet_component gets injected
for f in logger.filters:
f.filter(record)
return formatter.format(record)
class TestGetLogger:
    """Behaviour of the get_logger() factory."""

    def test_returns_logger(self):
        assert isinstance(get_logger("cli"), logging.Logger)

    def test_logger_name(self):
        assert get_logger("engine").name == "decnet.engine"

    def test_filter_attached(self):
        log = get_logger("api")
        assert any(isinstance(f, _ComponentFilter) for f in log.filters)

    def test_idempotent_filter(self):
        log = get_logger("mutator")
        get_logger("mutator")  # second call must not add another filter
        component_filters = [f for f in log.filters if isinstance(f, _ComponentFilter)]
        assert len(component_filters) == 1

    @pytest.mark.parametrize("component", ["cli", "engine", "api", "mutator", "collector"])
    def test_all_components_registered(self, component):
        # Check the tag value, not just the filter's presence, so a filter
        # carrying the wrong component would be caught.
        log = get_logger(component)
        tags = [f.component for f in log.filters if isinstance(f, _ComponentFilter)]
        assert tags == [component]
class TestComponentFilter:
    """Direct checks on _ComponentFilter, independent of get_logger()."""

    def test_injects_attribute(self):
        """filter() sets decnet_component to the filter's component."""
        f = _ComponentFilter("engine")
        record = logging.LogRecord("test", logging.INFO, "", 0, "msg", (), None)
        assert f.filter(record) is True
        assert record.decnet_component == "engine"  # type: ignore[attr-defined]

    def test_always_passes(self):
        """filter() returns True for a DEBUG record as well."""
        f = _ComponentFilter("collector")
        record = logging.LogRecord("test", logging.DEBUG, "", 0, "msg", (), None)
        assert f.filter(record) is True
class TestRfc5424FormatterComponentAware:
    """Rfc5424Formatter output fields as seen through _RFC5424_RE."""

    @pytest.mark.parametrize("component", ["cli", "engine", "api", "mutator", "collector"])
    def test_app_name_is_component(self, component):
        """APP-NAME (regex group 2) equals the component tag."""
        log = get_logger(component)
        line = _format_record(log, logging.INFO, "test message")
        m = _RFC5424_RE.match(line)
        assert m is not None, f"Not RFC 5424: {line!r}"
        assert m.group(2) == component, f"Expected APP-NAME={component!r}, got {m.group(2)!r}"

    def test_fallback_app_name_without_component(self):
        """Untagged loggers (no _ComponentFilter) fall back to 'decnet'."""
        from decnet.config import Rfc5424Formatter

        formatter = Rfc5424Formatter()
        # Built directly, so no filter has set decnet_component on the record.
        record = logging.LogRecord("some.module", logging.INFO, "", 0, "hello", (), None)
        line = formatter.format(record)
        m = _RFC5424_RE.match(line)
        assert m is not None
        assert m.group(2) == "decnet"

    def test_msgid_is_logger_name(self):
        """MSGID (regex group 3) carries the logger name."""
        log = get_logger("engine")
        line = _format_record(log, logging.INFO, "deploying")
        m = _RFC5424_RE.match(line)
        assert m is not None
        assert m.group(3) == "decnet.engine"
class TestLogLevelGating:
    """Root-logger level selection performed by _configure_logging()."""

    @staticmethod
    def _root_level_after_configure(dev: bool) -> int:
        """Call ``_configure_logging(dev=dev)`` and return the resulting root level.

        The root logger's level and handler list are saved first and restored
        afterwards, so the call does not leak into other tests.
        """
        from decnet.config import _configure_logging, Rfc5424Formatter

        root = logging.getLogger()
        saved_level = root.level
        saved_handlers = root.handlers[:]
        # Remove any existing RFC5424 handlers so idempotency check doesn't skip
        root.handlers = [
            h for h in root.handlers
            if not (isinstance(h, logging.StreamHandler) and isinstance(h.formatter, Rfc5424Formatter))
        ]
        try:
            _configure_logging(dev=dev)
            return root.level
        finally:
            root.setLevel(saved_level)
            root.handlers = saved_handlers

    def test_configure_logging_normal_mode_sets_info(self):
        """_configure_logging(dev=False) must set root to INFO."""
        assert self._root_level_after_configure(dev=False) == logging.INFO

    def test_configure_logging_dev_mode_sets_debug(self):
        """_configure_logging(dev=True) must set root to DEBUG."""
        assert self._root_level_after_configure(dev=True) == logging.DEBUG

    def test_debug_enabled_in_developer_mode(self):
        """Programmatically setting DEBUG on root allows debug records through."""
        root = logging.getLogger()
        original_level = root.level
        root.setLevel(logging.DEBUG)
        try:
            assert root.isEnabledFor(logging.DEBUG)
        finally:
            root.setLevel(original_level)