feat(identity): AttackerIdentity table + nullable attackers.identity_id FK

Schema-only commit, first of the five-step substrate for identity
resolution. The clusterer that populates identities lands later; this
ships the table empty and the FK uniformly NULL on existing rows.

* decnet/web/db/models/attackers.py — new AttackerIdentity SQLModel
  (uuid PK, schema_version, fingerprint summary lists, kd_digraph_simhash,
  merged_into_uuid self-FK, all clusterer-populated fields nullable).
  Attacker grows a nullable indexed identity_id FK + docstring marking
  it as the per-IP observation row.
* decnet/web/db/models/__init__.py — re-exports AttackerIdentity.
* tests/db/test_identity_schema.py — 9 schema invariants: table exists,
  identity_id present + nullable, identity_id indexed, FK targets
  attacker_identities.uuid, schema_version defaults to 1, attacker rows
  insert with NULL identity_id, identity rows insert with every
  clusterer-populated field NULL, attacker→identity link round-trips,
  FK constraint blocks orphans.

463 unrelated db/web/profiler/correlation tests still green. See
development/IDENTITY_RESOLUTION.md for the full design.
This commit is contained in:
2026-04-26 07:00:24 -04:00
parent 7904ef1308
commit 84c1ca9c9b
3 changed files with 312 additions and 0 deletions

View File

@@ -31,6 +31,7 @@ from .auth import (
from .attackers import (
Attacker,
AttackerBehavior,
AttackerIdentity,
AttackersResponse,
SessionProfile,
SmtpTarget,
@@ -160,6 +161,7 @@ __all__ = [
# attackers
"Attacker",
"AttackerBehavior",
"AttackerIdentity",
"AttackerIntel",
"AttackersResponse",
"SessionProfile",

View File

@@ -30,9 +30,28 @@ KD_START_OF_ACTION_IDLE_S: float = 2.0 # idle gap that counts as "new action"
class Attacker(SQLModel, table=True):
"""
Per-IP **observation** row. Every distinct source IP we observe gets
one of these. The semantic role is "observation event," not "actor
identity" — an actor rotating across N IPs produces N rows here.
The deduped actor view lives in ``AttackerIdentity`` (one identity
per actor; many observations per identity); the per-operation view
lives in ``Campaign``. ``identity_id`` is set by the clusterer
worker once it resolves which observations are the same hands.
NULL while the clusterer hasn't run on this row yet.
See ``development/IDENTITY_RESOLUTION.md`` for the three-level
hierarchy rationale.
"""
__tablename__ = "attackers"
uuid: str = Field(primary_key=True)
ip: str = Field(index=True)
identity_id: Optional[str] = Field(
default=None,
foreign_key="attacker_identities.uuid",
index=True,
)
first_seen: datetime = Field(index=True)
last_seen: datetime = Field(index=True)
event_count: int = Field(default=0)
@@ -79,6 +98,91 @@ class Attacker(SQLModel, table=True):
)
class AttackerIdentity(SQLModel, table=True):
    """
    Resolved actor identity — the dedup'd "same hands" row that one or
    more ``Attacker`` observations FK into. Populated by the (future)
    clusterer worker; ``Attacker.identity_id`` stays NULL on every
    observation until it runs.

    Why a separate table from ``Attacker``: an actor rotating across N
    IPs produces N observation rows but only ONE identity row. The
    identity is recovered from signals the attacker can't cheaply
    rotate — JA3, HASSH, payload hashes, C2 callbacks, and (V2)
    keystroke-rhythm SimHash. See ``development/IDENTITY_RESOLUTION.md``.

    All clusterer-populated fields are nullable; the table ships empty
    in the schema-only PR (commit 1) and stays empty until the
    clusterer lands. Empty is valid.

    ``schema_version`` is non-negotiable from day one. Federation
    gossip in V2 will share identity vectors across operators;
    bumping feature definitions without a version field silently
    poisons receivers.
    """

    __tablename__ = "attacker_identities"

    # Primary key; also the target of ``attackers.identity_id`` and of
    # the ``merged_into_uuid`` self-reference below.
    uuid: str = Field(primary_key=True)

    # Version of the feature definitions stored in this row (see the
    # class docstring). Defaults to 1.
    schema_version: int = Field(default=1)

    # Set by the campaign clusterer, downstream effort. The campaigns
    # table doesn't exist yet — no FK constraint, just a soft pointer.
    campaign_id: Optional[str] = Field(default=None, index=True)

    # Nullable, indexed activity window for the identity.
    # NOTE(review): presumably the min/max over the linked Attacker
    # observations — confirm once the clusterer lands.
    first_seen_at: Optional[datetime] = Field(default=None, index=True)
    last_seen_at: Optional[datetime] = Field(default=None, index=True)

    # Row bookkeeping timestamps; both default to the current UTC time
    # when the model instance is constructed.
    created_at: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc), index=True
    )
    # NOTE(review): no onupdate hook is declared here, so this only
    # changes when the writer sets it explicitly.
    updated_at: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc), index=True
    )

    # Identity-cohesion score from the clusterer. Range [0, 1]; null
    # until the clusterer writes. Higher = more confident the
    # observations linked to this identity are the same hands.
    # The range is a convention only — nothing in this column
    # definition enforces it.
    confidence: Optional[float] = Field(default=None)

    # Denormalized count of FK'd Attacker rows. Maintained by the
    # clusterer when it links/unlinks. Cheap dashboard read.
    # Defaults to 0 (not NULL).
    observation_count: int = Field(default=0)

    # Fingerprint summary columns. JSON-serialized list[str] in TEXT
    # because: (a) federation gossip wants this exact shape on the
    # wire, (b) MySQL can't index BLOB/TEXT without prefix lengths,
    # (c) actors can present multiple JA3/HASSH values across tools
    # so a scalar column is wrong.
    ja3_hashes: Optional[str] = Field(
        default=None, sa_column=Column("ja3_hashes", Text, nullable=True)
    )
    hassh_hashes: Optional[str] = Field(
        default=None, sa_column=Column("hassh_hashes", Text, nullable=True)
    )

    # Payload SimHash list — 64-bit ints serialized as hex strings.
    # SimHashes are Hamming-comparable, which is the entire reason
    # they're a list (not a set).
    payload_simhashes: Optional[str] = Field(
        default=None, sa_column=Column("payload_simhashes", Text, nullable=True)
    )

    # Nullable TEXT column declared like the fingerprint summaries above.
    # NOTE(review): presumably a JSON-serialized list of the C2 callback
    # endpoints named in the class docstring — element format TODO confirm.
    c2_endpoints: Optional[str] = Field(
        default=None, sa_column=Column("c2_endpoints", Text, nullable=True)
    )

    # V2 keystroke-dynamics hook. Same shape as
    # SessionProfile.kd_digraph_simhash; this is the centroid (or
    # majority vote) across the identity's sessions. BINARY(8) so
    # MySQL can index without a prefix length, same as session_profile.
    kd_digraph_simhash: Optional[bytes] = Field(
        default=None,
        sa_column=Column("kd_digraph_simhash", BINARY(8), nullable=True, index=True),
    )

    # Soft-merge audit trail. When the clusterer collapses two
    # identities, the loser's row stays in place with this set to the
    # winner's UUID — preserves the audit trail without orphaning FKs
    # from any cached subscribers. Resolvers (e.g.
    # GET /identities/{uuid}) follow the chain and surface the winner.
    # Self-referential FK onto this table's own ``uuid`` column.
    merged_into_uuid: Optional[str] = Field(
        default=None, foreign_key="attacker_identities.uuid", index=True
    )

    # Operator-editable free-form notes — annotation surface for human
    # analysts ("known APT-XX cluster," "matches MISP event 1234").
    notes: Optional[str] = Field(
        default=None, sa_column=Column("notes", Text, nullable=True)
    )
class AttackerBehavior(SQLModel, table=True):
"""
Timing & behavioral profile for an attacker, joined to Attacker by uuid.

View File

@@ -0,0 +1,206 @@
"""
Schema-only tests for the AttackerIdentity table and the
attackers.identity_id FK.
The identities table ships empty in this PR; the clusterer that
populates it is a separate downstream effort. These tests verify only
that the schema lands correctly:
* the table exists after metadata.create_all()
* attackers.identity_id is nullable and indexed
* the FK references attacker_identities.uuid
* schema_version defaults to 1 on insert
* an attacker row may be inserted with identity_id=NULL
* an identity row may be inserted with all clusterer-populated columns NULL
* an attacker row can be linked to an existing identity and read back
* with PRAGMA foreign_keys = ON, a link to a nonexistent identity is rejected
If any of these regress, downstream API, frontend, and clusterer work
is blocked. See development/IDENTITY_RESOLUTION.md §Schema.
"""
from __future__ import annotations
import sqlite3
import uuid
from datetime import datetime, timezone
import pytest
from sqlalchemy import inspect
from sqlmodel import Session
from decnet.web.db.models import Attacker, AttackerIdentity
from decnet.web.db.sqlite.database import get_sync_engine, init_db
@pytest.fixture
def db_path(tmp_path) -> str:
    """Initialise a fresh database file under ``tmp_path`` and return its path.

    ``init_db`` is run against ``schema.db`` inside the per-test temporary
    directory, so each test gets its own database file.
    """
    database_file = str(tmp_path / "schema.db")
    init_db(database_file)
    return database_file
def test_attacker_identities_table_exists(db_path: str) -> None:
    """The initialised schema must contain an ``attacker_identities`` table."""
    table_names = inspect(get_sync_engine(db_path)).get_table_names()
    assert "attacker_identities" in table_names
def test_attackers_identity_id_column_present_and_nullable(db_path: str) -> None:
    """``attackers.identity_id`` must exist and must accept NULL."""
    schema = inspect(get_sync_engine(db_path))

    # Key the reflected column descriptors by name for direct lookup.
    by_name = {}
    for descriptor in schema.get_columns("attackers"):
        by_name[descriptor["name"]] = descriptor

    assert "identity_id" in by_name, "attackers.identity_id column missing"

    identity_column = by_name["identity_id"]
    assert identity_column["nullable"] is True, (
        "attackers.identity_id must be nullable — clusterer hasn't run yet on existing rows"
    )
def test_attackers_identity_id_is_indexed(db_path: str) -> None:
    """Some index on ``attackers`` must cover the ``identity_id`` column."""
    schema = inspect(get_sync_engine(db_path))

    # Union of every column that appears in any index on the table.
    covered = set()
    for index in schema.get_indexes("attackers"):
        covered.update(index["column_names"])

    assert "identity_id" in covered, (
        "attackers.identity_id needs an index for join performance "
        "(IdentityDetail aggregates by identity_id; without an index "
        "every lookup is a full scan)"
    )
def test_attackers_identity_id_fk_targets_attacker_identities(db_path: str) -> None:
    """The FK on ``attackers.identity_id`` must point at ``attacker_identities.uuid``."""
    schema = inspect(get_sync_engine(db_path))

    # Keep only the foreign keys that constrain identity_id.
    matching = []
    for foreign_key in schema.get_foreign_keys("attackers"):
        if "identity_id" in foreign_key["constrained_columns"]:
            matching.append(foreign_key)

    assert matching, "no FK on attackers.identity_id"

    first = matching[0]
    assert first["referred_table"] == "attacker_identities"
    assert first["referred_columns"] == ["uuid"]
def test_identity_schema_version_default_is_1(db_path: str) -> None:
    """
    An identity inserted without an explicit ``schema_version`` must
    read back with version 1.

    The version field exists so that V2 federation gossip can share
    identity vectors across operators; changing feature definitions
    without a version would silently poison receivers.
    """
    with Session(get_sync_engine(db_path)) as session:
        row = AttackerIdentity(uuid=str(uuid.uuid4()))
        session.add(row)
        session.commit()
        # Re-read from the database so the assertion sees the stored value.
        session.refresh(row)
        assert row.schema_version == 1
def test_attacker_can_be_inserted_with_null_identity_id(db_path: str) -> None:
    """
    An ``Attacker`` built without ``identity_id`` must insert cleanly
    and read back with ``identity_id`` as None.

    Existing writers (profiler, correlator) upsert attackers without
    setting the column and have to keep working unchanged; the value
    stays NULL until the clusterer runs.
    """
    seen_at = datetime.now(timezone.utc)
    observation = Attacker(
        uuid=str(uuid.uuid4()),
        ip="203.0.113.4",
        first_seen=seen_at,
        last_seen=seen_at,
    )
    with Session(get_sync_engine(db_path)) as session:
        session.add(observation)
        session.commit()
        session.refresh(observation)
        assert observation.identity_id is None
def test_identity_with_all_clusterer_fields_null(db_path: str) -> None:
    """
    An identity created with only a ``uuid`` must insert, and every
    clusterer-populated column must read back as None.

    The table ships empty, and the clusterer may later write rows
    before fingerprint summaries exist, so each of these columns has
    to accept NULL.
    """
    clusterer_columns = (
        "campaign_id",
        "first_seen_at",
        "last_seen_at",
        "confidence",
        "ja3_hashes",
        "hassh_hashes",
        "payload_simhashes",
        "c2_endpoints",
        "kd_digraph_simhash",
        "merged_into_uuid",
        "notes",
    )
    with Session(get_sync_engine(db_path)) as session:
        row = AttackerIdentity(uuid=str(uuid.uuid4()))
        session.add(row)
        session.commit()
        session.refresh(row)

        for column in clusterer_columns:
            stored = getattr(row, column)
            assert stored is None, (
                f"AttackerIdentity.{column} must default to None — "
                f"the table ships empty pre-clusterer"
            )

        # observation_count is denormalized; defaults to 0 (not NULL).
        assert row.observation_count == 0
def test_attacker_identity_link_round_trip(db_path: str) -> None:
    """
    Smoke test for the link itself: store an identity, store an
    attacker observation whose ``identity_id`` points at it, then
    re-read the observation and check the link survived.

    Works directly on the models; the production repo layer is not
    involved.
    """
    with Session(get_sync_engine(db_path)) as session:
        # The identity is committed first so the FK target already exists.
        actor = AttackerIdentity(uuid=str(uuid.uuid4()))
        session.add(actor)
        session.commit()

        seen_at = datetime.now(timezone.utc)
        observation = Attacker(
            uuid=str(uuid.uuid4()),
            ip="203.0.113.5",
            first_seen=seen_at,
            last_seen=seen_at,
            identity_id=actor.uuid,
        )
        session.add(observation)
        session.commit()
        session.refresh(observation)

        assert observation.identity_id == actor.uuid
def test_identity_id_fk_constraint_blocks_orphans(db_path: str) -> None:
    """
    Inserting an attacker with identity_id pointing at a nonexistent
    identity must fail. The clusterer should never write an orphan
    link; the schema enforces that contract.

    SQLite's PRAGMA foreign_keys is off by default at the connection
    level; we enable it explicitly here so the test reflects the
    contract production code relies on (via the same PRAGMA on its
    connections).

    The raw ``sqlite3`` connection is wrapped in ``contextlib.closing``:
    used directly as a context manager, a ``sqlite3.Connection`` only
    commits or rolls back the transaction and never closes the handle,
    which would leak it into later tests.
    """
    # Local import keeps this fix self-contained within the test.
    from contextlib import closing

    # One timestamp is enough for first_seen, last_seen and updated_at.
    now_iso = datetime.now(timezone.utc).isoformat()
    missing_identity = "ffffffff-ffff-ffff-ffff-ffffffffffff"  # nonexistent identity

    with closing(sqlite3.connect(db_path)) as conn:
        # Must be issued on this connection before the INSERT.
        conn.execute("PRAGMA foreign_keys = ON")
        with pytest.raises(sqlite3.IntegrityError):
            conn.execute(
                "INSERT INTO attackers (uuid, ip, first_seen, last_seen, "
                "event_count, service_count, decky_count, services, deckies, "
                "is_traversal, bounty_count, credential_count, fingerprints, "
                "commands, updated_at, identity_id) VALUES "
                "(?, ?, ?, ?, 0, 0, 0, '[]', '[]', 0, 0, 0, '[]', '[]', ?, ?)",
                (
                    str(uuid.uuid4()),
                    "203.0.113.6",
                    now_iso,
                    now_iso,
                    now_iso,
                    missing_identity,
                ),
            )
            # Kept inside the raises block: if enforcement were deferred,
            # the violation would surface at commit time instead.
            conn.commit()