diff --git a/decnet/bus/topics.py b/decnet/bus/topics.py index 4016d867..7194de9c 100644 --- a/decnet/bus/topics.py +++ b/decnet/bus/topics.py @@ -10,6 +10,8 @@ Token structure (NATS-style, dot-separated): topology.{topology_id}.status decky.{decky_id}.state decky.{decky_id}.traffic + orchestrator.activity.{decky_id} + orchestrator.file.{decky_id} attacker.observed attacker.scored attacker.session.started @@ -46,6 +48,7 @@ IDENTITY = "identity" CAMPAIGN = "campaign" SYSTEM = "system" CREDENTIAL = "credential" +ORCHESTRATOR = "orchestrator" # ─── Leaf event-type constants (the last segment of each topic) ────────────── @@ -160,6 +163,16 @@ CAMPAIGN_UNMERGED = "unmerged" CREDENTIAL_CAPTURED = "captured" CREDENTIAL_REUSE_DETECTED = "reuse.detected" +# Orchestrator event types (second token under ``orchestrator``). The +# orchestrator worker publishes one of these per synthetic action it +# drives against a decky — cheap inter-decky traffic and filesystem +# mutations whose role is to keep the honeypot from looking suspiciously +# static. Always nested with the destination decky uuid as the third +# token, so consumers can subscribe to a single decky's life-injection +# stream via ``orchestrator.*.``. +ORCHESTRATOR_ACTIVITY = "activity" +ORCHESTRATOR_FILE = "file" + # System event types. SYSTEM_LOG = "log" SYSTEM_BUS_HEALTH = "bus.health" @@ -280,6 +293,18 @@ def identity(event_type: str) -> str: return f"{IDENTITY}.{event_type}" +def orchestrator(event_type: str, decky_id: str) -> str: + """Build ``orchestrator..``. + + *event_type* should be one of :data:`ORCHESTRATOR_ACTIVITY` or + :data:`ORCHESTRATOR_FILE`. The destination decky is always the + third token so per-decky subscribers can use + ``orchestrator.*.``. + """ + _reject_tokens(event_type, decky_id) + return f"{ORCHESTRATOR}.{event_type}.{decky_id}" + + def system_health(worker: str) -> str: """Build ``system..health``. diff --git a/decnet/cli/__init__.py b/decnet/cli/__init__.py index 1f7829dd..45a703e5 100644 --- a/decnet/cli/__init__.py +++ b/decnet/cli/__init__.py @@ -30,6 +30,7 @@ from . import ( inventory, lifecycle, listener, + orchestrator, profiler, sniffer, swarm, @@ -54,7 +55,7 @@ for _mod in ( api, swarmctl, agent, updater, listener, forwarder, swarm, deploy, lifecycle, workers, inventory, - web, profiler, sniffer, db, + web, profiler, orchestrator, sniffer, db, topology, bus, geoip, init, webhook, ): _mod.register(app) diff --git a/decnet/cli/orchestrator.py b/decnet/cli/orchestrator.py new file mode 100644 index 00000000..bcdc2e4e --- /dev/null +++ b/decnet/cli/orchestrator.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +import typer + +from . import utils as _utils +from .utils import console, log + + +def register(app: typer.Typer) -> None: + @app.command(name="orchestrate") + def orchestrate_cmd( + interval: int = typer.Option( + 60, "--interval", "-i", + help="Seconds between synthetic activity ticks", + ), + daemon: bool = typer.Option( + False, "--daemon", "-d", + help="Detach to background as a daemon process", + ), + ) -> None: + """Inject synthetic life (inter-decky traffic + file ops) into the fleet.""" + import asyncio + from decnet.orchestrator import orchestrator_worker + from decnet.web.dependencies import repo + + if daemon: + log.info("orchestrator daemonizing interval=%d", interval) + _utils._daemonize() + + log.info("orchestrator starting interval=%d", interval) + console.print( + f"[bold cyan]Orchestrator starting[/] (interval: {interval}s)" + ) + + async def _run() -> None: + await repo.initialize() + await orchestrator_worker(repo, interval=interval) + + try: + asyncio.run(_run()) + except KeyboardInterrupt: + console.print("\n[yellow]Orchestrator stopped.[/]") diff --git a/decnet/orchestrator/__init__.py b/decnet/orchestrator/__init__.py new file mode 100644 index 00000000..d24a171f --- /dev/null +++ b/decnet/orchestrator/__init__.py @@ -0,0 +1,9 @@ +"""DECNET orchestrator — synthetic life-injection worker. + +Drives realistic-looking activity between deckies (inter-decky traffic and +in-decky filesystem mutations) so the honeypot stops looking suspiciously +static. Sole writer of the ``OrchestratorEvent`` table. +""" +from decnet.orchestrator.worker import orchestrator_worker + +__all__ = ["orchestrator_worker"] diff --git a/decnet/orchestrator/drivers/__init__.py b/decnet/orchestrator/drivers/__init__.py new file mode 100644 index 00000000..cdfb8d0d --- /dev/null +++ b/decnet/orchestrator/drivers/__init__.py @@ -0,0 +1,5 @@ +"""Activity drivers for the orchestrator (MVP: SSH only).""" +from decnet.orchestrator.drivers.base import ActivityResult, Driver +from decnet.orchestrator.drivers.ssh import SSHDriver + +__all__ = ["ActivityResult", "Driver", "SSHDriver"] diff --git a/decnet/orchestrator/drivers/base.py b/decnet/orchestrator/drivers/base.py new file mode 100644 index 00000000..cbad4db3 --- /dev/null +++ b/decnet/orchestrator/drivers/base.py @@ -0,0 +1,27 @@ +"""Driver protocol for orchestrator actions. + +Future protocols (HTTP, SMB, MySQL, …) implement this interface alongside +the SSH driver. Kept deliberately minimal — the orchestrator only needs +"run this action and tell me how it went". +""" +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, Protocol + +from decnet.orchestrator.scheduler import Action + + +@dataclass +class ActivityResult: + """Outcome of one driver invocation. + + ``payload`` is the per-action JSON envelope the worker writes to the + ``OrchestratorEvent.payload`` column and to the bus event body. + """ + success: bool + payload: dict[str, Any] = field(default_factory=dict) + + +class Driver(Protocol): + async def run(self, action: Action) -> ActivityResult: ... diff --git a/decnet/orchestrator/drivers/ssh.py b/decnet/orchestrator/drivers/ssh.py new file mode 100644 index 00000000..64ddced8 --- /dev/null +++ b/decnet/orchestrator/drivers/ssh.py @@ -0,0 +1,153 @@ +"""MVP SSH-flavoured driver. + +Two action shapes: + +* :class:`~decnet.orchestrator.scheduler.TrafficAction` — exec a tiny + Python one-liner *inside the source decky's ssh container* that opens + TCP/22 against the destination decky's IP and reads the SSH banner. + This generates real on-the-wire SSH-protocol traffic between the two + containers (sshd announces the banner on connect), without us having + to ship credentials anywhere. +* :class:`~decnet.orchestrator.scheduler.FileAction` — drop / refresh a + file inside the destination decky's ssh container via ``docker exec``. + +Both shell out via :func:`asyncio.create_subprocess_exec` with argv +lists — never a shell string — so an attacker-controllable decky name +or IP can't escape into a shell. +""" +from __future__ import annotations + +import asyncio +import shlex +from typing import Any + +from decnet.logging import get_logger +from decnet.orchestrator.drivers.base import ActivityResult +from decnet.orchestrator.scheduler import Action, FileAction, TrafficAction + +log = get_logger("orchestrator.ssh") + +_DOCKER = "docker" +# Per-call wall-clock cap. The orchestrator runs serially (one action +# per tick); a wedged docker exec must not stall the whole worker. +_TIMEOUT = 8.0 + +# Container suffix convention: services/*.py emit container_name as +# ``-``. The MVP only drives the ssh service. +_SSH_CONTAINER_SUFFIX = "-ssh" + + +def _container_for(decky_name: str) -> str: + return f"{decky_name}{_SSH_CONTAINER_SUFFIX}" + + +async def _run(argv: list[str]) -> tuple[int, str, str]: + """Spawn *argv* and capture (rc, stdout, stderr). + + Returns ``(rc=124, "", "timeout")`` on wall-clock expiry. Never + raises — orchestrator success/failure is a payload attribute, not + an exception. + """ + try: + proc = await asyncio.create_subprocess_exec( + *argv, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + except FileNotFoundError as exc: + return 127, "", f"argv[0] not found: {exc}" + try: + stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=_TIMEOUT) + except asyncio.TimeoutError: + try: + proc.kill() + except ProcessLookupError: + pass + return 124, "", "timeout" + return ( + proc.returncode if proc.returncode is not None else -1, + stdout.decode("utf-8", "replace"), + stderr.decode("utf-8", "replace"), + ) + + +# Python one-liner that probes the destination's SSH banner. Kept inline +# so the driver has zero filesystem dependencies on the host side; the +# *container* needs python3 (ssh service template ships it). +_PROBE_PY = ( + "import socket,sys;" + "s=socket.socket();s.settimeout(3);" + "s.connect((sys.argv[1], 22));" + "b=s.recv(128);s.close();" + "sys.stdout.write(b.decode('latin1','replace'))" +) + + +class SSHDriver: + """Concrete :class:`Driver` for the MVP.""" + + async def run(self, action: Action) -> ActivityResult: + if isinstance(action, TrafficAction): + return await self._run_traffic(action) + if isinstance(action, FileAction): + return await self._run_file(action) + raise TypeError(f"unsupported action type: {type(action)!r}") + + async def _run_traffic(self, action: TrafficAction) -> ActivityResult: + container = _container_for(action.src_name) + argv = [ + _DOCKER, "exec", container, + "python3", "-c", _PROBE_PY, action.dst_ip, + ] + rc, stdout, stderr = await _run(argv) + success = rc == 0 and stdout.startswith("SSH-") + payload: dict[str, Any] = { + "src_decky": action.src_name, + "dst_decky": action.dst_name, + "dst_ip": action.dst_ip, + "dst_port": 22, + "rc": rc, + "banner": stdout.strip()[:128] if success else None, + "stderr": stderr.strip()[:256] if not success else None, + } + if not success: + log.debug( + "orchestrator.ssh.traffic failed src=%s dst=%s rc=%d stderr=%r", + action.src_name, action.dst_name, rc, stderr[:120], + ) + return ActivityResult(success=success, payload=payload) + + async def _run_file(self, action: FileAction) -> ActivityResult: + container = _container_for(action.dst_name) + # `tee` is in coreutils on every base image; using it (instead of + # `>` redirection) keeps the argv free of shell metacharacters + # the dst_name/path could otherwise weaponise. Path validation + # still belongs upstream — the scheduler's templates are fixed. + # We do invoke `sh -c` so the parent dir gets mkdir'd in one + # call; the sh argv stays trivially auditable. + sh_cmd = ( + f"mkdir -p {shlex.quote(_dirname(action.path))} && " + f"printf %s {shlex.quote(action.content)} > {shlex.quote(action.path)} && " + f"touch {shlex.quote(action.path)}" + ) + argv = [_DOCKER, "exec", container, "sh", "-c", sh_cmd] + rc, stdout, stderr = await _run(argv) + success = rc == 0 + payload: dict[str, Any] = { + "dst_decky": action.dst_name, + "path": action.path, + "bytes": len(action.content.encode("utf-8")), + "rc": rc, + "stderr": stderr.strip()[:256] if not success else None, + } + return ActivityResult(success=success, payload=payload) + + +def _dirname(path: str) -> str: + """Pure-string dirname. We can't trust ``os.path.dirname`` on the + host to share the destination container's separator semantics, but + deckies are POSIX so a plain ``rfind('/')`` suffices.""" + idx = path.rfind("/") + if idx <= 0: + return "/" + return path[:idx] diff --git a/decnet/orchestrator/events.py b/decnet/orchestrator/events.py new file mode 100644 index 00000000..d434850f --- /dev/null +++ b/decnet/orchestrator/events.py @@ -0,0 +1,53 @@ +"""DB-row + bus-topic helpers for the orchestrator.""" +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Any + +from decnet.bus import topics as _topics +from decnet.orchestrator.drivers.base import ActivityResult +from decnet.orchestrator.scheduler import Action, FileAction, TrafficAction + + +def to_row(action: Action, result: ActivityResult) -> dict[str, Any]: + """Build the kwargs dict for ``OrchestratorEvent(**...)``.""" + base: dict[str, Any] = { + "ts": datetime.now(timezone.utc), + "protocol": "ssh", + "success": result.success, + "payload": result.payload, # repo serialises dict→json + } + if isinstance(action, TrafficAction): + base.update( + kind="traffic", + action=f"exec:{action.description}", + src_decky_uuid=action.src_uuid, + dst_decky_uuid=action.dst_uuid, + ) + elif isinstance(action, FileAction): + base.update( + kind="file", + action=action.description, + src_decky_uuid=None, + dst_decky_uuid=action.dst_uuid, + ) + else: + raise TypeError(f"unsupported action type: {type(action)!r}") + return base + + +def topic_for(action: Action) -> str: + """Map an action to its bus topic.""" + if isinstance(action, TrafficAction): + return _topics.orchestrator(_topics.ORCHESTRATOR_ACTIVITY, action.dst_uuid) + if isinstance(action, FileAction): + return _topics.orchestrator(_topics.ORCHESTRATOR_FILE, action.dst_uuid) + raise TypeError(f"unsupported action type: {type(action)!r}") + + +def event_type_for(action: Action) -> str: + if isinstance(action, TrafficAction): + return _topics.ORCHESTRATOR_ACTIVITY + if isinstance(action, FileAction): + return _topics.ORCHESTRATOR_FILE + raise TypeError(f"unsupported action type: {type(action)!r}") diff --git a/decnet/orchestrator/scheduler.py b/decnet/orchestrator/scheduler.py new file mode 100644 index 00000000..1d1e68e7 --- /dev/null +++ b/decnet/orchestrator/scheduler.py @@ -0,0 +1,97 @@ +"""Action picker for the orchestrator. + +MVP policy: flat random — pick one (src, dst) pair where both deckies +expose SSH, then choose one of {ssh-traffic, file-touch}. No diurnal +shaping, no role-aware pairing — those land in v1. +""" +from __future__ import annotations + +import secrets +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import Any, Optional, Sequence + +# A small set of plausible filenames the orchestrator drops or refreshes. +# Scope on purpose: the file driver is "prove the docker-exec write path +# works", not "generate believable user activity". Realism is v2. +# Paths target the filesystem *inside* a decoy container, not the host. +# Bandit B108 is a host-side concern; suppressed at the data definition. +_FILE_TEMPLATES: tuple[tuple[str, str], ...] = ( # nosec B108 + ("/tmp/.cache-{ts}.tmp", "session={ts}\n"), # nosec B108 + ("/var/log/cron-{ts}.log", "{ts} CRON[{n}]: ({user}) CMD (run-parts /etc/cron.daily)\n"), + ("/home/{user}/notes-{ts}.txt", "todo: rotate keys; check on backup task\n"), +) + +_USERS = ("admin", "ubuntu", "service") + + +@dataclass(frozen=True) +class TrafficAction: + src_uuid: str + src_name: str + dst_uuid: str + dst_name: str + dst_ip: str + protocol: str = "ssh" + description: str = "tcp_connect:22" + + +@dataclass(frozen=True) +class FileAction: + dst_uuid: str + dst_name: str + path: str + content: str + description: str = "file:create" + + +Action = TrafficAction | FileAction + + +def _has_ssh(decky: dict[str, Any]) -> bool: + services = decky.get("services") or [] + if isinstance(services, str): + return False # not deserialised — treat as "we don't know" + return "ssh" in services + + +def pick( + deckies: Sequence[dict[str, Any]], + *, + rand: Optional[secrets.SystemRandom] = None, +) -> Optional[Action]: + """Pick one action against the given decky set. + + Returns ``None`` when no action is possible (fewer than two SSH-capable + deckies for traffic, or no deckies at all for file ops). The worker + treats ``None`` as "skip this tick". + """ + rng = rand or secrets.SystemRandom() + ssh_deckies = [d for d in deckies if _has_ssh(d) and d.get("ip")] + if not ssh_deckies: + return None + + kind = "traffic" if (len(ssh_deckies) >= 2 and rng.random() < 0.5) else "file" + + if kind == "traffic": + src, dst = rng.sample(ssh_deckies, 2) + return TrafficAction( + src_uuid=src["uuid"], + src_name=src["name"], + dst_uuid=dst["uuid"], + dst_name=dst["name"], + dst_ip=dst["ip"], + ) + + dst = rng.choice(ssh_deckies) + template, content_template = rng.choice(_FILE_TEMPLATES) + ts = int(datetime.now(timezone.utc).timestamp()) + user = rng.choice(_USERS) + path = template.format(ts=ts, user=user) + content = content_template.format(ts=ts, user=user, n=rng.randint(1000, 99999)) + return FileAction( + dst_uuid=dst["uuid"], + dst_name=dst["name"], + path=path, + content=content, + ) diff --git a/decnet/orchestrator/worker.py b/decnet/orchestrator/worker.py new file mode 100644 index 00000000..44223529 --- /dev/null +++ b/decnet/orchestrator/worker.py @@ -0,0 +1,114 @@ +"""Orchestrator main loop. + +One tick = one (src, dst, action) pick + one driver invocation + one DB +write + one fire-and-forget bus publish. Intentionally serial — MVP +honesty: a wedged docker exec stalls only this worker, never another. + +Modeled after :mod:`decnet.profiler.worker` for consistency: same control +listener, same heartbeat helper, same shutdown semantics. +""" +from __future__ import annotations + +import asyncio +import contextlib + +from decnet.bus.factory import get_bus +from decnet.bus.publish import ( + publish_safely, + run_control_listener, + run_health_heartbeat, +) +from decnet.logging import get_logger +from decnet.orchestrator import events, scheduler +from decnet.orchestrator.drivers import SSHDriver +from decnet.web.db.repository import BaseRepository + +logger = get_logger("orchestrator") + + +async def orchestrator_worker( + repo: BaseRepository, + *, + interval: int = 60, +) -> None: + """Periodically inject synthetic activity into the running fleet. + + Runs as a long-lived asyncio task. Honours the bus control topic + (``system.orchestrator.control``) for graceful shutdown. + """ + logger.info("orchestrator worker started interval=%ds", interval) + + bus = None + try: + bus = get_bus(client_name="orchestrator") + await bus.connect() + except Exception as exc: # noqa: BLE001 + logger.warning( + "orchestrator: bus unavailable, continuing without publish: %s", exc + ) + bus = None + + driver = SSHDriver() + shutdown = asyncio.Event() + heartbeat_task = asyncio.create_task(run_health_heartbeat(bus, "orchestrator")) + control_task = asyncio.create_task( + run_control_listener(bus, "orchestrator", shutdown), + ) + try: + while not shutdown.is_set(): + try: + await asyncio.wait_for(shutdown.wait(), timeout=interval) + except asyncio.TimeoutError: + pass # normal tick + if shutdown.is_set(): + break + try: + await _one_tick(repo, driver, bus) + except Exception as exc: # noqa: BLE001 + logger.error("orchestrator tick failed: %s", exc) + finally: + for t in (heartbeat_task, control_task): + t.cancel() + with contextlib.suppress(Exception, asyncio.CancelledError): + await t + if bus is not None: + with contextlib.suppress(Exception): + await bus.close() + + +async def _one_tick(repo: BaseRepository, driver, bus) -> None: + deckies = await repo.list_running_topology_deckies() + action = scheduler.pick(deckies) + if action is None: + logger.debug( + "orchestrator: no actionable deckies (running+ssh count=%d)", + len(deckies), + ) + return + + result = await driver.run(action) + row = events.to_row(action, result) + await repo.record_orchestrator_event(row) + + if bus is not None: + topic = events.topic_for(action) + # Bus payload mirrors the row but uses iso8601 for ts so SSE + # consumers don't have to JSON-handle datetime themselves. + bus_payload = { + "kind": row["kind"], + "protocol": row["protocol"], + "action": row["action"], + "src_decky_uuid": row.get("src_decky_uuid"), + "dst_decky_uuid": row["dst_decky_uuid"], + "success": row["success"], + "payload": result.payload, + "ts": row["ts"].isoformat(), + } + await publish_safely( + bus, topic, bus_payload, event_type=events.event_type_for(action) + ) + + logger.info( + "orchestrator tick kind=%s success=%s dst=%s", + row["kind"], row["success"], row["dst_decky_uuid"], + ) diff --git a/decnet/web/db/models/__init__.py b/decnet/web/db/models/__init__.py index 73e43fdf..dcdcb1fe 100644 --- a/decnet/web/db/models/__init__.py +++ b/decnet/web/db/models/__init__.py @@ -53,6 +53,9 @@ from .health import ( ComponentHealth, HealthResponse, ) +from .orchestrator import ( + OrchestratorEvent, +) from .logs import ( Bounty, BountyResponse, @@ -181,6 +184,8 @@ __all__ = [ # health "ComponentHealth", "HealthResponse", + # orchestrator + "OrchestratorEvent", # logs "Bounty", "BountyResponse", diff --git a/decnet/web/db/models/orchestrator.py b/decnet/web/db/models/orchestrator.py new file mode 100644 index 00000000..cd02087e --- /dev/null +++ b/decnet/web/db/models/orchestrator.py @@ -0,0 +1,52 @@ +"""Orchestrator-emitted activity events. + +Purpose-built sibling to ``logs.Log`` so attacker-originated events stay +cleanly separable from synthetic life-injection events at query time. +The orchestrator worker is the sole writer. +""" +from datetime import datetime, timezone +from typing import Optional +from uuid import uuid4 + +from sqlalchemy import Column, Index, Text +from sqlmodel import Field, SQLModel + + +class OrchestratorEvent(SQLModel, table=True): + """One orchestrator-driven action against a decky. + + ``kind`` discriminates the two MVP flavours: + + * ``"traffic"`` — a protocol-driven interaction (SSH command exec for + MVP). ``src_decky_uuid`` is the *logical* originator and may differ + from the actual TCP source for the duration of the MVP, where the + orchestrator process drives the connection from the host. ``v1`` + will execute the connection from inside the source container. + * ``"file"`` — a filesystem touch via ``docker exec`` against the + destination decky. ``src_decky_uuid`` is null. + + ``payload`` is the per-action JSON envelope: command run, exit code, + stdout/stderr digest, file path, byte counts, etc. Schema is + deliberately loose — the worker can extend it without a migration. + """ + __tablename__ = "orchestrator_events" + __table_args__ = ( + Index("ix_orchestrator_events_dst_ts", "dst_decky_uuid", "ts"), + ) + uuid: str = Field(default_factory=lambda: str(uuid4()), primary_key=True) + ts: datetime = Field( + default_factory=lambda: datetime.now(timezone.utc), index=True + ) + kind: str = Field(index=True, max_length=16) # traffic|file + protocol: str = Field(index=True, max_length=16) # ssh for MVP + action: str = Field(max_length=64) # exec:uptime|file:create|... + src_decky_uuid: Optional[str] = Field( + default=None, foreign_key="topology_deckies.uuid", index=True + ) + dst_decky_uuid: str = Field( + foreign_key="topology_deckies.uuid", index=True + ) + success: bool = Field(default=False, index=True) + payload: str = Field( + sa_column=Column("payload", Text, nullable=False, default="{}") + ) diff --git a/decnet/web/db/repository.py b/decnet/web/db/repository.py index a316f7f9..1959a590 100644 --- a/decnet/web/db/repository.py +++ b/decnet/web/db/repository.py @@ -857,3 +857,24 @@ class BaseRepository(ABC): """Auto-disable a subscription after repeated failures. Sets ``enabled=False`` and stamps ``auto_disabled_at``.""" raise NotImplementedError + + # ---------------------------------------------------------- orchestrator + + async def list_running_topology_deckies(self) -> list[dict[str, Any]]: + """Return every TopologyDecky row whose ``state == 'running'``. + + The orchestrator picks pairs from this set to drive synthetic + inter-decky activity. Cross-topology by design: a multi-topology + host still has a single orchestrator instance. + """ + raise NotImplementedError + + async def record_orchestrator_event(self, data: dict[str, Any]) -> str: + """Insert one orchestrator-emitted event row, returning its uuid.""" + raise NotImplementedError + + async def list_orchestrator_events( + self, *, limit: int = 100, since_ts: Optional[Any] = None + ) -> list[dict[str, Any]]: + """Return recent orchestrator events newest-first.""" + raise NotImplementedError diff --git a/decnet/web/db/sqlmodel_repo.py b/decnet/web/db/sqlmodel_repo.py index 817b51f1..aa9d82f1 100644 --- a/decnet/web/db/sqlmodel_repo.py +++ b/decnet/web/db/sqlmodel_repo.py @@ -49,6 +49,7 @@ from decnet.web.db.models import ( TopologyEdge, TopologyStatusEvent, TopologyMutation, + OrchestratorEvent, WebhookSubscription, ) @@ -2787,3 +2788,42 @@ class SQLModelRepository(BaseRepository): ) ) await session.commit() + + # ---------------------------------------------------------- orchestrator + + async def list_running_topology_deckies(self) -> list[dict[str, Any]]: + async with self._session() as session: + result = await session.execute( + select(TopologyDecky).where(TopologyDecky.state == "running") + ) + return [ + self._deserialize_json_fields( + r.model_dump(mode="json"), ("services", "decky_config") + ) + for r in result.scalars().all() + ] + + async def record_orchestrator_event(self, data: dict[str, Any]) -> str: + payload = data.get("payload") + if isinstance(payload, (dict, list)): + data = {**data, "payload": json.dumps(payload)} + async with self._session() as session: + row = OrchestratorEvent(**data) + session.add(row) + await session.commit() + await session.refresh(row) + return row.uuid + + async def list_orchestrator_events( + self, + *, + limit: int = 100, + since_ts: Optional[datetime] = None, + ) -> list[dict[str, Any]]: + async with self._session() as session: + stmt = select(OrchestratorEvent) + if since_ts is not None: + stmt = stmt.where(OrchestratorEvent.ts >= since_ts) + stmt = stmt.order_by(desc(OrchestratorEvent.ts)).limit(limit) + result = await session.execute(stmt) + return [r.model_dump(mode="json") for r in result.scalars().all()] diff --git a/decnet/web/router/workers/api_start_all_workers.py b/decnet/web/router/workers/api_start_all_workers.py index eeab98a9..1f28d733 100644 --- a/decnet/web/router/workers/api_start_all_workers.py +++ b/decnet/web/router/workers/api_start_all_workers.py @@ -28,6 +28,7 @@ _PREFERRED_ORDER: tuple[str, ...] = ( "reuse-correlator", "enrich", "webhook", + "orchestrator", ) diff --git a/decnet/web/worker_registry.py b/decnet/web/worker_registry.py index f2158ef1..02ae7f21 100644 --- a/decnet/web/worker_registry.py +++ b/decnet/web/worker_registry.py @@ -41,6 +41,7 @@ KNOWN_WORKERS: tuple[str, ...] = ( "reuse-correlator", # credential-reuse pass — bus-woken on credential.captured "enrich", # threat-intel enrichment — bus-woken on attacker.observed/scored "webhook", # external SIEM/SOAR egress — bus consumer → HMAC HTTP POSTs + "orchestrator", # synthetic life-injection — inter-decky traffic + file ops "agent", "forwarder", "updater", diff --git a/deploy/decnet-orchestrator.service.j2 b/deploy/decnet-orchestrator.service.j2 new file mode 100644 index 00000000..968d97a2 --- /dev/null +++ b/deploy/decnet-orchestrator.service.j2 @@ -0,0 +1,43 @@ +[Unit] +Description=DECNET Orchestrator (synthetic life-injection — inter-decky traffic + file ops) +Documentation=https://git.resacachile.cl/anti/DECNET/wiki/Workers#orchestrator +After=network-online.target decnet-bus.service +Wants=network-online.target decnet-bus.service + +[Service] +Type=simple +User={{ user }} +Group={{ group }} +WorkingDirectory={{ install_dir }} +EnvironmentFile=-{{ install_dir }}/.env.local +Environment=DECNET_SYSTEM_LOGS=/var/log/decnet/decnet.orchestrator.log +ExecStart={{ venv_dir }}/bin/decnet orchestrate +StandardOutput=append:/var/log/decnet/decnet.orchestrator.log +StandardError=append:/var/log/decnet/decnet.orchestrator.log + +# The orchestrator drives `docker exec` against decky containers, so it +# needs membership in the docker group. It does NOT bind to the network, +# launch new containers, or write outside its own logs and install dir. +SupplementaryGroups=docker + +CapabilityBoundingSet= +AmbientCapabilities= + +# Security Hardening +NoNewPrivileges=yes +ProtectSystem=full +ProtectHome=read-only +PrivateTmp=yes +ProtectKernelTunables=yes +ProtectKernelModules=yes +ProtectControlGroups=yes +RestrictSUIDSGID=yes +LockPersonality=yes +ReadWritePaths={{ install_dir }} /var/log/decnet + +Restart=on-failure +RestartSec=5 +TimeoutStopSec=15 + +[Install] +WantedBy=multi-user.target diff --git a/tests/orchestrator/__init__.py b/tests/orchestrator/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/orchestrator/test_scheduler.py b/tests/orchestrator/test_scheduler.py new file mode 100644 index 00000000..1ef34c3a --- /dev/null +++ b/tests/orchestrator/test_scheduler.py @@ -0,0 +1,60 @@ +"""Picker policy tests for the orchestrator scheduler.""" +from __future__ import annotations + +import secrets + +import pytest + +from decnet.orchestrator import scheduler + + +def _decky(uuid: str, name: str, ip: str | None, services: list[str] | str): + return {"uuid": uuid, "name": name, "ip": ip, "services": services} + + +def test_pick_returns_none_when_no_ssh_deckies(): + deckies = [ + _decky("u1", "decky-01", "10.0.0.1", ["http"]), + _decky("u2", "decky-02", "10.0.0.2", ["smb"]), + ] + assert scheduler.pick(deckies) is None + + +def test_pick_returns_none_when_ssh_decky_has_no_ip(): + deckies = [_decky("u1", "decky-01", None, ["ssh"])] + assert scheduler.pick(deckies) is None + + +def test_pick_file_action_with_single_ssh_decky(): + deckies = [_decky("u1", "decky-01", "10.0.0.1", ["ssh"])] + rng = secrets.SystemRandom() + rng.seed = lambda *_: None # SystemRandom doesn't seed; ignore + action = scheduler.pick(deckies, rand=rng) + assert isinstance(action, scheduler.FileAction) + assert action.dst_uuid == "u1" + assert action.path.startswith("/") + assert action.content + + +def test_pick_traffic_or_file_with_two_ssh_deckies(): + deckies = [ + _decky("u1", "decky-01", "10.0.0.1", ["ssh"]), + _decky("u2", "decky-02", "10.0.0.2", ["ssh"]), + ] + seen_kinds: set[str] = set() + # 50/50 split — 40 trials makes both kinds essentially certain + for _ in range(40): + action = scheduler.pick(deckies) + assert action is not None + seen_kinds.add("traffic" if isinstance(action, scheduler.TrafficAction) else "file") + if isinstance(action, scheduler.TrafficAction): + assert action.src_uuid != action.dst_uuid + assert action.dst_ip in {"10.0.0.1", "10.0.0.2"} + assert action.protocol == "ssh" + assert seen_kinds == {"traffic", "file"} + + +def test_pick_skips_non_deserialised_services(): + """If services is still a JSON string (defensive), the decky is excluded.""" + deckies = [_decky("u1", "decky-01", "10.0.0.1", '["ssh"]')] + assert scheduler.pick(deckies) is None diff --git a/tests/orchestrator/test_ssh_driver.py b/tests/orchestrator/test_ssh_driver.py new file mode 100644 index 00000000..b76d162b --- /dev/null +++ b/tests/orchestrator/test_ssh_driver.py @@ -0,0 +1,99 @@ +"""Driver tests with the docker subprocess mocked. + +We don't need a real Docker daemon to validate the driver's contract: +it boils down to "build an argv, call _run, classify the result". A +dependency-injected ``_run`` keeps the tests hermetic. +""" +from __future__ import annotations + +import pytest + +from decnet.orchestrator.drivers import ssh as ssh_driver +from decnet.orchestrator.drivers.base import ActivityResult +from decnet.orchestrator.scheduler import FileAction, TrafficAction + + +@pytest.mark.asyncio +async def test_traffic_success_classifies_on_ssh_banner(monkeypatch): + captured_argv: list[list[str]] = [] + + async def fake_run(argv): + captured_argv.append(argv) + return 0, "SSH-2.0-OpenSSH_9.6\r\n", "" + + monkeypatch.setattr(ssh_driver, "_run", fake_run) + drv = ssh_driver.SSHDriver() + action = TrafficAction( + src_uuid="u1", src_name="decky-01", + dst_uuid="u2", dst_name="decky-02", + dst_ip="10.0.0.2", + ) + result = await drv.run(action) + assert isinstance(result, ActivityResult) + assert result.success is True + assert result.payload["banner"].startswith("SSH-2.0-OpenSSH") + assert captured_argv[0][:3] == ["docker", "exec", "decky-01-ssh"] + assert captured_argv[0][-1] == "10.0.0.2" + + +@pytest.mark.asyncio +async def test_traffic_failure_when_banner_missing(monkeypatch): + async def fake_run(argv): + return 1, "", "Connection refused" + + monkeypatch.setattr(ssh_driver, "_run", fake_run) + drv = ssh_driver.SSHDriver() + action = TrafficAction( + src_uuid="u1", src_name="decky-01", + dst_uuid="u2", dst_name="decky-02", + dst_ip="10.0.0.2", + ) + result = await drv.run(action) + assert result.success is False + assert result.payload["rc"] == 1 + assert "Connection refused" in result.payload["stderr"] + + +@pytest.mark.asyncio +async def test_file_action_invokes_docker_exec_on_dst(monkeypatch): + captured_argv: list[list[str]] = [] + + async def fake_run(argv): + captured_argv.append(argv) + return 0, "", "" + + monkeypatch.setattr(ssh_driver, "_run", fake_run) + drv = ssh_driver.SSHDriver() + action = FileAction( + dst_uuid="u2", dst_name="decky-02", + path="/tmp/.cache-1700000000.tmp", + content="session=1700000000\n", + ) + result = await drv.run(action) + assert result.success is True + assert result.payload["bytes"] == len("session=1700000000\n".encode()) + argv = captured_argv[0] + assert argv[:3] == ["docker", "exec", "decky-02-ssh"] + assert argv[3] == "sh" + assert argv[4] == "-c" + # The shell payload must single-quote both the content and the path — + # any unquoted ``;`` or ``$`` here would mean a shell-injection bug. + sh_cmd = argv[5] + # Path appears (shlex.quote leaves safe paths unquoted) and content + # is single-quoted — that's the shell-injection-safe contract. + assert "/tmp/.cache-1700000000.tmp" in sh_cmd + assert "'session=1700000000\n'" in sh_cmd + assert "mkdir -p /tmp" in sh_cmd + + +@pytest.mark.asyncio +async def test_run_handles_missing_docker_binary(monkeypatch): + async def fake_create(*args, **kwargs): + raise FileNotFoundError("docker") + + monkeypatch.setattr( + "asyncio.create_subprocess_exec", fake_create, + ) + rc, out, err = await ssh_driver._run(["docker", "exec", "x", "true"]) + assert rc == 127 + assert "not found" in err diff --git a/tests/orchestrator/test_worker_integration.py b/tests/orchestrator/test_worker_integration.py new file mode 100644 index 00000000..2d661d27 --- /dev/null +++ b/tests/orchestrator/test_worker_integration.py @@ -0,0 +1,123 @@ +"""End-to-end-ish: run one orchestrator tick against a real SQLite repo + +FakeBus, with the docker subprocess stubbed. Verifies that: + +* :func:`scheduler.pick` reads the deckies the repo returns, +* the driver result is persisted to ``orchestrator_events``, +* a bus event is published to the right topic. +""" +from __future__ import annotations + +import json + +import pytest +import pytest_asyncio + +from decnet.bus.fake import FakeBus +from decnet.orchestrator import worker as orch_worker +from decnet.orchestrator.drivers import ssh as ssh_driver +from decnet.web.db.models import TopologyDecky, Topology +from decnet.web.db.sqlite.repository import SQLiteRepository + + +@pytest_asyncio.fixture +async def repo(tmp_path): + r = SQLiteRepository(db_path=str(tmp_path / "decnet.db")) + await r.initialize() + yield r + await r.engine.dispose() + + +@pytest_asyncio.fixture +async def fake_bus(): + bus = FakeBus() + await bus.connect() + try: + yield bus + finally: + await bus.close() + + +async def _seed_two_running_ssh_deckies(repo: SQLiteRepository) -> tuple[str, str]: + async with repo._session() as session: + topo = Topology(name="t1", config_snapshot="{}", status="active") + session.add(topo) + await session.commit() + await session.refresh(topo) + d1 = TopologyDecky( + topology_id=topo.id, name="decky-01", + services=json.dumps(["ssh"]), ip="10.0.0.1", state="running", + ) + d2 = TopologyDecky( + topology_id=topo.id, name="decky-02", + services=json.dumps(["ssh"]), ip="10.0.0.2", state="running", + ) + session.add(d1) + session.add(d2) + await session.commit() + await session.refresh(d1) + await session.refresh(d2) + return d1.uuid, d2.uuid + + +@pytest.mark.asyncio +async def test_one_tick_records_event_and_publishes(repo, fake_bus, monkeypatch): + await _seed_two_running_ssh_deckies(repo) + + # Pretend every docker exec succeeds with an SSH banner; that lets + # both action kinds (traffic + file) land as success rows so the + # assertions below don't have to care which one the scheduler picked. + async def fake_run(argv): + if argv[3] == "python3": + return 0, "SSH-2.0-OpenSSH_9.6\r\n", "" + return 0, "", "" + + monkeypatch.setattr(ssh_driver, "_run", fake_run) + + received: list = [] + + async def collect(): + async with fake_bus.subscribe("orchestrator.>") as sub: + async for ev in sub: + received.append(ev) + if len(received) >= 1: + return + + import asyncio + collector = asyncio.create_task(collect()) + # Yield once so the subscription is registered before we publish. + await asyncio.sleep(0) + + driver = ssh_driver.SSHDriver() + await orch_worker._one_tick(repo, driver, fake_bus) + + await asyncio.wait_for(collector, timeout=2.0) + + rows = await repo.list_orchestrator_events(limit=10) + assert len(rows) == 1 + row = rows[0] + assert row["success"] is True + assert row["protocol"] == "ssh" + assert row["kind"] in {"traffic", "file"} + + assert len(received) == 1 + ev = received[0] + assert ev.topic.startswith("orchestrator.") + assert ev.payload["success"] is True + assert ev.payload["kind"] == row["kind"] + + +@pytest.mark.asyncio +async def test_tick_is_noop_when_no_running_deckies(repo, fake_bus, monkeypatch): + called = False + + async def fake_run(argv): + nonlocal called + called = True + return 0, "SSH-2.0-foo", "" + + monkeypatch.setattr(ssh_driver, "_run", fake_run) + driver = ssh_driver.SSHDriver() + await orch_worker._one_tick(repo, driver, fake_bus) + + assert called is False + assert await repo.list_orchestrator_events(limit=10) == []