From a022b4fed679a7f19ece4557285e24491f21633c Mon Sep 17 00:00:00 2001 From: anti Date: Mon, 13 Apr 2026 22:35:13 -0400 Subject: [PATCH] =?UTF-8?q?feat:=20attacker=20profiles=20=E2=80=94=20UUID?= =?UTF-8?q?=20model,=20API=20routes,=20list/detail=20frontend?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Migrate Attacker model from IP-based to UUID-based primary key with auto-migration for old schema. Add GET /attackers (paginated, search, sort) and GET /attackers/{uuid} API routes. Rewrite Attackers.tsx as a card grid with full threat info and create AttackerDetail.tsx as a dedicated detail page with back navigation, stats, commands table, and fingerprints. --- decnet/web/attacker_worker.py | 147 ++++--- decnet/web/db/models.py | 3 +- decnet/web/db/repository.py | 20 + decnet/web/db/sqlite/repository.py | 51 ++- decnet/web/router/__init__.py | 6 + decnet/web/router/attackers/__init__.py | 0 .../attackers/api_get_attacker_detail.py | 26 ++ .../web/router/attackers/api_get_attackers.py | 36 ++ decnet_web/src/App.tsx | 2 + decnet_web/src/components/AttackerDetail.tsx | 258 ++++++++++++ decnet_web/src/components/Attackers.tsx | 234 ++++++++++- decnet_web/src/components/Dashboard.css | 58 +++ tests/test_api_attackers.py | 213 ++++++++++ tests/test_attacker_worker.py | 386 +++++++++++++----- tests/test_base_repo.py | 8 + 15 files changed, 1266 insertions(+), 182 deletions(-) create mode 100644 decnet/web/router/attackers/__init__.py create mode 100644 decnet/web/router/attackers/api_get_attacker_detail.py create mode 100644 decnet/web/router/attackers/api_get_attackers.py create mode 100644 decnet_web/src/components/AttackerDetail.tsx create mode 100644 tests/test_api_attackers.py diff --git a/decnet/web/attacker_worker.py b/decnet/web/attacker_worker.py index 7d207fa..3b633a9 100644 --- a/decnet/web/attacker_worker.py +++ b/decnet/web/attacker_worker.py @@ -1,21 +1,20 @@ """ -Attacker profile builder — background worker. +Attacker profile builder — incremental background worker. -Periodically rebuilds the `attackers` table by: - 1. Feeding all stored Log.raw_line values through the CorrelationEngine - (which parses RFC 5424 and tracks per-IP event histories + traversals). - 2. Merging with the Bounty table (fingerprints, credentials). - 3. Extracting commands executed per IP from the structured log fields. - 4. Upserting one Attacker record per observed IP. +Maintains a persistent CorrelationEngine and a log-ID cursor across cycles. +On cold start (first cycle or process restart), performs one full build from +all stored logs. Subsequent cycles fetch only new logs via the cursor, +ingest them into the existing engine, and rebuild profiles for affected IPs +only. -Runs every _REBUILD_INTERVAL seconds. Full rebuild each cycle — simple and -correct at honeypot log volumes. +Complexity per cycle: O(new_logs + affected_ips) instead of O(total_logs²). """ from __future__ import annotations import asyncio import json +from dataclasses import dataclass, field from datetime import datetime, timezone from typing import Any @@ -27,6 +26,8 @@ from decnet.web.db.repository import BaseRepository logger = get_logger("attacker_worker") _REBUILD_INTERVAL = 30 # seconds +_BATCH_SIZE = 500 +_STATE_KEY = "attacker_worker_cursor" # Event types that indicate active command/query execution (not just connection/scan) _COMMAND_EVENT_TYPES = frozenset({ @@ -38,44 +39,95 @@ _COMMAND_EVENT_TYPES = frozenset({ _COMMAND_FIELDS = ("command", "query", "input", "line", "sql", "cmd") +@dataclass +class _WorkerState: + engine: CorrelationEngine = field(default_factory=CorrelationEngine) + last_log_id: int = 0 + initialized: bool = False + + async def attacker_profile_worker(repo: BaseRepository) -> None: - """Periodically rebuilds the Attacker table. Designed to run as an asyncio Task.""" + """Periodically updates the Attacker table incrementally. Designed to run as an asyncio Task.""" logger.info("attacker profile worker started interval=%ds", _REBUILD_INTERVAL) + state = _WorkerState() while True: await asyncio.sleep(_REBUILD_INTERVAL) try: - await _rebuild(repo) + await _incremental_update(repo, state) except Exception as exc: - logger.error("attacker worker: rebuild failed: %s", exc) + logger.error("attacker worker: update failed: %s", exc) -async def _rebuild(repo: BaseRepository) -> None: +async def _incremental_update(repo: BaseRepository, state: _WorkerState) -> None: + if not state.initialized: + await _cold_start(repo, state) + return + + affected_ips: set[str] = set() + + while True: + batch = await repo.get_logs_after_id(state.last_log_id, limit=_BATCH_SIZE) + if not batch: + break + + for row in batch: + event = state.engine.ingest(row["raw_line"]) + if event and event.attacker_ip: + affected_ips.add(event.attacker_ip) + state.last_log_id = row["id"] + + if len(batch) < _BATCH_SIZE: + break + + if not affected_ips: + await repo.set_state(_STATE_KEY, {"last_log_id": state.last_log_id}) + return + + await _update_profiles(repo, state, affected_ips) + await repo.set_state(_STATE_KEY, {"last_log_id": state.last_log_id}) + + logger.debug("attacker worker: updated %d profiles (incremental)", len(affected_ips)) + + +async def _cold_start(repo: BaseRepository, state: _WorkerState) -> None: all_logs = await repo.get_all_logs_raw() if not all_logs: + state.last_log_id = await repo.get_max_log_id() + state.initialized = True + await repo.set_state(_STATE_KEY, {"last_log_id": state.last_log_id}) return - # Feed raw RFC 5424 lines into the CorrelationEngine - engine = CorrelationEngine() for row in all_logs: - engine.ingest(row["raw_line"]) + state.engine.ingest(row["raw_line"]) + state.last_log_id = max(state.last_log_id, row["id"]) - if not engine._events: - return + all_ips = set(state.engine._events.keys()) + await _update_profiles(repo, state, all_ips) + await repo.set_state(_STATE_KEY, {"last_log_id": state.last_log_id}) - traversal_map = {t.attacker_ip: t for t in engine.traversals(min_deckies=2)} - all_bounties = await repo.get_all_bounties_by_ip() + state.initialized = True + logger.debug("attacker worker: cold start rebuilt %d profiles", len(all_ips)) + + +async def _update_profiles( + repo: BaseRepository, + state: _WorkerState, + ips: set[str], +) -> None: + traversal_map = {t.attacker_ip: t for t in state.engine.traversals(min_deckies=2)} + bounties_map = await repo.get_bounties_for_ips(ips) + + for ip in ips: + events = state.engine._events.get(ip, []) + if not events: + continue - count = 0 - for ip, events in engine._events.items(): traversal = traversal_map.get(ip) - bounties = all_bounties.get(ip, []) - commands = _extract_commands(all_logs, ip) + bounties = bounties_map.get(ip, []) + commands = _extract_commands_from_events(events) record = _build_record(ip, events, traversal, bounties, commands) await repo.upsert_attacker(record) - count += 1 - - logger.debug("attacker worker: rebuilt %d profiles", count) def _build_record( @@ -122,42 +174,20 @@ def _first_contact_deckies(events: list[LogEvent]) -> list[str]: return seen -def _extract_commands( - all_logs: list[dict[str, Any]], ip: str -) -> list[dict[str, Any]]: +def _extract_commands_from_events(events: list[LogEvent]) -> list[dict[str, Any]]: """ - Extract executed commands for a given attacker IP from raw log rows. + Extract executed commands from LogEvent objects. - Looks for rows where: - - attacker_ip matches - - event_type is a known command-execution type - - fields JSON contains a command-like key - - Returns a list of {service, decky, command, timestamp} dicts. + Works directly on LogEvent.fields (already a dict), so no JSON parsing needed. """ commands: list[dict[str, Any]] = [] - for row in all_logs: - if row.get("attacker_ip") != ip: + for event in events: + if event.event_type not in _COMMAND_EVENT_TYPES: continue - if row.get("event_type") not in _COMMAND_EVENT_TYPES: - continue - - raw_fields = row.get("fields") - if not raw_fields: - continue - - # fields is stored as a JSON string in the DB row - if isinstance(raw_fields, str): - try: - fields = json.loads(raw_fields) - except (json.JSONDecodeError, ValueError): - continue - else: - fields = raw_fields cmd_text: str | None = None for key in _COMMAND_FIELDS: - val = fields.get(key) + val = event.fields.get(key) if val: cmd_text = str(val) break @@ -165,12 +195,11 @@ def _extract_commands( if not cmd_text: continue - ts = row.get("timestamp") commands.append({ - "service": row.get("service", ""), - "decky": row.get("decky", ""), + "service": event.service, + "decky": event.decky, "command": cmd_text, - "timestamp": ts.isoformat() if isinstance(ts, datetime) else str(ts), + "timestamp": event.timestamp.isoformat(), }) return commands diff --git a/decnet/web/db/models.py b/decnet/web/db/models.py index a8e18d1..313489d 100644 --- a/decnet/web/db/models.py +++ b/decnet/web/db/models.py @@ -53,7 +53,8 @@ class State(SQLModel, table=True): class Attacker(SQLModel, table=True): __tablename__ = "attackers" - ip: str = Field(primary_key=True) + uuid: str = Field(primary_key=True) + ip: str = Field(index=True) first_seen: datetime = Field(index=True) last_seen: datetime = Field(index=True) event_count: int = Field(default=0) diff --git a/decnet/web/db/repository.py b/decnet/web/db/repository.py index 7fcfdaa..ecca4a1 100644 --- a/decnet/web/db/repository.py +++ b/decnet/web/db/repository.py @@ -96,16 +96,36 @@ class BaseRepository(ABC): """Retrieve all log rows with fields needed by the attacker profile worker.""" pass + @abstractmethod + async def get_max_log_id(self) -> int: + """Return the highest log ID, or 0 if the table is empty.""" + pass + + @abstractmethod + async def get_logs_after_id(self, last_id: int, limit: int = 500) -> list[dict[str, Any]]: + """Return logs with id > last_id, ordered by id ASC, up to limit.""" + pass + @abstractmethod async def get_all_bounties_by_ip(self) -> dict[str, list[dict[str, Any]]]: """Retrieve all bounty rows grouped by attacker_ip.""" pass + @abstractmethod + async def get_bounties_for_ips(self, ips: set[str]) -> dict[str, list[dict[str, Any]]]: + """Retrieve bounty rows grouped by attacker_ip, filtered to only the given IPs.""" + pass + @abstractmethod async def upsert_attacker(self, data: dict[str, Any]) -> None: """Insert or replace an attacker profile record.""" pass + @abstractmethod + async def get_attacker_by_uuid(self, uuid: str) -> Optional[dict[str, Any]]: + """Retrieve a single attacker profile by UUID.""" + pass + @abstractmethod async def get_attackers( self, diff --git a/decnet/web/db/sqlite/repository.py b/decnet/web/db/sqlite/repository.py index 49606cf..db6bd3f 100644 --- a/decnet/web/db/sqlite/repository.py +++ b/decnet/web/db/sqlite/repository.py @@ -29,6 +29,7 @@ class SQLiteRepository(BaseRepository): async def initialize(self) -> None: """Async warm-up / verification. Creates tables if they don't exist.""" from sqlmodel import SQLModel + await self._migrate_attackers_table() async with self.engine.begin() as conn: await conn.run_sync(SQLModel.metadata.create_all) @@ -47,6 +48,13 @@ class SQLiteRepository(BaseRepository): )) await session.commit() + async def _migrate_attackers_table(self) -> None: + """Drop the old attackers table if it lacks the uuid column (pre-UUID schema).""" + async with self.engine.begin() as conn: + rows = (await conn.execute(text("PRAGMA table_info(attackers)"))).fetchall() + if rows and not any(r[1] == "uuid" for r in rows): + await conn.execute(text("DROP TABLE attackers")) + async def reinitialize(self) -> None: """Initialize the database schema asynchronously (useful for tests).""" from sqlmodel import SQLModel @@ -418,6 +426,22 @@ class SQLiteRepository(BaseRepository): grouped[item.attacker_ip].append(d) return dict(grouped) + async def get_bounties_for_ips(self, ips: set[str]) -> dict[str, List[dict[str, Any]]]: + from collections import defaultdict + async with self.session_factory() as session: + result = await session.execute( + select(Bounty).where(Bounty.attacker_ip.in_(ips)).order_by(asc(Bounty.timestamp)) + ) + grouped: dict[str, List[dict[str, Any]]] = defaultdict(list) + for item in result.scalars().all(): + d = item.model_dump(mode="json") + try: + d["payload"] = json.loads(d["payload"]) + except (json.JSONDecodeError, TypeError): + pass + grouped[item.attacker_ip].append(d) + return dict(grouped) + async def upsert_attacker(self, data: dict[str, Any]) -> None: async with self.session_factory() as session: result = await session.execute( @@ -429,9 +453,31 @@ class SQLiteRepository(BaseRepository): setattr(existing, k, v) session.add(existing) else: + data["uuid"] = str(uuid.uuid4()) session.add(Attacker(**data)) await session.commit() + @staticmethod + def _deserialize_attacker(d: dict[str, Any]) -> dict[str, Any]: + """Parse JSON-encoded list fields in an attacker dict.""" + for key in ("services", "deckies", "fingerprints", "commands"): + if isinstance(d.get(key), str): + try: + d[key] = json.loads(d[key]) + except (json.JSONDecodeError, TypeError): + pass + return d + + async def get_attacker_by_uuid(self, uuid: str) -> Optional[dict[str, Any]]: + async with self.session_factory() as session: + result = await session.execute( + select(Attacker).where(Attacker.uuid == uuid) + ) + attacker = result.scalar_one_or_none() + if not attacker: + return None + return self._deserialize_attacker(attacker.model_dump(mode="json")) + async def get_attackers( self, limit: int = 50, @@ -450,7 +496,10 @@ class SQLiteRepository(BaseRepository): async with self.session_factory() as session: result = await session.execute(statement) - return [a.model_dump(mode="json") for a in result.scalars().all()] + return [ + self._deserialize_attacker(a.model_dump(mode="json")) + for a in result.scalars().all() + ] async def get_total_attackers(self, search: Optional[str] = None) -> int: statement = select(func.count()).select_from(Attacker) diff --git a/decnet/web/router/__init__.py b/decnet/web/router/__init__.py index b1bd92e..87a2cef 100644 --- a/decnet/web/router/__init__.py +++ b/decnet/web/router/__init__.py @@ -11,6 +11,8 @@ from .fleet.api_mutate_decky import router as mutate_decky_router from .fleet.api_mutate_interval import router as mutate_interval_router from .fleet.api_deploy_deckies import router as deploy_deckies_router from .stream.api_stream_events import router as stream_router +from .attackers.api_get_attackers import router as attackers_router +from .attackers.api_get_attacker_detail import router as attacker_detail_router api_router = APIRouter() @@ -31,6 +33,10 @@ api_router.include_router(mutate_decky_router) api_router.include_router(mutate_interval_router) api_router.include_router(deploy_deckies_router) +# Attacker Profiles +api_router.include_router(attackers_router) +api_router.include_router(attacker_detail_router) + # Observability api_router.include_router(stats_router) api_router.include_router(stream_router) diff --git a/decnet/web/router/attackers/__init__.py b/decnet/web/router/attackers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/decnet/web/router/attackers/api_get_attacker_detail.py b/decnet/web/router/attackers/api_get_attacker_detail.py new file mode 100644 index 0000000..42bad76 --- /dev/null +++ b/decnet/web/router/attackers/api_get_attacker_detail.py @@ -0,0 +1,26 @@ +from typing import Any + +from fastapi import APIRouter, Depends, HTTPException + +from decnet.web.dependencies import get_current_user, repo + +router = APIRouter() + + +@router.get( + "/attackers/{uuid}", + tags=["Attacker Profiles"], + responses={ + 401: {"description": "Could not validate credentials"}, + 404: {"description": "Attacker not found"}, + }, +) +async def get_attacker_detail( + uuid: str, + current_user: str = Depends(get_current_user), +) -> dict[str, Any]: + """Retrieve a single attacker profile by UUID.""" + attacker = await repo.get_attacker_by_uuid(uuid) + if not attacker: + raise HTTPException(status_code=404, detail="Attacker not found") + return attacker diff --git a/decnet/web/router/attackers/api_get_attackers.py b/decnet/web/router/attackers/api_get_attackers.py new file mode 100644 index 0000000..aa3fa07 --- /dev/null +++ b/decnet/web/router/attackers/api_get_attackers.py @@ -0,0 +1,36 @@ +from typing import Any, Optional + +from fastapi import APIRouter, Depends, Query + +from decnet.web.dependencies import get_current_user, repo +from decnet.web.db.models import AttackersResponse + +router = APIRouter() + + +@router.get( + "/attackers", + response_model=AttackersResponse, + tags=["Attacker Profiles"], + responses={ + 401: {"description": "Could not validate credentials"}, + 422: {"description": "Validation error"}, + }, +) +async def get_attackers( + limit: int = Query(50, ge=1, le=1000), + offset: int = Query(0, ge=0, le=2147483647), + search: Optional[str] = None, + sort_by: str = Query("recent", pattern="^(recent|active|traversals)$"), + current_user: str = Depends(get_current_user), +) -> dict[str, Any]: + """Retrieve paginated attacker profiles.""" + def _norm(v: Optional[str]) -> Optional[str]: + if v in (None, "null", "NULL", "undefined", ""): + return None + return v + + s = _norm(search) + _data = await repo.get_attackers(limit=limit, offset=offset, search=s, sort_by=sort_by) + _total = await repo.get_total_attackers(search=s) + return {"total": _total, "limit": limit, "offset": offset, "data": _data} diff --git a/decnet_web/src/App.tsx b/decnet_web/src/App.tsx index 5ff438c..937ce94 100644 --- a/decnet_web/src/App.tsx +++ b/decnet_web/src/App.tsx @@ -6,6 +6,7 @@ import Dashboard from './components/Dashboard'; import DeckyFleet from './components/DeckyFleet'; import LiveLogs from './components/LiveLogs'; import Attackers from './components/Attackers'; +import AttackerDetail from './components/AttackerDetail'; import Config from './components/Config'; import Bounty from './components/Bounty'; @@ -61,6 +62,7 @@ function App() { } /> } /> } /> + } /> } /> } /> diff --git a/decnet_web/src/components/AttackerDetail.tsx b/decnet_web/src/components/AttackerDetail.tsx new file mode 100644 index 0000000..349cda0 --- /dev/null +++ b/decnet_web/src/components/AttackerDetail.tsx @@ -0,0 +1,258 @@ +import React, { useEffect, useState } from 'react'; +import { useParams, useNavigate } from 'react-router-dom'; +import { ArrowLeft, Crosshair } from 'lucide-react'; +import api from '../utils/api'; +import './Dashboard.css'; + +interface AttackerData { + uuid: string; + ip: string; + first_seen: string; + last_seen: string; + event_count: number; + service_count: number; + decky_count: number; + services: string[]; + deckies: string[]; + traversal_path: string | null; + is_traversal: boolean; + bounty_count: number; + credential_count: number; + fingerprints: any[]; + commands: { service: string; decky: string; command: string; timestamp: string }[]; + updated_at: string; +} + +const AttackerDetail: React.FC = () => { + const { id } = useParams<{ id: string }>(); + const navigate = useNavigate(); + const [attacker, setAttacker] = useState(null); + const [loading, setLoading] = useState(true); + const [error, setError] = useState(null); + + useEffect(() => { + const fetchAttacker = async () => { + setLoading(true); + try { + const res = await api.get(`/attackers/${id}`); + setAttacker(res.data); + } catch (err: any) { + if (err.response?.status === 404) { + setError('ATTACKER NOT FOUND'); + } else { + setError('FAILED TO LOAD ATTACKER PROFILE'); + } + } finally { + setLoading(false); + } + }; + fetchAttacker(); + }, [id]); + + if (loading) { + return ( +
+
+ LOADING THREAT PROFILE... +
+
+ ); + } + + if (error || !attacker) { + return ( +
+ +
+ {error || 'ATTACKER NOT FOUND'} +
+
+ ); + } + + return ( +
+ {/* Back Button */} + + + {/* Header */} +
+ +

+ {attacker.ip} +

+ {attacker.is_traversal && ( + TRAVERSAL + )} +
+ + {/* Stats Row */} +
+
+
{attacker.event_count}
+
EVENTS
+
+
+
{attacker.bounty_count}
+
BOUNTIES
+
+
+
{attacker.credential_count}
+
CREDENTIALS
+
+
+
{attacker.service_count}
+
SERVICES
+
+
+
{attacker.decky_count}
+
DECKIES
+
+
+ + {/* Timestamps */} +
+
+

TIMELINE

+
+
+
+ FIRST SEEN: + {new Date(attacker.first_seen).toLocaleString()} +
+
+ LAST SEEN: + {new Date(attacker.last_seen).toLocaleString()} +
+
+ UPDATED: + {new Date(attacker.updated_at).toLocaleString()} +
+
+
+ + {/* Services */} +
+
+

SERVICES TARGETED

+
+
+ {attacker.services.length > 0 ? attacker.services.map((svc) => ( + + {svc.toUpperCase()} + + )) : ( + No services recorded + )} +
+
+ + {/* Deckies & Traversal */} +
+
+

DECKY INTERACTIONS

+
+
+ {attacker.traversal_path ? ( +
+ TRAVERSAL PATH: + {attacker.traversal_path} +
+ ) : ( +
+ {attacker.deckies.map((d) => ( + + {d} + + ))} + {attacker.deckies.length === 0 && No deckies recorded} +
+ )} +
+
+ + {/* Commands */} +
+
+

COMMANDS ({attacker.commands.length})

+
+ {attacker.commands.length > 0 ? ( +
+ + + + + + + + + + + {attacker.commands.map((cmd, i) => ( + + + + + + + ))} + +
TIMESTAMPSERVICEDECKYCOMMAND
+ {cmd.timestamp ? new Date(cmd.timestamp).toLocaleString() : '-'} + {cmd.service}{cmd.decky}{cmd.command}
+
+ ) : ( +
+ NO COMMANDS CAPTURED +
+ )} +
+ + {/* Fingerprints */} +
+
+

FINGERPRINTS ({attacker.fingerprints.length})

+
+ {attacker.fingerprints.length > 0 ? ( +
+ + + + + + + + + {attacker.fingerprints.map((fp, i) => ( + + + + + ))} + +
TYPEVALUE
{fp.type || fp.bounty_type || 'unknown'} + {typeof fp === 'object' ? JSON.stringify(fp) : String(fp)} +
+
+ ) : ( +
+ NO FINGERPRINTS CAPTURED +
+ )} +
+ + {/* UUID footer */} +
+ UUID: {attacker.uuid} +
+
+ ); +}; + +export default AttackerDetail; diff --git a/decnet_web/src/components/Attackers.tsx b/decnet_web/src/components/Attackers.tsx index 0ed1ce9..a8453a3 100644 --- a/decnet_web/src/components/Attackers.tsx +++ b/decnet_web/src/components/Attackers.tsx @@ -1,17 +1,233 @@ -import React from 'react'; -import { Activity } from 'lucide-react'; +import React, { useEffect, useState } from 'react'; +import { useSearchParams, useNavigate } from 'react-router-dom'; +import { Crosshair, Search, ChevronLeft, ChevronRight, Filter } from 'lucide-react'; +import api from '../utils/api'; import './Dashboard.css'; +interface AttackerEntry { + uuid: string; + ip: string; + first_seen: string; + last_seen: string; + event_count: number; + service_count: number; + decky_count: number; + services: string[]; + deckies: string[]; + traversal_path: string | null; + is_traversal: boolean; + bounty_count: number; + credential_count: number; + fingerprints: any[]; + commands: any[]; + updated_at: string; +} + +function timeAgo(dateStr: string): string { + const diff = Date.now() - new Date(dateStr).getTime(); + const mins = Math.floor(diff / 60000); + if (mins < 1) return 'just now'; + if (mins < 60) return `${mins}m ago`; + const hrs = Math.floor(mins / 60); + if (hrs < 24) return `${hrs}h ago`; + const days = Math.floor(hrs / 24); + return `${days}d ago`; +} + const Attackers: React.FC = () => { + const navigate = useNavigate(); + const [searchParams, setSearchParams] = useSearchParams(); + const query = searchParams.get('q') || ''; + const sortBy = searchParams.get('sort_by') || 'recent'; + const page = parseInt(searchParams.get('page') || '1'); + + const [attackers, setAttackers] = useState([]); + const [total, setTotal] = useState(0); + const [loading, setLoading] = useState(true); + const [searchInput, setSearchInput] = useState(query); + + const limit = 50; + + const fetchAttackers = async () => { + setLoading(true); + try { + const offset = (page - 1) * limit; + let url = `/attackers?limit=${limit}&offset=${offset}&sort_by=${sortBy}`; + if (query) url += `&search=${encodeURIComponent(query)}`; + + const res = await api.get(url); + setAttackers(res.data.data); + setTotal(res.data.total); + } catch (err) { + console.error('Failed to fetch attackers', err); + } finally { + setLoading(false); + } + }; + + useEffect(() => { + fetchAttackers(); + }, [query, sortBy, page]); + + const handleSearch = (e: React.FormEvent) => { + e.preventDefault(); + setSearchParams({ q: searchInput, sort_by: sortBy, page: '1' }); + }; + + const setPage = (p: number) => { + setSearchParams({ q: query, sort_by: sortBy, page: p.toString() }); + }; + + const setSort = (s: string) => { + setSearchParams({ q: query, sort_by: s, page: '1' }); + }; + + const totalPages = Math.ceil(total / limit); + return ( -
-
- -

ATTACKER PROFILES

+
+ {/* Page Header */} +
+
+ +

ATTACKER PROFILES

+
+ +
+
+ + +
+ +
+ + setSearchInput(e.target.value)} + style={{ background: 'transparent', border: 'none', padding: '4px', fontSize: '0.8rem', width: '200px' }} + /> + +
-
-

NO ACTIVE THREATS PROFILED YET.

-

(Attackers view placeholder)

+ + {/* Summary & Pagination */} +
+
+
+ {total} THREATS PROFILED +
+ +
+ + Page {page} of {totalPages || 1} + +
+ + +
+
+
+ + {/* Card Grid */} + {loading ? ( +
+ SCANNING THREAT PROFILES... +
+ ) : attackers.length === 0 ? ( +
+ NO ACTIVE THREATS PROFILED YET +
+ ) : ( +
+ {attackers.map((a) => { + const lastCmd = a.commands.length > 0 + ? a.commands[a.commands.length - 1] + : null; + + return ( +
navigate(`/attackers/${a.uuid}`)} + > + {/* Header row */} +
+ {a.ip} + {a.is_traversal && ( + TRAVERSAL + )} +
+ + {/* Timestamps */} +
+ First: {new Date(a.first_seen).toLocaleDateString()} + Last: {timeAgo(a.last_seen)} +
+ + {/* Counts */} +
+ Events: {a.event_count} + Bounties: {a.bounty_count} + Creds: {a.credential_count} +
+ + {/* Services */} +
+ {a.services.map((svc) => ( + {svc.toUpperCase()} + ))} +
+ + {/* Deckies / Traversal Path */} + {a.traversal_path ? ( +
+ Path: {a.traversal_path} +
+ ) : a.deckies.length > 0 ? ( +
+ Deckies: {a.deckies.join(', ')} +
+ ) : null} + + {/* Commands & Fingerprints */} +
+ Cmds: {a.commands.length} + Fingerprints: {a.fingerprints.length} +
+ + {/* Last command preview */} + {lastCmd && ( +
+ Last cmd: {lastCmd.command} +
+ )} +
+ ); + })} +
+ )}
); diff --git a/decnet_web/src/components/Dashboard.css b/decnet_web/src/components/Dashboard.css index 773fcd9..3de3e15 100644 --- a/decnet_web/src/components/Dashboard.css +++ b/decnet_web/src/components/Dashboard.css @@ -127,3 +127,61 @@ from { transform: rotate(0deg); } to { transform: rotate(360deg); } } + +/* Attacker Profiles */ +.attacker-grid { + display: grid; + grid-template-columns: repeat(auto-fill, minmax(340px, 1fr)); + gap: 16px; + padding: 16px; +} + +.attacker-card { + background: var(--secondary-color); + border: 1px solid var(--border-color); + padding: 16px; + cursor: pointer; + transition: transform 0.15s ease, box-shadow 0.15s ease, border-color 0.15s ease; +} + +.attacker-card:hover { + transform: translateY(-2px); + border-color: var(--text-color); + box-shadow: var(--matrix-green-glow); +} + +.traversal-badge { + font-size: 0.65rem; + padding: 2px 8px; + border: 1px solid var(--accent-color); + background: rgba(238, 130, 238, 0.1); + color: var(--accent-color); + letter-spacing: 2px; +} + +.service-badge { + font-size: 0.7rem; + padding: 2px 8px; + border: 1px solid var(--text-color); + background: rgba(0, 255, 65, 0.05); + color: var(--text-color); +} + +.back-button { + display: inline-flex; + align-items: center; + gap: 8px; + padding: 8px 16px; + border: 1px solid var(--border-color); + background: transparent; + color: var(--text-color); + cursor: pointer; + font-size: 0.8rem; + letter-spacing: 2px; + transition: border-color 0.15s ease, box-shadow 0.15s ease; +} + +.back-button:hover { + border-color: var(--text-color); + box-shadow: var(--matrix-green-glow); +} diff --git a/tests/test_api_attackers.py b/tests/test_api_attackers.py new file mode 100644 index 0000000..2b62399 --- /dev/null +++ b/tests/test_api_attackers.py @@ -0,0 +1,213 @@ +""" +Tests for the attacker profile API routes. + +Covers: +- GET /attackers: paginated list, search, sort_by +- GET /attackers/{uuid}: single profile detail, 404 on missing UUID +- Auth enforcement on both endpoints +""" + +import json +from datetime import datetime, timezone +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from fastapi import HTTPException + +from decnet.web.auth import create_access_token + + +# ─── Helpers ────────────────────────────────────────────────────────────────── + +def _auth_request(uuid: str = "test-user-uuid") -> MagicMock: + token = create_access_token({"uuid": uuid}) + req = MagicMock() + req.headers = {"Authorization": f"Bearer {token}"} + return req + + +def _sample_attacker(uuid: str = "att-uuid-1", ip: str = "1.2.3.4") -> dict: + return { + "uuid": uuid, + "ip": ip, + "first_seen": datetime(2026, 4, 1, tzinfo=timezone.utc).isoformat(), + "last_seen": datetime(2026, 4, 10, tzinfo=timezone.utc).isoformat(), + "event_count": 42, + "service_count": 3, + "decky_count": 2, + "services": ["ssh", "http", "ftp"], + "deckies": ["decky-01", "decky-02"], + "traversal_path": "decky-01 → decky-02", + "is_traversal": True, + "bounty_count": 5, + "credential_count": 2, + "fingerprints": [{"type": "ja3", "hash": "abc"}], + "commands": [{"service": "ssh", "decky": "decky-01", "command": "id", "timestamp": "2026-04-01T10:00:00"}], + "updated_at": datetime(2026, 4, 10, tzinfo=timezone.utc).isoformat(), + } + + +# ─── GET /attackers ────────────────────────────────────────────────────────── + +class TestGetAttackers: + @pytest.mark.asyncio + async def test_returns_paginated_response(self): + from decnet.web.router.attackers.api_get_attackers import get_attackers + + sample = _sample_attacker() + with patch("decnet.web.router.attackers.api_get_attackers.repo") as mock_repo: + mock_repo.get_attackers = AsyncMock(return_value=[sample]) + mock_repo.get_total_attackers = AsyncMock(return_value=1) + + result = await get_attackers( + limit=50, offset=0, search=None, sort_by="recent", + current_user="test-user", + ) + + assert result["total"] == 1 + assert result["limit"] == 50 + assert result["offset"] == 0 + assert len(result["data"]) == 1 + assert result["data"][0]["uuid"] == "att-uuid-1" + + @pytest.mark.asyncio + async def test_search_parameter_forwarded(self): + from decnet.web.router.attackers.api_get_attackers import get_attackers + + with patch("decnet.web.router.attackers.api_get_attackers.repo") as mock_repo: + mock_repo.get_attackers = AsyncMock(return_value=[]) + mock_repo.get_total_attackers = AsyncMock(return_value=0) + + await get_attackers( + limit=50, offset=0, search="192.168", sort_by="recent", + current_user="test-user", + ) + + mock_repo.get_attackers.assert_awaited_once_with( + limit=50, offset=0, search="192.168", sort_by="recent", + ) + mock_repo.get_total_attackers.assert_awaited_once_with(search="192.168") + + @pytest.mark.asyncio + async def test_null_search_normalized(self): + from decnet.web.router.attackers.api_get_attackers import get_attackers + + with patch("decnet.web.router.attackers.api_get_attackers.repo") as mock_repo: + mock_repo.get_attackers = AsyncMock(return_value=[]) + mock_repo.get_total_attackers = AsyncMock(return_value=0) + + await get_attackers( + limit=50, offset=0, search="null", sort_by="recent", + current_user="test-user", + ) + + mock_repo.get_attackers.assert_awaited_once_with( + limit=50, offset=0, search=None, sort_by="recent", + ) + + @pytest.mark.asyncio + async def test_sort_by_active(self): + from decnet.web.router.attackers.api_get_attackers import get_attackers + + with patch("decnet.web.router.attackers.api_get_attackers.repo") as mock_repo: + mock_repo.get_attackers = AsyncMock(return_value=[]) + mock_repo.get_total_attackers = AsyncMock(return_value=0) + + await get_attackers( + limit=50, offset=0, search=None, sort_by="active", + current_user="test-user", + ) + + mock_repo.get_attackers.assert_awaited_once_with( + limit=50, offset=0, search=None, sort_by="active", + ) + + @pytest.mark.asyncio + async def test_empty_search_normalized_to_none(self): + from decnet.web.router.attackers.api_get_attackers import get_attackers + + with patch("decnet.web.router.attackers.api_get_attackers.repo") as mock_repo: + mock_repo.get_attackers = AsyncMock(return_value=[]) + mock_repo.get_total_attackers = AsyncMock(return_value=0) + + await get_attackers( + limit=50, offset=0, search="", sort_by="recent", + current_user="test-user", + ) + + mock_repo.get_attackers.assert_awaited_once_with( + limit=50, offset=0, search=None, sort_by="recent", + ) + + +# ─── GET /attackers/{uuid} ─────────────────────────────────────────────────── + +class TestGetAttackerDetail: + @pytest.mark.asyncio + async def test_returns_attacker_by_uuid(self): + from decnet.web.router.attackers.api_get_attacker_detail import get_attacker_detail + + sample = _sample_attacker() + with patch("decnet.web.router.attackers.api_get_attacker_detail.repo") as mock_repo: + mock_repo.get_attacker_by_uuid = AsyncMock(return_value=sample) + + result = await get_attacker_detail(uuid="att-uuid-1", current_user="test-user") + + assert result["uuid"] == "att-uuid-1" + assert result["ip"] == "1.2.3.4" + assert result["is_traversal"] is True + assert isinstance(result["commands"], list) + + @pytest.mark.asyncio + async def test_404_on_unknown_uuid(self): + from decnet.web.router.attackers.api_get_attacker_detail import get_attacker_detail + + with patch("decnet.web.router.attackers.api_get_attacker_detail.repo") as mock_repo: + mock_repo.get_attacker_by_uuid = AsyncMock(return_value=None) + + with pytest.raises(HTTPException) as exc_info: + await get_attacker_detail(uuid="nonexistent", current_user="test-user") + + assert exc_info.value.status_code == 404 + + @pytest.mark.asyncio + async def test_deserialized_json_fields(self): + from decnet.web.router.attackers.api_get_attacker_detail import get_attacker_detail + + sample = _sample_attacker() + with patch("decnet.web.router.attackers.api_get_attacker_detail.repo") as mock_repo: + mock_repo.get_attacker_by_uuid = AsyncMock(return_value=sample) + + result = await get_attacker_detail(uuid="att-uuid-1", current_user="test-user") + + assert isinstance(result["services"], list) + assert isinstance(result["deckies"], list) + assert isinstance(result["fingerprints"], list) + assert isinstance(result["commands"], list) + + +# ─── Auth enforcement ──────────────────────────────────────────────────────── + +class TestAttackersAuth: + @pytest.mark.asyncio + async def test_list_requires_auth(self): + """get_current_user dependency raises 401 when called without valid token.""" + from decnet.web.dependencies import get_current_user + + req = MagicMock() + req.headers = {} + + with pytest.raises(HTTPException) as exc_info: + await get_current_user(req) + assert exc_info.value.status_code == 401 + + @pytest.mark.asyncio + async def test_detail_requires_auth(self): + from decnet.web.dependencies import get_current_user + + req = MagicMock() + req.headers = {"Authorization": "Bearer bad-token"} + + with pytest.raises(HTTPException) as exc_info: + await get_current_user(req) + assert exc_info.value.status_code == 401 diff --git a/tests/test_attacker_worker.py b/tests/test_attacker_worker.py index 57f44fe..7c7ceaa 100644 --- a/tests/test_attacker_worker.py +++ b/tests/test_attacker_worker.py @@ -2,8 +2,10 @@ Tests for decnet/web/attacker_worker.py Covers: -- _rebuild(): CorrelationEngine integration, traversal detection, upsert calls -- _extract_commands(): command harvesting from raw log rows +- _cold_start(): full build on first run, cursor persistence +- _incremental_update(): delta processing, affected-IP-only updates +- _update_profiles(): traversal detection, bounty merging +- _extract_commands_from_events(): command harvesting from LogEvent objects - _build_record(): record assembly from engine events + bounties - _first_contact_deckies(): ordering for single-decky attackers - attacker_profile_worker(): cancellation and error handling @@ -18,15 +20,20 @@ from unittest.mock import AsyncMock, MagicMock, patch import pytest +from decnet.correlation.parser import LogEvent from decnet.logging.syslog_formatter import SEVERITY_INFO, format_rfc5424 from decnet.web.attacker_worker import ( + _BATCH_SIZE, + _STATE_KEY, + _WorkerState, _build_record, - _extract_commands, + _cold_start, + _extract_commands_from_events, _first_contact_deckies, - _rebuild, + _incremental_update, + _update_profiles, attacker_profile_worker, ) -from decnet.correlation.parser import LogEvent # ─── Helpers ────────────────────────────────────────────────────────────────── @@ -59,6 +66,7 @@ def _make_raw_line( def _make_log_row( + row_id: int = 1, raw_line: str = "", attacker_ip: str = "1.2.3.4", service: str = "ssh", @@ -76,7 +84,7 @@ def _make_log_row( timestamp=timestamp.isoformat(), ) return { - "id": 1, + "id": row_id, "raw_line": raw_line, "attacker_ip": attacker_ip, "service": service, @@ -87,10 +95,15 @@ def _make_log_row( } -def _make_repo(logs=None, bounties=None): +def _make_repo(logs=None, bounties=None, bounties_for_ips=None, max_log_id=0, saved_state=None): repo = MagicMock() repo.get_all_logs_raw = AsyncMock(return_value=logs or []) repo.get_all_bounties_by_ip = AsyncMock(return_value=bounties or {}) + repo.get_bounties_for_ips = AsyncMock(return_value=bounties_for_ips or {}) + repo.get_max_log_id = AsyncMock(return_value=max_log_id) + repo.get_logs_after_id = AsyncMock(return_value=[]) + repo.get_state = AsyncMock(return_value=saved_state) + repo.set_state = AsyncMock() repo.upsert_attacker = AsyncMock() return repo @@ -101,6 +114,7 @@ def _make_log_event( service: str = "ssh", event_type: str = "connection", timestamp: datetime = _DT1, + fields: dict | None = None, ) -> LogEvent: return LogEvent( timestamp=timestamp, @@ -108,7 +122,7 @@ def _make_log_event( service=service, event_type=event_type, attacker_ip=ip, - fields={}, + fields=fields or {}, raw="", ) @@ -138,75 +152,52 @@ class TestFirstContactDeckies: assert result.count("decky-01") == 1 -# ─── _extract_commands ──────────────────────────────────────────────────────── - -class TestExtractCommands: - def _row(self, ip, event_type, fields): - return _make_log_row( - attacker_ip=ip, - event_type=event_type, - service="ssh", - decky="decky-01", - fields=json.dumps(fields), - ) +# ─── _extract_commands_from_events ─────────────────────────────────────────── +class TestExtractCommandsFromEvents: def test_extracts_command_field(self): - rows = [self._row("1.1.1.1", "command", {"command": "id"})] - result = _extract_commands(rows, "1.1.1.1") + events = [_make_log_event("1.1.1.1", "decky-01", "ssh", "command", _DT1, {"command": "id"})] + result = _extract_commands_from_events(events) assert len(result) == 1 assert result[0]["command"] == "id" assert result[0]["service"] == "ssh" assert result[0]["decky"] == "decky-01" def test_extracts_query_field(self): - rows = [self._row("2.2.2.2", "query", {"query": "SELECT * FROM users"})] - result = _extract_commands(rows, "2.2.2.2") + events = [_make_log_event("2.2.2.2", "decky-01", "mysql", "query", _DT1, {"query": "SELECT * FROM users"})] + result = _extract_commands_from_events(events) assert len(result) == 1 assert result[0]["command"] == "SELECT * FROM users" def test_extracts_input_field(self): - rows = [self._row("3.3.3.3", "input", {"input": "ls -la"})] - result = _extract_commands(rows, "3.3.3.3") + events = [_make_log_event("3.3.3.3", "decky-01", "ssh", "input", _DT1, {"input": "ls -la"})] + result = _extract_commands_from_events(events) assert len(result) == 1 assert result[0]["command"] == "ls -la" def test_non_command_event_type_ignored(self): - rows = [self._row("1.1.1.1", "connection", {"command": "id"})] - result = _extract_commands(rows, "1.1.1.1") - assert result == [] - - def test_wrong_ip_ignored(self): - rows = [self._row("9.9.9.9", "command", {"command": "whoami"})] - result = _extract_commands(rows, "1.1.1.1") + events = [_make_log_event("1.1.1.1", "decky-01", "ssh", "connection", _DT1, {"command": "id"})] + result = _extract_commands_from_events(events) assert result == [] def test_no_command_field_skipped(self): - rows = [self._row("1.1.1.1", "command", {"other": "stuff"})] - result = _extract_commands(rows, "1.1.1.1") - assert result == [] - - def test_invalid_json_fields_skipped(self): - row = _make_log_row( - attacker_ip="1.1.1.1", - event_type="command", - fields="not valid json", - ) - result = _extract_commands([row], "1.1.1.1") + events = [_make_log_event("1.1.1.1", "decky-01", "ssh", "command", _DT1, {"other": "stuff"})] + result = _extract_commands_from_events(events) assert result == [] def test_multiple_commands_all_extracted(self): - rows = [ - self._row("5.5.5.5", "command", {"command": "id"}), - self._row("5.5.5.5", "command", {"command": "uname -a"}), + events = [ + _make_log_event("5.5.5.5", "decky-01", "ssh", "command", _DT1, {"command": "id"}), + _make_log_event("5.5.5.5", "decky-01", "ssh", "command", _DT2, {"command": "uname -a"}), ] - result = _extract_commands(rows, "5.5.5.5") + result = _extract_commands_from_events(events) assert len(result) == 2 cmds = {r["command"] for r in result} assert cmds == {"id", "uname -a"} def test_timestamp_serialized_to_string(self): - rows = [self._row("1.1.1.1", "command", {"command": "pwd"})] - result = _extract_commands(rows, "1.1.1.1") + events = [_make_log_event("1.1.1.1", "decky-01", "ssh", "command", _DT1, {"command": "pwd"})] + result = _extract_commands_from_events(events) assert isinstance(result[0]["timestamp"], str) @@ -291,112 +282,283 @@ class TestBuildRecord: assert record["updated_at"].tzinfo is not None -# ─── _rebuild ───────────────────────────────────────────────────────────────── +# ─── _cold_start ───────────────────────────────────────────────────────────── -class TestRebuild: +class TestColdStart: @pytest.mark.asyncio - async def test_empty_logs_no_upsert(self): - repo = _make_repo(logs=[]) - await _rebuild(repo) - repo.upsert_attacker.assert_not_awaited() - - @pytest.mark.asyncio - async def test_single_attacker_upserted(self): - raw = _make_raw_line("ssh", "decky-01", "connection", "10.0.0.1", _TS1) - row = _make_log_row(raw_line=raw, attacker_ip="10.0.0.1") - repo = _make_repo(logs=[row]) - await _rebuild(repo) - repo.upsert_attacker.assert_awaited_once() - record = repo.upsert_attacker.call_args[0][0] - assert record["ip"] == "10.0.0.1" - assert record["event_count"] == 1 - - @pytest.mark.asyncio - async def test_multiple_attackers_all_upserted(self): + async def test_cold_start_builds_all_profiles(self): rows = [ _make_log_row( + row_id=i + 1, raw_line=_make_raw_line("ssh", "decky-01", "conn", ip, _TS1), attacker_ip=ip, ) - for ip in ["1.1.1.1", "2.2.2.2", "3.3.3.3"] + for i, ip in enumerate(["1.1.1.1", "2.2.2.2", "3.3.3.3"]) ] - repo = _make_repo(logs=rows) - await _rebuild(repo) + repo = _make_repo(logs=rows, max_log_id=3) + state = _WorkerState() + + await _cold_start(repo, state) + + assert state.initialized is True + assert state.last_log_id == 3 assert repo.upsert_attacker.await_count == 3 upserted_ips = {c[0][0]["ip"] for c in repo.upsert_attacker.call_args_list} assert upserted_ips == {"1.1.1.1", "2.2.2.2", "3.3.3.3"} + repo.set_state.assert_awaited_with(_STATE_KEY, {"last_log_id": 3}) @pytest.mark.asyncio - async def test_traversal_detected_across_two_deckies(self): + async def test_cold_start_empty_db(self): + repo = _make_repo(logs=[], max_log_id=0) + state = _WorkerState() + + await _cold_start(repo, state) + + assert state.initialized is True + assert state.last_log_id == 0 + repo.upsert_attacker.assert_not_awaited() + repo.set_state.assert_awaited() + + @pytest.mark.asyncio + async def test_cold_start_traversal_detected(self): rows = [ _make_log_row( + row_id=1, raw_line=_make_raw_line("ssh", "decky-01", "conn", "5.5.5.5", _TS1), attacker_ip="5.5.5.5", decky="decky-01", ), _make_log_row( + row_id=2, raw_line=_make_raw_line("http", "decky-02", "req", "5.5.5.5", _TS2), attacker_ip="5.5.5.5", decky="decky-02", ), ] - repo = _make_repo(logs=rows) - await _rebuild(repo) + repo = _make_repo(logs=rows, max_log_id=2) + state = _WorkerState() + + await _cold_start(repo, state) + record = repo.upsert_attacker.call_args[0][0] assert record["is_traversal"] is True assert "decky-01" in record["traversal_path"] assert "decky-02" in record["traversal_path"] @pytest.mark.asyncio - async def test_single_decky_not_traversal(self): - rows = [ - _make_log_row( - raw_line=_make_raw_line("ssh", "decky-01", "conn", "7.7.7.7", _TS1), - attacker_ip="7.7.7.7", - ), - _make_log_row( - raw_line=_make_raw_line("http", "decky-01", "req", "7.7.7.7", _TS2), - attacker_ip="7.7.7.7", - ), - ] - repo = _make_repo(logs=rows) - await _rebuild(repo) - record = repo.upsert_attacker.call_args[0][0] - assert record["is_traversal"] is False - - @pytest.mark.asyncio - async def test_bounties_merged_into_record(self): + async def test_cold_start_bounties_merged(self): raw = _make_raw_line("ssh", "decky-01", "conn", "8.8.8.8", _TS1) repo = _make_repo( - logs=[_make_log_row(raw_line=raw, attacker_ip="8.8.8.8")], - bounties={"8.8.8.8": [ + logs=[_make_log_row(row_id=1, raw_line=raw, attacker_ip="8.8.8.8")], + max_log_id=1, + bounties_for_ips={"8.8.8.8": [ {"bounty_type": "credential", "attacker_ip": "8.8.8.8", "payload": {}}, {"bounty_type": "fingerprint", "attacker_ip": "8.8.8.8", "payload": {"ja3": "abc"}}, ]}, ) - await _rebuild(repo) + state = _WorkerState() + + await _cold_start(repo, state) + record = repo.upsert_attacker.call_args[0][0] assert record["bounty_count"] == 2 assert record["credential_count"] == 1 - fps = json.loads(record["fingerprints"]) - assert len(fps) == 1 @pytest.mark.asyncio - async def test_commands_extracted_during_rebuild(self): - raw = _make_raw_line("ssh", "decky-01", "command", "9.9.9.9", _TS1) + async def test_cold_start_commands_extracted(self): + raw = _make_raw_line("ssh", "decky-01", "command", "9.9.9.9", _TS1, command="cat /etc/passwd") row = _make_log_row( + row_id=1, raw_line=raw, attacker_ip="9.9.9.9", event_type="command", fields=json.dumps({"command": "cat /etc/passwd"}), ) - repo = _make_repo(logs=[row]) - await _rebuild(repo) + repo = _make_repo(logs=[row], max_log_id=1) + state = _WorkerState() + + await _cold_start(repo, state) + record = repo.upsert_attacker.call_args[0][0] commands = json.loads(record["commands"]) assert len(commands) == 1 assert commands[0]["command"] == "cat /etc/passwd" -# ─── attacker_profile_worker ────────────────────────────────────────────────── +# ─── _incremental_update ──────────────────────────────────────────────────── + +class TestIncrementalUpdate: + @pytest.mark.asyncio + async def test_no_new_logs_skips_upsert(self): + repo = _make_repo() + state = _WorkerState(initialized=True, last_log_id=10) + + await _incremental_update(repo, state) + + repo.upsert_attacker.assert_not_awaited() + repo.set_state.assert_awaited_with(_STATE_KEY, {"last_log_id": 10}) + + @pytest.mark.asyncio + async def test_only_affected_ips_upserted(self): + """Pre-populate engine with IP-A, then feed new logs only for IP-B.""" + state = _WorkerState(initialized=True, last_log_id=5) + # Pre-populate engine with IP-A events + line_a = _make_raw_line("ssh", "decky-01", "conn", "1.1.1.1", _TS1) + state.engine.ingest(line_a) + + # New batch has only IP-B + new_row = _make_log_row( + row_id=6, + raw_line=_make_raw_line("ssh", "decky-01", "conn", "2.2.2.2", _TS2), + attacker_ip="2.2.2.2", + ) + repo = _make_repo() + repo.get_logs_after_id = AsyncMock(return_value=[new_row]) + + await _incremental_update(repo, state) + + assert repo.upsert_attacker.await_count == 1 + upserted_ip = repo.upsert_attacker.call_args[0][0]["ip"] + assert upserted_ip == "2.2.2.2" + + @pytest.mark.asyncio + async def test_merges_with_existing_engine_state(self): + """Engine has 2 events for IP. New batch adds 1 more. Record should show event_count=3.""" + state = _WorkerState(initialized=True, last_log_id=2) + state.engine.ingest(_make_raw_line("ssh", "decky-01", "conn", "1.1.1.1", _TS1)) + state.engine.ingest(_make_raw_line("http", "decky-01", "req", "1.1.1.1", _TS2)) + + new_row = _make_log_row( + row_id=3, + raw_line=_make_raw_line("ftp", "decky-01", "login", "1.1.1.1", _TS3), + attacker_ip="1.1.1.1", + ) + repo = _make_repo() + repo.get_logs_after_id = AsyncMock(return_value=[new_row]) + + await _incremental_update(repo, state) + + record = repo.upsert_attacker.call_args[0][0] + assert record["event_count"] == 3 + assert record["ip"] == "1.1.1.1" + + @pytest.mark.asyncio + async def test_cursor_persisted_after_update(self): + new_row = _make_log_row( + row_id=42, + raw_line=_make_raw_line("ssh", "decky-01", "conn", "1.1.1.1", _TS1), + attacker_ip="1.1.1.1", + ) + repo = _make_repo() + repo.get_logs_after_id = AsyncMock(return_value=[new_row]) + state = _WorkerState(initialized=True, last_log_id=41) + + await _incremental_update(repo, state) + + assert state.last_log_id == 42 + repo.set_state.assert_awaited_with(_STATE_KEY, {"last_log_id": 42}) + + @pytest.mark.asyncio + async def test_traversal_detected_across_cycles(self): + """IP hits decky-01 during cold start, decky-02 in incremental → traversal.""" + state = _WorkerState(initialized=True, last_log_id=1) + state.engine.ingest(_make_raw_line("ssh", "decky-01", "conn", "5.5.5.5", _TS1)) + + new_row = _make_log_row( + row_id=2, + raw_line=_make_raw_line("http", "decky-02", "req", "5.5.5.5", _TS2), + attacker_ip="5.5.5.5", + ) + repo = _make_repo() + repo.get_logs_after_id = AsyncMock(return_value=[new_row]) + + await _incremental_update(repo, state) + + record = repo.upsert_attacker.call_args[0][0] + assert record["is_traversal"] is True + assert "decky-01" in record["traversal_path"] + assert "decky-02" in record["traversal_path"] + + @pytest.mark.asyncio + async def test_batch_loop_processes_all(self): + """First batch returns BATCH_SIZE rows, second returns fewer — all processed.""" + batch_1 = [ + _make_log_row( + row_id=i + 1, + raw_line=_make_raw_line("ssh", "decky-01", "conn", f"10.0.0.{i}", _TS1), + attacker_ip=f"10.0.0.{i}", + ) + for i in range(_BATCH_SIZE) + ] + batch_2 = [ + _make_log_row( + row_id=_BATCH_SIZE + 1, + raw_line=_make_raw_line("ssh", "decky-01", "conn", "10.0.1.1", _TS2), + attacker_ip="10.0.1.1", + ), + ] + + call_count = 0 + + async def mock_get_logs(last_id, limit=_BATCH_SIZE): + nonlocal call_count + call_count += 1 + if call_count == 1: + return batch_1 + elif call_count == 2: + return batch_2 + return [] + + repo = _make_repo() + repo.get_logs_after_id = AsyncMock(side_effect=mock_get_logs) + state = _WorkerState(initialized=True, last_log_id=0) + + await _incremental_update(repo, state) + + assert state.last_log_id == _BATCH_SIZE + 1 + assert repo.upsert_attacker.await_count == _BATCH_SIZE + 1 + + @pytest.mark.asyncio + async def test_bounties_fetched_only_for_affected_ips(self): + new_rows = [ + _make_log_row( + row_id=1, + raw_line=_make_raw_line("ssh", "decky-01", "conn", "1.1.1.1", _TS1), + attacker_ip="1.1.1.1", + ), + _make_log_row( + row_id=2, + raw_line=_make_raw_line("ssh", "decky-01", "conn", "2.2.2.2", _TS2), + attacker_ip="2.2.2.2", + ), + ] + repo = _make_repo() + repo.get_logs_after_id = AsyncMock(return_value=new_rows) + state = _WorkerState(initialized=True, last_log_id=0) + + await _incremental_update(repo, state) + + repo.get_bounties_for_ips.assert_awaited_once() + called_ips = repo.get_bounties_for_ips.call_args[0][0] + assert called_ips == {"1.1.1.1", "2.2.2.2"} + + @pytest.mark.asyncio + async def test_uninitialized_state_triggers_cold_start(self): + rows = [ + _make_log_row( + row_id=1, + raw_line=_make_raw_line("ssh", "decky-01", "conn", "1.1.1.1", _TS1), + attacker_ip="1.1.1.1", + ), + ] + repo = _make_repo(logs=rows, max_log_id=1) + state = _WorkerState() + + await _incremental_update(repo, state) + + assert state.initialized is True + repo.get_all_logs_raw.assert_awaited_once() + + +# ─── attacker_profile_worker ──────────────────────────────────────────────── class TestAttackerProfileWorker: @pytest.mark.asyncio @@ -409,7 +571,7 @@ class TestAttackerProfileWorker: await task @pytest.mark.asyncio - async def test_worker_handles_rebuild_error_without_crashing(self): + async def test_worker_handles_update_error_without_crashing(self): repo = _make_repo() _call_count = 0 @@ -419,16 +581,16 @@ class TestAttackerProfileWorker: if _call_count >= 2: raise asyncio.CancelledError() - async def bad_rebuild(_repo): + async def bad_update(_repo, _state): raise RuntimeError("DB exploded") with patch("decnet.web.attacker_worker.asyncio.sleep", side_effect=fake_sleep): - with patch("decnet.web.attacker_worker._rebuild", side_effect=bad_rebuild): + with patch("decnet.web.attacker_worker._incremental_update", side_effect=bad_update): with pytest.raises(asyncio.CancelledError): await attacker_profile_worker(repo) @pytest.mark.asyncio - async def test_worker_calls_rebuild_after_sleep(self): + async def test_worker_calls_update_after_sleep(self): repo = _make_repo() _call_count = 0 @@ -438,17 +600,17 @@ class TestAttackerProfileWorker: if _call_count >= 2: raise asyncio.CancelledError() - rebuild_calls = [] + update_calls = [] - async def mock_rebuild(_repo): - rebuild_calls.append(True) + async def mock_update(_repo, _state): + update_calls.append(True) with patch("decnet.web.attacker_worker.asyncio.sleep", side_effect=fake_sleep): - with patch("decnet.web.attacker_worker._rebuild", side_effect=mock_rebuild): + with patch("decnet.web.attacker_worker._incremental_update", side_effect=mock_update): with pytest.raises(asyncio.CancelledError): await attacker_profile_worker(repo) - assert len(rebuild_calls) >= 1 + assert len(update_calls) >= 1 # ─── JA3 bounty extraction from ingester ───────────────────────────────────── diff --git a/tests/test_base_repo.py b/tests/test_base_repo.py index 5ba51db..dad3496 100644 --- a/tests/test_base_repo.py +++ b/tests/test_base_repo.py @@ -22,8 +22,12 @@ class DummyRepo(BaseRepository): async def get_state(self, k): await super().get_state(k) async def set_state(self, k, v): await super().set_state(k, v) async def get_all_logs_raw(self): await super().get_all_logs_raw() + async def get_max_log_id(self): await super().get_max_log_id() + async def get_logs_after_id(self, last_id, limit=500): await super().get_logs_after_id(last_id, limit) async def get_all_bounties_by_ip(self): await super().get_all_bounties_by_ip() + async def get_bounties_for_ips(self, ips): await super().get_bounties_for_ips(ips) async def upsert_attacker(self, d): await super().upsert_attacker(d) + async def get_attacker_by_uuid(self, u): await super().get_attacker_by_uuid(u) async def get_attackers(self, **kw): await super().get_attackers(**kw) async def get_total_attackers(self, **kw): await super().get_total_attackers(**kw) @@ -47,7 +51,11 @@ async def test_base_repo_coverage(): await dr.get_state("k") await dr.set_state("k", "v") await dr.get_all_logs_raw() + await dr.get_max_log_id() + await dr.get_logs_after_id(0) await dr.get_all_bounties_by_ip() + await dr.get_bounties_for_ips({"1.1.1.1"}) await dr.upsert_attacker({}) + await dr.get_attacker_by_uuid("a") await dr.get_attackers() await dr.get_total_attackers()