feat(orchestrator): MVP synthetic life-injection worker (SSH only)

Adds a new decnet orchestrate worker whose job is to keep the honeypot
ecosystem from looking suspiciously static — a frozen LAN with no
inter-host traffic and no filesystem aging is its own honeypot tell.

MVP scope:
- New OrchestratorEvent table + repo methods (purpose-built sibling
  to Log so synthetic events stay separable from attacker-driven ones).
- New orchestrator.{activity,file}.<decky_id> bus topics +
  system.orchestrator.health heartbeat.
- SSH-only driver. Traffic action runs python3 inside src container
  to TCP-connect dst:22 and read the SSH banner — real on-the-wire
  SSH-protocol traffic without shipping creds. File action drops or
  refreshes a small file via docker exec on the destination.
- Random scheduler (50/50 traffic/file when >=2 SSH-capable deckies
  are running). Diurnal shaping, role-aware pairing, and session-aware
  backoff are explicit non-goals for MVP.
- CLI registration, systemd unit (SupplementaryGroups=docker),
  worker-registry entry so the dashboard shows orchestrator health.
- 11 tests: scheduler policy, driver argv shape + injection-safety,
  end-to-end one-tick integration with FakeBus + SQLite.
This commit is contained in:
2026-04-26 19:43:20 -04:00
parent cc2deb73f7
commit 4c37ece39e
21 changed files with 972 additions and 1 deletions

View File

@@ -10,6 +10,8 @@ Token structure (NATS-style, dot-separated):
topology.{topology_id}.status
decky.{decky_id}.state
decky.{decky_id}.traffic
orchestrator.activity.{decky_id}
orchestrator.file.{decky_id}
attacker.observed
attacker.scored
attacker.session.started
@@ -46,6 +48,7 @@ IDENTITY = "identity"
CAMPAIGN = "campaign"
SYSTEM = "system"
CREDENTIAL = "credential"
ORCHESTRATOR = "orchestrator"
# ─── Leaf event-type constants (the last segment of each topic) ──────────────
@@ -160,6 +163,16 @@ CAMPAIGN_UNMERGED = "unmerged"
CREDENTIAL_CAPTURED = "captured"
CREDENTIAL_REUSE_DETECTED = "reuse.detected"
# Orchestrator event types (second token under ``orchestrator``). The
# orchestrator worker publishes one of these per synthetic action it
# drives against a decky — cheap inter-decky traffic and filesystem
# mutations whose role is to keep the honeypot from looking suspiciously
# static. Always nested with the destination decky uuid as the third
# token, so consumers can subscribe to a single decky's life-injection
# stream via ``orchestrator.*.<decky_uuid>``.
ORCHESTRATOR_ACTIVITY = "activity"
ORCHESTRATOR_FILE = "file"
# System event types.
SYSTEM_LOG = "log"
SYSTEM_BUS_HEALTH = "bus.health"
@@ -280,6 +293,18 @@ def identity(event_type: str) -> str:
return f"{IDENTITY}.{event_type}"
def orchestrator(event_type: str, decky_id: str) -> str:
"""Build ``orchestrator.<event_type>.<decky_id>``.
*event_type* should be one of :data:`ORCHESTRATOR_ACTIVITY` or
:data:`ORCHESTRATOR_FILE`. The destination decky is always the
third token so per-decky subscribers can use
``orchestrator.*.<decky_uuid>``.
"""
_reject_tokens(event_type, decky_id)
return f"{ORCHESTRATOR}.{event_type}.{decky_id}"
def system_health(worker: str) -> str:
"""Build ``system.<worker>.health``.

View File

@@ -30,6 +30,7 @@ from . import (
inventory,
lifecycle,
listener,
orchestrator,
profiler,
sniffer,
swarm,
@@ -54,7 +55,7 @@ for _mod in (
api, swarmctl, agent, updater, listener, forwarder,
swarm,
deploy, lifecycle, workers, inventory,
web, profiler, sniffer, db,
web, profiler, orchestrator, sniffer, db,
topology, bus, geoip, init, webhook,
):
_mod.register(app)

View File

@@ -0,0 +1,42 @@
from __future__ import annotations
import typer
from . import utils as _utils
from .utils import console, log
def register(app: typer.Typer) -> None:
@app.command(name="orchestrate")
def orchestrate_cmd(
interval: int = typer.Option(
60, "--interval", "-i",
help="Seconds between synthetic activity ticks",
),
daemon: bool = typer.Option(
False, "--daemon", "-d",
help="Detach to background as a daemon process",
),
) -> None:
"""Inject synthetic life (inter-decky traffic + file ops) into the fleet."""
import asyncio
from decnet.orchestrator import orchestrator_worker
from decnet.web.dependencies import repo
if daemon:
log.info("orchestrator daemonizing interval=%d", interval)
_utils._daemonize()
log.info("orchestrator starting interval=%d", interval)
console.print(
f"[bold cyan]Orchestrator starting[/] (interval: {interval}s)"
)
async def _run() -> None:
await repo.initialize()
await orchestrator_worker(repo, interval=interval)
try:
asyncio.run(_run())
except KeyboardInterrupt:
console.print("\n[yellow]Orchestrator stopped.[/]")

View File

@@ -0,0 +1,9 @@
"""DECNET orchestrator — synthetic life-injection worker.
Drives realistic-looking activity between deckies (inter-decky traffic and
in-decky filesystem mutations) so the honeypot stops looking suspiciously
static. Sole writer of the ``OrchestratorEvent`` table.
"""
from decnet.orchestrator.worker import orchestrator_worker
__all__ = ["orchestrator_worker"]

View File

@@ -0,0 +1,5 @@
"""Activity drivers for the orchestrator (MVP: SSH only)."""
from decnet.orchestrator.drivers.base import ActivityResult, Driver
from decnet.orchestrator.drivers.ssh import SSHDriver
__all__ = ["ActivityResult", "Driver", "SSHDriver"]

View File

@@ -0,0 +1,27 @@
"""Driver protocol for orchestrator actions.
Future protocols (HTTP, SMB, MySQL, …) implement this interface alongside
the SSH driver. Kept deliberately minimal — the orchestrator only needs
"run this action and tell me how it went".
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Protocol
from decnet.orchestrator.scheduler import Action
@dataclass
class ActivityResult:
"""Outcome of one driver invocation.
``payload`` is the per-action JSON envelope the worker writes to the
``OrchestratorEvent.payload`` column and to the bus event body.
"""
success: bool
payload: dict[str, Any] = field(default_factory=dict)
class Driver(Protocol):
async def run(self, action: Action) -> ActivityResult: ...

View File

@@ -0,0 +1,153 @@
"""MVP SSH-flavoured driver.
Two action shapes:
* :class:`~decnet.orchestrator.scheduler.TrafficAction` — exec a tiny
Python one-liner *inside the source decky's ssh container* that opens
TCP/22 against the destination decky's IP and reads the SSH banner.
This generates real on-the-wire SSH-protocol traffic between the two
containers (sshd announces the banner on connect), without us having
to ship credentials anywhere.
* :class:`~decnet.orchestrator.scheduler.FileAction` — drop / refresh a
file inside the destination decky's ssh container via ``docker exec``.
Both shell out via :func:`asyncio.create_subprocess_exec` with argv
lists — never a shell string — so an attacker-controllable decky name
or IP can't escape into a shell.
"""
from __future__ import annotations
import asyncio
import shlex
from typing import Any
from decnet.logging import get_logger
from decnet.orchestrator.drivers.base import ActivityResult
from decnet.orchestrator.scheduler import Action, FileAction, TrafficAction
log = get_logger("orchestrator.ssh")
_DOCKER = "docker"
# Per-call wall-clock cap. The orchestrator runs serially (one action
# per tick); a wedged docker exec must not stall the whole worker.
_TIMEOUT = 8.0
# Container suffix convention: services/*.py emit container_name as
# ``<decky_name>-<service>``. The MVP only drives the ssh service.
_SSH_CONTAINER_SUFFIX = "-ssh"
def _container_for(decky_name: str) -> str:
return f"{decky_name}{_SSH_CONTAINER_SUFFIX}"
async def _run(argv: list[str]) -> tuple[int, str, str]:
"""Spawn *argv* and capture (rc, stdout, stderr).
Returns ``(rc=124, "", "timeout")`` on wall-clock expiry. Never
raises — orchestrator success/failure is a payload attribute, not
an exception.
"""
try:
proc = await asyncio.create_subprocess_exec(
*argv,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
except FileNotFoundError as exc:
return 127, "", f"argv[0] not found: {exc}"
try:
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=_TIMEOUT)
except asyncio.TimeoutError:
try:
proc.kill()
except ProcessLookupError:
pass
return 124, "", "timeout"
return (
proc.returncode if proc.returncode is not None else -1,
stdout.decode("utf-8", "replace"),
stderr.decode("utf-8", "replace"),
)
# Python one-liner that probes the destination's SSH banner. Kept inline
# so the driver has zero filesystem dependencies on the host side; the
# *container* needs python3 (ssh service template ships it).
_PROBE_PY = (
"import socket,sys;"
"s=socket.socket();s.settimeout(3);"
"s.connect((sys.argv[1], 22));"
"b=s.recv(128);s.close();"
"sys.stdout.write(b.decode('latin1','replace'))"
)
class SSHDriver:
"""Concrete :class:`Driver` for the MVP."""
async def run(self, action: Action) -> ActivityResult:
if isinstance(action, TrafficAction):
return await self._run_traffic(action)
if isinstance(action, FileAction):
return await self._run_file(action)
raise TypeError(f"unsupported action type: {type(action)!r}")
async def _run_traffic(self, action: TrafficAction) -> ActivityResult:
container = _container_for(action.src_name)
argv = [
_DOCKER, "exec", container,
"python3", "-c", _PROBE_PY, action.dst_ip,
]
rc, stdout, stderr = await _run(argv)
success = rc == 0 and stdout.startswith("SSH-")
payload: dict[str, Any] = {
"src_decky": action.src_name,
"dst_decky": action.dst_name,
"dst_ip": action.dst_ip,
"dst_port": 22,
"rc": rc,
"banner": stdout.strip()[:128] if success else None,
"stderr": stderr.strip()[:256] if not success else None,
}
if not success:
log.debug(
"orchestrator.ssh.traffic failed src=%s dst=%s rc=%d stderr=%r",
action.src_name, action.dst_name, rc, stderr[:120],
)
return ActivityResult(success=success, payload=payload)
async def _run_file(self, action: FileAction) -> ActivityResult:
container = _container_for(action.dst_name)
# `tee` is in coreutils on every base image; using it (instead of
# `>` redirection) keeps the argv free of shell metacharacters
# the dst_name/path could otherwise weaponise. Path validation
# still belongs upstream — the scheduler's templates are fixed.
# We do invoke `sh -c` so the parent dir gets mkdir'd in one
# call; the sh argv stays trivially auditable.
sh_cmd = (
f"mkdir -p {shlex.quote(_dirname(action.path))} && "
f"printf %s {shlex.quote(action.content)} > {shlex.quote(action.path)} && "
f"touch {shlex.quote(action.path)}"
)
argv = [_DOCKER, "exec", container, "sh", "-c", sh_cmd]
rc, stdout, stderr = await _run(argv)
success = rc == 0
payload: dict[str, Any] = {
"dst_decky": action.dst_name,
"path": action.path,
"bytes": len(action.content.encode("utf-8")),
"rc": rc,
"stderr": stderr.strip()[:256] if not success else None,
}
return ActivityResult(success=success, payload=payload)
def _dirname(path: str) -> str:
"""Pure-string dirname. We can't trust ``os.path.dirname`` on the
host to share the destination container's separator semantics, but
deckies are POSIX so a plain ``rfind('/')`` suffices."""
idx = path.rfind("/")
if idx <= 0:
return "/"
return path[:idx]

View File

@@ -0,0 +1,53 @@
"""DB-row + bus-topic helpers for the orchestrator."""
from __future__ import annotations
from datetime import datetime, timezone
from typing import Any
from decnet.bus import topics as _topics
from decnet.orchestrator.drivers.base import ActivityResult
from decnet.orchestrator.scheduler import Action, FileAction, TrafficAction
def to_row(action: Action, result: ActivityResult) -> dict[str, Any]:
"""Build the kwargs dict for ``OrchestratorEvent(**...)``."""
base: dict[str, Any] = {
"ts": datetime.now(timezone.utc),
"protocol": "ssh",
"success": result.success,
"payload": result.payload, # repo serialises dict→json
}
if isinstance(action, TrafficAction):
base.update(
kind="traffic",
action=f"exec:{action.description}",
src_decky_uuid=action.src_uuid,
dst_decky_uuid=action.dst_uuid,
)
elif isinstance(action, FileAction):
base.update(
kind="file",
action=action.description,
src_decky_uuid=None,
dst_decky_uuid=action.dst_uuid,
)
else:
raise TypeError(f"unsupported action type: {type(action)!r}")
return base
def topic_for(action: Action) -> str:
"""Map an action to its bus topic."""
if isinstance(action, TrafficAction):
return _topics.orchestrator(_topics.ORCHESTRATOR_ACTIVITY, action.dst_uuid)
if isinstance(action, FileAction):
return _topics.orchestrator(_topics.ORCHESTRATOR_FILE, action.dst_uuid)
raise TypeError(f"unsupported action type: {type(action)!r}")
def event_type_for(action: Action) -> str:
if isinstance(action, TrafficAction):
return _topics.ORCHESTRATOR_ACTIVITY
if isinstance(action, FileAction):
return _topics.ORCHESTRATOR_FILE
raise TypeError(f"unsupported action type: {type(action)!r}")

View File

@@ -0,0 +1,97 @@
"""Action picker for the orchestrator.
MVP policy: flat random — pick one (src, dst) pair where both deckies
expose SSH, then choose one of {ssh-traffic, file-touch}. No diurnal
shaping, no role-aware pairing — those land in v1.
"""
from __future__ import annotations
import secrets
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Optional, Sequence
# A small set of plausible filenames the orchestrator drops or refreshes.
# Scope on purpose: the file driver is "prove the docker-exec write path
# works", not "generate believable user activity". Realism is v2.
# Paths target the filesystem *inside* a decoy container, not the host.
# Bandit B108 is a host-side concern; suppressed at the data definition.
_FILE_TEMPLATES: tuple[tuple[str, str], ...] = ( # nosec B108
("/tmp/.cache-{ts}.tmp", "session={ts}\n"), # nosec B108
("/var/log/cron-{ts}.log", "{ts} CRON[{n}]: ({user}) CMD (run-parts /etc/cron.daily)\n"),
("/home/{user}/notes-{ts}.txt", "todo: rotate keys; check on backup task\n"),
)
_USERS = ("admin", "ubuntu", "service")
@dataclass(frozen=True)
class TrafficAction:
src_uuid: str
src_name: str
dst_uuid: str
dst_name: str
dst_ip: str
protocol: str = "ssh"
description: str = "tcp_connect:22"
@dataclass(frozen=True)
class FileAction:
dst_uuid: str
dst_name: str
path: str
content: str
description: str = "file:create"
Action = TrafficAction | FileAction
def _has_ssh(decky: dict[str, Any]) -> bool:
services = decky.get("services") or []
if isinstance(services, str):
return False # not deserialised — treat as "we don't know"
return "ssh" in services
def pick(
deckies: Sequence[dict[str, Any]],
*,
rand: Optional[secrets.SystemRandom] = None,
) -> Optional[Action]:
"""Pick one action against the given decky set.
Returns ``None`` when no action is possible (fewer than two SSH-capable
deckies for traffic, or no deckies at all for file ops). The worker
treats ``None`` as "skip this tick".
"""
rng = rand or secrets.SystemRandom()
ssh_deckies = [d for d in deckies if _has_ssh(d) and d.get("ip")]
if not ssh_deckies:
return None
kind = "traffic" if (len(ssh_deckies) >= 2 and rng.random() < 0.5) else "file"
if kind == "traffic":
src, dst = rng.sample(ssh_deckies, 2)
return TrafficAction(
src_uuid=src["uuid"],
src_name=src["name"],
dst_uuid=dst["uuid"],
dst_name=dst["name"],
dst_ip=dst["ip"],
)
dst = rng.choice(ssh_deckies)
template, content_template = rng.choice(_FILE_TEMPLATES)
ts = int(datetime.now(timezone.utc).timestamp())
user = rng.choice(_USERS)
path = template.format(ts=ts, user=user)
content = content_template.format(ts=ts, user=user, n=rng.randint(1000, 99999))
return FileAction(
dst_uuid=dst["uuid"],
dst_name=dst["name"],
path=path,
content=content,
)

View File

@@ -0,0 +1,114 @@
"""Orchestrator main loop.
One tick = one (src, dst, action) pick + one driver invocation + one DB
write + one fire-and-forget bus publish. Intentionally serial — MVP
honesty: a wedged docker exec stalls only this worker, never another.
Modeled after :mod:`decnet.profiler.worker` for consistency: same control
listener, same heartbeat helper, same shutdown semantics.
"""
from __future__ import annotations
import asyncio
import contextlib
from decnet.bus.factory import get_bus
from decnet.bus.publish import (
publish_safely,
run_control_listener,
run_health_heartbeat,
)
from decnet.logging import get_logger
from decnet.orchestrator import events, scheduler
from decnet.orchestrator.drivers import SSHDriver
from decnet.web.db.repository import BaseRepository
logger = get_logger("orchestrator")
async def orchestrator_worker(
repo: BaseRepository,
*,
interval: int = 60,
) -> None:
"""Periodically inject synthetic activity into the running fleet.
Runs as a long-lived asyncio task. Honours the bus control topic
(``system.orchestrator.control``) for graceful shutdown.
"""
logger.info("orchestrator worker started interval=%ds", interval)
bus = None
try:
bus = get_bus(client_name="orchestrator")
await bus.connect()
except Exception as exc: # noqa: BLE001
logger.warning(
"orchestrator: bus unavailable, continuing without publish: %s", exc
)
bus = None
driver = SSHDriver()
shutdown = asyncio.Event()
heartbeat_task = asyncio.create_task(run_health_heartbeat(bus, "orchestrator"))
control_task = asyncio.create_task(
run_control_listener(bus, "orchestrator", shutdown),
)
try:
while not shutdown.is_set():
try:
await asyncio.wait_for(shutdown.wait(), timeout=interval)
except asyncio.TimeoutError:
pass # normal tick
if shutdown.is_set():
break
try:
await _one_tick(repo, driver, bus)
except Exception as exc: # noqa: BLE001
logger.error("orchestrator tick failed: %s", exc)
finally:
for t in (heartbeat_task, control_task):
t.cancel()
with contextlib.suppress(Exception, asyncio.CancelledError):
await t
if bus is not None:
with contextlib.suppress(Exception):
await bus.close()
async def _one_tick(repo: BaseRepository, driver, bus) -> None:
deckies = await repo.list_running_topology_deckies()
action = scheduler.pick(deckies)
if action is None:
logger.debug(
"orchestrator: no actionable deckies (running+ssh count=%d)",
len(deckies),
)
return
result = await driver.run(action)
row = events.to_row(action, result)
await repo.record_orchestrator_event(row)
if bus is not None:
topic = events.topic_for(action)
# Bus payload mirrors the row but uses iso8601 for ts so SSE
# consumers don't have to JSON-handle datetime themselves.
bus_payload = {
"kind": row["kind"],
"protocol": row["protocol"],
"action": row["action"],
"src_decky_uuid": row.get("src_decky_uuid"),
"dst_decky_uuid": row["dst_decky_uuid"],
"success": row["success"],
"payload": result.payload,
"ts": row["ts"].isoformat(),
}
await publish_safely(
bus, topic, bus_payload, event_type=events.event_type_for(action)
)
logger.info(
"orchestrator tick kind=%s success=%s dst=%s",
row["kind"], row["success"], row["dst_decky_uuid"],
)

View File

@@ -53,6 +53,9 @@ from .health import (
ComponentHealth,
HealthResponse,
)
from .orchestrator import (
OrchestratorEvent,
)
from .logs import (
Bounty,
BountyResponse,
@@ -181,6 +184,8 @@ __all__ = [
# health
"ComponentHealth",
"HealthResponse",
# orchestrator
"OrchestratorEvent",
# logs
"Bounty",
"BountyResponse",

View File

@@ -0,0 +1,52 @@
"""Orchestrator-emitted activity events.
Purpose-built sibling to ``logs.Log`` so attacker-originated events stay
cleanly separable from synthetic life-injection events at query time.
The orchestrator worker is the sole writer.
"""
from datetime import datetime, timezone
from typing import Optional
from uuid import uuid4
from sqlalchemy import Column, Index, Text
from sqlmodel import Field, SQLModel
class OrchestratorEvent(SQLModel, table=True):
"""One orchestrator-driven action against a decky.
``kind`` discriminates the two MVP flavours:
* ``"traffic"`` — a protocol-driven interaction (SSH command exec for
MVP). ``src_decky_uuid`` is the *logical* originator and may differ
from the actual TCP source for the duration of the MVP, where the
orchestrator process drives the connection from the host. ``v1``
will execute the connection from inside the source container.
* ``"file"`` — a filesystem touch via ``docker exec`` against the
destination decky. ``src_decky_uuid`` is null.
``payload`` is the per-action JSON envelope: command run, exit code,
stdout/stderr digest, file path, byte counts, etc. Schema is
deliberately loose — the worker can extend it without a migration.
"""
__tablename__ = "orchestrator_events"
__table_args__ = (
Index("ix_orchestrator_events_dst_ts", "dst_decky_uuid", "ts"),
)
uuid: str = Field(default_factory=lambda: str(uuid4()), primary_key=True)
ts: datetime = Field(
default_factory=lambda: datetime.now(timezone.utc), index=True
)
kind: str = Field(index=True, max_length=16) # traffic|file
protocol: str = Field(index=True, max_length=16) # ssh for MVP
action: str = Field(max_length=64) # exec:uptime|file:create|...
src_decky_uuid: Optional[str] = Field(
default=None, foreign_key="topology_deckies.uuid", index=True
)
dst_decky_uuid: str = Field(
foreign_key="topology_deckies.uuid", index=True
)
success: bool = Field(default=False, index=True)
payload: str = Field(
sa_column=Column("payload", Text, nullable=False, default="{}")
)

View File

@@ -857,3 +857,24 @@ class BaseRepository(ABC):
"""Auto-disable a subscription after repeated failures. Sets
``enabled=False`` and stamps ``auto_disabled_at``."""
raise NotImplementedError
# ---------------------------------------------------------- orchestrator
async def list_running_topology_deckies(self) -> list[dict[str, Any]]:
"""Return every TopologyDecky row whose ``state == 'running'``.
The orchestrator picks pairs from this set to drive synthetic
inter-decky activity. Cross-topology by design: a multi-topology
host still has a single orchestrator instance.
"""
raise NotImplementedError
async def record_orchestrator_event(self, data: dict[str, Any]) -> str:
"""Insert one orchestrator-emitted event row, returning its uuid."""
raise NotImplementedError
async def list_orchestrator_events(
self, *, limit: int = 100, since_ts: Optional[Any] = None
) -> list[dict[str, Any]]:
"""Return recent orchestrator events newest-first."""
raise NotImplementedError

View File

@@ -49,6 +49,7 @@ from decnet.web.db.models import (
TopologyEdge,
TopologyStatusEvent,
TopologyMutation,
OrchestratorEvent,
WebhookSubscription,
)
@@ -2787,3 +2788,42 @@ class SQLModelRepository(BaseRepository):
)
)
await session.commit()
# ---------------------------------------------------------- orchestrator
async def list_running_topology_deckies(self) -> list[dict[str, Any]]:
async with self._session() as session:
result = await session.execute(
select(TopologyDecky).where(TopologyDecky.state == "running")
)
return [
self._deserialize_json_fields(
r.model_dump(mode="json"), ("services", "decky_config")
)
for r in result.scalars().all()
]
async def record_orchestrator_event(self, data: dict[str, Any]) -> str:
payload = data.get("payload")
if isinstance(payload, (dict, list)):
data = {**data, "payload": json.dumps(payload)}
async with self._session() as session:
row = OrchestratorEvent(**data)
session.add(row)
await session.commit()
await session.refresh(row)
return row.uuid
async def list_orchestrator_events(
self,
*,
limit: int = 100,
since_ts: Optional[datetime] = None,
) -> list[dict[str, Any]]:
async with self._session() as session:
stmt = select(OrchestratorEvent)
if since_ts is not None:
stmt = stmt.where(OrchestratorEvent.ts >= since_ts)
stmt = stmt.order_by(desc(OrchestratorEvent.ts)).limit(limit)
result = await session.execute(stmt)
return [r.model_dump(mode="json") for r in result.scalars().all()]

View File

@@ -28,6 +28,7 @@ _PREFERRED_ORDER: tuple[str, ...] = (
"reuse-correlator",
"enrich",
"webhook",
"orchestrator",
)

View File

@@ -41,6 +41,7 @@ KNOWN_WORKERS: tuple[str, ...] = (
"reuse-correlator", # credential-reuse pass — bus-woken on credential.captured
"enrich", # threat-intel enrichment — bus-woken on attacker.observed/scored
"webhook", # external SIEM/SOAR egress — bus consumer → HMAC HTTP POSTs
"orchestrator", # synthetic life-injection — inter-decky traffic + file ops
"agent",
"forwarder",
"updater",

View File

@@ -0,0 +1,43 @@
[Unit]
Description=DECNET Orchestrator (synthetic life-injection — inter-decky traffic + file ops)
Documentation=https://git.resacachile.cl/anti/DECNET/wiki/Workers#orchestrator
After=network-online.target decnet-bus.service
Wants=network-online.target decnet-bus.service
[Service]
Type=simple
User={{ user }}
Group={{ group }}
WorkingDirectory={{ install_dir }}
EnvironmentFile=-{{ install_dir }}/.env.local
Environment=DECNET_SYSTEM_LOGS=/var/log/decnet/decnet.orchestrator.log
ExecStart={{ venv_dir }}/bin/decnet orchestrate
StandardOutput=append:/var/log/decnet/decnet.orchestrator.log
StandardError=append:/var/log/decnet/decnet.orchestrator.log
# The orchestrator drives `docker exec` against decky containers, so it
# needs membership in the docker group. It does NOT bind to the network,
# launch new containers, or write outside its own logs and install dir.
SupplementaryGroups=docker
CapabilityBoundingSet=
AmbientCapabilities=
# Security Hardening
NoNewPrivileges=yes
ProtectSystem=full
ProtectHome=read-only
PrivateTmp=yes
ProtectKernelTunables=yes
ProtectKernelModules=yes
ProtectControlGroups=yes
RestrictSUIDSGID=yes
LockPersonality=yes
ReadWritePaths={{ install_dir }} /var/log/decnet
Restart=on-failure
RestartSec=5
TimeoutStopSec=15
[Install]
WantedBy=multi-user.target

View File

View File

@@ -0,0 +1,60 @@
"""Picker policy tests for the orchestrator scheduler."""
from __future__ import annotations
import secrets
import pytest
from decnet.orchestrator import scheduler
def _decky(uuid: str, name: str, ip: str | None, services: list[str] | str):
return {"uuid": uuid, "name": name, "ip": ip, "services": services}
def test_pick_returns_none_when_no_ssh_deckies():
deckies = [
_decky("u1", "decky-01", "10.0.0.1", ["http"]),
_decky("u2", "decky-02", "10.0.0.2", ["smb"]),
]
assert scheduler.pick(deckies) is None
def test_pick_returns_none_when_ssh_decky_has_no_ip():
deckies = [_decky("u1", "decky-01", None, ["ssh"])]
assert scheduler.pick(deckies) is None
def test_pick_file_action_with_single_ssh_decky():
deckies = [_decky("u1", "decky-01", "10.0.0.1", ["ssh"])]
rng = secrets.SystemRandom()
rng.seed = lambda *_: None # SystemRandom doesn't seed; ignore
action = scheduler.pick(deckies, rand=rng)
assert isinstance(action, scheduler.FileAction)
assert action.dst_uuid == "u1"
assert action.path.startswith("/")
assert action.content
def test_pick_traffic_or_file_with_two_ssh_deckies():
deckies = [
_decky("u1", "decky-01", "10.0.0.1", ["ssh"]),
_decky("u2", "decky-02", "10.0.0.2", ["ssh"]),
]
seen_kinds: set[str] = set()
# 50/50 split — 40 trials makes both kinds essentially certain
for _ in range(40):
action = scheduler.pick(deckies)
assert action is not None
seen_kinds.add("traffic" if isinstance(action, scheduler.TrafficAction) else "file")
if isinstance(action, scheduler.TrafficAction):
assert action.src_uuid != action.dst_uuid
assert action.dst_ip in {"10.0.0.1", "10.0.0.2"}
assert action.protocol == "ssh"
assert seen_kinds == {"traffic", "file"}
def test_pick_skips_non_deserialised_services():
"""If services is still a JSON string (defensive), the decky is excluded."""
deckies = [_decky("u1", "decky-01", "10.0.0.1", '["ssh"]')]
assert scheduler.pick(deckies) is None

View File

@@ -0,0 +1,99 @@
"""Driver tests with the docker subprocess mocked.
We don't need a real Docker daemon to validate the driver's contract:
it boils down to "build an argv, call _run, classify the result". A
dependency-injected ``_run`` keeps the tests hermetic.
"""
from __future__ import annotations
import pytest
from decnet.orchestrator.drivers import ssh as ssh_driver
from decnet.orchestrator.drivers.base import ActivityResult
from decnet.orchestrator.scheduler import FileAction, TrafficAction
@pytest.mark.asyncio
async def test_traffic_success_classifies_on_ssh_banner(monkeypatch):
captured_argv: list[list[str]] = []
async def fake_run(argv):
captured_argv.append(argv)
return 0, "SSH-2.0-OpenSSH_9.6\r\n", ""
monkeypatch.setattr(ssh_driver, "_run", fake_run)
drv = ssh_driver.SSHDriver()
action = TrafficAction(
src_uuid="u1", src_name="decky-01",
dst_uuid="u2", dst_name="decky-02",
dst_ip="10.0.0.2",
)
result = await drv.run(action)
assert isinstance(result, ActivityResult)
assert result.success is True
assert result.payload["banner"].startswith("SSH-2.0-OpenSSH")
assert captured_argv[0][:3] == ["docker", "exec", "decky-01-ssh"]
assert captured_argv[0][-1] == "10.0.0.2"
@pytest.mark.asyncio
async def test_traffic_failure_when_banner_missing(monkeypatch):
async def fake_run(argv):
return 1, "", "Connection refused"
monkeypatch.setattr(ssh_driver, "_run", fake_run)
drv = ssh_driver.SSHDriver()
action = TrafficAction(
src_uuid="u1", src_name="decky-01",
dst_uuid="u2", dst_name="decky-02",
dst_ip="10.0.0.2",
)
result = await drv.run(action)
assert result.success is False
assert result.payload["rc"] == 1
assert "Connection refused" in result.payload["stderr"]
@pytest.mark.asyncio
async def test_file_action_invokes_docker_exec_on_dst(monkeypatch):
captured_argv: list[list[str]] = []
async def fake_run(argv):
captured_argv.append(argv)
return 0, "", ""
monkeypatch.setattr(ssh_driver, "_run", fake_run)
drv = ssh_driver.SSHDriver()
action = FileAction(
dst_uuid="u2", dst_name="decky-02",
path="/tmp/.cache-1700000000.tmp",
content="session=1700000000\n",
)
result = await drv.run(action)
assert result.success is True
assert result.payload["bytes"] == len("session=1700000000\n".encode())
argv = captured_argv[0]
assert argv[:3] == ["docker", "exec", "decky-02-ssh"]
assert argv[3] == "sh"
assert argv[4] == "-c"
# The shell payload must single-quote both the content and the path —
# any unquoted ``;`` or ``$`` here would mean a shell-injection bug.
sh_cmd = argv[5]
# Path appears (shlex.quote leaves safe paths unquoted) and content
# is single-quoted — that's the shell-injection-safe contract.
assert "/tmp/.cache-1700000000.tmp" in sh_cmd
assert "'session=1700000000\n'" in sh_cmd
assert "mkdir -p /tmp" in sh_cmd
@pytest.mark.asyncio
async def test_run_handles_missing_docker_binary(monkeypatch):
async def fake_create(*args, **kwargs):
raise FileNotFoundError("docker")
monkeypatch.setattr(
"asyncio.create_subprocess_exec", fake_create,
)
rc, out, err = await ssh_driver._run(["docker", "exec", "x", "true"])
assert rc == 127
assert "not found" in err

View File

@@ -0,0 +1,123 @@
"""End-to-end-ish: run one orchestrator tick against a real SQLite repo +
FakeBus, with the docker subprocess stubbed. Verifies that:
* :func:`scheduler.pick` reads the deckies the repo returns,
* the driver result is persisted to ``orchestrator_events``,
* a bus event is published to the right topic.
"""
from __future__ import annotations
import json
import pytest
import pytest_asyncio
from decnet.bus.fake import FakeBus
from decnet.orchestrator import worker as orch_worker
from decnet.orchestrator.drivers import ssh as ssh_driver
from decnet.web.db.models import TopologyDecky, Topology
from decnet.web.db.sqlite.repository import SQLiteRepository
@pytest_asyncio.fixture
async def repo(tmp_path):
r = SQLiteRepository(db_path=str(tmp_path / "decnet.db"))
await r.initialize()
yield r
await r.engine.dispose()
@pytest_asyncio.fixture
async def fake_bus():
bus = FakeBus()
await bus.connect()
try:
yield bus
finally:
await bus.close()
async def _seed_two_running_ssh_deckies(repo: SQLiteRepository) -> tuple[str, str]:
async with repo._session() as session:
topo = Topology(name="t1", config_snapshot="{}", status="active")
session.add(topo)
await session.commit()
await session.refresh(topo)
d1 = TopologyDecky(
topology_id=topo.id, name="decky-01",
services=json.dumps(["ssh"]), ip="10.0.0.1", state="running",
)
d2 = TopologyDecky(
topology_id=topo.id, name="decky-02",
services=json.dumps(["ssh"]), ip="10.0.0.2", state="running",
)
session.add(d1)
session.add(d2)
await session.commit()
await session.refresh(d1)
await session.refresh(d2)
return d1.uuid, d2.uuid
@pytest.mark.asyncio
async def test_one_tick_records_event_and_publishes(repo, fake_bus, monkeypatch):
await _seed_two_running_ssh_deckies(repo)
# Pretend every docker exec succeeds with an SSH banner; that lets
# both action kinds (traffic + file) land as success rows so the
# assertions below don't have to care which one the scheduler picked.
async def fake_run(argv):
if argv[3] == "python3":
return 0, "SSH-2.0-OpenSSH_9.6\r\n", ""
return 0, "", ""
monkeypatch.setattr(ssh_driver, "_run", fake_run)
received: list = []
async def collect():
async with fake_bus.subscribe("orchestrator.>") as sub:
async for ev in sub:
received.append(ev)
if len(received) >= 1:
return
import asyncio
collector = asyncio.create_task(collect())
# Yield once so the subscription is registered before we publish.
await asyncio.sleep(0)
driver = ssh_driver.SSHDriver()
await orch_worker._one_tick(repo, driver, fake_bus)
await asyncio.wait_for(collector, timeout=2.0)
rows = await repo.list_orchestrator_events(limit=10)
assert len(rows) == 1
row = rows[0]
assert row["success"] is True
assert row["protocol"] == "ssh"
assert row["kind"] in {"traffic", "file"}
assert len(received) == 1
ev = received[0]
assert ev.topic.startswith("orchestrator.")
assert ev.payload["success"] is True
assert ev.payload["kind"] == row["kind"]
@pytest.mark.asyncio
async def test_tick_is_noop_when_no_running_deckies(repo, fake_bus, monkeypatch):
called = False
async def fake_run(argv):
nonlocal called
called = True
return 0, "SSH-2.0-foo", ""
monkeypatch.setattr(ssh_driver, "_run", fake_run)
driver = ssh_driver.SSHDriver()
await orch_worker._one_tick(repo, driver, fake_bus)
assert called is False
assert await repo.list_orchestrator_events(limit=10) == []