diff --git a/decnet/web/db/models/__init__.py b/decnet/web/db/models/__init__.py index 10ea3d18..a5338f3d 100644 --- a/decnet/web/db/models/__init__.py +++ b/decnet/web/db/models/__init__.py @@ -185,6 +185,16 @@ from .tarpit import ( TarpitRuleResponse, TarpitStatusResponse, ) +from .ttp import ( + CanaryFingerprintEvidence, + CommandEvidence, + EmailEvidence, + IntelEvidence, + TTPRule, + TTPRuleState, + TTPTag, + compute_tag_uuid, +) __all__ = [ # _base @@ -345,4 +355,13 @@ __all__ = [ "TarpitRule", "TarpitRuleResponse", "TarpitStatusResponse", + # ttp + "CanaryFingerprintEvidence", + "CommandEvidence", + "EmailEvidence", + "IntelEvidence", + "TTPRule", + "TTPRuleState", + "TTPTag", + "compute_tag_uuid", ] diff --git a/decnet/web/db/models/ttp.py b/decnet/web/db/models/ttp.py new file mode 100644 index 00000000..fc22160d --- /dev/null +++ b/decnet/web/db/models/ttp.py @@ -0,0 +1,237 @@ +"""TTP-tagging schema — `ttp_tag`, `ttp_rule`, `ttp_rule_state`. + +Contract step E.1.1 of `development/TTP_TAGGING.md`. Shapes only — no +behavior. Bus topics, ABCs, factories, RuleEngine, lifters, API, repo, +RuleStore land in subsequent contract commits and import from here. +""" +from __future__ import annotations + +import uuid as _uuid +from datetime import datetime, timezone +from typing import Any, Literal, Optional, TypedDict + +from sqlalchemy import JSON, CheckConstraint, Column, Index +from sqlmodel import Field, SQLModel + +from ._base import _BIG_TEXT + + +# Fixed namespace for `compute_tag_uuid()`. Derived once from the URL +# namespace + the literal label "decnet:ttp_tag:v1" so every process +# arrives at the same UUID. Pinned golden value is asserted in +# E.2.2 — DO NOT regenerate it; that would silently break replay +# safety for every existing tag UUID. The label in the comment is the +# input; the constant below is the resulting namespace UUID. 
_TTP_TAG_NS: _uuid.UUID = _uuid.uuid5(_uuid.NAMESPACE_URL, "decnet:ttp_tag:v1")


def compute_tag_uuid(
    source_kind: str,
    source_id: str,
    rule_id: str,
    rule_version: int,
    technique_id: str,
    sub_technique_id: Optional[str],
) -> str:
    """Return the deterministic UUIDv5 string identifying one tag.

    The identity tuple is exactly the six parameters, in signature
    order: `(source_kind, source_id, rule_id, rule_version,
    technique_id, sub_technique_id)`. They are serialized as a single
    `|`-separated key and hashed under `_TTP_TAG_NS`.

    Nothing non-deterministic (`created_at`, a PID, a random salt, …)
    may ever be mixed in: a worker re-processing the same source
    events must arrive at the same UUID, otherwise every replay writes
    duplicate rows. The CDD test in E.2.2 introspects this signature;
    changing the input set requires updating that test deliberately.

    Args:
        source_kind: Discriminator naming the kind of source event.
        source_id: Opaque identifier of the source event.
        rule_id: Identifier of the rule that produced the tag.
        rule_version: Version of that rule; stringified with `str()`.
        technique_id: ATT&CK technique ID, e.g. "T1110".
        sub_technique_id: ATT&CK sub-technique ID, or None.

    Returns:
        The canonical string form of the resulting UUIDv5.

    NOTE(review): segments are joined without escaping, so two
    different tuples whose values contain `|` can serialize to the
    same key, and `None` / `""` for `sub_technique_id` are
    indistinguishable. Changing either would change every emitted
    UUID, so it is documented here rather than altered.
    """
    # A missing (or empty) sub-technique contributes an empty segment.
    sub_segment = sub_technique_id if sub_technique_id else ""
    identity = (
        source_kind,
        source_id,
        rule_id,
        str(rule_version),
        technique_id,
        sub_segment,
    )
    tag_uuid = _uuid.uuid5(_TTP_TAG_NS, "|".join(identity))
    return str(tag_uuid)


# ── Evidence shape contract ─────────────────────────────────────────
# One TypedDict per `source_kind` carried in `TTPTag.evidence`. Adding
# a new `source_kind` means adding a TypedDict here AND a parametrized
# entry in `tests/ttp/test_evidence_shape.py`. The PII discipline
# from the design doc lives in the *type*: `EmailEvidence` has no
# field accommodating raw rcpt addresses or body bytes, so a lifter
# attempting to leak them fails type-check before it can run.
class CommandEvidence(TypedDict):
    """Evidence shape for command-derived tags.

    Presumably selected when `source_kind` denotes a command event;
    the discriminator strings themselves are not defined in this
    module — confirm against the lifter that emits this shape.
    """

    matched_tokens: list[str]
    rule_pattern: str  # regex source string, never user input


class IntelEvidence(TypedDict):
    """Evidence shape for tags derived from a threat-intel lookup.

    `provider` is a closed set: adding a feed means extending the
    `Literal` below.
    """

    intel_uuid: str
    provider: Literal["abuseipdb", "greynoise", "feodo", "threatfox"]
    category: Optional[int]
    score: float  # already normalized to [0.0, 1.0]


class EmailEvidence(TypedDict):
    """Evidence shape for email-derived tags.

    Deliberately carries only hashes, header names, domains and
    counts — there is no field that could hold a raw body or a full
    recipient address.
    """

    body_sha256: str  # hash, never raw body
    matched_headers: list[str]  # header NAMES, not values
    rcpt_domain_set: list[str]  # domains, not addresses
    attachment_sha256s: list[str]
    rcpt_count: int


class CanaryFingerprintEvidence(TypedDict):
    """Evidence shape for canary / browser-fingerprint tags."""

    metric: str  # "navigator_webdriver", "canvas_hash", …
    matched_signature: str  # signature ID, not raw fingerprint blob


# ── Tables ──────────────────────────────────────────────────────────


class TTPTag(SQLModel, table=True):
    """One row per (source-event × MITRE technique × rule).

    Every row must be anchored to an attacker and/or an identity:
    the `ttp_tag_has_anchor` CHECK constraint enforces this in the
    database and `__init__` enforces it again in Python.
    """

    __tablename__ = "ttp_tag"

    # RFC-4122 UUIDv5 string, deterministic over
    # (source_kind, source_id, rule_id, rule_version, technique_id,
    # sub_technique_id) under `_TTP_TAG_NS`. See `compute_tag_uuid()`.
    uuid: str = Field(primary_key=True)

    # Provenance — discriminator + opaque ID. No FK on `source_id`
    # because `source_kind` varies (see design doc "No FK on
    # source_id" + "Retention: tags outlive sources").
    source_kind: str
    source_id: str

    # Scope anchors. CHECK constraint requires at least one set.
    # Both anchors cascade on delete, so removing the referenced
    # attacker / identity row removes its tags as well.
    attacker_uuid: Optional[str] = Field(
        default=None,
        foreign_key="attackers.uuid",
        index=True,
        ondelete="CASCADE",
    )
    identity_uuid: Optional[str] = Field(
        default=None,
        foreign_key="attacker_identities.uuid",
        index=True,
        ondelete="CASCADE",
    )
    # Optional, indexed, no FK declared here.
    session_id: Optional[str] = Field(default=None, index=True)
    decky_id: Optional[str] = Field(default=None, index=True)

    # ATT&CK
    tactic: str = Field(index=True)  # "TA0001".."TA0043" / ICS range
    technique_id: str = Field(index=True)  # "T1110"
    sub_technique_id: Optional[str] = Field(default=None, index=True)

    # Confidence + evidence
    # No range constraint on `confidence` is declared in this model.
    confidence: float
    rule_id: str = Field(index=True)
    rule_version: int

    # Native JSON column, dialect-adaptive (SQLite TEXT, MySQL JSON).
    # No `default=`; every insert MUST supply evidence. Per-source_kind
    # shape is pinned by the TypedDicts above and tested in E.2.1b.
    evidence: dict[str, Any] = Field(
        sa_column=Column(JSON, nullable=False),
    )

    # ATT&CK matrix release the tag was emitted against. REQUIRED —
    # technique IDs migrate between releases; a tag without a release
    # ID cannot render deterministically in MITRE Navigator.
    attack_release: str = Field(index=True)

    # Timezone-aware UTC timestamp taken when the instance is built.
    created_at: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc),
        index=True,
    )

    __table_args__ = (
        # MySQL <8.0.16 parses CHECK but does not enforce it; the
        # __init__ guard below covers that gap.
        CheckConstraint(
            "attacker_uuid IS NOT NULL OR identity_uuid IS NOT NULL",
            name="ttp_tag_has_anchor",
        ),
        # Composite indexes: technique lookups scoped to one identity,
        # scoped to one attacker, and technique ordered by time.
        Index(
            "ix_ttp_tag_identity_technique",
            "identity_uuid",
            "technique_id",
        ),
        Index(
            "ix_ttp_tag_attacker_technique",
            "attacker_uuid",
            "technique_id",
        ),
        Index(
            "ix_ttp_tag_technique_created",
            "technique_id",
            "created_at",
        ),
    )

    def __init__(self, **kwargs: Any) -> None:
        """Build a tag, rejecting rows that have no anchor.

        Args:
            **kwargs: Field values, forwarded unchanged to
                `SQLModel.__init__` once the anchor check passes.

        Raises:
            ValueError: If both `attacker_uuid` and `identity_uuid`
                are absent from `kwargs` or are None.
        """
        # Belt-and-braces for MySQL <8.0.16 where CHECK is silently
        # ignored. This guard runs BEFORE super().__init__() — i.e.
        # before Pydantic field validation — so the failure surfaces
        # as a plain `ValueError` with both anchor names in the
        # message, not as a generic `ValidationError`. The CDD test
        # in E.2.1 introspects this ordering and asserts the message
        # contains both substrings; do not "simplify" into a
        # `@field_validator` or generic `assert`.
        #
        # The check inspects constructor kwargs only: attributes
        # assigned after construction are not re-checked here.
        #
        # NOTE(review): because the check reads only `kwargs`, any
        # code path that invokes `TTPTag()` with no arguments raises
        # here. SQLModel is believed to do exactly that inside
        # `model_validate()` for table models — confirm against the
        # pinned SQLModel version before relying on
        # `TTPTag.model_validate(...)`.
        if (
            kwargs.get("attacker_uuid") is None
            and kwargs.get("identity_uuid") is None
        ):
            raise ValueError(
                "ttp_tag requires at least one of attacker_uuid / "
                "identity_uuid; both NULL is not a valid anchor."
            )
        super().__init__(**kwargs)


class TTPRule(SQLModel, table=True):
    """Rule definition mirror — populated by DatabaseRuleStore from
    on-disk YAML; FilesystemRuleStore reads disk directly and never
    writes here. One row per rule_id."""

    __tablename__ = "ttp_rule"

    rule_id: str = Field(primary_key=True)
    rule_version: int
    source_path: str
    # Stored in the `_BIG_TEXT` column type imported from `._base`
    # (presumably a large-text type — see that module); NOT NULL.
    yaml_content: str = Field(
        sa_column=Column("yaml_content", _BIG_TEXT, nullable=False),
    )
    # Defaults to the UTC time at which the instance is built; nothing
    # in this model refreshes it on later updates.
    updated_at: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc),
    )
    # Operator who pushed the edit. "filesystem" / "git" for the FS
    # store, the admin JWT subject for the DB store.
    updated_by: str


class TTPRuleState(SQLModel, table=True):
    """Operational state for a rule (enabled / disabled / clipped).

    Separate from TTPRule because state has fast lifecycle (operator
    hits a disable button) while definition has slow lifecycle (git
    commit + review). The engine merges (definition, state) at
    compile time.
    """

    __tablename__ = "ttp_rule_state"

    # No FK to `ttp_rule.rule_id` is declared on this column.
    rule_id: str = Field(primary_key=True)
    # Plain string column: the three values below are a convention,
    # not enforced by a constraint in this model.
    state: str  # "enabled" | "disabled" | "clipped"
    # Presumably the confidence ceiling applied while `state` is
    # "clipped" — confirm against the engine that consumes it.
    confidence_max: Optional[float] = Field(default=None)
    # Presumably when this state stops applying — confirm.
    expires_at: Optional[datetime] = Field(default=None)
    reason: Optional[str] = Field(default=None)
    set_by: Optional[str] = Field(default=None)
    # Defaults to the UTC time at which the instance is built.
    set_at: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc),
    )


# ── Remainder of the last SOURCE line: next hunk of the same diff ───
# Not Python. Kept verbatim (as comments) so no content is dropped;
# the hunk is cut off at the end of the visible chunk.
#
# diff --git a/development/TTP_TAGGING.md b/development/TTP_TAGGING.md
# index 1610a9b2..956ed1d9 100644
# --- a/development/TTP_TAGGING.md
# +++ b/development/TTP_TAGGING.md
# @@ -2221,6 +2221,8 @@ Contracts ship in this order, one commit per step:
#
#  **E.1.1 — Schema contract** (`decnet/web/db/models/ttp.py`)
#
# +**Status:** ✅ done.
# +
#  - `TTPTag` SQLModel with the schema from "Schema" section above, including:
#    `evidence` as `dict[str, Any]` over a SQLAlchemy JSON column
#    (`Column(JSON, nullable=False)`); `attack_release` as