diff --git a/decnet/cli.py b/decnet/cli.py index 8cc83e6..0e503e6 100644 --- a/decnet/cli.py +++ b/decnet/cli.py @@ -120,8 +120,6 @@ def deploy( config_file: Optional[str] = typer.Option(None, "--config", "-c", help="Path to INI config file"), api: bool = typer.Option(False, "--api", help="Start the FastAPI backend to ingest and serve logs"), api_port: int = typer.Option(8000, "--api-port", help="Port for the backend API"), - probe_targets: Optional[str] = typer.Option(None, "--probe-targets", help="Comma-separated ip:port pairs for JARM active probing (e.g. 10.0.0.1:443,10.0.0.2:8443)"), - probe_interval: int = typer.Option(300, "--probe-interval", help="Seconds between JARM probe cycles (default: 300)"), ) -> None: """Deploy deckies to the LAN.""" import os @@ -298,18 +296,16 @@ def deploy( except (FileNotFoundError, subprocess.SubprocessError): console.print("[red]Failed to start API. Ensure 'uvicorn' is installed in the current environment.[/]") - if probe_targets and not dry_run: + if effective_log_file and not dry_run: import subprocess # nosec B404 import sys - console.print(f"[bold cyan]Starting DECNET-PROBER[/] → targets: {probe_targets}") + console.print("[bold cyan]Starting DECNET-PROBER[/] (auto-discovers attackers from log stream)") try: _prober_args = [ sys.executable, "-m", "decnet.cli", "probe", - "--targets", probe_targets, - "--interval", str(probe_interval), + "--daemon", + "--log-file", str(effective_log_file), ] - if effective_log_file: - _prober_args.extend(["--log-file", str(effective_log_file)]) subprocess.Popen( # nosec B603 _prober_args, stdin=subprocess.DEVNULL, @@ -323,17 +319,28 @@ def deploy( @app.command() def probe( - targets: str = typer.Option(..., "--targets", "-t", help="Comma-separated ip:port pairs to JARM fingerprint"), - log_file: str = typer.Option(DECNET_INGEST_LOG_FILE, "--log-file", "-f", help="Path for RFC 5424 syslog + .json output"), + log_file: str = typer.Option(DECNET_INGEST_LOG_FILE, "--log-file", "-f", help="Path for RFC 5424 syslog + .json output (reads attackers from .json, writes results to both)"), interval: int = typer.Option(300, "--interval", "-i", help="Seconds between probe cycles (default: 300)"), timeout: float = typer.Option(5.0, "--timeout", help="Per-probe TCP timeout in seconds"), + daemon: bool = typer.Option(False, "--daemon", "-d", help="Detach to background (used by deploy, no console output)"), ) -> None: - """Run JARM active fingerprinting against target hosts.""" + """JARM-fingerprint all attackers discovered in the log stream.""" import asyncio from decnet.prober import prober_worker - log.info("probe command invoked targets=%s interval=%d", targets, interval) - console.print(f"[bold cyan]DECNET-PROBER starting[/] → {targets}") - asyncio.run(prober_worker(log_file, targets, interval=interval, timeout=timeout)) + + if daemon: + # Suppress console output when running as background daemon + import os + log.info("probe daemon starting log_file=%s interval=%d", log_file, interval) + asyncio.run(prober_worker(log_file, interval=interval, timeout=timeout)) + else: + log.info("probe command invoked log_file=%s interval=%d", log_file, interval) + console.print(f"[bold cyan]DECNET-PROBER[/] watching {log_file} for attackers (interval: {interval}s)") + console.print("[dim]Press Ctrl+C to stop[/]") + try: + asyncio.run(prober_worker(log_file, interval=interval, timeout=timeout)) + except KeyboardInterrupt: + console.print("\n[yellow]DECNET-PROBER stopped.[/]") @app.command() diff --git a/decnet/prober/worker.py b/decnet/prober/worker.py index 74c6795..ba133cb 100644 --- a/decnet/prober/worker.py +++ b/decnet/prober/worker.py @@ -1,10 +1,12 @@ """ DECNET-PROBER standalone worker. -Runs as a detached host-level process. Probes targets on a configurable -interval and writes results as RFC 5424 syslog + JSON to the same log -files the collector uses. The ingester tails the JSON file and extracts -JARM bounties automatically. +Runs as a detached host-level process. Discovers attacker IPs by tailing the +collector's JSON log file, then JARM-probes them on common C2/TLS ports. +Results are written as RFC 5424 syslog + JSON to the same log files. + +Target discovery is fully automatic — every unique attacker IP seen in the +log stream gets probed. No manual target list required. Tech debt: writing directly to the collector's log files couples the prober to the collector's file format. A future refactor should introduce @@ -21,10 +23,17 @@ from pathlib import Path from typing import Any from decnet.logging import get_logger -from decnet.prober.jarm import jarm_hash +from decnet.prober.jarm import JARM_EMPTY_HASH, jarm_hash logger = get_logger("prober") +# ─── Default ports to JARM-probe on each attacker IP ───────────────────────── +# Common C2 callback / TLS server ports (Cobalt Strike, Sliver, Metasploit, etc.) + +DEFAULT_PROBE_PORTS: list[int] = [ + 443, 8443, 8080, 4443, 50050, 2222, 993, 995, 8888, 9001, +] + # ─── RFC 5424 formatting (inline, mirrors templates/*/decnet_logging.py) ───── _FACILITY_LOCAL0 = 16 @@ -144,100 +153,181 @@ def _write_event( f.flush() -# ─── Target parser ─────────────────────────────────────────────────────────── +# ─── Target discovery from log stream ──────────────────────────────────────── -def _parse_targets(raw: str) -> list[tuple[str, int]]: - """Parse 'ip:port,ip:port,...' into a list of (host, port) tuples.""" - targets: list[tuple[str, int]] = [] - for entry in raw.split(","): - entry = entry.strip() - if not entry: - continue - if ":" not in entry: - logger.warning("prober: skipping malformed target %r (missing port)", entry) - continue - host, _, port_str = entry.rpartition(":") - try: - port = int(port_str) - if not (1 <= port <= 65535): - raise ValueError - targets.append((host, port)) - except ValueError: - logger.warning("prober: skipping malformed target %r (bad port)", entry) - return targets +def _discover_attackers(json_path: Path, position: int) -> tuple[set[str], int]: + """ + Read new JSON log lines from the given position and extract unique + attacker IPs. Returns (new_ips, new_position). + + Only considers IPs that are not "Unknown" and come from events that + indicate real attacker interaction (not prober's own events). + """ + new_ips: set[str] = set() + + if not json_path.exists(): + return new_ips, position + + size = json_path.stat().st_size + if size < position: + position = 0 # file rotated + + if size == position: + return new_ips, position + + with open(json_path, "r", encoding="utf-8", errors="replace") as f: + f.seek(position) + while True: + line = f.readline() + if not line: + break + if not line.endswith("\n"): + break # partial line + + try: + record = json.loads(line.strip()) + except json.JSONDecodeError: + position = f.tell() + continue + + # Skip our own events + if record.get("service") == "prober": + position = f.tell() + continue + + ip = record.get("attacker_ip", "Unknown") + if ip != "Unknown" and ip: + new_ips.add(ip) + + position = f.tell() + + return new_ips, position # ─── Probe cycle ───────────────────────────────────────────────────────────── def _probe_cycle( - targets: list[tuple[str, int]], + targets: set[str], + probed: dict[str, set[int]], + ports: list[int], log_path: Path, json_path: Path, timeout: float = 5.0, ) -> None: - for host, port in targets: - try: - h = jarm_hash(host, port, timeout=timeout) - _write_event( - log_path, json_path, - "jarm_fingerprint", - target_ip=host, - target_port=str(port), - jarm_hash=h, - msg=f"JARM {host}:{port} = {h}", - ) - logger.info("prober: JARM %s:%d = %s", host, port, h) - except Exception as exc: - _write_event( - log_path, json_path, - "prober_error", - severity=_SEVERITY_WARNING, - target_ip=host, - target_port=str(port), - error=str(exc), - msg=f"JARM probe failed for {host}:{port}: {exc}", - ) - logger.warning("prober: JARM probe failed %s:%d: %s", host, port, exc) + """ + Probe all known attacker IPs on the configured ports. + + Args: + targets: set of attacker IPs to probe + probed: dict mapping IP -> set of ports already successfully probed + ports: list of ports to probe on each IP + log_path: RFC 5424 log file + json_path: JSON log file + timeout: per-probe TCP timeout + """ + for ip in sorted(targets): + already_done = probed.get(ip, set()) + ports_to_probe = [p for p in ports if p not in already_done] + + if not ports_to_probe: + continue + + for port in ports_to_probe: + try: + h = jarm_hash(ip, port, timeout=timeout) + if h == JARM_EMPTY_HASH: + # No TLS server on this port — don't log, don't reprobed + probed.setdefault(ip, set()).add(port) + continue + + _write_event( + log_path, json_path, + "jarm_fingerprint", + target_ip=ip, + target_port=str(port), + jarm_hash=h, + msg=f"JARM {ip}:{port} = {h}", + ) + logger.info("prober: JARM %s:%d = %s", ip, port, h) + probed.setdefault(ip, set()).add(port) + + except Exception as exc: + _write_event( + log_path, json_path, + "prober_error", + severity=_SEVERITY_WARNING, + target_ip=ip, + target_port=str(port), + error=str(exc), + msg=f"JARM probe failed for {ip}:{port}: {exc}", + ) + logger.warning("prober: JARM probe failed %s:%d: %s", ip, port, exc) + # Mark as probed to avoid infinite retries + probed.setdefault(ip, set()).add(port) # ─── Main worker ───────────────────────────────────────────────────────────── async def prober_worker( log_file: str, - targets_raw: str, interval: int = 300, timeout: float = 5.0, + ports: list[int] | None = None, ) -> None: """ Main entry point for the standalone prober process. + Discovers attacker IPs automatically by tailing the JSON log file, + then JARM-probes each IP on common C2 ports. + Args: log_file: base path for log files (RFC 5424 to .log, JSON to .json) - targets_raw: comma-separated ip:port pairs interval: seconds between probe cycles timeout: per-probe TCP timeout + ports: list of ports to probe (defaults to DEFAULT_PROBE_PORTS) """ - targets = _parse_targets(targets_raw) - if not targets: - logger.error("prober: no valid targets, exiting") - return + probe_ports = ports or DEFAULT_PROBE_PORTS log_path = Path(log_file) json_path = log_path.with_suffix(".json") log_path.parent.mkdir(parents=True, exist_ok=True) - logger.info("prober started targets=%d interval=%ds log=%s", len(targets), interval, log_path) + logger.info( + "prober started interval=%ds ports=%s log=%s", + interval, ",".join(str(p) for p in probe_ports), log_path, + ) _write_event( log_path, json_path, "prober_startup", - target_count=str(len(targets)), interval=str(interval), - msg=f"DECNET-PROBER started with {len(targets)} targets, interval {interval}s", + probe_ports=",".join(str(p) for p in probe_ports), + msg=f"DECNET-PROBER started, interval {interval}s, " + f"ports {','.join(str(p) for p in probe_ports)}", ) + known_attackers: set[str] = set() + probed: dict[str, set[int]] = {} # IP -> set of ports already probed + log_position: int = 0 + while True: - await asyncio.to_thread( - _probe_cycle, targets, log_path, json_path, timeout, + # Discover new attacker IPs from the log stream + new_ips, log_position = await asyncio.to_thread( + _discover_attackers, json_path, log_position, ) + + if new_ips - known_attackers: + fresh = new_ips - known_attackers + known_attackers.update(fresh) + logger.info( + "prober: discovered %d new attacker(s), total=%d", + len(fresh), len(known_attackers), + ) + + if known_attackers: + await asyncio.to_thread( + _probe_cycle, known_attackers, probed, probe_ports, + log_path, json_path, timeout, + ) + await asyncio.sleep(interval) diff --git a/tests/test_prober_jarm.py b/tests/test_prober_jarm.py index 67bf8ed..d0f8fd1 100644 --- a/tests/test_prober_jarm.py +++ b/tests/test_prober_jarm.py @@ -60,15 +60,6 @@ class TestBuildClientHello: # supported_versions extension type = 0x002B assert b"\x00\x2b" in data, f"Probe {idx} missing supported_versions" - def test_non_tls13_probes_lack_supported_versions(self): - """Probes 0, 1, 2, 7, 8 should NOT include supported_versions.""" - for idx in (0, 1, 2, 7, 8): - data = _build_client_hello(idx, host="example.com") - # Check that 0x002B doesn't appear as extension type - # We need to be more careful here — just check it's not in extensions area - # After session_id, ciphers, compression comes extensions - assert data[0] == 0x16 # sanity - def test_probe_9_includes_alpn_http11(self): data = _build_client_hello(9, host="example.com") assert b"http/1.1" in data @@ -129,7 +120,6 @@ class TestParseServerHello: def test_tls13_via_supported_versions(self): """When supported_versions extension says TLS 1.3, version should be tls13.""" - # supported_versions extension: type=0x002B, length=2, version=0x0304 ext = struct.pack("!HHH", 0x002B, 2, 0x0304) data = _make_server_hello(cipher=0x1301, version=0x0303, extensions=ext) result = _parse_server_hello(data) @@ -153,7 +143,6 @@ class TestParseServerHello: def test_non_server_hello_returns_separator(self): """A Certificate message (type 0x0B) should not parse as ServerHello.""" - # Build a record that's handshake type but has wrong hs type body = b"\x00" * 40 hs = struct.pack("B", 0x0B) + struct.pack("!I", len(body))[1:] + body record = struct.pack("B", 0x16) + struct.pack("!H", 0x0303) + struct.pack("!H", len(hs)) + hs diff --git a/tests/test_prober_worker.py b/tests/test_prober_worker.py new file mode 100644 index 0000000..208907a --- /dev/null +++ b/tests/test_prober_worker.py @@ -0,0 +1,207 @@ +""" +Tests for the prober worker — target discovery from the log stream and +probe cycle behavior. +""" + +from __future__ import annotations + +import json +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from decnet.prober.jarm import JARM_EMPTY_HASH +from decnet.prober.worker import ( + DEFAULT_PROBE_PORTS, + _discover_attackers, + _probe_cycle, + _write_event, +) + + +# ─── _discover_attackers ───────────────────────────────────────────────────── + +class TestDiscoverAttackers: + + def test_discovers_unique_ips(self, tmp_path: Path): + json_file = tmp_path / "decnet.json" + records = [ + {"service": "sniffer", "event_type": "tls_client_hello", "attacker_ip": "10.0.0.1", "fields": {}}, + {"service": "ssh", "event_type": "login_attempt", "attacker_ip": "10.0.0.2", "fields": {}}, + {"service": "sniffer", "event_type": "tls_client_hello", "attacker_ip": "10.0.0.1", "fields": {}}, # dup + ] + json_file.write_text("\n".join(json.dumps(r) for r in records) + "\n") + + ips, pos = _discover_attackers(json_file, 0) + assert ips == {"10.0.0.1", "10.0.0.2"} + assert pos > 0 + + def test_skips_prober_events(self, tmp_path: Path): + json_file = tmp_path / "decnet.json" + records = [ + {"service": "prober", "event_type": "jarm_fingerprint", "attacker_ip": "10.0.0.99", "fields": {}}, + {"service": "ssh", "event_type": "login_attempt", "attacker_ip": "10.0.0.1", "fields": {}}, + ] + json_file.write_text("\n".join(json.dumps(r) for r in records) + "\n") + + ips, _ = _discover_attackers(json_file, 0) + assert "10.0.0.99" not in ips + assert "10.0.0.1" in ips + + def test_skips_unknown_ips(self, tmp_path: Path): + json_file = tmp_path / "decnet.json" + records = [ + {"service": "sniffer", "event_type": "startup", "attacker_ip": "Unknown", "fields": {}}, + ] + json_file.write_text("\n".join(json.dumps(r) for r in records) + "\n") + + ips, _ = _discover_attackers(json_file, 0) + assert len(ips) == 0 + + def test_handles_missing_file(self, tmp_path: Path): + json_file = tmp_path / "nonexistent.json" + ips, pos = _discover_attackers(json_file, 0) + assert len(ips) == 0 + assert pos == 0 + + def test_resumes_from_position(self, tmp_path: Path): + json_file = tmp_path / "decnet.json" + line1 = json.dumps({"service": "ssh", "attacker_ip": "10.0.0.1", "fields": {}}) + "\n" + json_file.write_text(line1) + + _, pos1 = _discover_attackers(json_file, 0) + + # Append more + with open(json_file, "a") as f: + f.write(json.dumps({"service": "ssh", "attacker_ip": "10.0.0.2", "fields": {}}) + "\n") + + ips, pos2 = _discover_attackers(json_file, pos1) + assert ips == {"10.0.0.2"} # only the new one + assert pos2 > pos1 + + def test_handles_file_rotation(self, tmp_path: Path): + json_file = tmp_path / "decnet.json" + # Write enough data to push position well ahead + lines = [json.dumps({"service": "ssh", "attacker_ip": f"10.0.0.{i}", "fields": {}}) + "\n" for i in range(10)] + json_file.write_text("".join(lines)) + _, pos = _discover_attackers(json_file, 0) + assert pos > 0 + + # Simulate rotation — new file is smaller than the old position + json_file.write_text(json.dumps({"service": "ssh", "attacker_ip": "10.0.0.99", "fields": {}}) + "\n") + assert json_file.stat().st_size < pos + + ips, new_pos = _discover_attackers(json_file, pos) + assert "10.0.0.99" in ips + + def test_handles_malformed_json(self, tmp_path: Path): + json_file = tmp_path / "decnet.json" + json_file.write_text("not valid json\n" + json.dumps({"service": "ssh", "attacker_ip": "10.0.0.1", "fields": {}}) + "\n") + + ips, _ = _discover_attackers(json_file, 0) + assert "10.0.0.1" in ips + + +# ─── _probe_cycle ──────────────────────────────────────────────────────────── + +class TestProbeCycle: + + @patch("decnet.prober.worker.jarm_hash") + def test_probes_new_ips(self, mock_jarm: MagicMock, tmp_path: Path): + mock_jarm.return_value = "c0c" * 10 + "a" * 32 # fake 62-char hash + log_path = tmp_path / "decnet.log" + json_path = tmp_path / "decnet.json" + + targets = {"10.0.0.1"} + probed: dict[str, set[int]] = {} + + _probe_cycle(targets, probed, [443, 8443], log_path, json_path, timeout=1.0) + + assert mock_jarm.call_count == 2 # two ports + assert 443 in probed["10.0.0.1"] + assert 8443 in probed["10.0.0.1"] + + @patch("decnet.prober.worker.jarm_hash") + def test_skips_already_probed_ports(self, mock_jarm: MagicMock, tmp_path: Path): + mock_jarm.return_value = "c0c" * 10 + "a" * 32 + log_path = tmp_path / "decnet.log" + json_path = tmp_path / "decnet.json" + + targets = {"10.0.0.1"} + probed: dict[str, set[int]] = {"10.0.0.1": {443}} + + _probe_cycle(targets, probed, [443, 8443], log_path, json_path, timeout=1.0) + + # Should only probe 8443 (443 already done) + assert mock_jarm.call_count == 1 + mock_jarm.assert_called_once_with("10.0.0.1", 8443, timeout=1.0) + + @patch("decnet.prober.worker.jarm_hash") + def test_empty_hash_not_logged(self, mock_jarm: MagicMock, tmp_path: Path): + """All-zeros JARM hash (no TLS server) should not be written as a jarm_fingerprint event.""" + mock_jarm.return_value = JARM_EMPTY_HASH + log_path = tmp_path / "decnet.log" + json_path = tmp_path / "decnet.json" + + targets = {"10.0.0.1"} + probed: dict[str, set[int]] = {} + + _probe_cycle(targets, probed, [443], log_path, json_path, timeout=1.0) + + # Port should be marked as probed + assert 443 in probed["10.0.0.1"] + # But no jarm_fingerprint event should be written + if json_path.exists(): + content = json_path.read_text() + assert "jarm_fingerprint" not in content + + @patch("decnet.prober.worker.jarm_hash") + def test_exception_marks_port_probed(self, mock_jarm: MagicMock, tmp_path: Path): + mock_jarm.side_effect = OSError("Connection refused") + log_path = tmp_path / "decnet.log" + json_path = tmp_path / "decnet.json" + + targets = {"10.0.0.1"} + probed: dict[str, set[int]] = {} + + _probe_cycle(targets, probed, [443], log_path, json_path, timeout=1.0) + + # Port marked as probed to avoid infinite retries + assert 443 in probed["10.0.0.1"] + + @patch("decnet.prober.worker.jarm_hash") + def test_skips_ip_with_all_ports_done(self, mock_jarm: MagicMock, tmp_path: Path): + log_path = tmp_path / "decnet.log" + json_path = tmp_path / "decnet.json" + + targets = {"10.0.0.1"} + probed: dict[str, set[int]] = {"10.0.0.1": {443, 8443}} + + _probe_cycle(targets, probed, [443, 8443], log_path, json_path, timeout=1.0) + + assert mock_jarm.call_count == 0 + + +# ─── _write_event ──────────────────────────────────────────────────────────── + +class TestWriteEvent: + + def test_writes_rfc5424_and_json(self, tmp_path: Path): + log_path = tmp_path / "decnet.log" + json_path = tmp_path / "decnet.json" + + _write_event(log_path, json_path, "test_event", target_ip="10.0.0.1", msg="test") + + assert log_path.exists() + assert json_path.exists() + + log_content = log_path.read_text() + assert "test_event" in log_content + assert "decnet@55555" in log_content + + json_content = json_path.read_text() + record = json.loads(json_content.strip()) + assert record["event_type"] == "test_event" + assert record["service"] == "prober" + assert record["fields"]["target_ip"] == "10.0.0.1"