feat: cross-stage trace propagation and granular per-event spans
Collector now creates a span per event and injects W3C trace context into JSON records. Ingester extracts that context and creates child spans, connecting the full event journey: collector -> ingester -> db.add_log + extract_bounty -> db.add_bounty. Profiler now creates per-IP spans inside update_profiles with rich attributes (event_count, is_traversal, bounty_count, command_count). Traces in Jaeger now show the complete execution map from capture through ingestion and profiling.
This commit is contained in:
@@ -18,7 +18,7 @@ from pathlib import Path
|
||||
from typing import Any, Optional
|
||||
|
||||
from decnet.logging import get_logger
|
||||
from decnet.telemetry import traced as _traced
|
||||
from decnet.telemetry import traced as _traced, get_tracer as _get_tracer, inject_context as _inject_ctx
|
||||
|
||||
logger = get_logger("collector")
|
||||
|
||||
@@ -246,10 +246,17 @@ def _stream_container(container_id: str, log_path: Path, json_path: Path) -> Non
|
||||
parsed = parse_rfc5424(line)
|
||||
if parsed:
|
||||
if _should_ingest(parsed):
|
||||
logger.debug("collector: event written decky=%s type=%s", parsed.get("decky"), parsed.get("event_type"))
|
||||
jf = _reopen_if_needed(json_path, jf)
|
||||
jf.write(json.dumps(parsed) + "\n")
|
||||
jf.flush()
|
||||
_tracer = _get_tracer("collector")
|
||||
with _tracer.start_as_current_span("collector.event") as _span:
|
||||
_span.set_attribute("decky", parsed.get("decky", ""))
|
||||
_span.set_attribute("service", parsed.get("service", ""))
|
||||
_span.set_attribute("event_type", parsed.get("event_type", ""))
|
||||
_span.set_attribute("attacker_ip", parsed.get("attacker_ip", ""))
|
||||
_inject_ctx(parsed)
|
||||
logger.debug("collector: event written decky=%s type=%s", parsed.get("decky"), parsed.get("event_type"))
|
||||
jf = _reopen_if_needed(json_path, jf)
|
||||
jf.write(json.dumps(parsed) + "\n")
|
||||
jf.flush()
|
||||
else:
|
||||
logger.debug(
|
||||
"collector: rate-limited decky=%s service=%s type=%s attacker=%s",
|
||||
|
||||
@@ -22,7 +22,7 @@ from decnet.correlation.engine import CorrelationEngine
|
||||
from decnet.correlation.parser import LogEvent
|
||||
from decnet.logging import get_logger
|
||||
from decnet.profiler.behavioral import build_behavior_record
|
||||
from decnet.telemetry import traced as _traced
|
||||
from decnet.telemetry import traced as _traced, get_tracer as _get_tracer
|
||||
from decnet.web.db.repository import BaseRepository
|
||||
|
||||
logger = get_logger("attacker_worker")
|
||||
@@ -109,25 +109,35 @@ async def _update_profiles(
|
||||
traversal_map = {t.attacker_ip: t for t in state.engine.traversals(min_deckies=2)}
|
||||
bounties_map = await repo.get_bounties_for_ips(ips)
|
||||
|
||||
_tracer = _get_tracer("profiler")
|
||||
for ip in ips:
|
||||
events = state.engine._events.get(ip, [])
|
||||
if not events:
|
||||
continue
|
||||
|
||||
traversal = traversal_map.get(ip)
|
||||
bounties = bounties_map.get(ip, [])
|
||||
commands = _extract_commands_from_events(events)
|
||||
with _tracer.start_as_current_span("profiler.process_ip") as _span:
|
||||
_span.set_attribute("attacker_ip", ip)
|
||||
_span.set_attribute("event_count", len(events))
|
||||
|
||||
record = _build_record(ip, events, traversal, bounties, commands)
|
||||
attacker_uuid = await repo.upsert_attacker(record)
|
||||
traversal = traversal_map.get(ip)
|
||||
bounties = bounties_map.get(ip, [])
|
||||
commands = _extract_commands_from_events(events)
|
||||
|
||||
# Behavioral / fingerprint rollup lives in a sibling table so failures
|
||||
# here never block the core attacker profile upsert.
|
||||
try:
|
||||
behavior = build_behavior_record(events)
|
||||
await repo.upsert_attacker_behavior(attacker_uuid, behavior)
|
||||
except Exception as exc:
|
||||
logger.error("attacker worker: behavior upsert failed for %s: %s", ip, exc)
|
||||
record = _build_record(ip, events, traversal, bounties, commands)
|
||||
attacker_uuid = await repo.upsert_attacker(record)
|
||||
|
||||
_span.set_attribute("is_traversal", traversal is not None)
|
||||
_span.set_attribute("bounty_count", len(bounties))
|
||||
_span.set_attribute("command_count", len(commands))
|
||||
|
||||
# Behavioral / fingerprint rollup lives in a sibling table so failures
|
||||
# here never block the core attacker profile upsert.
|
||||
try:
|
||||
behavior = build_behavior_record(events)
|
||||
await repo.upsert_attacker_behavior(attacker_uuid, behavior)
|
||||
except Exception as exc:
|
||||
_span.record_exception(exc)
|
||||
logger.error("attacker worker: behavior upsert failed for %s: %s", ip, exc)
|
||||
|
||||
|
||||
def _build_record(
|
||||
|
||||
@@ -244,3 +244,63 @@ def wrap_repository(repo: Any) -> Any:
|
||||
return attr
|
||||
|
||||
return TracedRepository(repo)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Cross-stage trace context propagation
|
||||
# ---------------------------------------------------------------------------
|
||||
# The DECNET pipeline is decoupled via JSON files:
|
||||
# collector -> .json file -> ingester -> DB -> profiler
|
||||
#
|
||||
# To show the full journey of an event in Jaeger, we embed W3C trace context
|
||||
# into the JSON records. The collector injects it; the ingester extracts it
|
||||
# and continues the trace as a child span.
|
||||
|
||||
def inject_context(record: dict[str, Any]) -> None:
|
||||
"""Inject current OTEL trace context into *record* under ``_trace``.
|
||||
|
||||
No-op when tracing is disabled. The ``_trace`` key is stripped by the
|
||||
ingester after extraction — it never reaches the DB.
|
||||
"""
|
||||
if not _ENABLED:
|
||||
return
|
||||
try:
|
||||
from opentelemetry.propagate import inject
|
||||
carrier: dict[str, str] = {}
|
||||
inject(carrier)
|
||||
if carrier:
|
||||
record["_trace"] = carrier
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def extract_context(record: dict[str, Any]) -> Any:
|
||||
"""Extract OTEL trace context from *record* and return it.
|
||||
|
||||
Returns ``None`` when tracing is disabled or no context is present.
|
||||
Removes the ``_trace`` key from the record so it doesn't leak into the DB.
|
||||
"""
|
||||
if not _ENABLED:
|
||||
record.pop("_trace", None)
|
||||
return None
|
||||
try:
|
||||
carrier = record.pop("_trace", None)
|
||||
if not carrier:
|
||||
return None
|
||||
from opentelemetry.propagate import extract
|
||||
return extract(carrier)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def start_span_with_context(tracer: Any, name: str, context: Any = None) -> Any:
|
||||
"""Start a span, optionally as a child of an extracted context.
|
||||
|
||||
Returns a context manager span. When *context* is ``None``, creates a
|
||||
root span (normal behavior).
|
||||
"""
|
||||
if not _ENABLED:
|
||||
return _NoOpSpan()
|
||||
if context is not None:
|
||||
return tracer.start_as_current_span(name, context=context)
|
||||
return tracer.start_as_current_span(name)
|
||||
|
||||
@@ -5,7 +5,12 @@ from typing import Any
|
||||
from pathlib import Path
|
||||
|
||||
from decnet.logging import get_logger
|
||||
from decnet.telemetry import traced as _traced
|
||||
from decnet.telemetry import (
|
||||
traced as _traced,
|
||||
get_tracer as _get_tracer,
|
||||
extract_context as _extract_ctx,
|
||||
start_span_with_context as _start_span,
|
||||
)
|
||||
from decnet.web.db.repository import BaseRepository
|
||||
|
||||
logger = get_logger("api")
|
||||
@@ -60,9 +65,19 @@ async def log_ingestion_worker(repo: BaseRepository) -> None:
|
||||
|
||||
try:
|
||||
_log_data: dict[str, Any] = json.loads(_line.strip())
|
||||
logger.debug("ingest: record decky=%s event_type=%s", _log_data.get("decky"), _log_data.get("event_type"))
|
||||
await repo.add_log(_log_data)
|
||||
await _extract_bounty(repo, _log_data)
|
||||
# Extract trace context injected by the collector.
|
||||
# This makes the ingester span a child of the collector span,
|
||||
# showing the full event journey in Jaeger.
|
||||
_parent_ctx = _extract_ctx(_log_data)
|
||||
_tracer = _get_tracer("ingester")
|
||||
with _start_span(_tracer, "ingester.process_record", context=_parent_ctx) as _span:
|
||||
_span.set_attribute("decky", _log_data.get("decky", ""))
|
||||
_span.set_attribute("service", _log_data.get("service", ""))
|
||||
_span.set_attribute("event_type", _log_data.get("event_type", ""))
|
||||
_span.set_attribute("attacker_ip", _log_data.get("attacker_ip", ""))
|
||||
logger.debug("ingest: record decky=%s event_type=%s", _log_data.get("decky"), _log_data.get("event_type"))
|
||||
await repo.add_log(_log_data)
|
||||
await _extract_bounty(repo, _log_data)
|
||||
except json.JSONDecodeError:
|
||||
logger.error("ingest: failed to decode JSON log line: %s", _line.strip())
|
||||
continue
|
||||
|
||||
Reference in New Issue
Block a user