Files
DECNET/tests/web/test_ingester_bus.py
anti f2b3393669 chore: relicense to AGPL-3.0-or-later and add SPDX headers
Replaces LICENSE (GPLv3 -> AGPLv3) and prepends
`SPDX-License-Identifier: AGPL-3.0-or-later` to every source file
across decnet/, decnet_web/, tests/, scripts/, and tools/.

Rationale: closes the GPLv3 ASP loophole so any party operating a
modified DECNET as a network service must offer their modified
source. Personal copyright (Samuel Paschuan) + inbound=outbound
contributions make a future unilateral relicense infeasible.

- LICENSE: full AGPL-3.0 text (gnu.org/licenses/agpl-3.0.txt)
- COPYRIGHT: project copyright notice
- tools/add_spdx_headers.py: idempotent header injector
  (shebang- and PEP 263-aware)

Touches 1565 source files (.py, .ts, .tsx, .js, .jsx, .css, .sh).
No behavior change; comments only.
2026-05-22 21:04:16 -04:00

152 lines
4.9 KiB
Python

# SPDX-License-Identifier: AGPL-3.0-or-later
"""Bus wiring for the ingester (DEBT-031, worker 6).
The ingester emits one ``system.log`` event per DB-committed batch via
``_publish_batch``. Per-line noise lives on the collector side; the
ingester's job is to signal "N rows landed in the DB up to offset P" so
heartbeat / federation consumers can tail DB progress without polling
the state table.
"""
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator

import pytest
import pytest_asyncio

from decnet.bus.fake import FakeBus
from decnet.web.ingester import _publish_batch
@pytest_asyncio.fixture
async def bus() -> AsyncIterator[FakeBus]:
    """Yield a connected ``FakeBus`` and close it once the test is done.

    The function is an async generator, so it is annotated as
    ``AsyncIterator[FakeBus]`` rather than ``FakeBus``: the requesting test
    receives the yielded bus, and the code after ``yield`` is the teardown.
    """
    b = FakeBus()
    await b.connect()
    yield b
    # Teardown: runs after the requesting test has finished with the bus.
    await b.close()
@pytest.mark.asyncio
async def test_publish_batch_fires_on_nonempty_flush(bus: FakeBus) -> None:
    """A non-empty flush shows up as ``batch_committed`` on ``system.log``."""
    expected_payload = {
        "component": "ingester",
        "flushed": 17,
        "position": 4096,
    }
    subscription = bus.subscribe("system.log")
    async with subscription:
        await _publish_batch(bus, flushed=17, position=4096)
        # Bounded wait: a missing publish fails the test instead of hanging it.
        received = await asyncio.wait_for(subscription.__anext__(), timeout=2.0)
    assert received.topic == "system.log"
    assert received.type == "batch_committed"
    assert received.payload == expected_payload
@pytest.mark.asyncio
async def test_publish_batch_skips_zero_row_flush(bus: FakeBus) -> None:
    """A flush of zero rows must leave the ``system.log`` topic untouched."""
    subscription = bus.subscribe("system.log")
    async with subscription:
        # Nothing was committed, so there is nothing worth signalling.
        await _publish_batch(bus, flushed=0, position=0)
        # Silence is the expected outcome: with no event to deliver, the
        # short bounded wait expires and asyncio.wait_for raises TimeoutError.
        with pytest.raises(asyncio.TimeoutError):
            await asyncio.wait_for(subscription.__anext__(), timeout=0.2)
@pytest.mark.asyncio
async def test_publish_batch_is_noop_when_bus_is_none() -> None:
    """``_publish_batch`` must tolerate ``bus=None`` without raising."""
    # Bus-disabled path: the ingester hands None to _publish_batch. Getting
    # past the await is the whole assertion -- no exception and no hang.
    absent_bus = None
    await _publish_batch(absent_bus, flushed=5, position=123)
@pytest.mark.asyncio
async def test_publish_batch_swallows_bus_failures(monkeypatch) -> None:
    """A bus whose ``publish`` raises must not make ``_publish_batch`` raise."""

    class _ExplodingBus:
        """Stand-in bus whose publish attempts always fail."""

        async def publish(self, *_args, **_kwargs):
            raise RuntimeError("transport exploded")

    # A dead bus must never break the ingestion loop: the test passes as long
    # as this await returns instead of propagating the RuntimeError.
    broken_bus = _ExplodingBus()
    await _publish_batch(broken_bus, flushed=3, position=42)
@pytest.mark.asyncio
async def test_credential_captured_published_on_upsert(bus: FakeBus) -> None:
    """A successful credential ingest is announced on ``credential.captured``.

    The published event must carry the secret hash, secret kind, attacker
    IP, decky, and service, and the repository upsert must be awaited once.
    """
    import hashlib
    from unittest.mock import AsyncMock

    from decnet.web.ingester import _ingest_credential_native

    repo = AsyncMock()
    repo.upsert_credential = AsyncMock(return_value=1)

    source_log = {
        "attacker_ip": "10.0.0.5",
        "decky": "decky-01",
        "service": "ssh",
    }
    credential_fields = {
        "secret_b64": "aHVudGVyMg==",
        "secret_kind": "plaintext",
        "principal": "root",
        "secret_printable": "hunter2",
    }
    expected_payload_items = {
        "secret_kind": "plaintext",
        "attacker_ip": "10.0.0.5",
        "decky": "decky-01",
        "service": "ssh",
        # "aHVudGVyMg==" is base64 for "hunter2"; the hash covers the decoded bytes.
        "secret_sha256": hashlib.sha256(b"hunter2").hexdigest(),
    }

    subscription = bus.subscribe("credential.captured")
    async with subscription:
        await _ingest_credential_native(
            repo,
            log_data=source_log,
            fields=credential_fields,
            bus=bus,
        )
        # Bounded wait: a missing publish fails the test instead of hanging it.
        received = await asyncio.wait_for(subscription.__anext__(), timeout=2.0)

    assert received.topic == "credential.captured"
    assert received.type == "captured"
    for key, wanted in expected_payload_items.items():
        assert received.payload[key] == wanted
    repo.upsert_credential.assert_awaited_once()
@pytest.mark.asyncio
async def test_credential_captured_silent_on_validation_failure(bus: FakeBus) -> None:
    """An invalid-base64 credential is dropped: no event and no upsert."""
    from unittest.mock import AsyncMock

    from decnet.web.ingester import _ingest_credential_native

    repo = AsyncMock()
    repo.upsert_credential = AsyncMock()

    source_log = {"attacker_ip": "10.0.0.5", "decky": "d", "service": "ssh"}
    malformed_fields = {"secret_b64": "not-valid-base64!!!"}

    subscription = bus.subscribe("credential.captured")
    async with subscription:
        await _ingest_credential_native(
            repo,
            log_data=source_log,
            fields=malformed_fields,
            bus=bus,
        )
        # Nothing may be published for a dropped credential, so the short
        # bounded wait is expected to expire with TimeoutError.
        with pytest.raises(asyncio.TimeoutError):
            await asyncio.wait_for(subscription.__anext__(), timeout=0.2)

    repo.upsert_credential.assert_not_awaited()
@pytest.mark.asyncio
async def test_ingester_degrades_cleanly_when_bus_disabled(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """With ``DECNET_BUS_ENABLED=false`` the factory's bus is still usable.

    The ingester-style connect / publish / close sequence must complete
    without raising; completing the sequence is the whole assertion.
    """
    from decnet.bus.factory import get_bus

    monkeypatch.setenv("DECNET_BUS_ENABLED", "false")
    disabled_bus = get_bus(client_name="ingester")

    await disabled_bus.connect()
    await disabled_bus.publish(
        "system.log",
        {"component": "ingester"},
        event_type="batch_committed",
    )
    await disabled_bus.close()