Compare commits
2 Commits
0706919469
...
448cb9cee0
| Author | SHA1 | Date | |
|---|---|---|---|
| 448cb9cee0 | |||
| 035499f255 |
@@ -1,29 +0,0 @@
|
|||||||
{
|
|
||||||
"permissions": {
|
|
||||||
"allow": [
|
|
||||||
"mcp__plugin_context-mode_context-mode__ctx_batch_execute",
|
|
||||||
"mcp__plugin_context-mode_context-mode__ctx_search",
|
|
||||||
"Bash(grep:*)",
|
|
||||||
"Bash(python -m pytest --tb=short -q)",
|
|
||||||
"Bash(pip install:*)",
|
|
||||||
"Bash(pip show:*)",
|
|
||||||
"Bash(python:*)",
|
|
||||||
"Bash(DECNET_JWT_SECRET=\"test-secret-xyz-1234!\" DECNET_ADMIN_PASSWORD=\"test-pass-xyz-1234!\" python:*)",
|
|
||||||
"Bash(ls /home/anti/Tools/DECNET/*.db* /home/anti/Tools/DECNET/test_*.db*)",
|
|
||||||
"mcp__plugin_context-mode_context-mode__ctx_execute_file",
|
|
||||||
"Bash(nc)",
|
|
||||||
"Bash(nmap:*)",
|
|
||||||
"Bash(ping -c1 -W2 192.168.1.200)",
|
|
||||||
"Bash(xxd)",
|
|
||||||
"Bash(curl -s http://192.168.1.200:2375/version)",
|
|
||||||
"Bash(python3 -m json.tool)",
|
|
||||||
"Bash(curl -s http://192.168.1.200:9200/)",
|
|
||||||
"Bash(docker image:*)",
|
|
||||||
"Read(//home/anti/Tools/cowrie/src/cowrie/data/txtcmds/**)",
|
|
||||||
"Read(//home/anti/Tools/cowrie/src/cowrie/data/txtcmds/bin/**)",
|
|
||||||
"mcp__plugin_context-mode_context-mode__ctx_index",
|
|
||||||
"Bash(ls:*)",
|
|
||||||
"mcp__plugin_context-mode_context-mode__ctx_execute"
|
|
||||||
]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -8,6 +8,7 @@ Usage:
|
|||||||
decnet services
|
decnet services
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
import signal
|
import signal
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
@@ -15,6 +16,7 @@ import typer
|
|||||||
from rich.console import Console
|
from rich.console import Console
|
||||||
from rich.table import Table
|
from rich.table import Table
|
||||||
|
|
||||||
|
from decnet.logging import get_logger
|
||||||
from decnet.env import (
|
from decnet.env import (
|
||||||
DECNET_API_HOST,
|
DECNET_API_HOST,
|
||||||
DECNET_API_PORT,
|
DECNET_API_PORT,
|
||||||
@@ -32,6 +34,8 @@ from decnet.ini_loader import load_ini
|
|||||||
from decnet.network import detect_interface, detect_subnet, allocate_ips, get_host_ip
|
from decnet.network import detect_interface, detect_subnet, allocate_ips, get_host_ip
|
||||||
from decnet.services.registry import all_services
|
from decnet.services.registry import all_services
|
||||||
|
|
||||||
|
log = get_logger("cli")
|
||||||
|
|
||||||
app = typer.Typer(
|
app = typer.Typer(
|
||||||
name="decnet",
|
name="decnet",
|
||||||
help="Deploy a deception network of honeypot deckies on your LAN.",
|
help="Deploy a deception network of honeypot deckies on your LAN.",
|
||||||
@@ -77,6 +81,7 @@ def api(
|
|||||||
import sys
|
import sys
|
||||||
import os
|
import os
|
||||||
|
|
||||||
|
log.info("API command invoked host=%s port=%d", host, port)
|
||||||
console.print(f"[green]Starting DECNET API on {host}:{port}...[/]")
|
console.print(f"[green]Starting DECNET API on {host}:{port}...[/]")
|
||||||
_env: dict[str, str] = os.environ.copy()
|
_env: dict[str, str] = os.environ.copy()
|
||||||
_env["DECNET_INGEST_LOG_FILE"] = str(log_file)
|
_env["DECNET_INGEST_LOG_FILE"] = str(log_file)
|
||||||
@@ -115,6 +120,7 @@ def deploy(
|
|||||||
) -> None:
|
) -> None:
|
||||||
"""Deploy deckies to the LAN."""
|
"""Deploy deckies to the LAN."""
|
||||||
import os
|
import os
|
||||||
|
log.info("deploy command invoked mode=%s deckies=%s dry_run=%s", mode, deckies, dry_run)
|
||||||
if mode not in ("unihost", "swarm"):
|
if mode not in ("unihost", "swarm"):
|
||||||
console.print("[red]--mode must be 'unihost' or 'swarm'[/]")
|
console.print("[red]--mode must be 'unihost' or 'swarm'[/]")
|
||||||
raise typer.Exit(1)
|
raise typer.Exit(1)
|
||||||
@@ -234,8 +240,13 @@ def deploy(
|
|||||||
mutate_interval=mutate_interval,
|
mutate_interval=mutate_interval,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
log.debug("deploy: config built deckies=%d interface=%s subnet=%s", len(config.deckies), config.interface, config.subnet)
|
||||||
from decnet.engine import deploy as _deploy
|
from decnet.engine import deploy as _deploy
|
||||||
_deploy(config, dry_run=dry_run, no_cache=no_cache, parallel=parallel)
|
_deploy(config, dry_run=dry_run, no_cache=no_cache, parallel=parallel)
|
||||||
|
if dry_run:
|
||||||
|
log.info("deploy: dry-run complete, no containers started")
|
||||||
|
else:
|
||||||
|
log.info("deploy: deployment complete deckies=%d", len(config.deckies))
|
||||||
|
|
||||||
if mutate_interval is not None and not dry_run:
|
if mutate_interval is not None and not dry_run:
|
||||||
import subprocess # nosec B404
|
import subprocess # nosec B404
|
||||||
@@ -290,6 +301,7 @@ def collect(
|
|||||||
"""Stream Docker logs from all running decky service containers to a log file."""
|
"""Stream Docker logs from all running decky service containers to a log file."""
|
||||||
import asyncio
|
import asyncio
|
||||||
from decnet.collector import log_collector_worker
|
from decnet.collector import log_collector_worker
|
||||||
|
log.info("collect command invoked log_file=%s", log_file)
|
||||||
console.print(f"[bold cyan]Collector starting[/] → {log_file}")
|
console.print(f"[bold cyan]Collector starting[/] → {log_file}")
|
||||||
asyncio.run(log_collector_worker(log_file))
|
asyncio.run(log_collector_worker(log_file))
|
||||||
|
|
||||||
@@ -322,6 +334,7 @@ def mutate(
|
|||||||
@app.command()
|
@app.command()
|
||||||
def status() -> None:
|
def status() -> None:
|
||||||
"""Show running deckies and their status."""
|
"""Show running deckies and their status."""
|
||||||
|
log.info("status command invoked")
|
||||||
from decnet.engine import status as _status
|
from decnet.engine import status as _status
|
||||||
_status()
|
_status()
|
||||||
|
|
||||||
@@ -336,8 +349,10 @@ def teardown(
|
|||||||
console.print("[red]Specify --all or --id <name>.[/]")
|
console.print("[red]Specify --all or --id <name>.[/]")
|
||||||
raise typer.Exit(1)
|
raise typer.Exit(1)
|
||||||
|
|
||||||
|
log.info("teardown command invoked all=%s id=%s", all_, id_)
|
||||||
from decnet.engine import teardown as _teardown
|
from decnet.engine import teardown as _teardown
|
||||||
_teardown(decky_id=id_)
|
_teardown(decky_id=id_)
|
||||||
|
log.info("teardown complete all=%s id=%s", all_, id_)
|
||||||
|
|
||||||
if all_:
|
if all_:
|
||||||
_kill_api()
|
_kill_api()
|
||||||
|
|||||||
@@ -8,13 +8,14 @@ The ingester tails the .json file; rsyslog can consume the .log file independent
|
|||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import json
|
import json
|
||||||
import logging
|
|
||||||
import re
|
import re
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any, Optional
|
from typing import Any, Optional
|
||||||
|
|
||||||
logger = logging.getLogger("decnet.collector")
|
from decnet.logging import get_logger
|
||||||
|
|
||||||
|
logger = get_logger("collector")
|
||||||
|
|
||||||
# ─── RFC 5424 parser ──────────────────────────────────────────────────────────
|
# ─── RFC 5424 parser ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
@@ -139,10 +140,13 @@ def _stream_container(container_id: str, log_path: Path, json_path: Path) -> Non
|
|||||||
lf.flush()
|
lf.flush()
|
||||||
parsed = parse_rfc5424(line)
|
parsed = parse_rfc5424(line)
|
||||||
if parsed:
|
if parsed:
|
||||||
|
logger.debug("collector: event written decky=%s type=%s", parsed.get("decky"), parsed.get("event_type"))
|
||||||
jf.write(json.dumps(parsed) + "\n")
|
jf.write(json.dumps(parsed) + "\n")
|
||||||
jf.flush()
|
jf.flush()
|
||||||
|
else:
|
||||||
|
logger.debug("collector: malformed RFC5424 line snippet=%r", line[:80])
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
logger.debug("Log stream ended for container %s: %s", container_id, exc)
|
logger.debug("collector: log stream ended container_id=%s reason=%s", container_id, exc)
|
||||||
|
|
||||||
|
|
||||||
# ─── Async collector ──────────────────────────────────────────────────────────
|
# ─── Async collector ──────────────────────────────────────────────────────────
|
||||||
@@ -170,9 +174,10 @@ async def log_collector_worker(log_file: str) -> None:
|
|||||||
asyncio.to_thread(_stream_container, container_id, log_path, json_path),
|
asyncio.to_thread(_stream_container, container_id, log_path, json_path),
|
||||||
loop=loop,
|
loop=loop,
|
||||||
)
|
)
|
||||||
logger.info("Collecting logs from container: %s", container_name)
|
logger.info("collector: streaming container=%s", container_name)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
logger.info("collector started log_path=%s", log_path)
|
||||||
client = docker.from_env()
|
client = docker.from_env()
|
||||||
|
|
||||||
for container in client.containers.list():
|
for container in client.containers.list():
|
||||||
@@ -193,8 +198,9 @@ async def log_collector_worker(log_file: str) -> None:
|
|||||||
await asyncio.to_thread(_watch_events)
|
await asyncio.to_thread(_watch_events)
|
||||||
|
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
|
logger.info("collector shutdown requested cancelling %d tasks", len(active))
|
||||||
for task in active.values():
|
for task in active.values():
|
||||||
task.cancel()
|
task.cancel()
|
||||||
raise
|
raise
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
logger.error("Collector error: %s", exc)
|
logger.error("collector error: %s", exc)
|
||||||
|
|||||||
@@ -48,8 +48,9 @@ class Rfc5424Formatter(logging.Formatter):
|
|||||||
msg = record.getMessage()
|
msg = record.getMessage()
|
||||||
if record.exc_info:
|
if record.exc_info:
|
||||||
msg += "\n" + self.formatException(record.exc_info)
|
msg += "\n" + self.formatException(record.exc_info)
|
||||||
|
app = getattr(record, "decnet_component", self._app)
|
||||||
return (
|
return (
|
||||||
f"<{prival}>1 {ts} {self._hostname} {self._app}"
|
f"<{prival}>1 {ts} {self._hostname} {app}"
|
||||||
f" {os.getpid()} {record.name} - {msg}"
|
f" {os.getpid()} {record.name} - {msg}"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ import docker
|
|||||||
from rich.console import Console
|
from rich.console import Console
|
||||||
from rich.table import Table
|
from rich.table import Table
|
||||||
|
|
||||||
|
from decnet.logging import get_logger
|
||||||
from decnet.config import DecnetConfig, clear_state, load_state, save_state
|
from decnet.config import DecnetConfig, clear_state, load_state, save_state
|
||||||
from decnet.composer import write_compose
|
from decnet.composer import write_compose
|
||||||
from decnet.network import (
|
from decnet.network import (
|
||||||
@@ -26,6 +27,7 @@ from decnet.network import (
|
|||||||
teardown_host_macvlan,
|
teardown_host_macvlan,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
log = get_logger("engine")
|
||||||
console = Console()
|
console = Console()
|
||||||
COMPOSE_FILE = Path("decnet-compose.yml")
|
COMPOSE_FILE = Path("decnet-compose.yml")
|
||||||
_CANONICAL_LOGGING = Path(__file__).parent.parent.parent / "templates" / "decnet_logging.py"
|
_CANONICAL_LOGGING = Path(__file__).parent.parent.parent / "templates" / "decnet_logging.py"
|
||||||
@@ -106,11 +108,14 @@ def _compose_with_retry(
|
|||||||
|
|
||||||
|
|
||||||
def deploy(config: DecnetConfig, dry_run: bool = False, no_cache: bool = False, parallel: bool = False) -> None:
|
def deploy(config: DecnetConfig, dry_run: bool = False, no_cache: bool = False, parallel: bool = False) -> None:
|
||||||
|
log.info("deployment started n_deckies=%d interface=%s subnet=%s dry_run=%s", len(config.deckies), config.interface, config.subnet, dry_run)
|
||||||
|
log.debug("deploy: deckies=%s", [d.name for d in config.deckies])
|
||||||
client = docker.from_env()
|
client = docker.from_env()
|
||||||
|
|
||||||
ip_list = [d.ip for d in config.deckies]
|
ip_list = [d.ip for d in config.deckies]
|
||||||
decky_range = ips_to_range(ip_list)
|
decky_range = ips_to_range(ip_list)
|
||||||
host_ip = get_host_ip(config.interface)
|
host_ip = get_host_ip(config.interface)
|
||||||
|
log.debug("deploy: ip_range=%s host_ip=%s", decky_range, host_ip)
|
||||||
|
|
||||||
net_driver = "IPvlan L2" if config.ipvlan else "MACVLAN"
|
net_driver = "IPvlan L2" if config.ipvlan else "MACVLAN"
|
||||||
console.print(f"[bold cyan]Creating {net_driver} network[/] ({MACVLAN_NETWORK_NAME}) on {config.interface}")
|
console.print(f"[bold cyan]Creating {net_driver} network[/] ({MACVLAN_NETWORK_NAME}) on {config.interface}")
|
||||||
@@ -140,6 +145,7 @@ def deploy(config: DecnetConfig, dry_run: bool = False, no_cache: bool = False,
|
|||||||
console.print(f"[bold cyan]Compose file written[/] → {compose_path}")
|
console.print(f"[bold cyan]Compose file written[/] → {compose_path}")
|
||||||
|
|
||||||
if dry_run:
|
if dry_run:
|
||||||
|
log.info("deployment dry-run complete compose_path=%s", compose_path)
|
||||||
console.print("[yellow]Dry run — no containers started.[/]")
|
console.print("[yellow]Dry run — no containers started.[/]")
|
||||||
return
|
return
|
||||||
|
|
||||||
@@ -161,12 +167,15 @@ def deploy(config: DecnetConfig, dry_run: bool = False, no_cache: bool = False,
|
|||||||
_compose_with_retry("build", "--no-cache", compose_file=compose_path)
|
_compose_with_retry("build", "--no-cache", compose_file=compose_path)
|
||||||
_compose_with_retry("up", "--build", "-d", compose_file=compose_path)
|
_compose_with_retry("up", "--build", "-d", compose_file=compose_path)
|
||||||
|
|
||||||
|
log.info("deployment complete n_deckies=%d", len(config.deckies))
|
||||||
_print_status(config)
|
_print_status(config)
|
||||||
|
|
||||||
|
|
||||||
def teardown(decky_id: str | None = None) -> None:
|
def teardown(decky_id: str | None = None) -> None:
|
||||||
|
log.info("teardown requested decky_id=%s", decky_id or "all")
|
||||||
state = load_state()
|
state = load_state()
|
||||||
if state is None:
|
if state is None:
|
||||||
|
log.warning("teardown: no active deployment found")
|
||||||
console.print("[red]No active deployment found (no decnet-state.json).[/]")
|
console.print("[red]No active deployment found (no decnet-state.json).[/]")
|
||||||
return
|
return
|
||||||
|
|
||||||
@@ -193,6 +202,7 @@ def teardown(decky_id: str | None = None) -> None:
|
|||||||
clear_state()
|
clear_state()
|
||||||
|
|
||||||
net_driver = "IPvlan" if config.ipvlan else "MACVLAN"
|
net_driver = "IPvlan" if config.ipvlan else "MACVLAN"
|
||||||
|
log.info("teardown complete all deckies removed network_driver=%s", net_driver)
|
||||||
console.print(f"[green]All deckies torn down. {net_driver} network removed.[/]")
|
console.print(f"[green]All deckies torn down. {net_driver} network removed.[/]")
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,42 @@
|
|||||||
|
"""
|
||||||
|
DECNET application logging helpers.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
from decnet.logging import get_logger
|
||||||
|
log = get_logger("engine") # APP-NAME in RFC 5424 output becomes "engine"
|
||||||
|
|
||||||
|
The returned logger propagates to the root logger (configured in config.py with
|
||||||
|
Rfc5424Formatter), so level control via DECNET_DEVELOPER still applies globally.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
|
||||||
|
|
||||||
|
class _ComponentFilter(logging.Filter):
|
||||||
|
"""Injects *decnet_component* onto every LogRecord so Rfc5424Formatter can
|
||||||
|
use it as the RFC 5424 APP-NAME field instead of the hardcoded "decnet"."""
|
||||||
|
|
||||||
|
def __init__(self, component: str) -> None:
|
||||||
|
super().__init__()
|
||||||
|
self.component = component
|
||||||
|
|
||||||
|
def filter(self, record: logging.LogRecord) -> bool:
|
||||||
|
record.decnet_component = self.component # type: ignore[attr-defined]
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def get_logger(component: str) -> logging.Logger:
|
||||||
|
"""Return a named logger that self-identifies as *component* in RFC 5424.
|
||||||
|
|
||||||
|
Valid components: cli, engine, api, mutator, collector.
|
||||||
|
|
||||||
|
The logger is named ``decnet.<component>`` and propagates normally, so the
|
||||||
|
root handler (Rfc5424Formatter + level gate from DECNET_DEVELOPER) handles
|
||||||
|
output. Calling this function multiple times for the same component is safe.
|
||||||
|
"""
|
||||||
|
logger = logging.getLogger(f"decnet.{component}")
|
||||||
|
if not any(isinstance(f, _ComponentFilter) for f in logger.filters):
|
||||||
|
logger.addFilter(_ComponentFilter(component))
|
||||||
|
return logger
|
||||||
|
|||||||
@@ -14,12 +14,14 @@ from decnet.fleet import all_service_names
|
|||||||
from decnet.composer import write_compose
|
from decnet.composer import write_compose
|
||||||
from decnet.config import DeckyConfig, DecnetConfig
|
from decnet.config import DeckyConfig, DecnetConfig
|
||||||
from decnet.engine import _compose_with_retry
|
from decnet.engine import _compose_with_retry
|
||||||
|
from decnet.logging import get_logger
|
||||||
|
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
import anyio
|
import anyio
|
||||||
import asyncio
|
import asyncio
|
||||||
from decnet.web.db.repository import BaseRepository
|
from decnet.web.db.repository import BaseRepository
|
||||||
|
|
||||||
|
log = get_logger("mutator")
|
||||||
console = Console()
|
console = Console()
|
||||||
|
|
||||||
|
|
||||||
@@ -28,8 +30,10 @@ async def mutate_decky(decky_name: str, repo: BaseRepository) -> bool:
|
|||||||
Perform an Intra-Archetype Shuffle for a specific decky.
|
Perform an Intra-Archetype Shuffle for a specific decky.
|
||||||
Returns True if mutation succeeded, False otherwise.
|
Returns True if mutation succeeded, False otherwise.
|
||||||
"""
|
"""
|
||||||
|
log.debug("mutate_decky: start decky=%s", decky_name)
|
||||||
state_dict = await repo.get_state("deployment")
|
state_dict = await repo.get_state("deployment")
|
||||||
if state_dict is None:
|
if state_dict is None:
|
||||||
|
log.error("mutate_decky: no active deployment found in database")
|
||||||
console.print("[red]No active deployment found in database.[/]")
|
console.print("[red]No active deployment found in database.[/]")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
@@ -73,12 +77,14 @@ async def mutate_decky(decky_name: str, repo: BaseRepository) -> bool:
|
|||||||
# Still writes files for Docker to use
|
# Still writes files for Docker to use
|
||||||
write_compose(config, compose_path)
|
write_compose(config, compose_path)
|
||||||
|
|
||||||
|
log.info("mutation applied decky=%s services=%s", decky_name, ",".join(decky.services))
|
||||||
console.print(f"[cyan]Mutating '{decky_name}' to services: {', '.join(decky.services)}[/]")
|
console.print(f"[cyan]Mutating '{decky_name}' to services: {', '.join(decky.services)}[/]")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Wrap blocking call in thread
|
# Wrap blocking call in thread
|
||||||
await anyio.to_thread.run_sync(_compose_with_retry, "up", "-d", "--remove-orphans", compose_path)
|
await anyio.to_thread.run_sync(_compose_with_retry, "up", "-d", "--remove-orphans", compose_path)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
log.error("mutation failed decky=%s error=%s", decky_name, e)
|
||||||
console.print(f"[red]Failed to mutate '{decky_name}': {e}[/]")
|
console.print(f"[red]Failed to mutate '{decky_name}': {e}[/]")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
@@ -90,8 +96,10 @@ async def mutate_all(repo: BaseRepository, force: bool = False) -> None:
|
|||||||
Check all deckies and mutate those that are due.
|
Check all deckies and mutate those that are due.
|
||||||
If force=True, mutates all deckies regardless of schedule.
|
If force=True, mutates all deckies regardless of schedule.
|
||||||
"""
|
"""
|
||||||
|
log.debug("mutate_all: start force=%s", force)
|
||||||
state_dict = await repo.get_state("deployment")
|
state_dict = await repo.get_state("deployment")
|
||||||
if state_dict is None:
|
if state_dict is None:
|
||||||
|
log.error("mutate_all: no active deployment found")
|
||||||
console.print("[red]No active deployment found.[/]")
|
console.print("[red]No active deployment found.[/]")
|
||||||
return
|
return
|
||||||
|
|
||||||
@@ -116,15 +124,20 @@ async def mutate_all(repo: BaseRepository, force: bool = False) -> None:
|
|||||||
mutated_count += 1
|
mutated_count += 1
|
||||||
|
|
||||||
if mutated_count == 0 and not force:
|
if mutated_count == 0 and not force:
|
||||||
|
log.debug("mutate_all: no deckies due for mutation")
|
||||||
console.print("[dim]No deckies are due for mutation.[/]")
|
console.print("[dim]No deckies are due for mutation.[/]")
|
||||||
|
else:
|
||||||
|
log.info("mutate_all: complete mutated_count=%d", mutated_count)
|
||||||
|
|
||||||
|
|
||||||
async def run_watch_loop(repo: BaseRepository, poll_interval_secs: int = 10) -> None:
|
async def run_watch_loop(repo: BaseRepository, poll_interval_secs: int = 10) -> None:
|
||||||
"""Run an infinite loop checking for deckies that need mutation."""
|
"""Run an infinite loop checking for deckies that need mutation."""
|
||||||
|
log.info("mutator watch loop started poll_interval_secs=%d", poll_interval_secs)
|
||||||
console.print(f"[green]DECNET Mutator Watcher started (polling every {poll_interval_secs}s).[/]")
|
console.print(f"[green]DECNET Mutator Watcher started (polling every {poll_interval_secs}s).[/]")
|
||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
await mutate_all(force=False, repo=repo)
|
await mutate_all(force=False, repo=repo)
|
||||||
await asyncio.sleep(poll_interval_secs)
|
await asyncio.sleep(poll_interval_secs)
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
|
log.info("mutator watch loop stopped")
|
||||||
console.print("\n[dim]Mutator watcher stopped.[/]")
|
console.print("\n[dim]Mutator watcher stopped.[/]")
|
||||||
|
|||||||
@@ -1,5 +1,4 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
|
||||||
import os
|
import os
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from typing import Any, AsyncGenerator, Optional
|
from typing import Any, AsyncGenerator, Optional
|
||||||
@@ -11,12 +10,13 @@ from pydantic import ValidationError
|
|||||||
from fastapi.middleware.cors import CORSMiddleware
|
from fastapi.middleware.cors import CORSMiddleware
|
||||||
|
|
||||||
from decnet.env import DECNET_CORS_ORIGINS, DECNET_DEVELOPER, DECNET_INGEST_LOG_FILE
|
from decnet.env import DECNET_CORS_ORIGINS, DECNET_DEVELOPER, DECNET_INGEST_LOG_FILE
|
||||||
|
from decnet.logging import get_logger
|
||||||
from decnet.web.dependencies import repo
|
from decnet.web.dependencies import repo
|
||||||
from decnet.collector import log_collector_worker
|
from decnet.collector import log_collector_worker
|
||||||
from decnet.web.ingester import log_ingestion_worker
|
from decnet.web.ingester import log_ingestion_worker
|
||||||
from decnet.web.router import api_router
|
from decnet.web.router import api_router
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = get_logger("api")
|
||||||
ingestion_task: Optional[asyncio.Task[Any]] = None
|
ingestion_task: Optional[asyncio.Task[Any]] = None
|
||||||
collector_task: Optional[asyncio.Task[Any]] = None
|
collector_task: Optional[asyncio.Task[Any]] = None
|
||||||
|
|
||||||
@@ -25,9 +25,11 @@ collector_task: Optional[asyncio.Task[Any]] = None
|
|||||||
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
|
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
|
||||||
global ingestion_task, collector_task
|
global ingestion_task, collector_task
|
||||||
|
|
||||||
|
log.info("API startup initialising database")
|
||||||
for attempt in range(1, 6):
|
for attempt in range(1, 6):
|
||||||
try:
|
try:
|
||||||
await repo.initialize()
|
await repo.initialize()
|
||||||
|
log.debug("API startup DB initialised attempt=%d", attempt)
|
||||||
break
|
break
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
log.warning("DB init attempt %d/5 failed: %s", attempt, exc)
|
log.warning("DB init attempt %d/5 failed: %s", attempt, exc)
|
||||||
@@ -40,11 +42,13 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
|
|||||||
# Start background ingestion task
|
# Start background ingestion task
|
||||||
if ingestion_task is None or ingestion_task.done():
|
if ingestion_task is None or ingestion_task.done():
|
||||||
ingestion_task = asyncio.create_task(log_ingestion_worker(repo))
|
ingestion_task = asyncio.create_task(log_ingestion_worker(repo))
|
||||||
|
log.debug("API startup ingest worker started")
|
||||||
|
|
||||||
# Start Docker log collector (writes to log file; ingester reads from it)
|
# Start Docker log collector (writes to log file; ingester reads from it)
|
||||||
_log_file = os.environ.get("DECNET_INGEST_LOG_FILE", DECNET_INGEST_LOG_FILE)
|
_log_file = os.environ.get("DECNET_INGEST_LOG_FILE", DECNET_INGEST_LOG_FILE)
|
||||||
if _log_file and (collector_task is None or collector_task.done()):
|
if _log_file and (collector_task is None or collector_task.done()):
|
||||||
collector_task = asyncio.create_task(log_collector_worker(_log_file))
|
collector_task = asyncio.create_task(log_collector_worker(_log_file))
|
||||||
|
log.debug("API startup collector worker started log_file=%s", _log_file)
|
||||||
elif not _log_file:
|
elif not _log_file:
|
||||||
log.warning("DECNET_INGEST_LOG_FILE not set — Docker log collection disabled.")
|
log.warning("DECNET_INGEST_LOG_FILE not set — Docker log collection disabled.")
|
||||||
else:
|
else:
|
||||||
@@ -52,7 +56,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
|
|||||||
|
|
||||||
yield
|
yield
|
||||||
|
|
||||||
# Shutdown background tasks
|
log.info("API shutdown cancelling background tasks")
|
||||||
for task in (ingestion_task, collector_task):
|
for task in (ingestion_task, collector_task):
|
||||||
if task and not task.done():
|
if task and not task.done():
|
||||||
task.cancel()
|
task.cancel()
|
||||||
@@ -62,6 +66,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
|
|||||||
pass
|
pass
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
log.warning("Task shutdown error: %s", exc)
|
log.warning("Task shutdown error: %s", exc)
|
||||||
|
log.info("API shutdown complete")
|
||||||
|
|
||||||
|
|
||||||
app: FastAPI = FastAPI(
|
app: FastAPI = FastAPI(
|
||||||
|
|||||||
@@ -1,13 +1,13 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import os
|
import os
|
||||||
import logging
|
|
||||||
import json
|
import json
|
||||||
from typing import Any
|
from typing import Any
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
|
from decnet.logging import get_logger
|
||||||
from decnet.web.db.repository import BaseRepository
|
from decnet.web.db.repository import BaseRepository
|
||||||
|
|
||||||
logger: logging.Logger = logging.getLogger("decnet.web.ingester")
|
logger = get_logger("api")
|
||||||
|
|
||||||
async def log_ingestion_worker(repo: BaseRepository) -> None:
|
async def log_ingestion_worker(repo: BaseRepository) -> None:
|
||||||
"""
|
"""
|
||||||
@@ -22,7 +22,7 @@ async def log_ingestion_worker(repo: BaseRepository) -> None:
|
|||||||
_json_log_path: Path = Path(_base_log_file).with_suffix(".json")
|
_json_log_path: Path = Path(_base_log_file).with_suffix(".json")
|
||||||
_position: int = 0
|
_position: int = 0
|
||||||
|
|
||||||
logger.info(f"Starting JSON log ingestion from {_json_log_path}")
|
logger.info("ingest worker started path=%s", _json_log_path)
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
@@ -53,10 +53,11 @@ async def log_ingestion_worker(repo: BaseRepository) -> None:
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
_log_data: dict[str, Any] = json.loads(_line.strip())
|
_log_data: dict[str, Any] = json.loads(_line.strip())
|
||||||
|
logger.debug("ingest: record decky=%s event_type=%s", _log_data.get("decky"), _log_data.get("event_type"))
|
||||||
await repo.add_log(_log_data)
|
await repo.add_log(_log_data)
|
||||||
await _extract_bounty(repo, _log_data)
|
await _extract_bounty(repo, _log_data)
|
||||||
except json.JSONDecodeError:
|
except json.JSONDecodeError:
|
||||||
logger.error(f"Failed to decode JSON log line: {_line}")
|
logger.error("ingest: failed to decode JSON log line: %s", _line.strip())
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Update position after successful line read
|
# Update position after successful line read
|
||||||
@@ -65,10 +66,10 @@ async def log_ingestion_worker(repo: BaseRepository) -> None:
|
|||||||
except Exception as _e:
|
except Exception as _e:
|
||||||
_err_str = str(_e).lower()
|
_err_str = str(_e).lower()
|
||||||
if "no such table" in _err_str or "no active connection" in _err_str or "connection closed" in _err_str:
|
if "no such table" in _err_str or "no active connection" in _err_str or "connection closed" in _err_str:
|
||||||
logger.error(f"Post-shutdown or fatal DB error in ingester: {_e}")
|
logger.error("ingest: post-shutdown or fatal DB error: %s", _e)
|
||||||
break # Exit worker — DB is gone or uninitialized
|
break # Exit worker — DB is gone or uninitialized
|
||||||
|
|
||||||
logger.error(f"Error in log ingestion worker: {_e}")
|
logger.error("ingest: error in worker: %s", _e)
|
||||||
await asyncio.sleep(5)
|
await asyncio.sleep(5)
|
||||||
|
|
||||||
await asyncio.sleep(1)
|
await asyncio.sleep(1)
|
||||||
|
|||||||
@@ -1,9 +1,11 @@
|
|||||||
import logging
|
|
||||||
import os
|
import os
|
||||||
|
|
||||||
from fastapi import APIRouter, Depends, HTTPException
|
from fastapi import APIRouter, Depends, HTTPException
|
||||||
|
|
||||||
from decnet.config import DEFAULT_MUTATE_INTERVAL, DecnetConfig, _ROOT, log
|
from decnet.logging import get_logger
|
||||||
|
from decnet.config import DEFAULT_MUTATE_INTERVAL, DecnetConfig, _ROOT
|
||||||
|
|
||||||
|
log = get_logger("api")
|
||||||
from decnet.engine import deploy as _deploy
|
from decnet.engine import deploy as _deploy
|
||||||
from decnet.ini_loader import load_ini_from_string
|
from decnet.ini_loader import load_ini_from_string
|
||||||
from decnet.network import detect_interface, detect_subnet, get_host_ip
|
from decnet.network import detect_interface, detect_subnet, get_host_ip
|
||||||
@@ -100,7 +102,7 @@ async def api_deploy_deckies(req: DeployIniRequest, current_user: str = Depends(
|
|||||||
}
|
}
|
||||||
await repo.set_state("deployment", new_state_payload)
|
await repo.set_state("deployment", new_state_payload)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logging.getLogger("decnet.web.api").exception("Deployment failed: %s", e)
|
log.exception("Deployment failed: %s", e)
|
||||||
raise HTTPException(status_code=500, detail="Deployment failed. Check server logs for details.")
|
raise HTTPException(status_code=500, detail="Deployment failed. Check server logs for details.")
|
||||||
|
|
||||||
return {"message": "Deckies deployed successfully"}
|
return {"message": "Deckies deployed successfully"}
|
||||||
|
|||||||
@@ -1,15 +1,15 @@
|
|||||||
import json
|
import json
|
||||||
import asyncio
|
import asyncio
|
||||||
import logging
|
|
||||||
from typing import AsyncGenerator, Optional
|
from typing import AsyncGenerator, Optional
|
||||||
|
|
||||||
from fastapi import APIRouter, Depends, Query, Request
|
from fastapi import APIRouter, Depends, Query, Request
|
||||||
from fastapi.responses import StreamingResponse
|
from fastapi.responses import StreamingResponse
|
||||||
|
|
||||||
from decnet.env import DECNET_DEVELOPER
|
from decnet.env import DECNET_DEVELOPER
|
||||||
|
from decnet.logging import get_logger
|
||||||
from decnet.web.dependencies import get_stream_user, repo
|
from decnet.web.dependencies import get_stream_user, repo
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
log = get_logger("api")
|
||||||
|
|
||||||
router = APIRouter()
|
router = APIRouter()
|
||||||
|
|
||||||
|
|||||||
155
tests/test_logging.py
Normal file
155
tests/test_logging.py
Normal file
@@ -0,0 +1,155 @@
|
|||||||
|
"""
|
||||||
|
Tests for DECNET application logging system.
|
||||||
|
|
||||||
|
Covers:
|
||||||
|
- get_logger() factory and _ComponentFilter injection
|
||||||
|
- Rfc5424Formatter component-aware APP-NAME field
|
||||||
|
- Log level gating via DECNET_DEVELOPER
|
||||||
|
- Component tags for all five microservice layers
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import re
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from decnet.logging import _ComponentFilter, get_logger
|
||||||
|
|
||||||
|
# RFC 5424 parser: <PRI>1 TIMESTAMP HOSTNAME APP-NAME PROCID MSGID SD MSG
|
||||||
|
_RFC5424_RE = re.compile(
|
||||||
|
r"^<(\d+)>1 " # PRI
|
||||||
|
r"\S+ " # TIMESTAMP
|
||||||
|
r"\S+ " # HOSTNAME
|
||||||
|
r"(\S+) " # APP-NAME ← what we care about
|
||||||
|
r"\S+ " # PROCID
|
||||||
|
r"(\S+) " # MSGID
|
||||||
|
r"(.+)$", # SD + MSG
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _format_record(logger: logging.Logger, level: int, msg: str) -> str:
|
||||||
|
"""Emit a log record through the root handler and return the formatted string."""
|
||||||
|
from decnet.config import Rfc5424Formatter
|
||||||
|
formatter = Rfc5424Formatter()
|
||||||
|
record = logger.makeRecord(
|
||||||
|
logger.name, level, "<test>", 0, msg, (), None
|
||||||
|
)
|
||||||
|
# Run all filters attached to the logger so decnet_component gets injected
|
||||||
|
for f in logger.filters:
|
||||||
|
f.filter(record)
|
||||||
|
return formatter.format(record)
|
||||||
|
|
||||||
|
|
||||||
|
class TestGetLogger:
|
||||||
|
def test_returns_logger(self):
|
||||||
|
log = get_logger("cli")
|
||||||
|
assert isinstance(log, logging.Logger)
|
||||||
|
|
||||||
|
def test_logger_name(self):
|
||||||
|
log = get_logger("engine")
|
||||||
|
assert log.name == "decnet.engine"
|
||||||
|
|
||||||
|
def test_filter_attached(self):
|
||||||
|
log = get_logger("api")
|
||||||
|
assert any(isinstance(f, _ComponentFilter) for f in log.filters)
|
||||||
|
|
||||||
|
def test_idempotent_filter(self):
|
||||||
|
log = get_logger("mutator")
|
||||||
|
get_logger("mutator") # second call
|
||||||
|
component_filters = [f for f in log.filters if isinstance(f, _ComponentFilter)]
|
||||||
|
assert len(component_filters) == 1
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("component", ["cli", "engine", "api", "mutator", "collector"])
|
||||||
|
def test_all_components_registered(self, component):
|
||||||
|
log = get_logger(component)
|
||||||
|
assert any(isinstance(f, _ComponentFilter) for f in log.filters)
|
||||||
|
|
||||||
|
|
||||||
|
class TestComponentFilter:
|
||||||
|
def test_injects_attribute(self):
|
||||||
|
f = _ComponentFilter("engine")
|
||||||
|
record = logging.LogRecord("test", logging.INFO, "", 0, "msg", (), None)
|
||||||
|
assert f.filter(record) is True
|
||||||
|
assert record.decnet_component == "engine" # type: ignore[attr-defined]
|
||||||
|
|
||||||
|
def test_always_passes(self):
|
||||||
|
f = _ComponentFilter("collector")
|
||||||
|
record = logging.LogRecord("test", logging.DEBUG, "", 0, "msg", (), None)
|
||||||
|
assert f.filter(record) is True
|
||||||
|
|
||||||
|
|
||||||
|
class TestRfc5424FormatterComponentAware:
|
||||||
|
@pytest.mark.parametrize("component", ["cli", "engine", "api", "mutator", "collector"])
|
||||||
|
def test_app_name_is_component(self, component):
|
||||||
|
log = get_logger(component)
|
||||||
|
line = _format_record(log, logging.INFO, "test message")
|
||||||
|
m = _RFC5424_RE.match(line)
|
||||||
|
assert m is not None, f"Not RFC 5424: {line!r}"
|
||||||
|
assert m.group(2) == component, f"Expected APP-NAME={component!r}, got {m.group(2)!r}"
|
||||||
|
|
||||||
|
def test_fallback_app_name_without_component(self):
|
||||||
|
"""Untagged loggers (no _ComponentFilter) fall back to 'decnet'."""
|
||||||
|
from decnet.config import Rfc5424Formatter
|
||||||
|
formatter = Rfc5424Formatter()
|
||||||
|
record = logging.LogRecord("some.module", logging.INFO, "", 0, "hello", (), None)
|
||||||
|
line = formatter.format(record)
|
||||||
|
m = _RFC5424_RE.match(line)
|
||||||
|
assert m is not None
|
||||||
|
assert m.group(2) == "decnet"
|
||||||
|
|
||||||
|
def test_msgid_is_logger_name(self):
|
||||||
|
log = get_logger("engine")
|
||||||
|
line = _format_record(log, logging.INFO, "deploying")
|
||||||
|
m = _RFC5424_RE.match(line)
|
||||||
|
assert m is not None
|
||||||
|
assert m.group(3) == "decnet.engine"
|
||||||
|
|
||||||
|
|
||||||
|
class TestLogLevelGating:
|
||||||
|
def test_configure_logging_normal_mode_sets_info(self):
|
||||||
|
"""_configure_logging(dev=False) must set root to INFO."""
|
||||||
|
from decnet.config import _configure_logging, Rfc5424Formatter
|
||||||
|
root = logging.getLogger()
|
||||||
|
original_level = root.level
|
||||||
|
original_handlers = root.handlers[:]
|
||||||
|
# Remove any existing RFC5424 handlers so idempotency check doesn't skip
|
||||||
|
root.handlers = [
|
||||||
|
h for h in root.handlers
|
||||||
|
if not (isinstance(h, logging.StreamHandler) and isinstance(h.formatter, Rfc5424Formatter))
|
||||||
|
]
|
||||||
|
try:
|
||||||
|
_configure_logging(dev=False)
|
||||||
|
assert root.level == logging.INFO
|
||||||
|
finally:
|
||||||
|
root.setLevel(original_level)
|
||||||
|
root.handlers = original_handlers
|
||||||
|
|
||||||
|
def test_configure_logging_dev_mode_sets_debug(self):
|
||||||
|
"""_configure_logging(dev=True) must set root to DEBUG."""
|
||||||
|
from decnet.config import _configure_logging, Rfc5424Formatter
|
||||||
|
root = logging.getLogger()
|
||||||
|
original_level = root.level
|
||||||
|
original_handlers = root.handlers[:]
|
||||||
|
root.handlers = [
|
||||||
|
h for h in root.handlers
|
||||||
|
if not (isinstance(h, logging.StreamHandler) and isinstance(h.formatter, Rfc5424Formatter))
|
||||||
|
]
|
||||||
|
try:
|
||||||
|
_configure_logging(dev=True)
|
||||||
|
assert root.level == logging.DEBUG
|
||||||
|
finally:
|
||||||
|
root.setLevel(original_level)
|
||||||
|
root.handlers = original_handlers
|
||||||
|
|
||||||
|
def test_debug_enabled_in_developer_mode(self, monkeypatch):
|
||||||
|
"""Programmatically setting DEBUG on root allows debug records through."""
|
||||||
|
root = logging.getLogger()
|
||||||
|
original_level = root.level
|
||||||
|
root.setLevel(logging.DEBUG)
|
||||||
|
try:
|
||||||
|
assert root.isEnabledFor(logging.DEBUG)
|
||||||
|
finally:
|
||||||
|
root.setLevel(original_level)
|
||||||
Reference in New Issue
Block a user