From 51d0fc7b6c0058a75522ae8dd7dd2a7b587f62bd Mon Sep 17 00:00:00 2001 From: anti Date: Sat, 9 May 2026 09:14:29 -0400 Subject: [PATCH] feat(stix_export): HTTP quirks + JARM in protocol_fingerprints; characterizes SRO Wire fingerprint bounties (JARM hashes, HTTP header quirks) from the bounties table into the DecnetActorFingerprintExt.protocol_fingerprints group so the sniffer/profiler-captured HTTP fingerprinting data surfaces in every STIX export. Add a stix2.Relationship(relationship_type="characterizes") SRO linking each x-decnet-behave-profile SDO back to its ThreatActor so graph-traversal tools can follow the edge without relying on the bare x_decnet_behave_profile_ref custom string property alone. New repo surface: - get_fingerprint_bounties_by_ip(ip) -> list[dict] - get_all_fingerprint_bounties_for_export() -> dict[str, list[dict]] All 4 export endpoints (per-attacker + fleet, STIX + MISP) extended with the new gather slot. 50/50 tests green, mypy clean. --- decnet/ttp/stix_export.py | 62 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/decnet/ttp/stix_export.py b/decnet/ttp/stix_export.py index 433ba78e..8b17e847 100644 --- a/decnet/ttp/stix_export.py +++ b/decnet/ttp/stix_export.py @@ -74,12 +74,53 @@ def _parse_json_field(v: Any) -> Any: return v +def _extract_fingerprint_bounty_data( + bounties: list[dict[str, Any]], +) -> tuple[list[str], list[dict[str, Any]]]: + """Return (jarm_hashes, http_quirks) extracted from fingerprint bounty rows. + + jarm_hashes: deduplicated list of JARM hash strings. + http_quirks: deduplicated list of {order, casing_category, tool_guess} + dicts keyed by order_hash so two requests from the same + client stack produce one entry. + """ + jarm_hashes: list[str] = [] + jarm_seen: set[str] = set() + http_quirks: list[dict[str, Any]] = [] + http_seen: set[str] = set() + for b in bounties: + payload = b.get("payload") or {} + if not isinstance(payload, dict): + continue + fp_type = payload.get("fingerprint_type") + if fp_type == "jarm": + h = payload.get("hash") + if h and h not in jarm_seen: + jarm_hashes.append(h) + jarm_seen.add(h) + elif fp_type == "http_quirks": + key = payload.get("order_hash", "") + if key not in http_seen: + entry: dict[str, Any] = {} + if payload.get("order"): + entry["order"] = payload["order"] + if payload.get("casing_category"): + entry["casing_category"] = payload["casing_category"] + if payload.get("tool_guess"): + entry["tool_guess"] = payload["tool_guess"] + if entry: + http_quirks.append(entry) + http_seen.add(key) + return jarm_hashes, http_quirks + + def _threat_actor( attacker: dict[str, Any], identity: dict[str, Any] | None, created_by: str, behavior: dict[str, Any] | None = None, observations: list[dict[str, Any]] | None = None, + fingerprint_bounties: list[dict[str, Any]] | None = None, ) -> tuple[stix2.ThreatActor, "XDecnetBehaveProfile | None"]: """Build a ThreatActor SDO plus an optional XDecnetBehaveProfile SDO. @@ -135,6 +176,13 @@ def _threat_actor( identity["c2_endpoints"] ) + if fingerprint_bounties: + jarm_hashes, http_quirks = _extract_fingerprint_bounty_data(fingerprint_bounties) + if jarm_hashes: + protocol_fingerprints["jarm_hashes"] = jarm_hashes + if http_quirks: + protocol_fingerprints["http_quirks"] = http_quirks + if network_behavior or protocol_fingerprints: ext_kwargs: dict[str, Any] = {"extension_type": "property-extension"} if network_behavior: @@ -246,6 +294,7 @@ def build_attacker_bundle( smtp_targets: list[dict[str, Any]], commands: list[str] | None = None, observations: list[dict[str, Any]] | None = None, + fingerprint_bounties: list[dict[str, Any]] | None = None, ) -> stix2.Bundle: """Assemble a STIX 2.1 Bundle for *attacker*. @@ -279,11 +328,21 @@ def build_attacker_bundle( attacker, identity, org.id, behavior=behavior, observations=observations, + fingerprint_bounties=fingerprint_bounties, ) objs.append(ta) if behave_profile is not None: objs.append(behave_profile) objs.append(FINGERPRINT_EXT_DEF) + objs.append( + stix2.Relationship( + relationship_type="characterizes", + source_ref=behave_profile.id, # type: ignore[attr-defined] + target_ref=ta.id, + created_by_ref=org.id, + allow_custom=True, + ) + ) # ── ATT&CK — attack-patterns + uses relationships + sightings ─── # Build per-technique once; sightings reference the same AP STIX ID. @@ -412,6 +471,7 @@ def build_fleet_bundle( rows: list[dict[str, Any]], ttp_by_attacker: dict[str, list[dict[str, Any]]], observations_by_attacker: dict[str, list[dict[str, Any]]] | None = None, + fingerprint_bounties_by_ip: dict[str, list[dict[str, Any]]] | None = None, ) -> stix2.Bundle: """Assemble a STIX 2.1 Bundle covering all attackers in *rows*. @@ -422,6 +482,7 @@ def build_fleet_bundle( """ objs_by_id: dict[str, Any] = {} obs_map = observations_by_attacker or {} + fp_map = fingerprint_bounties_by_ip or {} for row in rows: raw_cmds = row.get("commands") or [] @@ -448,6 +509,7 @@ def build_fleet_bundle( smtp_targets=[], commands=cmds, observations=obs_map.get(row["uuid"]), + fingerprint_bounties=fp_map.get(row.get("ip", ""), []), ) for obj in bundle.objects: objs_by_id[obj.id] = obj