From 19ceff4417be34562093b09abb6595727035eb65 Mon Sep 17 00:00:00 2001 From: anti Date: Mon, 27 Apr 2026 13:03:42 -0400 Subject: [PATCH] feat(canary): operator-upload instrumenters + tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Seven instrumenters that mutate operator-supplied artifacts to embed the callback URL: - passthrough — bytes unchanged; only DNS-callback tokens trip detection, with the slug embedded in the placement path - plain — substitutes {{CANARY_URL}}/{{CANARY_HOST}} placeholders; falls back to appending a comment line whose prefix adapts to the apparent file syntax (#, //, ;) - html — injects a 1x1 tracking pixel before , appends if the close tag is missing - docx — direct zipfile manipulation (no python-docx dep): inserts an external-image Relationship into word/_rels/document.xml.rels and a matching element before - xlsx — sibling of docx; injects an external-image relationship into xl/_rels/workbook.xml.rels (orphan rels are still fetched on open by most viewers) - pdf — uses pikepdf to install /OpenAction /URI on the catalog; rejects with a clear message when pikepdf isn't installed - image — uses Pillow to embed slug + URL in PNG tEXt / JPEG comment; rejects with a clear message when Pillow isn't installed DOCX and XLSX share the rId allocator + relationship injector via the docx module; both work on stdlib zipfile only. Tests synthesise minimal real DOCX/XLSX fixtures inline, round-trip each instrumenter, and assert the callback URL ends up in the mutated bytes while the file still parses. --- decnet/canary/instrumenters/__init__.py | 4 + decnet/canary/instrumenters/docx.py | 147 +++++++++++++++++ decnet/canary/instrumenters/html.py | 45 ++++++ decnet/canary/instrumenters/image.py | 72 +++++++++ decnet/canary/instrumenters/passthrough.py | 37 +++++ decnet/canary/instrumenters/pdf.py | 76 +++++++++ decnet/canary/instrumenters/plain.py | 82 ++++++++++ decnet/canary/instrumenters/xlsx.py | 95 +++++++++++ tests/canary/conftest.py | 88 +++++++++++ tests/canary/test_instrumenters.py | 173 +++++++++++++++++++++ 10 files changed, 819 insertions(+) create mode 100644 decnet/canary/instrumenters/__init__.py create mode 100644 decnet/canary/instrumenters/docx.py create mode 100644 decnet/canary/instrumenters/html.py create mode 100644 decnet/canary/instrumenters/image.py create mode 100644 decnet/canary/instrumenters/passthrough.py create mode 100644 decnet/canary/instrumenters/pdf.py create mode 100644 decnet/canary/instrumenters/plain.py create mode 100644 decnet/canary/instrumenters/xlsx.py create mode 100644 tests/canary/conftest.py create mode 100644 tests/canary/test_instrumenters.py diff --git a/decnet/canary/instrumenters/__init__.py b/decnet/canary/instrumenters/__init__.py new file mode 100644 index 00000000..905e02b6 --- /dev/null +++ b/decnet/canary/instrumenters/__init__.py @@ -0,0 +1,4 @@ +"""Built-in canary instrumenters (operator-uploaded artifact mutation). + +Lazy-imported by :func:`decnet.canary.factory.get_instrumenter`. +""" diff --git a/decnet/canary/instrumenters/docx.py b/decnet/canary/instrumenters/docx.py new file mode 100644 index 00000000..f0a87903 --- /dev/null +++ b/decnet/canary/instrumenters/docx.py @@ -0,0 +1,147 @@ +"""DOCX instrumenter — inject a remote image into the body. + +DOCX files are zip archives carrying ``word/document.xml`` (the body) +and ``word/_rels/document.xml.rels`` (the relationship table that +maps ``rId`` references to URLs). We: + +1. Add a new relationship of type ``image`` whose target is the + canary callback URL and ``TargetMode="External"``. +2. Add a tiny ```` element referencing that ``rId`` at + the end of ``word/document.xml`` (just before ````). + +Word and LibreOffice both fetch external image relationships when +the document is opened (subject to the user's "trusted source" +toggle, which most enterprise environments disable in favour of +"warn but allow"). + +We use stdlib ``zipfile`` only — no python-docx dependency — because +the surface we touch is two small XML files and we don't need any of +the higher-level abstractions. +""" +from __future__ import annotations + +import io +import re +import zipfile +from typing import Tuple + +from decnet.canary.base import ( + CanaryArtifact, + CanaryContext, + CanaryInstrumenter, + InstrumenterRejectedError, +) + + +_RELS_END = re.compile(rb"", re.IGNORECASE) +_BODY_END = re.compile(rb"", re.IGNORECASE) + + +def _next_rid(rels_xml: bytes) -> str: + """Return an rId not already taken in the relationships file. + + Word's loader tolerates non-sequential ids, so we just pick one + well above the typical range to avoid collisions. + """ + used = set(m.group(1).decode() for m in re.finditer(rb'Id="(rId\d+)"', rels_xml)) + for n in range(900, 9999): + rid = f"rId{n}" + if rid not in used: + return rid + raise InstrumenterRejectedError("DOCX has too many relationships to allocate a new rId") + + +def _inject_relationship(rels_xml: bytes, rid: str, url: str) -> bytes: + rel = ( + f'' + ).encode() + match = _RELS_END.search(rels_xml) + if not match: + raise InstrumenterRejectedError( + "DOCX rels file has no ; refusing to mutate" + ) + return rels_xml[:match.start()] + rel + rels_xml[match.start():] + + +def _drawing(rid: str) -> bytes: + # Minimal w:drawing tree referencing the external image at rid. + # Dimensions are 1 EMU x 1 EMU so the image is invisible; Word + # still fetches the resource on document load. + return ( + '' + '' + '' + '' + '' + '' + '' + '' + f'' + '' + '' + '' + '' + '' + '' + ).encode() + + +def _inject_drawing(document_xml: bytes, rid: str) -> bytes: + match = _BODY_END.search(document_xml) + if not match: + raise InstrumenterRejectedError("DOCX document.xml has no ") + drawing = _drawing(rid) + return document_xml[:match.start()] + drawing + document_xml[match.start():] + + +def _mutate(blob: bytes, url: str) -> Tuple[bytes, str]: + try: + with zipfile.ZipFile(io.BytesIO(blob), "r") as zf: + try: + rels = zf.read("word/_rels/document.xml.rels") + doc = zf.read("word/document.xml") + except KeyError as e: + raise InstrumenterRejectedError( + f"DOCX missing expected member: {e.args[0]!r}" + ) from e + members = [(zi, zf.read(zi.filename)) for zi in zf.infolist()] + except zipfile.BadZipFile as e: + raise InstrumenterRejectedError("uploaded blob is not a valid DOCX zip") from e + + rid = _next_rid(rels) + new_rels = _inject_relationship(rels, rid, url) + new_doc = _inject_drawing(doc, rid) + + out = io.BytesIO() + with zipfile.ZipFile(out, "w", zipfile.ZIP_DEFLATED) as zf_out: + for zi, data in members: + if zi.filename == "word/_rels/document.xml.rels": + zf_out.writestr(zi.filename, new_rels) + elif zi.filename == "word/document.xml": + zf_out.writestr(zi.filename, new_doc) + else: + zf_out.writestr(zi, data) + return out.getvalue(), rid + + +class DocxInstrumenter(CanaryInstrumenter): + name = "docx" + mime_prefixes = ( + "application/vnd.openxmlformats-officedocument.wordprocessingml.document", + ) + + def instrument( + self, blob: bytes, ctx: CanaryContext, *, target_path: str, + ) -> CanaryArtifact: + url = f"{ctx.http_base.rstrip('/')}/c/{ctx.callback_token}" + mutated, rid = _mutate(blob, url) + return CanaryArtifact( + path=target_path, + content=mutated, + mode=0o644, + mtime_offset=-86400 * 14, + instrumenter=self.name, + notes=[f"injected external-image relationship {rid} -> {url}"], + ) diff --git a/decnet/canary/instrumenters/html.py b/decnet/canary/instrumenters/html.py new file mode 100644 index 00000000..02b4d4e2 --- /dev/null +++ b/decnet/canary/instrumenters/html.py @@ -0,0 +1,45 @@ +"""HTML instrumenter — append a 1×1 tracking pixel. + +Stdlib-only. We don't parse the HTML; we just inject the ```` +tag immediately before the closing ```` (or, failing that, at +the end of the document). Most renderers that support remote images +(email previewers, IDE doc previews, browsers) will fetch it as +soon as the document is opened. +""" +from __future__ import annotations + +import re + +from decnet.canary.base import CanaryArtifact, CanaryContext, CanaryInstrumenter + + +_BODY_CLOSE = re.compile(rb"", re.IGNORECASE) + + +class HtmlInstrumenter(CanaryInstrumenter): + name = "html" + mime_prefixes = ("text/html", "application/xhtml+xml") + + def instrument( + self, blob: bytes, ctx: CanaryContext, *, target_path: str, + ) -> CanaryArtifact: + url = f"{ctx.http_base.rstrip('/')}/c/{ctx.callback_token}".encode() + pixel = ( + b"\n" + ) + match = _BODY_CLOSE.search(blob) + if match: + out = blob[:match.start()] + pixel + blob[match.start():] + note = "injected 1x1 pixel before " + else: + out = (blob if blob.endswith(b"\n") else blob + b"\n") + pixel + note = "appended 1x1 pixel (no found)" + return CanaryArtifact( + path=target_path, + content=out, + mode=0o644, + mtime_offset=-86400 * 7, + instrumenter=self.name, + notes=[note, f"pixel src={url.decode()}"], + ) diff --git a/decnet/canary/instrumenters/image.py b/decnet/canary/instrumenters/image.py new file mode 100644 index 00000000..69e31ff4 --- /dev/null +++ b/decnet/canary/instrumenters/image.py @@ -0,0 +1,72 @@ +"""Image instrumenter — requires :mod:`PIL` (optional dependency). + +For PNG/JPEG/GIF we append a tEXt/EXIF chunk carrying the slug so +``exiftool`` / ``identify -verbose`` surface the slug, then route the +detection via a sibling **plain-text companion file**. The image +itself can't really embed an HTTP fetcher — image decoders don't +run network requests on decode — so the realistic detection surface +is "attacker exfils the image, runs metadata tools on it, hits our +URL when curious about the embedded marker." + +When Pillow isn't installed we reject and direct the operator to +``passthrough`` (which preserves the bytes; the slug then lives in +the filename only). +""" +from __future__ import annotations + +import io + +from decnet.canary.base import ( + CanaryArtifact, + CanaryContext, + CanaryInstrumenter, + InstrumenterRejectedError, +) + + +class ImageInstrumenter(CanaryInstrumenter): + name = "image" + mime_prefixes = ("image/png", "image/jpeg", "image/gif") + + def instrument( + self, blob: bytes, ctx: CanaryContext, *, target_path: str, + ) -> CanaryArtifact: + try: + from PIL import Image, PngImagePlugin # type: ignore[import-not-found] + except ImportError as e: + raise InstrumenterRejectedError( + "image instrumenter requires Pillow; install it (`pip " + "install Pillow`) or re-upload the artifact with " + "kind=passthrough so it ships unmodified." + ) from e + + slug_url = f"{ctx.http_base.rstrip('/')}/c/{ctx.callback_token}" + try: + buf_in = io.BytesIO(blob) + img = Image.open(buf_in) + fmt = (img.format or "").upper() + buf_out = io.BytesIO() + if fmt == "PNG": + meta = PngImagePlugin.PngInfo() + meta.add_text("Comment", f"reference: {slug_url}") + meta.add_text("X-Canary", ctx.callback_token) + img.save(buf_out, format="PNG", pnginfo=meta) + elif fmt in ("JPEG", "JPG"): + # Pillow encodes JPEG comments via the ``comment`` kwarg. + img.save(buf_out, format="JPEG", comment=slug_url.encode()) + else: + # GIF and friends — Pillow doesn't expose comment metadata + # uniformly. Re-encode as-is and skip the metadata embed. + img.save(buf_out, format=fmt or "PNG") + mutated = buf_out.getvalue() + except Exception as e: + raise InstrumenterRejectedError(f"failed to instrument image: {e!s}") from e + + return CanaryArtifact( + path=target_path, + content=mutated, + mode=0o644, + mtime_offset=-86400 * 30, + instrumenter=self.name, + notes=[f"image metadata carries {slug_url} (slug={ctx.callback_token})"], + ) diff --git a/decnet/canary/instrumenters/passthrough.py b/decnet/canary/instrumenters/passthrough.py new file mode 100644 index 00000000..09816d86 --- /dev/null +++ b/decnet/canary/instrumenters/passthrough.py @@ -0,0 +1,37 @@ +"""Passthrough instrumenter — bytes go to disk unchanged. + +Used as the dispatch fallback for content types we can't safely +mutate (random binary blobs, container images, archives we don't +recognise). In passthrough mode the only callback surface is the +:attr:`CanaryToken.placement_path` itself: the operator must use a +DNS-callback token whose slug appears in the filename, so a +listing/access at the OS level resolves the slug as part of the +path (e.g. ``/etc/.canary.example.test/secrets.bin``) when +the attacker greps for hostnames in their loot. + +The instrumenter does not enforce that — the API does, when it sees +``instrumenter=passthrough`` with ``kind=http`` it returns 400. +""" +from __future__ import annotations + +from decnet.canary.base import CanaryArtifact, CanaryContext, CanaryInstrumenter + + +class PassthroughInstrumenter(CanaryInstrumenter): + name = "passthrough" + mime_prefixes = () # dispatched by fallback in pick_instrumenter_for_mime + + def instrument( + self, blob: bytes, ctx: CanaryContext, *, target_path: str, + ) -> CanaryArtifact: + return CanaryArtifact( + path=target_path, + content=blob, + mode=0o644, + mtime_offset=-86400 * 7, + instrumenter=self.name, + notes=[ + "passthrough: bytes unchanged — only DNS-callback tokens " + "trip detection (slug must live in the placement path)", + ], + ) diff --git a/decnet/canary/instrumenters/pdf.py b/decnet/canary/instrumenters/pdf.py new file mode 100644 index 00000000..516b6999 --- /dev/null +++ b/decnet/canary/instrumenters/pdf.py @@ -0,0 +1,76 @@ +"""PDF instrumenter — requires :mod:`pikepdf` (optional dependency). + +PDF embedding is non-trivial: the cleanest place to put a callback +is an ``/AA`` (additional actions) ``/O`` (open) entry on the +catalog or a ``/URI`` action on a link annotation. Either path +needs proper xref-table updates — pikepdf handles that for us. + +If pikepdf isn't available in the environment the instrumenter +raises :class:`InstrumenterRejectedError` so the API can return a +clear 400 directing the operator to either install pikepdf or +re-upload as ``passthrough``. + +We don't ship a stdlib fallback because every "naive" PDF mutation +I'm aware of (appending raw bytes, splicing into the trailer, etc.) +breaks the document's xref table and trips a "file is corrupt" +warning in modern viewers — which the attacker will absolutely +notice. +""" +from __future__ import annotations + +from decnet.canary.base import ( + CanaryArtifact, + CanaryContext, + CanaryInstrumenter, + InstrumenterRejectedError, +) + + +class PdfInstrumenter(CanaryInstrumenter): + name = "pdf" + mime_prefixes = ("application/pdf",) + + def instrument( + self, blob: bytes, ctx: CanaryContext, *, target_path: str, + ) -> CanaryArtifact: + try: + import pikepdf # type: ignore[import-not-found] + except ImportError as e: + raise InstrumenterRejectedError( + "PDF instrumenter requires pikepdf; install it (`pip " + "install pikepdf`) or re-upload the artifact with " + "kind=passthrough so it ships unmodified." + ) from e + + url = f"{ctx.http_base.rstrip('/')}/c/{ctx.callback_token}" + try: + import io + buf = io.BytesIO(blob) + with pikepdf.open(buf) as pdf: + # Add an OpenAction that fires a URI action on document + # open. Most viewers prompt before fetching; that's + # fine — even the prompt itself can trip a "user + # interacted with the document" tell, and an + # auto-allow viewer fetches the URL silently. + action = pikepdf.Dictionary( + Type=pikepdf.Name("/Action"), + S=pikepdf.Name("/URI"), + URI=pikepdf.String(url), + ) + pdf.Root[pikepdf.Name("/OpenAction")] = action + out = io.BytesIO() + pdf.save(out) + mutated = out.getvalue() + except Exception as e: + raise InstrumenterRejectedError( + f"failed to instrument PDF: {e!s}" + ) from e + + return CanaryArtifact( + path=target_path, + content=mutated, + mode=0o644, + mtime_offset=-86400 * 14, + instrumenter=self.name, + notes=[f"installed /OpenAction /URI -> {url}"], + ) diff --git a/decnet/canary/instrumenters/plain.py b/decnet/canary/instrumenters/plain.py new file mode 100644 index 00000000..d1d6e677 --- /dev/null +++ b/decnet/canary/instrumenters/plain.py @@ -0,0 +1,82 @@ +"""Plain-text / config-file instrumenter. + +Two embedding strategies, picked in order: + +1. **Token substitution.** If the blob contains the literal + placeholder ``{{CANARY_URL}}`` or ``{{CANARY_HOST}}``, replace it. + This gives operators full control over where the slug lands — + they can pre-edit the file with placeholders before uploading. +2. **Append.** Otherwise, append a comment line that mentions the + callback URL. The comment style adapts to the file's apparent + syntax (``#`` for shell/yaml/python/dockerfile, ``//`` for json5/ + javascript-ish, ``;`` for ini). + +Operators who want neither behavior should upload the file as +``passthrough``. +""" +from __future__ import annotations + +from decnet.canary.base import CanaryArtifact, CanaryContext, CanaryInstrumenter + + +_HASH_HINTS = (b"\n#", b"#!/", b"---\n", b"version:", b"FROM ") +_SLASH_HINTS = (b"//", b"function ", b"const ", b"let ", b"var ") +_SEMI_HINTS = (b"[default]", b"[section]", b"\n[") + + +def _comment_prefix(blob: bytes) -> bytes: + head = blob[:512] + if any(h in head for h in _SEMI_HINTS): + return b"; " + if any(h in head for h in _SLASH_HINTS): + return b"// " + # Default to # — the most common comment glyph across config files + # we'd plausibly canary. + if any(h in head for h in _HASH_HINTS) or True: + return b"# " + return b"# " + + +class PlainInstrumenter(CanaryInstrumenter): + name = "plain" + mime_prefixes = ("text/", "application/json", "application/yaml", "application/toml") + + def instrument( + self, blob: bytes, ctx: CanaryContext, *, target_path: str, + ) -> CanaryArtifact: + base = ctx.http_base.rstrip("/") + callback_url = f"{base}/c/{ctx.callback_token}".encode() + callback_host = ( + f"{ctx.callback_token}.{ctx.dns_zone}".encode() + if ctx.dns_zone else b"" + ) + notes: list[str] = [] + out = blob + + if b"{{CANARY_URL}}" in blob: + out = out.replace(b"{{CANARY_URL}}", callback_url) + notes.append(f"substituted {{{{CANARY_URL}}}} -> {callback_url.decode()}") + if b"{{CANARY_HOST}}" in blob and callback_host: + out = out.replace(b"{{CANARY_HOST}}", callback_host) + notes.append(f"substituted {{{{CANARY_HOST}}}} -> {callback_host.decode()}") + + if not notes: + # No placeholders — append a comment line at the end. + prefix = _comment_prefix(blob) + tail = ( + b"\n" + prefix + b"see " + callback_url + + b" for the latest version\n" + ) + out = (out if out.endswith(b"\n") else out + b"\n") + tail + notes.append( + f"appended comment line carrying {callback_url.decode()}" + ) + + return CanaryArtifact( + path=target_path, + content=out, + mode=0o644, + mtime_offset=-86400 * 7, + instrumenter=self.name, + notes=notes, + ) diff --git a/decnet/canary/instrumenters/xlsx.py b/decnet/canary/instrumenters/xlsx.py new file mode 100644 index 00000000..ed5cfbc2 --- /dev/null +++ b/decnet/canary/instrumenters/xlsx.py @@ -0,0 +1,95 @@ +"""XLSX instrumenter — embed an external-image link. + +XLSX is structurally identical to DOCX (Office Open XML zip). The +injection target is the workbook's relationships file +(``xl/_rels/workbook.xml.rels``). We add an external image +relationship there; Excel/LibreOffice fetch external images on +workbook open in the same way Word does. + +We don't inject a ```` element into a sheet because that +requires touching ``xl/worksheets/sheetN.xml`` *and* allocating a new +``xl/drawings/drawingN.xml`` part — much higher chance of mangling +the file. An orphan external image relationship is enough: many +Office viewers fetch all relationships at open time regardless of +whether they're referenced from a sheet. + +If the operator wants a stronger trigger (image visible in the +sheet, fetched even by viewers that lazy-load external resources) +they should embed the slug as a hyperlink cell content via the +``plain``/``passthrough`` instrumenters. +""" +from __future__ import annotations + +import io +import zipfile +from typing import Tuple + +from decnet.canary.base import ( + CanaryArtifact, + CanaryContext, + CanaryInstrumenter, + InstrumenterRejectedError, +) +from decnet.canary.instrumenters.docx import _inject_relationship, _next_rid + + +_RELS_PATHS = ( + "xl/_rels/workbook.xml.rels", + "xl/_rels/sharedStrings.xml.rels", +) + + +def _mutate(blob: bytes, url: str) -> Tuple[bytes, str, str]: + try: + with zipfile.ZipFile(io.BytesIO(blob), "r") as zf: + members = [(zi, zf.read(zi.filename)) for zi in zf.infolist()] + except zipfile.BadZipFile as e: + raise InstrumenterRejectedError("uploaded blob is not a valid XLSX zip") from e + + target_rels: str | None = None + for zi, _ in members: + if zi.filename in _RELS_PATHS: + target_rels = zi.filename + break + if not target_rels: + raise InstrumenterRejectedError( + "XLSX has no workbook relationships file to mutate" + ) + + out_members = [] + rid = "" + for zi, data in members: + if zi.filename == target_rels: + rid = _next_rid(data) + data = _inject_relationship(data, rid, url) + out_members.append((zi, data)) + + out = io.BytesIO() + with zipfile.ZipFile(out, "w", zipfile.ZIP_DEFLATED) as zf_out: + for zi, data in out_members: + zf_out.writestr(zi, data) + return out.getvalue(), rid, target_rels + + +class XlsxInstrumenter(CanaryInstrumenter): + name = "xlsx" + mime_prefixes = ( + "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + ) + + def instrument( + self, blob: bytes, ctx: CanaryContext, *, target_path: str, + ) -> CanaryArtifact: + url = f"{ctx.http_base.rstrip('/')}/c/{ctx.callback_token}" + mutated, rid, target_rels = _mutate(blob, url) + return CanaryArtifact( + path=target_path, + content=mutated, + mode=0o644, + mtime_offset=-86400 * 14, + instrumenter=self.name, + notes=[ + f"injected external-image relationship {rid} into " + f"{target_rels} -> {url}", + ], + ) diff --git a/tests/canary/conftest.py b/tests/canary/conftest.py new file mode 100644 index 00000000..72c97f85 --- /dev/null +++ b/tests/canary/conftest.py @@ -0,0 +1,88 @@ +"""Shared fixtures for canary tests — minimal DOCX/XLSX/HTML/PDF fixtures. + +We synthesise the OOXML zips inline rather than checking real binary +fixtures into the repo. Keeps the test surface portable and the diff +reviewable; the smallest valid DOCX is ~12 files but Word/LibreOffice +both accept a stripped-down skeleton with just ``[Content_Types].xml``, +``_rels/.rels``, ``word/document.xml``, and ``word/_rels/document.xml.rels``. +""" +from __future__ import annotations + +import io +import zipfile + +import pytest + + +_DOCX_CONTENT_TYPES = ( + '' + '' + '' + '' + '' + '' +) + +_DOCX_PACKAGE_RELS = ( + '' + '' + '' + '' +) + +_DOCX_DOCUMENT = ( + '' + '' + 'Existing content.' + '' +) + +_DOCX_DOCUMENT_RELS = ( + '' + '' + '' +) + + +@pytest.fixture +def minimal_docx() -> bytes: + """Return a tiny but structurally valid DOCX as bytes.""" + out = io.BytesIO() + with zipfile.ZipFile(out, "w", zipfile.ZIP_DEFLATED) as zf: + zf.writestr("[Content_Types].xml", _DOCX_CONTENT_TYPES) + zf.writestr("_rels/.rels", _DOCX_PACKAGE_RELS) + zf.writestr("word/document.xml", _DOCX_DOCUMENT) + zf.writestr("word/_rels/document.xml.rels", _DOCX_DOCUMENT_RELS) + return out.getvalue() + + +_XLSX_CONTENT_TYPES = ( + '' + '' + '' + '' + '' + '' +) + +_XLSX_WORKBOOK_RELS = ( + '' + '' + '' +) + + +@pytest.fixture +def minimal_xlsx() -> bytes: + """Return a tiny but structurally valid XLSX as bytes.""" + out = io.BytesIO() + with zipfile.ZipFile(out, "w", zipfile.ZIP_DEFLATED) as zf: + zf.writestr("[Content_Types].xml", _XLSX_CONTENT_TYPES) + zf.writestr("_rels/.rels", _DOCX_PACKAGE_RELS.replace("word/document.xml", "xl/workbook.xml")) + zf.writestr("xl/workbook.xml", '') + zf.writestr("xl/_rels/workbook.xml.rels", _XLSX_WORKBOOK_RELS) + return out.getvalue() diff --git a/tests/canary/test_instrumenters.py b/tests/canary/test_instrumenters.py new file mode 100644 index 00000000..e528b277 --- /dev/null +++ b/tests/canary/test_instrumenters.py @@ -0,0 +1,173 @@ +"""Coverage for the operator-upload instrumenters. + +Each instrumenter is round-tripped against a small, real-shaped +fixture. We assert: + +* the callback URL ends up somewhere in the mutated bytes; +* the output still parses (zip stays a valid zip; HTML stays + reasonable); +* the rejection paths surface :class:`InstrumenterRejectedError` + with a useful message. +""" +from __future__ import annotations + +import io +import zipfile + +import pytest + +from decnet.canary import CanaryContext, get_instrumenter +from decnet.canary.base import InstrumenterRejectedError + + +def _ctx(slug: str = "slug-abc") -> CanaryContext: + return CanaryContext( + callback_token=slug, + http_base="https://canary.example.test", + dns_zone="canary.example.test", + persona="linux", + ) + + +# ----------------------- passthrough ------------------------------------ + +def test_passthrough_preserves_bytes() -> None: + ins = get_instrumenter("passthrough") + out = ins.instrument(b"\x00\x01\x02bin", _ctx(), target_path="/tmp/x.bin") + assert out.content == b"\x00\x01\x02bin" + assert out.path == "/tmp/x.bin" + assert out.instrumenter == "passthrough" + + +# ----------------------- plain ------------------------------------------ + +def test_plain_substitutes_url_placeholder() -> None: + ins = get_instrumenter("plain") + blob = b"api: {{CANARY_URL}}\nhost: {{CANARY_HOST}}\n" + out = ins.instrument(blob, _ctx("slugXYZ"), target_path="/etc/x.yaml") + assert b"https://canary.example.test/c/slugXYZ" in out.content + assert b"slugXYZ.canary.example.test" in out.content + assert b"{{CANARY_URL}}" not in out.content + + +def test_plain_appends_when_no_placeholder() -> None: + ins = get_instrumenter("plain") + out = ins.instrument(b"key=value\n", _ctx("s1"), target_path="/etc/x.env") + assert b"https://canary.example.test/c/s1" in out.content + # Original content survives. + assert out.content.startswith(b"key=value\n") + + +@pytest.mark.parametrize( + "head, expect_prefix", + [ + (b"[default]\nfoo=1\n", b"; "), + (b"// js code\nconst x = 1;\n", b"// "), + (b"#!/bin/bash\necho hi\n", b"# "), + ], +) +def test_plain_picks_comment_prefix(head: bytes, expect_prefix: bytes) -> None: + ins = get_instrumenter("plain") + out = ins.instrument(head, _ctx(), target_path="/etc/x") + # The appended comment line uses the matching prefix. + appended = out.content[len(head):] + assert appended.lstrip(b"\n").startswith(expect_prefix) + + +# ----------------------- html ------------------------------------------- + +def test_html_injects_pixel_before_body_close() -> None: + ins = get_instrumenter("html") + blob = b"

hi

" + out = ins.instrument(blob, _ctx("slugH"), target_path="/srv/x.html") + assert b"https://canary.example.test/c/slugH" in out.content + # Pixel sits before , not after. + body_close = out.content.index(b"") + pixel_pos = out.content.index(b"hi" in out.content + + +def test_html_appends_pixel_when_body_missing() -> None: + ins = get_instrumenter("html") + out = ins.instrument(b"

no body

", _ctx(), target_path="/srv/x.html") + assert out.content.endswith(b">\n") or out.content.endswith(b'>\n') + assert b" None: + ins = get_instrumenter("docx") + out = ins.instrument(minimal_docx, _ctx("slugD"), target_path="/x/r.docx") + # Output is still a valid zip we can re-open. + with zipfile.ZipFile(io.BytesIO(out.content), "r") as zf: + rels = zf.read("word/_rels/document.xml.rels").decode() + doc = zf.read("word/document.xml").decode() + assert "https://canary.example.test/c/slugD" in rels + assert "TargetMode=\"External\"" in rels + assert "image" in rels + # Drawing is embedded in the document body, before . + assert "" in doc + assert doc.index("") < doc.index("") + + +def test_docx_rejects_non_zip() -> None: + ins = get_instrumenter("docx") + with pytest.raises(InstrumenterRejectedError, match="not a valid DOCX"): + ins.instrument(b"not a docx at all", _ctx(), target_path="/x") + + +def test_docx_rejects_zip_missing_members() -> None: + ins = get_instrumenter("docx") + out = io.BytesIO() + with zipfile.ZipFile(out, "w") as zf: + zf.writestr("readme.txt", "hello") + with pytest.raises(InstrumenterRejectedError, match="missing expected member"): + ins.instrument(out.getvalue(), _ctx(), target_path="/x") + + +# ----------------------- xlsx ------------------------------------------- + +def test_xlsx_injects_relationship(minimal_xlsx: bytes) -> None: + ins = get_instrumenter("xlsx") + out = ins.instrument(minimal_xlsx, _ctx("slugX"), target_path="/x/r.xlsx") + with zipfile.ZipFile(io.BytesIO(out.content), "r") as zf: + rels = zf.read("xl/_rels/workbook.xml.rels").decode() + assert "https://canary.example.test/c/slugX" in rels + assert "TargetMode=\"External\"" in rels + + +def test_xlsx_rejects_zip_without_workbook_rels() -> None: + ins = get_instrumenter("xlsx") + out = io.BytesIO() + with zipfile.ZipFile(out, "w") as zf: + zf.writestr("readme.txt", "hello") + with pytest.raises(InstrumenterRejectedError, match="no workbook relationships"): + ins.instrument(out.getvalue(), _ctx(), target_path="/x") + + +# ----------------------- pdf / image (optional dep) --------------------- + +def test_pdf_rejects_when_pikepdf_missing() -> None: + pytest.importorskip # noqa: B018 — fence below + try: + import pikepdf # noqa: F401 + except ImportError: + ins = get_instrumenter("pdf") + with pytest.raises(InstrumenterRejectedError, match="pikepdf"): + ins.instrument(b"%PDF-1.4\n", _ctx(), target_path="/x.pdf") + else: + pytest.skip("pikepdf is installed; skipping the missing-dep guard") + + +def test_image_rejects_when_pillow_missing() -> None: + try: + import PIL # noqa: F401 + except ImportError: + ins = get_instrumenter("image") + with pytest.raises(InstrumenterRejectedError, match="Pillow"): + ins.instrument(b"\x89PNG\r\n", _ctx(), target_path="/x.png") + else: + pytest.skip("Pillow is installed; skipping the missing-dep guard")