diff --git a/decnet/cli.py b/decnet/cli.py index 91415e5..b52e9b8 100644 --- a/decnet/cli.py +++ b/decnet/cli.py @@ -8,6 +8,7 @@ Usage: decnet services """ +import logging import signal from typing import Optional @@ -15,6 +16,7 @@ import typer from rich.console import Console from rich.table import Table +from decnet.logging import get_logger from decnet.env import ( DECNET_API_HOST, DECNET_API_PORT, @@ -32,6 +34,8 @@ from decnet.ini_loader import load_ini from decnet.network import detect_interface, detect_subnet, allocate_ips, get_host_ip from decnet.services.registry import all_services +log = get_logger("cli") + app = typer.Typer( name="decnet", help="Deploy a deception network of honeypot deckies on your LAN.", @@ -77,6 +81,7 @@ def api( import sys import os + log.info("API command invoked host=%s port=%d", host, port) console.print(f"[green]Starting DECNET API on {host}:{port}...[/]") _env: dict[str, str] = os.environ.copy() _env["DECNET_INGEST_LOG_FILE"] = str(log_file) @@ -115,6 +120,7 @@ def deploy( ) -> None: """Deploy deckies to the LAN.""" import os + log.info("deploy command invoked mode=%s deckies=%s dry_run=%s", mode, deckies, dry_run) if mode not in ("unihost", "swarm"): console.print("[red]--mode must be 'unihost' or 'swarm'[/]") raise typer.Exit(1) @@ -234,8 +240,13 @@ def deploy( mutate_interval=mutate_interval, ) + log.debug("deploy: config built deckies=%d interface=%s subnet=%s", len(config.deckies), config.interface, config.subnet) from decnet.engine import deploy as _deploy _deploy(config, dry_run=dry_run, no_cache=no_cache, parallel=parallel) + if dry_run: + log.info("deploy: dry-run complete, no containers started") + else: + log.info("deploy: deployment complete deckies=%d", len(config.deckies)) if mutate_interval is not None and not dry_run: import subprocess # nosec B404 @@ -290,6 +301,7 @@ def collect( """Stream Docker logs from all running decky service containers to a log file.""" import asyncio from decnet.collector import 
log_collector_worker + log.info("collect command invoked log_file=%s", log_file) console.print(f"[bold cyan]Collector starting[/] → {log_file}") asyncio.run(log_collector_worker(log_file)) @@ -322,6 +334,7 @@ def mutate( @app.command() def status() -> None: """Show running deckies and their status.""" + log.info("status command invoked") from decnet.engine import status as _status _status() @@ -336,8 +349,10 @@ def teardown( console.print("[red]Specify --all or --id .[/]") raise typer.Exit(1) + log.info("teardown command invoked all=%s id=%s", all_, id_) from decnet.engine import teardown as _teardown _teardown(decky_id=id_) + log.info("teardown complete all=%s id=%s", all_, id_) if all_: _kill_api() diff --git a/decnet/collector/worker.py b/decnet/collector/worker.py index 69e2c6b..d96ed4f 100644 --- a/decnet/collector/worker.py +++ b/decnet/collector/worker.py @@ -8,13 +8,14 @@ The ingester tails the .json file; rsyslog can consume the .log file independent import asyncio import json -import logging import re from datetime import datetime from pathlib import Path from typing import Any, Optional -logger = logging.getLogger("decnet.collector") +from decnet.logging import get_logger + +logger = get_logger("collector") # ─── RFC 5424 parser ────────────────────────────────────────────────────────── @@ -139,10 +140,13 @@ def _stream_container(container_id: str, log_path: Path, json_path: Path) -> Non lf.flush() parsed = parse_rfc5424(line) if parsed: + logger.debug("collector: event written decky=%s type=%s", parsed.get("decky"), parsed.get("event_type")) jf.write(json.dumps(parsed) + "\n") jf.flush() + else: + logger.debug("collector: malformed RFC5424 line snippet=%r", line[:80]) except Exception as exc: - logger.debug("Log stream ended for container %s: %s", container_id, exc) + logger.debug("collector: log stream ended container_id=%s reason=%s", container_id, exc) # ─── Async collector ────────────────────────────────────────────────────────── @@ -170,9 +174,10 
@@ async def log_collector_worker(log_file: str) -> None: asyncio.to_thread(_stream_container, container_id, log_path, json_path), loop=loop, ) - logger.info("Collecting logs from container: %s", container_name) + logger.info("collector: streaming container=%s", container_name) try: + logger.info("collector started log_path=%s", log_path) client = docker.from_env() for container in client.containers.list(): @@ -193,8 +198,9 @@ async def log_collector_worker(log_file: str) -> None: await asyncio.to_thread(_watch_events) except asyncio.CancelledError: + logger.info("collector shutdown requested cancelling %d tasks", len(active)) for task in active.values(): task.cancel() raise except Exception as exc: - logger.error("Collector error: %s", exc) + logger.error("collector error: %s", exc) diff --git a/decnet/config.py b/decnet/config.py index f07c682..80c7e38 100644 --- a/decnet/config.py +++ b/decnet/config.py @@ -48,8 +48,9 @@ class Rfc5424Formatter(logging.Formatter): msg = record.getMessage() if record.exc_info: msg += "\n" + self.formatException(record.exc_info) + app = getattr(record, "decnet_component", self._app) return ( - f"<{prival}>1 {ts} {self._hostname} {self._app}" + f"<{prival}>1 {ts} {self._hostname} {app}" f" {os.getpid()} {record.name} - {msg}" ) diff --git a/decnet/engine/deployer.py b/decnet/engine/deployer.py index 3f03c63..aa9252b 100644 --- a/decnet/engine/deployer.py +++ b/decnet/engine/deployer.py @@ -11,6 +11,7 @@ import docker from rich.console import Console from rich.table import Table +from decnet.logging import get_logger from decnet.config import DecnetConfig, clear_state, load_state, save_state from decnet.composer import write_compose from decnet.network import ( @@ -26,6 +27,7 @@ from decnet.network import ( teardown_host_macvlan, ) +log = get_logger("engine") console = Console() COMPOSE_FILE = Path("decnet-compose.yml") _CANONICAL_LOGGING = Path(__file__).parent.parent.parent / "templates" / "decnet_logging.py" @@ -106,11 +108,14 @@ 
def _compose_with_retry( def deploy(config: DecnetConfig, dry_run: bool = False, no_cache: bool = False, parallel: bool = False) -> None: + log.info("deployment started n_deckies=%d interface=%s subnet=%s dry_run=%s", len(config.deckies), config.interface, config.subnet, dry_run) + log.debug("deploy: deckies=%s", [d.name for d in config.deckies]) client = docker.from_env() ip_list = [d.ip for d in config.deckies] decky_range = ips_to_range(ip_list) host_ip = get_host_ip(config.interface) + log.debug("deploy: ip_range=%s host_ip=%s", decky_range, host_ip) net_driver = "IPvlan L2" if config.ipvlan else "MACVLAN" console.print(f"[bold cyan]Creating {net_driver} network[/] ({MACVLAN_NETWORK_NAME}) on {config.interface}") @@ -140,6 +145,7 @@ def deploy(config: DecnetConfig, dry_run: bool = False, no_cache: bool = False, console.print(f"[bold cyan]Compose file written[/] → {compose_path}") if dry_run: + log.info("deployment dry-run complete compose_path=%s", compose_path) console.print("[yellow]Dry run — no containers started.[/]") return @@ -161,12 +167,15 @@ def deploy(config: DecnetConfig, dry_run: bool = False, no_cache: bool = False, _compose_with_retry("build", "--no-cache", compose_file=compose_path) _compose_with_retry("up", "--build", "-d", compose_file=compose_path) + log.info("deployment complete n_deckies=%d", len(config.deckies)) _print_status(config) def teardown(decky_id: str | None = None) -> None: + log.info("teardown requested decky_id=%s", decky_id or "all") state = load_state() if state is None: + log.warning("teardown: no active deployment found") console.print("[red]No active deployment found (no decnet-state.json).[/]") return @@ -193,6 +202,7 @@ def teardown(decky_id: str | None = None) -> None: clear_state() net_driver = "IPvlan" if config.ipvlan else "MACVLAN" + log.info("teardown complete all deckies removed network_driver=%s", net_driver) console.print(f"[green]All deckies torn down. 
{net_driver} network removed.[/]") diff --git a/decnet/logging/__init__.py index e69de29..ad716e7 100644 --- a/decnet/logging/__init__.py +++ b/decnet/logging/__init__.py @@ -0,0 +1,42 @@ +""" +DECNET application logging helpers. + +Usage: + from decnet.logging import get_logger + log = get_logger("engine") # APP-NAME in RFC 5424 output becomes "engine" + +The returned logger propagates to the root logger (configured in config.py with +Rfc5424Formatter), so level control via DECNET_DEVELOPER still applies globally. +""" + +from __future__ import annotations + +import logging + + +class _ComponentFilter(logging.Filter): + """Injects *decnet_component* onto every LogRecord so Rfc5424Formatter can + use it as the RFC 5424 APP-NAME field instead of the hardcoded "decnet".""" + +    def __init__(self, component: str) -> None: + super().__init__() + self.component = component + +    def filter(self, record: logging.LogRecord) -> bool: + record.decnet_component = self.component  # type: ignore[attr-defined] + return True + + +def get_logger(component: str) -> logging.Logger: + """Return a named logger that self-identifies as *component* in RFC 5424. + +    Valid components: cli, engine, api, mutator, collector. + +    The logger is named ``decnet.<component>`` and propagates normally, so the +    root handler (Rfc5424Formatter + level gate from DECNET_DEVELOPER) handles +    output. Calling this function multiple times for the same component is safe.
+ """ + logger = logging.getLogger(f"decnet.{component}") + if not any(isinstance(f, _ComponentFilter) for f in logger.filters): + logger.addFilter(_ComponentFilter(component)) + return logger diff --git a/decnet/mutator/engine.py b/decnet/mutator/engine.py index 6d97e23..6ef916c 100644 --- a/decnet/mutator/engine.py +++ b/decnet/mutator/engine.py @@ -14,12 +14,14 @@ from decnet.fleet import all_service_names from decnet.composer import write_compose from decnet.config import DeckyConfig, DecnetConfig from decnet.engine import _compose_with_retry +from decnet.logging import get_logger from pathlib import Path import anyio import asyncio from decnet.web.db.repository import BaseRepository +log = get_logger("mutator") console = Console() @@ -28,8 +30,10 @@ async def mutate_decky(decky_name: str, repo: BaseRepository) -> bool: Perform an Intra-Archetype Shuffle for a specific decky. Returns True if mutation succeeded, False otherwise. """ + log.debug("mutate_decky: start decky=%s", decky_name) state_dict = await repo.get_state("deployment") if state_dict is None: + log.error("mutate_decky: no active deployment found in database") console.print("[red]No active deployment found in database.[/]") return False @@ -73,12 +77,14 @@ async def mutate_decky(decky_name: str, repo: BaseRepository) -> bool: # Still writes files for Docker to use write_compose(config, compose_path) + log.info("mutation applied decky=%s services=%s", decky_name, ",".join(decky.services)) console.print(f"[cyan]Mutating '{decky_name}' to services: {', '.join(decky.services)}[/]") try: # Wrap blocking call in thread await anyio.to_thread.run_sync(_compose_with_retry, "up", "-d", "--remove-orphans", compose_path) except Exception as e: + log.error("mutation failed decky=%s error=%s", decky_name, e) console.print(f"[red]Failed to mutate '{decky_name}': {e}[/]") return False @@ -90,8 +96,10 @@ async def mutate_all(repo: BaseRepository, force: bool = False) -> None: Check all deckies and mutate those that 
are due. If force=True, mutates all deckies regardless of schedule. """ + log.debug("mutate_all: start force=%s", force) state_dict = await repo.get_state("deployment") if state_dict is None: + log.error("mutate_all: no active deployment found") console.print("[red]No active deployment found.[/]") return @@ -116,15 +124,20 @@ async def mutate_all(repo: BaseRepository, force: bool = False) -> None: mutated_count += 1 if mutated_count == 0 and not force: + log.debug("mutate_all: no deckies due for mutation") console.print("[dim]No deckies are due for mutation.[/]") + else: + log.info("mutate_all: complete mutated_count=%d", mutated_count) async def run_watch_loop(repo: BaseRepository, poll_interval_secs: int = 10) -> None: """Run an infinite loop checking for deckies that need mutation.""" + log.info("mutator watch loop started poll_interval_secs=%d", poll_interval_secs) console.print(f"[green]DECNET Mutator Watcher started (polling every {poll_interval_secs}s).[/]") try: while True: await mutate_all(force=False, repo=repo) await asyncio.sleep(poll_interval_secs) except KeyboardInterrupt: + log.info("mutator watch loop stopped") console.print("\n[dim]Mutator watcher stopped.[/]") diff --git a/decnet/web/api.py b/decnet/web/api.py index d5e3ca3..4eabe79 100644 --- a/decnet/web/api.py +++ b/decnet/web/api.py @@ -1,5 +1,4 @@ import asyncio -import logging import os from contextlib import asynccontextmanager from typing import Any, AsyncGenerator, Optional @@ -11,12 +10,13 @@ from pydantic import ValidationError from fastapi.middleware.cors import CORSMiddleware from decnet.env import DECNET_CORS_ORIGINS, DECNET_DEVELOPER, DECNET_INGEST_LOG_FILE +from decnet.logging import get_logger from decnet.web.dependencies import repo from decnet.collector import log_collector_worker from decnet.web.ingester import log_ingestion_worker from decnet.web.router import api_router -log = logging.getLogger(__name__) +log = get_logger("api") ingestion_task: Optional[asyncio.Task[Any]] = 
None collector_task: Optional[asyncio.Task[Any]] = None @@ -25,9 +25,11 @@ collector_task: Optional[asyncio.Task[Any]] = None async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: global ingestion_task, collector_task + log.info("API startup initialising database") for attempt in range(1, 6): try: await repo.initialize() + log.debug("API startup DB initialised attempt=%d", attempt) break except Exception as exc: log.warning("DB init attempt %d/5 failed: %s", attempt, exc) @@ -40,11 +42,13 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: # Start background ingestion task if ingestion_task is None or ingestion_task.done(): ingestion_task = asyncio.create_task(log_ingestion_worker(repo)) + log.debug("API startup ingest worker started") # Start Docker log collector (writes to log file; ingester reads from it) _log_file = os.environ.get("DECNET_INGEST_LOG_FILE", DECNET_INGEST_LOG_FILE) if _log_file and (collector_task is None or collector_task.done()): collector_task = asyncio.create_task(log_collector_worker(_log_file)) + log.debug("API startup collector worker started log_file=%s", _log_file) elif not _log_file: log.warning("DECNET_INGEST_LOG_FILE not set — Docker log collection disabled.") else: @@ -52,7 +56,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: yield - # Shutdown background tasks + log.info("API shutdown cancelling background tasks") for task in (ingestion_task, collector_task): if task and not task.done(): task.cancel() @@ -62,6 +66,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: pass except Exception as exc: log.warning("Task shutdown error: %s", exc) + log.info("API shutdown complete") app: FastAPI = FastAPI( diff --git a/decnet/web/ingester.py b/decnet/web/ingester.py index 96a224a..ddf555d 100644 --- a/decnet/web/ingester.py +++ b/decnet/web/ingester.py @@ -1,13 +1,13 @@ import asyncio import os -import logging import json from typing import Any from pathlib import Path +from 
decnet.logging import get_logger from decnet.web.db.repository import BaseRepository -logger: logging.Logger = logging.getLogger("decnet.web.ingester") +logger = get_logger("api") async def log_ingestion_worker(repo: BaseRepository) -> None: """ @@ -22,7 +22,7 @@ async def log_ingestion_worker(repo: BaseRepository) -> None: _json_log_path: Path = Path(_base_log_file).with_suffix(".json") _position: int = 0 - logger.info(f"Starting JSON log ingestion from {_json_log_path}") + logger.info("ingest worker started path=%s", _json_log_path) while True: try: @@ -53,10 +53,11 @@ async def log_ingestion_worker(repo: BaseRepository) -> None: try: _log_data: dict[str, Any] = json.loads(_line.strip()) + logger.debug("ingest: record decky=%s event_type=%s", _log_data.get("decky"), _log_data.get("event_type")) await repo.add_log(_log_data) await _extract_bounty(repo, _log_data) except json.JSONDecodeError: - logger.error(f"Failed to decode JSON log line: {_line}") + logger.error("ingest: failed to decode JSON log line: %s", _line.strip()) continue # Update position after successful line read @@ -65,10 +66,10 @@ async def log_ingestion_worker(repo: BaseRepository) -> None: except Exception as _e: _err_str = str(_e).lower() if "no such table" in _err_str or "no active connection" in _err_str or "connection closed" in _err_str: - logger.error(f"Post-shutdown or fatal DB error in ingester: {_e}") + logger.error("ingest: post-shutdown or fatal DB error: %s", _e) break # Exit worker — DB is gone or uninitialized - logger.error(f"Error in log ingestion worker: {_e}") + logger.error("ingest: error in worker: %s", _e) await asyncio.sleep(5) await asyncio.sleep(1) diff --git a/decnet/web/router/fleet/api_deploy_deckies.py b/decnet/web/router/fleet/api_deploy_deckies.py index 914a64c..d6654c9 100644 --- a/decnet/web/router/fleet/api_deploy_deckies.py +++ b/decnet/web/router/fleet/api_deploy_deckies.py @@ -1,9 +1,11 @@ -import logging import os from fastapi import APIRouter, Depends, 
HTTPException -from decnet.config import DEFAULT_MUTATE_INTERVAL, DecnetConfig, _ROOT, log +from decnet.logging import get_logger +from decnet.config import DEFAULT_MUTATE_INTERVAL, DecnetConfig, _ROOT + +log = get_logger("api") from decnet.engine import deploy as _deploy from decnet.ini_loader import load_ini_from_string from decnet.network import detect_interface, detect_subnet, get_host_ip @@ -100,7 +102,7 @@ async def api_deploy_deckies(req: DeployIniRequest, current_user: str = Depends( } await repo.set_state("deployment", new_state_payload) except Exception as e: - logging.getLogger("decnet.web.api").exception("Deployment failed: %s", e) + log.exception("Deployment failed: %s", e) raise HTTPException(status_code=500, detail="Deployment failed. Check server logs for details.") return {"message": "Deckies deployed successfully"} diff --git a/decnet/web/router/stream/api_stream_events.py b/decnet/web/router/stream/api_stream_events.py index 0690b6a..8bd56e6 100644 --- a/decnet/web/router/stream/api_stream_events.py +++ b/decnet/web/router/stream/api_stream_events.py @@ -1,15 +1,15 @@ import json import asyncio -import logging from typing import AsyncGenerator, Optional from fastapi import APIRouter, Depends, Query, Request from fastapi.responses import StreamingResponse from decnet.env import DECNET_DEVELOPER +from decnet.logging import get_logger from decnet.web.dependencies import get_stream_user, repo -log = logging.getLogger(__name__) +log = get_logger("api") router = APIRouter() diff --git a/tests/test_logging.py b/tests/test_logging.py new file mode 100644 index 0000000..565a872 --- /dev/null +++ b/tests/test_logging.py @@ -0,0 +1,155 @@ +""" +Tests for DECNET application logging system. 
+ +Covers: +- get_logger() factory and _ComponentFilter injection +- Rfc5424Formatter component-aware APP-NAME field +- Log level gating via DECNET_DEVELOPER +- Component tags for all five microservice layers +""" + +from __future__ import annotations + +import logging +import os +import re + +import pytest + +from decnet.logging import _ComponentFilter, get_logger + +# RFC 5424 parser: <PRI>1 TIMESTAMP HOSTNAME APP-NAME PROCID MSGID SD MSG +_RFC5424_RE = re.compile( + r"^<(\d+)>1 " # PRI + r"\S+ " # TIMESTAMP + r"\S+ " # HOSTNAME + r"(\S+) " # APP-NAME ← what we care about + r"\S+ " # PROCID + r"(\S+) " # MSGID + r"(.+)$", # SD + MSG +) + + +def _format_record(logger: logging.Logger, level: int, msg: str) -> str: + """Emit a log record through the root handler and return the formatted string.""" + from decnet.config import Rfc5424Formatter + formatter = Rfc5424Formatter() + record = logger.makeRecord( + logger.name, level, "", 0, msg, (), None + ) + # Run all filters attached to the logger so decnet_component gets injected + for f in logger.filters: + f.filter(record) + return formatter.format(record) + + +class TestGetLogger: + def test_returns_logger(self): + log = get_logger("cli") + assert isinstance(log, logging.Logger) + + def test_logger_name(self): + log = get_logger("engine") + assert log.name == "decnet.engine" + + def test_filter_attached(self): + log = get_logger("api") + assert any(isinstance(f, _ComponentFilter) for f in log.filters) + + def test_idempotent_filter(self): + log = get_logger("mutator") + get_logger("mutator") # second call + component_filters = [f for f in log.filters if isinstance(f, _ComponentFilter)] + assert len(component_filters) == 1 + + @pytest.mark.parametrize("component", ["cli", "engine", "api", "mutator", "collector"]) + def test_all_components_registered(self, component): + log = get_logger(component) + assert any(isinstance(f, _ComponentFilter) for f in log.filters) + + +class TestComponentFilter: + def
test_injects_attribute(self): + f = _ComponentFilter("engine") + record = logging.LogRecord("test", logging.INFO, "", 0, "msg", (), None) + assert f.filter(record) is True + assert record.decnet_component == "engine" # type: ignore[attr-defined] + + def test_always_passes(self): + f = _ComponentFilter("collector") + record = logging.LogRecord("test", logging.DEBUG, "", 0, "msg", (), None) + assert f.filter(record) is True + + +class TestRfc5424FormatterComponentAware: + @pytest.mark.parametrize("component", ["cli", "engine", "api", "mutator", "collector"]) + def test_app_name_is_component(self, component): + log = get_logger(component) + line = _format_record(log, logging.INFO, "test message") + m = _RFC5424_RE.match(line) + assert m is not None, f"Not RFC 5424: {line!r}" + assert m.group(2) == component, f"Expected APP-NAME={component!r}, got {m.group(2)!r}" + + def test_fallback_app_name_without_component(self): + """Untagged loggers (no _ComponentFilter) fall back to 'decnet'.""" + from decnet.config import Rfc5424Formatter + formatter = Rfc5424Formatter() + record = logging.LogRecord("some.module", logging.INFO, "", 0, "hello", (), None) + line = formatter.format(record) + m = _RFC5424_RE.match(line) + assert m is not None + assert m.group(2) == "decnet" + + def test_msgid_is_logger_name(self): + log = get_logger("engine") + line = _format_record(log, logging.INFO, "deploying") + m = _RFC5424_RE.match(line) + assert m is not None + assert m.group(3) == "decnet.engine" + + +class TestLogLevelGating: + def test_configure_logging_normal_mode_sets_info(self): + """_configure_logging(dev=False) must set root to INFO.""" + from decnet.config import _configure_logging, Rfc5424Formatter + root = logging.getLogger() + original_level = root.level + original_handlers = root.handlers[:] + # Remove any existing RFC5424 handlers so idempotency check doesn't skip + root.handlers = [ + h for h in root.handlers + if not (isinstance(h, logging.StreamHandler) and 
isinstance(h.formatter, Rfc5424Formatter)) + ] + try: + _configure_logging(dev=False) + assert root.level == logging.INFO + finally: + root.setLevel(original_level) + root.handlers = original_handlers + + def test_configure_logging_dev_mode_sets_debug(self): + """_configure_logging(dev=True) must set root to DEBUG.""" + from decnet.config import _configure_logging, Rfc5424Formatter + root = logging.getLogger() + original_level = root.level + original_handlers = root.handlers[:] + root.handlers = [ + h for h in root.handlers + if not (isinstance(h, logging.StreamHandler) and isinstance(h.formatter, Rfc5424Formatter)) + ] + try: + _configure_logging(dev=True) + assert root.level == logging.DEBUG + finally: + root.setLevel(original_level) + root.handlers = original_handlers + + def test_debug_enabled_in_developer_mode(self, monkeypatch): + """Programmatically setting DEBUG on root allows debug records through.""" + root = logging.getLogger() + original_level = root.level + root.setLevel(logging.DEBUG) + try: + assert root.isEnabledFor(logging.DEBUG) + finally: + root.setLevel(original_level)