docs(ttp): catalogue producer wiring for every TTP-watched topic
Add a "Producer wiring" subsection under TTP_TAGGING.md §"Bus topics" mapping every topic the TTP worker subscribes to onto the file:line that publishes it. Calls out the gap (`email.received` has no producer today) and the new `attacker.session.ended` payload shape from the collector aggregator. Also lists the four producer regression tests added in this series so a future contributor sees the safety net before staring at the silent rule engine. DEBT.md gets the `attacker.email.received` follow-up entry — wire the producer when SMTP-receive persistence lands, since today the honeypot relay path doesn't store received emails anywhere a publisher could read from.
This commit is contained in:
15
DEBT.md
15
DEBT.md
@@ -33,3 +33,18 @@ stays YAML-only.
|
|||||||
|
|
||||||
Trigger: v0 precision targets met + at least one downstream user
|
Trigger: v0 precision targets met + at least one downstream user
|
||||||
who needs it.
|
who needs it.
|
||||||
|
|
||||||
|
### `attacker.email.received` producer — wire when SMTP-receive
|
||||||
|
### persistence lands
|
||||||
|
|
||||||
|
The TTP worker subscribes to `email.received` for the EmailLifter
|
||||||
|
(R0041–R0048), but no upstream component publishes the topic today.
|
||||||
|
The honeypot SMTP-relay path (`decnet/services/smtp_relay.py`) does
|
||||||
|
not persist received emails to a DB table the way ingester /
|
||||||
|
collector persist log events, so there is no source row to fan out
|
||||||
|
on. See `development/TTP_TAGGING.md` §"Bus topics → Producer
|
||||||
|
wiring" for the full producer audit.
|
||||||
|
|
||||||
|
Trigger: SMTP-receive persistence model lands (a `ReceivedEmail`
|
||||||
|
SQLModel + ingest path). Wire the publisher in the same PR.
|
||||||
|
Owner: TBD.
|
||||||
|
|||||||
@@ -658,6 +658,63 @@ test plan) cross-reference back here rather than restating —
|
|||||||
duplicating the rule across three locations is a maintenance
|
duplicating the rule across three locations is a maintenance
|
||||||
liability, not enforcement.
|
liability, not enforcement.
|
||||||
|
|
||||||
|
### Producer wiring (who publishes what)
|
||||||
|
|
||||||
|
The TTP worker subscribes; the topics it watches are produced
|
||||||
|
elsewhere in the tree. Catalogued here because "subscriber set up,
|
||||||
|
nothing happens" is the failure mode worth surfacing first when
|
||||||
|
debugging silent rule-engine output.
|
||||||
|
|
||||||
|
| Topic | Producer | Notes |
|
||||||
|
|---|---|---|
|
||||||
|
| `attacker.observed` | `decnet/correlation/engine.py` (`_publish_fn` on first sighting per IP) | One event per attacker_ip per profiler-process lifetime — replays after a restart re-emit. |
|
||||||
|
| `attacker.scored` | `decnet/profiler/worker.py` | Fired after every incremental profile update. |
|
||||||
|
| `attacker.intel.enriched` | `decnet/intel/worker.py` | Per-row publish after `upsert_attacker_intel`. Gated on `repo.get_unenriched_attackers` returning rows. |
|
||||||
|
| `identity.formed` / `merged` / `observation.linked` / `unmerged` | `decnet/clustering/worker.py:_publish_result` | Fans out the four sub-lists of `ClusterResult`. Gated on the clusterer producing material side-effects. |
|
||||||
|
| `credential.reuse.detected` | `decnet/correlation/reuse_worker.py` | Per-finding publish; gated on `min_targets ≥ 2`. |
|
||||||
|
| `attacker.session.ended` | `decnet/collector/worker.py:_SessionAggregator` | Indexes shell `command` events per `attacker_ip` and emits one envelope per `session_recorded` log event. |
|
||||||
|
| `canary.{token}.triggered` | `decnet/canary/planter.py` | Per-token canary callbacks. |
|
||||||
|
| `email.received` | **none** | No producer in tree (DEBT — wire when SMTP-receive persistence lands). |
|
||||||
|
|
||||||
|
**`attacker.session.ended` payload shape** (commit-1 of the
|
||||||
|
collector producer wiring):
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"session_id": "<sid>" | null,
|
||||||
|
"attacker_uuid": null,
|
||||||
|
"attacker_ip": "192.168.1.5",
|
||||||
|
"decky_id": "omega-decky",
|
||||||
|
"service": "ssh",
|
||||||
|
"ended_at": "2026-05-02T06:23:30+00:00",
|
||||||
|
"duration_s": 165.914,
|
||||||
|
"commands": [
|
||||||
|
{"id": "<sid>#0", "command_text": "ls /var/www/html",
|
||||||
|
"ts": "2026-05-02T06:22:48+00:00",
|
||||||
|
"decky": "SRV-DELTA-77", "service": "bash"}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
`attacker_uuid` is null because the collector doesn't talk to the
|
||||||
|
DB; the TTP worker resolves it from `attacker_ip` on the consume
|
||||||
|
side. `id` per command is `f"{sid}#{idx}"` so the deterministic
|
||||||
|
`compute_tag_uuid` collapses on replay (loop-prevention).
|
||||||
|
|
||||||
|
### Producer–consumer health checks
|
||||||
|
|
||||||
|
Each producer is pinned by a regression test that drives one tick
|
||||||
|
with a fake bus + stubbed repo and asserts the topic fires:
|
||||||
|
|
||||||
|
* `tests/collector/test_session_ended_publish.py`
|
||||||
|
* `tests/correlation/test_reuse_worker_publish.py`
|
||||||
|
* `tests/clustering/test_worker_publish.py`
|
||||||
|
* `tests/intel/test_worker_publish.py`
|
||||||
|
|
||||||
|
These run alongside the TTP suite. If a future refactor moves a
|
||||||
|
publish call out of the loop body or mis-spells a topic constant,
|
||||||
|
one of these flips red on the next CI run.
|
||||||
|
|
||||||
## Worker shape
|
## Worker shape
|
||||||
|
|
||||||
`decnet/ttp/` mirrors `decnet/intel/` and `decnet/clustering/` —
|
`decnet/ttp/` mirrors `decnet/intel/` and `decnet/clustering/` —
|
||||||
|
|||||||
Reference in New Issue
Block a user