diff --git a/decnet/collector/worker.py b/decnet/collector/worker.py index e57b9853..3a95bde7 100644 --- a/decnet/collector/worker.py +++ b/decnet/collector/worker.py @@ -18,6 +18,7 @@ from datetime import datetime from pathlib import Path from typing import Any, Callable, Optional +from decnet.artifacts.shards import find_shard_with_sid from decnet.bus import topics as _topics from decnet.bus.factory import get_bus from decnet.bus.publish import ( @@ -304,6 +305,25 @@ class _SessionAggregator: entry[key] = value commands.append(entry) + # Resolve the asciinema shard so consumers (notably the BEHAVE-SHELL + # session-ended handler in the profiler worker) don't each have to + # disk-reach independently. Shard fields can be malformed or the + # transcripts dir may not exist yet — find_shard_with_sid returns + # None in those cases and we publish ``shard_path: None`` so the + # consumer skips honestly. Additive field; existing TTP consumers + # ignore it. + shard_path: str | None = None + if sid and decky and service: + try: + resolved = find_shard_with_sid(decky, service, sid) + except (ValueError, OSError, PermissionError) as exc: + logger.debug( + "collector: shard resolve failed for sid=%s: %s", sid, exc, + ) + resolved = None + if resolved is not None: + shard_path = str(resolved) + payload: dict[str, Any] = { "session_id": sid or None, "attacker_uuid": None, # consumer resolves via repo @@ -313,6 +333,7 @@ class _SessionAggregator: "ended_at": ended_at.isoformat(), "duration_s": duration_s, "commands": commands, + "shard_path": shard_path, } topic = _topics.attacker(_topics.ATTACKER_SESSION_ENDED) try: diff --git a/tests/collector/test_session_aggregator.py b/tests/collector/test_session_aggregator.py index 64c754b5..b32f10ad 100644 --- a/tests/collector/test_session_aggregator.py +++ b/tests/collector/test_session_aggregator.py @@ -254,3 +254,54 @@ def test_publish_failure_is_swallowed() -> None: agg.add_event(_cmd("2026-05-02T06:22:50", "whoami")) # Should NOT raise. agg.add_event(_session_recorded("2026-05-02T06:23:00", sid="s1")) + + +# ── shard_path enrichment (W.1) ───────────────────────────────────── + + +def test_session_ended_payload_carries_shard_path_when_shard_exists( + aggregator: _SessionAggregator, + captured_publishes: list[tuple[str, dict[str, Any], str]], + tmp_path, + monkeypatch, +) -> None: + """When find_shard_with_sid resolves, the payload carries the path.""" + import json + from decnet.artifacts import shards + + sid = "11111111-2222-3333-4444-555555555555" + shard_dir = tmp_path / "omega-decky" / "ssh" / "transcripts" + shard_dir.mkdir(parents=True) + shard = shard_dir / "sessions-2026-05-02.jsonl" + shard.write_text(json.dumps({"sid": sid, "hdr": {}}) + "\n") + + monkeypatch.setattr(shards, "ARTIFACTS_ROOT", tmp_path) + shards._INDEX_CACHE.clear() + + aggregator.add_event(_cmd("2026-05-02T06:22:48", "whoami")) + aggregator.add_event(_session_recorded( + "2026-05-02T06:23:00", sid=sid, duration_s=120.0, + )) + + payload = captured_publishes[0][1] + assert payload["shard_path"] == str(shard.resolve()) + + +def test_session_ended_payload_shard_path_none_when_unresolvable( + aggregator: _SessionAggregator, + captured_publishes: list[tuple[str, dict[str, Any], str]], + tmp_path, + monkeypatch, +) -> None: + """No shard on disk → shard_path is None (consumer skips honestly).""" + from decnet.artifacts import shards + monkeypatch.setattr(shards, "ARTIFACTS_ROOT", tmp_path) + shards._INDEX_CACHE.clear() + + aggregator.add_event(_cmd("2026-05-02T06:22:48", "whoami")) + aggregator.add_event(_session_recorded( + "2026-05-02T06:23:00", sid="ffffffff-eeee-dddd-cccc-bbbbbbbbbbbb", + )) + + payload = captured_publishes[0][1] + assert payload["shard_path"] is None