feat(db): Campaign SQLModel + repo write/read methods

Adds the campaigns table and the BaseRepository / SQLModelRepository
methods that the campaign-clusterer worker (next commit) needs to
populate it. Mirrors the AttackerIdentity layer: schema_version from
day one for federation gossip, soft-merge via merged_into_uuid with a
chain-walking get_campaign_by_uuid, list_campaigns excluding merged-
out rows while list_all_campaigns returns the unfiltered set for the
revoke pass. attacker_identities.campaign_id gets a real FK now that
the target table exists.
This commit is contained in:
2026-04-26 08:54:28 -04:00
parent 059d1dba75
commit 0a1cf65ddb
7 changed files with 524 additions and 3 deletions

View File

@@ -39,6 +39,10 @@ from .attackers import (
from .attacker_intel import (
AttackerIntel,
)
from .campaigns import (
Campaign,
CampaignsResponse,
)
from .deploy import (
DeployIniRequest,
DeployResponse,

View File

@@ -122,9 +122,12 @@ class AttackerIdentity(SQLModel, table=True):
__tablename__ = "attacker_identities"
uuid: str = Field(primary_key=True)
schema_version: int = Field(default=1)
# Set by the campaign clusterer, downstream effort. The campaigns
# table doesn't exist yet — no FK constraint, just a soft pointer.
campaign_id: Optional[str] = Field(default=None, index=True)
# Set by the campaign clusterer. The ``campaigns`` table now
# exists; this is a real FK. Nullable until the campaign clusterer
# has run on this identity row.
campaign_id: Optional[str] = Field(
default=None, foreign_key="campaigns.uuid", index=True
)
first_seen_at: Optional[datetime] = Field(default=None, index=True)
last_seen_at: Optional[datetime] = Field(default=None, index=True)
created_at: datetime = Field(

View File

@@ -0,0 +1,80 @@
"""Campaign — operation-level grouping of resolved attacker identities."""
from datetime import datetime, timezone
from typing import Any, List, Optional
from pydantic import BaseModel
from sqlalchemy import Column, Text
from sqlmodel import Field, SQLModel
def _utc_now() -> datetime:
    """Return the current timezone-aware UTC timestamp."""
    return datetime.now(timezone.utc)


def _nullable_text(column_name: str) -> Any:
    """Build an optional model field stored in a nullable TEXT column.

    A fresh ``Column`` is created on every call, so each model
    attribute owns its own column object.
    """
    return Field(
        default=None, sa_column=Column(column_name, Text, nullable=True)
    )


class Campaign(SQLModel, table=True):
    """
    One operation, grouping one or more attacker identities.

    A campaign is the layer directly above ``AttackerIdentity``. The
    relationship is many-to-many over time: the same actor can show up
    in several campaigns, and one campaign can involve several distinct
    identities working together (e.g. a night-shift and a day-shift
    operator on the same job — fixture F5 multi_operator).

    Rows are written by the campaign clusterer worker, which runs
    downstream of identity resolution; the table is expected to be
    empty until that worker lands. ``schema_version`` is present from
    the first release for the same federation-gossip reason
    ``AttackerIdentity`` has one: changing campaign-level feature
    definitions without a version field would silently poison
    cross-operator gossip in V2.

    The signal taxonomy (phase-handoff, shared-infra, temporal overlap,
    cohort) is described in ``development/CAMPAIGN_CLUSTERING.md``.
    """

    __tablename__ = "campaigns"

    uuid: str = Field(primary_key=True)
    schema_version: int = Field(default=1)

    # Observation window and row bookkeeping timestamps.
    first_seen_at: Optional[datetime] = Field(default=None, index=True)
    last_seen_at: Optional[datetime] = Field(default=None, index=True)
    created_at: datetime = Field(default_factory=_utc_now, index=True)
    updated_at: datetime = Field(default_factory=_utc_now, index=True)

    # Cohesion score assigned by the clusterer, in [0, 1]. Stays null
    # until the clusterer writes it; a higher value means more
    # confidence that the linked identities belong to one operation.
    confidence: Optional[float] = Field(default=None)

    # Denormalized number of ``AttackerIdentity`` rows FK'd to this row.
    identity_count: int = Field(default=0)

    # Fingerprint summaries aggregated over the member identities. Each
    # is a JSON-serialized list[str] held in TEXT — the same shape as
    # ``AttackerIdentity.{ja3,hassh,payload_simhashes,c2_endpoints}`` —
    # because federation gossip wants one wire shape at every layer.
    ja3_hashes: Optional[str] = _nullable_text("ja3_hashes")
    hassh_hashes: Optional[str] = _nullable_text("hassh_hashes")
    payload_simhashes: Optional[str] = _nullable_text("payload_simhashes")
    c2_endpoints: Optional[str] = _nullable_text("c2_endpoints")

    # Soft-merge audit trail, following the revocable-merge pattern of
    # ``AttackerIdentity.merged_into_uuid``: when the clusterer
    # collapses two campaigns the losing row is kept and points at the
    # winner's UUID, and resolvers follow that chain.
    merged_into_uuid: Optional[str] = Field(
        default=None, foreign_key="campaigns.uuid", index=True
    )

    # Free-form, operator-editable annotation for human analysts
    # ("APT-XX Q2 campaign", "matches CTI report 5678").
    notes: Optional[str] = _nullable_text("notes")
class CampaignsResponse(BaseModel):
    """Envelope for a page of campaign rows.

    NOTE(review): the field meanings below are inferred from the names
    and the paginated repository methods — confirm against the router
    that builds this response.
    """

    # Presumably the overall number of matching campaigns, not the page size.
    total: int
    # Page size and starting position that were applied to the query.
    limit: int
    offset: int
    # The page itself: one plain dict per campaign row.
    data: List[dict[str, Any]]

View File

@@ -474,6 +474,113 @@ class BaseRepository(ABC):
"""
pass
# ─── Campaign clustering reads ────────────────────────────────────────
# Layer above identity resolution: campaigns group identities into
# operations. Populated by ``decnet campaign-clusterer``. The
# read-only API below ships in the same wave; until the clusterer
# runs, every method returns empty/None against an empty table.
# See development/CAMPAIGN_CLUSTERING.md.
@abstractmethod
async def get_campaign_by_uuid(self, uuid: str) -> Optional[dict[str, Any]]:
    """Look up a single ``Campaign`` row by its UUID.

    Implementations MUST resolve soft merges: when the matching row has
    ``merged_into_uuid`` set (the clusterer folded it into another
    campaign), the chain is followed and the winning row is returned.
    This is the same contract as :meth:`get_identity_by_uuid`.

    Args:
        uuid: UUID of the campaign to look up.

    Returns:
        The campaign row as a dict, or ``None`` when no such row exists.
    """
    pass
@abstractmethod
async def list_campaigns(
    self,
    limit: int = 50,
    offset: int = 0,
) -> list[dict[str, Any]]:
    """Return one page of campaign rows, most recently updated first.

    Rows that were merged into another campaign are left out, so the
    list view shows the de-duplicated truth (mirrors
    :meth:`list_identities`).

    Args:
        limit: Maximum number of rows in the page.
        offset: Number of rows to skip before the page starts.

    Returns:
        Campaign rows as dicts.
    """
    pass
@abstractmethod
async def count_campaigns(self) -> int:
    """Return the number of campaign rows, not counting merged-out rows."""
    pass
@abstractmethod
async def list_identities_for_campaign(
    self,
    campaign_uuid: str,
    limit: int = 50,
    offset: int = 0,
) -> list[dict[str, Any]]:
    """Return one page of the identities linked to a campaign, newest first.

    Args:
        campaign_uuid: UUID of the campaign whose ``AttackerIdentity``
            rows are wanted.
        limit: Maximum number of rows in the page.
        offset: Number of rows to skip before the page starts.

    Returns:
        ``AttackerIdentity`` rows as dicts.
    """
    pass
@abstractmethod
async def count_identities_for_campaign(self, campaign_uuid: str) -> int:
    """Return how many ``AttackerIdentity`` rows are FK'd to ``campaign_uuid``."""
    pass
# ─── Campaign clustering writes (campaign-clusterer worker) ───────────
@abstractmethod
async def list_identities_for_clustering(
    self,
    limit: Optional[int] = None,
) -> list[dict[str, Any]]:
    """Return every ``AttackerIdentity`` in the campaign clusterer's input shape.

    Each dict carries at least ``uuid``, ``campaign_id``,
    ``merged_into_uuid``, ``first_seen_at`` / ``last_seen_at`` and the
    aggregated fingerprint summaries (``ja3_hashes``, ``hassh_hashes``,
    ``payload_simhashes``, ``c2_endpoints``).

    Args:
        limit: Upper bound on the working set of a single clusterer
            tick. ``None`` fetches everything.

    Returns:
        The projected identities; an empty list when there are none.
    """
    pass
@abstractmethod
async def create_campaign(self, row: dict[str, Any]) -> str:
    """Insert a new ``Campaign`` row.

    The uuid is generated by the caller, not by the repository, so the
    caller can use it within the same tick to back-link identities.

    Args:
        row: Column values for the new row. ``uuid`` is mandatory;
            every other field is optional and falls back to the model
            default.

    Returns:
        The uuid of the inserted row.
    """
    pass
@abstractmethod
async def set_identity_campaign_id(
    self,
    identity_uuid: str,
    campaign_uuid: Optional[str],
) -> None:
    """Point ``attacker_identities.campaign_id`` at a campaign, or clear it.

    The operation is idempotent.

    Args:
        identity_uuid: UUID of the identity row to update.
        campaign_uuid: UUID of the campaign to link, or ``None`` to
            unlink (e.g. when a prior campaign assignment is revoked).
    """
    pass
@abstractmethod
async def list_all_campaigns(self) -> list[dict[str, Any]]:
    """Return every ``Campaign`` row, merged-out rows included.

    Unlike :meth:`list_campaigns`, nothing is filtered: the clusterer's
    revocable-merge pass has to re-evaluate campaigns that were already
    merged out, so it needs the complete set.
    """
    pass
@abstractmethod
async def update_campaign_merged_into(
    self,
    campaign_uuid: str,
    winner_uuid: Optional[str],
) -> None:
    """Write or clear ``campaigns.merged_into_uuid`` for one campaign.

    Args:
        campaign_uuid: UUID of the campaign row to update.
        winner_uuid: UUID of the campaign this row is soft-merged into,
            or ``None`` to revoke an earlier merge.
    """
    pass
@abstractmethod
async def get_attacker_commands(
self,

View File

@@ -38,6 +38,7 @@ from decnet.web.db.models import (
AttackerBehavior,
AttackerIdentity,
AttackerIntel,
Campaign,
SessionProfile,
SmtpTarget,
SwarmHost,
@@ -1535,6 +1536,164 @@ class SQLModelRepository(BaseRepository):
await session.execute(statement)
await session.commit()
# ─── Campaign clustering reads ────────────────────────────────────────
async def get_campaign_by_uuid(self, uuid: str) -> Optional[dict[str, Any]]:
    """Fetch a campaign by UUID, following soft-merge pointers to the winner.

    Args:
        uuid: UUID of the campaign to look up.

    Returns:
        The JSON-mode dump of the resolved campaign, or ``None`` if the
        requested row, or any row along the merge chain, does not exist.
        If the chain is longer than the hop budget (e.g. a corrupted
        ring), the last row fetched is returned even though it is still
        marked as merged.
    """
    # Hop budget: same bounded chain-walk as get_identity_by_uuid, so a
    # corrupted merge ring cannot loop forever.
    max_hops = 8
    async with self._session() as session:
        lookup_uuid = uuid
        for _hop in range(max_hops):
            query = select(Campaign).where(Campaign.uuid == lookup_uuid)
            campaign = (await session.execute(query)).scalar_one_or_none()
            if campaign is None:
                return None
            if campaign.merged_into_uuid is None:
                # Reached a row that was never merged out: the winner.
                break
            lookup_uuid = campaign.merged_into_uuid
        return campaign.model_dump(mode="json")
async def list_campaigns(
    self, limit: int = 50, offset: int = 0,
) -> list[dict[str, Any]]:
    """Return one page of live (not merged-out) campaigns, newest-updated first.

    Args:
        limit: Maximum number of rows in the page.
        offset: Number of rows to skip before the page starts.

    Returns:
        JSON-mode dumps of the matching ``Campaign`` rows.
    """
    statement = (
        select(Campaign)
        .where(Campaign.merged_into_uuid.is_(None))
        # ``updated_at`` is not unique. Without a unique tie-breaker the
        # database may order tied rows differently on each query, so a
        # row could appear on two pages or on none. The primary key
        # makes the OFFSET/LIMIT window deterministic.
        .order_by(desc(Campaign.updated_at), Campaign.uuid)
        .offset(offset)
        .limit(limit)
    )
    async with self._session() as session:
        result = await session.execute(statement)
        return [c.model_dump(mode="json") for c in result.scalars().all()]
async def count_campaigns(self) -> int:
    """Count the campaigns that have not been merged into another campaign.

    Returns:
        The row count, or ``0`` when the query yields no value.
    """
    not_merged_out = Campaign.merged_into_uuid.is_(None)
    query = select(func.count()).select_from(Campaign).where(not_merged_out)
    async with self._session() as session:
        total = (await session.execute(query)).scalar()
        return total or 0
async def list_identities_for_campaign(
    self, campaign_uuid: str, limit: int = 50, offset: int = 0,
) -> list[dict[str, Any]]:
    """Return one page of identities linked to a campaign, newest-updated first.

    Args:
        campaign_uuid: Value matched against
            ``AttackerIdentity.campaign_id``.
        limit: Maximum number of rows in the page.
        offset: Number of rows to skip before the page starts.

    Returns:
        JSON-mode dumps of the matching ``AttackerIdentity`` rows.
    """
    statement = (
        select(AttackerIdentity)
        .where(AttackerIdentity.campaign_id == campaign_uuid)
        # ``updated_at`` can tie (e.g. identities linked in one batch).
        # The primary key as a secondary sort key keeps page boundaries
        # stable, so consecutive pages never repeat or drop a row.
        .order_by(desc(AttackerIdentity.updated_at), AttackerIdentity.uuid)
        .offset(offset)
        .limit(limit)
    )
    async with self._session() as session:
        result = await session.execute(statement)
        return [i.model_dump(mode="json") for i in result.scalars().all()]
async def count_identities_for_campaign(self, campaign_uuid: str) -> int:
    """Count the ``AttackerIdentity`` rows whose ``campaign_id`` is ``campaign_uuid``.

    Returns:
        The row count, or ``0`` when the query yields no value.
    """
    linked_to_campaign = AttackerIdentity.campaign_id == campaign_uuid
    query = (
        select(func.count())
        .select_from(AttackerIdentity)
        .where(linked_to_campaign)
    )
    async with self._session() as session:
        total = (await session.execute(query)).scalar()
        return total or 0
# ─── Campaign clustering writes (campaign-clusterer worker) ───────────
async def list_identities_for_clustering(
    self, limit: Optional[int] = None,
) -> list[dict[str, Any]]:
    """Project identities into the campaign clusterer's input dicts.

    Rows come back in ``created_at`` order. The two timestamps are
    rendered as ISO-8601 strings (or ``None``); every other column is
    passed through unchanged.

    Args:
        limit: Maximum number of rows to return; ``None`` for no bound.

    Returns:
        One dict per identity with the keys ``uuid``, ``campaign_id``,
        ``merged_into_uuid``, ``first_seen_at``, ``last_seen_at``,
        ``ja3_hashes``, ``hassh_hashes``, ``payload_simhashes`` and
        ``c2_endpoints``.
    """

    def _iso_or_none(moment):
        # Nullable timestamp -> ISO string, keeping NULL as None.
        return None if moment is None else moment.isoformat()

    # Deliberately narrow: only the columns the clusterer's similarity
    # graph reads. Future denormalised projections (commands_by_phase
    # from log mining, decky-set aggregates) can be added here without
    # churning callers.
    projected_columns = (
        AttackerIdentity.uuid,
        AttackerIdentity.campaign_id,
        AttackerIdentity.merged_into_uuid,
        AttackerIdentity.first_seen_at,
        AttackerIdentity.last_seen_at,
        AttackerIdentity.ja3_hashes,
        AttackerIdentity.hassh_hashes,
        AttackerIdentity.payload_simhashes,
        AttackerIdentity.c2_endpoints,
    )
    query = select(*projected_columns).order_by(AttackerIdentity.created_at)
    if limit is not None:
        query = query.limit(limit)

    async with self._session() as session:
        records = (await session.execute(query)).all()
        return [
            {
                "uuid": record.uuid,
                "campaign_id": record.campaign_id,
                "merged_into_uuid": record.merged_into_uuid,
                "first_seen_at": _iso_or_none(record.first_seen_at),
                "last_seen_at": _iso_or_none(record.last_seen_at),
                "ja3_hashes": record.ja3_hashes,
                "hassh_hashes": record.hassh_hashes,
                "payload_simhashes": record.payload_simhashes,
                "c2_endpoints": record.c2_endpoints,
            }
            for record in records
        ]
async def create_campaign(self, row: dict[str, Any]) -> str:
    """Insert a new ``Campaign`` row and return its uuid.

    Args:
        row: Column values passed straight to the ``Campaign``
            constructor. The repository contract requires ``uuid`` to
            be present; the caller generates it.

    Returns:
        The uuid of the inserted campaign.
    """
    campaign = Campaign(**row)
    # Read the caller-supplied primary key BEFORE committing. A commit
    # can expire ORM attributes (depending on the session factory's
    # ``expire_on_commit`` setting), and reloading one afterwards needs
    # implicit I/O that an async session cannot do on attribute access.
    campaign_uuid = campaign.uuid
    async with self._session() as session:
        session.add(campaign)
        await session.commit()
    return campaign_uuid
async def set_identity_campaign_id(
    self, identity_uuid: str, campaign_uuid: Optional[str],
) -> None:
    """Link an identity to a campaign, or unlink it with ``None``.

    Issues a single UPDATE filtered on the identity's uuid, which also
    refreshes the row's ``updated_at`` to the current UTC time.

    Args:
        identity_uuid: UUID of the ``AttackerIdentity`` row to update.
        campaign_uuid: New ``campaign_id`` value; ``None`` clears it.
    """
    new_values = {
        "campaign_id": campaign_uuid,
        "updated_at": datetime.now(timezone.utc),
    }
    target_row = AttackerIdentity.uuid == identity_uuid
    update_stmt = update(AttackerIdentity).where(target_row).values(**new_values)
    async with self._session() as session:
        await session.execute(update_stmt)
        await session.commit()
async def list_all_campaigns(self) -> list[dict[str, Any]]:
    """Return every campaign, merged-out rows included, in ``created_at`` order.

    Returns:
        JSON-mode dumps of all ``Campaign`` rows.
    """
    query = select(Campaign).order_by(Campaign.created_at)
    async with self._session() as session:
        campaigns = (await session.execute(query)).scalars().all()
        return [campaign.model_dump(mode="json") for campaign in campaigns]
async def update_campaign_merged_into(
    self, campaign_uuid: str, winner_uuid: Optional[str],
) -> None:
    """Soft-merge a campaign into ``winner_uuid``, or revoke the merge with ``None``.

    Issues a single UPDATE filtered on the campaign's uuid, which also
    refreshes the row's ``updated_at`` to the current UTC time.

    Args:
        campaign_uuid: UUID of the ``Campaign`` row to update.
        winner_uuid: New ``merged_into_uuid`` value; ``None`` clears it.
    """
    new_values = {
        "merged_into_uuid": winner_uuid,
        "updated_at": datetime.now(timezone.utc),
    }
    target_row = Campaign.uuid == campaign_uuid
    update_stmt = update(Campaign).where(target_row).values(**new_values)
    async with self._session() as session:
        await session.execute(update_stmt)
        await session.commit()
async def get_attacker_commands(
self,
uuid: str,

View File

@@ -71,6 +71,17 @@ class DummyRepo(BaseRepository):
async def set_attacker_identity_id(self, a, i): await super().set_attacker_identity_id(a, i)
async def list_all_identities(self): await super().list_all_identities(); return []
async def update_identity_merged_into(self, u, w): await super().update_identity_merged_into(u, w)
# Campaign clustering (this PR)
async def get_campaign_by_uuid(self, u):
    # Execute the abstract base body for coverage; like the base, this
    # returns None.
    await super().get_campaign_by_uuid(u)
async def list_campaigns(self, limit=50, offset=0):
    # Execute the abstract base body for coverage, then report an empty page.
    await super().list_campaigns(limit, offset)
    return []
async def count_campaigns(self):
    # Execute the abstract base body for coverage, then report zero rows.
    await super().count_campaigns()
    return 0
async def list_identities_for_campaign(self, u, limit=50, offset=0):
    # Execute the abstract base body for coverage, then report an empty page.
    await super().list_identities_for_campaign(u, limit, offset)
    return []
async def count_identities_for_campaign(self, u):
    # Execute the abstract base body for coverage, then report zero rows.
    await super().count_identities_for_campaign(u)
    return 0
async def list_identities_for_clustering(self, limit=None):
    # Execute the abstract base body for coverage, then report no identities.
    await super().list_identities_for_clustering(limit)
    return []
async def create_campaign(self, row):
    # Execute the abstract base body for coverage; the dummy has no
    # storage, so it answers with an empty uuid string.
    await super().create_campaign(row)
    return ""
async def set_identity_campaign_id(self, i, c):
    # Execute the abstract base body for coverage; nothing to return.
    await super().set_identity_campaign_id(i, c)
async def list_all_campaigns(self):
    # Execute the abstract base body for coverage, then report no campaigns.
    await super().list_all_campaigns()
    return []
async def update_campaign_merged_into(self, u, w):
    # Execute the abstract base body for coverage; nothing to return.
    await super().update_campaign_merged_into(u, w)
@pytest.mark.asyncio
async def test_base_repo_coverage():
@@ -144,6 +155,18 @@ async def test_base_repo_coverage():
await dr.list_all_identities()
await dr.update_identity_merged_into("a", "b")
await dr.update_identity_merged_into("a", None)
await dr.get_campaign_by_uuid("a")
await dr.list_campaigns()
await dr.count_campaigns()
await dr.list_identities_for_campaign("a")
await dr.count_identities_for_campaign("a")
await dr.list_identities_for_clustering()
await dr.create_campaign({"uuid": "c"})
await dr.set_identity_campaign_id("i", "c")
await dr.set_identity_campaign_id("i", None)
await dr.list_all_campaigns()
await dr.update_campaign_merged_into("c", "d")
await dr.update_campaign_merged_into("c", None)
# Swarm methods: default NotImplementedError on BaseRepository. Covering
# them here keeps the coverage contract honest for the swarm CRUD surface.

View File

@@ -0,0 +1,145 @@
"""Tests for the Campaign clustering repo methods on SQLModelRepository."""
from __future__ import annotations
from datetime import datetime, timezone
import pytest
from decnet.web.db.factory import get_repository
@pytest.fixture
async def repo(tmp_path):
    """Provide an initialized repository backed by a per-test database file."""
    database_file = tmp_path / "campaigns.db"
    repository = get_repository(db_path=str(database_file))
    await repository.initialize()
    return repository
async def _create_identity(repo, uuid: str, **kwargs) -> str:
    """Create an attacker identity through ``repo`` with test-friendly defaults.

    Both timestamps default to the same "now" (UTC) and each fingerprint
    summary defaults to ``None``; any of them can be overridden by
    keyword. Returns whatever ``repo.create_attacker_identity`` returns.
    """
    created_at = datetime.now(timezone.utc)
    identity_row = {"uuid": uuid}
    for timestamp_field in ("first_seen_at", "last_seen_at"):
        identity_row[timestamp_field] = kwargs.get(timestamp_field, created_at)
    for summary_field in (
        "ja3_hashes",
        "hassh_hashes",
        "payload_simhashes",
        "c2_endpoints",
    ):
        identity_row[summary_field] = kwargs.get(summary_field)
    return await repo.create_attacker_identity(identity_row)
@pytest.mark.asyncio
async def test_create_and_get_campaign(repo):
    """A freshly created campaign reads back with the values it was given."""
    await repo.create_campaign({"uuid": "c1", "confidence": 0.8})

    fetched = await repo.get_campaign_by_uuid("c1")

    assert fetched is not None
    assert (fetched["uuid"], fetched["confidence"]) == ("c1", 0.8)
    assert fetched["merged_into_uuid"] is None
@pytest.mark.asyncio
async def test_get_campaign_follows_merge_chain(repo):
    """Looking up a merged-out campaign resolves to the campaign it merged into."""
    await repo.create_campaign({"uuid": "c1"})
    await repo.create_campaign({"uuid": "c2"})
    await repo.update_campaign_merged_into("c2", "c1")

    # Querying the loser returns the winner.
    row = await repo.get_campaign_by_uuid("c2")

    # Guard first so a lookup regression fails as a clear assertion
    # instead of a TypeError from subscripting None.
    assert row is not None
    assert row["uuid"] == "c1"
@pytest.mark.asyncio
async def test_list_and_count_excludes_merged_out(repo):
    """The list and count views both hide a campaign that was merged out."""
    for campaign_uuid in ("c1", "c2"):
        await repo.create_campaign({"uuid": campaign_uuid})
    await repo.update_campaign_merged_into("c2", "c1")

    visible_uuids = {campaign["uuid"] for campaign in await repo.list_campaigns()}

    assert visible_uuids == {"c1"}
    assert await repo.count_campaigns() == 1
@pytest.mark.asyncio
async def test_list_all_campaigns_includes_merged_out(repo):
    """The unfiltered listing still returns a campaign that was merged out."""
    for campaign_uuid in ("c1", "c2"):
        await repo.create_campaign({"uuid": campaign_uuid})
    await repo.update_campaign_merged_into("c2", "c1")

    every_uuid = {campaign["uuid"] for campaign in await repo.list_all_campaigns()}

    assert every_uuid == {"c1", "c2"}
@pytest.mark.asyncio
async def test_get_unknown_campaign_returns_none(repo):
    """Looking up a uuid that was never inserted yields ``None``."""
    missing = await repo.get_campaign_by_uuid("nope")
    assert missing is None
@pytest.mark.asyncio
async def test_update_campaign_merged_into_can_revoke(repo):
    """Clearing ``merged_into_uuid`` makes the campaign resolve to itself again."""
    await repo.create_campaign({"uuid": "c1"})
    await repo.create_campaign({"uuid": "c2"})
    await repo.update_campaign_merged_into("c2", "c1")

    # Revoke
    await repo.update_campaign_merged_into("c2", None)

    row = await repo.get_campaign_by_uuid("c2")
    # Guard first so a lookup regression fails as a clear assertion
    # instead of a TypeError from subscripting None.
    assert row is not None
    assert row["uuid"] == "c2"
    assert row["merged_into_uuid"] is None
@pytest.mark.asyncio
async def test_set_identity_campaign_id_links_and_unlinks(repo):
    """Linking shows the identity under its campaign; unlinking removes it."""
    await repo.create_campaign({"uuid": "c1"})
    await _create_identity(repo, "i1")

    # Link, then check both the list and the count views.
    await repo.set_identity_campaign_id("i1", "c1")
    member_uuids = {
        identity["uuid"]
        for identity in await repo.list_identities_for_campaign("c1")
    }
    assert member_uuids == {"i1"}
    assert await repo.count_identities_for_campaign("c1") == 1

    # Unlink: the campaign has no members left.
    await repo.set_identity_campaign_id("i1", None)
    assert await repo.count_identities_for_campaign("c1") == 0
@pytest.mark.asyncio
async def test_list_identities_for_clustering_projects_expected_fields(repo):
    """The clustering projection passes fingerprint columns through verbatim."""
    fingerprints = {
        "ja3_hashes": '["ja3-a"]',
        "hassh_hashes": '["hassh-a"]',
        "payload_simhashes": '["dead"]',
        "c2_endpoints": '["1.2.3.4:443"]',
    }
    await _create_identity(repo, "i1", **fingerprints)

    projected = await repo.list_identities_for_clustering()

    assert len(projected) == 1
    identity = projected[0]
    assert identity["uuid"] == "i1"
    for column, stored_value in fingerprints.items():
        assert identity[column] == stored_value
    # Never clustered and never merged, so both pointers are still null.
    assert identity["campaign_id"] is None
    assert identity["merged_into_uuid"] is None
    assert identity["first_seen_at"] is not None
@pytest.mark.asyncio
async def test_list_identities_for_clustering_respects_limit(repo):
    """``limit`` caps the projection; leaving it out returns every identity."""
    for index in range(3):
        await _create_identity(repo, f"i{index}")

    capped = await repo.list_identities_for_clustering(limit=2)
    assert len(capped) == 2

    uncapped = await repo.list_identities_for_clustering()
    assert len(uncapped) == 3
@pytest.mark.asyncio
async def test_list_identities_for_campaign_paginates(repo):
    """Three linked identities split into a full page and a one-row remainder."""
    await repo.create_campaign({"uuid": "c1"})
    for index in range(3):
        identity_uuid = f"i{index}"
        await _create_identity(repo, identity_uuid)
        await repo.set_identity_campaign_id(identity_uuid, "c1")

    first_page = await repo.list_identities_for_campaign("c1", limit=2, offset=0)
    assert len(first_page) == 2

    second_page = await repo.list_identities_for_campaign("c1", limit=2, offset=2)
    assert len(second_page) == 1