Replaces LICENSE (GPLv3 -> AGPLv3) and prepends `SPDX-License-Identifier: AGPL-3.0-or-later` to every source file across decnet/, decnet_web/, tests/, scripts/, and tools/. Rationale: closes the GPLv3 ASP loophole so any party operating a modified DECNET as a network service must offer their modified source. Personal copyright (Samuel Paschuan) + inbound=outbound contributions make a future unilateral relicense infeasible. - LICENSE: full AGPL-3.0 text (gnu.org/licenses/agpl-3.0.txt) - COPYRIGHT: project copyright notice - tools/add_spdx_headers.py: idempotent header injector (shebang- and PEP 263-aware) Touches 1565 source files (.py, .ts, .tsx, .js, .jsx, .css, .sh). No behavior change; comments only.
147 lines
5.7 KiB
Python
147 lines
5.7 KiB
Python
# SPDX-License-Identifier: AGPL-3.0-or-later
|
|
"""Realism-driven canary cultivation.
|
|
|
|
Stage 7 of the realism migration: the orchestrator's planner picks a
|
|
canary content_class ~3% of file ticks; the cultivator turns that into
|
|
a CanaryArtifact + persisted CanaryToken row.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime, timezone
|
|
|
|
import pytest
|
|
import pytest_asyncio
|
|
|
|
from decnet.canary.cultivator import cultivate
|
|
from decnet.realism.taxonomy import ContentClass, Plan
|
|
from decnet.web.db.sqlite.repository import SQLiteRepository
|
|
|
|
|
|
@pytest_asyncio.fixture
|
|
async def repo(tmp_path):
|
|
r = SQLiteRepository(db_path=str(tmp_path / "decnet.db"))
|
|
await r.initialize()
|
|
yield r
|
|
await r.engine.dispose()
|
|
|
|
|
|
def _plan(cls: ContentClass, persona: str = "admin") -> Plan:
|
|
return Plan(
|
|
decky_uuid="d1",
|
|
decky_name="alpha",
|
|
persona=persona,
|
|
content_class=cls,
|
|
action="create",
|
|
target_path="",
|
|
mtime=datetime(2026, 4, 27, 11, 30, tzinfo=timezone.utc),
|
|
body_hint=None,
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_cultivate_records_canary_token_row(repo, monkeypatch):
|
|
monkeypatch.setenv("DECNET_CANARY_HTTP_BASE", "https://canary.example.test")
|
|
monkeypatch.setenv("DECNET_CANARY_DNS_ZONE", "canary.example.test")
|
|
|
|
artifact = await cultivate(
|
|
_plan(ContentClass.CANARY_GIT_CONFIG), repo,
|
|
)
|
|
assert artifact.path == "/home/admin/.git/config"
|
|
assert artifact.content
|
|
# Token row landed and the slug round-trips through the slug index.
|
|
rows = await repo.list_canary_tokens(decky_name="alpha")
|
|
assert len(rows) == 1
|
|
assert rows[0]["generator"] == "git_config"
|
|
assert rows[0]["placement_path"] == "/home/admin/.git/config"
|
|
assert rows[0]["callback_token"] in artifact.content.decode("utf-8")
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_cultivate_persists_path_for_each_class(repo, monkeypatch):
|
|
monkeypatch.setenv("DECNET_CANARY_HTTP_BASE", "https://canary.example.test")
|
|
monkeypatch.setenv("DECNET_CANARY_DNS_ZONE", "canary.example.test")
|
|
|
|
classes_and_paths = {
|
|
ContentClass.CANARY_AWS_CREDS: "/home/admin/.aws/credentials",
|
|
ContentClass.CANARY_ENV_FILE: "/home/admin/app/.env",
|
|
ContentClass.CANARY_GIT_CONFIG: "/home/admin/.git/config",
|
|
ContentClass.CANARY_SSH_KEY: "/home/admin/.ssh/id_rsa",
|
|
ContentClass.CANARY_HONEYDOC: "/home/admin/Documents/notes.html",
|
|
ContentClass.CANARY_MYSQL_DUMP: "/var/backups/db_backup.sql",
|
|
}
|
|
for cls, expected in classes_and_paths.items():
|
|
artifact = await cultivate(_plan(cls), repo)
|
|
assert artifact.path == expected, (
|
|
f"{cls.value!r} planted at {artifact.path!r}, want {expected!r}"
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_cultivate_rejects_non_canary_class(repo):
|
|
with pytest.raises(ValueError, match="non-canary"):
|
|
await cultivate(_plan(ContentClass.NOTE), repo)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_cultivate_persona_login_normalisation(repo, monkeypatch):
|
|
monkeypatch.setenv("DECNET_CANARY_HTTP_BASE", "https://canary.example.test")
|
|
monkeypatch.setenv("DECNET_CANARY_DNS_ZONE", "canary.example.test")
|
|
artifact = await cultivate(
|
|
_plan(ContentClass.CANARY_AWS_CREDS, persona="John Smith"), repo,
|
|
)
|
|
# Spaces collapsed to lowercase login, same convention as the
|
|
# realism namer's _home() function.
|
|
assert artifact.path == "/home/johnsmith/.aws/credentials"
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_cultivate_artifact_does_not_leak_decnet_string(repo, monkeypatch):
|
|
"""Stealth contract (per feedback_stealth.md): a planted canary's
|
|
bytes must never carry the DECNET literal — that would tell an
|
|
attacker the file is a honeypot trap."""
|
|
monkeypatch.setenv("DECNET_CANARY_HTTP_BASE", "https://canary.example.test")
|
|
monkeypatch.setenv("DECNET_CANARY_DNS_ZONE", "canary.example.test")
|
|
for cls in (
|
|
ContentClass.CANARY_AWS_CREDS,
|
|
ContentClass.CANARY_GIT_CONFIG,
|
|
ContentClass.CANARY_ENV_FILE,
|
|
ContentClass.CANARY_SSH_KEY,
|
|
):
|
|
artifact = await cultivate(_plan(cls), repo)
|
|
body = artifact.content.decode("utf-8", errors="replace")
|
|
assert "decnet" not in body.lower(), (
|
|
f"{cls.value!r} body leaked 'decnet': "
|
|
f"{body[:120]!r}"
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_cultivate_records_kind_per_generator(repo, monkeypatch):
|
|
"""The token row's ``kind`` reflects the trip surface of the
|
|
underlying generator: HTTP slug callback, DNS resolution, or
|
|
passive bait. The canary worker uses ``kind`` to route incoming
|
|
callbacks; a wrong kind means the trip won't attribute correctly."""
|
|
monkeypatch.setenv("DECNET_CANARY_HTTP_BASE", "https://canary.example.test")
|
|
monkeypatch.setenv("DECNET_CANARY_DNS_ZONE", "canary.example.test")
|
|
cases = [
|
|
(ContentClass.CANARY_AWS_CREDS, "aws_passive"),
|
|
(ContentClass.CANARY_ENV_FILE, "http"),
|
|
(ContentClass.CANARY_GIT_CONFIG, "http"),
|
|
(ContentClass.CANARY_HONEYDOC, "http"),
|
|
(ContentClass.CANARY_HONEYDOC_DOCX, "http"),
|
|
(ContentClass.CANARY_HONEYDOC_PDF, "http"),
|
|
(ContentClass.CANARY_SSH_KEY, "dns"),
|
|
(ContentClass.CANARY_MYSQL_DUMP, "dns"),
|
|
]
|
|
for cls, expected_kind in cases:
|
|
await cultivate(_plan(cls, persona=f"p-{cls.value}"), repo)
|
|
rows = await repo.list_canary_tokens(decky_name="alpha")
|
|
by_gen = {r["generator"]: r["kind"] for r in rows}
|
|
for cls, expected_kind in cases:
|
|
from decnet.canary.cultivator import _CLASS_TO_GENERATOR
|
|
gen = _CLASS_TO_GENERATOR[cls]
|
|
assert by_gen[gen] == expected_kind, (
|
|
f"{cls.value!r} → generator {gen!r} got kind={by_gen[gen]!r}, "
|
|
f"want {expected_kind!r}"
|
|
)
|