feat(canary): mysql_dump generator with phone-home replica payload

Mirrors the Canarytokens.org trick: a base64-wrapped CHANGE REPLICATION
SOURCE TO + START REPLICA block in the dump trailer. Importing the
file into MySQL resolves <slug>.<dns_zone> (DNS trip) and opens a 3306
replica handshake whose SOURCE_USER smuggles @@hostname and
@@lc_time_names of the victim DB.

DNS lookup alone is sufficient for detection via the existing canary
dns_server; capturing the smuggled metadata via a 3306 handshake
responder is a follow-up.
This commit is contained in:
2026-04-27 13:52:55 -04:00
parent 5ac8e0f91a
commit 6376523923
4 changed files with 237 additions and 1 deletions

View File

@@ -60,7 +60,7 @@ def test_known_lists_are_stable() -> None:
# surfaces it. Keeps the schema-of-record in one place.
assert KNOWN_GENERATORS == (
"git_config", "env_file", "ssh_key", "aws_creds",
"honeydoc", "honeydoc_docx", "honeydoc_pdf",
"honeydoc", "honeydoc_docx", "honeydoc_pdf", "mysql_dump",
)
assert KNOWN_INSTRUMENTERS == (
"docx", "xlsx", "pdf", "html", "image", "plain", "passthrough",

View File

@@ -137,6 +137,48 @@ def test_env_file_carries_two_callback_fields() -> None:
assert "WEBHOOK_NOTIFY_URL=https://canary.example.test/c/slugEnv/webhook" in body
def test_mysql_dump_requires_dns_zone() -> None:
g = get_generator("mysql_dump")
with pytest.raises(ValueError, match="dns_zone"):
g.generate(_ctx(dns_zone=""))
def test_mysql_dump_payload_round_trips_through_base64() -> None:
import base64 as _b64
g = get_generator("mysql_dump")
art = g.generate(_ctx(callback_token="slugSQL", dns_zone="canary.test"))
body = art.content.decode("utf-8")
# Slug must NOT appear in plaintext — the camouflage is base64.
assert "slugSQL" not in body.replace("\n", " ").split("SET @b = '")[0]
# Locate the base64 blob and decode it; the inner SQL must reference
# the slug-bearing replica host, smuggle @@hostname/@@lc_time_names
# into SOURCE_USER, and target port 3306.
m = re.search(r"SET @b = '([A-Za-z0-9+/=]+)';", body)
assert m, "expected base64 payload assignment"
inner = _b64.b64decode(m.group(1)).decode("utf-8")
assert "slugSQL.canary.test" in inner
assert "SOURCE_PORT=3306" in inner
assert "@@hostname" in inner
assert "@@lc_time_names" in inner
assert "CHANGE REPLICATION SOURCE TO" in inner
def test_mysql_dump_executes_and_starts_replica() -> None:
g = get_generator("mysql_dump")
art = g.generate(_ctx(callback_token="slugSQL2", dns_zone="canary.test"))
body = art.content.decode("utf-8")
# The PREPARE/EXECUTE/START REPLICA chain is what makes the import
# actually phone home; missing any of these silently breaks the trip.
assert "PREPARE stmt1 FROM @s2;" in body
assert "EXECUTE stmt1;" in body
assert "PREPARE stmt2 FROM @bb;" in body
assert "EXECUTE stmt2;" in body
assert "START REPLICA;" in body
# Realism: header + trailer markers that mysqldump emits.
assert body.startswith("-- MySQL dump")
assert "-- Dump completed" in body
def test_artifacts_carry_notes() -> None:
# Notes drive the API ``preview`` endpoint so operators can sanity-
# check what we did before the file lands. Empty notes would mean