feat(ttp): E.1.1 schema contract — TTPTag, TTPRule, TTPRuleState, evidence TypedDicts, compute_tag_uuid

First contract commit of TTP tagging. Shapes only — no behavior.

- TTPTag SQLModel: deterministic UUIDv5 PK; (source_kind, source_id)
  discriminated provenance; nullable attacker_uuid + identity_uuid
  with ON DELETE CASCADE; native sqlalchemy.JSON evidence column;
  required attack_release; CheckConstraint('attacker_uuid IS NOT
  NULL OR identity_uuid IS NOT NULL'); composite indexes for the
  primary query patterns (identity_uuid+technique_id,
  attacker_uuid+technique_id, technique_id+created_at); __init__
  guard raising ValueError with both anchor names in the message
  (belt-and-braces for MySQL <8.0.16 where CHECK is silent).
- compute_tag_uuid(): RFC-4122 UUIDv5 over the six tag-identity
  fields under a fixed _TTP_TAG_NS. Pure, deterministic, replay-safe.
- Per-source_kind evidence TypedDicts (CommandEvidence,
  IntelEvidence, EmailEvidence, CanaryFingerprintEvidence) — PII
  rule lives in the type: EmailEvidence has no field for raw rcpt
  addresses or body bytes.
- TTPRule + TTPRuleState tables for the DatabaseRuleStore (E.1.11).
- All symbols re-exported from decnet.web.db.models per the
  package's existing convention.

Tests for invariants (CHECK behavior, evidence round-trip across
SQLite+MySQL, idempotency property, init-guard ordering) land in
E.2.1/E.2.2 with xfail-strict markers per Appendix E discipline.
This commit is contained in:
2026-05-01 06:03:45 -04:00
parent d09764beec
commit ce7efdfdd2
3 changed files with 258 additions and 0 deletions

View File

@@ -185,6 +185,16 @@ from .tarpit import (
TarpitRuleResponse,
TarpitStatusResponse,
)
from .ttp import (
CanaryFingerprintEvidence,
CommandEvidence,
EmailEvidence,
IntelEvidence,
TTPRule,
TTPRuleState,
TTPTag,
compute_tag_uuid,
)
__all__ = [
# _base
@@ -345,4 +355,13 @@ __all__ = [
"TarpitRule",
"TarpitRuleResponse",
"TarpitStatusResponse",
# ttp
"CanaryFingerprintEvidence",
"CommandEvidence",
"EmailEvidence",
"IntelEvidence",
"TTPRule",
"TTPRuleState",
"TTPTag",
"compute_tag_uuid",
]

237
decnet/web/db/models/ttp.py Normal file
View File

@@ -0,0 +1,237 @@
"""TTP-tagging schema — `ttp_tag`, `ttp_rule`, `ttp_rule_state`.
Contract step E.1.1 of `development/TTP_TAGGING.md`. Shapes only — no
behavior. Bus topics, ABCs, factories, RuleEngine, lifters, API, repo,
RuleStore land in subsequent contract commits and import from here.
"""
from __future__ import annotations
import uuid as _uuid
from datetime import datetime, timezone
from typing import Any, Literal, Optional, TypedDict
from sqlalchemy import JSON, CheckConstraint, Column, Index
from sqlmodel import Field, SQLModel
from ._base import _BIG_TEXT
# Fixed namespace for `compute_tag_uuid()`. Derived once from the URL
# namespace + the literal label "decnet:ttp_tag:v1" so every process
# arrives at the same UUID. Pinned golden value is asserted in
# E.2.2 — DO NOT regenerate it; that would silently break replay
# safety for every existing tag UUID. The label in the comment is the
# input; the constant below is the resulting namespace UUID.
_TTP_TAG_NS: _uuid.UUID = _uuid.uuid5(_uuid.NAMESPACE_URL, "decnet:ttp_tag:v1")
def compute_tag_uuid(
source_kind: str,
source_id: str,
rule_id: str,
rule_version: int,
technique_id: str,
sub_technique_id: Optional[str],
) -> str:
"""Deterministic UUIDv5 over the tag-identity tuple.
Inputs are EXACTLY the six fields enumerated in the parameter list
— `(source_kind, source_id, rule_id, rule_version, technique_id,
sub_technique_id)`. Adding `created_at`, a process PID, a random
salt, or any other non-deterministic input breaks replay safety:
the worker re-processing the same source events would write
duplicate rows on every run. The CDD test in E.2.2 introspects
this signature; a contributor must update that test deliberately
to change the input set.
"""
key = "|".join([
source_kind,
source_id,
rule_id,
str(rule_version),
technique_id,
sub_technique_id or "",
])
return str(_uuid.uuid5(_TTP_TAG_NS, key))
# ── Evidence shape contract ─────────────────────────────────────────
# One TypedDict per `source_kind` carried in `TTPTag.evidence`. Adding
# a new `source_kind` means adding a TypedDict here AND a parametrized
# entry in `tests/ttp/test_evidence_shape.py`. The PII discipline
# from the design doc lives in the *type*: `EmailEvidence` has no
# field accommodating raw rcpt addresses or body bytes, so a lifter
# attempting to leak them fails type-check before it can run.
class CommandEvidence(TypedDict):
matched_tokens: list[str]
rule_pattern: str # regex source string, never user input
class IntelEvidence(TypedDict):
intel_uuid: str
provider: Literal["abuseipdb", "greynoise", "feodo", "threatfox"]
category: Optional[int]
score: float # already normalized to [0.0, 1.0]
class EmailEvidence(TypedDict):
body_sha256: str # hash, never raw body
matched_headers: list[str] # header NAMES, not values
rcpt_domain_set: list[str] # domains, not addresses
attachment_sha256s: list[str]
rcpt_count: int
class CanaryFingerprintEvidence(TypedDict):
metric: str # "navigator_webdriver", "canvas_hash", …
matched_signature: str # signature ID, not raw fingerprint blob
# ── Tables ──────────────────────────────────────────────────────────
class TTPTag(SQLModel, table=True):
"""One row per (source-event × MITRE technique × rule)."""
__tablename__ = "ttp_tag"
# RFC-4122 UUIDv5 string, deterministic over
# (source_kind, source_id, rule_id, rule_version, technique_id,
# sub_technique_id) under `_TTP_TAG_NS`. See `compute_tag_uuid()`.
uuid: str = Field(primary_key=True)
# Provenance — discriminator + opaque ID. No FK on `source_id`
# because `source_kind` varies (see design doc "No FK on
# source_id" + "Retention: tags outlive sources").
source_kind: str
source_id: str
# Scope anchors. CHECK constraint requires at least one set.
attacker_uuid: Optional[str] = Field(
default=None,
foreign_key="attackers.uuid",
index=True,
ondelete="CASCADE",
)
identity_uuid: Optional[str] = Field(
default=None,
foreign_key="attacker_identities.uuid",
index=True,
ondelete="CASCADE",
)
session_id: Optional[str] = Field(default=None, index=True)
decky_id: Optional[str] = Field(default=None, index=True)
# ATT&CK
tactic: str = Field(index=True) # "TA0001".."TA0043" / ICS range
technique_id: str = Field(index=True) # "T1110"
sub_technique_id: Optional[str] = Field(default=None, index=True)
# Confidence + evidence
confidence: float
rule_id: str = Field(index=True)
rule_version: int
# Native JSON column, dialect-adaptive (SQLite TEXT, MySQL JSON).
# No `default=`; every insert MUST supply evidence. Per-source_kind
# shape is pinned by the TypedDicts above and tested in E.2.1b.
evidence: dict[str, Any] = Field(
sa_column=Column(JSON, nullable=False),
)
# ATT&CK matrix release the tag was emitted against. REQUIRED —
# technique IDs migrate between releases; a tag without a release
# ID cannot render deterministically in MITRE Navigator.
attack_release: str = Field(index=True)
created_at: datetime = Field(
default_factory=lambda: datetime.now(timezone.utc),
index=True,
)
__table_args__ = (
# MySQL <8.0.16 parses CHECK but does not enforce it; the
# __init__ guard below covers that gap.
CheckConstraint(
"attacker_uuid IS NOT NULL OR identity_uuid IS NOT NULL",
name="ttp_tag_has_anchor",
),
Index(
"ix_ttp_tag_identity_technique",
"identity_uuid",
"technique_id",
),
Index(
"ix_ttp_tag_attacker_technique",
"attacker_uuid",
"technique_id",
),
Index(
"ix_ttp_tag_technique_created",
"technique_id",
"created_at",
),
)
def __init__(self, **kwargs: Any) -> None:
# Belt-and-braces for MySQL <8.0.16 where CHECK is silently
# ignored. This guard runs BEFORE super().__init__() — i.e.
# before Pydantic field validation — so the failure surfaces
# as a plain `ValueError` with both anchor names in the
# message, not as a generic `ValidationError`. The CDD test
# in E.2.1 introspects this ordering and asserts the message
# contains both substrings; do not "simplify" into a
# `@field_validator` or generic `assert`.
if (
kwargs.get("attacker_uuid") is None
and kwargs.get("identity_uuid") is None
):
raise ValueError(
"ttp_tag requires at least one of attacker_uuid / "
"identity_uuid; both NULL is not a valid anchor."
)
super().__init__(**kwargs)
class TTPRule(SQLModel, table=True):
"""Rule definition mirror — populated by DatabaseRuleStore from
on-disk YAML; FilesystemRuleStore reads disk directly and never
writes here. One row per rule_id."""
__tablename__ = "ttp_rule"
rule_id: str = Field(primary_key=True)
rule_version: int
source_path: str
yaml_content: str = Field(
sa_column=Column("yaml_content", _BIG_TEXT, nullable=False),
)
updated_at: datetime = Field(
default_factory=lambda: datetime.now(timezone.utc),
)
# Operator who pushed the edit. "filesystem" / "git" for the FS
# store, the admin JWT subject for the DB store.
updated_by: str
class TTPRuleState(SQLModel, table=True):
"""Operational state for a rule (enabled / disabled / clipped).
Separate from TTPRule because state has fast lifecycle (operator
hits a disable button) while definition has slow lifecycle (git
commit + review). The engine merges (definition, state) at
compile time.
"""
__tablename__ = "ttp_rule_state"
rule_id: str = Field(primary_key=True)
state: str # "enabled" | "disabled" | "clipped"
confidence_max: Optional[float] = Field(default=None)
expires_at: Optional[datetime] = Field(default=None)
reason: Optional[str] = Field(default=None)
set_by: Optional[str] = Field(default=None)
set_at: datetime = Field(
default_factory=lambda: datetime.now(timezone.utc),
)

View File

@@ -2221,6 +2221,8 @@ Contracts ship in this order, one commit per step:
**E.1.1 — Schema contract** (`decnet/web/db/models/ttp.py`)
**Status:** ✅ done.
- `TTPTag` SQLModel with the schema from "Schema" section above,
including: `evidence` as `dict[str, Any]` over a SQLAlchemy JSON
column (`Column(JSON, nullable=False)`); `attack_release` as