From 04db13afae102247085a5de8d59e6ad8b0a4bdcb Mon Sep 17 00:00:00 2001 From: anti Date: Wed, 15 Apr 2026 23:52:13 -0400 Subject: [PATCH] feat: cross-stage trace propagation and granular per-event spans Collector now creates a span per event and injects W3C trace context into JSON records. Ingester extracts that context and creates child spans, connecting the full event journey: collector -> ingester -> db.add_log + extract_bounty -> db.add_bounty. Profiler now creates per-IP spans inside update_profiles with rich attributes (event_count, is_traversal, bounty_count, command_count). Traces in Jaeger now show the complete execution map from capture through ingestion and profiling. --- decnet/collector/worker.py | 17 +++++++---- decnet/profiler/worker.py | 36 ++++++++++++++--------- decnet/telemetry.py | 60 ++++++++++++++++++++++++++++++++++++++ decnet/web/ingester.py | 23 ++++++++++++--- 4 files changed, 114 insertions(+), 22 deletions(-) diff --git a/decnet/collector/worker.py b/decnet/collector/worker.py index a6714bd..83c14e9 100644 --- a/decnet/collector/worker.py +++ b/decnet/collector/worker.py @@ -18,7 +18,7 @@ from pathlib import Path from typing import Any, Optional from decnet.logging import get_logger -from decnet.telemetry import traced as _traced +from decnet.telemetry import traced as _traced, get_tracer as _get_tracer, inject_context as _inject_ctx logger = get_logger("collector") @@ -246,10 +246,17 @@ def _stream_container(container_id: str, log_path: Path, json_path: Path) -> Non parsed = parse_rfc5424(line) if parsed: if _should_ingest(parsed): - logger.debug("collector: event written decky=%s type=%s", parsed.get("decky"), parsed.get("event_type")) - jf = _reopen_if_needed(json_path, jf) - jf.write(json.dumps(parsed) + "\n") - jf.flush() + _tracer = _get_tracer("collector") + with _tracer.start_as_current_span("collector.event") as _span: + _span.set_attribute("decky", parsed.get("decky", "")) + _span.set_attribute("service", parsed.get("service", 
"")) + _span.set_attribute("event_type", parsed.get("event_type", "")) + _span.set_attribute("attacker_ip", parsed.get("attacker_ip", "")) + _inject_ctx(parsed) + logger.debug("collector: event written decky=%s type=%s", parsed.get("decky"), parsed.get("event_type")) + jf = _reopen_if_needed(json_path, jf) + jf.write(json.dumps(parsed) + "\n") + jf.flush() else: logger.debug( "collector: rate-limited decky=%s service=%s type=%s attacker=%s", diff --git a/decnet/profiler/worker.py b/decnet/profiler/worker.py index 86fc81a..3abaf8e 100644 --- a/decnet/profiler/worker.py +++ b/decnet/profiler/worker.py @@ -22,7 +22,7 @@ from decnet.correlation.engine import CorrelationEngine from decnet.correlation.parser import LogEvent from decnet.logging import get_logger from decnet.profiler.behavioral import build_behavior_record -from decnet.telemetry import traced as _traced +from decnet.telemetry import traced as _traced, get_tracer as _get_tracer from decnet.web.db.repository import BaseRepository logger = get_logger("attacker_worker") @@ -109,25 +109,35 @@ async def _update_profiles( traversal_map = {t.attacker_ip: t for t in state.engine.traversals(min_deckies=2)} bounties_map = await repo.get_bounties_for_ips(ips) + _tracer = _get_tracer("profiler") for ip in ips: events = state.engine._events.get(ip, []) if not events: continue - traversal = traversal_map.get(ip) - bounties = bounties_map.get(ip, []) - commands = _extract_commands_from_events(events) + with _tracer.start_as_current_span("profiler.process_ip") as _span: + _span.set_attribute("attacker_ip", ip) + _span.set_attribute("event_count", len(events)) - record = _build_record(ip, events, traversal, bounties, commands) - attacker_uuid = await repo.upsert_attacker(record) + traversal = traversal_map.get(ip) + bounties = bounties_map.get(ip, []) + commands = _extract_commands_from_events(events) - # Behavioral / fingerprint rollup lives in a sibling table so failures - # here never block the core attacker profile 
upsert. - try: - behavior = build_behavior_record(events) - await repo.upsert_attacker_behavior(attacker_uuid, behavior) - except Exception as exc: - logger.error("attacker worker: behavior upsert failed for %s: %s", ip, exc) + record = _build_record(ip, events, traversal, bounties, commands) + attacker_uuid = await repo.upsert_attacker(record) + + _span.set_attribute("is_traversal", traversal is not None) + _span.set_attribute("bounty_count", len(bounties)) + _span.set_attribute("command_count", len(commands)) + + # Behavioral / fingerprint rollup lives in a sibling table so failures + # here never block the core attacker profile upsert. + try: + behavior = build_behavior_record(events) + await repo.upsert_attacker_behavior(attacker_uuid, behavior) + except Exception as exc: + _span.record_exception(exc) + logger.error("attacker worker: behavior upsert failed for %s: %s", ip, exc) def _build_record( diff --git a/decnet/telemetry.py b/decnet/telemetry.py index 57d0884..d742a73 100644 --- a/decnet/telemetry.py +++ b/decnet/telemetry.py @@ -244,3 +244,63 @@ def wrap_repository(repo: Any) -> Any: return attr return TracedRepository(repo) + + +# --------------------------------------------------------------------------- +# Cross-stage trace context propagation +# --------------------------------------------------------------------------- +# The DECNET pipeline is decoupled via JSON files: +# collector -> .json file -> ingester -> DB -> profiler +# +# To link collector and ingester spans in Jaeger, we embed W3C trace context +# into the JSON records: the collector injects it, the ingester extracts it and +# continues the trace as a child span. It stops there; the DB carries no context. + +def inject_context(record: dict[str, Any]) -> None: + """Inject current OTEL trace context into *record* under ``_trace``. + + No-op when tracing is disabled; injection errors are swallowed. The ``_trace`` + key is stripped by the ingester after extraction — it never reaches the DB. 
+ """ + if not _ENABLED: + return + try: + from opentelemetry.propagate import inject + carrier: dict[str, str] = {} + inject(carrier) + if carrier: + record["_trace"] = carrier + except Exception: + pass + + +def extract_context(record: dict[str, Any]) -> Any: + """Extract OTEL trace context from *record* and return it. + + Returns ``None`` when tracing is disabled, no context is present, or extraction fails. + Removes the ``_trace`` key from the record so it doesn't leak into the DB. + """ + if not _ENABLED: + record.pop("_trace", None) + return None + try: + carrier = record.pop("_trace", None) + if not carrier: + return None + from opentelemetry.propagate import extract + return extract(carrier) + except Exception: + return None + + +def start_span_with_context(tracer: Any, name: str, context: Any = None) -> Any: + """Start a span, optionally as a child of an extracted context. + + Returns a context-manager span (``_NoOpSpan`` when tracing is disabled). When + *context* is ``None``, the currently active span (if any) is the parent. + """ + if not _ENABLED: + return _NoOpSpan() + if context is not None: + return tracer.start_as_current_span(name, context=context) + return tracer.start_as_current_span(name) diff --git a/decnet/web/ingester.py b/decnet/web/ingester.py index 7a0a8ef..529b2aa 100644 --- a/decnet/web/ingester.py +++ b/decnet/web/ingester.py @@ -5,7 +5,12 @@ from typing import Any from pathlib import Path from decnet.logging import get_logger -from decnet.telemetry import traced as _traced +from decnet.telemetry import ( + traced as _traced, + get_tracer as _get_tracer, + extract_context as _extract_ctx, + start_span_with_context as _start_span, +) from decnet.web.db.repository import BaseRepository logger = get_logger("api") @@ -60,9 +65,19 @@ async def log_ingestion_worker(repo: BaseRepository) -> None: try: _log_data: dict[str, Any] = json.loads(_line.strip()) - logger.debug("ingest: record decky=%s event_type=%s", _log_data.get("decky"), _log_data.get("event_type")) - await repo.add_log(_log_data) - await 
_extract_bounty(repo, _log_data) + # Extract trace context injected by the collector. + # This makes the ingester span a child of the collector span, + # showing the full event journey in Jaeger. + _parent_ctx = _extract_ctx(_log_data) + _tracer = _get_tracer("ingester") + with _start_span(_tracer, "ingester.process_record", context=_parent_ctx) as _span: + _span.set_attribute("decky", _log_data.get("decky", "")) + _span.set_attribute("service", _log_data.get("service", "")) + _span.set_attribute("event_type", _log_data.get("event_type", "")) + _span.set_attribute("attacker_ip", _log_data.get("attacker_ip", "")) + logger.debug("ingest: record decky=%s event_type=%s", _log_data.get("decky"), _log_data.get("event_type")) + await repo.add_log(_log_data) + await _extract_bounty(repo, _log_data) except json.JSONDecodeError: logger.error("ingest: failed to decode JSON log line: %s", _line.strip()) continue