diff --git a/decnet/web/db/models/__init__.py b/decnet/web/db/models/__init__.py
index d98d93e5..596ce8d9 100644
--- a/decnet/web/db/models/__init__.py
+++ b/decnet/web/db/models/__init__.py
@@ -39,6 +39,10 @@ from .attackers import (
 from .attacker_intel import (
     AttackerIntel,
 )
+from .campaigns import (
+    Campaign,
+    CampaignsResponse,
+)
 from .deploy import (
     DeployIniRequest,
     DeployResponse,
diff --git a/decnet/web/db/models/attackers.py b/decnet/web/db/models/attackers.py
index dc66efac..38f6124a 100644
--- a/decnet/web/db/models/attackers.py
+++ b/decnet/web/db/models/attackers.py
@@ -122,9 +122,12 @@ class AttackerIdentity(SQLModel, table=True):
     __tablename__ = "attacker_identities"
     uuid: str = Field(primary_key=True)
     schema_version: int = Field(default=1)
-    # Set by the campaign clusterer, downstream effort. The campaigns
-    # table doesn't exist yet — no FK constraint, just a soft pointer.
-    campaign_id: Optional[str] = Field(default=None, index=True)
+    # Set by the campaign clusterer. The ``campaigns`` table now
+    # exists; this is a real FK. Nullable until the campaign clusterer
+    # has run on this identity row.
+    campaign_id: Optional[str] = Field(
+        default=None, foreign_key="campaigns.uuid", index=True
+    )
     first_seen_at: Optional[datetime] = Field(default=None, index=True)
     last_seen_at: Optional[datetime] = Field(default=None, index=True)
     created_at: datetime = Field(
diff --git a/decnet/web/db/models/campaigns.py b/decnet/web/db/models/campaigns.py
new file mode 100644
index 00000000..478588e1
--- /dev/null
+++ b/decnet/web/db/models/campaigns.py
@@ -0,0 +1,80 @@
+"""Campaign — operation-level grouping of resolved attacker identities."""
+from datetime import datetime, timezone
+from typing import Any, List, Optional
+
+from pydantic import BaseModel
+from sqlalchemy import Column, Text
+from sqlmodel import Field, SQLModel
+
+
+class Campaign(SQLModel, table=True):
+    """
+    Campaign — one operation, one or more identities.
+
+    Sits one level above ``AttackerIdentity``: an actor (identity) may
+    appear in multiple campaigns over time, and a campaign may have
+    several distinct identities cooperating (e.g. a night-shift and
+    day-shift operator on the same job — fixture F5 multi_operator).
+
+    Populated by the campaign clusterer worker (downstream of identity
+    resolution). Empty rows are valid; the table ships empty until the
+    clusterer lands. ``schema_version`` is non-negotiable from day one
+    for the same federation-gossip reason ``AttackerIdentity`` carries
+    one — bumping campaign-level feature definitions without a version
+    field silently poisons cross-operator gossip in V2.
+
+    See ``development/CAMPAIGN_CLUSTERING.md`` for the signal taxonomy
+    (phase-handoff, shared-infra, temporal overlap, cohort).
+    """
+    __tablename__ = "campaigns"
+    uuid: str = Field(primary_key=True)
+    schema_version: int = Field(default=1)
+    first_seen_at: Optional[datetime] = Field(default=None, index=True)
+    last_seen_at: Optional[datetime] = Field(default=None, index=True)
+    created_at: datetime = Field(
+        default_factory=lambda: datetime.now(timezone.utc), index=True
+    )
+    updated_at: datetime = Field(
+        default_factory=lambda: datetime.now(timezone.utc), index=True
+    )
+    # Campaign-cohesion score from the clusterer. Range [0, 1]; null
+    # until the clusterer writes. Higher = more confident the linked
+    # identities are part of the same operation.
+    confidence: Optional[float] = Field(default=None)
+    # Denormalized count of FK'd ``AttackerIdentity`` rows.
+    identity_count: int = Field(default=0)
+    # Aggregated fingerprint summary across member identities. Same
+    # JSON-serialized list[str] in TEXT shape as
+    # ``AttackerIdentity.{ja3,hassh,payload_simhashes,c2_endpoints}`` —
+    # federation gossip wants the same wire shape at every layer.
+    ja3_hashes: Optional[str] = Field(
+        default=None, sa_column=Column("ja3_hashes", Text, nullable=True)
+    )
+    hassh_hashes: Optional[str] = Field(
+        default=None, sa_column=Column("hassh_hashes", Text, nullable=True)
+    )
+    payload_simhashes: Optional[str] = Field(
+        default=None, sa_column=Column("payload_simhashes", Text, nullable=True)
+    )
+    c2_endpoints: Optional[str] = Field(
+        default=None, sa_column=Column("c2_endpoints", Text, nullable=True)
+    )
+    # Soft-merge audit trail — same revocable-merge pattern as
+    # ``AttackerIdentity.merged_into_uuid``. When the clusterer
+    # collapses two campaigns, the loser's row stays in place with this
+    # set to the winner's UUID; resolvers follow the chain.
+    merged_into_uuid: Optional[str] = Field(
+        default=None, foreign_key="campaigns.uuid", index=True
+    )
+    # Operator-editable free-form notes — annotation surface for
+    # human analysts ("APT-XX Q2 campaign", "matches CTI report 5678").
+    notes: Optional[str] = Field(
+        default=None, sa_column=Column("notes", Text, nullable=True)
+    )
+
+
+class CampaignsResponse(BaseModel):
+    total: int
+    limit: int
+    offset: int
+    data: List[dict[str, Any]]
diff --git a/decnet/web/db/repository.py b/decnet/web/db/repository.py
index 79759c5d..a316f7f9 100644
--- a/decnet/web/db/repository.py
+++ b/decnet/web/db/repository.py
@@ -474,6 +474,113 @@ class BaseRepository(ABC):
         """
         pass
 
+    # ─── Campaign clustering reads ────────────────────────────────────────
+    # Layer above identity resolution: campaigns group identities into
+    # operations. Populated by ``decnet campaign-clusterer``. The
+    # read-only API below ships in the same wave; until the clusterer
+    # runs, every method returns empty/None against an empty table.
+    # See development/CAMPAIGN_CLUSTERING.md.
+
+    @abstractmethod
+    async def get_campaign_by_uuid(self, uuid: str) -> Optional[dict[str, Any]]:
+        """
+        Return one ``Campaign`` row by UUID, or ``None`` if absent.
+
+        If the row has ``merged_into_uuid`` set (i.e. the clusterer
+        soft-merged it into another campaign), implementations MUST
+        follow the chain and return the winner — same contract as
+        :meth:`get_identity_by_uuid`.
+        """
+        pass
+
+    @abstractmethod
+    async def list_campaigns(
+        self, limit: int = 50, offset: int = 0,
+    ) -> list[dict[str, Any]]:
+        """Paginated list of campaign rows, newest-updated first.
+
+        Excludes merged-out rows so the list view is the de-duped truth
+        (mirrors :meth:`list_identities`).
+        """
+        pass
+
+    @abstractmethod
+    async def count_campaigns(self) -> int:
+        """Total campaign rows. Excludes merged-out rows."""
+        pass
+
+    @abstractmethod
+    async def list_identities_for_campaign(
+        self, campaign_uuid: str, limit: int = 50, offset: int = 0,
+    ) -> list[dict[str, Any]]:
+        """``AttackerIdentity`` rows linked to the given campaign, newest first."""
+        pass
+
+    @abstractmethod
+    async def count_identities_for_campaign(self, campaign_uuid: str) -> int:
+        """Total ``AttackerIdentity`` rows FK'd to this campaign."""
+        pass
+
+    # ─── Campaign clustering writes (campaign-clusterer worker) ───────────
+
+    @abstractmethod
+    async def list_identities_for_clustering(
+        self, limit: Optional[int] = None,
+    ) -> list[dict[str, Any]]:
+        """Project every ``AttackerIdentity`` into the campaign
+        clusterer's input shape.
+
+        Returns dicts with at least ``uuid``, ``campaign_id``,
+        aggregated fingerprint summaries (``ja3_hashes``,
+        ``hassh_hashes``, ``payload_simhashes``, ``c2_endpoints``),
+        ``first_seen_at`` / ``last_seen_at``, ``merged_into_uuid``.
+        Empty list when no identities exist. ``limit`` bounds a
+        single tick's working set; leave ``None`` to fetch all.
+        """
+        pass
+
+    @abstractmethod
+    async def create_campaign(self, row: dict[str, Any]) -> str:
+        """Insert a new ``Campaign`` row and return its uuid.
+
+        ``row`` must include ``uuid``; other fields are optional and
+        default per the model. Caller generates the uuid so it can be
+        used in the same tick to back-link identities.
+        """
+        pass
+
+    @abstractmethod
+    async def set_identity_campaign_id(
+        self, identity_uuid: str, campaign_uuid: Optional[str],
+    ) -> None:
+        """Set or clear ``attacker_identities.campaign_id``.
+
+        Idempotent. Pass ``None`` to unlink (e.g. when revoking a
+        prior campaign assignment).
+        """
+        pass
+
+    @abstractmethod
+    async def list_all_campaigns(self) -> list[dict[str, Any]]:
+        """Every ``Campaign`` row, including merged-out ones.
+
+        Distinct from :meth:`list_campaigns`: the clusterer's
+        revocable-merge pass needs to re-evaluate merged-out
+        campaigns, so it pulls the unfiltered set.
+        """
+        pass
+
+    @abstractmethod
+    async def update_campaign_merged_into(
+        self, campaign_uuid: str, winner_uuid: Optional[str],
+    ) -> None:
+        """Set or clear ``campaigns.merged_into_uuid``.
+
+        Pass ``winner_uuid`` to soft-merge the row into another
+        campaign; pass ``None`` to revoke a prior merge.
+        """
+        pass
+
     @abstractmethod
     async def get_attacker_commands(
         self,
diff --git a/decnet/web/db/sqlmodel_repo.py b/decnet/web/db/sqlmodel_repo.py
index 0bc394be..817b51f1 100644
--- a/decnet/web/db/sqlmodel_repo.py
+++ b/decnet/web/db/sqlmodel_repo.py
@@ -38,6 +38,7 @@ from decnet.web.db.models import (
     AttackerBehavior,
     AttackerIdentity,
     AttackerIntel,
+    Campaign,
     SessionProfile,
     SmtpTarget,
     SwarmHost,
@@ -1535,6 +1536,178 @@ class SQLModelRepository(BaseRepository):
             await session.execute(statement)
             await session.commit()
 
+    # ─── Campaign clustering reads ────────────────────────────────────────
+
+    async def get_campaign_by_uuid(self, uuid: str) -> Optional[dict[str, Any]]:
+        # Same chain-walk as get_identity_by_uuid; bounded against
+        # corrupted rings.
+        _MAX_MERGE_HOPS = 8
+        async with self._session() as session:
+            current_uuid = uuid
+            for _ in range(_MAX_MERGE_HOPS):
+                result = await session.execute(
+                    select(Campaign).where(Campaign.uuid == current_uuid)
+                )
+                campaign = result.scalar_one_or_none()
+                if campaign is None:
+                    return None
+                if campaign.merged_into_uuid is None:
+                    return campaign.model_dump(mode="json")
+                current_uuid = campaign.merged_into_uuid
+            # Hop budget exhausted (over-long chain or a ring): return the
+            # last row visited rather than looping forever.
+            return campaign.model_dump(mode="json")
+
+    async def list_campaigns(
+        self, limit: int = 50, offset: int = 0,
+    ) -> list[dict[str, Any]]:
+        # ``uuid`` is a tiebreaker: ``updated_at`` is not unique, and
+        # OFFSET/LIMIT paging over a non-total order can skip or repeat
+        # rows between pages.
+        statement = (
+            select(Campaign)
+            .where(Campaign.merged_into_uuid.is_(None))
+            .order_by(desc(Campaign.updated_at), Campaign.uuid)
+            .offset(offset)
+            .limit(limit)
+        )
+        async with self._session() as session:
+            result = await session.execute(statement)
+            return [c.model_dump(mode="json") for c in result.scalars().all()]
+
+    async def count_campaigns(self) -> int:
+        statement = (
+            select(func.count())
+            .select_from(Campaign)
+            .where(Campaign.merged_into_uuid.is_(None))
+        )
+        async with self._session() as session:
+            result = await session.execute(statement)
+            return result.scalar() or 0
+
+    async def list_identities_for_campaign(
+        self, campaign_uuid: str, limit: int = 50, offset: int = 0,
+    ) -> list[dict[str, Any]]:
+        # Secondary sort on ``uuid`` keeps OFFSET/LIMIT pages stable when
+        # several identities share an ``updated_at``.
+        statement = (
+            select(AttackerIdentity)
+            .where(AttackerIdentity.campaign_id == campaign_uuid)
+            .order_by(desc(AttackerIdentity.updated_at), AttackerIdentity.uuid)
+            .offset(offset)
+            .limit(limit)
+        )
+        async with self._session() as session:
+            result = await session.execute(statement)
+            return [i.model_dump(mode="json") for i in result.scalars().all()]
+
+    async def count_identities_for_campaign(self, campaign_uuid: str) -> int:
+        statement = (
+            select(func.count())
+            .select_from(AttackerIdentity)
+            .where(AttackerIdentity.campaign_id == campaign_uuid)
+        )
+        async with self._session() as session:
+            result = await session.execute(statement)
+            return result.scalar() or 0
+
+    # ─── Campaign clustering writes (campaign-clusterer worker) ───────────
+
+    async def list_identities_for_clustering(
+        self, limit: Optional[int] = None,
+    ) -> list[dict[str, Any]]:
+        # Project the columns the campaign clusterer's similarity
+        # graph reads. Narrow on purpose — future denormalised
+        # projections (commands_by_phase from log mining, decky-set
+        # aggregates) can land here without churning callers. ``uuid``
+        # breaks ``created_at`` ties so a ``limit``-bounded tick reads a
+        # deterministic prefix.
+        statement = select(
+            AttackerIdentity.uuid,
+            AttackerIdentity.campaign_id,
+            AttackerIdentity.merged_into_uuid,
+            AttackerIdentity.first_seen_at,
+            AttackerIdentity.last_seen_at,
+            AttackerIdentity.ja3_hashes,
+            AttackerIdentity.hassh_hashes,
+            AttackerIdentity.payload_simhashes,
+            AttackerIdentity.c2_endpoints,
+        ).order_by(AttackerIdentity.created_at, AttackerIdentity.uuid)
+        if limit is not None:
+            statement = statement.limit(limit)
+        async with self._session() as session:
+            result = await session.execute(statement)
+            return [
+                {
+                    "uuid": row.uuid,
+                    "campaign_id": row.campaign_id,
+                    "merged_into_uuid": row.merged_into_uuid,
+                    "first_seen_at": (
+                        row.first_seen_at.isoformat()
+                        if row.first_seen_at is not None
+                        else None
+                    ),
+                    "last_seen_at": (
+                        row.last_seen_at.isoformat()
+                        if row.last_seen_at is not None
+                        else None
+                    ),
+                    "ja3_hashes": row.ja3_hashes,
+                    "hassh_hashes": row.hassh_hashes,
+                    "payload_simhashes": row.payload_simhashes,
+                    "c2_endpoints": row.c2_endpoints,
+                }
+                for row in result.all()
+            ]
+
+    async def create_campaign(self, row: dict[str, Any]) -> str:
+        campaign = Campaign(**row)
+        # Read the key before commit: a session configured with
+        # ``expire_on_commit=True`` expires the instance on commit, and
+        # touching an expired attribute outside the session would need a
+        # refresh that async sessions cannot perform implicitly.
+        campaign_uuid = campaign.uuid
+        async with self._session() as session:
+            session.add(campaign)
+            await session.commit()
+        return campaign_uuid
+
+    async def set_identity_campaign_id(
+        self, identity_uuid: str, campaign_uuid: Optional[str],
+    ) -> None:
+        statement = (
+            update(AttackerIdentity)
+            .where(AttackerIdentity.uuid == identity_uuid)
+            .values(
+                campaign_id=campaign_uuid,
+                updated_at=datetime.now(timezone.utc),
+            )
+        )
+        async with self._session() as session:
+            await session.execute(statement)
+            await session.commit()
+
+    async def list_all_campaigns(self) -> list[dict[str, Any]]:
+        statement = select(Campaign).order_by(Campaign.created_at)
+        async with self._session() as session:
+            result = await session.execute(statement)
+            return [c.model_dump(mode="json") for c in result.scalars().all()]
+
+    async def update_campaign_merged_into(
+        self, campaign_uuid: str, winner_uuid: Optional[str],
+    ) -> None:
+        statement = (
+            update(Campaign)
+            .where(Campaign.uuid == campaign_uuid)
+            .values(
+                merged_into_uuid=winner_uuid,
+                updated_at=datetime.now(timezone.utc),
+            )
+        )
+        async with self._session() as session:
+            await session.execute(statement)
+            await session.commit()
+
     async def get_attacker_commands(
         self,
         uuid: str,
diff --git a/tests/db/test_base_repo.py b/tests/db/test_base_repo.py
index 0d635d6e..271e6fc0 100644
--- a/tests/db/test_base_repo.py
+++ b/tests/db/test_base_repo.py
@@ -71,6 +71,17 @@ class DummyRepo(BaseRepository):
     async def set_attacker_identity_id(self, a, i): await super().set_attacker_identity_id(a, i)
     async def list_all_identities(self): await super().list_all_identities(); return []
     async def update_identity_merged_into(self, u, w): await super().update_identity_merged_into(u, w)
+    # Campaign clustering (this PR)
+    async def get_campaign_by_uuid(self, u): await super().get_campaign_by_uuid(u)
+    async def list_campaigns(self, limit=50, offset=0): await super().list_campaigns(limit, offset); return []
+    async def count_campaigns(self): await super().count_campaigns(); return 0
+    async def list_identities_for_campaign(self, u, limit=50, offset=0): await super().list_identities_for_campaign(u, limit, offset); return []
+    async def count_identities_for_campaign(self, u): await super().count_identities_for_campaign(u); return 0
+    async def list_identities_for_clustering(self, limit=None): await super().list_identities_for_clustering(limit); return []
+    async def create_campaign(self, row): await super().create_campaign(row); return ""
+    async def set_identity_campaign_id(self, i, c): await super().set_identity_campaign_id(i, c)
+    async def list_all_campaigns(self): await super().list_all_campaigns(); return []
+    async def update_campaign_merged_into(self, u, w): await super().update_campaign_merged_into(u, w)
 
 @pytest.mark.asyncio
 async def test_base_repo_coverage():
@@ -144,6 +155,18 @@ async def test_base_repo_coverage():
     await dr.list_all_identities()
     await dr.update_identity_merged_into("a", "b")
     await dr.update_identity_merged_into("a", None)
+    await dr.get_campaign_by_uuid("a")
+    await dr.list_campaigns()
+    await dr.count_campaigns()
+    await dr.list_identities_for_campaign("a")
+    await dr.count_identities_for_campaign("a")
+    await dr.list_identities_for_clustering()
+    await dr.create_campaign({"uuid": "c"})
+    await dr.set_identity_campaign_id("i", "c")
+    await dr.set_identity_campaign_id("i", None)
+    await dr.list_all_campaigns()
+    await dr.update_campaign_merged_into("c", "d")
+    await dr.update_campaign_merged_into("c", None)
 
     # Swarm methods: default NotImplementedError on BaseRepository. Covering
     # them here keeps the coverage contract honest for the swarm CRUD surface.
diff --git a/tests/db/test_campaign_repo.py b/tests/db/test_campaign_repo.py
new file mode 100644
index 00000000..1e98a92d
--- /dev/null
+++ b/tests/db/test_campaign_repo.py
@@ -0,0 +1,145 @@
+"""Tests for the Campaign clustering repo methods on SQLModelRepository."""
+from __future__ import annotations
+
+from datetime import datetime, timezone
+
+import pytest
+
+from decnet.web.db.factory import get_repository
+
+
+@pytest.fixture
+async def repo(tmp_path):
+    r = get_repository(db_path=str(tmp_path / "campaigns.db"))
+    await r.initialize()
+    return r
+
+
+async def _create_identity(repo, uuid: str, **kwargs) -> str:
+    now = datetime.now(timezone.utc)
+    return await repo.create_attacker_identity({
+        "uuid": uuid,
+        "first_seen_at": kwargs.get("first_seen_at", now),
+        "last_seen_at": kwargs.get("last_seen_at", now),
+        "ja3_hashes": kwargs.get("ja3_hashes"),
+        "hassh_hashes": kwargs.get("hassh_hashes"),
+        "payload_simhashes": kwargs.get("payload_simhashes"),
+        "c2_endpoints": kwargs.get("c2_endpoints"),
+    })
+
+
+@pytest.mark.asyncio
+async def test_create_and_get_campaign(repo):
+    await repo.create_campaign({"uuid": "c1", "confidence": 0.8})
+    row = await repo.get_campaign_by_uuid("c1")
+    assert row is not None
+    assert row["uuid"] == "c1"
+    assert row["confidence"] == 0.8
+    assert row["merged_into_uuid"] is None
+
+
+@pytest.mark.asyncio
+async def test_get_campaign_follows_merge_chain(repo):
+    await repo.create_campaign({"uuid": "c1"})
+    await repo.create_campaign({"uuid": "c2"})
+    await repo.update_campaign_merged_into("c2", "c1")
+
+    # Querying the loser returns the winner.
+    row = await repo.get_campaign_by_uuid("c2")
+    assert row["uuid"] == "c1"
+
+
+@pytest.mark.asyncio
+async def test_list_and_count_excludes_merged_out(repo):
+    await repo.create_campaign({"uuid": "c1"})
+    await repo.create_campaign({"uuid": "c2"})
+    await repo.update_campaign_merged_into("c2", "c1")
+
+    listed = await repo.list_campaigns()
+    assert {c["uuid"] for c in listed} == {"c1"}
+    assert await repo.count_campaigns() == 1
+
+
+@pytest.mark.asyncio
+async def test_list_all_campaigns_includes_merged_out(repo):
+    await repo.create_campaign({"uuid": "c1"})
+    await repo.create_campaign({"uuid": "c2"})
+    await repo.update_campaign_merged_into("c2", "c1")
+
+    all_campaigns = await repo.list_all_campaigns()
+    assert {c["uuid"] for c in all_campaigns} == {"c1", "c2"}
+
+
+@pytest.mark.asyncio
+async def test_get_unknown_campaign_returns_none(repo):
+    assert await repo.get_campaign_by_uuid("nope") is None
+
+
+@pytest.mark.asyncio
+async def test_update_campaign_merged_into_can_revoke(repo):
+    await repo.create_campaign({"uuid": "c1"})
+    await repo.create_campaign({"uuid": "c2"})
+    await repo.update_campaign_merged_into("c2", "c1")
+    # Revoke
+    await repo.update_campaign_merged_into("c2", None)
+
+    row = await repo.get_campaign_by_uuid("c2")
+    assert row["uuid"] == "c2"
+    assert row["merged_into_uuid"] is None
+
+
+@pytest.mark.asyncio
+async def test_set_identity_campaign_id_links_and_unlinks(repo):
+    await repo.create_campaign({"uuid": "c1"})
+    await _create_identity(repo, "i1")
+
+    await repo.set_identity_campaign_id("i1", "c1")
+    linked = await repo.list_identities_for_campaign("c1")
+    assert {i["uuid"] for i in linked} == {"i1"}
+    assert await repo.count_identities_for_campaign("c1") == 1
+
+    await repo.set_identity_campaign_id("i1", None)
+    assert await repo.count_identities_for_campaign("c1") == 0
+
+
+@pytest.mark.asyncio
+async def test_list_identities_for_clustering_projects_expected_fields(repo):
+    await _create_identity(
+        repo, "i1",
+        ja3_hashes='["ja3-a"]',
+        hassh_hashes='["hassh-a"]',
+        payload_simhashes='["dead"]',
+        c2_endpoints='["1.2.3.4:443"]',
+    )
+    rows = await repo.list_identities_for_clustering()
+    assert len(rows) == 1
+    row = rows[0]
+    assert row["uuid"] == "i1"
+    assert row["ja3_hashes"] == '["ja3-a"]'
+    assert row["hassh_hashes"] == '["hassh-a"]'
+    assert row["payload_simhashes"] == '["dead"]'
+    assert row["c2_endpoints"] == '["1.2.3.4:443"]'
+    assert row["campaign_id"] is None
+    assert row["merged_into_uuid"] is None
+    assert row["first_seen_at"] is not None
+
+
+@pytest.mark.asyncio
+async def test_list_identities_for_clustering_respects_limit(repo):
+    for n in range(3):
+        await _create_identity(repo, f"i{n}")
+    assert len(await repo.list_identities_for_clustering(limit=2)) == 2
+    assert len(await repo.list_identities_for_clustering()) == 3
+
+
+@pytest.mark.asyncio
+async def test_list_identities_for_campaign_paginates(repo):
+    await repo.create_campaign({"uuid": "c1"})
+    for n in range(3):
+        await _create_identity(repo, f"i{n}")
+        await repo.set_identity_campaign_id(f"i{n}", "c1")
+
+    page = await repo.list_identities_for_campaign("c1", limit=2, offset=0)
+    assert len(page) == 2
+    page2 = await repo.list_identities_for_campaign("c1", limit=2, offset=2)
+    assert len(page2) == 1