feat(canary): operator-upload instrumenters + tests

Seven instrumenters that mutate operator-supplied artifacts to
embed the callback URL:

- passthrough — bytes unchanged; only DNS-callback tokens trip
  detection, with the slug embedded in the placement path
- plain      — substitutes {{CANARY_URL}}/{{CANARY_HOST}} placeholders;
  falls back to appending a comment line whose prefix adapts to the
  apparent file syntax (#, //, ;)
- html       — injects a 1x1 tracking pixel before </body>, appends
  if the close tag is missing
- docx       — direct zipfile manipulation (no python-docx dep):
  inserts an external-image Relationship into word/_rels/document.xml.rels
  and a matching <w:drawing> element before </w:body>
- xlsx       — sibling of docx; injects an external-image relationship
  into xl/_rels/workbook.xml.rels (orphan rels are still fetched on
  open by most viewers)
- pdf        — uses pikepdf to install /OpenAction /URI on the catalog;
  rejects with a clear message when pikepdf isn't installed
- image      — uses Pillow to embed slug + URL in PNG tEXt / JPEG
  comment; rejects with a clear message when Pillow isn't installed

DOCX and XLSX share the rId allocator + relationship injector via
the docx module; both work on stdlib zipfile only.

Tests synthesise minimal real DOCX/XLSX fixtures inline, round-trip
each instrumenter, and assert the callback URL ends up in the
mutated bytes while the file still parses.
This commit is contained in:
2026-04-27 13:03:42 -04:00
parent c7658ea65e
commit 19ceff4417
10 changed files with 819 additions and 0 deletions

View File

@@ -0,0 +1,4 @@
"""Built-in canary instrumenters (operator-uploaded artifact mutation).
Lazy-imported by :func:`decnet.canary.factory.get_instrumenter`.
"""

View File

@@ -0,0 +1,147 @@
"""DOCX instrumenter — inject a remote image into the body.
DOCX files are zip archives carrying ``word/document.xml`` (the body)
and ``word/_rels/document.xml.rels`` (the relationship table that
maps ``rId`` references to URLs). We:
1. Add a new relationship of type ``image`` whose target is the
canary callback URL and ``TargetMode="External"``.
2. Add a tiny ``<w:drawing>`` element referencing that ``rId`` at
the end of ``word/document.xml`` (just before ``</w:body>``).
Word and LibreOffice both fetch external image relationships when
the document is opened (subject to the user's "trusted source"
toggle, which most enterprise environments disable in favour of
"warn but allow").
We use stdlib ``zipfile`` only — no python-docx dependency — because
the surface we touch is two small XML files and we don't need any of
the higher-level abstractions.
"""
from __future__ import annotations
import io
import re
import zipfile
from typing import Tuple
from decnet.canary.base import (
CanaryArtifact,
CanaryContext,
CanaryInstrumenter,
InstrumenterRejectedError,
)
# Closing tag of a relationships part (case-insensitive, optional trailing
# whitespace inside the tag). New <Relationship> entries are spliced in
# immediately before this match.
_RELS_END = re.compile(rb"</Relationships\s*>", re.IGNORECASE)
# Closing tag of the document body; the canary drawing paragraph is spliced
# in immediately before this match.
_BODY_END = re.compile(rb"</w:body\s*>", re.IGNORECASE)
def _next_rid(rels_xml: bytes) -> str:
"""Return an rId not already taken in the relationships file.
Word's loader tolerates non-sequential ids, so we just pick one
well above the typical range to avoid collisions.
"""
used = set(m.group(1).decode() for m in re.finditer(rb'Id="(rId\d+)"', rels_xml))
for n in range(900, 9999):
rid = f"rId{n}"
if rid not in used:
return rid
raise InstrumenterRejectedError("DOCX has too many relationships to allocate a new rId")
def _inject_relationship(rels_xml: bytes, rid: str, url: str) -> bytes:
    """Insert an external image ``<Relationship>`` just before ``</Relationships>``.

    Args:
        rels_xml: Raw bytes of the relationships part to extend.
        rid: Relationship id to assign to the new entry.
        url: Target URL; it is XML-escaped before being embedded.

    Returns:
        *rels_xml* with the new element spliced in ahead of the closing tag.

    Raises:
        InstrumenterRejectedError: *rels_xml* has no ``</Relationships>`` tag.
    """
    # Local import: keeps this block self-contained without touching the
    # module's import list.
    from xml.sax.saxutils import escape

    # The URL lands inside a double-quoted XML attribute, so "&", "<", ">"
    # and '"' must be escaped or the rels part stops being well-formed.
    safe_url = escape(url, {'"': "&quot;"})
    rel = (
        f'<Relationship Id="{rid}" '
        f'Type="http://schemas.openxmlformats.org/officeDocument/2006/relationships/image" '
        f'Target="{safe_url}" TargetMode="External"/>'
    ).encode()
    match = _RELS_END.search(rels_xml)
    if not match:
        raise InstrumenterRejectedError(
            "DOCX rels file has no </Relationships>; refusing to mutate"
        )
    return rels_xml[:match.start()] + rel + rels_xml[match.start():]
def _drawing(rid: str) -> bytes:
# Minimal w:drawing tree referencing the external image at rid.
# Dimensions are 1 EMU x 1 EMU so the image is invisible; Word
# still fetches the resource on document load.
return (
'<w:p><w:r><w:drawing>'
'<wp:inline xmlns:wp="http://schemas.openxmlformats.org/drawingml/2006/wordprocessingDrawing">'
'<wp:extent cx="1" cy="1"/><wp:docPr id="1" name="canary"/>'
'<a:graphic xmlns:a="http://schemas.openxmlformats.org/drawingml/2006/main">'
'<a:graphicData uri="http://schemas.openxmlformats.org/drawingml/2006/picture">'
'<pic:pic xmlns:pic="http://schemas.openxmlformats.org/drawingml/2006/picture">'
'<pic:nvPicPr><pic:cNvPr id="1" name="canary"/><pic:cNvPicPr/></pic:nvPicPr>'
'<pic:blipFill>'
f'<a:blip xmlns:r="http://schemas.openxmlformats.org/officeDocument/2006/relationships" r:link="{rid}"/>'
'<a:stretch><a:fillRect/></a:stretch>'
'</pic:blipFill>'
'<pic:spPr><a:xfrm><a:off x="0" y="0"/><a:ext cx="1" cy="1"/></a:xfrm>'
'<a:prstGeom prst="rect"><a:avLst/></a:prstGeom></pic:spPr>'
'</pic:pic></a:graphicData></a:graphic></wp:inline>'
'</w:drawing></w:r></w:p>'
).encode()
def _inject_drawing(document_xml: bytes, rid: str) -> bytes:
    """Splice the canary drawing paragraph in front of ``</w:body>``.

    Args:
        document_xml: Raw bytes of ``word/document.xml``.
        rid: Relationship id the drawing should reference.

    Returns:
        *document_xml* with the drawing inserted before the first closing
        body tag.

    Raises:
        InstrumenterRejectedError: No ``</w:body>`` tag is present.
    """
    closing = _BODY_END.search(document_xml)
    if closing is None:
        raise InstrumenterRejectedError("DOCX document.xml has no </w:body>")
    cut = closing.start()
    before, after = document_xml[:cut], document_xml[cut:]
    return before + _drawing(rid) + after
def _mutate(blob: bytes, url: str) -> Tuple[bytes, str]:
    """Rewrite a DOCX so it references *url* as an external image.

    Every member of the input archive is copied to a new archive; the
    relationships part and the document body are replaced with versions
    carrying the injected relationship and drawing.

    Args:
        blob: Raw bytes of the uploaded DOCX.
        url: Callback URL to use as the relationship target.

    Returns:
        ``(mutated_bytes, rid)`` where *rid* is the relationship id allocated
        for the injected image.

    Raises:
        InstrumenterRejectedError: *blob* is not a zip, lacks
            ``word/document.xml`` or its rels part, or one of the injection
            helpers refuses the content.
    """
    rels_name = "word/_rels/document.xml.rels"
    doc_name = "word/document.xml"
    try:
        with zipfile.ZipFile(io.BytesIO(blob), "r") as zf:
            try:
                rels = zf.read(rels_name)
                doc = zf.read(doc_name)
            except KeyError as e:
                raise InstrumenterRejectedError(
                    f"DOCX missing expected member: {e.args[0]!r}"
                ) from e
            members = [(zi, zf.read(zi.filename)) for zi in zf.infolist()]
    except zipfile.BadZipFile as e:
        raise InstrumenterRejectedError("uploaded blob is not a valid DOCX zip") from e
    rid = _next_rid(rels)
    replacements = {
        rels_name: _inject_relationship(rels, rid, url),
        doc_name: _inject_drawing(doc, rid),
    }
    out = io.BytesIO()
    with zipfile.ZipFile(out, "w", zipfile.ZIP_DEFLATED) as zf_out:
        for zi, data in members:
            # Always write through the original ZipInfo, including for the
            # two rewritten parts: writing by bare filename would stamp them
            # with the current time and default attributes, making the
            # modified members stand out from the rest of the archive.
            zf_out.writestr(zi, replacements.get(zi.filename, data))
    return out.getvalue(), rid
class DocxInstrumenter(CanaryInstrumenter):
    """Instrumenter for Word documents (``.docx``)."""

    name = "docx"
    mime_prefixes = (
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    )

    def instrument(
        self, blob: bytes, ctx: CanaryContext, *, target_path: str,
    ) -> CanaryArtifact:
        """Inject the callback URL into *blob* and wrap the result.

        The callback URL is ``<http_base>/c/<callback_token>`` with any
        trailing slash removed from the base. Rejections raised by
        ``_mutate`` propagate unchanged.
        """
        base = ctx.http_base.rstrip("/")
        callback_url = f"{base}/c/{ctx.callback_token}"
        payload, rel_id = _mutate(blob, callback_url)
        note = f"injected external-image relationship {rel_id} -> {callback_url}"
        return CanaryArtifact(
            path=target_path,
            content=payload,
            mode=0o644,
            mtime_offset=-14 * 86400,
            instrumenter=self.name,
            notes=[note],
        )

View File

@@ -0,0 +1,45 @@
"""HTML instrumenter — append a 1×1 tracking pixel.
Stdlib-only. We don't parse the HTML; we just inject the ``<img>``
tag immediately before the closing ``</body>`` (or, failing that, at
the end of the document). Most renderers that support remote images
(email previewers, IDE doc previews, browsers) will fetch it as
soon as the document is opened.
"""
from __future__ import annotations
import re
from decnet.canary.base import CanaryArtifact, CanaryContext, CanaryInstrumenter
# Closing body tag, matched case-insensitively with optional whitespace
# before the ">".
_BODY_CLOSE = re.compile(rb"</body\s*>", re.IGNORECASE)


class HtmlInstrumenter(CanaryInstrumenter):
    """Instrumenter that plants a hidden 1x1 ``<img>`` in HTML documents."""

    name = "html"
    mime_prefixes = ("text/html", "application/xhtml+xml")

    def instrument(
        self, blob: bytes, ctx: CanaryContext, *, target_path: str,
    ) -> CanaryArtifact:
        """Insert the tracking pixel before ``</body>`` or at the end of *blob*.

        When no closing body tag exists the pixel is appended, after making
        sure the existing content ends with a newline.
        """
        base = ctx.http_base.rstrip("/")
        url = f"{base}/c/{ctx.callback_token}".encode()
        pixel = (
            b'<img src="%s" width="1" height="1" alt="" style="display:none">\n'
            % url
        )
        hit = _BODY_CLOSE.search(blob)
        if hit is None:
            padded = blob if blob.endswith(b"\n") else blob + b"\n"
            out = padded + pixel
            note = "appended 1x1 pixel (no </body> found)"
        else:
            at = hit.start()
            out = b"".join((blob[:at], pixel, blob[at:]))
            note = "injected 1x1 pixel before </body>"
        return CanaryArtifact(
            path=target_path,
            content=out,
            mode=0o644,
            mtime_offset=-7 * 86400,
            instrumenter=self.name,
            notes=[note, f"pixel src={url.decode()}"],
        )

View File

@@ -0,0 +1,72 @@
"""Image instrumenter — requires :mod:`PIL` (optional dependency).
For PNG we add tEXt chunks and for JPEG a comment segment carrying the
callback URL so ``exiftool`` / ``identify -verbose`` surface the slug
(GIF and other formats are re-encoded without an embedded marker), then
route the detection via a sibling **plain-text companion file**. The image
itself can't really embed an HTTP fetcher — image decoders don't
run network requests on decode — so the realistic detection surface
is "attacker exfils the image, runs metadata tools on it, hits our
URL when curious about the embedded marker."
When Pillow isn't installed we reject and direct the operator to
``passthrough`` (which preserves the bytes; the slug then lives in
the filename only).
"""
from __future__ import annotations
import io
from decnet.canary.base import (
CanaryArtifact,
CanaryContext,
CanaryInstrumenter,
InstrumenterRejectedError,
)
class ImageInstrumenter(CanaryInstrumenter):
    """Instrumenter that embeds the callback URL in image metadata via Pillow."""

    name = "image"
    mime_prefixes = ("image/png", "image/jpeg", "image/gif")

    def instrument(
        self, blob: bytes, ctx: CanaryContext, *, target_path: str,
    ) -> CanaryArtifact:
        """Re-encode *blob*, embedding the callback URL where the format allows.

        PNG output gets ``Comment`` and ``X-Canary`` text entries; JPEG
        output gets the URL as its comment. Any other format is re-encoded
        without metadata, and the artifact note says so.

        Raises:
            InstrumenterRejectedError: Pillow is not installed, or opening or
                saving the image fails for any reason.
        """
        try:
            from PIL import Image, PngImagePlugin  # type: ignore[import-not-found]
        except ImportError as e:
            raise InstrumenterRejectedError(
                "image instrumenter requires Pillow; install it (`pip "
                "install Pillow`) or re-upload the artifact with "
                "kind=passthrough so it ships unmodified."
            ) from e
        slug_url = f"{ctx.http_base.rstrip('/')}/c/{ctx.callback_token}"
        embedded_note = (
            f"image metadata carries {slug_url} (slug={ctx.callback_token})"
        )
        try:
            buf_out = io.BytesIO()
            # Context manager releases the decoder's resources once the
            # re-encode is done.
            with Image.open(io.BytesIO(blob)) as img:
                fmt = (img.format or "").upper()
                if fmt == "PNG":
                    meta = PngImagePlugin.PngInfo()
                    meta.add_text("Comment", f"reference: {slug_url}")
                    meta.add_text("X-Canary", ctx.callback_token)
                    img.save(buf_out, format="PNG", pnginfo=meta)
                    note = embedded_note
                elif fmt in ("JPEG", "JPG"):
                    # Pillow encodes JPEG comments via the ``comment`` kwarg.
                    img.save(buf_out, format="JPEG", comment=slug_url.encode())
                    note = embedded_note
                else:
                    # GIF and friends — Pillow doesn't expose comment metadata
                    # uniformly. Re-encode as-is and skip the metadata embed;
                    # the note must not claim an embed that did not happen.
                    img.save(buf_out, format=fmt or "PNG")
                    note = (
                        f"re-encoded as {fmt or 'PNG'} without embedded "
                        f"metadata; {slug_url} is not carried in the image"
                    )
            mutated = buf_out.getvalue()
        except Exception as e:
            # Deliberately broad: any decode/encode failure becomes a
            # rejection for the caller rather than an unhandled error.
            raise InstrumenterRejectedError(f"failed to instrument image: {e!s}") from e
        return CanaryArtifact(
            path=target_path,
            content=mutated,
            mode=0o644,
            mtime_offset=-86400 * 30,
            instrumenter=self.name,
            notes=[note],
        )

View File

@@ -0,0 +1,37 @@
"""Passthrough instrumenter — bytes go to disk unchanged.
Used as the dispatch fallback for content types we can't safely
mutate (random binary blobs, container images, archives we don't
recognise). In passthrough mode the only callback surface is the
:attr:`CanaryToken.placement_path` itself: the operator must use a
DNS-callback token whose slug appears in the filename, so a
listing/access at the OS level resolves the slug as part of the
path (e.g. ``/etc/<slug>.canary.example.test/secrets.bin``) when
the attacker greps for hostnames in their loot.
The instrumenter does not enforce that — the API does: when it sees
``instrumenter=passthrough`` with ``kind=http`` it returns 400.
"""
from __future__ import annotations
from decnet.canary.base import CanaryArtifact, CanaryContext, CanaryInstrumenter
class PassthroughInstrumenter(CanaryInstrumenter):
    """Instrumenter that ships the uploaded bytes exactly as received."""

    name = "passthrough"
    mime_prefixes = ()  # dispatched by fallback in pick_instrumenter_for_mime

    def instrument(
        self, blob: bytes, ctx: CanaryContext, *, target_path: str,
    ) -> CanaryArtifact:
        """Wrap *blob* unchanged; *ctx* is not consulted."""
        reminder = (
            "passthrough: bytes unchanged — only DNS-callback tokens "
            "trip detection (slug must live in the placement path)"
        )
        return CanaryArtifact(
            path=target_path,
            content=blob,
            mode=0o644,
            mtime_offset=-7 * 86400,
            instrumenter=self.name,
            notes=[reminder],
        )

View File

@@ -0,0 +1,76 @@
"""PDF instrumenter — requires :mod:`pikepdf` (optional dependency).
PDF embedding is non-trivial: the cleanest place to put a callback
is an ``/AA`` (additional actions) ``/O`` (open) entry on the
catalog or a ``/URI`` action on a link annotation. Either path
needs proper xref-table updates — pikepdf handles that for us.
If pikepdf isn't available in the environment the instrumenter
raises :class:`InstrumenterRejectedError` so the API can return a
clear 400 directing the operator to either install pikepdf or
re-upload as ``passthrough``.
We don't ship a stdlib fallback because every "naive" PDF mutation
I'm aware of (appending raw bytes, splicing into the trailer, etc.)
breaks the document's xref table and trips a "file is corrupt"
warning in modern viewers — which the attacker will absolutely
notice.
"""
from __future__ import annotations
from decnet.canary.base import (
CanaryArtifact,
CanaryContext,
CanaryInstrumenter,
InstrumenterRejectedError,
)
class PdfInstrumenter(CanaryInstrumenter):
    """Instrumenter that installs an ``/OpenAction`` URI action via pikepdf."""

    name = "pdf"
    mime_prefixes = ("application/pdf",)

    def instrument(
        self, blob: bytes, ctx: CanaryContext, *, target_path: str,
    ) -> CanaryArtifact:
        """Set the catalog's ``/OpenAction`` to a URI action for the callback URL.

        Raises:
            InstrumenterRejectedError: pikepdf is not installed, or opening,
                editing or saving the PDF fails for any reason.
        """
        try:
            import pikepdf  # type: ignore[import-not-found]
        except ImportError as missing:
            raise InstrumenterRejectedError(
                "PDF instrumenter requires pikepdf; install it (`pip "
                "install pikepdf`) or re-upload the artifact with "
                "kind=passthrough so it ships unmodified."
            ) from missing

        base = ctx.http_base.rstrip("/")
        url = f"{base}/c/{ctx.callback_token}"
        try:
            import io

            rewritten = io.BytesIO()
            with pikepdf.open(io.BytesIO(blob)) as pdf:
                # A URI action fired on document open. Viewers that prompt
                # first still surface an interaction; viewers that auto-allow
                # fetch the URL silently.
                open_action = pikepdf.Dictionary(
                    Type=pikepdf.Name("/Action"),
                    S=pikepdf.Name("/URI"),
                    URI=pikepdf.String(url),
                )
                pdf.Root[pikepdf.Name("/OpenAction")] = open_action
                pdf.save(rewritten)
            mutated = rewritten.getvalue()
        except Exception as failure:
            raise InstrumenterRejectedError(
                f"failed to instrument PDF: {failure!s}"
            ) from failure
        return CanaryArtifact(
            path=target_path,
            content=mutated,
            mode=0o644,
            mtime_offset=-14 * 86400,
            instrumenter=self.name,
            notes=[f"installed /OpenAction /URI -> {url}"],
        )

View File

@@ -0,0 +1,82 @@
"""Plain-text / config-file instrumenter.
Two embedding strategies, picked in order:
1. **Token substitution.** If the blob contains the literal
placeholder ``{{CANARY_URL}}`` or ``{{CANARY_HOST}}``, replace it.
This gives operators full control over where the slug lands —
they can pre-edit the file with placeholders before uploading.
2. **Append.** Otherwise, append a comment line that mentions the
callback URL. The comment style adapts to the file's apparent
syntax (``#`` for shell/yaml/python/dockerfile, ``//`` for json5/
javascript-ish, ``;`` for ini).
Operators who want neither behavior should upload the file as
``passthrough``.
"""
from __future__ import annotations
from decnet.canary.base import CanaryArtifact, CanaryContext, CanaryInstrumenter
_HASH_HINTS = (b"\n#", b"#!/", b"---\n", b"version:", b"FROM ")
_SLASH_HINTS = (b"//", b"function ", b"const ", b"let ", b"var ")
_SEMI_HINTS = (b"[default]", b"[section]", b"\n[")
def _comment_prefix(blob: bytes) -> bytes:
head = blob[:512]
if any(h in head for h in _SEMI_HINTS):
return b"; "
if any(h in head for h in _SLASH_HINTS):
return b"// "
# Default to # — the most common comment glyph across config files
# we'd plausibly canary.
if any(h in head for h in _HASH_HINTS) or True:
return b"# "
return b"# "
class PlainInstrumenter(CanaryInstrumenter):
    """Instrumenter for text and config files: placeholders, else a comment."""

    name = "plain"
    mime_prefixes = ("text/", "application/json", "application/yaml", "application/toml")

    def instrument(
        self, blob: bytes, ctx: CanaryContext, *, target_path: str,
    ) -> CanaryArtifact:
        """Substitute canary placeholders in *blob* or append a comment line.

        ``{{CANARY_URL}}`` is replaced with the callback URL. ``{{CANARY_HOST}}``
        is replaced with ``<token>.<dns_zone>`` only when ``ctx.dns_zone`` is
        set. If nothing was substituted, a comment line mentioning the URL is
        appended using the prefix chosen by ``_comment_prefix``.
        """
        root = ctx.http_base.rstrip("/")
        url_bytes = f"{root}/c/{ctx.callback_token}".encode()
        if ctx.dns_zone:
            host_bytes = f"{ctx.callback_token}.{ctx.dns_zone}".encode()
        else:
            host_bytes = b""

        notes: list[str] = []
        out = blob
        substitutions = (
            (b"{{CANARY_URL}}", url_bytes),
            (b"{{CANARY_HOST}}", host_bytes),
        )
        for placeholder, value in substitutions:
            # An empty value (no DNS zone) leaves the placeholder alone.
            if value and placeholder in blob:
                out = out.replace(placeholder, value)
                notes.append(f"substituted {placeholder.decode()} -> {value.decode()}")

        if not notes:
            # No placeholders — append a comment line at the end.
            marker = _comment_prefix(blob)
            comment = b"".join(
                (b"\n", marker, b"see ", url_bytes, b" for the latest version\n")
            )
            if not out.endswith(b"\n"):
                out = out + b"\n"
            out = out + comment
            notes.append(f"appended comment line carrying {url_bytes.decode()}")

        return CanaryArtifact(
            path=target_path,
            content=out,
            mode=0o644,
            mtime_offset=-7 * 86400,
            instrumenter=self.name,
            notes=notes,
        )

View File

@@ -0,0 +1,95 @@
"""XLSX instrumenter — embed an external-image link.
XLSX is structurally identical to DOCX (Office Open XML zip). The
injection target is the workbook's relationships file
(``xl/_rels/workbook.xml.rels``). We add an external image
relationship there; Excel/LibreOffice fetch external images on
workbook open in the same way Word does.
We don't inject a ``<drawing>`` element into a sheet because that
requires touching ``xl/worksheets/sheetN.xml`` *and* allocating a new
``xl/drawings/drawingN.xml`` part — much higher chance of mangling
the file. An orphan external image relationship is enough: many
Office viewers fetch all relationships at open time regardless of
whether they're referenced from a sheet.
If the operator wants a stronger trigger (image visible in the
sheet, fetched even by viewers that lazy-load external resources)
they should embed the slug as a hyperlink cell content via the
``plain``/``passthrough`` instrumenters.
"""
from __future__ import annotations
import io
import zipfile
from typing import Tuple
from decnet.canary.base import (
CanaryArtifact,
CanaryContext,
CanaryInstrumenter,
InstrumenterRejectedError,
)
from decnet.canary.instrumenters.docx import _inject_relationship, _next_rid
# Relationship parts we are willing to extend, in order of preference.
_RELS_PATHS = (
    "xl/_rels/workbook.xml.rels",
    "xl/_rels/sharedStrings.xml.rels",
)


def _mutate(blob: bytes, url: str) -> Tuple[bytes, str, str]:
    """Rewrite an XLSX so one of its rels parts references *url*.

    Args:
        blob: Raw bytes of the uploaded XLSX.
        url: Callback URL to use as the relationship target.

    Returns:
        ``(mutated_bytes, rid, rels_path)`` where *rid* is the relationship
        id that was allocated and *rels_path* is the archive member that
        received it.

    Raises:
        InstrumenterRejectedError: *blob* is not a zip, contains none of
            ``_RELS_PATHS``, or the injection helpers refuse the rels part.
    """
    try:
        with zipfile.ZipFile(io.BytesIO(blob), "r") as zf:
            members = [(zi, zf.read(zi.filename)) for zi in zf.infolist()]
    except zipfile.BadZipFile as e:
        raise InstrumenterRejectedError("uploaded blob is not a valid XLSX zip") from e
    # Pick by the order declared in _RELS_PATHS rather than by the order in
    # which members happen to be stored, so the workbook rels part wins
    # whenever it exists.
    present = {zi.filename for zi, _ in members}
    target_rels = next((path for path in _RELS_PATHS if path in present), None)
    if target_rels is None:
        raise InstrumenterRejectedError(
            "XLSX has no workbook relationships file to mutate"
        )
    rid = ""
    out = io.BytesIO()
    with zipfile.ZipFile(out, "w", zipfile.ZIP_DEFLATED) as zf_out:
        for zi, data in members:
            if zi.filename == target_rels:
                rid = _next_rid(data)
                data = _inject_relationship(data, rid, url)
            # Reusing the original ZipInfo keeps each member's stored
            # metadata (name, timestamp, compression type).
            zf_out.writestr(zi, data)
    return out.getvalue(), rid, target_rels
class XlsxInstrumenter(CanaryInstrumenter):
    """Instrumenter for Excel workbooks (``.xlsx``)."""

    name = "xlsx"
    mime_prefixes = (
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    )

    def instrument(
        self, blob: bytes, ctx: CanaryContext, *, target_path: str,
    ) -> CanaryArtifact:
        """Inject the callback URL into *blob* and wrap the result.

        The callback URL is ``<http_base>/c/<callback_token>`` with any
        trailing slash removed from the base. Rejections raised by
        ``_mutate`` propagate unchanged.
        """
        base = ctx.http_base.rstrip("/")
        callback_url = f"{base}/c/{ctx.callback_token}"
        payload, rel_id, rels_member = _mutate(blob, callback_url)
        note = (
            f"injected external-image relationship {rel_id} into "
            f"{rels_member} -> {callback_url}"
        )
        return CanaryArtifact(
            path=target_path,
            content=payload,
            mode=0o644,
            mtime_offset=-14 * 86400,
            instrumenter=self.name,
            notes=[note],
        )

88
tests/canary/conftest.py Normal file
View File

@@ -0,0 +1,88 @@
"""Shared fixtures for canary tests — minimal DOCX/XLSX/HTML/PDF fixtures.
We synthesise the OOXML zips inline rather than checking real binary
fixtures into the repo. Keeps the test surface portable and the diff
reviewable; the smallest valid DOCX is ~12 files but Word/LibreOffice
both accept a stripped-down skeleton with just ``[Content_Types].xml``,
``_rels/.rels``, ``word/document.xml``, and ``word/_rels/document.xml.rels``.
"""
from __future__ import annotations
import io
import zipfile
import pytest
# [Content_Types].xml for the skeleton DOCX: default types for .xml/.rels
# plus an override declaring /word/document.xml as the main document part.
_DOCX_CONTENT_TYPES = (
    '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>'
    '<Types xmlns="http://schemas.openxmlformats.org/package/2006/content-types">'
    '<Default Extension="xml" ContentType="application/xml"/>'
    '<Default Extension="rels" ContentType="application/vnd.openxmlformats-package.relationships+xml"/>'
    '<Override PartName="/word/document.xml" '
    'ContentType="application/vnd.openxmlformats-officedocument.wordprocessingml.document.main+xml"/>'
    '</Types>'
)
# Package-level _rels/.rels pointing at word/document.xml. The XLSX fixture
# reuses this string with the target swapped for xl/workbook.xml.
_DOCX_PACKAGE_RELS = (
    '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>'
    '<Relationships xmlns="http://schemas.openxmlformats.org/package/2006/relationships">'
    '<Relationship Id="rId1" '
    'Type="http://schemas.openxmlformats.org/officeDocument/2006/relationships/officeDocument" '
    'Target="word/document.xml"/>'
    '</Relationships>'
)
# word/document.xml with a single paragraph of text inside <w:body>.
_DOCX_DOCUMENT = (
    '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>'
    '<w:document xmlns:w="http://schemas.openxmlformats.org/wordprocessingml/2006/main">'
    '<w:body><w:p><w:r><w:t>Existing content.</w:t></w:r></w:p></w:body>'
    '</w:document>'
)
# word/_rels/document.xml.rels with no relationships yet.
_DOCX_DOCUMENT_RELS = (
    '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>'
    '<Relationships xmlns="http://schemas.openxmlformats.org/package/2006/relationships">'
    '</Relationships>'
)
@pytest.fixture
def minimal_docx() -> bytes:
    """Build the four-member skeleton DOCX in memory and return its bytes."""
    parts = (
        ("[Content_Types].xml", _DOCX_CONTENT_TYPES),
        ("_rels/.rels", _DOCX_PACKAGE_RELS),
        ("word/document.xml", _DOCX_DOCUMENT),
        ("word/_rels/document.xml.rels", _DOCX_DOCUMENT_RELS),
    )
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as archive:
        for member_name, xml in parts:
            archive.writestr(member_name, xml)
    return buf.getvalue()
# [Content_Types].xml for the skeleton XLSX: default types for .xml/.rels
# plus an override declaring /xl/workbook.xml as the main workbook part.
_XLSX_CONTENT_TYPES = (
    '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>'
    '<Types xmlns="http://schemas.openxmlformats.org/package/2006/content-types">'
    '<Default Extension="xml" ContentType="application/xml"/>'
    '<Default Extension="rels" ContentType="application/vnd.openxmlformats-package.relationships+xml"/>'
    '<Override PartName="/xl/workbook.xml" '
    'ContentType="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet.main+xml"/>'
    '</Types>'
)
# xl/_rels/workbook.xml.rels with no relationships yet.
_XLSX_WORKBOOK_RELS = (
    '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>'
    '<Relationships xmlns="http://schemas.openxmlformats.org/package/2006/relationships">'
    '</Relationships>'
)
@pytest.fixture
def minimal_xlsx() -> bytes:
    """Build the four-member skeleton XLSX in memory and return its bytes."""
    # The package rels are the DOCX ones with the target part swapped.
    package_rels = _DOCX_PACKAGE_RELS.replace("word/document.xml", "xl/workbook.xml")
    parts = (
        ("[Content_Types].xml", _XLSX_CONTENT_TYPES),
        ("_rels/.rels", package_rels),
        ("xl/workbook.xml", "<workbook/>"),
        ("xl/_rels/workbook.xml.rels", _XLSX_WORKBOOK_RELS),
    )
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as archive:
        for member_name, xml in parts:
            archive.writestr(member_name, xml)
    return buf.getvalue()

View File

@@ -0,0 +1,173 @@
"""Coverage for the operator-upload instrumenters.
Each instrumenter is round-tripped against a small, real-shaped
fixture. We assert:
* the callback URL ends up somewhere in the mutated bytes;
* the output still parses (zip stays a valid zip; HTML stays
reasonable);
* the rejection paths surface :class:`InstrumenterRejectedError`
with a useful message.
"""
from __future__ import annotations
import io
import zipfile
import pytest
from decnet.canary import CanaryContext, get_instrumenter
from decnet.canary.base import InstrumenterRejectedError
def _ctx(slug: str = "slug-abc") -> CanaryContext:
    """Return a test context whose callback token is *slug*."""
    settings = {
        "callback_token": slug,
        "http_base": "https://canary.example.test",
        "dns_zone": "canary.example.test",
        "persona": "linux",
    }
    return CanaryContext(**settings)
# ----------------------- passthrough ------------------------------------
def test_passthrough_preserves_bytes() -> None:
    """Passthrough returns the payload, path and instrumenter name untouched."""
    payload = b"\x00\x01\x02bin"
    destination = "/tmp/x.bin"
    artifact = get_instrumenter("passthrough").instrument(
        payload, _ctx(), target_path=destination,
    )
    assert artifact.content == payload
    assert artifact.path == destination
    assert artifact.instrumenter == "passthrough"
# ----------------------- plain ------------------------------------------
def test_plain_substitutes_url_placeholder() -> None:
    """Both placeholders are replaced and neither survives in the output."""
    ins = get_instrumenter("plain")
    blob = b"api: {{CANARY_URL}}\nhost: {{CANARY_HOST}}\n"
    out = ins.instrument(blob, _ctx("slugXYZ"), target_path="/etc/x.yaml")
    assert b"https://canary.example.test/c/slugXYZ" in out.content
    assert b"slugXYZ.canary.example.test" in out.content
    assert b"{{CANARY_URL}}" not in out.content
    # The host placeholder must be consumed too; a leftover literal would be
    # an obvious tell in the planted file.
    assert b"{{CANARY_HOST}}" not in out.content
def test_plain_appends_when_no_placeholder() -> None:
    """Without placeholders the URL is appended after the original content."""
    original = b"key=value\n"
    artifact = get_instrumenter("plain").instrument(
        original, _ctx("s1"), target_path="/etc/x.env",
    )
    produced = artifact.content
    assert b"https://canary.example.test/c/s1" in produced
    # The untouched input must still lead the output.
    assert produced.startswith(original)
@pytest.mark.parametrize(
    "head, expect_prefix",
    [
        (b"[default]\nfoo=1\n", b"; "),
        (b"// js code\nconst x = 1;\n", b"// "),
        (b"#!/bin/bash\necho hi\n", b"# "),
    ],
)
def test_plain_picks_comment_prefix(head: bytes, expect_prefix: bytes) -> None:
    """The appended comment uses the prefix matching the file's syntax."""
    artifact = get_instrumenter("plain").instrument(head, _ctx(), target_path="/etc/x")
    # Everything past the original bytes is what the instrumenter added.
    suffix = artifact.content[len(head):]
    first_comment = suffix.lstrip(b"\n")
    assert first_comment.startswith(expect_prefix)
# ----------------------- html -------------------------------------------
def test_html_injects_pixel_before_body_close() -> None:
    """The pixel lands ahead of ``</body>`` and the markup is preserved."""
    page = b"<html><body><h1>hi</h1></body></html>"
    artifact = get_instrumenter("html").instrument(
        page, _ctx("slugH"), target_path="/srv/x.html",
    )
    rendered = artifact.content
    assert b"https://canary.example.test/c/slugH" in rendered
    # Pixel sits before </body>, not after.
    close_at = rendered.index(b"</body>")
    img_at = rendered.index(b"<img ")
    assert img_at < close_at
    # Original markup survives intact.
    assert b"<h1>hi</h1>" in rendered
def test_html_appends_pixel_when_body_missing() -> None:
    """Without ``</body>`` the pixel tag is appended at the end of the file."""
    ins = get_instrumenter("html")
    out = ins.instrument(b"<p>no body</p>", _ctx(), target_path="/srv/x.html")
    # The original asserted ``a or a`` with two identical operands; a single
    # check says the same thing.
    assert out.content.endswith(b">\n")
    assert b"<img" in out.content
# ----------------------- docx -------------------------------------------
def test_docx_injects_external_image_relationship(minimal_docx: bytes) -> None:
    """The DOCX output gains an external image relationship and a drawing."""
    artifact = get_instrumenter("docx").instrument(
        minimal_docx, _ctx("slugD"), target_path="/x/r.docx",
    )
    # Output is still a valid zip we can re-open.
    with zipfile.ZipFile(io.BytesIO(artifact.content), "r") as archive:
        rels_text = archive.read("word/_rels/document.xml.rels").decode()
        body_text = archive.read("word/document.xml").decode()
    for needle in (
        "https://canary.example.test/c/slugD",
        'TargetMode="External"',
        "image",
    ):
        assert needle in rels_text
    # Drawing is embedded in the document body, before </w:body>.
    assert "<w:drawing>" in body_text
    assert body_text.index("<w:drawing>") < body_text.index("</w:body>")
def test_docx_rejects_non_zip() -> None:
    """Bytes that are not a zip archive are rejected with a clear message."""
    garbage = b"not a docx at all"
    instrumenter = get_instrumenter("docx")
    with pytest.raises(InstrumenterRejectedError, match="not a valid DOCX"):
        instrumenter.instrument(garbage, _ctx(), target_path="/x")
def test_docx_rejects_zip_missing_members() -> None:
    """A zip lacking the Word parts is rejected as missing a member."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as archive:
        archive.writestr("readme.txt", "hello")
    not_a_docx = buf.getvalue()
    instrumenter = get_instrumenter("docx")
    with pytest.raises(InstrumenterRejectedError, match="missing expected member"):
        instrumenter.instrument(not_a_docx, _ctx(), target_path="/x")
# ----------------------- xlsx -------------------------------------------
def test_xlsx_injects_relationship(minimal_xlsx: bytes) -> None:
    """The workbook rels part gains an external relationship to the URL."""
    artifact = get_instrumenter("xlsx").instrument(
        minimal_xlsx, _ctx("slugX"), target_path="/x/r.xlsx",
    )
    with zipfile.ZipFile(io.BytesIO(artifact.content), "r") as archive:
        rels_text = archive.read("xl/_rels/workbook.xml.rels").decode()
    assert "https://canary.example.test/c/slugX" in rels_text
    assert 'TargetMode="External"' in rels_text
def test_xlsx_rejects_zip_without_workbook_rels() -> None:
    """A zip with no workbook rels part is rejected."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as archive:
        archive.writestr("readme.txt", "hello")
    not_an_xlsx = buf.getvalue()
    instrumenter = get_instrumenter("xlsx")
    with pytest.raises(InstrumenterRejectedError, match="no workbook relationships"):
        instrumenter.instrument(not_an_xlsx, _ctx(), target_path="/x")
# ----------------------- pdf / image (optional dep) ---------------------
def test_pdf_rejects_when_pikepdf_missing(monkeypatch: pytest.MonkeyPatch) -> None:
    """The PDF instrumenter rejects with a pikepdf hint when the import fails.

    The missing dependency is simulated, so the guard is exercised whether
    or not pikepdf is installed. Previously the test skipped itself whenever
    pikepdf was present.
    """
    import sys

    # A ``None`` entry in ``sys.modules`` makes ``import pikepdf`` raise
    # ImportError; monkeypatch restores the original entry afterwards.
    monkeypatch.setitem(sys.modules, "pikepdf", None)
    ins = get_instrumenter("pdf")
    with pytest.raises(InstrumenterRejectedError, match="pikepdf"):
        ins.instrument(b"%PDF-1.4\n", _ctx(), target_path="/x.pdf")
def test_image_rejects_when_pillow_missing(monkeypatch: pytest.MonkeyPatch) -> None:
    """The image instrumenter rejects with a Pillow hint when the import fails.

    The missing dependency is simulated, so the guard is exercised whether
    or not Pillow is installed. Previously the test skipped itself whenever
    Pillow was present.
    """
    import sys

    # A ``None`` entry in ``sys.modules`` makes ``from PIL import ...`` raise
    # ImportError; monkeypatch restores the original entry afterwards.
    monkeypatch.setitem(sys.modules, "PIL", None)
    ins = get_instrumenter("image")
    with pytest.raises(InstrumenterRejectedError, match="Pillow"):
        ins.instrument(b"\x89PNG\r\n", _ctx(), target_path="/x.png")