First contract commit of TTP tagging. Shapes only — no behavior.
- TTPTag SQLModel: deterministic UUIDv5 PK; (source_kind, source_id)
discriminated provenance; nullable attacker_uuid + identity_uuid
with ON DELETE CASCADE; native sqlalchemy.JSON evidence column;
required attack_release; CheckConstraint('attacker_uuid IS NOT
NULL OR identity_uuid IS NOT NULL'); composite indexes for the
primary query patterns (identity_uuid+technique_id,
attacker_uuid+technique_id, technique_id+created_at); __init__
guard raising ValueError with both anchor names in the message
(belt-and-braces for MySQL <8.0.16 where CHECK is silent).
- compute_tag_uuid(): RFC-4122 UUIDv5 over the six tag-identity
fields under a fixed _TTP_TAG_NS. Pure, deterministic, replay-safe.
- Per-source_kind evidence TypedDicts (CommandEvidence,
IntelEvidence, EmailEvidence, CanaryFingerprintEvidence) — PII
rule lives in the type: EmailEvidence has no field for raw rcpt
addresses or body bytes.
- TTPRule + TTPRuleState tables for the DatabaseRuleStore (E.1.11).
- All symbols re-exported from decnet.web.db.models per the
package's existing convention.
Tests for invariants (CHECK behavior, evidence round-trip across
SQLite+MySQL, idempotency property, init-guard ordering) land in
E.2.1/E.2.2 with xfail-strict markers per Appendix E discipline.
238 lines
8.4 KiB
Python
238 lines
8.4 KiB
Python
"""TTP-tagging schema — `ttp_tag`, `ttp_rule`, `ttp_rule_state`.
|
||
|
||
Contract step E.1.1 of `development/TTP_TAGGING.md`. Shapes only — no
|
||
behavior. Bus topics, ABCs, factories, RuleEngine, lifters, API, repo,
|
||
RuleStore land in subsequent contract commits and import from here.
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import uuid as _uuid
|
||
from datetime import datetime, timezone
|
||
from typing import Any, Literal, Optional, TypedDict
|
||
|
||
from sqlalchemy import JSON, CheckConstraint, Column, Index
|
||
from sqlmodel import Field, SQLModel
|
||
|
||
from ._base import _BIG_TEXT
|
||
|
||
|
||
# Fixed namespace for `compute_tag_uuid()`. Derived once from the URL
|
||
# namespace + the literal label "decnet:ttp_tag:v1" so every process
|
||
# arrives at the same UUID. Pinned golden value is asserted in
|
||
# E.2.2 — DO NOT regenerate it; that would silently break replay
|
||
# safety for every existing tag UUID. The label in the comment is the
|
||
# input; the constant below is the resulting namespace UUID.
|
||
_TTP_TAG_NS: _uuid.UUID = _uuid.uuid5(_uuid.NAMESPACE_URL, "decnet:ttp_tag:v1")
|
||
|
||
|
||
def compute_tag_uuid(
|
||
source_kind: str,
|
||
source_id: str,
|
||
rule_id: str,
|
||
rule_version: int,
|
||
technique_id: str,
|
||
sub_technique_id: Optional[str],
|
||
) -> str:
|
||
"""Deterministic UUIDv5 over the tag-identity tuple.
|
||
|
||
Inputs are EXACTLY the six fields enumerated in the parameter list
|
||
— `(source_kind, source_id, rule_id, rule_version, technique_id,
|
||
sub_technique_id)`. Adding `created_at`, a process PID, a random
|
||
salt, or any other non-deterministic input breaks replay safety:
|
||
the worker re-processing the same source events would write
|
||
duplicate rows on every run. The CDD test in E.2.2 introspects
|
||
this signature; a contributor must update that test deliberately
|
||
to change the input set.
|
||
"""
|
||
key = "|".join([
|
||
source_kind,
|
||
source_id,
|
||
rule_id,
|
||
str(rule_version),
|
||
technique_id,
|
||
sub_technique_id or "",
|
||
])
|
||
return str(_uuid.uuid5(_TTP_TAG_NS, key))
|
||
|
||
|
||
# ── Evidence shape contract ─────────────────────────────────────────
|
||
# One TypedDict per `source_kind` carried in `TTPTag.evidence`. Adding
|
||
# a new `source_kind` means adding a TypedDict here AND a parametrized
|
||
# entry in `tests/ttp/test_evidence_shape.py`. The PII discipline
|
||
# from the design doc lives in the *type*: `EmailEvidence` has no
|
||
# field accommodating raw rcpt addresses or body bytes, so a lifter
|
||
# attempting to leak them fails type-check before it can run.
|
||
|
||
class CommandEvidence(TypedDict):
|
||
matched_tokens: list[str]
|
||
rule_pattern: str # regex source string, never user input
|
||
|
||
|
||
class IntelEvidence(TypedDict):
|
||
intel_uuid: str
|
||
provider: Literal["abuseipdb", "greynoise", "feodo", "threatfox"]
|
||
category: Optional[int]
|
||
score: float # already normalized to [0.0, 1.0]
|
||
|
||
|
||
class EmailEvidence(TypedDict):
|
||
body_sha256: str # hash, never raw body
|
||
matched_headers: list[str] # header NAMES, not values
|
||
rcpt_domain_set: list[str] # domains, not addresses
|
||
attachment_sha256s: list[str]
|
||
rcpt_count: int
|
||
|
||
|
||
class CanaryFingerprintEvidence(TypedDict):
|
||
metric: str # "navigator_webdriver", "canvas_hash", …
|
||
matched_signature: str # signature ID, not raw fingerprint blob
|
||
|
||
|
||
# ── Tables ──────────────────────────────────────────────────────────
|
||
|
||
|
||
class TTPTag(SQLModel, table=True):
|
||
"""One row per (source-event × MITRE technique × rule)."""
|
||
|
||
__tablename__ = "ttp_tag"
|
||
|
||
# RFC-4122 UUIDv5 string, deterministic over
|
||
# (source_kind, source_id, rule_id, rule_version, technique_id,
|
||
# sub_technique_id) under `_TTP_TAG_NS`. See `compute_tag_uuid()`.
|
||
uuid: str = Field(primary_key=True)
|
||
|
||
# Provenance — discriminator + opaque ID. No FK on `source_id`
|
||
# because `source_kind` varies (see design doc "No FK on
|
||
# source_id" + "Retention: tags outlive sources").
|
||
source_kind: str
|
||
source_id: str
|
||
|
||
# Scope anchors. CHECK constraint requires at least one set.
|
||
attacker_uuid: Optional[str] = Field(
|
||
default=None,
|
||
foreign_key="attackers.uuid",
|
||
index=True,
|
||
ondelete="CASCADE",
|
||
)
|
||
identity_uuid: Optional[str] = Field(
|
||
default=None,
|
||
foreign_key="attacker_identities.uuid",
|
||
index=True,
|
||
ondelete="CASCADE",
|
||
)
|
||
session_id: Optional[str] = Field(default=None, index=True)
|
||
decky_id: Optional[str] = Field(default=None, index=True)
|
||
|
||
# ATT&CK
|
||
tactic: str = Field(index=True) # "TA0001".."TA0043" / ICS range
|
||
technique_id: str = Field(index=True) # "T1110"
|
||
sub_technique_id: Optional[str] = Field(default=None, index=True)
|
||
|
||
# Confidence + evidence
|
||
confidence: float
|
||
rule_id: str = Field(index=True)
|
||
rule_version: int
|
||
|
||
# Native JSON column, dialect-adaptive (SQLite TEXT, MySQL JSON).
|
||
# No `default=`; every insert MUST supply evidence. Per-source_kind
|
||
# shape is pinned by the TypedDicts above and tested in E.2.1b.
|
||
evidence: dict[str, Any] = Field(
|
||
sa_column=Column(JSON, nullable=False),
|
||
)
|
||
|
||
# ATT&CK matrix release the tag was emitted against. REQUIRED —
|
||
# technique IDs migrate between releases; a tag without a release
|
||
# ID cannot render deterministically in MITRE Navigator.
|
||
attack_release: str = Field(index=True)
|
||
|
||
created_at: datetime = Field(
|
||
default_factory=lambda: datetime.now(timezone.utc),
|
||
index=True,
|
||
)
|
||
|
||
__table_args__ = (
|
||
# MySQL <8.0.16 parses CHECK but does not enforce it; the
|
||
# __init__ guard below covers that gap.
|
||
CheckConstraint(
|
||
"attacker_uuid IS NOT NULL OR identity_uuid IS NOT NULL",
|
||
name="ttp_tag_has_anchor",
|
||
),
|
||
Index(
|
||
"ix_ttp_tag_identity_technique",
|
||
"identity_uuid",
|
||
"technique_id",
|
||
),
|
||
Index(
|
||
"ix_ttp_tag_attacker_technique",
|
||
"attacker_uuid",
|
||
"technique_id",
|
||
),
|
||
Index(
|
||
"ix_ttp_tag_technique_created",
|
||
"technique_id",
|
||
"created_at",
|
||
),
|
||
)
|
||
|
||
def __init__(self, **kwargs: Any) -> None:
|
||
# Belt-and-braces for MySQL <8.0.16 where CHECK is silently
|
||
# ignored. This guard runs BEFORE super().__init__() — i.e.
|
||
# before Pydantic field validation — so the failure surfaces
|
||
# as a plain `ValueError` with both anchor names in the
|
||
# message, not as a generic `ValidationError`. The CDD test
|
||
# in E.2.1 introspects this ordering and asserts the message
|
||
# contains both substrings; do not "simplify" into a
|
||
# `@field_validator` or generic `assert`.
|
||
if (
|
||
kwargs.get("attacker_uuid") is None
|
||
and kwargs.get("identity_uuid") is None
|
||
):
|
||
raise ValueError(
|
||
"ttp_tag requires at least one of attacker_uuid / "
|
||
"identity_uuid; both NULL is not a valid anchor."
|
||
)
|
||
super().__init__(**kwargs)
|
||
|
||
|
||
class TTPRule(SQLModel, table=True):
|
||
"""Rule definition mirror — populated by DatabaseRuleStore from
|
||
on-disk YAML; FilesystemRuleStore reads disk directly and never
|
||
writes here. One row per rule_id."""
|
||
|
||
__tablename__ = "ttp_rule"
|
||
|
||
rule_id: str = Field(primary_key=True)
|
||
rule_version: int
|
||
source_path: str
|
||
yaml_content: str = Field(
|
||
sa_column=Column("yaml_content", _BIG_TEXT, nullable=False),
|
||
)
|
||
updated_at: datetime = Field(
|
||
default_factory=lambda: datetime.now(timezone.utc),
|
||
)
|
||
# Operator who pushed the edit. "filesystem" / "git" for the FS
|
||
# store, the admin JWT subject for the DB store.
|
||
updated_by: str
|
||
|
||
|
||
class TTPRuleState(SQLModel, table=True):
|
||
"""Operational state for a rule (enabled / disabled / clipped).
|
||
|
||
Separate from TTPRule because state has fast lifecycle (operator
|
||
hits a disable button) while definition has slow lifecycle (git
|
||
commit + review). The engine merges (definition, state) at
|
||
compile time.
|
||
"""
|
||
|
||
__tablename__ = "ttp_rule_state"
|
||
|
||
rule_id: str = Field(primary_key=True)
|
||
state: str # "enabled" | "disabled" | "clipped"
|
||
confidence_max: Optional[float] = Field(default=None)
|
||
expires_at: Optional[datetime] = Field(default=None)
|
||
reason: Optional[str] = Field(default=None)
|
||
set_by: Optional[str] = Field(default=None)
|
||
set_at: datetime = Field(
|
||
default_factory=lambda: datetime.now(timezone.utc),
|
||
)
|