+ {/* Page Header */}
+
+
+
+
ATTACKER PROFILES
+
+
+
+
+
+
+
+
+
+
-
-
NO ACTIVE THREATS PROFILED YET.
-
(Attackers view placeholder)
+
+ {/* Summary & Pagination */}
+
+
+
+ {total} THREATS PROFILED
+
+
+
+
+ Page {page} of {totalPages || 1}
+
+
+
+
+
+
+
+
+ {/* Card Grid */}
+ {loading ? (
+
+ SCANNING THREAT PROFILES...
+
+ ) : attackers.length === 0 ? (
+
+ NO ACTIVE THREATS PROFILED YET
+
+ ) : (
+
+ {attackers.map((a) => {
+ const lastCmd = a.commands.length > 0
+ ? a.commands[a.commands.length - 1]
+ : null;
+
+ return (
+
navigate(`/attackers/${a.uuid}`)}
+ >
+ {/* Header row */}
+
+ {a.ip}
+ {a.is_traversal && (
+ TRAVERSAL
+ )}
+
+
+ {/* Timestamps */}
+
+ First: {new Date(a.first_seen).toLocaleDateString()}
+ Last: {timeAgo(a.last_seen)}
+
+
+ {/* Counts */}
+
+ Events: {a.event_count}
+ Bounties: {a.bounty_count}
+ Creds: {a.credential_count}
+
+
+ {/* Services */}
+
+ {a.services.map((svc) => (
+ {svc.toUpperCase()}
+ ))}
+
+
+ {/* Deckies / Traversal Path */}
+ {a.traversal_path ? (
+
+ Path: {a.traversal_path}
+
+ ) : a.deckies.length > 0 ? (
+
+ Deckies: {a.deckies.join(', ')}
+
+ ) : null}
+
+ {/* Commands & Fingerprints */}
+
+ Cmds: {a.commands.length}
+ Fingerprints: {a.fingerprints.length}
+
+
+ {/* Last command preview */}
+ {lastCmd && (
+
+ Last cmd: {lastCmd.command}
+
+ )}
+
+ );
+ })}
+
+ )}
);
diff --git a/decnet_web/src/components/Dashboard.css b/decnet_web/src/components/Dashboard.css
index 773fcd9..3de3e15 100644
--- a/decnet_web/src/components/Dashboard.css
+++ b/decnet_web/src/components/Dashboard.css
@@ -127,3 +127,61 @@
from { transform: rotate(0deg); }
to { transform: rotate(360deg); }
}
+
+/* Attacker Profiles */
+.attacker-grid {
+ display: grid;
+ grid-template-columns: repeat(auto-fill, minmax(340px, 1fr));
+ gap: 16px;
+ padding: 16px;
+}
+
+.attacker-card {
+ background: var(--secondary-color);
+ border: 1px solid var(--border-color);
+ padding: 16px;
+ cursor: pointer;
+ transition: transform 0.15s ease, box-shadow 0.15s ease, border-color 0.15s ease;
+}
+
+.attacker-card:hover {
+ transform: translateY(-2px);
+ border-color: var(--text-color);
+ box-shadow: var(--matrix-green-glow);
+}
+
+.traversal-badge {
+ font-size: 0.65rem;
+ padding: 2px 8px;
+ border: 1px solid var(--accent-color);
+ background: rgba(238, 130, 238, 0.1);
+ color: var(--accent-color);
+ letter-spacing: 2px;
+}
+
+.service-badge {
+ font-size: 0.7rem;
+ padding: 2px 8px;
+ border: 1px solid var(--text-color);
+ background: rgba(0, 255, 65, 0.05);
+ color: var(--text-color);
+}
+
+.back-button {
+ display: inline-flex;
+ align-items: center;
+ gap: 8px;
+ padding: 8px 16px;
+ border: 1px solid var(--border-color);
+ background: transparent;
+ color: var(--text-color);
+ cursor: pointer;
+ font-size: 0.8rem;
+ letter-spacing: 2px;
+ transition: border-color 0.15s ease, box-shadow 0.15s ease;
+}
+
+.back-button:hover {
+ border-color: var(--text-color);
+ box-shadow: var(--matrix-green-glow);
+}
diff --git a/tests/test_api_attackers.py b/tests/test_api_attackers.py
new file mode 100644
index 0000000..2b62399
--- /dev/null
+++ b/tests/test_api_attackers.py
@@ -0,0 +1,213 @@
+"""
+Tests for the attacker profile API routes.
+
+Covers:
+- GET /attackers: paginated list, search, sort_by
+- GET /attackers/{uuid}: single profile detail, 404 on missing UUID
+- Auth enforcement on both endpoints
+"""
+
+import json
+from datetime import datetime, timezone
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+from fastapi import HTTPException
+
+from decnet.web.auth import create_access_token
+
+
+# ─── Helpers ──────────────────────────────────────────────────────────────────
+
+def _auth_request(uuid: str = "test-user-uuid") -> MagicMock:
+ token = create_access_token({"uuid": uuid})
+ req = MagicMock()
+ req.headers = {"Authorization": f"Bearer {token}"}
+ return req
+
+
+def _sample_attacker(uuid: str = "att-uuid-1", ip: str = "1.2.3.4") -> dict:
+ return {
+ "uuid": uuid,
+ "ip": ip,
+ "first_seen": datetime(2026, 4, 1, tzinfo=timezone.utc).isoformat(),
+ "last_seen": datetime(2026, 4, 10, tzinfo=timezone.utc).isoformat(),
+ "event_count": 42,
+ "service_count": 3,
+ "decky_count": 2,
+ "services": ["ssh", "http", "ftp"],
+ "deckies": ["decky-01", "decky-02"],
+ "traversal_path": "decky-01 → decky-02",
+ "is_traversal": True,
+ "bounty_count": 5,
+ "credential_count": 2,
+ "fingerprints": [{"type": "ja3", "hash": "abc"}],
+ "commands": [{"service": "ssh", "decky": "decky-01", "command": "id", "timestamp": "2026-04-01T10:00:00"}],
+ "updated_at": datetime(2026, 4, 10, tzinfo=timezone.utc).isoformat(),
+ }
+
+
+# ─── GET /attackers ──────────────────────────────────────────────────────────
+
+class TestGetAttackers:
+ @pytest.mark.asyncio
+ async def test_returns_paginated_response(self):
+ from decnet.web.router.attackers.api_get_attackers import get_attackers
+
+ sample = _sample_attacker()
+ with patch("decnet.web.router.attackers.api_get_attackers.repo") as mock_repo:
+ mock_repo.get_attackers = AsyncMock(return_value=[sample])
+ mock_repo.get_total_attackers = AsyncMock(return_value=1)
+
+ result = await get_attackers(
+ limit=50, offset=0, search=None, sort_by="recent",
+ current_user="test-user",
+ )
+
+ assert result["total"] == 1
+ assert result["limit"] == 50
+ assert result["offset"] == 0
+ assert len(result["data"]) == 1
+ assert result["data"][0]["uuid"] == "att-uuid-1"
+
+ @pytest.mark.asyncio
+ async def test_search_parameter_forwarded(self):
+ from decnet.web.router.attackers.api_get_attackers import get_attackers
+
+ with patch("decnet.web.router.attackers.api_get_attackers.repo") as mock_repo:
+ mock_repo.get_attackers = AsyncMock(return_value=[])
+ mock_repo.get_total_attackers = AsyncMock(return_value=0)
+
+ await get_attackers(
+ limit=50, offset=0, search="192.168", sort_by="recent",
+ current_user="test-user",
+ )
+
+ mock_repo.get_attackers.assert_awaited_once_with(
+ limit=50, offset=0, search="192.168", sort_by="recent",
+ )
+ mock_repo.get_total_attackers.assert_awaited_once_with(search="192.168")
+
+ @pytest.mark.asyncio
+ async def test_null_search_normalized(self):
+ from decnet.web.router.attackers.api_get_attackers import get_attackers
+
+ with patch("decnet.web.router.attackers.api_get_attackers.repo") as mock_repo:
+ mock_repo.get_attackers = AsyncMock(return_value=[])
+ mock_repo.get_total_attackers = AsyncMock(return_value=0)
+
+ await get_attackers(
+ limit=50, offset=0, search="null", sort_by="recent",
+ current_user="test-user",
+ )
+
+ mock_repo.get_attackers.assert_awaited_once_with(
+ limit=50, offset=0, search=None, sort_by="recent",
+ )
+
+ @pytest.mark.asyncio
+ async def test_sort_by_active(self):
+ from decnet.web.router.attackers.api_get_attackers import get_attackers
+
+ with patch("decnet.web.router.attackers.api_get_attackers.repo") as mock_repo:
+ mock_repo.get_attackers = AsyncMock(return_value=[])
+ mock_repo.get_total_attackers = AsyncMock(return_value=0)
+
+ await get_attackers(
+ limit=50, offset=0, search=None, sort_by="active",
+ current_user="test-user",
+ )
+
+ mock_repo.get_attackers.assert_awaited_once_with(
+ limit=50, offset=0, search=None, sort_by="active",
+ )
+
+ @pytest.mark.asyncio
+ async def test_empty_search_normalized_to_none(self):
+ from decnet.web.router.attackers.api_get_attackers import get_attackers
+
+ with patch("decnet.web.router.attackers.api_get_attackers.repo") as mock_repo:
+ mock_repo.get_attackers = AsyncMock(return_value=[])
+ mock_repo.get_total_attackers = AsyncMock(return_value=0)
+
+ await get_attackers(
+ limit=50, offset=0, search="", sort_by="recent",
+ current_user="test-user",
+ )
+
+ mock_repo.get_attackers.assert_awaited_once_with(
+ limit=50, offset=0, search=None, sort_by="recent",
+ )
+
+
+# ─── GET /attackers/{uuid} ───────────────────────────────────────────────────
+
+class TestGetAttackerDetail:
+ @pytest.mark.asyncio
+ async def test_returns_attacker_by_uuid(self):
+ from decnet.web.router.attackers.api_get_attacker_detail import get_attacker_detail
+
+ sample = _sample_attacker()
+ with patch("decnet.web.router.attackers.api_get_attacker_detail.repo") as mock_repo:
+ mock_repo.get_attacker_by_uuid = AsyncMock(return_value=sample)
+
+ result = await get_attacker_detail(uuid="att-uuid-1", current_user="test-user")
+
+ assert result["uuid"] == "att-uuid-1"
+ assert result["ip"] == "1.2.3.4"
+ assert result["is_traversal"] is True
+ assert isinstance(result["commands"], list)
+
+ @pytest.mark.asyncio
+ async def test_404_on_unknown_uuid(self):
+ from decnet.web.router.attackers.api_get_attacker_detail import get_attacker_detail
+
+ with patch("decnet.web.router.attackers.api_get_attacker_detail.repo") as mock_repo:
+ mock_repo.get_attacker_by_uuid = AsyncMock(return_value=None)
+
+ with pytest.raises(HTTPException) as exc_info:
+ await get_attacker_detail(uuid="nonexistent", current_user="test-user")
+
+ assert exc_info.value.status_code == 404
+
+ @pytest.mark.asyncio
+ async def test_deserialized_json_fields(self):
+ from decnet.web.router.attackers.api_get_attacker_detail import get_attacker_detail
+
+ sample = _sample_attacker()
+ with patch("decnet.web.router.attackers.api_get_attacker_detail.repo") as mock_repo:
+ mock_repo.get_attacker_by_uuid = AsyncMock(return_value=sample)
+
+ result = await get_attacker_detail(uuid="att-uuid-1", current_user="test-user")
+
+ assert isinstance(result["services"], list)
+ assert isinstance(result["deckies"], list)
+ assert isinstance(result["fingerprints"], list)
+ assert isinstance(result["commands"], list)
+
+
+# ─── Auth enforcement ────────────────────────────────────────────────────────
+
+class TestAttackersAuth:
+ @pytest.mark.asyncio
+ async def test_list_requires_auth(self):
+ """get_current_user dependency raises 401 when called without valid token."""
+ from decnet.web.dependencies import get_current_user
+
+ req = MagicMock()
+ req.headers = {}
+
+ with pytest.raises(HTTPException) as exc_info:
+ await get_current_user(req)
+ assert exc_info.value.status_code == 401
+
+ @pytest.mark.asyncio
+ async def test_detail_requires_auth(self):
+ from decnet.web.dependencies import get_current_user
+
+ req = MagicMock()
+ req.headers = {"Authorization": "Bearer bad-token"}
+
+ with pytest.raises(HTTPException) as exc_info:
+ await get_current_user(req)
+ assert exc_info.value.status_code == 401
diff --git a/tests/test_attacker_worker.py b/tests/test_attacker_worker.py
index 57f44fe..7c7ceaa 100644
--- a/tests/test_attacker_worker.py
+++ b/tests/test_attacker_worker.py
@@ -2,8 +2,10 @@
Tests for decnet/web/attacker_worker.py
Covers:
-- _rebuild(): CorrelationEngine integration, traversal detection, upsert calls
-- _extract_commands(): command harvesting from raw log rows
+- _cold_start(): full build on first run, cursor persistence
+- _incremental_update(): delta processing, affected-IP-only updates
+- _update_profiles(): traversal detection, bounty merging
+- _extract_commands_from_events(): command harvesting from LogEvent objects
- _build_record(): record assembly from engine events + bounties
- _first_contact_deckies(): ordering for single-decky attackers
- attacker_profile_worker(): cancellation and error handling
@@ -18,15 +20,20 @@ from unittest.mock import AsyncMock, MagicMock, patch
import pytest
+from decnet.correlation.parser import LogEvent
from decnet.logging.syslog_formatter import SEVERITY_INFO, format_rfc5424
from decnet.web.attacker_worker import (
+ _BATCH_SIZE,
+ _STATE_KEY,
+ _WorkerState,
_build_record,
- _extract_commands,
+ _cold_start,
+ _extract_commands_from_events,
_first_contact_deckies,
- _rebuild,
+ _incremental_update,
+ _update_profiles,
attacker_profile_worker,
)
-from decnet.correlation.parser import LogEvent
# ─── Helpers ──────────────────────────────────────────────────────────────────
@@ -59,6 +66,7 @@ def _make_raw_line(
def _make_log_row(
+ row_id: int = 1,
raw_line: str = "",
attacker_ip: str = "1.2.3.4",
service: str = "ssh",
@@ -76,7 +84,7 @@ def _make_log_row(
timestamp=timestamp.isoformat(),
)
return {
- "id": 1,
+ "id": row_id,
"raw_line": raw_line,
"attacker_ip": attacker_ip,
"service": service,
@@ -87,10 +95,15 @@ def _make_log_row(
}
-def _make_repo(logs=None, bounties=None):
+def _make_repo(logs=None, bounties=None, bounties_for_ips=None, max_log_id=0, saved_state=None):
repo = MagicMock()
repo.get_all_logs_raw = AsyncMock(return_value=logs or [])
repo.get_all_bounties_by_ip = AsyncMock(return_value=bounties or {})
+ repo.get_bounties_for_ips = AsyncMock(return_value=bounties_for_ips or {})
+ repo.get_max_log_id = AsyncMock(return_value=max_log_id)
+ repo.get_logs_after_id = AsyncMock(return_value=[])
+ repo.get_state = AsyncMock(return_value=saved_state)
+ repo.set_state = AsyncMock()
repo.upsert_attacker = AsyncMock()
return repo
@@ -101,6 +114,7 @@ def _make_log_event(
service: str = "ssh",
event_type: str = "connection",
timestamp: datetime = _DT1,
+ fields: dict | None = None,
) -> LogEvent:
return LogEvent(
timestamp=timestamp,
@@ -108,7 +122,7 @@ def _make_log_event(
service=service,
event_type=event_type,
attacker_ip=ip,
- fields={},
+ fields=fields or {},
raw="",
)
@@ -138,75 +152,52 @@ class TestFirstContactDeckies:
assert result.count("decky-01") == 1
-# ─── _extract_commands ────────────────────────────────────────────────────────
-
-class TestExtractCommands:
- def _row(self, ip, event_type, fields):
- return _make_log_row(
- attacker_ip=ip,
- event_type=event_type,
- service="ssh",
- decky="decky-01",
- fields=json.dumps(fields),
- )
+# ─── _extract_commands_from_events ───────────────────────────────────────────
+class TestExtractCommandsFromEvents:
def test_extracts_command_field(self):
- rows = [self._row("1.1.1.1", "command", {"command": "id"})]
- result = _extract_commands(rows, "1.1.1.1")
+ events = [_make_log_event("1.1.1.1", "decky-01", "ssh", "command", _DT1, {"command": "id"})]
+ result = _extract_commands_from_events(events)
assert len(result) == 1
assert result[0]["command"] == "id"
assert result[0]["service"] == "ssh"
assert result[0]["decky"] == "decky-01"
def test_extracts_query_field(self):
- rows = [self._row("2.2.2.2", "query", {"query": "SELECT * FROM users"})]
- result = _extract_commands(rows, "2.2.2.2")
+ events = [_make_log_event("2.2.2.2", "decky-01", "mysql", "query", _DT1, {"query": "SELECT * FROM users"})]
+ result = _extract_commands_from_events(events)
assert len(result) == 1
assert result[0]["command"] == "SELECT * FROM users"
def test_extracts_input_field(self):
- rows = [self._row("3.3.3.3", "input", {"input": "ls -la"})]
- result = _extract_commands(rows, "3.3.3.3")
+ events = [_make_log_event("3.3.3.3", "decky-01", "ssh", "input", _DT1, {"input": "ls -la"})]
+ result = _extract_commands_from_events(events)
assert len(result) == 1
assert result[0]["command"] == "ls -la"
def test_non_command_event_type_ignored(self):
- rows = [self._row("1.1.1.1", "connection", {"command": "id"})]
- result = _extract_commands(rows, "1.1.1.1")
- assert result == []
-
- def test_wrong_ip_ignored(self):
- rows = [self._row("9.9.9.9", "command", {"command": "whoami"})]
- result = _extract_commands(rows, "1.1.1.1")
+ events = [_make_log_event("1.1.1.1", "decky-01", "ssh", "connection", _DT1, {"command": "id"})]
+ result = _extract_commands_from_events(events)
assert result == []
def test_no_command_field_skipped(self):
- rows = [self._row("1.1.1.1", "command", {"other": "stuff"})]
- result = _extract_commands(rows, "1.1.1.1")
- assert result == []
-
- def test_invalid_json_fields_skipped(self):
- row = _make_log_row(
- attacker_ip="1.1.1.1",
- event_type="command",
- fields="not valid json",
- )
- result = _extract_commands([row], "1.1.1.1")
+ events = [_make_log_event("1.1.1.1", "decky-01", "ssh", "command", _DT1, {"other": "stuff"})]
+ result = _extract_commands_from_events(events)
assert result == []
def test_multiple_commands_all_extracted(self):
- rows = [
- self._row("5.5.5.5", "command", {"command": "id"}),
- self._row("5.5.5.5", "command", {"command": "uname -a"}),
+ events = [
+ _make_log_event("5.5.5.5", "decky-01", "ssh", "command", _DT1, {"command": "id"}),
+ _make_log_event("5.5.5.5", "decky-01", "ssh", "command", _DT2, {"command": "uname -a"}),
]
- result = _extract_commands(rows, "5.5.5.5")
+ result = _extract_commands_from_events(events)
assert len(result) == 2
cmds = {r["command"] for r in result}
assert cmds == {"id", "uname -a"}
def test_timestamp_serialized_to_string(self):
- rows = [self._row("1.1.1.1", "command", {"command": "pwd"})]
- result = _extract_commands(rows, "1.1.1.1")
+ events = [_make_log_event("1.1.1.1", "decky-01", "ssh", "command", _DT1, {"command": "pwd"})]
+ result = _extract_commands_from_events(events)
assert isinstance(result[0]["timestamp"], str)
@@ -291,112 +282,283 @@ class TestBuildRecord:
assert record["updated_at"].tzinfo is not None
-# ─── _rebuild ─────────────────────────────────────────────────────────────────
+# ─── _cold_start ─────────────────────────────────────────────────────────────
-class TestRebuild:
+class TestColdStart:
@pytest.mark.asyncio
- async def test_empty_logs_no_upsert(self):
- repo = _make_repo(logs=[])
- await _rebuild(repo)
- repo.upsert_attacker.assert_not_awaited()
-
- @pytest.mark.asyncio
- async def test_single_attacker_upserted(self):
- raw = _make_raw_line("ssh", "decky-01", "connection", "10.0.0.1", _TS1)
- row = _make_log_row(raw_line=raw, attacker_ip="10.0.0.1")
- repo = _make_repo(logs=[row])
- await _rebuild(repo)
- repo.upsert_attacker.assert_awaited_once()
- record = repo.upsert_attacker.call_args[0][0]
- assert record["ip"] == "10.0.0.1"
- assert record["event_count"] == 1
-
- @pytest.mark.asyncio
- async def test_multiple_attackers_all_upserted(self):
+ async def test_cold_start_builds_all_profiles(self):
rows = [
_make_log_row(
+ row_id=i + 1,
raw_line=_make_raw_line("ssh", "decky-01", "conn", ip, _TS1),
attacker_ip=ip,
)
- for ip in ["1.1.1.1", "2.2.2.2", "3.3.3.3"]
+ for i, ip in enumerate(["1.1.1.1", "2.2.2.2", "3.3.3.3"])
]
- repo = _make_repo(logs=rows)
- await _rebuild(repo)
+ repo = _make_repo(logs=rows, max_log_id=3)
+ state = _WorkerState()
+
+ await _cold_start(repo, state)
+
+ assert state.initialized is True
+ assert state.last_log_id == 3
assert repo.upsert_attacker.await_count == 3
upserted_ips = {c[0][0]["ip"] for c in repo.upsert_attacker.call_args_list}
assert upserted_ips == {"1.1.1.1", "2.2.2.2", "3.3.3.3"}
+ repo.set_state.assert_awaited_with(_STATE_KEY, {"last_log_id": 3})
@pytest.mark.asyncio
- async def test_traversal_detected_across_two_deckies(self):
+ async def test_cold_start_empty_db(self):
+ repo = _make_repo(logs=[], max_log_id=0)
+ state = _WorkerState()
+
+ await _cold_start(repo, state)
+
+ assert state.initialized is True
+ assert state.last_log_id == 0
+ repo.upsert_attacker.assert_not_awaited()
+ repo.set_state.assert_awaited()
+
+ @pytest.mark.asyncio
+ async def test_cold_start_traversal_detected(self):
rows = [
_make_log_row(
+ row_id=1,
raw_line=_make_raw_line("ssh", "decky-01", "conn", "5.5.5.5", _TS1),
attacker_ip="5.5.5.5", decky="decky-01",
),
_make_log_row(
+ row_id=2,
raw_line=_make_raw_line("http", "decky-02", "req", "5.5.5.5", _TS2),
attacker_ip="5.5.5.5", decky="decky-02",
),
]
- repo = _make_repo(logs=rows)
- await _rebuild(repo)
+ repo = _make_repo(logs=rows, max_log_id=2)
+ state = _WorkerState()
+
+ await _cold_start(repo, state)
+
record = repo.upsert_attacker.call_args[0][0]
assert record["is_traversal"] is True
assert "decky-01" in record["traversal_path"]
assert "decky-02" in record["traversal_path"]
@pytest.mark.asyncio
- async def test_single_decky_not_traversal(self):
- rows = [
- _make_log_row(
- raw_line=_make_raw_line("ssh", "decky-01", "conn", "7.7.7.7", _TS1),
- attacker_ip="7.7.7.7",
- ),
- _make_log_row(
- raw_line=_make_raw_line("http", "decky-01", "req", "7.7.7.7", _TS2),
- attacker_ip="7.7.7.7",
- ),
- ]
- repo = _make_repo(logs=rows)
- await _rebuild(repo)
- record = repo.upsert_attacker.call_args[0][0]
- assert record["is_traversal"] is False
-
- @pytest.mark.asyncio
- async def test_bounties_merged_into_record(self):
+ async def test_cold_start_bounties_merged(self):
raw = _make_raw_line("ssh", "decky-01", "conn", "8.8.8.8", _TS1)
repo = _make_repo(
- logs=[_make_log_row(raw_line=raw, attacker_ip="8.8.8.8")],
- bounties={"8.8.8.8": [
+ logs=[_make_log_row(row_id=1, raw_line=raw, attacker_ip="8.8.8.8")],
+ max_log_id=1,
+ bounties_for_ips={"8.8.8.8": [
{"bounty_type": "credential", "attacker_ip": "8.8.8.8", "payload": {}},
{"bounty_type": "fingerprint", "attacker_ip": "8.8.8.8", "payload": {"ja3": "abc"}},
]},
)
- await _rebuild(repo)
+ state = _WorkerState()
+
+ await _cold_start(repo, state)
+
record = repo.upsert_attacker.call_args[0][0]
assert record["bounty_count"] == 2
assert record["credential_count"] == 1
- fps = json.loads(record["fingerprints"])
- assert len(fps) == 1
@pytest.mark.asyncio
- async def test_commands_extracted_during_rebuild(self):
- raw = _make_raw_line("ssh", "decky-01", "command", "9.9.9.9", _TS1)
+ async def test_cold_start_commands_extracted(self):
+ raw = _make_raw_line("ssh", "decky-01", "command", "9.9.9.9", _TS1, command="cat /etc/passwd")
row = _make_log_row(
+ row_id=1,
raw_line=raw,
attacker_ip="9.9.9.9",
event_type="command",
fields=json.dumps({"command": "cat /etc/passwd"}),
)
- repo = _make_repo(logs=[row])
- await _rebuild(repo)
+ repo = _make_repo(logs=[row], max_log_id=1)
+ state = _WorkerState()
+
+ await _cold_start(repo, state)
+
record = repo.upsert_attacker.call_args[0][0]
commands = json.loads(record["commands"])
assert len(commands) == 1
assert commands[0]["command"] == "cat /etc/passwd"
-# ─── attacker_profile_worker ──────────────────────────────────────────────────
+# ─── _incremental_update ────────────────────────────────────────────────────
+
+class TestIncrementalUpdate:
+ @pytest.mark.asyncio
+ async def test_no_new_logs_skips_upsert(self):
+ repo = _make_repo()
+ state = _WorkerState(initialized=True, last_log_id=10)
+
+ await _incremental_update(repo, state)
+
+ repo.upsert_attacker.assert_not_awaited()
+ repo.set_state.assert_awaited_with(_STATE_KEY, {"last_log_id": 10})
+
+ @pytest.mark.asyncio
+ async def test_only_affected_ips_upserted(self):
+ """Pre-populate engine with IP-A, then feed new logs only for IP-B."""
+ state = _WorkerState(initialized=True, last_log_id=5)
+ # Pre-populate engine with IP-A events
+ line_a = _make_raw_line("ssh", "decky-01", "conn", "1.1.1.1", _TS1)
+ state.engine.ingest(line_a)
+
+ # New batch has only IP-B
+ new_row = _make_log_row(
+ row_id=6,
+ raw_line=_make_raw_line("ssh", "decky-01", "conn", "2.2.2.2", _TS2),
+ attacker_ip="2.2.2.2",
+ )
+ repo = _make_repo()
+ repo.get_logs_after_id = AsyncMock(return_value=[new_row])
+
+ await _incremental_update(repo, state)
+
+ assert repo.upsert_attacker.await_count == 1
+ upserted_ip = repo.upsert_attacker.call_args[0][0]["ip"]
+ assert upserted_ip == "2.2.2.2"
+
+ @pytest.mark.asyncio
+ async def test_merges_with_existing_engine_state(self):
+ """Engine has 2 events for IP. New batch adds 1 more. Record should show event_count=3."""
+ state = _WorkerState(initialized=True, last_log_id=2)
+ state.engine.ingest(_make_raw_line("ssh", "decky-01", "conn", "1.1.1.1", _TS1))
+ state.engine.ingest(_make_raw_line("http", "decky-01", "req", "1.1.1.1", _TS2))
+
+ new_row = _make_log_row(
+ row_id=3,
+ raw_line=_make_raw_line("ftp", "decky-01", "login", "1.1.1.1", _TS3),
+ attacker_ip="1.1.1.1",
+ )
+ repo = _make_repo()
+ repo.get_logs_after_id = AsyncMock(return_value=[new_row])
+
+ await _incremental_update(repo, state)
+
+ record = repo.upsert_attacker.call_args[0][0]
+ assert record["event_count"] == 3
+ assert record["ip"] == "1.1.1.1"
+
+ @pytest.mark.asyncio
+ async def test_cursor_persisted_after_update(self):
+ new_row = _make_log_row(
+ row_id=42,
+ raw_line=_make_raw_line("ssh", "decky-01", "conn", "1.1.1.1", _TS1),
+ attacker_ip="1.1.1.1",
+ )
+ repo = _make_repo()
+ repo.get_logs_after_id = AsyncMock(return_value=[new_row])
+ state = _WorkerState(initialized=True, last_log_id=41)
+
+ await _incremental_update(repo, state)
+
+ assert state.last_log_id == 42
+ repo.set_state.assert_awaited_with(_STATE_KEY, {"last_log_id": 42})
+
+ @pytest.mark.asyncio
+ async def test_traversal_detected_across_cycles(self):
+ """IP hits decky-01 during cold start, decky-02 in incremental → traversal."""
+ state = _WorkerState(initialized=True, last_log_id=1)
+ state.engine.ingest(_make_raw_line("ssh", "decky-01", "conn", "5.5.5.5", _TS1))
+
+ new_row = _make_log_row(
+ row_id=2,
+ raw_line=_make_raw_line("http", "decky-02", "req", "5.5.5.5", _TS2),
+ attacker_ip="5.5.5.5",
+ )
+ repo = _make_repo()
+ repo.get_logs_after_id = AsyncMock(return_value=[new_row])
+
+ await _incremental_update(repo, state)
+
+ record = repo.upsert_attacker.call_args[0][0]
+ assert record["is_traversal"] is True
+ assert "decky-01" in record["traversal_path"]
+ assert "decky-02" in record["traversal_path"]
+
+ @pytest.mark.asyncio
+ async def test_batch_loop_processes_all(self):
+ """First batch returns BATCH_SIZE rows, second returns fewer — all processed."""
+ batch_1 = [
+ _make_log_row(
+ row_id=i + 1,
+ raw_line=_make_raw_line("ssh", "decky-01", "conn", f"10.0.0.{i}", _TS1),
+ attacker_ip=f"10.0.0.{i}",
+ )
+ for i in range(_BATCH_SIZE)
+ ]
+ batch_2 = [
+ _make_log_row(
+ row_id=_BATCH_SIZE + 1,
+ raw_line=_make_raw_line("ssh", "decky-01", "conn", "10.0.1.1", _TS2),
+ attacker_ip="10.0.1.1",
+ ),
+ ]
+
+ call_count = 0
+
+ async def mock_get_logs(last_id, limit=_BATCH_SIZE):
+ nonlocal call_count
+ call_count += 1
+ if call_count == 1:
+ return batch_1
+ elif call_count == 2:
+ return batch_2
+ return []
+
+ repo = _make_repo()
+ repo.get_logs_after_id = AsyncMock(side_effect=mock_get_logs)
+ state = _WorkerState(initialized=True, last_log_id=0)
+
+ await _incremental_update(repo, state)
+
+ assert state.last_log_id == _BATCH_SIZE + 1
+ assert repo.upsert_attacker.await_count == _BATCH_SIZE + 1
+
+ @pytest.mark.asyncio
+ async def test_bounties_fetched_only_for_affected_ips(self):
+ new_rows = [
+ _make_log_row(
+ row_id=1,
+ raw_line=_make_raw_line("ssh", "decky-01", "conn", "1.1.1.1", _TS1),
+ attacker_ip="1.1.1.1",
+ ),
+ _make_log_row(
+ row_id=2,
+ raw_line=_make_raw_line("ssh", "decky-01", "conn", "2.2.2.2", _TS2),
+ attacker_ip="2.2.2.2",
+ ),
+ ]
+ repo = _make_repo()
+ repo.get_logs_after_id = AsyncMock(return_value=new_rows)
+ state = _WorkerState(initialized=True, last_log_id=0)
+
+ await _incremental_update(repo, state)
+
+ repo.get_bounties_for_ips.assert_awaited_once()
+ called_ips = repo.get_bounties_for_ips.call_args[0][0]
+ assert called_ips == {"1.1.1.1", "2.2.2.2"}
+
+ @pytest.mark.asyncio
+ async def test_uninitialized_state_triggers_cold_start(self):
+ rows = [
+ _make_log_row(
+ row_id=1,
+ raw_line=_make_raw_line("ssh", "decky-01", "conn", "1.1.1.1", _TS1),
+ attacker_ip="1.1.1.1",
+ ),
+ ]
+ repo = _make_repo(logs=rows, max_log_id=1)
+ state = _WorkerState()
+
+ await _incremental_update(repo, state)
+
+ assert state.initialized is True
+ repo.get_all_logs_raw.assert_awaited_once()
+
+
+# ─── attacker_profile_worker ────────────────────────────────────────────────
class TestAttackerProfileWorker:
@pytest.mark.asyncio
@@ -409,7 +571,7 @@ class TestAttackerProfileWorker:
await task
@pytest.mark.asyncio
- async def test_worker_handles_rebuild_error_without_crashing(self):
+ async def test_worker_handles_update_error_without_crashing(self):
repo = _make_repo()
_call_count = 0
@@ -419,16 +581,16 @@ class TestAttackerProfileWorker:
if _call_count >= 2:
raise asyncio.CancelledError()
- async def bad_rebuild(_repo):
+ async def bad_update(_repo, _state):
raise RuntimeError("DB exploded")
with patch("decnet.web.attacker_worker.asyncio.sleep", side_effect=fake_sleep):
- with patch("decnet.web.attacker_worker._rebuild", side_effect=bad_rebuild):
+ with patch("decnet.web.attacker_worker._incremental_update", side_effect=bad_update):
with pytest.raises(asyncio.CancelledError):
await attacker_profile_worker(repo)
@pytest.mark.asyncio
- async def test_worker_calls_rebuild_after_sleep(self):
+ async def test_worker_calls_update_after_sleep(self):
repo = _make_repo()
_call_count = 0
@@ -438,17 +600,17 @@ class TestAttackerProfileWorker:
if _call_count >= 2:
raise asyncio.CancelledError()
- rebuild_calls = []
+ update_calls = []
- async def mock_rebuild(_repo):
- rebuild_calls.append(True)
+ async def mock_update(_repo, _state):
+ update_calls.append(True)
with patch("decnet.web.attacker_worker.asyncio.sleep", side_effect=fake_sleep):
- with patch("decnet.web.attacker_worker._rebuild", side_effect=mock_rebuild):
+ with patch("decnet.web.attacker_worker._incremental_update", side_effect=mock_update):
with pytest.raises(asyncio.CancelledError):
await attacker_profile_worker(repo)
- assert len(rebuild_calls) >= 1
+ assert len(update_calls) >= 1
# ─── JA3 bounty extraction from ingester ─────────────────────────────────────
diff --git a/tests/test_base_repo.py b/tests/test_base_repo.py
index 5ba51db..dad3496 100644
--- a/tests/test_base_repo.py
+++ b/tests/test_base_repo.py
@@ -22,8 +22,12 @@ class DummyRepo(BaseRepository):
async def get_state(self, k): await super().get_state(k)
async def set_state(self, k, v): await super().set_state(k, v)
async def get_all_logs_raw(self): await super().get_all_logs_raw()
+ async def get_max_log_id(self): await super().get_max_log_id()
+ async def get_logs_after_id(self, last_id, limit=500): await super().get_logs_after_id(last_id, limit)
async def get_all_bounties_by_ip(self): await super().get_all_bounties_by_ip()
+ async def get_bounties_for_ips(self, ips): await super().get_bounties_for_ips(ips)
async def upsert_attacker(self, d): await super().upsert_attacker(d)
+ async def get_attacker_by_uuid(self, u): await super().get_attacker_by_uuid(u)
async def get_attackers(self, **kw): await super().get_attackers(**kw)
async def get_total_attackers(self, **kw): await super().get_total_attackers(**kw)
@@ -47,7 +51,11 @@ async def test_base_repo_coverage():
await dr.get_state("k")
await dr.set_state("k", "v")
await dr.get_all_logs_raw()
+ await dr.get_max_log_id()
+ await dr.get_logs_after_id(0)
await dr.get_all_bounties_by_ip()
+ await dr.get_bounties_for_ips({"1.1.1.1"})
await dr.upsert_attacker({})
+ await dr.get_attacker_by_uuid("a")
await dr.get_attackers()
await dr.get_total_attackers()