From 7483d013116d4d4b7266fdf89feb8ad01f9948ff Mon Sep 17 00:00:00 2001 From: anti Date: Tue, 28 Apr 2026 15:07:39 -0400 Subject: [PATCH] refactor(db): extract IdentitiesMixin and CampaignsMixin Splits the AttackerIdentity and Campaign clustering reads/writes into sqlmodel_repo/identities.py and sqlmodel_repo/campaigns.py. Only identities.py calls _deserialize_attacker, which resolves through AttackersMixin via MRO. --- decnet/web/db/sqlmodel_repo/__init__.py | 333 +--------------------- decnet/web/db/sqlmodel_repo/campaigns.py | 173 +++++++++++ decnet/web/db/sqlmodel_repo/identities.py | 185 ++++++++++++ 3 files changed, 362 insertions(+), 329 deletions(-) create mode 100644 decnet/web/db/sqlmodel_repo/campaigns.py create mode 100644 decnet/web/db/sqlmodel_repo/identities.py diff --git a/decnet/web/db/sqlmodel_repo/__init__.py b/decnet/web/db/sqlmodel_repo/__init__.py index e202ae98..b6e7b92f 100644 --- a/decnet/web/db/sqlmodel_repo/__init__.py +++ b/decnet/web/db/sqlmodel_repo/__init__.py @@ -29,9 +29,6 @@ from decnet.web.db.repository import BaseRepository from decnet.web.db.models import ( User, State, - Attacker, - AttackerIdentity, - Campaign, Topology, LAN, TopologyDecky, @@ -52,10 +49,12 @@ from decnet.web.db.sqlmodel_repo.attacker_intel import AttackerIntelMixin from decnet.web.db.sqlmodel_repo.attackers import AttackersMixin from decnet.web.db.sqlmodel_repo.auth import AuthMixin from decnet.web.db.sqlmodel_repo.bounties import BountiesMixin +from decnet.web.db.sqlmodel_repo.campaigns import CampaignsMixin from decnet.web.db.sqlmodel_repo.canary import CanaryMixin from decnet.web.db.sqlmodel_repo.credentials import CredentialsMixin from decnet.web.db.sqlmodel_repo.deckies import DeckiesMixin from decnet.web.db.sqlmodel_repo.fleet import FleetMixin +from decnet.web.db.sqlmodel_repo.identities import IdentitiesMixin from decnet.web.db.sqlmodel_repo.logs import LogsMixin from decnet.web.db.sqlmodel_repo.orchestrator import OrchestratorMixin from 
decnet.web.db.sqlmodel_repo.realism import RealismMixin @@ -68,10 +67,12 @@ class SQLModelRepository( AttackersMixin, AuthMixin, BountiesMixin, + CampaignsMixin, CanaryMixin, CredentialsMixin, DeckiesMixin, FleetMixin, + IdentitiesMixin, LogsMixin, OrchestratorMixin, RealismMixin, @@ -173,332 +174,6 @@ class SQLModelRepository( await session.commit() - # ----------------------------------------------------------- attackers - - # ─── Identity resolution reads ──────────────────────────────────────── - - async def get_identity_by_uuid(self, uuid: str) -> Optional[dict[str, Any]]: - # Follow merged_into_uuid up to the winner. Loop bounded by - # _MAX_MERGE_HOPS so a (hypothetically) corrupted ring can't - # spin the worker. Clusterer is responsible for never producing - # a cycle; this guard is belt-and-braces. - _MAX_MERGE_HOPS = 8 - async with self._session() as session: - current_uuid = uuid - for _ in range(_MAX_MERGE_HOPS): - result = await session.execute( - select(AttackerIdentity).where(AttackerIdentity.uuid == current_uuid) - ) - identity = result.scalar_one_or_none() - if identity is None: - return None - if identity.merged_into_uuid is None: - return identity.model_dump(mode="json") - current_uuid = identity.merged_into_uuid - # Hit the hop cap — surface what we have rather than recurse. - return identity.model_dump(mode="json") - - async def list_identities( - self, limit: int = 50, offset: int = 0, - ) -> list[dict[str, Any]]: - # Exclude merged-out rows so the list view is the de-duped truth. - # The history is still queryable per-uuid via get_identity_by_uuid - # and a future "merged into" endpoint when we need it. 
- statement = ( - select(AttackerIdentity) - .where(AttackerIdentity.merged_into_uuid.is_(None)) - .order_by(desc(AttackerIdentity.updated_at)) - .offset(offset) - .limit(limit) - ) - async with self._session() as session: - result = await session.execute(statement) - return [i.model_dump(mode="json") for i in result.scalars().all()] - - async def count_identities(self) -> int: - statement = ( - select(func.count()) - .select_from(AttackerIdentity) - .where(AttackerIdentity.merged_into_uuid.is_(None)) - ) - async with self._session() as session: - result = await session.execute(statement) - return result.scalar() or 0 - - async def list_observations_for_identity( - self, identity_uuid: str, limit: int = 50, offset: int = 0, - ) -> list[dict[str, Any]]: - statement = ( - select(Attacker) - .where(Attacker.identity_id == identity_uuid) - .order_by(desc(Attacker.last_seen)) - .offset(offset) - .limit(limit) - ) - async with self._session() as session: - result = await session.execute(statement) - return [ - self._deserialize_attacker(a.model_dump(mode="json")) - for a in result.scalars().all() - ] - - async def count_observations_for_identity(self, identity_uuid: str) -> int: - statement = ( - select(func.count()) - .select_from(Attacker) - .where(Attacker.identity_id == identity_uuid) - ) - async with self._session() as session: - result = await session.execute(statement) - return result.scalar() or 0 - - # ─── Identity resolution writes (clusterer worker) ───────────────────── - - async def list_attackers_for_clustering( - self, limit: Optional[int] = None, - ) -> list[dict[str, Any]]: - # Project the columns the clusterer's similarity graph reads. - # Keep it narrow so future denormalised projections (payloads - # joined from logs, c2 endpoints aggregated from sessions) can - # land here without churning every caller. ``fingerprints`` is - # the raw JSON list — the clusterer parses for JA3 / HASSH. 
- statement = select( - Attacker.uuid, Attacker.asn, Attacker.identity_id, Attacker.fingerprints, - ).order_by(Attacker.first_seen) - if limit is not None: - statement = statement.limit(limit) - async with self._session() as session: - result = await session.execute(statement) - return [ - { - "uuid": row.uuid, - "asn": row.asn, - "identity_id": row.identity_id, - "fingerprints": row.fingerprints, - } - for row in result.all() - ] - - async def create_attacker_identity(self, row: dict[str, Any]) -> str: - identity = AttackerIdentity(**row) - async with self._session() as session: - session.add(identity) - await session.commit() - return identity.uuid - - async def set_attacker_identity_id( - self, attacker_uuid: str, identity_uuid: str, - ) -> None: - statement = ( - update(Attacker) - .where(Attacker.uuid == attacker_uuid) - .values(identity_id=identity_uuid) - ) - async with self._session() as session: - await session.execute(statement) - await session.commit() - - async def list_all_identities(self) -> list[dict[str, Any]]: - statement = select(AttackerIdentity).order_by(AttackerIdentity.created_at) - async with self._session() as session: - result = await session.execute(statement) - return [i.model_dump(mode="json") for i in result.scalars().all()] - - async def update_identity_merged_into( - self, identity_uuid: str, winner_uuid: Optional[str], - ) -> None: - statement = ( - update(AttackerIdentity) - .where(AttackerIdentity.uuid == identity_uuid) - .values( - merged_into_uuid=winner_uuid, - updated_at=datetime.now(timezone.utc), - ) - ) - async with self._session() as session: - await session.execute(statement) - await session.commit() - - async def update_identity_fingerprints( - self, - identity_uuid: str, - *, - ja3_hashes: Optional[str] = None, - hassh_hashes: Optional[str] = None, - tls_cert_sha256: Optional[str] = None, - ) -> None: - statement = ( - update(AttackerIdentity) - .where(AttackerIdentity.uuid == identity_uuid) - .values( - 
ja3_hashes=ja3_hashes, - hassh_hashes=hassh_hashes, - tls_cert_sha256=tls_cert_sha256, - updated_at=datetime.now(timezone.utc), - ) - ) - async with self._session() as session: - await session.execute(statement) - await session.commit() - - # ─── Campaign clustering reads ──────────────────────────────────────── - - async def get_campaign_by_uuid(self, uuid: str) -> Optional[dict[str, Any]]: - # Same chain-walk as get_identity_by_uuid; bounded against - # corrupted rings. - _MAX_MERGE_HOPS = 8 - async with self._session() as session: - current_uuid = uuid - for _ in range(_MAX_MERGE_HOPS): - result = await session.execute( - select(Campaign).where(Campaign.uuid == current_uuid) - ) - campaign = result.scalar_one_or_none() - if campaign is None: - return None - if campaign.merged_into_uuid is None: - return campaign.model_dump(mode="json") - current_uuid = campaign.merged_into_uuid - return campaign.model_dump(mode="json") - - async def list_campaigns( - self, limit: int = 50, offset: int = 0, - ) -> list[dict[str, Any]]: - statement = ( - select(Campaign) - .where(Campaign.merged_into_uuid.is_(None)) - .order_by(desc(Campaign.updated_at)) - .offset(offset) - .limit(limit) - ) - async with self._session() as session: - result = await session.execute(statement) - return [c.model_dump(mode="json") for c in result.scalars().all()] - - async def count_campaigns(self) -> int: - statement = ( - select(func.count()) - .select_from(Campaign) - .where(Campaign.merged_into_uuid.is_(None)) - ) - async with self._session() as session: - result = await session.execute(statement) - return result.scalar() or 0 - - async def list_identities_for_campaign( - self, campaign_uuid: str, limit: int = 50, offset: int = 0, - ) -> list[dict[str, Any]]: - statement = ( - select(AttackerIdentity) - .where(AttackerIdentity.campaign_id == campaign_uuid) - .order_by(desc(AttackerIdentity.updated_at)) - .offset(offset) - .limit(limit) - ) - async with self._session() as session: - result = await 
session.execute(statement) - return [i.model_dump(mode="json") for i in result.scalars().all()] - - async def count_identities_for_campaign(self, campaign_uuid: str) -> int: - statement = ( - select(func.count()) - .select_from(AttackerIdentity) - .where(AttackerIdentity.campaign_id == campaign_uuid) - ) - async with self._session() as session: - result = await session.execute(statement) - return result.scalar() or 0 - - # ─── Campaign clustering writes (campaign-clusterer worker) ─────────── - - async def list_identities_for_clustering( - self, limit: Optional[int] = None, - ) -> list[dict[str, Any]]: - # Project the columns the campaign clusterer's similarity - # graph reads. Narrow on purpose — future denormalised - # projections (commands_by_phase from log mining, decky-set - # aggregates) can land here without churning callers. - statement = select( - AttackerIdentity.uuid, - AttackerIdentity.campaign_id, - AttackerIdentity.merged_into_uuid, - AttackerIdentity.first_seen_at, - AttackerIdentity.last_seen_at, - AttackerIdentity.ja3_hashes, - AttackerIdentity.hassh_hashes, - AttackerIdentity.payload_simhashes, - AttackerIdentity.c2_endpoints, - ).order_by(AttackerIdentity.created_at) - if limit is not None: - statement = statement.limit(limit) - async with self._session() as session: - result = await session.execute(statement) - return [ - { - "uuid": row.uuid, - "campaign_id": row.campaign_id, - "merged_into_uuid": row.merged_into_uuid, - "first_seen_at": ( - row.first_seen_at.isoformat() - if row.first_seen_at is not None - else None - ), - "last_seen_at": ( - row.last_seen_at.isoformat() - if row.last_seen_at is not None - else None - ), - "ja3_hashes": row.ja3_hashes, - "hassh_hashes": row.hassh_hashes, - "payload_simhashes": row.payload_simhashes, - "c2_endpoints": row.c2_endpoints, - } - for row in result.all() - ] - - async def create_campaign(self, row: dict[str, Any]) -> str: - campaign = Campaign(**row) - async with self._session() as session: - 
session.add(campaign) - await session.commit() - return campaign.uuid - - async def set_identity_campaign_id( - self, identity_uuid: str, campaign_uuid: Optional[str], - ) -> None: - statement = ( - update(AttackerIdentity) - .where(AttackerIdentity.uuid == identity_uuid) - .values( - campaign_id=campaign_uuid, - updated_at=datetime.now(timezone.utc), - ) - ) - async with self._session() as session: - await session.execute(statement) - await session.commit() - - async def list_all_campaigns(self) -> list[dict[str, Any]]: - statement = select(Campaign).order_by(Campaign.created_at) - async with self._session() as session: - result = await session.execute(statement) - return [c.model_dump(mode="json") for c in result.scalars().all()] - - async def update_campaign_merged_into( - self, campaign_uuid: str, winner_uuid: Optional[str], - ) -> None: - statement = ( - update(Campaign) - .where(Campaign.uuid == campaign_uuid) - .values( - merged_into_uuid=winner_uuid, - updated_at=datetime.now(timezone.utc), - ) - ) - async with self._session() as session: - await session.execute(statement) - await session.commit() - # ------------------------------------------------------------ mazenet async def create_topology(self, data: dict[str, Any]) -> str: diff --git a/decnet/web/db/sqlmodel_repo/campaigns.py b/decnet/web/db/sqlmodel_repo/campaigns.py new file mode 100644 index 00000000..9c9b10ff --- /dev/null +++ b/decnet/web/db/sqlmodel_repo/campaigns.py @@ -0,0 +1,173 @@ +"""Campaign reads + writes. + +Campaign = the second-tier clustering output that groups multiple +``AttackerIdentity`` rows into a coordinated activity cluster. The +campaign-clusterer worker drives the writes; the dashboard drives +the reads. 
+""" +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Any, Optional + +from sqlalchemy import desc, func, select, update + +from decnet.web.db.models import AttackerIdentity, Campaign + + +class CampaignsMixin: + """Mixin: composed onto ``SQLModelRepository``.""" + + async def get_campaign_by_uuid(self, uuid: str) -> Optional[dict[str, Any]]: + # Same chain-walk as get_identity_by_uuid; bounded against + # corrupted rings. + _MAX_MERGE_HOPS = 8 + async with self._session() as session: + current_uuid = uuid + for _ in range(_MAX_MERGE_HOPS): + result = await session.execute( + select(Campaign).where(Campaign.uuid == current_uuid) + ) + campaign = result.scalar_one_or_none() + if campaign is None: + return None + if campaign.merged_into_uuid is None: + return campaign.model_dump(mode="json") + current_uuid = campaign.merged_into_uuid + return campaign.model_dump(mode="json") + + async def list_campaigns( + self, limit: int = 50, offset: int = 0, + ) -> list[dict[str, Any]]: + statement = ( + select(Campaign) + .where(Campaign.merged_into_uuid.is_(None)) + .order_by(desc(Campaign.updated_at)) + .offset(offset) + .limit(limit) + ) + async with self._session() as session: + result = await session.execute(statement) + return [c.model_dump(mode="json") for c in result.scalars().all()] + + async def count_campaigns(self) -> int: + statement = ( + select(func.count()) + .select_from(Campaign) + .where(Campaign.merged_into_uuid.is_(None)) + ) + async with self._session() as session: + result = await session.execute(statement) + return result.scalar() or 0 + + async def list_identities_for_campaign( + self, campaign_uuid: str, limit: int = 50, offset: int = 0, + ) -> list[dict[str, Any]]: + statement = ( + select(AttackerIdentity) + .where(AttackerIdentity.campaign_id == campaign_uuid) + .order_by(desc(AttackerIdentity.updated_at)) + .offset(offset) + .limit(limit) + ) + async with self._session() as session: + result = await 
session.execute(statement) + return [i.model_dump(mode="json") for i in result.scalars().all()] + + async def count_identities_for_campaign(self, campaign_uuid: str) -> int: + statement = ( + select(func.count()) + .select_from(AttackerIdentity) + .where(AttackerIdentity.campaign_id == campaign_uuid) + ) + async with self._session() as session: + result = await session.execute(statement) + return result.scalar() or 0 + + async def list_identities_for_clustering( + self, limit: Optional[int] = None, + ) -> list[dict[str, Any]]: + # Project the columns the campaign clusterer's similarity + # graph reads. Narrow on purpose — future denormalised + # projections (commands_by_phase from log mining, decky-set + # aggregates) can land here without churning callers. + statement = select( + AttackerIdentity.uuid, + AttackerIdentity.campaign_id, + AttackerIdentity.merged_into_uuid, + AttackerIdentity.first_seen_at, + AttackerIdentity.last_seen_at, + AttackerIdentity.ja3_hashes, + AttackerIdentity.hassh_hashes, + AttackerIdentity.payload_simhashes, + AttackerIdentity.c2_endpoints, + ).order_by(AttackerIdentity.created_at) + if limit is not None: + statement = statement.limit(limit) + async with self._session() as session: + result = await session.execute(statement) + return [ + { + "uuid": row.uuid, + "campaign_id": row.campaign_id, + "merged_into_uuid": row.merged_into_uuid, + "first_seen_at": ( + row.first_seen_at.isoformat() + if row.first_seen_at is not None + else None + ), + "last_seen_at": ( + row.last_seen_at.isoformat() + if row.last_seen_at is not None + else None + ), + "ja3_hashes": row.ja3_hashes, + "hassh_hashes": row.hassh_hashes, + "payload_simhashes": row.payload_simhashes, + "c2_endpoints": row.c2_endpoints, + } + for row in result.all() + ] + + async def create_campaign(self, row: dict[str, Any]) -> str: + campaign = Campaign(**row) + async with self._session() as session: + session.add(campaign) + await session.commit() + return campaign.uuid + + async def 
set_identity_campaign_id( + self, identity_uuid: str, campaign_uuid: Optional[str], + ) -> None: + statement = ( + update(AttackerIdentity) + .where(AttackerIdentity.uuid == identity_uuid) + .values( + campaign_id=campaign_uuid, + updated_at=datetime.now(timezone.utc), + ) + ) + async with self._session() as session: + await session.execute(statement) + await session.commit() + + async def list_all_campaigns(self) -> list[dict[str, Any]]: + statement = select(Campaign).order_by(Campaign.created_at) + async with self._session() as session: + result = await session.execute(statement) + return [c.model_dump(mode="json") for c in result.scalars().all()] + + async def update_campaign_merged_into( + self, campaign_uuid: str, winner_uuid: Optional[str], + ) -> None: + statement = ( + update(Campaign) + .where(Campaign.uuid == campaign_uuid) + .values( + merged_into_uuid=winner_uuid, + updated_at=datetime.now(timezone.utc), + ) + ) + async with self._session() as session: + await session.execute(statement) + await session.commit() diff --git a/decnet/web/db/sqlmodel_repo/identities.py b/decnet/web/db/sqlmodel_repo/identities.py new file mode 100644 index 00000000..afb0b45d --- /dev/null +++ b/decnet/web/db/sqlmodel_repo/identities.py @@ -0,0 +1,185 @@ +"""AttackerIdentity reads + writes. + +Identity = the clustering output that groups multiple ``Attacker`` rows +(usually different IPs from the same actor) into one logical actor. +The identity-clusterer worker drives the writes; the dashboard drives +the reads. +""" +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Any, Optional + +from sqlalchemy import desc, func, select, update + +from decnet.web.db.models import Attacker, AttackerIdentity + + +class IdentitiesMixin: + """Mixin: composed onto ``SQLModelRepository``. + + ``self._deserialize_attacker`` resolves through ``AttackersMixin`` + via MRO. 
+ """ + + async def get_identity_by_uuid(self, uuid: str) -> Optional[dict[str, Any]]: + # Follow merged_into_uuid up to the winner. Loop bounded by + # _MAX_MERGE_HOPS so a (hypothetically) corrupted ring can't + # spin the worker. Clusterer is responsible for never producing + # a cycle; this guard is belt-and-braces. + _MAX_MERGE_HOPS = 8 + async with self._session() as session: + current_uuid = uuid + for _ in range(_MAX_MERGE_HOPS): + result = await session.execute( + select(AttackerIdentity).where(AttackerIdentity.uuid == current_uuid) + ) + identity = result.scalar_one_or_none() + if identity is None: + return None + if identity.merged_into_uuid is None: + return identity.model_dump(mode="json") + current_uuid = identity.merged_into_uuid + # Hit the hop cap — surface what we have rather than recurse. + return identity.model_dump(mode="json") + + async def list_identities( + self, limit: int = 50, offset: int = 0, + ) -> list[dict[str, Any]]: + # Exclude merged-out rows so the list view is the de-duped truth. + # The history is still queryable per-uuid via get_identity_by_uuid + # and a future "merged into" endpoint when we need it. 
+ statement = ( + select(AttackerIdentity) + .where(AttackerIdentity.merged_into_uuid.is_(None)) + .order_by(desc(AttackerIdentity.updated_at)) + .offset(offset) + .limit(limit) + ) + async with self._session() as session: + result = await session.execute(statement) + return [i.model_dump(mode="json") for i in result.scalars().all()] + + async def count_identities(self) -> int: + statement = ( + select(func.count()) + .select_from(AttackerIdentity) + .where(AttackerIdentity.merged_into_uuid.is_(None)) + ) + async with self._session() as session: + result = await session.execute(statement) + return result.scalar() or 0 + + async def list_observations_for_identity( + self, identity_uuid: str, limit: int = 50, offset: int = 0, + ) -> list[dict[str, Any]]: + statement = ( + select(Attacker) + .where(Attacker.identity_id == identity_uuid) + .order_by(desc(Attacker.last_seen)) + .offset(offset) + .limit(limit) + ) + async with self._session() as session: + result = await session.execute(statement) + return [ + self._deserialize_attacker(a.model_dump(mode="json")) + for a in result.scalars().all() + ] + + async def count_observations_for_identity(self, identity_uuid: str) -> int: + statement = ( + select(func.count()) + .select_from(Attacker) + .where(Attacker.identity_id == identity_uuid) + ) + async with self._session() as session: + result = await session.execute(statement) + return result.scalar() or 0 + + async def list_attackers_for_clustering( + self, limit: Optional[int] = None, + ) -> list[dict[str, Any]]: + # Project the columns the clusterer's similarity graph reads. + # Keep it narrow so future denormalised projections (payloads + # joined from logs, c2 endpoints aggregated from sessions) can + # land here without churning every caller. ``fingerprints`` is + # the raw JSON list — the clusterer parses for JA3 / HASSH. 
+ statement = select( + Attacker.uuid, Attacker.asn, Attacker.identity_id, Attacker.fingerprints, + ).order_by(Attacker.first_seen) + if limit is not None: + statement = statement.limit(limit) + async with self._session() as session: + result = await session.execute(statement) + return [ + { + "uuid": row.uuid, + "asn": row.asn, + "identity_id": row.identity_id, + "fingerprints": row.fingerprints, + } + for row in result.all() + ] + + async def create_attacker_identity(self, row: dict[str, Any]) -> str: + identity = AttackerIdentity(**row) + async with self._session() as session: + session.add(identity) + await session.commit() + return identity.uuid + + async def set_attacker_identity_id( + self, attacker_uuid: str, identity_uuid: str, + ) -> None: + statement = ( + update(Attacker) + .where(Attacker.uuid == attacker_uuid) + .values(identity_id=identity_uuid) + ) + async with self._session() as session: + await session.execute(statement) + await session.commit() + + async def list_all_identities(self) -> list[dict[str, Any]]: + statement = select(AttackerIdentity).order_by(AttackerIdentity.created_at) + async with self._session() as session: + result = await session.execute(statement) + return [i.model_dump(mode="json") for i in result.scalars().all()] + + async def update_identity_merged_into( + self, identity_uuid: str, winner_uuid: Optional[str], + ) -> None: + statement = ( + update(AttackerIdentity) + .where(AttackerIdentity.uuid == identity_uuid) + .values( + merged_into_uuid=winner_uuid, + updated_at=datetime.now(timezone.utc), + ) + ) + async with self._session() as session: + await session.execute(statement) + await session.commit() + + async def update_identity_fingerprints( + self, + identity_uuid: str, + *, + ja3_hashes: Optional[str] = None, + hassh_hashes: Optional[str] = None, + tls_cert_sha256: Optional[str] = None, + ) -> None: + statement = ( + update(AttackerIdentity) + .where(AttackerIdentity.uuid == identity_uuid) + .values( + 
ja3_hashes=ja3_hashes, + hassh_hashes=hassh_hashes, + tls_cert_sha256=tls_cert_sha256, + updated_at=datetime.now(timezone.utc), + ) + ) + async with self._session() as session: + await session.execute(statement) + await session.commit()