TLS cert capture was the last prober special-case that bypassed ActiveProbeMeta. Moves logic into TlsCertProbe (priority=200, runs after JARM) in probes/tlscert_probe.py; drops _capture_tls_cert, the probe.probe_name=="jarm" name-check, and the direct fetch_leaf_cert import from worker.py.
89 lines
2.9 KiB
Python
89 lines
2.9 KiB
Python
"""
ActiveProbe ABC and metaclass registry for port-iterating active probes.

Adding a new active probe takes one class that implements three methods
(run, syslog_fields, publish_payload).
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
from abc import ABCMeta, abstractmethod
|
|
from typing import Any
|
|
|
|
from decnet.correlation.fingerprint_rotation import ProbeType
|
|
|
|
|
|
class ActiveProbeMeta(ABCMeta):
    """Metaclass that records every named ActiveProbe subclass in one registry.

    A class is registered when it has at least one base class and a truthy
    ``probe_name`` attribute (own or inherited). Registering a second class
    under an existing name replaces the earlier entry.
    """

    # Shared across all classes built by this metaclass: probe_name -> class.
    _registry: dict[str, type[ActiveProbe]] = {}

    def __new__(
        mcs,
        name: str,
        bases: tuple[type, ...],
        namespace: dict[str, Any],
    ) -> ActiveProbeMeta:
        """Build the class, then register it when it qualifies as a probe."""
        new_cls = super().__new__(mcs, name, bases, namespace)

        # A class with no bases is the root of the hierarchy, not a probe.
        if not bases:
            return new_cls

        registry_key = getattr(new_cls, "probe_name", None)
        if registry_key:
            mcs._registry[registry_key] = new_cls  # type: ignore[assignment]
        return new_cls

    @classmethod
    def all(mcs) -> list[type[ActiveProbe]]:
        """Return registered probes sorted by (priority asc, probe_name asc)."""
        ordered = list(mcs._registry.values())
        ordered.sort(key=lambda probe_cls: (probe_cls.priority, probe_cls.probe_name))
        return ordered
|
|
|
|
|
|
class ActiveProbe(metaclass=ActiveProbeMeta):
    """Base class for all port-iterating active probes.

    Subclasses declare class-level attributes and implement three methods
    (run, syslog_fields, publish_payload). Registration is automatic via
    ActiveProbeMeta.

    Port override: set DECNET_PROBE_PORTS_<NAME_UPPER> (comma-separated) to
    override default_ports at runtime without touching the class. If any
    entry is not an integer, or lies outside 1-65535, the whole override is
    ignored and default_ports is used instead.
    """

    # Registry key; upper-cased to build the port-override env var name.
    probe_name: str
    # Ports used when no valid override is set; None marks a port-free probe.
    default_ports: list[int | None]
    event_type: str
    rotation_type: ProbeType | None = None
    rotation_hash_key: str | None = None
    # Ascending sort key used by ActiveProbeMeta.all().
    priority: int = 100

    def __init__(self) -> None:
        """Resolve this probe's port list from the environment or defaults."""
        env_key = f"DECNET_PROBE_PORTS_{self.probe_name.upper()}"
        raw = os.environ.get(env_key, "").strip()
        override = self._parse_port_override(raw) if raw else None
        # Copy the defaults so instances never share the class-level list.
        self._ports: list[int | None] = (
            list(self.default_ports) if override is None else override
        )

    @staticmethod
    def _parse_port_override(raw: str) -> list[int | None] | None:
        """Parse a comma-separated port list.

        Empty segments (e.g. from a trailing comma) are skipped. Returns
        None when any entry is not an integer or is outside 1-65535, so the
        caller can fall back to default_ports rather than hand a probe a
        port number it cannot connect to.
        """
        ports: list[int | None] = []
        for token in raw.split(","):
            token = token.strip()
            if not token:
                continue
            try:
                port = int(token)
            except ValueError:
                return None
            if not 1 <= port <= 65535:
                return None
            ports.append(port)
        return ports

    @property
    def ports(self) -> list[int | None]:
        """Ports this instance iterates over (override or default_ports copy)."""
        return self._ports

    @abstractmethod
    def run(self, ip: str, port: int | None, timeout: float) -> dict[str, Any] | None:
        """Execute the probe against ip:port (port is None for port-free probes).

        Return a result dict on success, or None to suppress emission (e.g.
        empty JARM hash means the port doesn't speak TLS).
        """

    @abstractmethod
    def syslog_fields(self, ip: str, port: int | None, result: dict[str, Any]) -> tuple[dict[str, Any], str]:
        """Return (sd_fields, human_msg) for _write_event.

        target_ip and target_port are injected by _run_probe; do not include
        them in sd_fields.
        """

    @abstractmethod
    def publish_payload(self, ip: str, port: int | None, result: dict[str, Any]) -> dict[str, Any]:
        """Return the bus payload dict for attacker.fingerprinted events."""
|