From 70d8ffc6070554a52a76e492d4a20f26f3c5c6ef Mon Sep 17 00:00:00 2001 From: anti Date: Thu, 16 Apr 2026 00:58:08 -0400 Subject: [PATCH] feat: complete OTEL tracing across all services with pipeline bridge and docs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extends tracing to every remaining module: all 23 API route handlers, correlation engine, sniffer (fingerprint/p0f/syslog), prober (jarm/hassh/tcpfp), profiler behavioral analysis, logging subsystem, engine, and mutator. Bridges the ingester→SSE trace gap by persisting trace_id/span_id columns on the logs table and creating OTEL span links in the SSE endpoint. Adds log-trace correlation via _TraceContextFilter injecting otel_trace_id into Python LogRecords. Includes development/docs/TRACING.md with full span reference (76 spans), pipeline propagation architecture, quick start guide, and troubleshooting. --- decnet/correlation/engine.py | 10 + decnet/engine/deployer.py | 1 + decnet/logging/__init__.py | 50 ++++ decnet/logging/file_handler.py | 14 +- decnet/logging/forwarder.py | 3 + decnet/mutator/engine.py | 1 + decnet/prober/hassh.py | 6 +- decnet/prober/jarm.py | 6 +- decnet/prober/tcpfp.py | 4 + decnet/prober/worker.py | 2 + decnet/profiler/behavioral.py | 16 ++ decnet/sniffer/fingerprint.py | 266 ++++++++++-------- decnet/sniffer/p0f.py | 3 + decnet/sniffer/syslog.py | 2 + decnet/sniffer/worker.py | 1 + decnet/telemetry.py | 4 +- decnet/web/db/models.py | 5 + decnet/web/ingester.py | 8 + .../attackers/api_get_attacker_commands.py | 2 + .../attackers/api_get_attacker_detail.py | 2 + .../web/router/attackers/api_get_attackers.py | 2 + decnet/web/router/auth/api_change_pass.py | 2 + decnet/web/router/auth/api_login.py | 2 + decnet/web/router/bounty/api_get_bounties.py | 2 + decnet/web/router/config/api_get_config.py | 2 + decnet/web/router/config/api_manage_users.py | 5 + decnet/web/router/config/api_reinit.py | 2 + decnet/web/router/config/api_update_config.py | 3 
+ decnet/web/router/fleet/api_deploy_deckies.py | 2 + decnet/web/router/fleet/api_get_deckies.py | 2 + decnet/web/router/fleet/api_mutate_decky.py | 2 + .../web/router/fleet/api_mutate_interval.py | 2 + decnet/web/router/health/api_get_health.py | 2 + decnet/web/router/logs/api_get_histogram.py | 2 + decnet/web/router/logs/api_get_logs.py | 2 + decnet/web/router/stats/api_get_stats.py | 2 + decnet/web/router/stream/api_stream_events.py | 40 ++- development/docs/TRACING.md | 219 ++++++++++++++ 38 files changed, 577 insertions(+), 124 deletions(-) create mode 100644 development/docs/TRACING.md diff --git a/decnet/correlation/engine.py b/decnet/correlation/engine.py index 1f9f748..198d544 100644 --- a/decnet/correlation/engine.py +++ b/decnet/correlation/engine.py @@ -33,6 +33,7 @@ from decnet.logging.syslog_formatter import ( SEVERITY_WARNING, format_rfc5424, ) +from decnet.telemetry import traced as _traced, get_tracer as _get_tracer class CorrelationEngine: @@ -64,6 +65,7 @@ class CorrelationEngine: self.events_indexed += 1 return event + @_traced("correlation.ingest_file") def ingest_file(self, path: Path) -> int: """ Parse every line of *path* and index it. 
@@ -73,12 +75,18 @@ class CorrelationEngine: with open(path) as fh: for line in fh: self.ingest(line) + _tracer = _get_tracer("correlation") + with _tracer.start_as_current_span("correlation.ingest_file.summary") as _span: + _span.set_attribute("lines_parsed", self.lines_parsed) + _span.set_attribute("events_indexed", self.events_indexed) + _span.set_attribute("unique_ips", len(self._events)) return self.events_indexed # ------------------------------------------------------------------ # # Query # # ------------------------------------------------------------------ # + @_traced("correlation.traversals") def traversals(self, min_deckies: int = 2) -> list[AttackerTraversal]: """ Return all attackers that touched at least *min_deckies* distinct @@ -135,6 +143,7 @@ class CorrelationEngine: ) return table + @_traced("correlation.report_json") def report_json(self, min_deckies: int = 2) -> dict: """Serialisable dict representation of all traversals.""" return { @@ -147,6 +156,7 @@ class CorrelationEngine: "traversals": [t.to_dict() for t in self.traversals(min_deckies)], } + @_traced("correlation.traversal_syslog_lines") def traversal_syslog_lines(self, min_deckies: int = 2) -> list[str]: """ Emit one RFC 5424 syslog line per detected traversal. diff --git a/decnet/engine/deployer.py b/decnet/engine/deployer.py index 70c2357..d468b0a 100644 --- a/decnet/engine/deployer.py +++ b/decnet/engine/deployer.py @@ -68,6 +68,7 @@ _PERMANENT_ERRORS = ( ) +@_traced("engine.compose_with_retry") def _compose_with_retry( *args: str, compose_file: Path = COMPOSE_FILE, diff --git a/decnet/logging/__init__.py b/decnet/logging/__init__.py index ad716e7..73f6102 100644 --- a/decnet/logging/__init__.py +++ b/decnet/logging/__init__.py @@ -7,6 +7,11 @@ Usage: The returned logger propagates to the root logger (configured in config.py with Rfc5424Formatter), so level control via DECNET_DEVELOPER still applies globally. 
+ +When ``DECNET_DEVELOPER_TRACING`` is active, every LogRecord is enriched with +``otel_trace_id`` and ``otel_span_id`` from the current OTEL span context. +This lets you correlate log lines with Jaeger traces — click a log entry and +jump straight to the span that produced it. """ from __future__ import annotations @@ -27,6 +32,51 @@ class _ComponentFilter(logging.Filter): return True +class _TraceContextFilter(logging.Filter): + """Injects ``otel_trace_id`` and ``otel_span_id`` onto every LogRecord + from the active OTEL span context. + + Installed once by ``enable_trace_context()`` on the root ``decnet`` logger + so all child loggers inherit the enrichment via propagation. + + When no span is active, both fields are set to ``"0"`` (cheap string + comparison downstream, no None-checks needed). + """ + + def filter(self, record: logging.LogRecord) -> bool: + try: + from opentelemetry import trace + span = trace.get_current_span() + ctx = span.get_span_context() + if ctx and ctx.trace_id: + record.otel_trace_id = format(ctx.trace_id, "032x") # type: ignore[attr-defined] + record.otel_span_id = format(ctx.span_id, "016x") # type: ignore[attr-defined] + else: + record.otel_trace_id = "0" # type: ignore[attr-defined] + record.otel_span_id = "0" # type: ignore[attr-defined] + except Exception: + record.otel_trace_id = "0" # type: ignore[attr-defined] + record.otel_span_id = "0" # type: ignore[attr-defined] + return True + + +_trace_filter_installed: bool = False + + +def enable_trace_context() -> None: + """Install the OTEL trace-context filter on the root ``decnet`` logger. + + Called once from ``decnet.telemetry.setup_tracing()`` after the + TracerProvider is initialised. Safe to call multiple times (idempotent). 
+ """ + global _trace_filter_installed + if _trace_filter_installed: + return + root = logging.getLogger("decnet") + root.addFilter(_TraceContextFilter()) + _trace_filter_installed = True + + def get_logger(component: str) -> logging.Logger: """Return a named logger that self-identifies as *component* in RFC 5424. diff --git a/decnet/logging/file_handler.py b/decnet/logging/file_handler.py index 50a83d1..635959f 100644 --- a/decnet/logging/file_handler.py +++ b/decnet/logging/file_handler.py @@ -13,6 +13,8 @@ import logging.handlers import os from pathlib import Path +from decnet.telemetry import traced as _traced + _LOG_FILE_ENV = "DECNET_LOG_FILE" _DEFAULT_LOG_FILE = "/var/log/decnet/decnet.log" _MAX_BYTES = 10 * 1024 * 1024 # 10 MB @@ -22,10 +24,10 @@ _handler: logging.handlers.RotatingFileHandler | None = None _logger: logging.Logger | None = None -def _get_logger() -> logging.Logger: +@_traced("logging.init_file_handler") +def _init_file_handler() -> logging.Logger: + """One-time initialisation of the rotating file handler.""" global _handler, _logger - if _logger is not None: - return _logger log_path = Path(os.environ.get(_LOG_FILE_ENV, _DEFAULT_LOG_FILE)) log_path.parent.mkdir(parents=True, exist_ok=True) @@ -46,6 +48,12 @@ def _get_logger() -> logging.Logger: return _logger +def _get_logger() -> logging.Logger: + if _logger is not None: + return _logger + return _init_file_handler() + + def write_syslog(line: str) -> None: """Write a single RFC 5424 syslog line to the rotating log file.""" try: diff --git a/decnet/logging/forwarder.py b/decnet/logging/forwarder.py index 9ddbd07..31b0e1e 100644 --- a/decnet/logging/forwarder.py +++ b/decnet/logging/forwarder.py @@ -11,6 +11,8 @@ shared utilities for validating and parsing the log_target string. 
import socket +from decnet.telemetry import traced as _traced + def parse_log_target(log_target: str) -> tuple[str, int]: """ @@ -23,6 +25,7 @@ def parse_log_target(log_target: str) -> tuple[str, int]: return parts[0], int(parts[1]) +@_traced("logging.probe_log_target") def probe_log_target(log_target: str, timeout: float = 2.0) -> bool: """ Return True if the log target is reachable (TCP connect succeeds). diff --git a/decnet/mutator/engine.py b/decnet/mutator/engine.py index d011b19..0e4a925 100644 --- a/decnet/mutator/engine.py +++ b/decnet/mutator/engine.py @@ -133,6 +133,7 @@ async def mutate_all(repo: BaseRepository, force: bool = False) -> None: log.info("mutate_all: complete mutated_count=%d", mutated_count) +@_traced("mutator.watch_loop") async def run_watch_loop(repo: BaseRepository, poll_interval_secs: int = 10) -> None: """Run an infinite loop checking for deckies that need mutation.""" log.info("mutator watch loop started poll_interval_secs=%d", poll_interval_secs) diff --git a/decnet/prober/hassh.py b/decnet/prober/hassh.py index de2e19e..36ecaa1 100644 --- a/decnet/prober/hassh.py +++ b/decnet/prober/hassh.py @@ -9,7 +9,7 @@ This is the *server* variant of HASSH (HASSHServer). It fingerprints what the server *offers*, which identifies the SSH implementation (OpenSSH, Paramiko, libssh, Cobalt Strike SSH, etc.). -Stdlib only (socket, struct, hashlib). No DECNET imports. +Stdlib only (socket, struct, hashlib) plus decnet.telemetry for tracing (zero-cost when disabled). 
""" from __future__ import annotations @@ -19,6 +19,8 @@ import socket import struct from typing import Any +from decnet.telemetry import traced as _traced + # SSH protocol constants _SSH_MSG_KEXINIT = 20 _KEX_INIT_COOKIE_LEN = 16 @@ -36,6 +38,7 @@ _MAX_PACKET_LEN = 35000 # ─── SSH connection + KEX_INIT capture ────────────────────────────────────── +@_traced("prober.hassh_ssh_connect") def _ssh_connect( host: str, port: int, @@ -213,6 +216,7 @@ def _compute_hassh(kex: str, enc: str, mac: str, comp: str) -> str: # ─── Public API ───────────────────────────────────────────────────────────── +@_traced("prober.hassh_server") def hassh_server( host: str, port: int, diff --git a/decnet/prober/jarm.py b/decnet/prober/jarm.py index 54807e3..7cd1502 100644 --- a/decnet/prober/jarm.py +++ b/decnet/prober/jarm.py @@ -8,7 +8,7 @@ fingerprint that identifies the TLS server implementation. Reference: https://github.com/salesforce/jarm -No DECNET imports — this module is self-contained and testable in isolation. +Only DECNET import is decnet.telemetry for tracing (zero-cost when disabled). """ from __future__ import annotations @@ -19,6 +19,8 @@ import struct import time from typing import Any +from decnet.telemetry import traced as _traced + # ─── Constants ──────────────────────────────────────────────────────────────── JARM_EMPTY_HASH = "0" * 62 @@ -379,6 +381,7 @@ def _version_to_str(version: int) -> str: # ─── Probe sender ──────────────────────────────────────────────────────────── +@_traced("prober.jarm_send_probe") def _send_probe(host: str, port: int, hello: bytes, timeout: float = 5.0) -> bytes | None: """ Open a TCP connection, send the ClientHello, and read the ServerHello. @@ -471,6 +474,7 @@ def _compute_jarm(responses: list[str]) -> str: # ─── Public API ────────────────────────────────────────────────────────────── +@_traced("prober.jarm_hash") def jarm_hash(host: str, port: int, timeout: float = 5.0) -> str: """ Compute the JARM fingerprint for a TLS server. 
diff --git a/decnet/prober/tcpfp.py b/decnet/prober/tcpfp.py index 8044c63..37737b0 100644 --- a/decnet/prober/tcpfp.py +++ b/decnet/prober/tcpfp.py @@ -15,6 +15,8 @@ import hashlib import random from typing import Any +from decnet.telemetry import traced as _traced + # Lazy-import scapy to avoid breaking non-root usage of HASSH/JARM. # The actual import happens inside functions that need it. @@ -36,6 +38,7 @@ _OPT_CODES: dict[str, str] = { # ─── Packet construction ─────────────────────────────────────────────────── +@_traced("prober.tcpfp_send_syn") def _send_syn( host: str, port: int, @@ -196,6 +199,7 @@ def _compute_fingerprint(fields: dict[str, Any]) -> tuple[str, str]: # ─── Public API ───────────────────────────────────────────────────────────── +@_traced("prober.tcp_fingerprint") def tcp_fingerprint( host: str, port: int, diff --git a/decnet/prober/worker.py b/decnet/prober/worker.py index 48cc58e..face17b 100644 --- a/decnet/prober/worker.py +++ b/decnet/prober/worker.py @@ -169,6 +169,7 @@ def _write_event( # ─── Target discovery from log stream ──────────────────────────────────────── +@_traced("prober.discover_attackers") def _discover_attackers(json_path: Path, position: int) -> tuple[set[str], int]: """ Read new JSON log lines from the given position and extract unique @@ -399,6 +400,7 @@ def _tcpfp_phase( # ─── Main worker ───────────────────────────────────────────────────────────── +@_traced("prober.worker") async def prober_worker( log_file: str, interval: int = 300, diff --git a/decnet/profiler/behavioral.py b/decnet/profiler/behavioral.py index 7e440db..757b997 100644 --- a/decnet/profiler/behavioral.py +++ b/decnet/profiler/behavioral.py @@ -31,6 +31,7 @@ from collections import Counter from typing import Any from decnet.correlation.parser import LogEvent +from decnet.telemetry import traced as _traced, get_tracer as _get_tracer # ─── Event-type taxonomy ──────────────────────────────────────────────────── @@ -147,6 +148,7 @@ def 
_os_from_ttl(ttl_str: str | None) -> str | None: # ─── Timing stats ─────────────────────────────────────────────────────────── +@_traced("profiler.timing_stats") def timing_stats(events: list[LogEvent]) -> dict[str, Any]: """ Compute inter-arrival-time statistics across *events* (sorted by ts). @@ -221,6 +223,7 @@ def timing_stats(events: list[LogEvent]) -> dict[str, Any]: # ─── Behavior classification ──────────────────────────────────────────────── +@_traced("profiler.classify_behavior") def classify_behavior(stats: dict[str, Any], services_count: int) -> str: """ Coarse behavior bucket: @@ -305,6 +308,7 @@ def guess_tool(mean_iat_s: float | None, cv: float | None) -> str | None: # ─── Header-based tool detection ──────────────────────────────────────────── +@_traced("profiler.detect_tools_from_headers") def detect_tools_from_headers(events: list[LogEvent]) -> list[str]: """ Scan HTTP `request` events for tool-identifying headers. @@ -372,6 +376,7 @@ def detect_tools_from_headers(events: list[LogEvent]) -> list[str]: # ─── Phase sequencing ─────────────────────────────────────────────────────── +@_traced("profiler.phase_sequence") def phase_sequence(events: list[LogEvent]) -> dict[str, Any]: """ Derive recon→exfil phase transition info. @@ -418,6 +423,7 @@ def phase_sequence(events: list[LogEvent]) -> dict[str, Any]: # ─── Sniffer rollup (OS fingerprint + retransmits) ────────────────────────── +@_traced("profiler.sniffer_rollup") def sniffer_rollup(events: list[LogEvent]) -> dict[str, Any]: """ Roll up sniffer-emitted `tcp_syn_fingerprint` and `tcp_flow_timing` @@ -535,6 +541,7 @@ def _int_or_none(v: Any) -> int | None: # ─── Composite: build the full AttackerBehavior record ────────────────────── +@_traced("profiler.build_behavior_record") def build_behavior_record(events: list[LogEvent]) -> dict[str, Any]: """ Build the dict to persist in the `attacker_behavior` table. 
@@ -572,6 +579,15 @@ def build_behavior_record(events: list[LogEvent]) -> dict[str, Any]: cv = stats.get("cv") beacon_jitter_pct = round(cv * 100, 2) if cv is not None else None + _tracer = _get_tracer("profiler") + with _tracer.start_as_current_span("profiler.behavior_summary") as _span: + _span.set_attribute("behavior_class", behavior) + _span.set_attribute("os_guess", rollup["os_guess"] or "unknown") + _span.set_attribute("tool_count", len(all_tools)) + _span.set_attribute("event_count", stats.get("event_count", 0)) + if all_tools: + _span.set_attribute("tools", ",".join(all_tools)) + return { "os_guess": rollup["os_guess"], "hop_distance": rollup["hop_distance"], diff --git a/decnet/sniffer/fingerprint.py b/decnet/sniffer/fingerprint.py index 70a1a39..8a132c6 100644 --- a/decnet/sniffer/fingerprint.py +++ b/decnet/sniffer/fingerprint.py @@ -17,6 +17,7 @@ from typing import Any, Callable from decnet.prober.tcpfp import _extract_options_order from decnet.sniffer.p0f import guess_os, hop_distance, initial_ttl from decnet.sniffer.syslog import SEVERITY_INFO, SEVERITY_WARNING, syslog_line +from decnet.telemetry import traced as _traced, get_tracer as _get_tracer # ─── Constants ─────────────────────────────────────────────────────────────── @@ -94,6 +95,7 @@ def _filter_grease(values: list[int]) -> list[int]: # ─── TLS parsers ───────────────────────────────────────────────────────────── +@_traced("sniffer.parse_client_hello") def _parse_client_hello(data: bytes) -> dict[str, Any] | None: try: if len(data) < 6: @@ -228,6 +230,7 @@ def _parse_client_hello(data: bytes) -> dict[str, Any] | None: return None +@_traced("sniffer.parse_server_hello") def _parse_server_hello(data: bytes) -> dict[str, Any] | None: try: if len(data) < 6 or data[0] != _TLS_RECORD_HANDSHAKE: @@ -294,6 +297,7 @@ def _parse_server_hello(data: bytes) -> dict[str, Any] | None: return None +@_traced("sniffer.parse_certificate") def _parse_certificate(data: bytes) -> dict[str, Any] | None: try: if 
len(data) < 6 or data[0] != _TLS_RECORD_HANDSHAKE: @@ -547,6 +551,7 @@ def _tls_version_str(version: int) -> str: }.get(version, f"0x{version:04x}") +@_traced("sniffer.ja3") def _ja3(ch: dict[str, Any]) -> tuple[str, str]: parts = [ str(ch["tls_version"]), @@ -559,6 +564,7 @@ def _ja3(ch: dict[str, Any]) -> tuple[str, str]: return ja3_str, hashlib.md5(ja3_str.encode()).hexdigest() # nosec B324 +@_traced("sniffer.ja3s") def _ja3s(sh: dict[str, Any]) -> tuple[str, str]: parts = [ str(sh["tls_version"]), @@ -605,6 +611,7 @@ def _sha256_12(text: str) -> str: return hashlib.sha256(text.encode()).hexdigest()[:12] +@_traced("sniffer.ja4") def _ja4(ch: dict[str, Any]) -> str: proto = "t" ver = _ja4_version(ch) @@ -624,6 +631,7 @@ def _ja4(ch: dict[str, Any]) -> str: return f"{section_a}_{section_b}_{section_c}" +@_traced("sniffer.ja4s") def _ja4s(sh: dict[str, Any]) -> str: proto = "t" selected = sh.get("selected_version") @@ -653,6 +661,7 @@ def _ja4l( # ─── Session resumption ───────────────────────────────────────────────────── +@_traced("sniffer.session_resumption_info") def _session_resumption_info(ch: dict[str, Any]) -> dict[str, Any]: mechanisms: list[str] = [] if ch.get("has_session_ticket_data"): @@ -965,33 +974,38 @@ class SnifferEngine: # when the destination is a known decky, i.e. we're seeing an # attacker's initial packet. 
if dst_ip in self._ip_to_decky: - tcp_fp = _extract_tcp_fingerprint(list(tcp.options or [])) - os_label = guess_os( - ttl=ip.ttl, - window=int(tcp.window), - mss=tcp_fp["mss"], - wscale=tcp_fp["wscale"], - options_sig=tcp_fp["options_sig"], - ) - target_node = self._ip_to_decky[dst_ip] - self._log( - target_node, - "tcp_syn_fingerprint", - src_ip=src_ip, - src_port=str(src_port), - dst_ip=dst_ip, - dst_port=str(dst_port), - ttl=str(ip.ttl), - initial_ttl=str(initial_ttl(ip.ttl)), - hop_distance=str(hop_distance(ip.ttl)), - window=str(int(tcp.window)), - mss=str(tcp_fp["mss"]), - wscale=("" if tcp_fp["wscale"] is None else str(tcp_fp["wscale"])), - options_sig=tcp_fp["options_sig"], - has_sack=str(tcp_fp["sack_ok"]).lower(), - has_timestamps=str(tcp_fp["has_timestamps"]).lower(), - os_guess=os_label, - ) + _tracer = _get_tracer("sniffer") + with _tracer.start_as_current_span("sniffer.tcp_syn_fingerprint") as _span: + _span.set_attribute("attacker_ip", src_ip) + _span.set_attribute("dst_port", dst_port) + tcp_fp = _extract_tcp_fingerprint(list(tcp.options or [])) + os_label = guess_os( + ttl=ip.ttl, + window=int(tcp.window), + mss=tcp_fp["mss"], + wscale=tcp_fp["wscale"], + options_sig=tcp_fp["options_sig"], + ) + _span.set_attribute("os_guess", os_label) + target_node = self._ip_to_decky[dst_ip] + self._log( + target_node, + "tcp_syn_fingerprint", + src_ip=src_ip, + src_port=str(src_port), + dst_ip=dst_ip, + dst_port=str(dst_port), + ttl=str(ip.ttl), + initial_ttl=str(initial_ttl(ip.ttl)), + hop_distance=str(hop_distance(ip.ttl)), + window=str(int(tcp.window)), + mss=str(tcp_fp["mss"]), + wscale=("" if tcp_fp["wscale"] is None else str(tcp_fp["wscale"])), + options_sig=tcp_fp["options_sig"], + has_sack=str(tcp_fp["sack_ok"]).lower(), + has_timestamps=str(tcp_fp["has_timestamps"]).lower(), + os_guess=os_label, + ) elif flags & _TCP_SYN and flags & _TCP_ACK: rev_key = (dst_ip, dst_port, src_ip, src_port) @@ -1019,116 +1033,134 @@ class SnifferEngine: # ClientHello ch 
= _parse_client_hello(payload) if ch is not None: - self._cleanup_sessions() + _tracer = _get_tracer("sniffer") + with _tracer.start_as_current_span("sniffer.tls_client_hello") as _span: + _span.set_attribute("attacker_ip", src_ip) + _span.set_attribute("dst_port", dst_port) + self._cleanup_sessions() - key = (src_ip, src_port, dst_ip, dst_port) - ja3_str, ja3_hash = _ja3(ch) - ja4_hash = _ja4(ch) - resumption = _session_resumption_info(ch) - rtt_data = _ja4l(key, self._tcp_rtt) + key = (src_ip, src_port, dst_ip, dst_port) + ja3_str, ja3_hash = _ja3(ch) + ja4_hash = _ja4(ch) + resumption = _session_resumption_info(ch) + rtt_data = _ja4l(key, self._tcp_rtt) - self._sessions[key] = { - "ja3": ja3_hash, - "ja3_str": ja3_str, - "ja4": ja4_hash, - "tls_version": ch["tls_version"], - "cipher_suites": ch["cipher_suites"], - "extensions": ch["extensions"], - "signature_algorithms": ch.get("signature_algorithms", []), - "supported_versions": ch.get("supported_versions", []), - "sni": ch["sni"], - "alpn": ch["alpn"], - "resumption": resumption, - } - self._session_ts[key] = time.monotonic() + _span.set_attribute("ja3", ja3_hash) + _span.set_attribute("ja4", ja4_hash) + _span.set_attribute("sni", ch["sni"] or "") - log_fields: dict[str, Any] = { - "src_ip": src_ip, - "src_port": str(src_port), - "dst_ip": dst_ip, - "dst_port": str(dst_port), - "ja3": ja3_hash, - "ja4": ja4_hash, - "tls_version": _tls_version_str(ch["tls_version"]), - "sni": ch["sni"] or "", - "alpn": ",".join(ch["alpn"]), - "raw_ciphers": "-".join(str(c) for c in ch["cipher_suites"]), - "raw_extensions": "-".join(str(e) for e in ch["extensions"]), - } + self._sessions[key] = { + "ja3": ja3_hash, + "ja3_str": ja3_str, + "ja4": ja4_hash, + "tls_version": ch["tls_version"], + "cipher_suites": ch["cipher_suites"], + "extensions": ch["extensions"], + "signature_algorithms": ch.get("signature_algorithms", []), + "supported_versions": ch.get("supported_versions", []), + "sni": ch["sni"], + "alpn": ch["alpn"], + 
"resumption": resumption, + } + self._session_ts[key] = time.monotonic() - if resumption["resumption_attempted"]: - log_fields["resumption"] = ",".join(resumption["mechanisms"]) + log_fields: dict[str, Any] = { + "src_ip": src_ip, + "src_port": str(src_port), + "dst_ip": dst_ip, + "dst_port": str(dst_port), + "ja3": ja3_hash, + "ja4": ja4_hash, + "tls_version": _tls_version_str(ch["tls_version"]), + "sni": ch["sni"] or "", + "alpn": ",".join(ch["alpn"]), + "raw_ciphers": "-".join(str(c) for c in ch["cipher_suites"]), + "raw_extensions": "-".join(str(e) for e in ch["extensions"]), + } - if rtt_data: - log_fields["ja4l_rtt_ms"] = str(rtt_data["rtt_ms"]) - log_fields["ja4l_client_ttl"] = str(rtt_data["client_ttl"]) + if resumption["resumption_attempted"]: + log_fields["resumption"] = ",".join(resumption["mechanisms"]) - # Resolve node for the *destination* (the decky being attacked) - target_node = self._ip_to_decky.get(dst_ip, node_name) - self._log(target_node, "tls_client_hello", **log_fields) + if rtt_data: + log_fields["ja4l_rtt_ms"] = str(rtt_data["rtt_ms"]) + log_fields["ja4l_client_ttl"] = str(rtt_data["client_ttl"]) + + # Resolve node for the *destination* (the decky being attacked) + target_node = self._ip_to_decky.get(dst_ip, node_name) + self._log(target_node, "tls_client_hello", **log_fields) return # ServerHello sh = _parse_server_hello(payload) if sh is not None: - rev_key = (dst_ip, dst_port, src_ip, src_port) - ch_data = self._sessions.pop(rev_key, None) - self._session_ts.pop(rev_key, None) + _tracer = _get_tracer("sniffer") + with _tracer.start_as_current_span("sniffer.tls_server_hello") as _span: + _span.set_attribute("attacker_ip", dst_ip) + rev_key = (dst_ip, dst_port, src_ip, src_port) + ch_data = self._sessions.pop(rev_key, None) + self._session_ts.pop(rev_key, None) - ja3s_str, ja3s_hash = _ja3s(sh) - ja4s_hash = _ja4s(sh) + ja3s_str, ja3s_hash = _ja3s(sh) + ja4s_hash = _ja4s(sh) - fields: dict[str, Any] = { - "src_ip": dst_ip, - "src_port": 
str(dst_port), - "dst_ip": src_ip, - "dst_port": str(src_port), - "ja3s": ja3s_hash, - "ja4s": ja4s_hash, - "tls_version": _tls_version_str(sh["tls_version"]), - } + _span.set_attribute("ja3s", ja3s_hash) + _span.set_attribute("ja4s", ja4s_hash) - if ch_data: - fields["ja3"] = ch_data["ja3"] - fields["ja4"] = ch_data.get("ja4", "") - fields["sni"] = ch_data["sni"] or "" - fields["alpn"] = ",".join(ch_data["alpn"]) - fields["raw_ciphers"] = "-".join(str(c) for c in ch_data["cipher_suites"]) - fields["raw_extensions"] = "-".join(str(e) for e in ch_data["extensions"]) - if ch_data.get("resumption", {}).get("resumption_attempted"): - fields["resumption"] = ",".join(ch_data["resumption"]["mechanisms"]) + fields: dict[str, Any] = { + "src_ip": dst_ip, + "src_port": str(dst_port), + "dst_ip": src_ip, + "dst_port": str(src_port), + "ja3s": ja3s_hash, + "ja4s": ja4s_hash, + "tls_version": _tls_version_str(sh["tls_version"]), + } - rtt_data = self._tcp_rtt.pop(rev_key, None) - if rtt_data: - fields["ja4l_rtt_ms"] = str(rtt_data["rtt_ms"]) - fields["ja4l_client_ttl"] = str(rtt_data["client_ttl"]) + if ch_data: + fields["ja3"] = ch_data["ja3"] + fields["ja4"] = ch_data.get("ja4", "") + fields["sni"] = ch_data["sni"] or "" + fields["alpn"] = ",".join(ch_data["alpn"]) + fields["raw_ciphers"] = "-".join(str(c) for c in ch_data["cipher_suites"]) + fields["raw_extensions"] = "-".join(str(e) for e in ch_data["extensions"]) + if ch_data.get("resumption", {}).get("resumption_attempted"): + fields["resumption"] = ",".join(ch_data["resumption"]["mechanisms"]) - # Server response — resolve by src_ip (the decky responding) - target_node = self._ip_to_decky.get(src_ip, node_name) - self._log(target_node, "tls_session", severity=SEVERITY_WARNING, **fields) + rtt_data = self._tcp_rtt.pop(rev_key, None) + if rtt_data: + fields["ja4l_rtt_ms"] = str(rtt_data["rtt_ms"]) + fields["ja4l_client_ttl"] = str(rtt_data["client_ttl"]) + + # Server response — resolve by src_ip (the decky responding) + 
target_node = self._ip_to_decky.get(src_ip, node_name) + self._log(target_node, "tls_session", severity=SEVERITY_WARNING, **fields) return # Certificate (TLS 1.2 only) cert = _parse_certificate(payload) if cert is not None: - rev_key = (dst_ip, dst_port, src_ip, src_port) - ch_data = self._sessions.get(rev_key) + _tracer = _get_tracer("sniffer") + with _tracer.start_as_current_span("sniffer.tls_certificate") as _span: + _span.set_attribute("subject_cn", cert["subject_cn"]) + _span.set_attribute("self_signed", cert["self_signed"]) + rev_key = (dst_ip, dst_port, src_ip, src_port) + ch_data = self._sessions.get(rev_key) - cert_fields: dict[str, Any] = { - "src_ip": dst_ip, - "src_port": str(dst_port), - "dst_ip": src_ip, - "dst_port": str(src_port), - "subject_cn": cert["subject_cn"], - "issuer": cert["issuer"], - "self_signed": str(cert["self_signed"]).lower(), - "not_before": cert["not_before"], - "not_after": cert["not_after"], - } - if cert["sans"]: - cert_fields["sans"] = ",".join(cert["sans"]) - if ch_data: - cert_fields["sni"] = ch_data.get("sni", "") + cert_fields: dict[str, Any] = { + "src_ip": dst_ip, + "src_port": str(dst_port), + "dst_ip": src_ip, + "dst_port": str(src_port), + "subject_cn": cert["subject_cn"], + "issuer": cert["issuer"], + "self_signed": str(cert["self_signed"]).lower(), + "not_before": cert["not_before"], + "not_after": cert["not_after"], + } + if cert["sans"]: + cert_fields["sans"] = ",".join(cert["sans"]) + if ch_data: + cert_fields["sni"] = ch_data.get("sni", "") - target_node = self._ip_to_decky.get(src_ip, node_name) - self._log(target_node, "tls_certificate", **cert_fields) + target_node = self._ip_to_decky.get(src_ip, node_name) + self._log(target_node, "tls_certificate", **cert_fields) diff --git a/decnet/sniffer/p0f.py b/decnet/sniffer/p0f.py index 41ae41e..88cceca 100644 --- a/decnet/sniffer/p0f.py +++ b/decnet/sniffer/p0f.py @@ -22,6 +22,8 @@ No external dependencies. 
from __future__ import annotations +from decnet.telemetry import traced as _traced + # ─── TTL → initial TTL bucket ─────────────────────────────────────────────── # Common "hop 0" TTLs. Packets decrement TTL once per hop, so we round up @@ -216,6 +218,7 @@ def _match_signature( return True +@_traced("sniffer.p0f_guess_os") def guess_os( ttl: int, window: int, diff --git a/decnet/sniffer/syslog.py b/decnet/sniffer/syslog.py index 1fd7587..8889b78 100644 --- a/decnet/sniffer/syslog.py +++ b/decnet/sniffer/syslog.py @@ -11,6 +11,7 @@ from pathlib import Path from typing import Any from decnet.collector.worker import parse_rfc5424 +from decnet.telemetry import traced as _traced # ─── Constants (must match templates/sniffer/decnet_logging.py) ────────────── @@ -57,6 +58,7 @@ def syslog_line( return f"{pri}1 {ts} {host} {appname} {_NILVALUE} {msgid} {sd}{message}" +@_traced("sniffer.write_event") def write_event(line: str, log_path: Path, json_path: Path) -> None: """Append a syslog line to the raw log and its parsed JSON to the json log.""" with open(log_path, "a", encoding="utf-8") as lf: diff --git a/decnet/sniffer/worker.py b/decnet/sniffer/worker.py index dca71ab..8cd532a 100644 --- a/decnet/sniffer/worker.py +++ b/decnet/sniffer/worker.py @@ -110,6 +110,7 @@ def _sniff_loop( logger.info("sniffer: sniff loop ended") +@_traced("sniffer.worker") async def sniffer_worker(log_file: str) -> None: """ Async entry point — started as asyncio.create_task in the API lifespan. 
diff --git a/decnet/telemetry.py b/decnet/telemetry.py index d742a73..cdecebd 100644 --- a/decnet/telemetry.py +++ b/decnet/telemetry.py @@ -64,7 +64,9 @@ def setup_tracing(app: Any) -> None: _init_provider() from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor FastAPIInstrumentor.instrument_app(app) - log.info("FastAPI auto-instrumentation active") + from decnet.logging import enable_trace_context + enable_trace_context() + log.info("FastAPI auto-instrumentation active, log-trace correlation enabled") except Exception as exc: log.warning("OTEL setup failed — continuing without tracing: %s", exc) diff --git a/decnet/web/db/models.py b/decnet/web/db/models.py index 8d17124..31de680 100644 --- a/decnet/web/db/models.py +++ b/decnet/web/db/models.py @@ -43,6 +43,11 @@ class Log(SQLModel, table=True): raw_line: str = Field(sa_column=Column("raw_line", Text, nullable=False)) fields: str = Field(sa_column=Column("fields", Text, nullable=False)) msg: Optional[str] = Field(default=None, sa_column=Column("msg", Text, nullable=True)) + # OTEL trace context — bridges the collector→ingester trace to the SSE + # read path. Nullable so pre-existing rows and non-traced deployments + # are unaffected. + trace_id: Optional[str] = Field(default=None) + span_id: Optional[str] = Field(default=None) class Bounty(SQLModel, table=True): __tablename__ = "bounty" diff --git a/decnet/web/ingester.py b/decnet/web/ingester.py index 529b2aa..1fffee7 100644 --- a/decnet/web/ingester.py +++ b/decnet/web/ingester.py @@ -75,6 +75,14 @@ async def log_ingestion_worker(repo: BaseRepository) -> None: _span.set_attribute("service", _log_data.get("service", "")) _span.set_attribute("event_type", _log_data.get("event_type", "")) _span.set_attribute("attacker_ip", _log_data.get("attacker_ip", "")) + # Persist trace context in the DB row so the SSE + # read path can link back to this ingestion trace. 
+ _sctx = getattr(_span, "get_span_context", None) + if _sctx: + _ctx = _sctx() + if _ctx and getattr(_ctx, "trace_id", 0): + _log_data["trace_id"] = format(_ctx.trace_id, "032x") + _log_data["span_id"] = format(_ctx.span_id, "016x") logger.debug("ingest: record decky=%s event_type=%s", _log_data.get("decky"), _log_data.get("event_type")) await repo.add_log(_log_data) await _extract_bounty(repo, _log_data) diff --git a/decnet/web/router/attackers/api_get_attacker_commands.py b/decnet/web/router/attackers/api_get_attacker_commands.py index 8653d95..d2afb8a 100644 --- a/decnet/web/router/attackers/api_get_attacker_commands.py +++ b/decnet/web/router/attackers/api_get_attacker_commands.py @@ -2,6 +2,7 @@ from typing import Any, Optional from fastapi import APIRouter, Depends, HTTPException, Query +from decnet.telemetry import traced as _traced from decnet.web.dependencies import require_viewer, repo router = APIRouter() @@ -15,6 +16,7 @@ router = APIRouter() 404: {"description": "Attacker not found"}, }, ) +@_traced("api.get_attacker_commands") async def get_attacker_commands( uuid: str, limit: int = Query(50, ge=1, le=200), diff --git a/decnet/web/router/attackers/api_get_attacker_detail.py b/decnet/web/router/attackers/api_get_attacker_detail.py index 4d23537..cd29ea1 100644 --- a/decnet/web/router/attackers/api_get_attacker_detail.py +++ b/decnet/web/router/attackers/api_get_attacker_detail.py @@ -2,6 +2,7 @@ from typing import Any from fastapi import APIRouter, Depends, HTTPException +from decnet.telemetry import traced as _traced from decnet.web.dependencies import require_viewer, repo router = APIRouter() @@ -15,6 +16,7 @@ router = APIRouter() 404: {"description": "Attacker not found"}, }, ) +@_traced("api.get_attacker_detail") async def get_attacker_detail( uuid: str, user: dict = Depends(require_viewer), diff --git a/decnet/web/router/attackers/api_get_attackers.py b/decnet/web/router/attackers/api_get_attackers.py index 8961266..958676f 100644 --- 
a/decnet/web/router/attackers/api_get_attackers.py +++ b/decnet/web/router/attackers/api_get_attackers.py @@ -2,6 +2,7 @@ from typing import Any, Optional from fastapi import APIRouter, Depends, Query +from decnet.telemetry import traced as _traced from decnet.web.dependencies import require_viewer, repo from decnet.web.db.models import AttackersResponse @@ -17,6 +18,7 @@ router = APIRouter() 422: {"description": "Validation error"}, }, ) +@_traced("api.get_attackers") async def get_attackers( limit: int = Query(50, ge=1, le=1000), offset: int = Query(0, ge=0, le=2147483647), diff --git a/decnet/web/router/auth/api_change_pass.py b/decnet/web/router/auth/api_change_pass.py index c186973..fec8bac 100644 --- a/decnet/web/router/auth/api_change_pass.py +++ b/decnet/web/router/auth/api_change_pass.py @@ -2,6 +2,7 @@ from typing import Any, Optional from fastapi import APIRouter, Depends, HTTPException, status +from decnet.telemetry import traced as _traced from decnet.web.auth import get_password_hash, verify_password from decnet.web.dependencies import get_current_user_unchecked, repo from decnet.web.db.models import ChangePasswordRequest @@ -18,6 +19,7 @@ router = APIRouter() 422: {"description": "Validation error"} }, ) +@_traced("api.change_password") async def change_password(request: ChangePasswordRequest, current_user: str = Depends(get_current_user_unchecked)) -> dict[str, str]: _user: Optional[dict[str, Any]] = await repo.get_user_by_uuid(current_user) if not _user or not verify_password(request.old_password, _user["password_hash"]): diff --git a/decnet/web/router/auth/api_login.py b/decnet/web/router/auth/api_login.py index a9db5b7..252a652 100644 --- a/decnet/web/router/auth/api_login.py +++ b/decnet/web/router/auth/api_login.py @@ -3,6 +3,7 @@ from typing import Any, Optional from fastapi import APIRouter, HTTPException, status +from decnet.telemetry import traced as _traced from decnet.web.auth import ( ACCESS_TOKEN_EXPIRE_MINUTES, create_access_token, @@ 
-24,6 +25,7 @@ router = APIRouter() 422: {"description": "Validation error"} }, ) +@_traced("api.login") async def login(request: LoginRequest) -> dict[str, Any]: _user: Optional[dict[str, Any]] = await repo.get_user_by_username(request.username) if not _user or not verify_password(request.password, _user["password_hash"]): diff --git a/decnet/web/router/bounty/api_get_bounties.py b/decnet/web/router/bounty/api_get_bounties.py index 30da3b8..04dc784 100644 --- a/decnet/web/router/bounty/api_get_bounties.py +++ b/decnet/web/router/bounty/api_get_bounties.py @@ -2,6 +2,7 @@ from typing import Any, Optional from fastapi import APIRouter, Depends, Query +from decnet.telemetry import traced as _traced from decnet.web.dependencies import require_viewer, repo from decnet.web.db.models import BountyResponse @@ -10,6 +11,7 @@ router = APIRouter() @router.get("/bounty", response_model=BountyResponse, tags=["Bounty Vault"], responses={401: {"description": "Could not validate credentials"}, 422: {"description": "Validation error"}},) +@_traced("api.get_bounties") async def get_bounties( limit: int = Query(50, ge=1, le=1000), offset: int = Query(0, ge=0, le=2147483647), diff --git a/decnet/web/router/config/api_get_config.py b/decnet/web/router/config/api_get_config.py index 397318c..495dc4c 100644 --- a/decnet/web/router/config/api_get_config.py +++ b/decnet/web/router/config/api_get_config.py @@ -1,6 +1,7 @@ from fastapi import APIRouter, Depends from decnet.env import DECNET_DEVELOPER +from decnet.telemetry import traced as _traced from decnet.web.dependencies import require_viewer, repo from decnet.web.db.models import UserResponse @@ -17,6 +18,7 @@ _DEFAULT_MUTATION_INTERVAL = "30m" 401: {"description": "Could not validate credentials"}, }, ) +@_traced("api.get_config") async def api_get_config(user: dict = Depends(require_viewer)) -> dict: limits_state = await repo.get_state("config_limits") globals_state = await repo.get_state("config_globals") diff --git 
a/decnet/web/router/config/api_manage_users.py b/decnet/web/router/config/api_manage_users.py index c1bf9a8..717980d 100644 --- a/decnet/web/router/config/api_manage_users.py +++ b/decnet/web/router/config/api_manage_users.py @@ -2,6 +2,7 @@ import uuid as _uuid from fastapi import APIRouter, Depends, HTTPException +from decnet.telemetry import traced as _traced from decnet.web.auth import get_password_hash from decnet.web.dependencies import require_admin, repo from decnet.web.db.models import ( @@ -24,6 +25,7 @@ router = APIRouter() 422: {"description": "Validation error"}, }, ) +@_traced("api.create_user") async def api_create_user( req: CreateUserRequest, admin: dict = Depends(require_admin), @@ -57,6 +59,7 @@ async def api_create_user( 404: {"description": "User not found"}, }, ) +@_traced("api.delete_user") async def api_delete_user( user_uuid: str, admin: dict = Depends(require_admin), @@ -80,6 +83,7 @@ async def api_delete_user( 422: {"description": "Validation error"}, }, ) +@_traced("api.update_user_role") async def api_update_user_role( user_uuid: str, req: UpdateUserRoleRequest, @@ -106,6 +110,7 @@ async def api_update_user_role( 422: {"description": "Validation error"}, }, ) +@_traced("api.reset_user_password") async def api_reset_user_password( user_uuid: str, req: ResetUserPasswordRequest, diff --git a/decnet/web/router/config/api_reinit.py b/decnet/web/router/config/api_reinit.py index ced28b1..ebdd1c7 100644 --- a/decnet/web/router/config/api_reinit.py +++ b/decnet/web/router/config/api_reinit.py @@ -1,6 +1,7 @@ from fastapi import APIRouter, Depends, HTTPException from decnet.env import DECNET_DEVELOPER +from decnet.telemetry import traced as _traced from decnet.web.dependencies import require_admin, repo router = APIRouter() @@ -14,6 +15,7 @@ router = APIRouter() 403: {"description": "Admin access required or developer mode not enabled"}, }, ) +@_traced("api.reinit") async def api_reinit(admin: dict = Depends(require_admin)) -> dict: if not 
DECNET_DEVELOPER: raise HTTPException(status_code=403, detail="Developer mode is not enabled") diff --git a/decnet/web/router/config/api_update_config.py b/decnet/web/router/config/api_update_config.py index d5c60f8..53826e5 100644 --- a/decnet/web/router/config/api_update_config.py +++ b/decnet/web/router/config/api_update_config.py @@ -1,5 +1,6 @@ from fastapi import APIRouter, Depends +from decnet.telemetry import traced as _traced from decnet.web.dependencies import require_admin, repo from decnet.web.db.models import DeploymentLimitRequest, GlobalMutationIntervalRequest @@ -15,6 +16,7 @@ router = APIRouter() 422: {"description": "Validation error"}, }, ) +@_traced("api.update_deployment_limit") async def api_update_deployment_limit( req: DeploymentLimitRequest, admin: dict = Depends(require_admin), @@ -32,6 +34,7 @@ async def api_update_deployment_limit( 422: {"description": "Validation error"}, }, ) +@_traced("api.update_global_mutation_interval") async def api_update_global_mutation_interval( req: GlobalMutationIntervalRequest, admin: dict = Depends(require_admin), diff --git a/decnet/web/router/fleet/api_deploy_deckies.py b/decnet/web/router/fleet/api_deploy_deckies.py index c799fc7..cdf2b44 100644 --- a/decnet/web/router/fleet/api_deploy_deckies.py +++ b/decnet/web/router/fleet/api_deploy_deckies.py @@ -3,6 +3,7 @@ import os from fastapi import APIRouter, Depends, HTTPException from decnet.logging import get_logger +from decnet.telemetry import traced as _traced from decnet.config import DEFAULT_MUTATE_INTERVAL, DecnetConfig, _ROOT from decnet.engine import deploy as _deploy from decnet.ini_loader import load_ini_from_string @@ -27,6 +28,7 @@ router = APIRouter() 500: {"description": "Deployment failed"} } ) +@_traced("api.deploy_deckies") async def api_deploy_deckies(req: DeployIniRequest, admin: dict = Depends(require_admin)) -> dict[str, str]: from decnet.fleet import build_deckies_from_ini diff --git a/decnet/web/router/fleet/api_get_deckies.py 
b/decnet/web/router/fleet/api_get_deckies.py index c520ae8..6d933fa 100644 --- a/decnet/web/router/fleet/api_get_deckies.py +++ b/decnet/web/router/fleet/api_get_deckies.py @@ -2,6 +2,7 @@ from typing import Any from fastapi import APIRouter, Depends +from decnet.telemetry import traced as _traced from decnet.web.dependencies import require_viewer, repo router = APIRouter() @@ -9,5 +10,6 @@ router = APIRouter() @router.get("/deckies", tags=["Fleet Management"], responses={401: {"description": "Could not validate credentials"}, 422: {"description": "Validation error"}},) +@_traced("api.get_deckies") async def get_deckies(user: dict = Depends(require_viewer)) -> list[dict[str, Any]]: return await repo.get_deckies() diff --git a/decnet/web/router/fleet/api_mutate_decky.py b/decnet/web/router/fleet/api_mutate_decky.py index b98fa7b..ea47be0 100644 --- a/decnet/web/router/fleet/api_mutate_decky.py +++ b/decnet/web/router/fleet/api_mutate_decky.py @@ -1,6 +1,7 @@ import os from fastapi import APIRouter, Depends, HTTPException, Path +from decnet.telemetry import traced as _traced from decnet.mutator import mutate_decky from decnet.web.dependencies import require_admin, repo @@ -12,6 +13,7 @@ router = APIRouter() tags=["Fleet Management"], responses={401: {"description": "Could not validate credentials"}, 403: {"description": "Insufficient permissions"}, 404: {"description": "Decky not found"}} ) +@_traced("api.mutate_decky") async def api_mutate_decky( decky_name: str = Path(..., pattern=r"^[a-z0-9\-]{1,64}$"), admin: dict = Depends(require_admin), diff --git a/decnet/web/router/fleet/api_mutate_interval.py b/decnet/web/router/fleet/api_mutate_interval.py index f8c5202..10afba9 100644 --- a/decnet/web/router/fleet/api_mutate_interval.py +++ b/decnet/web/router/fleet/api_mutate_interval.py @@ -1,5 +1,6 @@ from fastapi import APIRouter, Depends, HTTPException +from decnet.telemetry import traced as _traced from decnet.config import DecnetConfig from decnet.web.dependencies 
import require_admin, repo from decnet.web.db.models import MutateIntervalRequest @@ -24,6 +25,7 @@ def _parse_duration(s: str) -> int: 422: {"description": "Validation error"} }, ) +@_traced("api.update_mutate_interval") async def api_update_mutate_interval(decky_name: str, req: MutateIntervalRequest, admin: dict = Depends(require_admin)) -> dict[str, str]: state_dict = await repo.get_state("deployment") if not state_dict: diff --git a/decnet/web/router/health/api_get_health.py b/decnet/web/router/health/api_get_health.py index be84e7f..6beb271 100644 --- a/decnet/web/router/health/api_get_health.py +++ b/decnet/web/router/health/api_get_health.py @@ -3,6 +3,7 @@ from typing import Any from fastapi import APIRouter, Depends from fastapi.responses import JSONResponse +from decnet.telemetry import traced as _traced from decnet.web.dependencies import require_viewer, repo from decnet.web.db.models import HealthResponse, ComponentHealth @@ -20,6 +21,7 @@ _OPTIONAL_SERVICES = {"sniffer_worker"} 503: {"model": HealthResponse, "description": "System unhealthy"}, }, ) +@_traced("api.get_health") async def get_health(user: dict = Depends(require_viewer)) -> Any: components: dict[str, ComponentHealth] = {} diff --git a/decnet/web/router/logs/api_get_histogram.py b/decnet/web/router/logs/api_get_histogram.py index 2fe9775..4ea54e5 100644 --- a/decnet/web/router/logs/api_get_histogram.py +++ b/decnet/web/router/logs/api_get_histogram.py @@ -2,6 +2,7 @@ from typing import Any, Optional from fastapi import APIRouter, Depends, Query +from decnet.telemetry import traced as _traced from decnet.web.dependencies import require_viewer, repo router = APIRouter() @@ -9,6 +10,7 @@ router = APIRouter() @router.get("/logs/histogram", tags=["Logs"], responses={401: {"description": "Could not validate credentials"}, 422: {"description": "Validation error"}},) +@_traced("api.get_logs_histogram") async def get_logs_histogram( search: Optional[str] = None, start_time: Optional[str] = 
Query(None), diff --git a/decnet/web/router/logs/api_get_logs.py b/decnet/web/router/logs/api_get_logs.py index 74fec9f..68d9b11 100644 --- a/decnet/web/router/logs/api_get_logs.py +++ b/decnet/web/router/logs/api_get_logs.py @@ -2,6 +2,7 @@ from typing import Any, Optional from fastapi import APIRouter, Depends, Query +from decnet.telemetry import traced as _traced from decnet.web.dependencies import require_viewer, repo from decnet.web.db.models import LogsResponse @@ -10,6 +11,7 @@ router = APIRouter() @router.get("/logs", response_model=LogsResponse, tags=["Logs"], responses={401: {"description": "Could not validate credentials"}, 422: {"description": "Validation error"}}) +@_traced("api.get_logs") async def get_logs( limit: int = Query(50, ge=1, le=1000), offset: int = Query(0, ge=0, le=2147483647), diff --git a/decnet/web/router/stats/api_get_stats.py b/decnet/web/router/stats/api_get_stats.py index caf1c6f..21ae610 100644 --- a/decnet/web/router/stats/api_get_stats.py +++ b/decnet/web/router/stats/api_get_stats.py @@ -2,6 +2,7 @@ from typing import Any from fastapi import APIRouter, Depends +from decnet.telemetry import traced as _traced from decnet.web.dependencies import require_viewer, repo from decnet.web.db.models import StatsResponse @@ -10,5 +11,6 @@ router = APIRouter() @router.get("/stats", response_model=StatsResponse, tags=["Observability"], responses={401: {"description": "Could not validate credentials"}, 422: {"description": "Validation error"}},) +@_traced("api.get_stats") async def get_stats(user: dict = Depends(require_viewer)) -> dict[str, Any]: return await repo.get_stats_summary() diff --git a/decnet/web/router/stream/api_stream_events.py b/decnet/web/router/stream/api_stream_events.py index 3703277..6d9f910 100644 --- a/decnet/web/router/stream/api_stream_events.py +++ b/decnet/web/router/stream/api_stream_events.py @@ -7,6 +7,7 @@ from fastapi.responses import StreamingResponse from decnet.env import DECNET_DEVELOPER from decnet.logging 
import get_logger +from decnet.telemetry import traced as _traced, get_tracer as _get_tracer from decnet.web.dependencies import require_stream_viewer, repo log = get_logger("api") @@ -14,6 +15,34 @@ log = get_logger("api") router = APIRouter() +def _build_trace_links(logs: list[dict]) -> list: + """Build OTEL span links from persisted trace_id/span_id in log rows. + + Returns an empty list when tracing is disabled (no OTEL imports). + """ + try: + from opentelemetry.trace import Link, SpanContext, TraceFlags + except ImportError: + return [] + links: list[Link] = [] + for entry in logs: + tid = entry.get("trace_id") + sid = entry.get("span_id") + if not tid or not sid or tid == "0": + continue + try: + ctx = SpanContext( + trace_id=int(tid, 16), + span_id=int(sid, 16), + is_remote=True, + trace_flags=TraceFlags(TraceFlags.SAMPLED), + ) + links.append(Link(ctx)) + except (ValueError, TypeError): + continue + return links + + @router.get("/stream", tags=["Observability"], responses={ 200: { @@ -24,6 +53,7 @@ router = APIRouter() 422: {"description": "Validation error"} }, ) +@_traced("api.stream_events") async def stream_events( request: Request, last_event_id: int = Query(0, alias="lastEventId"), @@ -75,7 +105,15 @@ async def stream_events( ) if new_logs: last_id = max(entry["id"] for entry in new_logs) - yield f"event: message\ndata: {json.dumps({'type': 'logs', 'data': new_logs})}\n\n" + # Create a span linking back to the ingestion traces + # stored in each log row, closing the pipeline gap. 
+ _links = _build_trace_links(new_logs) + _tracer = _get_tracer("sse") + with _tracer.start_as_current_span( + "sse.emit_logs", links=_links, + attributes={"log_count": len(new_logs)}, + ): + yield f"event: message\ndata: {json.dumps({'type': 'logs', 'data': new_logs})}\n\n" loops_since_stats = stats_interval_sec if loops_since_stats >= stats_interval_sec: diff --git a/development/docs/TRACING.md b/development/docs/TRACING.md new file mode 100644 index 0000000..a59617b --- /dev/null +++ b/development/docs/TRACING.md @@ -0,0 +1,219 @@ +# Distributed Tracing + +OpenTelemetry (OTEL) distributed tracing across all DECNET services. Gated by the `DECNET_DEVELOPER_TRACING` environment variable (off by default). When disabled, zero overhead: no OTEL imports occur, `@traced` returns the original unwrapped function, and no middleware is installed. + +## Quick Start + +```bash +# 1. Start Jaeger (OTLP receiver on :4317, UI on :16686) +docker compose -f development/docker-compose.otel.yml up -d + +# 2. Run DECNET with tracing enabled +DECNET_DEVELOPER_TRACING=true decnet web + +# 3. Open Jaeger UI — service name is "decnet" +open http://localhost:16686 +``` + +| Variable | Default | Purpose | +|----------|---------|---------| +| `DECNET_DEVELOPER_TRACING` | `false` | Enable/disable all tracing | +| `DECNET_OTEL_ENDPOINT` | `http://localhost:4317` | OTLP gRPC exporter target | + +## Architecture + +The core module is `decnet/telemetry.py`. All tracing flows through it. 
+ +| Export | Purpose | +|--------|---------| +| `setup_tracing(app)` | Init TracerProvider, instrument FastAPI, enable log-trace correlation | +| `shutdown_tracing()` | Flush and shut down the TracerProvider | +| `get_tracer(component)` | Return an OTEL Tracer or `_NoOpTracer` when disabled | +| `@traced(name)` | Decorator wrapping sync/async functions in spans (no-op when disabled) | +| `wrap_repository(repo)` | Dynamic `__getattr__` proxy adding `db.*` spans to every async method | +| `inject_context(record)` | Embed W3C trace context into a JSON record under `_trace` | +| `extract_context(record)` | Recover trace context from `_trace` and remove it from the record | +| `start_span_with_context(tracer, name, ctx)` | Start a span as child of an extracted context | + +**TracerProvider config**: Resource(`service.name=decnet`, `service.version=0.2.0`), `BatchSpanProcessor`, OTLP gRPC exporter. + +**When disabled**: `_NoOpTracer` and `_NoOpSpan` stubs are returned. No OTEL SDK packages are imported. The `@traced` decorator returns the original function object at decoration time. + +## Pipeline Trace Propagation + +The DECNET data pipeline is decoupled through JSON files and the database, which normally breaks trace continuity. Four mechanisms bridge the gaps: + +1. **Collector → JSON**: `inject_context()` embeds W3C `traceparent`/`tracestate` into each JSON log record under a `_trace` key. +2. **JSON → Ingester**: `extract_context()` recovers the parent context. The ingester creates `ingester.process_record` as a child span, preserving the collector→ingester parent-child relationship. +3. **Ingester → DB**: The ingester persists the current span's `trace_id` and `span_id` as columns on the `logs` table before calling `repo.add_log()`. +4. **DB → SSE**: The SSE endpoint reads `trace_id`/`span_id` from log rows and creates OTEL **span links** (FOLLOWS_FROM) on `sse.emit_logs`, connecting the read path back to the original ingestion traces. 
+ +**Log-trace correlation**: `_TraceContextFilter` (installed by `enable_trace_context()`) injects `otel_trace_id` and `otel_span_id` into Python `LogRecord` objects, bridging structured logs with trace context. + +## Span Reference + +### API Endpoints (23 spans) + +| Span | Endpoint | +|------|----------| +| `api.login` | `POST /auth/login` | +| `api.change_password` | `POST /auth/change-password` | +| `api.get_logs` | `GET /logs` | +| `api.get_logs_histogram` | `GET /logs/histogram` | +| `api.get_bounties` | `GET /bounty` | +| `api.get_attackers` | `GET /attackers` | +| `api.get_attacker_detail` | `GET /attackers/{uuid}` | +| `api.get_attacker_commands` | `GET /attackers/{uuid}/commands` | +| `api.get_stats` | `GET /stats` | +| `api.get_deckies` | `GET /fleet/deckies` | +| `api.deploy_deckies` | `POST /fleet/deploy` | +| `api.mutate_decky` | `POST /fleet/mutate/{decky_id}` | +| `api.update_mutate_interval` | `POST /fleet/mutate-interval/{decky_id}` | +| `api.get_config` | `GET /config` | +| `api.update_deployment_limit` | `PUT /config/deployment-limit` | +| `api.update_global_mutation_interval` | `PUT /config/global-mutation-interval` | +| `api.create_user` | `POST /config/users` | +| `api.delete_user` | `DELETE /config/users/{uuid}` | +| `api.update_user_role` | `PUT /config/users/{uuid}/role` | +| `api.reset_user_password` | `PUT /config/users/{uuid}/password` | +| `api.reinit` | `POST /config/reinit` | +| `api.get_health` | `GET /health` | +| `api.stream_events` | `GET /stream` | + +### DB Layer (dynamic) + +Every async method on `BaseRepository` is automatically wrapped by `TracedRepository` as `db.<method>` (e.g. `db.add_log`, `db.get_attackers`, `db.upsert_attacker`). 
+ +### Collector + +| Span | Type | +|------|------| +| `collector.stream_container` | `@traced` | +| `collector.event` | inline | + +### Ingester + +| Span | Type | +|------|------| +| `ingester.process_record` | inline (with parent context) | +| `ingester.extract_bounty` | `@traced` | + +### Profiler + +| Span | Type | +|------|------| +| `profiler.incremental_update` | `@traced` | +| `profiler.update_profiles` | `@traced` | +| `profiler.process_ip` | inline | +| `profiler.timing_stats` | `@traced` | +| `profiler.classify_behavior` | `@traced` | +| `profiler.detect_tools_from_headers` | `@traced` | +| `profiler.phase_sequence` | `@traced` | +| `profiler.sniffer_rollup` | `@traced` | +| `profiler.build_behavior_record` | `@traced` | +| `profiler.behavior_summary` | inline | + +### Sniffer + +| Span | Type | +|------|------| +| `sniffer.worker` | `@traced` | +| `sniffer.sniff_loop` | `@traced` | +| `sniffer.tcp_syn_fingerprint` | inline | +| `sniffer.tls_client_hello` | inline | +| `sniffer.tls_server_hello` | inline | +| `sniffer.tls_certificate` | inline | +| `sniffer.parse_client_hello` | `@traced` | +| `sniffer.parse_server_hello` | `@traced` | +| `sniffer.parse_certificate` | `@traced` | +| `sniffer.ja3` | `@traced` | +| `sniffer.ja3s` | `@traced` | +| `sniffer.ja4` | `@traced` | +| `sniffer.ja4s` | `@traced` | +| `sniffer.session_resumption_info` | `@traced` | +| `sniffer.p0f_guess_os` | `@traced` | +| `sniffer.write_event` | `@traced` | + +### Prober + +| Span | Type | +|------|------| +| `prober.worker` | `@traced` | +| `prober.discover_attackers` | `@traced` | +| `prober.probe_cycle` | `@traced` | +| `prober.jarm_phase` | `@traced` | +| `prober.hassh_phase` | `@traced` | +| `prober.tcpfp_phase` | `@traced` | +| `prober.jarm_hash` | `@traced` | +| `prober.jarm_send_probe` | `@traced` | +| `prober.hassh_server` | `@traced` | +| `prober.hassh_ssh_connect` | `@traced` | +| `prober.tcp_fingerprint` | `@traced` | +| `prober.tcpfp_send_syn` | `@traced` | + +### 
Engine + +| Span | Type | +|------|------| +| `engine.deploy` | `@traced` | +| `engine.teardown` | `@traced` | +| `engine.compose_with_retry` | `@traced` | + +### Mutator + +| Span | Type | +|------|------| +| `mutator.mutate_decky` | `@traced` | +| `mutator.mutate_all` | `@traced` | +| `mutator.watch_loop` | `@traced` | + +### Correlation + +| Span | Type | +|------|------| +| `correlation.ingest_file` | `@traced` | +| `correlation.ingest_file.summary` | inline | +| `correlation.traversals` | `@traced` | +| `correlation.report_json` | `@traced` | +| `correlation.traversal_syslog_lines` | `@traced` | + +### Logging + +| Span | Type | +|------|------| +| `logging.init_file_handler` | `@traced` | +| `logging.probe_log_target` | `@traced` | + +### SSE + +| Span | Type | +|------|------| +| `sse.emit_logs` | inline (with span links to ingestion traces) | + +## Adding New Traces + +```python +from decnet.telemetry import traced as _traced, get_tracer as _get_tracer + +# Decorator (preferred for entire functions) +@_traced("component.operation") +async def my_function(): + ... + +# Inline (for sub-sections within a function) +with _get_tracer("component").start_as_current_span("component.sub_op") as span: + span.set_attribute("key", "value") + ... +``` + +Naming convention: `component.operation` (e.g. `prober.jarm_hash`, `profiler.timing_stats`). + +## Troubleshooting + +| Symptom | Check | +|---------|-------| +| No traces in Jaeger | `DECNET_DEVELOPER_TRACING=true`? Jaeger running on port 4317? | +| `ImportError` on OTEL packages | Run `pip install -e ".[dev]"` (OTEL is in optional deps) | +| Partial traces (ingester orphaned) | Verify `_trace` key present in JSON log file records | +| SSE spans have no links | Confirm `trace_id`/`span_id` columns exist in `logs` table | +| Performance concern | BatchSpanProcessor adds ~1ms per span; zero overhead when disabled |