Neue Tests in dieser Migration: - test_database.py (Merkliste-CRUD, Subscriptions, abgeordnetenwatch-Joins) - test_clustering.py (82% Coverage) - test_drucksache_typen.py (100%) - test_mail.py (86%) - test_monitoring.py (23 Tests) - test_abgeordnetenwatch.py (23 Tests, inkl. Drucksache-Extraction) - test_redline_parser.py (20 Tests fuer §INS§/§DEL§-Marker) - test_bug_regressions.py (PRAGMA, JWT-azp, CDU-PDF, PFLICHT-FRAKTIONEN, NRW-Titel) - test_embeddings_v3_v4.py (WRITE/READ-Pattern) - test_wahlprogramm_check.py (#128) - test_wahlprogramm_fetch.py (#138) - test_antrag/bewertung/abonnement_repository.py + test_llm_bewerter.py (DDD) - test_domain_behavior.py (5 Domain-Methoden boundary tests) - tests/e2e/test_ui.py (Playwright) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
355 lines
14 KiB
Python
355 lines
14 KiB
Python
"""Unit tests for app/mail.py (#134 phase 2).

Covers the unsubscribe-token round trip, digest composition, filter logic
and run_daily_digest() with an empty subscription table. SMTP calls are
stubbed via monkeypatch/unittest.mock — no real network call is made.
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import sys
|
|
import types
|
|
from unittest.mock import AsyncMock, MagicMock, patch, patch as _patch
|
|
|
|
import pytest
|
|
|
|
# aiosqlite is not installed in the unit-test environment — register an empty
# placeholder module before the database module gets imported.
if "aiosqlite" not in sys.modules:
    _aio = sys.modules["aiosqlite"] = types.ModuleType("aiosqlite")
|
|
|
|
# ─── Import preparation ──────────────────────────────────────────────────────
# config.py imports pydantic_settings — conftest already stubs that, but for
# the direct mail test the modules are imported explicitly here once more.
|
|
|
|
from app.mail import (
|
|
_unsubscribe_token,
|
|
verify_unsubscribe_token,
|
|
unsubscribe_url,
|
|
compose_digest,
|
|
_filter_assessments,
|
|
run_daily_digest,
|
|
)
|
|
from app.config import settings
|
|
|
|
|
|
# ─── Hilfsfixtures ───────────────────────────────────────────────────────────
|
|
|
|
def _make_sub(id: int = 1, email: str = "test@example.com",
|
|
bundesland: str | None = None, partei: str | None = None,
|
|
last_sent: str | None = None) -> dict:
|
|
return {
|
|
"id": id,
|
|
"email": email,
|
|
"bundesland": bundesland,
|
|
"partei": partei,
|
|
"last_sent": last_sent,
|
|
"frequency": "daily",
|
|
}
|
|
|
|
|
|
def _make_assessment(drucksache: str = "18/1234",
|
|
title: str = "Testantrag",
|
|
bundesland: str = "NRW",
|
|
fraktionen: list[str] | None = None,
|
|
gwoe_score: int = 6,
|
|
empfehlung: str = "Empfohlen",
|
|
updated_at: str = "2026-04-20T10:00:00") -> dict:
|
|
return {
|
|
"drucksache": drucksache,
|
|
"title": title,
|
|
"bundesland": bundesland,
|
|
"fraktionen": fraktionen or ["SPD"],
|
|
"gwoe_score": gwoe_score,
|
|
"empfehlung": empfehlung,
|
|
"antrag_zusammenfassung": "Eine kurze Zusammenfassung.",
|
|
"updated_at": updated_at,
|
|
}
|
|
|
|
|
|
# ─── Unsubscribe-Token ────────────────────────────────────────────────────────
|
|
|
|
class TestUnsubscribeToken:
    """Checks for the unsubscribe token helpers and the unsubscribe URL."""

    def test_round_trip_valid(self):
        """A token verifies against the subscription id it was created for."""
        sub_id = 42
        assert verify_unsubscribe_token(sub_id, _unsubscribe_token(sub_id)) is True

    def test_wrong_sub_id_rejected(self):
        """A token created for one id does not verify for another id."""
        token_for_42 = _unsubscribe_token(42)
        assert verify_unsubscribe_token(99, token_for_42) is False

    def test_tampered_token_rejected(self):
        """Changing the last character of a token makes verification fail."""
        original = _unsubscribe_token(1)
        # Pick a replacement that is guaranteed to differ from the last char.
        replacement = "Y" if original[-1] == "X" else "X"
        forged = original[:-1] + replacement
        assert verify_unsubscribe_token(1, forged) is False

    def test_token_is_urlsafe_string(self):
        """The token must not contain +, / or = (URL safety)."""
        token = _unsubscribe_token(7)
        for forbidden in ("+", "/", "="):
            assert forbidden not in token

    def test_token_length_22(self):
        """The token is exactly 22 characters long."""
        assert len(_unsubscribe_token(1)) == 22

    def test_different_ids_produce_different_tokens(self):
        """Two different subscription ids yield two different tokens."""
        first, second = _unsubscribe_token(1), _unsubscribe_token(2)
        assert first != second

    def test_unsubscribe_url_contains_base_url_and_token(self):
        """The URL carries the configured base URL, the id path and the token."""
        link = unsubscribe_url(5)
        expected_token = _unsubscribe_token(5)
        assert settings.base_url in link
        assert "/unsubscribe/5/" in link
        assert expected_token in link
|
|
|
|
|
|
# ─── _filter_assessments ─────────────────────────────────────────────────────
|
|
|
|
class TestFilterAssessments:
    """Checks for _filter_assessments with state, party and ``since`` filters."""

    def test_no_filter_returns_all(self):
        """Without any filter every row is returned."""
        rows = [_make_assessment(bundesland=state) for state in ("NRW", "BY")]
        filtered = _filter_assessments(rows, bundesland=None, partei=None, since=None)
        assert len(filtered) == 2

    def test_bundesland_filter_nrw_only(self):
        """Only rows of the requested state survive the state filter."""
        rows = [_make_assessment(bundesland=state) for state in ("NRW", "BY", "NRW")]
        filtered = _filter_assessments(rows, bundesland="NRW", partei=None, since=None)
        assert len(filtered) == 2
        for row in filtered:
            assert row["bundesland"] == "NRW"

    def test_bundesland_filter_empty_result(self):
        """A state filter without any matching row yields an empty list."""
        only_by = [_make_assessment(bundesland="BY")]
        filtered = _filter_assessments(only_by, bundesland="NRW", partei=None, since=None)
        assert filtered == []

    def test_partei_filter_case_insensitive(self):
        """A lower-case party filter matches the upper-case group name."""
        with_spd = _make_assessment(fraktionen=["SPD", "GRÜNE"])
        without_spd = _make_assessment(fraktionen=["CDU"])
        filtered = _filter_assessments([with_spd, without_spd],
                                       bundesland=None, partei="spd", since=None)
        assert len(filtered) == 1
        assert "SPD" in filtered[0]["fraktionen"]

    def test_partei_filter_no_match(self):
        """A party filter without any matching row yields an empty list."""
        only_cdu = [_make_assessment(fraktionen=["CDU"])]
        filtered = _filter_assessments(only_cdu, bundesland=None, partei="FDP", since=None)
        assert filtered == []

    def test_since_filter_excludes_older(self):
        """Rows updated before or exactly at ``since`` are dropped."""
        cutoff = "2026-04-20T10:00:00"
        rows = [
            _make_assessment(updated_at="2026-04-19T10:00:00"),  # before since → out
            _make_assessment(updated_at="2026-04-20T10:00:00"),  # equal to since → out (<=)
            _make_assessment(updated_at="2026-04-21T10:00:00"),  # after since → kept
        ]
        filtered = _filter_assessments(rows, bundesland=None, partei=None, since=cutoff)
        assert len(filtered) == 1
        assert filtered[0]["updated_at"] == "2026-04-21T10:00:00"

    def test_combined_bundesland_and_partei_filter(self):
        """State and party filter together keep only rows matching both."""
        combos = (("NRW", ["SPD"]), ("NRW", ["CDU"]), ("BY", ["SPD"]))
        rows = [_make_assessment(bundesland=state, fraktionen=groups)
                for state, groups in combos]
        filtered = _filter_assessments(rows, bundesland="NRW", partei="SPD", since=None)
        assert len(filtered) == 1
        match = filtered[0]
        assert match["bundesland"] == "NRW"
        assert "SPD" in match["fraktionen"]

    def test_none_fraktionen_handled(self):
        """A row whose ``fraktionen`` is None is filtered out, not an error."""
        row_without_groups = {
            "drucksache": "x",
            "bundesland": "NRW",
            "fraktionen": None,
            "updated_at": "2026-04-20T10:00:00",
        }
        filtered = _filter_assessments([row_without_groups],
                                       bundesland=None, partei="SPD", since=None)
        assert filtered == []
|
|
|
|
|
|
# ─── compose_digest ──────────────────────────────────────────────────────────
|
|
|
|
class TestComposeDigest:
    """Checks for compose_digest, which returns (subject, text body, HTML body)."""

    def test_subject_contains_count_and_filter_label(self):
        """The subject mentions the number of assessments and both filters."""
        sub = _make_sub(bundesland="NRW", partei="SPD")
        assessments = [_make_assessment(), _make_assessment(drucksache="18/5678")]
        subject, _, _ = compose_digest(sub, assessments)
        assert "2" in subject
        assert "NRW" in subject
        assert "SPD" in subject

    def test_subject_singular_for_one_assessment(self):
        """With a single assessment the subject uses the singular form."""
        sub = _make_sub()
        subject, _, _ = compose_digest(sub, [_make_assessment()])
        # "Bewertung" without the plural "en" for n=1
        assert "Bewertung" in subject
        assert "Bewertungen" not in subject

    def test_subject_plural_for_multiple(self):
        """With several assessments the subject uses the plural form."""
        sub = _make_sub()
        rows = [_make_assessment(drucksache=f"18/{i}") for i in range(3)]
        subject, _, _ = compose_digest(sub, rows)
        assert "Bewertungen" in subject

    def test_filter_label_all_when_no_filter(self):
        """Without state/party filter the text body says "alle Bundesländer"."""
        sub = _make_sub()  # no state / party filter
        _, text, _ = compose_digest(sub, [_make_assessment()])
        assert "alle Bundesländer" in text

    def test_text_body_contains_assessment_title(self):
        """The plain-text body lists the assessment title."""
        sub = _make_sub()
        row = _make_assessment(title="Klimaschutzantrag NRW")
        _, text, _ = compose_digest(sub, [row])
        assert "Klimaschutzantrag NRW" in text

    def test_text_body_contains_unsubscribe_url(self):
        """The plain-text body contains the subscriber's unsubscribe link."""
        sub = _make_sub(id=7)
        _, text, _ = compose_digest(sub, [_make_assessment()])
        token = _unsubscribe_token(7)
        assert token in text
        assert "/unsubscribe/7/" in text

    def test_html_body_is_valid_html(self):
        """The HTML body has a doctype and opening/closing html tags."""
        sub = _make_sub()
        _, _, html_body = compose_digest(sub, [_make_assessment()])
        assert "<!DOCTYPE html>" in html_body
        assert "<html" in html_body
        assert "</html>" in html_body

    def test_html_body_contains_score(self):
        """The HTML body shows the score as "<score>/10"."""
        sub = _make_sub()
        row = _make_assessment(gwoe_score=8)
        _, _, html_body = compose_digest(sub, [row])
        assert "8/10" in html_body

    def test_truncation_at_20_assessments(self):
        """With 25 assessments both bodies announce the 5 omitted ones."""
        sub = _make_sub()
        rows = [_make_assessment(drucksache=f"18/{i}") for i in range(25)]
        _, text, html_body = compose_digest(sub, rows)
        assert "und 5 weitere" in text
        assert "5 weitere" in html_body

    def test_no_truncation_marker_for_20_or_fewer(self):
        """With exactly 20 assessments no "weitere" marker appears."""
        sub = _make_sub()
        rows = [_make_assessment(drucksache=f"18/{i}") for i in range(20)]
        _, text, _ = compose_digest(sub, rows)
        assert "weitere" not in text

    def test_html_escaping_in_title(self):
        """Markup in a title must show up escaped, never as a raw tag."""
        sub = _make_sub()
        row = _make_assessment(title='<script>alert("xss")</script>')
        _, _, html_body = compose_digest(sub, [row])
        assert "<script>" not in html_body
        # The escaped entity form must be present; asserting the raw tag here
        # would contradict the assertion above.
        assert "&lt;script&gt;" in html_body
|
|
|
|
|
|
# ─── run_daily_digest — leere Subscription-Tabelle ───────────────────────────
|
|
|
|
class TestRunDailyDigest:
    """Checks for run_daily_digest with stubbed database and mail functions.

    The fakes are installed on ``app.database`` (and ``app.mail.send_mail``),
    so neither a database nor an SMTP server is needed.
    """

    def test_empty_subscriptions_returns_zero_stats(self):
        """Empty subscription table → {sent: 0, failed: 0, skipped_empty: 0}."""
        # run_daily_digest imports the database symbols lazily (inside the
        # function), so app.database has to be patched, not app.mail.
        import app.database as db_mod

        async def fake_get_all_subscriptions_due(frequency):
            return []

        with patch.object(db_mod, "get_all_subscriptions_due",
                          side_effect=fake_get_all_subscriptions_due):
            result = asyncio.run(run_daily_digest())

        assert result == {"sent": 0, "failed": 0, "skipped_empty": 0}

    def test_subscription_with_no_matching_assessments_increments_skipped(self):
        """Subscription without matching motions → skipped_empty +1, no SMTP call."""
        import app.database as db_mod
        import app.mail as mail_mod

        sub = _make_sub(id=1, bundesland="NRW", last_sent="2026-04-20T00:00:00")

        async def fake_subs(frequency):
            return [sub]

        async def fake_all_assessments(limit):
            # Only BY motions, no NRW → nothing matches the subscription
            return [_make_assessment(bundesland="BY")]

        async def fake_mark_sent(sub_id):
            pass

        with (
            patch.object(db_mod, "get_all_subscriptions_due", side_effect=fake_subs),
            patch.object(db_mod, "get_all_assessments", side_effect=fake_all_assessments),
            patch.object(db_mod, "mark_subscription_sent", side_effect=fake_mark_sent),
            # Stub send_mail so a regression can never reach a real SMTP
            # server, and so the "no SMTP call" promise can be asserted.
            patch.object(mail_mod, "send_mail") as send_mock,
        ):
            result = asyncio.run(run_daily_digest())

        assert result["skipped_empty"] == 1
        assert result["sent"] == 0
        assert result["failed"] == 0
        send_mock.assert_not_called()

    def test_successful_send_increments_sent(self):
        """Subscription with a matching assessment → sent +1, no real SMTP call."""
        import app.database as db_mod
        import app.mail as mail_mod

        sub = _make_sub(id=2, bundesland="NRW", last_sent="2026-04-01T00:00:00")
        assessment = _make_assessment(bundesland="NRW", updated_at="2026-04-20T12:00:00")

        async def fake_subs(frequency):
            return [sub]

        async def fake_all_assessments(limit):
            return [assessment]

        async def fake_mark_sent(sub_id):
            pass

        async def fake_send_mail(to, subject, text, html):
            pass  # no SMTP

        with (
            patch.object(db_mod, "get_all_subscriptions_due", side_effect=fake_subs),
            patch.object(db_mod, "get_all_assessments", side_effect=fake_all_assessments),
            patch.object(db_mod, "mark_subscription_sent", side_effect=fake_mark_sent),
            patch.object(mail_mod, "send_mail", side_effect=fake_send_mail) as send_mock,
        ):
            result = asyncio.run(run_daily_digest())

        assert result["sent"] == 1
        assert result["failed"] == 0
        assert result["skipped_empty"] == 0
        # The counter alone does not prove the stub was used for the send.
        assert send_mock.call_count == 1

    def test_smtp_exception_increments_failed(self):
        """If send_mail raises → failed +1, no crash."""
        import app.database as db_mod
        import app.mail as mail_mod

        sub = _make_sub(id=3, bundesland="NRW", last_sent="2026-04-01T00:00:00")
        assessment = _make_assessment(bundesland="NRW", updated_at="2026-04-20T12:00:00")

        async def fake_subs(frequency):
            return [sub]

        async def fake_all_assessments(limit):
            return [assessment]

        async def fake_send_mail_fail(to, subject, text, html):
            raise ConnectionError("TCP drop (1blu)")

        with (
            patch.object(db_mod, "get_all_subscriptions_due", side_effect=fake_subs),
            patch.object(db_mod, "get_all_assessments", side_effect=fake_all_assessments),
            patch.object(mail_mod, "send_mail", side_effect=fake_send_mail_fail),
        ):
            result = asyncio.run(run_daily_digest())

        assert result["failed"] == 1
        assert result["sent"] == 0
|