7 Commits

Author SHA1 Message Date
201d246c07 fix(ci): fix indentation on ci.yaml
Some checks failed
CI / Lint (ruff) (push) Successful in 14s
CI / SAST (bandit) (push) Successful in 15s
CI / Test (Standard) (3.11) (push) Has been cancelled
CI / Test (Live) (3.11) (push) Has been cancelled
CI / Test (Fuzz) (3.11) (push) Has been cancelled
CI / Merge dev → testing (push) Has been cancelled
CI / Prepare Merge to Main (push) Has been cancelled
CI / Finalize Merge to Main (push) Has been cancelled
CI / Dependency audit (pip-audit) (push) Has been cancelled
2026-04-20 16:46:30 -04:00
47cd200e1d feat(mazenet): repo methods for topology/LAN/decky/edge/status events
Adds topology CRUD to BaseRepository (NotImplementedError defaults) and
implements them in SQLModelRepository: create/get/list/delete topologies,
add/update/list LANs and TopologyDeckies, add/list edges, plus an atomic
update_topology_status that appends a TopologyStatusEvent in the same
transaction.  Cascade delete sweeps children before the topology row.

Covered by tests/topology/test_repo.py (roundtrip, per-topology name
uniqueness, status event log, cascade delete, status filter) and an
extension to tests/test_base_repo.py for the NotImplementedError surface.
2026-04-20 16:43:49 -04:00
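The atomic status update this commit describes (the status write and the appended event committing together) can be sketched with stdlib SQLite. Table shapes here mirror the models but this is an illustration, not the repository code:

```python
import sqlite3
import datetime

con = sqlite3.connect(":memory:")
con.executescript("""
CREATE TABLE topologies (id TEXT PRIMARY KEY, status TEXT);
CREATE TABLE topology_status_events (
    topology_id TEXT, from_status TEXT, to_status TEXT, at TEXT
);
INSERT INTO topologies VALUES ('t1', 'pending');
""")

def update_topology_status(con, topology_id, new_status, reason=None):
    # "with con" opens one transaction: both writes commit or neither does.
    with con:
        (from_status,) = con.execute(
            "SELECT status FROM topologies WHERE id = ?", (topology_id,)
        ).fetchone()
        con.execute(
            "UPDATE topologies SET status = ? WHERE id = ?",
            (new_status, topology_id),
        )
        con.execute(
            "INSERT INTO topology_status_events VALUES (?, ?, ?, ?)",
            (topology_id, from_status, new_status,
             datetime.datetime.now(datetime.timezone.utc).isoformat()),
        )

update_topology_status(con, "t1", "active")
```

The real implementation does the same two writes inside a single SQLAlchemy session before `commit()`.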
096a35b24a feat(mazenet): add topology schema to models.py
Introduces five new SQLModel tables for MazeNET (nested deception
topologies): Topology, LAN, TopologyDecky, TopologyEdge, and
TopologyStatusEvent.  DeckyShard is intentionally not touched —
TopologyDecky is a purpose-built sibling for MazeNET's lifecycle
(topology-scoped UUIDs, per-topology name uniqueness).

Part of MazeNET v1 (nested self-container network-of-networks).
2026-04-20 16:40:10 -04:00
8a2876fe86 fix(api): document missing HTTP status codes on router endpoints
All checks were successful
CI / Lint (ruff) (push) Successful in 16s
CI / SAST (bandit) (push) Successful in 18s
CI / Dependency audit (pip-audit) (push) Successful in 26s
CI / Test (Standard) (3.11) (push) Successful in 2m41s
CI / Test (Live) (3.11) (push) Successful in 1m6s
CI / Test (Fuzz) (3.11) (push) Successful in 1h9m14s
CI / Finalize Merge to Main (push) Has been skipped
CI / Merge dev → testing (push) Successful in 12s
CI / Prepare Merge to Main (push) Has been skipped
Schemathesis was failing CI on routes that returned status codes not
declared in their OpenAPI responses= dicts. Adds the missing codes
across swarm_updates, swarm_mgmt, swarm, fleet and attackers routers.

Also adds 400 to every POST/PUT/PATCH that accepts a JSON body —
Starlette returns 400 on malformed/non-UTF8 bodies before FastAPI's
422 validation runs, which schemathesis fuzzing trips every time.

No handler logic changed.
2026-04-20 15:25:02 -04:00
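The 400-before-422 ordering described above can be modelled with a stdlib-only sketch; this mimics the order of checks, not the actual Starlette/FastAPI code path:

```python
import json

def classify_body(raw: bytes) -> int:
    # The body must decode before JSON validation can run, so an
    # undecodable body yields 400 while decodable-but-invalid JSON
    # reaches the validation layer and yields 422.
    try:
        text = raw.decode("utf-8")
    except UnicodeDecodeError:
        return 400
    try:
        json.loads(text)
    except json.JSONDecodeError:
        return 422
    return 200

assert classify_body(b"\xff\xfe") == 400      # non-UTF8 bytes
assert classify_body(b"{not json") == 422     # malformed JSON
assert classify_body(b'{"ok": true}') == 200  # valid body
```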
3e8e4c9e1c fix(ci): run less harsh tests on CI, let local runners run harder ones 2026-04-20 14:07:34 -04:00
64bc6fcb1d chores(pyproj): modified some values 2026-04-20 13:22:49 -04:00
af9d59d3ee fixed(api): documentation 2026-04-20 13:20:42 -04:00
24 changed files with 692 additions and 76 deletions


@@ -61,22 +61,22 @@ jobs:
    name: Test (Live)
    runs-on: ubuntu-latest
    needs: [test-standard]
    strategy:
      matrix:
        python-version: ["3.11"]
    services:
      mysql:
        image: mysql:8.0
        env:
          MYSQL_ROOT_PASSWORD: root
          MYSQL_DATABASE: decnet_test
        ports:
          - 3307:3306
        options: >-
          --health-cmd="mysqladmin ping -h 127.0.0.1"
          --health-interval=10s
          --health-timeout=5s
          --health-retries=5
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
@@ -105,6 +105,8 @@ jobs:
          python-version: ${{ matrix.python-version }}
      - run: pip install -e .[dev]
      - run: pytest -m fuzz
        env:
          SCHEMATHESIS_CONFIG: schemathesis.ci.toml

  merge-to-testing:
    name: Merge dev → testing


@@ -1,58 +0,0 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Commands
```bash
# Install (dev)
pip install -e .
# List registered service plugins
decnet services
# Dry-run (generates compose, no containers)
decnet deploy --mode unihost --deckies 3 --randomize-services --dry-run
# Full deploy (requires root for MACVLAN)
sudo decnet deploy --mode unihost --deckies 5 --interface eth0 --randomize-services
sudo decnet deploy --mode unihost --deckies 3 --services ssh,smb --log-target 192.168.1.5:5140
# Status / teardown
decnet status
sudo decnet teardown --all
sudo decnet teardown --id decky-01
```
## Project Overview
DECNET is a honeypot/deception network framework. It deploys fake machines (called **deckies**) with realistic services (RDP, SMB, SSH, FTP, etc.) to lure and profile attackers. All attacker interactions are aggregated to an isolated logging network (ELK stack / SIEM).
## Deployment Models
**UNIHOST** — one real host spins up _n_ deckies via a container orchestrator. Simpler, single-machine deployment.
**SWARM (MULTIHOST)** — _n_ real hosts each running deckies. Orchestrated via Ansible/sshpass or similar tooling.
## Core Technology Choices
- **Containers**: Docker Compose is the starting point but other orchestration frameworks should be evaluated if they serve the project better. `debian:bookworm-slim` is the default base image; mixing in Ubuntu, CentOS, or other distros is encouraged to make the decoy network look heterogeneous.
- **Networking**: Deckies need to appear as real machines on the LAN (own MACs/IPs). MACVLAN and IPVLAN are candidates; the right driver depends on the host environment. WSL has known limitations — bare metal or a VM is preferred for testing.
- **Log pipeline**: Logstash → ELK stack → SIEM (isolated network, not reachable from decoy network)
## Architecture Constraints
- The decoy network must be reachable from the outside (attacker-facing).
- The logging/aggregation network must be isolated from the decoy network.
- A publicly accessible real server acts as the bridge between the two networks.
- Deckies should differ in exposed services and OS fingerprints to appear as a heterogeneous network.
- **IMPORTANT**: The system now strictly enforces dependency injection for storage. Do not import `SQLiteRepository` directly in new features; instead, use `get_repository()` from the factory or the FastAPI `get_repo` dependency.
## Development and testing
- For every new feature, pytests must be made.
- Pytest is the main testing framework in use.
- NEVER pass broken code to the user.
- Broken means: not running, not passing 100% tests, etc.
- After tests pass with 100%, always git commit your changes.
- NEVER add "Co-Authored-By" or any Claude attribution lines to git commit messages.
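The decoy/logging isolation constraint above is partly an addressing property; a quick stdlib check that two candidate subnets (hypothetical values, real ones depend on the deployment) do not overlap:

```python
import ipaddress

# Hypothetical subnets for the attacker-facing decoy LAN and the
# isolated logging/aggregation network.
decoy_net = ipaddress.ip_network("192.168.50.0/24")
logging_net = ipaddress.ip_network("10.99.0.0/24")

# overlaps() is True if any address is shared between the two networks.
assert not decoy_net.overlaps(logging_net)
```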


@@ -1,6 +1,7 @@
from datetime import datetime, timezone
from typing import Literal, Optional, Any, List, Annotated
from uuid import uuid4
from sqlalchemy import Column, Text, UniqueConstraint
from sqlalchemy.dialects.mysql import MEDIUMTEXT
from sqlmodel import SQLModel, Field
from pydantic import BaseModel, ConfigDict, Field as PydanticField, BeforeValidator
@@ -192,6 +193,110 @@ class AttackerBehavior(SQLModel, table=True):
        default_factory=lambda: datetime.now(timezone.utc), index=True
    )
# --- MazeNET tables ---
# Nested deception topologies: an arbitrary-depth DAG of LANs connected by
# multi-homed "bridge" deckies. Purpose-built; disjoint from DeckyShard which
# remains SWARM-only.


class Topology(SQLModel, table=True):
    __tablename__ = "topologies"

    id: str = Field(default_factory=lambda: str(uuid4()), primary_key=True)
    name: str = Field(index=True, unique=True)
    mode: str = Field(default="unihost")  # unihost|agent
    # Full TopologyConfig snapshot (including seed) used at generation time.
    config_snapshot: str = Field(
        sa_column=Column("config_snapshot", _BIG_TEXT, nullable=False, default="{}")
    )
    status: str = Field(
        default="pending", index=True
    )  # pending|deploying|active|degraded|failed|tearing_down|torn_down
    status_changed_at: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc)
    )
    created_at: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc), index=True
    )


class LAN(SQLModel, table=True):
    __tablename__ = "lans"
    __table_args__ = (
        UniqueConstraint("topology_id", "name", name="uq_lan_topology_name"),
    )

    id: str = Field(default_factory=lambda: str(uuid4()), primary_key=True)
    topology_id: str = Field(foreign_key="topologies.id", index=True)
    name: str
    # Populated after the Docker network is created; nullable before deploy.
    docker_network_id: Optional[str] = Field(default=None)
    subnet: str
    is_dmz: bool = Field(default=False)


class TopologyDecky(SQLModel, table=True):
    """A decky belonging to a MazeNET topology.

    Disjoint from DeckyShard (which is SWARM-only). UUID PK; decky name is
    unique only within a topology, so two topologies can both have a
    ``decky-01`` without colliding.
    """

    __tablename__ = "topology_deckies"
    __table_args__ = (
        UniqueConstraint("topology_id", "name", name="uq_topology_decky_name"),
    )

    uuid: str = Field(default_factory=lambda: str(uuid4()), primary_key=True)
    topology_id: str = Field(foreign_key="topologies.id", index=True)
    name: str
    # JSON list[str] of service names on this decky (snapshot of assignment).
    services: str = Field(
        sa_column=Column("services", _BIG_TEXT, nullable=False, default="[]")
    )
    # Full serialised DeckyConfig snapshot — lets the dashboard render the
    # same card shape as DeckyShard without a live round-trip.
    decky_config: Optional[str] = Field(
        default=None, sa_column=Column("decky_config", _BIG_TEXT, nullable=True)
    )
    ip: Optional[str] = Field(default=None)
    # Same vocabulary as DeckyShard.state to keep dashboard rendering uniform.
    state: str = Field(
        default="pending", index=True
    )  # pending|running|failed|torn_down|degraded|tearing_down|teardown_failed
    last_error: Optional[str] = Field(
        default=None, sa_column=Column("last_error", Text, nullable=True)
    )
    compose_hash: Optional[str] = Field(default=None)
    last_seen: Optional[datetime] = Field(default=None)
    updated_at: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc)
    )


class TopologyEdge(SQLModel, table=True):
    """Membership edge: a decky attached to a LAN.

    A decky appearing in ≥2 edges is multi-homed (a bridge decky).
    """

    __tablename__ = "topology_edges"

    id: str = Field(default_factory=lambda: str(uuid4()), primary_key=True)
    topology_id: str = Field(foreign_key="topologies.id", index=True)
    decky_uuid: str = Field(foreign_key="topology_deckies.uuid", index=True)
    lan_id: str = Field(foreign_key="lans.id", index=True)
    is_bridge: bool = Field(default=False)
    forwards_l3: bool = Field(default=False)


class TopologyStatusEvent(SQLModel, table=True):
    """Append-only audit log of topology status transitions."""

    __tablename__ = "topology_status_events"

    id: str = Field(default_factory=lambda: str(uuid4()), primary_key=True)
    topology_id: str = Field(foreign_key="topologies.id", index=True)
    from_status: str
    to_status: str
    at: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc), index=True
    )
    reason: Optional[str] = Field(
        default=None, sa_column=Column("reason", Text, nullable=True)
    )
# --- API Request/Response Models (Pydantic) ---

class Token(BaseModel):
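The per-topology name uniqueness enforced by `uq_topology_decky_name` can be demonstrated with plain SQLite and a composite UNIQUE constraint (illustrative DDL, not the real schema):

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute(
    """CREATE TABLE topology_deckies (
        uuid TEXT PRIMARY KEY,
        topology_id TEXT NOT NULL,
        name TEXT NOT NULL,
        UNIQUE (topology_id, name)
    )"""
)
con.execute("INSERT INTO topology_deckies VALUES ('u1', 'topo-a', 'decky-01')")
# Same name in a different topology is allowed:
con.execute("INSERT INTO topology_deckies VALUES ('u2', 'topo-b', 'decky-01')")
# Same name in the same topology violates the composite constraint:
try:
    con.execute("INSERT INTO topology_deckies VALUES ('u3', 'topo-a', 'decky-01')")
    duplicate_rejected = False
except sqlite3.IntegrityError:
    duplicate_rejected = True

assert duplicate_rejected
```

This is why two topologies can both contain a `decky-01` without colliding, while a single topology cannot.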


@@ -234,3 +234,67 @@ class BaseRepository(ABC):
    async def delete_decky_shard(self, decky_name: str) -> bool:
        raise NotImplementedError

    # ----------------------------------------------------------- mazenet
    # MazeNET topology persistence. Default no-op / NotImplementedError so
    # non-default backends stay functional; SQLModelRepository provides the
    # real implementation used by SQLite and MySQL.

    async def create_topology(self, data: dict[str, Any]) -> str:
        raise NotImplementedError

    async def get_topology(self, topology_id: str) -> Optional[dict[str, Any]]:
        raise NotImplementedError

    async def list_topologies(
        self, status: Optional[str] = None
    ) -> list[dict[str, Any]]:
        raise NotImplementedError

    async def update_topology_status(
        self,
        topology_id: str,
        new_status: str,
        reason: Optional[str] = None,
    ) -> None:
        raise NotImplementedError

    async def delete_topology_cascade(self, topology_id: str) -> bool:
        raise NotImplementedError

    async def add_lan(self, data: dict[str, Any]) -> str:
        raise NotImplementedError

    async def update_lan(self, lan_id: str, fields: dict[str, Any]) -> None:
        raise NotImplementedError

    async def list_lans_for_topology(
        self, topology_id: str
    ) -> list[dict[str, Any]]:
        raise NotImplementedError

    async def add_topology_decky(self, data: dict[str, Any]) -> str:
        raise NotImplementedError

    async def update_topology_decky(
        self, decky_uuid: str, fields: dict[str, Any]
    ) -> None:
        raise NotImplementedError

    async def list_topology_deckies(
        self, topology_id: str
    ) -> list[dict[str, Any]]:
        raise NotImplementedError

    async def add_topology_edge(self, data: dict[str, Any]) -> str:
        raise NotImplementedError

    async def list_topology_edges(
        self, topology_id: str
    ) -> list[dict[str, Any]]:
        raise NotImplementedError

    async def list_topology_status_events(
        self, topology_id: str, limit: int = 100
    ) -> list[dict[str, Any]]:
        raise NotImplementedError
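The default-raise pattern above can be exercised in isolation; `BaseRepo` and `SQLRepo` here are stand-in names for illustration, not the real classes:

```python
import asyncio

class BaseRepo:
    # Default raises so an alternative backend fails loudly on an
    # unimplemented topology method instead of silently returning nothing.
    async def create_topology(self, data: dict) -> str:
        raise NotImplementedError

class SQLRepo(BaseRepo):
    async def create_topology(self, data: dict) -> str:
        return "topo-uuid"  # placeholder id

async def main():
    try:
        await BaseRepo().create_topology({})
        base_raised = False
    except NotImplementedError:
        base_raised = True
    return base_raised, await SQLRepo().create_topology({})

base_raised, topo_id = asyncio.run(main())
assert base_raised and topo_id == "topo-uuid"
```

This is also exactly the surface that `tests/test_base_repo.py` sweeps with `pytest.raises(NotImplementedError)`.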


@@ -36,6 +36,11 @@ from decnet.web.db.models import (
    AttackerBehavior,
    SwarmHost,
    DeckyShard,
    Topology,
    LAN,
    TopologyDecky,
    TopologyEdge,
    TopologyStatusEvent,
)
@@ -899,3 +904,220 @@ class SQLModelRepository(BaseRepository):
        )
        await session.commit()
        return bool(result.rowcount)
    # ------------------------------------------------------------ mazenet

    @staticmethod
    def _serialize_json_fields(data: dict[str, Any], keys: tuple[str, ...]) -> dict[str, Any]:
        out = dict(data)
        for k in keys:
            v = out.get(k)
            if v is not None and not isinstance(v, str):
                out[k] = orjson.dumps(v).decode()
        return out

    @staticmethod
    def _deserialize_json_fields(d: dict[str, Any], keys: tuple[str, ...]) -> dict[str, Any]:
        for k in keys:
            v = d.get(k)
            if isinstance(v, str):
                try:
                    d[k] = json.loads(v)
                except (json.JSONDecodeError, TypeError):
                    pass
        return d

    async def create_topology(self, data: dict[str, Any]) -> str:
        payload = self._serialize_json_fields(data, ("config_snapshot",))
        async with self._session() as session:
            row = Topology(**payload)
            session.add(row)
            await session.commit()
            await session.refresh(row)
            return row.id

    async def get_topology(self, topology_id: str) -> Optional[dict[str, Any]]:
        async with self._session() as session:
            result = await session.execute(
                select(Topology).where(Topology.id == topology_id)
            )
            row = result.scalar_one_or_none()
            if not row:
                return None
            d = row.model_dump(mode="json")
            return self._deserialize_json_fields(d, ("config_snapshot",))

    async def list_topologies(
        self, status: Optional[str] = None
    ) -> list[dict[str, Any]]:
        statement = select(Topology).order_by(desc(Topology.created_at))
        if status:
            statement = statement.where(Topology.status == status)
        async with self._session() as session:
            result = await session.execute(statement)
            return [
                self._deserialize_json_fields(
                    r.model_dump(mode="json"), ("config_snapshot",)
                )
                for r in result.scalars().all()
            ]

    async def update_topology_status(
        self,
        topology_id: str,
        new_status: str,
        reason: Optional[str] = None,
    ) -> None:
        """Update topology.status and append a TopologyStatusEvent atomically.

        Transition legality is enforced in ``decnet.topology.status``; this
        method trusts the caller.
        """
        now = datetime.now(timezone.utc)
        async with self._session() as session:
            result = await session.execute(
                select(Topology).where(Topology.id == topology_id)
            )
            topo = result.scalar_one_or_none()
            if topo is None:
                return
            from_status = topo.status
            topo.status = new_status
            topo.status_changed_at = now
            session.add(topo)
            session.add(
                TopologyStatusEvent(
                    topology_id=topology_id,
                    from_status=from_status,
                    to_status=new_status,
                    at=now,
                    reason=reason,
                )
            )
            await session.commit()

    async def delete_topology_cascade(self, topology_id: str) -> bool:
        """Delete topology and all children. No portable ON DELETE CASCADE."""
        async with self._session() as session:
            params = {"t": topology_id}
            await session.execute(
                text("DELETE FROM topology_status_events WHERE topology_id = :t"),
                params,
            )
            await session.execute(
                text("DELETE FROM topology_edges WHERE topology_id = :t"),
                params,
            )
            await session.execute(
                text("DELETE FROM topology_deckies WHERE topology_id = :t"),
                params,
            )
            await session.execute(
                text("DELETE FROM lans WHERE topology_id = :t"),
                params,
            )
            result = await session.execute(
                select(Topology).where(Topology.id == topology_id)
            )
            topo = result.scalar_one_or_none()
            if not topo:
                await session.commit()
                return False
            await session.delete(topo)
            await session.commit()
            return True

    async def add_lan(self, data: dict[str, Any]) -> str:
        async with self._session() as session:
            row = LAN(**data)
            session.add(row)
            await session.commit()
            await session.refresh(row)
            return row.id

    async def update_lan(self, lan_id: str, fields: dict[str, Any]) -> None:
        if not fields:
            return
        async with self._session() as session:
            await session.execute(
                update(LAN).where(LAN.id == lan_id).values(**fields)
            )
            await session.commit()

    async def list_lans_for_topology(
        self, topology_id: str
    ) -> list[dict[str, Any]]:
        async with self._session() as session:
            result = await session.execute(
                select(LAN).where(LAN.topology_id == topology_id).order_by(asc(LAN.name))
            )
            return [r.model_dump(mode="json") for r in result.scalars().all()]

    async def add_topology_decky(self, data: dict[str, Any]) -> str:
        payload = self._serialize_json_fields(data, ("services", "decky_config"))
        async with self._session() as session:
            row = TopologyDecky(**payload)
            session.add(row)
            await session.commit()
            await session.refresh(row)
            return row.uuid

    async def update_topology_decky(
        self, decky_uuid: str, fields: dict[str, Any]
    ) -> None:
        if not fields:
            return
        payload = self._serialize_json_fields(fields, ("services", "decky_config"))
        payload.setdefault("updated_at", datetime.now(timezone.utc))
        async with self._session() as session:
            await session.execute(
                update(TopologyDecky)
                .where(TopologyDecky.uuid == decky_uuid)
                .values(**payload)
            )
            await session.commit()

    async def list_topology_deckies(
        self, topology_id: str
    ) -> list[dict[str, Any]]:
        async with self._session() as session:
            result = await session.execute(
                select(TopologyDecky)
                .where(TopologyDecky.topology_id == topology_id)
                .order_by(asc(TopologyDecky.name))
            )
            return [
                self._deserialize_json_fields(
                    r.model_dump(mode="json"), ("services", "decky_config")
                )
                for r in result.scalars().all()
            ]

    async def add_topology_edge(self, data: dict[str, Any]) -> str:
        async with self._session() as session:
            row = TopologyEdge(**data)
            session.add(row)
            await session.commit()
            await session.refresh(row)
            return row.id

    async def list_topology_edges(
        self, topology_id: str
    ) -> list[dict[str, Any]]:
        async with self._session() as session:
            result = await session.execute(
                select(TopologyEdge).where(TopologyEdge.topology_id == topology_id)
            )
            return [r.model_dump(mode="json") for r in result.scalars().all()]

    async def list_topology_status_events(
        self, topology_id: str, limit: int = 100
    ) -> list[dict[str, Any]]:
        async with self._session() as session:
            result = await session.execute(
                select(TopologyStatusEvent)
                .where(TopologyStatusEvent.topology_id == topology_id)
                .order_by(desc(TopologyStatusEvent.at))
                .limit(limit)
            )
            return [r.model_dump(mode="json") for r in result.scalars().all()]
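The orjson-backed JSON-field helpers above can be mirrored with stdlib `json` to show the roundtrip contract (illustrative reimplementation, not the repository code):

```python
import json
from typing import Any

def serialize_json_fields(data: dict[str, Any], keys: tuple[str, ...]) -> dict[str, Any]:
    # Dict/list values in the named keys become JSON strings for storage.
    out = dict(data)
    for k in keys:
        v = out.get(k)
        if v is not None and not isinstance(v, str):
            out[k] = json.dumps(v)
    return out

def deserialize_json_fields(d: dict[str, Any], keys: tuple[str, ...]) -> dict[str, Any]:
    # Stored JSON strings come back out as Python objects; non-JSON
    # strings are left untouched.
    for k in keys:
        v = d.get(k)
        if isinstance(v, str):
            try:
                d[k] = json.loads(v)
            except (json.JSONDecodeError, TypeError):
                pass
    return d

row = serialize_json_fields(
    {"name": "alpha", "config_snapshot": {"depth": 3, "seed": 42}},
    ("config_snapshot",),
)
assert isinstance(row["config_snapshot"], str)   # stored as TEXT
back = deserialize_json_fields(dict(row), ("config_snapshot",))
assert back["config_snapshot"] == {"depth": 3, "seed": 42}
```

This roundtrip is exactly what `test_topology_roundtrip` asserts: callers hand in dicts and get dicts back, never raw JSON strings.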


@@ -30,8 +30,11 @@ api_router = APIRouter(
    # require_* Depends or by the global auth middleware). Document 401/403
    # here so the OpenAPI schema reflects reality for contract tests.
    responses={
        400: {"description": "Malformed request body"},
        401: {"description": "Missing or invalid credentials"},
        403: {"description": "Authenticated but not authorized"},
        404: {"description": "Referenced resource does not exist"},
        409: {"description": "Conflict with existing resource"},
    },
)


@@ -15,6 +15,7 @@ router = APIRouter()
        401: {"description": "Could not validate credentials"},
        403: {"description": "Insufficient permissions"},
        404: {"description": "Attacker not found"},
        422: {"description": "Query parameter validation error (limit/offset out of range or invalid)"},
    },
)
@_traced("api.get_attacker_commands")


@@ -26,7 +26,8 @@ router = APIRouter()
        403: {"description": "Insufficient permissions"},
        409: {"description": "Configuration conflict (e.g. invalid IP allocation or network mismatch)"},
        422: {"description": "Invalid INI config or schema validation error"},
        500: {"description": "Deployment failed"},
        502: {"description": "Partial swarm deploy failure — one or more worker hosts returned an error"},
    }
)
@_traced("api.deploy_deckies")


@@ -11,7 +11,12 @@ router = APIRouter()
@router.post(
    "/deckies/{decky_name}/mutate",
    tags=["Fleet Management"],
    responses={
        401: {"description": "Could not validate credentials"},
        403: {"description": "Insufficient permissions"},
        404: {"description": "Decky not found"},
        422: {"description": "Path parameter validation error (decky_name must match ^[a-z0-9\\-]{1,64}$)"},
    }
)
@_traced("api.mutate_decky")
async def api_mutate_decky(


@@ -29,7 +29,11 @@ router = APIRouter()
    response_model=SwarmEnrolledBundle,
    status_code=status.HTTP_201_CREATED,
    tags=["Swarm Hosts"],
    responses={
        400: {"description": "Bad Request (malformed JSON body)"},
        409: {"description": "A worker with this name is already enrolled"},
        422: {"description": "Request body validation error"},
    },
)
async def api_enroll_host(
    req: SwarmEnrollRequest,


@@ -101,8 +101,10 @@ async def _verify_peer_matches_host(
    status_code=204,
    tags=["Swarm Health"],
    responses={
        400: {"description": "Bad Request (malformed JSON body)"},
        403: {"description": "Peer cert missing, or its fingerprint does not match the host's pinned cert"},
        404: {"description": "host_uuid is not enrolled"},
        422: {"description": "Request body validation error"},
    },
)
async def heartbeat(


@@ -25,7 +25,11 @@ router = APIRouter()
    "/teardown",
    response_model=SwarmDeployResponse,
    tags=["Swarm Deployments"],
    responses={
        400: {"description": "Bad Request (malformed JSON body)"},
        404: {"description": "A targeted host does not exist"},
        422: {"description": "Request body validation error"},
    },
)
async def api_teardown_swarm(
    req: SwarmTeardownRequest,


@@ -24,6 +24,12 @@ router = APIRouter()
    "/hosts/{uuid}",
    status_code=status.HTTP_204_NO_CONTENT,
    tags=["Swarm Management"],
    responses={
        401: {"description": "Could not validate credentials"},
        403: {"description": "Insufficient permissions"},
        404: {"description": "Host not found"},
        422: {"description": "Path parameter validation error"},
    },
)
async def decommission_host(
    uuid: str,


@@ -322,6 +322,13 @@ def _render_bootstrap(
    response_model=EnrollBundleResponse,
    status_code=status.HTTP_201_CREATED,
    tags=["Swarm Management"],
    responses={
        400: {"description": "Bad Request (malformed JSON body)"},
        401: {"description": "Could not validate credentials"},
        403: {"description": "Insufficient permissions"},
        409: {"description": "A worker with this name is already enrolled"},
        422: {"description": "Request body validation error"},
    },
)
async def create_enroll_bundle(
    req: EnrollBundleRequest,


@@ -115,6 +115,13 @@ async def _run_teardown(
    response_model=TeardownHostResponse,
    status_code=status.HTTP_202_ACCEPTED,
    tags=["Swarm Management"],
    responses={
        400: {"description": "Bad Request (malformed JSON body)"},
        401: {"description": "Could not validate credentials"},
        403: {"description": "Insufficient permissions"},
        404: {"description": "Host not found"},
        422: {"description": "Request body or path parameter validation error"},
    },
)
async def teardown_host(
    uuid: str,


@@ -64,6 +64,10 @@ async def _probe_host(host: dict[str, Any]) -> HostReleaseInfo:
    "/hosts",
    response_model=HostReleasesResponse,
    tags=["Swarm Updates"],
    responses={
        401: {"description": "Could not validate credentials"},
        403: {"description": "Insufficient permissions"},
    },
)
async def api_list_host_releases(
    admin: dict = Depends(require_admin),


@@ -128,6 +128,13 @@ def _is_expected_connection_drop(exc: BaseException) -> bool:
    "/push",
    response_model=PushUpdateResponse,
    tags=["Swarm Updates"],
    responses={
        400: {"description": "Bad Request (malformed JSON body or conflicting host_uuids/all flags)"},
        401: {"description": "Could not validate credentials"},
        403: {"description": "Insufficient permissions"},
        404: {"description": "No matching target hosts or no updater-capable hosts enrolled"},
        422: {"description": "Request body validation error"},
    },
)
async def api_push_update(
    req: PushUpdateRequest,


@@ -68,6 +68,13 @@ async def _push_self_one(host: dict[str, Any], tarball: bytes, sha: str) -> Push
    "/push-self",
    response_model=PushUpdateResponse,
    tags=["Swarm Updates"],
    responses={
        400: {"description": "Bad Request (malformed JSON body or conflicting host_uuids/all flags)"},
        401: {"description": "Could not validate credentials"},
        403: {"description": "Insufficient permissions"},
        404: {"description": "No matching target hosts or no updater-capable hosts enrolled"},
        422: {"description": "Request body validation error"},
    },
)
async def api_push_update_self(
    req: PushUpdateRequest,


@@ -23,6 +23,13 @@ router = APIRouter()
    "/rollback",
    response_model=RollbackResponse,
    tags=["Swarm Updates"],
    responses={
        400: {"description": "Bad Request (malformed JSON body or host has no updater bundle)"},
        401: {"description": "Could not validate credentials"},
        403: {"description": "Insufficient permissions"},
        404: {"description": "Unknown host, or no previous release slot on the worker"},
        422: {"description": "Request body validation error"},
    },
)
async def api_rollback_host(
    req: RollbackRequest,


@@ -76,6 +76,7 @@ decnet = "decnet.cli:app"
[tool.pytest.ini_options]
asyncio_mode = "auto"
asyncio_debug = "true"
asyncio_default_fixture_loop_scope = "module"
addopts = "-m 'not fuzz and not live and not stress and not bench and not docker' -v -q -x -n logical --dist loadscope"
markers = [
    "fuzz: hypothesis-based fuzz tests (slow, run with -m fuzz or -m '' for all)",

schemathesis.ci.toml (new file)

@@ -0,0 +1,35 @@
# schemathesis.ci.toml
[[project]]
title = "DECNET API"
continue-on-failure = true
request-timeout = 10.0
workers = "auto"
[generation]
mode = "all"
max-examples = 50 # 10x less than local
no-shrink = true # skip shrinking in CI, saves time
allow-x00 = true
unique-inputs = true
[phases.examples]
enabled = true
fill-missing = true
[phases.coverage]
enabled = true
[phases.fuzzing]
enabled = true
[phases.stateful]
enabled = true
max-steps = 5 # 4x less than local
[checks]
status_code_conformance.enabled = true
content_type_conformance.enabled = true
response_schema_conformance.enabled = true
negative_data_rejection.enabled = true
ignored_auth.enabled = true
max_response_time = 5.0 # more lenient than local 2s


@@ -88,6 +88,20 @@ async def test_base_repo_coverage():
        (dr.upsert_decky_shard, ({},)),
        (dr.list_decky_shards, ()),
        (dr.delete_decky_shards_for_host, ("u",)),
        (dr.create_topology, ({},)),
        (dr.get_topology, ("t",)),
        (dr.list_topologies, ()),
        (dr.update_topology_status, ("t", "active")),
        (dr.delete_topology_cascade, ("t",)),
        (dr.add_lan, ({},)),
        (dr.update_lan, ("l", {})),
        (dr.list_lans_for_topology, ("t",)),
        (dr.add_topology_decky, ({},)),
        (dr.update_topology_decky, ("d", {})),
        (dr.list_topology_deckies, ("t",)),
        (dr.add_topology_edge, ({},)),
        (dr.list_topology_edges, ("t",)),
        (dr.list_topology_status_events, ("t",)),
    ]:
        with pytest.raises(NotImplementedError):
            await coro(*args)


tests/topology/test_repo.py (new file)

@@ -0,0 +1,166 @@
"""Direct async tests for MazeNET topology persistence.

Exercises the repository layer without going through the HTTP stack or
the in-memory generator. The synthetic topology here is hand-built so
the test remains meaningful even if generator.py regresses.
"""
import pytest

from decnet.web.db.factory import get_repository


@pytest.fixture
async def repo(tmp_path):
    r = get_repository(db_path=str(tmp_path / "mazenet.db"))
    await r.initialize()
    return r


@pytest.mark.anyio
async def test_topology_roundtrip(repo):
    t_id = await repo.create_topology(
        {
            "name": "alpha",
            "mode": "unihost",
            "config_snapshot": {"depth": 3, "seed": 42},
        }
    )
    assert t_id
    t = await repo.get_topology(t_id)
    assert t is not None
    assert t["name"] == "alpha"
    assert t["status"] == "pending"
    # JSON field round-trips as a dict, not a string
    assert t["config_snapshot"] == {"depth": 3, "seed": 42}


@pytest.mark.anyio
async def test_lan_add_update_list(repo):
    t_id = await repo.create_topology(
        {"name": "beta", "mode": "unihost", "config_snapshot": {}}
    )
    lan_id = await repo.add_lan(
        {"topology_id": t_id, "name": "DMZ", "subnet": "172.20.0.0/24", "is_dmz": True}
    )
    await repo.add_lan(
        {"topology_id": t_id, "name": "LAN-A", "subnet": "172.20.1.0/24"}
    )
    await repo.update_lan(lan_id, {"docker_network_id": "abc123"})
    lans = await repo.list_lans_for_topology(t_id)
    assert len(lans) == 2
    by_name = {lan["name"]: lan for lan in lans}
    assert by_name["DMZ"]["docker_network_id"] == "abc123"
    assert by_name["DMZ"]["is_dmz"] is True
    assert by_name["LAN-A"]["is_dmz"] is False


@pytest.mark.anyio
async def test_topology_decky_json_roundtrip(repo):
    t_id = await repo.create_topology(
        {"name": "gamma", "mode": "unihost", "config_snapshot": {}}
    )
    d_uuid = await repo.add_topology_decky(
        {
            "topology_id": t_id,
            "name": "decky-01",
            "services": ["ssh", "http"],
            "decky_config": {"hostname": "bastion"},
            "ip": "172.20.0.10",
        }
    )
    assert d_uuid
    deckies = await repo.list_topology_deckies(t_id)
    assert len(deckies) == 1
    assert deckies[0]["services"] == ["ssh", "http"]
    assert deckies[0]["decky_config"] == {"hostname": "bastion"}
    assert deckies[0]["state"] == "pending"
    await repo.update_topology_decky(d_uuid, {"state": "running", "ip": "172.20.0.11"})
    deckies = await repo.list_topology_deckies(t_id)
    assert deckies[0]["state"] == "running"
    assert deckies[0]["ip"] == "172.20.0.11"


@pytest.mark.anyio
async def test_topology_decky_name_unique_within_topology(repo):
"""Same decky name is legal across topologies, forbidden within one."""
t1 = await repo.create_topology(
{"name": "one", "mode": "unihost", "config_snapshot": {}}
)
t2 = await repo.create_topology(
{"name": "two", "mode": "unihost", "config_snapshot": {}}
)
await repo.add_topology_decky(
{"topology_id": t1, "name": "decky-01", "services": []}
)
# Same name, different topology — must succeed.
await repo.add_topology_decky(
{"topology_id": t2, "name": "decky-01", "services": []}
)
# Same name, same topology — must fail at the DB level.
with pytest.raises(Exception):
await repo.add_topology_decky(
{"topology_id": t1, "name": "decky-01", "services": []}
)
@pytest.mark.anyio
async def test_status_transition_writes_event(repo):
t_id = await repo.create_topology(
{"name": "delta", "mode": "unihost", "config_snapshot": {}}
)
await repo.update_topology_status(t_id, "deploying", reason="kickoff")
await repo.update_topology_status(t_id, "active")
topo = await repo.get_topology(t_id)
assert topo["status"] == "active"
events = await repo.list_topology_status_events(t_id)
assert len(events) == 2
# Ordered desc by at — latest first
assert events[0]["to_status"] == "active"
assert events[0]["from_status"] == "deploying"
assert events[1]["to_status"] == "deploying"
assert events[1]["from_status"] == "pending"
assert events[1]["reason"] == "kickoff"
@pytest.mark.anyio
async def test_cascade_delete_clears_all_children(repo):
t_id = await repo.create_topology(
{"name": "eps", "mode": "unihost", "config_snapshot": {}}
)
lan_id = await repo.add_lan(
{"topology_id": t_id, "name": "L", "subnet": "10.0.0.0/24"}
)
d_uuid = await repo.add_topology_decky(
{"topology_id": t_id, "name": "d", "services": []}
)
await repo.add_topology_edge(
{"topology_id": t_id, "decky_uuid": d_uuid, "lan_id": lan_id}
)
await repo.update_topology_status(t_id, "deploying")
assert await repo.delete_topology_cascade(t_id) is True
assert await repo.get_topology(t_id) is None
assert await repo.list_lans_for_topology(t_id) == []
assert await repo.list_topology_deckies(t_id) == []
assert await repo.list_topology_edges(t_id) == []
assert await repo.list_topology_status_events(t_id) == []
# Second delete on a missing row returns False, no raise
assert await repo.delete_topology_cascade(t_id) is False
@pytest.mark.anyio
async def test_list_topologies_filters_by_status(repo):
a = await repo.create_topology(
{"name": "a", "mode": "unihost", "config_snapshot": {}}
)
b = await repo.create_topology(
{"name": "b", "mode": "unihost", "config_snapshot": {}}
)
await repo.update_topology_status(b, "deploying")
pend = await repo.list_topologies(status="pending")
assert {t["id"] for t in pend} == {a}
dep = await repo.list_topologies(status="deploying")
assert {t["id"] for t in dep} == {b}
both = await repo.list_topologies()
assert {t["id"] for t in both} == {a, b}
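The per-topology name uniqueness exercised above is the kind of invariant a composite UNIQUE constraint enforces at the DB level. An illustrative sqlite3 sketch — the real schema lives in models.py, and the table and column names here are assumptions:

```python
import sqlite3

# Sketch: a composite UNIQUE constraint makes names unique per topology,
# not globally. (Hypothetical tables; the real ones are SQLModel classes.)
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE topology (id TEXT PRIMARY KEY);
CREATE TABLE topology_decky (
    uuid        TEXT PRIMARY KEY,
    topology_id TEXT NOT NULL REFERENCES topology(id),
    name        TEXT NOT NULL,
    UNIQUE (topology_id, name)
);
""")
conn.execute("INSERT INTO topology VALUES ('t1'), ('t2')")
conn.execute("INSERT INTO topology_decky VALUES ('d1', 't1', 'decky-01')")
# Same name in a different topology is fine.
conn.execute("INSERT INTO topology_decky VALUES ('d2', 't2', 'decky-01')")
# Same name in the same topology violates the composite constraint.
try:
    conn.execute("INSERT INTO topology_decky VALUES ('d3', 't1', 'decky-01')")
except sqlite3.IntegrityError:
    pass  # expected: UNIQUE constraint failed
```

This is why the duplicate insert in the test fails "at the DB level" regardless of any application-side checks.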