Replaces LICENSE (GPLv3 -> AGPLv3) and prepends `SPDX-License-Identifier: AGPL-3.0-or-later` to every source file across decnet/, decnet_web/, tests/, scripts/, and tools/. Rationale: closes the GPLv3 ASP loophole so any party operating a modified DECNET as a network service must offer their modified source. Personal copyright (Samuel Paschuan) + inbound=outbound contributions make a future unilateral relicense infeasible. - LICENSE: full AGPL-3.0 text (gnu.org/licenses/agpl-3.0.txt) - COPYRIGHT: project copyright notice - tools/add_spdx_headers.py: idempotent header injector (shebang- and PEP 263-aware) Touches 1565 source files (.py, .ts, .tsx, .js, .jsx, .css, .sh). No behavior change; comments only.
105 lines
3.5 KiB
Python
105 lines
3.5 KiB
Python
# SPDX-License-Identifier: AGPL-3.0-or-later
|
||
"""Contract tests for the six per-source lifters (E.1.6).
|
||
|
||
Scoped to the contract surface: each lifter is a :class:`TolerantTagger`
|
||
subclass with a non-empty ``HANDLES`` ⊆ :data:`KNOWN_SOURCE_KINDS`,
|
||
unique ``name``, and an empty-list return from ``_tag_impl``. Behavioral
|
||
absence-tolerance assertions from E.2.6 (per-provider null patterns,
|
||
session-without-AttackerBehavior, etc.) are present but xfail-strict
|
||
pending E.3.
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
|
||
import pytest
|
||
|
||
from decnet.ttp.base import KNOWN_SOURCE_KINDS, TaggerEvent, TolerantTagger
|
||
from decnet.ttp.impl.behavioral_lifter import BehavioralLifter
|
||
from decnet.ttp.impl.canary_fingerprint_lifter import CanaryFingerprintLifter
|
||
from decnet.ttp.impl.credential_lifter import CredentialLifter
|
||
from decnet.ttp.impl.email_lifter import EmailLifter
|
||
from decnet.ttp.impl.identity_lifter import IdentityLifter
|
||
from decnet.ttp.impl.intel_lifter import IntelLifter
|
||
from tests.ttp._stub_store import StubRuleStore
|
||
|
||
|
||
def _instantiate(cls: type[TolerantTagger]) -> TolerantTagger:
|
||
"""Every shipped lifter (E.3.9–E.3.13) takes a :class:`RuleStore`."""
|
||
return cls(StubRuleStore()) # type: ignore[call-arg]
|
||
|
||
ALL_LIFTERS = [
|
||
BehavioralLifter,
|
||
IntelLifter,
|
||
EmailLifter,
|
||
CanaryFingerprintLifter,
|
||
IdentityLifter,
|
||
CredentialLifter,
|
||
]
|
||
|
||
|
||
def _ev(source_kind: str) -> TaggerEvent:
|
||
return TaggerEvent(
|
||
source_kind=source_kind,
|
||
source_id="src1",
|
||
attacker_uuid="att1",
|
||
identity_uuid=None,
|
||
session_id=None,
|
||
decky_id=None,
|
||
payload={},
|
||
)
|
||
|
||
|
||
@pytest.mark.parametrize("cls", ALL_LIFTERS)
|
||
def test_lifter_subclasses_tolerant_tagger(cls):
|
||
assert issubclass(cls, TolerantTagger)
|
||
|
||
|
||
@pytest.mark.parametrize("cls", ALL_LIFTERS)
|
||
def test_lifter_handles_is_non_empty_frozenset_subset_of_known(cls):
|
||
assert isinstance(cls.HANDLES, frozenset)
|
||
assert cls.HANDLES, f"{cls.__name__}.HANDLES must not be empty"
|
||
assert cls.HANDLES <= KNOWN_SOURCE_KINDS, (
|
||
f"{cls.__name__}.HANDLES contains kinds not in KNOWN_SOURCE_KINDS"
|
||
)
|
||
|
||
|
||
def test_lifter_names_are_unique_and_non_empty():
|
||
names = [cls.name for cls in ALL_LIFTERS]
|
||
assert all(n for n in names), "every lifter needs a non-empty name"
|
||
assert len(set(names)) == len(names), "lifter names must be unique"
|
||
|
||
|
||
@pytest.mark.parametrize("cls", ALL_LIFTERS)
|
||
def test_lifter_tag_returns_empty_list_for_handled_event(cls):
|
||
lifter = _instantiate(cls)
|
||
kind = next(iter(cls.HANDLES))
|
||
out = asyncio.run(lifter.tag(_ev(kind)))
|
||
assert out == []
|
||
|
||
|
||
@pytest.mark.parametrize("cls", ALL_LIFTERS)
|
||
def test_lifter_instantiable(cls):
|
||
# No abstract methods left — concrete subclass must be constructible.
|
||
_instantiate(cls)
|
||
|
||
|
||
# ── E.2.6 deferred absence-tolerance behavior ──────────────────────
|
||
|
||
|
||
def test_e26_intel_lifter_partial_provider_nulls():
|
||
"""E.3.10: with no actionable per-provider signal (e.g. score set
|
||
but categories absent), IntelLifter returns []. No errors."""
|
||
lifter = IntelLifter(StubRuleStore())
|
||
out = asyncio.run(lifter.tag(_ev("intel")))
|
||
assert out == []
|
||
|
||
|
||
def test_e26_behavioral_lifter_no_attacker_behavior_row():
|
||
"""E.3.9: a session event with no AttackerBehavior fields populated
|
||
must produce zero tags and zero errors. Was xfail-strict before
|
||
BehavioralLifter shipped; now a real assertion."""
|
||
lifter = BehavioralLifter(StubRuleStore())
|
||
out = asyncio.run(lifter.tag(_ev("session")))
|
||
assert out == []
|