Replace per-decky sniffer containers with a single host-side sniffer that monitors all traffic on the MACVLAN interface. The sniffer runs as a background task in the FastAPI lifespan alongside the collector and is fully fault-isolated, so its failures never crash the API.
- Add fleet_singleton flag to BaseService; sniffer marked as singleton
- Composer skips fleet_singleton services in compose generation
- Fleet builder excludes singletons from random service assignment
- Extract TLS fingerprinting engine from templates/sniffer/server.py into decnet/sniffer/ package (parameterized for fleet-wide use)
- Sniffer worker maps packets to deckies via IP→name state mapping
- Original templates/sniffer/server.py preserved for future use
178 lines
6.1 KiB
Python
178 lines
6.1 KiB
Python
"""
|
|
Fleet builder — shared logic for constructing DeckyConfig lists.
|
|
|
|
Used by both the CLI and the web API router to build deckies from
|
|
flags or INI config. Lives here (not in cli.py) so that the web layer
|
|
and the mutation engine can import it without depending on the CLI.
|
|
"""
|
|
|
|
import random
|
|
from typing import Optional
|
|
|
|
from decnet.archetypes import Archetype, get_archetype
|
|
from decnet.config import DeckyConfig, random_hostname
|
|
from decnet.distros import all_distros, get_distro, random_distro
|
|
from decnet.models import IniConfig
|
|
from decnet.services.registry import all_services
|
|
|
|
|
|
def all_service_names() -> list[str]:
    """List the names of all per-decky services in sorted order.

    Entries of the service registry whose ``fleet_singleton`` attribute is
    truthy are left out of the result.
    """
    per_decky_names = []
    for svc_name, service in all_services().items():
        if service.fleet_singleton:
            # Fleet-wide singletons are not assignable to individual deckies.
            continue
        per_decky_names.append(svc_name)
    per_decky_names.sort()
    return per_decky_names
|
|
|
|
|
|
def resolve_distros(
    distros_explicit: list[str] | None,
    randomize_distros: bool,
    n: int,
    archetype: Archetype | None = None,
) -> list[str]:
    """Pick ``n`` distro slugs according to the given flags.

    The first matching rule wins:

    1. ``distros_explicit`` (non-empty): cycle through it in order.
    2. ``randomize_distros``: call ``random_distro()`` once per slot.
    3. ``archetype``: cycle through ``archetype.preferred_distros``.
    4. Otherwise: cycle through the keys of ``all_distros()``.
    """

    def _round_robin(pool):
        # Repeat the pool in order until n slots have been filled.
        return [pool[slot % len(pool)] for slot in range(n)]

    if distros_explicit:
        return _round_robin(distros_explicit)

    if randomize_distros:
        return [random_distro().slug for _ in range(n)]

    if archetype:
        return _round_robin(archetype.preferred_distros)

    return _round_robin(list(all_distros().keys()))
|
|
|
|
|
|
def build_deckies(
    n: int,
    ips: list[str],
    services_explicit: list[str] | None,
    randomize_services: bool,
    distros_explicit: list[str] | None = None,
    randomize_distros: bool = False,
    archetype: Archetype | None = None,
    mutate_interval: Optional[int] = None,
) -> list[DeckyConfig]:
    """Build a list of DeckyConfigs from CLI-style flags.

    One decky is created per entry of ``ips``, named ``decky-01``,
    ``decky-02``, ... in order. Distros come from ``resolve_distros`` (sized
    by ``n``), so ``ips`` is expected to hold at most ``n`` addresses; a
    longer list raises ``IndexError`` when the distro list runs out.

    Services are chosen per decky with this precedence:

    1. ``services_explicit`` (non-empty): every decky gets a copy of it.
    2. ``archetype``: every decky gets a copy of ``archetype.services``.
    3. ``randomize_services``: 1 to 3 random per-decky services, retrying to
       avoid repeating a combination already handed out in this call. After
       more than 20 attempts a duplicate combination is accepted.

    Args:
        n: Number of distro slugs to resolve.
        ips: IP address for each decky; determines how many are built.
        services_explicit: Service names applied to every decky, if given.
        randomize_services: Pick random services when neither explicit
            services nor an archetype are supplied.
        distros_explicit: Forwarded to ``resolve_distros``.
        randomize_distros: Forwarded to ``resolve_distros``.
        archetype: Optional archetype supplying services, slug and nmap OS.
        mutate_interval: Stored unchanged on every DeckyConfig.

    Returns:
        The DeckyConfig objects in the same order as ``ips``.

    Raises:
        ValueError: If no service source was provided.
    """
    deckies: list[DeckyConfig] = []
    used_combos: set[frozenset] = set()
    distro_slugs = resolve_distros(distros_explicit, randomize_distros, n, archetype)

    for i, ip in enumerate(ips):
        name = f"decky-{i + 1:02d}"
        distro = get_distro(distro_slugs[i])
        hostname = random_hostname(distro.slug)

        if services_explicit:
            # Copy so the caller's list object is never shared between
            # deckies (mirrors the archetype branch below).
            svc_list = list(services_explicit)
        elif archetype:
            svc_list = list(archetype.services)
        elif randomize_services:
            svc_pool = all_service_names()
            attempts = 0
            while True:
                count = random.randint(1, min(3, len(svc_pool)))  # nosec B311
                chosen = frozenset(random.sample(svc_pool, count))  # nosec B311
                attempts += 1
                # Prefer an unused combination, but give up on uniqueness
                # after 20 tries so a small service pool cannot loop forever.
                if chosen not in used_combos or attempts > 20:
                    break
            svc_list = list(chosen)
            used_combos.add(chosen)
        else:
            raise ValueError("Provide services_explicit, archetype, or randomize_services=True.")

        deckies.append(
            DeckyConfig(
                name=name,
                ip=ip,
                services=svc_list,
                distro=distro.slug,
                base_image=distro.image,
                build_base=distro.build_base,
                hostname=hostname,
                archetype=archetype.slug if archetype else None,
                nmap_os=archetype.nmap_os if archetype else "linux",
                mutate_interval=mutate_interval,
            )
        )
    return deckies
|
|
|
|
|
|
def build_deckies_from_ini(
    ini: IniConfig,
    subnet_cidr: str,
    gateway: str,
    host_ip: str,
    randomize: bool,
    cli_mutate_interval: int | None = None,
) -> list[DeckyConfig]:
    """Build DeckyConfig list from an IniConfig, auto-allocating missing IPs.

    For each entry of ``ini.deckies``:

    * IP: the spec's own ``ip`` if set (used as given), otherwise the next
      free host address of ``subnet_cidr``. The network and broadcast
      addresses, ``gateway``, ``host_ip`` and every explicitly configured IP
      are never auto-assigned.
    * Distro: cycled from the archetype's ``preferred_distros`` when the spec
      names an archetype, otherwise from all known distros. The index is the
      number of deckies built so far.
    * Services: the spec's ``services`` (validated against the per-decky
      service names), else the archetype's services, else 1 to 3 random
      per-decky services.
    * nmap OS: spec value, else the archetype's, else ``"linux"``.
    * Mutate interval: ``cli_mutate_interval`` if not None, else the spec's
      value, else ``ini.mutate_interval``.

    All deckies share one ``last_mutated`` timestamp taken when the call
    starts.

    Args:
        ini: Parsed INI configuration providing ``deckies`` and
            ``mutate_interval``.
        subnet_cidr: Subnet to allocate from; host bits may be set
            (parsed with ``strict=False``).
        gateway: Gateway address, excluded from auto-allocation.
        host_ip: Host address, excluded from auto-allocation.
        randomize: Accepted for interface compatibility. A spec with neither
            services nor an archetype receives random services regardless of
            this flag.
        cli_mutate_interval: Overrides every INI-level mutate interval when
            not None.

    Returns:
        One DeckyConfig per INI decky spec, in INI order.

    Raises:
        ValueError: If the subnet runs out of free addresses, or a spec lists
            a service that is not a registered per-decky service. Malformed
            addresses raise ``ipaddress.AddressValueError`` (a ``ValueError``
            subclass).
    """
    from ipaddress import IPv4Address, IPv4Network
    import time

    now = time.time()

    explicit_ips: set[IPv4Address] = {
        IPv4Address(s.ip) for s in ini.deckies if s.ip
    }

    net = IPv4Network(subnet_cidr, strict=False)
    reserved = {
        net.network_address,
        net.broadcast_address,
        IPv4Address(gateway),
        IPv4Address(host_ip),
    } | explicit_ips

    # Lazy generator: each next() hands out the following unreserved host.
    auto_pool = (str(addr) for addr in net.hosts() if addr not in reserved)

    # Service names are the same for every spec, so look them up once
    # instead of on every loop iteration.
    available_services = all_service_names()
    known_services = set(available_services)

    deckies: list[DeckyConfig] = []
    for spec in ini.deckies:
        arch: Archetype | None = None
        if spec.archetype:
            arch = get_archetype(spec.archetype)

        distro_pool = arch.preferred_distros if arch else list(all_distros().keys())
        distro = get_distro(distro_pool[len(deckies) % len(distro_pool)])
        hostname = random_hostname(distro.slug)

        ip = spec.ip or next(auto_pool, None)
        if ip is None:
            raise ValueError(f"Not enough free IPs in {subnet_cidr} while assigning IP for '{spec.name}'.")

        if spec.services:
            unknown = [s for s in spec.services if s not in known_services]
            if unknown:
                raise ValueError(
                    f"Unknown service(s) in [{spec.name}]: {unknown}. "
                    f"Available: {available_services}"
                )
            # Copy so the DeckyConfig does not share the spec's list object.
            svc_list = list(spec.services)
        elif arch:
            svc_list = list(arch.services)
        else:
            # Neither services nor an archetype were given: fall back to a
            # random selection (this branch was unconditional in effect).
            count = random.randint(1, min(3, len(available_services)))  # nosec B311
            svc_list = random.sample(available_services, count)  # nosec B311

        resolved_nmap_os = spec.nmap_os or (arch.nmap_os if arch else "linux")

        # CLI value wins; otherwise per-decky setting, then the INI-wide one.
        decky_mutate_interval = cli_mutate_interval
        if decky_mutate_interval is None:
            decky_mutate_interval = spec.mutate_interval if spec.mutate_interval is not None else ini.mutate_interval

        deckies.append(DeckyConfig(
            name=spec.name,
            ip=ip,
            services=svc_list,
            distro=distro.slug,
            base_image=distro.image,
            build_base=distro.build_base,
            hostname=hostname,
            archetype=arch.slug if arch else None,
            service_config=spec.service_config,
            nmap_os=resolved_nmap_os,
            mutate_interval=decky_mutate_interval,
            last_mutated=now,
        ))
    return deckies
|