The /opt/emit_capture.py, /opt/syslog_bridge.py, and /usr/libexec/udev/journal-relay files were plaintext and world-readable to any attacker root-shelled into the SSH honeypot — revealing the full capture logic on a single cat. Pack all three into /entrypoint.sh as XOR+gzip+base64 blobs at build time (_build_stealth.py), then decode in-memory at container start and exec the capture loop from a bash -c string. No .py files under /opt, no journal-relay file under /usr/libexec/udev, no argv_zap name anywhere. The LD_PRELOAD shim is installed as /usr/lib/x86_64-linux-gnu/libudev-shared.so.1 — sits next to the real libudev.so.1 and blends into the multiarch layout. A 1-byte random XOR key is chosen at image build so a bare 'base64 -d | gunzip' probe on the visible entrypoint returns binary noise instead of readable Python. Docker-dependent tests live under tests/docker/ behind a new 'docker' pytest marker (excluded from the default run, same pattern as fuzz / live / bench).
85 lines
2.3 KiB
Python
85 lines
2.3 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Emit an RFC 5424 `file_captured` line to stdout.
|
|
|
|
Called by capture.sh after a file drop has been mirrored into the quarantine
|
|
directory. Reads a single JSON object from stdin describing the event; emits
|
|
one syslog line that the collector parses into `logs.fields`.
|
|
|
|
The input JSON may contain arbitrary nested structures (writer cmdline,
|
|
concurrent_sessions, ss_snapshot). Bulky fields are base64-encoded into a
|
|
single `meta_json_b64` SD param — this avoids pathological characters
|
|
(`]`, `"`, `\\`) that the collector's SD-block regex cannot losslessly
|
|
round-trip when embedded directly.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import json
|
|
import os
|
|
import sys
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
from syslog_bridge import syslog_line, write_syslog_file # noqa: E402
|
|
|
|
# Flat fields ride as individual SD params (searchable, rendered as pills).
|
|
# Everything else is rolled into the base64 meta blob.
|
|
_FLAT_FIELDS: tuple[str, ...] = (
|
|
"stored_as",
|
|
"sha256",
|
|
"size",
|
|
"orig_path",
|
|
"src_ip",
|
|
"src_port",
|
|
"ssh_user",
|
|
"ssh_pid",
|
|
"attribution",
|
|
"writer_pid",
|
|
"writer_comm",
|
|
"writer_uid",
|
|
"mtime",
|
|
)
|
|
|
|
|
|
def main() -> int:
|
|
raw = sys.stdin.read()
|
|
if not raw.strip():
|
|
print("emit_capture: empty stdin", file=sys.stderr)
|
|
return 1
|
|
try:
|
|
event: dict = json.loads(raw)
|
|
except json.JSONDecodeError as exc:
|
|
print(f"emit_capture: bad JSON: {exc}", file=sys.stderr)
|
|
return 1
|
|
|
|
hostname = str(event.pop("_hostname", None) or os.environ.get("HOSTNAME") or "-")
|
|
service = str(event.pop("_service", "ssh"))
|
|
event_type = str(event.pop("_event_type", "file_captured"))
|
|
|
|
fields: dict[str, str] = {}
|
|
for key in _FLAT_FIELDS:
|
|
if key in event:
|
|
value = event.pop(key)
|
|
if value is None or value == "":
|
|
continue
|
|
fields[key] = str(value)
|
|
|
|
if event:
|
|
payload = json.dumps(event, separators=(",", ":"), ensure_ascii=False, sort_keys=True)
|
|
fields["meta_json_b64"] = base64.b64encode(payload.encode("utf-8")).decode("ascii")
|
|
|
|
line = syslog_line(
|
|
service=service,
|
|
hostname=hostname,
|
|
event_type=event_type,
|
|
**fields,
|
|
)
|
|
write_syslog_file(line)
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|