Replaces LICENSE (GPLv3 -> AGPLv3) and prepends `SPDX-License-Identifier: AGPL-3.0-or-later` to every source file across decnet/, decnet_web/, tests/, scripts/, and tools/.

Rationale: closes the GPLv3 ASP loophole so that any party operating a modified DECNET as a network service must offer their modified source. Personal copyright (Samuel Paschuan) plus inbound=outbound contributions make a future unilateral relicense infeasible.

- LICENSE: full AGPL-3.0 text (gnu.org/licenses/agpl-3.0.txt)
- COPYRIGHT: project copyright notice
- tools/add_spdx_headers.py: idempotent header injector (shebang- and PEP 263-aware)

Touches 1565 source files (.py, .ts, .tsx, .js, .jsx, .css, .sh). No behavior change; comments only.
86 lines
2.7 KiB
Python
86 lines
2.7 KiB
Python
# SPDX-License-Identifier: AGPL-3.0-or-later
|
|
"""W.3 bus-path drain tests.
|
|
|
|
Exercises ``_drain_behave_queue`` directly without the asyncio worker
|
|
loop. The handler is unit-tested in
|
|
``test_handler_session_ended.py``; this file pins the queue-drain
|
|
plumbing (Event unwrapping, isolation against handler exceptions,
|
|
empty-queue no-op).
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from typing import Any
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
|
|
from decnet.profiler.worker import _drain_behave_queue
|
|
|
|
|
|
async def _make_event(payload: dict[str, Any]):
    """Return a mock standing in for a bus Event.

    The mock carries the fixed topic ``attacker.session.ended`` and the
    supplied *payload*. The coroutine performs no awaiting of its own; it is
    a coroutine only because the tests in this module ``await`` it.
    """
    # Constructor keyword arguments are set as attributes on the mock.
    return MagicMock(topic="attacker.session.ended", payload=payload)
|
|
|
|
|
|
async def test_drain_empty_queue_is_noop() -> None:
    """Draining a queue that holds nothing must not query the repository."""
    empty_queue: asyncio.Queue = asyncio.Queue()
    repo_mock = AsyncMock()

    # The third argument (the publish hook) is passed as None.
    await _drain_behave_queue(repo_mock, empty_queue, None)

    repo_mock.has_observations_for_evidence.assert_not_awaited()
|
|
|
|
|
|
async def test_drain_skips_none_sentinel() -> None:
    """A lone ``None`` item on the queue must not trigger a repository lookup."""
    repo_mock = AsyncMock()
    pending: asyncio.Queue = asyncio.Queue()
    # The queue is unbounded, so the non-blocking put cannot fail here.
    pending.put_nowait(None)

    await _drain_behave_queue(repo_mock, pending, None)

    repo_mock.has_observations_for_evidence.assert_not_awaited()
|
|
|
|
|
|
async def test_drain_passes_event_payload_to_handler(monkeypatch) -> None:
    """The handler must receive the event's ``payload``, not the event itself.

    ``handle_session_ended`` in the worker module is replaced with a
    recording stub; one ``(topic, event)`` tuple is queued and the drain is
    run once.
    """
    seen_payloads: list[dict[str, Any]] = []

    async def _recording_handler(repo, payload, publish):
        seen_payloads.append(payload)
        return 0

    monkeypatch.setattr(
        "decnet.profiler.worker.handle_session_ended", _recording_handler,
    )

    event = await _make_event({"session_id": "abc", "decky_id": "d"})
    pending: asyncio.Queue = asyncio.Queue()
    # The queue is unbounded, so the non-blocking put cannot fail here.
    pending.put_nowait((event.topic, event))

    await _drain_behave_queue(AsyncMock(), pending, None)

    assert seen_payloads == [{"session_id": "abc", "decky_id": "d"}]
|
|
|
|
|
|
async def test_drain_isolates_handler_exception(monkeypatch) -> None:
    """A failure on one event must not prevent the next event being handled.

    The stub handler raises on its first invocation only; after draining two
    queued events it must have been invoked twice.
    """
    invocations: list[Any] = []

    async def _fails_first_time(repo, payload, publish):
        invocations.append(payload)
        if len(invocations) == 1:
            raise RuntimeError("handler exploded")
        return 0

    monkeypatch.setattr(
        "decnet.profiler.worker.handle_session_ended",
        _fails_first_time,
    )

    pending: asyncio.Queue = asyncio.Queue()
    for session_id in ("a", "b"):
        event = await _make_event({"session_id": session_id})
        # The queue is unbounded, so the non-blocking put cannot fail here.
        pending.put_nowait((event.topic, event))

    # The drain itself must not propagate the handler's RuntimeError.
    await _drain_behave_queue(AsyncMock(), pending, None)

    assert len(invocations) == 2
|