From 3d00de8fd3492bdabaf8e325d36d95c8151df5a4 Mon Sep 17 00:00:00 2001 From: anti Date: Tue, 28 Apr 2026 15:46:28 -0400 Subject: [PATCH] refactor(db): split attackers.py into an attackers/ subpackage Splits the 494-line attackers.py into five submixin files plus a composing AttackersMixin in attackers/__init__.py: _core.py (~95) Attacker CRUD + _deserialize_attacker behavior.py (~110) AttackerBehavior + _deserialize_behavior sessions.py (~50) SessionProfile read/write smtp.py (~70) SmtpTarget per-attacker + cross-attacker views activity.py (~190) log-derived activity (commands, leaks, artifacts, stored mail, session log, transcripts) IdentitiesMixin.list_observations_for_identity calls self._deserialize_attacker; MRO resolves it onto AttackersCoreMixin through the composed SQLModelRepository class. --- decnet/web/db/sqlmodel_repo/attackers.py | 494 ------------------ .../db/sqlmodel_repo/attackers/__init__.py | 32 ++ .../web/db/sqlmodel_repo/attackers/_core.py | 95 ++++ .../db/sqlmodel_repo/attackers/activity.py | 207 ++++++++ .../db/sqlmodel_repo/attackers/behavior.py | 106 ++++ .../db/sqlmodel_repo/attackers/sessions.py | 49 ++ decnet/web/db/sqlmodel_repo/attackers/smtp.py | 69 +++ 7 files changed, 558 insertions(+), 494 deletions(-) delete mode 100644 decnet/web/db/sqlmodel_repo/attackers.py create mode 100644 decnet/web/db/sqlmodel_repo/attackers/__init__.py create mode 100644 decnet/web/db/sqlmodel_repo/attackers/_core.py create mode 100644 decnet/web/db/sqlmodel_repo/attackers/activity.py create mode 100644 decnet/web/db/sqlmodel_repo/attackers/behavior.py create mode 100644 decnet/web/db/sqlmodel_repo/attackers/sessions.py create mode 100644 decnet/web/db/sqlmodel_repo/attackers/smtp.py diff --git a/decnet/web/db/sqlmodel_repo/attackers.py b/decnet/web/db/sqlmodel_repo/attackers.py deleted file mode 100644 index 7c97d743..00000000 --- a/decnet/web/db/sqlmodel_repo/attackers.py +++ /dev/null @@ -1,494 +0,0 @@ -"""Attacker domain: core CRUD, behavior, sessions, smtp targets, and -log-derived activity views (commands, leaks, artifacts, transcripts). - -Identity-resolution and campaign-clustering reads live in their own -modules (``identities.py`` / ``campaigns.py``) — they're conceptually -about grouping attackers, not the attackers themselves. -""" -from __future__ import annotations - -import json -import uuid as _uuid -from datetime import datetime, timezone -from typing import Any, List, Optional - -from sqlalchemy import desc, func, select - -from decnet.web.db.models import ( - Attacker, - AttackerBehavior, - Bounty, - Log, - SessionProfile, - SmtpTarget, -) - - -class AttackersMixin: - """Mixin: composed onto ``SQLModelRepository``.""" - - # ─── core attacker rows ──────────────────────────────────────────────── - - async def upsert_attacker(self, data: dict[str, Any]) -> str: - async with self._session() as session: - result = await session.execute( - select(Attacker).where(Attacker.ip == data["ip"]) - ) - existing = result.scalar_one_or_none() - if existing: - for k, v in data.items(): - setattr(existing, k, v) - session.add(existing) - row_uuid = existing.uuid - else: - row_uuid = str(_uuid.uuid4()) - data = {**data, "uuid": row_uuid} - session.add(Attacker(**data)) - await session.commit() - return row_uuid - - @staticmethod - def _deserialize_attacker(d: dict[str, Any]) -> dict[str, Any]: - for key in ("services", "deckies", "fingerprints", "commands"): - if isinstance(d.get(key), str): - try: - d[key] = json.loads(d[key]) - except (json.JSONDecodeError, TypeError): - pass - return d - - async def get_attacker_by_uuid(self, uuid: str) -> Optional[dict[str, Any]]: - async with self._session() as session: - result = await session.execute( - select(Attacker).where(Attacker.uuid == uuid) - ) - attacker = result.scalar_one_or_none() - if not attacker: - return None - return self._deserialize_attacker(attacker.model_dump(mode="json")) - - async def get_attackers( - self, - limit: int = 50, - offset: int = 0, - search: Optional[str] = None, - sort_by: str = "recent", - service: Optional[str] = None, - ) -> List[dict[str, Any]]: - order = { - "active": desc(Attacker.event_count), - "traversals": desc(Attacker.is_traversal), - }.get(sort_by, desc(Attacker.last_seen)) - - statement = select(Attacker).order_by(order).offset(offset).limit(limit) - if search: - statement = statement.where(Attacker.ip.like(f"%{search}%")) - if service: - statement = statement.where(Attacker.services.like(f'%"{service}"%')) - - async with self._session() as session: - result = await session.execute(statement) - return [ - self._deserialize_attacker(a.model_dump(mode="json")) - for a in result.scalars().all() - ] - - async def get_total_attackers( - self, search: Optional[str] = None, service: Optional[str] = None - ) -> int: - statement = select(func.count()).select_from(Attacker) - if search: - statement = statement.where(Attacker.ip.like(f"%{search}%")) - if service: - statement = statement.where(Attacker.services.like(f'%"{service}"%')) - - async with self._session() as session: - result = await session.execute(statement) - return result.scalar() or 0 - - # ─── attacker behavior (TCP fingerprint, timing, etc.) ──────────────── - - async def upsert_attacker_behavior( - self, - attacker_uuid: str, - data: dict[str, Any], - ) -> None: - async with self._session() as session: - result = await session.execute( - select(AttackerBehavior).where( - AttackerBehavior.attacker_uuid == attacker_uuid - ) - ) - existing = result.scalar_one_or_none() - payload = {**data, "updated_at": datetime.now(timezone.utc)} - if existing: - for k, v in payload.items(): - setattr(existing, k, v) - session.add(existing) - else: - session.add(AttackerBehavior(attacker_uuid=attacker_uuid, **payload)) - await session.commit() - - async def get_attacker_behavior( - self, - attacker_uuid: str, - ) -> Optional[dict[str, Any]]: - async with self._session() as session: - result = await session.execute( - select(AttackerBehavior).where( - AttackerBehavior.attacker_uuid == attacker_uuid - ) - ) - row = result.scalar_one_or_none() - if not row: - return None - return self._deserialize_behavior(row.model_dump(mode="json")) - - async def get_behaviors_for_ips( - self, - ips: set[str], - ) -> dict[str, dict[str, Any]]: - if not ips: - return {} - async with self._session() as session: - result = await session.execute( - select(Attacker.ip, AttackerBehavior) - .join(AttackerBehavior, Attacker.uuid == AttackerBehavior.attacker_uuid) - .where(Attacker.ip.in_(ips)) - ) - out: dict[str, dict[str, Any]] = {} - for ip, row in result.all(): - out[ip] = self._deserialize_behavior(row.model_dump(mode="json")) - return out - - @staticmethod - def _deserialize_behavior(d: dict[str, Any]) -> dict[str, Any]: - for key in ("tcp_fingerprint", "timing_stats", "phase_sequence"): - if isinstance(d.get(key), str): - try: - d[key] = json.loads(d[key]) - except (json.JSONDecodeError, TypeError): - pass - # Deserialize tool_guesses JSON array; normalise None → []. - raw = d.get("tool_guesses") - if isinstance(raw, str): - try: - parsed = json.loads(raw) - d["tool_guesses"] = parsed if isinstance(parsed, list) else [parsed] - except (json.JSONDecodeError, TypeError): - d["tool_guesses"] = [] - elif raw is None: - d["tool_guesses"] = [] - # Same list-or-None pattern for kex_order_raw. - raw_kex = d.get("kex_order_raw") - if isinstance(raw_kex, str): - try: - parsed_kex = json.loads(raw_kex) - d["kex_order_raw"] = parsed_kex if isinstance(parsed_kex, list) else [parsed_kex] - except (json.JSONDecodeError, TypeError): - d["kex_order_raw"] = [] - elif raw_kex is None: - d["kex_order_raw"] = [] - # Same list-or-None pattern for ssh_client_banners. - raw_banners = d.get("ssh_client_banners") - if isinstance(raw_banners, str): - try: - parsed_banners = json.loads(raw_banners) - d["ssh_client_banners"] = parsed_banners if isinstance(parsed_banners, list) else [parsed_banners] - except (json.JSONDecodeError, TypeError): - d["ssh_client_banners"] = [] - elif raw_banners is None: - d["ssh_client_banners"] = [] - return d - - # ─── session profiles ──────────────────────────────────────────────── - - async def upsert_session_profile( - self, - sid: str, - data: dict[str, Any], - ) -> None: - """ - Write (or update) the session_profile row for *sid*. - - Pre-v1, the typical call is the empty-write path at session close: - `upsert_session_profile(sid, {"log_id": })` — all keystroke - feature columns stay NULL until the V2 ingestion job populates them. - """ - async with self._session() as session: - result = await session.execute( - select(SessionProfile).where(SessionProfile.sid == sid) - ) - existing = result.scalar_one_or_none() - if existing: - for k, v in data.items(): - setattr(existing, k, v) - session.add(existing) - else: - session.add(SessionProfile(sid=sid, **data)) - await session.commit() - - async def get_session_profile( - self, - sid: str, - ) -> Optional[dict[str, Any]]: - async with self._session() as session: - result = await session.execute( - select(SessionProfile).where(SessionProfile.sid == sid) - ) - row = result.scalar_one_or_none() - if not row: - return None - return row.model_dump(mode="json") - - # ─── smtp targets ───────────────────────────────────────────────────── - - async def increment_smtp_target(self, attacker_uuid: str, domain: str) -> None: - """Upsert an (attacker_uuid, domain) pair and bump count + last_seen. - - Read-then-write under a single session — the UNIQUE constraint on - (attacker_uuid, domain) guards against duplicate rows if the race - ever materialises; we accept the ~1ms extra round-trip in exchange - for a single dialect-portable implementation. - """ - async with self._session() as session: - result = await session.execute( - select(SmtpTarget) - .where(SmtpTarget.attacker_uuid == attacker_uuid) - .where(SmtpTarget.domain == domain) - ) - existing = result.scalar_one_or_none() - now = datetime.now(timezone.utc) - if existing: - existing.count += 1 - existing.last_seen = now - session.add(existing) - else: - session.add(SmtpTarget( - attacker_uuid=attacker_uuid, - domain=domain, - first_seen=now, - last_seen=now, - count=1, - )) - await session.commit() - - async def list_smtp_targets(self, attacker_uuid: str) -> list[dict[str, Any]]: - async with self._session() as session: - result = await session.execute( - select(SmtpTarget) - .where(SmtpTarget.attacker_uuid == attacker_uuid) - .order_by(desc(SmtpTarget.last_seen)) - ) - return [r.model_dump(mode="json") for r in result.scalars().all()] - - async def smtp_target_seen(self, domain: str) -> dict[str, Any]: - """Aggregate rows for this domain across every attacker in the DB.""" - async with self._session() as session: - result = await session.execute( - select( - func.coalesce(func.sum(SmtpTarget.count), 0), - func.min(SmtpTarget.first_seen), - func.max(SmtpTarget.last_seen), - ).where(SmtpTarget.domain == domain) - ) - total, first_seen, last_seen = result.one() - return { - "seen": int(total) > 0, - "count": int(total), - "first_seen": first_seen, - "last_seen": last_seen, - } - - # ─── log-derived activity views ─────────────────────────────────────── - - async def get_attacker_commands( - self, - uuid: str, - limit: int = 50, - offset: int = 0, - service: Optional[str] = None, - ) -> dict[str, Any]: - async with self._session() as session: - result = await session.execute( - select(Attacker.commands).where(Attacker.uuid == uuid) - ) - raw = result.scalar_one_or_none() - if raw is None: - return {"total": 0, "data": []} - - commands: list = json.loads(raw) if isinstance(raw, str) else raw - if service: - commands = [c for c in commands if c.get("service") == service] - - total = len(commands) - page = commands[offset: offset + limit] - return {"total": total, "data": page} - - async def get_attacker_service_activity( - self, attacker_uuid: str - ) -> list[tuple[str, str]]: - """Return distinct ``(service, event_type)`` pairs for an attacker. - - Resolves IP then ``SELECT DISTINCT service, event_type FROM logs - WHERE attacker_ip = :ip`` — the result set is bounded by the - cardinality of services × event_types (tens, not thousands), so - this stays cheap even for attackers with long event streams. - Caller applies `event_kinds.bucket_services` to split into - scanned vs. interacted. - """ - async with self._session() as session: - ip_res = await session.execute( - select(Attacker.ip).where(Attacker.uuid == attacker_uuid) - ) - ip = ip_res.scalar_one_or_none() - if not ip: - return [] - rows = await session.execute( - select(Log.service, Log.event_type) - .where(Log.attacker_ip == ip) - .distinct() - ) - return [(svc, evt) for svc, evt in rows.all()] - - async def get_attacker_ip_leaks( - self, attacker_uuid: str, *, limit: int = 10, - ) -> list[dict[str, Any]]: - """Return ``bounty_type='ip_leak'`` rows for this attacker, newest - first, capped at ``limit``. Shape matches the XFF-mismatch - payload emitted by the ingester: keys include ``real_ip_claim``, - ``source_header``, ``headers_seen``. Use - :meth:`count_attacker_ip_leaks` to get the unbounded total for - rotation detection.""" - async with self._session() as session: - ip_res = await session.execute( - select(Attacker.ip).where(Attacker.uuid == attacker_uuid) - ) - ip = ip_res.scalar_one_or_none() - if not ip: - return [] - rows = await session.execute( - select(Bounty) - .where(Bounty.attacker_ip == ip) - .where(Bounty.bounty_type == "ip_leak") - .order_by(desc(Bounty.timestamp)) - .limit(limit) - ) - out: list[dict[str, Any]] = [] - for row in rows.scalars().all(): - rec = row.model_dump(mode="json") - # Bounty.payload is stored JSON-encoded; pre-decode for UX. - raw = rec.get("payload") - if isinstance(raw, str): - try: - rec["payload"] = json.loads(raw) - except (ValueError, TypeError): - rec["payload"] = {} - out.append(rec) - return out - - async def count_attacker_ip_leaks(self, attacker_uuid: str) -> int: - """Cheap COUNT(*) for XFF-rotation detection.""" - async with self._session() as session: - ip_res = await session.execute( - select(Attacker.ip).where(Attacker.uuid == attacker_uuid) - ) - ip = ip_res.scalar_one_or_none() - if not ip: - return 0 - count_res = await session.execute( - select(func.count(Bounty.id)) - .where(Bounty.attacker_ip == ip) - .where(Bounty.bounty_type == "ip_leak") - ) - return int(count_res.scalar() or 0) - - async def get_attacker_artifacts(self, uuid: str) -> list[dict[str, Any]]: - """Return `file_captured` logs for the attacker identified by UUID. - - Resolves the attacker's IP first, then queries the logs table on two - indexed columns (``attacker_ip`` and ``event_type``). No JSON extract - needed — the decky/stored_as are already decoded into ``fields`` by - the ingester and returned to the frontend for drawer rendering. - """ - async with self._session() as session: - ip_res = await session.execute( - select(Attacker.ip).where(Attacker.uuid == uuid) - ) - ip = ip_res.scalar_one_or_none() - if not ip: - return [] - rows = await session.execute( - select(Log) - .where(Log.attacker_ip == ip) - .where(Log.event_type == "file_captured") - .order_by(desc(Log.timestamp)) - .limit(200) - ) - return [r.model_dump(mode="json") for r in rows.scalars().all()] - - async def get_attacker_stored_mail(self, uuid: str) -> list[dict[str, Any]]: - """Return `message_stored` logs for an attacker, newest first. - - Mirrors :meth:`get_attacker_artifacts` — the SMTP template emits one - `message_stored` row per accepted DATA body, with headers + sha256 + - attachment manifest already decoded into ``fields`` by the ingester. - Capped at 200 rows to match the artifact/transcript query shape. - """ - async with self._session() as session: - ip_res = await session.execute( - select(Attacker.ip).where(Attacker.uuid == uuid) - ) - ip = ip_res.scalar_one_or_none() - if not ip: - return [] - rows = await session.execute( - select(Log) - .where(Log.attacker_ip == ip) - .where(Log.event_type == "message_stored") - .order_by(desc(Log.timestamp)) - .limit(200) - ) - return [r.model_dump(mode="json") for r in rows.scalars().all()] - - async def get_session_log(self, sid: str) -> Optional[dict[str, Any]]: - """Look up the `session_recorded` Log row that owns a given sid. - - sid is a v4 UUID embedded in the row's ``fields`` JSON blob. Matched - with LIKE on the textual sid substring — cheap given the bounded - cardinality of session_recorded rows vs. the full logs table. - """ - needle = f'"sid":"{sid}"' - async with self._session() as session: - rows = await session.execute( - select(Log) - .where(Log.event_type == "session_recorded") - .where(Log.fields.contains(needle)) - .limit(1) - ) - row = rows.scalars().first() - return row.model_dump(mode="json") if row else None - - async def get_attacker_transcripts(self, uuid: str) -> list[dict[str, Any]]: - """Return `session_recorded` logs for the attacker identified by UUID. - - Mirror of :meth:`get_attacker_artifacts` — sessions ride in the same - Log table with event_type=session_recorded; the ingester decodes the - RFC 5424 SD fields (sid, service, decky, src_ip, duration_s, bytes, - truncated, shard_path) into the returned ``fields`` blob. - """ - async with self._session() as session: - ip_res = await session.execute( - select(Attacker.ip).where(Attacker.uuid == uuid) - ) - ip = ip_res.scalar_one_or_none() - if not ip: - return [] - rows = await session.execute( - select(Log) - .where(Log.attacker_ip == ip) - .where(Log.event_type == "session_recorded") - .order_by(desc(Log.timestamp)) - .limit(200) - ) - return [r.model_dump(mode="json") for r in rows.scalars().all()] diff --git a/decnet/web/db/sqlmodel_repo/attackers/__init__.py b/decnet/web/db/sqlmodel_repo/attackers/__init__.py new file mode 100644 index 00000000..cb0b5b08 --- /dev/null +++ b/decnet/web/db/sqlmodel_repo/attackers/__init__.py @@ -0,0 +1,32 @@ +"""Attacker repository methods. + +The full domain spans ~500 lines of methods across attacker rows, +behavior signals, session profiles, SMTP victim tracking, and +log-derived activity views. Each concern lives in its own submixin; +``AttackersMixin`` composes them. + +``_deserialize_attacker`` lives on ``AttackersCoreMixin`` and is reached +from ``IdentitiesMixin.list_observations_for_identity`` via ``self.`` — +Python's MRO resolves it to the core mixin on the composed +``SQLModelRepository`` class. +""" +from __future__ import annotations + +from decnet.web.db.sqlmodel_repo.attackers._core import AttackersCoreMixin +from decnet.web.db.sqlmodel_repo.attackers.activity import AttackerActivityMixin +from decnet.web.db.sqlmodel_repo.attackers.behavior import AttackerBehaviorMixin +from decnet.web.db.sqlmodel_repo.attackers.sessions import SessionProfilesMixin +from decnet.web.db.sqlmodel_repo.attackers.smtp import SmtpTargetsMixin + + +class AttackersMixin( + AttackerActivityMixin, + AttackerBehaviorMixin, + SessionProfilesMixin, + SmtpTargetsMixin, + AttackersCoreMixin, +): + """Composed attackers mixin — see submixins for the actual methods.""" + + +__all__ = ["AttackersMixin"] diff --git a/decnet/web/db/sqlmodel_repo/attackers/_core.py b/decnet/web/db/sqlmodel_repo/attackers/_core.py new file mode 100644 index 00000000..a5240032 --- /dev/null +++ b/decnet/web/db/sqlmodel_repo/attackers/_core.py @@ -0,0 +1,95 @@ +"""Core ``Attacker`` row CRUD + the ``_deserialize_attacker`` helper. + +The helper lives here because sibling submixins and ``IdentitiesMixin`` +(``list_observations_for_identity``) both call it through ``self.`` — +MRO resolves them onto this mixin on the composed +``SQLModelRepository``. +""" +from __future__ import annotations + +import json +import uuid as _uuid +from typing import Any, List, Optional + +from sqlalchemy import desc, func, select + +from decnet.web.db.models import Attacker + + +class AttackersCoreMixin: + @staticmethod + def _deserialize_attacker(d: dict[str, Any]) -> dict[str, Any]: + for key in ("services", "deckies", "fingerprints", "commands"): + if isinstance(d.get(key), str): + try: + d[key] = json.loads(d[key]) + except (json.JSONDecodeError, TypeError): + pass + return d + + async def upsert_attacker(self, data: dict[str, Any]) -> str: + async with self._session() as session: + result = await session.execute( + select(Attacker).where(Attacker.ip == data["ip"]) + ) + existing = result.scalar_one_or_none() + if existing: + for k, v in data.items(): + setattr(existing, k, v) + session.add(existing) + row_uuid = existing.uuid + else: + row_uuid = str(_uuid.uuid4()) + data = {**data, "uuid": row_uuid} + session.add(Attacker(**data)) + await session.commit() + return row_uuid + + async def get_attacker_by_uuid(self, uuid: str) -> Optional[dict[str, Any]]: + async with self._session() as session: + result = await session.execute( + select(Attacker).where(Attacker.uuid == uuid) + ) + attacker = result.scalar_one_or_none() + if not attacker: + return None + return self._deserialize_attacker(attacker.model_dump(mode="json")) + + async def get_attackers( + self, + limit: int = 50, + offset: int = 0, + search: Optional[str] = None, + sort_by: str = "recent", + service: Optional[str] = None, + ) -> List[dict[str, Any]]: + order = { + "active": desc(Attacker.event_count), + "traversals": desc(Attacker.is_traversal), + }.get(sort_by, desc(Attacker.last_seen)) + + statement = select(Attacker).order_by(order).offset(offset).limit(limit) + if search: + statement = statement.where(Attacker.ip.like(f"%{search}%")) + if service: + statement = statement.where(Attacker.services.like(f'%"{service}"%')) + + async with self._session() as session: + result = await session.execute(statement) + return [ + self._deserialize_attacker(a.model_dump(mode="json")) + for a in result.scalars().all() + ] + + async def get_total_attackers( + self, search: Optional[str] = None, service: Optional[str] = None + ) -> int: + statement = select(func.count()).select_from(Attacker) + if search: + statement = statement.where(Attacker.ip.like(f"%{search}%")) + if service: + statement = statement.where(Attacker.services.like(f'%"{service}"%')) + + async with self._session() as session: + result = await session.execute(statement) + return result.scalar() or 0 diff --git a/decnet/web/db/sqlmodel_repo/attackers/activity.py b/decnet/web/db/sqlmodel_repo/attackers/activity.py new file mode 100644 index 00000000..60848fdb --- /dev/null +++ b/decnet/web/db/sqlmodel_repo/attackers/activity.py @@ -0,0 +1,207 @@ +"""Log-derived activity views: commands, service activity, IP leaks, +artifacts, stored mail, recorded sessions, transcripts. + +These read from the ``logs`` and ``bounty`` tables joined against the +``Attacker`` row to scope by IP — no separate activity table. +""" +from __future__ import annotations + +import json +from typing import Any, Optional + +from sqlalchemy import desc, func, select + +from decnet.web.db.models import Attacker, Bounty, Log + + +class AttackerActivityMixin: + async def get_attacker_commands( + self, + uuid: str, + limit: int = 50, + offset: int = 0, + service: Optional[str] = None, + ) -> dict[str, Any]: + async with self._session() as session: + result = await session.execute( + select(Attacker.commands).where(Attacker.uuid == uuid) + ) + raw = result.scalar_one_or_none() + if raw is None: + return {"total": 0, "data": []} + + commands: list = json.loads(raw) if isinstance(raw, str) else raw + if service: + commands = [c for c in commands if c.get("service") == service] + + total = len(commands) + page = commands[offset: offset + limit] + return {"total": total, "data": page} + + async def get_attacker_service_activity( + self, attacker_uuid: str + ) -> list[tuple[str, str]]: + """Return distinct ``(service, event_type)`` pairs for an attacker. + + Resolves IP then ``SELECT DISTINCT service, event_type FROM logs + WHERE attacker_ip = :ip`` — the result set is bounded by the + cardinality of services × event_types (tens, not thousands), so + this stays cheap even for attackers with long event streams. + Caller applies `event_kinds.bucket_services` to split into + scanned vs. interacted. + """ + async with self._session() as session: + ip_res = await session.execute( + select(Attacker.ip).where(Attacker.uuid == attacker_uuid) + ) + ip = ip_res.scalar_one_or_none() + if not ip: + return [] + rows = await session.execute( + select(Log.service, Log.event_type) + .where(Log.attacker_ip == ip) + .distinct() + ) + return [(svc, evt) for svc, evt in rows.all()] + + async def get_attacker_ip_leaks( + self, attacker_uuid: str, *, limit: int = 10, + ) -> list[dict[str, Any]]: + """Return ``bounty_type='ip_leak'`` rows for this attacker, newest + first, capped at ``limit``. Shape matches the XFF-mismatch + payload emitted by the ingester: keys include ``real_ip_claim``, + ``source_header``, ``headers_seen``. Use + :meth:`count_attacker_ip_leaks` to get the unbounded total for + rotation detection.""" + async with self._session() as session: + ip_res = await session.execute( + select(Attacker.ip).where(Attacker.uuid == attacker_uuid) + ) + ip = ip_res.scalar_one_or_none() + if not ip: + return [] + rows = await session.execute( + select(Bounty) + .where(Bounty.attacker_ip == ip) + .where(Bounty.bounty_type == "ip_leak") + .order_by(desc(Bounty.timestamp)) + .limit(limit) + ) + out: list[dict[str, Any]] = [] + for row in rows.scalars().all(): + rec = row.model_dump(mode="json") + # Bounty.payload is stored JSON-encoded; pre-decode for UX. + raw = rec.get("payload") + if isinstance(raw, str): + try: + rec["payload"] = json.loads(raw) + except (ValueError, TypeError): + rec["payload"] = {} + out.append(rec) + return out + + async def count_attacker_ip_leaks(self, attacker_uuid: str) -> int: + """Cheap COUNT(*) for XFF-rotation detection.""" + async with self._session() as session: + ip_res = await session.execute( + select(Attacker.ip).where(Attacker.uuid == attacker_uuid) + ) + ip = ip_res.scalar_one_or_none() + if not ip: + return 0 + count_res = await session.execute( + select(func.count(Bounty.id)) + .where(Bounty.attacker_ip == ip) + .where(Bounty.bounty_type == "ip_leak") + ) + return int(count_res.scalar() or 0) + + async def get_attacker_artifacts(self, uuid: str) -> list[dict[str, Any]]: + """Return `file_captured` logs for the attacker identified by UUID. + + Resolves the attacker's IP first, then queries the logs table on two + indexed columns (``attacker_ip`` and ``event_type``). No JSON extract + needed — the decky/stored_as are already decoded into ``fields`` by + the ingester and returned to the frontend for drawer rendering. + """ + async with self._session() as session: + ip_res = await session.execute( + select(Attacker.ip).where(Attacker.uuid == uuid) + ) + ip = ip_res.scalar_one_or_none() + if not ip: + return [] + rows = await session.execute( + select(Log) + .where(Log.attacker_ip == ip) + .where(Log.event_type == "file_captured") + .order_by(desc(Log.timestamp)) + .limit(200) + ) + return [r.model_dump(mode="json") for r in rows.scalars().all()] + + async def get_attacker_stored_mail(self, uuid: str) -> list[dict[str, Any]]: + """Return `message_stored` logs for an attacker, newest first. + + Mirrors :meth:`get_attacker_artifacts` — the SMTP template emits one + `message_stored` row per accepted DATA body, with headers + sha256 + + attachment manifest already decoded into ``fields`` by the ingester. + Capped at 200 rows to match the artifact/transcript query shape. + """ + async with self._session() as session: + ip_res = await session.execute( + select(Attacker.ip).where(Attacker.uuid == uuid) + ) + ip = ip_res.scalar_one_or_none() + if not ip: + return [] + rows = await session.execute( + select(Log) + .where(Log.attacker_ip == ip) + .where(Log.event_type == "message_stored") + .order_by(desc(Log.timestamp)) + .limit(200) + ) + return [r.model_dump(mode="json") for r in rows.scalars().all()] + + async def get_session_log(self, sid: str) -> Optional[dict[str, Any]]: + """Look up the `session_recorded` Log row that owns a given sid. + + sid is a v4 UUID embedded in the row's ``fields`` JSON blob. Matched + with LIKE on the textual sid substring — cheap given the bounded + cardinality of session_recorded rows vs. the full logs table. + """ + needle = f'"sid":"{sid}"' + async with self._session() as session: + rows = await session.execute( + select(Log) + .where(Log.event_type == "session_recorded") + .where(Log.fields.contains(needle)) + .limit(1) + ) + row = rows.scalars().first() + return row.model_dump(mode="json") if row else None + + async def get_attacker_transcripts(self, uuid: str) -> list[dict[str, Any]]: + """Return `session_recorded` logs for the attacker identified by UUID. + + Mirror of :meth:`get_attacker_artifacts` — sessions ride in the same + Log table with event_type=session_recorded; the ingester decodes the + RFC 5424 SD fields (sid, service, decky, src_ip, duration_s, bytes, + truncated, shard_path) into the returned ``fields`` blob. + """ + async with self._session() as session: + ip_res = await session.execute( + select(Attacker.ip).where(Attacker.uuid == uuid) + ) + ip = ip_res.scalar_one_or_none() + if not ip: + return [] + rows = await session.execute( + select(Log) + .where(Log.attacker_ip == ip) + .where(Log.event_type == "session_recorded") + .order_by(desc(Log.timestamp)) + .limit(200) + ) + return [r.model_dump(mode="json") for r in rows.scalars().all()] diff --git a/decnet/web/db/sqlmodel_repo/attackers/behavior.py b/decnet/web/db/sqlmodel_repo/attackers/behavior.py new file mode 100644 index 00000000..c413557b --- /dev/null +++ b/decnet/web/db/sqlmodel_repo/attackers/behavior.py @@ -0,0 +1,106 @@ +"""Per-attacker behavior signals (TCP fingerprint, timing stats, phase +sequence, tool guesses, KEX order, SSH client banners).""" +from __future__ import annotations + +import json +from datetime import datetime, timezone +from typing import Any, Optional + +from sqlalchemy import select + +from decnet.web.db.models import Attacker, AttackerBehavior + + +class AttackerBehaviorMixin: + async def upsert_attacker_behavior( + self, + attacker_uuid: str, + data: dict[str, Any], + ) -> None: + async with self._session() as session: + result = await session.execute( + select(AttackerBehavior).where( + AttackerBehavior.attacker_uuid == attacker_uuid + ) + ) + existing = result.scalar_one_or_none() + payload = {**data, "updated_at": datetime.now(timezone.utc)} + if existing: + for k, v in payload.items(): + setattr(existing, k, v) + session.add(existing) + else: + session.add(AttackerBehavior(attacker_uuid=attacker_uuid, **payload)) + await session.commit() + + async def get_attacker_behavior( + self, + attacker_uuid: str, + ) -> Optional[dict[str, Any]]: + async with self._session() as session: + result = await session.execute( + select(AttackerBehavior).where( + AttackerBehavior.attacker_uuid == attacker_uuid + ) + ) + row = result.scalar_one_or_none() + if not row: + return None + return self._deserialize_behavior(row.model_dump(mode="json")) + + async def get_behaviors_for_ips( + self, + ips: set[str], + ) -> dict[str, dict[str, Any]]: + if not ips: + return {} + async with self._session() as session: + result = await session.execute( + select(Attacker.ip, AttackerBehavior) + .join(AttackerBehavior, Attacker.uuid == AttackerBehavior.attacker_uuid) + .where(Attacker.ip.in_(ips)) + ) + out: dict[str, dict[str, Any]] = {} + for ip, row in result.all(): + out[ip] = self._deserialize_behavior(row.model_dump(mode="json")) + return out + + @staticmethod + def _deserialize_behavior(d: dict[str, Any]) -> dict[str, Any]: + for key in ("tcp_fingerprint", "timing_stats", "phase_sequence"): + if isinstance(d.get(key), str): + try: + d[key] = json.loads(d[key]) + except (json.JSONDecodeError, TypeError): + pass + # Deserialize tool_guesses JSON array; normalise None → []. + raw = d.get("tool_guesses") + if isinstance(raw, str): + try: + parsed = json.loads(raw) + d["tool_guesses"] = parsed if isinstance(parsed, list) else [parsed] + except (json.JSONDecodeError, TypeError): + d["tool_guesses"] = [] + elif raw is None: + d["tool_guesses"] = [] + # Same list-or-None pattern for kex_order_raw. + raw_kex = d.get("kex_order_raw") + if isinstance(raw_kex, str): + try: + parsed_kex = json.loads(raw_kex) + d["kex_order_raw"] = parsed_kex if isinstance(parsed_kex, list) else [parsed_kex] + except (json.JSONDecodeError, TypeError): + d["kex_order_raw"] = [] + elif raw_kex is None: + d["kex_order_raw"] = [] + # Same list-or-None pattern for ssh_client_banners. + raw_banners = d.get("ssh_client_banners") + if isinstance(raw_banners, str): + try: + parsed_banners = json.loads(raw_banners) + d["ssh_client_banners"] = parsed_banners if isinstance(parsed_banners, list) else [parsed_banners] + except (json.JSONDecodeError, TypeError): + d["ssh_client_banners"] = [] + elif raw_banners is None: + d["ssh_client_banners"] = [] + return d diff --git a/decnet/web/db/sqlmodel_repo/attackers/sessions.py b/decnet/web/db/sqlmodel_repo/attackers/sessions.py new file mode 100644 index 00000000..2374bea5 --- /dev/null +++ b/decnet/web/db/sqlmodel_repo/attackers/sessions.py @@ -0,0 +1,49 @@ +"""Per-session profile rows (keystroke-dynamics features land here at +ingestion-time post-V2).""" +from __future__ import annotations + +from typing import Any, Optional + +from sqlalchemy import select + +from decnet.web.db.models import SessionProfile + + +class SessionProfilesMixin: + async def upsert_session_profile( + self, + sid: str, + data: dict[str, Any], + ) -> None: + """ + Write (or update) the session_profile row for *sid*. + + Pre-v1, the typical call is the empty-write path at session close: + `upsert_session_profile(sid, {"log_id": })` — all keystroke + feature columns stay NULL until the V2 ingestion job populates them. + """ + async with self._session() as session: + result = await session.execute( + select(SessionProfile).where(SessionProfile.sid == sid) + ) + existing = result.scalar_one_or_none() + if existing: + for k, v in data.items(): + setattr(existing, k, v) + session.add(existing) + else: + session.add(SessionProfile(sid=sid, **data)) + await session.commit() + + async def get_session_profile( + self, + sid: str, + ) -> Optional[dict[str, Any]]: + async with self._session() as session: + result = await session.execute( + select(SessionProfile).where(SessionProfile.sid == sid) + ) + row = result.scalar_one_or_none() + if not row: + return None + return row.model_dump(mode="json") diff --git a/decnet/web/db/sqlmodel_repo/attackers/smtp.py b/decnet/web/db/sqlmodel_repo/attackers/smtp.py new file mode 100644 index 00000000..a6e5a1ec --- /dev/null +++ b/decnet/web/db/sqlmodel_repo/attackers/smtp.py @@ -0,0 +1,69 @@ +"""SMTP victim-domain tracking (per-attacker counters and +cross-attacker aggregate).""" +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Any + +from sqlalchemy import desc, func, select + +from decnet.web.db.models import SmtpTarget + + +class SmtpTargetsMixin: + async def increment_smtp_target(self, attacker_uuid: str, domain: str) -> None: + """Upsert an (attacker_uuid, domain) pair and bump count + last_seen. + + Read-then-write under a single session — the UNIQUE constraint on + (attacker_uuid, domain) guards against duplicate rows if the race + ever materialises; we accept the ~1ms extra round-trip in exchange + for a single dialect-portable implementation. + """ + async with self._session() as session: + result = await session.execute( + select(SmtpTarget) + .where(SmtpTarget.attacker_uuid == attacker_uuid) + .where(SmtpTarget.domain == domain) + ) + existing = result.scalar_one_or_none() + now = datetime.now(timezone.utc) + if existing: + existing.count += 1 + existing.last_seen = now + session.add(existing) + else: + session.add(SmtpTarget( + attacker_uuid=attacker_uuid, + domain=domain, + first_seen=now, + last_seen=now, + count=1, + )) + await session.commit() + + async def list_smtp_targets(self, attacker_uuid: str) -> list[dict[str, Any]]: + async with self._session() as session: + result = await session.execute( + select(SmtpTarget) + .where(SmtpTarget.attacker_uuid == attacker_uuid) + .order_by(desc(SmtpTarget.last_seen)) + ) + return [r.model_dump(mode="json") for r in result.scalars().all()] + + async def smtp_target_seen(self, domain: str) -> dict[str, Any]: + """Aggregate rows for this domain across every attacker in the DB.""" + async with self._session() as session: + result = await session.execute( + select( + func.coalesce(func.sum(SmtpTarget.count), 0), + func.min(SmtpTarget.first_seen), + func.max(SmtpTarget.last_seen), + ).where(SmtpTarget.domain == domain) + ) + total, first_seen, last_seen = result.one() + return { + "seen": int(total) > 0, + "count": int(total), + "first_seen": first_seen, + "last_seen": last_seen, + }