Replaces LICENSE (GPLv3 -> AGPLv3) and prepends `SPDX-License-Identifier: AGPL-3.0-or-later` to every source file across decnet/, decnet_web/, tests/, scripts/, and tools/. Rationale: closes the GPLv3 ASP loophole so any party operating a modified DECNET as a network service must offer their modified source. Personal copyright (Samuel Paschuan) + inbound=outbound contributions make a future unilateral relicense infeasible. - LICENSE: full AGPL-3.0 text (gnu.org/licenses/agpl-3.0.txt) - COPYRIGHT: project copyright notice - tools/add_spdx_headers.py: idempotent header injector (shebang- and PEP 263-aware) Touches 1565 source files (.py, .ts, .tsx, .js, .jsx, .css, .sh). No behavior change; comments only.
154 lines
5.3 KiB
Python
154 lines
5.3 KiB
Python
# SPDX-License-Identifier: AGPL-3.0-or-later
|
|
"""Bus wiring for the correlation engine (DEBT-031, worker 3).
|
|
|
|
The correlator is not a standalone worker — ``CorrelationEngine`` is a
|
|
batch class instantiated inside the profiler worker. DEBT-031 wires it
|
|
via an optional ``publish_fn`` constructor arg: on the first sighting of
|
|
an attacker IP, the engine emits ``("observed", payload)`` through the
|
|
hook. The profiler worker carries the bus physically and translates
|
|
those sync hook calls into ``attacker.observed`` publishes.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from datetime import datetime
|
|
|
|
import pytest
|
|
import pytest_asyncio
|
|
|
|
from decnet.bus import topics as _topics
|
|
from decnet.bus.fake import FakeBus
|
|
from decnet.bus.publish import make_thread_safe_publisher
|
|
from decnet.correlation.engine import CorrelationEngine
|
|
from decnet.logging.syslog_formatter import SEVERITY_INFO, format_rfc5424
|
|
|
|
|
|
# Fixed, timezone-aware ISO-8601 timestamp; parsed with datetime.fromisoformat
# wherever this module builds a syslog line, so generated events share one time.
_TS = "2026-04-21T10:00:00+00:00"
|
|
|
|
|
|
def _line(ip: str = "1.2.3.4", decky: str = "decky-01", event_type: str = "connection") -> str:
    """Return a syslog line produced by ``format_rfc5424`` for an ``http`` event.

    ``decky`` is passed as the hostname, ``ip`` as the source IP, and the
    timestamp is always the module-level ``_TS`` value.
    """
    fields = dict(
        service="http",
        hostname=decky,
        event_type=event_type,
        severity=SEVERITY_INFO,
        timestamp=datetime.fromisoformat(_TS),
        src_ip=ip,
    )
    return format_rfc5424(**fields)
|
|
|
|
|
|
@pytest_asyncio.fixture
async def bus() -> FakeBus:
    """Yield a connected ``FakeBus`` and close it after the consuming test."""
    fake_bus = FakeBus()
    await fake_bus.connect()
    yield fake_bus
    # Teardown half of the fixture: executed once the test has finished.
    await fake_bus.close()
|
|
|
|
|
|
# ─── Engine-level publish hook ───────────────────────────────────────────────
|
|
|
|
def test_engine_publishes_once_on_first_sighting() -> None:
    """Three events from one IP must trigger exactly one ``observed`` publish."""
    seen: list[tuple[str, dict]] = []

    def _record(event_type, payload):
        seen.append((event_type, payload))

    engine = CorrelationEngine(publish_fn=_record)

    # Same IP three times: only the first should publish.
    repeated_sightings = (
        {"ip": "9.9.9.9"},
        {"ip": "9.9.9.9", "event_type": "login"},
        {"ip": "9.9.9.9", "decky": "decky-02"},
    )
    for line_kwargs in repeated_sightings:
        engine.ingest(_line(**line_kwargs))

    assert len(seen) == 1
    kind, body = seen[0]
    assert kind == "observed"

    # The payload must describe the *first* event, not the later ones.
    expected_fields = {
        "attacker_ip": "9.9.9.9",
        "decky": "decky-01",
        "service": "http",
        "event_type": "connection",
    }
    assert {key: body[key] for key in expected_fields} == expected_fields
    assert body["first_seen"].startswith("2026-04-21T10:00:00")
|
|
|
|
|
|
def test_engine_publishes_per_unique_ip() -> None:
    """Each distinct IP publishes once, in first-seen order; repeats stay silent."""
    events: list[tuple[str, dict]] = []
    engine = CorrelationEngine(
        publish_fn=lambda event_type, payload: events.append((event_type, payload)),
    )

    # The second "1.1.1.1" is a duplicate and must not publish again.
    for address in ("1.1.1.1", "2.2.2.2", "1.1.1.1", "3.3.3.3"):
        engine.ingest(_line(ip=address))

    published_ips = [payload["attacker_ip"] for _kind, payload in events]
    assert published_ips == ["1.1.1.1", "2.2.2.2", "3.3.3.3"]
|
|
|
|
|
|
def test_engine_swallows_publish_fn_failures() -> None:
    """A publish hook that raises must never break ingestion."""

    def _explode(_event_type, _payload):
        raise RuntimeError("transport exploded")

    engine = CorrelationEngine(publish_fn=_explode)
    outcome = engine.ingest(_line(ip="5.5.5.5"))

    # The event is still returned and counted despite the failing hook.
    assert outcome is not None
    assert engine.events_indexed == 1
|
|
|
|
|
|
def test_engine_runs_unchanged_without_publish_fn() -> None:
    """With no hook supplied (pre-bus behavior), every ingested line is still indexed."""
    engine = CorrelationEngine()
    for _ in range(2):
        engine.ingest(_line(ip="7.7.7.7"))
    assert engine.events_indexed == 2
|
|
|
|
|
|
def test_engine_ignores_lines_without_attacker_ip() -> None:
    """A line built without ``src_ip`` must not reach the publish hook."""
    published: list[tuple[str, dict]] = []

    def _record(event_type, payload):
        published.append((event_type, payload))

    engine = CorrelationEngine(publish_fn=_record)

    # No src_ip is passed here. Per the original note, the parser still yields
    # a LogEvent for such a line, but its attacker_ip is empty.
    boot_fields = dict(
        service="http",
        hostname="decky-01",
        event_type="boot",
        severity=SEVERITY_INFO,
        timestamp=datetime.fromisoformat(_TS),
    )
    engine.ingest(format_rfc5424(**boot_fields))

    assert not published
|
|
|
|
|
|
# ─── End-to-end through the bus ──────────────────────────────────────────────
|
|
|
|
@pytest.mark.asyncio
async def test_correlator_publishes_on_attacker_observed_topic(bus: FakeBus) -> None:
    """An ingested line must surface as an ``observed`` event on ``attacker.observed``."""
    thread_safe_publish = make_thread_safe_publisher(bus, asyncio.get_running_loop())

    def _forward(event_type: str, payload: dict) -> None:
        # Adapt the engine's (event_type, payload) hook to the publisher's
        # (topic, payload, event_type) argument order.
        thread_safe_publish(_topics.attacker(_topics.ATTACKER_OBSERVED), payload, event_type)

    engine = CorrelationEngine(publish_fn=_forward)

    subscription = bus.subscribe("attacker.observed")
    async with subscription:
        engine.ingest(_line(ip="8.8.8.8"))
        # Bounded wait so a missing publish fails the test instead of hanging.
        received = await asyncio.wait_for(subscription.__anext__(), timeout=2.0)

    assert received.topic == "attacker.observed"
    assert received.type == "observed"
    assert received.payload["attacker_ip"] == "8.8.8.8"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_correlator_degrades_cleanly_when_bus_disabled(monkeypatch: pytest.MonkeyPatch) -> None:
    """With ``DECNET_BUS_ENABLED=false`` the connect/publish/close cycle must not raise.

    There are no explicit assertions: the test passes if no call raises.
    Per the original note, the factory returns a NullBus in this mode.
    """
    from decnet.bus.factory import get_bus

    monkeypatch.setenv("DECNET_BUS_ENABLED", "false")
    disabled_bus = get_bus(client_name="profiler")

    await disabled_bus.connect()
    observed_payload = {"attacker_ip": "1.2.3.4"}
    await disabled_bus.publish("attacker.observed", observed_payload, event_type="observed")
    await disabled_bus.close()
|