feat(profiler): wire BEHAVE-SHELL extraction onto attacker.session.ended

The profiler worker now consumes attacker.session.ended on the bus
AND walks unprofiled session_recorded log rows on every tick. Both
paths converge on a single handler that:

1. Validates required payload fields (session_id, decky_id, service,
   attacker_ip, shard_path).
2. Builds evidence_ref shard:{decky}/{service}/{shard_basename}#{sid}
   and skips when has_observations_for_evidence is True (idempotent
   re-runs).
3. Resolves attacker_uuid via get_attacker_uuid_by_ip; defers if the
   profiler tick hasn't materialised the row yet.
4. Reads the asciinema shard, slices events for the sid, calls
   extract_session, persists each Observation via upsert_observation
   (per-row; batch transaction filed as follow-up), then publishes
   each on the bus best-effort (fire-and-forget per DEBT-029 §6).

Architecture:
* Handler lives in decnet/profiler/behave_shell/_handler.py — pure
  function, unit-tested in isolation.
* worker.py adds _behave_pump (queue feed), _drain_behave_queue
  (per-tick drain), _behave_poll_tick (cursor scan over
  session_recorded logs), and _payload_from_log_row (Log → bus-shape
  payload projection).
* Poll cursor uses a separate state key
  (attacker_worker_session_cursor) so it is never conflated with the
  correlation tick's cursor.
* has_observations_for_evidence promoted to BaseRepository abstract.

22 new tests across handler / drain / poll layers covering happy
path, all skip paths, isolation against handler exceptions,
idempotency on re-run, and cursor key separation. TTP worker bus
tests still green — payload field is purely additive.

Closes BEHAVE-INTEGRATION.md Phase 4.
This commit is contained in:
2026-05-08 18:57:45 -04:00
parent 834aa613b1
commit 5ff89eefe7
6 changed files with 828 additions and 0 deletions

View File

@@ -0,0 +1,219 @@
"""``attacker.session.ended`` handler — Phase 4 wiring.
Pure handler module: takes a payload (from bus or poll fallback),
reads the asciinema shard from disk, runs ``extract_session()``,
upserts observations, and publishes them on the bus best-effort.
Lives outside ``worker.py`` so unit tests can exercise it without
spinning up the asyncio worker loop.
Trigger isolation: every public entry point is wrapped in a single
try/except in the worker; this module is allowed to raise. The worker
logs and continues with the next event.
"""
from __future__ import annotations
import collections
import json
from pathlib import Path
from typing import Any, Callable, Iterable, Optional
from decnet_behave_core.spec.envelope import Observation
from decnet_behave_shell.spec.event_adapter import event_topic_for, to_event_payload
from decnet.logging import get_logger
from decnet.profiler.behave_shell import extract_session
from decnet.profiler.behave_shell._parse import AsciinemaEvent, parse_shard_line
from decnet.web.db.repository import BaseRepository
log = get_logger("profiler.behave_handler")
# Signature of the synchronous bus-publish callable handed to the handler.
# ``_publish_observation`` invokes it as ``publish(topic, payload, primitive)``
# and ignores its return value.
PublishFn = Callable[[str, dict[str, Any], str], None]
"""Bus-publish callable (sync). The thread-safe publisher returned by
``decnet.bus.publish.make_thread_safe_publisher`` matches this shape;
``None`` is also accepted (no-op publish path)."""
# Payload keys that must be present *and truthy* before a session is
# processed. ``shard_path`` is required as well, but ``handle_session_ended``
# checks it separately so that a missing shard gets its own log message.
_REQUIRED_FIELDS: tuple[str, ...] = (
    "session_id", "decky_id", "service", "attacker_ip",
)
def _build_evidence_ref(decky: str, service: str, shard_path: str, sid: str) -> str:
"""Canonical ``shard:{decky}/{service}/{shard_basename}#{sid}`` pointer.
Stays a *pointer*, never the evidence itself. Worker uses it as
the idempotency key against the ``observations`` table.
"""
basename = Path(shard_path).name
return f"shard:{decky}/{service}/{basename}#{sid}"
def _events_for_sid(shard: Path, sid: str) -> list[AsciinemaEvent]:
"""Read ``shard``, return parsed events whose ``sid`` matches.
Mirrors the loader pattern in
``tests/profiler/behave_shell/test_calibration_grid.py``: skip
headers / non-matching sids / unparseable lines silently.
"""
events: list[AsciinemaEvent] = []
with shard.open() as f:
for line in f:
try:
rec = json.loads(line)
except (ValueError, json.JSONDecodeError):
continue
if not isinstance(rec, dict):
continue
if rec.get("sid") != sid or "hdr" in rec:
continue
ev = parse_shard_line(line)
if ev is not None:
events.append(ev)
return events
def _flatten_observation(obs: Observation, attacker_uuid: str) -> dict[str, Any]:
"""Project a BEHAVE Observation onto the ObservationRow column shape.
Mirrors the storage schema in
``decnet/web/db/models/observations.py`` — flattens
``window.{start,end}_ts`` and stamps the DECNET-side
``attacker_uuid`` denorm. ``id`` / ``ts`` / ``v`` / ``identity_ref``
/ ``evidence_ref`` ride through unchanged.
"""
return {
"id": obs.id,
"identity_ref": obs.identity_ref,
"primitive": obs.primitive,
"value": obs.value,
"confidence": obs.confidence,
"window_start_ts": obs.window.start_ts,
"window_end_ts": obs.window.end_ts,
"source": obs.source,
"evidence_ref": obs.evidence_ref,
"envelope_v": obs.v,
"ts": obs.ts,
"attacker_uuid": attacker_uuid,
}
def _publish_observation(publish: Optional[PublishFn], obs: Observation) -> None:
"""Best-effort publish; never raise. Re-merges id/ts/v into payload
per BEHAVE-INTEGRATION.md §339-366 deviation note."""
if publish is None:
return
payload = to_event_payload(obs) | {
"id": obs.id,
"ts": obs.ts,
"v": obs.v,
}
try:
publish(event_topic_for(obs.primitive), payload, obs.primitive)
except Exception as exc: # noqa: BLE001
log.debug(
"behave_handler: publish failed for primitive=%s: %s",
obs.primitive, exc,
)
async def handle_session_ended(
    repo: BaseRepository,
    payload: dict[str, Any],
    publish: Optional[PublishFn] = None,
) -> int:
    """Run one ``attacker.session.ended`` payload through the full pipeline.

    Stages, in order: validate fields, idempotency check, resolve the
    attacker, read the shard, extract, persist, publish. Persisting comes
    before publishing because the DB is the source of truth (see
    BEHAVE-INTEGRATION.md §"Persistence"); publishing is best-effort.

    Args:
        repo: Repository used for the idempotency check, attacker lookup
            and observation upserts.
        payload: Event payload. Needs truthy ``session_id``, ``decky_id``,
            ``service``, ``attacker_ip`` and ``shard_path``.
        publish: Optional bus-publish callable; ``None`` disables publishing.

    Returns:
        Number of observations persisted. Zero on every skip path:
        missing fields, missing shard, idempotency hit, attacker not yet
        resolved, sid not in shard, extractor produced nothing.

    Raises:
        Anything raised by the repository, the shard read or the
        extractor propagates; the caller is expected to isolate it.
    """
    # 1. Required-field guard (shard_path is checked on its own below).
    absent = [name for name in _REQUIRED_FIELDS if not payload.get(name)]
    if absent:
        log.debug(
            "behave_handler: skipping session.ended (missing fields=%s)",
            absent,
        )
        return 0

    shard_path = payload.get("shard_path")
    if not shard_path:
        log.debug("behave_handler: skipping session.ended (no shard_path)")
        return 0

    sid, decky, service, attacker_ip = (
        str(payload[key])
        for key in ("session_id", "decky_id", "service", "attacker_ip")
    )

    # 2. evidence_ref doubles as the idempotency key.
    evidence_ref = _build_evidence_ref(decky, service, str(shard_path), sid)
    already_done = await repo.has_observations_for_evidence(evidence_ref)
    if already_done:
        log.debug(
            "behave_handler: already profiled evidence_ref=%s", evidence_ref,
        )
        return 0

    # 3. Resolve attacker_uuid. Skip until the profiler tick has
    #    materialised the Attacker row — same posture as TTP's
    #    _resolve_attacker_uuid.
    # NOTE(review): "defer" and "skip" both return 0, so a caller cannot
    # tell them apart — confirm something re-delivers deferred sessions.
    attacker_uuid = await repo.get_attacker_uuid_by_ip(attacker_ip)
    if not attacker_uuid:
        log.info(
            "behave_handler: no Attacker row for ip=%s yet; deferring",
            attacker_ip,
        )
        return 0

    # 4. Locate the shard on disk and slice out this session's events.
    shard_file = Path(shard_path)
    if not shard_file.is_file():
        log.info(
            "behave_handler: shard not on disk yet path=%s sid=%s; deferring",
            shard_path, sid,
        )
        return 0

    session_events = _events_for_sid(shard_file, sid)
    if not session_events:
        log.info(
            "behave_handler: sid=%s not present in shard=%s; skipping",
            sid, shard_path,
        )
        return 0

    # 5. Extract; materialised because the result is walked twice below.
    observations: list[Observation] = list(
        extract_session(session_events, sid=sid, evidence_ref=evidence_ref)
    )
    if not observations:
        log.info(
            "behave_handler: extractor produced zero observations sid=%s",
            sid,
        )
        return 0

    # 6. Persist, one upsert per row. A failure on any row propagates and
    #    therefore skips publishing entirely — DB is the source of truth.
    # NOTE(review): rows written before such a failure stay in the table,
    # so the step-2 check will treat the session as done on a re-run —
    # presumably what the planned batch transaction addresses; confirm.
    for observation in observations:
        await repo.upsert_observation(
            _flatten_observation(observation, attacker_uuid)
        )
    persisted = len(observations)

    # 7. Publish — fire-and-forget, never raises out.
    for observation in observations:
        _publish_observation(publish, observation)

    log.info(
        "behave_handler: persisted=%d primitives sid=%s attacker_ip=%s",
        persisted, sid, attacker_ip,
    )
    return persisted
def primitive_counts(observations: Iterable[Observation]) -> dict[str, int]:
    """Debug helper: tally observations by their ``primitive`` name.

    Returns a plain ``dict`` mapping primitive name to occurrence count,
    with keys in first-seen order; empty when ``observations`` is empty.
    """
    tally = collections.Counter(item.primitive for item in observations)
    return dict(tally)

View File

@@ -20,7 +20,9 @@ from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Callable
from decnet.artifacts.shards import find_shard_with_sid
from decnet.bus import topics as _topics
from decnet.bus.base import BaseBus, Event
from decnet.bus.factory import get_bus
from decnet.bus.publish import (
make_thread_safe_publisher,
@@ -33,6 +35,7 @@ from decnet.asn import enrich_ip as enrich_ip_asn
from decnet.geoip import enrich_ip
from decnet.geoip.ptr import resolve_ptr_record
from decnet.logging import get_logger
from decnet.profiler.behave_shell._handler import handle_session_ended
from decnet.profiler.behavioral import build_behavior_record
from decnet.telemetry import traced as _traced, get_tracer as _get_tracer
from decnet.web.db.repository import BaseRepository
@@ -41,6 +44,14 @@ logger = get_logger("attacker_worker")
# Row limit per fetch. The BEHAVE-SHELL poll fallback passes it as
# ``limit`` to ``repo.get_logs_after_id``; it may be used elsewhere too.
_BATCH_SIZE = 500
# State key for the worker's main log cursor (presumably the correlation
# tick's log-id cursor referred to below — verify against its reader).
_STATE_KEY = "attacker_worker_cursor"
# Separate cursor for the BEHAVE-SHELL poll fallback so it is never
# conflated with the correlation tick's log-id cursor (project rule:
# "Poll fallback's Log cursor — use a separate state key").
_BEHAVE_POLL_STATE_KEY = "attacker_worker_session_cursor"
# Pattern the bus subscription matches. Single-topic for BEHAVE-SHELL
# wiring; matches what the collector publishes from
# ``_SessionAggregator._emit_session``.
_BEHAVE_TOPIC = _topics.attacker(_topics.ATTACKER_SESSION_ENDED)
# Event types that indicate active command/query execution — the
# shell-family subset of INTERACTION_EVENT_TYPES in
@@ -126,6 +137,17 @@ async def attacker_profile_worker(repo: BaseRepository, *, interval: int = 30) -
control_task = asyncio.create_task(
run_control_listener(bus, "profiler", shutdown),
)
# BEHAVE-SHELL session-ended handler — bus subscription pump (when
# bus is available) feeds an asyncio.Queue; the tick body drains
# the queue per iteration. Same shape as decnet/ttp/worker.py.
behave_queue: "asyncio.Queue[tuple[str, Event] | None]" = asyncio.Queue()
behave_pump_task: asyncio.Task[None] | None = None
if bus is not None:
behave_pump_task = asyncio.create_task(
_behave_pump(bus, behave_queue),
)
try:
while not shutdown.is_set():
try:
@@ -138,11 +160,26 @@ async def attacker_profile_worker(repo: BaseRepository, *, interval: int = 30) -
await _incremental_update(repo, state)
except Exception as exc:
logger.error("attacker worker: update failed: %s", exc)
# BEHAVE-SHELL drain (bus path).
await _drain_behave_queue(repo, behave_queue, raw_publish)
# BEHAVE-SHELL poll fallback. Always runs — when bus is up
# this catches anything the subscription missed during a
# transient reconnect; when bus is down it's the only path.
try:
await _behave_poll_tick(repo, raw_publish)
except Exception as exc: # noqa: BLE001
logger.error(
"attacker worker: behave poll tick failed: %s", exc,
)
finally:
for t in (heartbeat_task, control_task):
t.cancel()
with contextlib.suppress(Exception, asyncio.CancelledError):
await t
if behave_pump_task is not None:
behave_pump_task.cancel()
with contextlib.suppress(Exception, asyncio.CancelledError):
await behave_pump_task
if bus is not None:
with contextlib.suppress(Exception):
await bus.close()
@@ -440,3 +477,132 @@ def _extract_smtp_domains(events: list[LogEvent]) -> set[str]:
if domain:
domains.add(domain)
return domains
# ── BEHAVE-SHELL session-ended wiring (Phase 4) ─────────────────────────────
async def _behave_pump(
    bus: BaseBus,
    queue: "asyncio.Queue[tuple[str, Event] | None]",
) -> None:
    """Copy each event received on ``_BEHAVE_TOPIC`` into ``queue``.

    Every event is enqueued as a ``(topic, event)`` tuple for the
    per-tick drain to pick up.

    Tolerance contract mirrors :func:`decnet.ttp.worker._pump`. If the
    subscription fails for any reason other than cancellation, a warning
    is logged and this coroutine simply returns, so the worker loop is
    never crashed from here. The always-on poll path then covers whatever
    the subscription misses. Cancellation is re-raised so task shutdown
    works normally.
    """
    try:
        subscription = bus.subscribe(_BEHAVE_TOPIC)
        async with subscription:
            async for incoming in subscription:
                entry = (incoming.topic, incoming)
                await queue.put(entry)
    except asyncio.CancelledError:
        # Must come before the broad handler so cancel() is honoured.
        raise
    except Exception as err:  # noqa: BLE001
        logger.warning(
            "attacker worker: behave subscriber for %s died (%s); "
            "falling back to poll", _BEHAVE_TOPIC, err,
        )
async def _drain_behave_queue(
repo: BaseRepository,
queue: "asyncio.Queue[tuple[str, Event] | None]",
publish: Callable[[str, dict[str, Any], str], None] | None,
) -> None:
"""Drain queued ``attacker.session.ended`` events through the
handler. Each handler invocation is isolated — exceptions log and
do not block the next event."""
while not queue.empty():
item = queue.get_nowait()
if item is None:
continue
_topic, event = item
try:
await handle_session_ended(repo, event.payload, publish)
except Exception as exc: # noqa: BLE001
logger.error(
"attacker worker: behave handler raised on bus path: %s", exc,
)
async def _behave_poll_tick(
    repo: BaseRepository,
    publish: Callable[[str, dict[str, Any], str], None] | None,
) -> None:
    """Poll fallback: scan ``Log`` rows after the saved cursor for
    ``event_type='session_recorded'`` and run the handler on each.

    The handler's own idempotency check makes re-processing a session
    already handled via the bus a no-op.

    The cursor is stored under :data:`_BEHAVE_POLL_STATE_KEY`, separate
    from the correlation tick's cursor so the two are never conflated. It
    advances past every row in the fetched batch, whether or not that row
    was a session row or was handled successfully.

    Both the row-to-payload projection and the handler call run inside the
    per-row ``try``. An exception from either is logged and the scan
    continues. Otherwise one malformed row would abort the tick before the
    cursor is saved, and the same batch would fail again on every tick.

    Args:
        repo: Repository providing state, log rows and handler storage.
        publish: Optional bus-publish callable forwarded to the handler.
    """
    cursor_state = await repo.get_state(_BEHAVE_POLL_STATE_KEY) or {}
    last_id = int(cursor_state.get("last_log_id", 0))
    rows = await repo.get_logs_after_id(last_id, limit=_BATCH_SIZE)
    if not rows:
        return
    new_cursor = last_id
    for row in rows:
        new_cursor = max(new_cursor, int(row.get("id", 0)))
        if row.get("event_type") != "session_recorded":
            continue
        try:
            payload = _payload_from_log_row(row)
            if payload is None:
                continue
            await handle_session_ended(repo, payload, publish)
        except Exception as exc:  # noqa: BLE001
            logger.error(
                "attacker worker: behave handler raised on poll path: %s", exc,
            )
    # NOTE(review): the handler returns 0 both for "skipped" and for
    # "deferred" (attacker row / shard not there yet), and the cursor
    # moves past the row either way — confirm deferred sessions are
    # retried through some other route.
    if new_cursor > last_id:
        await repo.set_state(
            _BEHAVE_POLL_STATE_KEY, {"last_log_id": new_cursor},
        )
def _payload_from_log_row(row: dict[str, Any]) -> dict[str, Any] | None:
"""Project a ``session_recorded`` Log row into the same shape the
collector publishes on the bus.
Returns ``None`` when required fields are missing — the handler
has its own guard, but pre-filtering here avoids the round-trip to
the handler's logger for malformed rows.
"""
fields_raw = row.get("fields") or "{}"
if isinstance(fields_raw, dict):
fields = fields_raw
else:
try:
fields = json.loads(fields_raw)
except (ValueError, TypeError):
return None
sid = fields.get("sid")
decky = row.get("decky")
service = fields.get("service") or row.get("service")
attacker_ip = row.get("attacker_ip")
if not (sid and decky and service and attacker_ip):
return None
# Resolve shard_path locally — the Log row may not carry one
# (sessrec.c does not yet emit fields.shard_path).
shard_path: str | None = None
try:
resolved = find_shard_with_sid(str(decky), str(service), str(sid))
except (ValueError, OSError, PermissionError):
resolved = None
if resolved is not None:
shard_path = str(resolved)
return {
"session_id": str(sid),
"attacker_uuid": None,
"attacker_ip": str(attacker_ip),
"decky_id": str(decky),
"service": str(service),
"ended_at": row.get("timestamp"),
"duration_s": fields.get("duration_s"),
"commands": [],
"shard_path": shard_path,
}

View File

@@ -341,6 +341,16 @@ class BaseRepository(ABC):
ordered by ``ts`` ASC. Empty list when none."""
raise NotImplementedError
@abstractmethod
async def has_observations_for_evidence(self, evidence_ref: str) -> bool:
    """Return True iff any observation row carries this ``evidence_ref``.

    The worker uses this as the "have we already profiled this session?"
    check before kicking the BEHAVE-SHELL extractor. Because the
    BEHAVE-SHELL reference has the form
    ``shard:{decky}/{service}/{shard_basename}#{sid}``, a hit is
    equivalent to "this ``(decky, service, sid)`` in that shard is
    already in the table".

    Args:
        evidence_ref: Evidence pointer string to look up.

    Raises:
        NotImplementedError: always, in this abstract base; concrete
            repositories must override.
    """
    raise NotImplementedError
async def upsert_observed_attachment(
self,
*,