Files
DECNET/tests/orchestrator/emailgen/test_events.py
anti f2b3393669 chore: relicense to AGPL-3.0-or-later and add SPDX headers
Replaces LICENSE (GPLv3 -> AGPLv3) and prepends
`SPDX-License-Identifier: AGPL-3.0-or-later` to every source file
across decnet/, decnet_web/, tests/, scripts/, and tools/.

Rationale: closes the GPLv3 ASP loophole so any party operating a
modified DECNET as a network service must offer their modified
source. Personal copyright (Samuel Paschuan) + inbound=outbound
contributions make a future unilateral relicense infeasible.

- LICENSE: full AGPL-3.0 text (gnu.org/licenses/agpl-3.0.txt)
- COPYRIGHT: project copyright notice
- tools/add_spdx_headers.py: idempotent header injector
  (shebang- and PEP 263-aware)

Touches 1565 source files (.py, .ts, .tsx, .js, .jsx, .css, .sh).
No behavior change; comments only.
2026-05-22 21:04:16 -04:00

74 lines
2.3 KiB
Python

# SPDX-License-Identifier: AGPL-3.0-or-later
"""events.to_row / topic_for / event_type_for."""
from __future__ import annotations
from decnet.bus import topics as _topics
from decnet.orchestrator.drivers.base import ActivityResult
from decnet.orchestrator.emailgen import events
from decnet.realism.personas import EmailPersona
from decnet.orchestrator.emailgen.scheduler import EmailAction
def _persona(email="john@corp.com"):
return EmailPersona(
name="John", email=email, role="COO", tone="formal",
mannerisms=[], language="en",
)
def _action():
return EmailAction(
mail_decky_uuid="d1",
mail_decky_name="mailhost",
mail_decky_services=("imap",),
sender=_persona(),
recipient=_persona(email="sarah@corp.com"),
thread_id="thr1",
parent_message_id=None,
references="",
subject_hint=None,
parent_excerpt=None,
context_hint="Q3 budget",
is_reply=False,
)
def test_to_row_pulls_message_id_subject_from_payload():
res = ActivityResult(
success=True,
payload={
"message_id": "<m1@corp.com>",
"subject": "Q3 budget",
"language": "en",
"eml_path": "/var/spool/decnet-emails/thr1/m1.eml",
"model": "llama3.1",
},
)
row = events.to_row(_action(), res)
assert row["mail_decky_uuid"] == "d1"
assert row["thread_id"] == "thr1"
assert row["message_id"] == "<m1@corp.com>"
assert row["subject"] == "Q3 budget"
assert row["sender_email"] == "john@corp.com"
assert row["recipient_email"] == "sarah@corp.com"
assert row["language"] == "en"
assert row["eml_path"].endswith(".eml")
assert row["success"] is True
assert row["payload"]["model"] == "llama3.1"
def test_to_row_falls_back_to_persona_language():
res = ActivityResult(success=True, payload={})
row = events.to_row(_action(), res)
assert row["language"] == "en"
assert row["message_id"] == ""
def test_topic_for_uses_orchestrator_email_root():
topic = events.topic_for(_action())
assert topic == f"orchestrator.{_topics.ORCHESTRATOR_EMAIL}.d1"
def test_event_type_for_returns_email_constant():
assert events.event_type_for(_action()) == _topics.ORCHESTRATOR_EMAIL