gwoe-antragspruefer/app/mail.py

221 lines
8.4 KiB
Python
Raw Normal View History

"""Mail-Sending + Daily-Digest für E-Mail-Benachrichtigungen (#124).
Nutzt die Standard-Library `smtplib` (blockierend) in einem Thread-Executor,
damit kein zusätzlicher Dependency-Eintrag nötig ist. 1blu SMTP:
smtp.1blu.de:465 SSL, username = Postfachname (NICHT E-Mail!)
Credentials kommen aus settings.smtp_user / smtp_password via ENV.
Unsubscribe-Token: HMAC-SHA256 von sub_id + secret, URL-sicher base64-encoded.
"""
from __future__ import annotations
import asyncio
import base64
import hashlib
import hmac
import html
import logging
import smtplib
import ssl
from datetime import datetime
from email.message import EmailMessage
from .config import settings
logger = logging.getLogger(__name__)
# ─── Unsubscribe-Token ──────────────────────────────────────────────────────
def _unsubscribe_token(sub_id: int) -> str:
    """Derive the token embedded in a subscription's unsubscribe link.

    The decimal string form of ``sub_id`` is signed with HMAC-SHA256, keyed by
    ``settings.unsubscribe_secret``. The raw digest is URL-safe base64
    encoded, stripped of ``=`` padding and cut to its first 22 characters.
    """
    payload = str(sub_id).encode()
    key = settings.unsubscribe_secret.encode()
    digest = hmac.new(key, payload, hashlib.sha256).digest()
    encoded = base64.urlsafe_b64encode(digest).decode()
    return encoded.rstrip("=")[:22]
def verify_unsubscribe_token(sub_id: int, token: str) -> bool:
    """Check in constant time whether ``token`` belongs to ``sub_id``.

    Args:
        sub_id: Subscription id the token was supposedly issued for.
        token: Token taken from the unsubscribe link (untrusted input).

    Returns:
        ``True`` if ``token`` equals the token derived for ``sub_id``,
        ``False`` otherwise (including for non-string tokens).
    """
    if not isinstance(token, str):
        return False
    expected = _unsubscribe_token(sub_id)
    # hmac.compare_digest() accepts str arguments only when they are pure
    # ASCII and raises TypeError otherwise. The token comes from a URL, so
    # compare bytes instead: a token with non-ASCII characters is then simply
    # a mismatch rather than an unhandled exception. "surrogatepass" keeps
    # lone surrogates from raising UnicodeEncodeError.
    return hmac.compare_digest(
        expected.encode("ascii"),
        token.encode("utf-8", "surrogatepass"),
    )
def unsubscribe_url(sub_id: int) -> str:
    """Return the unsubscribe link for ``sub_id``.

    The link has the form ``<base_url>/unsubscribe/<sub_id>/<token>``, where
    ``base_url`` is read from ``settings`` and the token is derived from
    ``sub_id`` by ``_unsubscribe_token``.
    """
    signature = _unsubscribe_token(sub_id)
    return "{}/unsubscribe/{}/{}".format(settings.base_url, sub_id, signature)
# ─── SMTP-Send ──────────────────────────────────────────────────────────────
# Upper bound (seconds) for connecting to and talking with the SMTP server.
_SMTP_TIMEOUT_SECONDS = 30.0


def _send_sync(to_email: str, subject: str, text_body: str, html_body: str) -> None:
    """Send one multipart (plain text + HTML) mail via SMTP over SSL, blocking.

    Connection data (host, port, user, password, sender name/address) is read
    from ``settings``.

    Args:
        to_email: Recipient address, used as the ``To`` header.
        subject: Value of the ``Subject`` header.
        text_body: Plain-text part of the message.
        html_body: HTML alternative part of the message.

    Raises:
        RuntimeError: If ``settings.smtp_host`` or ``settings.smtp_user`` is
            empty.
        OSError / smtplib.SMTPException: Propagated unchanged from connecting,
            logging in or sending (a timeout surfaces as an ``OSError``
            subclass).
    """
    if not settings.smtp_host or not settings.smtp_user:
        raise RuntimeError("SMTP nicht konfiguriert (settings.smtp_host/user leer)")

    msg = EmailMessage()
    msg["From"] = f"{settings.smtp_from_name} <{settings.smtp_from_email}>"
    msg["To"] = to_email
    msg["Subject"] = subject
    msg.set_content(text_body)
    msg.add_alternative(html_body, subtype="html")

    ctx = ssl.create_default_context()
    # Without an explicit timeout a stalled server would block this call (and
    # the thread running it) indefinitely.
    with smtplib.SMTP_SSL(
        settings.smtp_host,
        settings.smtp_port,
        context=ctx,
        timeout=_SMTP_TIMEOUT_SECONDS,
    ) as server:
        server.login(settings.smtp_user, settings.smtp_password)
        server.send_message(msg)
async def send_mail(to_email: str, subject: str, text_body: str, html_body: str) -> None:
    """Send a mail without blocking the event loop.

    The blocking ``_send_sync`` call is handed to the running loop's default
    executor and awaited; any exception it raises propagates to the caller.
    """

    def _deliver() -> None:
        _send_sync(to_email, subject, text_body, html_body)

    await asyncio.get_running_loop().run_in_executor(None, _deliver)
# ─── Digest-Komposition ─────────────────────────────────────────────────────
def _filter_assessments(rows: list[dict], bundesland: str | None, partei: str | None, since: str | None) -> list[dict]:
"""Filtert Assessment-Rows nach Abo-Kriterien."""
result = []
for r in rows:
if bundesland and (r.get("bundesland") or "") != bundesland:
continue
if partei:
fraktionen = r.get("fraktionen") or []
if not any(partei.upper() in (f or "").upper() for f in fraktionen):
continue
if since and (r.get("updated_at") or "") <= since:
continue
result.append(r)
return result
def _digest_item_fields(a: dict) -> tuple[str, str, str, str]:
    """Return the (title, empfehlung, fraktionen, url) display strings of one assessment row."""
    title = a.get("title") or a.get("drucksache") or ""
    emp = a.get("empfehlung") or ""
    # Skip empty/None entries so join() cannot fail on them.
    fraktionen = ", ".join(f for f in (a.get("fraktionen") or []) if f)
    url = f"{settings.base_url}/?drucksache={a.get('drucksache')}"
    return title, emp, fraktionen, url


def compose_digest(sub: dict, assessments: list[dict]) -> tuple[str, str, str]:
    """Build subject, plain-text body and HTML body of one digest mail.

    Args:
        sub: Subscription row. ``sub["id"]`` is required (it is used for the
            unsubscribe link); ``"bundesland"`` and ``"partei"`` are optional
            and only used to label the active filter.
        assessments: Assessment rows to report. At most the first 20 are
            listed; if there are more, a "… und N weitere" line / link is
            appended.

    Returns:
        ``(subject, text_body, html_body)``. All values taken from
        ``assessments`` and ``sub`` are HTML-escaped in ``html_body``.
    """
    max_items = 20
    n = len(assessments)
    plural = "en" if n != 1 else ""
    shown = assessments[:max_items]

    filter_label_parts = [sub[key] for key in ("bundesland", "partei") if sub.get(key)]
    filter_label = " · ".join(filter_label_parts) if filter_label_parts else "alle Bundesländer & Parteien"
    # The count and the filter label need a separator; without it the subject
    # read e.g. "3 neue BewertungenNRW · SPD".
    subject = f"[GWÖ-Antragsprüfer] {n} neue Bewertung{plural} — {filter_label}"
    unsub = unsubscribe_url(sub["id"])

    # Plaintext
    text_lines = [
        f"Neue Antragsbewertungen — Filter: {filter_label}",
        "=" * 60,
        "",
    ]
    for a in shown:
        title, emp, fraktionen, url = _digest_item_fields(a)
        score = a.get("gwoe_score")
        text_lines.append(f"{title}")
        text_lines.append(f" Score: {score}/10 — {emp}")
        text_lines.append(f" Fraktionen: {fraktionen}")
        text_lines.append(f" {url}")
        text_lines.append("")
    if n > max_items:
        text_lines.append(f"… und {n - max_items} weitere. Alle anzeigen: {settings.base_url}")
        text_lines.append("")
    text_lines.append("")
    text_lines.append(f"Abo verwalten: {settings.base_url}")
    text_lines.append(f"Abbestellen: {unsub}")
    text_body = "\n".join(text_lines)

    # HTML — every interpolated value is escaped before it enters the markup.
    html_items = []
    for a in shown:
        title, emp, fraktionen, url = _digest_item_fields(a)
        score = a.get("gwoe_score")
        title = html.escape(title)
        emp = html.escape(emp)
        fraktionen = html.escape(fraktionen)
        # Only the first 200 characters of the summary are shown.
        zus = html.escape((a.get("antrag_zusammenfassung") or "")[:200])
        url = html.escape(url)
        html_items.append(f"""
<div style="border-left:3px solid #007a80;padding:8px 12px;margin:12px 0;background:#f9f9f9">
<a href="{url}" style="color:#007a80;text-decoration:none;font-weight:bold">{title}</a><br>
<span style="color:#666;font-size:0.9em">Score: <b>{score}/10</b> {emp} {fraktionen}</span><br>
<span style="color:#444;font-size:0.9em">{zus}</span>
</div>""")

    more_link = ""
    if n > max_items:
        # Escaped like the other base_url hrefs in this template.
        more_link = f'<p><a href="{html.escape(settings.base_url)}">… und {n - max_items} weitere ansehen</a></p>'

    html_body = f"""<!DOCTYPE html>
<html><body style="font-family:Helvetica,Arial,sans-serif;max-width:600px;margin:0 auto;padding:20px;color:#333">
<h2 style="color:#007a80">{n} neue Antragsbewertung{plural}</h2>
<p style="color:#666">Filter: <b>{html.escape(filter_label)}</b></p>
{''.join(html_items)}
{more_link}
<hr style="border:none;border-top:1px solid #ddd;margin:20px 0">
<p style="font-size:0.85em;color:#888">
<a href="{html.escape(settings.base_url)}" style="color:#888">Abo verwalten</a> &middot;
<a href="{html.escape(unsub)}" style="color:#888">Abbestellen</a>
</p>
</body></html>"""
    return subject, text_body, html_body
async def run_daily_digest() -> dict:
    """Run one daily digest pass over all due subscriptions.

    Loads the due "daily" subscriptions and, if there are any, all
    assessments. For each subscription the assessments are filtered by its
    ``bundesland`` / ``partei`` / ``last_sent`` values; a digest is composed
    and mailed when something matches. Subscriptions without matches are
    still marked as sent. A failure for one subscription is logged and
    counted, and does not stop the remaining ones.

    Returns:
        Counters ``{"sent": ..., "failed": ..., "skipped_empty": ...}``.
    """
    from .database import (
        get_all_assessments,
        get_all_subscriptions_due,
        mark_subscription_sent,
    )

    counters = {"sent": 0, "failed": 0, "skipped_empty": 0}

    due_subs = await get_all_subscriptions_due("daily")
    if not due_subs:
        logger.info("run_daily_digest: keine due subscriptions")
        return counters

    pool = await get_all_assessments(None)
    for entry in due_subs:
        hits = _filter_assessments(
            pool,
            bundesland=entry.get("bundesland"),
            partei=entry.get("partei"),
            since=entry.get("last_sent"),
        )
        if hits:
            try:
                subject, text_body, html_body = compose_digest(entry, hits)
                await send_mail(entry["email"], subject, text_body, html_body)
                await mark_subscription_sent(entry["id"])
                counters["sent"] += 1
                logger.info("digest sent to %s (%d items)", entry["email"], len(hits))
            except Exception:
                logger.exception("digest failed for sub_id=%s", entry["id"])
                counters["failed"] += 1
        else:
            counters["skipped_empty"] += 1
            # Update last-sent anyway so the subscription is not re-checked
            # on every run.
            await mark_subscription_sent(entry["id"])
    return counters
if __name__ == "__main__":
    # Entry point for `python -m app.mail`: perform one daily digest run and
    # print the resulting counters.
    logging.basicConfig(
        format="%(asctime)s %(levelname)s %(message)s",
        level=logging.INFO,
    )
    digest_stats = asyncio.run(run_daily_digest())
    print("Digest-Lauf fertig: {}".format(digest_stats))