diff --git a/decnet/orchestrator/worker.py b/decnet/orchestrator/worker.py
index 2c3d6e2c..14b7cf9c 100644
--- a/decnet/orchestrator/worker.py
+++ b/decnet/orchestrator/worker.py
@@ -98,12 +98,30 @@ async def orchestrator_worker(
 
 
 async def _one_tick(repo: BaseRepository, driver, bus) -> None:
-    deckies = await repo.list_running_topology_deckies()
+    # Union view: MazeNET topology + unihost fleet + SWARM shards. Pre-fleet
+    # this only saw topology_deckies and was permanently blind to MACVLAN /
+    # IPVLAN unihost decoys.
+    deckies = await repo.list_running_deckies()
     action = scheduler.pick(deckies)
     if action is None:
+        # Report the actual SSH-eligible count (what the scheduler filters
+        # to), not just len(deckies) — the old "running+ssh count=N" line
+        # reported the pre-filter count and misled debugging.
+        ssh_eligible = sum(
+            1 for d in deckies
+            if isinstance(d.get("services"), list)
+            and "ssh" in d["services"]
+            and d.get("ip")
+        )
+        by_source: dict[str, int] = {}
+        for d in deckies:
+            by_source[d.get("source", "unknown")] = (
+                by_source.get(d.get("source", "unknown"), 0) + 1
+            )
         logger.debug(
-            "orchestrator: no actionable deckies (running+ssh count=%d)",
-            len(deckies),
+            "orchestrator: no actionable deckies "
+            "(running=%d ssh_eligible=%d sources=%s)",
+            len(deckies), ssh_eligible, by_source,
         )
         return
 
diff --git a/tests/orchestrator/test_worker_integration.py b/tests/orchestrator/test_worker_integration.py
index 2d661d27..ca3368ed 100644
--- a/tests/orchestrator/test_worker_integration.py
+++ b/tests/orchestrator/test_worker_integration.py
@@ -106,6 +106,44 @@ async def test_one_tick_records_event_and_publishes(repo, fake_bus, monkeypatch)
     assert ev.payload["kind"] == row["kind"]
 
 
+@pytest.mark.asyncio
+async def test_one_tick_picks_fleet_deckies(repo, fake_bus, monkeypatch):
+    """Regression: orchestrator was permanently blind to unihost MACVLAN /
+    IPVLAN deckies because list_running_topology_deckies only scans
+    topology_deckies. The new union view (list_running_deckies) must
+    pull in fleet_deckies rows too."""
+    await repo.upsert_fleet_decky({
+        "host_uuid": "local",
+        "name": "fleet-d1",
+        "services": ["ssh"],
+        "decky_ip": "10.0.0.50",
+        "state": "running",
+    })
+    await repo.upsert_fleet_decky({
+        "host_uuid": "local",
+        "name": "fleet-d2",
+        "services": ["ssh"],
+        "decky_ip": "10.0.0.51",
+        "state": "running",
+    })
+
+    async def fake_run(argv):
+        if argv[3] == "python3":
+            return 0, "SSH-2.0-OpenSSH_9.6\r\n", ""
+        return 0, "", ""
+
+    monkeypatch.setattr(ssh_driver, "_run", fake_run)
+
+    driver = ssh_driver.SSHDriver()
+    await orch_worker._one_tick(repo, driver, fake_bus)
+
+    rows = await repo.list_orchestrator_events(limit=10)
+    assert len(rows) == 1
+    # The dst_decky_uuid is our composite "host_uuid:name" identifier
+    # for fleet-source rows (see SQLModelRepository.list_running_deckies).
+    assert rows[0]["dst_decky_uuid"].startswith("local:fleet-")
+
+
 @pytest.mark.asyncio
 async def test_tick_is_noop_when_no_running_deckies(repo, fake_bus, monkeypatch):
     called = False