feat(canary): package scaffolding (base/factory/paths/storage) + tests

Mirrors the decnet.intel layout (base + factory + lazy concrete
imports). Defines:

- CanaryArtifact / CanaryContext dataclasses + the generator and
  instrumenter ABCs they share
- factory dispatch for generators (git_config/env_file/ssh_key/
  aws_creds/honeydoc) and instrumenters (docx/xlsx/pdf/html/image/
  plain/passthrough), plus pick_instrumenter_for_mime() for MIME-driven
  dispatch on operator uploads
- persona-aware default placement paths (Linux vs. Windows-shaped)
  and absolute-path validation that the API will use to validate
  operator-supplied placement_path values
- on-disk blob store: sha256-keyed two-level fan-out, idempotent
  writes, refcount-aware unlink (the DB row is the source of truth)

Also adds tests for the prior commits' work (bus topics, models,
repo CRUD) under tests/canary/. 79 tests, all passing.
This commit is contained in:
2026-04-27 12:56:01 -04:00
parent 6a0d140e91
commit 8f19adecfe
12 changed files with 989 additions and 0 deletions

0
tests/canary/__init__.py Normal file
View File

View File

@@ -0,0 +1,87 @@
"""Coverage for the generator/instrumenter factory + MIME dispatch.
The concrete generators and instrumenters land in subsequent commits;
this file only tests the dispatch surface — it must reject unknown
names with ``ValueError`` and pick the right instrumenter for known
MIME types (with passthrough as the fallback for binary blobs we
can't safely mutate).
"""
from __future__ import annotations
import pytest
from decnet.canary.factory import (
KNOWN_GENERATORS,
KNOWN_INSTRUMENTERS,
pick_instrumenter_for_mime,
)
@pytest.mark.parametrize(
    ("mime", "expected"),
    [
        ("application/pdf", "pdf"),
        ("application/PDF", "pdf"),  # case-insensitive
        (
            "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            "docx",
        ),
        (
            "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            "xlsx",
        ),
        ("text/html", "html"),
        ("application/xhtml+xml", "html"),
        ("text/plain", "plain"),
        ("text/x-yaml", "plain"),
        ("application/json", "plain"),
        ("application/yaml", "plain"),
        ("application/toml", "plain"),
        ("image/png", "image"),
        ("image/jpeg", "image"),
        ("image/gif", "image"),
    ],
)
def test_mime_dispatch_known(mime: str, expected: str) -> None:
    """Each listed MIME type must dispatch to the named instrumenter."""
    chosen = pick_instrumenter_for_mime(mime)
    assert chosen == expected
@pytest.mark.parametrize(
    "mime",
    [
        "",
        "application/octet-stream",
        "application/x-tar",
        "application/zip",  # bare zip — DOCX/XLSX dispatch by alias, not raw zip
        "video/mp4",
        "audio/mpeg",
    ],
)
def test_mime_dispatch_falls_back_to_passthrough(mime: str) -> None:
    """Every MIME type listed here (including the empty string) yields ``passthrough``."""
    chosen = pick_instrumenter_for_mime(mime)
    assert chosen == "passthrough"
def test_known_lists_are_stable() -> None:
    """Pin the exact contents and order of both dispatch-name tuples."""
    # Any addition to or removal from the dispatch tables has to be
    # mirrored here, so the change shows up as a test failure first.
    expected_generators = (
        "git_config",
        "env_file",
        "ssh_key",
        "aws_creds",
        "honeydoc",
    )
    expected_instrumenters = (
        "docx",
        "xlsx",
        "pdf",
        "html",
        "image",
        "plain",
        "passthrough",
    )
    assert KNOWN_GENERATORS == expected_generators
    assert KNOWN_INSTRUMENTERS == expected_instrumenters
def test_unknown_generator_raises() -> None:
    """``get_generator`` raises ``ValueError`` for a name it does not know."""
    from decnet.canary.factory import get_generator

    unknown_name = "bogus"
    with pytest.raises(ValueError, match="Unknown canary generator"):
        get_generator(unknown_name)
def test_unknown_instrumenter_raises() -> None:
    """``get_instrumenter`` raises ``ValueError`` for a name it does not know."""
    from decnet.canary.factory import get_instrumenter

    unknown_name = "bogus"
    with pytest.raises(ValueError, match="Unknown canary instrumenter"):
        get_instrumenter(unknown_name)
def test_base_artifact_dataclass_defaults() -> None:
    """An artifact built from only ``path`` and ``content`` carries the expected defaults."""
    from decnet.canary import CanaryArtifact

    artifact = CanaryArtifact(path="/x", content=b"y")

    assert artifact.mode == 0o600
    assert artifact.mtime_offset == 0
    assert artifact.notes == []
    # One assertion per field so a failure names the offending attribute.
    assert artifact.generator is None
    assert artifact.instrumenter is None

View File

@@ -0,0 +1,85 @@
"""Smoke coverage for the Pydantic request/response shapes + helpers.
The tables themselves are exercised end-to-end in
:mod:`tests.canary.test_repository`; this module only covers the
helpers and request validation that don't go through the DB —
``CanaryTrigger.headers()`` JSON decoding, the
``CanaryTokenCreateRequest`` body shape, and the dump-roundtrip on
the response models.
"""
from __future__ import annotations
import pytest
from decnet.web.db.models import (
CanaryBlobResponse,
CanaryTokenCreateRequest,
CanaryTokenResponse,
CanaryTrigger,
CanaryTriggerResponse,
)
def test_create_request_minimal() -> None:
    """A request with only these four fields leaves the optional ones as ``None``."""
    minimal_fields = {
        "decky_name": "web1",
        "kind": "http",
        "placement_path": "/home/admin/.env",
        "generator": "env_file",
    }
    req = CanaryTokenCreateRequest(**minimal_fields)

    assert req.blob_uuid is None
    assert req.persona_path_hint is None
def test_create_request_kind_is_constrained() -> None:
    """Constructing a request with ``kind="bogus"`` raises ``ValueError``."""
    with pytest.raises(ValueError):
        CanaryTokenCreateRequest(
            decky_name="web1",
            kind="bogus",  # type: ignore[arg-type]
            placement_path="/x",
            generator="aws_creds",
        )
def test_trigger_headers_decode_valid_json() -> None:
    """``headers()`` returns the dict encoded in a JSON-object ``raw_headers`` string."""
    trigger = CanaryTrigger(
        token_uuid="t",
        src_ip="1.2.3.4",
        raw_headers='{"user-agent":"curl"}',
    )
    decoded = trigger.headers()
    assert decoded == {"user-agent": "curl"}
@pytest.mark.parametrize(
    "raw",
    ["", "not json", "[1,2,3]", "null"],
)
def test_trigger_headers_falls_back_to_empty(raw: str) -> None:
    """Empty, unparsable, or non-object ``raw_headers`` values yield ``{}``."""
    trigger = CanaryTrigger(token_uuid="t", src_ip="1.2.3.4", raw_headers=raw)
    decoded = trigger.headers()
    assert decoded == {}
def test_response_models_round_trip() -> None:
    """Construct each response model from canonical values and spot-check one field.

    NOTE(review): despite the name, no dump / re-validate step is
    performed here — only construction is exercised. Confirm whether an
    explicit round-trip assertion was intended.
    """
    # Canonical shapes — proves the field set + types match what the
    # router will hand back. Strings everywhere because the DB layer
    # uses str UUIDs (project convention).
    blob_resp = CanaryBlobResponse(
        uuid="b1",
        sha256="0" * 64,
        filename="x.docx",
        content_type="application/octet-stream",
        size_bytes=1,
        uploaded_by="u1",
        uploaded_at="2026-04-27T00:00:00Z",  # type: ignore[arg-type]
        token_count=2,
    )
    assert blob_resp.token_count == 2

    token_resp = CanaryTokenResponse(
        uuid="t1",
        kind="http",
        decky_name="web1",
        blob_uuid=None,
        instrumenter=None,
        generator="aws_creds",
        placement_path="/a",
        callback_token="s",
        placed_at="2026-04-27T00:00:00Z",  # type: ignore[arg-type]
        last_triggered_at=None,
        trigger_count=0,
        created_by="u1",
        state="planted",
        last_error=None,
    )
    assert token_resp.kind == "http"

    trigger_resp = CanaryTriggerResponse(
        uuid="x",
        token_uuid="t1",
        occurred_at="2026-04-27T00:00:00Z",  # type: ignore[arg-type]
        src_ip="1.2.3.4",
        user_agent=None,
        request_path=None,
        dns_qname=None,
        headers={},
        attacker_id=None,
    )
    assert trigger_resp.src_ip == "1.2.3.4"

View File

@@ -0,0 +1,66 @@
"""Coverage for the persona-aware path resolver + placement validator."""
from __future__ import annotations
import pytest
from decnet.canary.paths import (
DEFAULT_LINUX_USER,
DEFAULT_WINDOWS_USER,
default_path_for,
default_user,
normalize_placement,
)
def test_default_user_dispatch() -> None:
    """``default_user`` returns the per-persona default account name."""
    cases = (
        ("linux", DEFAULT_LINUX_USER),
        ("windows", DEFAULT_WINDOWS_USER),
        # Unknown personas fall through to Linux — better to plant than fail.
        ("aix", DEFAULT_LINUX_USER),
    )
    for persona, wanted in cases:
        assert default_user(persona) == wanted
@pytest.mark.parametrize(
    "generator, persona, expected",
    [
        ("aws_creds", "linux", "/home/admin/.aws/credentials"),
        ("aws_creds", "windows", "/home/Administrator/.aws/credentials"),
        ("env_file", "linux", "/home/admin/.env"),
        ("env_file", "windows", "/home/Administrator/Desktop/prod.env"),
        ("git_config", "linux", "/home/admin/.git/config"),
        ("ssh_key", "linux", "/home/admin/.ssh/id_rsa"),
        ("honeydoc", "linux", "/home/admin/Documents/quarterly_report.docx"),
    ],
)
def test_default_path_for_known_generators(
    generator: str, persona: str, expected: str,
) -> None:
    """``default_path_for`` returns exactly the listed path for each generator/persona pair.

    The third parameter is the full expected path, compared with ``==``;
    it was previously named ``expected_substr``, which wrongly suggested
    a substring match.
    """
    assert default_path_for(generator, persona) == expected
def test_default_path_for_unknown_generator_falls_through() -> None:
    """An unrecognised generator name resolves to a ``/tmp`` drop path."""
    # Unknown generator — defensive /tmp drop. The API rejects unknowns
    # upstream, but the resolver shouldn't crash if one slips through.
    resolved = default_path_for("bogus")
    assert resolved == "/tmp/bogus.canary"
def test_normalize_placement_accepts_clean_paths() -> None:
    """Already-clean absolute paths come back unchanged."""
    for clean in ("/home/admin/.env", "/var/lib/x"):
        assert normalize_placement(clean) == clean
@pytest.mark.parametrize(
    "bad",
    [
        "",
        "relative/path",
        "./still-relative",
        "/path/with\x00nul",
        "/path/with\nnewline",
        "/path/with\rcr",
        "/path/../escape",
        "/trailing/..",
    ],
)
def test_normalize_placement_rejects_bad(bad: str) -> None:
    """Each listed placement path must raise ``ValueError``."""
    with pytest.raises(ValueError):
        normalize_placement(bad)

View File

@@ -0,0 +1,179 @@
"""Repository CRUD coverage for canary blobs / tokens / triggers.
Same harness as the rest of :mod:`tests.db` — spin up a SQLite-backed
:class:`SQLiteRepository` against a tempfile, exercise the public
methods, assert observable state.
We deliberately don't go through the API; that gets its own test
module once the router lands. This file proves the repository layer
in isolation: dedup, refcount-aware delete, slug lookup, atomic
trigger record + counter bump, attribution.
"""
from __future__ import annotations
import hashlib
from typing import AsyncIterator
import pytest
import pytest_asyncio
from decnet.web.db.sqlite.repository import SQLiteRepository
import decnet.web.db.models # noqa: F401 — registers tables on import
@pytest_asyncio.fixture
async def repo(tmp_path) -> AsyncIterator[SQLiteRepository]:
    """Yield an initialized ``SQLiteRepository`` stored at ``tmp_path / "canary.db"``.

    NOTE(review): nothing runs after the ``yield`` — confirm whether the
    repository needs an explicit close/dispose step on teardown.
    """
    db_file = tmp_path / "canary.db"
    repository = SQLiteRepository(str(db_file))
    await repository.initialize()
    yield repository
async def _make_blob(repo: SQLiteRepository, content: bytes, *, by: str = "u1") -> dict:
    """Upsert a blob row describing *content* and return what the repository returns.

    The row carries the SHA-256 hex digest and byte length of *content*,
    a fixed ``report.docx`` filename with the DOCX content type, and *by*
    as the uploader.
    """
    digest = hashlib.sha256(content).hexdigest()
    row = {
        "sha256": digest,
        "filename": "report.docx",
        "content_type": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        "size_bytes": len(content),
        "uploaded_by": by,
    }
    return await repo.upsert_canary_blob(row)
@pytest.mark.asyncio
async def test_upsert_blob_dedupes_by_sha256(repo: SQLiteRepository) -> None:
    """Identical bytes map to one blob row regardless of uploader; new bytes get a new row."""
    first = await _make_blob(repo, b"same bytes", by="u1")
    duplicate = await _make_blob(repo, b"same bytes", by="u2")
    assert first["uuid"] == duplicate["uuid"], "second upload must return the canonical row"

    # Different bytes → different blob.
    other = await _make_blob(repo, b"different bytes", by="u1")
    assert other["uuid"] != first["uuid"]
@pytest.mark.asyncio
async def test_upsert_blob_requires_sha256(repo: SQLiteRepository) -> None:
    """Upserting a blob row with no ``sha256`` key raises ``ValueError``."""
    row_without_digest = {
        "filename": "x",
        "content_type": "x",
        "size_bytes": 0,
        "uploaded_by": "u",
    }
    with pytest.raises(ValueError):
        await repo.upsert_canary_blob(row_without_digest)
@pytest.mark.asyncio
async def test_get_blob_by_sha256(repo: SQLiteRepository) -> None:
    """Lookup by digest finds a stored blob and returns ``None`` for an unknown digest."""
    stored = await _make_blob(repo, b"x")

    hit = await repo.get_canary_blob_by_sha256(stored["sha256"])
    assert hit is not None
    assert hit["uuid"] == stored["uuid"]

    miss = await repo.get_canary_blob_by_sha256("0" * 64)
    assert miss is None
@pytest.mark.asyncio
async def test_list_blobs_carries_token_count(repo: SQLiteRepository) -> None:
    """``token_count`` in the blob listing goes from 0 to 1 once a token references the blob."""
    blob = await _make_blob(repo, b"x")

    before = await repo.list_canary_blobs()
    assert len(before) == 1
    assert before[0]["token_count"] == 0

    token_row = {
        "kind": "http",
        "decky_name": "web1",
        "blob_uuid": blob["uuid"],
        "instrumenter": "docx",
        "placement_path": "/tmp/x.docx",
        "callback_token": "slug-1",
        "secret_seed": "s",
        "created_by": "u1",
    }
    await repo.create_canary_token(token_row)

    after = await repo.list_canary_blobs()
    assert after[0]["token_count"] == 1
@pytest.mark.asyncio
async def test_delete_blob_refuses_while_referenced(repo: SQLiteRepository) -> None:
    """A blob cannot be deleted while a token row points at it, revoked or not."""
    blob = await _make_blob(repo, b"x")
    await repo.create_canary_token({
        "kind": "http", "decky_name": "web1", "blob_uuid": blob["uuid"],
        "instrumenter": "docx", "placement_path": "/tmp/x.docx",
        "callback_token": "slug-r", "secret_seed": "s", "created_by": "u1",
    })
    assert await repo.delete_canary_blob(blob["uuid"]) is False

    # Even after revoke, the row still references the blob — operator
    # must explicitly clean tokens before they can prune the blob.
    tok = await repo.get_canary_token_by_slug("slug-r")
    assert tok is not None, "token created above must be retrievable by slug"
    # Check the revoke actually happened; otherwise the final assertion
    # would merely repeat the pre-revoke check and prove nothing new.
    revoked = await repo.update_canary_token_state(tok["uuid"], "revoked")
    assert revoked, "revoking an existing token must report success"
    assert await repo.delete_canary_blob(blob["uuid"]) is False
@pytest.mark.asyncio
async def test_delete_blob_returns_false_for_missing(repo: SQLiteRepository) -> None:
    """Deleting a blob UUID that was never stored returns ``False``."""
    absent_uuid = "00000000-0000-0000-0000-000000000000"
    outcome = await repo.delete_canary_blob(absent_uuid)
    assert outcome is False
@pytest.mark.asyncio
async def test_token_slug_lookup(repo: SQLiteRepository) -> None:
    """A token is retrievable by its callback slug; an unknown slug gives ``None``."""
    token_row = {
        "kind": "http",
        "decky_name": "web1",
        "generator": "aws_creds",
        "placement_path": "/home/admin/.aws/credentials",
        "callback_token": "slug-aws",
        "secret_seed": "s",
        "created_by": "u1",
    }
    await repo.create_canary_token(token_row)

    hit = await repo.get_canary_token_by_slug("slug-aws")
    assert hit is not None
    assert hit["decky_name"] == "web1"

    miss = await repo.get_canary_token_by_slug("nonexistent")
    assert miss is None
@pytest.mark.asyncio
async def test_list_tokens_filters(repo: SQLiteRepository) -> None:
    """``list_canary_tokens`` narrows results by ``decky_name``, ``kind`` and ``state``."""
    specs = (
        ("http", "web1", "/a", "s1"),
        ("dns", "web2", "/b", "s2"),
    )
    for kind, decky, path, slug in specs:
        await repo.create_canary_token({
            "kind": kind,
            "decky_name": decky,
            "generator": "aws_creds",
            "placement_path": path,
            "callback_token": slug,
            "secret_seed": "s",
            "created_by": "u1",
        })

    everything = await repo.list_canary_tokens()
    assert len(everything) == 2

    on_web1 = await repo.list_canary_tokens(decky_name="web1")
    assert len(on_web1) == 1

    dns_only = await repo.list_canary_tokens(kind="dns")
    assert len(dns_only) == 1

    revoked_only = await repo.list_canary_tokens(state="revoked")
    assert len(revoked_only) == 0
@pytest.mark.asyncio
async def test_record_trigger_bumps_counters_atomically(repo: SQLiteRepository) -> None:
    """Recording a trigger bumps ``trigger_count`` and sets ``last_triggered_at`` on the token."""
    await repo.create_canary_token({
        "kind": "http",
        "decky_name": "web1",
        "generator": "aws_creds",
        "placement_path": "/a",
        "callback_token": "slug-c",
        "secret_seed": "s",
        "created_by": "u1",
    })

    before = await repo.get_canary_token_by_slug("slug-c")
    assert before["trigger_count"] == 0
    assert before["last_triggered_at"] is None

    trigger_row = {
        "token_uuid": before["uuid"],
        "src_ip": "1.2.3.4",
        "request_path": "/c/slug-c",
        "user_agent": "curl/8.0",
        "raw_headers": {"user-agent": "curl/8.0"},
    }
    trig_id = await repo.record_canary_trigger(trigger_row)
    assert trig_id

    after = await repo.get_canary_token_by_slug("slug-c")
    assert after["trigger_count"] == 1
    assert after["last_triggered_at"] is not None

    # Only the row count and src_ip are checked on the stored trigger.
    # NOTE(review): raw_headers is passed as a dict above, but how it is
    # stored/decoded is not asserted here — confirm whether it should be.
    recorded = await repo.list_canary_triggers(before["uuid"])
    assert len(recorded) == 1
    assert recorded[0]["src_ip"] == "1.2.3.4"
@pytest.mark.asyncio
async def test_attribute_trigger_sets_attacker(repo: SQLiteRepository) -> None:
    """Attribution succeeds for a recorded trigger, fails for an unknown id, and is persisted."""
    await repo.create_canary_token({
        "kind": "http",
        "decky_name": "web1",
        "generator": "aws_creds",
        "placement_path": "/a",
        "callback_token": "slug-at",
        "secret_seed": "s",
        "created_by": "u1",
    })
    token = await repo.get_canary_token_by_slug("slug-at")
    trig_id = await repo.record_canary_trigger(
        {"token_uuid": token["uuid"], "src_ip": "9.9.9.9"},
    )

    attributed = await repo.attribute_canary_trigger(trig_id, "attacker-uuid-123")
    assert attributed is True

    missing = await repo.attribute_canary_trigger("missing-trig", "x")
    assert missing is False

    recorded = await repo.list_canary_triggers(token["uuid"])
    assert recorded[0]["attacker_id"] == "attacker-uuid-123"
@pytest.mark.asyncio
async def test_get_token_returns_none_for_missing(repo: SQLiteRepository) -> None:
    """Fetching a token or a blob by a UUID that was never stored returns ``None``."""
    absent_uuid = "00000000-0000-0000-0000-000000000000"

    token = await repo.get_canary_token(absent_uuid)
    assert token is None

    blob = await repo.get_canary_blob(absent_uuid)
    assert blob is None
@pytest.mark.asyncio
async def test_update_state_returns_false_for_missing(repo: SQLiteRepository) -> None:
    """Updating the state of a token id that does not exist returns ``False``."""
    outcome = await repo.update_canary_token_state("missing", "revoked")
    assert outcome is False

View File

@@ -0,0 +1,52 @@
"""Coverage for the on-disk blob store."""
from __future__ import annotations
import hashlib
from decnet.canary import storage
def test_write_blob_is_idempotent(tmp_path, monkeypatch) -> None:
    """Writing the same bytes twice yields the same digest, path and size."""
    monkeypatch.setenv("DECNET_CANARY_BLOB_DIR", str(tmp_path))

    first_sha, first_path, first_size = storage.write_blob(b"hello canary")
    second_sha, second_path, second_size = storage.write_blob(b"hello canary")

    assert first_sha == second_sha
    assert second_sha == hashlib.sha256(b"hello canary").hexdigest()
    assert first_path == second_path
    assert first_size == second_size
    assert second_size == len(b"hello canary")

    # Two-level fan-out: ab/cd/abcd...
    inner_dir = first_path.parent
    outer_dir = inner_dir.parent
    assert outer_dir.parent == tmp_path
    assert inner_dir.name == first_sha[2:4]
    assert outer_dir.name == first_sha[:2]
def test_read_blob_returns_bytes(tmp_path, monkeypatch) -> None:
    """``read_blob`` returns the exact bytes previously written under that digest."""
    monkeypatch.setenv("DECNET_CANARY_BLOB_DIR", str(tmp_path))
    digest, _path, _size = storage.write_blob(b"some payload")
    stored = storage.read_blob(digest)
    assert stored == b"some payload"
def test_unlink_blob_returns_false_for_missing(tmp_path, monkeypatch) -> None:
    """Unlinking a digest that was never written returns ``False``."""
    monkeypatch.setenv("DECNET_CANARY_BLOB_DIR", str(tmp_path))
    never_written = "0" * 64
    outcome = storage.unlink_blob(never_written)
    assert outcome is False
def test_unlink_blob_removes_file(tmp_path, monkeypatch) -> None:
    """``unlink_blob`` deletes the file and reports ``True``; a repeat call reports ``False``."""
    monkeypatch.setenv("DECNET_CANARY_BLOB_DIR", str(tmp_path))
    digest, blob_path, _size = storage.write_blob(b"to be removed")
    assert blob_path.exists()

    removed = storage.unlink_blob(digest)
    assert removed is True
    assert not blob_path.exists()

    # Second unlink is a no-op rather than a crash.
    removed_again = storage.unlink_blob(digest)
    assert removed_again is False
def test_blob_dir_honors_env(monkeypatch, tmp_path) -> None:
    """``blob_dir`` returns the directory named by ``DECNET_CANARY_BLOB_DIR``."""
    override = tmp_path / "alt"
    monkeypatch.setenv("DECNET_CANARY_BLOB_DIR", str(override))
    assert storage.blob_dir() == override
def test_short_sha_rejected() -> None:
    """``_path_for`` raises ``ValueError`` when given the three-character digest ``"abc"``."""
    # Imported locally: this module has no top-level pytest import.
    import pytest

    too_short = "abc"
    with pytest.raises(ValueError):
        storage._path_for(too_short)

View File

@@ -0,0 +1,42 @@
"""Coverage for the canary bus-topic builder + constants.
The builder shares :func:`_reject_tokens` with every other family in
:mod:`decnet.bus.topics`, so we only need to exercise the canary
surface: the three leaf constants and that bogus segments are
rejected. Anything more would duplicate :mod:`tests.bus.test_topics`.
"""
from __future__ import annotations
import pytest
from decnet.bus import topics
def test_canary_constants_are_distinct() -> None:
    """Pin the literal values of the canary topic constants and check the leaves differ."""
    assert topics.CANARY == "canary"
    assert topics.CANARY_PLACED == "placed"
    assert topics.CANARY_TRIGGERED == "triggered"
    assert topics.CANARY_REVOKED == "revoked"

    leaves = {
        topics.CANARY_PLACED,
        topics.CANARY_TRIGGERED,
        topics.CANARY_REVOKED,
    }
    assert len(leaves) == 3
def test_canary_builder_round_trip() -> None:
    """``topics.canary`` joins the family, token id and event into a dotted topic."""
    cases = (
        ("abc-123", topics.CANARY_TRIGGERED, "canary.abc-123.triggered"),
        ("xyz", topics.CANARY_PLACED, "canary.xyz.placed"),
        ("xyz", topics.CANARY_REVOKED, "canary.xyz.revoked"),
    )
    for token_id, event, wanted in cases:
        assert topics.canary(token_id, event) == wanted
@pytest.mark.parametrize(
    "bogus_id",
    ["", "with.dot", "with*wildcard", "with>chevron", "with space"],
)
def test_canary_builder_rejects_bad_token_id(bogus_id: str) -> None:
    """Each listed token id must make ``topics.canary`` raise ``ValueError``."""
    event = topics.CANARY_TRIGGERED
    with pytest.raises(ValueError):
        topics.canary(bogus_id, event)
@pytest.mark.parametrize(
    "bogus_event",
    ["", "x.y", "*", ">"],
)
def test_canary_builder_rejects_bad_event(bogus_event: str) -> None:
    """Each listed event segment must make ``topics.canary`` raise ``ValueError``."""
    valid_id = "good_id"
    with pytest.raises(ValueError):
        topics.canary(valid_id, bogus_event)