feat(stix_export): wire fingerprint bounties through all endpoints + tests
Remaining files from the fingerprint-bounties + characterizes-SRO commit: misp_export, repository, bounties mixin, all 4 router endpoints, and test suite updates. Prerequisite: previous commit added _extract_fingerprint_bounty_data and the stix_export changes.
This commit is contained in:
@@ -141,6 +141,40 @@ class BountiesMixin(_MixinBase):
|
||||
grouped[item.attacker_ip].append(d)
|
||||
return dict(grouped)
|
||||
|
||||
async def get_fingerprint_bounties_by_ip(self, ip: str) -> List[dict[str, Any]]:
|
||||
async with self._session() as session:
|
||||
result = await session.execute(
|
||||
select(Bounty)
|
||||
.where(Bounty.attacker_ip == ip, Bounty.bounty_type == "fingerprint")
|
||||
.order_by(asc(Bounty.timestamp))
|
||||
)
|
||||
out: List[dict[str, Any]] = []
|
||||
for item in result.scalars().all():
|
||||
d = item.model_dump(mode="json")
|
||||
try:
|
||||
d["payload"] = json.loads(d["payload"])
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
pass
|
||||
out.append(d)
|
||||
return out
|
||||
|
||||
async def get_all_fingerprint_bounties_for_export(self) -> dict[str, List[dict[str, Any]]]:
|
||||
async with self._session() as session:
|
||||
result = await session.execute(
|
||||
select(Bounty)
|
||||
.where(Bounty.bounty_type == "fingerprint")
|
||||
.order_by(asc(Bounty.timestamp))
|
||||
)
|
||||
grouped: dict[str, List[dict[str, Any]]] = defaultdict(list)
|
||||
for item in result.scalars().all():
|
||||
d = item.model_dump(mode="json")
|
||||
try:
|
||||
d["payload"] = json.loads(d["payload"])
|
||||
except (json.JSONDecodeError, TypeError):
|
||||
pass
|
||||
grouped[item.attacker_ip].append(d)
|
||||
return dict(grouped)
|
||||
|
||||
async def count_probe_relays(self, attacker_ip: str, decky: str) -> int:
|
||||
"""Return how many probe_relay bounties exist for this (attacker_ip, decky) pair."""
|
||||
async with self._session() as session:
|
||||
|
||||
Reference in New Issue
Block a user