diff --git a/tests/api/config/__init__.py b/tests/api/config/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/api/config/__pycache__/__init__.cpython-314.pyc b/tests/api/config/__pycache__/__init__.cpython-314.pyc new file mode 100644 index 0000000..a06feb9 Binary files /dev/null and b/tests/api/config/__pycache__/__init__.cpython-314.pyc differ diff --git a/tests/api/config/__pycache__/conftest.cpython-314-pytest-9.0.3.pyc b/tests/api/config/__pycache__/conftest.cpython-314-pytest-9.0.3.pyc new file mode 100644 index 0000000..b77e929 Binary files /dev/null and b/tests/api/config/__pycache__/conftest.cpython-314-pytest-9.0.3.pyc differ diff --git a/tests/api/config/__pycache__/test_deploy_limit.cpython-314-pytest-9.0.3.pyc b/tests/api/config/__pycache__/test_deploy_limit.cpython-314-pytest-9.0.3.pyc new file mode 100644 index 0000000..ea634d6 Binary files /dev/null and b/tests/api/config/__pycache__/test_deploy_limit.cpython-314-pytest-9.0.3.pyc differ diff --git a/tests/api/config/__pycache__/test_get_config.cpython-314-pytest-9.0.3.pyc b/tests/api/config/__pycache__/test_get_config.cpython-314-pytest-9.0.3.pyc new file mode 100644 index 0000000..6143994 Binary files /dev/null and b/tests/api/config/__pycache__/test_get_config.cpython-314-pytest-9.0.3.pyc differ diff --git a/tests/api/config/__pycache__/test_reinit.cpython-314-pytest-9.0.3.pyc b/tests/api/config/__pycache__/test_reinit.cpython-314-pytest-9.0.3.pyc new file mode 100644 index 0000000..a983b79 Binary files /dev/null and b/tests/api/config/__pycache__/test_reinit.cpython-314-pytest-9.0.3.pyc differ diff --git a/tests/api/config/__pycache__/test_update_config.cpython-314-pytest-9.0.3.pyc b/tests/api/config/__pycache__/test_update_config.cpython-314-pytest-9.0.3.pyc new file mode 100644 index 0000000..f127209 Binary files /dev/null and b/tests/api/config/__pycache__/test_update_config.cpython-314-pytest-9.0.3.pyc differ diff --git a/tests/api/config/__pycache__/test_user_management.cpython-314-pytest-9.0.3.pyc b/tests/api/config/__pycache__/test_user_management.cpython-314-pytest-9.0.3.pyc new file mode 100644 index 0000000..51269c0 Binary files /dev/null and b/tests/api/config/__pycache__/test_user_management.cpython-314-pytest-9.0.3.pyc differ diff --git a/tests/api/config/conftest.py b/tests/api/config/conftest.py new file mode 100644 index 0000000..fb95821 --- /dev/null +++ b/tests/api/config/conftest.py @@ -0,0 +1 @@ +# viewer_token fixture is now in tests/api/conftest.py (shared across all API tests) diff --git a/tests/api/config/test_deploy_limit.py b/tests/api/config/test_deploy_limit.py new file mode 100644 index 0000000..82e5f0a --- /dev/null +++ b/tests/api/config/test_deploy_limit.py @@ -0,0 +1,57 @@ +import pytest +from unittest.mock import patch + +from decnet.web.dependencies import repo + + +@pytest.fixture(autouse=True) +def contract_test_mode(monkeypatch): + """Skip actual Docker deployment in tests.""" + monkeypatch.setenv("DECNET_CONTRACT_TEST", "true") + + +@pytest.fixture(autouse=True) +def mock_network(): + """Mock network detection so deploy doesn't call `ip addr show`.""" + with patch("decnet.web.router.fleet.api_deploy_deckies.get_host_ip", return_value="192.168.1.100"): + yield + + +@pytest.mark.anyio +async def test_deploy_respects_limit(client, auth_token, mock_state_file): + """Deploy should reject if total deckies would exceed limit.""" + await repo.set_state("config_limits", {"deployment_limit": 1}) + await repo.set_state("deployment", mock_state_file) + + ini = """[decky-new] +services = ssh +""" + resp = await client.post( + "/api/v1/deckies/deploy", + json={"ini_content": ini}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + # 2 existing + 1 new = 3 > limit of 1 + assert resp.status_code == 409 + assert "limit" in resp.json()["detail"].lower() + + +@pytest.mark.anyio +async def test_deploy_within_limit(client, auth_token, mock_state_file): + """Deploy should succeed when within limit.""" + await repo.set_state("config_limits", {"deployment_limit": 100}) + await repo.set_state("deployment", mock_state_file) + + ini = """[decky-new] +services = ssh +""" + resp = await client.post( + "/api/v1/deckies/deploy", + json={"ini_content": ini}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + # Should not fail due to limit + if resp.status_code == 409: + assert "limit" not in resp.json()["detail"].lower() + else: + assert resp.status_code == 200 diff --git a/tests/api/config/test_get_config.py b/tests/api/config/test_get_config.py new file mode 100644 index 0000000..ab4a437 --- /dev/null +++ b/tests/api/config/test_get_config.py @@ -0,0 +1,69 @@ +import pytest + + +@pytest.mark.anyio +async def test_get_config_defaults_admin(client, auth_token): + """Admin gets full config with users list and defaults.""" + resp = await client.get( + "/api/v1/config", + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["role"] == "admin" + assert data["deployment_limit"] == 10 + assert data["global_mutation_interval"] == "30m" + assert "users" in data + assert isinstance(data["users"], list) + assert len(data["users"]) >= 1 + # Ensure no password_hash leaked + for user in data["users"]: + assert "password_hash" not in user + assert "uuid" in user + assert "username" in user + assert "role" in user + + +@pytest.mark.anyio +async def test_get_config_viewer_no_users(client, auth_token, viewer_token): + """Viewer gets config without users list — server-side gating.""" + resp = await client.get( + "/api/v1/config", + headers={"Authorization": f"Bearer {viewer_token}"}, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["role"] == "viewer" + assert data["deployment_limit"] == 10 + assert data["global_mutation_interval"] == "30m" + assert "users" not in data + + +@pytest.mark.anyio +async def test_get_config_returns_stored_values(client, auth_token): + """Config returns stored values after update.""" + await client.put( + "/api/v1/config/deployment-limit", + json={"deployment_limit": 42}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + await client.put( + "/api/v1/config/global-mutation-interval", + json={"global_mutation_interval": "7d"}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + + resp = await client.get( + "/api/v1/config", + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["deployment_limit"] == 42 + assert data["global_mutation_interval"] == "7d" + + +@pytest.mark.anyio +async def test_get_config_unauthenticated(client): + resp = await client.get("/api/v1/config") + assert resp.status_code == 401 diff --git a/tests/api/config/test_reinit.py b/tests/api/config/test_reinit.py new file mode 100644 index 0000000..32f09ac --- /dev/null +++ b/tests/api/config/test_reinit.py @@ -0,0 +1,76 @@ +import pytest + +from decnet.web.dependencies import repo + + +@pytest.fixture(autouse=True) +def enable_developer_mode(monkeypatch): + monkeypatch.setattr("decnet.web.router.config.api_reinit.DECNET_DEVELOPER", True) + monkeypatch.setattr("decnet.web.router.config.api_get_config.DECNET_DEVELOPER", True) + + +@pytest.mark.anyio +async def test_reinit_purges_data(client, auth_token): + """Admin can purge all logs, bounties, and attackers in developer mode.""" + # Seed some data + await repo.add_log({ + "decky": "d1", "service": "ssh", "event_type": "connect", + "attacker_ip": "1.2.3.4", "raw_line": "test", "fields": "{}", + }) + await repo.add_bounty({ + "decky": "d1", "service": "ssh", "attacker_ip": "1.2.3.4", + "bounty_type": "credential", "payload": '{"user":"root"}', + }) + + resp = await client.delete( + "/api/v1/config/reinit", + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["deleted"]["logs"] >= 1 + assert data["deleted"]["bounties"] >= 1 + + +@pytest.mark.anyio +async def test_reinit_viewer_forbidden(client, auth_token, viewer_token): + resp = await client.delete( + "/api/v1/config/reinit", + headers={"Authorization": f"Bearer {viewer_token}"}, + ) + assert resp.status_code == 403 + + +@pytest.mark.anyio +async def test_reinit_forbidden_without_developer_mode(client, auth_token, monkeypatch): + monkeypatch.setattr("decnet.web.router.config.api_reinit.DECNET_DEVELOPER", False) + + resp = await client.delete( + "/api/v1/config/reinit", + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code == 403 + assert "developer mode" in resp.json()["detail"].lower() + + +@pytest.mark.anyio +async def test_config_includes_developer_mode(client, auth_token): + """Admin config response includes developer_mode when enabled.""" + resp = await client.get( + "/api/v1/config", + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code == 200 + assert resp.json()["developer_mode"] is True + + +@pytest.mark.anyio +async def test_config_excludes_developer_mode_when_disabled(client, auth_token, monkeypatch): + monkeypatch.setattr("decnet.web.router.config.api_get_config.DECNET_DEVELOPER", False) + + resp = await client.get( + "/api/v1/config", + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code == 200 + assert "developer_mode" not in resp.json() diff --git a/tests/api/config/test_update_config.py b/tests/api/config/test_update_config.py new file mode 100644 index 0000000..9f83459 --- /dev/null +++ b/tests/api/config/test_update_config.py @@ -0,0 +1,77 @@ +import pytest + + +@pytest.mark.anyio +async def test_update_deployment_limit_admin(client, auth_token): + resp = await client.put( + "/api/v1/config/deployment-limit", + json={"deployment_limit": 50}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code == 200 + assert resp.json()["message"] == "Deployment limit updated" + + +@pytest.mark.anyio +async def test_update_deployment_limit_out_of_range(client, auth_token): + resp = await client.put( + "/api/v1/config/deployment-limit", + json={"deployment_limit": 0}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code == 422 + + resp = await client.put( + "/api/v1/config/deployment-limit", + json={"deployment_limit": 501}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code == 422 + + +@pytest.mark.anyio +async def test_update_deployment_limit_viewer_forbidden(client, auth_token, viewer_token): + resp = await client.put( + "/api/v1/config/deployment-limit", + json={"deployment_limit": 50}, + headers={"Authorization": f"Bearer {viewer_token}"}, + ) + assert resp.status_code == 403 + + +@pytest.mark.anyio +async def test_update_global_mutation_interval_admin(client, auth_token): + resp = await client.put( + "/api/v1/config/global-mutation-interval", + json={"global_mutation_interval": "7d"}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code == 200 + assert resp.json()["message"] == "Global mutation interval updated" + + +@pytest.mark.anyio +async def test_update_global_mutation_interval_invalid(client, auth_token): + resp = await client.put( + "/api/v1/config/global-mutation-interval", + json={"global_mutation_interval": "abc"}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code == 422 + + resp = await client.put( + "/api/v1/config/global-mutation-interval", + json={"global_mutation_interval": "0m"}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code == 422 + + +@pytest.mark.anyio +async def test_update_global_mutation_interval_viewer_forbidden(client, auth_token, viewer_token): + resp = await client.put( + "/api/v1/config/global-mutation-interval", + json={"global_mutation_interval": "7d"}, + headers={"Authorization": f"Bearer {viewer_token}"}, + ) + assert resp.status_code == 403 diff --git a/tests/api/config/test_user_management.py b/tests/api/config/test_user_management.py new file mode 100644 index 0000000..3f6d807 --- /dev/null +++ b/tests/api/config/test_user_management.py @@ -0,0 +1,188 @@ +import pytest + + +@pytest.mark.anyio +async def test_create_user(client, auth_token): + resp = await client.post( + "/api/v1/config/users", + json={"username": "newuser", "password": "securepass123", "role": "viewer"}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code == 200 + data = resp.json() + assert data["username"] == "newuser" + assert data["role"] == "viewer" + assert data["must_change_password"] is True + assert "password_hash" not in data + + +@pytest.mark.anyio +async def test_create_user_duplicate(client, auth_token): + await client.post( + "/api/v1/config/users", + json={"username": "dupuser", "password": "securepass123", "role": "viewer"}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + resp = await client.post( + "/api/v1/config/users", + json={"username": "dupuser", "password": "securepass456", "role": "viewer"}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code == 409 + + +@pytest.mark.anyio +async def test_create_user_viewer_forbidden(client, auth_token, viewer_token): + resp = await client.post( + "/api/v1/config/users", + json={"username": "blocked", "password": "securepass123", "role": "viewer"}, + headers={"Authorization": f"Bearer {viewer_token}"}, + ) + assert resp.status_code == 403 + + +@pytest.mark.anyio +async def test_delete_user(client, auth_token): + # Create a user to delete + create_resp = await client.post( + "/api/v1/config/users", + json={"username": "todelete", "password": "securepass123", "role": "viewer"}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + user_uuid = create_resp.json()["uuid"] + + resp = await client.delete( + f"/api/v1/config/users/{user_uuid}", + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code == 200 + + +@pytest.mark.anyio +async def test_delete_self_forbidden(client, auth_token): + # Get own UUID from config + config_resp = await client.get( + "/api/v1/config", + headers={"Authorization": f"Bearer {auth_token}"}, + ) + users = config_resp.json()["users"] + admin_uuid = next(u["uuid"] for u in users if u["role"] == "admin") + + resp = await client.delete( + f"/api/v1/config/users/{admin_uuid}", + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code == 403 + + +@pytest.mark.anyio +async def test_delete_nonexistent_user(client, auth_token): + resp = await client.delete( + "/api/v1/config/users/00000000-0000-0000-0000-000000000000", + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code == 404 + + +@pytest.mark.anyio +async def test_update_user_role(client, auth_token): + create_resp = await client.post( + "/api/v1/config/users", + json={"username": "roletest", "password": "securepass123", "role": "viewer"}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + user_uuid = create_resp.json()["uuid"] + + resp = await client.put( + f"/api/v1/config/users/{user_uuid}/role", + json={"role": "admin"}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code == 200 + + # Verify role changed + config_resp = await client.get( + "/api/v1/config", + headers={"Authorization": f"Bearer {auth_token}"}, + ) + updated = next(u for u in config_resp.json()["users"] if u["uuid"] == user_uuid) + assert updated["role"] == "admin" + + +@pytest.mark.anyio +async def test_update_own_role_forbidden(client, auth_token): + config_resp = await client.get( + "/api/v1/config", + headers={"Authorization": f"Bearer {auth_token}"}, + ) + admin_uuid = next(u["uuid"] for u in config_resp.json()["users"] if u["role"] == "admin") + + resp = await client.put( + f"/api/v1/config/users/{admin_uuid}/role", + json={"role": "viewer"}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code == 403 + + +@pytest.mark.anyio +async def test_reset_user_password(client, auth_token): + create_resp = await client.post( + "/api/v1/config/users", + json={"username": "resetme", "password": "securepass123", "role": "viewer"}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + user_uuid = create_resp.json()["uuid"] + + resp = await client.put( + f"/api/v1/config/users/{user_uuid}/reset-password", + json={"new_password": "newpass12345"}, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code == 200 + + # Verify must_change_password is set + config_resp = await client.get( + "/api/v1/config", + headers={"Authorization": f"Bearer {auth_token}"}, + ) + updated = next(u for u in config_resp.json()["users"] if u["uuid"] == user_uuid) + assert updated["must_change_password"] is True + + # Verify new password works + login_resp = await client.post( + "/api/v1/auth/login", + json={"username": "resetme", "password": "newpass12345"}, + ) + assert login_resp.status_code == 200 + + +@pytest.mark.anyio +async def test_all_user_endpoints_viewer_forbidden(client, auth_token, viewer_token): + """Viewer cannot access any user management endpoints.""" + resp = await client.post( + "/api/v1/config/users", + json={"username": "x", "password": "securepass123", "role": "viewer"}, + headers={"Authorization": f"Bearer {viewer_token}"}, + ) + assert resp.status_code == 403 + + resp = await client.delete( + "/api/v1/config/users/fake-uuid", + headers={"Authorization": f"Bearer {viewer_token}"}, + ) + assert resp.status_code == 403 + + resp = await client.put( + "/api/v1/config/users/fake-uuid/role", + json={"role": "admin"}, + headers={"Authorization": f"Bearer {viewer_token}"}, + ) + assert resp.status_code == 403 + + resp = await client.put( + "/api/v1/config/users/fake-uuid/reset-password", + json={"new_password": "securepass123"}, + headers={"Authorization": f"Bearer {viewer_token}"}, + ) + assert resp.status_code == 403 diff --git a/tests/api/test_rbac.py b/tests/api/test_rbac.py new file mode 100644 index 0000000..e05c561 --- /dev/null +++ b/tests/api/test_rbac.py @@ -0,0 +1,116 @@ +"""RBAC matrix tests — verify role enforcement on every API endpoint.""" +import pytest + + +# ── Read-only endpoints: viewer + admin should both get access ────────── + +_VIEWER_ENDPOINTS = [ + ("GET", "/api/v1/logs"), + ("GET", "/api/v1/logs/histogram"), + ("GET", "/api/v1/bounty"), + ("GET", "/api/v1/deckies"), + ("GET", "/api/v1/stats"), + ("GET", "/api/v1/attackers"), + ("GET", "/api/v1/config"), +] + + +@pytest.mark.anyio +@pytest.mark.parametrize("method,path", _VIEWER_ENDPOINTS) +async def test_viewer_can_access_read_endpoints(client, viewer_token, method, path): + resp = await client.request( + method, path, headers={"Authorization": f"Bearer {viewer_token}"} + ) + assert resp.status_code == 200, f"{method} {path} returned {resp.status_code}" + + +@pytest.mark.anyio +@pytest.mark.parametrize("method,path", _VIEWER_ENDPOINTS) +async def test_admin_can_access_read_endpoints(client, auth_token, method, path): + resp = await client.request( + method, path, headers={"Authorization": f"Bearer {auth_token}"} + ) + assert resp.status_code == 200, f"{method} {path} returned {resp.status_code}" + + +# ── Admin-only endpoints: viewer must get 403 ────────────────────────── + +_ADMIN_ENDPOINTS = [ + ("PUT", "/api/v1/config/deployment-limit", {"deployment_limit": 5}), + ("PUT", "/api/v1/config/global-mutation-interval", {"global_mutation_interval": "1d"}), + ("POST", "/api/v1/config/users", {"username": "rbac-test", "password": "pass123456", "role": "viewer"}), +] + + +@pytest.mark.anyio +@pytest.mark.parametrize("method,path,body", _ADMIN_ENDPOINTS) +async def test_viewer_blocked_from_admin_endpoints(client, viewer_token, method, path, body): + resp = await client.request( + method, path, + json=body, + headers={"Authorization": f"Bearer {viewer_token}"}, + ) + assert resp.status_code == 403, f"{method} {path} returned {resp.status_code}" + + +@pytest.mark.anyio +@pytest.mark.parametrize("method,path,body", _ADMIN_ENDPOINTS) +async def test_admin_can_access_admin_endpoints(client, auth_token, method, path, body): + resp = await client.request( + method, path, + json=body, + headers={"Authorization": f"Bearer {auth_token}"}, + ) + assert resp.status_code == 200, f"{method} {path} returned {resp.status_code}" + + +# ── Unauthenticated access: must get 401 ─────────────────────────────── + +_ALL_PROTECTED = [ + ("GET", "/api/v1/logs"), + ("GET", "/api/v1/stats"), + ("GET", "/api/v1/deckies"), + ("GET", "/api/v1/bounty"), + ("GET", "/api/v1/attackers"), + ("GET", "/api/v1/config"), + ("PUT", "/api/v1/config/deployment-limit"), + ("POST", "/api/v1/config/users"), +] + + +@pytest.mark.anyio +@pytest.mark.parametrize("method,path", _ALL_PROTECTED) +async def test_unauthenticated_returns_401(client, method, path): + resp = await client.request(method, path) + assert resp.status_code == 401, f"{method} {path} returned {resp.status_code}" + + +# ── Fleet write endpoints: viewer must get 403 ───────────────────────── + +@pytest.mark.anyio +async def test_viewer_blocked_from_deploy(client, viewer_token): + resp = await client.post( + "/api/v1/deckies/deploy", + json={"ini_content": "[decky-rbac-test]\nservices=ssh"}, + headers={"Authorization": f"Bearer {viewer_token}"}, + ) + assert resp.status_code == 403 + + +@pytest.mark.anyio +async def test_viewer_blocked_from_mutate(client, viewer_token): + resp = await client.post( + "/api/v1/deckies/test-decky/mutate", + headers={"Authorization": f"Bearer {viewer_token}"}, + ) + assert resp.status_code == 403 + + +@pytest.mark.anyio +async def test_viewer_blocked_from_mutate_interval(client, viewer_token): + resp = await client.put( + "/api/v1/deckies/test-decky/mutate-interval", + json={"mutate_interval": "5d"}, + headers={"Authorization": f"Bearer {viewer_token}"}, + ) + assert resp.status_code == 403 diff --git a/tests/test_profiler_behavioral.py b/tests/test_profiler_behavioral.py new file mode 100644 index 0000000..44f6dfc --- /dev/null +++ b/tests/test_profiler_behavioral.py @@ -0,0 +1,277 @@ +""" +Unit tests for the profiler behavioral/timing analyzer. + +Covers: + - timing_stats: mean/median/stdev/cv on synthetic event streams + - classify_behavior: beaconing vs interactive vs scanning vs mixed vs unknown + - guess_tool: attribution matching and tolerance boundaries + - phase_sequence: recon → exfil latency detection + - sniffer_rollup: OS-guess mode, hop median, retransmit sum + - build_behavior_record: composite output shape (JSON-encoded subfields) +""" + +from __future__ import annotations + +import json +from datetime import datetime, timedelta, timezone + +from decnet.correlation.parser import LogEvent +from decnet.profiler.behavioral import ( + build_behavior_record, + classify_behavior, + guess_tool, + phase_sequence, + sniffer_rollup, + timing_stats, +) + + +# ─── Helpers ──────────────────────────────────────────────────────────────── + +_BASE = datetime(2026, 4, 15, 12, 0, 0, tzinfo=timezone.utc) + + +def _mk( + ts_offset_s: float, + event_type: str = "connection", + service: str = "ssh", + decky: str = "decky-01", + fields: dict | None = None, + ip: str = "10.0.0.7", +) -> LogEvent: + """Build a synthetic LogEvent at BASE + offset seconds.""" + return LogEvent( + timestamp=_BASE + timedelta(seconds=ts_offset_s), + decky=decky, + service=service, + event_type=event_type, + attacker_ip=ip, + fields=fields or {}, + raw="", + ) + + +def _regular_beacon(count: int, interval_s: float, jitter_s: float = 0.0) -> list[LogEvent]: + """ + Build *count* events with alternating IATs of (interval_s ± jitter_s). + + This yields: + - mean IAT = interval_s + - stdev IAT = jitter_s + - coefficient of variation = jitter_s / interval_s + """ + events: list[LogEvent] = [] + offset = 0.0 + events.append(_mk(offset)) + for i in range(1, count): + iat = interval_s + (jitter_s if i % 2 == 1 else -jitter_s) + offset += iat + events.append(_mk(offset)) + return events + + +# ─── timing_stats ─────────────────────────────────────────────────────────── + +class TestTimingStats: + def test_empty_returns_nulls(self): + s = timing_stats([]) + assert s["event_count"] == 0 + assert s["mean_iat_s"] is None + assert s["cv"] is None + + def test_single_event(self): + s = timing_stats([_mk(0)]) + assert s["event_count"] == 1 + assert s["duration_s"] == 0.0 + assert s["mean_iat_s"] is None + + def test_regular_cadence_cv_is_zero(self): + events = _regular_beacon(count=10, interval_s=60.0) + s = timing_stats(events) + assert s["event_count"] == 10 + assert s["mean_iat_s"] == 60.0 + assert s["cv"] == 0.0 + assert s["stdev_iat_s"] == 0.0 + + def test_jittered_cadence(self): + events = _regular_beacon(count=20, interval_s=60.0, jitter_s=12.0) + s = timing_stats(events) + # Mean is close to 60, cv ~20% (jitter 12 / interval 60) + assert abs(s["mean_iat_s"] - 60.0) < 2.0 + assert s["cv"] is not None + assert 0.10 < s["cv"] < 0.50 + + +# ─── classify_behavior ────────────────────────────────────────────────────── + +class TestClassifyBehavior: + def test_unknown_if_too_few(self): + s = timing_stats(_regular_beacon(count=2, interval_s=60.0)) + assert classify_behavior(s, services_count=1) == "unknown" + + def test_beaconing_regular_cadence(self): + s = timing_stats(_regular_beacon(count=10, interval_s=60.0, jitter_s=3.0)) + assert classify_behavior(s, services_count=1) == "beaconing" + + def test_interactive_fast_irregular(self): + # Very fast events with high variance ≈ a human hitting keys + think time + events = [] + times = [0, 0.2, 0.5, 1.0, 5.0, 5.1, 5.3, 10.0, 10.1, 10.2, 12.0] + for t in times: + events.append(_mk(t)) + s = timing_stats(events) + assert classify_behavior(s, services_count=1) == "interactive" + + def test_scanning_many_services_fast(self): + # 10 events across 5 services, each 0.2s apart + events = [] + svcs = ["ssh", "http", "smb", "ftp", "rdp"] + for i in range(10): + events.append(_mk(i * 0.2, service=svcs[i % 5])) + s = timing_stats(events) + assert classify_behavior(s, services_count=5) == "scanning" + + def test_mixed_fallback(self): + # Moderate count, moderate cv, single service, moderate cadence + events = _regular_beacon(count=6, interval_s=20.0, jitter_s=10.0) + s = timing_stats(events) + # cv ~0.5, not tight enough for beaconing, mean 20s > interactive + result = classify_behavior(s, services_count=1) + assert result in ("mixed", "interactive") # either is acceptable + + +# ─── guess_tool ───────────────────────────────────────────────────────────── + +class TestGuessTool: + def test_cobalt_strike(self): + # Default: 60s interval, 20% jitter → cv 0.20 + assert guess_tool(mean_iat_s=60.0, cv=0.20) == "cobalt_strike" + + def test_havoc(self): + # 45s interval, 10% jitter → cv 0.10 + assert guess_tool(mean_iat_s=45.0, cv=0.10) == "havoc" + + def test_mythic(self): + assert guess_tool(mean_iat_s=30.0, cv=0.15) == "mythic" + + def test_no_match_outside_tolerance(self): + # 5-second beacon is far from any default + assert guess_tool(mean_iat_s=5.0, cv=0.10) is None + + def test_none_when_stats_missing(self): + assert guess_tool(None, None) is None + assert guess_tool(60.0, None) is None + + def test_ambiguous_returns_none(self): + # If a signature set is tweaked such that two profiles overlap, + # guess_tool must not attribute. + # Cobalt (60±10s, cv 0.20±0.08) and Sliver (60±15s, cv 0.30±0.10) + # overlap around (60s, cv=0.25). Both match → None. + result = guess_tool(mean_iat_s=60.0, cv=0.25) + assert result is None + + +# ─── phase_sequence ──────────────────────────────────────────────────────── + +class TestPhaseSequence: + def test_recon_then_exfil(self): + events = [ + _mk(0, event_type="scan"), + _mk(10, event_type="login_attempt"), + _mk(20, event_type="auth_failure"), + _mk(120, event_type="exec"), + _mk(150, event_type="download"), + ] + p = phase_sequence(events) + assert p["recon_end_ts"] is not None + assert p["exfil_start_ts"] is not None + assert p["exfil_latency_s"] == 100.0 # 120 - 20 + + def test_no_exfil(self): + events = [_mk(0, event_type="scan"), _mk(10, event_type="scan")] + p = phase_sequence(events) + assert p["exfil_start_ts"] is None + assert p["exfil_latency_s"] is None + + def test_large_payload_counted(self): + events = [ + _mk(0, event_type="download", fields={"bytes": "2097152"}), # 2 MiB + _mk(10, event_type="download", fields={"bytes": "500"}), # small + _mk(20, event_type="upload", fields={"size": "10485760"}), # 10 MiB + ] + p = phase_sequence(events) + assert p["large_payload_count"] == 2 + + +# ─── sniffer_rollup ───────────────────────────────────────────────────────── + +class TestSnifferRollup: + def test_os_mode(self): + events = [ + _mk(0, event_type="tcp_syn_fingerprint", + fields={"os_guess": "linux", "hop_distance": "3", + "window": "29200", "mss": "1460"}), + _mk(5, event_type="tcp_syn_fingerprint", + fields={"os_guess": "linux", "hop_distance": "3", + "window": "29200", "mss": "1460"}), + _mk(10, event_type="tcp_syn_fingerprint", + fields={"os_guess": "windows", "hop_distance": "8", + "window": "64240", "mss": "1460"}), + ] + r = sniffer_rollup(events) + assert r["os_guess"] == "linux" # mode + # Median of [3, 3, 8] = 3 + assert r["hop_distance"] == 3 + # Latest fingerprint snapshot wins + assert r["tcp_fingerprint"]["window"] == 64240 + + def test_retransmits_summed(self): + events = [ + _mk(0, event_type="tcp_flow_timing", fields={"retransmits": "2"}), + _mk(10, event_type="tcp_flow_timing", fields={"retransmits": "5"}), + _mk(20, event_type="tcp_flow_timing", fields={"retransmits": "0"}), + ] + r = sniffer_rollup(events) + assert r["retransmit_count"] == 7 + + def test_empty(self): + r = sniffer_rollup([]) + assert r["os_guess"] is None + assert r["hop_distance"] is None + assert r["retransmit_count"] == 0 + + +# ─── build_behavior_record (composite) ────────────────────────────────────── + +class TestBuildBehaviorRecord: + def test_beaconing_with_cobalt_strike_match(self): + # 60s interval, 20% jitter → cobalt strike default + events = _regular_beacon(count=20, interval_s=60.0, jitter_s=12.0) + r = build_behavior_record(events) + assert r["behavior_class"] == "beaconing" + assert r["beacon_interval_s"] is not None + assert 50 < r["beacon_interval_s"] < 70 + assert r["beacon_jitter_pct"] is not None + assert r["tool_guess"] == "cobalt_strike" + + def test_json_fields_are_strings(self): + events = _regular_beacon(count=5, interval_s=60.0) + r = build_behavior_record(events) + # timing_stats, phase_sequence, tcp_fingerprint must be JSON strings + assert isinstance(r["timing_stats"], str) + json.loads(r["timing_stats"]) # doesn't raise + assert isinstance(r["phase_sequence"], str) + json.loads(r["phase_sequence"]) + assert isinstance(r["tcp_fingerprint"], str) + json.loads(r["tcp_fingerprint"]) + + def test_non_beaconing_has_null_beacon_fields(self): + # Scanning behavior — should not report a beacon interval + events = [] + svcs = ["ssh", "http", "smb", "ftp", "rdp"] + for i in range(10): + events.append(_mk(i * 0.2, service=svcs[i % 5])) + r = build_behavior_record(events) + assert r["behavior_class"] == "scanning" + assert r["beacon_interval_s"] is None + assert r["beacon_jitter_pct"] is None