feat(canary): synthesised-artifact generators + tests
Five built-in generators that produce deterministic fake artifacts
keyed by the token slug:
- aws_creds — passive [default]/[prod] credentials block, no
callback wiring (AWS-key tokens require an external
trap, which is post-v1)
- git_config — .git/config with origin url = http_base/c/<slug>/repo.git
- env_file — .env with API_BASE_URL + WEBHOOK_NOTIFY_URL embedding
the callback URL plus inert realism filler
- ssh_key — PEM-shaped fake private key whose host comment carries
<slug>.<dns_zone> when DNS is deployed, else the
http_base host
- honeydoc — minimal HTML report with a 1x1 tracking-pixel <img>
whose src is the callback URL; fallback for the
deploy-time baseline before the operator uploads a
real DOCX/PDF
Tests assert byte-stability (same ctx -> same bytes), slug presence
in the embedded fields, that aws_creds is intentionally URL-free,
and that every artifact carries operator-facing notes for the
preview endpoint.
This commit is contained in:
7
decnet/canary/generators/__init__.py
Normal file
7
decnet/canary/generators/__init__.py
Normal file
@@ -0,0 +1,7 @@
|
||||
"""Built-in canary generators (synthesised fake artifacts).
|
||||
|
||||
Concrete classes live in sibling modules and are imported lazily by
|
||||
:func:`decnet.canary.factory.get_generator` to keep the import-time
|
||||
cost of :mod:`decnet.canary` cheap for callers that only need the
|
||||
ABCs.
|
||||
"""
|
||||
86
decnet/canary/generators/aws_creds.py
Normal file
86
decnet/canary/generators/aws_creds.py
Normal file
@@ -0,0 +1,86 @@
|
||||
"""Fake ``~/.aws/credentials`` block (passive bait).
|
||||
|
||||
This is the **passive** variant — no callback wiring. An attacker
|
||||
who exfils these keys can't trip a detection unless we run a real
|
||||
AWS account with a deny-all CloudTrail listener (post-v1). The
|
||||
realism is the point: the file looks like a routinely used credentials
|
||||
file, so the rest of the decky's persona feels lived-in.
|
||||
|
||||
If the operator picks ``kind="aws_passive"`` we accept that no slug
|
||||
will be embedded. If they pick ``kind="http"`` or ``kind="dns"`` for
|
||||
this generator, the API will reject the combination with a 400 — AWS
|
||||
keys have no plausible field where a URL or hostname survives a
|
||||
``grep -E '[A-Z0-9]{20}'`` smell test.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
from secrets import token_urlsafe
|
||||
|
||||
from decnet.canary.base import CanaryArtifact, CanaryContext, CanaryGenerator
|
||||
|
||||
|
||||
# Stable AWS-style key body derived from the slug. Keeping the
|
||||
# generator deterministic (per-slug) means re-seeding produces the
|
||||
# same bytes — the planter is naturally idempotent and an operator
|
||||
# who runs ``decnet canary verify`` can re-derive the expected file
|
||||
# without touching the DB.
|
||||
|
||||
def _fake_access_key(seed: str) -> str:
    """Return a deterministic, AWS-shaped access key ID for *seed*.

    The result is ``AKIA`` followed by 16 characters (20 in total). The
    16-character body is the base32 encoding of SHA-256(seed), so it draws
    on ``A-Z`` and ``2-7`` instead of the hex-only ``0-9A-F`` range, which
    would look conspicuously narrow next to genuine key IDs.
    """
    # Function-scope import: this module only imports hashlib at the top.
    import base64

    digest = hashlib.sha256(seed.encode()).digest()
    # 32 digest bytes encode to 52 base32 characters before any "=" padding,
    # so the first 16 are always alphabet characters.
    return "AKIA" + base64.b32encode(digest)[:16].decode("ascii")
|
||||
|
||||
|
||||
def _fake_secret_key(seed: str) -> str:
    """Return a deterministic 40-character secret key for *seed*.

    SHA-256 of ``"secret:" + seed`` is base64-encoded and cut to 40
    characters, so the value is stable per slug and never touches
    os.urandom.
    """
    import base64

    digest = hashlib.sha256(f"secret:{seed}".encode()).digest()
    # 32 digest bytes encode to 44 base64 characters; keeping the first 40
    # also drops the trailing "=" padding.
    encoded = base64.b64encode(digest).decode()
    return encoded[:40]
|
||||
|
||||
|
||||
class AWSCredsGenerator(CanaryGenerator):
    """Generator for a passive, two-profile fake AWS credentials file."""

    name = "aws_creds"

    def generate(self, ctx: CanaryContext) -> CanaryArtifact:
        """Render ``[default]`` and ``[prod]`` profiles keyed by the token.

        Both key pairs are derived from ``ctx.callback_token`` through the
        hash-based helpers; the token appears verbatim only in the notes,
        and no callback URL is written anywhere.
        """
        slug = ctx.callback_token
        prod_seed = "prod-" + slug
        profile_lines = [
            "[default]",
            f"aws_access_key_id = {_fake_access_key(slug)}",
            f"aws_secret_access_key = {_fake_secret_key(slug)}",
            "region = us-east-1",
            "",
            "[prod]",
            f"aws_access_key_id = {_fake_access_key(prod_seed)}",
            f"aws_secret_access_key = {_fake_secret_key(prod_seed)}",
            "region = us-west-2",
        ]
        rendered = "\n".join(profile_lines) + "\n"
        two_weeks = 14 * 86400  # backdate the file so it looks lived-in
        return CanaryArtifact(
            path="",  # caller (planter) fills this from CanaryToken.placement_path
            content=rendered.encode("utf-8"),
            mode=0o600,
            mtime_offset=-two_weeks,
            generator=self.name,
            notes=[
                "fake AWS keys; no callback embedded — passive bait only",
                f"derived deterministically from slug={slug}",
            ],
        )
|
||||
|
||||
|
||||
# Re-exported so the slug helper is reusable from the
# instrumenters/passthrough module without an internal import path.
__all__ = ["AWSCredsGenerator", "_fake_access_key", "_fake_secret_key"]


# ``token_urlsafe`` is imported at the top of this module, but no function
# here calls it — the key helpers derive everything from SHA-256 instead.
# Referencing it once keeps the import from being reported as unused.
_ = token_urlsafe
|
||||
56
decnet/canary/generators/env_file.py
Normal file
56
decnet/canary/generators/env_file.py
Normal file
@@ -0,0 +1,56 @@
|
||||
"""Fake ``.env`` with embedded callback URLs.
|
||||
|
||||
Modern web stacks read environment variables for everything from
|
||||
database DSNs to webhook URLs, so dropping a few realistic-looking
|
||||
``KEY=value`` pairs alongside the canary URL is unremarkable. The
|
||||
slug appears in two fields:
|
||||
|
||||
* ``API_BASE_URL`` — the obvious one; an attacker scripting against
|
||||
the credentials hits the worker on first invocation.
|
||||
* ``WEBHOOK_NOTIFY_URL`` — secondary, in case the attacker greps for
|
||||
``WEBHOOK`` and pivots there.
|
||||
|
||||
Other fields (``DB_PASSWORD``, ``REDIS_URL``, ``JWT_SECRET``) are
|
||||
plausible but inert — they're realism filler, not detection
|
||||
mechanisms.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
|
||||
from decnet.canary.base import CanaryArtifact, CanaryContext, CanaryGenerator
|
||||
|
||||
|
||||
def _stable_token(seed: str, prefix: str = "") -> str:
    """Return the first 32 hex characters of SHA-256(prefix + seed)."""
    digest = hashlib.sha256(f"{prefix}{seed}".encode())
    return digest.hexdigest()[:32]
|
||||
|
||||
|
||||
class EnvFileGenerator(CanaryGenerator):
    """Generator for a fake ``.env`` whose URL fields carry the callback."""

    name = "env_file"

    def generate(self, ctx: CanaryContext) -> CanaryArtifact:
        """Render the ``.env`` body for ``ctx``.

        ``API_BASE_URL`` and ``WEBHOOK_NOTIFY_URL`` embed the callback URL;
        the remaining values are hash-derived or constant filler.
        """
        token = ctx.callback_token
        # A trailing slash on http_base is dropped before joining.
        callback = "/".join((ctx.http_base.rstrip("/"), "c", token))
        webhook = f"{callback}/webhook"
        redis_password = _stable_token(token, "redis:")[:16]
        entries = [
            "# Production environment — DO NOT COMMIT",
            f"API_BASE_URL={callback}",
            f"WEBHOOK_NOTIFY_URL={webhook}",
            f"DB_PASSWORD={_stable_token(token, 'db:')}",
            f"REDIS_URL=redis://:{redis_password}@redis.internal:6379/0",
            f"JWT_SECRET={_stable_token(token, 'jwt:')}",
            "LOG_LEVEL=info",
            "ENVIRONMENT=production",
        ]
        rendered = "".join(f"{entry}\n" for entry in entries)
        one_week = 7 * 86400  # last edited a week ago
        return CanaryArtifact(
            path="",
            content=rendered.encode("utf-8"),
            mode=0o600,
            mtime_offset=-one_week,
            generator=self.name,
            notes=[
                f"API_BASE_URL embeds {callback}",
                f"WEBHOOK_NOTIFY_URL embeds {webhook}",
            ],
        )
|
||||
53
decnet/canary/generators/git_config.py
Normal file
53
decnet/canary/generators/git_config.py
Normal file
@@ -0,0 +1,53 @@
|
||||
"""Fake ``.git/config`` with an attacker-bait remote URL.
|
||||
|
||||
The ``[remote "origin"]`` ``url`` field is the natural place to embed
|
||||
an HTTP-callback URL: it's normal for git remotes to be HTTPS, the
|
||||
URL is read by every git command an attacker runs (``git pull``,
|
||||
``git fetch``, ``git remote -v``), and the slug fits naturally as
|
||||
part of a path.
|
||||
|
||||
The generator emits a plausible private-mirror remote (``git.<org>``
|
||||
or the canary host's hostname) so an attacker doesn't immediately
|
||||
recognise it as a honeypot. The slug ends up in the URL path:
|
||||
|
||||
[remote "origin"]
|
||||
url = https://canary.example.test/c/<slug>/repo.git
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from decnet.canary.base import CanaryArtifact, CanaryContext, CanaryGenerator
|
||||
|
||||
|
||||
class GitConfigGenerator(CanaryGenerator):
    """Generator for a fake ``.git/config`` whose origin URL is the callback."""

    name = "git_config"

    def generate(self, ctx: CanaryContext) -> CanaryArtifact:
        """Render a ``.git/config`` with ``origin`` at ``<http_base>/c/<slug>/repo.git``."""
        # http_base may or may not carry a trailing slash; normalise it
        # before appending the /c/<slug>/repo.git suffix.
        root = ctx.http_base.rstrip("/")
        remote_url = f"{root}/c/{ctx.callback_token}/repo.git"
        config_lines = (
            "[core]",
            "\trepositoryformatversion = 0",
            "\tfilemode = true",
            "\tbare = false",
            "\tlogallrefupdates = true",
            '[remote "origin"]',
            f"\turl = {remote_url}",
            "\tfetch = +refs/heads/*:refs/remotes/origin/*",
            '[branch "main"]',
            "\tremote = origin",
            "\tmerge = refs/heads/main",
        )
        rendered = "".join(line + "\n" for line in config_lines)
        one_month = 30 * 86400  # checked out a month ago
        return CanaryArtifact(
            path="",
            content=rendered.encode("utf-8"),
            mode=0o644,
            mtime_offset=-one_month,
            generator=self.name,
            notes=[f"git remote 'origin' embeds {remote_url}"],
        )
|
||||
61
decnet/canary/generators/honeydoc.py
Normal file
61
decnet/canary/generators/honeydoc.py
Normal file
@@ -0,0 +1,61 @@
|
||||
"""Built-in honeydoc — a minimal HTML "report" with a tracking pixel.
|
||||
|
||||
This is the *fallback* honeydoc used when the operator hasn't
|
||||
uploaded a real document. The HTML instrumenter handles operator
|
||||
uploads via :mod:`decnet.canary.instrumenters.html`; this generator
|
||||
exists so the deploy-time baseline can plant *something* convincing
|
||||
without first prompting the operator to drop a file.
|
||||
|
||||
The realism here is intentionally modest: a Documents-folder HTML
|
||||
page with internal-looking content and a 1×1 remote image at the
|
||||
bottom whose ``src`` is the canary callback URL. Most desktop
|
||||
HTML renderers fetch the image as soon as the file is opened in a
|
||||
browser preview, so opening the doc trips the callback.
|
||||
|
||||
Operators who want a richer artifact should upload their own DOCX
|
||||
or PDF; the corresponding instrumenter embeds the same callback in
|
||||
the appropriate format.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from decnet.canary.base import CanaryArtifact, CanaryContext, CanaryGenerator
|
||||
|
||||
|
||||
class HoneydocGenerator(CanaryGenerator):
    """Generator for a minimal HTML report carrying a 1x1 tracking pixel."""

    name = "honeydoc"

    def generate(self, ctx: CanaryContext) -> CanaryArtifact:
        """Render the HTML honeydoc for ``ctx``.

        The pixel ``src`` is ``<http_base>/c/<callback_token>``. It is
        HTML-escaped before being placed in the attribute so that a base URL
        containing ``&`` or a quote still yields well-formed markup; for
        ordinary URLs the escaped and raw forms are identical.
        """
        # Function-scope import, matching the local-import style used by the
        # sibling generator modules.
        from html import escape

        base = ctx.http_base.rstrip("/")
        slug = ctx.callback_token
        pixel_url = f"{base}/c/{slug}"
        pixel_attr = escape(pixel_url, quote=True)
        body = (
            "<!DOCTYPE html>\n"
            "<html lang=\"en\">\n"
            "<head>\n"
            "<meta charset=\"utf-8\">\n"
            "<title>Q3 Operations Review — DRAFT</title>\n"
            "</head>\n"
            "<body>\n"
            "<h1>Q3 Operations Review (DRAFT — DO NOT DISTRIBUTE)</h1>\n"
            "<p>Forecast and remediation timeline below. Numbers are\n"
            "preliminary and subject to revision before the all-hands.</p>\n"
            "<table>\n"
            "<tr><th>Region</th><th>Incidents</th><th>MTTR (h)</th></tr>\n"
            "<tr><td>us-east</td><td>14</td><td>3.2</td></tr>\n"
            "<tr><td>us-west</td><td>9</td><td>4.7</td></tr>\n"
            "<tr><td>eu-central</td><td>22</td><td>2.1</td></tr>\n"
            "</table>\n"
            "<p>Internal contact: <a href=\"mailto:secops@internal\">"
            "secops@internal</a></p>\n"
            f"<img src=\"{pixel_attr}\" width=\"1\" height=\"1\" alt=\"\">\n"
            "</body>\n"
            "</html>\n"
        )
        return CanaryArtifact(
            path="",
            content=body.encode("utf-8"),
            mode=0o644,  # docs are typically world-readable
            mtime_offset=-86400 * 21,  # 3 weeks ago
            generator=self.name,
            # The note reports the raw URL, not the escaped attribute text.
            notes=[f"tracking pixel src={pixel_url}"],
        )
|
||||
68
decnet/canary/generators/ssh_key.py
Normal file
68
decnet/canary/generators/ssh_key.py
Normal file
@@ -0,0 +1,68 @@
|
||||
"""Fake SSH private key with the callback host in the comment.
|
||||
|
||||
OpenSSH private keys carry a free-form comment field — typically
|
||||
``user@host`` — that's preserved across rounds of ``ssh-keygen -p``.
|
||||
We embed the canary host as the ``user@host`` so an attacker who
|
||||
imports the key into their own keyring or runs ``ssh-keygen -lf`` on
|
||||
it sees a hostname they may then try to reach.
|
||||
|
||||
The key bytes themselves are syntactically valid (PEM envelope, base64
|
||||
body) but cryptographically junk — the body is a deterministic SHA-256-derived
byte stream keyed by the slug and stretched to length. We don't ship a real
|
||||
RSA/Ed25519 key because (a) we don't want a real private key sitting
|
||||
on disk pretending to be valuable, and (b) the attacker ``cat``-ing
|
||||
the file or running ``ssh -i`` will trigger the callback regardless
|
||||
of cryptographic validity.
|
||||
|
||||
The DNS-callback variant uses ``<slug>.<dns_zone>`` as the
|
||||
hostname so a bare ``ssh-keygen -lf`` on the file resolves a unique
|
||||
subdomain even if the attacker never hits HTTP.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import hashlib
|
||||
|
||||
from decnet.canary.base import CanaryArtifact, CanaryContext, CanaryGenerator
|
||||
|
||||
|
||||
def _fake_key_body(seed: str) -> str:
    """Return a deterministic base64 block shaped like an OpenSSH key body.

    768 bytes are produced by hashing the seed together with a running
    counter (24 SHA-256 digests), then base64-encoded to 1024 characters
    and wrapped at 70 per line — 14 full lines plus a 44-character tail.
    Using a counter rather than repeating one digest keeps the text from
    cycling every 128 characters.
    """
    seed_bytes = seed.encode()
    # 24 digests x 32 bytes = 768 bytes; a multiple of 3, so the base64
    # output carries no "=" padding.
    stream = b"".join(
        hashlib.sha256(seed_bytes + b":" + str(counter).encode()).digest()
        for counter in range(24)
    )
    encoded = base64.b64encode(stream).decode()
    # Wrap at 70 chars per line — same shape ``ssh-keygen`` produces.
    return "\n".join(encoded[i:i + 70] for i in range(0, len(encoded), 70))
|
||||
|
||||
|
||||
class SSHKeyGenerator(CanaryGenerator):
    """Generator for a PEM-shaped fake private key with a host comment line."""

    name = "ssh_key"

    def generate(self, ctx: CanaryContext) -> CanaryArtifact:
        """Render the fake key plus a trailing ``# deploy@<host>`` line.

        ``<host>`` is ``<callback_token>.<dns_zone>`` when ``ctx.dns_zone``
        is truthy; otherwise it is the hostname parsed from
        ``ctx.http_base``, or ``deploy.local`` if none can be parsed.
        """
        token = ctx.callback_token
        key_body = _fake_key_body(token)
        if not ctx.dns_zone:
            # No DNS zone configured: fall back to the HTTP callback host.
            from urllib.parse import urlparse

            hostname = urlparse(ctx.http_base).hostname or "deploy.local"
        else:
            # DNS deployed: name a per-token subdomain of the zone.
            hostname = f"{token}.{ctx.dns_zone}"
        comment = f"deploy@{hostname}"
        pem_lines = [
            "-----BEGIN OPENSSH PRIVATE KEY-----",
            key_body,
            "-----END OPENSSH PRIVATE KEY-----",
            f"# {comment}",
        ]
        rendered = "\n".join(pem_lines) + "\n"
        two_months = 60 * 86400  # 2 months ago
        return CanaryArtifact(
            path="",
            content=rendered.encode("utf-8"),
            mode=0o600,
            mtime_offset=-two_months,
            generator=self.name,
            notes=[f"comment line embeds {comment}"],
        )
|
||||
115
tests/canary/test_generators.py
Normal file
115
tests/canary/test_generators.py
Normal file
@@ -0,0 +1,115 @@
|
||||
"""Coverage for the synthesised-artifact generators.
|
||||
|
||||
Each generator MUST be deterministic for a given ``CanaryContext`` —
|
||||
the planter relies on that idempotency to re-seed without storing
|
||||
the rendered bytes. We assert byte-for-byte stability across two
|
||||
calls with the same inputs as well as the obvious "slug appears in
|
||||
the artifact" property.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
|
||||
import pytest
|
||||
|
||||
from decnet.canary import CanaryContext, get_generator
|
||||
from decnet.canary.factory import KNOWN_GENERATORS
|
||||
|
||||
|
||||
def _ctx(**kw) -> CanaryContext:
    """Build a ``CanaryContext`` from the test defaults, overridden by ``kw``."""
    params = {
        "callback_token": "abcDEF123-test",
        "http_base": "https://canary.example.test",
        "dns_zone": "canary.example.test",
        "persona": "linux",
        **kw,
    }
    return CanaryContext(**params)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("name", KNOWN_GENERATORS)
def test_generator_is_deterministic(name: str) -> None:
    """Two renders from equal contexts must produce identical bytes."""
    generator = get_generator(name)
    first = generator.generate(_ctx())
    second = generator.generate(_ctx())
    assert first.content == second.content, f"{name} not deterministic"
    # Metadata checked on the first render only.
    assert first.generator == name
    assert first.instrumenter is None
    assert first.mode in (0o600, 0o644)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("name", ["git_config", "env_file", "honeydoc"])
def test_callback_url_embedded(name: str) -> None:
    """URL-bearing generators must embed both the slug and the HTTP base."""
    artifact = get_generator(name).generate(_ctx(callback_token="slug-XYZ"))
    text = artifact.content.decode("utf-8")
    assert "slug-XYZ" in text, f"{name} did not embed slug"
    assert "https://canary.example.test" in text
|
||||
|
||||
|
||||
def test_aws_creds_passive_does_not_embed_url() -> None:
    """The passive AWS artifact must stay free of URLs and of the raw slug."""
    # AWS creds are passive — there's no realistic field to hide a URL
    # in. Asserting the absence guards against a future change slipping
    # the slug into a comment and breaking realism.
    artifact = get_generator("aws_creds").generate(
        _ctx(callback_token="slug-XYZ")
    )
    text = artifact.content.decode("utf-8")
    for forbidden in ("https://", "slug-XYZ"):
        assert forbidden not in text
    # Access key matches the AKIA[A-Z0-9]{16} shape.
    assert re.search(r"AKIA[A-Z0-9]{16}", text)
|
||||
|
||||
|
||||
def test_aws_creds_changes_with_slug() -> None:
    """Different callback tokens must yield different credential files."""
    generator = get_generator("aws_creds")
    rendered = [
        generator.generate(_ctx(callback_token=token)).content
        for token in ("slug-A", "slug-B")
    ]
    assert rendered[0] != rendered[1]
|
||||
|
||||
|
||||
def test_ssh_key_uses_dns_zone_when_available() -> None:
    """With a DNS zone set, the key carries ``<slug>.<zone>``."""
    context = _ctx(callback_token="slugZ", dns_zone="canary.test")
    artifact = get_generator("ssh_key").generate(context)
    assert b"slugZ.canary.test" in artifact.content
|
||||
|
||||
|
||||
def test_ssh_key_falls_back_to_http_host_without_dns() -> None:
    """With an empty DNS zone, the key carries the ``http_base`` hostname."""
    context = _ctx(http_base="https://example.test", dns_zone="")
    artifact = get_generator("ssh_key").generate(context)
    assert b"example.test" in artifact.content
|
||||
|
||||
|
||||
def test_honeydoc_html_is_valid_ish_html() -> None:
    """The honeydoc must contain a doctype and a 1x1 ``<img>`` tag."""
    artifact = get_generator("honeydoc").generate(_ctx())
    markup = artifact.content.decode("utf-8")
    for fragment in ("<!DOCTYPE html>", "<img", 'width="1" height="1"'):
        assert fragment in markup
|
||||
|
||||
|
||||
def test_git_config_remote_url_shape() -> None:
    """The origin remote must point at ``<base>/c/<slug>/repo.git``."""
    artifact = get_generator("git_config").generate(
        _ctx(callback_token="slug42")
    )
    text = artifact.content.decode("utf-8")
    assert '[remote "origin"]' in text
    assert "https://canary.example.test/c/slug42/repo.git" in text
|
||||
|
||||
|
||||
def test_env_file_carries_two_callback_fields() -> None:
    """Both URL variables in the ``.env`` must embed the callback URL."""
    artifact = get_generator("env_file").generate(
        _ctx(callback_token="slugEnv")
    )
    text = artifact.content.decode("utf-8")
    expected_fields = (
        "API_BASE_URL=https://canary.example.test/c/slugEnv",
        "WEBHOOK_NOTIFY_URL=https://canary.example.test/c/slugEnv/webhook",
    )
    for field in expected_fields:
        assert field in text
|
||||
|
||||
|
||||
def test_artifacts_carry_notes() -> None:
    """Every known generator must attach at least one operator-facing note."""
    # Notes drive the API ``preview`` endpoint so operators can sanity-
    # check the artifact before it lands; empty notes would leave them
    # staring at opaque bytes.
    for name in KNOWN_GENERATORS:
        generator = get_generator(name)
        artifact = generator.generate(_ctx())
        assert artifact.notes, f"{name} produced no notes"
|
||||
Reference in New Issue
Block a user