diff --git a/decnet/ttp/misp_export.py b/decnet/ttp/misp_export.py index 83f4d3ad..6863fbfa 100644 --- a/decnet/ttp/misp_export.py +++ b/decnet/ttp/misp_export.py @@ -47,6 +47,7 @@ def build_attacker_misp_event( artifacts: list[dict[str, Any]], smtp_targets: list[dict[str, Any]], commands: list[str] | None = None, + observations: list[dict[str, Any]] | None = None, ) -> dict[str, Any]: """Return a MISP event dict for *attacker*. @@ -63,6 +64,7 @@ def build_attacker_misp_event( artifacts=artifacts, smtp_targets=smtp_targets, commands=commands, + observations=observations, ) return _parse_bundle(bundle) @@ -70,6 +72,7 @@ def build_attacker_misp_event( def build_fleet_misp_collection( rows: list[dict[str, Any]], ttp_by_attacker: dict[str, list[dict[str, Any]]], + observations_by_attacker: dict[str, list[dict[str, Any]]] | None = None, ) -> dict[str, Any]: """Return a MISP collection dict with one event per attacker in *rows*. @@ -80,6 +83,7 @@ def build_fleet_misp_collection( attacker always has at least an IP) are silently omitted. """ events: list[dict[str, Any]] = [] + obs_map = observations_by_attacker or {} for row in rows: raw_cmds = row.get("commands") or [] if isinstance(raw_cmds, str): @@ -102,6 +106,7 @@ def build_fleet_misp_collection( artifacts=[], smtp_targets=[], commands=cmds, + observations=obs_map.get(row["uuid"]), ) event = _parse_bundle(bundle) if event: diff --git a/decnet/ttp/stix_custom.py b/decnet/ttp/stix_custom.py new file mode 100644 index 00000000..70a492bd --- /dev/null +++ b/decnet/ttp/stix_custom.py @@ -0,0 +1,100 @@ +"""DECNET-defined STIX 2.1 custom extension and object types. + +Import this module before parsing any DECNET-produced bundle so the types are +registered with the stix2 library and ``stix2.parse(bundle, allow_custom=True)`` +rebuilds them as typed objects rather than opaque dicts. + +Classes +------- +DecnetActorFingerprintExt + ``@CustomExtension`` on ThreatActor — carries ``network_behavior`` + (TCP/TLS/SSH sniffer rollup) and ``protocol_fingerprints`` (hashes + + raw orderings). +XDecnetBehaveProfile + ``@CustomObject`` — autonomous STIX SDO carrying the BEHAVE-SHELL + observation stream for one attacker. Referenced from ThreatActor via + ``x_decnet_behave_profile_ref``. + +Constants +--------- +ACTOR_FINGERPRINT_EXT_ID : str + Fixed ``extension-definition--`` ID for ``DecnetActorFingerprintExt``. +FINGERPRINT_EXT_DEF : stix2.ExtensionDefinition + Singleton ``extension-definition`` SDO — add to every bundle that uses + the fingerprint extension. +""" +from __future__ import annotations + +import uuid as _uuid + +import stix2 +from stix2 import CustomExtension, CustomObject, ExtensionDefinition +from stix2 import properties as _P + +_NS = _uuid.UUID("b5d2c3a1-8f4e-4d1b-9a6c-0e7f5b3d2c1a") + +# Stable ID for the actor fingerprint extension-definition SDO. +ACTOR_FINGERPRINT_EXT_ID: str = ( + f"extension-definition--{_uuid.uuid5(_NS, 'decnet-actor-fingerprint-v1')}" +) + +_DECNET_ORG_ID = f"identity--{_uuid.uuid5(_NS, 'decnet-honeypot')}" + + +@CustomExtension( + ACTOR_FINGERPRINT_EXT_ID, + [ + ("extension_type", _P.StringProperty(required=True)), + ("network_behavior", _P.DictionaryProperty()), + ("protocol_fingerprints", _P.DictionaryProperty()), + ], +) +class DecnetActorFingerprintExt: + """Property extension on ThreatActor. + + ``network_behavior`` keys: os_guess, hop_distance, tcp_fingerprint, + retransmit_count, timing_stats, phase_sequence, behavior_class, + beacon_interval_s, beacon_jitter_pct, tool_guesses. + + ``protocol_fingerprints`` keys: ja3_hashes, hassh_hashes, kex_order_raw, + ssh_client_banners, tls_cert_sha256, payload_simhashes, c2_endpoints. + """ + + +@CustomObject( + "x-decnet-behave-profile", + [ + ("schema_version", _P.IntegerProperty()), + ("kd_digraph_simhash", _P.StringProperty()), + ("observations", _P.ListProperty(_P.DictionaryProperty())), + ], +) +class XDecnetBehaveProfile: + """BEHAVE-SHELL observation stream for one attacker. + + ``observations`` is a list of BEHAVE envelope dicts with keys: + primitive, value, confidence, window (start_ts/end_ts), source, + evidence_ref, identity_ref (optional). + + ``schema_version`` matches ``OBSERVATION_SCHEMA_VERSION`` from + decnet_behave_shell.spec.envelope — bump when the envelope schema changes. + + ``kd_digraph_simhash`` is the 8-byte digraph SimHash from + AttackerIdentity, hex-encoded. Null when identity has not been clustered. + """ + + +# Singleton extension-definition SDO. +FINGERPRINT_EXT_DEF: stix2.ExtensionDefinition = ExtensionDefinition( + id=ACTOR_FINGERPRINT_EXT_ID, + name="DECNET Actor Fingerprint", + description=( + "Extends ThreatActor with DECNET-observed network behavior " + "(TCP/TLS/SSH stack-level fingerprints, IAT timing, phase sequence) " + "and BEHAVE-SHELL keystroke-dynamics observation primitives." + ), + schema="https://decnet.dev/schemas/actor-fingerprint/v1", + version="1.0.0", + extension_types=["property-extension"], + created_by_ref=_DECNET_ORG_ID, +) diff --git a/decnet/ttp/stix_export.py b/decnet/ttp/stix_export.py index 5d36b450..433ba78e 100644 --- a/decnet/ttp/stix_export.py +++ b/decnet/ttp/stix_export.py @@ -23,6 +23,7 @@ public ATT&CK bundle by any consumer that already has it. """ from __future__ import annotations +import base64 import json import uuid as _uuid from datetime import datetime, timezone @@ -31,6 +32,12 @@ from typing import Any import stix2 from decnet.ttp import attack_stix +from decnet.ttp.stix_custom import ( + ACTOR_FINGERPRINT_EXT_ID, + FINGERPRINT_EXT_DEF, + DecnetActorFingerprintExt, + XDecnetBehaveProfile, +) # Deterministic DECNET org identity ID — stable across all bundles this # instance produces. Consumers can correlate across exports. @@ -57,15 +64,32 @@ def _decnet_org() -> stix2.Identity: ) +def _parse_json_field(v: Any) -> Any: + """JSON-decode strings; return non-strings unchanged.""" + if isinstance(v, str): + try: + return json.loads(v) + except Exception: + return v + return v + + def _threat_actor( attacker: dict[str, Any], identity: dict[str, Any] | None, created_by: str, -) -> stix2.ThreatActor: + behavior: dict[str, Any] | None = None, + observations: list[dict[str, Any]] | None = None, +) -> tuple[stix2.ThreatActor, "XDecnetBehaveProfile | None"]: + """Build a ThreatActor SDO plus an optional XDecnetBehaveProfile SDO. + + Returns ``(threat_actor, behave_profile_or_None)``. + """ if identity: name = f"DECNET-identity-{identity['uuid'][:8]}" else: name = f"DECNET-attacker-{attacker['uuid'][:8]}" + kwargs: dict[str, Any] = dict( id=f"threat-actor--{_uuid.uuid5(_NS, attacker['uuid'])}", name=name, @@ -73,20 +97,84 @@ def _threat_actor( created_by_ref=created_by, allow_custom=True, ) + + # Tier 1 — stable scalars if attacker.get("country_code"): kwargs["x_decnet_country_code"] = attacker["country_code"] if attacker.get("asn"): kwargs["x_decnet_asn"] = attacker["asn"] if attacker.get("as_name"): kwargs["x_decnet_as_name"] = attacker["as_name"] + + # Tier 2 — DecnetActorFingerprintExt (network_behavior + protocol_fingerprints) + network_behavior: dict[str, Any] = {} + protocol_fingerprints: dict[str, Any] = {} + + if behavior: + for key in ("os_guess", "hop_distance", "retransmit_count", + "behavior_class", "beacon_interval_s", "beacon_jitter_pct"): + v = behavior.get(key) + if v is not None: + network_behavior[key] = v + for key in ("tcp_fingerprint", "timing_stats", "phase_sequence", "tool_guesses"): + v = _parse_json_field(behavior.get(key)) + if v: + network_behavior[key] = v + for key in ("kex_order_raw", "ssh_client_banners"): + v = _parse_json_field(behavior.get(key)) + if v: + protocol_fingerprints[key] = v + if identity: - if identity.get("ja3_hashes"): - kwargs["x_decnet_ja3_hashes"] = identity["ja3_hashes"] - if identity.get("hassh_hashes"): - kwargs["x_decnet_hassh_hashes"] = identity["hassh_hashes"] + for key in ("ja3_hashes", "hassh_hashes", "tls_cert_sha256", "payload_simhashes"): + v = _parse_json_field(identity.get(key)) + if v: + protocol_fingerprints[key] = v if identity.get("c2_endpoints"): - kwargs["x_decnet_c2_endpoints"] = identity["c2_endpoints"] - return stix2.ThreatActor(**kwargs) + protocol_fingerprints["c2_endpoints"] = _parse_json_field( + identity["c2_endpoints"] + ) + + if network_behavior or protocol_fingerprints: + ext_kwargs: dict[str, Any] = {"extension_type": "property-extension"} + if network_behavior: + ext_kwargs["network_behavior"] = network_behavior + if protocol_fingerprints: + ext_kwargs["protocol_fingerprints"] = protocol_fingerprints + kwargs["extensions"] = { + ACTOR_FINGERPRINT_EXT_ID: DecnetActorFingerprintExt(**ext_kwargs), + } + + # Tier 3 — XDecnetBehaveProfile (BEHAVE observations) + behave_profile: XDecnetBehaveProfile | None = None + kd_hash: str | None = None + if identity: + raw_kd = identity.get("kd_digraph_simhash") + if raw_kd is not None: + if isinstance(raw_kd, (bytes, bytearray)): + kd_hash = raw_kd.hex() + elif isinstance(raw_kd, str) and raw_kd: + try: + kd_hash = base64.b64decode(raw_kd).hex() + except Exception: + kd_hash = raw_kd + + obs_list = observations or [] + if obs_list or kd_hash is not None: + from decnet_behave_shell.spec.envelope import OBSERVATION_SCHEMA_VERSION + profile_id = ( + f"x-decnet-behave-profile--{_uuid.uuid5(_NS, attacker['uuid'])}" + ) + behave_profile = XDecnetBehaveProfile( # type: ignore[call-arg] + id=profile_id, + created_by_ref=created_by, + schema_version=OBSERVATION_SCHEMA_VERSION, + kd_digraph_simhash=kd_hash, + observations=obs_list, + ) + kwargs["x_decnet_behave_profile_ref"] = profile_id + + return stix2.ThreatActor(**kwargs), behave_profile def _attack_pattern_sdo(technique_id: str, created_by: str) -> stix2.AttackPattern | None: @@ -157,6 +245,7 @@ def build_attacker_bundle( artifacts: list[dict[str, Any]], smtp_targets: list[dict[str, Any]], commands: list[str] | None = None, + observations: list[dict[str, Any]] | None = None, ) -> stix2.Bundle: """Assemble a STIX 2.1 Bundle for *attacker*. @@ -185,9 +274,16 @@ def build_attacker_bundle( ) objs.append(ip_obs) - # ── Threat actor ───────────────────────────────────────────────── - ta = _threat_actor(attacker, identity, org.id) + # ── Threat actor + BEHAVE profile ──────────────────────────────── + ta, behave_profile = _threat_actor( + attacker, identity, org.id, + behavior=behavior, + observations=observations, + ) objs.append(ta) + if behave_profile is not None: + objs.append(behave_profile) + objs.append(FINGERPRINT_EXT_DEF) # ── ATT&CK — attack-patterns + uses relationships + sightings ─── # Build per-technique once; sightings reference the same AP STIX ID. @@ -315,6 +411,7 @@ def build_attacker_bundle( def build_fleet_bundle( rows: list[dict[str, Any]], ttp_by_attacker: dict[str, list[dict[str, Any]]], + observations_by_attacker: dict[str, list[dict[str, Any]]] | None = None, ) -> stix2.Bundle: """Assemble a STIX 2.1 Bundle covering all attackers in *rows*. @@ -324,6 +421,7 @@ def build_fleet_bundle( (too verbose; use the per-attacker endpoint for full fidelity). """ objs_by_id: dict[str, Any] = {} + obs_map = observations_by_attacker or {} for row in rows: raw_cmds = row.get("commands") or [] @@ -349,6 +447,7 @@ def build_fleet_bundle( artifacts=[], smtp_targets=[], commands=cmds, + observations=obs_map.get(row["uuid"]), ) for obj in bundle.objects: objs_by_id[obj.id] = obj diff --git a/decnet/web/db/repository.py b/decnet/web/db/repository.py index addd2b75..6d003f13 100644 --- a/decnet/web/db/repository.py +++ b/decnet/web/db/repository.py @@ -365,6 +365,35 @@ class BaseRepository(ABC): """ raise NotImplementedError + @abstractmethod + async def list_observations_by_attacker( + self, attacker_uuid: str, + ) -> list[dict[str, Any]]: + """All BEHAVE-SHELL observations for *attacker_uuid*, ordered by + ``window_end_ts`` ASC, shaped as BEHAVE envelope dicts. + + Each dict carries: ``primitive``, ``value``, ``confidence``, + ``window`` (``{start_ts, end_ts}``), ``source``, ``evidence_ref``, + and (when present) ``identity_ref``. + + Empty list when the attacker has no observations. Used by the STIX + export to populate ``XDecnetBehaveProfile.observations``. + """ + raise NotImplementedError + + @abstractmethod + async def get_all_observations_for_export( + self, + ) -> dict[str, list[dict[str, Any]]]: + """Return ``{attacker_uuid: [observation_envelope, ...]}`` for all + attackers that have BEHAVE observations. + + Used by the fleet STIX export to attach per-attacker observation + streams without N+1 queries. Same envelope shape as + ``list_observations_by_attacker``. + """ + raise NotImplementedError + async def upsert_observed_attachment( self, *, diff --git a/decnet/web/db/sqlmodel_repo/observations.py b/decnet/web/db/sqlmodel_repo/observations.py index 2a90c9f7..ddd7bceb 100644 --- a/decnet/web/db/sqlmodel_repo/observations.py +++ b/decnet/web/db/sqlmodel_repo/observations.py @@ -29,6 +29,21 @@ from decnet.web.db.models import Attacker, ObservationRow from decnet.web.db.sqlmodel_repo._helpers import _MixinBase +def _to_envelope(row: "ObservationRow") -> dict: + """Map an ObservationRow to a BEHAVE envelope dict for STIX export.""" + d: dict = { + "primitive": row.primitive, + "value": row.value, + "confidence": row.confidence, + "window": {"start_ts": row.window_start_ts, "end_ts": row.window_end_ts}, + "source": row.source, + "evidence_ref": row.evidence_ref, + } + if row.identity_ref is not None: + d["identity_ref"] = row.identity_ref + return d + + class ObservationsMixin(_MixinBase): """Mixin: methods composed onto :class:`SQLModelRepository`.""" @@ -209,6 +224,36 @@ class ObservationsMixin(_MixinBase): ) return (await session.execute(stmt)).scalar_one_or_none() is not None + async def list_observations_by_attacker( + self, attacker_uuid: str, + ) -> list[dict[str, Any]]: + """All observations for *attacker_uuid*, ordered by ``window_end_ts`` + ASC, shaped as BEHAVE envelope dicts. + """ + async with self._session() as session: + stmt = ( + select(ObservationRow) + .where(ObservationRow.attacker_uuid == attacker_uuid) + .order_by(ObservationRow.window_end_ts) + ) + rows = (await session.execute(stmt)).scalars().all() + return [_to_envelope(row) for row in rows] + + async def get_all_observations_for_export( + self, + ) -> dict[str, list[dict[str, Any]]]: + """Return ``{attacker_uuid: [envelope, ...]}`` for all attackers.""" + async with self._session() as session: + stmt = ( + select(ObservationRow) + .order_by(ObservationRow.attacker_uuid, ObservationRow.window_end_ts) + ) + rows = (await session.execute(stmt)).scalars().all() + result: dict[str, list[dict[str, Any]]] = {} + for row in rows: + result.setdefault(row.attacker_uuid, []).append(_to_envelope(row)) + return result + # Order desc(ts) reserved as the most-recent-first listing if a # paginated UI surface lands later. Not exposed today; named here # so a future grep finds the canonical desc-ts pattern. diff --git a/decnet/web/router/attackers/api_export_attacker_misp.py b/decnet/web/router/attackers/api_export_attacker_misp.py index 4b04ff41..beb9e9ff 100644 --- a/decnet/web/router/attackers/api_export_attacker_misp.py +++ b/decnet/web/router/attackers/api_export_attacker_misp.py @@ -65,6 +65,7 @@ async def api_export_attacker_misp( repo.get_attacker_artifacts(uuid), repo.list_smtp_targets(uuid), repo.list_attacker_commands_deduped(uuid), + repo.list_observations_by_attacker(uuid), ) behavior = cast(dict[str, Any] | None, results[0]) identity = cast(dict[str, Any] | None, results[1]) @@ -74,6 +75,7 @@ async def api_export_attacker_misp( artifacts = cast(list[dict[str, Any]], results[5]) smtp_targets = cast(list[dict[str, Any]], results[6]) commands = cast(list[str], results[7]) + observations = cast(list[dict[str, Any]], results[8]) event = build_attacker_misp_event( attacker=attacker, @@ -88,6 +90,7 @@ async def api_export_attacker_misp( artifacts=artifacts, smtp_targets=smtp_targets, commands=commands, + observations=observations, ) return Response( content=json.dumps(event, default=str), diff --git a/decnet/web/router/attackers/api_export_attacker_stix.py b/decnet/web/router/attackers/api_export_attacker_stix.py index c4792633..70670aaf 100644 --- a/decnet/web/router/attackers/api_export_attacker_stix.py +++ b/decnet/web/router/attackers/api_export_attacker_stix.py @@ -69,6 +69,7 @@ async def api_export_attacker_stix( repo.get_attacker_artifacts(uuid), repo.list_smtp_targets(uuid), repo.list_attacker_commands_deduped(uuid), + repo.list_observations_by_attacker(uuid), ) behavior = cast(dict[str, Any] | None, results[0]) identity = cast(dict[str, Any] | None, results[1]) @@ -78,6 +79,7 @@ async def api_export_attacker_stix( artifacts = cast(list[dict[str, Any]], results[5]) smtp_targets = cast(list[dict[str, Any]], results[6]) commands = cast(list[str], results[7]) + observations = cast(list[dict[str, Any]], results[8]) bundle = build_attacker_bundle( attacker=attacker, @@ -92,6 +94,7 @@ async def api_export_attacker_stix( artifacts=artifacts, smtp_targets=smtp_targets, commands=commands, + observations=observations, ) return Response( content=bundle.serialize(pretty=True, indent=2), diff --git a/decnet/web/router/attackers/api_export_attackers_misp.py b/decnet/web/router/attackers/api_export_attackers_misp.py index b68560b0..1d644793 100644 --- a/decnet/web/router/attackers/api_export_attackers_misp.py +++ b/decnet/web/router/attackers/api_export_attackers_misp.py @@ -43,13 +43,15 @@ async def api_export_attackers_misp( user: dict[str, Any] = Depends(require_viewer), ) -> Response: """Download a MISP collection JSON covering every observed attacker.""" - rows, ttp_by_attacker = await asyncio.gather( + rows, ttp_by_attacker, obs_by_attacker = await asyncio.gather( repo.get_all_attackers_for_export(), repo.get_all_ttp_rollups_for_export(), + repo.get_all_observations_for_export(), ) collection = build_fleet_misp_collection( rows=rows, ttp_by_attacker=ttp_by_attacker, + observations_by_attacker=obs_by_attacker, ) ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") return Response( diff --git a/decnet/web/router/attackers/api_export_attackers_stix.py b/decnet/web/router/attackers/api_export_attackers_stix.py index 2660b3bb..01b60cce 100644 --- a/decnet/web/router/attackers/api_export_attackers_stix.py +++ b/decnet/web/router/attackers/api_export_attackers_stix.py @@ -42,8 +42,12 @@ async def api_export_attackers_stix( user: dict[str, Any] = Depends(require_viewer), ) -> Response: """Download a STIX 2.1 bundle covering every observed attacker.""" - rows, ttp_by_attacker = await _gather_fleet_data() - bundle = build_fleet_bundle(rows=rows, ttp_by_attacker=ttp_by_attacker) + rows, ttp_by_attacker, obs_by_attacker = await _gather_fleet_data() + bundle = build_fleet_bundle( + rows=rows, + ttp_by_attacker=ttp_by_attacker, + observations_by_attacker=obs_by_attacker, + ) ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") return Response( content=bundle.serialize(pretty=True, indent=2), @@ -56,10 +60,15 @@ async def api_export_attackers_stix( ) -async def _gather_fleet_data() -> tuple[list[dict[str, Any]], dict[str, list[dict[str, Any]]]]: +async def _gather_fleet_data() -> tuple[ + list[dict[str, Any]], + dict[str, list[dict[str, Any]]], + dict[str, list[dict[str, Any]]], +]: import asyncio - rows, ttp_by_attacker = await asyncio.gather( + rows, ttp_by_attacker, obs_by_attacker = await asyncio.gather( repo.get_all_attackers_for_export(), repo.get_all_ttp_rollups_for_export(), + repo.get_all_observations_for_export(), ) - return rows, ttp_by_attacker + return rows, ttp_by_attacker, obs_by_attacker diff --git a/tests/db/test_base_repo.py b/tests/db/test_base_repo.py index 68bc8133..12735941 100644 --- a/tests/db/test_base_repo.py +++ b/tests/db/test_base_repo.py @@ -111,6 +111,10 @@ class DummyRepo(BaseRepository): await super().get_log_histogram(*a, **kw); return [] async def has_observations_for_evidence(self, evidence_ref): await super().has_observations_for_evidence(evidence_ref); return False + async def list_observations_by_attacker(self, attacker_uuid): + await super().list_observations_by_attacker(attacker_uuid); return [] + async def get_all_observations_for_export(self): + await super().get_all_observations_for_export(); return {} async def get_attacker_uuid_by_ip(self, ip): await super().get_attacker_uuid_by_ip(ip); return None # TTP rollup surface (TTP_TAGGING.md) @@ -253,6 +257,10 @@ async def test_base_repo_coverage(): await dr.get_log_histogram() with pytest.raises(NotImplementedError): await dr.has_observations_for_evidence("shard:x#1") + with pytest.raises(NotImplementedError): + await dr.list_observations_by_attacker("a") + with pytest.raises(NotImplementedError): + await dr.get_all_observations_for_export() with pytest.raises(NotImplementedError): await dr.get_attacker_uuid_by_ip("1.1.1.1") with pytest.raises(NotImplementedError): diff --git a/tests/ttp/test_stix_custom.py b/tests/ttp/test_stix_custom.py new file mode 100644 index 00000000..b0511aa4 --- /dev/null +++ b/tests/ttp/test_stix_custom.py @@ -0,0 +1,157 @@ +"""Unit tests for decnet/ttp/stix_custom.py custom STIX types. + +Verifies that: +- DecnetActorFingerprintExt instantiates, serialises, and round-trips. +- XDecnetBehaveProfile instantiates, serialises, and round-trips. +- Both types survive a full bundle parse with allow_custom=True. +- FINGERPRINT_EXT_DEF is a valid ExtensionDefinition SDO. +""" +from __future__ import annotations + +import json +import uuid as _uuid + +import pytest +import stix2 + +from decnet.ttp.stix_custom import ( + ACTOR_FINGERPRINT_EXT_ID, + FINGERPRINT_EXT_DEF, + DecnetActorFingerprintExt, + XDecnetBehaveProfile, +) + +_NS = _uuid.UUID("b5d2c3a1-8f4e-4d1b-9a6c-0e7f5b3d2c1a") +_ORG_ID = f"identity--{_uuid.uuid5(_NS, 'decnet-honeypot')}" + + +def test_ext_id_is_extension_definition(): + assert ACTOR_FINGERPRINT_EXT_ID.startswith("extension-definition--") + + +def test_fingerprint_ext_def_valid(): + assert FINGERPRINT_EXT_DEF.id == ACTOR_FINGERPRINT_EXT_ID + assert FINGERPRINT_EXT_DEF.type == "extension-definition" + assert "property-extension" in FINGERPRINT_EXT_DEF.extension_types + + +def test_decnet_actor_fingerprint_ext_roundtrip(): + net = {"os_guess": "Linux 4.x", "hop_distance": 7, "retransmit_count": 1} + fp = {"ja3_hashes": ["abc123"], "kex_order_raw": ["curve25519-sha256"]} + ext = DecnetActorFingerprintExt( + extension_type="property-extension", + network_behavior=net, + protocol_fingerprints=fp, + ) + raw = json.loads(ext.serialize()) + assert raw["extension_type"] == "property-extension" + assert raw["network_behavior"]["os_guess"] == "Linux 4.x" + assert raw["protocol_fingerprints"]["ja3_hashes"] == ["abc123"] + + +def test_decnet_actor_fingerprint_ext_partial(): + ext = DecnetActorFingerprintExt( + extension_type="property-extension", + network_behavior={"behavior_class": "scanning"}, + ) + raw = json.loads(ext.serialize()) + assert "protocol_fingerprints" not in raw + + +def test_x_decnet_behave_profile_roundtrip(): + obs = [ + { + "primitive": "motor.input_modality", + "value": "typed", + "confidence": 0.9, + "window": {"start_ts": 1.0, "end_ts": 2.0}, + "source": "ssh", + "evidence_ref": "shard:dky/ssh/2026-01-01.jsonl#1", + } + ] + profile = XDecnetBehaveProfile( # type: ignore[call-arg] + id=f"x-decnet-behave-profile--{_uuid.uuid5(_NS, 'attacker-1')}", + created_by_ref=_ORG_ID, + schema_version=1, + kd_digraph_simhash="deadbeef12345678", + observations=obs, + ) + raw = json.loads(profile.serialize()) + assert raw["type"] == "x-decnet-behave-profile" + assert raw["schema_version"] == 1 + assert raw["kd_digraph_simhash"] == "deadbeef12345678" + assert len(raw["observations"]) == 1 + assert raw["observations"][0]["primitive"] == "motor.input_modality" + + +def test_x_decnet_behave_profile_stix2_parse_roundtrip(): + profile = XDecnetBehaveProfile( # type: ignore[call-arg] + id=f"x-decnet-behave-profile--{_uuid.uuid5(_NS, 'attacker-2')}", + created_by_ref=_ORG_ID, + schema_version=1, + kd_digraph_simhash=None, + observations=[], + ) + parsed = stix2.parse(profile.serialize(), allow_custom=True) + assert type(parsed).__name__ == "XDecnetBehaveProfile" + + +def test_threat_actor_with_extension_bundle_roundtrip(): + """Full bundle round-trip: ThreatActor with ext + profile SDO + ext-def SDO.""" + net = {"os_guess": "FreeBSD", "hop_distance": 3} + fp = {"hassh_hashes": ["h1h2h3"]} + ext = DecnetActorFingerprintExt( + extension_type="property-extension", + network_behavior=net, + protocol_fingerprints=fp, + ) + profile_id = f"x-decnet-behave-profile--{_uuid.uuid5(_NS, 'attacker-rt')}" + obs = [ + { + "primitive": "cognitive.exploration_style", + "value": "targeted", + "confidence": 0.85, + "window": {"start_ts": 100.0, "end_ts": 200.0}, + "source": "ssh", + "evidence_ref": "shard:dky/ssh/2026-01-02.jsonl#42", + } + ] + profile = XDecnetBehaveProfile( # type: ignore[call-arg] + id=profile_id, + created_by_ref=_ORG_ID, + schema_version=1, + kd_digraph_simhash="cafebabe00000000", + observations=obs, + ) + ta = stix2.ThreatActor( + id=f"threat-actor--{_uuid.uuid5(_NS, 'attacker-rt')}", + name="DECNET-test-actor", + threat_actor_types=["unknown"], + created_by_ref=_ORG_ID, + extensions={ACTOR_FINGERPRINT_EXT_ID: ext}, + x_decnet_behave_profile_ref=profile_id, + allow_custom=True, + ) + bundle = stix2.Bundle( + objects=[FINGERPRINT_EXT_DEF, profile, ta], allow_custom=True + ) + parsed = stix2.parse(bundle.serialize(), allow_custom=True) + + parsed_ta = next(o for o in parsed.objects if o.type == "threat-actor") + parsed_ext = parsed_ta.extensions[ACTOR_FINGERPRINT_EXT_ID] + parsed_profile = next( + o for o in parsed.objects if o.type == "x-decnet-behave-profile" + ) + + # Extension is typed, not a bare dict + assert type(parsed_ext).__name__ == "DecnetActorFingerprintExt" + assert parsed_ext.network_behavior["os_guess"] == "FreeBSD" + assert parsed_ext.protocol_fingerprints["hassh_hashes"] == ["h1h2h3"] + + # Profile SDO is typed and lossless + assert type(parsed_profile).__name__ == "XDecnetBehaveProfile" + assert parsed_profile.kd_digraph_simhash == "cafebabe00000000" + assert parsed_profile.observations[0]["primitive"] == "cognitive.exploration_style" + + # Ref survives + assert parsed_ta.x_decnet_behave_profile_ref == profile_id diff --git a/tests/ttp/test_stix_export_threat_actor_extensions.py b/tests/ttp/test_stix_export_threat_actor_extensions.py new file mode 100644 index 00000000..0455cb90 --- /dev/null +++ b/tests/ttp/test_stix_export_threat_actor_extensions.py @@ -0,0 +1,266 @@ +"""Integration tests for the x_decnet_* ThreatActor extensions in stix_export.py. + +Covers: +- Skinny attacker (no behavior, no identity, no observations) → no extension block, + no profile SDO, no extension-definition SDO. +- Attacker with behavior → extension block with network_behavior populated. +- Attacker with identity fingerprints → protocol_fingerprints group. +- Attacker with BEHAVE observations → x-decnet-behave-profile SDO in bundle, + x_decnet_behave_profile_ref on ThreatActor, extension-definition SDO present. +- kd_digraph_simhash hex-encoded correctly (bytes and base64 inputs). +- Full inter-DECNET round-trip: stix2.parse(bundle, allow_custom=True) yields + typed extension objects, not bare dicts. +""" +from __future__ import annotations + +import base64 +import json +import uuid as _uuid +from datetime import datetime, timezone + +import pytest +import stix2 + +from decnet.ttp.stix_custom import ( + ACTOR_FINGERPRINT_EXT_ID, + DecnetActorFingerprintExt, + XDecnetBehaveProfile, +) +from decnet.ttp.stix_export import build_attacker_bundle + +_NS = _uuid.UUID("b5d2c3a1-8f4e-4d1b-9a6c-0e7f5b3d2c1a") + + +def _attacker(uid: str = "att-aaaabbbbccccdddd") -> dict: + return { + "uuid": uid, + "ip": "1.2.3.4", + "first_seen": datetime(2026, 1, 1, tzinfo=timezone.utc), + "last_seen": datetime(2026, 1, 31, tzinfo=timezone.utc), + "event_count": 100, + "country_code": "US", + "asn": 15169, + "as_name": "GOOGLE", + } + + +def _behavior() -> dict: + return { + "os_guess": "Linux 4.x", + "hop_distance": 7, + "tcp_fingerprint": json.dumps({ + "window": 65535, "wscale": 6, "mss": 1460, + "options_sig": "MSTNNT", "has_sack": True, "ipid_class": "zero", + }), + "kex_order_raw": json.dumps(["curve25519-sha256", "ecdh-sha2-nistp256"]), + "ssh_client_banners": json.dumps(["SSH-2.0-OpenSSH_8.9"]), + "retransmit_count": 3, + "behavior_class": "brute_force", + "beacon_interval_s": 60.0, + "beacon_jitter_pct": 0.05, + "timing_stats": json.dumps({"mean": 1.2, "stdev": 0.3}), + "phase_sequence": json.dumps({"recon_end": "2026-01-10T00:00:00"}), + "tool_guesses": json.dumps(["hydra"]), + } + + +def _identity(uid: str = "ident-1111222233334444") -> dict: + return { + "uuid": uid, + "ja3_hashes": json.dumps(["abc123def456"]), + "hassh_hashes": json.dumps(["hashhash01"]), + "tls_cert_sha256": json.dumps(["a" * 64]), + "payload_simhashes": json.dumps(["deadbeef12345678"]), + "c2_endpoints": json.dumps([{"host": "bad.example.com", "port": 4444}]), + "kd_digraph_simhash": None, + } + + +def _obs() -> list[dict]: + return [ + { + "primitive": "motor.input_modality", + "value": "typed", + "confidence": 0.9, + "window": {"start_ts": 1000.0, "end_ts": 2000.0}, + "source": "ssh", + "evidence_ref": "shard:dky/ssh/2026-01-01.jsonl#1", + }, + { + "primitive": "cognitive.exploration_style", + "value": "targeted", + "confidence": 0.8, + "window": {"start_ts": 2000.0, "end_ts": 3000.0}, + "source": "ssh", + "evidence_ref": "shard:dky/ssh/2026-01-01.jsonl#2", + }, + ] + + +def _get_ta(bundle: stix2.Bundle) -> stix2.ThreatActor: + return next(o for o in bundle.objects if o.type == "threat-actor") + + +def test_skinny_attacker_no_extension(): + """No behavior, no identity, no observations → no extension block.""" + bundle = build_attacker_bundle( + attacker=_attacker(), + behavior=None, identity=None, intel=None, + technique_rollup=[], raw_tags=[], artifacts=[], + smtp_targets=[], observations=None, + ) + ta = _get_ta(bundle) + assert not getattr(ta, "extensions", None) + profile_sdos = [o for o in bundle.objects if o.type == "x-decnet-behave-profile"] + assert len(profile_sdos) == 0 + ext_def_sdos = [o for o in bundle.objects if o.type == "extension-definition"] + assert len(ext_def_sdos) == 0 + + +def test_behavior_produces_network_behavior_group(): + bundle = build_attacker_bundle( + attacker=_attacker(), + behavior=_behavior(), identity=None, intel=None, + technique_rollup=[], raw_tags=[], artifacts=[], + smtp_targets=[], observations=None, + ) + ta = _get_ta(bundle) + assert ta.extensions, "expected extension block" + ext = ta.extensions[ACTOR_FINGERPRINT_EXT_ID] + nb = ext.network_behavior + assert nb["os_guess"] == "Linux 4.x" + assert nb["hop_distance"] == 7 + assert nb["retransmit_count"] == 3 + assert nb["behavior_class"] == "brute_force" + assert nb["tcp_fingerprint"]["window"] == 65535 + assert nb["timing_stats"]["mean"] == pytest.approx(1.2) + assert "hydra" in nb["tool_guesses"] + + +def test_behavior_produces_protocol_fingerprints_from_behavior(): + bundle = build_attacker_bundle( + attacker=_attacker(), + behavior=_behavior(), identity=None, intel=None, + technique_rollup=[], raw_tags=[], artifacts=[], + smtp_targets=[], observations=None, + ) + ta = _get_ta(bundle) + ext = ta.extensions[ACTOR_FINGERPRINT_EXT_ID] + fp = ext.protocol_fingerprints + assert fp["kex_order_raw"] == ["curve25519-sha256", "ecdh-sha2-nistp256"] + assert fp["ssh_client_banners"] == ["SSH-2.0-OpenSSH_8.9"] + + +def test_identity_fingerprints_in_protocol_group(): + bundle = build_attacker_bundle( + attacker=_attacker(), + behavior=None, identity=_identity(), intel=None, + technique_rollup=[], raw_tags=[], artifacts=[], + smtp_targets=[], observations=None, + ) + ta = _get_ta(bundle) + ext = ta.extensions[ACTOR_FINGERPRINT_EXT_ID] + fp = ext.protocol_fingerprints + assert fp["ja3_hashes"] == ["abc123def456"] + assert fp["hassh_hashes"] == ["hashhash01"] + assert fp["tls_cert_sha256"] == ["a" * 64] + c2 = fp["c2_endpoints"] + assert isinstance(c2, list) and c2[0]["host"] == "bad.example.com" + + +def test_no_legacy_flat_x_decnet_hash_properties(): + """Dropped: x_decnet_ja3_hashes / x_decnet_hassh_hashes / x_decnet_c2_endpoints.""" + bundle = build_attacker_bundle( + attacker=_attacker(), + behavior=_behavior(), identity=_identity(), intel=None, + technique_rollup=[], raw_tags=[], artifacts=[], + smtp_targets=[], observations=None, + ) + ta = _get_ta(bundle) + for old_prop in ("x_decnet_ja3_hashes", "x_decnet_hassh_hashes", "x_decnet_c2_endpoints"): + assert not hasattr(ta, old_prop), f"legacy property {old_prop!r} should not exist" + + +def test_observations_produce_behave_profile_sdo(): + bundle = build_attacker_bundle( + attacker=_attacker(), + behavior=None, identity=None, intel=None, + technique_rollup=[], raw_tags=[], artifacts=[], + smtp_targets=[], observations=_obs(), + ) + ta = _get_ta(bundle) + assert hasattr(ta, "x_decnet_behave_profile_ref") + profile_sdos = [o for o in bundle.objects if o.type == "x-decnet-behave-profile"] + assert len(profile_sdos) == 1 + profile = profile_sdos[0] + assert profile.id == ta.x_decnet_behave_profile_ref + assert len(profile.observations) == 2 + assert profile.observations[0]["primitive"] == "motor.input_modality" + assert profile.observations[0]["confidence"] == pytest.approx(0.9) + assert profile.observations[0]["window"]["start_ts"] == pytest.approx(1000.0) + + +def test_observations_include_extension_def_sdo(): + bundle = build_attacker_bundle( + attacker=_attacker(), + behavior=None, identity=None, intel=None, + technique_rollup=[], raw_tags=[], artifacts=[], + smtp_targets=[], observations=_obs(), + ) + ext_defs = [o for o in bundle.objects if o.type == "extension-definition"] + assert len(ext_defs) == 1 + assert ext_defs[0].id == ACTOR_FINGERPRINT_EXT_ID + + +def test_kd_digraph_simhash_bytes_input(): + ident = _identity() + ident["kd_digraph_simhash"] = b"\xde\xad\xbe\xef\x12\x34\x56\x78" + bundle = build_attacker_bundle( + attacker=_attacker(), + behavior=None, identity=ident, intel=None, + technique_rollup=[], raw_tags=[], artifacts=[], + smtp_targets=[], observations=[_obs()[0]], + ) + profile = next(o for o in bundle.objects if o.type == "x-decnet-behave-profile") + assert profile.kd_digraph_simhash == "deadbeef12345678" + + +def test_kd_digraph_simhash_base64_input(): + raw = b"\xca\xfe\xba\xbe\x00\x00\x00\x00" + ident = _identity() + ident["kd_digraph_simhash"] = base64.b64encode(raw).decode() + bundle = build_attacker_bundle( + attacker=_attacker(), + behavior=None, identity=ident, intel=None, + technique_rollup=[], raw_tags=[], artifacts=[], + smtp_targets=[], observations=[_obs()[0]], + ) + profile = next(o for o in bundle.objects if o.type == "x-decnet-behave-profile") + assert profile.kd_digraph_simhash == raw.hex() + + +def test_inter_decnet_round_trip(): + """Primary fidelity: stix2.parse restores typed objects, not bare dicts.""" + ident = _identity() + ident["kd_digraph_simhash"] = b"\xde\xad\xbe\xef\x12\x34\x56\x78" + bundle = build_attacker_bundle( + attacker=_attacker(), + behavior=_behavior(), identity=ident, intel=None, + technique_rollup=[], raw_tags=[], artifacts=[], + smtp_targets=[], observations=_obs(), + ) + parsed = stix2.parse(bundle.serialize(pretty=True, indent=2), allow_custom=True) + + parsed_ta = next(o for o in parsed.objects if o.type == "threat-actor") + assert ACTOR_FINGERPRINT_EXT_ID in parsed_ta.extensions + parsed_ext = parsed_ta.extensions[ACTOR_FINGERPRINT_EXT_ID] + assert type(parsed_ext).__name__ == "DecnetActorFingerprintExt" + assert parsed_ext.network_behavior["os_guess"] == "Linux 4.x" + assert parsed_ext.protocol_fingerprints["ja3_hashes"] == ["abc123def456"] + + parsed_profile = next(o for o in parsed.objects if o.type == "x-decnet-behave-profile") + assert type(parsed_profile).__name__ == "XDecnetBehaveProfile" + assert parsed_profile.kd_digraph_simhash == "deadbeef12345678" + primitives = {obs["primitive"] for obs in parsed_profile.observations} + assert "motor.input_modality" in primitives + assert "cognitive.exploration_style" in primitives diff --git a/tests/web/test_api_export_attacker_misp.py b/tests/web/test_api_export_attacker_misp.py index 505c0a97..21f7dc8a 100644 --- a/tests/web/test_api_export_attacker_misp.py +++ b/tests/web/test_api_export_attacker_misp.py @@ -112,6 +112,7 @@ def _mock_repo(*, attacker=None, intel=None, rollup=None, tags=None, m.get_attacker_artifacts = AsyncMock(return_value=artifacts or []) m.list_smtp_targets = AsyncMock(return_value=smtp or []) m.list_attacker_commands_deduped = AsyncMock(return_value=commands or []) + m.list_observations_by_attacker = AsyncMock(return_value=[]) return m diff --git a/tests/web/test_api_export_attacker_stix.py b/tests/web/test_api_export_attacker_stix.py index 7c8124ae..5763fc81 100644 --- a/tests/web/test_api_export_attacker_stix.py +++ b/tests/web/test_api_export_attacker_stix.py @@ -128,6 +128,7 @@ def _mock_repo(*, attacker=None, identity=None, intel=None, m.get_attacker_artifacts = AsyncMock(return_value=artifacts or []) m.list_smtp_targets = AsyncMock(return_value=smtp or []) m.list_attacker_commands_deduped = AsyncMock(return_value=commands or []) + m.list_observations_by_attacker = AsyncMock(return_value=[]) return m diff --git a/tests/web/test_api_export_attackers_misp.py b/tests/web/test_api_export_attackers_misp.py index 1061845a..6a16dc77 100644 --- a/tests/web/test_api_export_attackers_misp.py +++ b/tests/web/test_api_export_attackers_misp.py @@ -51,6 +51,7 @@ def _mock_repo(*, rows=None, ttp_by_attacker=None): m = type("M", (), {})() m.get_all_attackers_for_export = AsyncMock(return_value=rows or []) m.get_all_ttp_rollups_for_export = AsyncMock(return_value=ttp_by_attacker or {}) + m.get_all_observations_for_export = AsyncMock(return_value={}) return m diff --git a/tests/web/test_api_export_attackers_stix.py b/tests/web/test_api_export_attackers_stix.py index d3befee9..7e708a07 100644 --- a/tests/web/test_api_export_attackers_stix.py +++ b/tests/web/test_api_export_attackers_stix.py @@ -52,6 +52,7 @@ def _mock_repo(*, rows=None, ttp_by_attacker=None): m = type("M", (), {})() m.get_all_attackers_for_export = AsyncMock(return_value=rows or []) m.get_all_ttp_rollups_for_export = AsyncMock(return_value=ttp_by_attacker or {}) + m.get_all_observations_for_export = AsyncMock(return_value={}) return m